一、前言
消息发送到Broker,消费者通过Destination可以订阅消费某个特定的通道内的消息。一些特殊情况下,需要消费者对消息过滤下再进行消费,也就是筛选出某些特定消息。ActiveMQ提供了SQL92表达式语法的自定义消息筛选功能。非常方便快捷的能够开发出具有消息筛选功能的应用。
ActiveMQ 支持:
- 数字表达式:
>
,>=
,<
,<=
,BETWEEN
,=
. - 字符表达式:
=
,<>
,IN
. -
IS NULL
或则IS NOT NULL
. - 逻辑
AND
, 逻辑OR
, 逻辑NOT
.
常数类型:
- 数字:3.1415926, 5。
- 字符: ‘a’,必须带有单引号。
-
NULL
,特别的常量。 - 布尔类型:
TRUE
,FALSE
二、程序案例
生产者:
package com.cfang.prebo.activemq.selector; import java.util.Scanner; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; public class Producer { public static void main(String[] args) { ConnectionFactory connectionFactory = null; Connection connection = null; Session session = null; Destination destination = null; MessageProducer producer = null; Message message = null; try { Scanner scanner = new Scanner(System.in); connectionFactory = new ActiveMQConnectionFactory("tcp://172.31.31.160:61618"); connection = connectionFactory.createConnection(null, null); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue("TP_Q_TEST_SELECTOR00"); producer = session.createProducer(destination); while(true) { String line = scanner.nextLine(); if("exit".equals(line)) { break; } message = session.createTextMessage(line); message.setIntProperty("applicationName", line.length()); message.setStringProperty("result", "RT"); producer.send(message); } } catch (Exception e) { e.printStackTrace(); } finally { if(producer != null){ // 回收消息发送者 try { producer.close(); } catch (JMSException e) { e.printStackTrace(); } } if(session != null){ // 回收会话对象 try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } if(connection != null){ // 回收连接对象 try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } }
如上,生产者还可以设置更多的条件,ActiveMQ也提供了全基本类型的 setXXXXXProperty方法去设置条件。
消费者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
|
如上,消费者就只消费 applicationName = 2 且 result = 'RT' 的消息。
三、小结
1、提供了筛选功能,可以减少 destination 的数量。可以用于实现特定机器,特定消息(灰度?)。
2、如果同时两个消费者的话,一个异常不能消费了,那么消息就会产生积压。对另一个正常的消费者而言,性能会下降,消费时间可能会变长。