如何基于 Flink 生成在线机器学习的样本?

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 在线机器学习与离线相比,在模型更新的时效性,模型的迭代周期,业务实验效果等方面有更好的表现。所以将机器学习从离线迁移到在线已经成为提升业务指标的一个有效的手段。在线机器学习中,样本是关键的一环。本文将给大家详细的介绍微博是如何用 Flink 来实现在线样本生成的。

作者:曹富强(微博)

在线机器学习与离线相比,在模型更新的时效性,模型的迭代周期,业务实验效果等方面有更好的表现。所以将机器学习从离线迁移到在线已经成为提升业务指标的一个有效的手段。

在线机器学习中,样本是关键的一环。本文将给大家详细的介绍微博是如何用 Flink 来实现在线样本生成的。

为何选择 Flink 来做在线的样本生成?

在线样本生成对样本的时效性和准确性都有极高的要求。同样对作业的稳定性及是否容灾也都有严格的指标要求。基于这个前提,我们对目前较为流行的几种实时计算框架(Storm 0.10, Spark 2.11, Flink 1.10)进行了分析比较,结论如下:

1.jpg

因此,我们决定使用 Flink 来作为在线样本生成的实时流计算框架。

如何实现?

在线样本生成,简单描述一个业务场景:对用户的曝光数据和点击数据实时的做关联,关联后将数据输出到 Kafka 中,给下游的在线训练作业用。

首先我们要确定两个数据流关联的时间窗口。这一步一般建议先离线对两个数据流的日志做关联,通过离线的方式对两份数据在不同的时间范围内做 join,来判断在线需要的时间窗口。比如业务接受的最低关联比例是 85%,并且通过离线测试确认 20 分钟内两个数据流可以关联 85%的数据,那么就可以采用 20 分钟作为时间窗口。这里的关联比例和窗口时间实际上是在准确性和实时性之间的一个 trade-off。

确定时间窗口后,我们并没有使用 Flink 的 time window 来实现多个数据流的 join,而是选择采用 union + timer 方式来实现。这里主要考虑两点:第一、Flink 自带的 join 操作不支持多个数据流。第二、使用 timer+state 来实现,自定义程度更高,限制更少,也更方便。

接下来,我们把样本生成过程细分为:

① 输入数据流

一般我们的数据源包括 Kafka,Trigger,MQ 等。Flink 需要从数据源中实时的读取日志。

② 输入数据流的格式化和过滤

读取日志后,对数据做格式化,并且过滤掉不需要的字段和数据。
指定样本 join 的 key。例如:用户 id 和 内容 id 作 key。
输出的数据格式一般为 tuple2(K,V),K:参与 join 的 key。V:样本用到的字段。

③ 输入数据流的 union

使用 Flink 的 union 操作,将多个输入流叠加到一起,形成一个 DataStream。
为每个输入流指定一个可以区分的别名或者增加一个可以区分的字段。

④ 输入数据流的聚合:keyby 操作

对 join 的 key 做 keyby 操作。接上例,表示按照用户 id 和内容 id 对多个数据流做 join。
如果 key 存在数据倾斜的情况,建议对 key 加随机数后先聚合,去掉随机数后再次聚合。

⑤ 数据存储 state + timer

  1. 定义一个Value State。
  2. keyby后的process方法中,我们会重写processElement方法,在processElement方法中判断,如果value state为空,则new 一个新的state,并将数据写到value state中,并且为这条数据注册一个timer(timer会由Flink按key+timestamp自动去重),另外此处我们使用的是ProcessingTime(表示onTimer()在系统时间戳达到Timer设定的时间戳时触发)。如果不为空则按照拼接的策略,更新已经存在的结果。比如:时间窗口内 用户id1,内容id1的第一条日志数据没有点击行为,则这个字段为0,第二条点击数据进入后,将这个字段更新为1。当然除了更新操作,还有计数、累加、均值等各种操作。如何在process里区分数据是来自曝光还是点击呢,使用上面步骤③定义的别名。
  3. 重写onTimer方法,在onTimer方法中主要是定义定时器触发时执行的逻辑:从value state里获取到存入的数据,并将数据输出。然后执行state.clear。
  4. 样本从窗口输出的条件有2个:第一,timer到期。第二,业务需要的样本都拼接上了。

此处参考伪代码:

public class StateSampleFunction extends KeyedProcessFunction<String, Tuple2, ReturnSample> {
    /**
     * 这个状态是通过过程函数来维护,使用ValueState
     */
    private ValueState state;

    private Long timer = null;

    public StateSampleFunction (String time){
        timer = Long.valueOf(time);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // 获取state
        state = getRuntimeContext().getState(new ValueStateDescriptor<>("state", TypeInformation.of(new TypeHint< ReturnSample >() {})));
    }

