RocketMQ 如何保证消息不丢失

产生消息丢失的场景

rocketmq的消息传递分为3个阶段:
(1)生产者发送消息到broker的队列中
(2)broker存储消息
(3)消费者到队列获取消息进行消费

而这三个阶段可能会导致消息丢失的场景是什么呢?
(1)生产者发送消息到broker的队列中
生产者在发出消息后,可能因为网络异常、broker宕机,导致发出的消息实际并没有到达broker
(2)broker存储消息
broker的存储机制是将消息先存储到内存,存储完成后再发送回执给生产者,然后再异步将数据刷到磁盘,但如果在这个刷盘这个过程中broker宕机了,也会导致消息丢失

(3)消费者到队列获取消息进行消费
broker在将消息发出后,同样可能因为网络异常、消费者宕机或者消息者消费到一半产生错误等因素,导致消息实际并没有被消费者消费,但broker又扣除了这条消息,就会导致消息丢失

防丢失措施

(1)生产阶段

保证消息可靠的手段包括多节点部署broker、同步发送、消息重发。这几种方式实际上是可以配合使用的,比如多节点部署,通过同步发送,发送失败时进行3次重发,都重发失败则记录状态。

1、多节点部署,可以部署主从或集群模式

2、同步发送,主要通过producer.send来实现同步发送;发送重试,需要根据send.getSendStatus()状态来判断

public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
	// 声明group
	DefaultMQProducer producer = new DefaultMQProducer("group_test");

	// 声明namesrv地址
	producer.setNamesrvAddr("localhost:9876");

	// 设置重试次数
	producer.setRetryTimesWhenSendFailed(3);
	// 启动实例
	producer.start();

	// 设置消息的topic,tag以及消息体
	Message msg = new Message("topic_test", "tag_test", "消息内容".getBytes(StandardCharsets.UTF_8));

	// 发送消息,并设置10s连接超时
	SendResult send = producer.send(msg, 10000);
	System.out.println("发送结果:"+send);

    // 发送重试
    if(!send.getSendStatus().equals(SendStatus.SEND_OK)){
		// 发送失败,手动重发
		send = producer.send(msg, 10000);
	}

	// 关闭实例
	producer.shutdown();
}

(2)存储阶段
1、主要将broker的刷盘策略设置为同步刷盘,需要修改broker.conf配置文件
# 设置为同步刷盘模式
flushDiskType = SYNC_FLUSH

2、如果配置的是多节点,一般是主从模式,为了防止主节点有数据,从节点没刷到数据的情况,就需要开启从节点刷盘后再返回ACK回执给生产者,需要修改从节点broker配置文件
# 默认为 ASYNC_MASTER
brokerRole=SYNC_MASTER

broker提供了两种主从同步模式:ASYNC_MASTER异步 和 SYNC_MASTER同步
ASYNC_MASTER:消息发送到master节点后,开启一个异步线程更新给从节点,这个过程中有消息同步丢失的风险,优点是性能高
SYNC_MASTER:消息发送到master节点后,同步更新到从节点,当从节点更新完再返回成功的ACK回执给生产者,表示消息发送成功,可靠性高,但性能会有所下降

(3)消费阶段

其实现是消费后返回成功状态,如果返回的是ConsumeConcurrentlyStatus.RECONSUME_LATER状态,消费者就会触发稍后重试机制进行重新消费,同样的可以通过consumer.setMaxReconsumeTimes设置最大重试次数

	DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_test");

	consumer.setNamesrvAddr("127.0.0.1:9876");

	// 集群消费模式
	consumer.setMessageModel(MessageModel.CLUSTERING);

	// 设置topic
	consumer.subscribe("topic_test", "*");

	// 设置重试次数
	consumer.setMaxReconsumeTimes(3);

	// 注册回调函数,处理消息
	consumer.registerMessageListener(new MessageListenerConcurrently() {
		@Override
		public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
			for (MessageExt msg : list) {
				String topic = msg.getTopic();
				try {
					String messageBody = new String(msg.getBody(), "utf-8");
					System.out.println(topic+":"+messageBody);
				} catch (UnsupportedEncodingException e) {
					e.printStackTrace();
				}
			}
			return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
		}
	});

	// 启动消费者实例
	consumer.start();

上一篇:搭子小程序定制开发:全新找搭子之旅


下一篇:ODOO学习笔记(1):ODOO的SWOT分析和技术优势是什么?