Dubbo初窥工作流程
流程图
- 服务提供者启动时,向注册中心写入自己的元数据信息,同时会订阅配置元数据信息。
- 消费者启动时,也会向注册中心写入自己的元数据信息,并订阅服务提供者、路由和 配置元数据信息。
- 如果有配置monitor,启动时,会同时订阅所有消费者、服务提供者、路由和 配置元数据信息。
- 当服务提供者有变动时(如离开或有新的服务提供者加入),注册中心将变更信息通知给消费者、服务治理中心。
- 当消费者发起服务调用时,会0将调用、统计信息等上报给监控中心 。(monitor)
服务提供者注册服务
当服务启动时,先初始化服务实例,再通过Proxy组件调用协议(Protocol ),把服务端要暴露的接口封装成Invoker ,然后转换成Exporter,这个时候框架会打开服务端口等并记录服务实例到内存中,最后通过Registry把服务元数据注册到注册中心。
- Proxy:服务代理层。在使用Dubbo中,我们发现无论是服务端或是消费端,都不用像编写socket那样一个暴露并监听地址端口,一个订阅对应的地址端口。而这得益于Proxy通过事先约好的配置规则帮我们生成一层代理。
- Invoker :一个可执行体,在Dubbo中允许向它发起invoke调用,它可能是执行一个本地的 接口实现,也可能是一个远程的实现,还可能一个集群实现。
- Protocol :远程调用层。封装RPC调用具体过程,Protocol是Invoker暴露(发布一个服务让别人 可以调用)和引用(引用一个远程服务到本地)的主功能入口,它负责管理Invoker的 整个生命周期。
- Exporter:用于暴露到注册中心的对象,它的内部属性持有了 Invoker对象,我们可以 认为它在Invoker上包了 一层。
- Registry:该模块把Exporter注册到注册中心。
private void doExportUrls() {
// 获取注册中心实例
List<URL> registryURLs = loadRegistries(true);
for (ProtocolConfig protocolConfig : protocols) {
String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
ApplicationModel.initProviderModel(pathKey, providerModel);
// 如果协议有多个(dubbo、rest),会依次暴露
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
doExportUrlsFor1Protocol源码,这里只列举了部分:
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
String name = protocolConfig.getName();
if (StringUtils.isEmpty(name)) {
name = DUBBO;
}
Map<String, String> map = new HashMap<String, String>();
map.put(SIDE_KEY, PROVIDER_SIDE);
// 读取其他信息到map中,方便后边构造url
appendRuntimeParameters(map);
appendParameters(map, metrics);
/**
各种情况下, 参数的添加, 例如, 如果有指定调用方法, 还需要把方法名等信息添加到map中
...
**/
// export service
// 获取地址和端口
String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = this.findConfigedPorts(protocolConfig, name, map);
URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
// 判断是否有在配置里头指定url
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
String scope = url.getParameter(SCOPE_KEY);
// don't export when none is configured
if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
// export to local if the config is not remote (export to remote only when config is remote)
if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
exportLocal(url); // 本地服务暴露
}
// export to remote if the config is not local (export to local only when config is local)
if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
if (!isOnlyInJvm() && logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
if (CollectionUtils.isNotEmpty(registryURLs)) {
for (URL registryURL : registryURLs) {
//if protocol is only injvm ,not register
if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
continue;
}
url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
URL monitorUrl = loadMonitor(registryURL);
// 如果监控地址不为空, 还会上报相关信息
if (monitorUrl != null) {
url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
// For providers, this is used to enable custom proxy to generate invoker
String proxy = url.getParameter(PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(PROXY_KEY, proxy);
}
// 通过动态代理转换成Invoker, registryURL存储的是注册中心地 I址,使用export作为key追加服务元数据信息
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// 暴露服务、向注册中心注册服务信息|
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
// 处理没有注册中心的情况, 直接暴露
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
/**
* @since 2.7.0
* ServiceData Store
*/
MetadataReportService metadataReportService = null;
if ((metadataReportService = getMetadataReportService()) != null) {
metadataReportService.publishProvider(url);
}
}
}
this.urls.add(url);
}
服务消费者订阅服务
- 消费者在启动时,需要通过持有远程服务实例生成Invoker,这个Invoker在客户端是核心的远程代理对象。
- 接着会把Invoker通过动态代理转换成实现用户接口的动态代理引用。此时,这里的Invoker承载了网络连接、服务调用和重试等功能,在客户端,它可能是一个远程的实现,也可能是一个集群实现。
在ReferenceConfig#createProxy方法中:
private T createProxy(Map<String, String> map) {
// 检查是否为同一个JVM内部引用
if (shouldJvmRefer(map)) {
URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
invoker = REF_PROTOCOL.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
} else {
urls.clear(); // reference retry init will add url to urls, lead to OOM
// 如果指明了地址
if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (StringUtils.isEmpty(url.getPath())) {
url = url.setPath(interfaceName);
}
if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
} else {
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else { // assemble URL from register center's configuration
// if protocols not injvm checkRegistry
if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())){
checkRegistry();
、
List<URL> us = loadRegistries(false);
if (CollectionUtils.isNotEmpty(us)) {
for (URL u : us) {
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
// URL拼接消费服务的元数据信息, 并添加到集合urls中
urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
}
}
if (urls.isEmpty()) {
throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
}
}
}
if (urls.size() == 1) {
// 单注册中心时的处理
invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
} else {
// 多注册中心时的处理
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
// 逐个获取注册中心的服务, 并添加到invokers列表
invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // use last registry url
}
}
if (registryURL != null) { // registry url is available
// use RegistryAwareCluster only when register's CLUSTER is available
URL u = registryURL.addParameter(CLUSTER_KEY, RegistryAwareCluster.NAME);
// The invoker wrap relation would be: RegistryAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invoker
// 合并多个invoker
invoker = CLUSTER.join(new StaticDirectory(u, invokers));
} else { // not a registry url, must be direct invoke.
invoker = CLUSTER.join(new StaticDirectory(invokers));
}
}
}
if (shouldCheck() && !invoker.isAvailable()) {
throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
}
if (logger.isInfoEnabled()) {
logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
}
/**
* @since 2.7.0
* ServiceData Store
*/
MetadataReportService metadataReportService = null;
if ((metadataReportService = getMetadataReportService()) != null) {
URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
metadataReportService.publishConsumer(consumerURL);
}
// create service proxy
// 把invoker转成接口的代理
return (T) PROXY_FACTORY.getProxy(invoker);
}
关于注册中心实例的创建,元数据注册到注册中心及订阅的功能,主要在RegistryProtocol#refe方法中:
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// 设置具体的注册协议
url = URLBuilder.from(url)
.setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY))
.removeParameter(REGISTRY_KEY)
.build();
// 创建具体的注册中心的实例
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// group="a,b" or group="*"
// 提取消费方refer中保存的元数据信息,如果包含多个分组值则会把调用结果值做合并处理
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
String group = qs.get(GROUP_KEY);
if (group != null && group.length() > 0) {
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
return doRefer(getMergeableCluster(), registry, type, url);
}
}
// 处理订阅数据并通过 cluster 合并多个Invoker
return doRefer(cluster, registry, type, url);
}
doRefer方法:
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
// 消费核心关键,持有实际 Invoker 和接收订阅通知, 服务变更会触发这个类回调notify方法,用于重新引用服务
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
// 注册消费信息到注册中心
registry.register(directory.getRegisteredConsumerUrl());
}
directory.buildRouterChain(subscribeUrl);
// 订阅服务提供者、路由和动态配置
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
// 通过 Cluster 合并 invokers, 同时默认也会启用FailoverCluster策略进行服务调用重试
Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
(未完,待续…)