RabbitMQ在实际项目中的应用
业务场景
说明:
由于一些原因不说明具体业务,大致的业务场景就是,模块A会给模块B发出消息 ,模块B更新本地缓存encache.并且模块B在项目中是集群部署。
选用交换机:
1、fanout 它会把所有的交换器上的消息路由到所有与该交换器邦定的队列中,不需要BindingKey生效
2、direct:它会把消息路由到BindingKey与RoutingKey完全匹配的队列中。
3、topic:是direct上的扩展,同样是利用RoutingKey与BindingKey相匹配,但是匹配规则不一样,支持模糊匹配。有如下的规
-
-
- RoutingKey为一个点号“.”分隔的字符串,每个被隔开的独立字符串即为一个单词,是匹配的单位;
- BindingKey和RoutingKey一样,也是"."分割的字符串;
- 但不同的是BindingKey,可以用“#”,“”进行类似于占位符的模糊匹配,“#”表示一个单词,""表示多个单词(也可以是零个)
-
4、headers:依赖发送消息内容中的hearders属性进行匹配,在绑定队列和交换器时指定一组键值对,这里的也就是headers,当发送消息到交换器时,RabbitMQ会获取到该消息的headers,通过比较会路由到相关队列中,这种交换器性能会很差,一般不会使用。
以上就是交换机类型 依照此场景一般我们会考虑到使用fanout 这种模式 (也可以使用direct)
详情见下图:
图示
实现
这里说明springboot 集成rabbitmq 并且使用direct模型交换机实现该业务场景 展示消费者的代码
pom.xml添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.proerties 添加交换机和路由配置
rabbitmq.admin.exchange=admin.billinfo
rabbitmq.admin.routingkey=admin.billinfo
添加RabbitmqConfig类
@Configuration
public class RabbitMqConfig {
private final Logger logger = LoggerFactory.getLogger(RabbitMqConfig.class);
@Value("${rabbitmq.admin.exchange}")
String adminExchange;
@Value("${rabbitmq.admin.routingkey}")
String adminRouteKey;
@Value("${stationno}")
public Integer stationno;//站点号
//队列起名和交换机起名
@Bean
public Queue AdminQueue() {
return new Queue("admin.billinfo"+station,true); //true 是否持久
}
@Bean
DirectExchange AdminQueueExchange() {
return new DirectExchange(adminExchange==null?"admin.billinfo":adminExchange.trim());
}
//交换机与队列绑定
@Bean
Binding bangdingDirect() {
return BindingBuilder.bind(AdminQueue()).to(AdminQueueExchange()).with(adminRouteKey==null?"":adminRouteKey.trim());
}
}
消息消费者
@Component
@RabbitListener(queues = "admin.billinfo")//监听的队列名称
public class CacheReceiver {
private final Logger logger = LoggerFactory.getLogger(DirectReceiver.class);
@Autowired
PredictPoolMemory predictPoolMemory;
@Autowired
PreviewPoolMemory previewPoolMemory;
@Autowired
AgentPoolMemory agentPoolMemory;
@Autowired
PredictNumberPoolDao predictNumberPoolDao;
@Autowired
PreviewNumberPoolDao previewNumberPoolDao;
@Autowired
AgentNumberPoolDao agentNumberPoolDao;
@Autowired
JobCache jobCache;
//收到消息
@RabbitHandler
public void process(String billInfoMessage){
logger.info("adminbillinfo:{}",billInfoMessage);
JSONObject jsonobj = JSON.parseObject(billInfoMessage);
String cmd = jsonobj.getString("cmd");
int type = jsonobj.getIntValue("type");
int companyid = jsonobj.getIntValue("companyid");
//修改企业信息
if ("updatecompanyinfo".equals(cmd)){
try {
predictPoolMemory.addAll();
} catch (Exception e) {
logger.error("Exception:{}"+e.getMessage(),e);
}
try {
previewPoolMemory.addAll();
} catch (Exception e) {
logger.error("Exception:{}"+e.getMessage(),e);
}
try {
agentPoolMemory.addAll();
} catch (Exception e) {
logger.error("Exception:{}"+e.getMessage(),e);
}
try{
jobCache.initCache();
}catch (Exception e){
logger.error("Exception{}"+e.getMessage(),e);
}
}
}
线上mq容易出现的消息积压问题解决方案
mq自动确认模式 acknowledge = auto,不做异常处理,会导致 mq消费者消费异常,消息重回队列从而导致消息积压
可添加配置
# 开启ACK 无确认模式
spring.rabbitmq.listener.direct.acknowledge-mode=none
spring.rabbitmq.listener.simple.acknowledge-mode=none
AcknowledgeMode 除了 NONE 和 MANUAL 之外还有 AUTO ,它会根据方法的执行情况来决定是否确认还是拒绝(是否重新入queue)
-
如果消息成功被消费(成功的意思是在消费的过程中没有抛出异常),则自动确认
-
当抛出 AmqpRejectAndDontRequeueException 异常的时候,则消息会被拒绝,且 requeue = false(不重新入队列)
-
当抛出 ImmediateAcknowledgeAmqpException 异常,则消费者会被确认
-
其他的异常,则消息会被拒绝,且 requeue = true(如果此时只有一个消费者监听该队列,则有发生死循环的风险,多消费端也会造成资源的极大浪费,这个在开发过程中一定要避免的)。可以通过 setDefaultRequeueRejected(默认是true)去设置
详细线上mq故障排查请关注公众号:君子亦流氓