Timestone: Netflix 的高吞吐、低延迟优先级队列系统

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: Timestone: Netflix 的高吞吐、低延迟优先级队列系统

队列系统是微服务系统的核心组件之一,本文介绍了 Netflix 内部构建的高吞吐量、低优先级队列系统。原文: Timestone: Netflix’s High-Throughput, Low-Latency Priority Queueing System with Built-in Support for Non-Parallelizable Workloads


简介

Timestone 是 Netflix 内部构建的高吞吐、低延迟优先级队列系统,以支持 Netflix 媒体编码平台Cosmos的需求。在过去的 2 年半的时间里,Timestone 的使用量一直在增加,现在还成为了 Netflix 通用工作流编排引擎Conductor的优先级队列引擎,以及用于大规模数据流水线的调度器(BDP Scheduler)。总而言之,Netflix 内部数百万个关键工作流现在都要通过 Timestone 处理。


Timestone 客户端可以创建队列,基于用户定义的截止日期和元数据对消息进行排队,然后以最早截止日期优先(EDF, earliest-deadline-first)的方式对消息进行出队处理,还支持通过条件(例如"属于队列 X 且具有元数据 Y 的消息")筛选 EDF 消息。


Timestone 与其他优先级队列的不同之处在于,它支持一种称为独占队列(exclusive queues) 的结构,这是一种将工作块标记为不可并行的方法,不需要在消费者端进行任何锁定或协调,所有事情都由后台独占队列处理。我们将在接下来的小节中详细解释这个概念。


为什么用 Timestone


当我们在 2018 年设计 Reloaded(Netflix 的媒体编码系统)的后继系统时(参见Netflix Cosmos Platform一文的"背景"部分),需要一个优先队列系统,用于在 Cosmos 的三个组件之间提供队列(图 1):


  1. API 框架(Optimus)
  2. 正向链式规则引擎(Plato)
  3. 无服务器计算层(Stratum)


image.png

图 1. 建立在 Cosmos 之上的视频编码应用程序。请注意三个 Cosmos 子系统: Optimus(将外部请求映射到内部业务模型的 API 层)、Plato(用于业务规则建模的工作流层)和 Stratum(用于运行无状态和计算密集型功能的无服务器层)。来源:Netflix Cosmos Platform


这个优先级队列系统需要满足的一些关键需求:


  1. 在任何给定时间内,一条消息只能分配给一个处理节点。在 Cosmos 中发生的工作往往是资源密集型的,并且可以触发数以千计的动作。假设数据存储副本之间存在复制延迟,刚刚由工作者 A 通过另一个节点从队列中取出的消息也会被显示为工作者 B 可取出的消息,这种情况会浪费大量计算周期。这一需求最终将一致的解决方案排除在外,意味着我们希望在队列级别上实现线性一致性(linearizable consistency)
  2. 允许非并行工作。


假设(Given) Plato 不断轮询所有工作流队列,以便执行更多的工作;


当(While) Plato 为给定项目执行工作流(或者说处理给定服务的工作请求)时;


那么(Then) Plato 就不能在该工作流上为该项目的工作处理额外的请求。否则,Plato 推理引擎将过早评估工作流,并可能将工作流迁移到不正确的状态。


因此,在 Cosmos 中存在某种不应该并行的工作,要求队列系统本身支持这种类型的访问模式,这一要求催生了独占队列的概念,我们将在"关键概念"部分解释独占队列在 Timestone 中的工作原理。


  1. 允许使用过滤器(元数据键-值对)退出消息以及队列深度查询
  2. 允许在接收消息时自动创建队列
  3. 消息在进入队列一秒内可以标记为可退出


我们之所以创建 Timestone,是因为无法找到满足这些需求的现成解决方案。


系统架构


Timestone 是基于 gRPC 的服务,通过 protocol buffers 定义服务接口以及请求、响应消息结构。应用程序的系统关系图如图 2 所示。


image.png

图 2. Timestone 系统图。箭头链接了典型 Timestone 客户端-服务器交互过程中接触到的所有组件。红色数字表示顺序步骤,相同数字表示并发步骤。


记录系统(System of record)


记录系统是一个持久化 Redis 集群。到达集群(步骤 2)的每个写请求(参见步骤 1,注意该步骤改变了队列状态,包括了出队列请求)在响应发送回服务器(步骤 3)之前被持久化到事务日志中。


