2021年最新最全Flink系列教程__Flink高级API(四)

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: 2021年最新最全Flink系列教程__Flink高级API(四)

day04_Flink高级API

今日目标

  • Flink的四大基石
  • Flink窗口Window操作
  • Flink时间 - Time
  • Flink水印 - Watermark机制
  • Flink的state状态管理-keyed state 和 operator state

Flink的四大基石

  • Checkpoint - 检查点, 分布式一致性,解决数据丢失,故障恢复数据, 存储的是全局的状态, 持久化HDFS分布式文件系统中
  • State - 状态,分为Managed state(托管状态) 和 Rawed state (原始状态); 数据结构的角度来说 ValueState、ListState、MapState,BroadcastState
  • Time - 时间 , EventTime事件时间、Ingestion摄取时间、Process处理时间
  • Window - 窗口,时间窗口 和 计数窗口, TimeWindow 、 countwindow、 sessionwindow

Window操作

  • 为什么需要 Window - 窗口
    数据是动态的, 无界的, 需要窗口划定范围,将无界数据转换成有界、静态的数据进行计算。

Window分类

  • time - 时间进行分类
  • 时间的窗口级别, 一天,一小时,一分钟
  • 用的比较多 滚动窗口 - tumbling window 和 滑动窗口 - sliding window
  • 滚动窗口 ,窗口时间和滑动时间一样就是滚动时间
  • 滑动窗口, 滑动的时间小于窗口的时间;
  • 会话窗口 - session windows
  • count - 计数进行分类
  • 滚动计数窗口
  • 滑动计数窗口

如何使用

windows的案例

时间窗口需求
  • 每5秒钟统计一次,最近5秒钟内,各个路口通过红绿灯汽车的数量–基于时间的滚动窗口
  • 每5秒钟统计一次,最近10秒钟内,各个路口通过红绿灯汽车的数量–基于时间的滑动窗口
package cn.itcast.flink.basestone;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
 * Author itcast
 * Date 2021/6/18 15:00
 * 开发步骤
 * 1. 将 字符串 9,3 转换成 CartInfo
 * 2. 使用 滚动窗口, 滑动窗口
 * 3. 分组和聚合
 * 4. 打印输出
 * 5. 执行环境
 */
public class WindowDemo01 {
    public static void main(String[] args) throws Exception {
        //1.env 创建流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //2.读取 socket 数据源
        DataStreamSource<String> source = env.socketTextStream("192.168.88.161", 9999);
        //3.将9,3转为CartInfo(9,3)
        DataStream<CartInfo> mapDS = source.map(new MapFunction<String, CartInfo>() {
            @Override
            public CartInfo map(String value) throws Exception {
                String[] kv = value.split(",");
                return new CartInfo(kv[0], Integer.parseInt(kv[1]));
            }
        });
        //4.按照 sensorId 分组并划分滚动窗口为5秒,在窗口上求和
        // Tumbling(滚动)Processing(处理)TimeWindows(时间窗口)
        //需求1:每5秒钟统计一次,最近5秒钟内,各个路口/信号灯通过红绿灯汽车的数量
        SingleOutputStreamOperator<CartInfo> result1 = mapDS.keyBy(t -> t.sensorId)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .sum("count");
        //需求2:每5秒钟统计一次,最近10秒钟内,各个路口/信号灯通过红绿灯汽车的数量
        SingleOutputStreamOperator<CartInfo> result2 = mapDS.keyBy(t -> t.sensorId)
                .window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5)))
                .sum("count");
        //5.打印输出
        //result1.print();
        result2.print();
        //6.execute
        env.execute();
    }
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CartInfo {
        private String sensorId;//信号灯id
        private Integer count;//通过该信号灯的车的数量
    }
}
计数窗口需求
  • 需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计–基于数量的滚动窗口
  • 需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计–基于数量的滑动窗口
package cn.itcast.flink.basestone;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Author itcast
 * Date 2021/6/18 15:46
 * Desc TODO
 */
