1. 在网上看了一些解决这个问题的办法,大部分朋友都说是要在实例化 DefaultMQProducer 的时候指定惟一的 instanceName 来解决,窃以为这样虽然解决了问题,但却是不应该用的解决办法。为什么这样说?因为官网介绍客户端公共参数的时候对这个 instanceName 有明确的说明
instanceName | DEFAULT | 客户端实例名称,客户端创建的多个 Producer、Consumer 实际是共用一个内部实例(这个实例包含网络连接、线程资源等) |
所以,这个 instanceName 所标识的实例会同时创建自己的网络连接,线程资源,如果每次创建一个 Producer 都指定不同的 instanceName 这样就会 浪费 更多资源 比如内存和线程,网络 IO。还会降低消息处理的效率。按照说明,应该是尽可能多个 Producer 共用一个 instanceName 才合理。
2. 另外,题目上的报错,是因为 group 已被创建,为什么要用指定不同且唯一的 instanceName 来解决呢?不能因为这样能解决就这样解决。实际上,如果用 DefaultMQProducer 来实例 producer 则会把创建好的 producer 先放到一个 producerTable
ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String,MQProducerInner>();
中,代码中的方法是
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
这个方法里 关键地方是
MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
在添加的时候如果发以 group 为键的 producer 已存在,则注册失败。这里的键是 group 所以,当我们已经创建了同 group 的 producer 时,如果这个 producer 没有 shutdown,则再次以同样的 group 创建 producer 的时候就会报题目中的错误。
而 shutdown 之后之所以不报错是因为,shutdown 这个方法本身调用 的是 unregisterProducer (String group) 在类 MQClientInstance 中。这个方法是包含从 producerTable 中把已添加的 producer 先移除,然后再 shutdown 的。具体代码是下面这样的
this.mQClientFactory.unregisterProducer(this.defaultMQProducer.getProducerGroup());
this.defaultAsyncSenderExecutor.shutdown();
所以先 调用 DefaultMQProducer shutdown 之后再创建新的同 group 的 producer 是不会报错的。
3. 我们再看为什么每次用 DefaultMQProducer 来创建 producer 的时候如果 都设置不同的 instanceName 为什么也不会报错呢?这是因为如果设置的 instanceName 是唯一的。则在注册 producer 之前,如果设置的 group 不是默认的,则每次 获取的 mQClientFactory 都是不同的,而 producerTable 是 mQClientFactory 类里的一个属性,这样当然 producerTable 也是不同的,这样注册 producer 当然是注册到不同的 producerTable 中去了,所以不会报错。
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
但是,这种解决办法是不可取的。因为 instanceName 是一个比较重(隔离数据多,创建耗时长,消费资源多)的参数。
4. 那么我们怎么更好的解决这个问题呢?我们可以参考源码中 logappender 模块中 的 ProducerInstance 类来实现。这个类在源码中位于 org.apache.rocketmq.logappender.common 下面
下面是这个类的源码
public class ProducerInstance { public static final String APPENDER_TYPE = "APPENDER_TYPE"; public static final String LOG4J_APPENDER = "LOG4J_APPENDER"; public static final String LOG4J2_APPENDER = "LOG4J2_APPENDER"; public static final String LOGBACK_APPENDER = "LOGBACK_APPENDER"; public static final String DEFAULT_GROUP = "rocketmq_appender"; private ConcurrentHashMap<String, MQProducer> producerMap = new ConcurrentHashMap<String, MQProducer>(); private static ProducerInstance instance = new ProducerInstance(); public static ProducerInstance getProducerInstance() { return instance; } /** 根据 nameServerAddress 和 group 生成 producer 在 producerMap 中的键 **/ private String genKey(String nameServerAddress, String group) { return nameServerAddress + "_" + group; } /** 根据 nameServerAddress 和 group 获取已注册到producerMap中的producer,如果不存在,则调用 DefaultMQProducer 生成新的producer注册并返回 **/ public MQProducer getInstance(String nameServerAddress, String group) throws MQClientException { if (StringUtils.isBlank(group)) { group = DEFAULT_GROUP; } String genKey = genKey(nameServerAddress, group); MQProducer p = getProducerInstance().producerMap.get(genKey); if (p != null) { return p; } DefaultMQProducer defaultMQProducer = new DefaultMQProducer(group); defaultMQProducer.setNamesrvAddr(nameServerAddress); MQProducer beforeProducer = null; beforeProducer = getProducerInstance().producerMap.putIfAbsent(genKey, defaultMQProducer); if (beforeProducer != null) { return beforeProducer; } defaultMQProducer.start(); return defaultMQProducer; } /** 根据 nameServerAddress 和 group 移除已注册到producerMap中的producer,同时shutdown **/ public void removeAndClose(String nameServerAddress, String group) { if (group == null) { group = DEFAULT_GROUP; } String genKey = genKey(nameServerAddress, group); MQProducer producer = getProducerInstance().producerMap.remove(genKey); if (producer != null) { producer.shutdown(); } } /** 移除 producerMap 中所有的 producer 并全部关闭。 **/ public void closeAll() { Set<Map.Entry<String, MQProducer>> entries = getProducerInstance().producerMap.entrySet(); for (Map.Entry<String, MQProducer> entry : entries) { getProducerInstance().producerMap.remove(entry.getKey()); entry.getValue().shutdown(); } } }
可以把这个类直接复制到要使用的项目中,然后在要使用指定 nameServerAddress 和 group 的 producer 时,直接用下面的方法获取一个。
MQProducer producer = ProducerInstance.getProducerInstance().getInstance("localhost:9876", "test-group"); /* 自己生成message消息,然后下面发送 */ producer.send(message); /* 如果是比较频繁使用的producer,发送完消息后不用关闭和移除下次再用的时候可以直接再获取拿来就可以发送消息。 对于确定要隔比较长时间不用的producer,可以用下面的方法 移除并关闭 */ ProducerInstance.getProducerInstance().removeAndClose("localhost:9876", "test-group");
我们会发现,这个类获取 producer 实例的时候只用了 nameServerAddress 和 group 这两个参数。如果我们确实需要操作不同的 instanceName 下的 producer 时,该怎么办呢?直接改造 这个类里的方法,添加上
instanceName 参数 即可。
加参数后的类如下,使用方式没什么差别只是多了个参数而已。
public class ProducerInstance { public static final String APPENDER_TYPE = "APPENDER_TYPE"; public static final String LOG4J_APPENDER = "LOG4J_APPENDER"; public static final String LOG4J2_APPENDER = "LOG4J2_APPENDER"; public static final String LOGBACK_APPENDER = "LOGBACK_APPENDER"; public static final String DEFAULT_GROUP = "rocketmq_appender"; private ConcurrentHashMap<String, MQProducer> producerMap = new ConcurrentHashMap<String, MQProducer>(); private static ProducerInstance instance = new ProducerInstance(); public static ProducerInstance getProducerInstance() { return instance; } private String genKey(String nameServerAddress, String group,String instanceName) { return nameServerAddress + "_" + group + "_" + instanceName; } public MQProducer getInstance(String nameServerAddress, String group,String instanceName) throws MQClientException { if (StringUtils.isBlank(group)) { group = DEFAULT_GROUP; } if (StringUtils.isBlank(instanceName)) { instanceName = "DEFAULT"; } String genKey = genKey(nameServerAddress, group, instanceName); MQProducer p = getProducerInstance().producerMap.get(genKey); if (p != null) { return p; } DefaultMQProducer defaultMQProducer = new DefaultMQProducer(group); defaultMQProducer.setNamesrvAddr(nameServerAddress); defaultMQProducer.setInstanceName(instanceName); MQProducer beforeProducer = null; beforeProducer = getProducerInstance().producerMap.putIfAbsent(genKey, defaultMQProducer); if (beforeProducer != null) { return beforeProducer; } defaultMQProducer.start(); return defaultMQProducer; } public void removeAndClose(String nameServerAddress, String group, String instanceName) { if (group == null) { group = DEFAULT_GROUP; } if (StringUtils.isBlank(instanceName)) { instanceName = "DEFAULT"; } String genKey = genKey(nameServerAddress, group,instanceName); MQProducer producer = getProducerInstance().producerMap.remove(genKey); if (producer != null) { producer.shutdown(); } } public void closeAll() { Set<Map.Entry<String, MQProducer>> entries = getProducerInstance().producerMap.entrySet(); for (Map.Entry<String, MQProducer> entry : entries) { getProducerInstance().producerMap.remove(entry.getKey()); entry.getValue().shutdown(); } } }