4 延迟执行
所有Flink程序都是延迟执行:当执行程序的main方法时,数据加载和转换不会立即执行。而是创建每个操作并将其添加到程序的计划中。
当执行环境上的execute()调用显式触发执行时,实际执行操作。
程序是在本地执行还是在集群上执行取决于执行环境的类型
延迟执行使我们可以构建Flink作为一个整体计划单元执行的复杂程序,进行内部的优化。
5 指定keys
上述程序中的这些数据如何确定呢?
某些转换(join,coGroup,keyBy,groupBy)要求在元素集合上定义key
其他转换(Reduce,GroupReduce,Aggregate,Windows)允许数据在应用之前在key上分组。
- DataSet分组为
DataSet<...> input = // [...] DataSet<...> reduced = input .groupBy(/*define key here*/) .reduceGroup(/*do something*/);
虽然可以使用DataStream指定key
DataStream<...> input = // [...] DataStream<...> windowed = input .keyBy(/*define key here*/) .window(/*window specification*/);
Flink的数据模型不基于键值对。 因此,无需将数据集类型物理打包到键和值中。 键是“虚拟的”:它们被定义为实际数据上的函数,以指导分组操作符。
注意:在下面的讨论中,将使用DataStream API和keyBy。 对于DataSet API,只需要用DataSet和groupBy替换。
5.1 定义元组的键
源码
- 即 :按给定的键位置(对于元组/数组类型)对DataStream的元素进行分组,以与分组运算符(如分组缩减或分组聚合)一起使用。
最简单的情况是在元组的一个或多个字段上对元组进行分组:
val input: DataStream[(Int, String, Long)] = // [...] val keyed = input.keyBy(0)
元组在第一个字段(整数类型)上分组。
val input: DataSet[(Int, String, Long)] = // [...] val grouped = input.groupBy(0,1)
在这里,我们将元组分组在由第一个和第二个字段组成的复合键上。
关于嵌套元组的注释:如果你有一个带有嵌套元组的DataStream,例如:
DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;
指定keyBy(0)将使系统使用完整的Tuple2作为键(以Integer和Float为键)。 如果要“导航”到嵌套的Tuple2中,则必须使用下面解释的字段表达式键。
5.2 指定key的字段表达式
可以使用基于字符串的字段表达式来引用嵌套字段,并定义用于分组,排序,连接或coGrouping的键。
字段表达式可以非常轻松地选择(嵌套)复合类型中的字段,例如Tuple和POJO类型。
我们有一个WC POJO,其中包含两个字段“word”和“count”。
Java版本代码
Scala版本代码
要按字段分组,我们只需将其名称传递给keyBy()函数。
// some ordinary POJO (Plain old Java Object) class WC(var word: String, var count: Int) { def this() { this("", 0L) } } val words: DataStream[WC] = // [...] val wordCounts = words.keyBy("word").window(/*window specification*/) // or, as a case class, which is less typing case class WC(word: String, count: Int) val words: DataStream[WC] = // [...] val wordCounts = words.keyBy("word").window(/*window specification*/)
5.2.1 字段表达式语法:
按字段名称选择POJO字段
例如,“user”指的是POJO类型的“user”字段
通过1偏移字段名称或0偏移字段索引选择元组字段
例如,“_ 1”和“5”分别表示Scala Tuple类型的第一个和第六个字段。
可以在POJO和Tuples中选择嵌套字段
例如,“user.zip”指的是POJO的“zip”字段,其存储在POJO类型的“user”字段中。 支持任意嵌套和混合POJO和元组,例如“_2.user.zip”或“user._4.1.zip”。
可以使用“_”通配符表达式选择完整类型
这也适用于非Tuple或POJO类型的类型。
5.2.2 字段表达示例
class WC(var complex: ComplexNestedClass, var count: Int) { def this() { this(null, 0) } } class ComplexNestedClass( var someNumber: Int, someFloat: Float, word: (Long, Long, String), hadoopCitizen: IntWritable) { def this() { this(0, 0, (0, 0, ""), new IntWritable(0)) } }
这些是上面示例代码的有效字段表达式:
- “count”:WC类中的count字段。
- “complex”:递归选择POJO类型ComplexNestedClass的字段复合体的所有字段。
- “complex.word._3”:选择嵌套Tuple3的最后一个字段。
- “complex.hadoopCitizen”:选择Hadoop IntWritable类型。
5.3 指定key的key选择器函数
定义键的另一种方法是“键选择器”功能。 键选择器函数将单个元素作为输入并返回元素的键。 key可以是任何类型,并且可以从确定性计算中导出。
以下示例显示了一个键选择器函数,它只返回一个对象的字段:
Java
Scala