8.Flink增量聚合和全窗口函数
8.1.AggregateFunction增量聚合函数
- 增量聚合函数
aggregate(agg函数,WindowFunction(){ })
- 窗口保存临时数据,每进入一个新数据,会与中间数据累加,生成新的中间数据,再保存到窗口中
- 常见的增量聚合函数有 reduceFunction、aggregateFunction
- min、max、sum 都是简单的聚合操作,不需要自定义规则
AggregateFunction<IN, ACC, OUT> IN是输入类型,ACC是中间聚合状态类型,OUT是输出类型,是聚合统计当前窗口的数据
- 滚动窗口聚合案例
/** * @author lixiang * Tumbling-Window滚动窗口 */ public class FlinkWindow1Demo { public static void main(String[] args) throws Exception { //构建环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); DataStream<VideoOrderDO> ds = env.addSource(new VideoOrderSource()); WindowedStream<VideoOrderDO, String, TimeWindow> stream = ds.keyBy(new KeySelector<VideoOrderDO, String>() { @Override public String getKey(VideoOrderDO value) throws Exception { return value.getTitle(); } }).window(TumblingProcessingTimeWindows.of(Time.seconds(5))); stream.aggregate(new AggregateFunction<VideoOrderDO, Map<String, Object>, Map<String, Object>>() { //初始化累加器 @Override public Map<String, Object> createAccumulator() { return new HashMap<>(); } //聚合方式 @Override public Map<String, Object> add(VideoOrderDO value, Map<String, Object> accumulator) { if (accumulator.size() == 0) { accumulator.put("title", value.getTitle()); accumulator.put("money", value.getMoney()); accumulator.put("num", 1); accumulator.put("createTime", value.getCreateTime()); } else { accumulator.put("title", value.getTitle()); accumulator.put("money", value.getMoney() + Integer.parseInt(accumulator.get("money").toString())); accumulator.put("num", 1 + Integer.parseInt(accumulator.get("num").toString())); accumulator.put("createTime", value.getCreateTime()); } return accumulator; } //返回结果 @Override public Map<String, Object> getResult(Map<String, Object> accumulator) { return accumulator; } //合并内容 @Override public Map<String, Object> merge(Map<String, Object> a, Map<String, Object> b) { return null; } }).print(); env.execute("Tumbling Window job"); } }
8.2.WindowFunction全窗口函数
- 全窗口函数
apply(new WindowFunction(){ })
- 窗口先缓存该窗口所有元素,等窗口的全部数据收集起来后再触发条件计算
- 常见的全窗口聚合函数 windowFunction(未来可能弃用)、processWindowFunction(可以获取到窗口上下文 更多信息,包括窗口信息)
IN是输入类型,OUT是输出类型,KEY是分组类型,W是时间窗 WindowFunction<IN, OUT, KEY, W extends Window>
- 案例实战
/** * @author lixiang * apply */ public class FlinkWindow2Demo { public static void main(String[] args) throws Exception { //构建环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); DataStream<VideoOrderDO> ds = env.addSource(new VideoOrderSource()); WindowedStream<VideoOrderDO, String, TimeWindow> stream = ds.keyBy(new KeySelector<VideoOrderDO, String>() { @Override public String getKey(VideoOrderDO value) throws Exception { return value.getTitle(); } }).window(TumblingProcessingTimeWindows.of(Time.seconds(5))); stream.apply(new WindowFunction<VideoOrderDO, Map<String,Object>, String, TimeWindow>() { @Override public void apply(String key, TimeWindow timeWindow, Iterable<VideoOrderDO> iterable, Collector<Map<String, Object>> collector) throws Exception { List<VideoOrderDO> list = IterableUtils.toStream(iterable).collect(Collectors.toList()); long sum = list.stream().collect(Collectors.summarizingInt(VideoOrderDO::getMoney)).getSum(); Map<String,Object> map = new HashMap<>(); map.put("sumMoney",sum); map.put("title",key); collector.collect(map); } }).print(); env.execute("apply Window job"); } }
8.3.processWindowFunction全窗口函数
- 全窗口函数
process(new ProcessWindowFunction(){})
- 窗口先缓存该窗口所有元素,等窗口的全部数据收集起来后再触发条件计算
- 常见的全窗口聚合函数 windowFunction(未来可能弃用)、processWindowFunction(可以获取到窗口上下文 更多信息,包括窗口信息)
IN是输入类型,OUT是输出类型,KEY是分组类型,W是时间窗 ProcessWindowFunction<IN, OUT, KEY, W extends Window>
- 案例实战
/** * @author lixiang * process-Window滚动窗口 */ public class FlinkWindow3Demo { public static void main(String[] args) throws Exception { //构建环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); DataStream<VideoOrderDO> ds = env.addSource(new VideoOrderSource()); WindowedStream<VideoOrderDO, String, TimeWindow> stream = ds.keyBy(new KeySelector<VideoOrderDO, String>() { @Override public String getKey(VideoOrderDO value) throws Exception { return value.getTitle(); } }).window(TumblingProcessingTimeWindows.of(Time.seconds(5))); stream.process(new ProcessWindowFunction<VideoOrderDO, Map<String,Object>, String, TimeWindow>() { @Override public void process(String key, ProcessWindowFunction<VideoOrderDO, Map<String, Object>, String, TimeWindow>.Context context, Iterable<VideoOrderDO> iterable, Collector<Map<String, Object>> collector) throws Exception { List<VideoOrderDO> list = IterableUtils.toStream(iterable).collect(Collectors.toList()); long sum = list.stream().collect(Collectors.summarizingInt(VideoOrderDO::getMoney)).getSum(); Map<String,Object> map = new HashMap<>(); map.put("sumMoney",sum); map.put("title",key); collector.collect(map); } }).print(); env.execute("process Window job"); } }
窗口函数对比
- 增量聚合
aggregate(new AggregateFunction(){});
- 全窗口聚合
apply(new WindowFunction(){}) process(new ProcessWindowFunction(){}) //比WindowFunction功能强大
9.迟到无序数据处理watermark
9.1.Watermark简介和应用
(1)基本概念
- Window:Window是处理无界流的关键,Windows将流拆分为一个个有限大小的buckets,可以可以在每一个buckets中进行计算
- start_time,end_time:当Window时时间窗口的时候,每个window都会有一个开始时间和结束时间(前开后闭),这个时间是系统时间
- event-time: 事件发生时间,是事件发生所在设备的当地时间,
- 比如一个点击事件的时间发生时间,是用户点击操作所在的手机或电脑的时间
- Watermarks:可以把他理解为一个水位线,等于evevtTime - delay(比如规定为20分钟),一旦Watermarks大于了某个window的end_time,
就会触发此window的计算,Watermarks就是用来触发1window计算的。
- Watermaker = 当前计算窗口最大的事件时间 - 允许乱序延迟的时间
推迟窗口触发的时间,实现方式:通过当前窗口中最大的eventTime-延迟时间所得到的Watermark与窗口原始触发时间进行对比,当Watermark大于窗口原始触发时间时则触发窗口执行!!!我们知道,流处理从事件产生,
到流经source,再到operator,中间是有一个过程和时间的,虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、分布式等原因,导致乱序的产生,所谓乱序,
就是指Flink接收到的事件的先后顺序不是严格按照事件的Event Time顺序排列的。
那么此时出现一个问题,一旦出现乱序,如果只根据eventTime决定window的运行,我们不能明确数据是否全部到位,但又不能无限期的等下去此时必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了,这个特别的机制,就是Watermark。
(2)Watermark水位线介绍
- 由flink的某个operator操作生成后,就在整个程序中随event数据流转
- With Periodic Watermarks(周期生成,可以定义一个最大允许乱序的时间,用的很多)
- With Punctuated Watermarks(标点水位线,根据数据流中某些特殊标记事件来生成,相对少)
- 衡量数据是否乱序的时间,什么时候不用等早之前的数据
- 是一个全局时间戳,不是某一个key下的值
- 是一个特殊字段,单调递增的方式,主要是和数据本身的时间戳做比较
- 用来确定什么时候不再等待更早的数据了,可以触发窗口进行计算,忍耐是有限度的,给迟到的数据一些机会
- 注意
- Watermark 设置太小会影响数据准确性,设置太大会影响数据的实时性,更加会加重Flink作业的负担
- 需要经过测试,和业务相关联,得出一个较合适的值即可
- 触发计算后,其他窗口内数据再到达也被丢弃
9.2.Watermark案例实战
- 概念很抽象,下面用一个案例给解释下watermark的作用。
- 需求:每10s分组统计不同视频的成交总价,数据有乱序延迟,允许5秒的时间。
(1)时间工具类
public class TimeUtil { /** * 时间处理 * @param date * @return */ public static String toDate(Date date){ DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); ZoneId zoneId = ZoneId.systemDefault(); return formatter.format(date.toInstant().atZone(zoneId)); } /** * 字符串转日期类型 * @param time * @return */ public static Date strToDate(String time){ DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); LocalDateTime dateTime = LocalDateTime.parse(time, formatter); return Date.from(dateTime.atZone(ZoneId.systemDefault()).toInstant()); } /** * 时间处理 * @param date * @return */ public static String format(long date){ DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); ZoneId zoneId = ZoneId.systemDefault(); return formatter.format(new Date(date).toInstant().atZone(zoneId)); } }
(2)Flink入口函数
/** * @author lixiang */ public class FlinkWaterDemo { public static void main(String[] args) throws Exception { //初始化环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //设置并行度 env.setParallelism(1); //监听socket输入 DataStreamSource<String> source = env.socketTextStream("192.168.139.20", 8888); //一对多转换,将输入的字符串转成Tuple类型 SingleOutputStreamOperator<Tuple3<String, String, Integer>> flatMap = source.flatMap(new FlatMapFunction<String, Tuple3<String, String, Integer>>() { @Override public void flatMap(String value, Collector<Tuple3<String, String, Integer>> out) throws Exception { String[] split = value.split(","); out.collect(new Tuple3<String,String,Integer>(split[0],split[1],Integer.parseInt(split[2]))); } }); //设置watermark,官方文档直接拿来的,注意修改自己的时间参数 SingleOutputStreamOperator<Tuple3<String, String, Integer>> watermarks = flatMap.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(5)) .withTimestampAssigner((element, recordTimestamp) -> { return TimeUtil.strToDate(element.f1).getTime(); })); //根据标题进行分组 watermarks.keyBy(new KeySelector<Tuple3<String, String, Integer>, String>() { @Override public String getKey(Tuple3<String, String, Integer> value) throws Exception { return value.f0; } }).window(TumblingEventTimeWindows.of(Time.seconds(10))).apply(new WindowFunction<Tuple3<String, String, Integer>, String, String, TimeWindow>() { //滚动窗口,10s一统计,全窗口函数 @Override public void apply(String key, TimeWindow timeWindow, Iterable<Tuple3<String, String, Integer>> iterable, Collector<String> collector) throws Exception { List<String> eventTimeList = new ArrayList<>(); int total = 0; for (Tuple3<String, String, Integer> order : iterable) { eventTimeList.add(order.f1); total = total + order.f2; } String outStr = "分组key:"+key+",总价:"+total+",窗口开始时间:"+TimeUtil.format(timeWindow.getStart())+",窗口结束时间:"+TimeUtil.format(timeWindow.getEnd())+",窗口所有事件时间:"+eventTimeList; collector.collect(outStr); } }).print(); env.execute("watermark job"); } }
(3)测试数据,nc -lk 8888监听8888端口,一条一条的输入
[root@flink ~]# nc -lk 8888 java,2022-11-11 23:12:07,10 java,2022-11-11 23:12:11,10 java,2022-11-11 23:12:08,10 mysql,2022-11-11 23:12:13,20 java,2022-11-11 23:12:13,10 java,2022-11-11 23:12:17,10 java,2022-11-11 23:12:09,10 java,2022-11-11 23:12:20,10 java,2022-11-11 23:12:22,10 java,2022-11-11 23:12:25,10
9.3.二次兜底延迟数据处理
- 超过了watermark的等待后,还有延迟数据到达怎么办?
- 上一个案例我们发现,第七个窗口本是[0-10s)窗口的数据,
- 但是[0-10s)窗口的数据已经被统计了,所以数据丢失了,这就需要allowedLateness 来做二次兜底延迟数据处理。
- 编码很简单,只需要在开窗函数那设置allowedLateness即可
/** * @author lixiang */ public class FlinkWaterDemo { public static void main(String[] args) throws Exception { //初始化环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //设置并行度 env.setParallelism(1); //监听socket输入 DataStreamSource<String> source = env.socketTextStream("192.168.139.20", 8888); //一对多转换,将输入的字符串转成Tuple类型 SingleOutputStreamOperator<Tuple3<String, String, Integer>> flatMap = source.flatMap(new FlatMapFunction<String, Tuple3<String, String, Integer>>() { @Override public void flatMap(String value, Collector<Tuple3<String, String, Integer>> out) throws Exception { String[] split = value.split(","); out.collect(new Tuple3<String,String,Integer>(split[0],split[1],Integer.parseInt(split[2]))); } }); //设置watermark,官方文档直接拿来的,注意修改自己的时间参数 SingleOutputStreamOperator<Tuple3<String, String, Integer>> watermarks = flatMap.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(5)) .withTimestampAssigner((element, recordTimestamp) -> { return TimeUtil.strToDate(element.f1).getTime(); })); //根据标题进行分组 watermarks.keyBy(new KeySelector<Tuple3<String, String, Integer>, String>() { @Override public String getKey(Tuple3<String, String, Integer> value) throws Exception { return value.f0; } }).window(TumblingEventTimeWindows.of(Time.seconds(10))) //1min的容忍时间,即使时间段窗口被统计了,只要数据没有超过1min就可以再次被统计进去 .allowedLateness(Time.minutes(1)) .apply(new WindowFunction<Tuple3<String, String, Integer>, String, String, TimeWindow>() { //滚动窗口,10s一统计,全窗口函数 @Override public void apply(String key, TimeWindow timeWindow, Iterable<Tuple3<String, String, Integer>> iterable, Collector<String> collector) throws Exception { List<String> eventTimeList = new ArrayList<>(); int total = 0; for (Tuple3<String, String, Integer> order : iterable) { eventTimeList.add(order.f1); total = total + order.f2; } String outStr = "分组key:"+key+",总价:"+total+",窗口开始时间:"+TimeUtil.format(timeWindow.getStart())+",窗口结束时间:"+TimeUtil.format(timeWindow.getEnd())+",窗口所有事件时间:"+eventTimeList; collector.collect(outStr); } }).print(); env.execute("watermark job"); } }
- 测试
9.4.最后的兜底延迟数据处理
- watermark先输出,然后配置allowedLateness 再延长时间,然后到了后更新之前的窗口数据。
- 数据超过了allowedLateness 后,怎么办?这会就用侧输出流 SideOutput,最终的一个兜底。
- 侧输出流不会在统计到之前的窗口上,类似于独立存储起来,就是说没有被统计的数据会被单独存放在一个容器中,自定义的去进行最终一致性的操作。我们的数据输出会存放到redis或者mysql,侧输出的那一部分也存放在redis或者mysql,这样等统计之后,我们可以手动的讲结果进行整合。
- 编码实战
/** * @author lixiang */ public class FlinkWaterDemo { public static void main(String[] args) throws Exception { //初始化环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //设置并行度 env.setParallelism(1); //监听socket输入 DataStreamSource<String> source = env.socketTextStream("192.168.139.20", 8888); //一对多转换,将输入的字符串转成Tuple类型 SingleOutputStreamOperator<Tuple3<String, String, Integer>> flatMap = source.flatMap(new FlatMapFunction<String, Tuple3<String, String, Integer>>() { @Override public void flatMap(String value, Collector<Tuple3<String, String, Integer>> out) throws Exception { String[] split = value.split(","); out.collect(new Tuple3<String,String,Integer>(split[0],split[1],Integer.parseInt(split[2]))); } }); //设置watermark,官方文档直接拿来的,注意修改自己的时间参数 SingleOutputStreamOperator<Tuple3<String, String, Integer>> watermarks = flatMap.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(5)) .withTimestampAssigner((element, recordTimestamp) -> { return TimeUtil.strToDate(element.f1).getTime(); })); //new 一个OutputTag Bean OutputTag<Tuple3<String,String,Integer>> lateData = new OutputTag<Tuple3<String,String,Integer>>("lateData"){}; //根据标题进行分组 SingleOutputStreamOperator<String> operator = watermarks.keyBy(new KeySelector<Tuple3<String, String, Integer>, String>() { @Override public String getKey(Tuple3<String, String, Integer> value) throws Exception { return value.f0; } }).window(TumblingEventTimeWindows.of(Time.seconds(10))) //1min的容忍时间,即使时间段窗口被统计了,只要数据没有超过1min就可以再次被统计进去 .allowedLateness(Time.minutes(1)) //侧输入,最后的兜底数据 .sideOutputLateData(lateData) .apply(new WindowFunction<Tuple3<String, String, Integer>, String, String, TimeWindow>() { //滚动窗口,10s一统计,全窗口函数 @Override public void apply(String key, TimeWindow timeWindow, Iterable<Tuple3<String, String, Integer>> iterable, Collector<String> collector) throws Exception { List<String> eventTimeList = new ArrayList<>(); int total = 0; for (Tuple3<String, String, Integer> order : iterable) { eventTimeList.add(order.f1); total = total + order.f2; } String outStr = "分组key:" + key + ",总价:" + total + ",窗口开始时间:" + TimeUtil.format(timeWindow.getStart()) + ",窗口结束时间:" + TimeUtil.format(timeWindow.getEnd()) + ",窗口所有事件时间:" + eventTimeList; collector.collect(outStr); } }); operator.print(); //侧输出流数据 operator.getSideOutput(lateData).print(); env.execute("watermark job"); } }
- 测试
[root@flink ~]# nc -lk 8888 java,2022-11-11 23:12:07,10 java,2022-11-11 23:12:11,10 java,2022-11-11 23:12:08,10 mysql,2022-11-11 23:12:13,20 java,2022-11-11 23:12:13,10 java,2022-11-11 23:12:17,10 java,2022-11-11 23:12:09,10 java,2022-11-11 23:12:20,10 java,2022-11-11 23:14:22,10 java,2022-11-11 23:12:25,10 #设置一个超过1分钟的数据,测试
9.5.Flink多层保证措施归纳
(1)如何保证在需要的窗口内获取指定的数据?数据有乱序延迟
- flink采用watermark、allowedLateness()、sideOutputLateData()三个机制来保证获取数据。
- watermark的作用是防止出现延迟乱序,允许等待一会在触发窗口计算。
- allowLateness,是将窗口关闭时间在延迟一段时间,允许有一个最大迟到时间,allowLateness的数据会重新触发窗口计算
- sideOutPut是最后的兜底操作,超过allowLateness后,窗口已经彻底关闭,就会把数据放到侧输出流,侧输出流OutputTag tag = new OutputTag(){},由于泛型擦除的问题,需要重写方法,加花括号。
(2)应用场景:实时监控平台
- 可以用watermark及时输出数据
- allowLateness 做短期的更新迟到数据
- sideOutPut做兜底更新保证数据准确性
(3)总结Flink的机制
- 第一层 窗口window 的作用是从DataStream数据流里指定范围获取数据。
- 第二层 watermark的作用是防止数据出现乱序延迟,允许窗口等待延迟数据达到,再触发计算
- 第三层 allowLateness 会让窗口关闭时间再延迟一段时间, 如果还有数据达到,会局部修复数据并主动更新窗口的数据输出
- 第四层 sideOutPut侧输出流是最后兜底操作,在窗口已经彻底关闭后,所有过期延迟数据放到侧输出流,可以单独获取,存储到某个地方再批量更新之前的聚合的数据
- 注意
- Flink 默认的处理方式直接丢弃迟到的数据
- sideOutPut还可以进行分流功能
- DataStream没有getSideOutput方法,SingleOutputStreamOperator才有
(4)版本弃用API
新接口,`WatermarkStrategy`,`TimestampAssigner` 和 `WatermarkGenerator` 因为其对时间戳和 watermark 等重点的抽象和分离很清晰,并且还统一了周期性和标记形式的 watermark 生成方式 新接口之前是用AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks ,现在可以弃用了
10.Flink状态State管理和Checkpoint
10.1.Flink的状态State管理简介
(1)什么是State状态
- 数据流处理离不开状态管理,比如窗口聚合统计、去重、排序等
- 是一个Operator的运行的状态\历史值,是维护在内存中
- 流程:一个算子的子任务接收输入流,获取对应的状态,计算新的结果然后把结果更新到状态里面
(2)有状态和无状态介绍
- 无状态计算: 同个数据进到算子里面多少次,都是一样的输出,比如 filter
- 有状态计算:需要考虑历史状态,同个输入会有不同的输出,比如sum、reduce聚合操作
(3)状态管理分类
- ManagedState
- Flink管理,自动存储恢复
- 细分两类
- Keyed State 键控状态(用的多)
- 有KeyBy才用这个,仅限用在KeyStream中,每个key都有state ,是基于KeyedStream上的状态
- 一般是用richFlatFunction,或者其他richfunction里面,在open()声明周期里面进行初始化
- ValueState、ListState、MapState等数据结构
- Operator State 算子状态(用的少,部分source会用)
- ListState、UnionListState、BroadcastState等数据结构
- RawState
- 用户自己管理和维护
- 存储结构:二进制数组
- State数据结构(状态值可能存在内存、磁盘、DB或者其他分布式存储中)
- ValueState 简单的存储一个值(ThreadLocal / String)
- ValueState.value()
- ValueState.update(T value)
- ListState 列表
- ListState.add(T value)
- ListState.get() //得到一个Iterator
- MapState 映射类型
- MapState.get(key)
- MapState.put(key, value)
10.2.Flink状态State后端存储讲解
从Flink 1.13开始,社区重新设计了其公共状态后端类,以帮助用户更好地理解本地状态存储和检查点存储的分离 用户可以迁移现有应用程序以使用新 API,⽽不会丢失任何状态或⼀致性。
(1)Flink内置了以下这些开箱即用的state backends :
- (新版)HashMapStateBackend、EmbeddedRocksDBStateBackend
- 如果没有其他配置,系统将使用 HashMapStateBackend。
- (旧版)MemoryStateBackend、FsStateBackend、RocksDBStateBackend
- 如果不设置,默认使用 MemoryStateBackend
(2)State状态详解
HashMapStateBackend 保存数据在内部作为Java堆的对象。
- 键/值状态和窗口操作符持有哈希表,用于存储值、触发器等
- 非常快,因为每个状态访问和更新都对 Java 堆上的对象进行操作
- 但是状态大小受集群内可用内存的限制
- 场景:
- 具有大状态、长窗口、大键/值状态的作业。
- 所有高可用性设置。
EmbeddedRocksDBStateBackend 在RocksDB数据库中保存状态数据
- 该数据库(默认)存储在 TaskManager 本地数据目录中
- 与HashMapStateBackend在java存储 对象不同,数据存储为序列化的字节数组
- RocksDB可以根据可用磁盘空间进行扩展,并且是唯一支持增量快照的状态后端。
- 但是每个状态访问和更新都需要(反)序列化并可能从磁盘读取,这导致平均性能比内存状态后端慢一个数量级
- 场景
- 具有非常大状态、长窗口、大键/值状态的作业。
- 所有高可用性设置
- 旧版的状态管理
MemoryStateBackend(内存,不推荐在生产场景使用) FsStateBackend(文件系统上,本地文件系统、HDFS, 性能更好,常用) RocksDBStateBackend (无需担心 OOM 风险,是大部分时候的选择)
(3)配置方式
方式一:可以flink-conf.yaml使用配置键在中配置默认状态后端state.backend。
配置条目的可能值是hashmap (HashMapStateBackend)、rocksdb (EmbeddedRocksDBStateBackend) 或实现状态后端工厂StateBackendFactory的类的完全限定类名 #全局配置例子一 # The backend that will be used to store operator state checkpoints state.backend: hashmap # Optional, Flink will automatically default to JobManagerCheckpointStorage # when no checkpoint directory is specified. state.checkpoint-storage: jobmanager #全局配置例子二 state.backend: rocksdb state.checkpoints.dir: file:///checkpoint-dir/ # Optional, Flink will automatically default to FileSystemCheckpointStorage # when a checkpoint directory is specified. state.checkpoint-storage: filesystem
方式二:代码 单独job配置例子
//代码配置一 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend(new HashMapStateBackend()); env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage()); //代码配置二 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend(new EmbeddedRocksDBStateBackend()); env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir"); //或者 env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"));
- 备注:使用 RocksDBStateBackend 需要加依赖
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-statebackend-rocksdb_${scala.version}</artifactId> <version>1.13.1</version> </dependency>
10.3.Flink的状态State管理实战
- sum()、maxBy() 等函数底层源码也是有ValueState进行状态存储
- 需求:
- 根据订单进行分组,统计找出每个商品最大的订单成交额
- 不用maxBy实现,用ValueState实现
- 编码实战
/** * 使用valueState实现maxBy功能,统计分组内订单金额最高的订单 * @author lixiang */ public class FlinkStateDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStream<String> ds = env.socketTextStream("192.168.139.20", 8888); DataStream<Tuple3<String, String, Integer>> flatMapDS = ds.flatMap(new RichFlatMapFunction<String, Tuple3<String, String, Integer>>() { @Override public void flatMap(String value, Collector<Tuple3<String, String, Integer>> out) throws Exception { String[] arr = value.split(","); out.collect(Tuple3.of(arr[0], arr[1], Integer.parseInt(arr[2]))); } }); //一定要key by后才可以使用键控状态ValueState SingleOutputStreamOperator<Tuple2<String, Integer>> maxVideoOrder = flatMapDS.keyBy(new KeySelector<Tuple3<String,String,Integer>, String>() { @Override public String getKey(Tuple3<String, String, Integer> value) throws Exception { return value.f0; } }).map(new RichMapFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>>() { private ValueState<Integer> valueState = null; @Override public void open(Configuration parameters) throws Exception { valueState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("total", Integer.class)); } @Override public Tuple2<String, Integer> map(Tuple3<String, String, Integer> tuple3) throws Exception { // 取出State中的最大值 Integer stateMaxValue = valueState.value(); Integer currentValue = tuple3.f2; if (stateMaxValue == null || currentValue > stateMaxValue) { //更新状态,把当前的作为新的最大值存到状态中 valueState.update(currentValue); return Tuple2.of(tuple3.f0, currentValue); } else { //历史值更大 return Tuple2.of(tuple3.f0, stateMaxValue); } } }); maxVideoOrder.print(); env.execute("valueState job"); } }
- 测试
[root@flink ~]# nc -lk 8888 java,2022-11-11 23:12:07,10 java,2022-11-11 23:12:11,10 java,2022-11-11 23:12:08,30 mysql,2022-11-11 23:12:13,20 java,2022-11-11 23:12:13,10
10.4.Flink的Checkpoint-SavePoint
- 什么是Checkpoint 检查点
- Flink中所有的Operator的当前State的全局快照
- 默认情况下 checkpoint 是禁用的
- Checkpoint是把State数据定时持久化存储,防止丢失
- 手工调用checkpoint,叫 savepoint,主要是用于flink集群维护升级等
- 底层使用了Chandy-Lamport 分布式快照算法,保证数据在分布式环境下的一致性
- 开箱即用,Flink 捆绑了这些检查点存储类型:
- 作业管理器检查点存储 JobManagerCheckpointStorage
- 文件系统检查点存储 FileSystemCheckpointStorage
- 配置
//全局配置checkpoints state.checkpoints.dir: hdfs:///checkpoints/ //作业单独配置checkpoints env.getCheckpointConfig().setCheckpointStorage("hdfs:///checkpoints-data/"); //全局配置savepoint state.savepoints.dir: hdfs:///flink/savepoints
Savepoint 与 Checkpoint 的不同之处
- 类似于传统数据库中的备份与恢复日志之间的差异
- Checkpoint 的主要目的是为意外失败的作业提供【重启恢复机制】,
- Checkpoint 的生命周期由 Flink 管理,即 Flink 创建,管理和删除 Checkpoint - 无需用户交互
- Savepoint 由用户创建,拥有和删除, 主要是【升级 Flink 版本】,调整用户逻辑
- 除去概念上的差异,Checkpoint 和 Savepoint 的当前实现基本上使用相同的代码并生成相同的格式
- 端到端(end-to-end)状态一致性
数据一致性保证都是由流处理器实现的,也就是说都是在Flink流处理器内部保证的
在真实应用中,了流处理器以外还包含了数据源(例如Kafka、Mysql)和输出到持久化系统(Kafka、Mysql、Hbase、CK)
端到端的一致性保证,是意味着结果的正确性贯穿了整个流处理应用的各个环节,每一个组件都要保证自己的一致性。
- Source
- 需要外部数据源可以重置读取位置,当发生故障的时候重置偏移量到故障之前的位置
- 内部
- 依赖Checkpoints机制,在发生故障的时可以恢复各个环节的数据
- Sink:
- 当故障恢复时,数据不会重复写入外部系统,常见的就是 幂等和事务写入(和checkpoint配合)
10.5.Flink的Checkpoint代码配置
//两个检查点之间间隔时间,默认是0,单位毫秒 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); //Checkpoint过程中出现错误,是否让整体任务都失败,默认值为0,表示不容忍任何Checkpoint失败 env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5); //Checkpoint是进行失败恢复,当一个 Flink 应用程序失败终止、人为取消等时,它的 Checkpoint 就会被清除 //可以配置不同策略进行操作 // DELETE_ON_CANCELLATION: 当作业取消时,Checkpoint 状态信息会被删除,因此取消任务后,不能从 Checkpoint 位置进行恢复任务 // RETAIN_ON_CANCELLATION(多): 当作业手动取消时,将会保留作业的 Checkpoint 状态信息,要手动清除该作业的 Checkpoint 状态信息 env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); //Flink 默认提供 Extractly-Once 保证 State 的一致性,还提供了 Extractly-Once,At-Least-Once 两种模式, // 设置checkpoint的模式为EXACTLY_ONCE,也是默认的, env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); //设置checkpoint的超时时间, 如果规定时间没完成则放弃,默认是10分钟 env.getCheckpointConfig().setCheckpointTimeout(60000); //设置同一时刻有多少个checkpoint可以同时执行,默认为1就行,以避免占用太多正常数据处理资源 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); //设置了重启策略, 作业在失败后能自动恢复,失败后最多重启3次,每次重启间隔10s env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));
11.Flink复杂事件处理CEP
11.1.Flink复杂事件处理CEP介绍
(1)什么是FlinkCEP
- CEP全称 Complex event processing 复杂事件处理
- FlinkCEP 是在 Flink 之上实现的复杂事件处理(CEP)库
- 擅长高吞吐、低延迟的处理,市场上有多种CEP的解决方案,例如Spark,但是Flink专门类库更方便使用
(2)FlinkCEP用途
- 检测和发现无边界事件流中多个记录的关联规则,得到满足规则的复杂事件
- 允许业务定义要从输入流中提取的复杂模式序列
(3)FlinkCEP使用流程
- 定义pattern
- pattern应用到数据流,得到模式流
- 从模式流 获取结果
(4)CEP并不包含在flink中,使用前需要自己导入
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-cep-scala_2.11</artifactId> <version>1.7.0</version> </dependency>
11.2.Flink的复杂事件CEP常见概念
(1)模式(Pattern):定义处理事件的规则
- 三种模式PatternAPI
- 个体模式(Individual Patterns):组成复杂规则的每一个单独的模式定义,就是个体模式
组合模式(Combining Patterns):很多个体模式组合起来,形成组合模式
模式组(Groups of Patterns):将一个组合模式作为条件嵌套在个体模式里,就是模式组
近邻模式
严格近邻:期望所有匹配事件严格地一个接一个出现,中间没有任何不匹配的事件, API是.next()
宽松近邻:允许中间出现不匹配的事件,API是.followedBy()
非确定性宽松近邻:可以忽略已经匹配的条件,API是followedByAny()
指定时间约束:指定模式在多长时间内匹配有效,API是within
如果您不希望事件类型直接跟随另一个,notNext()
如果您不希望事件类型介于其他两种事件类型之间,notFollowedBy()
模式分类
单次模式:接收一次一个事件
循环模式:接收一个或多个事件
(2)其他参数
times:指定固定的循环执行次数
greedy:贪婪模式,尽可能多触发
oneOrMore:指定触发一次或多次
timesOrMore:指定触发固定以上的次数
optional:要么不触发要么触发指定的次数
11.3.Flink复杂事件CEP案例实战
需求:同个账号,在5秒内连续登录失败2次,则认为存在而已登录问题
数据格式 李祥,2022-11-11 12:01:01,-1
/** * cep-demo * @author lixiang */ public class FlinkCEPDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStream<String> ds = env.socketTextStream("192.168.139.20",8888); SingleOutputStreamOperator<Tuple3<String, String, Integer>> flatMapDS = ds.flatMap(new FlatMapFunction<String, Tuple3<String, String, Integer>>() { @Override public void flatMap(String value, Collector<Tuple3<String, String, Integer>> out) throws Exception { String[] arr = value.split(","); out.collect(Tuple3.of(arr[0], arr[1], Integer.parseInt(arr[2]))); } }); SingleOutputStreamOperator<Tuple3<String, String, Integer>> watermarks = flatMapDS.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Integer>>forMonotonousTimestamps() .withTimestampAssigner((event, timestamp) -> TimeUtil.strToDate(event.f1).getTime())); KeyedStream<Tuple3<String, String, Integer>, String> keyedStream = watermarks.keyBy(new KeySelector<Tuple3<String, String, Integer>, String>() { @Override public String getKey(Tuple3<String, String, Integer> value) throws Exception { return value.f0; } }); //定义模式 Pattern<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>> pattern = Pattern.<Tuple3<String, String, Integer>> begin("firstTimeLogin") .where(new SimpleCondition<Tuple3<String, String, Integer>>() { @Override public boolean filter(Tuple3<String, String, Integer> value) throws Exception { return value.f2 == -1; } }) .next("secondTimeLogin") .where(new SimpleCondition<Tuple3<String, String, Integer>>() { @Override public boolean filter(Tuple3<String, String, Integer> value) throws Exception { return value.f2 == -1; } }).within(Time.seconds(5)); //匹配检查 PatternStream<Tuple3<String, String, Integer>> patternStream = CEP.pattern(keyedStream, pattern); SingleOutputStreamOperator<Tuple3<String, String, String>> select = patternStream.select(new PatternSelectFunction<Tuple3<String, String, Integer>, Tuple3<String, String, String>>() { @Override public Tuple3<String, String, String> select(Map<String, List<Tuple3<String, String, Integer>>> map) throws Exception { Tuple3<String, String, Integer> firstLoginFail = map.get("firstTimeLogin").get(0); Tuple3<String, String, Integer> secondLoginFail = map.get("secondTimeLogin").get(0); return Tuple3.of(firstLoginFail.f0, firstLoginFail.f1, secondLoginFail.f1); } }); select.print("匹配结果"); env.execute("CEP job"); } }
- 测试
张三,2022-11-11 12:01:01,-1 李四,2022-11-11 12:01:10,-1 李四,2022-11-11 12:01:11,-1 张三,2022-11-11 12:01:13,-1 李四,2022-11-11 12:01:14,-1 李四,2022-11-11 12:01:15,1 张三,2022-11-11 12:01:16,-1 李四,2022-11-11 12:01:17,-1 张三,2022-11-11 12:01:20,1
12.Flink项目打包插件+部署
12.1.Flink服务端多种部署模式
Flink 部署方式是灵活,主要是对Flink计算时所需资源的管理方式不同
- 文档:https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/deployment/overview/
- Local 本地部署,直接启动进程,适合调试使用
- 直接部署启动服务
- Standalone Cluster集群部署,flink自带集群模式
- Hadoop YARN 计算资源统一由Hadoop YARN管理资源进行调度,按需使用提高集群的资源利用率
- Kubernetes 部署
- Docker部署
12.2.Flink本地模式部署Linux服务器
(1)安装JDK8环境
(1)上传jdk1.8安装包,解压到指目录 tar -xvf jdk-8u181-linux-x64.tar.gz -C /usr/local/ (2)查看解压后的文件 [root@flink ~]# cd /usr/local [root@flink local]# ls bin etc flink-1.13.1 games include jdk1.8.0_181 lib lib64 libexec sbin share src (3)jdk1.8.0_181重命名为jdk1.8 mv jdk1.8.0_181/ jdk1.8 (4)配置环境变量 vi /etc/profile 添加配置: JAVA_HOME=/usr/local/jdk1.8 CLASSPATH=$JAVA_HOME/lib/ PATH=$PATH:$JAVA_HOME/bin export PATH JAVA_HOME CLASSPATH (5)刷新配置 source /etc/profile (6)查看java环境 [root@flink ~]# java -version java version "1.8.0_181" Java(TM) SE Runtime Environment (build 1.8.0_181-b13) Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode)
- 注意:如果不安装jdk环境的话启动flink会报这个错误
Please specify JAVA_HOME. Either in Flink config ./conf/flink-conf.yaml or as system-wide JAVA_HOME.
(2)准备flink环境
tar -xvf flink-1.13.1-bin-scala_2.12.tgz -C /usr/local/
调整配置文件:conf/flink-conf.yaml #web ui 端口 rest.port=8081 #调整jobmanager和taskmanager的大小,根据自己的机器进行调整 jobmanager.memory.process.size: 256m taskmanager.memory.process.size: 256m
本地模式用到这两个脚本 start-cluster.sh stop-cluster.sh 启动本地模式:./start-cluster.sh 注意这会可能会报错: The derived from fraction jvm overhead memory (19.200mb (20132659 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead 原因分析: flink最少需要192M的内存才能启动,jdk1.8之后堆内存采用的是元空间,元空间是根据内存大小自动分配的。如果元空间给的太小,flink将会启动不起来。 调整如下配置:conf/flink-conf.yaml taskmanager.memory.process.size: 512m taskmanager.memory.framework.heap.size: 64m taskmanager.memory.framework.off-heap.size: 64m taskmanager.memory.jvm-metaspace.size: 64m taskmanager.memory.jvm-overhead.fraction: 0.2 taskmanager.memory.jvm-overhead.min: 16m taskmanager.memory.jvm-overhead.max: 64m taskmanager.memory.network.fraction: 0.1 taskmanager.memory.network.min: 1mb taskmanager.memory.network.max: 256mb
- 重新启动,成功
- 访问web UI ip:8081