    @Override
    public void processElement(Tuple2value, Context context, Collector< ReturnSample > collector) throws Exception {
        if (value.f0 == null){
            return;
        }

        Object sampleValue = value.f1;
        Long time = context.timerService().currentProcessingTime();
        ReturnSample returnSample = state.value();
        if (returnSample == null) {
            returnSample = new ReturnSample();
            returnSample.setKey(value.f0);
            returnSample.setTime(time);
            context.timerService().registerProcessingTimeTimer(time +timer);
        }

        // 更新点击数据到state里
        if (sampleValue instanceof ClickLog){
            ClickLog clickLog = (ClickLog)values;
            returnSample =(ReturnSample) clickLog.setSample(returnSample);
        }
        state.update(returnSample);
    }

    /**
     * @param timestamp
     * @param ctx
     * @param out
     * @throws Exception
     */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector< ReturnSample > out) throws Exception {
        ReturnSample value = state.value();
        state.clear();
        out.collect(value);
    }
}

⑥ 拼接后的日志格式化和过滤

拼接后的数据需要按照在线训练作业的要求对数据做格式化,比如 json、CSV 等格式。
过滤:决定什么样的数据是合格的样本。例如:有真正阅读的内容才算是可用的样本。

⑦ 输出

样本最终输出到实时的数据队列中。下面是实际的作业拓扑和运行时状态:

1-2.jpg
1-3.jpg

整个样本拼接过程的流程图:

1-4.jpg

StateBackend 的选取

使用 RocksDB/Gemini 作为 state 的 Backend 的优势和建议:

我们用大数据对 memory 和 RocksDB,Gemini 做了实验对比,结果显示 RocksDB 和 Gemin 在数据处理,作业稳定性和资源使用等方面比 memory 更合理。其中 Gemini 的优势最为明显。

此外,如果是大数据量的 state,建议使用 Gemini + SSD 固态硬盘。

样本的监控

1. Flink 作业的异常监控

  • 作业失败监控
  • Failover 监控
  • Checkpoint 失败的监控
  • RocksDB 使用情况的监控
  • 作业消费 Kafka 的 Comsumer Lag 的监控
  • 作业反压的监控

2. 样本输入端 Kafka 的消费延迟监控

3. 样本输出端 Kafka 的写入量的监控

4. 样本监控

  • 拼接率监控
  • 正样本监控
  • 输出样本格式的监控
  • 输出标签对应的值是否在正常范围
  • 输入标签对应的值是否为 null
  • 输出标签对应的值是否为空

样本的校验

样本生成后,如何验证数据是否准确

  1. 在线和离线的相互校验

    将在线样本从输出的 Kafka 中接入到 HDFS 上离线存储。并按照在线 join 的时间窗口来分区。
  2. 用同等条件下生成的离线样本和在线样本做对比
  3. 白名单用户的全流程校验

    将白名单用户的日志和样本结果存入 ES 等实时数仓中,来做校验。
    

故障的处理

样本异常对线上模型训练的影响非常大。当发现异常报警时,首先要做的是向在线模型训练作业发送样本异常的报警。收到报警信息后,模型停止更新。从而避免影响模型线上效果。

普通意义的业务故障解决后,丢弃原来的数据,所有输入日志流从最新的时间点开始消费并生成新的样本即可。重要业务需要重置输入日志流的 Kafka offset 从故障时间点开始重新生成样本数据。

平台化

通过平台化对样本生成的流程做出严格的规范非常重要。在平台化的过程中,需要提供简单通用的开发模板以提高作业开发效率;提供平台化的作业监控和样本指标监控框架,避免重复造车;提供通用的样本输出落地策略,和在线/离线校验策略,更便捷的为业务方服务。

微博基于 Flink 搭建的在线样本生成平台架构,如图:

1-5.jpg

UI 页面,如图:

1-6.jpg
1-61.jpg

基于平台化开发,用户只需要关心业务逻辑部分即可。需要用户开发的有:

  1. 对应输入数据的数据清洗逻辑
  2. 样本输出前的数据清洗逻辑

其余的在 UI 上配置即可实现,具体有:

  1. 输入 Kafka 的配置信息及对应数据清洗的 UDF 类
  2. 样本拼接的时间窗口
  3. 窗口内对字段的聚合操作
  4. 样本输出的 Kafka 配置信息及输出前数据清洗和格式化的 UDF 类

资源情况由平台方审核并配置。完成后,自动生成并提交作业。

1-7.jpg

作业提交后:

1. 平台会提供如前所述的作业相关监控,如下:

