GoLang通过gRPC流处理实现发布订阅服务

目录

流式处理

服务端流式

客户端流式

双向流式

发布订阅服务

pubsub原理

代码实现

gRPC代码

服务端代码

订阅端代码

发布端代码


流式处理

所谓流式处理,就是客户端和服务端一方可以源源不断地发送请求,另一方按照发送顺序依次处理请求。流式 RPC 分为三种,分别是服务端流式、客户端流式、双向流式

服务端流式

客户端发送一个请求给服务端,可获取一个数据流用来读取一系列消息。客户端从返回的数据流里一直读取直到没有更多消息为止。

rpc Hello (HelloRequest) returns (stream HelloResponse);

客户端流式

客户端用提供的一个数据流写入并发送一系列消息给服务端。一旦客户端完成消息写入,就等待服务端读取这些消息并返回应答。

rpc Hello (stream HelloRequest) returns (HelloResponse);

双向流式

两边都可以分别通过一个读写数据流来发送一系列消息。这两个数据流操作是相互独立的,所以客户端和服务端能按其希望的任意顺序读写,例如:服务端可以在写应答前等待所有的客户端消息,或者它可以先读一个消息再写一个消息,或者是读写相结合的其他方式。每个数据流里消息的顺序会被保持。

rpc Hello (stream HelloRequest) returns (stream HelloResponse);

 

发布订阅服务

借助流式处理的特点,我们可以通过服务端流式 RPC 实现发布订阅模式,我们使用 gRCP + Docker 中的 pubsub 包来实现一个简单的发布订阅消息队列。

pubsub原理

先来看一下 pubsub 包提供的几个方法,这对我们理解和使用它是很有必要的,核心逻辑已经通过中文注释在源码中体现,如果已经掌握 GoLang 的同步控制,相信读懂它没有什么难度,否则可以先看下这篇文章:https://success.blog.csdn.net/article/details/113788909

package pubsub // import "github.com/docker/docker/pkg/pubsub"

import (
	"sync"
	"time"
)

//等待组放在共享内存池中,减少GC
var wgPool = sync.Pool{New: func() interface{} { return new(sync.WaitGroup) }}

//第一个参数控制发布时最大阻塞时间
//第二个参数是缓冲区大小,控制每个订阅者的chan缓冲区大小
func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher {
	return &Publisher{
		buffer:      buffer,
		timeout:     publishTimeout,
		subscribers: make(map[subscriber]topicFunc),
	}
}

type subscriber chan interface{}
type topicFunc func(v interface{}) bool

type Publisher struct {
	m           sync.RWMutex  //控制订阅者map并发读写安全
	buffer      int           //每个订阅者chan缓冲区大小
	timeout     time.Duration //发布阻塞超时时间
	subscribers map[subscriber]topicFunc
}

//返回订阅者数量
func (p *Publisher) Len() int {
	p.m.RLock()
	i := len(p.subscribers)
	p.m.RUnlock()
	return i
}

//无Topic订阅
func (p *Publisher) Subscribe() chan interface{} {
	return p.SubscribeTopic(nil)
}

//通过Topic订阅
func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} {
	ch := make(chan interface{}, p.buffer)
	p.m.Lock()
	p.subscribers[ch] = topic
	p.m.Unlock()
	return ch
}

//通过自定义chan缓冲区大小定义新的订阅者
func (p *Publisher) SubscribeTopicWithBuffer(topic topicFunc, buffer int) chan interface{} {
	ch := make(chan interface{}, buffer)
	p.m.Lock()
	p.subscribers[ch] = topic
	p.m.Unlock()
	return ch
}

//移除某个订阅者
func (p *Publisher) Evict(sub chan interface{}) {
	p.m.Lock()
	_, exists := p.subscribers[sub]
	if exists {
		delete(p.subscribers, sub)
		close(sub)
	}
	p.m.Unlock()
}

//发布消息
func (p *Publisher) Publish(v interface{}) {
	p.m.RLock()
	if len(p.subscribers) == 0 {
		p.m.RUnlock()
		return
	}

	wg := wgPool.Get().(*sync.WaitGroup)
	for sub, topic := range p.subscribers {
		wg.Add(1)
		go p.sendTopic(sub, topic, v, wg)
	}
	wg.Wait()
	wgPool.Put(wg) 
	p.m.RUnlock()
}

//关闭服务
func (p *Publisher) Close() {
	p.m.Lock()
	for sub := range p.subscribers {
		delete(p.subscribers, sub)
		close(sub)
	}
	p.m.Unlock()
}

//真正发布消息的逻辑,通过Timer,根据传入的timeout控制每次发布消息最大阻塞时长
func (p *Publisher) sendTopic(sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup) {
	defer wg.Done()
	if topic != nil && !topic(v) {
		return
	}

	// send under a select as to not block if the receiver is unavailable
	if p.timeout > 0 {
		timeout := time.NewTimer(p.timeout)
		defer timeout.Stop()

		select {
		case sub <- v:
		case <-timeout.C:
		}
		return
	}

	select {
	case sub <- v:
	default:
	}
}

