Contents

MIT 6.824 Lab2B笔记

Lab 子任务

在lab 2B中,学生们需要实现raft的log replication,也就是raft作为共识算法所支撑的核心功能:它让各个节点可以在各种情况下都存储同一组系列日志,且对日志的顺序与内容达成共识。这样就可以保证对这组日志的高可用,在无需第三方协调的前提下,避免单点故障。

套娃的终点

为什么这个前提重要呢?

在mit 6.824中,Robert Morris教授通过先介绍VMWare FT来引出raft所解决的问题,VMWare FT实现了虚拟机系统层面的主从复制,简单来说就是在把发往primary的所有指令都复制到backup来实现复制,在primary宕机时,backup “goes live"来对外服务。但是在network partition的情况下,backup可能会误认为primary宕机,需要升级成新的primary,这样就出现了经典的脑裂问题,此时就需要一个提供test and set服务的第三方作为“arbitrator”来判断二者的状态。

然而,这个第三方又会成为潜在的单点故障,需要用多节点来保证高可用,此时又引入了新的脑裂问题。。。

可见这是一个套娃问题,而raft,以及在它之前的paxos、viewstamp等,就是用于终结这个套娃的共识算法,它们可以使集群内的节点自己相互协调,在network partition的情况下避免多个master的问题。

Log Replication

raft的核心功能是维护一组高可用的日志,使用raft的应用需要是一个状态机,每个节点中的raft内的日志内容和顺序相同,那么当各节点的应用按照一样的日志以一样的顺序apply到自己的状态机时,就能到达完全一样的状态。

在raft中,每一条日志有两个关键字段,index和term,当然,也需要记录日志本身应用相关的内容。index和term两个属性共同维护了一份日志的唯一性,假如一个日志term和index相同,那么它们便是同一份日志的复制。这一性质由log replication的机制保证,又反过来被用于判断节点间每份log到底是不是同一份log。

上图描述了5个节点中日志可能存在的情况,其中,方块里面的数字是term,最上方标记了日志所属的index。可以发现,上图中,由于网络错误或者节点宕机,不同节点存有不同的日志,而raft的log replication机制可以保证,在一段时间后,互通的节点会最终恢复成统一的日志序列,接下来,我们看看lab 2B需要学生实现的代码。

代码实现

为了简洁,下面的代码会有省略的部分,特别是共享变量的加锁解锁。

start函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
func (rf *Raft) Start(command interface{}) (int, int, bool) {
	term, isLeader := rf.GetState()
	index := -1
	if !isLeader {
		return index, term, isLeader
	}
	rf.logMu.Lock()
	index = len(rf.logs)
	rf.logs = append(rf.logs, &CommandLog{
		Command: command,
		Index:   index,
		Term:    term,
	})
	rf.logMu.Unlock()
	// broadcast appendEntries RPC
	rf.resetLastRPCtime()
	return index, term, isLeader
}

首先来看看start函数,这是客户端应用向raft leader提交新日志的入口,在lab2B的test中,test通过不断调用start写入日志,并模拟节点掉线,恢复等情况,来测试学生的raft实现是否正确。在我的版本中,start函数收到command后,简单的写入rf.logs中,然后重置所有follower节点的lastRPC,这个lastRPC用来判断leader是否需要发送空的AppendEntries RPC来维持leader地位,重置后,在下一轮心跳时,leader便可以会带上这个新的日志。

runLeader与periodicAppendEntries

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (rf *Raft) runLeader() {
	term := rf.GetTerm()
	state := rf._GetState()
	for i := 0; i < len(rf.peers); i++ {
		if i == rf.me {
			continue
		}
		go rf.periodicAppendEntries(i, term)
	}
	for state == leader {
		select {
		case r := <-rf.rpcChan:
			rf.handleRPC(r)
		}
		state = rf._GetState()
	}
}

