Flink关系型API的公共部分

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 关系型程序的公共部分 下面的代码段展示了Table&SQL API所编写流式程序的程序模式: val env = StreamExecutionEnvironment.getExecutionEnvironment //创建TableEnvironment对象 val tableEnv = TableEnvironment.

关系型程序的公共部分

下面的代码段展示了Table&SQL API所编写流式程序的程序模式:

val env = StreamExecutionEnvironment.getExecutionEnvironment

//创建TableEnvironment对象
val tableEnv = TableEnvironment.getTableEnvironment(env)

//注册表
tableEnv.registerTable("table1", ...)           //或者
tableEnv.registerTableSource("table2", ...)     //或者
tableEnv.registerExternalCatalog("extCat", ...) 

//基于Table API的查询创建Table对象
val tapiResult = tableEnv.scan("table1").select(...)
//从SQL查询创建Table
val sqlResult  = tableEnv.sql("SELECT ... FROM table2 ...")

//将Table API的查询到的结果表输出到TableSink,SQL查询到的结果表同样如此
tapiResult.writeToSink(...)

//触发执行
env.execute()

通过分步解读以上代码段,我们可以发现一个关系型的Flink程序大致分为如下几步:

  • 构建环境对象
  • 注册表、catalog相关的信息(source部分)
  • 调用Table&SQL API创建表、对表进行查询
  • 得到结果表的数据并输出(sink部分)
  • 调用环境对象的execute方法触发程序执行

这几步中,跟关系型API有关的2 ~ 4步,我们发现在一个关系型的程序中,用户既可以混合使用Table&SQL API,并且除了后端环境对象的不同,TableEnvironment相关的部分在API层面上具有相同的抽象,也就是说,一套程序主体既可以适用于batch模式也可以适用于Streaming模式,这对用户而言也许更具吸引力。接下来,我们将对关系型API相关的几步进行解读。

TableEnvironment

跟streaming和batch程序一样,关系型程序也会要求先构建一个环境对象。因为Flink致力于为streaming和batch提供统一的关系型API,因此关系型程序只有唯一的环境对象TableEnvironment。

但具体到一些内部实现上,streaming跟batch还是有着较大的差异。所以,TableEnvironment针对两者又扩展了StreamTableEnvironment和BatchTableEnvironment这两个抽象类。这两个类主要提供streaming和batch的特定语义,比如提供DataSet、DataStream跟Table之间的转换。

最终的关系型程序中,原先streaming跟batch的环境对象和TableEnvironment对象都是必须的,它们承担着不同的职责:

  • streaming/batch 环境对象:辅助构建Table环境对象、触发程序执行调用、构建DataStream、DataSet;
  • TTableEnvironment对象:构建关系型程序的主体逻辑;

catalog

在Calcite中存在多个概念,其中一个概念就是“catalog”。从关系型的观点上来看,catalog处于所有的schema(外部的、概念上的、内部的)以及mapping(外部与概念以及概念与内部之间)之上的[1]。从SQL标准的角度来看,catalog在一个SQL环境中被称之为schame的集合,一个SQL环境包含零个或多个catalog,而一个catalog包含一个或多个schema(总是会包含一个名为“INFORMATION_SCHEMA”的schema)[2]。

在Calcite中,catalog定义了可在SQL查询中被访问的元数据跟命名空间。其中包含了如下几个概念:

  • Schema:一个定义了模式与表的集合,可被任意地嵌套
  • Table:表示一个单独的数据集,字段通过RelDataType来定义
  • RelDataType:表示在数据集中的字段,支持所有的SQL数据类型,包括结构体与数组
  • Statistic:提供用于优化的表统计信息

以一个SQL查询为例,来认识一下catalog中包含的那些概念:

在接下来讲source的这一小节,我们将看到被注册进Table&SQL API中当作“表”使用的对象,最终都会被转换为Calcite所识别的Table对象并加入其Schema中。

另外,Flink允许用户注册外部的catalog以提供如何访问外部数据库的相关信息,通过TableEnvironment对象的registerExternalCatalog方法即可注入。外部的catalog必须继承ExternalCatalog这一trait,它相当于外部数据库跟Table&SQL API的一个连接器。而Table&SQL API某种程度上又充当了外部catalog跟Calcite的连接器,整个桥接模式如下图所示:

