1、配置RabbitMQ
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
                           http://www.springframework.org/schema/rabbit
                           http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <description>rabbitmq 连接服务配置</description>

    <!--
        Connection factory for the broker.
        Every value is a ${rabbitmq.*} placeholder, so a property placeholder
        configurer is expected to be defined elsewhere (not visible in this file).
        Note: the publisherConfirms option (whether published messages are
        confirmed by the broker) is NOT configured by this bean.
    -->
    <bean id="rabbitMqConnectionFactory"
          class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <property name="addresses" value="${rabbitmq.addresses}"/>
        <property name="username" value="${rabbitmq.username}"/>
        <property name="password" value="${rabbitmq.password}"/>
        <property name="virtualHost" value="${rabbitmq.vhost}"/>
        <property name="channelCacheSize" value="${rabbitmq.channelCacheSize}"/>
    </bean>

    <!-- Template bean "amqpTemplate", wired to the connection factory declared above. -->
    <rabbit:template id="amqpTemplate" connection-factory="rabbitMqConnectionFactory"/>

</beans>
2、工具类实现
/** * 监控rabbitmq的工具类 */ @Slf4j public class RabbitMqUtil{ private static RabbitTemplate amqpTemplate; /** * 初始化amqpTemplate */ static { amqpTemplate = SpringContextUtil.getBean(RabbitTemplate.class); Assert.notNull(amqpTemplate,"获取不到amqpTemplate"); } /** * 查询队列中的消息数量 * @param exchange 交换机名字 * @param exchangeType 交换机类型 fanout 或 direct * @param quene 队列名字 * @return */ public static BaseResultDTO getMessageCount(String exchange, String exchangeType, String quene) { Assert.hasText(exchange,"exchange不能为空"); Assert.hasText(quene,"队列名不能为空"); Assert.hasText(exchangeType,"exchangeType不能为空"); checkInitSuccess(); BaseResultDTO baseResultDTO = new BaseResultDTO(); ConnectionFactory connectionFactory = amqpTemplate.getConnectionFactory(); // 创建连接 Connection connection = connectionFactory.createConnection(); // 创建通道 Channel channel = connection.createChannel(false); // 设置消息交换机 try { channel.exchangeDeclare(exchange, exchangeType, true, false, null); AMQP.Queue.DeclareOk declareOk = channel.queueDeclarePassive(quene); //获取队列中的消息个数 Integer queueCount = declareOk.getMessageCount(); // 关闭通道和连接 channel.close(); connection.close(); baseResultDTO.setSuccess(true); baseResultDTO.setMessage(queueCount.toString()); } catch (IOException e) { LoggerFormatUtil.warn(e,log,"连接rabbitmq异常"); } return baseResultDTO; } /** * 校验amqpTemplate是否已经实例化 * @return */ private static void checkInitSuccess(){ if (null == amqpTemplate){ amqpTemplate = SpringContextUtil.getBean(RabbitTemplate.class); } Assert.notNull(amqpTemplate, "amqpTemplate 实例化异常"); } }
3、Spring辅助工具类：获取bean
@Component public class SpringContextUtil implements ApplicationContextAware { public static ApplicationContext applicationContext = null; public SpringContextUtil() { } @SuppressWarnings("unchecked") public static <T> T getBean(String beanName, Class<T> beanType) { Assert.isTrue(applicationContext != null, "应用上下文不能为空"); Object bean = applicationContext.getBean(beanName); return bean == null ? null : (T)bean; } /** * 按类型获取bean * @param beanType * @param <T> * @return */ @SuppressWarnings("unchecked") public static <T> T getBean(Class<T> beanType){ Assert.isTrue(applicationContext != null, "应用上下文不能为空"); T bean = applicationContext.getBean(beanType); return bean == null ? null : bean; } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { SpringContextUtil.applicationContext = applicationContext; } }