RocketMQ快速入门

简介


Apache RocketMQ 是阿里开源的一款高性能、高吞吐量的分布式消息中间件,在由阿里捐赠给Apache软件基金会之后孵化成了Apache的一个*项目(Top-Level Project,TLP)。RocketMQ 是基于阿里闭源的 MetaQ 内核实现的,它服务于阿里的各个体系,尤其是在电商领域提供了极高的并发支撑。RocketMQ 是使用 Java 语言开发的,因此相较于业界流行的其它消息中间件比如 RabbitMQ(使用 Erlang 开发)、Kafka(使用 Scala 开发),它对于 Java 程序员来说更加友好,更加利于开发者在其底层的基础上进行封装和扩展。
RocketMQ 的官网是http://rocketmq.apache.org/,根据官网的介绍,Apache RocketMQ™  是一个标准化的消息引擎,轻量级的数据处理平台。它具备以下优势:


  • 低延迟:在高并发压力下,超过99.6%的响应延迟在1毫秒之内
  • 面向金融的:系统支撑的追踪和审计功能具备高可用性
  • 行业可发展性:保证了万亿级别的消息容量
  • 厂商中立的:在最新的4.1版本开放了一个新的分布式消息和流标准
  • 对大数据友好的:具备通用集成功能的批量传输支撑了海量吞吐
  • 大量的积累:只要给与足够的磁盘空间,就可以累积消息而不会造成性能损失


RocketMQ 诞生背景


在早期,阿里曾基于 ActiveMQ 5.x(低于5.3)构建了分布式消息中间件。 并将其用于他们的跨国公司的异步通信,搜索,社交网络活动流,数据管道,甚至交易过程中。 随着贸易业务吞吐量的增长,来自消息集群的压力也就变成了亟待解决的问题。


为什么设计 RocketMQ?


随着队列的增长和虚拟主题的使用,ActiveMQ IO 模块遇到了瓶颈。 尽管阿里尽力通过限流,熔断或降级来解决此问题,但效果不佳。 因此,那时他们开始关注流行的消息传递解决方案Kafka。 不幸的是,Kafka不能满足阿里的要求,特别是在低延迟和高可靠性方面,请参阅此处以了解详细信息。
在这种情况下,他们决定发明一个新的消息传递引擎来处理更广泛的用例集——从传统的发布/订阅方案到大批量实时零损失容忍的交易系统。


RocketMQ vs. ActiveMQ vs. Kafka


下表展示了RocketMQ,ActiveMQ和Kafka(依照awesome-java 来说,Apache RocketMQ是最流行的消息传递解决方案)之间的比较:

消息中间件

ActiveMQ

Kafka

RocketMQ

客户端SDK

Java,.NET,C++等

Java, Scala等

Java, C++, Go

协议和规范

推模型,支持OpenWire, STOMP, AMQP, MQTT, JMS

拉模型,支持TCP

拉模型,支持TCP,JMS,OpenMessaging

顺序消息

独立的消费者或者队列可以确保顺序

确保分区内消息的顺序

能确保对消息进行严格排序,并可以很好地扩展

定时消息

支持

不支持

支持

批量消息

不支持

支持,带有异步生产者

支持,使用异步模式来避免消息丢失

广播消息

支持

不支持

支持

消息过滤

支持

支持,可以使用Kafka流来过滤消息

支持,基于SQL92的属性过滤表达式

服务器触发的消息重新投递

不支持

不支持

支持

消息存储

使用JDBC以及高性能日志(例如levelDB,kahaDB)来支持非常快速的持久化操作

高性能文件存储

高性能且低延迟的文件存储

消息追溯性

支持

通过偏移量指示器来支持这一特性

支持时间戳和偏移量两种指示器

消息优先级

支持

不支持

不支持

高可用和故障转移

支持,取决于存储,如果使用kahadb,则需要一个ZooKeeper服务器

支持,需要一个ZooKeeper服务器

