问题描述
使用公司的rocketMQ封装的框架 将 consumeMessageBatchMaxSize 设置为20 大概的配置如下所示 (当然下面可能是我们公司自己封装的 但是大致都一样的)
xxxxConsumer: nameServer: XXXXXX consumerGroup: XXXXXXXX topic: XXXXXXXXXXXXXS selectorExpress: '*' consumeThreadMin: 20 consumeThreadMax: 20 max-retry-times: 3 consumeMessageBatchMaxSize: 32 consumeMode: CONCURRENTLY messageMode: CLUSTERING clientUnitName: WF-ORIGINAL-CONSUMER
然后业务逻辑就是正常执行数据 当数据有问题则抛出异常 进入重试队列 然后再次消费数据 当达到最大重试次数的时候 丢弃数据
看线上的日志 会发现 有大量的数据抛出了异常 但是没有进入重试逻辑
然后看重试队列 惊奇的发现 数据也正常写入了重试队列当中 但是客户端却没有收到
本以为是我们公司框架的问题 但是我自己不用框架 实验 也出现这种问题
consumeMessageBatchMaxSize
某些业务流程如果支持批量方式消费,则可以很大程度上提高消费吞吐量,例如订单扣款类应用,一次处理一个订单耗时 1 s,一次处理 10 个订单可能也只耗时 2 s,这样即可大幅度提高消费的吞吐量,通过设置 consumer的 consumeMessageBatchMaxSize 返个参数,默认是 1,即一次只消费一条消息,例如设置为 N,那么每次消费的消息数小于等于 N。
代码实验 问题重现
producer
public class Producer { @SneakyThrows public static void main(String[] args) { DefaultMQProducer producer = new DefaultMQProducer("LL_TAAEST_TOPIC"); producer.setNamesrvAddr("localhost:9876"); producer.start(); for (int i =800; i < 850; i++) { // 创建消息,并指定Topic,Tag和消息体 Message msg = new Message("LL_TEST_TOPIC" /* Topic */, String.valueOf(i) /* Tag */, String.valueOf(i), String.valueOf(i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); // 发送消息到一个Broker SendResult sendResult = producer.send(msg); // 通过sendResult返回消息是否成功送达 System.out.printf("%s%n", sendResult); } // 如果不再发送消息,关闭Producer实例。 producer.shutdown(); } }
Consumer
public class Consumer { public static void main(String[] args) throws MQClientException { // 实例化消费者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("LL_TEST_TOPIC_CONSUMER"); // 设置NameServer的地址 consumer.setNamesrvAddr("localhost:9876"); // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息 consumer.subscribe("LL_TEST_TOPIC", "*"); consumer.setMaxReconsumeTimes(5); //设置批量消费 consumer.setConsumeMessageBatchMaxSize(20); consumer.setConsumeThreadMin(5); consumer.setConsumeThreadMax(5); // 注册回调实现类来处理从broker拉取回来的消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExtList, ConsumeConcurrentlyContext context) { for (MessageExt messageExt : messageExtList) { if (messageExt.getReconsumeTimes()>2){ System.out.println("消费超过两次 返回 key "+ messageExt.getKeys()+" 次数 "+ messageExt.getReconsumeTimes()); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } if (Integer.parseInt(messageExt.getKeys())%2==1){ System.out.println("消费数据 成功 key "+ messageExt.getKeys()+" 次数 "+ messageExt.getReconsumeTimes()); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } System.out.println("消费数据 失败 key "+ messageExt.getKeys()+" 次数 "+ messageExt.getReconsumeTimes()); throw new IllegalArgumentException(); } // 标记该消息已经被成功消费 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 启动消费者实例 consumer.start(); System.out.printf("Consumer Started.%n"); } }
数据
"C:\Program Files\Java\jdk1.8.0_181\bin\java.exe""-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA 2021.2\lib\idea_rt.jar=56563:C:\Program Files\JetBrains\IntelliJ IDEA 2021.2\bin"-Dfile.encoding=UTF-8 -classpath"C:\Program Files\Java\jdk1.8.0_181\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\jfxrt.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\jfxswt.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\resources.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\rt.jar;D:\workspace\demoTest\org-rs-ns\target\classes;D:\repository\com\alibaba\cloud\spring-cloud-starter-alibaba-nacos-discovery\2.2.2.RELEASE\spring-cloud-starter-alibaba-nacos-discovery-2.2.2.RELEASE.jar;D:\repository\com\alibaba\nacos\nacos-client\1.3.2\nacos-client-1.3.2.jar;D:\repository\com\alibaba\nacos\nacos-common\1.3.2\nacos-common-1.3.2.jar;D:\repository\org\apache\httpcomponents\httpasyncclient\4.1.4\httpasyncclient-4.1.4.jar;D:\repository\org\apache\httpcomponents\httpcore-nio\4.4.12\httpcore-nio-4.4.12.jar;D:\repository\com\alibaba\nacos\nacos-api\1.3.2\nacos-api-1.3.2.jar;D:\repository\com\fasterxml\jackson\core\jackson-core\2.10.1\jackson-core-2.10.1.jar;D:\repository\io\prometheus\simpleclient\0.5.0\simpleclient-0.5.0.jar;D:\repository\org\yaml\snakeyaml\1.25\snakeyaml-1.25.jar;D:\repository\com\alibaba\spring\spring-context-support\1.0.9\spring-context-support-1.0.9.jar;D:\repository\org\springframework\cloud\spring-cloud-commons\2.2.5.RELEASE\spring-cloud-commons-2.2.5.RELEASE.jar;D:\repository\org\springframework\security\spring-security-crypto\5.2.1.RELEASE\spring-security-crypto-5.2.1.RELEASE.jar;D:\repository\org\springframework\cloud\spring-cloud-context\2.2.5.RELEASE\spring-cloud-context-2.2.5.RELEASE.jar;D:\repository\org\springframework\cloud\spring-cloud-starter-netflix-ribbon\2.2.5.RELEASE\spring-cloud-starter-netflix-ribbon-2.2.5.RELEASE.jar;D:\repository\org\springframework\cloud\spring-cloud-starter\2.2.5.RELEASE\spring-cloud-starter-2.2.5.RELEASE.jar;D:\repository\org\springframework\security\spring-security-rsa\1.0.9.RELEASE\spring-security-rsa-1.0.9.RELEASE.jar;D:\repository\org\bouncycastle\bcpkix-jdk15on\1.64\bcpkix-jdk15on-1.64.jar;D:\repository\org\bouncycastle\bcprov-jdk15on\1.64\bcprov-jdk15on-1.64.jar;D:\repository\org\springframework\cloud\spring-cloud-netflix-ribbon\2.2.5.RELEASE\spring-cloud-netflix-ribbon-2.2.5.RELEASE.jar;D:\repository\org\springframework\cloud\spring-cloud-netflix-archaius\2.2.5.RELEASE\spring-cloud-netflix-archaius-2.2.5.RELEASE.jar;D:\repository\org\springframework\cloud\spring-cloud-starter-netflix-archaius\2.2.5.RELEASE\spring-cloud-starter-netflix-archaius-2.2.5.RELEASE.jar;D:\repository\com\netflix\archaius\archaius-core\0.7.6\archaius-core-0.7.6.jar;D:\repository\com\google\code\findbugs\jsr305\3.0.1\jsr305-3.0.1.jar;D:\repository\commons-configuration\commons-configuration\1.8\commons-configuration-1.8.jar;D:\repository\com\netflix\ribbon\ribbon\2.3.0\ribbon-2.3.0.jar;D:\repository\com\netflix\ribbon\ribbon-transport\2.3.0\ribbon-transport-2.3.0.jar;D:\repository\io\reactivex\rxnetty-contexts\0.4.9\rxnetty-contexts-0.4.9.jar;D:\repository\io\reactivex\rxnetty-servo\0.4.9\rxnetty-servo-0.4.9.jar;D:\repository\com\netflix\hystrix\hystrix-core\1.4.3\hystrix-core-1.4.3.jar;D:\repository\javax\inject\javax.inject\1\javax.inject-1.jar;D:\repository\io\reactivex\rxnetty\0.4.9\rxnetty-0.4.9.jar;D:\repository\com\netflix\ribbon\ribbon-core\2.3.0\ribbon-core-2.3.0.jar;D:\repository\commons-lang\commons-lang\2.6\commons-lang-2.6.jar;D:\repository\com\netflix\ribbon\ribbon-httpclient\2.3.0\ribbon-httpclient-2.3.0.jar;D:\repository\org\apache\httpcomponents\httpclient\4.5.10\httpclient-4.5.10.jar;D:\repository\com\sun\jersey\jersey-client\1.19.1\jersey-client-1.19.1.jar;D:\repository\com\sun\jersey\jersey-core\1.19.1\jersey-core-1.19.1.jar;D:\repository\javax\ws\rs\jsr311-api\1.1.1\jsr311-api-1.1.1.jar;D:\repository\com\sun\jersey\contribs\jersey-apache-client4\1.19.1\jersey-apache-client4-1.19.1.jar;D:\repository\com\netflix\servo\servo-core\0.10.1\servo-core-0.10.1.jar;D:\repository\com\netflix\servo\servo-internal\0.10.1\servo-internal-0.10.1.jar;D:\repository\com\netflix\netflix-commons\netflix-commons-util\0.1.1\netflix-commons-util-0.1.1.jar;D:\repository\com\netflix\ribbon\ribbon-loadbalancer\2.3.0\ribbon-loadbalancer-2.3.0.jar;D:\repository\com\netflix\netflix-commons\netflix-statistics\0.1.1\netflix-statistics-0.1.1.jar;D:\repository\io\reactivex\rxjava\1.3.8\rxjava-1.3.8.jar;D:\repository\org\example\greeter-spring-boot-starter\1.0-SNAPSHOT\greeter-spring-boot-starter-1.0-SNAPSHOT.jar;D:\repository\org\springframework\boot\spring-boot-autoconfigure\2.2.2.RELEASE\spring-boot-autoconfigure-2.2.2.RELEASE.jar;D:\repository\cn\com\yto56\npush\tms-npush-common-dto\1.0-SNAPSHOT\tms-npush-common-dto-1.0-SNAPSHOT.jar;D:\repository\cn\com\yto56\basic\framework\yto-framework-core\3.2.18-SNAPSHOT\yto-framework-core-3.2.18-20220731.074711-170.jar;D:\repository\com\alibaba\transmittable-thread-local\2.12.2\transmittable-thread-local-2.12.2.jar;D:\repository\cn\com\yto56\basic\framework\yto-framework-model\3.2.18-SNAPSHOT\yto-framework-model-3.2.18-20220731.074703-150.jar;D:\repository\com\github\xiaoymin\knife4j-annotations\2.0.4\knife4j-annotations-2.0.4.jar;D:\repository\io\swagger\swagger-annotations\1.5.22\swagger-annotations-1.5.22.jar;D:\repository\joda-time\joda-time\2.10.5\joda-time-2.10.5.jar;D:\repository\commons-collections\commons-collections\3.2.2\commons-collections-3.2.2.jar;D:\repository\com\fasterxml\jackson\dataformat\jackson-dataformat-xml\2.10.1\jackson-dataformat-xml-2.10.1.jar;D:\repository\com\fasterxml\jackson\module\jackson-module-jaxb-annotations\2.10.1\jackson-module-jaxb-annotations-2.10.1.jar;D:\repository\org\codehaus\woodstox\stax2-api\4.2\stax2-api-4.2.jar;D:\repository\com\fasterxml\woodstox\woodstox-core\6.0.2\woodstox-core-6.0.2.jar;D:\repository\javax\servlet\javax.servlet-api\4.0.1\javax.servlet-api-4.0.1.jar;D:\repository\com\google\guava\guava\18.0\guava-18.0.jar;D:\repository\com\alibaba\fastjson\1.2.83\fastjson-1.2.83.jar;D:\repository\commons-codec\commons-codec\1.13\commons-codec-1.13.jar;D:\repository\commons-io\commons-io\2.5\commons-io-2.5.jar;D:\repository\org\springframework\boot\spring-boot-starter-jdbc\2.2.2.RELEASE\spring-boot-starter-jdbc-2.2.2.RELEASE.jar;D:\repository\org\springframework\boot\spring-boot-starter\2.2.2.RELEASE\spring-boot-starter-2.2.2.RELEASE.jar;D:\repository\org\springframework\boot\spring-boot-starter-logging\2.2.2.RELEASE\spring-boot-starter-logging-2.2.2.RELEASE.jar;D:\repository\ch\qos\logback\logback-classic\1.2.3\logback-classic-1.2.3.jar;D:\repository\ch\qos\logback\logback-core\1.2.3\logback-core-1.2.3.jar;D:\repository\org\apache\logging\log4j\log4j-to-slf4j\2.12.1\log4j-to-slf4j-2.12.1.jar;D:\repository\org\apache\logging\log4j\log4j-api\2.12.1\log4j-api-2.12.1.jar;D:\repository\org\slf4j\jul-to-slf4j\1.7.29\jul-to-slf4j-1.7.29.jar;D:\repository\jakarta\annotation\jakarta.annotation-api\1.3.5\jakarta.annotation-api-1.3.5.jar;D:\repository\com\zaxxer\HikariCP\3.4.1\HikariCP-3.4.1.jar;D:\repository\org\springframework\spring-jdbc\5.2.2.RELEASE\spring-jdbc-5.2.2.RELEASE.jar;D:\repository\org\springframework\spring-beans\5.2.2.RELEASE\spring-beans-5.2.2.RELEASE.jar;D:\repository\org\springframework\spring-tx\5.2.2.RELEASE\spring-tx-5.2.2.RELEASE.jar;D:\repository\org\mybatis\spring\boot\mybatis-spring-boot-starter\2.1.4\mybatis-spring-boot-starter-2.1.4.jar;D:\repository\org\mybatis\spring\boot\mybatis-spring-boot-autoconfigure\2.1.4\mybatis-spring-boot-autoconfigure-2.1.4.jar;D:\repository\org\mybatis\mybatis\3.5.6\mybatis-3.5.6.jar;D:\repository\org\mybatis\mybatis-spring\2.0.6\mybatis-spring-2.0.6.jar;D:\repository\mysql\mysql-connector-java\8.0.18\mysql-connector-java-8.0.18.jar;D:\repository\org\springframework\boot\spring-boot-starter-web\2.2.2.RELEASE\spring-boot-starter-web-2.2.2.RELEASE.jar;D:\repository\org\springframework\boot\spring-boot-starter-json\2.2.2.RELEASE\spring-boot-starter-json-2.2.2.RELEASE.jar;D:\repository\com\fasterxml\jackson\datatype\jackson-datatype-jdk8\2.10.1\jackson-datatype-jdk8-2.10.1.jar;D:\repository\com\fasterxml\jackson\datatype\jackson-datatype-jsr310\2.10.1\jackson-datatype-jsr310-2.10.1.jar;D:\repository\com\fasterxml\jackson\module\jackson-module-parameter-names\2.10.1\jackson-module-parameter-names-2.10.1.jar;D:\repository\org\springframework\boot\spring-boot-starter-tomcat\2.2.2.RELEASE\spring-boot-starter-tomcat-2.2.2.RELEASE.jar;D:\repository\org\apache\tomcat\embed\tomcat-embed-core\9.0.29\tomcat-embed-core-9.0.29.jar;D:\repository\org\apache\tomcat\embed\tomcat-embed-el\9.0.29\tomcat-embed-el-9.0.29.jar;D:\repository\org\apache\tomcat\embed\tomcat-embed-websocket\9.0.29\tomcat-embed-websocket-9.0.29.jar;D:\repository\org\springframework\boot\spring-boot-starter-validation\2.2.2.RELEASE\spring-boot-starter-validation-2.2.2.RELEASE.jar;D:\repository\jakarta\validation\jakarta.validation-api\2.0.1\jakarta.validation-api-2.0.1.jar;D:\repository\org\hibernate\validator\hibernate-validator\6.0.18.Final\hibernate-validator-6.0.18.Final.jar;D:\repository\org\jboss\logging\jboss-logging\3.4.1.Final\jboss-logging-3.4.1.Final.jar;D:\repository\com\fasterxml\classmate\1.5.1\classmate-1.5.1.jar;D:\repository\org\springframework\spring-web\5.2.2.RELEASE\spring-web-5.2.2.RELEASE.jar;D:\repository\org\springframework\spring-webmvc\5.2.2.RELEASE\spring-webmvc-5.2.2.RELEASE.jar;D:\repository\org\springframework\spring-aop\5.2.2.RELEASE\spring-aop-5.2.2.RELEASE.jar;D:\repository\org\springframework\spring-context\5.2.2.RELEASE\spring-context-5.2.2.RELEASE.jar;D:\repository\org\springframework\spring-expression\5.2.2.RELEASE\spring-expression-5.2.2.RELEASE.jar;D:\repository\org\springframework\boot\spring-boot-starter-data-mongodb\2.2.2.RELEASE\spring-boot-starter-data-mongodb-2.2.2.RELEASE.jar;D:\repository\org\mongodb\mongodb-driver\3.11.2\mongodb-driver-3.11.2.jar;D:\repository\org\mongodb\bson\3.11.2\bson-3.11.2.jar;D:\repository\org\mongodb\mongodb-driver-core\3.11.2\mongodb-driver-core-3.11.2.jar;D:\repository\org\springframework\data\spring-data-mongodb\2.2.3.RELEASE\spring-data-mongodb-2.2.3.RELEASE.jar;D:\repository\org\springframework\data\spring-data-commons\2.2.3.RELEASE\spring-data-commons-2.2.3.RELEASE.jar;D:\repository\org\apache\rocketmq\rocketmq-client\4.9.1\rocketmq-client-4.9.1.jar;D:\repository\org\apache\rocketmq\rocketmq-common\4.9.1\rocketmq-common-4.9.1.jar;D:\repository\org\apache\rocketmq\rocketmq-remoting\4.9.1\rocketmq-remoting-4.9.1.jar;D:\repository\io\netty\netty-all\4.1.43.Final\netty-all-4.1.43.Final.jar;D:\repository\org\apache\rocketmq\rocketmq-logging\4.9.1\rocketmq-logging-4.9.1.jar;D:\repository\commons-validator\commons-validator\1.7\commons-validator-1.7.jar;D:\repository\commons-beanutils\commons-beanutils\1.9.4\commons-beanutils-1.9.4.jar;D:\repository\commons-digester\commons-digester\2.1\commons-digester-2.1.jar;D:\repository\commons-logging\commons-logging\1.2\commons-logging-1.2.jar;D:\repository\org\apache\commons\commons-lang3\3.9\commons-lang3-3.9.jar;D:\repository\cn\hutool\hutool-all\5.7.14\hutool-all-5.7.14.jar;D:\repository\com\alibaba\easyexcel\2.2.6\easyexcel-2.2.6.jar;D:\repository\org\apache\poi\poi\3.17\poi-3.17.jar;D:\repository\org\apache\commons\commons-collections4\4.1\commons-collections4-4.1.jar;D:\repository\org\apache\poi\poi-ooxml\3.17\poi-ooxml-3.17.jar;D:\repository\com\github\virtuald\curvesapi\1.04\curvesapi-1.04.jar;D:\repository\org\apache\poi\poi-ooxml-schemas\3.17\poi-ooxml-schemas-3.17.jar;D:\repository\org\apache\xmlbeans\xmlbeans\2.6.0\xmlbeans-2.6.0.jar;D:\repository\stax\stax-api\1.0.1\stax-api-1.0.1.jar;D:\repository\cglib\cglib\3.1\cglib-3.1.jar;D:\repository\org\ow2\asm\asm\4.2\asm-4.2.jar;D:\repository\org\slf4j\slf4j-api\1.7.29\slf4j-api-1.7.29.jar;D:\repository\org\ehcache\ehcache\3.8.1\ehcache-3.8.1.jar;D:\repository\org\glassfish\jaxb\jaxb-runtime\2.3.2\jaxb-runtime-2.3.2.jar;D:\repository\org\glassfish\jaxb\txw2\2.3.2\txw2-2.3.2.jar;D:\repository\com\sun\istack\istack-commons-runtime\3.0.8\istack-commons-runtime-3.0.8.jar;D:\repository\org\jvnet\staxex\stax-ex\1.8.1\stax-ex-1.8.1.jar;D:\repository\com\sun\xml\fastinfoset\FastInfoset\1.2.16\FastInfoset-1.2.16.jar;D:\repository\com\jcraft\jsch\0.1.55\jsch-0.1.55.jar;D:\repository\org\springframework\boot\spring-boot-devtools\2.2.2.RELEASE\spring-boot-devtools-2.2.2.RELEASE.jar;D:\repository\org\springframework\boot\spring-boot\2.2.2.RELEASE\spring-boot-2.2.2.RELEASE.jar;D:\repository\com\h2database\h2\1.4.200\h2-1.4.200.jar;D:\repository\org\projectlombok\lombok\1.18.10\lombok-1.18.10.jar;D:\repository\jakarta\xml\bind\jakarta.xml.bind-api\2.3.2\jakarta.xml.bind-api-2.3.2.jar;D:\repository\jakarta\activation\jakarta.activation-api\1.2.1\jakarta.activation-api-1.2.1.jar;D:\repository\org\objenesis\objenesis\2.6\objenesis-2.6.jar;D:\repository\org\springframework\spring-core\5.2.2.RELEASE\spring-core-5.2.2.RELEASE.jar;D:\repository\org\springframework\spring-jcl\5.2.2.RELEASE\spring-jcl-5.2.2.RELEASE.jar;D:\repository\com\alicp\jetcache\jetcache-starter-redis\2.5.11\jetcache-starter-redis-2.5.11.jar;D:\repository\com\alicp\jetcache\jetcache-autoconfigure\2.5.11\jetcache-autoconfigure-2.5.11.jar;D:\repository\com\alicp\jetcache\jetcache-anno\2.5.11\jetcache-anno-2.5.11.jar;D:\repository\com\alicp\jetcache\jetcache-core\2.5.11\jetcache-core-2.5.11.jar;D:\repository\com\alicp\jetcache\jetcache-anno-api\2.5.11\jetcache-anno-api-2.5.11.jar;D:\repository\com\github\ben-manes\caffeine\caffeine\2.8.0\caffeine-2.8.0.jar;D:\repository\org\checkerframework\checker-qual\2.10.0\checker-qual-2.10.0.jar;D:\repository\com\google\errorprone\error_prone_annotations\2.3.3\error_prone_annotations-2.3.3.jar;D:\repository\com\alicp\jetcache\jetcache-redis\2.5.11\jetcache-redis-2.5.11.jar;D:\repository\redis\clients\jedis\3.1.0\jedis-3.1.0.jar;D:\repository\org\apache\commons\commons-pool2\2.7.0\commons-pool2-2.7.0.jar;D:\repository\com\esotericsoftware\kryo\4.0.2\kryo-4.0.2.jar;D:\repository\com\esotericsoftware\reflectasm\1.11.3\reflectasm-1.11.3.jar;D:\repository\com\esotericsoftware\minlog\1.3.0\minlog-1.3.0.jar;D:\repository\org\jeasy\easy-rules-core\3.3.0\easy-rules-core-3.3.0.jar;D:\repository\org\jeasy\easy-rules-support\3.3.0\easy-rules-support-3.3.0.jar;D:\repository\com\fasterxml\jackson\dataformat\jackson-dataformat-yaml\2.10.1\jackson-dataformat-yaml-2.10.1.jar;D:\repository\com\fasterxml\jackson\core\jackson-databind\2.10.1\jackson-databind-2.10.1.jar;D:\repository\com\fasterxml\jackson\core\jackson-annotations\2.10.1\jackson-annotations-2.10.1.jar;D:\repository\org\jeasy\easy-rules-mvel\3.3.0\easy-rules-mvel-3.3.0.jar;D:\repository\org\mvel\mvel2\2.4.3.Final\mvel2-2.4.3.Final.jar;D:\repository\com\bbossgroups\plugins\bboss-elasticsearch-rest-jdbc\6.5.1\bboss-elasticsearch-rest-jdbc-6.5.1.jar;D:\repository\com\bbossgroups\plugins\bboss-datatran-schedule-quartz\6.5.1\bboss-datatran-schedule-quartz-6.5.1.jar;D:\repository\com\bbossgroups\plugins\bboss-datatran-core\6.5.1\bboss-datatran-core-6.5.1.jar;D:\repository\com\bbossgroups\plugins\bboss-elasticsearch-geoip\6.5.1\bboss-elasticsearch-geoip-6.5.1.jar;D:\repository\com\bbossgroups\plugins\bboss-elasticsearch-rest-booter\6.5.1\bboss-elasticsearch-rest-booter-6.5.1.jar;D:\repository\com\maxmind\db\maxmind-db\1.4.0\maxmind-db-1.4.0.jar;D:\repository\com\maxmind\geoip2\geoip2\2.14.0\geoip2-2.14.0.jar;D:\repository\com\bbossgroups\plugins\bboss-elasticsearch-spring-boot-starter\6.5.1\bboss-elasticsearch-spring-boot-starter-6.5.1.jar;D:\repository\com\bbossgroups\plugins\bboss-elasticsearch-rest\6.5.1\bboss-elasticsearch-rest-6.5.1.jar;D:\repository\com\bbossgroups\plugins\bboss-elasticsearch-rest-entity\6.5.1\bboss-elasticsearch-rest-entity-6.5.1.jar;D:\repository\com\bbossgroups\bboss-core-entity\5.8.7\bboss-core-entity-5.8.7.jar;D:\repository\com\bbossgroups\bboss-http\5.9.3\bboss-http-5.9.3.jar;D:\repository\com\bbossgroups\bboss-core\5.8.7\bboss-core-5.8.7.jar;D:\repository\com\bbossgroups\bboss-soa\5.8.7\bboss-soa-5.8.7.jar;D:\repository\org\apache\httpcomponents\httpcore\4.4.12\httpcore-4.4.12.jar;D:\repository\org\apache\httpcomponents\httpmime\4.5.10\httpmime-4.5.10.jar;D:\repository\com\bbossgroups\bboss-persistent\5.8.7\bboss-persistent-5.8.7.jar;D:\repository\com\bbossgroups\bboss-util\5.8.7\bboss-util-5.8.7.jar;D:\repository\oro\oro\2.0.8\oro-2.0.8.jar;D:\repository\com\fasterxml\uuid\java-uuid-generator\3.1.4\java-uuid-generator-3.1.4.jar;D:\repository\cglib\cglib-nodep\3.1\cglib-nodep-3.1.jar;D:\repository\javax\transaction\jta\1.1\jta-1.1.jar;D:\repository\com\bbossgroups\bboss-velocity\5.8.7\bboss-velocity-5.8.7.jar;D:\repository\jdom\jdom\1.0\jdom-1.0.jar;D:\repository\werken-xpath\werken-xpath\0.9.4\werken-xpath-0.9.4.jar" com.example.demotest.mq.Consumer Consumer Started. 消费数据 失败 key 800 次数 0消费数据 失败 key 804 次数 0消费数据 失败 key 802 次数 0消费数据 成功 key 803 次数 0消费数据 成功 key 801 次数 0消费数据 成功 key 805 次数 0消费数据 失败 key 806 次数 0消费数据 成功 key 807 次数 0消费数据 失败 key 808 次数 0消费数据 成功 key 809 次数 0消费数据 失败 key 810 次数 0消费数据 成功 key 811 次数 0消费数据 失败 key 812 次数 0消费数据 成功 key 813 次数 0消费数据 失败 key 814 次数 0消费数据 成功 key 815 次数 0消费数据 失败 key 816 次数 0消费数据 成功 key 817 次数 0消费数据 失败 key 818 次数 0消费数据 成功 key 819 次数 0消费数据 失败 key 820 次数 0消费数据 成功 key 821 次数 0消费数据 成功 key 823 次数 0消费数据 失败 key 822 次数 0消费数据 失败 key 824 次数 0消费数据 成功 key 825 次数 0消费数据 失败 key 826 次数 0消费数据 成功 key 827 次数 0消费数据 失败 key 828 次数 0消费数据 成功 key 829 次数 0消费数据 失败 key 830 次数 0消费数据 成功 key 831 次数 0消费数据 失败 key 832 次数 0消费数据 成功 key 833 次数 0消费数据 失败 key 834 次数 0消费数据 成功 key 835 次数 0消费数据 失败 key 836 次数 0消费数据 成功 key 837 次数 0消费数据 失败 key 838 次数 0消费数据 成功 key 839 次数 0消费数据 失败 key 840 次数 0消费数据 成功 key 841 次数 0消费数据 失败 key 842 次数 0消费数据 成功 key 843 次数 0消费数据 失败 key 844 次数 0消费数据 成功 key 845 次数 0消费数据 失败 key 846 次数 0消费数据 成功 key 847 次数 0消费数据 失败 key 848 次数 0消费数据 成功 key 849 次数 0消费数据 失败 key 806 次数 1消费数据 失败 key 802 次数 1消费数据 失败 key 812 次数 1消费数据 失败 key 814 次数 1消费数据 失败 key 832 次数 1消费数据 失败 key 840 次数 1消费数据 失败 key 838 次数 1消费数据 失败 key 806 次数 2消费数据 失败 key 800 次数 2消费数据 失败 key 812 次数 2消费数据 失败 key 804 次数 2消费数据 失败 key 836 次数 2消费数据 失败 key 822 次数 2消费超过两次 返回 key 806 次数 3消费超过两次 返回 key 808 次数 3消费超过两次 返回 key 812 次数 3消费超过两次 返回 key 814 次数 3消费超过两次 返回 key 816 次数 3消费超过两次 返回 key 824 次数 3消费超过两次 返回 key 822 次数 3消费超过两次 返回 key 842 次数 3消费超过两次 返回 key 828 次数 3消费超过两次 返回 key 844 次数 3消费超过两次 返回 key 848 次数 3
可以清晰的看到 有需要重试的数据没有进来
同时查看 重试队列 数据是正常的 也都进入了重试队列
配置
默认生产者和消费者 都是按照集群方式配置 当然测试的时候是broker的方式
取消广播方式
默认生产者 16个 读写队列
消费者 默认配置 1个重试队列
补充说明
生产者 TOPIC 和重试队列的 TOIC 的 perm 都是 6 默认可写可读的
PERM 我看好多文章说的都是 2 4 6 即 2 可写 4 可读 6 可读可写 其实不准确 看官方源码就知道 它取的是计算后的数据
/**/packageorg.apache.rocketmq.common.constant; publicclassPermName { publicstaticfinalintPERM_PRIORITY=0x1<<3; publicstaticfinalintPERM_READ=0x1<<2; publicstaticfinalintPERM_WRITE=0x1<<1; publicstaticfinalintPERM_INHERIT=0x1; publicstaticStringperm2String(finalintperm) { finalStringBuildersb=newStringBuilder("---"); if (isReadable(perm)) { sb.replace(0, 1, "R"); } if (isWriteable(perm)) { sb.replace(1, 2, "W"); } if (isInherited(perm)) { sb.replace(2, 3, "X"); } returnsb.toString(); } publicstaticbooleanisReadable(finalintperm) { return (perm&PERM_READ) ==PERM_READ; } publicstaticbooleanisWriteable(finalintperm) { return (perm&PERM_WRITE) ==PERM_WRITE; } publicstaticbooleanisInherited(finalintperm) { return (perm&PERM_INHERIT) ==PERM_INHERIT; } }
再补充下
消息重试
Consumer消费消息失败后,要提供一种重试机制,令消息再消费一次。Consumer消费消息失败通常可以认为有以下几种情况:
- 由于消息本身的原因,例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。这种错误通常需要跳过这条消息,再消费其它消息,而这条失败的消息即使立刻重试消费,99%也不成功,所以最好提供一种定时重试机制,即过10秒后再重试。
- 由于依赖的下游应用服务不可用,例如db连接不可用,外系统网络不可达等。遇到这种错误,即使跳过当前失败的消息,消费其他消息同样也会报错。这种情况建议应用sleep 30s,再消费下一条消息,这样可以减轻Broker重试消息的压力。
RocketMQ会为每个消费组都设置一个Topic名称为“%RETRY%+consumerGroup”的重试队列(这里需要注意的是,这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的),用于暂时保存因为各种异常而导致Consumer端无法消费的消息。考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。RocketMQ对于重试消息的处理是先保存至Topic名称为“SCHEDULE_TOPIC_XXXX”的延迟队列中,后台定时任务按照对应的时间进行Delay后重新保存至“%RETRY%+consumerGroup”的重试队列中。
解决
1、 将 consumeMessageBatchMaxSize 设为1 就解决了 其实这个批量消费不一定真的能提高多大的消费性能,前面我们已经开启了多线程,最后影响消费快慢还是我们的业务逻辑 即1条数据的消费时间 我们虽然可以批量拿到多条数据,但是当消费能力上不去的话 还是会阻塞的
2、我尝试将 retry队列中的 perm 默认为6 设置成 1 即可继承 也可以解决 但是不知道为什么 待求证
疑问
感觉官方还是可以解决一下这个问题的吧 当批量消费的时候 重试数据没有消费到 或者是我操作失误 但是网上也没有相关的文档 可以求证 希望有大佬可以解惑吧