最近在验证需求,kafka数据流写hdfs文件系统,使用官网文档Flink-1.11版本的示例demo成功提交到yarn之后,发现如期生成分区目录及文件,但是分区提交有些疑惑想请教下大家。
问题描述:
在15点37分时,查看hdfs如期生成[/user/flink/order/dt=2020-03-13/hour=14][/user/flink/order/dt=2020-03-14/hour=21]等相同格式的诸多分区目录,然后具体进入hour=14目录下发现partfile处于inprogress,官网描述说当前系统时间大于分区创建时间+延迟时间,即提交分区;当我在16点37分、38分再去查看时,hour=14目录下的partfile仍处于inprogress状态,查明原因是因为我在16点07分时又向kafka写入了数据,此时发现所有的分区目录下的partfile创建时间都变成了16点07分,因此之前15点37分就已经创建partfile的分区都还要等到17点07分才能进行提交。(理论上是这个意思吧)
那么问题来了,看如下ddl可知我的分区是基于day+hour,那么我的理解是分区提交时间计算是基于hour分区目录的创建时间来进行的,对吗?如果是这样的话,那为何我16点07分写数据时会影响到之前那些15点37分创建的分区提交呢?而导致全部都需要等到17点07分才能进行提交..
另外,查看了一下我16点07分写数据时,除了这时本身应写入的分区目录下的partfile是16点07分之外,其他所有分区目录下的partfile文件创建时间都被修改成了16点07分,而hour目录却没变化。
描述的有点长可能也有点乱,可能是我对流写文件还不够熟悉还没有理解其中真正的意思,所以希望有大佬能帮忙解答,谢谢!
source ddl:
CREATE TABLE kafka_source (
order_id STRING,
order_sales DOUBLE,
update_time TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'flink-kafka',
'properties.bootstrap.servers' = '10.3.15.128:9092',
'properties.group.id' = 'kafka_hdfs',
'format' = 'json',
'scan.startup.mode' = 'group-offsets'
)
sink ddl:
CREATE TABLE hdfs_sink (
order_id STRING,
order_sales DOUBLE,
dt STRING,
hour
STRING
) PARTITIONED BY (dt, hour
) WITH (
'connector' = 'filesystem',
'path' = 'hdfs:///user/flink/order',
'format' = 'json',
'sink.partition-commit.delay' = '1h',
'sink.partition-commit.policy.kind' = 'success-file'
)
transform dml:
INSERT INTO hdfs_sink
SELECT
order_id,
order_sales,
DATE_FORMAT(update_time, 'yyyy-MM-dd'),
DATE_FORMAT(update_time, 'HH')
FROM kafka_source
best,
amenhub*来自志愿者整理的flink邮件归档
开启checkpoint吗?
Part files can be in one of three states:
In-progress : The part file that is currently being written to is in-progress Pending : Closed (due to the specified rolling policy) in-progress files that are waiting to be committed Finished : On successful checkpoints (STREAMING) or at the end of input (BATCH) pending files transition to “Finished”
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/file_sink.html https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/file_sink.html*来自志愿者整理的FLINK邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。