Client code (excerpt):
// Worker thread group
EventLoopGroup worker = new NioEventLoopGroup();
// Helper class for setting up the client
Bootstrap b = new Bootstrap();
// Configure the client bootstrap
b.group(worker)
 .channel(NioSocketChannel.class)
 .handler(new ChannelInitializer<SocketChannel>() {
     @Override
     protected void initChannel(SocketChannel sc) throws Exception {
         sc.pipeline().addLast(new ClientHandler());
     }
 });
ClientHandler code (excerpt):
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    try {
        // Copy the readable bytes out of the buffer and decode them as UTF-8
        ByteBuf buf = (ByteBuf) msg;
        byte[] bytes = new byte[buf.readableBytes()];
        buf.readBytes(bytes);
        String result = new String(bytes, "utf-8");
        System.out.println("Server: " + result);
    } finally {
        // Release the message once it has been handled
        ReferenceCountUtil.release(msg);
    }
}
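The read pattern in the handler above (size an array from the readable byte count, copy the bytes out, decode as UTF-8) can be tried without Netty. The following is only a minimal sketch that uses the JDK's java.nio.ByteBuffer as a stand-in for ByteBuf: remaining() plays the role of readableBytes() and get(byte[]) the role of readBytes(byte[]). The class name DecodeSketch and the method decode are hypothetical and not part of the original code. It also uses StandardCharsets.UTF_8 instead of the "utf-8" string, which avoids the checked UnsupportedEncodingException.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the handler's decoding step, using JDK classes only.
public class DecodeSketch {

    // Copies all remaining bytes out of the buffer and decodes them as UTF-8.
    static String decode(ByteBuffer buf) {
        byte[] bytes = new byte[buf.remaining()]; // analogous to buf.readableBytes()
        buf.get(bytes);                           // analogous to buf.readBytes(bytes)
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.wrap("888888".getBytes(StandardCharsets.UTF_8));
        System.out.println("Server: " + decode(buf));
    }
}
```

As with ByteBuf.readBytes, the copy advances the buffer's read position, so a second call to decode on the same buffer would return an empty string.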
The complete code follows.
Client:
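import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

// Class name assumed; the original shows only the main method
public class Client {
    public static void main(String[] args) throws InterruptedException {
        // Worker thread group
        EventLoopGroup worker = new NioEventLoopGroup();
        // Helper class for setting up the client
        Bootstrap b = new Bootstrap();
        // Configure the client bootstrap
        b.group(worker)
         .channel(NioSocketChannel.class)
         .handler(new ChannelInitializer<SocketChannel>() {
             @Override
             protected void initChannel(SocketChannel sc) throws Exception {
                 // No codec is added; data travels as raw ByteBuf
                 sc.pipeline().addLast(new ClientHandler());
             }
         });
        // Connect to the server and wait until the connection is established
        ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();
        cf.channel().writeAndFlush(Unpooled.copiedBuffer("hello netty!!".getBytes()));
        // Thread.sleep(1000);
        // cf.channel().writeAndFlush(Unpooled.copiedBuffer("hello netty!!".getBytes()));
        // Thread.sleep(1000);
        // cf.channel().writeAndFlush(Unpooled.copiedBuffer("hello netty!!".getBytes()));
        // Sending is done; close the connection
        cf.addListener(ChannelFutureListener.CLOSE);
        // Block until the channel is closed, then shut down the thread group
        cf.channel().closeFuture().sync();
        worker.shutdownGracefully();
    }
}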
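ClientHandler code:
The handler must extend the ChannelHandlerAdapter class. (This matches the Netty 5.0 alpha API; in Netty 4.x the equivalent base class for inbound handlers is ChannelInboundHandlerAdapter.)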
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;

public class ClientHandler extends ChannelHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            // Handle the data in its raw ByteBuf form
            ByteBuf buf = (ByteBuf) msg;
            byte[] bytes = new byte[buf.readableBytes()];
            buf.readBytes(bytes);
            String result = new String(bytes, "utf-8");
            System.out.println("Server: " + result);
        } finally {
            // Release the message once it has been received and handled
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Log the error and close the connection
        cause.printStackTrace();
        ctx.close();
    }
}
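The try/finally in ClientHandler exists because Netty buffers are reference-counted: a handler that consumes a message is expected to release it, and the buffer can be reclaimed once the count reaches zero. The following is a toy model of that idea using only the JDK, not Netty's implementation. RefCountSketch, CountedBuffer, and handle are hypothetical names introduced for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;

// Toy model of reference counting; NOT how Netty implements it internally.
public class RefCountSketch {

    // A buffer that starts with a reference count of 1.
    static final class CountedBuffer {
        private final AtomicInteger refCnt = new AtomicInteger(1);
        private final byte[] data;

        CountedBuffer(byte[] data) {
            this.data = data;
        }

        int refCnt() {
            return refCnt.get();
        }

        // Access is refused once the buffer has been fully released.
        byte[] data() {
            if (refCnt.get() <= 0) {
                throw new IllegalStateException("buffer already released");
            }
            return data;
        }

        // Decrements the count; returns true when the count reaches zero.
        boolean release() {
            int remaining = refCnt.decrementAndGet();
            if (remaining < 0) {
                throw new IllegalStateException("released too many times");
            }
            return remaining == 0;
        }
    }

    // Mirrors the handler: decode inside try, always release in finally.
    static String handle(CountedBuffer msg) {
        try {
            return new String(msg.data(), StandardCharsets.UTF_8);
        } finally {
            msg.release();
        }
    }

    public static void main(String[] args) {
        CountedBuffer msg = new CountedBuffer("888888".getBytes(StandardCharsets.UTF_8));
        System.out.println("Server: " + handle(msg));
        System.out.println("refCnt after handling: " + msg.refCnt());
    }
}
```

Putting the release in finally means the count is decremented even if decoding throws, which is the same reason the handler wraps its logic in try/finally.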
Server code:
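import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

// Class name assumed; the original shows only the main method
public class Server {
    public static void main(String[] args) throws InterruptedException {
        // First thread group: accepts connections from clients
        EventLoopGroup boss = new NioEventLoopGroup();
        // Second thread group: handles the logic for accepted connections
        EventLoopGroup worker = new NioEventLoopGroup();
        // Helper class that configures and starts the server
        ServerBootstrap b = new ServerBootstrap();
        b.group(boss, worker)
         .channel(NioServerSocketChannel.class)
         .childHandler(new ChannelInitializer<SocketChannel>() {
             @Override
             protected void initChannel(SocketChannel sc) throws Exception {
                 sc.pipeline().addLast(new ServerHandler());
             }
         });
        // Bind to the given port and start listening
        ChannelFuture cf = b.bind(8765).sync();
        // Block until the server channel is closed, then shut down both groups
        cf.channel().closeFuture().sync();
        boss.shutdownGracefully();
        worker.shutdownGracefully();
    }
}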
ServerHandler code:
It must also extend the ChannelHandlerAdapter class.
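import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;

public class ServerHandler extends ChannelHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            // Copy the readable bytes out of the buffer and decode them as UTF-8
            ByteBuf buf = (ByteBuf) msg;
            byte[] bs = new byte[buf.readableBytes()];
            buf.readBytes(bs);
            String result = new String(bs, "utf-8");
            System.out.println("Client: " + result);
            // Reply to the client with a fixed response
            String response = "888888";
            ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));
            // .addListener(ChannelFutureListener.CLOSE);
        } finally {
            // Release the inbound message, as ClientHandler does; the original omitted this
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Log the error and close the connection
        cause.printStackTrace();
        ctx.close();
    }
}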