spring-rabbit
2,配置application.yml配置文件
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=UTC
username: root
password: 1234
rabbitmq:
host: 101.201.101.206
username: guest
password: guest
publisher-confirms: true # 开启Rabbitmq发送消息确认机制,发送消息到队列并触发回调方法
publisher-returns: true
listener:
simple:
concurrency: 10 #消费者数量
max-concurrency: 10 #最大消费者数量
prefetch: 1 #限流(消费者每次从队列获取的消息数量)
auto-startup: true #启动时自动启动容器
acknowledge-mode: manual #开启ACK手动确认模式
3,RabbitConfig配置类
1, 定义消息转换实例 ,转化成 JSON
传输
2 , 配置启用rabbitmq
事务
package com.aaa.springredis.controller;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
public class RabbitConfig {
/**
-
定义消息转换实例 ,转化成 JSON传输
-
@return Jackson2JsonMessageConverter
*/
@Bean
public MessageConverter integrationEventMessageConverter() {
return new Jackson2JsonMessageConverter();
}
/**
-
配置启用rabbitmq事务
-
@param connectionFactory connectionFactory
-
@return RabbitTransactionManager
*/
@Bean
public RabbitTransactionManager rabbitTransactionManager(CachingConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
}
4,初始化rabbitmq的回调函数
说明:被@PostConstruct
修饰的方法会在服务器加载Servlet
的时候运行,并且只会被服务器执行一次。如果想在生成对象时完成某些初始化操作,而偏偏这些初始化操作又依赖于依赖注入,那么久无法在构造函数中实现。为此,可以使用@PostConstruct
注解一个方法来完成初始化,@PostConstruct
注解的方法将会在依赖注入完成后被自动调用。
回调函数的使用前提是配置文件中开启了rabitmq
消息确认机制
Constructor >> @Autowired >> @PostConstruct
@Autowired
RabbitTemplate rabbitTemplate;
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitController.class);
@PostConstruct
private void
《一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》
【docs.qq.com/doc/DSmxTbFJ1cmN1R2dB】 完整内容开源分享
init(){
/**
-
消息发送到交换器Exchange后触发回调。
-
使用该功能需要开启确认,spring-boot中配置如下:
-
spring.rabbitmq.publisher-confirms = true
*/
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
if (b) {
System.out.println(“消息已确认 cause:{”+s+"} - {"+correlationData+"}");
} else {
System.out.println(“消息未确认 cause:{”+s+"} - {"+correlationData+"}");
}
}
});
/**
-
通过实现ReturnCallback接口,
-
如果消息从交换器发送到对应队列失败时触发
-
比如根据发送消息时指定的routingKey找不到队列时会触发
-
使用该功能需要开启确认,spring-boot中配置如下:
-
spring.rabbitmq.publisher-returns = true
*/
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
LOGGER.error(“消息被退回:{}”, message);
LOGGER.error(“消息使用的交换机:{}”, exchange);
LOGGER.error(“消息使用的路由键:{}”, routingKey);
LOGGER.error(“描述:{}”, replyText);
}
});
}
三,开始测试
1,写抢单测试类
写抢单测试类,我们使用jweter
压力测试工具开启1000个线程进行测试(开启多线程并发测试),所以为了区别每一个模拟的用户,使用userId
累加的方式进行区分。
private int userId=0;
//开始抢单
@RequestMapping("/begin")
@ResponseBody
public void begin(){
userId++;
this.send(userId);
}
而上面的send
方法就是把接收到的用户请求发送到rabbitmq
消息中间件中。
@RequestMapping("/send")
@ResponseBody
public String send(Integer messge){
//第一个参数:交换机名字 第二个参数:Routing Key的值 第三个参数:传递的消息对象
rabbitTemplate.convertAndSend(“test.direct”,“test”,messge);
return “发送消息成功”;
}
2,配置rabbitmq监听方法
rabbitmq
监听上篇文章也说过了,作用就是监听指定队列中收到来自交换机的消息,来一条收一条,收完为止!
通过ACK
确认是否被正确接收,每个Message
都要被确认,可以手动去 ACK
或自动ACK
,如果信息消费失败的话会拒绝当前消息,并把消息返回原队列。
从队列中收到用户的userId
,然后进行购买商品模拟操作(减少一个库存,新增一条购买记录)