MIT 6.824 2020 Raft 实现细节备忘(2)

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: MIT 6.824 2020 Raft 实现细节备忘(2)

日志同步(Part 2B: log replication)

Leader 接收到 client 请求到应用其包含 command 到状态机大概需要这么几个过程:

  1. 收到 client 的请求,包含一个 command 参数(Start(command interface{}))。
  2. Leader 把该请求追加到本地日志(rf.log = append(rf.log, &logEntry{rf.currentTerm, command}))。
  3. 通过心跳并行通知所有 Follower 写入该日志(AppendEntries RPC)。
  4. 待大多数 Follower 成功写入后,提交该日志到状态机。

心跳是定时的,而同步日志则是在定时的心跳的过程中完成的。如果 RPC 参数中不携带日志条目,则就是一个简单心跳;如果 RPC 参数中携带日志条目,则是 Leader 告诉该 Follower,我认为你需要同步这些日志。

那么 Leader 是如何得知每个 Follower 需要同步哪些日志呢?

试探。

通过试探得到匹配点,Leader 在匹配点之后的日志便是需要同步给 Follower 的部分。试探匹配点即是 Leader 会依照自己的日志条目,从后往前,不断询问,你有没有存这一条日志?只要发现某条日志 Follower 存了,那么它就是个匹配点,其之前的日志必然一样 [2]。为了实现这个逻辑,raft 论文主要使用了这几个变量:matchIndex []、nextIndex [] 和 prevLogIndex、prevLogTerm。

matchIndex 和 nextIndex

这两个数组只对 Leader 有用。其中,matchIndex[] 跟踪 Leader 和每个 Follower 匹配到的日志条目, nextIndex[] 保存要发送每个 Follower 的下一个日志条目。

Candidate 在当选为 Leader 时,会将所有 matchIndex 初始化为 0 [3],表示现在我不知道每个 Peer 的进度;同时会将所有 nextIndex 初始化为 len(rf.log),表示要从其前面,即最后一条日志,开始试探

func (rf *Raft) becomeLeader() {
  ...
  for i := 0; i < rf.peersNum; i++ {
    rf.matchIndex[i] = 0
    rf.nextIndex[i] = len(rf.log)
  }
  ...
}
// in function pingLoop: construct AppendEntriesArgs
prevLogIndex := rf.nextIndex[id] - 1
args := &AppendEntriesArgs{
  Term:         rf.currentTerm,
  LeaderID:     rf.me,
  PrevLogIndex: prevLogIndex,
  PrevLogTerm:  rf.log[prevLogIndex].Term,
  Entries:      rf.log[rf.nextIndex[id]:], // start from next to the end
  LeaderCommit: rf.commitIndex,
}

这里面隐含一个情况,即第一次试探时 args.Entries 是空的,因此对应论文中图二所说的心跳:

Upon election: send initial empty AppendEntries RPCs (heartbeat) to each server; repeat during idle periods to prevent election timeouts (§5.2)

以后随着不断向前试探,心跳中携带的日志条目将会越来越多,在实际工程中可能会引发性能问题。

prevLogIndex 和 prevLogTerm

每次心跳会带上试探信息:prevLogIndexprevLogTerm。Follower 在收到该 RPC 时,会看自己是否有这这条日志。如果没有,则其 prevLogIndex 以及之后的日志必然也不匹配,可以删掉。如果有,那 RPC 参数中携带的日志条目就有用了,将其追加到匹配点之后,同时根据 Leader 要求更新 commit 信息 。

if args.PrevLogIndex >= len(rf.log) || rf.log[args.PrevLogIndex].Term != args.PrevLogTerm {
  if args.PrevLogIndex < len(rf.log) {
    rf.log = rf.log[0:args.PrevLogIndex] // delete the log in prevLogIndex and after it
    rf.persist()
  }
  return
}
rf.log = append(rf.log[0:args.PrevLogIndex+1], args.Entries...)
if args.LeaderCommit > rf.commitIndex {
  prevIndex := rf.commitIndex
  rf.commitIndex = minInt(args.LeaderCommit, len(rf.log)-1)
}
reply.Success = true

