ES选举:Elasticsearch中Master选举完全解读

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: ES选举:Elasticsearch中Master选举完全解读

引言

Master选举的意义在于集群主节点在遭遇宕机时保障服务的可用性。

理解选举机制及相关算法,有利于了解ES底层的高可用原理,并学习及借鉴其设计思想。

理解Master选举的另一个重要原因是:其为 ES 常见面试题之一!

本文我将结合Elasticsearch源码、文字、绘图的方式剖析Master选举的完整过程。有任何问题,欢迎在社区群与我探讨。


Master选举完整流程:

6e8a18d51c5446e2b9be280005308448.png


1、Master选举中的几个重要角色

  • 主节点(active master):一般指的是集群中活跃的主节点,每个集群中只能有一个。
  • 候选节点(master node):具备master角色的节点默认都有“被选举权”,即是一个候选节点。候选节点可以参与Master选举过程
  • **投票节点(master node):**每个候选节点默认都有投票权,即每个候选节点默认都是一个投票节点,但如果配置了“voting_only ”的候选节点将只有选举权而没有被选举权,即仅投票节点。
  • 专用主节点:即 node.roles: [master],一般指的是只保留master角色的候选节点。
  • 仅投票节点:即 node.roles: [master, voting_only],指仅具备选举权,而被阉割了被选举权的master节点。仅投票节点的意义是在出现平票的时候投出关键票(决胜票)。仅投票节点因为没有被选举权,因此永远不会被选举为active master,即永远不会成为活跃的主节点,因此通常同时配置为数据节点,以提高资源利用率。

ba7f6c576fd34451a6aeb622872c08ce.png


2、选举何时会发生(何时触发选举)

2.1 节点失效检测

在 Elasticsearch 源码的描述文件中有这样一段描述:

There are two fault detection processes running. The first is by the
master, to ping all the other nodes in the cluster and verify that they
are alive. And on the other end, each node pings to master to verify if
its still alive or an election process needs to be initiated


从上述信息得知,在ES中有两个和选举相关的工作进程专门用于检查节点的存活状态,分别为:

  • NodesFaultDetection:即NodesFD,用于定期检查集群中的节点是否存活。
  • MasterFaultDetection:即MasterFD,作用是定期检查Master节点是否存活。


他们分别会通过 ping 的方式定期检测集群中的普通节点和 master 节点是否存活。

bb77c118f2c64fb8a681171a841e1d3a.png


2.2 触发选举的两种情况

  • master-eligible节点数量小于法定票数:当主节点侦测到候选节点数量小于法定票数的时候,会主动放弃主节点身份。
  • 当主节点宕机


3、选主流程

3.1 连接线程实现:innerJoinCluster

org.elasticsearch.discovery.zen.ZenDiscovery

连接线程的主要功能,这个函数保证在失效时加入集群或生成一个新的连接线程.

