ActiveMQ NMS使用过程中的一点经验

  最近,项目中使用到了ActiveMQ获取第三方推送过来的数据。具体背景是:公司需要监控全国各地车辆实时运行的GPS数据,但监控本身不是公司做的,而是交给第三方公司做,第三方采集GPS数据后推送给我们。全国各地,近万台车辆,每台车辆每隔几秒就发送一次GPS位置数据,如果我们提供API给第三方公司去调用,显然无论是第三方还是我们这边,服务器都是是扛不住的,这么做也是不合理的,于是,便采取了消息队列,第三方采集到的数据直接推送到消息队列代理服务器,而己方从消息队列服务器取数据处理。以下对项目实践及其中遇到的一些问题及解决进行概要总结。

1、ActiveMQ NMS简介

关于NMS,这里主要谈两点。

NMS API:ActiveMQ定义的一套API接口规范,你可以理解为一个API的接口,它指明了生产者或消费者如何与消息队列服务器通信。

NMS Providers:NMS API的具体实现,基于Windows或ActiveMQ下的各种协议,提供了各种实现,目前提供了ActiveMQ、STOMP、MSMQ、EMS、WCF、AMQP、MQTT、XMS几种实现。具体项目中,我采取的是ActiveMQ实现。

至于消息队列涉及到的其他概念,什么Broker、Queue、Topic、Producer、Consumer,这里不做介绍,各位可以自己查资料,这些 概念本身也不难理解的。

2、关于异常下的Broker重连

  这个异常,可能是由于网络异常,也可能是长时间没有通信,Broker把Client给断掉了,不去管它。起初,这个项目是从一位离职员工的手头接过来的,给的说法是只需要维护就够了,基本上不用调整。当时虽然说是做了重连,后来发现,就跟没做一样。发现这个,是起源于第三方频繁通知,MQ队列有积压,通知我们尽快处理。项目拿到手一看,我勒个去,直接起了一个Timer在那儿定时监控Connection状态,如果状态不对立刻重新打开连接。先不说Socket连接的浪费情况,及Timer这个.NET中近乎Bug的一个东西,这种做法实际中行之无效,因为连接异常情况下再打开,往往是打开失败的,比如上次异常连接没有关闭,状态不对,或者ClientID暂时没被Broker释放等。

  于是,针对重连,开始做第一次优化。查了IConnection元数据,发现有个ExceptionListener可用,于是便想到利用这个事件来监听并重连。改完上线,可第二天一大早过来,发现MQ又挤压了,重连时效了,打开日志看到,记录了ExceptionListener事件日志,但重连没有成功,具体原因,我想可能和优化前是一样的吧。这折腾前后完全没区别。这时候,我想,不能在现有做法里边去整了,必须回到NMS本身去整,堂堂Apache开源项目,一定有更好的重连机制,放着不用自己整,是不是傻。。。

  于是,打开官网,如愿以偿,找到了failover这个东西:

ActiveMQ NMS使用过程中的一点经验

根据描述,链接异常时,随机从配置的Broker列表总选取一个进行重连。这是个好东西,于是,Broker的链接配置,由tcp://183.56.131.224:61616调整为failover:(tcp://183.56.131.224:61616)。这里没有多个Broker,只有一个,所以配置一个也是没有问题的,我们重点是利用failover。与此同时,OpenWire上发现了maxInactivityDuration这个配置项,官网描述如下:

ActiveMQ NMS使用过程中的一点经验