Leader 处理回应

当 Leader 收到该 Follower 的回复时,如果发现匹配上了,则更新 matchIndex 和 nextIndex;否则,继续试探前一条,当然,为了加快匹配速度,我们采用了大跨步向前策略,每次跳过一个 term 而非一个 index [4]。不这么优化,有个测试就跑不过去。

if reply.Success {
  rf.matchIndex[id] = args.PrevLogIndex + len(args.Entries) // do not depend on len(rf.log)
  rf.nextIndex[id] = rf.matchIndex[id] + 1
  majorityIndex := getMajoritySameIndex(rf.matchIndex)
  if rf.log[majorityIndex].Term == rf.currentTerm && majorityIndex > rf.commitIndex {
    rf.commitIndex = majorityIndex
    DPrintf("%v advance commit index to %v", rf, rf.commitIndex)
  }
} else {
  prevIndex := args.PrevLogIndex
  for prevIndex > 0 && rf.log[prevIndex].Term == args.PrevLogTerm {
    prevIndex--
  }
  rf.nextIndex[id] = prevIndex + 1
}

一些点

照例,列出我实现中遇到的一些问题 or Bug:

  1. 如何决定哪些日志可以提交,即根据 matchIndex 数组得到大多数 Peer 的匹配点(getMajoritySameIndex)?我的实现比较粗暴,复制一份,从大到小排个序,取 len/2 处的值。
func getMajoritySameIndex(matchIndex []int) int {
  tmp := make([]int, len(matchIndex))
  copy(tmp, matchIndex)
  sort.Sort(sort.Reverse(sort.IntSlice(tmp)))
  idx := len(tmp) / 2
  return tmp[idx]
}

2.rf.applyCh 不要忘记在 Make 函数中使用传入的参数 applyCh 进行赋值。

3.Leader/Candidate/Follower 在接收到比自己大的 term 的 RequestVote RPC,需要立即转为 Follower,并且重置 electionTimer。

4.在 Leader 收到 AppendEntries 的 Reply 时,需要先判断 term,然后再判断状态是否变了,即下面两个 if 语句顺序不能换。否则可能由于某种原因,该 Peer 状态变了(即不再是 Leader 或者 term 发生了更改),就直接返回了, 但有可能其更改后 Term 仍然比 reply.Term 小,从而没有及时变成 Follower。

if reply.Term > rf.currentTerm {
  rf.becomeFollower(reply.Term)
}
if rf.role != LEADER || rf.currentTerm != args.Term {
  DPrintf("%v is not leader or changes from previous term: %v", rf, args.Term)
  return
}

5.每次 Leader 丢掉领导者身份后,其 commitIndex 需不需要回退?以及每次 Candidate 上位 Leader 时,需不需要对 commitIndex = 0?答案是都不需要,因为根据论文中 Leader Completeness 特性,所有被提交了日志必定会出现在后面的 Leader 的日志中。

6.AppendEntries 收回 reply 的时候更新 rf.matchIndex 和 rf.nextIndex 需要注意,不能依赖 len(rf.log),因为他可能被改变,比如由于客户端请求,被追加新的日志条目了。最好用发出 RPC 请求时的参数中的字段值: args.PrevIndex + len(arg.Entnries),上面代码有相应注释。

7.TestRPCBytes2B 这个测试用例我一开始老过不了,后来发现是我心跳的时间间隔太小了,后来发现是单位写错了,将毫秒写成了微秒。从而导致对于同一个 Follower,同样的 AppendEntries 包发了太多次(稍微多发一两次测试程序是可以忍的)。

8.随机种子用 rand.Seed(time.Now().UnixNano()) 好一些。

9.发送 AppendEntries RPC 时,当 peerID 是 Leader 自己时,也要注意更新 nextIndex 和 matchIndex:

if peerID == rf.me {
  rf.nextIndex[peerID] = len(rf.log)
  rf.matchIndex[peerID] = len(rf.log) - 1
  continue
}

