最近要使用alibaba的rocket mq(我们公司对其进行了封装,使其运行在dotNet平台上,Java还是和原生的差不多,涉及公司的内容本文不会提及),其中 在生产者组这一块,建议是用单例模式的。但是其中又建议一个组(group)使用一个实例,这样仅仅单例模式就不行了,所以要进行改动,我们的目标就是“一个group使用一个单例”。
其实简单点,多封装几个不同的单例类就行了,一个组用一个类。但是这显然不是一个好主意,于是我们来考虑用另一种方式。
首先要将 group 这个概念抽出来,它是变量,接下来封装不变的代码。
我们先看看代码是什么样的:
/** * TurboMQ 消息生产者管理器 */ public class MqProducer { private DefaultMQProducer currentMQProducer; private static Map<String, MqProducer> producerMap = new ConcurrentHashMap<>(3); private static final Object lock = new Object(); private MqProducer(String group) throws MQClientException { if (!Validator.isNotNullAndVisible(group)) { throw new NullPointerException("Group名称不能为空!"); } currentMQProducer = new DefaultMQProducer(group); currentMQProducer.setNamesrvAddr(“1.1.1.1”); currentMQProducer.start(); } public static MqProducer instance(String group) throws MQClientException { if (!Validator.isNotNullAndVisible(group)) { throw new NullPointerException("Group名称不能为空!"); } if (producerMap.get(group) == null) { synchronized (lock) { if (producerMap.get(group) == null) { producerMap.put(group, new MqProducer(group)); } } } return producerMap.get(group); } public SendResult send(String topic, String tag, String body) throws UnsupportedEncodingException, InterruptedException, RemotingException, MQClientException, MQBrokerException { if (!Validator.isNotNullAndVisible(topic, tag, body)) { throw new NullPointerException("请检查参数是否为空,topic,tag,body"); } Message message = new Message(topic, tag, body.getBytes("UTF-8")); return currentMQProducer.send(message); } public static void shutdownAll() { producerMap.forEach((key, value) -> { value.shutdown(); }); } public void shutdown() { currentMQProducer.shutdown(); } }
我们的解决思路,就是使用 Map 让 group 和实例一一对应起来。
这些代码中你可能需要注意的点是:
1 线程安全的 ConcurrentHashMap 以及要设置初始容量
private static Map<String, MqProducer> producerMap = new ConcurrentHashMap<>(3);
2 instance方法中的两层 if 判断
在 synchronized(lock)锁住之前可能有多个线程了解到当前组是null,都去请求锁,当第一个线程new了新生产者之后,下一个进程进来就不会再new一个新的生产者了。
public static MqProducer instance(String group) throws MQClientException { if (producerMap.get(group) == null) { synchronized (lock) { if (producerMap.get(group) == null) { producerMap.put(group, new MqProducer(group)); } } } return producerMap.get(group); }
题外话:
为什么要抛异常?
因为此处是通用代码,通用代码不应处理业务逻辑,而且不该隐蔽错误的发生,要让业务逻辑去确保参数没问题。