Dubbo初窥工作流程

Dubbo初窥工作流程


流程图

Dubbo初窥工作流程

  • 服务提供者启动时,向注册中心写入自己的元数据信息,同时会订阅配置元数据信息。
  • 消费者启动时,也会向注册中心写入自己的元数据信息,并订阅服务提供者、路由和 配置元数据信息。
  • 如果有配置monitor,启动时,会同时订阅所有消费者、服务提供者、路由和 配置元数据信息。
  • 当服务提供者有变动时(如离开或有新的服务提供者加入),注册中心将变更信息通知给消费者、服务治理中心。
  • 当消费者发起服务调用时,会0将调用、统计信息等上报给监控中心 。(monitor)

服务提供者注册服务

当服务启动时,先初始化服务实例,再通过Proxy组件调用协议(Protocol ),把服务端要暴露的接口封装成Invoker ,然后转换成Exporter,这个时候框架会打开服务端口等并记录服务实例到内存中,最后通过Registry把服务元数据注册到注册中心。

  • Proxy:服务代理层。在使用Dubbo中,我们发现无论是服务端或是消费端,都不用像编写socket那样一个暴露并监听地址端口,一个订阅对应的地址端口。而这得益于Proxy通过事先约好的配置规则帮我们生成一层代理。
  • Invoker :一个可执行体,在Dubbo中允许向它发起invoke调用,它可能是执行一个本地的 接口实现,也可能是一个远程的实现,还可能一个集群实现。
  • Protocol :远程调用层。封装RPC调用具体过程,Protocol是Invoker暴露(发布一个服务让别人 可以调用)和引用(引用一个远程服务到本地)的主功能入口,它负责管理Invoker的 整个生命周期。
  • Exporter:用于暴露到注册中心的对象,它的内部属性持有了 Invoker对象,我们可以 认为它在Invoker上包了 一层。
  • Registry:该模块把Exporter注册到注册中心。
private void doExportUrls() {
	// 获取注册中心实例
        List<URL> registryURLs = loadRegistries(true);
        for (ProtocolConfig protocolConfig : protocols) {
            String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
            ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
            ApplicationModel.initProviderModel(pathKey, providerModel);
            // 如果协议有多个(dubbo、rest),会依次暴露
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }

doExportUrlsFor1Protocol源码,这里只列举了部分:

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        String name = protocolConfig.getName();
        if (StringUtils.isEmpty(name)) {
            name = DUBBO;
        }

        Map<String, String> map = new HashMap<String, String>();
        map.put(SIDE_KEY, PROVIDER_SIDE);
		// 读取其他信息到map中,方便后边构造url
        appendRuntimeParameters(map);
        appendParameters(map, metrics);
        /**
          各种情况下, 参数的添加, 例如, 如果有指定调用方法, 还需要把方法名等信息添加到map中
          ...
        **/
        // export service
        // 获取地址和端口
        String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
        Integer port = this.findConfigedPorts(protocolConfig, name, map);
        URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
	// 判断是否有在配置里头指定url
        if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .hasExtension(url.getProtocol())) {
            url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                    .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
        }

        String scope = url.getParameter(SCOPE_KEY);
        // don't export when none is configured
        if (!SCOPE_NONE.equalsIgnoreCase(scope)) {

            // export to local if the config is not remote (export to remote only when config is remote)
            if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
                exportLocal(url); // 本地服务暴露
            }
            // export to remote if the config is not local (export to local only when config is local)
            if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
                if (!isOnlyInJvm() && logger.isInfoEnabled()) {
                    logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                }
                if (CollectionUtils.isNotEmpty(registryURLs)) {
                    for (URL registryURL : registryURLs) {
                        //if protocol is only injvm ,not register
                        if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
                            continue;
                        }
                        url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
                        URL monitorUrl = loadMonitor(registryURL);
                        // 如果监控地址不为空, 还会上报相关信息
                        if (monitorUrl != null) {
                            url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
                        }
                        if (logger.isInfoEnabled()) {
                            logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                        }

                        // For providers, this is used to enable custom proxy to generate invoker
                        String proxy = url.getParameter(PROXY_KEY);
                        if (StringUtils.isNotEmpty(proxy)) {
                            registryURL = registryURL.addParameter(PROXY_KEY, proxy);
                        }
  					// 通过动态代理转换成Invoker, registryURL存储的是注册中心地 I址,使用export作为key追加服务元数据信息
                        Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
 					// 暴露服务、向注册中心注册服务信息|
                        Exporter<?> exporter = protocol.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                } else {
               // 处理没有注册中心的情况, 直接暴露
                    Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
                /**
                 * @since 2.7.0
                 * ServiceData Store
                 */
                MetadataReportService metadataReportService = null;
                if ((metadataReportService = getMetadataReportService()) != null) {
                    metadataReportService.publishProvider(url);
                }
            }
        }
        this.urls.add(url);
    }

