FlinkSQL编程

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
简介: 查询已注册表查询未注册表Flink sql 标准代码结果Flink sql读写kafkaFlink sql去kafka写mysql

查询已注册表


packagecom.blink.sb.sql;
importcom.blink.sb.beans.ClickLogs;
importorg.apache.flink.streaming.api.datastream.DataStream;
importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
importorg.apache.flink.table.api.Table;
importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;
importorg.apache.flink.types.Row;
/*** Flink SQL查询已注册的表*/publicclassFlinkSQLQueryRegedTable {
publicstaticvoidmain(String[] args) throwsException {
//1、获取Stream执行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
//2、创建表执行环境StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env);
env.setParallelism(1);
//3、读取数据DataStream<ClickLogs>clickLogs=env.fromElements(
"Mary,./home,2022-02-02 12:00:00",
"Bob,./cart,2022-02-02 12:00:00",
"Mary,./prod?id=1,2022-02-02 12:00:05",
"Liz,./home,2022-02-02 12:01:00",
"Bob,./prod?id=3,2022-02-02 12:01:30",
"Mary,./prod?id=7,2022-02-02 12:01:45"        ).map(event-> {
String[] props=event.split(",");
returnClickLogs                    .builder()
                    .user(props[0])
                    .url(props[1])
                    .cTime(props[2])
                    .build();
        });
//4、流转换为动态表Tabletable=tEnv.fromDataStream(clickLogs);
//5、注册表tEnv.createTemporaryView("sourceTable",table);
//6、SQL查询已注册表TableresultTable=tEnv.sqlQuery("select user,count(url) as cnt from sourceTable group by user");
//7、转换成流直接打印或者输出tEnv.toChangelogStream(resultTable).print();
//8、执行任务env.execute("FlinkSQLQueryUnregTable");
    }
}



查询未注册表


packagecom.blink.sb.sql;
importcom.blink.sb.beans.ClickLogs;
importorg.apache.flink.streaming.api.datastream.DataStream;
importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
importorg.apache.flink.table.api.Table;
importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/*** Flink SQL查询未注册的表*/publicclassFlinkSQLQueryUnregTable {
publicstaticvoidmain(String[] args) throwsException {
//1、获取Stream执行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
//2、创建表执行环境StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env);
env.setParallelism(1);
//3、读取数据DataStream<ClickLogs>clickLogs=env.fromElements(
"Mary,./home,2022-02-02 12:00:00",
"Bob,./cart,2022-02-02 12:00:00",
"Mary,./prod?id=1,2022-02-02 12:00:05",
"Liz,./home,2022-02-02 12:01:00",
"Bob,./prod?id=3,2022-02-02 12:01:30",
"Mary,./prod?id=7,2022-02-02 12:01:45"        ).map(event-> {
String[] props=event.split(",");
returnClickLogs                    .builder()
                    .user(props[0])
                    .url(props[1])
                    .cTime(props[2])
                    .build();
        });
//4、流转换为动态表Tabletable=tEnv.fromDataStream(clickLogs);
//5、SQL查询未注册表(注意table两边得有空格)TableresultTable=tEnv.sqlQuery("select user,count(url) as cnt from "+table+" group by user");
//6、执行并打印resultTable.execute().print();
    }
}




Flink SQL标准代码结构


Flink SQL的代码结构跟Flink Table API⼀模⼀样,核⼼就是Table。

这⾥以⽂件系统作为source和sink table举例如下


packagecom.blink.sb.sql;
importorg.apache.flink.table.api.*;
/*** Flink SQL标准结构*/publicclassFlinkSQLStandardStructure {
publicstaticvoidmain(String[] args) {
//1、创建TableEnvironmentEnvironmentSettingssettings=EnvironmentSettings                .newInstance()
                .build();
TableEnvironmenttEnv=TableEnvironment.create(settings);
//2、创建source table(DDL语句)-会自动注册表的tEnv.executeSql("CREATE TABLE emp_info ("+"    emp_id INT,"+"    name VARCHAR,"+"    dept_id INT"+") WITH ("+"    'connector' = 'filesystem',"+"    'path' = 'data/emp/input/',"+"    'format' = 'csv'"+")");//最后不要有分号,注意空格//3、创建sink table(DDL)//executeSql执行tEnv.executeSql("CREATE TABLE emp_info_copy ("+"    emp_id INT,"+"    name VARCHAR,"+"    dept_id INT"+") WITH ("+"    'connector' = 'filesystem',"+"    'path' = 'data/emp/output/',"+"    'format' = 'csv'"+")");
//4、执行SQL查询并输出结果TableresultTable=tEnv.sqlQuery("select * from emp_info where dept_id=10");
tEnv.createTemporaryView("result",resultTable);
tEnv.executeSql("INSERT INTO emp_info_copy "+"SELECT"+"   emp_id,"+"   name,"+"   dept_id "+"FROM result");
    }
}



