仿淘宝开放平台之消息服务——客户端处理链条设计与实现

使用netty框架实现websocket客户端。

启动框架

相比服务端标准模式而言,客户端启动类有比较多需要注意的地方,关键的逻辑有两个:

一是客户端需要处理自动重连,这里实际是两种情况,一种是客户端刚启动的时候,尝试去连接服务端,如不成功,则休眠5秒后再次重试;另外一种是出现异常时,包括原先建立连接、正常通信的情况下因为各种原因导致通道失效、心跳异常、服务端退出等,都会自动尝试重连,这样可以确保出现问题时无需系统管理员手工干预,自动重连来恢复运行。
二是连接成功后,要发起WebSocket的握手操作,将http协议升级为websocket协议,关键在于自实现的WebSocketClientHandshakerHandler处理器。

@Slf4j
@Component
public class MessageClient {


    @Autowired
    private MessageClientGlobalHolder config;

    @Autowired
    private MessageClientChannelInitializer messageClientChannelInitializer;



    /**
     * 启动客户端方法
     */
    public void start() {

        EventLoopGroup workerGroup = new NioEventLoopGroup(1);
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(workerGroup);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.handler(messageClientChannelInitializer);


            //客户端与服务端连接的通道,final修饰表示只会有一个
            ChannelFuture channelFuture = bootstrap.connect(config.getHost(), config.getPort());
            channelFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        //未成功
                        log.error("连接失败", future.cause());
                        //执行重连
                        reconnect();
                    } else {
                        log.info("连接成功");
                        Channel channel = future.channel();
                        //将channel保存到全局变量
                        config.setChannel(channel);
                        //发起握手
                        WebSocketClientHandshakerHandler handler = (WebSocketClientHandshakerHandler) channel.pipeline().get("hookedHandler");
                        handler.handshake(config.getChannel());
                    }
                }
            });
            //等待服务器端关闭
            channelFuture.channel().closeFuture().sync();


        } catch (Exception e) {
            log.error("消息客户端启动失败:{}" + e.getMessage(), e);
            //执行重连
            reconnect();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * 重连
     */
    public void reconnect() {
        try {
            Thread.sleep(5000);
            //执行重连
            log.info("消息客户端进行重连");
            start();

        } catch (InterruptedException e) {
            log.error("消息客户端重连过程中线程休眠失败", e);
        }

    }

   
}

处理器配置

消息处理器的装配与实现关键实现


/**
 * 初始化通道
 *
 * @author wqliu
 * @date 2021-2-5 15:12
 */
@Slf4j
@Component
public class MessageClientChannelInitializer extends ChannelInitializer<SocketChannel> {


    @Autowired
    private MessageClientGlobalHolder config;

    @Autowired
    private Environment environment;

    /**
     * 生产运行模式
     */
    private final String PRD_MODE="prd";

    /**
     * 初始化channel
     */
    @Override
    public void initChannel(SocketChannel socketChannel) throws Exception {



        //获取通道链路
        ChannelPipeline pipeline = socketChannel.pipeline();

        //仅在生产模式下加载ssl过滤器
        String mode=environment.getProperty("spring.profiles.active");
        if(PRD_MODE.equals(mode)){
            //ssl
            SSLContext sslContext = createSslContext();
            SSLEngine engine = sslContext.createSSLEngine();
            engine.setNeedClientAuth(false);
            engine.setUseClientMode(false);
            pipeline.addLast(new SslHandler(engine));
        }


        //HTTP 编解码
        pipeline.addLast(new HttpClientCodec());

        // 聚合为单个 FullHttpRequest 或者 FullHttpResponse
        pipeline.addLast(new HttpObjectAggregator(64 * 1024));

        /**
         * 注意,因WebSocketClientHandshakerHandler继承自SimpleChannelInboundHandler,会自动释放消息
         * 对于收到服务端的pong消息,默认情况下不会往通道后续的处理器传递,所以若放到WebSocketClientHandshakerHandler之后,
         * 则会产生读空闲,导致心跳超时失效。
         */
        // 添加读写通道空闲处理器,当空闲满足设置时,会触发userEventTrigger,由下个处理器获取到
        pipeline.addLast(new IdleStateHandler(config.getReadIdleTimeOut(), 0,
                0, TimeUnit.SECONDS));

        //心跳超时处理
        pipeline.addLast(new HeartbeatTimeoutHandler());


        //处理web socket协议与握手
        pipeline.addLast("hookedHandler", new WebSocketClientHandshakerHandler());


        //心跳发送
        pipeline.addLast(new HeartbeatRequestHandler(config.getHeartbeatRate()));

        //将文本按消息类型转换为请求消息或响应消息
        pipeline.addLast(new MessageTypeDecodeHandler());

        //请求消息业务逻辑处理器
        pipeline.addLast(new RequestMessageBusinessHandler());

        //响应消息业务逻辑处理器
        pipeline.addLast(new ResponseMessageBusinessHandler());

        //编码为TextWebSocketFrame
        pipeline.addLast(new TextWebSocketFrameEncodeHandler());

        //json序列化
        pipeline.addLast(new JsonEncodeHandler());


    }

