springboot2+netty+protobuf(精品)

所有代码都已经上传到了gitee上,地址https://download.csdn.net/download/habazhu1110/16105832.主要为了赚点积分,但是肯定物超所值.

  • 前言:

工具:本文使用的是idea开发但是用eclipse也行,

技术: springboot用的是2.1.5, netty是最新的4.1.60.Final,protobuf用的是最新的3,但是每个部分都是独立的,

本文尽可能的阐述netty和protobuf的原理,本质以及搭建过程

  • 为什么使用netty,什么是nio和bio,什么是netty的0copy机制.

如果你急于撸代码可以跳过这一段,因为这段记下来本来就是对自己知识的总结和沉淀,如果形容起来就是厚积而薄发.......

首先讲两个结论:

.NIO比BIO牛逼

.netty是nio, 但是之所以是nio是因为linux提供了nio的支持.

听起来有点绕,我们捋一下,假设你买了一台电脑回来,这个时候是个裸机(只有硬件).你只能登录主板的BIOS设置.这个时候你需要干的事情只能是一个,

(1)安装linux系统(主要是windows的我不会,他那个好像是AIO具体机制没学过,而且内核机制也不一样). 安装完了以后你得电脑就不一样了,这个时候linux的程序放到了硬盘(也可能是你得固态)

(2)电脑重启,这个时候cpu开始执行引导区,加载指令,linux的启动指令就在里面,linux开始启动. 启动以后会在内存上开辟一块独立的地址(这个圈起来要考的)......他叫做kernal.也就是我们说的系统内核.

kernal是所有软件和硬件的桥梁.处于对硬件的保护(用户隐私).任何一个软件想操作硬件或者想操作其他软件,必须通过kernal. 举个简单的例子,如果你想把qq的内容复制粘贴到一个txt文件,首先txt需要向kernal发送一个复制请求(也可以是系统发出),然后qq把数据从他的内存地址复制到kernal(每个软件都跟kernal一样有自己的内存区), 再由kernal复制给txt. 所以同样一句话在内存中存了3遍.qq,kernal,txt. 为什么会这样呢!!!!因为安全,如果qq不提供复制的接口,kernal就无法调用qq.就会对txt系统说,滚.......如果txt能够绕过这个kernal检查,好吧!!!!你写了个病毒.

(3)到这里我们明白了两件事情: linux系统kernal是个内存桥梁,链接所有的软件区(每个软件会有自己的区域,并且不知道其他软件在哪)和所有的硬件.每一次操作都是数据从一个内存区复制到另外一个或者复制到硬件,也就是所谓的io开销.一般我们说的io其实指的就是kernal和其他系统间的复制.

  • 知道了以上内容我们就可以了解一下BIO和NIO了.

在原始的linux系统中(很老),kernal只提供了write和read两个方法, 什么意思呢, 假设我们现在在linux上启动了一段java代码. 此时内存就有了kernal区和jvm区. 现在有个请求通过网卡请求通过socket传递一个数据给我们,(长连接的实现其实就是不停的去读指定的内存区域).首先数据从网线写入网卡的内存区(硬件在网卡上),网卡调用kernal的写方法, 如果这个时候有个线程进来查询这个区块的状态,他是个写入状态,当写入完成后会变成可读状态,此时才可读.那么问题来了,假设我现在有一万个人和我建立了长连接, 我读第一个人, 可读但是无内容...跳过,第二个人,可读有内容,然后第三个....这个货在写,,也就是正在网卡往kernal复制呢......后面九千多个人心态崩了啊.....于是决定利用linx的时间片规则,调用linux的fork()再起一个进程(是进程...)然后直接找cpu去了....这就是BIO, 正常玩我就排队,一万个人排一排,一旦阻塞就马上开个进程.最多变成了一万排.....每个进程都有自己内存地址,cpu自己有一张进程指令表,这哥们就得对着表在内存上来回反复的横跳........

