很长时间没有分享过学习心得了,看了下发布记录,最后一篇文章的时间都在2020-12-10年了,今天抽时间整理下一个很早就想整理的技术分享。顺便说句题外话,因为我一直没时间整理,再加上开发的小伙伴对Mq的理解不够,我开掉了好几个处理这个事情的开发小伙伴,所以我希望这篇文章能对大家带来一点帮助。
背景说明
Mq(消息队列)做为一个消峰工具而常被使用,我们常用的Mq主要分为以下四种:
- ActiveMQ
- RabbitMq
- Kafka
- RocketMq
今天主要是聊聊RabbitMq,业务场景上选择RabbitMq的原因有很多,今天就不细说了。今天主要是说下如何动态创建队列,并实现动态监听的的方法。
需求背景
做为一个CRM-SAAS平台,每天平台会进入大量的客户信息,那么我们需要用高效的方式把这些数据及时的发给销售,那么这里需要考虑以下几个问题:
- 下发数据的及时性
- 数据分组
- 接收人员属于不同的分组和不同的级别
- 数据不满足下发条件(这里举个例子:接收人员都在忙的情况,可能需要过段时间重发)重发的问题
技术方案
- 为保证数据及时性,数据进入系统之后及时推进消费队列
- 针对数据分组和接收人员不同的分组和不同的级别,并要人为可控的话,那么就设定不同的队列来进行监听消费,我们还可以让队列名称变得有意义,从队列名称上获取我们所需要某些必要信息,例如数据属于那个分组,数据应该下发的群体等。
基于上述考虑,我们选择RabbitMq来实现这个方案,既然是不同的队列消费不同的数据,那么第一步就是考虑如何动态创建队列,因为这里还要设定一个人为可控,也就是人员可以管理,所以比然后伴随着队列的删除和重建。
队列的创建方式
基于注解的使用
@Bean
public Queue syncCdrQueue(){
return new Queue(CrmMqConstant.SYNC_CDR_TO_CRM_Q,true,false,false);
}
非注解配置
Channel channelForm = connectionFactory().createConnection().createChannel(false);
channelForm.queueDeclare(nameForm, true, false, false, null);
基于RabbitAdmin
rabbitAdmin.declareQueue(queue);
rabbitAdmin.declareExchange(fanoutExchange);
rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(fanoutExchange));
从创建队列的灵活度来说,肯定是依次减弱的:
- 注解方式:提前定义队列名称,一般以常量来定义,当然也支持变量的方式,但是对于加载先后的要求就高了,例如:这里用一个动态IP作为队列名称举例
private final String QUEUE_NAME="crm.websocket."+ IPUtils.getLocalhostIp(); @Bean public Queue queue(){ return new Queue(QUEUE_NAME,true,false,false); } //监听 @RabbitListener(queues = "#{queue.name}")
- 非注解方式:这个其实就是通过ConnectionFactory来获取通道创建队列的,这个比较适合在建立链接的时候使用,所以一般在批量初始化队列时候比较合适
@Bean
public List<String> mqMsgQueues() throws IOException {
List<String> queueNames = new ArrayList<String>();
List<Map<String,Object>> engineList = autoAssignEngineService.getAllAutoAssignEngine(-1,-1);
logger.info("engineList:{}", JsonUtils.toJson(engineList));
if(engineList != null && engineList.size() > 0) {
for(Map<String,Object> engine : engineList) {
String groupId = String.valueOf(engine.get("orgId"));
String semAdType = String.valueOf(engine.get("semAdType"));
logger.info("groupId:{},semAdType:{}", groupId,semAdType);
createQueue(queueNames, groupId,semAdType,"1");
createQueue(queueNames, groupId,semAdType,"2");
}
}
return queueNames;
}
private void createQueue(List<String> queueNames, String groupId, String semType, String level) throws IOException {
String nameForm = queue +"."+ groupId+"."+semType + "." + level;
logger.info("nameForm:{}",nameForm);
Channel channelForm = connectionFactory().createConnection().createChannel(false);
channelForm.queueDeclare(nameForm, true, false, false, null);
channelForm.exchangeDeclare(topicExchange, BuiltinExchangeType.TOPIC,true);
channelForm.queueBind(nameForm,topicExchange,routingKey + "."+groupId+"."+semType+"."+level);
queueNames.add(nameForm);
}
- 基于RabbitAdmin的方式:那么这个就相对来说比较灵活,支持随时创建队列了。那么简单封装下:
public void createMqQueue(String queueName,String exName,String rk,String type){ Properties properties = rabbitAdmin.getQueueProperties(queueName); if(properties==null) { Queue queue = new Queue(queueName, true, false, false, null); if(BuiltinExchangeType.DIRECT.getType().equals(type)) { DirectExchange directExchange = new DirectExchange(exName); rabbitAdmin.declareQueue(queue); rabbitAdmin.declareExchange(directExchange); rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(directExchange).with(rk)); }else if(BuiltinExchangeType.FANOUT.getType().equals(type)){ FanoutExchange fanoutExchange = new FanoutExchange(exName); rabbitAdmin.declareQueue(queue); rabbitAdmin.declareExchange(fanoutExchange); rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(fanoutExchange)); }else{ TopicExchange topicExchange = new TopicExchange(exName); rabbitAdmin.declareQueue(queue); rabbitAdmin.declareExchange(topicExchange); rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(topicExchange).with(rk)); } } }
我们知道如何动态创建队列之后,接下来我们想办法解决动态消费监听得事情就行:
动态消费监听
RabbitMq得抽象监听类是:AbstractMessageListenerContainer,他下面有三个实现类,这里就使用SimpleMessageListenerContainer类来进行简单得说明。
方式一(我一个前辈得方式):
初始化队列,存储在静态缓存,用不同得bean来加载监听:
private List<Map<String,String>> groupOrgIds = new ArrayList<Map<String,String>>()
@PostConstruct
public void init() {
if (logger.isDebugEnabled()) {
logger.debug("initbean...");
}
List<AutoAssignEngine> engineList = autoAssignEngineService.getAllAutoAssignEngine();
if (engineList != null && engineList.size() > 0) {
for(AutoAssignEngine engine : engineList) {
createQueueList(engine.getOrgId(),engine.getSemAdType(),"1");
createQueueList(engine.getOrgId(),engine.getSemAdType(),"2");
}
}
}
private void createQueueList(String orgId,String semType,String userLevel) {
Map<String,String> feed = new HashMap<String, String>();
feed.put("orgId", orgId);
feed.put("type", semType);
feed.put("userLevel", userLevel);
groupOrgIds.add(feed);
}
public SimpleMessageListenerContainer setContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConfig.connectionFactory());
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setMaxConcurrentConsumers(8);
container.setConcurrentConsumers(5);
container.setPrefetchCount(10);
return container;
}
public SimpleMessageListenerContainer queueMethod(SimpleMessageListenerContainer container) {
Map<String,String> orgIdMap = groupOrgIds.get(0);
String orgId = orgIdMap.get("orgId");
String sourceType = orgIdMap.get("type");
String userLevel = orgIdMap.get("userLevel");
String queueNames=queueName + "." + orgId+"."+sourceType+"."+userLevel;
container.addQueueNames(queueNames);
excute(orgId,sourceType, container,queueNames);
groupOrgIds.remove(0);
return container;
}
public SimpleMessageListenerContainer excute(String orgId,String semAdType, SimpleMessageListenerContainer container,String queneName) {
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
}
});
return container;
}
/**
* 创建多个队列监听,利用Bean的初始化顺序,去消费groupOrgIds
*/
@Bean
public SimpleMessageListenerContainer container1() {
SimpleMessageListenerContainer container = setContainer();
queueMethod(container);
return container;
}
@Bean
public SimpleMessageListenerContainer container2() {
SimpleMessageListenerContainer container = setContainer();
queueMethod(container);
return container;
}
.....
那么这种方式呢确实能动态监听不同得队列和消费,但是因为利用得是Bean得初始化得方式,所以每次变更需要加载得队列内容就得重新加载Bean,也就是需要重启服务。
方式二:真正得动态监听
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConfig.connectionFactory());
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setMaxConcurrentConsumers(8);
container.setConcurrentConsumers(5);
container.setPrefetchCount(10);
// 查询有多少分配引擎,每个分配引擎一个队列
List<AutoAssignEngine> engineList = autoAssignEngineService.getAllAutoAssignEngine();
if (engineList != null && engineList.size() > 0) {
for(AutoAssignEngine engine : engineList) {
mqService.addNewListener(engine.getOrgId(),engine.getSemAdType(),"1",container);
mqService.addNewListener(engine.getOrgId(),engine.getSemAdType(),"2",container);
}
}
return container;
}
public Boolean addNewListener(String orgId,String semType,String userLevel,SimpleMessageListenerContainer container ){
String queueNames=queueName + "." + orgId+"."+semType+"."+userLevel;
container.addQueueNames(queueNames);
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
}
});
return true;
}
问题1:这里再接收消息(onMessage方法内)得时候不要用方法传参,会出现并发问题。
解决方式1:
String receiveQueueName = message.getMessageProperties().getConsumerQueue();
队列名称解析获取,本人使用。
解决方式2:
使用final变量重新接收传参,不过这个有待测试,不一定又用。
问题2:这不是还是在Bean初始化得时候加载得嘛,如果想要在服务启动之后再增加监听如何处理
完整得动态创建队列和监听(业务过程种实现)
我们知道如何创建队列和监听之后就开始解决问题2。
需求:变更现有队列。
转化需求为:删除现有队列和监听,新建新得队列并增加监听
问题:推送和消费不再统一服务。
解决方式:暴露接口,利用http请求实现同步。
代码实现:
消费端
public Boolean updateListener(String orgId,String semType,String oldOrg){
logger.info("================================消费端开始处理");
String newFirstQueueName = queueName+"."+orgId+"."+semType+"."+1;
String newFirstRk = routingKey+"."+orgId+"."+semType+"."+1;
String newSecondQueueName = queueName+"."+orgId+"."+semType+"."+2;
String newSecondRk = routingKey+"."+orgId+"."+semType+"."+2;
createMqQueue(newFirstQueueName,topicExchange,newFirstRk, BuiltinExchangeType.TOPIC.getType());
createMqQueue(newSecondQueueName,topicExchange,newSecondRk, BuiltinExchangeType.TOPIC.getType());
logger.info("================================创建队列");
SimpleMessageListenerContainer container = SpringCtxUtils.getBean(SimpleMessageListenerContainer.class);
String oneQueueNames=queueName + "." + orgId+"."+semType+"."+1;
String twoQueueNames=queueName + "." + orgId+"."+semType+"."+2;
if(!"NO".equals(oldOrg)) {
String oneOldQueueNames = queueName + "." + oldOrg + "." + semType + "." + 1;
String twoOldQueueNames = queueName + "." + oldOrg + "." + semType + "." + 2;
container.removeQueueNames(oneOldQueueNames);
container.removeQueueNames(twoOldQueueNames);
logger.info("================================删除监听成功");
}
container.addQueueNames(oneQueueNames);
container.addQueueNames(twoQueueNames);
logger.info("================================添加监听成功");
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
}
});
return true;
}
public void createMqQueue(String queueName,String exName,String rk,String type){
Properties properties = rabbitAdmin.getQueueProperties(queueName);
if(properties==null) {
Queue queue = new Queue(queueName, true, false, false, null);
if(BuiltinExchangeType.DIRECT.getType().equals(type)) {
DirectExchange directExchange = new DirectExchange(exName);
rabbitAdmin.declareQueue(queue);
rabbitAdmin.declareExchange(directExchange);
rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(directExchange).with(rk));
}else if(BuiltinExchangeType.FANOUT.getType().equals(type)){
FanoutExchange fanoutExchange = new FanoutExchange(exName);
rabbitAdmin.declareQueue(queue);
rabbitAdmin.declareExchange(fanoutExchange);
rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(fanoutExchange));
}else{
TopicExchange topicExchange = new TopicExchange(exName);
rabbitAdmin.declareQueue(queue);
rabbitAdmin.declareExchange(topicExchange);
rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(topicExchange).with(rk));
}
}
}
暴露接口:
@GetMapping("/add-listener/{orgId}/{semType}/{oldOrg}")
public ComResponse addListener(@PathVariable("orgId") String orgId,@PathVariable("semType") String semType,@PathVariable("oldOrg") String oldOrg){
mqService.updateListener(orgId,semType,oldOrg);
return ComResponse.successResponse();
}
注意:执行顺序,创建新队列,删除监听,添加监听
推送端
//添加新得监听
String requestUrl = consumerUrl+"/"+newOrg+"/"+semType+"/"+oldOrgId;
String result = restTemplateService.getWithNoParams(requestUrl,String.class);
log.info("请求结束:{}",result);
if(!"NO".equals(oldOrgId)) {
String firstQueueName = queue + "." + oldOrgId + "." + semType + "." + 1;
String secondQueueName = queue + "." + oldOrgId + "." + semType + "." + 2;
mqService.deleteMqQueue(firstQueueName);
mqService.deleteMqQueue(secondQueueName);
log.info("删除队列结束");
}
//新增新的的队列
String newFirstQueueName = queue+"."+newOrg+"."+semType+"."+1;
String newFirstRk = routingKey+"."+newOrg+"."+semType+"."+1;
String newSecondQueueName = queue+"."+newOrg+"."+semType+"."+2;
String newSecondRk = routingKey+"."+newOrg+"."+semType+"."+2;
mqService.createMqQueue(newFirstQueueName,topicExchange,newFirstRk, BuiltinExchangeType.TOPIC.getType());
mqService.createMqQueue(newSecondQueueName,topicExchange,newSecondRk, BuiltinExchangeType.TOPIC.getType());
log.info("添加队列结束");
注意:执行顺序:变更监听,删除队列,添加新得队列
到这里基本上就实现了动态创建队列和动态监听。大家如果有什么不太明白得可以留言,抽时间整理得,所以写得比较草,大家凑合着看把。