消息中间件RocketMQ简介以及简单实用

简介

消息中间件是什么?

中间件:顾名思义 介于两者之间的⼀个技术

A→中间→B

消息中间件:消息中间件利⽤⾼效可靠的消息传递机制进⾏平台⽆关的数据交流,并基于数据通信来进⾏分布式系统的集成。

RocketMQ是什么?

RocketMQ是阿⾥巴巴开源的⼀个消息中间件,是⼀个队列模型的消息中间件,具有⾼性能、⾼可靠、⾼实时、分布式特点。⽬前已贡献给apache

功能

异步化

将⼀些可以进⾏异步化的操作通过发送消息来进⾏异步化,提⾼效率

具体场景:⽤户为了使⽤某个应⽤,进⾏注册,系统需要发送注册邮件并验证短信。对这两个操作的处理⽅式有两种:串⾏及并⾏。

  1. 串行方式:新注册信息⽣成后,先发送注册邮件,再发送验证短信,在这种⽅式下,需要最终发送验证短信后再返回给客户端。

消息中间件RocketMQ简介以及简单实用

  1. 并⾏处理:新注册信息写⼊后,发短信和发邮件并⾏处理

消息中间件RocketMQ简介以及简单实用

在这种⽅式下,发短信和发邮件 需处理完成后再返回给客户端。

假设以上三个⼦系统处理的时间均为50ms,且不考虑⽹络延迟,则总的处理时间:串⾏:50 + 50 + 50 = 150ms 并⾏:50 + 50 = 100ms

  1. 使⽤消息队列

消息中间件RocketMQ简介以及简单实用

并在写⼊消息队列后⽴即返回成功给客户端,则总的响应时间依赖于写⼊消息队列的时间,⽽写⼊消息队列的时间本身是可以很快的,基本可以忽略不计,因此总的处理时间相⽐串⾏提⾼了2倍,相⽐并⾏提⾼了⼀倍;

限流削峰

在⾼并发场景下把请求存⼊消息队列,利⽤排队思想降低系统瞬间峰值

具体场景:购物⽹站开展秒杀活动,⼀般由于瞬时访问量过⼤,服务器接收过⼤,会导致流量暴增,相关系统⽆法处理请求甚⾄崩溃。⽽加⼊消息队列后,系统可以从消息队列中取数据,相当于消息队列做了⼀次缓冲。

消息中间件RocketMQ简介以及简单实用

优点

  1. 请求先⼊消息队列,⽽不是由业务处理系统直接处理,做了⼀次缓冲,极⼤地减少了业务处理系统的压⼒;

  2. 队列⻓度可以做限制,事实上,秒杀时,后⼊队列的⽤户⽆法秒杀到商品,这些请求可以直接被抛弃,返回活动已结束或商品已售完信息;

对比

消息中间件不仅仅只有RocketMQ,市⾯上还有很多其他的消息中间件,这⾥列举⼏个常⻅的和RocketMQ作为⼀个对⽐

ActiveMQ:ActiveMQ 是Apache出品,最流⾏的,能⼒强劲的开源消息总线。ActiveMQ 是⼀个完全⽀持JMS1.1和J2EE 1.4规范的 JMS Provider实现。

JMS: 全称是Java Message Service,即消息服务应⽤程序接⼝,是⼀个Java⾯向消息中间件平台的API,⽤于在两个应⽤程序之间,或分布式系统中发送消息,进⾏异步通信

RabbitMQ:AMQP协议的领导实现,⽀持多种场景。淘宝的MySQL集群内部有使⽤它进⾏通讯,OpenStack开源云平台的通信组件,最先在⾦融⾏业得到运⽤。

AMQP: 即Advanced Message Queuing Protocol,⼀个提供统⼀消息服务的应⽤层标准⾼级消息队列协议,是应⽤层协议的⼀个开放标准,为⾯向消息的中间件设计

Kafka: Kafka是最初由Linkedin公司开发,是⼀个分布式、⽀持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最⼤的特性就是可以实时的处理⼤量数据以满⾜各种需求场景:⽐如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx⽇志、访问⽇志,消息服务等等,⽤scala语⾔编写,Linkedin于2010年贡献给了Apache基⾦会并成为*开源项⽬。
消息中间件RocketMQ简介以及简单实用

模型

相关概念

  • Producer: 消息⽣产者,负责消息的产⽣,由业务系统负责产⽣

  • Consumer:消息消费者,负责消息消费,由后台业务系统负责异步消费

  • Topic:消息的逻辑管理单位

这三者是RocketMq中最最基本的概念。Producer是消息的⽣产者。Consumer是消息的消费者。消息通过Topic进⾏传递。Topic存放的是消息的逻辑地址。

消息中间件RocketMQ简介以及简单实用

具体来说是Producer将消息发往具体的Topic。Consumer订阅Topic,主动拉取或被动接受消息,如果Consumer消费消息失败则默认会重试16次

概念模型

