Flink数据问题之checkpoint数据删除失败如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:flink-1.11 ddl 写入json 格式数据到hdfs问题

代码引用

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#full-example

将parquet换成了json之后,chk成功,但是文件状态一直处于in-progress状态,我应该如何让它成功呢?

parquet目前是已经success了。

*来自志愿者整理的flink邮件归档



参考答案:

如同[1]里面说的,对于csv和json,你还需要配置rolling相关参数,因为它们是可以不在checkpoint强行rolling的。

NOTE: For row formats (csv, json), you can set the parameter

sink.rolling-policy.file-size or sink.rolling-policy.rollover-interval in

the connector properties and parameter execution.checkpointing.interval in

flink-conf.yaml together if you don’t want to wait a long period before

observe the data exists in file system. For other formats (avro, orc), you

can just set parameter execution.checkpointing.interval in flink-conf.yaml.

所以如果你想通过时间来rolling,你还需要配sink.rolling-policy.rollover-interval和sink.rolling-policy.check-interval

[1]

https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/filesystem.html#rolling-policy*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370217?spm=a2c6h.12873639.article-detail.68.6f9243783Lv0fl



问题二:flink 1.11 checkpoint使用

我在使用flink 1.11.0中得ddl 部分 采用debezium-json做cdc得时候 从checkpoint恢复以后,新来op=d的数据会删除失败 重启命令:./bin/flink run -m yarn-cluster /root/bigdata-flink-1.0.jar -s hdfs://prehadoop01:8020/flink/checkpoints/4cc5df8b96e90c1c2a4d3719a77f51d1/chk-819/_metadata 代码: EnvironmentSettings settings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointTimeout(6000L); // 超时时间 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 最大允许同时出现几个CheckPoint env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10L); // 最小得间隔时间 env.getCheckpointConfig().setPreferCheckpointForRecovery(true); // 是否倾向于用CheckPoint做故障恢复 env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1); // 容忍多少次CheckPoint失败 //Checkpoint文件清理策略

env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); //Checkpoint外部文件路径 env.setStateBackend(new FsStateBackend(new URI("hdfs://172.22.20.205:8020/flink/checkpoints"), false)); TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS))); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings); String sourceDDL = String.format( "CREATE TABLE debezium_source (" + " id INT NOT NULL," + " name STRING," + " description STRING," + " weight Double" + ") WITH (" + " 'connector' = 'kafka-0.11'," + " 'topic' = '%s'," + " 'properties.bootstrap.servers' = '%s'," + " 'scan.startup.mode' = 'group-offsets'," + " 'format' = 'debezium-json'" + ")", "ddd", " 172.22.20.206:9092"); String sinkDDL = "CREATE TABLE sink (" + " id INT NOT NULL," + " name STRING," + " description STRING," + " weight Double," + " PRIMARY KEY (id,name, description,weight) NOT ENFORCED " + ") WITH (" + " 'connector' = 'jdbc'," + " 'url' = 'jdbc:mysql://172.27.4.22:3306/test?autoReconnect=true'," + " 'table-name' = 'products'," + " 'driver'= 'com.mysql.cj.jdbc.Driver'," + " 'username'='DataPip'," + " 'password'='DataPip'" + ")"; String dml = "INSERT INTO sink SELECT id,name ,description, weight FROM debezium_source GROUP BY id,name ,description, weight"; tEnv.executeSql(sourceDDL); tEnv.executeSql(sinkDDL); tEnv.executeSql(dml);

*来自志愿者整理的flink邮件归档



参考答案:

为什么要 GROUP BY id,name ,description, weight ? 直接 "INSERT INTO sink SELECT id,name ,description, weight FROM debezium_source" 不能满足需求?*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370218?spm=a2c6h.12873639.article-detail.69.6f9243783Lv0fl



问题三:flink 1.11任务提交的问题

请教下flink 1.11任务提交的问题。如果我的一个作业里既有sql dml提交(executeSQL执行),又通过DataStream.addSink来写出, 通过StreamExecutionEnvironment.execute提交,yarn per-job貌似会提交两个作业。这种情况下,我该如何处理呢?只想提交一个作业。

