spingboot整合RabbitMQ

SpirngBoot(2.16)自动配置RabbitMQ

  • RabbitAutoConfiguration
    spingboot整合RabbitMQ
  • RabbitProperties封装了rabbit的配置
    spingboot整合RabbitMQ
  • 自动配置ConnectionFactory
    spingboot整合RabbitMQ
  • 提供RabbitTemplate,操作模板
    spingboot整合RabbitMQ
  • AmqpAdmin:rabbitMQ系统管理组件,如声明一个队列
    spingboot整合RabbitMQ
    上图实现了RabbitMQ的自动配置
对于springboot 1.x版本
  • 使用AmqpAdmin管理交换器 队列 绑定等
    spingboot整合RabbitMQ
  • 发送一个User对象到消息(User 实现序列化接口)
    spingboot整合RabbitMQ
  • 自定义消息的序列化方式(json展示)
    spingboot整合RabbitMQ
    spingboot整合RabbitMQ
    spingboot整合RabbitMQ
  • 使用EnableRabbit开启注解支持
  • 使用 @RabbitListener监听消息来消费
    spingboot整合RabbitMQ
    • 直接使用具体的类接收消息
    • 使用Message接收再从中取出消息体
    • 或者一起使用

springboot2.16版本

  • 使用配置类创建交换器,并绑定队列 指定路由。
    spingboot整合RabbitMQ
    对于FanoutExchange,不需要指定RouteKey
    spingboot整合RabbitMQ
    对于DirectExchange,指定RouteKey
    spingboot整合RabbitMQ
@Component
public class SendEmailConsumer {

    private static final Logger logger = LoggerFactory.getLogger(SendEmailConsumer.class);

    private final SendInnerUtil sendInnerUtil;

    @Autowired
    public SendEmailConsumer(SendInnerUtil sendInnerUtil) {
        this.sendInnerUtil = sendInnerUtil;
    }

    /*** 消费者处理接收消息方法
     * ****重要说明*****
     *  如果生产者是以convertSendAndReceive方法发送,则一定要手动给予返回,处理完后加入下面这一行:
     *  ack-true处理:channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
     *               参数说明-------消息id,fasle代表不批量处理(批量是指将消息id小于当前id的都处理掉)
     *  ack-false处理:channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
     *               参数说明-------消息id,fasle代表不批量处理(批量是指将消息id小于当前id的都处理掉),第二个false表示不重新入队(重新入队用true)
     *  拒绝消息:channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); 消息不会重新入队
     *              参数说明-------消息id,fasle表示不重新入队(重新入队用true)
     *
     *  异常情况下,如果不手动做ack-false处理,则该消息会一直认为没有被消费掉,会一直占用rabbitmq内存空间,时间一久,必然造成内存溢出,切记!!!
     */
    @RabbitListener(queues = RabbitMQConstant.SEND_EMAIL_QUEUE)
    public void sendEmailHandler(RabbitParams params, Message message, Channel channel) {
        try {
            // 统一配置的自动确认,所以不需要手动应答
            logger.info("发送邮件消费者开始执行-------->" + params);
            // 发送邮件
            String template = "InternalServerErrorTemplate";
            AbstractContext context = new Context();
            context.setVariable("contactName", params.getContactName());
            context.setVariable("type", params.getType());
            context.setVariable("dbName", params.getDbName());
            context.setVariable("ip", params.getIp());
            context.setVariable("port", params.getPort());
            context.setVariable("itemName", params.getItemName());
            context.setVariable("threshold", params.getThreshold());
            context.setVariable("value", params.getValue());
            context.setVariable("occurredTime", getDate());
            sendInnerUtil.sendTemplateEmail(params.getEmail(), MonitorConstant.SEND_EMAIL_SUBJECT, template, context);
        } catch (Exception e) {
            logger.error("发送邮件失败=======>" + e.toString(), e);
            try {
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
            } catch (IOException io) {
                io.printStackTrace();
            }
        }
    }

    public String getDate() {
        Calendar calendar = Calendar.getInstance();
        return " " + calendar.get(Calendar.YEAR) + "年" +
                " " + (calendar.get(Calendar.MONTH) + 1) + "月" +
                " " + calendar.get(Calendar.DAY_OF_MONTH) + "日" +
                " " + calendar.get(Calendar.HOUR_OF_DAY) + "时" +
                " " + calendar.get(Calendar.MINUTE) + "分" +
                " " + calendar.get(Calendar.SECOND) + "秒";
    }
}

上一篇:Java 实战 spingboot-redis


下一篇:【SpringBoot】SpingBoot整合AOP