支持,主从模式,不需要其他工具

消息追踪

不支持

不支持

支持

配置

默认配置为低级别,用户需要优化配置参数

Kafka使用键值对格式进行配置。 这些值可以从文件或以编程方式提供

开箱即用,用户只需要关注少量的配置

管理和操作工具

支持

支持,使用终端命令展示核心指标

支持,丰富的Web和终端命令可显示核心指标


Quick Start


本部分内容将会介绍如何在本地的机器上快速安装一个 RocketMQ 用于收发消息,更多的细节可以访问 https://github.com/apache/rocketmq/tree/master/docs/cn进行查看。


前置要求


在安装 RokcetMQ 之前需要先安装如下软件:


  1. 64位操作系统,推荐使用 Linux/Unix/Mac 操作系统
  2. 64位 JDK 1.8 及以上的版本
  3. Maven 3.2.x
  4. Git
  5. 为 Broker server 至少预留 4G 的磁盘空间


下载、安装及使用


Linux


1. 安装jdk


  1. 首先在 oracle 官网下载 Linux jdk 8 的压缩包

RocketMQ快速入门

  1. 然后使用 ftp 工具将下载好的压缩包上传到 Linux 服务器上
  2. 使用 tar -zxvf jdk-8u261-linux-x64.tar.gz 命令解压文件
  3. 配置系统环境变量
[root@node01 opt]# vi /etc/profile


在文件末尾添加以下内容:


# jdk放置目录
JAVA_HOME=/opt/jdk1.8.0_226
# jre放置目录
JRE_HOME=/opt/jdk1.8.0_226/jre
# 配置 path 环境变量,以 : 分隔
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
# 配置 classpath 环境变量,以 : 分隔
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
# 设置环境变量
export JAVA_HOME JRE_HOME PATH CLASSPATH


  1. 使配置文件的变更生效


[root@node01 opt]# source /etc/profile


  1. 查看 jdk 版本,至此,jdk 就已经安装完毕


[root@node01 opt]# java -version
java version "1.8.0_226"
Java(TM) SE Runtime Environment (build 1.8.0_226-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.226-b11, mixed mode)


2. 安装maven


  1. 使用 wget 下载 maven


[root@node01 opt]# wget https://mirrors.bfsu.edu.cn/apache/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz


  1. 解压


[root@node01 opt]# tar -zxvf apache-maven-3.6.3-bin.tar.gz


  1. 添加阿里云镜像


[root@node01 opt]# cd apache-maven-3.6.3/conf/
[root@node01 conf]# ll
总用量 20
drwxr-xr-x. 2 root root  4096 117 2019 logging
-rw-r--r--. 1 root root 10468 117 2019 settings.xml
-rw-r--r--. 1 root root  3747 117 2019 toolchains.xml
[root@node01 conf]# vi settings.xml


在配置文件 settings.xml 中加入如下代码:


 

<mirror> 
    <id>aliyun-maven</id> 
    <mirrorOf>*</mirrorOf> 
    <name>aliyun maven</name> 
    <url>http://maven.aliyun.com/nexus/content/groups/public</url> 
</mirror>


  1. 配置环境变量


[root@node01 conf]# vi /etc/profile


在文件中添加如下配置:


# maven 安装目录
M2_HOME=/opt/apache-maven-3.6.3
PATH=$PATH:$M2_HOME/bin
export M2_HOME


[root@node01 opt]# source /etc/profile


  1. 查看 maven 版本,至此,maven 就安装完毕了


[root@node01 conf]# mvn -v
Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Maven home: /opt/apache-maven-3.6.3
Java version: 1.8.0_226, vendor: Oracle Corporation, runtime: /opt/jdk1.8.0_226/jre
Default locale: zh_CN, platform encoding: UTF-8
OS name: "linux", version: "3.10.0-1127.el7.x86_64", arch: "amd64", family: "unix"


3. 安装rocketmq


  1. 下载 rocketmq 源码包


