MQ概述
消息队列,实在消息传输过程中保存消息的容器,多用于分布式系统之间进行通信
MQ优缺点
优点 | 缺点 |
应用解耦 | 系统可用性降低 |
异步提速 | 系统复杂性提高 |
削峰填谷 | 一致性问题 |
在之前做的项目中注册功能需要邮件微服务来发送邮件验证码,商品详情修改需要通知搜索微服务和静态页微服务修改他们的内容。
使用feign远程调用
第一时间想到的就是通过feign直接进行服务间调用,在注册为服务中引入邮件为服务的client,在商品为服务中引入搜索和静态页为服务的相关功能,如下
耦合高:可以看到注册服务依赖与邮件服务,,邮件服务修改了,注册服务大概率也要修改
同步调用:只有调用邮件服务成功才能返回。大量请求发来,执行速度本来就慢了,更加承受不住压力
使用MQ
解耦:引入MQ后注册服务不依赖与邮件服务了。只需要发消息给MQ
异步提速:注册服务只管发消息给MQ就返回(速度提升了,注册服务的吞吐量提高了),不管邮件服务是否发送成功。(发送失败就失败了,大家在注册或者登陆一些网站时候需要短信验证也有遇到过页面提示成功,但是短信没收到的情况吧。。。)
削峰填谷:大量请求发来时候,都打在了MQ上,服务可以慢慢从MQ中消费消息
系统可用性降低:因为引入了MQ整个系统的稳定性还需要MQ稳定,MQ挂了对整个系统就会造成影响。(外部依赖引入越多可用性就低,需要保证MQ稳定高可用)
系统复杂性提高:消息重复消费问题,消息丢失,消息的顺序性
一致性问题:A系统处理完业务,通过MQ给bcd3个系统发送消息,B,C处理成功,D处理失败,如何保证消息数据处理的一致性问题
其他:生产者不需要从消费者处获得返回值才能使用MQ。
主要的MQ产品包括:
RabbitMQ:基于AMQP,erlang语言开发,稳定性好 ,吞吐量万级(其次),消息延迟微妙级,并发能力强,性能好,社区活跃,管理界面丰富
ActiveMQ:基于jms,apache 吞吐量万级()最差 消息延迟毫秒级
RocketMQ:自定义协议,阿里巴巴出品,目前交由apache 吞吐量十万级(最高) 消息延迟毫秒级
ZeroMQ
Kafka:scala语言开发 自定义协议 分布式消息服务,高吞吐量十万级(次之) 消息延迟毫秒级 ,主要用于大数据
IBM WebSphere 等。
rabbit相关概念
架构图:
Broker:接收和分发消息的应用,RabbitMQ server就是Message Broker
Virtual host:处于多租户和安全的因素设计,吧AMQP的基本组件划分到一个虚拟的分组中,类似网络中的namespace概念,多个不同的用户使用同一个RabbitMQ server提供的服务的同时,可以划分处多个vhost,每个用户在自己的vhost创建交换机和路由
Connection:生产者和消费者和brokr之间的连接
Chanal:避免每次访问mq都创建连接,就使用chanal在内部创建逻辑连接,减少建立连接的消耗
Pubilsher:生产者
Consumer:消费者
Exchange:交换机,消息到达broker的第一站,根据分发规则,匹配查询表中的routing key 分发消息到队列中去,有direct(p2p),topic,fanout
Queue:队列,消息消息最终送到这里等待消费者取走
binding:交换机和队列之间的虚拟连接,binding中包含routing key ,binding的信息被保存到交换机的查询表中,用于message的分发依据
Routes:路由
6种工作模式
RabbitMQ提供了6种消息模型,第6种其实是RPC(远程调用),3、4、5这三种都属于订阅模型,只不过进行路由的方式不同。
第一种:简单模型
RabbitMQ是一个消息代理:它接受和转发消息。 你可以把它想象成一个邮局:当你把邮件放在邮箱里时,你可以确定邮差先生最终会把邮件发送给你的收件人。 在这个比喻中,RabbitMQ是邮政信箱,邮局和邮递员。
RabbitMQ接受,存储和转发数据消息的二进制数据块。
P(producer/ publisher):生产者,一个发送消息的用户应用程序。
C(consumer):消费者,消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序
队列(红色区域):rabbitmq内部类似于邮箱的一个概念。虽然消息流经rabbitmq和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。
总之:生产者将消息发送到队列,消费者从队列中获取消息,队列是存储消息的缓冲区。
消息一旦被消费者接收,队列中的消息就会被删除。
那么问题来了:RabbitMQ怎么知道消息被接收了呢?如果消费者领取消息后,还没执行操作就挂掉了呢?或者抛出了异常?消息消费失败,但是RabbitMQ无从得知,这样消息就丢失了!
因此,RabbitMQ有一个ACK机制。当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收。不过这种回执ACK分两种情况:
自动ACK:消息一旦被接收,消费者自动发送ACK
手动ACK:消息接收后,不会发送ACK,需要手动调用
哪种更好呢?这需要看消息的重要性:
如果消息不太重要,丢失也没有影响,那么自动ACK会比较方便
如果消息非常重要,不容丢失。那么最好在消费完成后手动ACK;否则接收消息后就自动ACK,RabbitMQ就会把消息从队列中删除。如果此时消费者宕机,那么消息就丢失了。
(手动ACK:银行转账的过程中,消费方出现了异常,消息依然是被消费了,钱就不翼而飞了)
面试题:如何避免消息丢失?
消费者接受到消息,还没消费就异常:使用手动ACK,等消费方接受到消息后再手动发送ACK
MQ在消费者消费之前宕机:将消息持久化,前提是:队列、Exchange都持久化
第二种:工作模型

