和上一篇相比,此项目的场景有所不同:需要采集机房的电表、温湿度、水浸和烟感的数据。首先由通讯管理机直接采集这些数据,然后由服务器上的采集程序读取通讯管理机上存储的数据,解析后入库。
先说说这两个项目有什么不同:项目1中户外设备主动连接服务器并定时发送报文;而此项目需要服务器主动连接设备、发送报文来采集数据,因此只需要开发netty的client端。
项目框架:springboot+netty+mybatis+lombok+logback
开发环境:idea2018+jdk1.8+mysql5.6.35+maven3.5.3
项目搭建:
1.快速搭建springboot项目,配置pom.xml文件依赖包
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>com.rtst</groupId>
    <artifactId>dhjclistener</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>dhjclistener</name>
    <packaging>jar</packaging>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.1.2</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- Netty dependency; the version is managed by the Spring Boot 2.x parent -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
        </dependency>

        <!-- Mail support -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-mail</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- Write less code by using the Apache Commons utility libraries:
             https://www.cnblogs.com/ITtangtang/p/3966955.html -->
        <!-- apache.commons.lang3 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- apache.codec: utility package of encoding helpers
             https://blog.****.net/u012881904/article/details/52767853 -->
        <dependency>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
2.自定义netty的client端BootNettyClient类
package com.rtst.dhjclistener.nettyclient; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; import java.util.List; import java.util.concurrent.TimeUnit; @Component @Slf4j public class BootNettyClient { @Autowired BootNioChannelInitializer bootNioChannelInitializer; @Value("${netty.port}") private Integer port; @Value("#{‘${netty.host}‘.split(‘,‘)}") private List<String> hosts; private List<Channel> channels=null; private Bootstrap bootstrap; //定义线程组,处理读写和连接事件,没有了accept事件 private EventLoopGroup workGroup = new NioEventLoopGroup(); public void start() throws Exception { bootstrap = new Bootstrap(); bootstrap.group(workGroup); //绑定客户端通道 bootstrap.channel(NioSocketChannel.class); //给NioSocketChannel初始化handler,处理读写事件 bootstrap.handler(bootNioChannelInitializer); System.out.println("开始启动----"); for(int i =0;i<hosts.size();i++){ if(StringUtils.isEmpty(BootNettyClientHandler.ctxMap.get(hosts.get(i)))){ doConnect(hosts.get(i),port); }else{ continue; } } } //发起连接 protected void doConnect(String host,int port) { ChannelFuture future = bootstrap.connect(host, port); future.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture futureListener) throws Exception { if (futureListener.isSuccess()) { channels.add(futureListener.channel()); log.info(host+"Connect to server successfully!"); } else { log.info(host+"Failed to connect to server, try connect after 10s"); futureListener.channel().eventLoop().schedule(new Runnable() { @Override public void run() { 
log.info(host+"重新连接----"); doConnect(host,port);//递归doConnect方法,进行断线重连 } }, 10, TimeUnit.SECONDS); } } }); } }
3.自定义初始化类BootNioChannelInitializer类
package com.rtst.dhjclistener.nettyclient; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.handler.timeout.IdleStateHandler; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.util.concurrent.TimeUnit; @Component public class BootNioChannelInitializer<SocketChannel> extends ChannelInitializer<Channel> { @Autowired BootNettyClientHandler bootNettyClientHandler; @Autowired MyDecoder myDecoder; @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast("idleStateHandler",new IdleStateHandler(15,0,15, TimeUnit.SECONDS));//注意new IdleStateHandler的作用 ch.pipeline().addLast("decoder", myDecoder); //找到他的管道 增加他的handler ch.pipeline().addLast(bootNettyClientHandler); System.out.println("初始化信道"); } }
4.自定义业务处理类BootNettyClientHandler类
package com.rtst.dhjclistener.nettyclient; import com.rtst.dhjclistener.entity.Signal; import com.rtst.dhjclistener.repository.DsignalMapper; import com.rtst.dhjclistener.repository.SignalMapper; import com.rtst.dhjclistener.service.SendEmailContentService; import com.rtst.dhjclistener.util.StringUitls; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.socket.SocketChannel; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import java.net.InetSocketAddress; import java.text.DecimalFormat; import java.text.SimpleDateFormat; import java.util.*; @Component @Slf4j @ChannelHandler.Sharable public class BootNettyClientHandler extends ChannelInboundHandlerAdapter { @Autowired DsignalMapper dsignalMapper; @Autowired SignalMapper signalMapper; @Autowired BootNettyClient bootNettyClient; @Autowired SendEmailContentService sendEmailContentService;//发送告警邮件service类 @Value("${netty.schoolId}") private int schoolId; @Value("#{‘${netty.host}‘.split(‘,‘)}") private List<String> host; @Value("${netty.port}") private int port; // 将当前客户端连接 存入map 实现控制设备下发 参数 public static Map<String, Channel> ctxMap = new LinkedHashMap<String, Channel>(); public int triggeredNum=0;//计数器 public int EmailNum=0;//邮件方法计数器 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = insocket.getAddress().getHostAddress();//获取现连接的IP地址 if(ctxMap.get(clientIp)!=null){//如果不为空就不存 }else{//否则就将当前的设备ip+端口存进map 当做下发设备的标识的key ctxMap.put(clientIp, ctx.channel()); } log.info(clientIp+"------连接成功-------"); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg){ SocketChannel 
channel = (SocketChannel) ctx.channel(); ByteBuf buff = Unpooled.buffer();//netty需要用ByteBuf传输 //将字符串转成每两个字符加空格形式的字符串 String regex = "(.{2})"; String input = msg.toString().replaceAll(regex, "$1 "); log.info(channel.remoteAddress().getHostString() + ": " + input); System.out.println("服务端接受信息为: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息:" + input); byte[] bytes = StringUitls.toByteArray(msg.toString()); Map<String,Object> params = new LinkedHashMap<>(); InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = insocket.getAddress().getHostAddress();//获取现连接的IP地址
//业务处理逻辑,解析报文并入库,如果有告警,发送邮件提示用户机房存在告警
......
} /** * 连接断开时进入该方法 * @param ctx */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("客户端断开连接---channelInactive"); log.info("客户端断开连接---channelInactive"); super.channelInactive(ctx); InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = insocket.getAddress().getHostAddress();//获取现连接的IP地址 if(ctxMap.get(clientIp)!=null){//如果不为空就删除 ctxMap.remove(clientIp, ctx.channel()); } ctx.close(); bootNettyClient.doConnect(clientIp,port);//断线重连 } /** * 出现异常时进入该方法 * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = insocket.getAddress().getHostAddress();//获取现连接的IP地址 if(ctxMap.get(clientIp)!=null){//如果不为空就删除 ctxMap.remove(clientIp, ctx.channel()); } ctx.close(); } //处理超时读写空闲事件 @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { System.out.println("触发读写空闲操作-----"); InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = insocket.getAddress().getHostAddress();//获取现连接的IP地址 if (evt instanceof IdleStateEvent){ IdleStateEvent idleStateEvent = (IdleStateEvent) evt ; log.info(clientIp+"触发"+idleStateEvent.state()+"事件"); //获取IdleStateEvent事件,根据状态是否为读状态空闲 if (idleStateEvent.state() == IdleState.READER_IDLE){ log.info("已经 好长时间没有收到信息!"); System.out.println("尝试再次发送命令"); //向下位机发送消息 ByteBuf buf =Unpooled.buffer(); String order = "00 00 00 00 00 06 c8 03 00 00 00 64 05 c2";//可以设置成读取更大地址的数据,比如读0-500的地址位:00 00 00 00 00 06 c8 03 00 00 01 F4 05 c2 byte[] msg = StringUitls.hexStrToBinaryStr(order); buf.writeBytes(msg); ctx.writeAndFlush(msg).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); triggeredNum++; if(triggeredNum>=3){ ctx.close(); triggeredNum=0; } } } super.userEventTriggered(ctx, evt); } }
5.application类实现CommandLineRunner,来启动nettyClient服务
package com.rtst.dhjclistener; import com.rtst.dhjclistener.nettyclient.BootNettyClient; import org.mybatis.spring.annotation.MapperScan; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.scheduling.annotation.EnableScheduling; @SpringBootApplication @MapperScan("com.rtst.dhjclistener.repository") @EnableScheduling public class DhjclistenerApplication implements CommandLineRunner { @Autowired BootNettyClient bootNettyClient; public static void main(String[] args) { SpringApplication.run(DhjclistenerApplication.class, args); } @Override public void run(String... args) throws Exception { /** * 启动netty服务端服务 */ bootNettyClient.start(); } }
6.定时任务发送报文到下位机,请求下位机采集存储的数据
package com.rtst.dhjclistener.ordertask; import com.rtst.dhjclistener.nettyclient.BootNettyClientHandler; import com.rtst.dhjclistener.util.StringUitls; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; import java.util.List; @Component @Configuration public class MyTask { @Value("#{‘${netty.host}‘.split(‘,‘)}") private List<String> host; @Scheduled(cron = "*/20 * * * * ?")//每20秒执行一次发送命令,此处根据自己实际需求设置时间 public void order() { ByteBuf buf = Unpooled.buffer(); String order = "00 00 00 00 00 06 c8 03 00 00 00 64 05 c2"; byte[] msg = StringUitls.hexStrToBinaryStr(order); buf.writeBytes(msg); buf.retain(1);//在同时采集2个通讯管理机时会报异常,此行代码可以解决,如果只是采集一个通讯管理机时是不会存在该异常的 for(int i=0;i<host.size();i++){ if(StringUtils.isEmpty(BootNettyClientHandler.ctxMap.get(host.get(i)))){ System.out.println(host.get(i)+"channel对象为空"); continue; }else{ BootNettyClientHandler.ctxMap.get(host.get(i)).writeAndFlush(buf); } } System.out.println("发送命令成功"); } }
注意点:1.断线重连
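2.定时任务向多个通讯管理机发送同一个ByteBuf时会出现异常:每次writeAndFlush完成后,netty都会把该ByteBuf的引用计数减1,第一次发送后计数归零,再次发送就会抛出IllegalReferenceCountException。buf.retain()的作用是把引用计数加1,因此只适用于恰好两个通讯管理机的情况;更稳妥的做法是为每个连接单独创建一个ByteBuf。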