5 grpc支持的功能
1 拦截器
客户端拦截器
客户端一元(Unary)拦截器
func aInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
# before do something
err := invoker(ctx, method, req, reply, cc, opts...)
# after do something
}
客户端流拦截器
func clientStreamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
# before do something
s, err := streamer(ctx, desc, cc, method, opts...)
# after do something
}
调用时使用
// Set up a connection to the server with the client-side unary and stream
// interceptors installed; abort the program if dialing fails.
conn, err := grpc.Dial(address, grpc.WithInsecure(),
	grpc.WithUnaryInterceptor(aInterceptor),
	grpc.WithStreamInterceptor(clientStreamInterceptor))
if err != nil {
	log.Fatalf("did not connect: %v", err)
}
服务端拦截器
// aUnaryServerInterceptor is a server-side unary interceptor. It logs the
// method name and the request, delegates to handler, logs the response, and
// returns the handler's response and error untouched.
func aUnaryServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	// Pre-processing: record which method is being called and with what.
	log.Println("======= [Server Interceptor] ", info.FullMethod)
	log.Printf(" Pre Proc Message : %s", req)

	// Run the actual unary RPC handler.
	resp, err := handler(ctx, req)

	// Post-processing: record the response before handing it back.
	log.Printf(" Post Proc Message : %s", resp)
	return resp, err
}
服务端流拦截器
// bServerStreamInterceptor is a server-side stream interceptor. It logs the
// method name, runs handler against a wrapped stream, logs any failure, and
// returns the handler's error.
func bServerStreamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	// Pre-processing: record which streaming method is being called.
	log.Println("====== [Server Stream Interceptor] ", info.FullMethod)

	// newWrappedStream is defined outside this snippet; the handler receives
	// its result instead of the raw stream.
	wrapped := newWrappedStream(ss)
	if err := handler(srv, wrapped); err != nil {
		log.Printf("RPC failed with error %v", err)
		return err
	}
	return nil
}
服务端使用拦截器
lis, err := net.Listen("tcp", port)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer(
grpc.UnaryInterceptor(aUnaryServerInterceptor),
grpc.StreamInterceptor(bServerStreamInterceptor))
pb.RegisterXXServer(s, &server{})
// Register reflection service on gRPC server.
reflection.Register(s)
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
2 截止时间
客户端截止时间
// The call must finish within two seconds of now.
clientDeadline := time.Now().Add(2 * time.Second)
ctx, cancel := context.WithDeadline(context.Background(), clientDeadline)
// Always cancel so the context's resources are released once the enclosing
// function returns.
defer cancel()
3 取消
主动调用cancel
4 错误处理
常用的状态码
OK 成功
CANCELLED 取消
DEADLINE_EXCEEDED 超时
INVALID_ARGUMENT 客户端非法参数
5 多路复用
一个server上注册多个服务
lis, err := net.Listen("tcp", port)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
grpcServer := grpc.NewServer()
a_pb.RegisterOrderManagementServer(grpcServer, &aServer{})
b_pb.RegisterGreeterServer(grpcServer, &bServer{})
reflection.Register(grpcServer)
if err := grpcServer.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
6 元数据
服务端读元数据
// Pull the metadata off the incoming request context; reject the call with
// a DataLoss status when none is attached.
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
	return nil, status.Errorf(codes.DataLoss, "UnaryEcho: failed to get metadata")
}
服务端写元数据
// Build the header metadata (a fixed location plus the current timestamp)
// and send it to the client before any response message.
header := metadata.New(map[string]string{"location": "MTV", "timestamp": time.Now().Format(time.StampNano)})
// NOTE(review): assumes the enclosing function is a streaming handler that
// returns a single error — confirm against the surrounding code.
if err := stream.SendHeader(header); err != nil {
	return err
}
客户端添加元数据
// Create the initial metadata and attach it to a fresh outgoing context.
md := metadata.Pairs(
	"timestamp", time.Now().Format(time.StampNano),
	"kn", "vn",
)
mdCtx := metadata.NewOutgoingContext(context.Background(), md)
// Append further key/value pairs to the outgoing metadata; "k1" is given
// twice, once with "v1" and once with "v2".
ctxA := metadata.AppendToOutgoingContext(mdCtx, "k1", "v1", "k1", "v2", "k2", "v3")

// header and trailer are passed by pointer through grpc.Header/grpc.Trailer
// so the call can fill them in.
var header, trailer metadata.MD
// Make the RPC with the metadata-carrying context and check its error
// instead of discarding it.
res, err := client.XXX(ctxA, &xx{}, grpc.Header(&header), grpc.Trailer(&trailer))
if err != nil {
	log.Fatalf("RPC failed: %v", err)
}
7 负载均衡
增加负载均衡
// Dial through the custom resolver scheme and select round_robin as the
// load-balancing policy via the default service config.
// grpc.WithBalancerName is deprecated, so the policy is set with
// grpc.WithDefaultServiceConfig instead.
roundrobinConn, err := grpc.Dial(
	fmt.Sprintf("%s:///%s", exampleScheme, exampleServiceName), // e.g. "example:///lb.example.grpc.io"
	grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`), // This sets the initial balancing policy.
	grpc.WithInsecure(),
)
if err != nil {
	log.Fatalf("did not connect: %v", err)
}
压缩,增加特性
// Call option that selects the gzip compressor by name; pass it as an extra
// argument to an RPC call.
// NOTE(review): the gzip package must be imported somewhere in the program
// for the compressor to be available — confirm.
grpc.UseCompressor(gzip.Name)