Flink-Table-SQL系列之source

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: source作为Table&SQL API的数据源,同时也是程序的入口。当前Flink的Table&SQL API整体而言支持三种source:Table source、DataSet以及DataStream,它们都通过特定的API注册到Table环境对象。

source作为Table&SQL API的数据源,同时也是程序的入口。当前Flink的Table&SQL API整体而言支持三种source:Table source、DataSet以及DataStream,它们都通过特定的API注册到Table环境对象。

我们先来看Table source,它直接以表对象作为source。这里的表对象可细分为:

  • Flink以Table类定义的关系表对象,通过TableEnvironment的registerTable方法注册;
  • 外部source经过桥接而成的表对象,基础抽象为TableSource,通过具体环境对象的registerTableSource;

下图展示了,Table source被注册时,对应的内部转化图(虚线表示对应关系):

Table-source-inner-map

由上图可见,不管是直接注册Table对象还是注册外部source,在内部都直接对应了特定的XXXTable对象。

TableSource trait针对Streaming和Batch分部扩展有两个trait,它们是StreamTableSource和BatchTableSource,它们各自都提供了从数据源转换为核心对象(DataStream跟DataSource)的方法。

除了这三个基本的trait之外,还有一些特定对source的需求以独立的trait提供以方便实现者自行组合,比如ProjectableTableSource这一trait,它支持将Projection下推(push-down)到TableSource。Flink内置实现的CsvTableSource就继承了这一trait。

当前Flink所支持的TableSource大致上分为两类:

  • CsvTableSouce:同时可用于Batch跟Streaming模式;
  • kafka系列TableSource:包含Kafka的各个版本(0.8,0.9,0.10)以及各种不同的格式(Json、Avro),基本上它们只支持Streaming模式,它们都依赖于各种kafka的connector;

使用方式如下:

// specify JSON field names and types
val typeInfo = Types.ROW(
  Array("id", "name", "score"),
  Array(Types.INT, Types.STRING, Types.DOUBLE)
)

val kafkaTableSource = new Kafka08JsonTableSource(
    kafkaTopic,
    kafkaProperties,
    typeInfo)
tableEnvironment.registerTableSource("kafka-source", kafkaTableSource);

CsvTableSource的构建方式如下:

val csvTableSource = CsvTableSource
    .builder
    .path("/path/to/your/file.csv")
    .field("name", Types.STRING)
    .field("id", Types.INT)
    .field("score", Types.DOUBLE)
    .field("comments", Types.STRING)
    .fieldDelimiter("#")
    .lineDelimiter("$")
    .ignoreFirstLine
    .ignoreParseErrors
    .commentPrefix("%")

除了以TableSource作为Table&SQL的source,还支持通过特定的环境对象直接注册DataStream、DataSet。注册DataStream的示例如下:

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

val cust = env.fromElements(...)
val ord = env.fromElements(...)

// register the DataStream cust as table "Customers" with fields derived from the datastream
tableEnv.registerDataStream("Customers", cust)

