RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:https://www.rabbitmq.com/
部署
1。拉取安装包
docker pull rabbitmq:3-management
2.加载镜像
docker load -i mq.tar
3.安装rabbitMQ
docker run \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management
15672是浏览器直接访问的管理员地址
5672是代码访问rabbitmq的地址
RabbitMQ中的几个概念: channel:操作MQ的工具 ; exchange:路由消息到队列中 ; queue:缓存消息 ; virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组
MQ的官方文档中给出了5个MQ的Demo示例,对应了几种不同的用法:
基本消息队列(BasicQueue) 工作消息队列(WorkQueue)
发布订阅(Publish、Subscribe),又根据交换机类型不同分为三种: Fanout Exchange:广播 Direct Exchange:路由 Topic Exchange:主题
SpringAmqp的官方地址:https://spring.io/projects/spring-amqp
开发步骤
1.导包
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.加入配置文件
spring:
rabbitmq:
host: 192.168.190.129 # rabbitMQ的ip地址
port: 5672 # 端口
username: itcast
password: 123321
virtual-host: /
3.开发代码
Basic Queue 简单队列模型
生产者:
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage2SimpleQueue() {
String queueName = "simple.queue";
String message = "hello, spring amqp!";
rabbitTemplate.convertAndSend(queueName, message);
}
消费者:
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
System.out.println("消费者接收到simple.queue的消息:【" + msg + "】");
}
Work Queue 工作队列模型
生产者:
@Test
public void testSendMessage2WorkQueue() throws InterruptedException {
String queueName = "simple.queue";
String message = "hello, message__";
for (int i = 1; i <= 50; i++) {
rabbitTemplate.convertAndSend(queueName, message + i);
Thread.sleep(20);
}
}
消费者
@Test
public void testSendMessage2WorkQueue() throws InterruptedException {
String queueName = "simple.queue";
String message = "hello, message__";
for (int i = 1; i <= 50; i++) {
rabbitTemplate.convertAndSend(queueName, message + i);
Thread.sleep(20);
}
}
@Test
public void testSendFanoutExchange() {
// 交换机名称
String exchangeName = "itcast.fanout";
// 消息
String message = "hello, every one!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
注意:此时要在yml文件中增加配置
spring:
rabbitmq:
host: 192.168.190.129 # rabbitMQ的ip地址
port: 5672 # 端口
username: itcast
password: 123321
virtual-host: /
listener:
simple:
prefetch: 1
如果不加listener属性,多个消费者绑定同一个队列则会类似轮询机制,浪费服务器资源,加上后,处理完成才能获取下一个消息(类似能者多劳);
发布、订阅模型-Fanout
@Test
public void testSendFanoutExchange() {
// 交换机名称
String exchangeName = "itcast.fanout";
// 消息
String message = "hello, every one!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.out.println("消费者接收到fanout.queue1的消息:【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
System.out.println("消费者接收到fanout.queue2的消息:【" + msg + "】");
}
当前模式为广播模式,只要是绑定当前队列都会收到消息。
发布、订阅模型-Direct
@Test
public void testSendDirectExchange() {
// 交换机名称
String exchangeName = "itcast.direct";
// 消息
String message = "hello, red!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "red", message);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
}
只有和RoutingKey相同的才能接受到消息
发布、订阅模型-Topic
@Test
public void testSendTopicExchange() {
// 交换机名称
String exchangeName = "itcast.topic";
// 消息
String message = "今天天气不错,我的心情好极了!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "china.weather", message);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenTopicQueue1(String msg){
System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
}
主题模式,支持表达式。
#:代指0个或多个单词 *:代指一个单词
消息转换器
默认spring使用jkd的消息转换器,传递复杂类型时会被序列化
如果要修改只需要定义一个MessageConverter 类型的Bean即可。推荐用JSON方式序列化,步骤如下:
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
同时在代码中声明
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
spring在启动过程中自动会加载当前bean,替换默认的。
部署mq时,可能出现的问题
docker启动rabbitmq报错
WARNING: IPv4 forwarding is disabled. Networking will not work.
解决方法:
docker stop 镜像id
sudo docker rm 镜像id
systemctl restart network && systemctl restart docker
执行完上面的命令后,在重新启动