开发者社区> 问答> 正文

Flink 消费kafka ,怎么写ORC文件?

【现状如下】 Flink Job消费kafka消息,每半个小时将消费到的消息进行一系列聚合操作(flink 窗口聚合),然后写入一个orc文件。

据了解,flink写orc的桶分配策略[1],有两种:

一种是基于时间,即按时间为目录创建orc文件。[test/realtime/ : 为根目录]

test/realtime/ └── 2021-03-23--07 ├── part-0-0.orc ├── part-0-1.orc └── 2021-03-23--08 ├── part-0-0.orc ├── part-0-1.orc

一种是将所有部分文件放在一个目录下:

test/realtime/ ├── part-0-0.orc ├── part-0-1.orc ├── part-0-2.orc ├── part-0-3.orc

【问题】

最终需求是想按照partition将每半个小时的orc文件load到hive,hive表dt为分区字段,值为时间戳,如:

hive> show partitions table_demo;

OK dt=1616455800000 dt=1616457600000 dt=1616459400000 dt=1616461200001 dt=1616463000001

Time taken: 0.134 seconds, Fetched: 5 row(s)

因此希望每个orc文件的所在目录名都是dt=时间戳的格式:

http://apache-flink.147419.n8.nabble.com/file/t1162/dir.png

用flink实现这些功能后,发现这两种桶分配策略都不能实现上述需求。

不知如何实现?之前一直是自己写代码实现聚合、写orc的操作,目录文件名一切东西完全可控,现在用flink自带的功能实现,发现不太容易实现上述需求了

来自志愿者整理的flink邮件归档

展开
收起
moonlightdisco 2021-12-01 10:29:22 1294 0
1 条回答
写回答
取消 提交回答
  • 官网有这么一段:我们可以在格式构建器上调用 .withBucketAssigner(assigner) 来自定义 BucketAssigner 链接: https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/connectors/file_sink.html#%E6%A1%B6%E5%88%86%E9%85%8D

    2021-12-01 10:53:30
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载