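RPC call: the client serializes the service interface, the method name, the method parameter types, and the argument values, and sends them to the server; the server deserializes this information and invokes the target method through a proxy.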
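To make that round trip concrete, here is a minimal sketch that uses plain JDK serialization and reflection. It is not Dubbo's wire format or codec; `CallData`, `GreetingService`, and `GreetingServiceImpl` are hypothetical names introduced only for this illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.Method;

class RpcCallSketch {

    /** Hypothetical wire object: what the server needs to locate and call a method. */
    static class CallData implements Serializable {
        private static final long serialVersionUID = 1L;
        final String interfaceName;
        final String methodName;
        final Class<?>[] parameterTypes;
        final Object[] arguments;

        CallData(String interfaceName, String methodName,
                 Class<?>[] parameterTypes, Object[] arguments) {
            this.interfaceName = interfaceName;
            this.methodName = methodName;
            this.parameterTypes = parameterTypes;
            this.arguments = arguments;
        }
    }

    /** Hypothetical service used only for this demo. */
    interface GreetingService {
        String greet(String name);
    }

    static class GreetingServiceImpl implements GreetingService {
        @Override
        public String greet(String name) {
            return "hello " + name;
        }
    }

    /** Client side: turn the call description into bytes. */
    static byte[] serialize(CallData data) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(data);
            }
            return bytes.toByteArray();
        } catch (Exception e) {
            throw new IllegalStateException("serialization failed", e);
        }
    }

    /** Server side: read the call description back and invoke the target reflectively. */
    static Object dispatch(Object target, byte[] wire) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(wire))) {
            CallData data = (CallData) in.readObject();
            Method method = Class.forName(data.interfaceName)
                    .getMethod(data.methodName, data.parameterTypes);
            return method.invoke(target, data.arguments);
        } catch (Exception e) {
            throw new IllegalStateException("dispatch failed", e);
        }
    }

    public static void main(String[] args) {
        CallData call = new CallData(GreetingService.class.getName(), "greet",
                new Class<?>[] {String.class}, new Object[] {"dubbo"});
        System.out.println(dispatch(new GreetingServiceImpl(), serialize(call))); // prints "hello dubbo"
    }
}
```

The byte array stands in for the network hop; a real framework would also carry a request id, version, and attachments.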
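Summary:
(1) At startup, the client pulls the relevant service list from the registry and subscribes to changes; Cluster merges the pulled service list into a single Invoker.
(2) Directory#list returns the provider addresses (as already-built Invokers), which then go through routing and load balancing.
(3) Each interface has its own RegistryDirectory, which is responsible for pulling and subscribing to service providers, dynamic configuration, and routing rules.
The consumer initiates a call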
InvokerInvocationHandler#invoke
A consumer call goes through the proxy and first reaches InvokerInvocationHandler#invoke, which builds an RpcInvocation from the method and its arguments. The invoker here is a MockClusterInvoker.
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
// code omitted
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
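The same pattern can be reproduced with a plain JDK dynamic proxy. The sketch below is an illustration, not Dubbo's code: `SimpleInvoker`, `EchoService`, and `ForwardingHandler` are hypothetical names, and the call description is passed as three arguments rather than an RpcInvocation.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;

class ProxyInvocationSketch {

    /** Hypothetical stand-in for an invoker: receives a description of the call. */
    interface SimpleInvoker {
        Object invoke(String methodName, Class<?>[] parameterTypes, Object[] args);
    }

    /** Hypothetical service interface. */
    interface EchoService {
        String echo(String text);
    }

    /** Forwards every interface call to the invoker. */
    static class ForwardingHandler implements InvocationHandler {
        private final SimpleInvoker invoker;

        ForwardingHandler(SimpleInvoker invoker) {
            this.invoker = invoker;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if (method.getDeclaringClass() == Object.class) {
                // toString/hashCode/equals are answered locally instead of being forwarded
                return method.invoke(this, args);
            }
            return invoker.invoke(method.getName(), method.getParameterTypes(), args);
        }
    }

    /** Creates a proxy whose interface calls all end up in the given invoker. */
    static <T> T createProxy(Class<T> type, SimpleInvoker invoker) {
        Object proxy = Proxy.newProxyInstance(
                type.getClassLoader(), new Class<?>[] {type}, new ForwardingHandler(invoker));
        return type.cast(proxy);
    }

    public static void main(String[] args) {
        EchoService service = createProxy(EchoService.class,
                (name, types, a) -> name + Arrays.toString(a));
        System.out.println(service.echo("hi")); // prints "echo[hi]"
    }
}
```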
MockClusterInvoker –wraps–> FailoverClusterInvoker (RegistryAwareClusterInvoker when there are multiple registries) –extends–> AbstractClusterInvoker
MockClusterInvoker#invoke
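Checks the mock parameter, which defaults to false when absent. The invoker here is the wrapped FailoverClusterInvoker.
If mock=false, FailoverClusterInvoker#invoke is called (the invoke method inherited from AbstractClusterInvoker).
If the mock value starts with force, MockClusterInvoker#doMockInvoke is called directly.
Otherwise (fail-mock), FailoverClusterInvoker#invoke is called first, and MockClusterInvoker#doMockInvoke is called only if an RpcException is thrown.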
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result result = null;
String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
if (value.length() == 0 || value.equalsIgnoreCase("false")) {
//no mock
result = this.invoker.invoke(invocation);
} else if (value.startsWith("force")) {
//force:direct mock
result = doMockInvoke(invocation, null);
} else {
//fail-mock
try {
result = this.invoker.invoke(invocation);
} catch (RpcException e) {
// code omitted
result = doMockInvoke(invocation, e);
}
}
return result;
}
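The three branches can be isolated in a small, framework-free sketch. It simplifies the real method (for instance, it treats every RuntimeException as a reason to fall back), and the `call` helper with its `remoteCall` and `mockCall` parameters is hypothetical.

```java
import java.util.function.Function;
import java.util.function.Supplier;

class MockModeSketch {

    /**
     * Runs a call according to the mock setting:
     * empty/"false" -> remote only, "force..." -> mock only, anything else -> remote, then mock on failure.
     */
    static String call(String mockValue,
                       Supplier<String> remoteCall,
                       Function<RuntimeException, String> mockCall) {
        String value = mockValue == null ? "false" : mockValue.trim();
        if (value.isEmpty() || value.equalsIgnoreCase("false")) {
            // no mock: failures propagate to the caller
            return remoteCall.get();
        }
        if (value.startsWith("force")) {
            // force: never touch the remote side
            return mockCall.apply(null);
        }
        // fail-mock: try the remote call first, fall back to the mock on failure
        try {
            return remoteCall.get();
        } catch (RuntimeException e) {
            return mockCall.apply(e);
        }
    }

    public static void main(String[] args) {
        Supplier<String> broken = () -> {
            throw new IllegalStateException("provider down");
        };
        Function<RuntimeException, String> mock = e -> "mocked";
        System.out.println(call("force:return null", () -> "real", mock)); // prints "mocked"
        System.out.println(call("fail:return null", broken, mock));        // prints "mocked"
        System.out.println(call("false", () -> "real", mock));             // prints "real"
    }
}
```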
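Invocation encapsulates the method name, parameter types, argument values, and return type of the call.
AbstractClusterInvoker#invoke
First obtains the list of service providers and the load-balancing strategy, then makes the remote call.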
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();
// binding attachments into invocation.
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation) invocation).addAttachments(contextAttachments);
}
List<Invoker<T>> invokers = list(invocation);
LoadBalance loadbalance = initLoadBalance(invokers, invocation);
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
return doInvoke(invocation, invokers, loadbalance);
}
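(1) StaticDirectory#doList
Gets the registry invokers.
AbstractClusterInvoker#list–>AbstractDirectory#list–>StaticDirectory#doList
Calling StaticDirectory#doList returns 2 MockClusterInvoker objects. The directory of these invokers is a RegistryDirectory (registry directory).
(2) AbstractClusterInvoker#initLoadBalance
Gets the load-balancing algorithm.
Checks whether the URL of the first registry invoker carries a loadbalance parameter; if not, the default value is random.
RandomLoadBalance: random selection
RoundRobinLoadBalance: round-robin selection
ConsistentHashLoadBalance: consistent-hash selection
LeastActiveLoadBalance: least-active selection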
protected LoadBalance initLoadBalance(List<Invoker<T>> invokers, Invocation invocation) {
if (CollectionUtils.isNotEmpty(invokers)) {
return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
.getMethodParameter(RpcUtils.getMethodName(invocation), LOADBALANCE_KEY, DEFAULT_LOADBALANCE));
} else {
return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(DEFAULT_LOADBALANCE);
}
}
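To show what choosing a strategy by name with a random default amounts to, here is a small sketch with unweighted random and round-robin selection. It is only an illustration: the `Balancer` interface and the `byName` lookup are hypothetical, the strategies are simplified and ignore provider weights, and the lookup does not use an extension loader.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

class LoadBalanceSketch {

    /** Hypothetical strategy interface: pick one candidate from a non-empty list. */
    interface Balancer {
        <T> T select(List<T> candidates);
    }

    /** Picks a uniformly random candidate (no weights in this sketch). */
    static class RandomBalancer implements Balancer {
        private final Random random = new Random();

        @Override
        public <T> T select(List<T> candidates) {
            return candidates.get(random.nextInt(candidates.size()));
        }
    }

    /** Cycles through the candidates in order. */
    static class RoundRobinBalancer implements Balancer {
        private final AtomicInteger counter = new AtomicInteger();

        @Override
        public <T> T select(List<T> candidates) {
            // floorMod keeps the index non-negative even after the counter overflows
            int index = Math.floorMod(counter.getAndIncrement(), candidates.size());
            return candidates.get(index);
        }
    }

    /** Looks up a strategy by name; unknown or missing names fall back to random. */
    static Balancer byName(String name) {
        if ("roundrobin".equals(name)) {
            return new RoundRobinBalancer();
        }
        return new RandomBalancer();
    }

    public static void main(String[] args) {
        List<String> providers = Arrays.asList("a", "b", "c");
        Balancer roundRobin = byName("roundrobin");
        for (int i = 0; i < 4; i++) {
            System.out.print(roundRobin.select(providers)); // prints "abca" over four iterations
        }
        System.out.println();
        System.out.println(byName(null).select(providers)); // one of a, b, c
    }
}
```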
FailoverClusterInvoker#doInvoke
MockClusterInvoker#invoke–>AbstractClusterInvoker#invoke–>FailoverClusterInvoker#doInvoke
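Reads the method's retry count, which defaults to 2; if no retry count is configured, a method is therefore called at most 3 times (the first attempt plus 2 retries). Load balancing selects an invoker, and that invoker's invoke method is then called.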
FailoverClusterInvoker#doInvoke
–>InvokerWrapper#invoke
–>ProtocolFilterWrapper$CallbackRegistrationInvoker#invoke
–>ProtocolFilterWrapper$1#invoke(ProtocolFilterWrapper#buildInvokerChain构造的invoker)
–>ConsumerContextFilter#invoke
–>FutureFilter#invoke
–>MonitorFilter#invoke
–>ListenerInvokerWrapper#invoke
–>AsyncToSyncInvoker#invoke
–>AbstractInvoker#invoke (determines whether the call is synchronous or asynchronous)
–>DubboInvoker#doInvoke
–>ReferenceCountExchangeClient#request
–>HeaderExchangeClient#request
–>HeaderExchangeChannel#request
–>AbstractPeer#send
–>AbstractClient#send
–>NettyChannel#send
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyInvokers = invokers;
checkInvokers(copyInvokers, invocation);
String methodName = RpcUtils.getMethodName(invocation);
int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
// retry loop.
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
if (i > 0) {
checkWhetherDestroyed();
copyInvokers = list(invocation);
// check again
checkInvokers(copyInvokers, invocation);
}
Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
Result result = invoker.invoke(invocation);
if (le != null && logger.isWarnEnabled()) {
// exception-throwing code omitted
}
return result;
} catch (RpcException e) {
// omitted
} catch (Throwable e) {
// omitted
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
// code that throws the RpcException omitted
}
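Stripped of the framework types, the retry loop looks roughly like the sketch below. It is a simplification under stated assumptions: providers are plain strings, the `pick` helper is a hypothetical stand-in for load-balanced selection, and every RuntimeException is treated as retryable.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class FailoverSketch {

    /** Tries up to retries + 1 providers and returns the first successful result. */
    static <R> R invokeWithFailover(List<String> providers, int retries, Function<String, R> call) {
        if (providers.isEmpty()) {
            throw new IllegalStateException("no provider available");
        }
        int attempts = Math.max(retries + 1, 1);
        List<String> invoked = new ArrayList<>();
        RuntimeException last = null;
        for (int i = 0; i < attempts; i++) {
            String provider = pick(providers, invoked, i);
            invoked.add(provider);
            try {
                return call.apply(provider);
            } catch (RuntimeException e) {
                last = e; // remember the failure and move on to the next attempt
            }
        }
        throw new IllegalStateException(
                "failed after " + attempts + " attempts, tried " + invoked, last);
    }

    /** Simplified selection: first provider not yet tried, otherwise rotate by attempt index. */
    private static String pick(List<String> providers, List<String> invoked, int attempt) {
        for (String provider : providers) {
            if (!invoked.contains(provider)) {
                return provider;
            }
        }
        return providers.get(attempt % providers.size());
    }

    public static void main(String[] args) {
        String result = invokeWithFailover(Arrays.asList("a", "b", "c"), 2, provider -> {
            if (!provider.equals("c")) {
                throw new IllegalStateException(provider + " is down");
            }
            return "ok from " + provider;
        });
        System.out.println(result); // prints "ok from c"
    }
}
```

With retries set to 2, the third attempt reaches provider c; with retries set to 1, the same call would fail after two attempts.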
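(1) RegistryDirectory#doList
AbstractClusterInvoker#list–>AbstractDirectory#list–>RegistryDirectory#doList
Returns RegistryDirectory$InvokerDelegate instances.
(2) AbstractClusterInvoker#doSelect
AbstractClusterInvoker#select–>AbstractClusterInvoker#doSelect
Selects an invoker using the load-balancing strategy.
Step 1: select an invoker with the load-balancing strategy. If this invoker is already in the list of previously selected invokers, or it is unavailable (and availablecheck is enabled), reselect via step 2; otherwise return the invoker selected here.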
private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation,
List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
if (CollectionUtils.isEmpty(invokers)) {
return null;
}
if (invokers.size() == 1) {
return invokers.get(0);
}
Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
//If the `invoker` is in the `selected` or invoker is unavailable && availablecheck is true, reselect.
if ((selected != null && selected.contains(invoker))
|| (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
try {
Invoker<T> rInvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
if (rInvoker != null) {
invoker = rInvoker;
} else {
//Check the index of current selected invoker, if it's not the last one, choose the one at index+1.
int index = invokers.indexOf(invoker);
try {
//Avoid collision
invoker = invokers.get((index + 1) % invokers.size());
} catch (Exception e) {
logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e);
}
}
} catch (Throwable t) {
logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t);
}
}
return invoker;
}
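Step 2: reselect, preferring service instances that are not in the list of previously selected invokers. If every service instance has already been called, use the load-balancing strategy to pick one that is available.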
private Invoker<T> reselect(LoadBalance loadbalance, Invocation invocation,
List<Invoker<T>> invokers, List<Invoker<T>> selected, boolean availablecheck) throws RpcException {
//Allocating one in advance, this list is certain to be used.
List<Invoker<T>> reselectInvokers = new ArrayList<>(
invokers.size() > 1 ? (invokers.size() - 1) : invokers.size());
// First, try picking a invoker not in `selected`.
for (Invoker<T> invoker : invokers) {
if (availablecheck && !invoker.isAvailable()) {
continue;
}
if (selected == null || !selected.contains(invoker)) {
reselectInvokers.add(invoker);
}
}
if (!reselectInvokers.isEmpty()) {
return loadbalance.select(reselectInvokers, getUrl(), invocation);
}
// Just pick an available invoker using loadbalance policy
if (selected != null) {
for (Invoker<T> invoker : selected) {
if ((invoker.isAvailable()) // available first
&& !reselectInvokers.contains(invoker)) {
reselectInvokers.add(invoker);
}
}
}
if (!reselectInvokers.isEmpty()) {
return loadbalance.select(reselectInvokers, getUrl(), invocation);
}
return null;
}
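The two steps can be condensed into a generic sketch that is easy to test in isolation. It is an illustration rather than the framework code: the load balancer is modelled as a plain `Function<List<T>, T>`, availability as a `Predicate<T>`, and availability checking is always on.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

class ReselectSketch {

    /** Step 1: pick with the balancer; reselect if the pick was already used or is unavailable. */
    static <T> T select(List<T> candidates, List<T> alreadySelected,
                        Predicate<T> isAvailable, Function<List<T>, T> balancer) {
        if (candidates.isEmpty()) {
            return null;
        }
        if (candidates.size() == 1) {
            return candidates.get(0);
        }
        T choice = balancer.apply(candidates);
        if (alreadySelected.contains(choice) || !isAvailable.test(choice)) {
            T again = reselect(candidates, alreadySelected, isAvailable, balancer);
            if (again != null) {
                return again;
            }
            // nothing better found: step to the neighbour to avoid picking the same one again
            return candidates.get((candidates.indexOf(choice) + 1) % candidates.size());
        }
        return choice;
    }

    /** Step 2: prefer available candidates not yet used, then available ones that were used. */
    static <T> T reselect(List<T> candidates, List<T> alreadySelected,
                          Predicate<T> isAvailable, Function<List<T>, T> balancer) {
        List<T> fresh = new ArrayList<>();
        for (T candidate : candidates) {
            if (isAvailable.test(candidate) && !alreadySelected.contains(candidate)) {
                fresh.add(candidate);
            }
        }
        if (!fresh.isEmpty()) {
            return balancer.apply(fresh);
        }
        List<T> reusable = new ArrayList<>();
        for (T candidate : alreadySelected) {
            if (isAvailable.test(candidate) && !reusable.contains(candidate)) {
                reusable.add(candidate);
            }
        }
        return reusable.isEmpty() ? null : balancer.apply(reusable);
    }

    public static void main(String[] args) {
        List<String> providers = Arrays.asList("a", "b", "c");
        Function<List<String>, String> firstOne = list -> list.get(0);
        // "a" was already tried, so the fresh candidates are b and c
        System.out.println(select(providers, Collections.singletonList("a"), p -> true, firstOne)); // prints "b"
    }
}
```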
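The service provider responds to the call
HeaderExchangeHandler#received
(1) Updates the read timestamp; heartbeat handling uses this value to decide whether the idle time has been exceeded.
(2) request.isEvent() checks whether the request is an event. The readonly event is used for Dubbo's graceful shutdown: when a server goes offline, the client may not learn of the registry event promptly because of network problems, so the server sends a readonly message to announce that it is going offline.
(3) handleRequest processes a Request.
(4) handleResponse processes a Response.
(5) Telnet invocation is supported.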
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
if (message instanceof Request) {
// handle request.
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) {
handleRequest(exchangeChannel, request);
} else {
handler.received(exchangeChannel, request.getData());
}
}
} else if (message instanceof Response) {
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
if (isClientSide(channel)) {
Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
logger.error(e.getMessage(), e);
} else {
String echo = handler.telnet(channel, (String) message);
if (echo != null && echo.length() > 0) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
HeaderExchangeHandler#handleRequest
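(1) If the request is broken (it could not be decoded), its data, which may be a Throwable, is converted into an error-message string and returned directly with status BAD_REQUEST.
(2) handler.reply ==> DubboProtocol#reply performs the method call.
(3) After the remote method call completes, channel.send sends the Response.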
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
Response res = new Response(req.getId(), req.getVersion());
if (req.isBroken()) {
Object data = req.getData();
String msg;
if (data == null) {
msg = null;
} else if (data instanceof Throwable) {
msg = StringUtils.toString((Throwable) data);
} else {
msg = data.toString();
}
res.setErrorMessage("Fail to decode request due to: " + msg);
res.setStatus(Response.BAD_REQUEST);
channel.send(res);
return;
}
// find handler by message class.
Object msg = req.getData();
try {
CompletionStage<Object> future = handler.reply(channel, msg);
future.whenComplete((appResult, t) -> {
try {
if (t == null) {
res.setStatus(Response.OK);
res.setResult(appResult);
} else {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
channel.send(res);
} catch (RemotingException e) {
logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
} finally {
// HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
});
} catch (Throwable e) {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
channel.send(res);
}
}
static void handleResponse(Channel channel, Response response) throws RemotingException {
if (response != null && !response.isHeartbeat()) {
DefaultFuture.received(channel, response);
}
}
private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();
public static void received(Channel channel, Response response) {
received(channel, response, false);
}
public static void received(Channel channel, Response response, boolean timeout) {
try {
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
Timeout t = future.timeoutCheckTask;
if (!timeout) {
// decrease Time
t.cancel();
}
future.doReceived(response);
} else {
logger.warn("The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+ ", response " + response
+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()
+ " -> " + channel.getRemoteAddress()));
}
} finally {
CHANNELS.remove(response.getId());
}
}
private void doReceived(Response res) {
if (res == null) {
throw new IllegalStateException("response cannot be null");
}
if (res.getStatus() == Response.OK) {
this.complete(res.getResult());
} else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
} else {
this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
}
}
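The response path above relies on matching each Response to the future that was registered under the same request id. A minimal sketch of that correlation, using only JDK types, could look as follows; `PendingCallsSketch` and its methods are hypothetical, and timeouts are left out.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

class PendingCallsSketch {

    private final Map<Long, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
    private final AtomicLong nextId = new AtomicLong();

    /** Registers a future for a new request and returns the request id to send along. */
    long register(CompletableFuture<String> future) {
        long id = nextId.incrementAndGet();
        pending.put(id, future);
        return id;
    }

    /**
     * Completes the future waiting for this id.
     * Returns false when nobody is waiting, for example because the response arrived too late.
     */
    boolean onResponse(long id, boolean ok, String payload) {
        CompletableFuture<String> future = pending.remove(id);
        if (future == null) {
            return false;
        }
        if (ok) {
            future.complete(payload);
        } else {
            future.completeExceptionally(new IllegalStateException(payload));
        }
        return true;
    }

    public static void main(String[] args) {
        PendingCallsSketch calls = new PendingCallsSketch();
        CompletableFuture<String> future = new CompletableFuture<>();
        long id = calls.register(future);
        calls.onResponse(id, true, "pong");
        System.out.println(future.join());                      // prints "pong"
        System.out.println(calls.onResponse(id, true, "late")); // prints "false"
    }
}
```

Removing the entry before completing the future means a duplicate or late response for the same id finds nothing and is simply reported.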
DubboProtocol#requestHandler
Looks up the provider-side invoker instance and performs the actual service call.
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
if (!(message instanceof Invocation)) {
throw new RemotingException(channel, "Unsupported request: "
+ (message == null ? null : (message.getClass().getName() + ": " + message))
+ ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
Invocation inv = (Invocation) message;
Invoker<?> invoker = getInvoker(channel, inv);
// need to consider backward-compatibility if it's a callback
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || !methodsStr.contains(",")) {
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods) {
if (inv.getMethodName().equals(method)) {
hasMethod = true;
break;
}
}
}
if (!hasMethod) {
logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
+ " not found in callback service interface ,invoke will be ignored."
+ " please update the api interface. url is:"
+ invoker.getUrl()) + " ,invocation is :" + inv);
return null;
}
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
Result result = invoker.invoke(inv);
return result.completionFuture().thenApply(Function.identity());
}
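Gets the exported port and the interface path carried in the call, builds a unique key from the port, interface name, group, and version, then fetches the matching Exporter from exporterMap and returns its Invoker (when a service is exported, its Invoker is stored in this map under a key built from the port, interface name, version, and group, so a client call must carry the same information to produce the same key).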
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
boolean isCallBackServiceInvoke = false;
boolean isStubServiceInvoke = false;
int port = channel.getLocalAddress().getPort();
String path = inv.getAttachments().get(PATH_KEY);
// if it's callback service on client side
isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getAttachments().get(STUB_EVENT_KEY));
if (isStubServiceInvoke) {
port = channel.getRemoteAddress().getPort();
}
//callback
isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke;
if (isCallBackServiceInvoke) {
path += "." + inv.getAttachments().get(CALLBACK_SERVICE_KEY);
inv.getAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString());
}
String serviceKey = serviceKey(port, path, inv.getAttachments().get(VERSION_KEY), inv.getAttachments().get(GROUP_KEY));
DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
if (exporter == null) {
throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " +
", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);
}
return exporter.getInvoker();
}
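The lookup works only because the exporting side and the calling side build the key the same way. The sketch below illustrates that idea with a plain map; the key layout `group/path:version:port` is an assumption made for this example, and `ServiceKeySketch`, `export`, and `find` are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ServiceKeySketch {

    private final Map<String, Object> exporters = new ConcurrentHashMap<>();

    /** Builds a key of the assumed form [group/]path[:version]:port. */
    static String serviceKey(int port, String path, String version, String group) {
        StringBuilder key = new StringBuilder();
        if (group != null && !group.isEmpty()) {
            key.append(group).append('/');
        }
        key.append(path);
        if (version != null && !version.isEmpty()) {
            key.append(':').append(version);
        }
        key.append(':').append(port);
        return key.toString();
    }

    /** Export time: store the service under its key. */
    void export(int port, String path, String version, String group, Object service) {
        exporters.put(serviceKey(port, path, version, group), service);
    }

    /** Call time: rebuild the key from the request and look the service up; null if nothing matches. */
    Object find(int port, String path, String version, String group) {
        return exporters.get(serviceKey(port, path, version, group));
    }

    public static void main(String[] args) {
        ServiceKeySketch registry = new ServiceKeySketch();
        registry.export(20880, "com.demo.HelloService", "1.0.0", "g1", "hello-service-instance");
        System.out.println(serviceKey(20880, "com.demo.HelloService", "1.0.0", "g1"));
        // prints "g1/com.demo.HelloService:1.0.0:20880"
        System.out.println(registry.find(20880, "com.demo.HelloService", "2.0.0", "g1")); // prints "null"
    }
}
```

A version or group mismatch produces a different key, which is why the lookup misses in the second call.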
AbstractProxyInvoker#invoke
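AbstractProxyInvoker has 2 anonymous subclasses, created in JavassistProxyFactory and JdkProxyFactory respectively. The doInvoke method of such a subclass calls the real service method; once the return value is obtained, it is wrapped in an AsyncRpcResult.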
public Result invoke(Invocation invocation) throws RpcException {
try {
Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
CompletableFuture<Object> future = wrapWithFuture(value, invocation);
AsyncRpcResult asyncRpcResult = new AsyncRpcResult(invocation);
future.whenComplete((obj, t) -> {
AppResponse result = new AppResponse();
if (t != null) {
if (t instanceof CompletionException) {
result.setException(t.getCause());
} else {
result.setException(t);
}
} else {
result.setValue(obj);
}
asyncRpcResult.complete(result);
});
return asyncRpcResult;
} catch (InvocationTargetException e) {
if (RpcContext.getContext().isAsyncStarted() && !RpcContext.getContext().stopAsync()) {
logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);
}
return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation);
} catch (Throwable e) {
throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
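The core of this method is turning either a plain return value or a future into one uniform asynchronous result, and unwrapping CompletionException on the way. A JDK-only sketch of that idea follows; `Outcome` is a hypothetical stand-in for the result holder, and the service method is modelled as a `Supplier<Object>`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;

class AsyncResultSketch {

    /** Hypothetical result holder: carries either a value or an exception. */
    static class Outcome {
        Object value;
        Throwable exception;
    }

    /** Calls the service method and always answers with a future of Outcome. */
    static CompletableFuture<Outcome> invoke(Supplier<Object> serviceMethod) {
        Object returned = serviceMethod.get();
        // A service may hand back a future itself; a plain value is wrapped in a completed one.
        CompletableFuture<?> future;
        if (returned instanceof CompletableFuture) {
            future = (CompletableFuture<?>) returned;
        } else {
            future = CompletableFuture.completedFuture(returned);
        }
        CompletableFuture<Outcome> result = new CompletableFuture<>();
        future.whenComplete((value, error) -> {
            Outcome outcome = new Outcome();
            if (error != null) {
                // CompletionException is only a wrapper; report the underlying cause
                boolean wrapped = error instanceof CompletionException && error.getCause() != null;
                outcome.exception = wrapped ? error.getCause() : error;
            } else {
                outcome.value = value;
            }
            result.complete(outcome);
        });
        return result;
    }

    public static void main(String[] args) {
        System.out.println(invoke(() -> "plain value").join().value); // prints "plain value"

        CompletableFuture<Object> failing = new CompletableFuture<>();
        failing.completeExceptionally(new CompletionException(new IllegalStateException("boom")));
        System.out.println(invoke(() -> failing).join().exception.getMessage()); // prints "boom"
    }
}
```

Note that the outer future completes normally even when the service failed; the failure travels inside the result object.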
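Heartbeat detection
HeartbeatTimerTask#doTask
When the TCP connection has been idle for longer than the heartbeat interval, a heartbeat event message is sent.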
protected void doTask(Channel channel) {
try {
Long lastRead = lastRead(channel);
Long lastWrite = lastWrite(channel);
if ((lastRead != null && now() - lastRead > heartbeat)
|| (lastWrite != null && now() - lastWrite > heartbeat)) {
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setEvent(Request.HEARTBEAT_EVENT);
channel.send(req);
if (logger.isDebugEnabled()) {
logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress()
+ ", cause: The channel has no data-transmission exceeds a heartbeat period: "
+ heartbeat + "ms");
}
}
} catch (Throwable t) {
logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t);
}
}
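The idle test at the top of this method can be isolated as a pure function, which makes the boundary cases easy to check. This is a sketch with hypothetical names; timestamps are passed in as parameters instead of being read from a channel.

```java
class HeartbeatSketch {

    /** True when the timestamp exists and is older than the allowed interval. */
    private static boolean isStale(Long timestampMillis, long nowMillis, long intervalMillis) {
        return timestampMillis != null && nowMillis - timestampMillis > intervalMillis;
    }

    /** A heartbeat is due when either the last read or the last write is too old. */
    static boolean shouldSendHeartbeat(Long lastReadMillis, Long lastWriteMillis,
                                       long nowMillis, long heartbeatMillis) {
        return isStale(lastReadMillis, nowMillis, heartbeatMillis)
                || isStale(lastWriteMillis, nowMillis, heartbeatMillis);
    }

    public static void main(String[] args) {
        long now = 100_000L;
        long heartbeat = 60_000L;
        System.out.println(shouldSendHeartbeat(now - 70_000L, now - 1_000L, now, heartbeat)); // prints "true"
        System.out.println(shouldSendHeartbeat(now - 1_000L, now - 1_000L, now, heartbeat));  // prints "false"
        System.out.println(shouldSendHeartbeat(null, null, now, heartbeat));                  // prints "false"
    }
}
```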
ReconnectTask#doTask
If the client is disconnected, it reconnects; a client read-idle timeout also triggers a reconnect.
protected void doTask(Channel channel) {
try {
Long lastRead = lastRead(channel);
Long now = now();
// Rely on reconnect timer to reconnect when AbstractClient.doConnect fails to init the connection
if (!channel.isConnected()) {
try {
logger.info("Initial connection to " + channel);
((Client) channel).reconnect();
} catch (Exception e) {
logger.error("Fail to connect to " + channel, e);
}
// check pong at client
} else if (lastRead != null && now - lastRead > idleTimeout) {
logger.warn("Reconnect to channel " + channel + ", because heartbeat read idle time out: "
+ idleTimeout + "ms");
try {
((Client) channel).reconnect();
} catch (Exception e) {
logger.error(channel + "reconnect failed during idle time.", e);
}
}
} catch (Throwable t) {
logger.warn("Exception when reconnect to remote channel " + channel.getRemoteAddress(), t);
}
}
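The decision made by this task has two triggers checked in a fixed order, which the following sketch expresses as a pure function. The `Action` enum and `decide` method are hypothetical and exist only to make the two cases explicit.

```java
class ReconnectSketch {

    /** What the timer task should do for a channel on this tick. */
    enum Action { NONE, RECONNECT_DISCONNECTED, RECONNECT_IDLE }

    /** Disconnected channels are handled first; otherwise a read-idle timeout triggers a reconnect. */
    static Action decide(boolean connected, Long lastReadMillis, long nowMillis, long idleTimeoutMillis) {
        if (!connected) {
            return Action.RECONNECT_DISCONNECTED;
        }
        if (lastReadMillis != null && nowMillis - lastReadMillis > idleTimeoutMillis) {
            return Action.RECONNECT_IDLE;
        }
        return Action.NONE;
    }

    public static void main(String[] args) {
        long now = 500_000L;
        long idleTimeout = 180_000L;
        System.out.println(decide(false, now, now, idleTimeout));           // prints "RECONNECT_DISCONNECTED"
        System.out.println(decide(true, now - 200_000L, now, idleTimeout)); // prints "RECONNECT_IDLE"
        System.out.println(decide(true, now - 1_000L, now, idleTimeout));   // prints "NONE"
    }
}
```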