关于 RocketMQ:The producer group has been created before, specify another name please. 这个报错的解决办法

简介: he producer group has been created before, specify another name please

1. 在网上看了一些解决这个问题的办法,大部分朋友都说是要在实例化 DefaultMQProducer 的时候指定惟一的 instanceName 来解决,窃以为这样虽然解决了问题,但却是不应该用的解决办法。为什么这样说?因为官网介绍客户端公共参数的时候对这个 instanceName 有明确的说明

instanceName DEFAULT

客户端实例名称,客户端创建的多个 Producer、Consumer 实际是共用一个内部实例(这个实例包含网络连接、线程资源等)

所以,这个 instanceName 所标识的实例会同时创建自己的网络连接,线程资源,如果每次创建一个 Producer 都指定不同的 instanceName 这样就会 浪费 更多资源 比如内存和线程,网络 IO。还会降低消息处理的效率。按照说明,应该是尽可能多个 Producer 共用一个 instanceName 才合理。

2. 另外,题目上的报错,是因为 group 已被创建,为什么要用指定不同且唯一的 instanceName 来解决呢?不能因为这样能解决就这样解决。实际上,如果用  DefaultMQProducer 来实例 producer 则会把创建好的 producer 先放到一个  producerTable

ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String,MQProducerInner>();

中,代码中的方法是

boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);

这个方法里 关键地方是

MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);

在添加的时候如果发以 group 为键的 producer 已存在,则注册失败。这里的键是 group 所以,当我们已经创建了同 group 的 producer 时,如果这个 producer 没有 shutdown,则再次以同样的 group 创建 producer 的时候就会报题目中的错误。

而 shutdown 之后之所以不报错是因为,shutdown 这个方法本身调用 的是  unregisterProducer (String group) 在类 MQClientInstance 中。这个方法是包含从 producerTable 中把已添加的 producer 先移除,然后再 shutdown 的。具体代码是下面这样的

 


  1. this.mQClientFactory.unregisterProducer(this.defaultMQProducer.getProducerGroup());

  2. this.defaultAsyncSenderExecutor.shutdown();

所以先 调用 DefaultMQProducer shutdown 之后再创建新的同 group 的 producer 是不会报错的。

3. 我们再看为什么每次用   DefaultMQProducer 来创建 producer 的时候如果 都设置不同的 instanceName 为什么也不会报错呢?这是因为如果设置的 instanceName 是唯一的。则在注册 producer 之前,如果设置的 group 不是默认的,则每次 获取的 mQClientFactory 都是不同的,而 producerTable 是 mQClientFactory 类里的一个属性,这样当然 producerTable 也是不同的,这样注册 producer 当然是注册到不同的 producerTable 中去了,所以不会报错。

if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {

   this.defaultMQProducer.changeInstanceNameToPID();

}

 

this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);

但是,这种解决办法是不可取的。因为 instanceName 是一个比较重(隔离数据多,创建耗时长,消费资源多)的参数。

4. 那么我们怎么更好的解决这个问题呢?我们可以参考源码中 logappender 模块中 的 ProducerInstance 类来实现。这个类在源码中位于 org.apache.rocketmq.logappender.common 下面

下面是这个类的源码

