RocketMQ学习笔记

RocketMQ学习笔记

1. RocketMQ介绍

1.1 官方API

1.1.1 概念和特性

1.1.2 架构设计

  • 架构(Architecture):介绍RocketMQ部署架构和技术架构。
  • 设计(Design): 介绍RocketMQ关键机制的设计原理,主要包括消息存储、通信机制、消息过滤、负载均衡、事务消息等。

1.1.3 样例

  • 样例(Example) :介绍RocketMQ的常见用法,包括基本样例、顺序消息样例、延时消息样例、批量消息样例、过滤消息样例、事务消息样例等。

1.1.4 最佳实践

1.1.5 运维管理

  • 集群部署(Operation):介绍单Master模式、多Master模式、多Master多slave模式等RocketMQ集群各种形式的部署方法以及运维工具mqadmin的使用方式。

1.1.6 API Reference

2.1 使用场景、安装和优缺点

背景:RocketMQ是阿里巴巴双十一官方指定消息产品,支撑阿里巴巴集团所有的消息服务,历经十余年高可用与高可靠的严苛考验,是阿里巴巴交易链路的核心产品;

2.1.1 使用场景

  1. 解耦

    场景: A 系统发送数据到 BCD 三个系统,通过接口调用发送。如果 E 系统也要这个数据呢?那如果 C 系统现在不需要了呢?A 系统负责人几乎崩溃…

    在这个场景中,A 系统跟其它各种乱七八糟的系统严重耦合,A 系统产生一条比较关键的数据,很多系统都需要 A 系统将这个数据发送过来。A 系统要时时刻刻考虑 BCDE 四个系统如果挂了该咋办?要不要重发,要不要把消息存起来?头发都白了啊!

    如果使用 MQ,A 系统产生一条数据,发送到 MQ 里面去,哪个系统需要数据自己去 MQ 里面消费。如果新系统需要数据,直接从 MQ 里消费即可;如果某个系统不需要这条数据了,就取消对 MQ 消息的消费即可。这样下来,A 系统压根儿不需要去考虑要给谁发送数据,不需要维护这个代码,也不需要考虑人家是否调用成功、失败超时等情况。

  2. 异步

    场景: A 系统接收一个请求,需要在自己本地写库,还需要在 BCD 三个系统写库,自己本地写库要 3ms,BCD 三个系统分别写库要 300ms、450ms、200ms。最终请求总延时是 3 + 300 + 450 + 200 = 953ms,接近 1s,用户感觉搞个什么东西,慢死了慢死了。用户通过浏览器发起请求,等待个 1s,这几乎是不可接受的。

    如果使用 MQ,那么 A 系统连续发送 3 条消息到 MQ 队列中,假如耗时 5ms,A 系统从接受一个请求到返回响应给用户,总时长是 3 + 5 = 8ms,对于用户而言,其实感觉上就是点个按钮,8ms 以后就直接返回了。

  3. 削峰

    场景:每天 0:00 到 12:00,A 系统风平浪静,每秒并发请求数量就 50 个。结果每次一到 12:00 ~ 13:00 ,每秒并发请求数量突然会暴增到 5k+ 条。但是系统是直接基于 MySQL 的,大量的请求涌入 MySQL,每秒钟对 MySQL 执行约 5k 条 SQL。一般的 MySQL,扛到每秒 2k 个请求就差不多了,如果每秒请求到 5k 的话,可能就直接把 MySQL 给打死了,导致系统崩溃,用户也就没法再使用系统了。但是高峰期一过,到了下午的时候,就成了低峰期,可能也就 1w 的用户同时在网站上操作,每秒中的请求数量可能也就 50 个请求,对整个系统几乎没有任何的压力。

    如果使用 MQ,每秒 5k 个请求写入 MQ,A 系统每秒钟最多处理 2k 个请求,因为 MySQL 每秒钟最多处理 2k 个。A 系统从 MQ 中慢慢拉取请求,每秒钟就拉取 2k 个请求,不要超过自己每秒能处理的最大请求数量就 ok,这样下来,哪怕是高峰期的时候,A 系统也绝对不会挂掉。而 MQ 每秒钟 5k 个请求进来,就 2k 个请求出去,结果就导致在中午高峰期(1 个小时),可能有几十万甚至几百万的请求积压在 MQ 中。

2.1.2 安装步骤

  • 下载RocketMQ

    RocketMQ最新版本:4.9.0

    # 解压zip
    unzip rocketmq-all-4.9.0-bin-release.zip
    
  • 环境要求

    • Linux64位操作系统
    • JDK1.8(64位)
    • 源码安装需要Maven3.2.x

2.1.3 使用优缺点

  • 优点
  • 缺点

2.2 启动和关闭RocketMQ

  1. 启动和关闭NameServer
# 1.启动NameServer
nohup sh bin/mqnamesrv &
# 2.查看日志
tail -f ~/logs/rocketmqlogs/namesrv.log
# 3.关闭NameServer
sh bin/mqshutdown namesrv
  1. 启动和关闭Broker
# 1.启动Broker autoCreateTopicEnable=true:允许自动创建Topic,且允许通过手工方式创建Topic
nohup sh bin/mqbroker -c ./conf/broker.conf -n localhost:9876 autoCreateTopicEnable=true & 
# 2.查看日志
tail -f ~/logs/rocketmqlogs/broker.log
# 3.关闭Broker
sh bin/mqshutdown broker
  1. 修改默认内存大小【内存大小不足的情况下】
# 1.修改runserver.sh 初始化内存大小
JAVA_OPT="${JAVA_OPT} -server -Xms521m -Xmx521m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
# 2.修改runbroker.sh 初始化内存大小
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m"

2.3 测试RocketMQ

2.3.1 发送消息

# 1.设置环节变量
export NAMESRV_ADDR=localhost:9876
# 2.使用安装的demo包发送消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

2.3.2 接受消息

# 1.设置环节变量
export NAMESRV_ADDR=localhost:9876
# 2.接受消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

2.4 组件

  • Broker:实际处理业务的组件【消息的存储、转发】
  • NameServer:保存Broker相关的服务信息 类似注册中心
  • 消费者:消费消息
  • 生产者:消息源头

白话文理解组件相关作用

  1. 由生产者产生消息经过NameServer投递至Broker中
  2. 消费者从NameServer中调用Broker获取消息

2.4.1 关闭防火墙

# 1.关闭防火墙
systemctl stop firewalld.service

