RocketMQ-初体验RocketMQ(10)-过滤消息_SQL92表达式筛选消息

文章目录

RocketMQ-初体验RocketMQ(10)-过滤消息_SQL92表达式筛选消息


过滤消息概述

大部分情况下 ,我们都可以通过TAG来选择我们想要获取的消息,如下

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

消费者将接收包含TAGA或TAGB或TAGC的消息。但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用。

在这种情况下,可以使用SQL表达式筛选消息。SQL特性可以通过发送消息时的属性来进行计算。

在RocketMQ定义的语法下,可以实现一些简单的逻辑。

举个例子

RocketMQ-初体验RocketMQ(10)-过滤消息_SQL92表达式筛选消息


基本语法

RocketMQ-初体验RocketMQ(10)-过滤消息_SQL92表达式筛选消息


使用限制

只有使用push模式的消费者才能用使用SQL92标准的sql语句

接口如下

public void subscribe(final String topic, final MessageSelector messageSelector)

启用配置 (重要 )

使用Filter功能,需要在启动配置文件当中配置以下选项

enablePropertyFilter=true

RocketMQ-初体验RocketMQ(10)-过滤消息_SQL92表达式筛选消息

常见错误:The broker does not support consumer to filter message by SQL92

配置文件中增加如下配置

enablePropertyFilter=true

示例

生产者

package com.artisan.rocketmq.filter;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
 * @author 小工匠
 * @version v1.0
 * @create 2019-11-11 23:30
 * @motto show me the code ,change the word
 * @blog https://artisan.blog.csdn.net/
 * @description
 **/
public class FilterProducer {


    /***
     * TAG-FILTER-1000 ---> 布隆过滤器
     * 过滤掉的那些消息。直接就跳过了么。下次就不会继续过滤这些了。是么。
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("filter_sample_group");
        producer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");
        producer.start();

        for (int i = 0; i < 3; i++) {
            Message msg = new Message("TopicFilter",
                    "TAG-FILTER",
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );

            // Set some properties. 生产者设置属性,消费者端通过Tag+该属性定制消息
            msg.putUserProperty("a", String.valueOf(i));
            if (i % 2 == 0) {
                msg.putUserProperty("b", "artisan");
            } else {
                msg.putUserProperty("b", "smart artisan");
            }
            producer.send(msg);
        }

        producer.shutdown();
    }

}

消费者

package com.artisan.rocketmq.filter;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.io.UnsupportedEncodingException;
import java.util.List;

/**
 * @author 小工匠
 * @version v1.0
 * @create 2019-11-11 23:45
 * @motto show me the code ,change the word
 * @blog https://artisan.blog.csdn.net/
 * @description
 **/
public class FilterConsumer {

    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("filter_sample_group");
        /**
         * 注册中心
         */
        consumer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");
        /**
         * 订阅主题
         * 一种资源去换取另外一种资源
         */
        consumer.subscribe("TopicFilter", MessageSelector.bySql("a between 0 and 3 and b = 'artisan'"));
        /**
         * 注册监听器,监听主题消息
         */
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs){
                    try {
                        System.out.println("consumeThread=" + Thread.currentThread().getName()
                                + ", queueId=" + msg.getQueueId() + ", content:"
                                + new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET));
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();

        System.out.printf("Filter Consumer Started.%n");
    }
}

日志:

Filter Consumer Started.
consumeThread=ConsumeMessageThread_1, queueId=0, content:Hello RocketMQ 0
consumeThread=ConsumeMessageThread_2, queueId=2, content:Hello RocketMQ 2

可以看到,我们虽然发了 3条消息 ,但是只获取了我们期望的2条消息,可见过滤起了作用。

上一篇:Apache Kafka-通过设置Consumer Group实现广播模式


下一篇:RocketMQ学习笔记(二)