如下所示,我们写一个简单的Netty Demo,实现客户端与服务端进行通讯。
1、Netty 服务端启动类
/** * (1)、 初始化用于Acceptor的主"线程池"以及用于I/O工作的从"线程池"; * (2)、 初始化ServerBootstrap实例, 此实例是netty服务端应用开发的入口; * (3)、 通过ServerBootstrap的group方法,设置(1)中初始化的主从"线程池"; * (4)、 指定通道channel的类型,由于是服务端,故而是NioServerSocketChannel; * (5)、 设置ServerSocketChannel的处理器 * (6)、 设置子通道也就是SocketChannel的处理器, 其内部是实际业务开发的"主战场" * (8)、 配置子通道也就是SocketChannel的选项 * (9)、 绑定并侦听某个端口 */ public class SimpleNettyServer { public void bind(int port) throws Exception { // 服务器端应用程序使用两个NioEventLoopGroup创建两个EventLoop的组,EventLoop这个相当于一个处理线程,是Netty接收请求和处理IO请求的线程。 // 主线程组, 用于接受客户端的连接,但是不做任何处理,跟老板一样,不做事 EventLoopGroup bossGroup = new NioEventLoopGroup(); // 从线程组, 当boss接受连接并注册被接受的连接到worker时,处理被接受连接的流量。 EventLoopGroup workerGroup = new NioEventLoopGroup(); try { // netty服务器启动类的创建, 辅助工具类,用于服务器通道的一系列配置 ServerBootstrap serverBootstrap = new ServerBootstrap(); /** * 使用了多少线程以及如何将它们映射到创建的通道取决于EventLoopGroup实现,甚至可以通过构造函数进行配置。 * 设置循环线程组,前者用于处理客户端连接事件,后者用于处理网络IO(server使用两个参数这个) * public ServerBootstrap group(EventLoopGroup group) * public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) */ serverBootstrap.group(bossGroup, workerGroup) //绑定两个线程组 // 用于构造socketchannel工厂 .channel(NioServerSocketChannel.class) //指定NIO的模式 /** * @Description: 初始化器,channel注册后,会执行里面的相应的初始化方法,传入自定义客户端Handle(服务端在这里操作) * @Override protected void initChannel(SocketChannel channel) throws Exception { // 通过SocketChannel去获得对应的管道 ChannelPipeline pipeline = channel.pipeline(); // 通过管道,添加handler pipeline.addLast("nettyServerOutBoundHandler", new NettyServerOutBoundHandler()); pipeline.addLast("nettyServerHandler", new NettyServerHandler()); } * 子处理器也可以通过下面的内部方法来实现。 */ .childHandler(new ChannelInitializer<SocketChannel>() { // 子处理器,用于处理workerGroup protected void initChannel(SocketChannel socketChannel) throws Exception { // socketChannel.pipeline().addLast(new NettyServerOutBoundHandler()); socketChannel.pipeline().addLast(new SimpleNettyServerHandler()); } }); // 启动server,绑定端口,开始接收进来的连接,设置8088为启动的端口号,同时启动方式为同步 ChannelFuture channelFuture = serverBootstrap.bind(8088).sync(); System.out.println("server start"); // 监听关闭的channel,等待服务器 socket 关闭 。设置位同步方式 channelFuture.channel().closeFuture().sync(); } finally { //退出线程组 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; new netty.server.SimpleNettyServer().bind(port); } }
2、Netty 服务端处理类Handler
public class SimpleNettyServerHandler extends ChannelInboundHandlerAdapter { /** * 本方法用于读取客户端发送的信息 * * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("SimpleNettyServerHandler.channelRead"); ByteBuf result = (ByteBuf) msg; byte[] bytesMsg = new byte[result.readableBytes()]; // msg中存储的是ByteBuf类型的数据,把数据读取到byte[]中 result.readBytes(bytesMsg); String resultStr = new String(bytesMsg); // 接收并打印客户端的信息 System.out.println("Client said:" + resultStr); // 释放资源,这行很关键 result.release(); // 向客户端发送消息 String response = "hello client!"; // 在当前场景下,发送的数据必须转换成ByteBuf数组 ByteBuf encoded = ctx.alloc().buffer(4 * response.length()); encoded.writeBytes(response.getBytes()); ctx.write(encoded); ctx.flush(); } /** * 本方法用作处理异常 * * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // 当出现异常就关闭连接 cause.printStackTrace(); ctx.close(); } /** * 信息获取完毕后操作 * * @param ctx * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } }
3、Netty 客户端启动类
public class SimpleNettyClient { public void connect(String host, int port) throws Exception { EventLoopGroup worker = new NioEventLoopGroup(); try { // 客户端启动类程序 Bootstrap bootstrap = new Bootstrap(); /** *EventLoop的组 */ bootstrap.group(worker); /** * 用于构造socketchannel工厂 */ bootstrap.channel(NioSocketChannel.class); /**设置选项 * 参数:Socket的标准参数(key,value),可自行百度 保持呼吸,不要断气! * */ bootstrap.option(ChannelOption.SO_KEEPALIVE, true); /** * 自定义客户端Handle(客户端在这里搞事情) */ bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new SimpleNettyClientHandler()); } }); /** 开启客户端监听,连接到远程节点,阻塞等待直到连接完成*/ ChannelFuture channelFuture = bootstrap.connect(host, port).sync(); /**阻塞等待数据,直到channel关闭(客户端关闭)*/ channelFuture.channel().closeFuture().sync(); } finally { worker.shutdownGracefully(); } } public static void main(String[] args) throws Exception { SimpleNettyClient client = new SimpleNettyClient(); client.connect("127.0.0.1", 8088); } }
4、客户端处理类Handler
public class SimpleNettyClientHandler extends ChannelInboundHandlerAdapter { /** * 本方法用于接收服务端发送过来的消息 * * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("SimpleClientHandler.channelRead"); ByteBuf result = (ByteBuf) msg; byte[] result1 = new byte[result.readableBytes()]; result.readBytes(result1); System.out.println("Server said:" + new String(result1)); result.release(); } /** * 本方法用于处理异常 * * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // 当出现异常就关闭连接 cause.printStackTrace(); ctx.close(); } /** * 本方法用于向服务端发送信息 * * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { String msg = "hello Server!"; ByteBuf encoded = ctx.alloc().buffer(4 * msg.length()); encoded.writeBytes(msg.getBytes()); ctx.write(encoded); ctx.flush(); } }
先启动服务端,然后启动客户端,可分别在Console得到以下输出:
服务端:
server start SimpleNettyServerHandler.channelRead Client said:hello Server!
客户端:
SimpleClientHandler.channelRead Server said:hello client!
由此,一个Netty的简单Demo即搭建完成。