什么是RabbitMq
RabbitMQ是一个基于AMQP协议的开源的消息代理和队列服务器。
优点:
- 采用Erlang语言进行开发作为底层语言实现:Erlang有着和原生Socket一样的延迟,所以性能非常高
- 开源、性能优秀,稳定性保障
- 提供可靠性消息投递模式(confirm)、返回模式(return)
- 与SpringAMQP完美整合,API丰富
- 集群模式比较丰富,表达式配置,HA模式,镜像队列模型
- 保证数据不丢失的前提做到高可靠性,可用性
AMOP 专有名词:
- Server:又称为 Broker
- Connection:连接,应用程序和broker之间的网络连接
- Channel:网络信道,一个网络会话的任务
- Message:消息
- Virtual host:虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个Virtual host里面可以有若干个Exchage和Queue,但同一个Virtual host里面不能有相同名称的Exchage和Queue
- Exchange:交换机,接收消息,根据路由键转发消息到绑定队列
- Binding:Exchage和Queue之间的虚拟连接,binding中可以包含routing key
- Routing key:一个路由规则
- Queue:保存具体的消息的容器
RabbitMQ 消息的流转过程:
应用场景/作用
-
异步缓冲
有些业务可以进行异步的,只要做到最终一致性,不用强一致性,即可用MQ -
服务解耦
-
强依赖:使用 dubbo 或 springCloud 进行服务的调用和连接都是强依赖。【比如注册、发现都需要依赖其他服务】
-
弱依赖:MQ 中间件
- 不代表弱依赖就可以失败
- 如果不能失败就要保证上游的消息发布端数据投递的可靠性
场景举例:用户下单后,订单需要更新库存
强依赖下会出现的问题:
1)假如库存系统无法访问,则订单减库存失败,从而导致订单生成失败
2)订单模块和库存模块是强耦合的
3)如果启用一个线程做离线操作,只是做了异步访问,访问只是提升速度,是否正 常调用成功是无法保证的通过弱依赖来解决以上问题:
1)订单生产成功写入消息到消息队列(保证消息的可靠投递)
2)库存系统通过订阅消息获取下单信息,库存系统根据下单信息进行库存操作
3)如果库存系统出现异常,库存消费消息失败的情况下消息就重回队列了,等待下次发送
-
-
削峰和填谷
- 当我们下游服务处理不过来的时候,就可以将这些消息缓存在一个地方,逐步处理
- 将短暂一段时间的业务积压在后面缓慢执行就是削峰和填谷的过程
思考
- 生产端的可靠性投递;
- 如果消息和钱有关,这个消息一定不能丢失
- 需要做到生产端100%投递,就需要和业务数据保证原子性
- 消费端的幂等;
- 生产端如果要做到可靠性投递,可能会有重复投递
- 消费端消费了两次或多次这个数据可能会不一致
- 所以消费端一定要做到同一个请求消费多次得到的结果一样
- MQ 本身需要考虑
- HA:高可用
- 低延迟
- 可靠性:确保数据是完整的
- 堆积能力:这是MQ能扛下你的业务量级的保证
- 扩展性:是否能够天然支持横向扩展无感知扩容
RabbitMq 集群架构原理解析
1)主备模式
master-slave结构,可以理解为热备份,master负责读写,master宕机后切换到slave
2)镜像模式
业界主流使用比较多的模式;
RabbitMQ集群非常经典的就是镜像模式,保证数据100%不丢失;
高可用、数据同步低延迟、奇数个节点。
缺点:
镜像队列集群的缺陷是无法进行很好的横向扩容,因为每个节点都是一个完整的互相复制的节点,并且镜像节点过多也会增加MQ的负担,一个数据写入就要复制到多个节点,吞吐量也会降低
RabbitMq 的安装和使用
RabbitMq-3.8.19安装详解 前面我已经介绍了一篇,再此省略。
修改用户登录和连接心跳
- 将 loopback_users.guest = false,前面的注释去掉
- 将 {heartbeat, 60} 修改为 {heartbeat, 10}
查看MQ端口是否启用:yum -y install lsof
- lsof -i:5672
启动插件:
- rabbitmq-plugins enable rabbitmq_management
查看管理后台是否启动
- lsof -i:15672
常用命令
# 启动服务
systemctl start rabbitmq-server
# 或者
rabbitmq-server -detached
# 开启web管理界面插件
rabbitmq-plugins enable rabbitmq_management
# 关闭应用
rabbitmqctl stop_app
# 启动应用
rabbitmqctl start_app
# 节点状态
rabbitmqctl status
# 添加用户密码
rabbitmqctl add_user username password
# 修改用户密码
rabbitmqctl change_password username password
# 列出所有用户
rabbitmqctl list_users
# 删除用户
rabbitmqctl delete_user username
# 列出用户权限
rabbitmqctl list_user_permissions username
# 清除用户权限
rabbitmqctl clear_permissions -p vhostpath username
# 设置用户权限
# 三个*对应:configure write read
rabbitmqctl set_permissions -p vhostpath username ".*" ".*" ".*"
rabbitmqctl set_permissions -p / gavin ".*" ".*" ".*"
# 列出所有虚拟主机
rabbitmqctl list_vhosts
# 创建虚拟主机
rabbitmqctl add_vhost vhostpath
# 列出虚拟主机的权限
rabbitmqctl list_permissions -p vhostpath
# 删除虚拟主机
rabbitmqctl delete_vhost vhostpath
# 查看所有队列
rabbitmqctl list_queues
# 清除队列里的消息
rabbitmqctl -p vhostpath purge_queue queueName
# 清除所有数据
rabbitmqctl reset # 这个动作最好在MQ服务停掉后操作
springboot 整合 rabbitmq
发送
搭建一个 SpringBoot 工程及准备工作,我在另一篇博客写了,这里不再重复。SpringBoot全局异常处理、集成Swagger和参数必填校验 ,我们现在需要的准备工作,都在这篇准备了。
新增 application.yaml
server:
port: 8088
spring:
rabbitmq:
host: 192.168.150.130
username: guest
password: guest
virtual-host: /
connection-timeout: 10000
消息实体
package cn.com.springboot.vo;
import lombok.Data;
import java.io.Serializable;
@Data
public class OrderInfo implements Serializable {
private String id;
private String orderName;
private String messageId;
}
发送类
package cn.com.springboot.web;
import cn.com.springboot.vo.OrderInfo;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class OrderSender {
@Autowired
private RabbitTemplate rabbitTemplate;
private static final String ORDER_EXCHANGE = "order_exchange";
private static final String ORDER_ROUTING_KEY = "order_r_key";
public void sendOrder(OrderInfo orderInfo){
//correlationData:消息唯一id
CorrelationData correlationData = new CorrelationData();
correlationData.setId(orderInfo.getMessageId());
//String exchange, String routingKey, Object object, @Nullable CorrelationData correlationData
rabbitTemplate.convertAndSend(ORDER_EXCHANGE, ORDER_ROUTING_KEY, orderInfo, correlationData);
}
}
rabbitmq 准备工作
创建 exchange
Exchange的相关属性说明
-
Name:exchange的名称
-
Type类型说明
-
direct:exchange在和queue进行binding时会设置routingkey,只有routingkey完全相同,exchange才会将消息转发到对应的queue上,相当于点对点
-
fanout:直接将消息路由到所有绑定的队列中,无需对消息的routingkey进行匹配操作,因为不绑定routingkey,所有也是消息转发最快的(广播方式)
-
topic:此类型的exchange和direct差不多,但direct类型要求routingkey完全相同,而topic可以使用通配符:‘*’,‘#’
其中‘*’表示匹配一个单词,‘#’则表示匹配没有或者多个单词
-
header:路由规则是根据header来判断
-
总结:一般direct和topic用来具体的路由信息,如果用广播就使用fanout,header类型用的比较少
-
-
Durability:Durable是持久化到磁盘的意思
-
Auto Delete:如果设置为yes,那么最后一个绑定到exchange上的队列被删除后,exchange也会自动删除
-
Internal:如果为yes则表明该exchange是rabbitmq内部使用,不提供给外部系统应用,一般是在自己编写erlang语言做定制化扩展时使用
-
Arguments这个是扩展AMQP协议时自定义使用的内容
创建Queue
Exchange和Queue通过Binding关联,由routingkey进行路由
测试发送
package cn.com.springboot.web;
import cn.com.springboot.vo.OrderInfo;
import cn.com.springboot.vo.ResultVo;
import io.swagger.annotations.Api;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
@Api("发送消息")
@RestController
public class RabbitMqController {
@Autowired
private OrderSender orderSender;
@PostMapping("/sendOrder")
public ResultVo sender(@RequestBody OrderInfo orderInfo){
orderSender.sendOrder(orderInfo);
return ResultVo.success();
}
}
用 swagger 测试
消息成功发送到队列
注意:
- 一个exchange可以绑定多个queue,只要routingkey一样,一个消息就会发送到多个queue上
- exchange绑定一个queue,无论binding多少个routingkey,只要符合这个routingkey规则的消息都会发送到这个队列中,接收的时候无论从哪个routingkey过来的消息,连接这个队列的消费端都会消费掉,相当于多个消息规则对应一个队列
消息接收
创建另一个工程:consumer-and-producer,搭建步骤和上面的基本一样。
application.yaml
server:
port: 8080
spring:
rabbitmq:
host: 192.168.150.130
username: guest
password: guest
virtual-host: /
connection-timeout: 10000
listener:
simple:
concurrency: 5 # 初始化并发数
max-concurrency: 10 # 最大并发数
auto-startup: true # 自动开启监听
prefetch: 1 # 每个连接同一时间最多处理几个消息,限流设置
acknowledge-mode: manual # 签收模式为手动签收
添加消费类
package cn.com.springboot.web;
import cn.com.springboot.vo.OrderInfo;
import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
@Log4j2
@Component
public class OrderReceiver {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "order_queue", durable = "true"),
exchange = @Exchange(value = "order_exchange", type = "topic"),//durable默认是true
key = "order_r_key"//我的routingKey是order_r_key
))
@RabbitHandler
public void receiveOrderInfo(@Payload OrderInfo orderInfo,
@Headers Map<String, Object> headers,
Channel channel) throws IOException {
log.info("开始消费");
log.info("orderName:{}, messageId:{}", orderInfo.getOrderName(), orderInfo.getMessageId());
Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
channel.basicAck(deliveryTag, false);
}
}
启动测试
暂时分享到这,欢迎指正!