用Netty作为服务端的WebSocket通信

用Netty作为服务端的WebSocket通信


1 WebSocket

1.1为什么要选择WebSocket?

http 是无状态协议,为标识用户,通常客户端要带上session。但是它无法解决服务端主动想客户端发送消息的需求。面对并发量高的场景,由于http信息携带的数据大,而对服务器造成较大的负担,所以面对这种场景,WebSocket就上线了。

1.2什么是WebSocket?

WebSocket是一种基于tcp的新的网络协议,它实现了服务器与浏览器之间的全双工,即浏览器和服务器可以互不影响的向对方发送信息和接受信息。如下就是WebSocket的通信过程
用Netty作为服务端的WebSocket通信

可以看到,它还是用到了http协议来建立连接,之后就是Websocket的部分了。

1.3WebSocket的弊端?

WebSocket本身是依赖tomcat,然而tomcat的并发量不大,连接数低,会导致出现断连的情况,因此对于WebSocket通信要求不高的,可以直接依赖tomcat。但是遇到高并发就难以支撑,这时候,netty该上场了。

2 Netty

2.1 什么是Netty?

Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。Netty是基于Java NIO实现的异步通信框架,其主要特点是简单,要比原生的JavaNIO开发方便很多,同时Netty封装了大量好用的组件,方便开发。下面基于Netty实现websocket通信。

Netty的WebSocket通讯实现

public class WebSocketServer {
	
	public void run() {
		// 服务端启动辅助类,用于设置TCP相关参数
		ServerBootstrap bootstrap = new ServerBootstrap();
		// 获取Reactor线程池
		EventLoopGroup bossGroup = new NioEventLoopGroup();
		EventLoopGroup workGroup = new NioEventLoopGroup();
		// 设置为主从线程模型
		bootstrap.group(bossGroup, workGroup)
		// 设置服务端NIO通信类型
		.channel(NioServerSocketChannel.class)
		// 设置ChannelPipeline,也就是业务职责链,由处理的Handler串联而成,由从线程池处理
		.childHandler(new ChannelInitializer<Channel>() {
			// 添加处理的Handler,通常包括消息编解码、业务处理,也可以是日志、权限、过滤等
			@Override
			protected void initChannel(Channel ch) throws Exception {
				// 获取职责链
				ChannelPipeline pipeline = ch.pipeline();
				// 
				pipeline.addLast("http-codec", new HttpServerCodec());
				pipeline.addLast("aggregator", new HttpObjectAggregator(65535));
				pipeline.addLast("http-chunked", new ChunkedWriteHandler());
				pipeline.addLast("handler", new WebSocketHandler());
			}
		})
		// bootstrap 还可以设置TCP参数,根据需要可以分别设置主线程池和从线程池参数,来优化服务端性能。
		// 其中主线程池使用option方法来设置,从线程池使用childOption方法设置。
		// backlog表示主线程池中在套接口排队的最大数量,队列由未连接队列(三次握手未完成的)和已连接队列
		.option(ChannelOption.SO_BACKLOG, 5)
		// 表示连接保活,相当于心跳机制,默认为7200s
		.childOption(ChannelOption.SO_KEEPALIVE, true);
		
		try {
			// 绑定端口,启动select线程,轮询监听channel事件,监听到事件之后就会交给从线程池处理
			Channel channel = bootstrap.bind(8081).sync().channel();
			// 等待服务端口关闭
			channel.closeFuture().sync();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} finally {
			// 优雅退出,释放线程池资源
			bossGroup.shutdownGracefully();
			workGroup.shutdownGracefully();
		}
	}
 
	public static void main(String[] args) {
		new WebSocketServer().run();
	}
 
}
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>WebSocket Chat</title>
</head>
<body>
    <script type="text/javascript">
        var socket;
        if (!window.WebSocket) {
            window.WebSocket = window.MozWebSocket;
        }
        if (window.WebSocket) {
            socket = new WebSocket("ws://localhost:8081/ws");
            socket.onopen = function(event) {
                var ta = document.getElementById('responseText');
                ta.value = "连接开启!";
            };
            socket.onclose = function(event) {
                var ta = document.getElementById('responseText');
                ta.value = ta.value + "连接被关闭";
            };
            socket.onmessage = function(event) {
                var ta = document.getElementById('responseText');
                ta.value = ta.value + '\n' + event.data;
            };
        } else {
            alert("你的浏览器不支持 WebSocket!");
        }
 
        function send(message) {
            if (!window.WebSocket) {
                return;
            }
            if (socket.readyState == WebSocket.OPEN) {
                socket.send(message);
            } else {
                alert("连接没有开启.");
            }
        }
    </script>
    <form οnsubmit="return false;">
        <h3>WebSocket 聊天室:</h3>
        <textarea id="responseText" style="width: 500px; height: 300px;"></textarea>
        <br> 
        <input type="text" name="message"  style="width: 300px" value="Welcome to www.waylau.com">
        <input type="button" value="发送消息" οnclick="send(this.form.message.value)">
        <input type="button" οnclick="javascript:document.getElementById('responseText').value=''" value="清空聊天记录">
    </form>
    <br> 
    <br> 
</body>
</html>

3 待优化的细节

3.1 心跳和重连接

通过约定的心跳消息,客户端连接完成后,向服务器发送心跳消息,若按照一定规则,如连续两个心跳周期没有收到服务器的返回消息,那么就主动断开连接,执行重新连接

3.2 实现主题订阅的功能

逻辑上并不复杂,如对多个订阅了相同主题的用户推送消息,只需要维护一个map<String, List>.
其中的String代表主题,List代表订阅了该主题的通道。 订阅逻辑通过约定客户端推送的特殊字符串解析。

3.3 对消息的标准化

直接将需要推送的对象转换成JSON字符串 在序列化传输。也可以在外层做一下必要的封装。

3.4 对于并发性与稳定性

从理论上讲,netty基于nio,有较好的并发性。它用一个线程去管理多个通道,能避免用大量的线程去维护长连接,而损耗性能。至于稳定性,就是可用性,需要靠集群去保证,可以考虑用到zookeeper,部署netty集群,注册在zookeeper上,具体细节待研究。

上一篇:【Python实战】爬取网易云音乐用户听歌排行,来制作一张Ta最爱歌手的词云图


下一篇:ubuntu、windows安装python ta-lib库