2021年最新最全Flink系列教程__Flink高级API(三)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 2021年最新最全Flink系列教程__Flink高级API(三)

day03_Flink高级API

今日目标

  • Flink的四大基石
  • Flink窗口Window操作
  • Flink时间Time
  • Flink水印Watermark机制
  • Flink的state状态管理-keyed state 和 operator state

Flink的四大基石

  • Checkpoint 分布式一致性,解决数据丢失,故障恢复数据
  • State 状态,分为Keyed State ,Operator State; 数据结构的角度来说 ValueState、ListState、MapState,BroadcastState
  • Time , EventTime事件时间、Ingestion摄取时间、Process处理时间
  • Window窗口,TimeWindow 、 countwindow、 sessionwindow

Window操作

Window分类

  • time
  • 用的比较多 滚动窗口和滑动窗口
  • count

如何使用

案例

  • 需求
/**
 * Author itcast
 * Date 2021/5/7 9:13
 * 有如下数据表示:
 * 信号灯编号和通过该信号灯的车的数量
 * 9,3
 * 9,2
 * 9,7
 * 4,9
 * 2,6
 * 1,5
 * 2,3
 * 5,7
 * 5,4
 * 需求1:每5秒钟统计一次,最近5秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滚动窗口
 * 需求2:每5秒钟统计一次,最近10秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滑动窗口
 */
  • 分析

  • 代码
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.concurrent.TimeUnit;
/**
 * Author itcast
 * Date 2021/5/7 9:13
 * 有如下数据表示:
 * 信号灯编号和通过该信号灯的车的数量
 * 9,3
 * 9,2
 * 9,7
 * 4,9
 * 2,6
 * 1,5
 * 2,3
 * 5,7
 * 5,4
 * 需求1:每5秒钟统计一次,最近5秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滚动窗口
 * 需求2:每5秒钟统计一次,最近10秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滑动窗口
 */
public class WindowDemo {
    public static void main(String[] args) throws Exception {
        //1.创建流环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.setParallelism(1);
        //2.获取数据源
        DataStreamSource<String> source = env.socketTextStream("node1", 9999);
        //3.转换操作 基于key window 统计
        DataStream<CartInfo> cartInfoDS = source.map(new MapFunction<String, CartInfo>() {
            @Override
            public CartInfo map(String value) throws Exception {
                String[] split = value.split(",");
                return new CartInfo(split[0], Integer.parseInt(split[1]));
            }
        });
        //统计 滚动窗口
        DataStream<CartInfo> result = cartInfoDS
                .keyBy(t -> t.getSensorId())
                //使用的是处理时间
                //每5秒钟统计一次,最近5秒钟内
                .window(TumblingProcessingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
                .sum("count");
        //统计 滑动窗口
        DataStream<CartInfo> result1 = cartInfoDS
                .keyBy(t -> t.getSensorId())
                //使用的是处理时间
                //每5秒钟统计一次,最近5秒钟内
                .window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5)))
                .sum("count");
        //4.打印输出
        result1.print();
        //5.执行流环境
        env.execute();
    }
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CartInfo {
        private String sensorId;//信号灯id
        private Integer count;//通过该信号灯的车的数量
    }
}
  • 需求2 - countwindow
    需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计–基于数量的滚动窗口
    需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计–基于数量的滑动窗口
  • 代码
/**
 * Author itcast
 * Date 2021/5/7 9:13
 * 有如下数据表示:
 * 信号灯编号和通过该信号灯的车的数量
 * 9,3
 * 9,2
 * 9,7
 * 4,9
 * 2,6
 * 1,5
 * 2,3
 * 5,7
 * 5,4
 需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计--基于数量的滚动窗口
 需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计--基于数量的滑动窗口
 */
