在对服务者调用流程的讲解中,最后invoker调用链是
RegisterDirectory$InvokerDelegate.invoke(InvokerWrapper.invoke) -> ListenerInvokerWraper.invoke -> ProtocolFilterWrapper.invoke -> DubboInvoke.invoke
接下来将重点介绍该流程,其中包括一系列的filter
InvokerWrapper.invoke 很简单 就直接调用下一层的ListenerInvokerWrapper,也是直接调用下层的ProtocolFilterWrapper
首先在refer invoke的时候,已经构建好了filter链
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) { Invoker<T> last = invoker; List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
根据dubbo的扩展点会加载到所有@Activate的扩展,三个Filter 脱颖而出,顺序依次是:ConsumerContextFilter
、MonitorFilter
、FutureFilter
(排序顺序,order 越小月往前)
再看看一下ProtocolFilterWrapper#buildInvokerChain对filter在invoker的包装,我们就知道优先调用的是ConsumerContextFilter
if (!filters.isEmpty()) { for (int i = filters.size() - 1; i >= 0; i--) { // 最后一个filter包装在最里面的invoker中,返回出去的是包装了第一个filter的invoker // 所以在调用的时候 会先调用第一个filter final Filter filter = filters.get(i); final Invoker<T> next = last; last = new Invoker<T>() { @Override public Class<T> getInterface() { return invoker.getInterface(); } @Override public URL getUrl() { return invoker.getUrl(); } @Override public boolean isAvailable() { return invoker.isAvailable(); } @Override public Result invoke(Invocation invocation) throws RpcException { Result result = filter.invoke(next, invocation); if (result instanceof AsyncRpcResult) { AsyncRpcResult asyncResult = (AsyncRpcResult) result; asyncResult.thenApplyWithContext(r -> filter.onResponse(r, invoker, invocation)); return asyncResult; } else { return filter.onResponse(result, invoker, invocation); } } @Override public void destroy() { invoker.destroy(); } @Override public String toString() { return invoker.toString(); } }; } } return last; }
接下来就是调用invoker方法了
public Result invoke(Invocation invocation) throws RpcException { Result result = filter.invoke(next, invocation); if (result instanceof AsyncRpcResult) { AsyncRpcResult asyncResult = (AsyncRpcResult) result; asyncResult.thenApplyWithContext(r -> filter.onResponse(r, invoker, invocation)); return asyncResult; } else { return filter.onResponse(result, invoker, invocation); } }
ConsumerContextFilter的过滤器
主要功能是往当前执行上下文中设置 invoker、invocation、localAddress、remoteAddress 和 remotePort等。
而后,将 链式的下一个 Filter 放入 invocation中,标识为当前invocation执行的 invoker,并将当前上下文的serverLocal清楚
最后调用invoker.invoke(invocation); 执行下一个Filter调用
MonitorFilter
Monitor 主要通过 ConcurrentMap<String, AtomicInteger>
, 来记录 interface.method
的调用次数。
最后就往下执行。
FutureFilter
然后调到FutureFilter ,最后调到DubboInvoker,那么后面就是网络通信相关的内容了。