【Flink】(十二)Flink Table API 和 Flink SQL 编程(更新中....)2

简介: 【Flink】(十二)Flink Table API 和 Flink SQL 编程(更新中....)2


五、读取文件创建表


TableEnvironment 可以调用.connect() 方法,连接外部系统,并调用.createTemporaryTable() 方法,在 Catalog 中注册表

tableEnv
  .connect(...)  // 定义表的数据来源,和外部系统建立连接
  .withFormat(...)  // 定义数据格式化方法
  .withSchema(...)  // 定义表结构
  .createTemporaryTable("MyTable")  // 创建临时表


可以创建Table来描述文件数据,它可以从文件中读取,或者将数据写入文件


image.png


可以看到,我们从txt文件中读出六条数据,并以三元组的形式进行输出。


image.png


六、读取Kafka数据创建表


消费Kafka数据


image.png


七、表的查询 - Table API & SQL


Table API 是集成在 Scala 和 Java 语言内的查询API


Table API 基于代表“表”的Table类,并提供一整套操作处理的方法API;这些方法会返回一个新的Table对象,表示对输入表应用转换操作的结果


有些关系型转换操作,可以由多个方法调用组成,构成链式调用结构

val sensorTable:Table = tableEnv.form("inputTable")
val resultTable:Table = sensorTable
  .select("id,temperature")
  .filter("id = 'sensor_1'")


image.png



测试结果如下:


image.png


true / false —> 表示数据是否是新增 or 撤回回收 。


SQL 查询示例:



八、表和流的相互转换


将 DataStream 转换成表


对于一个DataStream,可以直接转换成Table,进而方便地调用 Table API 做转换操作

val dataStream:DataStream[SensorReading] = ...
val sensorTable:Table = tableEnv.fromDataStream(dataStream)


默认转换后的 Table schema 和 DataStream 中的字段定义一一对应,也可以单独指定出来

val dataStream:DataStream[SensorReading] = ...
val sensorTable = tableEnv.fromDataStream(dataStream,
      'id,'timestamp,'temperature)


数据类型与Schema的对应


DataStream 中的数据类型,与表的 Schema 之间的对应关系,可以有两种:基于字段名称,或者基于字段位置


基于名称(name-based)

val sensorTable = tableEnv.formDataStream(
        'timestamp as 'ts,'id as 'myId,'temperature)


基于位置(position-based)


val sensorTable = tableEnv.from


创建临时视图(Temporary View)


基于 DataStream 创建临时视图

tableEnv.createTemporaryView("sensorView",dataStream)
tableEnv.create



相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
|
9月前
|
SQL 存储 大数据
Dataphin V5.0:支持创建异步调用API,实现慢 SQL 复杂计算的直连消费
本文介绍了数据服务产品中异步调用的应用场景与优势,包括大数据引擎查询、复杂SQL及大规模数据下载等场景,解决了同步调用可能导致的资源浪费和性能问题。通过创建异步API、测试发布以及权限申请等功能,实现高效稳定的服务提供。以电商订单查询为例,展示了如何利用异步调用提升系统性能与用户体验。
385 9
|
存储 API 网络架构
【Azure 存储服务】调用REST API获取Stroage Account Table中所有的Entity计数 -- Count
【Azure 存储服务】调用REST API获取Stroage Account Table中所有的Entity计数 -- Count
180 1
|
11月前
|
SQL 数据库连接 Linux
数据库编程:在PHP环境下使用SQL Server的方法。
看看你吧,就像一个调皮的小丑鱼在一片广阔的数据库海洋中游弋,一路上吞下大小数据如同海中的珍珠。不管有多少难关,只要记住这个流程,剩下的就只是探索未知的乐趣,沉浸在这个充满挑战的数据库海洋中。
342 16
|
SQL 人工智能 关系型数据库
Flink CDC YAML:面向数据集成的 API 设计
本文整理自阿里云智能集团 Flink PMC Member & Committer 徐榜江(雪尽)在 FFA 2024 分论坛的分享,涵盖四大主题:Flink CDC、YAML API、Transform + AI 和 Community。文章详细介绍了 Flink CDC 的发展历程及其优势,特别是 YAML API 的设计与实现,以及如何通过 Transform 和 AI 模型集成提升数据处理能力。最后,分享了社区动态和未来规划,欢迎更多开发者加入开源社区,共同推动 Flink CDC 的发展。
803 12
Flink CDC YAML:面向数据集成的 API 设计
|
SQL 存储 API
Flink Materialized Table:构建流批一体 ETL
本文整理自阿里云智能集团 Apache Flink Committer 刘大龙老师在2024FFA流批一体论坛的分享,涵盖三部分内容:数据工程师用户故事、Materialized Table 构建流批一体 ETL 及 Demo。文章通过案例分析传统 Lambda 架构的挑战,介绍了 Materialized Table 如何简化流批处理,提供统一 API 和声明式 ETL,实现高效的数据处理和维护。最后展示了基于 Flink 和 Paimon 的实际演示,帮助用户更好地理解和应用这一技术。
924 7
Flink Materialized Table:构建流批一体 ETL
|
11月前
|
SQL 人工智能 关系型数据库
Flink CDC YAML:面向数据集成的 API 设计
Flink CDC YAML:面向数据集成的 API 设计
553 5
|
11月前
|
SQL 存储 API
Flink Materialized Table:构建流批一体 ETL
Flink Materialized Table:构建流批一体 ETL
234 3
|
SQL 数据挖掘 Python
数据分析编程:SQL,Python or SPL?
数据分析编程用什么,SQL、python or SPL?话不多说,直接上代码,对比明显,明眼人一看就明了:本案例涵盖五个数据分析任务:1) 计算用户会话次数;2) 球员连续得分分析;3) 连续三天活跃用户数统计;4) 新用户次日留存率计算;5) 股价涨跌幅分析。每个任务基于相应数据表进行处理和计算。
|
存储 JSON API
【Azure 存储服务】使用REST API操作Azure Storage Table,删除数据(Delete Entity)
【Azure 存储服务】使用REST API操作Azure Storage Table,删除数据(Delete Entity)
174 0
【Azure 存储服务】使用REST API操作Azure Storage Table,删除数据(Delete Entity)
|
API Java 数据库连接
从平凡到卓越:Hibernate Criteria API 让你的数据库查询瞬间高大上,彻底告别复杂SQL!
【8月更文挑战第31天】构建复杂查询是数据库应用开发中的常见需求。Hibernate 的 Criteria API 以其强大和灵活的特点,允许开发者以面向对象的方式构建查询逻辑,同时具备 SQL 的表达力。本文将介绍 Criteria API 的基本用法并通过示例展示其实际应用。此 API 通过 API 构建查询条件而非直接编写查询语句,提高了代码的可读性和安全性。无论是简单的条件过滤还是复杂的分页和连接查询,Criteria API 均能胜任,有助于提升开发效率和应用的健壮性。
511 0

热门文章

最新文章