NIO呢,首先因为linux升级了,linux除了write和read还提供了epoll, 这玩意和read的区别在于read是重操作,你调用了就只能等.......他准备好了在返回给你,不拔电源就别想停.而epoll是个询问,问他,你准备好了么,马上返回.你可以决定自己是读还是不读.猛一看这玩意不咋地,原来一个read现在变成了epoll+read费事了啊.实际上呢.....我们还是一万个人站一队,然后我么开始第一个人,可读但是无内容...跳过,第二个人,可读有内容,然后第三个....这个货在写,,....跳过.....对,在这跳过了,然后下一个人..........然后到了队尾再从头来一次.猛一想是那么回事,牛逼了,仔细一想不对啊,干活的就是cpu对吧,不管咋地,我就是站一起不站一起差毛线啊.........我们可以对比一下

BIO: 一万个链接开启一万个进程, 然后cpu维护指令表,根据指令表循环执行

NIO:一万个链接交给netty处理,netty将链接放入线程池,默认用cpu*2的线程数,然后当cpu执行netty的时候,netty自己把这一万个人从头执行到尾(你要知道这个list内存物理地址是连续的),

我们假设linux只有这么一个进程,那么netty的cpu其实在一直执行,而BIO的cpu需要执行一下然后看一下指令表,所以性能要差很多.......

  • 我们再来说一下netty的零copy  https://www.jianshu.com/p/0ab705ae4f2d 这里有内容的出处,总之所谓的零拷贝就是降低了kernal和jvm的对象复制.降低了io操作

这时候我们回顾一下如果将一个qq的内容复制到txt文件里面过程, qq内存区-->kernal区-->txt内存区. 这就是两次copy过程.所以所谓的copy其实就是在内存的复制,主要就是kernal的复制.

我们看一下netty怎么零copy的.......其实还是得靠linux内核的api升级.

这次升级linux提供

sentfile()方法: 已有文件(硬件如硬盘,网卡,显卡),里面有DMA引擎直接就把数据放到kernal然后给网卡了.不经过JVM. 通过Java的FileChannel.transferTo()方法

mmap()方法: 就是再JVM创建的通过调用linux的mmap()然后传递内核和JVM地址的一个映射关系,适合于read和文件内容的改.但是给网卡时候还是需要copy进kernal(为了安全);

direct Buffer缓冲区:Netty的接收和发送ByteBuffer采用直接缓冲区(Direct Buffer)实现零拷贝,我得理解就是不往JVM写直接推给了内核.

  • 什么是selectoror模型.....

我们回到长连接,假设现在我只有一个进程,这个进程里面,有很多个线程(不同的线程是可以利用多核的).然后我们从中选一个线程当老板,这个boss负责干一件事情,就是不停的epoll长连接列表,问kernal有没有新消息,有没有新的连接,如果有,就会把这个连接相关信息给worker线程池,worker线程池一般是cpu的2倍.然后worker就去干活了.这个设计细想一下还是蛮牛逼的,既保证消息的及时收发(或者及时通知对方排队超时)也解决了个别任务导致消费线程的阻塞.

  • java线程的本质是什么,和进程究竟有什么关系,是怎么实现的.引用了一段经典的讲解.地址 https://blog.csdn.net/cai13070139328/article/details/99079045

内核线程(KLT,Kernel-Level Thread),直接由操作系统内核(Kernel,即内核)支持的线程。由内核来完成线程切换,内核通过操纵调度器(Scheduler)对线程进行调度,并负责将线程的任务映射到各个处理器上。每个内核线程可以视为内核的一个分身,这样操作系统就有能力同时处理多件事情,支持多线程的内核叫做多线程内核(Multi-Threads Kernel)。
 

  • 扯了这么多,从基础原理哔哔了很多,理论部分结束了,现在我们来讲讲netty的代码部分.

首先你得建一个maven项目,然后开始搞pom

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.5.RELEASE</version>
</parent>
<groupId>org.example</groupId>
<artifactId>netty</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<modules>
    <module>part1</module>
    <module>part2</module>
    <module>boot-netty</module>
</modules>
<properties>
    <swagger.version>2.6.1</swagger.version>
</properties>
<dependencies>
    <!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.60.Final</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.16.10</version>
    </dependency>
    <dependency>
        <groupId>io.springfox</groupId>
        <artifactId>springfox-swagger2</artifactId>
        <version>${swagger.version}</version>
    </dependency>
    <dependency>
        <groupId>io.springfox</groupId>
        <artifactId>springfox-swagger-ui</artifactId>
        <version>${swagger.version}</version>
    </dependency>
    <!--引入基础控件-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <version>2.1.5.RELEASE</version>
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- actuator-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
    </dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-slf4j-impl</artifactId>
    <version>2.9.1</version>