这个也不错,配置Connection闲置多久被Broker断掉。我这里比较狠,反正Broker那个队列出了第三方往里边推,就我这儿一个人在消费,直接配置0算了,永不被杀,出问题了重连,岂不爽哉。于是,Broker进一步调整为:failover:(tcp://183.56.131.224:61616)?wireFormat.maxInactivityDuration=0。此机制我也自己写Demo验证过,无论是Broker突然停掉再开启,还是Producer停掉再开启,Consumer均能成功重连的。至此,MQ的可靠重连问题算是解决了。

3、进程重启导致Consumer链接失败

  具体情境是这样的:MQ消费者进程是寄宿在Windows服务中的,运维那边做测试或维护,会在MQ运行正常的情况下直接重启服务,有时候会重启失败,过阵子启动,又成功了。我过去,打开Windows事件日志,说是ClientID被占用,也就是说瞬间重启时候,Broker端暂时没来得及断开或者释放该ClientID对应的Connection,而我们系统中ClientID是配置死的。我又验证了下,正常运行下,先关闭服务,过几秒再开启,就没这问题,也印证了自己的推断。问题是找到了,但总不能告诉运维,每次先停止服务,再打开,不能用重启吧,哪个开发要是这样跟我说,那他妈也太不靠谱了。

  解决方案就是,ClientID动态生成,每次启动都不一样,这个ClientID仅仅是Broker用来标识一个连接端的,随便什么都无所谓,只要跟上次不一样。项目中取的是当前时分秒字符串。如下:

_connection.ClientId = DateTime.Now.ToString("yyyyMMddHHmmss");

调整完上线,再试验,那问题再无复现。

4、服务启动时间过长的问题

  随着各种奇葩情况继续出现,我这里继续被操。具体场景是:鉴于是跟第三方合作,各种第三方服务器宕机,各种网络不靠谱,你懂的。如果是消费者进程已经启动成功了,那第三方或者网络不靠谱了,我们利用2中的重连机制就已经可以了,无非就是等他们靠谱了我们自动重连上就是了。可问题是,如果第三方不靠谱,或者网络不靠谱时,我们在启动消费者Windows服务,那会出现什么情况呢?给大家实际演示下:

目前,我我的服务安装后,是这样的:

ActiveMQ NMS使用过程中的一点经验

假设正常链接配置是这样的:

failover:(tcp://183.56.131.224:61616)?wireFormat.maxInactivityDuration=0

为了模拟外界异常或不可达的情况,我手动设置为如下:

failover:(tcp://183.56.131.200:61616)?wireFormat.maxInactivityDuration=0

大家注意,那个Broker地址是不可达的。

开启服务,其结果如下:

ActiveMQ NMS使用过程中的一点经验

这个启动界面,你就等着吧,等个两三分钟,结果如下:

ActiveMQ NMS使用过程中的一点经验

更要命的是,点击确定后,服务启动结果如下:

ActiveMQ NMS使用过程中的一点经验

这就比较操蛋了,你启动失败就失败把,别给我整成启动状态啊,这不误导人么。

  一般,这种情况,就属于启动进程一直卡主,当服务启动超时时,就会出现这种情况, 启动强行被Windows终止,但那个标记为启动状态这个就不好理解,也比较坑了。这是必须要处理的事情,否则极易造成误导。对于Windows服务本身启动机制,你是没办法做任何事情的,那只能从MQ链接机制去干事情。最终, 经过查询官网文档, 再次如愿以偿,找到了以下两个配置项:

ActiveMQ NMS使用过程中的一点经验

ActiveMQ NMS使用过程中的一点经验

这两个配置项分别代表,启动时最大重连尝试次数,默认值0,代表无限重连,我们出问题就出现在这里,链接不上时无限重试,无限重试无限连接不上,无限链接不上再无限重试。。。然后,进程阻塞,阻塞到一定时间,Windows服务重启失败。这个我也在Connection open时候打断点调试过,确实阻塞了。那么第二个配置项代表一项操作超时时间。问题找到了,那么自然也就有解决方案了,现把链接配置为如下:

failover:(tcp://183.56.131.200:61616)?wireFormat.maxInactivityDuration=0&transport.startupMaxReconnectAttempts=3&transport.timeout=3000

这里有两点要注意:

1)原本,这里配置应该是failover:(tcp://183.56.131.224:61616)?wireFormat.maxInactivityDuration=0&transport.startupMaxReconnectAttempts=3&transport.timeout=3000,但在配置文件中, &符号是不支持的,必须转义或替换,这里采取了实体替换,具体的是&这个鬼实体符;

2)NMS.ActiveMQ v1.4.0以上版本和以前以及其他语言实现版本不大相同,1.4以上版本配置这两项参数时必须有transport前缀。这里当时也是吃过亏的。

配置调整完毕后,我们再用 这个无效地址启动服务,在经过60S以内的启动时间,画风变成了这样:

ActiveMQ NMS使用过程中的一点经验

点击确定:

ActiveMQ NMS使用过程中的一点经验

这个时间,和transport.timeout、transport.initialReconnectDelay、transport.startupMaxReconnectAttempts等几项配置有关。但起码时间不会像之前那样很久,并且最终Windows服务状态显示为启动了。

5、总结

  鉴于这是公司实际运作项目,就不上传代码了,如果是自己的Demo,一定毫不保留,望各位见谅。实际上,也没什么特别的,大家平时遇到这种难缠的问题,多查官网文档,官网文档搞不定,再查源码,配合动手实践,一般都不会是问题的。幸运的是,虽然很多官网文档都是英文,但绝大部分都通俗易懂,我们看上去,也都不费事儿的。

附:ActiveMQ生产者及消费者示例代码

生产者:

/// <summary>
/// 生产者启动器
/// </summary>
public class ProducerBootstrap
{
#region Private Fields private readonly IConnectionFactory _connectionFactory = null;
private IConnection _connection = null;
private IMessageProducer _producer = null; #endregion #region Constructors public ProducerBootstrap()
{
_connectionFactory = new ConnectionFactory("tcp://localhost:61616");
} #endregion #region Public Methods public void Start()
{
_connection = _connectionFactory.CreateConnection();
_connection.ExceptionListener += _connection_ExceptionListener;
_connection.Start();
ISession sesison = _connection.CreateSession();
_producer = sesison.CreateProducer(new ActiveMQQueue("guokun"));
} public void Stop()
{
_connection.Stop();
_connection.Close();
_connection.Dispose();
} public void SendMessage()
{
while (true)
{
ITextMessage message = _producer.CreateTextMessage();
message.Text = string.Format("数据:{0}", DateTime.Now);
_producer.Send(message);
Thread.Sleep();
Console.WriteLine(message.Text);
}
} #endregion #region Private Methods private void _connection_ExceptionListener(Exception exception)
{
Console.WriteLine("生产者发生异常:{0}", exception);
} #endregion
}

消费者:

public class ConsumerBootstrap
{
#region Private Fields private readonly IConnectionFactory _connectionFactory = null;
private IConnection _connection = null;
private IMessageConsumer _consumer = null; #endregion #region Constructors public ConsumerBootstrap()
{
_connectionFactory = new ConnectionFactory("failover:(tcp://localhost:61616)?wireFormat.maxInactivityDuration=0&transport.timeout=3000&transport.startupMaxReconnectAttempts=2");
} #endregion #region Public Methods public void Start()
{
_connection = _connectionFactory.CreateConnection();
_connection.ClientId = "guokun";
_connection.ExceptionListener += _connection_ExceptionListener;
_connection.Start();
ISession session = _connection.CreateSession();
_consumer = session.CreateConsumer(new ActiveMQQueue("guokun"));
_consumer.Listener += _consumer_Listener; Console.WriteLine("消费者启动成功...");
} public void Stop()
{
_connection.Stop();
_connection.Close();
_connection.Dispose();
} #endregion #region Private Methods /// <summary>
/// 消息监听处理
/// </summary>
/// <param name="message"></param>
private void _consumer_Listener(IMessage message)
{
ITextMessage textMessage = message as ITextMessage;
Console.WriteLine("{0}-{1}", DateTime.Now, textMessage.Text);
} private void _connection_ExceptionListener(Exception exception)
{
Console.WriteLine("生产者发生异常:{0}", exception);
} #endregion
}
上一篇:Go语言专题


下一篇:JSP EL表达式 param、paramValues的使用