导入pom依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
消息发送者步骤分析
1.创建消息生产者producer,并制定生产者组名
2.指定Nameserver地址
3.启动producer
4.创建消息对象,指定主题Topic、Tag和消息体
5.发送消息
6.关闭生产者producer
消息消费者步骤分析
1.创建消费者Consumer,制定消费者组名
2.指定Nameserver地址
3.订阅主题Topic和Tag
4.设置回调函数,处理消息
5.启动消费者consumer
1、基本demo
消息发送
1)发送同步消息
这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知
public class SyncProducer {
public static void main(String[] args) throws Exception{
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("syncProducer_group");
// 设置NameServer的地址
producer.setNamesrvAddr("8.131.84.120:9876");
// 启动Producer实例
producer.start();
for (int i = 0; i < 5; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message message = new Message("TopicSync","TAGA","hello".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送消息到一个Broker
SendResult result = producer.send(message);
// 通过sendResult返回消息是否成功送达
System.out.println(result);
}
// 如果不再发送消息,关闭Producer实例
producer.shutdown();
}
}
2)发送异步消息
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应
public class AsyncProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("asyncProducer");
producer.setNamesrvAddr("8.131.84.120:9876");
producer.start();
//异步消息发送失败重试次数
producer.setRetryTimesWhenSendAsyncFailed(0);
// 创建消息,并指定Topic,Tag和消息体
Message message = new Message("TopicAsync","TAGA","OrderId001","hello".getBytes(RemotingHelper.DEFAULT_CHARSET));
// SendCallback接收异步返回结果的回调
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
sendResult.getMsgId();
}
@Override
public void onException(Throwable throwable) {
throwable.printStackTrace();
}
});
// 如果不再发送消息,关闭Producer实例
producer.shutdown();
}
}
3)单向发送消息
这种方式主要用在不特别关心发送结果的场景,例如日志发送。
public class OnewayProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("onewayProducer");
producer.setNamesrvAddr("8.131.84.120:9876");
producer.start();
Message message = new Message("TopicOneway","hello".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送单向消息,没有任何返回结果
producer.sendOneway(message);
producer.shutdown();
}
消费消息有两种
1)负载均衡模式
消费者采用负载均衡
方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同
public class ConsumerLoad {
public static void main(String[] args) throws Exception{
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group");
consumer.setNamesrvAddr("8.131.84.120:9876");
//消费者订阅topic
consumer.subscribe("TopicSync","*");
//负载均衡模式消费
consumer.setMessageModel(MessageModel.CLUSTERING);
// 注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("consumer start");
}
}
2)广播模式
消费者采用广播
的方式消费消息,每个消费者消费的消息都是相同的
public class ConsumerBroad {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group");
consumer.setNamesrvAddr("8.131.84.120:9876");
consumer.subscribe("TopicSync","*");
//广播模式消费
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("consumer start");
}
}
2、顺序消息
消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序
下面用订单进行分区有序(messageQueue)
的示例。一个订单的顺序流程是:创建、付款、推送、完成
。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列
本案例是创建了3个订单,每个订单有4个流程,为了保证被消费顺序正确,每个线程会负责单独的一个订单,实现过程先启动消费者等待消费,再启动生产者生产消息,当然记得先把rocketmq启动起来
顺序消息生产
订单实体类类
package messageType.order;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import java.util.ArrayList;
import java.util.List;
/**
* 订单类
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class OrderStep {
private long orderId; //订单id
private String desc; //订单描述
/**
* 生成模拟订单数据
*/
static List<OrderStep> buildOrders(){
List<OrderStep> orderList = new ArrayList<>();
orderList.add(new OrderStep(111111L,"create order"));
orderList.add(new OrderStep(222222L,"create order"));
orderList.add(new OrderStep(333333L,"create order"));
orderList.add(new OrderStep(111111L,"pay order"));
orderList.add(new OrderStep(222222L,"pay order"));
orderList.add(new OrderStep(333333L,"create order"));
orderList.add(new OrderStep(111111L,"push order"));
orderList.add(new OrderStep(111111L,"finish order"));
orderList.add(new OrderStep(222222L,"push order"));
orderList.add(new OrderStep(333333L,"pay order"));
return orderList;
}
}
生产者
package messageType.order;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
public class OrderProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("order_group_producer");
producer.setNamesrvAddr("8.131.84.120:9876");
producer.start();
//生成订单列表
List<OrderStep> orderLists = OrderStep.buildOrders();
//添加个时间
// Date date = new Date();
// SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// String dateStr = sdf.format(date);
for (int i = 0; i < orderLists.size(); i++) {
//获取订单消息
String body = orderLists.get(i)+"";
Message message = new Message("Topic_order","order","KEY"+i,body.getBytes());
/**
* 1、第一个参数,消息队列选择器,选中指定的消息队列对象,会将所有消息队列传进来
* 2、第二个参数,发送的消息
* 3、第三个参数,选择队列的业务标识(订单id)
*/
SendResult result = producer.send(
message,
new MessageQueueSelector() {
/**
*
* @param mqs 队列集合
* @param msg 消息对象
* @param arg 业务标识的参数
* @return
*/
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
//根据订单id选择发送queue
long id = (long) arg;
long orderId = id % mqs.size();
return mqs.get((int) orderId);
}
},
orderLists.get(i).getOrderId()); //获取订单id
System.out.println(String.format("sendResult status:%s, queueId:%d, body:%s",
result.getSendStatus(),
result.getMessageQueue().getQueueId(),
body
));
}
producer.shutdown();
}
}
返回结果为
[INFO] --- exec-maven-plugin:3.0.0:exec (default-cli) @ mq3 ---
sendResult status:SEND_OK, queueId:3, body:OrderStep(orderId=111111, desc=create order)
sendResult status:SEND_OK, queueId:2, body:OrderStep(orderId=222222, desc=create order)
sendResult status:SEND_OK, queueId:1, body:OrderStep(orderId=333333, desc=create order)
sendResult status:SEND_OK, queueId:3, body:OrderStep(orderId=111111, desc=pay order)
sendResult status:SEND_OK, queueId:2, body:OrderStep(orderId=222222, desc=pay order)
sendResult status:SEND_OK, queueId:1, body:OrderStep(orderId=333333, desc=create order)
sendResult status:SEND_OK, queueId:3, body:OrderStep(orderId=111111, desc=push order)
sendResult status:SEND_OK, queueId:3, body:OrderStep(orderId=111111, desc=finish order)
sendResult status:SEND_OK, queueId:2, body:OrderStep(orderId=222222, desc=push order)
sendResult status:SEND_OK, queueId:1, body:OrderStep(orderId=333333, desc=pay order)
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
顺序消费消息
package messageType.order;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_order_consumer");
consumer.setNamesrvAddr("8.131.84.120:9876");
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
//consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//消费者订阅topic
consumer.subscribe("Topic_order","*");
//消费者注册消息监听器
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
// 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
System.out.println(
"consumeThread=" + Thread.currentThread().getName()
+ ", queueId=" + msg.getQueueId()
+ ", content:" + new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("consumer starting");
}
}
返回结果,每个线程单独负责处理一个订单的流程
consumer starting
consumeThread=ConsumeMessageThread_1, queueId=1, content:OrderStep(orderId=333333, desc=create order)
consumeThread=ConsumeMessageThread_1, queueId=1, content:OrderStep(orderId=333333, desc=create order)
consumeThread=ConsumeMessageThread_1, queueId=1, content:OrderStep(orderId=333333, desc=pay order)
consumeThread=ConsumeMessageThread_2, queueId=2, content:OrderStep(orderId=222222, desc=create order)
consumeThread=ConsumeMessageThread_2, queueId=2, content:OrderStep(orderId=222222, desc=pay order)
consumeThread=ConsumeMessageThread_2, queueId=2, content:OrderStep(orderId=222222, desc=push order)
consumeThread=ConsumeMessageThread_3, queueId=3, content:OrderStep(orderId=111111, desc=create order)
consumeThread=ConsumeMessageThread_3, queueId=3, content:OrderStep(orderId=111111, desc=pay order)
consumeThread=ConsumeMessageThread_3, queueId=3, content:OrderStep(orderId=111111, desc=push order)
consumeThread=ConsumeMessageThread_3, queueId=3, content:OrderStep(orderId=111111, desc=finish order)
3、延时消息
比如电商里,提交了一个订单就可以发送一个延时消息,1小时后去检查这个订单的状态,如果还是未付款就取消订单释放库存,经常遇到下单后会提醒15分钟内付款,超出时间未付款就自动取消订单
启动消费者
package messageType.delay;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class ScheduledMessageConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_delay_order_group");
consumer.setNamesrvAddr("8.131.84.120:9876");
consumer.subscribe("topic_delay_order","*");
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(
", msgId" + msg.getMsgId()
+ ", delayTime" + (System.currentTimeMillis() - msg.getStoreTimestamp()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("consumer starting...");
}
}
启动生产者,发送延时消息
package messageType.delay;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class ScheduledMessageProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("producer_delay_order_group");
producer.setNamesrvAddr("8.131.84.120:9876");
producer.start();
for (int i = 0; i < 10; i++) {
Message message = new Message("topic_delay_order","tags","key"+i,"delay".getBytes(RemotingHelper.DEFAULT_CHARSET));
//设置发送延时时间
message.setDelayTimeLevel(2);
//发送消息
SendResult result = producer.send(message);
//发送状态
System.out.println(result.getSendStatus());
}
producer.shutdown();
}
}
现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18
// org/apache/rocketmq/store/config/MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
4、批量消息
批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB
消费者
package messageType.batch;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consume = new DefaultMQPushConsumer("consumer_batch_group");
consume.setNamesrvAddr("8.131.84.120:9876");
consume.subscribe("topic_batch","*");
consume.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("thread"+Thread.currentThread().getName()+","+new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consume.start();
System.out.println("consumer starting...");
}
}
生产者
package messageType.batch;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.ArrayList;
import java.util.List;
public class Producer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("producer_batch_group");
producer.setNamesrvAddr("8.131.84.120:9876");
producer.start();
List<Message> list = new ArrayList<>();
Message message1 = new Message("topic_batch","tags","key"+1,("batchMsg"+1).getBytes(RemotingHelper.DEFAULT_CHARSET));
Message message2 = new Message("topic_batch","tags","key"+2,("batchMsg"+2).getBytes(RemotingHelper.DEFAULT_CHARSET));
Message message3 = new Message("topic_batch","tags","key"+3,("batchMsg"+3).getBytes(RemotingHelper.DEFAULT_CHARSET));
list.add(message1);
list.add(message2);
list.add(message3);
SendResult result = producer.send(list);
System.out.println(result.getSendStatus());
producer.shutdown();
}
}
返回结果
consumer starting...
threadConsumeMessageThread_1,batchMsg1
threadConsumeMessageThread_1,batchMsg2
threadConsumeMessageThread_1,batchMsg3
如果消息的总长度可能大于4MB时,这时候最好把消息进行分割
public class ListSplitter implements Iterator<List<Message>> {
private final int SIZE_LIMIT = 1024 * 1024 * 4;
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override
public boolean hasNext() {
return currIndex < messages.size();
}
@Override
public List<Message> next() {
int nextIndex = currIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; // 增加日志的开销20字节
if (tmpSize > SIZE_LIMIT) {
//单个消息超过了最大的限制
//忽略,否则会阻塞分裂的进程
if (nextIndex - currIndex == 0) {
//假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环
nextIndex++;
}
break;
}
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tmpSize;
}
}
List<Message> subList = messages.subList(currIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
}
//把大的消息分裂成若干个小的消息
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
try {
List<Message> listItem = splitter.next();
producer.send(listItem);
} catch (Exception e) {
e.printStackTrace();
//处理error
}
}
5、过滤消息有两种,TAG 和 SQL
在大多数情况下,TAG是一个简单而有用的设计,其可以来选择您想要的消息。例如:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
消费者将接收包含TAGA或TAGB或TAGC的消息。但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用。在这种情况下,可以使用SQL表达式筛选消息。SQL特性可以通过发送消息时的属性来进行计算。在RocketMQ定义的语法下,可以实现一些简单的逻辑。下面是一个例子:
------------
| message |
|----------| a > 5 AND b = 'abc'
| a = 10 | --------------------> Gotten
| b = 'abc'|
| c = true |
------------
------------
| message |
|----------| a > 5 AND b = 'abc'
| a = 1 | --------------------> Missed
| b = 'abc'|
| c = true |
------------
SQL基本语法
RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它
- 数值比较,比如:>,>=,<,<=,BETWEEN,=;
- 字符比较,比如:=,<>,IN;
- IS NULL 或者 IS NOT NULL;
- 逻辑符号 AND,OR,NOT;
常量支持类型为:
- 数值,比如:123,3.1415;
- 字符,比如:‘abc’,必须用单引号包裹起来;
- NULL,特殊的常量
- 布尔值,TRUE 或 FALSE
只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下:
public void subscribe(finalString topic, final MessageSelector messageSelector)
测试Tag
生产者
package messageType.tag;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
public class Producer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("producer_tag_group");
producer.setNamesrvAddr("8.131.84.120:9876");
producer.start();
for (int i = 0; i < 3; i++) {
//消费者只能消费 TagTopic主题下的 TAG1
Message message = new Message("TagTopic","TAG1",("hello"+i).getBytes());
producer.send(message);
}
producer.shutdown();
}
}
消费者
package messageType.tag;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_tag_group");
consumer.setNamesrvAddr("8.131.84.120:9876");
//当消费多个时可以这样写,"TAG1 || TAG2"
//还可以用 * 代表可以写所有的此主题下的TAG类型
consumer.subscribe("TagTopic","TAG1");
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext consumeOrderlyContext) {
for (MessageExt msg : msgs) {
System.out.println("thread"+Thread.currentThread().getName()+", msg"+ new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("consumer starting...");
}
}
返回结果
consumer starting...
threadConsumeMessageThread_1, msghello2
threadConsumeMessageThread_2, msghello1
threadConsumeMessageThread_3, msghello0
SQL语法
生产者
package messageType.sql;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
public class Producer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("producer_tag_group");
producer.setNamesrvAddr("8.131.84.120:9876");
producer.start();
for (int i = 0; i < 10; i++) {
//消费者只能消费 TagTopic主题下的 TAG1
Message message = new Message("SqlTopic","TAG1",("hello"+i).getBytes());
//给消息绑定属性
message.putUserProperty("i",String.valueOf(i));
producer.send(message);
}
producer.shutdown();
}
}
消费者
package messageType.sql;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_tag_group");
consumer.setNamesrvAddr("8.131.84.120:9876");
//通过sql形式从主题中筛选
//sql92语法在mq中默认不支持,需要到conf文件夹下,broker.conf文件配置 enablePropertyFilter=true
consumer.subscribe("SqlTopic", MessageSelector.bySql("i > 5"));
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext consumeOrderlyContext) {
for (MessageExt msg : msgs) {
System.out.println("thread"+Thread.currentThread().getName()+", msg"+ new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("consumer starting...");
}
}
返回结果
consumer starting...
threadConsumeMessageThread_1, msghello6
threadConsumeMessageThread_2, msghello7
threadConsumeMessageThread_3, msghello8
threadConsumeMessageThread_4, msghello9