RocketMQ消息发送demo演示

导入pom依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.4.0</version>
</dependency>

消息发送者步骤分析

1.创建消息生产者producer,并制定生产者组名
2.指定Nameserver地址
3.启动producer
4.创建消息对象,指定主题Topic、Tag和消息体
5.发送消息
6.关闭生产者producer

消息消费者步骤分析

1.创建消费者Consumer,制定消费者组名
2.指定Nameserver地址
3.订阅主题Topic和Tag
4.设置回调函数,处理消息
5.启动消费者consumer

1、基本demo

消息发送

1)发送同步消息

这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知

public class SyncProducer {
    public static void main(String[] args) throws Exception{
        // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("syncProducer_group");
        // 设置NameServer的地址
        producer.setNamesrvAddr("8.131.84.120:9876");
        // 启动Producer实例
        producer.start();
        for (int i = 0; i < 5; i++) {
            // 创建消息,并指定Topic,Tag和消息体
            Message message = new Message("TopicSync","TAGA","hello".getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 发送消息到一个Broker
            SendResult result = producer.send(message);
            // 通过sendResult返回消息是否成功送达
            System.out.println(result);
        }
        // 如果不再发送消息,关闭Producer实例
        producer.shutdown();
    }
}

2)发送异步消息

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应

public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("asyncProducer");
        producer.setNamesrvAddr("8.131.84.120:9876");
        producer.start();
        //异步消息发送失败重试次数
        producer.setRetryTimesWhenSendAsyncFailed(0);
        // 创建消息,并指定Topic,Tag和消息体
        Message message = new Message("TopicAsync","TAGA","OrderId001","hello".getBytes(RemotingHelper.DEFAULT_CHARSET));
        // SendCallback接收异步返回结果的回调
        producer.send(message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                sendResult.getMsgId();
            }
            @Override
            public void onException(Throwable throwable) {
                throwable.printStackTrace();
            }
        });
        // 如果不再发送消息,关闭Producer实例
        producer.shutdown();
    }
}

3)单向发送消息

这种方式主要用在不特别关心发送结果的场景,例如日志发送。

public class OnewayProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("onewayProducer");
        producer.setNamesrvAddr("8.131.84.120:9876");
        producer.start();
        Message message = new Message("TopicOneway","hello".getBytes(RemotingHelper.DEFAULT_CHARSET));
        // 发送单向消息,没有任何返回结果
        producer.sendOneway(message);
        producer.shutdown();
    }

消费消息有两种

1)负载均衡模式

消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同

public class ConsumerLoad {
    public static void main(String[] args) throws Exception{
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group");
        consumer.setNamesrvAddr("8.131.84.120:9876");
        //消费者订阅topic
        consumer.subscribe("TopicSync","*");
        //负载均衡模式消费
        consumer.setMessageModel(MessageModel.CLUSTERING);
        // 注册回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("consumer start");
    }
}

2)广播模式

消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的

public class ConsumerBroad {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group");
        consumer.setNamesrvAddr("8.131.84.120:9876");
        consumer.subscribe("TopicSync","*");
        //广播模式消费
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("consumer start");
    }
}

2、顺序消息

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序

下面用订单进行分区有序(messageQueue)的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列

本案例是创建了3个订单,每个订单有4个流程,为了保证被消费顺序正确,每个线程会负责单独的一个订单,实现过程先启动消费者等待消费,再启动生产者生产消息,当然记得先把rocketmq启动起来

顺序消息生产

订单实体类类

package messageType.order;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;

import java.util.ArrayList;
import java.util.List;

/**
 * 订单类
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class OrderStep {
    private long orderId; //订单id
    private String desc;  //订单描述

    /**
     * 生成模拟订单数据
     */
    static List<OrderStep> buildOrders(){
        List<OrderStep> orderList  = new ArrayList<>();
        orderList.add(new OrderStep(111111L,"create order"));
        orderList.add(new OrderStep(222222L,"create order"));
        orderList.add(new OrderStep(333333L,"create order"));
        orderList.add(new OrderStep(111111L,"pay order"));
        orderList.add(new OrderStep(222222L,"pay order"));
        orderList.add(new OrderStep(333333L,"create order"));
        orderList.add(new OrderStep(111111L,"push order"));
        orderList.add(new OrderStep(111111L,"finish order"));
        orderList.add(new OrderStep(222222L,"push order"));
        orderList.add(new OrderStep(333333L,"pay order"));
        return orderList;
    }
}

