dubbo源码分析02:服务引用

一、何时创建服务引用 


  引用官方文档的原话,如果将Dubbo托管在Spring-IOC容器下,Dubbo服务引用的时机有两个,第一个是在Spring容器调用ReferenceBean的afterPropertiesSet方法时引用服务,第二个是在ReferenceBean对应的服务被注入到其他类中时引用。这两个引用服务的时机区别在于,第一个是饿汉式的,第二个是懒汉式的。默认情况下,Dubbo使用懒汉式引用服务。如果需要使用饿汉式,可通过配置 <dubbo:reference> 的init属性开启。下面我们按照Dubbo默认配置进行分析,整个分析过程从ReferenceBean的getObject方法开始。当我们的服务被注入到其他类中时,Spring会第一时间调用getObject方法,并由该方法执行服务引用逻辑。按照惯例,在进行具体工作之前,需先进行配置检查与收集工作。接着根据收集到的信息决定服务用的方式,有三种,第一种是引用本地(JVM)服务,第二是通过直连方式引用远程服务,第三是通过注册中心引用远程服务。不管是哪种引用方式,最后都会得到一个Invoker实例。如果有多个注册中心,多个服务提供者,这个时候会得到一组Invoker实例,此时需要通过集群管理类Cluster将多个Invoker合并成一个实例。合并后的Invoker实例已经具备调用本地或远程服务的能力了,但并不能将此实例暴露给用户使用,这会对用户业务代码造成侵入。此时框架还需要通过代理工厂类(ProxyFactory)为服务接口生成代理类,并让代理类去调用 Invoker逻辑。避免了Dubbo框架代码对业务代码的侵入,同时也让框架更容易使用。   直接从ReferenceConfig开始分析,API的形式使用dubbo时,设置好ReferenceConfig实例的各种参数后,调用get方法获取服务引用实例。另外,需要了解一点dubbo的配置,可以参考官网的“schema配置参考手册”。  

二、服务引用的创建流程


  该类的get方法为获取服务引用方法。该方法比较简单,首先检查销毁标识“destroyed”,如果为true,表示该引用已经被销毁,不应再进行使用。接着检查服务引用是否为空,如果为空,则调用init方法进行初始化,否则返回该服务引用。
public synchronized T get() {
    if (destroyed) {
        throw new IllegalStateException("Already destroyed!");
    }
    if (ref == null) {
        init();
    }
    return ref;
}

  下面来重点看看init方法的源码,追踪一下服务引用是如何创建的,方法比较长,有较长的篇幅都在进行参数的校验、补全,需要点耐心看完。

private void init() {
    //检查初始化标识,防止重复初始化
    if (initialized) {
        return;
    }
    initialized = true;
    
    //检查接口名是否合法
    if (interfaceName == null || interfaceName.length() == 0) {
        throw new IllegalStateException("<dubbo:reference interface=\"\"... not allow null!");
    }
    
    //检查ConsumerConfig变量是否为空(ConsumerConfig为ReferenceConfig提供了某些属性的默认值):
    //(1)如果ConsumerConfig为null,则new一个;
    //(2)调用appendProperties(AbstractConfig config)方法完善ConsumerConfig的配置;
    checkDefault();
    
    //调用appendProperties(AbstractConfig config)方法完善ReferenceConfig的配置,该方法逻辑如下:
    //(1)检查AbstractConfig中每一个setXXX(原始类型)或isXXX(原始类型)的方法,对XXX属性进行配置;
    //(2)按优先级从高到低的顺序,依次从System.getProperty、配置中心、AbstractConfig对应getXXX返回值、
    //dubbo本地配置文件中进行查找XXX的属性值并进行设置;
    appendProperties(this);
    
    //设置成员变量“泛化引用标识”,如果为空则从成员变量ConsumerConfig中获取该标识的值
    if (getGeneric() == null && getConsumer() != null) {
        setGeneric(getConsumer().getGeneric());
    }
    
    //判断泛化标识的值是否为真,做这个判断的原因是因为泛化标识为字符串类型
    if (ProtocolUtils.isGeneric(getGeneric())) {
        //如果为真,则将interfaceClass设置为GenericService
        interfaceClass = GenericService.class;
    } else {
        //如果为假,则通过当前的类加载器加载interfaceName,获取interfaceClass
        try {
            interfaceClass = Class.forName(interfaceName, true, Thread.currentThread().getContextClassLoader());
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        //(1)校验interfaceClass是否为null、是否为接口类型;
        //(2)如果配置了List<MethodConfig>,需要校验interfaceClass是否有相应的方法
        checkInterfaceAndMethods(interfaceClass, methods);
    }
    
    /****************************** begin ******************************/  
    //下面代码块的作用是尝试从系统属性或配置文件中获取interfaceName的配置,
    //该配置值赋给成员变量String url,用于服务消费方点对点调用服务提供方。    
    String resolve = System.getProperty(interfaceName);
    String resolveFile = null;
    if (resolve == null || resolve.length() == 0) {
        resolveFile = System.getProperty("dubbo.resolve.file");
        if (resolveFile == null || resolveFile.length() == 0) {
            File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties");
            if (userResolveFile.exists()) {
                resolveFile = userResolveFile.getAbsolutePath();
            }
        }
        if (resolveFile != null && resolveFile.length() > 0) {
            Properties properties = new Properties();
            FileInputStream fis = null;
            try {
                fis = new FileInputStream(new File(resolveFile));
                properties.load(fis);
            } catch (IOException e) {
                throw new IllegalStateException("Unload " + resolveFile + ", cause: " + e.getMessage(), e);
            } finally {
                try {
                    if (null != fis) fis.close();
                } catch (IOException e) {
                    logger.warn(e.getMessage(), e);
                }
            }
            resolve = properties.getProperty(interfaceName);
        }
    }
    if (resolve != null && resolve.length() > 0) {
        url = resolve;
          //省略了日志打印的代码
    }    
    /****************************** end ******************************/  
    
    /****************************** begin ******************************/  
    //下面代码块的作用是检测ApplicationConfig、ModuleConfig、RegistryConfig、MonitorConfig
    //这几个核心配置是否为空。如果为空,则尝试从其他配置中获取。
    if (consumer != null) {
        if (application == null) {
            application = consumer.getApplication();
        }
        if (module == null) {
            module = consumer.getModule();
        }
        if (registries == null) {
            registries = consumer.getRegistries();
        }
        if (monitor == null) {
            monitor = consumer.getMonitor();
        }
    }
    if (module != null) {
        if (registries == null) {
            registries = module.getRegistries();
        }
        if (monitor == null) {
            monitor = module.getMonitor();
        }
    }
    if (application != null) {
        if (registries == null) {
            registries = application.getRegistries();
        }
        if (monitor == null) {
            monitor = application.getMonitor();
        }
    }
    //类似于checkDefault方法检查ConsumerConfig,该方法检查ApplicationConfig是否为空并完善其各字段值
    checkApplication(); 
    //检查ReferenceConfig的local、stub、mock配置项是否正确
    checkStubAndMock(interfaceClass);
    /****************************** end ******************************/  
    
    /****************************** start ******************************/  
    //下面代码块的作用是收集配置,并将配置存储在一个map中
    Map<String, String> map = new HashMap<String, String>();
    Map<Object, Object> attributes = new HashMap<Object, Object>();
    map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE); //side=consumer
    map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion()); //dubbo=2.6.2
    map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis())); //timestamp=时间戳
    if (ConfigUtils.getPid() > 0) {
        map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid())); //pid=进程pid
    }
    //判断是否是泛化引用
    if (!isGeneric()) {
        //非泛化引用,设置revision=interfaceClass的jar版本号
        String revision = Version.getVersion(interfaceClass, version);
        if (revision != null && revision.length() > 0) {
            map.put("revision", revision);
        }
        //设置接口方法,methods=xxx1,xxx2,...
        String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
        if (methods.length == 0) {
            logger.warn("NO method found in service interface " + interfaceClass.getName());
            map.put("methods", Constants.ANY_VALUE);
        } else {
            map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
        }
    }
    map.put(Constants.INTERFACE_KEY, interfaceName); //interface=interfaceName
    //将ApplicationConfig、ModuleConfig、ConsumerConfig、ReferenceConfig的值设置到map中,
    //(1)获取方法名为getXXX或者isXXX、public、无方法参数、返回值为原始值的非getClass方法;
    //(2)获取(1)方法上的@Paramter注解,根据该注解的excluded、escaped、append属性判断该属性值
    //是否需要被忽略、是否需要URLEncoded、是否需要以追加的形式设置入map(","作为追加值分隔符);
    //(3)获取方法名为getParameters、public、无方法参数、返回值为Map的方法;
    //(4)将(3)中的方法返回值Map的key-value键值对做key处理之后(添加前缀、"-"变"."),设置入map;
    appendParameters(map, application);
    appendParameters(map, module);
    appendParameters(map, consumer, Constants.DEFAULT_KEY);
    appendParameters(map, this);
    /****************************** end ******************************/  

    /****************************** start ******************************/  
    //下面代码块的作用是处理MethodConfig 实例。该实例包含了事件通知配置如onreturn、onthrow、oninvoke等。
    //由于一般不会使用到MethodConfig配置,我们先暂时忽略这个配置的代码
    String prefix = StringUtils.getServiceKey(map);
    if (methods != null && !methods.isEmpty()) {
        for (MethodConfig method : methods) {
            appendParameters(map, method, method.getName());
            String retryKey = method.getName() + ".retry";
            if (map.containsKey(retryKey)) {
                String retryValue = map.remove(retryKey);
                if ("false".equals(retryValue)) {
                    map.put(method.getName() + ".retries", "0");
                }
            }
            appendAttributes(attributes, method, prefix + "." + method.getName());
            checkAndConvertImplicitConfig(method, map, attributes);
        }
    }
    /****************************** end ******************************/  

    /****************************** start ******************************/  
    //下面代码块的作用是设置服务消费者的IP并存储到map中
    String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);
    if (hostToRegistry == null || hostToRegistry.length() == 0) {
        hostToRegistry = NetUtils.getLocalHost();
    } else if (isInvalidLocalHost(hostToRegistry)) {
        throw new IllegalArgumentException("Specified invalid registry ip from property ... value: ...");
    }
    map.put(Constants.REGISTER_IP_KEY, hostToRegistry); //register.ip=实际的IP地址
    /****************************** end ******************************/  

    //将attributes存入静态上下文
    StaticContext.getSystemContext().putAll(attributes);
    
    //根据map创建服务应用代理,下一小节将对该方法进行详细说明
    ref = createProxy(map);
    
    //将服务接口名、ReferenceConfig、服务引用实例、服务接口方法包装成ConsumerModel并存入ApplicationModel
    ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());
    ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
}

 