2.4.2 架构原理

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-VB6ga1yd-1625476775579)(C:\Users\Laisheng\AppData\Roaming\Typora\typora-user-images\1625210735249.png)]

  • 四种集群模式

    • 单Master模式:单个Master节点, 缺点就是如果宕机之后可能整个服务不可用;
    • 多Master模式:多个Master节点,分摊存放我们的消息,缺点:没有Slave节点,主的Master节点宕机之后消息数据可能会丢失的;
    • 多Master多Slave模式-异步复制:多个Master和Slave节点 采用异步形式 效率非常高 数据可能短暂产生延迟(毫秒级别的)
    • 多Master多Slave模式-同步双写:多个Master和Slave节点 采用同步形式, 效率比较低、数据不会产生延迟。
  • 集群配置

    # 集群名称,可以区分不同集群,不同的业务可以建多个集群
    brokerClusterName=mayikt
    # Broker 的名称Master(主)和Slave(备)通过使用相同的Broker名称来表明相互关系,以说明某个Slave 是哪个Master的Slave 。
    brokerName=broker-a
    # 一个MasterBroker中可以有多个Slave, 0表示Master,大于0表示不同Slave 的ID 。
    brokerId=0 
    # 与fileReservedTim巳参数呼应,表明在几点做消息删除动作,默认值04 表示凌晨4 点。
    deleteWhen=04
    # nameServer集群配置地址
    namesrvAddr=192.168.1.1:9876;192.168.1.2:9876
    # 开启Topic自动创建
    autoCreateTopicEnable=true
    # topic默认创建的队列数
    defaultTopicQueueNums=4
    # 是否允许Broker自动创建订阅组,建议线下开启,线上关闭,默认【true】
    autoCreateSubscriptionGroup=true
    # Broker 监听的端口号,如果一台机器上启动了多个Broker , 则要设置不同的端口号,避免冲突。
    listenPort=10911
    brokerIp=192.168.1.12.5
    

2.5 分布式事务

2.5.1 产生背景

微服务多数据源背景下:

​ A服务是一个数据源,B服务是一个数据源,A服务需要异步推送数据至B服务,此时A服务已推送数据完毕的情况下,出现了异常情况,导致数据回滚,但是此时B服务已经收到了数据,通俗点来讲:就是一哥们去银行取钱,取了5W块钱,但是银行卡中的数字金额并没产生变化,所以产生了事务一致性问题。

2.5.2 解决思路

半消息:相当于消息投递过去,但是有个标识觉得是否真实投递至MQ中。

  1. 生产者(消息投递方)投递事务消息到Broker中,设置该消息为半消息不可以被消费;
  2. 开始执行我们的本地事务,将本地事务执行的结果(提交/回滚)发送给Broker;
  3. Broker获取回滚或者提交,如果是回滚的情况则删除该消息、如果是提交的话,该消息就可以被消费者消费;
  4. Broker如果没有及时的获取发送方本地事务结果的话,会主动查询本地事务结果。

2.RocketMQ使用

2.1 环境搭建

2.1.1 单机版

  1. 下载和安装RocketMQ 【参考2.1.2

  2. 修改RocketMQ默认内存大小【参考2.2

  3. 启动NameServer服务 【参考2.2

  4. 启动broker服务 【参考2.2

  5. 验证服务是否启动:jps

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-nTGC5HUH-1625476775582)(C:\Users\Laisheng\AppData\Roaming\Typora\typora-user-images\1625448615536.png)]

2.1.2 集群版

四种集群模式:操作原理相同

