两周前用长轮询做了一个Chat,并移植到了Azure,还写了篇博客http://www.cnblogs.com/indream/p/3187540.html,让大家帮忙测试。
首先感谢300位注册用户,让我有充足的数据进行重构和优化。所以这两周都在进行大重构。
其中最大的一个问题就是数据流量过大,原先已有更新,还会有Web传统“刷新”的形式把数据重新拿一次,然后再替换掉本地数据。
但这一拿问题就来了,在10个Chat*300个用户的情况下,这一拿产生了一次8M多的流量,这是十分严重的事情,特别是其中绝大部分数据都是浪费掉了的。
那么解决方案就很简单了,把“全量”改成“增量”,只传输修改的部分,同时大量增加往返次数,把每次往返量压缩。
当然,这篇文章主要讲长轮询,也是之后被问得比较多的方面,所以就单独写篇文章出来了。
这次比单纯的轮询多了一个缓存行为,以解决每次“心跳”中所产生的断线间隔数据丢失的问题。
首先列举一下所使用到的技术点:
- jQuery.Ajax
- .NET同步(lock)与异步(async await Task)
- MVC异步页面
长轮询的简介
长轮询是一种类似于JSONP一样畸形的Web通信技术,用以实现Web与服务端之间的实时双向通信。
在有人实现JSONP之前,单纯的JS或者说Web是无法实现原生地有效地实现跨域通信的;而在有了JSONP之后,这项工作就变得简单了,虽然实现方法很“畸形(或者说有创意吧)”。
同样,在有长轮询之前,还没出现HTML5 Web Socket的时代,单纯的Web无法与服务器进行实时通信,HTTP限制了通信行为只能是有客户端发起请求,然后服务端针对该请求进行回应。
长轮询所做的就是把原有的协议“漏洞”利用起来,使得客户端和服务端之间在HTML 4.1(部分更低版本应该也可以兼容)下可以实时通信。
长轮询的原理
HTTP协议本身有两个“漏洞”,也是现在网络通信中无法避免的。
一个是请求(Request)和答复(Response)之间无法确认其连接状况,可就无法确定其所用的时限了。
判断客户端与服务端是否相连的一个标准就是客户端的请求是否能收到服务端的答复,如果收得到,就说明连接上了,即时收到的是服务端错误的通知(比如404 not found)。
第二漏洞就是在获取到答复(Response)前,都无法知道所需要的数据内容是怎么样的(如果有还跟人家要啥)。
长轮询就是利用了这两个“漏洞”:服务端收到请求(Request)后,将该请求Hold住不马上答复,而是一直等,等到服务端有信息需要发送给客户端的时候,通过将刚才Hold住的那条请求(Request)的答复(Response)发回给客户端,让客户端作出反应。而返回的内容,呵呵呵呵呵,那就随便服务端了。
然后,客户端收到答复(Response)后,马上再重新发送一次请求(Request)给服务端,让服务端再Hold住这条连接。周而复始,就实现了从服务端向客户端发送消息的实时通信,客户端向服务端发送消息则依旧利用传统的Post和Get进行。
受Web通信现实情况限制,如果服务端长时间没有消息需要推送到客户端的时候,也不能一直Hold住那条链接,因为很有可能被判定为网关超时等超时情况。所以即使没有消息,每间隔一段时间,服务端也要返回一个答复(Response),让客户端重新请求一个链接。
见过一些人喜欢把每次轮询的断开到下次轮询开始客户端的接收->再请求的行为称之为一次“心跳(Beat)”,也挺贴切的。
要实现真正的实时通信,长轮询的实现并不那么简单,因为每次“心跳”时会产生一个小间隙,这个间隙的时候服务端已经将上一个答复(Response)返回,但还没有接收到客户端的下一次请求(Request)。那么这时候,服务端如果有最新消息,就无法推送给客户端了,所以需要将这些消息缓存起来,等到下一次机会到来的时候再XXOO。
jQuery.AJAX
如果是AJAX的话,一般都是用jQuery进行实现。况且,毕竟还用了JSONP,手动写起来在工作中实在不划算。
到了Web端的代码,就变得很容易了,以下内容直接从项目中节选,只是作了一些山间
getJsonp: function (url, data, callback, errorCallback) {
$.ajax({
url: url,
data: data,
type: "POST",
dataType: "jsonp",
jsonpCallback: "callback" + Math.random().toString().replace('.', ''),
success: callback,
error: errorCallback
});
},
//轮询的锁,保证每个轮询有且仅有一个
pollingLocks: {
},
//轮询的重试时间
pollingRetries: {
},
//轮询错误的callBack缓存
pollingCallbacks: [],
//轮询
//listeningCode: 监听编码,与服务器的一个契约,单个监听编码在服务器中有对应的一个缓冲池,以保留该监听相关信息
//url: 目标地址
//data: 请求时的参数
//lockName: 锁名,同样的锁名在同一时间只会出现一个轮询
//callbakc: 接收到服务端数据后的回调
polling: function (listeningCode, url, data, lockName, callback) {
var comet = chatConnectionProvider.connections.comet; //判断是否有锁,排他,不允许重复监听,保持单一链接
if (!comet.pollingLocks[lockName]) {
//锁住监听
comet.pollingLocks[lockName] = true;
comet.getJsonp(url, data, function (cometCallbackData) {
var listeningCode = cometCallbackData.ListeningCode;
//将消息发回
for (var i in cometCallbackData.Callbacks) {
callback(cometCallbackData.Callbacks[i]);
}
//将监听编码添加到请求数据中,以和服务器的监听编码保持一致
data = data || {};
data.listeningCode = cometCallbackData.ListeningCode;
//解锁后继续监听
comet.pollingLocks[lockName] = false;
comet.polling(listeningCode, url, data, lockName, callback);
}, function (jqXHR, textStatus, errorThrown) {
//如果发生错误,则重试,并且逐步加大重试时间,以减低服务器压力,以100毫秒开始,每次加倍
comet.pollingRetries[lockName] = comet.pollingRetries[lockName] * 2 || 100;
//将回调函数暂存
chatConnectionProvider.connections.comet.pollingCallbacks[lockName] = callback;
var rePollingMethors = 'chatConnectionProvider.connections.comet.pollingLocks["' + lockName + '"] = false;'//先解锁,在解锁之前排他,不允许重复轮询
+ 'chatConnectionProvider.connections.comet.polling("' + listeningCode + '", "' + url + '", "' + data + '", "' + lockName + '", chatConnectionProvider.connections.comet.pollingCallbacks["' + lockName + '"]);';
setTimeout(rePollingMethors, comet.pollingRetries[lockName]);
});
}
},
JS部分代码
.NET MVC中的异步
一开始我花了比较长时间寻找服务端Hold住请求的方法。
普通情况下,一个Web的请求是同步执行的,如果需要转成异步的话,需要对线程进行操作。比如一开始我最白痴的想法是用自旋锁,或者用Thread相关的方法,然后在需要的时候采用一些Interup方法进行中断等等,都不容易写。
后来发现MVC中提供了比较合理的一种原生的异步页面方式,可以简单地实现同步转异步。
首先是Controller要由默认的Controller改为继承自AsyncController。该基类有一个私有成员AsyncManager,利用该对象可以简单地将同步转换成异步。
而原本有的方法,要拆分成两个方法来写,分别在两个方法用原名加上Async和Completed。
比如我的ListenController,里面有一个User方法,用以监听用户的数据。经过实现之后,就变成了ListenController : AsyncController,同时拥有一对User方法:UserAsync和UserCompleted。
那么,在页面请求Listen/User的时候,就会自动调用名称匹配的UserAsync方法。
在这之后,我们就需要利用AsyncManager执行以下语句,将线程“挂起”(Hold住,这样懂了吧):
asyncManager.OutstandingOperations.Increment();
直到我们有消息需要发送给用户的时候,通过以下方式对UserCompleted进行传参:
asyncManager.Parameters["listeningCode"] = Code;
然后再触发UserCompleted:
asyncManager.OutstandingOperations.Decrement();
再整体地看一次,ListenController就是长这个样子的:
public class ListenController : AsyncController
{
//
// GET: /Listen/ ICometManager cometManager; public ListenController()
{
cometManager = StructureMap.ObjectFactory.GetInstance<ICometManager>();
} /// <summary>
/// 监听用户的信息
/// </summary>
/// <param name="listeningCode">监听编码,如果为空则视为一次全新的监听,允许同以客户端开启多个网页进行多个监听</param>
public void UserAsync(int? listeningCode)
{
//开始监听用户
cometManager.ListenUser(listeningCode, AsyncManager);
} /// <summary>
/// 返回用户的信息
/// </summary>
/// <param name="listeningCode">监听编码</param>
/// <returns></returns>
public JsonpResult UserCompleted(int listeningCode)
{
//获取用户所有的消息
var callbacks = cometManager.TakeAllUserCallbacks(listeningCode); //将该消息返回
return Json(new
{
ListeningCode = listeningCode,
Callbacks = callbacks.Select(item => new CallbackModel(item))
})
.ToJsonp();
}
}
ListenController
CometManager就是我用来处理轮询的对象。
注意到在UserCompleted是通过了一个ICometManager.TakeAllUserCallbacks来获取用户的所有回调数据,而不是直接通过AsyncManager.Parameters发送。原因是实现过程中我发现无法通过AsyncManager.Parameters将自定义对象传参,所以采取了这种方式。或许,实现序列化后或者引用相关序列化方法,能实现如此传参。
在CometManager : ICometManager中,相关实现如此:
/// <summary>
/// 监听用户的方法
/// </summary>
/// <param name="listeningCode">指定监听编码,如果为空则为全新的监听</param>
/// <param name="asyncManager">监听来源页面的AsyncManager,用以处理异步与回调</param>
public void ListenUser(int? listeningCode, System.Web.Mvc.Async.AsyncManager asyncManager)
{
//监听新消息
userListenerQuery.Add(chatUserProvider.Current.Id, listeningCode, userListenManager, asyncManager);
} /// <summary>
/// 取走用户所有回调结果
/// </summary>
/// <param name="listeningCode">监听者的Id</param>
/// <returns></returns>
public IEnumerable<CallbackModel> TakeAllUserCallbacks(int listeningCode)
{
return userListenerQuery.TakeAllCallback(listeningCode);
}
CometManager节选
userListenerQuery是一个单例(Singleton)的监听队列;而UserListenManager是往上一层的监听管理对象,毕竟Chat本身不单止支持轮询,还需要支持其他通信方式,所以往上有一个公共层管理着所有消息。
.NET中的异步
除了MVC本身提供的特有方法外,还需要一些传统的行为才能实现完整的长轮询。
接着上面,参照ListenQuery的实现:
Dictionary<int, CometListener> listenersDic;
Dictionary<int, DateTime> lastAddTimeDic; public ListenerQuery()
{
listenersDic = new Dictionary<int, CometListener>();
lastAddTimeDic = new Dictionary<int, DateTime>();
} /// <summary>
/// 添加一个监听
/// </summary>
/// <param name="listenToId">监听对象的Id</param>
/// <param name="listeningCode">原有监听者的编码</param>
/// <param name="listenManager">监听的相关业务管理对象</param>
/// <param name="asyncManager">页面的异步管理对象</param>
/// <returns>监听编码</returns>
public int Add(int listenToId, int? listeningCode, IListenManager<int> listenManager, AsyncManager asyncManager)
{
lock (listenersDic)
{
lock (lastAddTimeDic)
{
CometListener listener;
//如果监听者不存在,则生成,否则用原有的监听者
if (listeningCode == null || !listenersDic.ContainsKey(listeningCode.Value))
{
////生成其随机编码
//var seed = 10000;
//var random = new Random(seed);
//listeningCode = random.Next(seed);
//while (listenersDic.ContainsKey(listeningCode.Value))
//{
// listeningCode = random.Next(seed);
//}
//改为采用原有编码 //生成监听者并开始监听
Action<int> setListenerCode;
listener = new CometListener(out setListenerCode);
listenManager.ListenAsnyc(listenToId, listener, setListenerCode); listeningCode = listener.Code;
//添加入本列表字典
listenersDic.Add(listeningCode.Value, listener);
//添加监听时间
lastAddTimeDic.Add(listeningCode.Value, DateTime.Now);
}
else
{
listener = listenersDic[listeningCode.Value];
lastAddTimeDic[listeningCode.Value] = DateTime.Now;
} //开始监听
listener.Begin(asyncManager); //定时一次检查,如果监听超时,则清除监听
//设计倒计时,定期重新监听,以免超时
var timeLimitInMilliSecond = ;
System.Timers.Timer timer = new System.Timers.Timer(timeLimitInMilliSecond); //设置计时终结方法
timer.Elapsed += (sender, e) =>
{
if (lastAddTimeDic[listeningCode.Value].AddSeconds() < DateTime.Now)
{
listenManager.StopListenAsnyc(listener);
}
}; //启动倒计时
timer.Start();
}
} return listeningCode.Value;
} /// <summary>
/// 取走所有回调结果
/// </summary>
/// <param name="listeningCode">监听者的Id</param>
/// <returns></returns>
public IEnumerable<CallbackModel> TakeAllCallback(int listeningCode)
{
return listenersDic[listeningCode].ShiftAllCallbacks();
}
}
ListenerQuery
这里用了一个字典来记录每个ListeningCode以及相关的Listener。
注意Add方法内有一个Timer。就像注释上所说的,定期检查用户是否在监听。我在这里设置了每30秒有一次“心跳”(Beat),而每次监听后的第60秒会来检查45秒内(暂时这么设置的,有待时间考验是不是个合适值)用户是否再来监听,如果没有则停止监听。
这么做的原因是防止客户端单方面离婚毁约,然后服务端的Comet傻傻地在这里痴情地帮客户端继续保留缓存消息。这种情况时有出现,比如客户端还没等到答复(Response)就私奔关掉了页面,留下服务单在那边Hold住连接傻傻地等待。
注意凡是处理队列类的地方都有锁,以防止并发问题。
那么最后,CometListener的实现就如下:
public class CometListener : Listen.IListener
{
AsyncManager asyncManager; List<CallbackModel> callbacks; /// <summary>
/// 构造函数
/// </summary>
public CometListener(out Action<int> setListenerCode)
{
setListenerCode = setCode; callbacks = new List<CallbackModel>();
} internal void setCode(int code)
{
this.Code = code;
} /// <summary>
/// 开始监听的方法
/// </summary>
/// <param name="asyncManager">页面的异步处理对象</param>
public void Begin(AsyncManager asyncManager)
{
//先把原有数据返回
Return(); lock (asyncManager)
{
this.asyncManager = asyncManager;
lock (this.asyncManager)
{
//启动异步
asyncManager.OutstandingOperations.Increment(); //设计倒计时,定期断开监听,以免网关超时
var timeLimitInMilliSecond = ;
System.Timers.Timer timer = new System.Timers.Timer(timeLimitInMilliSecond); //设置计时终结方法
timer.Elapsed += (sender, e) =>
{
if (this.asyncManager == asyncManager)
{
Return();
}
}; //启动倒计时
timer.Start();
}
}
} /// <summary>
/// 将现有的值返回给客户端
/// </summary>
public void Return()
{
if (asyncManager != null)
{
lock (asyncManager)
{
//返回最新值
asyncManager.Parameters["listeningCode"] = Code; //返回最新值
asyncManager.OutstandingOperations.Decrement(); //清空当前页面异步对象,以等待下一个轮询请求
asyncManager = null;
}
}
} /// <summary>
/// 拿走并清除callbacks
/// </summary>
public IEnumerable<CallbackModel> ShiftAllCallbacks()
{
lock (callbacks)
{
var result = callbacks.ToList();
callbacks.Clear();
return result;
}
} #region IListener members /// <summary>
/// 唯一的监听编码,用以隔开并区分监听
/// </summary>
public int Code
{
get;
private set;
} /// <summary>
/// 回调方法,通过该方法将新的数据发送回给监听者
/// </summary>
/// <param name="typeCode">数据的类型</param>
/// <param name="data">数据内容</param>
/// <returns></returns>
public async Task CallAsync(int typeCode, object args)
{
lock (callbacks)
{
callbacks.Add(new CallbackModel(typeCode, args));
}
Return();
} #endregion
}
CometListener
总结
两周前单次通信的往返大约在200ms~300ms之间,这次重构后,将Chat内核中大量同步行为改成了异步并发,已经将单次通信往返压缩在了30ms~50ms之间。当然最希望是能压缩在10ms~20ms,那样就可以用长轮询进行高同步性的游戏应用了,比如射击、即时战略。但是,到时候就没那么简单了吧,毕竟心跳(Beat)的时候是会有两次往返,也就是必须将单次往返压缩在10ms以内才有可能实现,页面的数据支撑也是个问题,需要大量套用字页面来存放数据,Balabalabalabala.......
和JSONP一样,长轮询是一个畸形的技术,也更加是开发人员在备受显示情况限制下智慧的结晶。当然,从通信上来讲,它不是一项“优秀”的技术或者协议,它浪费了太多“不必要”的资源在不必要的事情上了。就像期待IE6今早从市场上消失一样,我也期待大家普遍早日统一用上诸如Web Socket一般更好的通信技术。但现时来说,我们不得不以类似于长轮训、Hack的一些方式向底端的用户妥协,毕竟用户才是产品的最终使用者。
最后,再次感谢各位当时在Chat贡献的测试数据,特别感谢诸位在上面约架(pao)、求关(zhong)注(子)和发#ffd800网地址的几位同胞。Azure账号已经到期,所以已经上不去了。大家对数据感兴趣吗?(呵呵呵呵呵呵呵呵呵呵~)