Flink SQL与Hive的集成

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: 笔记

(1)Flink SQL与Hive集成的架构图


1.png


(2)Flink 与 Hive 的集成包含两个层面


一是利用了 Hive 的 MetaStore 作为持久化的 Catalog

用户可通过HiveCatalog将不同会话中的 Flink 元数据存储到 Hive Metastore 中。 例如,用户可以使用HiveCatalog将其 Kafka 表或 Elasticsearch 表存储在 Hive Metastore 中,并后续在 SQL 查询中重新使用它们。


二是利用 Flink 来读写 Hive 的表。

HiveCatalog的设计提供了与 Hive 良好的兼容性,用户可以”开箱即用”的访问其已有的 Hive 数仓。 您不需要修改现有的 Hive Metastore,也不需要更改表的数据位置或分区。


(3)Flink SQL与Hive的集成配置


第一步:配置HADOOP_CLASSPATH,需要在/etc/profile文件中配置如下的环境变量

export HADOOP_CLASSPATH=`hadoop classpath`

第二步:将hive的jar包复制到flink的lib目录下


flink-connector-hive_2.11-1.12.1.jar

hive-exec-2.3.4.jar

flink-sql-connector-hive-2.3.6_2.11-1.12.1.jar

flink-connector-hive_2.11-1.12.1.jar这个包在maven仓库下载:


https://mvnrepository.com/artifact/org.apache.flink/flink-connector-hive_2.11/1.12.1

flink-sql-connector-hive-2.3.6_2.11-1.12.1.jar这个包在maven仓库下载:


https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-hive-2.3.6_2.11/1.12.1

第三步:添加Maven 依赖

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-hive_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-exec</artifactId>
  <version>${hive.version}</version>
</dependency>


(4)测试Flink SQL与Hive集成代码


package com.aikfk.flink.sql;
public class CommonSQL {
    public static final String hiveCatalog_name = "flink_udata";
    public static final String hiveDatabase_name = "flink";
    public static final String hiveConfDir = "/Users/caizhengjie/Desktop/hive-conf";
    public static final String version = "2.3.4";
    public static final String user_product_hive_create = "CREATE  TABLE  user_product_hive (\n" +
            "  user_id STRING,\n" +
            "  product_id STRING,\n" +
            "  click_count INT" +
            ") partitioned by (dt string,hr string) " +
            "stored as PARQUET " +
            "TBLPROPERTIES (\n" +
            "  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',\n" +
            "  'sink.partition-commit.delay'='5 s',\n" +
            "  'sink.partition-commit.trigger'='partition-time',\n" +
            "  'sink.partition-commit.policy.kind'='metastore,success-file'" +
            ")";
    public static final String user_product_kafka_create =
            "CREATE TABLE user_product_kafka (\n" +
            " user_id STRING," +
            " product_id STRING," +
            " click_count INT ," +
            " ts bigint ," +
            " r_t AS TO_TIMESTAMP(FROM_UNIXTIME(ts,'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd HH:mm:ss'),\n" +
            " WATERMARK FOR r_t AS r_t - INTERVAL '5' SECOND " +
            ") WITH (" +
            " 'connector' = 'kafka'," +
            " 'topic' = 'kfk'," +
            " 'properties.bootstrap.servers' = 'bigdata-pro-m07:9092'," +
            " 'properties.group.id' = 'test1'," +
            " 'format' = 'json'," +
            " 'scan.startup.mode' = 'latest-offset'" +
            ")";
    public static final String user_product_kafka_drop ="DROP TABLE IF EXISTS user_product_kafka";
    public static final String user_product_hive_drop ="DROP TABLE IF EXISTS user_product_hive";
    public static final String user_product_kafka_insert_hive =
            "insert into user_product_hive SELECT user_id, product_id, click_count, " +
            " DATE_FORMAT(r_t, 'yyyy-MM-dd'), DATE_FORMAT(r_t, 'HH') FROM user_product_kafka";
}
package com.aikfk.flink.sql.hive;
import com.aikfk.flink.sql.CommonSQL;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
public class FlinkKafkaHive {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env , settings);
        env.enableCheckpointing(5000);
        HiveCatalog hiveCatalog =
                new HiveCatalog(
                        CommonSQL.hiveCatalog_name,
                        CommonSQL.hiveDatabase_name,
                        CommonSQL.hiveConfDir,
                        CommonSQL.version
                        );
        tableEnvironment.registerCatalog(CommonSQL.hiveCatalog_name,hiveCatalog);
        tableEnvironment.useCatalog(CommonSQL.hiveCatalog_name);
        tableEnvironment.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tableEnvironment.executeSql(CommonSQL.user_product_kafka_drop);
        tableEnvironment.executeSql(CommonSQL.user_product_kafka_create);
        tableEnvironment.getConfig().setSqlDialect(SqlDialect.HIVE);
        tableEnvironment.executeSql(CommonSQL.user_product_hive_drop);
        tableEnvironment.executeSql(CommonSQL.user_product_hive_create);
        tableEnvironment.executeSql(CommonSQL.user_product_kafka_insert_hive).print();
        env.execute();
    }
}