这里只演示一种常用的:多个Master和Slave节点-异步复制

  1. 准备两台机器【192.168.1.1,192.168.1.2】,安装单机版步骤部署好所有环境

    每台机器上都要启动一个Master角色和Slave角色的Broker,并互为主备

    1. 即在A机器上启动broker-a的master节点、broker-b-s的slave节点;
    2. 在B机器上启动broker-b的master节点、broker-a-s的slave节点。
  2. 修改配置文件

    在conf目录下提供了几种集群方式配置文件的示例

    1. 2m-noslave:双master模式;

      1. 2m-2s-sync:双master双slave同步双写模式;
      2. 2m-2s-async:双master双slave异步复制模式;
    • 编辑192.168.1.1机器上的Master Broker的配置文件broker-a.properties

      # Licensed to the Apache Software Foundation (ASF) under one or more
      # contributor license agreements.  See the NOTICE file distributed with
      # this work for additional information regarding copyright ownership.
      # The ASF licenses this file to You under the Apache License, Version 2.0
      # (the "License"); you may not use this file except in compliance with
      # the License.  You may obtain a copy of the License at
      #
      #     http://www.apache.org/licenses/LICENSE-2.0
      #
      # Unless required by applicable law or agreed to in writing, software
      # distributed under the License is distributed on an "AS IS" BASIS,
      # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      # See the License for the specific language governing permissions and
      # limitations under the License.
      #所属集群名字
      brokerClusterName=rocketmq-cluster
      #broker名字,注意此处不同的配置文件填写的不一样  例如:在a.properties 文件中写 broker-a  在b.properties 文件中写 broker-b
      brokerName=broker-a
      #0 表示 Master,>0 表示 Slave
      brokerId=0
      #删除文件时间点,默认凌晨 4点
      deleteWhen=04
      #文件保留时间,默认 48 小时
      fileReservedTime=120
      #Broker 的角色,ASYNC_MASTER=异步复制Master,SYNC_MASTER=同步双写Master,SLAVE=slave节点
      brokerRole=ASYNC_MASTER
      #刷盘方式,ASYNC_FLUSH=异步刷盘,SYNC_FLUSH=同步刷盘 
      flushDiskType=SYNC_FLUSH
      #Broker 对外服务的监听端口
      listenPort=10911
      #nameServer地址,这里nameserver是单台,如果nameserver是多台集群的话,就用分号分割(即namesrvAddr=ip1:port1;ip2:port2;ip3:port3)
      namesrvAddr=192.168.1.1:9876;192.168.1.2:9876
      #每个topic对应队列的数量,默认为4,实际应参考consumer实例的数量,值过小不利于consumer负载均衡
      defaultTopicQueueNums=8
      #是否允许 Broker 自动创建Topic,生产建议关闭
      autoCreateTopicEnable=true
      #是否允许 Broker 自动创建订阅组,生产建议关闭
      autoCreateSubscriptionGroup=true
      #设置BrokerIP
      brokerIP1=192.168.1.1
      #存储路径
      storePathRootDir=/data/rocketmq-all-4.7.0-bin-release/data/store-a
      #commitLog 存储路径
      storePathCommitLog=/data/rocketmq-all-4.7.0-bin-release/data/store-a/commitlog
      #消费队列存储路径存储路径
      storePathConsumerQueue=/data/rocketmq-all-4.7.0-bin-release/data/store-a/consumequeue
      #消息索引存储路径
      storePathIndex=/data/rocketmq-all-4.7.0-bin-release/data/store-a/index
      #checkpoint 文件存储路径
      storeCheckpoint=/data/rocketmq-all-4.7.0-bin-release/data/store-a/checkpoint
      #abort 文件存储路径
      abortFile=/data/rocketmq-all-4.7.0-bin-release/data/store-a/abort
      #commitLog每个文件的大小默认1G
      mapedFileSizeCommitLog=1073741824
      #ConsumeQueue每个文件默认存30W条,根据业务情况调整
      mapedFileSizeConsumeQueue=300000
      
    • 编辑192.168.1.1机器上的Master Broker的配置文件broker-b-s.properties

      # Licensed to the Apache Software Foundation (ASF) under one or more
      # contributor license agreements.  See the NOTICE file distributed with
      # this work for additional information regarding copyright ownership.
      # The ASF licenses this file to You under the Apache License, Version 2.0
      # (the "License"); you may not use this file except in compliance with
      # the License.  You may obtain a copy of the License at
      #
      #     http://www.apache.org/licenses/LICENSE-2.0
      #
      # Unless required by applicable law or agreed to in writing, software
      # distributed under the License is distributed on an "AS IS" BASIS,
      # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      # See the License for the specific language governing permissions and
      # limitations under the License.
      #所属集群名字
      brokerClusterName=rocketmq-cluster
      #broker名字,注意此处不同的配置文件填写的不一样  例如:在a.properties 文件中写 broker-a  在b.properties 文件中写 broker-b
      brokerName=broker-b
      #0 表示 Master,>0 表示 Slave
      brokerId=1
      #删除文件时间点,默认凌晨 4点
      deleteWhen=04
      #文件保留时间,默认 48 小时
      fileReservedTime=120
      #Broker 的角色,ASYNC_MASTER=异步复制Master,SYNC_MASTER=同步双写Master,SLAVE=slave节点
      brokerRole=SLAVE
      #刷盘方式,ASYNC_FLUSH=异步刷盘,SYNC_FLUSH=同步刷盘 
      flushDiskType=SYNC_FLUSH
      #Broker 对外服务的监听端口
      listenPort=11011
      #nameServer地址,这里nameserver是单台,如果nameserver是多台集群的话,就用分号分割(即namesrvAddr=ip1:port1;ip2:port2;ip3:port3)
      namesrvAddr=192.168.1.1:9876;192.168.1.2:9876
      #每个topic对应队列的数量,默认为4,实际应参考consumer实例的数量,值过小不利于consumer负载均衡
      defaultTopicQueueNums=8
      #是否允许 Broker 自动创建Topic,生产建议关闭
      autoCreateTopicEnable=true
      #是否允许 Broker 自动创建订阅组,生产建议关闭
      autoCreateSubscriptionGroup=true
      #设置BrokerIP
      brokerIP1=192.168.1.1
      #存储路径
      storePathRootDir=/data/rocketmq-all-4.7.0-bin-release/data/store-b
      #commitLog 存储路径
      storePathCommitLog=/data/rocketmq-all-4.7.0-bin-release/data/store-b/commitlog
      #消费队列存储路径存储路径
      storePathConsumerQueue=/data/rocketmq-all-4.7.0-bin-release/data/store-b/consumequeue
      #消息索引存储路径
      storePathIndex=/data/rocketmq-all-4.7.0-bin-release/data/store-b/index
      #checkpoint 文件存储路径
      storeCheckpoint=/data/rocketmq-all-4.7.0-bin-release/data/store-b/checkpoint
      #abort 文件存储路径
      abortFile=/data/rocketmq-all-4.7.0-bin-release/data/store-b/abort
      #commitLog每个文件的大小默认1G
      mapedFileSizeCommitLog=1073741824
      #ConsumeQueue每个文件默认存30W条,根据业务情况调整
      mapedFileSizeConsumeQueue=300000
      
    • 编辑192.168.1.2机器上的Master Broker的配置文件broker-b.properties

      # Licensed to the Apache Software Foundation (ASF) under one or more
      # contributor license agreements.  See the NOTICE file distributed with
      # this work for additional information regarding copyright ownership.
      # The ASF licenses this file to You under the Apache License, Version 2.0
      # (the "License"); you may not use this file except in compliance with
      # the License.  You may obtain a copy of the License at
      #
      #     http://www.apache.org/licenses/LICENSE-2.0
      #
      # Unless required by applicable law or agreed to in writing, software
      # distributed under the License is distributed on an "AS IS" BASIS,
      # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      # See the License for the specific language governing permissions and
      # limitations under the License.
      #所属集群名字
      brokerClusterName=rocketmq-cluster
      #broker名字,注意此处不同的配置文件填写的不一样  例如:在a.properties 文件中写 broker-a  在b.properties 文件中写 broker-b
      brokerName=broker-b
      #0 表示 Master,>0 表示 Slave
      brokerId=0
      #删除文件时间点,默认凌晨 4点
      deleteWhen=04
      #文件保留时间,默认 48 小时
      fileReservedTime=120
      #Broker 的角色,ASYNC_MASTER=异步复制Master,SYNC_MASTER=同步双写Master,SLAVE=slave节点
      brokerRole=ASYNC_MASTER
      #刷盘方式,ASYNC_FLUSH=异步刷盘,SYNC_FLUSH=同步刷盘 
      flushDiskType=SYNC_FLUSH
      #Broker 对外服务的监听端口
      listenPort=10911
      #nameServer地址,这里nameserver是单台,如果nameserver是多台集群的话,就用分号分割(即namesrvAddr=ip1:port1;ip2:port2;ip3:port3)
      namesrvAddr=192.168.1.1:9876;192.168.1.2:9876
      #每个topic对应队列的数量,默认为4,实际应参考consumer实例的数量,值过小不利于consumer负载均衡
      defaultTopicQueueNums=8
      #是否允许 Broker 自动创建Topic,生产建议关闭
      autoCreateTopicEnable=true
      #是否允许 Broker 自动创建订阅组,生产建议关闭
      autoCreateSubscriptionGroup=true
      #设置BrokerIP
      brokerIP1=192.168.1.2
      #存储路径
      storePathRootDir=/data/rocketmq-all-4.7.0-bin-release/data/store-b
      #commitLog 存储路径
      storePathCommitLog=/data/rocketmq-all-4.7.0-bin-release/data/store-b/commitlog
      #消费队列存储路径存储路径
      storePathConsumerQueue=/data/rocketmq-all-4.7.0-bin-release/data/store-b/consumequeue
      #消息索引存储路径
      storePathIndex=/data/rocketmq-all-4.7.0-bin-release/data/store-b/index
      #checkpoint 文件存储路径
      storeCheckpoint=/data/rocketmq-all-4.7.0-bin-release/data/store-b/checkpoint
      #abort 文件存储路径
      abortFile=/data/rocketmq-all-4.7.0-bin-release/data/store-b/abort
      #commitLog每个文件的大小默认1G
      mapedFileSizeCommitLog=1073741824
      #ConsumeQueue每个文件默认存30W条,根据业务情况调整
      mapedFileSizeConsumeQueue=300000
      
    • 编辑192.168.1.2机器上的Master Broker的配置文件 broker-a-s.properties

      # Licensed to the Apache Software Foundation (ASF) under one or more
      # contributor license agreements.  See the NOTICE file distributed with
      # this work for additional information regarding copyright ownership.
      # The ASF licenses this file to You under the Apache License, Version 2.0
      # (the "License"); you may not use this file except in compliance with
      # the License.  You may obtain a copy of the License at
      #
      #     http://www.apache.org/licenses/LICENSE-2.0
      #
      # Unless required by applicable law or agreed to in writing, software
      # distributed under the License is distributed on an "AS IS" BASIS,
      # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      # See the License for the specific language governing permissions and
      # limitations under the License.
      #所属集群名字
      brokerClusterName=rocketmq-cluster
      #broker名字,注意此处不同的配置文件填写的不一样  例如:在a.properties 文件中写 broker-a  在b.properties 文件中写 broker-b
      brokerName=broker-a
      #0 表示 Master,>0 表示 Slave
      brokerId=1
      #删除文件时间点,默认凌晨 4点
      deleteWhen=04
      #文件保留时间,默认 48 小时
      fileReservedTime=120
      #Broker 的角色,ASYNC_MASTER=异步复制Master,SYNC_MASTER=同步双写Master,SLAVE=slave节点
      brokerRole=SLAVE
      #刷盘方式,ASYNC_FLUSH=异步刷盘,SYNC_FLUSH=同步刷盘 
      flushDiskType=SYNC_FLUSH
      #Broker 对外服务的监听端口
      listenPort=11011
      #nameServer地址,这里nameserver是单台,如果nameserver是多台集群的话,就用分号分割(即namesrvAddr=ip1:port1;ip2:port2;ip3:port3)
      namesrvAddr=192.168.1.1:9876;192.168.1.2:9876
      #每个topic对应队列的数量,默认为4,实际应参考consumer实例的数量,值过小不利于consumer负载均衡
      defaultTopicQueueNums=8
      #是否允许 Broker 自动创建Topic,生产建议关闭
      autoCreateTopicEnable=true
      #是否允许 Broker 自动创建订阅组,生产建议关闭
      autoCreateSubscriptionGroup=true
      #设置BrokerIP
      brokerIP1=192.168.1.2
      #存储路径
      storePathRootDir=/data/rocketmq-all-4.7.0-bin-release/data/store-a
      #commitLog 存储路径
      storePathCommitLog=/data/rocketmq-all-4.7.0-bin-release/data/store-a/commitlog
      #消费队列存储路径存储路径
      storePathConsumerQueue=/data/rocketmq-all-4.7.0-bin-release/data/store-a/consumequeue
      #消息索引存储路径
      storePathIndex=/data/rocketmq-all-4.7.0-bin-release/data/store-a/index
      #checkpoint 文件存储路径
      storeCheckpoint=/data/rocketmq-all-4.7.0-bin-release/data/store-a/checkpoint
      #abort 文件存储路径
      abortFile=/data/rocketmq-all-4.7.0-bin-release/data/store-a/abort
      #commitLog每个文件的大小默认1G
      mapedFileSizeCommitLog=1073741824
      #ConsumeQueue每个文件默认存30W条,根据业务情况调整
      mapedFileSizeConsumeQueue=300000
      
  3. 启动Broker服务

    • 启动192.168.1.1主Broker服务

      • nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a.properties & 
        
    • 启动192.168.1.2主Broker服务

      • nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b.properties &
        
    • 启动192.168.1.2备Broker服务

      • nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a-s.properties &
        
    • 启动192.168.1.1备Broker服务

      • nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b-s.properties &
        
  4. 安装rocketmq-console控制台

    1. 下载打包rocketmq-console 下载地址:rocketmq-console

    2. 修改配置文件:rocketmq.config.namesrvAddr=‘192.168.1.1:9876;192.168.1.2:9876’

    3. 启动App.java 无异常访问:http://localhost:8080

      [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-keCXCojG-1625476775584)(C:\Users\Laisheng\AppData\Roaming\Typora\typora-user-images\1625450979077.png)]

