通过Mina官 网文档,我们可以看到,有如下几个状态:
- Connected : the session has been created and is available
- Idle : the session hasn’t processed any request for at least a period of time (this period is configurable)Closing : the session is being closed (the remaining messages are being flushed, cleaning up is not terminated)
- Idle for read : no read has actually been made for a period of time
- Idle for write : no write has actually been made for a period of time
- Idle for both : no read nor write for a period of time
- Closed : The session is now closed, nothing else can be done to revive it.
对应的状态迁移图,如图所示:
通过上面的状态图,我们可以看出,是哪个事件的发生使得IoSession进入哪个状态,比较直观明了。下面,我们看一下IoSession对应的设计,类继承关系如下所示:
对于IoSession接口类,我在上图把具有不同类型功能的操作进行了分类,说明如下:
- 一个IoSession实例可以访问/持有哪些数据:前半部分以get开头的方法,能够返回对应的数据对象。
- 一个IoSession实例可以检测哪些状态数据:中间部分以is开头的一些方法。
- 一个IoSession实例可以执行哪些方法调用:后半部分以动词开头命名的方法。
- 一个IoSession实例还可以获取通信过程相关的统计信息,如读取字节数/消息数、写入字节数/消息数,等等,在上面类图中省略 了这些方法。
可见,IoSession在Mina框架中的位置是相当重要的。
根据上面的类图,我们分析一下NioSocketSession类的源代码。
AbstractIoSession实现了IoSession接口中定义的大多数方法,我们关注读和写两个重要的方法,因为他们最终也被NioSocketSession类所继承。
先看读数据请求方法read,如下所示:
01 |
public final ReadFuture read() {
|
02 |
if (!getConfig().isUseReadOperation()) {
|
03 |
throw new IllegalStateException( "useReadOperation is not enabled." );
|
06 |
Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
|
08 |
synchronized (readyReadFutures) {
|
09 |
future = readyReadFutures.poll();
|
11 |
if (future.isClosed()) {
|
12 |
readyReadFutures.offer(future);
|
15 |
future = new DefaultReadFuture( this );
|
16 |
getWaitingReadFutures().offer(future);
|
再看一下,写数据请求方法write,如下所示:
01 |
public WriteFuture write(Object message, SocketAddress remoteAddress) {
|
02 |
if (message == null ) {
|
03 |
throw new IllegalArgumentException( "Trying to write a null message : not allowed" );
|
06 |
if (!getTransportMetadata().isConnectionless() && (remoteAddress != null )) {
|
07 |
throw new UnsupportedOperationException();
|
10 |
if (isClosing() || !isConnected()) {
|
11 |
WriteFuture future = new DefaultWriteFuture( this );
|
12 |
WriteRequest request = new DefaultWriteRequest(message, future, remoteAddress);
|
13 |
WriteException writeException = new WriteToClosedSessionException(request);
|
14 |
future.setException(writeException);
|
18 |
FileChannel openedFileChannel = null ;
|
21 |
if ((message instanceof IoBuffer) && !((IoBuffer) message).hasRemaining()) {
|
22 |
throw new IllegalArgumentException( "message is empty. Forgot to call flip()?" );
|
23 |
} else if (message instanceof FileChannel) {
|
24 |
FileChannel fileChannel = (FileChannel) message;
|
25 |
message = new DefaultFileRegion(fileChannel, 0 , fileChannel.size());
|
26 |
} else if (message instanceof File) {
|
27 |
File file = (File) message;
|
28 |
openedFileChannel = new FileInputStream(file).getChannel();
|
29 |
message = new FilenameFileRegion(file, openedFileChannel, 0 , openedFileChannel.size());
|
31 |
} catch (IOException e) {
|
32 |
ExceptionMonitor.getInstance().exceptionCaught(e);
|
33 |
return DefaultWriteFuture.newNotWrittenFuture( this , e);
|
37 |
WriteFuture writeFuture = new DefaultWriteFuture( this );
|
38 |
WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress);
|
41 |
IoFilterChain filterChain = getFilterChain();
|
42 |
filterChain.fireFilterWrite(writeRequest);
|
45 |
if (openedFileChannel != null ) {
|
46 |
final FileChannel finalChannel = openedFileChannel;
|
47 |
writeFuture.addListener( new IoFutureListener<WriteFuture>() {
|
48 |
public void operationComplete(WriteFuture future) {
|
51 |
} catch (IOException e) {
|
52 |
ExceptionMonitor.getInstance().exceptionCaught(e);
|
再看,NioSession类中增加了一个返回IoProcessor实例的抽象方法,而这个IoProcessor是在创建一个IoSession实例(例如,可以实例化一个NioSocketSession)的时候,由外部传到IoSession内部。我们知道,IoProcessor是Mina框架底层真正用来处理实际I/O操作的处理器,通过一个IoSession实例获取一个IoProcessor,可以方便地响应作用于IoSession的I/O读写请求,从而由这个IoProcessor直接去处理。
根据Mina框架架构设计,IoService->IoFilter Chain->IoHandler,我们知道在IoFilter Chain的一端(头部)之前会调用处理实际的I/O操作请求,也就是IoProcessor需要处理的逻辑,那么可以想象到,IoProcessor被调用的位置,可以查看org.apache.mina.core.filterchain.DefaultIoFilterChain类的源代码,其中定义了一个内部类,源码如下所示:
01 |
private class HeadFilter extends IoFilterAdapter {
|
02 |
@SuppressWarnings ( "unchecked" )
|
04 |
public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
|
06 |
AbstractIoSession s = (AbstractIoSession) session;
|
09 |
if (writeRequest.getMessage() instanceof IoBuffer) {
|
10 |
IoBuffer buffer = (IoBuffer) writeRequest.getMessage();
|
15 |
int remaining = buffer.remaining();
|
20 |
s.increaseScheduledWriteMessages();
|
22 |
s.increaseScheduledWriteBytes(remaining);
|
25 |
s.increaseScheduledWriteMessages();
|
28 |
WriteRequestQueue writeRequestQueue = s.getWriteRequestQueue();
|
30 |
if (!s.isWriteSuspended()) {
|
31 |
if (writeRequestQueue.size() == 0 ) {
|
33 |
s.getProcessor().write(s, writeRequest);
|
35 |
s.getWriteRequestQueue().offer(s, writeRequest);
|
36 |
s.getProcessor().flush(s);
|
39 |
s.getWriteRequestQueue().offer(s, writeRequest);
|
43 |
@SuppressWarnings ( "unchecked" )
|
45 |
public void filterClose(NextFilter nextFilter, IoSession session) throws Exception {
|
46 |
((AbstractIoSession) session).getProcessor().remove(session);
|
最后,我们看一下NioSocketSession实例被创建的时机。其实很容易想到,当一次网络通信开始的时候,也就是客户端连接到服务器端的时候,服务器端首先进行accept,这时候一次会话才被启动,也就是在这个是被创建,拿Mina中的NioSocketAcceptor类来看,创建NioSocketSession的代码,如下所示:
01 |
protected NioSession accept(IoProcessor<NioSession> processor, ServerSocketChannel handle) throws Exception {
|
03 |
SelectionKey key = handle.keyFor(selector);
|
05 |
if ((key == null ) || (!key.isValid()) || (!key.isAcceptable())) {
|
10 |
SocketChannel ch = handle.accept();
|
16 |
return new NioSocketSession( this , processor, ch);
|
通过上面的分析,我们可知,IoSession在基于Mina进行网络通信的过程中,对于网络通信相关操作的请求都是基于一个IoSession实例来进行的,所以说,IoSession在Mina中是一个很重要的结构。