Dubbo源码解析-远程调用

1.主要调用流程解析

        一个简单的RPC框架调用,涉及代理与网络通信,协议设计等技术的应用,RPC调用需要将调用信息从客户端传递到服务端,其中信息包括了调用接口、调用方法名、方法参数类型和参数值等,在传递方法参数值时需要先序列化对象转化为二进制流并经过网络传输到服务端,在服务端需要按照客户端的序列方法反序列化二进制字节流。Dubbo的一次完整的RPC调用流程如下所示。

Dubbo源码解析-远程调用

        客户端每次启动都会从注册中心拉取和订阅对应的服务列表,Cluster会把拉取的服务列表聚合成一个Invoker,每次RPC调用前都会通过Directory#list获取服务提供者的元数据,获取的这些服务列表给后续的Router和LoadBalance使用。 在发起服务调用后,将路由结果得到的服务列表作为负载均衡参数,经过负载均衡后选出一台机器进行RPC调用。客户端经过路由和负载均衡后,会将请求交给底层I/O线程池(如Netty)处理,I/O线程池主要处理读写、序列化和反序列化等逻辑,这里线程池分为一种由Netty管理的I/O线程池,另一种是Dubbo业务线程池用来实现业务方法调用。

2.网络通信模型解析

        Dubbo框架的RPC调用的网络通信是通过Exchange/Transport/Serialize层实现的,主要实现代码在Remoting包中,其中主要一些定义接口如下所示。可以抽象为Endpoint,Server,Client,Channel等概念,Client与Server经过信道Channel进行通信。 Exchanger数据交互依赖底层的Transport层进行实际的数据传输,同时包括了与Transport层相关的Server和Client。

Dubbo源码解析-远程调用

本质上,RPC通信是一个典型的C/S架构的通信,Dubbo的网络通信流程如下所示。通过客户端首先要连接到服务器端,其中通道以URL作为标识存储,客户端发送通信内容,经过序列化和协议编码后形成完整通信报文发送到服务器端,服务器端会将报文内容进行解码和反序列化,并经过众多的通道处理节点,将报文内容进处理和分发,并提交到线程池中处理。 Dubbo框架提供了大量的扩展点,信息交互的扩展点Exchanger接口,默认为HeaderExchanger实现,信息通信链路层的扩展点Transporter接口,默认是NettyTransporter实现类,还有信道处理扩展点ChannelHandle接口,实现类有很多,Codec2编解码接口,默认是DubboCodec实现,序列化扩展点接口Serialization,默认实现为Hessian2Serializaion实现类。

Dubbo源码解析-远程调用

3.Exchange层实现解析

        由RPC协议实现模块分析可知,远程传输模块主要为RPC协议中的Dubbo协议提供,Exchange层的主要职责就是提供器生产者和消费者之间的数据传输功能。服务器和客户端初始化的时候就会调用DubboProtocol#protocolBindingRefer和 DubboProtocol#export方法进行服务消费端与生产端的初始化,其中又会调用DubboProtocol#getClients和DubboProtocol#createServer进行服务器与客户端的初始化。

private ProtocolServer createServer(URL url) {
        ExchangeServer server;
        try {
            //绑定监听端口与处理handler
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            ...
        }
        str = url.getParameter(CLIENT_KEY);
        if (str != null && str.length() > 0) {
            //支持多种协议的通信服务器
            Set<String> supportedTypes = 
                ExtensionLoader.getExtensionLoader(Transporter.class)
                .getSupportedExtensions();
        }
        return new DubboProtocolServer(server);
    }
}

private ExchangeClient[] getClients(URL url) {
        //有多种模式,可以多个远程调用公用一个客户端实例,也可以不同服务调用对应不同的客户端实例
        ExchangeClient[] clients = new ExchangeClient[connections];
        for (int i = 0; i < clients.length; i++) {
            if (useShareConnect) {
                clients[i] = shareClients.get(i);
            } else {
                clients[i] = initClient(url);
            }
        }
        return clients;
}

