Netty 框架学习 —— 传输


概述

流经网络的数据总是具有相同的类型:字节,这些字节如何传输主要取决于我们所说的网络传输。用户并不关心传输的细节,只在乎字节是否被可靠地发送和接收

如果使用 Java 网络编程,你会发现,某些时候当你需要支持高并发连接,随后你尝试将阻塞传输切换为非阻塞传输,那么你会因为这两种 API 的截然不同而遇到问题。Netty 提供了一个通用的 API,这使得转换更加简单。


传统的传输方式

这里介绍仅使用 JDK API 来实现应用程序的阻塞(OIO)和非阻塞版本(NIO)

阻塞网络编程如下:

public class PlainOioServer {

    public void server(int port) throws IOException {
        // 将服务器绑定到指定端口
        final ServerSocket socket = new ServerSocket(port);
        try {
            while (true) {
                // 接收连接
                final Socket clientSocket = socket.accept();
                System.out.println("Accepted connection from " + clientSocket);
                // 创建一个新的线程来处理连接
                new Thread(() -> {
                    OutputStream out;
                    try {
                        out = clientSocket.getOutputStream();
                        // 将消息写给已连接的客户端
                        out.write("Hi\r\n".getBytes(StandardCharsets.UTF_8));
                        out.flush();
                        // 关闭连接x
                        clientSocket.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    } finally {
                        try {
                            clientSocket.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                }).start();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

这段代码可以处理中等数量的并发客户端,但随着并发连接的增多,你决定改用异步网络编程,但异步的 API 是完全不同的

非阻塞版本如下:

public class PlainNioServer {

    public void server(int port) throws IOException {
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        ServerSocket ssocket = serverChannel.socket();
        InetSocketAddress address = new InetSocketAddress(port);
        // 将服务器绑定到选定的端口
        ssocket.bind(address);
        // 打开 Selector 来处理 Channel
        Selector selector = Selector.open();
        // 将 ServerSocket 注册到 Selector 以接受连接
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        final ByteBuffer msg = ByteBuffer.wrap("Hi\r\n".getBytes());
        while (true) {
            try {
                // 等待需要处理的新事件,阻塞将一直持续到下一个传入事件
                selector.select();
            } catch (IOException e) {
                e.printStackTrace();
                break;
            }
            Set<SelectionKey> readKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = readKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                try {
                    // 检查事件是否是一个新的已经就绪可以被接受的连接
                    if (key.isAcceptable()) {
                        ServerSocketChannel server = (ServerSocketChannel) key.channel();
                        SocketChannel client = server.accept();
                        client.configureBlocking(false);
                        // 接受客户端,并将它注册到选择器
                        client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, msg.duplicate());
                        System.out.println("Accepted connection from " + client);
                    }
                    // 检查套接字是否已经准备好写数据
                    if (key.isWritable()) {
                        SocketChannel client = (SocketChannel) key.channel();
                        ByteBuffer buffer = (ByteBuffer) key.attachment();
                        while (buffer.hasRemaining()) {
                            // 将数据写到已连接的客户端
                            if (client.write(buffer) == 0) {
                                break;
                            }
                        }
                        client.close();
                    }
                } catch (IOException exception) {
                    key.cancel();
                    try {
                        key.channel().close();
                    } catch (IOException cex) {
                        cex.printStackTrace();
                    }
                }
            }
        }
    }
}

可以看到,阻塞和非阻塞的代码是截然不同的。如果为了实现非阻塞而完全重写程序,无疑十分困难


基于 Netty 的传输

使用 Netty 的阻塞网络处理如下:

public class NettyOioServer {

    public void server(int port) throws Exception {
        final ByteBuf buf = Unpooled.unreleasableBuffer(
                Unpooled.copiedBuffer("Hi\n\r", StandardCharsets.UTF_8));
        EventLoopGroup group = new OioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(group)
                    // 使用阻塞模式
                    .channel(OioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(port))
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(
                                    new SimpleChannelInboundHandler<>() {
                                        @Override
                                        protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
                                            ctx.writeAndFlush(buf.duplicate())
                                                    .addListener(ChannelFutureListener.CLOSE);
                                        }
                                    });
                        }
                    });
            ChannelFuture f = b.bind().sync();
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }
}

而非阻塞版本和阻塞版本几乎一模一样,只需要改动两处地方

EventLoopGroup group = new NioEventLoopGroup();
b.group(group).channel(NioServerSocketChannel.class);

传输 API

传输 API 的核心是 interface Channel,它被用于所有的 IO 操作。每个 Channel 都将被分配一个 ChannelPipeline 和 ChannelConfig,ChannelConfig 包含了该 Channel 的所有配置设置,ChannelPipeline 持有所有将应用于入站和出站数据以及事件的 ChannelHandler 实例

除了访问所分配的 ChannelPipeline 和 ChannelConfig 之外,也可以利用 Channel 的其他方法

方法名 描述
eventLoop 返回分配给 Channel 的 EventLoop
pipeline 返回分配给 Channel 的 ChannelPipeline
isActive 如果 Channel 活动的,返回 true
localAddress 返回本地的 SocketAddress
remoteAddress 返回远程的 SocketAddress
write 将数据写到远程节点
flush 将之前已写的数据冲刷到底层传输
writeAndFlush 等同于调用 write() 并接着调用 flush()

内置的传输

Netty 内置了一些可开箱即用的传输,但它们所支持的协议不尽相同,因此你必须选择一个和你的应用程序所使用协议相容的传输

名称 描述
NIO io.netty.channel.socket.nio 使用 java.nio.channels 包作为基础
Epoll io.netty.channel.epoll 由 JNI 驱动的 epoll() 和非阻塞 IO,可支持只有在 Linux 上可用的多种特性,比 NIO 传输更快,且完全非阻塞
OIO io.netty.channel.socket.oio 使用 java.net 包作为基础
Local io.netty.channel.local 可以在 VM 内部通过管道进行通信的本地传输
Embedded io.netty.channel.embedded Embedded 传输,允许使用 ChannelHandler 而不需要一个真正的基于网络的传输,主要用于测试

上一篇:rabbitmq-workqueue模型(1对多模型)


下一篇:TCP通信_多线程接收用户请求,实现简单的密码验证