nettyserver 自定义数据分割

问题起源:

BC20物联网模块通过AT指令发送消息时,无法在消息末尾追加回车换行符,导致默认按行分帧(LineBasedFrameDecoder)的 NettyServer 无法获取消息。

修改方法:

使用 DelimiterBasedFrameDecoder 自定义分隔符:以“#”作为每条消息的结束标记,设备上报的消息示例如下:

 Client&ZDBH01&87.11&0.00&0.00&46.577&-14.707&-72.513&108.91785&34.22269&end#

nettyserver 自定义数据分割

 

package com.jeesite.modules.nettyServer;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

@Slf4j
@Component
public class NettyServer {

    @Value("${tcpServer.port}")
    private int serverPort;
    @Autowired
    private NettyServerHandler nettyServerHandler;

    ServerBootstrap bootstrap = new ServerBootstrap();
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();

    //维护设备连接的map 用于推送消息
    private Map<String, Channel> channelMap = new HashMap<>();

    public boolean serverStart() {
        ByteBuf delimiter = Unpooled.copiedBuffer("#".getBytes());
        //配置服务端的NIO线程组
        bossGroup = new NioEventLoopGroup(1);
        workerGroup = new NioEventLoopGroup();
        try {
            bootstrap = new ServerBootstrap()
                    .group(bossGroup, workerGroup)  // 绑定线程池
                    .channel(NioServerSocketChannel.class) //非阻塞模式
                    .option(ChannelOption.SO_BACKLOG, 128)  //服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝
                    .childOption(ChannelOption.SO_KEEPALIVE, true) //保持长连接,2小时无数据激活心跳机制
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                           // socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));//增加专门对"\n"和"\r\n"的为分隔符,即换行符的解码器
                            socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter));//增加专门对"\n"和"\r\n"的为分隔符,即换行符的解码器
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new StringEncoder());
                            socketChannel.pipeline().addLast(nettyServerHandler);

                        }
                    });
            boolean flag = false;
            for (int i = 0; i < 10; i++) {
                try {
                    ChannelFuture future = bootstrap.bind(serverPort).sync();
                    System.out.println("Netty Tcp Server start on port:"+ serverPort);
                    flag = true;
                    break;
                } catch (Exception e) {
                    System.err.println("服务端启动失败:{},10s后重试...");
                    Thread.sleep(10000);
                    e.printStackTrace();

                }
            }
            return flag;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    public boolean serverStop() {
        try {
            System.out.println("关闭Netty Tcp 服务端");
            bossGroup.shutdownGracefully().sync();
            workerGroup.shutdownGracefully().sync();
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    //发送消息给下游设备
    public boolean writeMsg(String msg) {
        boolean errorFlag = false;
        Map<String, Channel> channelMap = Constant.channelMap;
        if (channelMap.size() == 0) {
            return true;
        }
        Set<String> keySet = Constant.channelMap.keySet();
        for (String key : keySet) {
            try {
                Channel channel = channelMap.get(key);
                if (!channel.isActive()) {
                    errorFlag = true;
                    continue;
                }
                channel.writeAndFlush(msg);
            } catch (Exception e) {
                errorFlag = true;
            }
        }
        return errorFlag;
    }

}

上一篇:Spring5学习-1


下一篇:activemq的安装