1 概述
Netty中的ChannelPipeline类似于servlet,chanelHandler类似于filter。这类拦截器就是职责链设计模式,主要是事件拦截和用户业务逻辑定制。演示代码采用的是netty 3.10.5版本。调试步骤和示例代码如下:
步骤1 下载完成后导入为maven项目。步骤2 需要测试的项目在configure build path时不要直接导入netty3的jar包,直接导入project项目。
步骤3 对关注的方法打断点。
步骤4 跟踪调用链。
import java.net.InetSocketAddress; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.jboss.netty.bootstrap.ServerBootstrap; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; import org.jboss.netty.handler.codec.string.StringDecoder; /** * netty服务端入门 */ public class Server { public static void main(String[] args) { // 服务类 ServerBootstrap bootstrap = new ServerBootstrap(); // boss线程监听端口,worker线程负责数据读写 ExecutorService boss = Executors.newCachedThreadPool(); ExecutorService worker = Executors.newCachedThreadPool(); // 设置niosocket工厂 bootstrap.setFactory(new NioServerSocketChannelFactory(boss, worker)); // 设置管道的工厂 bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipeline = Channels.pipeline(); pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("xyHelloHandler", new HelloHandler()); return pipeline; } }); bootstrap.bind(new InetSocketAddress(10101)); System.out.println("server3 start!!!"); } } import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelHandler; /** * 消息接受处理类 */ public class HelloHandler extends SimpleChannelHandler { // 接收消息 @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { String messageReceived = (String) e.getMessage(); System.out.println(messageReceived); super.messageReceived(ctx, e); } // 捕获异常 @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { System.out.println("exceptionCaught"); super.exceptionCaught(ctx, e); } // 新连接 @Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { System.out.println("channelConnected"); super.channelConnected(ctx, e); } // 必须是链接已经建立,关闭通道的时候才会触发 @Override public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { System.out.println("channelDisconnected"); super.channelDisconnected(ctx, e); } // channel关闭的时候触发 @Override public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { System.out.println("channelClosed"); super.channelClosed(ctx, e); } }
handler是通过pipeline的addLast方法添加的,那首先将断点定位该处。
bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipeline = Channels.pipeline(); pipeline.addLast("decode", new StringDecoder()); pipeline.addLast("xyHelloHandler", new HelloHandler()); return pipeline; } });
2 server启动
在示例代码中,Debug方式运行Server,耐心一步步跟着断点往下走,注意查看调用链。
A DefaultChannelPileline@addLast
断点如约而至,从调用链中可看出入口是ServerBootStrap的bindAsync方法。
public ChannelFuture bindAsync(final SocketAddress localAddress) { if (localAddress == null) { throw new NullPointerException("localAddress"); } Binder binder = new Binder(localAddress); ChannelHandler parentHandler = getParentHandler(); ChannelPipeline bossPipeline = pipeline(); bossPipeline.addLast("binder", binder); if (parentHandler != null) { bossPipeline.addLast("userHandler", parentHandler); } }
bossPipeline.addLast("binder", binder)赋值了一个名为binder的处理器。继续跟进addLast方法。
private final Map<String, DefaultChannelHandlerContext> name2ctx =new HashMap<String, DefaultChannelHandlerContext>(4); public synchronized void addLast(String name, ChannelHandler handler) { if (name2ctx.isEmpty()) { init(name, handler); } else { checkDuplicateName(name); DefaultChannelHandlerContext oldTail = tail; DefaultChannelHandlerContext newTail = new DefaultChannelHandlerContext(oldTail, null, name, handler); callBeforeAdd(newTail); oldTail.next = newTail; tail = newTail; name2ctx.put(name, newTail); callAfterAdd(newTail); } }B DefaultChannelPileline@init
name2ctx是保存handlerContext的集合对象,此时是空的。断点走进init方法,逐行分析方法内的这几行代码。
private void init(String name, ChannelHandler handler) { DefaultChannelHandlerContext ctx = new DefaultChannelHandlerContext(null, null, name, handler); callBeforeAdd(ctx); head = tail = ctx; name2ctx.clear(); name2ctx.put(name, ctx); callAfterAdd(ctx); }
第1行 声明了DefaultChannelHandlerContext对象,传入name值是binder,handler是名为binder的处理器,这两个值初始化的地方就在上文看到的调用链的BootStrap的bind方法中。先看DefaultChannelHandlerContext该类使用到的属性。很明显这是链表的数据结构,prev指向上个对象,next指向下个对象。
private final class DefaultChannelHandlerContext implements ChannelHandlerContext { volatile DefaultChannelHandlerContext next; volatile DefaultChannelHandlerContext prev; private final String name; private final ChannelHandler handler; }
第2行 binder并不是LifeCycleAwareChannelHandler类型,直接return掉。
private static void callBeforeAdd(ChannelHandlerContext ctx) { if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) { return; }
第3行 将链表中的第1个head和最后一个tail都赋值给了第1行中声明的对象。
private volatile DefaultChannelHandlerContext head; private volatile DefaultChannelHandlerContext tail;
第4行 清除Map对象中的值。
第5行 将第1行声明的对象赋值给map集合。
第6行 binder并不是LifeCycleAwareChannelHandler类型,直接return掉。第一个需要关注的断点到此结束。
private void callAfterAdd(ChannelHandlerContext ctx) { if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) { return; } }
3 client启动
利用telnet启动一个客户端。telnet 127.0.0.1 10101。这次设置断点在位置Server类中的Channels.pipeline()位置
A channels.pipeline()
新生成了一个ChannelPipeline对象。从调用链可看出【1】boss线程池负责接收连接。【2】上游是NioServerBoss@registerAcceptedChannel。在建立连接时,该ChannelPipeline的所有handler将被设置完成。
private static void registerAcceptedChannel(NioServerSocketChannel parent, SocketChannel acceptedSocket,Thread currentThread) { try { ChannelSink sink = parent.getPipeline().getSink(); ChannelPipeline pipeline =parent.getConfig().getPipelineFactory().getPipeline(); } }B pipeline.addLast(decoder)
添加StringDecoder这个处理器,流程和4.2中的所有步骤一致,都是走init方法,同样在callBeforeAdd和callAfterAdd中return了。唯一不同的是name叫做decoder。
C pipeline.addLast(xyHelloHandler)
name2ctx不再empty,进入else分支。
checkDuplicateName检查是否有重名,有重名则直接抛出异常。
oldTail表示原来尾对象,新申明的xyHelloHandler将被作为新尾对象。再把原oldTail的next指向xyHelloHandler,再放入name2ctx集合。
D AbstractNioSelector@select
继续跟着断点走,会走到AbstractNioSelector的select方法,select会阻塞,等待下一次事件。
4 client发送信息
在telnet窗口按下ctrl+] 会进入telnet的发送窗口。现在关注以下2个方法的调用链,StringDecoder@decode、HelloHandler@messageReceived断点到这两个方法。
A StringDecoder@decode
从NioWorker的read方法开始,执行到DefaultChannelPipeline的sendUpstream,首先执行该pipeline的head的handler,而这个管道的head handler是StringDecoder。
public void sendUpstream(ChannelEvent e) { DefaultChannelHandlerContext head = getActualUpstreamContext(this.head); if (head == null) { if (logger.isWarnEnabled()) { logger.warn( "The pipeline contains no upstream handlers; discarding: " + e); } return; } sendUpstream(head, e); }
进入sendUpstream(head, e)方法,会先调用StringDecoder父类OnetoOneDecoder的handleUpStream()方法。
public void handleUpstream( ChannelHandlerContext ctx, ChannelEvent evt) throws Exception { if (!(evt instanceof MessageEvent)) { ctx.sendUpstream(evt); return; } MessageEvent e = (MessageEvent) evt; Object originalMessage = e.getMessage(); Object decodedMessage = decode(ctx, e.getChannel(), originalMessage); if (originalMessage == decodedMessage) { ctx.sendUpstream(evt); } else if (decodedMessage != null) { fireMessageReceived(ctx, decodedMessage, e.getRemoteAddress()); } }
父类方法调用子类的decode方法,如此这般StringDecoder的decode方法就被调用到了。
B HelloHandler@messageReceived
注意上述方法中还调用fireMessageReceived(ctx, decodedMessage, e.getRemoteAddress())。一层一层进入该方法,最终会执行到DefaultChannelPipeline的内部类DefaultChannelHandlerContext的sendUpstream方法
public void sendUpstream(ChannelEvent e) { DefaultChannelHandlerContext next = getActualUpstreamContext(this.next); if (next != null) { DefaultChannelPipeline.this.sendUpstream(next, e); } }next相当于获取了head的下一个handler对象即HelloHanlder。当然helloHandler中也有所谓的链式调用,继续调用fireMessageReceived,若它的next还指向其他handler则会继续调用。
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { String messageReceived = (String) e.getMessage(); System.out.println(messageReceived); super.messageReceived(ctx, e); }