public class WindowDemo02 {
    public static void main(String[] args) throws Exception {
        //1.创建流环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.setParallelism(1);
        //2.获取数据源
        DataStreamSource<String> source = env.socketTextStream("node1", 9999);
        //3.转换操作 基于key window 统计
        DataStream<CartInfo> cartInfoDS = source.map(new MapFunction<String, CartInfo>() {
            @Override
            public CartInfo map(String value) throws Exception {
                String[] split = value.split(",");
                return new CartInfo(split[0], Integer.parseInt(split[1]));
            }
        });
        //统计 滚动计数窗口
        DataStream<CartInfo> result = cartInfoDS
                .keyBy(t -> t.getSensorId())
                //使用的是处理时间
                //每5条数据统计一次
                .countWindow(5)
                .sum("count");
        //统计 滑动计数窗口
        DataStream<CartInfo> result1 = cartInfoDS
                .keyBy(t -> t.getSensorId())
                //使用的是处理时间
                //每5秒钟统计一次,最近5秒钟内
                .countWindow(10,5)
                .sum("count");
        //4.打印输出
        //result.printToErr();
        result1.print();
        //5.执行流环境
        env.execute();
    }
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CartInfo {
        private String sensorId;//信号灯id
        private Integer count;//通过该信号灯的车的数量
    }
}
  • 统计会话指定时间内的数据,如果这个窗口内没有数据,就不在计算,设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算。
  • 案例 - 设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算

  • 代码
/**
 * Author itcast
 * Date 2021/5/7 9:13
 * 有如下数据表示:
 * 信号灯编号和通过该信号灯的车的数量
 * 9,3
 * 9,2
 * 9,7
 * 4,9
 * 2,6
 * 1,5
 * 2,3
 * 5,7
 * 5,4
    设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算
 */
public class WindowDemo03 {
    public static void main(String[] args) throws Exception {
        //1.创建流环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.setParallelism(1);
        //2.获取数据源
        DataStreamSource<String> source = env.socketTextStream("node1", 9999);
        //3.转换操作 基于key window 统计
        DataStream<CartInfo> cartInfoDS = source.map(new MapFunction<String, CartInfo>() {
            @Override
            public CartInfo map(String value) throws Exception {
                String[] split = value.split(",");
                return new CartInfo(split[0], Integer.parseInt(split[1]));
            }
        });
        //统计 会话窗口
        DataStream<CartInfo> result = cartInfoDS
                .keyBy(t -> t.getSensorId())
                //使用的是处理时间
                //每5条数据统计一次
                .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
                .sum("count");
        //4.打印输出
        //result.printToErr();
        result.print();
        //5.执行流环境
        env.execute();
    }
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CartInfo {
        private String sensorId;//信号灯id
        private Integer count;//通过该信号灯的车的数量
    }
}

Time 时间

  • EventTime的重要性
  • 防止出现网络抖动,造成数据的乱序,数据统计的丢失
  • 窗口: 开始时间-结束时间

watermark 水印时间

  • watermark 水印机制
  • watermark 就是时间戳
  • watermark = eventTime - maxDelayTime
  • 触发计算 watermak >= 结束时间

watermark 案例

  • 需求
    有订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额)
    要求每隔5s,计算5秒内,每个用户的订单总金额
    并添加Watermark来解决一定程度上的数据延迟和数据乱序问题。
    基础版:
package cn.itcast.sz22.day03;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.time.Duration;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
 * Author itcast
 * Date 2021/5/7 11:04
 * Desc TODO
 */
