Flink ML API,为实时机器学习设计的算法接口与迭代引擎

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 林东、高赟在 FFA 2021 的分享。

摘要:本文整理自阿里巴巴高级技术专家林东、阿里巴巴技术专家高赟(云骞)在 Flink Forward Asia 2021 核心技术专场的演讲。主要内容包括:

  1. 面向实时机器学习的 API
  2. 流批一体的迭代引擎
  3. Flink ML 生态建设

点击查看直播回放 & 演讲PDF

一、面向实时机器学习的 API

img

Flink ML API 指的是提供给用户使用算法的接口。通过把所有算法打包为统一的 API 提供给用户,让所有使用者的体验保持一致,也能降低学习和理解算法的成本,此外算法之间也可以更好地交互和兼容。

举个例子,在 Flink ML API 中提供一些基础的功能类,通过使用这些功能类可以把不同算子连接组合成一个高级的算子,可以大大提高了算法的开发效率。同时,通过使用统一的 Table API,让所有的数据都以 Table 格式进行传输,可以使得不同公司开发的算法能够互相兼容,降低不同公司重复开发的算子的成本,提升算法合作的效率。

img

之前版本的 Flink ML API 还是存在不少痛点。

首先是表达能力方面。之前的 API 的输入只支持单个 Table 的形式,无法表达一些常见的算法逻辑。比如有些训练算法的输入表达是一张图,把数据通过不同的 Table 传进来,这种情况下单个 Table 输入的接口就不适用了。再比如有些数据预处理的逻辑需要将多个输入得到的数据进行融合,用单个 Table 输入的 API 也不适合。因此我们计划把算法接口扩展为支持多输入多输出。

其次是实时训练方面。之前的 API 无法原生支持实时机器学习场景。在实时机器学习中,我们希望训练算法可以实时产生模型数据,并将模型数据以流的方式实时传输到多个前端服务器中。但是现有的接口只有一次性的训练和一次性的推理 API,无法表达这种逻辑。

最后是易用性方面。之前采用 toJson() 和 fromJson() 来导出和加载模型数据,并且允许用户储存这些数据。但是有些模型的数据量高达几个 G,在这种情况下将模型数据以 string 的方式进行储存,效率会非常低,甚至可能无法实现。当然,存在一些 hacky 方法,可以把模型数据储存到一个远程终端,再把相关的 url 通过 toJson() 方法传导出来。但是这种情况下会存在易用性的问题,算法使用者需要自己去解析 URL,并从远程获取这些数据。

受限于以上几个方面的因素,我们决定对 Flink ML API 进行扩展。

img

在经过大量讨论以及思考之后,我们对新的 API 赋予了以下特性,解决了上面所有问题。

  • 第一,在 API 上增加了获得模型数据的接口,比如在 model 上增加了 getModelData() 和 setModelData() API,能够帮助实现实时机器学习场景;
  • 第二,对算子的接口做了扩展,让算子可以支持多输入多输出,还可以将不同算子以有向无环图的方式进行整合,打包成更高级的算子;
  • 第三,新的 API 是基于 datastream 实现的,可以支持流批一体的特性,能够同时实现基于有限流和无限流的在线训练;
  • 第四,对算法的参数存取 API 做了改进,新的算法参数的存取 API 更容易使用;
  • 第五,对模型的存取 API 做了改进,新的模型存取 API 采用了 save() 和 load() API,模型数据非常大的情况下用户也无须考虑这方面的复杂度,只需要调用 save() 和 load() 就可以实现相关的功能;
  • 第六,提供了无模型语义的抽象类。

img

上图是最新的 API 构架图。最上层有一个 WithParams interface,它提供了存取参数的 API。我们对这个接口做了改进,用户不再需要表达 isOptional 之类的 field。这个接口之下是一个 stage 接口,它包含了所有算法模块,并提供了存取模块的 API,即 save() 和 load()。save() 负责把模型数据和参数储存下来,load() 负责把模型数据和参数读取出来,还原原先的 stage 实例。用户不用考虑参数存取的复杂度。

Stage 下分为两块,一块是表达训练逻辑的 Estimator,另一块是表达推理逻辑的 AlgoOperator 和 Model 类。Estimator 的核心 API 是 fit()。与之前不同的是现在支持多个 Table 的输入,可以用来表达需要多个 Table 输入逻辑,比如特征的拼接。Estimator::fit() 输出的是一个 Model。Model 属于 AlgoOperator。AlgoOperator 表达的是计算逻辑,支持多个 Table 作为输入和输出,每个 Table 都是一个数据源,可以用来表达通用的计算逻辑。

AlgoOperator 之下是 Transformer,可以表达对数据做转换的逻辑。它与 AlgoOperator 具有相同的 API 格式,但是它们的抽象概念却有所不同。Transformer 是一个具有模型语义的数据转换逻辑。在计算中,比如数据预处理,存在一些更通用的将不同数据进行拼接转换的操作,例如对数据进行过滤,在通用的概念下可能并不适用于 Transformer。因此我们特意增加了 AlgoOperator 类,方便用户的理解和使用。

Transformer 之下是 model 类。我们增加了 setModelData() 和 getModelData() API。这两个 API 是为实时机器学习专门设计的,可以让用户把模型数据实时导出到多个远程终端做在线的推理。

img

上图是一个比较简化但经典的实时机器学习场景。

这里的数据来源主要有两个,静态数据来自于 HDFS,动态数据来自于 Kafka。由 AlgoOperator 读取来自以上两个数据源的数据,将它们拼接之后形成一个 Table 输入到 Estimator 逻辑。Estimator 读取刚才拼接得到的数据并产生一个 Model,然后可以通过 getModelData() 拿到代表模型数据的 Table。再通过 sink() API 将这些数据传输到 Kafka topic。最后在多个前端服务器上面运行程序,这些程序可以直接创建一个 Model 实例,从 Kafka 中读出模型数据形成一个 Table,再通过 setModelData() 把这些数据传递给 Model,使用得到的 Model 做在线推理。

img

在支持在线训练和在线推理之后,我们进一步提供了一些基础组件,方便用户通过简单的算子构建更复杂算子,这个组件便是 FLIP-175 提供的 GraphBuilder。

假设用户的输入也是与上文一致的两个数据源,最终输出到一个数据源。用户的核心计算逻辑可以分为两块,第一块是数据预处理,比如特征拼接,把两个数据源的数据读进来之后做整合,以 Table 的形式输出到 Estimator,执行第二块的训练逻辑。我们希望先执行训练算子,得到一个 Model。然后将预处理算子和 Model 连接,表达在线推理逻辑。

用户需要做的只是通过 GraphBuilder API 将上述步骤连接进行,不需要专门为在线推理逻辑再写一遍连接逻辑。GraphBuilder 会自动从前面一个图生成,并与后面图中的算子形成一一对应的关系。AlgoOperator 在训练图中的形式是直接转换为推理图中的算子,而 Estimator 在训练图中得到的 Model 会成为推理图中对应的节点,通过将这些节点相连,便得到了最后的 ModelA,最终用作在线推理。

二、流批一体的迭代引擎

img

Flink 是一个基于 Dag 描述执行逻辑的流批一体的处理引擎,但是在许多场景下,尤其是机器学习\图计算类型的应用中,用户还需要数据迭代处理的能力。例如,一些算法的离线训练、在线训练以及模型部署后根据结果动态调整模型参数的场景,都需要数据迭代处理。

由于实际的场景同时会涵盖离线和在线处理的案例,因此需要在迭代这一层能够同时支持离线和在线处理。

img

前文提到的三个场景的处理逻辑既存在区别,也存在共性。

对于离线训练,以逻辑回归为例,在迭代中可以使用一个节点来缓存整个模型,这个节点会将最新的模型发送给训练节点,而训练节点会在迭代开始前预先读取整个数据集,随后在每次收到最新的模型后,从数据集中选择一个mini-batch数据对模型进行更新,并把结果发送回模型缓存节点。

对于在线计算,由于训练数据是从外部源源不断到达的,无法预先读取所有训练数据,一般的做法是动态读取一个 mini-batch 的数据,计算完模型的更新后将其发送给模型缓存的节点,等模型的缓存节点进一步发送更新后的模型以后,再读取下一个 mini-batch 数据,这也就要求训练节点必须采用优先级读的方式对数据进行读取,从而最终实现逐个处理 mini-batch 的能力。

