Message Queue
消息队列
mq的相关概念
-
定义
-
为什么要用mq
- 流量消峰
- 应用解耦
- 异步处理
-
分类
-
ActiveMQ
- 优点:单机吞吐量万级,时效性 ms 级,可用性高,基于主从架构实现高可用性,消息可靠性较低,有概率丢失数据
- 缺点:维护越来越少,高吞吐量场景较少使用
-
kafka
- 为大数据而生,百万级 TPS
- 优点:吞吐量高,时效性 ms 级,可用性非
常高,分布式的,日志领域比较成熟 - 缺点:社区更新较慢,消息实时性取决于轮询;队列过多,load增加
-
RocketMQ
- 优点:单机吞吐量十万级,可用性非常高,分布式架构,消息可以做到 0 丢失,支持 10 亿级别的消息堆积
- 缺点:支持的客户端语言不多,目前是 java 及 c++,其中 c++不成熟
-
RabbitMQ
- 优点:性能较好;吞吐量到万级,支持多种语言,社区活跃度高
- 缺点:商业版需要收费,学习成本较高
-
-
选择
-
kafka
- 大型互联网公司
- 需要日志收集功能
-
RocketMQ
- 金融互联网
- 可靠性要求高
-
RabbitMQ
- 中小型公司
-
RabbitMQ
-
概念
- 消息中间件,接受并转发消息
-
四大核心概念
- 生产者
- 交换机
- 队列
- 消费者
-
六大核心部分
- 简单模式:『Hello World』
- 工作模式:『Work queues』
- 发布订阅模式:『Publish/Subscribe』
- 路由模式:『Routing』
- 主题模式:『Topics』
- 发布确认模式:『Publisher Confirms』
-
专业名词
-
AMQP
- Advanced Message Queuing Protocol, 高级消息队列协议
-
Broker
- 接受和分发消息的应用
- Rabbit server
-
virtual host
- 一个Rabbit Server可以划分多个Vhost
- 每个用户在自己的vhost创建exchange/queue
-
Connection
-
Channel
- 在 connection 内部建立的逻辑连接
-
Exchange
-
分发消息
-
类型
- direct: point to point
- topic: publish-subscribe
- fanout: multicast
-
-
Queue
-
Binding
-
Hello world
生产者
-
创建一个连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("127.0.0.1");
- factory.setUsername("guest");
- factory.setPassword("guest");
-
创建一个连接
- Connection connection = factory.newConnection();
-
创建一个channel
- Channel channel = connection.createChannel();
-
声明队列
-
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- 1.队列名称
- 2.队列里面的消息是否持久化 默认消息存储在内存中
- 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
- 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
- 5.其他参数
-
-
发送一个消息
-
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
- 1.发送到那个交换机
- 2.路由的 key 是哪个
- 3.其他的参数信息
- 4.发送消息的消息体
-
消费者
-
创建一个连接工厂
-
创建一个连接
-
创建一个channel
-
消费消息
-
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
- 1.消费哪个队列
- 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
- 3.消费者成功消费的回调
- 4.消费者未成功消费的回调
-
work queue
轮训分发消息
- 默认
消息应答
-
概念
-
自动应答
-
消息应答的方法
- Channel.basicAck()
- Channel.basicNack()
- Channel.basicReject()
持久化
-
概念
-
队列持久化
-
生产者
-
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
- 需要注意的就是如果之前声明的队列不是持久化的,需要把原先队列先删除,或者重新
创建一个持久化的队列,不然就会出现错误
- 需要注意的就是如果之前声明的队列不是持久化的,需要把原先队列先删除,或者重新
-
-
消息持久化
-
生产者
-
MessageProperties.PERSISTENT_TEXT_PLAIN
- channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
-
-
不公平分发
- 消费者
- channel.basicQos(1);
-
预取值
-
prefecth
- 定义通道上允许的未确认消息的最大数量
- 一旦数量达到配置的数量,RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认
-
channel.basicQos(prefetch);
-
prefetch=0
- 公平分发
-
prefetch!=0
- 不公平分发,预先分配
-
-
发布确认
概念
-
确认消息是否被消费者"成功消费"
-
前提条件
- 生产者发布的消息已经"成功"发送出去了
-
标准
-
保证消息不会丢失的唯一方法是使用事务:在通道上开启事务,发布消息,提交事务
- 吞吐量大幅降低
-
-
解决方式
- 引入了 发布者确认(Publisher Confirms) 机制,它是模仿AMQP协议中的消费者消息确认机制
原理
- 生产者将信道设置成 confirm 模式
- 所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始)
- 消息确认
发布确认的策略
-
开启发布确认的方法
- channel.confirmSelect();
-
单个确认发布
-
批量确认发布
-
异步确认发布
-
如何处理异步未确认消息
- 最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,
- 比如说用 ConcurrentLinkedQueue 这个队列在 confirm callbacks 与发布线程之间进行消息的传递。
-
3中发布确认速度对比
-
同步
- 简单,但吞吐量非常有限
-
批量
- 简单,合理的吞吐量
- 一旦出现问题,很难推断出是那条消息出现了问题
-
异步
- 最佳性能和资源使用,在出现错误的情况下可以很好地控制
- 实现起来稍微难些
-
Exchanges交换机
交换机
-
概念
- 生产者生产的消息从不会直接发送到队列
- 生产者只能将消息发送到交换机(exchange)
- 交换机必须确切知道如何处理收到的消息
-
类型
- 扇出 fanout
- 直接 direct
- 主题 topics
- 标题 headers
-
无名exchange
-
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
- 第一个参数是交换机的名称。空字符串表示默认或无名称交换机
- 消息能路由发送到队列中其实是由 routingKey(bindingkey)绑定 key 指定的,如果它存在的话
-
临时队列
- 随机名称的队列
- 一旦我们断开了消费者的连
接,队列将被自动删除。 - String queueName = channel.queueDeclare().getQueue();
绑定(bindings)
- exchange 和 queue 之间的桥梁
- exchange 和哪个队列进行了绑定关系
fanout
- 广播
Direct
- 指定队列
topic
-
一定规则
-
routing_key
-
必须是一个单词列表,以点号分隔开
-
词列表最多不能超过255个字节
-
规则替换符
-
- 可以代替一个单词
-
可以代替零个或多个单词
-
-
-
-
结论
- 当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像fanout了
- 如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是direct了
死信队列
概念
-
原因导致 queue 中的某些消息无法被消费
-
应用场景
- 消息发生异常
- 订单超时未支付
来源
- 消息 TTL 过期
- 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
- 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.
延迟队列
概念
- 延时队列就是用来存放需要在指定时间被处理的元素的队列。
使用场景
- 1.订单在十分钟之内未支付则自动取消
- 2.新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
- 3.用户注册成功后,如果三天内没有登陆则进行短信提醒。
- 4.用户发起退款,如果三天内没有得到处理则通知相关运营人员。
- 5.预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议
rabbitMq中的ttl
-
概念
- 一条消息或者该队列中的所有
消息的最大存活时间 - 单位:ms
- 一条消息或者该队列中的所有
-
消息设置ttl
- message.getMessageProperties().setExpiration(ttlTime);
-
队列设置ttl
- arguments.put("x-message-ttl", 40000);
return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
- arguments.put("x-message-ttl", 40000);
-
区别
- 如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中
- 第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间
- 如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以
直接投递该消息到消费者,否则该消息将会被丢弃
-
队列ttl
-
延时队列优化
-
rabbitmq插件实现延迟队列
发布确认高级
确认机制方案
-
配置文件
-
spring.rabbitmq.publisher-confirm-type=correlated
- NONE
- CORRELATED
- SIMPLE
-
回退消息
-
Mandatory 参数
- 当消息传递过程中不可达目的地时将消息返回给生产者
备份交换机
- mandatory 参数与备份交换机可以一起使用
- 备份交换机优先级高
RabbitMq其他知识点
幂等性
-
概念
- 用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。
-
解决思路
-
唯一ID+指纹码机制
-
Redis原子性
- 利用redis执行setnx命令,天然具有幂等性
-
优先级队列
-
概念
- 队列中消息具有优先级
-
做法
- 队列需要设置为优先级队列
- 消息需要设置消息的优先级
- 消费者需要等待消息已经发送到队列中
惰性队列
-
概念
- 惰性队列会尽可能的将消息存入磁盘中
-
两种模式
- default
- lazy
XMind - Trial Version