Dubbo Source Code Analysis: The RegistryDirectory Layer

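Preface:

Having analyzed Dubbo's overall architecture, we now look at each layer individually.

When a consumer starts, it looks up all providers of the services it references, converts their URLs into Invokers held in memory, and starts listening for provider changes. When the providers change, the consumer is notified and updates its Invoker list.

So how is this implemented?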

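The registry layer is structured as follows:

Starting from RegistryProtocol, the matching Registry is obtained from the RegistryFactory (ZookeeperRegistry in this example).

A RegistryDirectory is then created. It is responsible for listening to the registry, so that provider changes are propagated to the consumer promptly.

This article starts from the RegistryProtocol.refer() method.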

1. RegistryProtocol.refer()

public class RegistryProtocol implements Protocol {
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        // Get the registry URL; in this example it starts with zookeeper://
        url = getRegistryUrl(url);
        // so the Registry obtained from registryFactory is ultimately a ZookeeperRegistry
        Registry registry = registryFactory.getRegistry(url);
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }

        // group="a,b" or group="*"
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
        String group = qs.get(GROUP_KEY);
        if (group != null && group.length() > 0) {
            if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
                return doRefer(getMergeableCluster(), registry, type, url);
            }
        }
        // Continue into doRefer()
        return doRefer(cluster, registry, type, url);
    }

}
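
The group check in refer() can be looked at in isolation. The following is a minimal sketch, not Dubbo's code: the class GroupCheckSketch and the method needsMergeableCluster are hypothetical names, and the pattern is only assumed to behave like COMMA_SPLIT_PATTERN (splitting on commas with optional surrounding whitespace).

```java
import java.util.regex.Pattern;

class GroupCheckSketch {
    // Assumed stand-in for COMMA_SPLIT_PATTERN: commas with optional surrounding whitespace.
    private static final Pattern COMMA_SPLIT = Pattern.compile("\\s*,\\s*");

    // Returns true when the group value names several groups ("a,b") or all groups ("*").
    static boolean needsMergeableCluster(String group) {
        if (group == null || group.isEmpty()) {
            return false;
        }
        return COMMA_SPLIT.split(group).length > 1 || "*".equals(group);
    }

    public static void main(String[] args) {
        for (String group : new String[] {"a,b", "*", "a"}) {
            System.out.println(group + " -> " + needsMergeableCluster(group));
        }
    }
}
```

Only the multi-group and wildcard cases take the mergeable-cluster branch; a single group falls through to the ordinary doRefer(cluster, ...) call.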

The protocol of the registry URL (zookeeper://...) determines which registry implementation is used; here it is ZookeeperRegistry.
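
Selecting an implementation by URL protocol can be illustrated with a plain lookup table. This is a sketch under stated assumptions, not how Dubbo does it internally (Dubbo resolves the factory through its extension mechanism): RegistrySelectionSketch, the Registry interface shown here, the describe() method, and the "inmemory" protocol are all made up for illustration.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class RegistrySelectionSketch {
    // Simplified, hypothetical Registry abstraction.
    interface Registry {
        String describe();
    }

    // Hypothetical lookup table keyed by URL protocol.
    private static final Map<String, Function<URI, Registry>> FACTORIES = new HashMap<>();

    static {
        FACTORIES.put("zookeeper", uri -> () -> "ZookeeperRegistry@" + uri.getHost() + ":" + uri.getPort());
        FACTORIES.put("inmemory", uri -> () -> "InMemoryRegistry"); // made-up protocol for illustration
    }

    // Picks the factory whose key matches the protocol (scheme) of the registry URL.
    static Registry getRegistry(String registryUrl) {
        URI uri = URI.create(registryUrl);
        Function<URI, Registry> factory = FACTORIES.get(uri.getScheme());
        if (factory == null) {
            throw new IllegalArgumentException("No registry for protocol: " + uri.getScheme());
        }
        return factory.apply(uri);
    }

    public static void main(String[] args) {
        System.out.println(getRegistry("zookeeper://127.0.0.1:2181").describe());
    }
}
```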

2. RegistryProtocol.doRefer()

public class RegistryProtocol implements Protocol {
    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
        URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
        if (directory.isShouldRegister()) {
            directory.setRegisteredConsumerUrl(subscribeUrl);
            // Calls ZookeeperRegistry.register() to register the current consumer URL in ZooKeeper (in essence, creating an ephemeral node)
            // See section 2.1 for details
            registry.register(directory.getRegisteredConsumerUrl());
        }
        directory.buildRouterChain(subscribeUrl);
        // RegistryDirectory subscribes to URL changes
        directory.subscribe(toSubscribeUrl(subscribeUrl));

        Invoker<T> invoker = cluster.join(directory);
        List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
        if (CollectionUtils.isEmpty(listeners)) {
            return invoker;
        }

        RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker, subscribeUrl);
        for (RegistryProtocolListener listener : listeners) {
            listener.onRefer(this, registryInvokerWrapper);
        }
        return registryInvokerWrapper;
    }
}

2.1 ZookeeperRegistry.register(): registering the URL

The register() method is defined in the parent class FailbackRegistry, which ultimately calls the subclass's doRegister() method.

public class ZookeeperRegistry extends FailbackRegistry {
    public void doRegister(URL url) {
        try {
            // Create the node; it is ephemeral unless the URL sets the dynamic parameter to false
            zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
}
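
To make the node path concrete, here is a minimal sketch of how such a path could be assembled. It is an assumption-laden illustration rather than the real toUrlPath(): the layout root/interface/category/encoded-URL is inferred from the provider path shown later in this article, the "consumers" category and the sample URL are assumed, and UrlPathSketch is a hypothetical class.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

class UrlPathSketch {
    // Builds root/interface/category/<URL-encoded full URL>.
    // The full URL is encoded so that it forms a single path segment.
    static String toUrlPath(String root, String serviceInterface, String category, String fullUrl) {
        try {
            return root + "/" + serviceInterface + "/" + category + "/" + URLEncoder.encode(fullUrl, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on a conforming JVM.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Sample values are assumptions for illustration only.
        System.out.println(toUrlPath(
                "/dubbo",
                "org.apache.dubbo.demo.DemoService",
                "consumers",
                "consumer://10.0.0.5/org.apache.dubbo.demo.DemoService?side=consumer"));
    }
}
```

Because the node is ephemeral, it disappears when the consumer's ZooKeeper session ends, which is what lets other parties observe that the consumer has gone away.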

2.2 RegistryDirectory.subscribe() 

public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
    public void subscribe(URL url) {
        setConsumerUrl(url);
        CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
        serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
        // Calls ZookeeperRegistry.subscribe() to subscribe
        registry.subscribe(url, this);
    }
}

Note that this, the RegistryDirectory itself, is passed in as the listener, so when the listener is eventually triggered the call comes back to RegistryDirectory.
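
The pattern of an object registering itself as its own listener can be sketched as follows. All types here (InMemoryRegistry, Directory, and the simplified NotifyListener taking a list of strings) are hypothetical stand-ins, not Dubbo's classes.

```java
import java.util.ArrayList;
import java.util.List;

class SelfRegisteringListenerSketch {
    // Simplified listener contract: receives the current list of provider URLs.
    interface NotifyListener {
        void notify(List<String> urls);
    }

    // Hypothetical registry that just remembers listeners and pushes updates to them.
    static class InMemoryRegistry {
        private final List<NotifyListener> listeners = new ArrayList<>();

        void subscribe(NotifyListener listener) {
            listeners.add(listener);
        }

        void publish(List<String> urls) {
            for (NotifyListener listener : listeners) {
                listener.notify(urls);
            }
        }
    }

    // The directory both holds the provider list and acts as the listener.
    static class Directory implements NotifyListener {
        private volatile List<String> providers = new ArrayList<>();

        void subscribe(InMemoryRegistry registry) {
            registry.subscribe(this); // pass itself as the callback target
        }

        @Override
        public void notify(List<String> urls) {
            providers = new ArrayList<>(urls);
        }

        List<String> providers() {
            return providers;
        }
    }

    public static void main(String[] args) {
        InMemoryRegistry registry = new InMemoryRegistry();
        Directory directory = new Directory();
        directory.subscribe(registry);
        List<String> update = new ArrayList<>();
        update.add("dubbo://10.0.0.1:20880");
        registry.publish(update);
        System.out.println(directory.providers());
    }
}
```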

2.3 ZookeeperRegistry.subscribe(): subscribing to node changes

public class ZookeeperRegistry extends FailbackRegistry {
    public void doSubscribe(final URL url, final NotifyListener listener) {
        try {
            // Matches all interfaces
            if (ANY_VALUE.equals(url.getServiceInterface())) {
                ...
            } else {
                List<URL> urls = new ArrayList<>();
                // toCategoriesPath yields one path per subscribed category;
                // the provider path in this example is /dubbo/org.apache.dubbo.demo.DemoService/providers
                for (String path : toCategoriesPath(url)) {
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
                    ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, currentChilds)));
                    zkClient.create(path, false);
                    // Watch the children of this path; the ChildListener forwards changes to the NotifyListener, which is the RegistryDirectory
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                // Finally trigger notify once, which ends up calling RegistryDirectory.notify()
                notify(url, listener, urls);
            }
        } catch (Throwable e) {
            throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
}

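Summary: when the consumer starts, it registers a listener on the provider path (/dubbo/org.apache.dubbo.demo.DemoService/providers in this example), and the RegistryDirectory is the listener that gets notified.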
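
The subscribe flow above (register a child listener, receive the current children, notify once immediately, then notify again on every change) can be sketched without ZooKeeper. FakeZkClient is a hypothetical in-memory stand-in, and the notification log exists only to make the behaviour observable.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

class ChildWatchSketch {
    // Hypothetical in-memory stand-in for a ZooKeeper client: path -> child node names.
    static class FakeZkClient {
        private final Map<String, List<String>> children = new HashMap<>();
        private final Map<String, List<BiConsumer<String, List<String>>>> listeners = new HashMap<>();

        // Registers a listener and returns the current children, like addChildListener in the excerpt.
        List<String> addChildListener(String path, BiConsumer<String, List<String>> listener) {
            listeners.computeIfAbsent(path, k -> new ArrayList<>()).add(listener);
            return new ArrayList<>(children.getOrDefault(path, Collections.emptyList()));
        }

        // Replaces the children of a path and fires every listener registered on it.
        void setChildren(String path, List<String> newChildren) {
            children.put(path, new ArrayList<>(newChildren));
            for (BiConsumer<String, List<String>> listener : listeners.getOrDefault(path, Collections.emptyList())) {
                listener.accept(path, new ArrayList<>(newChildren));
            }
        }
    }

    // Subscribes to a path and returns a log that receives every notification.
    static List<List<String>> subscribe(FakeZkClient zk, String path) {
        List<List<String>> notifications = new ArrayList<>();
        List<String> current = zk.addChildListener(path, (parent, kids) -> notifications.add(kids));
        notifications.add(current); // initial notify with the current snapshot
        return notifications;
    }

    public static void main(String[] args) {
        FakeZkClient zk = new FakeZkClient();
        String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers";
        zk.setChildren(path, Collections.singletonList("provider-1"));
        List<List<String>> log = subscribe(zk, path);
        zk.setChildren(path, new ArrayList<>());
        System.out.println(log);
    }
}
```

The initial notification matters: without it, a consumer that starts after its providers would hold an empty list until the next change happened to occur.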

2.4 RegistryDirectory.notify(): the listener callback

public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
    public synchronized void notify(List<URL> urls) {
        Map<String, List<URL>> categoryUrls = urls.stream()
                .filter(Objects::nonNull)
                .filter(this::isValidCategory)
                .filter(this::isNotCompatibleFor26x)
                .collect(Collectors.groupingBy(this::judgeCategory));

        List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
        this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);

        // Router handling, not the focus of this article
        List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
        toRouters(routerURLs).ifPresent(this::addRouters);

        // providers
        List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
        ExtensionLoader<AddressListener> addressListenerExtensionLoader = ExtensionLoader.getExtensionLoader(AddressListener.class);
        List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);
        if (supportedListeners != null && !supportedListeners.isEmpty()) {
            for (AddressListener addressListener : supportedListeners) {
                providerURLs = addressListener.notify(providerURLs, getConsumerUrl(), this);
            }
        }
        // This is where URLs are converted to Invokers and stored in RegistryDirectory.urlInvokerMap
        refreshOverrideAndInvoker(providerURLs);
    }
}
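
The first step of notify(), splitting the incoming URLs by category, can be tried out on plain strings. This is a simplified sketch: CategoryGroupingSketch and categoryOf are hypothetical, URLs are modelled as strings, and treating a URL without a category parameter as "providers" is an assumption rather than a description of judgeCategory().

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

class CategoryGroupingSketch {
    // Extracts the value of the "category" query parameter from a URL string.
    static String categoryOf(String url) {
        String key = "category=";
        int start = url.indexOf(key);
        if (start < 0) {
            return "providers"; // assumed default when no category parameter is present
        }
        int end = url.indexOf('&', start);
        return url.substring(start + key.length(), end < 0 ? url.length() : end);
    }

    // Drops null entries, then groups the remaining URLs by category.
    static Map<String, List<String>> groupByCategory(List<String> urls) {
        return urls.stream()
                .filter(Objects::nonNull)
                .collect(Collectors.groupingBy(CategoryGroupingSketch::categoryOf));
    }

    public static void main(String[] args) {
        // Sample URLs are made up for illustration.
        List<String> urls = Arrays.asList(
                "dubbo://10.0.0.1:20880/org.apache.dubbo.demo.DemoService",
                "override://0.0.0.0/org.apache.dubbo.demo.DemoService?category=configurators&timeout=1000",
                null);
        System.out.println(groupByCategory(urls));
    }
}
```

Each group is then handled separately, which is why the excerpt reads the configurators, routers, and providers lists with getOrDefault and an empty list as the fallback.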

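Summary:

RegistryDirectory can be seen as the operating layer on top of the registry: all the provider information a consumer needs is held in RegistryDirectory.

The concrete registry operations are delegated to ZookeeperRegistry.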
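
To make the summary concrete, the following sketch shows one plausible way a directory could keep its URL-to-Invoker map in step with each notification: reuse invokers whose URL is still present, create invokers for new URLs, and destroy those whose URL has disappeared. It is not Dubbo's refreshOverrideAndInvoker() implementation; InvokerRefreshSketch and its simplified Invoker class are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class InvokerRefreshSketch {
    // Simplified invoker: remembers its URL and whether it has been destroyed.
    static class Invoker {
        final String url;
        boolean destroyed;

        Invoker(String url) {
            this.url = url;
        }

        void destroy() {
            destroyed = true;
        }
    }

    private Map<String, Invoker> urlInvokerMap = new HashMap<>();

    // Rebuilds the map from the latest provider URLs.
    synchronized void refresh(List<String> providerUrls) {
        Map<String, Invoker> old = urlInvokerMap;
        Map<String, Invoker> fresh = new LinkedHashMap<>();
        for (String url : providerUrls) {
            Invoker existing = old.get(url);
            fresh.put(url, existing != null ? existing : new Invoker(url)); // reuse when possible
        }
        for (Map.Entry<String, Invoker> entry : old.entrySet()) {
            if (!fresh.containsKey(entry.getKey())) {
                entry.getValue().destroy(); // provider is gone
            }
        }
        urlInvokerMap = fresh;
    }

    synchronized Invoker get(String url) {
        return urlInvokerMap.get(url);
    }

    synchronized int size() {
        return urlInvokerMap.size();
    }

    public static void main(String[] args) {
        InvokerRefreshSketch directory = new InvokerRefreshSketch();
        directory.refresh(Arrays.asList("dubbo://a", "dubbo://b"));
        directory.refresh(Arrays.asList("dubbo://b", "dubbo://c"));
        System.out.println(directory.size());
    }
}
```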