    /**
     * 创建ssl上下文对象
     * @param type
     * @param path
     * @param password
     * @return
     * @throws Exception
     */
    public SSLContext createSslContext() throws Exception {

        //读取配置信息
        String path=environment.getProperty("server.ssl.key-store");
        log.info("证书地址:{}",path);
        String password=environment.getProperty("server.ssl.key-store-password");
        String type=environment.getProperty("server.ssl.key-store-type");

        //构建证书上下文对象
        KeyStore ks = KeyStore.getInstance(type);
        path=path.replace("classpath:","");
        log.info("处理后的证书地址:{}",path);
        ClassPathResource resource = new ClassPathResource(path);
        InputStream ksInputStream = resource.getInputStream();
        ks.load(ksInputStream, password.toCharArray());
        //KeyManagerFactory充当基于密钥内容源的密钥管理器的工厂。
        KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
        kmf.init(ks, password.toCharArray());
        //SSLContext的实例表示安全套接字协议的实现,它充当用于安全套接字工厂或 SSLEngine 的工厂。
        SSLContext sslContext = SSLContext.getInstance("TLS");
        sslContext.init(kmf.getKeyManagers(), null, null);
        return sslContext;
    }


}

处理器清单

一共涉及到12个消息处理器,其中4个是netty内置的,只是进行了参数配置,其他8个是自己实现的,用于处理逻辑和数据的,依次如下:

序号 处理器类型 职责 实现 说明
1 SslHandler 处理可靠安全连接 内置 仅在生产环境,需要进行ssl加解密
2 HttpClientCodec HTTP 编解码 内置 对Http请求进行解码与编码
3 HttpObjectAggregator 聚合HTTP 请求或响应 内置 将http请求或响应聚合为一个完整对象
4 IdleStateHandler 空闲监测 内置 监测空闲状态,触发后续超时处理
5 HeartbeatTimeoutHandler 心跳超时处理 自定义 心跳超时执行关闭连接,触发重连
6 WebSocketClientHandshakerHandler WebSocket专用处理 自定义 处理WebSocket的握手以及Ping、Pong、Close消息
7 HeartbeatRequestHandler 发送心跳请求 自定义 客户端向服务端定时发送心跳
8 MessageTypeDecodeHandler 文本反序列化成消息对象 自定义 将文本按消息类型转换为请求消息或响应消息
9 RequestMessageBusinessHandler 处理请求消息 自定义 请求消息业务逻辑处理器
10 ResponseMessageBusinessHandler 处理响应消息 自定义 响应消息业务逻辑处理器
11 TextWebSocketFrameEncodeHandler JSON格式转文本帧 自定义 将json格式字符串编码为TextWebSocketFrame
12 JsonEncodeHandler 对象序列化为JSON字符串 自定义 将对象序列化为json格式字符串

自定义的8个处理器中,最后两个11和12是出站处理器,注意实际执行顺序是先12后11,也就是,业务逻辑处理器9或10的处理结果是一个对象,先由出站处理器12将其序列化为json字符串,然后再由出站处理器11将其包装为一个WebSocket协议约定的文本帧TextWebSocketFrame。

自定义处理器

心跳超时处理

