以netty的一个小demo为例(使用的源码版本为4.1.50.Final)
从b.bind(PORT)
跟进
最终调用到AbstractBootstrap#doBind
这个方法
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it‘s not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
这里就是服务端创建的主要代码
创建Channel
主要概括如下
1.通过demo里传入的NioServerSocketChannel
类反射创建channel
2.在NioServerSocketChannel
的构造函数里
通过java底层的SelectorProvider创建ServerSocketChannel 调用父类构造函数设置阻塞模式,创建id,unsafe,pipeline等 创建NioServerSocketChannelConfig,便于进行参数配置
方法体里第一行代码的initAndRegister
方法就是在创建和初始化channel
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel(); //创建channel,本节分析
init(channel);
} catch (Throwable t) {
//省略
}
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
创建channel就在channelFactory.newChannel()
这行代码里,这里会调用到ReflectiveChannelFactory#newChannel
这个方法
public T newChannel() {
try {
return constructor.newInstance();
} catch (Throwable t) {
//省略
}
}
可以看到这里用反射创建一个channel,也就是说channel是通过channelFactory
的constructor
来反射创建的,而channelFactory
是在demo里调用channel方法时初始化的,如下
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) //在这里初始化了`channelFactory
//省略
在AbstractBootstrap#channel
方法里
public B channel(Class<? extends C> channelClass) {
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
最终调用到AbstractBootstrap#channelFactory
方法里
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
ObjectUtil.checkNotNull(channelFactory, "channelFactory");
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}
this.channelFactory = channelFactory;
return self();
}
既然这里是通过反射创建了一个NioServerSocketChannel
,接下来看看NioServerSocketChannel
这个类的构造函数
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
先看看NioServerSocketChannel#newSocket
这个方法
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openServerSocketChannel(); //provider就是DEFAULT_SELECTOR_PROVIDER,为SelectorProvider.provider()
} catch (IOException e) {
//省略
}
}
上面的构造函数调用到了另一个构造函数
public NioServerSocketChannel(ServerSocketChannel channel) {
//注意这里传入SelectionKey.OP_ACCEPT,用于之后注册accept事件
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
super方法调用父类做一些简单的初始化,主要在AbstractNioChannel
和AbstractChannel
这两个类这种
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
//省略
}
}
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
NioServerSocketChannelConfig
则是传入java底层的ServerSocketChannel方便以后对做一些配置
初始化channel
先概括一下,主要做了这几件事
将用户设置的options和attrs设置到channel的config里 向channel的pipeline里添加用户传入的handler 创建一个特殊的handler即ServerBootstrapAcceptor(用于处理新连接接入)并添加进pipeline里
接下来分析初始化channel的过程,即init(channel)
这行代码
这里会调用到ServerBootstrap#init
这个方法
void init(Channel channel) {
//newOptionsArray这个方法拿到用户设置的options设置到channel的config里
setChannelOptions(channel, newOptionsArray(), logger);
//attrs0这个方法拿到用户设置的attrs设置到channel里
setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
//拿到用于配置childHandler的childOptions
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
}
//拿到用于配置childHandler的childAttrs
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
//拿到demo里调用childHandler方法时添加的handler
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
//ServerBootstrapAcceptor是一个特殊的handler,用于处理新连接接入
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}