【Flink】(十)Flink Table API 和 Flink SQL 入门

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 【Flink】(十)Flink Table API 和 Flink SQL 入门

文章目录


一、需要引入的pom依赖

二、简单了解 Table API

2.1 动态表

2.2 字段

三、Table API 的窗口聚合操作

3.1 通过一个例子了解Table API

3.2 关于group by

3.3 关于时间窗口

四、SQL 如何编写


Table API 是流处理和批处理通用的关系型 API,Table API 可以基于流输入或者批输入来运行而不需要进行任何修改。Table API 是 SQL 语言的超集并专门为 Apache Flink 设计的,Table API 是 Scala 和 Java 语言集成式的 API。与常规 SQL 语言中将查询指定为字符串不同,Table API 查询是以 Java 或 Scala 中的语言嵌入样式来定义的,具有 IDE 支持如:自动完成和语法检测。


一、需要引入的pom依赖


<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-table_2.11</artifactId>
 <version>1.7.2</version>
</dependency>


二、简单了解 Table API


def main(args: Array[String]): Unit = {
 val env: StreamExecutionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment
 val myKafkaConsumer: FlinkKafkaConsumer011[String] = 
MyKafkaUtil.getConsumer("ECOMMERCE")
 val dstream: DataStream[String] = env.addSource(myKafkaConsumer)
 val tableEnv: StreamTableEnvironment = 
TableEnvironment.getTableEnvironment(env)
 val ecommerceLogDstream: DataStream[EcommerceLog] = dstream.map{ 
jsonString => JSON.parseObject(jsonString,classOf[EcommerceLog]) }
 val ecommerceLogTable: Table = 
 tableEnv.fromDataStream(ecommerceLogDstream)
 val table: Table = ecommerceLogTable.select("mid,ch").filter("ch='appstore'")
 val midchDataStream: DataStream[(String, String)] = 
table.toAppendStream[(String,String)]
 midchDataStream.print()
 env.execute()
}


2.1 动态表


如果流中的数据类型是 case class 可以直接根据 case class 的结构生成 table

tableEnv.fromDataStream(ecommerceLogDstream)


或者根据字段顺序单独命名

tableEnv.fromDataStream(ecommerceLogDstream,’mid,’uid .......)


最后的动态表可以转换为流进行输出

table.toAppendStream[(String,String)]


2.2 字段


用一个单引放到字段前面来标识字段名, 如 ‘name , ‘mid ,’amount 等


三、Table API 的窗口聚合操作


3.1 通过一个例子了解Table API


//每 10 秒中渠道为 appstore 的个数
def main(args: Array[String]): Unit = {
 //sparkcontext
 val env: StreamExecutionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment
 //时间特性改为 eventTime
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val myKafkaConsumer: FlinkKafkaConsumer011[String] = 
MyKafkaUtil.getConsumer("ECOMMERCE")
 val dstream: DataStream[String] = env.addSource(myKafkaConsumer)
 val ecommerceLogDstream: DataStream[EcommerceLog] = dstream.map{ jsonString 
=>JSON.parseObject(jsonString,classOf[EcommerceLog]) }
 //告知 watermark 和 eventTime 如何提取
 val ecommerceLogWithEventTimeDStream: DataStream[EcommerceLog] = 
ecommerceLogDstream.assignTimestampsAndWatermarks(new 
BoundedOutOfOrdernessTimestampExtractor[EcommerceLog](Time.seconds(0L)) {
 override def extractTimestamp(element: EcommerceLog): Long = {
 element.ts
 }
 }).setParallelism(1)
 val tableEnv: StreamTableEnvironment = 
TableEnvironment.getTableEnvironment(env)
 //把数据流转化成 Table
 val ecommerceTable: Table = 
tableEnv.fromDataStream(ecommerceLogWithEventTimeDStream , 
'mid,'uid,'appid,'area,'os,'ch,'logType,'vs,'logDate,'logHour,'logHourMinute,'ts.rowtime)
 //通过 table api 进行操作
 // 每 10 秒 统计一次各个渠道的个数 table api 解决
 //1 groupby 2 要用 window 3 用 eventtime 来确定开窗时间
 val resultTable: Table = ecommerceTable.
 window(Tumble over 10000.millis on 'ts as 'tt).groupBy('ch,'tt ).select( 'ch, 'ch.count)
 //把 Table 转化成数据流
 val resultDstream: DataStream[(Boolean, (String, Long))] = resultSQLTable.toRetractStream[(String,Long)]
 resultDstream.filter(_._1).print()
 env.execute()
}


3.2 关于group by


1.如果了使用 groupby,table 转换为流的时候只能用 toRetractDstream

val rDstream: DataStream[(Boolean, (String, Long))] = table
.toRetractStream[(String,Long)]


2.toRetractDstream 得到的第一个 boolean 型字段标识 true 就是最新的数据(Insert),false 表示过期老数据(Delete)

val rDstream: DataStream[(Boolean, (String, Long))] = table
.toRetractStream[(String,Long)]
 rDstream.filter(_._1).print()


3.如果使用的 api 包括时间窗口,那么窗口的字段必须出现在 groupBy 中。

val table: Table = ecommerceLogTable
.filter("ch ='appstore'")
.window(Tumble over 10000.millis on 'ts as 'tt)
.groupBy('ch ,'tt)
.select("ch,ch.count ")


3.3 关于时间窗口


1.用到时间窗口,必须提前声明时间字段,如果是 processTime 直接在创建动态表时进行追加就可以。

val ecommerceLogTable: Table = tableEnv
.fromDataStream( ecommerceLogWithEtDstream,
'mid,'uid,'appid,'area,'os,'ch,'logType,'vs,'logDate,'logHour,'logHourMinute,'ps.proctime)


