利用Zookeeper实现分布式应用的Leader选举

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
简介: 利用Zookeeper实现分布式应用的Leader选举

概述


在分布式系统中,一个应用通常需要部署多个节点,这些节点之间可能存在这一主多从的运行模式。也就是说从这些节点中需要选择出一个主节点,其他为从节点,主节点可以做些特殊的事情。当主节点宕机后,选择一个从节点成为主节点。为了实现这样的功能,我们可以利用Zookeeper的特性来实现,本文使用Apache Curator框架提供的两种选举策略来实现。


LeaderLatch策略


实现思路


该策略通过Zookeeper的临时有序节点和监听器特性实现,大致实现思路如下:

1671160492831.jpg

  1. 不同节点在选举主题下创建临时有序节点
  2. 排在第一个的节点成为leader
  3. 后续节点设置对前一个节点的监听
  4. 如果Leader节点挂了,后续节点依照顺序成为Leader节点


关键API


  1. org.apache.curator.framework.recipes.leader.LeaderLatch

说明: 用来进行leader选择的关键类


关键方法:

//调用start方法开始抢主
void start()
//调用close方法释放leader权限
void close()
//await方法阻塞线程,尝试获取leader权限,但不一定成功,超时失败
boolean await(long, java.util.concurrent.TimeUnit)
//判断是否拥有leader权限
boolean hasLeadership()


例子演示


public class LeaderLatchTest {
    // zk地址
    private static final String CONNECT_STR = "localhost:2181";
    // leader的path
    private static final String LEADER_PATH = "/app/leader";
    public static void main(String[] args) {
        List<CuratorFramework> clients = Lists.newArrayList();
        List<LeaderLatch> leaderLatches = Lists.newArrayList();
        try {
            // 模拟10个节点
            for(int i=0; i<10; i++) {
                // 创建客户端
                CuratorFramework client = CuratorFrameworkFactory.newClient(CONNECT_STR, new ExponentialBackoffRetry(1000, 3));
                clients.add(client);
                // 创建leaderLatch
                LeaderLatch leaderLatch = new LeaderLatch(client, LEADER_PATH, "Client #" + i, LeaderLatch.CloseMode.NOTIFY_LEADER);
                // 设置选主成功后触发的监听器
                leaderLatch.addListener(new LeaderLatchListener() {
                    @Override
                    public void isLeader() {
                        System.out.println("我是leader");
                    }
                    @Override
                    public void notLeader() {
                        System.out.println("我不是leader");
                    }
                });
                leaderLatches.add(leaderLatch);
                // 启动客户端
                client.start();
                // 启动leaderLatch
                leaderLatch.start();
            }
            // 等待一段时间,让选举leader成功
            Thread.sleep(10000);
            // 当前的leader节点
            LeaderLatch currentLeaderLatch = null;
            // 遍历leaderLatch,找出主节点
            for (LeaderLatch leaderLatch : leaderLatches) {
                // 判断当前是不是主节点
                if (leaderLatch.hasLeadership()) {
                    currentLeaderLatch = leaderLatch;
                }
            }
            System.out.println("当前Leader是 " + currentLeaderLatch.getId());
            // 关闭Leader节点
            System.out.println("关闭Leader " + currentLeaderLatch.getId());
            currentLeaderLatch.close();
            // 从列表中移除
            leaderLatches.remove(currentLeaderLatch);
            // 等待一段时间,重新选择Leader
            System.out.println("重新选择Leader中.....");
            Thread.sleep(5000);
            for (LeaderLatch leaderLatch : leaderLatches) {
                // 判断当前是不是主节点
                if (leaderLatch.hasLeadership()) {
                    currentLeaderLatch = leaderLatch;
                }
            }
            System.out.println("重现选出的Leader是 " + currentLeaderLatch.getId());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            for (LeaderLatch leaderLatch : leaderLatches) {
                CloseableUtils.closeQuietly(leaderLatch);
            }
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
        }
    }
}

输出结果:

我是leader
当前Leader是 Client #2
关闭Leader Client #2
我不是leader
重新选择Leader中.....
我是leader
重现选出的Leader是 Client #9
我不是leader