对应到代码实现上来,Flink会通过一个ExternalCatalogSchema类来完成跟Calcite的catalog API的对接,包括注册跟获取catalog以及内部的子schema等。示例代码如下:

//获得TableEnvironment对象
val tableEnv = TableEnvironment.getTableEnvironment(env)

//创建一个外部的catalog
val catalog: ExternalCatalog = new InMemoryExternalCatalog

//注册
tableEnv.registerExternalCatalog("InMemCatalog", catalog)

一旦外部的catalog被注册到环境对象,Table&SQL API都可以以类似于“catalog.database.table”这样的全限定名来访问表等信息。当前Flink提供了一个基于内存的ExternalCatalogSchema的实现:InMemoryExternalCatalog,它内部维护了两个映射:

  • 数据库映射:数据库名对应ExternalCatalog实例;
  • 表映射:表名对应ExternalCatalogTable实例;

source

source作为Table&SQL API的数据源,同时也是程序的入口。当前Flink的Table&SQL API整体而言支持三种source:Table source、DataSet以及DataStream,它们都通过特定的API注册到Table环境对象。

我们先来看Table source,它直接以表对象作为source。这里的表对象可细分为:

  • Flink以Table类定义的关系表对象,通过TableEnvironment的registerTable方法注册;
  • 外部source经过桥接而成的表对象,基础抽象为TableSource,通过具体环境对象的registerTableSource;

下图展示了,Table source被注册时,对应的内部转化图(虚线表示对应关系):

由上图可见,不管是直接注册Table对象还是注册外部source,在内部都直接对应了特定的XXXTable对象。

TableSource trait针对Streaming和Batch分部扩展有两个trait,它们是StreamTableSource和BatchTableSource,它们各自都提供了从数据源转换为核心对象(DataStream跟DataSource)的方法。

除了这三个基本的trait之外,还有一些特定对source的需求以独立的trait提供以方便实现者自行组合,比如ProjectableTableSource这一trait,它支持将Projection下推(push-down)到TableSource。Flink内置实现的CsvTableSource就继承了这一trait。

当前Flink所支持的TableSource大致上分为两类:

  • CsvTableSouce:同时可用于Batch跟Streaming模式;
  • kafka系列TableSource:包含Kafka的各个版本(0.8,0.9,0.10)以及各种不同的格式(Json、Avro),基本上它们只支持Streaming模式,它们都依赖于各种kafka的connector;

使用方式如下:

// specify JSON field names and types
val typeInfo = Types.ROW(
  Array("id", "name", "score"),
  Array(Types.INT, Types.STRING, Types.DOUBLE)
)

val kafkaTableSource = new Kafka08JsonTableSource(
    kafkaTopic,
    kafkaProperties,
    typeInfo)
tableEnvironment.registerTableSource("kafka-source", kafkaTableSource);

CsvTableSource的构建方式如下:

val csvTableSource = CsvTableSource
    .builder
    .path("/path/to/your/file.csv")
    .field("name", Types.STRING)
    .field("id", Types.INT)
    .field("score", Types.DOUBLE)
    .field("comments", Types.STRING)
    .fieldDelimiter("#")
    .lineDelimiter("$")
    .ignoreFirstLine
    .ignoreParseErrors
    .commentPrefix("%")
    .build

除了以TableSource作为Table&SQL的source,还支持通过特定的环境对象直接注册DataStream、DataSet。注册DataStream的示例如下:

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

val cust = env.fromElements(...)
val ord = env.fromElements(...)

// register the DataStream cust as table "Customers" with fields derived from the datastream
tableEnv.registerDataStream("Customers", cust)

