聊聊RabbitMq动态监听这点事

很长时间没有分享过学习心得了,看了下发布记录,最后一篇文章的时间都在2020-12-10年了,今天抽时间整理下一个很早就想整理的技术分享。顺便说句题外话,因为我一直没时间整理,再加上开发的小伙伴对Mq的理解不够,我开掉了好几个处理这个事情的开发小伙伴,所以我希望这篇文章能对大家带来一点帮助。

背景说明

Mq(消息队列)做为一个消峰工具而常被使用,我们常用的Mq主要分为以下四种:

  • ActiveMQ
  • RabbitMq
  • Kafka
  • RocketMq

今天主要是聊聊RabbitMq,业务场景上选择RabbitMq的原因有很多,今天就不细说了。今天主要是说下如何动态创建队列,并实现动态监听的的方法。

需求背景

做为一个CRM-SAAS平台,每天平台会进入大量的客户信息,那么我们需要用高效的方式把这些数据及时的发给销售,那么这里需要考虑以下几个问题:

  1. 下发数据的及时性
  2. 数据分组
  3. 接收人员属于不同的分组和不同的级别
  4. 数据不满足下发条件(这里举个例子:接收人员都在忙的情况,可能需要过段时间重发)重发的问题

技术方案

  1. 为保证数据及时性,数据进入系统之后及时推进消费队列
  2. 针对数据分组和接收人员不同的分组和不同的级别,并要人为可控的话,那么就设定不同的队列来进行监听消费,我们还可以让队列名称变得有意义,从队列名称上获取我们所需要某些必要信息,例如数据属于那个分组,数据应该下发的群体等。

基于上述考虑,我们选择RabbitMq来实现这个方案,既然是不同的队列消费不同的数据,那么第一步就是考虑如何动态创建队列,因为这里还要设定一个人为可控,也就是人员可以管理,所以比然后伴随着队列的删除和重建。

队列的创建方式

基于注解的使用

    @Bean
    public Queue syncCdrQueue(){
        return new Queue(CrmMqConstant.SYNC_CDR_TO_CRM_Q,true,false,false);
    }

非注解配置

		Channel channelForm = connectionFactory().createConnection().createChannel(false);
		channelForm.queueDeclare(nameForm, true, false, false, null);

基于RabbitAdmin

rabbitAdmin.declareQueue(queue);
rabbitAdmin.declareExchange(fanoutExchange);
rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(fanoutExchange));