Flink SQL 语法支持



Flink SQL截⽌到1.14⽀持的SQL语法详⻅如下链接:


https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/overview/


Flink SQL输入和输出


读写kafka


需求:从kafka消费点击⽇志(JSON),转化为CSV格式之后输出到kafka




packagecom.blink.sb.sql;
importorg.apache.flink.streaming.api.datastream.DataStream;
importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
importorg.apache.flink.table.api.EnvironmentSettings;
importorg.apache.flink.table.api.Table;
importorg.apache.flink.table.api.TableEnvironment;
importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;
importorg.apache.flink.types.Row;
importjava.awt.event.TextEvent;
/*** 以Flink SQL方式从kafka消费点击日志(JSON),转化为CSV格式之后输出到Kafka*/publicclassFlinkSQLKafka2Kafka {
publicstaticfinalStringinput_topic="clicklog_input";
publicstaticfinalStringoutput_topic="clicklog_output";
publicstaticvoidmain(String[] args) throwsException {
//1、创建TableEnvironmentEnvironmentSettingssettings=EnvironmentSettings                .newInstance()
                .build();
StreamTableEnvironmenttEnv=StreamTableEnvironment.create(StreamExecutionEnvironment.getExecutionEnvironment(), settings);
//2、创建source table(DDL语句)-会自动注册表的tEnv.executeSql("CREATE TABLE sourceTable ("+"  `user` STRING,"+"  `url` STRING,"+"  `cTime` STRING"+") WITH ("+"  'connector' = 'kafka',"+"  'topic' = '"+input_topic+"',"+"  'properties.bootstrap.servers' = 'node02:6667',"+"  'properties.group.id' = 'test1',"+"  'scan.startup.mode' = 'latest-offset',"+"  'format' = 'json'"+")");
//3、创建sink table(DDL)tEnv.executeSql("CREATE TABLE sinkTable ("+"  `user` STRING,"+"  `url` STRING,"+"  `cTime` STRING"+") WITH ("+"  'connector' = 'kafka',"+"  'topic' = '"+output_topic+"',"+"  'properties.bootstrap.servers' = 'node02:6667',"+"  'format' = 'csv'"+")");
//4、执行SQL查询并输出结果tEnv.executeSql("INSERT INTO sinkTable "+"SELECT"+"   user,"+"   url,"+"   cTime "+"FROM sourceTable");
    }
}



读kafka写mysql


引入jdbc connector需要的依赖库:


<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.11</artifactId><version>${flink.version}</version></dependency>


添加对应的数据库驱动:

<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.28</version></dependency>



提前在mysql中建表



CREATETABLEtest.sinkTable (
`user`varchar(100) ,
`cnt`BIGINT)
ENGINE=InnoDBDEFAULTCHARSET=utf8COLLATE=utf8_general_ci;


相关代码:

