开篇
这篇文章的目的是在于梳理Dubbo ZookeeperRegistry的注册流程,通过这个流程的分析能够延伸到更多的注册中心。
核心的关注点在于Registry和RegistryFactory对象,RegistryFactory负责动态创建Registry对象,Registry对象负责执行注册中心的注册。
ServiceConfig类的Protocol的动态代理根据类型返回Protocol对象,注册中心的Protocol对象是RegistryProtocol。
源码分析过程
public class ServiceConfig<T> extends AbstractServiceConfig {
// protocol是Protocol$Adaptive对象
private static final Protocol protocol =
ExtensionLoader.getExtensionLoader(Protocol.class)
.getAdaptiveExtension();
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
String scope = url.getParameter(SCOPE_KEY);
if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
if (CollectionUtils.isNotEmpty(registryURLs)) {
for (URL registryURL : registryURLs) {
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// 执行Protocol$Adaptive的export()方法
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// 执行Protocol$Adaptive的export()方法
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
}
}
}
}
- ServiceConfig的doExportUrlsFor1Protocol()方法执行export()方法。
- protocol.export()中的protocol指的是Protocol$Adaptive对象。
- 继续阅读Protocol$Adaptive的export()方法。
public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {
public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
// 这里的url内容是registry:xxx,如下面的
org.apache.dubbo.common.URL url = arg0.getUrl();
// extName指的是url的协议名,这里是取的registry
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
// 返回RegistryProtocol对象
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
}
registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?
application=dubbo-demo-api-provider&dubbo=2.0.2
&export=dubbo://192.168.1.5:20880/org.apache.dubbo.demo.DemoService?
anyhost=true&application=dubbo-demo-api-provider&bind.ip=192.168.1.5
&bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false
&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=24212
&release=&side=provider×tamp=1571482218716&pid=24212
®istry=zookeeper×tamp=1571482215899
- Protocol$Adaptive的export()方法内部根据URL获取扩展名,url的协议名是registry,返回的扩展是RegistryProtocol对象。
- 继续阅读RegistryProtocol的export()方法。
public class RegistryProtocol implements Protocol {
private Cluster cluster;
private Protocol protocol;
// registryFactory是RegistryFactory$Adaptive对象
private RegistryFactory registryFactory;
private ProxyFactory proxyFactory;
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
URL registryUrl = getRegistryUrl(originInvoker);
URL providerUrl = getProviderUrl(originInvoker);
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
//export invoker
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
// url to registry
final Registry registry = getRegistry(originInvoker);
final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
registryUrl, registeredProviderUrl);
//to judge if we need to delay publish
boolean register = providerUrl.getParameter(REGISTER_KEY, true);
if (register) {
register(registryUrl, registeredProviderUrl);
providerInvokerWrapper.setReg(true);
}
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
return new DestroyableExporter<>(exporter);
}
private Registry getRegistry(final Invoker<?> originInvoker) {
URL registryUrl = getRegistryUrl(originInvoker);
return registryFactory.getRegistry(registryUrl);
}
public void register(URL registryUrl, URL registeredProviderUrl) {
Registry registry = registryFactory.getRegistry(registryUrl);
registry.register(registeredProviderUrl);
}
}
public class RegistryFactory$Adaptive implements RegistryFactory {
public Registry getRegistry(URL uRL) {
String string;
if (uRL == null) {
throw new IllegalArgumentException("url == null");
}
URL uRL2 = uRL;
String string2 = string = uRL2.getProtocol() == null ? "dubbo" : uRL2.getProtocol();
if (string == null) {
throw new IllegalStateException(new StringBuffer().append("Failed to get extension (org.apache.dubbo.registry.RegistryFactory) name from url (").append(uRL2.toString()).append(") use keys([protocol])").toString());
}
RegistryFactory registryFactory = (RegistryFactory)ExtensionLoader.getExtensionLoader(RegistryFactory.class).getExtension(string);
return registryFactory.getRegistry(uRL);
}
}
providerUrl的例子:
dubbo://192.168.1.5:20880/org.apache.dubbo.demo.DemoService?
anyhost=true&application=dubbo-demo-api-provider&deprecated=false&dubbo=2.0.2
&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService
&methods=sayHello&pid=25662&release=&side=provider×tamp=1571485618358
registryUrl的例子:
zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?
application=dubbo-demo-api-provider&dubbo=2.0.2
&export=dubbo://192.168.1.5:20880/org.apache.dubbo.demo.DemoService?
anyhost=true&application=dubbo-demo-api-provider&bind.ip=192.168.1.5
&bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false
&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=31349
&release=&side=provider×tamp=1571498589827&pid=31349×tamp=1571498589815
- RegistryProtocol的export()方法内部执行核心三个步骤,获取Reigstry对象、执行register()、执行subscribe()。
- providerUrl和registryUrl的内容例子如上图。
- RegistryProtocol的getRegistry()方法通过registryFactory.getRegistry()返回ZookeeperRegistry对象。
- registryFactory对象是RegistryFactory$Adaptive对象。
- RegistryFactory$Adaptive的getRegistry()的URL参数协议是zookeeper,通过扩展返回的是ZookeeperRegistryFactory对象。
- registryFactory.getRegistry(uRL)执行ZookeeperRegistryFactory.getRegistry()返回ZookeeperRegistry对象。
- RegistryProtocol获取Reigstry对象是ZookeeperRegistry,执行ZookeeperRegistry的register()和subscribe()方法。
- ExtensionLoader.getExtensionLoader(RegistryFactory.class).getExtension(string)中string是"zookeeper",返回的ZookeeperRegistryFactory对象。
public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
private ZookeeperTransporter zookeeperTransporter;
public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
this.zookeeperTransporter = zookeeperTransporter;
}
@Override
public Registry createRegistry(URL url) {
return new ZookeeperRegistry(url, zookeeperTransporter);
}
}
public abstract class AbstractRegistryFactory implements RegistryFactory {
public Registry getRegistry(URL url) {
url = URLBuilder.from(url)
.setPath(RegistryService.class.getName())
.addParameter(INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(EXPORT_KEY, REFER_KEY)
.build();
String key = url.toServiceStringWithoutResolving();
// Lock the registry access process to ensure a single instance of the registry
LOCK.lock();
try {
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
//create registry by spi/ioc
registry = createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
REGISTRIES.put(key, registry);
return registry;
} finally {
// Release the lock
LOCK.unlock();
}
}
}
- ZookeeperRegistryFactory作为AbstractRegistryFactory的子类,父类AbstractRegistryFactory的getRegistry会调用子类的createRegistry()方法返回ZookeeperRegistry对象。
- 执行ZookeeperRegistry的register()和subscribe()方法。
public class ZookeeperRegistry extends FailbackRegistry {
private final static String DEFAULT_ROOT = "dubbo";
private final String root;
private final Set<String> anyServices = new ConcurrentHashSet<>();
private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners = new ConcurrentHashMap<>();
private final ZookeeperClient zkClient;
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
if (!group.startsWith(PATH_SEPARATOR)) {
group = PATH_SEPARATOR + group;
}
this.root = group;
zkClient = zookeeperTransporter.connect(url);
zkClient.addStateListener(state -> {
if (state == StateListener.RECONNECTED) {
try {
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
});
}
@Override
public void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
public void doSubscribe(final URL url, final NotifyListener listener) {
try {
if (ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, (parentPath, currentChilds) -> {
for (String child : currentChilds) {
child = URL.decode(child);
if (!anyServices.contains(child)) {
anyServices.add(child);
subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
});
zkListener = listeners.get(listener);
}
zkClient.create(root, false);
List<String> services = zkClient.addChildListener(root, zkListener);
if (CollectionUtils.isNotEmpty(services)) {
for (String service : services) {
service = URL.decode(service);
anyServices.add(service);
subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
} else {
List<URL> urls = new ArrayList<>();
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)));
zkListener = listeners.get(listener);
}
zkClient.create(path, false);
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
}
public abstract class FailbackRegistry extends AbstractRegistry {
public void register(URL url) {
super.register(url);
removeFailedRegistered(url);
removeFailedUnregistered(url);
try {
doRegister(url);
} catch (Exception e) {
}
}
public void subscribe(URL url, NotifyListener listener) {
super.subscribe(url, listener);
removeFailedSubscribed(url, listener);
try {
// Sending a subscription request to the server side
doSubscribe(url, listener);
} catch (Exception e) {
}
}
}
- ZookeeperRegistry是FailbackRegistry的子类,父类FailbackRegistry统一的register()和subscribe()入口,具体的实现在子类ZookeeperRegistry。
- 子类ZookeeperRegistry的doRegister()和doSubscribe()是执行具体的注册和订阅工作。
- ZookeeperRegistry的doRegister()核心是在zookeeper节点上创建zk临时节点。
- ZookeeperRegistry的doSubscribe()过于复杂,在consumer端进行分析。