private void innerJoinCluster() {
    // 定义临时Master节点
        DiscoveryNode masterNode = null;
        final Thread currentThread = Thread.currentThread();
        // 开启选举上下文
        nodeJoinController.startElectionContext();
        // 如果 masterNode 不存在,连接线程控制已启动,并且提供的线程是当前活跃的joinThread
        while (masterNode == null && joinThreadControl.joinThreadActive(currentThread)) {
            // 选出临时Master,因为此时节点状态还没有发布给集群
            masterNode = findMaster();
        }
        // thread不再在currentJoinThread中,停止
        if (!joinThreadControl.joinThreadActive(currentThread)) {
            logger.trace("thread is no longer in currentJoinThread. Stopping.");
            return;
        }
        // 如果当前节点就是 master 节点
        if (transportService.getLocalNode().equals(masterNode)) {
            final int requiredJoins = Math.max(0, electMaster.minimumMasterNodes() - 1); // we count as one
            logger.debug("elected as master, waiting for incoming joins ([{}] needed)", requiredJoins);
            nodeJoinController.waitToBeElectedAsMaster(requiredJoins, masterElectionWaitForJoinsTimeout,
                    new NodeJoinController.ElectionCallback() {
                        @Override
                        public void onElectedAsMaster(ClusterState state) {
                            synchronized (stateMutex) {
                                joinThreadControl.markThreadAsDone(currentThread);
                            }
                        }
                        @Override
                        public void onFailure(Throwable t) {
                            logger.trace("failed while waiting for nodes to join, rejoining", t);
                            synchronized (stateMutex) {
                                joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                            }
                        }
                    }
            );
        } else {
            // 组织任何尝试加入请求,因为当前节点不是master节点
            nodeJoinController.stopElectionContext(masterNode + " elected");
            // 当前节点向Master节点发送加入集群请求
            final boolean success = joinElectedMaster(masterNode);
            synchronized (stateMutex) {
                if (success) {
                    DiscoveryNode currentMasterNode = this.clusterState().getNodes().getMasterNode();
                    if (currentMasterNode == null) {
                        // Post 1.3.0, the master should publish a new cluster state before acking our join request. we now should have
                        // a valid master.
                        logger.debug("no master node is set, despite of join request completing. retrying pings.");
                        joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                    } else if (currentMasterNode.equals(masterNode) == false) {
                        // 更新集群状态信息
                        joinThreadControl.stopRunningThreadAndRejoin("master_switched_while_finalizing_join");
                    }
                    joinThreadControl.markThreadAsDone(currentThread);
                } else {
                    // 如果加入失败,再次尝试加入
                    joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                }
            }
        }
    }


3.2 发现节点:DiscoveryNode

DiscoveryNode为集群发现节点信息,为所有 Discover 模块下节点的基类。包含节点的名称、节点Id、不同集群状态下的版本号、节点角色等信息。

db754ca8139346edabf5418e23bbd6ae.png


3.3 选举临时Master节点:findMaster()

选主的入口方法为:findMaster() 方法。其作用为选出临时Master节点。完整的过程代码如下:

