伴鱼:借助 Flink 完成机器学习特征系统的升级

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: Flink 用于机器学习特征工程,解决了特征上线难的问题;以及 SQL + Python UDF 如何用于生产实践。

本文作者陈易生,介绍了伴鱼平台机器学习特征系统的升级,在架构上,从 Spark 转为 Flink,解决了特征上线难的问题,以及 SQL + Python UDF 如何用于生产实践。 主要内容为:

  1. 前言
  2. 老版特征系统 V1
  3. 新版特征系统 V2
  4. 总结

GitHub 地址
https://github.com/apache/flink
欢迎大家给 Flink 点赞送 star~

一、前言

在伴鱼,我们在多个在线场景使用机器学习提高用户的使用体验,例如:在伴鱼绘本中,我们根据用户的帖子浏览记录,为用户推荐他们感兴趣的帖子;在转化后台里,我们根据用户的绘本购买记录,为用户推荐他们可能感兴趣的课程等。

特征是机器学习模型的输入。如何高效地将特征从数据源加工出来,让它能够被在线服务高效地访问,决定了我们能否在生产环境可靠地使用机器学习。为此,我们搭建了特征系统,系统性地解决这一问题。目前,伴鱼的机器学习特征系统运行了接近 100 个特征,支持了多个业务线的模型对在线获取特征的需求。

下面,我们将介绍特征系统在伴鱼的演进过程,以及其中的权衡考量。

二、旧版特征系统 V1

特征系统 V1 由三个核心组件构成:特征管道,特征仓库,和特征服务。整体架构如下图所示:

v1 architecture

特征管道包括流特征管道批特征管道,它们分别消费流数据源和批数据源,对数据经过预处理加工成特征 (这一步称为特征工程),并将特征写入特征仓库。

  • 批特征管道使用 Spark 实现,由 DolphinScheduler 进行调度,跑在 YARN 集群上;
  • 出于技术栈的一致考虑,流特征管道使用 Spark Structured Streaming 实现,和批特征管道一样跑在 YARN 集群上。

特征仓库选用合适的存储组件 (Redis) 和数据结构 (Hashes),为模型服务提供低延迟的特征访问能力。之所以选用 Redis 作为存储,是因为:

  • 伴鱼有丰富的 Redis 使用经验;
  • 包括 DoorDash Feature Store [1] 和 Feast [2] 在内的业界特征仓库解决方案都使用了 Redis。

特征服务屏蔽特征仓库的存储和数据结构,对外暴露 RPC 接口 GetFeatures(EntityName, FeatureNames),提供对特征的低延迟点查询。在实现上,这一接口基本对应于 Redis 的 HMGET EntityName FeatureName_1 ... FeatureName_N 操作。

这一版本的特征系统存在几个问题:

  • 算法工程师缺少控制,导致迭代效率低。这个问题与系统涉及的技术栈和公司的组织架构有关。在整个系统中,特征管道的迭代需求最高,一旦模型对特征有新的需求,就需要修改或者编写一个新的 Spark 任务。而 Spark 任务的编写需要有一定的 Java 或 Scala 知识,不属于算法工程师的常见技能,因此交由大数据团队全权负责。大数据团队同时负责多项数据需求,往往有很多排期任务。结果便是新特征的上线涉及频繁的跨部门沟通,迭代效率低;
  • 特征管道只完成了轻量的特征工程,降低在线推理的效率。由于特征管道由大数据工程师而非算法工程师编写,复杂的数据预处理涉及更高的沟通成本,因此这些特征的预处理程度都比较轻量,更多的预处理被留到模型服务甚至模型内部进行,增大了模型推理的时延。

为了解决这几个问题,特征系统 V2 提出几个设计目的:

  • 将控制权交还算法工程师,提高迭代效率;
  • 将更高权重的特征工程交给特征管道,提高在线推理的效率。

三、新版特征系统 V2

特征系统 V2 相比特征系统 V1 在架构上的唯一不同点在于,它将特征管道切分为三部分:特征生成管道,特征源,和特征注入管道。值得一提的是,管道在实现上均从 Spark 转为 Flink,和公司数据基础架构的发展保持一致。特征系统 V2 的整体架构如下图所示:

v2 architecture

1. 特征生成管道

