1.什么是RabbitMQ?
RabbitMQ是一款使用Erlang语言开发的,基于AMQP协议的消息中间件,作为一款优秀的消息系统,RabbitMQ有高并发、可扩展等优势,并适用于大型系统中各个模块之间的通信。
RabbitMQ的特点为:
持久化、传输确认、发布确认等功能保证消息可靠
支持多种消息分发模式,处理更加灵活
提供可视化管理界面,使用方便
支持集群部署,保证服务高可用
2.RabbitMQ环境搭建
version: '3'
services:
rabbitmq:
image: registry.cn-hangzhou.aliyuncs.com/zhengqing/rabbitmq:3.7.8-management # 原镜像`rabbitmq:3.7.8-management` 【 注:该版本包含了web控制页面 】
container_name: rabbitmq # 容器名为'rabbitmq'
hostname: my-rabbit
restart: unless-stopped # 指定容器退出后的重启策略为始终重启,但是不考虑在Docker守护进程启动时就已经停止了的容器
environment: # 设置环境变量,相当于docker run命令中的-e
TZ: Asia/Shanghai
LANG: en_US.UTF-8
RABBITMQ_DEFAULT_VHOST: my_vhost # 主机名
RABBITMQ_DEFAULT_USER: admin # 登录账号
RABBITMQ_DEFAULT_PASS: admin # 登录密码
volumes: # 数据卷挂载路径设置,将本机目录映射到容器目录
- "./rabbitmq/data:/var/lib/rabbitmq"
ports: # 映射端口
- "5672:5672"
- "15672:15672"
运行
docker-compose -f docker-compose-rabbitmq.yml -p rabbitmq up -d
web管理端:http://127.0.0.1:15672
登录账号密码:admin/admin
3.代码工程
实验目的:实现通过rabbitmq发送和接收消息
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>springboot-demo</artifactId>
<groupId>com.et</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rabbitmq</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
</project>
application.properties
server.port=8088
#rabbitmq
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtual-host=my_vahost
config
简单使用
package com.et.rabbitmq.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
@Bean
public Queue Queue() {
return new Queue("hello");
}
}
topic 是RabbitMQ中最灵活的一种方式,可以根据routing_key*的绑定不同的队列 首先对topic规则配置,这里使用两个队列来测试
package com.et.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TopicRabbitConfig {
public final static String TOPIC_ONE = "topic.one";
public final static String TOPIC_TWO = "topic.two";
public final static String TOPIC_EXCHANGE = "topicExchange";
@Bean
public Queue queue_one(){
return new Queue(TOPIC_ONE);
}
@Bean
public Queue queue_two(){
return new Queue(TOPIC_TWO);
}
@Bean
TopicExchange exchange(){
return new TopicExchange(TOPIC_EXCHANGE);
}
@Bean
Binding bindingExchangeOne(Queue queue_one, TopicExchange exchange){
return BindingBuilder.bind(queue_one).to(exchange).with("topic.one");
}
@Bean
Binding bindingExchangeTwo(Queue queue_two, TopicExchange exchange){
//# 表示零个或多个词
//* 表示一个词
return BindingBuilder.bind(queue_two).to(exchange).with("topic.#");
}
}
Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。
package com.et.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutRabbitConfig {
@Bean
public Queue AMessage() {
return new Queue("fanout.A");
}
@Bean
public Queue BMessage() {
return new Queue("fanout.B");
}
@Bean
public Queue CMessage() {
return new Queue("fanout.C");
}
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
@Bean
Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(AMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(BMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(CMessage).to(fanoutExchange);
}
}
receiver
package com.et.rabbitmq.receiver;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class HelloReceiver {
@RabbitListener(queues = "hello")
public void process(String hello) {
System.out.println("Receiver : " + hello);
}
@RabbitListener(queues = {"topic.one"})
public void receiveTopic1(@Payload String fileBody) {
log.info("topic1:" + fileBody);
}
@RabbitListener(queues = {"topic.two"})
public void receiveTopic2(@Payload String fileBody) {
log.info("topic2:" + fileBody);
}
@RabbitListener(queues = {"fanout.A"})
public void fanoutA(@Payload String fileBody) {
log.info("fanoutA:" + fileBody);
}
@RabbitListener(queues = {"fanout.B"})
public void fanoutB(@Payload String fileBody) {
log.info("fanoutB:" + fileBody);
}
@RabbitListener(queues = {"fanout.C"})
public void fanoutC(@Payload String fileBody) {
log.info("fanoutC:" + fileBody);
}
}
sender
package com.et.rabbitmq.sender;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
public class HelloSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String context = "hello " + new Date();
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("hello", context);
}
}
package com.et.rabbitmq.sender;
import com.et.rabbitmq.config.TopicRabbitConfig;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class TopicSender {
@Autowired
private AmqpTemplate rabbitTemplate;
//两个消息接受者都可以收到
public void send_one() {
String context = "Hi, I am message one";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXCHANGE,"topic.one",context);
}
//只有TopicReceiverTwo都可以收到
public void send_two() {
String context = "Hi, I am message two";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXCHANGE,"topic.two",context);
}
}
DemoApplication.java
package com.et.quartz;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
以上只是一些关键代码,所有代码请参见下面代码仓库
代码仓库
https://github.com/Harries/springboot-demo
4.测试
简单使用
@Test
public void hello() throws Exception {
helloSender.send();
Thread.sleep(50000);
}
Topic Exchange
@Test
public void topicOne() throws Exception {
topicSender.send_one();
Thread.sleep(50000);
}
@Test
public void topicTwo() throws Exception {
topicSender.send_two();
Thread.sleep(50000);
}
Fanout Exchange
@Test
public void sendFanout() throws InterruptedException {
String context = "hi, fanout msg ";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("fanoutExchange","", context);
Thread.sleep(50000);
}
5.参考连接
https://www.rabbitmq.com/
https://spring.io/projects/spring-amqp