在这两种场景下的训练都存在同步和异步方式,具体取决于模型缓存节点是否要收集到所有更新后再开始下一轮训练。此外还存在一些模型,在预测的时候会对参数进行动态的更新,处理完每一条数据之后都要立刻评估是否会进行参数更新,如果需要就再发起更新。

img

这几种场景下的计算逻辑存在一定的共性,首先都需要在作业图中引入迭代的结构来支持数据的循环处理,并且在数据循环处理之后需要进行是否终止迭代的判断。另一方面,计算过程中还需要在每一轮数据接收完成后,接收到相应的通知,触发特定的计算,比如离线训练中,接触完整个模型后就要开始下一轮的计算。

这里其实存在两个选择,一个是在迭代这一层,直接提供将数据集划分为多个 mini-batch,并且对每个 mini-batch 赋予迭代的能力。它在逻辑上可以接受所有类型的迭代,但是如果想要同时支持逐个 mini-batch 处理与多个 mini-batch 并行处理的逻辑,就必须引入一套新的基于 mini-batch 的流处理接口,并且在实践层支持这两种处理逻辑。

另外在线训练和离线训练的 mini-batch 产生的方式也不一样,离线 mini-batch 是通过本地预先读取所有数据,然后在每一轮处理中进行选取来实现。而在线 mini-batch 通过从外部数据中读取特定数据量的数据来实现的。因此如果想要兼容这两种情况,会进一步增加接口和实现的复杂度。

最后如果要兼容单个 per-record 的处理,还必须引入无限大的 mini-batch,或将每条记录看作一个单独的 mini-batch,前者会进一步增加接口的复杂度,而后者需要每一记录对算子进行一次通知,会增加运行时的开销。

考虑到上述情况,我们在迭代中只提供了整个数据集级别的通知,而将划分 mini-batch 的功能放在了迭代之上。在离线训练中,用户可以通过从整个数据集中选取一部分数据来高效实现 mini-batch 选择的功能。而在在线计算中,用户可以通过 Flink 自带的运行集输入算子,实现逐个 mini-batch 处理的功能。

img

基于上述思路,整个迭代的设计如上图所示,主要由 4 部分组成。初始模型这类有回边的输入、数据集这类无回边的只读输入、迭代回边的位置以及最终输出。其中回边对应的数据集与有回边的输入是一一对应的,从回边返回的数据与初始数据进行 union 之后,实现数据的迭代处理。

迭代内部为用户提供了数据集处理完成的通知功能,即进度追踪的能力。用户基于这一能力可以实现处理完成数据集的某一轮之后执行特定操作。比如在离线训练的时候,用户可以在某个算子收到模型的更新数据之后,计算模型的下一轮更新。

此外对于没有回边的输入数据,允许用户指定每一轮是否进行重放。对算子也提供了每轮新建一个算子以及通过一个算子的实例处理所有轮次数据的能力。通过这种方式,用户无须重建算子实例就能实现在轮次之间缓存数据的能力。用户也可以通过回放输入数据并重建算子来复用迭代外的算子,比如 Reduce、Join 这种常用算子,输入数据进行重放并且算子会在某一轮进行重建,这种情况下用户可以直接复用这些迭代外的算子。

同时,我们提供了两种终止判断逻辑,一种是当整个迭代中已经没有数据在处理的时候,会自然终止迭代。另外一种是在有限流的情况下,也允许用户指定特定数据集,如果这一数据集在某轮没有数据产生,用户可以提前终止整个迭代。

img