</dependency>
<!--json-->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.32</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>3.15.6</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-tx</artifactId>
    <version>5.1.13.RELEASE</version>
</dependency>
</dependencies>
我建议你去下个代码包.....我没要太多积分...很值得

然后建一个类叫做nettyServer

public void init() {
    log.info("开始启动 NettyServer");
    //创建两个线程
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    //默认给予cpu数量的2倍
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    //创建一个辅助类(工厂)
    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)//设置为nio
            .option(ChannelOption.SO_BACKLOG, 1024)//BACKLOG用于构造服务端套接字ServerSocket对象,标识当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度。如果未设置或所设置的值小于1,Java将使用默认值50。
            .option(ChannelOption.SO_SNDBUF, 32 * 1024)//定义接收或者传输的系统缓冲区buf的大小,
            .option(ChannelOption.SO_RCVBUF, 32 * 1024)//定义接收或者传输的系统缓冲区buf的大小,
            .option(ChannelOption.SO_KEEPALIVE, true) //当设置为true的时候,TCP会实现监控连接是否有效,当连接处于空闲状态的时候,超过了2个小时,本地的TCP实现会发送一个数据包给远程的 socket,如果远程没有发回响应,TCP会持续尝试11分钟,知道响应为止,如果在12分钟的时候还没响应,TCP尝试关闭socket连接。
            .childHandler( new ChannelInitializer<SocketChannel>() {
                protected void initChannel(SocketChannel channel) throws Exception {
                    //设置通过\n结尾
                   // StickyBagPolicy.lineBasedFrameDecoder(socketChannel,1024);
                    //设置
                    channel.pipeline().addLast(new ProtobufVarint32FrameDecoder());
                    channel.pipeline().addLast(new ProtobufDecoder(RequestModule.Request.getDefaultInstance()));
                    channel.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                    channel.pipeline().addLast(new ProtobufEncoder());
                    channel.pipeline().addLast(new ServerHandler());
                }
            });
    try {
        ChannelFuture cf = b.bind(dsNettyEntity.getPort()).sync();     //等待服务器监听端口关闭
        log.info("Server Start .. ");
        cf.channel().closeFuture().sync();
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();

    } catch (InterruptedException e) {
        e.printStackTrace();
    }

}
  • 这里我们讲解一下这个过程, 1.创建boss和worker, 2.创建一个辅助类,有点类似工厂(或者说spring的context).3.有了老板和工厂我们需要生产线channel,channel里面有个核心的东西叫做pipeline(管道),4启动整个工厂.就这么个过程......

这里我就不都说了,后面我会去看看源码然后再比比一通

  • 需要注意的是粘包

这里我们要了解一个事情,为啥子要搞个protobuf,这就不得不提另外一个概念: 粘包和拆包.原因网上百度一堆一堆的,简单来说就是http或者https是一个基于tcp的短协议(这是http和tcp的关系,简单来说tcp是规定了三次握手四次挥手的流程,而http和ssh是规定了双方每次挥手握手传输数据的内容格式.https://blog.csdn.net/weixin_40784198/article/details/81434530).有着完整的握手挥手,请求之间不会有任何关联.

而websocket协议(https://blog.csdn.net/weixin_40155271/article/details/80869542)按照文档说法是相当于用http协议创建了链接然后保持连接,双工维护(client和server都保存一份).其实本质呢就是基于http的api开发,通过http的api调用了tcp.包含两部分:一部分是“握手”,一部分是“数据传输”。握手成功后,数据就直接从 TCP 通道传输,与 HTTP 无关了.也不知道为什么不直接调用tcp,估计是为了偷懒,毕竟http协议还是蛮完善的.......刚才看了一下大家比较公认的方法websocket最早是为了丰富浏览器功能.....所以基于http开发简单,成本低.........

我们绕回来看,因为相当于把若干个tcp的传输部分捏在一起了.那么在传输过程中就可能造成包的黏连.比如框架,内核,网络交换机任何一个地方都可能帮你把包给重组了(只是可能).这个就是粘包,你想传的是110和111, 他接到的可能就是1101和11或者是110111....

为了避免这种情况业界有几种常用的做法,netty也提供了相应的api.

1.规定每个包的长度,然后不足的就用空格补齐.

sc.pipeline().addLast(new FixedLengthFrameDecoder(num));

2.规定用某个字符结尾

sc.pipeline().addLast(new LineBasedFrameDecoder(maxLength)); 通过换行符,即\\n或者\\r\\n对数据进行处理;
sc.pipeline().addLast(new DelimiterBasedFrameDecoder(maxFrameLength, Unpooled.wrappedBuffer(delimiter.getBytes()))); delimiter是个自定义的类似###之类的

3.通过在data前面加个字符长度的描述

// 进行长度字段解码,这里也会对数据进行粘包和拆包处理
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip));
// LengthFieldPrepender是一个编码器,主要是在响应字节数据前面添加字节长度字段
ch.pipeline().addLast(new LengthFieldPrepender(lengthFieldLength));
* 解码前 12 hello world! -->hello world!
* 编码钱 hello world!--> 编码后12 hello world!

