Contents

MIT 6.824 Lab3A笔记

Lab3目标

在顺利完成整个lab2后,我们成功实现了一个基于raft的共享日志库,它是一个日志复制框架,可以把日志可靠地复制到不同节点中,从而实现对日志序列的高可用。在之前的lab2中我们讲到,共识算法的主要作用之一是在无需第三方”arbitrator”介入的前提下,实现一组节点对一组数据达成共识,这一特性使得应用raft的日志集群十分适合用来为分布式系统提供配置相关的数据,如当前leader是谁(leader选举),微服务的真实ip(服务注册发现)等,因为它自身就避免了单点故障,不会因网络分区而引入不正确性。

而在lab3中,我们就需要用自己在lab2实现的raft library,构建一个可以被分布式系统使用的高可用键值存储。由于raft本身是基于日志的,所以我们就需要通过日志来表达键值数据,比如说,一开始数据是x=3, y=4,有一个客户发起了x=5的put请求,于是现在有x=5, y=4,便可以用这样的日志序列来表示:

1
PUT(x, 3) -> PUT(y, 4) -> PUT(x, 5)

为了不让lab过于困难,学生们需要让对kv server的所有请求都是可线性化(linearizable)的,也就是说:

  • 假如每个请求(GET & PUT)一个一个的发过来,那么系统应该就像它内部只有一个状态一样处理它们,每个请求都应观察到它之前请求对这状态的影响
  • 假如有并发请求,那么最终整个系统的状态应该跟某一个可能的线性执行顺序一致,通俗地讲,假如有并发请求A和B,那么最终态必须跟线性执行A→B或者B→A一致,而不能跟A执行一半,B执行,然后A执行后一半所导致的系统状态一致

要实现这个特性其实并不难:只需要把所有请求,无论是GET还是PUT,都看作一样的日志条目,写入raft就可以了,当过半数的raft节点对某一条日志(请求)达成共识,那么leader就可以执行这个请求,并发送响应给用户。

读写的可线性化实际上简化了系统设计,现实世界中的分布式kv服务很多是不会对读作可线性化要求,这样的好处是follower也可以处理客户端的读请求,而很多服务都是读多写少的场景,这样可以减轻leader节点的压力,并提高整个分布式系统的并发性能。比如zookeeper就只要求可线性化写,而不同客户端可能因为读取不同的follower而读到过期数据。

整体架构

lab3要实现的kv server简化版架构如下,复杂详细点的在这里

每个客户端需要构建一个clerk,这个clerk负责与raft的leader节点通信。每个raft节点都有一个server,但只有leader所在的server会接收所有clerk的日志RPC请求,其余则会拒绝。在接收多个请求的同时,server天然根据接收时间的不同决定一个并发请求执行的顺序,然后发送给自身的raft实例,最后raft实例通过raft算法把请求日志复制到所有其他节点中。

具体来说,我们需要实现clerk和server,clerk与server交互,server和lab2写好的raft交互。

代码实现

Clerk

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
type Clerk struct {
	servers []*labrpc.ClientEnd
	//...
}

func (ck *Clerk) PutAppend(key string, value string, op string) {
	rm := ck.newRequest()
	args := &PutAppendArgs{
		Op: op,
		Key: key,
		Value: value,
		RequestMeta: rm,
	}
	reply := &PutAppendReply{}
	for reply.Err != OK {
		reply = &PutAppendReply{}
		replied := ck.servers[ck.lastLeader].Call("KVServer.PutAppend", args, reply)
		if reply.Err == ErrWrongLeader || !replied {
			DPrintf("%v is not the leader or failed, retrying...", ck.lastLeader)
			ck.guessLeader()
		}
		time.Sleep(3 * time.Millisecond)
	}
}

上面是clerk的部分代码。可以发现,客户端在实例化clerk后,可以通过ck.PutAppend()来向kvraft发请求(get/put/append),clerk在收到请求后,会对结构体中的servers进行转发,具体来说,由于raft集群的leader可能在任意时刻改变,所以clerk需要随机选取并进行RPC请求尝试,重试直到找到leader为止,这也就是guessLeader()做的事情:随机选取leader并记录下来,那么假如成功,则当前leader能直接被后续请求找到,失败则重新随机选一个试试。

Server

