Flink SQL与Hive的集成

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 笔记

(1)Flink SQL与Hive集成的架构图


1.png


(2)Flink 与 Hive 的集成包含两个层面


一是利用了 Hive 的 MetaStore 作为持久化的 Catalog

用户可通过HiveCatalog将不同会话中的 Flink 元数据存储到 Hive Metastore 中。 例如,用户可以使用HiveCatalog将其 Kafka 表或 Elasticsearch 表存储在 Hive Metastore 中,并后续在 SQL 查询中重新使用它们。


二是利用 Flink 来读写 Hive 的表。

HiveCatalog的设计提供了与 Hive 良好的兼容性,用户可以”开箱即用”的访问其已有的 Hive 数仓。 您不需要修改现有的 Hive Metastore,也不需要更改表的数据位置或分区。


(3)Flink SQL与Hive的集成配置


第一步:配置HADOOP_CLASSPATH,需要在/etc/profile文件中配置如下的环境变量

export HADOOP_CLASSPATH=`hadoop classpath`

第二步:将hive的jar包复制到flink的lib目录下


flink-connector-hive_2.11-1.12.1.jar

hive-exec-2.3.4.jar

flink-sql-connector-hive-2.3.6_2.11-1.12.1.jar

flink-connector-hive_2.11-1.12.1.jar这个包在maven仓库下载:


https://mvnrepository.com/artifact/org.apache.flink/flink-connector-hive_2.11/1.12.1

flink-sql-connector-hive-2.3.6_2.11-1.12.1.jar这个包在maven仓库下载:


https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-hive-2.3.6_2.11/1.12.1

第三步:添加Maven 依赖

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-hive_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-exec</artifactId>
  <version>${hive.version}</version>
</dependency>


(4)测试Flink SQL与Hive集成代码


package com.aikfk.flink.sql;
public class CommonSQL {
    public static final String hiveCatalog_name = "flink_udata";
    public static final String hiveDatabase_name = "flink";
    public static final String hiveConfDir = "/Users/caizhengjie/Desktop/hive-conf";
    public static final String version = "2.3.4";
    public static final String user_product_hive_create = "CREATE  TABLE  user_product_hive (\n" +
            "  user_id STRING,\n" +
            "  product_id STRING,\n" +
            "  click_count INT" +
            ") partitioned by (dt string,hr string) " +
            "stored as PARQUET " +
            "TBLPROPERTIES (\n" +
            "  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',\n" +
            "  'sink.partition-commit.delay'='5 s',\n" +
            "  'sink.partition-commit.trigger'='partition-time',\n" +
            "  'sink.partition-commit.policy.kind'='metastore,success-file'" +
            ")";
    public static final String user_product_kafka_create =
            "CREATE TABLE user_product_kafka (\n" +
            " user_id STRING," +
            " product_id STRING," +
            " click_count INT ," +
            " ts bigint ," +
            " r_t AS TO_TIMESTAMP(FROM_UNIXTIME(ts,'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd HH:mm:ss'),\n" +
            " WATERMARK FOR r_t AS r_t - INTERVAL '5' SECOND " +
            ") WITH (" +
            " 'connector' = 'kafka'," +
            " 'topic' = 'kfk'," +
            " 'properties.bootstrap.servers' = 'bigdata-pro-m07:9092'," +
            " 'properties.group.id' = 'test1'," +
            " 'format' = 'json'," +
            " 'scan.startup.mode' = 'latest-offset'" +
            ")";
    public static final String user_product_kafka_drop ="DROP TABLE IF EXISTS user_product_kafka";
    public static final String user_product_hive_drop ="DROP TABLE IF EXISTS user_product_hive";
    public static final String user_product_kafka_insert_hive =
            "insert into user_product_hive SELECT user_id, product_id, click_count, " +
            " DATE_FORMAT(r_t, 'yyyy-MM-dd'), DATE_FORMAT(r_t, 'HH') FROM user_product_kafka";
}
package com.aikfk.flink.sql.hive;
import com.aikfk.flink.sql.CommonSQL;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
public class FlinkKafkaHive {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env , settings);
        env.enableCheckpointing(5000);
        HiveCatalog hiveCatalog =
                new HiveCatalog(
                        CommonSQL.hiveCatalog_name,
                        CommonSQL.hiveDatabase_name,
                        CommonSQL.hiveConfDir,
                        CommonSQL.version
                        );
        tableEnvironment.registerCatalog(CommonSQL.hiveCatalog_name,hiveCatalog);
        tableEnvironment.useCatalog(CommonSQL.hiveCatalog_name);
        tableEnvironment.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tableEnvironment.executeSql(CommonSQL.user_product_kafka_drop);
        tableEnvironment.executeSql(CommonSQL.user_product_kafka_create);
        tableEnvironment.getConfig().setSqlDialect(SqlDialect.HIVE);
        tableEnvironment.executeSql(CommonSQL.user_product_hive_drop);
        tableEnvironment.executeSql(CommonSQL.user_product_hive_create);
        tableEnvironment.executeSql(CommonSQL.user_product_kafka_insert_hive).print();
        env.execute();
    }
}