从创建队列的灵活度来说,肯定是依次减弱的:

  • 注解方式:提前定义队列名称,一般以常量来定义,当然也支持变量的方式,但是对于加载先后的要求就高了,例如:这里用一个动态IP作为队列名称举例
    private final String QUEUE_NAME="crm.websocket."+ IPUtils.getLocalhostIp();
    
        @Bean
        public Queue queue(){
            return new Queue(QUEUE_NAME,true,false,false);
        }
    
    //监听
    @RabbitListener(queues = "#{queue.name}")
  • 非注解方式:这个其实就是通过ConnectionFactory来获取通道创建队列的,这个比较适合在建立链接的时候使用,所以一般在批量初始化队列时候比较合适
    @Bean
    public List<String> mqMsgQueues() throws IOException {
    	List<String> queueNames = new ArrayList<String>();
    	List<Map<String,Object>> engineList = autoAssignEngineService.getAllAutoAssignEngine(-1,-1);
        logger.info("engineList:{}", JsonUtils.toJson(engineList));
    	if(engineList != null && engineList.size() > 0) {
    		for(Map<String,Object> engine : engineList) {
    			String groupId = String.valueOf(engine.get("orgId"));
    			String semAdType = String.valueOf(engine.get("semAdType"));
                logger.info("groupId:{},semAdType:{}", groupId,semAdType);
	            createQueue(queueNames, groupId,semAdType,"1");
	            createQueue(queueNames, groupId,semAdType,"2");
    		}
    	}
    	return queueNames;
    }

	private void createQueue(List<String> queueNames, String groupId, String semType, String level) throws IOException {
		String nameForm = queue +"."+ groupId+"."+semType + "." + level;
		logger.info("nameForm:{}",nameForm);
		Channel channelForm = connectionFactory().createConnection().createChannel(false);
		channelForm.queueDeclare(nameForm, true, false, false, null);
		channelForm.exchangeDeclare(topicExchange, BuiltinExchangeType.TOPIC,true);
		channelForm.queueBind(nameForm,topicExchange,routingKey + "."+groupId+"."+semType+"."+level);
		queueNames.add(nameForm);
	}
  • 基于RabbitAdmin的方式:那么这个就相对来说比较灵活,支持随时创建队列了。那么简单封装下:
       public void createMqQueue(String queueName,String exName,String rk,String type){
            Properties properties = rabbitAdmin.getQueueProperties(queueName);
            if(properties==null) {
                Queue queue = new Queue(queueName, true, false, false, null);
                if(BuiltinExchangeType.DIRECT.getType().equals(type)) {
                    DirectExchange directExchange = new DirectExchange(exName);
                    rabbitAdmin.declareQueue(queue);
                    rabbitAdmin.declareExchange(directExchange);
                    rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(directExchange).with(rk));
                }else if(BuiltinExchangeType.FANOUT.getType().equals(type)){
                    FanoutExchange fanoutExchange = new FanoutExchange(exName);
                    rabbitAdmin.declareQueue(queue);
                    rabbitAdmin.declareExchange(fanoutExchange);
                    rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(fanoutExchange));
                }else{
                    TopicExchange topicExchange = new TopicExchange(exName);
                    rabbitAdmin.declareQueue(queue);
                    rabbitAdmin.declareExchange(topicExchange);
                    rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(topicExchange).with(rk));
                }
            }
        }

    我们知道如何动态创建队列之后,接下来我们想办法解决动态消费监听得事情就行:

动态消费监听

RabbitMq得抽象监听类是:AbstractMessageListenerContainer,他下面有三个实现类,这里就使用SimpleMessageListenerContainer类来进行简单得说明。

方式一(我一个前辈得方式):

初始化队列,存储在静态缓存,用不同得bean来加载监听:

private List<Map<String,String>> groupOrgIds = new ArrayList<Map<String,String>>()
@PostConstruct
	public void init() {
		if (logger.isDebugEnabled()) {
			logger.debug("initbean...");
		}
		List<AutoAssignEngine> engineList = autoAssignEngineService.getAllAutoAssignEngine();
		if (engineList != null && engineList.size() > 0) {
			for(AutoAssignEngine engine : engineList) {
				createQueueList(engine.getOrgId(),engine.getSemAdType(),"1");
				createQueueList(engine.getOrgId(),engine.getSemAdType(),"2");
			}
		}
	}
private void createQueueList(String orgId,String semType,String userLevel) {
		Map<String,String> feed = new HashMap<String, String>();
		feed.put("orgId", orgId);
		feed.put("type", semType);
		feed.put("userLevel", userLevel);
		groupOrgIds.add(feed);
	}
public SimpleMessageListenerContainer setContainer() {
		SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
		container.setConnectionFactory(rabbitConfig.connectionFactory());
		container.setAcknowledgeMode(AcknowledgeMode.AUTO);
		container.setMaxConcurrentConsumers(8);
		container.setConcurrentConsumers(5);
		container.setPrefetchCount(10);
		return container;
	}

	public SimpleMessageListenerContainer queueMethod(SimpleMessageListenerContainer container) {
	    Map<String,String> orgIdMap = groupOrgIds.get(0);
		String orgId = orgIdMap.get("orgId");
		String sourceType = orgIdMap.get("type");
		String userLevel = orgIdMap.get("userLevel");
		String queueNames=queueName + "." + orgId+"."+sourceType+"."+userLevel;
		container.addQueueNames(queueNames);
		excute(orgId,sourceType, container,queueNames);
		groupOrgIds.remove(0);
		return container;
	}
