Netty3.5.9源码(一)Server端启动

ServerBootstrap创建

1,构造一个NioServerSocketChannelFactory来,初始化ServerBootstrap

2,构造一个ChannelPipelineFactory,给ServerBootstrap设置ChannelPipelineFactory

3, 绑定端口,接受client端连接过来的请求。

 public void run() {
        // Configure the server.
        ServerBootstrap bootstrap = new ServerBootstrap(
                new NioServerSocketChannelFactory(
                        Executors.newCachedThreadPool(),
                        Executors.newCachedThreadPool()));

        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() throws Exception {
                return Channels.pipeline(new EchoServerHandler());
            }
        });

        // Bind and start to accept incoming connections.
        bootstrap.bind(new InetSocketAddress(port));
    }

    public static void main(String[] args) throws Exception {
        ...
        new EchoServer(port).run();
    }

构造NioServerSocketChannelFactory,构造NioWorkerPool线程池,启动work线程

这里采用Reactor模式,bossExecutor用来接收客户端连接,而workerExecutor用来执行IO

的read,write等操作,这里还有个大侠NioServerSocketPipelineSink

    new NioServerSocketChannelFactory(
                            Executors.newCachedThreadPool(),
                            Executors.newCachedThreadPool())

    public NioServerSocketChannelFactory(
            Executor bossExecutor, Executor workerExecutor) {
        this(bossExecutor, workerExecutor, SelectorUtil.DEFAULT_IO_THREADS);
    }
    public NioServerSocketChannelFactory(
            Executor bossExecutor, Executor workerExecutor,
            int workerCount) {
        this(bossExecutor, new NioWorkerPool(workerExecutor, workerCount));
    }
        public NioServerSocketChannelFactory(
            Executor bossExecutor, WorkerPool<NioWorker> workerPool) {
        if (bossExecutor == null) {
            throw new NullPointerException("bossExecutor");
        }
        if (workerPool == null) {
            throw new NullPointerException("workerPool");
        }

        this.bossExecutor = bossExecutor;
        this.workerPool = workerPool;
        sink = new NioServerSocketPipelineSink(workerPool);
    }

将workerPool绑定到NioServerSocketPipelineSink

    NioServerSocketPipelineSink(WorkerPool<NioWorker> workerPool) {
        this.workerPool = workerPool;
    }

NioWorkerPool

    public NioServerSocketChannelFactory(
            Executor bossExecutor, Executor workerExecutor,
            int workerCount) {
        this(bossExecutor, new NioWorkerPool(workerExecutor, workerCount));
    }

NioWorkerPool线程池的构造,创建workerCount个NioWorker,默认为CPU的2倍数量

    public NioWorkerPool(Executor workerExecutor, int workerCount) {
        super(workerExecutor, workerCount);
    }
        AbstractNioWorkerPool(Executor workerExecutor, int workerCount) {
        if (workerExecutor == null) {
            throw new NullPointerException("workerExecutor");
        }
        if (workerCount <= 0) {
            throw new IllegalArgumentException(
                    "workerCount (" + workerCount + ") " +
                    "must be a positive integer.");
        }
        workers = new AbstractNioWorker[workerCount];

        for (int i = 0; i < workers.length; i++) {
            workers[i] = createWorker(workerExecutor);
        }
        this.workerExecutor = workerExecutor;
    }

NioWorker

    protected NioWorker createWorker(Executor executor) {
        return new NioWorker(executor);
    }
        public NioWorker(Executor executor) {
        super(executor);
    }

这个super是AbstractNioWorker,打开Selector

    AbstractNioWorker(Executor executor) {
        this.executor = executor;
        openSelector();
    }

打开Selector,启动worker thread

    private void openSelector() {
        try {
            selector = Selector.open();
        } catch (Throwable t) {
            throw new ChannelException("Failed to create a selector.", t);
        }

        // Start the worker thread with the new Selector.
        boolean success = false;
        try {
            DeadLockProofWorker.start(executor, new ThreadRenamingRunnable(this, "New I/O  worker #" + id));
            success = true;
        } finally {
            if (!success) {
                // Release the Selector if the execution fails.
                try {
                    selector.close();
                } catch (Throwable t) {
                    logger.warn("Failed to close a selector.", t);
                }
                selector = null;
                // The method will return to the caller at this point.
            }
        }
        assert selector != null && selector.isOpen();
    }

设置ChannelPipelineFactory

       bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() throws Exception {
                return Channels.pipeline(new EchoServerHandler());
            }
        });

绑定端口

 bootstrap.bind(new InetSocketAddress(port));