特征生成管道读取原始数据源,加工为特征,并将特征写入指定特征源 (而非特征仓库)。

  • 如果管道以流数据源作为原始数据源,则它是流特征生成管道;
  • 如果管道以批数据源作为原始数据源,则它是批特征生成管道。

特征生成管道的逻辑由算法工程师全权负责编写。其中,批特征生成管道使用 HiveQL 编写,由 DolphinScheduler 调度。流特征生成管道使用 PyFlink 实现,详情见下图:

v2 codegen

算法工程师需要遵守下面步骤:

  1. 用 Flink SQL 声明 Flink 任务源 (source.sql) 和定义特征工程逻辑 (transform.sql);
  2. (可选) 用 Python 实现特征工程逻辑中可能包含的 UDF 实现 (udf_def.py);
  3. 使用自研的代码生成工具,生成可执行的 PyFlink 任务脚本 (run.py);
  4. 本地使用由平台准备好的 Docker 环境调试 PyFlink 脚本,确保能在本地正常运行;
  5. 把代码提交到一个统一管理特征管道的代码仓库,由 AI 平台团队进行代码审核。审核通过的脚本会被部署到伴鱼实时计算平台,完成特征生成管道的上线。

这一套流程确保了:

  • 算法工程师掌握上线特征的自主权;
  • 平台工程师把控特征生成管道的代码质量,并在必要时可以对它们实现重构,而无需算法工程师的介入。

2. 特征源

特征源存储从原始数据源加工形成的特征。值得强调的是,它同时还是连接算法工程师和 AI 平台工程师的桥梁。算法工程师只负责实现特征工程的逻辑,将原始数据加工为特征,写入特征源,剩下的事情就交给 AI 平台。平台工程师实现特征注入管道,将特征写入特征仓库,以特征服务的形式对外提供数据访问服务。

3. 特征注入管道

特征注入管道将特征从特征源读出,写入特征仓库。由于 Flink 社区缺少对 Redis sink 的原生支持,我们通过拓展 RichSinkFunction [3] 简单地实现了 StreamRedisSinkBatchRedisSink,很好地满足我们的需求。

其中,BatchRedisSink 通过 Flink Operator State [4] 和 Redis Pipelining [5] 的简单结合,大量参考 Flink 文档中的 BufferingSink,实现了批量写入,大幅减少对 Redis Server 的请求量,增大吞吐,写入效率相比逐条插入提升了 7 倍 [6]。BatchRedisSink 的简要实现如下。其中,flush 实现了批量写入 Redis 的核心逻辑,checkpointedState / bufferedElements / snapshotState / initializeState 实现了使用 Flink 有状态算子管理元素缓存的逻辑。

class BatchRedisSink(
    pipelineBatchSize: Int
) extends RichSinkFunction[(String, Timestamp, Map[String, String])]
    with CheckpointedFunction {

  @transient
  private var checkpointedState
      : ListState[(String, java.util.Map[String, String])] = _

  private val bufferedElements
      : ListBuffer[(String, java.util.Map[String, String])] =
    ListBuffer.empty[(String, java.util.Map[String, String])]

  private var jedisPool: JedisPool = _

  override def invoke(
      value: (String, Timestamp, Map[String, String]),
      context: SinkFunction.Context
  ): Unit = {
    import scala.collection.JavaConverters._

    val (key, _, featureKVs) = value
    bufferedElements += (key -> featureKVs.asJava)

    if (bufferedElements.size == pipelineBatchSize) {
      flush()
    }
  }

  private def flush(): Unit = {
    var jedis: Jedis = null
    try {
      jedis = jedisPool.getResource
      val pipeline = jedis.pipelined()
      for ((key, hash) <- bufferedElements) {
        pipeline.hmset(key, hash)
      }
      pipeline.sync()
    } catch { ... } finally { ... }
    bufferedElements.clear()
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    checkpointedState.clear()
    for (element <- bufferedElements) {
      checkpointedState.add(element)
    }
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    val descriptor =
      new ListStateDescriptor[(String, java.util.Map[String, String])](
        "buffered-elements",
        TypeInformation.of(
          new TypeHint[(String, java.util.Map[String, String])]() {}
        )
      )

    checkpointedState = context.getOperatorStateStore.getListState(descriptor)

    import scala.collection.JavaConverters._

    if (context.isRestored) {
      for (element <- checkpointedState.get().asScala) {
        bufferedElements += element
      }
    }
  }

  override def open(parameters: Configuration): Unit = {
    try {
      jedisPool = new JedisPool(...)
    } catch { ... }
  }

  override def close(): Unit = {
    flush()
    if (jedisPool != null) {
      jedisPool.close()
    }
  }
}