DataStream<double[]> initParameters = …
DataStream<Tuple2<double[], Double>> dataset = …
​
DataStreamList resultStreams =
    Iterations.iterate(
          DataStreamList.of(initParameters),
          ReplayableDataStreamList.notReplay(dataset),
          IterationConfig.newBuilder().setOperatorRoundMode(ALL_ROUND).build();
          (variableStreams, dataStreams) -> {
                    DataStream<double[]> modelUpdate = variableStreams.get(0);
                    DataStream<Tuple2<double[], Double>> dataset = dataStreams.get(0);
 
                  
                    DataStream<double[]> newModelUpdate = …
                    DataStream<double[]> modelOutput = …
                    return new IterationBodyResult(
       DataStreamList.of(newModelUpdate), 
       DataStreamList.of(modelOutput)
                });
         
DataStream<double[]> finalModel = resultStreams.get("final_model");

上图是使用迭代 API 来构建迭代的例子。用户需要指定有回边和无回边的输入列表、算子是否需要每轮重建以及迭代体的计算逻辑的等。对于迭代体,用户需要返回回边对应的数据集以及迭代的最终输出。

img

public static class ModelCacheFunction extends ProcessFunction<double[], double[]>
        implements IterationListener<double[]> { 
         
        private final double[] parameters = new double[N_DIM];
 
        public void processElement(double[] update, Context ctx, Collector<O> output) {
            // Suppose we have a util to add the second array to the first.
            ArrayUtils.addWith(parameters, update);
        }
 
        void onEpochWatermarkIncremented(int epochWatermark, Context context, Collector<T> collector) {
            if (epochWatermark < N_EPOCH * N_BATCH_PER_EPOCH) {
                collector.collect(parameters);
            }
        }
 
        public void onIterationEnd(int[] round, Context context) {
            context.output(FINAL_MODEL_OUTPUT_TAG, parameters);
        }
    }

对于迭代内的算子,如果它实现了 IterationListener 接口,就会在所有数据集处理完某一轮之后,通知迭代的算子。如果整个迭代都处理终止则会通过 onIterationTerminated 对算子进行进一步通知,用户可以在这两个回调中实现需要的计算逻辑。

img

在迭代的实现中,对于用户通过代码来创建的迭代处理结构,会增加一部分迭代内部的节点,并对用户所有的处理节点进行 wrap 操作,从而达到管理算子生命周期并对数据类型进行转换的目的。最后, 迭代基于 Colocation 与本地内存实现了回边,这样在调度器看来整个作业逻辑仍然是一个 DAG,从而可以直接复用现在的调度逻辑。

img

在迭代中插入的专用算子主要包括 input、output、head 与 tail 算子,其中 input 和 output 负责数据类型的转换,外部数据进入迭代内时会为每一条记录增加一个迭代头,里面记录了该 record 处理的轮次,每个算子的 wrap 会将迭代头去掉后交给用户原始的算子处理。

head 和 tail 算子负责实现回边及计算某一轮迭代是否已经全部处理完成。head 算子从 input 读取完整个输入,并在最后插入一条特殊的 EpochWatermark 事件,来标记第零轮迭代的终止。由于 head 算子可能会存在多个并发,所以必须等 head 算子的所有并发都读取完输入后,才能发送第 0 轮终止的事件。

head 算子通过 Operator Coordinator 来同步所有并发。Operator Coordinator 是一个位于 JobManager 中的全局组件,它可以与所有 head task 进行双向通信,所有 head 算子并发都收到每一轮处理完成的通知后,就会发送全局广播给所有 head task,告诉他们这一轮的处理已经全部结束。head 发送 EpochWaterMark 之后,所有迭代中的算子都会与这一消息进行对齐。算子从所有输入中都读取到特定轮次的 EpochWatermark 之后,就会认为这一轮处理完成,并调用这一轮结束的回调。当 tail 节点收到某一轮数据或 EpochWatermark 之后,会将轮次加 1,然后通过回边再次发送给 head,从而实现数据循环处理。

img

最后我们也支持了有迭代情况下的 checkpoint 功能。由于 Flink 默认的 checkpoint 机制无法支持带环的计算图,因此我们对 Flink 的 checkpoint 机制进行了扩展,实现了带环的 Chandy-Lamport 算法,会同时缓存来自回边的消息。另外 head 算子在对齐的时候,除了要读取正常输入的 barrier 之外,也会等待来自 Operator Coordinator 的特殊的 barrier。每一轮全局结束的消息也是来自 Operator Coordinator 广播,可以保证每个 checkpoint 中所有迭代内的算子都处在同一轮,从而简化算子后续进行并发修改的操作。

另外还有一个开发中的优化,Operator Coordinator 会将收到的 barrier 延迟到下一个全局对齐消息之后,再发送通知,从而使得整个迭代内的算子的 state 都是恰好处于读取完某一轮数据之后。许多同步算法都是先将缓存收到的数据存储在算子中,直到读取完一轮所有数据之后再进行统一处理。这一优化可以保证在进行 snapshot 操作的时候,正好清空所有缓存,从而使整个 checkpoint 中缓存的数据量最小。

img

以上是关于 Flink 流批一体迭代引擎的介绍,这些引擎可以同时在在线和离线场景中使用,并且支持 exactly-once 的容错。未来我们将进一步支持 batch 的执行模式,并提供更多的上层开发工具。

三、Flink ML 生态建设

img

最近我们已经将 Flink ML 相关代码从 Flink 核心代码库中移入一个单独的 Flink ML 代码库。这样做的首先是为了方便 Flink ML 的快速迭代,其次也希望通过这个手段减少 Flink 核心代码库的复杂度,避免 Flink 核心代码库过于臃肿。

另外,我们在 Github 上建立了一个中立的组织 Flink-extended,能够为所有 Flink 社区的开发者提供平台来贡献一些他们希望开源的项目。方便大家分享不带有特定公司的名字的代码,使不同公司的开发人员可以把代码贡献出来,方便 Flink 社区来使用和共建。我们希望借此促进 Flink 生态的繁荣发展。

目前中立项目中已经有一些比较重要的项目,比如 Deep Learning on Flink,它是由阿里大数据团队主要开发的一个开源项目,核心作用是可以把 Tensorflow 打包成 Java 算子在 Flink 中运行,方便将 Flink 的预处理程序与 Tensorflow 深度学习的训练算法相结合,形成端到端的训练以及推理。

最近我们已经在 Flink ML 中新增了若干常见算法,之后还会继续提供更多开箱可用的算法。

img

上图是我们目前正在进行中的重要工作,其中最核心的工作是将现有的阿里巴巴开源的 Alink 代码库进行改造,使其中的算法能够适配新设计的 Flink ML API,并将改造后的算法贡献到 Apache 项目,方便 Flink 用户得到更多开箱可用的算法。

此外,我们还与 360 一起合作共建 Clink 项目,核心目标是在离线计算中用 Java 去运行某些算子,得到训练结果。另一方面,这些算子需要能够以非常低的延迟做在线推理。然而低延迟在线推理很难用 Java 实现,通常需要用 C++ 来实现。为了使开发者只写一遍算法就能同时应用于 Java 和 C++ 环境,Clink 提供了一些打包的基础类的功能,方便算法开发者写好 C++ 算子之后,能够使用 JNI 打包成 Java 算子,并在 Flink 中使用这些算子。

最后,我们计划在 Flink ML 中开发对于 Python 的支持,其中包括允许算法使用者通过写 Python 程序将 Flink ML 中的 Java 算子进行连接和组合使用,希望能提高机器学习开发者的效率和使用体验。

以上工作基本都已经进入开源项目,其中算法 API 的设计在 FLIP-173 中 ,迭代引擎的设计主要在 FLIP-176 中 ,FLIP-174 和 FLIP-175 分别提供了算法参数的 API 以及 GraphBuilder 的 API。Clink 和 Deep Learning on Flink 等项目也已经在 Flink-extended 的组织上,欢迎大家使用。

点击查看直播回放 & 演讲PDF


更多 Flink 相关技术问题,可扫码加入社区钉钉交流群
第一时间获取最新技术文章和社区动态,请关注公众号~

O1CN01tmtpiy1iazJYZdixL_!!6000000004430-2-tps-899-548.png"

活动推荐

阿里云基于 Apache Flink 构建的企业级产品-实时计算Flink版现开启活动:
99 元试用 实时计算Flink版(包年包月、10CU)即有机会获得 Flink 独家定制卫衣;另包 3 个月及以上还有 85 折优惠!
了解活动详情:https://www.aliyun.com/product/bigdata/sc

image.png

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
4天前
|
JSON API 数据格式
淘宝 / 天猫官方商品 / 订单订单 API 接口丨商品上传接口对接步骤
要对接淘宝/天猫官方商品或订单API,需先注册淘宝开放平台账号,创建应用获取App Key和App Secret。之后,详细阅读API文档,了解接口功能及权限要求,编写认证、构建请求、发送请求和处理响应的代码。最后,在沙箱环境中测试与调试,确保API调用的正确性和稳定性。
|
16天前
|
供应链 数据挖掘 API
电商API接口介绍——sku接口概述
商品SKU(Stock Keeping Unit)接口是电商API接口中的一种,专门用于获取商品的SKU信息。SKU是库存量单位,用于区分同一商品的不同规格、颜色、尺寸等属性。通过商品SKU接口,开发者可以获取商品的SKU列表、SKU属性、库存数量等详细信息。
|
17天前
|
JSON API 数据格式
店铺所有商品列表接口json数据格式示例(API接口)
当然,以下是一个示例的JSON数据格式,用于表示一个店铺所有商品列表的API接口响应
|
27天前
|
编解码 监控 API
直播源怎么调用api接口
调用直播源的API接口涉及开通服务、添加域名、获取API密钥、调用API接口、生成推流和拉流地址、配置直播源、开始直播、监控管理及停止直播等步骤。不同云服务平台的具体操作略有差异,但整体流程简单易懂。
|
29天前
|
存储 算法 Java
Set接口及其主要实现类(如HashSet、TreeSet)如何通过特定数据结构和算法确保元素唯一性
Java Set因其“无重复”特性在集合框架中独树一帜。本文解析了Set接口及其主要实现类(如HashSet、TreeSet)如何通过特定数据结构和算法确保元素唯一性,并提供了最佳实践建议,包括选择合适的Set实现类和正确实现自定义对象的hashCode()与equals()方法。
32 4
|
7天前
|
JSON API 数据安全/隐私保护
拍立淘按图搜索API接口返回数据的JSON格式示例
拍立淘按图搜索API接口允许用户通过上传图片来搜索相似的商品,该接口返回的通常是一个JSON格式的响应,其中包含了与上传图片相似的商品信息。以下是一个基于淘宝平台的拍立淘按图搜索API接口返回数据的JSON格式示例,同时提供对其关键字段的解释
|
1月前
|
JSON 算法 数据可视化
测试专项笔记(一): 通过算法能力接口返回的检测结果完成相关指标的计算(目标检测)
这篇文章是关于如何通过算法接口返回的目标检测结果来计算性能指标的笔记。它涵盖了任务描述、指标分析(包括TP、FP、FN、TN、精准率和召回率),接口处理,数据集处理,以及如何使用实用工具进行文件操作和数据可视化。文章还提供了一些Python代码示例,用于处理图像文件、转换数据格式以及计算目标检测的性能指标。
57 0
测试专项笔记(一): 通过算法能力接口返回的检测结果完成相关指标的计算(目标检测)
|
1月前
|
人工智能 自然语言处理 PyTorch
Text2Video Huggingface Pipeline 文生视频接口和文生视频论文API
文生视频是AI领域热点,很多文生视频的大模型都是基于 Huggingface的 diffusers的text to video的pipeline来开发。国内外也有非常多的优秀产品如Runway AI、Pika AI 、可灵King AI、通义千问、智谱的文生视频模型等等。为了方便调用,这篇博客也尝试了使用 PyPI的text2video的python库的Wrapper类进行调用,下面会给大家介绍一下Huggingface Text to Video Pipeline的调用方式以及使用通用的text2video的python库调用方式。
|
1月前
|
JSON JavaScript API
(API接口系列)商品详情数据封装接口json数据格式分析
在成长的路上,我们都是同行者。这篇关于商品详情API接口的文章,希望能帮助到您。期待与您继续分享更多API接口的知识,请记得关注Anzexi58哦!
|
17天前
|
JSON 前端开发 JavaScript
API接口商品详情接口数据解析
商品详情接口通常用于提供特定商品的详细信息,这些信息比商品列表接口中的信息更加详细和全面。以下是一个示例的JSON数据格式,用于表示一个商品详情API接口的响应。这个示例假定API返回一个包含商品详细信息的对象。

相关产品

  • 实时计算 Flink版