private ExchangeClient initClient(URL url) {
        ExchangeClient client;
        try {
            //判断是否需要延迟类型的客户端
            if (url.getParameter(LAZY_CONNECT_KEY, false)) {
                client = new LazyConnectExchangeClient(url, requestHandler);`
            } else {
                //连接服务器并指定请求处理器
                client = Exchangers.connect(url, requestHandler);
            }
            ...
        } catch (RemotingException e) {
            ...
        }
        return client;
}
复制代码

       Exchaner层实现的主要类结构如下所示,即每个客户端Client通过Channel与服务器Server通信,并通过ChannelHandler处理Channel的相关事件,服务器Server端维护与客户端连接Channel集合。

Dubbo源码解析-远程调用

       Exchanger是一个扩展点,里面包括了两个方法,入参都是URL和ExchangerHandler,返回的是Server和Client, 这两个方法分别用在服务消费者和服务器提供者上。

  • 服务消费者: 包含了一个ExchangeClient, 通过connect方法连接服务提供者,并指定对应的ExchangeHandler用于远程调用的处理,将请求的方法和参数通过URL的方式发送给提供者;

  • 服务提供者: 包含了一个ExchangeServer,通过bind方法类绑定到指定URL和端口,用于监听消费者的连接和远程调用请求,并指定ExchangeHandler来处理请求,根据请求方法和参数调用本地方法,获取结果返回给服务消费者

    @SPI(HeaderExchanger.NAME) public interface Exchanger {

    @Adaptive({Constants.EXCHANGER_KEY})
    ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException;
    
    @Adaptive({Constants.EXCHANGER_KEY})
    ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException;
    复制代码

    }

这里可以看到Server/Client都是通过Exchanges类生成的,在bind/connect调用的时候会去加载Exchanger的扩展类,这里HeaderExchanger作为默认扩展实现。Client和Server的底层通信是通过Transporter实现的,Transporter完成bind绑定创建并返回Server对象,完成connect连接服务端创建Client对象。为了支持不同协议/不同的通信实现方式,框架将Transporter设计为SP扩展点,默认实现为NettyTransporter类,具体bind和connect时会调用Transporters去加载并生成不同的实现的客户端和服务端。

主要调用链路如下所示:

Exchangers#bind/#connect -> Transporters#bind/#connect -> Server/Client

其中除Transporter实现可根据URL选择对应的SPI实现外,其他基本为固定模式;Server和Client根据Transporter的不同实现,而使用不同的Server,Client和Channel,具体在传输模块分析。虽然Exchange为SPI,支持Adaptive,Exchange的实现只用HeaderExchange。

3.1 ExchangeChannel实现解析

ExchangeClient和ExchangeServer是通过ExchangeChannel进行通信的,在ExchangeChannel接口类中,主要定义了请求和获取信道处理类对象的方法,其中都是请求后返回异步任务,这里默认是现实HeaderExchangeChannel。

public interface ExchangeChannel extends Channel {

    CompletableFuture<Object> request(Object request, ExecutorService executor);

    CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor);    
    
    ExchangeHandler getExchangeHandler();
}
复制代码

在ExchangeChannel通信中, 所有的RPC的请求-回复信息会封装在Requst和Response两个类中。在Request中,每次发送RPC请求都会带上唯一的id标识,这个id是通过自增生成的,当达到最大值后又会从最小值开始生成,还记录了当前的框架版本和请求数据等信息。在Response中,同样也有请求id,框架版本、错误信息、结果等信息。

public class Request {
    //请求id生成器
    private static final AtomicLong INVOKE_ID = new AtomicLong(0);
    //请求id,唯一标识
    private final long mId;
    //当前版本
    private String mVersion;
    //是否双向
    private boolean mTwoWay = true;
    //是否为事件
    private boolean mEvent = false;
    //请求数据
    private Object mData;
    //新建对象时,先获取到新的请求id
    public Request() {
        mId = newId();
    }
    //通过线程安全的自增long类型
    private static long newId() {
        //达到最大值后又重新回到最小值
        return INVOKE_ID.getAndIncrement();
    }
}

public class Response {
     ...各种类型的回复状态码定义
    //请求id,唯一标识
    private long mId = 0;    //当前框架版本
    private String mVersion;
    //状态码
    private byte mStatus = OK;
    //是否为事件
    private boolean mEvent = false;
    //错误信息
    private String mErrorMsg;
    //结果对象
    private Object mResult;
}
复制代码

      在HeaderExchangeChannel实现,调用request方法时都会去调用ExchangeHandler#send方法,并把最后的结果作为异步结果返回。

public final class HeaderExchangeChannel implements ExchangeChannel {

    ....
    @Override
    public void sent(Channel channel, Object message) throws RemotingException {
        try {
            ExchangeChannel exchangeChannel = HeaderExchangeChannel
                    .getOrAddChannel(channel);
            //信道处理类中发送
            handler.sent(exchangeChannel, message);
        } catch (Throwable t) {
            exception = t;
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
        if (message instanceof Request) {
            Request request = (Request) message;
            DefaultFuture.sent(channel, request);
        }
        ...
    }

    ...
    @Override
    public CompletableFuture<Object> request(Object request, 
            int timeout, ExecutorService executor) {
        //构建RPC请求
        Request req = new Request();
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay(true);
        req.setData(request);
        //构造异步任务,并指定线程执行
        DefaultFuture future = 
            DefaultFuture.newFuture(channel, req, timeout, executor);
        try {
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }
    ...
}
复制代码

3.2 ExchangHandler实现解析

       ExchangeHandler作为Exchange的信道处理器接口,继承了ChannelHannler的定义方法。·主要定义信道连接/信道断开连接/发送信息/接收信息/信道异常下的处理方法。

@SPI
public interface ChannelHandler {
    
    void connected(Channel channel) throws RemotingException;
    
    void disconnected(Channel channel) throws RemotingException;
  
    void sent(Channel channel, Object message) throws RemotingException;
   
    void received(Channel channel, Object message) throws RemotingException;

    void caught(Channel channel, Throwable exception) throws RemotingException;
}

public interface ExchangeHandler extends ChannelHandler, TelnetHandler {
    
    CompletableFuture<Object> reply(ExchangeChannel channel, Object request) throws RemotingException;

}
复制代码

        ExchangeHandlerDispatcher继承了ExchangeHandler,持有了处理分发器ChannelHandlerDispatcher的对象,当调用send方法时会去调用分发器的send方法,触发ChannelHandler对象列表的sent方法,同理比如连接/断开都会逐一调用。 处理器集合会在调用HeaderExchanger的connect/bind d的时候进行初始化。

public class ExchangeHandlerDispatcher implements ExchangeHandler {
    private final ChannelHandlerDispatcher handlerDispatcher;
    @Override
    public void sent(Channel channel, Object message) {
        handlerDispatcher.sent(channel, message);
    }
    ...
}

public class ChannelHandlerDispatcher implements ChannelHandler {    //信道处理类集合
    private final Collection<ChannelHandler> channelHandlers =
             new CopyOnWriteArraySet<ChannelHandler>();
    ...
    @Override
    public void sent(Channel channel, Object message) {
        for (ChannelHandler listener : channelHandlers) {
            try {
                listener.sent(channel, message);
            } catch (Throwable t) {
                logger.error(t.getMessage(), t);
            }
        }
    }
    ...
}

public class HeaderExchanger implements Exchanger {
    @Override
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeClient(Transporters.connect(url, 
            new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

    @Override
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, 
            new DecodeHandler(new HeaderExchangeHandler(handler))));
    }
}
复制代码

4. 请求-响应模型解析

       在实际使用场景中,客户端会使用多线程并发调用服务,Dubbo框架利用通信内容中的全局请求id标识进行正确响应调用。

Dubbo源码解析-远程调用

     实现这个请求-响应模型主要依靠DefaultFuture这个类,这个类主要存储了请求id与通信信道的对应关系和请求id与任务关系,任务执行线程池、定时调度器,还有发送/接收几个方法,通过DefaultFuture任务、线程池、定时调度器等工具类将异步的请求转化为同步。

public class DefaultFuture extends CompletableFuture<Object> {
    //记录请求id与通信信道的对应关系
    private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<>();
    //记录请求id与对应的Future任务的对应关系
    private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();
    //通信信道
    private final Channel channel;
    //请求
    private final Request request;
    //执行任务的线程池
    private ExecutorService executor;
    //定时任务调度器
    public static final Timer TIME_OUT_TIMER = new HashedWheelTimer(...);

    //新建任务
    public static DefaultFuture newFuture(Channel channel, Request request, int timeout, ExecutorService executor) {
        final DefaultFuture future = new DefaultFuture(channel, request, timeout);
        future.setExecutor(executor);
        if (executor instanceof ThreadlessExecutor) {
            //放入线程池中执行,并等待任务
            ((ThreadlessExecutor) executor).setWaitingFuture(future);
        }
        //
        timeoutCheck(future);
        return future;
    }

    //超时检查方法
    private static void timeoutCheck(DefaultFuture future) {
        //以请求id作为唯一标识新建检查超时任务,并放入调度器
        TimeoutCheckTask task = new TimeoutCheckTask(future.getId());
        future.timeoutCheckTask = TIME_OUT_TIMER.newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS);
    }
    
    //发送
    public static void sent(Channel channel, Request request) {
        DefaultFuture future = FUTURES.get(request.getId());
        if (future != null) {
            //这里只是简单
            future.doSent();
        }
    }

    //接收
    public static void received(Channel channel, Response response, boolean timeout) {
        try {
            //根据请求id从对应关系中取出现有的任务
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                //如果有则去除超时检查
                Timeout t = future.timeoutCheckTask;
                if (!timeout) {
                    t.cancel();
                }
                future.doReceived(response);
            } else {
        } finally {
            //最后删除掉请求id与信道的对应关系
            CHANNELS.remove(response.getId());
        }
    }
 
    private void doReceived(Response res) {
        ...
        if (res.getStatus() == Response.OK) {
            //正常回复处理,将回复结果塞入任务结果中,结束任务
            this.complete(res.getResult());
        } else if (res.getStatus() == Response.CLIENT_TIMEOUT
                 || res.getStatus() == Response.SERVER_TIMEOUT) {
            //超时异常情况处理,将异常信息放入任务结果中,结束任务
            this.completeExceptionally(new TimeoutException(res.getStatus()
                 == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
        } else {
            this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
        }
        ...
    }

    //调度任务
    private static class TimeoutCheckTask implements TimerTask {

        @Override
        public void run(Timeout timeout) {
            //根据请求id获取对应任务,如果任务为空或任务已完成就返回
            DefaultFuture future = DefaultFuture.getFuture(requestID);
            if (future == null || future.isDone()) {
                return;
            }
            //任务未完成则的使用
            if (future.getExecutor() != null) {
                future.getExecutor().execute(() -> notifyTimeout(future));
            } else {
                notifyTimeout(future);
            }
        }

        //进行超时通知
        private void notifyTimeout(DefaultFuture future) {
            //构建超时回复
            Response timeoutResponse = new Response(future.getId());
            timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
            timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
            DefaultFuture.received(future.getChannel(), timeoutResponse, true);
        }
    }
}
复制代码

5. 总结

       主要介绍了RPC调用的主要流程、Dubbo框架远程通信的网络模型,讲解了Exchange层实现的主要架构,以及Exchanger/ExchangeChannel/ExchangeHandler的相关实现。

 

文章持续更新,微信搜索『撸java源码』,关注后第一时间收到推送的技术文章


 
 

上一篇:Python并发请求之requests_future模块使用


下一篇:python 协程(3)-- asyncio的EventLoop以及Future详解