先啰嗦两句,使用 netty 来搭建服务器程序,可以发现相比于传统的 nio 程序, netty 的代码更加简洁,开发难度更低,扩展性也很好,非常适合作为基础通信框架.
下面上代码:
Server
package time.server.impl; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel; /**
* TODO
*
* @description
* @author mjorcen
* @time 2015年5月25日 下午2:50:57
*/
public class NTimeServerImpl { public void bind(int port) {
// 创建两个NioEventLoopGroup 实例,NioEventLoopGroup
// 是一个线程组,它包含一组NIO线程,专门用于处理网络事件的处理,实际上他们就是Reactor 线程组
// 这里创建两个的原因是一个用于服务端接收用户的链接,另一个用于进行SocketChannel的网络读写
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup(); try {
// 创建一个 ServerBootstrap ,它是netty用于NIO服务端的辅助启动类,目的是降低服务端的开发复杂度.
ServerBootstrap bootstrap = new ServerBootstrap();
// 设定 服务端接收用户请求的线程组和用于进行SocketChannel网络读写的线程组
bootstrap.group(bossGroup, workerGroup);
// 设置创建的 channel 类型
bootstrap.channel(NioServerSocketChannel.class);
// 配置 NioServerSocketChannel 的 tcp 参数, BACKLOG 的大小
bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
// 绑定io处理类(childChannelHandler).他的作用类似于 reactor 模式中的 handler
// 类,主要用于处理网络 I/O 事件,例如对记录日志,对消息进行解码等.
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeServerHandler());
}
});
// 绑定端口,随后调用它的同步阻塞方法 sync 等等绑定操作成功,完成之后 Netty 会返回一个 ChannelFuture
// 它的功能类似于的 Future,主要用于异步操作的通知回调.
ChannelFuture channelFuture = bootstrap.bind(port).sync();
// 等待服务端监听端口关闭,调用 sync 方法进行阻塞,等待服务端链路关闭之后 main 函数才退出.
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 优雅的退出,释放线程池资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
} public static void main(String[] args) {
NTimeServerImpl server = new NTimeServerImpl();
server.bind(9091);
} }
Server Handler
package time.server.impl; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext; import java.util.Date; import time.TimeConfig; /**
* TODO
*
* @description
* @author ez
* @time 2015年5月25日 下午3:06:09
*/
public class TimeServerHandler extends ChannelHandlerAdapter implements
TimeConfig { /*
* (non-Javadoc)
*
* @see io.netty.channel.ChannelHandlerAdapter#channelRead(io.netty.channel.
* ChannelHandlerContext, java.lang.Object)
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "utf-8");
System.out.println("The time server receive order : " + body);
String currentTime = QUERY.equalsIgnoreCase(body) ? new Date()
.toString() : "BAD ORDER";
System.out.println("currentTime : " + currentTime);
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes("utf-8"));
ctx.writeAndFlush(resp);
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
// 当出现异常时,释放资源.
ctx.close();
} @Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
} }
client
package time.client.impl; import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel; /**
* TODO
*
* @description
* @author ez
* @time 2015年5月25日 下午3:17:29
*/
public class NTimeClient { public void connect(int port, String host) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try { Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group);
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.TCP_NODELAY, true);
bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeClientHandler());
}
}); // 发起异步链接操作
ChannelFuture future = bootstrap.connect(host, port).sync(); // 等待客户端链路关闭
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
} public static void main(String[] args) throws Exception {
NTimeClient client = new NTimeClient();
client.connect(9091, "localhost");
}
}
client handler
package time.client.impl; import java.io.UnsupportedEncodingException; import time.TimeConfig;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext; /**
* TODO
*
* @description
* @author ez
* @time 2015年5月25日 下午3:21:26
*/
public class TimeClientHandler extends ChannelHandlerAdapter implements
TimeConfig {
private final ByteBuf message; public TimeClientHandler() throws UnsupportedEncodingException {
byte[] bs = QUERY.getBytes("utf-8");
message = Unpooled.buffer(bs.length);
message.writeBytes(bs);
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
cause.printStackTrace();
// 异常时,调用这个方法
ctx.close();
} @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 当客户端和服务器 TCP 链路建立成功之后,调用这个方法.
ctx.writeAndFlush(message);
} @Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
// 当服务端返回应答消息时,调用这个方法.
ByteBuf buf = (ByteBuf) msg;
byte[] bs = new byte[buf.readableBytes()];
buf.readBytes(bs);
System.out.println(new String(bs, "utf-8"));
} }