public class CountWindowDemo01 {
    public static void main(String[] args) throws Exception {
        //1.env 创建流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //2.读取 socket 数据源
        DataStreamSource<String> source = env.socketTextStream("192.168.88.161", 9999);
        //3.将9,3转为CartInfo(9,3)
        DataStream<WindowDemo01.CartInfo> mapDS = source.map(new MapFunction<String, WindowDemo01.CartInfo>() {
            @Override
            public WindowDemo01.CartInfo map(String value) throws Exception {
                String[] kv = value.split(",");
                return new WindowDemo01.CartInfo(kv[0], Integer.parseInt(kv[1]));
            }
        });
        // * 需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计--基于数量的滚动窗口
        //        //countWindow(long size, long slide)
        SingleOutputStreamOperator<WindowDemo01.CartInfo> result1 = mapDS.keyBy(t -> t.getSensorId())
                .countWindow(5)
                .sum("count");
        // * 需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计--基于数量的滑动窗口
        SingleOutputStreamOperator<WindowDemo01.CartInfo> result2 = mapDS.keyBy(t -> t.getSensorId())
                .countWindow(5, 3)
                .sum("count");
        //打印输出
        //result1.print();
        result2.print();
        //执行环境
        env.execute();
    }
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CartInfo {
        private String sensorId;//信号灯id
        private Integer count;//通过该信号灯的车的数量
    }
}
package cn.itcast.flink.basestone;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Author itcast
 * Date 2021/6/18 15:46
 * Desc TODO
 */
public class CountWindowDemo01 {
    public static void main(String[] args) throws Exception {
        //1.env 创建流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //2.读取 socket 数据源
        DataStreamSource<String> source = env.socketTextStream("192.168.88.161", 9999);
        //3.将9,3转为CartInfo(9,3)
        DataStream<WindowDemo01.CartInfo> mapDS = source.map(new MapFunction<String, WindowDemo01.CartInfo>() {
            @Override
            public WindowDemo01.CartInfo map(String value) throws Exception {
                String[] kv = value.split(",");
                return new WindowDemo01.CartInfo(kv[0], Integer.parseInt(kv[1]));
            }
        });
        // * 需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计--基于数量的滚动窗口
        //        //countWindow(long size, long slide)
        SingleOutputStreamOperator<WindowDemo01.CartInfo> result1 = mapDS.keyBy(t -> t.getSensorId())
                .countWindow(5)
                .sum("count");
        // * 需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计--基于数量的滑动窗口
        SingleOutputStreamOperator<WindowDemo01.CartInfo> result2 = mapDS.keyBy(t -> t.getSensorId())
                .countWindow(5, 3)
                .sum("count");
        //打印输出
        //result1.print();
        result2.print();
        //执行环境
        env.execute();
    }
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CartInfo {
        private String sensorId;//信号灯id
        private Integer count;//通过该信号灯的车的数量
    }
}

Flink - Time 和 watermark

Time - 时间

水印机制 - watermark

  • 主要解决数据延迟问题
  • 水印(时间戳) = 事件时间 - 允许最大的延时时间
  • 窗口触发条件
    水印时间 >= 窗口的结束时间 触发计算

需求

有订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额)

要求每隔5s, 计算5秒内,每个用户的订单总金额

并添加Watermark来解决一定程度上的数据延迟和数据乱序(最多延时 3 秒)问题。

package cn.itcast.flink.basestone;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.time.Duration;
import java.util.Random;
import java.util.UUID;
/**
 * Author itcast
 * Date 2021/6/18 16:54
 * Desc TODO
 */
