1.阻塞模式实例
NIOUtil类,用来通过Socket获取BufferedReader和PrintWriter。
package IO; import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.Socket;
import java.io.BufferedWriter;
import java.io.OutputStreamWriter;

/**
 * Helpers that wrap a {@link Socket}'s byte streams in line-oriented text streams.
 *
 * <p>Both directions use UTF-8 explicitly instead of the platform default charset, so
 * the text exchanged over the wire does not depend on the JVM's {@code file.encoding}.
 */
public class NIOUtil {
    /** Charset name used for both reading and writing. */
    private static final String CHARSET_NAME = "UTF-8";

    /**
     * Returns an auto-flushing writer over the socket's output stream.
     *
     * @param socket socket to write to
     * @return a {@link PrintWriter} that flushes on every {@code println}
     * @throws IOException if the output stream cannot be obtained
     */
    public static PrintWriter getPrintWriter(Socket socket) throws IOException {
        OutputStream outputStream = socket.getOutputStream();
        // Same layering PrintWriter(OutputStream, boolean) builds, but with a fixed charset.
        return new PrintWriter(
                new BufferedWriter(new OutputStreamWriter(outputStream, CHARSET_NAME)), true);
    }

    /**
     * Returns a buffered reader over the socket's input stream.
     *
     * @param socket socket to read from
     * @return a {@link BufferedReader} decoding the incoming bytes as UTF-8
     * @throws IOException if the input stream cannot be obtained
     */
    public static BufferedReader getBufferedReader(Socket socket)
            throws IOException {
        InputStream inputStream = socket.getInputStream();
        return new BufferedReader(new InputStreamReader(inputStream, CHARSET_NAME));
    }
}
使用ServerSocketChannel创建阻塞服务器端程序:
package IO; import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Echo server built on a {@link ServerSocketChannel} left in its default blocking mode.
 *
 * <p>The thread calling {@link #service()} accepts connections and hands each one to a
 * fixed-size worker pool. A worker echoes every received line back prefixed with
 * {@code "ECHO:"} until the peer sends {@code bye} or closes the connection.
 */
public class BlockingNIOServer {
    /** Number of worker threads created per available processor. */
    private static final int DEFAULT_POOL_SIZE = 4;

    private int port = 8000;
    private ServerSocketChannel serverSocketChannel = null;
    private ExecutorService executorService = null;

    /**
     * Creates the worker pool and binds the listening channel to the configured port.
     *
     * @throws IOException if the channel cannot be opened or bound
     */
    public BlockingNIOServer() throws IOException {
        super();
        this.executorService = Executors.newFixedThreadPool(
                DEFAULT_POOL_SIZE * Runtime.getRuntime().availableProcessors());
        this.serverSocketChannel = ServerSocketChannel.open();
        try {
            serverSocketChannel.socket().setReuseAddress(true);
            serverSocketChannel.socket().bind(new InetSocketAddress(port));
        } catch (IOException e) {
            // Do not leave an open channel or an idle pool behind when binding fails.
            try {
                serverSocketChannel.close();
            } catch (IOException closeFailure) {
                closeFailure.printStackTrace();
            }
            executorService.shutdown();
            throw e;
        }
    }

    /**
     * Accepts connections for as long as the listening channel is open and dispatches
     * each accepted channel to the worker pool.
     */
    public void service() {
        while (serverSocketChannel.isOpen()) {
            SocketChannel channel = null;
            try {
                channel = serverSocketChannel.accept();
                executorService.execute(new Handler(channel));
            } catch (Exception e) {
                e.printStackTrace();
                // The handler never ran, so nobody else will close this connection.
                closeQuietly(channel);
            }
        }
    }

    /** Closes the channel if it is non-null, printing (not propagating) any failure. */
    private static void closeQuietly(SocketChannel channel) {
        if (channel != null) {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        new BlockingNIOServer().service();
    }

    /** Serves one client connection on a worker thread. */
    private static class Handler implements Runnable {
        private final SocketChannel channel;

        public Handler(SocketChannel channel) {
            super();
            this.channel = channel;
        }

        @Override
        public void run() {
            handler(channel);
        }

        /**
         * Reads lines from the client and writes each one back with an echo prefix.
         * Stops after echoing {@code bye} (case-insensitive) or at end of stream, and
         * always closes the channel before returning.
         *
         * @param channel the accepted client connection
         */
        public void handler(SocketChannel channel) {
            try {
                Socket socket = channel.socket();
                System.out.println("接收到来自:" + socket.getInetAddress() + " 端口:"
                        + socket.getPort() + "的请求");
                BufferedReader bufferedReader = NIOUtil.getBufferedReader(socket);
                PrintWriter printWriter = NIOUtil.getPrintWriter(socket);
                String msg;
                while ((msg = bufferedReader.readLine()) != null) {
                    System.out.println(msg);
                    printWriter.println(echo(msg));
                    if ("bye".equalsIgnoreCase(msg)) {
                        break;
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                closeQuietly(channel);
            }
        }

        /** Builds the reply sent for a received line. */
        private String echo(String msg) {
            return "ECHO:" + msg;
        }
    }
}
使用SocketChannel创建阻塞Socket客户端:(注意:下面贴出的代码与上面的服务器端BlockingNIOServer完全相同,疑为粘贴错误,客户端代码缺失。)
package IO; import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Echo server built on a {@link ServerSocketChannel} left in its default blocking mode.
 *
 * <p>NOTE(review): the heading above this listing announces a blocking client, but the
 * code is the server class again; the client listing appears to be missing.
 *
 * <p>The thread calling {@link #service()} accepts connections and hands each one to a
 * fixed-size worker pool. A worker echoes every received line back prefixed with
 * {@code "ECHO:"} until the peer sends {@code bye} or closes the connection.
 */
public class BlockingNIOServer {
    /** Number of worker threads created per available processor. */
    private static final int DEFAULT_POOL_SIZE = 4;

    private int port = 8000;
    private ServerSocketChannel serverSocketChannel = null;
    private ExecutorService executorService = null;

    /**
     * Creates the worker pool and binds the listening channel to the configured port.
     *
     * @throws IOException if the channel cannot be opened or bound
     */
    public BlockingNIOServer() throws IOException {
        super();
        this.executorService = Executors.newFixedThreadPool(
                DEFAULT_POOL_SIZE * Runtime.getRuntime().availableProcessors());
        this.serverSocketChannel = ServerSocketChannel.open();
        try {
            serverSocketChannel.socket().setReuseAddress(true);
            serverSocketChannel.socket().bind(new InetSocketAddress(port));
        } catch (IOException e) {
            // Do not leave an open channel or an idle pool behind when binding fails.
            try {
                serverSocketChannel.close();
            } catch (IOException closeFailure) {
                closeFailure.printStackTrace();
            }
            executorService.shutdown();
            throw e;
        }
    }

    /**
     * Accepts connections for as long as the listening channel is open and dispatches
     * each accepted channel to the worker pool.
     */
    public void service() {
        while (serverSocketChannel.isOpen()) {
            SocketChannel channel = null;
            try {
                channel = serverSocketChannel.accept();
                executorService.execute(new Handler(channel));
            } catch (Exception e) {
                e.printStackTrace();
                // The handler never ran, so nobody else will close this connection.
                closeQuietly(channel);
            }
        }
    }

    /** Closes the channel if it is non-null, printing (not propagating) any failure. */
    private static void closeQuietly(SocketChannel channel) {
        if (channel != null) {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        new BlockingNIOServer().service();
    }

    /** Serves one client connection on a worker thread. */
    private static class Handler implements Runnable {
        private final SocketChannel channel;

        public Handler(SocketChannel channel) {
            super();
            this.channel = channel;
        }

        @Override
        public void run() {
            handler(channel);
        }

        /**
         * Reads lines from the client and writes each one back with an echo prefix.
         * Stops after echoing {@code bye} (case-insensitive) or at end of stream, and
         * always closes the channel before returning.
         *
         * @param channel the accepted client connection
         */
        public void handler(SocketChannel channel) {
            try {
                Socket socket = channel.socket();
                System.out.println("接收到来自:" + socket.getInetAddress() + " 端口:"
                        + socket.getPort() + "的请求");
                BufferedReader bufferedReader = NIOUtil.getBufferedReader(socket);
                PrintWriter printWriter = NIOUtil.getPrintWriter(socket);
                String msg;
                while ((msg = bufferedReader.readLine()) != null) {
                    System.out.println(msg);
                    printWriter.println(echo(msg));
                    if ("bye".equalsIgnoreCase(msg)) {
                        break;
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                closeQuietly(channel);
            }
        }

        /** Builds the reply sent for a received line. */
        private String echo(String msg) {
            return "ECHO:" + msg;
        }
    }
}
2.非阻塞模式实例
CharSetUtil类,封装Charset,主要用于decode()和encode()
package IO; import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;

/**
 * Converts between byte buffers and strings using a charset looked up by name.
 */
public class CharSetUtil {
    /**
     * Decodes the bytes between the buffer's position and limit into a string.
     * The buffer's position is advanced past the decoded bytes.
     *
     * @param buffer      bytes to decode
     * @param charsetName name of the charset, e.g. {@code "UTF-8"}
     * @return the decoded text
     */
    public static String decode(ByteBuffer buffer, String charsetName) {
        return Charset.forName(charsetName).decode(buffer).toString();
    }

    /**
     * Encodes a string into a newly allocated byte buffer.
     *
     * @param msg         text to encode
     * @param charsetName name of the charset, e.g. {@code "UTF-8"}
     * @return a buffer whose remaining bytes are the encoded text
     */
    public static ByteBuffer encode(String msg, String charsetName) {
        return Charset.forName(charsetName).encode(msg);
    }
}
非阻塞的服务器端
package IO; import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Echo server that runs on a single thread and multiplexes the listening channel and
 * all client channels through one {@link Selector} in non-blocking mode.
 *
 * <p>Each client channel carries a {@link ByteBuffer} attachment that accumulates the
 * bytes received so far; complete lines (terminated by CRLF) are echoed back prefixed
 * with {@code "echo:"}.
 */
public class NoBlockingServer {
    private int port = 8000;
    private ServerSocketChannel serverSocketChannel = null;
    private Selector selector = null;

    /**
     * Opens the selector and binds the non-blocking listening channel.
     *
     * @throws IOException if the selector or channel cannot be opened or bound
     */
    public NoBlockingServer() throws IOException {
        super();
        selector = Selector.open();
        this.serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().setReuseAddress(true);
        serverSocketChannel.configureBlocking(false); // non-blocking mode
        serverSocketChannel.socket().bind(new InetSocketAddress(port));
        System.out.println("服务器启动成功");
    }

    /**
     * Runs the select loop: accepts new clients, buffers incoming bytes and echoes
     * complete lines. A failure while handling one key closes only that key's channel.
     *
     * @throws IOException if registering the listening channel or selecting fails
     */
    public void service() throws IOException {
        // Register interest in incoming connections on the listening channel.
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        // Note: select() blocks until at least one channel is ready.
        while (selector.select() > 0) {
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = null;
                try {
                    selectionKey = iterator.next();
                    iterator.remove();
                    if (selectionKey.isAcceptable()) {
                        dealWithAcceptable(selectionKey);
                    }
                    // A handler may cancel the key; the ready-ops queries throw on a
                    // cancelled key, so re-check validity before each one.
                    if (selectionKey.isValid() && selectionKey.isReadable()) {
                        dealWithReadable(selectionKey);
                    }
                    if (selectionKey.isValid() && selectionKey.isWritable()) {
                        dealWithWritable(selectionKey);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    closeKey(selectionKey);
                }
            }
        }
    }

    /** Cancels the key and closes its channel without letting a close failure escape. */
    private void closeKey(SelectionKey selectionKey) {
        if (selectionKey == null) {
            return;
        }
        selectionKey.cancel();
        try {
            selectionKey.channel().close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Accepts a pending connection and registers it for read and write readiness with
     * a fresh 1024-byte accumulation buffer attached.
     */
    private void dealWithAcceptable(SelectionKey selectionKey)
            throws IOException {
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey
                .channel();
        SocketChannel socketChannel = serverSocketChannel.accept();
        if (socketChannel == null) {
            // Non-blocking accept() returns null when no connection is pending.
            return;
        }
        System.out.println("接收到来自:" + socketChannel.socket().getInetAddress()
                + " 端口" + socketChannel.socket().getPort() + "的请求");
        socketChannel.configureBlocking(false);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ
                | SelectionKey.OP_WRITE, buffer);
    }

    /**
     * Reads up to 32 bytes from the client and appends them to the attached buffer.
     * Closes the connection when the peer has reached end of stream.
     */
    private void dealWithReadable(SelectionKey selectionKey) throws IOException {
        ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
        SocketChannel channel = (SocketChannel) selectionKey.channel();
        ByteBuffer readBuffer = ByteBuffer.allocate(32);
        int count = channel.read(readBuffer);
        if (count < 0) {
            // Peer closed its side; without this the key stays readable forever.
            selectionKey.cancel();
            channel.close();
            System.out.println("关闭与客户端的连接");
            return;
        }
        readBuffer.flip();
        // The write handler leaves limit at the end of the data; reopen the free space.
        buffer.limit(buffer.capacity());
        buffer.put(readBuffer);
    }

    /**
     * Echoes the first complete line held in the attached buffer, removes it from the
     * buffer, and closes the connection if that line was {@code bye}.
     */
    private void dealWithWritable(SelectionKey selectionKey) throws IOException {
        ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
        SocketChannel channel = (SocketChannel) selectionKey.channel();
        buffer.flip();
        // decode() consumes the buffer, leaving position at the end of the data, which
        // is where dealWithReadable appends next.
        String msg = CharSetUtil.decode(buffer, "UTF-8");
        if (msg.indexOf("\r\n") == -1) {
            return; // no complete line yet
        }
        String outPutData = msg.substring(0, msg.indexOf("\n") + 1);
        System.out.println("接收来自客户端的数据:" + outPutData);
        ByteBuffer outbyteBuffer = CharSetUtil.encode("echo:" + outPutData, "UTF-8");
        while (outbyteBuffer.hasRemaining()) {
            channel.write(outbyteBuffer);
        }
        // Drop the echoed line: skip its encoded length, then move the rest to the front.
        ByteBuffer tmp = CharSetUtil.encode(outPutData, "UTF-8");
        buffer.position(tmp.limit());
        buffer.compact();
        if ("bye\r\n".equalsIgnoreCase(outPutData)) {
            selectionKey.cancel();
            channel.close();
            System.out.println("关闭与客户端的连接");
        }
    }

    public static void main(String[] args) throws Exception {
        new NoBlockingServer().service();
    }
}
非阻塞的客户端
package IO; import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * Console client for the echo server that talks to it over a non-blocking
 * {@link SocketChannel} driven by a {@link Selector}.
 *
 * <p>One thread reads lines from standard input into the {@code send} buffer
 * ({@link #receFromUser()}); the thread running {@link #talk()} flushes that buffer to
 * the server and prints the lines that come back. Access to {@code send} is guarded by
 * synchronizing on the buffer itself.
 */
public class NoBlockingClient {
    private SocketChannel channel = null;
    private ByteBuffer send = ByteBuffer.allocate(1024);
    private ByteBuffer rece = ByteBuffer.allocate(1024);
    private Selector selector;

    /**
     * Connects to {@code localhost:8000} in blocking mode, then switches the channel to
     * non-blocking mode and opens the selector.
     *
     * @throws IOException if connecting or opening the selector fails
     */
    public NoBlockingClient() throws IOException {
        super();
        channel = SocketChannel.open();
        channel.socket().connect(new InetSocketAddress("localhost", 8000));
        channel.configureBlocking(false);
        System.out.println("与服务器建立连接成功");
        selector = Selector.open();
    }

    /**
     * Runs the select loop until the connection handlers close the selector.
     *
     * @throws IOException if registering the channel or selecting fails
     */
    public void talk() throws IOException {
        channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
        // select() on a closed selector throws, so test isOpen() first.
        while (selector.isOpen() && selector.select() > 0) {
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = null;
                try {
                    selectionKey = iterator.next();
                    iterator.remove();
                    if (selectionKey.isReadable()) {
                        dealWithReadable(selectionKey);
                    }
                    if (!selector.isOpen()) {
                        return; // conversation finished while handling the read
                    }
                    if (selectionKey.isValid() && selectionKey.isWritable()) {
                        dealWithWritable(selectionKey);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    if (selectionKey != null) {
                        selectionKey.cancel();
                        try {
                            selectionKey.channel().close();
                        } catch (IOException e1) {
                            e1.printStackTrace();
                        }
                    }
                }
            }
        }
    }

    /**
     * Reads lines from standard input and appends each, followed by CRLF, to the send
     * buffer. Returns after queuing {@code bye} (case-insensitive) or at end of input.
     *
     * @throws IOException if reading standard input fails
     */
    private void receFromUser() throws IOException {
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
        String msg = null;
        while ((msg = bufferedReader.readLine()) != null) {
            synchronized (send) {
                send.put(CharSetUtil.encode(msg + "\r\n", "UTF-8"));
            }
            if ("bye".equalsIgnoreCase(msg)) {
                break;
            }
        }
    }

    /** Writes as much of the queued user input as the channel accepts right now. */
    private void dealWithWritable(SelectionKey selectionKey) throws IOException {
        SocketChannel channel = (SocketChannel) selectionKey.channel();
        synchronized (send) {
            send.flip();
            channel.write(send);
            // Keep whatever was not written at the front for the next attempt.
            send.compact();
        }
    }

    /**
     * Reads available bytes from the server, prints the first complete line, and shuts
     * the client down when that line is the echo of {@code bye} or the server has
     * closed the connection.
     */
    private void dealWithReadable(SelectionKey selectionKey) throws IOException {
        SocketChannel channel = (SocketChannel) selectionKey.channel();
        // After an incomplete line the limit sits at the end of the data; without
        // reopening the free space the next read could not store a single byte.
        rece.limit(rece.capacity());
        int count = channel.read(rece);
        if (count < 0) {
            closeConnection(selectionKey, channel);
            return;
        }
        rece.flip();
        // decode() consumes the buffer, leaving position at the end of the data.
        String msg = CharSetUtil.decode(rece, "UTF-8");
        if (msg.indexOf("\r\n") == -1) {
            return; // no complete line yet
        }
        String outPutData = msg.substring(0, msg.indexOf("\n") + 1);
        System.out.println(outPutData);
        // Drop the printed line: skip its encoded length, then move the rest forward.
        ByteBuffer tmp = CharSetUtil.encode(outPutData, "UTF-8");
        rece.position(tmp.limit());
        rece.compact();
        if ("echo:bye\r\n".equalsIgnoreCase(outPutData)) {
            closeConnection(selectionKey, channel);
        }
    }

    /** Cancels the key and closes the channel and the selector, ending {@link #talk()}. */
    private void closeConnection(SelectionKey selectionKey, SocketChannel channel)
            throws IOException {
        selectionKey.cancel();
        channel.close();
        selector.close();
        System.out.println("关闭与客户端的连接");
    }

    public static void main(String[] args) throws IOException {
        System.out.println(System.getProperty("file.encoding"));
        final NoBlockingClient noBlockingClient = new NoBlockingClient();
        // Standard input is read on its own thread so it never stalls the select loop.
        Thread thread = new Thread() {
            @Override
            public void run() {
                try {
                    noBlockingClient.receFromUser();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        };
        thread.start();
        noBlockingClient.talk();
    }
}
3.阻塞和非阻塞编程实例
服务器端同时使用阻塞和非阻塞模式:负责接收客户端连接的线程按照阻塞模式工作,负责接收和发送数据的线程按照非阻塞模式工作。
package IO; import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * Echo server that mixes both modes: one thread accepts connections on a blocking
 * {@link ServerSocketChannel}, while another thread serves all accepted connections in
 * non-blocking mode through a shared {@link Selector}.
 *
 * <p>The {@code gate} lock coordinates the two threads: the accepting thread holds it
 * while it wakes the selector and registers a new channel, and the selecting thread
 * passes through it before every {@code select()} so that it does not re-enter
 * {@code select()} while a registration is in progress.
 */
public class NOBlockingAndBolckingServer {
    private int port = 8000;
    private ServerSocketChannel serverSocketChannel = null;
    private Selector selector = null;
    private Object gate = new Object();

    /**
     * Opens the selector and binds the (blocking) listening channel.
     *
     * @throws IOException if the selector or channel cannot be opened or bound
     */
    public NOBlockingAndBolckingServer() throws IOException {
        super();
        selector = Selector.open();
        this.serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().bind(new InetSocketAddress(port));
        System.out.println("服务器启动成功");
    }

    /**
     * Accepts connections in blocking mode and registers each one with the selector
     * for read and write readiness, attaching a 1024-byte accumulation buffer.
     */
    public void accept() {
        while (serverSocketChannel.isOpen()) {
            SocketChannel socketChannel = null;
            try {
                socketChannel = serverSocketChannel.accept();
                System.out.println("接收到来自:"
                        + socketChannel.socket().getInetAddress() + " 端口"
                        + socketChannel.socket().getPort() + "的请求");
                socketChannel.configureBlocking(false);
                // The read/write handlers expect this buffer as the key's attachment.
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                synchronized (gate) {
                    // Kick the other thread out of select() so the registration proceeds.
                    selector.wakeup();
                    socketChannel.register(selector, SelectionKey.OP_READ
                            | SelectionKey.OP_WRITE, buffer);
                }
            } catch (Exception e) {
                e.printStackTrace();
                if (socketChannel != null) {
                    try {
                        socketChannel.close();
                    } catch (IOException closeFailure) {
                        closeFailure.printStackTrace();
                    }
                }
            }
        }
    }

    /**
     * Runs the select loop for all accepted connections. A failure while handling one
     * key closes only that key's channel.
     *
     * @throws IOException if selecting fails
     */
    public void service() throws IOException {
        for (;;) {
            // Wait here while accept() is registering a channel. The lock must NOT be
            // held across select(): accept() needs it to call wakeup(), so holding it
            // while blocked in select() would deadlock both threads.
            synchronized (gate) {
            }
            int n = selector.select();
            if (n == 0) {
                continue;
            }
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = null;
                try {
                    selectionKey = iterator.next();
                    iterator.remove();
                    if (selectionKey.isReadable()) {
                        dealWithReadable(selectionKey);
                    }
                    // The read handler may cancel the key; isWritable() throws on a
                    // cancelled key, so re-check validity first.
                    if (selectionKey.isValid() && selectionKey.isWritable()) {
                        dealWithWritable(selectionKey);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    if (selectionKey != null) {
                        selectionKey.cancel();
                        try {
                            selectionKey.channel().close();
                        } catch (IOException closeFailure) {
                            closeFailure.printStackTrace();
                        }
                    }
                }
            }
        }
    }

    /**
     * Reads up to 32 bytes from the client and appends them to the attached buffer.
     * Closes the connection when the peer has reached end of stream.
     */
    private void dealWithReadable(SelectionKey selectionKey) throws IOException {
        ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
        SocketChannel channel = (SocketChannel) selectionKey.channel();
        ByteBuffer readBuffer = ByteBuffer.allocate(32);
        int count = channel.read(readBuffer);
        if (count < 0) {
            // Peer closed its side; without this the key stays readable forever.
            selectionKey.cancel();
            channel.close();
            System.out.println("关闭与客户端的连接");
            return;
        }
        readBuffer.flip();
        // The write handler leaves limit at the end of the data; reopen the free space.
        buffer.limit(buffer.capacity());
        buffer.put(readBuffer);
    }

    /**
     * Echoes the first complete line held in the attached buffer, removes it from the
     * buffer, and closes the connection if that line was {@code bye}.
     */
    private void dealWithWritable(SelectionKey selectionKey) throws IOException {
        ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
        SocketChannel channel = (SocketChannel) selectionKey.channel();
        buffer.flip();
        // decode() consumes the buffer, leaving position at the end of the data, which
        // is where dealWithReadable appends next.
        String msg = CharSetUtil.decode(buffer, "UTF-8");
        if (msg.indexOf("\r\n") == -1) {
            return; // no complete line yet
        }
        String outPutData = msg.substring(0, msg.indexOf("\n") + 1);
        System.out.println("接收来自客户端的数据:" + outPutData);
        ByteBuffer outbyteBuffer = CharSetUtil.encode("echo:" + outPutData,
                "UTF-8");
        while (outbyteBuffer.hasRemaining()) {
            channel.write(outbyteBuffer);
        }
        // Drop the echoed line: skip its encoded length, then move the rest to the front.
        ByteBuffer tmp = CharSetUtil.encode(outPutData, "UTF-8");
        buffer.position(tmp.limit());
        buffer.compact();
        if ("bye\r\n".equalsIgnoreCase(outPutData)) {
            selectionKey.cancel();
            channel.close();
            System.out.println("关闭与客户端的连接");
        }
    }

    public static void main(String[] args) throws Exception {
        final NOBlockingAndBolckingServer server = new NOBlockingAndBolckingServer();
        // Accepting blocks, so it runs on its own thread next to the select loop.
        Thread thread = new Thread() {
            @Override
            public void run() {
                server.accept();
            }
        };
        thread.start();
        server.service();
    }
}
客户端和服务器端创建多个连接。
package IO; import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.LinkedList;

/** One connection-timing task: a host to connect to on port 80 and its outcome. */
class Target {
    InetSocketAddress address;
    SocketChannel channel;
    Exception failure;
    long connectStart; // time at which the connection attempt started
    long connectFinish = 0; // time at which the connection succeeded; 0 = not connected
    boolean shown = false; // whether this task's result has been printed

    /**
     * Resolves the host. A resolution failure is recorded in {@code failure} and
     * leaves {@code address} unset.
     *
     * @param host host name or IP literal
     */
    Target(String host) {
        try {
            address = new InetSocketAddress(InetAddress.getByName(host), 80);
        } catch (IOException x) {
            failure = x;
        }
    }

    /** Prints the outcome of the task: elapsed time, the failure, or a timeout. */
    void show() {
        String result;
        if (connectFinish != 0) {
            result = Long.toString(connectFinish - connectStart) + "ms";
        } else if (failure != null) {
            result = failure.toString();
        } else {
            result = "Timed out";
        }
        System.out.println(address + " : " + result);
        shown = true;
    }
}

/**
 * Measures how long it takes to open a TCP connection to each host typed on standard
 * input, using non-blocking connects multiplexed through one {@link Selector}.
 *
 * <p>Three threads cooperate: the constructing thread reads host names, a
 * {@link Connector} thread registers pending targets and completes their connections,
 * and a daemon {@link Printer} thread prints finished targets.
 */
public class PingClient {
    private Selector selector;
    // Tasks submitted by the user that are not yet registered with the selector.
    private LinkedList<Target> targets = new LinkedList<Target>();
    // Tasks that have finished and are waiting to be printed.
    private LinkedList<Target> finishedTargets = new LinkedList<Target>();

    // Written by the input thread and read by the Connector thread, hence volatile.
    volatile boolean shutdown = false;

    /**
     * Opens the selector, starts the worker threads and then reads targets from
     * standard input until {@code bye} or end of input.
     *
     * @throws IOException if the selector cannot be opened
     */
    public PingClient() throws IOException {
        selector = Selector.open();
        Connector connector = new Connector();
        Printer printer = new Printer();
        connector.start();
        printer.start();
        receiveTarget();
    }

    public static void main(String args[]) throws IOException {
        new PingClient();
    }

    /**
     * Starts a non-blocking connect for the target and queues it for registration.
     * A target that cannot be started is reported as finished with its failure.
     *
     * @param target the task to start
     */
    public void addTarget(Target target) {
        if (target.failure != null) {
            // Host resolution already failed; report that error instead of letting
            // connect(null) replace it with an unrelated exception.
            addFinishedTarget(target);
            return;
        }
        SocketChannel socketChannel = null;
        try {
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
            socketChannel.connect(target.address);
            target.channel = socketChannel;
            target.connectStart = System.currentTimeMillis();
            synchronized (targets) {
                targets.add(target);
            }
            // Make the Connector leave select() so it registers the new target.
            selector.wakeup();
        } catch (Exception x) {
            closeQuietly(socketChannel);
            target.failure = x;
            addFinishedTarget(target);
        }
    }

    /**
     * Queues a finished target for printing and wakes the printer thread.
     *
     * @param target the finished task
     */
    public void addFinishedTarget(Target target) {
        synchronized (finishedTargets) {
            finishedTargets.add(target);
            finishedTargets.notify();
        }
    }

    /** Prints finished targets as they arrive; returns when the thread is interrupted. */
    public void printFinishedTargets() {
        try {
            for (;;) {
                Target target = null;
                synchronized (finishedTargets) {
                    while (finishedTargets.size() == 0) {
                        finishedTargets.wait();
                    }
                    target = finishedTargets.removeFirst();
                }
                target.show();
            }
        } catch (InterruptedException x) {
            // Restore the interrupt status for whoever owns this thread.
            Thread.currentThread().interrupt();
        }
    }

    /** Registers every queued target with the selector for connect readiness. */
    public void registerTargets() {
        synchronized (targets) {
            while (targets.size() > 0) {
                Target target = targets.removeFirst();
                try {
                    target.channel.register(selector, SelectionKey.OP_CONNECT,
                            target);
                } catch (IOException x) {
                    closeQuietly(target.channel);
                    target.failure = x;
                    addFinishedTarget(target);
                }
            }
        }
    }

    /**
     * Completes the connection for every selected key, records the finish time or the
     * failure, closes the channel and reports the target as finished.
     *
     * @throws IOException declared for callers; per-target failures are recorded on
     *                     the target rather than thrown
     */
    public void processSelectedKeys() throws IOException {
        for (Iterator<SelectionKey> it = selector.selectedKeys().iterator(); it.hasNext();) {
            SelectionKey selectionKey = it.next();
            it.remove();
            Target target = (Target) selectionKey.attachment();
            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
            try {
                if (socketChannel.finishConnect()) {
                    selectionKey.cancel();
                    target.connectFinish = System.currentTimeMillis();
                    socketChannel.close();
                    addFinishedTarget(target);
                }
            } catch (IOException x) {
                // A close failure must not prevent the target from being reported.
                closeQuietly(socketChannel);
                target.failure = x;
                addFinishedTarget(target);
            }
        }
    }

    /** Best-effort close used on failure paths, where the primary error is reported. */
    private static void closeQuietly(SocketChannel socketChannel) {
        if (socketChannel != null) {
            try {
                socketChannel.close();
            } catch (IOException ignored) {
                // Nothing useful can be done; the original failure is what matters.
            }
        }
    }

    /**
     * Reads host names from standard input and submits each as a target; {@code bye}
     * requests shutdown of the Connector thread.
     */
    public void receiveTarget() {
        try {
            BufferedReader localReader = new BufferedReader(
                    new InputStreamReader(System.in));
            String msg = null;
            while ((msg = localReader.readLine()) != null) {
                if (!msg.equals("bye")) {
                    Target target = new Target(msg);
                    addTarget(target);
                } else {
                    shutdown = true;
                    selector.wakeup();
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Daemon thread that prints finished targets. */
    public class Printer extends Thread {
        public Printer() {
            setDaemon(true);
        }

        @Override
        public void run() {
            printFinishedTargets();
        }
    }

    /** Thread that registers new targets and completes their connections. */
    public class Connector extends Thread {
        @Override
        public void run() {
            while (!shutdown) {
                try {
                    registerTargets();
                    if (selector.select() > 0) {
                        processSelectedKeys();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}