Pulsar与Kafka消费模型对比

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: kafkakafka 属于 Stream 的消费模型,为了支持多 partition 的消费关系,引入了 consumer group 的概念,同时支持在消费端动态的 reblance 操作,当多个 Consumer 订阅了同一个 Topic 时,会根据分区策略进行消费者订阅分区的重分配。

kafka

kafka 属于 Stream 的消费模型,为了支持多 partition 的消费关系,引入了 consumer group 的概念,同时支持在消费端动态的 reblance 操作,当多个 Consumer 订阅了同一个 Topic 时,会根据分区策略进行消费者订阅分区的重分配。只要 consumer-group 与 topic 之间的关系发生变更,就会动态触发 reblance 操作,诸如:

  • 增加或减少 topic 中 partition 的数目
  • consumer-group 中的 consumer 数减少
  • consumer-group 与 topic 之间的订阅关系发生变更

等等

引入 reblance 的好处在于,当订阅关系发生变更时,用户无需重新启动系统,就可以实现订阅关系的变更,相当于 kafka 将这种分配的权利从服务端下放到客户端中的 consumer 来管理,这样用户就可以自定义自己的分配方案。

pulsar

类似 kafka 这样的 Stream MQ,更多时候适合做离线业务的处理与分析,很多线上业务会使用 Active MQ 这样 Queue 的 MQ。为了同时兼容这两种消费模型,pulsar 做了一层消费层的抽象,统一了 Queue 和 Stream 这两种消费模型,具体如下图所示:

image

其中,Exclusive 和 Failover 属于 Stream 的消费模型,Share 属于 Queue 的消费模型。在写此文章时,pulsar 最新版本为 2.3.1,Key_Shared 属于pulsar 新增加的一种订阅模型,在之后的文章中,我们会单独对 Key_shared 订阅模型做单独的分享,这里不在赘述。

对 Stream 支持的对比

由于 kafka 不支持 Queue 类型的消费模型,所以 Share 这种形式在这里不做对比。下面,和大家一起讨论以下在 Stream 下 pulsar 与 kafka 的消费模型。

如下图所示,左边为 pulsar 在 Failover 和 Exclusive 下的消费情况,右边为 kafka 的消费模型。

image

假设目前有一个 topic,topic name 为 topic1,有 5 个partition,分别为:topic1-p1,topic1-p2,topic1-p3,topic1-p4,topic1-p5,在 kafka 中,使用了 consumer-group 且该 group 下有三个 consumer,上文中提到,kafka 支持 reblance 机制,所以当 consumer-2 与 consumer-3 加入 consumer-group 的过程中,会动态分摊之前 consumer-1 的消费压力,表现为如上图右半部分所示,cousumer-1 消费 topic1-p1 和 ropic1-p2,consumer-2 消费 topic1-p3 和 topic1-p4,consumer-3 消费 topic1-p5 。所以当用户不断的往 consumer-group 中添加 consumer 时,利用 kafka 的 reblance 机制,是可以让用户动态指定具体哪一个 consumer 来消费 topic1 中的哪些 partition。

在 pulsar 中,你可以将 subscribe 理解为 kafka 中的 consumer-group,如果用户在启动 consumer 时,指定的 subscribe-name 是相同的,说明这两个 consumer 属于同一个订阅组,代码示例如下:


Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic("topic-1").subscriptionName("my-subscriber-name")
                .subscriptionType(SubscriptionType.Failover)
                .subscribe();

Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic("topic-2").subscriptionName("my-subscriber-name")
                .subscriptionType(SubscriptionType.Failover)
                .subscribe();

如上图示例所示,在同一个订阅组下,启动三个 consumer,在 pulsar 中,每一个 consumer 都会去订阅 topic1 中的 5 个 partition,所以每个 consumer 都会去启动 5 个 sub-consumer,在 failover 的订阅模型下,会使用 hashcode 的形式,将 5 个 partition 分配给三个 consumer 来消费,pulsar 将当前正在消费的 sub-consumer 看作是处于 leader 状态的 consumer,剩余未工作的 sub-consumer 作为从节点,当 leader 状态的 consumer 由于某些原因无法工作时,处于从状态的 sub-consumer 会去接替 leader 的 consumer,并继续工作。可以发现,kafka 加入 reblance 的机制,允许用户自己指定哪些 consumer 来消费 哪些 partition,在 pulsar 中,这个工作由 failover 的机制来完成,它通过 hash 的形式,将 consumer 分配到不同的 sub-consumer 中来执行。

现在,验证一下上述所描述的内容。

场景一

  1. 以 standalone 的形式启 pulsar

$ docker run -it \
  -p 6650:6650 \
  -p 8080:8080 \
  -v $PWD/pulsardata:/pulsar/data \
  apachepulsar/pulsar:2.3.0 \
  bin/pulsar standalone
  1. 创建一个 topic,partition 的数目为 4