public class WatermarkDemo01 {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置属性 ProcessingTime  , 新版本 默认设置 EventTime
        //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //2.Source 创建 Order 类 orderId:String userId:Integer money:Integer eventTime:Long
        DataStreamSource<Order> source = env.addSource(new SourceFunction<Order>() {
            boolean flag = true;
            Random rm = new Random();
            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                while (flag) {
                    ctx.collect(new Order(
                            UUID.randomUUID().toString(),
                            rm.nextInt(3),
                            rm.nextInt(101),
                            //模拟生成 Order 数据  事件时间=当前时间-5秒钟随机*1000
                            System.currentTimeMillis() - rm.nextInt(5) * 1000
                    ));
                    Thread.sleep(1000);
                }
            }
            @Override
            public void cancel() {
                flag = false;
            }
        });
        //3.Transformation
        //-告诉Flink要基于事件时间来计算!
        //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//新版本默认就是EventTime
        DataStream<Order> result = source.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner((element, recordTimestamp) -> element.eventTime)
        )
                //-分配水印机制,最多延迟3秒,告诉Flink数据中的哪一列是事件时间,因为Watermark = 当前最大的事件时间 - 最大允许的延迟时间或乱序时间
                //代码走到这里,就已经被添加上Watermark了!接下来就可以进行窗口计算了
                //要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额
                .keyBy(t -> t.userId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sum("money");
        //4.Sink
        result.print();
        //5.execute
        env.execute();
    }
    //创建订单类
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order{
        private String orderId;
        private Integer userId;
        private Integer money;
        private Long eventTime;
    }
}
  • 自定义重写接口实现水印机制
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.time.Duration;
import java.util.Random;
import java.util.UUID;
/**
 * Author itcast
 * Date 2021/6/18 16:54
 * Desc TODO
 */
public class WatermarkDemo01 {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置属性 ProcessingTime  , 新版本 默认设置 EventTime
        //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //2.Source 创建 Order 类 orderId:String userId:Integer money:Integer eventTime:Long
        DataStreamSource<Order> source = env.addSource(new SourceFunction<Order>() {
            boolean flag = true;
            Random rm = new Random();
            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                while (flag) {
                    ctx.collect(new Order(
                            UUID.randomUUID().toString(),
                            rm.nextInt(3),
                            rm.nextInt(101),
                            //模拟生成 Order 数据  事件时间=当前时间-5秒钟随机*1000
                            System.currentTimeMillis() - rm.nextInt(5) * 1000
                    ));
                    Thread.sleep(1000);
                }
            }
            @Override
            public void cancel() {
                flag = false;
            }
        });
        //3.Transformation
        //-告诉Flink要基于事件时间来计算!
        //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//新版本默认就是EventTime
        DataStream<Order> result = source.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner((element, recordTimestamp) -> element.eventTime)
        )
                //-分配水印机制,最多延迟3秒,告诉Flink数据中的哪一列是事件时间,因为Watermark = 当前最大的事件时间 - 最大允许的延迟时间或乱序时间
                //代码走到这里,就已经被添加上Watermark了!接下来就可以进行窗口计算了
                //要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额
                .keyBy(t -> t.userId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sum("money");
        //4.Sink
        result.print();
        //5.execute
        env.execute();
    }
    //创建订单类
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order{
        private String orderId;
        private Integer userId;
        private Integer money;
        private Long eventTime;
    }
}
  • 一秒钟生成一条订单数据, 根据用户进行统计总金额,每5秒钟计算5秒中窗口, 允许最大的延时时间是 3秒; 如果超过了 3秒时间,再来的数据存储到 outputTag ,打印输出正常的数据和严重迟到的数据
package cn.itcast.flink.basestone;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;
import java.time.Duration;
import java.util.Random;
import java.util.UUID;
/**
 * Author itcast
 * Date 2021/6/18 17:47
 * Desc TODO
 */