ServerBootstrap中的bind()方法

    //构造一个UpstreamHandler,Binder
    ChannelHandler binder = new Binder(localAddress, futureQueue);
    ChannelHandler parentHandler = getParentHandler();

    ChannelPipeline bossPipeline = pipeline();
    //将binder注册上bossPipeline
    bossPipeline.addLast("binder", binder);
    if (parentHandler != null) {
        bossPipeline.addLast("userHandler", parentHandler);
    }
    //创建channel
    Channel channel = getFactory().newChannel(bossPipeline);

再来看看ChannelFactory中的newChannel()

    public ServerSocketChannel newChannel(ChannelPipeline pipeline) {
        return new NioServerSocketChannel(this, pipeline, sink);
    }

在NioServerSocketChannel完成pipeline与sink的绑定,打开一个ServerSocketChannel,并且配置为非阻塞模式,触发ChannelOpen事件

NioServerSocketChannel(
            ChannelFactory factory,
            ChannelPipeline pipeline,
            ChannelSink sink) {
    super(factory, pipeline, sink);
    socket = ServerSocketChannel.open();
    socket.configureBlocking(false);
    config = new DefaultServerSocketChannelConfig(socket.socket());
    //触发ChannelOpen事件
    fireChannelOpen(this);

NioServerSocketChannel-->super()-->AbstractChannel- 绑定pipeline与sink

    protected AbstractChannel(
            Channel parent, ChannelFactory factory,
            ChannelPipeline pipeline, ChannelSink sink) {

        this.parent = parent;
        this.factory = factory;
        this.pipeline = pipeline;
        id = allocateId(this);
        //pipeline与sink的绑定
        pipeline.attach(this, sink);
    }

fireChannelOpen(this);触发ChannelOpen事件

    channel.getPipeline().sendUpstream(
            new UpstreamChannelStateEvent(
                    channel, ChannelState.OPEN, Boolean.TRUE));

这里是默认的DefaultChannelPipeline来处理sendUpstream()事件,

DefaultChannelHandlerContext head = getActualUpstreamContext(this.head);
sendUpstream(head, e);

这个head的handler是ServerBootstrap$Binder

 void sendUpstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
        try {
            ((ChannelUpstreamHandler) ctx.getHandler()).handleUpstream(ctx, e);
        } catch (Throwable t) {
            notifyHandlerException(e, t);
        }
    }

ctx.getHandler()获取到的handler是Binder,而这个handleUpstream在其父类SimpleChannelUpstreamHandler中

public void handleUpstream(
            ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
    if (e instanceof MessageEvent) {
        messageReceived(ctx, (MessageEvent) e);
    } else if (e instanceof WriteCompletionEvent) {
        WriteCompletionEvent evt = (WriteCompletionEvent) e;
        writeComplete(ctx, evt);
    } else if (e instanceof ChildChannelStateEvent) {
        ChildChannelStateEvent evt = (ChildChannelStateEvent) e;
        if (evt.getChildChannel().isOpen()) {
            childChannelOpen(ctx, evt);
        } else {
            childChannelClosed(ctx, evt);
        }
    } else if (e instanceof ChannelStateEvent) {
        ChannelStateEvent evt = (ChannelStateEvent) e;
        switch (evt.getState()) {
        case OPEN:
            if (Boolean.TRUE.equals(evt.getValue())) {
                channelOpen(ctx, evt);
            } else {
                channelClosed(ctx, evt);
            }
            break;
        case BOUND:
            if (evt.getValue() != null) {
                channelBound(ctx, evt);
            } else {
                channelUnbound(ctx, evt);
            }
            break;
        case CONNECTED:
            if (evt.getValue() != null) {
                channelConnected(ctx, evt);
            } else {
                channelDisconnected(ctx, evt);
            }
            break;
        case INTEREST_OPS:
            channelInterestChanged(ctx, evt);
            break;
        default:
            ctx.sendUpstream(e);
        }
    } else if (e instanceof ExceptionEvent) {
        exceptionCaught(ctx, (ExceptionEvent) e);
    } else {
        ctx.sendUpstream(e);
    }

这里的ChannelEvent是ChannelStateEvent,OPEN,进入channelOpen(),这个channelOpen是在Binder中实现的

    public void channelOpen(
            ChannelHandlerContext ctx,
            ChannelStateEvent evt) {

        try {
            evt.getChannel().getConfig().setPipelineFactory(getPipelineFactory());

            // Split options into two categories: parent and child.
            Map<String, Object> allOptions = getOptions();
            Map<String, Object> parentOptions = new HashMap<String, Object>();
            for (Entry<String, Object> e: allOptions.entrySet()) {
                if (e.getKey().startsWith("child.")) {
                    childOptions.put(
                            e.getKey().substring(6),
                            e.getValue());
                } else if (!e.getKey().equals("pipelineFactory")) {
                    parentOptions.put(e.getKey(), e.getValue());
                }
            }

            // Apply parent options.
            evt.getChannel().getConfig().setOptions(parentOptions);
        } finally {
            ctx.sendUpstream(evt);
        }

        boolean finished = futureQueue.offer(evt.getChannel().bind(localAddress));
        assert finished;
    }

关联主Channel与getPipelineFactory(pipelineFactory==EchoServer$1?)

 evt.getChannel().getConfig().setPipelineFactory(getPipelineFactory());

将parentOptions与主主Channel关联

evt.getChannel().getConfig().setOptions(parentOptions);

绑定之前打开的NioServerSocketChannel

evt.getChannel().bind(localAddress)

bind()

    public ChannelFuture bind(SocketAddress localAddress) {
        return Channels.bind(this, localAddress);
    }
    //sendDownstream事件
    public static ChannelFuture bind(Channel channel, SocketAddress localAddress) {
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    ChannelFuture future = future(channel);
    channel.getPipeline().sendDownstream(new DownstreamChannelStateEvent(
            channel, future, ChannelState.BOUND, localAddress));
    return future;
}

sendDownstream()事件

    public void sendDownstream(ChannelEvent e) {
        DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail);
        if (tail == null) {
            try {
                //终于看到sink了
                getSink().eventSunk(this, e);
                return;
            } catch (Throwable t) {
                notifyHandlerException(e, t);
                return;
            }
        }

        sendDownstream(tail, e);
    }

