dubbo之NIO服务器-抽象API

写在前面

再java社区中,比较优秀的NIO框架有netty(netty3.x,netty4.x),mina,grizzly,dubbo基于dubbo:\\协议和thrift:\\协议实现了自己的NIO服务器,当然底层会直接使用现有的NIO框架(毕竟重复造*的成本还是比较高的),那么到底选择哪种NIO框架呢?dubbo的做法是让用户自己选择,具体的做法是先提供API层,然后针对具体的NIO框架提供具体的实现,如下:

API层:
    dubbo-remoting-api
实现:
    dubbo-remoting-netty
    dubbo-remoting-netty4
    dubbo-remoting-mina
    dubbo-remoting-grizzly
    dubbo-remoting-p2p

然后再配合Dubbo SPI机制 就可以让用户*选择使用了。本文我们一起来看下dubbo-remoting-api部分。

当我们使用netty来编写网络通信程序时,一般需要如下的四个类:

Server:用来启动服务端的类。
ServerHandler:服务端用来接收处理客户端消息的类。
Client:用来启动客户端的类。
ClientHandler:客户端用来处理服务端消息的类。

dubbo也与此类似,对应的类如下:

Server->com.alibaba.dubbo.remoting.Server
ServerHandler->com.alibaba.dubbo.remoting.ChannelHandler
Client->com.alibaba.dubbo.remoting.Client
ClientHandler->com.alibaba.dubbo.remoting.ChannelHandler

但是实际上,因为dubbo使用的是自定义的协议,如dubbo://,如协议编码接口com.alibaba.dubbo.remoting.Codec2,消息分发接口com.alibaba.dubbo.remoting.Dispatcher,本文涉及到的类图如下:

dubbo之NIO服务器-抽象API

1:Endpoint

Endpoint的英文翻译是端点,可以理解为和外界沟通的门户,用来进一步封装通信连接,源码如下:

public interface Endpoint {
    // 获取URL地址
    URL getUrl();
    // 获取ChannelHandler
    ChannelHandler getChannelHandler();
    // 获取本地网络地址
    InetSocketAddress getLocalAddress();
    // 发送消息
    void send(Object message) throws RemotingException;
    // 发送消息
    void send(Object message, boolean sent) throws RemotingException;
    // 关闭通道
    void close();
    // 优雅方式关闭通道
    void close(int timeout);
    void startClose();
    // 是否关闭
    boolean isClosed();
}

1.1:Channel

接口签名是public interface Channel extends Endpoint{},继承了Endpoint接口,因此Channel也是一个Endpoint,源码如下:

public interface Channel extends Endpoint {
    // 获取通道远端地址
    InetSocketAddress getRemoteAddress();
    // 是否连接
    boolean isConnected();
    // 是否有某个属性
    boolean hasAttribute(String key);
    // 获取属性
    Object getAttribute(String key);
    // 设置属性
    void setAttribute(String key, Object value);
    // 删除属性
    void removeAttribute(String key);
}

该接口是通讯的载体,不同的NIO框架有不同的实现,如com.alibaba.dubbo.remoting.transport.netty4.NettyChannel

1.2:Client

客户端接口,继承了Endpoint接口,因此Client是一个Endpoint,源码如下:

public interface Client extends Endpoint, Channel, Resetable {
    // 重新连接服务端
    void reconnect() throws RemotingException;

    @Deprecated
    void reset(com.alibaba.dubbo.common.Parameters parameters);
}

1.3:Server

服务端接口,继承了Endpoint接口,因此Server是一个Endpoint,源码如下:
源码如下:

public interface Server extends Endpoint, Resetable {
    // 是否绑定本地端口,即已经启动服务,可以被客户端连接和接收消息
    boolean isBound();
    // 获得客户端的连接们
    Collection<Channel> getChannels();
    // 获取指定地址的客户端连接
    Channel getChannel(InetSocketAddress remoteAddress);
    @Deprecated
    void reset(com.alibaba.dubbo.common.Parameters parameters);
}

1.3.1:Resetable

可重置接口,用于服务端根据传入的URL参数重置内部的相关属性,源码如下:

public interface Resetable {
    // 根据传入的URL参数重置内部相关属性
    void reset(URL url);
}

1.4:ChannelHandler

负责通道的逻辑处理,比如从通道中获取消息,发送消息,可以理解为通道的包装器,源码如下:

@SPI
public interface ChannelHandler {
    void connected(Channel channel) throws RemotingException;
    void disconnected(Channel channel) throws RemotingException;
    void sent(Channel channel, Object message) throws RemotingException;
    void received(Channel channel, Object message) throws RemotingException;
    void caught(Channel channel, Throwable exception) throws RemotingException;
}

比如netty4提供的具体实现com.alibaba.dubbo.remoting.transport.netty4.NettyServerHandler,内部会使用netty4的API执行具体操作。

2:Transporter

网络传输接口,源码如下:

@SPI("netty")
public interface Transporter {
    // 绑定Server,使用自适应方法加载对应的Server的Transport实现类
    @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
    Server bind(URL url, ChannelHandler handler) throws RemotingException;
    // 连接服务器,使用自适应方法加载对应的Client的Transport实现类
    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    Client connect(URL url, ChannelHandler handler) throws RemotingException;
}

使用该类来获取服务端和服务端对应的类Server和Client。

3:Transporters

这是Transport门面类 ,因为不同的NIO框架的获取Server和Client的逻辑是不相同的,且还具有一定的复杂度,不同的NIO框架的具体实现就是不同的子系统,因此这里使用了门面设计模式来简化客户端的使用。