public class LatenessWatermarkDemo {
    public static void main(String[] args) throws Exception {
        //此时间的将严重延迟的数据保存到 outputTag 中。
        /// 创建 outputTag 用于存储严重延迟的数据 数据类型为 Order
        //4.Sink 打印正常和严重延迟的数据
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置属性 ProcessingTime  , 新版本 默认设置 EventTime
        //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //2.Source 创建 Order 类 orderId:String userId:Integer money:Integer eventTime:Long
        DataStreamSource<Order> source = env.addSource(new SourceFunction<Order>() {
            boolean flag = true;
            Random rm = new Random();
            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                while (flag) {
                    ctx.collect(new Order(
                            UUID.randomUUID().toString(),
                            rm.nextInt(3),
                            rm.nextInt(101),
                            //模拟生成 Order 数据  事件时间=当前时间-5秒钟随机*1000
                            System.currentTimeMillis() - rm.nextInt(10) * 1000
                    ));
                    //Thread.sleep(1000);
                }
            }
            @Override
            public void cancel() {
                flag = false;
            }
        });
        //定义一个允许最大的延时存储的数据 tag
        OutputTag<Order> seriousLateOrder = new OutputTag<>("SeriousLateOrder", TypeInformation.of(Order.class));
        //3.Transformation
        //-告诉Flink要基于事件时间来计算!
        //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//新版本默认就是EventTime
        SingleOutputStreamOperator<Order> result = source.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner((element, recordTimestamp) -> element.eventTime)
        )
                //-分配水印机制,最多延迟3秒,告诉Flink数据中的哪一列是事件时间,因为Watermark = 当前最大的事件时间 - 最大允许的延迟时间或乱序时间
                //代码走到这里,就已经被添加上Watermark了!接下来就可以进行窗口计算了
                //要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额
                .keyBy(t -> t.userId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .allowedLateness(Time.seconds(3))
                .sideOutputLateData(seriousLateOrder)
                .sum("money");
        //4.Sink
        result.print("正常数据和不严重迟到的数据");
        result.getSideOutput(seriousLateOrder).print();
        //5.execute
        env.execute();
    }
    //创建订单类
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order{
        private String orderId;
        private Integer userId;
        private Integer money;
        private Long eventTime;
    }
}

Flink状态管理

  • 状态就是基于 key 或者 算子 operator 的中间结果
  • Flink state 分为两种 : Managed state - 托管状态 , Raw state - 原始状态
  • Managed state 分为 两种:
  1. keyed state 基于 key 上的状态
    支持的数据结构 valueState listState mapState broadcastState
  2. operator state 基于操作的状态
    字节数组, ListState

Flink keyed state 案例

  • 需求
    使用KeyedState中的ValueState获取数据中的最大值(实际中直接使用maxBy即可),使用值状态自定义,
    <hello,1>
    <hello,3>
    <hello,2>
    输入Tuple2<String/单词/, Long/长度/> 输出 Tuple3<String/单词/, Long/长度/, Long/历史最大值/> 类型
  • 开发
package cn.itcast.flink.state;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Author itcast
 * Date 2021/6/21 8:34
 * Desc TODO
 */
public class KeyedStateDemo {
    public static void main(String[] args) throws Exception {
        //1.env 设置并发度为1
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //2.Source 参看课件 <城市,次数> => <城市,最大次数>
        DataStreamSource<Tuple2<String, Long>> tupleDS = env.fromElements(
                Tuple2.of("北京", 1L),
                Tuple2.of("上海", 2L),
                Tuple2.of("北京", 6L),
                Tuple2.of("上海", 8L),
                Tuple2.of("北京", 3L),
                Tuple2.of("上海", 4L)
        );
        //3.Transformation
        //使用KeyState中的ValueState获取流数据中的最大值(实际中直接使用maxBy即可)
        //实现方式1:直接使用maxBy--开发中使用该方式即可
        SingleOutputStreamOperator<Tuple2<String, Long>> result1 = tupleDS.keyBy(t -> t.f0)
                //min只会求出最小的那个字段,其他的字段不管
                //minBy会求出最小的那个字段和对应的其他的字段
                //max只会求出最大的那个字段,其他的字段不管
                //maxBy会求出最大的那个字段和对应的其他的字段
                .maxBy(1);
        //实现方式2:通过managed state输入的state
        //3.1.先根据字符串f0分组然后进行 map 操作,将Tuple2<String/*城市*/, Long/*次数*/> 输出 Tuple3<String/*城市*/, Long/*次数*/, Long/*历史最大值*/>
        //
        SingleOutputStreamOperator<Tuple3<String, Long, Long>> result2 = tupleDS
                .keyBy(t->t.f0)
                .map(new RichMapFunction<Tuple2<String, Long>, Tuple3<String/*城市*/, Long/*次数*/, Long/*历史最大值*/>>() {
            ValueState<Long> maxState = null;
            //-1.定义值类型的状态用来存储最大值
            //3.2.重写 RichMapFunction 的open 方法
            @Override
            public void open(Configuration parameters) throws Exception {
                //-2.定义状态描述符
                //-3.从当前上下文获取内存中的状态值
                ValueStateDescriptor maxStateDesc = new ValueStateDescriptor("maxState", Long.class);
                maxState = getRuntimeContext().getState(maxStateDesc);
            }
            //3.3.重写 map 方法
            //-4.获取state中历史最大值value和当前元素的最大值并比较
            @Override
            public Tuple3<String, Long, Long> map(Tuple2<String, Long> value) throws Exception {
                //内存中state的存储的最大值
                Long maxValue = maxState.value();
                //当前的值
                Long curValue = value.f1;
                if (maxValue == null || curValue > maxValue) {
                    maxState.update(curValue);
                    return Tuple3.of(value.f0, value.f1, curValue);
                } else {
                    return Tuple3.of(value.f0, value.f1, maxValue);
                }
            }
        });
        //-5.如果当前值大或历史值为空更新状态;返回Tuple3元祖结果
        //4.Sink 打印输出
        //result1.print();
        result2.print();
        //5.execute 执行环境
        env.execute();
    }
}

