日志记录所有数据摄取和传输提供支持。日志记录平台的核心是分布式PubSub系统,可帮助我们的客户传输/缓冲数据并异步使用。
在这篇博客中,我们介绍了MemQ(发音为mem - queue),这是一个高效,可扩展的PubSub系统,为云开发,自2020年年中以来一直为我们提供近乎实时的数据传输用例,并补充了Kafka,同时成本效益提高了90%。
历史
近十年来,Pinterest一直依赖Apache Kafka作为唯一的PubSub系统。随着Pinterest的发展,数据量和围绕运营超大规模分布式PubSub平台的挑战也随之增加。在 Scale 上运行 Apache Kafka 让我们对如何构建可扩展的 PubSub 系统有了深刻的了解。在对 PubSub 环境的运营和可扩展性挑战进行深入调查后,我们得出了以下关键要点:
- 并非每个数据集都需要亚秒级延迟服务,延迟和成本应成反比(延迟越低,成本越高)
- PubSub 系统的存储和服务组件需要分开,以实现基于资源的独立可扩展性。
- 按读取而不是写入排序为特定的使用者用例提供了所需的灵活性(对于同一数据集,不同的应用程序可能具有不同的优先级)
- 在大多数情况下,Pinterest不需要严格的分区排序,这通常会导致可扩展性挑战。
- Kafka 中的重新平衡成本高昂,通常会导致性能下降,并对饱和群集上的客户产生负面影响。
- 在云环境中运行自定义复制的成本很高。
在2018年,我们尝试了一种新型的PubSub系统,该系统将原生利用云。2019年,我们开始正式探索如何解决PubSub可扩展性挑战的选项,并根据运营成本以及现有技术的重新设计成本评估了多种PubSub技术,以满足Pinterest的需求。我们最终得出结论,我们需要一种基于Apache Kafka,Apache Pulsar和Facebook LogDevice学习的PubSub技术,并且是为云构建的。
MemQ是一个新的PubSub系统,它增强了Pinterest的Kafka。它使用类似于Apache Pulsar和Facebook Logdevice的解耦存储和服务架构;但是,它依赖于可插拔的复制存储层,即对象存储/ DFS / NFS来存储数据。最终结果是一个PubSub系统,
- 处理 GB/s 流量
- 独立缩放、写入和读取
- 不需要昂贵的再平衡来处理流量增长
- 比我们的Kafka足迹更具成本效益90%
特点
MemQ的秘密在于,它利用微批处理和不可变写入来创建一种架构,在该架构中,存储层所需的每秒输入/输出操作(IOPS)数量大大减少,从而允许经济高效地使用像Amazon S3这样的云原生对象存储。这种方法类似于网络的分组交换(与电路交换相比,即单个大型连续数据存储,例如kafka分区)。
MemQ将连续的日志流分解为块(对象),类似于Pulsar中的分类帐,但不同之处在于它们被编写为对象并且是不可变的。这些“数据包”/“对象”的大小(在 MemQ 内部称为 Batch)在确定端到端 (E2E) 延迟方面起着重要作用。数据包越小,写入速度越快,但代价是 IOPS 越高。因此,MemQ允许可调谐的端到端延迟,但代价是更高的IOP。此体系结构的一个关键性能优势是,根据底层存储层实现读取和写入硬件的分离,允许写入和读取作为可以分布在存储层中的数据包独立扩展。
这也消除了在 Kafka 中遇到的限制,即为了恢复副本,必须从头开始重新复制分区。对于 MemQ,底层复制存储只需要恢复特定的 Batch,在发生存储故障时,其副本计数由于故障而减少。但是,由于 MemQ at Pinterest 在 Amazon S3 上运行,因此存储的恢复、分片和扩展由 AWS 处理,无需 Pinterest 进行任何手动干预。
MemQ 的组件
客户
MemQ 客户端使用种子节点发现集群,然后连接到种子节点以发现给定主题的元数据和托管 TopicProcessors 的代理,或者对于使用者,则发现通知队列的地址。
代理
与其他PubSub系统类似,MemQ具有经纪人的概念。MemQ 代理是集群的一部分,主要负责处理元数据和写入请求。
注意:MemQ 中的读取请求可以由存储层直接处理,除非使用读取代理
集群调控器
Governor 是 MemQ 集群的领导者,负责自动再平衡和 TopicProcessor 分配。集群中的任何代理都可以被选为州长,并且它使用Zookeeper与经纪人进行通信,Zookeeper也用于州长选举。
Governor 使用可插入分配算法做出分配决策。默认的评估代理上的可用容量以做出分配决策。Governor 还使用此功能来处理代理故障并还原主题的容量。
Topic & TopicProcessor
与其他PubSub系统类似,MemQ使用Topic的逻辑概念。代理上的 MemQ 主题由一个名为 TopicProcessor 的模块处理。代理可以托管一个或多个主题处理器,其中每个主题处理器实例处理一个主题。主题具有写入和读取分区。写入分区用于创建 TopicProcessors(1:1 关系),读取分区用于确定使用者处理数据所需的并行度级别。读取分区计数等于通知队列的分区数。
存储
MemQ 存储由两部分组成:
- 复制存储(对象存储/DFS)
- 通知队列(Kafka、Pulsar 等)
1. 复制存储
MemQ 允许使用可插拔的存储处理程序。目前,我们已经为 Amazon S3 实现了存储处理程序。Amazon S3 为容错按需存储提供了经济高效的解决方案。MemQ 使用 S3 上的以下前缀格式来创建高吞吐量和可扩展的存储层:
s3://<bucketname>/<(a) 批处理中第一个客户端请求 ID 的 2 字节哈希>/<(b) cluster>/topics/<topicname>
(a) = 用于在 S3 内部进行分区,以便在需要时处理更高的请求速率
(b) = MemQ 集群的名称
可用性和容错
由于 S3 是一个高度可用的 Web 级对象存储,因此 MemQ 依靠其可用性作为第一道防线。为了适应将来的S3重新分区,MemQ在前缀的第一级添加了一个两位数的十六进制哈希,创建了256个基本前缀,理论上可以由独立的S3分区处理,只是为了使其面向未来。
一致性
底层存储层的一致性决定了 MemQ 的一致性特征。对于 S3,在确认之前,对 S3 标准的每次写入 (PUT) 都保证至少复制到三个可用区 (AZ)。
2. 通知队列
MemQ 使用通知系统向使用者提供指针以定位数据。目前,我们使用 Kafka 形式的外部通知队列。将数据写入存储层后,存储处理程序将生成一条通知消息,记录写入的属性,包括其位置、大小、主题等。使用者使用此信息从存储层检索数据(批处理)。还可以使 MemQ Brokers 以牺牲效率为代价为消费者代理 Batch。通知队列还为使用者提供群集/负载平衡。
内存数据格式
MemQ 对消息和批处理使用自定义存储/网络传输格式。
MemQ 中最低的传输单位称为 LogMessage。这类似于Pulsar Message或Kafka ProducerRecord。
LogMessage 上的包装器允许 MemQ 执行不同级别的批处理。单位层次结构:
- 批次(持久性单位)
- 消息(生产者上传单位)
- 日志消息(与应用程序交互的单元)
生成数据
MemQ 生产者负责向经纪商发送数据。它使用异步调度模型,允许非阻塞发送发生,而无需等待确认。
此模型对于隐藏底层存储层的上传延迟同时保持存储级别确认至关重要。这导致了自定义MemQ协议和客户端的实现,因为我们无法使用现有的PubSub协议,这些协议依赖于同步确认。MemQ 支持三种类型的 acks:ack=0(producer fire & forget)、ack=1(Broker received)和 ack=all(存储接收)。使用 ack=all 时,复制因子 (RF) 由底层存储层确定(例如,在 S3 标准 RF=3 中 [跨三个可用区])。如果确认失败,MemQ 生产者可以显式或隐式触发重试。
存储数据MemQ Topic Processor在概念上是一个RingBuffer。此虚拟环细分为 Batch,允许简化写入。当消息通过网络到达时,它们将排队到当前可用的 Batch 中,直到 Batch 被填满或发生基于时间的触发器。批处理完成后,它将被传递给存储处理程序,以便上传到存储层(如 S3)。如果上传成功,则会通过通知队列向批处理中各个消息的确认 (ack) 发送通知(如果创建者请求 acks),并使用 AckHandler 向其各自的创建者发送通知。
使用数据MemQ 使用者允许应用程序从 MemQ 读取数据。使用者使用代理元数据 API 来发现指向通知队列的指针。我们向应用程序公开一个基于轮询的接口,其中每个轮询请求返回一个 LogMessages 迭代器,以允许读取 Batch 中的所有 LogMessages。这些批处理是使用通知队列发现的,并直接从存储层检索。
数据丢失检测**的其他功能**:将工作负载从 Kafka 迁移到 MemQ 需要对数据丢失进行严格验证。因此,MemQ具有内置的审计系统,可以有效地跟踪每个消息的端到端交付,并近乎实时地发布指标。
批处理和流统一:由于 MemQ 使用外化存储系统,因此有机会为对原始 MemQ 数据运行直接批处理提供支持,而无需将其转换为其他格式。这允许用户对 MemQ 执行临时检查,而无需担心寻道性能,只要存储层可以单独扩展读取和写入。根据存储引擎的不同,MemQ 使用者可以执行并发提取,以便为某些流情况提供更快的回填。
性能延迟
MemQ支持大小和基于时间的刷新到存储层,除了一些优化以抑制抖动之外,还可以对最大尾部延迟进行硬限制。到目前为止,我们能够使用 AWS S3 存储实现 30 秒的 p99 E2E 延迟,并且正在积极努力改善 MemQ 延迟,这增加了可以从 Kafka 迁移到 MemQ 的使用案例数量。
事实证明,S3 Standard 上的成本MemQ 比使用 i3 实例跨三个可用区使用三个可用区进行三个副本的等效 Kafka 部署便宜 90%(平均约为 80%)。这些节省来自以下几个因素:-降低 IOPS- 消除排序约束- 分离计算和存储- 由于消除了计算硬件而降低了复制成本 - 放宽了延迟约束
具有 S3 的可扩展性MemQ 可根据写入和读取吞吐量要求按需扩展。MemQ 调控器执行实时重新平衡,以确保只要可以配置计算,就有足够的写入容量可用。代理通过添加其他代理和更新流量容量要求来线性扩展。如果使用者需要额外的并行性来处理数据,则会手动更新读取分区。
在 Pinterest,我们直接在 EC2 上运行 MemQ,并根据流量和新用例要求扩展集群。
未来工作我们正在积极致力于以下领域:- 减少 MemQ 的 E2E 延迟(<5 秒),以支持更多用例- 实现与流和批处理系统的本机集成- 读取时按键排序
结论
MemQ 为 PubSub 提供了一种灵活、低成本的云原生方法。如今,MemQ 为 Pinterest 上所有 ML 训练数据的收集和传输提供支持。我们正在积极研究将其扩展到其他数据集,并进一步优化延迟。除了解决 PubSub 问题之外,MemQ 存储还可以公开使用 PubSub 数据进行批处理的能力,而不会对性能产生重大影响,从而实现低延迟批处理。
确认
如果没有Dave Burgess和Chunyan Wang的坚定支持,建立MemQ是不可能的。这也要感谢Ping-Min Lin,他一直是MemQ中错误修复和性能优化的关键推动者,实现了大规模的生产部署。