Netty章节十八:Netty Server Start 源码分析

Netty Server Start 源码分析

针对程序

 public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
     	//or  EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
     
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
                    .childHandler(new MyServerInitializer());
            
            ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
            channelFuture.channel().closeFuture().sync();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

NioEventLoopGroup()

它是基于NIO 选择器(Selector)的Channel对象

无参构造

new NioEventLoopGroup(),创建一个新的实例并且使用默认的线程数,使用默认的ThreadFactory(线程工程),和SelectorProvider并且这是由SelectorProvider是由SelectorProvider.provider()静态方法方法提供的

public NioEventLoopGroup() {
    this(0);
}
//一直向里调用直到父类的MultithreadEventLoopGroup()构造函数,这个是决定线程数是多少的核心方法

//MultithreadEventLoopGroup类的静态代码块
static {
   /*
     SystemPropertyUtil.getInt(
        "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)
     如果第一个系统属性(需要自己设置)的值不存在则返回第二个参数的值(可用的处理器/系统核心数 * 2)
   */
   DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
          "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

   if (logger.isDebugEnabled()) {
       logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
   }
}

//调到这个方法时,参数都是
//(0,null,SelectorProvider.provider(),DefaultSelectStrategyFactory.INSTANCE,RejectedExecutionHandlers.reject())
//后面三个参数都是对应的返回值,这里写的是这个参数如何来的,以便理解
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}


//最后做初始化的构造方法
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
 	....   
}

Netty章节十八:Netty Server Start 源码分析

此时的nTreads为24是由 Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));获取的,因为我们没有配置系统属性io.netty.eventLoopThreads所以返回的是 (可用的处理器/系统核心数 * 2) 为24。最后会构造出24个线程为这个EventLoopGroup(事件循环组)工作

带参构造

NioEventLoopGroup(int) 如果创建的时候传入一个int值,那么它将使用这个int值个线程
不设置构造参数的话使用默认个线程(如果没有设置"io.netty.eventLoopThreads"系统属性那么他就会使用系统核心数*2的核心数)

为什么很多人设置为1,因为它是异步的只需要一个线程来不断的监听事件循环,当事件发生的时候获取到事件循环本身,然后将事件相应的处理工作丢给workerGroup

ServerBootstrap对象的方法

group()方法

group(EventLoopGroup parentGroup, EventLoopGroup childGroup)方法,设置这个parentGroup(bossGroup(接收远端发来的连接,将处理工作交给childGroup/child))与 child(workerGroup(与客户端打交道))。
这些EventLoopGroup 是用于处理针对于ServerChannel与Channel的所有的events(事件)以及IO(输入输出)
param:
parentGroup就是bossGroup childGroup是workerGroup

public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
        //如果AbstractBootstrap中的group属性没有被设置,则将parentGroup赋给group
        super.group(parentGroup);
        if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        }
    	//如果当前这个类的childGroup为null的话,将传进来的childGroup设置为当前这个类的childGroup
        this.childGroup设置值 = ObjectUtil.checkNotNull(childGroup, "childGroup");
        return this;
    }

channel()方法

/*
   通过channelClass的class对象来创建Channel的实例。如果你的Channel实现了无无参数的构造函数
   则可以使用this或者使用channelFactory()

*/
public B channel(Class<? extends C> channelClass) {
    //将ReflectiveChannelFactory对象赋值给,成员属性channelFactory
    return channelFactory(new ReflectiveChannelFactory<C>(
          ObjectUtil.checkNotNull(channelClass, "channelClass")
    ));
}

/*
    一个ChannelFactory,通过反射的形式调用其默认构造函数来实例化新的Channel。
 */
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
    //建设者
    private final Constructor<? extends T> constructor;

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
       //检查clazz是否为空,如果为空抛出NullPointerException异常内容是第二个参数,如果不为空返回第一个参数
       ObjectUtil.checkNotNull(clazz, "clazz");
       try {
           //获取这个参数的构造函数赋给constructor,以便后面使用反射创建这个对象
           this.constructor = clazz.getConstructor();
       } catch (NoSuchMethodException e) {
           throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
                        " does not have a public non-arg constructor", e);
       }
    }
}

handler()方法

