相关知识点:
[1] ChannelGroup是一个容纳打开的通道实例的线程安全的集合,方便我们统一施加操作。所以在使用的过程中可以将一些相关的Channel归类为一个有意义的集合,关闭的通道会自动从集合中移除,而且一个Channel可以属于多个ChannelGroup。常见的应用场景是 向一组通道广播消息;简化一组通道的关闭流程。
[2] 因为在Channel中流通的是ChannelBuffer (可以联系NIO ByteBuffer进行理解),而在内部流水线中处理的可能是一个个对象,所以在收到或要发送到对端的时候就需要一个解码,编码的过程:在实际有意义的对象和字节数组间转换。
[3] 在客户端和服务器端流水线中handler执行顺序是不同的,在客户端中执行顺序和ChannelPipeline的增加顺序一致;而服务器端则恰好是相反的,这一点要注意,所以看到编解码类的位置是那样的。
具体代码:
TimeClient.java
import java.net.InetSocketAddress; import java.util.concurrent.Executors; import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.channel.ChannelFactory; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; public class TimeClient { public static void main(String[] args) { String host = "localhost"; int port = 8080; if(args.length == 2){ host = args[0]; port = Integer.parseInt(args[1]); } ChannelFactory factory = new NioClientSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); ClientBootstrap bootstrap = new ClientBootstrap(factory); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() throws Exception { return Channels.pipeline(new TimeDecoder2(), new TimeClientHandler4()); } }); ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port)); // future.awaitUninterruptibly(); if(!future.isSuccess()){ future.getCause().printStackTrace(); } future.getChannel().getCloseFuture().awaitUninterruptibly(); factory.releaseExternalResources(); } }
import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.handler.codec.frame.FrameDecoder; public class TimeDecoder2 extends FrameDecoder{ @Override protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception { if(buffer.readableBytes() < 4) return null; return new UnixTime(buffer.readInt()); } }
TimeClientHandler4.java
import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelHandler; public class TimeClientHandler4 extends SimpleChannelHandler{ @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { //the decoder put UnixTime obj,so get just what we need UnixTime cur = (UnixTime)e.getMessage(); System.out.println(cur); e.getChannel().close(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { e.getCause().printStackTrace(); Channel c = e.getChannel(); c.close(); } }
TimeServer.java
import java.net.InetSocketAddress; import java.util.concurrent.Executors; import org.jboss.netty.bootstrap.ServerBootstrap; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFactory; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.group.ChannelGroup; import org.jboss.netty.channel.group.ChannelGroupFuture; import org.jboss.netty.channel.group.DefaultChannelGroup; import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; public class TimeServer { //[1] public static ChannelGroup allChannels = new DefaultChannelGroup("time-server"); public static void main(String[] args) { ChannelFactory factory = new NioServerSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); ServerBootstrap bootstrap = new ServerBootstrap(factory); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() throws Exception { return Channels.pipeline(new TimeServerHandler2(), new TimeEncoder()); } }); bootstrap.setOption("reuseAddr", true); bootstrap.setOption("child.tcpNoDelay", true); bootstrap.setOption("child.keepAlive", true); Channel channel = bootstrap.bind(new InetSocketAddress(8080)); allChannels.add(channel); //waitForShutdownCommand(); this is a imaginary logic:for instance //when there is accepted connection we close this server ; if(allChannels.size() >=2){ ChannelGroupFuture f = allChannels.close(); f.awaitUninterruptibly(); factory.releaseExternalResources(); } } }
TimeServerHandler2.java
import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFutureListener; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.SimpleChannelHandler; public class TimeServerHandler2 extends SimpleChannelHandler{ @Override public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { TimeServer.allChannels.add(e.getChannel()); } @Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { UnixTime time = new UnixTime((int) (System.currentTimeMillis() / 1000)); ChannelFuture f = e.getChannel().write(time); f.addListener(ChannelFutureListener.CLOSE); } @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { e.getCause().printStackTrace(); Channel c = e.getChannel(); c.close(); } }
TimeEncoder.java
import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelHandler; public class TimeEncoder extends SimpleChannelHandler{ @Override public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception { UnixTime time = (UnixTime)e.getMessage(); ChannelBuffer buf = ChannelBuffers.buffer(4); buf.writeInt(time.getValue()); Channels.write(ctx, e.getFuture(), buf); } }
UnixTime.java
import java.util.Date; public class UnixTime { private int value; public UnixTime(int value){ this.value = value; } public int getValue(){ return this.value; } public void setValue(int value){ this.value = value; } @Override public String toString() { //Allocates a Date object and initializes it to represent the specified number //of milliseconds since the standard base time known as "the epoch", //namely January 1, 1970, 00:00:00 GMT return new Date(value * 1000L).toString(); } }