(5)测试kafka数据源与hive写入数据


生产者生产数据:

package com.aikfk.flink.base;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.Random;
public class KafkaProducerUtil extends Thread {
        private String topic = "kfk";
        public KafkaProducerUtil() {
            super();
        }
        private Producer<String, String> createProducer() {
            // 通过Properties类设置Producer的属性
            Properties properties = new Properties();
            properties.put("bootstrap.servers", "bigdata-pro-m07:9092");
            properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            return new KafkaProducer<String, String>(properties);
        }
        @Override
        public void run() {
            Producer<String, String> producer = createProducer();
            Random random = new Random();
            Random random2 = new Random();
            while (true) {
                String user_id = "user_"+random.nextInt(10);
                String product_id = "product_"+random2.nextInt(100);
                System.out.println(user_id + " :" + product_id);
                String time = System.currentTimeMillis() / 1000 + 5 + "";
                try {
//
                    String kaifa_log = "{" +
                            "\"user_id\":\"" + user_id+"\"," +
                            "\"product_id\":\"" + product_id+"\"," +
                            "\"click_count\":\"1" + "\"," +
                            "\"ts\":" + time + "}";
                    System.out.println("kaifa_log = " + kaifa_log);
                    producer.send(new ProducerRecord<String, String>(this.topic, kaifa_log));
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("=========循环一次==========");
                try {
                    sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        public static void main(String[] args) {
            new KafkaProducerUtil().run();
        }
}

生产结果:

user_7 :product_74
kaifa_log = {"user_id":"user_7","product_id":"product_74","click_count":"1","ts":1618228651}
=========循环一次==========
user_5 :product_62
kaifa_log = {"user_id":"user_5","product_id":"product_62","click_count":"1","ts":1618228653}
=========循环一次==========
user_9 :product_50
kaifa_log = {"user_id":"user_9","product_id":"product_50","click_count":"1","ts":1618228654}
=========循环一次==========

通过flink SQL client查看数据

Flink SQL> select * from user_product_kafka;


3.png

查看hive写入数据情况:问题暂未解决!可能是jar包问题

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
24天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
69 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
2月前
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
173 15
|
25天前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
53 1
|
2月前
|
算法 API Apache
Flink CDC:新一代实时数据集成框架
本文源自阿里云实时计算团队 Apache Flink Committer 任庆盛在 Apache Asia CommunityOverCode 2024 的分享,涵盖 Flink CDC 的概念、版本历程、内部实现及社区未来规划。Flink CDC 是一种基于数据库日志的 CDC 技术实现的数据集成框架,能高效完成全量和增量数据的实时同步。自 2020 年以来,Flink CDC 经过多次迭代,已成为功能强大的实时数据集成工具,支持多种数据库和数据湖仓系统。未来将进一步扩展生态并提升稳定性。
615 1
Flink CDC:新一代实时数据集成框架
|
1月前
|
SQL 分布式计算 Hadoop
Hadoop-12-Hive 基本介绍 下载安装配置 MariaDB安装 3台云服务Hadoop集群 架构图 对比SQL HQL
Hadoop-12-Hive 基本介绍 下载安装配置 MariaDB安装 3台云服务Hadoop集群 架构图 对比SQL HQL
69 3
|
1月前
|
SQL 机器学习/深度学习 数据库
SQL与Python集成:数据库操作无缝衔接
在开始之前,确保你已经安装了必要的Python库,如`sqlite3`(用于SQLite数据库)或`psycopg2`(用于PostgreSQL数据库)。这些库提供了Python与SQL数据库之间的接口。
|
1月前
|
SQL 数据库连接 数据库
管理系统中的Visual Studio与SQL集成技巧与方法
在现代软件开发和管理系统中,Visual Studio(VS)作为强大的集成开发环境(IDE),与SQL数据库的紧密集成是构建高效、可靠应用程序的关键
|
1月前
|
SQL 机器学习/深度学习 数据采集
SQL与Python集成:数据库操作无缝衔接2a.bijius.com
Python与SQL的集成是现代数据科学和工程实践的核心。通过有效的数据查询、管理与自动化,可以显著提升数据分析和决策过程的效率与准确性。随着技术的不断发展,这种集成的应用场景将更加广泛,为数据驱动的创新提供更强大的支持。
|
1月前
|
SQL 机器学习/深度学习 数据库
SQL与Python集成:数据库操作无缝衔接
1. Python与SQL集成的关键步骤 在开始之前,确保你已经安装了必要的Python库,如`sqlite3`(用于SQLite数据库)或`psycopg2`(用于PostgreSQL数据库)。这些库提供了Python与SQL数据库之间的接口。
|
1月前
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
46 0
下一篇
无影云桌面