RocketMQ的简单使用

RocketMQ的基础知识

1-1 RocketMQ的特点

RocketMQ特点:

1、支持事务消息
2、支持延迟消息
3、天然支持集群、负载均衡
4、支持指定次数和时间间隔的失败消息重发

1-2 RocketMQ的组成

RocketMQ的简单使用

RocketMQ的组成

broker: 经纪人,代理商 ;
1) Producer Cluster: 消息生产者群,负责发送消息,一般由业务系统负责产生消息(从NameServer获取broker信息)
2) NameServer Cluster: 负责消费消息,一般是后台系统负责异步消费。
3) Broker Cluster(消息服务器): RocketMQ的核心,负责消息的接受,存储,发送等。
4) Consumer Cluster:集群架构中的组织协调员,相当于注册中心,收集broker的工作情况,不负责消息的处理(从NameServer获取broker信息)

RocketMQ的配置文件(runserver.sh)

#===========================================================================================
# JVM Configuration,开发环境可以将内存参数设置小一点
堆参数:
  -Xmx  最大堆
  -Xms  最小堆
  -Xmn  新生代大小
#================================默认堆的配置是4G=========================================================

配置命令

nohup sh mqnamesrv &                        # 启动nameserver
nohup sh mqbroker -n localhost:9876 &       # 启动broker server并测试

测试消息发送

export NAMESRV_ADDR=127.0.0.1:9876
bash tools.sh org.apache.rocketmq.example.quickstart.Producer

1-3 RocketMQ消息的发送模式与消息的结构(重要)

1-3-1 三种消息发送方式

方式1:同步消息(sync message )

producer向 broker 发送消息,执行 API 时同步等待, 直到broker 服务器返回发送结果 

方式2:异步消息(async message)

producer向 broker 发送消息时指定消息发送成功及发送异常的回调方法,调用 API 后立即返回,producer发送消
息线程不阻塞 ,消息发送成功或失败的回调任务在一个新的线程中执行 。

方式3:单向消息(oneway message)

producer向 broker 发送消息,执行 API 时直接返回,不等待broker 服务器的结果

1-3-2 消息结构

消息结构
基本属性 topic (一级分类) 消息体(4M) 消息 Flag (通常业务代码使用)
扩展属性 tag (一般为空,用于消息过滤) keys: Message(运维检索) waitStoreMsgOK (发送是否等待消息存储)

基本属性三个组成部分

1)主题:消息的一级分类,具有相同topic的消息将发送至该topic下的消息队列中 
2)消息体:即消息的内容 ,可以的字符串、对象等类型(可系列化)。消息的最大长度 是4M
3) 消息flag:消息的一个标记,RocketMQ不处理,留给业务系统使用

扩展属性的三个组成部分

1)tag :相当于消息的二级分类,用于消费消息时进行过滤,可为空(区别于消息flag,扩展属性的flag可以用于过滤消息)
2)keys: Message 索引键,在运维中可以根据这些 key 快速检索到消息,可为空 。
3)waitStoreMsgOK :消息发送时是否等消息存储完成后再返回 。

1-3-3 RocketMQ简单实例

生产者代码

生产者

import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class ProducerSimple {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 消息发送的模式1: 同步消息
     * 应用场景:
     * 向topic队列发送同步消息
     * @param topic
     * @param msg
     */
    public void sendSyncMsg(String topic, String msg){
        rocketMQTemplate.syncSend(topic,msg,100000);
    }

    /**
     * 消息的发送模式2:异步消息

     */
    public void sendASyncMsg(String topic,String msg){
        /*异步消息需要设置回调对象,消息发送成功/失败后,会由另外一个线程调用对象中的方法*/
        rocketMQTemplate.asyncSend(topic, msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println(sendResult);
            }

            @Override
            public void onException(Throwable throwable) {
                throwable.printStackTrace();
            }
        },100000);
    }
}

