对于一个分布式系统来说,为了保证数据的一致性,通常会选择一个主节点执行数据的写入,然后从节点同步主节点数据,elastic-job同样是一个道理。
首先看一下主节点的选举代码
public void registerStartUpInfo(final boolean enabled) {
listenerManager.startAllListeners();
// leader选举
leaderService.electLeader();
serverService.persistOnline(enabled);
instanceService.persistOnline();
shardingService.setReshardingFlag();
monitorService.listen();
if (!reconcileService.isRunning()) {
reconcileService.startAsync();
}
}
也是在启动之前,注册作业启动信息的时候进行的leader选举。具体看下是如何操作的
public void electLeader() {
log.debug("Elect a new leader now.");
jobNodeStorage.executeInLeader(LeaderNode.LATCH, new LeaderElectionExecutionCallback());
log.debug("Leader election completed.");
}
public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {
try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {
latch.start();
latch.await();
callback.execute();
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
handleException(ex);
}
}
到第二块代码才真正的开始了leader的选举。这里用了curator框架提供的LeaderLatch进行选举,而LeaderLatch的方式则是一种抢占的方式来决定选主。各个节点通过在zookeeper上指定目录下建立临时顺序节点,然后对列表进行排序,排在第一个就是leader。上述代码中调用了start方法开始选主,然后调用await阻塞等待,一旦选举成功后会立即返回,但是先不从start看起,先从await看起,看一下要满足什么条件,然后再看选主过程。
public void await() throws InterruptedException, EOFException
{
synchronized(this)
{
while ( (state.get() == State.STARTED) && !hasLeadership.get() )
{
wait();
}
}
if ( state.get() != State.STARTED )
{
throw new EOFException();
}
}
方法要想返回必须满足两个条件,一是state状态已经开启,另一个则是是否存在leader,这个变量默认值是false,也就是在某个地方会将其更改为true。带着这个疑问现在可以看start方法了:
public void start() throws Exception
{
Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
startTask.set(AfterConnectionEstablished.execute(client, new Runnable()
{
@Override
public void run()
{
try
{
internalStart();
}
finally
{
startTask.set(null);
}
}
}));
}
代码首先更改了state的值更改为started,这里就是刚才await条件之一,更改完毕之后就代表现在正式开始选择leader了。紧接着设置了一个异步任务,等待client连接建立之后执行。
private synchronized void internalStart()
{
if ( state.get() == State.STARTED )
{
client.getConnectionStateListenable().addListener(listener);
try
{
reset();
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
log.error("An error occurred checking resetting leadership.", e);
}
}
}
这列主要是建立连接的监听器,方便client对server连接变化时做一些操作。其次调用了reset方法,
void reset() throws Exception
{
setLeadership(false);
setNode(null);
BackgroundCallback callback = new BackgroundCallback()
{
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
{
if ( debugResetWaitLatch != null )
{
debugResetWaitLatch.await();
debugResetWaitLatch = null;
}
if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
{
setNode(event.getName());
if ( state.get() == State.CLOSED )
{
setNode(null);
}
else
{
getChildren();
}
}
else
{
log.error("getChildren() failed. rc = " + event.getResultCode());
}
}
};
client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
}
直接初始化选主状态为false,然后将上次设置得路径设置为空,然后生成了一个匿名内部类,实现了里面的回调函数用于节点建立之后进行回调,最后是在latch下建立临时顺序节点。节点的名称有一点特别首先看下图:
为什么是这种格式呢?看下面的一行代码
public String forPath(final String givenPath, byte[] data) throws Exception
{
if ( compress )
{
data = client.getCompressionProvider().compress(givenPath, data);
}
// 构建临时顺序节点
final String adjustedPath = adjustPath(client.fixForNamespace(givenPath, createMode.isSequential()));
List<ACL> aclList = acling.getAclList(adjustedPath);
client.getSchemaSet().getSchema(givenPath).validateCreate(createMode, givenPath, data, aclList);
String returnPath = null;
if ( backgrounding.inBackground() )
{
pathInBackground(adjustedPath, data, givenPath);
}
else
{
String path = protectedPathInForeground(adjustedPath, data, aclList);
returnPath = client.unfixForNamespace(path);
}
return returnPath;
}
代码格式在adjustPath方法里面进行了调整,
String adjustPath(String path) throws Exception
{
// 前面设置节点的时候设置了withProtection
if ( protectedMode.doProtected() )
{
ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);
// 这里会一次加上 _c_ 、uuid、-、最后时latch-
String name = getProtectedPrefix(protectedMode.protectedId()) + pathAndNode.getNode();
path = ZKPaths.makePath(pathAndNode.getPath(), name);
}
return path;
}
在getProtectedPrefix时会依次加上_c_、uuid、-、latch-、最后那串有序数字则是zookeeper对于在latch下面建立临时顺序节点的时候临时自动编号节点。接着上面建节点说当节点建立成功的时候会回调函数,函数中调用了一个getChildren方法,接着往下走
private void getChildren() throws Exception
{
BackgroundCallback callback = new BackgroundCallback()
{
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
{
if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
{
checkLeadership(event.getChildren());
}
}
};
client.getChildren().inBackground(callback).forPath(ZKPaths.makePath(latchPath, null));
}
跟之前的代码差不多,一个一名内部类实现了回调函数,下面使用client获取了latchPath下面的子节点,这个地方很关键,之前说过各个client会在指定目录下创建临时顺序节点,然后会对节点进行排序,排序的规则是取节点最后十位zk自动编号的数字。
private void checkLeadership(List<String> children) throws Exception
{
if ( debugCheckLeaderShipLatch != null )
{
debugCheckLeaderShipLatch.await();
}
// 获取本节点的全路径临时顺序节点
// /schedulerJobTest/leader/election/latch/_c_72d42538-e234-46af-bc63-d22167559ac0-latch-0000000031
final String localOurPath = ourPath.get();
// 子节点排序后的集合,主要是通过sorter的fixForSorting截取后十位例如0000000031进行排序比较
List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);
int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;
// 本节点不存在时重新启动设置
if ( ourIndex < 0 )
{
log.error("Can't find our node. Resetting. Index: " + ourIndex);
reset();
}
// 等于0说明自己处在第一位,是leader,设置为true的代码中会唤醒wait等待的线程。
else if ( ourIndex == 0 )
{
setLeadership(true);
}
// 自己不是leader,但是要监听上一位当上一位节点被删除时重新进行选主
else
{
String watchPath = sortedChildren.get(ourIndex - 1);
Watcher watcher = new Watcher()
{
@Override
public void process(WatchedEvent event)
{
if ( (state.get() == State.STARTED) && (event.getType() == Event.EventType.NodeDeleted) && (localOurPath != null) )
{
try
{
getChildren();
}
catch ( Exception ex )
{
ThreadUtils.checkInterrupted(ex);
log.error("An error occurred checking the leadership.", ex);
}
}
}
};
// 回调接口,当client getData操作执行后回调,调用reset重新进行启动过程
BackgroundCallback callback = new BackgroundCallback()
{
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
{
// 如果不存在该节点调用reset重新启动
if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
{
// previous node is gone - reset
reset();
}
}
};
// use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
client.getData().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath, watchPath));
}
}
上述代码是选举leader的关键代码,梳理一下步骤:
- 获取本client在latchPath下建立节点的全路径。
- 对latchPath下的节点进行排序。
- 根据排序后的结果查找本client的临时顺序节点的序号,看是否是leader(排在第一位),如果是leader设置hasLeadership(前面代码提到过,这个条件为true结束await)
- 如果不是leader,在获取前一个节点数据时设置监听节点的删除动作,并且在获取数据结束后回调判断节点是否存在,如果不存在则reset重新启动。
好了到这里LeaderLatch的选举原理差不多就到这里了。接下来继续看这块代码:
public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {
try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {
// 开始leader选举
latch.start();
// 节点选举成功后会立即返回
latch.await();
// 被选举为leader之后调用回调函数,设置leader信息
callback.execute();
} catch (Exception ex) {
handleException(ex);
log.error("选举leader发生异常");
}
}
成为leader之后会立即从await中返回,然后调用回调函数,在函数中会先检查是否存在leader节点,没有的话就将本机的设置为leader信息。
class LeaderElectionExecutionCallback implements LeaderExecutionCallback {
@Override
public void execute() {
// 如果还没有设置leader信息则设置,如果有leader信息说明leader成功了
if (!hasLeader()) {
jobNodeStorage.fillEphemeralJobNode(LeaderNode.INSTANCE, JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
}
}
}
这里是在启动的时候进行leader选举。除此之外当节点数据发生变化时,具体数据监听器在ElectionListenerManager类:一是不存在leader并且有server节点状态更改为enable,二是leader节点被移除,并且当前节点正在运行中,处于可用状态。符合这两种状况的任何一种都会重新进行leader选举。
所以leader选举大概三个地方,一是启动的时候,二三就是上面的两种情况了。