文章目录
过滤消息概述
大部分情况下 ,我们都可以通过TAG来选择我们想要获取的消息,如下
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
消费者将接收包含TAGA或TAGB或TAGC的消息。但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用。
在这种情况下,可以使用SQL表达式筛选消息。SQL特性可以通过发送消息时的属性来进行计算。
在RocketMQ定义的语法下,可以实现一些简单的逻辑。
举个例子
基本语法
使用限制
只有使用push模式的消费者才能用使用SQL92标准的sql语句 ,
接口如下
public void subscribe(final String topic, final MessageSelector messageSelector)
启用配置 (重要 )
使用Filter功能,需要在启动配置文件当中配置以下选项
enablePropertyFilter=true
常见错误:The broker does not support consumer to filter message by SQL92
配置文件中增加如下配置
enablePropertyFilter=true
示例
生产者
package com.artisan.rocketmq.filter;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
/**
* @author 小工匠
* @version v1.0
* @create 2019-11-11 23:30
* @motto show me the code ,change the word
* @blog https://artisan.blog.csdn.net/
* @description
**/
public class FilterProducer {
/***
* TAG-FILTER-1000 ---> 布隆过滤器
* 过滤掉的那些消息。直接就跳过了么。下次就不会继续过滤这些了。是么。
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("filter_sample_group");
producer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");
producer.start();
for (int i = 0; i < 3; i++) {
Message msg = new Message("TopicFilter",
"TAG-FILTER",
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// Set some properties. 生产者设置属性,消费者端通过Tag+该属性定制消息
msg.putUserProperty("a", String.valueOf(i));
if (i % 2 == 0) {
msg.putUserProperty("b", "artisan");
} else {
msg.putUserProperty("b", "smart artisan");
}
producer.send(msg);
}
producer.shutdown();
}
}
消费者
package com.artisan.rocketmq.filter;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.io.UnsupportedEncodingException;
import java.util.List;
/**
* @author 小工匠
* @version v1.0
* @create 2019-11-11 23:45
* @motto show me the code ,change the word
* @blog https://artisan.blog.csdn.net/
* @description
**/
public class FilterConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("filter_sample_group");
/**
* 注册中心
*/
consumer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");
/**
* 订阅主题
* 一种资源去换取另外一种资源
*/
consumer.subscribe("TopicFilter", MessageSelector.bySql("a between 0 and 3 and b = 'artisan'"));
/**
* 注册监听器,监听主题消息
*/
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs){
try {
System.out.println("consumeThread=" + Thread.currentThread().getName()
+ ", queueId=" + msg.getQueueId() + ", content:"
+ new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Filter Consumer Started.%n");
}
}
日志:
Filter Consumer Started.
consumeThread=ConsumeMessageThread_1, queueId=0, content:Hello RocketMQ 0
consumeThread=ConsumeMessageThread_2, queueId=2, content:Hello RocketMQ 2
可以看到,我们虽然发了 3条消息 ,但是只获取了我们期望的2条消息,可见过滤起了作用。