Rocket简介以及单机版安装

1. MQ简介

1. MQ用途

1. 限流削峰

  mq 可以将系统的超量请求暂存其中,以便系统后期可以慢慢进行处理,从而避免了请求的丢失或系统被压垮。

2. 异步解耦

  服务之间同步调用改为通过MQ异步调用的方式,一方发消息,一方接收到消息之后进行处理。

3. 数据收集

  分布式系统会产生海量数据,比如业务日志、监控数据、用户行为等。针对这些数据进行实时或批量采集汇总,然后对这些数据流进行大数据分析。通过MQ完成此类数据收集是最好的选择。

2. 常见MQ对比

Rocket简介以及单机版安装

JMS: Java Message Service。 定义了几种消息类型,简单文本(TextMessage)、可序列化的对象 (ObjectMessage)、属性集合 (MapMessage)、字节流 (BytesMessage)、原始值流 (StreamMessage),还有无有效负载的消息 (Message)

AMQP: Advanced Message Queuing Protocol。 协议指定了exchange交换机、queue消息队列、binding 绑定提供交换机和queue之间的路由规则。

3. RocketMQ 简介

  RocketMQ 是一个统一消息引擎、轻量级数据处理平台。是阿里巴巴开源的消息中间件,2016年11月28你啊你,捐赠给apache 软件基金会, 成为apache 的孵化项目。

4. RocketMQ 基本概念

1. 消息

  消息系统所传输的物理载体,生产和消费的最小单位,每个消息都必须属于一个Topic

2. 主题topic

  表示一类消息的集合,每个主题包含若干条消息,每个消息只能属于一个主题,是MQ进行消息订阅的基本单位。

  一个生产者可以同时发送多种Topic的消息,而一个消费者只能对一个Topic消费。

3. 标签tag

  为消息设置的标签,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,而已根据业务在同一主题下设置不同标签。标签能有效地保持代码的清晰度和连贯性,并优化RocketMQ查询。消费者可以根据不同的tag 实现对不同子主题的消费,实现更好的扩展。

  Topic是消息的一级分类,Tag是消息的二级分类。

4. 队列queue

  存储消息的物理实体。一个topic 可以包含多个Queue,每个Queue中存放的是该Topic 的消息, 一个Topic 的queue 也被称为一个Topic 中消息的分区(Partition)。

  一个Topic 的Queue中的消息只能被一个消费者组中的一个消费者消费;一个Queue 中的消息不允许同一个消费者组中的多个消费者同时消费。

  还有一个分片的概念。在RocketMQ 中分片指的是存放相应Topic 的Broker。 每个分片中会创建出相应数量的分区,即Queue, 每个Queue 的大小都是相同的。

5. 消息标识

  RocketMQ 中每个消息拥有唯一的MessageId,且可以携带具有业务标识的Key, 以便对消息的查询。 不过需要注意的是,MessageId 有两个: 在生产者send() 消息时会自动生成一个MessageId (msgId), 当消息到达Broker 后,Broker 也会自动生成一个MessageId (offsetMsgId)。 msgId、offsetMsgId 与 key 都称为消息标识。

  msgId: 由producer 端生成,其生成规则为: producerIp + 进程pid + MessageClientIdSetter 的ClassLoader的hashCode + 当前时间 + AutomicInteger 自增计数器

  offsetMsgId: 由broker 端生成,其生成规则为: brokerIp + 物理分区的offset(queue 中的偏移量)

  key: 由用户指定的业务相关的唯一标识

5. 系统架构

Rocket简介以及单机版安装

1. Producer

  消息生产者,负责生产消息。Producer 通过MQ 的负载均衡模块选择相应的Broker 集群队列进行消息投递,投递的过程中支持快速失败并且低延迟。

  生产者都是以生产者组出现的。 生产者组是一类生产者的集合,这类Producer 发送相同Topic 类型的消息。 一个生产者组可以同时发送多个Topic 的消息。

2. Consumer

  消费者。消费者都是以消费者组的形式出现的。消费者组是同一类消费者的集合,这类Consumer 消费的是同一个Topic 的消息。消费者组使得在消息消费方面,实现负载均衡(将一个Topic 的不同Queue 平均分配给同一个消费者组的不同Consumer,注意并不是将消息负载均衡)和容错(一个消费者挂了,该组的其他Consumer 可以继续消费原Consumer 消费的Queue)变得非常容易。

  消费者组中的Consumer 的数量小于等于订阅Topic 的queue 的数量。 如果超出,则多出的Consumer 将不能消费消息。

  一个Topic 类型的消息可以被多个消费者组消费。

  消费者组只能消费一个Topic 类型的消息, 不能同时消费多个Topic 类型的消息; 一个消费者组中的消费者必须订阅相同的Topic。

