When running rocketmq-consume-example, the log shows that the input1 channel does not process messages in order. The log is as follows:
2019-03-19 17:17:23.918 DEBUG 12100 --- [MessageThread_1] s.b.r.c.RocketMQListenerBindingContainer : received msg: MessageExt [queueId=4, storeSize=264, queueOffset=0, sysFlag=0, bornTimestamp=1552979519307, bornHost=/192.168.10.1:57028, storeTimestamp=1552979521004, storeHost=/192.168.10.136:10911, msgId=C0A80A8800002A9F00000000000030B7, commitLogOffset=12471, bodyCRC=302790764, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='test-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, id=9eff547b-c01d-1335-1c5e-de2c92631802, UNIQ_KEY=C0A82B544FE018B4AAC25FF56B4B0003, WAIT=false, contentType=application/json, TAGS=tagStr, timestamp=1552979519307}, body=[109, 115, 103, 45, 52], transactionId='null'}]
input1 receive: msg-4
2019-03-19 17:17:23.918 INFO 12100 --- [MessageThread_1] s.b.r.c.RocketMQListenerBindingContainer : consume C0A82B544FE018B4AAC25FF56B4B0003 cost: 0 ms
2019-03-19 17:17:23.919 DEBUG 12100 --- [MessageThread_1] s.b.r.c.RocketMQListenerBindingContainer : received msg: MessageExt [queueId=3, storeSize=252, queueOffset=0, sysFlag=0, bornTimestamp=1552979519304, bornHost=/192.168.10.1:57028, storeTimestamp=1552979521001, storeHost=/192.168.10.136:10911, msgId=C0A80A8800002A9F0000000000002FBB, commitLogOffset=12219, bodyCRC=208186831, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='test-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, id=ea03a763-2f8c-70b0-584a-6db50d616f70, UNIQ_KEY=C0A82B544FE018B4AAC25FF56B480002, WAIT=false, contentType=application/json, timestamp=1552979519304}, body=[109, 115, 103, 45, 51], transactionId='null'}]
input1 receive: msg-3
2019-03-19 17:17:23.919 INFO 12100 --- [MessageThread_1] s.b.r.c.RocketMQListenerBindingContainer : consume C0A82B544FE018B4AAC25FF56B480002 cost: 0 ms
2019-03-19 17:17:23.919 DEBUG 12100 --- [MessageThread_1] s.b.r.c.RocketMQListenerBindingContainer : received msg: MessageExt [queueId=2, storeSize=279, queueOffset=0, sysFlag=0, bornTimestamp=1552979519301, bornHost=/192.168.10.1:57028, storeTimestamp=1552979520999, storeHost=/192.168.10.136:10911, msgId=C0A80A8800002A9F0000000000002EA4, commitLogOffset=11940, bodyCRC=963054615, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='test-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, id=5d14e165-87ae-09ca-a2a9-f3d868e3933d, UNIQ_KEY=C0A82B544FE018B4AAC25FF56B450001, WAIT=false, contentType=application/json, TAGS=tagObj, timestamp=1552979519301}, body=[123, 34, 105, 100, 34, 58, 50, 44, 34, 98, 97, 114, 34, 58, 34, 102, 111, 111, 34, 125], transactionId='null'}]
input1 receive: {"id":2,"bar":"foo"}
2019-03-19 17:17:23.919 INFO 12100 --- [MessageThread_1] s.b.r.c.RocketMQListenerBindingContainer : consume C0A82B544FE018B4AAC25FF56B450001 cost: 0 ms
2019-03-19 17:17:23.919 DEBUG 12100 --- [MessageThread_1] s.b.r.c.RocketMQListenerBindingContainer : received msg: MessageExt [queueId=1, storeSize=264, queueOffset=0, sysFlag=0, bornTimestamp=1552979519241, bornHost=/192.168.10.1:57028, storeTimestamp=1552979520948, storeHost=/192.168.10.136:10911, msgId=C0A80A8800002A9F0000000000002D9C, commitLogOffset=11676, bodyCRC=1650904291, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='test-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, id=6592c583-41a8-b688-444b-832d5da61ace, UNIQ_KEY=C0A82B544FE018B4AAC25FF56B090000, WAIT=false, contentType=application/json, TAGS=tagStr, timestamp=1552979518998}, body=[109, 115, 103, 45, 49], transactionId='null'}]
input1 receive: msg-1
2019-03-19 17:17:23.920 INFO 12100 --- [MessageThread_1] s.b.r.c.RocketMQListenerBindingContainer : consume C0A82B544FE018B4AAC25FF56B090000 cost: 0 ms
2019-03-19 17:17:23.926 DEBUG 12100 --- [MessageThread_1] s.b.r.c.RocketMQListenerBindingContainer : received msg: MessageExt [queueId=5, storeSize=279, queueOffset=0, sysFlag=0, bornTimestamp=1552979519310, bornHost=/192.168.10.1:57028, storeTimestamp=1552979521007, storeHost=/192.168.10.136:10911, msgId=C0A80A8800002A9F00000000000031BF, commitLogOffset=12735, bodyCRC=1554687703, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='test-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, id=ec195550-05c3-23bc-577e-2f63e79b42ac, UNIQ_KEY=C0A82B544FE018B4AAC25FF56B4E0004, WAIT=false, contentType=application/json, TAGS=tagObj, timestamp=1552979519310}, body=[123, 34, 105, 100, 34, 58, 53, 44, 34, 98, 97, 114, 34, 58, 34, 102, 111, 111, 34, 125], transactionId='null'}]
input1 receive: {"id":5,"bar":"foo"}
2019-03-19 17:17:23.926 INFO 12100 --- [MessageThread_1] s.b.r.c.RocketMQListenerBindingContainer : consume C0A82B544FE018B4AAC25FF56B4E0004 cost: 0 ms
The application.yml is as follows:
logging:
  level:
    org:
      springframework:
        cloud:
          stream:
            binder:
              rocketmq: DEBUG
management:
  endpoint:
    health:
      show-details: always
  endpoints:
    web:
      exposure:
        include: '*'
server:
  port: 28082
spring:
  application:
    name: rocketmq-consume-example
  cloud:
    stream:
      bindings:
        input1:
          content-type: text/plain
          destination: test-topic
          group: test-group1
        input2:
          consumer:
            concurrency: 20
            maxAttempts: 1
          content-type: text/plain
          destination: test-topic
          group: test-group2
        input3:
          consumer:
            concurrency: 20
          content-type: application/json
          destination: test-topic
          group: test-group3
        input4:
          consumer:
            concurrency: 5
          content-type: text/plain
          destination: TransactionTopic
          group: transaction-group
      rocketmq:
        binder:
          name-server: 192.168.10.136:9876
        bindings:
          input1:
            consumer:
              orderly: true
          input2:
            consumer:
              orderly: false
              tags: tagStr
          input3:
            consumer:
              tags: tagObj
The messages appear as follows in mq-console:

Message ID | Tag | Key | StoreTime | Operation
C0A82B544FE018B4AAC25FF56B4E0004 | tagObj | | 2019-03-19 15:12:01 | MESSAGE DETAIL
C0A82B544FE018B4AAC25FF56B4B0003 | tagStr | | 2019-03-19 15:12:01 | MESSAGE DETAIL
C0A82B544FE018B4AAC25FF56B480002 | | | 2019-03-19 15:12:01 | MESSAGE DETAIL
C0A82B544FE018B4AAC25FF56B450001 | tagObj | | 2019-03-19 15:12:00 | MESSAGE DETAIL
C0A82B544FE018B4AAC25FF56B090000 | tagStr | | 2019-03-19 15:12:00
Original question by GitHub user crazythinking
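Ordered consumption means that all messages should be stored in the same message queue. In the log above, the five messages arrive from five different queues (queueId 4, 3, 2, 1, and 5), and RocketMQ only preserves order within a single queue. The default MessageQueueSelector is SelectMessageQueueByHash in RocketMQTemplate.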
Original answer by GitHub user fangjian0423
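To illustrate why a hash-based selector keeps related messages in one queue, here is a minimal sketch in plain Java. It does not use the RocketMQ client and is not the library's implementation: the class `HashQueueSelectorSketch`, the methods `selectQueue` and `route`, the key `order-42`, and the queue count of 8 are all hypothetical names and values chosen for illustration. The only assumption carried over from the answer is the general idea that the target queue is derived from the hash of a key.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: simulates hash-based queue selection with plain collections.
class HashQueueSelectorSketch {

    // Map a sharding key to a queue index in [0, queueCount).
    // floorMod keeps the result non-negative even for negative hash codes.
    static int selectQueue(String shardingKey, int queueCount) {
        return Math.floorMod(shardingKey.hashCode(), queueCount);
    }

    // Route {key, body} pairs to simulated queues, keeping send order inside each queue.
    static Map<Integer, List<String>> route(List<String[]> keyedMessages, int queueCount) {
        Map<Integer, List<String>> queues = new LinkedHashMap<>();
        for (String[] keyAndBody : keyedMessages) {
            int queueIndex = selectQueue(keyAndBody[0], queueCount);
            queues.computeIfAbsent(queueIndex, k -> new ArrayList<>()).add(keyAndBody[1]);
        }
        return queues;
    }

    public static void main(String[] args) {
        List<String[]> messages = new ArrayList<>();
        for (int i = 1; i <= 5; i++) {
            // All five messages share one key, so they land in one queue in send order.
            messages.add(new String[] {"order-42", "msg-" + i});
        }
        System.out.println(route(messages, 8));
    }
}
```

Because every message in the example shares the same key, the printed map has a single entry whose list is in send order. If each message used a different key, the messages could spread across several queues, which matches the differing queueId values seen in the log of the question.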