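Question 1: How does RocksDBKeyedStateBackend write to disk?
When does RocksDBKeyedStateBackend serialize state to disk: at the window end time, or at the configured checkpoint interval? Thanks. *From the Flink mailing list archive compiled by volunteers
Reference answer:
I see that the RocksDBWriteBatchWrapper class has a flushIfNeeded() method. Is that the one?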
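private void flushIfNeeded() throws RocksDBException {
    // Flush once the batch holds `capacity` entries, or, when a byte limit
    // is configured (batchSize > 0), once the buffered data reaches it.
    boolean needFlush = batch.count() == capacity
        || (batchSize > 0 && getDataSize() >= batchSize);
    if (needFlush) {
        flush();
    }
}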
batchSize comes from the state.backend.rocksdb.write-batch-size configuration option.
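The threshold logic in the quoted method can be tried out in isolation. The following is a minimal sketch, not Flink's RocksDBWriteBatchWrapper: the class WriteBatchSketch and all of its members are hypothetical, and flush() only counts calls where the real wrapper would hand the batch to RocksDB. It shows that a flush is triggered by the entry count or by the buffered byte size, and that neither trigger depends on window end times or on the checkpoint interval.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a write batch; NOT Flink's RocksDBWriteBatchWrapper.
class WriteBatchSketch {
    private final int capacity;    // flush when this many entries are buffered
    private final long batchSize;  // flush when this many bytes are buffered; <= 0 disables the check
    private final List<byte[][]> batch = new ArrayList<>();
    private long dataSize = 0;
    private int flushCount = 0;

    WriteBatchSketch(int capacity, long batchSize) {
        this.capacity = capacity;
        this.batchSize = batchSize;
    }

    void put(byte[] key, byte[] value) {
        batch.add(new byte[][] {key, value});
        dataSize += key.length + value.length;
        flushIfNeeded();
    }

    // Same condition as the quoted method: entry count OR byte size.
    private void flushIfNeeded() {
        boolean needFlush = batch.size() == capacity
                || (batchSize > 0 && dataSize >= batchSize);
        if (needFlush) {
            flush();
        }
    }

    // The real wrapper would write the batch to RocksDB here; this sketch only counts.
    void flush() {
        batch.clear();
        dataSize = 0;
        flushCount++;
    }

    int getPendingEntries() {
        return batch.size();
    }

    int getFlushCount() {
        return flushCount;
    }

    public static void main(String[] args) {
        // Size check disabled (batchSize = 0): only the entry count triggers a flush.
        WriteBatchSketch byCount = new WriteBatchSketch(3, 0);
        for (int i = 0; i < 7; i++) {
            byCount.put(new byte[4], new byte[4]);
        }
        System.out.println("flushes=" + byCount.getFlushCount()
                + " pending=" + byCount.getPendingEntries());
    }
}
```

Note that flushing a write batch hands the buffered entries to RocksDB; when RocksDB itself persists them to files on disk is a separate matter that this sketch does not model.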
More answers to this question are available at:
https://developer.aliyun.com/ask/371630?spm=a2c6h.13066369.question.99.6ad26382yXROjy
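Question 2: State keeps growing after GROUP BY and time-window operations in SQL on Flink 1.10.1/1.11.1
Hi everyone in the community: I currently run version 1.8.2 in production, and it is fairly stable. To unify all related jobs under SQL, I have also been following the latest Flink releases, and so far I have evaluated two versions, 1.10.1 and 1.11.1.
While trying them out, I found that when the program runs a SQL GROUP BY, the amount of data in the checkpoint keeps growing slowly. The state backend is RocksDB in incremental mode:
StateBackend backend = new RocksDBStateBackend("hdfs:///checkpoints-data/", true);
I also set the cleanup policy found in the official documentation:
TableConfig tableConfig = streamTableEnvironment.getConfig();
tableConfig.setIdleStateRetentionTime(Time.minutes(2), Time.minutes(7));
Am I using it incorrectly?
The detailed checkpoint information in the WebUI shows that the large state is concentrated mainly in the group operator.
Affected versions: flink1.10.1, flink1.11.1
planner: blink planner
source: kafka source
Time attribute: env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);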
sql:
insert into result
select request_time
      ,request_id
      ,request_cnt
      ,avg_resptime
      ,stddev_resptime
      ,terminal_cnt
      ,SUBSTRING(DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd HH:mm:ss.SSS'),0,19)
from (
    select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1' MINUTE),'yyyy-MM-dd HH:mm:ss.SSS'),0,21) as request_time
          ,commandId as request_id
          ,count(*) as request_cnt
          ,ROUND(avg(CAST(respTime as double)),2) as avg_resptime
          ,ROUND(stddev_pop(CAST(respTime as double)),2) as stddev_resptime
    from log
    where commandId in (104005 ,204005 ,404005) and errCode=0 and attr=0
    group by TUMBLE(times, INTERVAL '1' MINUTE),commandId
    union all
    select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1' MINUTE),'yyyy-MM-dd HH:mm:ss.SSS'),0,21) as request_time
          ,99999999
          ,count(*) as request_cnt
          ,ROUND(avg(CAST(respTime as double)),2) as avg_resptime
          ,ROUND(stddev_pop(CAST(respTime as double)),2) as stddev_resptime
    from log
    where commandId in (104005 ,204005 ,404005) and errCode=0 and attr=0
    group by TUMBLE(times, INTERVAL '1' MINUTE)
)
source:
create table log (
    eventTime bigint
    ,times timestamp(3)
    ……………………
    ,commandId integer
    ,watermark for times as times - interval '5' second
) with (
    'connector' = 'kafka-0.10',
    'topic' = '……',
    'properties.bootstrap.servers' = '……',
    'properties.group.id' = '……',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
)
sink1:
create table result (
    request_time varchar
    ,request_id integer
    ,request_cnt bigint
    ,avg_resptime double
    ,stddev_resptime double
    ,insert_time varchar
) with (
    'connector' = 'kafka-0.10',
    'topic' = '……',
    'properties.bootstrap.servers' = '……',
    'properties.group.id' = '……',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
)
Reference answer:
I am not very familiar with the SQL side. From past experience, when the state of a window operator keeps growing under event time, it may be worth checking the watermark [1].
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/monitoring/debugging_event_time.html
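To illustrate why the watermark is worth checking, here is a minimal sketch of one way event-time window state can pile up. It is a simplified model, not Flink code: the class WatermarkWindowSketch and its methods are hypothetical, timestamps are assumed to be non-negative milliseconds, and a window is treated as purgeable once the watermark reaches its end. The model assumes the operator watermark is the minimum over all source partitions, so a partition that never receives data holds the watermark back and no window is ever cleaned up.

```java
import java.util.Arrays;
import java.util.TreeMap;

// Hypothetical, simplified model of tumbling event-time windows; NOT Flink code.
class WatermarkWindowSketch {
    // window end timestamp -> event count (stand-in for per-window state)
    private final TreeMap<Long, Long> windowState = new TreeMap<>();
    private final long[] partitionWatermarks;
    private final long windowSizeMs;

    WatermarkWindowSketch(int partitions, long windowSizeMs) {
        this.partitionWatermarks = new long[partitions];
        // No partition has produced a watermark yet.
        Arrays.fill(this.partitionWatermarks, Long.MIN_VALUE);
        this.windowSizeMs = windowSizeMs;
    }

    void onEvent(int partition, long eventTimeMs, long maxOutOfOrdernessMs) {
        long windowEnd = eventTimeMs - (eventTimeMs % windowSizeMs) + windowSizeMs;
        windowState.merge(windowEnd, 1L, Long::sum);
        // Per-partition watermark: highest event time seen minus the allowed delay.
        partitionWatermarks[partition] =
                Math.max(partitionWatermarks[partition], eventTimeMs - maxOutOfOrdernessMs);
        purgeFinishedWindows();
    }

    // Operator watermark: the minimum over all partitions.
    long currentWatermark() {
        long min = Long.MAX_VALUE;
        for (long wm : partitionWatermarks) {
            min = Math.min(min, wm);
        }
        return min;
    }

    // Drop the state of every window whose end the watermark has reached.
    private void purgeFinishedWindows() {
        windowState.headMap(currentWatermark(), true).clear();
    }

    int liveWindows() {
        return windowState.size();
    }

    public static void main(String[] args) {
        WatermarkWindowSketch op = new WatermarkWindowSketch(2, 60_000L);
        // Only partition 0 receives data; partition 1 stays idle.
        for (long t = 0; t <= 180_000L; t += 60_000L) {
            op.onEvent(0, t, 5_000L);
        }
        System.out.println("live windows while partition 1 is idle: " + op.liveWindows());
        // Partition 1 finally produces an event, so the minimum watermark can advance.
        op.onEvent(1, 200_000L, 5_000L);
        System.out.println("live windows after partition 1 advances: " + op.liveWindows());
    }
}
```

If the job's watermark in the WebUI does not advance, or lags far behind the data, a stalled input of this kind is one possible cause to rule out.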
More answers to this question are available at:
https://developer.aliyun.com/ask/371561?spm=a2c6h.13066369.question.100.6ad26382wn7scp
Question 3: [DISCUSS] FLIP-133: Rework PyFlink Documentation
Since the release of Flink 1.11, the number of PyFlink users has continued to grow. As far as I know, many companies have put PyFlink into production for data analysis and for operations and maintenance monitoring (such as 聚美优品 [1] and 浙江墨芷 [2] (Mozhi)). According to the feedback we have received, the current documentation is not very friendly to PyFlink users. There are two shortcomings:
- Python related content is mixed in the Java/Scala documentation, which makes it difficult for users who only focus on PyFlink to read.
- There is already a "Python Table API" section in the Table API document to store PyFlink documents, but the number of articles is small and the content is fragmented. It is difficult for beginners to learn from it.
In addition, FLIP-130 introduced the Python DataStream API, and many documents will be added for those new APIs. To improve the readability and maintainability of the PyFlink documentation, Wei Zhong and I have discussed this offline and would like to rework it via this FLIP.
We will rework the document around the following three objectives:
- Add a separate section for Python API under the "Application Development" section.
- Restructure the current Python documentation into a brand new structure so that the content is complete and friendly to beginners.
- Improve the documents shared by Python/Java/Scala to make them friendlier to Python users without affecting Java/Scala users.
More detail can be found in the FLIP-133: https://cwiki.apache.org/confluence/display/FLINK/FLIP-133%3A+Rework+PyFlink+Documentation
Best, Jincheng
[1] https://mp.weixin.qq.com/s/zVsBIs1ZEFe4atYUYtZpRg
[2] https://mp.weixin.qq.com/s/R4p_a2TWGpESBWr3pLtM2g
Reference answer:
Thanks a lot for bringing up this discussion and the proposal. +1 to improve the Python API doc.
I have received a lot of feedback from PyFlink beginners about the PyFlink doc, e.g. there is too little material, the Python doc is mixed with the Java doc, and it is not easy for users to find the docs they are looking for.
I think it would greatly improve the user experience if we had one place that covers most of what PyFlink users should know.
More answers to this question are available at:
https://developer.aliyun.com/ask/371559?spm=a2c6h.13066369.question.99.6ad26382RBf3Qv
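Question 4: The committedOffsets metric shows a negative value in the WebUI on Flink 1.9.1
On Flink 1.9.1, the metric Source__Custom_Source.KafkaConsumer.topic.geek-event-target.partition.3.committedOffsets shows a negative value in the WebUI. The official documentation describes it as: for each partition, the last offset successfully committed to Kafka. So why is it negative in my case? http://apache-flink.147419.n8.nabble.com/file/t802/%E6%8D%95%E8%8E%B711.png
I would appreciate any guidance. Many thanks!
Reference answer:
You could check whether any offsets have ever been committed successfully on partition 3. The negative value may be the default used when nothing has been committed yet (my guess is that it is a constant value).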
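The guess above can be pictured as a gauge that starts at a fixed negative sentinel and only changes after the first successful commit. The following is a minimal sketch with hypothetical names (CommittedOffsetGaugeSketch and its methods are not Flink classes). The sentinel value used here is an assumption: it is meant to mirror the "offset not set" constant in Flink's Kafka connector, but it should be verified against the connector source for the version in use.

```java
// Hypothetical model of a committed-offset gauge; NOT Flink's implementation.
class CommittedOffsetGaugeSketch {
    // ASSUMPTION: stands in for the connector's "offset not set" sentinel.
    // Check the actual constant in the Kafka connector source for your version.
    static final long OFFSET_NOT_SET = -915623761776L;

    private volatile long committedOffset = OFFSET_NOT_SET;

    // Called after an offset commit to Kafka has succeeded.
    void onCommitSuccess(long offset) {
        committedOffset = offset;
    }

    // Value the metrics system would report for this partition.
    long getValue() {
        return committedOffset;
    }

    boolean hasCommitted() {
        return committedOffset != OFFSET_NOT_SET;
    }

    public static void main(String[] args) {
        CommittedOffsetGaugeSketch gauge = new CommittedOffsetGaugeSketch();
        System.out.println("before any commit: " + gauge.getValue());
        gauge.onCommitSuccess(42L);
        System.out.println("after first commit: " + gauge.getValue());
    }
}
```

Under this model, a value that is the same large negative constant on every refresh suggests that no commit has succeeded for that partition yet, which matches the check recommended in the answer.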
More answers to this question are available at:
https://developer.aliyun.com/ask/371557?spm=a2c6h.13066369.question.100.6ad26382BDwqsH