public class WatermarkDemo01 {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //
        env.setParallelism(1);
        //2.Source
        //模拟实时订单数据(数据有延迟和乱序)
        DataStream<Order> orderDS = env.addSource(new SourceFunction<Order>() {
            private boolean flag = true;
            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                Random random = new Random();
                while (flag) {
                    String orderId = UUID.randomUUID().toString();
                    int userId = random.nextInt(3);
                    int money = random.nextInt(100);
                    //模拟数据延迟和乱序!
                    long eventTime = System.currentTimeMillis() - random.nextInt(5) * 1000;
                    ctx.collect(new Order(orderId, userId, money, eventTime));
                    TimeUnit.SECONDS.sleep(1);
                }
            }
            @Override
            public void cancel() {
                flag = false;
            }
        });
        // 添加水印机制 最大允许延迟的时间为 3 秒
        //orderDS.printToErr();
        //分配水印机制
        SingleOutputStreamOperator<Order> sum = orderDS.assignTimestampsAndWatermarks(WatermarkStrategy
                //指定最大的延迟时间
                .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                //指定 eventTime 是哪个字段 long extractTimestamp(T element, long recordTimestamp);
                .withTimestampAssigner((element, recordTimestamp) -> element.getEventTime()))
                //统计每个用户对应 购买 金额
                .keyBy(t -> t.getUserId())
                //指定窗口,每5秒钟统计5秒钟之内的数据
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sum("money");
        sum.print();
        //
        env.execute();
    }
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private String orderId;
        private Integer userId;
        private Integer money;
        private Long eventTime;
    }
}
  • 扩展版:
package cn.itcast.sz22.day03;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
 * Author itcast
 * Date 2021/5/7 11:04
 * Desc TODO
 */
public class WatermarkDemo02 {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //
        env.setParallelism(1);
        //2.Source
        //模拟实时订单数据(数据有延迟和乱序)
        DataStream<Order> orderDS = env.addSource(new SourceFunction<Order>() {
            private boolean flag = true;
            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                Random random = new Random();
                while (flag) {
                    String orderId = UUID.randomUUID().toString();
                    int userId = random.nextInt(3);
                    int money = random.nextInt(100);
                    //模拟数据延迟和乱序!
                    long eventTime = System.currentTimeMillis() - random.nextInt(15) * 1000;
                    ctx.collect(new Order(orderId, userId, money, eventTime));
                    TimeUnit.SECONDS.sleep(1);
                }
            }
            @Override
            public void cancel() {
                flag = false;
            }
        });
        DataStream<Order> WatermarkDS = orderDS
                .assignTimestampsAndWatermarks(
                        new WatermarkStrategy<Order>() {
                            @Override
                            public WatermarkGenerator<Order> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                                return new WatermarkGenerator<Order>() {
                                    FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");
                                    private int userId = 0;
                                    private long eventTime = 0L;
                                    private final long outOfOrdernessMillis = 3000;
                                    private long maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
                                    @Override
                                    public void onEvent(Order event, long eventTimestamp, WatermarkOutput output) {
                                        userId = event.userId;
                                        eventTime = event.eventTime;
                                        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
                                    }
                                    @Override
                                    public void onPeriodicEmit(WatermarkOutput output) {
                                        //Watermark = 当前最大事件时间 - 最大允许的延迟时间或乱序时间 时间戳
                                        Watermark watermark = new Watermark(maxTimestamp - outOfOrdernessMillis - 1);
                                        System.out.println("key:" + userId + ",系统时间:" + df.format(System.currentTimeMillis()) + ",事件时间:" + df.format(eventTime) + ",水印时间:" + df.format(watermark.getTimestamp()));
                                        output.emitWatermark(watermark);
                                    }
                                };
                            }
                        }.withTimestampAssigner((event, timestamp) -> event.getEventTime())
                );
        //代码走到这里,就已经被添加上Watermark了!接下来就可以进行窗口计算了
        //要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额
       /* DataStream<Order> result = WatermarkDS
                 .keyBy(Order::getUserId)
                //.timeWindow(Time.seconds(5), Time.seconds(5))
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sum("money");*/
        //开发中使用上面的代码进行业务计算即可
        //学习测试时可以使用下面的代码对数据进行更详细的输出,如输出窗口触发时各个窗口中的数据的事件时间,Watermark时间
        DataStream<String> result = WatermarkDS
                .keyBy(Order::getUserId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                //把apply中的函数应用在窗口中的数据上
                //WindowFunction<IN, OUT, KEY, W extends Window>
                .apply(new WindowFunction<Order, String, Integer, TimeWindow>() {
                    FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");
                    @Override
                    public void apply(Integer key, TimeWindow window, Iterable<Order> input, Collector<String> out) throws Exception {
                        //准备一个集合用来存放属于该窗口的数据的事件时间
                        List<String> eventTimeList = new ArrayList<>();
                        for (Order order : input) {
                            Long eventTime = order.eventTime;
                            eventTimeList.add(df.format(eventTime));
                        }
                        String outStr = String.format("key:%s,窗口开始结束:[%s~%s),属于该窗口的事件时间:%s",
                                key.toString(), df.format(window.getStart()), df.format(window.getEnd()), eventTimeList);
                        out.collect(outStr);
                    }
                });
        // 添加水印机制 最大允许延迟的时间为 3 秒
        //orderDS.printToErr();
        result.printToErr();
        env.execute();
        //分配水印机制
    }
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
            private String orderId;
            private Integer userId;
            private Integer money;
            private Long eventTime;
        }
}

