【Flink】(十)Flink Table API 和 Flink SQL 入门

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 【Flink】(十)Flink Table API 和 Flink SQL 入门

文章目录


一、需要引入的pom依赖

二、简单了解 Table API

2.1 动态表

2.2 字段

三、Table API 的窗口聚合操作

3.1 通过一个例子了解Table API

3.2 关于group by

3.3 关于时间窗口

四、SQL 如何编写


Table API 是流处理和批处理通用的关系型 API,Table API 可以基于流输入或者批输入来运行而不需要进行任何修改。Table API 是 SQL 语言的超集并专门为 Apache Flink 设计的,Table API 是 Scala 和 Java 语言集成式的 API。与常规 SQL 语言中将查询指定为字符串不同,Table API 查询是以 Java 或 Scala 中的语言嵌入样式来定义的,具有 IDE 支持如:自动完成和语法检测。


一、需要引入的pom依赖


<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-table_2.11</artifactId>
 <version>1.7.2</version>
</dependency>


二、简单了解 Table API


def main(args: Array[String]): Unit = {
 val env: StreamExecutionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment
 val myKafkaConsumer: FlinkKafkaConsumer011[String] = 
MyKafkaUtil.getConsumer("ECOMMERCE")
 val dstream: DataStream[String] = env.addSource(myKafkaConsumer)
 val tableEnv: StreamTableEnvironment = 
TableEnvironment.getTableEnvironment(env)
 val ecommerceLogDstream: DataStream[EcommerceLog] = dstream.map{ 
jsonString => JSON.parseObject(jsonString,classOf[EcommerceLog]) }
 val ecommerceLogTable: Table = 
 tableEnv.fromDataStream(ecommerceLogDstream)
 val table: Table = ecommerceLogTable.select("mid,ch").filter("ch='appstore'")
 val midchDataStream: DataStream[(String, String)] = 
table.toAppendStream[(String,String)]
 midchDataStream.print()
 env.execute()
}


2.1 动态表


如果流中的数据类型是 case class 可以直接根据 case class 的结构生成 table

tableEnv.fromDataStream(ecommerceLogDstream)


或者根据字段顺序单独命名

tableEnv.fromDataStream(ecommerceLogDstream,’mid,’uid .......)


最后的动态表可以转换为流进行输出

table.toAppendStream[(String,String)]


2.2 字段


用一个单引放到字段前面来标识字段名, 如 ‘name , ‘mid ,’amount 等


三、Table API 的窗口聚合操作


3.1 通过一个例子了解Table API


//每 10 秒中渠道为 appstore 的个数
def main(args: Array[String]): Unit = {
 //sparkcontext
 val env: StreamExecutionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment
 //时间特性改为 eventTime
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val myKafkaConsumer: FlinkKafkaConsumer011[String] = 
MyKafkaUtil.getConsumer("ECOMMERCE")
 val dstream: DataStream[String] = env.addSource(myKafkaConsumer)
 val ecommerceLogDstream: DataStream[EcommerceLog] = dstream.map{ jsonString 
=>JSON.parseObject(jsonString,classOf[EcommerceLog]) }
 //告知 watermark 和 eventTime 如何提取
 val ecommerceLogWithEventTimeDStream: DataStream[EcommerceLog] = 
ecommerceLogDstream.assignTimestampsAndWatermarks(new 
BoundedOutOfOrdernessTimestampExtractor[EcommerceLog](Time.seconds(0L)) {
 override def extractTimestamp(element: EcommerceLog): Long = {
 element.ts
 }
 }).setParallelism(1)
 val tableEnv: StreamTableEnvironment = 
TableEnvironment.getTableEnvironment(env)
 //把数据流转化成 Table
 val ecommerceTable: Table = 
tableEnv.fromDataStream(ecommerceLogWithEventTimeDStream , 
'mid,'uid,'appid,'area,'os,'ch,'logType,'vs,'logDate,'logHour,'logHourMinute,'ts.rowtime)
 //通过 table api 进行操作
 // 每 10 秒 统计一次各个渠道的个数 table api 解决
 //1 groupby 2 要用 window 3 用 eventtime 来确定开窗时间
 val resultTable: Table = ecommerceTable.
 window(Tumble over 10000.millis on 'ts as 'tt).groupBy('ch,'tt ).select( 'ch, 'ch.count)
 //把 Table 转化成数据流
 val resultDstream: DataStream[(Boolean, (String, Long))] = resultSQLTable.toRetractStream[(String,Long)]
 resultDstream.filter(_._1).print()
 env.execute()
}


