ES选举:Elasticsearch中Master选举完全解读

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: ES选举:Elasticsearch中Master选举完全解读

引言

Master选举的意义在于集群主节点在遭遇宕机时保障服务的可用性。

理解选举机制及相关算法,有利于了解ES底层的高可用原理,并学习及借鉴其设计思想。

理解Master选举的另一个重要原因是:其为 ES 常见面试题之一!

本文我将结合Elasticsearch源码、文字、绘图的方式剖析Master选举的完整过程。有任何问题,欢迎在社区群与我探讨。


Master选举完整流程:

6e8a18d51c5446e2b9be280005308448.png


1、Master选举中的几个重要角色

  • 主节点(active master):一般指的是集群中活跃的主节点,每个集群中只能有一个。
  • 候选节点(master node):具备master角色的节点默认都有“被选举权”,即是一个候选节点。候选节点可以参与Master选举过程
  • **投票节点(master node):**每个候选节点默认都有投票权,即每个候选节点默认都是一个投票节点,但如果配置了“voting_only ”的候选节点将只有选举权而没有被选举权,即仅投票节点。
  • 专用主节点:即 node.roles: [master],一般指的是只保留master角色的候选节点。
  • 仅投票节点:即 node.roles: [master, voting_only],指仅具备选举权,而被阉割了被选举权的master节点。仅投票节点的意义是在出现平票的时候投出关键票(决胜票)。仅投票节点因为没有被选举权,因此永远不会被选举为active master,即永远不会成为活跃的主节点,因此通常同时配置为数据节点,以提高资源利用率。

ba7f6c576fd34451a6aeb622872c08ce.png


2、选举何时会发生(何时触发选举)

2.1 节点失效检测

在 Elasticsearch 源码的描述文件中有这样一段描述:

There are two fault detection processes running. The first is by the
master, to ping all the other nodes in the cluster and verify that they
are alive. And on the other end, each node pings to master to verify if
its still alive or an election process needs to be initiated


从上述信息得知,在ES中有两个和选举相关的工作进程专门用于检查节点的存活状态,分别为:

  • NodesFaultDetection:即NodesFD,用于定期检查集群中的节点是否存活。
  • MasterFaultDetection:即MasterFD,作用是定期检查Master节点是否存活。


他们分别会通过 ping 的方式定期检测集群中的普通节点和 master 节点是否存活。

bb77c118f2c64fb8a681171a841e1d3a.png


2.2 触发选举的两种情况

  • master-eligible节点数量小于法定票数:当主节点侦测到候选节点数量小于法定票数的时候,会主动放弃主节点身份。
  • 当主节点宕机


3、选主流程

3.1 连接线程实现:innerJoinCluster

org.elasticsearch.discovery.zen.ZenDiscovery

连接线程的主要功能,这个函数保证在失效时加入集群或生成一个新的连接线程.