private DiscoveryNode findMaster() {
        // 获取当前集群活动节点列表,不包含本节点,pingTimeout:默认3秒
        List<ZenPing.PingResponse> fullPingResponses = pingAndWait(pingTimeout).toList();
        if (fullPingResponses == null) {
            logger.trace("No full ping responses");
            return null;
        }
        if (logger.isTraceEnabled()) {
            StringBuilder sb = new StringBuilder();
            if (fullPingResponses.size() == 0) {
                sb.append(" {none}");
            } else {
                for (ZenPing.PingResponse pingResponse : fullPingResponses) {
                    sb.append("\n\t--> ").append(pingResponse);
                }
            }
            logger.trace("full ping responses:{}", sb);
        }
        // 获取当前节点
        final DiscoveryNode localNode = transportService.getLocalNode();
        // 本地节点在不在活动节点列表中
        assert fullPingResponses.stream().map(ZenPing.PingResponse::node)
            .filter(n -> n.equals(localNode)).findAny().isPresent() == false;
        // 添加本地节点
        fullPingResponses.add(new ZenPing.PingResponse(localNode, null, this.clusterState()));
        // 过滤无效选票,即不具备候选节点资格的节点的投票
        final List<ZenPing.PingResponse> pingResponses = filterPingResponses(fullPingResponses, masterElectionIgnoreNonMasters, logger);
        // 定义activeMasters,存儲当前活跃的activeMaster列表,如果此列表为空,代表集祥中没有主节点,此时从masterCandidates(候选节点列表)中选Master
        List<DiscoveryNode> activeMasters = new ArrayList<>();
        // 我们不能在pingMasters列表中包合本地节点,否则我们可能会在没有来自ZenDiscover#inner#Joincluster()
        // 中其他节点的任何检查/验证的情况下选择自己
        // 首先過历所有节点,将每个节点所认为的当前Master节点加入activeMaster列表中(不包含本节点),正常情况下应为一个
        for (ZenPing.PingResponse pingResponse : pingResponses) {
            // We can't include the local node in pingMasters list, otherwise we may up electing ourselves without
            // any check / verifications from other nodes in ZenDiscover#innerJoinCluster()
            // pingResponse.master()为当前节点所认为的主节点,即潘丹当前节点所认为的主节点存在,并且不是当前节点本身
            if (pingResponse.master() != null && !localNode.equals(pingResponse.master())) {
                activeMasters.add(pingResponse.master());
            }
        }
        // nodes discovered during pinging
        // ping期间发现的节点,去掉不具备master-eligible资格的节点(候选节点列表)
        List<ElectMasterService.MasterCandidate> masterCandidates = new ArrayList<>();
        for (ZenPing.PingResponse pingResponse : pingResponses) {
            // 如果响应当前发出ping请求的节点是一个候选节点
            if (pingResponse.node().isMasterNode()) {
                masterCandidates.add(new ElectMasterService.MasterCandidate(pingResponse.node(), pingResponse.getClusterStateVersion()));
            }
        }
        // activeMaster不存在
        if (activeMasters.isEmpty()) {
            // 判断候选节点的数量是否满足 candidates.size() >= minimumMasterNodes:候选节点数 大于 法定票数
            if (electMaster.hasEnoughCandidates(masterCandidates)) {
                // 选出 获胜者 选举过程
                final ElectMasterService.MasterCandidate winner = electMaster.electMaster(masterCandidates);
                logger.trace("candidate {} won election", winner);
                return winner.getNode();
            } else {
                // if we don't have enough master nodes, we bail, because there are not enough master to elect from
                // 票数不足 达不到法定最小票数
                logger.warn("not enough master nodes discovered during pinging (found [{}], but needed [{}]), pinging again",
                            masterCandidates, electMaster.minimumMasterNodes());
                return null;
            }
        } else {
            // 如果存在 active master,判断是否当前节点
            assert !activeMasters.contains(localNode) :
                "local node should never be elected as master when other nodes indicate an active master";
            // lets tie break between discovered nodes
            return electMaster.tieBreakActiveMasters(activeMasters);
        }
    }

当主节点不存在的时候,首先判断候选节点的数量是否满足:候选节点数 > 法定票数,即:candidates.size() >= minimumMasterNodes


只有满足条件时,才会发起选举,判断是否满足法定票数的逻辑如下所示:

public boolean hasEnoughCandidates(Collection<MasterCandidate> candidates) {
        if (candidates.isEmpty()) {
            return false;
        }
        if (minimumMasterNodes < 1) {
            return true;
        }
        assert candidates.stream().map(MasterCandidate::getNode).collect(Collectors.toSet()).size() == candidates.size() :
            "duplicates ahead: " + candidates;
        return candidates.size() >= minimumMasterNodes;
    }


选主实现:electMaster

实现:org.elasticsearch.discovery.zen.ElectMasterService#electMaster

public MasterCandidate electMaster(Collection<MasterCandidate> candidates) {
        assert hasEnoughCandidates(candidates);
        List<MasterCandidate> sortedCandidates = new ArrayList<>(candidates);
        sortedCandidates.sort(MasterCandidate::compare);
        return sortedCandidates.get(0);
    }


调用 ElectMasterService.electMaster

选举的是排序后的第一个MasterCandidate(即master-eligible node)。

先按照集群状态排序,最新的状态排在前面,再按照节点id排序,id小的排在前面

集群状态版本越新就排在越前面,当clusterStateVersion越大,优先级越高。这是为了保证新Master拥有最新的clusterState(即集群的meta),避免已经commit的meta变更丢失。

因为Master当选后,就会以这个版本的clusterState为基础进行更新。

