【消息中间件JMS java message service】RocketMQ

基础内容

角色

nameserver

注册中心。可以部署多个实现高可用。所有broker会向nameserver上报,但nameserver之间不互相通报。所有数据放在内存中(可配置成持久化,但没有必要)

broker

实例

// 启动mq broker(向指定中心注册)
./mqbroker -n localhost:9876

启动的时候,会向所有的nameser注册,建立长链接,定时上报

master(支持读、写消息)

slave(只能读)

组成

queue

在rocketMQ中,只存在queue,queue的消费模式,是由消费者指定的。

topic

topic是一个逻辑概念:多个queue组成一个topic,消费这个topic即可以消费这个topic下的所有queue

producerGroup、consumerGroup

同一组内的设置,比如consumer的模式,过滤器的条件等等,都需要一致

广播消息

消费者决定消息是一次消费还是可以广播。
MessageModel.CLUSTERING 保证每个集群中的任意一个consumer消费一次
MessageModel.BROADCASTING 每一个consumer都要消费

// 广播消息,类似于activeMq中的topic模式,
        consumer.setMessageModel(MessageModel.BROADCASTING);

消息过滤

  • tag ,new msg的时候可以标定tag,用来做标记。用sql过滤需要在broker.conf开启配置项 enablePropertyFilter=true
consumer.subscribe(topicName, tag); // 接收消息可以用tag过滤
// 也可以用sql表达式过滤tag
Message msg = new Message(topicName,"helloWorld".getBytes());
 msg.putUserProperty("age", String.valueOf(18));

MessageSelector selector = MessageSelector.bySql("age ==5")第一次;
consumer.subscribe(topicName, selector);

事务

rocketMQ使用的是2pc。(区别是 tcc会local住资源)

  • 2pc :两阶段提交。第一次是常识提交,等第二次提交确认,才算一次真正的提交。接受者必须支持这种模式
  • tcc:try confirm cancel
    【消息中间件JMS java message service】RocketMQ

实现方式

  • half message:预处理消息,当broker收到此类消息后,会存储到RMQ_SYS_TRANS_HALF_TOPIC的消息消费队列中
  • 检查事务状态:broker会开启一个定时任务,消费RMQ_SYS_TRANS_HALF_TOPIC队列中的消息,每次执行任务会向消息发送者确认事物执行状态(提交、回滚、未知),等待下一次回调。
  • 超时:如果超过回查次数,默认回滚消息
  • transactionListener的两个方法:
    • executeLocalTransaction:半消息发送成功触发此方法来执行本地事物
    • checkLocalTransaction:broker将发送检查消息来检查事务状态,并将调用次方法来获取本地事务状态
  • 本地事务执行状态:
    • LocalTransactionState.COMMIT_MESSAG。执行事务成功,确认提交
    • LocalTransactionState.ROLLBACK_MESSAGE。回滚消息,broker端回删除半消息
    • LocalTransactionState.UNKONW。未知状态,等待broker回查

面试题

如何保证消息的顺序性?

往同一个topic下的同一个queue发送数据,且发数据用一个线程去发,保证发送的顺序性。
(事务producer可以设置线程池)
消费者必须要设置一个线程,并且用order的监听器监听。

上一篇:【Kafka主题/分区/日志/消费顺序】


下一篇:技术高手锤炼记