RabbitMq 的理论及应用示例(一)

什么是RabbitMq

RabbitMQ是一个基于AMQP协议的开源的消息代理和队列服务器。
优点:

  • 采用Erlang语言进行开发作为底层语言实现:Erlang有着和原生Socket一样的延迟,所以性能非常高
  • 开源、性能优秀,稳定性保障
  • 提供可靠性消息投递模式(confirm)、返回模式(return)
  • 与SpringAMQP完美整合,API丰富
  • 集群模式比较丰富,表达式配置,HA模式,镜像队列模型
  • 保证数据不丢失的前提做到高可靠性,可用性

AMOP 专有名词:

  • Server:又称为 Broker
  • Connection:连接,应用程序和broker之间的网络连接
  • Channel:网络信道,一个网络会话的任务
  • Message:消息
  • Virtual host:虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个Virtual host里面可以有若干个Exchage和Queue,但同一个Virtual host里面不能有相同名称的Exchage和Queue
  • Exchange:交换机,接收消息,根据路由键转发消息到绑定队列
  • Binding:Exchage和Queue之间的虚拟连接,binding中可以包含routing key
  • Routing key:一个路由规则
  • Queue:保存具体的消息的容器

RabbitMQ 消息的流转过程:
RabbitMq 的理论及应用示例(一)

应用场景/作用

  • 异步缓冲
    有些业务可以进行异步的,只要做到最终一致性,不用强一致性,即可用MQ

  • 服务解耦

    • 强依赖:使用 dubbo 或 springCloud 进行服务的调用和连接都是强依赖。【比如注册、发现都需要依赖其他服务】

    • 弱依赖:MQ 中间件

      • 不代表弱依赖就可以失败
      • 如果不能失败就要保证上游的消息发布端数据投递的可靠性

      场景举例:用户下单后,订单需要更新库存

      强依赖下会出现的问题:
      1)假如库存系统无法访问,则订单减库存失败,从而导致订单生成失败
      2)订单模块和库存模块是强耦合的
      3)如果启用一个线程做离线操作,只是做了异步访问,访问只是提升速度,是否正 常调用成功是无法保证的

      通过弱依赖来解决以上问题:
      1)订单生产成功写入消息到消息队列(保证消息的可靠投递)
      2)库存系统通过订阅消息获取下单信息,库存系统根据下单信息进行库存操作
      3)如果库存系统出现异常,库存消费消息失败的情况下消息就重回队列了,等待下次发送

  • 削峰和填谷

    • 当我们下游服务处理不过来的时候,就可以将这些消息缓存在一个地方,逐步处理
    • 将短暂一段时间的业务积压在后面缓慢执行就是削峰和填谷的过程

思考

  1. 生产端的可靠性投递;
    • 如果消息和钱有关,这个消息一定不能丢失
    • 需要做到生产端100%投递,就需要和业务数据保证原子性
  2. 消费端的幂等;
    • 生产端如果要做到可靠性投递,可能会有重复投递
    • 消费端消费了两次或多次这个数据可能会不一致
    • 所以消费端一定要做到同一个请求消费多次得到的结果一样
  3. MQ 本身需要考虑
    • HA:高可用
    • 低延迟
    • 可靠性:确保数据是完整的
    • 堆积能力:这是MQ能扛下你的业务量级的保证
    • 扩展性:是否能够天然支持横向扩展无感知扩容

RabbitMq 集群架构原理解析

1)主备模式
master-slave结构,可以理解为热备份,master负责读写,master宕机后切换到slave

2)镜像模式
业界主流使用比较多的模式;
RabbitMQ集群非常经典的就是镜像模式,保证数据100%不丢失;
高可用、数据同步低延迟、奇数个节点。

缺点:
镜像队列集群的缺陷是无法进行很好的横向扩容,因为每个节点都是一个完整的互相复制的节点,并且镜像节点过多也会增加MQ的负担,一个数据写入就要复制到多个节点,吞吐量也会降低
RabbitMq 的理论及应用示例(一)

RabbitMq 的安装和使用

RabbitMq-3.8.19安装详解 前面我已经介绍了一篇,再此省略。

修改用户登录和连接心跳

  • 将 loopback_users.guest = false,前面的注释去掉
  • 将 {heartbeat, 60} 修改为 {heartbeat, 10}

查看MQ端口是否启用:yum -y install lsof

  • lsof -i:5672

启动插件:

  • rabbitmq-plugins enable rabbitmq_management

查看管理后台是否启动

  • lsof -i:15672

常用命令

# 启动服务
systemctl start rabbitmq-server
# 或者
rabbitmq-server -detached
# 开启web管理界面插件
rabbitmq-plugins enable rabbitmq_management

# 关闭应用
rabbitmqctl stop_app

# 启动应用
rabbitmqctl start_app

# 节点状态
rabbitmqctl status

# 添加用户密码
rabbitmqctl add_user username password

# 修改用户密码
rabbitmqctl change_password username password

# 列出所有用户
rabbitmqctl list_users

# 删除用户
rabbitmqctl delete_user username

# 列出用户权限
rabbitmqctl list_user_permissions username

# 清除用户权限
rabbitmqctl clear_permissions -p vhostpath username

# 设置用户权限
# 三个*对应:configure write read
rabbitmqctl set_permissions -p vhostpath username ".*" ".*" ".*"
rabbitmqctl set_permissions -p / gavin ".*" ".*" ".*"

# 列出所有虚拟主机
rabbitmqctl list_vhosts

# 创建虚拟主机
rabbitmqctl add_vhost vhostpath

# 列出虚拟主机的权限
rabbitmqctl list_permissions -p vhostpath