private void innerJoinCluster() {
    // 定义临时Master节点
        DiscoveryNode masterNode = null;
        final Thread currentThread = Thread.currentThread();
        // 开启选举上下文
        nodeJoinController.startElectionContext();
        // 如果 masterNode 不存在,连接线程控制已启动,并且提供的线程是当前活跃的joinThread
        while (masterNode == null && joinThreadControl.joinThreadActive(currentThread)) {
            // 选出临时Master,因为此时节点状态还没有发布给集群
            masterNode = findMaster();
        }
        // thread不再在currentJoinThread中,停止
        if (!joinThreadControl.joinThreadActive(currentThread)) {
            logger.trace("thread is no longer in currentJoinThread. Stopping.");
            return;
        }
        // 如果当前节点就是 master 节点
        if (transportService.getLocalNode().equals(masterNode)) {
            final int requiredJoins = Math.max(0, electMaster.minimumMasterNodes() - 1); // we count as one
            logger.debug("elected as master, waiting for incoming joins ([{}] needed)", requiredJoins);
            nodeJoinController.waitToBeElectedAsMaster(requiredJoins, masterElectionWaitForJoinsTimeout,
                    new NodeJoinController.ElectionCallback() {
                        @Override
                        public void onElectedAsMaster(ClusterState state) {
                            synchronized (stateMutex) {
                                joinThreadControl.markThreadAsDone(currentThread);
                            }
                        }
                        @Override
                        public void onFailure(Throwable t) {
                            logger.trace("failed while waiting for nodes to join, rejoining", t);
                            synchronized (stateMutex) {
                                joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                            }
                        }
                    }
            );
        } else {
            // 组织任何尝试加入请求,因为当前节点不是master节点
            nodeJoinController.stopElectionContext(masterNode + " elected");
            // 当前节点向Master节点发送加入集群请求
            final boolean success = joinElectedMaster(masterNode);
            synchronized (stateMutex) {
                if (success) {
                    DiscoveryNode currentMasterNode = this.clusterState().getNodes().getMasterNode();
                    if (currentMasterNode == null) {
                        // Post 1.3.0, the master should publish a new cluster state before acking our join request. we now should have
                        // a valid master.
                        logger.debug("no master node is set, despite of join request completing. retrying pings.");
                        joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                    } else if (currentMasterNode.equals(masterNode) == false) {
                        // 更新集群状态信息
                        joinThreadControl.stopRunningThreadAndRejoin("master_switched_while_finalizing_join");
                    }
                    joinThreadControl.markThreadAsDone(currentThread);
                } else {
                    // 如果加入失败,再次尝试加入
                    joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                }
            }
        }
    }


3.2 发现节点:DiscoveryNode

DiscoveryNode为集群发现节点信息,为所有 Discover 模块下节点的基类。包含节点的名称、节点Id、不同集群状态下的版本号、节点角色等信息。

db754ca8139346edabf5418e23bbd6ae.png


3.3 选举临时Master节点:findMaster()

选主的入口方法为:findMaster() 方法。其作用为选出临时Master节点。完整的过程代码如下:

private DiscoveryNode findMaster() {
        // 获取当前集群活动节点列表,不包含本节点,pingTimeout:默认3秒
        List<ZenPing.PingResponse> fullPingResponses = pingAndWait(pingTimeout).toList();
        if (fullPingResponses == null) {
            logger.trace("No full ping responses");
            return null;
        }
        if (logger.isTraceEnabled()) {
            StringBuilder sb = new StringBuilder();
            if (fullPingResponses.size() == 0) {
                sb.append(" {none}");
            } else {
                for (ZenPing.PingResponse pingResponse : fullPingResponses) {
                    sb.append("\n\t--> ").append(pingResponse);
                }
            }
            logger.trace("full ping responses:{}", sb);
        }
        // 获取当前节点
        final DiscoveryNode localNode = transportService.getLocalNode();
        // 本地节点在不在活动节点列表中
        assert fullPingResponses.stream().map(ZenPing.PingResponse::node)
            .filter(n -> n.equals(localNode)).findAny().isPresent() == false;
        // 添加本地节点
        fullPingResponses.add(new ZenPing.PingResponse(localNode, null, this.clusterState()));
        // 过滤无效选票,即不具备候选节点资格的节点的投票
        final List<ZenPing.PingResponse> pingResponses = filterPingResponses(fullPingResponses, masterElectionIgnoreNonMasters, logger);
        // 定义activeMasters,存儲当前活跃的activeMaster列表,如果此列表为空,代表集祥中没有主节点,此时从masterCandidates(候选节点列表)中选Master
        List<DiscoveryNode> activeMasters = new ArrayList<>();
        // 我们不能在pingMasters列表中包合本地节点,否则我们可能会在没有来自ZenDiscover#inner#Joincluster()
        // 中其他节点的任何检查/验证的情况下选择自己
        // 首先過历所有节点,将每个节点所认为的当前Master节点加入activeMaster列表中(不包含本节点),正常情况下应为一个
        for (ZenPing.PingResponse pingResponse : pingResponses) {
            // We can't include the local node in pingMasters list, otherwise we may up electing ourselves without
            // any check / verifications from other nodes in ZenDiscover#innerJoinCluster()
            // pingResponse.master()为当前节点所认为的主节点,即潘丹当前节点所认为的主节点存在,并且不是当前节点本身
            if (pingResponse.master() != null && !localNode.equals(pingResponse.master())) {
                activeMasters.add(pingResponse.master());
            }
        }
        // nodes discovered during pinging
        // ping期间发现的节点,去掉不具备master-eligible资格的节点(候选节点列表)
        List<ElectMasterService.MasterCandidate> masterCandidates = new ArrayList<>();
        for (ZenPing.PingResponse pingResponse : pingResponses) {
            // 如果响应当前发出ping请求的节点是一个候选节点
            if (pingResponse.node().isMasterNode()) {
                masterCandidates.add(new ElectMasterService.MasterCandidate(pingResponse.node(), pingResponse.getClusterStateVersion()));
            }
        }
        // activeMaster不存在
        if (activeMasters.isEmpty()) {
            // 判断候选节点的数量是否满足 candidates.size() >= minimumMasterNodes:候选节点数 大于 法定票数
            if (electMaster.hasEnoughCandidates(masterCandidates)) {
                // 选出 获胜者 选举过程
                final ElectMasterService.MasterCandidate winner = electMaster.electMaster(masterCandidates);
                logger.trace("candidate {} won election", winner);
                return winner.getNode();
            } else {
                // if we don't have enough master nodes, we bail, because there are not enough master to elect from
                // 票数不足 达不到法定最小票数
                logger.warn("not enough master nodes discovered during pinging (found [{}], but needed [{}]), pinging again",
                            masterCandidates, electMaster.minimumMasterNodes());
                return null;
            }
        } else {
            // 如果存在 active master,判断是否当前节点
            assert !activeMasters.contains(localNode) :
                "local node should never be elected as master when other nodes indicate an active master";
            // lets tie break between discovered nodes
            return electMaster.tieBreakActiveMasters(activeMasters);
        }
    }

