【Flink】(十)Flink Table API 和 Flink SQL 入门

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: 【Flink】(十)Flink Table API 和 Flink SQL 入门

文章目录


一、需要引入的pom依赖

二、简单了解 Table API

2.1 动态表

2.2 字段

三、Table API 的窗口聚合操作

3.1 通过一个例子了解Table API

3.2 关于group by

3.3 关于时间窗口

四、SQL 如何编写


Table API 是流处理和批处理通用的关系型 API,Table API 可以基于流输入或者批输入来运行而不需要进行任何修改。Table API 是 SQL 语言的超集并专门为 Apache Flink 设计的,Table API 是 Scala 和 Java 语言集成式的 API。与常规 SQL 语言中将查询指定为字符串不同,Table API 查询是以 Java 或 Scala 中的语言嵌入样式来定义的,具有 IDE 支持如:自动完成和语法检测。


一、需要引入的pom依赖


<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-table_2.11</artifactId>
 <version>1.7.2</version>
</dependency>


二、简单了解 Table API


def main(args: Array[String]): Unit = {
 val env: StreamExecutionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment
 val myKafkaConsumer: FlinkKafkaConsumer011[String] = 
MyKafkaUtil.getConsumer("ECOMMERCE")
 val dstream: DataStream[String] = env.addSource(myKafkaConsumer)
 val tableEnv: StreamTableEnvironment = 
TableEnvironment.getTableEnvironment(env)
 val ecommerceLogDstream: DataStream[EcommerceLog] = dstream.map{ 
jsonString => JSON.parseObject(jsonString,classOf[EcommerceLog]) }
 val ecommerceLogTable: Table = 
 tableEnv.fromDataStream(ecommerceLogDstream)
 val table: Table = ecommerceLogTable.select("mid,ch").filter("ch='appstore'")
 val midchDataStream: DataStream[(String, String)] = 
table.toAppendStream[(String,String)]
 midchDataStream.print()
 env.execute()
}


2.1 动态表


如果流中的数据类型是 case class 可以直接根据 case class 的结构生成 table

tableEnv.fromDataStream(ecommerceLogDstream)


或者根据字段顺序单独命名

tableEnv.fromDataStream(ecommerceLogDstream,’mid,’uid .......)


最后的动态表可以转换为流进行输出

table.toAppendStream[(String,String)]


2.2 字段


用一个单引放到字段前面来标识字段名, 如 ‘name , ‘mid ,’amount 等


三、Table API 的窗口聚合操作


3.1 通过一个例子了解Table API


//每 10 秒中渠道为 appstore 的个数
def main(args: Array[String]): Unit = {
 //sparkcontext
 val env: StreamExecutionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment
 //时间特性改为 eventTime
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val myKafkaConsumer: FlinkKafkaConsumer011[String] = 
MyKafkaUtil.getConsumer("ECOMMERCE")
 val dstream: DataStream[String] = env.addSource(myKafkaConsumer)
 val ecommerceLogDstream: DataStream[EcommerceLog] = dstream.map{ jsonString 
=>JSON.parseObject(jsonString,classOf[EcommerceLog]) }
 //告知 watermark 和 eventTime 如何提取
 val ecommerceLogWithEventTimeDStream: DataStream[EcommerceLog] = 
ecommerceLogDstream.assignTimestampsAndWatermarks(new 
BoundedOutOfOrdernessTimestampExtractor[EcommerceLog](Time.seconds(0L)) {
 override def extractTimestamp(element: EcommerceLog): Long = {
 element.ts
 }
 }).setParallelism(1)
 val tableEnv: StreamTableEnvironment = 
TableEnvironment.getTableEnvironment(env)
 //把数据流转化成 Table
 val ecommerceTable: Table = 
tableEnv.fromDataStream(ecommerceLogWithEventTimeDStream , 
'mid,'uid,'appid,'area,'os,'ch,'logType,'vs,'logDate,'logHour,'logHourMinute,'ts.rowtime)
 //通过 table api 进行操作
 // 每 10 秒 统计一次各个渠道的个数 table api 解决
 //1 groupby 2 要用 window 3 用 eventtime 来确定开窗时间
 val resultTable: Table = ecommerceTable.
 window(Tumble over 10000.millis on 'ts as 'tt).groupBy('ch,'tt ).select( 'ch, 'ch.count)
 //把 Table 转化成数据流
 val resultDstream: DataStream[(Boolean, (String, Long))] = resultSQLTable.toRetractStream[(String,Long)]
 resultDstream.filter(_._1).print()
 env.execute()
}


