服务端代码
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Single-threaded NIO demo server.
 *
 * <p>Listens on {@link #PORT}, accepts non-blocking client connections and, on
 * the selector thread, logs whatever each client sends while pushing a fixed
 * greeting back whenever the connection is writable. Each read and write is
 * followed by a one-second pause so the demo output stays readable.
 *
 * <p>Failures of the selector loop itself are tolerated {@link #FAIL_TRY_NUM}
 * times before the server shuts down. A failure on a single client connection
 * only closes that connection.
 */
public class NIOSocketServer extends Thread {
    private static final Logger LOG = LoggerFactory
            .getLogger(NIOSocketServer.class);
    private static final String CHARSET = "UTF-8";
    private static final int BUFFER_SIZE = 1024;
    private static final int FAIL_TRY_NUM = 3;
    private static final int PORT = 2181;
    private Selector selector;
    private ServerSocketChannel ssc;
    private static NIOSocketServer server;

    /**
     * Starts the server; exits the JVM with status 1 when start-up fails.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        server = new NIOSocketServer();
        try {
            server.initServer();
            server.start();
        } catch (Exception e) {
            LOG.error("failed to start server", e);
            server.stopServer();
            System.exit(1);
        }
    }

    /**
     * Selector loop: dispatches ready keys until the selector is closed, the
     * thread is interrupted, or more than {@link #FAIL_TRY_NUM} loop-level
     * failures have occurred.
     */
    @Override
    public void run() {
        int failNum = 0;
        while (selector.isOpen()) {
            try {
                if (selector.select() == 0) {
                    continue;
                }
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    // Remove before handling so a throwing handler cannot
                    // leave a stale key in the selected set.
                    iter.remove();
                    handleKey(key);
                }
            } catch (InterruptedException e) {
                // Restore the flag for whoever owns this thread and stop.
                Thread.currentThread().interrupt();
                LOG.info("server thread interrupted, shutting down");
                stopServer();
                return;
            } catch (Exception e) {
                failNum++;
                LOG.error("selector loop failed ({} of {} tolerated failures)",
                        failNum, FAIL_TRY_NUM, e);
                if (failNum > FAIL_TRY_NUM) {
                    // Return after closing: selecting on a closed selector
                    // would only throw again on every iteration.
                    stopServer();
                    return;
                }
            }
        }
    }

    /**
     * Dispatches one selected key to the matching handler.
     *
     * <p>An {@link IOException} on a client connection closes only that
     * connection; an accept failure propagates to the caller.
     *
     * @param key the selected key
     * @throws IOException if accepting a new connection fails
     * @throws InterruptedException if the pacing sleep is interrupted
     */
    private void handleKey(SelectionKey key) throws IOException,
            InterruptedException {
        if (!key.isValid()) {
            return;
        }
        if (key.isAcceptable()) {
            doAcceptable(key);
            return;
        }
        try {
            // Re-check validity each time: an earlier handler may have
            // closed the connection and cancelled the key.
            if (key.isValid() && key.isWritable()) {
                doWriteMessage(key);
            }
            if (key.isValid() && key.isReadable()) {
                doReadMessage(key);
            }
            if (key.isValid() && key.isConnectable()) {
                doConnectable(key);
            }
        } catch (IOException e) {
            LOG.warn("client connection failed, closing it", e);
            closeConnection(key);
        }
    }

    /**
     * Opens the selector and the listening channel and registers the channel
     * for accept events.
     *
     * @throws IOException if the selector or channel cannot be opened or bound
     */
    private void initServer() throws IOException {
        selector = Selector.open();
        ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        ssc.socket().bind(new InetSocketAddress(PORT));
        ssc.register(selector, SelectionKey.OP_ACCEPT);
    }

    /**
     * Closes the selector and the listening channel. Each is closed
     * independently so a failure on one does not leak the other.
     */
    private void stopServer() {
        try {
            if (selector != null && selector.isOpen()) {
                selector.close();
            }
        } catch (IOException e) {
            LOG.warn("关闭服务端失败:{}", e.getMessage(), e);
        }
        try {
            if (ssc != null && ssc.isOpen()) {
                ssc.close();
            }
        } catch (IOException e) {
            LOG.warn("关闭服务端失败:{}", e.getMessage(), e);
        }
    }

    /** Cancels the key and closes its channel, logging any close failure. */
    private void closeConnection(SelectionKey key) {
        key.cancel();
        try {
            key.channel().close();
        } catch (IOException e) {
            LOG.warn("failed to close client channel", e);
        }
    }

    /**
     * Accepts a pending connection and registers it for read and write events.
     *
     * @param key the key of the listening channel
     * @throws IOException if accepting or configuring the connection fails
     */
    private void doAcceptable(SelectionKey key) throws IOException {
        ServerSocketChannel tmpSsc = (ServerSocketChannel) key.channel();
        SocketChannel ss = tmpSsc.accept();
        if (ss == null) {
            // A non-blocking accept returns null when no connection is pending.
            return;
        }
        ss.configureBlocking(false);
        ss.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
    }

    /** Logs that a connect event was observed. */
    private void doConnectable(SelectionKey key) {
        LOG.info("connect is ok");
    }

    /**
     * Sends the fixed greeting to the client, then pauses for one second.
     *
     * @param key the key of the client channel
     * @throws IOException if the write fails
     * @throws InterruptedException if the pause is interrupted
     */
    private void doWriteMessage(SelectionKey key) throws IOException,
            InterruptedException {
        SocketChannel sc = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.wrap("server write msg to client"
                .getBytes(CHARSET));
        // A non-blocking write may be partial; loop until fully sent.
        while (buffer.hasRemaining()) {
            sc.write(buffer);
        }
        TimeUnit.SECONDS.sleep(1);
    }

    /**
     * Reads and logs everything currently available from the client, then
     * pauses for one second. Closes the connection when the client has closed
     * its side.
     *
     * @param key the key of the client channel
     * @throws IOException if the read fails
     * @throws InterruptedException if the pause is interrupted
     */
    private void doReadMessage(SelectionKey key) throws IOException,
            InterruptedException {
        SocketChannel sc = (SocketChannel) key.channel();
        ByteBuffer bb = ByteBuffer.allocate(BUFFER_SIZE);
        int read = sc.read(bb);
        while (read > 0) {
            bb.flip();
            byte[] barr = new byte[bb.remaining()];
            bb.get(barr);
            LOG.info("server read msg from client:{}", new String(barr, CHARSET));
            bb.clear();
            read = sc.read(bb);
        }
        if (read < 0) {
            // -1 means end of stream: without closing, the key would stay
            // readable forever.
            LOG.info("client closed the connection");
            closeConnection(key);
            return;
        }
        TimeUnit.SECONDS.sleep(1);
    }
}
客户端代码
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Single-threaded NIO demo client.
 *
 * <p>Connects to port {@link #PORT}, then repeatedly sends a fixed message,
 * waits on the selector and logs whatever the server sent back. Each read and
 * write is followed by a one-second pause so the demo output stays readable.
 *
 * <p>The loop ends when the server closes the connection or the thread is
 * interrupted; after more than {@link #FAIL_TRY_NUM} failures the JVM exits
 * with status 1.
 */
public class NIOSocketClient extends Thread {
    private static final Logger LOG = LoggerFactory
            .getLogger(NIOSocketClient.class);
    private static final String CHARSET = "UTF-8";
    private static final int BUFFER_SIZE = 1024;
    private static final int FAIL_TRY_NUM = 3;
    private static final int PORT = 2181;
    private SocketChannel socketChannel;
    private Selector selector;
    private static NIOSocketClient client;

    /**
     * Connects to the server and starts the client thread.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        client = new NIOSocketClient();
        try {
            client.initClient();
            client.start();
        } catch (Exception e) {
            LOG.error("failed to start client", e);
            client.close();
        }
    }

    /**
     * Write/select/read loop. Runs while the channel is open and releases the
     * selector and channel when it ends.
     */
    @Override
    public void run() {
        int failNum = 0;
        while (socketChannel.isOpen()) {
            try {
                writeMessage();
                int select = selector.select();
                LOG.debug("selected key count: {}", select);
                if (select == 0) {
                    continue;
                }
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey sk = iter.next();
                    // Remove before handling so a throwing handler cannot
                    // leave a stale key in the selected set.
                    iter.remove();
                    if (sk.isValid() && sk.isReadable()) {
                        readMessage(sk);
                    }
                }
            } catch (InterruptedException e) {
                // Restore the flag for whoever owns this thread and stop.
                Thread.currentThread().interrupt();
                LOG.info("client thread interrupted, shutting down");
                break;
            } catch (Exception e) {
                failNum++;
                LOG.error("client loop failed ({} of {} tolerated failures)",
                        failNum, FAIL_TRY_NUM, e);
                if (failNum > FAIL_TRY_NUM) {
                    close();
                    System.exit(1);
                }
            }
        }
        close();
    }

    /**
     * Reads and logs everything currently available on the key's channel, then
     * pauses for one second. When the server has closed the connection the key
     * is cancelled and the channel closed instead.
     *
     * @param sk the readable key
     * @throws Exception if the read fails or the pause is interrupted
     */
    public void readMessage(SelectionKey sk) throws Exception,
            UnsupportedEncodingException {
        SocketChannel curSc = (SocketChannel) sk.channel();
        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
        int read = curSc.read(buffer);
        while (read > 0) {
            buffer.flip();
            // Decode only the bytes just read, not the whole backing array,
            // which would append NULs or stale data from an earlier read.
            LOG.info("read message from server:{}",
                    new String(buffer.array(),
                            buffer.arrayOffset() + buffer.position(),
                            buffer.remaining(), CHARSET));
            buffer.clear();
            read = curSc.read(buffer);
        }
        if (read < 0) {
            // -1 means end of stream: the server closed the connection.
            LOG.info("server closed the connection");
            sk.cancel();
            curSc.close();
            return;
        }
        TimeUnit.SECONDS.sleep(1);
    }

    /**
     * Sends the fixed client message to the server, then pauses for one second.
     *
     * @throws Exception if the write fails or the pause is interrupted
     */
    public void writeMessage() throws Exception {
        String ss = "client write msg to server";
        ByteBuffer buffer = ByteBuffer.wrap(ss.getBytes(CHARSET));
        // A non-blocking write may be partial; loop until fully sent.
        while (buffer.hasRemaining()) {
            socketChannel.write(buffer);
        }
        TimeUnit.SECONDS.sleep(1);
    }

    /**
     * Opens the channel and selector, connects to the server and registers the
     * channel for read events.
     *
     * @throws IOException if opening, connecting or registering fails
     * @throws ClosedChannelException if the channel is closed during set-up
     */
    public void initClient() throws IOException, ClosedChannelException {
        InetSocketAddress addr = new InetSocketAddress(PORT);
        socketChannel = SocketChannel.open();
        selector = Selector.open();
        // Connect while still in blocking mode: the call returns once the
        // connection is established (or throws), so there is no need to poll
        // finishConnect() in a busy loop.
        socketChannel.connect(addr);
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_READ);
    }

    /**
     * Closes the selector and the socket channel. Each is closed independently
     * so a failure on one does not leak the other.
     */
    private void close() {
        try {
            if (selector != null && selector.isOpen()) {
                selector.close();
            }
        } catch (IOException e) {
            LOG.warn("关闭客户端失败:{}", e.getMessage(), e);
        }
        try {
            if (socketChannel != null && socketChannel.isOpen()) {
                socketChannel.close();
            }
        } catch (IOException e) {
            LOG.warn("关闭客户端失败:{}", e.getMessage(), e);
        }
    }
}