一个基于netty的websocket聊天demo

这里,仅仅是一个demo,模拟客户基于浏览器咨询卖家问题的场景,但是,这里的demo中,卖家不是人,是基于netty的程序(我就叫你uglyRobot吧),自动回复了客户问的问题。

项目特点如下:

1. 前端模拟在第三方应用中嵌入客户咨询页面,这里采用的是基于tornado的web应用,打开页面即进入咨询窗口

2. 客户咨询的内容,将会原封不动地被uglyRobot作为答案返回。(真实情况下,客户是不是会疯掉,哈哈)

3. 客户长时间不说话,uglyRobot会自动给予一段告知信息,并将当前的channel释放掉

4. 客户再次回来问问题时,将不再显示欢迎词,重新分配channel进行"交流"

话不多说,直接上关键部分的代码。

首先,看前端的代码。Python的web后台部分:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# __author__ "shihuc"
"""Tornado front end for the chat demo.

Serves the ``chat.html`` template at ``/`` together with the static assets
configured in ``settings``.
"""
import tornado.ioloop
import tornado.httpserver
import tornado.web
import tornado.options
import os
import json
import multiprocessing

from tornado.options import define, options

# NOTE(review): the default port literal was lost from the published listing;
# 8888 is a placeholder -- confirm, or pass --port=N on the command line.
define("port", default=8888, help="Please run on the given port", type=int)

# NOTE(review): created at import time and not used anywhere in this listing;
# kept so that any code relying on the module-level name keeps working.
procPool = multiprocessing.Pool()


class ChatHandler(tornado.web.RequestHandler):
    """Request handler that renders the consulting window."""

    def get(self):
        """Respond to GET with the rendered ``chat.html`` template."""
        self.render("chat.html")


settings = {
    'template_path': 'page',  # HTML templates
    'static_path': 'resource',  # static files (css, js, img)
    'static_url_prefix': '/resource/',  # URL prefix for static files
    'cookie_secret': 'shihuc',  # custom salt string for cookies
    'xsrf_cookies': True  # guard against cross-site request forgery
}


def make_app():
    """Return the Tornado application that maps ``/`` to ChatHandler."""
    return tornado.web.Application([
        (r"/", ChatHandler)
    ], default_host='', transforms=None, **settings)


if __name__ == "__main__":
    tornado.options.parse_command_line()
    app = make_app()
    http_server = tornado.httpserver.HTTPServer(app)
    http_server.listen(options.port)
    tornado.ioloop.IOLoop.current().start()

聊天的前端html页面的内容:

<!DOCTYPE html>
<!-- Chat window template: a header bar, a message area and an input area, driven by js/consult.js. -->
<html>
<head lang="en">
<!-- Asset URLs below are produced by the template's static_url() helper. -->
<link rel="shortcut icon" href="{{static_url('image/favicon.ico')}}" type="image/x-icon" />
<!--<link rel="shortcut icon" href="./favicon.ico" type="image/x-icon" />-->
<meta charset="UTF-8">
<meta name="viewport"
content="width=device-width, user-scalable=no, initial-scale=1.0, maximum-scale=1.0, minimum-scale=1.0"/>
<title>疯子聊天DEMO</title>
<link rel="stylesheet" href="{{static_url('css/base.css')}}"/>
<link rel="stylesheet" href="{{static_url('css/consult.css')}}"/>
</head>
<body>
<!-- Outer widget container; consult.js adds the "hide" class to it when .close is clicked. -->
<div class="consultBox">
<div class="consult">
<!-- Header bar: logo, company name, bell toggle (.bell) and close button (.close). -->
<div class="consult-Hd">
<p class="checkmove"></p>
<img src="{{static_url('image/backProperty.png')}}" alt=""/>
<span>疯子机器人有限公司</span>
<a href="javascript:;" class="bell"></a>
<a href="javascript:;" title="关闭" class="close"></a>
</div>
<!-- Message area: consult.js appends message bubbles to .consult-cont and scrolls .consult-Bd to the bottom. -->
<div class="consult-Bd">
<div class="consult-cont">
<!-- Filled by consult.js with the local time when the page loads. -->
<div class="consult-cont-date"></div>
</div>
</div>
<!-- Input area: the message textarea and the send button. -->
<div class="consult-Fd">
<div class="consult-Fd-hd">
<a href="javascript:;" class="brow"></a>
<a href="javascript:;" class="picture"></a>
</div>
<div>
<textarea class="consult-Fd-textarea" id="Ctextarea" autofocus spellcheck="false"></textarea>
</div>
<div class="buttonBox">
<span class="evaluate">请对服务做出评价</span>
<!-- Starts disabled; consult.js removes "disable" once the textarea is non-empty. -->
<span class="button disable" id="Cbtn">发送</span>
</div>
</div>
</div>
</div>
<!-- consult.js uses jQuery ($), so it is loaded after the jQuery script. -->
<script src="{{static_url('js/jquery-1.11.1.min.js')}}"></script>
<script src="{{static_url('js/bootstrap.min.js')}}"></script>
<script src="{{static_url('js/bootbox.js')}}"></script>
<script src="{{static_url('js/consult.js')}}"></script>
</body>
</html>