当主节点不存在的时候,首先判断候选节点的数量是否满足:候选节点数 > 法定票数,即:candidates.size() >= minimumMasterNodes


只有满足条件时,才会发起选举,判断是否满足法定票数的逻辑如下所示:

public boolean hasEnoughCandidates(Collection<MasterCandidate> candidates) {
        if (candidates.isEmpty()) {
            return false;
        }
        if (minimumMasterNodes < 1) {
            return true;
        }
        assert candidates.stream().map(MasterCandidate::getNode).collect(Collectors.toSet()).size() == candidates.size() :
            "duplicates ahead: " + candidates;
        return candidates.size() >= minimumMasterNodes;
    }


选主实现:electMaster

实现:org.elasticsearch.discovery.zen.ElectMasterService#electMaster

public MasterCandidate electMaster(Collection<MasterCandidate> candidates) {
        assert hasEnoughCandidates(candidates);
        List<MasterCandidate> sortedCandidates = new ArrayList<>(candidates);
        sortedCandidates.sort(MasterCandidate::compare);
        return sortedCandidates.get(0);
    }


调用 ElectMasterService.electMaster

选举的是排序后的第一个MasterCandidate(即master-eligible node)。

先按照集群状态排序,最新的状态排在前面,再按照节点id排序,id小的排在前面

集群状态版本越新就排在越前面,当clusterStateVersion越大,优先级越高。这是为了保证新Master拥有最新的clusterState(即集群的meta),避免已经commit的meta变更丢失。

因为Master当选后,就会以这个版本的clusterState为基础进行更新。

// 集群状态版本越新就排在越前面,当clusterStateVersion越大,优先级越高。
        // 这是为了保证新Master拥有最新的clusterState(即集群的meta),避免已经commit的meta变更丢失。
        // 因为Master当选后,就会以这个版本的clusterState为基础进行更新。
        // (一个例外是集群全部重启,所有节点都没有meta,需要先选出一个master,然后master再通过持久化的数据进行meta恢复,再进行meta同步)。
        public static int compare(MasterCandidate c1, MasterCandidate c2) {
            // we explicitly swap c1 and c2 here. the code expects "better" is lower in a sorted
            // list, so if c2 has a higher cluster state version, it needs to come first.
            int ret = Long.compare(c2.clusterStateVersion, c1.clusterStateVersion);
            // 版本相同的时候,按照节点的Id比较(Id为节点第一次启动时随机生成)
            // 当clusterStateVersion相同时,节点的Id越小,优先级越高。即总是倾向于选择Id小的Node,
            // 这个Id是节点第一次启动时生成的一个随机字符串。之所以这么设计,是为了让选举结果尽可能稳定,不要出现都想当master而选不出来的情况。
            if (ret == 0) {
                ret = compareNodes(c1.getNode(), c2.getNode());
            }
            return ret;
        }


