【Flink】(十二)Flink Table API 和 Flink SQL 编程(更新中....)2

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 【Flink】(十二)Flink Table API 和 Flink SQL 编程(更新中....)2


五、读取文件创建表


TableEnvironment 可以调用.connect() 方法,连接外部系统,并调用.createTemporaryTable() 方法,在 Catalog 中注册表

tableEnv
  .connect(...)  // 定义表的数据来源,和外部系统建立连接
  .withFormat(...)  // 定义数据格式化方法
  .withSchema(...)  // 定义表结构
  .createTemporaryTable("MyTable")  // 创建临时表


可以创建Table来描述文件数据,它可以从文件中读取,或者将数据写入文件


image.png


可以看到,我们从txt文件中读出六条数据,并以三元组的形式进行输出。


image.png


六、读取Kafka数据创建表


消费Kafka数据


image.png


七、表的查询 - Table API & SQL


Table API 是集成在 Scala 和 Java 语言内的查询API


Table API 基于代表“表”的Table类,并提供一整套操作处理的方法API;这些方法会返回一个新的Table对象,表示对输入表应用转换操作的结果


有些关系型转换操作,可以由多个方法调用组成,构成链式调用结构

val sensorTable:Table = tableEnv.form("inputTable")
val resultTable:Table = sensorTable
  .select("id,temperature")
  .filter("id = 'sensor_1'")


image.png



测试结果如下:


image.png


true / false —> 表示数据是否是新增 or 撤回回收 。


SQL 查询示例:



八、表和流的相互转换


将 DataStream 转换成表


对于一个DataStream,可以直接转换成Table,进而方便地调用 Table API 做转换操作

val dataStream:DataStream[SensorReading] = ...
val sensorTable:Table = tableEnv.fromDataStream(dataStream)


默认转换后的 Table schema 和 DataStream 中的字段定义一一对应,也可以单独指定出来

val dataStream:DataStream[SensorReading] = ...
val sensorTable = tableEnv.fromDataStream(dataStream,
      'id,'timestamp,'temperature)


数据类型与Schema的对应


DataStream 中的数据类型,与表的 Schema 之间的对应关系,可以有两种:基于字段名称,或者基于字段位置


基于名称(name-based)

val sensorTable = tableEnv.formDataStream(
        'timestamp as 'ts,'id as 'myId,'temperature)


基于位置(position-based)


val sensorTable = tableEnv.from


创建临时视图(Temporary View)


基于 DataStream 创建临时视图

tableEnv.createTemporaryView("sensorView",dataStream)
tableEnv.create



相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
3月前
|
安全 Java API
告别繁琐编码,拥抱Java 8新特性:Stream API与Optional类助你高效编程,成就卓越开发者!
【8月更文挑战第29天】Java 8为开发者引入了多项新特性,其中Stream API和Optional类尤其值得关注。Stream API对集合操作进行了高级抽象,支持声明式的数据处理,避免了显式循环代码的编写;而Optional类则作为非空值的容器,有效减少了空指针异常的风险。通过几个实战示例,我们展示了如何利用Stream API进行过滤与转换操作,以及如何借助Optional类安全地处理可能为null的数据,从而使代码更加简洁和健壮。
115 0
|
2月前
|
网络协议 API Windows
MASM32编程调用 API函数RtlIpv6AddressToString,windows 10 容易,Windows 7 折腾
MASM32编程调用 API函数RtlIpv6AddressToString,windows 10 容易,Windows 7 折腾
|
1月前
|
IDE API 定位技术
Python--API编程:IP地址翻译成实际的物理地址
Python--API编程:IP地址翻译成实际的物理地址
|
3月前
|
JavaScript API 开发者
RESTful API 设计的传奇征程:突破常规,拥抱最佳实践,铸就编程巅峰!
【8月更文挑战第7天】希望通过以上的探讨,能让您对 RESTful API 设计有更深入的理解和认识。
53 5
|
3月前
|
JSON API 数据库
神秘编程力量来袭!Rails 究竟隐藏着怎样的魔力,能构建出强大的 RESTful API?快来一探究竟!
【8月更文挑战第31天】《构建 RESTful API:使用 Rails 进行服务端开发》介绍了如何利用 Ruby on Rails 框架高效构建可扩展的 RESTful API。Rails 采用“约定优于配置”,简化开发流程,通过示例展示了路由定义、控制器设计及模型层交互等内容,帮助开发者快速搭建稳定可靠的服务端。无论小型项目还是大型应用,Rails 均能提供强大支持,提升开发效率。
29 0
|
4月前
|
SQL Oracle 关系型数据库
CREATE TABLE 时的 SQL FOREIGN KEY 约束
【7月更文挑战第24天】CREATE TABLE 时的 SQL FOREIGN KEY 约束。
56 5
|
4月前
|
SQL Oracle 关系型数据库
CREATE TABLE 时的 SQL FOREIGN KEY 约束
【7月更文挑战第19天】CREATE TABLE 时的 SQL FOREIGN KEY 约束
41 8
|
4月前
|
SQL Oracle 关系型数据库
ALTER TABLE 时的 SQL PRIMARY KEY 约束
【7月更文挑战第24天】ALTER TABLE 时的 SQL PRIMARY KEY 约束。
44 3
|
4月前
|
SQL Oracle 关系型数据库
CREATE TABLE 时的 SQL PRIMARY KEY 约束
【7月更文挑战第24天】CREATE TABLE 时的 SQL PRIMARY KEY 约束。
43 2
|
4月前
|
SQL 关系型数据库 MySQL
ALTER TABLE 时的 SQL DEFAULT 约束
【7月更文挑战第20天】ALTER TABLE 时的 SQL DEFAULT 约束。
42 1
下一篇
无影云桌面