grpc(五)

5 grpc支持的功能

1 拦截器

客户端拦截器

客户端普通的拦截器

func aInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
    # before do something
	err := invoker(ctx, method, req, reply, cc, opts...)
	# after do something 
}

客户端流拦截器


func clientStreamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
    # before do something
    s, err := streamer(ctx, desc, cc, method, opts...)
    # after do something 
}

调用时使用

// Setting up a connection to the server.
conn, err := grpc.Dial(address, grpc.WithInsecure(),
	grpc.WithUnaryInterceptor(aClientInterceptor),
	grpc.WithStreamInterceptor(bStreamInterceptor))
if err != nil {
	log.Fatalf("did not connect: %v", err)
}

服务端拦截器

func aUnaryServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	log.Println("======= [Server Interceptor] ", info.FullMethod)
	log.Printf(" Pre Proc Message : %s", req)
	// Invoking the handler to complete the normal execution of a unary RPC.
	m, err := handler(ctx, req)

	// Post processing logic
	log.Printf(" Post Proc Message : %s", m)
	return m, err
}

服务端流拦截器


func bServerStreamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	// Pre-processing
	log.Println("====== [Server Stream Interceptor] ", info.FullMethod)

	// Invoking the StreamHandler to complete the execution of RPC invocation
	err := handler(srv, newWrappedStream(ss))
	if err != nil {
		log.Printf("RPC failed with error %v", err)
	}
	return err
}

服务端使用拦截器

lis, err := net.Listen("tcp", port)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	s := grpc.NewServer(
		grpc.UnaryInterceptor(aUnaryServerInterceptor),
		grpc.StreamInterceptor(bServerStreamInterceptor))
	pb.RegisterXXServer(s, &server{})
	// Register reflection service on gRPC server.
	reflection.Register(s)
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}

2 截止时间

客户端截止时间

clientDeadline := time.Now().Add(time.Duration(2 * time.Second))
ctx, cancel := context.WithDeadline(context.Background(), clientDeadline)
defer cancel()

3 取消

主动调用cancel

4 错误处理

常用的状态码

OK 成功

CANCELLED 取消

DEEATLINE_EXCEEDED 超时

INVALID_ARGUMENT 客户端非法参数


5 多路复用

一个server上注册多个服务

lis, err := net.Listen("tcp", port)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	grpcServer := grpc.NewServer()

	a_pb.RegisterOrderManagementServer(grpcServer, &aServer{})

	b_pb.RegisterGreeterServer(grpcServer, &bServer{})

	reflection.Register(grpcServer)
	if err := grpcServer.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}

6 元数据

服务端读元数据

	md, metadataAvailable := metadata.FromIncomingContext(ctx)
	if !metadataAvailable {
		return nil, status.Errorf(codes.DataLoss, "UnaryEcho: failed to get metadata")
	}

服务端写元数据

	header := metadata.New(map[string]string{"location": "MTV", "timestamp": time.Now().Format(time.StampNano)})
	stream.SendHeader(header)

客户端添加元数据


	md := metadata.Pairs(
		"timestamp", time.Now().Format(time.StampNano),
		"kn", "vn",
	)
	mdCtx := metadata.NewOutgoingContext(context.Background(), md)

	ctxA := metadata.AppendToOutgoingContext(mdCtx, "k1", "v1", "k1", "v2", "k2", "v3")


	// RPC using the context with new metadata.
	var header, trailer metadata.MD


	// RPC: Add Order
	res, _ := client.XXX(ctxA, &xx{}, grpc.Header(&header), grpc.Trailer(&trailer))

7 负载均衡

增加负载均衡

roundrobinConn, err := grpc.Dial(
		fmt.Sprintf("%s:///%s", exampleScheme, exampleServiceName), // // "example:///lb.example.grpc.io"
		grpc.WithBalancerName("round_robin"), // This sets the initial balancing policy.
		grpc.WithInsecure(),
	)

压缩,增加特性

 grpc.UseCompressor(gzip.Name)
上一篇:etcd学习和实战:2、本地集群测试及gRPC命名和发现


下一篇:华为网络自动化编程 之 gRPC 方式获取交换机信息