【Flink】(十二)Flink Table API 和 Flink SQL 编程(更新中....)2

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 【Flink】(十二)Flink Table API 和 Flink SQL 编程(更新中....)2


五、读取文件创建表


TableEnvironment 可以调用.connect() 方法,连接外部系统,并调用.createTemporaryTable() 方法,在 Catalog 中注册表

tableEnv
  .connect(...)  // 定义表的数据来源,和外部系统建立连接
  .withFormat(...)  // 定义数据格式化方法
  .withSchema(...)  // 定义表结构
  .createTemporaryTable("MyTable")  // 创建临时表


可以创建Table来描述文件数据,它可以从文件中读取,或者将数据写入文件


image.png


可以看到,我们从txt文件中读出六条数据,并以三元组的形式进行输出。


image.png


六、读取Kafka数据创建表


消费Kafka数据


image.png


七、表的查询 - Table API & SQL


Table API 是集成在 Scala 和 Java 语言内的查询API


Table API 基于代表“表”的Table类,并提供一整套操作处理的方法API;这些方法会返回一个新的Table对象,表示对输入表应用转换操作的结果


有些关系型转换操作,可以由多个方法调用组成,构成链式调用结构

val sensorTable:Table = tableEnv.form("inputTable")
val resultTable:Table = sensorTable
  .select("id,temperature")
  .filter("id = 'sensor_1'")


image.png



测试结果如下:


image.png


true / false —> 表示数据是否是新增 or 撤回回收 。


SQL 查询示例:



八、表和流的相互转换


将 DataStream 转换成表


对于一个DataStream,可以直接转换成Table,进而方便地调用 Table API 做转换操作

val dataStream:DataStream[SensorReading] = ...
val sensorTable:Table = tableEnv.fromDataStream(dataStream)


默认转换后的 Table schema 和 DataStream 中的字段定义一一对应,也可以单独指定出来

val dataStream:DataStream[SensorReading] = ...
val sensorTable = tableEnv.fromDataStream(dataStream,
      'id,'timestamp,'temperature)


数据类型与Schema的对应


DataStream 中的数据类型,与表的 Schema 之间的对应关系,可以有两种:基于字段名称,或者基于字段位置


基于名称(name-based)

val sensorTable = tableEnv.formDataStream(
        'timestamp as 'ts,'id as 'myId,'temperature)


基于位置(position-based)


val sensorTable = tableEnv.from


创建临时视图(Temporary View)


基于 DataStream 创建临时视图

tableEnv.createTemporaryView("sensorView",dataStream)
tableEnv.create



相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
2月前
|
SQL 存储 大数据
Dataphin V5.0:支持创建异步调用API,实现慢 SQL 复杂计算的直连消费
本文介绍了数据服务产品中异步调用的应用场景与优势,包括大数据引擎查询、复杂SQL及大规模数据下载等场景,解决了同步调用可能导致的资源浪费和性能问题。通过创建异步API、测试发布以及权限申请等功能,实现高效稳定的服务提供。以电商订单查询为例,展示了如何利用异步调用提升系统性能与用户体验。
119 9
|
4月前
|
机器学习/深度学习 设计模式 API
Python 高级编程与实战:构建 RESTful API
本文深入探讨了使用 Python 构建 RESTful API 的方法,涵盖 Flask、Django REST Framework 和 FastAPI 三个主流框架。通过实战项目示例,详细讲解了如何处理 GET、POST 请求,并返回相应数据。学习这些技术将帮助你掌握构建高效、可靠的 Web API。
|
4月前
|
机器学习/深度学习 开发框架 API
Python 高级编程与实战:深入理解 Web 开发与 API 设计
在前几篇文章中,我们探讨了 Python 的基础语法、面向对象编程、函数式编程、元编程、性能优化、调试技巧以及数据科学和机器学习。本文将深入探讨 Python 在 Web 开发和 API 设计中的应用,并通过实战项目帮助你掌握这些技术。
|
4月前
|
SQL 人工智能 关系型数据库
Flink CDC YAML:面向数据集成的 API 设计
Flink CDC YAML:面向数据集成的 API 设计
100 5
|
5月前
|
SQL 人工智能 关系型数据库
Flink CDC YAML:面向数据集成的 API 设计
本文整理自阿里云智能集团 Flink PMC Member & Committer 徐榜江(雪尽)在 FFA 2024 分论坛的分享,涵盖四大主题:Flink CDC、YAML API、Transform + AI 和 Community。文章详细介绍了 Flink CDC 的发展历程及其优势,特别是 YAML API 的设计与实现,以及如何通过 Transform 和 AI 模型集成提升数据处理能力。最后,分享了社区动态和未来规划,欢迎更多开发者加入开源社区,共同推动 Flink CDC 的发展。
570 12
Flink CDC YAML:面向数据集成的 API 设计
|
9月前
|
IDE API 定位技术
Python--API编程:IP地址翻译成实际的物理地址
Python--API编程:IP地址翻译成实际的物理地址
159 0
|
10月前
|
网络协议 API Windows
MASM32编程调用 API函数RtlIpv6AddressToString,windows 10 容易,Windows 7 折腾
MASM32编程调用 API函数RtlIpv6AddressToString,windows 10 容易,Windows 7 折腾
|
11月前
|
API Java 数据库连接
从平凡到卓越:Hibernate Criteria API 让你的数据库查询瞬间高大上,彻底告别复杂SQL!
【8月更文挑战第31天】构建复杂查询是数据库应用开发中的常见需求。Hibernate 的 Criteria API 以其强大和灵活的特点,允许开发者以面向对象的方式构建查询逻辑,同时具备 SQL 的表达力。本文将介绍 Criteria API 的基本用法并通过示例展示其实际应用。此 API 通过 API 构建查询条件而非直接编写查询语句,提高了代码的可读性和安全性。无论是简单的条件过滤还是复杂的分页和连接查询,Criteria API 均能胜任,有助于提升开发效率和应用的健壮性。
334 0
|
SQL Apache 流计算
Flink table&SQL 的使用
Flink table&SQL 的使用
81 0
|
SQL 关系型数据库 MySQL
Flink教程(16)- Flink Table与SQL
Flink教程(16)- Flink Table与SQL
457 0

热门文章

最新文章