*来自志愿者整理的flink邮件归档



参考答案:

目前这个做不到。executeSQL 和 table to DataStream 是分别优化和提交作业的。

即使在1.11 之前,table to DataStream 也不会和 sqlUpdate 或者 insertInto 的语句一起优化,

虽然只提交了一个job,但是是两个独立的pipeline,也没有计算复用,和两个job没啥差别。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370220?spm=a2c6h.12873639.article-detail.70.6f9243783Lv0fl



问题四:flink1.9写权限认证的es6

请问flink如何将数据写入到权限认证的es集群哪,没找到配置用户名密码的地方,哪位大佬帮忙解答一下。。。。

*来自志愿者整理的flink邮件归档



参考答案:

SQL添加认证的逻辑已经在FLINK-18361[1] 中完成了,1.12版本会支持这个功能

[1] https://issues.apache.org/jira/browse/FLINK-18361*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370221?spm=a2c6h.12873639.article-detail.71.6f9243783Lv0fl



问题五:flink state问题

大家好

我有一个去重的需求,想节省内存用的bloomfilter,代码如下:

.keyBy(_._1).process(new KeyedProcessFunctionString,(String,String),String {

var state:ValueState[BloomFilter[CharSequence]]= null

override def open(parameters: Configuration): Unit = {

val stateDesc = new ValueStateDescriptor("state",TypeInformation.of(new TypeHintBloomFilter[CharSequence]{}))

state = getRuntimeContext.getState(stateDesc)

}

override def processElement(value: (String, String), ctx: KeyedProcessFunction[String, (String, String), String]#Context, out: Collector[String]) = {

var filter = state.value

if(filter==null){

println("null filter")

filter= BloomFilter.createCharSequence}

//val contains = filter.mightContain(value._2)

if(!filter.mightContain(value._2)) {

filter.put(value._2)

state.update(filter)

out.collect(value._2)

}

}

})

通过日志我看到每次我从savepoint恢复的时候这个state里面的bloomfilter都是null,这是为什么啊

*来自志愿者整理的flink邮件归档



参考答案:

你可以尝试用 state-process-api[1] 看一下 savepoint 中 state 的内容,先缩小一下问题的范围,如果

savepoint 中就没有了,那就是序列化到 savepoint 的时候出错了,savepoitn 是有的,那么就是恢复的时候出错了。

[1]

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/state_processor_api.html*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370223?spm=a2c6h.12873639.article-detail.72.6f9243783Lv0fl

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
运维 数据处理 Apache
数据实时计算产品对比测评报告:阿里云实时计算Flink版
数据实时计算产品对比测评报告:阿里云实时计算Flink版
|
2月前
|
分布式计算 监控 大数据
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
58 1
|
2月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
49 1
|
2月前
|
SQL 分布式计算 大数据
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(一)
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(一)
49 0
|
2月前
|
大数据 流计算
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(二)
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(二)
46 0
|
3月前
|
SQL 安全 数据处理
揭秘数据脱敏神器:Flink SQL的神秘力量,守护你的数据宝藏!
【9月更文挑战第7天】在大数据时代,数据管理和处理尤为重要,尤其在保障数据安全与隐私方面。本文探讨如何利用Flink SQL实现数据脱敏,为实时数据处理提供有效的隐私保护方案。数据脱敏涉及在处理、存储或传输前对敏感数据进行加密、遮蔽或替换,以遵守数据保护法规(如GDPR)。Flink SQL通过内置函数和表达式支持这一过程。
85 2
|
4月前
|
消息中间件 Kafka 数据处理
实时计算 Flink版产品使用问题之如何处理数据并记录每条数据的变更
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
消息中间件 存储 关系型数据库
实时计算 Flink版产品使用问题之同步时,上游批量删除大量数据(如20万条),如何提高删除效率
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
消息中间件 存储 关系型数据库
实时计算 Flink版产品使用问题之如何使用Kafka Connector将数据写入到Kafka
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

相关产品

  • 实时计算 Flink版