多消费者(多线程)对MNS的使用
背景
在阿里云MNS消费者的使用中,阿里云提供了使用 消息服务-最佳实践-长轮询
的代码和说明,在解决方案中阿里云这么说道
在开了上百个线程同时访问的情况下,如果队列里已经没有消息了,那么其实不需要上百个线程都同时挂LongPolling。只需要有1-N个线程挂LongPolling就足够了。挂LongPolling的线程在发现队列里有消息时,可以唤醒其他线程一起来取消息以达到快速响应的目的
Receiver内部做了LongPolling的排他机制,只要有一个线程在做LongPolling,那么其他线程只需要Wait就可以了。 —— [解决方案]
但是如何启动1-N个线程,同时产生多个消费者,并没有给出说明,阿里云官方提供的demo中是使用在main方法中启用:
CloudAccount account = new CloudAccount("ACCESS_ID", "ACCESS_KEY", "ENDPOINT");
sMNSClient = account.getMNSClient();
sMNSClient.getQueueRef("TestQueue").delete();
sMNSClient.getQueueRef("TestQueue").create();
Thread thread1 = new Thread(new Runnable() {
public void run() {
WorkerFunc(1);
}
});
Thread thread2 = new Thread(new Runnable() {
public void run() {
WorkerFunc(2);
}
});
Thread thread3 = new Thread(new Runnable() {
public void run() {
WorkerFunc(3);
}
});
这里我提供一种比较好的方法,可以利用spring IOC容器的依赖注入,来管理和启动多个消费者(多线程)。
方法展示
Spring会通过依赖注入的方式,来管理关联对象的生命周期,所以我们可以将消费者的产生管理,都由Spring IOC容器代劳,也就是说,我把消费者创建的控制权都交给Spring容器。方法如下
@Component
public class NormalProcessComponent {
private static Logger log = LoggerFactory.getLogger(NormalProcessComponent.class);
private static ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(30);
public NormalProcessComponent(){
for(int i = 0; i < 50; i++){
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
process();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
public void process() throws Exception {
//使用阿里云官方提供的方法
MessageReceiver receiver = new MessageReceiver(workerId, sMNSClient, "TestQueue");
while (true) {
Message message = receiver.receiveMessage();
try {
//取出Que中的信息
result = message .getMessageBodyAsString();
JSONObject params = JSONObject.parseObject(result);
if(params!=null){
//处理数据的方法
} else {
log.info("取出的数据为空!");
Thread.sleep(Constant.SLEEP_SECONDS);
}
} catch (Exception e) {
e.printStackTrace();
log.error("fail to sleep"+message);
break;
}
}
}
}
我们将消费者的产生方法,在类中的构造函数中定义,使用一个固定大小的线程池,来管理消费者(线程),同时加上Component注解,在项目启动时,Spring 的就会实例化这个类,注入到容器中,这个时候构造方法中的,多个消费者就会启动开始工作。
拓展
- 阿里云官方MessageReceiver的解析 :长连接轮询,以及死锁和线程安全性问题的避免
- 消费者的监控 :观察消费者的数量,避免消费者全部死亡,造成队列积压;
阿里云官方MessageReceiver的解析
在阿里云MNS消费者的使用中,阿里云提供了使用 消息服务-最佳实践-长轮询 ,官方已经提供了源代码和详细说明,我在这里就不贴代码了,主要说明其中的原理。
在MessageReceiver中,官方定义了一个
static final Map<String, Object> sLockObjMap
从而保证了,无论new出多少个MessageReceiver,都是从同一个Map,取出的lockObj。在使用lockObj中,均使用同步锁synchronized,从而实现了LongPolling的排他机制,只有一个线程在做LongPolling,其他线程都会Wait。避免了上百个线程同时访问MNS Server,一个Group只会产生,一条长连接进行长轮询。
可以将图中的Group,比作一台台服务器,而里面的多个Consumer,实际就是启动的多个消费线程。
消费者的监控
在上面代码中,使用了一个固定大小的线程池来管理多个线程(消费者),但是一旦子线程死亡,这个线程(消费者),并不会重启,这种情况就会产生队列积压。产生线程死亡一定是不正常,程序中的Bug存在。比如,有异常没有捕获到,或者在子线程中将异常throw出,就会使当前子线程死亡掉。这种情况一定是会有的,因为没有人写出的代码是完美无缺的,程序员只能尽可能避免bug的产生,所以我们需要用完善的日志和监控来完善,我们的项目。
这里我们可以利用监控线程池中的存活线程数量从而来,进行报警。
//当消费者低于一定阈值触发报警
if(threadPool.getActiveCount()<threshold){
//报警
}
可以将这个封装成一个API接口,通过监控这个API来进行报警。