RocketMq如何保证消息的顺序消费:
由于RocketMq的消息都是存储在topic中,而topic中又有不同的队列,RocketMq会自动进行负载均衡处理,使消息尽量均匀的分布到不同的队列中去,而队列的属性又是先进先出,所以我们只需要确保把消息发送到同一个队列中,消费者单线程进行消费,就可以确保消息的顺序性。
producer代码:
for(int i=0;i<20;i++) {
Message message=new Message("order_producer_topic", ("hello !"+i).getBytes());
//顺序发送,自定义发送到那个队列的计算方式,
try {
producer.send(message, new MessageQueueSelector() {
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
//根据传入参数计算应该放到那个队列中
int queueId=Integer.parseInt(arg.toString())%mqs.size();
return mqs.get(queueId);
}
}, 8//该参数用于select()计算
);
//RocketMq还提供了定义好的计算队列方式 该方式是通过传入的tag 进行hash计算,然后对写队列值进行取余运算
message.setTags("oeder message");
producer.send(message, new SelectMessageQueueByHash(), message.getTags());
//该方式是通过随机数的方式进行计算放到那个队列中, 在该方式中,第三个参数不参与计算
producer.send(message, new SelectMessageQueueByRandoom(), "");
} catch (Exception e) {
System.out.println("顺序发送失败"+e);
}
}
consumer代码:
//消费端代码 注册一个MessageListenerOrderly()进行有序监听
consumer.registerMessageListener(new MessageListenerOrderly() {
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt messageExt : msgs) {
String str=new String(messageExt.getBody());
System.out.println("The message queue id is "+messageExt.getQueueId()+"============ message is ["+str+"]");
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
producer中MessageQueueSelector接口解析
在producer中MessageQueueSelector是一个接口,可以自己实现改接口,RocketMq自己提供了三种实现方式,分别是SelectMessageQueueByHash、SelectMessageQueueByMachineRoom、SelectMessageQueueByRandoom。
其中SelectMessageQueueByHash是通过传入参数的hashcode()值与写队列数量进行取余运算,来决定该消息放到那个队列中去。
SelectMessageQueueByMachineRoom(根据机房来选择发往哪个队列,支付宝逻辑机房使用) 这是该实现类自带的注释,用于阿里自己的业务需求,可以忽略。
SelectMessageQueueByRandoom是使用随机数和写队列数量进行取余运算,随机把消息放到一个队列中去。
consumer中MessageListenerOrderly接口解析
MessageListenerOrderly和MessageListenerConcurrently都是继承MessageListener接口,区别在于MessageListenerOrderly是同一队列的消息同一时刻只能一个线程消费,
可保证消息在同一队列严格有序消费而MessageListenerConcurrently接口是同一队列的消息并行消费