Contents

term-chat:基于gRPC的命令行聊天室go应用

最近参考一个github项目,写了一个类似的命令行聊天室,客户端与服务端基于grpc的Bidirectional streaming进行通信,背后的实现比较有趣,在这里记录一下。

项目效果如下:

/posts/term-chat-dev-blog/images/term-chat-screenshot.png

项目地址:

https://github.com/midknight24/term-chat

gRPC与双向流模式

gRPC是基于http/2的RPC框架,默认是用protocol buffers作为通信序列化器,使用者用其定义好rpc接口,然后用官方提供的protoc工具生成对应的接口代码,并实现具体的调用函数,便可以使用生成的server和client stub进行rpc通信,这是来自官网的跨语言调用流程图:

/posts/term-chat-dev-blog/images/landing-2.png

gRPC提供四种调用模式:

  • simple RPC
  • server-side streaming
  • client-side streaming
  • bidirectional streaming

其中,term-chat使用bidirectional streaming,双向流模式,进行客户端与服务端的消息通信,这是term-chat中具体的rpc函数定义,其中函数的名字定为Stream:

1
rpc Stream (stream StreamMessage) returns (stream StreamMessage) {}

在双向流模式中,客户端和服务端可以同时发送消息给对方,gRPC会保证每个方向下消息的有序性。

具体到go语言的实现上,在客户端方面,protoc会在client api接口中添加一个stream方法,方法的签名是这样的:

1
2
3
type ChatRoomClient interface {
	Stream(ctx context.Context, opts ...grpc.CallOption) (ChatRoom_StreamClient, error)
}

在客户端中,开发者调用Stream来获得一个专门进行Stream这个函数的rpc调用的对象,在程序中通过Send(msg)Recv()来进行通讯,比如:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
streamClient, err := c.ChatRoomClient.Stream(ctx)

// recv stream from server
for {
	msg, err := streamClient.Recv()
    if err == io.EOF {
        break
    }
}

//send stream to server
for msg := range input_chan {
    streamClient.Send(msg)
}

而在服务端方面,则会在server接口中添加一个开发者需要实现的Stream方法,方法的签名是这样的:

1
2
3
4
type ChatRoomServer interface {
	Stream(ChatRoom_StreamServer) error
	mustEmbedUnimplementedChatRoomServer()
}

在Stream方法中,grpc会传入一个ChatRoom_StreamServer对象,其作用跟客户端的streamClient类似,开发者通过对其调用Send(msg)Recv()方法来与客户端进行通讯:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
func (srv *ChatRoomServer) Stream(ssrv ChatRoom_StreamServer) error {
    // send msg to client
    for msg := range broadcast_chan {
        ssrv.Send(msg)
    }
    // recv stream from client
    for {
        msg, err := ssrv.Recv()
        if err == io.EOF {
            break
        }
    }
}

通讯流程

接下来具体聊一下如何使用双向流模式实现聊天室的通讯,这是一个聊天室应用,那就意味着客户端a的消息发送给服务端后,服务端需要广播给聊天室中的所有其他客户端。而默认情况下,每个rpc调用都会在grpc server端不同的goroutine中执行,所以我们需要在服务端rpc函数中,把Recv()收到的消息广播给别的rpc函数,再由它们调用Send(msg)函数发给各自的客户端, 以此来实现消息广播的效果。

在go语言中,可以使用channel来实现goroutine间的消息传递,然后用一个map来管理用户id与对应channel的映射,以下是term-chat整体的函数调用架构:

下面,我们走一遍客户端发rpc消息到服务端,服务端广播,在另一个客户端的rpc中收到消息这个过程来讲解这个架构。

消息发送

首先,这是客户端的stream主函数,负责启动接收循环和发送循环:

1
2
3
4
5
6
7
func (c *client) StreamChats(ctx context.Context, input, output chan string) {
    // sc就是上面提到的专门用于stream rpc调用的对象
    sc, err := c.ChatRoomClient.Stream(ctx)
    defer sc.CloseSend()
    go c.sendLoop(sc, input)
    c.recvLoop(sc, output)
}

在发送消息的客户端中,sendLoop不断从input channel获取用户输入,并调用sc.Send发送给服务端:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15

func (c *client) sendLoop(sc chat.ChatRoom_StreamClient, input chan string) {
Loop:
	for {
		select {
		case msg, ok := <-input:
			if !ok {
				break Loop
			}
            sc.Send(msg)
		case <-sc.Context().Done():
			break Loop
		}
	}
}

服务端接收消息

