文章目录
rabbitmq的虚拟主机就相当于是数据库中的库,一个项目应只访问一个虚拟主机。创建虚拟主机的时候应该以’/'开头。
引入的pom文件的依赖
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
</dependencies>
使用的rabbitmq的版本是3.8.19。
1、第一种模式(直连)
生产者
ConnectionFactory factory = new ConnectionFactory();
//设置主机
factory.setHost("主机ip地址");
//做tcp通信的端口号
factory.setPort(5672);
//设置连接虚拟主机
factory.setVirtualHost("虚拟主机名");
//设置用户名和密码
factory.setUsername("用户名");
factory.setPassword("密码");
//获取连接对象
Connection connection = factory.newConnection();
//获取连接的通道
Channel channel = connection.createChannel();
//通道绑定对应的消息队列
//参数(按顺序):队列名(不存在自动创建)、定义队列特性是否要持久化,如果不持久化就会在rabbitmq重启的时候销毁队列并且队列中的消息也会消失,如果持久化了队列会保存下来但是消息也会丢失、是否独占队列(是否只允许当前的连接可用,其他的不可用)、是否在
//消费完成后(队列中没有消息)自动删除队列(但是还要消费者断开连接之后才会删除)、额外的参数
channel.queueDeclare("hello",false,false,false,null);
//参数(按顺序):交换机名称、队列名称、传递消息额外设置,如果为MessageProperties.PERSISTENT_TEXT_PLAIN就表示要持久化到消息队列中也就是即使rabbitmq重启也不会丢失消息、消息内容
channel.basicPublish("","hello",null,"hello".getBytes());
//注意:在通道帮到绑定消息队列的时候如果绑定的是aa,但是发送的时候还是hello,仍然是可以发送到hello的。
//同一个通道可以向不同队列发送消息
//关闭通道
channel.close();
connection.close();
如果设置持久化为true消费完成后删除也为true,重启rabbitmq后队列并没有被保存下来。
消费者
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("主机ip");
factory.setPort(5672);
factory.setVirtualHost("虚拟主机名");
factory.setUsername("用户名");
factory.setPassword("密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//生产者和消费之对队列的定义参数要对应
channel.queueDeclare("hello",false,false,false,null);
//消费消息
//参数(按顺序):消费那个队列的名称、开启消息的自动确认、消费时的回调接口
channel.basicConsume("hello",true,new DefaultConsumer(channel){
@Override //参数body是从消息队列中取出的消息
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
}
});
//如果不关闭连接,消费者将会一直监听。但是如果关闭连接的话可能会来不及进入那个方法处理就把连接关闭了,造成拿到了消息但是显示不出来,所以不建议关闭连接
channel.close();
connection.close();
消费者测试的时候应写一个main方法,不能使用@test的方式测试。
第二种模型(work quene)
当消费者速度慢的时候会造成消息队列堆积消息导致消息无法及时处理,使用任务模型让多个消费者绑定到一个队列,共同消费队列中的消息,队列中的消息一旦消费,就会消失,因此任务不会被重复执行。
在两个消费者同时消费一个队列时,rabbitmq默认会按顺序将每一个消息发给下一个消费者,每个消费者都会收到相同数量的消息,这种分发消息的方式叫做循环。如图:
平均分配的时候是把消费者1的消息一次性给消费者1,把消费者2的消息一次性给消费者2,如果一个消费者处理的速度慢的话不影响另一个消费者的速度,另一个消费者只消费自己的消息,如图:
但是当一个消费者在执行第二个消息的时候宕机了,那么分配给这个消费者的后面的消息就会丢失,这显然不是我们想要的。所以采用下面的方式解决这个问题:设置消费者一次只能消费一个消息,把消费者的自动确认关闭,当消息完成后手动确认。
消费者1
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("118.31.55.218");
factory.setPort(5672);
factory.setVirtualHost("/ems");
factory.setUsername("adm");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.basicQos(1);//每一次只能消费一个消息
//生产者和消费之对队列的定义参数要对应
channel.queueDeclare("hello",true,false,false,null);
//解释一下开启消息的自动确认:就是只要消费者拿到消息就告诉rabbitmq消息已经执行完了,但是如果执行速度慢实际上是没有执行完,
//rabbitmq收到确认后就把队列中的消息标记为已消费。为false的时候是不会自动确认消息,即使这个消息被消费了在队列中标记还是未被消费。
channel.basicConsume("hello",false,new DefaultConsumer(channel){
@Override //参数body是从消息队列中取出的消息
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
//参数(按顺序):确认的是队列中的那个消息,是否开启多个消息同时确认
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
消费者2
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("118.31.55.218");
factory.setPort(5672);
factory.setVirtualHost("/ems");
factory.setUsername("adm");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.basicQos(1);//每一次只能消费一个消息
//生产者和消费之对队列的定义参数要对应
channel.queueDeclare("hello",true,false,false,null);
//消费消息
//参数(按顺序):消费那个队列的名称、开启消息的自动确认、消费时的回调接口
channel.basicConsume("hello",false,new DefaultConsumer(channel){
@Override //参数body是从消息队列中取出的消息
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(new String(body));
//参数(按顺序):确认中的是队列中的那个消息,是否开启多个消息同时确认
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
第三种模型(fanout)
fanout也称为广播,fanout模型会将消息发送给所有与交换机绑定的队列。
生产者
ConnectionFactory factory = new ConnectionFactory();
//设置主机
factory.setHost("主机ip");
//做tcp通信的端口号
factory.setPort(5672);
//设置连接虚拟主机
factory.setVirtualHost("虚拟主机名");
//设置用户名和密码
factory.setUsername("用户名");
factory.setPassword("密码");
//获取连接对象
Connection connection = factory.newConnection();
//获取连接的通道
Channel channel = connection.createChannel();
//将通道声明指定交换机
//参数(按顺序):交换机名称(在交换机不存在的时候创建交换机)、交换机类型(fanout是广播)、
channel.exchangeDeclare("logs","fanout");
//参数(按顺序):交换机名称、路由key、消息持久化特性、消息
channel.basicPublish("logs","",null,"fanout".getBytes());
channel.close();
connection.close();
}
消费者
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("主机IP");
factory.setPort(5672);
factory.setVirtualHost("虚拟主机名");
factory.setUsername("用户名");
factory.setPassword("密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//通道绑定交换机
channel.exchangeDeclare("交换机名","fanout");
//创建临时队列(没消息自动删除)
String queue = channel.queueDeclare().getQueue();
//绑定交换机和队列,参数(按顺序):队列的名字、交换机的名字、路由key
//fanout的路由key没有什么效果
channel.queueBind(queue,"交换机名","");
//消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
}
});
消费者2和消费者1代码差不多。
第四种模型(Routing)
Routing模型可以让不同的消息被不同的队列消费.
Routing之订阅模型-Direct(直连):
1.队列与交换机的绑定需要指定一个Routingkey(路由key)
2.消息的发送方在向交换机发送消息时,也必须指定Routingkey
3.交换机不再把消息发送给每一个绑定的队列,而是根据消息的Routingkey进行判断,只有队列的Routingkey与消息的Routingkey完全一致队列才会收到消息
生产者
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("主机IP");
factory.setPort(5672);
factory.setVirtualHost("虚拟主机名");
factory.setUsername("用户名");
factory.setPassword("密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//通过通道声明交换机
//参数(按顺序):交换机名字、交换机类型(direct是路由模式)
channel.exchangeDeclare("direct","direct");
//发送消息
String Routingkey="error";
channel.basicPublish("direct",Routingkey,null,"direct类型".getBytes());
channel.close();
connection.close();
消费者1
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("主机IP");
factory.setPort(5672);
factory.setVirtualHost("虚拟主机名");
factory.setUsername("用户名");
factory.setPassword("密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//通道声明交换机以及交换的类型
channel.exchangeDeclare("direct","direct");
//创建临时队列(没消息自动删除)
String queue = channel.queueDeclare().getQueue();
//绑定交换机和队列,参数(按顺序):队列的名字、交换机的名字、路由key
channel.queueBind(queue,"direct","error");
//消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1:"+new String(body));
}
});
消费者2
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("主机IP");
factory.setPort(5672);
factory.setVirtualHost("虚拟主机名");
factory.setUsername("用户名");
factory.setPassword("密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//通道声明交换机以及交换的类型
channel.exchangeDeclare("direct","direct");
//创建临时队列(没消息自动删除)
String queue = channel.queueDeclare().getQueue();
//绑定交换机和队列,参数(按顺序):队列的名字、交换机的名字、路由key
channel.queueBind(queue,"direct","error");
channel.queueBind(queue,"direct","info");
channel.queueBind(queue,"direct","warning");
//消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2:"+new String(body));
}
});
此时消费者1和消费者2都能收到消息,当把生产者的的Routingkey改为info的时候,只有消费者2能收到消息。
Routing之订阅模型-Topic
directl类型的交换机在设置接受队列的路由key时只能设置固定Routingkey,当我们要设置很多Routingkey时会很麻烦,而topic类型的交换机可以解决这个问题,topic类型的交换机可以让队列绑定Routingkey的时候使用通配符!这种类型的Routingkey一般都是由多个单词组成的,多个单词之间使用"."分开。如:item.insert
注意:只用topic类型的交换机才能使用通配符
通配符:*:只能匹配一个单词(不是字母)
#:匹配0个或多个单词
生产者
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("主机IP");
factory.setPort(5672);
factory.setVirtualHost("虚拟主机名");
factory.setUsername("用户名");
factory.setPassword("密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明交换机以及交换机类型
channel.exchangeDeclare("topics","topic");
String Routingkey="user";
channel.basicPublish("topics",Routingkey,null,"topic类型".getBytes());
channel.close();
connection.close();
消费者1
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("主机IP");
factory.setPort(5672);
factory.setVirtualHost("虚拟主机名");
factory.setUsername("用户名");
factory.setPassword("密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//通道声明交换机以及交换的类型
channel.exchangeDeclare("topics","topic");
//创建临时队列(没消息自动删除)
String queue = channel.queueDeclare().getQueue();
//绑定交换机和队列,参数(按顺序):队列的名字、交换机的名字、路由key
channel.queueBind(queue,"topics","user.*");
//消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1:"+new String(body));
}
});
消费者2
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("主机IP");
factory.setPort(5672);
factory.setVirtualHost("虚拟主机名");
factory.setUsername("用户名");
factory.setPassword("密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//通道声明交换机以及交换的类型
channel.exchangeDeclare("topics","topic");
//创建临时队列(没消息自动删除)
String queue = channel.queueDeclare().getQueue();
//绑定交换机和队列,参数(按顺序):队列的名字、交换机的名字、路由key
channel.queueBind(queue,"topics","user.#");
//消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2:"+new String(body));
}
});
运行结果:消费者2能收到消息,消费者1不能收到消息。如果把生产者的Routingkey改成user.info那么消费者1和消费者2都能收到消息。
SpringBoot中使用RabbitMQ
pom文件
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
yml文件
spring:
rabbitmq:
host: 主机名
port: 5672
username: 用户名
password: 密码
virtual-host: 虚拟主机
使用RabbitTemplate用来简化操作,只要配置完之后,这个对象就实例化好了,使用时直接在项目中注入。
第一种模型
生产者:
@org.junit.jupiter.api.Test
public void fir(){
//参数(按顺序):队列名称、消息
rabbitTemplate.convertAndSend("hello","First Hello World");
//执行完之后不会创建队列,因为没有消费者。必须有消费者才会创建队列
}
消费者:
@Component
@RabbitListener(queuesToDeclare = @Queue("hello")) //代表是rabbitmq的一个消费者.监听hello队列。可以在@Queue里面设置是否独占、是否自动删除.默认是持久化非独占不自动删除
public class TestRabbitmq {
@RabbitHandler //代表收到消息后的回调方法
public void receive(String msg){
System.out.println(msg);
}
}
第二种模型
生产者
@org.junit.jupiter.api.Test
public void sec(){
for(int i=0;i<10;i++){
rabbitTemplate.convertAndSend("work","第二种模型"+i);
}
}
消费者
@Component
public class Two {
@RabbitListener(queuesToDeclare = @Queue("work")) //代表下面这个方法会处理所监听队列的回调
public void receive1(String msg){
System.out.println("msg1"+msg);
}
@RabbitListener(queuesToDeclare = @Queue("work")) //代表下面这个方法会处理所监听队列的回调
public void receive2(String msg){
System.out.println("msg2"+msg);
}
}
但是这种消费是轮询的,如图
第三种模型
生产者
@org.junit.jupiter.api.Test
public void fanout(){
//参数:交换机名、路由key、消息
rabbitTemplate.convertAndSend("logs","","第三种模型");
}
消费者
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,//创建临时队列
exchange = @Exchange(value = "logs",type = "fanout")//绑定交换机
)
})
public void receive1(String msg){
System.out.println("msg1"+msg);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,//创建临时队列
exchange = @Exchange(value = "logs",type = "fanout")//绑定交换机
)
})
public void receive2(String msg){
System.out.println("msg2"+msg);
}
第四种模型
生产者
@org.junit.jupiter.api.Test
public void route(){
//参数:交换机名、路由key、消息
rabbitTemplate.convertAndSend("direct","info","第四种模型");
}
消费者
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "direct",type = "direct"),
key = {"info","error","warn"}
)
})
public void receive1(String msg){
System.out.println("msg1"+msg);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "direct",type = "direct"),
key = {"info"}
)
})
public void receive2(String msg){
System.out.println("msg2"+msg);
}
第五种模型
生产者
@org.junit.jupiter.api.Test
public void topic(){
//参数:交换机名、路由key、消息
rabbitTemplate.convertAndSend("topic","user.save","第五种模型");
}
消费者
@Component
public class Topic {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(type = "topic",name = "topic"),
key = {"user.*","user.save"}
)
})
public void receive1(String msg){
System.out.println("msg1"+msg);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(type = "topic",name = "topic"),
key = {"order.#","produce.#","user.*"}
)
})
public void receive2(String msg){
System.out.println("msg2"+msg);
}
}