阅读目录
一、获取镜像
#指定版本,该版本包含了web控制页面
docker pull rabbitmq:management
二、运行镜像
#方式一:默认guest 用户,密码也是 guest
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management
#方式二:设置用户名和密码
docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:management
三、访问ui页面
http://localhost:15672/
四、Spring boot连接rabbitmq案例
- 创建完后的pom.xml文件为
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.xuan</groupId>
<artifactId>springrabbitmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>springrabbitmq</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-freemarker</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2.配置运行环境
# Tomcat
server:
tomcat:
uri-encoding: UTF-8
max-threads: 1000
min-spare-threads: 30
port: 8070
servlet:
context-path: /rabbitmq
spring:
servlet:
multipart:
max-file-size: 100MB
max-request-size: 100MB
enabled: true
freemarker:
suffix: .html
rabbitmq:
host: localhost
port: 5672
username: admin
password: 123456
virtual-host: /testmq
listener:
simple:
#acknowledge-mode: manual #设置确认模式手工确认
concurrency: 3 #消费者最小数量
max-concurrency: 10 # 消费者最大数量
3.这点的rabbitmq配置也可以通过RabbitConfig.java类来配置:
package com.xuan.springrabbitmq.config;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
//连接rabbitMQ的基本配置
@Configuration
@EnableRabbit
public class RabbitConfig {
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123456");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/testmq");
return connectionFactory;
}
@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}
//配置消费者监听的容器
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setConcurrentConsumers(3);
factory.setMaxConcurrentConsumers(10);
//factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);//设置确认模式手工确认
return factory;
}
}
4.配置路由和通道
package com.xuan.springrabbitmq.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
//生产者消费者模式的配置,包括一个队列和两个对应的消费者
@Configuration
public class ProducerConsumerConfig {
@Bean
public Queue myQueue() {
Queue queue = new Queue("myqueue");
return queue;
}
}
5.配置订阅发布模式PublishSubscribeConfig.java
package com.xuan.springrabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
//发布订阅模式的配置,包括两个队列和对应的订阅者,发布者的交换机类型使用fanout(子网广播),两根网线binding用来绑定队列到交换机
@Configuration
public class PublishSubscribeConfig {
@Bean
public Queue myQueue1() {
Queue queue = new Queue("queue1");
return queue;
}
@Bean
public Queue myQueue2() {
Queue queue = new Queue("queue2");
return queue;
}
@Bean
public FanoutExchange fanoutExchange() {
FanoutExchange fanoutExchange = new FanoutExchange("fanout");
return fanoutExchange;
}
@Bean
public Binding binding1() {
Binding binding = BindingBuilder.bind(myQueue1()).to(fanoutExchange());
return binding;
}
@Bean
public Binding binding2() {
Binding binding = BindingBuilder.bind(myQueue2()).to(fanoutExchange());
return binding;
}
}
5.配置direct直连模式DirectExchangeConfig.java
package com.xuan.springrabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
//direct直连模式的交换机配置,包括一个direct交换机,两个队列,三根网线binding
@Configuration
public class DirectExchangeConfig {
@Bean
public DirectExchange directExchange() {
DirectExchange directExchange = new DirectExchange("direct");
return directExchange;
}
@Bean
public Queue directQueue1() {
Queue queue = new Queue("directqueue1");
return queue;
}
@Bean
public Queue directQueue2() {
Queue queue = new Queue("directqueue2");
return queue;
}
//3个binding将交换机和相应队列连起来
@Bean
public Binding bindingorange() {
Binding binding = BindingBuilder.bind(directQueue1()).to(directExchange()).with("orange");
return binding;
}
@Bean
public Binding bindingblack() {
Binding binding = BindingBuilder.bind(directQueue2()).to(directExchange()).with("black");
return binding;
}
@Bean
public Binding bindinggreen() {
Binding binding = BindingBuilder.bind(directQueue2()).to(directExchange()).with("green");
return binding;
}
}
6.配置topic交换机模型TopicExchangeConfig.java
package com.xuan.springrabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
//topic交换机模型,需要一个topic交换机,两个队列和三个binding
@Configuration
public class TopicExchangeConfig {
@Bean
public TopicExchange topicExchange(){
TopicExchange topicExchange=new TopicExchange("mytopic");
return topicExchange;
}
@Bean
public Queue topicQueue1() {
Queue queue=new Queue("topicqueue1");
return queue;
}
@Bean
public Queue topicQueue2() {
Queue queue=new Queue("topicqueue2");
return queue;
}
//3个binding将交换机和相应队列连起来
@Bean
public Binding bindingtopic1(){
Binding binding= BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("*.orange.*");//binding key
return binding;
}
@Bean
public Binding bindingtopic2(){
Binding binding= BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("*.*.rabbit");
return binding;
}
@Bean
public Binding bindingtopic3(){
Binding binding= BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("lazy.#");//#表示0个或若干个关键字,*表示一个关键字
return binding;
}
}
7.定义发送的消息Mail.java
package po;
import java.io.Serializable;
public class Mail implements Serializable {
private static final long serialVersionUID = -8140693840257585779L;
private String mailId;
private String country;
private Double weight;
public Mail() {
}
public Mail(String mailId, String country, double weight) {
this.mailId = mailId;
this.country = country;
this.weight = weight;
}
public String getMailId() {
return mailId;
}
public void setMailId(String mailId) {
this.mailId = mailId;
}
public String getCountry() {
return country;
}
public void setCountry(String country) {
this.country = country;
}
public double getWeight() {
return weight;
}
public void setWeight(double weight) {
this.weight = weight;
}
@Override
public String toString() {
return "Mail [mailId=" + mailId + ", country=" + country + ", weight="
+ weight + "]";
}
}
8.继承的消息TopicMail.java
package po;
public class TopicMail extends Mail {
String routingkey;
public String getRoutingkey() {
return routingkey;
}
public void setRoutingkey(String routingkey) {
this.routingkey = routingkey;
}
@Override
public String toString() {
return "TopicMail [routingkey=" + routingkey + "]";
}
}
9.定义发送接口的实现ProducerImpl.java
package com.xuan.springrabbitmq.service.impl;
import com.xuan.springrabbitmq.service.Producer;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import po.Mail;
@Transactional
@Service("producer")
public class ProducerImpl implements Producer {
@Autowired
RabbitTemplate rabbitTemplate;
public void sendMail(String queue, Mail mail) {
rabbitTemplate.setQueue(queue);
rabbitTemplate.convertAndSend(queue,mail);
}
}
10.订阅发布时的发送消息实现PublisherImpl.java
package com.xuan.springrabbitmq.service.impl;
import po.Mail;
import com.xuan.springrabbitmq.service.Publisher;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service("publisher")
public class PublisherImpl implements Publisher {
@Autowired
RabbitTemplate rabbitTemplate;
public void publishMail(Mail mail) {
rabbitTemplate.convertAndSend("fanout", "", mail);
}
public void senddirectMail(Mail mail, String routingkey) {
rabbitTemplate.convertAndSend("direct", routingkey, mail);
}
public void sendtopicMail(Mail mail, String routingkey) {
rabbitTemplate.convertAndSend("mytopic", routingkey, mail);
}
}
11.消费者的实现代码QueueListener1.java
package com.xuan.springrabbitmq.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import po.Mail;
@Component
@RabbitListener(queues = "myqueue")
public class QueueListener1 {
@RabbitHandler
public void displayMail(Mail mail, Channel channel, Message message) throws Exception {
System.out.println("队列监听器1号收到消息" + mail.toString());
//channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//如果需要确认的要调用
}
}
12.或者QueueListener2.java
package com.xuan.springrabbitmq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import po.Mail;
@Component
public class QueueListener2 {
@RabbitListener(queues = "myqueue")
public void displayMail(Mail mail) throws Exception {
System.out.println("队列监听器2号收到消息"+mail.toString());
}
}