前言
之前我们讲了 grpc 怎么简单的使用 ,这次讲讲 grpc 中的 stream,srteam 顾名思义 就是 一种 流,可以源源不断的 推送 数据,很适合 传输一些大数据,或者 服务端 和 客户端 长时间 数据交互,比如 客户端 可以向 服务端 订阅 一个数据,服务端 就 可以利用 stream ,源源不断地 推送数据。
stream的种类:
客户端推送 服务端 rpc GetStream (StreamReqData) returns (stream StreamResData){} 服务端推送 客户端 rpc PutStream (stream StreamReqData) returns (StreamResData){} 客户端与 服务端 互相 推送 rpc AllStream (stream StreamReqData) returns (stream StreamResData){}
其实这个流 已经 基本退化成 tcp了,grpc 底层为我们 分包了,所以真的很方便。
protobuf的定义:
syntax = "proto3";//声明proto的版本 只能 是3,才支持 grpc //声明 包名 package pro; //声明grpc服务 service Greeter { /* 以下 分别是 服务端 推送流, 客户端 推送流 ,双向流。 */ rpc GetStream (StreamReqData) returns (stream StreamResData){} rpc PutStream (stream StreamReqData) returns (StreamResData){} rpc AllStream (stream StreamReqData) returns (stream StreamResData){} } //stream请求结构 message StreamReqData { string data = 1; } //stream返回结构 message StreamResData { string data = 1; }
我们在 protobuf 里面 定义 要提供的服务,如果 你想把哪个数据 源源不断的 推送 就在前面加个stream 就好了,定义好记得编译。
服务端的实现:
package main import ( "context" "fmt" "google.golang.org/grpc" "grpc/pro" "log" "net" "sync" "time" ) const PORT = ":50051" type server struct { } //服务端 单向流 func (s *server)GetStream(req *pro.StreamReqData, res pro.Greeter_GetStreamServer) error{ i:= 0 for{ i++ res.Send(&pro.StreamResData{Data:fmt.Sprintf("%v",time.Now().Unix())}) time.Sleep(1*time.Second) if i >10 { break } } return nil } //客户端 单向流 func (this *server) PutStream(cliStr pro.Greeter_PutStreamServer) error { for { if tem, err := cliStr.Recv(); err == nil { log.Println(tem) } else { log.Println("break, err :", err) break } } return nil } //客户端服务端 双向流 func(this *server) AllStream(allStr pro.Greeter_AllStreamServer) error { wg := sync.WaitGroup{} wg.Add(2) go func() { for { data, _ := allStr.Recv() log.Println(data) } wg.Done() }() go func() { for { allStr.Send(&pro.StreamResData{Data:"ssss"}) time.Sleep(time.Second) } wg.Done() }() wg.Wait() return nil } func main(){ //监听端口 lis,err := net.Listen("tcp",PORT) if err != nil{ return } //创建一个grpc 服务器 s := grpc.NewServer() //注册事件 pro.RegisterGreeterServer(s,&server{}) //处理链接 s.Serve(lis) }
知识点:
- 每个函数都对应着 完成了 protobuf 里面的 定义。
- 每个函数 形参都有对应的 推送 或者 接收 对象,我们只要 不断循环 Recv(),或者 Send() 就能接收或者推送了!
- 当return出函数,就说明此次 推送 或者 接收 结束了,client 会 对应的 收到消息!
客户端调用:
package main import ( "google.golang.org/grpc" "grpc/pro" "log" "context" "time" _ "google.golang.org/grpc/balancer/grpclb" ) const ( ADDRESS = "localhost:50051" ) func main(){ //通过grpc 库 建立一个连接 conn ,err := grpc.Dial(ADDRESS,grpc.WithInsecure()) if err != nil{ return } defer conn.Close() //通过刚刚的连接 生成一个client对象。 c := pro.NewGreeterClient(conn) //调用服务端推送流 reqstreamData := &pro.StreamReqData{Data:"aaa"} res,_ := c.GetStream(context.Background(),reqstreamData) for { aa,err := res.Recv() if err != nil { log.Println(err) break } log.Println(aa) } //客户端 推送 流 putRes, _ := c.PutStream(context.Background()) i := 1 for { i++ putRes.Send(&pro.StreamReqData{Data:"ss"}) time.Sleep(time.Second) if i > 10 { break } } //服务端 客户端 双向流 allStr,_ := c.AllStream(context.Background()) go func() { for { data,_ := allStr.Recv() log.Println(data) } }() go func() { for { allStr.Send(&pro.StreamReqData{Data:"ssss"}) time.Sleep(time.Second) } }() select { } }
client 调用 流的函数, 就会 返回一个 流对象,只要 不断地 对它进行读取或者写入,对应方就能收到。
总结:
grpc 的 stream 和 go的协程 配合 简直完美。通过流 我们 可以更加 灵活的 实现自己的业务。如 订阅,大数据传输等。
作者:xyt001
链接:https://www.jianshu.com/p/85e9cfa16247
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。