代码实现

好了,搞清楚了 pubsub 的原理,我们可以开始基于 gRPC 实现我们的发布订阅服务了,先看下我们的代码目录

  • pb 目录存放我们的 proto 文件和生成的 rpc 代码
  • server 目录存放我们服务端代码
  • sub 目录存放我们的订阅代码
  • pub 目录存放我们的发布代码

GoLang通过gRPC流处理实现发布订阅服务

gRPC代码

这个就不再赘述为何这样实现了,不清楚的同学可以先阅读下这篇文章:https://success.blog.csdn.net/article/details/114959896

syntax = "proto3";

package pb;


message Msg {
    string Value = 1;
}

service PubsubService{
    rpc Publish (Msg) returns (Msg);
    rpc Subscribe (Msg) returns (stream Msg);
}
$protoc --go_out=plugins=grpc:. hello.proto
2021/03/21 17:06:06 WARNING: Missing 'go_package' option in "hello.proto",
please specify it with the full Go package path as
a future release of protoc-gen-go will require this be specified.
See https://developers.google.com/protocol-buffers/docs/reference/go-generated#package for more information.

服务端代码

看一下我们生成的代码,注册服务方法的第二个参数接收一个 PubsubServiceServer 接口

// PubsubServiceServer is the server API for PubsubService service.
type PubsubServiceServer interface {
	Publish(context.Context, *Msg) (*Msg, error)
	Subscribe(*Msg, PubsubService_SubscribeServer) error
}

func RegisterPubsubServiceServer(s *grpc.Server, srv PubsubServiceServer) {
	s.RegisterService(&_PubsubService_serviceDesc, srv)
}

所以我们通过自己定义一个服务,同时实现 PubsubServiceServer 中定义的方法,即可注册我们自己的服务,这里我们订阅时使用的 topic 订阅,通过消息前缀来区分不同的 topic 消息:

server.go

package main

import (
	"context"
	"log"
	"net"
	"strings"
	"time"

	"grpc/pb"

	"github.com/docker/docker/pkg/pubsub"
	"google.golang.org/grpc"
)

type PubsubService struct {
	pub *pubsub.Publisher
}

func NewPubsubService() *PubsubService {
	return &PubsubService{
		pub: pubsub.NewPublisher(100*time.Millisecond, 10),
	}
}

func (p *PubsubService) Publish(ctx context.Context, arg *pb.Msg) (*pb.Msg, error) {
	p.pub.Publish(arg.GetValue())
	return &pb.Msg{}, nil
}

func (p *PubsubService) Subscribe(arg *pb.Msg, stream pb.PubsubService_SubscribeServer) error {
	ch := p.pub.SubscribeTopic(func(v interface{}) bool {
		if key, ok := v.(string); ok {
			if strings.HasPrefix(key, arg.GetValue()) {
				return true
			}
		}
		return false
	})

	for v := range ch {
		if err := stream.Send(&pb.Msg{Value: v.(string)}); err != nil {
			return err
		}
	}

	return nil
}

func main() {
	server := grpc.NewServer()
	pb.RegisterPubsubServiceServer(server, NewPubsubService())

	listen, err := net.Listen("tcp", ":1234")
	if nil != err {
		log.Fatal(err)
	}

	server.Serve(listen)
}

订阅端代码

通过开启 TCP 连接到服务端,创建客户端实例,调用 Subscribe 方法订阅 golang topic 中的消息,同时不断调用 Recv 方法接收流数据:

sub.go

package main

import (
	"context"
	"fmt"
	"io"
	"log"

	"grpc/pb"

	"google.golang.org/grpc"
)

func main() {
	conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	client := pb.NewPubsubServiceClient(conn)
	stream, err := client.Subscribe(context.Background(), &pb.Msg{Value: "golang:"})
	if err != nil {
		log.Fatal(err)
	}

	for {
		reply, err := stream.Recv()
		if nil != err {
			if io.EOF == err {
				break
			}
			log.Fatal(err)
		}
		fmt.Println(reply.GetValue())
	}
}

发布端代码

发布端就比较简单了,连接服务端,创建客户端实例,然后调用 Publish 发布即可:

pub.go

package main

import (
	"context"
	"log"

	"grpc/pb"

	"google.golang.org/grpc"
)

func main() {
	conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	client := pb.NewPubsubServiceClient(conn)
	_, err = client.Publish(context.Background(), &pb.Msg{Value: "golang: hello Golang"})
	if err != nil {
		log.Fatal(err)
	}
}

 

 

 

 

上一篇:浙大城市学院第十八届程序设计竞赛


下一篇:Fabric组织动态管理