Dubbo服务引用
文章目录
概念
之前一篇主要讲解了Dubbo服务暴露和服务注册的过程,今天我们来讲Dubbo服务引用的过程。如果之前一篇看得云里雾里的,那么今天这篇会更好地帮助你把之前的知识串联起来。等后面Dubbo服务调用的过程再写完,把之前所有的知识串联起来,大家就会对Dubbo有新的理解。
我们来看一下服务引用的结构图:
服务引用定义:“服务消费者”向注册中心订阅“服务提供者”提供的服务地址,并生成服务接口的实际代理对象。
服务引用的方式
服务引用有三种方式:
- 本地引用:就是本地既当提供者,又当消费者。前面服务暴露的时候讲过,Dubbo在服务暴露的时候,如果没有对scope进行配置为remote的话,会进行本地服务暴露,而且在Dubbo 2.2.0版本开始,每个服务默认都会在本地暴露。通过本地引入,可以减少调用的时间,因为避免了网络通信的过程。
- 直连服务引用:大家都知道Dubbo提供了服务提供者和服务消费者直连的功能,而不经过注册中心。消费者在配置中把服务提供者的地址直接写死。一般在开发和测试的时候使用。
- 注册中心引用:这个就是核心了。提供者把服务接口,地址注册在注册中心;消费者从注册中心去取要消费的服务的相关信息。而在注册中心,很可能是一个服务有多个服务提供者,注册中心在其中如何去做选择、容错、负载均衡。
服务引用时机
服务引用的时机有两个:
- Spring 容器调用 ReferenceBean 的 afterPropertiesSet 方法时引用服务,即Spring初始化的时候就进行引用。
- ReferenceBean 对应的服务被注入到其他类中时引用。
这两个时机的区别是,第一个是饿汉式,第二个是懒汉式。默认情况下,Dubbo 使用懒汉式引用服务。如果需要使用饿汉式,可通过配置 dubbo:reference的 init 属性开启。
服务引用源码解析
本文使用dubbo版本为2.8.4。
配置解析
首先依然是配置解析:
<dubbo:reference
id="demoQuery"
interface="cn.fraudmetrix.forseti.fp.dubbo.intf.DemoQuery"
version="${forseti.fp.dubbo.service.version}"
check="false"
/>
配置解析这块大概和之前服务暴露的过程差不多,都是通用的:
依旧是通过DubboNamespaceHandler加载标签解析器。然后通过DubboBeanDefinitionParser对响应标签解析成响应的Bean。Dubbo服务就初始化成ServiceBean;Dubbo引用则初始化成为一个ReferenceBean。
参考服务暴露过程:服务暴露
服务引用大致流程
接下来,我们还是来看一下服务引入的大致流程图,这样心中有个整体的印象:
服务引用的入口是在referenceBean类中的getObject()方法,再进入到referenceConfig的get()方法中。
从get方法中,进入到init()方法。因为Dubbo 提供了丰富的配置,用于调整和优化框架行为,性能等,所以init方法在前期会做一系列的配置检查,然后通过createProxy()方法,正式开始进行服务引用的过程。
进入到createProxy()方法中,主要分成四个步骤:
- 判断是否为本地引用,判断完成,给isJvmRefer字段赋值。
- 对isJvmRefer进行判断,为TRUE,则是本地引用,调用InjvmProtocol类中的refer()方法。
- 否则,进行对url字段进行判断(该字段是在init()方法中检查配置的时候,将从系统属性或配置文件中加载与接口名相对应的配置存到这个字段中)。如果URL有值,说明是用户指定的URL,这个URL可能是点对点直连的地址,也有可能是注册中心的地址。
- URL有值,则将该地址值放入到List urls 中;无值,则进行加载注册中心的操作,再把注册中心地址放入到List urls 中。然后调用RegistryProtocol中的refer()方法。
服务引用入口
public class ReferenceBean<T> extends ReferenceConfig<T>{
public Object getObject() throws Exception {
return get();
}
}
public class ReferenceConfig<T> extends AbstractReferenceConfig {
public synchronized T get() {
//所引用的服务是否被销毁
if (destroyed){
throw new IllegalStateException("Already destroyed!");
}
if (ref == null) {
init();
}
return ref;
}
private void init() {
...
//前面是一些配置获取与检查,是否合法
ref = createProxy(map);
}
}
通过上文介绍,dubbo默认是通过懒汉式进行引用加载,通过referenceBean进行服务引用。
通过调用里面的getObject()方法,继而调用referenceConfig中的get()方法。get()方法中,如果还没有这个引用,就进行init()初始化。在init()方法中,前期是对dubbo配置的检查,最后通过调用createProxy(map)进行服务引用的过程(其中,map里面存储的都是配置)。
ref判空导致无法初始化BUG修复
这里有个小问题,就是ref判断不为空,结果无法初始化。
按照官网的说法,这个问题在2.6.5 版本就已经修复了,但笔者用的是2.8.4却也碰到这个问题。
为什么ref没有进行引用,却会不为空呢?
这是因为IDEA为了显示对象的信息 ,会调用了toString方法获取对应对象的信息。
toString在调用了AbstractConfig类中的toString()方法,而这个方法会通过反射调用了 ReferenceBean 的 getObject 方法,触发了引入服务动作,所以ref判断跟我们预想的不一致。
可以看到,通过方法名进行判断是否进行调用,而getObject以"get"开头,所以会进行反射调用。
这是有位同学提的PR,但是一开始并没有被接受,因为其中一位项目成员觉得这个并不是bug,只要IDEA中设置一下就好了:把下图红圈中的不勾就可以了。
而另一位成员觉得这个PR还可以,而且最后DUBBO项目的负责人beiwei30也同意了。
于是,后面就加了一个这个方法名的判断。
createProxy()——服务引用的开始
接下来,我们进入createProxy()方法。
这个方法,其实我在开头的流程图中,核心流程就是按照这个方法来画的。
这个方法看起来长,但并不难。为了便于方便,我把这个方法分开来放,这样读起来清晰明了。
下面这段代码,主要是判断是否为本地引用,为isJvmRefer赋值,用于后面用哪个协议的引用方法做判断。
URL tmpUrl = new URL("temp", "localhost", 0, map);
final boolean isJvmRefer;
//是否为本地引用
if (isInjvm() == null) {
if (url != null && url.length() > 0) { //指定URL的情况下,不做本地引用
isJvmRefer = false;
} else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
//默认情况下如果本地有服务暴露,则引用本地服务.
isJvmRefer = true;
} else {
isJvmRefer = false;
}
} else {
isJvmRefer = isInjvm().booleanValue();
}
下面这段代码,如果判断是本地引用的话,构建一个走本地协议的URL,让protocol = LOCAL_PROTOCOL(inJvm),通过InjvmProtocol类中的refer方法进行引用。里面主要是通过服务暴露的exporterMap中去拿服务。
if (isJvmRefer) {
//构建一个走本地协议的URL
URL url = new URL(Constants.LOCAL_PROTOCOL,NetUtils.LOCALHOST,0,interfaceClass.getName())
.addParameters(map);
invoker = refprotocol.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
}
我们看一下exporterMap:
如果不是本地调用的话,就是远程调用了。
如果指定了URL,这个URL不是注册中心的地址,就是点对点直连的地址。通过这个URL去判断是否是直接连接到Provider还是通过注册中心拿到Provider地址,再连接Provider。
// 用户指定URL,指定的URL可能是点对点直连地址,也可能是注册中心URL
if (url != null && url.length() > 0) {
String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (url.getPath() == null || url.getPath().length() == 0) {
url = url.setPath(interfaceName);
}
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
//从注册中心获取Provider ,将map转换为查询字符串,并作为refer参数的值添加到URL中
urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
} else {
//点对点直连Provider,移除Provider的一些配置
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
}
下面这段代码的话,就是拿到注册中心的地址,通过注册中心拿Provider的地址,再去连接Provider的情况了。
List<URL> us = loadRegistries(false);
if (us != null && us.size() > 0) {
for (URL u : us) {
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
}
}
if (urls == null || urls.size() == 0) {
throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
}
我们看一下URL:
下面这段代码,主要是对注册中心的判断:
- 如果只有一个注册中心(服务提供者)的话,调用 RegistryProtocol 的 refer 构建 Invoker 实例就可以了。
- 有多个注册中心(多个服务提供者),根据url上的协议,通过SPI机制拿到对应的XXXProtocol类,调用其中的refer()方法。根据URL构建出多个invoker,然后再通过 Cluster 合并多个 Invoker,只暴露出一个Invoker。
if (urls.size() == 1) {
// 调用 RegistryProtocol 的 refer 构建 Invoker 实例
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // 用了最后一个registry url
}
}
// 有注册中心协议的URL
if (registryURL != null) {
// 对有注册中心的Cluster 只用 AvailableCluster
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
// 创建 StaticDirectory 实例,并由 Cluster 对多个 Invoker 进行合并
invoker = cluster.join(new StaticDirectory(u, invokers));
} else {
// 没有注册中心的URL
invoker = cluster.join(new StaticDirectory(invokers));
}
}
接下来,就是这个类的最后一段代码,只展示关键代码:
....
// 创建服务代理
return (T) proxyFactory.getProxy(invoker);
通过拿到的invoker,进行创建代理对象。
这里为什么要用代理对象呢,其实就是不想让调用者知道被调用方的实现细节。
再通过流程图来看一下createProxy的整体流程,回顾一下:
RegistryProtocol.Refer()——消费者端的注册与订阅
接下来,我们来看refer方法,首先我们看的是RegistryProtocol类中的refer方法,因为需要从注册中心中拿到Provider的地址。
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// 取 registry 参数值,并将其设置为协议头
url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
// 获取注册中心实例
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// 将 url 查询字符串转为 Map
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
// 获取 group 配置
String group = qs.get(Constants.GROUP_KEY);
if (group != null && group.length() > 0) {
if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
|| "*".equals(group)) {
// 通过 SPI 加载 MergeableCluster 实例,并调用 doRefer 继续执行服务引用逻辑
return doRefer(getMergeableCluster(), registry, type, url);
}
}
// 调用 doRefer 继续执行服务引用逻辑
return doRefer(cluster, registry, type, url);
}
然后进入到doRefer()的方法中:
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
// 创建 RegistryDirectory 实例
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
// 设置directory的注册中心和协议
directory.setRegistry(registry);
directory.setProtocol(protocol);
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
// 生成服务消费者链接
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
// 注册服务消费者,在 consumers 的目录下创建一个新节点
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
Constants.CHECK_KEY, String.valueOf(false)));
}
// 订阅 providers、configurators、routers 等节点数据
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
// 一个注册中心可能有多个服务提供者,因此这里需要将多个服务提供者合并为一个
Invoker invoker = cluster.join(directory);
//在服务者消费者的表中记录该信息
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
这里做订阅,主要是通过zk
这个方法中,主要是注册中心的创建、注册消费者到注册中心和做订阅。流程图如下:
这里在完成订阅之后,一个服务可能有多个Provider,providers下面就会有多个节点,需要cluster将多个服务节点合并成一个节点,生成一个invoker。而消费者要用这个服务的时候,会通过负载均衡机制选出一个节点,提供给消费者。
DubboProtocol.refer()——创建服务代理Invoker
上面拿到了Provider之后,通过监听器就会触发DubboProtocol类中的refer方法了。
这里先创建一个DubboInvoker:
- 在服务提供方,Invoker用于调用服务提供类。
- 在服务消费方,Invoker用于执行远程调用。
下面这段代码的主要作用是创建DubboInvoker,我们主要关注一下getClients(url)这个方法,这个方法的作用是获取客户端实例,从而和Provider进行连接通信,实例类型为 ExchangeClient,该类本身并无通信能力,底层依靠Netty进行通信。
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
// 创建 DubboInvoker
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
下面这段代码ExchangeClient的获取。
private ExchangeClient[] getClients(URL url) {
// 是否共享连接
boolean service_share_connect = false;
// 获取连接数,默认为0,表示未配置
int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
// 如果未配置 connections,则共享连接,默认共享连接
if (connections == 0) {
service_share_connect = true;
connections = 1;
}
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (service_share_connect) {
// 获取共享客户端
clients[i] = getSharedClient(url);
} else {
// 初始化新的客户端
clients[i] = initClient(url);
}
}
return clients;
}
我们可以看到,通过连接数connections来判断连接方式,默认为共享连接。
接下来,我们再来看getSharedClient(url)和initClient(url)。
getSharedClient(url) ——获取共享客户端
下面这段代码中会先去缓存中看有没有这个URL对应的客户端,如果没有的话,也会调用initClient(url)去创建一个新的客户端。如果有的话,如果客户端没有关闭,进行引用计数,否则的话,将该客户端从缓存中移除,再创建一个新的客户端。
private ExchangeClient getSharedClient(URL url){
String key = url.getAddress();
// 获取带有“引用计数”功能的 ExchangeClient
ReferenceCountExchangeClient client = referenceClientMap.get(key);
if ( client != null ){
if ( !client.isClosed()){
// 增加引用计数
client.incrementAndGetCount();
return client;
} else {
// logger.warn(new IllegalStateException("client is closed,but stay in clientmap .client :"+ client));
referenceClientMap.remove(key);
}
}
// 创建 ExchangeClient 客户端
ExchangeClient exchagneclient = initClient(url);
// 将 ExchangeClient 实例传给 ReferenceCountExchangeClient,这里使用了装饰模式
client = new ReferenceCountExchangeClient(exchagneclient, ghostClientMap);
referenceClientMap.put(key, client);
ghostClientMap.remove(key);
return client;
}
接下来,我们再看initClient()这个方法。
initClient(url) 初始化Netty客户端
下面这段代码中我们可以看到,先获取客户端的类型,并且默认是Netty 。通过lazy配置,决定创建的客户端类型。
private ExchangeClient initClient(URL url) {
// 获取客户端类型,默认为 netty
String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
// 添加编解码和心跳包参数到 url 中
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
// 检测客户端类型是否存在,不存在则抛出异常
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported client type: ...");
}
ExchangeClient client;
try {
// 获取 lazy 配置,并根据配置值决定创建的客户端类型
if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
// 创建懒加载 ExchangeClient 实例
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
// 创建普通 ExchangeClient 实例
client = Exchangers.connect(url, requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service...");
}
return client;
}
可以看一下connect返回结果。是一个HeaderExchangeClient类型,里面封装的客户端是Netty。
我们看一下最终返回的DubboInvoker:
DubboProtocol.refer()流程图
我们来总结一下DubboProtocol的流程图:
首先refer方法会进行创建一个DubboInvoker,在创建Invoker过程中,需要进入到getClient方法中,得到客户端;客户端的实例类型是ExchangeClient ,这个实例本身不具备通信能力,他是通过底层的客户端进行通信的,比如Netty(默认)、MinaClient 等。进入到getClient方法中,先通过URL获取connections的值,由这个值决定是进行共享客户端还是创建新的客户端,默认使用共享客户端(即如果判断为0,则设置为共享客户端),接下来会判断是否为共享客户端,如果是,分两种情况:
- 是共享客户端,进入到getSharedClient方法中,先从缓存中获取客户端,如果客户端存在的话,则进行引用计数,返回该客户端;如果不存在,则新建客户端,进入到initClient方法中。
- 不是共享客户端,进入到initClient方法中。首先是获取客户端类型,默认为Netty;接着将一些参数添加到URL中,如心跳包参数、编码格式等;然后会从URL中获取lazy 配置,根据这个配置来决定创建哪种类型的客户端,一种是懒加载 ExchangeClient 实例,另一种是普通 ExchangeClient 实例。后面的话就是如何创建Netty客户端了。
总结
本文主要介绍了Dubbo服务引用的过程,整个过程来说并不是特别难。
主要的话就是先确定是哪种引用方式:本地引用、直接引用和注册中心引用。
如果是本地引用的话,会调用进行本地引用的injvmProtocol.refer()方法。
然后如果是注册中心引用的话,会先去注册中心注册自己,然后订阅providers、configurators、routers 等节点数据,得到Provider的IP地址后,会通过DubboProtocol.refer()方法创建出一个DubboInvoker对象出来,在这个过程中我们重点要关注ExchangeClient的由来,有两种方式:
(1)判断是否为共享客户端,如果是共享客户端的话,从缓存中拿;如果缓存中没有,就新建一个客户端。
(2)不是共享客户端,直接新建一个客户端。
创建完该客户端,最终会返回一个代理对象,消费者通过代理对象访问生产者,而生产者被访问的时候可以屏蔽自己的细节不被消费者所知。