public class ProducerInstance {
    public static final String APPENDER_TYPE = "APPENDER_TYPE";
    public static final String LOG4J_APPENDER = "LOG4J_APPENDER";
    public static final String LOG4J2_APPENDER = "LOG4J2_APPENDER";
    public static final String LOGBACK_APPENDER = "LOGBACK_APPENDER";
    public static final String DEFAULT_GROUP = "rocketmq_appender";
    private ConcurrentHashMap<String, MQProducer> producerMap = new ConcurrentHashMap<String, MQProducer>();
    private static ProducerInstance instance = new ProducerInstance();
    public static ProducerInstance getProducerInstance() {
        return instance;
    }
    /**
   根据 nameServerAddress 和 group 生成 producer 在 producerMap 中的键
    **/ 
    private String genKey(String nameServerAddress, String group) {
        return nameServerAddress + "_" + group;
    }
/**
根据 nameServerAddress 和 group 获取已注册到producerMap中的producer,如果不存在,则调用 DefaultMQProducer 生成新的producer注册并返回
**/ 
    public MQProducer getInstance(String nameServerAddress, String group) throws MQClientException {
        if (StringUtils.isBlank(group)) {
            group = DEFAULT_GROUP;
        }
        String genKey = genKey(nameServerAddress, group);
        MQProducer p = getProducerInstance().producerMap.get(genKey);
        if (p != null) {
            return p;
        }
        DefaultMQProducer defaultMQProducer = new DefaultMQProducer(group);
        defaultMQProducer.setNamesrvAddr(nameServerAddress);
        MQProducer beforeProducer = null;
        beforeProducer = getProducerInstance().producerMap.putIfAbsent(genKey, defaultMQProducer);
        if (beforeProducer != null) {
            return beforeProducer;
        }
        defaultMQProducer.start();
        return defaultMQProducer;
    }
/**
根据 nameServerAddress 和 group 移除已注册到producerMap中的producer,同时shutdown
**/ 
    public void removeAndClose(String nameServerAddress, String group) {
        if (group == null) {
            group = DEFAULT_GROUP;
        }
        String genKey = genKey(nameServerAddress, group);
        MQProducer producer = getProducerInstance().producerMap.remove(genKey);
        if (producer != null) {
            producer.shutdown();
        }
    }
/**
移除 producerMap 中所有的 producer 并全部关闭。
**/
    public void closeAll() {
        Set<Map.Entry<String, MQProducer>> entries = getProducerInstance().producerMap.entrySet();
        for (Map.Entry<String, MQProducer> entry : entries) {
            getProducerInstance().producerMap.remove(entry.getKey());
            entry.getValue().shutdown();
        }
    }
}

可以把这个类直接复制到要使用的项目中,然后在要使用指定 nameServerAddress 和 group 的 producer 时,直接用下面的方法获取一个。

MQProducer producer = ProducerInstance.getProducerInstance().getInstance("localhost:9876", "test-group");
/*
自己生成message消息,然后下面发送
*/
producer.send(message);
/*
如果是比较频繁使用的producer,发送完消息后不用关闭和移除下次再用的时候可以直接再获取拿来就可以发送消息。
对于确定要隔比较长时间不用的producer,可以用下面的方法 移除并关闭
*/
ProducerInstance.getProducerInstance().removeAndClose("localhost:9876", "test-group");

我们会发现,这个类获取 producer 实例的时候只用了 nameServerAddress 和 group 这两个参数。如果我们确实需要操作不同的 instanceName 下的 producer 时,该怎么办呢?直接改造 这个类里的方法,添加上

instanceName 参数 即可。

加参数后的类如下,使用方式没什么差别只是多了个参数而已。

