dubbo-源码分析Provider

Dubbo provider启动原理:

当我们的dubbo启动我们的spring容器时spring 初始化容器的时候会查找META-INF/spring.handles文件查找对应的NamespaceHandle,dubbo在其jar包下配置了DubboNamespaceHandle,该类下有以下配置项:

registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
registerBeanDefinitionParser("config-center", new DubboBeanDefinitionParser(ConfigCenterBean.class, true));
registerBeanDefinitionParser("metadata-report", new DubboBeanDefinitionParser(MetadataReportConfig.class, true));
registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());

意思就是当spring 在解析容器的时候遇到指定配置会使用对应的Parser去解析配置项。

provider

我们提供者主要会配置对应的application、registry、protocol、service所以我们一个一个来看,我们先来看service 根据上面所说,我们配置了<dubbo:service>这个配置项的话就会生成ServiceBean对象,并注册到容器里,那我们来看下serviceBean这个类:

public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean,
ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware,
ApplicationEventPublisherAware

该类主要实现了上面几个接口,我们来看其中最主要的InitializingBean,该类会在类实例化后调用其中的afterPropertiesSet方法 ,所以我们来看下:

public void afterPropertiesSet() throws Exception {
...
//上面一大堆代码都是判空去重新赋值的代码,我们不关注他们,最主要是下面这个export方法
if (!supportedApplicationListener) {
export();
}
}
public synchronized void export() {
checkAndUpdateSubConfigs();

if (provider != null) {
if (export == null) {
export = provider.getExport();
}
if (delay == null) {
delay = provider.getDelay();
}
}
if (export != null && !export) {
return;
}

//当设置了延时发布时用定时器延时发布
if (delay != null && delay > 0) {
delayExportExecutor.schedule(this::doExport, delay, TimeUnit.MILLISECONDS);
} else {
//否则的话直接发布
doExport();
}
}

export方法最主要是判断是有配置了延时发布,是的话就用schedule去延时发布,否的话doExport发布,在spring中真正干活的都是do开头的方法,我们再继续查看doExport方法

 protected synchronized void doExport() {
if (unexported) {
throw new IllegalStateException("Already unexported!");
}
if (exported) {
return;
}
exported = true;

if (path == null || path.length() == 0) {
path = interfaceName;
}
//生成唯一serviceName group/interfaceClass 如group/com.xx.xxx
//ref 接口实现bean 服务的真正提供者
//interfaceClass 需要发布的接口服务
ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), ref, interfaceClass);
ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
//发布url
doExportUrls();
}

这里主要查看doExportUrls()方法,上面的是把服务信息存到本地map里:

@SuppressWarnings({"unchecked", "rawtypes"})
private void doExportUrls() {
//这里会获取到注册中心的列表如有配置多个的话,
// 格式:registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=user-service&dubbo=2.0.2&pid=16232&registry=zookeeper&release=2.7.0&timestamp=1553883173387
List<URL> registryURLs = loadRegistries(true);
for (ProtocolConfig protocolConfig : protocols) {
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}

所以这里说明dubbo是支持多协议的多注册中心的,提前预告,下面这个doExportUrlsFor1Protocol方法会很长很复杂:

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
//获取协议的名称如dubbo
String name = protocolConfig.getName();
if (name == null || name.length() == 0) {
//默认是dubbo
name = Constants.DUBBO;
}

Map<String, String> map = new HashMap<String, String>();
//组装map 再填充到url上
map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
appendRuntimeParameters(map);
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, provider, Constants.DEFAULT_KEY);
appendParameters(map, protocolConfig);
appendParameters(map, this);
//这里省略了一堆设置url的代码,主要是把接口的配置方法加到参数列表里如果method 重试次数
....
if (ProtocolUtils.isGeneric(generic)) {
map.put(Constants.GENERIC_KEY, generic);
map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
} else {
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put("revision", revision);
}

String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
logger.warn("NO method found in service interface " + interfaceClass.getName());
map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
} else {
map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
if (!ConfigUtils.isEmpty(token)) {
if (ConfigUtils.isDefault(token)) {
map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());
} else {
map.put(Constants.TOKEN_KEY, token);
}
}
//是否injvm也就是本地发布不上注册中心
if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) {
protocolConfig.setRegister(false);
map.put("notify", "false");
}
// export service
String contextPath = protocolConfig.getContextpath();
if ((contextPath == null || contextPath.length() == 0) && provider != null) {
contextPath = provider.getContextpath();
}