$ ./bin/pulsar-admin topics mytopic1 create-partitioned-topic -p 4

以 failover 的订阅类型,启动 3 个 consumer,并指定他们为同一个订阅组,即-s sub-1


$ ./bin/pulsar-client consume mytopic1 -s sub-1 -n 0 -t Failover
  1. 启动 producer,发送 10 条数据到 mytopic1
$ ./bin/pulsar-client produce mytopic1 -n 10 -m "hello-pulsar"

结果如下所示:

image

可以看到,consumer1 接收到 2 条消息,consumer2 接收到 5 条消息,consumer3 接收到 3 条消息。效果和我们所预期的是一致的。

上述情况是因为在 producer 发送之前,就已经启动好三个 consumer 来消费消息,所以 pulsar 会以 hash 的形式将消息分发到三个 consumer 中来消费。

场景二

以 Exclusive 的订阅形式启动两个 consumer,效果如下:


./bin/pulsar-client consume mytopic1 -s sub-1 -n 0 -t Exclusive

image

可以看到,当启动 consumer2 时,会报错 Exclusive consumer is already connected,这是因为,Failover 的订阅模式下,其它的 consumer 会以 “从” consumer 的形态存在,但是 Exclusive 只允许一个 consumer 订阅一个 topic。

5万人关注的大数据成神之路,不来了解一下吗?
5万人关注的大数据成神之路,真的不来了解一下吗?
5万人关注的大数据成神之路,确定真的不来了解一下吗?

欢迎您关注《大数据成神之路》
目录
相关文章
|
消息中间件 Java Kafka
Spring Boot集成Kafka动态创建消费者与动态删除主题(实现多消费者的发布订阅模型)
Spring Boot集成Kafka动态创建消费者与动态删除主题(实现多消费者的发布订阅模型)
17364 1
Spring Boot集成Kafka动态创建消费者与动态删除主题(实现多消费者的发布订阅模型)
|
消息中间件 存储 Kubernetes
kafka/pulsar on k8s
kafka/pulsar on k8s
kafka/pulsar on k8s
|
5月前
|
消息中间件 存储 网络协议
Kafka 线程模型痛点攻克: 提升分区写入 2 倍性能
Apache Kafka的单分区写入性能在某些严格保序场景中至关重要,但其现有线程模型限制了性能发挥。本文分析了Kafka的串行处理模型,包括SocketServer、KafkaChannel、RequestChannel等组件,指出其通过KafkaChannel状态机确保请求顺序处理,导致处理效率低下。AutoMQ提出流水线处理模型,简化KafkaChannel状态机,实现网络解析、校验定序和持久化的阶段间并行化,提高处理效率。测试结果显示,AutoMQ的极限吞吐是Kafka的2倍,P99延迟降低至11ms。
111 3
Kafka 线程模型痛点攻克: 提升分区写入 2 倍性能
|
消息中间件 存储 算法
MQ - 闲聊MQ一二事儿 (Kafka、RocketMQ 、Pulsar )
MQ - 闲聊MQ一二事儿 (Kafka、RocketMQ 、Pulsar )
361 0
|
消息中间件 Java Kafka
聊聊 Kafka: Consumer 源码解析之 poll 模型
聊聊 Kafka: Consumer 源码解析之 poll 模型
871 0
|
消息中间件 缓存 Java
聊聊 Kafka: Producer 的网络模型
聊聊 Kafka: Producer 的网络模型
135 0
|
消息中间件 存储 Kafka
Kafka 实战开篇-讲解架构模型、基础概念以及集群搭建(下)
Kafka 实战开篇-讲解架构模型、基础概念以及集群搭建(下)
177 0
|
消息中间件 NoSQL 中间件
Kafka 实战开篇-讲解架构模型、基础概念以及集群搭建(上)
Kafka 实战开篇-讲解架构模型、基础概念以及集群搭建
327 0
|
消息中间件 存储 运维
系列二:次时代Kafka与Pulsar该如何选择?
感谢大家支持,目前新书已上架各大线上平台!! 多谢开发者社区对此的支持。感谢机械工业出版社编辑老师长期的指导。感谢Tencent同事们的指点与陪伴。
1163 0
|
消息中间件 存储 Kafka
通过 KoP 将 Kafka 应用迁移到 Pulsar
KoP(Pulsar on Kafka)通过在 Pulsar Broker 上引入 Kafka 协议处理程序,为 Apache Pulsar 带来原生 Apache Kafka 协议支持。 通过将 KoP 协议处理程序添加到您现有的 Pulsar 集群,您可以将现有的 Kafka 应用程序和服务迁移到 Pulsar,而无需修改代码。 这使 Kafka 应用程序能够利用 Pulsar 的强大功能,
440 0