Flink数据问题之checkpoint数据删除失败如何解决

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:flink-1.11 ddl 写入json 格式数据到hdfs问题

代码引用

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#full-example

将parquet换成了json之后,chk成功,但是文件状态一直处于in-progress状态,我应该如何让它成功呢?

parquet目前是已经success了。

*来自志愿者整理的flink邮件归档



参考答案:

如同[1]里面说的,对于csv和json,你还需要配置rolling相关参数,因为它们是可以不在checkpoint强行rolling的。

NOTE: For row formats (csv, json), you can set the parameter

sink.rolling-policy.file-size or sink.rolling-policy.rollover-interval in

the connector properties and parameter execution.checkpointing.interval in

flink-conf.yaml together if you don’t want to wait a long period before

observe the data exists in file system. For other formats (avro, orc), you

can just set parameter execution.checkpointing.interval in flink-conf.yaml.

所以如果你想通过时间来rolling,你还需要配sink.rolling-policy.rollover-interval和sink.rolling-policy.check-interval

[1]

https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/filesystem.html#rolling-policy*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370217?spm=a2c6h.12873639.article-detail.68.6f9243783Lv0fl



问题二:flink 1.11 checkpoint使用

我在使用flink 1.11.0中得ddl 部分 采用debezium-json做cdc得时候 从checkpoint恢复以后,新来op=d的数据会删除失败 重启命令:./bin/flink run -m yarn-cluster /root/bigdata-flink-1.0.jar -s hdfs://prehadoop01:8020/flink/checkpoints/4cc5df8b96e90c1c2a4d3719a77f51d1/chk-819/_metadata 代码: EnvironmentSettings settings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointTimeout(6000L); // 超时时间 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 最大允许同时出现几个CheckPoint env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10L); // 最小得间隔时间 env.getCheckpointConfig().setPreferCheckpointForRecovery(true); // 是否倾向于用CheckPoint做故障恢复 env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1); // 容忍多少次CheckPoint失败 //Checkpoint文件清理策略

env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); //Checkpoint外部文件路径 env.setStateBackend(new FsStateBackend(new URI("hdfs://172.22.20.205:8020/flink/checkpoints"), false)); TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS))); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings); String sourceDDL = String.format( "CREATE TABLE debezium_source (" + " id INT NOT NULL," + " name STRING," + " description STRING," + " weight Double" + ") WITH (" + " 'connector' = 'kafka-0.11'," + " 'topic' = '%s'," + " 'properties.bootstrap.servers' = '%s'," + " 'scan.startup.mode' = 'group-offsets'," + " 'format' = 'debezium-json'" + ")", "ddd", " 172.22.20.206:9092"); String sinkDDL = "CREATE TABLE sink (" + " id INT NOT NULL," + " name STRING," + " description STRING," + " weight Double," + " PRIMARY KEY (id,name, description,weight) NOT ENFORCED " + ") WITH (" + " 'connector' = 'jdbc'," + " 'url' = 'jdbc:mysql://172.27.4.22:3306/test?autoReconnect=true'," + " 'table-name' = 'products'," + " 'driver'= 'com.mysql.cj.jdbc.Driver'," + " 'username'='DataPip'," + " 'password'='DataPip'" + ")"; String dml = "INSERT INTO sink SELECT id,name ,description, weight FROM debezium_source GROUP BY id,name ,description, weight"; tEnv.executeSql(sourceDDL); tEnv.executeSql(sinkDDL); tEnv.executeSql(dml);

*来自志愿者整理的flink邮件归档



参考答案:

为什么要 GROUP BY id,name ,description, weight ? 直接 "INSERT INTO sink SELECT id,name ,description, weight FROM debezium_source" 不能满足需求?*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370218?spm=a2c6h.12873639.article-detail.69.6f9243783Lv0fl



问题三:flink 1.11任务提交的问题

请教下flink 1.11任务提交的问题。如果我的一个作业里既有sql dml提交(executeSQL执行),又通过DataStream.addSink来写出, 通过StreamExecutionEnvironment.execute提交,yarn per-job貌似会提交两个作业。这种情况下,我该如何处理呢?只想提交一个作业。

*来自志愿者整理的flink邮件归档



参考答案:

目前这个做不到。executeSQL 和 table to DataStream 是分别优化和提交作业的。

即使在1.11 之前,table to DataStream 也不会和 sqlUpdate 或者 insertInto 的语句一起优化,

虽然只提交了一个job,但是是两个独立的pipeline,也没有计算复用,和两个job没啥差别。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370220?spm=a2c6h.12873639.article-detail.70.6f9243783Lv0fl



问题四:flink1.9写权限认证的es6

请问flink如何将数据写入到权限认证的es集群哪,没找到配置用户名密码的地方,哪位大佬帮忙解答一下。。。。

*来自志愿者整理的flink邮件归档



参考答案:

SQL添加认证的逻辑已经在FLINK-18361[1] 中完成了,1.12版本会支持这个功能

[1] https://issues.apache.org/jira/browse/FLINK-18361*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370221?spm=a2c6h.12873639.article-detail.71.6f9243783Lv0fl



问题五:flink state问题

大家好

我有一个去重的需求,想节省内存用的bloomfilter,代码如下:

.keyBy(_._1).process(new KeyedProcessFunctionString,(String,String),String {

var state:ValueState[BloomFilter[CharSequence]]= null

override def open(parameters: Configuration): Unit = {

val stateDesc = new ValueStateDescriptor("state",TypeInformation.of(new TypeHintBloomFilter[CharSequence]{}))

state = getRuntimeContext.getState(stateDesc)

}

override def processElement(value: (String, String), ctx: KeyedProcessFunction[String, (String, String), String]#Context, out: Collector[String]) = {

var filter = state.value

if(filter==null){

println("null filter")

filter= BloomFilter.createCharSequence}

//val contains = filter.mightContain(value._2)

if(!filter.mightContain(value._2)) {

filter.put(value._2)

state.update(filter)

out.collect(value._2)

}

}

})