调用生产者发送消息

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class ProducerSimpleTest {
    @Autowired
    private ProducerSimple producerSimple;
    //测试发送同步消息
    @Test
    public void testSendSyncMsg(){
        this.producerSimple.sendSyncMsg("testTopic", "第3条同步消息");
        System.out.println("end...");
    }
    // 测试发送异步消息
    @Test
    public void testSendASyncMsg(){
        this.producerSimple.sendASyncMsg("testTopic","第一条异步消息");
        try{
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

生产者配置

server:
  port: 8182 #服务端口
  servlet:
    context‐path: /rocketmq‐consumer

spring:
  application:
    name: rocketmq‐consumer #指定服务名
rocketmq:
  consumer:
    group: demo_consumer_group           # 必须配置才能注入RocketMQTemplate模板
  name-server: 49.52.10.41:9876
消费者代码

消费者

package com.shanjupay.test.rocketmq.message;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;


/**
 * 监听消息队列需要指定:
 * 1)topic:监听的主题
 * 2)consumerGroup:消费组,相同消费组的消费者共同消费该主题的消息,它们组成一个集群(配置文件中设置),与之对应
 * 生成者需要配置producer group.
 */
@Component
@RocketMQMessageListener(topic = "testTopic",consumerGroup = "demo_consumer_group")
public class ConsumerSimple implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println(s);
    }
}

消费者配置

server:
  port: 8181 #服务端口
  servlet:
    context‐path: /rocketmq‐producer

spring:
  application:
    name: rocketmq‐producer
rocketmq:
  producer:
    group: demo-producer-group
  name-server: 49.52.10.41:9876

1-4 RocketMQ的消息传递流程与消费模式

RocketMQ的简单使用

消息发送流程

step1:从Nameserver获取路由信息,选择消息队列: Broker会将信息上报给Nameserver,因此NameServer中存有每个broker的topic以及队列,producer发送前根据topic从NameServer查询所有消息队列。如果该topic没有队列则会新建,通常一个topic会查询到多个队列,因此会按照一定的算法选择一个队列发送。

根据topic查询的结果如下所示:
[
    {"brokerName":"Broker‐1","queueId":0},
    {"brokerName":"Broker‐1","queueId":1},
    {"brokerName":"Broker‐2","queueId":0},
    {"brokerName":"Broker‐2","queueId":1}
]

step2:检验并发送消息

  • 发送消息前进行校验,比如消息的内容长度不能为0、消息最大长度、消息必要的属性是否具备等

  • 若topic下还没有队列则自动创建,默认一个topic下自动创建4个写队列,4个读队列

多个队列的动机:高可用(一个队列挂了,还有其他),高性能(并发度高)

问题:为什么设置producer group?

方便在事务消息中broker(代理)需要回查producer(回调),同一个生产组的producer组成一个集群,提高并发能力

step3:consumer处于监听队列状态,消费消息

辨析三个概念: topic, consumer group,consumer

1)一个消费组可以包括多个消费者,一个消费组可以订阅多个主题。
2)一个队列同时只允许一个消费者消费,一个消费者可以消费多个队列中的消息。
问题:消息队列的消费模式(广播模式的推拉模式)?
1)集群模式(点对点模式):一个消费组内的消费者组成一个集群,主题下的一条消息只能被一个消费者消费。
2)广播模式(发布订阅模式):主题下的一条消息能被消费组下的所有消费者消费,消费者和broker之间通过推模式和拉模式接收消息

广播模式下的消息消费方式?

推模式:broker主动将消息推送给消费者

拉模式:消费者从broker中查询消息

1-5 延迟消息的应用与实现

典型应用场景:订单的关闭

  • 延迟消息也叫做定时消息,比如在电商项目的交易系统中,当用户下单之后超过一段时间之后仍然没有支付,此时就需要将该订单关l闭。

功能实现:可以在用户创建订单时就发送一条包含订单内容的延迟消息,该消息在一段时间之后投递给消息消费者,当消息消费者接收到该消息后,
判断该订单的支付状态,如果处于未支付状态,则将该订单关闭,商品回库(删除订单)

RocketMQ的延迟等级:RocketMQ的延迟消息实现非常简单,只需要发送消息前设置延迟的时间,延迟时间存在十八个等级
(1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h),调用setDelayTimeLevel()设置与时间相对应的延迟级别即可

import com.shanjupay.test.rocketmq.model.OrderExt;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;         // spring的message对象
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
public class ProducerSimple {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 消息发送的模式1: 同步消息
     * 应用场景:
     * 向topic队列发送同步消息
     * @param topic
     * @param msg
     */
    public void sendSyncMsg(String topic, String msg){
        rocketMQTemplate.syncSend(topic,msg,100000);
    }