状态备份(Part 2C: state persist)

光从实现上来说 2C 比较简单,就是实现序列化(rf.persist())和反序列化(rf.readPersist())的函数,以对需要持久化的状态进行保存或者加载。并且在这些状态发生改变的时候,及时调用 rf.persist()

但却很容易跑不过,我前年做的时候,就是卡在这了好久。因为这一个部分的测试用例更加复杂,很容易将前两个部分没实现好的点测试出来。

序列化和反序列化

需要持久化的状态论文中图二说的很清楚,labgob 的用法,注释中也给的很详细。需要注意的就是 rf.log 这个 Slice 不用区别对待,和普通变量一样处理即可,以序列化为例:

func (rf *Raft) persist() {
  w := new(bytes.Buffer)
  e := labgob.NewEncoder(w)
  err := e.Encode(rf.currentTerm)
  if err != nil {
    DPrintf("%v encode currentTerm error: %v", rf, err)
  }
  err = e.Encode(rf.votedFor)
  if err != nil {
    DPrintf("%v encode votedFor error: %v", rf, err)
  }
  err = e.Encode(rf.log)
  if err != nil {
    DPrintf("%v encode log error: %v", rf, err)
  }
  data := w.Bytes()
  rf.persister.SaveRaftState(data)
}

状态改变

代码中涉及到状态改变的主要有四个地方:

  1. 发起选举,更新 term 和 votedFor 时。
  2. 调用 Start 追加日志,rf.log 改变时。
  3. RequestVote 和 AppendEntries 两个 RPC handler 改变相关状态时。
  4. Candidate/Leader 收到 RPC 的 reply 更新自身状态时。

但是测试用例 TestFigure8Unreliable2C 就是过不了。Google 一番后发现别人也有这个问题。将 pingInterval 再改小一些,拉大其与 electionTimeout 的差距可解决。最后我将参数改为如下值,该用例就过了。

const (
  electionTimeoutMin = 150
  electionTimeoutMax = 300
  heartbeatInterval  = 50 * time.Millisecond
)

但我开始将心跳间隔设置得比较大,是因为看到了材料中这么一句话:

The paper’s Section 5.2 mentions election timeouts in the range of 150 to 300 milliseconds. Such a range only makes sense if the leader sends heartbeats considerably more often than once per 150 milliseconds. Because the tester limits you to 10 heartbeats per second, you will have to use an election timeout larger than the paper’s 150 to 300 milliseconds, but not too large, because then you may fail to elect a leader within five seconds.

但是我改成 heartbeatInterval = 50 * time.Millisecond tester 也让过了,这里我暂时有些不解。

参考材料

  1. Raft 论文:https://pdos.csail.mit.edu/6.824/papers/raft-extended.pdf
  2. Lab2 Raft 课程材料页:https://pdos.csail.mit.edu/6.824/labs/lab-raft.html
  3. 助教总结往年经验:https://thesquareplanet.com/blog/students-guide-to-raft/,顺便说一嘴,这个助教的博客也挺不错。
  4. 助教 Q&A:https://thesquareplanet.com/blog/raft-qa/
  5. Raft 用锁建议:https://pdos.csail.mit.edu/6.824/labs/raft-locking.txt
  6. Raft 实现结构组织建议:https://pdos.csail.mit.edu/6.824/labs/raft-structure.txt
  7. Raft 主页,上面有个可视化动图,能帮你直观的感受下 raft,并且具有一定的交互性。此外还有更多 raft 相关的材料,也可以参考:https://raft.github.io/
  8. 某个网上同学实现:https://wiesen.github.io/post/mit-6.824-lab2-raft-consensus-algorithm-implementation/

注解

[0] Raft 中的每个 Server 文中统一称为 Peer。

[1] 投票者可能为 Follower,也可能为 Candidate。

[2] 参考论文 Figure 3 的 Log Matching 性质。

[3] 0 其实表示 nil,因为 rf.log[0] 是一个无意义的空日志条目,起到哨兵的作用,可以减少一些判断,类似于链表中的 dummy Head node。