正如预期,可以看到在Leader节点close后,会重新选择Leader。


LeaderSelector策略


与LeaderLatch不同, 通过LeaderSelector可以对领导权进行控制, 在适当的时候释放领导权,这样每个节点都有可能获得领导权。 而LeaderLatch一根筋到死, 除非调用close方法,否则它不会释放领导权。


实现思路


利用Curator中InterProcessMutex分布式锁进行抢主,抢到锁的即为Leader, 执行对应的业务逻辑。


关键API


  1. org.apache.curator.framework.recipes.leader.LeaderSelector

说明: 用于选择Leader的核心类

关键方法:

//开始抢leader
void start()
//在抢到leader权限并释放后,自动加入抢主队列,重新抢主
void autoRequeue()
  1. org.apache.curator.framework.recipes.leader.LeaderSelectorListener

说明: LeaderSelectorListener是LeaderSelector客户端节点成为Leader后回调的一个监听器,在takeLeadership()回调方法中编写获得Leader权利后的业务处理逻辑。

关键方法:

//抢主成功后的回调
void takeLeadership()


例子演示


public class LeaderSelectorTest {
    // zk地址
    private static final String CONNECT_STR = "localhost:2181";
    // leader的path
    private static final String LEADER_PATH = "/app/leaderSelector";
    public static void main(String[] args) {
        List<CuratorFramework> clients = Lists.newArrayListWithCapacity(10);
        List<LeaderSelectorClient> leaderSelectorClients = Lists.newArrayListWithCapacity(10);
        try {
            //启动10个zk客户端,每几秒进行一次leader选举
            for (int i = 0; i < 10; i++) {
                CuratorFramework client = CuratorFrameworkFactory.newClient(CONNECT_STR, new ExponentialBackoffRetry(1000, 3));
                client.start();
                clients.add(client);
                LeaderSelectorClient exampleClient = new LeaderSelectorClient(client, LEADER_PATH, "client#" + i);
                leaderSelectorClients.add(exampleClient);
                exampleClient.start();
            }
            //sleep 以观察抢主过程
            Thread.sleep(Integer.MAX_VALUE);
        }catch (Exception e) {
            e.printStackTrace();
        } finally {
            leaderSelectorClients.forEach(leaderSelectorClient -> {
                CloseableUtils.closeQuietly(leaderSelectorClient);
            });
            clients.forEach(client -> {
                CloseableUtils.closeQuietly(client);
            });
        }
    }
    static class LeaderSelectorClient extends LeaderSelectorListenerAdapter implements Closeable {
        private final String name;
        private final LeaderSelector leaderSelector;
        private final AtomicInteger leaderCount = new AtomicInteger(1);
        public LeaderSelectorClient(CuratorFramework client, String path, String name) {
            this.name = name;
            leaderSelector = new LeaderSelector(client, path, this);
            // 该方法能让客户端在释放leader权限后 重新加入leader权限的争夺中
            leaderSelector.autoRequeue();
        }
        public void start() throws IOException {
            leaderSelector.start();
        }
        @Override
        public void close() throws IOException {
            leaderSelector.close();
        }
        @Override
        public void takeLeadership(CuratorFramework client) throws Exception {
            // 抢到leader权限后sleep一段时间,并释放leader权限
            final int waitSeconds = (int) (5 * Math.random()) + 1;
            System.out.println(name + "成为leader.....");
            System.out.println(name + "成为leader的次数是" + leaderCount.getAndIncrement());
            try {
                Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
            } catch (InterruptedException e) {
                System.err.println(name + " was interrupted.");
                Thread.currentThread().interrupt();
            } finally {
                System.out.println(name + "让出leader权限\n");
            }
        }
    }
}

结果:

client#0成为leader.....
client#0成为leader的次数是1
client#0让出leader权限
client#2成为leader.....
client#2成为leader的次数是1
client#2让出leader权限


总结


