开篇
这篇文章的目的在于描述Dubbo Consumer在直连和注册中心两种场景下针对provider侧invoker的封装。整篇文章主要从单注册中心、单直连地址、多注册中心、多直连地址的角度进行分析。
通过这篇文章能够了解到Consumer侧针对invoker的生成流程,通过invoker的生成可以了解invoker的调用链。
Consumer reference流程
public class ReferenceConfig<T> extends AbstractReferenceConfig {
private static final Protocol REF_PROTOCOL =
ExtensionLoader.getExtensionLoader(Protocol.class)
.getAdaptiveExtension();
private T createProxy(Map<String, String> map) {
if (shouldJvmRefer(map)) {
// 省略无关代码
} else {
urls.clear();
// 处理reference配置直连情况
if (url != null && url.length() > 0) {
// 处理逗号分隔的直连地址
String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (StringUtils.isEmpty(url.getPath())) {
url = url.setPath(interfaceName);
}
if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
} else {
// 添加直连地址到urls
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else {
// 处理注册中心的情况
if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())){
checkRegistry();
// 获取注册中心
List<URL> us = loadRegistries(false);
// 添加所有注册中心到urls
if (CollectionUtils.isNotEmpty(us)) {
for (URL u : us) {
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
}
}
if (urls.isEmpty()) {
}
}
}
// 处理单注册中心或者只有单个直连的情况
if (urls.size() == 1) {
// 单注册中心协议为"registry",直连场景下协议为"dubbo"
invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
} else { // 处理多注册中心或者只有多个直连的情况
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
// 单注册中心协议为"registry",直连场景下协议为"dubbo"
invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // use last registry url
}
}
if (registryURL != null) {
// 针对多注册中心的方式,通过RegistryAwareCluster进行封装
URL u = registryURL.addParameter(CLUSTER_KEY, RegistryAwareCluster.NAME);
// RegistryAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invoker
invoker = CLUSTER.join(new StaticDirectory(u, invokers));
} else { // 处理多直连方式
invoker = CLUSTER.join(new StaticDirectory(invokers));
}
}
}
if (shouldCheck() && !invoker.isAvailable()) {
// 处理invoker可用检查的逻辑
throw new IllegalStateException("Failed to check the status of the service "
+ interfaceName + ". No provider available for the service "
+ (group == null ? "" : group + "/") + interfaceName
+ (version == null ? "" : ":" + version) + " from the url "
+ invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost()
+ " use dubbo version " + Version.getVersion());
}
MetadataReportService metadataReportService = null;
if ((metadataReportService = getMetadataReportService()) != null) {
URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
metadataReportService.publishConsumer(consumerURL);
}
// create service proxy
return (T) PROXY_FACTORY.getProxy(invoker);
}
}
- 区分单注册中心、单直连地址、多注册中心、多直连地址的四种情况生成invoker。
- "urls.size() == 1"的条件用于处理单注册中心和单直连地址两种情况。
- "urls.size() != 1"的条件用于处理多注册中心和多直连地址两种情况。
Protocol适配器
public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {
public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {
if (arg1 == null) throw new IllegalArgumentException("url == null");
org.apache.dubbo.common.URL url = arg1;
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
// 获取扩展
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
// 执行扩展的refer方法
return extension.refer(arg0, arg1);
}
}
com.alibaba.dubbo.rpc.Protocol文件
filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper
mock=com.alibaba.dubbo.rpc.support.MockProtocol
dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol
injvm=com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol
rmi=com.alibaba.dubbo.rpc.protocol.rmi.RmiProtocol
hessian=com.alibaba.dubbo.rpc.protocol.hessian.HessianProtocol
com.alibaba.dubbo.rpc.protocol.http.HttpProtocol
com.alibaba.dubbo.rpc.protocol.webservice.WebServiceProtocol
thrift=com.alibaba.dubbo.rpc.protocol.thrift.ThriftProtocol
memcached=com.alibaba.dubbo.rpc.protocol.memcached.MemcachedProtocol
redis=com.alibaba.dubbo.rpc.protocol.redis.RedisProtocol
rest=com.alibaba.dubbo.rpc.protocol.rest.RestProtocol
registry=com.alibaba.dubbo.registry.integration.RegistryProtocol
qos=com.alibaba.dubbo.qos.protocol.QosProtocolWrapper
- Protocol$Adaptive根据url.getProtocol()的Protocol协议生成对应的Protocol对象。
- Protocol文件内容如上图所示。
Cluster适配器
public class Cluster$Adaptive implements Cluster {
public Invoker join(Directory directory) throws RpcException {
if (directory == null) {
throw new IllegalArgumentException("org.apache.dubbo.rpc.cluster.Directory argument == null");
}
if (directory.getUrl() == null) {
throw new IllegalArgumentException("org.apache.dubbo.rpc.cluster.Directory argument getUrl() == null");
}
URL uRL = directory.getUrl();
String string = uRL.getParameter("cluster", "failover");
if (string == null) {
throw new IllegalStateException(new StringBuffer()
.append("Failed to get extension (org.apache.dubbo.rpc.cluster.Cluster) name from url (")
.append(uRL.toString()).append(") use keys([cluster])").toString());
}
Cluster cluster = (Cluster)ExtensionLoader.getExtensionLoader(Cluster.class).getExtension(string);
return cluster.join(directory);
}
}
com.alibaba.dubbo.rpc.cluster.Cluster文件
mock=org.apache.dubbo.rpc.cluster.support.wrapper.MockClusterWrapper
failover=org.apache.dubbo.rpc.cluster.support.FailoverCluster
failfast=org.apache.dubbo.rpc.cluster.support.FailfastCluster
failsafe=org.apache.dubbo.rpc.cluster.support.FailsafeCluster
failback=org.apache.dubbo.rpc.cluster.support.FailbackCluster
forking=org.apache.dubbo.rpc.cluster.support.ForkingCluster
available=org.apache.dubbo.rpc.cluster.support.AvailableCluster
mergeable=org.apache.dubbo.rpc.cluster.support.MergeableCluster
broadcast=org.apache.dubbo.rpc.cluster.support.BroadcastCluster
registryaware=org.apache.dubbo.rpc.cluster.support.RegistryAwareCluster
- Cluster$Adaptive根据uRL.getParameter("cluster", "failover")的Cluster协议生成对应的Cluster对象。
- Cluster文件内容如上图所示。
单注册中心或直连地址场景
处理单注册中心场景
public class RegistryProtocol implements Protocol {
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
url = URLBuilder.from(url)
.setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY))
.removeParameter(REGISTRY_KEY)
.build();
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// group="a,b" or group="*"
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
String group = qs.get(GROUP_KEY);
if (group != null && group.length() > 0) {
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
return doRefer(getMergeableCluster(), registry, type, url);
}
}
return doRefer(cluster, registry, type, url);
}
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
registry.register(directory.getRegisteredConsumerUrl());
}
directory.buildRouterChain(subscribeUrl);
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
// invoker为MockClusterInvoker=>FailoverClusterInvoker
Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
}
- 单注册中心场景下,urls.get(0)的协议是”registry“,Protocol$Adaptive返回的被封装的Protocol对象为RegistryProtocol。
- ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("registry")返回的extension为ProtocolListenerWrapper对象,ProtocolListenerWrapper的封装链为ProtocolListenerWrapper => ProtocolFilterWrapper => RegistryProtocol。
- extension.refer()过程按照ProtocolListenerWrapper.refer() => ProtocolFilterWrapper.refer() => RegistryProtocol.refer()流程调用。
- ProtocolListenerWrapper.refer() => ProtocolFilterWrapper.refer() => RegistryProtocol.refer()的调用链路针对"registry"作了特殊处理,直接走RegistryProtocol.refer()的方法。
- RegistryProtocol.refer()执行cluster.join(directory)生成MockClusterInvoker对象,封装关系为MockClusterInvoker=>FailoverClusterInvoker。
- "registry"协议返回的invoker对象封装关系为MockClusterInvoker => FailoverClusterInvoker。
- MockClusterInvoker的invoke()方法会执行FailoverClusterInvoker的doInvoke()方法,进入Dubbo的Cluster集群调用策略。
处理单直连地址场景
public class DubboProtocol extends AbstractProtocol {
public static final String NAME = "dubbo";
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
}
public abstract class AbstractProtocol implements Protocol {
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
}
protected abstract <T> Invoker<T> protocolBindingRefer(Class<T> type, URL url) throws RpcException;
}
public class AsyncToSyncInvoker<T> implements Invoker<T> {
private Invoker<T> invoker;
public AsyncToSyncInvoker(Invoker<T> invoker) {
this.invoker = invoker;
}
}
- 单直连地址场景下,urls.get(0)的协议为"dubbo",Protocol$Adaptive返回的被封装的Protocol对象为DubboProtocol。
- ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("dubbo")返回的extension为ProtocolListenerWrapper对象,ProtocolListenerWrapper的封装链为ProtocolListenerWrapper => ProtocolFilterWrapper => DubboProtocol。
- extension.refer()过程按照ProtocolListenerWrapper.refer() => ProtocolFilterWrapper.refer() => DubboProtocol.refer()流程调用。
- DubboProtocol.refer()执行AbstractProtocol.refer()方法生成AsyncToSyncInvoker对象, AsyncToSyncInvoker对象内部包含DubboInvoker对象。
- ”dubbo“协议的invoker对象封装关系为ListenerInvokerWrapper => ProtocolFilterWrapper => AsyncToSyncInvoker。
- AsyncToSyncInvoker的invoke()方法会执行DubboProtocol的invoke()方法。
多注册中心或多直连地址场景
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
// 单注册中心协议为"registry",直连场景下协议为"dubbo"
invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // use last registry url
}
}
if (registryURL != null) {
// 针对注册中心的方式,通过RegistryAwareCluster进行封装
URL u = registryURL.addParameter(CLUSTER_KEY, RegistryAwareCluster.NAME);
// RegistryAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invoker
invoker = CLUSTER.join(new StaticDirectory(u, invokers));
} else { // 处理直连方式
invoker = CLUSTER.join(new StaticDirectory(invokers));
}
- 多注册中心场景下invokers为MockClusterInvoker对象列表,CLUSTER.join()方法的Cluster为"failover"。
- 多直连地址场景下invokers为ListenerInvokerWrapper对象列表,CLUSTER.join()方法的Cluster为"failover"。
处理多注册中心场景
- 多注册中心场景下每个注册中心对应MockClusterInvoker对象,在外层有一层MockClusterInvoker包装。
- 多注册中心场景下通过MockClusterInvoker包装多个注册中心中每个注册中心对应的MockClusterInvoker。
处理多直接地址场景
- 多直连地址场景下每个直连地址对应ListenerInvokerWrapper对象,在外层有一层MockClusterInvoker包装。
- 多直连地址场景下通过MockClusterInvoker包装直连地址对应的ListenerInvokerWrapper对象列表。
总结
- 在单直连地址场景下:invoker对象为ListenerInvokerWrapper。
- 在多直连地址场景下:invoker对象为MockClusterInvoker,内部包含ListenerInvokerWrapper对象。
- 在单注册中心场景下:invoker对象为MockClusterInvoker。
- 在多注册中心场景下:invoker对象为MockClusterInvoker,MockClusterInvoker内部包含注册中心对应的MockClusterInvoker对象,相当于在多注册中心情况下,每个注册中心对应一个MockClusterInvoker对象,外部通过MockClusterInvoker进行二次封装。