(5)测试kafka数据源与hive写入数据


生产者生产数据:

package com.aikfk.flink.base;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.Random;
public class KafkaProducerUtil extends Thread {
        private String topic = "kfk";
        public KafkaProducerUtil() {
            super();
        }
        private Producer<String, String> createProducer() {
            // 通过Properties类设置Producer的属性
            Properties properties = new Properties();
            properties.put("bootstrap.servers", "bigdata-pro-m07:9092");
            properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            return new KafkaProducer<String, String>(properties);
        }
        @Override
        public void run() {
            Producer<String, String> producer = createProducer();
            Random random = new Random();
            Random random2 = new Random();
            while (true) {
                String user_id = "user_"+random.nextInt(10);
                String product_id = "product_"+random2.nextInt(100);
                System.out.println(user_id + " :" + product_id);
                String time = System.currentTimeMillis() / 1000 + 5 + "";
                try {
//
                    String kaifa_log = "{" +
                            "\"user_id\":\"" + user_id+"\"," +
                            "\"product_id\":\"" + product_id+"\"," +
                            "\"click_count\":\"1" + "\"," +
                            "\"ts\":" + time + "}";
                    System.out.println("kaifa_log = " + kaifa_log);
                    producer.send(new ProducerRecord<String, String>(this.topic, kaifa_log));
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("=========循环一次==========");
                try {
                    sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        public static void main(String[] args) {
            new KafkaProducerUtil().run();
        }
}

生产结果:

user_7 :product_74
kaifa_log = {"user_id":"user_7","product_id":"product_74","click_count":"1","ts":1618228651}
=========循环一次==========
user_5 :product_62
kaifa_log = {"user_id":"user_5","product_id":"product_62","click_count":"1","ts":1618228653}
=========循环一次==========
user_9 :product_50
kaifa_log = {"user_id":"user_9","product_id":"product_50","click_count":"1","ts":1618228654}
=========循环一次==========

通过flink SQL client查看数据

Flink SQL> select * from user_product_kafka;


3.png

查看hive写入数据情况:问题暂未解决!可能是jar包问题

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
371 15
|
6月前
|
SQL 分布式计算 资源调度
Dataphin功能Tips系列(48)-如何根据Hive SQL/Spark SQL的任务优先级指定YARN资源队列
如何根据Hive SQL/Spark SQL的任务优先级指定YARN资源队列
183 4
|
8月前
|
SQL 大数据 数据处理
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是为应对传统数据处理框架中流批分离的问题而诞生的,它融合了SQL的简洁性和Flink的强大流批处理能力,降低了大数据处理门槛。其核心工作原理包括生成逻辑执行计划、查询优化和构建算子树,确保高效执行。Flink SQL 支持过滤、投影、聚合、连接和窗口等常用算子,实现了流批一体处理,极大提高了开发效率和代码复用性。通过统一的API和语法,Flink SQL 能够灵活应对实时和离线数据分析场景,为企业提供强大的数据处理能力。
1580 27
|
9月前
|
SQL 存储 缓存
Flink SQL Deduplication 去重以及如何获取最新状态操作
Flink SQL Deduplication 是一种高效的数据去重功能,支持多种数据类型和灵活的配置选项。它通过哈希表、时间窗口和状态管理等技术实现去重,适用于流处理和批处理场景。本文介绍了其特性、原理、实际案例及源码分析,帮助读者更好地理解和应用这一功能。
680 14
|
11月前
|
SQL 分布式计算 Hadoop
Hadoop-12-Hive 基本介绍 下载安装配置 MariaDB安装 3台云服务Hadoop集群 架构图 对比SQL HQL
Hadoop-12-Hive 基本介绍 下载安装配置 MariaDB安装 3台云服务Hadoop集群 架构图 对比SQL HQL
260 3
|
11月前
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
212 0
|
11月前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
280 0
|
12月前
|
关系型数据库 MySQL 网络安全
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
|
SQL 存储 监控
SQL Server的并行实施如何优化?
【7月更文挑战第23天】SQL Server的并行实施如何优化?
412 13
解锁 SQL Server 2022的时间序列数据功能
【7月更文挑战第14天】要解锁SQL Server 2022的时间序列数据功能,可使用`generate_series`函数生成整数序列,例如:`SELECT value FROM generate_series(1, 10)。此外,`date_bucket`函数能按指定间隔(如周)对日期时间值分组,这些工具结合窗口函数和其他时间日期函数,能高效处理和分析时间序列数据。更多信息请参考官方文档和技术资料。
267 9

热门文章

最新文章