public class ProducerInstance {
    public static final String APPENDER_TYPE = "APPENDER_TYPE";
    public static final String LOG4J_APPENDER = "LOG4J_APPENDER";
    public static final String LOG4J2_APPENDER = "LOG4J2_APPENDER";
    public static final String LOGBACK_APPENDER = "LOGBACK_APPENDER";
    public static final String DEFAULT_GROUP = "rocketmq_appender";
    private ConcurrentHashMap<String, MQProducer> producerMap = new ConcurrentHashMap<String, MQProducer>();
    private static ProducerInstance instance = new ProducerInstance();
    public static ProducerInstance getProducerInstance() {
        return instance;
    }
    private String genKey(String nameServerAddress, String group,String instanceName) {
        return nameServerAddress + "_" + group + "_" + instanceName;
    }
    public MQProducer getInstance(String nameServerAddress, String group,String instanceName) throws MQClientException {
        if (StringUtils.isBlank(group)) {
            group = DEFAULT_GROUP;
        }
       if (StringUtils.isBlank(instanceName)) { 
            instanceName = "DEFAULT"; 
         }
        String genKey = genKey(nameServerAddress, group, instanceName);
        MQProducer p = getProducerInstance().producerMap.get(genKey);
        if (p != null) {
            return p;
        }
        DefaultMQProducer defaultMQProducer = new DefaultMQProducer(group);
        defaultMQProducer.setNamesrvAddr(nameServerAddress);
        defaultMQProducer.setInstanceName(instanceName);
        MQProducer beforeProducer = null;
        beforeProducer = getProducerInstance().producerMap.putIfAbsent(genKey, defaultMQProducer);
        if (beforeProducer != null) {
            return beforeProducer;
        }
        defaultMQProducer.start();
        return defaultMQProducer;
    }
    public void removeAndClose(String nameServerAddress, String group, String instanceName) {
        if (group == null) {
            group = DEFAULT_GROUP;
        }
        if (StringUtils.isBlank(instanceName)) {
           instanceName = "DEFAULT";
          }
        String genKey = genKey(nameServerAddress, group,instanceName);
        MQProducer producer = getProducerInstance().producerMap.remove(genKey);
        if (producer != null) {
            producer.shutdown();
        }
    }
    public void closeAll() {
        Set<Map.Entry<String, MQProducer>> entries = getProducerInstance().producerMap.entrySet();
        for (Map.Entry<String, MQProducer> entry : entries) {
            getProducerInstance().producerMap.remove(entry.getKey());
            entry.getValue().shutdown();
        }
    }
}

 

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6月前
|
消息中间件 RocketMQ
RocketMQ报错:MQClientException:no route info of this topic的解决
RocketMQ报错:MQClientException:no route info of this topic的解决
140 0
|
消息中间件 Apache RocketMQ
rocketmq客户端发送消息报错和超时问题
org.apache.rocketmq.remoting.exception.RemotingTimeoutException: wait response on the channel <10.0.21.69:10911> timeout, 1000(ms)、 closeChannel: close the connection to remote address
2107 1
rocketmq客户端发送消息报错和超时问题
|
11天前
|
消息中间件 C# RocketMQ
MQ产品使用合集之设置rocketmq的timerMaxDelaySec时间出现报错如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
70 4
|
11天前
|
消息中间件 数据安全/隐私保护
MQTT微消息队列服务器连接报错:Error: Connection refused: Not authorized
使用MQTTX工具进行测试时,通过AccessKey创建了Client ID的用户名和密码。配置了公网接入点及端口1883,但尝试连接时出现错误。已附上工具截图:![](https://ucc.alicdn.com/pic/developer-ecology/3byii5uar64gg_36327474e991439da422f38c450ef153.png)。确认过用户名、密码和Client ID无误,问题仍未解决,期待回复!
|
11天前
|
消息中间件 Linux
mq报错abbit@syld36: * connected to epmd (port 4369) on syld36 * epmd reports node ‘rabbit‘ uses po
mq报错abbit@syld36: * connected to epmd (port 4369) on syld36 * epmd reports node ‘rabbit‘ uses po
19 0
|
11天前
|
消息中间件 Windows
win10 安装RabbitMQ的步骤--和报错解决
win10 安装RabbitMQ的步骤--和报错解决
25 0
|
11天前
|
算法 物联网 网络安全
MQTT常见问题之使用MQTTSendMessage2MQTT.py测试报错如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
11天前
|
消息中间件 存储 物联网
MQTT常见问题之mqtt第4—6次调用会报错如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
6月前
|
消息中间件
RabbitMq没开启rabbitmq_management插件控制台报错Node statistics not available
RabbitMq没开启rabbitmq_management插件控制台报错Node statistics not available
|
11天前
|
消息中间件 SQL 数据处理
Flink报错问题之flink消费rabbitmq报错如何解决
Flink报错通常是指在使用Apache Flink进行实时数据处理时遇到的错误和异常情况;本合集致力于收集Flink运行中的报错信息和解决策略,以便开发者及时排查和修复问题,优化Flink作业的稳定性。