[root@node01 opt]# wget https://mirror.bit.edu.cn/apache/rocketmq/4.7.1/rocketmq-all-4.7.1-source-release.zip


  1. 解压


[root@node01 opt]# yum install -y unzip
[root@node01 opt]# unzip rocketmq-all-4.7.1-source-release.zip


  1. 去到解压后的文件目录中进行编译


[root@node01 rocketmq-all-4.7.1-source-release]# mvn -Prelease-all -DskipTests clean install -U


  1. 将编译好的文件移个位置


[root@node01 rocketmq-all-4.7.1-source-release]# cd distribution/target/rocketmq-4.7.1
[root@node01 rocketmq-4.7.1]# ll
总用量 4
drwxr-xr-x. 6 root root 4096 829 15:42 rocketmq-4.7.1
[root@node01 rocketmq-4.7.1]# mv rocketmq-4.7.1/ /opt


  1. 启动 Name Server,如果在终端显示如下信息,则说明启动成功了:


[root@node01 opt]# cd rocketmq-4.7.1/bin/
[root@node01 bin]# ls
cachedog.sh       dledger      mqbroker            mqbroker.numanode1  mqnamesrv      mqshutdown.cmd  play.sh        runbroker.sh   setcache.sh   tools.sh
cleancache.sh     mqadmin      mqbroker.cmd        mqbroker.numanode2  mqnamesrv.cmd  os.sh           README.md      runserver.cmd  startfsrv.sh
cleancache.v1.sh  mqadmin.cmd  mqbroker.numanode0  mqbroker.numanode3  mqshutdown     play.cmd        runbroker.cmd  runserver.sh   tools.cmd
[root@node01 bin]# ./mqnamesrv
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON


  1. 启动 broker
    使用命令启动 broker,会发现报如下的错误:


[root@node01 bin]# ./mqbroker -n localhost:9876
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000005c0000000, 8589934592, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 8589934592 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /opt/rocketmq-4.7.1/bin/hs_err_pid9514.log


从错误信息我们可以得知原因是 jvm 启动初始化内存分配大于物理内存。
因此,我们可以修改启动脚本中的 jvm 参数来解决这个问题,首先,让我们来修改nameserver的启动脚本:


[root@node01 bin]# vi runserver.sh


找到分配jvm内存的配置行:


JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"


将它修改为如下配置(具体配置数值根据实际生产场景进行调整):


JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"


然后修改broker的启动脚本:


[root@node01 bin]# vi runbroker.sh


同样是找到分配jvm内存的配置行:


JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"


将它修改为如下配置(具体配置数值根据实际生产场景进行调整):


JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m"


修改完之后重启 nameserver 和 broker,当出现如下信息,说明 broker 已经启动成功:


[root@node01 bin]# ./mqbroker -n localhost:9876
The broker[node01, 192.168.114.60:10911] boot success. serializeType=JSON and name server is localhost:9876


  1. 执行测试程序测试消息发送和接收
    在发送/接收消息之前,我们需要告诉客户端 Name Server 的位置。RocketMQ提供了多种方法来实现这一目标。为了简单起见,我们通过设置环境变量 NAMESRV_ADDR 来实现,在 /etc/profile 中加入如下配置:


export NAMESRV_ADDR=localhost:9876


使用测试程序发送消息:


./tools.sh org.apache.rocketmq.example.quickstart.Producer


显示如下信息则表示消息发送成功:

RocketMQ快速入门

另起一个会话,然后用测试程序来接收消息:


./tools.sh org.apache.rocketmq.example.quickstart.Consumer


显示如下信息则表示消息接收成功:

RocketMQ快速入门
8. 关闭

关闭 broker:


[root@node01 bin]# ./mqshutdown broker
The mqbroker(1502) is running...
Send shutdown request to mqbroker(1502) OK


关闭 name server:


[root@node01 bin]# ./mqshutdown namesrv
The mqnamesrv(1444) is running...
Send shutdown request to mqnamesrv(1444) OK


