Spring Boot 集成 RabbitMQ的几种常见用法
前言
Spring Boot 集成 RabbitMQ 非常简单,如果只是简单的使用配置非常少,Spring Boot 提供了spring-boot-starter-amqp
项目对消息各种支持。
引入依赖
compile group: 'org.springframework.boot', name: 'spring-boot-starter-amqp'
添加配置文件yml
server:
port: 8001
spring:
application:
name: zoo-plus-rabbitmq
rabbitmq:
virtual-host: /
host: localhost
port: 5672
username: guest
password: guest
直接通过队列(单生-单消)
我们需要先配置队列
/**
* @author: 谢飞
*/
@Configuration
public class RabbitConfig {
@Bean
public Queue helloQueue() {
return new Queue("hello-queue");
// return QueueBuilder.durable("hello-queue").build();
}
}
配置队列时我们可以直接new一个队列,也可以直接用链式编程构建一个队列。
配置消费者
/**
* @author: 谢飞
*/
@Slf4j
@Component
public class Consumer {
@RabbitListener(queues = {"hello-queue"})
public void helloQueue(Message message, Channel channel) {
log.info("-----------------hello-queue消费:" + new String(message.getBody()));
}
}
编写测试类测试
/**
* @author: 谢飞
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class TestRabbitMQ {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送到hello-queue
*/
@Test
public void sendHelloQueue() {
rabbitTemplate.convertAndSend("hello-queue", "这是发送到hello队列的消息");
}
}
然后我们运行测试方法,发现控制台已经打印消费端打印的信息了。
通过交换机(Exchange)发送到队列
通过fanout类型交换机发送
1.编写配置
/**
* @author: 谢飞
*/
@Configuration
public class RabbitConfig {
@Bean
public Queue helloQueue() {
return QueueBuilder.durable("hello-queue").build();
}
@Bean
public Queue helloQueue2() {
return new Queue("hello-queue2");
}
/**
* Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout转发器发送消息,绑定了这个转发器的所有队列都收到这个消息。
*/
@Bean
FanoutExchange helloFanoutExchange() {
return ExchangeBuilder.fanoutExchange("hello-fanout-exchange").build();
}
/**
* 绑定hello-queue队列到hello-fanout-exchange交换机
*/
@Bean
Binding bindingHello() {
return BindingBuilder.bind(helloQueue()).to(helloFanoutExchange());
}
/**
* 绑定hello-queue2队列到hello-fanout-exchange交换机
*/
@Bean
Binding bindingHello2() {
return BindingBuilder.bind(helloQueue2()).to(helloFanoutExchange());
}
}
2.监听类:
/**
* @author: 谢飞
*/
@Slf4j
@Component
public class Consumer {
@RabbitListener(queues = {"hello-queue"})
public void helloQueue(Message message, Channel channel) {
log.info("-----------------hello-queue消费:" + new String(message.getBody()));
}
@RabbitListener(queues = {"hello-queue2"})
public void helloQueue2(Message message, Channel channel) {
log.info("-----------------hello-queue2消费:" + new String(message.getBody()));
}
}
3.测试类
/**
* 发送到hello-fanout-exchange交换机
* 注意:fanout这种模式与routingKey无关,消息发送到与这个交换机绑定的队列
*/
@Test
public void sendFanoutExchange() {
rabbitTemplate.convertAndSend("hello-fanout-exchange", "", "这是发送到hello-fanout-exchange交换机的消息");
}
通过topic类型交换机发送
topic 是 RabbitMQ 中最灵活的一种方式,可以根据 routing_key *的绑定不同的队列
1.编写配置
/**
* @author: 谢飞
*/
@Configuration
public class RabbitConfig {
@Bean
public Queue helloQueue3() {
return new Queue("hello-queue2-topic");
}
////////////////////////////////topic////////////////////////////////////
/**
* 创建Topic Exchange交换机也叫通配符交换机
* Topic Exchange主要有两种通配符:# 和 *
* *(星号):可以(只能)匹配一个单词
* #(井号):可以匹配多个单词(或者零个)
*/
@Bean
TopicExchange helloTopicExchange() {
return ExchangeBuilder.topicExchange("hello-topic-exchange").build();
}
/**
* 绑定hello-queue3队列到hello-topic-exchange交换机
*/
@Bean
Binding bindingHello3() {
return BindingBuilder.bind(helloQueue3()).to(helloTopicExchange()).with("hello.#");
}
}
2.消费端
@RabbitListener(queues = {"hello-queue2-topic"})
public void helloQueue3(Message message, Channel channel) {
log.info("-----------------hello-queue3消费:" + new String(message.getBody()));
}
3.测试段
/**
* 发送到hello-topic-exchange交换机
*/
@Test
public void sendTopicExchange() {
rabbitTemplate.convertAndSend("hello-topic-exchange", "hello.123", "这是发送到hello-topic-exchange交换机的消息");
}
对象的支持(实体必须序列化)
实体:
/**
* @author: 谢飞
*/
@Data
public class Order implements Serializable {
private Long id;
private String orderId;
}
消费者:
@RabbitListener(queues = {"order-queue"})
public void orderQueue(Order order) {
log.info("-----------------order-queue消费:" + order);
}
测试类:
/**
* 发送Order
*/
@Test
public void sendOrder() {
Order order = new Order();
order.setId(1L);
order.setOrderId("123123123123");
rabbitTemplate.convertAndSend("order-queue", order);
}
使用注解的方式
使用注解的方式不需要提前声明配置,只需要在注解里面配置好就行,项目启动自动加载。
消费者:
/**
* 通过注解
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "queue-1", declare = "true"),
exchange = @Exchange(value = "exchange-1", durable = "true", type = "topic", ignoreDeclarationExceptions = "true"),
key = "springboot.*"
))
@RabbitHandler
public void onMessage(Message message, Channel channel) throws IOException {
System.out.println("---------------------------------------------");
System.out.println("收到消息:" + new String(message.getBody()));
long deliveryTag = message.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag, false);
}
/**
* 传递实体
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "queue-2", declare = "true"),
exchange = @Exchange(value = "exchange-2", durable = "true", type = "topic", ignoreDeclarationExceptions = "true"),
key = "springboot.*"
))
@RabbitHandler
public void onOrderMessage(@Payload Order order, Channel channel, @Headers Map<String, Object> headers) throws IOException {
System.out.println("---------------------------------------------");
System.out.println("收到消息:" + order);
long deliveryTag = (long) headers.get(AmqpHeaders.DELIVERY_TAG);
channel.basicAck(deliveryTag, false);
}
生产者:
/**
* 使用注解
*/
@GetMapping("queue1")
public Resp queue1() {
amqpTemplate.convertAndSend("exchange-1", "springboot.abc", "测试queue1 使用注解");
return Resp.success("ok", null);
}
/**
* 发送实体
*/
@GetMapping("queue2")
public Resp queue2() {
Order order = new Order();
order.setId(1L);
amqpTemplate.convertAndSend("exchange-2", "springboot.abc", order);
return Resp.success("ok", null);
}
总结:在springboot中使用rabbitMQ十分方便,大致的简单用法就这些,后续我们将学习到更多rabbitMQ的高级用法。
源码地址:https://gitee.com/zoo-plus/springboot-learn/tree/2.x/springboot-middleware/rabbitmq
NPException 发布了474 篇原创文章 · 获赞 142 · 访问量 56万+ 关注