通过以上的三种方式就可以解决粘包的问题了, 所以如果只是单纯的想解决粘包,不用protobuf也行.

  • 刚才我们说了粘包,再说说另外一个问题就是序列化,这里用的是protobuf

首先说为啥序列化, 不用protobuf不也传输的挺好吗.

这里首先强调一点, 不论你用什么代码最终一定是序列化的, 因为网络传输只能传输0和1,而这个过程就是序列化,https://zhuanlan.zhihu.com/p/40462507.简单来说对于java存的都是本地地址,你想给其他的地(同机器或者是跨机器)一些信息,就必须以字节流的形式传输出去,对方接收到字节流以后按照双方约定好的方式解析.比如context-type:json......而protobuf也是一种协议,而且支持多种语言.

在默认的情况下java用的是自带序列化机制,所谓的自带序列化就是java的基础类型,比如一个int是32位,那么你写1,他在转化成2进制数组的时候记录的也00.....1,不足的位置用0补齐.而protobuf不一样,他写的是11.第一个1代表着长度,第二个代表着内容.也就是每一个信息都是用长度+内容的形式.然后把不需要的占位去掉.包体就会小了.

刚才说的protobuf的优点,那这玩意优缺点吗.....不胜枚举....简单来说就是麻烦.

  • 现在我们看一下怎么用protobuf

首先去官网:

https://github.com/protocolbuffers/protobuf/releases
1.protoc-3.15.6-win64 解压
2.然后把bin下面的exe放到和项目pom平级
3.创建一个build.bat,细节参照demo 放到 pom平级
4,参照官方文档 https://developers.google.cn/protocol-buffers/docs/javatutorial
syntax = "proto3";

message SearchRequest {
  string query = 1;
  int32 page_number = 2;
  int32 result_per_page = 3;
}
创建文件user.proto

5.打开文件夹,运行.bat,将会生成对应的UserModule文件,将文件rm 到对应的位置

这是.bat的内容

protoc ./proto/response.proto --java_out=./src/main/java/proto

pause

.proto的内容

syntax = "proto3";

option java_package="com.daishu.netty.proto";
option java_outer_classname="RequestModule";

message Request {
  string id = 1;
  string from=2;
  string to=3;
  string type=4;
  string data = 5;
}

这个是我找到的最简单的方式了!!!!

  • 现在我们看一下完整的代码
package com.daishu.netty.scoket;

import com.daishu.netty.NettyApplication;
import com.daishu.netty.base.DsNettyEntity;
import com.daishu.netty.base.DsOrdered;
import com.daishu.netty.base.StickyBagPolicy;
import com.daishu.netty.client.ClientHandler;
import com.daishu.netty.client.NettyClient;
import com.daishu.netty.proto.RequestModule;
import com.daishu.netty.proto.ResponseModule;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiParam;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Api("netty的server类")
@Component
@Order(DsOrdered.NettyServer)
public class NettyServer extends Thread {
    private static Log log = LogFactory.getLog(NettyServer.class);

