Netty 框架学习 —— 引导


概述

前面我们学习了 ChannelPipeline、ChannelHandler 和 EventLoop 之后,接下来的问题是:如何将它们组织起来,成为一个可实际运行的应用程序呢?答案是使用引导(Bootstrap),引导一个应用程序是指对它进行配置,并使它运行起来的过程,也即是将所有的框架组件在后台组合起来并启用


Bootstrap 类

引导类的层次结构包含一个抽象父类和两个具体子类

Netty 框架学习 —— 引导

如果将客户端和服务端视为两个应用程序,那么它们的功能是不一样的:服务端致力于使用一个父 Channel 来接受客户端的连接,并创建子 Channel 以用于它们之间的通信;而客户端很可能只需要一个单独的 Channel 来用于所有的网络交互。这两种方式之间通用的引导步骤由 AbstractBootstrap 处理,而特定于客户端或者服务端的引导步骤分别由 Bootstrap 或 ServerBootstrap 处理


引导客户端

Bootstrap 类被用于客户端或者使用了无连接协议的应用程序,该类的 API 如表所示:

名称 描述
Bootstrap group(EventLoopGroup) 设置用于处理 Channel所有事件的 EventLoopGroup
Bootstrap channel(Class<? extends C>)
Bootstrap channelFactory(ChannelFactory<? extends C>)
channel() 方法指定了 Channel 的实现类。如果该实现类没提供默认的构造函数,可以通过调用 channelFactory() 方法来指定一个工厂类,它将会被 bind() 方法调用
Bootstrap localAddress(SocketAddress) 指定 Channel 应该绑定的本地地址,如果没有指定,则由操作系统创建一个随机的地址
<T> Bootstrap option(ChannelOption<T> option, T value) 设置 ChannelOption,其将被应用到每个新创建的 Channel 的 ChannelConfig
<T> Bootstrap attr(Attribute<T> key, T value) 指定新创建的 Channel 的属性值
Bootstrap handler(ChannelHandler) 设置将被添加到 ChannelPipeline 以接收事件通知的 ChannelHandler
Bootstrap remoteAddress(SockerAddress) 设置远程地址
ChannelFuture connect() 连接到远程节点并返回一个 ChannelFuture
ChannelFuture bind() 绑定 Channel 并返回一个 ChannelFuture

Bootstrap 类负责为客户端和使用无连接协议的应用程序创建 Channel

Netty 框架学习 —— 引导

代码清单展示了引导一个使用 NIO TCP 传输的客户端

EventLoopGroup group = new NioEventLoopGroup();
// 创建一个 Bootstrap 类的实例以创建和连接新的客户端
Bootstrap bootstrap = new Bootstrap();
// 设置 EventLoopGroup
bootstrap.group(group)
    // 指定要使用的 Channel 实现
    .channel(NioSocketChannel.class)
    // 设置用于 Channel 事件和数据的 ChannelInboundHandler
    .handler(new SimpleChannelInboundHandler<ByteBuf>() {
        @Override
        protected void channeRead0(
        	ChannelHandlerContext channelHandlerContext,
            ByteBuf byteBuf) throws Exception {
            Syetem.out.println("Received data");
        }
    });
// 连接到远程主机
ChannelFuture future = bootstrap.connect(
	new InetSocketAddress("www.manning.com", 80)	
);
future.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture channelFuture) throws Exception {
        if(channelFuture.isSuccess()) {
            System.out.println("Connection established");
        } else {
            System.err.println("Connection attempt failed");
            channelFuture.cause().printStackTrace();
        }
    }
})

引导服务端

下表列出了 ServerBootstrap 类的方法