此时,服务端的Stream rpc函数便会在ssrv.Recv中收到消息,然后发送给服务端的Broadcast channel

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
func (srv *ChatServer) Stream(ssrv chat.ChatRoom_StreamServer) error {
    tkn, ok := srv.extractToken(ssrv.Context())
	srv.openStream(tkn)
	srv.broadcastEnter(tkn)
	defer srv.closeStream(tkn)
	defer srv.broadcastExit(tkn)

	go srv.sendBroadcasts(ssrv, tkn)

	for {
		msg, err := ssrv.Recv()
		if err == io.EOF {
			return nil
		} else if err != nil {
			return err
		}
		srv.Broadcast <- srv.handleStreamMsg(msg)
	}
}

除此之外,Stream函数还需要把其他用户的消息通过ssrv.Send发送回给这个用户,这一步是如何实现的呢?

首先,在客户端发起这个stream函数调用的时候,stream会调用openStream(tkn)函数,为这个客户在服务器map中创建一个stream channel:

1
2
3
4
5
6
7
func (srv *ChatServer) openStream(id string) chan *chat.StreamMessage {
	stream := make(chan *chat.StreamMessage, 1000)
	srv.streamsMtx.Lock()
	srv.ClientStreams[id] = stream
	srv.streamsMtx.Unlock()
	return stream
}

这个channel用来存放要发送给这个客户的广播消息。

在创建完channel后,我们另外启动一个goroutine来消费这个channel中的消息,调用ssrv.Send来进行实际的消息发送:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
func (srv *ChatServer) sendBroadcasts(ssrv chat.ChatRoom_StreamServer, id string) {
	username, _ := srv.getUsername(id)
	for msg := range srv.ClientStreams[id] {
		if s, ok := status.FromError(ssrv.Send(msg)); ok {
			switch s.Code() {
			case codes.OK:
			case codes.Unavailable, codes.Canceled, codes.DeadlineExceeded:
				log.Printf("client [%s] terminated\n", username)
			default:
				log.Printf("failed to send to client [%s], error: %v", username, s.Err())
			}
		}
	}

}

客户端接收广播

在StreamChats函数执行时,最后会阻塞在最后的recvLoop()中,进行广播消息的接收, 并发送到ouput channel:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
func (c *client) recvLoop(sc chat.ChatRoom_StreamClient, output chan string) {
	for {
		msg, err := sc.Recv()
		if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled {
			fmt.Println("stream cancelled")
			return
		} else if err == io.EOF {
			fmt.Println("stream closed by server")
			return
		} else if err != nil {
			fmt.Println(err)
			return
		}
		output <- formatClientMessage(msg)
	}
}

至此,我们的双向通信有了一个大概的模样:

1
2
客户A发消息:clientA -> sendLoop() -> sc.Send() -> ssrv.Recv() -> srv.Broadcast
客户B收消息:srv.ClientStreams[id] -> sendBroadcasts() -> sc.Recv() -> recvLoop()

不知道你有没有发现,这个通信链条是断的,还缺少从srv.Broadcast channel把客户A的消息发送到ClientStreams字典中的所有channel这一步,也就是消息广播这一步。

这一步由server的broadcast()函数负责,它在服务器启动时会被另一个goroutine启用:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
func (srv *ChatServer) Run() {
	lis, err := net.Listen("tcp", srv.Addr)
	if err != nil {
		log.Fatalf("%v", err)
	}
	var opts []grpc.ServerOption
	grpcServer := grpc.NewServer(opts...)
	chat.RegisterChatRoomServer(grpcServer, srv)

    // 这里:
	go srv.broadcast()

	fmt.Println("serving")
	err = grpcServer.Serve(lis)

	if err != nil {
		log.Fatal(err)
	}
}

broadcast函数十分简单,就是从Broadcast channel中不断读取消息,每读到一个,便发送给每个clientstreams的channel中,最后由每个客户各自的rpc stream调用负责进行最后的发送。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
func (srv *ChatServer) broadcast() {
	for msg := range srv.Broadcast {
		srv.streamsMtx.RLock()
		for _, stream := range srv.ClientStreams {
			select {
			case stream <- msg:
			default:
			}
		}
		srv.streamsMtx.RUnlock()
	}
}

至此,整个链条就打通了:

1
2
3
客户A发消息:clientA -> sendLoop() -> sc.Send() -> ssrv.Recv() -> srv.Broadcast
-> srv.Broadcast() ->
客户B收消息:srv.ClientStreams[id] -> sendBroadcasts() -> sc.Recv() -> recvLoop()

可优化的点

  • 用sync.map来承载clientStreams,这样便不用手动加锁解锁
  • bubbletea等tui框架实现客户端

Reference

https://github.com/rodaine/grpc-chat

https://grpc.io/docs/languages/go/basics/

https://blog.wu-boy.com/2022/04/simple-publish-subscribe-pattern-in-golang/