func (rf *Raft) periodicAppendEntries(server int, term int) {
	for rf._GetState() == leader && rf.GetTerm() == term {
		if time.Since(rf.lastRPCtime[server]) > heartbeatInterval {
			go rf.sendLogEntry(server, term)
		}
		time.Sleep(10 * time.Millisecond)
	}
}

runLeader对应上一次lab的runCandidaterunFollower,是leader节点的main loop,可以发现,在函数启动时,leader会给每个follower启动一个专门发送日志与心跳的协程periodicAppendEntries,每隔10毫秒判断向该节点发送的上一次RPC的时间间隔来判断是否发送新的AppendEntries RPC

sendLogEntry

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func (rf *Raft) sendLogEntry(server int, term int) {
	//last check isLeader, may caught up with higher term but was here already
	if rf._GetState() != leader || rf.GetTerm() != term {
		return
	}
	// find entry to send based on nextindex[server]
	req := rf.prepareAppendEntries(server)
	reply := &AppendEntriesReply{}
	
    // do actual RPC
	if rf.sendAppendEntries(server, req, reply) {
        if rf.maybeCatchup(reply.Term) {
            return
        }
		if reply.Success {
			rf.matchIndex[server] = max(rf.matchIndex[server], req.PrevLogIndex + len(req.Logs))
			rf.nextIndex[server] = max(rf.nextIndex[server], req.PrevLogIndex + len(req.Logs) + 1)
			// check if update commit index of leader
			rf.maybeUpdateCommitIndex()
			// if nextIndex < latest, do another sendLogEntry
			if rf.nextIndex[server] <= rf.LastLog().Index {
				go rf.sendLogEntry(server, term)
			}
		} else {
            // optimized way to update next index based on rpc reply info
            rf.updateNextIndex(server, reply.Xterm, reply.Xindex, reply.Xlen)
			// retry immediately
			go rf.sendLogEntry(server, term)
		}
	}
	rf.lastRPCtime[server] = time.Now()
}

sendLogEntry就是向一个follower节点的一次rpc啦,我们首先看看rf.prepareAppendEntries(server)中返回的req结构体有什么字段:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
type AppendEntriesArgs struct {
	// 2A
	Term     int
	LeaderID int

	// 2B
	PrevLogIndex int // 当前logs中第一个log的上一个log的index
	PrevLogTerm  int // 当前logs中第一个log的上一个log的term
	Logs         []CommandLog
	LeaderCommit int // 当前leader中已经commit的log中最后一个的index
}

此时,我们从leader和follower交替两边看看整个追加的流程:

  1. 【leader】创建append entries rpc的请求体,在这一步,leader会根据nextindex数组定位该follower日志的最后一个index,然后把自身日志中index以及之后的所有日志作为一个数组发给它,假如这是一次心跳,则发送空的日志数组,其中请求体还需要说明这个日志之前的日志的term和index,用来给follower判断是否接受将要收到的日志
  2. 【leader】进行AppendEntries RPC请求
  3. 【follower】收到RPC请求,按照原论文给出的规则判断是否接受这次日志追加:
    1. 规则是,假如存在index为PrevLogIndex的日志,且该任期等于PrevLogTerm,则接受该日志,把PrevLogIndex之后所有的日志清除(可能存在与leader冲突的无效日志),然后把Logs追加到自身日志数组的末端
    2. 假如不存在PrevLogIndex位置的日志,或者日志的term不等于PrevLogTerm,则拒绝此次请求,并返回冲突日志的term和index,以及自己整个日志的长度
  4. 【leader】收到RPC响应,假如响应中对面发来更新的term,则放弃后续处理,进行catch up
  5. 【leader】根据RPC结果执行状态更新:
    1. 假如RPC成功,则说明follower把PrevLogTerm之后的所有日志都同步了,此时修改对应的nextIndexmatchIndex,并在maybeUpdateCommitIndex中判断是否可以更新新的commitIndex
    2. 假如RPC失败,则需要把nextIndex往回移动,在更老的index中尝试寻找最后相同的log,再进行重试,在本次lab中,使用到了lecture中解释原论文提到的优化,也就是每个任期跳跃一次进行寻找,避免一次rpc后退一个index来查找,具体的细节可以参考原论文与这次lecture25分开始的部分

