gRPC 拦截器能做些什么?

什么是拦截器?

拦截器是一种横切维度的功能延展。

具象说明一下,高速收费站就是一种拦截器。它可以做什么?收费,查证,交通控制等等,面向所有穿行过往的车辆。

gRPC 拦截器主要分为两种:客户端拦截器(ClientInterceptor),服务端拦截器(ServerInterceptor),顾名思义,分别于请求的两端执行相应的前拦截处理。

一、客户端拦截器

1、作用时机?

请求被分发出去之前。

2、可以做什么?

a)、请求日志记录及监控

b)、添加请求头数据、以便代理转发使用

c)、请求或者结果重写

通常,如果要提供认证信息的话,可以使用 CallCredentials 实现,虽然,拦截器里也可以通过设置  CallOptions 来提供。

3、ClientInterceptor 源码

@ThreadSafe
public interface ClientInterceptor {

  <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
      MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next);
}

它只有一个方法:interceptCall,对于注册了相应拦截器的客户端调用,都要经过这个方法,

参数:

1、method:MethodDescriptor 类型,标示请求方法。包括方法全限定名称、请求服务名称、请求、结果、序列化工具、幂等等。

2、callOptions:此次请求的附带信息。

3、next:执行此次 RPC 请求的抽象链接管道(Channel)

返回结果:

ClientCall,包含请求及结果信息,并且不为null

4、最简单的实现

什么也不做:

public class MyGrpcClientInterceptor implements ClientInterceptor {
    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
        return next.newCall(method, callOptions);
    }
}

可以看到我们的实现里,没有实现任何逻辑,直接执行了 next.newCall 继续执行客户端的此次调用。

 next.newCall 只能在当前上下文中执行,每次调用以及返回都必须是一个完整地回路,逃逸使用会导致不必要的内存泄漏问题。

5、延伸使用

通过 callOption 设置超时及认证信息

public class MyGrpcClientInterceptor implements ClientInterceptor {
    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
        return next.newCall(method, callOptions
                .withDeadlineAfter(500, TimeUnit.MILLISECONDS) //设置超时
                .withCallCredentials(new CallCredentials() { //设置认证信息
                    @Override 
                    public void applyRequestMetadata(RequestInfo requestInfo, Executor appExecutor, MetadataApplier applier) {
                        Metadata metadata = new Metadata();
                        metadata.put(Metadata.Key.of("id", Metadata.ASCII_STRING_MARSHALLER), "king");
                        applier.apply(metadata);
                    }
                    
                    @Override 
                    public void thisUsesUnstableApi() {}
        }));
    }
}

看着是不是很熟悉,stub 调用时设置,只不过在这里换了一个设置场景。

日志记录:

public class MyGrpcClientInterceptor implements ClientInterceptor {
    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
        CallOptions myCallOptions = callOptions
                .withDeadlineAfter(500, TimeUnit.MILLISECONDS) //设置超时
                .withCallCredentials(new CallCredentials() { //设置认证信息
                    @Override
                    public void applyRequestMetadata(RequestInfo requestInfo, Executor appExecutor, MetadataApplier applier) {
                        Metadata metadata = new Metadata();
                        metadata.put(Metadata.Key.of("id", Metadata.ASCII_STRING_MARSHALLER), "king");
                        applier.apply(metadata);
                    }

                    @Override
                    public void thisUsesUnstableApi() {}
                });
        return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, myCallOptions)) {
            @Override
            public void sendMessage(ReqT message) {
                System.out.println("request method: " + method.getFullMethodName());
                System.out.println("request param:" + message.toString());

                super.sendMessage(message);
            }
        };
    }
}

ForwardingClientCall:ClientCall 的一个抽象实现类,用以请求的代理转发。为什么我们这里要用这个类呢?

其实我们完全可以直接使用 ClientCall 实现,只不过作为*抽闲类,我们必须要实现很多方法。而使用 ForwardingClientCall,则我们只需要去重写我们需要的方法就可以。

如上代码:

sendMessage 发送消息到请求服务器,可能会执行多次。此处我们记录相应的请求参数信息。

二、服务端拦截器

1、作用时机?

请求被具体的Handler相应前。

2、可以做什么?

a)访问认证

b)请求日志记录及监控

c)代理转发

3、ServerInterceptor 源码

@ThreadSafe
public interface ServerInterceptor {

  <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
      ServerCall<ReqT, RespT> call,
      Metadata headers,
      ServerCallHandler<ReqT, RespT> next);
}

参数:

call:ServerCall 对象,包含客户端请求的 MethodDescriptor

headers:请求头信息

next:处理链条上的下一个处理。

4、最简单的实现

public class MyGrpcServerInterceptor implements ServerInterceptor {

    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
        return next.startCall(call, headers);
    }
}

ServerCallHandler:定义用以实现请求处理的接口类。

5、延伸使用

public class MyGrpcServerInterceptor implements ServerInterceptor {

    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
        //提取认证信息
        String id = headers.get(Metadata.Key.of("id", Metadata.ASCII_STRING_MARSHALLER));
        return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(next.startCall(call, headers)) {
            private long startTime = 0; //处理开始时间
            private ReqT request;
            private boolean valid = false; //认证状态

            @Override
            public void onComplete() {
                //记录请求参数及耗时
                System.out.println("process cost: " + (System.nanoTime() - startTime));
                System.out.println("process param: " + request.toString());
                super.onComplete();
            }

            @Override
            public void onMessage(ReqT message) {
                startTime = System.nanoTime();
                request = message;
                if (StringUtils.equals("king", id)) {
                    super.onMessage(message);
                } else {
                    valid = false;
                }
            }

            @Override
            public void onHalfClose() {
                //验证失败则返回 Status.UNAUTHENTICATED
                if (!valid) {
                    call.close(Status.UNAUTHENTICATED.withDescription("auth failed"), new Metadata());
                } else {
                    super.onHalfClose();
                }
            }
        };
    }
}

onMessage:接收到请求时进行相应处理,我们这记录处理开始时间,及请求参数,同时根据提取的认证信息进行访问验证,验证通过则继续后续处理,否则设置认证状态为 false。

onHalfClose:处理认证标示及返回。

onComplete:处理结束记录请求参数及耗时。

 

上一篇:.Net Core GRPC报错


下一篇:跟我一起学 Go 系列:gRPC 拦截器