gRPC

  • 安装
  • 流模式

安装

方法1:
go get -u -v google.golang.org/grpc
方法2:

git clone https://github.com/grpc/grpc-go.git $GOPATH/pkg/mod/google.golang.org/grpc

git clone https://github.com/golang/net.git $GOPATH/pkg/mod/google.golang.org/x/net

git clone https://github.com/golang/text.git $GOPATH/pkg/mod/google.golang.org/x/text

git clone https://github.com/google/go-genproto.git $GOPATH/pkg/mod/google.golang.org/genproto

cd $GOPATH/pkg/mod
go install google.golang.org/grpc

stream模式

流模式可以源源不断的推送数据,很适合传输一些大数据,或服务端和客户端长时间数据交互。

简单模式(Simple RPC)

客户端发起一次请求,服务端响应一次数据

服务端
//protobuf定义的服务函数
type OutMsg struct{}
func (om *OutMsg) SayHello(ctx context.Context, p *pb.Person) (*pb.PhoneNumber, error) {
	p.Age = 18
	p.Name = "wang"
	var pn pb.PhoneNumber = pb.PhoneNumber{Number: "11", Type: 2}
	fmt.Println("rotem call")
	return &pn, nil
}
/**
 * @func: CreateGrpcSer
 * @msg: 创建grpc服务端
 * @param {*}
 * @return {*}
 */
func CreateGrpcSer() {
	//初始化对象
	grpcSer := gRPC.NewServer()
	//注册服务
	pb.RegisterHelloServer(grpcSer, new(OutMsg))
	//设置监听,ip,port
	listener, err := net.Listen("tcp", "0.0.0.0:38000")
	if err != nil {
		log.Fatal(err)
		return
	}
	defer listener.Close()
	//启动
	grpcSer.Serve(listener)
}
客户端
/**
 * @func: CreateGrpcCli
 * @msg: 创建grpc客户端
 * @param {*}
 * @return {*}
 */
func CreateGrpcCli() {
	//连接grpc服务
	grpcConn, err := gRPC.Dial("192.168.11.140:38000", gRPC.WithInsecure()) // gRPC.WithInsecure()以安全的方式操作
	if err != nil {
		log.Fatal(err)
		return
	}
	defer grpcConn.Close()
	//初始化grpc客户端
	grpcClient := pb.NewHelloClient(grpcConn)
	//调用远程服务
	pNumber, err := grpcClient.SayHello(context.TODO(), &pb.Person{})
	if err != nil {
		log.Fatal(err)
		return
	}
	fmt.Println(pNumber)
}

服务端数据流模式(Server-side streaming RPC)

客户端发起一次请求,服务端返回一段连续的数据流。
结束传输服务端会传送EOF

客户端数据流模式(Client-side streaming RPC)

与服务端数据流模式相反,客户端源源不断的向服务端发送数据流,而在发送结束后,由服务端返回一个响应

双向数据流模式(Bidirectional streamin RPC)

客户端和服务端都可以同时向对方发送数据流

关键字 stream

stream.proto

syntax="proto3";

option go_package="../;pb";

//关键字 stream
service Greeter{
	rpc GetStream(StreamReqData) returns (stream StreamResData){} // 服务端流模式
	rpc PostStream(stream StreamReqData) returns (StreamResData); //客户端流模式
	rpc AllStream(stream StreamReqData)returns(stream StreamResData)//双向流模式
}
message StreamReqData{
	string data=1;
}
message StreamResData{
	string data=1;
}
server.go

package main

import (
	"fmt"
	"grpcStream/pb"
	"log"
	"net"
	"sync"
	"time"

	"google.golang.org/grpc"
)

const PORT = ":50052"

/*注意 服务端接口函数定义
// GreeterServer is the server API for Greeter service.
type GreeterServer interface {
	GetStream(*StreamReqData, Greeter_GetStreamServer) error
	PostStream(Greeter_PostStreamServer) error
	AllStream(Greeter_AllStreamServer) error
}
*/
type Serv struct{}

/**
 * @func:
 * @msg: 向客户端推送数据流
 * @param {*pb.StreamReqData} req
 * @param {pb.Greeter_GetStreamServer} res
 * @return {error}
 */
func (s *Serv) GetStream(req *pb.StreamReqData, res pb.Greeter_GetStreamServer) (err error) {
	fmt.Println(req.Data)
	for i := 0; i < 10; i++ {
		err := res.Send(&pb.StreamResData{
			Data: fmt.Sprintf("%v,%d", time.Now().Unix(), i),
		})
		if err != nil {
			log.Fatalln(err)
			break
		}
		// time.Sleep(time.Second)
	}
	return err
}

/**
 * @func:
 * @msg:接收客户端推送数据
 * @param {pb.Greeter_PostStreamServer} cliStr
 * @return {*}
 */
