Netty通信技术

一、简介

一、概述

        Netty是JBOSS提供的一个开源框架, 本质为网络应用程序框架, 提供了非阻塞的、事件驱动的网络应用程序框架和工具,便于快速开发出高性能、高可靠性的网络服务端以及客户端。

二、核心架构

        Netty通信技术

上图是netty的核心架构,从官网上截取的, 有兴趣的可以去看看

核心:

  1. 可拓展的事件模型
  2. 统一的通信API,简化了通信编码
  3. 零拷贝和丰富的字节缓冲区

传输服务:

  1. 支持Socket和Datagram(数据报)
  2. HTTP传输
  3. In-VM Pipe(管道协议,JVM的一种进程)

协议支持:

  1. HTTP以及WebSocket
  2. SSL安全套接字协议支持
  3. Google Protobuf(序列化框架)
  4. 支持zlib、gzip压缩
  5. 支持大文件的传输
  6. RTSP(实时流传输协议, 是TCP/IP协议体系中的一个应用层协议)
  7. 支持二进制协议并且提供完整的单元测试

三、为什么使用Netty不使用Java原生NIO

Netty主要采用的也是NIO,并且是基于JDK中的NIO做了一些实现以及升级

主要有四点:

  1. Netty的API对开发者更友好,JDK中的API功能薄弱且复杂(如:ByteBuffer改为ByteBuf)
  2. Netty中采用了Reactor线程模型,能够保证自身的线程安全
  3. Netty能实现高可用,解决了一些传输的问题,如粘包、半包、断路重连等
  4. 解决了Bug,如JDK的NIO中epoll bug

四、在使用Netty的项目

  • 数据库: Cassandra
  • 大数据处理: Spark、Hadoop
  • Message Queue:RocketMQ
  • 检索: Elasticsearch
  • 框架:gRPC、Apache Dubbo、Spring5(响应式编程WebFlux)
  • 分布式协调器:ZooKeeper
  • 工具类: async-http-client

二、Reactor模型

Reactor线程模型是一种思想,不属于Java,也不属于Netty,其定义了三种角色

  • Reactor:用于监听和分配事件,将I/O事件分给对应的Handler。新的事件包括建立连接就绪、读就绪、写就绪。
  • Acceptor:处理客户端连接,并分配到处理器链中(可暂时简单理解为ServerSocketChannel)
  • Handler:将自身与事件绑定,执行非阻塞的读和写任务,从channel中读入,完成处理业务逻辑, 再将结果写入channel

通过这三种角色定义出三种NIO的模式

单Reactor-单线程模式 

Netty通信技术

Netty通信技术

 

        所有的接收请求以及处理数据都是有一个线程执行的,所以在一定的数量之后,性能就会下降。

单Reactor-多线程模式

Netty通信技术 

Netty通信技术

 

         基于刚刚的单Reactor单线程模式,我们将消耗时间较长的编解码、业务计算抽取出去,建立一个线程池进行处理,这样能提升性能,但还不是最优解。

主从Reactor-多线程模型

Netty通信技术

Netty通信技术 

        这次我们再次基于上面的模型进行调整,将专门负责接受连接的ServerSocketChannel另开一个Reactor进行调用,这个Reactor称为主;主Reactor将Channel建立在专门负责读写的从Reactor上,这就是所谓的1+N+M(1个监听线程,负责监听新的socket;N个IO线程,负责对socket进行读写;M个worker线程,负责处理数据)。

工作流程:

  1. Reactor主线程MainReactor对象通过selector监听到客户端的连接请求,通过Acceptor处理客户端连接事件。
  2. Acceptor跟客户端建立好socket连接之后,MainReactor会将连接分配给SubReactor。
  3. SubReactor将连接注册到自己的Selector的队列中进行监听,并创建对应的Handler对各种事件进行处理。
  4. 当连接上有新的事件发生时,SubReactor会调用对应的Handler进行处理。
  5. Handler通过read从channel和缓冲区上读取请求数据,然后将分发给Worker进行处理。
  6. Worker处理完数据会将结果再返还给Handler,Handler再通过send请求将数据发给客户端。
  7. 一个MainReactor可以对应多个SubReactor。

