我们知道,dubbo的生产者和消费者的关系维护在注册中心,所以,消费者关联生产者肯定是需要订阅注册中心的相关生产者信息才能完成,在Dubbo源码解析之registry注册中心这篇文章中我们分析了dubbo有关注册中心的一些操作如注册、订阅等,在文章的最后,我们分析了消费者订阅注册中心的configuration、routers、providers等信息的流程,在处理provider信息的时候,会创建invoker,invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);我们就接着invoker的创建继续分析,来看consumer如何关联provider。
DubboProtocol:
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
/* 获取client,构建rpc invoker */
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
DubboProtocol:
private ExchangeClient[] getClients(URL url) {
// 是否共享连接
boolean service_share_connect = false;
int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
if (connections == 0) {
service_share_connect = true;
connections = 1;
}
// 如果未配置,则共享连接,否则一个服务一个连接
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (service_share_connect) {
/* 获取共享的client */
clients[i] = getSharedClient(url);
} else {
/* 初始化client */
clients[i] = initClient(url);
}
}
return clients;
}
DubboProtocol:
private ExchangeClient getSharedClient(URL url) {
String key = url.getAddress();
ReferenceCountExchangeClient client = referenceClientMap.get(key);
if (client != null) {
if (!client.isClosed()) {
// 缓存不为空并且client未关闭,增加引用计数
client.incrementAndGetCount();
return client;
} else {
// 关闭的client从缓存中移除
referenceClientMap.remove(key);
}
}
synchronized (key.intern()) {
/* 初始化client */
ExchangeClient exchangeClient = initClient(url);
// 包装client
client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
referenceClientMap.put(key, client);
ghostClientMap.remove(key);
return client;
}
}
DubboProtocol:
private ExchangeClient initClient(URL url) {
// client 类型,默认netty
String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
String version = url.getParameter(Constants.DUBBO_VERSION_KEY);
boolean compatible = (version != null && version.startsWith("1.0."));
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
// 默认启动心跳检测
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
// BIO是不允许的,因为它有严重的性能问题
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported client type: " + str + "," +
" supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
}
ExchangeClient client;
try {
if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
/* 建立连接 */
client = Exchangers.connect(url, requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
}
return client;
}
Exchangers:
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
/* 建立连接 */
return getExchanger(url).connect(url, handler);
}
Transporters:
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
ChannelHandler handler;
if (handlers == null || handlers.length == 0) {
handler = new ChannelHandlerAdapter();
} else if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
/* 建立连接 */
return getTransporter().connect(url, handler);
}
NettyTransporter:
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
/* 创建client */
return new NettyClient(url, listener);
}
NettyClient:
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
/* handler包装过程我们在provider暴露过程中分析过 */
super(url, wrapChannelHandler(url, handler));
}
AbstractClient:
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler); // 父类初始化
send_reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
shutdown_timeout = url.getParameter(Constants.SHUTDOWN_TIMEOUT_KEY, Constants.DEFAULT_SHUTDOWN_TIMEOUT);
reconnect_warning_period = url.getParameter("reconnect.waring.period", 1800);
try {
doOpen(); /* 开启服务 */
} catch (Throwable t) {
close();
throw new RemotingException(url.toInetSocketAddress(), null,
"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
}
try {
connect(); /* 建立连接 */
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());
}
} catch (RemotingException t) {
if (url.getParameter(Constants.CHECK_KEY, true)) {
close(); // 异常关闭操作,如关闭channel等
throw t;
} else {
logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);
}
} catch (Throwable t) {
close(); // 异常关闭操作,如关闭channel等
throw new RemotingException(url.toInetSocketAddress(), null,
"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
}
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class)
.getDefaultExtension().get(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
ExtensionLoader.getExtensionLoader(DataStore.class)
.getDefaultExtension().remove(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
}
NettyClient:
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
bootstrap = new ClientBootstrap(channelFactory);
bootstrap.setOption("keepAlive", true);
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("connectTimeoutMillis", getTimeout());
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
}
开启服务就是调用netty3的API来构建ClientBootstrap。
AbstractClient:
protected void connect() throws RemotingException {
connectLock.lock();
try {
if (isConnected()) {
return;
}
/* 初始化重连线程 */
initConnectStatusCheckCommand();
doConnect(); /* 连接操作 */
if (!isConnected()) {
throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ ", cause: Connect wait timeout: " + getTimeout() + "ms.");
} else {
if (logger.isInfoEnabled()) {
logger.info("Successed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ ", channel is " + this.getChannel());
}
}
reconnect_count.set(0);
reconnect_error_log_flag.set(false);
} catch (RemotingException e) {
throw e;
} catch (Throwable e) {
throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ ", cause: " + e.getMessage(), e);
} finally {
connectLock.unlock();
}
}
AbstractClient:
private synchronized void initConnectStatusCheckCommand() {
int reconnect = getReconnectParam(getUrl());
if (reconnect > 0 && (reconnectExecutorFuture == null || reconnectExecutorFuture.isCancelled())) {
// 重连任务
Runnable connectStatusCheckCommand = new Runnable() {
public void run() {
try {
if (!isConnected()) {
connect(); // 建立连接
} else {
lastConnectedTime = System.currentTimeMillis();
}
} catch (Throwable t) {
String errorMsg = "client reconnect to " + getUrl().getAddress() + " find error . url: " + getUrl();
// 等待注册中心同步provider列表
if (System.currentTimeMillis() - lastConnectedTime > shutdown_timeout) {
if (!reconnect_error_log_flag.get()) {
reconnect_error_log_flag.set(true);
logger.error(errorMsg, t);
return;
}
}
if (reconnect_count.getAndIncrement() % reconnect_warning_period == 0) {
logger.warn(errorMsg, t);
}
}
}
};
// 固定间隔执行重连任务
reconnectExecutorFuture = reconnectExecutorService.scheduleWithFixedDelay(connectStatusCheckCommand, reconnect, reconnect, TimeUnit.MILLISECONDS);
}
}
NettyClient:
protected void doConnect() throws Throwable {
long start = System.currentTimeMillis();
// 获取连接地址建立连接
ChannelFuture future = bootstrap.connect(getConnectAddress());
try {
boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);
if (ret && future.isSuccess()) {
Channel newChannel = future.getChannel();
newChannel.setInterestOps(Channel.OP_READ_WRITE);
try {
// 关闭旧的channel
Channel oldChannel = NettyClient.this.channel;
if (oldChannel != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
}
oldChannel.close();
} finally {
NettyChannel.removeChannelIfDisconnected(oldChannel);
}
}
} finally {
// 如果client关闭了,则新的channel也要关闭
if (NettyClient.this.isClosed()) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close new netty channel " + newChannel + ", because the client closed.");
}
newChannel.close();
} finally {
NettyClient.this.channel = null;
NettyChannel.removeChannelIfDisconnected(newChannel);
}
} else {
NettyClient.this.channel = newChannel;
}
}
} else if (future.getCause() != null) {
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
+ getRemoteAddress() + ", error message is:" + future.getCause().getMessage(), future.getCause());
} else {
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
+ getRemoteAddress() + " client-side timeout "
+ getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
}
} finally {
if (!isConnected()) {
future.cancel();
}
}
}
整个建立连接开启client的过程和我们分析provider暴露过程时开启NettyServer的流程非常相似,请求的发送和响应的流程我们也分析过,consumer和provider复用相同的方法,到这里,consumer关联provider的流程就分析完成了。