//获取绑定的有效IP地址
String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
//获得一个绑定端口
Integer port = this.findConfigedPorts(protocolConfig, name, map);

//创建一个url
//dubbo://192.168.1.2:20882/com.lin.service.UserService?anyhost=true&application=user-service
// &bean.name=com.lin.service.UserService&bind.ip=192.168.1.2&bind.port=20882&dubbo=2.0.2
// &generic=false&group=userGroup&interface=com.lin.service.UserService&methods=add,findUserByName,findUserById
// &pid=16232&release=2.7.0&side=provider&timestamp=1553884198155
URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);

if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}

String scope = url.getParameter(Constants.SCOPE_KEY);
// don't export when none is configured
//当scope为none的时候不发布
if (!Constants.SCOPE_NONE.equalsIgnoreCase(scope)) {

// export to local if the config is not remote (export to remote only when config is remote)
//当scope不是remote的时候发布本地服务
if (!Constants.SCOPE_REMOTE.equalsIgnoreCase(scope)) {
exportLocal(url);
}
// export to remote if the config is not local (export to local only when config is local)
//当scope不是local的时候发布远程服务
if (!Constants.SCOPE_LOCAL.equalsIgnoreCase(scope)) {
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
if (registryURLs != null && !registryURLs.isEmpty()) {
//当注册中心有多个的时候会发布到多个注册中心
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
//监控url
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}

// For providers, this is used to enable custom proxy to generate invoker
String proxy = url.getParameter(Constants.PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
}

//proxyFactory是一个自适应的扩展点,所以是一个proxyFactory$Adaptive
//默认会有这几个
//stub=org.apache.dubbo.rpc.proxy.wrapper.StubProxyFactoryWrapper
//jdk=org.apache.dubbo.rpc.proxy.jdk.JdkProxyFactory
//javassist=org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory
//当url里有proxy=xxx的时候就取xxxProxyFactory,如果没有的话默认就是JavassistProxyFactory
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
//创建一个Invoker的包装类DelegateProviderMetaDataInvoker
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
//protocol是一个自动扩展点,所以会返回一个ivoker.getURL().protocol/Protocol的一个对象 如RegistryProtocol、DubboProtocol
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
/**
* @since 2.7.0
* ServiceData Store
*/
MetadataReportService metadataReportService = null;
if ((metadataReportService = getMetadataReportService()) != null) {
//上报元数据中心
metadataReportService.publishProvider(url);
}
}
}
this.urls.add(url);
}

这里涉及到一个自适应扩展点的概念,具体什么是自适应扩展点可以到dubbo官网上看,那里介绍的很详细

因为这里要注册的URL是registry://192.168.xxxx这样格式的,又因为protocol又是一个Protocol自适应扩展点

  private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();


Exporter<?> exporter = protocol.export(wrapperInvoker);

所以我们能得到protocol.export这里里面是调用的RegistryProtocol里面的export方法,所以我们再来看下这个方法:

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//获得注册地址
//zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=user-service...
URL registryUrl = getRegistryUrl(originInvoker);
// url to export locally
//获取提供者发布url
//dubbo://192.168.1.2:20882/com.lin.service.UserService?anyhost=true&application=user-service&bean.name=...
URL providerUrl = getProviderUrl(originInvoker);

// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
// the same service. Because the subscribed is cached key with the name of the service, it causes the
// subscription information to cover.

final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
//export invoker
//本地发布服务 也就是服务发布到netty容器里
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

// url to registry
//获得注册中心地址
final Registry registry = getRegistry(originInvoker);
//获得要注册的提供者URL
final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
registryUrl, registeredProviderUrl);
//to judge if we need to delay publish
boolean register = registeredProviderUrl.getParameter("register", true);
if (register) {
//注册到注册中心
register(registryUrl, registeredProviderUrl);
providerInvokerWrapper.setReg(true);
}

// Deprecated! Subscribe to override rules in 2.6.x or before.
//订阅url
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<>(exporter);
}