重点前端逻辑consult.js的内容:

 /**
* Created by shihuc on 2017/2/21.
*/ var ws = null;
var wsurl = "ws://10.90.9.20:9080/websocket"
var wshandler = {}; var consult={};
// Set up the chat window: show the current time, then wire the UI toggles
// and the send handlers, in that order.
consult.init = function () {
    var steps = ['setDateInfo', 'touch', 'send'];
    for (var i = 0; i < steps.length; i++) {
        consult[steps[i]]();
    }
};
// Wire the passive UI behaviour: enable/disable the send button as the user
// types, hide the widget on close, and toggle the bell icon.
consult.touch = function () {
    // The key code literal was lost from the published listing; 13 (Enter) is
    // restored because Enter is the key the keydown handler uses to send.
    var ENTER_KEY = 13;

    $('#Ctextarea').on('keyup', function (e) {
        // Enter is skipped so that sending (which clears the textarea and
        // disables the button) is not undone by this handler.
        if (e.keyCode !== ENTER_KEY) {
            var isEmpty = $('#Ctextarea').val() === "";
            $('.button').toggleClass('disable', isEmpty);
        }
    });

    $('.close').click(function () {
        $('.consultBox').addClass('hide');
    });

    $('.bell').click(function () {
        $(this).toggleClass('bell2');
    });
};
// Wire the two ways of sending a message: clicking the send button and
// pressing Enter inside the textarea.
consult.send = function () {
    // The key code literal was lost from the published listing; 13 (Enter) is
    // restored as the send key.
    var ENTER_KEY = 13;

    // Send the textarea content, reconnecting first when there is no socket.
    function submit() {
        var cont = $('#Ctextarea').val();
        if (ws == null) {
            wshandler.reconnect(wshandler.interval, cont);
        } else {
            consult.fullSend(cont);
        }
    }

    $('.button').click(function () {
        if ($(this).hasClass('disable')) {
            return false;
        }
        submit();
    });

    $('#Ctextarea').keydown(function (e) {
        if (e.keyCode !== ENTER_KEY) {
            return;
        }
        if (!$('.button').hasClass('disable')) {
            submit();
        }
        // Returning false cancels the default action, so the Enter keystroke
        // does not leave a newline behind in the just-cleared textarea.
        return false;
    });
};
// Send `cont` over the current socket, show it as a client bubble, then reset
// the input (empty textarea, disabled button) and scroll to the newest message.
consult.fullSend = function (cont) {
    ws.send(cont);

    var bubble = consult.clientText(cont);
    $('.consult-cont').append(bubble);

    var $input = $('#Ctextarea');
    $input.val("");
    var $sendButton = $('.button');
    $sendButton.addClass('disable');

    consult.position();
};
// Build the HTML for a right-aligned (client) message bubble.
// Returns a markup string; `cont` is shown as text after the "Client: " prefix.
consult.clientText = function (cont) {
    // The message is typed by the user and inserted as markup by the caller,
    // so HTML-significant characters are escaped to keep it literal text.
    var safe = String(cont)
        .replace(/&/g, '&amp;')
        .replace(/</g, '&lt;')
        .replace(/>/g, '&gt;')
        .replace(/"/g, '&quot;')
        .replace(/'/g, '&#39;');

    return [
        '<div class="consult-cont-right">',
        '<div class="consult-cont-msg-wrapper">',
        '<i class="consult-cont-corner"></i>',
        '<div class="consult-cont-msg-container">',
        '<p>Client: ' + safe + '</p>',
        '</div>',
        '</div>',
        '</div>'
    ].join('');
};
// Build the HTML for a left-aligned (server) message bubble.
// Returns a markup string; `cont` is shown as text.
consult.serverText = function (cont) {
    // The text arrives over the socket and is inserted as markup by the
    // caller, so HTML-significant characters are escaped to keep it literal.
    var safe = String(cont)
        .replace(/&/g, '&amp;')
        .replace(/</g, '&lt;')
        .replace(/>/g, '&gt;')
        .replace(/"/g, '&quot;')
        .replace(/'/g, '&#39;');

    return [
        '<div class="consult-cont-left">',
        '<div class="consult-cont-msg-wrapper">',
        '<i class="consult-cont-corner"></i>',
        '<div class="consult-cont-msg-container">',
        '<p>' + safe + '</p>',
        '</div>',
        '</div>',
        '</div>'
    ].join('');
};
// Show a message received from the server and scroll it into view.
consult.service = function (cont) {
    var bubble = consult.serverText(cont);
    $('.consult-cont').append(bubble);
    consult.position();
};
// Scroll the message area to the bottom so the newest message is visible.
consult.position = function () {
    var $body = $('.consult-Bd');
    // Nothing to scroll when the message area is not in the page.
    if ($body.length === 0) {
        return;
    }
    // The element index was lost from the published listing; [0] selects the
    // first (underlying DOM) element, which exposes scrollHeight.
    $body.scrollTop($body[0].scrollHeight);
};
// Log the current local time and show it in the date line of the chat area.
consult.setDateInfo = function () {
    var timeText = new Date().toLocaleTimeString();
    console.log(timeText);
    $('.consult-cont-date').text(timeText);
};

/*
 * WebSocket handling logic follows. by shihuc, 2017/3/9
 */
// Polling period used while waiting for a new socket to open.
// NOTE(review): the literal was lost from the published listing; 1000 is a
// placeholder -- confirm the intended value.
wshandler.interval = 1000; //unit is ms
// Id of the running poll timer, or null when no poll is active.
wshandler.cycId = null;
// The welcome text is requested only while this is still true.
wshandler.isFirst = true;

// Open a WebSocket to `wsurl` unless one already exists, and install the
// open/message/close/error handlers on it.
wshandler.connect = function () {
    if (ws != null) {
        console.log("现已连接");
        return;
    }

    var socket;
    if ('WebSocket' in window) {
        socket = new WebSocket(wsurl);
    } else if ('MozWebSocket' in window) {
        socket = new MozWebSocket(wsurl);
    } else {
        console.log("您的浏览器不支持WebSocket。");
        return;
    }
    ws = socket;

    // Drop the connection after a close or error. The identity check keeps a
    // late event from an old socket from tearing down a newer connection.
    function release() {
        if (ws === socket) {
            wshandler.disconnect();
        }
        wshandler.isFirst = false;
    }

    socket.onopen = function () {
        // Receive binary messages as ArrayBuffer.
        socket.binaryType = "arraybuffer";
        // Ask the server for the welcome text on the first connection only.
        if (wshandler.isFirst) {
            socket.send("OPEN");
        }
    };
    socket.onmessage = function (e) {
        consult.service(e.data.toString());
    };
    socket.onclose = function (e) {
        console.log("onclose: closed");
        release();
    };
    socket.onerror = function (e) {
        console.log("onerror: error");
        release();
    };
};

// Send `cont` as soon as the socket is open, polling every `interval` ms
// until then. `cont` is the raw message text.
function checkOpenState(interval, cont) {
    // Only one poll may run at a time; a stale timer would send twice.
    function stopPolling() {
        if (wshandler.cycId !== null) {
            clearInterval(wshandler.cycId);
            wshandler.cycId = null;
        }
    }

    // Returns true once polling should end (message sent or no socket left).
    function attempt() {
        if (ws == null) {
            console.log("No connection available; message was not sent");
            return true;
        }
        if (ws.readyState === ws.OPEN) {
            consult.fullSend(cont);
            return true;
        }
        console.log("Wait for ws to be open again");
        return false;
    }

    stopPolling();
    if (attempt()) {
        return;
    }
    // A function callback replaces the original code string, so the message
    // text is never evaluated as script.
    wshandler.cycId = setInterval(function () {
        if (attempt()) {
            stopPolling();
        }
    }, interval);
}

// Re-establish the connection, then send `cont` once the socket is open.
wshandler.reconnect = function (interval, cont) {
    wshandler.connect();
    checkOpenState(interval, cont);
};

// Close the current connection, if any, and forget it.
wshandler.disconnect = function () {
    if (ws != null) {
        ws.close();
        ws = null;
    }
};

// Page start-up: connect first, then initialise the chat UI.
$(document).ready(function () {
    wshandler.connect();
    consult.init();
});

接下来,看看基于netty的关键代码部分:

/**
 * @author "shihuc"
 * @date 2017年2月20日
 */
package com.tk.ics.gateway.server;

import org.apache.log4j.Logger;

import com.tk.ics.gateway.protocol.ws.WebSocketServerInitializer;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;

/**
 * Entry point that binds the netty server and installs
 * {@link WebSocketServerInitializer} on every accepted channel.
 *
 * <p>System properties: {@code ssl} (when set, a self-signed certificate is
 * generated and TLS is enabled) and {@code port} (listening port).
 *
 * @author chengsh05
 */
public final class WebSocketServer {

    private static final Logger logger = Logger.getLogger(WebSocketServer.class);

    static final boolean SSL = System.getProperty("ssl") != null;

    // NOTE(review): both default port literals were lost from the published
    // listing (empty strings make Integer.parseInt throw). 9080 matches the
    // port in the page's ws:// URL; 8443 is a placeholder for the TLS case --
    // confirm both.
    static final int PORT = Integer.parseInt(System.getProperty("port", SSL ? "8443" : "9080"));

    /**
     * Starts the server and blocks until the server channel is closed.
     *
     * @param args unused
     * @throws Exception if the certificate cannot be created or the bind fails
     */
    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        } else {
            sslCtx = null;
        }

        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new WebSocketServerInitializer(sslCtx));

            Channel ch = b.bind(PORT).sync().channel();
            logger.info("打开您的浏览器,并在地址栏输入 " + (SSL ? "https" : "http") + "://127.0.0.1:" + PORT + '/');

            // Block here until the server channel is closed.
            ch.closeFuture().sync();
        } finally {
            // Always release both event loop groups, even when bind fails.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

netty的childHandler相关的配置代码(WebSocketServerInitializer):

 /**
* @author "shihuc"
* @date 2017年3月13日
*/
package com.tk.ics.gateway.protocol.ws; import com.tk.ics.gateway.handler.ws.WebSocketFrameHandler; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.timeout.IdleStateHandler; /**
* @author chengsh05
*
*/
public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> { private static final String WEBSOCKET_PATH = "/websocket"; private final SslContext sslCtx; public WebSocketServerInitializer(SslContext sslCtx) {
this.sslCtx = sslCtx;
} @Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (sslCtx != null) {
pipeline.addLast(sslCtx.newHandler(ch.alloc()));
}
//添加超时处理
pipeline.addLast(new IdleStateHandler(, , ));
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new HttpObjectAggregator());
pipeline.addLast(new WebSocketServerCompressionHandler());
pipeline.addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH, null, true));
pipeline.addLast(new WebSocketFrameHandler());
}
}

