标题 Spring MVC 基于阻塞队列 LinkedBlockingQueue 的同步长轮询功能实现,其实本文介绍的也是生产者消费者的一种实现。生产者不必是一个始终在执行的线程,它可以是一个接口,接受客户端的请求,向队列中插入消息;消费者也不必是一个始终在执行的线程,它同样也可以是一个接口,接受客户端的请求,从队列中取出属于自己的消息;看到很多介绍生产者消息者实现的文章,实现场景都很简单,现实应用往往会比较复杂,有一些附加条件,本例中就需要根据消息中的 familyId 来判断消息是不是下发给自己的。
应用场景
本例的应用场景是一个物联网智能家居应用,系统结构图如下:
主要实现的功能是用户通过手机端APP发出设备控制的命令(如:开灯、关灯等)后,设备网关能够实时的获取到控制命令,进而控制设置的状态。
为什么要使用长轮询功能呢?
其实可选的方案有很多种:
1、长轮询
2、长链接
3、Socket
4、WebSocket
5、MQTT
选择长轮询方案是因为其实现的简单性,实现起来与其它的接口基本没有太大的差别。
同步与异步处理模式分析
同步服务模式:
同步服务为每个请求创建单一线程,由此线程完成整个请求的处理:接收消息,处理消息,返回数据;这种情况下服务器资源对所有入栈请求开放,服务器资源被所有入栈请求竞争使用,如果入栈请求过多就会导致服务器资源耗尽宕机,或者导致竞争加剧,资源调度频繁,服务器资源利用效率降低。
异步服务则可以分别设置两个线程队列,一个专门负责接收消息,另一个专门负责处理消息并返回数据,另有一些值守线程负责任务派发和超时监控等工作。在这种情况下无论入栈请求有多少,服务器始终依照自己的能力处理请求,服务器资源消耗始终在一个可控的范围。这种模式的一个问题就是这两个线程队列的大小如何根据机器负载情况动态调整。
异步服务模式:
这种情况下,虽然入栈请求以消息队列的方式被异步处理但每个请求内部却是采用阻塞的方式访问外部资源,如果外部资源访问速度过慢,可能导致请求处理队列中的所有线程均处于阻塞状态,此时CPU使用率虽然很低但是却因为队列中线程已满而无法处理消息队列中的新消息,此时若能调整线程队列最大线程数将可提高CPU利用率。但另一个问题是如果线程数被调高之后所有线程的IO处理突然结束并且接下来每个线程都将进行大量计算的话那么CPU可能出现过载。
在系统运行的每个时间点上,当时正在进行IO的线程数量和正在进行计算的线程数量是不断变化着的,那么如何才能设计出一个可以根据系统当时情况自动适应负载变化的高度自适应的系统呢?
在这方面采用反应式计算模型确实能设计出适应负载能力很强的系统,系统利用率和吞吐量可以大幅提高,但这种系统仍然可能会出现系统局部负载过高的风险。
采用反应式计算模型,不仅系统中的入栈请求以消息队列的方式得以异步化,而且系统中所有的IO任务也必需依照此法行之,这些IO任务的处理需要采用异步模型(如NIO)。另外要考虑的就是如何划分异步IO消息并为其配置线程队列了,比如是要将所有IO任务放入统一的队列还是为某类IO任务设置单独的队列。
服务器资源虽然由系统分配但大多以线程为持有者被线程持有并使用,如线程堆栈,被线程持有的各类锁等资源。
实现步骤
1、定义消息队列
我这里是定义的静态常量,你找个类把它放进去就可以了。
/** * 存储客户端(用户)提交的设置控制命令 */ public final static BlockingQueue<Equipment> EQUIPMENT_CONTROL = new LinkedBlockingQueue<Equipment>();Equipment 是一个消息实体类,在本例中它最关键的属性是 familyId,因为要根据它来判断消息是下发给哪个家庭的,你发了一个关灯的命令结果我家的灯灭了这肯定是不行的。
2、实现生产者
生产者不必是一个始终在执行的线程,它可以是一个接口,接受客户端的请求;
/**
* 保存或更新设备接口
**/
@RequestMapping("/save.do")
@ResponseBody
public void save(HttpServletRequest request, HttpServletResponse response) throws Exception {
Personal personal = SecurityUtils.getPersonal(request);
Long personalId = personal.getId();
if (personalId == null) {
outFailureJson(response, BaseCodeMessage.personal_10001);
return;
}
//做你要做的事情
//向队列中插入消息
ConstantDict.EQUIPMENT_CONTROL.put(entity);
//输出响应内容
this.outResultJson(response, "success", "Equipment", entity);
}
这里你要做的最关键的是:向队列中插入消息。
Personal 是一个用户信息实体类,通过 SecurityUtils.getPersonal(request); 方法根据 Session 或 Cookie 来从缓存或数据库中获取当前登录用户信息。
3、实现消费者
此接口由网关调用。
这里要做的循环从队列中取数据,然后根据 familyId 判断消息是不是属于自己的,是就退出循环,不是就把刚刚取出的消息再放回去。
/** * 控制设置状态接口_供网关调用 这是一个提供长轮询的方法,网关通过长轮询来即时获得命令信息 * * @author lipw * @date 2017年8月30日下午3:31:59 * @param request * @param response * @throws Exception */ @RequestMapping("/ctrlgw.do") @ResponseBody public void controlgw(HttpServletRequest request, HttpServletResponse response) throws Exception { Personal personal = SecurityUtils.getPersonal(request); Long personalId = personal.getId(); if (personalId == null) { outFailureJson(response, BaseCodeMessage.personal_10001); return; } Long familyId = personal.getFamilyId(); if (familyId == null) { outFailureJson(response, "2", "尚未分配家庭编号!"); return; } Equipment equipment = null; while (true) { equipment = ConstantDict.EQUIPMENT_CONTROL.poll(5000, TimeUnit.MILLISECONDS); if (equipment != null) { if (familyId.equals(equipment.getFamilyId())) { System.out.println("从队列取走一个元素,队列剩余" + ConstantDict.EQUIPMENT_CONTROL.size() + "个元素"); break; } else { // 不属于自己,再放回队列 ConstantDict.EQUIPMENT_CONTROL.put(equipment); } } Thread.sleep(100); } this.outResultJson(response, "success", "equipment", equipment); }为什么要这样设计呢?
因为如果只是通过 peek 方法来获取,而不从 队列 中移除,如果队列头部的消息不是属于自己的,那就要一直循环下去却得不到属于自己的那一条消息。
使用 AJAX 模拟网关进行测试
<div id="divCommand" style="width:98%; min-height:100px; border:1px solid #888;"></div> <script src="${ctx}/static/script/jquery-1.10.2.min.js"></script> <script type="text/javascript"> var successCount = 0; function loadCommand(){ $.ajax({ url:"${ctx}/xxx/ctrlgw.do?token=xxx&t=" + Date.now(), type:"get", data:{}, dataType:"json", success:function(data) { if(data != null && data!=""){ successCount++; $("#divCommand").append(successCount + ", "); } loadCommand(); //成功后继续回调 },error:function(data){ if(data != null && data!=""){ $("#divCommand").append(data.statusText); } if (data.statusText == "timeout"){ loadCommand(); //超时回调 } } }); } $(document).ready(function(){ loadCommand(); }); </script>测试方法通过一个回调函数,不断的向服务器发出请求;
如果服务器队列中有属于自己的消息,会立即返回,没有就会一直等待真到超时,然后重新发起请求。
请求成功后会在 Div 中显示成功的次数,失败了也会显示失败的状态文本。
可以通过浏览器的开发者工具中的 Network 来查看每次请求所用的时间:
结束语
本例的实现方式是同步的,队列没有设置大小,生产者被阻塞的可能性很小,除非所有网关都与平台断开了连接不再处理消息;但消费者的实现由于是同步的,会对服务器的性能有所影响,因为每个消费者请求会占用一个 Servlet 线程导致无法再去处理其它用户请求。那么这个问题有没有解决方案呢?当然有!那就是采用异步处理模式 DeferredResult 。
相关阅读
队列
阻塞队列
同步与异步处理模式分析详解(图)
长轮询与长链接