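Ordered messaging means that messages are consumed in the order in which they were sent. RocketMQ can guarantee this order for messages sent to the same message queue; it does not order messages across the different queues of a topic.
In the MQ model, ordering has to be preserved in three stages:
1) Messages are sent in order.
2) Messages are stored in the same order in which they were sent.
3) Messages are consumed in the same order in which they were stored.
Sending in order means that, for messages with an ordering requirement, the producer should send them synchronously from a single thread. Storing in send order means that if messages A and B are sent in that order from the same thread, A must be stored before B. Consuming in storage order means that once A and B reach the Consumer, they must be processed in the order A, then B.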
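The three stages above can be modeled with a minimal in-memory sketch. It is not RocketMQ code: the class OrderingStagesSketch and its methods are hypothetical, and a plain FIFO queue stands in for a single message queue on the broker.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class OrderingStagesSketch {

    // Stand-in for one message queue on the broker: strictly first in, first out.
    private final Queue<String> queue = new ArrayDeque<>();

    // Stages 1 and 2: a synchronous send returns only after the message is appended,
    // so two sends from one thread are stored in the order they were issued.
    public void send(String message) {
        queue.add(message);
    }

    // Stage 3: a single consumer drains the queue from the head, one message at a time.
    public List<String> consumeAll() {
        List<String> consumed = new ArrayList<>();
        String message;
        while ((message = queue.poll()) != null) {
            consumed.add(message);
        }
        return consumed;
    }

    public static void main(String[] args) {
        OrderingStagesSketch sketch = new OrderingStagesSketch();
        sketch.send("A");
        sketch.send("B");
        System.out.println(sketch.consumeAll()); // prints [A, B]
    }
}
```

If sends came from several threads, or if A and B landed in different queues, none of the three stages would hold on its own, which is why the requirement is stated per thread and per queue.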
1. Producer
Create an OrderProducer class:
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.io.UnsupportedEncodingException;
import java.util.List;
public class OrderProducer {
    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
        // 1. Create the producer and specify the producer group name
        DefaultMQProducer producer = new DefaultMQProducer("demo_producer_group");
        // 2. Specify the NameServer address
        producer.setNamesrvAddr("192.168.195.128:9876");
        // 3. Start the producer
        producer.start();
        // 4. Create the messages
        for (int i = 0; i < 5; i++) {
            Message msg = new Message("TopicOrderDemo",
                    "Tag1",
                    "Key" + i,
                    ("HelloWorld" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 5. Send each message synchronously, always to the same queue
            SendResult result = producer.send(msg, new MessageQueueSelector() {
                /**
                 * Selects the queue that the message is sent to.
                 *
                 * @param mqs the message queues available for the topic
                 * @param msg the message being sent
                 * @param arg the argument passed to send(); here, the target queue index
                 * @return the selected queue
                 */
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    // Read the queue index from the argument
                    Integer index = (Integer) arg;
                    // Return the queue at that index
                    return mqs.get(index);
                }
            }, 1);
            System.out.println(result);
        }
        // 6. Shut down the producer
        producer.shutdown();
    }
}
Output:
SendResult [sendStatus=SEND_OK, msgId=C0A8017A385818B4AAC250DAEAF70000, offsetMsgId=C0A8C38000002A9F000000000000086F, messageQueue=MessageQueue [topic=TopicOrderDemo, brokerName=rocketmq-nameserver1, queueId=1], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=C0A8017A385818B4AAC250DAEB130001, offsetMsgId=C0A8C38000002A9F0000000000000937, messageQueue=MessageQueue [topic=TopicOrderDemo, brokerName=rocketmq-nameserver1, queueId=1], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=C0A8017A385818B4AAC250DAEB1C0002, offsetMsgId=C0A8C38000002A9F00000000000009FF, messageQueue=MessageQueue [topic=TopicOrderDemo, brokerName=rocketmq-nameserver1, queueId=1], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=C0A8017A385818B4AAC250DAEB1E0003, offsetMsgId=C0A8C38000002A9F0000000000000AC7, messageQueue=MessageQueue [topic=TopicOrderDemo, brokerName=rocketmq-nameserver1, queueId=1], queueOffset=3]
SendResult [sendStatus=SEND_OK, msgId=C0A8017A385818B4AAC250DAEB210004, offsetMsgId=C0A8C38000002A9F0000000000000B8F, messageQueue=MessageQueue [topic=TopicOrderDemo, brokerName=rocketmq-nameserver1, queueId=1], queueOffset=4]
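The producer above pins every message to queue index 1, which is why every SendResult shows queueId=1. A common alternative is to derive the index from a business key, so that messages sharing a key share a queue while different keys spread across queues. The following is a minimal sketch of that mapping only. It does not call RocketMQ, and the class QueueIndexSketch, the method queueIndexFor, the order IDs, and the queue count of 4 are all assumptions made for illustration.

```java
public class QueueIndexSketch {

    // Maps a business key to a queue index in [0, queueCount).
    // Math.floorMod keeps the result non-negative even when hashCode() is negative.
    public static int queueIndexFor(Object shardingKey, int queueCount) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        return Math.floorMod(shardingKey.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        int queueCount = 4; // assumed number of queues in the topic
        long[] orderIds = {1001L, 1002L, 1001L, 1003L, 1002L}; // hypothetical order IDs
        for (long orderId : orderIds) {
            // The same orderId always maps to the same index
            System.out.println("orderId=" + orderId + " -> queue " + queueIndexFor(orderId, queueCount));
        }
    }
}
```

Inside a MessageQueueSelector, the same idea would read roughly as mqs.get(queueIndexFor(arg, mqs.size())), with the business key passed as the last argument of send() instead of the fixed index 1.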
2. Consumer
Create an OrderConsumer class:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.io.UnsupportedEncodingException;
import java.util.List;
public class OrderConsumer {
    public static void main(String[] args) throws MQClientException {
        // 1. Create the consumer and specify the consumer group name
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_producer_group");
        // 2. Specify the NameServer address
        consumer.setNamesrvAddr("192.168.195.128:9876");
        // 3. Subscribe to the topic and tags
        consumer.subscribe("TopicOrderDemo", // topic to consume
                "*"); // tag filter; "*" matches all tags
        // 4. Register an orderly message listener
        consumer.setMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                // Process the messages in the order they were delivered
                for (MessageExt msg : msgs) {
                    // Topic of the message
                    String topic = msg.getTopic();
                    // Tags of the message
                    String tags = msg.getTags();
                    // Message body
                    byte[] body = msg.getBody();
                    try {
                        String result = new String(body, RemotingHelper.DEFAULT_CHARSET);
                        System.out.println("OrderConsumer consumed message - Topic: " + topic + ", Tags: " + tags + ", Result: " + result);
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                        // Pause this queue for a moment before retrying
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        // 5. Start the consumer
        consumer.start();
    }
}
Output:
OrderConsumer consumed message - Topic: TopicOrderDemo, Tags: Tag1, Result: HelloWorld0
OrderConsumer consumed message - Topic: TopicOrderDemo, Tags: Tag1, Result: HelloWorld1
OrderConsumer consumed message - Topic: TopicOrderDemo, Tags: Tag1, Result: HelloWorld2
OrderConsumer consumed message - Topic: TopicOrderDemo, Tags: Tag1, Result: HelloWorld3
OrderConsumer consumed message - Topic: TopicOrderDemo, Tags: Tag1, Result: HelloWorld4
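The listener above returns SUSPEND_CURRENT_QUEUE_A_MOMENT on failure instead of skipping the message. The sketch below is a simplified in-memory model of why that matters for ordering: a failed message is retried at the head of the queue, so later messages cannot overtake it. It is not RocketMQ's implementation, and the class OrderlyRetrySketch, the method drainInOrder, the maxAttempts parameter, and the failing handler are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

public class OrderlyRetrySketch {

    // Drains one queue in order. When the handler reports failure, the same message
    // is retried (up to maxAttempts) instead of being skipped, so later messages
    // never overtake it.
    public static List<String> drainInOrder(Deque<String> queue, Predicate<String> handler, int maxAttempts) {
        List<String> processed = new ArrayList<>();
        while (!queue.isEmpty()) {
            String head = queue.peekFirst();
            boolean ok = false;
            for (int attempt = 1; attempt <= maxAttempts && !ok; attempt++) {
                ok = handler.test(head);
            }
            if (!ok) {
                break; // stop without advancing; the failed message stays at the head
            }
            processed.add(queue.pollFirst());
        }
        return processed;
    }

    public static void main(String[] args) {
        Deque<String> queue = new ArrayDeque<>(Arrays.asList("HelloWorld0", "HelloWorld1", "HelloWorld2"));
        int[] calls = {0};
        // Hypothetical handler: fails on its second invocation (the first attempt at HelloWorld1)
        Predicate<String> handler = msg -> {
            calls[0]++;
            return !(msg.equals("HelloWorld1") && calls[0] == 2);
        };
        System.out.println(drainInOrder(queue, handler, 3)); // prints [HelloWorld0, HelloWorld1, HelloWorld2]
    }
}
```

HelloWorld1 is handled twice in this run, yet it still precedes HelloWorld2 in the result, which is the property an orderly consumer needs.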