【Flink】(十)Flink Table API 和 Flink SQL 入门

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 【Flink】(十)Flink Table API 和 Flink SQL 入门

文章目录


一、需要引入的pom依赖

二、简单了解 Table API

2.1 动态表

2.2 字段

三、Table API 的窗口聚合操作

3.1 通过一个例子了解Table API

3.2 关于group by

3.3 关于时间窗口

四、SQL 如何编写


Table API 是流处理和批处理通用的关系型 API,Table API 可以基于流输入或者批输入来运行而不需要进行任何修改。Table API 是 SQL 语言的超集并专门为 Apache Flink 设计的,Table API 是 Scala 和 Java 语言集成式的 API。与常规 SQL 语言中将查询指定为字符串不同,Table API 查询是以 Java 或 Scala 中的语言嵌入样式来定义的,具有 IDE 支持如:自动完成和语法检测。


一、需要引入的pom依赖


<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-table_2.11</artifactId>
 <version>1.7.2</version>
</dependency>


二、简单了解 Table API


def main(args: Array[String]): Unit = {
 val env: StreamExecutionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment
 val myKafkaConsumer: FlinkKafkaConsumer011[String] = 
MyKafkaUtil.getConsumer("ECOMMERCE")
 val dstream: DataStream[String] = env.addSource(myKafkaConsumer)
 val tableEnv: StreamTableEnvironment = 
TableEnvironment.getTableEnvironment(env)
 val ecommerceLogDstream: DataStream[EcommerceLog] = dstream.map{ 
jsonString => JSON.parseObject(jsonString,classOf[EcommerceLog]) }
 val ecommerceLogTable: Table = 
 tableEnv.fromDataStream(ecommerceLogDstream)
 val table: Table = ecommerceLogTable.select("mid,ch").filter("ch='appstore'")
 val midchDataStream: DataStream[(String, String)] = 
table.toAppendStream[(String,String)]
 midchDataStream.print()
 env.execute()
}


2.1 动态表


如果流中的数据类型是 case class 可以直接根据 case class 的结构生成 table

tableEnv.fromDataStream(ecommerceLogDstream)


或者根据字段顺序单独命名

tableEnv.fromDataStream(ecommerceLogDstream,’mid,’uid .......)


最后的动态表可以转换为流进行输出

table.toAppendStream[(String,String)]


2.2 字段


用一个单引放到字段前面来标识字段名, 如 ‘name , ‘mid ,’amount 等


三、Table API 的窗口聚合操作


3.1 通过一个例子了解Table API


//每 10 秒中渠道为 appstore 的个数
def main(args: Array[String]): Unit = {
 //sparkcontext
 val env: StreamExecutionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment
 //时间特性改为 eventTime
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val myKafkaConsumer: FlinkKafkaConsumer011[String] = 
MyKafkaUtil.getConsumer("ECOMMERCE")
 val dstream: DataStream[String] = env.addSource(myKafkaConsumer)
 val ecommerceLogDstream: DataStream[EcommerceLog] = dstream.map{ jsonString 
=>JSON.parseObject(jsonString,classOf[EcommerceLog]) }
 //告知 watermark 和 eventTime 如何提取
 val ecommerceLogWithEventTimeDStream: DataStream[EcommerceLog] = 
ecommerceLogDstream.assignTimestampsAndWatermarks(new 
BoundedOutOfOrdernessTimestampExtractor[EcommerceLog](Time.seconds(0L)) {
 override def extractTimestamp(element: EcommerceLog): Long = {
 element.ts
 }
 }).setParallelism(1)
 val tableEnv: StreamTableEnvironment = 
TableEnvironment.getTableEnvironment(env)
 //把数据流转化成 Table
 val ecommerceTable: Table = 
tableEnv.fromDataStream(ecommerceLogWithEventTimeDStream , 
'mid,'uid,'appid,'area,'os,'ch,'logType,'vs,'logDate,'logHour,'logHourMinute,'ts.rowtime)
 //通过 table api 进行操作
 // 每 10 秒 统计一次各个渠道的个数 table api 解决
 //1 groupby 2 要用 window 3 用 eventtime 来确定开窗时间
 val resultTable: Table = ecommerceTable.
 window(Tumble over 10000.millis on 'ts as 'tt).groupBy('ch,'tt ).select( 'ch, 'ch.count)
 //把 Table 转化成数据流
 val resultDstream: DataStream[(Boolean, (String, Long))] = resultSQLTable.toRetractStream[(String,Long)]
 resultDstream.filter(_._1).print()
 env.execute()
}


3.2 关于group by


1.如果了使用 groupby,table 转换为流的时候只能用 toRetractDstream

val rDstream: DataStream[(Boolean, (String, Long))] = table
.toRetractStream[(String,Long)]


2.toRetractDstream 得到的第一个 boolean 型字段标识 true 就是最新的数据(Insert),false 表示过期老数据(Delete)

val rDstream: DataStream[(Boolean, (String, Long))] = table
.toRetractStream[(String,Long)]
 rDstream.filter(_._1).print()


3.如果使用的 api 包括时间窗口,那么窗口的字段必须出现在 groupBy 中。

val table: Table = ecommerceLogTable
.filter("ch ='appstore'")
.window(Tumble over 10000.millis on 'ts as 'tt)
.groupBy('ch ,'tt)
.select("ch,ch.count ")


3.3 关于时间窗口


1.用到时间窗口,必须提前声明时间字段,如果是 processTime 直接在创建动态表时进行追加就可以。