生产者

package messageType.order;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

public class OrderProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("order_group_producer");
        producer.setNamesrvAddr("8.131.84.120:9876");
        producer.start();
        //生成订单列表
        List<OrderStep> orderLists = OrderStep.buildOrders();
        //添加个时间
//        Date date = new Date();
//        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//        String dateStr = sdf.format(date);
        for (int i = 0; i < orderLists.size(); i++) {
            //获取订单消息
            String body = orderLists.get(i)+"";
            Message message = new Message("Topic_order","order","KEY"+i,body.getBytes());
            /**
             * 1、第一个参数,消息队列选择器,选中指定的消息队列对象,会将所有消息队列传进来
             * 2、第二个参数,发送的消息
             * 3、第三个参数,选择队列的业务标识(订单id)
             */
            SendResult result = producer.send(
                    message,
                    new MessageQueueSelector() {
                        /**
                         *
                         * @param mqs 队列集合
                         * @param msg 消息对象
                         * @param arg 业务标识的参数
                         * @return
                         */
                        @Override
                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                            //根据订单id选择发送queue
                            long id = (long) arg;
                            long orderId = id % mqs.size();
                            return mqs.get((int) orderId);
                        }
                    },
                    orderLists.get(i).getOrderId()); //获取订单id

                System.out.println(String.format("sendResult status:%s, queueId:%d, body:%s",
                        result.getSendStatus(),
                        result.getMessageQueue().getQueueId(),
                        body
                ));
        }
        producer.shutdown();
    }
}

返回结果为

[INFO] --- exec-maven-plugin:3.0.0:exec (default-cli) @ mq3 ---
sendResult status:SEND_OK, queueId:3, body:OrderStep(orderId=111111, desc=create order)
sendResult status:SEND_OK, queueId:2, body:OrderStep(orderId=222222, desc=create order)
sendResult status:SEND_OK, queueId:1, body:OrderStep(orderId=333333, desc=create order)
sendResult status:SEND_OK, queueId:3, body:OrderStep(orderId=111111, desc=pay order)
sendResult status:SEND_OK, queueId:2, body:OrderStep(orderId=222222, desc=pay order)
sendResult status:SEND_OK, queueId:1, body:OrderStep(orderId=333333, desc=create order)
sendResult status:SEND_OK, queueId:3, body:OrderStep(orderId=111111, desc=push order)
sendResult status:SEND_OK, queueId:3, body:OrderStep(orderId=111111, desc=finish order)
sendResult status:SEND_OK, queueId:2, body:OrderStep(orderId=222222, desc=push order)
sendResult status:SEND_OK, queueId:1, body:OrderStep(orderId=333333, desc=pay order)
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS

顺序消费消息

package messageType.order;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_order_consumer");
        consumer.setNamesrvAddr("8.131.84.120:9876");
        /**
         * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
         * 如果非第一次启动,那么按照上次消费的位置继续消费
         */
        //consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //消费者订阅topic
        consumer.subscribe("Topic_order","*");
        //消费者注册消息监听器
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    // 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
                    System.out.println(
                            "consumeThread=" + Thread.currentThread().getName()
                            + ", queueId=" + msg.getQueueId()
                            + ", content:" + new String(msg.getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.println("consumer starting");
    }
}

返回结果,每个线程单独负责处理一个订单的流程

consumer starting
consumeThread=ConsumeMessageThread_1, queueId=1, content:OrderStep(orderId=333333, desc=create order)
consumeThread=ConsumeMessageThread_1, queueId=1, content:OrderStep(orderId=333333, desc=create order)
consumeThread=ConsumeMessageThread_1, queueId=1, content:OrderStep(orderId=333333, desc=pay order)
consumeThread=ConsumeMessageThread_2, queueId=2, content:OrderStep(orderId=222222, desc=create order)
consumeThread=ConsumeMessageThread_2, queueId=2, content:OrderStep(orderId=222222, desc=pay order)
consumeThread=ConsumeMessageThread_2, queueId=2, content:OrderStep(orderId=222222, desc=push order)
consumeThread=ConsumeMessageThread_3, queueId=3, content:OrderStep(orderId=111111, desc=create order)
consumeThread=ConsumeMessageThread_3, queueId=3, content:OrderStep(orderId=111111, desc=pay order)
consumeThread=ConsumeMessageThread_3, queueId=3, content:OrderStep(orderId=111111, desc=push order)
consumeThread=ConsumeMessageThread_3, queueId=3, content:OrderStep(orderId=111111, desc=finish order)

