Flink SQL Programming

Summary: querying a registered table, querying an unregistered table, the standard Flink SQL code structure, reading and writing Kafka with Flink SQL, and reading from Kafka and writing to MySQL with Flink SQL.

Querying a Registered Table


package com.blink.sb.sql;

import com.blink.sb.beans.ClickLogs;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Flink SQL query against a registered table.
 */
public class FlinkSQLQueryRegedTable {
    public static void main(String[] args) throws Exception {
        //1. Get the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2. Create the table execution environment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        env.setParallelism(1);
        //3. Read the data
        DataStream<ClickLogs> clickLogs = env.fromElements(
                "Mary,./home,2022-02-02 12:00:00",
                "Bob,./cart,2022-02-02 12:00:00",
                "Mary,./prod?id=1,2022-02-02 12:00:05",
                "Liz,./home,2022-02-02 12:01:00",
                "Bob,./prod?id=3,2022-02-02 12:01:30",
                "Mary,./prod?id=7,2022-02-02 12:01:45"
        ).map(event -> {
            String[] props = event.split(",");
            return ClickLogs
                    .builder()
                    .user(props[0])
                    .url(props[1])
                    .cTime(props[2])
                    .build();
        });
        //4. Convert the stream into a dynamic table
        Table table = tEnv.fromDataStream(clickLogs);
        //5. Register the table
        tEnv.createTemporaryView("sourceTable", table);
        //6. Query the registered table with SQL (user is a reserved keyword in Flink SQL, hence the backticks)
        Table resultTable = tEnv.sqlQuery("select `user`,count(url) as cnt from sourceTable group by `user`");
        //7. Convert the result back to a stream and print (or sink) it
        tEnv.toChangelogStream(resultTable).print();
        //8. Execute the job
        env.execute("FlinkSQLQueryRegedTable");
    }
}
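
The ClickLogs bean is not shown in the original post. A minimal sketch of what it might look like, assuming Lombok provides the builder and getters used above:

package com.blink.sb.beans;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Click-log event bean (assumed structure; the original class is not part of the post).
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ClickLogs {
    private String user;   // user name
    private String url;    // visited URL
    private String cTime;  // click time, e.g. "2022-02-02 12:00:00"
}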



Querying an Unregistered Table


package com.blink.sb.sql;

import com.blink.sb.beans.ClickLogs;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Flink SQL query against an unregistered table.
 */
public class FlinkSQLQueryUnregTable {
    public static void main(String[] args) throws Exception {
        //1. Get the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2. Create the table execution environment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        env.setParallelism(1);
        //3. Read the data
        DataStream<ClickLogs> clickLogs = env.fromElements(
                "Mary,./home,2022-02-02 12:00:00",
                "Bob,./cart,2022-02-02 12:00:00",
                "Mary,./prod?id=1,2022-02-02 12:00:05",
                "Liz,./home,2022-02-02 12:01:00",
                "Bob,./prod?id=3,2022-02-02 12:01:30",
                "Mary,./prod?id=7,2022-02-02 12:01:45"
        ).map(event -> {
            String[] props = event.split(",");
            return ClickLogs
                    .builder()
                    .user(props[0])
                    .url(props[1])
                    .cTime(props[2])
                    .build();
        });
        //4. Convert the stream into a dynamic table
        Table table = tEnv.fromDataStream(clickLogs);
        //5. Query the unregistered table: the Table object is concatenated into the SQL string
        //   (note the spaces around it), which registers it under a generated name
        Table resultTable = tEnv.sqlQuery("select `user`,count(url) as cnt from " + table + " group by `user`");
        //6. Execute and print the result
        resultTable.execute().print();
    }
}
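
With parallelism 1 the result is deterministic. Because the query is a continuous aggregation, execute().print() emits a changelog: each new click for a user retracts the previous count (-U) and inserts the updated one (+U). The output should look roughly like the following (column widths and exact formatting vary by Flink version):

+----+------+-----+
| op | user | cnt |
+----+------+-----+
| +I | Mary |   1 |
| +I | Bob  |   1 |
| -U | Mary |   1 |
| +U | Mary |   2 |
| +I | Liz  |   1 |
| -U | Bob  |   1 |
| +U | Bob  |   2 |
| -U | Mary |   2 |
| +U | Mary |   3 |
+----+------+-----+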




Standard Flink SQL Code Structure


The code structure of a Flink SQL program is exactly the same as that of the Flink Table API; the core abstraction is the Table.

The following example uses the filesystem as both the source table and the sink table:


package com.blink.sb.sql;

import org.apache.flink.table.api.*;

/**
 * Standard structure of a Flink SQL program.
 */
public class FlinkSQLStandardStructure {
    public static void main(String[] args) {
        //1. Create the TableEnvironment
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);
        //2. Create the source table (DDL statement); executeSql registers the table automatically.
        //   Do not end the statement with a semicolon, and mind the spaces when concatenating.
        tEnv.executeSql("CREATE TABLE emp_info (" +
                "    emp_id INT," +
                "    name VARCHAR," +
                "    dept_id INT" +
                ") WITH (" +
                "    'connector' = 'filesystem'," +
                "    'path' = 'data/emp/input/'," +
                "    'format' = 'csv'" +
                ")");
        //3. Create the sink table (DDL), again executed with executeSql
        tEnv.executeSql("CREATE TABLE emp_info_copy (" +
                "    emp_id INT," +
                "    name VARCHAR," +
                "    dept_id INT" +
                ") WITH (" +
                "    'connector' = 'filesystem'," +
                "    'path' = 'data/emp/output/'," +
                "    'format' = 'csv'" +
                ")");
        //4. Run the SQL query and write the result to the sink
        Table resultTable = tEnv.sqlQuery("select * from emp_info where dept_id=10");
        tEnv.createTemporaryView("result", resultTable);
        tEnv.executeSql("INSERT INTO emp_info_copy " +
                "SELECT" +
                "   emp_id," +
                "   name," +
                "   dept_id " +
                "FROM `result`");
    }
}
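
For this job to produce any output, data/emp/input/ must already contain CSV files that match the declared schema. A hypothetical input file, say data/emp/input/emp.csv, could look like this (the rows with dept_id=10 are copied to data/emp/output/):

1,Tom,10
2,Jerry,20
3,Anna,10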



Flink SQL Syntax Support



The SQL syntax supported by Flink SQL as of version 1.14 is documented at the following link:


https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/overview/


Flink SQL Input and Output


Reading from and Writing to Kafka


Requirement: consume click logs (JSON) from Kafka, convert them to CSV format, and write them back to Kafka.
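
This example additionally needs the Kafka connector and the JSON/CSV format artifacts on the classpath. Assuming Flink 1.14 and the same Scala 2.11 suffix as the JDBC dependency shown later in this post, the Maven dependencies would presumably look like this:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>${flink.version}</version>
</dependency>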




package com.blink.sb.sql;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Flink SQL job that consumes click logs (JSON) from Kafka and writes them back to Kafka as CSV.
 */
public class FlinkSQLKafka2Kafka {
    public static final String input_topic = "clicklog_input";
    public static final String output_topic = "clicklog_output";

    public static void main(String[] args) throws Exception {
        //1. Create the TableEnvironment
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                StreamExecutionEnvironment.getExecutionEnvironment(), settings);
        //2. Create the source table (DDL statement); executeSql registers the table automatically
        tEnv.executeSql("CREATE TABLE sourceTable (" +
                "  `user` STRING," +
                "  `url` STRING," +
                "  `cTime` STRING" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = '" + input_topic + "'," +
                "  'properties.bootstrap.servers' = 'node02:6667'," +
                "  'properties.group.id' = 'test1'," +
                "  'scan.startup.mode' = 'latest-offset'," +
                "  'format' = 'json'" +
                ")");
        //3. Create the sink table (DDL)
        tEnv.executeSql("CREATE TABLE sinkTable (" +
                "  `user` STRING," +
                "  `url` STRING," +
                "  `cTime` STRING" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = '" + output_topic + "'," +
                "  'properties.bootstrap.servers' = 'node02:6667'," +
                "  'format' = 'csv'" +
                ")");
        //4. Run the SQL query and write the result to the sink
        tEnv.executeSql("INSERT INTO sinkTable " +
                "SELECT" +
                "   `user`," +
                "   url," +
                "   cTime " +
                "FROM sourceTable");
    }
}
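
To try the pipeline end to end you can use the standard Kafka console tools against the broker configured above (older Kafka versions need --broker-list instead of --bootstrap-server); the topic names are the ones defined in the job:

# Produce a JSON click-log event to the input topic
kafka-console-producer.sh --bootstrap-server node02:6667 --topic clicklog_input
>{"user":"Mary","url":"./home","cTime":"2022-02-02 12:00:00"}

# In another terminal, watch the CSV records written to the output topic
kafka-console-consumer.sh --bootstrap-server node02:6667 --topic clicklog_output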



Reading from Kafka and Writing to MySQL


Add the dependency required by the JDBC connector:


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>


Add the corresponding database driver:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.28</version>
</dependency>



Create the target table in MySQL in advance:



CREATE TABLE test.sinkTable (
  `user` varchar(100),
  `cnt`  BIGINT
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_general_ci;


The code:

package com.blink.sb.sql;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

/**
 * Flink SQL job that consumes click logs (JSON) from Kafka, aggregates them, and writes the result to MySQL.
 */
public class FlinkSQLKafka2MySQL {
    public static final String input_topic = "clicklog_input";

    public static void main(String[] args) throws Exception {
        //1. Create the TableEnvironment
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);
        //2. Create the source table (DDL statement); executeSql registers the table automatically
        tEnv.executeSql("CREATE TABLE sourceTable (" +
                "  `user` STRING," +
                "  `url` STRING," +
                "  `cTime` STRING" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = '" + input_topic + "'," +
                "  'properties.bootstrap.servers' = 'node02:6667'," +
                "  'properties.group.id' = 'test1'," +
                "  'scan.startup.mode' = 'latest-offset'," +
                "  'format' = 'json'" +
                ")");
        //3. Create the sink table (DDL); the primary key lets the JDBC sink work in upsert mode.
        //   (A non-aggregating variant that sinks user/url/cTime directly via JDBC works the same way.)
        tEnv.executeSql("CREATE TABLE sinkTable (" +
                "  `user` STRING," +
                "  `cnt` BIGINT," +
                "  PRIMARY KEY (`user`) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'jdbc'," +
                "  'url' = 'jdbc:mysql://node01:3306/test'," +
                "  'username' = 'root'," +
                "  'password' = 'root%123'," +
                "  'table-name' = 'sinkTable'" +
                ")");
        //4. Run the aggregation and write the result to MySQL
        tEnv.executeSql("INSERT INTO sinkTable " +
                "SELECT" +
                "   `user`," +
                "   count(url) as cnt " +
                "FROM sourceTable " +
                "group by `user`");
    }
}
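
Once the job is running and a few JSON events have been produced to clicklog_input, the per-user counts can be checked directly in MySQL, for example:

SELECT `user`, cnt FROM test.sinkTable;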