特征系统 V2 很好地满足了我们提出的设计目的。

  • 由于特征生成管道的编写只需用到 SQL 和 Python 这两种算法工程师十分熟悉的工具,因此他们全权负责特征生成管道的编写和上线,无需依赖大数据团队,大幅提高了迭代效率。在熟悉后,算法工程师通常只需花费半个小时以内,就可以完成流特征的编写、调试和上线。而这个过程原本需要花费数天,取决于大数据团队的排期;
  • 出于同样的原因,算法工程师可以在有需要的前提下,完成更重度的特征工程,从而减少模型服务和模型的负担,提高模型在线推理效率。

四、总结

特征系统 V1 解决了特征上线的问题,而特征系统 V2 在此基础上,解决了特征上线难的问题。在特征系统的演进过程中,我们总结出作为平台研发的几点经验:

  • 平台应该提供用户想用的工具。这与 Uber ML 平台团队在内部推广的经验[7] 相符。算法工程师在 Python 和 SQL 环境下工作效率最高,而不熟悉 Java 和 Scala。那么,想让算法工程师自主编写特征管道,平台应该支持算法工程师使用 Python 和 SQL 编写特征管道,而不是让算法工程师去学 Java 和 Scala,或是把工作转手给大数据团队去做;
  • 平台应该提供易用的本地调试工具。我们提供的 Docker 环境封装了 Kafka 和 Flink,让用户可以在本地快速调试 PyFlink 脚本,而无需等待管道部署到测试环境后再调试;
  • 平台应该在鼓励用户自主使用的同时,通过自动化检查或代码审核等方式牢牢把控质量。

Reference

[1] https://doordash.engineering/2020/11/19/building-a-gigascale-ml-feature-store-with-redis/

[2] https://docs.feast.dev/feast-on-kubernetes/concepts/stores#online-store

[3] https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java

[4] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#using-operator-state

[5] https://redis.io/topics/pipelining

[6] https://site-git-update-feature-system-yishengdd.vercel.app/posts/flink-bulk-insert-redis


更多 Flink 相关技术问题,可扫码加入社区钉钉交流群
第一时间获取最新技术文章和社区动态,请关注公众号~

image.png

活动推荐

阿里云基于 Apache Flink 构建的企业级产品-实时计算Flink版现开启活动:
99 元试用 实时计算Flink版(包年包月、10CU)即有机会获得 Flink 独家定制T恤;另包 3 个月及以上还有 85 折优惠!
了解活动详情:https://www.aliyun.com/product/bigdata/sc

