Lab3目标
在顺利完成整个lab2后,我们成功实现了一个基于raft的共享日志库,它是一个日志复制框架,可以把日志可靠地复制到不同节点中,从而实现对日志序列的高可用。在之前的lab2中我们讲到,共识算法的主要作用之一是在无需第三方”arbitrator”介入的前提下,实现一组节点对一组数据达成共识,这一特性使得应用raft的日志集群十分适合用来为分布式系统提供配置相关的数据,如当前leader是谁(leader选举),微服务的真实ip(服务注册发现)等,因为它自身就避免了单点故障,不会因网络分区而引入不正确性。
而在lab3中,我们就需要用自己在lab2实现的raft library,构建一个可以被分布式系统使用的高可用键值存储。由于raft本身是基于日志的,所以我们就需要通过日志来表达键值数据,比如说,一开始数据是x=3, y=4,有一个客户发起了x=5的put请求,于是现在有x=5, y=4,便可以用这样的日志序列来表示:
1
|
PUT(x, 3) -> PUT(y, 4) -> PUT(x, 5)
|
为了不让lab过于困难,学生们需要让对kv server的所有请求都是可线性化(linearizable)的,也就是说:
- 假如每个请求(GET & PUT)一个一个的发过来,那么系统应该就像它内部只有一个状态一样处理它们,每个请求都应观察到它之前请求对这状态的影响
- 假如有并发请求,那么最终整个系统的状态应该跟某一个可能的线性执行顺序一致,通俗地讲,假如有并发请求A和B,那么最终态必须跟线性执行A→B或者B→A一致,而不能跟A执行一半,B执行,然后A执行后一半所导致的系统状态一致
要实现这个特性其实并不难:只需要把所有请求,无论是GET还是PUT,都看作一样的日志条目,写入raft就可以了,当过半数的raft节点对某一条日志(请求)达成共识,那么leader就可以执行这个请求,并发送响应给用户。
读写的可线性化实际上简化了系统设计,现实世界中的分布式kv服务很多是不会对读作可线性化要求,这样的好处是follower也可以处理客户端的读请求,而很多服务都是读多写少的场景,这样可以减轻leader节点的压力,并提高整个分布式系统的并发性能。比如zookeeper就只要求可线性化写,而不同客户端可能因为读取不同的follower而读到过期数据。
整体架构
lab3要实现的kv server简化版架构如下,复杂详细点的在这里
每个客户端需要构建一个clerk,这个clerk负责与raft的leader节点通信。每个raft节点都有一个server,但只有leader所在的server会接收所有clerk的日志RPC请求,其余则会拒绝。在接收多个请求的同时,server天然根据接收时间的不同决定一个并发请求执行的顺序,然后发送给自身的raft实例,最后raft实例通过raft算法把请求日志复制到所有其他节点中。
具体来说,我们需要实现clerk和server,clerk与server交互,server和lab2写好的raft交互。
代码实现
Clerk
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
type Clerk struct {
servers []*labrpc.ClientEnd
//...
}
func (ck *Clerk) PutAppend(key string, value string, op string) {
rm := ck.newRequest()
args := &PutAppendArgs{
Op: op,
Key: key,
Value: value,
RequestMeta: rm,
}
reply := &PutAppendReply{}
for reply.Err != OK {
reply = &PutAppendReply{}
replied := ck.servers[ck.lastLeader].Call("KVServer.PutAppend", args, reply)
if reply.Err == ErrWrongLeader || !replied {
DPrintf("%v is not the leader or failed, retrying...", ck.lastLeader)
ck.guessLeader()
}
time.Sleep(3 * time.Millisecond)
}
}
|
上面是clerk的部分代码。可以发现,客户端在实例化clerk后,可以通过ck.PutAppend()
来向kvraft发请求(get/put/append),clerk在收到请求后,会对结构体中的servers进行转发,具体来说,由于raft集群的leader可能在任意时刻改变,所以clerk需要随机选取并进行RPC请求尝试,重试直到找到leader为止,这也就是guessLeader()
做的事情:随机选取leader并记录下来,那么假如成功,则当前leader能直接被后续请求找到,失败则重新随机选一个试试。
Server
在讲解server代码之前,需要先介绍server具体是如何跟raft交互的:
- 假如需要发日志给raft,server通过
func (rf *Raft) Start(command interface{}) (int, int, bool)
来把日志作为command发送给背后的raft实例,raft假如是leader,就会把command存放在自己的logs中,然后立即返回,也就是说,server无法确定这个日志是否能达成共识并存放在raft集群中,它只能获得当前命令在raft logs中的index以及当前的raft任期号。
- 在日志达成共识后,server需要接受这些已经合法的日志,并应用到自己的kv数据库中,这一点通过一个go channel来实现,server在实例化raft的时候,把一个用来接收共识日志的go channel发给它,然后不停从这个channel接收数据,而raft则负责在apply的时候把日志发到这个channel中。
基于上述的交互,我们可以发现一个问题:当客户端发送RPC到server后,server发给raft,但server实际上并不知道这个日志什么时候会达成共识,raft什么时候会通过applyCh返回过来,也就是说,server接收客户端RPC后,这个RPC的返回时间是不确定的(lab3的实现中,RPC只有在日志在raft中成功存储后才允许返回给客户端,不能通过异步立即返回)
这就是说,对于任意一个Server中的RPC请求,其RPC函数需要在发送日志给raft之后阻塞,直到raft通过applyCh发送该日志对应的应用指令之后,函数才能返回,假如一直收不到,那么就一直不返回。
在了解这一点后,我们来看看一个RPC函数例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
cid, seq := args.RequestMeta.ClientID, args.RequestMeta.Seq
id :=uuid.New()
op := Op{
UUID: id,
Action: ActionGet,
Key: args.Key,
ClientID: cid,
Seq: seq,
}
// 通过start()发送给raft
index, _, isLeader := kv.rf.Start(op)
if !isLeader {
reply.Err = ErrWrongLeader
return
}
// 创建一个通知channel,让server在接收到对应的apply msg后调用
notifyCh := make(chan OpResult)
ok := kv.callbackChs.Append(index, notifyCh)
if !ok {
return
}
// 阻塞直到日志被raft集群接受,被server应用
newop := <-notifyCh
if newop.op.UUID == id {
reply.Value = newop.value
reply.Err = Err(newop.err)
}
}
|
在这个Get函数中,server在通过Start()
把日志发送给raft后,会建立一个通知channel,然后把这个channel注册到server中的callbackChs
数据结构中,并阻塞等待这个channel,可以想到,server会在applyCh收到“该日志的被raft集群接受,可以应用”的消息后在callbackChs
中找到这个channel,然后把日志的处理结果通过channel发送回给当前的函数,此时函数便可以返回给客户端,RPC请求结束。
值得注意的是,返回的位于index的日志应用消息不一定就是当前RPC发过去的,在日志复制失败,且其他leader当选的情况下,当前index最后确定下来的日志可能来自别的请求,所以需要做一个uuid的判断,只有确定是自己发送过去的日志,才能返回成功给客户端。
接下来我们就可以看一下server从applyCh接收到日志后,是如何进行处理得:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
func (kv *KVServer) applier() {
timer := time.NewTicker(10 * time.Millisecond)
defer timer.Stop()
lastTerm := kv.rf.GetTerm()
for {
select {
case msg := <-kv.applyCh:
op := msg.Command.(Op)
opres := OpResult{op: &op}
switch op.Action {
case ActionGet:
val, ok := kv.kvdata.Load(op.Key)
if !ok {
opres.err = ErrNoKey
} else {
opres.err = OK
opres.value = val.(string)
}
}
//Put & Append cases omitted
go kv.callbackChs.WakeAll(msg.CommandIndex, opres)
case <-timer.C:
curTerm := kv.rf.GetTerm()
if curTerm != lastTerm {
lastTerm = curTerm
go kv.callbackChs.PurgeAll()
}
case <- kv.killCh:
return
}
}
}
|
上面的applier函数会在server实例化的时候在单独一个背景goroutine执行,我们可以看到,如上面所说,server会在从applyCh接收到应用消息后,把对应的请求日志应用到自己维护的kv数据结构中(为了方便,我这里直接使用了sync.Map
),然后调用WakeAll()
函数,与此同时,定期检查背后raft节点的任期是否变更了,假如变更了,说明自己可能不再是leader,此时调用PurgeAll()
函数,把所有等待中的RPC函数释放掉,让它们重新发请求。
最后,我们来看看自定义的callbackChs
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
type callbackChMap struct {
sync.Mutex
chanMap map[int][]chan OpResult
lastCommitIndex int
}
func (cm *callbackChMap) Append(index int, ch chan OpResult) bool {
cm.Lock()
defer cm.Unlock()
if index <= cm.lastCommitIndex {
return false
}
cm.chanMap[index] = append(cm.chanMap[index], ch)
return true
}
func (cm *callbackChMap) WakeAll(index int, op OpResult) {
cm.Lock()
defer cm.Unlock()
chans := cm.chanMap[index]
cm.chanMap[index] = nil
cm.lastCommitIndex = max(index, cm.lastCommitIndex)
for _, c := range chans {
c <- op
}
}
func (cm *callbackChMap) PurgeAll() {
DPrintf("///////////////////////////KILL////////////////////////")
cm.Lock()
defer cm.Unlock()
for _, chans := range cm.chanMap {
for _, c := range chans {
c <- OpResult{err: ErrTermChanged}
}
}
cm.chanMap = make(map[int][]chan OpResult)
}
|
简单来说,当server通过applyCh收到index为x的日志的应用消息后,它便会唤醒所有等待index为x的RPC请求,让它们判断index位置最后确定下来的日志是否属于它们(实际上,现在看来,貌似对于一个raft leader来说,每个index等待的RPC请求只会有一个,可以不用[]chan,不知道当时怎么想的)
请求去重
在lab3实现kvserver的过程中,我们还需要处理请求重复的问题:
One problem you’ll face is that a Clerk may have to send an RPC multiple times until it finds a kvserver that replies positively. If a leader fails just after committing an entry to the Raft log, the Clerk may not receive a reply, and thus may re-send the request to another leader.
在上面描述的情况下,一个clerk的请求X便被raft执行了两次,这实际上也是分布式系统中,RPC调用的一个核心问题:调用方在RPC没有返回的情况下,不知道RPC到底有没有执行,或者执行到哪一步。lab3的项目网页很贴心地给出了详尽的提示,简单来说,就是以下几点:
- 每个客户端一个uuid,其每个请求一个自增序列号seq来标识每个请求的唯一性,并把这些信息一并放入raft的日志体中
- server维护一个哈希表,记录每个客户端对应的最后一次请求:
- 当收到来自该客户端的请求中seq号比最后一次请求的seq号要小,那说明是旧请求,直接返回
- 在最后一次请求处理成功后,在applier协程中,通过日志体中先前附带的uuid和seq定位哈希表中对应的客户端,并更新最后一次请求的处理结果,这样做的目的有两个,一是重复请求可以直接拿到结果,第二是假如旧leader宕机,新leader当选,原本找旧leader发请求的客户端可以直接找新leader拿到处理结果
- 只有在请求对应的日志成功被raft集群接受后,才能更新对应的哈希表,否则假如接收到请求后就立刻更新,假如该请求对应的日志由于集群宕机,后续被其他相同index的日志覆盖,那么对应的客户端重发的请求会为认为是重复的,而永远无法执行
- 有一个特殊的情况:一个重复的请求在最初的请求达成共识前到达server,那么它们都会进入raft的logs中,这就说明,logs中可能会有重复的日志。针对这种情况,server在从applyCh中接受应用消息的时候,也要对消息进行查重,重复的apply msg不被执行。
测试结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
Test: one client (3A) ...
... Passed -- 15.1 5 6314 702
Test: ops complete fast enough (3A) ...
timePerOp: 21.850518ms ... Passed -- 22.5 3 5445 0
Test: many clients (3A) ...
... Passed -- 23.2 5 10100 705
Test: unreliable net, many clients (3A) ...
... Passed -- 16.2 5 8372 868
Test: concurrent append to same key, unreliable (3A) ...
... Passed -- 1.9 3 356 52
Test: progress in majority (3A) ...
... Passed -- 0.7 5 113 2
Test: no progress in minority (3A) ...
... Passed -- 1.0 5 189 3
Test: completion after heal (3A) ...
... Passed -- 1.1 5 76 3
Test: partitions, one client (3A) ...
... Passed -- 22.7 5 6568 678
Test: partitions, many clients (3A) ...
... Passed -- 22.9 5 13417 792
Test: restarts, one client (3A) ...
... Passed -- 19.9 5 6464 782
Test: restarts, many clients (3A) ...
... Passed -- 20.7 5 8371 998
Test: unreliable net, restarts, many clients (3A) ...
... Passed -- 21.5 5 9144 865
Test: restarts, partitions, many clients (3A) ...
... Passed -- 28.2 5 12516 776
Test: unreliable net, restarts, partitions, many clients (3A) ...
... Passed -- 28.3 5 9101 679
Test: unreliable net, restarts, partitions, random keys, many clients (3A) ...
... Passed -- 33.5 7 17149 982
PASS
ok 6.5840/kvraft 280.186s
real 4m40.552s
user 9m47.979s
sys 0m20.625s
|