三、创建代理对象


  本小节对createProxy方法重点讲解,它是初始化Reference中最重要的方法:
private T createProxy(Map<String, String> map) {
    //构建临时的URL,构造方法参数依次为protocol、host、port、parameter
    URL tmpUrl = new URL("temp", "localhost", 0, map);
    
     //判断是否是JVM内的引用
    final boolean isJvmRefer; 
    if (isInjvm() == null) {       
        if (url != null && url.length() > 0) { 
            //点对点直连参数"url"有值,则不为JVM内引用
            isJvmRefer = false;
        } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
            //调用InjvmProtocol的isInjvmRefer方法,判断是否是JVM内引用
            isJvmRefer = true;
        } else {
            //默认不是JVM内引用
            isJvmRefer = false;
        }
    } else {
        //获取injvm配置值
        isJvmRefer = isInjvm().booleanValue();
    }
    
    /******************************* injvm调用 *******************************/
    if (isJvmRefer) {
        //如果是JVM内的引用,则创建JVM调用的上下文URL,protocol=injvm,host=127.0.0.1,
        //port=0,path=interfaceClass.getName(),并且将参数map设置入URL的parameters中
        URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);        
        //refprotocol为SPI接口Protocol的自适应扩展类,且refer方法有@Adaptive注解,
        //运用SPI机制源码分析中的知识,Protocol接口的自适应扩展类的refer代码,
        //会通过调用URL类型参数的getProtocol方法得到实际应该获取到的扩展类name,即injvm。
        //在源码的dubbo-rpc-injvm模块下,找到protocol的配置文件,
        //其中配置了injvm的扩展类为org.apache.dubbo.rpc.protocol.injvm.InjvmProtocol。
        //那么这里获取到的invoker即为InjvmProtocol.refer的返回结果,即InjvmInvoker
        invoker = refprotocol.refer(interfaceClass, url);        
    } else {
        /******************************* 点对点直连调用 *******************************/   
        //如果成员变量url不为空,表示要做直连调用。url是一个String,维护服务提供者地址
        if (url != null && url.length() > 0) { 
            //使用";"切分url字符串,表示如果想传入多个地址,使用";"分割即可
            String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
            //进行遍历
            if (us != null && us.length > 0) {
                for (String u : us) {
                    //解析每个地址字符串,将其转换为URL对象。地址字符串支持协议头、验证用户密码、IP、PORT、
                    //等其他调用参数(如protocol)
                    URL url = URL.valueOf(u);
                    //如果地址字符串中未包含服务路径,则进行补全,即接口的全限定名
                    if (url.getPath() == null || url.getPath().length() == 0) {
                        url = url.setPath(interfaceName);
                    }
                    //检测url协议是否为registry,若是,表明用户想使用指定的注册中心
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        //将map转换为查询字符串,并作为refer参数的值添加到url的map字段中,
                        //注意:这里通过字符串生成的url还没维护该方法的传入参数map。
                        urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, 
                                                            StringUtils.toQueryString(map)));
                    } else {
                        //合并用户通过直连地址字符串配置的调用参数与其他途径配置的调用参数:
                        //(1)移除服务提供者的一些配置(这些配置来源于用户配置的url属性),如线程池相关配置;
                        //(2)保留服务提供者的部分配置,比如版本,group,时间戳等;
                        //(3)最后将合并后的配置设置为url查询字符串中。
                        urls.add(ClusterUtils.mergeUrl(url, map));
                    }
                }
            }        
        } else {  
            /******************************* 通过注册中心调用 *******************************/
            //加载注册中心配置List<Registry>,转换为List<URL>
            List<URL> us = loadRegistries(false);
            //遍历注册中心URL
            if (us != null && !us.isEmpty()) {
                for (URL u : us) {
                    //获取监控URL
                    URL monitorUrl = loadMonitor(u);
                    if (monitorUrl != null) {
                        map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                    }
                    //(1)将本方法的请求参数map转换为查询字符串形式"key1=value1&key2=value2...";
                    //(2)以refer作为key,(1)的结果encoded作为value,存入注册中心URL的paramters参数中;
                    //(3)将注册中心URL存入ReferenceConfig的全局变量List<URL> urls中;
                    urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                }
            }
            if (urls == null || urls.isEmpty()) {
                throw new IllegalStateException("No such any registry to reference ...");
            }
        }
              
        //如果成员变量List<URL> urls大小为1,则直接通过Protocol自适应拓展类构建Invoker实例接口。
        //该自适应扩展类由字节码技术生成,其refer方法具有@Adaptive注解,根据SPI机制的源码知识,
        //refer方法按照参数URL中getProtocol的值查找实际的扩展类实例,如果getProtocol没有值,
        //则取Protocol接口的@SPI注解value值"dubbo"作为name查找扩展类实例。一般来说,如果通过注册
        //中心进行调用,则getProtocol获取到的值为registry,对应RegistryProtocol这个扩展类;而如果
        //直连调用,getProtocol为空或者是指定的协议(一般为dubbo协议),对应扩展类DubboProtocol。
        if (urls.size() == 1) {
            invoker = refprotocol.refer(interfaceClass, urls.get(0));
        } else {
            List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
            URL registryURL = null;
            for (URL url : urls) {
                invokers.add(refprotocol.refer(interfaceClass, url));
                if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                    registryURL = url; // use last registry url
                }
            }
            if (registryURL != null) { // registry url is available
                // use AvailableCluster only when register's cluster is available
                URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                invoker = cluster.join(new StaticDirectory(u, invokers));
            } else { // not a registry url
                invoker = cluster.join(new StaticDirectory(invokers));
            }
        }
    }

    //校验标识
    Boolean c = check;
    if (c == null && consumer != null) {
        c = consumer.isCheck();
    }
    if (c == null) {
        c = true; // default true
    }
    //检查invoker是否可用,最终调用的是Curator客户端的getZookeeperClient().isConnected()方法
    if (c && !invoker.isAvailable()) {
        throw new IllegalStateException("Failed to check the status of the service ...");
    }
    //...省略日志打印代码
    
    //创建代理对象,proxyFactory是SPI接口ProxyFactory的自适应扩展类,通过ProxyFactory的定义可知,
    //在未设置URL的proxy属性时,获取到默认的扩展类JavassistProxyFactory,但是ProxyFactory接口拥有
    //一个包装扩展类StubProxyFactoryWrapper,因此实际获取到的是StubProxyFactoryWrapper实例,并调用
    //它的getProxy方法
    return (T) proxyFactory.getProxy(invoker);
}

 