添加针对bossGroup发挥作用的Handler处理器

childHandler()方法

添加针对workerGroup发挥作用的Handler处理器

/*
   设置用于为通道的请求提供服务的ChannelHandler。
*/
public ServerBootstrap childHandler(ChannelHandler childHandler) {
   //childHandler参数不为空,则赋给 this.childHandler
   this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");
        return this;
    }

核心方法bind()

/*
   创建一个新的Channel并将其绑定。
*/
public ChannelFuture bind(int inetPort) {
        return bind(new InetSocketAddress(inetPort));
}

/--------------------------------------------------------

public ChannelFuture bind(SocketAddress localAddress) {
    validate();
    return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}

/--------------------------------------------------------

private ChannelFuture doBind(final SocketAddress localAddress) {
   //初始化并注册channel
   final ChannelFuture regFuture = initAndRegister();
   final Channel channel = regFuture.channel();
   if (regFuture.cause() != null) {
       return regFuture;
   }

   if (regFuture.isDone()) {
       // At this point we know that the registration was complete and successful.
       ChannelPromise promise = channel.newPromise();
       doBind0(regFuture, channel, localAddress, promise);
       return promise;
   } else {
       // Registration future is almost always fulfilled already, but just in case it‘s not.
       final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
       regFuture.addListener(new ChannelFutureListener() {
           @Override
           public void operationComplete(ChannelFuture future) throws Exception {
               Throwable cause = future.cause();
               if (cause != null) {
                   // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                   // IllegalStateException once we try to access the EventLoop of the Channel.
                   promise.setFailure(cause);
               } else {
                   // Registration was successful, so set the correct executor to use.
                   // See https://github.com/netty/netty/issues/2586
                   promise.registered();

                   doBind0(regFuture, channel, localAddress, promise);
               }
           }
       });
       return promise;
   }
}

initAndRegister()

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        /*使用channel()方法添加的ReflectiveChannelFactory工厂反射的创建channel对象(这个channel对象是NioServerSocketChannel),
        使用反射创建NioServerSocketChannel的时候会调用AbstractChannel父类的构造方法创建
        与这个Channel所关联的ChannelPipeline对象(实际类型是DefaultChannelPipeline)
        */
        channel = channelFactory.newChannel();
		//初始化channel
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            // 如果newChannel崩溃,则channel可以为null(例如SocketException(“打开的文件太多”))
            channel.unsafe().closeForcibly();
            // 由于尚未注册频道,因此我们需要强制使用GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        // 由于尚未注册频道,因此我们需要强制使用GlobalEventExecutor
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    /*
    	正式开始注册
    	config()  返回当前ServerBootstrap的ServerBootstrapConfig对象
    	group()	  返回当前ServerBootstrap父类AbstractBootstrap里面维护的group对象,就是我们调用group()方法设置的EventLoopGroup,
    		在本例中group()返回的是bossGroup
    */
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }

    /*
        如果我们在这里,承诺没有失败,那就是下列情况之一:
        1)如果尝试从事件循环注册,则此时注册已完成。也就是说,现在尝试bind()或connect()是安全的,因为通道已经注册。
        2)如果我们尝试从另一个线程注册,则注册请求已成功添加到事件循环的任务队列以供以后执行。也就是说,现在尝试bind()或connect()是安全的:
            因为bind()或connect()将在执行计划的注册任务之后执行
            因为register()、bind()和connect()都绑定到同一个线程。
     */
    return regFuture;
}
init()
/*
    完成Options与Attributes相关的设定,
 */
