So far the series has covered:
Hadoop (complete)
HDFS (complete)
MapReduce (complete)
Hive (complete)
Flume (complete)
Sqoop (complete)
Zookeeper (complete)
HBase (complete)
Redis (complete)
Kafka (complete)
Spark (complete)
Flink (in progress!)
Section Contents
In the previous section we covered:
Flink Time in detail
Analysis of the example
Watermark
Watermark
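The Role of Watermarks in Window Computation
With event-time windows, Flink relies on the watermark to decide when to trigger a window's computation. For example, with a 10-second tumbling window, Flink fires a window only once the watermark reaches that window's end time. Strictly speaking, a window covers [start, end), so it fires when the watermark reaches its last timestamp, end - 1 ms.
Suppose you have a 10-second window and the watermark reaches 12:00:10. Flink then triggers the computation for the window [12:00:00, 12:00:10).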
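The trigger condition comes down to simple arithmetic. Below is a minimal plain-Java sketch with no Flink dependency. It assumes a zero window offset and uses the same modulo arithmetic Flink applies when it computes a tumbling window's start. The class and method names are hypothetical.

```java
import java.time.Instant;

public class TumblingWindowMath {

    // Start of the tumbling window containing `timestamp` (window offset assumed to be 0).
    static long windowStart(long timestamp, long windowSize) {
        long remainder = timestamp % windowSize;
        // Negative timestamps need the remainder shifted so they land in the correct window
        return remainder < 0 ? timestamp - (remainder + windowSize) : timestamp - remainder;
    }

    // A window covers [start, end); it fires once the watermark reaches end - 1.
    static boolean shouldFire(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long size = 10_000L; // 10-second tumbling window
        long event = Instant.parse("2020-01-01T12:00:07Z").toEpochMilli();
        long start = windowStart(event, size);
        long end = start + size;
        // prints "window: [2020-01-01T12:00:00Z, 2020-01-01T12:00:10Z)"
        System.out.println("window: [" + Instant.ofEpochMilli(start) + ", " + Instant.ofEpochMilli(end) + ")");
        long watermark = Instant.parse("2020-01-01T12:00:10Z").toEpochMilli();
        // prints "fires: true"
        System.out.println("fires: " + shouldFire(end, watermark));
    }
}
```

A watermark of 12:00:09.998 would leave this window open, because it is still below the window's last timestamp, 12:00:09.999.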
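Handling Late Events
Watermarks handle out-of-order data well, but some events can still arrive after the watermark has already passed their timestamp. These are called "late events". Flink offers two mechanisms for them:
Allowed lateness: you configure how long a window keeps its state after the watermark passes its end. Each late event that arrives within that period is added to the window and triggers the window again.
Side output for late events: events that arrive even later can be sent to a side output stream for separate processing.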
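```java
// Tag for the side output; the anonymous subclass {} keeps the generic type information
final OutputTag<Tuple2<String, Integer>> lateOutputTag =
        new OutputTag<Tuple2<String, Integer>>("late-data") {};

SingleOutputStreamOperator<Tuple2<String, Integer>> mainStream = stream
        .keyBy(t -> t.f0)
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .allowedLateness(Time.seconds(5))   // keep window state 5 s after the watermark passes its end
        .sideOutputLateData(lateOutputTag)  // events later than that go to the side output
        .sum(1);                            // a window function is required; sum is just an example

// Late events can then be processed separately
DataStream<Tuple2<String, Integer>> lateStream = mainStream.getSideOutput(lateOutputTag);
```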
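The rule that decides an element's fate can be sketched in plain Java. It compares the window's last timestamp plus the allowed lateness against the current watermark, which is how Flink's window operator behaves as far as I understand it. This is an illustrative model only. It assumes tumbling windows with offset 0, and the class, enum and method names are hypothetical.

```java
public class LatenessCheck {

    enum Fate { ON_TIME, LATE_BUT_ACCEPTED, SIDE_OUTPUT }

    // Classifies one element of a tumbling event-time window (offset 0).
    static Fate classify(long eventTs, long windowSize, long allowedLateness, long watermark) {
        long start = eventTs - Math.floorMod(eventTs, windowSize);
        long maxTimestamp = start + windowSize - 1;          // last inclusive ms of [start, end)
        if (watermark < maxTimestamp) {
            return Fate.ON_TIME;                             // window has not fired yet
        }
        if (watermark < maxTimestamp + allowedLateness) {
            return Fate.LATE_BUT_ACCEPTED;                   // added to the window, which fires again
        }
        return Fate.SIDE_OUTPUT;                             // window state is gone: side output
    }

    public static void main(String[] args) {
        long size = 10_000L, lateness = 5_000L;
        long event = 3_000L;                                 // belongs to window [0, 10000)
        for (long wm : new long[] {8_000L, 12_000L, 15_000L}) {
            System.out.println("watermark " + wm + " -> " + classify(event, size, lateness, wm));
        }
    }
}
```

With allowedLateness set to 0, the middle case disappears. Every element whose window has already fired goes straight to the side output.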
Code Implementation
Data Format
```
01,1586489566000
01,1586489567000
01,1586489568000
01,1586489569000
01,1586489570000
01,1586489571000
01,1586489572000
01,1586489573000
01,1586489574000
01,1586489575000
01,1586489576000
01,1586489577000
01,1586489578000
01,1586489579000
```
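Each record has the form `key,epochMillis`: the key `01` and a millisecond timestamp, with records one second apart. For reference, 1586489566000 is 2020-04-10 03:32:46 UTC, i.e. 11:32:46 in UTC+8.

The plain-Java sketch below parses the records the same way the job's `map()` does. It then counts how many fall into each 5-second tumbling window, assuming offset 0 as in the job. The class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SampleDataWindows {

    // Same parsing as the job's map(): "key,epochMillis" -> epochMillis
    static long parseTimestamp(String line) {
        String[] split = line.split(",");
        return Long.parseLong(split[1].trim());
    }

    // Counts records per tumbling window, keyed by window start (offset 0)
    static Map<Long, Integer> countPerWindow(String[] lines, long windowSize) {
        Map<Long, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            long ts = parseTimestamp(line);
            long start = ts - Math.floorMod(ts, windowSize);
            counts.merge(start, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] lines = {
            "01,1586489566000", "01,1586489567000", "01,1586489568000", "01,1586489569000",
            "01,1586489570000", "01,1586489571000", "01,1586489572000", "01,1586489573000",
            "01,1586489574000", "01,1586489575000", "01,1586489576000", "01,1586489577000",
            "01,1586489578000", "01,1586489579000"
        };
        countPerWindow(lines, 5_000L).forEach((start, n) ->
                System.out.println("window [" + start + ", " + (start + 5_000L) + "): " + n + " records"));
    }
}
```

Running it prints three windows holding 4, 5 and 5 records: [1586489565000, 1586489570000), [1586489570000, 1586489575000) and [1586489575000, 1586489580000).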
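Writing the Code
This code:
Reads a real-time data stream from a socket.
Maps each record to a two-field tuple carrying its timestamp.
Applies a watermark strategy that tolerates up to 5 seconds of out-of-orderness, so Flink can handle out-of-order events.
Groups events by key and computes 5-second tumbling windows on event time.
Finally prints, for each window, the time range of its events and the window's start and end times.
The stream is grouped by key (the event's first field), and a Tumbling Window with a length of 5 seconds is applied.
The apply method collects all events in the window and sorts their timestamps. It then outputs the window's start and end times, along with the timestamps of the earliest and latest events in it.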
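Sorting works, but only the earliest and latest timestamps are actually needed. As an alternative design, the plain-Java sketch below finds both in a single pass with `java.util.LongSummaryStatistics`, without copying into a list and sorting it. It builds a summary line similar to the one in `apply()`. The class and method names are hypothetical, and the method assumes a non-empty input, which holds for any window that fires.

```java
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.LongSummaryStatistics;

public class WindowSummary {

    // Earliest and latest timestamps found in one pass; the input must be non-empty.
    static String summarize(String key, long windowStart, long windowEnd, Iterable<Long> timestamps) {
        LongSummaryStatistics stats = new LongSummaryStatistics();
        for (long ts : timestamps) {
            stats.accept(ts);
        }
        // Formats in the JVM's default time zone, like the original code
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        return "key: " + key
                + ", size: " + stats.getCount()
                + ", min: " + sdf.format(stats.getMin())
                + ", max: " + sdf.format(stats.getMax())
                + ", start: " + sdf.format(windowStart)
                + ", end: " + sdf.format(windowEnd);
    }

    public static void main(String[] args) {
        // Deliberately unsorted: the result does not depend on arrival order
        Iterable<Long> ts = Arrays.asList(1586489568000L, 1586489566000L, 1586489569000L, 1586489567000L);
        System.out.println(summarize("01", 1586489565000L, 1586489570000L, ts));
    }
}
```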
```java
SingleOutputStreamOperator<String> res = waterMark
        // Group by the first tuple field (the key)
        .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
            @Override
            public String getKey(Tuple2<String, Long> value) throws Exception {
                return value.f0;
            }
        })
        // 5-second tumbling windows on event time
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .apply(new WindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
            @Override
            public void apply(String s, TimeWindow window, Iterable<Tuple2<String, Long>> input,
                              Collector<String> out) throws Exception {
                // Collect and sort the event timestamps in this window
                List<Long> list = new ArrayList<>();
                for (Tuple2<String, Long> next : input) {
                    list.add(next.f1);
                }
                Collections.sort(list);
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                String result = "key: " + s
                        + ", list.size(): " + list.size()
                        + ", list.get(0): " + sdf.format(list.get(0))
                        + ", list.get(last): " + sdf.format(list.get(list.size() - 1))
                        + ", start: " + sdf.format(window.getStart())
                        + ", end: " + sdf.format(window.getEnd());
                out.collect(result);
            }
        });
```
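The watermark strategy is a Bounded Out-of-Orderness strategy that allows events to be out of order by up to 5 seconds. In extractTimestamp, the event timestamp is extracted, and each event's key and event time are printed. The assigner also maintains a currentMaxTimestamp that records the largest event timestamp seen so far. That field is only informational: the watermark itself is produced by the built-in generator that forBoundedOutOfOrderness creates, which tracks its own maximum timestamp.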
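The built-in bounded-out-of-orderness generator keeps the highest timestamp seen and emits a watermark of `max - outOfOrderness - 1`. The plain-Java sketch below re-implements that formula and replays the sample records against 5-second tumbling windows. The class and method names are hypothetical.

Two simplifications are assumed. First, a watermark is emitted right after every event, whereas Flink emits periodically (every 200 ms by default). Second, allowed lateness is 0.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

public class BoundedOutOfOrdernessSim {

    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessSim(Duration maxOutOfOrderness) {
        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
        // Chosen so the initial watermark is Long.MIN_VALUE without overflowing
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // Watermark = highest timestamp seen - allowed out-of-orderness - 1 ms
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    // End timestamps of the tumbling windows that fire, in firing order
    static List<Long> firedWindowEnds(long[] events, long windowSize, Duration delay) {
        BoundedOutOfOrdernessSim gen = new BoundedOutOfOrdernessSim(delay);
        TreeSet<Long> openWindowEnds = new TreeSet<>();
        List<Long> fired = new ArrayList<>();
        for (long ts : events) {
            long end = ts - Math.floorMod(ts, windowSize) + windowSize;
            if (end - 1 > gen.currentWatermark()) {
                openWindowEnds.add(end);   // window still open, element is kept
            }                              // otherwise the element is late and dropped
            gen.onEvent(ts);
            long wm = gen.currentWatermark();
            // A window [start, end) fires once the watermark reaches end - 1
            while (!openWindowEnds.isEmpty() && openWindowEnds.first() - 1 <= wm) {
                fired.add(openWindowEnds.pollFirst());
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        long[] events = new long[14];
        for (int i = 0; i < events.length; i++) {
            events[i] = 1586489566000L + i * 1000L;   // the 14 sample records, 1 s apart
        }
        // prints "fired window ends: [1586489570000]"
        System.out.println("fired window ends: " + firedWindowEnds(events, 5_000L, Duration.ofSeconds(5)));
    }
}
```

Under these assumptions, only the window ending at 1586489570000 fires, and it does so once the record 1586489575000 pushes the watermark to 1586489569999. The later windows stay open until records with higher timestamps arrive.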
```java
WatermarkStrategy<Tuple2<String, Long>> watermarkStrategy = WatermarkStrategy
        // The watermark lags the highest timestamp seen so far by 5 seconds
        .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
            // Informational only; the built-in watermark generator keeps its own maximum
            Long currentMaxTimestamp = 0L;
            final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

            @Override
            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                long timestamp = element.f1;
                currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
                System.out.println("Key:" + element.f0 + ", EventTime: " + element.f1
                        + ", " + format.format(element.f1));
                // Event time is the second tuple field (epoch milliseconds)
                return element.f1;
            }
        });
```
The complete code follows. It implements an event-time stream processing job that uses the watermark mechanism to handle out-of-order events:
```java
package icu.wzk;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class WatermarkTest01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // One parallel instance keeps watermark progress and the printed output easy to follow
        env.setParallelism(1);
        // Deprecated since Flink 1.12, where event time is already the default
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Read lines such as "01,1586489566000" from a local socket
        DataStreamSource<String> data = env.socketTextStream("localhost", 9999);

        // "key,epochMillis" -> (key, epochMillis)
        SingleOutputStreamOperator<Tuple2<String, Long>> mapped = data.map(
                new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String value) throws Exception {
                        String[] split = value.split(",");
                        // trim() guards against a trailing '\r' from some terminals
                        return new Tuple2<>(split[0], Long.valueOf(split[1].trim()));
                    }
                }
        );

        // Allow events to be out of order by up to 5 seconds
        WatermarkStrategy<Tuple2<String, Long>> watermarkStrategy = WatermarkStrategy
                .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                    // Informational only; the built-in watermark generator keeps its own maximum
                    Long currentMaxTimestamp = 0L;
                    final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

                    @Override
                    public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                        long timestamp = element.f1;
                        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
                        System.out.println("Key:" + element.f0 + ", EventTime: " + element.f1
                                + ", " + format.format(element.f1));
                        return element.f1;
                    }
                });

        SingleOutputStreamOperator<Tuple2<String, Long>> waterMark = mapped
                .assignTimestampsAndWatermarks(watermarkStrategy);

        SingleOutputStreamOperator<String> res = waterMark
                .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> value) throws Exception {
                        return value.f0;
                    }
                })
                // 5-second tumbling windows on event time
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new WindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
                    @Override
                    public void apply(String s, TimeWindow window, Iterable<Tuple2<String, Long>> input,
                                      Collector<String> out) throws Exception {
                        // Collect and sort the event timestamps in this window
                        List<Long> list = new ArrayList<>();
                        for (Tuple2<String, Long> next : input) {
                            list.add(next.f1);
                        }
                        Collections.sort(list);
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                        String result = "key: " + s
                                + ", list.size(): " + list.size()
                                + ", list.get(0): " + sdf.format(list.get(0))
                                + ", list.get(last): " + sdf.format(list.get(list.size() - 1))
                                + ", start: " + sdf.format(window.getStart())
                                + ", end: " + sdf.format(window.getEnd());
                        out.collect(result);
                    }
                });

        res.print();
        env.execute();
    }
}
```
Running the Code
Feeding Data
In the console, enter the following data:
Viewing the Results
The console output is as follows: