一提起netty,一般都能想到NIO,callback。NIO的核心就是epoll,channel,buffer。今天从netty官网上一个简单的例子来入门netty。内容是一个回显服务器,客户端给服务器发送消息,服务器对这些消息进行回复。我相信经过手写这个简单的例子,能对netty一些基本的概念更加熟悉,方便以后的学习。
public class TelnetServer {
public static void main(String[] args) throws CertificateException, SSLException, InterruptedException {
SelfSignedCertificate ssc = new SelfSignedCertificate();
final SslContext sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
EventLoopGroup boss = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(1);
try {
ServerBootstrap b = new ServerBootstrap();
b.group(boss, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new TelnetServerInitializer(sslCtx));
b.bind(8992).sync().channel().closeFuture().sync();
} finally {
boss.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
服务端一般会有bossGroup去专门接收请求,由workerGroup专门处理响应的请求。这里我们指定服务端的channel类型为NioServerSocketChannel,服务端的channel是专门用来处理新的连接的。ServerBootstrap在初始化的时候会添加一个专门用于接收新连接的handler到Pipeline内:ServerBootstrapAcceptor。
提一下Pipeline:channel会有一个handler容器,而Pipeline就是充当了handler容器这个角色,典型的责任链模式,handler会判断能不能处理当前请求,如果不能处理就传递给下游。
我们在服务端的channel的pipeline内再加入一个输出日志的handler:LoggingHandler,用于观察服务端接收新连接。
再注册childHandler,chlldHandler用于处理客户端的请求。netty用channel来抽象端到端的通信方式,处理各种事件就是通过在channel上面增加相应的handler。
最后,我们经常看到sync()这个方法,顾名思义,同步,其实就是自旋到这个异步事件完成。有些异步方法比较重要比如bind、connect,为了避免后面的代码变成异步,就可以使用sync方法在这个地方从异步变成同步。
public class TelnetServerInitializer extends ChannelInitializer<SocketChannel> {
private static final StringDecoder DECODER = new StringDecoder();
private static final StringEncoder ENCODER = new StringEncoder();
private final SslContext sslContext;
public TelnetServerInitializer(SslContext sslContext) {
this.sslContext = sslContext;
}
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(sslContext.newHandler(ch.alloc()))
.addLast(new DelimiterBasedFrameDecoder(10*1024, Delimiters.lineDelimiter()))
.addLast(DECODER)
.addLast(ENCODER)
.addLast(new TelnetServerHandler());
}
}
我们为childChannel的pipeline添加了几个handler,它们的作用分别是
- 为channel提供底层SSL协议支持
- 通过换行符分割消息
- 解码器
- 编码器
- 具体业务处理器(业务层代码,在这一层字节流已经转为我们想要的消息格式了)
public class TelnetServerHandler extends SimpleChannelInboundHandler<String> {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// Send greeting for a new connection.
ctx.write("Welcome to " + InetAddress.getLocalHost().getHostName() + "!\r\n");
ctx.write("It is " + new Date() + " now.\r\n");
ctx.flush();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
String response;
boolean close = false;
if (msg.isEmpty()) response = "Please type something.\n";
else if ("bye".equals(msg.toLowerCase())) {
response = "Have a good day!\n";
close = true;
} else response = "Did you say '" + msg + "'?\r\n";
ChannelFuture f = ctx.write(response);
if (close) f.addListener(ChannelFutureListener.CLOSE);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
为了完整,贴一下客户端的代码。
public class TelnetClient {
public static void main(String[] args) throws IOException, InterruptedException {
SslContext sslContext = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
EventLoopGroup group = new NioEventLoopGroup(1);
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new TelnetClientInitializer(sslContext));
Channel channel = b.connect(InetAddress.getLocalHost(), 8992).channel();
BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
ChannelFuture lastWriteFuture = null;
for (; ; ) {
String line = in.readLine();
if (line == null) break;
lastWriteFuture = channel.writeAndFlush(line + "\n");
if ("bye".equals(line)) {
channel.closeFuture().sync();
break;
}
}
if (lastWriteFuture != null) {
lastWriteFuture.sync();
}
} finally {
group.shutdownGracefully();
}
}
}
public class TelnetClientInitializer extends ChannelInitializer<SocketChannel> {
private static final StringEncoder ENCODER = new StringEncoder();
private static final StringDecoder DECODER = new StringDecoder();
private final SslContext sslContext;
public TelnetClientInitializer(SslContext sslContext) {
this.sslContext = sslContext;
}
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(sslContext.newHandler(ch.alloc()))
.addLast(new DelimiterBasedFrameDecoder(10*1024, Delimiters.lineDelimiter()))
.addLast(DECODER)
.addLast(ENCODER)
.addLast(new TelnetClientHandler());
}
}
@ChannelHandler.Sharable
public class TelnetClientHandler extends SimpleChannelInboundHandler<String> {
private final Logger logger = Logger.getAnonymousLogger();
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
logger.info(msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
通过这个例子我大概能明白server和client是怎么跑起来的,怎么处理I/IO事件的。
server:
- 注册handler到主channel(新连接建立),childChannel(客户端请求)
- 通过seletor轮询I/O事件,一个seletor可以绑定多个channel,实现I/O多路复用,底层用的是epoll。
- eventloop负责轮询I/O事件,I/O事件进入pipeline进行处理。
这些事情用JDK的NIO也能做到,但是Netty做了一些封装和优化,使得NIO服务器变得容易 实现。而且利用Netty,我们还可以轻松的自定义协议,其实协议本质就是从字节流转换成消息。