上篇文章,我给你介绍了 Raft 协议的基本流程,以及哨兵实例工作的基本过程。哨兵是通过 serverCron 函数的周期性执行,进而在 serverCron 中调用 sentinelTimer 函数,实现周期性处理哨兵相关的时间事件。而 sentinelTimer 函数处理的时间事件,就包括了对哨兵监听的每个主节点,它会通过调用 sentinelHandleRedisInstance 函数,来检查主节点的在线状态,并在主节点客观下线时进行故障切换。
另外,我还带你了解了 sentinelHandleRedisInstance 函数执行过程的前三步操作,分别是重连断连的实例、周期性给实例发送检测命令,检测实例是否主观下线,这也分别对应了 sentinelReconnectInstance、sentinelSendPeriodicCommands 和 sentinelCheckSubjectivelyDown 这三个函数,你可以再回顾下。
那么,今天这篇文章,我接着来给你介绍 sentinelHandleRedisInstance 函数执行过程中的剩余操作,分别是检测主节点是否客观下线、判断是否需要执行故障切换,以及需要故障切换时的哨兵 Leader 选举的具体过程。
学完这节课的内容,你就可以对哨兵工作的过程有个全面了解了。并且,你可以掌握如何在代码层面实现 Raft 协议来完成 Leader 选举。这样,当你日后在分布式系统中实现分布式共识时,这部分内容就能帮助指导你的代码设计与实现了。
接下来,我们先来看下主节点的客观下线判断。
主节点客观下线判断
现在我们知道,哨兵在 sentinelHandleRedisInstance 函数中会调用 sentinelCheckObjectivelyDown 函数(在 sentinel.c 文件中),来检测主节点是否客观下线。
而 sentinelCheckObjectivelyDown 函数在执行时,除了会检查当前哨兵对主节点主观下线的判断结果,还需要结合监听相同主节点的其他哨兵,对主节点主观下线的判断结果。它把这些判断结果综合起来,才能做出主节点客观下线的最终判断。
从代码实现层面来看,在哨兵用来记录主节点信息的 sentinelRedisInstance 结构体中,本身已经用哈希表保存了监听同一主节点的其他哨兵实例,如下所示:
typedef struct sentinelRedisInstance { … dict *sentinels; … }
这样一来,sentinelCheckObjectivelyDown 函数通过遍历主节点记录的 sentinels 哈希表,就可以获取其他哨兵实例对同一主节点主观下线的判断结果。这也是因为,sentinels 哈希表中保存的哨兵实例,它们同样使用了 sentinelRedisInstance 这个结构体,而这个结构体的成员变量 flags,会记录哨兵对主节点主观下线的判断结果。
具体来说,sentinelCheckObjectivelyDown 函数会使用 quorum 变量,来记录判断主节点为主观下线的哨兵数量。如果当前哨兵已经判断主节点为主观下线,那么它会先把 quorum 值置为 1。然后,它会依次判断其他哨兵的 flags 变量,检查是否设置了 SRI_MASTER_DOWN 的标记。如果设置了,它就会把 quorum 值加 1。
当遍历完 sentinels 哈希表后,sentinelCheckObjectivelyDown 函数会判断 quorum 值是否大于等于预设定的 quorum 阈值,这个阈值保存在了主节点的数据结构中,也就是 master->quorum,而这个阈值是在 sentinel.conf 配置文件中设置的。
如果实际的 quorum 值大于等于预设的 quorum 阈值,sentinelCheckObjectivelyDown 函数就判断主节点为客观下线,并设置变量 odown 为 1,而这个变量就是用来表示当前哨兵对主节点客观下线的判断结果的。
这部分的判断逻辑如下代码所示,你可以看下:
/* Is this instance down according to the configured quorum? * * Note that ODOWN is a weak quorum, it only means that enough Sentinels * reported in a given time range that the instance was not reachable. * However messages can be delayed so there are no strong guarantees about * N instances agreeing at the same time about the down state. */ // 此实例是否根据配置的quorum而关闭? // 请注意,ODOWN 是一个弱quorum,它仅表示在给定时间范围内报告了足够多的 Sentinel 报告该实例无法访问。 // 但是消息可能会延迟,因此无法保证 N 个实例同时同意关闭状态 void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) { dictIterator *di; dictEntry *de; unsigned int quorum = 0, odown = 0; // 当前主节点已经被当前哨兵判断为主观下线 if (master->flags & SRI_S_DOWN) { /* Is down for enough sentinels? */ // 是否有足够多的哨兵 // 当前哨兵将quorum值置为1 quorum = 1; /* the current sentinel. */ /* Count all the other sentinels. */ di = dictGetIterator(master->sentinels); // 遍历监听同一主节点的其他哨兵 while((de = dictNext(di)) != NULL) { sentinelRedisInstance *ri = dictGetVal(de); if (ri->flags & SRI_MASTER_DOWN) quorum++; } dictReleaseIterator(di); // 如果quorum值大于预设的quorum阈值,那么设置odown为1。 if (quorum >= master->quorum) odown = 1; } /* Set the flag accordingly to the outcome. */ // 根据结果设置flag if (odown) { // 如果没有设置SRI_O_DOWN标记 if ((master->flags & SRI_O_DOWN) == 0) { // 发送+odown事件消息 sentinelEvent(LL_WARNING,"+odown",master,"%@ #quorum %d/%d", quorum, master->quorum); // 在主节点的flags中记录SRI_O_DOWN标记 master->flags |= SRI_O_DOWN; // 记录判断客观下线的时间 master->o_down_since_time = mstime(); } } else { if (master->flags & SRI_O_DOWN) { sentinelEvent(LL_WARNING,"-odown",master,"%@"); master->flags &= ~SRI_O_DOWN; } } }
另外,这里我也画了一张图,展示了该判断逻辑,你可以再来回顾下。
也就是说,sentinelCheckObjectivelyDown 函数是通过遍历监听同一主节点的其他哨兵的 flags 变量,来判断主节点是否客观下线的。
不过,你看完刚才的代码可能会有一个疑问,在上节课学习的 sentinelCheckSubjectivelyDown 函数中,如果哨兵判断主节点为主观下线,是会在主节点的 flags 变量中设置 SRI_S_DOWN 标记,如下所示:
//哨兵已判断主节点为主观下线 … //对应主节点的sentinelRedisInstance结构中flags没有记录主观下线 if ((ri->flags & SRI_S_DOWN) == 0) { … ri->flags |= SRI_S_DOWN; //在主节点的flags中记录主观下线的标记, }
但是,sentinelCheckObjectivelyDown 函数,是检查监听同一主节点的其他哨兵 flags 变量中的 SRI_MASTER_DOWN 标记,那么其他哨兵的 SRI_MASTER_DOWN 标记是如何设置的呢?
这就和 sentinelAskMasterStateToOtherSentinels 函数(在 sentinel.c 文件中)有关系了,下面,我们来具体了解下这个函数。
sentinelAskMasterStateToOtherSentinels 函数
sentinelAskMasterStateToOtherSentinels 函数的主要目的,是向监听同一主节点的其他哨兵发送 is-master-down-by-addr 命令,进而询问其他哨兵对主节点的状态判断。
它会调用 redisAsyncCommand 函数(在async.c文件中),依次向其他哨兵发送 sentinel is-master-down-by-addr 命令,同时,它设置了收到该命令返回结果的处理函数为 sentinelReceiveIsMasterDownReply(在 sentinel.c 文件中),如下所示:
/* If we think the master is down, we start sending * SENTINEL IS-MASTER-DOWN-BY-ADDR requests to other sentinels * in order to get the replies that allow to reach the quorum * needed to mark the master in ODOWN state and trigger a failover. */ // 如果我们认为 master 宕机了,我们开始向其他 sentinel 发送 SENTINEL IS-MASTER-DOWN-BY-ADDR 请求 // 以获取允许达到将 master 标记为 ODOWN 状态并触发故障转移所需的约定人数的回复 #define SENTINEL_ASK_FORCED (1<<0) void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master, int flags) { dictIterator *di; dictEntry *de; di = dictGetIterator(master->sentinels); // 遍历监听同一主节点的其他哨兵 while((de = dictNext(di)) != NULL) { // 得到一个哨兵实例 sentinelRedisInstance *ri = dictGetVal(de); mstime_t elapsed = mstime() - ri->last_master_down_reply_time; char port[32]; int retval; /* If the master state from other sentinel is too old, we clear it. */ if (elapsed > SENTINEL_ASK_PERIOD*5) { ri->flags &= ~SRI_MASTER_DOWN; sdsfree(ri->leader); ri->leader = NULL; } /* Only ask if master is down to other sentinels if: * * 1) We believe it is down, or there is a failover in progress. * 2) Sentinel is connected. * 3) We did not receive the info within SENTINEL_ASK_PERIOD ms. */ // 仅在以下情况下询问 master 是否已关闭到其他哨兵: // 1)我们认为它已关闭,或者正在进行故障转移。 // 2) Sentinel 已连接。 // 3) 我们没有在 SENTINEL_ASK_PERIOD (1000) 毫秒内收到信息。 if ((master->flags & SRI_S_DOWN) == 0) continue; if (ri->link->disconnected) continue; if (!(flags & SENTINEL_ASK_FORCED) && mstime() - ri->last_master_down_reply_time < SENTINEL_ASK_PERIOD) continue; /* Ask */ ll2string(port,sizeof(port),master->addr->port); // 发送sentinel is-master-down-by-addr命令 retval = redisAsyncCommand(ri->link->cc, sentinelReceiveIsMasterDownReply, ri, "%s is-master-down-by-addr %s %s %llu %s", sentinelInstanceMapCommand(ri,"SENTINEL"), announceSentinelAddr(master->addr), port, sentinel.current_epoch, (master->failover_state > SENTINEL_FAILOVER_STATE_NONE) ? sentinel.myid : "*"); if (retval == C_OK) ri->link->pending_commands++; } dictReleaseIterator(di); }
另外从代码中,我们可以看到,sentinel is-master-down-by-addr 命令中还包括主节点 IP、主节点端口号、当前纪元(sentinel.current_epoch)和实例 ID。下面展示的就是这个命令的格式:
sentinel is-master-down-by-addr 主节点IP 主节点端口 当前epoch 实例ID
这其中,哨兵会根据当前主节点所处的状态来设置实例 ID。如果主节点已经要开始进行故障切换了,那么,实例 ID 会被设置为当前哨兵自身的 ID,否则就会被设置为 * 号
这里你需要注意的是,主节点的数据结构是使用了 master->failover_state 来记录故障切换的状态,其初始值为 SENTINEL_FAILOVER_STATE_NONE(对应的数值为 0),当主节点开始故障切换时,这个状态值就会大于 SENTINEL_FAILOVER_STATE_NONE 了。
好了,在了解了 sentinelAskMasterStateToOtherSentinels 函数的基本执行过程之后,我们还需要知道:sentinelAskMasterStateToOtherSentinels 函数向其他哨兵发出了 sentinel is-master-down-by-addr 命令后,其他哨兵是如何处理的呢?
sentinel is-master-down-by-addr 命令的处理
其实,哨兵对于 sentinel 开头的命令,都是在 sentinelCommand 函数(在 sentinel.c 文件)中进行处理的。sentinelCommand 函数会根据 sentinel 命令后面跟的不同子命令,来执行不同的分支,而 is-master-down-by-addr 就是一条子命令。
在 is-master-down-by-addr 子命令对应的代码分支中,sentinelCommand 函数会根据命令中的主节点 IP 和端口号,来获取主节点对应的 sentinelRedisInstance 结构体。
紧接着,它会判断主节点的 flags 变量中是否有 SRI_S_DOWN 和 SRI_MASTER 标记,也就是说,sentinelCommand 函数会检查当前节点是否的确是主节点,以及哨兵是否已经将该节点标记为主观下线了。如果条件符合,那么它会设置 isdown 变量为 1,而这个变量表示的就是哨兵对主节点主观下线的判断结果。
然后,sentinelCommand 函数会把当前哨兵对主节点主观下线的判断结果,返回给发送 sentinel 命令的哨兵。它返回的结果主要包含三部分内容,分别是当前哨兵对主节点主观下线的判断结果、哨兵 Leader 的 ID,以及哨兵 Leader 所属的纪元。
sentinelCommand 函数,对 sentinel 命令处理的基本过程如下所示:
void sentinelCommand(client *c) { … // is-master-down-by-addr子命令对应的分支 else if (!strcasecmp(c->argv[1]->ptr,"is-master-down-by-addr")) { … //当前哨兵判断主节点为主观下线 if (!sentinel.tilt && ri && (ri->flags & SRI_S_DOWN) && (ri->flags & SRI_MASTER)) isdown = 1; … addReplyMultiBulkLen(c,3); //哨兵返回的sentinel命令处理结果中包含三部分内容 addReply(c, isdown ? shared.cone : shared.czero); //如果哨兵判断主节点为主观下线,第一部分为1,否则为0 addReplyBulkCString(c, leader ? leader : "*"); //第二部分是Leader ID或者是* addReplyLongLong(c, (long long)leader_epoch); //第三部分是Leader的纪元 …}
你也可以参考下图:
好了,到这里你就已经知道,哨兵会通过 sentinelAskMasterStateToOtherSentinels 函数,向监听同一节点的其他哨兵发送 sentinel is-master-down-by-addr 命令,来获取其他哨兵对主节点主观下线的判断结果。而其他哨兵是使用 sentinelCommand 函数,来处理 sentinel is-master-down-by-addr 命令,并在命令处理的返回结果中,包含自己对主节点主观下线的判断结果。
不过从刚才的代码中,你也可以看到,在其他哨兵返回的 sentinel 命令处理结果中,会包含哨兵 Leader 的信息。其实,这是因为 sentinelAskMasterStateToOtherSentinels 函数发送的 sentinel is-master-down-by-addr 命令本身,也可以用来触发哨兵 Leader 选举。这个我稍后会给你介绍。
那么,我们再回到前面讲主节点客观下线判断时提出的问题,sentinelCheckObjectivelyDown 函数要检查监听同一主节点的其他哨兵 flags 变量中的 SRI_MASTER_DOWN 标记,但是,其他哨兵的 SRI_MASTER_DOWN 标记是如何设置的呢?
这实际上是和哨兵在 sentinelAskMasterStateToOtherSentinels 函数中,向其他哨兵发送 sentinel is-master-down-by-addr 命令时,设置的命令结果处理函数 sentinelReceiveIsMasterDownReply 有关。
sentinelReceiveIsMasterDownReply 函数
在 sentinelReceiveIsMasterDownReply 函数中,它会判断其他哨兵返回的回复结果。回复结果会包含我刚才介绍的三部分内容,分别是当前哨兵对主节点主观下线的判断结果、哨兵 Leader 的 ID,以及哨兵 Leader 所属的纪元。这个函数会进一步检查,其中第一部分内容“当前哨兵对主节点主观下线的判断结果”是否为 1。
如果是的话,这就表明对应的哨兵已经判断主节点为主观下线了,那么当前哨兵就会把自己记录的对应哨兵的 flags,设置为 SRI_MASTER_DOWN。
下面的代码就展示了 sentinelReceiveIsMasterDownReply 函数判断其他哨兵回复结果的执行逻辑,你可以看下。
/* Receive the SENTINEL is-master-down-by-addr reply, see the * sentinelAskMasterStateToOtherSentinels() function for more information. */ // 接收 SENTINEL is-master-down-by-addr 回复,更多信息请参见 sentinelAskMasterStateToOtherSentinels() 函数 void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) { sentinelRedisInstance *ri = privdata; instanceLink *link = c->data; redisReply *r; if (!reply || !link) return; link->pending_commands--; r = reply; /* Ignore every error or unexpected reply. * Note that if the command returns an error for any reason we'll * end clearing the SRI_MASTER_DOWN flag for timeout anyway. */ // 忽略每个错误或意外回复。请注意,如果命令由于任何原因返回错误,我们将结束清除 SRI_MASTER_DOWN 标志以表示超时。 // r是当前哨兵收到的其他哨兵的命令处理结果 // 如果返回结果包含三部分内容,并且第一,二,三部分内容的类型分别是整数、字符串和整数 if (r->type == REDIS_REPLY_ARRAY && r->elements == 3 && r->element[0]->type == REDIS_REPLY_INTEGER && r->element[1]->type == REDIS_REPLY_STRING && r->element[2]->type == REDIS_REPLY_INTEGER) { ri->last_master_down_reply_time = mstime(); // 如果返回结果第一部分的值为1,则在对应哨兵的flags中设置SRI_MASTER_DOWN标记 if (r->element[0]->integer == 1) { ri->flags |= SRI_MASTER_DOWN; } else { ri->flags &= ~SRI_MASTER_DOWN; } if (strcmp(r->element[1]->str,"*")) { /* If the runid in the reply is not "*" the Sentinel actually replied with a vote. */ // 如果回复中的 runid 不是“*”,则 Sentinel 实际上回复了投票。 sdsfree(ri->leader); if ((long long)ri->leader_epoch != r->element[2]->integer) serverLog(LL_WARNING, "%s voted for %s %llu", ri->name, r->element[1]->str, (unsigned long long) r->element[2]->integer); ri->leader = sdsnew(r->element[1]->str); ri->leader_epoch = r->element[2]->integer; } } }
所以到这里,你就可以知道,一个哨兵调用 sentinelCheckObjectivelyDown 函数,是直接检查其他哨兵的 flags 是否有 SRI_MASTER_DOWN 标记,而哨兵又是通过 sentinelAskMasterStateToOtherSentinels 函数,向其他哨兵发送 sentinel is-master-down-by-addr 命令,从而询问其他哨兵对主节点主观下线的判断结果的,并且会根据命令回复结果,在结果处理函数 sentinelReceiveIsMasterDownReply 中,设置其他哨兵的 flags 为 SRI_MASTER_DOWN。下图也展示了这个执行逻辑,你可以再来整体回顾下。