@Override
void init(Channel channel) {
    //ChannelOption是用于配置与channel相关的特别是ChannelConfig里面的这些网络层的基本的配置
    //Option初始化的值可以在serverBootstrap初始化的时候使用.option()方法进行设置,不设置netty则使用底层给根据不同情况设定好的值
    setChannelOptions(channel, newOptionsArray(), logger);
    /*
    Attribute/AttributeKey主要维护业务数据可以在程序运行过程中,动态的向里面添加key value对象,然后在后面用到的地方在取出来(类似rquest作用域)
    实现了业务数据随着netty调用流程流转,实现数据共享(类似工作流引擎当中的jBPM、Activiti,在某个流程当中可以设置一些数据,然后在
    后续的流程节点当中将数据取出来,实现了数据随着流程流转,可以在后面再取出来)
    */
    setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));

    //在调用init()方法之前的channelFactory.newChannel();的时候以及创建好了与这个Channel所关联的ChannelPipeline对象,所以可以直接使用
    ChannelPipeline p = channel.pipeline();

    //currentChildGroup就是我们创建的workerGroup
    final EventLoopGroup currentChildGroup = childGroup;
    //currentChildHandler是我们调用childHandler()方法设置的Handler处理器,这里是MyServerInitializer
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
    }
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);

    //这里只是将ChannelInitializer对象添加到管道当中,initChannel()方法并不会执行,而是后续的某一个时刻会被调用
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) {
            //将生成的Channel对象对应的pipeline拿到
            final ChannelPipeline pipeline = ch.pipeline();
            //如果之前调用了handler()方法则将添加的对象addLast()到这个ChannelPipeline当中
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    //向Channel中添加一个接收器
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}
config().group().register(channel)
//MultithreadEventLoopGroup类的方法
@Override
public ChannelFuture register(Channel channel) {
   //这个register()方法调的是SingleThreadEventLoop类中的
   return next().register(channel);
}

@Override
public EventLoop next() {
   //返回一个
   return (EventLoop) super.next();
}

@Override
public EventExecutor next() {
    return chooser.next();
}

//DefaultEventExecutorChooserFactory类中的GenericEventExecutorChooser内部类的方法

private static final class GenericEventExecutorChooser implements EventExecutorChooser {
  private final AtomicInteger idx = new AtomicInteger();
  //保存的就是这个事件循环组的所以线程,本程序中就是一个一个的NioEventLoop。就是new NioEventLoopGroup()的时候创建的那些线程对象
  private final EventExecutor[] executors;

  GenericEventExecutorChooser(EventExecutor[] executors) {
      this.executors = executors;
  }
/*返回的是这个事件循环组中的某一个事件执行器(EventExecutor)根据Math.abs(idx.getAndIncrement() % executors.length)计算结果选择其中一个*/
  @Override
  public EventExecutor next() {
      return executors[Math.abs(idx.getAndIncrement() % executors.length)];
  }
}
SingleThreadEventLoop
@Override
public ChannelFuture register(Channel channel) {
   //创建一个Promise,传入要注册的Channel与事当前的件循环组
   return register(new DefaultChannelPromise(channel, this));
}

@Override
public ChannelFuture register(final ChannelPromise promise) {
   ObjectUtil.checkNotNull(promise, "promise");
   //调用cahnnel()得到刚刚创建promise对象传进去的Channel对象,然后调用它(AbstractNioChannel)的unsafe()方法得到一个NioUnsafe对象,这个Unsafe对象来自于它的父类AbstractChannel,AbstractNioChannel做了一个向下类型转换,最后调用register()方法
   promise.channel().unsafe().register(this, promise);
   return promise;
}

//register()方法是AbstractChannel中的-----------------------------------

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    ObjectUtil.checkNotNull(eventLoop, "eventLoop");
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    if (!isCompatible(eventLoop)) {
        promise.setFailure(
                new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }
    AbstractChannel.this.eventLoop = eventLoop;
    /*
    	核心代码段
    	判断当前正在执行这行代码的线程是不是SingleThreadEventExecutor中维护的thread
    	如果是则直接调用register0()注册
    	如果不是则将这注册任务以一个任务的形式提交给eventLoop(SingleThreadEventExecutor)当中维护的那个线程对象,让它去执行解决了多线程并发的问题
    */
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}

