写在前面
再java社区中,比较优秀的NIO框架有netty(netty3.x,netty4.x),mina,grizzly,dubbo基于dubbo:\\
协议和thrift:\\
协议实现了自己的NIO服务器,当然底层会直接使用现有的NIO框架(毕竟重复造*的成本还是比较高的),那么到底选择哪种NIO框架呢?dubbo的做法是让用户自己选择,具体的做法是先提供API层,然后针对具体的NIO框架提供具体的实现,如下:
API层:
dubbo-remoting-api
实现:
dubbo-remoting-netty
dubbo-remoting-netty4
dubbo-remoting-mina
dubbo-remoting-grizzly
dubbo-remoting-p2p
然后再配合Dubbo SPI机制 就可以让用户*选择使用了。本文我们一起来看下dubbo-remoting-api
部分。
当我们使用netty来编写网络通信程序时,一般需要如下的四个类:
Server:用来启动服务端的类。
ServerHandler:服务端用来接收处理客户端消息的类。
Client:用来启动客户端的类。
ClientHandler:客户端用来处理服务端消息的类。
dubbo也与此类似,对应的类如下:
Server->com.alibaba.dubbo.remoting.Server
ServerHandler->com.alibaba.dubbo.remoting.ChannelHandler
Client->com.alibaba.dubbo.remoting.Client
ClientHandler->com.alibaba.dubbo.remoting.ChannelHandler
但是实际上,因为dubbo使用的是自定义的协议,如dubbo://
,如协议编码接口com.alibaba.dubbo.remoting.Codec2
,消息分发接口com.alibaba.dubbo.remoting.Dispatcher
,本文涉及到的类图如下:
1:Endpoint
Endpoint的英文翻译是端点
,可以理解为和外界沟通的门户
,用来进一步封装通信连接,源码如下:
public interface Endpoint {
// 获取URL地址
URL getUrl();
// 获取ChannelHandler
ChannelHandler getChannelHandler();
// 获取本地网络地址
InetSocketAddress getLocalAddress();
// 发送消息
void send(Object message) throws RemotingException;
// 发送消息
void send(Object message, boolean sent) throws RemotingException;
// 关闭通道
void close();
// 优雅方式关闭通道
void close(int timeout);
void startClose();
// 是否关闭
boolean isClosed();
}
1.1:Channel
接口签名是public interface Channel extends Endpoint{}
,继承了Endpoint接口,因此Channel也是一个Endpoint,源码如下:
public interface Channel extends Endpoint {
// 获取通道远端地址
InetSocketAddress getRemoteAddress();
// 是否连接
boolean isConnected();
// 是否有某个属性
boolean hasAttribute(String key);
// 获取属性
Object getAttribute(String key);
// 设置属性
void setAttribute(String key, Object value);
// 删除属性
void removeAttribute(String key);
}
该接口是通讯的载体,不同的NIO框架有不同的实现,如com.alibaba.dubbo.remoting.transport.netty4.NettyChannel
1.2:Client
客户端接口,继承了Endpoint接口,因此Client是一个Endpoint,源码如下:
public interface Client extends Endpoint, Channel, Resetable {
// 重新连接服务端
void reconnect() throws RemotingException;
@Deprecated
void reset(com.alibaba.dubbo.common.Parameters parameters);
}
1.3:Server
服务端接口,继承了Endpoint接口,因此Server是一个Endpoint,源码如下:
源码如下:
public interface Server extends Endpoint, Resetable {
// 是否绑定本地端口,即已经启动服务,可以被客户端连接和接收消息
boolean isBound();
// 获得客户端的连接们
Collection<Channel> getChannels();
// 获取指定地址的客户端连接
Channel getChannel(InetSocketAddress remoteAddress);
@Deprecated
void reset(com.alibaba.dubbo.common.Parameters parameters);
}
1.3.1:Resetable
可重置接口,用于服务端根据传入的URL参数重置内部的相关属性,源码如下:
public interface Resetable {
// 根据传入的URL参数重置内部相关属性
void reset(URL url);
}
1.4:ChannelHandler
负责通道的逻辑处理,比如从通道中获取消息,发送消息,可以理解为通道的包装器,源码如下:
@SPI
public interface ChannelHandler {
void connected(Channel channel) throws RemotingException;
void disconnected(Channel channel) throws RemotingException;
void sent(Channel channel, Object message) throws RemotingException;
void received(Channel channel, Object message) throws RemotingException;
void caught(Channel channel, Throwable exception) throws RemotingException;
}
比如netty4提供的具体实现com.alibaba.dubbo.remoting.transport.netty4.NettyServerHandler
,内部会使用netty4的API执行具体操作。
2:Transporter
网络传输接口,源码如下:
@SPI("netty")
public interface Transporter {
// 绑定Server,使用自适应方法加载对应的Server的Transport实现类
@Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
Server bind(URL url, ChannelHandler handler) throws RemotingException;
// 连接服务器,使用自适应方法加载对应的Client的Transport实现类
@Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
Client connect(URL url, ChannelHandler handler) throws RemotingException;
}
使用该类来获取服务端和服务端对应的类Server和Client。
3:Transporters
这是Transport
的门面类 ,因为不同的NIO框架的获取Server和Client的逻辑是不相同的,且还具有一定的复杂度,不同的NIO框架的具体实现就是不同的子系统,因此这里使用了门面设计模式来简化客户端的使用。
源码如下:
// Transporter门面,简化客户端使用各种不同的Transporter的复杂度
public class Transporters {
static {
Version.checkDuplicate(Transporters.class);
Version.checkDuplicate(RemotingException.class);
}
private Transporters() {
}
// 获取Server的门面方法,会根据url参数的不同获取不同的Transporter最终获取Server
public static Server bind(String url, ChannelHandler... handler) throws RemotingException {
return bind(URL.valueOf(url), handler);
}
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
// 创建使用Channel进行通信的ChannelHandler,如果是多个则会创建ChannelHandlerDispatcher
// 内部会循环调用每一个ChannelHandler
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
return getTransporter().bind(url, handler);
}
// 获取Client的门面方法,会根据url参数的不同获取不同的Transporter最终获取Client
public static Client connect(String url, ChannelHandler... handler) throws RemotingException {
return connect(URL.valueOf(url), handler);
}
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
ChannelHandler handler;
if (handlers == null || handlers.length == 0) {
handler = new ChannelHandlerAdapter();
} else if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
return getTransporter().connect(url, handler);
}
public static Transporter getTransporter() {
// 2022-02-04 21:22:35
return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}
}
2022-02-04 21:22:35
处是通过dubbo的自适应机制 来动态获取底层NIO框架对应的Transporter。
4:Codec2
消息编解码顶层接口,源码如下:
@SPI
public interface Codec2 {
// public static final String CODEC_KEY = "codec";,基于Adaptive机制动态调用Codec2$Adaptive
// 类以获取具体子类调用对应方法
@Adaptive({Constants.CODEC_KEY})
void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException;
// public static final String CODEC_KEY = "codec";,基于Adaptive机制动态调用Codec2$Adaptive
// 类以获取具体子类调用对应方法
@Adaptive({Constants.CODEC_KEY})
Object decode(Channel channel, ChannelBuffer buffer) throws IOException;
// 处理拆包,粘包策略枚举
enum DecodeResult {
NEED_MORE_INPUT, SKIP_SOME_INPUT
}
}
4.1:Codec
老版本的Codec2,源码如下;
@Deprecated
@SPI
public interface Codec {
Object NEED_MORE_INPUT = new Object();
@Adaptive({Constants.CODEC_KEY})
void encode(Channel channel, OutputStream output, Object message) throws IOException;
@Adaptive({Constants.CODEC_KEY})
Object decode(Channel channel, InputStream input) throws IOException;
}
使用了类com.alibaba.dubbo.remoting.transport.codec.CodecAdapter
进行适配。
4.2:Decodeable
可编码接口:
public interface Decodeable {
void decode() throws Exception;
}
5:Dispatcher
消息分发接口:
@SPI(AllDispatcher.NAME)
public interface Dispatcher {
// 分发消息到线程池中,使用了Adaptive机制
@Adaptive({Constants.DISPATCHER_KEY, "dispather", "channel.handler"})
// 最后2个参数是为了兼容老版本配置而保留
ChannelHandler dispatch(ChannelHandler handler, URL url);
}
6:RemotingException
源码如下:
public class RemotingException extends Exception {
private static final long serialVersionUID = -3160452149606778709L;
// 本地地址
private InetSocketAddress localAddress;
// 远端地址
private InetSocketAddress remoteAddress;
public RemotingException(Channel channel, String msg) {
this(channel == null ? null : channel.getLocalAddress(), channel == null ? null : channel.getRemoteAddress(),
msg);
}
public RemotingException(InetSocketAddress localAddress, InetSocketAddress remoteAddress, String message) {
super(message);
this.localAddress = localAddress;
this.remoteAddress = remoteAddress;
}
public RemotingException(Channel channel, Throwable cause) {
this(channel == null ? null : channel.getLocalAddress(), channel == null ? null : channel.getRemoteAddress(),
cause);
}
public RemotingException(InetSocketAddress localAddress, InetSocketAddress remoteAddress, Throwable cause) {
super(cause);
this.localAddress = localAddress;
this.remoteAddress = remoteAddress;
}
public RemotingException(Channel channel, String message, Throwable cause) {
this(channel == null ? null : channel.getLocalAddress(), channel == null ? null : channel.getRemoteAddress(),
message, cause);
}
public RemotingException(InetSocketAddress localAddress, InetSocketAddress remoteAddress, String message,
Throwable cause) {
super(message, cause);
this.localAddress = localAddress;
this.remoteAddress = remoteAddress;
}
public InetSocketAddress getLocalAddress() {
return localAddress;
}
public InetSocketAddress getRemoteAddress() {
return remoteAddress;
}
}
RemotingException有两个子类异常,分别是ExecutionException,TimeoutException,分别如下:
public class ExecutionException extends RemotingException {
private static final long serialVersionUID = -2531085236111056860L;
private final Object request;
// 省略各种构造函数
}
public class TimeoutException extends RemotingException {
// 代表是客户端的常量
public static final int CLIENT_SIDE = 0;
// 代表是服务端的常量
public static final int SERVER_SIDE = 1;
private static final long serialVersionUID = 3122966731958222692L;
// 阶段
private final int phase;
// 省略各种构造函数
}