在数据库内部,我们用排序集(sorted set)来表示每个队列,根据优先级对消息 id 进行排序(参见"消息"一节)。我们将消息和队列配置(参见“队列”一节)作为哈希保存在 Redis 中。所有与队列相关的数据结构(从它包含的消息到支持按筛选器出队列所需的内存二级索引)都放在同一个 Redis 分片中,通过共享一个特定于相关队列的公共前缀来实现这一点。然后我们将这个前缀编码为Redis哈希标签(hash tag)。每条消息携带一个最大 32KB 的内容(参见"消息"一节)。


Timestone 和 Redis 之间的几乎所有交互(参见"消息状态"一节)都被编写为 Lua 脚本。大多数 Lua 脚本中,我们倾向于更新大量数据结构。由于 Redis 保证每个脚本都是原子执行的,脚本成功执行可以保证系统处于一致(在 ACID 意义上)状态。


所有 API 操作都以队列为作用域,所有修改状态的 API 操作都是幂等的。


二级索引(Secondary indexes)


出于可观察性的目的,我们在 Elasticsearch 中维护的两个二级索引中维护传入消息及其状态之间转换的信息。当我们从 Redis 得到写响应时,同时(a)将这个响应返回给客户端,(b)将这个响应转换为发布到 Kafka 集群的事件,如步骤 4 所示。两个 Flink 作业(维护的每一种类型的索引都有一个)消费对应 Kafka 主题的事件,并更新 Elasticsearch 中的索引。


一个索引("current")为用户提供系统当前状态的最佳视图,而另一个索引("historic")为用户提供消息的最佳纵向视图,从而允许流经 Timestone 时跟踪消息,并回答诸如在某个状态中花费的时间和处理错误的数量等问题。我们为每条消息维护一个版本计数器,每次写操作都触发计数器递增,通过版本计数器对历史索引中的事件进行排序。事件在 Elasticsearch 集群保存特定时间。


当前在 Netflix 中的使用情况


系统出队列的负载很重,每秒有 30K 的出队列请求(RPS), P99 延迟为 45ms。相比之下,入队列请求是每秒 1.2K 和 P99 延迟是 25ms。此外经常能看到 5K RPS 的入队列突发流量,P99 延迟会增加到 85ms。自今年初以来,有 150 亿(15B)消息在 Timestone 里排队,出队列 4000 亿(400B)次,待处理消息通常达到 1000 万(10M)条。随着我们将 Reloaded(遗留媒体编码系统)的其余部分迁移到 Cosmos,使用量预计将在明年(2023 年)翻一番。


核心概念


消息(Message)


消息携带非透明有效负载(payload) 、用户定义优先级(参见"优先级"一节)、一组可选的(对于独占队列是必选的)元数据键-值对(set of metadata key-value pairs) ,可用于基于过滤器的出队,以及可选的不可见持续时间(invisibility duration) 。放入队列中的任何消息都可以从队列中取出有限次数,我们称之为尝试(attempts) ,消息的每一次出队列调用都会减少尝试次数。


优先级(Priority)


消息的优先级表示为整数值,该值越低,优先级越高。虽然应用程序可以自由使用它们认为合适的取值范围,但标准是使用以毫秒为单位的 Unix 时间戳(例如,1661990400000 表示 UTC 时间 9/1/2022 午夜)。


image.png


图 3. Cosmos 中的流编码流水线所用的PriorityClass枚举代码片段,括号中的值表示以天为单位的偏移量。


也完全可以由应用程序自己定义优先级级别。例如,Cosmos 中的流编码流水线使用邮件优先级类,如图 3 所示。属于标准类的消息使用入队时间作为其优先级,而所有其他类的优先级值按 10 年的倍数调整。优先级是在工作流规则级别设置的,但是如果请求带有 studio 标记(例如DAY_OF_BROADCAST),则可以被重写。


消息状态(Message States)


队列中的 Timestone 消息处于以下六种状态之一(图 4):


  1. invisible (不可见)
  2. pending (待处理)
  3. running (处理中)
  4. completed (已完成)
  5. canceled (已取消)
  6. errored (已出错)


通常来说,消息可以以不可见(invisible)待处理(pending) 的方式进入队列,对应消息处于不可见(invisible)待处理(pending) 状态。当不可见窗口(invisibility window)消失时,不可见消息将变为待处理状态。工作节点可以通过指定处理该消息的时间(租期)从队列中将待处理的最早截止日期优先的消息出队列,还支持批量消息出队列,从而将消息切换到处理中(running) 状态。然后,同一工作节点可以在分配的租约窗口内发出对 Timestone 的完成调用,以将消息迁移到已完成(completed) 状态,或者如果希望保持对消息的控制,则发出租约展期调用。(工作节点还可以将通常处于处理中的消息迁移到已取消状态,表示不再需要处理该消息。)如果这些调用都没有按时发出,消息将再次变为可出队的,对消息的这次尝试将结束。如果消息没有任何可用的尝试次数,将被自动迁移到已出错(errored) 状态。终止状态(terminal states) (已完成、已错误和已取消)由后台进程定期进行垃圾回收。