这便是一次AppendEntries日志交换的过程,raft通过这样往复多次交换来使得leader的日志同步到follower中,覆盖失效的日志(比如前任leader没有成功复制到一半以上的部分日志),最终确保集群内整个日志序列的一致性。

日志同步到状态机

最后我们再来看看rf.logs是如何被同步到状态机的:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func (rf *Raft) applyLog() {
	timer := time.NewTicker(10 * time.Millisecond)
	defer timer.Stop()
	for {
		select {
		case <-timer.C:
			rf.doApply()
		case <-rf.applyNotify:
			rf.doApply()
		}
	}
}

func (rf *Raft) doApply() {
	for rf.commitIndex > rf.lastApplied {
		rf.lastApplied++
		command := rf.logs[rf.lastApplied].Command
		msg := ApplyMsg{
			CommandValid: true,
			Command:      command,
			CommandIndex: rf.lastApplied,
		}
		rf.applyCh <- msg
	}
}

func Make(peers []*labrpc.ClientEnd, me int,
	persister *Persister, applyCh chan ApplyMsg) *Raft {
 	//...
    go rf.run()
	go rf.applyLog()
}

简单来说,在raft结构体初始化时,便启动一个applyLog协程来负责把rf.logs同步到状态机的,在lab2B中,学生把msg发送到test给出的applyCh来模拟命令的同步。在applyLog中会同时监听定时通道和applyNotify通道,前者负责每10毫秒就尝试同步,后者则在rf.commitIndex被更新时,对应的函数最后会往applyNotify发消息来尝试同步,在这个模式下,最后的同步是串行且幂等的,只有rf.commitIndex > rf.lastApplied时,才会有实际的同步发生。

测试结果

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
numbers from left to right:
(1)real time (2)number of raft peers 
(3)number of RPC sends (4)number of bytes (5)number of Raft argreements reported

Test (2B): basic agreement ...
  ... Passed --   0.9  3   16    4724    3
Test (2B): RPC byte count ...
  ... Passed --   1.9  3   48  115136   11
Test (2B): test progressive failure of followers ...
  ... Passed --   4.9  3  255   60246    3
Test (2B): test failure of leaders ...
  ... Passed --   5.5  3  551  128091    3
Test (2B): agreement after follower reconnects ...
  ... Passed --   5.9  3  165   45209    8
Test (2B): no agreement if too many followers disconnect ...
  ... Passed --   3.8  5  435  104065    3
Test (2B): concurrent Start()s ...
  ... Passed --   1.1  3   16    4784    6
Test (2B): rejoin of partitioned leader ...
  ... Passed --   5.0  3  462  111854    4
Test (2B): leader backs up quickly over incorrect follower logs ...
  ... Passed --  19.7  5 3077 2622645  102
Test (2B): RPC counts aren't too high ...
  ... Passed --   2.2  3   36   11252   12
PASS
ok      6.5840/raft     50.941s

real    0m51.372s
user    0m4.844s
sys     0m2.349s

总结

最后我们来看看代码调用的大致流程图:

Log Replication可以说是raft最核心而且最复杂的部分了,这次lab让我意识到分布式算法是真挺烧脑的,RPC调用的成功还是失败,不同RPC的返回顺序可能错乱,节点中共享变量的保护等等等等,更不用说出错了还很难debug,需要对着动则上千行的log看半天,想象可能是出现了怎样的执行顺序导致失败,给出猜想并证明,然后才能去想怎么解决。。。

真心佩服paxos,raft等算法的提出者们,居然能在混乱复杂的分布式系统交互中找到一套行之有效,且正确的规则。