Dubbo ZookeeperRegistry Analysis

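Introduction

This article walks through the registration flow of Dubbo's ZookeeperRegistry. Understanding this flow makes it easier to extend the analysis to other registry implementations.

The focus is on the Registry and RegistryFactory objects: RegistryFactory creates Registry objects dynamically, and a Registry object performs the actual registration with the registry center.

The adaptive Protocol extension held by ServiceConfig returns a concrete Protocol object according to the URL's protocol type. For a registry URL, that Protocol object is RegistryProtocol.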
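
The division of labor between the two types can be sketched roughly as follows. This is a simplified stand-in and not Dubbo's actual interfaces: SimpleRegistry, SimpleRegistryFactory and InMemoryRegistry are hypothetical names, URLs are plain strings, and nothing talks to ZooKeeper.

```java
import java.util.ArrayList;
import java.util.List;

class RegistryRolesSketch {

    // Hypothetical stand-in for Registry: it performs the registration.
    interface SimpleRegistry {
        void register(String providerUrl);
        List<String> registered();
    }

    // Hypothetical stand-in for RegistryFactory: it creates Registry objects.
    interface SimpleRegistryFactory {
        SimpleRegistry getRegistry(String registryUrl);
    }

    // In-memory registry used only for illustration.
    static class InMemoryRegistry implements SimpleRegistry {
        private final List<String> urls = new ArrayList<>();

        public void register(String providerUrl) {
            urls.add(providerUrl);
        }

        public List<String> registered() {
            return urls;
        }
    }

    // The factory creates the registry, the registry records the provider URL.
    static List<String> demo() {
        SimpleRegistryFactory factory = registryUrl -> new InMemoryRegistry();
        SimpleRegistry registry = factory.getRegistry("zookeeper://127.0.0.1:2181");
        registry.register("dubbo://192.168.1.5:20880/org.apache.dubbo.demo.DemoService");
        return registry.registered();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```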


Source Code Walkthrough

public class ServiceConfig<T> extends AbstractServiceConfig {
    // protocol is a Protocol$Adaptive object
    private static final Protocol protocol = 
      ExtensionLoader.getExtensionLoader(Protocol.class)
     .getAdaptiveExtension();


    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        
        String scope = url.getParameter(SCOPE_KEY);

        if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
            if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
                if (CollectionUtils.isNotEmpty(registryURLs)) {
                    for (URL registryURL : registryURLs) {
                        Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                        // Call export() on Protocol$Adaptive
                        Exporter<?> exporter = protocol.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                } else {

                    Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                    // Call export() on Protocol$Adaptive
                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
            }
        }
    }
}
  • ServiceConfig's doExportUrlsFor1Protocol() method calls export().
  • In protocol.export(), protocol is the Protocol$Adaptive object.
  • Next, look at the export() method of Protocol$Adaptive.
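
In the listing above, the provider URL is attached to the registry URL through addParameterAndEncoded(EXPORT_KEY, ...). The idea can be sketched with plain strings as below. This is an illustration under assumptions, not Dubbo's URL class: the parameter name "export" is taken from the example URL in this article, and withExport() and exportOf() are hypothetical helpers.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;

class ExportParamSketch {

    // Append the provider URL to the registry URL as an encoded "export" parameter.
    static String withExport(String registryUrl, String providerUrl) {
        String separator = registryUrl.contains("?") ? "&" : "?";
        try {
            return registryUrl + separator + "export=" + URLEncoder.encode(providerUrl, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
    }

    // Recover the provider URL from the "export" parameter, or null if it is absent.
    static String exportOf(String registryUrl) {
        String query = registryUrl.substring(registryUrl.indexOf('?') + 1);
        for (String pair : query.split("&")) {
            if (pair.startsWith("export=")) {
                try {
                    return URLDecoder.decode(pair.substring("export=".length()), "UTF-8");
                } catch (UnsupportedEncodingException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        String provider = "dubbo://192.168.1.5:20880/org.apache.dubbo.demo.DemoService?side=provider";
        String registry = withExport("registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService", provider);
        System.out.println(registry);
        System.out.println(exportOf(registry));
    }
}
```

Encoding the nested URL keeps its own "?" and "&" characters from being confused with those of the outer registry URL.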


public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {

    public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
        
        if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
        // The URL here has the form registry://..., as in the example after this listing
        org.apache.dubbo.common.URL url = arg0.getUrl();
        // extName is the protocol name of the URL; here it is "registry"
        String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
        if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
        // Returns the RegistryProtocol object
        org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
        
        return extension.export(arg0);
    }   
}




registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?
application=dubbo-demo-api-provider&dubbo=2.0.2
&export=dubbo://192.168.1.5:20880/org.apache.dubbo.demo.DemoService?
anyhost=true&application=dubbo-demo-api-provider&bind.ip=192.168.1.5
&bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false
&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=24212
&release=&side=provider&timestamp=1571482218716&pid=24212
&registry=zookeeper&timestamp=1571482215899
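  • Inside export(), Protocol$Adaptive derives the extension name from the URL. Because the URL's protocol is registry, the extension returned is a RegistryProtocol object.
  • Next, look at the export() method of RegistryProtocol.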
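
The dispatch performed by Protocol$Adaptive can be approximated with a lookup table keyed by protocol name. The sketch below is a simplification: SimpleProtocol and the EXTENSIONS map are hypothetical, standing in for the lookup that ExtensionLoader.getExtension(extName) performs in the listing.

```java
import java.util.HashMap;
import java.util.Map;

class AdaptiveDispatchSketch {

    // Hypothetical stand-in for the Protocol interface.
    interface SimpleProtocol {
        String export(String url);
    }

    // Hypothetical extension table keyed by protocol name.
    static final Map<String, SimpleProtocol> EXTENSIONS = new HashMap<>();

    static {
        EXTENSIONS.put("registry", url -> "RegistryProtocol exported " + url);
        EXTENSIONS.put("dubbo", url -> "DubboProtocol exported " + url);
    }

    // Extract the protocol name, falling back to "dubbo" as the generated code does.
    static String protocolOf(String url) {
        int idx = url.indexOf("://");
        return idx < 0 ? "dubbo" : url.substring(0, idx);
    }

    // Mirrors the shape of Protocol$Adaptive.export(): pick the extension, then delegate.
    static String adaptiveExport(String url) {
        String extName = protocolOf(url);
        SimpleProtocol extension = EXTENSIONS.get(extName);
        if (extension == null) {
            throw new IllegalStateException("No extension named " + extName);
        }
        return extension.export(url);
    }

    public static void main(String[] args) {
        System.out.println(adaptiveExport("registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService"));
    }
}
```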


public class RegistryProtocol implements Protocol {

    private Cluster cluster;
    private Protocol protocol;
    // registryFactory is a RegistryFactory$Adaptive object
    private RegistryFactory registryFactory;
    private ProxyFactory proxyFactory;

    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        URL registryUrl = getRegistryUrl(originInvoker);
        URL providerUrl = getProviderUrl(originInvoker);

        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

        providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
        //export invoker
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

        // url to registry
        final Registry registry = getRegistry(originInvoker);
        final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
        ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
                registryUrl, registeredProviderUrl);
        //to judge if we need to delay publish
        boolean register = providerUrl.getParameter(REGISTER_KEY, true);
        if (register) {
            register(registryUrl, registeredProviderUrl);
            providerInvokerWrapper.setReg(true);
        }

        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

        exporter.setRegisterUrl(registeredProviderUrl);
        exporter.setSubscribeUrl(overrideSubscribeUrl);

        return new DestroyableExporter<>(exporter);
    }



    private Registry getRegistry(final Invoker<?> originInvoker) {
        URL registryUrl = getRegistryUrl(originInvoker);

        return registryFactory.getRegistry(registryUrl);
    }


    public void register(URL registryUrl, URL registeredProviderUrl) {
        Registry registry = registryFactory.getRegistry(registryUrl);
        registry.register(registeredProviderUrl);
    }
}



public class RegistryFactory$Adaptive implements RegistryFactory {

    public Registry getRegistry(URL uRL) {
        String string;
        if (uRL == null) {
            throw new IllegalArgumentException("url == null");
        }

        URL uRL2 = uRL;
        String string2 = string = uRL2.getProtocol() == null ? "dubbo" : uRL2.getProtocol();
        if (string == null) {
            throw new IllegalStateException(new StringBuffer().append("Failed to get extension (org.apache.dubbo.registry.RegistryFactory) name from url (").append(uRL2.toString()).append(") use keys([protocol])").toString());
        }

        RegistryFactory registryFactory = (RegistryFactory)ExtensionLoader.getExtensionLoader(RegistryFactory.class).getExtension(string);
        
        return registryFactory.getRegistry(uRL);
    }
}
Example providerUrl:
dubbo://192.168.1.5:20880/org.apache.dubbo.demo.DemoService?
anyhost=true&application=dubbo-demo-api-provider&deprecated=false&dubbo=2.0.2
&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService
&methods=sayHello&pid=25662&release=&side=provider&timestamp=1571485618358

Example registryUrl:
zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?
application=dubbo-demo-api-provider&dubbo=2.0.2
&export=dubbo://192.168.1.5:20880/org.apache.dubbo.demo.DemoService?
anyhost=true&application=dubbo-demo-api-provider&bind.ip=192.168.1.5
&bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false
&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=31349
&release=&side=provider&timestamp=1571498589827&pid=31349&timestamp=1571498589815
  • RegistryProtocol's export() method performs three core steps: obtain the Registry object, call register(), and call subscribe().
  • Examples of providerUrl and registryUrl are shown above.
  • RegistryProtocol's getRegistry() method returns a ZookeeperRegistry object via registryFactory.getRegistry().
  • The registryFactory object is a RegistryFactory$Adaptive object.
  • The URL passed to RegistryFactory$Adaptive's getRegistry() has the protocol zookeeper, so the extension lookup returns a ZookeeperRegistryFactory object.
  • registryFactory.getRegistry(uRL) then invokes ZookeeperRegistryFactory.getRegistry(), which returns a ZookeeperRegistry object.
  • The Registry that RegistryProtocol obtains is therefore a ZookeeperRegistry, and it is ZookeeperRegistry's register() and subscribe() methods that are executed.
  • In ExtensionLoader.getExtensionLoader(RegistryFactory.class).getExtension(string), string is "zookeeper", so a ZookeeperRegistryFactory object is returned.
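
Comparing the example URLs in this article, the URL seen by Protocol$Adaptive uses the registry protocol and carries registry=zookeeper, while the registryUrl uses the zookeeper protocol and no longer carries that parameter. The article does not show getRegistryUrl(), so the following is only a sketch of a rewrite that would produce this result. toRegistryUrl() is a hypothetical helper, the "dubbo" fallback is an assumption, and the export parameter is assumed to be encoded so that it contains no raw "&".

```java
class RegistryUrlRewriteSketch {

    // Replace the "registry" protocol with the value of the registry parameter
    // and drop that parameter from the query string.
    static String toRegistryUrl(String url) {
        if (!url.startsWith("registry://")) {
            return url;
        }
        int q = url.indexOf('?');
        String base = q < 0 ? url : url.substring(0, q);
        String query = q < 0 ? "" : url.substring(q + 1);

        String protocol = "dubbo"; // assumed default when no registry parameter is present
        StringBuilder kept = new StringBuilder();
        for (String pair : query.split("&")) {
            if (pair.isEmpty()) {
                continue;
            }
            if (pair.startsWith("registry=")) {
                protocol = pair.substring("registry=".length());
            } else {
                if (kept.length() > 0) {
                    kept.append('&');
                }
                kept.append(pair);
            }
        }
        String rewritten = protocol + base.substring("registry".length());
        return kept.length() == 0 ? rewritten : rewritten + "?" + kept;
    }

    public static void main(String[] args) {
        System.out.println(toRegistryUrl(
                "registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService"
                        + "?application=dubbo-demo-api-provider&registry=zookeeper"));
    }
}
```

With a URL of this shape, RegistryFactory$Adaptive reads the protocol zookeeper and therefore selects ZookeeperRegistryFactory.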


public class ZookeeperRegistryFactory extends AbstractRegistryFactory {

    private ZookeeperTransporter zookeeperTransporter;

    public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
        this.zookeeperTransporter = zookeeperTransporter;
    }

    @Override
    public Registry createRegistry(URL url) {
        return new ZookeeperRegistry(url, zookeeperTransporter);
    }
}


public abstract class AbstractRegistryFactory implements RegistryFactory {

    public Registry getRegistry(URL url) {
        url = URLBuilder.from(url)
                .setPath(RegistryService.class.getName())
                .addParameter(INTERFACE_KEY, RegistryService.class.getName())
                .removeParameters(EXPORT_KEY, REFER_KEY)
                .build();
        String key = url.toServiceStringWithoutResolving();
        // Lock the registry access process to ensure a single instance of the registry
        LOCK.lock();
        try {
            Registry registry = REGISTRIES.get(key);
            if (registry != null) {
                return registry;
            }
            //create registry by spi/ioc
            registry = createRegistry(url);
            if (registry == null) {
                throw new IllegalStateException("Can not create registry " + url);
            }
            REGISTRIES.put(key, registry);
            return registry;
        } finally {
            // Release the lock
            LOCK.unlock();
        }
    }
}
  • ZookeeperRegistryFactory is a subclass of AbstractRegistryFactory. The parent's getRegistry() calls the subclass's createRegistry() method, which returns a ZookeeperRegistry object.
  • register() and subscribe() are then called on that ZookeeperRegistry.
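
The combination of a locked cache in the parent and a creation hook in the subclass can be sketched as below. The names AbstractFactory, CountingFactory and SimpleRegistry are hypothetical, and the cache and lock are kept per instance here for simplicity.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

class CachingFactorySketch {

    // Hypothetical stand-in for Registry.
    interface SimpleRegistry {
        String url();
    }

    abstract static class AbstractFactory {
        private final ReentrantLock lock = new ReentrantLock();
        private final Map<String, SimpleRegistry> registries = new HashMap<>();

        // Return the cached registry for the key, creating it at most once.
        SimpleRegistry getRegistry(String key) {
            lock.lock();
            try {
                SimpleRegistry registry = registries.get(key);
                if (registry != null) {
                    return registry;
                }
                registry = createRegistry(key);
                if (registry == null) {
                    throw new IllegalStateException("Can not create registry " + key);
                }
                registries.put(key, registry);
                return registry;
            } finally {
                lock.unlock();
            }
        }

        // Hook implemented by the concrete factory.
        protected abstract SimpleRegistry createRegistry(String url);
    }

    // Concrete factory that counts how many registries it has created.
    static class CountingFactory extends AbstractFactory {
        int created = 0;

        @Override
        protected SimpleRegistry createRegistry(String url) {
            created++;
            return () -> url;
        }
    }

    public static void main(String[] args) {
        CountingFactory factory = new CountingFactory();
        factory.getRegistry("zookeeper://127.0.0.1:2181");
        factory.getRegistry("zookeeper://127.0.0.1:2181");
        System.out.println(factory.created);
    }
}
```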


public class ZookeeperRegistry extends FailbackRegistry {

    private final static String DEFAULT_ROOT = "dubbo";
    private final String root;
    private final Set<String> anyServices = new ConcurrentHashSet<>();
    private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners = new ConcurrentHashMap<>();
    private final ZookeeperClient zkClient;

    public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
        super(url);
        if (url.isAnyHost()) {
            throw new IllegalStateException("registry address == null");
        }
        String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
        if (!group.startsWith(PATH_SEPARATOR)) {
            group = PATH_SEPARATOR + group;
        }
        this.root = group;
        zkClient = zookeeperTransporter.connect(url);
        zkClient.addStateListener(state -> {
            if (state == StateListener.RECONNECTED) {
                try {
                    recover();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        });
    }

    @Override
    public void doRegister(URL url) {
        try {
            zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }


    public void doSubscribe(final URL url, final NotifyListener listener) {
        try {
            if (ANY_VALUE.equals(url.getServiceInterface())) {
                String root = toRootPath();
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                if (listeners == null) {
                    zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
                    listeners = zkListeners.get(url);
                }
                ChildListener zkListener = listeners.get(listener);
                if (zkListener == null) {
                    listeners.putIfAbsent(listener, (parentPath, currentChilds) -> {
                        for (String child : currentChilds) {
                            child = URL.decode(child);
                            if (!anyServices.contains(child)) {
                                anyServices.add(child);
                                subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child,
                                        Constants.CHECK_KEY, String.valueOf(false)), listener);
                            }
                        }
                    });
                    zkListener = listeners.get(listener);
                }
                zkClient.create(root, false);
                List<String> services = zkClient.addChildListener(root, zkListener);
                if (CollectionUtils.isNotEmpty(services)) {
                    for (String service : services) {
                        service = URL.decode(service);
                        anyServices.add(service);
                        subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service,
                                Constants.CHECK_KEY, String.valueOf(false)), listener);
                    }
                }
            } else {
                List<URL> urls = new ArrayList<>();
                for (String path : toCategoriesPath(url)) {
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                    if (listeners == null) {
                        zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
                        listeners = zkListeners.get(url);
                    }
                    ChildListener zkListener = listeners.get(listener);
                    if (zkListener == null) {
                        listeners.putIfAbsent(listener, (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)));
                        zkListener = listeners.get(listener);
                    }
                    zkClient.create(path, false);
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                notify(url, listener, urls);
            }
        } catch (Throwable e) {
            throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

}



public abstract class FailbackRegistry extends AbstractRegistry {

    public void register(URL url) {
        super.register(url);
        removeFailedRegistered(url);
        removeFailedUnregistered(url);
        try {
            doRegister(url);
        } catch (Exception e) {
            // Error handling omitted in this excerpt
        }
    }

    public void subscribe(URL url, NotifyListener listener) {
        super.subscribe(url, listener);
        removeFailedSubscribed(url, listener);
        try {
            // Sending a subscription request to the server side
            doSubscribe(url, listener);
        } catch (Exception e) {
            // Error handling omitted in this excerpt
        }
    }
}
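  • ZookeeperRegistry is a subclass of FailbackRegistry. The parent class provides the common register() and subscribe() entry points, and the concrete work is implemented in the subclass.
  • ZookeeperRegistry's doRegister() and doSubscribe() perform the actual registration and subscription.
  • The core of doRegister() is creating a node in ZooKeeper for the provider URL. The node is ephemeral by default, because the dynamic parameter defaults to true.
  • doSubscribe() is fairly complex and is left for the analysis of the consumer side.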
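
The template-method relationship between the parent and the subclass can be sketched as follows. The FailbackRegistry excerpt above leaves its catch blocks empty, so the failure handling here is an assumption: the sketch simply records failed URLs and retries them when retry() is called by hand. FailbackBase and FlakyRegistry are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class FailbackSketch {

    abstract static class FailbackBase {
        final Set<String> failedRegistered = new LinkedHashSet<>();

        // Common entry point: delegate to the subclass and remember failures.
        void register(String url) {
            failedRegistered.remove(url);
            try {
                doRegister(url);
            } catch (Exception e) {
                failedRegistered.add(url); // assumed behavior, see the note above
            }
        }

        // Retry every URL that failed earlier.
        void retry() {
            for (String url : new ArrayList<>(failedRegistered)) {
                register(url);
            }
        }

        // Concrete registration, implemented by the subclass.
        protected abstract void doRegister(String url) throws Exception;
    }

    // Registry that fails until it is marked available; a list stands in for ZooKeeper nodes.
    static class FlakyRegistry extends FailbackBase {
        boolean available = false;
        final List<String> nodes = new ArrayList<>();

        @Override
        protected void doRegister(String url) throws Exception {
            if (!available) {
                throw new Exception("registry unavailable");
            }
            nodes.add(url);
        }
    }

    public static void main(String[] args) {
        FlakyRegistry registry = new FlakyRegistry();
        registry.register("dubbo://192.168.1.5:20880/org.apache.dubbo.demo.DemoService");
        registry.available = true;
        registry.retry();
        System.out.println(registry.nodes);
    }
}
```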


Registry Class Diagram

[Figure: Registry class diagram]
