Flink部署问题之committedOffsets指标为负值如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:RocksDBKeyedStateBackend如何写磁盘

请问RocksDBKeyedStateBackend是何时将state序列化到磁盘的, 窗口结束时间?还是配置的checkpoint周期,谢谢*来自志愿者整理的flink邮件归档



参考答案:

看到 RocksDBWriteBatchWrapper类有 flushIfNeeded()方法 , 是这个么?

private void flushIfNeeded() throws RocksDBException {

boolean needFlush = batch.count() == capacity || (batchSize > 0 &&

getDataSize() >= batchSize);

if (needFlush) {

flush();

}

}

batchSize 来自 state.backend.rocksdb.write-batch-size 参数的配置*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371630?spm=a2c6h.13066369.question.99.6ad26382yXROjy



问题二:flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

Hi,社区的各位大家好: 我目前生产上面使用的是1.8.2版本,相对稳定 为了能够用sql统一所有相关的作业,同时我也一直在跟着flink最新版本进行研究, 截止目前先后研究了1.10.1 1.11.1共2个大版本

在尝试使用的过程中,我发现了通过程序,使用sql进行group操作时,checkpoint中的数据量一直在缓慢增加 状态后端使用的是rocksdb 的增量模式 StateBackend backend =new RocksDBStateBackend("hdfs:///checkpoints-data/",true); 设置了官网文档中找到的删除策略: TableConfig tableConfig = streamTableEnvironment.getConfig(); tableConfig.setIdleStateRetentionTime(Time.minutes(2), Time.minutes(7));

请问是我使用的方式不对吗?

通过WebUI查看详细的checkpoint信息,发现状态大的原因主要集中在group这一Operator

版本影响:flink1.10.1 flink1.11.1 planner:blink planner source : kafka source 时间属性: env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

sql: insert into result select request_time ,request_id ,request_cnt ,avg_resptime ,stddev_resptime ,terminal_cnt ,SUBSTRING(DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd HH:mm:ss.SSS'),0,19) from ( select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1' MINUTE),'yyyy-MM-dd HH:mm:ss.SSS'),0,21) as request_time ,commandId as request_id ,count(*) as request_cnt ,ROUND(avg(CAST(respTime as double)),2) as avg_resptime ,ROUND(stddev_pop(CAST(respTime as double)),2) as stddev_resptime from log where commandId in (104005 ,204005 ,404005) and errCode=0 and attr=0 group by TUMBLE(times, INTERVAL '1' MINUTE),commandId

union all

select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1' MINUTE),'yyyy-MM-dd HH:mm:ss.SSS'),0,21) as request_time ,99999999 ,count(*) as request_cnt ,ROUND(avg(CAST(respTime as double)),2) as avg_resptime ,ROUND(stddev_pop(CAST(respTime as double)),2) as stddev_resptime from log where commandId in (104005 ,204005 ,404005) and errCode=0 and attr=0 group by TUMBLE(times, INTERVAL '1' MINUTE) )

source:

create table log ( eventTime bigint ,times timestamp(3) …………………… ,commandId integer ,watermark for times as times - interval '5' second ) with( 'connector' = 'kafka-0.10', 'topic' = '……', 'properties.bootstrap.servers' = '……', 'properties.group.id' = '……', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' )

sink1: create table result ( request_time varchar ,request_id integer ,request_cnt bigint ,avg_resptime double ,stddev_resptime double ,insert_time varchar ) with ( 'connector' = 'kafka-0.10', 'topic' = '……', 'properties.bootstrap.servers' = '……', 'properties.group.id' = '……', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' )*来自志愿者整理的flink邮件归档



参考答案:

SQL 部分不太熟,根据以往的经验,对于 event time 情况下 window 的某个算子 state 越来越大的情况,或许可以检查下

watermark[1]

[1]

https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/monitoring/debugging_event_time.html *来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371561?spm=a2c6h.13066369.question.100.6ad26382wn7scp



问题三:[DISCUSS] FLIP-133: Rework PyFlink Documentation

Since the release of Flink 1.11, users of PyFlink have continued to grow. As far as I know there are many companies have used PyFlink for data analysis, operation and maintenance monitoring business has been put into production(Such as 聚美优品1, 浙江墨芷[2] (Mozhi) etc.). According to the feedback we received, current documentation is not very friendly to PyFlink users. There are two shortcomings:

  • Python related content is mixed in the Java/Scala documentation, which makes it difficult for users who only focus on PyFlink to read.
  • There is already a "Python Table API" section in the Table API document to store PyFlink documents, but the number of articles is small and the content is fragmented. It is difficult for beginners to learn from it.

In addition, FLIP-130 introduced the Python DataStream API. Many documents will be added for those new APIs. In order to increase the readability and maintainability of the PyFlink document, Wei Zhong and me have discussed offline and would like to rework it via this FLIP.

We will rework the document around the following three objectives:

  • Add a separate section for Python API under the "Application Development" section.
  • Restructure current Python documentation to a brand new structure to ensure complete content and friendly to beginners.
  • Improve the documents shared by Python/Java/Scala to make it more friendly to Python users and without affecting Java/Scala users.

More detail can be found in the FLIP-133: https://cwiki.apache.org/confluence/display/FLINK/FLIP-133%3A+Rework+PyFlink+Documentation

Best, Jincheng

[1] https://mp.weixin.qq.com/s/zVsBIs1ZEFe4atYUYtZpRg [2] https://mp.weixin.qq.com/s/R4p_a2TWGpESBWr3pLtM2g*来自志愿者整理的flink邮件归档



参考答案:

Thanks a lot for bringing up this discussion and the proposal. +1 to improve the Python API doc.

I have received many feedbacks from PyFlink beginners about the PyFlink doc, e.g. the materials are too few, the Python doc is mixed with the Java doc and it's not easy to find the docs he wants to know.

I think it would greatly improve the user experience if we can have one place which includes most knowledges PyFlink users should know.*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371559?spm=a2c6h.13066369.question.99.6ad26382RBf3Qv



问题四:flink1.9.1 在WebUI中查看committedOffsets指标为负值

flink1.9.1 在WebUI中查看Source__Custom_Source.KafkaConsumer.topic.geek-event-target.partition.3.committedOffsets指标为负值,查看官网释义:对于每个分区,最后一次成功提交到Kafka的偏移量。 但我这里为什么是负值呢? http://apache-flink.147419.n8.nabble.com/file/t802/%E6%8D%95%E8%8E%B711.png

希望能得到指导,万分感谢~*来自志愿者整理的flink邮件归档



参考答案:

可以检查下在patition3上有没有成功提交过offsets。负值可能是没有提交过情况下的默认值(我猜这是个不变值)。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371557?spm=a2c6h.13066369.question.100.6ad26382BDwqsH

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
3月前
|
Kubernetes Cloud Native 流计算
Flink-12 Flink Java 3分钟上手 Kubernetes云原生下的Flink集群 Rancher Stateful Set yaml详细 扩容缩容部署 Docker容器编排
Flink-12 Flink Java 3分钟上手 Kubernetes云原生下的Flink集群 Rancher Stateful Set yaml详细 扩容缩容部署 Docker容器编排
113 3
|
3月前
|
资源调度 分布式计算 大数据
大数据-111 Flink 安装部署 YARN部署模式 FlinkYARN模式申请资源、提交任务
大数据-111 Flink 安装部署 YARN部署模式 FlinkYARN模式申请资源、提交任务
149 0
|
5月前
|
机器学习/深度学习 人工智能 运维
美团 Flink 大作业部署问题之Flink在生态技术演进上有什么主要方向
美团 Flink 大作业部署问题之Flink在生态技术演进上有什么主要方向
|
5月前
|
监控 Serverless Apache
美团 Flink 大作业部署问题之如何体现Flink在业界的影响力
美团 Flink 大作业部署问题之如何体现Flink在业界的影响力
|
5月前
|
监控 Serverless 数据库
美团 Flink 大作业部署问题之端云联调并将流量恢复到云端实例如何结束
美团 Flink 大作业部署问题之端云联调并将流量恢复到云端实例如何结束
|
5月前
|
监控 Java Serverless
美团 Flink 大作业部署问题之想在Serverless平台上实时查看Spring Boot应用的日志要怎么操作
美团 Flink 大作业部署问题之想在Serverless平台上实时查看Spring Boot应用的日志要怎么操作
|
5月前
|
Java 流计算
美团 Flink 大作业部署问题之files-to-delete 的执行为什么能够异步进行呢
美团 Flink 大作业部署问题之files-to-delete 的执行为什么能够异步进行呢
|
5月前
|
缓存 流计算
美团 Flink 大作业部署问题之根据已存在的 Checkpoint 副本进行增量的副本制作如何实现
美团 Flink 大作业部署问题之根据已存在的 Checkpoint 副本进行增量的副本制作如何实现
|
5月前
|
分布式计算 流计算
美团 Flink 大作业部署问题之Checkpoint Replicate Service 跨 HDFS 集群的副本制作是如何实现的
美团 Flink 大作业部署问题之Checkpoint Replicate Service 跨 HDFS 集群的副本制作是如何实现的
|
5月前
|
流计算
美团 Flink 大作业部署问题之新启动作业的 Checkpoint 跨作业文件引用的问题要如何避免
美团 Flink 大作业部署问题之新启动作业的 Checkpoint 跨作业文件引用的问题要如何避免

相关产品

  • 实时计算 Flink版