Flink状态管理与Checkpoint实战——模拟电商订单计算过程中宕机的场景,探索宕机恢复时如何精准继续计算订单(下)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink状态管理与Checkpoint实战——模拟电商订单计算过程中宕机的场景,探索宕机恢复时如何精准继续计算订单
public class VideoOrderSourceV2 extends RichParallelSourceFunction<VideoOrder> {
    private volatile Boolean flag = true;
    private Random random = new Random();
    private static List<VideoOrder> list = new ArrayList<>();
    static {
        list.add(new VideoOrder("","java",10,0,null));
        list.add(new VideoOrder("","spring boot",15,0,null));
    }
    /**
     * run 方法调用前 用于初始化连接
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        System.out.println("-----open-----");
    }
    /**
     * 用于清理之前
     * @throws Exception
     */
    @Override
    public void close() throws Exception {
        System.out.println("-----close-----");
    }
    /**
     * 产生数据的逻辑
     * @param ctx
     * @throws Exception
     */
    @Override
    public void run(SourceContext<VideoOrder> ctx) throws Exception {
        while (flag){
            Thread.sleep(1000);
            String id = UUID.randomUUID().toString().substring(30);
            int userId = random.nextInt(10);
            int videoNum = random.nextInt(list.size());
            VideoOrder videoOrder = list.get(videoNum);
            videoOrder.setUserId(userId);
            videoOrder.setCreateTime(new Date());
            videoOrder.setTradeNo(id);
            System.out.println("产生:"+videoOrder.getTitle()+",价格:"+videoOrder.getMoney()+", 时间:"+ TimeUtil.format(videoOrder.getCreateTime()));
            ctx.collect(videoOrder);
        }
    }
    /**
     * 控制任务取消
     */
    @Override
    public void cancel() {
        flag = false;
    }
}

image.png

public class FlinkKeyByReduceApp {
    /**
     * source
     * transformation
     * sink
     *
     * @param args
     */
    public static void main(String[] args) throws Exception {
        //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.enableCheckpointing(5000);
 env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //这是我本机的ip地址       
        env.getCheckpointConfig().setCheckpointStorage(new                                           FileSystemCheckpointStorage("hdfs://192.168.192.100:8020/checkpoint"));
       DataStreamSource<VideoOrder> ds = env.addSource(new VideoOrderSourceV2());
        KeyedStream<VideoOrder, String> videoOrderStringKeyedStream = ds.keyBy(new KeySelector<VideoOrder, String>() {
            @Override
            public String getKey(VideoOrder value) throws Exception {
                return value.getTitle();
            }
        });
        SingleOutputStreamOperator<VideoOrder> reduce = videoOrderStringKeyedStream.reduce(new ReduceFunction<VideoOrder>() {
            @Override
            public VideoOrder reduce(VideoOrder value1, VideoOrder value2) throws Exception {
                VideoOrder videoOrder = new VideoOrder();
                videoOrder.setTitle(value1.getTitle());
                videoOrder.setMoney(value1.getMoney() + value2.getMoney());
                return videoOrder;
            }
        });
        reduce.print();
        env.execute("job");
    }
}

在本地测试运行结果,可以看到数据根据订单分组不断的进行滚动计算image.png进入服务器的HDFS查看检查点数据是否存在image.png之后将应用进行打包,上传到服务器进行测试,可以使用Flink的Web页面进行手动提交jar包运行,也可以使用命令进行提交,之后可以看到程序运行过程中的相关日志输出

image.pngimage.png

-s : 指定检查点的元数据的位置,这个位置记录着宕机前程序的计算状态
./bin/flink run -s /checkpoint/id号/chk-23/_metadata -c net.xxx.xxx.FlinkKeyByReduceApp -p 3 /root/xdclass-flink.jar 

image.pngimage.png可以看到出现一次close的时候,代表我们的程序以及停止,服务器已经宕机,这个时候订单的计算结果如上图的红色方框。在我们运行了上面那条命令后再次查看日志的数据,从open开始可以看到这次就不是从订单最初的状态开始进行的了,而是从上一次宕机前计算的结果,继续往下计算,到这里Checkponit的实战应用测试就完成了。


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
1月前
|
SQL Kubernetes 调度
Flink 流批一体在模型特征场景的使用
本文整理自B站资深开发工程师张杨老师在 Flink Forward Asia 2023 中 AI 特征工程专场中的分享。
77120 4
Flink 流批一体在模型特征场景的使用
|
5月前
|
存储 消息中间件 缓存
读Flink源码谈设计:有效管理内存之道
在最初接触到Flink时,是来自于业界里一些头部玩家的分享——大家会用其来处理海量数据。在这种场景下,`如何避免JVM GC带来StopTheWorld带来的副作用`这样的问题一直盘绕在我心头。直到用了Flink以后,阅读了相关的源码(以1.14.0为基准),终于有了一些答案。在这篇文章里也是会分享给大家。
541 1
|
4月前
|
存储 Cloud Native 数据处理
Flink 2.0 状态管理存算分离架构演进
本文整理自阿里云智能 Flink 存储引擎团队负责人梅源在 Flink Forward Asia 2023 的分享,梅源结合阿里内部的实践,分享了状态管理的演进和 Flink 2.0 存算分离架构的选型。
852 1
Flink 2.0 状态管理存算分离架构演进
|
11天前
|
存储 传感器 消息中间件
[尚硅谷 flink] 状态管理 笔记
[尚硅谷 flink] 状态管理 笔记
|
11天前
|
传感器 存储 缓存
[尚硅谷flink学习笔记] 实战案例TopN 问题
这段内容是关于如何使用Apache Flink解决实时统计水位传感器数据中,在一定时间窗口内出现次数最多的水位问题,即&quot;Top N&quot;问题。首先,介绍了一个使用滑动窗口的简单实现,通过收集传感器数据,按照水位计数,然后排序并输出前两名。接着,提出了全窗口和优化方案,其中优化包括按键分区(按水位vc分组)、开窗操作(增量聚合计算count)和过程函数处理(聚合并排序输出Top N结果)。最后,给出了一个使用`KeyedProcessFunction`进行优化的示例代码,通过按键by窗口结束时间,确保每个窗口的所有数据到达后再进行处理,提高了效率。
|
3月前
|
消息中间件 SQL NoSQL
Flink数据源问题之自定义如何解决
Flink数据源是指Apache Flink用于读取外部系统数据的接口或组件;本合集将探讨Flink数据源的类型、配置方法和最佳实践,以及在使用数据源时可能遇到的错误和解决方案。
31 3
|
4月前
|
消息中间件 存储 NoSQL
Flink几道经典编程场景
Flink几道经典编程场景
|
4月前
|
流计算
Flink CDC里假设我做widow计算使用ProcessTime计算
【1月更文挑战第23天】【1月更文挑战第113篇】Flink CDC里假设我做widow计算使用ProcessTime计算
199 45
|
4月前
|
SQL 关系型数据库 MySQL
Apache Flink 和 Paimon 在自如数据集成场景中的使用
Apache Flink 和 Paimon 在自如数据集成场景中的使用
318 0
|
4月前
|
存储 Java API
Flink中的状态管理是什么?请解释其作用和常用方法。
Flink中的状态管理是什么?请解释其作用和常用方法。
30 0