SpirngBoot(2.16)自动配置RabbitMQ
- RabbitAutoConfiguration
- RabbitProperties封装了rabbit的配置
- 自动配置ConnectionFactory
- 提供RabbitTemplate,操作模板
- AmqpAdmin:rabbitMQ系统管理组件,如声明一个队列
上图实现了RabbitMQ的自动配置
对于springboot 1.x版本
- 使用AmqpAdmin管理交换器 队列 绑定等
- 发送一个User对象到消息(User 实现序列化接口)
- 自定义消息的序列化方式(json展示)
- 使用EnableRabbit开启注解支持
-
使用 @RabbitListener监听消息来消费
- 直接使用具体的类接收消息
- 使用Message接收再从中取出消息体
- 或者一起使用
springboot2.16版本
- 使用配置类创建交换器,并绑定队列 指定路由。
对于FanoutExchange,不需要指定RouteKey
对于DirectExchange,指定RouteKey
@Component
public class SendEmailConsumer {
private static final Logger logger = LoggerFactory.getLogger(SendEmailConsumer.class);
private final SendInnerUtil sendInnerUtil;
@Autowired
public SendEmailConsumer(SendInnerUtil sendInnerUtil) {
this.sendInnerUtil = sendInnerUtil;
}
/*** 消费者处理接收消息方法
* ****重要说明*****
* 如果生产者是以convertSendAndReceive方法发送,则一定要手动给予返回,处理完后加入下面这一行:
* ack-true处理:channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
* 参数说明-------消息id,fasle代表不批量处理(批量是指将消息id小于当前id的都处理掉)
* ack-false处理:channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
* 参数说明-------消息id,fasle代表不批量处理(批量是指将消息id小于当前id的都处理掉),第二个false表示不重新入队(重新入队用true)
* 拒绝消息:channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); 消息不会重新入队
* 参数说明-------消息id,fasle表示不重新入队(重新入队用true)
*
* 异常情况下,如果不手动做ack-false处理,则该消息会一直认为没有被消费掉,会一直占用rabbitmq内存空间,时间一久,必然造成内存溢出,切记!!!
*/
@RabbitListener(queues = RabbitMQConstant.SEND_EMAIL_QUEUE)
public void sendEmailHandler(RabbitParams params, Message message, Channel channel) {
try {
// 统一配置的自动确认,所以不需要手动应答
logger.info("发送邮件消费者开始执行-------->" + params);
// 发送邮件
String template = "InternalServerErrorTemplate";
AbstractContext context = new Context();
context.setVariable("contactName", params.getContactName());
context.setVariable("type", params.getType());
context.setVariable("dbName", params.getDbName());
context.setVariable("ip", params.getIp());
context.setVariable("port", params.getPort());
context.setVariable("itemName", params.getItemName());
context.setVariable("threshold", params.getThreshold());
context.setVariable("value", params.getValue());
context.setVariable("occurredTime", getDate());
sendInnerUtil.sendTemplateEmail(params.getEmail(), MonitorConstant.SEND_EMAIL_SUBJECT, template, context);
} catch (Exception e) {
logger.error("发送邮件失败=======>" + e.toString(), e);
try {
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException io) {
io.printStackTrace();
}
}
}
public String getDate() {
Calendar calendar = Calendar.getInstance();
return " " + calendar.get(Calendar.YEAR) + "年" +
" " + (calendar.get(Calendar.MONTH) + 1) + "月" +
" " + calendar.get(Calendar.DAY_OF_MONTH) + "日" +
" " + calendar.get(Calendar.HOUR_OF_DAY) + "时" +
" " + calendar.get(Calendar.MINUTE) + "分" +
" " + calendar.get(Calendar.SECOND) + "秒";
}
}