关于 RocketMQ:The producer group has been created before, specify another name please. 这个报错的解决办法

简介: he producer group has been created before, specify another name please

1. 在网上看了一些解决这个问题的办法,大部分朋友都说是要在实例化 DefaultMQProducer 的时候指定惟一的 instanceName 来解决,窃以为这样虽然解决了问题,但却是不应该用的解决办法。为什么这样说?因为官网介绍客户端公共参数的时候对这个 instanceName 有明确的说明

instanceName DEFAULT

客户端实例名称,客户端创建的多个 Producer、Consumer 实际是共用一个内部实例(这个实例包含网络连接、线程资源等)

所以,这个 instanceName 所标识的实例会同时创建自己的网络连接,线程资源,如果每次创建一个 Producer 都指定不同的 instanceName 这样就会 浪费 更多资源 比如内存和线程,网络 IO。还会降低消息处理的效率。按照说明,应该是尽可能多个 Producer 共用一个 instanceName 才合理。

2. 另外,题目上的报错,是因为 group 已被创建,为什么要用指定不同且唯一的 instanceName 来解决呢?不能因为这样能解决就这样解决。实际上,如果用  DefaultMQProducer 来实例 producer 则会把创建好的 producer 先放到一个  producerTable

ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String,MQProducerInner>();

中,代码中的方法是

boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);

这个方法里 关键地方是

MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);

在添加的时候如果发以 group 为键的 producer 已存在,则注册失败。这里的键是 group 所以,当我们已经创建了同 group 的 producer 时,如果这个 producer 没有 shutdown,则再次以同样的 group 创建 producer 的时候就会报题目中的错误。

而 shutdown 之后之所以不报错是因为,shutdown 这个方法本身调用 的是  unregisterProducer (String group) 在类 MQClientInstance 中。这个方法是包含从 producerTable 中把已添加的 producer 先移除,然后再 shutdown 的。具体代码是下面这样的

 


  1. this.mQClientFactory.unregisterProducer(this.defaultMQProducer.getProducerGroup());

  2. this.defaultAsyncSenderExecutor.shutdown();

所以先 调用 DefaultMQProducer shutdown 之后再创建新的同 group 的 producer 是不会报错的。

3. 我们再看为什么每次用   DefaultMQProducer 来创建 producer 的时候如果 都设置不同的 instanceName 为什么也不会报错呢?这是因为如果设置的 instanceName 是唯一的。则在注册 producer 之前,如果设置的 group 不是默认的,则每次 获取的 mQClientFactory 都是不同的,而 producerTable 是 mQClientFactory 类里的一个属性,这样当然 producerTable 也是不同的,这样注册 producer 当然是注册到不同的 producerTable 中去了,所以不会报错。

if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {

   this.defaultMQProducer.changeInstanceNameToPID();

}

 

this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);

但是,这种解决办法是不可取的。因为 instanceName 是一个比较重(隔离数据多,创建耗时长,消费资源多)的参数。

4. 那么我们怎么更好的解决这个问题呢?我们可以参考源码中 logappender 模块中 的 ProducerInstance 类来实现。这个类在源码中位于 org.apache.rocketmq.logappender.common 下面

下面是这个类的源码

