Reactor模式
- Reactor中文通常被译作“反应堆”,从字面上便透露出了凌厉的霸气,但是不可否认,这个中文的译名除了让人觉得Reactor模式很厉害之外,并没有透露出更多的信息,让人对其理解仍是云里雾里,或许将Reactor看成是'Notifier'+'Dispatcher'的结合体,会更能直接的表达Reactor的工作模式。
-
Wikipedia的定义
- 对于什么是Reactor模式,不同的机构给出了许多不同的定义,个人最喜欢Wikipedia上的描述,原文如下:
The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers
整段描述强调几个关键信息:
- Reactor模式是一种事件驱动模式
- 一个或多个输入是同时交付的
- 服务控制器会分离到达的多个请求并同步的分发给相关的处理器进行处理
- Reactor模式简图
Reactor模式的组件
-
Reactor模式中包含了Reactor、Handle、EventHandler、Synchronous Event Demutiplexer等必不可少的组成部份,组件间的关系如下所示:
- Synchronous Event Demutiplexer是同步事件分离器,是IO多路服用技术实现的关键,主要任务是监听系统的Handlers中事件的发生,监听的过程是阻塞等待的
-
Handle是句柄,即操作系统管理的资源
- Dispatcher是分发器,负责根据Event的类型来调用EventHandler
- EventHandler是事件处理器,每个事件处理器会关联一个Handle
Reactor模式工作的时序图如下:
-
流程如下所述:
- 初始化Dispatcher
- 注册EventHandler到Dispatcher中,每个EventHandler包含对相应Handle的引用,从而建立Handle到EventHandler的映射
- 启动Event Loop。在Event Loop中,调用select()方法,Synchronous Event Demultiplexer阻塞等待Event发生
- 当某个或某些Handle的Event发生后,select()方法返回,Dispatcher根据返回的Event找到注册的EventHandler,并回调该EventHandler的handle_event()方法
- 在EventHandler的handle_event()方法中还可以向Dispatcher中注册新的EventHandler,用来处理下一个Event
I/O多路复用
- 指多个描述符(fd)的I/O操作能在一个线程内并发交替地顺序完成,这就叫I/O多路复用,这里的“复用”指的是复用同一个线程。I/O多路复用是reactor模式的核心,I/O多路复用功能由Synchronous Event Demutiplexer提供,而Synchronous Event Demutiplexer是由操作系统实现的。
NIO
- NIO是Java SDK提供的基于Reactor模式的非阻塞IO工作模式的实现
- NIO与IO的主要区别包括:
| NIO | IO |
| ------ | ------ |
| 面向缓冲 | 面向流 |
| 非阻塞IO | 阻塞 IO |
面向缓冲与面向流
- 面向缓冲是NIO与传统IO最大的区别,传统的IO是基于字节的,所有的IO都被看作是单个子节的移动,而NIO是基于块的,一个块则由多个字节组成,从简单的原理上看,NIO的性能提升主要来源于每一次IO操作都能尽可能的读写更多的字节,而更直接的提升是得益于NIO使用的IO读写结构Channel和Buffer非常贴近操作系统执行IO的方式:通道和缓冲器。简单的理解就是越接近操作系统底层,越快速。
-
NIO对Reactor模型组件的实现
- Selector : Synchronous Event Demultiplexer
- SelectKey : Event
- SocketChannel : Handle
- (Handlers write by yourself) : EventHandler
#### NIO server demo
package nio.service;
import nio.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
public class NIOService {
private static final Logger logger = LoggerFactory.getLogger(NIOService.class);
private static final int TIMEOUT = 30000;
private static final int PORT = 8084;
private static final int BLOCK = 4096;
private static ByteBuffer receiveBuffer = ByteBuffer.allocate(BLOCK);
private static ByteBuffer sendBuffer = ByteBuffer.allocate(BLOCK);
public static void main(String[] args) {
SocketChannel readChannel = null;
ServerSocketChannel listenChannel = null;
try {
//创建一个选择器,用于监听管理事件
Selector selector = Selector.open();
//创建服务器socket管道,用来接收和分发socket连接
listenChannel = ServerSocketChannel.open();
//设置管道为非阻塞形式
listenChannel.configureBlocking(false);
//绑定socket端口
listenChannel.socket().bind(new InetSocketAddress(PORT));
//将管道注册到选择器中,注册的事件类型为OP_ACCEPT,现在selector只监听指定端口的OP_ACCEPT事件
listenChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("running.......");
while (true) {
//监听事件,设置每一次监听的超时数
if (selector.select(TIMEOUT) == 0) {
continue;
}
//事件来源列表
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
//获取一个事件
SelectionKey key = iter.next();
//删除当前事件
iter.remove();
//检查此键的通道是否已准备好接受新的套接字连接
if (key.isAcceptable()) {
System.out.println("in acceptable key readyOps : " + key.readyOps());
//返回此键的通道
ServerSocketChannel server = (ServerSocketChannel) key.channel();
//接收套接字连接,返回套接字通道
readChannel = server.accept();
//配置为非阻塞
readChannel.configureBlocking(false);
//注册到同一个Selector中
readChannel.register(selector, SelectionKey.OP_READ);
}
if (key.isReadable()) {
//返回为之创建的socket通道
SocketChannel channel = (SocketChannel) key.channel();
//清空buffer
receiveBuffer.clear();
int count = channel.read(receiveBuffer);
if (count > 0) {
String receiveText = new String(receiveBuffer.array(), 0, count);
//在发送的buffer中存入收到的内容
sendBuffer.put(receiveBuffer.array());
//设置监听的消息包括写消息
key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
System.out.println("in readable key readyOps : " + key.readyOps());
System.out.println("receive message : " + receiveText);
//在网络不阻塞的情况下,socket都是可写的
//保证缓存的可读性
sendBuffer.clear();
sendBuffer.put(CharsetUtil.encode("server get the input: " + receiveText).array());
//保证buffer的可读性
sendBuffer.flip();
SocketChannel socketChannel = (SocketChannel) key.channel();
if (socketChannel.isConnected()) {
//do something
while (sendBuffer.hasRemaining()) {
if (socketChannel.isConnected()) {
socketChannel.write(sendBuffer);
}
}
}
sendBuffer.clear();
} else if (count < 0) {//socket已经断开,count == -1
//do nothing
}
}
}
}
} catch (IOException e) {
logger.error("nio selector open failure.", e);
} finally {
if (readChannel != null) {
try {
readChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (listenChannel != null) {
try {
listenChannel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
-
NIO参考资料
epoll
epoll是除了NIO之外另一个对Reactor模式的经典实现,那么什么是epoll?根据man手册的描述:epoll是为了处理大批量句柄(handle)而做了改进的poll。linux 2.5.44以上版本支持,是一个性能极好的I/O多路复用的实现。
epoll对select/poll的改进
- select/poll 的工作方式是阻塞监听描述符(fd)就绪状态,直到有描述符就绪(有数据可读、可写、或者有except、timeout),则返回fdset,然后通过遍历fdset来找到就绪的描述符。这两种模式之间并没有本质上的区别,poll与select最大的区别在于 poll没有最大文件描述符数量的限制,而select有最大文件描述符数量上的限制。
- select/poll的缺点在于:当大量文件描述符的数组被复制于用户态和内核的地址空间之间,不论这些文件描述符是否就绪,轮询的开销会随着文件描述符数量的增加而线性增加。
- epoll 对与select/poll的改进在于,epoll在阻塞监听描述符就绪状态时,仅会返回已经就绪的文件描述符集合,而无需再遍历集合。epoll事先通过epoll_ctl()来注册一 个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait() 时便得到通知,这样IO的效率不会随着监视fd的数量的增长而下降。
- epoll提供了三个可以操作的接口
- epoll_create(int size);
- epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
-
epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
+ int epoll_create(int size) + 创建一个epoll的句柄,size是最大的描述符监听数。size并不会真正限制epolll所能监听的描述符最大个数,只是对内核初始分配内部数据结构的一个建议值。 + int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) + 对指定描述符的监听事件执行指定的Op操作,epfd:是epoll_create()的返回值,fd:是需要监听的文件描述符,epoll_event:是告诉内核需要监听什么事,op:表示操作,有三个宏定义,分别是添加EPOLL_CTL_ADD,删除EPOLL_CTL_DEL,修改EPOLL_CTL_MOD。 + int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout) : 等待epfd上的io事件,最多返回maxevents个事件, 该方法与Java NIO的select()方法类似,maxevents的值不能超过epoll_create中的参数size的大小,也就是说,size的大小也间接限制了epoll的线程一次会批量处理几个IO事件。 + epoll使用的代码demo:
while(1)
{
nfds = epoll_wait(epfd,events,30,600);
for(i=0;i<nfds;++i)
{
if(events[i].data.fd==listenfd) //出现新的连接
{
connfd = accept(listenfd,(sockaddr *)&clientaddr, &clilen); //接受这个连接
ev.data.fd=connfd;
ev.events=EPOLLIN|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev); //将新的描述符添加到epoll的监听队列中
}
else if( events[i].events&EPOLLIN ) //接收到数据
{
n = read(sockfd, line, MAXLINE)) < 0
ev.data.ptr = md; //添加数据,md为自定义类型my_data的指针
ev.events=EPOLLOUT|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);//修改标识符,等待下一个循环时发送数据
}
else if(events[i].events&EPOLLOUT) //有数据待发送,写socket
{
struct my_data* md = (my_data*)events[i].data.ptr; //读取数据
sockfd = md->fd;
send( sockfd, md->ptr, strlen((char*)md->ptr), 0 ); //发送数据
ev.data.fd=sockfd;
ev.events=EPOLLIN|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); //修改标识符,等待下一个循环时接收数据
}
else
{
//其他的处理
}
}
}