Flink部署问题之committedOffsets指标为负值如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:RocksDBKeyedStateBackend如何写磁盘

请问RocksDBKeyedStateBackend是何时将state序列化到磁盘的, 窗口结束时间?还是配置的checkpoint周期,谢谢*来自志愿者整理的flink邮件归档



参考答案:

看到 RocksDBWriteBatchWrapper类有 flushIfNeeded()方法 , 是这个么?

private void flushIfNeeded() throws RocksDBException {

boolean needFlush = batch.count() == capacity || (batchSize > 0 &&

getDataSize() >= batchSize);

if (needFlush) {

flush();

}

}

batchSize 来自 state.backend.rocksdb.write-batch-size 参数的配置*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371630?spm=a2c6h.13066369.question.99.6ad26382yXROjy



问题二:flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

Hi,社区的各位大家好: 我目前生产上面使用的是1.8.2版本,相对稳定 为了能够用sql统一所有相关的作业,同时我也一直在跟着flink最新版本进行研究, 截止目前先后研究了1.10.1 1.11.1共2个大版本

在尝试使用的过程中,我发现了通过程序,使用sql进行group操作时,checkpoint中的数据量一直在缓慢增加 状态后端使用的是rocksdb 的增量模式 StateBackend backend =new RocksDBStateBackend("hdfs:///checkpoints-data/",true); 设置了官网文档中找到的删除策略: TableConfig tableConfig = streamTableEnvironment.getConfig(); tableConfig.setIdleStateRetentionTime(Time.minutes(2), Time.minutes(7));

请问是我使用的方式不对吗?

通过WebUI查看详细的checkpoint信息,发现状态大的原因主要集中在group这一Operator

版本影响:flink1.10.1 flink1.11.1 planner:blink planner source : kafka source 时间属性: env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

sql: insert into result select request_time ,request_id ,request_cnt ,avg_resptime ,stddev_resptime ,terminal_cnt ,SUBSTRING(DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd HH:mm:ss.SSS'),0,19) from ( select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1' MINUTE),'yyyy-MM-dd HH:mm:ss.SSS'),0,21) as request_time ,commandId as request_id ,count(*) as request_cnt ,ROUND(avg(CAST(respTime as double)),2) as avg_resptime ,ROUND(stddev_pop(CAST(respTime as double)),2) as stddev_resptime from log where commandId in (104005 ,204005 ,404005) and errCode=0 and attr=0 group by TUMBLE(times, INTERVAL '1' MINUTE),commandId

union all

select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1' MINUTE),'yyyy-MM-dd HH:mm:ss.SSS'),0,21) as request_time ,99999999 ,count(*) as request_cnt ,ROUND(avg(CAST(respTime as double)),2) as avg_resptime ,ROUND(stddev_pop(CAST(respTime as double)),2) as stddev_resptime from log where commandId in (104005 ,204005 ,404005) and errCode=0 and attr=0 group by TUMBLE(times, INTERVAL '1' MINUTE) )

source:

