dubbo之网络通讯

前言

在《dubbo架构篇》中可知通信层主要有2局份组成Transporter和Exchanger。Transporter负责netty的封装,Exchanger负责将异步同步化。

Transporter

结合《Netty概述》中介绍其API。dubbo对其进行了如下封装:

public class TransportTest {

    public static void main(String[] args) throws Exception {

        URL url = URL.valueOf("dubbo://127.0.0.1:28092/");

        RemotingServer server = Transporters.bind(url, new ChannelHandlerAdapter() {
            @Override
            public void received(Channel channel, Object message) throws RemotingException {
                System.out.println("accept msg:" + message);
                channel.send("ok");
            }

        });

        Client client =  Transporters.connect(url, new ChannelHandlerAdapter(){
            @Override
            public void received(Channel channel, Object message) throws RemotingException {
                System.out.println("client accept msg:" + message);
            }
        });

        client.send("yoyoyoy");
    }
}

通过源码了解到期主要类NettyServer

 protected void doOpen() throws Throwable {
        bootstrap = new ServerBootstrap();
        
        // 主线程
        bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");
        // io线程
        workerGroup = NettyEventLoopFactory.eventLoopGroup(
                getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                "NettyServerWorker");
        
        // 代理handler,最终会指向
        // @see ChannelHandlers.wrapInternal 
        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
        channels = nettyServerHandler.getChannels();

        bootstrap.group(bossGroup, workerGroup)
                .channel(NettyEventLoopFactory.serverSocketChannelClass())
                .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                        // 获取根据url参数及spi接口Codec2具体实现
                        // 解码器,编码器的封装类
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        
                        ch.pipeline()
                                 // 解码器
                                .addLast("decoder", adapter.getDecoder())
                                // 编码器 
                                .addLast("encoder", adapter.getEncoder())
                                .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                                // 事件处理handler
                                .addLast("handler", nettyServerHandler);
                    }
                });
        // bind
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();

    }

通过ChannelHandlers.wrapInternal的分析得handler链为

decoder idle NettyServerHandler MultiMessageHandler HeartbeatHandler Dispatcher 业务Handler Encoder

解析说明:

  1. decoder触码
  2. idle空闲监听
  3. NettyServerHandler转发
  4. MultiMessageHandler List处理
  5. HeartbeatHandler 心跳监听
  6. Dispatcher 通过spi进行任务分配
  7. 业务Handler 自己写的业务代码
  8. Encoder 编码发送

最终得Transport模型如下所示:
dubbo之网络通讯

Exchanger

Exchanger在Transporter之上结合Jdk提供的CompletableFuture类,完成了异步转同步的过程。先看例子:

public class ExchangeTest {

    public static void main(String[] args) throws RemotingException, ExecutionException, InterruptedException {

        URL url = URL.valueOf("dubbo://127.0.0.1:28092/?timeout=12000");

        ExchangeServer server =  Exchangers.bind(url, new Replier(){

            @Override
            public Object reply(ExchangeChannel channel, Object request) throws RemotingException {
                System.out.println("accept msg:" + request);
                return "ok";
            }
        });

        ExchangeClient client = Exchangers.connect(url);

        CompletableFuture<Object> future = client.request("yyyyyy", Executors.newSingleThreadExecutor());

        System.out.println(future.get());

    }
}

Exchanger对上述的handler模主要做了2处改动

  1. 利用spi将解析器换成ExchangeCodec
    主要给原本的传输内容附加了header信息,来标识请求一些属性如id,version
  2. 对业务Handler进行扩充成
DecodeHandler HeaderExchangeHandler 业务Handler

解析说明:

  1. DecodeHandler 结合《dubbo之Serialization》加解码body内容
  2. HeaderExchangeHandler 同步异步处理

客户端调用

client.request

 public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
        
        // 创建Request包装类
        // 生成请求id,具体请求数据,请求版本等
        Request req = new Request();
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay(true);
        req.setData(request);
        // CompletableFuture的继承类
        // 提供 DefaultFuture getFuture(id)方法找到并设置结果
        DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
        try {
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }

HeaderExchangeHandler.received

// response对在Codec封装
 public static void received(Channel channel, Response response, boolean timeout) {
        try {
            // 找到请求时的future
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                Timeout t = future.timeoutCheckTask;
                if (!timeout) {
                    // decrease Time
                    t.cancel();
                }
                // 设置结果
                future.doReceived(response);
            } else {
                // error
            }
        } finally {
           
            CHANNELS.remove(response.getId());
        }
    }

主要参考

源码分析Dubbo网络通讯篇概要总结

上一篇:《Java动手撸源码》手写实现Future设计模式


下一篇:Python并发请求之requests_future模块使用