3.2.注册中心协议:RegistryProtocol


  该Protocol的扩展实现类,根据承载了注册中心信息的URL以及服务接口类型创建服务引用Invoker,创建方法为refer。
/**
 * 获取服务引用的I
 */
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    //从url的parameters字段中获取key=registry即注册中心的协议头,如zookeeper,
    //在将其设置为protocol,之后移除paramters字段中registry这个key
    url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
    
    //在获取到RegistryProtocol这个扩展类实例时,dubbo SPI机制会自动装配它的RegistryFactory字段,
    //RegistryFactory是一个SPI接口,则实际装配的是RegistryFactory的自适应扩展类。
    //另外,RegistryFactory的getRegistry方法被@Adaptive注解,且注解的value值为"protocol",
    //因此RegistryFactory自定义扩展类会调用方法参数URL的getProtocol,以其返回值作为实际扩展类的name。
    //一般我们使用的注册中心为zookeeper,那么最终会调用到ZookeeperRegistryFactory的getRegistry方法。
    Registry registry = registryFactory.getRegistry(url);
    
    //如果要获取的服务引用为RegistryService,直接调用proxyFactory的getInvoker方法获取Invoker
    if (RegistryService.class.equals(type)) {
        return proxyFactory.getInvoker((T) registry, type, url);
    }

    //将url中paremeters的key=refer的value查询字符串重新转为Map
    Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
    //获取group配置
    String group = qs.get(Constants.GROUP_KEY);
    //若group不为空,并且有多个分组
    if (group != null && group.length() > 0) {
        if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
            //通过SPI加载MergeableCluster实例,并调用doRefer继续执行获取服务引用逻辑
            return doRefer(getMergeableCluster(), registry, type, url);
        }
    }
    //单group或无group,使用RegistryProtocol默认装配的Cluster自适应扩展类调用doRefer方法
    return doRefer(cluster, registry, type, url);
}

/**
 * 创建Invoker对象
 */
 private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
     //创建RegistryDirectory服务目录,服务目录相关内容参考下一节,注意每次走到doRefer都会new。
     //维度是某个服务的(type)的在某个注册中心(URL)的服务目录
     RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
     directory.setRegistry(registry); //添加注册中心属性
     directory.setProtocol(protocol); //添加协议自适应扩展类
     
     //生成服务消费者URL,protocol=consumer,ip=register.ip的值,port=0,path=type的全限定名,格式:
     //consumer://192.168.54.1/com.alibaba.dubbo.rpc.service.GenericService?
     //application=monitor-app&check=false&dubbo=2.6.2&generic=true&
     //interface=com.bestpay.monitor.app.AlarmTestService&pid=6332&
     //protocol=dubbo&retries=-1&side=consumer&timeout=10000&timestamp=1561427241838
     Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());      
     URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
     
     //使用服务消费者URL进行注册
     if (!Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY, true)) {
         //调用Registry的register方法,一般为ZookeeperRegistry,它调用FailRegistry的register方法(见下)
         registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, 
                 Constants.CONSUMERS_CATEGORY,Constants.CHECK_KEY, String.valueOf(false)));
     }
     
     //向subscribeUrl的parameter参数添加键值对"category" -> "providers,configurators,routers"
     //订阅服务提供方的zkNode下的providers、configurators、routers等节点数据
     //(1)将RegistryDirectory中的consumerUrl设置为subscribeUrl;
     //(2)调用Registry的subscribe方法,该方法:
     //(2.1)调用父类AbstractRegistry方法,向成员变量subscribed设置值;
     //(2.2)移除failedSubscribed、failedUnsubscribed、failedNotified该subscribeUrl相关数据
     //(3)如果订阅失败,则尝试从ZookeeperRegistry初始化时从缓存文件读取到的数据中获取到URLs,
     //且如果URLs不为空,则向它们发送订阅失败的通知;如果为空,且check=true,则直接抛出异常;
     //否则将url加入failedSubscribed
     directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                Constants.PROVIDERS_CATEGORY + "," + Constants.CONFIGURATORS_CATEGORY
                + "," + Constants.ROUTERS_CATEGORY));

     //调用SPI接口Cluster的自适应扩展类的join,根据Cluster定义可以知道自适应扩展类
     //应该获取一个FailoverCluster实例,但是MockClusterWrapper是Cluster扩展类中的包装类,
     //因此FailoverCluster会被包装起来返回,最终自适应扩展类获取到MockClusterWrapper实例。
     //调用扩展类MockClusterWrapper的join方法,该方法创建了一个MockClusterInvoker实例,
     //并维护了directory以及一个FailoverClusterInvoker。
     Invoker invoker = cluster.join(directory);
     
     //将服务引用invoker、registryUrl、consumerUrl、服务目录directory包装成ConsumerInvokerWrapper,
     //然后以serviceUniqueName = consumerUrl.getServiceKey()做为key,存入
     //在ProviderConsumerRegTable中的static变量ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>>
     ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
     return invoker;
 }

 