val ecommerceLogTable: Table = tableEnv
.fromDataStream( ecommerceLogWithEtDstream,
'mid,'uid,'appid,'area,'os,'ch,'logType,'vs,'logDate,'logHour,'logHourMinute,'ps.proctime)


2.如果是 EventTime 要在创建动态表时声明

val ecommerceLogTable: Table = tableEnv
.fromDataStream(ecommerceLogWithEtDstream,
'mid,'uid,'appid,'area,'os,'ch,'logType,'vs,'logDate,'logHour,'logHourMinute,'ts.rowtime)


3.滚动窗口可以使用 Tumble over 10000.millis on 来表示

val table: Table = ecommerceLogTable.filter("ch ='appstore'")
.window(Tumble over 10000.millis on 'ts as 'tt)
.groupBy('ch ,'tt)
.select("ch,ch.count ")


四、SQL 如何编写


def main(args: Array[String]): Unit = {
 //sparkcontext
 val env: StreamExecutionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment
 //时间特性改为 eventTime
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 val myKafkaConsumer: FlinkKafkaConsumer011[String] = 
MyKafkaUtil.getConsumer("ECOMMERCE")
 val dstream: DataStream[String] = env.addSource(myKafkaConsumer)
 val ecommerceLogDstream: DataStream[EcommerceLog] = dstream.map{ jsonString 
=>JSON.parseObject(jsonString,classOf[EcommerceLog]) }
 //告知 watermark 和 eventTime 如何提取
 val ecommerceLogWithEventTimeDStream: DataStream[EcommerceLog] = 
ecommerceLogDstream.assignTimestampsAndWatermarks(new 
BoundedOutOfOrdernessTimestampExtractor[EcommerceLog](Time.seconds(0L)) {
 override def extractTimestamp(element: EcommerceLog): Long = {
 element.ts
 }
 }).setParallelism(1)
 //SparkSession
 val tableEnv: StreamTableEnvironment = 
TableEnvironment.getTableEnvironment(env)
 //把数据流转化成 Table
 val ecommerceTable: Table = 
tableEnv.fromDataStream(ecommerceLogWithEventTimeDStream , 
'mid,'uid,'appid,'area,'os,'ch,'logType,'vs,'logDate,'logHour,'logHourMinute,'ts.rowtime)
 //通过 table api 进行操作
 // 每 10 秒 统计一次各个渠道的个数 table api 解决
 //1 groupby 2 要用 window 3 用 eventtime 来确定开窗时间
 val resultTable: Table = ecommerceTable
  .window(Tumble over 10000.millis on 'ts as 'tt)
  .groupBy('ch,'tt )
   .select( 'ch, 'ch.count)
// 通过 sql 进行操作
 val resultSQLTable : Table = tableEnv.sqlQuery( "select ch ,count(ch) from 
 "+ecommerceTable+" group by ch ,Tumble(ts,interval '10' SECOND )")
 //把 Table 转化成数据流
 //val appstoreDStream: DataStream[(String, String, Long)] = 
appstoreTable.toAppendStream[(String,String,Long)]
 val resultDstream: DataStream[(Boolean, (String, Long))] = 
resultSQLTable.toRetractStream[(String,Long)]
 resultDstream.filter(_._1).print()
 env.execute()
}


///

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
30天前
|
SQL 分布式计算 测试技术
Flink API的4个层次
【2月更文挑战第28天】
|
1月前
|
SQL 资源调度 Oracle
Flink CDC产品常见问题之sql运行中查看日志任务失败如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
1月前
|
消息中间件 SQL Kafka
如何高效接入 Flink: Connecter / Catalog API 核心设计与社区进展
本文整理自阿里云实时计算团队 Apache Flink Committer 和 PMC Member 任庆盛在 FFA 2023 核心技术专场(二)中的分享。
269 0
如何高效接入 Flink: Connecter / Catalog API 核心设计与社区进展
|
1月前
|
分布式计算 API 数据处理
Flink【基础知识 01】(简介+核心架构+分层API+集群架构+应用场景+特点优势)(一篇即可大概了解flink)
【2月更文挑战第15天】Flink【基础知识 01】(简介+核心架构+分层API+集群架构+应用场景+特点优势)(一篇即可大概了解flink)
56 1
|
1月前
|
SQL 关系型数据库 MySQL
Flink 提供了一种名为 Flink SQL 的查询语言,它支持多种数据库之间的 DDL 语句转换
【2月更文挑战第18天】Flink 提供了一种名为 Flink SQL 的查询语言,它支持多种数据库之间的 DDL 语句转换
171 2
|
1月前
|
SQL 存储 Apache
在 Apache Flink SQL 中,并没有内置的 GROUP_CONCAT 函数
【2月更文挑战第16天】在 Apache Flink SQL 中,并没有内置的 GROUP_CONCAT 函数
186 2
|
1月前
|
SQL 分布式计算 HIVE
基于 Kyuubi 实现分布式 Flink SQL 网关
本文整理自网易互娱资深开发工程师、Apache Kyuubi Committer 林小铂的《基于 Kyuubi 实现分布式 Flink SQL 网关》分享。
104432 64
基于 Kyuubi 实现分布式 Flink SQL 网关
|
2月前
|
SQL 数据采集 JSON
弱结构化日志 Flink SQL 怎么写?SLS SPL 来帮忙
弱结构化日志 Flink SQL 怎么写?SLS SPL 来帮忙
125151 136
|
2月前
|
SQL 监控 API
Flink SQL支持写判断语句
【2月更文挑战第8天】Flink SQL支持写判断语句
234 12
|
2月前
|
SQL 消息中间件 Kafka
flink问题之做实时数仓sql保证分topic区有序如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
704 3