开篇
这篇文章的主要目的是讲清楚ClusterInvoker在多个invoker对象进行负载均衡的调用过程,也就描述从调用到负载均衡选择的调用链路,真正的负载均衡部分的逻辑在后面由单独讲解。
selector 调用时序图
说明:
- RegistryProtocol的doRefer()方法内部cluster.join()负责创建ClusterInvoker对象,所有的cluster的invoker的选择逻辑都在这个函数实现。
- FailoverClusterInvoker继承自AbstractClusterInvoker,AbstractClusterInvoker抽象出了通用的所有类型ClusterInvoker需要使用的方法,核心的如select、doSelect、invoke等通用方法,将具体不同实现的doInvoke方法抽象给具体实现类实现。
- AbstractClusterInvoker的invoke()方法作为调用的入口,内部功能主要是根据loadBalance策略选择invoker进行执行。
- AbstractClusterInvoker的select() -> doSelect()方法内部根据不同loadBalance策略实现invoker的调用。
- 通过选择具体的invoker对象后调用invoke()方法实现远程调用。
dubbo cluster 调用过程源码
FailoverCluster对象创建入口
说明:
- 核心在于cluster.join()方法会调用到如FailoverCluster类,建议通过debug打断点方式跟踪调用栈。
public class RegistryProtocol implements Protocol {
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
// 构建RegistryDirectory,可以把它理解为注册资源,其中包含了消费者/服务/路由等相关信息,其同时也是回调监听器
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
// 构建subscribeUrl信息,主要拼接consumer:xxx的url地址
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
// 向注册中心注册服务消费者
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
Constants.CHECK_KEY, String.valueOf(false)));
}
// 从注册中心订阅服务提供者(即引用的服务)
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
// 从invoker当中选择其中一个返回
Invoker invoker = cluster.join(directory);
// 注册消费者
ProviderConsumerRegTable.registerConsuemr(invoker, url, subscribeUrl, directory);
return invoker;
}
}
FailoverClusterInvoker 负载均衡器举例
- 按照以下流程进行进行invoker的选择:AbstractClusterInvoker.invoke() -> FailoverClusterInvoker.doInvoke() -> AbstractClusterInvoker. select() -> AbstractClusterInvoker.doselect() -> AbstractLoadBalance.select()。
- invoke()和select()调用链的关键路径在于invoke由父类触发后在实现类FailoverClusterInvoker执行,然后调用的负载均衡因为是通用的所以在父类AbstractClusterInvoker中实现。
- directory.list(invocation)负责选择从ReigstryDeirectory获取服务引用的invokers列表进行负载均衡选择。
- invoker选择的过程中,在AbstractClusterInvoker.invoke()阶段会根据规则生成LoadBalance对象进行invoker的选择,在调用过程中生成LoadBalance的对象。
- LoadBalance的选择是按照URL中配置的信息负载均衡器进行选择,如果没有配置就走默认的负载均衡器。
- AbstractLoadBalance作为负载均衡器基类,提供了select()的执行流程,具体的doSelect动作由各个实现类去实现。
public class FailoverCluster implements Cluster {
public final static String NAME = "failover";
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new FailoverClusterInvoker<T>(directory);
}
}
public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class);
public FailoverClusterInvoker(Directory<T> directory) {
super(directory);
}
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyinvokers = invokers;
checkInvokers(copyinvokers, invocation);
int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
// retry loop.
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
if (i > 0) {
checkWhetherDestroyed();
copyinvokers = list(invocation);
// check again
checkInvokers(copyinvokers, invocation);
}
Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
Result result = invoker.invoke(invocation);
return result;
} catch (RpcException e) {
} catch (Throwable e) {
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
}
}
public abstract class AbstractClusterInvoker<T> implements Invoker<T> {
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();
LoadBalance loadbalance;
List<Invoker<T>> invokers = list(invocation);
if (invokers != null && invokers.size() > 0) {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
.getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
} else {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
}
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
return doInvoke(invocation, invokers, loadbalance);
}
protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
List<Invoker<T>> invokers = directory.list(invocation);
return invokers;
}
protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
if (invokers == null || invokers.size() == 0)
return null;
String methodName = invocation == null ? "" : invocation.getMethodName();
boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY);
{
//ignore overloaded method
if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
stickyInvoker = null;
}
//ignore cucurrent problem
if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
if (availablecheck && stickyInvoker.isAvailable()) {
return stickyInvoker;
}
}
}
Invoker<T> invoker = doselect(loadbalance, invocation, invokers, selected);
if (sticky) {
stickyInvoker = invoker;
}
return invoker;
}
private Invoker<T> doselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
if (invokers == null || invokers.size() == 0)
return null;
if (invokers.size() == 1)
return invokers.get(0);
// If we only have two invokers, use round-robin instead.
if (invokers.size() == 2 && selected != null && selected.size() > 0) {
return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0);
}
Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
//If the `invoker` is in the `selected` or invoker is unavailable && availablecheck is true, reselect.
if ((selected != null && selected.contains(invoker))
|| (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
try {
Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
if (rinvoker != null) {
invoker = rinvoker;
} else {
//Check the index of current selected invoker, if it's not the last one, choose the one at index+1.
int index = invokers.indexOf(invoker);
try {
//Avoid collision
invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invoker;
} catch (Exception e) {
logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e);
}
}
} catch (Throwable t) {
logger.error("clustor relselect fail reason is :" + t.getMessage() + " if can not slove ,you can set cluster.availablecheck=false in url", t);
}
}
return invoker;
}
}
AbstractLoadBalance执行过程
- AbstractLoadBalance的select()方法作为负载均衡选择器的入口位置,提供通用的getWeight()方法获取权重。
- AbstractLoadBalance的doSelect()由具体的实现类实现。
public abstract class AbstractLoadBalance implements LoadBalance {
static int calculateWarmupWeight(int uptime, int warmup, int weight) {
int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
return ww < 1 ? 1 : (ww > weight ? weight : ww);
}
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
if (invokers == null || invokers.size() == 0)
return null;
if (invokers.size() == 1)
return invokers.get(0);
return doSelect(invokers, url, invocation);
}
protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);
protected int getWeight(Invoker<?> invoker, Invocation invocation) {
int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
if (weight > 0) {
long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);
if (timestamp > 0L) {
int uptime = (int) (System.currentTimeMillis() - timestamp);
int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);
if (uptime > 0 && uptime < warmup) {
weight = calculateWarmupWeight(uptime, warmup, weight);
}
}
}
return weight;
}
}