咱们先捋一遍再看源码:
Selector selector = Selector.open(); 在默认情况下生成了一个WindowsSelectorImpl实例,并且建立了Pipe
创建Selector对象:
Selector selector = Selector.open();
Selector实现原理:
SocketChannel、ServerSocketChannel和Selector的实例初始化都通过SelectorProvider类实现,我们先获取相应的SelectorProvider。
public static SelectorProvider provider() {
synchronized (lock) {
//provider不为空,直接返回provider
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
//由JDK的参数-Djava.nio.channels.spi.SelectorProvider=class设置的class来反射构造SelectorProvider
if (loadProviderFromProperty())
return provider;
//从jar中的目录META-INF/services配置文件中找参数java.nio.channels.spi.SelectorProvider=class设置的第一个class来反射构造SelectorProvider
if (loadProviderAsService())
return provider;
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
AccessController.doPrivileged属于特权操作,意思是不管这个方法由哪个用户发起,都无需对此操作涉及的资源(文件读写特权等等)进行检查。
由于SelectorProvider在不同操作系统下有不同的实现。
provider = sun.nio.ch.DefaultSelectorProvider.create();根据不同操作系统返回不同实现类,这里主要以Windows实现梳理整个流程,windows平台就返回WindowsSelectorProvider。
package sun.nio.ch;
import java.io.IOException;
import java.nio.channels.spi.AbstractSelector;
public class WindowsSelectorProvider extends SelectorProviderImpl {
public WindowsSelectorProvider() {
}
public AbstractSelector openSelector() throws IOException {
return new WindowsSelectorImpl(this);
}
}
以默认情况为例,采用的就是WindowsSelectorImpl.
在Windows操作系统JDK里sun.nio.ch.DefaultSelectorProvider源代码:
package sun.nio.ch;
import java.nio.channels.spi.SelectorProvider;
public class DefaultSelectorProvider
{
public static SelectorProvider create()
{
return new WindowsSelectorProvider();
}
}
这里直接返回WindowsSelectorProvider,即就是没进行任何的配置,则会使用WindowsSelectorProvider,其构造方法为空,没有任何实现,获得SelectorProvider后,调用openSelector()方法获得对应Selector。
private final Pipe wakeupPipe = Pipe.open();
WindowsSelectorImpl(SelectorProvider var1) throws IOException {
super(var1);
this.wakeupSourceFd = ((SelChImpl)this.wakeupPipe.source()).getFDVal();
SinkChannelImpl var2 = (SinkChannelImpl)this.wakeupPipe.sink();
var2.sc.socket().setTcpNoDelay(true);
this.wakeupSinkFd = var2.getFDVal();
this.pollWrapper.addWakeupSocket(this.wakeupSourceFd, 0);
}
观察构造方法,首先需要用到wakeupPipe, 这个成员的取得需要Pipe调用open()方法。
public static Pipe open() throws IOException {
return SelectorProvider.provider().openPipe();
}
直接调用了SelectorProvider的openPipe()方法:
public Pipe openPipe() throws IOException {
return new PipeImpl(this);
}
openPipe()方法在WindowsSelectorProvider的父类SelectorProviderImpl类中实现,
openPipe()的实现调用了pipeImpl的构造方法,
static {
Util.load();
byte[] var0 = new byte[8];
boolean var1 = IOUtil.randomBytes(var0);
if(var1) {
rnd = new Random(ByteBuffer.wrap(var0).getLong());
} else {
rnd = new Random();
}
}
PipeImpl(SelectorProvider var1) throws IOException {
try {
AccessController.doPrivileged(new PipeImpl.Initializer(var1));
} catch (PrivilegedActionException var3) {
throw (IOException)var3.getCause();
}
}
这里首先在PipeImpl的静态块中会产生一个随机数保存,然后在其构造方法中,通过AccessController调用doPrivileged方法new一个PipeImpl中内部类Initializer,这个方法会调用传入的类所实现的run()方法,并保证其内部所涉及的权限问题,接下来看其run()方法:
public Void run() throws IOException {
PipeImpl.Initializer.LoopbackConnector var1 = new PipeImpl.Initializer.LoopbackConnector();
var1.run();
if(this.ioe instanceof ClosedByInterruptException) {
this.ioe = null;
Thread var2 = new Thread(var1) {
public void interrupt() {
}
};
var2.start();
while(true) {
try {
var2.join();
break;
} catch (InterruptedException var4) {
;
}
}
Thread.currentThread().interrupt();
}
if(this.ioe != null) {
throw new IOException("Unable to establish loopback connection", this.ioe);
} else {
return null;
}
}
该方法内部会初始化Pipe管道的读写对象,初始化完成后对异常进行分类处理。启动了一个线程,看LoopbackConnector(),内部类Initializer的内部类LoopbackConnector,通过LoopbackConnector的实例化对象调用其run()方法:
private LoopbackConnector() {
}
public void run() {
//定义服务端Channel
ServerSocketChannel var1 = null;
//定义两个客户端通道分别负责读写
SocketChannel var2 = null;
SocketChannel var3 = null;
try {
//初始化两个ByteBuffer缓冲区,支持读写操作
ByteBuffer var4 = ByteBuffer.allocate(16);
ByteBuffer var5 = ByteBuffer.allocate(16);
InetAddress var6 = InetAddress.getByName("127.0.0.1");
assert var6.isLoopbackAddress();
//对服务端创建的IP和端口进行存储,
InetSocketAddress var7 = null;
//自旋处理,直接成功初始化出读写对象
while(true) {
// 初始化ServerSocketChannel,用户提供服务
if (var1 == null || !var1.isOpen()) {
var1 = ServerSocketChannel.open();
var1.socket().bind(new InetSocketAddress(var6, 0));
var7 = new InetSocketAddress(var6, var1.socket().getLocalPort());
}
//通过已经初始化的IP和端口打开一个写通道
var2 = SocketChannel.open(var7);
PipeImpl.RANDOM_NUMBER_GENERATOR.nextBytes(var4.array());
do {
//将数据全部写出去
var2.write(var4);
} while(var4.hasRemaining());
var4.rewind();
// 通过服务端获取一个读请求
// 此处读取上一步写的数据
var3 = var1.accept();
do {
// 读取到写数据, 添加到另一个缓冲区
var3.read(var5);
} while(var5.hasRemaining());
var5.rewind();
// 如果读写数据一致,说明管道通信正常,初始化管道的读对象和写对象
if (var5.equals(var4)) {
// 读对象
PipeImpl.this.source = new SourceChannelImpl(Initializer.this.sp, var2);
// 写对象
PipeImpl.this.sink = new SinkChannelImpl(Initializer.this.sp, var3);
break;
}
var3.close();
var2.close();
}
} catch (IOException var18) {
try {
if (var2 != null) {
var2.close();
}
if (var3 != null) {
var3.close();
}
} catch (IOException var17) {
;
}
Initializer.this.ioe = var18;
} finally {
try {
if (var1 != null) {
var1.close();
}
} catch (IOException var16) {
;
}
}
}
val1通过ServerSocketChannel.open(),通过SelectorProviderImpl的openServerSocketChannel()
public ServerSocketChannel openServerSocketChannel() throws IOException {
return new ServerSocketChannelImpl(this);
}
返回ServerSocketChannelImpl
ServerSocketChannelImpl(SelectorProvider var1) throws IOException {
super(var1);
this.fd = Net.serverSocket(true);
this.fdVal = IOUtil.fdVal(this.fd);
this.state = 0;
}
回到WindowsSelectorImpl的构造方法,用wakeupSourceFd保存source(SourceChannelImpl)的fdVal值,用wakeupSinkFd保存sink(SinkChannelImpl)的fdVal值;禁用Nagle算法,最后使用pollWrpper成员保存source的fdVal值。得到source的fd跟sink的fd并保存,把wakeupSourceFd加到PoolWrapper中。至此selector创建完毕。
WindowsSelectorImpl(SelectorProvider var1) throws IOException {
super(var1);
this.wakeupSourceFd = ((SelChImpl)this.wakeupPipe.source()).getFDVal();
SinkChannelImpl var2 = (SinkChannelImpl)this.wakeupPipe.sink();
var2.sc.socket().setTcpNoDelay(true);
this.wakeupSinkFd = var2.getFDVal();
this.pollWrapper.addWakeupSocket(this.wakeupSourceFd, 0);
}
void addWakeupSocket(int var1, int var2) {
this.putDescriptor(var2, var1);
this.putEventOps(var2, Net.POLLIN);
}
void putDescriptor(int var1, int var2) {
this.pollArray.putInt(SIZE_POLLFD * var1 + 0, var2);
}
void putEventOps(int var1, int var2) {
this.pollArray.putShort(SIZE_POLLFD * var1 + 4, (short)var2);
}
究极机器
发布了22 篇原创文章 · 获赞 4 · 访问量 2093
私信
关注