1、半关闭的Socket
前面的服务器和客户端通信时总是以行为最小数据单位,但是在某些协议里,通信的数据单位可能是多行的,当出现多行数据时就
出现一个问题:Socket输出流如何表示输出数据已经结束。
在IO中,如果表示输出已经结束,可以通过关闭输出流来实现,但在网络通信中则不同通过关闭输出流表示输出已经结束,
因为如果关闭,对应的Socket也将随之关闭,这样会导致程序无法再从Socket对应输出流中获取数据了。
在这种情况下,socket提供了两个半关闭的方法:
shutdownInput():关闭Socket的输入流,程序还可以通过Socket输出流输出数据
shutdownOutput():关闭Socket的输出流,程序还可以通过Socket的输入流读取数据
package rdb.com.net.server; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintStream; import java.net.ServerSocket; import java.net.Socket; import java.util.Scanner; public class Server1 { public static void main(String[] args){ ServerSocket serverSocket = null; PrintStream ps = null; Scanner scan = null; Socket socket = null; try { serverSocket = new ServerSocket(30000); while(true){ socket = serverSocket.accept(); ps = new PrintStream(socket.getOutputStream()); ps.println("服务端第一行数据"); ps.println("服务端第二行数据"); //关闭输出流,表明socket输出已经结束 socket.shutdownOutput(); //false,关闭输出流或关闭输入流 是不会关闭Socket System.out.println(socket.isClosed()); scan = new Scanner(socket.getInputStream()); while(scan.hasNextLine()){ System.out.println(scan.nextLine()); } } } catch (IOException e) { e.printStackTrace(); }finally{ try { if(scan != null){scan.close();} if(socket != null){socket.close();} if(serverSocket != null){serverSocket.close();} } catch (IOException e) { e.printStackTrace(); } } } }
package rdb.com.net.server; import java.io.IOException; import java.io.PrintStream; import java.net.Socket; import java.util.Scanner; public class Client1 { public static void main(String[] args) { Socket socket = null ; PrintStream ps = null; Scanner scan = null; try { socket = new Socket("127.0.0.1", 30000); ps = new PrintStream(socket.getOutputStream()); ps.println("客户端第一行数据"); ps.println("客户端第二行数据"); //关闭输出流 socket.shutdownOutput(); System.out.println(socket.isClosed()); scan = new Scanner(socket.getInputStream()); while(scan.hasNextLine()){ System.out.println(scan.nextLine()); } } catch (IOException e) { e.printStackTrace(); }finally{ try { if(scan != null){scan.close();} if(socket != null){socket.close();} } catch (IOException e) { e.printStackTrace(); } } } }
当使用上面两个方法关闭输入或者输出流后,Socket无法再次打开输入或者输出流,这种做法不适合保持持久通信状态的交互式应用。
适用一些一站式通信协议,如HTTP协议-----客户端请求服务器,服务器发送数据完成后无需再次发送数据,客户端只需读取响应数据,读取后Socket连接也就被关闭了
2、NIO实现非阻塞Socket通信
JDK1.4开始Java提供了NIO API开发高性能的网络服务器。
Java NIO为非阻塞式Socket提供了如下几个类:
Selector:是SelectableChannel对象的多路复用器,Selector实现了通过一个线程管理多个Channel,从而管理多个网络连接的目的。
Selector可以同时监控多个SelectableChannel的IO状况,是非阻塞的核心,一个Selector实例有三个SelectionKey集合:
1)所有的SelectionKey集合:代表注册在该Selector上的Channel,这个集合可以通过keys()方法返回
2)被选择的SelectionKey集合:代表了所有可通过select()方法获取的、需要进行IO处理的Channel,可以通过selectKeys()返回。
3)被取消的SelectionKey集合:代表了所有取消注册关系的Channel,在下一次执行select()方法时,这些Channel对应的SelectionKey会被彻底删除。
Selector还提供了一系列select()相关方法:
1)int select():监控所有注册的channel,当其中有需要处理IO操作时,该方法放回,并将对应的SelectionKey加入选择的SelectionKey集合中,该方法返回这些Channel的数量。
2)int select(long timeout):可以设置超时时长的select()操作
3)int selectNow():执行一个立即返回的select()操作,相对与无参的select()操作,该方法不会阻塞线程。
4)Selector wakeup():使一个未返回的select()立即返回。
SelectableChannel:代表可以支持非阻塞IO操作的Channel对象,它可以被注册到Selector上,这种注册关系由SelectionKey实例表示。SelectableChannel对象支持阻塞和非阻塞两种模式,默认是阻塞模式,必须在非阻塞模式下才可以使用非阻塞IO操作。
1)register():将Channel注册到指定Selector上。
2)SelectableChannel configureBlocking(boolean block):设置是否采用阻塞模式。
3)boolean isBlocking():返回Channel是否是阻塞模式。
4)int validOps():返回一个整数值,表示该Channel所支持的的IO操作。(不同的SelectableChannel 支持的操作不一样)
SelectionKey:该对象代表了SelectableChannel和Selector之间的注册关系,其中用静态常量定义了4种IO操作:
1)OP_READ:读
2)OP_WRITE:写
3)OP_CONNECT:连接
4)OP_ACCEPT:接收
下面使用NIO实现简单聊天室
package chatServer; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.Channel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; public class NServer { static final int PORT = 30000; private Selector selector = null; //定义编码、解码对象 private Charset charset = Charset.forName("UTF-8"); public void init() { try { //获取Selector对象 selector = Selector.open(); //获取未绑定的ServerSocketChannel对象 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); InetSocketAddress isc = new InetSocketAddress("127.0.0.1", PORT); //将ServerSocketChannel邦到指定ip,端口 serverSocketChannel.bind(isc); //设置serverSocketChannel以非阻塞方式工作 serverSocketChannel.configureBlocking(false); //将serverSocketChannel注册到Selector serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while(selector.select() > 0) { //依次处理Selector上每个已选择的SelectionKey for(SelectionKey sk : selector.selectedKeys()) { //移除正在处理的SelectionKey selector.selectedKeys().remove(sk); //如果sk对应的channel包含客户端的连接请求 if(sk.isAcceptable()) { //接受连接,产生服务端的SocketChannel. SocketChannel socketChannel = serverSocketChannel.accept(); //设置非阻塞模式 socketChannel.configureBlocking(false); //SocketChannel注册到Selector socketChannel.register(selector, SelectionKey.OP_READ); //sk对应的channel设置为accept,准备接收其他请求你 sk.interestOps(SelectionKey.OP_ACCEPT); } //如果sk对应的channel可读 if(sk.isReadable()) { SocketChannel sc = (SocketChannel) sk.channel(); ByteBuffer buff = ByteBuffer.allocate(1024); String content = "" ; try { while(sc.read(buff)>0) { buff.flip(); content += charset.decode(buff); } System.out.println("读取数据:" + content); //将sk对应的Channel设置成READ,准备下一次读取 sk.interestOps(SelectionKey.OP_READ); } catch (Exception e) { //删除SelectionKey sk.cancel(); if(sk.channel() != null) { sk.channel().close(); } } //如果读取到消息,将消息发送到各个客户端 if(content.length() > 0) { for(SelectionKey key : selector.keys()) { Channel targetChannel = key.channel(); //剔除ServerSocketChannel if(targetChannel instanceof SocketChannel) { //将读取到的内容写入Channel中 ((SocketChannel) targetChannel).write(charset.encode(content)); } } } } } } } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) { NServer nServer = new NServer(); nServer.init(); } }
package charClient; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.Scanner; public class NClient { static final int PORT = 30000; private Selector selector = null; private SocketChannel socketChannel ; private Charset charset = Charset.forName("UTF-8"); public void init() { try { //检测SocketChannel的Selector对象 selector = Selector.open(); InetSocketAddress isa = new InetSocketAddress("127.0.0.1", PORT); //创建连接到指定主机的SocketChannel socketChannel = SocketChannel.open(isa); //设置非阻塞 socketChannel.configureBlocking(false); //注册channel,监听读 socketChannel.register(selector, SelectionKey.OP_READ); //启动线程,读取服务器来的数据 new NClientThread(selector).start(); //创建键盘输入流 Scanner scan = new Scanner(System.in); while(scan.hasNextLine()) { String line = scan.nextLine(); socketChannel.write(charset.encode(line)); } } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) { NClient nClient = new NClient(); nClient.init(); } } package charClient; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; public class NClientThread extends Thread{ private Selector selector = null; private Charset charset = Charset.forName("UTF-8"); public NClientThread(Selector selector) { this.selector = selector ; } @Override public void run() { try { while(selector.select() > 0) { for(SelectionKey sk : selector.selectedKeys()) { //移除正在处理的SelectionKey selector.selectedKeys().remove(sk); if(sk.isReadable()) { SocketChannel sc = (SocketChannel) sk.channel(); ByteBuffer buff = ByteBuffer.allocate(1024); String content = "" ; while(sc.read(buff) > 0) { buff.flip(); content += charset.decode(buff); } System.out.println("聊天信息:"+content); //为下一次读做准备 sk.interestOps(SelectionKey.OP_READ); } } } } catch (IOException e) { e.printStackTrace(); } } }
package chatServer; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.Channel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; public class NServer { static final int PORT = 30000; private Selector selector = null; //定义编码、解码对象 private Charset charset = Charset.forName("UTF-8"); public void init() { try { //获取Selector对象 selector = Selector.open(); //获取未绑定的ServerSocketChannel对象 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); InetSocketAddress isc = new InetSocketAddress("127.0.0.1", PORT); //将ServerSocketChannel邦到指定ip,端口 serverSocketChannel.bind(isc); //设置serverSocketChannel以非阻塞方式工作 serverSocketChannel.configureBlocking(false); //将serverSocketChannel注册到Selector serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while(selector.select() > 0) { //依次处理Selector上每个已选择的SelectionKey for(SelectionKey sk : selector.selectedKeys()) { //移除正在处理的SelectionKey selector.selectedKeys().remove(sk); //如果sk对应的channel包含客户端的连接请求 if(sk.isAcceptable()) { //接受连接,产生服务端的SocketChannel. SocketChannel socketChannel = serverSocketChannel.accept(); //设置非阻塞模式 socketChannel.configureBlocking(false); //SocketChannel注册到Selector socketChannel.register(selector, SelectionKey.OP_READ); //sk对应的channel设置为accept,准备接收其他请求你 sk.interestOps(SelectionKey.OP_ACCEPT); } //如果sk对应的channel可读 if(sk.isReadable()) { SocketChannel sc = (SocketChannel) sk.channel(); ByteBuffer buff = ByteBuffer.allocate(1024); String content = "" ; try { while(sc.read(buff)>0) { buff.flip(); content += charset.decode(buff); } System.out.println("读取数据:" + content); //将sk对应的Channel设置成READ,准备下一次读取 sk.interestOps(SelectionKey.OP_READ); } catch (Exception e) { //删除SelectionKey sk.cancel(); if(sk.channel() != null) { sk.channel().close(); } } //如果读取到消息,将消息发送到各个客户端 if(content.length() > 0) { for(SelectionKey key : selector.keys()) { Channel targetChannel = key.channel(); //剔除ServerSocketChannel if(targetChannel instanceof SocketChannel) { //将读取到的内容写入Channel中 ((SocketChannel) targetChannel).write(charset.encode(content)); } } } } } } } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) { NServer nServer = new NServer(); nServer.init(); } }