1. Problem
While programming against ActiveMQ, I found that shutting down multiple brokers always threw a concurrent modification exception:
java.util.ConcurrentModificationException
at java.util.HashMap$HashIterator.nextEntry(HashMap.java:793)
at java.util.HashMap$KeyIterator.next(HashMap.java:828)
The code that stops the brokers is as follows:
public static void stopAllMessageBus() {
    try {
        Map brokers = BrokerRegistry.getInstance().getBrokers();
        Iterator iter1 = brokers.keySet().iterator();
        while (iter1.hasNext()) {
            String broker = (String) iter1.next();
            BrokerService bs = (BrokerService) brokers.get(broker);
            bs.stop();
        }
    } catch (Exception e) {
        log.error("stopAllMessageBus failed.", e);
    }
}
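The idea is as follows: first obtain all registered brokers, which BrokerRegistry keeps in a HashMap; then, when a servlet listener detects that its web project is being destroyed, stop every ActiveMQ broker.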
public class MessageBusLoader implements ServletContextListener {

    Logger log = Logger.getLogger(MessageBusLoader.class);

    /**
     * Shut down the MQ when the web app stops.
     */
    public void contextDestroyed(ServletContextEvent arg0) {
        MessageBusRegistryManager.stopAllMessageBus();
    }

    /**
     * Load the MQ when the web app starts.
     */
    public void contextInitialized(ServletContextEvent event) {
        try {
            String webroot = event.getServletContext().getRealPath("/");
            MessageBusRegistryManager.startAllMessageBus(webroot);
        } catch (Exception e) {
            log.error("MessageBusLoader contextInitialized error.", e);
        }
    }
}
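However, this code throws a concurrent modification exception when it runs. The likely cause is that, when the web project is destroyed, calling a broker's stop method removes entries from the HashMap (an ActiveMQ broker instance unbinds itself from the HashMap in BrokerRegistry when it shuts down). At the same time, other code may also be stopping brokers while the web project is being destroyed, which means something else is modifying the HashMap in BrokerRegistry. The iterator in the code above is then walking a HashMap that has been structurally modified since the iterator was created: entries it expects are already gone, the state is inconsistent, and the iterator throws the concurrent modification exception.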
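The first half of that explanation can be checked without ActiveMQ. The following is a minimal sketch, assuming nothing about the broker API: the class name IterationRemovalDemo and the map contents are made up for illustration, and brokers.remove(name) stands in for the unbind that a broker performs when it stops.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical demo class; it does not touch ActiveMQ.
class IterationRemovalDemo {

    // Returns true if removing entries from the map while iterating over
    // its key set raises ConcurrentModificationException on one thread.
    static boolean removeWhileIterating() {
        Map<String, String> brokers = new HashMap<String, String>();
        brokers.put("brokerA", "service A");
        brokers.put("brokerB", "service B");
        brokers.put("brokerC", "service C");

        try {
            Iterator<String> iter = brokers.keySet().iterator();
            while (iter.hasNext()) {
                String name = iter.next();
                // Stand-in for bs.stop(), which leads to unbind(name)
                // and therefore to a removal from the same map.
                brokers.remove(name);
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("CME thrown: " + removeWhileIterating());
    }
}
```

With three entries in the map, the second call to iter.next() sees the changed modification count and throws, so no second thread is needed to reproduce the stack trace shown earlier.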
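2. Solution
BrokerRegistry guards every access to its HashMap with a mutex object, and that mutex is exposed to callers through getRegistryMutext(). Synchronizing on the same mutex while iterating over the brokers resolved the concurrent modification exception.
The code is as follows: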
public static void stopAllMessageBus() {
    try {
        Map brokers = BrokerRegistry.getInstance().getBrokers();
        synchronized (BrokerRegistry.getInstance().getRegistryMutext()) {
            Iterator iter1 = brokers.keySet().iterator();
            while (iter1.hasNext()) {
                String broker = (String) iter1.next();
                BrokerService bs = (BrokerService) brokers.get(broker);
                bs.stop();
            }
        }
    } catch (Exception e) {
        log.error("stopAllMessageBus failed.", e);
    }
}
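One caveat worth keeping in mind: Java's synchronized is reentrant, so holding the mutex keeps other threads out of the registry during the loop, but it does not block an unbind issued by the looping thread itself. If stop() happens to unbind on the calling thread, a more defensive variant is to copy the brokers while holding the mutex and then stop them from the copy. The sketch below illustrates that idea with stand-ins only; FakeRegistry, Stoppable and newRegistryWithBrokers are hypothetical names rather than ActiveMQ classes, and just getRegistryMutext() and getBrokers() mirror the methods listed in this post.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for BrokerRegistry and BrokerService.
class SnapshotStopDemo {

    interface Stoppable {
        void stop() throws Exception;
    }

    static class FakeRegistry {
        private final Object mutex = new Object();
        private final Map<String, Stoppable> brokers = new HashMap<String, Stoppable>();

        void bind(String name, Stoppable broker) {
            synchronized (mutex) {
                brokers.put(name, broker);
            }
        }

        void unbind(String name) {
            synchronized (mutex) {
                brokers.remove(name);
            }
        }

        Object getRegistryMutext() {
            return mutex;
        }

        Map<String, Stoppable> getBrokers() {
            return Collections.unmodifiableMap(brokers);
        }
    }

    // Copies the registered brokers under the mutex, then stops each one
    // from the copy, so removals never hit a live iterator.
    // Returns the number of brokers stopped without an exception.
    static int stopAll(FakeRegistry registry) {
        List<Stoppable> snapshot;
        synchronized (registry.getRegistryMutext()) {
            snapshot = new ArrayList<Stoppable>(registry.getBrokers().values());
        }
        int stopped = 0;
        for (Stoppable broker : snapshot) {
            try {
                broker.stop();
                stopped++;
            } catch (Exception e) {
                System.err.println("stop failed: " + e);
            }
        }
        return stopped;
    }

    // Builds a registry whose fake brokers unbind themselves when stopped.
    static FakeRegistry newRegistryWithBrokers(String... names) {
        final FakeRegistry registry = new FakeRegistry();
        for (final String name : names) {
            registry.bind(name, new Stoppable() {
                public void stop() {
                    registry.unbind(name);
                }
            });
        }
        return registry;
    }

    public static void main(String[] args) {
        FakeRegistry registry = newRegistryWithBrokers("brokerA", "brokerB", "brokerC");
        System.out.println("stopped: " + stopAll(registry));
        System.out.println("remaining: " + registry.getBrokers().size());
    }
}
```

The design choice here is to hold the lock only long enough to take the copy, which also avoids keeping the registry locked while each broker shuts down.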
The class BrokerRegistry.java:
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.broker;

import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 *
 */
public class BrokerRegistry {

    private static final Logger LOG = LoggerFactory.getLogger(BrokerRegistry.class);
    private static final BrokerRegistry INSTANCE = new BrokerRegistry();

    private final Object mutex = new Object();
    private final Map<String, BrokerService> brokers = new HashMap<String, BrokerService>();

    public static BrokerRegistry getInstance() {
        return INSTANCE;
    }

    /**
     * @param brokerName
     * @return the BrokerService
     */
    public BrokerService lookup(String brokerName) {
        BrokerService result = null;
        synchronized (mutex) {
            result = brokers.get(brokerName);
            if (result == null && brokerName != null && brokerName.equals(BrokerService.DEFAULT_BROKER_NAME)) {
                result = findFirst();
                if (result != null) {
                    LOG.warn("Broker localhost not started so using " + result.getBrokerName() + " instead");
                }
            }
        }
        return result;
    }

    /**
     * Returns the first registered broker found
     *
     * @return the first BrokerService
     */
    public BrokerService findFirst() {
        synchronized (mutex) {
            Iterator<BrokerService> iter = brokers.values().iterator();
            while (iter.hasNext()) {
                return iter.next();
            }
            return null;
        }
    }

    /**
     * @param brokerName
     * @param broker
     */
    public void bind(String brokerName, BrokerService broker) {
        synchronized (mutex) {
            brokers.put(brokerName, broker);
            mutex.notifyAll();
        }
    }

    /**
     * @param brokerName
     */
    public void unbind(String brokerName) {
        synchronized (mutex) {
            brokers.remove(brokerName);
        }
    }

    /**
     * @return the mutex used
     */
    public Object getRegistryMutext() {
        return mutex;
    }

    public Map<String, BrokerService> getBrokers() {
        return Collections.unmodifiableMap(this.brokers);
    }
}