# 删除虚拟主机
rabbitmqctl delete_vhost vhostpath

# 查看所有队列
rabbitmqctl list_queues

# 清除队列里的消息
rabbitmqctl -p vhostpath purge_queue queueName

# 清除所有数据
rabbitmqctl reset # 这个动作最好在MQ服务停掉后操作

springboot 整合 rabbitmq

发送

搭建一个 SpringBoot 工程及准备工作,我在另一篇博客写了,这里不再重复。SpringBoot全局异常处理、集成Swagger和参数必填校验 ,我们现在需要的准备工作,都在这篇准备了。

新增 application.yaml

server:
  port: 8088

spring:
  rabbitmq:
    host: 192.168.150.130
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 10000

消息实体

package cn.com.springboot.vo;

import lombok.Data;

import java.io.Serializable;

@Data
public class OrderInfo implements Serializable {

    private String id;

    private String orderName;

    private String messageId;
}

发送类

package cn.com.springboot.web;

import cn.com.springboot.vo.OrderInfo;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class OrderSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    private static final String ORDER_EXCHANGE = "order_exchange";

    private static final String ORDER_ROUTING_KEY = "order_r_key";

    public void sendOrder(OrderInfo orderInfo){
        //correlationData:消息唯一id
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(orderInfo.getMessageId());

        //String exchange, String routingKey, Object object, @Nullable CorrelationData correlationData
        rabbitTemplate.convertAndSend(ORDER_EXCHANGE, ORDER_ROUTING_KEY, orderInfo, correlationData);
    }
}

rabbitmq 准备工作

创建 exchange
RabbitMq 的理论及应用示例(一)
Exchange的相关属性说明

  • Name:exchange的名称

  • Type类型说明

    • direct:exchange在和queue进行binding时会设置routingkey,只有routingkey完全相同,exchange才会将消息转发到对应的queue上,相当于点对点

    • fanout:直接将消息路由到所有绑定的队列中,无需对消息的routingkey进行匹配操作,因为不绑定routingkey,所有也是消息转发最快的(广播方式)

    • topic:此类型的exchange和direct差不多,但direct类型要求routingkey完全相同,而topic可以使用通配符:‘*’,‘#’

      其中‘*’表示匹配一个单词,‘#’则表示匹配没有或者多个单词

    • header:路由规则是根据header来判断

    • 总结:一般direct和topic用来具体的路由信息,如果用广播就使用fanout,header类型用的比较少

  • Durability:Durable是持久化到磁盘的意思

  • Auto Delete:如果设置为yes,那么最后一个绑定到exchange上的队列被删除后,exchange也会自动删除

  • Internal:如果为yes则表明该exchange是rabbitmq内部使用,不提供给外部系统应用,一般是在自己编写erlang语言做定制化扩展时使用

  • Arguments这个是扩展AMQP协议时自定义使用的内容

创建Queue
RabbitMq 的理论及应用示例(一)
Exchange和Queue通过Binding关联,由routingkey进行路由
RabbitMq 的理论及应用示例(一)
RabbitMq 的理论及应用示例(一)
RabbitMq 的理论及应用示例(一)
RabbitMq 的理论及应用示例(一)

测试发送

package cn.com.springboot.web;

import cn.com.springboot.vo.OrderInfo;
import cn.com.springboot.vo.ResultVo;
import io.swagger.annotations.Api;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@Api("发送消息")
@RestController
public class RabbitMqController {
    @Autowired
    private OrderSender orderSender;

    @PostMapping("/sendOrder")
    public ResultVo sender(@RequestBody OrderInfo orderInfo){
        orderSender.sendOrder(orderInfo);
        return ResultVo.success();
    }
}

用 swagger 测试
RabbitMq 的理论及应用示例(一)
消息成功发送到队列
RabbitMq 的理论及应用示例(一)
注意:

  • 一个exchange可以绑定多个queue,只要routingkey一样,一个消息就会发送到多个queue上
  • exchange绑定一个queue,无论binding多少个routingkey,只要符合这个routingkey规则的消息都会发送到这个队列中,接收的时候无论从哪个routingkey过来的消息,连接这个队列的消费端都会消费掉,相当于多个消息规则对应一个队列

消息接收

创建另一个工程:consumer-and-producer,搭建步骤和上面的基本一样。
application.yaml

server:
  port: 8080

spring:
  rabbitmq:
    host: 192.168.150.130
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 10000
    listener:
      simple:
        concurrency: 5              # 初始化并发数
        max-concurrency: 10         # 最大并发数
        auto-startup: true          # 自动开启监听
        prefetch: 1                 # 每个连接同一时间最多处理几个消息,限流设置
        acknowledge-mode: manual    # 签收模式为手动签收

添加消费类

package cn.com.springboot.web;

import cn.com.springboot.vo.OrderInfo;
import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

@Log4j2
@Component
public class OrderReceiver {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "order_queue", durable = "true"),
            exchange = @Exchange(value = "order_exchange", type = "topic"),//durable默认是true
            key = "order_r_key"//我的routingKey是order_r_key
    ))
    @RabbitHandler
    public void receiveOrderInfo(@Payload OrderInfo orderInfo,
                                 @Headers Map<String, Object> headers,
                                 Channel channel) throws IOException {
        log.info("开始消费");
        log.info("orderName:{}, messageId:{}", orderInfo.getOrderName(), orderInfo.getMessageId());

        Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);

        channel.basicAck(deliveryTag, false);
    }
}

启动测试
RabbitMq 的理论及应用示例(一)

暂时分享到这,欢迎指正!

上一篇:消息队列 RabbitMq 的学习和应用


下一篇:Java-RabbitMq-死信队列