rocketMQ 消息重试的问题

本文涉及的产品
云数据库 MongoDB,独享型 2核8GB
推荐场景:
构建全方位客户视图
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
云数据库 Tair(兼容Redis),内存型 2GB
简介: 将 consumeMessageBatchMaxSize 设置为 大于1 的时候 数据重试会导致 部分数据消费不到的情况

问题描述


使用公司的rocketMQ封装的框架  将  consumeMessageBatchMaxSize  设置为20   大概的配置如下所示  (当然下面可能是我们公司自己封装的 但是大致都一样的)

      xxxxConsumer:
        nameServer: XXXXXX
        consumerGroup: XXXXXXXX
        topic: XXXXXXXXXXXXXS
        selectorExpress: '*'        consumeThreadMin: 20        consumeThreadMax: 20        max-retry-times: 3        consumeMessageBatchMaxSize: 32        consumeMode: CONCURRENTLY  
        messageMode: CLUSTERING
        clientUnitName: WF-ORIGINAL-CONSUMER


然后业务逻辑就是正常执行数据  当数据有问题则抛出异常 进入重试队列  然后再次消费数据  当达到最大重试次数的时候  丢弃数据  


看线上的日志 会发现 有大量的数据抛出了异常 但是没有进入重试逻辑  

然后看重试队列 惊奇的发现 数据也正常写入了重试队列当中   但是客户端却没有收到  

本以为是我们公司框架的问题  但是我自己不用框架 实验 也出现这种问题  


consumeMessageBatchMaxSize

某些业务流程如果支持批量方式消费,则可以很大程度上提高消费吞吐量,例如订单扣款类应用,一次处理一个订单耗时 1 s,一次处理 10 个订单可能也只耗时 2 s,这样即可大幅度提高消费的吞吐量,通过设置 consumer的 consumeMessageBatchMaxSize 返个参数,默认是 1,即一次只消费一条消息,例如设置为 N,那么每次消费的消息数小于等于 N。


代码实验 问题重现


producer

image.png