create table log ( eventTime bigint ,times timestamp(3) …………………… ,commandId integer ,watermark for times as times - interval '5' second ) with( 'connector' = 'kafka-0.10', 'topic' = '……', 'properties.bootstrap.servers' = '……', 'properties.group.id' = '……', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' )

sink1: create table result ( request_time varchar ,request_id integer ,request_cnt bigint ,avg_resptime double ,stddev_resptime double ,insert_time varchar ) with ( 'connector' = 'kafka-0.10', 'topic' = '……', 'properties.bootstrap.servers' = '……', 'properties.group.id' = '……', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' )*来自志愿者整理的flink邮件归档



参考答案:

SQL 部分不太熟,根据以往的经验,对于 event time 情况下 window 的某个算子 state 越来越大的情况,或许可以检查下

watermark[1]

[1]

https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/monitoring/debugging_event_time.html *来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371561?spm=a2c6h.13066369.question.100.6ad26382wn7scp



问题三:[DISCUSS] FLIP-133: Rework PyFlink Documentation

Since the release of Flink 1.11, users of PyFlink have continued to grow. As far as I know there are many companies have used PyFlink for data analysis, operation and maintenance monitoring business has been put into production(Such as 聚美优品1, 浙江墨芷[2] (Mozhi) etc.). According to the feedback we received, current documentation is not very friendly to PyFlink users. There are two shortcomings:

  • Python related content is mixed in the Java/Scala documentation, which makes it difficult for users who only focus on PyFlink to read.
  • There is already a "Python Table API" section in the Table API document to store PyFlink documents, but the number of articles is small and the content is fragmented. It is difficult for beginners to learn from it.

In addition, FLIP-130 introduced the Python DataStream API. Many documents will be added for those new APIs. In order to increase the readability and maintainability of the PyFlink document, Wei Zhong and me have discussed offline and would like to rework it via this FLIP.

We will rework the document around the following three objectives:

  • Add a separate section for Python API under the "Application Development" section.
  • Restructure current Python documentation to a brand new structure to ensure complete content and friendly to beginners.
  • Improve the documents shared by Python/Java/Scala to make it more friendly to Python users and without affecting Java/Scala users.

More detail can be found in the FLIP-133: https://cwiki.apache.org/confluence/display/FLINK/FLIP-133%3A+Rework+PyFlink+Documentation

Best, Jincheng

[1] https://mp.weixin.qq.com/s/zVsBIs1ZEFe4atYUYtZpRg [2] https://mp.weixin.qq.com/s/R4p_a2TWGpESBWr3pLtM2g*来自志愿者整理的flink邮件归档



参考答案:

Thanks a lot for bringing up this discussion and the proposal. +1 to improve the Python API doc.

I have received many feedbacks from PyFlink beginners about the PyFlink doc, e.g. the materials are too few, the Python doc is mixed with the Java doc and it's not easy to find the docs he wants to know.

I think it would greatly improve the user experience if we can have one place which includes most knowledges PyFlink users should know.*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371559?spm=a2c6h.13066369.question.99.6ad26382RBf3Qv



问题四:flink1.9.1 在WebUI中查看committedOffsets指标为负值

flink1.9.1 在WebUI中查看Source__Custom_Source.KafkaConsumer.topic.geek-event-target.partition.3.committedOffsets指标为负值,查看官网释义:对于每个分区,最后一次成功提交到Kafka的偏移量。 但我这里为什么是负值呢? http://apache-flink.147419.n8.nabble.com/file/t802/%E6%8D%95%E8%8E%B711.png

希望能得到指导,万分感谢~*来自志愿者整理的flink邮件归档



参考答案:

可以检查下在patition3上有没有成功提交过offsets。负值可能是没有提交过情况下的默认值(我猜这是个不变值)。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371557?spm=a2c6h.13066369.question.100.6ad26382BDwqsH

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
30天前
|
Kubernetes 流计算 Perl
在Rancher K8s上部署Flink时,TaskManager连接不上并不断重启可能是由多种原因导致的
在Rancher K8s上部署Flink时,TaskManager连接不上并不断重启可能是由多种原因导致的
34 7
|
1月前
|
资源调度 Kubernetes Apache
部署Flink集群后没有资源可能有以下几个原因
【2月更文挑战第23天】 部署Flink集群后没有资源可能有以下几个原因
12 2
|
2月前
|
SQL 消息中间件 Java
Flink部署问题之带上savepoint部署任务报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
2月前
|
SQL 消息中间件 Kafka
Flink部署问题之hive表没有数据如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
2月前
|
资源调度 Kubernetes Apache
Flink部署问题之条件筛选失败如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
2月前
|
SQL 消息中间件 Java
Flink部署问题之编译失败如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
2月前
|
SQL 关系型数据库 Java
Flink部署问题之不支持SupportsFilterPushDown如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
2月前
|
分布式计算 Java 分布式数据库
Flink部署问题之jobid不变如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
2月前
|
消息中间件 Kafka Apache
Apache Flink 是一个开源的分布式流处理框架
Apache Flink 是一个开源的分布式流处理框架
483 5
|
1月前
|
SQL Java API
官宣|Apache Flink 1.19 发布公告
Apache Flink PMC(项目管理委员)很高兴地宣布发布 Apache Flink 1.19.0。
1355 1
官宣|Apache Flink 1.19 发布公告

相关产品

  • 实时计算 Flink版