public SimpleMessageListenerContainer excute(String orgId,String semAdType, SimpleMessageListenerContainer container,String queneName) {
		container.setMessageListener(new ChannelAwareMessageListener() {
			@Override
			public void onMessage(Message message, Channel channel) throws Exception {

			}
		});
		return container;
	}
/**
	 * 创建多个队列监听,利用Bean的初始化顺序,去消费groupOrgIds
	 */
	@Bean
	public SimpleMessageListenerContainer container1() {
		SimpleMessageListenerContainer container = setContainer();
		queueMethod(container);
		return container;
	}
@Bean
	public SimpleMessageListenerContainer container2() {
		SimpleMessageListenerContainer container = setContainer();
		queueMethod(container);
		return container;
	}
.....

那么这种方式呢确实能动态监听不同得队列和消费,但是因为利用得是Bean得初始化得方式,所以每次变更需要加载得队列内容就得重新加载Bean,也就是需要重启服务。

方式二:真正得动态监听

	@Bean
	public SimpleMessageListenerContainer simpleMessageListenerContainer() {
		SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
		container.setConnectionFactory(rabbitConfig.connectionFactory());
		container.setAcknowledgeMode(AcknowledgeMode.AUTO);
		container.setMaxConcurrentConsumers(8);
		container.setConcurrentConsumers(5);
		container.setPrefetchCount(10);
		// 查询有多少分配引擎,每个分配引擎一个队列
		List<AutoAssignEngine> engineList = autoAssignEngineService.getAllAutoAssignEngine();
		if (engineList != null && engineList.size() > 0) {
			for(AutoAssignEngine engine : engineList) {
				mqService.addNewListener(engine.getOrgId(),engine.getSemAdType(),"1",container);
				mqService.addNewListener(engine.getOrgId(),engine.getSemAdType(),"2",container);
			}
		}
		return container;
	}

 public Boolean addNewListener(String orgId,String semType,String userLevel,SimpleMessageListenerContainer container ){
        String queueNames=queueName + "." + orgId+"."+semType+"."+userLevel;
        container.addQueueNames(queueNames);
        container.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {

            }
        });
        return true;
    }

问题1:这里再接收消息(onMessage方法内)得时候不要用方法传参,会出现并发问题。

解决方式1:

String receiveQueueName = message.getMessageProperties().getConsumerQueue();

队列名称解析获取,本人使用。

解决方式2:

使用final变量重新接收传参,不过这个有待测试,不一定又用。

问题2:这不是还是在Bean初始化得时候加载得嘛,如果想要在服务启动之后再增加监听如何处理

完整得动态创建队列和监听(业务过程种实现)

我们知道如何创建队列和监听之后就开始解决问题2。

需求:变更现有队列。

转化需求为:删除现有队列和监听,新建新得队列并增加监听

问题:推送和消费不再统一服务。

解决方式:暴露接口,利用http请求实现同步。

代码实现:

