集群级配置StateBackend
全局配置需要需改集群中的配置文件,修改flink-conf.yaml
- 配置FsStateBackend
state.backend: filesystem state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
- 配置MemoryStateBackend
state.backend: jobmanager
- 配置RocksDBStateBackend
state.backend.rocksdb.checkpoint.transfer.thread.num: 1 同时操作RocksDB的线程数 state.backend.rocksdb.localdir: 本地path RocksDB存储状态数据的本地文件路径
Window
在流处理中,我们往往需要面对的是连续不断、无休无止的无界流,不可能等到所有数据都到齐了才开始处理。所以聚合计算其实在实际应用中,我们往往更关心一段时间内数据的统计结果,比如在过去的 1 分钟内有多少用户点击了网页。在这种情况下,我们就可以定义一个窗口,收集最近一分钟内的所有用户点击数据,然后进行聚合统计,最终输出一个结果就可以了。
说白了窗口就是将无界流通过窗口切割成一个个的有界流,窗口是左开右闭的。
Flink中的窗口分为两类:基于时间的窗口(Time-based Window)和基于数量的窗口(Count-based Window)。
- 时间窗口(Time Window):按照时间段去截取数据,这在实际应用中最常见。
- 计数窗口(Count Window):由数据驱动,也就是说按照固定的个数,来截取一段数据集。
时间窗口中又包含了:滚动时间窗口(Tumbling Window)、滑动时间窗口(Sliding Window)、会话窗口(Session Window)。
计数窗口包含了:滚动计数窗口和滑动计数窗口。
时间窗口、计数窗口只是对窗口的一个大致划分。在具体应用时,还需要定义更加精细的规则,来控制数据应该划分到哪个窗口中去。不同的分配数据的方式,就可以由不同的功能应用。
根据分配数据的规则,窗口的具体实现可以分为 4 类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)、会话窗口(Session Window),以及全局窗口(Global Window)。
滚动窗口(Tumbling Windows)
滚动窗口每个窗口的大小固定,且相邻两个窗口之间没有重叠。滚动窗口可以基于时间定义,也可以基于数据个数定义;需要的参数只有窗口大小,我们可以定义一个长度为1小时的滚动时间窗口,那么每个小时就会进行一次统计;或者定义一个长度为10的滚动计数窗口,就会每10个数进行一次统计。
基于时间的滚动窗口:
DataStream<T> input = ... // tumbling event-time windows input .keyBy(...) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .<window function> (...) // tumbling processing-time windows input .keyBy(...) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .<window function> (...)
在上面的代码中,我们使用了TumblingEventTimeWindows
和TumblingProcessingTimeWindows
来创建基于Event Time或Processing Time的滚动时间窗口。窗口的长度可以用org.apache.flink.streaming.api.windowing.time.Time
中的seconds
、minutes
、hours
和days
来设置。
基于计数的滚动窗口:
import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class TumblingCountWindowExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Long> input = env.fromElements(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L); input .keyBy(value -> 1) .countWindow(3) .reduce(new ReduceFunction<Long>() { @Override public Long reduce(Long value1, Long value2) throws Exception { return value1 + value2; } }) .print(); env.execute(); } }
在上面的代码中,我们使用了countWindow
方法来创建一个基于数量的滚动窗口,窗口大小为3个元素。当窗口中的元素数量达到3时,窗口就会触发计算。在这个例子中,我们使用了reduce
函数来对窗口中的元素进行求和。
滑动窗口(Sliding Windows)
滑动窗口的大小固定,但窗口之间不是首尾相接,而有部分重合。同样,滑动窗口也可以基于时间和计算定义。
滑动窗口的参数有两个:窗口大小和滑动步长。滑动步长是固定的。
img
基于时间的滑动窗口:
DataStream<T> input = ... // sliding event-time windows input .keyBy(...) .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) .<window function> (...)
基于计数的滑动窗口:
DataStream<T> input = ... input .keyBy(...) .countWindow(10, 5) .<window function> (...)
countWindow
方法来创建一个基于计数的滑动窗口,窗口大小为10个元素,滑动步长为5个元素。当窗口中的元素数量达到10时,窗口就会触发计算。
会话窗口(Session Windows)
会话窗口是Flink中一种基于时间的窗口类型,每个窗口的大小不固定,且相邻两个窗口之间没有重叠。“会话”终止的标志就是隔一段时间没有数据来:
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows; import org.apache.flink.streaming.api.windowing.time.Time; DataStream<T> input = ... input .keyBy(...) .window(EventTimeSessionWindows.withGap(Time.minutes(10))) .<window function> (...)
在上面的代码中,使用了EventTimeSessionWindows
来创建基于Event Time的会话窗口。withGap
方法用来设置会话窗口之间的间隔时间,当两个元素之间的时间差超过这个值时,它们就会被分配到不同的会话窗口中。
按键分区窗口和非按键分区窗口
在Flink中,数据流可以按键分区(keyed)或非按键分区(non-keyed)。按键分区是指将数据流根据特定的键值进行分区,使得相同键值的元素被分配到同一个分区中。这样可以保证相同键值的元素由同一个worker实例处理。只有按键分区的数据流才能使用键分区状态和计时器。
非按键分区是指数据流没有根据特定的键值进行分区。这种情况下,数据流中的元素可以被任意分配到不同的分区中。
在定义窗口操作之前,首先需要确定,到底是基于按键分区(Keyed)来开窗,还是直接在没有按键分区的DataStream上开窗。也就是在调用窗口算子之前是否有keyBy操作。
按键分区窗口:
import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; public class KeyedWindowExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Long> input = env.fromElements(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L); input .keyBy(value -> 1) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .reduce(new ReduceFunction<Long>() { @Override public Long reduce(Long value1, Long value2) throws Exception { return value1 + value2; } }) .print(); env.execute(); } }
在上面的代码中,使用了keyBy
方法来对数据流进行按键分区,然后使用window
方法来创建一个基于Event Time的滚动时间窗口。在这个例子中,我们使用了reduce
函数来对窗口中的元素进行求和。
非按键分区窗口:
import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.streaming.api.datastream.AllWindowedStream; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; public class NonKeyedWindowExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Long> input = env.fromElements(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L); AllWindowedStream<Long, ?> windowedStream = input.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); windowedStream.reduce(new ReduceFunction<Long>() { @Override public Long reduce(Long value1, Long value2) throws Exception { return value1 + value2; } }).print(); env.execute(); } }
在上面的代码中,使用了windowAll
方法来对非按键分区的数据流进行窗口操作。windowAll
方法接受一个WindowAssigner
参数,用来指定窗口类型。然后使用了reduce
函数来对窗口中的元素进行求和。
按键分区窗口(Keyed Windows)经过按键分区keyBy操作后,数据流会按照key被分为多条逻辑流(logical streams),这就是KeyedStream。基于KeyedStream进行窗口操作时,窗口计算会在多个并行子任务上同时执行。相同key的数据会被发送到同一个并行子任务,而窗口操作会基于每个key进行单独的处理。所以可以认为,每个key上都定义了一组窗口,各自独立地进行统计计算。
非按键分区(Non-Keyed Windows)如果没有进行keyBy,那么原始的DataStream就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了1。所以在实际应用中一般不推荐使用这种方式
窗口函数(WindowFunction)
所谓的“窗口函数”(window functions),就是定义窗口如何进行计算的操作。
窗口函数根据处理的方式可以分为两类:增量聚合函数和全量聚合函数。
增量聚合函数
增量聚合函数每来一条数据就立即进行计算,中间保持着聚合状态;但是不立即输出结果。等到窗口到了结束时间需要输出计算结果的时候,取出之前聚合的状态直接输出。
常见的增量聚合的函数有:reduce(reduceFunction)、aggregate(aggregateFunction)、sum()、min()、max()。
下面是一个使用增量聚合函数的Java代码示例:
DataStream<Tuple2<String, Integer>> input = ... input.keyBy(new KeySelector<Tuple2<String, Integer>, String>() { @Override public String getKey(Tuple2<String, Integer> value) throws Exception { return value.f0; } }) .timeWindow(Time.seconds(5)) .reduce(new ReduceFunction<Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t0, Tuple2<String, Integer> t1) throws Exception { return new Tuple2<>(t0.f0, t0.f1 + t1.f1); } });
这段代码首先使用keyBy
方法按照Tuple2中的第一个元素(f0)进行分组。然后,它定义了一个5秒的时间窗口,并使用reduce
方法对每个窗口内的数据进行聚合操作。在这个例子中,聚合操作是将具有相同key(即f0相同)的元素的第二个元素(f1)相加。最终,这段代码将输出一个包含每个key在每个5秒窗口内f1值之和的数据流。
另外还有一个常用的函数是聚合函数(AggregateFunction),ReduceFunction和AggregateFunction都是增量聚合函数,但它们之间有一些区别。AggregateFunction则更加灵活,ReduceFunction的输入类型、输出类型和中间状态类型必须相同,而AggregateFunction则允许这三种类型不同。
例如,如果我们希望计算一组数据的平均值,应该怎样做聚合呢?这时我们需要计算两个状态量:数据的总和(sum),以及数据的个数(count),而最终输出结果是两者的商(sum/count)。如果用ReduceFunction,那么我们应该先把数据转换成二元组 (sum, count)的形式,然后进行归约聚合,最后再将元组的两个元素相除转换得到最后的平均值。本来应该只是一个任务,可我们却需要 map-reduce-map 三步操作,这显然不够高效。而使用AggregateFunction则可以更加简单地实现这个需求。
下面是使用AggregateFunction计算平均值的代码示例:
DataStream<Tuple2<String, Double>> input = ... input .keyBy(new KeySelector<Tuple2<String, Double>, String>() { @Override public String getKey(Tuple2<String, Double> value) throws Exception { return value.f0; } }) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .aggregate(new AggregateFunction<Tuple2<String, Double>, Tuple2<Double, Integer>, Double>() { @Override public Tuple2<Double, Integer> createAccumulator() { return new Tuple2<>(0.0, 0); } @Override public Tuple2<Double, Integer> add(Tuple2<String, Double> value, Tuple2<Double, Integer> accumulator) { return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1); } @Override public Double getResult(Tuple2<Double, Integer> accumulator) { return accumulator.f0 / accumulator.f1; } @Override public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> a, Tuple2<Double, Integer> b) { return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1); } });
这段代码首先使用keyBy
方法按照Tuple2中的第一个元素(f0)进行分组。然后,它定义了一个5秒的翻滚事件时间窗口,并使用aggregate
方法对每个窗口内的数据进行聚合操作。在这个例子中,聚合操作是计算具有相同key(即f0相同)的元素的第二个元素(f1)的平均值。最终,这段代码将输出一个包含每个key在每个5秒窗口内f1值平均值的数据流。