【Flink-容错API】重启策略-CheckPoint/StateBackend

简介: 【Flink-容错API】重启策略-CheckPoint/StateBackend

一、Flink容错


1.1 State状态


Flink实时计算为了保证计算过程中,出现异常可以容错,就要中间结果的计算结果存储起来,这些中间数据就叫做State。

State时多类型的,默认是保存在JobManger的内存中,也可以保存在TaskManager的本地文件中,也可以保存在本地文件系统或者HDFS这样的分布式文件系统中。


1.2 StateBackend


用来保存State的存储后端就叫做StateBackend,默认是保存在JobManger的内存中,也可以保存在TaskManager的本地文件中,也可以保存在本地文件系统或者HDFS这样的分布式文件系统中。

程序:

public class StateBackend01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        /**
         * 只有开启了checkpoint,5s才会有重启策略,固定时间,无限重启
         * 默认把中间结果保存于JobMananger的内存
         */
        env.enableCheckpointing(5000);
        //自定义重启固定次数,和重启时间
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000));
        //本地目录:设置状态存储的后端,只是当前的job,建议在配置文件中全局配置
        //env.setStateBackend(new FsStateBackend("file://D:\\APP\\IDEA\\workplace\\FlinkTurbineFaultDiagnosis\\checkpoint"));
        //HFDS:存储chenckpoint
        System.setProperty("HADOOP_USER_NAME", "root");
        env.setStateBackend(new FsStateBackend("hdfs://hadoop1:9000/checkpoint01"));
        /**
         * 程序异常退出,或者人为取消,不删除checkpoint目录数据
         */
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        DataStreamSource<String> lines = env.socketTextStream("192.168.52.200", 8888);
        SingleOutputStreamOperator<String> wangyining = lines.map(new MapFunction<String, String>() {
            @Override
            public String map(String line) throws Exception {
                if (line.startsWith("wangyining")) {
                    throw new RuntimeException("老王的程序挂了!");
                }
                return line.toUpperCase();
            }
        });
        wangyining.print();
        env.execute();
    }
}

1.3 CheckPointing


Flink为了实时容错,可以量中间结果定期的保存期起来,这种定期触发保存中间结果的机制叫做CheckPointing. CheckPointing 是周期执行的.具体的过程是JobManager定期的向TaskManager中的SubTask发送RPC消息,SubTask 将其计算的State保存StateBackEnd 中,并晌JobManager相应Checkpoint是否成功。如果程序出现异常或重启TaskManager 中的SubTask可以从上-一次成功的CheckPointing的State恢复。

image.png

1.4 重启策略


Flink实时计算程序,为了容错,需要开启CheckPointing,- - 旦开启CheckPointing,如果没有重启策略,默认的重启策略是无限重启,可以也可以设置其他重启策略,如:重启固定次数且可以延迟执行的策略。


public class RestartStrategies01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        /**
         * 只有开启了checkpoint,如 5s 才会有重启策略,固定时间,无限重启
         * 默认把中间结果保存于JobMananger的内存
         */
        env.enableCheckpointing(5000);
        //自定义重启固定次数,和重启时间
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000));
        DataStreamSource<String> lines = env.socketTextStream("192.168.52.200", 8888);
        SingleOutputStreamOperator<String> wangyining = lines.map(new MapFunction<String, String>() {
            @Override
            public String map(String line) throws Exception {
                if (line.startsWith("wangyining")) {
                    throw new RuntimeException("老王的程序挂了!");
                }
                return line.toUpperCase();
            }
        });
        wangyining.print();
        env.execute();
    }
}

结果:

20200923160421740.png

 