image.png

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
3月前
|
SQL 人工智能 API
Apache Flink 2.1.0: 面向实时 Data + AI 全面升级,开启智能流处理新纪元
Apache Flink 2.1.0 正式发布,标志着实时数据处理引擎向统一 Data + AI 平台迈进。新版本强化了实时 AI 能力,支持通过 Flink SQL 和 Table API 创建及调用 AI 模型,新增 Model DDL、ML_PREDICT 表值函数等功能,实现端到端的实时 AI 工作流。同时增强了 Flink SQL 的流处理能力,引入 Process Table Functions(PTFs)、Variant 数据类型,优化流式 Join 及状态管理,显著提升作业稳定性与资源利用率。
399 0
|
26天前
|
存储 消息中间件 人工智能
云栖实录|实时计算 Flink 全新升级 - 全栈流处理平台助力实时智能
本文根据 2025 云栖大会演讲整理而成,演讲信息如下 演讲人:黄鹏程 阿里云智能集团计算平台事业部实时计算Flink版产品负责人
120 1
云栖实录|实时计算 Flink 全新升级 - 全栈流处理平台助力实时智能
|
8月前
|
机器学习/深度学习 算法 Python
机器学习特征筛选:向后淘汰法原理与Python实现
向后淘汰法(Backward Elimination)是机器学习中一种重要的特征选择技术,通过系统性地移除对模型贡献较小的特征,以提高模型性能和可解释性。该方法从完整特征集出发,逐步剔除不重要的特征,最终保留最具影响力的变量子集。其优势包括提升模型简洁性和性能,减少过拟合,降低计算复杂度。然而,该方法在高维特征空间中计算成本较高,且可能陷入局部最优解。适用于线性回归、逻辑回归等统计学习模型。
313 7
|
5月前
|
机器学习/深度学习 存储 运维
机器学习异常检测实战:用Isolation Forest快速构建无标签异常检测系统
本研究通过实验演示了异常标记如何逐步完善异常检测方案和主要分类模型在欺诈检测中的应用。实验结果表明,Isolation Forest作为一个强大的异常检测模型,无需显式建模正常模式即可有效工作,在处理未见风险事件方面具有显著优势。
403 46
|
10月前
|
机器学习/深度学习 存储 设计模式
特征时序化建模:基于特征缓慢变化维度历史追踪的机器学习模型性能优化方法
本文探讨了数据基础设施设计中常见的一个问题:数据仓库或数据湖仓中的表格缺乏构建高性能机器学习模型所需的历史记录,导致模型性能受限。为解决这一问题,文章介绍了缓慢变化维度(SCD)技术,特别是Type II类型的应用。通过SCD,可以有效追踪维度表的历史变更,确保模型训练数据包含完整的时序信息,从而提升预测准确性。文章还从数据工程师、数据科学家和产品经理的不同视角提供了实施建议,强调历史数据追踪对提升模型性能和业务洞察的重要性,并建议采用渐进式策略逐步引入SCD设计模式。
381 8
特征时序化建模:基于特征缓慢变化维度历史追踪的机器学习模型性能优化方法
|
10月前
|
人工智能 自然语言处理 安全
通过阿里云Milvus与PAI搭建高效的检索增强对话系统
阿里云向量检索Milvus版是一款全托管的云服务,兼容开源Milvus并支持无缝迁移。它提供大规模AI向量数据的相似性检索服务,具备易用性、可用性、安全性和低成本等优势,适用于多模态搜索、检索增强生成(RAG)、搜索推荐、内容风险识别等场景。用户可通过PAI平台部署RAG系统,创建和配置Milvus实例,并利用Attu工具进行可视化操作,快速开发和部署应用。使用前需确保Milvus实例和PAI在相同地域,并完成相关配置与开通服务。
|
5月前
|
存储 分布式计算 API
基于PAI-FeatureStore的LLM embedding功能,结合通义千问大模型,可通过以下链路实现对物品标题、内容字段的离线和在线特征管理。
本文介绍了基于PAI-FeatureStore和通义千问大模型的LLM embedding功能,实现物品标题、内容字段的离线与在线特征管理。核心内容包括:1) 离线特征生产(MaxCompute批处理),通过API生成Embedding并存储;2) 在线特征同步,实时接入数据并更新Embedding至在线存储;3) Python SDK代码示例解析;4) 关键步骤说明,如客户端初始化、参数配置等;5) 最佳实践,涵盖性能优化、数据一致性及异常处理;6) 应用场景示例,如推荐系统和搜索排序。该方案支持端到端文本特征管理,满足多种语义理解需求。
176 1
|
5月前
|
Java 调度 流计算
基于Java 17 + Spring Boot 3.2 + Flink 1.18的智慧实验室管理系统核心代码
这是一套基于Java 17、Spring Boot 3.2和Flink 1.18开发的智慧实验室管理系统核心代码。系统涵盖多协议设备接入(支持OPC UA、MQTT等12种工业协议)、实时异常检测(Flink流处理引擎实现设备状态监控)、强化学习调度(Q-Learning算法优化资源分配)、三维可视化(JavaFX与WebGL渲染实验室空间)、微服务架构(Spring Cloud构建分布式体系)及数据湖建设(Spark构建实验室数据仓库)。实际应用中,该系统显著提升了设备调度效率(响应时间从46分钟降至9秒)、设备利用率(从41%提升至89%),并大幅减少实验准备时间和维护成本。
320 0

相关产品

  • 实时计算 Flink版