一、Server端的编程模型
- 示例代码1
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>(){
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new TimeServerHandler());
}
});
b.bind(PORT).sync().channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
由ServerBootstrap作为入口,用链式编程方式,分别调用group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100).handler().
childHander()方法相当重要的方法,开发人员入手的地方,采用面向接口的方式,让开发人实现ChannelInitializer接口,实现initChannel(SocketChannel ch)方法,把自己的Handler通过channelPipeline().addLast(new TimeServerHandler())来实现事件处理。
- 示例代码2
@Sharable
public class TimeServerHandler extends ChannelInboundHandlerAdapter { @Override
public void channelActive(final ChannelHandlerContext ctx) {
final ByteBuf time = ctx.alloc().buffer(4); // (2)
time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L)); final ChannelFuture f = ctx.writeAndFlush(time); // (3)
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
assert f == future;
ctx.close();
}
}); // (4)
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
二、client编程模型
- 代码示例3
public class TimeClient { public static void main(String[] args) throws Exception {
new TimeClient().bind();
} private void bind() throws InterruptedException{
String host = "localhost";
int port = Integer.parseInt("8080");
EventLoopGroup workerGroup = new NioEventLoopGroup(); try {
Bootstrap b = new Bootstrap(); // (1)
b.group(workerGroup); // (2)
b.channel(NioSocketChannel.class); // (3)
b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
b.handler(new channelHandler()); // 启动客户端
ChannelFuture f = b.connect(host, port).sync(); // (5) // 等待连接关闭
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
} private class channelHandler extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeClientHandler());
}
}
}
public class TimeClientHandler extends ChannelInboundHandlerAdapter { private static final SimpleDateFormat sdf = new SimpleDateFormat("yy-MM-dd HH:mm:ss");
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg; // (1)
try {
long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(sdf.format(new Date(currentTimeMillis)));
ctx.close();
} finally {
m.release();
}
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}