Pulsar Consumer实现介绍

简介: Pulsar-Consumer “Pulsar is a distributed pub-sub messaging platform with a very flexible messaging model and an intuitive client API.” Pulsar是pub-sub模式的分布式消息平台,拥有灵活的消息模型和直观的客户端API。

Pulsar-Consumer

“Pulsar is a distributed pub-sub messaging platform with a very flexible messaging model and an intuitive client API.”

Pulsar是pub-sub模式的分布式消息平台,拥有灵活的消息模型和直观的客户端API。

Pulsar由雅虎开发并开源的下一代消息系统,目前是Apache软件基金会的孵化器项目。

本片文章简单介绍Pulsar的Consumer,包含以下内容:

  • Consumer的体系
  • 消费逻辑的实现

1. Consumer体系

A consumer is a process that attaches to a topic via a subscription and then receives messages.

Consumer通过订阅关系绑定到Topic(和Producer类似,都是绑定到一个Topic上),并接收消息。

Consumer支持:

  • 同步接收消息:阻塞用户线程等待消息
  • 异步接收消息:异步等待消息(通过Future返回消息)
  • 通过MessageListener返回消息:接收消息后回调用户的MessageListener

Consumer提供了三类获取消息的方式,其中异步的方式包含通过Future异步等待消息和通过MessageListener被动接收消息。MessageListener和另外两种方式是互斥的,一旦Consumer注册了MessageListener接口,则必须通过MessageListener处理消息,主动触发receive获取消息将抛出异常。

Consumer的继承关系:

  • Consumer:定义了消费者相关的接口
  • ConsumerBase:接口中基础方法的实现,抽象类
  • ConsumerImpl:在ConsumerBase基础上的Consumer具体实现
  • MultiTopicsConsumerImpl:组合多个ConsumerImpl完成对多Topic/Partition的消费

Consumer的设计和Producer是一致的,通过接口定义行为,基础类实现基本能力,在通过组合的方式来实现消费多个Topic/Partition(Producer则是像多个Topic发送消息)。

1.1 消费进度提交

Consumer处理消息后需要发送acknowledgement到Broker,这样Broker可以丢弃消息(应该是移动消费offset的操作,类似RocketMQ,并不是真正的删除消息)。支持单挑消息提交或者批量提交,批量提交则以最后一条消息的offset为准。(只是记录一个offset比较某个位置之前的消息都已经被Consumer处理,所以批量提交其实只是把最大的offset提交)

1.2 订阅模型

订阅模型决定了消息时如何被投递给Consumer的。在Pulsar中,订阅模型有: exclusive、shared、 failover。

Exclusive

只能有一个Consumer绑定到订阅关系上,其他Consumer尝试绑定到订阅关系上时会报错(Exclusive是默认的订阅模型)。

Shared

在Shared模型中,多个Consumer可以绑定到一个订阅关系上。消息按照round-robin模式被投递给各个Consumer。若某个Consumer宕机,被投递给该Consumer的未被ACK(没有acknowledgement)的消息将被重新投递给其他的Consumer进行消费。

Shared模式带来的限制:

  1. 消息时按照round-robin模式投递给各个Consumer的,所以消息顺序无法得到保障
  2. 同样因为round-robin模式,无法使用批量提交acknowledgement的功能(如上图Consumer C-3如果提交了m4会导致m3被标记为已经消费,但实际Consumer C1可能还没处理m3)

failover

在Failover模型中,多个Consumer可以绑定到一个订阅关系上,但是只有一个称为Master Consumer的消费者能消费消息。对多个Consumer按照name进行排序,第一个Consumer则为Master Consumer。

在Master Consumer失效(比如断开连接)后,Master Consumer未提交的消息和后续的消息会提交给后续的Consumer。

2. 消费逻辑的实现

Consumer获取消息的核心API有以下两个,分别实现同步获取消息和异步获取消息:

/**
     * Receives a single message.
     * <p>
     * This calls blocks until a message is available.
     *
     * @return the received message
     * @throws PulsarClientException.AlreadyClosedException
     *             if the consumer was already closed
     * @throws PulsarClientException.InvalidConfigurationException
     *             if a message listener was defined in the configuration
     */
    Message<T> receive() throws PulsarClientException;

    /**
     * Receive a single message
     * <p>
     * Retrieves a message when it will be available and completes {@link CompletableFuture} with received message.
     * </p>
     * <p>
     * {@code receiveAsync()} should be called subsequently once returned {@code CompletableFuture} gets complete with
     * received message. Else it creates <i> backlog of receive requests </i> in the application.
     * </p>
     *
     * @return {@link CompletableFuture}<{@link Message}> will be completed when message is available
     */
    CompletableFuture<Message<T>> receiveAsync();

MessageListener则通过ConsumerBuilder接口进行设置并传入到Consumer的构造方法中。

这三个API都由ConsumerImpl#messageReceived触发,即Consumer接收到消息后根据请求的类型来决定:

  • 同步获取消息的,将消息放入内存队列,被挂起的线程会从队列中获取消息
  • 异步获取消息的,执行callback将消息放入future
  • 通过MessageListener处理消息的,通过ListenerExecutor执行逻辑

可见Pulsar在消费模式上处理是统一的,即无论客户端采用何种方式进行消息的接收,消息统一由服务端进行“推送”,而在Consumer内部根据用户请求的类型进行处理。

通过ConsumerImpl#messageReceived的实现可以发现Pulsar的消息消费是一种“推”的模型,这和RocketMQ的“拉”的模型差异是很大的(RocketMQ采用一种Long-Polling的方式,由Consumer主动发起请求从服务端获取数据,若服务端有需要处理的消息,请求立即返回;如果没有消息,这个请求会在服务单阻塞一段时间,直到新消息到达或者请求即将超时,返回给客户端)。