public class Producer {
    @SneakyThrows
    public static void main(String[] args) {
        DefaultMQProducer producer = new DefaultMQProducer("LL_TAAEST_TOPIC");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
for (int i =800; i < 850; i++) {
            // 创建消息,并指定Topic,Tag和消息体
            Message msg = new Message("LL_TEST_TOPIC" /* Topic */,
                    String.valueOf(i) /* Tag */, String.valueOf(i),
                    String.valueOf(i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            // 发送消息到一个Broker
            SendResult sendResult = producer.send(msg);
            // 通过sendResult返回消息是否成功送达
            System.out.printf("%s%n", sendResult);
        }
        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}


Consumer

image.png

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        // 实例化消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("LL_TEST_TOPIC_CONSUMER");
        // 设置NameServer的地址
        consumer.setNamesrvAddr("localhost:9876");
        // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
        consumer.subscribe("LL_TEST_TOPIC", "*");
        consumer.setMaxReconsumeTimes(5);
        //设置批量消费
        consumer.setConsumeMessageBatchMaxSize(20);
        consumer.setConsumeThreadMin(5);
        consumer.setConsumeThreadMax(5);
        // 注册回调实现类来处理从broker拉取回来的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExtList, ConsumeConcurrentlyContext context) {
for (MessageExt messageExt : messageExtList) {
if (messageExt.getReconsumeTimes()>2){
                        System.out.println("消费超过两次 返回  key "+ messageExt.getKeys()+"  次数 "+ messageExt.getReconsumeTimes());
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
if (Integer.parseInt(messageExt.getKeys())%2==1){
                        System.out.println("消费数据 成功 key "+ messageExt.getKeys()+"  次数 "+ messageExt.getReconsumeTimes());
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                    System.out.println("消费数据  失败 key "+ messageExt.getKeys()+"  次数 "+ messageExt.getReconsumeTimes());
                    throw new IllegalArgumentException();
                }
                // 标记该消息已经被成功消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者实例
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}



数据

"C:\Program Files\Java\jdk1.8.0_181\bin\java.exe""-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA 2021.2\lib\idea_rt.jar=56563:C:\Program Files\JetBrains\IntelliJ IDEA 2021.2\bin"-Dfile.encoding=UTF-8 -classpath"C:\Program Files\Java\jdk1.8.0_181\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\jfxrt.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\jfxswt.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\resources.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\rt.jar;D:\workspace\demoTest\org-rs-ns\target\classes;D:\repository\com\alibaba\cloud\spring-cloud-starter-alibaba-nacos-discovery\2.2.2.RELEASE\spring-cloud-starter-alibaba-nacos-discovery-2.2.2.RELEASE.jar;D:\repository\com\alibaba\nacos\nacos-client\1.3.2\nacos-client-1.3.2.jar;D:\repository\com\alibaba\nacos\nacos-common\1.3.2\nacos-common-1.3.2.jar;D:\repository\org\apache\httpcomponents\httpasyncclient\4.1.4\httpasyncclient-4.1.4.jar;D:\repository\org\apache\httpcomponents\httpcore-nio\4.4.12\httpcore-nio-4.4.12.jar;D:\repository\com\alibaba\nacos\nacos-api\1.3.2\nacos-api-1.3.2.jar;D:\repository\com\fasterxml\jackson\core\jackson-core\2.10.1\jackson-core-2.10.1.jar;D:\repository\io\prometheus\simpleclient\0.5.0\simpleclient-0.5.0.jar;D:\repository\org\yaml\snakeyaml\1.25\snakeyaml-1.25.jar;D:\repository\com\alibaba\spring\spring-context-support\1.0.9\spring-context-support-1.0.9.jar;D:\repository\org\springframework\cloud\spring-cloud-commons\2.2.5.RELEASE\spring-cloud-commons-2.2.5.RELEASE.jar;D:\repository\org\springframework\security\spring-security-crypto\5.2.1.RELEASE\spring-security-crypto-5.2.1.RELEASE.jar;D:\repository\org\springframework\cloud\spring-cloud-context\2.2.5.RELEASE\spring-cloud-context-2.2.5.RELEASE.jar;D:\repository\org\springframework\cloud\spring-cloud-starter-netflix-ribbon\2.2.5.RELEASE\spring-cloud-starter-netflix-ribbon-2.2.5.RELEASE.jar;D:\repository\org\springframework\cloud\spring-cloud-starter\2.2.5.RELEASE\spring-cloud-starter-2.2.5.RELEASE.jar;D:\repository\org\springframework\security\spring-security-rsa\1.0.9.RELEASE\spring-security-rsa-1.0.9.RELEASE.jar;D:\repository\org\bouncycastle\bcpkix-jdk15on\1.64\bcpkix-jdk15on-1.64.jar;D:\repository\org\bouncycastle\bcprov-jdk15on\1.64\bcprov-jdk15on-1.64.jar;D:\repository\org\springframework\cloud\spring-cloud-netflix-ribbon\2.2.5.RELEASE\spring-cloud-netflix-ribbon-2.2.5.RELEASE.jar;D:\repository\org\springframework\cloud\spring-cloud-netflix-archaius\2.2.5.RELEASE\spring-cloud-netflix-archaius-2.2.5.RELEASE.jar;D:\repository\org\springframework\cloud\spring-cloud-starter-netflix-archaius\2.2.5.RELEASE\spring-cloud-starter-netflix-archaius-2.2.5.RELEASE.jar;D:\repository\com\netflix\archaius\archaius-core\0.7.6\archaius-core-0.7.6.jar;D:\repository\com\google\code\findbugs\jsr305\3.0.1\jsr305-3.0.1.jar;D:\repository\commons-configuration\commons-configuration\1.8\commons-configuration-1.8.jar;D:\repository\com\netflix\ribbon\ribbon\2.3.0\ribbon-2.3.0.jar;D:\repository\com\netflix\ribbon\ribbon-transport\2.3.0\ribbon-transport-2.3.0.jar;D:\repository\io\reactivex\rxnetty-contexts\0.4.9\rxnetty-contexts-0.4.9.jar;D:\repository\io\reactivex\rxnetty-servo\0.4.9\rxnetty-servo-0.4.9.jar;D:\repository\com\netflix\hystrix\hystrix-core\1.4.3\hystrix-core-1.4.3.jar;D:\repository\javax\inject\javax.inject\1\javax.inject-1.jar;D:\repository\io\reactivex\rxnetty\0.4.9\rxnetty-0.4.9.jar;D:\repository\com\netflix\ribbon\ribbon-core\2.3.0\ribbon-core-2.3.0.jar;D:\repository\commons-lang\commons-lang\2.6\commons-lang-2.6.jar;D:\repository\com\netflix\ribbon\ribbon-httpclient\2.3.0\ribbon-httpclient-2.3.0.jar;D:\repository\org\apache\httpcomponents\httpclient\4.5.10\httpclient-4.5.10.jar;D:\repository\com\sun\jersey\jersey-client\1.19.1\jersey-client-1.19.1.jar;D:\repository\com\sun\jersey\jersey-core\1.19.1\jersey-core-1.19.1.jar;D:\repository\javax\ws\rs\jsr311-api\1.1.1\jsr311-api-1.1.1.jar;D:\repository\com\sun\jersey\contribs\jersey-apache-client4\1.19.1\jersey-apache-client4-1.19.1.jar;D:\repository\com\netflix\servo\servo-core\0.10.1\servo-core-0.10.1.jar;D:\repository\com\netflix\servo\servo-internal\0.10.1\servo-internal-0.10.1.jar;D:\repository\com\netflix\netflix-commons\netflix-commons-util\0.1.1\netflix-commons-util-0.1.1.jar;D:\repository\com\netflix\ribbon\ribbon-loadbalancer\2.3.0\ribbon-loadbalancer-2.3.0.jar;D:\repository\com\netflix\netflix-commons\netflix-statistics\0.1.1\netflix-statistics-0.1.1.jar;D:\repository\io\reactivex\rxjava\1.3.8\rxjava-1.3.8.jar;D:\repository\org\example\greeter-spring-boot-starter\1.0-SNAPSHOT\greeter-spring-boot-starter-1.0-SNAPSHOT.jar;D:\repository\org\springframework\boot\spring-boot-autoconfigure\2.2.2.RELEASE\spring-boot-autoconfigure-2.2.2.RELEASE.jar;D:\repository\cn\com\yto56\npush\tms-npush-common-dto\1.0-SNAPSHOT\tms-npush-common-dto-1.0-SNAPSHOT.jar;D:\repository\cn\com\yto56\basic\framework\yto-framework-core\3.2.18-SNAPSHOT\yto-framework-core-3.2.18-20220731.074711-170.jar;D:\repository\com\alibaba\transmittable-thread-local\2.12.2\transmittable-thread-local-2.12.2.jar;D:\repository\cn\com\yto56\basic\framework\yto-framework-model\3.2.18-SNAPSHOT\yto-framework-model-3.2.18-20220731.074703-150.jar;D:\repository\com\github\xiaoymin\knife4j-annotations\2.0.4\knife4j-annotations-2.0.4.jar;D:\repository\io\swagger\swagger-annotations\1.5.22\swagger-annotations-1.5.22.jar;D:\repository\joda-time\joda-time\2.10.5\joda-time-2.10.5.jar;D:\repository\commons-collections\commons-collections\3.2.2\commons-collections-3.2.2.jar;D:\repository\com\fasterxml\jackson\dataformat\jackson-dataformat-xml\2.10.1\jackson-dataformat-xml-2.10.1.jar;D:\repository\com\fasterxml\jackson\module\jackson-module-jaxb-annotations\2.10.1\jackson-module-jaxb-annotations-2.10.1.jar;D:\repository\org\codehaus\woodstox\stax2-api\4.2\stax2-api-4.2.jar;D:\repository\com\fasterxml\woodstox\woodstox-core\6.0.2\woodstox-core-6.0.2.jar;D:\repository\javax\servlet\javax.servlet-api\4.0.1\javax.servlet-api-4.0.1.jar;D:\repository\com\google\guava\guava\18.0\guava-18.0.jar;D:\repository\com\alibaba\fastjson\1.2.83\fastjson-1.2.83.jar;D:\repository\commons-codec\commons-codec\1.13\commons-codec-1.13.jar;D:\repository\commons-io\commons-io\2.5\commons-io-2.5.jar;D:\repository\org\springframework\boot\spring-boot-starter-jdbc\2.2.2.RELEASE\spring-boot-starter-jdbc-2.2.2.RELEASE.jar;D:\repository\org\springframework\boot\spring-boot-starter\2.2.2.RELEASE\spring-boot-starter-2.2.2.RELEASE.jar;D:\repository\org\springframework\boot\spring-boot-starter-logging\2.2.2.RELEASE\spring-boot-starter-logging-2.2.2.RELEASE.jar;D:\repository\ch\qos\logback\logback-classic\1.2.3\logback-classic-1.2.3.jar;D:\repository\ch\qos\logback\logback-core\1.2.3\logback-core-1.2.3.jar;D:\repository\org\apache\logging\log4j\log4j-to-slf4j\2.12.1\log4j-to-slf4j-2.12.1.jar;D:\repository\org\apache\logging\log4j\log4j-api\2.12.1\log4j-api-2.12.1.jar;D:\repository\org\slf4j\jul-to-slf4j\1.7.29\jul-to-slf4j-1.7.29.jar;D:\repository\jakarta\annotation\jakarta.annotation-api\1.3.5\jakarta.annotation-api-1.3.5.jar;D:\repository\com\zaxxer\HikariCP\3.4.1\HikariCP-3.4.1.jar;D:\repository\org\springframework\spring-jdbc\5.2.2.RELEASE\spring-jdbc-5.2.2.RELEASE.jar;D:\repository\org\springframework\spring-beans\5.2.2.RELEASE\spring-beans-5.2.2.RELEASE.jar;D:\repository\org\springframework\spring-tx\5.2.2.RELEASE\spring-tx-5.2.2.RELEASE.jar;D:\repository\org\mybatis\spring\boot\mybatis-spring-boot-starter\2.1.4\mybatis-spring-boot-starter-2.1.4.jar;D:\repository\org\mybatis\spring\boot\mybatis-spring-boot-autoconfigure\2.1.4\mybatis-spring-boot-autoconfigure-2.1.4.jar;D:\repository\org\mybatis\mybatis\3.5.6\mybatis-3.5.6.jar;D:\repository\org\mybatis\mybatis-spring\2.0.6\mybatis-spring-2.0.6.jar;D:\repository\mysql\mysql-connector-java\8.0.18\mysql-connector-java-8.0.18.jar;D:\repository\org\springframework\boot\spring-boot-starter-web\2.2.2.RELEASE\spring-boot-starter-web-2.2.2.RELEASE.jar;D:\repository\org\springframework\boot\spring-boot-starter-json\2.2.2.RELEASE\spring-boot-starter-json-2.2.2.RELEASE.jar;D:\repository\com\fasterxml\jackson\datatype\jackson-datatype-jdk8\2.10.1\jackson-datatype-jdk8-2.10.1.jar;D:\repository\com\fasterxml\jackson\datatype\jackson-datatype-jsr310\2.10.1\jackson-datatype-jsr310-2.10.1.jar;D:\repository\com\fasterxml\jackson\module\jackson-module-parameter-names\2.10.1\jackson-module-parameter-names-2.10.1.jar;D:\repository\org\springframework\boot\spring-boot-starter-tomcat\2.2.2.RELEASE\spring-boot-starter-tomcat-2.2.2.RELEASE.jar;D:\repository\org\apache\tomcat\embed\tomcat-embed-core\9.0.29\tomcat-embed-core-9.0.29.jar;D:\repository\org\apache\tomcat\embed\tomcat-embed-el\9.0.29\tomcat-embed-el-9.0.29.jar;D:\repository\org\apache\tomcat\embed\tomcat-embed-websocket\9.0.29\tomcat-embed-websocket-9.0.29.jar;D:\repository\org\springframework\boot\spring-boot-starter-validation\2.2.2.RELEASE\spring-boot-starter-validation-2.2.2.RELEASE.jar;D:\repository\jakarta\validation\jakarta.validation-api\2.0.1\jakarta.validation-api-2.0.1.jar;D:\repository\org\hibernate\validator\hibernate-validator\6.0.18.Final\hibernate-validator-6.0.18.Final.jar;D:\repository\org\jboss\logging\jboss-logging\3.4.1.Final\jboss-logging-3.4.1.Final.jar;D:\repository\com\fasterxml\classmate\1.5.1\classmate-1.5.1.jar;D:\repository\org\springframework\spring-web\5.2.2.RELEASE\spring-web-5.2.2.RELEASE.jar;D:\repository\org\springframework\spring-webmvc\5.2.2.RELEASE\spring-webmvc-5.2.2.RELEASE.jar;D:\repository\org\springframework\spring-aop\5.2.2.RELEASE\spring-aop-5.2.2.RELEASE.jar;D:\repository\org\springframework\spring-context\5.2.2.RELEASE\spring-context-5.2.2.RELEASE.jar;D:\repository\org\springframework\spring-expression\5.2.2.RELEASE\spring-expression-5.2.2.RELEASE.jar;D:\repository\org\springframework\boot\spring-boot-starter-data-mongodb\2.2.2.RELEASE\spring-boot-starter-data-mongodb-2.2.2.RELEASE.jar;D:\repository\org\mongodb\mongodb-driver\3.11.2\mongodb-driver-3.11.2.jar;D:\repository\org\mongodb\bson\3.11.2\bson-3.11.2.jar;D:\repository\org\mongodb\mongodb-driver-core\3.11.2\mongodb-driver-core-3.11.2.jar;D:\repository\org\springframework\data\spring-data-mongodb\2.2.3.RELEASE\spring-data-mongodb-2.2.3.RELEASE.jar;D:\repository\org\springframework\data\spring-data-commons\2.2.3.RELEASE\spring-data-commons-2.2.3.RELEASE.jar;D:\repository\org\apache\rocketmq\rocketmq-client\4.9.1\rocketmq-client-4.9.1.jar;D:\repository\org\apache\rocketmq\rocketmq-common\4.9.1\rocketmq-common-4.9.1.jar;D:\repository\org\apache\rocketmq\rocketmq-remoting\4.9.1\rocketmq-remoting-4.9.1.jar;D:\repository\io\netty\netty-all\4.1.43.Final\netty-all-4.1.43.Final.jar;D:\repository\org\apache\rocketmq\rocketmq-logging\4.9.1\rocketmq-logging-4.9.1.jar;D:\repository\commons-validator\commons-validator\1.7\commons-validator-1.7.jar;D:\repository\commons-beanutils\commons-beanutils\1.9.4\commons-beanutils-1.9.4.jar;D:\repository\commons-digester\commons-digester\2.1\commons-digester-2.1.jar;D:\repository\commons-logging\commons-logging\1.2\commons-logging-1.2.jar;D:\repository\org\apache\commons\commons-lang3\3.9\commons-lang3-3.9.jar;D:\repository\cn\hutool\hutool-all\5.7.14\hutool-all-5.7.14.jar;D:\repository\com\alibaba\easyexcel\2.2.6\easyexcel-2.2.6.jar;D:\repository\org\apache\poi\poi\3.17\poi-3.17.jar;D:\repository\org\apache\commons\commons-collections4\4.1\commons-collections4-4.1.jar;D:\repository\org\apache\poi\poi-ooxml\3.17\poi-ooxml-3.17.jar;D:\repository\com\github\virtuald\curvesapi\1.04\curvesapi-1.04.jar;D:\repository\org\apache\poi\poi-ooxml-schemas\3.17\poi-ooxml-schemas-3.17.jar;D:\repository\org\apache\xmlbeans\xmlbeans\2.6.0\xmlbeans-2.6.0.jar;D:\repository\stax\stax-api\1.0.1\stax-api-1.0.1.jar;D:\repository\cglib\cglib\3.1\cglib-3.1.jar;D:\repository\org\ow2\asm\asm\4.2\asm-4.2.jar;D:\repository\org\slf4j\slf4j-api\1.7.29\slf4j-api-1.7.29.jar;D:\repository\org\ehcache\ehcache\3.8.1\ehcache-3.8.1.jar;D:\repository\org\glassfish\jaxb\jaxb-runtime\2.3.2\jaxb-runtime-2.3.2.jar;D:\repository\org\glassfish\jaxb\txw2\2.3.2\txw2-2.3.2.jar;D:\repository\com\sun\istack\istack-commons-runtime\3.0.8\istack-commons-runtime-3.0.8.jar;D:\repository\org\jvnet\staxex\stax-ex\1.8.1\stax-ex-1.8.1.jar;D:\repository\com\sun\xml\fastinfoset\FastInfoset\1.2.16\FastInfoset-1.2.16.jar;D:\repository\com\jcraft\jsch\0.1.55\jsch-0.1.55.jar;D:\repository\org\springframework\boot\spring-boot-devtools\2.2.2.RELEASE\spring-boot-devtools-2.2.2.RELEASE.jar;D:\repository\org\springframework\boot\spring-boot\2.2.2.RELEASE\spring-boot-2.2.2.RELEASE.jar;D:\repository\com\h2database\h2\1.4.200\h2-1.4.200.jar;D:\repository\org\projectlombok\lombok\1.18.10\lombok-1.18.10.jar;D:\repository\jakarta\xml\bind\jakarta.xml.bind-api\2.3.2\jakarta.xml.bind-api-2.3.2.jar;D:\repository\jakarta\activation\jakarta.activation-api\1.2.1\jakarta.activation-api-1.2.1.jar;D:\repository\org\objenesis\objenesis\2.6\objenesis-2.6.jar;D:\repository\org\springframework\spring-core\5.2.2.RELEASE\spring-core-5.2.2.RELEASE.jar;D:\repository\org\springframework\spring-jcl\5.2.2.RELEASE\spring-jcl-5.2.2.RELEASE.jar;D:\repository\com\alicp\jetcache\jetcache-starter-redis\2.5.11\jetcache-starter-redis-2.5.11.jar;D:\repository\com\alicp\jetcache\jetcache-autoconfigure\2.5.11\jetcache-autoconfigure-2.5.11.jar;D:\repository\com\alicp\jetcache\jetcache-anno\2.5.11\jetcache-anno-2.5.11.jar;D:\repository\com\alicp\jetcache\jetcache-core\2.5.11\jetcache-core-2.5.11.jar;D:\repository\com\alicp\jetcache\jetcache-anno-api\2.5.11\jetcache-anno-api-2.5.11.jar;D:\repository\com\github\ben-manes\caffeine\caffeine\2.8.0\caffeine-2.8.0.jar;D:\repository\org\checkerframework\checker-qual\2.10.0\checker-qual-2.10.0.jar;D:\repository\com\google\errorprone\error_prone_annotations\2.3.3\error_prone_annotations-2.3.3.jar;D:\repository\com\alicp\jetcache\jetcache-redis\2.5.11\jetcache-redis-2.5.11.jar;D:\repository\redis\clients\jedis\3.1.0\jedis-3.1.0.jar;D:\repository\org\apache\commons\commons-pool2\2.7.0\commons-pool2-2.7.0.jar;D:\repository\com\esotericsoftware\kryo\4.0.2\kryo-4.0.2.jar;D:\repository\com\esotericsoftware\reflectasm\1.11.3\reflectasm-1.11.3.jar;D:\repository\com\esotericsoftware\minlog\1.3.0\minlog-1.3.0.jar;D:\repository\org\jeasy\easy-rules-core\3.3.0\easy-rules-core-3.3.0.jar;D:\repository\org\jeasy\easy-rules-support\3.3.0\easy-rules-support-3.3.0.jar;D:\repository\com\fasterxml\jackson\dataformat\jackson-dataformat-yaml\2.10.1\jackson-dataformat-yaml-2.10.1.jar;D:\repository\com\fasterxml\jackson\core\jackson-databind\2.10.1\jackson-databind-2.10.1.jar;D:\repository\com\fasterxml\jackson\core\jackson-annotations\2.10.1\jackson-annotations-2.10.1.jar;D:\repository\org\jeasy\easy-rules-mvel\3.3.0\easy-rules-mvel-3.3.0.jar;D:\repository\org\mvel\mvel2\2.4.3.Final\mvel2-2.4.3.Final.jar;D:\repository\com\bbossgroups\plugins\bboss-elasticsearch-rest-jdbc\6.5.1\bboss-elasticsearch-rest-jdbc-6.5.1.jar;D:\repository\com\bbossgroups\plugins\bboss-datatran-schedule-quartz\6.5.1\bboss-datatran-schedule-quartz-6.5.1.jar;D:\repository\com\bbossgroups\plugins\bboss-datatran-core\6.5.1\bboss-datatran-core-6.5.1.jar;D:\repository\com\bbossgroups\plugins\bboss-elasticsearch-geoip\6.5.1\bboss-elasticsearch-geoip-6.5.1.jar;D:\repository\com\bbossgroups\plugins\bboss-elasticsearch-rest-booter\6.5.1\bboss-elasticsearch-rest-booter-6.5.1.jar;D:\repository\com\maxmind\db\maxmind-db\1.4.0\maxmind-db-1.4.0.jar;D:\repository\com\maxmind\geoip2\geoip2\2.14.0\geoip2-2.14.0.jar;D:\repository\com\bbossgroups\plugins\bboss-elasticsearch-spring-boot-starter\6.5.1\bboss-elasticsearch-spring-boot-starter-6.5.1.jar;D:\repository\com\bbossgroups\plugins\bboss-elasticsearch-rest\6.5.1\bboss-elasticsearch-rest-6.5.1.jar;D:\repository\com\bbossgroups\plugins\bboss-elasticsearch-rest-entity\6.5.1\bboss-elasticsearch-rest-entity-6.5.1.jar;D:\repository\com\bbossgroups\bboss-core-entity\5.8.7\bboss-core-entity-5.8.7.jar;D:\repository\com\bbossgroups\bboss-http\5.9.3\bboss-http-5.9.3.jar;D:\repository\com\bbossgroups\bboss-core\5.8.7\bboss-core-5.8.7.jar;D:\repository\com\bbossgroups\bboss-soa\5.8.7\bboss-soa-5.8.7.jar;D:\repository\org\apache\httpcomponents\httpcore\4.4.12\httpcore-4.4.12.jar;D:\repository\org\apache\httpcomponents\httpmime\4.5.10\httpmime-4.5.10.jar;D:\repository\com\bbossgroups\bboss-persistent\5.8.7\bboss-persistent-5.8.7.jar;D:\repository\com\bbossgroups\bboss-util\5.8.7\bboss-util-5.8.7.jar;D:\repository\oro\oro\2.0.8\oro-2.0.8.jar;D:\repository\com\fasterxml\uuid\java-uuid-generator\3.1.4\java-uuid-generator-3.1.4.jar;D:\repository\cglib\cglib-nodep\3.1\cglib-nodep-3.1.jar;D:\repository\javax\transaction\jta\1.1\jta-1.1.jar;D:\repository\com\bbossgroups\bboss-velocity\5.8.7\bboss-velocity-5.8.7.jar;D:\repository\jdom\jdom\1.0\jdom-1.0.jar;D:\repository\werken-xpath\werken-xpath\0.9.4\werken-xpath-0.9.4.jar" com.example.demotest.mq.Consumer
Consumer Started.
消费数据  失败 key 800  次数 0消费数据  失败 key 804  次数 0消费数据  失败 key 802  次数 0消费数据 成功 key 803  次数 0消费数据 成功 key 801  次数 0消费数据 成功 key 805  次数 0消费数据  失败 key 806  次数 0消费数据 成功 key 807  次数 0消费数据  失败 key 808  次数 0消费数据 成功 key 809  次数 0消费数据  失败 key 810  次数 0消费数据 成功 key 811  次数 0消费数据  失败 key 812  次数 0消费数据 成功 key 813  次数 0消费数据  失败 key 814  次数 0消费数据 成功 key 815  次数 0消费数据  失败 key 816  次数 0消费数据 成功 key 817  次数 0消费数据  失败 key 818  次数 0消费数据 成功 key 819  次数 0消费数据  失败 key 820  次数 0消费数据 成功 key 821  次数 0消费数据 成功 key 823  次数 0消费数据  失败 key 822  次数 0消费数据  失败 key 824  次数 0消费数据 成功 key 825  次数 0消费数据  失败 key 826  次数 0消费数据 成功 key 827  次数 0消费数据  失败 key 828  次数 0消费数据 成功 key 829  次数 0消费数据  失败 key 830  次数 0消费数据 成功 key 831  次数 0消费数据  失败 key 832  次数 0消费数据 成功 key 833  次数 0消费数据  失败 key 834  次数 0消费数据 成功 key 835  次数 0消费数据  失败 key 836  次数 0消费数据 成功 key 837  次数 0消费数据  失败 key 838  次数 0消费数据 成功 key 839  次数 0消费数据  失败 key 840  次数 0消费数据 成功 key 841  次数 0消费数据  失败 key 842  次数 0消费数据 成功 key 843  次数 0消费数据  失败 key 844  次数 0消费数据 成功 key 845  次数 0消费数据  失败 key 846  次数 0消费数据 成功 key 847  次数 0消费数据  失败 key 848  次数 0消费数据 成功 key 849  次数 0消费数据  失败 key 806  次数 1消费数据  失败 key 802  次数 1消费数据  失败 key 812  次数 1消费数据  失败 key 814  次数 1消费数据  失败 key 832  次数 1消费数据  失败 key 840  次数 1消费数据  失败 key 838  次数 1消费数据  失败 key 806  次数 2消费数据  失败 key 800  次数 2消费数据  失败 key 812  次数 2消费数据  失败 key 804  次数 2消费数据  失败 key 836  次数 2消费数据  失败 key 822  次数 2消费超过两次 返回  key 806  次数 3消费超过两次 返回  key 808  次数 3消费超过两次 返回  key 812  次数 3消费超过两次 返回  key 814  次数 3消费超过两次 返回  key 816  次数 3消费超过两次 返回  key 824  次数 3消费超过两次 返回  key 822  次数 3消费超过两次 返回  key 842  次数 3消费超过两次 返回  key 828  次数 3消费超过两次 返回  key 844  次数 3消费超过两次 返回  key 848  次数 3

可以清晰的看到 有需要重试的数据没有进来  

同时查看 重试队列 数据是正常的  也都进入了重试队列


配置

默认生产者和消费者 都是按照集群方式配置  当然测试的时候是broker的方式

取消广播方式  

默认生产者 16个 读写队列  

消费者 默认配置 1个重试队列


补充说明

生产者  TOPIC 和重试队列的 TOIC 的 perm 都是 6  默认可写可读的  

PERM  我看好多文章说的都是  2 4 6 即 2 可写  4 可读 6 可读可写  其实不准确  看官方源码就知道   它取的是计算后的数据  

/**/packageorg.apache.rocketmq.common.constant;
publicclassPermName {
publicstaticfinalintPERM_PRIORITY=0x1<<3;
publicstaticfinalintPERM_READ=0x1<<2;
publicstaticfinalintPERM_WRITE=0x1<<1;
publicstaticfinalintPERM_INHERIT=0x1;
publicstaticStringperm2String(finalintperm) {
finalStringBuildersb=newStringBuilder("---");
if (isReadable(perm)) {
sb.replace(0, 1, "R");
        }
if (isWriteable(perm)) {
sb.replace(1, 2, "W");
        }
if (isInherited(perm)) {
sb.replace(2, 3, "X");
        }
returnsb.toString();
    }
publicstaticbooleanisReadable(finalintperm) {
return (perm&PERM_READ) ==PERM_READ;
    }
publicstaticbooleanisWriteable(finalintperm) {
return (perm&PERM_WRITE) ==PERM_WRITE;
    }
publicstaticbooleanisInherited(finalintperm) {
return (perm&PERM_INHERIT) ==PERM_INHERIT;
    }
}



再补充下

消息重试

Consumer消费消息失败后,要提供一种重试机制,令消息再消费一次。Consumer消费消息失败通常可以认为有以下几种情况:

  • 由于消息本身的原因,例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。这种错误通常需要跳过这条消息,再消费其它消息,而这条失败的消息即使立刻重试消费,99%也不成功,所以最好提供一种定时重试机制,即过10秒后再重试。
  • 由于依赖的下游应用服务不可用,例如db连接不可用,外系统网络不可达等。遇到这种错误,即使跳过当前失败的消息,消费其他消息同样也会报错。这种情况建议应用sleep 30s,再消费下一条消息,这样可以减轻Broker重试消息的压力。

RocketMQ会为每个消费组都设置一个Topic名称为“%RETRY%+consumerGroup”的重试队列(这里需要注意的是,这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的),用于暂时保存因为各种异常而导致Consumer端无法消费的消息。考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。RocketMQ对于重试消息的处理是先保存至Topic名称为“SCHEDULE_TOPIC_XXXX”的延迟队列中,后台定时任务按照对应的时间进行Delay后重新保存至“%RETRY%+consumerGroup”的重试队列中。




解决

1、  将  consumeMessageBatchMaxSize    设为1  就解决了   其实这个批量消费不一定真的能提高多大的消费性能,前面我们已经开启了多线程,最后影响消费快慢还是我们的业务逻辑  即1条数据的消费时间   我们虽然可以批量拿到多条数据,但是当消费能力上不去的话 还是会阻塞的


2、我尝试将 retry队列中的 perm 默认为6  设置成 1   即可继承  也可以解决   但是不知道为什么   待求证  


疑问


感觉官方还是可以解决一下这个问题的吧   当批量消费的时候  重试数据没有消费到    或者是我操作失误  但是网上也没有相关的文档 可以求证    希望有大佬可以解惑吧

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
消息中间件 存储 算法
RocketMQ 重试机制详解及最佳实践
本文主要介绍在使用 RocketMQ 时为什么需要重试与兜底机制,生产者与消费者触发重试的条件和具体行为,如何在 RocketMQ 中合理使用重试机制,帮助构建弹性,高可用系统的最佳实践。
1351 0
RocketMQ 重试机制详解及最佳实践
|
消息中间件 算法 Java
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
756 1
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
|
3月前
|
消息中间件 NoSQL 关系型数据库
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
109 1
|
4月前
|
消息中间件 Arthas 监控
消息队列 MQ产品使用合集之每次重置reconsumeTimes就无法达到死信阈值,重试次数是否就要应用方控制
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ产品使用合集之每次重置reconsumeTimes就无法达到死信阈值,重试次数是否就要应用方控制
|
消息中间件 缓存 监控
Rocketmq并发和顺序消费的失败重试机制
Rocketmq并发和顺序消费的失败重试机制
|
消息中间件 Arthas 监控
一次RocketMQ ons SDK Bug导致消息不断堆积到重试队列的案例分析
一次RocketMQ ons SDK Bug导致消息不断堆积到重试队列的案例分析
446 1
|
消息中间件 中间件 RocketMQ
【Alibaba中间件技术系列】「RocketMQ技术专题」分析消息队列中的消费失败重试机制的原理和实践
【Alibaba中间件技术系列】「RocketMQ技术专题」分析消息队列中的消费失败重试机制的原理和实践
412 3
|
消息中间件 缓存 Cloud Native
RocketMQ 重试机制的概念与最佳实践|学习笔记
快速学习 RocketMQ 重试机制的概念与最佳实践
539 0
RocketMQ 重试机制的概念与最佳实践|学习笔记
|
消息中间件 存储 缓存
简述RocketMQ消息拉取过程【二】
简述RocketMQ消息拉取过程【二】
1080 1
|
消息中间件 Apache RocketMQ
《万亿级数据洪峰下的消息引擎——Apache RocketMQ》电子版地址
万亿级数据洪峰下的消息引擎——Apache RocketMQ
334 0
《万亿级数据洪峰下的消息引擎——Apache RocketMQ》电子版地址