Contents

MIT 6.824 Lab2A笔记

前言

最近在学习Raft共识算法,为了更好的沉淀,本来就打算写点笔记,然而分布式算法本质上是一套系统间不同组件交互的规则集合,没有固定的输入和期望输出,一时间竟找不到一个合适的顺序和线索来系统地展开,那既然我是通过mit 6.824和它的lab作为主要学习材料的,那就按照Lab的顺序来记录吧。

Raft 简单介绍

原论文地址

论文中对raft以及共识算法的定义十分简洁准确:

Raft is a consensus algorithm for managing a replicated log.

Raft是一种用于在不同节点间维护同一份日志的强一致性共识算法,那什么是共识算法呢?

Consensus algorithms allow a collection of machines to work as a coherent group that can survive the failures of some of its members.

共识算法让若干个机器像一个群组一样互相协作、对外服务,并且确保在部分机器宕机时整个集群还能对外正确服务。

具体来说,Raft把机器分为多个follower和一个leader,leader负责接受客户端的写入请求,并把日志分派給各个follower,而follower则负责接受读取请求,客户端可以选择不同follower机器进行读取。

Lab 子任务

在6.824 Lab2系列中,学生要把raft一步步实现出来,然后在后面的lab中使用它来实现一个分布式kv存储系统。原论文把Raft分解成三个子问题:leader election、log replication和safety。

在lab 2A中,学生们需要把leader election实现出来,这也是raft集群运行的第一步。

它解决的子问题是,谁来为机器分配leader和follower,如何分配?在leader宕机的情况下,如何重新分配呢?Raft给出的答案是,节点们任何情况下都可以自己选出新的leader,无须人工操作。为了实现这一目标,Raft引入election机制和candidate角色。

Term

在介绍leader election之前,需要先解释一个十分重要的概念:任期(Term)。

  • Raft把集群运行的时间分为一块一块任期,用连续的整数命名
  • 每个节点都记住当前自己所在的term,向其他节点发请求时,把该term带上
  • 每个任期开始时进行选举,假如选举失败,比如说平票或者没人当选,则立即开始下一个任期
  • 节点间的所有请求都需要带上term,当一个节点收到的请求中的term:
    • 比节点自己的term更大时,节点把自己更新到该term,并把自己设置回follower状态
    • 比节点自己的term更小时,拒绝该请求

(任期还有其他重要的用途,留到后续的lab再解释)

Leader Election

先来看看原论文中的状态图,基本描述了选举的全过程:

  • 每个节点一开始时都是follower状态,leader通过向follower发送heartbeat来维护其leader地位
  • 当节点在一定时间内没有收到heartbeat,就假设当前无leader,发起election:
    • 把自己的term加一
    • 把自己设置为candidate
    • 向其他节点发送requestVote的RPC请求,每个节点在自己的当前任期内,只能投票一次
  • 此时在选举期间,candidate可能遇到的情况有:
    • 获得过半数的选票,当选leader
    • 其他节点当选leader,并发送heartbeat给此candidate,此时假如term大于等于此candidate的term,那么就把自己设为follower,停止选举,否则无视这个heartbeat
    • 在election timeout时间后仍没有获得足够选票,此时term加一,开始下一轮选举
  • leader heartbeat实际上就是leader发送给follower追加客户端日志写入的请求,不过没有日志数据

代码实现

在lab中,课程给出的go test文件会调用make函数生成raft struct来模拟一个raft节点,然后再创建一个用go实现的虚拟网络来承载节点间的RPC通讯,最后执行一系列测试看raft集群在各种情况下是否按照论文给出的模式正确运行。本来已经写出跑了1000遍都没问题的版本了,但用了很多锁,且逻辑有点混乱,可以再简洁一点,于是在网上参考了下开源的raft实现,又重新写了一遍,后续估计也会沿用这个框架来做后面的lab了,比之前的优雅不少,且避免了很多节点内多协程数据竞争的问题。

Raft结构体,make函数与run函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// A Go object implementing a single Raft peer.
type Raft struct {
	mu        sync.Mutex          // Lock to protect shared access to this peer's state
	peers     []*labrpc.ClientEnd // RPC end points of all peers
	persister *Persister          // Object to hold this peer's persisted state
	me        int                 // this peer's index into peers[]
	dead      int32               // set by Kill()

	// 2A
	lastRPC         time.Time
	electionTimeout time.Duration
	term            int
	state           int
	votedFor        int
}

func Make(peers []*labrpc.ClientEnd, me int,
	persister *Persister, applyCh chan ApplyMsg) *Raft {
	rf := &Raft{}
	rf.peers = peers
	rf.me = me
    
	// 2A
	rf.electionTimeout = randomTimeout()
	rf.state = follower
	rf.votedFor = -1
	rf.term = 0
	rf.rpcChan = make(chan rpcCall)
    
	go rf.run()
	return rf
}

可以发现,主循环在run()函数里面,这里做基本的初始化,值得注意的是,lastRPC是上次节点收到RPC调用的时间,用来计算节点是否经过了electionTimeout时间,需要发起新的选举,下面便是run函数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
func (rf *Raft) run() {
	for {
		if rf.killed() {
			return
		}
		_, state := rf._GetState()
		switch state {
		case candidate:
			rf.runCandidate()
		case follower:
			rf.runFollower()
		case leader:
			rf.runLeader()
		}
	}
}

runCandidate循环