public class ProducerInstance {
    public static final String APPENDER_TYPE = "APPENDER_TYPE";
    public static final String LOG4J_APPENDER = "LOG4J_APPENDER";
    public static final String LOG4J2_APPENDER = "LOG4J2_APPENDER";
    public static final String LOGBACK_APPENDER = "LOGBACK_APPENDER";
    public static final String DEFAULT_GROUP = "rocketmq_appender";
    private ConcurrentHashMap<String, MQProducer> producerMap = new ConcurrentHashMap<String, MQProducer>();
    private static ProducerInstance instance = new ProducerInstance();
    public static ProducerInstance getProducerInstance() {
        return instance;
    }
    /**
   根据 nameServerAddress 和 group 生成 producer 在 producerMap 中的键
    **/ 
    private String genKey(String nameServerAddress, String group) {
        return nameServerAddress + "_" + group;
    }
/**
根据 nameServerAddress 和 group 获取已注册到producerMap中的producer,如果不存在,则调用 DefaultMQProducer 生成新的producer注册并返回
**/ 
    public MQProducer getInstance(String nameServerAddress, String group) throws MQClientException {
        if (StringUtils.isBlank(group)) {
            group = DEFAULT_GROUP;
        }
        String genKey = genKey(nameServerAddress, group);
        MQProducer p = getProducerInstance().producerMap.get(genKey);
        if (p != null) {
            return p;
        }
        DefaultMQProducer defaultMQProducer = new DefaultMQProducer(group);
        defaultMQProducer.setNamesrvAddr(nameServerAddress);
        MQProducer beforeProducer = null;
        beforeProducer = getProducerInstance().producerMap.putIfAbsent(genKey, defaultMQProducer);
        if (beforeProducer != null) {
            return beforeProducer;
        }
        defaultMQProducer.start();
        return defaultMQProducer;
    }
/**
根据 nameServerAddress 和 group 移除已注册到producerMap中的producer,同时shutdown
**/ 
    public void removeAndClose(String nameServerAddress, String group) {
        if (group == null) {
            group = DEFAULT_GROUP;
        }
        String genKey = genKey(nameServerAddress, group);
        MQProducer producer = getProducerInstance().producerMap.remove(genKey);
        if (producer != null) {
            producer.shutdown();
        }
    }
/**
移除 producerMap 中所有的 producer 并全部关闭。
**/
    public void closeAll() {
        Set<Map.Entry<String, MQProducer>> entries = getProducerInstance().producerMap.entrySet();
        for (Map.Entry<String, MQProducer> entry : entries) {
            getProducerInstance().producerMap.remove(entry.getKey());
            entry.getValue().shutdown();
        }
    }
}

可以把这个类直接复制到要使用的项目中,然后在要使用指定 nameServerAddress 和 group 的 producer 时,直接用下面的方法获取一个。

MQProducer producer = ProducerInstance.getProducerInstance().getInstance("localhost:9876", "test-group");
/*
自己生成message消息,然后下面发送
*/
producer.send(message);
/*
如果是比较频繁使用的producer,发送完消息后不用关闭和移除下次再用的时候可以直接再获取拿来就可以发送消息。
对于确定要隔比较长时间不用的producer,可以用下面的方法 移除并关闭
*/
ProducerInstance.getProducerInstance().removeAndClose("localhost:9876", "test-group");

我们会发现,这个类获取 producer 实例的时候只用了 nameServerAddress 和 group 这两个参数。如果我们确实需要操作不同的 instanceName 下的 producer 时,该怎么办呢?直接改造 这个类里的方法,添加上

instanceName 参数 即可。

加参数后的类如下,使用方式没什么差别只是多了个参数而已。