// register the DataStream ord as table "Orders" with fields user, product, and amount
tableEnv.registerDataStream("Orders", ord, 'user, 'product, 'amount)

注册DataSet的示例如下:

val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

val cust = env.fromElements(...)
val ord = env.fromElements(...)

// register the DataSet cust as table "Customers" with fields derived from the dataset
tableEnv.registerDataSet("Customers", cust)

// register the DataSet ord as table "Orders" with fields user, product, and amount
tableEnv.registerDataSet("Orders", ord, 'user, 'product, 'amount)

以上,通过调用环境对象的registerXXX方法是一种显式注册的方式,除此之外,还有隐式注册方式。隐式注册方式,通过对DataStream跟DataSet对象增加的toTable方法来实现,使用方式示例如下:

val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// read a DataSet from an external source
val ds: DataSet[(Long, String, Integer)] = env.readCsvFile(...)

val table = ds.toTable(tableEnv, 'user, 'product, 'amount)
val result = tableEnv.sql(
  s"SELECT SUM(amount) FROM $table WHERE product LIKE '%Rubber%'")

我们知道DataStream跟DataSet原先是没有toTable API的,如何为它们增加该API的呢?答案是利用了Scala的包对象(package object),该特性主要用于兼容旧版本的库或对某些类型的API进行增强。具体而言,toTable API其实是实现在DataSetConversions和DataStreamConversions两个类中,然后在包对象中对他们进行实例化。而定位到toTable的实现时,会看到它们其实是间接调用了特定环境对象的fromDataStream/fromDataSet方法并将当前的DataStream跟DataSet传递给这两个方法并通过方法返回得到Table对象。fromDataStream/fromDataSet方法对在实现时会调用跟registerDataStream/registerDataSet方法对相同的内部注册方法。

fromDataStream/fromDataSet方法通常主要的场景在于为DataStream/DataSet转换为Table对象提供便利,它本身也进行了隐式注册。然而,你也可以对得到的Table对象,再次调用registerTable来进行显式注册,不过通常没有必要。

因此,综合而言,注册DataStream跟DataSet的对应关系如下:

DataSet-DataStream-inner-table-mapping

以上我们已经分析了所有的Table source的注册方式,有多种register系列方法并最终对应了内部各种XXXTable对象。稍显混乱,其实这些XXXTable对象是有联系的,并且所有的register系列方法最终都调用了TableEnvironment的registerTableInternal方法。因此其实注册Table source的内部原理是一致的,我们来分析一下。

TableEnvironment内部会以一个SchemaPlus类型的数据结构,它是Calcite中的数据结构,用来存储被注册的表、函数等在内的一系列对象(这些对象统称为Calcite中的Schema)。由此可见它无法直接接受Flink自定义的类似于TableSouce这样的对象,那么这里存在一个问题就是两个框架的衔接问题。这就是Flink定义那么多内部XXXTable类型的原因之一,我们来梳理一下它们之间的关系如下:

table-source-mapping-and-relationship

上图中的XXXTable对象同时以括号标明了在注册时它是由什么对象转化而来。


原文发布时间为:2017-06-12

本文作者:vinoYang

本文来自云栖社区合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
1月前
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
129 15
|
13天前
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
37 0
|
1月前
|
SQL 安全 数据处理
揭秘数据脱敏神器:Flink SQL的神秘力量,守护你的数据宝藏!
【9月更文挑战第7天】在大数据时代,数据管理和处理尤为重要,尤其在保障数据安全与隐私方面。本文探讨如何利用Flink SQL实现数据脱敏,为实时数据处理提供有效的隐私保护方案。数据脱敏涉及在处理、存储或传输前对敏感数据进行加密、遮蔽或替换,以遵守数据保护法规(如GDPR)。Flink SQL通过内置函数和表达式支持这一过程。
66 2
|
1月前
|
SQL 大数据 数据处理
奇迹降临!解锁 Flink SQL 简单高效的终极秘籍,开启数据处理的传奇之旅!
【9月更文挑战第7天】在大数据处理领域,Flink SQL 因其强大功能与简洁语法成为开发者首选。本文分享了编写高效 Flink SQL 的实用技巧:理解数据特征及业务需求;灵活运用窗口函数(如 TUMBLE 和 HOP);优化连接操作,优先采用等值连接;合理选择数据类型以减少计算资源消耗。结合实际案例(如实时电商数据分析),并通过定期性能测试与调优,助力开发者在大数据处理中更得心应手,挖掘更多价值信息。
40 1
|
2月前
|
SQL 流计算
Flink SQL 在快手实践问题之由于meta信息变化导致的state向前兼容问题如何解决
Flink SQL 在快手实践问题之由于meta信息变化导致的state向前兼容问题如何解决
44 1
|
2月前
|
SQL 安全 流计算
Flink SQL 在快手实践问题之Group Window Aggregate 中的数据倾斜问题如何解决
Flink SQL 在快手实践问题之Group Window Aggregate 中的数据倾斜问题如何解决
67 1
|
2月前
|
SQL 资源调度 流计算
慢sql治理问题之在 Flink 中, userjar 分发问题如何优化
慢sql治理问题之在 Flink 中, userjar 分发问题如何优化
|
2月前
|
SQL 设计模式 数据处理
Flink SQL 在快手实践问题之状态兼容的终极方案特点内容如何解决
Flink SQL 在快手实践问题之状态兼容的终极方案特点内容如何解决
20 0
|
2月前
|
SQL 存储 流计算
Flink SQL 在快手实践问题之表示 Mini-Batch hint如何解决
Flink SQL 在快手实践问题之表示 Mini-Batch hint如何解决
35 0