2.2 代码集成

2.2.1 简单示例

 <!--添加依赖-->
<dependency>
   <groupId>org.apache.rocketmq</groupId>
   <artifactId>rocketmq-client</artifactId>
   <version>4.3.0</version>
</dependency>
  • 同步发送消息,异步发送消息,单向发送消息

    package com;
    
    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendCallback;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    import org.junit.Test;
    
    import java.io.UnsupportedEncodingException;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author Laisheng
     * @version 1.0
     * @date 2021-07-05
     * @className RocketMQSimplenessTest
     * @description rockerMq简单示例API
     **/
    public class RocketMQSimplenessTest {
    
        /**
         * 同步发送消息
         * 引用场景:通知消息、短信通知、短信营销系统
         */
        @Test
        public void syncSendMsg() throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
            // 1.使用生产者组名称进行实例化
            DefaultMQProducer producer = new DefaultMQProducer("defaultProducer");
            // 2.指定服务器地址
            producer.setNamesrvAddr("192.168.1.20:9876");
            // 3.启动实例
            producer.start();
            for (int i = 0; i < 50; i++){
                Message msg = new Message("TopicTest" /* Topic 主题 */,
                        "TagA" /* Tag 版本*/,
                        ("Hello RocketMQ " +
                                i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body 消息主题*/
                );
                // 调用发送消息将消息传递到其中一个代理
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            }
            // 一旦生产者实例不再使用,就关闭它
            producer.shutdown();
        }
    
        /**
         * 异步发送消息
         * 使用场景:一般用于响应时间敏感的业务场景
         */
        @Test
        public void asyncSendMsg() throws MQClientException, InterruptedException {
            // 1.使用生产者组名称进行实例化
            DefaultMQProducer producer = new DefaultMQProducer("defaultProducer");
            // 2.指定服务器地址
            producer.setNamesrvAddr("192.168.1.20:9876");
            // 3.启动实例
            producer.start();
            // 4.在异步模式下声明发送失败之前在内部执行的最大重试次数
            producer.setRetryTimesWhenSendAsyncFailed(0);
            int messageCount = 50;
            // 5.计数初始化
            final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
            for (int i = 0; i < messageCount; i++) {
                try {
                    final int index = i;
                    Message msg = new Message("TopicTest",
                            "TagA",
                            "OrderID188",
                            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    // 发送消息 SendCallback:发送完成时执行的回调,成功或不成功
                    producer.send(msg, new SendCallback() {
                        // 成功
                        @Override
                        public void onSuccess(SendResult sendResult) {
                            // 递减锁存器的计数,如果计数达到零,则释放所有等待的线程
                            countDownLatch.countDown();
                            System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                        }
                        // 失败
                        @Override
                        public void onException(Throwable e) {
                            // 递减锁存器的计数,如果计数达到零,则释放所有等待的线程
                            countDownLatch.countDown();
                            System.out.printf("%-10d Exception %s %n", index, e);
                            e.printStackTrace();
                        }
                    });
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            // 等待子线程运行结束
            countDownLatch.await(5, TimeUnit.SECONDS);
            // 一旦生产者实例不再使用,就关闭它
            producer.shutdown();
        }
    
        /**
         * 单项发送消息
         * 使用场景:日志收集。
         */
        @Test
        public void oneWaySendMsg() throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException {
            // 1.使用生产者组名称进行实例化
            DefaultMQProducer producer = new DefaultMQProducer("defaultProducer");
            // 2.指定服务器地址
            producer.setNamesrvAddr("192.168.1.20:9876");
            // 3.启动实例
            producer.start();
            for (int i = 0; i < 100; i++) {
                // 创建一个消息实例,指定主题、标记和消息主体
                Message msg = new Message("TopicTest" /* Topic 主题*/,
                        "TagA" /* Tag 版本*/,
                        ("Hello RocketMQ " +
                                i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body 消息主体*/
                );
                // 调用发送消息将消息传递到其中一个代理。
                producer.sendOneway(msg);
            }
            // 4. 等待发送完成
            Thread.sleep(5000);
            producer.shutdown();
        }
    }
    
  • 消费消息

    package com;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    /**
     * @author Laisheng
     * @date 2021/7/5 0005
     * @className Consumer
     * @description 消费消息
     * @version 1.0
     **/
    public class Consumer {
    
        public static void main(String[] args) throws InterruptedException, MQClientException {
            // 1.使用指定的使用者组名称实例化
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("defaultProducer");
            // 2.指定服务器地址
            consumer.setNamesrvAddr("192.168.1.20:9876");
            // 3.订阅一个主题来使用
            consumer.subscribe("TopicTest", "*");
            // 4.注册回调,以便在从代理获取的消息到达时执行
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msg, ConsumeConcurrentlyContext context) {
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            //5.启动使用者实例
            consumer.start();
            System.out.printf("Consumer Started.%n");
        }
    }
    

2.2.2 订单示例

  • 采用全局和分区并排序进行消息推送

    package com.producer;
    
    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.MQProducer;
    import org.apache.rocketmq.client.producer.MessageQueueSelector;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageQueue;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    
    import java.io.UnsupportedEncodingException;
    import java.util.List;
    
    /**
     * @author Laisheng
     * @version 1.0
     * @date 2021-07-05
     * @className OrderProducer
     * @description 采用全局和分区并排序进行消息推送
     **/
    public class OrderProducer {
    
        public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
            // 1.使用生产者组名称进行实例化
            DefaultMQProducer producer = new DefaultMQProducer("defaultProducer");
            // 2.指定服务器地址
            producer.setNamesrvAddr("192.168.1.20:9876");
            // 2.启动实例
            producer.start();
            // 3. 定义分区
            String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
            for (int i = 0; i < 50; i++) {
                int orderId = i % 10;
                // 创建一个消息实例,指定主题、标记和消息主体
                Message msg = new Message("TopicOrderTest", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                /**
                 *  @param msg 要发送的消息
                 *  @param selector 消息队列选择器,通过它我们得到目标消息队列来传递消息.
                 *  @param arg 与消息队列选择器一起使用的参数
                 */
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        Integer id = (Integer) arg;
                        int index = id % mqs.size();
                        return mqs.get(index);
                    }
                }, orderId);
                System.out.printf("%s%n", sendResult);
            }
            // 4. 一旦生产者实例不再使用,就关闭它
            producer.shutdown();
        }
    }
    
  • 订阅消息

    package com.consumer;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    import java.util.concurrent.atomic.AtomicLong;
    
    /**
     * @author Laisheng
     * @version 1.0
     * @date 2021-07-05
     * @className OrderedConsumer
     * @description 订阅消息
     **/
    public class OrderedConsumer {
    
        public static void main(String[] args) throws Exception {
    
            // 1.使用生产者组名称进行实例化
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("defaultProducer");
            // 2.指定服务器地址
            consumer.setNamesrvAddr("192.168.1.20:9876");
            // 3.消费者启动消费点 程序第一次启动从消息队列头获取数据 
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            // 4.订阅主题以消费订阅
            consumer.subscribe("TopicOrderTest", "TagA || TagC || TagD");
            // 5.监听消费
            consumer.registerMessageListener(new MessageListenerOrderly() {
                AtomicLong consumeTimes = new AtomicLong(0);
                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msg, ConsumeOrderlyContext context) {
                    context.setAutoCommit(false);
                    System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msg + "%n");
                    this.consumeTimes.incrementAndGet();
                    if ((this.consumeTimes.get() % 2) == 0) {
                        return ConsumeOrderlyStatus.SUCCESS;
                    }
                    if ((this.consumeTimes.get() % 3) == 0) {
                        return ConsumeOrderlyStatus.ROLLBACK;
                    }
                    if ((this.consumeTimes.get() % 4) == 0) {
                        return ConsumeOrderlyStatus.COMMIT;
                    }
                    if ((this.consumeTimes.get() % 5) == 0) {
                        context.setSuspendCurrentQueueTimeMillis(3000);
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
                    return ConsumeOrderlyStatus.SUCCESS;
                }
            });
            consumer.start();
            System.out.printf("Consumer Started.%n");
        }
    }
    