Consumer获取消息的模型

具体看Pulsar-Consumer获取消息的代码实现会发现它也不是一种纯粹的,类似淘宝Notify的推的模式,而是一种推拉结合的方式,示意如下:

  1. Consumer向Broker发送FLOW请求,通知Broker可以推送消息给Consumer
  2. Broker将消息通过MESSAGE请求将消息推送给Consumer

这是一个反复的过程,每次Consumer接收消息处理后都会继续发送FLOW请求给Broker。

这是在RocketMQ或者Kafka的设计中都没有的一种方式,这种方式进行一定的拓展则可以实现类似akka的Dynamic Push/Pull模式(详见公众号历史文章:《Push or Pull?》)。

在阅读Pulsar Consumer部分代码的时候还发现非常有趣的一点,当你搜索“Consumer”时会出现一个Consumer接口和一个Consumer类:

  • 接口: org.apache.pulsar.client.api.Consumer
  • 类: org.apache.pulsar.broker.service.Consumer

Consumer接口是Client模块定义Consumer行为的,为什么在Broker模块会有一个Consumer类?

实际在Broker端会给链接上来的Consumer构造一个对应的Consumer对象,维护远端的Consumer的链接等信息。所有对远端的Consumer的操作会封装在Broker端的Consumer中。这样可以更好的实现代码的可插拔性,降低耦合,提升代码的可测试性。比如在测试Broker端的逻辑时,只需要Mock一个Consumer类来模拟各种正常和网络异常的情况,而不需要真正的启动一个Consumer。

总结

本文主要是介绍一下Pulsar Consumer模块的相关概念和一些模型,没有深入的解读代码实现。Pulsar Consumer的实现方式还是非常有趣的,和大家比较熟悉的RocketMQ的实现方式差异比较大,值得一读。

如果本文对您有帮助,点一下右下角的“推荐”
相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
Java Spring 容器
【Java】Spring如何扫描自定义的注解?
【Java】Spring如何扫描自定义的注解?
326 0
|
4月前
|
存储 缓存 人工智能
Mooncake 最新进展:SGLang 和 LMCache 基于 Mooncake 实现高效 PD 分离框架
Mooncake 的架构设计兼具高性能和灵活性,为未来的扩展性和生态建设奠定了坚实基础。
|
2月前
|
缓存 监控 搜索推荐
301重定向实现原理全面解析:从HTTP协议到SEO最佳实践
301重定向是HTTP协议中的永久重定向状态码,用于告知客户端请求的资源已永久移至新URL。它在SEO中具有重要作用,能传递页面权重、更新索引并提升用户体验。本文详解其工作原理、服务器配置方法(如Apache、Nginx)、对搜索引擎的影响及最佳实践,帮助实现网站平稳迁移与优化。
434 68
|
11月前
|
Web App开发 前端开发 JavaScript
探索Python科学计算的边界:利用Selenium进行Web应用性能测试与优化
【10月更文挑战第6天】随着互联网技术的发展,Web应用程序已经成为人们日常生活和工作中不可或缺的一部分。这些应用不仅需要提供丰富的功能,还必须具备良好的性能表现以保证用户体验。性能测试是确保Web应用能够快速响应用户请求并处理大量并发访问的关键步骤之一。本文将探讨如何使用Python结合Selenium来进行Web应用的性能测试,并通过实际代码示例展示如何识别瓶颈及优化应用。
465 5
|
存储 NoSQL Java
教程:Spring Boot与RocksDB本地存储的整合方法
教程:Spring Boot与RocksDB本地存储的整合方法
|
缓存 Java
Electron V8排查问题之避免V8FatalErrorCallback崩溃问题如何解决
Electron V8排查问题之避免V8FatalErrorCallback崩溃问题如何解决
247 0
|
Kubernetes Ubuntu Docker
Kubernetes 审计(Auditing)
在 Kubernetes 1.22.2 环境中实施审计策略可以帮助管理员监控和记录集群中的资源操作,确保集群的安全性和符合性。通过启用审计 Admission Controller 和配置相应的审计策略,我们可以灵活地控制审计记录的格式和范围。 致力于一条龙式的为您解决问题
188 0
|
JSON 自然语言处理 搜索推荐
开发一款专属的 VSCode 代码提示插件
作为前端开发者一定用过VsCode这款利器,而其强大的插件能力无疑更是让我们深深的爱上了它。据不完全统计,VsCode插件市场中的插件数量已经超过了3万,由此可见大家的热情有多高。其中涉及到各种各样功能的插件,有主题曲相关的,有代码开发相关的,比如代码片段、Git插件、tslint等等。作为开发者,肯定用过各种各样的代码提示的插件,代表性的有TabNine、Copilot等等。今天就让我们来自己动手,开发一款专属的代码提示插件。毕竟别人的再好也是别人的, 属于自己的才是最好的。
3179 1
开发一款专属的 VSCode 代码提示插件
|
存储 运维 算法
PolarDB-X 一致性共识协议 (X-Paxos)
近几年NewSQL和云原生数据库的不断兴起,极大地推动了关系数据库和一致性协议的结合,PolarDB-X也是在这样的背景下应运而生。
2192 0
PolarDB-X 一致性共识协议 (X-Paxos)
|
XML Java Maven
解决SSM项目打包没有mapper.xml文件的问题
IDEA的maven项目中,默认源代码目录下(src/main/java目录)的xml等资源文件并不会在编译的时候一块打包进classes文件夹,而是直接舍弃掉。如果使用的是Eclipse,Eclipse的src目录下的xml等资源文件在编译的时候会自动打包进输出到classes文件夹。
489 1

热门文章

最新文章