介绍
为了提高并发处理效率,服务端把用户的请求连接随机分配给线程池中的线程进行处理;HBase 也是采用同样的方式处理用户请求的。
客户端代码可以参考:基于java.nio.channels的编程实践-I
代码
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class NIOSocketServer2 extends Thread {
private static final Logger LOG = LoggerFactory
.getLogger(NIOSocketServer2.class);
private static final String CHARSET = "UTF-8";
private static final int BUFFER_SIZE = 1024;
private static final int FAIL_TRY_NUM = 3;
private Selector selector;
private ServerSocketChannel ssc;
private static NIOSocketServer2 server;
private Reader[] readers = null;
private Random rand = new Random();
private int readerSize = 1;
/**
* 程序入口
*
* @param args
*/
public static void main(String[] args) {
server = new NIOSocketServer2();
try {
// server.setDaemon(true);
server.initServer();
server.start();
} catch (Exception e) {
e.printStackTrace();
// 如果出现异常,则直接关闭客户端
server.stopServer();
System.exit(1);
}
}
@Override
public void run() {
int failNum = 0;
while (true) {
try {
int select = selector.select();
if (select > 0) {
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
if (key.isAcceptable()) {
doAcceptable(key);
}
iter.remove();
}
}
} catch (Exception e) {
failNum++;
if (failNum > FAIL_TRY_NUM) {
server.stopServer();
}
}
}
}
/**
* 初始化服务器端程序,开始监听端口
*
* @throws IOException
*/
private void initServer() throws IOException {
selector = Selector.open();
ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.socket().bind(new InetSocketAddress(2181));
ssc.register(selector, SelectionKey.OP_ACCEPT);
ExecutorService readPool = Executors.newFixedThreadPool(
readerSize,
new ThreadFactoryBuilder()
.setNameFormat("RpcServer.reader=%d,port=" + 2181)
.setDaemon(true).build());
readers = new Reader[readerSize];
for (int i = 0; i < readerSize; ++i) {
Reader reader = new Reader();
readers[i] = reader;
readPool.execute(reader);
}
}
/**
* 停止服务器
*
* @throws IOException
*/
private void stopServer() {
try {
if (selector != null && selector.isOpen()) {
selector.close();
}
if (ssc != null && ssc.isOpen()) {
ssc.close();
}
} catch (IOException e) {
LOG.info("关闭服务端失败:" + e.getMessage());
}
}
Reader getReader() {
return readers[rand.nextInt(readerSize) % readers.length];
}
/**
* 对新的客户端连接进行处理
*
* @param key
* @throws IOException
*/
private void doAcceptable(SelectionKey key) throws IOException {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel;
while ((channel = server.accept()) != null) {
try {
channel.configureBlocking(false);
channel.socket().setTcpNoDelay(true);
channel.socket().setKeepAlive(true);
} catch (IOException ioe) {
channel.close();
throw ioe;
}
Reader reader = getReader();
try {
reader.startAdd();
SelectionKey readKey = reader.registerChannel(channel);
} finally {
reader.finishAdd();
}
}
}
/**
* 已连接
*
* @param key
*/
private void doConnectable(SelectionKey key) {
LOG.info("connect is ok");
}
/**
* 写消息到客户端
*
* @param key
* @throws IOException
*/
private void doWriteMessage(SelectionKey key) throws Exception {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.wrap("server write msg to client"
.getBytes(CHARSET));
while (buffer.hasRemaining()) {
sc.write(buffer);
}
TimeUnit.SECONDS.sleep(1);
}
/**
* @param key
* @throws IOException
*/
private void doReadMessage(SelectionKey key) throws Exception {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer bb = ByteBuffer.allocate(BUFFER_SIZE);
int read = sc.read(bb);
while (read > 0) {
bb.flip();
byte[] barr = new byte[bb.limit()];
bb.get(barr);
LOG.info("server read msg from client:" + new String(barr, CHARSET));
bb.clear();
read = sc.read(bb);
}
TimeUnit.SECONDS.sleep(1);
}
private class Reader implements Runnable {
private volatile boolean adding = false;
private final Selector readSelector;
Reader() throws IOException {
this.readSelector = Selector.open();
}
public void run() {
try {
doRunLoop();
} finally {
try {
readSelector.close();
} catch (IOException ioe) {
LOG.error(getName() + ": error closing read selector in "
+ getName(), ioe);
}
}
}
private synchronized void doRunLoop() {
while (true) {
SelectionKey key = null;
try {
readSelector.select();
while (adding) {
this.wait(1000);
}
Iterator<SelectionKey> iter = readSelector.selectedKeys()
.iterator();
while (iter.hasNext()) {
key = iter.next();
iter.remove();
if (key.isValid()) {
if (key.isReadable()) {
try {
doReadMessage(key);
} catch (Exception e) {
e.printStackTrace();
}
}
if (key.isWritable()) {
try {
doWriteMessage(key);
} catch (Exception e) {
e.printStackTrace();
}
}
}
key = null;
}
} catch (InterruptedException e) {
if (true) { // unexpected -- log it
LOG.info(getName() + ": unexpectedly interrupted: "
+ StringUtils.stringifyException(e));
}
} catch (IOException ex) {
LOG.error(getName() + ": error in Reader", ex);
}
}
}
/**
* This gets reader into the state that waits for the new channel to be
* registered with readSelector. If it was waiting in select() the
* thread will be woken up, otherwise whenever select() is called it
* will return even if there is nothing to read and wait in
* while(adding) for finishAdd call
*/
public void startAdd() {
adding = true;
readSelector.wakeup();
}
public synchronized SelectionKey registerChannel(SocketChannel channel)
throws IOException {
return channel.register(readSelector, SelectionKey.OP_READ
| SelectionKey.OP_WRITE);
}
public synchronized void finishAdd() {
adding = false;
this.notify();
}
}
}