再接下来,重点看看WebSocketFrameHandler的源码:

 /**
* @author "shihuc"
* @date 2017年3月13日
*/
package com.tk.ics.gateway.handler.ws; import java.util.Locale; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import com.tk.ics.gateway.channel.ServerChannelMgmt; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent; /**
* @author chengsh05
*
*/
public class WebSocketFrameHandler extends SimpleChannelInboundHandler<WebSocketFrame> { private static final Logger logger = LoggerFactory.getLogger(WebSocketFrameHandler.class); @Override
protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
// ping and pong frames already handled if (frame instanceof TextWebSocketFrame) {
// Send the uppercase string back.
String request = ((TextWebSocketFrame) frame).text();
logger.info("{} received: {}", ctx.channel(), request);
if(request.equalsIgnoreCase("close")){
ctx.channel().writeAndFlush(new TextWebSocketFrame("you have closed this session".toUpperCase(Locale.US)));
ctx.close();
}else{
/*
* 在这个地方添加客服所需的响应逻辑。当前的demo中,将接收到的消息,又原封不动的发送给了客户端
*/
//ctx.channel().writeAndFlush(new TextWebSocketFrame(request.toUpperCase(Locale.US)));
if(request.toString().equalsIgnoreCase("OPEN")){
ctx.channel().writeAndFlush(new TextWebSocketFrame("iTker: 欢迎光临疯子机器人有限公司。有什么需要咨询的尽管说!小疯第一时间来给您解答~"));
} else {
ctx.channel().writeAndFlush(new TextWebSocketFrame("iTker: \r\n" + request.toString()));
}
}
} else {
String message = "unsupported frame type: " + frame.getClass().getName();
throw new UnsupportedOperationException(message);
}
} @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
String channelId = ctx.channel().id().asLongText();
logger.info("websocket channel active: " + channelId);
if(ServerChannelMgmt.getUserChannelMap().get(channelId) == null){
ServerChannelMgmt.getUserChannelMap().put(channelId, ctx.channel());
}
} @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
String channelId = ctx.channel().id().asLongText();
logger.info("websocket channel inactive: " + channelId);
if(ServerChannelMgmt.getUserChannelMap().get(channelId) != null){
ServerChannelMgmt.getUserChannelMap().remove(channelId);
} ctx.fireChannelInactive();
} @Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
ChannelFuture f = ctx.channel().writeAndFlush(new TextWebSocketFrame("iTker: 您长时间没有咨询了,再见! 若有需求,欢迎您随时与我们联系!"));
f.addListener(ChannelFutureListener.CLOSE);
}
else if (event.state() == IdleState.WRITER_IDLE)
System.out.println("write idle");
else if (event.state() == IdleState.ALL_IDLE)
System.out.println("all idle");
}
}
}

这个简单的demo,核心部分的代码,就都在这里了。最后,上一个运行过程中的前端效果截图分享给读者。

一个基于netty的websocket聊天demo

这个demo做的事情非常简单,但是其价值不菲。对逻辑实现过程有疑问的,或者有好的建议的,可以留言探讨。这个是本人负责的项目的一个minimini版本的一个小角落一览。

后续,本人将针对这个代码,分析介绍netty的源码重点脉络(基于4.1.7.final版本),敬请关注本人的博客!

上一篇:Codeforces Round #288 (Div. 2)


下一篇:[iOS UI进阶 - 2.1] 彩票Demo v1.1