通过日志我看到每次我从savepoint恢复的时候这个state里面的bloomfilter都是null,这是为什么啊

*来自志愿者整理的flink邮件归档



参考答案:

你可以尝试用 state-process-api[1] 看一下 savepoint 中 state 的内容,先缩小一下问题的范围,如果

savepoint 中就没有了,那就是序列化到 savepoint 的时候出错了,savepoitn 是有的,那么就是恢复的时候出错了。

[1]

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/state_processor_api.html*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370223?spm=a2c6h.12873639.article-detail.72.6f9243783Lv0fl

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
3月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
简介:本文整理自阿里云高级技术专家李麟在Flink Forward Asia 2025新加坡站的分享,介绍了Flink 2.1 SQL在实时数据处理与AI融合方面的关键进展,包括AI函数集成、Join优化及未来发展方向,助力构建高效实时AI管道。
777 43
|
3月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
本文整理自阿里云的高级技术专家、Apache Flink PMC 成员李麟老师在 Flink Forward Asia 2025 新加坡[1]站 —— 实时 AI 专场中的分享。将带来关于 Flink 2.1 版本中 SQL 在实时数据处理和 AI 方面进展的话题。
278 0
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
|
7月前
|
存储 消息中间件 Kafka
基于 Flink 的中国电信星海时空数据多引擎实时改造
本文整理自中国电信集团大数据架构师李新虎老师在Flink Forward Asia 2024的分享,围绕星海时空智能系统展开,涵盖四个核心部分:时空数据现状、实时场景多引擎化、典型应用及未来展望。系统日处理8000亿条数据,具备亚米级定位能力,通过Flink多引擎架构解决数据膨胀与响应时效等问题,优化资源利用并提升计算效率。应用场景包括运动状态识别、个体行为分析和群智感知,未来将推进湖仓一体改造与三维时空服务体系建设,助力数字化转型与智慧城市建设。
787 3
基于 Flink 的中国电信星海时空数据多引擎实时改造
|
3月前
|
SQL 关系型数据库 Apache
从 Flink 到 Doris 的实时数据写入实践 —— 基于 Flink CDC 构建更实时高效的数据集成链路
本文将深入解析 Flink-Doris-Connector 三大典型场景中的设计与实现,并结合 Flink CDC 详细介绍了整库同步的解决方案,助力构建更加高效、稳定的实时数据处理体系。
1696 0
从 Flink 到 Doris 的实时数据写入实践 —— 基于 Flink CDC 构建更实时高效的数据集成链路
|
4月前
|
存储 消息中间件 搜索推荐
京东零售基于Flink的推荐系统智能数据体系
摘要:本文整理自京东零售技术专家张颖老师,在 Flink Forward Asia 2024 生产实践(二)专场中的分享,介绍了基于Flink构建的推荐系统数据,以及Flink智能体系带来的智能服务功能。内容分为以下六个部分: 推荐系统架构 索引 样本 特征 可解释 指标 Tips:关注「公众号」回复 FFA 2024 查看会后资料~
331 1
京东零售基于Flink的推荐系统智能数据体系
|
8月前
|
Oracle 关系型数据库 Java
【YashanDB知识库】Flink CDC实时同步Oracle数据到崖山
本文介绍通过Flink CDC实现Oracle数据实时同步至崖山数据库(YashanDB)的方法,支持全量与增量同步,并涵盖新增、修改和删除的DML操作。内容包括环境准备(如JDK、Flink版本等)、Oracle日志归档启用、用户权限配置、增量日志记录设置、元数据迁移、Flink安装与配置、生成Flink SQL文件、Streampark部署,以及创建和启动实时同步任务的具体步骤。适合需要跨数据库实时同步方案的技术人员参考。
【YashanDB知识库】Flink CDC实时同步Oracle数据到崖山
|
9月前
|
Java 关系型数据库 MySQL
SpringBoot 通过集成 Flink CDC 来实时追踪 MySql 数据变动
通过详细的步骤和示例代码,您可以在 SpringBoot 项目中成功集成 Flink CDC,并实时追踪 MySQL 数据库的变动。
2319 45
|
8月前
|
消息中间件 关系型数据库 Kafka
阿里云基于 Flink CDC 的现代数据栈云上实践
阿里云基于 Flink CDC 的现代数据栈云上实践
154 1
|
存储 监控 数据处理
flink 向doris 数据库写入数据时出现背压如何排查?
本文介绍了如何确定和解决Flink任务向Doris数据库写入数据时遇到的背压问题。首先通过Flink Web UI和性能指标监控识别背压,然后从Doris数据库性能、网络连接稳定性、Flink任务数据处理逻辑及资源配置等方面排查原因,并通过分析相关日志进一步定位问题。
856 61
|
10月前
|
存储 监控 算法
Flink 四大基石之 Checkpoint 使用详解
Flink 的 Checkpoint 机制通过定期插入 Barrier 将数据流切分并进行快照,确保故障时能从最近的 Checkpoint 恢复,保障数据一致性。Checkpoint 分为精确一次和至少一次两种语义,前者确保每个数据仅处理一次,后者允许重复处理但不会丢失数据。此外,Flink 提供多种重启策略,如固定延迟、失败率和无重启策略,以应对不同场景。SavePoint 是手动触发的 Checkpoint,用于作业升级和迁移。Checkpoint 执行流程包括 Barrier 注入、算子状态快照、Barrier 对齐和完成 Checkpoint。
2082 20

相关产品

  • 实时计算 Flink版
  • 下一篇
    oss云网关配置