开发者社区> 问答> 正文

为什么hive streaming 生成orc文件需要导入flink-orc_2.11jar包

1. 写orc和写parquet的作业在同一个作业中,并没有报错,但是在hive中查不到数据;hdfs目录里面有文件,但是并没有按照checkpoint间隔生成,也没有生成_SUCCESS文件。 2. 没有设置table.exec.hive.fallback-mapred-writer。 以下是我的几个疑问: 1. 为什么hive streaming生成orc文件需要导入flink-orc_2.11 jar包,而parquet不需要? 2. 在sql client中,我想要设置checkpoint生成间隔,应该在哪里设置? 以下是hdfs目录图片

添加不了附件,我就直接贴代码了

import java.time.Duration

import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic} import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect, TableResult} import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment import org.apache.flink.table.catalog.hive.HiveCatalog

/**
 * Demo job that streams rows from a `datagen` source table into two partitioned
 * Hive tables, one stored as ORC and one stored as PARQUET.
 *
 * author dinghh
 * time 2020-08-11 17:03
 */
object WriteHiveStreaming {

  /**
   * Entry point: configures the environments, registers the Hive catalog,
   * (re)creates the source and sink tables, submits both streaming INSERT jobs
   * and then blocks until they terminate.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {

    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    streamEnv.setParallelism(3)

    val tableEnvSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings)

    // Exactly-once checkpoints every 20 seconds.
    val configuration = tableEnv.getConfig.getConfiguration
    configuration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
    configuration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(20))

    val catalogName = "my_catalog"
    val catalog = new HiveCatalog(
      catalogName, // catalog name
      "default", // default database
      // Hive config (hive-site.xml) directory. Backslashes must be escaped:
      // a bare "\i" or "\d" is an invalid escape sequence in a Scala string literal.
      "D:\\ideaspace\\data-integrate-bigdata\\flink-restart\\flink-sql\\src\\main\\resources",
      "1.1.0" // Hive version
    )
    tableEnv.registerCatalog(catalogName, catalog)
    tableEnv.useCatalog(catalogName)

    // Drop the streaming source table
    tableEnv.executeSql(
      """
        |DROP TABLE IF EXISTS stream_db.datagen_user
      """.stripMargin)

    // Create the streaming source table
    tableEnv.executeSql(
      """
        |CREATE TABLE stream_db.datagen_user (
        | id INT,
        | name STRING,
        | dt AS localtimestamp,
        | WATERMARK FOR dt AS dt
        |) WITH (
        | 'connector' = 'datagen',
        | 'rows-per-second'='10',
        | 'fields.id.kind'='random',
        | 'fields.id.min'='1',
        | 'fields.id.max'='1000',
        | 'fields.name.length'='5'
        |)
      """.stripMargin)

    // Switch to the Hive dialect
    tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)

    // Drop the Hive ORC table
    tableEnv.executeSql(
      """
        |DROP TABLE IF EXISTS default.hive_user_orc
        |
      """.stripMargin)

    // Create the Hive ORC table
    tableEnv.executeSql(
      """
        |CREATE TABLE default.hive_user_orc (
        | id INT,
        | name STRING
        |) PARTITIONED BY (ts_dt STRING, ts_hour STRING,ts_minute STRING ) STORED AS ORC TBLPROPERTIES (
        | 'partition.time-extractor.timestamp-pattern'='$ts_dt $ts_hour:$ts_minute:00.000',
        | 'sink.partition-commit.trigger'='partition-time',
        | 'sink.partition-commit.delay'='1 min',
        | 'sink.partition-commit.policy.kind'='metastore,success-file'
        |)
      """.stripMargin)

    // Drop the Hive Parquet table
    tableEnv.executeSql(
      """
        |DROP TABLE IF EXISTS default.hive_user_parquet
      """.stripMargin)

    // Create the Hive Parquet table
    tableEnv.executeSql(
      """
        |CREATE TABLE default.hive_user_parquet (
        | id INT,
        | name STRING
        |) PARTITIONED BY (ts_dt STRING, ts_hour STRING,ts_minute STRING) STORED AS PARQUET TBLPROPERTIES (
        | 'partition.time-extractor.timestamp-pattern'='$ts_dt $ts_hour:$ts_minute:00.000',
        | 'sink.partition-commit.trigger'='partition-time',
        | 'sink.partition-commit.delay'='1 min',
        | 'sink.partition-commit.policy.kind'='metastore,success-file'
        |)
      """.stripMargin)

    // Switch back to the default Flink dialect
    tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)

    // Streaming write into the ORC table
    val orcInsert: TableResult = tableEnv.executeSql(
      """
        |INSERT INTO default.hive_user_orc
        |SELECT
        | id,name,
        | DATE_FORMAT(dt,'yyyy-MM-dd'),
        | DATE_FORMAT(dt,'HH'),
        | DATE_FORMAT(dt,'mm')
        |FROM
        | stream_db.datagen_user
      """.stripMargin)

    // Streaming write into the Parquet table
    val parquetInsert: TableResult = tableEnv.executeSql(
      """
        |INSERT INTO default.hive_user_parquet
        |SELECT
        | id,name,
        | DATE_FORMAT(dt,'yyyy-MM-dd'),
        | DATE_FORMAT(dt,'HH'),
        | DATE_FORMAT(dt,'mm')
        |FROM
        | stream_db.datagen_user
      """.stripMargin)

    // executeSql only submits the INSERT jobs and returns immediately. Both jobs
    // are submitted above before blocking here, so main() does not exit (and,
    // when run from an IDE, stop the jobs) right after submission.
    awaitJob(orcInsert)
    awaitJob(parquetInsert)
  }

  /**
   * Blocks the calling thread until the job behind `result` finishes.
   * Does nothing when the result carries no job client.
   *
   * @param result the result returned by `executeSql` for an INSERT statement
   */
  private def awaitJob(result: TableResult): Unit = {
    val jobClient = result.getJobClient
    if (jobClient.isPresent) {
      jobClient.get()
        .getJobExecutionResult(Thread.currentThread.getContextClassLoader)
        .get()
    }
  }

}

*来自志愿者整理的flink邮件归档

展开
收起
游客sadna6pkvqnz6 2021-12-07 17:16:12 968 0
1 条回答
写回答
取消 提交回答
  • 如果是IDE里执行的话,tableEnv.executeSql是马上返回的,然后就退出了,可以用类似这种写法等作业结束:

    val tableResult = tEnv.executeSql(insert)
    // wait to finish
    tableResult.getJobClient.get
      .getJobExecutionResult(Thread.currentThread.getContextClassLoader)
      .get

    *来自志愿者整理的flink邮件归档

    2021-12-07 20:36:53
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Hive Bucketing in Apache Spark 立即下载
spark替代HIVE实现ETL作业 立即下载
2019大数据技术公开课第五季—Hive迁移到MaxCompute最佳实践 立即下载