    @PostConstruct
    public void postConstruct(){
        log.info("开始启动NettyServer:postConstruct:执行serverStart");
        new NettyServer(dsNettyEntity).start();
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info("开始启动NettyServer:postConstruct:执行clientStart");
        new NettyClient(dsNettyEntity).start();
    }

    public NettyServer() {
    }

    public NettyServer(DsNettyEntity dsNettyEntity) {
        this.dsNettyEntity = dsNettyEntity;
    }

    public DsNettyEntity getDsNettyEntity() {
        return dsNettyEntity;
    }

    public void setDsNettyEntity(DsNettyEntity dsNettyEntity) {
        this.dsNettyEntity = dsNettyEntity;
    }

    @Autowired
    private DsNettyEntity dsNettyEntity;

    @Override
    public void run() {
        init();
    }

    public void init() {
        log.info("开始启动 NettyServer");
        //创建两个线程
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        //默认给予cpu数量的2倍
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        //创建一个辅助类(工厂)
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)//设置为nio
                .option(ChannelOption.SO_BACKLOG, 1024)//BACKLOG用于构造服务端套接字ServerSocket对象,标识当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度。如果未设置或所设置的值小于1,Java将使用默认值50。
                .option(ChannelOption.SO_SNDBUF, 32 * 1024)//定义接收或者传输的系统缓冲区buf的大小,
                .option(ChannelOption.SO_RCVBUF, 32 * 1024)//定义接收或者传输的系统缓冲区buf的大小,
                .option(ChannelOption.SO_KEEPALIVE, true) //当设置为true的时候,TCP会实现监控连接是否有效,当连接处于空闲状态的时候,超过了2个小时,本地的TCP实现会发送一个数据包给远程的 socket,如果远程没有发回响应,TCP会持续尝试11分钟,知道响应为止,如果在12分钟的时候还没响应,TCP尝试关闭socket连接。
                .childHandler( new ChannelInitializer<SocketChannel>() {
                    protected void initChannel(SocketChannel channel) throws Exception {
                        //设置通过\n结尾
                       // StickyBagPolicy.lineBasedFrameDecoder(socketChannel,1024);
                        //设置
                        channel.pipeline().addLast(new ProtobufVarint32FrameDecoder());
                        channel.pipeline().addLast(new ProtobufDecoder(RequestModule.Request.getDefaultInstance()));
                        channel.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                        channel.pipeline().addLast(new ProtobufEncoder());
                        channel.pipeline().addLast(new ServerHandler());
                    }
                });
        try {
            ChannelFuture cf = b.bind(dsNettyEntity.getPort()).sync();     //等待服务器监听端口关闭
            log.info("Server Start .. ");
            cf.channel().closeFuture().sync();
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();

        } catch (InterruptedException e) {
            e.printStackTrace();
        }


    }

}
package com.daishu.netty.scoket;

import com.daishu.netty.proto.RequestModule;
import com.daishu.netty.proto.ResponseModule;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiParam;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

@Api("https://blog.csdn.net/qq_16438883/article/details/103456346?utm_medium=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-1.control&dist_request_id=&depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-1.control")
public class ServerHandler extends ChannelInboundHandlerAdapter {
    private static Log log = LogFactory.getLog(ServerHandler.class);

    public ServerHandler() {
        super();
    }

