RocketMQ相关知识知多少(二)

简介: RocketMQ相关知识知多少(二)

RocketMQ相关知识知多少(一):https://developer.aliyun.com/article/1534543


1.PushConsumer:高度封装的消费类型,消费消息仅仅通过通过消费监听器监听并返回结果。 消息的获取、消费状态提交以及消费重试都通过 RocketMQ 的客户端 SDK 完成。


  • 返回消费成功:以 Java SDK 为例,返回,表示该消息处理成功,服务端按照消费结果更新消费进度。CosumeResult.SUCCESS


  • 返回消费失败:以 Java SDK 为例,返回,表示该消息处理失败,需要根据消费重试逻辑判断是否进行重试消费。ConsumeResult.FAILURE


  • 出现非预期失败:例如抛异常等行为,该结果按照消费失败处理,需要根据消费重试逻辑判断是否进行重试消费。

使用 PushConsumer 消费者消费时,不允许使用以下方式处理消息,否则 RocketMQ 无法保证消息的可靠性。

  • 错误方式一:消息还未处理完成,就提前返回消费成功结果。 此时如果消息消费失败,RocketMQ 服务端是无法感知的,因此不会进行消费重试。
  • 错误方式二:在消费监听器内将消息再次分发到自定义的其他线程,消费监听器提前返回消费结果。 此时如果消息消费失败,RocketMQ 服务端同样无法感知,因此也不会进行消费重试。


PushConsumer 严格限制了消息同步处理及每条消息的处理超时时间,适用于以下场景:


  • 消息处理时间可预估:如果不确定消息处理耗时,经常有预期之外的长时间耗时的消息,PushConsumer 的可靠性保证会频繁触发消息重试机制造成大量重复消息。

  • 无异步化、高级定制场景:PushConsumer 限制了消费逻辑的线程模型,由客户端 SDK 内部按最大吞吐量触发消息处理。 该模型开发逻辑简单,但是不允许使用异步化和自定义处理流程。

2.简单消费者:SimpleConsumer 是一种接口原子型的消费者类型,消息的获取、消费状态提交以及消费重试都是通过消费者业务逻辑主动发起调用完成。


  • SimpleConsumer 适用于以下场景:


  • 消息处理时长不可控:如果消息处理时长无法预估,经常有长时间耗时的消息处理情况。 建议使用 SimpleConsumer 消费类型,可以在消费时自定义消息的预估处理时长,若实际业务中预估的消息处理时长不符合预期,也可以通过接口提前修改。


  • 需要异步化、批量消费等高级定制场景:SimpleConsumer 在 SDK 内部没有复杂的线程封装,完全由业务逻辑自由定制,可以实现异步分发、批量消费等高级定制场景。


  • 需要自定义消费速率:SimpleConsumer 是由业务逻辑主动调用接口获取消息,因此可以自由调整获取消息的频率,自定义控制消费速率。

五、消费者过滤

消费者订阅了某个主题后,Apache RocketMQ 会将该主题中的所有消息投递给消费者。若消费者只需要关注部分消息,可通过设置过滤条件在 Apache RocketMQ 服务端进行过滤,只获取到需要关注的消息子集,避免接收到大量无效的消息。【使用 Apache RocketMQ 的消息过滤功能,可以帮助消费者更高效地过滤自己需要的消息集合,避免大量无效消息投递给消费者,降低下游系统处理压力。】


过滤的含义指的是将符合条件的消息投递给消费者,而不是将匹配到的消息过滤掉。


Apache RocketMQ 的消息过滤功能通过生产者和消费者对消息的属性、标签进行定义,并在 Apache RocketMQ 服务端根据过滤条件进行筛选匹配,将符合条件的消息投递给消费者进行消费。

消息过滤主要通过以下几个关键流程实现:

  • 生产者:生产者在初始化消息时预先为消息设置一些属性和标签,用于后续消费时指定过滤目标。
  • 消费者:消费者在初始化及后续消费流程中通过调用订阅关系注册接口,向服务端上报需要订阅指定主题的哪些消息,即过滤条件。
  • 服务端:消费者获取消息时会触发服务端的动态过滤计算,Apache RocketMQ 服务端根据消费者上报的过滤条件的表达式进行匹配,并将符合条件的消息投递给消费者。

RocketMQ 支持Tag标签过滤和SQL属性过滤,这两种过滤方式对比如下:


六、消费重试

消费重试指的是,消费者在消费某条消息失败后,Apache RocketMQ 服务端会根据重试策略重新消费该消息,超过一定次数后若还未消费成功,则该消息将不再继续重试,直接被发送到死信队列中。

消息重试的触发条件

  • 消费失败,包括消费者返回消息失败状态标识或抛出非预期异常。


消息处理超时,包括在PushConsumer中排队超时。