Flink operator state 案例

  • 需求
    使用ListState存储offset模拟消费Kafka的offset维护
  • 实现
package cn.itcast.flink.state;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import java.util.Iterator;
/**
 * Author itcast
 * Date 2021/6/21 9:18
 * Desc TODO
 */
public class OperatorStateDemo {
    public static void main(String[] args) throws Exception {
        //1.创建流环境,便于观察设置并行度为 1
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //2.开启checkpoint ,并将状态保存到 file:///D:/chk ,先开启checkpoint ,state管理
        env.enableCheckpointing(1000);
        env.setStateBackend(new FsStateBackend("file:///D:/chk"));
        //3.设置checkpoint的配置 外部chk,仅一次语义等
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //4.开启重启策略 3秒钟尝试重启3次
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,3000));
        //5.添加数据源比如 MyMonitorKafkaSource , 实例化创建 MyMonitorKafkaSource
        DataStreamSource<String> source = env.addSource(new MyMonitorKafkaSource());
        //6.打印输出
        source.print();
        //7.执行
        env.execute();
    }
    //创建 MyMonitorKafkaSource 继承 RichParallelSourceFunction<String> 并实现 CheckpointedFunction
    public static class MyMonitorKafkaSource extends RichParallelSourceFunction<String>
    implements CheckpointedFunction{
        //重写initializeState方法 ListStateDescriptor 状态描述和通过context获取 offsetState
        ListState<Long> offsetState = null;
        boolean flag = true;
        Long offset = 0L;
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            ListStateDescriptor<Long> offsetStateDesc = new ListStateDescriptor<>("offsetState", Long.class);
            offsetState = context.getOperatorStateStore().getListState(offsetStateDesc);
        }
        //重写run方法 读取出 offset 并 循环读取offset+=1,拿到执行的核心编号,输出(核编号和offset),一秒一条,每5条模拟一个异常
        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            Iterator<Long> iterator = offsetState.get().iterator();
            if(iterator.hasNext()){
                offset = iterator.next();
            }
            while(flag){
                offset = offset + 1;
                //处理 CPU 核心Index
                int idx = getRuntimeContext().getIndexOfThisSubtask();
                System.out.println("index:"+idx+" offset:"+offset);
                Thread.sleep(1000);
                if(offset % 5 ==0){
                    System.out.println("当前程序出错了....");
                    throw new Exception("程序出BUG...");
                }
            }
        }
        //重写cancel方法
        @Override
        public void cancel() {
            flag = false;
        }
        //重写snapshotState方法 , 清空 offsetState ,并将最新的offset添加进去
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            offsetState.clear();
            offsetState.add(offset);
        }
    }
}
IndexOfThisSubtask();
System.out.println(“index:”+idx+" offset:"+offset);
Thread.sleep(1000);
if(offset % 5 ==0){
System.out.println(“当前程序出错了…”);
throw new Exception(“程序出BUG…”);
}
}
}
//重写cancel方法
@Override
public void cancel() {
flag = false;
}
//重写snapshotState方法 , 清空 offsetState ,并将最新的offset添加进去
      @Override
      public void snapshotState(FunctionSnapshotContext context) throws Exception {
          offsetState.clear();
          offsetState.add(offset);
      }
  }

}


         