消息中间件RocketMQ简介以及简单实用

  • Broker: 消息的中转⻆⾊,负责存储消息,转发消息,⼀般也称为server,可以理解为⼀个存放消息的服务,⾥⾯可以有多个Topic

  • MessageQueue: 消息的物理管理单位,⼀个Topic下有多个Queue,默认⼀个Topic创建时会创建四个MessageQueue

  • ConsumerGroup: 具有同样消费逻辑消费同样消息的Consumer,可以归并为⼀个group

  • ProducerGroup: 具有同样属性的⼀些Producer可以归并为同⼀个Group同样属性是指:发送同样Topic类型的消息

  • Nameserver 注册中⼼

    作⽤:

    • 每个Broker启动的时候会向namesrv注册

    • Producer发送消息的时候根据Topic获取路由到Broker⾥⾯Topic的信息

    • Consumer根据Topic到Namesrv 获取topic的路由到Broker的信息

部署模型

消息中间件RocketMQ简介以及简单实用

  1. 注册中⼼Nameserver启动

  2. 消息中转服务Broker启动

    • 启动的时候会去创建Topic并创建对应的MessageQueue

    • 启动的时候会去注册中⼼注册,把⾃⼰的地址以及负责的Topic告诉注册中⼼

    • Broker和Nameserver之间通过⼼跳机制来检测对⽅是否存活

      连接: 单个broker和所有nameserver保持⻓连接

      ⼼跳:

      ⼼跳间隔:每隔30秒(此时间⽆法更改)向所有nameserver发送⼼跳,⼼跳包含了⾃身的topic配置信息。

      ⼼跳超时:nameserver每隔10秒钟(此时间⽆法更改),扫描所有还存活的broker连接,若某个连接2分钟内(当前时间与最后更新时间差值超过2分钟,此时间⽆法更改)没有发送⼼跳数据,则断开连接。

  3. 消息⽣产者Produer启动

    • 启动的时候会去注册中⼼注册

    注册那些内容呢?

    1. 把⾃⼰的IP地址告诉注册中心

    2. 把⾃⼰⽣产的Topic告诉注册中心

      运⾏时:

      • 单个⽣产者者和⼀台nameserver保持⻓连接,定时查询topic配置信息,如果该nameserver挂掉,⽣产者会⾃动连接下⼀个nameserver,直到有可⽤连接为⽌,并能⾃动重连。
      • 单个⽣产者和该⽣产者关联的所有broker保持⻓连接。
      • 默认情况下,⽣产者每隔30秒从nameserver获取所有topic的最新队列情况,这意味着某个broker如果宕机,⽣产者最多要30秒才能感知,在此期间,发往该broker的消息发送失败。该时间可⼿动配置
      • 默认情况下,⽣产者每隔30秒向所有broker发送⼼跳,该时间由DefaultMQProducer的heartbeatBrokerInterval参数决定,可⼿动配置。broker每隔10秒钟(此时间⽆法更改),扫描所有还存活的连接,若某个连接2分钟内(当前时间与最后更新时间差值超过2分钟,此时间⽆法更改)没有发送⼼跳数据,则关闭连接
  4. 消息消费者Consumer启动

    • 启动的时候会去注册中⼼注册

    注册哪些内容呢?

    1. 把⾃⼰的IP地址告诉注册中⼼

    2. 把⾃⼰可以消费的Topic告诉注册中⼼

      运⾏时:

      • 单个消费者和⼀台nameserver保持⻓连接,定时查询topic配置信息,如果该nameserver挂掉,消费者会⾃动连接下⼀个nameserver,直到有可⽤连接为⽌,并能⾃动重连。
      • 单个消费者和该消费者关联的所有broker保持⻓连接。
      • 默认情况下,消费者每隔30秒从nameserver获取所有topic的最新队列情况,这意味着某个broker如果宕机,客户端最多要30秒才能感知。该时间由DefaultMQPushConsumer的pollNameServerInteval参数决定,可⼿动配置。
      • 默认情况下,消费者每隔30秒向所有broker发送⼼跳,该时间由DefaultMQPushConsumer的heartbeatBrokerInterval参数决定,可⼿动配置。broker每隔10秒钟(此时间⽆法更改),扫描所有还存活的连接,若某个连接2分钟内(当前时间与最后更新时间差值超过2分钟,此时间⽆法更改)没有发送⼼跳数据,则关闭连接,并向该消费者分组的所有消费者发出通知,分组内消费者重新分配队列继续消费

