Lab 子任务
在lab 2B中,学生们需要实现raft的log replication,也就是raft作为共识算法所支撑的核心功能:它让各个节点可以在各种情况下都存储同一组系列日志,且对日志的顺序与内容达成共识。这样就可以保证对这组日志的高可用,在无需第三方协调的前提下,避免单点故障。
套娃的终点
为什么这个前提重要呢?
在mit 6.824中,Robert Morris教授通过先介绍VMWare FT来引出raft所解决的问题,VMWare FT实现了虚拟机系统层面的主从复制,简单来说就是在把发往primary的所有指令都复制到backup来实现复制,在primary宕机时,backup “goes live"来对外服务。但是在network partition的情况下,backup可能会误认为primary宕机,需要升级成新的primary,这样就出现了经典的脑裂问题,此时就需要一个提供test and set服务的第三方作为“arbitrator”来判断二者的状态。
然而,这个第三方又会成为潜在的单点故障,需要用多节点来保证高可用,此时又引入了新的脑裂问题。。。
可见这是一个套娃问题,而raft,以及在它之前的paxos、viewstamp等,就是用于终结这个套娃的共识算法,它们可以使集群内的节点自己相互协调,在network partition的情况下避免多个master的问题。
Log Replication
raft的核心功能是维护一组高可用的日志,使用raft的应用需要是一个状态机,每个节点中的raft内的日志内容和顺序相同,那么当各节点的应用按照一样的日志以一样的顺序apply到自己的状态机时,就能到达完全一样的状态。
在raft中,每一条日志有两个关键字段,index和term,当然,也需要记录日志本身应用相关的内容。index和term两个属性共同维护了一份日志的唯一性,假如一个日志term和index相同,那么它们便是同一份日志的复制。这一性质由log replication的机制保证,又反过来被用于判断节点间每份log到底是不是同一份log。
上图描述了5个节点中日志可能存在的情况,其中,方块里面的数字是term,最上方标记了日志所属的index。可以发现,上图中,由于网络错误或者节点宕机,不同节点存有不同的日志,而raft的log replication机制可以保证,在一段时间后,互通的节点会最终恢复成统一的日志序列,接下来,我们看看lab 2B需要学生实现的代码。
代码实现
为了简洁,下面的代码会有省略的部分,特别是共享变量的加锁解锁。
start函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
func (rf *Raft) Start(command interface{}) (int, int, bool) {
term, isLeader := rf.GetState()
index := -1
if !isLeader {
return index, term, isLeader
}
rf.logMu.Lock()
index = len(rf.logs)
rf.logs = append(rf.logs, &CommandLog{
Command: command,
Index: index,
Term: term,
})
rf.logMu.Unlock()
// broadcast appendEntries RPC
rf.resetLastRPCtime()
return index, term, isLeader
}
|
首先来看看start函数,这是客户端应用向raft leader提交新日志的入口,在lab2B的test中,test通过不断调用start写入日志,并模拟节点掉线,恢复等情况,来测试学生的raft实现是否正确。在我的版本中,start函数收到command后,简单的写入rf.logs中,然后重置所有follower节点的lastRPC,这个lastRPC用来判断leader是否需要发送空的AppendEntries RPC来维持leader地位,重置后,在下一轮心跳时,leader便可以会带上这个新的日志。
runLeader与periodicAppendEntries
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
func (rf *Raft) runLeader() {
term := rf.GetTerm()
state := rf._GetState()
for i := 0; i < len(rf.peers); i++ {
if i == rf.me {
continue
}
go rf.periodicAppendEntries(i, term)
}
for state == leader {
select {
case r := <-rf.rpcChan:
rf.handleRPC(r)
}
state = rf._GetState()
}
}
func (rf *Raft) periodicAppendEntries(server int, term int) {
for rf._GetState() == leader && rf.GetTerm() == term {
if time.Since(rf.lastRPCtime[server]) > heartbeatInterval {
go rf.sendLogEntry(server, term)
}
time.Sleep(10 * time.Millisecond)
}
}
|
runLeader
对应上一次lab的runCandidate
和runFollower
,是leader节点的main loop,可以发现,在函数启动时,leader会给每个follower启动一个专门发送日志与心跳的协程periodicAppendEntries
,每隔10毫秒判断向该节点发送的上一次RPC的时间间隔来判断是否发送新的AppendEntries RPC
sendLogEntry
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
func (rf *Raft) sendLogEntry(server int, term int) {
//last check isLeader, may caught up with higher term but was here already
if rf._GetState() != leader || rf.GetTerm() != term {
return
}
// find entry to send based on nextindex[server]
req := rf.prepareAppendEntries(server)
reply := &AppendEntriesReply{}
// do actual RPC
if rf.sendAppendEntries(server, req, reply) {
if rf.maybeCatchup(reply.Term) {
return
}
if reply.Success {
rf.matchIndex[server] = max(rf.matchIndex[server], req.PrevLogIndex + len(req.Logs))
rf.nextIndex[server] = max(rf.nextIndex[server], req.PrevLogIndex + len(req.Logs) + 1)
// check if update commit index of leader
rf.maybeUpdateCommitIndex()
// if nextIndex < latest, do another sendLogEntry
if rf.nextIndex[server] <= rf.LastLog().Index {
go rf.sendLogEntry(server, term)
}
} else {
// optimized way to update next index based on rpc reply info
rf.updateNextIndex(server, reply.Xterm, reply.Xindex, reply.Xlen)
// retry immediately
go rf.sendLogEntry(server, term)
}
}
rf.lastRPCtime[server] = time.Now()
}
|
sendLogEntry
就是向一个follower节点的一次rpc啦,我们首先看看rf.prepareAppendEntries(server)
中返回的req结构体有什么字段:
1
2
3
4
5
6
7
8
9
10
11
|
type AppendEntriesArgs struct {
// 2A
Term int
LeaderID int
// 2B
PrevLogIndex int // 当前logs中第一个log的上一个log的index
PrevLogTerm int // 当前logs中第一个log的上一个log的term
Logs []CommandLog
LeaderCommit int // 当前leader中已经commit的log中最后一个的index
}
|
此时,我们从leader和follower交替两边看看整个追加的流程:
- 【leader】创建append entries rpc的请求体,在这一步,leader会根据
nextindex
数组定位该follower日志的最后一个index,然后把自身日志中index以及之后的所有日志作为一个数组发给它,假如这是一次心跳,则发送空的日志数组,其中请求体还需要说明这个日志之前的日志的term和index,用来给follower判断是否接受将要收到的日志
- 【leader】进行AppendEntries RPC请求
- 【follower】收到RPC请求,按照原论文给出的规则判断是否接受这次日志追加:
- 规则是,假如存在index为
PrevLogIndex
的日志,且该任期等于PrevLogTerm
,则接受该日志,把PrevLogIndex
之后所有的日志清除(可能存在与leader冲突的无效日志),然后把Logs
追加到自身日志数组的末端
- 假如不存在
PrevLogIndex
位置的日志,或者日志的term不等于PrevLogTerm
,则拒绝此次请求,并返回冲突日志的term和index,以及自己整个日志的长度
- 【leader】收到RPC响应,假如响应中对面发来更新的term,则放弃后续处理,进行catch up
- 【leader】根据RPC结果执行状态更新:
- 假如RPC成功,则说明follower把
PrevLogTerm
之后的所有日志都同步了,此时修改对应的nextIndex
和matchIndex
,并在maybeUpdateCommitIndex
中判断是否可以更新新的commitIndex
- 假如RPC失败,则需要把nextIndex往回移动,在更老的index中尝试寻找最后相同的log,再进行重试,在本次lab中,使用到了lecture中解释原论文提到的优化,也就是每个任期跳跃一次进行寻找,避免一次rpc后退一个index来查找,具体的细节可以参考原论文与这次lecture25分开始的部分
这便是一次AppendEntries日志交换的过程,raft通过这样往复多次交换来使得leader的日志同步到follower中,覆盖失效的日志(比如前任leader没有成功复制到一半以上的部分日志),最终确保集群内整个日志序列的一致性。
日志同步到状态机
最后我们再来看看rf.logs
是如何被同步到状态机的:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
func (rf *Raft) applyLog() {
timer := time.NewTicker(10 * time.Millisecond)
defer timer.Stop()
for {
select {
case <-timer.C:
rf.doApply()
case <-rf.applyNotify:
rf.doApply()
}
}
}
func (rf *Raft) doApply() {
for rf.commitIndex > rf.lastApplied {
rf.lastApplied++
command := rf.logs[rf.lastApplied].Command
msg := ApplyMsg{
CommandValid: true,
Command: command,
CommandIndex: rf.lastApplied,
}
rf.applyCh <- msg
}
}
func Make(peers []*labrpc.ClientEnd, me int,
persister *Persister, applyCh chan ApplyMsg) *Raft {
//...
go rf.run()
go rf.applyLog()
}
|
简单来说,在raft结构体初始化时,便启动一个applyLog
协程来负责把rf.logs
同步到状态机的,在lab2B中,学生把msg发送到test给出的applyCh
来模拟命令的同步。在applyLog
中会同时监听定时通道和applyNotify
通道,前者负责每10毫秒就尝试同步,后者则在rf.commitIndex
被更新时,对应的函数最后会往applyNotify
发消息来尝试同步,在这个模式下,最后的同步是串行且幂等的,只有rf.commitIndex > rf.lastApplied
时,才会有实际的同步发生。
测试结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
numbers from left to right:
(1)real time (2)number of raft peers
(3)number of RPC sends (4)number of bytes (5)number of Raft argreements reported
Test (2B): basic agreement ...
... Passed -- 0.9 3 16 4724 3
Test (2B): RPC byte count ...
... Passed -- 1.9 3 48 115136 11
Test (2B): test progressive failure of followers ...
... Passed -- 4.9 3 255 60246 3
Test (2B): test failure of leaders ...
... Passed -- 5.5 3 551 128091 3
Test (2B): agreement after follower reconnects ...
... Passed -- 5.9 3 165 45209 8
Test (2B): no agreement if too many followers disconnect ...
... Passed -- 3.8 5 435 104065 3
Test (2B): concurrent Start()s ...
... Passed -- 1.1 3 16 4784 6
Test (2B): rejoin of partitioned leader ...
... Passed -- 5.0 3 462 111854 4
Test (2B): leader backs up quickly over incorrect follower logs ...
... Passed -- 19.7 5 3077 2622645 102
Test (2B): RPC counts aren't too high ...
... Passed -- 2.2 3 36 11252 12
PASS
ok 6.5840/raft 50.941s
real 0m51.372s
user 0m4.844s
sys 0m2.349s
|
总结
最后我们来看看代码调用的大致流程图:
Log Replication可以说是raft最核心而且最复杂的部分了,这次lab让我意识到分布式算法是真挺烧脑的,RPC调用的成功还是失败,不同RPC的返回顺序可能错乱,节点中共享变量的保护等等等等,更不用说出错了还很难debug,需要对着动则上千行的log看半天,想象可能是出现了怎样的执行顺序导致失败,给出猜想并证明,然后才能去想怎么解决。。。
真心佩服paxos,raft等算法的提出者们,居然能在混乱复杂的分布式系统交互中找到一套行之有效,且正确的规则。