rocketMq - 串行消费过程

简介: rocketMq消费过程包括两种,分别是并发消费和有序消费,每个消费方式都可以单独拿出来进行分享,这篇文章单独用来分析串行消费问题。由于串行消费和并行消费的大体逻辑都是相同的,所以建议先看rocketMq - 并发消费过程文章,本章只会针对不同的地方进行说明。

rocketMq消费过程包括两种,分别是并发消费和有序消费,每个消费方式都可以单独拿出来进行分享,这篇文章单独用来分析串行消费问题。

由于串行消费和并行消费的大体逻辑都是相同的,所以建议先看rocketMq - 并发消费过程文章,本章只会针对不同的地方进行说明。

为什么有序消费能够保证消息被顺序消费?

为什么有序消费能够保证消息被顺序消费?

为什么有序消费能够保证消息被顺序消费?

答案是:

    1、顺序消费的顺序是有序保存在ProcessQueue的TreeMap对象中,key为消息的偏移量,也就是一个messageQueue拉取的消息有序放置在ProcessQueue当中

    2、每次消费的时候都是按序从ProcessQueue中按顺序拷贝待消费任务到临时的TreeMap对象当中

    3、消费失败后依旧会重新消费刚刚消费失败的那部分任务

    4、每次pullRequest执行完成后都会触发一次ConsumeRequest任务,会在原来的TreeMap对象中加入新的待消费的消息


img_29df4ebc9db24a78694eb761fedfc32c.png
提交有序消费ConsumeRequest

说明:参见DefaultMQPushConsumerImpl类


img_91226b720672daaf0f339699b41313ec.png
消费中需要多次加锁

说明:参见ConsumeMessageOrderlyService类

    1、takeMessages操作会按照顺序情况从processQueue的msgTreeMap中获取(TreeMap是有序对象)



img_60c8641a7ae949dc0deb3bb7d68e43b4.png

说明:参见ConsumeMessageOrderlyService类

    1、如果保证顺序消费呢,获取待消费数据的时候按照顺序进行获取放到临时TreeMap中并删除全局TreeMap里面的message对象

    2、消费成功后清除临时TreeMap里面的消息

    3、消费失败后将消息对象的重试次数+1然后重新提交ConsumeRequest,如果已经超出重试次数就丢弃。再次投递是一个延迟多少时间后消费的任务

    4、和无序消费的任务相比,没有重新发送broker的重试队列这一步



img_883bb8536b89cb8fe66b9eef3665c081.png
处理消费结果

说明:参见ConsumeMessageOrderlyService类

    1、消费成功后会持久化成功消费的偏移量,本地保存消费偏移量

    2、定时任务定时同步消费偏移量到broker进行保存

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
消息中间件 负载均衡 RocketMQ
RocketMQ的消费模式
在公司的技术分享中,就有聊到rocketmq的消费模式,特此总结一下。 在说消费之前,这里先说一下rocketmq中group的概念吧,一个group代表的是逻辑相同的一组实例,最可以表达这个概念的是我们将一个项目部署多个实例,那么这个项目的集群就可以称之为一个group。
1025 0
RocketMQ的消费模式
|
消息中间件 存储 Java
自顶向下学习 RocketMQ(九):回溯消费
回溯消费是指 Consumer 已经消费成功的消息,由于业务上需求需要重新消费,要支持此功能,Broker 在向 Consumer 投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度,例如由于 Consumer 系统故障,恢复后需要重新消费 1 小时前的数据,那么 Broker 要提供一种机制,可以按照时间维度来回退消费进度。RocketMQ 支持按照时间回溯消费,时间维度精确到毫秒。
自顶向下学习 RocketMQ(九):回溯消费
|
存储 消息中间件 API
RocketMQ实战:一个新的消费组初次启动时从何处开始消费呢?
RocketMQ实战:一个新的消费组初次启动时从何处开始消费呢?
RocketMQ实战:一个新的消费组初次启动时从何处开始消费呢?
|
消息中间件 存储 Java
阿里二面:RocketMQ 消费失败了,怎么处理?
阿里二面:RocketMQ 消费失败了,怎么处理?
522 2
阿里二面:RocketMQ 消费失败了,怎么处理?
|
消息中间件 NoSQL 关系型数据库
实战:如何防止mq消费方消息重复消费、rocketmq理论概述、rocketmq组成、普通消息的发送
实战:如何防止mq消费方消息重复消费 如果因为网络延迟等原因,mq无法及时接收到消费方的应答,导致mq重试。(计算机网络)。在重试过程中造成重复消费的问题
2745 1
实战:如何防止mq消费方消息重复消费、rocketmq理论概述、rocketmq组成、普通消息的发送
|
消息中间件 存储 Cloud Native
云原生中间件RocketMQ-消费者消费模式之广播模式
云原生中间件RocketMQ-消费者消费模式之广播模式
887 7
|
消息中间件 SQL 存储
解析 RocketMQ 多样消费功能-消息过滤
在消息中间件的使用过程中,一个主题对应的消费者想要通过规则只消费这个主题下具备某些特征的消息,过滤掉自己不关心的消息,这个功能就叫消息过滤。
解析 RocketMQ 多样消费功能-消息过滤
|
消息中间件 存储 前端开发
同步异步调用,并谈谈消息队列mq;RocketMQ发送消息和消费消息测试类
同步调用优点: 时效性强,打电话、直播,很快可以得到结果 同步调用的问题:
625 1
|
消息中间件 缓存 Java
rocketmq消费源码
rocketmq消费源码
374 0
rocketmq消费源码
|
消息中间件 存储 RocketMQ
阿里二面:RocketMQ同一个消费组内的消费者订阅量不同tag,会有问题吗?
阿里二面:RocketMQ同一个消费组内的消费者订阅量不同tag,会有问题吗?
743 0
阿里二面:RocketMQ同一个消费组内的消费者订阅量不同tag,会有问题吗?
下一篇
无影云桌面