消息重试策略主要行为


  • 重试过程状态机:控制消息在重试流程中的状态和变化逻辑。


  • 重试间隔:上一次消费失败或超时后,下次重新尝试消费的间隔时间。


  • 最大重试次数:消息可被重试消费的最大次数。

消息重试策略差异

根据消费者类型不同,消息重试策略的具体内部机制和设置方法有所不同,具体差异如下:


PushConsumer消费消息时,消息的几个主要状态如下:


  • Ready:已就绪状态。消息在Apache RocketMQ服务端已就绪,可以被消费者消费。
  • Inflight:处理中状态。消息被消费者客户端获取,处于消费中还未返回消费结果的状态。
  • WaitingRetry:待重试状态,PushConsumer独有的状态。当消费者消息处理失败或消费超时,会触发消费重试逻辑判断。如果当前重试次数未达到最大次数,则该消息变为待重试状态,经过重试间隔后,消息将重新变为已就绪状态可被重新消费。多次重试之间,可通过重试间隔进行延长,防止无效高频的失败。


  • Commit:提交状态。消费成功的状态,消费者返回成功响应即可结束消息的状态机。


  • DLQ:死信状态。消费逻辑的最终兜底机制,若消息一直处理失败并不断进行重试,直到超过最大重试次数还未成功,此时消息不会再重试,会被投递至死信队列。您可以通过消费死信队列的消息进行业务恢复。

SimpleConsumer消费消息时,消息的几个主要状态如下:


  • Ready:已就绪状态。消息在Apache RocketMQ服务端已就绪,可以被消费者消费。
  • Inflight:处理中状态。消息被消费者客户端获取,处于消费中还未返回消费结果的状态。
  • Commit:提交状态。消费成功的状态,消费者返回成功响应即可结束消息的状态机。
  • DLQ:死信状态。消费逻辑的最终兜底机制,若消息一直处理失败并不断进行重试,直到超过最大重试次数还未成功,此时消息不会再重试,会被投递至死信队列。您可以通过消费死信队列的消息进行业务恢复。
相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
4月前
|
移动开发 网络协议 安全
什么是 DDos 攻击?怎样防 DDos 攻击?
DDoS(分布式拒绝服务攻击)通过大量非法请求耗尽目标服务器资源,使其无法正常服务。常见手段包括SYN Flood、HTTP Flood等。防御方法有流量清洗、集群防护、高防DNS等,阿里云提供专业DDoS高防服务,保障业务稳定运行。
|
存储 缓存 NoSQL
Redis点赞业务的设计与实现(Redis键值设计)
案例分享Redis点赞业务实现!
1776 2
Redis点赞业务的设计与实现(Redis键值设计)
|
弹性计算 数据可视化 关系型数据库
【最佳实践】Filebeat实现MySQL日志轻量化发送至Elasticsearch
在今天的文章中,我们来详细地描述如果使用Filebeat把MySQL的日志信息传输到Elasticsearch中。
4679 0
【最佳实践】Filebeat实现MySQL日志轻量化发送至Elasticsearch
|
10月前
|
算法 NoSQL 应用服务中间件
阿里面试:10WQPS高并发,怎么限流?这份答案让我当场拿了offer
在 Nacos 的配置管理界面或通过 Nacos 的 API,创建一个名为(与配置文件中 dataId 一致)的配置项,用于存储 Sentinel 的流量控制规则。上述规则表示对名为的资源进行流量控制,QPS 阈值为 10。resource:要保护的资源名称。limitApp:来源应用,default表示所有应用。grade:限流阈值类型,1 表示 QPS 限流,0 表示线程数限流。count:限流阈值。strategy:流控模式,0 为直接模式,1 为关联模式,2 为链路模式。
阿里面试:10WQPS高并发,怎么限流?这份答案让我当场拿了offer
|
缓存 网络协议 安全
什么是防火墙?详解三种常见的防火墙及各自的优缺点
什么是防火墙?详解三种常见的防火墙及各自的优缺点
1890 2
|
存储 Kubernetes Cloud Native
云原生|kubernetes|etcd集群详细介绍+安装部署+调优
云原生|kubernetes|etcd集群详细介绍+安装部署+调优
3275 1
|
Java
线程 - 一句话说明白 Java 线程池中 shutdown 和 shutdownNow 的区别
线程 - 一句话说明白 Java 线程池中 shutdown 和 shutdownNow 的区别
853 0
|
Kubernetes Windows 容器
minio上传下载
minio上传下载
409 0
|
存储 运维 Java
JUC第六讲:ThreadLocal/InheritableThreadLocal详解/TTL-MDC日志上下文实践
JUC第六讲:ThreadLocal/InheritableThreadLocal详解/TTL-MDC日志上下文实践
661 0