Windows


本文主要讲述的是 windows 10 操作系统下 RocketMQ 的安装,请确保你的操作系统中已经安装了 PowerShell。和 Linux 一样,在 Windows 安装 RocketMQ 需要先安装 jdk 和 maven ,本文就不再细述如何安装 jdk 和 maven 了,请自行查阅资料安装。


1. 安装 RocketMQ


  1. 首先,在官网下载 RocketMQ 的二进制压缩包,然后选择一个本地目录进行解压缩
  2. 配置环境变量,添加如下两个环境变量:

RocketMQ快速入门

RocketMQ快速入门

RocketMQ快速入门

打开 runbroker.cmd,修改如下配置行:


set "JAVA_OPT=%JAVA_OPT% -cp %CLASSPATH%"


修改后的配置行如下:


rem set "JAVA_OPT=%JAVA_OPT% -cp "%CLASSPATH%"


  1. 启动 name server:

RocketMQ快速入门

  1. 启动 broker:

RocketMQ快速入门

当控制台显示如下信息时,则说明启动成功了:

RocketMQ快速入门

  1. 使用测试程序发送/接收消息

发送消息:

RocketMQ快速入门

接收消息:

RocketMQ快速入门

  1. 关闭
    直接关闭 cmd/powershell 即可(不要在生产环境这样做)


安装启动时可能会遇到的问题


1. 编译时包无法在mirror上找到 提示502错误


原因:网络不好或maven仓库服务器出错
重试即可,或者换一个镜像仓库


2. 启动broker失败,报 Cannot allocate memory 错误


此问题在上文中已经叙述过了,在此就不再赘述了


3. 启动broker成功但提示:Failed to obtain the host name


原因:无法解析当前的主机名
在/etc/hosts里添加映射即可


# 配置 ip 地址到主机名的映射
192.168.114.60 node-01


4. linux日期校准


安装ntpdate


yum install ntpdate
ntpdate ntp1.aliyun.com


控制台rocketmq-console


编译安装


1. 下载


编译源码包下载地址:https://github.com/apache/rocketmq-externals
中文指南
https://github.com/apache/rocketmq-externals/blob/master/rocketmq-console/doc/1_0_0/UserGuide_CN.md


2. 将源码包上传到服务器并解压缩


[root@node01 opt]# unzip rocketmq-externals.zip


3. 编译


[root@node01 opt]# cd rocketmq-externals/rocketmq-console/
[root@node01 rocketmq-console]# mvn clean package -Dmaven.test.skip=true


4. 启动


编译成功后在rocketmq-console/target目录下执行rocketmq-console-ng-2.0.0.jar
启动(需要确保你的 name server 已经启动了),直接动态添加nameserver地址即可,或者你也可以通过编辑rocketmq-console/src/main/resources目录下的application.properties配置文件添加rocketmq.config.namesrvAddr属性来配置nameserver地址。


[root@node01 rocketmq-console]# ll
总用量 68
drwxr-xr-x. 3 root root  4096 830 10:50 doc
-rw-r--r--. 1 root root 30422 830 10:50 LICENSE
-rw-r--r--. 1 root root   180 830 10:50 NOTICE
-rw-r--r--. 1 root root 10593 830 10:50 pom.xml
-rw-r--r--. 1 root root  2390 830 10:50 README.md
drwxr-xr-x. 4 root root  4096 830 10:50 src
drwxr-xr-x. 3 root root  4096 830 10:50 style
drwxr-xr-x. 7 root root  4096 830 10:59 target
[root@node01 rocketmq-console]# cd target/
[root@node01 target]# java -jar rocketmq-console-ng-2.0.0.jar --rocketmq.config.namesrvAddr=localhost:9876


当出现如下界面时就说明启动成功了:

RocketMQ快速入门

启动成功后访问服务器 8080 端口即可。

RocketMQ快速入门


界面组成


