Preparation
-
实验:http://nil.csail.mit.edu/6.824/2020/labs/lab-raft.html 的 Part 2A.
-
论文:
- 英文版:https://raft.github.io/raft.pdf
- 中文版:https://github.com/maemual/raft-zh_cn/blob/master/raft-zh_cn.md
论文只要求看完 Section 5 即可。
其中个人认为主要需要看的几个点在于:
- Figure 2 & 3.
- Section 5.1
- Section 5.2
- Section 5.4.1
Overview
Lab 2A 是实现 Leader Election。它主要关心各个角色的状态切换,以及对于 AppendEntries RPC 和 RequestVote RPC 的请求响应。因为在 Lab 2A 的测试中并不会有日志相关的操作,所以我们也暂时不需要关心太多日志相关的内容。
Followers
- 响应所有来自 leader 和 candidate 的 RPC 请求。
- 如果在选举时间超时前,没有收到来自当前 leader 的 AppendEntries RPC(心跳检测),或者没有投票给 candidate,则将自己的状态变成 candidate【注意这条规则的逻辑是
isElectionTimeout() || votedFor == nil
,而不是isElectionTimeout() && votedFor == nil
】。
Candidates
- 当状态变为 Candidate 的时候,开始进行选举:
- 递增当前的 term;
- 投票给自己;
- 重置选举超时计时器;
- 发送 RequestVote RPC 给其他的服务器。
- 选举的终止条件以及对应操作:
- 如果在选举过程中收到大多数的选票,则将自身状态变成 leader。
- 如果从新的 leader 接收到了 AppendEntries RPC(心跳检测),则将自身状态变成 follower。
- 如果选举超时,则重新进行新一轮的选举。
Leader
- 当状态变为 Leader 的时候,立即发送 AppendEntries RPC(心跳检测)给其他所有 server。
- (My Hint:当发送心跳检测不能及时收到大多数 Follower 的响应时,将自己的状态变成 Follower。
All Servers
- 在进行请求或者响应来自其他 server 的 RPC 时,若发现其他 server 的 term 大于当前 server 的 term,则将当前 server 的 term 更新为其他 server 的 term。
RequestVote RPC
- 如果
args.Term < rf.currentTerm
,则直接返回false
。 - 如果自己没有投票给其他人或者投给了 candidateID,则返回
true
。 - (存疑,看到 https://zhuanlan.zhihu.com/p/264448558 以及 http://thesecretlivesofdata.com/raft/ 中提到在接受到 RequestVote 的时候需要重置选举超时器,但是论文似乎没有提及,我没有实现这个。
AppendEntries RPC
- 如果
args.Term < rf.currentTerm
,则直接返回false
。 - 重置选举超时器。
- 如果当前状态是 candidate 并且发送者的 term 没有过期,状态变为 follower。
Implementation
Lab 2A 的代码是放在 src/raft
里面,我们需要实现 raft.go
中的一部分。
我的具体实现放在 github 中 https://github.com/shadowdsp/mit6.824 .
Flow Chart
Data Structure
Raft
Raft 的数据结构我们可以看论文中 Figure 2 进行填充,并且补充一些在选举时刻必要的变量。关于日志相关的属性暂时用不到。
type State string
var (
Leader = State("Leader")
Candidate = State("Candidate")
Follower = State("Follower")
)
type Raft struct {
mu sync.Mutex // Lock to protect shared access to this peer‘s state
peers []*labrpc.ClientEnd // RPC end points of all peers
persister *Persister // Object to hold this peer‘s persisted state
me int // this peer‘s index into peers[]
dead int32 // set by Kill()
// Your data here (2A, 2B, 2C).
// Look at the paper‘s Figure 2 for a description of what
// state a Raft server must maintain.
// 1 follower, 2 candidate, 3 leader
state State
// Persistent state on server
currentTerm int
// votedFor initial state is -1
votedFor int
// follower election timeout timestamp
electionTimeoutAt time.Time
}
RPC
领导选举主要涉及两个 RPC:RequestVote 以及 AppendEntries,每个分别对应了请求 Args 和响应 Reply。为了方便 debug,也可以在请求或者响应里面加上 ServerID
。
type RequestVoteArgs struct {
// Your data here (2A, 2B).
Term int
CandidateID int
}
type RequestVoteReply struct {
// Your data here (2A).
Term int
VoteGranted bool
}
type AppendEntriesArgs struct {
Term int
}
type AppendEntriesReply struct {
Term int
// true if follower contained entry matching prevLogIndex and prevLogTerm
Success bool
}
Process
Raft 程序是由 Make
函数来启动的。在 Make
中,我主要是初始化 raft 对象,然后调用 go rf.run(ctx)
来运行 raft 程序主体。
初始的时候,raft 的状态为 Follower
,并且投票为 -1
表示还未投票。
rf := &Raft{
peers: peers,
persister: persister,
me: me,
state: Follower,
votedFor: -1,
}
// Your initialization code here (2A, 2B, 2C).
// initialize from state persisted before a crash
rf.readPersist(persister.ReadRaftState())
ctx := context.Background()
go rf.run(ctx)
rf.run()
主要是对 raft 状态的进行判断,并根据状态执行不同的操作。
这里加了 time.Sleep(10ms)
是因为我跑了 100 个 test,在后面会发现有锁冲突的情况。
func (rf *Raft) run(ctx context.Context) error {
for {
time.Sleep(10 * time.Millisecond)
state := rf.getState()
switch state {
case Follower:
... // check timeout and convert to cdd
break
case Candidate:
... // elect leader
break
case Leader:
... // send heartbeats
break
default:
panic(fmt.Sprintf("Server %v is in unknown state %v", rf.me, rf.state))
}
}
}
接下来就是按照 Figure2 中提到的,去填充每个 state 以及 RPC 的逻辑。
Test
当我们将程序写完,使用 go test -run 2A
去执行测试。
强烈建议将 TestReElection2A
改成循环运行多次,我这里是运行 100 次,否则极大可能只是概率性地通过。概率性地通过意味着程序并不是正确的。
虽然我能通过 100 次也是加了一些 hack,例如在某些位置加了 sleep,以及调整了超时时间等,并不说明我的程序是完全正确的。
如果我的程序有什么问题,求指正,谢谢!!!
Problems
在测试的过程中,我陆续解决了一些问题,可能对你会有帮助。
实现 Figure2 - Rules for Servers - All servers 中的第二条规则时,不要忽略了 server 在收到 rpc 响应的时候也要检查 reply.Term 去更新状态。
这一点在看论文的时候不够仔细,导致出错。
Follower 心跳检测的 timeout 和 candidate 选举的 timeout 都是 electionTimeout。
最开始我是用两个 timeout 去表示的,发现实现起来很奇怪,后面改成使用同一个。
并发编程需要注意死锁以及 goroutine 泄漏。
死锁这个还好,只要报错基本能定位到哪里的问题。
Goroutine 泄漏体现于在 goroutine 中使用 channel,如果最后这个 channel 不会被关闭,那么这个 goroutine 会一直存活。
当 Leader 发出心跳检测后,如果不能及时收到大多数节点的回复,需要变成 Follower。
我在测试 TestReElection2A
的过程中,发现跑了十几次后,经常在 checkNoLeader()
挂了。这是测试三个 server 都出现网络分区的情况。在此时,三个 server 都应该是 Follower state,因此需要加上这个机制。这里我的实现是,在 leader send heartbeats 时,对 rpc 的执行添加超时时间,使用 time.After()
去完成。
这里还有 MIT 助教写的参考指南 https://thesquareplanet.com/blog/students-guide-to-raft/
Summary
Raft leader election 的理论相对容易,实现起来如果有问题,还是如同 Hint 里面说的,多看几遍 Figure 2
: ).
If your code has trouble passing the tests, read the paper‘s Figure 2 again; the full logic for leader election is spread over multiple parts of the figure.