(1) Architecture diagram of the Flink SQL and Hive integration
(2) The integration of Flink with Hive covers two aspects
First, Flink uses the Hive Metastore as a persistent Catalog.
Through HiveCatalog, users can store Flink metadata from different sessions in the Hive Metastore. For example, a user can store the definition of a Kafka table or an Elasticsearch table in the Hive Metastore via HiveCatalog and reuse it in later SQL queries.
Second, Flink can be used to read and write Hive tables.
HiveCatalog is designed to be well compatible with Hive, so users can access their existing Hive data warehouse "out of the box". You do not need to modify the existing Hive Metastore, nor change the data location or partitioning of your tables.
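As a minimal sketch of the first aspect (the catalog name "myhive", the database "default" and the hive-conf path below are placeholders, not values from this article), registering a HiveCatalog makes every table definition created in the session persistent, so a later session that registers the same catalog sees the same tables:

package com.aikfk.flink.sql;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveCatalogReuseExample {

    public static void main(String[] args) {
        // Streaming TableEnvironment with the Blink planner (Flink 1.12)
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // Register the Hive Metastore as a persistent catalog (placeholder names and paths)
        HiveCatalog hiveCatalog = new HiveCatalog("myhive", "default", "/path/to/hive-conf", "2.3.4");
        tableEnv.registerCatalog("myhive", hiveCatalog);
        tableEnv.useCatalog("myhive");

        // Tables created from now on (e.g. a Kafka table) are stored in the Metastore,
        // so another session that registers the same catalog can list and query them.
        for (String table : tableEnv.listTables()) {
            System.out.println(table);
        }
    }
}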
(3) Configuring the Flink SQL and Hive integration
Step 1: Configure HADOOP_CLASSPATH by adding the following environment variable to /etc/profile:
export HADOOP_CLASSPATH=`hadoop classpath`
Step 2: Copy the following Hive-related JARs into Flink's lib directory:
flink-connector-hive_2.11-1.12.1.jar
hive-exec-2.3.4.jar
flink-sql-connector-hive-2.3.6_2.11-1.12.1.jar
flink-connector-hive_2.11-1.12.1.jar can be downloaded from the Maven repository:
https://mvnrepository.com/artifact/org.apache.flink/flink-connector-hive_2.11/1.12.1
flink-sql-connector-hive-2.3.6_2.11-1.12.1.jar can be downloaded from the Maven repository:
https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-hive-2.3.6_2.11/1.12.1
Step 3: Add the Maven dependencies:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hive_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>${hive.version}</version>
</dependency>
(4) Test code for the Flink SQL and Hive integration
package com.aikfk.flink.sql;

public class CommonSQL {

    public static final String hiveCatalog_name = "flink_udata";
    public static final String hiveDatabase_name = "flink";
    public static final String hiveConfDir = "/Users/caizhengjie/Desktop/hive-conf";
    public static final String version = "2.3.4";

    // Hive sink table, partitioned by day (dt) and hour (hr); partitions are committed
    // to the Metastore (plus a success file) based on partition time.
    public static final String user_product_hive_create =
            "CREATE TABLE user_product_hive (\n" +
            " user_id STRING,\n" +
            " product_id STRING,\n" +
            " click_count INT" +
            ") partitioned by (dt string,hr string) " +
            "stored as PARQUET " +
            "TBLPROPERTIES (\n" +
            " 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',\n" +
            " 'sink.partition-commit.delay'='5 s',\n" +
            " 'sink.partition-commit.trigger'='partition-time',\n" +
            " 'sink.partition-commit.policy.kind'='metastore,success-file'" +
            ")";

    // Kafka source table; r_t is an event-time attribute derived from ts, with a 5-second watermark.
    public static final String user_product_kafka_create =
            "CREATE TABLE user_product_kafka (\n" +
            " user_id STRING," +
            " product_id STRING," +
            " click_count INT ," +
            " ts bigint ," +
            " r_t AS TO_TIMESTAMP(FROM_UNIXTIME(ts,'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd HH:mm:ss'),\n" +
            " WATERMARK FOR r_t AS r_t - INTERVAL '5' SECOND " +
            ") WITH (" +
            " 'connector' = 'kafka'," +
            " 'topic' = 'kfk'," +
            " 'properties.bootstrap.servers' = 'bigdata-pro-m07:9092'," +
            " 'properties.group.id' = 'test1'," +
            " 'format' = 'json'," +
            " 'scan.startup.mode' = 'latest-offset'" +
            ")";

    public static final String user_product_kafka_drop = "DROP TABLE IF EXISTS user_product_kafka";

    public static final String user_product_hive_drop = "DROP TABLE IF EXISTS user_product_hive";

    // Continuous INSERT from the Kafka source into the partitioned Hive table.
    public static final String user_product_kafka_insert_hive =
            "insert into user_product_hive SELECT user_id, product_id, click_count, " +
            " DATE_FORMAT(r_t, 'yyyy-MM-dd'), DATE_FORMAT(r_t, 'HH') FROM user_product_kafka";
}
package com.aikfk.flink.sql.hive;

import com.aikfk.flink.sql.CommonSQL;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class FlinkKafkaHive {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);
        env.enableCheckpointing(5000);

        // Register the Hive Metastore as the catalog and make it the current one
        HiveCatalog hiveCatalog = new HiveCatalog(
                CommonSQL.hiveCatalog_name,
                CommonSQL.hiveDatabase_name,
                CommonSQL.hiveConfDir,
                CommonSQL.version
        );
        tableEnvironment.registerCatalog(CommonSQL.hiveCatalog_name, hiveCatalog);
        tableEnvironment.useCatalog(CommonSQL.hiveCatalog_name);

        // Create the Kafka source table with the default Flink SQL dialect
        tableEnvironment.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tableEnvironment.executeSql(CommonSQL.user_product_kafka_drop);
        tableEnvironment.executeSql(CommonSQL.user_product_kafka_create);

        // Create the Hive sink table with the Hive dialect
        tableEnvironment.getConfig().setSqlDialect(SqlDialect.HIVE);
        tableEnvironment.executeSql(CommonSQL.user_product_hive_drop);
        tableEnvironment.executeSql(CommonSQL.user_product_hive_create);

        // executeSql() submits the INSERT job immediately; print() only shows the submission result.
        // A separate env.execute() is not needed here, since no DataStream operators are defined.
        tableEnvironment.executeSql(CommonSQL.user_product_kafka_insert_hive).print();
    }
}
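Two details of this job are easy to miss. The Kafka source table is created under the default Flink dialect, while the Hive table must be created under the Hive dialect, which is why setSqlDialect() is called twice. And because 'sink.partition-commit.trigger' is set to 'partition-time', a partition only becomes visible in the Hive Metastore after the watermark passes the time extracted by 'partition.time-extractor.timestamp-pattern' plus the 5-second 'sink.partition-commit.delay'; checkpointing also has to be enabled for the files to be committed, which is what env.enableCheckpointing(5000) provides.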
(5) Testing the Kafka source and writing data into Hive
The producer generates the data:
package com.aikfk.flink.base;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.Random;

public class KafkaProducerUtil extends Thread {

    private String topic = "kfk";

    public KafkaProducerUtil() {
        super();
    }

    private Producer<String, String> createProducer() {
        // Configure the producer through a Properties object
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "bigdata-pro-m07:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<String, String>(properties);
    }

    @Override
    public void run() {
        Producer<String, String> producer = createProducer();
        Random random = new Random();
        Random random2 = new Random();

        while (true) {
            String user_id = "user_" + random.nextInt(10);
            String product_id = "product_" + random2.nextInt(100);
            System.out.println(user_id + " :" + product_id);

            // Event timestamp in epoch seconds, shifted 5 seconds ahead
            String time = System.currentTimeMillis() / 1000 + 5 + "";
            try {
                // Build a JSON record matching the schema of the user_product_kafka table
                String kaifa_log = "{"
                        + "\"user_id\":\"" + user_id + "\","
                        + "\"product_id\":\"" + product_id + "\","
                        + "\"click_count\":\"1" + "\","
                        + "\"ts\":" + time
                        + "}";
                System.out.println("kaifa_log = " + kaifa_log);
                producer.send(new ProducerRecord<String, String>(this.topic, kaifa_log));
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("=========loop once==========");
            try {
                sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        // run() executes the produce loop directly in the main thread
        new KafkaProducerUtil().run();
    }
}
Producer output:
user_7 :product_74
kaifa_log = {"user_id":"user_7","product_id":"product_74","click_count":"1","ts":1618228651}
=========loop once==========
user_5 :product_62
kaifa_log = {"user_id":"user_5","product_id":"product_62","click_count":"1","ts":1618228653}
=========loop once==========
user_9 :product_50
kaifa_log = {"user_id":"user_9","product_id":"product_50","click_count":"1","ts":1618228654}
=========loop once==========
Viewing the data through the Flink SQL client:
Flink SQL> select * from user_product_kafka;
Checking the data written to Hive: this problem has not been solved yet! It is probably a JAR package issue; one likely suspect is having both the bundled flink-sql-connector-hive JAR and the separate flink-connector-hive/hive-exec JARs on the classpath at the same time, which can cause dependency conflicts.
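For reference, once the JAR issue is resolved, one way to check what has been written to Hive is a batch query over the sink table from the same catalog. This is only a sketch that reuses the constants from CommonSQL and assumes committed partitions exist; it has not been verified against the problem above:

package com.aikfk.flink.sql.hive;

import com.aikfk.flink.sql.CommonSQL;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class CheckHiveSink {

    public static void main(String[] args) {
        // Batch TableEnvironment with the Blink planner (Flink 1.12)
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // Reuse the same HiveCatalog configuration as the streaming job
        HiveCatalog hiveCatalog = new HiveCatalog(
                CommonSQL.hiveCatalog_name,
                CommonSQL.hiveDatabase_name,
                CommonSQL.hiveConfDir,
                CommonSQL.version);
        tableEnv.registerCatalog(CommonSQL.hiveCatalog_name, hiveCatalog);
        tableEnv.useCatalog(CommonSQL.hiveCatalog_name);

        // Print the rows from the committed partitions of the Hive sink table
        tableEnv.executeSql("SELECT * FROM user_product_hive").print();
    }
}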