前言
最近在学习Raft共识算法,为了更好的沉淀,本来就打算写点笔记,然而分布式算法本质上是一套系统间不同组件交互的规则集合,没有固定的输入和期望输出,一时间竟找不到一个合适的顺序和线索来系统地展开,那既然我是通过mit 6.824和它的lab作为主要学习材料的,那就按照Lab的顺序来记录吧。
Raft 简单介绍
论文中对raft以及共识算法的定义十分简洁准确:
Raft is a consensus algorithm for managing a replicated log.
Raft是一种用于在不同节点间维护同一份日志的强一致性共识算法,那什么是共识算法呢?
Consensus algorithms allow a collection of machines to work as a coherent group that can survive the failures of some of its members.
共识算法让若干个机器像一个群组一样互相协作、对外服务,并且确保在部分机器宕机时整个集群还能对外正确服务。
具体来说,Raft把机器分为多个follower和一个leader,leader负责接受客户端的写入请求,并把日志分派給各个follower,而follower则负责接受读取请求,客户端可以选择不同follower机器进行读取。
Lab 子任务
在6.824 Lab2系列中,学生要把raft一步步实现出来,然后在后面的lab中使用它来实现一个分布式kv存储系统。原论文把Raft分解成三个子问题:leader election、log replication和safety。
在lab 2A中,学生们需要把leader election实现出来,这也是raft集群运行的第一步。
它解决的子问题是,谁来为机器分配leader和follower,如何分配?在leader宕机的情况下,如何重新分配呢?Raft给出的答案是,节点们任何情况下都可以自己选出新的leader,无须人工操作。为了实现这一目标,Raft引入election机制和candidate角色。
Term
在介绍leader election之前,需要先解释一个十分重要的概念:任期(Term)。
- Raft把集群运行的时间分为一块一块任期,用连续的整数命名
- 每个节点都记住当前自己所在的term,向其他节点发请求时,把该term带上
- 每个任期开始时进行选举,假如选举失败,比如说平票或者没人当选,则立即开始下一个任期
- 节点间的所有请求都需要带上term,当一个节点收到的请求中的term:
- 比节点自己的term更大时,节点把自己更新到该term,并把自己设置回follower状态
- 比节点自己的term更小时,拒绝该请求
(任期还有其他重要的用途,留到后续的lab再解释)
Leader Election
先来看看原论文中的状态图,基本描述了选举的全过程:
- 每个节点一开始时都是follower状态,leader通过向follower发送heartbeat来维护其leader地位
- 当节点在一定时间内没有收到heartbeat,就假设当前无leader,发起election:
- 把自己的term加一
- 把自己设置为candidate
- 向其他节点发送requestVote的RPC请求,每个节点在自己的当前任期内,只能投票一次
- 此时在选举期间,candidate可能遇到的情况有:
- 获得过半数的选票,当选leader
- 其他节点当选leader,并发送heartbeat给此candidate,此时假如term大于等于此candidate的term,那么就把自己设为follower,停止选举,否则无视这个heartbeat
- 在election timeout时间后仍没有获得足够选票,此时term加一,开始下一轮选举
- leader heartbeat实际上就是leader发送给follower追加客户端日志写入的请求,不过没有日志数据
代码实现
在lab中,课程给出的go test文件会调用make函数生成raft struct来模拟一个raft节点,然后再创建一个用go实现的虚拟网络来承载节点间的RPC通讯,最后执行一系列测试看raft集群在各种情况下是否按照论文给出的模式正确运行。本来已经写出跑了1000遍都没问题的版本了,但用了很多锁,且逻辑有点混乱,可以再简洁一点,于是在网上参考了下开源的raft实现,又重新写了一遍,后续估计也会沿用这个框架来做后面的lab了,比之前的优雅不少,且避免了很多节点内多协程数据竞争的问题。
Raft结构体,make函数与run函数
// A Go object implementing a single Raft peer.
type Raft struct {
mu sync.Mutex // Lock to protect shared access to this peer's state
peers []*labrpc.ClientEnd // RPC end points of all peers
persister *Persister // Object to hold this peer's persisted state
me int // this peer's index into peers[]
dead int32 // set by Kill()
// 2A
lastRPC time.Time
electionTimeout time.Duration
term int
state int
votedFor int
}
func Make(peers []*labrpc.ClientEnd, me int,
persister *Persister, applyCh chan ApplyMsg) *Raft {
rf := &Raft{}
rf.peers = peers
rf.me = me
// 2A
rf.electionTimeout = randomTimeout()
rf.state = follower
rf.votedFor = -1
rf.term = 0
rf.rpcChan = make(chan rpcCall)
go rf.run()
return rf
}
可以发现,主循环在run()函数里面,这里做基本的初始化,值得注意的是,lastRPC是上次节点收到RPC调用的时间,用来计算节点是否经过了electionTimeout时间,需要发起新的选举,下面便是run函数:
func (rf *Raft) run() {
for {
if rf.killed() {
return
}
_, state := rf._GetState()
switch state {
case candidate:
rf.runCandidate()
case follower:
rf.runFollower()
case leader:
rf.runLeader()
}
}
}
runCandidate循环
可以看到节点的每种state都是一个循环,那么这就意味着state之间假如要转移,只需要从循环中退出就可以了,比如说这是runCandidate():
func (rf *Raft) runCandidate() {
voteCh := rf.startElection()
electionTimer := randomTimeoutCh()
votes := 1
term, state := rf._GetState()
maxTerm := term
for state == candidate {
select {
case <-electionTimer:
return
case v := <-voteCh:
if v.Term > maxTerm {
maxTerm = v.Term
rf.maybeCatchup(maxTerm)
return
}
if v.VoteGranted {
votes++
}
if votes > len(rf.peers)/2 && rf.state == candidate {
rf.mu.Lock()
rf.state = leader
rf.mu.Unlock()
return
}
case r := <-rf.rpcChan:
rf.handleRPC(r)
}
_, state = rf._GetState()
}
}
每个状态主循环中,有一个核心的for select循环来处理节点的事件,然后每次for结束后,重新判断自己是否还是该state,这样就减少了不同线程事件数据并发引起错误的问题,不过估计性能没有纯多线程好就是了。可以看到,每次进入candidate状态时,都会发起新一轮的选举,并且通过channel来监听选举超时以及requestVote RPC的返回,假如票数过半了,就转变为leader状态,然后退出,这样在节点主循环的下一次运行,便会进入leader的循环。
这里还有一个常用的函数maybeCatchup(term int),是对于raft中一个规则的封装:
If RPC request or response contains term T > currentTerm: set currentTerm = T, convert to follower
最后再看看选举函数:
func (rf *Raft) startElection() <-chan *RequestVoteReply {
rf.mu.Lock()
rf.state = candidate
rf.term++
rf.votedFor = rf.me
term := rf.term
rf.lastRPC = time.Now()
rf.electionTimeout = randomTimeout()
rf.mu.Unlock()
voteCh := make(chan *RequestVoteReply)
for i, _ := range rf.peers {
if i == rf.me {
continue
}
go func(server int) {
req := &RequestVoteArgs{Term: term, CandidateId: rf.me}
reply := &RequestVoteReply{}
rf.sendRequestVote(server, req, reply)
voteCh <- reply
}(i)
}
return voteCh
}
由于raft本身就考虑了因网络不可靠而引起的重复rpc或者延迟rpc的情况,所以在发起requestVote RPC的时候,直接使用goroutine并行发送就可以了,假如term更高的节点收到了,会直接忽略。
requestVote RPC
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
ch := make(chan struct{})
rf.rpcChan <- rpcCall{
rpcName: "RequestVote",
req: args,
resp: reply,
doneCh: ch,
}
// wait until rpc call finished
<-ch
}
func (rf *Raft) handleRPC(r rpcCall) {
switch r.rpcName {
case "RequestVote":
rf._RequestVote(r.req.(*RequestVoteArgs), r.resp.(*RequestVoteReply))
r.doneCh <- struct{}{}
case "AppendEntries":
rf._AppendEntries(r.req.(*AppendEntriesArgs), r.resp.(*AppendEntriesReply))
r.doneCh <- struct{}{}
}
}
func (rf *Raft) _RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
rf.maybeCatchup(args.Term)
term, _ := rf.GetState()
reply.Term = term
if term > args.Term {
reply.VoteGranted = false
return
}
rf.mu.Lock()
if rf.votedFor == -1 || rf.votedFor == args.CandidateId {
rf.lastRPC = time.Now()
reply.VoteGranted = true
rf.votedFor = args.CandidateId
}
rf.mu.Unlock()
}
可以发现,这里使用了一个handleRPC作为函数mux,而rpc函数则负责构造requestVote的函数请求体,并发送到raft.rpcChan里面,最后监听doneCh的事件再返回,这样做主要是等待处理函数把结果更新到reply对象里面再从rpc函数返回,否则调用方无法确定传入的reply结构体是否已经被更新,可以读结果。
总结
runFollower和runLeader就不详细说了,后面还需要实现日志追加的大量逻辑,但主体结构跟runCandidate差不多,都是一个for select循环。在原本lab的skeleton code里面,是给出一个ticker函数来让你实现for循环并且sleep来决定是否需要发起新选举的,这种写法就导致了一个raft里面会有很多不知道啥时候会发生的go协程,在任意时刻,可能ticker刚好唤醒,也可能别的节点的rpc调用到来,生成新的go协程,这就导致了raft的关键字段,比如state和term等不知道什么时候会被改变,在代码逻辑里面常常需要加锁并判断。
于是在这个实现中,通过for select和rpc channel来使事件的发生尽量串行化,这样显然没有让go协程随意生成更能利用多核,但代码的编写和可读性都好太多,完全是值得的。
无论如何,在debug这个lab的过程中,还是获得了不少并发编程经验的。我大致总结了一个规则:在调用或者写入共享变量的时候,以该代码行为中心,看看前后一定范围内的代码逻辑,假如运行中途有别的线程同时在运行会发生什么现象,举个例子:
func (rf *Raft) startElection() {
//...
if rf.maybeCatchup(maxTerm) {
return
}
rf.mu.Lock()
if votes > len(rf.peers)/2 && rf.state == candidate && maxTerm == term {
rf.state = leader
go rf.sendHeartbeats(rf.term)
}
rf.mu.Unlock()
}
在一开始实现的版本中,这里是没有加锁的,随时都可能发生term更新和state被设置为follower。那么在这段代码运行到判断完票数过半,进入更新state为leader之前,就可能有别的节点发到这个节点的term更高的rpc调用,然后在该调用的协程里面进行maybeCatchup(),更新term并把state改为follower,然后当前的rf.state = leader才运行,这样便导致此节点成为更大的term的另一个leader,出现了脑裂现象。
comments powered by Disqus