工作队列,又称任务队列。主要思想就是避免执行资源密集型任务时,必须等待它执行完成。
相反我们稍后完成任务,我们将任务封装为消息并将其发送到队列。
在后台运行的工作进程将获取任务并最终执行作业。
当你运行许多消费者时,任务将在他们之间共享,但是一个消息只能被一个消费者获取。这个概念在Web应用程序中特别有用,因为在短的HTTP请求窗口中无法处理复杂的任务。
我们可以使用basicQos方法和prefetchCount = 1设置,告诉RabbitMQ一次不要向工作人员发送多于一条消息。 或者换句话说,不要向工作人员发送新消息,直到它处理并确认了前一个消息。 相反,它会将其分派给不是仍然忙碌的下一个工作人员。 也就是“能者多劳”,性能强的消费方可以消费多一些,性能差的消费方就消费少一些
面试题:如何避免消息堆积?
采用workqueue,多个消费者监听同一队列。
消费方接收到消息以后,通过线程池异步消费。
(六)订阅模型分类
在之前的模式中,我们创建了一个工作队列。 工作队列背后的假设是:每个任务只被传递给一个工作人员。 在这一部分,我们将做一些完全不同的事情 - 我们将会传递一个信息给多个消费者。 这种模式被称为“发布/订阅”。
订阅模型示意图:

解读:1个生产者,多个消费者
每一个消费者都有自己的一个队列
生产者没有将消息直接发送到队列,而是发送到了交换机
每个队列都要绑定到交换机
生产者发送的消息,经过交换机到达队列,实现一个消息被多个消费者获取的目的
X(Exchanges):交换机一方面接收生产者发送的消息。另一方面:知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
Exchange类型有以下几种:
Fanout:广播,将消息交给所有绑定到交换机的队列 Direct:定向,把消息交给符合指定routing key 的队列 Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力
因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!!第三种:订阅模型:Fanout(广播模型)
Fanout:广播