RocketMQ 是面向集群而生的,这一点从 rocketmq-console 的界面上就能体现出来。


运维


RocketMQ快速入门


  • NameSvrAddrList:配置 name server 的地址,它是个列表,因此可以配置多个 name server 的地址,这说明 name server 是可以集群部署的
  • IsUseVIPChannel:是否使用VIPChannel


驾驶舱


RocketMQ快速入门


  • Broker TOP 10:查看消息量最多的10个broker的消息量(总量)
  • Broker 5min trend:查看broker消息量5分钟的趋势
  • 主题 TOP 10:查看消息量最多的10个单一主题的消息量(总量)
  • 主题 5min trend:查看主题消息量5分钟的趋势


集群


RocketMQ快速入门


  • 查看集群的分布情况
    • cluster与broker关系
    • cluster中包含的broker
  • 查看broker具体信息/运行信息/状态信息
  • 查看broker配置信息


主题


RocketMQ快速入门


  • 展示所有的主题,可以通过主题名称进行过滤
  • 筛选 普通/重试/死信/系统 主题
  • 新增/更新主题
    • clusterName:集群名
    • brokerName:主机名
    • topicName:主题名
    • writeQueueNums:写队列数量
    • readQueueNums:读队列数量
    • perm:2是写 4是读 6是读写
  • 状态 查询消息投递状态(投递到哪些broker/哪些queue/多少量等)
  • 路由 查看消息的路由(现在你发这个主题的消息会发往哪些broker,对应broker的queue信息)
  • CONSUMER管理(这个topic都被哪些group订阅了、消费了,消费情况何如)
  • topic配置(查看/变更当前的topic的配置)
  • 发送消息(向这个主题发送一个测试消息)
  • 重置消费位点(分为在线和离线两种情况,不过都需要检查重置是否成功)
  • 删除主题 (会删除掉所有broker以及namesrv上的主题配置和路由信息,在生产环境上请慎重进行这项操作)


消费者


RocketMQ快速入门


  • 展示所有的消费组,可以通过组名进行过滤
  • 刷新页面/每隔五秒定时刷新页面
  • 按照订阅组/数量/TPS/延迟 进行排序
    • 新增/更新消费组
    • clusterName :集群名
    • brokerName:主机名
    • groupName:消费组名字
    • consumeEnable:是否可以消费,设置为FALSE的话将无法进行消费
    • consumeBroadcastEnable:是否可以广播消费
    • retryQueueNums:重试队列的大小
    • brokerId:正常情况从哪消费
    • whichBrokerWhenConsumeSlowly:出问题了从哪消费
  • 终端:在线的消费客户端查看,包括版本订阅信息和消费模式
  • 消费详情:对应消费组的消费明细查看,这个消费组订阅的所有Topic的消费情况,每个queue对应的消费client查看(包括Retry消息)
  • 配置:查看/变更消费组的配置
  • 删除:在指定的broker上删除消费组(谨慎操作)


生产者


RocketMQ快速入门


  • 展示在线的消息生产者客户端(主机、版本、地址等信息),可以通过组名进行过滤
  • 通过主题进行筛选


消息


RocketMQ快速入门


  • 根据Topic和时间区间查询(由于数据量大 最多只会展示2000条,多的会被忽略)
  • 根据Message Key和Topic进行查询
    • 最多只会展示64条
  • 根据Message Id和Topic进行消息的查询
  • 消息详情:展示这条消息的详细信息,查看消息对应到具体消费组的消费情况(如果异常,可以查看具体的异常信息)。可以向指定的消费组重发消息。


消息轨迹


RocketMQ快速入门


  • 根据Message Key和Topic进行查询
    • 最多只会展示64条
  • 根据Message Id和Topic进行消息的查询
  • 消息轨迹详情:展示展示这条消息的消息轨迹的详细信息


参考资料


上一篇:Scala入门到精通——第十四节 Case Class与模式匹配(一)


下一篇:Android性能调优篇之探索JVM内存分配