此时为临时activeMaster吗,因为此时节点状态还没有发布到集群状态信息里。

未完待续…


声明:

本文为博主原创,文中图片均为自己绘制,保留有Processon原图,任何雷同图片均出自于此,本文无任何搬运,只是在当前平台发布时间较晚。任何质疑,本人都可提供最早发布的证据!


感谢各位博友的支持与厚爱!

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
2月前
|
数据可视化 Java Windows
Elasticsearch入门-环境安装ES和Kibana以及ES-Head可视化插件和浏览器插件es-client
本文介绍了如何在Windows环境下安装Elasticsearch(ES)、Elasticsearch Head可视化插件和Kibana,以及如何配置ES的跨域问题,确保Kibana能够连接到ES集群,并提供了安装过程中可能遇到的问题及其解决方案。
Elasticsearch入门-环境安装ES和Kibana以及ES-Head可视化插件和浏览器插件es-client
|
4月前
|
存储 自然语言处理 算法
面试题ES问题之Solr和Elasticsearch功能实现如何解决
面试题ES问题之Solr和Elasticsearch功能实现如何解决
52 2
|
20天前
|
存储 JSON Java
elasticsearch学习一:了解 ES,版本之间的对应。安装elasticsearch,kibana,head插件、elasticsearch-ik分词器。
这篇文章是关于Elasticsearch的学习指南,包括了解Elasticsearch、版本对应、安装运行Elasticsearch和Kibana、安装head插件和elasticsearch-ik分词器的步骤。
79 0
elasticsearch学习一:了解 ES,版本之间的对应。安装elasticsearch,kibana,head插件、elasticsearch-ik分词器。
|
28天前
|
自然语言处理 搜索推荐 Java
SpringBoot 搜索引擎 海量数据 Elasticsearch-7 es上手指南 毫秒级查询 包括 版本选型、操作内容、结果截图(一)
SpringBoot 搜索引擎 海量数据 Elasticsearch-7 es上手指南 毫秒级查询 包括 版本选型、操作内容、结果截图
42 0
|
28天前
|
存储 自然语言处理 搜索推荐
SpringBoot 搜索引擎 海量数据 Elasticsearch-7 es上手指南 毫秒级查询 包括 版本选型、操作内容、结果截图(二)
SpringBoot 搜索引擎 海量数据 Elasticsearch-7 es上手指南 毫秒级查询 包括 版本选型、操作内容、结果截图(二)
27 0
|
2月前
|
JSON 自然语言处理 数据库
ElasticSearch基础1——索引和文档。Kibana,RestClient操作索引和文档+黑马旅游ES库导入
概念、ik分词器、倒排索引、索引和文档的增删改查、RestClient对索引和文档的增删改查
ElasticSearch基础1——索引和文档。Kibana,RestClient操作索引和文档+黑马旅游ES库导入
|
3月前
|
存储 负载均衡 算法
|
3月前
|
自然语言处理 Java 索引
ElasticSearch 实现分词全文检索 - Java SpringBoot ES 文档操作
ElasticSearch 实现分词全文检索 - Java SpringBoot ES 文档操作
39 0
|
3月前
|
自然语言处理 Java 索引
ElasticSearch 实现分词全文检索 - Java SpringBoot ES 索引操作
ElasticSearch 实现分词全文检索 - Java SpringBoot ES 索引操作
40 0
|
3月前
|
自然语言处理 Docker 容器
ElasticSearch 实现分词全文检索 - ES、Kibana、IK分词器安装
ElasticSearch 实现分词全文检索 - ES、Kibana、IK分词器安装
45 0