2.2.3 广播示例

广播:向所有订阅该主题的订阅者【消费者】发送消息

  • 生产者示例

    package com.producer;
    
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    /**
     * @author Laisheng
     * @date 2021/7/5 0005
     * @className BroadcastProducer
     * @description 广播
     * @version 1.0
     **/
    public class BroadcastProducer {
        public static void main(String[] args) throws Exception {
            DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
            // 2.指定服务器地址
            producer.setNamesrvAddr("192.168.1.20:9876");
            producer.start();
            for (int i = 0; i < 50; i++) {
                Message msg = new Message("TopicGroupTest",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            }
            producer.shutdown();
        }
    }
    
  • 消费者示例

    package com.consumer;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
    
    import java.util.List;
    
    /**
     * @author Laisheng
     * @date 2021/7/5 0005
     * @className BroadcastConsumer
     * @description 广播
     * @version 1.0
     **/
    public class BroadcastConsumer {
        public static void main(String[] args) throws Exception {
            // 1.使用生产者组名称进行实例化
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ProducerGroupName");
            // 2.指定服务器地址
            consumer.setNamesrvAddr("192.168.1.20:9876");
            // 3.消费者启动消费点 程序第一次启动从消息队列头获取数据
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            // 4.设置为广播模式
            consumer.setMessageModel(MessageModel.BROADCASTING);
            // 5.订阅主题以消费订阅
            consumer.subscribe("TopicGroupTest", "TagA || TagC || TagD");
            // 6.监听消费
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msg,ConsumeConcurrentlyContext context) {
                    System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msg + "%n");
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
            System.out.printf("Broadcast Consumer Started.%n");
        }
    }
    

2.2.4 时间表示例

预定消息:预定消息与普通消息的不同之处在于,它们直到指定的时间之后才会被传递。

  • 启动消费者等待传入的订阅消息

    package com.consumer;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    
    /**
     * @author Laisheng
     * @date 2021/7/5 0005
     * @className ScheduledMessageConsumer
     * @description 启动消费者等待传入的订阅消息
     * @version 1.0
     **/
    public class ScheduledMessageConsumer {
        
         public static void main(String[] args) throws Exception {
             // 1.使用生产者组名称进行实例化
             DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
             // 2.指定服务器地址
             consumer.setNamesrvAddr("192.168.1.20:9876");
             // 3.订阅一个主题来使用
             consumer.subscribe("TestExampleTopic", "*");
             // 4. 注册消息监听器
             consumer.registerMessageListener(new MessageListenerConcurrently() {
                 @Override
                 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                     for (MessageExt message : messages) {
                         // 打印大致的延迟时间段
                         System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
                                 + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
                     }
                     return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                 }
             });
             // 启动消费者
             consumer.start();
         }
     }
    
  • 发送预定消息

    package com.consumer;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    
    /**
     * @author Laisheng
     * @date 2021/7/5 0005
     * @className ScheduledMessageConsumer
     * @description 启动消费者等待传入的订阅消息
     * @version 1.0
     **/
    public class ScheduledMessageConsumer {
        
         public static void main(String[] args) throws Exception {
             // 1.使用生产者组名称进行实例化
             DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
             // 2.指定服务器地址
             consumer.setNamesrvAddr("192.168.1.20:9876");
             // 3.订阅一个主题来使用
             consumer.subscribe("TestExampleTopic", "*");
             // 4. 注册消息监听器
             consumer.registerMessageListener(new MessageListenerConcurrently() {
                 @Override
                 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                     for (MessageExt message : messages) {
                         // 打印大致的延迟时间段
                         System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
                                 + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
                     }
                     return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                 }
             });
             // 启动消费者
             consumer.start();
         }
     }
    

