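Preface
As described in "Dubbo Architecture", the communication layer consists of two main parts: Transporter and Exchanger. Transporter wraps Netty, and Exchanger turns asynchronous calls into synchronous ones.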
Transporter
Building on the Netty API introduced in "Netty Overview", Dubbo wraps it as follows:
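import org.apache.dubbo.common.URL;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.Client;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.RemotingServer;
import org.apache.dubbo.remoting.Transporters;
import org.apache.dubbo.remoting.transport.ChannelHandlerAdapter;

public class TransportTest {
    public static void main(String[] args) throws Exception {
        URL url = URL.valueOf("dubbo://127.0.0.1:28092/");
        // Server side: print each incoming message and answer "ok"
        RemotingServer server = Transporters.bind(url, new ChannelHandlerAdapter() {
            @Override
            public void received(Channel channel, Object message) throws RemotingException {
                System.out.println("accept msg:" + message);
                channel.send("ok");
            }
        });
        // Client side: print whatever the server sends back
        Client client = Transporters.connect(url, new ChannelHandlerAdapter() {
            @Override
            public void received(Channel channel, Object message) throws RemotingException {
                System.out.println("client accept msg:" + message);
            }
        });
        client.send("yoyoyoy");
    }
}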
Reading the source shows that its main class is NettyServer:
protected void doOpen() throws Throwable {
    bootstrap = new ServerBootstrap();
    // boss (acceptor) thread
    bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");
    // I/O threads
    workerGroup = NettyEventLoopFactory.eventLoopGroup(
            getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
            "NettyServerWorker");
    // Proxy handler; it ultimately delegates to the chain built in
    // @see ChannelHandlers.wrapInternal
    final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
    channels = nettyServerHandler.getChannels();
    bootstrap.group(bossGroup, workerGroup)
            .channel(NettyEventLoopFactory.serverSocketChannelClass())
            .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
            .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                    // getCodec() resolves the concrete Codec2 implementation from the URL parameters via SPI;
                    // NettyCodecAdapter wraps it as a Netty decoder and encoder
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                    ch.pipeline()
                            // decoder
                            .addLast("decoder", adapter.getDecoder())
                            // encoder
                            .addLast("encoder", adapter.getEncoder())
                            .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                            // event-processing handler
                            .addLast("handler", nettyServerHandler);
                }
            });
    // bind
    ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
    channelFuture.syncUninterruptibly();
    channel = channelFuture.channel();
}
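Analyzing ChannelHandlers.wrapInternal gives the following handler chain.
Explanation:
- decoder: decodes incoming bytes
- idle: idle-connection detection
- NettyServerHandler: forwards events into Dubbo's handler chain
- MultiMessageHandler: handles List (multi-message) payloads
- HeartbeatHandler: heartbeat monitoring
- Dispatcher: assigns tasks to threads, selected via SPI
- Business handler: the business code you write yourself
- Encoder: encodes and sends outgoing messages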
The resulting Transport model is shown below:
Exchanger
Built on top of Transporter, Exchanger uses the JDK's CompletableFuture class to turn asynchronous calls into synchronous ones. Start with an example:
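import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.exchange.ExchangeChannel;
import org.apache.dubbo.remoting.exchange.ExchangeClient;
import org.apache.dubbo.remoting.exchange.ExchangeServer;
import org.apache.dubbo.remoting.exchange.Exchangers;
import org.apache.dubbo.remoting.exchange.support.Replier;

public class ExchangeTest {
    public static void main(String[] args) throws RemotingException, ExecutionException, InterruptedException {
        URL url = URL.valueOf("dubbo://127.0.0.1:28092/?timeout=12000");
        // Server side: the return value of reply() becomes the response
        ExchangeServer server = Exchangers.bind(url, new Replier() {
            @Override
            public Object reply(ExchangeChannel channel, Object request) throws RemotingException {
                System.out.println("accept msg:" + request);
                return "ok";
            }
        });
        ExchangeClient client = Exchangers.connect(url);
        CompletableFuture<Object> future = client.request("yyyyyy", Executors.newSingleThreadExecutor());
        // Blocks until the response arrives
        System.out.println(future.get());
    }
}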
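The core idea behind this example, bridging a callback-style send to a blocking call through CompletableFuture, can be sketched without Dubbo. Everything here is an assumption for illustration: `sendAsync` is a hypothetical transport that delivers its reply on another thread, and `call` is the synchronous facade built on top of it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

public class AsyncToSyncSketch {
    /** Hypothetical asynchronous transport: the reply reaches the callback on an I/O thread. */
    static void sendAsync(ExecutorService io, String msg, Consumer<String> onReply) {
        io.execute(() -> onReply.accept("ok:" + msg));
    }

    /** Synchronous facade: send, then block on the future until the reply or the timeout. */
    public static String call(String msg, long timeoutMs) {
        ExecutorService io = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> future = new CompletableFuture<>();
            // The callback completes the future; the caller thread waits on it
            sendAsync(io, msg, future::complete);
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException("call failed", e);
        } finally {
            io.shutdown();
        }
    }

    public static void main(String[] args) {
        // prints ok:yyyyyy
        System.out.println(call("yyyyyy", 1000));
    }
}
```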
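Exchanger makes two main changes to the handler model above:
- It uses SPI to replace the codec with ExchangeCodec, which mainly attaches header information to the original payload to identify request attributes such as id and version.
- It extends the business handler into: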
Explanation:
- DecodeHandler: encodes and decodes the body content (see "Dubbo Serialization")
- HeaderExchangeHandler: handles the synchronous/asynchronous conversion
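To make the header idea concrete, here is a minimal sketch of framing a payload with a fixed-size header that carries the request id. The layout is an assumption for illustration, loosely modeled on the 16-byte Dubbo protocol header (magic, flag, status, request id, body length). The class name, constants, and methods are hypothetical, and this is not ExchangeCodec itself.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class HeaderSketch {
    static final short MAGIC = (short) 0xdabb;      // marks the start of a frame
    static final byte FLAG_REQUEST = (byte) 0x80;   // set for requests
    static final byte FLAG_TWOWAY = (byte) 0x40;    // set when a response is expected
    static final int HEADER_LENGTH = 16;

    /** Prepends the header to the body: magic(2) flag(1) status(1) id(8) length(4). */
    public static byte[] encode(long requestId, boolean twoWay, byte serializationId, byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LENGTH + body.length);
        buf.putShort(MAGIC);
        byte flag = (byte) (FLAG_REQUEST | serializationId);
        if (twoWay) {
            flag |= FLAG_TWOWAY;
        }
        buf.put(flag);
        buf.put((byte) 0);            // status byte, meaningful only in responses
        buf.putLong(requestId);       // lets the receiver correlate the reply
        buf.putInt(body.length);
        buf.put(body);
        return buf.array();
    }

    /** Reads the request id back from a frame after checking the magic number. */
    public static long decodeId(byte[] frame) {
        ByteBuffer buf = ByteBuffer.wrap(frame);
        if (buf.getShort(0) != MAGIC) {
            throw new IllegalArgumentException("bad magic number");
        }
        return buf.getLong(4);
    }

    /** Reads the body length stored in the last four header bytes. */
    public static int decodeBodyLength(byte[] frame) {
        return ByteBuffer.wrap(frame).getInt(12);
    }

    public static void main(String[] args) {
        byte[] frame = encode(42L, true, (byte) 2, "hi".getBytes(StandardCharsets.UTF_8));
        // prints 18 42 2
        System.out.println(frame.length + " " + decodeId(frame) + " " + decodeBodyLength(frame));
    }
}
```

Because the id travels in the header, the receiver can match a response to its request before the body is deserialized.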
Client call
client.request
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
    // Create the Request wrapper object.
    // It carries the generated request id, the request payload, the protocol version, etc.
    Request req = new Request();
    req.setVersion(Version.getProtocolVersion());
    req.setTwoWay(true);
    req.setData(request);
    // DefaultFuture is a subclass of CompletableFuture.
    // It provides DefaultFuture getFuture(id), used to find the future by id and set its result.
    DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
    try {
        channel.send(req);
    } catch (RemotingException e) {
        // Sending failed, so nobody will ever complete this future
        future.cancel();
        throw e;
    }
    return future;
}
HeaderExchangeHandler.received (for responses it delegates to DefaultFuture.received, shown here)
// The Response object is assembled by the Codec
public static void received(Channel channel, Response response, boolean timeout) {
    try {
        // Find the future registered when the request was sent
        DefaultFuture future = FUTURES.remove(response.getId());
        if (future != null) {
            Timeout t = future.timeoutCheckTask;
            if (!timeout) {
                // decrease Time
                t.cancel();
            }
            // Set the result
            future.doReceived(response);
        } else {
            // error
        }
    } finally {
        CHANNELS.remove(response.getId());
    }
}
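The request and response sides above meet in a map keyed by request id. The following is a minimal sketch of that correlation using only JDK classes. It assumes a static `FUTURES` map like the one in the snippet, but `PendingRequestsSketch`, its `newFuture` and `received` signatures, and the scheduler-based timeout are simplifications of my own, not DefaultFuture's actual implementation.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PendingRequestsSketch {
    /** Requests that were sent and are still waiting for a response, keyed by request id. */
    private static final Map<Long, CompletableFuture<Object>> FUTURES = new ConcurrentHashMap<>();

    /** Daemon timer thread so the JVM can exit while timeout checks are pending. */
    private static final ScheduledExecutorService TIMER =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "timeout-checker");
                t.setDaemon(true);
                return t;
            });

    /** Request side: register a future under the id and arm a timeout check for it. */
    public static CompletableFuture<Object> newFuture(long id, long timeoutMs) {
        CompletableFuture<Object> future = new CompletableFuture<>();
        FUTURES.put(id, future);
        ScheduledFuture<?> timeoutTask = TIMER.schedule(() -> {
            // Only fail the future if no response removed it first
            CompletableFuture<Object> pending = FUTURES.remove(id);
            if (pending != null) {
                pending.completeExceptionally(new TimeoutException("request " + id + " timed out"));
            }
        }, timeoutMs, TimeUnit.MILLISECONDS);
        // Once completed for any reason, the timeout check is no longer needed
        future.whenComplete((result, error) -> timeoutTask.cancel(false));
        return future;
    }

    /** Response side: look the future up by id and set the result; false if nobody is waiting. */
    public static boolean received(long id, Object result) {
        CompletableFuture<Object> future = FUTURES.remove(id);
        if (future == null) {
            return false; // late or unknown response
        }
        future.complete(result);
        return true;
    }

    public static void main(String[] args) {
        CompletableFuture<Object> future = newFuture(1L, 5000);
        received(1L, "ok");
        // prints ok
        System.out.println(future.join());
    }
}
```

Removing the entry from the map before completing the future is what makes a late response and a timeout mutually exclusive: whichever side removes the entry first decides the outcome.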