MINA系列学习-IoAccpetor

其实在mina的源码中,IoService可以总结成五部分service责任、Processor线程处理、handler处理器、接收器和连接器,分别对应着IoService、IoProcessor、IoHandler、IoAcceptor和IoConnector。在代码的中有如下包跟IoService关系密切:

org.apache.mina.core.service
org.apache.mina.transport.*
org.apache.mina.core.polling 这个包主要是实现了轮询策略

其中core.service包中主要定义了上面五个部分的接口,以及IoHandler和IoProcessor的实现(Handler是一种常见的设计模式,具体的业务其实并没有多少,只是在结构上将层次划分的更清楚。Processor是mina内部定义的接口,一般不对外使用,用mina官方的话,主要performs actual I/O operations for IoSession. 这一部分我想放在IoSession中来讲)。而在transport包中实现了具体的连接方式,当然,这部分也是我们今天阅读的重点。我想要关注的也是mina底层如何用NIO实现各种连接。

所以这一节的重点是IoAcceptor和IoConnector,Handler和IoService只是稍稍带过,写点儿用法和构成。先看mina对IoService的介绍:IoService provides basic I/O Service and manages I/O Sessions within MINA.

MINA系列学习-IoAccpetor

上面的图简单介绍了IoService的职责,以及其具体实现类AbstractIoService中的职责。在比较大的框架中,都是采用了大量的抽象类之间继承,采用层级实现细节这样的方式来组织代码。所以在mina中看到Abstract开头的类,并不仅仅只是一个抽象,其实里面也包含很多的实现了。

IoService用来管理各种IO服务,在mina中,这些服务可以包括session、filter、handler等。在AbstractIoService中,也是通过线程池来装载这些服务的:

private final Executor executor;
private final boolean createdExecutor; protected AbstractIoService(IoSessionConfig sessionConfig, Executor executor) {
//省略。。。 if (executor == null) {
this.executor = Executors.newCachedThreadPool();
createdExecutor = true;
} else {
this.executor = executor;
createdExecutor = false;
} threadName = getClass().getSimpleName() + '-' + id.incrementAndGet();
}

然后我们关注一下service中的dispose方法,学习一下线程安全在这里的用法:

/**
* A lock object which must be acquired when related resources are
* destroyed.
*/
protected final Object disposalLock = new Object(); private volatile boolean disposing; private volatile boolean disposed; private final boolean createdExecutor; public final void dispose(boolean awaitTermination) {
if (disposed) {
return;
} synchronized (disposalLock) {
if (!disposing) {
disposing = true; try {
dispose0();
} catch (Exception e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
}
}
} if (createdExecutor) {
ExecutorService e = (ExecutorService) executor;
e.shutdownNow();
if (awaitTermination) { //Thread.currentThread().setName(); try {
LOGGER.debug("awaitTermination on {} called by thread=[{}]", this, Thread.currentThread().getName());
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
LOGGER.debug("awaitTermination on {} finished", this);
} catch (InterruptedException e1) {
LOGGER.warn("awaitTermination on [{}] was interrupted", this);
// Restore the interrupted status
Thread.currentThread().interrupt();
}
}
}
disposed = true;
}

为了多线程之间变量内存的可见性,防止抢占资源时候出现意想不到的问题,这里用了volatile关键字修饰布尔型变量这样的经典用法,同时使用内置锁机制控制线程的访问。与IoService比较相关的还有个TransportMetadata,这个类主要记录了IoService的相关信息,在使用中,也不是很常见,所以略过这部分了。更多的线程运用在IoProcessor中会体现,这部分放到后面写,今天的主要目的还是连接IoAcceptor和IoConnector。

从现在开始就要复杂了,我们先从服务器的接收端开始写起,也就是IoAccpetor,先上一张宏观的图,来理清思路,图来自mina官网:

MINA系列学习-IoAccpetor

简单介绍一下,连接的实现有四种:

We have many of those implementing classes

  • NioSocketAcceptor : the non-blocking Socket transport IoAcceptor
  • NioDatagramAcceptor : the non-blocking UDP transport IoAcceptor
  • AprSocketAcceptor : the blocking Socket transport IoAcceptor, based on APR
  • VmPipeSocketAcceptor : the in-VM IoAcceptor