public class ProducerInstance {
    public static final String APPENDER_TYPE = "APPENDER_TYPE";
    public static final String LOG4J_APPENDER = "LOG4J_APPENDER";
    public static final String LOG4J2_APPENDER = "LOG4J2_APPENDER";
    public static final String LOGBACK_APPENDER = "LOGBACK_APPENDER";
    public static final String DEFAULT_GROUP = "rocketmq_appender";
    private ConcurrentHashMap<String, MQProducer> producerMap = new ConcurrentHashMap<String, MQProducer>();
    private static ProducerInstance instance = new ProducerInstance();
    public static ProducerInstance getProducerInstance() {
        return instance;
    }
    private String genKey(String nameServerAddress, String group,String instanceName) {
        return nameServerAddress + "_" + group + "_" + instanceName;
    }
    public MQProducer getInstance(String nameServerAddress, String group,String instanceName) throws MQClientException {
        if (StringUtils.isBlank(group)) {
            group = DEFAULT_GROUP;
        }
       if (StringUtils.isBlank(instanceName)) { 
            instanceName = "DEFAULT"; 
         }
        String genKey = genKey(nameServerAddress, group, instanceName);
        MQProducer p = getProducerInstance().producerMap.get(genKey);
        if (p != null) {
            return p;
        }
        DefaultMQProducer defaultMQProducer = new DefaultMQProducer(group);
        defaultMQProducer.setNamesrvAddr(nameServerAddress);
        defaultMQProducer.setInstanceName(instanceName);
        MQProducer beforeProducer = null;
        beforeProducer = getProducerInstance().producerMap.putIfAbsent(genKey, defaultMQProducer);
        if (beforeProducer != null) {
            return beforeProducer;
        }
        defaultMQProducer.start();
        return defaultMQProducer;
    }
    public void removeAndClose(String nameServerAddress, String group, String instanceName) {
        if (group == null) {
            group = DEFAULT_GROUP;
        }
        if (StringUtils.isBlank(instanceName)) {
           instanceName = "DEFAULT";
          }
        String genKey = genKey(nameServerAddress, group,instanceName);
        MQProducer producer = getProducerInstance().producerMap.remove(genKey);
        if (producer != null) {
            producer.shutdown();
        }
    }
    public void closeAll() {
        Set<Map.Entry<String, MQProducer>> entries = getProducerInstance().producerMap.entrySet();
        for (Map.Entry<String, MQProducer> entry : entries) {
            getProducerInstance().producerMap.remove(entry.getKey());
            entry.getValue().shutdown();
        }
    }
}

 

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
7天前
|
消息中间件 监控 Oracle
消息队列 MQ产品使用合集之启动Namesrv节点时,遇到报错,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
27天前
|
消息中间件 监控 应用服务中间件
消息队列 MQ操作报错合集之重启Broker后,积压数出现为负数是什么导致的
在使用消息队列MQ时,可能会遇到各种报错情况。以下是一些常见的错误场景、可能的原因以及解决建议的汇总:1.连接错误、2.消息发送失败、3.消息消费报错、4.消息重试与死信处理、5.资源与权限问题、6.配置错误、7.系统资源限制、8.版本兼容性问题。
消息队列 MQ操作报错合集之重启Broker后,积压数出现为负数是什么导致的
|
7天前
|
消息中间件 网络协议 JavaScript
消息队列 MQ产品使用合集之报错提示是"the internal error!",是什么原因导致的”
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
27天前
|
消息中间件 Java 测试技术
消息队列 MQ操作报错合集之设置了setKeepAliveInterval(1)但仍然出现客户端未连接,该怎么解决
在使用消息队列MQ时,可能会遇到各种报错情况。以下是一些常见的错误场景、可能的原因以及解决建议的汇总:1.连接错误、2.消息发送失败、3.消息消费报错、4.消息重试与死信处理、5.资源与权限问题、6.配置错误、7.系统资源限制、8.版本兼容性问题。
|
27天前
|
消息中间件 设计模式 网络安全
消息队列 MQ操作报错合集之broker启用controller配置时,遇到报错,是什么导致的
在使用消息队列MQ时,可能会遇到各种报错情况。以下是一些常见的错误场景、可能的原因以及解决建议的汇总:1.连接错误、2.消息发送失败、3.消息消费报错、4.消息重试与死信处理、5.资源与权限问题、6.配置错误、7.系统资源限制、8.版本兼容性问题。
|
27天前
|
消息中间件 Apache RocketMQ
消息队列 MQ操作报错合集之设置了controller后,有一主一从,但只显示一个,该怎么解决
在使用消息队列MQ时,可能会遇到各种报错情况。以下是一些常见的错误场景、可能的原因以及解决建议的汇总:1.连接错误、2.消息发送失败、3.消息消费报错、4.消息重试与死信处理、5.资源与权限问题、6.配置错误、7.系统资源限制、8.版本兼容性问题。
|
7天前
|
消息中间件 Java API
消息队列 MQ产品使用合集之遇到"No topic route info in name server for the topic"错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
7天前
|
消息中间件 Java Shell
消息队列 MQ产品使用合集之启动broker&proxy的时候会报错,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
20天前
|
消息中间件 Oracle 关系型数据库
实时计算 Flink版操作报错合集之连接RabbitMQ时遇到Could not find any factory for identifier 'rabbitmq' that implements 'org.apache.flink.table.factories.DynamicTableFactory'错误,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
380 0
|
24天前
|
消息中间件 Java RocketMQ
【已解决】RocketMq使用报错
【已解决】RocketMq使用报错
20 0