可以看到节点的每种state都是一个循环,那么这就意味着state之间假如要转移,只需要从循环中退出就可以了,比如说这是runCandidate()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (rf *Raft) runCandidate() {
	voteCh := rf.startElection()
	electionTimer := randomTimeoutCh()
	votes := 1
	term, state := rf._GetState()
	maxTerm := term
	for state == candidate {
		select {
		case <-electionTimer:
			return
		case v := <-voteCh:
			if v.Term > maxTerm {
				maxTerm = v.Term
				rf.maybeCatchup(maxTerm)
				return
			}
			if v.VoteGranted {
				votes++
			}
			if votes > len(rf.peers)/2 && rf.state == candidate {
				rf.mu.Lock()
				rf.state = leader
				rf.mu.Unlock()
				return
			}
		case r := <-rf.rpcChan:
			rf.handleRPC(r)
		}
		_, state = rf._GetState()
	}
}

每个状态主循环中,有一个核心的for select循环来处理节点的事件,然后每次for结束后,重新判断自己是否还是该state,这样就减少了不同线程事件数据并发引起错误的问题,不过估计性能没有纯多线程好就是了。可以看到,每次进入candidate状态时,都会发起新一轮的选举,并且通过channel来监听选举超时以及requestVote RPC的返回,假如票数过半了,就转变为leader状态,然后退出,这样在节点主循环的下一次运行,便会进入leader的循环。

这里还有一个常用的函数maybeCatchup(term int),是对于raft中一个规则的封装:

If RPC request or response contains term T > currentTerm: set currentTerm = T, convert to follower

最后再看看选举函数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

func (rf *Raft) startElection() <-chan *RequestVoteReply {

	rf.mu.Lock()
	rf.state = candidate
	rf.term++
	rf.votedFor = rf.me
	term := rf.term
	rf.lastRPC = time.Now()
	rf.electionTimeout = randomTimeout()
	rf.mu.Unlock()

	voteCh := make(chan *RequestVoteReply)

	for i, _ := range rf.peers {
		if i == rf.me {
			continue
		}
		go func(server int) {
			req := &RequestVoteArgs{Term: term, CandidateId: rf.me}
			reply := &RequestVoteReply{}
			rf.sendRequestVote(server, req, reply)
			voteCh <- reply
		}(i)
	}

	return voteCh
}

由于raft本身就考虑了因网络不可靠而引起的重复rpc或者延迟rpc的情况,所以在发起requestVote RPC的时候,直接使用goroutine并行发送就可以了,假如term更高的节点收到了,会直接忽略。

requestVote RPC

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
	ch := make(chan struct{})
	rf.rpcChan <- rpcCall{
		rpcName: "RequestVote",
		req:     args,
		resp:    reply,
		doneCh:  ch,
	}
	// wait until rpc call finished
	<-ch
}

func (rf *Raft) handleRPC(r rpcCall) {
	switch r.rpcName {
	case "RequestVote":
		rf._RequestVote(r.req.(*RequestVoteArgs), r.resp.(*RequestVoteReply))
		r.doneCh <- struct{}{}
	case "AppendEntries":
		rf._AppendEntries(r.req.(*AppendEntriesArgs), r.resp.(*AppendEntriesReply))
		r.doneCh <- struct{}{}
	}
}

func (rf *Raft) _RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
	rf.maybeCatchup(args.Term)
	term, _ := rf.GetState()
	reply.Term = term
	if term > args.Term {
		reply.VoteGranted = false
		return
	}
	rf.mu.Lock()
	if rf.votedFor == -1 || rf.votedFor == args.CandidateId {
		rf.lastRPC = time.Now()
		reply.VoteGranted = true
		rf.votedFor = args.CandidateId
	}
	rf.mu.Unlock()
}

可以发现,这里使用了一个handleRPC作为函数mux,而rpc函数则负责构造requestVote的函数请求体,并发送到raft.rpcChan里面,最后监听doneCh的事件再返回,这样做主要是等待处理函数把结果更新到reply对象里面再从rpc函数返回,否则调用方无法确定传入的reply结构体是否已经被更新,可以读结果。

总结

runFollowerrunLeader就不详细说了,后面还需要实现日志追加的大量逻辑,但主体结构跟runCandidate差不多,都是一个for select循环。在原本lab的skeleton code里面,是给出一个ticker函数来让你实现for循环并且sleep来决定是否需要发起新选举的,这种写法就导致了一个raft里面会有很多不知道啥时候会发生的go协程,在任意时刻,可能ticker刚好唤醒,也可能别的节点的rpc调用到来,生成新的go协程,这就导致了raft的关键字段,比如state和term等不知道什么时候会被改变,在代码逻辑里面常常需要加锁并判断。

于是在这个实现中,通过for select和rpc channel来使事件的发生尽量串行化,这样显然没有让go协程随意生成更能利用多核,但代码的编写和可读性都好太多,完全是值得的。

无论如何,在debug这个lab的过程中,还是获得了不少并发编程经验的。我大致总结了一个规则:在调用或者写入共享变量的时候,以该代码行为中心,看看前后一定范围内的代码逻辑,假如运行中途有别的线程同时在运行会发生什么现象,举个例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
func (rf *Raft) startElection() {
	//...
	if rf.maybeCatchup(maxTerm) {
		return
	}

	rf.mu.Lock()
	if votes > len(rf.peers)/2 && rf.state == candidate && maxTerm == term {
		rf.state = leader
		go rf.sendHeartbeats(rf.term)
	}
	rf.mu.Unlock()
}

在一开始实现的版本中,这里是没有加锁的,随时都可能发生term更新和state被设置为follower。那么在这段代码运行到判断完票数过半,进入更新state为leader之前,就可能有别的节点发到这个节点的term更高的rpc调用,然后在该调用的协程里面进行maybeCatchup(),更新term并把state改为follower,然后当前的rf.state = leader才运行,这样便导致此节点成为更大的term的另一个leader,出现了脑裂现象。