在广播模式下,消息发送流程是这样的:可以有多个消费者
每个消费者有自己的queue(队列)
每个队列都要绑定到Exchange(交换机)
生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
交换机把消息发送给绑定过的所有队列
队列的消费者都能拿到消息。实现一条消息被多个消费者消费
第四种:订阅模型:Direct(路由模型)
Direct:有选择性的接收消息
我们将添加一个功能 :我们将只能订阅一部分消息。
例如:我们只能将重要的错误消息引导到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下,队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
消息的发送方在向Exchange发送消息时,也必须指定消息的RoutingKey
P:生产者,向Exchange发送消息,发送消息时,会指定一个RoutingKey
X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
C1:消费者,其所在队列指定了需要RoutingKey为 error 的消息
C2:消费者,其所在队列指定了需要RoutingKey为 info、error、warning 的消息
第五种:订阅模型:Topic(通配符模型)
Topic:使用通配符绑定RoutingKey
通配符规则:
# :匹配一个或多个词
* :匹配不多不少恰好1个词
docker安装rabbitmq
SpringAMQP的简介
SpringAMQP是对AMQP协议的抽象实现,而 spring-rabbit 是对协议的具体实现,也是目前的唯一实现。底层使用的就是RabbitMQ。
添加AMQP的启动器:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring整合rabbitmq
生产者配置
spring:
rabbitmq:
host: 192.168.152.129 #rabbitmqip
username: leyou #用户名
password: leyou #密码
virtual-host: /leyou #虚拟主机
template:
retry:
enabled: true #开启重试
initial-interval: 10000ms
max-interval: 30000ms
multiplier: 2
exchange: ly.item.exchange #发送到这个交换机
生产者发送消息
消费者配置
rabbitmq:
host: 192.168.152.129
username: leyou
password: leyou
virtual-host: /leyou
消费者监听接收消息并消费消息
@Component
public class ItemListener {
@Autowired
private PageService pageService;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "search.item.insert.queue", durable = "true"),
exchange = @Exchange(
value = "ly.item.exchange",
ignoreDeclarationExceptions = "true",
type = ExchangeTypes.TOPIC
),
key = {"item.insert", "item.update"}))
public void listenInsertOrUpdate(Long spuId) {
if (spuId == null) {
return;
}
//处理消息,创建或者修改静态页
pageService.createHtml(spuId);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "search.item.delete.queue", durable = "true"),
exchange = @Exchange(
value = "ly.item.exchange",
ignoreDeclarationExceptions = "true",
type = ExchangeTypes.TOPIC
),
key = {"item.delete"}))
public void listenDelete(Long spuId) {
if (spuId == null) {
return;
}
//处理消息,伤处静态页
pageService.deleteHtml(spuId);
}
}
rabbitmq高级特性
消息可靠投递
生产者确认
rabbitmq提供了俩种方式来控制消息的可靠投递
- confirm:确认模式
消息从生产者到交换机会返回一个confirmCallback
- 配置spring.rabbitmq.publisher-confirms: true开启
- 然后rabbitmqtemplate.setConfirmCallback()设置回调方法
消息发送成功ack就为true
- return退回模式
消息从交换机到队列投递失败会返回一个returncallback
- 开启回退模式:
spring.rabbitmq.publisher-return: true
- 设置returncallback
rabbitmqtemplate.setReturnCallback()
- 设置交换机处理消息的模式
1.如果消息每到路由就丢弃,(默认)
2.如果消息没到路由返回给消息发送方,通过这个方法设置rabbitmqtemplate.setMandatory(true)
利用这俩个callback来控制消息的可靠投递
消费端确认
- 自动确认:消费端接收到消息就确认,不管是否处理成功
- 手动确认:消费端处理完消息手动确认
rabbitmq: listener: simple: acknowledge-mode: manual #手动签收
NONE:不确认
AUTO:自动确认
MANUAL:手动确认
import com.rabbitmq.client.Channel; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; import java.io.IOException; @Component public class MyAckListener { /** * * @param message 队列中的消息; * @param channel 当前的消息队列; * @param tag 取出来当前消息在队列中的的索引, * 用这个@Header(AmqpHeaders.DELIVERY_TAG)注解可以拿到; * @throws IOException */ @RabbitListener(queues = "direct_boot_queue") public void myAckListener(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException { System.out.println(message); try { /** * 无异常就确认消息 * basicAck(long deliveryTag, boolean multiple) * deliveryTag:取出来当前消息在队列中的的索引; * multiple:为true的话就是批量确认,如果当前deliveryTag为5,那么就会确认 * deliveryTag为5及其以下的消息;一般设置为false */ channel.basicAck(tag, false); }catch (Exception e){ /** * 有异常就绝收消息 * basicNack(long deliveryTag, boolean multiple, boolean requeue) * requeue:true为将消息重返当前消息队列,还可以重新发送给消费者; * false:将消息丢弃 */ channel.basicNack(tag,false,true); } } }
持久化
交换机,队列,消息持久化:
durable = "true" 持久化
消费端限流
每次拉取n条消息,确认后才接收下一条消息
- 确保ack为手动确认
- 配置perfetch,配置单个请求处理消息的个数
TTL
ttl全程存活时间/过期时间
当消息到达存活时间还未被消费会被自动清除
rabbit可以对消息设置过期时间,也可以对整个队列消息设置过期时间
- 如果设置了队列过期时间,也设置了消息过期时间,他会以短的为准
- 队列消息过期后会将所有消息移除
- 消息过期后,只有消息在队列顶端,才判断是否过期
死信队列
又称死信交换机,当消息绑定死信队列后,消息设置了过期时间,并且过期后会发送给死信交换机,死信交换机可以发送给其他队列
称为死信的3中情况:
- 队列消息长度达到限制
- 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false
- 原队列消息过期设置,消息超时未被消费
队列绑定死信交换机:
延迟队列
消息进入队列后不会立即被消费,只有到达指定时间后才会被消费
- 下单后30分钟未支付,取消订单,回滚库存
- 新用户主策7天后,发送短信问候
实现方式:
- 定时器
- 延迟队列
日志与监控
也可以通过图形化界面看,15679端口就是管理界面端口
25679是集群端口
5679是编程语言客户端接口
消息追踪
rabbitmq应用问题
1.消息可靠性保障
2.消息幂等性保障
幂等性是指一次或者多次请求某一个资源,对于资源本身应该具有同样的结果,也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同
在mq中指消费多条相同的消息,得到的结果与消费一次该消息一样
集群搭建(高可用)
镜像队列:可以把队列内容复制到集群中的每个节点上
HAProxy反向代理(负载均衡):