packagecom.blink.sb.sql;
importorg.apache.flink.table.api.EnvironmentSettings;
importorg.apache.flink.table.api.TableEnvironment;
importjava.math.BigInteger;
/*** 以Flink SQL方式从kafka消费点击日志(JSON),统计之后输出到MySQL*/publicclassFlinkSQLKafka2MySQL {
publicstaticfinalStringinput_topic="clicklog_input";
publicstaticvoidmain(String[] args) throwsException {
//1、创建TableEnvironmentEnvironmentSettingssettings=EnvironmentSettings                .newInstance()
                .build();
TableEnvironmenttEnv=TableEnvironment.create(settings);
//2、创建source table(DDL语句)-会自动注册表的tEnv.executeSql("CREATE TABLE sourceTable ("+"  `user` STRING,"+"  `url` STRING,"+"  `cTime` STRING"+") WITH ("+"  'connector' = 'kafka',"+"  'topic' = '"+input_topic+"',"+"  'properties.bootstrap.servers' = 'node02:6667',"+"  'properties.group.id' = 'test1',"+"  'scan.startup.mode' = 'latest-offset',"+"  'format' = 'json'"+")");
//3、创建sink table(DDL)//        tEnv.executeSql("CREATE TABLE sinkTable (" +//                "  `user` STRING," +//                "  `url` STRING," +//                "  `cTime` STRING" +//                ") WITH (" +//                "  'connector' = 'jdbc'," +//                "  'url' = 'jdbc:mysql://node01:3306/test'," +//                "  'username' = 'root'," +//                "  'password' = 'root%123'," +//                "  'table-name' = 'sinkTable'" +//                ")");tEnv.executeSql("CREATE TABLE sinkTable ("+"  `user` STRING,"+"  `cnt` BIGINT,"+"  PRIMARY KEY (`user`) NOT ENFORCED"+") WITH ("+"  'connector' = 'jdbc',"+"  'url' = 'jdbc:mysql://node01:3306/test',"+"  'username' = 'root',"+"  'password' = 'root%123',"+"  'table-name' = 'sinkTable'"+")");
//4、执行SQL查询并输出结果//        tEnv.executeSql("INSERT INTO sinkTable " +//                "SELECT" +//                "   user," +//                "   url," +//                "   cTime " +//                "FROM sourceTable");tEnv.executeSql("INSERT INTO sinkTable "+"SELECT"+"   user,"+"   count(url) as cnt "+"FROM sourceTable "+"group by user");
    }
}


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
3月前
|
SQL 分布式计算 数据处理
FlinkSQL开发经验分享
FlinkSQL开发经验分享
|
2月前
|
资源调度 关系型数据库 MySQL
【Flink on YARN + CDC 3.0】神操作!看完这篇教程,你也能成为数据流处理高手!从零开始,一步步教会你在Flink on YARN模式下如何配置Debezium CDC 3.0,让你的数据库变更数据瞬间飞起来!
【8月更文挑战第15天】随着Apache Flink的普及,企业广泛采用Flink on YARN部署流处理应用,高效利用集群资源。变更数据捕获(CDC)工具在现代数据栈中至关重要,能实时捕捉数据库变化并转发给下游系统处理。本文以Flink on YARN为例,介绍如何在Debezium CDC 3.0中配置MySQL连接器,实现数据流处理。首先确保YARN上已部署Flink集群,接着安装Debezium MySQL连接器并配置Kafka Connect。最后,创建Flink任务消费变更事件并提交任务到Flink集群。通过这些步骤,可以构建出从数据库变更到实时处理的无缝数据管道。
150 2
|
5月前
|
SQL 存储 缓存
大厂 5 年实时数据开发经验总结,Flink SQL 看这篇就够了!
大厂 5 年实时数据开发经验总结,Flink SQL 看这篇就够了!
285 58
|
5月前
|
分布式计算 监控 API
flink 入门编程day02
flink 入门编程day02
|
5月前
|
Java 数据库连接 API
Flink报错问题之用Tumble窗口函数报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
SQL 消息中间件 存储
Flink SQL 核心概念剖析与编程案例实战
本文使用了 Docker 镜像快速安装一些基础组件,zk 和 kafka,并通过案例的方式,剖析了 SQL 的概念与详细的使用方式
|
SQL 移动开发 并行计算
不需要编写代码,也能成为Hive SQL面试高手?ChatGPT告诉你...
当你面对 Hive SQL 面试时,不仅需要掌握 SQL 语言的基本知识,还需要熟练掌握 Hive SQL 的一些高级特性,比如窗口函数、分区等等。对于初学者而言,写出高效的 Hive SQL 代码往往是一件困难的事情,而这恰恰是面试官最为看重的。但是,你不必担心!现在,有一种神奇的工具——ChatGPT,可以帮助你快速生成 Hive SQL 代码,解决你在面试中遇到的各种难题。本文将会介绍如何使用 ChatGPT 生成 Hive SQL 代码,让你在面试中轻松成为 Hive SQL 面试高手,无需编写代码也能毫不费力地完成面试题。 让我们一起来看看吧!
|
SQL 数据处理 调度
Exactly Once 语义在 Flink 中的实现|青训营笔记
本篇文章主要讲述了Flink是如何实现在分布式环境下,对于task的处理做到exactly-once的语义的(结合二阶段提交协议)
152 0
Exactly Once 语义在 Flink 中的实现|青训营笔记
|
SQL API 流计算
Flink1.13架构全集| 一文带你由浅入深精通Flink方方面面(三)SQL篇(三)
Flink1.13架构全集| 一文带你由浅入深精通Flink方方面面(三)SQL篇
682 0
Flink1.13架构全集| 一文带你由浅入深精通Flink方方面面(三)SQL篇(三)
|
SQL 存储 消息中间件
Flink1.13架构全集| 一文带你由浅入深精通Flink方方面面(三)SQL篇(一)
Flink1.13架构全集| 一文带你由浅入深精通Flink方方面面(三)SQL篇
361 0
Flink1.13架构全集| 一文带你由浅入深精通Flink方方面面(三)SQL篇(一)