3、延时消息

比如电商里,提交了一个订单就可以发送一个延时消息,1小时后去检查这个订单的状态,如果还是未付款就取消订单释放库存,经常遇到下单后会提醒15分钟内付款,超出时间未付款就自动取消订单

启动消费者

package messageType.delay;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class ScheduledMessageConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_delay_order_group");
        consumer.setNamesrvAddr("8.131.84.120:9876");
        consumer.subscribe("topic_delay_order","*");
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(
                            ", msgId" + msg.getMsgId()
                            + ", delayTime" + (System.currentTimeMillis() - msg.getStoreTimestamp()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.println("consumer starting...");
    }
}

启动生产者,发送延时消息

package messageType.delay;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class ScheduledMessageProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("producer_delay_order_group");
        producer.setNamesrvAddr("8.131.84.120:9876");
        producer.start();
        for (int i = 0; i < 10; i++) {
            Message message = new Message("topic_delay_order","tags","key"+i,"delay".getBytes(RemotingHelper.DEFAULT_CHARSET));
            //设置发送延时时间
            message.setDelayTimeLevel(2);
            //发送消息
            SendResult result = producer.send(message);
            //发送状态
            System.out.println(result.getSendStatus());
        }
        producer.shutdown();

    }
}

现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18

// org/apache/rocketmq/store/config/MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

4、批量消息

批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB

消费者

package messageType.batch;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.List;

public class Consumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consume = new DefaultMQPushConsumer("consumer_batch_group");
        consume.setNamesrvAddr("8.131.84.120:9876");
        consume.subscribe("topic_batch","*");
        consume.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("thread"+Thread.currentThread().getName()+","+new String(msg.getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consume.start();
        System.out.println("consumer starting...");
    }
}

生产者

package messageType.batch;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.ArrayList;
import java.util.List;

public class Producer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("producer_batch_group");
        producer.setNamesrvAddr("8.131.84.120:9876");
        producer.start();
        List<Message> list = new ArrayList<>();
        Message message1 = new Message("topic_batch","tags","key"+1,("batchMsg"+1).getBytes(RemotingHelper.DEFAULT_CHARSET));
        Message message2 = new Message("topic_batch","tags","key"+2,("batchMsg"+2).getBytes(RemotingHelper.DEFAULT_CHARSET));
        Message message3 = new Message("topic_batch","tags","key"+3,("batchMsg"+3).getBytes(RemotingHelper.DEFAULT_CHARSET));
        list.add(message1);
        list.add(message2);
        list.add(message3);
        SendResult result = producer.send(list);
        System.out.println(result.getSendStatus());
        producer.shutdown();
    }
}

返回结果

consumer starting...
threadConsumeMessageThread_1,batchMsg1
threadConsumeMessageThread_1,batchMsg2
threadConsumeMessageThread_1,batchMsg3

如果消息的总长度可能大于4MB时,这时候最好把消息进行分割

public class ListSplitter implements Iterator<List<Message>> {
   private final int SIZE_LIMIT = 1024 * 1024 * 4;
   private final List<Message> messages;
   private int currIndex;
   public ListSplitter(List<Message> messages) {
           this.messages = messages;
   }
    @Override 
    public boolean hasNext() {
       return currIndex < messages.size();
   }
   	@Override 
    public List<Message> next() {
       int nextIndex = currIndex;
       int totalSize = 0;
       for (; nextIndex < messages.size(); nextIndex++) {
           Message message = messages.get(nextIndex);
           int tmpSize = message.getTopic().length() + message.getBody().length;
           Map<String, String> properties = message.getProperties();
           for (Map.Entry<String, String> entry : properties.entrySet()) {
               tmpSize += entry.getKey().length() + entry.getValue().length();
           }
           tmpSize = tmpSize + 20; // 增加日志的开销20字节
           if (tmpSize > SIZE_LIMIT) {
               //单个消息超过了最大的限制
               //忽略,否则会阻塞分裂的进程
               if (nextIndex - currIndex == 0) {
                  //假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环
                  nextIndex++;
               }
               break;
           }
           if (tmpSize + totalSize > SIZE_LIMIT) {
               break;
           } else {
               totalSize += tmpSize;
           }

       }
       List<Message> subList = messages.subList(currIndex, nextIndex);
       currIndex = nextIndex;
       return subList;
   }
}
//把大的消息分裂成若干个小的消息
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
  try {
      List<Message>  listItem = splitter.next();
      producer.send(listItem);
  } catch (Exception e) {
      e.printStackTrace();
      //处理error
  }
}

