所有代码都已经上传到了gitee上,地址https://download.csdn.net/download/habazhu1110/16105832.主要为了赚点积分,但是肯定物超所值.
- 前言:
工具:本文使用的是idea开发但是用eclipse也行,
技术: springboot用的是2.1.5, netty是最新的4.1.60.Final,protobuf用的是最新的3,但是每个部分都是独立的,
本文尽可能的阐述netty和protobuf的原理,本质以及搭建过程
- 为什么使用netty,什么是nio和bio,什么是netty的0copy机制.
如果你急于撸代码可以跳过这一段,因为这段记下来本来就是对自己知识的总结和沉淀,如果形容起来就是厚积而薄发.......
首先讲两个结论:
.NIO比BIO牛逼
.netty是nio, 但是之所以是nio是因为linux提供了nio的支持.
听起来有点绕,我们捋一下,假设你买了一台电脑回来,这个时候是个裸机(只有硬件).你只能登录主板的BIOS设置.这个时候你需要干的事情只能是一个,
(1)安装linux系统(主要是windows的我不会,他那个好像是AIO具体机制没学过,而且内核机制也不一样). 安装完了以后你得电脑就不一样了,这个时候linux的程序放到了硬盘(也可能是你得固态)
(2)电脑重启,这个时候cpu开始执行引导区,加载指令,linux的启动指令就在里面,linux开始启动. 启动以后会在内存上开辟一块独立的地址(这个圈起来要考的)......他叫做kernal.也就是我们说的系统内核.
kernal是所有软件和硬件的桥梁.处于对硬件的保护(用户隐私).任何一个软件想操作硬件或者想操作其他软件,必须通过kernal. 举个简单的例子,如果你想把qq的内容复制粘贴到一个txt文件,首先txt需要向kernal发送一个复制请求(也可以是系统发出),然后qq把数据从他的内存地址复制到kernal(每个软件都跟kernal一样有自己的内存区), 再由kernal复制给txt. 所以同样一句话在内存中存了3遍.qq,kernal,txt. 为什么会这样呢!!!!因为安全,如果qq不提供复制的接口,kernal就无法调用qq.就会对txt系统说,滚.......如果txt能够绕过这个kernal检查,好吧!!!!你写了个病毒.
(3)到这里我们明白了两件事情: linux系统kernal是个内存桥梁,链接所有的软件区(每个软件会有自己的区域,并且不知道其他软件在哪)和所有的硬件.每一次操作都是数据从一个内存区复制到另外一个或者复制到硬件,也就是所谓的io开销.一般我们说的io其实指的就是kernal和其他系统间的复制.
- 知道了以上内容我们就可以了解一下BIO和NIO了.
在原始的linux系统中(很老),kernal只提供了write和read两个方法, 什么意思呢, 假设我们现在在linux上启动了一段java代码. 此时内存就有了kernal区和jvm区. 现在有个请求通过网卡请求通过socket传递一个数据给我们,(长连接的实现其实就是不停的去读指定的内存区域).首先数据从网线写入网卡的内存区(硬件在网卡上),网卡调用kernal的写方法, 如果这个时候有个线程进来查询这个区块的状态,他是个写入状态,当写入完成后会变成可读状态,此时才可读.那么问题来了,假设我现在有一万个人和我建立了长连接, 我读第一个人, 可读但是无内容...跳过,第二个人,可读有内容,然后第三个....这个货在写,,也就是正在网卡往kernal复制呢......后面九千多个人心态崩了啊.....于是决定利用linx的时间片规则,调用linux的fork()再起一个进程(是进程...)然后直接找cpu去了....这就是BIO, 正常玩我就排队,一万个人排一排,一旦阻塞就马上开个进程.最多变成了一万排.....每个进程都有自己内存地址,cpu自己有一张进程指令表,这哥们就得对着表在内存上来回反复的横跳........
NIO呢,首先因为linux升级了,linux除了write和read还提供了epoll, 这玩意和read的区别在于read是重操作,你调用了就只能等.......他准备好了在返回给你,不拔电源就别想停.而epoll是个询问,问他,你准备好了么,马上返回.你可以决定自己是读还是不读.猛一看这玩意不咋地,原来一个read现在变成了epoll+read费事了啊.实际上呢.....我们还是一万个人站一队,然后我么开始第一个人,可读但是无内容...跳过,第二个人,可读有内容,然后第三个....这个货在写,,....跳过.....对,在这跳过了,然后下一个人..........然后到了队尾再从头来一次.猛一想是那么回事,牛逼了,仔细一想不对啊,干活的就是cpu对吧,不管咋地,我就是站一起不站一起差毛线啊.........我们可以对比一下
BIO: 一万个链接开启一万个进程, 然后cpu维护指令表,根据指令表循环执行
NIO:一万个链接交给netty处理,netty将链接放入线程池,默认用cpu*2的线程数,然后当cpu执行netty的时候,netty自己把这一万个人从头执行到尾(你要知道这个list内存物理地址是连续的),
我们假设linux只有这么一个进程,那么netty的cpu其实在一直执行,而BIO的cpu需要执行一下然后看一下指令表,所以性能要差很多.......
- 我们再来说一下netty的零copy https://www.jianshu.com/p/0ab705ae4f2d 这里有内容的出处,总之所谓的零拷贝就是降低了kernal和jvm的对象复制.降低了io操作
这时候我们回顾一下如果将一个qq的内容复制到txt文件里面过程, qq内存区-->kernal区-->txt内存区. 这就是两次copy过程.所以所谓的copy其实就是在内存的复制,主要就是kernal的复制.
我们看一下netty怎么零copy的.......其实还是得靠linux内核的api升级.
这次升级linux提供
sentfile()方法: 已有文件(硬件如硬盘,网卡,显卡),里面有DMA引擎直接就把数据放到kernal然后给网卡了.不经过JVM. 通过Java的FileChannel.transferTo()方法
mmap()方法: 就是再JVM创建的通过调用linux的mmap()然后传递内核和JVM地址的一个映射关系,适合于read和文件内容的改.但是给网卡时候还是需要copy进kernal(为了安全);
direct Buffer缓冲区:Netty的接收和发送ByteBuffer采用直接缓冲区(Direct Buffer)实现零拷贝,我得理解就是不往JVM写直接推给了内核.
- 什么是selectoror模型.....
我们回到长连接,假设现在我只有一个进程,这个进程里面,有很多个线程(不同的线程是可以利用多核的).然后我们从中选一个线程当老板,这个boss负责干一件事情,就是不停的epoll长连接列表,问kernal有没有新消息,有没有新的连接,如果有,就会把这个连接相关信息给worker线程池,worker线程池一般是cpu的2倍.然后worker就去干活了.这个设计细想一下还是蛮牛逼的,既保证消息的及时收发(或者及时通知对方排队超时)也解决了个别任务导致消费线程的阻塞.
- java线程的本质是什么,和进程究竟有什么关系,是怎么实现的.引用了一段经典的讲解.地址 https://blog.csdn.net/cai13070139328/article/details/99079045
内核线程(KLT,Kernel-Level Thread),直接由操作系统内核(Kernel,即内核)支持的线程。由内核来完成线程切换,内核通过操纵调度器(Scheduler)对线程进行调度,并负责将线程的任务映射到各个处理器上。每个内核线程可以视为内核的一个分身,这样操作系统就有能力同时处理多件事情,支持多线程的内核叫做多线程内核(Multi-Threads Kernel)。
- 扯了这么多,从基础原理哔哔了很多,理论部分结束了,现在我们来讲讲netty的代码部分.
首先你得建一个maven项目,然后开始搞pom
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.5.RELEASE</version>
</parent>
<groupId>org.example</groupId>
<artifactId>netty</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<modules>
<module>part1</module>
<module>part2</module>
<module>boot-netty</module>
</modules>
<properties>
<swagger.version>2.6.1</swagger.version>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.60.Final</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.10</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>${swagger.version}</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>${swagger.version}</version>
</dependency>
<!--引入基础控件-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.1.5.RELEASE</version>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- actuator-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.9.1</version>
</dependency>
<!--json-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.32</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.15.6</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
<version>5.1.13.RELEASE</version>
</dependency>
</dependencies>
我建议你去下个代码包.....我没要太多积分...很值得
然后建一个类叫做nettyServer
public void init() {
log.info("开始启动 NettyServer");
//创建两个线程
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
//默认给予cpu数量的2倍
EventLoopGroup workerGroup = new NioEventLoopGroup();
//创建一个辅助类(工厂)
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)//设置为nio
.option(ChannelOption.SO_BACKLOG, 1024)//BACKLOG用于构造服务端套接字ServerSocket对象,标识当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度。如果未设置或所设置的值小于1,Java将使用默认值50。
.option(ChannelOption.SO_SNDBUF, 32 * 1024)//定义接收或者传输的系统缓冲区buf的大小,
.option(ChannelOption.SO_RCVBUF, 32 * 1024)//定义接收或者传输的系统缓冲区buf的大小,
.option(ChannelOption.SO_KEEPALIVE, true) //当设置为true的时候,TCP会实现监控连接是否有效,当连接处于空闲状态的时候,超过了2个小时,本地的TCP实现会发送一个数据包给远程的 socket,如果远程没有发回响应,TCP会持续尝试11分钟,知道响应为止,如果在12分钟的时候还没响应,TCP尝试关闭socket连接。
.childHandler( new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel channel) throws Exception {
//设置通过\n结尾
// StickyBagPolicy.lineBasedFrameDecoder(socketChannel,1024);
//设置
channel.pipeline().addLast(new ProtobufVarint32FrameDecoder());
channel.pipeline().addLast(new ProtobufDecoder(RequestModule.Request.getDefaultInstance()));
channel.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
channel.pipeline().addLast(new ProtobufEncoder());
channel.pipeline().addLast(new ServerHandler());
}
});
try {
ChannelFuture cf = b.bind(dsNettyEntity.getPort()).sync(); //等待服务器监听端口关闭
log.info("Server Start .. ");
cf.channel().closeFuture().sync();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
- 这里我们讲解一下这个过程, 1.创建boss和worker, 2.创建一个辅助类,有点类似工厂(或者说spring的context).3.有了老板和工厂我们需要生产线channel,channel里面有个核心的东西叫做pipeline(管道),4启动整个工厂.就这么个过程......
这里我就不都说了,后面我会去看看源码然后再比比一通
- 需要注意的是粘包
这里我们要了解一个事情,为啥子要搞个protobuf,这就不得不提另外一个概念: 粘包和拆包.原因网上百度一堆一堆的,简单来说就是http或者https是一个基于tcp的短协议(这是http和tcp的关系,简单来说tcp是规定了三次握手四次挥手的流程,而http和ssh是规定了双方每次挥手握手传输数据的内容格式.https://blog.csdn.net/weixin_40784198/article/details/81434530).有着完整的握手挥手,请求之间不会有任何关联.
而websocket协议(https://blog.csdn.net/weixin_40155271/article/details/80869542)按照文档说法是相当于用http协议创建了链接然后保持连接,双工维护(client和server都保存一份).其实本质呢就是基于http的api开发,通过http的api调用了tcp.包含两部分:一部分是“握手”,一部分是“数据传输”。握手成功后,数据就直接从 TCP 通道传输,与 HTTP 无关了.也不知道为什么不直接调用tcp,估计是为了偷懒,毕竟http协议还是蛮完善的.......刚才看了一下大家比较公认的方法websocket最早是为了丰富浏览器功能.....所以基于http开发简单,成本低.........
我们绕回来看,因为相当于把若干个tcp的传输部分捏在一起了.那么在传输过程中就可能造成包的黏连.比如框架,内核,网络交换机任何一个地方都可能帮你把包给重组了(只是可能).这个就是粘包,你想传的是110和111, 他接到的可能就是1101和11或者是110111....
为了避免这种情况业界有几种常用的做法,netty也提供了相应的api.
1.规定每个包的长度,然后不足的就用空格补齐.
sc.pipeline().addLast(new FixedLengthFrameDecoder(num));
2.规定用某个字符结尾
sc.pipeline().addLast(new LineBasedFrameDecoder(maxLength)); 通过换行符,即\\n或者\\r\\n对数据进行处理;
sc.pipeline().addLast(new DelimiterBasedFrameDecoder(maxFrameLength, Unpooled.wrappedBuffer(delimiter.getBytes()))); delimiter是个自定义的类似###之类的
3.通过在data前面加个字符长度的描述
// 进行长度字段解码,这里也会对数据进行粘包和拆包处理
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip));
// LengthFieldPrepender是一个编码器,主要是在响应字节数据前面添加字节长度字段
ch.pipeline().addLast(new LengthFieldPrepender(lengthFieldLength));
* 解码前 12 hello world! -->hello world!
* 编码钱 hello world!--> 编码后12 hello world!
通过以上的三种方式就可以解决粘包的问题了, 所以如果只是单纯的想解决粘包,不用protobuf也行.
- 刚才我们说了粘包,再说说另外一个问题就是序列化,这里用的是protobuf
首先说为啥序列化, 不用protobuf不也传输的挺好吗.
这里首先强调一点, 不论你用什么代码最终一定是序列化的, 因为网络传输只能传输0和1,而这个过程就是序列化,https://zhuanlan.zhihu.com/p/40462507.简单来说对于java存的都是本地地址,你想给其他的地(同机器或者是跨机器)一些信息,就必须以字节流的形式传输出去,对方接收到字节流以后按照双方约定好的方式解析.比如context-type:json......而protobuf也是一种协议,而且支持多种语言.
在默认的情况下java用的是自带序列化机制,所谓的自带序列化就是java的基础类型,比如一个int是32位,那么你写1,他在转化成2进制数组的时候记录的也00.....1,不足的位置用0补齐.而protobuf不一样,他写的是11.第一个1代表着长度,第二个代表着内容.也就是每一个信息都是用长度+内容的形式.然后把不需要的占位去掉.包体就会小了.
刚才说的protobuf的优点,那这玩意优缺点吗.....不胜枚举....简单来说就是麻烦.
- 现在我们看一下怎么用protobuf
首先去官网:
https://github.com/protocolbuffers/protobuf/releases
1.protoc-3.15.6-win64 解压 2.然后把bin下面的exe放到和项目pom平级 3.创建一个build.bat,细节参照demo 放到 pom平级 4,参照官方文档 https://developers.google.cn/protocol-buffers/docs/javatutorial syntax = "proto3"; message SearchRequest { string query = 1; int32 page_number = 2; int32 result_per_page = 3; } 创建文件user.proto 5.打开文件夹,运行.bat,将会生成对应的UserModule文件,将文件rm 到对应的位置
这是.bat的内容
protoc ./proto/response.proto --java_out=./src/main/java/proto pause
.proto的内容
syntax = "proto3"; option java_package="com.daishu.netty.proto"; option java_outer_classname="RequestModule"; message Request { string id = 1; string from=2; string to=3; string type=4; string data = 5; }
这个是我找到的最简单的方式了!!!!
- 现在我们看一下完整的代码
package com.daishu.netty.scoket; import com.daishu.netty.NettyApplication; import com.daishu.netty.base.DsNettyEntity; import com.daishu.netty.base.DsOrdered; import com.daishu.netty.base.StickyBagPolicy; import com.daishu.netty.client.ClientHandler; import com.daishu.netty.client.NettyClient; import com.daishu.netty.proto.RequestModule; import com.daishu.netty.proto.ResponseModule; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.codec.protobuf.ProtobufEncoder; import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; import io.swagger.annotations.Api; import io.swagger.annotations.ApiParam; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; @Api("netty的server类") @Component @Order(DsOrdered.NettyServer) public class NettyServer extends Thread { private static Log log = LogFactory.getLog(NettyServer.class); @PostConstruct public void postConstruct(){ log.info("开始启动NettyServer:postConstruct:执行serverStart"); new NettyServer(dsNettyEntity).start(); try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } log.info("开始启动NettyServer:postConstruct:执行clientStart"); new NettyClient(dsNettyEntity).start(); } public NettyServer() { } public NettyServer(DsNettyEntity dsNettyEntity) { this.dsNettyEntity = dsNettyEntity; } public DsNettyEntity getDsNettyEntity() { return dsNettyEntity; } public void setDsNettyEntity(DsNettyEntity dsNettyEntity) { this.dsNettyEntity = dsNettyEntity; } @Autowired private DsNettyEntity dsNettyEntity; @Override public void run() { init(); } public void init() { log.info("开始启动 NettyServer"); //创建两个线程 EventLoopGroup bossGroup = new NioEventLoopGroup(1); //默认给予cpu数量的2倍 EventLoopGroup workerGroup = new NioEventLoopGroup(); //创建一个辅助类(工厂) ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class)//设置为nio .option(ChannelOption.SO_BACKLOG, 1024)//BACKLOG用于构造服务端套接字ServerSocket对象,标识当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度。如果未设置或所设置的值小于1,Java将使用默认值50。 .option(ChannelOption.SO_SNDBUF, 32 * 1024)//定义接收或者传输的系统缓冲区buf的大小, .option(ChannelOption.SO_RCVBUF, 32 * 1024)//定义接收或者传输的系统缓冲区buf的大小, .option(ChannelOption.SO_KEEPALIVE, true) //当设置为true的时候,TCP会实现监控连接是否有效,当连接处于空闲状态的时候,超过了2个小时,本地的TCP实现会发送一个数据包给远程的 socket,如果远程没有发回响应,TCP会持续尝试11分钟,知道响应为止,如果在12分钟的时候还没响应,TCP尝试关闭socket连接。 .childHandler( new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel channel) throws Exception { //设置通过\n结尾 // StickyBagPolicy.lineBasedFrameDecoder(socketChannel,1024); //设置 channel.pipeline().addLast(new ProtobufVarint32FrameDecoder()); channel.pipeline().addLast(new ProtobufDecoder(RequestModule.Request.getDefaultInstance())); channel.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender()); channel.pipeline().addLast(new ProtobufEncoder()); channel.pipeline().addLast(new ServerHandler()); } }); try { ChannelFuture cf = b.bind(dsNettyEntity.getPort()).sync(); //等待服务器监听端口关闭 log.info("Server Start .. "); cf.channel().closeFuture().sync(); bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } catch (InterruptedException e) { e.printStackTrace(); } } }
package com.daishu.netty.scoket; import com.daishu.netty.proto.RequestModule; import com.daishu.netty.proto.ResponseModule; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.swagger.annotations.Api; import io.swagger.annotations.ApiParam; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @Api("https://blog.csdn.net/qq_16438883/article/details/103456346?utm_medium=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-1.control&dist_request_id=&depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-1.control") public class ServerHandler extends ChannelInboundHandlerAdapter { private static Log log = LogFactory.getLog(ServerHandler.class); public ServerHandler() { super(); } @Override @ApiParam("通道注册") public void channelRegistered(ChannelHandlerContext ctx) throws Exception { log.info("channelRegistered 通道注册"); super.channelRegistered(ctx); } @Override @ApiParam("通道注销") public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { log.info("channelUnregistered 通道注销"); super.channelUnregistered(ctx); } @Override @ApiParam("通道激活 通道激活时触发,当客户端connect成功后,服务端就会接收到这个事件,从而可以把客户端的Channel记录下来,供后面复用") public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("channelActive 通道激活"); super.channelActive(ctx); } @Override @ApiParam("channel 不活跃,断开连接,待定https://www.jianshu.com/p/903498747f47 ") public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.info("channelInactive channel 不活跃"); super.channelInactive(ctx); } @Override @ApiParam("io读取") public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.info("channelRead io读取"); //super.channelRead(ctx, msg); RequestModule.Request request = (RequestModule.Request)msg; log.info("服务端:" + request.getId() + "," + request.getFrom() + "," + request.getData()); ctx.writeAndFlush(createResponse(request.getId(), request.getFrom())); } private ResponseModule.Response createResponse(String id, String from) { return ResponseModule.Response.newBuilder() .setId(id) .setCode(0) .setDesc("响应报文") .build(); } @Override @ApiParam("io读取结束") public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { log.info("io读取结束 channelReadComplete"); super.channelReadComplete(ctx); } @Override @ApiParam("channel 用户事件触发") public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { log.info("userEventTriggered channel 用户事件触发"); super.userEventTriggered(ctx, evt); } @Override @ApiParam("channel 可写更改") public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { log.info("channelWritabilityChanged channel 可写更改"); super.channelWritabilityChanged(ctx); } @Override @ApiParam("channel 捕获到异常了,关闭了") public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.info("exceptionCaught channel 捕获到异常了,关闭了"); super.exceptionCaught(ctx, cause); } @Override protected void ensureNotSharable() { log.info("ensureNotSharable "); super.ensureNotSharable(); } @Override public boolean isSharable() { log.info("isSharable "); return super.isSharable(); } @Override @ApiParam("channel 助手类添加") public void handlerAdded(ChannelHandlerContext ctx) throws Exception { log.info("handlerAdded channel 助手类添加"); super.handlerAdded(ctx); } @Override @ApiParam("channel 助手类移除") public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { log.info("handlerRemoved 助手类移除"); super.handlerRemoved(ctx); } }
package com.daishu.netty.client; import com.daishu.netty.NettyApplication; import com.daishu.netty.proto.ResponseModule; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.ReferenceCountUtil; import io.swagger.annotations.Api; import io.swagger.annotations.ApiParam; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @Api("https://blog.csdn.net/qq_16438883/article/details/103456346?utm_medium=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-1.control&dist_request_id=&depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-1.control") public class ClientHandler extends ChannelInboundHandlerAdapter { private static Log log = LogFactory.getLog(ClientHandler.class); public ClientHandler() { super(); } @Override @ApiParam("通道注册") public void channelRegistered(ChannelHandlerContext ctx) throws Exception { log.info("channelRegistered 通道注册"); super.channelRegistered(ctx); } @Override @ApiParam("通道注销") public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { log.info("channelUnregistered 通道注销"); super.channelUnregistered(ctx); } @Override @ApiParam("通道激活 通道激活时触发,当客户端connect成功后,服务端就会接收到这个事件,从而可以把客户端的Channel记录下来,供后面复用") public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("channelActive 通道激活"); super.channelActive(ctx); } @Override @ApiParam("channel 不活跃,断开连接,待定https://www.jianshu.com/p/903498747f47 ") public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.info("channelInactive channel 不活跃"); super.channelInactive(ctx); } @Override @ApiParam("io读取") public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.info("channelRead io读取"); //super.channelRead(ctx, msg); try { ResponseModule.Response response = (ResponseModule.Response) msg; log.info("客户端:" + response.getId() + "," + response.getCode() + "," + response.getDesc()); } finally { ReferenceCountUtil.release(msg); } } @Override @ApiParam("io读取结束") public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { log.info("io读取结束 channelReadComplete"); super.channelReadComplete(ctx); } @Override @ApiParam("channel 用户事件触发") public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { log.info("userEventTriggered channel 用户事件触发"); super.userEventTriggered(ctx, evt); } @Override @ApiParam("channel 可写更改") public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { log.info("channelWritabilityChanged channel 可写更改"); super.channelWritabilityChanged(ctx); } @Override @ApiParam("channel 捕获到异常了,关闭了") public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.info("exceptionCaught channel 捕获到异常了,关闭了"); super.exceptionCaught(ctx, cause); } @Override protected void ensureNotSharable() { log.info("ensureNotSharable "); super.ensureNotSharable(); } @Override public boolean isSharable() { log.info("isSharable "); return super.isSharable(); } @Override @ApiParam("channel 助手类添加") public void handlerAdded(ChannelHandlerContext ctx) throws Exception { log.info("handlerAdded channel 助手类添加"); super.handlerAdded(ctx); } @Override @ApiParam("channel 助手类移除") public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { log.info("handlerRemoved 助手类移除"); super.handlerRemoved(ctx); } }
package com.daishu.netty.client; import com.daishu.netty.NettyApplication; import com.daishu.netty.base.DsNettyEntity; import com.daishu.netty.base.DsOrdered; import com.daishu.netty.proto.RequestModule; import com.daishu.netty.proto.ResponseModule; import com.daishu.netty.scoket.NettyServer; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.codec.protobuf.ProtobufEncoder; import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; import io.swagger.annotations.Api; import io.swagger.annotations.ApiParam; import lombok.Data; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.core.Ordered; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.PostConstruct; import java.util.UUID; @Component @RestController @RequestMapping("nettyClient") @Api("客户端") @Order(DsOrdered.NettyClient) public class NettyClient extends Thread { private static Log log = LogFactory.getLog(NettyApplication.class); public NettyClient() { } public NettyClient(DsNettyEntity dsNettyEntity) { this.dsNettyEntity = dsNettyEntity; } @Autowired private DsNettyEntity dsNettyEntity; @ApiParam("通道,客户端本身就是单例模式") private static ChannelFuture channelFuture; @Override public void run() { log.info("client"); init(); } @ApiParam("clientDemo 启动client") public void init() { log.info("开始启动client"); EventLoopGroup workerGroup = new NioEventLoopGroup(1); Bootstrap b = new Bootstrap(); b.group(workerGroup) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel channel) throws Exception { //设置序列化头 channel.pipeline().addLast(new ProtobufVarint32FrameDecoder()); channel.pipeline().addLast(new ProtobufDecoder(ResponseModule.Response.getDefaultInstance())); channel.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender()); channel.pipeline().addLast(new ProtobufEncoder()); channel.pipeline().addLast(new ClientHandler()); } }); try { ChannelFuture cf = b.connect(dsNettyEntity.getHost(), dsNettyEntity.getPort()).sync(); //将本次的链接保持 this.channelFuture = cf; /* cf.channel().writeAndFlush(Unpooled.wrappedBuffer("aaaaabbbbb".getBytes())); Thread.sleep(2000); cf.channel().writeAndFlush(Unpooled.copiedBuffer("ccccccc".getBytes()));*/ log.info("Client Start .. "); //等待客户端端口关闭 cf.channel().closeFuture().sync(); workerGroup.shutdownGracefully(); this.channelFuture = null; } catch (InterruptedException e) { e.printStackTrace(); log.error("initClient异常:",e); this.channelFuture = null; } } @RequestMapping("writeAndFlushSent") @ApiParam("发送消息") public Object writeAndFlushSent(String message){ RequestModule.Request.Builder builder=RequestModule.Request.newBuilder(); builder.setId(UUID.randomUUID().toString()).setData(message); RequestModule.Request request=builder.build(); this.channelFuture.channel().writeAndFlush(request); return "Success"; } }
- netty整合boot的关键
@PostConstruct public void postConstruct(){ log.info("开始启动NettyServer:postConstruct:执行serverStart"); new NettyServer(dsNettyEntity).start(); try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } log.info("开始启动NettyServer:postConstruct:执行clientStart"); new NettyClient(dsNettyEntity).start(); }
因为spring是单例模式,启动器也都是单线程模式,而netty呢都是阻塞性线程(需要不停地epoll).所以一定是用线程的模式单独启动,那怎么和springboot挂钩呢,可以等ioc初始化完全然后用构造参数的方式把需要的bean传进去,甚至是整个ioc的application传进去.然后用application获取bean的方式拿到bean调用.
-------------------------------------------
如果大家有问题欢迎留言提问,还有就是一定要下源码,然后会简单很多.至于类似于QQ,网页聊天室之类的功能,我这里没有做,因为需要前端整合js的protobuf.然后进行通信.