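Preface:
The previous articles walked through the whole process from start to finish in terms of call structure.
This article analyzes the same process from the perspective of an actual call: the earlier discussion was abstract, this one is hands-on.
1. Sample code
The code is the same as before; it is repeated here for convenience.
1.1 provider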
public class ProviderApplication {
public static void main(String[] args) throws Exception {
ServiceConfig<DemoServiceImpl> service = new ServiceConfig<>();
service.setInterface(DemoService.class);
service.setRef(new DemoServiceImpl());
service.setApplication(new ApplicationConfig("dubbo-demo-api-provider"));
service.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
service.export();
System.out.println("dubbo service started");
new CountDownLatch(1).await();
}
}
1.2 consumer
public class ConsumerApplication {
public static void main(String[] args) {
ReferenceConfig<DemoService> reference = new ReferenceConfig<>();
reference.setApplication(new ApplicationConfig("dubbo-demo-api-consumer"));
reference.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
reference.setInterface(DemoService.class);
reference.setScope("remote");
DemoService service = reference.get();
String message = service.sayHello("dubbo");
System.out.println(message);
}
}
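2. The consumer's full invocation path
Earlier posts already covered the main steps, so this is a quick pass through them.
We start from the call String message = service.sayHello("dubbo");
2.1 service is Proxy0
Proxy0 is created through javassist and extends the abstract class Proxy. The listing below is a simplified illustration written in the style of a JDK dynamic proxy (note the placeholder interface Subject); the class javassist actually generates differs in detail, but the dispatch idea is the same: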
public final class $Proxy0
extends Proxy
implements Subject {
private static Method m1;
private static Method m2;
private static Method m3;
private static Method m0;
public $Proxy0(InvocationHandler paramInvocationHandler) {
super(paramInvocationHandler);
}
@Override
public final String sayHello(String paramString) {
try {
// In essence, this is a call to the InvocationHandler
return (String) this.h.invoke(this, m3, new Object[]{paramString});
} catch (Error | RuntimeException localError) {
throw localError;
} catch (Throwable localThrowable) {
throw new UndeclaredThrowableException(localThrowable);
}
}
}
Every method call on the proxy class ultimately ends up as a call to InvokerInvocationHandler.
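To make the dispatch idea concrete, here is a minimal sketch using the JDK's own java.lang.reflect.Proxy rather than Dubbo's javassist-based generator. GreetingService and the handler's return string are hypothetical stand-ins, not Dubbo code; the point is only that every interface call arrives in one invoke() method.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

class ProxyDispatchSketch {
    // Hypothetical interface standing in for DemoService.
    interface GreetingService {
        String sayHello(String name);
    }

    // Creates a proxy whose interface calls all land in a single InvocationHandler.
    static GreetingService newProxy() {
        final Object objectMethodsTarget = new Object();
        InvocationHandler handler = (proxy, method, args) -> {
            // Methods declared on Object (toString, hashCode, equals) are answered locally.
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(objectMethodsTarget, args);
            }
            // A real handler would build an invocation and pass it down the invoker chain.
            return "handled " + method.getName() + "(" + args[0] + ")";
        };
        return (GreetingService) Proxy.newProxyInstance(
                GreetingService.class.getClassLoader(),
                new Class<?>[]{GreetingService.class},
                handler);
    }

    public static void main(String[] args) {
        System.out.println(newProxy().sayHello("dubbo")); // handled sayHello(dubbo)
    }
}
```

The caller only ever sees the interface; which handler sits behind it is decided when the proxy is created.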
2.2 InvokerInvocationHandler.invoke()
public class InvokerInvocationHandler implements InvocationHandler {
private static final Logger logger = LoggerFactory.getLogger(InvokerInvocationHandler.class);
private final Invoker<?> invoker;
private ConsumerModel consumerModel;
...
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
...
// Pack everything about the request: the method, the interface, and the arguments
RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), args);
String serviceKey = invoker.getUrl().getServiceKey();
rpcInvocation.setTargetServiceUniqueName(serviceKey);
if (consumerModel != null) {
rpcInvocation.put(Constants.CONSUMER_MODEL, consumerModel);
rpcInvocation.put(Constants.METHOD_MODEL, consumerModel.getMethodModel(method));
}
// Here invoker is a MockClusterInvoker
return invoker.invoke(rpcInvocation).recreate();
}
}
All request information is packed into an RpcInvocation and handed to the next Invoker (MockClusterInvoker).
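As a rough illustration of what "packing the request" means, the sketch below captures a reflective call into a small value object. SimpleInvocation, capture() and GreetingService are hypothetical names invented for this example; the real RpcInvocation carries considerably more (attachments, service key, models, and so on).

```java
import java.lang.reflect.Method;
import java.util.Arrays;

class InvocationSketch {
    // Hypothetical interface standing in for DemoService.
    interface GreetingService {
        String sayHello(String name);
    }

    // Hypothetical, much reduced counterpart of RpcInvocation.
    static final class SimpleInvocation {
        final String interfaceName;
        final String methodName;
        final Class<?>[] parameterTypes;
        final Object[] arguments;

        SimpleInvocation(Method method, String interfaceName, Object[] arguments) {
            this.interfaceName = interfaceName;
            this.methodName = method.getName();
            this.parameterTypes = method.getParameterTypes();
            this.arguments = arguments == null ? new Object[0] : arguments.clone();
        }
    }

    // Finds the first public method with the given name and captures the call.
    static SimpleInvocation capture(Class<?> iface, String methodName, Object... args) {
        for (Method m : iface.getMethods()) {
            if (m.getName().equals(methodName)) {
                return new SimpleInvocation(m, iface.getName(), args);
            }
        }
        throw new IllegalArgumentException("no such method: " + methodName);
    }

    public static void main(String[] args) {
        SimpleInvocation inv = capture(GreetingService.class, "sayHello", "dubbo");
        System.out.println(inv.methodName + Arrays.toString(inv.arguments)); // sayHello[dubbo]
    }
}
```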
2.3 ClusterInvoker.invoke()
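The ClusterInvoker call chain is: MockClusterInvoker --> FailoverClusterInvoker
MockClusterInvoker mainly handles mock access; we will look at it in detail later.
FailoverClusterInvoker matters more: it provides a cluster fault-tolerance strategy for consumer calls. With multiple providers, when a call to provider A fails, the consumer automatically switches to provider B and calls again. The retries parameter bounds the number of retries (DEFAULT_RETRIES is 2, so at most 3 attempts in total).
Besides FailoverClusterInvoker there are several other fault-tolerance strategies to choose from; a later series of articles will cover them.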
public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyInvokers = invokers;
checkInvokers(copyInvokers, invocation);
String methodName = RpcUtils.getMethodName(invocation);
// Total number of attempts = configured retries + 1 (the initial call)
int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
...
for (int i = 0; i < len; i++) {
if (i > 0) {
checkWhetherDestroyed();
copyInvokers = list(invocation);
checkInvokers(copyInvokers, invocation);
}
// Select a suitable Dubbo provider Invoker through load balancing
Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
// Invoke the selected Invoker
Result result = invoker.invoke(invocation);
if (le != null && logger.isWarnEnabled()) {
...
}
return result;
// On a non-business exception, continue the loop and try the next call
} catch (RpcException e) {
if (e.isBiz()) { // biz exception.
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
...
}
}
Wrapped by ClusterInvoker, Dubbo implements its cluster fault-tolerance and load-balancing strategies. Both will be covered in more detail later.
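The failover loop can be reduced to a few lines. The following is a minimal sketch under simplifying assumptions, not Dubbo's implementation: providers are modeled as plain java.util.function.Function values, and select() is a hypothetical stand-in for load balancing that simply prefers a provider that has not been tried yet.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class FailoverSketch {
    // Tries providers one after another until one succeeds or the attempts run out.
    // 'retries' counts extra attempts after the first, so total attempts = retries + 1.
    static <T> T invokeWithFailover(List<Function<String, T>> providers, String request, int retries) {
        if (providers.isEmpty()) {
            throw new IllegalStateException("no provider available");
        }
        int attempts = Math.max(retries + 1, 1);
        RuntimeException last = null;
        List<Function<String, T>> invoked = new ArrayList<>();
        for (int i = 0; i < attempts; i++) {
            Function<String, T> provider = select(providers, invoked, i);
            invoked.add(provider);
            try {
                return provider.apply(request);
            } catch (RuntimeException e) {
                last = e; // remember the failure and move on to the next attempt
            }
        }
        throw new IllegalStateException("all " + attempts + " attempts failed", last);
    }

    // Stand-in for load balancing: prefer a provider not tried yet, else cycle through the list.
    static <T> Function<String, T> select(List<Function<String, T>> providers,
                                          List<Function<String, T>> invoked, int attempt) {
        for (Function<String, T> p : providers) {
            if (!invoked.contains(p)) {
                return p;
            }
        }
        return providers.get(attempt % providers.size());
    }

    public static void main(String[] args) {
        List<Function<String, String>> providers = new ArrayList<>();
        providers.add(r -> { throw new RuntimeException("provider A is down"); });
        providers.add(r -> "Hello " + r);
        System.out.println(invokeWithFailover(providers, "dubbo", 2)); // Hello dubbo
    }
}
```

Unlike the listing above, this sketch retries on every RuntimeException; the original rethrows business exceptions immediately instead of retrying them.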
2.4 ProtocolFilterWrapper.invoke()
This is the Filter wrapper: before the call actually reaches the invoker created by DubboProtocol, it first passes through a series of Filters.
public class ProtocolFilterWrapper implements Protocol {
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
// Load all Filters through SPI (here, those activated for the consumer group)
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
...
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult;
try {
// Invoke this Filter, which in turn passes the call on to the next Invoker in the chain
asyncResult = filter.invoke(next, invocation);
} ...
};
}
}
return last;
}
}
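Almost every framework seems to have a Filter layer for doing cross-cutting work before the real call. A dedicated Filter article will follow; for now it is enough to know that it exists.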
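The reverse loop in buildInvokerChain is worth a second look: wrapping from the last filter to the first is what makes the first filter run first. A stripped-down sketch, with hypothetical Invoker and Filter interfaces that operate on strings instead of Dubbo's Invocation and Result types:

```java
import java.util.ArrayList;
import java.util.List;

class FilterChainSketch {
    // Hypothetical, simplified counterparts of Dubbo's Invoker and Filter.
    interface Invoker {
        String invoke(String request);
    }

    interface Filter {
        String invoke(Invoker next, String request);
    }

    // Wraps the target from the last filter to the first, so the first filter ends up outermost.
    static Invoker buildChain(Invoker target, List<Filter> filters) {
        Invoker last = target;
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker next = last;
            last = request -> filter.invoke(next, request);
        }
        return last;
    }

    public static void main(String[] args) {
        List<Filter> filters = new ArrayList<>();
        filters.add((next, req) -> next.invoke(req + ">A"));
        filters.add((next, req) -> next.invoke(req + ">B"));
        Invoker chain = buildChain(req -> req + ">target", filters);
        System.out.println(chain.invoke("call")); // call>A>B>target
    }
}
```

Each filter decides whether and when to call next, which is what lets a filter short-circuit a call or act on the result afterwards.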
2.5 ListenerInvokerWrapper.invoke()
public class ListenerInvokerWrapper<T> implements Invoker<T> {
@Override
public Result invoke(Invocation invocation) throws RpcException {
return invoker.invoke(invocation);
}
}
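This wrapper mainly exists to hook in a series of listeners; as the listing shows, invoke() itself simply delegates. It will be analyzed later.
2.6 Synchronous or asynchronous call?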
public class AsyncToSyncInvoker<T> implements Invoker<T> {
private Invoker<T> invoker;
...
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult = invoker.invoke(invocation);
try {
// For a synchronous call, block until the result arrives
if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
}
} ...
return asyncResult;
}
}
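To improve performance, Dubbo offers an asynchronous invocation mode, which will be covered separately.
Calls are synchronous by default, so the caller blocks until the result is available.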
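The sync-over-async pattern in AsyncToSyncInvoker can be sketched with a plain CompletableFuture. This is an illustration under assumptions: the Mode enum and invokeAsync() are hypothetical, and the remote call is faked with supplyAsync.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class AsyncToSyncSketch {
    // Hypothetical counterpart of InvokeMode.
    enum Mode { SYNC, ASYNC }

    // Stand-in for the underlying invoker: it always answers asynchronously.
    static CompletableFuture<String> invokeAsync(String name) {
        return CompletableFuture.supplyAsync(() -> "Hello " + name);
    }

    // The lower layer is always asynchronous; only SYNC mode blocks before returning.
    static CompletableFuture<String> invoke(String name, Mode mode) {
        CompletableFuture<String> future = invokeAsync(name);
        if (mode == Mode.SYNC) {
            try {
                future.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for the result", e);
            } catch (ExecutionException | TimeoutException e) {
                throw new IllegalStateException("call failed", e);
            }
        }
        return future;
    }

    public static void main(String[] args) {
        CompletableFuture<String> result = invoke("dubbo", Mode.SYNC);
        System.out.println(result.isDone() + " " + result.join()); // true Hello dubbo
    }
}
```

In SYNC mode the returned future is already complete, so the caller's later read returns immediately; in ASYNC mode the caller receives the same kind of object but has to wait or register a callback itself.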
2.7 DubboInvoker.doInvoke()
Finally, execution reaches the class that performs the actual call.
public class DubboInvoker<T> extends AbstractInvoker<T> {
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(PATH_KEY, getUrl().getPath());
inv.setAttachment(VERSION_KEY, version);
// Pick an available client in round-robin order
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
// One-way calls go through currentClient.send(); two-way calls go through currentClient.request()
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = calculateTimeout(invocation, methodName);
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
ExecutorService executor = getCallbackExecutor(getUrl(), inv);
CompletableFuture<AppResponse> appResponseFuture =
currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
FutureContext.getContext().setCompatibleFuture(appResponseFuture);
AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
result.setExecutor(executor);
return result;
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
}
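The client selection in doInvoke() is a plain round-robin over a fixed array. A minimal sketch follows, with clients modeled as strings (an assumption for illustration). One deliberate difference: it uses Math.floorMod instead of %, so the index stays non-negative even after the counter overflows.

```java
import java.util.concurrent.atomic.AtomicInteger;

class RoundRobinSketch {
    private final String[] clients;
    private final AtomicInteger index = new AtomicInteger();

    RoundRobinSketch(String... clients) {
        if (clients.length == 0) {
            throw new IllegalArgumentException("at least one client is required");
        }
        this.clients = clients.clone();
    }

    // Returns the next client; safe to call from multiple threads.
    String next() {
        if (clients.length == 1) {
            return clients[0];
        }
        // floorMod keeps the index in range even when the counter has wrapped to a negative value.
        return clients[Math.floorMod(index.getAndIncrement(), clients.length)];
    }

    public static void main(String[] args) {
        RoundRobinSketch rr = new RoundRobinSketch("c0", "c1", "c2");
        System.out.println(rr.next() + rr.next() + rr.next() + rr.next()); // c0c1c2c0
    }
}
```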
2.8 HeaderExchangeChannel.request()
final class HeaderExchangeChannel implements ExchangeChannel {
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// Create the Request object
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
// The payload is the request itself: interface name, method name, and argument information
req.setData(request);
DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
try {
// Hand the request to the channel for sending
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
}
We are almost at the end: the actual sending is handed over to NettyClient (the default).
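The request/future pairing in request() follows a common pattern: register a future before sending, and complete it when the matching response arrives. The sketch below illustrates that pattern only; PendingRequestsSketch, its id counter, and the transport callback are hypothetical and are not DefaultFuture's actual API.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

class PendingRequestsSketch {
    private final AtomicLong ids = new AtomicLong();
    private final Map<Long, CompletableFuture<Object>> pending = new ConcurrentHashMap<>();

    // Registers a future under a fresh id, then hands (id, payload) to the transport.
    CompletableFuture<Object> request(Object payload, BiConsumer<Long, Object> transport) {
        long id = ids.getAndIncrement();
        CompletableFuture<Object> future = new CompletableFuture<>();
        pending.put(id, future);
        try {
            transport.accept(id, payload);
        } catch (RuntimeException e) {
            // Sending failed: drop the registration and cancel, mirroring future.cancel() in the listing.
            pending.remove(id);
            future.cancel(true);
            throw e;
        }
        return future;
    }

    // Called by the receiving side when a response carrying this id arrives.
    void onResponse(long id, Object result) {
        CompletableFuture<Object> future = pending.remove(id);
        if (future != null) {
            future.complete(result);
        }
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingRequestsSketch channel = new PendingRequestsSketch();
        long[] sentId = new long[1];
        CompletableFuture<Object> future = channel.request("sayHello(dubbo)", (id, p) -> sentId[0] = id);
        channel.onResponse(sentId[0], "Hello dubbo");
        System.out.println(future.join()); // Hello dubbo
    }
}
```

Registering before sending matters: if the order were reversed, a very fast response could arrive before its future exists.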
2.9 NettyClient.send()
public class NettyClient extends AbstractClient {
public void send(Object message, boolean sent) throws RemotingException {
// If no connection has been established yet, connect first
if (needReconnect && !isConnected()) {
connect();
}
Channel channel = getChannel();
//TODO Can the value returned by getChannel() be null? need improvement.
if (channel == null || !channel.isConnected()) {
throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
}
// Finally, hand the message to the channel for sending
channel.send(message, sent);
}
}
2.10 NettyChannel.send()
final class NettyChannel extends AbstractChannel {
public void send(Object message, boolean sent) throws RemotingException {
// whether the channel is closed
super.send(message, sent);
boolean success = true;
int timeout = 0;
try {
// Ultimately the message is written out through the Netty channel
ChannelFuture future = channel.writeAndFlush(message);
if (sent) {
// wait timeout ms
timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
success = future.await(timeout);
}
Throwable cause = future.cause();
if (cause != null) {
throw cause;
}
} catch (Throwable e) {
removeChannelIfDisconnected(channel);
throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
}
if (!success) {
throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress()
+ "in timeout(" + timeout + "ms) limit");
}
}
}
In the end, the request is sent out through Netty's channel.
The process is fairly long, so a sequence diagram is used to summarize it.