Zookeeper开源客户端Curator之Master/Leader选举

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: Zookeeper开源客户端Curator之Master/Leader选举

在实际生产中,特别是分布式系统中,我们经常遇到这样的场景:一个复杂的任务,近需要从分布式机器中选出一台机器来执行。诸如此类的问题,我们统称为“Master选举”。比如,在分布式系统中很常见的一个问题就是定时任务的执行。如果多台机器同时执行相同的定时任务,业务复杂则可能出现灾难性的后果。本篇博客就以定时任务为例来示例说明Curator的Master选举用法。


原理

利用zookeeper来实现Master选举的基本思路如下:

选择一个根节点(与其他业务隔离),比如/jobMaster,多台机器同时在此节点下面创建一个子节点/jobMaster/lock,zookeeper保证了最终只有一台机器能够创建成功,那么这台机器将成为Master。由它来执行业务操作。


Curator所做的事情就是将上面的思路进行了封装,把原生API的节点创建、事件监听和自动选举进行整合封装,提供了一套简单易用的解决方案。


Maven依赖

此选举功能集成在recipes模块中。


<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.12.0</version>
</dependency>

解决方案

Curator提供了两种选举方案:Leader Latch和Leader Election。下面分别介绍这两种选举方案。


Leader Latch

随机从候选着中选出一台作为leader,选中之后除非调用close()释放leadship,否则其他的后选择无法成为leader。其中spark使用的就是这种方法。


构造方法

public LeaderLatch(CuratorFramework client, String latchPath)


public LeaderLatch(CuratorFramework client, String latchPath, String id)


示例代码

package com.secbro.learn.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import java.util.ArrayList;
import java.util.List;
/**
 * Created by zhuzs on 2017/4/17.
 */
public class LeaderLatchTest {
    private static final String PATH = "/demo/leader";
    public static void main(String[] args) {
        List<LeaderLatch> latchList = new ArrayList<>();
        List<CuratorFramework> clients = new ArrayList<>();
        try {
            for (int i = 0; i < 10; i++) {
                CuratorFramework client = getClient();
                clients.add(client);
                final LeaderLatch leaderLatch = new LeaderLatch(client, PATH, "client#" + i);
                leaderLatch.addListener(new LeaderLatchListener() {
                    @Override
                    public void isLeader() {
                        System.out.println(leaderLatch.getId() +  ":I am leader. I am doing jobs!");
                    }
                    @Override
                    public void notLeader() {
                        System.out.println(leaderLatch.getId() +  ":I am not leader. I will do nothing!");
                    }
                });
                latchList.add(leaderLatch);
                leaderLatch.start();
            }
            Thread.sleep(Integer.MAX_VALUE);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            for(CuratorFramework client : clients){
                CloseableUtils.closeQuietly(client);
            }
            for(LeaderLatch leaderLatch : latchList){
                CloseableUtils.closeQuietly(leaderLatch);
            }
        }
    }
    private static CuratorFramework getClient() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181")
                .retryPolicy(retryPolicy)
                .sessionTimeoutMs(6000)
                .connectionTimeoutMs(3000)
                .namespace("demo")
                .build();
        client.start();
        return client;
    }
}

打印结果:


client#6:I am leader. I am doing jobs!


重复执行几次会发现是不同的client获得leader。


本示例启动了10个client,程序会随机选中其中一个作为leader。通过注册监听的方式来判断自己是否成为leader。调用close()方法释放当前领导权。


LeaderLatch通过增加了一个ConnectionStateListener监听连接问题。如果出现SUSPENDED或者LOST,leader会报告自己不再是leader(直到重新建立连接,否则不会有leader)。如果LOST的连接被重新建立即RECONNECTED,leaderLatch会删除先前的zNode并重新建立zNode。


Leader Election

通过LeaderSelectorListener可以对领导权进行控制, 在适当的时候释放领导权,这样每个节点都有可能获得领导权。 而LeaderLatch则一直持有leadership, 除非调用close方法,否则它不会释放领导权。


构造方法

public LeaderSelector(CuratorFramework client, String leaderPath, LeaderSelectorListener listener)


public LeaderSelector(CuratorFramework client, String leaderPath, ThreadFactory threadFactory, Executor executor, LeaderSelectorListener listener)


示例代码

package com.secbro.learn.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
/**
 * Created by zhuzs on 2017/4/17.
 */
