使用netty框架实现websocket客户端。
启动框架
相比服务端标准模式而言,客户端启动类有比较多需要注意的地方,关键的逻辑有两个:
一是客户端需要处理自动重连,这里实际是两种情况,一种是客户端刚启动的时候,尝试去连接服务端,如不成功,则休眠5秒后再次重试;另外一种是出现异常时,包括原先建立连接、正常通信的情况下因为各种原因导致通道失效、心跳异常、服务端退出等,都会自动尝试重连,这样可以确保出现问题时无需系统管理员手工干预,自动重连来恢复运行。
二是连接成功后,要发起WebSocket的握手操作,将http协议升级为websocket协议,关键在于自实现的WebSocketClientHandshakerHandler处理器。
@Slf4j
@Component
public class MessageClient {
@Autowired
private MessageClientGlobalHolder config;
@Autowired
private MessageClientChannelInitializer messageClientChannelInitializer;
/**
* 启动客户端方法
*/
public void start() {
EventLoopGroup workerGroup = new NioEventLoopGroup(1);
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(messageClientChannelInitializer);
//客户端与服务端连接的通道,final修饰表示只会有一个
ChannelFuture channelFuture = bootstrap.connect(config.getHost(), config.getPort());
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
//未成功
log.error("连接失败", future.cause());
//执行重连
reconnect();
} else {
log.info("连接成功");
Channel channel = future.channel();
//将channel保存到全局变量
config.setChannel(channel);
//发起握手
WebSocketClientHandshakerHandler handler = (WebSocketClientHandshakerHandler) channel.pipeline().get("hookedHandler");
handler.handshake(config.getChannel());
}
}
});
//等待服务器端关闭
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
log.error("消息客户端启动失败:{}" + e.getMessage(), e);
//执行重连
reconnect();
} finally {
workerGroup.shutdownGracefully();
}
}
/**
* 重连
*/
public void reconnect() {
try {
Thread.sleep(5000);
//执行重连
log.info("消息客户端进行重连");
start();
} catch (InterruptedException e) {
log.error("消息客户端重连过程中线程休眠失败", e);
}
}
}
处理器配置
消息处理器的装配与实现关键实现
/**
* 初始化通道
*
* @author wqliu
* @date 2021-2-5 15:12
*/
@Slf4j
@Component
public class MessageClientChannelInitializer extends ChannelInitializer<SocketChannel> {
@Autowired
private MessageClientGlobalHolder config;
@Autowired
private Environment environment;
/**
* 生产运行模式
*/
private final String PRD_MODE="prd";
/**
* 初始化channel
*/
@Override
public void initChannel(SocketChannel socketChannel) throws Exception {
//获取通道链路
ChannelPipeline pipeline = socketChannel.pipeline();
//仅在生产模式下加载ssl过滤器
String mode=environment.getProperty("spring.profiles.active");
if(PRD_MODE.equals(mode)){
//ssl
SSLContext sslContext = createSslContext();
SSLEngine engine = sslContext.createSSLEngine();
engine.setNeedClientAuth(false);
engine.setUseClientMode(false);
pipeline.addLast(new SslHandler(engine));
}
//HTTP 编解码
pipeline.addLast(new HttpClientCodec());
// 聚合为单个 FullHttpRequest 或者 FullHttpResponse
pipeline.addLast(new HttpObjectAggregator(64 * 1024));
/**
* 注意,因WebSocketClientHandshakerHandler继承自SimpleChannelInboundHandler,会自动释放消息
* 对于收到服务端的pong消息,默认情况下不会往通道后续的处理器传递,所以若放到WebSocketClientHandshakerHandler之后,
* 则会产生读空闲,导致心跳超时失效。
*/
// 添加读写通道空闲处理器,当空闲满足设置时,会触发userEventTrigger,由下个处理器获取到
pipeline.addLast(new IdleStateHandler(config.getReadIdleTimeOut(), 0,
0, TimeUnit.SECONDS));
//心跳超时处理
pipeline.addLast(new HeartbeatTimeoutHandler());
//处理web socket协议与握手
pipeline.addLast("hookedHandler", new WebSocketClientHandshakerHandler());
//心跳发送
pipeline.addLast(new HeartbeatRequestHandler(config.getHeartbeatRate()));
//将文本按消息类型转换为请求消息或响应消息
pipeline.addLast(new MessageTypeDecodeHandler());
//请求消息业务逻辑处理器
pipeline.addLast(new RequestMessageBusinessHandler());
//响应消息业务逻辑处理器
pipeline.addLast(new ResponseMessageBusinessHandler());
//编码为TextWebSocketFrame
pipeline.addLast(new TextWebSocketFrameEncodeHandler());
//json序列化
pipeline.addLast(new JsonEncodeHandler());
}
/**
* 创建ssl上下文对象
* @param type
* @param path
* @param password
* @return
* @throws Exception
*/
public SSLContext createSslContext() throws Exception {
//读取配置信息
String path=environment.getProperty("server.ssl.key-store");
log.info("证书地址:{}",path);
String password=environment.getProperty("server.ssl.key-store-password");
String type=environment.getProperty("server.ssl.key-store-type");
//构建证书上下文对象
KeyStore ks = KeyStore.getInstance(type);
path=path.replace("classpath:","");
log.info("处理后的证书地址:{}",path);
ClassPathResource resource = new ClassPathResource(path);
InputStream ksInputStream = resource.getInputStream();
ks.load(ksInputStream, password.toCharArray());
//KeyManagerFactory充当基于密钥内容源的密钥管理器的工厂。
KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
kmf.init(ks, password.toCharArray());
//SSLContext的实例表示安全套接字协议的实现,它充当用于安全套接字工厂或 SSLEngine 的工厂。
SSLContext sslContext = SSLContext.getInstance("TLS");
sslContext.init(kmf.getKeyManagers(), null, null);
return sslContext;
}
}
处理器清单
一共涉及到12个消息处理器,其中4个是netty内置的,只是进行了参数配置,其他8个是自己实现的,用于处理逻辑和数据的,依次如下:
序号 | 处理器类型 | 职责 | 实现 | 说明 |
---|---|---|---|---|
1 | SslHandler | 处理可靠安全连接 | 内置 | 仅在生产环境,需要进行ssl加解密 |
2 | HttpClientCodec | HTTP 编解码 | 内置 | 对Http请求进行解码与编码 |
3 | HttpObjectAggregator | 聚合HTTP 请求或响应 | 内置 | 将http请求或响应聚合为一个完整对象 |
4 | IdleStateHandler | 空闲监测 | 内置 | 监测空闲状态,触发后续超时处理 |
5 | HeartbeatTimeoutHandler | 心跳超时处理 | 自定义 | 心跳超时执行关闭连接,触发重连 |
6 | WebSocketClientHandshakerHandler | WebSocket专用处理 | 自定义 | 处理WebSocket的握手以及Ping、Pong、Close消息 |
7 | HeartbeatRequestHandler | 发送心跳请求 | 自定义 | 客户端向服务端定时发送心跳 |
8 | MessageTypeDecodeHandler | 文本反序列化成消息对象 | 自定义 | 将文本按消息类型转换为请求消息或响应消息 |
9 | RequestMessageBusinessHandler | 处理请求消息 | 自定义 | 请求消息业务逻辑处理器 |
10 | ResponseMessageBusinessHandler | 处理响应消息 | 自定义 | 响应消息业务逻辑处理器 |
11 | TextWebSocketFrameEncodeHandler | JSON格式转文本帧 | 自定义 | 将json格式字符串编码为TextWebSocketFrame |
12 | JsonEncodeHandler | 对象序列化为JSON字符串 | 自定义 | 将对象序列化为json格式字符串 |
自定义的8个处理器中,最后两个11和12是出站处理器,注意实际执行顺序是先12后11,也就是,业务逻辑处理器9或10的处理结果是一个对象,先由出站处理器12将其序列化为json字符串,然后再由出站处理器11将其包装为一个WebSocket协议约定的文本帧TextWebSocketFrame。
自定义处理器
心跳超时处理
这个处理器实际是配合netty内置的空闲检测处理器IdleStateHandler使用的,只有满足IdleStateHandler中设置的触发条件,才会触发本处理器中的userEventTriggered方法,执行自定义的逻辑操作,这里是主动关闭连接。
客户端每隔固定时间频率向服务器端发送心跳,WebSocket协议约定的PingWebSocketFrame,服务端收到后马上会回复PongWebSocketFrame,如通道失效或服务端无响应情况下,就会触发客户端读空闲。
/**
* 心跳超时处理器
* @author wqliu
* @date 2021-10-2 14:25
**/
@Slf4j
public class HeartbeatTimeoutHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state().equals(IdleState.READER_IDLE)) {
log.info("读空闲");
//关闭连接
ctx.channel().close();
}
} else {
//非空闲事件,传递到下个处理器
super.userEventTriggered(ctx, evt);
}
}
}
WebSocket专用处理
这是很关键的一个处理器,自身也比较复杂。
主要实现是借助netty提供的一个WebSocketClientHandshaker类,在初始化时设置websocket服务端连接信息,然后在客户端启动时,调用该类的发起握手方法handshake,服务器端收到该握手请求后,会进行后续处理,响应一个协议升级,将http协议升级为WebSocket协议。
同时需要注意的是,这里还有一个我们自定义的操作,即在握手成功,协议升级后,客户端发出一个登录服务端的请求消息。
/**
* 处理web socket握手
*
* @author wqliu
* @date 2021-9-28 16:33
**/
@Slf4j
@Data
public class WebSocketClientHandshakerHandler extends SimpleChannelInboundHandler<Object> {
/**
* 握手
*/
private WebSocketClientHandshaker handshaker;
/**
* 握手 异步处理
*/
private ChannelPromise handshakeFuture;
public WebSocketClientHandshakerHandler() {
//初始化握手处理者
MessageClientGlobalHolder config = SpringUtil.getBean(MessageClientGlobalHolder.class);
URI webSocketUri = null;
try {
webSocketUri = new URI(config.getWebSocketUrl());
} catch (URISyntaxException e) {
log.error("解析远程服务器地址出错", e);
}
WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(webSocketUri, WebSocketVersion.V13, (String) null, true, new DefaultHttpHeaders());
this.setHandshaker(handshaker);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
// log.info("收到消息:{}",msg.toString());
Channel ch = ctx.channel();
FullHttpResponse response;
//进行握手操作
if (!this.handshaker.isHandshakeComplete()) {
try {
response = (FullHttpResponse) msg;
//握手协议返回,设置结束握手
this.handshaker.finishHandshake(ch, response);
//设置成功
this.handshakeFuture.setSuccess();
} catch (WebSocketHandshakeException var7) {
//已握手成功并将http协议升级为了WebSocket协议,不应再收到Http消息,发生这种情况则抛出异常
FullHttpResponse res = (FullHttpResponse) msg;
String errorMsg = String.format("握手失败,status:%s,reason:%s", res.status(), res.content().toString(CharsetUtil.UTF_8));
this.handshakeFuture.setFailure(new Exception(errorMsg));
}
} else if (msg instanceof FullHttpResponse) {
response = (FullHttpResponse) msg;
throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + response.status() + ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
} else if (msg instanceof CloseWebSocketFrame) {
log.info("收到关闭信息");
} else if (msg instanceof TextWebSocketFrame) {
log.info("收到文本帧,往下传递");
ReferenceCountUtil.retain(msg);
ctx.fireChannelRead(msg);
}
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
//设置握手成功后,发起登录请求
this.handshakeFuture = ctx.newPromise();
ChannelFuture handshakeFuture = this.handshakeFuture;
handshakeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
//发送登录请求
log.info("握手成功");
Login login = new Login();
login.sendMessage();
} else {
//握手失败
log.error("握手失败", future.cause());
}
}
});
}
/**
* 发起握手
*/
public void handshake(Channel channel) {
this.getHandshaker().handshake(channel);
}
}
发送心跳请求
心跳机制是客户端每隔固定时间频率向服务器端发送心跳,WebSocket协议约定的PingWebSocketFrame
/**
* 心跳请求处理器
* @author wqliu
* @date 2021-10-2 13:24
**/
@Slf4j
public class HeartbeatRequestHandler extends ChannelInboundHandlerAdapter {
/**
* 心跳发送间隔,单位秒
*/
private int heartbeatInterval=5;
public HeartbeatRequestHandler(int heartbeatInterval){
this.heartbeatInterval=heartbeatInterval;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
EventLoop eventLoop = ctx.channel().eventLoop();
eventLoop.scheduleWithFixedDelay(new Runnable() {
private Channel channel;
@Override
public void run() {
// log.info("发送心跳");
PingWebSocketFrame frame=new PingWebSocketFrame();
ChannelFuture channelFuture = channel.writeAndFlush(frame);
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// log.error(future.isSuccess()+"",future.cause());
}
});
}
public Runnable setChannel(Channel channel){
this.channel=channel;
return this;
}
}.setChannel(ctx.channel()),15,heartbeatInterval, TimeUnit.SECONDS);
//不调用父类方法,则其他处理器的channelActive事件不再触发
super.channelActive(ctx);
}
}
文本反序列化成消息对象
我将消息设计为两类,请求消息和响应消息,这里通过自己实现的一个处理器,将客户端传来的文本帧,通过消息类型属性反序列化成请求消息对象或响应消息对象,这里调用的是公用的处理器,即服务端也使用相同的处理器。
/**
* 消息类型解码
* @author wqliu
* @date 2021-10-6 11:23
**/
public class MessageTypeDecodeHandler extends MessageToMessageDecoder<TextWebSocketFrame> {
@Override
protected void decode(ChannelHandlerContext ctx, TextWebSocketFrame msg, List<Object> out) throws Exception {
String message=msg.text();
//消息解析
JSONObject jsonObject = JSONObject.parseObject(message);
String messageType = jsonObject.getString("messageType");
if (messageType.equals(MessageType.REQUEST.name())) {
RequestMessage requestMessage = JSON.parseObject(message, RequestMessage.class);
out.add(requestMessage);
}else if (messageType.equals(MessageType.RESPONSE.name())) {
ResponseMessage responseMessage = JSON.parseObject(message, ResponseMessage.class);
out.add(responseMessage);
}
}
}
请求/响应消息处理器
上一步把消息内容通过解码形成了请求消息或响应消息,而这两个处理器只需加入到链条中即可,根据传入的消息类型,也就是泛型参数类型,会自动识别处理或者往下传递。
JSON格式转文本帧
这个其实也没什么好说的,就是把JSON格式字符串放到文本帧中,这里调用的是公用的处理器,即服务端也使用相同的处理器。
/**
* 将json格式字符串编码为TextWebSocketFrame
* @author wqliu
* @date 2021-10-6 11:23
**/
public class TextWebSocketFrameEncodeHandler extends MessageToMessageEncoder<String> {
@Override
protected void encode(ChannelHandlerContext ctx, String msg, List<Object> out) throws Exception {
TextWebSocketFrame frame=new TextWebSocketFrame(msg);
out.add(frame);
}
}
对象序列化为JSON字符串
这个其实也没什么好说的,就是把对象转换为JSON格式字符串,这里调用的是公用的处理器,即服务端也使用相同的处理器。
/**
* 将对象序列化为json格式字符串
* @author wqliu
* @date 2021-10-6 11:23
**/
public class JsonEncodeHandler extends MessageToMessageEncoder<BaseMessage> {
@Override
protected void encode(ChannelHandlerContext ctx, BaseMessage msg, List<Object> out) throws Exception {
if(msg instanceof BaseMessage) {
out.add(JSONObject.toJSONString(msg));
}else{
out.add(msg);
}
}
}