在实际生产中,特别是分布式系统中,我们经常遇到这样的场景:一个复杂的任务,近需要从分布式机器中选出一台机器来执行。诸如此类的问题,我们统称为“Master选举”。比如,在分布式系统中很常见的一个问题就是定时任务的执行。如果多台机器同时执行相同的定时任务,业务复杂则可能出现灾难性的后果。本篇博客就以定时任务为例来示例说明Curator的Master选举用法。
原理
利用zookeeper来实现Master选举的基本思路如下:
选择一个根节点(与其他业务隔离),比如/jobMaster,多台机器同时在此节点下面创建一个子节点/jobMaster/lock,zookeeper保证了最终只有一台机器能够创建成功,那么这台机器将成为Master。由它来执行业务操作。
Curator所做的事情就是将上面的思路进行了封装,把原生API的节点创建、事件监听和自动选举进行整合封装,提供了一套简单易用的解决方案。
Maven依赖
此选举功能集成在recipes模块中。
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.12.0</version> </dependency>
解决方案
Curator提供了两种选举方案:Leader Latch和Leader Election。下面分别介绍这两种选举方案。
Leader Latch
随机从候选着中选出一台作为leader,选中之后除非调用close()释放leadship,否则其他的后选择无法成为leader。其中spark使用的就是这种方法。
构造方法
public LeaderLatch(CuratorFramework client, String latchPath)
public LeaderLatch(CuratorFramework client, String latchPath, String id)
示例代码
package com.secbro.learn.curator; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.leader.LeaderLatch; import org.apache.curator.framework.recipes.leader.LeaderLatchListener; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.utils.CloseableUtils; import java.util.ArrayList; import java.util.List; /** * Created by zhuzs on 2017/4/17. */ public class LeaderLatchTest { private static final String PATH = "/demo/leader"; public static void main(String[] args) { List<LeaderLatch> latchList = new ArrayList<>(); List<CuratorFramework> clients = new ArrayList<>(); try { for (int i = 0; i < 10; i++) { CuratorFramework client = getClient(); clients.add(client); final LeaderLatch leaderLatch = new LeaderLatch(client, PATH, "client#" + i); leaderLatch.addListener(new LeaderLatchListener() { @Override public void isLeader() { System.out.println(leaderLatch.getId() + ":I am leader. I am doing jobs!"); } @Override public void notLeader() { System.out.println(leaderLatch.getId() + ":I am not leader. I will do nothing!"); } }); latchList.add(leaderLatch); leaderLatch.start(); } Thread.sleep(Integer.MAX_VALUE); } catch (Exception e) { e.printStackTrace(); } finally { for(CuratorFramework client : clients){ CloseableUtils.closeQuietly(client); } for(LeaderLatch leaderLatch : latchList){ CloseableUtils.closeQuietly(leaderLatch); } } } private static CuratorFramework getClient() { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework client = CuratorFrameworkFactory.builder() .connectString("127.0.0.1:2181") .retryPolicy(retryPolicy) .sessionTimeoutMs(6000) .connectionTimeoutMs(3000) .namespace("demo") .build(); client.start(); return client; } }
打印结果:
client#6:I am leader. I am doing jobs!
重复执行几次会发现是不同的client获得leader。
本示例启动了10个client,程序会随机选中其中一个作为leader。通过注册监听的方式来判断自己是否成为leader。调用close()方法释放当前领导权。
LeaderLatch通过增加了一个ConnectionStateListener监听连接问题。如果出现SUSPENDED或者LOST,leader会报告自己不再是leader(直到重新建立连接,否则不会有leader)。如果LOST的连接被重新建立即RECONNECTED,leaderLatch会删除先前的zNode并重新建立zNode。
Leader Election
通过LeaderSelectorListener可以对领导权进行控制, 在适当的时候释放领导权,这样每个节点都有可能获得领导权。 而LeaderLatch则一直持有leadership, 除非调用close方法,否则它不会释放领导权。
构造方法
public LeaderSelector(CuratorFramework client, String leaderPath, LeaderSelectorListener listener)
public LeaderSelector(CuratorFramework client, String leaderPath, ThreadFactory threadFactory, Executor executor, LeaderSelectorListener listener)
示例代码
package com.secbro.learn.curator; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.leader.LeaderLatch; import org.apache.curator.framework.recipes.leader.LeaderLatchListener; import org.apache.curator.framework.recipes.leader.LeaderSelector; import org.apache.curator.framework.recipes.leader.LeaderSelectorListener; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.utils.CloseableUtils; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Executors; /** * Created by zhuzs on 2017/4/17. */ public class LeaderSelectorTest { private static final String PATH = "/demo/leader"; public static void main(String[] args) { List<LeaderSelector> selectors = new ArrayList<>(); List<CuratorFramework> clients = new ArrayList<>(); try { for (int i = 0; i < 10; i++) { CuratorFramework client = getClient(); clients.add(client); final String name = "client#" + i; LeaderSelector leaderSelector = new LeaderSelector(client, PATH, new LeaderSelectorListener() { @Override public void takeLeadership(CuratorFramework client) throws Exception { System.out.println(name + ":I am leader."); Thread.sleep(2000); } @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { } }); leaderSelector.autoRequeue(); leaderSelector.start(); selectors.add(leaderSelector); } Thread.sleep(Integer.MAX_VALUE); } catch (Exception e) { e.printStackTrace(); } finally { for(CuratorFramework client : clients){ CloseableUtils.closeQuietly(client); } for(LeaderSelector selector : selectors){ CloseableUtils.closeQuietly(selector); } } } private static CuratorFramework getClient() { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework client = CuratorFrameworkFactory.builder() .connectString("127.0.0.1:2181") .retryPolicy(retryPolicy) .sessionTimeoutMs(6000) .connectionTimeoutMs(3000) .namespace("demo") .build(); client.start(); return client; } }
执行结果,控制台不停打印:
client#5:I am leader.
client#8:I am leader.
client#3:I am leader.
client#1:I am leader.
client#4:I am leader.
client#0:I am leader.
本示例创建了10个LeaderSelector并对起添加监听,当被选为leader之后,调用takeLeadership方法进行业务逻辑处理,处理完成即释放领导权。其中autoRequeue()方法的调用确保此实例在释放领导权后还可能获得领导权。
LeaderSelectorListener类继承了ConnectionStateListener。一旦LeaderSelector启动,它会向curator客户端添加监听器。 使用LeaderSelector必须时刻注意连接的变化。一旦出现连接问题如SUSPENDED,curator实例必须确保它可能不再是leader,直至它重新收到RECONNECTED。如果LOST出现,curator实例不再是leader并且其takeLeadership()应该直接退出。
推荐的做法是,如果发生SUSPENDED或者LOST连接问题,最好直接抛CancelLeadershipException,此时,leaderSelector实例会尝试中断并且取消正在执行takeLeadership()方法的线程。 建议扩展LeaderSelectorListenerAdapter, LeaderSelectorListenerAdapter中已经提供了推荐的处理方式 。
总结
Curator提供了两种方法来实现Leader选举,根据不同的业务场景可选择不同的方式。总之,Curator帮住开发人员封装了很多繁杂的操作,使得实现一个Leader选举变得轻而易举。
PS
关注本人CSDN博客或博客专栏《Zookeeper从入门到专家》了解更多关于Zookeeper的知识。此专栏持续更新中……