实现的功能:
运行一个服务端,运行多个客户端。在客户端1,发送消息,其余客户端都能收到客户端1发送的消息。
重点:
1、ByteBuffer在使用时,注意flip()方法的调用,否则读取不到消息。
服务端
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set; public class NioServer {
public static void main(String[] args) throws Exception{
//创建服务端
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//设置为非阻塞模式
serverSocketChannel.configureBlocking(false);
//绑定端口
serverSocketChannel.bind(new InetSocketAddress("localhost",12345));
//创建selector
Selector selector = Selector.open();
//在selector中注册服务端的链接事件(注1)
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// //用于存放客户端的链接,用远端的端口,作为唯一标识(由于是本机开启多个客户端进行测试,所以不存在端口冲突问题)
// Map<Integer,SocketChannel> clients = new HashMap<>();
List<SocketChannel> clients = new ArrayList<>(); while (true){
//阻塞等待事件的到来
selector.select();
//获取被触发的事件
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
//遍历触发的事件
while (iterator.hasNext()){
try {
//获取事件
SelectionKey event = iterator.next();
//是否可以链接
if(event.isAcceptable()){
//为什么需要强转? 因为在(注1)中,我们注册的是 ServerSocketChannel ,所有需要强转回来。(注2)
ServerSocketChannel ssc = (ServerSocketChannel) event.channel();
//获取到链接的socketchannel
SocketChannel socketChannel = ssc.accept();
socketChannel.configureBlocking(false);
//将获取到的链接,注册读事件到selector中,
socketChannel.register(selector,SelectionKey.OP_READ);
// //将获取到的客户端,保存起来,用于跟其它客户端进行通信,由于不涉及线程问题,所以使用map足已
// clients.put(((InetSocketAddress)socketChannel.getRemoteAddress()).getPort(),socketChannel);
clients.add(socketChannel);
}else if(event.isReadable()){ //是否可以读取
//同理(注2)
SocketChannel socketChannel = (SocketChannel) event.channel();
//创建socketChannel需要的buffer
ByteBuffer byteBuffer = ByteBuffer.allocate(512);
String receiveMessage = "";
while (true){
try{
//重置buffer
byteBuffer.clear();
int read = socketChannel.read(byteBuffer);
if(read <= 0 ){
//当读取到末尾时,跳出循环
break;
}
receiveMessage += new String(byteBuffer.array(), Charset.forName("UTF-8"));
}catch (Exception e){
e.printStackTrace();
break;
}
}
System.out.println("收到的消息为:"+((InetSocketAddress)socketChannel.getRemoteAddress()).getPort()+"---"+receiveMessage);
//拼装需要发送的消息
final ByteBuffer otherbf = ByteBuffer.allocate(receiveMessage.length()+10);
otherbf.put((((InetSocketAddress)socketChannel.getRemoteAddress()).getPort()+":"+receiveMessage).getBytes());
System.out.println(new String(otherbf.array()));
//遍历客户端,发送消息
clients.stream().forEach(sc -> {
try {
if(((InetSocketAddress)socketChannel.getRemoteAddress()).getPort() ==
((InetSocketAddress)sc.getRemoteAddress()).getPort()){
//消息不发给自己
}else{
otherbf.flip();
sc.write(otherbf);
}
}catch (Exception e){
e.printStackTrace();
}
});
}
}catch (Exception e){
//添加try是为了程序的健壮
e.printStackTrace();
}finally {
//删除已经处理了的事件
iterator.remove();
}
}
}
}
}
客户端
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set; public class NioClient { public static void main(String[] args) throws Exception{ //打开socketChannel
SocketChannel socketChannel = SocketChannel.open();
//设置为非阻塞
socketChannel.configureBlocking(false);
//链接到服务器
socketChannel.connect(new InetSocketAddress("localhost",12345));
//创建Selector
Selector selector = Selector.open();
//向Selector注册连接事件
socketChannel.register(selector, SelectionKey.OP_CONNECT);
//阻塞等待事件触发
selector.select();
//获取连接事件key
Set<SelectionKey> connectEventKey = selector.selectedKeys();
//获取触发的连接事件
SelectionKey connectEvent = connectEventKey.iterator().next();
//删除已经处理了的事件
selector.selectedKeys().clear();
//转换为注册时的channel
SocketChannel eventSocketChannel = (SocketChannel) connectEvent.channel();
//向selector注册读事件
eventSocketChannel.register(selector,SelectionKey.OP_READ);
new Thread(){
@Override
public void run() {
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
ByteBuffer inputBuffer = ByteBuffer.allocate(1024); //长度需要重新考量
try{
if(socketChannel.finishConnect()){
System.out.println("完成连接。");
}
while (true){
String s = reader.readLine();
inputBuffer.clear();
inputBuffer.put(s.getBytes());
inputBuffer.flip();
socketChannel.write(inputBuffer);
}
}catch (Exception e){
e.printStackTrace();
}finally {
try {
reader.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}.start();
//没有和连接事件合并到一个while里面,是因为压根就不会有两次连接,所以我将连接事件单独出来
while (true){
//阻塞等待事件触发,这次是触发读事件
selector.select();
Set<SelectionKey> readEventKey = selector.selectedKeys();
Iterator<SelectionKey> readIterator = readEventKey.iterator();
SocketChannel readSocketChannel = (SocketChannel) readIterator.next().channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(256);
String content = "";
while (true){
byteBuffer.clear();
int read = readSocketChannel.read(byteBuffer);
if(read <= 0){
break;
}
content += new String(byteBuffer.array());
}
System.out.println("收到的消息为:"+content);
readIterator.remove();
}
}
}