Flink SQL 结合 HiveCatalog 使用

简介: Flink 支持 HiveCatalog 作为表元数据持久化的介质,在生产环境我们一般采用 HiveCatalog 来管理元数据, 这样的好处是不需要重复使用 DDL 创建表,只需要关心业务逻辑的 SQL,简化了开发的流程,可以节省很多时间,下面就来介绍一下怎么配置和使用 HiveCatalog.sql-client-defaults.yaml 配置

Flink 支持 HiveCatalog 作为表元数据持久化的介质,在生产环境我们一般采用 HiveCatalog 来管理元数据, 这样的好处是不需要重复使用 DDL 创建表,只需要关心业务逻辑的 SQL,简化了开发的流程,可以节省很多时间,下面就来介绍一下怎么配置和使用 HiveCatalog.


sql-client-defaults.yaml 配置


catalogs:
   - name: myhive
     type: hive
     hive-conf-dir: /home/jason/bigdata/hive/hive-2.3.4
     default-database: mydatabase


添加依赖


-rw-r--r--. 1 root root     42998 Jul 22  2020 flink-connector-filesystem_2.11-1.11.1.jar
-rw-r--r--. 1 root root    196416 Dec 11 17:51 flink-connector-jdbc_2.11-1.12.0.jar
-rw-r--r--. 1 root root     91553 Dec  2 17:46 flink-csv-1.12.0.jar
-rw-r--r--. 1 root root 114120165 Dec  2 17:50 flink-dist_2.11-1.12.0.jar
-rw-r--r--. 1 root root    136663 Dec  2 17:46 flink-json-1.12.0.jar
-rw-r--r--. 1 root root  43317025 Dec 11 12:44 flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
-rw-r--r--. 1 root root   7709741 Sep 30 01:49 flink-shaded-zookeeper-3.4.14.jar
-rw-r--r--. 1 root root   3309441 Dec 12 15:35 flink-sql-avro-1.12.0.jar
-rw-r--r--. 1 root root  40650306 Dec 19 12:42 flink-sql-connector-hive-2.3.6_2.11-1.12.0.jar
-rw-r--r--. 1 root root   3650212 Dec 11 14:44 flink-sql-connector-kafka_2.11-1.12.0.jar
-rw-r--r--. 1 root root   2124047 Dec 12 15:35 flink-sql-orc_2.11-1.12.0.jar
-rw-r--r--. 1 root root   5666201 Dec 12 15:35 flink-sql-parquet_2.11-1.12.0.jar
-rw-r--r--. 1 root root  36147819 Dec  2 17:49 flink-table_2.11-1.12.0.jar
-rw-r--r--. 1 root root  40286358 Dec  2 17:49 flink-table-blink_2.11-1.12.0.jar
-rw-r--r--. 1 root root  34214106 Dec 19 19:18 hive-exec-2.3.4.jar
-rw-r--r--. 1 root root     67114 Feb 22  2020 log4j-1.2-api-2.12.1.jar
-rw-r--r--. 1 root root    276771 Feb 22  2020 log4j-api-2.12.1.jar
-rw-r--r--. 1 root root   1674433 Feb 22  2020 log4j-core-2.12.1.jar
-rw-r--r--. 1 root root     23518 Feb 22  2020 log4j-slf4j-impl-2.12.1.jar
-rw-r--r--. 1 root root   1007502 Dec 11 17:50 mysql-connector-java-5.1.47.jar


代码里面使用 HiveCatalog


package flink.ddl
import java.time.ZoneOffset._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings.newInstance
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment.create
import org.apache.flink.table.catalog.hive.HiveCatalog
import org.apache.flink.table.module.hive.HiveModule
import org.apache.hadoop.hive.ql.exec.UDF
import org.apache.hadoop.io.Text
/**
 * Flink SQL 使用 hive catalog
 */
