实时大数据计算引擎Apache Flink计算研究(二)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 接上文,近期团队在研究大数据平台产品,在业务场景设计时,经常会遇到实时数据加工的需求,因此开始探索实时大数据计算引擎。同时,我认为Flink也是未来流批一体的趋势。本文将技术预研过程中的要点整理分享出来,供大家参考使用,内容较多,分2个文章发布。

接上文,实时大数据计算引擎Apache Flink计算研究(一)


8、Flink DataSetAPI

数据源部分

不但提供了流处理,还提供了批处理,流处理和批处理分属于不同的api

基于文件

  • readTextFile(path)

基于集合

  • fromCollection(Collection)

算子部分

Map:输入一个元素,然后返回一个元素,中间可以做一些清洗转换等操作

FlatMap:输入一个元素,可以返回零个,一个或者多个元素

MapPartition:类似map,一次处理一个分区的数据【如果在进行map处理的时候需要获取第三方资源链接,建议使用MapPartition】

Filter:过滤函数,对传入的数据进行判断,符合条件的数据会被留下

Reduce:对数据进行聚合操作,结合当前元素和上一次reduce返回的值进行聚合操作,然后返回一个新的值

Aggregate:sum、max、min等

Distinct:返回一个数据集中去重之后的元素,data.distinct()

Join:内连接

OuterJoin:外链接

Cross:获取两个数据集的笛卡尔积

Union:返回两个数据集的总和,数据类型需要一致

First-n:获取集合中的前N个元素

Sort Partition:在本地对数据集的所有分区进行排序,通过sortPartition()的链接调用来完成对多个字段的排序

批处理的统计文件中单词的个数

image.png

image.png

运行结果

输入文件

image.png

输出文件

image.png

9、DataSet API之Transformations

获取笛卡尔积

image.png

运行结果

image.png

批处理去重

image.png

处理结果

image.png

获取集合中的前N个元素

image.png

image.png

运行结果

image.png

image.png

image.png

image.png

10、DataSet API之Data sinks

writeAsText():将元素以字符串形式逐行写入,这些字符串通过调用每个元素的toString()方法来获取

writeAsCsv():将元组以逗号分隔写入文件中,行及字段之间的分隔是可配置的。每个字段的值来自对象的toString()方法

print():打印每个元素的toString()方法的值到标准输出或者标准错误输出流中

在前2个章节中已测试过其中的方法。


11、Flink中的广播变量

1.应用场景,

把元素广播给所有的分区,数据会被重复处理

广播变量允许编程人员在每台机器上保持1个只读的缓存变量,而不是传送变量的副本给tasks

广播变量创建后,它可以运行在集群中的任何function上,而不需要多次传递给集群节点。另外需要记住,不应该修改广播变量,这样才能确保每个节点获取到的值都是一致的

一句话解释,可以理解为是一个公共的共享变量,我们可以把一个dataset 数据集广播出去,然后不同的task在节点上都能够获取到,这个数据在每个节点上只会存在一份。如果不使用broadcast,则在每个节点中的每个task中都需要拷贝一份dataset数据集,比较浪费内存(也就是一个节点中可能会存在多份dataset数据)。

用法

  • 1:初始化数据
  • DataSet toBroadcast = env.fromElements(1, 2, 3)
  • 2:广播数据
  • .withBroadcastSet(toBroadcast, "broadcastSetName");
  • 3:获取数据
  • Collection broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");

注意:

  • 1:广播出去的变量存在于每个节点的内存中,所以这个数据集不能太大。因为广播出去的数据,会常驻内存,除非程序执行结束
  • 2:广播变量在初始化广播出去以后不支持修改,这样才能保证每个节点的数据都是一致的。

2.代码实现

image.png

image.png

image.png

image.png

12、Flink的累加器

1.Accumulator即累加器,与Mapreduce counter的应用场景差不多,都能很好地观察task在运行期间的数据变化

可以在Flink job任务中的算子函数中操作累加器,但是只能在任务执行结束之后才能获得累加器的最终结果。