消息可以在工作节点调用 API 时迁移状态,也可以在 Timestone 运行后台进程时迁移状态(图 4,标为红色,定期运行),图 4 显示了完整的状态转换图。


image.png


图 4. Timestone 消息的状态迁移图。


队列(Queues)


所有传入的消息都存储在队列中,按其优先级日期排序。Timestone 可以托管任意数量用户创建的队列,并提供一组用于队列管理的 API,所有操作都围绕某个队列配置对象进行。存储在这个对象中的数据包括队列类型(参见其余部分)、应用于出队消息的租期或应用于出队消息的不可见持续时间、消息可以出队的次数,以及是否暂时阻止入队或出队。注意,消息生产者可以通过在入队期间在消息级别设置默认租期或不可见持续时间来覆盖对应配置。


Timestone 中的队列分为两种类型: 简单队列(simple)独占队列(exclusive)


创建独占队列(exclusive queue) 时,需要与用户定义的独占键(exclusivity key) (例如project)相关联。所有发布到该队列的消息都必须在其元数据中携带此键。例如,带有project=foo的消息将被队列接受,没有project键的消息将不会被接受。本例中,我们调用与独占键对应的值foo,即消息的独占值(exclusivity value) 。独占队列的约定是,在任何时间点,每个独占值最多只能有一个消费者。因此,如果示例中的基于project的独占队列中有两个消息,其中的键值对是project=foo,并且其中一个消息已经被一个工作节点获取,那么另一个消息是不可出队的。如图 5 所示。


image.png

图 5. 因为是独占队列,并且独占值 foo 已经被获取,因此即使 msg_1 具有更高的优先级,当 worker_2 发出出队调用时,也只会获取 msg_2 而不是 msg_1。


在简单队列中不会应用这种契约,也不与消息元数据键紧密耦合。简单队列可以作为典型优先级队列,简单的以最早截止日期优先的方式对消息进行排序。


我们在做什么


我们正在做的一些工作:


  1. 随着 Timestone 在 Cosmos 中使用量的增加,支持一系列队列深度查询的需求也在增加。为了解决这个问题,我们正在构建使用不同查询模型的专用查询服务。
  2. 如上所述(见"记录系统"一节),一个队列及其内容目前只能占用一个 Redis 分片。然而,热队列可能会越来越大(特别是当计算能力不足时)。我们希望支持任意大的队列,因此促使我们构建了对队列分片的支持。
  3. 消息最多可以携带 4 个键值对,目前所有这些键值对都会用来填充按筛选器出队过程中使用的二级索引。这个运算在时间和空间上都是指数级复杂度的(O(2n))。我们正在将已排序集合切换到字典排序,以减少一半索引量,并以一种更具成本效益的方式处理元数据。
相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore     ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
3月前
|
消息中间件 大数据 Kafka
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(二)
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(二)
39 2
|
3月前
|
消息中间件 NoSQL 大数据
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(一)
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(一)
48 1
|
5月前
|
存储 消息中间件 NoSQL
高可用延迟队列设计与实现
高可用延迟队列设计与实现
|
6月前
|
消息中间件 缓存 并行计算
每秒钟承载600万订单级别的无锁并行计算框架 Disruptor学习
每秒钟承载600万订单级别的无锁并行计算框架 Disruptor学习
|
7月前
|
负载均衡 并行计算 Java
分布式系统中,利用并行和并发来提高整体的处理能力
分布式系统中,利用并行和并发来提高整体的处理能力
|
8月前
|
消息中间件 监控 Java
Kafka性能调优:高吞吐、低延迟的数据流
Apache Kafka作为一种高性能、分布式流处理平台,对于实时数据的处理至关重要。本文将深入讨论Kafka性能调优的关键策略和技术,通过丰富的示例代码为大家提供实际操作指南,以构建高吞吐、低延迟的数据流系统。
|
消息中间件 SQL 资源调度
|
存储 监控 Linux
Netty如何做到单机百万并发?(二)
Netty如何做到单机百万并发?(二)
Netty如何做到单机百万并发?(二)
|
消息中间件
延时队列优化 (2)
在这里新增了一个队列QC,绑定关系如下,该队列不设置TTL时间
延时队列优化 (2)