同步锁的一个应用

1. 问题

在进行MQ编程的过程中,发现当关闭多个broker的时候,总是抛出并发修改异常:

java.util.ConcurrentModificationException
    at java.util.HashMap$HashIterator.nextEntry(HashMap.java:793)
    at java.util.HashMap$KeyIterator.next(HashMap.java:828)

关闭Broker的代码如下:

	public static void stopAllMessageBus() {
		try {
			Map brokers = BrokerRegistry.getInstance().getBrokers();
			
			
				Iterator iter1 = brokers.keySet().iterator();
				while (iter1.hasNext()) {
					String broker = (String) iter1.next();
					BrokerService bs = (BrokerService)brokers.get(broker);
					bs.stop();
				}	
			
	
		} catch (Exception e) {
			log.error("stopAllMessageBus failed.", e);
		}
		
	}

该代码思路为:首先获取所有注册的Broker,这些Brokers保存在一个HashMap中,当一个servlet监听器监听到所在WEB项目destory时,停止所有的ActiveMQ broker。
public class MessageBusLoader implements ServletContextListener {
	Logger log = Logger.getLogger(MessageBusLoader.class);
	
	/**
	 * WebApp停止时关闭MQ
	 */
	public void contextDestroyed(ServletContextEvent arg0) {
		MessageBusRegistryManager.stopAllMessageBus();
	}

	/**
	 * WebApp启动时加载MQ
	 */
	public void contextInitialized(ServletContextEvent event) {
		try {
			String webroot = event.getServletContext().getRealPath("/");
			MessageBusRegistryManager.startAllMessageBus(webroot);
		} catch (Exception e) {
			log.error("MessageBusLoader contextInitialized error.", e);
		}
	}
}


然而,上述代码在执行的过程中却出现并发异常,原因应该是在WEB项目DESTORY时,调用Broker的停止方法导致HashMap中的对象减少(ActiveMQ的broker实例销毁时unbind BrokerRegistry中的HashMap),同时WEB项目销毁时也可能会关闭Broker,意即其他的代码在修改BrokerRegistry中的HashMap,这时候上述代码去读该HashMap时,HashMap中的对象已经不在了,产生了不一致性,因而引发并发修改异常。

2.解决办法

在BrokerRegistry中,对HashMap进行处理时,使用了互斥对象,而且该互斥对象能被外界读取,因而使用该互斥对象,加上使用同步关键字,即可解决该并发异常的问题。

代码如下:

	public static void stopAllMessageBus() {
		try {
			Map brokers = BrokerRegistry.getInstance().getBrokers();
			
			synchronized(BrokerRegistry.getInstance().getRegistryMutext()){
				Iterator iter1 = brokers.keySet().iterator();
				while (iter1.hasNext()) {
					String broker = (String) iter1.next();
					BrokerService bs = (BrokerService)brokers.get(broker);
					bs.stop();
				}	
			}
	
		} catch (Exception e) {
			log.error("stopAllMessageBus failed.", e);
		}
		
	}

类BrokerRegistry.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.broker;

import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 
 */
public class BrokerRegistry {

    private static final Logger LOG = LoggerFactory.getLogger(BrokerRegistry.class);
    private static final BrokerRegistry INSTANCE = new BrokerRegistry();

    private final Object mutex = new Object();
    private final Map<String, BrokerService> brokers = new HashMap<String, BrokerService>();

    public static BrokerRegistry getInstance() {
        return INSTANCE;
    }

    /**
     * @param brokerName
     * @return the BrokerService
     */
    public BrokerService lookup(String brokerName) {
        BrokerService result = null;
        synchronized (mutex) {
            result = brokers.get(brokerName);
            if (result == null && brokerName != null && brokerName.equals(BrokerService.DEFAULT_BROKER_NAME)) {
                result = findFirst();
                if (result != null) {
                    LOG.warn("Broker localhost not started so using " + result.getBrokerName() + " instead");
                }
            }
        }
        return result;
    }

    /**
     * Returns the first registered broker found
     * 
     * @return the first BrokerService
     */
    public BrokerService findFirst() {
        synchronized (mutex) {
            Iterator<BrokerService> iter = brokers.values().iterator();
            while (iter.hasNext()) {
                return iter.next();
            }
            return null;
        }
    }

    /**
     * @param brokerName
     * @param broker
     */
    public void bind(String brokerName, BrokerService broker) {
        synchronized (mutex) {
            brokers.put(brokerName, broker);
            mutex.notifyAll();
        }
    }

    /**
     * @param brokerName
     */
    public void unbind(String brokerName) {
        synchronized (mutex) {
            brokers.remove(brokerName);
        }
    }

    /**
     * @return the mutex used
     */
    public Object getRegistryMutext() {
        return mutex;
    }
    
    public Map<String, BrokerService> getBrokers() {
        return Collections.unmodifiableMap(this.brokers);
    }
}


同步锁的一个应用

上一篇:关于CCI项目中缺少libz3.dll、Microsoft.Z3.dll、Microsoft.Z3V3.dll的解决办法


下一篇:简单的并发编程中犯2的一个小例子--CAS使用时一定要考虑下是否有必要做轮询