Querying a Registered Table
package com.blink.sb.sql;

import com.blink.sb.beans.ClickLogs;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Flink SQL query against a registered table
 */
public class FlinkSQLQueryRegedTable {
    public static void main(String[] args) throws Exception {
        // 1. Get the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Create the table environment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        env.setParallelism(1);

        // 3. Read the data
        DataStream<ClickLogs> clickLogs = env.fromElements(
                "Mary,./home,2022-02-02 12:00:00",
                "Bob,./cart,2022-02-02 12:00:00",
                "Mary,./prod?id=1,2022-02-02 12:00:05",
                "Liz,./home,2022-02-02 12:01:00",
                "Bob,./prod?id=3,2022-02-02 12:01:30",
                "Mary,./prod?id=7,2022-02-02 12:01:45"
        ).map(event -> {
            String[] props = event.split(",");
            return ClickLogs
                    .builder()
                    .user(props[0])
                    .url(props[1])
                    .cTime(props[2])
                    .build();
        });

        // 4. Convert the stream into a dynamic table
        Table table = tEnv.fromDataStream(clickLogs);

        // 5. Register the table
        tEnv.createTemporaryView("sourceTable", table);

        // 6. Query the registered table with SQL
        //    (`user` is a reserved keyword in Flink SQL, so it must be escaped with backticks)
        Table resultTable = tEnv.sqlQuery(
                "select `user`, count(url) as cnt from sourceTable group by `user`");

        // 7. Convert the result back to a stream and print it
        //    (the aggregation produces updates, so a changelog stream is required)
        tEnv.toChangelogStream(resultTable).print();

        // 8. Execute the job
        env.execute("FlinkSQLQueryRegedTable");
    }
}
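Both this example and the next one reference a ClickLogs bean from com.blink.sb.beans that is not shown in this section. A minimal sketch of what it could look like, assuming Lombok supplies the builder() used above (the field names match the builder calls; everything else is a guess):

package com.blink.sb.beans;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * POJO for one click-log record (hypothetical reconstruction; the original
 * class is not shown in this section).
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ClickLogs {
    private String user;   // user name
    private String url;    // visited URL
    private String cTime;  // click time as a string, e.g. "2022-02-02 12:00:00"
}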
Querying an Unregistered Table
package com.blink.sb.sql;

import com.blink.sb.beans.ClickLogs;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Flink SQL query against an unregistered table
 */
public class FlinkSQLQueryUnregTable {
    public static void main(String[] args) throws Exception {
        // 1. Get the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Create the table environment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        env.setParallelism(1);

        // 3. Read the data
        DataStream<ClickLogs> clickLogs = env.fromElements(
                "Mary,./home,2022-02-02 12:00:00",
                "Bob,./cart,2022-02-02 12:00:00",
                "Mary,./prod?id=1,2022-02-02 12:00:05",
                "Liz,./home,2022-02-02 12:01:00",
                "Bob,./prod?id=3,2022-02-02 12:01:30",
                "Mary,./prod?id=7,2022-02-02 12:01:45"
        ).map(event -> {
            String[] props = event.split(",");
            return ClickLogs
                    .builder()
                    .user(props[0])
                    .url(props[1])
                    .cTime(props[2])
                    .build();
        });

        // 4. Convert the stream into a dynamic table
        Table table = tEnv.fromDataStream(clickLogs);

        // 5. Query the unregistered table: concatenating the Table object into the SQL
        //    string registers it under a generated unique name
        //    (note that there must be spaces on both sides of the concatenated table)
        Table resultTable = tEnv.sqlQuery(
                "select `user`, count(url) as cnt from " + table + " group by `user`");

        // 6. Execute and print
        resultTable.execute().print();
    }
}
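For the six sample records above, resultTable.execute().print() emits a changelog rather than a static result: each new click for a user retracts the previous count (-U) and inserts the updated one (+U). The output looks roughly like this (illustrative, column widths abbreviated):

+----+------+-----+
| op | user | cnt |
+----+------+-----+
| +I | Mary |   1 |
| +I |  Bob |   1 |
| -U | Mary |   1 |
| +U | Mary |   2 |
| +I |  Liz |   1 |
| ...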
Standard Flink SQL Code Structure
The code structure of a Flink SQL program is exactly the same as with the Flink Table API; the core abstraction is still the Table.
Here is an example that uses the filesystem connector for both the source and the sink table:
package com.blink.sb.sql;

import org.apache.flink.table.api.*;

/**
 * Standard Flink SQL structure
 */
public class FlinkSQLStandardStructure {
    public static void main(String[] args) {
        // 1. Create the TableEnvironment
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // 2. Create the source table (DDL statement) -- executing the DDL registers
        //    the table automatically. Note: no trailing semicolon inside the statement,
        //    and mind the spaces between the concatenated string fragments.
        tEnv.executeSql("CREATE TABLE emp_info (" +
                "  emp_id INT," +
                "  name VARCHAR," +
                "  dept_id INT" +
                ") WITH (" +
                "  'connector' = 'filesystem'," +
                "  'path' = 'data/emp/input/'," +
                "  'format' = 'csv'" +
                ")");

        // 3. Create the sink table (DDL), also run with executeSql
        tEnv.executeSql("CREATE TABLE emp_info_copy (" +
                "  emp_id INT," +
                "  name VARCHAR," +
                "  dept_id INT" +
                ") WITH (" +
                "  'connector' = 'filesystem'," +
                "  'path' = 'data/emp/output/'," +
                "  'format' = 'csv'" +
                ")");

        // 4. Run the SQL query and write the result to the sink table
        //    (`result` is backticked because it is a reserved keyword in Flink SQL)
        Table resultTable = tEnv.sqlQuery("select * from emp_info where dept_id = 10");
        tEnv.createTemporaryView("result", resultTable);
        tEnv.executeSql("INSERT INTO emp_info_copy " +
                "SELECT" +
                "  emp_id," +
                "  name," +
                "  dept_id " +
                "FROM `result`");
    }
}
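For this job to produce output, data/emp/input/ must contain at least one CSV file matching the emp_info schema. A hypothetical data/emp/input/emp.csv could look like this (the rows with dept_id = 10 would be copied to data/emp/output/):

1,Smith,10
2,Allen,20
3,Ward,10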
Flink SQL Syntax Support
The SQL syntax supported by Flink SQL as of release 1.14 is documented at the following link:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/overview/
Flink SQL Input and Output
Reading from and Writing to Kafka
Requirement: consume click logs (JSON) from Kafka, convert them to CSV, and write them back out to Kafka.
package com.blink.sb.sql;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Flink SQL job that consumes click logs (JSON) from Kafka, converts them to CSV,
 * and writes them back to Kafka
 */
public class FlinkSQLKafka2Kafka {
    public static final String input_topic = "clicklog_input";
    public static final String output_topic = "clicklog_output";

    public static void main(String[] args) throws Exception {
        // 1. Create the TableEnvironment
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                StreamExecutionEnvironment.getExecutionEnvironment(), settings);

        // 2. Create the source table (DDL statement) -- executing the DDL registers the table automatically
        tEnv.executeSql("CREATE TABLE sourceTable (" +
                "  `user` STRING," +
                "  `url` STRING," +
                "  `cTime` STRING" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = '" + input_topic + "'," +
                "  'properties.bootstrap.servers' = 'node02:6667'," +
                "  'properties.group.id' = 'test1'," +
                "  'scan.startup.mode' = 'latest-offset'," +
                "  'format' = 'json'" +
                ")");

        // 3. Create the sink table (DDL)
        tEnv.executeSql("CREATE TABLE sinkTable (" +
                "  `user` STRING," +
                "  `url` STRING," +
                "  `cTime` STRING" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = '" + output_topic + "'," +
                "  'properties.bootstrap.servers' = 'node02:6667'," +
                "  'format' = 'csv'" +
                ")");

        // 4. Run the SQL query and write the result to the sink
        //    (executeSql submits the INSERT job directly; no env.execute() call is needed)
        tEnv.executeSql("INSERT INTO sinkTable " +
                "SELECT" +
                "  `user`," +
                "  url," +
                "  cTime " +
                "FROM sourceTable");
    }
}
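With the job running, any JSON message produced to clicklog_input is parsed against the source schema and re-emitted on clicklog_output as CSV. For example, a hypothetical input record

{"user":"Mary","url":"./home","cTime":"2022-02-02 12:00:00"}

would appear on the output topic roughly as (exact quoting depends on the csv format options):

Mary,./home,2022-02-02 12:00:00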
Reading from Kafka and Writing to MySQL
Add the dependency required for the JDBC connector:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
Add the matching database driver:
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.28</version>
</dependency>
Create the target table in MySQL ahead of time:
-- The primary key matches the PRIMARY KEY declared in the Flink DDL below;
-- the JDBC sink's upsert writes rely on a unique key on the MySQL side
CREATE TABLE test.sinkTable (
  `user` varchar(100),
  `cnt`  BIGINT,
  PRIMARY KEY (`user`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_general_ci;
The code:
package com.blink.sb.sql;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

/**
 * Flink SQL job that consumes click logs (JSON) from Kafka, aggregates them,
 * and writes the result to MySQL
 */
public class FlinkSQLKafka2MySQL {
    public static final String input_topic = "clicklog_input";

    public static void main(String[] args) throws Exception {
        // 1. Create the TableEnvironment
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // 2. Create the source table (DDL statement) -- executing the DDL registers the table automatically
        tEnv.executeSql("CREATE TABLE sourceTable (" +
                "  `user` STRING," +
                "  `url` STRING," +
                "  `cTime` STRING" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = '" + input_topic + "'," +
                "  'properties.bootstrap.servers' = 'node02:6667'," +
                "  'properties.group.id' = 'test1'," +
                "  'scan.startup.mode' = 'latest-offset'," +
                "  'format' = 'json'" +
                ")");

        // 3. Create the sink table (DDL)
        //    The PRIMARY KEY ... NOT ENFORCED clause makes the JDBC connector write in upsert mode
        tEnv.executeSql("CREATE TABLE sinkTable (" +
                "  `user` STRING," +
                "  `cnt` BIGINT," +
                "  PRIMARY KEY (`user`) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'jdbc'," +
                "  'url' = 'jdbc:mysql://node01:3306/test'," +
                "  'username' = 'root'," +
                "  'password' = 'root%123'," +
                "  'table-name' = 'sinkTable'" +
                ")");

        // 4. Run the aggregation and write the result to the sink
        tEnv.executeSql("INSERT INTO sinkTable " +
                "SELECT" +
                "  `user`," +
                "  count(url) as cnt " +
                "FROM sourceTable " +
                "group by `user`");
    }
}
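Because the sink table declares a primary key, the JDBC connector writes in upsert mode: each user's row is updated in place as new clicks arrive instead of appending duplicates. Assuming the six sample click records from the earlier examples are produced to clicklog_input as JSON, a quick check in MySQL would show something like this (illustrative):

SELECT * FROM test.sinkTable;

+------+-----+
| user | cnt |
+------+-----+
| Bob  |   2 |
| Liz  |   1 |
| Mary |   3 |
+------+-----+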