RocketMq顺序消费

RocketMq如何保证消息的顺序消费:

由于RocketMq的消息都是存储在topic中,而topic中又有不同的队列,RocketMq会自动进行负载均衡处理,使消息尽量均匀的分布到不同的队列中去,而队列的属性又是先进先出,所以我们只需要确保把消息发送到同一个队列中,消费者单线程进行消费,就可以确保消息的顺序性。

producer代码:

for(int i=0;i<20;i++) {
			Message message=new Message("order_producer_topic", ("hello !"+i).getBytes());
			//顺序发送,自定义发送到那个队列的计算方式,
			try {
				producer.send(message, new MessageQueueSelector() {
					public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
						//根据传入参数计算应该放到那个队列中
						int  queueId=Integer.parseInt(arg.toString())%mqs.size();
						return mqs.get(queueId);
					}
				}, 8//该参数用于select()计算
				);
				
				//RocketMq还提供了定义好的计算队列方式  该方式是通过传入的tag 进行hash计算,然后对写队列值进行取余运算
				message.setTags("oeder message");
				producer.send(message, new SelectMessageQueueByHash(), message.getTags());
				
				//该方式是通过随机数的方式进行计算放到那个队列中, 在该方式中,第三个参数不参与计算
				producer.send(message, new SelectMessageQueueByRandoom(), "");
				
			} catch (Exception e) {
				System.out.println("顺序发送失败"+e);
			}
		}

consumer代码:

	//消费端代码      注册一个MessageListenerOrderly()进行有序监听
		consumer.registerMessageListener(new MessageListenerOrderly() {
			public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
				for (MessageExt messageExt : msgs) {
					String str=new String(messageExt.getBody());
					System.out.println("The message queue id is "+messageExt.getQueueId()+"============ message is ["+str+"]");
				}
				return ConsumeOrderlyStatus.SUCCESS;
			}
		});

producer中MessageQueueSelector接口解析

在producer中MessageQueueSelector是一个接口,可以自己实现改接口,RocketMq自己提供了三种实现方式,分别是SelectMessageQueueByHash、SelectMessageQueueByMachineRoom、SelectMessageQueueByRandoom。

其中SelectMessageQueueByHash是通过传入参数的hashcode()值与写队列数量进行取余运算,来决定该消息放到那个队列中去。

SelectMessageQueueByMachineRoom(根据机房来选择发往哪个队列,支付宝逻辑机房使用) 这是该实现类自带的注释,用于阿里自己的业务需求,可以忽略。

SelectMessageQueueByRandoom是使用随机数和写队列数量进行取余运算,随机把消息放到一个队列中去。

consumer中MessageListenerOrderly接口解析

MessageListenerOrderly和MessageListenerConcurrently都是继承MessageListener接口,区别在于MessageListenerOrderly是同一队列的消息同一时刻只能一个线程消费,

可保证消息在同一队列严格有序消费而MessageListenerConcurrently接口是同一队列的消息并行消费

上一篇:AOP中实现动态代理的两种方式


下一篇:RocketMQ使用方法