2.2.5 批处理示例

为什么使用批处理?

  1. 批量发送消息提高了传递小消息的性能。

使用限制:

同一批次的消息应该具有:相同的主题,相同的 waitStoreMsgOK 并且没有调度支持。

此外,一批消息的总大小不应超过 1MiB。

  • 批处理发送消息【消息大小不超出1MiB】

    package com.producer;
    
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.common.message.Message;
    
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * @author Laisheng
     * @version 1.0
     * @date 2021-07-05
     * @className BatchingProducer
     * @description 批处理发送消息
     **/
    public class BatchingProducer {
    
        public static void main(String[] args) throws MQClientException {
            DefaultMQProducer producer = new DefaultMQProducer("ProducerBatchingName");
            // 2.指定服务器地址
            producer.setNamesrvAddr("192.168.1.20:9876");
            producer.start();
            String topic = "BatchTestTopic";
            List<Message> messages = new ArrayList<>();
            messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
            messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
            messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
            try {
                producer.send(messages);
            } catch (Exception e) {
               throw new RuntimeException("Send message failed");
            }finally {
                producer.shutdown();
            }
        }
    }
    
  • 批处理发送消息【消息大小超出1MiB】

    package com.producer;
    
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.common.message.Message;
    
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    
    /**
     * @author Laisheng
     * @version 1.0
     * @date 2021-07-05
     * @className BroadcastsProducer
     * @description 批处理发送消息【消息大小超出1MiB】
     **/
    public class BroadcastsProducer implements Iterator<List<Message>> {
    
        /**
         * 分页大小
         */
        private final int SIZE_LIMIT = 1000 * 1000;
    
        /**
         * 消息集合
         */
        private final List<Message> messages;
    
        /**
         * 当前索引
         */
        private int currIndex;
    
        public BroadcastsProducer(List<Message> messages) {
            this.messages = messages;
        }
    
        /**
         *检测集合中是否还有元素
         * @return
         */
        @Override
        public boolean hasNext() {
            return currIndex < messages.size();
        }
    
        /**
         * 返回迭代器的下一个元素
         * @return
         */
        @Override
        public List<Message> next() {
            // 当前索引
            int nextIndex = currIndex;
            for (int totalSize = 0; nextIndex < messages.size(); nextIndex++) {
                // 获取当前消息
                Message message = messages.get(nextIndex);
                // 主题长度+消息主体长度
                int tmpSize = message.getTopic().length() + message.getBody().length;
                // 获取当前消息的配置信息
                Map<String, String> properties = message.getProperties();
                // 获取key+value的总长度
                for (Map.Entry<String, String> entry : properties.entrySet()) {
                    tmpSize += entry.getKey().length() + entry.getValue().length();
                }
                // 用于日志开销
                tmpSize = tmpSize + 20;
                if (tmpSize > SIZE_LIMIT) {
                    // 单条消息超出 SIZE_LIMIT
                    // 这里就让它去吧,否则它会阻止分裂过程
                    if (nextIndex - currIndex == 0) {
                        // 如果下一个子列表没有元素,添加这个然后中断,否则就中断
                        nextIndex++;
                    }
                    break;
                }
                if (tmpSize + totalSize > SIZE_LIMIT) {
                    break;
                } else {
                    totalSize += tmpSize;
                }
    
            }
            // 拆分消息
            List<Message> subList = messages.subList(currIndex, nextIndex);
            currIndex = nextIndex;
            return subList;
        }
        
        public static void main(String[] args) throws MQClientException {
            DefaultMQProducer producer = new DefaultMQProducer("ProducerBatchingName");
            // 2.指定服务器地址
            producer.setNamesrvAddr("192.168.1.20:9876");
            producer.start();
            String topic = "BatchTestTopic";
            List<Message> messages = new ArrayList<>();
            messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
            messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
            messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
            // 把大名单分成小名单
            BroadcastsProducer splitter = new BroadcastsProducer(messages);
            while (splitter.hasNext()) {
                try {
                    List<Message> listItem = splitter.next();
                    producer.send(listItem);
                } catch (Exception e) {
                    throw new RuntimeException("Send message failed");
                }finally {
                    producer.shutdown();
                }
            }
        }
    }
    

2.2.6 过滤器示例

在大多数情况下,标签是一种简单而有用的设计,用于选择您想要的消息。例如:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.setNamesrvAddr("192.168.1.20:9876");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

消费者将收到包含 TAGA 或 TAGB 或 TAGC 的消息。但限制是一条消息只能有一个标签,这可能不适用于复杂的场景。在这种情况下,您可以使用 SQL 表达式来过滤消息。

原则

SQL 功能可以通过您在发送消息时输入的属性进行一些计算。在 RocketMQ 定义的语法下,你可以实现一些有趣的逻辑。下面是一个例子:

语法

RocketMQ 只定义了一些基本的语法来支持这个特性。您也可以轻松扩展它。

  1. 数值比较,如>, >=, <, <=, BETWEEN, =;
  2. 字符比较,如=, <>, IN;
  3. IS NULLIS NOT NULL;
  4. 逻辑AND, OR, NOT;

常量类型有:

  1. 数字,如 123、3.1415;
  2. 字符,如’abc’,必须用单引号引起;
  3. NULL,特殊常数;
  4. 布尔值,TRUEFALSE

使用限制

只有推送消费者才能通过 SQL92 选择消息。【consumer.subscribe(“TOPIC”, “TAGA || TAGB || TAGC”)】

  • 生产者示例

    package com.producer;
    
    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    
    import java.io.UnsupportedEncodingException;
    
    /**
     * @author Laisheng
     * @version 1.0
     * @date 2021-07-05
     * @className FiltrationProducer
     * @description 过滤器
     **/
    public class FiltrationProducer {
    
        public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException, UnsupportedEncodingException {
            // 1.使用生产者组名称进行实例化
            DefaultMQProducer producer = new DefaultMQProducer("ProducerFiltrationName");
            // 2.指定服务器地址
            producer.setNamesrvAddr("192.168.1.20:9876");
            // 3.启动
            producer.start();
            for (int i = 0; i < 50; i++) {
                Message msg = new Message("TopicFiltrationTest",
                        "TagA",
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
                );
                // 设置一些属性。
                msg.putUserProperty("a", String.valueOf(i));
                SendResult sendResult = producer.send(msg);
            }
            producer.shutdown();
        }
    }
    
  • 消费者示例

    package com.consumer;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.MessageSelector;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    
    /**
     * @author Laisheng
     * @version 1.0
     * @date 2021-07-05
     * @className FiltrationConsumer
     * @description 过滤器
     **/
    public class FiltrationConsumer {
    
        public static void main(String[] args) throws MQClientException {
            //  1.使用生产者组名称进行实例化
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ProducerFiltrationName");
            // 2.指定服务器地址
            consumer.setNamesrvAddr("192.168.1.20:9876");
            // 3.只有订阅消息具有属性 a, also a >=0 and a <= 3
            consumer.subscribe("TopicFiltrationTest", MessageSelector.bySql("a between 0 and 3"));
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msg, ConsumeConcurrentlyContext context) {
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start(); 
        }
    
    }
    