3.3.zookeeper注册中心工厂:ZookeeperRegistryFactory


  注册中心协议RegistryProtocol在判定注册中心的协议为zookeeper后,会调用ZookeeperRegistryFactory的getRegistry方法来创建一个Zookeeper注册中心对象ZookeeperRegistry。该Factory缓存了所有zookeeper注册中心的实例。
//dubbo的SPI机制决定了Dubbo运行过程中同一个扩展类实例只有一个,ZookeeperRegistryFactory中具有一个注册中心的缓
//存,key为"zookeeper://172.17.45.14:2181/com.alibaba.dubbo.registry.RegistryService",即如下格式
//"protocol://username:password@ip:port/com.alibaba.dubbo.registry.RegistryService"。更多格式参考
//URL类的toServiceString方法
private static final Map<String, Registry> REGISTRIES = new ConcurrentHashMap<String, Registry>();

//实例化扩展类ZookeeperRegistryFactory时,会通过SPI的注入ZookeeperTransporter的自适应扩展类
private ZookeeperTransporter zookeeperTransporter;

/**
 * 获取一个注册中心封装对象
 */
public Registry getRegistry(URL url) {
    //克隆url,并将Path设置为RegistryService的全限定名;
    //在URL的parameters参数中添加interface=RegistryService的全限定名键值对;
    //在URL的parameters参数中移除key=export、key=refer;
    url = url.setPath(RegistryService.class.getName())
        .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
        .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
    
    //获取注册中心Registry的缓存key
    String key = url.toServiceString();
    LOCK.lock();
    try {
        //从缓存中尝试获取
        Registry registry = REGISTRIES.get(key);
        if (registry != null) {
            return registry;
        }
        //如果没有获取,则调用创建方法创建缓存
        registry = createRegistry(url);
        if (registry == null) {
            throw new IllegalStateException("Can not create registry " + url);
        }
        //存入缓存
        REGISTRIES.put(key, registry);
        return registry;
    } finally {
        LOCK.unlock();
    }
}

/**
 * 创建缓存,直接new一个ZookeeperRegistry对象
 */
public Registry createRegistry(URL url) {
    return new ZookeeperRegistry(url, zookeeperTransporter);
}

 

3.4.zookeeper注册中心:ZookeeperRegistry


3.4.1.构造方法  


        ZookeeperRegistry为dubbo封装的基于zookeeper的注册中心,它并不为一个SPI接口,对于每个注册中心都是通过new出来的实例,下面研究构造方法。
/**
 * ZookeeperRegistry的构造方法
 */
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
    //调用父类FailbackRegistry的构造方法,方法见下
    super(url);
    //判断url中的host字段是否为"0.0.0.0"或者为表示anyhost的true
    if (url.isAnyHost()) {
        throw new IllegalStateException("registry address == null");
    }
    //获取url中key为group的参数值,如果未获取到,则给默认值"dubbo"
    String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
    //判断group是否以"/"开头,如果不是,则为其添加"/"前缀
    if (!group.startsWith(Constants.PATH_SEPARATOR)) {
        group = Constants.PATH_SEPARATOR + group;
    }  
    this.root = group;
    
    //调用SPI接口ZookeeperTransporter的自适应扩展类,该扩展类为dubbo通过字节码技术生成。
    //connect方法被@Adaptive注解修饰,并且要求从key为client以及transporter中取值,如果
    //这两个参数没有值,则扩展类name取@SPI注解的value值"curator",一般来说都是这个值。
    //那么自适应扩展类最终会调用CuratorZookeeperTransporter类的connect方法获取Zk客户端。
    zkClient = zookeeperTransporter.connect(url);
    
    //向获取到的zkClient设置监听器
    zkClient.addStateListener(new StateListener() {
        @Override
        public void stateChanged(int state) {
            if (state == RECONNECTED) {
                try {
                    //如果是RECONNECTED状态,则进行恢复
                    recover();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    });
}

   FailbackRegistry为ZookeeperRegistry的父类,定义了注册中心的重试逻辑:

//定时任务线程池,用于周期性的执行重试任务
private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryFailedRetryTimer", true));

//定时任务线程执行结果
private final ScheduledFuture<?> retryFuture;

//需要重试的注册失败的URL
private final Set<URL> failedRegistered = new ConcurrentHashSet<URL>();

//需要重试的注销失败的URL
private final Set<URL> failedUnregistered = new ConcurrentHashSet<URL>();

//需要重试的订阅失败的URL
private final ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();

//需要重试的解除订阅失败的URL
private final ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();

//需要重试的通知失败的URL
private final ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified = new ConcurrentHashMap<URL, Map<NotifyListener, List<URL>>>();

/**
 * FailbackRegistry构造函数
 */
public FailbackRegistry(URL url) {
    //调用父类AbstractRegistry的构造方法,方法见下
    super(url);
    //获取重试周期,默认5000毫秒
    int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
    //创建一个定时线程池,以retryPeriod为周期定时调用retry方法
    this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            try {
                //尝试注册失败、解注册失败、订阅失败、解订阅失败、通知失败的列表
                retry();
            } catch (Throwable t) { // Defensive fault tolerance
                logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
            }
        }
    }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
}

   AbstractRegistry为FailbackRegistry的父类,其定义了从缓存文件加载配置、缓存配置到文件、注册、解注册、订阅、解除订阅等Registry的主要功能。

//注册URL列表
private final Set<URL> registered = new ConcurrentHashSet<URL>();

//订阅URL列表
private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();

//通知URL列表
private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<URL, Map<String, List<URL>>>();

/**
 * AbstractRegistry构造函数
 */
public AbstractRegistry(URL url) {
    //设置成员变量registryUrl=url
    setUrl(url);
       //获取文件的同步保存标识
    syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false);
     //获取缓存文件名,格式为"C:\Users\chenjunyi/.dubbo/dubbo-registry-monitor-app-172.17.45.14:2181.cache"
    String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(Constants.APPLICATION_KEY) + "-" + url.getAddress() + ".cache");
    File file = null;
    if (ConfigUtils.isNotEmpty(filename)) {
        file = new File(filename);
        if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
            if (!file.getParentFile().mkdirs()) {
                throw new IllegalArgumentException("Invalid registry store file ...");
            }
        }
    }
    this.file = file;
    //读取缓存的配置文件,将读取结果存到Properties properties成员属性中
    loadProperties();
    //拆分注册中心URL中的address,将其backup的地址与主要地址拆成List<URL>,然后通知其监听器
    notify(url.getBackupUrls());
}

 

3.4.2.register方法


  ZookeeperRegistry的register方法继承自父类FailbackRegistry,它将服务消费/提供者的URL注册到注册中心上,并未重写该方法:
/**
 * 继承自父类FailbackRegistry的register方法,用于注册服务引用(消费者)的URL
 */
public void register(URL url) {
    //调用父类AbstractRegistry的register方法
    super.register(url);
    failedRegistered.remove(url); //从注册失败列表中移除该url    
    failedUnregistered.remove(url); //从注销失败列表中移除该url  
    try {
        //勾起实际的注册方法
        doRegister(url);
    } catch (Exception e) {
        //如果抛出异常
        Throwable t = e;
        //判断check标识,从ZookeeperRegistry的registerUrl,即注册中心URL的paramters获取key=check的值,
        //从消费者URL,即url的parameters获取key=check的值,以及获取url的protocol。
        //计算flag check的布尔值
        boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
            && url.getParameter(Constants.CHECK_KEY, true)
            && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
        //判断异常类型是否为SkipFailbackWrapperException
        boolean skipFailback = t instanceof SkipFailbackWrapperException;
        //如果都不应该跳过异常,则抛出异常,否则仅仅是打印异常
        if (check || skipFailback) {
            if (skipFailback) {
                t = t.getCause();
            }
            throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
        } else {
            logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
        }

        //对于注册失败的消费者URL,添加到注册失败列表中Set<URL> failedRegistered
        failedRegistered.add(url);
    }
}

