FlinkSQL Programming

Summary: querying registered tables, querying unregistered tables, the standard Flink SQL code structure, reading and writing Kafka with Flink SQL, and reading from Kafka and writing to MySQL with Flink SQL.

Querying a Registered Table


package com.blink.sb.sql;

import com.blink.sb.beans.ClickLogs;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Flink SQL query against a registered table
 */
public class FlinkSQLQueryRegedTable {
    public static void main(String[] args) throws Exception {
        //1. Get the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2. Create the table environment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        env.setParallelism(1);
        //3. Read the data
        DataStream<ClickLogs> clickLogs = env.fromElements(
                "Mary,./home,2022-02-02 12:00:00",
                "Bob,./cart,2022-02-02 12:00:00",
                "Mary,./prod?id=1,2022-02-02 12:00:05",
                "Liz,./home,2022-02-02 12:01:00",
                "Bob,./prod?id=3,2022-02-02 12:01:30",
                "Mary,./prod?id=7,2022-02-02 12:01:45"
        ).map(event -> {
            String[] props = event.split(",");
            return ClickLogs
                    .builder()
                    .user(props[0])
                    .url(props[1])
                    .cTime(props[2])
                    .build();
        });
        //4. Convert the stream into a dynamic table
        Table table = tEnv.fromDataStream(clickLogs);
        //5. Register the table
        tEnv.createTemporaryView("sourceTable", table);
        //6. Run a SQL query against the registered table
        //   (`user` must be escaped with backticks: it is a reserved keyword in Flink SQL)
        Table resultTable = tEnv.sqlQuery(
                "select `user`, count(url) as cnt from sourceTable group by `user`");
        //7. Convert the result back into a stream and print it (or sink it elsewhere)
        tEnv.toChangelogStream(resultTable).print();
        //8. Execute the job
        env.execute("FlinkSQLQueryRegedTable");
    }
}
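
Both this example and the next depend on a ClickLogs bean with a builder. A minimal sketch of such a bean, assuming Lombok is on the classpath (the Lombok annotations are an assumption; a hand-written builder works just as well):

package com.blink.sb.beans;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Click-log bean (hypothetical sketch). Getters/setters and a public
 * no-arg constructor make it a valid Flink POJO type.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ClickLogs {
    private String user;   // user name
    private String url;    // visited URL
    private String cTime;  // click time string, e.g. "2022-02-02 12:00:00"
}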



Querying an Unregistered Table


package com.blink.sb.sql;

import com.blink.sb.beans.ClickLogs;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Flink SQL query against an unregistered table
 */
public class FlinkSQLQueryUnregTable {
    public static void main(String[] args) throws Exception {
        //1. Get the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2. Create the table environment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        env.setParallelism(1);
        //3. Read the data
        DataStream<ClickLogs> clickLogs = env.fromElements(
                "Mary,./home,2022-02-02 12:00:00",
                "Bob,./cart,2022-02-02 12:00:00",
                "Mary,./prod?id=1,2022-02-02 12:00:05",
                "Liz,./home,2022-02-02 12:01:00",
                "Bob,./prod?id=3,2022-02-02 12:01:30",
                "Mary,./prod?id=7,2022-02-02 12:01:45"
        ).map(event -> {
            String[] props = event.split(",");
            return ClickLogs
                    .builder()
                    .user(props[0])
                    .url(props[1])
                    .cTime(props[2])
                    .build();
        });
        //4. Convert the stream into a dynamic table
        Table table = tEnv.fromDataStream(clickLogs);
        //5. Query the unregistered table by concatenating the Table object into the SQL string
        //   (mind the spaces on both sides of the table reference)
        Table resultTable = tEnv.sqlQuery(
                "select `user`, count(url) as cnt from " + table + " group by `user`");
        //6. Execute and print
        resultTable.execute().print();
    }
}
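
Because the aggregation revises a user's count as each click arrives, both queries produce a changelog rather than an append-only stream. With the six sample rows and parallelism 1, resultTable.execute().print() emits output roughly like the following (+I inserts a result row, -U/+U retract and replace a previous one; exact column widths vary by version):

| op | user | cnt |
| +I | Mary |   1 |
| +I | Bob  |   1 |
| -U | Mary |   1 |
| +U | Mary |   2 |
| +I | Liz  |   1 |
| -U | Bob  |   1 |
| +U | Bob  |   2 |
| -U | Mary |   2 |
| +U | Mary |   3 |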




Flink SQL Standard Code Structure


The code structure of a Flink SQL program is identical to that of the Flink Table API; the core abstraction is the Table.

The following example uses the filesystem as both the source and the sink table:


package com.blink.sb.sql;

import org.apache.flink.table.api.*;

/**
 * Standard Flink SQL program structure
 */
public class FlinkSQLStandardStructure {
    public static void main(String[] args) {
        //1. Create a TableEnvironment
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);
        //2. Create the source table via DDL - executeSql registers the table automatically
        //   (no trailing semicolon inside the statement; mind the spaces between fragments)
        tEnv.executeSql("CREATE TABLE emp_info (" +
                "    emp_id INT," +
                "    name VARCHAR," +
                "    dept_id INT" +
                ") WITH (" +
                "    'connector' = 'filesystem'," +
                "    'path' = 'data/emp/input/'," +
                "    'format' = 'csv'" +
                ")");
        //3. Create the sink table via DDL
        tEnv.executeSql("CREATE TABLE emp_info_copy (" +
                "    emp_id INT," +
                "    name VARCHAR," +
                "    dept_id INT" +
                ") WITH (" +
                "    'connector' = 'filesystem'," +
                "    'path' = 'data/emp/output/'," +
                "    'format' = 'csv'" +
                ")");
        //4. Run the SQL query and write the result to the sink table
        Table resultTable = tEnv.sqlQuery("select * from emp_info where dept_id=10");
        tEnv.createTemporaryView("result", resultTable);
        tEnv.executeSql("INSERT INTO emp_info_copy " +
                "SELECT" +
                "   emp_id," +
                "   name," +
                "   dept_id " +
                "FROM result");
    }
}
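
To actually run this example, data/emp/input/ needs at least one CSV file matching the schema (emp_id, name, dept_id). A hypothetical data/emp/input/emp.csv with made-up rows:

1,Alice,10
2,Bob,20
3,Carol,10

With this input, rows 1 and 3 (dept_id = 10) would end up in data/emp/output/.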



Flink SQL Syntax Support



The SQL syntax supported by Flink SQL as of version 1.14 is documented at the following link:


https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/overview/
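
For example, Flink 1.14 supports the standard window table-valued functions. A sketch of a one-minute tumbling-window count over this article's click-log schema, assuming cTime were declared as a TIMESTAMP(3) event-time attribute instead of STRING:

SELECT window_start, window_end, `user`, COUNT(url) AS cnt
FROM TABLE(
    TUMBLE(TABLE sourceTable, DESCRIPTOR(cTime), INTERVAL '1' MINUTE))
GROUP BY window_start, window_end, `user`;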


Flink SQL Input and Output


Reading From and Writing To Kafka


Requirement: consume click logs (JSON) from Kafka, convert them to CSV, and write them back out to Kafka.




package com.blink.sb.sql;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Flink SQL job that consumes click logs (JSON) from Kafka
 * and writes them back to Kafka in CSV format
 */
public class FlinkSQLKafka2Kafka {
    public static final String input_topic = "clicklog_input";
    public static final String output_topic = "clicklog_output";

    public static void main(String[] args) throws Exception {
        //1. Create the TableEnvironment
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                StreamExecutionEnvironment.getExecutionEnvironment(), settings);
        //2. Create the source table via DDL - executeSql registers the table automatically
        tEnv.executeSql("CREATE TABLE sourceTable (" +
                "  `user` STRING," +
                "  `url` STRING," +
                "  `cTime` STRING" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = '" + input_topic + "'," +
                "  'properties.bootstrap.servers' = 'node02:6667'," +
                "  'properties.group.id' = 'test1'," +
                "  'scan.startup.mode' = 'latest-offset'," +
                "  'format' = 'json'" +
                ")");
        //3. Create the sink table via DDL
        tEnv.executeSql("CREATE TABLE sinkTable (" +
                "  `user` STRING," +
                "  `url` STRING," +
                "  `cTime` STRING" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = '" + output_topic + "'," +
                "  'properties.bootstrap.servers' = 'node02:6667'," +
                "  'format' = 'csv'" +
                ")");
        //4. Run the SQL query and write the result to the sink table
        tEnv.executeSql("INSERT INTO sinkTable " +
                "SELECT" +
                "   `user`," +
                "   url," +
                "   cTime " +
                "FROM sourceTable");
    }
}
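
As a sanity check (sample messages, not captured output): a JSON record produced to clicklog_input, such as

{"user":"Mary","url":"./home","cTime":"2022-02-02 12:00:00"}

should appear on clicklog_output re-encoded as CSV, roughly:

Mary,./home,2022-02-02 12:00:00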



Reading From Kafka, Writing To MySQL


Add the dependency required by the JDBC connector:


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>


Add the corresponding database driver:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.28</version>
</dependency>



Create the table in MySQL ahead of time:



CREATE TABLE test.sinkTable (
    `user` VARCHAR(100),
    `cnt` BIGINT,
    PRIMARY KEY (`user`)  -- required so that Flink's upsert writes update rows in place
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_general_ci;


The code:

package com.blink.sb.sql;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

/**
 * Flink SQL job that consumes click logs (JSON) from Kafka,
 * aggregates them, and writes the counts to MySQL
 */
public class FlinkSQLKafka2MySQL {
    public static final String input_topic = "clicklog_input";

    public static void main(String[] args) throws Exception {
        //1. Create the TableEnvironment
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);
        //2. Create the source table via DDL - executeSql registers the table automatically
        tEnv.executeSql("CREATE TABLE sourceTable (" +
                "  `user` STRING," +
                "  `url` STRING," +
                "  `cTime` STRING" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = '" + input_topic + "'," +
                "  'properties.bootstrap.servers' = 'node02:6667'," +
                "  'properties.group.id' = 'test1'," +
                "  'scan.startup.mode' = 'latest-offset'," +
                "  'format' = 'json'" +
                ")");
        //3. Create the sink table via DDL; the primary key switches the JDBC sink to upsert mode
        tEnv.executeSql("CREATE TABLE sinkTable (" +
                "  `user` STRING," +
                "  `cnt` BIGINT," +
                "  PRIMARY KEY (`user`) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'jdbc'," +
                "  'url' = 'jdbc:mysql://node01:3306/test'," +
                "  'username' = 'root'," +
                "  'password' = 'root%123'," +
                "  'table-name' = 'sinkTable'" +
                ")");
        //4. Run the aggregation and write the result to MySQL
        tEnv.executeSql("INSERT INTO sinkTable " +
                "SELECT" +
                "   `user`," +
                "   count(url) as cnt " +
                "FROM sourceTable " +
                "group by `user`");
    }
}
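
Because the sink table declares a primary key, the JDBC connector writes in upsert mode. For MySQL this effectively issues a statement of the following shape for each aggregated record (a sketch of the dialect's upsert statement, not code from this job), so each user's count is updated in place rather than appended:

INSERT INTO sinkTable (`user`, `cnt`) VALUES (?, ?)
ON DUPLICATE KEY UPDATE `cnt` = VALUES(`cnt`);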

