引子
18 年的时候做过一些 6.824,旧文在此,无奈做到 Part 2C,无论如何也跑不过测试,便一直搁置起来。但在后来的日子,却时时念起此门神课,心下伤感。拖到今日,终于可以来还愿了。
这次能跑过所有测试,原因有三:一来,之前做过一次,很多原理还留有印象;二来,这一年多在工作上有了很多分布式系统的实践;三来,golang 的驾驭上也精进了一些。但是在做的过程中,仍然遇到了大量令人纠结的细节,为了方便日后回顾,将这些细节梳理一下,记在此处。若能好巧对其他做此门课的人有些微启发,则又是快事一件了。
6.824 与 Raft
6.824 是一门关于分布式系统的非常棒的公开课,做课程实验的过程中时时惊叹于其构思之精巧、材料准备之翔实。MIT 的大师们能将这样精华的课程开放出来,实乃名校和大师们的气度,更是我们计算机人的幸事。
Raft 是一个面向理解的分布式共识(consensus)协议。分布式共识算法是分布式领域非常非常经典的问题,同时也是分布式系统中非常难的一块,直观的说,就如同流沙上打下分布式系统大楼的地基。不可靠的网络、易故障的主机,造成的状态变化之复杂,实在不是一般人能在脑中模拟得了的。本人愚钝,只能是感性把握加细节堆叠,堪堪有些认识。说回 Raft,有同领域 Paxos 珠玉在前,何以 Raft 仍能脱颖而出?应该是抓住了以下两点:
- 易于理解。Paxos 是出了名的难以理解,因此也就难以飞入寻常百姓家。而 Raft 通过解耦出多个模块,将算法复杂度进行降维,大大降低了一般人的理解难度。此外,Raft 还有很多精巧的设计,以尽可能避免引入复杂度,从而进一步减轻大家的心智负担。
- 易于实现。易于理解客观上会导致利于实现,但不等同于就能据此产出优秀系统。如果理解流于感性,则实现成空中楼阁。Raft 论文的厉害之处就在于既有感性把握又有细节组织,几乎就是一个系统的设计文档,还是详细设计文档。
要想做好该实验,需要涉猎大量的材料,我把实验中提到的和我看到的汇总在文末。当然,还有英文劝退。虽然我最后测试用例都过了,但仍有很多没实现好的点以及不理解之处。
作者:青藤木鸟 https://www.qtmuniao.com, 转载请注明出处
整体结构
该实验(2020 年版本)分为三个部分,分别是 Part 2A:leader 选举、Part 2B:日志同步、lab2C:状态备份。
我在实现的时候没有用过多的 channel,状态都是通过加锁来阻塞式改变的。我注意到网上有一些实现将所有状态变化都用 channel 控制,这样异步实现可能会效率高些,但可读性稍差。我的实现基本遵从论文叙述,在代码组织上可以概括为三个状态和三个 Loop。
三个状态
Follower、Candidate、Leader。并据此定义了三个函数:becomeFollower、becomeCandidate、becomeLeader,分别封装了一些 Raft Peer 内部的状态变化。
func (rf *Raft) becomeCandidate() { rf.role = CANDIDATE rf.currentTerm++ rf.votedFor = rf.me rf.persist() DPrintf("%s change to candidate", rf) } func (rf *Raft) becomeFollower(term int) { rf.role = FOLLOWER rf.currentTerm = term rf.votedFor = -1 rf.persist() rf.electionTimer.Reset(getRandElectTimeout()) DPrintf("%s change to follower with term %d", rf, term) } func (rf *Raft) becomeLeader() { rf.role = LEADER rf.leaderID = rf.me rf.persist() for i := 0; i < rf.PeersNum; i++ { rf.matchIndex[i] = 0 rf.nextIndex[i] = len(rf.log) } rf.pingTimer.Reset(heartbeatInterval) go rf.pingLoop() DPrintf("%s change to leader", rf) }
三个循环
三个 goroutine:electionLoop,pingLoop,applyLoop*。其中,前两个 loop 都由 timer 驱动, *electionLoop 只在 Peer 不为 Leader 时执行选举逻辑,而 pingLoop 是在每次 Peer 当选为 Leader 时启动,并在失去 Leader 身份后及时退出的。也就是说,对于同一个 Peer,这两个 Loop 实现为了互斥的。
electionLoop 是 Follower 超时变为 Candidate 后进行选举的 Loop。它为后台常驻 goroutine,但是检测到自己是 Leader 时会跳过执行循环体(跳过选举),毕竟谁也不会主动发起选举推翻自己。
func (rf *Raft) electionLoop() { for { <-rf.electionTimer.C rf.electionTimer.Reset(getRandElectTimeout()) if rf.role == LEADER { continue } rf.mu.Lock() rf.becomeCandidate() // request vote from each Peer except itself rf.mu.Unlock() } }
pingLoop 是 Candidate 当选为 Leader 后进行心跳的 Loop,并在心跳的来回中完成日志同步。该 Loop 是在 becomeLeader 函数中被启动的 goroutine,一旦检测到自己不为 Leader 后便立即退出,毕竟 Peer 都是高度自觉的,若是人人欺诈,就永远达不成一致了。
func (rf *Raft) pingLoop() { for { rf.mu.Lock() if rf.role != LEADER { rf.mu.Unlock() return } // append entries to each Peer except itself rf.mu.Unlock() <-rf.pingTimer.C rf.pingTimer.Reset(heartbeatInterval) DPrintf("%v start next ping round", rf) } }
applyLoop 最简单,就是将已经 commit 的 log 不断应用到状态机,也是个后台常驻 goroutine。如此设计的妙处在于解耦。即不是每次 commit 后立即 apply,而是由额外的 goroutine 统一执行,以避免多次 commit 同一个 index(由于大多数 Peer 响应后就可以 commit,之后再收到其他 Peer 的响应,就可能造成多次 commit),进而导致多次 apply。
func (rf *Raft) applyLoop() { for { time.Sleep(10 * time.Millisecond) rf.mu.Lock() for rf.lastApplied < rf.commitIndex { rf.lastApplied++ rf.apply( rf.lastApplied,rf.log[rf.lastApplied]) // put to applyChan in the function } rf.mu.Unlock() } }
加锁原则
我是按照实验材料中建议的,用的比较粗暴,基本都是函数粒度的。仅在发生长耗时操作(网络 IO)前会释放锁:即每次 RPC (sendRequestVote 和 sendAppendEntries)前。因此效率不会太高,但是易于实现和理解。同时为了保证这次发送过程是原子的(不被中断),使用了一个 channel 来同步,保证给下个 Peer 发送 RPC 前,前一个 RPC 已经准备完了 Args;当然也可以将准备 Args 的过程,拿到 goroutine 之外。
func (rf *Raft) pingLoop() { for { rf.mu.Lock() if rf.role != LEADER { rf.mu.Unlock() return } prepareFinish := make(chan bool) for PeerID := range rf.Peers { go func(id int, pf chan bool) { // prepare the append entries arguments pf <- true // [send rpc] -> [wait] ->[handle the reply] }(PeerID, prepareFinish) <-prepareFinish } rf.mu.Unlock() <-rf.pingTimer.C rf.pingTimer.Reset(heartbeatInterval) DPrintf("%v start next ping round", rf) } }
日志
我给主要结构体都实现了 String()
函数,以方便返回当前 Peer 的关键状态、 RPC 前后的参数和返回值,从而易于跟踪、调试。以 Raft 结构体为例:
func (rf *Raft) String() string { return fmt.Sprintf("[%s:%d;Term:%d;VotedFor:%d;logLen:%v;Commit:%v;Apply:%v]", rf.role, rf.me, rf.currentTerm, rf.votedFor, len(rf.log), rf.commitIndex, rf.lastApplied) }
领导者选举(Part 2A: leader election)
Raft 中的任何 Peer(或者说 server) 都处在三种状态之一:Follower、Candidate、Leader。其状态转移图如下:
其中所有 Peer 在启动时都是 Follower,经过一个随机超时后先后变为 Candidate。Candidate 是一个为竞选 Leader 而设计的中间过渡状态。所有的任期( term) 始于 Candidate(即变成 Candidate 时 term+1),如果能当选则持续到 Leader 结束。
Raft 中有一条铁律,就是不论出于什么状态,只要发现自己所处 term 落后于人,就立即改变自己 term 变成 Follower。term 即为事实上的逻辑时钟,所有投票行为(Candidate 和 Voter [1])和日志同步(Leader 和 Follower)动作需要所涉及双方在同一个 term 中。
Raft 使用强势 Leader,只能由 Leader 向 Follower 同步日志,而不能反过来。并且 Leader 本身的日志只会由 Client 进行 Append,不会更改或者删除;由于 Leader 权势巨大,必须为选举设置严苛的门槛,即保证当选的 Candidate 的 log 比过半数的 Peer 更 up-to-date 。因此需要在选举阶段逐一比较 Candidate 和其他 Peer 谁更 up-to-date。根据论文中这段描述:
Raft determines which of two logs is more up-to-date by comparing the index and term of the last entries in the logs. If the logs have last entries with different terms, then the log with the later term is more up-to-date. If the logs end with the same term, then whichever log is longer is more up-to-date.
只考虑两组日志的最后一条日志的 index 和 term 即可:
func notLessUpToDate(currTerm, currIndex int, dstTerm, dstIndex int) bool { if currTerm != dstTerm { return currTerm > dstTerm } return currIndex >= dstIndex }
实现的详细描述可以看论文图二,该描述要当做编程语言而非自然语言来对待,因为它十分精确,尤其要特别注意不要遗漏其状态转换时的触发条件。这里不再赘述,仅列出我觉得在实现时,可能有困惑的一些点:
RequestVoteArgs
结构体的字段都要大写,这是测试程序所要求的。- 实现
GetState
时,是否为 Leader 的判断方式可能有两种:rf.role == LEADER
或者rf.me == rf.leaderId
AppendEntries
请求、回应结构体、以及发送 RPC 的实现,需要自己比对RequestVote
依样画葫芦。logEntry
也需要自己定义,我实现的就只包含 Term 和 Command 两个字段。- Peer 每一轮(term)至多投一次票,可以通过在给出投票前判断
rf.votedFor
是否为 null [5] 来保证;但同时有另一层意思,即每一轮至少可以投一次票,这就要求在发生 term 改变时,需要及时改变 votedFor 。分两种情况:一是 Follower/Candidate 超时变为 Candidate 时,term 会增加 1,这时候先无脑投自己(rf.votedFor = rf.me
),然后发起选举;二是在收到其他 Peer 的 RPC 时(包括 Request 和 Reply),发现别人 term 高,变为 Follower 时,也需要及时清空自己之前投票结果(rf.votedFor = -1
)以使本轮次可以继续投票。 - Peer 在实现 AppendEntries 时,只要本 Peer 的 term 不高于发起心跳的 Leader 的 term,都要及时变为 Follower,这包含以下几种情况:a. 如果 Peer 较小,则需 term 跟上,然后变 Follower;b. 本来为 Candidate 且 term 相同,要停止选举变为 Follower;c. 本来就是 Follower 且 term 相同 ,便重置下 electionTimer。
if args.Term < rf.currentTerm { DPrintf("%v reject append entry rpc from %v for (term)", rf, args) return } rf.leaderID = args.LeaderID rf.becomeFollower(args.Term) // become follower with term
- 关于
electionTimer
的重置:每次变为 Follower 时,可以重置下 timer。此外,对于 Follower/Candidate ,还有两种情况需要重置:一是收到 AppendEntries RPC 时;一是投票给某个 Candidate 时。这个推断从图二中 Followers 中这句话来:
If election timeout elapses without receiving AppendEntries RPC from current leader or granting vote to candidate: convert to candidate
- 每次在收到 RPC 的 reply 时,(Candidate 和 Leader)都要检查一下此时是否和发送 RPC 之前状态一致,如果不一致,需要及时退出 goroutine,不在其位,不谋其政。term + role 可以唯一确定某个 Peer 的状态。
// in pingLoop() if rf.role != LEADER || rf.currentTerm != args.Term { return } // in electionLoop() if rf.role != CANDIDATE || rf.currentTerm != args.Term { return }
在做的时候我还想到了一种极端情况,做了下推演。某个 Peer A 与其他 Peer 产生了网络隔离,于是不断超时 - 选举 - 超时,从而不断更新 term 到一个很大的值 T。某个时刻 A 与其他 Peer 恢复通信,它发起选举,向每个 Peer 要票。于是剩下的所有 Peer 包括 Leader 发现更大 term:T,就会立即变为 Follower。不过由于 up-to-date 的保证,A 也是成不了 Leader 的。所以这样场景的唯一后果就是 A 一下把大家带入了更高的 term,且成就了别人(可能选出一个不为 A 、不为原 Leader 的其他 Peer)。