目录
流式处理
所谓流式处理,就是客户端和服务端一方可以源源不断地发送请求,另一方按照发送顺序依次处理请求。流式 RPC 分为三种,分别是服务端流式、客户端流式、双向流式
服务端流式
客户端发送一个请求给服务端,可获取一个数据流用来读取一系列消息。客户端从返回的数据流里一直读取直到没有更多消息为止。
rpc Hello (HelloRequest) returns (stream HelloResponse);
客户端流式
客户端用提供的一个数据流写入并发送一系列消息给服务端。一旦客户端完成消息写入,就等待服务端读取这些消息并返回应答。
rpc Hello (stream HelloRequest) returns (HelloResponse);
双向流式
两边都可以分别通过一个读写数据流来发送一系列消息。这两个数据流操作是相互独立的,所以客户端和服务端能按其希望的任意顺序读写,例如:服务端可以在写应答前等待所有的客户端消息,或者它可以先读一个消息再写一个消息,或者是读写相结合的其他方式。每个数据流里消息的顺序会被保持。
rpc Hello (stream HelloRequest) returns (stream HelloResponse);
发布订阅服务
借助流式处理的特点,我们可以通过服务端流式 RPC 实现发布订阅模式,我们使用 gRCP + Docker 中的 pubsub 包来实现一个简单的发布订阅消息队列。
pubsub原理
先来看一下 pubsub 包提供的几个方法,这对我们理解和使用它是很有必要的,核心逻辑已经通过中文注释在源码中体现,如果已经掌握 GoLang 的同步控制,相信读懂它没有什么难度,否则可以先看下这篇文章:https://success.blog.csdn.net/article/details/113788909
package pubsub // import "github.com/docker/docker/pkg/pubsub"
import (
"sync"
"time"
)
//等待组放在共享内存池中,减少GC
var wgPool = sync.Pool{New: func() interface{} { return new(sync.WaitGroup) }}
//第一个参数控制发布时最大阻塞时间
//第二个参数是缓冲区大小,控制每个订阅者的chan缓冲区大小
func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher {
return &Publisher{
buffer: buffer,
timeout: publishTimeout,
subscribers: make(map[subscriber]topicFunc),
}
}
type subscriber chan interface{}
type topicFunc func(v interface{}) bool
type Publisher struct {
m sync.RWMutex //控制订阅者map并发读写安全
buffer int //每个订阅者chan缓冲区大小
timeout time.Duration //发布阻塞超时时间
subscribers map[subscriber]topicFunc
}
//返回订阅者数量
func (p *Publisher) Len() int {
p.m.RLock()
i := len(p.subscribers)
p.m.RUnlock()
return i
}
//无Topic订阅
func (p *Publisher) Subscribe() chan interface{} {
return p.SubscribeTopic(nil)
}
//通过Topic订阅
func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} {
ch := make(chan interface{}, p.buffer)
p.m.Lock()
p.subscribers[ch] = topic
p.m.Unlock()
return ch
}
//通过自定义chan缓冲区大小定义新的订阅者
func (p *Publisher) SubscribeTopicWithBuffer(topic topicFunc, buffer int) chan interface{} {
ch := make(chan interface{}, buffer)
p.m.Lock()
p.subscribers[ch] = topic
p.m.Unlock()
return ch
}
//移除某个订阅者
func (p *Publisher) Evict(sub chan interface{}) {
p.m.Lock()
_, exists := p.subscribers[sub]
if exists {
delete(p.subscribers, sub)
close(sub)
}
p.m.Unlock()
}
//发布消息
func (p *Publisher) Publish(v interface{}) {
p.m.RLock()
if len(p.subscribers) == 0 {
p.m.RUnlock()
return
}
wg := wgPool.Get().(*sync.WaitGroup)
for sub, topic := range p.subscribers {
wg.Add(1)
go p.sendTopic(sub, topic, v, wg)
}
wg.Wait()
wgPool.Put(wg)
p.m.RUnlock()
}
//关闭服务
func (p *Publisher) Close() {
p.m.Lock()
for sub := range p.subscribers {
delete(p.subscribers, sub)
close(sub)
}
p.m.Unlock()
}
//真正发布消息的逻辑,通过Timer,根据传入的timeout控制每次发布消息最大阻塞时长
func (p *Publisher) sendTopic(sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup) {
defer wg.Done()
if topic != nil && !topic(v) {
return
}
// send under a select as to not block if the receiver is unavailable
if p.timeout > 0 {
timeout := time.NewTimer(p.timeout)
defer timeout.Stop()
select {
case sub <- v:
case <-timeout.C:
}
return
}
select {
case sub <- v:
default:
}
}
代码实现
好了,搞清楚了 pubsub 的原理,我们可以开始基于 gRPC 实现我们的发布订阅服务了,先看下我们的代码目录
- pb 目录存放我们的 proto 文件和生成的 rpc 代码
- server 目录存放我们服务端代码
- sub 目录存放我们的订阅代码
- pub 目录存放我们的发布代码
gRPC代码
这个就不再赘述为何这样实现了,不清楚的同学可以先阅读下这篇文章:https://success.blog.csdn.net/article/details/114959896
syntax = "proto3";
package pb;
message Msg {
string Value = 1;
}
service PubsubService{
rpc Publish (Msg) returns (Msg);
rpc Subscribe (Msg) returns (stream Msg);
}
$protoc --go_out=plugins=grpc:. hello.proto
2021/03/21 17:06:06 WARNING: Missing 'go_package' option in "hello.proto",
please specify it with the full Go package path as
a future release of protoc-gen-go will require this be specified.
See https://developers.google.com/protocol-buffers/docs/reference/go-generated#package for more information.
服务端代码
看一下我们生成的代码,注册服务方法的第二个参数接收一个 PubsubServiceServer 接口
// PubsubServiceServer is the server API for PubsubService service.
type PubsubServiceServer interface {
Publish(context.Context, *Msg) (*Msg, error)
Subscribe(*Msg, PubsubService_SubscribeServer) error
}
func RegisterPubsubServiceServer(s *grpc.Server, srv PubsubServiceServer) {
s.RegisterService(&_PubsubService_serviceDesc, srv)
}
所以我们通过自己定义一个服务,同时实现 PubsubServiceServer 中定义的方法,即可注册我们自己的服务,这里我们订阅时使用的 topic 订阅,通过消息前缀来区分不同的 topic 消息:
server.go
package main
import (
"context"
"log"
"net"
"strings"
"time"
"grpc/pb"
"github.com/docker/docker/pkg/pubsub"
"google.golang.org/grpc"
)
type PubsubService struct {
pub *pubsub.Publisher
}
func NewPubsubService() *PubsubService {
return &PubsubService{
pub: pubsub.NewPublisher(100*time.Millisecond, 10),
}
}
func (p *PubsubService) Publish(ctx context.Context, arg *pb.Msg) (*pb.Msg, error) {
p.pub.Publish(arg.GetValue())
return &pb.Msg{}, nil
}
func (p *PubsubService) Subscribe(arg *pb.Msg, stream pb.PubsubService_SubscribeServer) error {
ch := p.pub.SubscribeTopic(func(v interface{}) bool {
if key, ok := v.(string); ok {
if strings.HasPrefix(key, arg.GetValue()) {
return true
}
}
return false
})
for v := range ch {
if err := stream.Send(&pb.Msg{Value: v.(string)}); err != nil {
return err
}
}
return nil
}
func main() {
server := grpc.NewServer()
pb.RegisterPubsubServiceServer(server, NewPubsubService())
listen, err := net.Listen("tcp", ":1234")
if nil != err {
log.Fatal(err)
}
server.Serve(listen)
}
订阅端代码
通过开启 TCP 连接到服务端,创建客户端实例,调用 Subscribe 方法订阅 golang topic 中的消息,同时不断调用 Recv 方法接收流数据:
sub.go
package main
import (
"context"
"fmt"
"io"
"log"
"grpc/pb"
"google.golang.org/grpc"
)
func main() {
conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure())
if err != nil {
log.Fatal(err)
}
defer conn.Close()
client := pb.NewPubsubServiceClient(conn)
stream, err := client.Subscribe(context.Background(), &pb.Msg{Value: "golang:"})
if err != nil {
log.Fatal(err)
}
for {
reply, err := stream.Recv()
if nil != err {
if io.EOF == err {
break
}
log.Fatal(err)
}
fmt.Println(reply.GetValue())
}
}
发布端代码
发布端就比较简单了,连接服务端,创建客户端实例,然后调用 Publish 发布即可:
pub.go
package main
import (
"context"
"log"
"grpc/pb"
"google.golang.org/grpc"
)
func main() {
conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure())
if err != nil {
log.Fatal(err)
}
defer conn.Close()
client := pb.NewPubsubServiceClient(conn)
_, err = client.Publish(context.Background(), &pb.Msg{Value: "golang: hello Golang"})
if err != nil {
log.Fatal(err)
}
}