Counter是一个具体的累加器(Accumulator)实现

  • IntCounter, LongCounter 和 DoubleCounter

l用法

  • 1:创建累加器
  • private IntCounter numLines = new IntCounter();
  • 2:注册累加器
  • getRuntimeContext().addAccumulator("num-lines", this.numLines);
  • 3:使用累加器
  • this.numLines.add(1);
  • 4:获取累加器的结果

myJobExecutionResult.getAccumulatorResult("num-lines")

2.代码

image.png

image.png运行结果

image.png

13、Flink Distributed Cache(分布式缓存)

Flink提供了一个分布式缓存,类似于hadoop,可以使用户在并行函数中很方便的读取本地文件

此缓存的工作机制如下:程序注册一个文件或者目录(本地或者远程文件系统),通过ExecutionEnvironment注册缓存文件并为它起一个名称。当程序执行,Flink自动将文件或者目录复制到所有taskmanager节点的本地文件系统,用户可以通过这个指定的名称查找文件或者目录,然后从taskmanager节点的本地文件系统访问它

用法

  • 1:注册一个文件
  • env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")
  • 2:访问数据
  • File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");

代码

image.png

image.png

运行结果

image.png

image.png

14、状态(State)

我们前面写的word count的例子,没有包含状态管理。如果一个task在处理过程中挂掉了,那么它在内存中的状态都会丢失,所有的数据都需要重新计算。从容错和消息处理的语义上(at least once, exactly once),Flink引入了state和checkpoint。

首先区分一下两个概念

  • state一般指一个具体的task/operator的状态【state数据默认保存在java的堆内存中】
  • 而checkpoint【可以理解为checkpoint是把state数据持久化存储了】,则表示了一个Flink Job在一个特定时刻的一份全局状态快照,即包含了所有task/operator的状态
  • 注意:task是Flink中执行的基本单位。operator指算子(transformation)。

State可以被记录,在失败的情况下数据还可以恢复

Flink中有两种基本类型的State

  • Keyed State

Operator State

Keyed State和Operator State,可以以两种形式存在:

  • 原始状态(raw state)
  • 托管状态(managed state)

托管状态是由Flink框架管理的状态

而原始状态,由用户自行管理状态具体的数据结构,框架在做checkpoint的时候,使用byte[]来读写状态内容,对其内部数据结构一无所知。



1.State-Keyed State介绍

顾名思义,就是基于KeyedStream上的状态。这个状态是跟特定的key绑定的,对KeyedStream流上的每一个key,都对应一个state。

  • stream.keyBy(…)

保存state的数据结构

  • ValueState:即类型为T的单值状态。这个状态与对应的key绑定,是最简单的状态了。它可以通过update方法更新状态值,通过value()方法获取状态值
  • ListState:即key上的状态值为一个列表。可以通过add方法往列表中附加值;也可以通过get()方法返回一个Iterable来遍历状态值
  • ReducingState:这种状态通过用户传入的reduceFunction,每次调用add方法添加值的时候,会调用reduceFunction,最后合并到一个单一的状态值
  • MapState:即状态值为一个map。用户通过put或putAll方法添加元素

需要注意的是,以上所述的State对象,仅仅用于与状态进行交互(更新、删除、清空等),而真正的状态值,有可能是存在内存、磁盘、或者其他分布式存储系统中。相当于我们只是持有了这个状态的句柄


2.State-Operator State

与Key无关的State,与Operator绑定的state,整个operator只对应一个state

保存state的数据结构

  • ListState

举例来说,Flink中的Kafka Connector,就使用了operator state。它会在每个connector实例中,保存该实例中消费topic的所有(partition, offset)映射


checkPoint简介

为了保证state的容错性,Flink需要对state进行checkpoint。

Checkpoint是Flink实现容错机制最核心的功能,它能够根据配置周期性地基于Stream中各个Operator/task的状态来生成快照,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些快照进行恢复,从而修正因为故障带来的程序数据异常