在讲解server代码之前,需要先介绍server具体是如何跟raft交互的:

  • 假如需要发日志给raft,server通过func (rf *Raft) Start(command interface{}) (int, int, bool) 来把日志作为command发送给背后的raft实例,raft假如是leader,就会把command存放在自己的logs中,然后立即返回,也就是说,server无法确定这个日志是否能达成共识并存放在raft集群中,它只能获得当前命令在raft logs中的index以及当前的raft任期号。
  • 在日志达成共识后,server需要接受这些已经合法的日志,并应用到自己的kv数据库中,这一点通过一个go channel来实现,server在实例化raft的时候,把一个用来接收共识日志的go channel发给它,然后不停从这个channel接收数据,而raft则负责在apply的时候把日志发到这个channel中。

基于上述的交互,我们可以发现一个问题:当客户端发送RPC到server后,server发给raft,但server实际上并不知道这个日志什么时候会达成共识,raft什么时候会通过applyCh返回过来,也就是说,server接收客户端RPC后,这个RPC的返回时间是不确定的(lab3的实现中,RPC只有在日志在raft中成功存储后才允许返回给客户端,不能通过异步立即返回)

这就是说,对于任意一个Server中的RPC请求,其RPC函数需要在发送日志给raft之后阻塞,直到raft通过applyCh发送该日志对应的应用指令之后,函数才能返回,假如一直收不到,那么就一直不返回。

在了解这一点后,我们来看看一个RPC函数例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
	cid, seq := args.RequestMeta.ClientID, args.RequestMeta.Seq
	id :=uuid.New()
	op := Op{
		UUID: id,
		Action: ActionGet,
		Key: args.Key,
		ClientID: cid,
		Seq: seq,
	}
	// 通过start()发送给raft
	index, _, isLeader := kv.rf.Start(op)
	if !isLeader {
		reply.Err = ErrWrongLeader
		return
	}
	// 创建一个通知channel,让server在接收到对应的apply msg后调用
	notifyCh := make(chan OpResult)
	ok := kv.callbackChs.Append(index, notifyCh)
	if !ok {
		return
	}
	// 阻塞直到日志被raft集群接受,被server应用
	newop := <-notifyCh
	if newop.op.UUID == id {
		reply.Value = newop.value
		reply.Err = Err(newop.err)
	}
}

在这个Get函数中,server在通过Start()把日志发送给raft后,会建立一个通知channel,然后把这个channel注册到server中的callbackChs数据结构中,并阻塞等待这个channel,可以想到,server会在applyCh收到“该日志的被raft集群接受,可以应用”的消息后在callbackChs中找到这个channel,然后把日志的处理结果通过channel发送回给当前的函数,此时函数便可以返回给客户端,RPC请求结束。

值得注意的是,返回的位于index的日志应用消息不一定就是当前RPC发过去的,在日志复制失败,且其他leader当选的情况下,当前index最后确定下来的日志可能来自别的请求,所以需要做一个uuid的判断,只有确定是自己发送过去的日志,才能返回成功给客户端。

接下来我们就可以看一下server从applyCh接收到日志后,是如何进行处理得:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func (kv *KVServer) applier() {
	timer := time.NewTicker(10 * time.Millisecond)
	defer timer.Stop()
	lastTerm := kv.rf.GetTerm()
	for {
		select {
		case msg := <-kv.applyCh:
				op := msg.Command.(Op)
				opres := OpResult{op: &op}
				switch op.Action {
				case ActionGet:
					val, ok := kv.kvdata.Load(op.Key)
					if !ok {
						opres.err = ErrNoKey
					} else {
						opres.err = OK
						opres.value = val.(string)
					}
				}
				//Put & Append cases omitted
				go kv.callbackChs.WakeAll(msg.CommandIndex, opres)
		case <-timer.C:
			curTerm := kv.rf.GetTerm()
			if curTerm != lastTerm {
				lastTerm = curTerm
				go kv.callbackChs.PurgeAll()
			}
		case <- kv.killCh:
			return
		}
	}
}

上面的applier函数会在server实例化的时候在单独一个背景goroutine执行,我们可以看到,如上面所说,server会在从applyCh接收到应用消息后,把对应的请求日志应用到自己维护的kv数据结构中(为了方便,我这里直接使用了sync.Map),然后调用WakeAll()函数,与此同时,定期检查背后raft节点的任期是否变更了,假如变更了,说明自己可能不再是leader,此时调用PurgeAll()函数,把所有等待中的RPC函数释放掉,让它们重新发请求。

最后,我们来看看自定义的callbackChs

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
type callbackChMap struct {
	sync.Mutex
	chanMap map[int][]chan OpResult
	lastCommitIndex int
}