消费端

    public Boolean updateListener(String orgId,String semType,String oldOrg){
        logger.info("================================消费端开始处理");
        String newFirstQueueName = queueName+"."+orgId+"."+semType+"."+1;
        String newFirstRk = routingKey+"."+orgId+"."+semType+"."+1;
        String newSecondQueueName = queueName+"."+orgId+"."+semType+"."+2;
        String newSecondRk =  routingKey+"."+orgId+"."+semType+"."+2;
        createMqQueue(newFirstQueueName,topicExchange,newFirstRk, BuiltinExchangeType.TOPIC.getType());
        createMqQueue(newSecondQueueName,topicExchange,newSecondRk, BuiltinExchangeType.TOPIC.getType());
        logger.info("================================创建队列");
        SimpleMessageListenerContainer container = SpringCtxUtils.getBean(SimpleMessageListenerContainer.class);
        String oneQueueNames=queueName + "." + orgId+"."+semType+"."+1;
        String twoQueueNames=queueName + "." + orgId+"."+semType+"."+2;
        if(!"NO".equals(oldOrg)) {
            String oneOldQueueNames = queueName + "." + oldOrg + "." + semType + "." + 1;
            String twoOldQueueNames = queueName + "." + oldOrg + "." + semType + "." + 2;
            container.removeQueueNames(oneOldQueueNames);
            container.removeQueueNames(twoOldQueueNames);
            logger.info("================================删除监听成功");
        }
        container.addQueueNames(oneQueueNames);
        container.addQueueNames(twoQueueNames);
        logger.info("================================添加监听成功");
        container.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {

            }
        });
        return true;
    }

    public void createMqQueue(String queueName,String exName,String rk,String type){
        Properties properties = rabbitAdmin.getQueueProperties(queueName);
        if(properties==null) {
            Queue queue = new Queue(queueName, true, false, false, null);
            if(BuiltinExchangeType.DIRECT.getType().equals(type)) {
                DirectExchange directExchange = new DirectExchange(exName);
                rabbitAdmin.declareQueue(queue);
                rabbitAdmin.declareExchange(directExchange);
                rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(directExchange).with(rk));
            }else if(BuiltinExchangeType.FANOUT.getType().equals(type)){
                FanoutExchange fanoutExchange = new FanoutExchange(exName);
                rabbitAdmin.declareQueue(queue);
                rabbitAdmin.declareExchange(fanoutExchange);
                rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(fanoutExchange));
            }else{
                TopicExchange topicExchange = new TopicExchange(exName);
                rabbitAdmin.declareQueue(queue);
                rabbitAdmin.declareExchange(topicExchange);
                rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(topicExchange).with(rk));
            }
        }
    }

暴露接口:

    @GetMapping("/add-listener/{orgId}/{semType}/{oldOrg}")
    public ComResponse addListener(@PathVariable("orgId") String orgId,@PathVariable("semType") String semType,@PathVariable("oldOrg") String oldOrg){
        mqService.updateListener(orgId,semType,oldOrg);
        return ComResponse.successResponse();
    }

注意:执行顺序,创建新队列,删除监听,添加监听

推送端

//添加新得监听
        String requestUrl = consumerUrl+"/"+newOrg+"/"+semType+"/"+oldOrgId;
        String result = restTemplateService.getWithNoParams(requestUrl,String.class);
        log.info("请求结束:{}",result);
        if(!"NO".equals(oldOrgId)) {
            String firstQueueName = queue + "." + oldOrgId + "." + semType + "." + 1;
            String secondQueueName = queue + "." + oldOrgId + "." + semType + "." + 2;
            mqService.deleteMqQueue(firstQueueName);
            mqService.deleteMqQueue(secondQueueName);
            log.info("删除队列结束");
        }
        //新增新的的队列
        String newFirstQueueName = queue+"."+newOrg+"."+semType+"."+1;
        String newFirstRk = routingKey+"."+newOrg+"."+semType+"."+1;
        String newSecondQueueName = queue+"."+newOrg+"."+semType+"."+2;
        String newSecondRk =  routingKey+"."+newOrg+"."+semType+"."+2;
        mqService.createMqQueue(newFirstQueueName,topicExchange,newFirstRk, BuiltinExchangeType.TOPIC.getType());
        mqService.createMqQueue(newSecondQueueName,topicExchange,newSecondRk, BuiltinExchangeType.TOPIC.getType());
        log.info("添加队列结束");

注意:执行顺序:变更监听,删除队列,添加新得队列

到这里基本上就实现了动态创建队列和动态监听。大家如果有什么不太明白得可以留言,抽时间整理得,所以写得比较草,大家凑合着看把。

上一篇:RabbitMQ消息中间件技术精讲(一)


下一篇:第06章 TF-A初使用