Allowed lateness

  • 案例
    有订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额)
    要求每隔5s,计算5秒内,每个用户的订单总金额
    并添加Watermark来解决一定程度上的数据延迟和数据乱序问题。
package cn.itcast.sz22.day03;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;
import java.time.Duration;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
 * Author itcast
 * Date 2021/5/7 14:51
 * 有订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额)
 * 要求每隔5s,计算5秒内,每个用户的订单总金额
 * 并添加Watermark来解决一定程度上的数据延迟和数据乱序问题。
 */
public class WatermarkDemo03 {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        //模拟实时订单数据(数据有延迟和乱序)
        DataStreamSource<Order> orderDS = env.addSource(new SourceFunction<Order>() {
            private boolean flag = true;
            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                Random random = new Random();
                while (flag) {
                    String orderId = UUID.randomUUID().toString();
                    int userId = random.nextInt(3);
                    int money = random.nextInt(100);
                    //模拟数据延迟和乱序!
                    long eventTime = System.currentTimeMillis() - random.nextInt(15) * 1000;
                    ctx.collect(new Order(orderId, userId, money, eventTime));
                    TimeUnit.SECONDS.sleep(1);
                }
            }
            @Override
            public void cancel() {
                flag = false;
            }
        });
        OutputTag<Order> oot = new OutputTag<Order>("maxDelayOrder", TypeInformation.of(Order.class));
        //分配水印机制 eventTime 默认使用 maxDelay 3秒
        SingleOutputStreamOperator<Order> result = orderDS.assignTimestampsAndWatermarks(WatermarkStrategy
                .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner((element, recordTimestamp) -> element.getEventTime()))
                .keyBy(t -> t.getUserId())
                //窗口设置  每隔5s,计算5秒内
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                //实例化侧输出流 主要用于晚于最大延迟 3 秒的数据
                .allowedLateness(Time.seconds(3))
                .sideOutputLateData(oot)
                //统计
                .sum("money");
        result.print("正常数据");
        result.getSideOutput(oot).print("严重迟到的数据");
        env.execute();
    }
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private String orderId;
        private Integer userId;
        private Integer money;
        private Long eventTime;
    }
}

状态管理 state

  • 有状态管理场景

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-znYxlAeB-1624261970363)(assets/image-20210507151242102.png)]

  • 是否被Flink托管分为两类
  1. managed state
    通过Flink自身进行状态的管理
    数据结构: valueState ListState mapState
  2. raw state
    需要用户、程序员自己维护状态
    数据结构: ListState
  • 是否基于 key 进行state 管理
  1. keyed state
    数据结构: valueState ListState mapState
    reducingState
  2. operator state
    数据结构: ListState