3.2 关于group by


1.如果了使用 groupby,table 转换为流的时候只能用 toRetractDstream

val rDstream: DataStream[(Boolean, (String, Long))] = table
.toRetractStream[(String,Long)]


2.toRetractDstream 得到的第一个 boolean 型字段标识 true 就是最新的数据(Insert),false 表示过期老数据(Delete)

val rDstream: DataStream[(Boolean, (String, Long))] = table
.toRetractStream[(String,Long)]
 rDstream.filter(_._1).print()


3.如果使用的 api 包括时间窗口,那么窗口的字段必须出现在 groupBy 中。

val table: Table = ecommerceLogTable
.filter("ch ='appstore'")
.window(Tumble over 10000.millis on 'ts as 'tt)
.groupBy('ch ,'tt)
.select("ch,ch.count ")


3.3 关于时间窗口


1.用到时间窗口,必须提前声明时间字段,如果是 processTime 直接在创建动态表时进行追加就可以。

val ecommerceLogTable: Table = tableEnv
.fromDataStream( ecommerceLogWithEtDstream,
'mid,'uid,'appid,'area,'os,'ch,'logType,'vs,'logDate,'logHour,'logHourMinute,'ps.proctime)


2.如果是 EventTime 要在创建动态表时声明

val ecommerceLogTable: Table = tableEnv
.fromDataStream(ecommerceLogWithEtDstream,
'mid,'uid,'appid,'area,'os,'ch,'logType,'vs,'logDate,'logHour,'logHourMinute,'ts.rowtime)


3.滚动窗口可以使用 Tumble over 10000.millis on 来表示

val table: Table = ecommerceLogTable.filter("ch ='appstore'")
.window(Tumble over 10000.millis on 'ts as 'tt)
.groupBy('ch ,'tt)
.select("ch,ch.count ")


四、SQL 如何编写