object FlinkDDLHiveCatalog {
    private val catalog_name = "myhive"
    private val hive_version = "2.3.4"
    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val settings = newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build()
        val tEnv = create(env, settings)
        tEnv.getConfig.setLocalTimeZone(ofHours(8))
        // 设置 early fired
        tEnv.getConfig().getConfiguration().setBoolean("table.exec.emit.early-fire.enabled", true)
        tEnv.getConfig().getConfiguration().setString("table.exec.emit.early-fire.delay", "5000 ms")
        // 设置 job name
        tEnv.getConfig.getConfiguration.setString("pipeline.name",this.getClass.getSimpleName.replace("$",""))
        val catalog = new HiveCatalog(
            catalog_name,                   // catalog name
            "mydatabase",                // default database
            "/home/jason/bigdata/hive/hive-2.3.4",  // Hive config (hive-site.xml) directory
            hive_version                   // Hive version
        )
        // 注册 catalog
        tEnv.registerCatalog("myhive", catalog)
        // 选择一个 catalog
        tEnv.useCatalog("myhive")
        // 选择 database
        tEnv.useDatabase("mydatabase")
        // 加载 hive 的内置函数
        tEnv.loadModule(catalog_name,new HiveModule(hive_version))
        // kafka_source_jason 和 print_table 提前已经创建好可以直接使用
        tEnv.executeSql(
            """
              |insert into print_table
              |select
              |lower(funcName),
              |MIN(`timestamp`) as min_timestamp,
              |FIRST_VALUE(`timestamp`) as first_timestamp,
              |MAX(`timestamp`) as max_timestamp
              |from kafka_source_jason
              |group by funcName
              |""".stripMargin)
    }
}


因为 kafka_source_jason 和 print_table 这两张表提前已经创建过了,已经保存在 HiveCatalog 里面,所以代码里面可以直接使用不用再次创建.


提交任务


在启动任务之前,需要先把 Hiv e的 metastore 起起来,因为 HiveCatalog 会和 metastore 连接这样才能访问元数据信息.


hive --service metastore &
flink run -d -m yarn-cluster \
-c flink.ddl.FlinkDDLHiveCatalog \
-yqu flink \
-nm FlinkDDLHiveCatalog \
-p 6 \
/home/jason/bigdata/jar/flink-1.11.1-1.0-SNAPSHOT.jar


打印结果



上面的代码还加载了 hive 的内置函数, Flink SQL 里面可以直接使用 hive 的内置函数, SQL 中的 lower 就是 hive 的函数可以直接拿来使用,这样就会非常的方便.


相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
8月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
简介:本文整理自阿里云高级技术专家李麟在Flink Forward Asia 2025新加坡站的分享,介绍了Flink 2.1 SQL在实时数据处理与AI融合方面的关键进展,包括AI函数集成、Join优化及未来发展方向,助力构建高效实时AI管道。
1076 43
|
8月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
本文整理自阿里云的高级技术专家、Apache Flink PMC 成员李麟老师在 Flink Forward Asia 2025 新加坡[1]站 —— 实时 AI 专场中的分享。将带来关于 Flink 2.1 版本中 SQL 在实时数据处理和 AI 方面进展的话题。
491 0
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
|
9月前
|
SQL 消息中间件 Kafka
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是 Apache Flink 提供的 SQL 引擎,支持流批一体处理,统一操作流数据与批数据,具备高性能、低延迟、丰富数据源支持及标准 SQL 兼容性,适用于实时与离线数据分析。
1180 1
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
533 15
|
SQL 大数据 数据处理
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是为应对传统数据处理框架中流批分离的问题而诞生的,它融合了SQL的简洁性和Flink的强大流批处理能力,降低了大数据处理门槛。其核心工作原理包括生成逻辑执行计划、查询优化和构建算子树,确保高效执行。Flink SQL 支持过滤、投影、聚合、连接和窗口等常用算子,实现了流批一体处理,极大提高了开发效率和代码复用性。通过统一的API和语法,Flink SQL 能够灵活应对实时和离线数据分析场景,为企业提供强大的数据处理能力。
2324 27
|
SQL 存储 缓存
Flink SQL Deduplication 去重以及如何获取最新状态操作
Flink SQL Deduplication 是一种高效的数据去重功能,支持多种数据类型和灵活的配置选项。它通过哈希表、时间窗口和状态管理等技术实现去重,适用于流处理和批处理场景。本文介绍了其特性、原理、实际案例及源码分析,帮助读者更好地理解和应用这一功能。
1210 14
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
344 0
|
8月前
|
存储 分布式计算 数据处理
「48小时极速反馈」阿里云实时计算Flink广招天下英雄
阿里云实时计算Flink团队,全球领先的流计算引擎缔造者,支撑双11万亿级数据处理,推动Apache Flink技术发展。现招募Flink执行引擎、存储引擎、数据通道、平台管控及产品经理人才,地点覆盖北京、杭州、上海。技术深度参与开源核心,打造企业级实时计算解决方案,助力全球企业实现毫秒洞察。
736 0
「48小时极速反馈」阿里云实时计算Flink广招天下英雄
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。