    @Override
    @ApiParam("通道注册")
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        log.info("channelRegistered 通道注册");
        super.channelRegistered(ctx);

    }

    @Override
    @ApiParam("通道注销")
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        log.info("channelUnregistered 通道注销");
        super.channelUnregistered(ctx);

    }

    @Override
    @ApiParam("通道激活 通道激活时触发,当客户端connect成功后,服务端就会接收到这个事件,从而可以把客户端的Channel记录下来,供后面复用")
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("channelActive 通道激活");
        super.channelActive(ctx);

    }

    @Override
    @ApiParam("channel 不活跃,断开连接,待定https://www.jianshu.com/p/903498747f47 ")
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("channelInactive channel 不活跃");
        super.channelInactive(ctx);

    }

    @Override
    @ApiParam("io读取")
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("channelRead io读取");
        //super.channelRead(ctx, msg);
        RequestModule.Request request = (RequestModule.Request)msg;
        log.info("服务端:" + request.getId() + "," + request.getFrom() + "," + request.getData());
        ctx.writeAndFlush(createResponse(request.getId(), request.getFrom()));
    }
    private ResponseModule.Response createResponse(String id, String from) {
        return ResponseModule.Response.newBuilder()
                .setId(id)
                .setCode(0)
                .setDesc("响应报文")
                .build();
    }
    @Override
    @ApiParam("io读取结束")
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        log.info("io读取结束 channelReadComplete");
        super.channelReadComplete(ctx);

    }

    @Override
    @ApiParam("channel 用户事件触发")
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        log.info("userEventTriggered channel 用户事件触发");
        super.userEventTriggered(ctx, evt);

    }

    @Override
    @ApiParam("channel 可写更改")
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        log.info("channelWritabilityChanged channel 可写更改");

        super.channelWritabilityChanged(ctx);

    }

    @Override
    @ApiParam("channel 捕获到异常了,关闭了")
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.info("exceptionCaught channel 捕获到异常了,关闭了");
        super.exceptionCaught(ctx, cause);
    }

    @Override
    protected void ensureNotSharable() {
        log.info("ensureNotSharable ");

        super.ensureNotSharable();

    }

    @Override
    public boolean isSharable() {
        log.info("isSharable ");
        return super.isSharable();
    }

    @Override
    @ApiParam("channel 助手类添加")
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        log.info("handlerAdded channel 助手类添加");
        super.handlerAdded(ctx);
    }

    @Override
    @ApiParam("channel 助手类移除")
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        log.info("handlerRemoved 助手类移除");
        super.handlerRemoved(ctx);
    }
}
package com.daishu.netty.client;

import com.daishu.netty.NettyApplication;
import com.daishu.netty.proto.ResponseModule;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiParam;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

@Api("https://blog.csdn.net/qq_16438883/article/details/103456346?utm_medium=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-1.control&dist_request_id=&depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-1.control")
public class ClientHandler extends ChannelInboundHandlerAdapter {
    private static Log log = LogFactory.getLog(ClientHandler.class);

    public ClientHandler() {
        super();
    }

    @Override
    @ApiParam("通道注册")
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        log.info("channelRegistered 通道注册");
        super.channelRegistered(ctx);

    }

    @Override
    @ApiParam("通道注销")
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        log.info("channelUnregistered 通道注销");
        super.channelUnregistered(ctx);

    }

    @Override
    @ApiParam("通道激活 通道激活时触发,当客户端connect成功后,服务端就会接收到这个事件,从而可以把客户端的Channel记录下来,供后面复用")
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("channelActive 通道激活");
        super.channelActive(ctx);

    }

    @Override
    @ApiParam("channel 不活跃,断开连接,待定https://www.jianshu.com/p/903498747f47 ")
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("channelInactive channel 不活跃");
        super.channelInactive(ctx);

    }

    @Override
    @ApiParam("io读取")
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("channelRead io读取");
        //super.channelRead(ctx, msg);
        try {
            ResponseModule.Response response = (ResponseModule.Response) msg;
            log.info("客户端:" + response.getId() + "," + response.getCode() + "," + response.getDesc());
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    @ApiParam("io读取结束")
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        log.info("io读取结束 channelReadComplete");
        super.channelReadComplete(ctx);

    }

    @Override
    @ApiParam("channel 用户事件触发")
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        log.info("userEventTriggered channel 用户事件触发");
        super.userEventTriggered(ctx, evt);

    }

    @Override
    @ApiParam("channel 可写更改")
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        log.info("channelWritabilityChanged channel 可写更改");

        super.channelWritabilityChanged(ctx);

    }

    @Override
    @ApiParam("channel 捕获到异常了,关闭了")
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.info("exceptionCaught channel 捕获到异常了,关闭了");
        super.exceptionCaught(ctx, cause);
    }

    @Override
    protected void ensureNotSharable() {
        log.info("ensureNotSharable ");

        super.ensureNotSharable();

    }

    @Override
    public boolean isSharable() {
        log.info("isSharable ");
        return super.isSharable();
    }

    @Override
    @ApiParam("channel 助手类添加")
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        log.info("handlerAdded channel 助手类添加");
        super.handlerAdded(ctx);
    }

    @Override
    @ApiParam("channel 助手类移除")
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        log.info("handlerRemoved 助手类移除");
        super.handlerRemoved(ctx);
    }
}
package com.daishu.netty.client;