2.2.7 日志附加程序示例

RocketMQ logappender 提供了 log4j appender、log4j2 appender 和 logback appender 供业务使用

  • 使用 log4j 属性配置文件
log4j.appender.mq=org.apache.rocketmq.logappender.log4j.RocketmqLog4jAppender
log4j.appender.mq.Tag=yourTag
log4j.appender.mq.Topic=yourLogTopic
log4j.appender.mq.ProducerGroup=yourLogGroup
log4j.appender.mq.NameServerAddress=yourRocketmqNameserverAddress
log4j.appender.mq.layout=org.apache.log4j.PatternLayout
log4j.appender.mq.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-4r [%t] (%F:%L) %-5p - %m%n
  • 使用 log4j xml 配置文件时,将其配置为这样并添加一个异步附加程序:
<appender name="mqAppender1" class="org.apache.rocketmq.logappender.log4j.RocketmqLog4jAppender">
    <param name="Tag" value="yourTag" />
    <param name="Topic" value="yourLogTopic" />
    <param name="ProducerGroup" value="yourLogGroup" />
    <param name="NameServerAddress" value="yourRocketmqNameserverAddress"/>
    <layout class="org.apache.log4j.PatternLayout">
        <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss}-%p %t %c - %m%n" />
    </layout>
</appender>

<appender name="mqAsyncAppender1" class="org.apache.log4j.AsyncAppender">
    <param name="BufferSize" value="1024" />
    <param name="Blocking" value="false" />
    <appender-ref ref="mqAppender1"/>
</appender>
  • 使用 log4j2 时,配置如下
<!-- 如果你想要无块,只需为 ref 配置一个 asyncAppender -->
<RocketMQ name="rocketmqAppender" producerGroup="yourLogGroup" nameServerAddress="yourRocketmqNameserverAddress"
     topic="yourLogTopic" tag="yourTag">
    <PatternLayout pattern="%d [%p] hahahah %c %m%n"/>
</RocketMQ>
  • 使用 logback 时,还需要一个 asyncAppender
<appender name="mqAppender1" class="org.apache.rocketmq.logappender.logback.RocketmqLogbackAppender">
    <tag>yourTag</tag>
    <topic>yourLogTopic</topic>
    <producerGroup>yourLogGroup</producerGroup>
    <nameServerAddress>yourRocketmqNameserverAddress</nameServerAddress>
    <layout>
        <pattern>%date %p %t - %m%n</pattern>
    </layout>
</appender>

<appender name="mqAsyncAppender1" class="ch.qos.logback.classic.AsyncAppender">
    <queueSize>1024</queueSize>
    <discardingThreshold>80</discardingThreshold>
    <maxFlushTime>2000</maxFlushTime>
    <neverBlock>true</neverBlock>
    <appender-ref ref="mqAppender1"/>
</appender>

2.2.8 OpenMessageing示例

OpenMessaging,其中包括建立行业指南和消息传递、流规范,为金融、电子商务、物联网和大数据领域提供通用框架。设计原则是在分布式异构环境中面向云、简单、灵活和语言独立。符合这些规范将使开发跨所有主要平台和操作系统的异构消息传递应用程序成为可能。

RocketMQ 提供了 OpenMessaging 0.1.0-alpha 的部分实现,以下示例演示如何基于 OpenMessaging 访问 RocketMQ。


  • OMS生产者:下面的例子展示了如何以同步、异步或单向传输的方式向 RocketMQ broker 发送消息。
public class OMSProducer {
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");

        final Producer producer = messagingAccessPoint.createProducer();

        messagingAccessPoint.startup();
        System.out.printf("MessagingAccessPoint startup OK%n");

        producer.startup();
        System.out.printf("Producer startup OK%n");

        {
            Message message = producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")));
            SendResult sendResult = producer.send(message);
            System.out.printf("Send sync message OK, msgId: %s%n", sendResult.messageId());
        }

        {
            final Promise<SendResult> result = producer.sendAsync(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
            result.addListener(new PromiseListener<SendResult>() {
                @Override
                public void operationCompleted(Promise<SendResult> promise) {
                    System.out.printf("Send async message OK, msgId: %s%n", promise.get().messageId());
                }

                @Override
                public void operationFailed(Promise<SendResult> promise) {
                    System.out.printf("Send async message Failed, error: %s%n", promise.getThrowable().getMessage());
                }
            });
        }

        {
            producer.sendOneway(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
            System.out.printf("Send oneway message OK%n");
        }

        producer.shutdown();
        messagingAccessPoint.shutdown();
    }
}
  • OMSP 拉消费者:使用 OMS PullConsumer 轮询来自指定队列的消息。
public class OMSPullConsumer {
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");

        final PullConsumer consumer = messagingAccessPoint.createPullConsumer("OMS_HELLO_TOPIC",
            OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));

        messagingAccessPoint.startup();
        System.out.printf("MessagingAccessPoint startup OK%n");
        
        consumer.startup();
        System.out.printf("Consumer startup OK%n");

        Message message = consumer.poll();
        if (message != null) {
            String msgId = message.headers().getString(MessageHeader.MESSAGE_ID);
            System.out.printf("Received one message: %s%n", msgId);
            consumer.ack(msgId);
        }

        consumer.shutdown();
        messagingAccessPoint.shutdown();
    }
}

2.2.9 交易示例【事务】

什么是事务性消息?

它可以被认为是两阶段提交消息的实现,以确保分布式系统中的最终一致性。事务性消息确保本地事务的执行和消息的发送可以原子地执行。

使用限制

  1. 事务性消息没有调度和批量支持。
  2. 为了避免单条消息被检查次数过多导致半队列消息堆积,我们默认将单条消息的检查次数限制为15次,但用户可以通过更改“transactionCheckMax ”参数在broker的配置中,如果一条消息被检查了“transactionCheckMax”次,broker默认会丢弃这条消息并同时打印错误日志。用户可以通过覆盖“AbstractTransactionCheckListener”类来更改此行为。
  3. 交易消息会在一定时间后被检查,该时间由代理配置中的参数“transactionTimeout”确定。并且用户也可以在发送事务消息时通过设置用户属性“CHECK_IMMUNITY_TIME_IN_SECONDS”来改变这个限制,这个参数优先于“transactionMsgTimeout”参数。
  4. 一条交易消息可能被检查或消费不止一次。
  5. 向用户目标主题提交的消息回复可能会失败。目前,这取决于日志记录。高可用是由 RocketMQ 本身的高可用机制来保证的。如果要保证事务消息不丢失,保证事务完整性,建议使用同步双写机制。
  6. 事务性消息的生产者ID不能与其他类型消息的生产者ID共享。与其他类型的消息不同,事务性消息允许向后查询。MQ Server 通过生产者 ID 查询客户端。

应用

一、交易状态

事务消息有三种状态:

  • TransactionStatus.CommitTransaction:提交事务,表示允许消费者消费这条消息。
  • TransactionStatus.RollbackTransaction:回滚事务,表示消息将被删除,不允许消费。
  • TransactionStatus.Unknown:中间状态,表示需要MQ回检来确定状态。