Keyed state

  • 案例 - 使用KeyState中的ValueState获取数据中的最大值(实际中直接使用maxBy即可),使用值状态自定义,输入Tuple2<String/单词/, Long/长度/> 输出 Tuple3<String/单词/, Long/长度/, Long/历史最大值/> 类型
  1. map映射
  2. 定义valueState 用于统计当前的 历史最大值
  3. 输出 Tuple3<String/单词/, Long/长度/, Long/历史最大值/>
package cn.itcast.sz22.day03;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Author itcast
 * Date 2021/5/7 15:58
 * 使用KeyState中的ValueState获取数据中的最大值(实际中直接使用maxBy即可),使用值状态自定义.
 */
public class StateDemo01 {
    public static void main(String[] args) throws Exception {
        //1.env 设置并发度为1
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //2.Source 参看课件
        DataStreamSource<Tuple2<String, Long>> tupleDS = env.fromElements(
                Tuple2.of("北京", 1L),
                Tuple2.of("上海", 2L),
                Tuple2.of("北京", 6L),
                Tuple2.of("上海", 8L),
                Tuple2.of("北京", 3L),
                Tuple2.of("上海", 4L)
        );
        //3.Transformation
        //使用KeyState中的ValueState获取流数据中的最大值(实际中直接使用maxBy即可)
        //实现方式1:直接使用maxBy--开发中使用该方式即可
        DataStream<Tuple2<String, Long>> result1 = tupleDS.keyBy(t -> t.f0)
                .maxBy(1);
        //min只会求出最小的那个字段,其他的字段不管
        //minBy会求出最小的那个字段和对应的其他的字段
        //max只会求出最大的那个字段,其他的字段不管
        //maxBy会求出最大的那个字段和对应的其他的字段
        //实现方式2:通过managed state输入的state
        //Tuple2<String/*单词*/, Long/*长度*/> 输出 Tuple3<String/*单词*/, Long/*长度*/, Long/*历史最大值*/>
        SingleOutputStreamOperator<Tuple3<String, Long, Long>> maxCount = tupleDS.keyBy(t -> t.f0)
                .map(new RichMapFunction<Tuple2<String, Long>, Tuple3<String, Long, Long>>() {
                    //保存当前内存中最大的值的state
                    private transient ValueState<Long> currentMaxValue;
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        //存储到内存中的数据结构的描述
                        ValueStateDescriptor desc = new ValueStateDescriptor("maxCount", TypeInformation.of(Long.class));
                        currentMaxValue = getRuntimeContext().getState(desc);
                    }
                    @Override
                    public Tuple3<String, Long, Long> map(Tuple2<String, Long> value) throws Exception {
                        String city = value.f0;
                        Long currentValue = value.f1;
                        if (currentMaxValue.value() == null || currentMaxValue.value() < currentValue) {
                            currentMaxValue.update(currentValue);
                            return Tuple3.of(city, currentValue, currentMaxValue.value());
                        } else {
                            return Tuple3.of(city, currentValue, currentMaxValue.value());
                        }
                    }
                });
        //3.1.先根据字符串f0分组然后进行 map 操作,将Tuple2<String/*单词*/, Long/*长度*/> 输出 Tuple3<String/*单词*/, Long/*长度*/, Long/*历史最大值*/>
        //-1.定义值类型的状态用来存储最大值
        //3.2.重写 RichMapFunction 的open 方法
        //-2.定义状态描述符
        //-3.从当前上下文获取内存中的状态值
        //3.3.重写 map 方法
        //-4.获取state中历史最大值value和当前元素的最大值并比较
        //-5.如果当前值大或历史值为空更新状态;返回Tuple3元祖结果
        //4.Sink 打印输出
//        result1.printToErr();
        maxCount.print();
        //5.execute 执行环境
        env.execute();
    }
}

operate state

  • 大多数场景就是读取 source ,用到数据结构 ListState
  • 案例 - 使用ListState存储offset模拟Kafka的offset维护
package cn.itcast.sz22.day03;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
/**
 * Author itcast
 * Date 2021/5/7 16:59
 *  使用ListState存储offset模拟Kafka的offset维护
 */
