文章目录
服务端
初始化一个ServerSocketChannel,绑定端口,然后使用Selector监听accept事件。
当有accept发生时,表示有客户端连接进来了,获取客户端的SocketChannel,然后注册其read事件;用来接收客户端发送的消息。
package chatroom;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* 服务端
*
* @author wenei
* @date 2021-07-20 20:36
*/
public class Server {
private static final Logger log = Logger.getLogger(Server.class.getName());
private int port;
private List<SocketChannel> clientChannelList = new ArrayList<>();
public Server(int port) {
this.port = port;
}
public void start() throws IOException {
// 初始化服务端channel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(port));
serverSocketChannel.configureBlocking(false);
// 新建Selector
Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
final int selectCount = selector.select();
if (selectCount <= 0) {
continue;
}
final Set<SelectionKey> selectionKeys = selector.selectedKeys();
final Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
final SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
// 当有accept事件时,将新的连接加入Selector
ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
SocketChannel accept = serverChannel.accept();
accept.configureBlocking(false);
clientChannelList.add(accept);
accept.register(selector, SelectionKey.OP_READ);
log.log(Level.INFO, "新连接 " + accept);
} else if (key.isReadable()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
log.log(Level.INFO, "可读连接 " + socketChannel);
ByteBuffer buffer = ByteBuffer.allocate(60);
try {
/**
* 当客户端非正常退出时,read抛出异常,属于被动性关闭;
* 当客户端正常返回时,返回-1,但也是readable信号,所以需要处理
*/
final int read = socketChannel.read(buffer);
if (read == -1) {
log.log(Level.INFO, "连接主动关闭:" + socketChannel);
clientChannelList.remove(socketChannel);
socketChannel.close();
continue;
}
} catch (IOException e) {
log.log(Level.INFO, "连接被动关闭:" + socketChannel);
clientChannelList.remove(socketChannel);
socketChannel.close();
continue;
}
buffer.flip();
byte[] bytes = new byte[60];
int index = 0;
while (buffer.hasRemaining()) {
bytes[index++] = buffer.get();
}
bytes[index] = '\0';
log.log(Level.INFO, "接受数据: " + new String(bytes, StandardCharsets.UTF_8).trim());
// 广播
clientChannelList.forEach(channel -> {
if (channel != socketChannel) {
buffer.flip();
try {
channel.write(buffer);
} catch (IOException e) {
e.printStackTrace();
}
}
});
// buffer.clear();
}
}
}
}
public static void main(String[] args) throws IOException {
new Server(10022).start();
}
}
客户端
使用主线程获取键盘输入,然后传给服务端。
使用子线程接收服务端发送的信息并显示。
package chatroom;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;
/**
* 客户端
*
* @author wenei
* @date 2021-07-21 9:14
*/
public class Client {
/**
* 客户端接收信息线程
*/
static class ClientReceiveThread implements Runnable {
/**
* 客户端socket
*/
private SocketChannel socketChannel;
public ClientReceiveThread(SocketChannel socketChannel) {
this.socketChannel = socketChannel;
}
@Override
public void run() {
try {
Selector selector = Selector.open();
socketChannel.register(selector, SelectionKey.OP_READ);
while (true) {
final int selectCount = selector.select(100);
if (Thread.currentThread().isInterrupted()) {
System.out.println("连接关闭");
socketChannel.close();
return;
}
if (selectCount <= 0) {
continue;
}
final Set<SelectionKey> selectionKeys = selector.selectedKeys();
final Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
final SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()) {
ByteBuffer recvBuffer = ByteBuffer.allocate(60);
socketChannel.read(recvBuffer);
recvBuffer.flip();
byte[] bytes = new byte[60];
int index = 0;
while (recvBuffer.hasRemaining()) {
bytes[index++] = recvBuffer.get();
}
bytes[index] = '\0';
System.out.println("接受数据: " + new String(bytes, StandardCharsets.UTF_8).trim());
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private int port;
public Client(int port) {
this.port = port;
}
public void start() throws IOException {
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(port));
socketChannel.configureBlocking(false);
Scanner scanner = new Scanner(System.in);
ByteBuffer buffer = ByteBuffer.allocate(60);
Thread thread = new Thread(new ClientReceiveThread(socketChannel));
thread.start();
while (true) {
String data = scanner.nextLine();
if (data.equals("exit")) {
break;
}
System.out.println("输入数据:" + data);
buffer.put(data.getBytes(StandardCharsets.UTF_8));
buffer.flip();
socketChannel.write(buffer);
buffer.clear();
}
thread.interrupt();
}
public static void main(String[] args) throws IOException {
new Client(10022).start();
}
}