二、发送交易消息

  1. 创建事务生产者
    使用TransactionMQProducer类创建生产者客户端,并指定唯一的producerGroup,可以设置自定义线程池来处理check请求。执行完本地事务后,需要根据执行结果回复MQ,回复状态如上一节所述。
package com.producer;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author Laisheng
 * @date 2021/7/5 0005
 * @className TransactionListenerImpl
 * @description 事务监听
 * @version 1.0
**/
public class TransactionListenerImpl implements TransactionListener {

 private AtomicInteger transactionIndex = new AtomicInteger(0);

 private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

 /**
     * 当发送事务性准备(半)消息成功时,将调用此方法执行本地事务。
     * @param msg
     * @param arg
     * @return
  */
 @Override
 public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
     int value = transactionIndex.getAndIncrement();
     int status = value % 3;
     localTrans.put(msg.getTransactionId(), status);
     return LocalTransactionState.UNKNOW;
 }

 /**
     * 当准备(半)消息没有响应时.broker会发送check消息检查交易状态,调用该方法获取本地交易状态。
     * @param msg
     * @return
  */
 @Override
 public LocalTransactionState checkLocalTransaction(MessageExt msg) {
     Integer status = localTrans.get(msg.getTransactionId());
     if (null != status) {
         switch (status) {
             case 0:
                 return LocalTransactionState.UNKNOW;
             case 1:
                 return LocalTransactionState.COMMIT_MESSAGE;
             case 2:
                 return LocalTransactionState.ROLLBACK_MESSAGE;
         }
     }
     return LocalTransactionState.COMMIT_MESSAGE;
 }
}

package com.producer;


import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.io.UnsupportedEncodingException;
import java.util.concurrent.*;

/**
 * @author Laisheng
 * @version 1.0
 * @date 2021/7/5 0005
 * @className TransactionProducer
 * @description 事务
**/
public class TransactionProducer {

 public static void main(String[] args) throws MQClientException, InterruptedException {
     // 定义事务监听
     TransactionListener transactionListener = new TransactionListenerImpl();
     // 指定命名空间、生产者组和 RPC 钩子的构造函数。
     TransactionMQProducer producer = new TransactionMQProducer("transactionMQProducer");
     // 指定服务器地址
     producer.setNamesrvAddr("192.168.1.20:9876");
     // 定义线程池
     ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
         @Override
         public Thread newThread(Runnable r) {
             Thread thread = new Thread(r);
             thread.setName("client-transaction-msg-check-thread");
             return thread;
         }
     });

     producer.setExecutorService(executorService);
     producer.setTransactionListener(transactionListener);
     producer.start();
     String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
     for (int i = 0; i < 10; i++) {
         try {
             Message msg =
                     new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                             ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
             SendResult sendResult = producer.sendMessageInTransaction(msg, null);
             System.out.printf("%s%n", sendResult);
             Thread.sleep(10);
         } catch (MQClientException | UnsupportedEncodingException e) {
             e.printStackTrace();
         }
     }
     for (int i = 0; i < 100000; i++) {
         Thread.sleep(1000);
     }
     producer.shutdown();
 }
}

2.2.10 常见问题

用法

  1. 新创建的Consumer ID从哪里开始消费消息?
  • 如果主题在三天内发送了一条消息,则消费者从服务器中保存的第一条消息开始消费消息。
  • 如果主题在三天前发送消息,则消费者从服务器中的最新消息开始消费消息,换句话说,从消息队列的尾部开始。
  • 如果这样的消费者重新启动,则它开始从最后一个消费位置消费消息。
  1. 消费失败如何重新消费消息?
  • 集群消费模式消费业务逻辑代码返回Action.ReconsumerLater,NULL,或者抛出异常,如果一条消息消费失败,最多重试16次,之后消息将被丢弃。
  • 广播消费模式广播消费仍然确保消息至少被消费一次,但不提供重新发送选项。
  1. 消费失败如何查询失败消息?
  • 使用按时间主题查询,可以查询一段时间内的消息。
  • 使用Topic和Message Id来准确查询消息。
  • 使用Topic和Message Key可以准确查询具有相同Message Key的一类消息。
  1. 消息是否只传递一次?
  • RocketMQ 确保所有消息至少传递一次。在大多数情况下,消息不会重复。
  1. 如何添加新的消费者?
  • 启动一个新代理并将其注册到相同的名称服务器列表。
  • 默认情况下,仅自动创建内部系统主题和消费者组。如果您希望在新节点上拥有您的业务主题和消费者组,请从现有代理复制它们。提供了管理工具和命令行来处理此问题。

配置相关

  1. 消息在服务器上保存多长时间?
  • 存储的消息将最多保存 3 天,超过 3 天未使用的消息将被删除。
  1. 消息体的大小限制是多少?
  • 一般为256KB。
  1. 如何设置消费者线程数?
  • 启动Consumer时,设置一个ConsumeThreadNums属性,示例如下
consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(20);

错误

  1. 如果你启动一个生产者或消费者失败并且错误信息是生产者组或消费者重复?
  • 原因:使用同一个Producer/Consumer Group在同一个JVM中启动多个Producer/Consumer实例可能会导致客户端无法启动。
  • 解决方法:确保一个Producer/Consumer Group对应的JVM只启动了一个Producer/Consumer实例。
  1. 如果消费者在广播模式下开始加载json文件失败?
  • 原因:Fastjson版本过低,无法让广播消费者加载本地offsets.json,导致消费者启动失败。损坏的 fastjson 文件也会导致同样的问题。
  • 解决方案:Fastjson 版本必须升级到rocketmq 客户端依赖版本,以保证本地的offsets.json 可以加载。默认情况下,offsets.json 文件在 /home/{user}/.rocketmq_offsets 中。或者检查fastjson的完整性。
  1. Broker宕机有什么影响?
  • 主Broker宕机:消息不能再发送到此代理集,但如果有另一个可用的代理集,则在存在主题的情况下仍然可以发送消息。消息仍然可以从奴隶消费。
  • 备Broker宕机:只要有另一个工作的slave,就不会影响发送消息。对消费消息也不会产生影响,除非消费者组设置为优先从该从站消费。默认情况下,消费者组从 master 消费。
  • 所有备Broker宕机:向master发送消息不会有任何影响,但是,如果master是SYNC_MASTER,producer会得到一个SLAVE_NOT_AVAILABLE,表示消息没有发送给任何slave。对消费消息也没有影响,除非消费者组设置为最好从slave消费。默认情况下,消费者组从 master 消费。
  1. Producer报错“No Topic Route Info”,如何诊断?
  • 产生背景:当您尝试将消息发送到其路由信息对生产者不可用的主题时,就会发生这种情况。
    • 确保生产者可以连接到名称服务器并且能够从中获取路由元信息。
    • 确保名称服务器确实包含主题的路由元信息。您可以使用管理工具或 Web 控制台通过 topicRoute 从名称服务器查询路由元信息。
    • 确保您的代理将心跳发送到您的生产者连接到的同一名称服务器列表。
    • 确保主题的权限为 6(rw-),或至少为 2(-w-)。
  • 如果找不到此主题,请通过管理工具命令 updateTopic 或 Web 控制台在代理上创建它。
上一篇:rocketmq-spring的consumer设置消费失败最大重试次数


下一篇:Kafka基础知识(个人总结)