public class OperatorStateDemo01 {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //先直接使用下面的代码设置Checkpoint时间间隔和磁盘路径以及代码遇到异常后的重启策略,下午会学
        env.enableCheckpointing(1000);//每隔 1s 执行一次Checkpoint
        //将全局的状态保存到哪里?  hdfs://node1:8020/checkpoint/
        env.setStateBackend(new FsStateBackend("file:///D:/ckp"));
        //当前任务被取消,checkpoint是否被保存下来
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //当前checkpoint 机制 EXACTLY_ONCE
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //固定延迟重启策略: 程序出现异常的时候,重启2次,每次延迟3秒钟重启,超过2次,程序退出
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000));
        //2.Source
        DataStreamSource<String> sourceData = env.addSource(new MyMoniKafkaSource());
        //3.Transformation
        //4.Sink
        sourceData.print();
        //5.execute
        env.execute();
    }
    //1.创建类 MyMoniKafkaSource 继承 RichparallelSourceFunction 并实现 CheckpointedFunction
    public static class MyMoniKafkaSource extends RichParallelSourceFunction<String> implements CheckpointedFunction{
        //1.1. 定义ListState<Long>用于存储 offsetState、offset、flag
        ListState<Long> offsetState;
        Long offset = 0L;
        boolean flag = true;
        //1.2. 重写 initializeState 方法
        //  //创建List状态描述器
        //  //根据状态描述器初始化状态通过context
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            ListStateDescriptor<Long> desc = new ListStateDescriptor<>("offsetState",
                    TypeInformation.of(Long.class));
            offsetState = context.getOperatorStateStore()
                    .getListState(desc);
        }
        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            //获取并迭代ListState中的值,如果存在赋值给offset
            Iterable<Long> offsets = offsetState.get();
            if(offsets.iterator().hasNext()){
                offset = offsets.iterator().next();
            }
            //  //while(flag)
            while(flag){
                //将处理的offset累加1、获取当前子任务的Index
                offset += 1;
                //ctx收集id和offset("分区:"+id+"消费到offset的位置为:"+offset
                int id = getRuntimeContext().getIndexOfThisSubtask();
                ctx.collect("分区:"+id+"消费到offset的位置为:"+offset);
                Thread.sleep(2000);
                //)并输出
                //  //休眠2秒,此时保存state到checkpoint
                //  //模拟异常 每5秒钟抛出异常,看后续offset是否还能恢复
                if(offset%5==0){
                    System.out.println("当前程序出现bug");
                    throw new Exception("当前程序出现bug");
                }
            }
        }
        @Override
        public void cancel() {
            flag  = false;
        }
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            //清空内存offsetState中存储的offset
            offsetState.clear();
            //添加offset到state中
            offsetState.add(offset);
        }
    }
}
set的位置为:"+offset
int id = getRuntimeContext().getIndexOfThisSubtask();
ctx.collect(“分区:”+id+“消费到offset的位置为:”+offset);
Thread.sleep(2000);
//)并输出
// //休眠2秒,此时保存state到checkpoint
// //模拟异常 每5秒钟抛出异常,看后续offset是否还能恢复
if(offset%5==0){
System.out.println(“当前程序出现bug”);
throw new Exception(“当前程序出现bug”);
}
}
}
@Override
    public void cancel() {
        flag  = false;
    }
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        //清空内存offsetState中存储的offset
        offsetState.clear();
        //添加offset到state中
        offsetState.add(offset);
    }
}

}


         


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
10天前
|
API
车牌号归属地查询免费API接口教程
本接口用于根据车牌号查询社会车辆的归属地,不支持军车、使馆等特殊车牌。请求地址为 `https://cn.apihz.cn/api/other/chepai.php`,支持 POST 和 GET 请求。请求参数包括 `id`、`key` 和 `words`,返回数据包含车牌归属地信息。示例请求:`https://cn.apihz.cn/api/other/chepai.php?id=88888888&key=88888888&words=川B1234`。
46 21
|
9天前
|
API
获取网页重定向地址免费API接口教程
该API用于获取网页重定向跳转后的最终地址。请求地址为`https://cn.apihz.cn/api/wangzhan/tiaozhuan.php`,支持POST或GET方式。请求参数包括`id`、`key`和`url`,返回数据包含状态码`code`和最终URL`url`。示例返回:`{&quot;code&quot;:200,&quot;url&quot;:&quot;https://www.baidu.com/&quot;}`。
51 29
|
14天前
|
API
将秒数转换为时间免费API接口教程
该API用于将指定秒数转换为年、日、时、分、秒。支持指定转换类型。请求地址为 `https://cn.apihz.cn/api/time/stime.php`,需提供ID、密钥、类型和秒数参数。返回结果包含转换后的年、日、时、分、秒等信息。示例请求:`https://cn.apihz.cn/api/time/stime.php?id=88888888&key=88888888&type=1&s=123456`。更多详情见 [文档](https://www.apihz.cn/api/timestime.html)。
将秒数转换为时间免费API接口教程
|
5天前
|
API
天气预报1天-中国气象局-地址查询版免费API接口教程
此接口提供中国气象局官方的当日天气信息,支持POST和GET请求,需提供用户ID、KEY、省份及具体地点。返回数据包括状态码、消息、天气详情等。示例中使用的ID与KEY为公共测试用,建议使用个人ID与KEY以享受更高调用频次。
|
10天前
|
网络协议 API
检测指定TCP端口开放状态免费API接口教程
该API用于检测目标主机指定TCP端口是否开放,适用于检测连通状态等场景。支持指定大陆、美国、香港等检测节点。请求地址为 `https://cn.apihz.cn/api/wangzhan/port.php`,支持POST和GET请求方式。请求参数包括 `id`、`key`、`type`、`host` 和 `port`。返回参数包含检测结果和状态码。示例请求:`https://cn.apihz.cn/api/wangzhan/port.php?id=88888888&key=88888888&type=1&host=49.234.56.78&port=80`。
|
8天前
|
API 数据安全/隐私保护
抖音视频,图集无水印直链解析免费API接口教程
该接口用于解析抖音视频和图集的无水印直链地址。请求地址为 `https://cn.apihz.cn/api/fun/douyin.php`,支持POST或GET请求。请求参数包括用户ID、用户KEY和视频或图集地址。返回参数包括状态码、信息提示、作者昵称、标题、视频地址、封面、图集和类型。示例请求和返回数据详见文档。
|
13天前
|
API
图片压缩+格式转换免费API接口教程
这是一个免费的图片压缩和格式转换API接口,支持GET和POST请求。请求地址为 `https://cn.apihz.cn/api/img/yasuo.php`,需提供 `id`、`key`、`img` 等参数。返回数据包含处理后的图片URL和其他相关信息。更多详情请参考:https://www.apihz.cn/api/imgyasuo.html
|
13天前
|
API
天气预报-腾讯天气-7天-IP查询版免费API接口教程
根据IP地址自动查询该IP归属地7天天气预报的腾讯天气API。请求地址为`https://cn.apihz.cn/api/tianqi/tengxunip.php`,支持GET和POST请求。需提供ID、Key和IP地址作为参数。返回数据包含天气预报信息。
|
12天前
|
前端开发 JavaScript API
取网页纯文本内容免费API接口教程
该API用于获取指定网页的纯文本内容,去除HTML标签、CSS和JS等元素。支持POST和GET请求,需提供ID、Key、URL等参数。请求示例:https://cn.apihz.cn/api/wangzhan/getyuan.php?id=88888888&key=88888888&url=www.apihz.cn&dy=1。返回纯文本数据。
|
1月前
|
API 微服务
Traefik 微服务 API 网关教程(全)
Traefik 微服务 API 网关教程(全)

热门文章

最新文章

下一篇
无影云桌面