def main(args: Array[String]): Unit = {
 //sparkcontext
 val env: StreamExecutionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment
 //时间特性改为 eventTime
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 val myKafkaConsumer: FlinkKafkaConsumer011[String] = 
MyKafkaUtil.getConsumer("ECOMMERCE")
 val dstream: DataStream[String] = env.addSource(myKafkaConsumer)
 val ecommerceLogDstream: DataStream[EcommerceLog] = dstream.map{ jsonString 
=>JSON.parseObject(jsonString,classOf[EcommerceLog]) }
 //告知 watermark 和 eventTime 如何提取
 val ecommerceLogWithEventTimeDStream: DataStream[EcommerceLog] = 
ecommerceLogDstream.assignTimestampsAndWatermarks(new 
BoundedOutOfOrdernessTimestampExtractor[EcommerceLog](Time.seconds(0L)) {
 override def extractTimestamp(element: EcommerceLog): Long = {
 element.ts
 }
 }).setParallelism(1)
 //SparkSession
 val tableEnv: StreamTableEnvironment = 
TableEnvironment.getTableEnvironment(env)
 //把数据流转化成 Table
 val ecommerceTable: Table = 
tableEnv.fromDataStream(ecommerceLogWithEventTimeDStream , 
'mid,'uid,'appid,'area,'os,'ch,'logType,'vs,'logDate,'logHour,'logHourMinute,'ts.rowtime)
 //通过 table api 进行操作
 // 每 10 秒 统计一次各个渠道的个数 table api 解决
 //1 groupby 2 要用 window 3 用 eventtime 来确定开窗时间
 val resultTable: Table = ecommerceTable
  .window(Tumble over 10000.millis on 'ts as 'tt)
  .groupBy('ch,'tt )
   .select( 'ch, 'ch.count)
// 通过 sql 进行操作
 val resultSQLTable : Table = tableEnv.sqlQuery( "select ch ,count(ch) from 
 "+ecommerceTable+" group by ch ,Tumble(ts,interval '10' SECOND )")
 //把 Table 转化成数据流
 //val appstoreDStream: DataStream[(String, String, Long)] = 
appstoreTable.toAppendStream[(String,String,Long)]
 val resultDstream: DataStream[(Boolean, (String, Long))] = 
resultSQLTable.toRetractStream[(String,Long)]
 resultDstream.filter(_._1).print()
 env.execute()
}


///

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
|
3月前
|
SQL 分布式计算 大数据
SparkSQL 入门指南:小白也能懂的大数据 SQL 处理神器
在大数据处理的领域,SparkSQL 是一种非常强大的工具,它可以让开发人员以 SQL 的方式处理和查询大规模数据集。SparkSQL 集成了 SQL 查询引擎和 Spark 的分布式计算引擎,使得我们可以在分布式环境下执行 SQL 查询,并能利用 Spark 的强大计算能力进行数据分析。
|
9月前
|
SQL 人工智能 关系型数据库
Flink CDC YAML:面向数据集成的 API 设计
本文整理自阿里云智能集团 Flink PMC Member & Committer 徐榜江(雪尽)在 FFA 2024 分论坛的分享,涵盖四大主题:Flink CDC、YAML API、Transform + AI 和 Community。文章详细介绍了 Flink CDC 的发展历程及其优势,特别是 YAML API 的设计与实现,以及如何通过 Transform 和 AI 模型集成提升数据处理能力。最后,分享了社区动态和未来规划,欢迎更多开发者加入开源社区,共同推动 Flink CDC 的发展。
707 12
Flink CDC YAML:面向数据集成的 API 设计
|
9月前
|
SQL 存储 API
Flink Materialized Table:构建流批一体 ETL
本文整理自阿里云智能集团 Apache Flink Committer 刘大龙老师在2024FFA流批一体论坛的分享,涵盖三部分内容:数据工程师用户故事、Materialized Table 构建流批一体 ETL 及 Demo。文章通过案例分析传统 Lambda 架构的挑战,介绍了 Materialized Table 如何简化流批处理,提供统一 API 和声明式 ETL,实现高效的数据处理和维护。最后展示了基于 Flink 和 Paimon 的实际演示,帮助用户更好地理解和应用这一技术。
794 7
Flink Materialized Table:构建流批一体 ETL
|
8月前
|
SQL 人工智能 关系型数据库
Flink CDC YAML:面向数据集成的 API 设计
Flink CDC YAML:面向数据集成的 API 设计
342 5
|
8月前
|
SQL 存储 API
Flink Materialized Table:构建流批一体 ETL
Flink Materialized Table:构建流批一体 ETL
153 3
|
10月前
|
SQL 存储 机器学习/深度学习
如何让SQL速度飞起来 入门YashanDB优化器
优化器,SQL引擎的核心组成部分,是数据库中用于把关系表达式转换成最优执行计划的核心组件,影响数据库系统执行性能的关键组件之一。
133 15
|
SQL 存储 数据管理
SQL数据库的使用指南:从入门到精通
随着信息技术的飞速发展,数据库已成为各类企业和组织不可或缺的一部分。作为最流行的数据库管理系统之一,SQL数据库广泛应用于各种场景,如数据存储、数据管理、数据分析等。本文将详细介绍SQL数据库的使用方法,帮助初学者快速入门,并帮助有经验的开发者深化理解。一、SQL数据库基础首先,我们需要理解SQL数
595 2
|
SQL 安全 数据库
从入门到精通:Python Web安全守护指南,SQL注入、XSS、CSRF全防御!
【9月更文挑战第13天】在开发Python Web应用时,安全性至关重要。本文通过问答形式,详细介绍如何防范SQL注入、XSS及CSRF等常见威胁。通过使用参数化查询、HTML转义和CSRF令牌等技术,确保应用安全。附带示例代码,帮助读者从入门到精通Python Web安全。
259 6
|
SQL 数据库
SQL数据库基础语法入门
[link](http://www.vvo.net.cn/post/082935.html)
|
SQL 安全 关系型数据库
SQL自动化注ru-SQLmap入门操作(一)
SQL自动化注ru-SQLmap入门操作(一)
下一篇
oss云网关配置