FlinkSQL Client 集成Hive

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: FlinkSQL Client 集成Hiveflink sql cli 启动模式重要命令和配置sql client 集成hive执行hive 查询

sql client 启动模式


Flink有五个client实现,都⽤来编译Flink app为dataflow graph并提交给JobManager(针对不同部署模式, JobManager也有不同实现),SQL Client就是其中⼀个client实现,⽤于以SQL的⽅式提交代码。SQL client的参考 ⽂档如下:


https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sqlclient/



SQL Client⽬前只有embedded模式(嵌⼊式独⽴进程)


启动Flink SQL CLi



$FLINK_HOME/bin/sql-client.shembedded

embedded不是必须的,默认就是embedded模式



执⾏样例SQL


SET'sql-client.execution.result-mode'='tableau';
SET'execution.runtime-mode'='batch';
SELECTname,
COUNT(*) AScntFROM (VALUES ('Bob')


命令行交互中的重要命令和配置

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sqlclient/#configuration



#设置查询结果的展示模式,可选项:TABLE、tableau、CHANGELOG,⼤⼩写都可以,CHANGELOG只能在流处理模式SET'sql-client.execution.result-mode'='tableau';
#转换批流模式SET'execution.runtime-mode'='batch';
#TABLE模式下,最多缓存的⾏数(TABLE是分⻚显示的,⼀⻚展示不下的会缓存到内存中)
SET'sql-client.execution.max-table-result.rows'='10000';
#同步执⾏dml语句,默认是异步的,如果批处理的多个dml语句需要按照顺序依次执⾏,则需要打开同步SET'table.dml-sync'='true';
#从指定的savepoint开始执⾏SQLJob(影响后续所有的SQL,除⾮被重置)SET'execution.savepoint.path'='/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab';
#重置前⾯的execution.savepoint.path,接下来的SQL不使⽤保存点RESETexecution.savepoint.path;
#给接下来要执⾏的所有DQL和DMLjob指定jobname(注意是flinkjob的名字,不是yarn集群⾥app的名字)
SET'pipeline.name'='kafka-to-hive';
#重置pipeline.name配置,接下来的DQL和DMLjob的名字不受该配置影响RESETpipeline.name;
#给接下来要执⾏的所有DQL和DMLjob指定提交到yarn后app的名字,在FlinkonYARN模式下才有⽤(Flinkjob本身的名字由pipeline.name指定)SET'yarn.application.name'='test_job';
#重置yarn.application.name配置RESETyarn.application.name;


SET与 STATEMENT SET 区别


命令

描述

样例

SET / RESET

SET⽤于设置某个session配置,⽤于重置某个session配置。


STATEMENT SET

sql client默认把每个INSERT INTO作为单独的job来运⾏,这样不利⽤重⽤pipeline资源,所以可以使⽤ STATEMENT SET语句块把多个insert语句包起来,所有语句都经过整体优化并作为单个 Flink 作业执⾏。 联合优化 和执⾏允许重⽤公共中间结果,因此可以显着提⾼执⾏多个查询的效率


BEGIN STATEMENT SET; -- ⼀个或多个


INSERT INTO 语句 { INSERT INTO|OVERWRITE ; }+


END;


案例1:


CREATETABLEpageviews (
user_idBIGINT,
page_idBIGINT,
viewtimeTIMESTAMP,
proctimeASPROCTIME()
) WITH (
'connector'='kafka',
'topic'='pageviews',
'properties.bootstrap.servers'='...',
'format'='avro');
CREATETABLEpageview (
page_idBIGINT,
cntBIGINT) WITH (
'connector'='jdbc',
'url'='jdbc:mysql://localhost:3306/mydatabase',
'table-name'='pageview');
CREATETABLEuniqueview (
page_idBIGINT,
cntBIGINT) WITH (
'connector'='jdbc',
'url'='jdbc:mysql://localhost:3306/mydatabase',
'table-name'='uniqueview');
BEGINSTATEMENTSET;
INSERTINTOpageviewsSELECTpage_id, count(1)
FROMpageviewsGROUPBYpage_id;
INSERTINTOuniqueviewSELECTpage_id, count(distinctuser_id)
FROMpageviewsGROUPBYpage_id;
END;


案例2:


在SQL Client中执⾏如下SQL脚本:

CREATETABLEsourceTable (
--user是关键字所以要``包含起来`user`STRING,
urlSTRING,
cTimeSTRING) WITH (
'connector'='kafka',
'topic'='clicklog_input',
'properties.bootstrap.servers'='node02:6667',
'properties.group.id'='test1',
'scan.startup.mode'='latest-offset',
'format'='json');
CREATETABLEsinkTable (
`user`STRING,
cntBIGINT,
PRIMARYKEY (`user`) NOTENFORCED) WITH (
'connector'='jdbc',
'url'='jdbc:mysql://node01:3306/test',
'username'='root',
'password'='root%123',
'table-name'='sinkTable');
INSERTINTOsinkTableSELECTuser,count(url) ascntFROMsourceTablegroupbyuser;

注意:

在执⾏之前,我们要把相关依赖jar包放到$FLINK_HOME/lib⽬录下去(我们前⾯在代码⾥是本地运⾏的,所以添加 maven依赖即可,这⾥可是本地运⾏,所以要⾃⼰下载依赖包):



#kafkaconnector依赖包https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connectorkafka_2.11/1.14.3/flink-sql-connector-kafka_2.11-1.14.3.jar#jdbcconnector相关依赖包https://repo.maven.apache.org/maven2/org/apache/flink/flink-connectorjdbc_2.11/1.14.3/flink-connector-jdbc_2.11-1.14.3.jarhttps://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java8.0.28.jar


放到⼀个⽂件,以脚本⽅式执⾏,也可以在交互式模式下执⾏:

$FLINK_HOME/bin/sql-client.shembedded-fmy.sql


SQL Client 集成Hive


依赖包下载

将如下依赖包放到$FLINK_HOME/lib⽬录中:


https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive3.1.2_2.11/1.14.3/flink-sql-connector-hive-3.1.2_2.11-1.14.3.jar



初始化sql client 脚本

准备初始化脚本init.sql

CREATECATALOGmyhiveWITH (
'type'='hive',
'default-database'='default',
'hive-conf-dir'='/etc/hive/conf');
USECATALOGmyhive;
SET'sql-client.execution.result-mode'='tableau';
--使⽤Hive⽅⾔(默认是default)
set'table.sql-dialect'='hive';
--加载hiveMODULE(Hive的udf是以单独的模块提供的,加载之后才能使⽤Hive的⾃定义函数)
LOADMODULEhiveWITH ('hive-version'='3.1.0');

注意:Flink1.13开始移除了sql-client-defaults.yml配置⽂件,所以在该配置⽂件配置catalog的⽅法就不存在了


初始化脚本中我们创建了hive catalog,相关配置项如下:

选项

是否必须

默认值

类型

描述

type

yes

none

string

hive就是创建HiveCatalog

hive-conf-dir

no

none

string

hive配置⽂件hive-site.xml所在的⽬录,Hadoop⽀持的⽂件系 统都可以。如果没指定就会在classpath下找

default-database

no

default

string

默认数据库

hive-version

no

none

string

HiveCatalog会⾃动检测版本的,不建议设置

hadoop-conf-dir

no

none

string

Hadoop配置⽂件的⽬录,只⽀持本地⽂件系统,推荐使⽤环境 变量HADOOP_CONF_DIR来指定,不要通过选项指定


加载hivemetastore权限问题


启动SQL Client时会⾃动读取/usr/hdp/current/hive-client/conf/hivemetastore-site.xml,没有权限会报错的:

sudochmod-Rg+r/usr/hdp/current/hive-client/conf/hivemetastore-site.xml


启动sql client



$FLINK_HOME/bin/sql-client.sh-iinit.sql



执行查询


select*frommyrs.user_op_log;






















































相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
6月前
|
SQL 分布式计算 DataWorks
DataWorks报错问题之集成hive数据源报错如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
|
5月前
|
SQL 缓存 关系型数据库
ClickHouse(19)ClickHouse集成Hive表引擎详细解析
Hive引擎允许对HDFS Hive表执行 `SELECT` 查询。目前它支持如下输入格式: -文本:只支持简单的标量列类型,除了 `Binary` - ORC:支持简单的标量列类型,除了`char`; 只支持 `array` 这样的复杂类型 - Parquet:支持所有简单标量列类型;只支持 `array` 这样的复杂类型
212 1
|
6月前
|
SQL 分布式计算 Java
Apache Hudi与Hive集成手册
Apache Hudi与Hive集成手册
355 0
|
6月前
|
SQL 数据可视化 数据挖掘
将Sqoop与Hive集成无缝的数据分析
将Sqoop与Hive集成无缝的数据分析
|
6月前
|
SQL 存储 分布式计算
Spark与Hive的集成与互操作
Spark与Hive的集成与互操作
|
SQL 存储 API
Flink教程(25)- Flink高级特性(FlinkSQL整合Hive)
Flink教程(25)- Flink高级特性(FlinkSQL整合Hive)
952 0
|
6月前
|
SQL 存储 Apache
流数据湖平台Apache Paimon(四)集成 Hive 引擎
流数据湖平台Apache Paimon(四)集成 Hive 引擎
468 0
淘东电商项目(34) -SSO单点登录(Client端集成)
淘东电商项目(34) -SSO单点登录(Client端集成)
53 0
|
SQL 存储 大数据
大数据FlinkSQL整合Hive
大数据FlinkSQL整合Hive
174 0
|
SQL 分布式计算 分布式数据库
Hive集成Hue安装部署
Hive集成Hue安装部署
265 0