2.如果是 EventTime 要在创建动态表时声明

val ecommerceLogTable: Table = tableEnv
.fromDataStream(ecommerceLogWithEtDstream,
'mid,'uid,'appid,'area,'os,'ch,'logType,'vs,'logDate,'logHour,'logHourMinute,'ts.rowtime)


3.滚动窗口可以使用 Tumble over 10000.millis on 来表示

val table: Table = ecommerceLogTable.filter("ch ='appstore'")
.window(Tumble over 10000.millis on 'ts as 'tt)
.groupBy('ch ,'tt)
.select("ch,ch.count ")


四、SQL 如何编写


def main(args: Array[String]): Unit = {
 //sparkcontext
 val env: StreamExecutionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment
 //时间特性改为 eventTime
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 val myKafkaConsumer: FlinkKafkaConsumer011[String] = 
MyKafkaUtil.getConsumer("ECOMMERCE")
 val dstream: DataStream[String] = env.addSource(myKafkaConsumer)
 val ecommerceLogDstream: DataStream[EcommerceLog] = dstream.map{ jsonString 
=>JSON.parseObject(jsonString,classOf[EcommerceLog]) }
 //告知 watermark 和 eventTime 如何提取
 val ecommerceLogWithEventTimeDStream: DataStream[EcommerceLog] = 
ecommerceLogDstream.assignTimestampsAndWatermarks(new 
BoundedOutOfOrdernessTimestampExtractor[EcommerceLog](Time.seconds(0L)) {
 override def extractTimestamp(element: EcommerceLog): Long = {
 element.ts
 }
 }).setParallelism(1)
 //SparkSession
 val tableEnv: StreamTableEnvironment = 
TableEnvironment.getTableEnvironment(env)
 //把数据流转化成 Table
 val ecommerceTable: Table = 
tableEnv.fromDataStream(ecommerceLogWithEventTimeDStream , 
'mid,'uid,'appid,'area,'os,'ch,'logType,'vs,'logDate,'logHour,'logHourMinute,'ts.rowtime)
 //通过 table api 进行操作
 // 每 10 秒 统计一次各个渠道的个数 table api 解决
 //1 groupby 2 要用 window 3 用 eventtime 来确定开窗时间
 val resultTable: Table = ecommerceTable
  .window(Tumble over 10000.millis on 'ts as 'tt)
  .groupBy('ch,'tt )
   .select( 'ch, 'ch.count)
// 通过 sql 进行操作
 val resultSQLTable : Table = tableEnv.sqlQuery( "select ch ,count(ch) from 
 "+ecommerceTable+" group by ch ,Tumble(ts,interval '10' SECOND )")
 //把 Table 转化成数据流
 //val appstoreDStream: DataStream[(String, String, Long)] = 
appstoreTable.toAppendStream[(String,String,Long)]
 val resultDstream: DataStream[(Boolean, (String, Long))] = 
resultSQLTable.toRetractStream[(String,Long)]
 resultDstream.filter(_._1).print()
 env.execute()
}


///

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
3月前
|
开发框架 .NET API
RESTful API 设计与实现:C# 开发者的一分钟入门
【10月更文挑战第5天】本文从零开始,介绍了如何使用 C# 和 ASP.NET Core 设计并实现一个简单的 RESTful API。首先解释了 RESTful API 的概念及其核心原则,然后详细说明了设计 RESTful API 的关键步骤,包括资源识别、URI 设计、HTTP 方法选择、状态码使用和错误处理。最后,通过一个用户管理 API 的示例,演示了如何创建项目、定义模型、实现控制器及运行测试,帮助读者掌握 RESTful API 的开发技巧。
92 7
|
4月前
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
214 15
|
22天前
|
SQL 存储 缓存
Flink SQL Deduplication 去重以及如何获取最新状态操作
Flink SQL Deduplication 是一种高效的数据去重功能,支持多种数据类型和灵活的配置选项。它通过哈希表、时间窗口和状态管理等技术实现去重,适用于流处理和批处理场景。本文介绍了其特性、原理、实际案例及源码分析,帮助读者更好地理解和应用这一功能。
98 14
|
3月前
|
SQL 存储 数据管理
SQL数据库的使用指南:从入门到精通
随着信息技术的飞速发展,数据库已成为各类企业和组织不可或缺的一部分。作为最流行的数据库管理系统之一,SQL数据库广泛应用于各种场景,如数据存储、数据管理、数据分析等。本文将详细介绍SQL数据库的使用方法,帮助初学者快速入门,并帮助有经验的开发者深化理解。一、SQL数据库基础首先,我们需要理解SQL数
172 2
|
3月前
|
机器学习/深度学习 算法 API
机器学习入门(五):KNN概述 | K 近邻算法 API,K值选择问题
机器学习入门(五):KNN概述 | K 近邻算法 API,K值选择问题
|
3月前
|
SQL 数据库
SQL数据库基础语法入门
[link](http://www.vvo.net.cn/post/082935.html)
|
3月前
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
64 0
|
4月前
|
SQL 安全 数据库
从入门到精通:Python Web安全守护指南,SQL注入、XSS、CSRF全防御!
【9月更文挑战第13天】在开发Python Web应用时,安全性至关重要。本文通过问答形式,详细介绍如何防范SQL注入、XSS及CSRF等常见威胁。通过使用参数化查询、HTML转义和CSRF令牌等技术,确保应用安全。附带示例代码,帮助读者从入门到精通Python Web安全。
106 6
|
3月前
|
SQL 安全 关系型数据库
SQL自动化注ru-SQLmap入门操作(一)
SQL自动化注ru-SQLmap入门操作(一)
|
3月前
|
SQL 安全 关系型数据库
SQL自动化注茹-SQLmap入门操作(二)
SQL自动化注茹-SQLmap入门操作(二)