// register the DataStream ord as table "Orders" with fields user, product, and amount
tableEnv.registerDataStream("Orders", ord, 'user, 'product, 'amount)

注册DataSet的示例如下:

val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

val cust = env.fromElements(...)
val ord = env.fromElements(...)

// register the DataSet cust as table "Customers" with fields derived from the dataset
tableEnv.registerDataSet("Customers", cust)

// register the DataSet ord as table "Orders" with fields user, product, and amount
tableEnv.registerDataSet("Orders", ord, 'user, 'product, 'amount)

以上,通过调用环境对象的register[DataStream/DataSet]方法是一种显式注册的方式,除此之外,还有隐式注册方式。隐式注册方式,通过对DataStream跟DataSet对象增加的toTable方法来实现,使用方式示例如下:

val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// read a DataSet from an external source
val ds: DataSet[(Long, String, Integer)] = env.readCsvFile(...)

val table = ds.toTable(tableEnv, 'user, 'product, 'amount)
val result = tableEnv.sql(
  s"SELECT SUM(amount) FROM $table WHERE product LIKE '%Rubber%'")

我们知道DataStream跟DataSet原先是没有toTable API的,如何为它们增加该API的呢?答案是利用了Scala的包对象(package object),该特性主要用于兼容旧版本的库或对某些类型的API进行增强。具体而言,toTable API其实是实现在DataSetConversions和DataStreamConversions两个类中,然后在包对象中对他们进行实例化。而定位到toTable的实现时,会看到它们其实是间接调用了特定环境对象的fromDataStream/fromDataSet方法并将当前的DataStream跟DataSet传递给这两个方法并通过方法返回得到Table对象。fromDataStream/fromDataSet方法对在实现时会调用跟registerDataStream/registerDataSet方法对相同的内部注册方法。

fromDataStream/fromDataSet方法通常主要的场景在于为DataStream/DataSet转换为Table对象提供便利,它本身也进行了隐式注册。然而,你也可以对通过这对方法得到的Table对象,再次调用registerTable来进行显式注册,不过通常没有必要。

因此,综合而言,注册DataStream跟DataSet的对应关系如下:

以上我们已经分析了所有的Table source的注册方式,有多种register系列方法并最终对应了内部各种XXXTable对象。稍显混乱,其实这些XXXTable对象是有联系的,并且所有的register系列方法最终都调用了TableEnvironment的registerTableInternal方法。因此其实注册Table source的内部原理是一致的,我们来分析一下。

TableEnvironment内部会以一个SchemaPlus类型的数据结构,它是Calcite中的数据结构,用来存储被注册的表、函数等在内的一系列对象(这些对象统称为Calcite中的Schema)。由此可见它无法直接接受Flink自定义的类似于TableSouce这样的对象,那么这里存在一个问题就是两个框架的衔接问题。这就是Flink定义那么多内部XXXTable类型的原因之一,我们来梳理一下它们之间的关系如下:

上图中的XXXTable对象同时以括号标明了在注册时它是由什么对象转化而来。

sink

sink其实跟source是反向的,一个是将数据源接入进来,另一个是将数据写到外部。因此,我们对比着source来看sink,当你实现一个Table&SQL程序并希望将处理之后的结果输出到外部。通常有以下几种方式:

  • 在Table对象上调用writeToSink API,它接收一个TableSink的实例;
  • 将Table再次转换为DataSet/DataStream,然后像输出DataSet/DataStream一样的方式来处理;

TableSink根据后端模式的差别,提供了两种实现:针对batch的BatchTableSink以及针对streaming的多种sink,它们拥有不同的特征,列举如下:

  • AppendStreamTableSink:它只支持插入变更,如果Table对象同时有更新和删除的变更,那么将会抛出TableException;
  • RetractStreamTableSink:它支持输出一个streaming模式的表,该表上可以有插入、更新和删除变更;
  • UpsertStreamTableSink:它支持输出一个streaming模式的表,该表上可以有插入、更新和删除变更,且还要求表要么有唯一的键字段要么是append-only模式的,如果两者都不满足,将抛出TableException;

跟source一样,内置的CsvTableSink同时兼具streaming跟batch的语义。

TableSink主要通过Table 的writeToSink API对外提供能力,然而最终的实现主要还是在特定的环境对象上。对BatchTableSink而言,BatchTableEnvironment会将具体的Table对象转换为DataSet,然后输出:

//将Table翻译为DataSet
val result: DataSet[T] = translate(table)(outputType)
//将DataSet给TableSink以使其输出
batchSink.emitDataSet(result)

针对streaming的各种sink则会在StreamTableEnvironment中挨个枚举不同的sink类型进行处理。但步骤跟BatchTableSink类似:先翻译为DataStream然后输出。

在source中可以直接从DataSet/DataStream转换为Table对象一样,同样从Table对象也可以转换为DataSet/DataStream对象。它们的实现手段都是类似的,通过package object对Table API进行增强,以使得Table 具备toDataSet/toXXXStream的API,最终由特定环境对象的toDataSet/toXXXStream方法完整具体的任务。

我们以CsvTableSink来分析一下,具体的emit是如何实现的,概况来讲有两步:

  1. 对数据集或数据流应用map运算符以CsvFormatter格式化器进行格式化;
  2. 再调用DataSet、DataStream的writeAsText sink到文件系统;

真正复杂的是各个Table环境对象中的translate方法,它们用于将Table翻译为DataSet/DataStream,这其中包含将相关的Table API调用以及SQL查询所对应的关系型的表达式树转换成DataSet/DataStream特定的运算符。这并不是本节的重点,我们将在后续对此进行介绍。


原文发布时间为:2017-07-13

本文作者:vinoYang

本文来自云栖社区合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
6月前
|
SQL 分布式计算 测试技术
概述Flink API中的4个层次
【7月更文挑战第14天】Flink的API分为4个层次:核心底层API(如ProcessFunction)、DataStream/DataSet API、Table API和SQL。
|
7月前
|
SQL 关系型数据库 API
实时计算 Flink版产品使用问题之如何使用stream api
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
7月前
|
Kubernetes Oracle 关系型数据库
实时计算 Flink版操作报错合集之用dinky在k8s上提交作业,会报错:Caused by: org.apache.flink.table.api.ValidationException:,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
312 0
|
7月前
|
SQL 存储 API
Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】(5)
Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】
|
7月前
|
SQL 消息中间件 Java
Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】(4)
Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】
|
18天前
|
JSON 前端开发 搜索推荐
关于商品详情 API 接口 JSON 格式返回数据解析的示例
本文介绍商品详情API接口返回的JSON数据解析。最外层为`product`对象,包含商品基本信息(如id、name、price)、分类信息(category)、图片(images)、属性(attributes)、用户评价(reviews)、库存(stock)和卖家信息(seller)。每个字段详细描述了商品的不同方面,帮助开发者准确提取和展示数据。具体结构和字段含义需结合实际业务需求和API文档理解。
|
5天前
|
供应链 数据挖掘 API
1688app 商品详情接口系列(1688API)
1688作为国内知名批发采购平台,提供了一系列商品详情接口(API),助力企业和开发者获取商品基础、价格、库存及供应商信息。通过Python示例代码展示如何调用这些接口,应用场景涵盖采购决策辅助、数据分析与市场调研、电商平台整合及供应链管理系统的优化,为企业和采购商提供有力的数据支持,提升业务效率和竞争力。
42 15
|
3天前
|
搜索推荐 数据挖掘 API
微店商品详情接口(微店API系列)
微店商品详情接口是微店API的重要组成部分,帮助开发者和商家获取商品的详细信息(如标题、价格、库存等),并将其集成到应用程序或数据分析系统中。该接口支持HTTP GET/POST请求,返回JSON/XML格式数据,需通过AppKey和AppSecret进行身份验证和签名加密。应用场景包括商品信息同步、数据分析与市场调研、个性化推荐系统等,助力商业决策和业务拓展。
27 13
|
12天前
|
JSON 搜索推荐 API
京东店铺所有商品接口系列(京东 API)
本文介绍如何使用Python调用京东API获取店铺商品信息。前期需搭建Python环境,安装`requests`库并熟悉`json`库的使用。接口采用POST请求,参数包括`app_key`、`method`、`timestamp`、`v`、`sign`和业务参数`360buy_param_json`。通过示例代码展示如何生成签名并发送请求。应用场景涵盖店铺管理、竞品分析、数据统计及商品推荐系统,帮助商家优化运营和提升竞争力。
55 23
|
4天前
|
JSON 数据挖掘 开发者
1688 商品评论接口系列(1688API)
1688商品评论接口助力电商数据分析与优化。通过该接口,开发者可获取指定商品的评论数据(如昵称、内容、评分等),支持情感分析和质量反馈收集。接口采用HTTP GET/POST请求,返回JSON格式数据。Python示例代码展示如何调用接口并处理响应。应用场景包括商家产品优化、客户服务提升、市场调研及电商平台数据分析。