/**
 * 由ZookeeperRegistry实现的doRegister方法
 */
protected void doRegister(URL url) {
    try {
        //调用CuratorZookeeperClient创建节点,以泛化调用为例,传入的参数URL格式为
        //consumer://192.168.54.1/com.alibaba.dubbo.rpc.service.GenericService?
        //application=monitor-app&category=consumers&check=false&dubbo=2.6.2&
        //generic=true&interface=com.bestpay.monitor.app.AlarmTestService&pid=19616&
        //protocol=dubbo&retries=-1&side=consumer&timeout=10000&timestamp=1561429026207
        //可以看出,它的path路径GenericService不一定等于interface
        //(1)toUrlPath(url)获取需要创建节点路径,以消费者为例,其格式为
        //"/dubbo/com.bestpay.monitor.app.AlarmTestService/consumers/consumer%3A%2F%2F192.168.54.1%2F
        //com.alibaba.dubbo.rpc.service.GenericService%3Fapplication%3Dmonitor-app%26..."
        //可以看出,真正的创建节点路径是interface接口作为path的路径
        //(2)url.getParameter(Constants.DYNAMIC_KEY, true)决定是否创建临时节点,true-临时节点。
        //而CuratorZookeeperClient内部的create逻辑为:
        //(1)截取示例中的"/dubbo/com.bestpay.monitor.app.AlarmTestService/consumers"作为
        //zkNode的父路径并一级级创建父节点consumers,父节点都为永久节点;
        //(2)根据第二个参数的值决定创建的每个消费者节点(即截取父路径后遗留的字符串)是否为临时节点,
        //true-临时节点;false-永久节点;        
        zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
    } catch (Throwable e) {
        throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

 

3.4.3.subscribe方法


  ZookeeperRegistry的subscribe方法继承自父类FailbackRegistry,它订阅服务提供者ZK节点下的routers节点、configurators节点以及providers节点,并未重写该方法。该方法在实际的调用过程中,被RegistryDirectory的subscribe勾起:
/**
 * RegistryDirectory的subscribe方法
 */
public void subscribe(URL url) {
    setConsumerUrl(url); //简单的设置RegistryDirectory成员变量consumerUrl
    //勾起成员变量Registry的subscribe方法,并将自身作为NotifyListener传入
    registry.subscribe(url, this); 
}

   调起继承自FailbackRegistry的subscribe方法:

/**
 * 继承自FailbackRegistry的subscribe方法
 */ 
public void subscribe(URL url, NotifyListener listener) {
    //调用父类的订阅方法,
    super.subscribe(url, listener);
    //从failedSubscribed、failedUnsubscribed、failedNotified列表中移除该url对应的listener
    removeFailedSubscribed(url, listener);
    try {
        //调用由ZookeeperRegistry实现的doSubscribe方法
        doSubscribe(url, listener);
    } catch (Exception e) {
        //如果doSubscribe执行抛出异常
        Throwable t = e;
        //从加载的文件缓存Properties中获取url.getServiceKey对应的缓存
        List<URL> urls = getCacheUrls(url);
        if (urls != null && !urls.isEmpty()) {
            //如果读取到的缓存不为空,则对该url进行通知,通知的内容是文件缓存的内容
            notify(url, listener, urls);
            logger.error("Failed to subscribe ... Using cached list: ...");
        } else {
            //获取check标识,getUrl方法获取的是ZookeeperRegistry维护的registryUrl,
            //而url指的是服务消费者的URL,从它们的parameters字段获取check这个key的value;
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                && url.getParameter(Constants.CHECK_KEY, true);
            //判断异常类型
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            //决定订阅失败的处理是继续抛出异常还是打印错误日志
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to subscribe ... cause: ");
            } else {
                logger.error("Failed to subscribe url ... waiting for retry ...");
            }
        }
        //向成员变量 ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed添加订阅失败的数据
        addFailedSubscribed(url, listener);
    }
}

/**
 * 由ZookeeperRegistry实现的doSubscribe方法
 */
protected void doSubscribe(final URL url, final NotifyListener listener) {
    try {
        if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
            //如果方法参数URL的paramters中key=interface为*,暂时先不讨论
            String root = toRootPath();
            ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
            if (listeners == null) {
                zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                listeners = zkListeners.get(url);
            }
            ChildListener zkListener = listeners.get(listener);
            if (zkListener == null) {
                listeners.putIfAbsent(listener, new ChildListener() {
                    @Override
                    public void childChanged(String parentPath, List<String> currentChilds) {
                        for (String child : currentChilds) {
                            child = URL.decode(child);
                            if (!anyServices.contains(child)) {
                                anyServices.add(child);
                                subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
                                                                           Constants.CHECK_KEY, String.valueOf(false)), listener);
                            }
                        }
                    }
                });
                zkListener = listeners.get(listener);
            }
            zkClient.create(root, false);
            List<String> services = zkClient.addChildListener(root, zkListener);
            if (services != null && !services.isEmpty()) {
                for (String service : services) {
                    service = URL.decode(service);
                    anyServices.add(service);
                    subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
                                                                 Constants.CHECK_KEY, String.valueOf(false)), listener);
                }
            }
        } else {
            //方法参数URL的paramters中key=interface不为*
            List<URL> urls = new ArrayList<URL>();
            //通过toCategoriesPath方法生成要订阅的ZK节点路径,以interface=AlarmTestService为例:
            //(1)/dubbo/com.bestpay.monitor.app.AlarmTestService/providers;
            //(2)/dubbo/com.bestpay.monitor.app.AlarmTestService/configurators;
            //(3)/dubbo/com.bestpay.monitor.app.AlarmTestService/routers;
            //然后遍历这3个路径
            for (String path : toCategoriesPath(url)) {
                //获取这个ZookeeperRegistry中的服务消费者URL对应的监听器Map
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                if (listeners == null) {
                    zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                    listeners = zkListeners.get(url);
                }
                //获取NotifyListener,即RegistryDirectory对应的ChildListener。
                //一般来说RegistryDirectory与注册中心Registry和服务引用接口(如GenericService)绑定。
                ChildListener zkListener = listeners.get(listener);
                if (zkListener == null) {
                    //如果没有ChildListener,则创建一个并设置到listeners这个map中
                    listeners.putIfAbsent(listener, new ChildListener() {
                        @Override
                        public void childChanged(String parentPath, List<String> currentChilds) {
                            ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                        }
                    });
                    zkListener = listeners.get(listener);
                }
                //创建"XXXX/XXX/providers、configurators、providers"永久节点
                zkClient.create(path, false);
                //向Dubbo封装的ZkClient添加ChildListener
                List<String> children = zkClient.addChildListener(path, zkListener);
                if (children != null) {
                    urls.addAll(toUrlsWithEmpty(url, path, children));
                }
            }
            notify(url, listener, urls);
        }
    } catch (Throwable e) {
        throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

   类似于register方法,FallbackRegistry首先调用父类AbstractRegistry的subscribe,在经过一系列的校验之后,向成员变量ConcurrentMap<URL, Set<NotifyListener>> subscribed添加服务引用(即消费者)的URL和监听器。

/**
 * AbstractRegistry的subscribe方法
 */
public void subscribe(URL url, NotifyListener listener) {
    if (url == null) {
        throw new IllegalArgumentException("subscribe url == null");
    }
    if (listener == null) {
        throw new IllegalArgumentException("subscribe listener == null");
    }
    if (logger.isInfoEnabled()) {
        logger.info("Subscribe: " + url);
    }
    Set<NotifyListener> listeners = subscribed.get(url);
    if (listeners == null) {
        subscribed.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>());
        listeners = subscribed.get(url);
    }
    listeners.add(listener);
}

 