5、过滤消息有两种,TAG 和 SQL

RocketMQ消息发送demo演示

在大多数情况下,TAG是一个简单而有用的设计,其可以来选择您想要的消息。例如:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

消费者将接收包含TAGA或TAGB或TAGC的消息。但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用。在这种情况下,可以使用SQL表达式筛选消息。SQL特性可以通过发送消息时的属性来进行计算。在RocketMQ定义的语法下,可以实现一些简单的逻辑。下面是一个例子:

------------
| message  |
|----------|  a > 5 AND b = 'abc'
| a = 10   |  --------------------> Gotten
| b = 'abc'|
| c = true |
------------
------------
| message  |
|----------|   a > 5 AND b = 'abc'
| a = 1    |  --------------------> Missed
| b = 'abc'|
| c = true |
------------

SQL基本语法

RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它

  • 数值比较,比如:>,>=,<,<=,BETWEEN,=;
  • 字符比较,比如:=,<>,IN;
  • IS NULL 或者 IS NOT NULL;
  • 逻辑符号 AND,OR,NOT;

常量支持类型为:

  • 数值,比如:123,3.1415;
  • 字符,比如:‘abc’,必须用单引号包裹起来;
  • NULL,特殊的常量
  • 布尔值,TRUE 或 FALSE

只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下:

public void subscribe(finalString topic, final MessageSelector messageSelector)

测试Tag

生产者

package messageType.tag;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class Producer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("producer_tag_group");
        producer.setNamesrvAddr("8.131.84.120:9876");
        producer.start();
        for (int i = 0; i < 3; i++) {
            //消费者只能消费 TagTopic主题下的 TAG1
            Message message = new Message("TagTopic","TAG1",("hello"+i).getBytes());
            producer.send(message);
        }
        producer.shutdown();
    }
}

消费者

package messageType.tag;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_tag_group");
        consumer.setNamesrvAddr("8.131.84.120:9876");
        //当消费多个时可以这样写,"TAG1 || TAG2"
        //还可以用 * 代表可以写所有的此主题下的TAG类型
        consumer.subscribe("TagTopic","TAG1");
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext consumeOrderlyContext) {
                for (MessageExt msg : msgs) {
                    System.out.println("thread"+Thread.currentThread().getName()+", msg"+ new String(msg.getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.println("consumer starting...");
    }
}

返回结果

consumer starting...
threadConsumeMessageThread_1, msghello2
threadConsumeMessageThread_2, msghello1
threadConsumeMessageThread_3, msghello0

SQL语法

生产者

package messageType.sql;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class Producer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("producer_tag_group");
        producer.setNamesrvAddr("8.131.84.120:9876");
        producer.start();
        for (int i = 0; i < 10; i++) {
            //消费者只能消费 TagTopic主题下的 TAG1
            Message message = new Message("SqlTopic","TAG1",("hello"+i).getBytes());
            //给消息绑定属性
            message.putUserProperty("i",String.valueOf(i));
            producer.send(message);
        }
        producer.shutdown();
    }
}

消费者

package messageType.sql;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_tag_group");
        consumer.setNamesrvAddr("8.131.84.120:9876");
        //通过sql形式从主题中筛选
        //sql92语法在mq中默认不支持,需要到conf文件夹下,broker.conf文件配置 enablePropertyFilter=true
        consumer.subscribe("SqlTopic", MessageSelector.bySql("i > 5"));
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext consumeOrderlyContext) {
                for (MessageExt msg : msgs) {
                    System.out.println("thread"+Thread.currentThread().getName()+", msg"+ new String(msg.getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.println("consumer starting...");
    }
}

返回结果

consumer starting...
threadConsumeMessageThread_1, msghello6
threadConsumeMessageThread_2, msghello7
threadConsumeMessageThread_3, msghello8
threadConsumeMessageThread_4, msghello9
上一篇:kafka 简要搭建测试


下一篇:从入门到入土(三)RocketMQ 怎么保证的消息不丢失?