private void register0(ChannelPromise promise) {
     try {
         // check if the channel is still open as it could be closed in the mean time when the register
         // call was outside of the eventLoop
         if (!promise.setUncancellable() || !ensureOpen(promise)) {
             return;
         }
         boolean firstRegistration = neverRegistered;
         //注册的最底层实现,完成注册
         doRegister();
         neverRegistered = false;
         registered = true;

         // 确保在实际通知诺言之前,先调用handlerAdded(...)。这是必需的,因为用户可能已经通过ChannelFutureListener中的管道触发了事件
         pipeline.invokeHandlerAddedIfNeeded();

         safeSetSuccess(promise);
         pipeline.fireChannelRegistered();
         //如果从未注册过频道,则仅触发channelActive。如果通道已注销并重新注册,则可以防止激活多个通道。
         if (isActive()) {
             if (firstRegistration) {
                 pipeline.fireChannelActive();
             } else if (config().isAutoRead()) {
                 // 此通道之前已注册,并且已设置autoRead()。这意味着我们需要再次开始读取,以便处理入站数据。
                 //
                 // 可以接收客户端消息了
                 beginRead();
             }
         }
      } catch (Throwable t) {
          // Close the channel directly to avoid FD leak.
          closeForcibly();
          closeFuture.setClosed();
          safeSetFailure(promise, t);
      }
}


//doRegister()方法是由子类AbstractNioChannel实现的
@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            //javaChannel()返回的是SelectableChannel(ServerSocketChannelImpl),然后将这个channel注册到eventLoop().unwrappedSelector()返回的Selector上,0是感兴趣的事件
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                eventLoop().selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
        }
    }
}

重要的类说明

ChannelOption<T>

ChannelOption允许以类型安全的方式配置一个ChannelConfig
到底支持哪一种ChannelOption取决于ChannelConfig的实际实现也可能依赖于它所属的运输层的本质。

用于存储这个channel与TCP/IP相关的一些基本的配置信息,它是不存储值的值在其它地方存储,它里面存放的是
值的类型(ChannelOption本身并不维护选项的值的信息,它只维护这个选项本身/这个名字本身)

param:
泛型 T ChannelOption值的类型(某一个选项/某一个设置项的类型)

ChannelConfig

对于channel的一套配置属性
下转换到更具体的配置类型,如SocketChannelConfig或使用setOptions(Map)设置与传输具体相关的属性:
Channel ch = ...;
SocketChannelConfig cfg = (SocketChannelConfig) ch.getConfig();
cfg.setTcpNoDelay(false);

Option map
一个Option map属性是动态的只能写的属性,可以进行Channel的配置而无需向下类型转换。

名称 相关setter方法
ChannelOption.CONNECT_TIMEOUT_MILLIS setConnectTimeoutMillis(int)
ChannelOption.WRITE_SPIN_COUNT setWriteSpinCount(int)
ChannelOption.WRITE_BUFFER_WATER_MARK setWriteBufferWaterMark(WriteBufferWaterMark)
ChannelOption.ALLOCATOR setAllocator(ByteBufAllocator)
ChannelOption.AUTO_READ setAutoRead(boolean)

还有很多选项都位于ChannelConfig的子类当中,比如说你可以配置一些参数特定于TCP/IP scoket 参数

AttributeKey<T>

属性键可以用于在AttributeMap外面访问Attribute,请注意相同的名字不可能有多个键(keys)

param:
T Attribute类型,其可以通过该访问AttributeKey 。

Channel与ChannelContext作用域

直接向Channel上面附加的属性与向ChannelContext附加的属性,它们的作用域有什么不同

  1. Netty4.0
    对于整个channel来说它有一个map,用于维护它的属性和值的映射关系。而针对每一个ChannelHandlerContext也拥有自己的一个map,在Netty的组件当中只要有一个handler就会有一个与之相关和对应的ChannelHandlerContext。如果有10个handler那么Netty就会创建10ChannelHandlerContext同时在10个ChannelHandlerContext当中就会拥有10个不同的map用于分别存放这个handler在自己的作用域中所拥有的key value值,而channel本身又有一个独立的map映射信息。

    这种做法有两个问题:

    1. 当你在A handler当中set的值,在B handler当中是拿不到的,或者说你在channel当中设置的值,在handler中也是拿不到的
    2. 浪费内存,创建了太多个map每个ChannelHandlerContext都会有一个
  2. Netty4.1之后
    只会有一个map对象,而这个map对象会被channel以及这个channel上的所有handler所共享,而且key是不会重复的

Netty章节十八:Netty Server Start 源码分析

上一篇:Netty章节二十二:Netty自定义编解码器


下一篇:js的if判断,关于==的判断