优势所在:

  1. 各个线程职责简单且明确,MainReactor只需要负责注册连接,SubReactor负责后续业务处理。
  2. MainReactor和SubReactor交互简单,主只需将连接交给从,从也无需返回数据。
  3. 多个SubReactor可以处理高并发业务。

三、Netty对Reactor的实现

Netty通信技术

 在Netty这个部分当中可以看到用了很多的池化思想

工作流程:

  1. Netty提供了两个线程池,一个BossGroup、一个WorkerGroup,每个池中都有EventLoop(相当于一个线程,可以是NIO,BIO,AIO)
  2. 每个EventLoop中包含Selector和TaskQueue
  3. 每个BossEventLoop负责以下三件事
  4. ①select:轮询注册ServerSocketChannel上的accept事件
  5. ②processSeleckedKeys:与客户端进行连接,并创建SocketChannel,将它注册到某一WorkerEventLoop上
  6. ③runAllTasks:继续处理其他事件
  7. 每个WorkerEventLoop也负责一下三件事
  8. ①select:轮询注册SocketChannel上的read/write事件
  9. ②processSeleckedKeys:在对应的SocketChannel上的PipeLine上处理相对应的数据
  10. ③runAllTasks:继续处理队列中的其他事件

ChannelPipeline和ChannelHandler

Netty通信技术

         如图所示,ChannelPipeline为ChannelHandler的容器,每个SocketChannel都会与一个ChannelPipeLine绑定。假如这是服务端程序,读取处理我们称数据是入站的,需要经过一系列Handler处理后;如果服务器想向客户端写回数据,也需要经过一系列的Handler处理,我们称之为出站。

        ChannelHandler则分为出站和入站的处理器,还有混合型的既能处理出站也能处理入站的

Netty通信技术

 四、Netty使用的示例代码

服务端

public class NettyServer {
    public static void main(String[] args) {
        NettyServer server = new NettyServer();
        server.start(8888);
    }

    private void start(int port) {
        // 创建主线程池
        EventLoopGroup boss = new NioEventLoopGroup(1);
        // 创建从线程池
        EventLoopGroup work = new NioEventLoopGroup();
        try {
            // 创建服务端的引导类
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(boss, work)
                    // 配置服务端主的channel
                    .channel(NioServerSocketChannel.class)
                    // 配置服务端handler
                    .handler(new LoggingHandler(LogLevel.INFO))
                    // 配置从的handler,也就是服务端连接的
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new ServerInboundHandler1());
                        }
                    });
            ChannelFuture future = serverBootstrap.bind(port).sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }
    }
}

客户端

public class NettyClient {
    public static void main(String[] args) {
        NettyClient client = new NettyClient();
        client.start("127.0.0.1",8888);
    }

    private void start(String host, int port) {
        EventLoopGroup loopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(loopGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                        }
                    });
            // 连接服务器
            ChannelFuture future = bootstrap.connect(host, port).sync();
            // 客户端向服务端发送数据
            Channel channel = future.channel();
            String msg= "我是Netty客户端, 你收到了吗?";
            ByteBuf buffer = channel.alloc().buffer();
            buffer.writeBytes(msg.getBytes(StandardCharsets.UTF_8));
            channel.writeAndFlush(buffer);

            // 等待关闭
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            loopGroup.shutdownGracefully();
        }
    }
}

入站处理器

@Slf4j
public class ServerInboundHandler1 extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("ServerInboundHandler1 channelActive 执行了");
        super.channelActive(ctx);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        byte[] dst=new byte[buf.readableBytes()];
        buf.readBytes(dst);
        String s = new String(dst, Charset.defaultCharset());
        System.out.println(s);
        log.info("读取到从客户端的数据"+s);
        super.channelRead(ctx, msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        super.channelReadComplete(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
    }
}

上一篇:基于cacit环境下实现:网络环境点到点的ipsla监控-2016-5-20-cacti-成功的


下一篇:CentOS:Device eth0 does not seem to be present 问题解决方法