import com.daishu.netty.NettyApplication;
import com.daishu.netty.base.DsNettyEntity;
import com.daishu.netty.base.DsOrdered;
import com.daishu.netty.proto.RequestModule;
import com.daishu.netty.proto.ResponseModule;
import com.daishu.netty.scoket.NettyServer;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiParam;
import lombok.Data;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.PostConstruct;
import java.util.UUID;

@Component
@RestController
@RequestMapping("nettyClient")
@Api("客户端")
@Order(DsOrdered.NettyClient)
public class NettyClient extends Thread {
    private static Log log = LogFactory.getLog(NettyApplication.class);

    public NettyClient() {
    }

    public NettyClient(DsNettyEntity dsNettyEntity) {
        this.dsNettyEntity = dsNettyEntity;
    }

    @Autowired
    private DsNettyEntity dsNettyEntity;

    @ApiParam("通道,客户端本身就是单例模式")
    private static ChannelFuture channelFuture;


    @Override
    public void run()  {
        log.info("client");
        init();
    }




    @ApiParam("clientDemo 启动client")
    public void init() {
        log.info("开始启动client");
        EventLoopGroup workerGroup = new NioEventLoopGroup(1);
        Bootstrap b = new Bootstrap();
        b.group(workerGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel channel) throws Exception {
                        //设置序列化头
                        channel.pipeline().addLast(new ProtobufVarint32FrameDecoder());
                        channel.pipeline().addLast(new ProtobufDecoder(ResponseModule.Response.getDefaultInstance()));
                        channel.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                        channel.pipeline().addLast(new ProtobufEncoder());
                        channel.pipeline().addLast(new ClientHandler());
                    }
                });

        try {
            ChannelFuture cf = b.connect(dsNettyEntity.getHost(), dsNettyEntity.getPort()).sync();
            //将本次的链接保持
            this.channelFuture = cf;
           /* cf.channel().writeAndFlush(Unpooled.wrappedBuffer("aaaaabbbbb".getBytes()));
            Thread.sleep(2000);

            cf.channel().writeAndFlush(Unpooled.copiedBuffer("ccccccc".getBytes()));*/
            log.info("Client Start .. ");

            //等待客户端端口关闭
            cf.channel().closeFuture().sync();
            workerGroup.shutdownGracefully();
            this.channelFuture = null;
        } catch (InterruptedException e) {
            e.printStackTrace();
            log.error("initClient异常:",e);
            this.channelFuture = null;
        }
    }
    @RequestMapping("writeAndFlushSent")
    @ApiParam("发送消息")
    public Object writeAndFlushSent(String message){
        RequestModule.Request.Builder builder=RequestModule.Request.newBuilder();
        builder.setId(UUID.randomUUID().toString()).setData(message);
        RequestModule.Request request=builder.build();
        this.channelFuture.channel().writeAndFlush(request);
        return "Success";
    }


}
  • netty整合boot的关键
@PostConstruct
public void postConstruct(){
    log.info("开始启动NettyServer:postConstruct:执行serverStart");
    new NettyServer(dsNettyEntity).start();
    try {
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    log.info("开始启动NettyServer:postConstruct:执行clientStart");
    new NettyClient(dsNettyEntity).start();
}

因为spring是单例模式,启动器也都是单线程模式,而netty呢都是阻塞性线程(需要不停地epoll).所以一定是用线程的模式单独启动,那怎么和springboot挂钩呢,可以等ioc初始化完全然后用构造参数的方式把需要的bean传进去,甚至是整个ioc的application传进去.然后用application获取bean的方式拿到bean调用.

-------------------------------------------

如果大家有问题欢迎留言提问,还有就是一定要下源码,然后会简单很多.至于类似于QQ,网页聊天室之类的功能,我这里没有做,因为需要前端整合js的protobuf.然后进行通信.

 

 

 

 

上一篇:Python之转化成 PB 格式数据存储格式


下一篇:protobuf和flask结合高效数据传输