-
Netty是一款用于快速开发高性能的网络应用程序的Java框架,他封装了网络编程的复杂性,使网络编程和Web技术的最新进展能够被比以往更广泛的开发人员接触到
Netty不只是一个接口和一个类的集合,她还定义了一种架构模型以及一套丰富的设计模式
Netty的特点
分类 |
Netty的特性总结 |
设计 |
统一的API,支持多种传输类型,阻塞和非阻塞的,简单而强大的线程模型,真正的无连接数据报套接字支持,链接逻辑组件以支持复用 |
易于使用 |
详实的javadoc以及大量的实例 |
性能 |
拥有比Java核心API更高的吞吐量以及更低的延迟,得益于池化和复用,拥有更低的资源消耗,更少的内存复制 |
健壮性 |
不会因为慢速,快速或者超载的链接而导致OutOfMemory,消除在高速网络中NIO应用程序常见的不公平 读/写 比率 |
安全性 |
完整的SSL/TLS以及StartTLS支持,可用于受限环境,如Applet and OSGI |
社区驱动 |
发布快而且频繁 |
Netty中的主要构件块: Channel callback Future event & ChannelHandler
-
Channel:
可以把Channel看作是传入(入站)和传出(出站)数据的载体,因此他们可以被打开,关闭,链接或者断开链接.
-
回调
: 其实就是一个方法,一个指向已经被提供给另外一个方法的方法引用.这使得后者(接受回调的方法)可以在适当的时侯调用前者. 例子: 当一个新的链接已经建立时,ChannelHandler的channerActive()回调方法将会被调用,并打印消息.
public class ConnectHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Client "+ctx.channel().remoteAddress()+" connected!");
}
}
-
Future
: JDK 内置了 interface Future,但是其所提供的实现,只允许手动检查对应的操作是否已经完成或者一直阻塞直到他完成.非常繁琐. Netty提供了自己的实现ChannelFuture
,用于在异步执行操作的时候使用. ChannelFuture提供了额外的方法,是的我们能够注册一个或多个ChannelFutureListener实例,监听回调方法operationComplete(),将会在对应的操作完成时被调用,然后监听器可以判断该操作是成功了还是出错了.如果是后者,我们可以检索产生的Throwable, 简而言之: 由ChannelFutureListener提供的通知机制消除了手动检查对应操作是否完成的必要.
每个Netty的出站IO操作都将返回一个ChannelFuture,也就是说他们都不会阻塞.
Channel channel = null;//does not block
ChannelFuture future = channel.connect(new InetSocketAddress("192,168.0.1", 25));
future.addListener(future1 -> {
if(future.isSuccess()){
//如果操作是成功的,则创建一个ByteBuff以持有数据
ByteBuf buffer = Unpooled.copiedBuffer("Hello", Charset.defaultCharset());
//将数据异步地发送到远程节点,返回一个ChannelFuture
ChannelFuture wf = future.channel().writeAndFlush(buffer);
//...
}else {
Throwable cause = future.cause();
cause.printStackTrace();//如果发生错误,打印堆栈信息
}
});
-
事件和ChannelHandler:
Netty使用不同的事件来通知我们状态的改变或者是操作的状态.这使得我们能够基于已经发生的事件来触发合适的动作: 包括 记录日志
数据转换
控制流
应用程序逻辑
- Netty 是 一个网络框架,所以事件是按照他们入站和出站数据流的相关性进行分类的. 可能由入站数据或者相关的状态更改而触发的事件包括:
链接已被激活或者链接失活
数据读取
用户事件
错误事件
, 出站事件是未来将会触发的摸个动作的操作结果, 包括: 打开或关闭到远程节点的链接
将数据写到或者冲刷到套接字
. 每个事件都可以被分发给ChannelHandler类中的某个用户实现的方法.
- Netty的异步编程模型是建立在Future和回调的概念之上的,而将时间派发到ChannelHandler的方法则发生在更深的层次上. 结合在一起.这些元素就提供了一个处理环境,使你的应用程序逻辑可以独立于任何网络操作相关的顾虑而独立的改变.这也是netty的设计方式的一个关键目标.
拦截操作以及高速的转换入站数据和出站数据,都只需要你提供回调或者利用操作所返回的Future.
- Netty通过触发事件将Selector从应用程序中抽象出来,消除了所有本来将需要手写的派发代码.在内部,将会为每个Channel分配一个EventLoop.用以处理所有时间.包括: (1)注册感兴趣事件(2)将事件派发给ChannelHandler(3)安排进一步动作.
EventLoop本身只由一个线程驱动,其处理了一个Channel的所有IO事件,并且在该EventLoop的整个生命周期内不会改变
每个Channel都拥有一个与之相关联的ChannelPipeline,其持有一个ChannelHandler的实例链,在默认情况下,ChannelHandler会把对它的方法调用转发给链中的下一个ChannelHandler.因此,如果exceptionCaught方法没有被该链中的某处被发现,那么接收的异常将会被传递到ChannelPipeline的尾端并被记录,为此.你的应用程序应该提供至少一个实现了exceptionCaught方法的ChannelHandler
//标示一个channelHandler可以被多个Channel安全的共享
@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in = (ByteBuf)msg;
System.out.println("Server received: "+in.toString(CharsetUtil.UTF_8));
ctx.write(in);//将接收到的消息写给发送者,而不冲刷出站消息
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//将未决消息冲刷到远程节点,并且关闭该节点
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();//打印异常
ctx.close();//关闭该Channel
}
}
-----------------------------------------------------------------------------------------------------------
public class EchoServer {
private final int port;
public EchoServer(int port) {
this.port = port;
}
public static void main(String[] args) throws InterruptedException {
if(args.length != 1){
System.err.println("Usage: "+EchoServer.class.getName()+" <port>");
}
int port = Integer.parseInt(args[0]);
new EchoServer(port).start();
}
public void start() throws InterruptedException {
final EchoServerHandler serverHandler = new EchoServerHandler();
EventLoopGroup group = new NioEventLoopGroup();// 1: 创建EventLoopGroup
try {
ServerBootstrap b = new ServerBootstrap();// 2: 创建ServerBootStrap
b.group(group)
.channel(NioServerSocketChannel.class) //3: 指定所使用的NIO传输Channel
.localAddress(new InetSocketAddress(port)) // 4: 使用指定的端口设置套接字地址
.childHandler(new ChannelInitializer<SocketChannel>() { // 5: 添加一个EchoServerHandler到子Channel的ChannelPipeLine
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(serverHandler);
}
});
ChannelFuture future = b.bind().sync(); // 6: 异步地绑定服务器,调用sync()方法阻塞等待直到绑定完成
future.channel().closeFuture().sync(); // 7: 获取Channel的CloseFuture,并且阻塞当前线程直到他完成
} finally {
group.shutdownGracefully().sync();// 8: 关闭EventLoopGroup
}
}
}
应用程序需求 |
推荐传输 |
非阻塞代码库或者一个常规的起点 |
NIO(或者linux上的Epoll) |
阻塞代码库 |
OIO |
在同一个JVM内部通信 |
Local |
测试ChannelHandler的实现 |
Embedded(这个一般为自己的ChannelHandle)写单元测试. |
网络数据的基本单位总是字节,java nio提供了ByteBuffer作为他的字节容器,但是这个类使用起来过于复杂,而且也有些繁琐
-
Netty的数据容器 ByteBuf
: Netty通过数据处理API通过2个组件暴露-- abstract class ByteBuf 和 interface ByteBufHolder
-
ByteBuf的优点:
- (1)他可以被用户自定义的缓冲区类型扩展
- (2)通过内置的符合缓冲区实现了透明的零拷贝
- (3)容量可以按需增长(类似于StringBuilder)
- (4)在读和写之间切换不需要调用ByteBuffer的flip方法
- (5)读和写使用了不同的索引
- (6)支持方法的调用链
- (7)支持引用计数
- (8) 支持池化
-
堆缓冲区: 最常用的ByteBuf模式是将数据存储在JVM堆空间中.这种模式称为L支撑数组(backing array),他能在没有使用池化的情况下提供快速的分配和释放
,代码如下:
ByteBuf buf = Unpooled.copiedBuffer("hello world", CharsetUtil.UTF_8);
if(buf.hasArray()){
byte[] array = buf.array();
int offset = buf.arrayOffset()+buf.readerIndex();//计算第一个字节的偏移量
int length = buf.readableBytes();
System.out.println(offset+" - "+length);
}
-
直接缓冲区: 是另外一种ByteBuf模式,通过本地内存来分配
, 直接缓冲区的内容将驻留在常规的会被垃圾回收的堆之外
,如果你的数据包含在一个堆上分配的缓冲区中,那么事实上,再通过套接字发送他之前,JVM将会在内部把你的缓冲区复制到一个直接缓冲区中. 直接缓冲区的缺点:
:相对于堆的缓冲区,他们的分配和释放较为昂贵.如果处理遗留代码,因为数据不在堆上,所以不得不进行一次复制.
-
复合缓冲区
: Netty通过一个ByteBuf的子类CompositeByteBuf---实现了这个模式,他提供了一个将多个缓冲区表示为单个合并缓冲区的虚拟表示,(堆内存+直接内存)
CompositeByteBuf messagebuf = Unpooled.compositeBuffer();
ByteBuf headbuf = null; //can be backing or direct
ByteBuf bodybuf = null;//can be backing or direct
messagebuf.addComponents(headbuf,bodybuf);
//do something
messagebuf.removeComponent(0);//remove the header
//iterate
for (ByteBuf buf:messagebuf) {
System.out.println(buf.toString(CharsetUtil.UTF_8));
}
-
Netty定义了2个重要的ChannelHandler子接口:
(1) ChannelInBoundHandler --- 处理入站数据以及各状态变化. (2) ChannelOutBoundHandler ---- 处理出站数据并且允许拦截所有的操作.
public class DiscardHander extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ReferenceCountUtil.release(msg); // 显示的释放与池化ByteBuf实例相关的内存!
}
}
如果extends SimpleChannelInboundHandlerAdapter <Object> 则不需要显示的释放.自动释放
-
异常处理:
(1)ChannelHandler.exceptionCaught() 的默认实现是简单的将当前异常转发给ChannelPipeLine中的下一个ChannelHandler (2)如果异常到达了ChannelPipeLine的最尾端,它将会被记录为未处理.(Netty将会通过WRAN级别的日志记录该异常到达了ChannelPipeLine的尾端,但没有被处理,并尝试释放该异常.) (3)要想自定义处理逻辑,需要重写exceptionCaught()方法,然后决定是否需要将该异常传播出去.
-
处理出站异常:
(1)每个出站操作的都将返回一个ChannelFuture.祖册到ChannelFuture的ChannelFutureListener将在操作完成时被通知该操作是成功了还是出错了,,,(2)几乎所有的ChannelOutBoundHandler上的方法都会传入一个ChannelPromise的实例.作为ChannelFuture的子类,ChannelPromise也可以被分配用于异步通知的监听器,
2种方法:
ChannelFuture future = null;//some channe
future.addListener((ChannelFutureListener)f -> {
if(!f.isSuccess()){
f.cause().printStackTrace();
f.channel().close();
}
});
public class OutBoundExceptionHandler extends ChannelOutboundHandlerAdapter{
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
promise.addListener((ChannelFutureListener) f -> {
if(!f.isSuccess()){
f.cause().printStackTrace();
f.channel().close();
}
});
}
}
-
编解码器:
-
编码器:
是将消息转换为合适于传输的格式(最有可能的就是字节)
-
解码器:
将网络字节流转换为应用程序的消息格式.
编码器操作出站数据,解码器操作入站数据
-
Netty所提供的解码器类
-
将字节解码为消息:
------ ByteToMessageDecoder and ReplayingDecoder
-
将一种消息类型解码为另外一种
------ MessageToMessageDecoder
/**
* IdleStateHandler: 在链接空闲时间太长时,会触发一个IdleStateEvent 事件,然后通过userEventTriggered方法处理该事件
* ReadTimeOutHandler: 如果在指定的时间间隔内没有收到任何的入站数据,将抛出一个ReadTimeoutException并关闭对应的Channel,可以重写exceptionCaught()
* 来检测该异常
* WriteTimeoutHandler: 如果在指定的时间间隔内没有收到任何的出站数据写入,抛出WriteTimeoutException并关闭对应的Channel
*/
public class IdleStateHandlerInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
//IdleStateHandler 将在被触发时发送一个IdleStateEvent 事件
ch.pipeline().addLast(new IdleStateHandler(0,0,60, TimeUnit.SECONDS),
new HeartbeatHandler());
}
private static final class HeartbeatHandler extends ChannelInboundHandlerAdapter{
private static final ByteBuf HEARTBEAT_SEQUENC = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("HEARTBEART",
CharsetUtil.UTF_8));
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if( evt instanceof IdleStateEvent){
//发送心跳消息并在发送失败时关闭该链接
ctx.writeAndFlush(HEARTBEAT_SEQUENC.duplicate()).addListener(ChannelFutureListener.CLOSE);
}else {
//如果不是IdleStateEvent 事件,所以将它传递给下一个InboundHandler
super.userEventTriggered(ctx, evt);
}
}
}
}
-
解码基于分隔符的协议和基于长度的协议:
-
DelimiterBasedFrameDecoder:
使用任何由用户提供的分隔符来提取帧的通用解码器
-
LineBasedFrameDecoder:
提取由尾行符(n或rn)分割的帧解码器,比上面的解码器要快.
-
基于长度的协议:
-
FixedLengthFrameDecoder:
提取在构造函数时指定的定长帧
-
LengthFieldBasedFrameDecoder:
根据编码进帧头部中的长度值提取帧,该字段的偏移量以及长度在构造函数中指定.
public class CmdHandlerInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new CmdDecoder(64*1024),new CmdHandler());
}
private static final class Cmd{
private final ByteBuf name;
private final ByteBuf args;
public Cmd(ByteBuf name, ByteBuf args) {
this.name = name;
this.args = args;
}
public ByteBuf getName() {
return name;
}
public ByteBuf getArgs() {
return args;
}
@Override
public String toString() {
return name.toString(CharsetUtil.UTF_8)+" - "+args.toString(CharsetUtil.UTF_8);
}
}
public static final class CmdDecoder extends LineBasedFrameDecoder{
public CmdDecoder(int maxLength) {
super(maxLength);
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
ByteBuf frame = (ByteBuf)super.decode(ctx, buffer);
if(frame == null) return null;//如果输入中没有帧
int index = frame.indexOf(frame.readerIndex(),frame.writerIndex(),(byte) ' ');
return new Cmd(frame.slice(frame.readerIndex(),index),frame.slice(index+1,frame.writerIndex()));
}
}
public static final class CmdHandler extends SimpleChannelInboundHandler<Cmd>{
@Override
protected void channelRead0(ChannelHandlerContext ctx, Cmd msg) throws Exception {
System.out.println("get: "+msg);
}
}
}
- 使用FileRegion接口的实现
在异步框架中高效的写大块的数据
.例子显示了:如何通过从FileInputStream创建一个DefaultFileRgion.并将其写入channel.从而利用零拷贝特性来传输一个文件内容.
Channel channel = null;//does not block
File file = new File("");
FileInputStream in = new FileInputStream(file);
DefaultFileRegion fileRegion = new DefaultFileRegion(in.getChannel(), 0, file.length());
channel.writeAndFlush(fileRegion).addListener((ChannelFutureListener) f ->{
if(!f.isSuccess()){
f.cause().printStackTrace();
}
});
这个实例只适合于文件内容的直接传输,不包括应用程序对数据的任何处理. 在需要将数据从文件系统复制到用户内存时,可以使用ChunkedWritedHandler,它支持异步写大型数据流,而不会导致大量的内存消耗
UDP: 无连接协议即用户数据报协议(UDP),它通常用在性能至关重要并且能够容忍一定的数据包丢失的情况下,最有名的基于UDP的协议就是 域名服务(DNS),其将完全限定的名称映射为数字的IP地址
- `到目前为止都是一种叫 单播 的传输模式,定义为发送消息给一个由唯一地址所标识的单一网络目的地.
面向连接和无连接协议都支持这种模式.
但是UDP提供了向多个接受者发送消息的额外传输模式 (1)多播----传输到一个预定义的主机组 (2) 广播----传输到网络(或者子网)上的所有主机`