sql client 启动模式
Flink有五个client实现,都⽤来编译Flink app为dataflow graph并提交给JobManager(针对不同部署模式, JobManager也有不同实现),SQL Client就是其中⼀个client实现,⽤于以SQL的⽅式提交代码。SQL client的参考 ⽂档如下:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sqlclient/
SQL Client⽬前只有embedded模式(嵌⼊式独⽴进程)
启动Flink SQL CLi
$FLINK_HOME/bin/sql-client.shembedded
embedded不是必须的,默认就是embedded模式
执⾏样例SQL
SET'sql-client.execution.result-mode'='tableau'; SET'execution.runtime-mode'='batch'; SELECTname, COUNT(*) AScntFROM (VALUES ('Bob')
命令行交互中的重要命令和配置
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sqlclient/#configuration
#设置查询结果的展示模式,可选项:TABLE、tableau、CHANGELOG,⼤⼩写都可以,CHANGELOG只能在流处理模式下SET'sql-client.execution.result-mode'='tableau'; #转换批流模式SET'execution.runtime-mode'='batch'; #TABLE模式下,最多缓存的⾏数(TABLE是分⻚显示的,⼀⻚展示不下的会缓存到内存中) SET'sql-client.execution.max-table-result.rows'='10000'; #同步执⾏dml语句,默认是异步的,如果批处理的多个dml语句需要按照顺序依次执⾏,则需要打开同步SET'table.dml-sync'='true'; #从指定的savepoint开始执⾏SQLJob(影响后续所有的SQL,除⾮被重置)SET'execution.savepoint.path'='/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab'; #重置前⾯的execution.savepoint.path,接下来的SQL不使⽤保存点RESETexecution.savepoint.path; #给接下来要执⾏的所有DQL和DMLjob指定jobname(注意是flinkjob的名字,不是yarn集群⾥app的名字) SET'pipeline.name'='kafka-to-hive'; #重置pipeline.name配置,接下来的DQL和DMLjob的名字不受该配置影响RESETpipeline.name; #给接下来要执⾏的所有DQL和DMLjob指定提交到yarn后app的名字,在FlinkonYARN模式下才有⽤(Flinkjob本身的名字由pipeline.name指定)SET'yarn.application.name'='test_job'; #重置yarn.application.name配置RESETyarn.application.name;
SET与 STATEMENT SET 区别
命令 |
描述 |
样例 |
SET / RESET |
SET⽤于设置某个session配置,⽤于重置某个session配置。 |
|
STATEMENT SET |
sql client默认把每个INSERT INTO作为单独的job来运⾏,这样不利⽤重⽤pipeline资源,所以可以使⽤ STATEMENT SET语句块把多个insert语句包起来,所有语句都经过整体优化并作为单个 Flink 作业执⾏。 联合优化 和执⾏允许重⽤公共中间结果,因此可以显着提⾼执⾏多个查询的效率 |
BEGIN STATEMENT SET; -- ⼀个或多个 INSERT INTO 语句 { INSERT INTO|OVERWRITE ; }+ END; |
案例1:
CREATETABLEpageviews ( user_idBIGINT, page_idBIGINT, viewtimeTIMESTAMP, proctimeASPROCTIME() ) WITH ( 'connector'='kafka', 'topic'='pageviews', 'properties.bootstrap.servers'='...', 'format'='avro'); CREATETABLEpageview ( page_idBIGINT, cntBIGINT) WITH ( 'connector'='jdbc', 'url'='jdbc:mysql://localhost:3306/mydatabase', 'table-name'='pageview'); CREATETABLEuniqueview ( page_idBIGINT, cntBIGINT) WITH ( 'connector'='jdbc', 'url'='jdbc:mysql://localhost:3306/mydatabase', 'table-name'='uniqueview'); BEGINSTATEMENTSET; INSERTINTOpageviewsSELECTpage_id, count(1) FROMpageviewsGROUPBYpage_id; INSERTINTOuniqueviewSELECTpage_id, count(distinctuser_id) FROMpageviewsGROUPBYpage_id; END;
案例2:
在SQL Client中执⾏如下SQL脚本:
CREATETABLEsourceTable ( --user是关键字所以要``包含起来`user`STRING, urlSTRING, cTimeSTRING) WITH ( 'connector'='kafka', 'topic'='clicklog_input', 'properties.bootstrap.servers'='node02:6667', 'properties.group.id'='test1', 'scan.startup.mode'='latest-offset', 'format'='json'); CREATETABLEsinkTable ( `user`STRING, cntBIGINT, PRIMARYKEY (`user`) NOTENFORCED) WITH ( 'connector'='jdbc', 'url'='jdbc:mysql://node01:3306/test', 'username'='root', 'password'='root%123', 'table-name'='sinkTable'); INSERTINTOsinkTableSELECTuser,count(url) ascntFROMsourceTablegroupbyuser;
注意:
在执⾏之前,我们要把相关依赖jar包放到$FLINK_HOME/lib⽬录下去(我们前⾯在代码⾥是本地运⾏的,所以添加 maven依赖即可,这⾥可是本地运⾏,所以要⾃⼰下载依赖包):
#kafkaconnector依赖包https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connectorkafka_2.11/1.14.3/flink-sql-connector-kafka_2.11-1.14.3.jar#jdbcconnector相关依赖包https://repo.maven.apache.org/maven2/org/apache/flink/flink-connectorjdbc_2.11/1.14.3/flink-connector-jdbc_2.11-1.14.3.jarhttps://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java8.0.28.jar
放到⼀个⽂件,以脚本⽅式执⾏,也可以在交互式模式下执⾏:
$FLINK_HOME/bin/sql-client.shembedded-fmy.sql
SQL Client 集成Hive
依赖包下载
将如下依赖包放到$FLINK_HOME/lib⽬录中:
https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive3.1.2_2.11/1.14.3/flink-sql-connector-hive-3.1.2_2.11-1.14.3.jar
初始化sql client 脚本
准备初始化脚本init.sql
CREATECATALOGmyhiveWITH ( 'type'='hive', 'default-database'='default', 'hive-conf-dir'='/etc/hive/conf'); USECATALOGmyhive; SET'sql-client.execution.result-mode'='tableau'; --使⽤Hive⽅⾔(默认是default) set'table.sql-dialect'='hive'; --加载hiveMODULE(Hive的udf是以单独的模块提供的,加载之后才能使⽤Hive的⾃定义函数) LOADMODULEhiveWITH ('hive-version'='3.1.0');
注意:Flink1.13开始移除了sql-client-defaults.yml配置⽂件,所以在该配置⽂件配置catalog的⽅法就不存在了
初始化脚本中我们创建了hive catalog,相关配置项如下:
选项 |
是否必须 |
默认值 |
类型 |
描述 |
type |
yes |
none |
string |
hive就是创建HiveCatalog |
hive-conf-dir |
no |
none |
string |
hive配置⽂件hive-site.xml所在的⽬录,Hadoop⽀持的⽂件系 统都可以。如果没指定就会在classpath下找 |
default-database |
no |
default |
string |
默认数据库 |
hive-version |
no |
none |
string |
HiveCatalog会⾃动检测版本的,不建议设置 |
hadoop-conf-dir |
no |
none |
string |
Hadoop配置⽂件的⽬录,只⽀持本地⽂件系统,推荐使⽤环境 变量HADOOP_CONF_DIR来指定,不要通过选项指定 |
加载hivemetastore权限问题
启动SQL Client时会⾃动读取/usr/hdp/current/hive-client/conf/hivemetastore-site.xml,没有权限会报错的:
sudochmod-Rg+r/usr/hdp/current/hive-client/conf/hivemetastore-site.xml
启动sql client
$FLINK_HOME/bin/sql-client.sh-iinit.sql
执行查询
select*frommyrs.user_op_log;