RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,RabbitMQ是使用Erlang语言来编写的,并且RabbitMQ是基于AMQP协议的。
一、消息队列
1、MQ:Message Queue释义
服务之间最常见的通信方式是直接调用彼此来通信,消息从一端发出后立即就可以达到另一端,称为即时消息通讯(同步通信)
消息从某一端发出后,首先进入一个容器进行临时存储,当达到某种条件后,再由这个容器发送给另一端,称为延迟消息通讯(异步通信)
2、消息队列特点
- 解耦:使用了消息队列后,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系,也不需要受对方的影响,即解耦
- 提速:发收消息速度增加
- 广播:只要发一次消息
- 错峰与流控:流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。
二、RabbitMQ的结构
1、消息队列协议-----AMQP
一个提供统一消息服务的应用层标准高级消息队列协议,是一个通用的应用层协议
消息发送与接受的双方遵守这个协议可以实现异步通讯.这个协议约定了消息的格式和工作方式.
2、技术选型
3、RabbitMQ
4、 结构初识
- Server(Broker):接收客户端连接,实现AMQP协议的消息队列和路由功能的进程.
- Virtual Host:虚拟主机的概念,类似权限控制组,一个Virtual Host里可以有ExchangeQueue.
- Exchange:交换机,接收生产者发送的消息,并根据Routing Key将消息路由到服务器中的队列Queue.
- ExchangeType:交换机类型决定了路由消息行为,RabbitMQ中有三种类型Exchange,分别是fanout、direct、topic.
- Message Queue:消息队列,用于存储还未被消费者消费的消息.
- Message:由Header和body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、优先级是多少、由哪个Message Queue接收等.body是真正需要发送的数据内容.
- BindingKey:绑定关键字,将一个特定的Exchange和一个特定的Queue绑定起来.
三、Docker中部署RabbitMQ
1、拉取镜像
docker pull rabbitmq:management
*注意获取镜像的时候要获取management版本的,不要获取last版本的,management版本的才带有管理界面
2、 运行
docker run -d \
--name my-rabbitmq \
-p 5672:5672 -p 15672:15672 \
-v /home/rabbitmq:/var/lib/rabbitmq \
--hostname my-rabbitmq-host \
-e RABBITMQ_DEFAULT_VHOST=my_vhost \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
--restart=always \
rabbitmq:management
--hostname:主机名(RabbitMQ的一个重要注意事项是它根据所谓的 “节点名称” 存储数据,默认为主机名)
-e:指定环境变量:
RABBITMQ_DEFAULT_VHOST:默认虚拟机名
RABBITMQ_DEFAULT_USER:默认的用户名
RABBITMQ_DEFAULT_PASS:默认用户名的密码
3、springboot连接配置
- 配置账号
*切记需要授权
四、搭建rabbitmq项目
1、需要建立生产者和消费者
生产者和消费者需要继承父类
2、父类导入amqp依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3、yml文件配置
server: port: 8080 spring: application: name: xx rabbitmq: host: 192.168.44.138 password: 123456 port: 5672 username: springboot virtual-host: my_vhost
4、生产者 Provider
package com.xhy.provider; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration @SuppressWarnings("all") public class RabbitConfig { @Bean public Queue firstQueue() { return new Queue("firstQueue"); } }
在测试类里启动,就会新建一个firstQueue:
5.创建Sender类
模拟队列中的发送消息
package com.xhy.provider; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component @SuppressWarnings("all") public class Sender { @Autowired private AmqpTemplate rabbitTemplate; public void sendFirst() { rabbitTemplate.convertAndSend("firstQueue", "Hello World"); } }
6、在测试类调用senderFirst方法
模拟消费者下单后给生产者发送消息
package com.xhy.provider; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest class ProviderApplicationTests { @Autowired private Sender sender; @Test void contextLoads() { sender.sendFirst(); } }
7.此时RabbitMQ内出现一条消息队列
8. 在consumer的yml文件中配置相关信息
server: port: 8081 spring: application: name: xx rabbitmq: host: 192.168.44.138 password: 123456 port: 5672 username: springboot virtual-host: my_vhost
注意:端口号与provider不一致
9.在consumer中创建Receiver类
模拟生产者接收消息
@RabbitListener表示监听消息队列
package com.xhy.consumer; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; @SuppressWarnings("all") @Slf4j @RabbitListener(queues = "firstQueue") public class Receiver { @RabbitHandler public void process(String msg) { log.warn("接收到:" + msg); } }
10.运行consumer模块就可以接收消息
11.为了使消息变成动态的,写一个用户类
package com.xhy.consumer.provider; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import lombok.Setter; @SuppressWarnings("all") @Data @Setter @AllArgsConstructor @NoArgsConstructor public class User{ private String username; private String userpwd; }
12.Sender
package com.xhy.provider; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component @SuppressWarnings("all") public class Sender { @Autowired private AmqpTemplate rabbitTemplate; public void sendFirst() { rabbitTemplate.convertAndSend("firstQueue", "Hello World"); } public void sendFirst(User user) { rabbitTemplate.convertAndSend("firstQueue", user); } public void sendFirst(String json) { rabbitTemplate.convertAndSend("firstQueue", json); } }
13.
package com.xhy.provider; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.SneakyThrows; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest class ProviderApplicationTests { @Autowired private Sender sender; @Test @SneakyThrows void contextLoads() { User user=new User("aa","bb"); ObjectMapper mapper=new ObjectMapper(); sender.sendFirst(mapper.writeValueAsString(user)); } }
14.運行