3. nameServer

  是一个Broker 与 Topic 路由的注册中心,支持Broker 的动态注册与发现。主要功能包括:

(1) broker 管理: 接收Broker 集群的注册信息以及心跳等

(2) 路由信息管理: 每个NameServer 保存着broker 集群的整个路由信息和用于客户端查询的队列信息。Producer 和 Consumer 可以通过NameServer 获取Broker 的路由信息,从而进行消息投递和消费。

补充: 客户端nameserver 选择策略, 客户端指的是生产者和消费者

  客户端在配置时必须写上nameServer 集群的地址,选择的时候先生产一个随机数,然后再与NameServer 节点数量取模得到所索引,然后进行连接,连接失败则会进行round-robin 策略。也就是首先采用随机,失败后采用轮询。

4. broker

  充当消息中转角色,负责存储消息、转发消息。存储Producer 生产的消息,同时为Consumer 拉取消息做准备。也存着消息相关的元数据,包括消费者组进度偏移offset、主题、队列等。

  broker 功能示意图如下:

Rocket简介以及单机版安装

ClientManager: 客户端管理,负责接收解析客户端(Producer/ consumer)请求

Store Service: 负责存储。 提供API接口,处理消息存储到物理硬盘和消息查询

HA Service: 高可用。 提供Master Broker 和 Slave Broker 之间的消息同步。

Index Service: 索引服务。根据特定的Message Key,对投递到Broker 的消息进行索引,同时也提供根据MessageKey 快速查询。

  注意Broker 集群方案为主从架构,一个Master 可以包含多个Slave。 Master 处理读写, slave 负责对master 的数据进行备份。master 与slave 的对应关系是通过指定相同的broker name、不同的BrokerId 来确定的。BrokerId 为0 标识为master, 非0 标识slave。每个Broker 与NameServer 集群中的所有节点建立长连接, 定时注册Topic 信息到所有的 NameServer。

6. 工作流程

1. 启动nameServer,监听指定端口,等待broker、producer、consumer 的连接

2. 启动broker,broker 与nameserver 集群中的所有节点建立长连接,然后每30 s向NameServer 定时发送心跳包

3. 发消息前或者发送消息时,创建Topic, 需要将Topic 与 Broker 的路由关系写入到NameServer

4. Producer 跟NameServer 集群中的一个节点建立长链接,并从NameServer 中获取路由信息,即发送的Topic 的Queue 与Broker(IP + Port)的映射关系。然后根据算法选择一个Queue,与Queue 所在的Broker 建立长连接发送消息。Producer 会将路由信息缓存到自己本地,每30 s 从nameServer更新一次。

5. Consumer 和 Producer 类似,不同的是还会每30 s向broker 发送心跳,以确保存活状态。

2. 单机版安装

参考: https://rocketmq.apache.org/docs/quick-start/

 1. 下载源码并且编译

(1) 下载源码

(2) 解压后使用mvn 进行编译安装

mvn -Prelease-all -DskipTests clean install -U

(3) 解压安装之后到target 目录下会看到有可执行文件生成 %rocketmq-all-4.9.2%\distribution\target\rocketmq-4.9.2.zip

2. 启动

我们把该文件传输到linux 服务器,然后按照教程启动即可。当然windows 下面也可以启动,为了方便我们以linux 为例子

(1) 解压

unzip ./rocketmq-4.9.2.zip

(2) 启动nameserver (如果是虚拟机启动的可能需要修改下启动的JVM参数, 修改bin/runserver.sh )

nohup sh bin/mqnamesrv &

(3) 启动broker (如果是虚拟机启动的可能需要修改下启动的JVM参数, 修改bin/runbroker.sh )

nohup sh bin/mqbroker -n localhost:9876 &

jps 查看启动的java 相关进程

[root@localhost rocketmq-4.9.2]# jps -l
8981 org.apache.rocketmq.namesrv.NamesrvStartup
9093 sun.tools.jps.Jps
9017 org.apache.rocketmq.broker.BrokerStartup

(4) 测试发送消息和接收消息

- 设置环境变量

export NAMESRV_ADDR=localhost:9876

- 发送消息

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