func (s *Serv) PostStream(cliStr pb.Greeter_PostStreamServer) (err error) {
	for {
		if a, err := cliStr.Recv(); err != nil {
			fmt.Printf("err:%v", err)
			break
		} else {
			fmt.Println(a.Data)
		}
	}
	return err
}

/**
 * @func:
 * @msg: 双向推流服务端
 * @param {pb.Greeter_AllStreamServer} allStr
 * @return {*}
 */
func (s *Serv) AllStream(allStr pb.Greeter_AllStreamServer) (err error) {
	wg := sync.WaitGroup{}
	wg.Add(2)
	go func() {
		defer wg.Done()
		for {
			if a, err := allStr.Recv(); err != nil {
				fmt.Printf("err:%v", err)
				break
			} else {
				fmt.Println(a.Data)
			}
		}
	}()
	go func() {
		defer wg.Done()
		for {
			err = allStr.Send(&pb.StreamResData{
				Data: fmt.Sprintf("server_send:%v", time.Now().Unix()),
			})
			if err != nil {
				fmt.Printf("err:%v", err)
				break
			}
			time.Sleep(time.Second)
		}
	}()
	defer wg.Wait()
	return err
}
func main() {
	lis, err := net.Listen("tcp", PORT)
	if err != nil {
		panic(err)
	}
	defer lis.Close()
	s := grpc.NewServer()
	pb.RegisterGreeterServer(s, new(Serv))
	err = s.Serve(lis)
	if err != nil {
		panic(err)
	}
}
client.go

package main

import (
	"context"
	"fmt"
	"grpcStream/pb"
	"log"
	"sync"
	"time"

	"google.golang.org/grpc"
)

/**
 * @func: GetGrpcCli
 * @msg: 创建grpc客户端-服务端流模式
 * @param {*}
 * @return {*}
 */
func GetGrpcCli() error {
	//连接grpc服务
	grpcConn, err := grpc.Dial("192.168.11.140:50052", grpc.WithInsecure()) // gRPC.WithInsecure()以安全的方式操作
	if err != nil {
		log.Fatal(err)
		panic(err)
	}
	defer grpcConn.Close()
	//初始化grpc客户端
	grpcClient := pb.NewGreeterClient(grpcConn)
	//调用远程服务
	res, err := grpcClient.GetStream(context.TODO(), &pb.StreamReqData{Data: "req"})
	if err != nil {
		log.Fatal(err)
		panic(err)
	}
	for {
		a, err := res.Recv()
		if err != nil {
			fmt.Println(err)
			break
		}
		fmt.Println(a)
	}
	panic(err)
}

/**
 * @func:
 * @msg: 创建grpc客户端-客户端流模式
 * @param {*}
 * @return {*}
 */
func PostGrpcCli() error {
	//连接grpc服务
	grpcConn, err := grpc.Dial("192.168.11.140:50052", grpc.WithInsecure()) // gRPC.WithInsecure()以安全的方式操作
	if err != nil {
		log.Fatal(err)
		return err
	}
	defer grpcConn.Close()
	//初始化grpc客户端
	grpcClient := pb.NewGreeterClient(grpcConn)
	//调用远程服务
	postS, err := grpcClient.PostStream(context.TODO())
	if err != nil {
		log.Fatal(err)
		return err
	}
	for i := 0; i < 5; i++ {
		err = postS.Send(&pb.StreamReqData{
			Data: fmt.Sprintf("%v,%d", time.Now().Unix(), i),
		})
		if err != nil {
			log.Fatalln(err)
			break
		}
		time.Sleep(time.Second)
	}
	return err
}

/**
 * @func:
 * @msg: 双向推流客户端
 * @param {*}
 * @return {*}
 */
func AllGrpcCli() error {
	grpcConn, err := grpc.Dial("192.168.11.140:50052", grpc.WithInsecure()) // gRPC.WithInsecure()以安全的方式操作
	if err != nil {
		log.Fatal(err)
		return err
	}
	defer grpcConn.Close()
	//初始化grpc客户端
	grpcClient := pb.NewGreeterClient(grpcConn)
	//调用远程服务
	allStr, err := grpcClient.AllStream(context.Background())
	wg := sync.WaitGroup{}
	wg.Add(2)
	go func() {
		defer wg.Done()
		for {
			if a, err := allStr.Recv(); err != nil {
				fmt.Printf("err:%v", err)
				break
			} else {
				fmt.Println(a.Data)
			}
		}
	}()
	go func() {
		defer wg.Done()
		for {
			err = allStr.Send(&pb.StreamReqData{
				Data: fmt.Sprintf("client_send:%v", time.Now().Unix()),
			})
			if err != nil {
				fmt.Printf("err:%v", err)
				break
			}
			time.Sleep(time.Second)
		}
	}()
	defer wg.Wait()
	return err
}
func main() {
	GetGrpcCli()
	PostGrpcCli()
	AllGrpcCli()
}
上一篇:Decomposition——HDU7028


下一篇:C++学习笔记