概述
在分布式系统中,一个应用通常需要部署多个节点,这些节点之间可能存在这一主多从的运行模式。也就是说从这些节点中需要选择出一个主节点,其他为从节点,主节点可以做些特殊的事情。当主节点宕机后,选择一个从节点成为主节点。为了实现这样的功能,我们可以利用Zookeeper的特性来实现,本文使用Apache Curator框架提供的两种选举策略来实现。
LeaderLatch策略
实现思路
该策略通过Zookeeper的临时有序节点和监听器特性实现,大致实现思路如下:
- 不同节点在选举主题下创建临时有序节点
- 排在第一个的节点成为leader
- 后续节点设置对前一个节点的监听
- 如果Leader节点挂了,后续节点依照顺序成为Leader节点
关键API
- org.apache.curator.framework.recipes.leader.LeaderLatch
说明: 用来进行leader选择的关键类
关键方法:
//调用start方法开始抢主 void start() //调用close方法释放leader权限 void close() //await方法阻塞线程,尝试获取leader权限,但不一定成功,超时失败 boolean await(long, java.util.concurrent.TimeUnit) //判断是否拥有leader权限 boolean hasLeadership()
例子演示
public class LeaderLatchTest { // zk地址 private static final String CONNECT_STR = "localhost:2181"; // leader的path private static final String LEADER_PATH = "/app/leader"; public static void main(String[] args) { List<CuratorFramework> clients = Lists.newArrayList(); List<LeaderLatch> leaderLatches = Lists.newArrayList(); try { // 模拟10个节点 for(int i=0; i<10; i++) { // 创建客户端 CuratorFramework client = CuratorFrameworkFactory.newClient(CONNECT_STR, new ExponentialBackoffRetry(1000, 3)); clients.add(client); // 创建leaderLatch LeaderLatch leaderLatch = new LeaderLatch(client, LEADER_PATH, "Client #" + i, LeaderLatch.CloseMode.NOTIFY_LEADER); // 设置选主成功后触发的监听器 leaderLatch.addListener(new LeaderLatchListener() { @Override public void isLeader() { System.out.println("我是leader"); } @Override public void notLeader() { System.out.println("我不是leader"); } }); leaderLatches.add(leaderLatch); // 启动客户端 client.start(); // 启动leaderLatch leaderLatch.start(); } // 等待一段时间,让选举leader成功 Thread.sleep(10000); // 当前的leader节点 LeaderLatch currentLeaderLatch = null; // 遍历leaderLatch,找出主节点 for (LeaderLatch leaderLatch : leaderLatches) { // 判断当前是不是主节点 if (leaderLatch.hasLeadership()) { currentLeaderLatch = leaderLatch; } } System.out.println("当前Leader是 " + currentLeaderLatch.getId()); // 关闭Leader节点 System.out.println("关闭Leader " + currentLeaderLatch.getId()); currentLeaderLatch.close(); // 从列表中移除 leaderLatches.remove(currentLeaderLatch); // 等待一段时间,重新选择Leader System.out.println("重新选择Leader中....."); Thread.sleep(5000); for (LeaderLatch leaderLatch : leaderLatches) { // 判断当前是不是主节点 if (leaderLatch.hasLeadership()) { currentLeaderLatch = leaderLatch; } } System.out.println("重现选出的Leader是 " + currentLeaderLatch.getId()); } catch (Exception e) { e.printStackTrace(); } finally { for (LeaderLatch leaderLatch : leaderLatches) { CloseableUtils.closeQuietly(leaderLatch); } for (CuratorFramework client : clients) { CloseableUtils.closeQuietly(client); } } } }
输出结果:
我是leader 当前Leader是 Client #2 关闭Leader Client #2 我不是leader 重新选择Leader中..... 我是leader 重现选出的Leader是 Client #9 我不是leader
正如预期,可以看到在Leader节点close后,会重新选择Leader。
LeaderSelector策略
与LeaderLatch不同, 通过LeaderSelector可以对领导权进行控制, 在适当的时候释放领导权,这样每个节点都有可能获得领导权。 而LeaderLatch一根筋到死, 除非调用close方法,否则它不会释放领导权。
实现思路
利用Curator中InterProcessMutex分布式锁进行抢主,抢到锁的即为Leader, 执行对应的业务逻辑。
关键API
- org.apache.curator.framework.recipes.leader.LeaderSelector
说明: 用于选择Leader的核心类
关键方法:
//开始抢leader void start() //在抢到leader权限并释放后,自动加入抢主队列,重新抢主 void autoRequeue()
- org.apache.curator.framework.recipes.leader.LeaderSelectorListener
说明: LeaderSelectorListener是LeaderSelector客户端节点成为Leader后回调的一个监听器,在takeLeadership()回调方法中编写获得Leader权利后的业务处理逻辑。
关键方法:
//抢主成功后的回调 void takeLeadership()
例子演示
public class LeaderSelectorTest { // zk地址 private static final String CONNECT_STR = "localhost:2181"; // leader的path private static final String LEADER_PATH = "/app/leaderSelector"; public static void main(String[] args) { List<CuratorFramework> clients = Lists.newArrayListWithCapacity(10); List<LeaderSelectorClient> leaderSelectorClients = Lists.newArrayListWithCapacity(10); try { //启动10个zk客户端,每几秒进行一次leader选举 for (int i = 0; i < 10; i++) { CuratorFramework client = CuratorFrameworkFactory.newClient(CONNECT_STR, new ExponentialBackoffRetry(1000, 3)); client.start(); clients.add(client); LeaderSelectorClient exampleClient = new LeaderSelectorClient(client, LEADER_PATH, "client#" + i); leaderSelectorClients.add(exampleClient); exampleClient.start(); } //sleep 以观察抢主过程 Thread.sleep(Integer.MAX_VALUE); }catch (Exception e) { e.printStackTrace(); } finally { leaderSelectorClients.forEach(leaderSelectorClient -> { CloseableUtils.closeQuietly(leaderSelectorClient); }); clients.forEach(client -> { CloseableUtils.closeQuietly(client); }); } } static class LeaderSelectorClient extends LeaderSelectorListenerAdapter implements Closeable { private final String name; private final LeaderSelector leaderSelector; private final AtomicInteger leaderCount = new AtomicInteger(1); public LeaderSelectorClient(CuratorFramework client, String path, String name) { this.name = name; leaderSelector = new LeaderSelector(client, path, this); // 该方法能让客户端在释放leader权限后 重新加入leader权限的争夺中 leaderSelector.autoRequeue(); } public void start() throws IOException { leaderSelector.start(); } @Override public void close() throws IOException { leaderSelector.close(); } @Override public void takeLeadership(CuratorFramework client) throws Exception { // 抢到leader权限后sleep一段时间,并释放leader权限 final int waitSeconds = (int) (5 * Math.random()) + 1; System.out.println(name + "成为leader....."); System.out.println(name + "成为leader的次数是" + leaderCount.getAndIncrement()); try { Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds)); } catch (InterruptedException e) { System.err.println(name + " was interrupted."); Thread.currentThread().interrupt(); } finally { System.out.println(name + "让出leader权限\n"); } } } }
结果:
client#0成为leader..... client#0成为leader的次数是1 client#0让出leader权限 client#2成为leader..... client#2成为leader的次数是1 client#2让出leader权限
总结
本篇讲了利用zookeeper实现的两种leader选择的模式,我们可以根据不同的引用场景选择不同的方案。