public class LeaderSelectorTest {
    private static final String PATH = "/demo/leader";
    public static void main(String[] args) {
        List<LeaderSelector> selectors = new ArrayList<>();
        List<CuratorFramework> clients = new ArrayList<>();
        try {
            for (int i = 0; i < 10; i++) {
                CuratorFramework client = getClient();
                clients.add(client);
                final String name = "client#" + i;
                LeaderSelector leaderSelector = new LeaderSelector(client, PATH, new LeaderSelectorListener() {
                    @Override
                    public void takeLeadership(CuratorFramework client) throws Exception {
                        System.out.println(name + ":I am leader.");
                        Thread.sleep(2000);
                    }
                    @Override
                    public void stateChanged(CuratorFramework client, ConnectionState newState) {
                    }
                });
                leaderSelector.autoRequeue();
                leaderSelector.start();
                selectors.add(leaderSelector);
            }
            Thread.sleep(Integer.MAX_VALUE);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            for(CuratorFramework client : clients){
                CloseableUtils.closeQuietly(client);
            }
            for(LeaderSelector selector : selectors){
                CloseableUtils.closeQuietly(selector);
            }
        }
    }
    private static CuratorFramework getClient() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181")
                .retryPolicy(retryPolicy)
                .sessionTimeoutMs(6000)
                .connectionTimeoutMs(3000)
                .namespace("demo")
                .build();
        client.start();
        return client;
    }
}

执行结果,控制台不停打印:


client#5:I am leader.

client#8:I am leader.

client#3:I am leader.

client#1:I am leader.

client#4:I am leader.

client#0:I am leader.


本示例创建了10个LeaderSelector并对起添加监听,当被选为leader之后,调用takeLeadership方法进行业务逻辑处理,处理完成即释放领导权。其中autoRequeue()方法的调用确保此实例在释放领导权后还可能获得领导权。


LeaderSelectorListener类继承了ConnectionStateListener。一旦LeaderSelector启动,它会向curator客户端添加监听器。 使用LeaderSelector必须时刻注意连接的变化。一旦出现连接问题如SUSPENDED,curator实例必须确保它可能不再是leader,直至它重新收到RECONNECTED。如果LOST出现,curator实例不再是leader并且其takeLeadership()应该直接退出。


推荐的做法是,如果发生SUSPENDED或者LOST连接问题,最好直接抛CancelLeadershipException,此时,leaderSelector实例会尝试中断并且取消正在执行takeLeadership()方法的线程。 建议扩展LeaderSelectorListenerAdapter, LeaderSelectorListenerAdapter中已经提供了推荐的处理方式 。


总结

Curator提供了两种方法来实现Leader选举,根据不同的业务场景可选择不同的方式。总之,Curator帮住开发人员封装了很多繁杂的操作,使得实现一个Leader选举变得轻而易举。


PS

关注本人CSDN博客或博客专栏《Zookeeper从入门到专家》了解更多关于Zookeeper的知识。此专栏持续更新中……


相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
4月前
|
存储 API Apache
【zookeeper 第三篇章】客户端 API
本文介绍了Apache ZooKeeper客户端的一些常用命令及其用法。首先,`create`命令用于创建不同类型的节点并为其赋值,如持久化节点、有序节点及临时节点等。通过示例展示了如何创建这些节点,并演示了创建过程中的输出结果。其次,`ls`命令用于列出指定路径下的所有子节点。接着,`set`命令用于更新节点中的数据,可以指定版本号实现乐观锁机制。
36 0
|
2月前
|
分布式计算 负载均衡 算法
Hadoop-31 ZooKeeper 内部原理 简述Leader选举 ZAB协议 一致性
Hadoop-31 ZooKeeper 内部原理 简述Leader选举 ZAB协议 一致性
31 1
|
2月前
|
分布式计算 Java Hadoop
Hadoop-30 ZooKeeper集群 JavaAPI 客户端 POM Java操作ZK 监听节点 监听数据变化 创建节点 删除节点
Hadoop-30 ZooKeeper集群 JavaAPI 客户端 POM Java操作ZK 监听节点 监听数据变化 创建节点 删除节点
67 1
|
2月前
|
分布式计算 Hadoop
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
50 1
|
2月前
|
存储 SQL 消息中间件
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
51 0
|
3月前
|
存储 负载均衡 算法
分布式-Zookeeper-Master选举
分布式-Zookeeper-Master选举
|
3月前
|
负载均衡 API 数据安全/隐私保护
Zookeeper的客户端-原生的API
Zookeeper的客户端-原生的API
|
3月前
|
安全 应用服务中间件 API
微服务分布式系统架构之zookeeper与dubbo-2
微服务分布式系统架构之zookeeper与dubbo-2
|
3月前
|
负载均衡 Java 应用服务中间件
微服务分布式系统架构之zookeeper与dubbor-1
微服务分布式系统架构之zookeeper与dubbor-1
|
3月前
|
存储 负载均衡 Dubbo
分布式-Zookeeper(一)
分布式-Zookeeper(一)