■ Flink 作业的异常监控

作业失败监控
Failover 监控
Checkpoint 失败的监控
RocksDB 使用情况的监控
作业消费 Kafka 的 Comsumer Lag 的监控
作业反压的监控

■ 样本监控

拼接率监控
正样本监控
输出样本格式的监控
输出标签对应的值是否在正常范围
输入标签对应的值是否为 null
输出标签对应的值是否为空

2. 平台会自动将数据落盘,存储到HDFS上。方便离线验证或者离线训练。

3. 用户只需将精力放到样本的验证上即可,由平台方保证作业的稳定性。

作者介绍:

曹富强,微博机器学习研发中心-高级系统工程师。现负责微博机器学习平台数据计算/数据存储模块,主要涉及实时计算 Flink、Storm、Spark Streaming,数据存储Kafka、Redis,离线计算 Hive、Spark 等。目前专注于 Flink/Kafka/Redis 在微博机器学习场景的应用,为机器学习提供框架,技术,应用层面的支持。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
机器学习/深度学习 人工智能 算法
机器学习笔试面试之图像数据不足时的处理方法、检验方法、不均衡样本集的重采样
机器学习笔试面试之图像数据不足时的处理方法、检验方法、不均衡样本集的重采样
182 0
|
6月前
|
机器学习/深度学习 人工智能 Apache
人工智能平台PAI操作报错合集之alink任务可以在本地运行,上传到flink web运行就报错,如何解决
阿里云人工智能平台PAI (Platform for Artificial Intelligence) 是阿里云推出的一套全面、易用的机器学习和深度学习平台,旨在帮助企业、开发者和数据科学家快速构建、训练、部署和管理人工智能模型。在使用阿里云人工智能平台PAI进行操作时,可能会遇到各种类型的错误。以下列举了一些常见的报错情况及其可能的原因和解决方法。
|
6月前
|
机器学习/深度学习 人工智能 监控
【机器学习】大模型驱动少样本学习在图像识别中的应用
【机器学习】大模型驱动少样本学习在图像识别中的应用
171 0
|
7月前
|
机器学习/深度学习 人工智能 流计算
人工智能平台PAI 操作报错合集之在集群上提交了包含alink相关功能的flink任务,但是却报错如何解决
阿里云人工智能平台PAI (Platform for Artificial Intelligence) 是阿里云推出的一套全面、易用的机器学习和深度学习平台,旨在帮助企业、开发者和数据科学家快速构建、训练、部署和管理人工智能模型。在使用阿里云人工智能平台PAI进行操作时,可能会遇到各种类型的错误。以下列举了一些常见的报错情况及其可能的原因和解决方法。
|
7月前
|
机器学习/深度学习 人工智能 API
人工智能平台PAI产品使用合集之机器学习PAI中的sample_weight怎么加在样本中
阿里云人工智能平台PAI是一个功能强大、易于使用的AI开发平台,旨在降低AI开发门槛,加速创新,助力企业和开发者高效构建、部署和管理人工智能应用。其中包含了一系列相互协同的产品与服务,共同构成一个完整的人工智能开发与应用生态系统。以下是对PAI产品使用合集的概述,涵盖数据处理、模型开发、训练加速、模型部署及管理等多个环节。
|
7月前
|
机器学习/深度学习 数据采集 数据可视化
【机器学习】样本、特征、标签:构建智能模型的三大基石
【机器学习】样本、特征、标签:构建智能模型的三大基石
3265 0
|
7月前
|
机器学习/深度学习 算法 搜索推荐
Flink中的流式机器学习是什么?请解释其作用和常用算法。
Flink中的流式机器学习是什么?请解释其作用和常用算法。
191 0
|
机器学习/深度学习 算法 数据挖掘
Sentieon | 应用教程: TNscope®使用机器学习模型进行有匹配正常样本的体细胞变异发现
Sentieon | 应用教程: TNscope®使用机器学习模型进行有匹配正常样本的体细胞变异发现
131 0
|
消息中间件 机器学习/深度学习 SQL
《Apache Flink 案例集(2022版)》——3.机器学习——Bilibili-Flink 在 B 站的多元化探索与实践(1)
《Apache Flink 案例集(2022版)》——3.机器学习——Bilibili-Flink 在 B 站的多元化探索与实践(1)
168 0
|
存储 SQL 机器学习/深度学习
《Apache Flink 案例集(2022版)》——3.机器学习——Bilibili-Flink 在 B 站的多元化探索与实践(2)
《Apache Flink 案例集(2022版)》——3.机器学习——Bilibili-Flink 在 B 站的多元化探索与实践(2)
163 0

相关产品

  • 实时计算 Flink版