开发者学堂课程【开源 Flink 极速上手教程:Stream Processing with Apache Flink】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/331/detail/3708
Stream Processing with Apache Flink(一)
内容介绍:
一、并行处理和编程范式
二、DataStream API 概览
三、状态和时间
主要内容分为三部分,首先是数据处理和编程相关的背景知识,其次是课程是核心,如何利用 Flink DataStream API 完成简单的应用,最后是Flink中的状态和时间,第二第三两部分内容是 Flink 作为流处理引擎具有特色和代表性的特性。
一、并行处理和编程范式
1.并行处理和 DAG
对于计算密集型或数据密集型计算量较大的工作,并行计算或分而治之的思想是解决此类问题非常有效的手段,在手段中比较关键的一部分是如何对已有任务的划分或已有计算资源的分配。
如帮助老师批改试卷,假设有多份试卷交由几个同学批阅,卷子共有 ABC 三道题,可以考虑本地合作方式将所有试卷的三个题分别交由不同的同学批阅,每一个同学批完自己的题目传递给写一个同学,依次传递,达到类似流水线的工作效果,随着同学们数量的增加,协作的困难也越大,假设不同的题目有多个同学共同批阅,A 题目2个同学,B题目3个同学,C题目一个同学,需要进一步考虑对任务进行划分,将整体批卷子分为三组,第一组批A题,可以一个同学批A题中一部分,另一个同学批另一部分,分别批完之后将试卷传给下一个组,对试卷本身题目的划分以及对试卷本体的划分,是试卷的并行和是计算的并行,将协作用图形表示类似于有向无环图的样式。
在途中假设 A 组同学还需要进行一些额外的工作,如将试卷从办公室拿到批阅的地点,C 组同学也有额外的任务,将试卷进行最总的分数统计与记录,按照规则可以将图中所有的计算节点划分为三个类别,第一个为 Source,获取试卷的额外的规则,第二个节点不需要与外部打交道 Transformation,第三类节点是 C 除了完成本身工作,还需要将整个逻辑的结果写出到外部系统,称为Sink。节点表示计算,节点的边代表计算之间的依赖关系,以上是并行计算的背景知识。
2.命令式编程和声明式编程
数据集中有从1-10十个数字,任务将所有的数字放大一倍,即乘以2,再进行累计求和
List<Integer> data = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
编程解决此类问题有两类大的思想
第一类是命令式编程的方式,一步一步告诉机器如何生成处理结构,如何让处理结构存储中间的处理结果,如何将中间结果转化为最终结果。
另一种思路是声明式的方式,与命令式最大的不同在于声明式通常只需要告诉机器需要完成的任务,不需要详细传递如何一步步完成任务只需要将原有的数据集转化成stream,将 stream 转化成数字类型的 stream,在中间转换将每个数字乘以2,最后直接调用 sum 方法,获得所有数字的和。
命令式像假设我是老板,需要让手下的员工干活,如果员工是新手,需要一步一步指导完成的工作如何做,保证员工完全按照老板的意志完成任务,不足之处在于非常繁琐,需要详细介绍每一步如何做。
声明式像有经验的员工做一项工作时,不需要详细介绍,只需要告诉需要完成的事情,更有经验的员工甚至会出现特殊情况,相关工作人员今天未上班,第二天可以自动重拾最终完成分配的目标。
代码示例:
(1)命令式
public static int imperative () {
List<Integer> tempList = new ArrayList<> ();
for (int v : data) {
tempList.add (v * 2);
}
int result =0;
for (int v : tempList) {
result += v;
return result;
}
每一步都需要指定如何做,有许多方式
在原始数据集上操作,不新建一个临时的数据集存储翻倍之后的结果,直接在原始数据集对每一个数据乘以2,再进行累加
public static int iimperative2 () {
for (int i=0;i < data.size();++i){
data.set(i,data.get(i)* 2);
}
int result =A
For(int v : data){
result +=v;
}
return result;
}
还可以直接对数据进行优化,直接累加,不生成任何中间结果
public static int imperatie3(){
Int result = 0;
for (int v : data){
result +=V*2;
}
return result;
}
所有的方法,所达到效果都是相同的
(2)声明式
public static int declarative(){
return data.stream ().mapToInt(v -> v* 2).sum ();
}
SELECT SUM(2 * value) FROM data
对于命令式编程写的代码量少,对于开发人员而言,更加高效
运行方法
得到的结果是一致的,命令式2的方法会改变原始数据集,放在最后运行,防止对数据集产生影响。
SQL 也是一个典型的声明式的编程方式,比上述已给的编程的声明度更高,代码更加简洁。
作为简介的开发方式,是计算引擎所追求的效果,在Flink中所有开发相关的API都是比较偏向声明式的方式。
二、DataStream API 概览
1.Flink API 逻辑层次
在旧版本 Flink 中,开发的 API 遵循四层的关系,最上层使用更加高级的 API 或声明程度更高的 Table API/SQL 的方式编写逻辑,所有的 Table API/SQL 编写方式都会被内部翻译或优化成为一个 DataStream API 编写的程序,DataStream API 编写的程序进一步编写为 Transformations,最终被翻译成 JobGraph,即DAG形式。
在新版本 Flink中发生了一些改变,主要改变在 Table API/SQ中,不再翻译成DataStream API 编写的程序,直接编写为 Transformations的形式,Table API/SQL与 DataStream API 从上层下层关系变为平级关系,流程简化带来一些好处。重点是DataStream API
2.DataStream API 示例
解决上述1-10累加问题
将此问题放入 Flink 中,代码相对复杂
第一步在 Flink 中做开发,获取运行环境,获取环境后调用环境的 addSource 方法为整个逻辑添加最初始的数据源的输入,设置数据源后拿到数据源的引用,即 Source对象,调用一系列的转换方法对 Source 数据进行一系列的转化,转化将每个数字乘以2,进行分组,分组相当于返回的常数,表示的含义将所有的数据分到一组,对组里的数据按照第一个字段进行累加,最终得到结果。
得到结果后在整个逻辑加一个输出的 sink,将所有的数据写到目标中,调用 execute方法,将上面编写的逻辑统一提交远程或本地集群执行。
用DataStream编写程序与单机程序最大的不同在于前几步的过程不会触发对于数据的计算,只有在最后一步提交,前面是一个绘制DAG图的过程,整个逻辑图绘制完成后,调用 execute 方法将整个 DAG 图作为整体提交到集群中执行。
public static void datastream () throws Exception {
//1、获取运行环境
StreamExecutionEnvironment e =StreamExecutionEnvironment .getExecutionEnvironment ();
// 2、设置 source 读取数据
DataStream<Integer> source = e.addSource (
new FromElementsFunction<>(Types.INT.createSerializer (e.getConfig () ), data),Types.INT);
//3、对数据进行一系列转换
DataStream<Integer> ds = source.map(v -> v *
2 ) .keyBy(value -> 1 ) . sum (0);
// 4、将数据写入sink
ds .addsink (new PrintsinkFunction<> ());
// 5、提交执行
e .execute ( ) ;
3.Flink 作业产生过程
Flink具体的作业产生过程是复杂的,需要经过一步步的转化与优化
Flink 作业产生过程图,不需要了解每一步具体代表什么,只是做一个具体脉络的梳理,对阅读源码有帮助,如任务是如何产生的,可以对照图的每一块在源码中找对应的类观察整个作业的生成过程。
4.DataStream 转换操作
每一个 DataStream 对象在调用相关方法时,相当于产生一个新的转换,新的转换对应每一个新的算子,将新的算子添加到已有的逻辑代码图中,相当于添加一条边指向现有的最后的节点,采取不断地扩展图的方式,所有的 API 在调用时会产生一个新的对象,继续在新的对象上继续调用转换方法,像一种链式的方式将图一步步绘制出来,其中涉及到高阶函数的思想,每调用 DataStream 一个转换时,需要传递一个参数 function,转换决定了对数据进行怎样的操作,实际传递是函数包在算子中,函数决定转换操作具体如何完成。
除图中所列的内容,Flink DataStream API 还包含两个重要的点,ProcessFunction与 CoProcessFunction,作为最底层的处理逻辑提供给用户使用,所有左侧图设计的转换,理论上都可以用底层 ProcessFunction 与 CoProcessFunction去完成。自行梳理每个转换对数据进行的操作,通过官网查阅 DataStream API 相关文档查看具体说明。
5.数据分区(Shuffle)
将牌当作数据,传统批处理的 Shuffle操作相当于理牌的过程,在拿到牌时首先将牌理顺好,按顺序排列,方便找到要出的牌。
流处理所有的数据是动态到来的,数据按照不同的数字或花色分组分区的过程是动态完成的,给出上游两个A处理,下游三个 B 处理,流处理的 Shuffle 是数据的分区,A处理完发出的数据发送给下游的哪一个处理,是流处理的 Shuffle。
6.分区策略
类型 |
描述 |
dataStream.KeyBy() |
按照 Key 值分区 |
dataStream.global() |
全部发往第1个实例 |
dataStream.broadcast() |
广播 |
dataStream.forward() |
上下游并行度一样时一对一发送 |
dataStream.rebalance() |
随机均匀分配 |
dataStream.rescale() |
Round—Robin(轮流分配) |
dataStream.partitionCustom() |
自定义单播 |
dataStream 调用 keyBy 方法后,将整个数据按照Key值进行分区,严格上说,keyBy不是一种底层的物理分区策略,更多是一种转化的操作,从API角度看会将dataStream转化成key的dataStream类型,两者支持的操作有所不同,所有的分区策略中较难理解的是 rescale,设计上下游数据的本地性问题,与传统的rebalance Round-Robin 轮流分配不同在于会尽量避免数据跨网络传输,上述分区策略都不使用时,可以调用 partitionCustom 方法自定义分区,partitionCustom 只是一种自定义单播,每个数据只能指定一个下游所发送的示例,不能复制多份,发送到下有多个示例中
7.Flink 连接器
有两类处理比较关键的节点,一类是A需要连接外部系统,从外部系统将数据读取到Flink 集群中。
第二轮是 Sink 节点,需要汇总处理完的结果,将结果写入某个外部系统中,外部系统可以是链系统、数据库等,每一个数据库中的计算逻辑可以没有数据输出,可以不将最终的数据写出到外部系统,Flink 中还有 state 状态的概念,中间计算的结果可以通过 state 计算方式暴露给外部系统,可以没有 sink,但必须有 Source,必须从某个地方将数据读入,才能进行后续处理。
外部系统:文件系统、数据库、消息队列....
Source是否支持监测并接入更新(Dynamic or static)是否能够感知更新数据,将更新数据传输到系统中。如Flink中对文件有相应感应连接器,CS文件连接器在定义时可以指定参数,是否对CS文件读取一次,相当于文件系统的快照读到系统中,持续检测目录的变化情况,目录新添加一个CS文件,系统可以感知到变化并且可以将新添加的数据读到系统中,两者是静态数据源和动态数据源的概念,要关注数据源,决定了后续的处理是有限的还是持续的处理。
Sink是否支持更新已有结果(Append-only or support update)
对于部分系统如Kafka,将数据写入Kafka中,Kafka数据写入是追加,不能进行修改已经写入到系统中的某一条记录,对应的数据库支持对数据的更新,数据是1,第二次更新变为10,可以通过一些手段将1删掉,把更新的10写入,以上两个特性决定Flink连接器是一个面向静态、面向动态的数据关键的特点。关于系统连接器更多的信息参照官方文档内容,给出的截图是1.1之后的文档,datasource文档在1.1之后进行了重构、修改。对于Table SQL/API层面上的连接器,比起在dataStream层面上的连接器要承担更多的任务,会涉及stream定义以及连接器是否支持投影、下推等,支持这些可以提高数据处理的性能。
运行案例:定义environment,添加一个数据源,进行一系列的转换操作,添加数据最终写出的sink,将整个执行计划输出,调用execute方法将所有逻辑执行,运行
public static void datastream () throws Exception {
StreamExecutionEnvironment e =StreamExecutionEnvironment .getExecutionEnvironment ();
DataStream<Integer> source = e.addSource (
new FromElementsFunction<>(Types.INT.createSerializer (e.getConfig () ), data),Types.INT);
DataStream<Integer> ds = source.map(v -> v * 2 ) .keyBy(value -> 1 ) . sum (0);
ds .addsink (new PrintsinkFunction<> ());
e .execute ( ) ;
}
输出执行计划
Sum操作是一个不停更新的sum的操作,到达新的数据加载到已有的sum结果,最终所有的数据加载之后得到最终结果110,与单机完成的结果是一致的