3.2 关于group by


1.如果了使用 groupby,table 转换为流的时候只能用 toRetractDstream

val rDstream: DataStream[(Boolean, (String, Long))] = table
.toRetractStream[(String,Long)]


2.toRetractDstream 得到的第一个 boolean 型字段标识 true 就是最新的数据(Insert),false 表示过期老数据(Delete)

val rDstream: DataStream[(Boolean, (String, Long))] = table
.toRetractStream[(String,Long)]
 rDstream.filter(_._1).print()


3.如果使用的 api 包括时间窗口,那么窗口的字段必须出现在 groupBy 中。

val table: Table = ecommerceLogTable
.filter("ch ='appstore'")
.window(Tumble over 10000.millis on 'ts as 'tt)
.groupBy('ch ,'tt)
.select("ch,ch.count ")


3.3 关于时间窗口


1.用到时间窗口,必须提前声明时间字段,如果是 processTime 直接在创建动态表时进行追加就可以。

val ecommerceLogTable: Table = tableEnv
.fromDataStream( ecommerceLogWithEtDstream,
'mid,'uid,'appid,'area,'os,'ch,'logType,'vs,'logDate,'logHour,'logHourMinute,'ps.proctime)


2.如果是 EventTime 要在创建动态表时声明

val ecommerceLogTable: Table = tableEnv
.fromDataStream(ecommerceLogWithEtDstream,
'mid,'uid,'appid,'area,'os,'ch,'logType,'vs,'logDate,'logHour,'logHourMinute,'ts.rowtime)


3.滚动窗口可以使用 Tumble over 10000.millis on 来表示

val table: Table = ecommerceLogTable.filter("ch ='appstore'")
.window(Tumble over 10000.millis on 'ts as 'tt)
.groupBy('ch ,'tt)
.select("ch,ch.count ")


四、SQL 如何编写