源码如下:

// Transporter门面,简化客户端使用各种不同的Transporter的复杂度
public class Transporters {

    static {
        Version.checkDuplicate(Transporters.class);
        Version.checkDuplicate(RemotingException.class);
    }

    private Transporters() {
    }
    
    // 获取Server的门面方法,会根据url参数的不同获取不同的Transporter最终获取Server
    public static Server bind(String url, ChannelHandler... handler) throws RemotingException {
        return bind(URL.valueOf(url), handler);
    }

    public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        // 创建使用Channel进行通信的ChannelHandler,如果是多个则会创建ChannelHandlerDispatcher
        // 内部会循环调用每一个ChannelHandler
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        return getTransporter().bind(url, handler);
    }

    // 获取Client的门面方法,会根据url参数的不同获取不同的Transporter最终获取Client
    public static Client connect(String url, ChannelHandler... handler) throws RemotingException {
        return connect(URL.valueOf(url), handler);
    }

    public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        ChannelHandler handler;
        if (handlers == null || handlers.length == 0) {
            handler = new ChannelHandlerAdapter();
        } else if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        return getTransporter().connect(url, handler);
    }

    public static Transporter getTransporter() {
        // 2022-02-04 21:22:35
        return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
    }
}

2022-02-04 21:22:35处是通过dubbo的自适应机制 来动态获取底层NIO框架对应的Transporter。

4:Codec2

消息编解码顶层接口,源码如下:

@SPI
public interface Codec2 {
    // public static final String CODEC_KEY = "codec";,基于Adaptive机制动态调用Codec2$Adaptive
    // 类以获取具体子类调用对应方法
    @Adaptive({Constants.CODEC_KEY})
    void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException;

    // public static final String CODEC_KEY = "codec";,基于Adaptive机制动态调用Codec2$Adaptive
    // 类以获取具体子类调用对应方法
    @Adaptive({Constants.CODEC_KEY})
    Object decode(Channel channel, ChannelBuffer buffer) throws IOException;

    // 处理拆包,粘包策略枚举
    enum DecodeResult {
        NEED_MORE_INPUT, SKIP_SOME_INPUT
    }
}

4.1:Codec

老版本的Codec2,源码如下;

@Deprecated
@SPI
public interface Codec {
    Object NEED_MORE_INPUT = new Object();

    @Adaptive({Constants.CODEC_KEY})
    void encode(Channel channel, OutputStream output, Object message) throws IOException;

    @Adaptive({Constants.CODEC_KEY})
    Object decode(Channel channel, InputStream input) throws IOException;
}

使用了类com.alibaba.dubbo.remoting.transport.codec.CodecAdapter进行适配。

4.2:Decodeable

可编码接口:

public interface Decodeable {
    void decode() throws Exception;
}

5:Dispatcher

消息分发接口:

@SPI(AllDispatcher.NAME)
public interface Dispatcher {
    // 分发消息到线程池中,使用了Adaptive机制
    @Adaptive({Constants.DISPATCHER_KEY, "dispather", "channel.handler"})
    // 最后2个参数是为了兼容老版本配置而保留
    ChannelHandler dispatch(ChannelHandler handler, URL url);
}

6:RemotingException

源码如下:

public class RemotingException extends Exception {

    private static final long serialVersionUID = -3160452149606778709L;
    // 本地地址
    private InetSocketAddress localAddress;
    // 远端地址
    private InetSocketAddress remoteAddress;

    public RemotingException(Channel channel, String msg) {
        this(channel == null ? null : channel.getLocalAddress(), channel == null ? null : channel.getRemoteAddress(),
                msg);
    }

    public RemotingException(InetSocketAddress localAddress, InetSocketAddress remoteAddress, String message) {
        super(message);

        this.localAddress = localAddress;
        this.remoteAddress = remoteAddress;
    }

    public RemotingException(Channel channel, Throwable cause) {
        this(channel == null ? null : channel.getLocalAddress(), channel == null ? null : channel.getRemoteAddress(),
                cause);
    }

    public RemotingException(InetSocketAddress localAddress, InetSocketAddress remoteAddress, Throwable cause) {
        super(cause);

        this.localAddress = localAddress;
        this.remoteAddress = remoteAddress;
    }

    public RemotingException(Channel channel, String message, Throwable cause) {
        this(channel == null ? null : channel.getLocalAddress(), channel == null ? null : channel.getRemoteAddress(),
                message, cause);
    }

    public RemotingException(InetSocketAddress localAddress, InetSocketAddress remoteAddress, String message,
                             Throwable cause) {
        super(message, cause);

        this.localAddress = localAddress;
        this.remoteAddress = remoteAddress;
    }

    public InetSocketAddress getLocalAddress() {
        return localAddress;
    }

    public InetSocketAddress getRemoteAddress() {
        return remoteAddress;
    }
}

RemotingException有两个子类异常,分别是ExecutionException,TimeoutException,分别如下:

public class ExecutionException extends RemotingException {
    private static final long serialVersionUID = -2531085236111056860L;
    private final Object request;
    // 省略各种构造函数
}
public class TimeoutException extends RemotingException {
    // 代表是客户端的常量
    public static final int CLIENT_SIDE = 0;
    // 代表是服务端的常量
    public static final int SERVER_SIDE = 1;
    private static final long serialVersionUID = 3122966731958222692L;
    // 阶段
    private final int phase;
    
    // 省略各种构造函数
}
上一篇:JAVA学习笔记(二十四)-常用API


下一篇:接口指南,快递鸟物流开放平台API对接,极兔速递