本篇讲了利用zookeeper实现的两种leader选择的模式,我们可以根据不同的引用场景选择不同的方案。

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
2月前
|
监控 NoSQL Java
分布式锁实现原理问题之ZooKeeper的观察器(Watcher)特点问题如何解决
分布式锁实现原理问题之ZooKeeper的观察器(Watcher)特点问题如何解决
|
24天前
|
存储 运维 应用服务中间件
阿里云分布式存储应用示例
通过阿里云EDAS,您可以轻松部署与管理微服务应用。创建应用时,使用`CreateApplication`接口基于模板生成新应用,并获得包含应用ID在内的成功响应。随后,利用`DeployApplication`接口将应用部署至云端,返回&quot;Success&quot;确认部署成功。当业务调整需下线应用时,调用`ReleaseApplication`接口释放资源。阿里云EDAS简化了应用全生命周期管理,提升了运维效率与可靠性。[相关链接]提供了详细的操作与返回参数说明。
|
26天前
|
机器学习/深度学习 分布式计算 PyTorch
大规模数据集管理:DataLoader在分布式环境中的应用
【8月更文第29天】随着大数据时代的到来,如何高效地处理和利用大规模数据集成为了许多领域面临的关键挑战之一。本文将探讨如何在分布式环境中使用`DataLoader`来优化大规模数据集的管理与加载过程,并通过具体的代码示例展示其实现方法。
31 1
|
1月前
|
运维 安全 Cloud Native
核心系统转型问题之保障云原生分布式转型中的基础设施和应用层面如何解决
核心系统转型问题之保障云原生分布式转型中的基础设施和应用层面如何解决
|
1月前
|
Kubernetes 安全 云计算
分布式应用的终极革命:Distributionless,告别分布式烦恼!
【8月更文挑战第8天】探讨分布式应用的进化形态——Distributionless,一种使开发者聚焦业务逻辑而非系统细节的理念。借助容器化、云计算与自动化工具的进步,分布式应用的开发与管理变得简易。透过示例展现了使用Bazel构建及Kubernetes部署的流程,预示着Distributionless模式下的应用将更加高效、可靠与安全,引领未来分布式应用的发展趋势。
50 7
|
24天前
|
开发者 云计算 数据库
从桌面跃升至云端的华丽转身:深入解析如何运用WinForms与Azure的强大组合,解锁传统应用向现代化分布式系统演变的秘密,实现性能与安全性的双重飞跃——你不可不知的开发新模式
【8月更文挑战第31天】在数字化转型浪潮中,传统桌面应用面临新挑战。本文探讨如何融合Windows Forms(WinForms)与Microsoft Azure,助力应用向云端转型。通过Azure的虚拟机、容器及无服务器计算,可轻松解决性能瓶颈,满足全球用户需求。文中还提供了连接Azure数据库的示例代码,并介绍了集成Azure Storage和Functions的方法。尽管存在安全性、网络延迟及成本等问题,但合理设计架构可有效应对,帮助开发者构建高效可靠的现代应用。
15 0
|
24天前
|
UED 存储 数据管理
深度解析 Uno Platform 离线状态处理技巧:从网络检测到本地存储同步,全方位提升跨平台应用在无网环境下的用户体验与数据管理策略
【8月更文挑战第31天】处理离线状态下的用户体验是现代应用开发的关键。本文通过在线笔记应用案例,介绍如何使用 Uno Platform 优雅地应对离线状态。首先,利用 `NetworkInformation` 类检测网络状态;其次,使用 SQLite 实现离线存储;然后,在网络恢复时同步数据;最后,通过 UI 反馈提升用户体验。
39 0
|
1月前
|
监控 Java API
分布式链路监控系统问题之对Java应用实现字节码增强的方式的问题如何解决
分布式链路监控系统问题之对Java应用实现字节码增强的方式的问题如何解决
|
2月前
|
存储 数据库
zookeeper 集群环境搭建及集群选举及数据同步机制
zookeeper 集群环境搭建及集群选举及数据同步机制
51 2
|
1月前
|
存储 分布式计算 Hadoop
分布式计算框架在大规模数据处理中的应用
【8月更文第18天】随着大数据时代的到来,对海量数据进行有效的存储、处理和分析变得越来越重要。传统的单机系统已经无法满足PB级别数据集的需求。分布式计算框架,如Apache Hadoop和Apache Spark,成为了处理这些大规模数据集的重要工具。
92 0