我们先来看看在Sun OpenMQ系统中 一个持久、可靠的方式传送消息的步骤是怎么样的,如图所示:
在传送过程中,系统处理JMS消息分为以下两类: ■ 有效负荷消息,由生成方发送给使用方的消息。 ■ 控制消息,代理与客户端运行时环境之间传送的私有消息,用于确保有效负荷消息成功传送和控制跨连接的消息流。 详细流程如下: 消息生成 1. 客户端运行时环境通过连接将消息从消息生成方传送到代理。 消息处理和路由 2. 代理从连接中读取消息并将此消息放入相应的目的地中。 3. 代理将(持久性)消息放入数据存储库中。 4. 代理向消息生成方的客户端运行时环境确认已收到消息。 5. 代理确定消息的路由。 6. 代理将消息从目的地写入适当的连接,并使用使用方的唯一标识符标记该消息。 消息使用 7. 消息使用方的客户端运行时环境将消息从连接传送到消息使用方。 8. 消息使用方的客户端运行时环境向代理确认消息已使用。 消息生命周期结束 9. 代理处理客户端确认,并在收到所有确认后删除(持久性)消息。 10. 代理向使用方的客户端运行时环境确认,告知客户端确认已得到处理。 如果管理员删除目的地中的消息,或者管理员删除或重新定义长期订阅,导致主题目的地中的消息未被传送即被删除,则代理可以在消息被使用前将它丢弃。在其他情况下,您可能希望代理将消息存储在称为停用消息队列的特殊目的地中,而不是将它们丢弃。在以下情况,消息会被放入停用消息队列中:消息过期时、消息因内存限制而被删除时,以及因客户端引发异常而导致传送失败时。通过将消息存储在停用消息队列中,您可以解决系统问题并在某些情况下恢复消息。
以下是针对JMS应用中的一些优化策略,H.E.在这里分为几点向大家进行介绍:
1.收发消息的属性 在接收端和发送端可以设置消息发送和接收的属性,对于消息发送时还需要注意客户端的消息确认模式一共有3种客户机确认模式: ■ 在AUTO_ACKNOWLEDGE 模式下开销最大,可以保证消息逐条传送的可靠性,会话自动确认客户端使用的每条消息。会话线程会被阻止,以等待代理确认它已处理了每个已使用消息的客户端确认;
■ 在CLIENT_ACKNOWLEDGE 模式下,在一条或多条消息被使用后,客户端通过调用消息对象的acknowledge() 方法来显式确认。这样该会话就确认了自上次调用该方法后使用的所有消息。会话线程会被阻止,以等待代理确认它已处理了客户端确认。Message Queue 提供使客户端可仅仅确认收到一条消息的方法,从而扩展了该模式。因此需要的带宽开销较小;
■ 在DUPS_OK_ACKNOWLEDGE 模式下,会话在使用了十条消息后进行确认。会话线程不会因等待代理确认而被阻止,因为在该模式下代理确认不是必需的。虽然该模式可保证不会丢失消息,但并不能保证不会收到重复的消息,因此名称为:DUPS_OK。
对于更关心性能而不是可靠性的客户端,Message Queue 服务通过提供NO_ACKNOWLEDGE模式来扩展JMS API。在该模式下,代理不跟踪客户端确认,所以不保证使用方客户端已成功处理了消息。对于发送至非长期订户的非持久性消息,选择该模式可提高性能。
如果你有大量的消息需要进行发送,可以采用DUPS_OK_ACKNOWLEDGE模式,因为他是最快的。
代码示例 connection = connectionFactory.createTopicConnection(); session=connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
2.慎用消息压缩 消息的大小对消息的效率是有影响的,跟Http/Gzip的道理一样,减少网络的负载,但是并不能提升你的运行效率,反而会将你的接收响应时间延时。以下是消息压缩的代码示例:
1.发送压缩消息的代码示例: for (int i=0;i<10000;i++){ //循环1W次发送消息 Javabloger_Msg msg=new Javabloger_Msg (); // 自定义的对象,进行实例化 msg.setTime( System.currentTimeMillis() ); //赋值 msg.setChat("This is JavablogerMsg Test msg.复制300次"); //赋值,此处省略1000个字符 myTextMsg.setObject( msg ); //放入消息对象 myTextMsg.setBooleanProperty("JMS_SUN_COMPRESS", true); //设置是否进行压缩属性 myMsgProducer.send(myTextMsg); // 发送消息 }
2.接收端会自己进行解压缩,无需人工干预,但是可以查看出消息经过压缩前后的大小: // 获得压缩前的消息大小。 int uncompressed=bytesMessage.getIntProperty(“JMS_SUN_UNCOMPRESSED_SIZE”); //获得压缩后的消息大小。 int compressed=bytesMessage.getIntProperty(“JMS_SUN_COMPRESSED_SIZE”);
经过测试1000个字符 压缩以后只有95个字符位的大小。 1000个字符的消息发送1W个没有经过压缩, 收发在300毫秒以内完成, 1000个字符的消息发送1W个经过压缩后, 收发在1600毫秒以内完成,也就是1秒多以内完成。
3.取消对消息收/发的验证 可以查阅我写的另外一篇文章 http://www.javabloger.com/article/sun-glassfish-openmq-topic.html在文章中我提到,如果在发送端或者在接收端加上用户和密码的验证会大大降低系统运行效率的,所以请慎用Sun OpenMQ中的身份验证功能。
4.对服务器连接的开关 客户端对JMS服务器的开关连接对服务器的运行性能有很大的影响,所以: 1我们可以采用容器提供的连接池,让容器来完成我们的连接资源管理。 2如果使用非容器下的运行状态,那么每次发送完毕一个消息后下面还有消息的话,关闭session就可以了,无需关闭对JMS服务器的连接。
5.调整JVM虚拟机运行参数 100个线程同时发送100个消息,一共10w个消息,运行到一半的时候出现 [29/Mar/2010:03:34:07 EDT] [B1089]: In low memory condition, Broker is attempting to free up resources [29/Mar/2010:03:34:07 EDT] [B1088]: Entering Memory State YELLOW from previous state GREEN – allocated memory is 151214K, 80% of total memory used 错误,显然在说内存不够。经过google以后真实我的想法是正确的,参考文档: http://docs.sun.com/app/docs/doc/819-4467/6n6k98bq2?l=zh_tw&a=view#aeokn http://docs.sun.com/app/docs/doc/819-4467/6n6k98bqa?l=zh_tw&a=view 加大运行内存,将JMS服务器的内存使用率调整为1G,效果非常明显,命令如下: nohup imqbrokerd -tty -name myBroker -port 6769 -cluster 172.16.2.214:6769,172.16.2.215:6769 -D"imq.cluster.masterbroker=172.16.2.215:6769" -vmargs "-Xms256m -Xmx1024m" &
6.慎用数据库存储消息 如果对于消息体内容较大,频率稍低的可以采用将消息体放入数据库进行存储。如果收发频率较高,并且消息体不算很大可以使用默认状态在内存中使用。也就是说根据不同的应用场景进行应用,千万不要认为将消息体放入数据库中存储是最好的方案,那样只能提高消息的稳定性和消息的安全性。
7.开启多个队列分载 来看一个测试案例: TopicA、B 2个队列在同一台机器上 并发线程数 100X2=200个线程,每个线程发1000个消息,2个接收端平均每个接收端40秒以内处理完成。
TopicA、B 2个队列在同一台机器上 并发线程数 100X2=200个线程,每个线程发 500个消息,2个接收端平均每个接收端10秒以内处理完成。
TopicA、B、C、D 4个队列在同一台机器上 并发线程数 50X4=200个线程,每个线程发 250个消息,4个接收端平均每个接收端5秒以内处理完成。
经过以上测试可以表明,10W条消息进行散列以后效率明显提高了,有人会问为什么需要进行散列,因为在消息接收端 onMessage(Message msg)是单线程,对于Topic类型的消息,如果启动多线程也就是启动了多个onMessage触发,并不能提高运行效率。
8.队列的属性
无论是Topic还是quene类型的消息队列,默认状态都设置了上限的状态,如果消息量大的话将会出现这样的信息: com.sun.messaging.jmq.jmsserver.util.BrokerException: [B4120]: Can not store message 18727-172.16.2.19(8c:56:3e:4f:ba:71)-4100-1269861102326 on destination TopicCase [Topic]. The destination message count limit (maxNumMsgs) of 100000 has been reached. 所以需要注意将队列中的状态,将队列的状态修改为没有限制的 Unlimited 选项。