1. The ORC write and the Parquet write run in the same job. No errors are reported, but no data can be queried from Hive. Files do appear under the HDFS directory, yet they are not rolled out according to the checkpoint interval, and no _SUCCESS file is generated.
2. table.exec.hive.fallback-mapred-writer is not set.

My questions:
1. Why does Hive streaming require adding the flink-orc_2.11 jar to produce ORC files, while Parquet does not?
2. For the SQL Client, where should I configure the checkpoint interval?

Below is a picture of the HDFS directory.
I can't attach files here, so I'm pasting the code directly:
import java.time.Duration
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect, TableResult}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.catalog.hive.HiveCatalog

/**
 * author dinghh
 * time 2020-08-11 17:03
 */
object WriteHiveStreaming {

  def main(args: Array[String]): Unit = {

    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    streamEnv.setParallelism(3)

    val tableEnvSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings)
    tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
    tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(20))

    val catalogName = "my_catalog"
    val catalog = new HiveCatalog(
      catalogName,   // catalog name
      "default",     // default database
      "D:\\ideaspace\\data-integrate-bigdata\\flink-restart\\flink-sql\\src\\main\\resources", // Hive config (hive-site.xml) directory
      "1.1.0"        // Hive version
    )
    tableEnv.registerCatalog(catalogName, catalog)
    tableEnv.useCatalog(catalogName)
    // Drop the streaming source table if it already exists
    tableEnv.executeSql(
      """
        |DROP TABLE IF EXISTS stream_db.datagen_user
        |""".stripMargin)

    // Create the streaming source table (datagen connector)
    tableEnv.executeSql(
      """
        |CREATE TABLE stream_db.datagen_user (
        |  id INT,
        |  name STRING,
        |  dt AS localtimestamp,
        |  WATERMARK FOR dt AS dt
        |) WITH (
        |  'connector' = 'datagen',
        |  'rows-per-second' = '10',
        |  'fields.id.kind' = 'random',
        |  'fields.id.min' = '1',
        |  'fields.id.max' = '1000',
        |  'fields.name.length' = '5'
        |)
        |""".stripMargin)
    // Switch to the Hive dialect for the Hive DDL
    tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)

    // Drop the Hive ORC table if it already exists
    tableEnv.executeSql(
      """
        |DROP TABLE IF EXISTS `default`.hive_user_orc
        |""".stripMargin)

    // Create the Hive ORC table
    tableEnv.executeSql(
      """
        |CREATE TABLE `default`.hive_user_orc (
        |  id INT,
        |  name STRING
        |) PARTITIONED BY (ts_dt STRING, ts_hour STRING, ts_minute STRING)
        |STORED AS ORC
        |TBLPROPERTIES (
        |  'partition.time-extractor.timestamp-pattern' = '$ts_dt $ts_hour:$ts_minute:00.000',
        |  'sink.partition-commit.trigger' = 'partition-time',
        |  'sink.partition-commit.delay' = '1 min',
        |  'sink.partition-commit.policy.kind' = 'metastore,success-file'
        |)
        |""".stripMargin)
    // Drop the Hive Parquet table if it already exists
    tableEnv.executeSql(
      """
        |DROP TABLE IF EXISTS `default`.hive_user_parquet
        |""".stripMargin)

    // Create the Hive Parquet table
    tableEnv.executeSql(
      """
        |CREATE TABLE `default`.hive_user_parquet (
        |  id INT,
        |  name STRING
        |) PARTITIONED BY (ts_dt STRING, ts_hour STRING, ts_minute STRING)
        |STORED AS PARQUET
        |TBLPROPERTIES (
        |  'partition.time-extractor.timestamp-pattern' = '$ts_dt $ts_hour:$ts_minute:00.000',
        |  'sink.partition-commit.trigger' = 'partition-time',
        |  'sink.partition-commit.delay' = '1 min',
        |  'sink.partition-commit.policy.kind' = 'metastore,success-file'
        |)
        |""".stripMargin)

    // Switch back to the default (Flink) dialect for the INSERT statements
    tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)

    // Streaming insert into the ORC table
    tableEnv.executeSql(
      """
        |INSERT INTO `default`.hive_user_orc
        |SELECT
        |  id, name,
        |  DATE_FORMAT(dt, 'yyyy-MM-dd'),
        |  DATE_FORMAT(dt, 'HH'),
        |  DATE_FORMAT(dt, 'mm')
        |FROM stream_db.datagen_user
        |""".stripMargin)

    // Streaming insert into the Parquet table
    tableEnv.executeSql(
      """
        |INSERT INTO `default`.hive_user_parquet
        |SELECT
        |  id, name,
        |  DATE_FORMAT(dt, 'yyyy-MM-dd'),
        |  DATE_FORMAT(dt, 'HH'),
        |  DATE_FORMAT(dt, 'mm')
        |FROM stream_db.datagen_user
        |""".stripMargin)
  }
}
*From the volunteer-compiled Flink mailing list archive
If you run this from an IDE, tableEnv.executeSql returns immediately and the program then exits; you can wait for the job to finish with something like this:
val tableResult = tEnv.executeSql(insert)
// wait to finish
tableResult.getJobClient.get
  .getJobExecutionResult(Thread.currentThread.getContextClassLoader)
  .get

*From the volunteer-compiled Flink mailing list archive
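To apply that suggestion to the program above, which issues two INSERT statements, one option is to bundle both statements into a single StatementSet so they are submitted as one job and then block on that job. This is only a sketch assuming the Flink 1.11 API (createStatementSet / addInsertSql / execute); it reuses the tableEnv variable and the table names from the code in the question:

    // Sketch: submit both streaming inserts as one job and wait for it to finish.
    val stmtSet = tableEnv.createStatementSet()
    stmtSet.addInsertSql(
      """
        |INSERT INTO `default`.hive_user_orc
        |SELECT id, name, DATE_FORMAT(dt, 'yyyy-MM-dd'), DATE_FORMAT(dt, 'HH'), DATE_FORMAT(dt, 'mm')
        |FROM stream_db.datagen_user
        |""".stripMargin)
    stmtSet.addInsertSql(
      """
        |INSERT INTO `default`.hive_user_parquet
        |SELECT id, name, DATE_FORMAT(dt, 'yyyy-MM-dd'), DATE_FORMAT(dt, 'HH'), DATE_FORMAT(dt, 'mm')
        |FROM stream_db.datagen_user
        |""".stripMargin)

    val result = stmtSet.execute()
    // Block until the streaming job terminates (for this unbounded source, until it is cancelled).
    result.getJobClient.get
      .getJobExecutionResult(Thread.currentThread.getContextClassLoader)
      .get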