Flink的checkpoint机制可以与(stream和state)的持久化存储交互的前提:

  • 持久化的source,它需要支持在一定时间内重放事件。这种sources的典型例子是持久化的消息队列(比如Apache Kafka,RabbitMQ等)或文件系统(比如HDFS,S3,GFS等)
  • 用于state的持久化存储,例如分布式文件系统(比如HDFS,S3,GFS等)

默认checkpoint功能是disabled的,想要使用的时候需要先启用

checkpoint开启之后,默认的checkPointMode是Exactly-once

checkpoint的checkPointMode有两种,Exactly-once和At-least-once

Exactly-once对于大多数应用来说是最合适的。At-least-once可能用在某些延迟超低的应用程序(始终延迟为几毫秒

默认checkpoint功能是disabled的,想要使用的时候需要先启用

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 每隔1000 ms进行启动一个检查点【设置checkpoint的周期】

env.enableCheckpointing(1000);

// 高级选项:

// 设置模式为exactly-once (这是默认值)

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// 确保检查点之间有至少500 ms的间隔【checkpoint最小间隔】

env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

// 检查点必须在一分钟内完成,或者被丢弃【checkpoint的超时时间】

env.getCheckpointConfig().setCheckpointTimeout(60000);

// 同一时间只允许进行一个检查点

env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// 表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint【详细解释见备注】

env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

State Backend(状态的后端存储)

默认情况下,state会保存在taskmanager的内存中,checkpoint会存储在JobManager的内存中。

state 的store和checkpoint的位置取决于State Backend的配置

  • env.setStateBackend(…)

一共有三种State Backend

  • MemoryStateBackend
  • FsStateBackend

RocksDBStateBackend

MemoryStateBackend

  • state数据保存在java堆内存中,执行checkpoint的时候,会把state的快照数据保存到jobmanager的内存中
  • 基于内存的state backend在生产环境下不建议使用

FsStateBackend

  • state数据保存在taskmanager的内存中,执行checkpoint的时候,会把state的快照数据保存到配置的文件系统中
  • 可以使用hdfs等分布式文件系统

RocksDBStateBackend

  • RocksDB跟上面的都略有不同,它会在本地文件系统中维护状态,state会直接写入本地rocksdb中。同时它需要配置一个远端的filesystem uri(一般是HDFS),在做checkpoint的时候,会把本地的数据直接复制到filesystem中。fail over的时候从filesystem中恢复到本地
  • RocksDB克服了state受内存限制的缺点,同时又能够持久化到远端文件系统中,比较适合在生产中使用l修改State Backend的两种方式第一种:单任务调整
  • 修改当前任务代码
  • env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));
  • 或者new MemoryStateBackend()
  • 或者new RocksDBStateBackend(filebackend, true);【需要添加第三方依赖】
  • 第二种:全局调整
  • 修改flink-conf.yaml
  • state.backend: filesystem
  • state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints
  • 注意:state.backend的值可以是下面几种:jobmanager(MemoryStateBackend), filesystem(FsStateBackend), rocksdb(RocksDBStateBackend)
  • Restart Strategies(重启策略)

Flink支持不同的重启策略,以在故障发生时控制作业如何重启

集群在启动时会伴随一个默认的重启策略,在没有定义具体重启策略时会使用该默认策略。 如果在工作提交时指定了一个重启策略,该策略会覆盖集群的默认策略

默认的重启策略可以通过 Flink 的配置文件 flink-conf.yaml 指定。配置参数 restart-strategy 定义了哪个策略被使用。

常用的重启策略

  • 固定间隔 (Fixed delay)
  • 失败率 (Failure rate)
  • 无重启 (No restart)

如果没有启用 checkpointing,则使用无重启 (no restart) 策略。

如果启用了 checkpointing,但没有配置重启策略,则使用固定间隔 (fixed-delay) 策略,其中 Integer.MAX_VALUE 参数是尝试重启次数

