消息队列
Message queue 释义
服务之间最常见的通信方式是直接调用彼此来通信 , 消息从一端发出后立即就可以达到另一端 , 称为即时消息通讯 ( 同步通信 ) 消息从某一端发出后 , 首先进入一个容器进行临时存储 , 当达到某种条件后 , 再由这个容器发送给另一端 , 称为延迟消息通讯 ( 异步通信 )假设我们在淘宝下了一笔订单后 , 淘宝后台需要做这些事情: 1. 消息通知系统:通知商家 , 你有一笔新的订单 , 请及时发货 2. 推荐系统:更新用户画像 , 重新给用户推荐他可能感兴趣的商品 3. 会员系统:更新用户的积分和等级信息问题思考
createOrder (...) { // 完成订单服务 doCreateOrder (...); // 调用其他服务接口 sendMsg (...); updateUserInterestedGoods (...); updateMemberCreditInfo (...); }
存在问题
过度耦合:如果后面创建订单时 , 需要触发新的动作 , 那就得去改代码 , 在原有的创建订单函数末尾 , 再追加一行代码 缺少缓冲:如果创建订单时 , 会员系统恰好处于非常忙碌或者宕机的状态 , 那这时更新会员信息就会失败 , 我们需要一个地方 , 来暂时存放无法被消费的消息优化方案
我们需要一个消息中间件 , 来实现解耦和缓冲的功能 .案例分析
小红希望小明多读书 , 常寻找好书给小明看 , 之前的方式是这样:小红问小明什么时候有空 , 把书给小明送去 , 并亲眼监督小明读完书才走 . 久而久之 , 两人都觉得麻烦 . 后来的方式改成了:小红对小明说「我放到书架上的书你都要看」 , 然后小红每次发现不错的书都放到书架上 , 小明则看到书架上有书就拿下来看 .书架就是一个消息队列 , 小红是生产者 , 小明是消费者带来的好处 1. 小红想给小明书的时候 , 不必问小明什么时候有空 , 亲手把书交给他了 , 小红只把书放到书架上就行了 . 这样小红小明的时间都更* . 2. 小红相信小明的读书自觉和读书能力 , 不必亲眼观察小明的读书过程 , 小红只要做一个放书的动作 , 很节省时间 . 3. 当明天有另一个爱读书的小伙伴小强加入 , 小红仍旧只需要把书放到书架上 , 小明和小强从书架上取书即可 4. 书架上的书放在那里 , 小明阅读速度快就早点看完 , 阅读速度慢就晚点看完 , 没关系 , 比起小红把书递给小明并监督小明读完的方式 , 小明的压力会小一些 . 消息队列特点 1. 解耦 : 每个成员不必受其他成员影响 , 可以更独立自主 , 只通过一个简单的容器来联系 . 2. 提速 : 小红选只要做一个放书的动作 , 为自己节省了大量时间 . 3. 广播 : 小红只需要劳动一次 , 就可以让多个小伙伴有书可读 , 这大大地节省了她的时间 , 也让新的小伙伴的加入成本很低 . 4. 错峰与流控 : 小红给书的频率不稳定 , 如果今明两天连给了五本 , 之后隔三个月才又给一本 , 那小明只要在三个月内从书架上陆续取走五本书读完就行了 , 压力就不那么大了 .
Email邮件案例分析
有大量用户注册你的软件 , 再高并发情况下注册请求开始出现一些问题 . 例如邮件接口承受不住 , 或是分析信息时的大量计算使 cpu 满载 , 这将会出现虽然用户数据记录很快的添加到数据库中了 , 但是却卡在发邮件或分析信息时的情况 . 导致请求的响应时间大幅增长 , 甚至出现超时 , 这就有点不划算了 . 面对这种情况一般也是将这些操作放入消息队列 ( 生产者消费者模型 ), 消息队列慢慢的进行处理 , 同时可以很快的完成注册请 求 , 不会影响用户使用其他功能 .消息队列相关
AMQP
一个提供统一消息服务的应用层标准高级消息队列协议 , 是一个通用的应用层协议 消息发送与接受的双方遵守这个协议可以实现异步通讯 . 这个协议约定了消息的格式和工作方式RabbitMq
RabbitMQ 是一个实现了 AMQP(Advanced Message Queuing Protocol) 高级消息队列协议的消息队列服务 , 用 Erlang 语言 .Server(Broker): 接收客户端连接 , 实现 AMQP 协议的消息队列和路由功能的进程 . Virtual Host :虚拟主机的概念 , 类似权限控制组 , 一个 Virtual Host 里可以有多个 Exchange 和 Queue. Exchange: 交换机 , 接收生产者发送的消息 , 并根据 Routing Key 将消息路由到服务器中的队列 Queue. ExchangeType: 交换机类型决定了路由消息行为 ,RabbitMQ 中有三种类型 Exchange, 分别是 fanout 、 direct 、 topic. Message Queue :消息队列 , 用于存储还未被消费者消费的消息 . Message :由 Header 和 body 组成 ,Header 是由生产者添加的各种属性的集合 , 包括 Message 是否被持久化、优先级是多少、由哪个 Message Queue 接收等 .body 是真正需要发送的数据内 容 . BindingKey :绑定关键字 , 将一个特定的 Exchange 和一个特定的 Queue 绑定起来 .
Docker安装部署RabbitMQ
docker pull rabbitmq:management
注:注意获取镜像的时候要获取management版本的,不要获取last版本的,management版本的才带有管理界面
docker run - d \ -- name my - rabbitmq \ - p 5672 : 5672 - p 15672 : 15672 \ - v / home / rabbitmq : / var / lib / rabbitmq \ -- hostname my - rabbitmq - host \ - e RABBITMQ_DEFAULT_VHOST = my_vhost \ - e RABBITMQ_DEFAULT_USER = admin \ - e RABBITMQ_DEFAULT_PASS = admin \ -- restart = always \ rabbitmq : management--hostname :主机名 (RabbitMQ 的一个重要注意事项是它根据所谓的 “ 节点名称 ” 存储数据 , 默认为主机名 ) -e :指定环境变量 : RABBITMQ_DEFAULT_VHOST :默认虚拟机名 RABBITMQ_DEFAULT_USER :默认的用户名 RABBITMQ_DEFAULT_PASS :默认用户名的密码 容器启动后 , 可以通过 docker logs 容器 查看日志
docker logs my-rabbitmq
进入管理后台
http://ip:15672
springboot连接配置
配置账号
注:切记需要授权
springboot项目搭建
rabbitMQ - consumer - provider ( publisher )
所需依赖
< dependency > < groupId > org . springframework . boot </ groupId > < artifactId > spring - boot - starter - amqp </ artifactId > </ dependency >
生产者yml文件配置
server:
port: 8080
spring:
application:
name: provider
rabbitmq:
host: 192.168.133.138
password: 123456
port: 5672
username: springboot
virtual-host: my_vhost
消费者yml文件配置
server:
port: 8081
spring:
application:
name: consumer
rabbitmq:
host: 192.168.133.138
password: 123456
port: 5672
username: springboot
virtual-host: my_vhost
生产者 Provider
@Configuration
@SuppressWarnings("all")
public class RabbitConfig {
@Bean
public Queue firstQueue() {
return new Queue("firstQueue");
}
}
@Component
@SuppressWarnings("all")
public class Sender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void sendFirst() {
rabbitTemplate.convertAndSend("firstQueue", "Hello World");
}
}
消费者 Consumer
@Component
@SuppressWarnings("all")
@Slf4j @RabbitListener(queues = "firstQueue")
public class Receiver {
@RabbitHandler
public void process(String msg) {
log.warn("接收到:" + msg);
}
}
完成自定义数据发送
@SuppressWarnings("all")
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class User{
private String username;
private String userpwd;
}
public void send(User user) {
rabbitTemplate.convertAndSend("first", user);
}
解决方式
:
实现序列化接口