3.4.4.恢复与重试


  为了保障连接能够重试与恢复,在ZookeeperRegistry中为CuratorZookeeperClient设置了状态监听器,对于Curator通知的连接RECONNECTED事件,会勾起一个recover方法;在FailbackRegistry的构造方法中,同时设置了一个定时线程,用于调用retry方法,该方法捞取注册失败、解注册失败、订阅失败、解订阅失败、通知失败的URL列表并重试。下面来看看这两个方法是如何工作的:
/**
 * 用于根据Curator客户端推送的连接状态,RECONNECTED进行恢复
 */
protected void recover() throws Exception {
    //获取恢复的注册中心的地址URL列表Set<URL>,getRegistered()获取成员变量Set<URL>
    Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());
    if (!recoverRegistered.isEmpty()) {
        if (logger.isInfoEnabled()) {
            logger.info("Recover register url " + recoverRegistered);
        }
        //将getRegistered()添加到成员变量Set<URL> failedRegistered中
        for (URL url : recoverRegistered) {
            failedRegistered.add(url);
        }
    }
    
    //获取恢复的订阅列表Map<URL, Set<NotifyListener>>,getSubscribed()获取成员变量subscribed
    Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>
                                                                            (getSubscribed());
    if (!recoverSubscribed.isEmpty()) {
        if (logger.isInfoEnabled()) {
            logger.info("Recover subscribe url " + recoverSubscribed.keySet());
        }
        for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
            URL url = entry.getKey();
            //添加到成员变量failedSubscribed中
            for (NotifyListener listener : entry.getValue()) {
                addFailedSubscribed(url, listener);
            }
        }
    }
}

/**
 * 对于成员变量failedRegistered、failedUnregistered、failedSubscribed、
 * failedUnsubscribed、failedNotified进行重试,可以看到,如果重试成功则将
 * 其移出相应的重试列表,如果重试失败,则忽略异常等待下次重试
 */
protected void retry() {
    if (!failedRegistered.isEmpty()) {
        Set<URL> failed = new HashSet<URL>(failedRegistered);
        if (failed.size() > 0) {
            if (logger.isInfoEnabled()) {
                logger.info("Retry register " + failed);
            }
            try {
                for (URL url : failed) {
                    try {
                        doRegister(url);
                        failedRegistered.remove(url);
                    } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                        logger.warn("Failed to retry register ... waiting for again, cause: ...");
                    }
                }
            } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                logger.warn("Failed to retry register ... waiting for again, cause: ...");
            }
        }
    }
    if (!failedUnregistered.isEmpty()) {
        Set<URL> failed = new HashSet<URL>(failedUnregistered);
        if (!failed.isEmpty()) {
            if (logger.isInfoEnabled()) {
                logger.info("Retry unregister " + failed);
            }
            try {
                for (URL url : failed) {
                    try {
                        doUnregister(url);
                        failedUnregistered.remove(url);
                    } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                        logger.warn("Failed to retry unregister ... waiting for again, cause: ...");
                    }
                }
            } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                logger.warn("Failed to retry unregister ... waiting for again, cause: ");
            }
        }
    }
    if (!failedSubscribed.isEmpty()) {
        Map<URL, Set<NotifyListener>> failed = new HashMap<URL, Set<NotifyListener>>(failedSubscribed);
        for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<URL, Set<NotifyListener>>
                                                         (failed).entrySet()) {
            if (entry.getValue() == null || entry.getValue().size() == 0) {
                failed.remove(entry.getKey());
            }
        }
        if (failed.size() > 0) {
            if (logger.isInfoEnabled()) {
                logger.info("Retry subscribe " + failed);
            }
            try {
                for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) {
                    URL url = entry.getKey();
                    Set<NotifyListener> listeners = entry.getValue();
                    for (NotifyListener listener : listeners) {
                        try {
                            doSubscribe(url, listener);
                            listeners.remove(listener);
                        } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                            logger.warn("Failed to retry subscribe ... waiting for again, cause: ...");
                        }
                    }
                }
            } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                logger.warn("Failed to retry subscribe ... waiting for again, cause: ");
            }
        }
    }
    if (!failedUnsubscribed.isEmpty()) {
        Map<URL, Set<NotifyListener>> failed = new HashMap<URL, Set<NotifyListener>>
                                                                (failedUnsubscribed);
        for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<URL, Set<NotifyListener>>
                                                         (failed).entrySet()) {
            if (entry.getValue() == null || entry.getValue().isEmpty()) {
                failed.remove(entry.getKey());
            }
        }
        if (failed.size() > 0) {
            if (logger.isInfoEnabled()) {
                logger.info("Retry unsubscribe " + failed);
            }
            try {
                for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) {
                    URL url = entry.getKey();
                    Set<NotifyListener> listeners = entry.getValue();
                    for (NotifyListener listener : listeners) {
                        try {
                            doUnsubscribe(url, listener);
                            listeners.remove(listener);
                        } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                            logger.warn("Failed to retry unsubscribe ... waiting for again, cause: ..");
                        }
                    }
                }
            } catch (Throwable t) { 
                logger.warn("Failed to retry unsubscribe ... waiting for again, cause: ...");
            }
        }
    }
    if (!failedNotified.isEmpty()) {
        Map<URL, Map<NotifyListener, List<URL>>> failed = new HashMap<URL, Map<NotifyListener, 
                                                    List<URL>>>(failedNotified);
        for (Map.Entry<URL, Map<NotifyListener, List<URL>>> entry : new HashMap<URL, Map<NotifyListener, 
                                                     List<URL>>>(failed).entrySet()) {
            if (entry.getValue() == null || entry.getValue().size() == 0) {
                failed.remove(entry.getKey());
            }
        }
        if (failed.size() > 0) {
            if (logger.isInfoEnabled()) {
                logger.info("Retry notify " + failed);
            }
            try {
                for (Map<NotifyListener, List<URL>> values : failed.values()) {
                    for (Map.Entry<NotifyListener, List<URL>> entry : values.entrySet()) {
                        try {
                            NotifyListener listener = entry.getKey();
                            List<URL> urls = entry.getValue();
                            listener.notify(urls);
                            values.remove(listener);
                        } catch (Throwable t) { 
                            logger.warn("Failed to retry notify ... waiting for again, cause: ...");
                        }
                    }
                }
            } catch (Throwable t) { 
                logger.warn("Failed to retry notify ... waiting for again, cause: ...");
            }
        }
    }
}

 

3.5.zookeeper封装客户端:CuratorZookeeperClient


  CuratorZookeeperTransporter的connect方法直接new一个Dubbo封装的CuratorZookeeperClient。
