用Netty作为服务端的WebSocket通信
1 WebSocket
1.1为什么要选择WebSocket?
http 是无状态协议,为标识用户,通常客户端要带上session。但是它无法解决服务端主动想客户端发送消息的需求。面对并发量高的场景,由于http信息携带的数据大,而对服务器造成较大的负担,所以面对这种场景,WebSocket就上线了。
1.2什么是WebSocket?
WebSocket是一种基于tcp的新的网络协议,它实现了服务器与浏览器之间的全双工,即浏览器和服务器可以互不影响的向对方发送信息和接受信息。如下就是WebSocket的通信过程
可以看到,它还是用到了http协议来建立连接,之后就是Websocket的部分了。
1.3WebSocket的弊端?
WebSocket本身是依赖tomcat,然而tomcat的并发量不大,连接数低,会导致出现断连的情况,因此对于WebSocket通信要求不高的,可以直接依赖tomcat。但是遇到高并发就难以支撑,这时候,netty该上场了。
2 Netty
2.1 什么是Netty?
Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。Netty是基于Java NIO实现的异步通信框架,其主要特点是简单,要比原生的JavaNIO开发方便很多,同时Netty封装了大量好用的组件,方便开发。下面基于Netty实现websocket通信。
Netty的WebSocket通讯实现
public class WebSocketServer {
public void run() {
// 服务端启动辅助类,用于设置TCP相关参数
ServerBootstrap bootstrap = new ServerBootstrap();
// 获取Reactor线程池
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
// 设置为主从线程模型
bootstrap.group(bossGroup, workGroup)
// 设置服务端NIO通信类型
.channel(NioServerSocketChannel.class)
// 设置ChannelPipeline,也就是业务职责链,由处理的Handler串联而成,由从线程池处理
.childHandler(new ChannelInitializer<Channel>() {
// 添加处理的Handler,通常包括消息编解码、业务处理,也可以是日志、权限、过滤等
@Override
protected void initChannel(Channel ch) throws Exception {
// 获取职责链
ChannelPipeline pipeline = ch.pipeline();
//
pipeline.addLast("http-codec", new HttpServerCodec());
pipeline.addLast("aggregator", new HttpObjectAggregator(65535));
pipeline.addLast("http-chunked", new ChunkedWriteHandler());
pipeline.addLast("handler", new WebSocketHandler());
}
})
// bootstrap 还可以设置TCP参数,根据需要可以分别设置主线程池和从线程池参数,来优化服务端性能。
// 其中主线程池使用option方法来设置,从线程池使用childOption方法设置。
// backlog表示主线程池中在套接口排队的最大数量,队列由未连接队列(三次握手未完成的)和已连接队列
.option(ChannelOption.SO_BACKLOG, 5)
// 表示连接保活,相当于心跳机制,默认为7200s
.childOption(ChannelOption.SO_KEEPALIVE, true);
try {
// 绑定端口,启动select线程,轮询监听channel事件,监听到事件之后就会交给从线程池处理
Channel channel = bootstrap.bind(8081).sync().channel();
// 等待服务端口关闭
channel.closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 优雅退出,释放线程池资源
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
new WebSocketServer().run();
}
}
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>WebSocket Chat</title>
</head>
<body>
<script type="text/javascript">
var socket;
if (!window.WebSocket) {
window.WebSocket = window.MozWebSocket;
}
if (window.WebSocket) {
socket = new WebSocket("ws://localhost:8081/ws");
socket.onopen = function(event) {
var ta = document.getElementById('responseText');
ta.value = "连接开启!";
};
socket.onclose = function(event) {
var ta = document.getElementById('responseText');
ta.value = ta.value + "连接被关闭";
};
socket.onmessage = function(event) {
var ta = document.getElementById('responseText');
ta.value = ta.value + '\n' + event.data;
};
} else {
alert("你的浏览器不支持 WebSocket!");
}
function send(message) {
if (!window.WebSocket) {
return;
}
if (socket.readyState == WebSocket.OPEN) {
socket.send(message);
} else {
alert("连接没有开启.");
}
}
</script>
<form οnsubmit="return false;">
<h3>WebSocket 聊天室:</h3>
<textarea id="responseText" style="width: 500px; height: 300px;"></textarea>
<br>
<input type="text" name="message" style="width: 300px" value="Welcome to www.waylau.com">
<input type="button" value="发送消息" οnclick="send(this.form.message.value)">
<input type="button" οnclick="javascript:document.getElementById('responseText').value=''" value="清空聊天记录">
</form>
<br>
<br>
</body>
</html>
3 待优化的细节
3.1 心跳和重连接
通过约定的心跳消息,客户端连接完成后,向服务器发送心跳消息,若按照一定规则,如连续两个心跳周期没有收到服务器的返回消息,那么就主动断开连接,执行重新连接
3.2 实现主题订阅的功能
逻辑上并不复杂,如对多个订阅了相同主题的用户推送消息,只需要维护一个map<String, List>.
其中的String代表主题,List代表订阅了该主题的通道。 订阅逻辑通过约定客户端推送的特殊字符串解析。
3.3 对消息的标准化
直接将需要推送的对象转换成JSON字符串 在序列化传输。也可以在外层做一下必要的封装。
3.4 对于并发性与稳定性
从理论上讲,netty基于nio,有较好的并发性。它用一个线程去管理多个通道,能避免用大量的线程去维护长连接,而损耗性能。至于稳定性,就是可用性,需要靠集群去保证,可以考虑用到zookeeper,部署netty集群,注册在zookeeper上,具体细节待研究。