RabbitMQ 查看队列中消息数量的方法

1、配置rabbitmq

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/rabbit
    http://www.springframework.org/schema/rabbit/spring-rabbit.xsd" >

    <description>rabbitmq 连接服务配置</description>

    <!--
        Connection settings for the RabbitMQ broker.
        Every value below is a ${...} placeholder; the properties source that
        resolves them is not part of this file.
        NOTE(review): the original comment mentioned publisherConfirms (whether
        published messages are confirmed), but no such property is set on this
        bean - confirm whether it was meant to be configured.
    -->
    <bean id="rabbitMqConnectionFactory"
          class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <property name="addresses" value="${rabbitmq.addresses}"/>
        <property name="username" value="${rabbitmq.username}"/>
        <property name="password" value="${rabbitmq.password}"/>
        <property name="virtualHost" value="${rabbitmq.vhost}"/>
        <property name="channelCacheSize" value="${rabbitmq.channelCacheSize}"/>
    </bean>


    <!-- Spring AMQP template declaration, bound to the connection factory defined above -->
    <rabbit:template id="amqpTemplate" connection-factory="rabbitMqConnectionFactory"
    />


</beans>

2、工具类实现

/**
 * Utility for monitoring RabbitMQ, currently limited to reading the number of
 * messages waiting in a queue.
 *
 * <p>The {@link RabbitTemplate} is resolved lazily from the Spring context on first
 * use rather than in a static initializer: a lookup that fails during class
 * initialization (for example because the context is not ready yet) would leave
 * this class permanently unusable.
 */
@Slf4j
public class RabbitMqUtil{
    /** Template resolved lazily from the Spring context; see {@link #checkInitSuccess()}. */
    private static RabbitTemplate amqpTemplate;

    /**
     * Queries the number of messages currently held in a queue.
     *
     * <p>The exchange is declared (durable, not auto-delete) before the queue is
     * inspected with a passive declare, so the queue itself is never created or
     * modified by this call.
     *
     * @param exchange exchange name; must contain text
     * @param exchangeType exchange type, e.g. {@code fanout} or {@code direct}; must contain text
     * @param quene queue name; must contain text
     * @return a result whose {@code success} flag is {@code true} and whose message holds the
     *         count as a decimal string; when the broker call fails with an
     *         {@link IOException} the flag is {@code false} and no message is set
     */
    public static BaseResultDTO getMessageCount(String exchange, String exchangeType, String quene) {
        Assert.hasText(exchange,"exchange不能为空");
        Assert.hasText(quene,"队列名不能为空");
        Assert.hasText(exchangeType,"exchangeType不能为空");
        checkInitSuccess();

        BaseResultDTO baseResultDTO = new BaseResultDTO();
        ConnectionFactory connectionFactory = amqpTemplate.getConnectionFactory();
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.createConnection();
            channel = connection.createChannel(false);
            // Declare the exchange with the same arguments as before: durable, not auto-delete.
            channel.exchangeDeclare(exchange, exchangeType, true, false, null);
            // A passive declare only inspects the queue and reports its current message count.
            AMQP.Queue.DeclareOk declareOk = channel.queueDeclarePassive(quene);
            int queueCount = declareOk.getMessageCount();

            baseResultDTO.setSuccess(true);
            baseResultDTO.setMessage(String.valueOf(queueCount));
        } catch (IOException e) {
            baseResultDTO.setSuccess(false);
            LoggerFormatUtil.warn(e,log,"连接rabbitmq异常");
        } finally {
            // Always release the channel and connection, including on the failure path.
            closeQuietly(channel, connection);
        }

        return baseResultDTO;
    }

    /**
     * Closes the channel and then the connection, logging instead of propagating any
     * failure so that cleanup can never mask the outcome of the query itself.
     *
     * @param channel channel to close; may be {@code null}
     * @param connection connection to close; may be {@code null}
     */
    private static void closeQuietly(Channel channel, Connection connection) {
        if (channel != null) {
            try {
                channel.close();
            } catch (Exception e) {
                // Broad catch on purpose: the checked exceptions declared by close() differ
                // between client versions, and a channel the broker already closed may
                // raise a runtime exception here.
                log.warn("Failed to close RabbitMQ channel", e);
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (Exception e) {
                log.warn("Failed to close RabbitMQ connection", e);
            }
        }
    }

    /**
     * Ensures {@link #amqpTemplate} is available, looking it up from the Spring context
     * when it has not been resolved yet.
     */
    private static void checkInitSuccess(){
        if (null == amqpTemplate){
            amqpTemplate = SpringContextUtil.getBean(RabbitTemplate.class);
        }
        Assert.notNull(amqpTemplate, "amqpTemplate 实例化异常");
    }
}

 


3、获取 Bean 的 Spring 辅助工具类

/**
 * Static access point to the Spring {@link ApplicationContext}, for code that is not
 * itself managed by Spring.
 *
 * <p>The context is captured when Spring calls
 * {@link #setApplicationContext(ApplicationContext)} on the component instance; the
 * lookup methods fail their assertion if they are called before that happens.
 */
@Component
public class SpringContextUtil implements ApplicationContextAware {
    /** The captured application context; {@code null} until Spring injects it. */
    public static ApplicationContext applicationContext = null;

    public SpringContextUtil() {
    }

    /**
     * Looks up a bean by name.
     *
     * <p>When {@code beanType} is given, the lookup is delegated to the typed
     * {@code getBean(name, type)} of the context so that a type mismatch is reported
     * by the lookup itself instead of surfacing later as a {@link ClassCastException}
     * at the call site.
     *
     * @param beanName name of the bean to retrieve
     * @param beanType expected type of the bean; when {@code null} the bean is returned
     *                 without a type check
     * @param <T> type the caller expects
     * @return the bean registered under {@code beanName}
     */
    @SuppressWarnings("unchecked")
    public static <T> T getBean(String beanName, Class<T> beanType) {
        Assert.isTrue(applicationContext != null, "应用上下文不能为空");
        if (beanType == null) {
            // No type to verify against: keep the untyped lookup for backward compatibility.
            return (T) applicationContext.getBean(beanName);
        }
        return applicationContext.getBean(beanName, beanType);
    }

    /**
     * Looks up a bean by type.
     *
     * @param beanType type of the bean to retrieve
     * @param <T> type of the bean
     * @return the bean of the given type
     */
    public static <T> T getBean(Class<T> beanType){
        Assert.isTrue(applicationContext != null, "应用上下文不能为空");
        return applicationContext.getBean(beanType);
    }

    /**
     * Stores the context in the static field so the static lookup methods can use it.
     *
     * @param applicationContext the context supplied by Spring
     * @throws BeansException declared by the interface; not thrown by this implementation
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        SpringContextUtil.applicationContext = applicationContext;
    }
}

 

上一篇:Java多线程机制中的各种锁问题


下一篇:4.Spring相关API