public ZookeeperClient connect(URL url) {
    return new CuratorZookeeperClient(url);
}

   而CuratorZookeeperClient是dubbo通过Curator封装出来的zookeeper客户端。它的构造函数通过Curator框架创建一个client,并且向该client添加一个连接状态的监听器。当有连接状态改变时,会向CuratorZookeeperClient维护的StateListener调用stateChanged方法,传入获取到的状态。

 public CuratorZookeeperClient(URL url) {
     super(url);
     try {
         CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
             .connectString(url.getBackupAddress())
             .retryPolicy(new RetryNTimes(1, 1000))
             .connectionTimeoutMs(5000);
         String authority = url.getAuthority();
         if (authority != null && authority.length() > 0) {
             builder = builder.authorization("digest", authority.getBytes());
         }
         client = builder.build();
         client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
             @Override
             public void stateChanged(CuratorFramework client, ConnectionState state) {
                 //回调自身维护的监听器List<StateListener>
                 if (state == ConnectionState.LOST) {
                     CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED);
                 } else if (state == ConnectionState.CONNECTED) {
                     CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);
                 } else if (state == ConnectionState.RECONNECTED) {
                     CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);
                 }
             }
         });
         client.start();
     } catch (Exception e) {
         throw new IllegalStateException(e.getMessage(), e);
     }
 }

 

3.6.StubProxyFactoryWrapper


  看看getProxy方法:
/** 
 * StubProxyFactoryWrapper的getProxy方法
 */
public <T> T getProxy(Invoker<T> invoker) throws RpcException {
    //它首先调用包装的JavassistProxyFactory的getProxy方法
    T proxy = proxyFactory.getProxy(invoker);
    if (GenericService.class != invoker.getInterface()) {
        //如果设置了本地代理类,则将获取到的Proxy包装为代理类对象
        String stub = invoker.getUrl().getParameter(
            Constants.STUB_KEY, invoker.getUrl().getParameter(Constants.LOCAL_KEY));
        if (ConfigUtils.isNotEmpty(stub)) {
            Class<?> serviceType = invoker.getInterface();
            if (ConfigUtils.isDefault(stub)) {
                if (invoker.getUrl().hasParameter(Constants.STUB_KEY)) {
                    stub = serviceType.getName() + "Stub";
                } else {
                    stub = serviceType.getName() + "Local";
                }
            }
            try {
                Class<?> stubClass = ReflectUtils.forName(stub);
                if (!serviceType.isAssignableFrom(stubClass)) {
                    throw new IllegalStateException("The stub implementation class ...");
                }
                try {
                    Constructor<?> constructor = ReflectUtils.findConstructor(stubClass, serviceType);
                    proxy = (T) constructor.newInstance(new Object[]{proxy});
                    //export stub service
                    URL url = invoker.getUrl();
                    if (url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT)) {
                        url = url.addParameter(Constants.STUB_EVENT_METHODS_KEY, StringUtils.join(
                            Wrapper.getWrapper(proxy.getClass()).getDeclaredMethodNames(), ","));
                        url = url.addParameter(Constants.IS_SERVER_KEY, Boolean.FALSE.toString());
                        try {
                            export(proxy, (Class) invoker.getInterface(), url);
                        } catch (Exception e) {
                            LOGGER.error("export a stub service error.", e);
                        }
                    }
                } catch (NoSuchMethodException e) {
                    throw new IllegalStateException("No such constructor \"public ...");
                }
            } catch (Throwable t) {
                LOGGER.error("Failed to create stub implementation class ...");
            }
        }
    }
    return proxy;
}

/**
 * JavassistProxyFactory的getProxy方法,继承自AbstractProxyFactory
 */
public <T> T getProxy(Invoker<T> invoker) throws RpcException {
    Class<?>[] interfaces = null;
    //获取接口列表
    String config = invoker.getUrl().getParameter("interfaces");
    if (config != null && config.length() > 0) {
        //切分接口列表
        String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);
        if (types != null && types.length > 0) {
             //设置服务接口类和EchoService.class到interfaces中
            interfaces = new Class<?>[types.length + 2];
            interfaces[0] = invoker.getInterface();
            interfaces[1] = EchoService.class;
            for (int i = 0; i < types.length; i++) {
                interfaces[i + 1] = ReflectUtils.forName(types[i]);
            }
        }
    }
    //如果接口列表为空,则设置它为服务接口以及回声测试接口
    if (interfaces == null) {
        interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class};
    }
    
    return getProxy(invoker, interfaces);
}

/**
 * JavassistProxyFactory实现的getProxy方法,非继承,获取服务接口代理类对象
 */
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
    //通过字节码技术生成类Proxy的子类(Proxy是抽象类)
    //并调用Proxy子类的newInstance方法创建服务接口代理类实例,该实例维护一个InvokerInvocationHandler
    //代理类实例的每个方法实现都会调用InvokerInvocationHandler的invoke方法,将服务接口的方法参数以及
    //调用的方法反射对象Method传入。InvokerInvocationHandler的invoke方法在一系列检查后最终执行如下方法:
    //return invoker.invoke(new RpcInvocation(method, args)).recreate();
    return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}

 

3.7.Proxy


  Proxy是一个抽象类,该类提供一个Proxy方法,该方法通过字节码技术生成Proxy的之类。该子类有一个方法,用于生成Invoker代理类。
//Proxy子类实例缓存
private static final Map<ClassLoader, Map<String, Object>> ProxyCacheMap = 
    new WeakHashMap<ClassLoader, Map<String, Object>>();

/**
 * 生成Proxy子类,该子类封装了生成服务接口代理类的逻辑
 */
public static Proxy getProxy(Class<?>... ics) {
    return getProxy(ClassHelper.getClassLoader(Proxy.class), ics);
}

/**
 * 生成Proxy子类,该子类封装了生成服务接口代理类的逻辑
 */
