Listening to Stream and Transport Events in gRPC

gRPC interceptors can observe request-level events, but they cannot observe the lower-level events of a Stream or a Transport. For those, gRPC provides StreamTracer and ServerTransportFilter.

StreamTracer

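A StreamTracer listens to the events of a single stream, including stream closure, inbound and outbound messages, and inbound and outbound byte sizes.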

There are two variants: ClientStreamTracer for the client side and ServerStreamTracer for the server side.

Client

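On the client, the StreamTracer is injected in an interceptor. When a call is started, add a custom ClientStreamTracer.Factory to callOptions with withStreamTracerFactory; gRPC then uses that factory to create a StreamTracer for the call's stream, and the tracer receives its events.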

  • CustomClientInterceptor.java

Specify the factory in the interceptor:

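import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.MethodDescriptor;

public class CustomClientInterceptor implements ClientInterceptor {
    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
        // CallOptions is immutable: withStreamTracerFactory returns a new instance carrying the factory
        callOptions = callOptions.withStreamTracerFactory(new CustomClientStreamTracerFactory<>(method, callOptions, next));

        return next.newCall(method, callOptions);
    }
}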
  • CustomClientStreamTracerFactory.java

In the tracer implementation, override the methods for the events you want to listen to:

public class CustomClientStreamTracerFactory<ReqT, RespT> extends ClientStreamTracer.Factory {

    private final MethodDescriptor<ReqT, RespT> method;
    private final CallOptions callOptions;
    private final Channel next;

    public CustomClientStreamTracerFactory(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
        this.method = method;
        this.callOptions = callOptions;
        this.next = next;
    }

    @Override
    public ClientStreamTracer newClientStreamTracer(ClientStreamTracer.StreamInfo info, Metadata headers) {
        return new CustomClientStreamTracer<>(method, callOptions, next, info, headers);
    }
}

@Slf4j
class CustomClientStreamTracer<ReqT, RespT> extends ClientStreamTracer {

    public CustomClientStreamTracer(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next, StreamInfo info, Metadata headers) {
        // Logged once, when the tracer is created for a new stream
        log.info("Method: {} Next: {} Header: {}", method.getFullMethodName(), next.authority(), headers);
    }

    // Override event methods such as outboundMessage, inboundMessage or streamClosed here
}
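
The tracer above only logs when it is created. As a minimal sketch of what overriding the event methods can look like, the class below counts messages and wire bytes and records the final status. The class name CountingClientStreamTracer and its summary() helper are hypothetical and not part of the original example; the overridden methods are the ones ClientStreamTracer inherits from io.grpc.StreamTracer.

```java
import io.grpc.ClientStreamTracer;
import io.grpc.Status;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical tracer (illustration only): counts the events of one client stream.
final class CountingClientStreamTracer extends ClientStreamTracer {

    // Callbacks may arrive on transport threads, so the state is kept thread-safe.
    private final AtomicLong outboundMessages = new AtomicLong();
    private final AtomicLong inboundMessages = new AtomicLong();
    private final AtomicLong outboundWireBytes = new AtomicLong();
    private final AtomicLong inboundWireBytes = new AtomicLong();
    private volatile Status closedStatus; // stays null until the stream is closed

    @Override
    public void outboundMessage(int seqNo) {
        // An outbound message has been passed to the stream
        outboundMessages.incrementAndGet();
    }

    @Override
    public void inboundMessage(int seqNo) {
        // An inbound message has been received by the stream
        inboundMessages.incrementAndGet();
    }

    @Override
    public void outboundWireSize(long bytes) {
        outboundWireBytes.addAndGet(bytes);
    }

    @Override
    public void inboundWireSize(long bytes) {
        inboundWireBytes.addAndGet(bytes);
    }

    @Override
    public void streamClosed(Status status) {
        closedStatus = status;
    }

    // Hypothetical helper that renders the collected numbers as one line.
    String summary() {
        Status status = closedStatus;
        return "out=" + outboundMessages.get()
                + " in=" + inboundMessages.get()
                + " outBytes=" + outboundWireBytes.get()
                + " inBytes=" + inboundWireBytes.get()
                + " status=" + (status == null ? "OPEN" : status.getCode().toString());
    }
}
```

A factory such as CustomClientStreamTracerFactory could return an instance of this class from newClientStreamTracer. Because a new tracer is created per stream, the counters describe a single stream rather than the whole channel.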

Server

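On the server, the StreamTracer factory is specified when the Server is built. For each incoming request the factory creates a tracer, and the tracer's methods are called as events occur on that stream.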

  • Build the Server
Server server = ServerBuilder.forPort(1235) 
                             .addStreamTracerFactory(new CustomServerStreamTracerFactory())
                             .build();
  • CustomServerStreamTracerFactory.java

Override the methods for the events you want to listen to in CustomServerStreamTracer:

public class CustomServerStreamTracerFactory extends ServerStreamTracer.Factory {
    @Override
    public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata headers) {
        return new CustomServerStreamTracer(fullMethodName, headers);
    }
}

@Slf4j
class CustomServerStreamTracer extends ServerStreamTracer {

    private final String fullMethodName;

    private final Metadata headers;

    public CustomServerStreamTracer(String fullMethodName, Metadata headers) {
        this.fullMethodName = fullMethodName;
        this.headers = headers;
    }
}
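
The CustomServerStreamTracer above stores its arguments but does not override any event method yet. The following is a minimal sketch, under the assumption that the goal is to measure how long each stream lives: it counts inbound messages and records the elapsed time and status code in streamClosed. The names TimingServerStreamTracerFactory and TimingServerStreamTracer are hypothetical, and System.out is used instead of a logger only to keep the sketch free of extra dependencies.

```java
import io.grpc.Metadata;
import io.grpc.ServerStreamTracer;
import io.grpc.Status;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical factory (illustration only): creates one timing tracer per server stream.
final class TimingServerStreamTracerFactory extends ServerStreamTracer.Factory {
    @Override
    public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata headers) {
        return new TimingServerStreamTracer(fullMethodName);
    }
}

// Hypothetical tracer (illustration only): measures the lifetime of one server stream.
final class TimingServerStreamTracer extends ServerStreamTracer {

    private final String fullMethodName;
    private final long startNanos = System.nanoTime(); // taken when the tracer is created
    private final AtomicInteger inboundMessages = new AtomicInteger();
    private volatile long elapsedNanos = -1;            // -1 means "not closed yet"
    private volatile Status.Code closedCode;            // null means "not closed yet"

    TimingServerStreamTracer(String fullMethodName) {
        this.fullMethodName = fullMethodName;
    }

    @Override
    public void inboundMessage(int seqNo) {
        inboundMessages.incrementAndGet();
    }

    @Override
    public void streamClosed(Status status) {
        elapsedNanos = System.nanoTime() - startNanos;
        closedCode = status.getCode();
        System.out.printf("%s closed with %s after %d ms, %d inbound message(s)%n",
                fullMethodName, closedCode, elapsedNanos / 1_000_000, inboundMessages.get());
    }

    int inboundMessageCount() {
        return inboundMessages.get();
    }

    long elapsedNanos() {
        return elapsedNanos;
    }

    Status.Code closedCode() {
        return closedCode;
    }
}
```

This factory would be registered the same way as CustomServerStreamTracerFactory, through addStreamTracerFactory on the ServerBuilder.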

Transport listening

A ServerTransportFilter can listen to the ready and terminated events of a Transport.

Server

  • CustomServerTransportFilter.java
@Slf4j
public class CustomServerTransportFilter extends ServerTransportFilter {

    @Override
    public Attributes transportReady(Attributes transportAttrs) {
        log.info("CustomServerTransportFilter transportReady, transportAttrs: {}", transportAttrs);
        return super.transportReady(transportAttrs);
    }

    @Override
    public void transportTerminated(Attributes transportAttrs) {
        log.info("CustomServerTransportFilter transportTerminated, transportAttrs: {}", transportAttrs);
        super.transportTerminated(transportAttrs);
    }
}
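
Besides logging, transportReady can return a modified copy of the transport attributes, which gRPC then passes to transportTerminated. The sketch below uses that to tag each connection with an id and to keep a count of live connections. The class name ConnectionTrackingTransportFilter and the attribute key "connection-id" are hypothetical, chosen only for this illustration.

```java
import io.grpc.Attributes;
import io.grpc.ServerTransportFilter;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical filter (illustration only): tags every transport with an id and counts live ones.
final class ConnectionTrackingTransportFilter extends ServerTransportFilter {

    // Hypothetical attribute key; Attributes keys are compared by identity, so share this constant.
    static final Attributes.Key<Long> CONNECTION_ID = Attributes.Key.create("connection-id");

    private final AtomicLong nextId = new AtomicLong();
    private final AtomicInteger activeConnections = new AtomicInteger();

    @Override
    public Attributes transportReady(Attributes transportAttrs) {
        activeConnections.incrementAndGet();
        // Attributes is immutable, so return a copy that carries the new entry
        return transportAttrs.toBuilder()
                .set(CONNECTION_ID, nextId.incrementAndGet())
                .build();
    }

    @Override
    public void transportTerminated(Attributes transportAttrs) {
        // Defensive assumption: only count transports that this filter tagged in transportReady
        if (transportAttrs != null && transportAttrs.get(CONNECTION_ID) != null) {
            activeConnections.decrementAndGet();
        }
    }

    int activeConnections() {
        return activeConnections.get();
    }
}
```

Like CustomServerTransportFilter, a filter of this kind takes effect only after it is registered on the server with ServerBuilder's addTransportFilter method.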