最近参考一个github项目,写了一个类似的命令行聊天室,客户端与服务端基于grpc的Bidirectional streaming进行通信,背后的实现比较有趣,在这里记录一下。
项目效果如下:
项目地址:
https://github.com/midknight24/term-chat
gRPC与双向流模式
gRPC是基于http/2的RPC框架,默认是用protocol buffers作为通信序列化器,使用者用其定义好rpc接口,然后用官方提供的protoc工具生成对应的接口代码,并实现具体的调用函数,便可以使用生成的server和client stub进行rpc通信,这是来自官网的跨语言调用流程图:
gRPC提供四种调用模式:
- simple RPC
- server-side streaming
- client-side streaming
- bidirectional streaming
其中,term-chat使用bidirectional streaming,双向流模式,进行客户端与服务端的消息通信,这是term-chat中具体的rpc函数定义,其中函数的名字定为Stream:
1
|
rpc Stream (stream StreamMessage) returns (stream StreamMessage) {}
|
在双向流模式中,客户端和服务端可以同时发送消息给对方,gRPC会保证每个方向下消息的有序性。
具体到go语言的实现上,在客户端方面,protoc会在client api接口中添加一个stream方法,方法的签名是这样的:
1
2
3
|
type ChatRoomClient interface {
Stream(ctx context.Context, opts ...grpc.CallOption) (ChatRoom_StreamClient, error)
}
|
在客户端中,开发者调用Stream来获得一个专门进行Stream这个函数的rpc调用的对象,在程序中通过Send(msg)
和Recv()
来进行通讯,比如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
streamClient, err := c.ChatRoomClient.Stream(ctx)
// recv stream from server
for {
msg, err := streamClient.Recv()
if err == io.EOF {
break
}
}
//send stream to server
for msg := range input_chan {
streamClient.Send(msg)
}
|
而在服务端方面,则会在server接口中添加一个开发者需要实现的Stream方法,方法的签名是这样的:
1
2
3
4
|
type ChatRoomServer interface {
Stream(ChatRoom_StreamServer) error
mustEmbedUnimplementedChatRoomServer()
}
|
在Stream方法中,grpc会传入一个ChatRoom_StreamServer
对象,其作用跟客户端的streamClient类似,开发者通过对其调用Send(msg)
和Recv()
方法来与客户端进行通讯:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
func (srv *ChatRoomServer) Stream(ssrv ChatRoom_StreamServer) error {
// send msg to client
for msg := range broadcast_chan {
ssrv.Send(msg)
}
// recv stream from client
for {
msg, err := ssrv.Recv()
if err == io.EOF {
break
}
}
}
|
通讯流程
接下来具体聊一下如何使用双向流模式实现聊天室的通讯,这是一个聊天室应用,那就意味着客户端a的消息发送给服务端后,服务端需要广播给聊天室中的所有其他客户端。而默认情况下,每个rpc调用都会在grpc server端不同的goroutine中执行,所以我们需要在服务端rpc函数中,把Recv()
收到的消息广播给别的rpc函数,再由它们调用Send(msg)
函数发给各自的客户端, 以此来实现消息广播的效果。
在go语言中,可以使用channel来实现goroutine间的消息传递,然后用一个map来管理用户id与对应channel的映射,以下是term-chat整体的函数调用架构:
下面,我们走一遍客户端发rpc消息到服务端,服务端广播,在另一个客户端的rpc中收到消息这个过程来讲解这个架构。
消息发送
首先,这是客户端的stream主函数,负责启动接收循环和发送循环:
1
2
3
4
5
6
7
|
func (c *client) StreamChats(ctx context.Context, input, output chan string) {
// sc就是上面提到的专门用于stream rpc调用的对象
sc, err := c.ChatRoomClient.Stream(ctx)
defer sc.CloseSend()
go c.sendLoop(sc, input)
c.recvLoop(sc, output)
}
|
在发送消息的客户端中,sendLoop不断从input channel获取用户输入,并调用sc.Send
发送给服务端:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
func (c *client) sendLoop(sc chat.ChatRoom_StreamClient, input chan string) {
Loop:
for {
select {
case msg, ok := <-input:
if !ok {
break Loop
}
sc.Send(msg)
case <-sc.Context().Done():
break Loop
}
}
}
|
服务端接收消息
此时,服务端的Stream rpc函数便会在ssrv.Recv
中收到消息,然后发送给服务端的Broadcast channel
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
func (srv *ChatServer) Stream(ssrv chat.ChatRoom_StreamServer) error {
tkn, ok := srv.extractToken(ssrv.Context())
srv.openStream(tkn)
srv.broadcastEnter(tkn)
defer srv.closeStream(tkn)
defer srv.broadcastExit(tkn)
go srv.sendBroadcasts(ssrv, tkn)
for {
msg, err := ssrv.Recv()
if err == io.EOF {
return nil
} else if err != nil {
return err
}
srv.Broadcast <- srv.handleStreamMsg(msg)
}
}
|
除此之外,Stream函数还需要把其他用户的消息通过ssrv.Send
发送回给这个用户,这一步是如何实现的呢?
首先,在客户端发起这个stream函数调用的时候,stream会调用openStream(tkn)
函数,为这个客户在服务器map中创建一个stream channel:
1
2
3
4
5
6
7
|
func (srv *ChatServer) openStream(id string) chan *chat.StreamMessage {
stream := make(chan *chat.StreamMessage, 1000)
srv.streamsMtx.Lock()
srv.ClientStreams[id] = stream
srv.streamsMtx.Unlock()
return stream
}
|
这个channel用来存放要发送给这个客户的广播消息。
在创建完channel后,我们另外启动一个goroutine来消费这个channel中的消息,调用ssrv.Send
来进行实际的消息发送:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
func (srv *ChatServer) sendBroadcasts(ssrv chat.ChatRoom_StreamServer, id string) {
username, _ := srv.getUsername(id)
for msg := range srv.ClientStreams[id] {
if s, ok := status.FromError(ssrv.Send(msg)); ok {
switch s.Code() {
case codes.OK:
case codes.Unavailable, codes.Canceled, codes.DeadlineExceeded:
log.Printf("client [%s] terminated\n", username)
default:
log.Printf("failed to send to client [%s], error: %v", username, s.Err())
}
}
}
}
|
客户端接收广播
在StreamChats函数执行时,最后会阻塞在最后的recvLoop()中,进行广播消息的接收, 并发送到ouput channel:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
func (c *client) recvLoop(sc chat.ChatRoom_StreamClient, output chan string) {
for {
msg, err := sc.Recv()
if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled {
fmt.Println("stream cancelled")
return
} else if err == io.EOF {
fmt.Println("stream closed by server")
return
} else if err != nil {
fmt.Println(err)
return
}
output <- formatClientMessage(msg)
}
}
|
至此,我们的双向通信有了一个大概的模样:
1
2
|
客户A发消息:clientA -> sendLoop() -> sc.Send() -> ssrv.Recv() -> srv.Broadcast
客户B收消息:srv.ClientStreams[id] -> sendBroadcasts() -> sc.Recv() -> recvLoop()
|
不知道你有没有发现,这个通信链条是断的,还缺少从srv.Broadcast channel把客户A的消息发送到ClientStreams字典中的所有channel这一步,也就是消息广播这一步。
这一步由server的broadcast()函数负责,它在服务器启动时会被另一个goroutine启用:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
func (srv *ChatServer) Run() {
lis, err := net.Listen("tcp", srv.Addr)
if err != nil {
log.Fatalf("%v", err)
}
var opts []grpc.ServerOption
grpcServer := grpc.NewServer(opts...)
chat.RegisterChatRoomServer(grpcServer, srv)
// 这里:
go srv.broadcast()
fmt.Println("serving")
err = grpcServer.Serve(lis)
if err != nil {
log.Fatal(err)
}
}
|
broadcast函数十分简单,就是从Broadcast channel中不断读取消息,每读到一个,便发送给每个clientstreams的channel中,最后由每个客户各自的rpc stream调用负责进行最后的发送。
1
2
3
4
5
6
7
8
9
10
11
12
|
func (srv *ChatServer) broadcast() {
for msg := range srv.Broadcast {
srv.streamsMtx.RLock()
for _, stream := range srv.ClientStreams {
select {
case stream <- msg:
default:
}
}
srv.streamsMtx.RUnlock()
}
}
|
至此,整个链条就打通了:
1
2
3
|
客户A发消息:clientA -> sendLoop() -> sc.Send() -> ssrv.Recv() -> srv.Broadcast
-> srv.Broadcast() ->
客户B收消息:srv.ClientStreams[id] -> sendBroadcasts() -> sc.Recv() -> recvLoop()
|
可优化的点
- 用sync.map来承载clientStreams,这样便不用手动加锁解锁
- 用bubbletea等tui框架实现客户端
Reference
https://github.com/rodaine/grpc-chat
https://grpc.io/docs/languages/go/basics/
https://blog.wu-boy.com/2022/04/simple-publish-subscribe-pattern-in-golang/