netty: 以默认的ByteBuf作为传输数据

client部分代码:

//线程
		EventLoopGroup worker = new NioEventLoopGroup();
		//辅助类
		Bootstrap b = new Bootstrap();
		//注册server
		b.group(worker)
		.channel(NioSocketChannel.class)
		.handler(new ChannelInitializer<SocketChannel>() {

			@Override
			protected void initChannel(SocketChannel sc) throws Exception {
				// TODO Auto-generated method stub
				sc.pipeline().addLast(new ClientHandler());
			}
		});

  

clientHandler部分代码:

@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		// TODO Auto-generated method stub
		try {
			ByteBuf buf = (ByteBuf)msg;
			byte[] bytes = new byte[buf.readableBytes()];
			buf.readBytes(bytes);
			String result = new String(bytes, "utf-8");
			System.out.println("Server: " + result);
		}finally {
			ReferenceCountUtil.release(msg);
		}
		
	}

  

 

 

下面查看完整代码 :

client:

public static void main(String[] args) throws InterruptedException {
		
		//线程
		EventLoopGroup worker = new NioEventLoopGroup();
		//辅助类
		Bootstrap b = new Bootstrap();
		//注册server
		b.group(worker)
		.channel(NioSocketChannel.class)
		.handler(new ChannelInitializer<SocketChannel>() {

			@Override
			protected void initChannel(SocketChannel sc) throws Exception {
				// TODO Auto-generated method stub
                               //不做任何处理,ByteBuf格式传输
				sc.pipeline().addLast(new ClientHandler());
			}
		});
		
		ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();
		
		cf.channel().writeAndFlush(Unpooled.copiedBuffer("hello netty!!".getBytes()));
//		Thread.sleep(1000);
//		cf.channel().writeAndFlush(Unpooled.copiedBuffer("hello netty!!".getBytes()));
//		Thread.sleep(1000);
//		cf.channel().writeAndFlush(Unpooled.copiedBuffer("hello netty!!".getBytes()));
		//发送完毕,断开连接
		cf.addListener(ChannelFutureListener.CLOSE);
		
		cf.channel().closeFuture().sync();
		worker.shutdownGracefully();
		
	}
	

  

 

clientHandler代码:

需要继承:ChannelHandlerAdapter这个类

public class ClientHandler extends ChannelHandlerAdapter {

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		// TODO Auto-generated method stub
		try {

                        //原始ByteBuf数据格式处理
			ByteBuf buf = (ByteBuf)msg;
			byte[] bytes = new byte[buf.readableBytes()];
			buf.readBytes(bytes);
			String result = new String(bytes, "utf-8");
			System.out.println("Server: " + result);
		}finally {

                       //接收处理完后,丢弃
			ReferenceCountUtil.release(msg);
		}
		
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		// TODO Auto-generated method stub
		cause.printStackTrace();
		ctx.close();
	}

	
}    

  

 

Server代码:

public static void main(String[] args) throws InterruptedException {
		
		//第一个线程连接client端
		EventLoopGroup boss = new NioEventLoopGroup();
		//第二个线程处理逻辑
		EventLoopGroup worker = new NioEventLoopGroup();
		//辅助类,注册 server
		ServerBootstrap b = new ServerBootstrap();
		b.group(boss, worker)
		.channel(NioServerSocketChannel.class)
		.childHandler(new ChannelInitializer<SocketChannel>() {

			@Override
			protected void initChannel(SocketChannel sc) throws Exception {
				// TODO Auto-generated method stub
				sc.pipeline().addLast(new ServerHandler());
			}
		});
		
		//绑定指定的端口方便监听
		ChannelFuture cf = b.bind(8765).sync();
		cf.channel().closeFuture().sync();
		
		boss.shutdownGracefully();
		worker.shutdownGracefully();
		
	}

  

 

serverHandler代码:

需要继承:ChannelHandlerAdapter 类

public class ServerHandler extends ChannelHandlerAdapter {

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		// TODO Auto-generated method stub
		ByteBuf buf = (ByteBuf)msg;
		byte[] bs = new byte[buf.readableBytes()];
		buf.readBytes(bs);
		String result = new String(bs, "utf-8");
		System.out.println("Client: " + result);
		
		String response = "888888";
		ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));
		//.addListener(ChannelFutureListener.CLOSE);
		
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		// TODO Auto-generated method stub
		cause.printStackTrace();
		ctx.close();
	}

	

	
}

  

 

上一篇:netty使用EmbeddedChannel对channel的出入站进行单元测试


下一篇:Netty实战之使用Netty解析交通部JT808协议