最近要使用alibaba的rocket mq(我们公司对其进行了封装,使其运行在dotNet平台上,Java还是和原生的差不多,涉及公司的内容本文不会提及),其中 在生产者组这一块,建议是用单例模式的。但是其中又建议一个组(group)使用一个实例,这样仅仅单例模式就不行了,所以要进行改动,我们的目标就是“一个group使用一个单例”。
其实简单点,多封装几个不同的单例类就行了,一个组用一个类。但是这显然不是一个好主意,于是我们来考虑用另一种方式。
首先要将 group 这个概念抽出来,它是变量,接下来封装不变的代码。
我们先看看代码是什么样的:
/**
* TurboMQ 消息生产者管理器
*/
public class MqProducer { private DefaultMQProducer currentMQProducer; private static Map<String, MqProducer> producerMap = new ConcurrentHashMap<>(3);
private static final Object lock = new Object(); private MqProducer(String group) throws MQClientException {
if (!Validator.isNotNullAndVisible(group)) {
throw new NullPointerException("Group名称不能为空!");
} currentMQProducer = new DefaultMQProducer(group);
currentMQProducer.setNamesrvAddr(“1.1.1.1”);
currentMQProducer.start();
} public static MqProducer instance(String group) throws MQClientException {
if (!Validator.isNotNullAndVisible(group)) {
throw new NullPointerException("Group名称不能为空!");
} if (producerMap.get(group) == null) {
synchronized (lock) {
if (producerMap.get(group) == null) {
producerMap.put(group, new MqProducer(group));
}
}
}
return producerMap.get(group);
} public SendResult send(String topic, String tag, String body) throws UnsupportedEncodingException, InterruptedException, RemotingException, MQClientException, MQBrokerException {
if (!Validator.isNotNullAndVisible(topic, tag, body)) {
throw new NullPointerException("请检查参数是否为空,topic,tag,body");
}
Message message = new Message(topic, tag, body.getBytes("UTF-8"));
return currentMQProducer.send(message);
} public static void shutdownAll() {
producerMap.forEach((key, value) -> {
value.shutdown();
});
} public void shutdown() {
currentMQProducer.shutdown();
} }
我们的解决思路,就是使用 Map 让 group 和实例一一对应起来。
这些代码中你可能需要注意的点是:
1 线程安全的 ConcurrentHashMap 以及要设置初始容量
private static Map<String, MqProducer> producerMap = new ConcurrentHashMap<>(3);
2 instance方法中的两层 if 判断
在 synchronized(lock)锁住之前可能有多个线程了解到当前组是null,都去请求锁,当第一个线程new了新生产者之后,下一个进程进来就不会再new一个新的生产者了。
public static MqProducer instance(String group) throws MQClientException {
if (producerMap.get(group) == null) {
synchronized (lock) {
if (producerMap.get(group) == null) {
producerMap.put(group, new MqProducer(group));
}
}
}
return producerMap.get(group);
}
题外话:
为什么要抛异常?
因为此处是通用代码,通用代码不应处理业务逻辑,而且不该隐蔽错误的发生,要让业务逻辑去确保参数没问题。