这个处理器实际是配合netty内置的空闲检测处理器IdleStateHandler使用的,只有满足IdleStateHandler中设置的触发条件,才会触发本处理器中的userEventTriggered方法,执行自定义的逻辑操作,这里是主动关闭连接。

客户端每隔固定时间频率向服务器端发送心跳,WebSocket协议约定的PingWebSocketFrame,服务端收到后马上会回复PongWebSocketFrame,如通道失效或服务端无响应情况下,就会触发客户端读空闲。


/**
 * 心跳超时处理器
 * @author wqliu
 * @date 2021-10-2 14:25
 **/
@Slf4j
public class HeartbeatTimeoutHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state().equals(IdleState.READER_IDLE)) {
                log.info("读空闲");
                //关闭连接
                ctx.channel().close();
            }
        } else {
            //非空闲事件,传递到下个处理器
            super.userEventTriggered(ctx, evt);
        }

    }


}

WebSocket专用处理

这是很关键的一个处理器,自身也比较复杂。

主要实现是借助netty提供的一个WebSocketClientHandshaker类,在初始化时设置websocket服务端连接信息,然后在客户端启动时,调用该类的发起握手方法handshake,服务器端收到该握手请求后,会进行后续处理,响应一个协议升级,将http协议升级为WebSocket协议。

同时需要注意的是,这里还有一个我们自定义的操作,即在握手成功,协议升级后,客户端发出一个登录服务端的请求消息。

/**
 * 处理web socket握手
 *
 * @author wqliu
 * @date 2021-9-28 16:33
 **/
@Slf4j
@Data
public class WebSocketClientHandshakerHandler extends SimpleChannelInboundHandler<Object> {
    /**
     * 握手
     */
    private WebSocketClientHandshaker handshaker;
    /**
     * 握手 异步处理
     */
    private ChannelPromise handshakeFuture;

    public WebSocketClientHandshakerHandler() {
        //初始化握手处理者
        MessageClientGlobalHolder config = SpringUtil.getBean(MessageClientGlobalHolder.class);
        URI webSocketUri = null;
        try {
            webSocketUri = new URI(config.getWebSocketUrl());
        } catch (URISyntaxException e) {
            log.error("解析远程服务器地址出错", e);
        }
        WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(webSocketUri, WebSocketVersion.V13, (String) null, true, new DefaultHttpHeaders());
        this.setHandshaker(handshaker);


    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        // log.info("收到消息:{}",msg.toString());
        Channel ch = ctx.channel();
        FullHttpResponse response;
        //进行握手操作
        if (!this.handshaker.isHandshakeComplete()) {
            try {
                response = (FullHttpResponse) msg;
                //握手协议返回,设置结束握手
                this.handshaker.finishHandshake(ch, response);
                //设置成功
                this.handshakeFuture.setSuccess();

            } catch (WebSocketHandshakeException var7) {
                 //已握手成功并将http协议升级为了WebSocket协议,不应再收到Http消息,发生这种情况则抛出异常
                FullHttpResponse res = (FullHttpResponse) msg;
                String errorMsg = String.format("握手失败,status:%s,reason:%s", res.status(), res.content().toString(CharsetUtil.UTF_8));
                this.handshakeFuture.setFailure(new Exception(errorMsg));
            }
        } else if (msg instanceof FullHttpResponse) {
            response = (FullHttpResponse) msg;
            throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + response.status() + ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
        } else if (msg instanceof CloseWebSocketFrame) {
            log.info("收到关闭信息");

        } else if (msg instanceof TextWebSocketFrame) {
            log.info("收到文本帧,往下传递");
            ReferenceCountUtil.retain(msg);
            ctx.fireChannelRead(msg);
        }

    }


    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {

        //设置握手成功后,发起登录请求
        this.handshakeFuture = ctx.newPromise();
        ChannelFuture handshakeFuture = this.handshakeFuture;
        handshakeFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    //发送登录请求
                    log.info("握手成功");
                    Login login = new Login();
                    login.sendMessage();

                } else {
                    //握手失败
                    log.error("握手失败", future.cause());
                }
            }
        });


    }

    /**
     * 发起握手
     */
    public void handshake(Channel channel) {
        this.getHandshaker().handshake(channel);
    }


}

发送心跳请求

