日志同步(Part 2B: log replication)
Leader 接收到 client 请求到应用其包含 command 到状态机大概需要这么几个过程:
- 收到 client 的请求,包含一个 command 参数(
Start(command interface{})
)。 - Leader 把该请求追加到本地日志(
rf.log = append(rf.log, &logEntry{rf.currentTerm, command})
)。 - 通过心跳并行通知所有 Follower 写入该日志(AppendEntries RPC)。
- 待大多数 Follower 成功写入后,提交该日志到状态机。
心跳是定时的,而同步日志则是在定时的心跳的过程中完成的。如果 RPC 参数中不携带日志条目,则就是一个简单心跳;如果 RPC 参数中携带日志条目,则是 Leader 告诉该 Follower,我认为你需要同步这些日志。
那么 Leader 是如何得知每个 Follower 需要同步哪些日志呢?
试探。
通过试探得到匹配点,Leader 在匹配点之后的日志便是需要同步给 Follower 的部分。试探匹配点即是 Leader 会依照自己的日志条目,从后往前,不断询问,你有没有存这一条日志?只要发现某条日志 Follower 存了,那么它就是个匹配点,其之前的日志必然一样 [2]。为了实现这个逻辑,raft 论文主要使用了这几个变量:matchIndex []、nextIndex [] 和 prevLogIndex、prevLogTerm。
matchIndex 和 nextIndex
这两个数组只对 Leader 有用。其中,matchIndex[]
跟踪 Leader 和每个 Follower 匹配到的日志条目, nextIndex[]
保存要发送每个 Follower 的下一个日志条目。
Candidate 在当选为 Leader 时,会将所有 matchIndex
初始化为 0 [3],表示现在我不知道每个 Peer 的进度;同时会将所有 nextIndex
初始化为 len(rf.log)
,表示要从其前面,即最后一条日志,开始试探。
func (rf *Raft) becomeLeader() { ... for i := 0; i < rf.peersNum; i++ { rf.matchIndex[i] = 0 rf.nextIndex[i] = len(rf.log) } ... } // in function pingLoop: construct AppendEntriesArgs prevLogIndex := rf.nextIndex[id] - 1 args := &AppendEntriesArgs{ Term: rf.currentTerm, LeaderID: rf.me, PrevLogIndex: prevLogIndex, PrevLogTerm: rf.log[prevLogIndex].Term, Entries: rf.log[rf.nextIndex[id]:], // start from next to the end LeaderCommit: rf.commitIndex, }
这里面隐含一个情况,即第一次试探时 args.Entries
是空的,因此对应论文中图二所说的心跳:
Upon election: send initial empty AppendEntries RPCs (heartbeat) to each server; repeat during idle periods to prevent election timeouts (§5.2)
以后随着不断向前试探,心跳中携带的日志条目将会越来越多,在实际工程中可能会引发性能问题。
prevLogIndex 和 prevLogTerm
每次心跳会带上试探信息:prevLogIndex
和 prevLogTerm
。Follower 在收到该 RPC 时,会看自己是否有这这条日志。如果没有,则其 prevLogIndex
以及之后的日志必然也不匹配,可以删掉。如果有,那 RPC 参数中携带的日志条目就有用了,将其追加到匹配点之后,同时根据 Leader 要求更新 commit 信息 。
if args.PrevLogIndex >= len(rf.log) || rf.log[args.PrevLogIndex].Term != args.PrevLogTerm { if args.PrevLogIndex < len(rf.log) { rf.log = rf.log[0:args.PrevLogIndex] // delete the log in prevLogIndex and after it rf.persist() } return } rf.log = append(rf.log[0:args.PrevLogIndex+1], args.Entries...) if args.LeaderCommit > rf.commitIndex { prevIndex := rf.commitIndex rf.commitIndex = minInt(args.LeaderCommit, len(rf.log)-1) } reply.Success = true
Leader 处理回应
当 Leader 收到该 Follower 的回复时,如果发现匹配上了,则更新 matchIndex 和 nextIndex;否则,继续试探前一条,当然,为了加快匹配速度,我们采用了大跨步向前策略,每次跳过一个 term 而非一个 index [4]。不这么优化,有个测试就跑不过去。
if reply.Success { rf.matchIndex[id] = args.PrevLogIndex + len(args.Entries) // do not depend on len(rf.log) rf.nextIndex[id] = rf.matchIndex[id] + 1 majorityIndex := getMajoritySameIndex(rf.matchIndex) if rf.log[majorityIndex].Term == rf.currentTerm && majorityIndex > rf.commitIndex { rf.commitIndex = majorityIndex DPrintf("%v advance commit index to %v", rf, rf.commitIndex) } } else { prevIndex := args.PrevLogIndex for prevIndex > 0 && rf.log[prevIndex].Term == args.PrevLogTerm { prevIndex-- } rf.nextIndex[id] = prevIndex + 1 }
一些点
照例,列出我实现中遇到的一些问题 or Bug:
- 如何决定哪些日志可以提交,即根据 matchIndex 数组得到大多数 Peer 的匹配点(
getMajoritySameIndex
)?我的实现比较粗暴,复制一份,从大到小排个序,取len/2
处的值。
func getMajoritySameIndex(matchIndex []int) int { tmp := make([]int, len(matchIndex)) copy(tmp, matchIndex) sort.Sort(sort.Reverse(sort.IntSlice(tmp))) idx := len(tmp) / 2 return tmp[idx] }
2.rf.applyCh
不要忘记在 Make
函数中使用传入的参数 applyCh
进行赋值。
3.Leader/Candidate/Follower 在接收到比自己大的 term 的 RequestVote RPC,需要立即转为 Follower,并且重置 electionTimer。
4.在 Leader 收到 AppendEntries 的 Reply 时,需要先判断 term,然后再判断状态是否变了,即下面两个 if 语句顺序不能换。否则可能由于某种原因,该 Peer 状态变了(即不再是 Leader 或者 term 发生了更改),就直接返回了, 但有可能其更改后 Term 仍然比 reply.Term 小,从而没有及时变成 Follower。
if reply.Term > rf.currentTerm { rf.becomeFollower(reply.Term) } if rf.role != LEADER || rf.currentTerm != args.Term { DPrintf("%v is not leader or changes from previous term: %v", rf, args.Term) return }
5.每次 Leader 丢掉领导者身份后,其 commitIndex
需不需要回退?以及每次 Candidate 上位 Leader 时,需不需要对 commitIndex = 0
?答案是都不需要,因为根据论文中 Leader Completeness 特性,所有被提交了日志必定会出现在后面的 Leader 的日志中。
6.AppendEntries 收回 reply 的时候更新 rf.matchIndex 和 rf.nextIndex
需要注意,不能依赖 len(rf.log)
,因为他可能被改变,比如由于客户端请求,被追加新的日志条目了。最好用发出 RPC 请求时的参数中的字段值: args.PrevIndex + len(arg.Entnries)
,上面代码有相应注释。
7.TestRPCBytes2B 这个测试用例我一开始老过不了,后来发现是我心跳的时间间隔太小了,后来发现是单位写错了,将毫秒写成了微秒。从而导致对于同一个 Follower,同样的 AppendEntries 包发了太多次(稍微多发一两次测试程序是可以忍的)。
8.随机种子用 rand.Seed(time.Now().UnixNano())
好一些。
9.发送 AppendEntries RPC 时,当 peerID 是 Leader 自己时,也要注意更新 nextIndex 和 matchIndex:
if peerID == rf.me { rf.nextIndex[peerID] = len(rf.log) rf.matchIndex[peerID] = len(rf.log) - 1 continue }
状态备份(Part 2C: state persist)
光从实现上来说 2C 比较简单,就是实现序列化(rf.persist()
)和反序列化(rf.readPersist()
)的函数,以对需要持久化的状态进行保存或者加载。并且在这些状态发生改变的时候,及时调用 rf.persist()
。
但却很容易跑不过,我前年做的时候,就是卡在这了好久。因为这一个部分的测试用例更加复杂,很容易将前两个部分没实现好的点测试出来。
序列化和反序列化
需要持久化的状态论文中图二说的很清楚,labgob 的用法,注释中也给的很详细。需要注意的就是 rf.log 这个 Slice 不用区别对待,和普通变量一样处理即可,以序列化为例:
func (rf *Raft) persist() { w := new(bytes.Buffer) e := labgob.NewEncoder(w) err := e.Encode(rf.currentTerm) if err != nil { DPrintf("%v encode currentTerm error: %v", rf, err) } err = e.Encode(rf.votedFor) if err != nil { DPrintf("%v encode votedFor error: %v", rf, err) } err = e.Encode(rf.log) if err != nil { DPrintf("%v encode log error: %v", rf, err) } data := w.Bytes() rf.persister.SaveRaftState(data) }
状态改变
代码中涉及到状态改变的主要有四个地方:
- 发起选举,更新 term 和 votedFor 时。
- 调用 Start 追加日志,rf.log 改变时。
- RequestVote 和 AppendEntries 两个 RPC handler 改变相关状态时。
- Candidate/Leader 收到 RPC 的 reply 更新自身状态时。
但是测试用例 TestFigure8Unreliable2C 就是过不了。Google 一番后发现别人也有这个问题。将 pingInterval 再改小一些,拉大其与 electionTimeout 的差距可解决。最后我将参数改为如下值,该用例就过了。
const ( electionTimeoutMin = 150 electionTimeoutMax = 300 heartbeatInterval = 50 * time.Millisecond )
但我开始将心跳间隔设置得比较大,是因为看到了材料中这么一句话:
The paper’s Section 5.2 mentions election timeouts in the range of 150 to 300 milliseconds. Such a range only makes sense if the leader sends heartbeats considerably more often than once per 150 milliseconds. Because the tester limits you to 10 heartbeats per second, you will have to use an election timeout larger than the paper’s 150 to 300 milliseconds, but not too large, because then you may fail to elect a leader within five seconds.
但是我改成 heartbeatInterval = 50 * time.Millisecond
tester 也让过了,这里我暂时有些不解。
参考材料
- Raft 论文:https://pdos.csail.mit.edu/6.824/papers/raft-extended.pdf
- Lab2 Raft 课程材料页:https://pdos.csail.mit.edu/6.824/labs/lab-raft.html
- 助教总结往年经验:https://thesquareplanet.com/blog/students-guide-to-raft/,顺便说一嘴,这个助教的博客也挺不错。
- 助教 Q&A:https://thesquareplanet.com/blog/raft-qa/
- Raft 用锁建议:https://pdos.csail.mit.edu/6.824/labs/raft-locking.txt
- Raft 实现结构组织建议:https://pdos.csail.mit.edu/6.824/labs/raft-structure.txt
- Raft 主页,上面有个可视化动图,能帮你直观的感受下 raft,并且具有一定的交互性。此外还有更多 raft 相关的材料,也可以参考:https://raft.github.io/
- 某个网上同学实现:https://wiesen.github.io/post/mit-6.824-lab2-raft-consensus-algorithm-implementation/
注解
[0] Raft 中的每个 Server 文中统一称为 Peer。
[1] 投票者可能为 Follower,也可能为 Candidate。
[2] 参考论文 Figure 3 的 Log Matching 性质。
[3] 0 其实表示 nil,因为 rf.log[0]
是一个无意义的空日志条目,起到哨兵的作用,可以减少一些判断,类似于链表中的 dummy Head node。
[4] raft 论文 7~8 页的引用文字有提到该策略。
[5] 我在实现时, votedFor 用的 int 类型,并且用的 -1 代表 null。