ElasticJob分布式调度服务器包含两个角色分布为主服务器、从服务器。这里的主从服务器并不是传统意义上的主备。
从执行调度任务这一视角来看ElasticJob主从服务器的地位是相同的,都是任务调度执行服务器(彼此之间共同组成一个集群平等的执行分配给各个数据执行调度任务),主从服务器共同构成任务调度的分片节点。
ElasticJob的主服务器的职责是根据当前存活的任务调度服务器生成分片信息,然后各自拉取属于该分片的任务数据并处理之。
为了避免分片信息的不一致ElasticJob必须从所有的调度服务器中选择一台为主服务器,由该台服务器统一计算分片信息,其他服务根据该分片信息各个抽取相应的数据处理,ElasticJob其核心思想是数据的分片。
ElasticJob选主实现由LeaderService实现,从源码分析ElasticJob启动流程(基于Spring)可知,在Job调度服务器的启动流程中会调用ListenerManager#start-AllListeners方法启动ElectionListenerM-anager(主节点选举监听管理器),然后调用LeaderService.electLeader方法执行选主过程。
选主实现LeaderService
- String jobName:任务名称。
- ServiceService serverService:作业服务器服务服务API。
- JobNodeStorage jobNodeStorage:job节点存储实现类,操作ZK api。
接下来分析一下LeaderService#electLeader源码
1/** 2 * 选举主节点. 3 */ 4public void electLeader() { 5 log.debug("Elect a new leader now."); 6 jobNodeStorage.executeInLeader(LeaderNode.LATCH, new LeaderElectionExecutionCallback()); 7 log.debug("Leader election completed."); 8}
选主使用的分布式锁节点目录为{name-space}/{jobname}/leader/election/latc--h,LeaderService$LeaderElectionExecutionCallback为成为主节点后的回调处理逻辑。
接下来重点分析jobNodeStorage#executeInLeader
1 /** 2 * 在主节点执行操作. 3 * 4 * @param latchNode 分布式锁使用的作业节点名称 5 * @param callback 执行操作的回调 6 */ 7 public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) { 8 try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) { 9 latch.start(); // @1 10 latch.await(); // @2 11 callback.execute(); //@3 12 //CHECKSTYLE:OFF 13 } catch (final Exception ex) { 14 //CHECKSTYLE:ON 15 handleException(ex); 16 } 17 } }
技巧:ElasticJob选主基于ZK来实现,选用cautor开源框架提供的org.apac-he.curator.framework.recipes.leader.LeaderLatch来实现。
LeaderLatch需要传入两个参数:
- CuratorFramework client:curator框架客户端。
- latchPath:锁节点路径,elasticJob的latchPath为:{namespace}/{Job
name}/leader/election/latch。
代码@1、@2:启动LeaderLatch,其主要实现原理是去锁路径下创建一个ZK临时排序节点,如果创建的节点序号最小,表示获取锁,await方法将返回,否则在前一个节点上监听其删除事件,并同步阻塞。成功获得分布式锁后将执行callback回调方法。
1#LeaderService$LeaderElectionExecutionCallback 2@RequiredArgsConstructor 3public class LeaderElectionExecutionCallback implements LeaderExecutionCallback { 4 @Override 5 public void execute() { 6 if (!hasLeader()) { 7 jobNodeStorage.fillEphemeralJobNode(LeaderNode.INSTANCE, JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId()); 8 } 9 } 10}
成功获取选主的分布式锁后,如果{na-mespace}/{jobname}/leader/election/in-stance节点不存在则创建临时节点,节点存储的内容为IP地址@-@进程ID,即该节点的意义代表了主服务器的信息。
选主流程如图所示:
上面完成一次选主过程,如果主服务器宕机怎么办?从节点如何接管主服务器的角色呢?
- String jobName:job名称。
- LeaderNode leaderNode:主节点信息,封装主节点在zk中存储节点信息。
- ServerNode serverNode:服务器节点信息。
- LeaderService leaderService:选主服务实现类。
- ServerService serverService:作业服务器服务类。
LeaderNode、ServerNode代表存储在zk服务器上的路径,LeaderNode的类图如图所示:
其中JobNamePath定义了每一个Job在zk服务器的存储组织目录,根据器代码显示,例如数据同步项目(MyProject)下定义了两个定时任务(SyncUserJob、S-yncRoleJob)。
注册中心命名空间取名为项目名:My-Project,在zk的节点存储节点类似如下目录结构,节点存放内容在具体用到时再分析。
ElectionListenerManager#start
1ElectionListenerManager#start 2public void start() { 3 addDataListener(new LeaderElectionJobListener()); 4 addDataListener(new LeaderAbdicationJobListener()); 5}
选主管理器在启动时会添加两个事件监听器。
- LeaderElectionJobListener
主要是当主节点宕机后触发重新选主监听器。 - LeaderAbdicationJobListener
主节点退位监听器。当通过配置方式在线设置主节点状态为disabled时需要删除主节点信息从而再次激活选主事件。
在讲解上述两个事件监听器之前先看一下ElasticJob是如何增加事件监听器的。
1#JobNodeStorage#addDataListener 2/** 3 * 注册数据监听器. 4 * 5 * @param listener 数据监听器 6 */ 7public void addDataListener(final TreeCacheListener listener) { 8 TreeCache cache = (TreeCache) regCenter.getRawCache("/" + jobName); 9 cache.getListenable().addListener(listener); 10}
首先获取TreeCache对象,并通过该对象添加Listener。根据节点路径创建Tre-eCache的方法如下:
1#ZookeeperRegistryCenter#addCacheData 2public void addCacheData(final String cachePath) { 3 TreeCache cache = new TreeCache(client, cachePath); 4 try { 5 cache.start(); 6 //CHECKSTYLE:OFF 7 } catch (final Exception ex) { 8 //CHECKSTYLE:ON 9 RegExceptionHandler.handleException(ex); 10 } 11 caches.put(cachePath + "/", cache); 12}
上述主要是展示如何使用apache cau-tor(ZK客户端)创建TreeCache。
LeaderElectionJobListener
选主事件监听器,监听节点主节点Lea-derNode.INSTANCE,对应路径为{na-mespace}/{jobname}/leader/election/in-stance。
如果由于主节点宕机,将和ZK的连接断开,由于LeaderNode.INSTANCE为临时节点,故此时ZK会将该节点删除,并且从服务器在选主阶段会监听该节点的删除事件,故从服务器能监听到该事件并触发LeaderElectionJobListener的事件方法被调用。
1LeaderElectionJobListener#dataChanged 2protected void dataChanged(final String path, final Type eventType, final String data) { 3 if (!JobRegistry.getInstance().isShutdown(jobName) && (isActiveElection(path, data) || isPassiveElection(path, eventType))) { 4 leaderService.electLeader(); 5 } 6 }
如果该job未停止,并且当前服务器不是主节点或LeaderNode.INST-ANCE节点被删除时,触发一次选主。
isActiveElection方法详解:
1LeaderElectionJobListener #isActiveElection 2private boolean isActiveElection(final String path, final String data) { 3 return !leaderService.hasLeader() && isLocalServerEnabled(path, data); 4}
如果当前节点不是主节点,并且当前服务器运行正常,运行正常的依据是存在 {namespace}/{jobname}/servers/server-ip,并且节点内容不为DISABLED。
isPassiveElection方法详解:
1LeaderElectionJobListener#isPassiveElection 2private boolean isPassiveElection(final String path, final Type eventType) { 3 return isLeaderCrashed(path, eventType) && serverService.isAvailableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp()); 4}
如果当前事件节点为LeaderNode.INSTANCE并且事件类型为删除,并且该job的当前对应的实例({namespace}/{jobname}/instances/ip)存在并且状态不为DISABLED。
LeaderAbdicationJobListener
1class LeaderAbdicationJobListener extends AbstractJobListener { 2 @Override 3 protected void dataChanged(final String path, final Type eventType, final String data) { 4 if (leaderService.isLeader() && isLocalServerDisabled(path, data)) { 5 leaderService.removeLeader(); 6 } 7 } 8 private boolean isLocalServerDisabled(final String path, final String data) { 9 return serverNode.isLocalServerPath(path) && ServerStatus.DISABLED.name().equals(data); 10 } 11 }
如果通过后台管理配置主节点的状态为disable,此时应该将主节点信息移---除(删除LeaderNode.INSTANCE)节点再次触发选主。
本文详细了结束了Elasticjob的选主实现:
1、通过使用分布式锁的概念,第一个获取锁的节点将成为主节点,其做法就是创建LeaderNode.INSTANCE节点并记录节点的信息(ip,进程ID),从节点则监听该目录。
2、主服务器宕机后,临时节点Leader-Node.INSTANCE被删除,由于从节点监听了该节点的删除事件,故会再次触发选主。
3、如果主服务器被设置为disabled,则移除LeaderNode.INSTANCE,再次触发选主。