Flink之DataSet迭代计算

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 笔记

(1)迭代计算分类与原理


迭代计算在批量数据处理过程中的应用非常广泛,如常用的机器学习算法Kmeans、逻辑回归,以及图形计算等,都会用到迭代计算。DataSet API对迭代计算功能的支持相对比较完善,在性能上比较其他分布式计算框架也具有非常高的优势。目前Flink中的迭代计算种类有两种模式,分别是Bulk Iteration (全量迭代计算)和 Delt Iteration(增量迭代计算)


什么是迭代运算?

所谓迭代运算,就是给定一个初值,用所给的算法公式计算初值得到一个中间结果,然后将中间结果作为输入参数进行反复计算,在满足一定条件的时候得到计算结果。


(2)全量迭代计算详解


Bulk Iteration

这种迭代方式称为全量迭代,它会将整个数据输入,经过一定的迭代次数1.png

全量迭代计算,一共有几个步骤:


首先初始化数据,可以通过从DataSource算子中获取,也可以从其他转化Operators中接入

其次定义Step Function,并在每一步迭代过程使用Step Function,结合数据集以及上一次迭代计算的Solution数据集,进行本次迭代计算。

每一次迭代过程中Step Function输出的结果,被称为Next Partital Solution数据集,该结果会作为下一次迭代计算的输入数据集。

最后一次迭代计算的结果输出,可以通过DataSink输出,或者接入到下一个0perators中。

迭代终止的条件有两种,分别为达到最大迭代次数或者符合自定义聚合器收敛条件:


最大迭代次数:指定迭代的最大次数,当计算次数超过该设定值是,终止迭代

自定义收敛条件:用户自定义的聚合器和收敛条件,例如终止条件设定为当Sum统计结果小于零则终止,否则继续迭代。


2.png

(2.1)案例分析


3.png

(2.2)案例实战

全量迭代计算通过使用DataSet的iterate()方法调用

package com.aikfk.flink.dataset.iteration;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/9 5:59 下午
 */
public class PiIterator {
    public static void main(String[] args) throws Exception {
        // 准备环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 创建初始IterativeDataSet
        IterativeDataSet<Integer> initial = env.fromElements(0).iterate(10000);
        DataSet<Integer> iteration = initial.map(new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer integer) throws Exception {
                double x = Math.random();
                double y = Math.random();
                return integer + ((x * x + y * y <= 1 ) ? 1 : 0);
            }
        });
        // 计算出点的距离小于一的个数
        DataSet<Integer> count = initial.closeWith(iteration);
        // 求出PI
        DataSet<Double> result = count.map(new MapFunction<Integer, Double>() {
            @Override
            public Double map(Integer count) throws Exception {
                return count / (double) 10000 * 4;
            }
        });
        result.print();
        /**
         * 3.146
         */
    }
}

(3)增量迭代计算详解


增量迭代是通过部分计算取代全量计算,在计算过程中会将数据集分为热点数据和非热点数据集,每次迭代计算会针对热点数据展开,这种模式适合用于数据量比较大的计算场景,不需要对全部的数据集进行计算,所以在性能和速度上都会有很大的提升。

4.png


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
6月前
|
SQL Oracle 关系型数据库
实时计算 Flink版操作报错之往GREENPLUM 6 写数据,用postgresql-42.2.9.jar 报 ON CONFLICT (uuid) DO UPDATE SET 语法有问题。怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
6月前
|
SQL Java 关系型数据库
Flink DataSet API迁移到DataStream API实战
本文介绍了作者的Flink项目从DataSet API迁移到DataStream API的背景、方法和遇到的问题以及解决方案。
217 3
|
1月前
|
SQL 大数据 API
大数据-118 - Flink DataSet 基本介绍 核心特性 创建、转换、输出等
大数据-118 - Flink DataSet 基本介绍 核心特性 创建、转换、输出等
54 0
|
1月前
|
Java Shell 流计算
Flink-02 Flink Java 3分钟上手 Stream SingleOutputStreamOpe ExecutionEnvironment DataSet FlatMapFunction
Flink-02 Flink Java 3分钟上手 Stream SingleOutputStreamOpe ExecutionEnvironment DataSet FlatMapFunction
24 1
Flink-02 Flink Java 3分钟上手 Stream SingleOutputStreamOpe ExecutionEnvironment DataSet FlatMapFunction
|
1月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
47 1
|
1月前
|
存储 Java 数据处理
Flink-01 介绍Flink Java 3分钟上手 HelloWorld 和 Stream ExecutionEnvironment DataSet FlatMapFunction
Flink-01 介绍Flink Java 3分钟上手 HelloWorld 和 Stream ExecutionEnvironment DataSet FlatMapFunction
36 1
|
1月前
|
Kubernetes Cloud Native 流计算
Flink-12 Flink Java 3分钟上手 Kubernetes云原生下的Flink集群 Rancher Stateful Set yaml详细 扩容缩容部署 Docker容器编排
Flink-12 Flink Java 3分钟上手 Kubernetes云原生下的Flink集群 Rancher Stateful Set yaml详细 扩容缩容部署 Docker容器编排
75 3
|
4月前
|
SQL 网络安全 API
实时计算 Flink版产品使用问题之使用ProcessTime进行窗口计算,并且有4台机器的时间提前了2个小时,会导致什么情况
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之在进行DWS层的实时聚合计算时,遇到多次更新同一个字段的情况,该如何处理
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
流计算
Flink CDC里假设我做widow计算使用ProcessTime计算
【1月更文挑战第23天】【1月更文挑战第113篇】Flink CDC里假设我做widow计算使用ProcessTime计算
225 45
下一篇
无影云桌面