基础内容
角色
nameserver
注册中心。可以部署多个实现高可用。所有broker会向nameserver上报,但nameserver之间不互相通报。所有数据放在内存中(可配置成持久化,但没有必要)
broker
实例
// 启动mq broker(向指定中心注册)
./mqbroker -n localhost:9876
启动的时候,会向所有的nameser注册,建立长链接,定时上报
master(支持读、写消息)
slave(只能读)
组成
queue
在rocketMQ中,只存在queue,queue的消费模式,是由消费者指定的。
topic
topic是一个逻辑概念:多个queue组成一个topic,消费这个topic即可以消费这个topic下的所有queue
producerGroup、consumerGroup
同一组内的设置,比如consumer的模式,过滤器的条件等等,都需要一致
广播消息
消费者决定消息是一次消费还是可以广播。
MessageModel.CLUSTERING 保证每个集群中的任意一个consumer消费一次
MessageModel.BROADCASTING 每一个consumer都要消费
// 广播消息,类似于activeMq中的topic模式,
consumer.setMessageModel(MessageModel.BROADCASTING);
消息过滤
- tag ,new msg的时候可以标定tag,用来做标记。用sql过滤需要在broker.conf开启配置项 enablePropertyFilter=true
consumer.subscribe(topicName, tag); // 接收消息可以用tag过滤
// 也可以用sql表达式过滤tag
Message msg = new Message(topicName,"helloWorld".getBytes());
msg.putUserProperty("age", String.valueOf(18));
MessageSelector selector = MessageSelector.bySql("age ==5")第一次;
consumer.subscribe(topicName, selector);
事务
rocketMQ使用的是2pc。(区别是 tcc会local住资源)
- 2pc :两阶段提交。第一次是常识提交,等第二次提交确认,才算一次真正的提交。接受者必须支持这种模式
- tcc:try confirm cancel
实现方式
- half message:预处理消息,当broker收到此类消息后,会存储到RMQ_SYS_TRANS_HALF_TOPIC的消息消费队列中
- 检查事务状态:broker会开启一个定时任务,消费RMQ_SYS_TRANS_HALF_TOPIC队列中的消息,每次执行任务会向消息发送者确认事物执行状态(提交、回滚、未知),等待下一次回调。
- 超时:如果超过回查次数,默认回滚消息
- transactionListener的两个方法:
- executeLocalTransaction:半消息发送成功触发此方法来执行本地事物
- checkLocalTransaction:broker将发送检查消息来检查事务状态,并将调用次方法来获取本地事务状态
- 本地事务执行状态:
- LocalTransactionState.COMMIT_MESSAG。执行事务成功,确认提交
- LocalTransactionState.ROLLBACK_MESSAGE。回滚消息,broker端回删除半消息
- LocalTransactionState.UNKONW。未知状态,等待broker回查
面试题
如何保证消息的顺序性?
往同一个topic下的同一个queue发送数据,且发数据用一个线程去发,保证发送的顺序性。
(事务producer可以设置线程池)
消费者必须要设置一个线程,并且用order的监听器监听。