
Flink 1.11: partitions are not committed automatically when writing to HDFS with SQL

While using Flink 1.11 SQL to consume from Kafka and write to HDFS, I found that partitions are not being committed automatically. What could be the reason? Thanks.

My checkpoint interval is set to 10 s. With the configuration below, a _SUCCESS file should normally appear under the corresponding HDFS partition every 10 seconds or so, but even after a long time none shows up, although the ORC result data is written correctly.

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
bsEnv.enableCheckpointing(10000);
bsEnv.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv);

String sqlSource = "CREATE TABLE source_kafka (\n" +
    "  appName STRING,\n" +
    "  appVersion STRING,\n" +
    "  uploadTime STRING\n" +
    ") WITH (\n" +
    "  'connector.type' = 'kafka',\n" +
    "  'connector.version' = '0.10',\n" +
    "  'connector.topic' = 'mytest',\n" +
    "  'connector.properties.zookeeper.connect' = 'localhost:2181',\n" +
    "  'connector.properties.bootstrap.servers' = 'localhost:9092',\n" +
    "  'connector.properties.group.id' = 'testGroup',\n" +
    "  'format.type' = 'json',\n" +
    "  'update-mode' = 'append'\n" +
    ")";

tEnv.executeSql(sqlSource);

String sql = "CREATE TABLE fs_table (\n" +
    "  appName STRING,\n" +
    "  appVersion STRING,\n" +
    "  uploadTime STRING,\n" +
    "  dt STRING,\n" +
    "  h STRING\n" +
    ") PARTITIONED BY (dt, h) WITH (\n" +
    "  'connector' = 'filesystem',\n" +
    "  'path' = 'hdfs://localhost/tmp/',\n" +
    "  'sink.partition-commit.policy.kind' = 'success-file',\n" +
    "  'format' = 'orc'\n" +
    ")";
tEnv.executeSql(sql);

String insertSql = "INSERT INTO fs_table\n" +
    "SELECT appName, appVersion, uploadTime,\n" +
    "  DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd'),\n" +
    "  DATE_FORMAT(LOCALTIMESTAMP, 'HH')\n" +
    "FROM source_kafka";

tEnv.executeSql(insertSql);

}

*Compiled by volunteers from the Flink mailing-list archive
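For context on the commit settings: the fs_table DDL above only sets 'sink.partition-commit.policy.kind'. In Flink 1.11 the filesystem connector also exposes 'sink.partition-commit.trigger' and 'sink.partition-commit.delay'. A minimal sketch of the same table with those options written out explicitly (illustrative only, not part of the original thread, and assuming the connector's process-time defaults):

CREATE TABLE fs_table (
  appName STRING,
  appVersion STRING,
  uploadTime STRING,
  dt STRING,
  h STRING
) PARTITIONED BY (dt, h) WITH (
  'connector' = 'filesystem',
  'path' = 'hdfs://localhost/tmp/',
  'format' = 'orc',
  -- with the process-time trigger, a partition is committed once a checkpoint
  -- completes after the configured delay; '0 s' means commit at the next checkpoint
  'sink.partition-commit.trigger' = 'process-time',
  'sink.partition-commit.delay' = '0 s',
  'sink.partition-commit.policy.kind' = 'success-file'
);

If the trigger were switched to 'partition-time', the table would also need a 'partition.time-extractor.timestamp-pattern' (for dt/h something like '$dt $h:00:00') and the commit would wait for the watermark; with the process-time defaults shown here, the _SUCCESS file is expected shortly after each successful checkpoint.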

小阿矿 2021-12-06 16:53:02
1 answer
  • By default, for ORC, in theory a corresponding SUCCESS file should be produced as soon as a finalized data file is generated. How did you confirm that there is no SUCCESS file?

    With the same SQL, the SUCCESS file does show up in my environment.

    *Compiled by volunteers from the Flink mailing-list archive

    2021-12-06 17:07:20
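A quick way to verify, offered as a suggestion rather than something from the original thread: with the success-file policy, the _SUCCESS marker is written into a partition directory only after the checkpoint that finalizes the in-progress ORC files completes, so with a 10 s checkpoint interval it should appear within a checkpoint cycle or two. Listing a concrete partition directory with the HDFS CLI, e.g. hdfs dfs -ls hdfs://localhost/tmp/dt=<date>/h=<hour>/, should show both the committed data files and the _SUCCESS file once the commit has happened.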