    /**
     * 消息的发送模式2:异步消息

     */
    public void sendASyncMsg(String topic,String msg){
        /*异步消息需要设置回调对象,消息发送成功/失败后,会由另外一个线程调用对象中的方法*/
        rocketMQTemplate.asyncSend(topic, msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println(sendResult);
            }

            @Override
            public void onException(Throwable throwable) {
                throwable.printStackTrace();
            }
        },100000);
    }


    /**
     * 将对象转换为json字符串作为消息同步发送
     */
    public void sendMsgByJson(String topic, OrderExt orderExt){
        rocketMQTemplate.convertAndSend(topic,orderExt);
    }

    /**
     * 发送同步延迟消息(需要将对象转换为spring的message对象)
     * @param topic     broker中队列topic
     * @param orderExt  传递的消息对象内容
     */
    public void sendMsgByJsonDelay(String topic, OrderExt orderExt) {
        Message<OrderExt> message = MessageBuilder.withPayload(orderExt).build();      //发送同步消息,消息内容将orderExt转为json
        this.rocketMQTemplate.syncSend(topic,message,1000,3); //指定发送超时时间(毫秒)和延迟等级
        System.out.printf("send msg : %s",orderExt);
    }
    
}
延迟队列的实现流程
1)如果消息的延迟级别大于0,则表示该消息为延迟消息,修改该消息的主题为SCHEDULE_TOPIC_XXXX,队列Id为延迟级别减1。
2)消息进入SCHEDULE_TOPIC_XXXX的队列中。
3)定时任务根据上次拉取的偏移量不断从队列中取出所有消息。
4)根据消息的物理偏移量和大小再次获取消息。
5)根据消息属性重新创建消息,清除延迟级别,恢复原主题和队列Id。
6)重新发送消息到原主题的队列中,供消费者进行消费。

基本思想:通过定时任务+队列来实现消息延时发送到broker

1-6 消费重试

消费重试定义:producer线程成功将消息发送到Broker,被consumer消费时,发生意外情况,没有被正常消费,此时需要进行消费重试

何时需要消费重试?
1)消息没有被消费者接收,比如消费者与broker存在网络异常。此种情况消息会一直被消费重试。
2)消息接受成功,但执行时产生异常,无法向broker返回结果,这个时候也会消费重试(实际场景更为常见的问题)。
broker是如何知道消息消费的成功与否的?
broker会从消费者获取信息消费的结果,如果没有返回消费成功的状态,那么消费者就会进行重试。
消费者抛出异常,该如何处理?
当消息在消费时出现异常,此时消息被有限次的重试消费。
默认策略:消息会按照延迟消息的延迟时间等级(1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h)从第3级开始重试,每试一次如果还不成功则延迟等级加1,因此最多重试16次,如果依旧无法消费成功,那么该消息会进入到死信队列中。
实际开发中如何处理消费失败的情况?

默认策略:进行有限次的消费重试,每次重试仍然消费失败的话,延迟下一次重试的时间。

实际开发策略基本思想:实际生产中不会让消息重试这么多次,通常在重试一定的次数后将消息写入数据库,由另外单独的程序或人工去处

这种处理失败的情况,通常属于线上的异常情况,当重试次数达到一定的阈值,则首先需要保存消息,便于定位问题,维护系统
/*处理的逻辑如下:*/
public class ConsumerSimple implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
        	 //取出当前重试次数
        	int reconsumeTimes = messageExt.getReconsumeTimes();
        	    //当大于一定的次数后将消息写入数据库,由单独的程序或人工去处理
            if(reconsumeTimes >=2){
                //将消息写入数据库,之后正常返回
                return;
            }
    		throw new RuntimeException(String.format("第%s次处理失败..",reconsumeTimes));
    }
}

参考资料

RocketMQ的延迟消息
消息队列的幂等性
消息重复,消息丢失,消息积压的解决策略
RocketMQ的基础课程

RocketMQ的简单使用

上一篇:ARC124 B - XOR Matching 2(思维)


下一篇:递归和迭代实现二叉树先序、中序、后序和层序遍历