相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
|
12天前
|
人工智能 数据可视化 测试技术
Postman 性能测试教程:快速上手 API 压测
本文介绍API上线后因高频调用导致服务器告警,通过Postman与Apifox进行压力测试排查性能瓶颈。对比两款工具在批量请求、断言验证、可视化报告等方面的优劣,探讨API性能优化策略及行业未来发展方向。
Postman 性能测试教程:快速上手 API 压测
|
3月前
|
JSON 监控 API
在线网络PING接口检测服务器连通状态免费API教程
接口盒子提供免费PING检测API,可测试域名或IP的连通性与响应速度,支持指定地域节点,适用于服务器运维和网络监控。
|
3月前
|
JSON API PHP
通用图片搜索API:百度源免费接口教程
本文介绍一款基于百度图片搜索的免费API接口,由接口盒子提供。支持关键词搜索,具备详细请求与返回参数说明,并提供PHP及Python调用示例。开发者可快速集成实现图片搜索功能,适用于内容聚合、素材库建设等场景。
|
3月前
|
JSON 机器人 API
随机昵称网名API接口教程:轻松获取百万创意昵称库
接口盒子提供随机昵称网名API,拥有百万级中文昵称库,支持聊天机器人、游戏角色等场景的昵称生成。提供详细调用指南及多语言示例代码,助力开发者高效集成。
|
25天前
|
人工智能 API 开发者
图文教程:阿里云百炼API-KEY获取方法,亲测全流程
本文详细介绍了如何获取阿里云百炼API-KEY,包含完整流程与截图指引。需先开通百炼平台及大模型服务,再通过控制台创建并复制API-KEY。目前平台提供千万tokens免费额度,适合开发者快速上手使用。
456 5
|
3月前
|
JSON API PHP
天气预报免费API接口【地址查询版】使用教程
本文介绍了如何使用中国气象局官方数据提供的免费天气预报API接口,通过省份和地点查询指定地区当日天气信息。该接口由接口盒子支持,提供JSON格式数据、GET/POST请求方式,并需注册获取用户ID和KEY进行身份验证。
1744 2
|
3月前
|
JSON API PHP
ICP备案查询免费API接口使用教程
本文介绍如何通过接口盒子提供的免费API接口查询域名ICP备案信息,包含请求地址、参数说明及PHP和Python调用示例,适用于开发者快速集成备案查询功能。
|
3月前
|
存储 JSON API
文本存储免费API接口教程
接口盒子提供免费文本存储服务,支持1000条记录,每条最多5000字符,适用于公告、日志、配置等场景,支持修改与读取。
|
3月前
|
数据采集 JSON 监控
获取网页状态码(可指定地域)免费API接口教程
本文介绍如何使用接口盒子的免费API获取网页状态码,支持国内、香港、美国等不同地域访问节点。内容包括接口参数、调用方法及示例,适用于网站监控、链接检查等场景。
|
3月前
|
JSON 物联网 API
天气预报免费API接口【IP查询版】使用教程
IP查询天气API是一款免费实用的接口,可根据IP地址自动获取所在地天气预报,支持自定义IP查询。核心功能包括自动识别请求IP、全国IP天气查询,数据源自中国气象局,无日调用上限。提供详细的返回参数及多语言示例代码,适用于网站、APP、物联网设备等应用场景。

热门文章

最新文章