可以看到,程序不会挂掉。

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
|
5月前
|
人工智能 搜索推荐 算法
唯品会 API 开启唯品会店铺精准营销新策略
在竞争激烈的电商市场中,唯品会作为领先的折扣平台,通过API技术助力商家实现精准营销。API为商家提供用户行为、商品与交易数据,支撑用户画像、个性化推荐、动态定价、跨渠道营销与库存预测等策略,提升转化率与用户忠诚度,推动销售增长与营销智能化升级。
233 1
|
7月前
|
算法 搜索推荐 API
API让电商“活”起来:动态定价策略的革新力量
在电商竞争中,动态定价策略通过API实时调整价格,响应市场变化,提升利润与竞争力。本文解析其原理、技术实现与应用,探讨API如何重塑电商生态。
444 1
|
8月前
|
人工智能 监控 Cloud Native
深度剖析电商API监控与报警:守护电商系统稳定的核心策略
电商API监控与报警是保障电商业务稳定运行的关键工具。文章从重要性、关键指标(如响应时间、成功率、错误率等)、技术工具(如日志监控、性能监控、异常检测)及实施步骤等方面详细阐述了如何构建高效的监控体系。通过案例分析,如京东的商品API实战,展示了全链路追踪与智能告警的应用价值。未来,随着AI、自动化和云原生技术的发展,电商API监控将更加智能高效,助力提升用户体验与业务效率。
|
7月前
|
自然语言处理 供应链 前端开发
深度解析与技术实践:高效调用淘宝商品评论API的策略与代码实现
本文深入解析淘宝开放平台商品评论接口(Taobao.item_review),涵盖接口功能、调用逻辑与实战代码,助力开发者高效获取用户评价数据,提升电商数据分析能力。
|
8月前
|
缓存 负载均衡 监控
微服务架构下的电商API接口设计:策略、方法与实战案例
本文探讨了微服务架构下的电商API接口设计,旨在打造高效、灵活与可扩展的电商系统。通过服务拆分(如商品、订单、支付等模块)和标准化设计(RESTful或GraphQL风格),确保接口一致性与易用性。同时,采用缓存策略、负载均衡及限流技术优化性能,并借助Prometheus等工具实现监控与日志管理。微服务架构的优势在于支持敏捷开发、高并发处理和独立部署,满足电商业务快速迭代需求。未来,电商API设计将向智能化与安全化方向发展。
510 102
|
5月前
|
存储 监控 前端开发
淘宝商品详情 API 实战:5 大策略提升店铺转化率(附签名优化代码 + 避坑指南)
本文深入解析淘宝商品详情API的核心字段与实战应用,分享如何通过动态定价、库存预警、差评控制等5大策略提升电商转化率。结合300+店铺实战经验,提供优化代码与避坑指南,助力开发者与运营者实现数据驱动的精细化运营。
|
5月前
|
数据采集 监控 API
亚马逊:对接竞品监控API实时跟踪价格变动,调整定价策略
在电商竞争中,亚马逊通过对接竞品监控API,实现价格实时采集与分析,动态调整定价策略。本文详解其技术实现、商业价值及挑战,展现数据驱动下的智能定价如何提升竞争力与利润。
594 0
|
5月前
|
人工智能 算法 搜索推荐
拼多多:通过用户分组API实施差异化营销策略,提高客单价
拼多多通过用户分组API实现差异化营销,精准提升客单价。基于用户行为数据自动分类,针对不同群体推送专属优惠,如高频用户推高端商品、新用户送礼包、低频用户唤醒激励。结合满减、捆绑销售等策略,有效提高单笔订单金额。该策略提升营销效率,增强用户粘性,助力平台实现数据驱动的可持续增长。
367 0
|
6月前
|
存储 监控 数据可视化
淘宝API实时竞品监控,市场策略快人一步!
在电商竞争中,实时掌握竞品动态至关重要。本文详解如何利用淘宝开放API构建竞品监控系统,实现价格、库存、促销等数据的自动化采集与分析,帮助企业快速响应市场变化,优化定价、促销与库存策略,提升市场竞争力。
344 0
|
5月前
|
供应链 算法 API
网易严选 API 助力,品质电商商品选品策略升级
在电商竞争激烈背景下,网易严选通过开放商品数据API,提供动态选品解决方案,助力合作伙伴精准决策,实现效率与品质双提升,推动电商战略升级。
148 0

热门文章

最新文章