[4] raft 论文 7~8 页的引用文字有提到该策略。

[5] 我在实现时, votedFor 用的 int 类型,并且用的 -1 代表 null。


相关实践学习
通过日志服务实现云资源OSS的安全审计
本实验介绍如何通过日志服务实现云资源OSS的安全审计。
相关文章
|
算法 Unix API
指数退避(Exponential backoff)在网络请求中的应用
## 一、背景 最近做云服务 API 测试项目的过程中,发现某些时候会大批量调用 API,从而导致限流的报错。在遇到这种报错时,传统的重试策略是每隔一段时间重试一次。但由于是固定的时间重试一次,重试时又会有大量的请求在同一时刻涌入,会不断地造成限流。 这让我回想起两年前在查阅[Celery Task 文档](http://docs.celeryproject.org/en/latest
12852 1
|
存储 缓存 监控
一文读懂分布式架构知识体系(内含超全核心知识大图)
7月9日 19:00-21:30 阿里云开发者社区首场“Offer 5000”直播开启!15位团队技术大牛在线招人,更有《阿里云技术面试红宝书》助你拿下Offer!马上投递简历:https://developer.aliyun.com/special/offerday01
19468 0
|
存储 缓存 负载均衡
Rendezvous hashing算法介绍
Rendezvous hashing算法介绍
180 3
Rendezvous hashing算法介绍
|
12月前
|
负载均衡 算法 Java
蚂蚁面试:Nacos、Sentinel了解吗?Springcloud 核心底层原理,你知道多少?
40岁老架构师尼恩分享了关于SpringCloud核心组件的底层原理,特别是针对蚂蚁集团面试中常见的面试题进行了详细解析。内容涵盖了Nacos注册中心的AP/CP模式、Distro和Raft分布式协议、Sentinel的高可用组件、负载均衡组件的实现原理等。尼恩强调了系统化学习的重要性,推荐了《尼恩Java面试宝典PDF》等资料,帮助读者更好地准备面试,提高技术实力,最终实现“offer自由”。更多技术资料和指导,可关注公众号【技术自由圈】获取。
蚂蚁面试:Nacos、Sentinel了解吗?Springcloud 核心底层原理,你知道多少?
|
存储 关系型数据库 MySQL
MySQL数据库进阶实战:解锁性能飙升秘籍,从菜鸟到高手的华丽蜕变,让数据操作如行云流水!
【8月更文挑战第5天】MySQL是最流行的开源关系型数据库之一,在Web开发与数据分析等领域广泛应用。本文通过实战代码示例,深入探讨MySQL进阶技能:包括索引优化以提升查询性能;利用JOIN与子查询处理多表关联数据;通过事务处理确保数据一致性;使用存储过程与函数封装复杂逻辑以便重用;设置触发器自动执行特定任务以维护数据完整性。掌握这些技能能显著提高数据处理效率与系统性能。
343 5
|
存储 安全 程序员
【C++篇】深入内存迷宫:C/C++ 高效内存管理全揭秘
【C++篇】深入内存迷宫:C/C++ 高效内存管理全揭秘
630 3
|
文字识别 Python
python代码运行报错:No module named 'aliyunsdkcore'
用python调用阿里云图片OCR识别,使用的是阿里云官方给的传本地图片文件进行检测的代码,运行报错:No module named 'aliyunsdkcore'。在pycharm python软件包和终端里安装aliyunsdkcore这个模块都失败了。
go语言中的运算符和表达式与对应的优先级
go语言中的运算符和表达式与对应的优先级
225 0
|
小程序 Java 开发工具
vc6.0_cn_full(完整绿色版+安装步骤)(支持XP、Win7、Win8、Win10)
VC++6.0中文版(完整绿色精简版) 06-30 VC++6.0中文版(完整绿色精简版) 企业版集成SP6,小巧实用,用于编译VC6.0工程很合适 vc6.0_cn_full(完
1288 0
|
测试技术 API 图形学
DXGI快速截屏录屏技术
DXGI快速截屏录屏技术
1567 0