重启策略可以在flink-conf.yaml中配置,表示全局的配置。也可以在应用代码中动态指定,会覆盖全局配置

重启策略之固定间隔 (Fixed delay)

l第一种:全局配置 flink-conf.yaml

  • restart-strategy: fixed-delay
  • restart-strategy.fixed-delay.attempts: 3
  • restart-strategy.fixed-delay.delay: 10 s

l第二种:应用代码设置

  • env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  • 3, // 尝试重启的次数
  • Time.of(10, TimeUnit.SECONDS) // 间隔
  • ));

重启策略之失败率 (Failure rate)

第一种:全局配置 flink-conf.yaml

  • restart-strategy: failure-rate
  • restart-strategy.failure-rate.max-failures-per-interval: 3
  • restart-strategy.failure-rate.failure-rate-interval: 5 min
  • restart-strategy.failure-rate.delay: 10 s

第二种:应用代码设置

  • env.setRestartStrategy(RestartStrategies.failureRateRestart(
  • 3, // 一个时间段内的最大失败次数
  • Time.of(5, TimeUnit.MINUTES), // 衡量失败次数的是时间段
  • Time.of(10, TimeUnit.SECONDS) // 间隔

));

重启策略之无重启 (No restart)

第一种:全局配置 flink-conf.yaml

  • restart-strategy: none

第二种:应用代码设置

  • env.setRestartStrategy(RestartStrategies.noRestart());

保存多个Checkpoint

默认情况下,如果设置了Checkpoint选项,则Flink只保留最近成功生成的1个Checkpoint,而当Flink程序失败时,可以从最近的这个Checkpoint来进行恢复。但是,如果我们希望保留多个Checkpoint,并能够根据实际需要选择其中一个进行恢复,这样会更加灵活,比如,我们发现最近4个小时数据记录处理有问题,希望将整个状态还原到4小时之前

Flink可以支持保留多个Checkpoint,需要在Flink的配置文件conf/flink-conf.yaml中,添加如下配置,指定最多需要保存Checkpoint的个数

  • state.checkpoints.num-retained: 20

这样设置以后就查看对应的Checkpoint在HDFS上存储的文件目录

l如果Flink程序异常失败,或者最近一段时间内数据处理错误,我们可以将程序从某一个Checkpoint点进行恢复

lbin/flink run -s hdfs://namenode:9000/flink/checkpoints/467e17d2cc343e6c56255d222bae3421/chk-56/_metadata flink-job.jar

l程序正常运行后,还会按照Checkpoint配置进行运行,继续生成Checkpoint数据

savePoint

Flink通过Savepoint功能可以做到程序升级后,继续从升级前的那个点开始执行计算,保证数据不中断

全局,一致性快照。可以保存数据源offset,operator操作状态等信息

可以从应用在过去任意做了savepoint的时刻开始继续消费

checkPoint vs savePoint

checkPoint

  • 应用定时触发,用于保存状态,会过期
  • 内部应用失败重启的时候使用

savePoint

  • 用户手动执行,是指向Checkpoint的指针,不会过期
  • 在升级的情况下使用
  • 注意:为了能够在作业的不同版本之间以及 Flink 的不同版本之间顺利升级,强烈推荐程序员通过 uid(String) 方法手动的给算子赋予 ID,这些 ID 将用于确定每一个算子的状态范围。如果不手动给各算子指定 ID,则会由 Flink 自动给每个算子生成一个 ID。只要这些 ID 没有改变就能从保存点(savepoint)将程序恢复回来。而这些自动生成的 ID 依赖于程序的结构,并且对代码的更改是很敏感的。因此,强烈建议用户手动的设置 ID。

savePoint的使用

1:在flink-conf.yaml中配置Savepoint存储位置

  • 不是必须设置,但是设置后,后面创建指定Job的Savepoint时,可以不用在手动执行命令时指定Savepoint的位置
  • state.savepoints.dir: hdfs://namenode:9000/flink/savepoints