绕了一大圈,终于看到sink了

    public void eventSunk(
            ChannelPipeline pipeline, ChannelEvent e) throws Exception {
        Channel channel = e.getChannel();
        if (channel instanceof NioServerSocketChannel) {
            handleServerSocket(e);
        } else if (channel instanceof NioSocketChannel) {
            handleAcceptedSocket(e);
        }
    }

在这里ChannelEvent是BIND,channel是NioServerSocketChannel,state是BOUND

private void handleServerSocket(ChannelEvent e) {
    if (!(e instanceof ChannelStateEvent)) {
        return;
    }

    ChannelStateEvent event = (ChannelStateEvent) e;
    NioServerSocketChannel channel =
        (NioServerSocketChannel) event.getChannel();
    ChannelFuture future = event.getFuture();
    ChannelState state = event.getState();
    Object value = event.getValue();

    switch (state) {
    case OPEN:
        if (Boolean.FALSE.equals(value)) {
            close(channel, future);
        }
        break;
    case BOUND:
        if (value != null) {
            bind(channel, future, (SocketAddress) value);
        } else {
            close(channel, future);
        }
        break;
    default:
        break;
    }
}

直接进入看看bind

private void bind(
            NioServerSocketChannel channel, ChannelFuture future,
            SocketAddress localAddress) {

        boolean bound = false;
        boolean bossStarted = false;
        try {
            channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
            bound = true;

            future.setSuccess();
            fireChannelBound(channel, channel.getLocalAddress());

            Executor bossExecutor =
                ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
            DeadLockProofWorker.start(bossExecutor,
                    new ThreadRenamingRunnable(new Boss(channel),
                            "New I/O server boss #" + id + " (" + channel + ‘)‘));
            bossStarted = true;
        } catch (Throwable t) {
            future.setFailure(t);
            fireExceptionCaught(channel, t);
        } finally {
            if (!bossStarted && bound) {
                close(channel, future);
            }
        }
    }

从channel中拿到socket进行bind

channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());

启动boss线程

 Executor bossExecutor =
                ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
            DeadLockProofWorker.start(bossExecutor,
                    new ThreadRenamingRunnable(new Boss(channel),
                            "New I/O server boss #" + id + " (" + channel + ‘)‘));

在boss线程中,打开一个Selector,

 Boss(NioServerSocketChannel channel) throws IOException {
        this.channel = channel;
        selector = Selector.open();
        boolean registered = false;
        try {
            channel.socket.register(selector, SelectionKey.OP_ACCEPT);
            registered = true;
        } finally {
            if (!registered) {
                closeSelector();
            }
        }

        channel.selector = selector;
    }

将ServerSocketChannel的注册OP_ACCEPT到selector


Netty3.5.9源码(一)Server端启动,布布扣,bubuko.com

Netty3.5.9源码(一)Server端启动

上一篇:Linux下的IBM Websphere MQ Server安装测试文档


下一篇:css3新特性总结(视觉表现方面)