// 集群状态版本越新就排在越前面,当clusterStateVersion越大,优先级越高。
        // 这是为了保证新Master拥有最新的clusterState(即集群的meta),避免已经commit的meta变更丢失。
        // 因为Master当选后,就会以这个版本的clusterState为基础进行更新。
        // (一个例外是集群全部重启,所有节点都没有meta,需要先选出一个master,然后master再通过持久化的数据进行meta恢复,再进行meta同步)。
        public static int compare(MasterCandidate c1, MasterCandidate c2) {
            // we explicitly swap c1 and c2 here. the code expects "better" is lower in a sorted
            // list, so if c2 has a higher cluster state version, it needs to come first.
            int ret = Long.compare(c2.clusterStateVersion, c1.clusterStateVersion);
            // 版本相同的时候,按照节点的Id比较(Id为节点第一次启动时随机生成)
            // 当clusterStateVersion相同时,节点的Id越小,优先级越高。即总是倾向于选择Id小的Node,
            // 这个Id是节点第一次启动时生成的一个随机字符串。之所以这么设计,是为了让选举结果尽可能稳定,不要出现都想当master而选不出来的情况。
            if (ret == 0) {
                ret = compareNodes(c1.getNode(), c2.getNode());
            }
            return ret;
        }


此时为临时activeMaster吗,因为此时节点状态还没有发布到集群状态信息里。

未完待续…


声明:

本文为博主原创,文中图片均为自己绘制,保留有Processon原图,任何雷同图片均出自于此,本文无任何搬运,只是在当前平台发布时间较晚。任何质疑,本人都可提供最早发布的证据!


感谢各位博友的支持与厚爱!

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
6月前
|
存储 人工智能 自然语言处理
Elasticsearch Relevance Engine---为AI变革提供高级搜索能力[ES向量搜索、常用配置参数、聚合功能等详解]
Elasticsearch Relevance Engine---为AI变革提供高级搜索能力[ES向量搜索、常用配置参数、聚合功能等详解]
Elasticsearch Relevance Engine---为AI变革提供高级搜索能力[ES向量搜索、常用配置参数、聚合功能等详解]
|
4天前
Elasticsearch【问题记录 02】【不能以root运行es + max virtual memory areas vm.max_map_count [65530] is too low处理】
【4月更文挑战第12天】Elasticsearch【问题记录 02】【不能以root运行es + max virtual memory areas vm.max_map_count [65530] is too low处理】
16 3
|
6月前
|
安全 Java Linux
ElasticSearch第四讲:ES详解:ElasticSearch和Kibana安装
ElasticSearch第四讲:ES详解:ElasticSearch和Kibana安装
203 0
|
2月前
|
数据安全/隐私保护
spring-boot-starter-data-elasticsearch es带x-pack后台配置
spring-boot-starter-data-elasticsearch es带x-pack后台配置
24 0
|
3月前
|
索引
ES(elasticsearch)删除指定索引
ES(elasticsearch)删除指定索引
168 0
|
4月前
|
自然语言处理 Java 关系型数据库
Elasticsearch【环境搭建 01】elasticsearch-6.4.3 单机版不能以root用户运行es 及 max_map_count 问题解决(含 安装包+分词插件 云盘资源)
Elasticsearch【环境搭建 01】elasticsearch-6.4.3 单机版不能以root用户运行es 及 max_map_count 问题解决(含 安装包+分词插件 云盘资源)
31 0
|
4月前
|
Java 关系型数据库 MySQL
springboot集成spring-data-elasticsearch 完成对es的操作
springboot集成spring-data-elasticsearch 完成对es的操作
129 0
|
4月前
|
JavaScript Java 开发工具
ElasticSearch实战 之 es的安装和使用
ElasticSearch实战 之 es的安装和使用
140 0
|
6月前
|
数据安全/隐私保护 Docker 容器
使用docker安装elastic search[ES]和kibana
使用docker安装elastic search[ES]和kibana
131 4
|
6月前
|
Linux 异构计算 索引
释放搜索潜力:基于ES(ElasticSearch)打造高效的语义搜索系统,让信息尽在掌握
释放搜索潜力:基于ES(ElasticSearch)打造高效的语义搜索系统,让信息尽在掌握

热门文章

最新文章