心跳机制是客户端每隔固定时间频率向服务器端发送心跳,WebSocket协议约定的PingWebSocketFrame

/**
 * 心跳请求处理器
 * @author wqliu
 * @date 2021-10-2 13:24
 **/
@Slf4j
public class HeartbeatRequestHandler extends ChannelInboundHandlerAdapter {

    /**
     * 心跳发送间隔,单位秒
     */
    private int heartbeatInterval=5;

    public HeartbeatRequestHandler(int heartbeatInterval){
        this.heartbeatInterval=heartbeatInterval;
    }


    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        EventLoop eventLoop = ctx.channel().eventLoop();
        eventLoop.scheduleWithFixedDelay(new Runnable() {
            private Channel channel;
            @Override
            public void run() {
                // log.info("发送心跳");
                PingWebSocketFrame frame=new PingWebSocketFrame();
                ChannelFuture channelFuture = channel.writeAndFlush(frame);
                channelFuture.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        // log.error(future.isSuccess()+"",future.cause());
                    }
                });
            }

            public Runnable setChannel(Channel channel){
                this.channel=channel;
                return this;
            }

        }.setChannel(ctx.channel()),15,heartbeatInterval, TimeUnit.SECONDS);

        //不调用父类方法,则其他处理器的channelActive事件不再触发
        super.channelActive(ctx);
    }
}

文本反序列化成消息对象

我将消息设计为两类,请求消息和响应消息,这里通过自己实现的一个处理器,将客户端传来的文本帧,通过消息类型属性反序列化成请求消息对象或响应消息对象,这里调用的是公用的处理器,即服务端也使用相同的处理器。


/**
 * 消息类型解码
 * @author wqliu
 * @date 2021-10-6 11:23
 **/
public class MessageTypeDecodeHandler extends MessageToMessageDecoder<TextWebSocketFrame> {
    @Override
    protected void decode(ChannelHandlerContext ctx, TextWebSocketFrame msg, List<Object> out) throws Exception {
        String message=msg.text();
        //消息解析
        JSONObject jsonObject = JSONObject.parseObject(message);
        String messageType = jsonObject.getString("messageType");
        if (messageType.equals(MessageType.REQUEST.name())) {
            RequestMessage requestMessage = JSON.parseObject(message, RequestMessage.class);
            out.add(requestMessage);

        }else if (messageType.equals(MessageType.RESPONSE.name())) {

            ResponseMessage responseMessage = JSON.parseObject(message, ResponseMessage.class);
            out.add(responseMessage);
        }
    }

}

请求/响应消息处理器

上一步把消息内容通过解码形成了请求消息或响应消息,而这两个处理器只需加入到链条中即可,根据传入的消息类型,也就是泛型参数类型,会自动识别处理或者往下传递。



JSON格式转文本帧

这个其实也没什么好说的,就是把JSON格式字符串放到文本帧中,这里调用的是公用的处理器,即服务端也使用相同的处理器。

/**
 * 将json格式字符串编码为TextWebSocketFrame
 * @author wqliu
 * @date 2021-10-6 11:23
 **/

public class TextWebSocketFrameEncodeHandler extends MessageToMessageEncoder<String> {
    @Override
    protected void encode(ChannelHandlerContext ctx, String msg, List<Object> out) throws Exception {
        TextWebSocketFrame frame=new TextWebSocketFrame(msg);
        out.add(frame);

    }


}

对象序列化为JSON字符串

这个其实也没什么好说的,就是把对象转换为JSON格式字符串,这里调用的是公用的处理器,即服务端也使用相同的处理器。

/**
 * 将对象序列化为json格式字符串
 * @author wqliu
 * @date 2021-10-6 11:23
 **/
public class JsonEncodeHandler extends MessageToMessageEncoder<BaseMessage> {
    @Override
    protected void encode(ChannelHandlerContext ctx, BaseMessage msg, List<Object> out) throws Exception {

        if(msg instanceof BaseMessage) {
            out.add(JSONObject.toJSONString(msg));
        }else{
            out.add(msg);
        }
    }


}
上一篇:10寸RK3399三防加固平板ipad,Android10.0操作系统


下一篇:RK3399平台开发系列讲解(内核调试篇)9.2、如何使用dump_stack分析函数调用关系