原生的 socket 编程:Java 中的 IO 与 socket 编程 [ 复习 ]
介绍
Netty 项目是一个提供异步事件驱动网络应用框架。
示例
1.discord 服务
handler
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //默默的丢弃接收到的数据 ByteBuf in = (ByteBuf) msg; try{ System.out.println(in.toString(io.netty.util.CharsetUtil.US_ASCII)); } finally{ ReferenceCountUtil.release(msg); //(2) } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); //出现异常时直接关闭链接 ctx.close(); } }
server
public class DiscardServer { private int port; public DiscardServer(int port){ this.port=port; } public void run() { NioEventLoopGroup bossGroup = new NioEventLoopGroup(); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new DiscardServerHandler()); } }) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true); ChannelFuture f = b.bind(port).sync(); f.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } public static void main(String[] args) { int port; if(args.length>0){ port=Integer.parseInt(args[0]); }else{ port=8080; } new DiscardServer(port).run(); } }
使用命令行作为客户端
$ ncat localhost 8080 hello hi hello world
2.echo 服务
handler
public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.write(msg); ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); //出现异常时直接关闭链接 ctx.close(); } }
同样命令行作为客户端
3.time 服务
server handler
public class TimeServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { final ByteBuf time = ctx.alloc().buffer(4); time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L)); ChannelFuture f = ctx.writeAndFlush(time); // f.addListener(ChannelFutureListener.CLOSE); f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { assert f == future; ctx.close(); } }); } @Override public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){ cause.printStackTrace(); ctx.close(); } }
server
public class TimeServer { private final int port; public TimeServer(int port){ this.port=port; } public void run() { NioEventLoopGroup bossGroup = new NioEventLoopGroup(); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeServerHandler()); } }) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true); ChannelFuture f = b.bind(port).sync(); f.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } public static void main(String[] args) { int port; if(args.length>0){ port=Integer.parseInt(args[0]); }else{ port=8080; } new TimeServer(port).run(); } }
client handler
public class TimeClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf m = (ByteBuf) msg; // (1) try { long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L; System.out.println(new Date(currentTimeMillis)); ctx.close(); } finally { m.release(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
decoder
public class TimeDecoder extends ByteToMessageDecoder { // (1) @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2) if (in.readableBytes() < 4) { return; // (3) } out.add(in.readBytes(4)); // (4) } }
client
public class TimeClient { public static void main(String[] args) { String host = "localhost"; int port = 8080; NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(workerGroup); b.channel(NioSocketChannel.class); b.option(ChannelOption.SO_KEEPALIVE, true); b.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler()); } }); //启动客户端 ChannelFuture future = b.connect(host, port).sync(); //在连接关闭之前保持等待 future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { workerGroup.shutdownGracefully(); } } }
233