RabbitMQ在实际项目中的应用

RabbitMQ在实际项目中的应用

业务场景

说明:

由于一些原因不说明具体业务,大致的业务场景就是,模块A会给模块B发出消息 ,模块B更新本地缓存encache.并且模块B在项目中是集群部署。

选用交换机:

1、fanout 它会把所有的交换器上的消息路由到所有与该交换器邦定的队列中,不需要BindingKey生效

2、direct:它会把消息路由到BindingKey与RoutingKey完全匹配的队列中

3、topic:是direct上的扩展,同样是利用RoutingKey与BindingKey相匹配,但是匹配规则不一样,支持模糊匹配。有如下的规

      • RoutingKey为一个点号“.”分隔的字符串,每个被隔开的独立字符串即为一个单词,是匹配的单位;
      • BindingKey和RoutingKey一样,也是"."分割的字符串;
      • 但不同的是BindingKey,可以用“#”,“”进行类似于占位符的模糊匹配,“#”表示一个单词,""表示多个单词(也可以是零个)

4、headers:依赖发送消息内容中的hearders属性进行匹配,在绑定队列和交换器时指定一组键值对,这里的也就是headers,当发送消息到交换器时,RabbitMQ会获取到该消息的headers,通过比较会路由到相关队列中,这种交换器性能会很差,一般不会使用。

以上就是交换机类型 依照此场景一般我们会考虑到使用fanout 这种模式 (也可以使用direct)

详情见下图:

图示

RabbitMQ在实际项目中的应用

实现

这里说明springboot 集成rabbitmq 并且使用direct模型交换机实现该业务场景 展示消费者的代码

pom.xml添加依赖

 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>

application.proerties 添加交换机和路由配置

rabbitmq.admin.exchange=admin.billinfo
rabbitmq.admin.routingkey=admin.billinfo

添加RabbitmqConfig类

@Configuration
public class RabbitMqConfig {
	private  final Logger logger = LoggerFactory.getLogger(RabbitMqConfig.class);

	
    @Value("${rabbitmq.admin.exchange}")
    String  adminExchange;

    @Value("${rabbitmq.admin.routingkey}")
    String  adminRouteKey;
	 
	 @Value("${stationno}")
	 public  Integer stationno;//站点号
    //队列起名和交换机起名
    @Bean
    public Queue AdminQueue() {
        return new Queue("admin.billinfo"+station,true);  //true 是否持久
    }
    @Bean
    DirectExchange AdminQueueExchange() {
        return new DirectExchange(adminExchange==null?"admin.billinfo":adminExchange.trim());
    }
    //交换机与队列绑定
    @Bean
    Binding bangdingDirect() {
        return BindingBuilder.bind(AdminQueue()).to(AdminQueueExchange()).with(adminRouteKey==null?"":adminRouteKey.trim());
    }

}

消息消费者

@Component
@RabbitListener(queues = "admin.billinfo")//监听的队列名称
public class CacheReceiver {
    private  final Logger logger = LoggerFactory.getLogger(DirectReceiver.class);

    @Autowired
    PredictPoolMemory predictPoolMemory;
    @Autowired
    PreviewPoolMemory previewPoolMemory;
    @Autowired
    AgentPoolMemory agentPoolMemory;

    @Autowired
    PredictNumberPoolDao predictNumberPoolDao;
    @Autowired
    PreviewNumberPoolDao previewNumberPoolDao;
    @Autowired
    AgentNumberPoolDao agentNumberPoolDao;
    @Autowired
    JobCache jobCache;



    //收到消息
    @RabbitHandler
    public void process(String billInfoMessage){
        logger.info("adminbillinfo:{}",billInfoMessage);
        JSONObject jsonobj = JSON.parseObject(billInfoMessage);
        String cmd = jsonobj.getString("cmd");
        int type = jsonobj.getIntValue("type");
        int companyid  = jsonobj.getIntValue("companyid");

        //修改企业信息
        if ("updatecompanyinfo".equals(cmd)){
            try {
                predictPoolMemory.addAll();
            } catch (Exception e) {
                logger.error("Exception:{}"+e.getMessage(),e);
            }

            try {
                previewPoolMemory.addAll();
            } catch (Exception e) {
                logger.error("Exception:{}"+e.getMessage(),e);
            }

            try {
                agentPoolMemory.addAll();
            } catch (Exception e) {
                logger.error("Exception:{}"+e.getMessage(),e);
            }
            try{
                jobCache.initCache();
            }catch (Exception e){
                logger.error("Exception{}"+e.getMessage(),e);
            }
        }
}

线上mq容易出现的消息积压问题解决方案

mq自动确认模式 acknowledge = auto,不做异常处理,会导致 mq消费者消费异常,消息重回队列从而导致消息积压

可添加配置

# 开启ACK  无确认模式
spring.rabbitmq.listener.direct.acknowledge-mode=none
spring.rabbitmq.listener.simple.acknowledge-mode=none

AcknowledgeMode 除了 NONE 和 MANUAL 之外还有 AUTO ,它会根据方法的执行情况来决定是否确认还是拒绝(是否重新入queue)

  • 如果消息成功被消费(成功的意思是在消费的过程中没有抛出异常),则自动确认

  • 当抛出 AmqpRejectAndDontRequeueException 异常的时候,则消息会被拒绝,且 requeue = false(不重新入队列)

  • 当抛出 ImmediateAcknowledgeAmqpException 异常,则消费者会被确认

  • 其他的异常,则消息会被拒绝,且 requeue = true(如果此时只有一个消费者监听该队列,则有发生死循环的风险,多消费端也会造成资源的极大浪费,这个在开发过程中一定要避免的)。可以通过 setDefaultRequeueRejected(默认是true)去设置

详细线上mq故障排查请关注公众号:君子亦流氓

上一篇:【spring】依赖注入的方式


下一篇:springboot service 用 @Autowired注入 mapper 为null