服务消费者订阅服务

  • 消费者在启动时,需要通过持有远程服务实例生成Invoker,这个Invoker在客户端是核心的远程代理对象。
  • 接着会把Invoker通过动态代理转换成实现用户接口的动态代理引用。此时,这里的Invoker承载了网络连接、服务调用和重试等功能,在客户端,它可能是一个远程的实现,也可能是一个集群实现。

在ReferenceConfig#createProxy方法中:

 private T createProxy(Map<String, String> map) {
       // 检查是否为同一个JVM内部引用
        if (shouldJvmRefer(map)) {
            URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
            invoker = REF_PROTOCOL.refer(interfaceClass, url);
            if (logger.isInfoEnabled()) {
                logger.info("Using injvm service " + interfaceClass.getName());
            }
        } else {
            urls.clear(); // reference retry init will add url to urls, lead to OOM
            // 如果指明了地址
            if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
                String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
                if (us != null && us.length > 0) {
                    for (String u : us) {
                        URL url = URL.valueOf(u);
                        if (StringUtils.isEmpty(url.getPath())) {
                            url = url.setPath(interfaceName);
                        }
                        if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                            urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                        } else {
                            urls.add(ClusterUtils.mergeUrl(url, map));
                        }
                    }
                }
            } else { // assemble URL from register center's configuration
                // if protocols not injvm checkRegistry
                if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())){
                    checkRegistry();
                    、
                    List<URL> us = loadRegistries(false);
                    if (CollectionUtils.isNotEmpty(us)) {
                        for (URL u : us) {
                            URL monitorUrl = loadMonitor(u);
                            if (monitorUrl != null) {
                                map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                            } 
                            // URL拼接消费服务的元数据信息, 并添加到集合urls中
                            urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                        }
                    }
                    if (urls.isEmpty()) {
                        throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
                    }
                }
            }

            if (urls.size() == 1) {
            // 单注册中心时的处理
                invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
            } else {
            // 多注册中心时的处理
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                for (URL url : urls) {
                // 逐个获取注册中心的服务, 并添加到invokers列表
                    invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
                    if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        registryURL = url; // use last registry url
                    }
                }
                if (registryURL != null) { // registry url is available
                    // use RegistryAwareCluster only when register's CLUSTER is available
                    URL u = registryURL.addParameter(CLUSTER_KEY, RegistryAwareCluster.NAME);
                    // The invoker wrap relation would be: RegistryAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invoker
                    // 合并多个invoker
                    invoker = CLUSTER.join(new StaticDirectory(u, invokers));
                } else { // not a registry url, must be direct invoke.
                    invoker = CLUSTER.join(new StaticDirectory(invokers));
                }
            }
        }

        if (shouldCheck() && !invoker.isAvailable()) {
            throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
        }
        if (logger.isInfoEnabled()) {
            logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
        }
        /**
         * @since 2.7.0
         * ServiceData Store
         */
        MetadataReportService metadataReportService = null;
        if ((metadataReportService = getMetadataReportService()) != null) {
            URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
            metadataReportService.publishConsumer(consumerURL);
        }
        // create service proxy
        // 把invoker转成接口的代理
        return (T) PROXY_FACTORY.getProxy(invoker);
    }

关于注册中心实例的创建,元数据注册到注册中心及订阅的功能,主要在RegistryProtocol#refe方法中:

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
		// 设置具体的注册协议
        url = URLBuilder.from(url)
                .setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY))
                .removeParameter(REGISTRY_KEY)
                .build();
        // 创建具体的注册中心的实例
        Registry registry = registryFactory.getRegistry(url);
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }

        // group="a,b" or group="*"
        // 提取消费方refer中保存的元数据信息,如果包含多个分组值则会把调用结果值做合并处理
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
        String group = qs.get(GROUP_KEY);
        if (group != null && group.length() > 0) {
            if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
                return doRefer(getMergeableCluster(), registry, type, url);
            }
        }
        // 处理订阅数据并通过 cluster 合并多个Invoker
        return doRefer(cluster, registry, type, url);
    }

doRefer方法:

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
// 消费核心关键,持有实际 Invoker 和接收订阅通知, 服务变更会触发这个类回调notify方法,用于重新引用服务
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // all attributes of REFER_KEY
        Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
        URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
        if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
            directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
             // 注册消费信息到注册中心
            registry.register(directory.getRegisteredConsumerUrl());
        }
        directory.buildRouterChain(subscribeUrl);
        // 订阅服务提供者、路由和动态配置
        directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
                PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
		// 通过 Cluster 合并 invokers, 同时默认也会启用FailoverCluster策略进行服务调用重试
        Invoker invoker = cluster.join(directory);
        ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
        return invoker;
    }

(未完,待续…)

上一篇:Dubbo集群容错


下一篇:java命令模式的缺点