public static Proxy getProxy(ClassLoader cl, Class<?>... ics) {
    if (ics.length > 65535)
        throw new IllegalArgumentException("interface limit exceeded");

    StringBuilder sb = new StringBuilder();
    //遍历接口列表
    for (int i = 0; i < ics.length; i++) {
        String itf = ics[i].getName();
        //检测是否为接口类型
        if (!ics[i].isInterface())
            throw new RuntimeException(itf + " is not a interface.");
        //使用提供的ClassLoader,即Proxy自身的ClassLoader加载当前遍历的接口
        Class<?> tmp = null;
        try {
            tmp = Class.forName(itf, false, cl);
        } catch (ClassNotFoundException e) {
        }
        //检测接口是否相同,这里相当于判断Proxy的类加载器与接口的类加载器是否为一个
        if (tmp != ics[i])
            throw new IllegalArgumentException(ics[i] + " is not visible from class loader");
        //拼接接口名称,格式"接口名1;接口名2..."
        sb.append(itf).append(';');
    }

    //获取类加载器对应的Proxy缓存,一个Map对象
    Map<String, Object> cache;
    synchronized (ProxyCacheMap) {
        //获取当前类加载器的Proxy实例缓存,key为ClassLoader
        cache = ProxyCacheMap.get(cl);
        if (cache == null) {
            cache = new HashMap<String, Object>();
            //缓存为null,设置该ClassLoader的缓存
            ProxyCacheMap.put(cl, cache);
        }
    }
    
    //以接口名作为key,从cache中获取这个key的Proxy实例缓存
    String key = sb.toString();
    Proxy proxy = null;
    //获取value时的并发控制,使用监视器锁
    synchronized (cache) {
        do {
            //如果value就是应用类型包装的,直接从引用中获取实例
            Object value = cache.get(key);
            if (value instanceof Reference<?>) {
                proxy = (Proxy) ((Reference<?>) value).get();
                if (proxy != null)
                    return proxy;
            }
            //如果value不是引用类型,则进行判断其是否等于PendingGenerationMarker,即一个Object对象。
            //这是使用了cache.wait(),让其他线程在cache这个对象上进行等待,原因如下:
            //(1)首先一个在cache中未命中的key其value肯定为null;那么我们肯定要创建这个value;
            //(2)既然value==null,则进入到else逻辑,设置一个key的标识,跳出循环,也跳出监视器锁同步块;
            //(3)当前线程代码继续执行去创建Proxy的实例,其他线程进入到这个监视器锁块,就会进行循环获取Proxy;
            //(4)不断地循环获取满足条件的Reference也没错,但是这样不断疯狂的循环,对程序有影响;
            //(5)因此,设置PendingGenerationMarker的目的也在于此,作为一个标识,如果发现key的value还是它,
            //   就表示Proxy实例尚未创建完成,在此进行等待;直到实例创建完成并进行notify。
            //(6)当然,若使用监视器锁将创建Proxy的代码锁住也可以,但是这样锁住的代码块太大了。
            if (value == PendingGenerationMarker) {
                try {
                    cache.wait();
                } catch (InterruptedException e) {
                }
            } else {
                cache.put(key, PendingGenerationMarker);
                break;
            }
        }
        while (true);
    }

    //原子计数器+1,作为id,用于拼接生成的Proxy子类的名字
    long id = PROXY_CLASS_COUNTER.getAndIncrement();
    String pkg = null;
    //ccm用于为Proxy生成子类,ccp为服务接口生成代理类
    ClassGenerator ccp = null, ccm = null;
    try {
        /************************* 开始创建服务接口代理类 *************************/
        //创建生成服务接口代理类的ClassGenerator
        ccp = ClassGenerator.newInstance(cl);

        Set<String> worked = new HashSet<String>();
        List<Method> methods = new ArrayList<Method>();
        //遍历要代理的接口
        for (int i = 0; i < ics.length; i++) {
            //检测接口访问级别是否为public
            if (!Modifier.isPublic(ics[i].getModifiers())) {
                //不为public级别的接口,需要确保它们必须在同一个包下
                String npkg = ics[i].getPackage().getName();
                if (pkg == null) {
                    pkg = npkg;
                } else {
                    if (!pkg.equals(npkg))
                        throw new IllegalArgumentException("non-public interfaces from diff pack...");
                }
            }
            //添加接口到cpp中
            ccp.addInterface(ics[i]);
            //遍历服务接口的所有方法
            for (Method method : ics[i].getMethods()) {
                //获取方法描述,JVM格式"realTimePushList(Lcom/bestpay/messagecenter/product/
                //service/api/dto/push/RealTimePushListDTO;)Lcom/bestpay/dubbo/result/Result;"
                String desc = ReflectUtils.getDesc(method);
                //如果方法描述字符串已在worked中,则忽略。考虑A接口和B接口中包含一个完全相同的方法的情况
                if (worked.contains(desc))
                    continue;
                worked.add(desc);

                //服务接口代理类方法大小 TODO 
                int ix = methods.size();
                Class<?> rt = method.getReturnType();
                Class<?>[] pts = method.getParameterTypes();

                //拼接代码字符串"Object[] args = new Object[N];",N是当前遍历的method参数个数
                StringBuilder code = new StringBuilder("Object[] args = new Object[").
                                    append(pts.length).append("];");
                //遍历method的参数列表
                for (int j = 0; j < pts.length; j++)
                    //拼接代码字符串"args[j]=($w)$k;",其中k=j+1。这个是args的赋值语句
                    code.append(" args[").append(j).append("] = ($w)$").append(j + 1).append(";");
                //拼接handler的调用语句,"Object ret = handler.invoke(this, methods[ix], args);"
                //handler是java动态代理InvocationHandler的一个实现类,ix为methods.size()。             
                code.append(" Object ret = handler.invoke(this, methods[" + ix + "], args);");
                //若方法返回类型不为void,则拼接返回类型并进行强制类型转换"return (类型)ret;"
                if (!Void.TYPE.equals(rt))
                    code.append(" return ").append(asArgument(rt, "ret")).append(";");
                //向List<Method>中添加该method
                methods.add(method);
                //添加方法名、访问控制符、返回类型、参数列表、抛出异常类型、方法体代码到ClassGenerator中 
                ccp.addMethod(method.getName(), method.getModifiers(), rt, pts,
                              method.getExceptionTypes(), code.toString());
            }
        }
        //设置服务接口代理类包名就为Proxy类所在的包名
        if (pkg == null)
            pkg = PACKAGE_NAME;
        //设置服务接口代理类类名为"pkg + ".proxy" + id",比如org.apache.dubbo.proxy0,注意是小写
        String pcn = pkg + ".proxy" + id;
        ccp.setClassName(pcn);
        //添加成员属性Method[] methods
        ccp.addField("public static java.lang.reflect.Method[] methods;");
        //添加成员属性InvocationHandler
        ccp.addField("private " + InvocationHandler.class.getName() + " handler;");
        //添加带有InvocationHandler参数的构造方法,比如:
        //public proxy0(java.lang.reflect.InvocationHandler $1) { handler=$1; }
        ccp.addConstructor(Modifier.PUBLIC, new Class<?>[]{InvocationHandler.class}, 
                           new Class<?>[0], "handler=$1;");
        //添加默认无参构造器
        ccp.addDefaultConstructor();
        //生成服务接口代理Class
        Class<?> clazz = ccp.toClass();
        //将服务接口代理类的static属性methods设置为上面收集到的methods列表
        clazz.getField("methods").set(null, methods.toArray(new Method[0]));

        /************************* 开始创建Proxy类的子类 *************************/          
        String fcn = Proxy.class.getName() + id; //类名=Proxy全限定名+id,如Proxy1、Proxy2等      
        ccm = ClassGenerator.newInstance(cl); //创建生成Proxy子类的ClassGenerator       
        ccm.setClassName(fcn); //设置类名       
        ccm.addDefaultConstructor(); //添加默认构造器     
        ccm.setSuperClass(Proxy.class); //设置父类
        //添加方法newInstance,该方法调用构造方法,格式如下:
        //public Object newInstance(java.lang.reflect.InvocationHandler $1) { 
        //    return new org.apache.dubbo.proxy0($1);
        //}
        ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + 
                      " h){ return new " + pcn + "($1); }");
        Class<?> pc = ccm.toClass(); //生成Proxy子类Class
        proxy = (Proxy) pc.newInstance(); //生成Proxy子类的实例
    } catch (RuntimeException e) {
        throw e;
    } catch (Exception e) {
        throw new RuntimeException(e.getMessage(), e);
    } finally {
        //释放资源
        if (ccp != null)
            ccp.release();
        if (ccm != null)
            ccm.release();
        //同步设置该Proxy子类的实例缓存,使用弱引用
        synchronized (cache) {
            if (proxy == null)
                cache.remove(key);
            else
                cache.put(key, new WeakReference<Proxy>(proxy));
            //通知所有在cache上等待的线程
            cache.notifyAll();
        }
    }
    return proxy;
}

 

上一篇:PHP:带未定义常量的switch语句


下一篇:PHP和JavaScript的常见常量文件