服务暴露
远程服务暴露
源码讲解:
直接从ServiceConfig的exportUrl开始讲解
//org.apache.dubbo.config.ServiceConfig#exportUrl
private void exportUrl(URL url, List<URL> registryURLs) {
String scope = url.getParameter(SCOPE_KEY);
// don't export when none is configured
if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
// export to local if the config is not remote (export to remote only when config is remote)
if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
//本地暴露
exportLocal(url);
}
// export to remote if the config is not local (export to local only when config is local)
if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
//远程暴露,执行到这里
url = exportRemote(url, registryURLs);
MetadataUtils.publishServiceDefinition(url);
}
}
this.urls.add(url);
}
private URL exportRemote(URL url, List<URL> registryURLs) {
if (CollectionUtils.isNotEmpty(registryURLs)) {
for (URL registryURL : registryURLs) {
if (SERVICE_REGISTRY_PROTOCOL.equals(registryURL.getProtocol())) {
url = url.addParameterIfAbsent(SERVICE_NAME_MAPPING_KEY, "true");
}
//if protocol is only injvm ,not register
if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
continue;
}
url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);
if (monitorUrl != null) {
url = url.putAttribute(MONITOR_KEY, monitorUrl);
}
// For providers, this is used to enable custom proxy to generate invoker
String proxy = url.getParameter(PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(PROXY_KEY, proxy);
}
if (logger.isInfoEnabled()) {
if (url.getParameter(REGISTER_KEY, true)) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url.getServiceKey() + " to registry " + registryURL.getAddress());
} else {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url.getServiceKey());
}
}
doExportUrl(registryURL.putAttribute(EXPORT_KEY, url), true);
}
} else {
if (MetadataService.class.getName().equals(url.getServiceInterface())) {
localMetadataService.setMetadataServiceURL(url);
}
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
//执行到这里
doExportUrl(url, true);
}
return url;
}
private void doExportUrl(URL url, boolean withMetaData) {
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
if (withMetaData) {
invoker = new DelegateProviderMetaDataInvoker(invoker, this);
}
//执行到这里,调用RegistryProtocol的export
Exporter<?> exporter = protocolSPI.export(invoker);
exporters.add(exporter);
}
//org.apache.dubbo.registry.integration.RegistryProtocol#export
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
URL registryUrl = getRegistryUrl(originInvoker);
// url to export locally
URL providerUrl = getProviderUrl(originInvoker);
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
// the same service. Because the subscribed is cached key with the name of the service, it causes the
// subscription information to cover.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
Map<URL, NotifyListener> overrideListeners = getProviderConfigurationListener(providerUrl).getOverrideListeners();
overrideListeners.put(registryUrl, overrideSubscribeListener);
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
//我们主要看这里,这里是 服务端 进行服务暴露的入口
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
// url to registry
final Registry registry = getRegistry(registryUrl);
final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
// decide if we need to delay publish
boolean register = providerUrl.getParameter(REGISTER_KEY, true);
if (register) {
register(registry, registeredProviderUrl);
}
// register stated url on provider model
registerStatedUrl(registryUrl, registeredProviderUrl, register);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
// Deprecated! Subscribe to override rules in 2.6.x or before.
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
notifyExport(exporter);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<>(exporter);
}
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
//判断是否已经暴露过, key的内容为:com.jiangzheng.course.dubbo.api.service.ServiceDemo:20880
String key = getCacheKey(originInvoker);
return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
//执行到此处的protocol.export方法,最终调用DubboProtocol类
return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
});
}
//org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#export
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
checkDestroyed();
URL url = invoker.getUrl();
// 将业务调用的实现类或代理类相关 放入到缓存里
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
}
}
//指定到这里,开启服务
openServer(url);
optimizeSerialization(url);
return exporter;
}
private void openServer(URL url) {
checkDestroyed();
// 获取本地地址或域名
String key = url.getAddress();
//客户端可以导出仅由服务器调用的服务
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
if (isServer) {
//判断是否已经暴露过了
ProtocolServer server = serverMap.get(key);
if (server == null) {//没有的话向下执行
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
//执行到这里,此处调用createServer方法进行创建服务
serverMap.put(key, createServer(url));
}else {
server.reset(url);
}
}
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}
private ProtocolServer createServer(URL url) {
url = URLBuilder.from(url)
// send readonly event when server closes, it's enabled by default
.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
// enable heartbeat by default
.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
.addParameter(CODEC_KEY, DubboCodec.NAME)
.build();
String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0 && !url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
}
//上面进行url的创建
ExchangeServer server;
try {
//执行到此处,调用Exchangers.bind方法
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
DubboProtocolServer protocolServer = new DubboProtocolServer(server);
loadServerProperties(protocolServer);
return protocolServer;
}
//org.apache.dubbo.remoting.exchange.Exchangers#bind(org.apache.dubbo.common.URL, org.apache.dubbo.remoting.exchange.ExchangeHandler)
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
//执行到此处,调用bind方法
return getExchanger(url).bind(url, handler);
}
//org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger#bind
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
//执行到此处,调用Transporters.bind方法
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
//org.apache.dubbo.remoting.Transporters#bind(org.apache.dubbo.common.URL, org.apache.dubbo.remoting.ChannelHandler...)
public static RemotingServer bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
//通过getTransporter获取到Transporter后调用 bind
return getTransporter(url).bind(url, handler);
}
//此处通过SPI获取到了 org.apache.dubbo.remoting.transport.netty4.NettyTransporter
public static Transporter getTransporter(URL url) {
return url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).getAdaptiveExtension();
}
//org.apache.dubbo.remoting.transport.netty4.NettyTransporter#bind
@Override
public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {
//创建netty服务
return new NettyServer(url, handler);
}
//org.apache.dubbo.remoting.transport.netty4.NettyServer#NettyServer
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
//调用supper
super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url));
// read config before destroy
serverShutdownTimeoutMills = ConfigurationUtils.getServerShutdownTimeout(getUrl().getOrDefaultModuleModel());
}
//org.apache.dubbo.remoting.transport.AbstractServer#AbstractServer
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
executorRepository = url.getOrDefaultApplicationModel().getExtensionLoader(ExecutorRepository.class).getDefaultExtension();
//获取本机地址
localAddress = getUrl().toInetSocketAddress();
//服务绑定的IP的信息
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
//服务绑定的端口号的信息
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
bindIp = ANYHOST_VALUE;
}
bindAddress = new InetSocketAddress(bindIp, bindPort);
this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
try {
//执行到这里,此处是现在NettyServer中
doOpen();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
}
} catch (Throwable t) {
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
}
executor = executorRepository.createExecutorIfAbsent(url);
}
//org.apache.dubbo.remoting.transport.netty4.NettyServer#doOpen
@Override
protected void doOpen() throws Throwable {
//netty固定api写法
bootstrap = new ServerBootstrap();
//线程数为1,主要用于netty的端口监听
bossGroup = NettyEventLoopFactory.eventLoopGroup(1, EVENT_LOOP_BOSS_POOL_NAME);
//int DEFAULT_IO_THREADS = Math.min(Runtime.getRuntime().availableProcessors() + 1, 32);
//线程数设置与CPU数量是相关的,主要用于真实处理调用请求的
workerGroup = NettyEventLoopFactory.eventLoopGroup(
getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
EVENT_LOOP_WORKER_POOL_NAME);
//此处与业务相关,放于之后进行讲解,此处先忽略
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
boolean keepalive = getUrl().getParameter(KEEP_ALIVE_KEY, Boolean.FALSE);
//bootstrap相关设置
bootstrap.group(bossGroup, workerGroup)
//NettyEventLoopFactory.serverSocketChannelClass()此方法用于返回使用epoll还是NIO模式
.channel(NettyEventLoopFactory.serverSocketChannelClass())
.option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_KEEPALIVE, keepalive)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
//当有channel过来的时候进行处理
protected void initChannel(SocketChannel ch) throws Exception {
// FIXME: should we use getTimeout()?
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
//此处用于配置decoder和encoder
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {//是否需要开启SSL
ch.pipeline().addLast("negotiation", new SslServerTlsHandler(getUrl()));
}
ch.pipeline()
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
.addLast("handler", nettyServerHandler);
}
});
// bind
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
netty启动之后,还需要维护它,如果服务挂掉 需要有一定的重启机制,所以我们看下 之前调用的地方org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger#bind
//org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger#bind
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
//上面我们说的是 Transporters.bind 的逻辑,我们看下 new HeaderExchangeServer的逻辑
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
//org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeServer#HeaderExchangeServer
public HeaderExchangeServer(RemotingServer server) {
Assert.notNull(server, "server == null");
this.server = server;
//执行到此处
startIdleCheckTask(getUrl());
}
private void startIdleCheckTask(URL url) {
if (!server.canHandleIdle()) {
AbstractTimerTask.ChannelProvider cp = () -> unmodifiableCollection(HeaderExchangeServer.this.getChannels());
//获取一个超时时间,默认为3分钟
int idleTimeout = getIdleTimeout(url);
//因为重试3次,所以此处获取获取心跳时间间隔,所以默认为1分钟
long idleTimeoutTick = calculateLeastDuration(idleTimeout);
CloseTimerTask closeTimerTask = new CloseTimerTask(cp, idleTimeoutTick, idleTimeout);
//创建一个定时任务,每分钟获取下心跳,如果超过3分钟没有心跳 则服务重启
this.closeTimer = IDLE_CHECK_TIMER.get().newTimeout(closeTimerTask, idleTimeoutTick, TimeUnit.MILLISECONDS);
}
}