名称 描述
group 设置 ServerBootstrap 要用的 EventLoopGroup
channel 设置将要被实例化的 ServerChannel 类
channelFactory 如果不能通过默认的构造函数创建 Channel,那么可以提供一个 ChannelFactory
localAddress 指定 ServerChannel 应该绑定的本地地址,如果没有指定,则由操作系统使用一个随机地址
option 指定要应用到新创建的 ServerChannel 的 ChannelConfig 的 ChannelOption
childOption 指定当子 Channel 被接受时,应用到子 Channel 的 ChannelConfig 的 ChannelOption
attr 指定 ServerChannel 上的属性
childAttr 将属性设置给已经被接受的子 Channel
handler 设置被添加到 ServerChannel 的 ChannelPipeline 中的 ChannelHandler
childHandler 设置将被添加到已被接受的子 Channel 的 ChannelPipeline 中的 ChannelHandler
绑定 ServerChannel 并且返回一个 ChannelFuture,其将会在绑定操作完成后收到通知

ServerChannel 的实现负责创建子 Channel,这些子 Channel 代表了已被接受的连接。ServerBootstrap 提供了 childHandler()、childAttr() 和 childOption() 这些方法,以简化将设置应用到已被接受的子 Channel 的 ChannelConfig 的任务

下图展示了 ServerBootstrap 在 bind() 方法被调用时创建了一个 ServerChannel,并且该 ServerChannel 管理了多个子 Channel

Netty 框架学习 —— 引导

引导服务器的代码如下所示:

NioEventLoopGroup group = new NioEventLoopGroup();
// 创建 ServerBootstrap
ServerBootstrap bootstrap = new ServerBootstrap();
// 设置 EventLoopGroup
bootstrap.group(group)
    // 指定要使用的 Channel 实现
    .channel(NioServerSocketChannel.class)
    // 设置用于处理已被接受的子 Channel 的 IO 及数据的 ChannelInboundHandler
    .childHandler(new SimpleChannelInboundHandler<ByteBuf>() {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx,
                                   ByteBuf byteBuf) throws Exception {
            System.out.println("Received data");
        }
    });
ChannelFuture future = bootstrap.bind(new InetSocketAddress(8080));
future.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture channelFuture) throws Exception {
        if(channelFuture.isSuccess()) {
            System.out.println("Server bound");
        } else {
            System.out.println("Bound attempt failed");
            channelFuture.cause().printStackTrace();
        }
    }
})

从 Channel 引导客户端

假设要求你的服务器充当第三方的客户端,在这种情况下,需要从已经被接受的子 Channel 中引导一个客户端 Channel

我们可以按照前面讲过的引导客户端的方式创建新的 Bootstrap 实例,但这要求你为每个新创建的客户端 Channel 定义一个 EventLoop,这会产生额外的线程,并且子 Channel 和客户端 Channel 之间交换数据时不可避免会发生上下文切换

一个更好的解决办法是:通过将子 Channel 的 EventLoop 传递给 Bootstrap 的 group() 方法来共享该 EventLoop 传递给 Bootstrap 的 group() 方法来共享该 EventLoop,避免额外的线程创建和上下文切换

实现 EventLoop 共享涉及通过调用 group() 方法来设置 EventLoop

ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup())
    .channel(NioServerSocketChannel.class)
    .childHandler(
		new SimpleChannelInboundHandler<ByteBuf>() {
            ChannelFuture connectFuture;
            @Override
            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                // 创建一个 Bootstrap 实例以连接到远程主机
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.channel(NioSockerChannel.class).handler(
                	new SimpleChannelInboundHandler<ByteBuf>() {
                        @Override
                        protected void channelRead0(
                        	ChannelHandlerContext ctx, ByteBuf in) throws Exception {
                            System.out.println("Received data");
                        }
                    });
                // 使用子 Channel 的 EventLoop
                bootstrap.group(ctx.channel().eventLoop());
                connectFuture = bootstrap.connect(new InetSocketAddress("www.manning.com", 80));
            }
            @Override
            protected void channelRead0(
            	ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {
                if(connectFuture.isDone) {
                    // 当连接完成时,执行数据操作
                }
            }
        });

引导过程中添加多个 ChannelHandler

前面的引导过程中调用了 handler() 或者 childHandler() 方法来添加单个 channelHandler() 方法来添加单个 ChannelHandler,如果我们需要多个 ChannelHandler,Netty 提供了一个特殊的 ChannelInboundHandlerAdapter 子类:

public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter

它定义了如下方法

protected abstract void initChannel(C ch) throws Exception;