func (cm *callbackChMap) Append(index int, ch chan OpResult) bool {
	cm.Lock()
	defer cm.Unlock()
	if index <= cm.lastCommitIndex {
		return false
	}
	cm.chanMap[index] = append(cm.chanMap[index], ch)
	return true
}
func (cm *callbackChMap) WakeAll(index int, op OpResult) {
	cm.Lock()
	defer cm.Unlock()
	chans := cm.chanMap[index]
	cm.chanMap[index] = nil
	cm.lastCommitIndex = max(index, cm.lastCommitIndex)
	for _, c := range chans {
		c <- op
	}
}
func (cm *callbackChMap) PurgeAll() {
	DPrintf("///////////////////////////KILL////////////////////////")
	cm.Lock()
	defer cm.Unlock()
	for _, chans := range cm.chanMap {
		for _, c := range chans {
			c <- OpResult{err: ErrTermChanged}
		}
	}
	cm.chanMap = make(map[int][]chan OpResult)
}

简单来说,当server通过applyCh收到index为x的日志的应用消息后,它便会唤醒所有等待index为x的RPC请求,让它们判断index位置最后确定下来的日志是否属于它们(实际上,现在看来,貌似对于一个raft leader来说,每个index等待的RPC请求只会有一个,可以不用[]chan,不知道当时怎么想的)

请求去重

在lab3实现kvserver的过程中,我们还需要处理请求重复的问题:

One problem you’ll face is that a Clerk may have to send an RPC multiple times until it finds a kvserver that replies positively. If a leader fails just after committing an entry to the Raft log, the Clerk may not receive a reply, and thus may re-send the request to another leader.

在上面描述的情况下,一个clerk的请求X便被raft执行了两次,这实际上也是分布式系统中,RPC调用的一个核心问题:调用方在RPC没有返回的情况下,不知道RPC到底有没有执行,或者执行到哪一步。lab3的项目网页很贴心地给出了详尽的提示,简单来说,就是以下几点:

  • 每个客户端一个uuid,其每个请求一个自增序列号seq来标识每个请求的唯一性,并把这些信息一并放入raft的日志体中
  • server维护一个哈希表,记录每个客户端对应的最后一次请求:
    • 当收到来自该客户端的请求中seq号比最后一次请求的seq号要小,那说明是旧请求,直接返回
    • 在最后一次请求处理成功后,在applier协程中,通过日志体中先前附带的uuid和seq定位哈希表中对应的客户端,并更新最后一次请求的处理结果,这样做的目的有两个,一是重复请求可以直接拿到结果,第二是假如旧leader宕机,新leader当选,原本找旧leader发请求的客户端可以直接找新leader拿到处理结果
  • 只有在请求对应的日志成功被raft集群接受后,才能更新对应的哈希表,否则假如接收到请求后就立刻更新,假如该请求对应的日志由于集群宕机,后续被其他相同index的日志覆盖,那么对应的客户端重发的请求会为认为是重复的,而永远无法执行
  • 有一个特殊的情况:一个重复的请求在最初的请求达成共识前到达server,那么它们都会进入raft的logs中,这就说明,logs中可能会有重复的日志。针对这种情况,server在从applyCh中接受应用消息的时候,也要对消息进行查重,重复的apply msg不被执行。

测试结果

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
Test: one client (3A) ...
  ... Passed --  15.1  5  6314  702
Test: ops complete fast enough (3A) ...
timePerOp: 21.850518ms  ... Passed --  22.5  3  5445    0
Test: many clients (3A) ...
  ... Passed --  23.2  5 10100  705
Test: unreliable net, many clients (3A) ...
  ... Passed --  16.2  5  8372  868
Test: concurrent append to same key, unreliable (3A) ...
  ... Passed --   1.9  3   356   52
Test: progress in majority (3A) ...
  ... Passed --   0.7  5   113    2
Test: no progress in minority (3A) ...
  ... Passed --   1.0  5   189    3
Test: completion after heal (3A) ...
  ... Passed --   1.1  5    76    3
Test: partitions, one client (3A) ...
  ... Passed --  22.7  5  6568  678
Test: partitions, many clients (3A) ...
  ... Passed --  22.9  5 13417  792
Test: restarts, one client (3A) ...
  ... Passed --  19.9  5  6464  782
Test: restarts, many clients (3A) ...
  ... Passed --  20.7  5  8371  998
Test: unreliable net, restarts, many clients (3A) ...
  ... Passed --  21.5  5  9144  865
Test: restarts, partitions, many clients (3A) ...
  ... Passed --  28.2  5 12516  776
Test: unreliable net, restarts, partitions, many clients (3A) ...
  ... Passed --  28.3  5  9101  679
Test: unreliable net, restarts, partitions, random keys, many clients (3A) ...
  ... Passed --  33.5  7 17149  982
PASS
ok      6.5840/kvraft   280.186s

real    4m40.552s
user    9m47.979s
sys     0m20.625s