2:触发一个savepoint【直接触发或者在cancel的时候触发】

  • bin/flink savepoint jobId [targetDirectory] [-yid yarnAppId]【针对on yarn模式需要指定-yid参数】
  • bin/flink cancel -s [targetDirectory] jobId [-yid yarnAppId]【针对on yarn模式需要指定-yid参数】

3:从指定的savepoint启动job

  • bin/flink run -s savepointPath [runArgs]


THE END


相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
18天前
|
存储 人工智能 大数据
The Past, Present and Future of Apache Flink
本文整理自阿里云开源大数据负责人王峰(莫问)在 Flink Forward Asia 2024 上海站主论坛开场的分享,今年正值 Flink 开源项目诞生的第 10 周年,借此时机,王峰回顾了 Flink 在过去 10 年的发展历程以及 Flink社区当前最新的技术成果,最后展望下一个十年 Flink 路向何方。
308 33
The Past, Present and Future of Apache Flink
|
2月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
146 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
3月前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
871 13
Apache Flink 2.0-preview released
|
3月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
103 3
zdl
|
2月前
|
消息中间件 运维 大数据
大数据实时计算产品的对比测评:实时计算Flink版 VS 自建Flink集群
本文介绍了实时计算Flink版与自建Flink集群的对比,涵盖部署成本、性能表现、易用性和企业级能力等方面。实时计算Flink版作为全托管服务,显著降低了运维成本,提供了强大的集成能力和弹性扩展,特别适合中小型团队和业务波动大的场景。文中还提出了改进建议,并探讨了与其他产品的联动可能性。总结指出,实时计算Flink版在简化运维、降低成本和提升易用性方面表现出色,是大数据实时计算的优选方案。
zdl
167 56
|
11天前
|
机器学习/深度学习 数据可视化 大数据
机器学习与大数据分析的结合:智能决策的新引擎
机器学习与大数据分析的结合:智能决策的新引擎
89 15
|
4天前
|
机器学习/深度学习 分布式计算 数据挖掘
MaxFrame 性能评测:阿里云MaxCompute上的分布式Pandas引擎
MaxFrame是一款兼容Pandas API的分布式数据分析工具,基于MaxCompute平台,极大提升了大规模数据处理效率。其核心优势在于结合了Pandas的易用性和MaxCompute的分布式计算能力,无需学习新编程模型即可处理海量数据。性能测试显示,在涉及`groupby`和`merge`等复杂操作时,MaxFrame相比本地Pandas有显著性能提升,最高可达9倍。适用于大规模数据分析、数据清洗、预处理及机器学习特征工程等场景。尽管存在网络延迟和资源消耗等问题,MaxFrame仍是处理TB级甚至PB级数据的理想选择。
24 4
|
11天前
|
存储 SQL 分布式计算
大数据时代的引擎:大数据架构随记
大数据架构通常分为四层:数据采集层、数据存储层、数据计算层和数据应用层。数据采集层负责从各种源采集、清洗和转换数据,常用技术包括Flume、Sqoop和Logstash+Filebeat。数据存储层管理数据的持久性和组织,常用技术有Hadoop HDFS、HBase和Elasticsearch。数据计算层处理大规模数据集,支持离线和在线计算,如Spark SQL、Flink等。数据应用层将结果可视化或提供给第三方应用,常用工具为Tableau、Zeppelin和Superset。
151 8
|
2月前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
73 1
|
3月前
|
分布式计算 大数据 Apache
利用.NET进行大数据处理:Apache Spark与.NET for Apache Spark
【10月更文挑战第15天】随着大数据成为企业决策和技术创新的关键驱动力,Apache Spark作为高效的大数据处理引擎,广受青睐。然而,.NET开发者面临使用Spark的门槛。本文介绍.NET for Apache Spark,展示如何通过C#和F#等.NET语言,结合Spark的强大功能进行大数据处理,简化开发流程并提升效率。示例代码演示了读取CSV文件及统计分析的基本操作,突显了.NET for Apache Spark的易用性和强大功能。
66 1

推荐镜像

更多