RocketMQ的基础知识
1-1 RocketMQ的特点
RocketMQ特点:
1、支持事务消息
2、支持延迟消息
3、天然支持集群、负载均衡
4、支持指定次数和时间间隔的失败消息重发
1-2 RocketMQ的组成
RocketMQ的组成:
broker: 经纪人,代理商 ;
1) Producer Cluster: 消息生产者群,负责发送消息,一般由业务系统负责产生消息(从NameServer获取broker信息)
2) NameServer Cluster: 负责消费消息,一般是后台系统负责异步消费。
3) Broker Cluster(消息服务器): RocketMQ的核心,负责消息的接受,存储,发送等。
4) Consumer Cluster:集群架构中的组织协调员,相当于注册中心,收集broker的工作情况,不负责消息的处理(从NameServer获取broker信息)
RocketMQ的配置文件(runserver.sh)
#===========================================================================================
# JVM Configuration,开发环境可以将内存参数设置小一点
堆参数:
-Xmx 最大堆
-Xms 最小堆
-Xmn 新生代大小
#================================默认堆的配置是4G=========================================================
配置命令
nohup sh mqnamesrv & # 启动nameserver
nohup sh mqbroker -n localhost:9876 & # 启动broker server并测试
测试消息发送
export NAMESRV_ADDR=127.0.0.1:9876
bash tools.sh org.apache.rocketmq.example.quickstart.Producer
1-3 RocketMQ消息的发送模式与消息的结构(重要)
1-3-1 三种消息发送方式
方式1:同步消息(sync message )
producer向 broker 发送消息,执行 API 时同步等待, 直到broker 服务器返回发送结果
方式2:异步消息(async message)
producer向 broker 发送消息时指定消息发送成功及发送异常的回调方法,调用 API 后立即返回,producer发送消
息线程不阻塞 ,消息发送成功或失败的回调任务在一个新的线程中执行 。
方式3:单向消息(oneway message)
producer向 broker 发送消息,执行 API 时直接返回,不等待broker 服务器的结果
1-3-2 消息结构
消息结构 | |||
---|---|---|---|
基本属性 | topic (一级分类) | 消息体(4M) | 消息 Flag (通常业务代码使用) |
扩展属性 | tag (一般为空,用于消息过滤) | keys: Message(运维检索) | waitStoreMsgOK (发送是否等待消息存储) |
基本属性三个组成部分:
1)主题:消息的一级分类,具有相同topic的消息将发送至该topic下的消息队列中
2)消息体:即消息的内容 ,可以的字符串、对象等类型(可系列化)。消息的最大长度 是4M
3) 消息flag:消息的一个标记,RocketMQ不处理,留给业务系统使用
扩展属性的三个组成部分
1)tag :相当于消息的二级分类,用于消费消息时进行过滤,可为空(区别于消息flag,扩展属性的flag可以用于过滤消息)
2)keys: Message 索引键,在运维中可以根据这些 key 快速检索到消息,可为空 。
3)waitStoreMsgOK :消息发送时是否等消息存储完成后再返回 。
1-3-3 RocketMQ简单实例
生产者代码
生产者
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class ProducerSimple {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 消息发送的模式1: 同步消息
* 应用场景:
* 向topic队列发送同步消息
* @param topic
* @param msg
*/
public void sendSyncMsg(String topic, String msg){
rocketMQTemplate.syncSend(topic,msg,100000);
}
/**
* 消息的发送模式2:异步消息
*/
public void sendASyncMsg(String topic,String msg){
/*异步消息需要设置回调对象,消息发送成功/失败后,会由另外一个线程调用对象中的方法*/
rocketMQTemplate.asyncSend(topic, msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
@Override
public void onException(Throwable throwable) {
throwable.printStackTrace();
}
},100000);
}
}
调用生产者发送消息
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class ProducerSimpleTest {
@Autowired
private ProducerSimple producerSimple;
//测试发送同步消息
@Test
public void testSendSyncMsg(){
this.producerSimple.sendSyncMsg("testTopic", "第3条同步消息");
System.out.println("end...");
}
// 测试发送异步消息
@Test
public void testSendASyncMsg(){
this.producerSimple.sendASyncMsg("testTopic","第一条异步消息");
try{
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
生产者配置
server:
port: 8182 #服务端口
servlet:
context‐path: /rocketmq‐consumer
spring:
application:
name: rocketmq‐consumer #指定服务名
rocketmq:
consumer:
group: demo_consumer_group # 必须配置才能注入RocketMQTemplate模板
name-server: 49.52.10.41:9876
消费者代码
消费者
package com.shanjupay.test.rocketmq.message;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* 监听消息队列需要指定:
* 1)topic:监听的主题
* 2)consumerGroup:消费组,相同消费组的消费者共同消费该主题的消息,它们组成一个集群(配置文件中设置),与之对应
* 生成者需要配置producer group.
*/
@Component
@RocketMQMessageListener(topic = "testTopic",consumerGroup = "demo_consumer_group")
public class ConsumerSimple implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println(s);
}
}
消费者配置
server:
port: 8181 #服务端口
servlet:
context‐path: /rocketmq‐producer
spring:
application:
name: rocketmq‐producer
rocketmq:
producer:
group: demo-producer-group
name-server: 49.52.10.41:9876
1-4 RocketMQ的消息传递流程与消费模式
消息发送流程
step1:从Nameserver获取路由信息,选择消息队列: Broker会将信息上报给Nameserver,因此NameServer中存有每个broker的topic以及队列,producer发送前根据topic从NameServer查询所有消息队列。如果该topic没有队列则会新建,通常一个topic会查询到多个队列,因此会按照一定的算法选择一个队列发送。
根据topic查询的结果如下所示:
[
{"brokerName":"Broker‐1","queueId":0},
{"brokerName":"Broker‐1","queueId":1},
{"brokerName":"Broker‐2","queueId":0},
{"brokerName":"Broker‐2","queueId":1}
]
step2:检验并发送消息
-
发送消息前进行校验,比如消息的内容长度不能为0、消息最大长度、消息必要的属性是否具备等
-
若topic下还没有队列则自动创建,默认一个topic下自动创建4个写队列,4个读队列
多个队列的动机:高可用(一个队列挂了,还有其他),高性能(并发度高)
问题:为什么设置producer group?
方便在事务消息中broker(代理)需要回查producer(回调),同一个生产组的producer组成一个集群,提高并发能力
step3:consumer处于监听队列状态,消费消息
辨析三个概念: topic, consumer group,consumer
1)一个消费组可以包括多个消费者,一个消费组可以订阅多个主题。
2)一个队列同时只允许一个消费者消费,一个消费者可以消费多个队列中的消息。
问题:消息队列的消费模式(广播模式的推拉模式)?
1)集群模式(点对点模式):一个消费组内的消费者组成一个集群,主题下的一条消息只能被一个消费者消费。
2)广播模式(发布订阅模式):主题下的一条消息能被消费组下的所有消费者消费,消费者和broker之间通过推模式和拉模式接收消息
广播模式下的消息消费方式?
推模式:broker主动将消息推送给消费者
拉模式:消费者从broker中查询消息
1-5 延迟消息的应用与实现
典型应用场景:订单的关闭
- 延迟消息也叫做定时消息,比如在电商项目的交易系统中,当用户下单之后超过一段时间之后仍然没有支付,此时就需要将该订单关l闭。
功能实现:可以在用户创建订单时就发送一条包含订单内容的延迟消息,该消息在一段时间之后投递给消息消费者,当消息消费者接收到该消息后,
判断该订单的支付状态,如果处于未支付状态,则将该订单关闭,商品回库(删除订单)。
RocketMQ的延迟等级:RocketMQ的延迟消息实现非常简单,只需要发送消息前设置延迟的时间,延迟时间存在十八个等级
(1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h),调用setDelayTimeLevel()设置与时间相对应的延迟级别即可
import com.shanjupay.test.rocketmq.model.OrderExt;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message; // spring的message对象
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
public class ProducerSimple {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 消息发送的模式1: 同步消息
* 应用场景:
* 向topic队列发送同步消息
* @param topic
* @param msg
*/
public void sendSyncMsg(String topic, String msg){
rocketMQTemplate.syncSend(topic,msg,100000);
}
/**
* 消息的发送模式2:异步消息
*/
public void sendASyncMsg(String topic,String msg){
/*异步消息需要设置回调对象,消息发送成功/失败后,会由另外一个线程调用对象中的方法*/
rocketMQTemplate.asyncSend(topic, msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
@Override
public void onException(Throwable throwable) {
throwable.printStackTrace();
}
},100000);
}
/**
* 将对象转换为json字符串作为消息同步发送
*/
public void sendMsgByJson(String topic, OrderExt orderExt){
rocketMQTemplate.convertAndSend(topic,orderExt);
}
/**
* 发送同步延迟消息(需要将对象转换为spring的message对象)
* @param topic broker中队列topic
* @param orderExt 传递的消息对象内容
*/
public void sendMsgByJsonDelay(String topic, OrderExt orderExt) {
Message<OrderExt> message = MessageBuilder.withPayload(orderExt).build(); //发送同步消息,消息内容将orderExt转为json
this.rocketMQTemplate.syncSend(topic,message,1000,3); //指定发送超时时间(毫秒)和延迟等级
System.out.printf("send msg : %s",orderExt);
}
}
延迟队列的实现流程
1)如果消息的延迟级别大于0,则表示该消息为延迟消息,修改该消息的主题为SCHEDULE_TOPIC_XXXX,队列Id为延迟级别减1。
2)消息进入SCHEDULE_TOPIC_XXXX的队列中。
3)定时任务根据上次拉取的偏移量不断从队列中取出所有消息。
4)根据消息的物理偏移量和大小再次获取消息。
5)根据消息属性重新创建消息,清除延迟级别,恢复原主题和队列Id。
6)重新发送消息到原主题的队列中,供消费者进行消费。
基本思想:通过定时任务+队列来实现消息延时发送到broker
1-6 消费重试
消费重试定义:producer线程成功将消息发送到Broker,被consumer消费时,发生意外情况,没有被正常消费,此时需要进行消费重试
何时需要消费重试?
1)消息没有被消费者接收,比如消费者与broker存在网络异常。此种情况消息会一直被消费重试。
2)消息接受成功,但执行时产生异常,无法向broker返回结果,这个时候也会消费重试(实际场景更为常见的问题)。
broker是如何知道消息消费的成功与否的?
broker会从消费者获取信息消费的结果,如果没有返回消费成功的状态,那么消费者就会进行重试。
消费者抛出异常,该如何处理?
当消息在消费时出现异常,此时消息被有限次的重试消费。
默认策略:消息会按照延迟消息的延迟时间等级(1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h)从第3级开始重试,每试一次如果还不成功则延迟等级加1,因此最多重试16次,如果依旧无法消费成功,那么该消息会进入到死信队列中。
实际开发中如何处理消费失败的情况?
默认策略:进行有限次的消费重试,每次重试仍然消费失败的话,延迟下一次重试的时间。
实际开发策略基本思想:实际生产中不会让消息重试这么多次,通常在重试一定的次数后将消息写入数据库,由另外单独的程序或人工去处
理
这种处理失败的情况,通常属于线上的异常情况,当重试次数达到一定的阈值,则首先需要保存消息,便于定位问题,维护系统
/*处理的逻辑如下:*/
public class ConsumerSimple implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
//取出当前重试次数
int reconsumeTimes = messageExt.getReconsumeTimes();
//当大于一定的次数后将消息写入数据库,由单独的程序或人工去处理
if(reconsumeTimes >=2){
//将消息写入数据库,之后正常返回
return;
}
throw new RuntimeException(String.format("第%s次处理失败..",reconsumeTimes));
}
}