Netty 为许多提供了许多预置的编解码器和处理器,几乎可以开箱即用,减少了在烦琐事务上话费的时间和精力
空闲的连接和超时
检测空闲连接以及超时对于释放资源来说至关重要,Netty 特地为它提供了几个 ChannelHandler 实现
名称 | 描述 |
---|---|
IdleStateHandler | 当连接空闲时间太长时,将会触发一个 IdleStateEvent 事件,然后,你可以通过在 ChannelInboundHandler 重写 userEventTriggered() 方法来处理该 IdleStateEvent 事件 |
ReadTimeoutHandler | 如果在指定的时间间隔内没有收到入站数据,则抛出一个 ReadTimeoutException 并关闭对应的 Channel。可以通过重写你的 ChannelHandler 中的 exceptionCaught() 方法来检测该 ReadTimeoutException |
WriteTimeoutHandler | 如果在指定的时间间隔内没有出站数据写入,则抛出一个 WriteTimeoutException 并关闭对应的 Channel。可以通过重写你的 ChannelHandler 中的 exceptionCaught() 方法来检测该 WriteTimeoutException |
下述代码展示了当使用通常的发送心跳消息到远程节点的方法时,如果 60 秒内没有接收或者发送任何数据,我们将得到通知,如果没有响应,则连接会关闭
public class IdleStateHandlerInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// IdleStateHandler 将在被触发时发送一个 IdleStateEvent 事件
pipeline.addLast(new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS));
// 将一个 HeartbeatHandler 添加到 ChannelPipeline
pipeline.addLast(new HeartbeatHandler());
}
public static final class HeartbeatHandler extends SimpleChannelInboundHandler {
// 发送到远程节点的心跳消息
private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled
.unreleasableBuffer(Unpooled.copiedBuffer("HEARTBEAT", CharsetUtil.ISO_8859_1));
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
// 发送心跳消息,并在发送失败时关闭该连接
ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate())
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
super.userEventTriggered(ctx, evt);
}
}
@Override
protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
}
}
}
解码基于分隔符的协议
基于分隔符的消息协议使用定义的字符来标记消息或者消息段的开头或者结尾,下表列出的解码器能帮助你定义可以提取由任意标记序列分隔的帧的自定义解码器
名称 | 描述 |
---|---|
DelimiterBasedFrameDecoder | 使用由用户提供的分隔符来提取帧 |
LineBasedFrameDecoder | 由行尾符(\n 或者 \r\n)分隔帧 |
下述代码展示了如何使用 LineBasedFrameDecoder 处理由行尾符分隔的帧
public class LineBasedHandlerInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 该 LineBasedFrameDecoder 将提取的帧转发给下一个 ChannelInboundHandler
pipeline.addLast(new LineBasedFrameDecoder(64 * 1024));
// 添加 FrameHandler 以接收帧
pipeline.addLast(new FrameHandler());
}
public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
protected void messageReceived(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
// do something
}
}
}
如果你还使用除了行尾符之外的分隔符来分隔帧,那么你还可以使用 DelimiterBasedFrameDecoder,只需要将特定的分隔符序列指定到其构造函数即可
作为示例,我们将使用下面的协议规范:
- 传入数据流是一系列的帧,每个帧都由换行符 \n 来分隔
- 每个帧都由一系列元素组成,每个元素由单个空格字符分隔
- 一个帧的内容代表一个命令,定义为一个命令名称后跟着数目可变的参数
基于这个协议,我们的自定义解码器将定义以下类:
- Cmd —— 将帧的命令存储在 ByteBuf 中,一个 ByteBuf 用于名称,另一个用于参数
- CmdDecoder —— 从被重写了的 decode() 方法中获取一行字符串,从它的内容构建一个 Cmd 实例
- CmdHandler —— 从 CmdDecoder 获取解码的 Cmd 对象,并对它进行一些处理
public class CmdHandlerInitializer extends ChannelInitializer<Channel> {
static final byte SPACE = (byte) ' ';
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new CmdDecoder(64 * 1024));
pipeline.addLast(new CmdHandler());
}
/**
* Cmd POJO
*/
public static final class Cmd {
private final ByteBuf name;
private final ByteBuf args;
public Cmd(ByteBuf name, ByteBuf args) {
this.name = name;
this.args = args;
}
public ByteBuf getArgs() {
return args;
}
public ByteBuf getName() {
return name;
}
}
public static final class CmdDecoder extends LineBasedFrameDecoder {
public CmdDecoder(int maxLength) {
super(maxLength);
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
// 从 ByteBuf 中提取由行尾符序列分隔的帧
ByteBuf frame = (ByteBuf) super.decode(ctx, buffer);
// 如果输入中没有帧,则返回 null
if (frame == null) {
return null;
}
// 查找第一个空格字符的索引,前面是命令名称,后面是参数
int index = frame.indexOf(frame.readerIndex(), frame.writerIndex(), SPACE);
// 使用包含命令名称和参数的切片创建新的 Cmd 对象
return new Cmd(frame.slice(frame.readerIndex(), index), frame.slice(index + 1, frame.writerIndex()));
}
}
public static final class CmdHandler extends SimpleChannelInboundHandler<Cmd> {
@Override
protected void messageReceived(ChannelHandlerContext ctx, Cmd msg) throws Exception {
// 处理传经 ChannelPipeline 的 Cmd 对象
}
}
}
基于长度的协议
基于长度的协议通过将它的长度编码到帧的头部来定义帧,而不是使用特殊的分隔符来标记它的结束,下表列出 Netty 提供的用于处理这种类型的协议的两种解码器
名称 | 描述 |
---|---|
FixedLengthFrameDecoder | 提取在调用构造函数时指定的定长帧 |
LengthFieldBasedFrameDecoder | 根据帧头部中的长度值来提取帧:该字段的偏移量以及长度在构造函数中指定 |
你经常会遇到被编码到消息头部的帧大小不是固定值的协议,为了处理这种变长帧,你可以使用 LengthFieldBasedFrameDecoder,它将从头部字段确定帧长,然后从数据流中提取指定的字节数
下图展示了一个示例,其中长度字段在帧中的偏移量为 0,并且长度为 2 字节
下述代码展示了如何使用其 3 个构造函数分别为 maxFrameLength、lengthFieldOffser 和 lengthFieldLength 的构造函数。在这个场景下,帧的长度被编码到了帧起始的前 8 个字节中
public class LengthBasedInitializer extends ChannelInitializer<Channel> {
/**
* 使用 LengthFieldBasedFrameDecoder 解码将帧长度编码到帧起始的前 8 个字节中的消息
*/
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LengthFieldBasedFrameDecoder(64 * 1024, 0, 8));
pipeline.addLast(new FrameHandler());
}
public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
protected void messageReceived(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
// do something
}
}
}
写大型数据
因为网络饱和的可能性,如何在异步框架中高效地写大块的数据是一个特殊的问题。由于写操作是非阻塞的,所以即时没有写出所有的数据,写操作也会在完成时返回并通知 ChannelFuture。当这种情况发生时,如果仍然不停地写入,就有内存耗尽的风险。所以在写大型数据时,需要考虑处理远程节点的连接是慢速连接的情况,这种情况会导致内存释放的延迟。让我们考虑下将一个文件内容写出到网络的情况
NIO 的零拷贝特性,这种特性消除了将文件的内容从文件系统移动到网络栈的复制过程。所有这一切都发生在 Netty 的核心中,所以应用程序需要做的就是使用一个 FileRegion 接口的实现
下述代码展示了如何通过从 FileInputStream 创建一个 DefaultFileRegion,并将其写入 Channel
// 创建一个 FileInputStream
leInputStream in = new FileInputStream(file);
// 以该文件的的完整长度创建一个新的 DefaultFileRegion
FileRegion region = new DefaultFileRegion(in.getChannel(), 0, file.length());
// 发送该 DefaultFileRegion,并注册一个 ChannelFutureListener
channel.writeAndFlush(region).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// 处理失败
if(!future.isSuccess()) {
Throwable cause = future.cause();
// do something
}
}
});
这个示例只适用于文件内容的直接传输,不包括应用程序对数据的任何处理。在需要将数据从文件系统复制到用户内存中时,可以使用 ChunkedWriteHandler,它支持异步写大型数据流,而又不会导致大量的内存消耗
interface ChunkedInput<B> 中的类型参数 B 是 readChunk() 方法返回的类型。Netty 预置了该接口的四个实现,如表所示,每个都代表了一个将由 ChunkedWriteHandler 处理的不定长度的数据流
名称 | 描述 |
---|---|
ChunkedFile | 从文件中逐块获取数据,当你的平台不支持零拷贝或者你需要转换数据时使用 |
ChunkedNioFile | 和 ChunkedFile 类似,只是它使用了 FileChannel |
ChunkedStream | 从 InputStream 中逐块传输内容 |
ChunkedNioStream | 从 ReadableByteChannel 中逐步传输内容 |
下述代码说明了 ChunkedStream 的用法,它是实践中最常用的实现。所示的类使用了一个 File 以及一个 SSLContext 进行实例化,当 initChannel() 方法被调用时,它将使用所示的 ChannelHandler 链初始化该 Channel
当 Channel 的状态变为活动时,WriteStreamHandler 将会逐块地把来自文件中的数据作为 ChunkedStream 写入
public class ChunkedWriteHandlerInitializer extends ChannelInitializer<Channel> {
private final File file;
private final SslContext sslContext;
public ChunkedWriteHandlerInitializer(File file, SslContext sslContext) {
this.file = file;
this.sslContext = sslContext;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new SslHandler(sslContext.newEngine(ch.alloc())));
// 添加 ChunkedWriteHandler 以处理作为 ChunkedInput 传入的数据
pipeline.addLast(new ChunkedWriteHandler());
// 一旦连接建立,WriteStreamHandler 就开始写文件数据
pipeline.addLast(new WriteStreamHandler());
}
public final class WriteStreamHandler extends SimpleChannelInboundHandler<Channel> {
/**
* 当连接建立时,channelActive() 方法将使用 ChunkedInput 写文件数据
*/
@Override
protected void messageReceived(ChannelHandlerContext ctx, Channel msg) throws Exception {
super.channelActive(ctx);
ctx.writeAndFlush(new ChunkedStream(new FileInputStream(file)));
}
}
}
序列化数据
JDK 提供了 ObjectOutputStream 和 ObjectInputStream,用于通过网络对 POJO 的基本数据类型和图进行序列化和反序列化。该 API 并不复杂,可以被应用于任何实现了 java.io.Serializable 接口的对象。但它的性能并不高效,在这一节,我们将看到 Netty 如何实现序列化
1. JDK 序列化
如果你的程序必须要和使用了 ObjectOutputStream 和 ObjectInputStream 的远程节点交互,并且考虑兼容性,那么 JDK 序列化将是正确的选择,下表列出了 Netty 提供的用于和 JDK 进行交互操作的序列化类
名称 | 描述 |
---|---|
CompatibleObjectDecoder | 和使用 JDK 序列化的非基于 Netty 的远程节点进行互操作的解码器 |
CompatibleObjectEncoder | 和使用 JDK 序列化的非基于 Netty 的远程节点进行互操作的编码器 |
ObjectDecoder | 构建于 JDK 序列化之上的使用自定义的序列化来解码的解码器 |
ObjectEncoder | 构建于 JDK 序列化之上的使用自定义的序列化来编码的编码器 |
2. Protocol Buffers 序列化
Protocol Buffers 是一种由 Google 公司开发的、开源的数据交换格式,以一种紧凑而高效的方式对结构化的数据进行编码以及解码,能跨多语言使用。下表展示了 Netty 为支持 Protobuf 所提供的 ChannelHandler 实现
名称 | 描述 |
---|---|
ProtobufDecoder | 使用 Protobuf 对消息进行解码 |
ProtobufEncoder | 使用 Protobuf 对消息进行编码 |
ProtobufVarint32FrameDecoder | 根据消息中的 Google Protobuf Buffers 的 Base 128 Varints 整型长度字段值动态地分割所接收到的 ByteBuf |
ProtobufVarint32LengthFieldPrepender | 由 ByteBuf 前追加一个 Google Protobuf Buffers 的 Base 128 Varints 整型的长度字段值 |