最终控制台打出的相关日志如下:

SendResult [sendStatus=SEND_OK, msgId=7F00000123B70194FA3E0C205C4903E4, offsetMsgId=C0A80D8F00002A9F000000000008C5B6, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost, queueId=3], queueOffset=749]
SendResult [sendStatus=SEND_OK, msgId=7F00000123B70194FA3E0C205C4A03E5, offsetMsgId=C0A80D8F00002A9F000000000008C676, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost, queueId=0], queueOffset=749]
SendResult [sendStatus=SEND_OK, msgId=7F00000123B70194FA3E0C205C4B03E6, offsetMsgId=C0A80D8F00002A9F000000000008C736, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost, queueId=1], queueOffset=749]
SendResult [sendStatus=SEND_OK, msgId=7F00000123B70194FA3E0C205C4D03E7, offsetMsgId=C0A80D8F00002A9F000000000008C7F6, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost, queueId=2], queueOffset=749]
。。。

- 接收消息

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

最終控制台打出的日志如下:

ConsumeMessageThread_8 Receive New Messages: [MessageExt [brokerName=localhost, queueId=3, storeSize=192, queueOffset=720, sysFlag=0, bornTimestamp=1641216646960, bornHost=/192.168.13.143:49262, storeTimestamp=1641216646963, storeHost=/192.168.13.143:10911, msgId=C0A80D8F00002A9F0000000000086EB6, commitLogOffset=552630, bodyCRC=1191992521, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=750, CONSUME_START_TIME=1641216703248, UNIQ_KEY=7F00000123B70194FA3E0C205AB00370, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 56, 56, 48], transactionId='null'}]] 
ConsumeMessageThread_20 Receive New Messages: [MessageExt [brokerName=localhost, queueId=3, storeSize=192, queueOffset=719, sysFlag=0, bornTimestamp=1641216646951, bornHost=/192.168.13.143:49262, storeTimestamp=1641216646953, storeHost=/192.168.13.143:10911, msgId=C0A80D8F00002A9F0000000000086BB6, commitLogOffset=551862, bodyCRC=704111923, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=750, CONSUME_START_TIME=1641216703248, UNIQ_KEY=7F00000123B70194FA3E0C205AA6036C, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 56, 55, 54], transactionId='null'}]] 
。。。

(5) 关闭Server

关闭name server 还是broker, 都是使用 bin/mqshutdown 命令

[root@localhost rocketmq-4.9.2]# sh bin/mqshutdown broker
The mqbroker(9017) is running...
Send shutdown request to mqbroker(9017) OK
[root@localhost rocketmq-4.9.2]# sh bin/mqshutdown namesrv
The mqnamesrv(8981) is running...
Send shutdown request to mqnamesrv(8981) OK
[2]+  Exit 143                nohup sh bin/mqbroker -n localhost:9876

3. 安裝可视化web 界面

  可视化界面可以运行于windows, 也可以运行于linux,只需要访问到nameserver 即可

1. 下载

https://github.com/apache/rocketmq-externals/tags    下载 rocketmq-console-1.0.0

2. 修改配置

下载的是一个Springboot 源码工程,修改application.properties 文件,修改server.port 和 rocketmq.config.namesrvAddr 地址,如下:

server.contextPath=
server.port=7000
#spring.application.index=true
spring.application.name=rocketmq-console
spring.http.encoding.charset=UTF-8
spring.http.encoding.enabled=true
spring.http.encoding.force=true
logging.config=classpath:logback.xml
#if this value is empty,use env value rocketmq.config.namesrvAddr  NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876
rocketmq.config.namesrvAddr=192.168.13.143:9876
#if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true
rocketmq.config.isVIPChannel=
#rocketmq-console's data path:dashboard/monitor
rocketmq.config.dataPath=/tmp/rocketmq-console/data
#set it false if you don't want use dashboard.default true
rocketmq.config.enableDashBoardCollect=true

3. 执行打包

到源码pom 所在目录进行打包:

mvn clean package -DskipTests=true

4. 运行target 目录下面生成的jar 包即可

java -jar rocketmq-console-ng-1.0.0.jar

5. 启动后通过浏览器访问即可

Rocket简介以及单机版安装

 查看message:

Rocket简介以及单机版安装

 

 

   至此简单完成了rocketMQ 的单机版安装。

 

上一篇:Linux ❉ 打包(归档)和压缩


下一篇:MySQL5.7升级到8.0