我们一步一步来分析上面的代码,首先我这边的注册中心用的是zookeeper所以拿到的注册地址是zookeeper协议的,服务提供者也是用的默认的dubbo协议,那我们下一步来看下服务真正发布的方法doLocalExport方法:

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {

final Invoker<?> invokerDelegete = new InvokerDelegate<T>(originInvoker, providerUrl);
//这里protocol又是一个自适应扩展点,所以里面会调用invoker.getUrl.getProtocol+"Protocol"的export()方法
//如 DubboProtocol.export();
exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
bounds.put(key, exporter);
}
}
}
return exporter;
}

一开始就是一个双重检查锁,我们不管他,直接关注我们的protocol.export方法,跟上面的一样,protocol也是一个自适应扩展点,所以里面实际用的是我们invoker.getUrl.getProtocol+"Protocol"的export()方法,我们invoker的url呢又是我们上面包装进去的providerUrl也就是dubbo://xxxx这个url,所以最终调用的就是DubboProtocol的export方法:

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();

// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);

//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}

openServer(url);
optimizeSerialization(url);
return exporter;
}

上面那些代码我们不关心先不看了,我们直接可以看到有一个openServer(url)的方法,根据名字我们就可以猜到这里就是开启我们服务的地方了,我们来看下:

private void openServer(URL url) {
// find server.
String key = url.getAddress();
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
//创建服务
serverMap.put(key, createServer(url));
}
}
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}
private ExchangeServer createServer(URL url) {

   ...
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
ExchangeServer server;
try {
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
...
return server;
}

我们可以看到这里Exchangers.bind(url,requestHandler),这里呢最终会调到:

public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
//这里getTransporter也是获得一个自适应的扩展点,如果没有配置的话默认就是用的NettyTransporter
return getTransporter().bind(url, handler);
}

再里面就是发布到Netty容器了,有兴趣的可以自己去看下,现在这里我们的服务就已经发布了,下面还有注册到注册中心,我们再看下我们上面的RegistryProtocol的export方法里面的注册服务的代码:

// url to registry
//获得注册中心地址
final Registry registry = getRegistry(originInvoker);
//获得要注册的提供者URL
final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
registryUrl, registeredProviderUrl);
//to judge if we need to delay publish
boolean register = registeredProviderUrl.getParameter("register", true);
if (register) {
//注册到注册中心
register(registryUrl, registeredProviderUrl);
providerInvokerWrapper.setReg(true);
}

主要还是register这个方法:

public void register(URL registryUrl, URL registeredProviderUrl) {
//因为registryFacotry是一个自适应的扩展点,所以会返回一个zookeeperRegistry,如果是redis://的话就返回一个RedisRegistry
Registry registry = registryFactory.getRegistry(registryUrl);
//注册到注册中心
registry.register(registeredProviderUrl);
}

根据环境我们会获得一个ZookeeperRegistry所以我们再看下zookeeperRegistry的register方法:

因为register这个方法zookeeperRegistry并没有去实现它,所以一定是在父类的register我们继续看他父类FailbackRegistry的register方法:

public void register(URL url) {
super.register(url);
removeFailedRegistered(url);
removeFailedUnregistered(url);
try {
// Sending a registration request to the server side
//注册服务 是模版模式,所以会在子类实现
doRegister(url);
} catch (Exception e) {
Throwable t = e;

// If the startup detection is opened, the Exception is thrown directly.
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}

// Record a failed registration request to a failed list, retry regularly
addFailedRegistered(url);
}
}

这方法主要注册的就是doRegister方法了,因为是模版模式,所以这个方法zookeeperRegistry自己实现了这个方法:

@Override
public void doRegister(URL url) {
try {
//创建一个临时节点
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}

所以这里就是最终注册的地方了,这里会根据url去创建一个服务的临时节点,到这服务的发布和注册就已经完成了,其他地方有兴趣的可以自己去看下源码,dubbo里很多地方都用到了自适应扩展点这个概念,所以如果要看源码就要先去理解什么是自适应扩展点。

上一篇:sql 时间转换问题 from_unixtime() UNIX_TIMESTAMP()


下一篇:封装对Cookie和Session设置或取值的类