这个方法提供了一种将多个 ChannelHandler 添加到一个 ChannelPipeline 中的简便方法,你只需要向 Bootstrap 或 ServerBootstrap 的实例提供你的 ChannelInitializer 实现即可。一旦 Channel 被注册到它的 EventLoop 之后,就会调用你的 initChannel() 版本,在该方法返回之后,ChannelInitializer 的实例将会从 ChannelPipeline 中移除自己

ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup())
    .channel(NioServerSocketChannel.class)
    // 注册一个 ChannelInitializerImpl 的实例来设置 ChannelPipeline
    .childHandler(new ChannelInitializerImpl());
ChannelFuture future = bootstrap.bind(new InetSocketAddress(8080));
future.sync();

final class ChannelInitializerImpl extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel ch) throws Exception {
        CHannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new HttpClientCodec());
        pipeline.addLast(new HttpObjectAggregator(Integer.MAX_VALUE));
    }
}

使用 Netty 的 ChannelOption 和属性

在每个 Channel 创建时都手动配置可能会相当乏味,可以使用 option() 方法来将 ChannelOption 应用到引导,其值会自动应用到所创建的所有 Channel。可用的 ChannelOption 包括了底层连接的详细信息,如 keep-alive 或者超时属性以及缓冲区设置

Netty 应用程序通常与组织的专有软件集成在一起,而 Channel 甚至可能会在正常的 Netty 生命周期之外被使用。在某些常用属性和数据不可用时,Netty 提供了 AttributeMap 抽象以及 AttributeKey<T>,使用这些工具,可以安全地将任何类型的数据与客户端和服务端 Channel 相关联

例如,考虑一个用于跟踪用户和 Channel 之间关系的服务器应用程序,可以通过将用户的 ID 存储为 Channel 的一个属性来完成

// 创建一个 AttributeKey 以标识该属性
final AttributeKey<Integer> id = AttributeKey.newInstance("ID");
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(new NioEventLoopGroup())
    .channel(NioSocketChannel.class)
    .handler(
		new SimpleChannelInboundHandler<ByteBuf>() {
        	@Override
            public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
                // 使用 AttributeKey 检索属性以及它的值
                Integer idValue = ctx.channel().attr(id).get();
            }
            
            @Override
            public void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {
                System.out.println("Received data");
            }
        });
// 设置 ChannelOption
bootstrap.option(ChannelOption.SO_KEEPALIVE, true)
    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
// 存储 id 属性
bootstrap.attr(id, 123456);
ChannelFuture future = bootstrap.connect(new InetSocketAddress("www.maning.com", 80));
future.syncUninterruptibly();

引导 DatagramChannel

前面使用的都是基于 TCP 协议的 SocketChannel,但 Bootstrap 类也可以用于无连接协议。为此,Netty 提供了各种 DatagramChannel 的实现,唯一的区别就是,不再调用 connect() 方法,而只调用 bind() 方法

Bootstrap bootstrap = new Bootstrap();
bootstrap.group(new OioEventLoopGroup())
    .channel(OioSocketChannel.class)
    .handler(
		new SimpleChannelInboundHandler<DatagramPacket>() {
            @Override
            public void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
                System.out.println("Received data");
            }
        });
ChannelFuture future = bootstrap.bind(new InetSocketAddress(0));

关闭

引导使得你的应用程序启动,自然也需要优雅地进行关闭,你也可以让 JVM 在退出时处理一切,但这不符合优雅的定义

最重要的是,你需要关闭 EventLoopGroup,它将处理任何挂起的事件和任务,并随后释放所有活动线程。通过调用 EventLoopGroup.shutdownGracefully() 方法,将返回一个 Future,这个 Future 将在关闭完成时接收到通知。shutdownGracefully 是一个异步操作,你需要阻塞等待直到它完成,或者向返回的 Future 注册一个监听器

EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
    .channel(NioServerSocketChannel.class);
...
Future<?> future = group.shutdownGracefully();
future.syncUniterruptibly();

Netty 框架学习 —— 引导

上一篇:java 学习流程图


下一篇:Jenkins 构建自动化 .NET Core 发布镜像