我们按照图上用箭头标出的两条路来分析我们最常用的NioSocketAcceptor,回顾一下调用这个类的过程:

// Create a TCP acceptor
IoAcceptor acceptor = new NioSocketAcceptor(); // Associate the acceptor to an IoHandler instance (your application)
acceptor.setHandler(this); // Bind : this will start the server...
acceptor.bind(new InetSocketAddress(PORT)); System.out.println("Server started...");

从左边的路走起:

MINA系列学习-IoAccpetor

接口IoAcceptor直接继承了IoService接口,并定义了自己特有的操作。其操作的具体实现由AbstractIoAcceptor完成(注意是上图左边的类来实现的)。我们继续从左边往下看,与IoAcceptor直接关联的是SocketAcceptor接口,它们(IoAcceptor和SocketAcceptor)之间也是接口的继承关系,所以根据前面的经验,我们可以猜测,SocketAcceptor一定又新定义了一些属于自己需要去实现的操作,这样做肯定是为了与另一种实现DatagaramAcceptor来区别,事实也确实如此,看下图:

MINA系列学习-IoAccpetor

这里需要注意的是上图被框出来的部分,他们的返回类型均被缩小了(我记不得是向下转型还是向上转型,应该是向下吧,现在返回的都是子类),这样实现的好处是绝对的专一,避免了转型上的错误。

在看NioSocketAcceptor之前,我们还是要先看右边这条路:

public abstract class AbstractIoAcceptor extends AbstractIoService implements IoAcceptor

回顾一下,AbstractIoService实现了对session的管理,IoAcceptor定义了一些建立连接时用到的一系列方法,这样一来,AbstractIoAcceptor一来有了对session使用的功能,二来需要实现建立连接里需要用到的那些方法,理清楚了这些,我们可以看AbstractIoAcceptor的具体实现:

private final List<SocketAddress> defaultLocalAddresses = new ArrayList<SocketAddress>();

    private final List<SocketAddress> unmodifiableDefaultLocalAddresses = Collections
.unmodifiableList(defaultLocalAddresses); private final Set<SocketAddress> boundAddresses = new HashSet<SocketAddress>(); private boolean disconnectOnUnbind = true; /**
* The lock object which is acquired while bind or unbind operation is performed.
* Acquire this lock in your property setters which shouldn't be changed while
* the service is bound.
*/
protected final Object bindLock = new Object();

这里有三点要说:

l  这里的Address不是用了list就是用了set,注意这些都是用来保存LocalAddress的,mina在设计的时候考虑的很全面,服务器可能会有多个网卡的啊。

l  解释下unmodifiableList,JDK自带方法:Returns an unmodifiable view of the specified list. This method allows modules to provide users with "read-only" access to internal lists. Query operations on the returned list "read through" to the specified list, and attempts to modify the returned list, whether direct or via its iterator, result in an UnsupportedOperationException. The returned list will be serializable if the specified list is serializable. Similarly, the returned list will implement RandomAccess if the specified list does.

l  时刻注意线程安全的问题。

AbstractIoAcceptor主要是对localAddress的操作,里面涉及到集合类的使用和内置锁的使用,其他没有什么特别的,大家可以看看源码,容易理解。主要可以看看里面的bind方法,这个里面涉及了两层锁。

AbstractPollingIoAcceptor在实现连接中至关重要的一个类,他是socket轮询策略的主要实现,有那么几点要关注:

l  polling strategy : The underlying sockets will be checked in an active loop and woke up when an socket needed to be processed.

l  An Executor will be used for running client accepting and an AbstractPollingIoProcessor will be used for processing client I/O operations like reading, writing and closing.(将连接和业务分开,AbstractPollingIoProcessor部分会在session部分写)。

l  All the low level methods for binding, accepting, closing need to be provided by the subclassing implementation.

这个类光看用到的工具就知道他不简单了,之前我们见过的处理线程安全问题都只是用了内置锁,这里终于用到了concurrent包里的东西了。这个类里没有出现NIO中selector这样的东西,而是做了一种策略,这种策略是在线程处理上的,采用队列和信号量协同处理socket到来时候连接的策略。推荐大家仔细看看这个类,里面的注释很详细,我列几个成员变量,你看看有兴趣没:

/** A lock used to protect the selector to be waked up before it's created */
private final Semaphore lock = new Semaphore(1); private final IoProcessor<S> processor; private final boolean createdProcessor; private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>(); private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>(); private final Map<SocketAddress, H> boundHandles = Collections.synchronizedMap(new HashMap<SocketAddress, H>()); private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture(); /** A flag set when the acceptor has been created and initialized */
private volatile boolean selectable; /** The thread responsible of accepting incoming requests */
private AtomicReference<Acceptor> acceptorRef = new AtomicReference<Acceptor>(); protected boolean reuseAddress = false;

在这个类里还有一个Acceptor的内部类,实现runnable接口,主要用作接收客户端的请求。这个类也有可看性。这里面的东西不是很好写,需要大家自己去细细品味。

看最后一个类,两边的焦点,NioSocketAcceptor。看之前在AbstractPollingIoAcceptor里有句话:All the low level methods for binding, accepting, closing need to be provided by the subclassing implementation.这里总该有NIO的一些东西了:

MINA系列学习-IoAccpetor

public final class NioSocketAcceptor extends AbstractPollingIoAcceptor<NioSession, ServerSocketChannel> implements
SocketAcceptor { private volatile Selector selector;

我想看到的东西终于来了,selector,先不说,接着往下看:

@Override
protected void init() throws Exception {
selector = Selector.open();
} @Override
protected NioSession accept(IoProcessor<NioSession> processor, ServerSocketChannel handle) throws Exception { SelectionKey key = handle.keyFor(selector); if ((key == null) || (!key.isValid()) || (!key.isAcceptable())) {
return null;
} // accept the connection from the client
SocketChannel ch = handle.accept(); if (ch == null) {
return null;
} return new NioSocketSession(this, processor, ch);
} @Override
protected ServerSocketChannel open(SocketAddress localAddress) throws Exception {
// Creates the listening ServerSocket
ServerSocketChannel channel = ServerSocketChannel.open(); boolean success = false; try {
// This is a non blocking socket channel
channel.configureBlocking(false); // Configure the server socket,
ServerSocket socket = channel.socket(); // Set the reuseAddress flag accordingly with the setting
socket.setReuseAddress(isReuseAddress()); // and bind.
socket.bind(localAddress, getBacklog()); // Register the channel within the selector for ACCEPT event
channel.register(selector, SelectionKey.OP_ACCEPT);
success = true;
} finally {
if (!success) {
close(channel);
}
}
return channel;
}

都是最基本的java NIO吧,是不是很熟悉,是不是有种相见恨晚的感觉,是不是很简单。当然简单是相对的,因为之间做了那么多铺垫,所以程序设计真的是一种艺术,看你怎么去设计。

只是你想过没有,这是一个final的类,不能继承,那我们在调用的时候从来没有写acceptor.open,这时候这个方法什么时候执行呢,在这个类里面也没有展示出来。这个疑问先留着,后面会说,这当然也是一种机制。

--------------------------------------------------------------

一写发现有那么多,本来想把IoConnector也写的,可是一个Acceptor就出乎意料的多,IoConnector差不多结构,放到下次去写了。下次会和上篇代码一样,给出一个简单的阉割版实现,每次都做一个简单的实现,源码都读完了,是不是就把mina也都写了一遍,但愿能实现。我其实想在最后写个基于P2P的通信框架,有点儿构思,但是没实力写,主要是结构组织的不好,所以还要多读读源码。

所以我发现照着源码做一个删减版代码出来的也是很好的一种读源码的方法,今天大神@ppdep开始读nginx的源码了,祝他好运。。。还有就是kafka看的我恶心,居然连个依赖库都没有,构建工具也没有,还得自己找。。

文章引自--https://my.oschina.net/ielts0909/blog/90862

上一篇:Java获取http和https协议返回的json数据


下一篇:背水一战 Windows 10 (84) - 用户和账号: 微软账号的登录和注销