ServerBootstrap创建
1,构造一个NioServerSocketChannelFactory来,初始化ServerBootstrap
2,构造一个ChannelPipelineFactory,给ServerBootstrap设置ChannelPipelineFactory
3, 绑定端口,接受client端连接过来的请求。
public void run() {
// Configure the server.
ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(new EchoServerHandler());
}
});
// Bind and start to accept incoming connections.
bootstrap.bind(new InetSocketAddress(port));
}
public static void main(String[] args) throws Exception {
...
new EchoServer(port).run();
}
构造NioServerSocketChannelFactory,构造NioWorkerPool线程池,启动work线程
这里采用Reactor模式,bossExecutor用来接收客户端连接,而workerExecutor用来执行IO
的read,write等操作,这里还有个大侠NioServerSocketPipelineSink
new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool())
public NioServerSocketChannelFactory(
Executor bossExecutor, Executor workerExecutor) {
this(bossExecutor, workerExecutor, SelectorUtil.DEFAULT_IO_THREADS);
}
public NioServerSocketChannelFactory(
Executor bossExecutor, Executor workerExecutor,
int workerCount) {
this(bossExecutor, new NioWorkerPool(workerExecutor, workerCount));
}
public NioServerSocketChannelFactory(
Executor bossExecutor, WorkerPool<NioWorker> workerPool) {
if (bossExecutor == null) {
throw new NullPointerException("bossExecutor");
}
if (workerPool == null) {
throw new NullPointerException("workerPool");
}
this.bossExecutor = bossExecutor;
this.workerPool = workerPool;
sink = new NioServerSocketPipelineSink(workerPool);
}
将workerPool绑定到NioServerSocketPipelineSink
NioServerSocketPipelineSink(WorkerPool<NioWorker> workerPool) {
this.workerPool = workerPool;
}
NioWorkerPool
public NioServerSocketChannelFactory(
Executor bossExecutor, Executor workerExecutor,
int workerCount) {
this(bossExecutor, new NioWorkerPool(workerExecutor, workerCount));
}
NioWorkerPool线程池的构造,创建workerCount个NioWorker,默认为CPU的2倍数量
public NioWorkerPool(Executor workerExecutor, int workerCount) {
super(workerExecutor, workerCount);
}
AbstractNioWorkerPool(Executor workerExecutor, int workerCount) {
if (workerExecutor == null) {
throw new NullPointerException("workerExecutor");
}
if (workerCount <= 0) {
throw new IllegalArgumentException(
"workerCount (" + workerCount + ") " +
"must be a positive integer.");
}
workers = new AbstractNioWorker[workerCount];
for (int i = 0; i < workers.length; i++) {
workers[i] = createWorker(workerExecutor);
}
this.workerExecutor = workerExecutor;
}
NioWorker
protected NioWorker createWorker(Executor executor) {
return new NioWorker(executor);
}
public NioWorker(Executor executor) {
super(executor);
}
这个super是AbstractNioWorker,打开Selector
AbstractNioWorker(Executor executor) {
this.executor = executor;
openSelector();
}
打开Selector,启动worker thread
private void openSelector() {
try {
selector = Selector.open();
} catch (Throwable t) {
throw new ChannelException("Failed to create a selector.", t);
}
// Start the worker thread with the new Selector.
boolean success = false;
try {
DeadLockProofWorker.start(executor, new ThreadRenamingRunnable(this, "New I/O worker #" + id));
success = true;
} finally {
if (!success) {
// Release the Selector if the execution fails.
try {
selector.close();
} catch (Throwable t) {
logger.warn("Failed to close a selector.", t);
}
selector = null;
// The method will return to the caller at this point.
}
}
assert selector != null && selector.isOpen();
}
设置ChannelPipelineFactory
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(new EchoServerHandler());
}
});
绑定端口
bootstrap.bind(new InetSocketAddress(port));
ServerBootstrap中的bind()方法
//构造一个UpstreamHandler,Binder
ChannelHandler binder = new Binder(localAddress, futureQueue);
ChannelHandler parentHandler = getParentHandler();
ChannelPipeline bossPipeline = pipeline();
//将binder注册上bossPipeline
bossPipeline.addLast("binder", binder);
if (parentHandler != null) {
bossPipeline.addLast("userHandler", parentHandler);
}
//创建channel
Channel channel = getFactory().newChannel(bossPipeline);
再来看看ChannelFactory中的newChannel()
public ServerSocketChannel newChannel(ChannelPipeline pipeline) {
return new NioServerSocketChannel(this, pipeline, sink);
}
在NioServerSocketChannel完成pipeline与sink的绑定,打开一个ServerSocketChannel,并且配置为非阻塞模式,触发ChannelOpen事件
NioServerSocketChannel(
ChannelFactory factory,
ChannelPipeline pipeline,
ChannelSink sink) {
super(factory, pipeline, sink);
socket = ServerSocketChannel.open();
socket.configureBlocking(false);
config = new DefaultServerSocketChannelConfig(socket.socket());
//触发ChannelOpen事件
fireChannelOpen(this);
NioServerSocketChannel-->super()-->AbstractChannel- 绑定pipeline与sink
protected AbstractChannel(
Channel parent, ChannelFactory factory,
ChannelPipeline pipeline, ChannelSink sink) {
this.parent = parent;
this.factory = factory;
this.pipeline = pipeline;
id = allocateId(this);
//pipeline与sink的绑定
pipeline.attach(this, sink);
}
fireChannelOpen(this);触发ChannelOpen事件
channel.getPipeline().sendUpstream(
new UpstreamChannelStateEvent(
channel, ChannelState.OPEN, Boolean.TRUE));
这里是默认的DefaultChannelPipeline来处理sendUpstream()事件,
DefaultChannelHandlerContext head = getActualUpstreamContext(this.head);
sendUpstream(head, e);
这个head的handler是ServerBootstrap$Binder
void sendUpstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
try {
((ChannelUpstreamHandler) ctx.getHandler()).handleUpstream(ctx, e);
} catch (Throwable t) {
notifyHandlerException(e, t);
}
}
ctx.getHandler()获取到的handler是Binder,而这个handleUpstream在其父类SimpleChannelUpstreamHandler中
public void handleUpstream(
ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
if (e instanceof MessageEvent) {
messageReceived(ctx, (MessageEvent) e);
} else if (e instanceof WriteCompletionEvent) {
WriteCompletionEvent evt = (WriteCompletionEvent) e;
writeComplete(ctx, evt);
} else if (e instanceof ChildChannelStateEvent) {
ChildChannelStateEvent evt = (ChildChannelStateEvent) e;
if (evt.getChildChannel().isOpen()) {
childChannelOpen(ctx, evt);
} else {
childChannelClosed(ctx, evt);
}
} else if (e instanceof ChannelStateEvent) {
ChannelStateEvent evt = (ChannelStateEvent) e;
switch (evt.getState()) {
case OPEN:
if (Boolean.TRUE.equals(evt.getValue())) {
channelOpen(ctx, evt);
} else {
channelClosed(ctx, evt);
}
break;
case BOUND:
if (evt.getValue() != null) {
channelBound(ctx, evt);
} else {
channelUnbound(ctx, evt);
}
break;
case CONNECTED:
if (evt.getValue() != null) {
channelConnected(ctx, evt);
} else {
channelDisconnected(ctx, evt);
}
break;
case INTEREST_OPS:
channelInterestChanged(ctx, evt);
break;
default:
ctx.sendUpstream(e);
}
} else if (e instanceof ExceptionEvent) {
exceptionCaught(ctx, (ExceptionEvent) e);
} else {
ctx.sendUpstream(e);
}
这里的ChannelEvent是ChannelStateEvent,OPEN,进入channelOpen(),这个channelOpen是在Binder中实现的
public void channelOpen(
ChannelHandlerContext ctx,
ChannelStateEvent evt) {
try {
evt.getChannel().getConfig().setPipelineFactory(getPipelineFactory());
// Split options into two categories: parent and child.
Map<String, Object> allOptions = getOptions();
Map<String, Object> parentOptions = new HashMap<String, Object>();
for (Entry<String, Object> e: allOptions.entrySet()) {
if (e.getKey().startsWith("child.")) {
childOptions.put(
e.getKey().substring(6),
e.getValue());
} else if (!e.getKey().equals("pipelineFactory")) {
parentOptions.put(e.getKey(), e.getValue());
}
}
// Apply parent options.
evt.getChannel().getConfig().setOptions(parentOptions);
} finally {
ctx.sendUpstream(evt);
}
boolean finished = futureQueue.offer(evt.getChannel().bind(localAddress));
assert finished;
}
关联主Channel与getPipelineFactory(pipelineFactory==EchoServer$1?)
evt.getChannel().getConfig().setPipelineFactory(getPipelineFactory());
将parentOptions与主主Channel关联
evt.getChannel().getConfig().setOptions(parentOptions);
绑定之前打开的NioServerSocketChannel
evt.getChannel().bind(localAddress)
bind()
public ChannelFuture bind(SocketAddress localAddress) {
return Channels.bind(this, localAddress);
}
//sendDownstream事件
public static ChannelFuture bind(Channel channel, SocketAddress localAddress) {
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
ChannelFuture future = future(channel);
channel.getPipeline().sendDownstream(new DownstreamChannelStateEvent(
channel, future, ChannelState.BOUND, localAddress));
return future;
}
sendDownstream()事件
public void sendDownstream(ChannelEvent e) {
DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail);
if (tail == null) {
try {
//终于看到sink了
getSink().eventSunk(this, e);
return;
} catch (Throwable t) {
notifyHandlerException(e, t);
return;
}
}
sendDownstream(tail, e);
}
绕了一大圈,终于看到sink了
public void eventSunk(
ChannelPipeline pipeline, ChannelEvent e) throws Exception {
Channel channel = e.getChannel();
if (channel instanceof NioServerSocketChannel) {
handleServerSocket(e);
} else if (channel instanceof NioSocketChannel) {
handleAcceptedSocket(e);
}
}
在这里ChannelEvent是BIND,channel是NioServerSocketChannel,state是BOUND
private void handleServerSocket(ChannelEvent e) {
if (!(e instanceof ChannelStateEvent)) {
return;
}
ChannelStateEvent event = (ChannelStateEvent) e;
NioServerSocketChannel channel =
(NioServerSocketChannel) event.getChannel();
ChannelFuture future = event.getFuture();
ChannelState state = event.getState();
Object value = event.getValue();
switch (state) {
case OPEN:
if (Boolean.FALSE.equals(value)) {
close(channel, future);
}
break;
case BOUND:
if (value != null) {
bind(channel, future, (SocketAddress) value);
} else {
close(channel, future);
}
break;
default:
break;
}
}
直接进入看看bind
private void bind(
NioServerSocketChannel channel, ChannelFuture future,
SocketAddress localAddress) {
boolean bound = false;
boolean bossStarted = false;
try {
channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
bound = true;
future.setSuccess();
fireChannelBound(channel, channel.getLocalAddress());
Executor bossExecutor =
((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
DeadLockProofWorker.start(bossExecutor,
new ThreadRenamingRunnable(new Boss(channel),
"New I/O server boss #" + id + " (" + channel + ‘)‘));
bossStarted = true;
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
} finally {
if (!bossStarted && bound) {
close(channel, future);
}
}
}
从channel中拿到socket进行bind
channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
启动boss线程
Executor bossExecutor =
((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
DeadLockProofWorker.start(bossExecutor,
new ThreadRenamingRunnable(new Boss(channel),
"New I/O server boss #" + id + " (" + channel + ‘)‘));
在boss线程中,打开一个Selector,
Boss(NioServerSocketChannel channel) throws IOException {
this.channel = channel;
selector = Selector.open();
boolean registered = false;
try {
channel.socket.register(selector, SelectionKey.OP_ACCEPT);
registered = true;
} finally {
if (!registered) {
closeSelector();
}
}
channel.selector = selector;
}
将ServerSocketChannel的注册OP_ACCEPT到selector