def main(args: Array[String]): Unit = {
 //sparkcontext
 val env: StreamExecutionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment
 //时间特性改为 eventTime
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 val myKafkaConsumer: FlinkKafkaConsumer011[String] = 
MyKafkaUtil.getConsumer("ECOMMERCE")
 val dstream: DataStream[String] = env.addSource(myKafkaConsumer)
 val ecommerceLogDstream: DataStream[EcommerceLog] = dstream.map{ jsonString 
=>JSON.parseObject(jsonString,classOf[EcommerceLog]) }
 //告知 watermark 和 eventTime 如何提取
 val ecommerceLogWithEventTimeDStream: DataStream[EcommerceLog] = 
ecommerceLogDstream.assignTimestampsAndWatermarks(new 
BoundedOutOfOrdernessTimestampExtractor[EcommerceLog](Time.seconds(0L)) {
 override def extractTimestamp(element: EcommerceLog): Long = {
 element.ts
 }
 }).setParallelism(1)
 //SparkSession
 val tableEnv: StreamTableEnvironment = 
TableEnvironment.getTableEnvironment(env)
 //把数据流转化成 Table
 val ecommerceTable: Table = 
tableEnv.fromDataStream(ecommerceLogWithEventTimeDStream , 
'mid,'uid,'appid,'area,'os,'ch,'logType,'vs,'logDate,'logHour,'logHourMinute,'ts.rowtime)
 //通过 table api 进行操作
 // 每 10 秒 统计一次各个渠道的个数 table api 解决
 //1 groupby 2 要用 window 3 用 eventtime 来确定开窗时间
 val resultTable: Table = ecommerceTable
  .window(Tumble over 10000.millis on 'ts as 'tt)
  .groupBy('ch,'tt )
   .select( 'ch, 'ch.count)
// 通过 sql 进行操作
 val resultSQLTable : Table = tableEnv.sqlQuery( "select ch ,count(ch) from 
 "+ecommerceTable+" group by ch ,Tumble(ts,interval '10' SECOND )")
 //把 Table 转化成数据流
 //val appstoreDStream: DataStream[(String, String, Long)] = 
appstoreTable.toAppendStream[(String,String,Long)]
 val resultDstream: DataStream[(Boolean, (String, Long))] = 
resultSQLTable.toRetractStream[(String,Long)]
 resultDstream.filter(_._1).print()
 env.execute()
}


///

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
打赏
0
0
0
0
89
分享
相关文章
Flink CDC YAML:面向数据集成的 API 设计
本文整理自阿里云智能集团 Flink PMC Member & Committer 徐榜江(雪尽)在 FFA 2024 分论坛的分享,涵盖四大主题:Flink CDC、YAML API、Transform + AI 和 Community。文章详细介绍了 Flink CDC 的发展历程及其优势,特别是 YAML API 的设计与实现,以及如何通过 Transform 和 AI 模型集成提升数据处理能力。最后,分享了社区动态和未来规划,欢迎更多开发者加入开源社区,共同推动 Flink CDC 的发展。
436 12
Flink CDC YAML:面向数据集成的 API 设计
|
6月前
|
RESTful API 设计与实现:C# 开发者的一分钟入门
【10月更文挑战第5天】本文从零开始,介绍了如何使用 C# 和 ASP.NET Core 设计并实现一个简单的 RESTful API。首先解释了 RESTful API 的概念及其核心原则,然后详细说明了设计 RESTful API 的关键步骤,包括资源识别、URI 设计、HTTP 方法选择、状态码使用和错误处理。最后,通过一个用户管理 API 的示例,演示了如何创建项目、定义模型、实现控制器及运行测试,帮助读者掌握 RESTful API 的开发技巧。
190 7
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是为应对传统数据处理框架中流批分离的问题而诞生的,它融合了SQL的简洁性和Flink的强大流批处理能力,降低了大数据处理门槛。其核心工作原理包括生成逻辑执行计划、查询优化和构建算子树,确保高效执行。Flink SQL 支持过滤、投影、聚合、连接和窗口等常用算子,实现了流批一体处理,极大提高了开发效率和代码复用性。通过统一的API和语法,Flink SQL 能够灵活应对实时和离线数据分析场景,为企业提供强大的数据处理能力。
320 26
如何让SQL速度飞起来 入门YashanDB优化器
优化器,SQL引擎的核心组成部分,是数据库中用于把关系表达式转换成最优执行计划的核心组件,影响数据库系统执行性能的关键组件之一。
40 15
|
4月前
|
Flink SQL Deduplication 去重以及如何获取最新状态操作
Flink SQL Deduplication 是一种高效的数据去重功能,支持多种数据类型和灵活的配置选项。它通过哈希表、时间窗口和状态管理等技术实现去重,适用于流处理和批处理场景。本文介绍了其特性、原理、实际案例及源码分析,帮助读者更好地理解和应用这一功能。
258 14
SQL数据库的使用指南:从入门到精通
随着信息技术的飞速发展,数据库已成为各类企业和组织不可或缺的一部分。作为最流行的数据库管理系统之一,SQL数据库广泛应用于各种场景,如数据存储、数据管理、数据分析等。本文将详细介绍SQL数据库的使用方法,帮助初学者快速入门,并帮助有经验的开发者深化理解。一、SQL数据库基础首先,我们需要理解SQL数
295 2
机器学习入门(五):KNN概述 | K 近邻算法 API,K值选择问题
机器学习入门(五):KNN概述 | K 近邻算法 API,K值选择问题
SQL数据库基础语法入门
[link](http://www.vvo.net.cn/post/082935.html)
|
6月前
|
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
112 0
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等