- 安装
- 流模式
安装
方法1:
go get -u -v google.golang.org/grpc
方法2:
git clone https://github.com/grpc/grpc-go.git $GOPATH/pkg/mod/google.golang.org/grpc
git clone https://github.com/golang/net.git $GOPATH/pkg/mod/google.golang.org/x/net
git clone https://github.com/golang/text.git $GOPATH/pkg/mod/google.golang.org/x/text
git clone https://github.com/google/go-genproto.git $GOPATH/pkg/mod/google.golang.org/genproto
cd $GOPATH/pkg/mod
go install google.golang.org/grpc
stream模式
流模式可以源源不断的推送数据,很适合传输一些大数据,或服务端和客户端长时间数据交互。
简单模式(Simple RPC)
客户端发起一次请求,服务端响应一次数据
服务端
//protobuf定义的服务函数
type OutMsg struct{}
func (om *OutMsg) SayHello(ctx context.Context, p *pb.Person) (*pb.PhoneNumber, error) {
p.Age = 18
p.Name = "wang"
var pn pb.PhoneNumber = pb.PhoneNumber{Number: "11", Type: 2}
fmt.Println("rotem call")
return &pn, nil
}
/**
* @func: CreateGrpcSer
* @msg: 创建grpc服务端
* @param {*}
* @return {*}
*/
func CreateGrpcSer() {
//初始化对象
grpcSer := gRPC.NewServer()
//注册服务
pb.RegisterHelloServer(grpcSer, new(OutMsg))
//设置监听,ip,port
listener, err := net.Listen("tcp", "0.0.0.0:38000")
if err != nil {
log.Fatal(err)
return
}
defer listener.Close()
//启动
grpcSer.Serve(listener)
}
客户端
/**
* @func: CreateGrpcCli
* @msg: 创建grpc客户端
* @param {*}
* @return {*}
*/
func CreateGrpcCli() {
//连接grpc服务
grpcConn, err := gRPC.Dial("192.168.11.140:38000", gRPC.WithInsecure()) // gRPC.WithInsecure()以安全的方式操作
if err != nil {
log.Fatal(err)
return
}
defer grpcConn.Close()
//初始化grpc客户端
grpcClient := pb.NewHelloClient(grpcConn)
//调用远程服务
pNumber, err := grpcClient.SayHello(context.TODO(), &pb.Person{})
if err != nil {
log.Fatal(err)
return
}
fmt.Println(pNumber)
}
服务端数据流模式(Server-side streaming RPC)
客户端发起一次请求,服务端返回一段连续的数据流。
结束传输服务端会传送EOF
客户端数据流模式(Client-side streaming RPC)
与服务端数据流模式相反,客户端源源不断的向服务端发送数据流,而在发送结束后,由服务端返回一个响应
双向数据流模式(Bidirectional streamin RPC)
客户端和服务端都可以同时向对方发送数据流
关键字 stream
stream.proto
syntax="proto3";
option go_package="../;pb";
//关键字 stream
service Greeter{
rpc GetStream(StreamReqData) returns (stream StreamResData){} // 服务端流模式
rpc PostStream(stream StreamReqData) returns (StreamResData); //客户端流模式
rpc AllStream(stream StreamReqData)returns(stream StreamResData)//双向流模式
}
message StreamReqData{
string data=1;
}
message StreamResData{
string data=1;
}
server.go
package main
import (
"fmt"
"grpcStream/pb"
"log"
"net"
"sync"
"time"
"google.golang.org/grpc"
)
const PORT = ":50052"
/*注意 服务端接口函数定义
// GreeterServer is the server API for Greeter service.
type GreeterServer interface {
GetStream(*StreamReqData, Greeter_GetStreamServer) error
PostStream(Greeter_PostStreamServer) error
AllStream(Greeter_AllStreamServer) error
}
*/
type Serv struct{}
/**
* @func:
* @msg: 向客户端推送数据流
* @param {*pb.StreamReqData} req
* @param {pb.Greeter_GetStreamServer} res
* @return {error}
*/
func (s *Serv) GetStream(req *pb.StreamReqData, res pb.Greeter_GetStreamServer) (err error) {
fmt.Println(req.Data)
for i := 0; i < 10; i++ {
err := res.Send(&pb.StreamResData{
Data: fmt.Sprintf("%v,%d", time.Now().Unix(), i),
})
if err != nil {
log.Fatalln(err)
break
}
// time.Sleep(time.Second)
}
return err
}
/**
* @func:
* @msg:接收客户端推送数据
* @param {pb.Greeter_PostStreamServer} cliStr
* @return {*}
*/
func (s *Serv) PostStream(cliStr pb.Greeter_PostStreamServer) (err error) {
for {
if a, err := cliStr.Recv(); err != nil {
fmt.Printf("err:%v", err)
break
} else {
fmt.Println(a.Data)
}
}
return err
}
/**
* @func:
* @msg: 双向推流服务端
* @param {pb.Greeter_AllStreamServer} allStr
* @return {*}
*/
func (s *Serv) AllStream(allStr pb.Greeter_AllStreamServer) (err error) {
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
for {
if a, err := allStr.Recv(); err != nil {
fmt.Printf("err:%v", err)
break
} else {
fmt.Println(a.Data)
}
}
}()
go func() {
defer wg.Done()
for {
err = allStr.Send(&pb.StreamResData{
Data: fmt.Sprintf("server_send:%v", time.Now().Unix()),
})
if err != nil {
fmt.Printf("err:%v", err)
break
}
time.Sleep(time.Second)
}
}()
defer wg.Wait()
return err
}
func main() {
lis, err := net.Listen("tcp", PORT)
if err != nil {
panic(err)
}
defer lis.Close()
s := grpc.NewServer()
pb.RegisterGreeterServer(s, new(Serv))
err = s.Serve(lis)
if err != nil {
panic(err)
}
}
client.go
package main
import (
"context"
"fmt"
"grpcStream/pb"
"log"
"sync"
"time"
"google.golang.org/grpc"
)
/**
* @func: GetGrpcCli
* @msg: 创建grpc客户端-服务端流模式
* @param {*}
* @return {*}
*/
func GetGrpcCli() error {
//连接grpc服务
grpcConn, err := grpc.Dial("192.168.11.140:50052", grpc.WithInsecure()) // gRPC.WithInsecure()以安全的方式操作
if err != nil {
log.Fatal(err)
panic(err)
}
defer grpcConn.Close()
//初始化grpc客户端
grpcClient := pb.NewGreeterClient(grpcConn)
//调用远程服务
res, err := grpcClient.GetStream(context.TODO(), &pb.StreamReqData{Data: "req"})
if err != nil {
log.Fatal(err)
panic(err)
}
for {
a, err := res.Recv()
if err != nil {
fmt.Println(err)
break
}
fmt.Println(a)
}
panic(err)
}
/**
* @func:
* @msg: 创建grpc客户端-客户端流模式
* @param {*}
* @return {*}
*/
func PostGrpcCli() error {
//连接grpc服务
grpcConn, err := grpc.Dial("192.168.11.140:50052", grpc.WithInsecure()) // gRPC.WithInsecure()以安全的方式操作
if err != nil {
log.Fatal(err)
return err
}
defer grpcConn.Close()
//初始化grpc客户端
grpcClient := pb.NewGreeterClient(grpcConn)
//调用远程服务
postS, err := grpcClient.PostStream(context.TODO())
if err != nil {
log.Fatal(err)
return err
}
for i := 0; i < 5; i++ {
err = postS.Send(&pb.StreamReqData{
Data: fmt.Sprintf("%v,%d", time.Now().Unix(), i),
})
if err != nil {
log.Fatalln(err)
break
}
time.Sleep(time.Second)
}
return err
}
/**
* @func:
* @msg: 双向推流客户端
* @param {*}
* @return {*}
*/
func AllGrpcCli() error {
grpcConn, err := grpc.Dial("192.168.11.140:50052", grpc.WithInsecure()) // gRPC.WithInsecure()以安全的方式操作
if err != nil {
log.Fatal(err)
return err
}
defer grpcConn.Close()
//初始化grpc客户端
grpcClient := pb.NewGreeterClient(grpcConn)
//调用远程服务
allStr, err := grpcClient.AllStream(context.Background())
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
for {
if a, err := allStr.Recv(); err != nil {
fmt.Printf("err:%v", err)
break
} else {
fmt.Println(a.Data)
}
}
}()
go func() {
defer wg.Done()
for {
err = allStr.Send(&pb.StreamReqData{
Data: fmt.Sprintf("client_send:%v", time.Now().Unix()),
})
if err != nil {
fmt.Printf("err:%v", err)
break
}
time.Sleep(time.Second)
}
}()
defer wg.Wait()
return err
}
func main() {
GetGrpcCli()
PostGrpcCli()
AllGrpcCli()
}