Spring AMQP:RabbitTemplate SimpleMessageListenerContainer

一.RabbitTemplate介绍

RabbitTemplate:消息模板,在与Spring AMQP整合时,进行发送消息的关键类。

包括了可靠性投递消息方法、回调监听消息接口ConfirmCallBack、返回值确认接口ReturnCallBack等,同样需要进行注入到ioc容器中。

与spring整合需要实例化,与spring boot整合只需要在配置文件中配置就好了

相关配置

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        return new RabbitTemplate(connectionFactory);
    }

测试

发送消息

    @Test
    public void testSend1(){
        rabbitTemplate.convertAndSend("amqp.bean.topic", "amqp.send","hello spring");
    }

结果:

Spring AMQP:RabbitTemplate  SimpleMessageListenerContainer

 测试2:

    @Test
    public void testSend(){
        //创建消息
        MessageProperties properties = new MessageProperties();
        properties.getHeaders().put("send.amqp","测试发送信息");
        Message message = new Message("hello spring amqp".getBytes(), properties);

        //发送
        rabbitTemplate.convertAndSend("amqp.bean.topic", "amqp.send", message, message1 -> {
            System.out.println("添加额外的设置");
            message1.getMessageProperties().getHeaders().put("extra","额外信息");
            return message1;
        });
    }

 测试结果:

Spring AMQP:RabbitTemplate  SimpleMessageListenerContainer

二.SimpleMessageListenerContainer介绍

 简单消息容器,对于消费者的配置项,这个类可以满足

监听多个队列、自动启动、自动声明,事务设置,设置消费者属性,批量消费、设置消息确认和自动确认模式、重回队列,异常捕获handler函数,消费者标签生成测虐,多占模式,设置监听器、转换器。

SimpleMessageListenerContainer可以进行动态设置,在运行的应用中动态修改其消费者数量的大小、接收消息模式等

配置:

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        //设置监听的队列
        container.setQueues(queue());
        //设置当前消费者数量
        container.setConcurrentConsumers(1);
        container.setMaxConcurrentConsumers(5);
        //重回队列
        container.setDefaultRequeueRejected(false);
        //签收机制
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        //消费端标签策略
        container.setConsumerTagStrategy(queue-> queue+"_"+ UUID.randomUUID().toString());
        //消息监听
        container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> System.out.println(new String(message.getBody())));
        return container;
    }

运行springboot主应用,

可以看到在rabbitmq的控制台中,有Container监听的队列。

Spring AMQP:RabbitTemplate  SimpleMessageListenerContainer

上一篇:golang实现rabbitmq消费者模式 断线重连机制


下一篇:消息中间件系列(七):如何从0到1设计一个消息队列中间件