注意事项

  • 同步刷盘与异步刷盘

    RocketMQ的消息是存储到磁盘上的,这样既能保证断电后恢复,⼜可以让存储的消息量超出内存的限制。

    RocketMQ为了提⾼性能,会尽可能地保证磁盘的顺序写。消息在通过Producer写⼊RocketMQ的时候,有两种写磁盘⽅式:

    • 异步刷盘:在返回写成功状态时,消息可能只是被写⼊了内存中,写操作的返回快吞吐量⼤;当内存⾥的消息量积累到⼀定程度时,统⼀触发写磁盘操作,快速写⼊
    • 同步刷盘:在返回写成功状态时,消息已经被写⼊磁盘。具体流程是,消息写⼊内存后,⽴刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执⾏完成后唤醒等待的线程,返回消息写成功的状态。

    同步刷盘还是异步刷盘,是通过Broker配置⽂件⾥的flflushDiskType参数设置的,这个参数被设置成SYNC_FLUSH、ASYNC_FLUSH中的⼀个

  • 同步复制与异步复制

    如果⼀个broker组有Master和Slave,消息需要从Master复制到Slave上,有同步和异步两种复制⽅式。

​同步复制是等Master和Slave均写成功后才反馈给客户端写成功状态;异步复制⽅式是只要Master写成功即可反馈给客户端写成功状态

​同步复制和异步复制是通过Broker配置⽂件⾥的brokerRole参数进⾏设置的,这个参数可以被设置成ASYNC_MASTER、SYNC_MASTER、SLAVE三个值中的⼀个。

安装使用

下载

官⽹地址:http://rocketmq.apache.org

下载地址:http://rocketmq.apache.org/release_notes/release-notes-4.4.0/

消息中间件RocketMQ简介以及简单实用

安装

  1. 按照上述步骤,下载,解压

  2. 注意事项:

a) RocketMQ 是基于Java开发的,所以需要配置相应的jdk环境变量才能运⾏

b) RocketMQ也需要配置环境变量才可在windows上运⾏
消息中间件RocketMQ简介以及简单实用

启动

准备⼯作

⽬的:为了防⽌出现内存不⾜的情况
消息中间件RocketMQ简介以及简单实用

修改NameServer和Broker配置:(windows/Linux)(如果是Linux环境则对应的是 runbroker.sh 和runserver.sh)编辑runbroker.cmd⽂件,修改如下:
消息中间件RocketMQ简介以及简单实用

参数解释

参数解释:

- Xms: 是指程序启动时初始内存⼤⼩(此值可以设置成与-Xmx相同,以避免每次GC完成后 JVM内存重新分配)

- Xmx: 指程序运⾏时最⼤可⽤内存⼤⼩,程序运⾏中内存⼤于这个值会 OutOfMemory

- Xmn: 年轻代⼤⼩(整个JVM内存⼤⼩ = 年轻代 + 年⽼代 + 永久代)

编辑runserver.cmd⽂件,修改如上图⼀样

启动

  1. ⾸先启动注册中⼼nameserver ,默认启动在9876端⼝,打开cmd命令窗⼝,进⼊bin⽬录,执⾏

命令:

start mqnamesrv.cmd

Linux则执⾏

sh ./mqnamesrv

出现以下⽇志表示启动成功

消息中间件RocketMQ简介以及简单实用

  1. 启动RocketMQ服务,也就是broker

进⼊bin⽬录,执⾏命令:

Windows

start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

Linux

sh ./mqbroker -n 127.0.0.1:9876 autoCreateTopicEnable=true

注意:autoCreateTopicEnable=true这个设置表示开启⾃动创建topic功能,真实⽣产环境不建议开启。

出现以下⽇志表示启动成功

消息中间件RocketMQ简介以及简单实用

补充命令

jps: 查看Java进程,是JDK给我们提供的⼀个⼯具命令

整合

导包

<dependency>
 <groupId>org.apache.rocketmq</groupId>
 <artifactId>rocketmq-client</artifactId>
 <version>4.4.0</version>
</dependency>

消息生产者实现

public static void main(String[] args){
 	// 新增消息⽣产者
 	DefaultMQProucer producer = new DefaultMQProucer("producer_group");
	// 配置注册中⼼
 	producer.setNamesrvAddr("localhost:9876");
 	// 启动
 	producer.start();
 	// 新建消息对象
 	Message message = new Message("topicA","message".context.getBytes(Charset.forName("utf-8")));
 	// 发送消息
 	producer.send(message);
}

消息消费者实现

public static void main(String[] args) throws MQClientException {
    DefaultMQPushConsumer mqConsumer = new DefaultMQPushConsumer("consumer_group");
    mqConsumer.setNamesrvAddr("localhost:9876");
    mqConsumer.subscribe("topicA", "*");
    // 设置消息监听器
    mqConsumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            MessageExt message = msgs.get(0);
            //获取消息内容
            byte[] body = message.getBody();
        });
    }
	mqConsumer.start();
}

补充:

如果出现以下错误

org.apache.rocketmq.client.exception.MQClientException: No route info of this topic, xxx

那么表示你的rocketMQ中没有创建这个topic,表示开启autoCreateTopicEnable失效了,这个时候需要⼿动创建topic,还是进⼊bin⽬录,执⾏命令:

sh ./mqamdin updateTopic -n localhost:9876 -b localhost:10911 -t topicName

然后重新启动⼀下程序即可。

上一篇:SAP ABAP守护进程(ABAP Daemon)的实现方式


下一篇:小五思科技术学习笔记之浮动路由(floating static route)