通量电容器?流式SQL中的时态表和连接

本文涉及的产品
容器镜像服务 ACR,镜像仓库100个 不限时长
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
简介: 简单介绍Flink流式处理数据的特性,以例子讲解Flink建模时态数据。

在1985年的电影《回到未来》中,由发明家Doc Brown设计的神器磁通电容器(flux capacitor)让Marty Mcfly拥有了穿越时空的能力。而数据库一般是具有ACID四大特性,需要考虑时间对数据库的影响。一直以来,弄清楚如何管理和建模时间数据以进行有效的时间点分析是一项长期的研究过程,最早可以追溯到80年代早期,2011年才引入了SQL标准中的时态表(temporal tables)。到目前为止,用户注定要将其作为应用程序逻辑的一部分来实现,这通常会延长开发生命周期以及损失代码的可维护性。此外,虽然没有统一的、普遍接受的时态数据( temporal data)定义,但它所代表的挑战是相同的:如何根据动态变化的历史数据集验证或丰富数据?

出租车费和转换率

例如:如果出租车票价事件与乘坐地点的当地货币挂钩,我们尽可能的希望将票价转换为通用货币以便进一步处理。这是由于汇率在一段时间内的波动会很大,为了产生可靠的结果,每个出租车费用事件都需要与事件发生时的有效汇率相匹配。

使用Flink建模时态数据

在1.7版本中,Flink已将时态表的概念引入其流式SQL和表API中:仅附加表的参数化视图、或者仅允许插入记录、从不更新或删除记录的任何表,这些表被解释为变更日志,并将数据与时间上下文紧密相关,以便只能在特定时间段内将其解释为有效。将流转换为时态表需要以下两步:

  • 定义主键版本控制字段,可用于跟踪随时间发生的更改;
  • 将流公开为时间表函数,将每个时间点映射到静态关系。

回到上面的示例用例中,时态表正是我们对汇率数据进行建模所需要的,例如对时间点查询有用。临时表函数是作为Flink通用表函数类的扩展实现的,并且可以用与表API或SQL解析器一起使用来定义。

import org.apache.flink.table.functions.TemporalTableFunction;
 
(...)
 
// Get the stream and table environments.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(env);
 
// Provide a sample static data set of the rates history table.
List <Tuple2<String, Long>>ratesHistoryData =new ArrayList<>();
 
ratesHistoryData.add(Tuple2.of("USD", 102L)); 
ratesHistoryData.add(Tuple2.of("EUR", 114L)); 
ratesHistoryData.add(Tuple2.of("YEN", 1L)); 
ratesHistoryData.add(Tuple2.of("EUR", 116L)); 
ratesHistoryData.add(Tuple2.of("USD", 105L));
 
// Create and register an example table using the sample data set.
DataStream<Tuple2<String, Long>> ratesHistoryStream = env.fromCollection(ratesHistoryData);
 
Table ratesHistory = tEnv.fromDataStream(ratesHistoryStream, "r_currency, r_rate, r_proctime.proctime");
 
tEnv.registerTable("RatesHistory", ratesHistory);
 
// Create and register the temporal table function "rates".
// Define "r_proctime" as the versioning field and "r_currency" as the primary key.
TemporalTableFunction rates = ratesHistory.createTemporalTableFunction("r_proctime", "r_currency");
 
tEnv.registerFunction("Rates", rates);
 
(...)

实际上,这个Rate函数的作用是什么呢?假设我们想检查给定时间的汇率,比如11:00这一时刻。我们可以简单地通过下面的sql语句得到答案:

SELECT * FROM Rates('11:00');

时间点查询

尽管Flink还不支持使用常量时间属性参数查询时态表函数,但这些函数可用于覆盖更有趣的场景:时态表连接。

使用临时表流式连接

时态表在与流数据结合使用时发挥了其全部潜力。例如,为应用程序提供动力,这些应用程序必须针对一个参考数据集连续地列出白名单,该数据集随着审计或法规遵从时间而发生变化。由于计算成本和资源的消耗,高效连接长期以来一直是查询处理器面临的持久挑战,但在流式数据上的连接带来了一些额外的挑战:

  • 流的无边界特性意味着输入被连续评估,并且中间连接结果可以无限地消耗内存资源。Flink优雅地管理其内存消耗(即使对于连接需要溢出到磁盘的较重情况),并支持时间窗口连接以限制需要保持为状态的数据量;
  • 流数据可能是乱序的或者有延迟,因此不可能预先强制执行排序,并且时间处理需要一些思考以避免不必要的输出和撤销。

在时间数据的特定情况下,时间窗连接是不够的(至少在不进行一些代价调整的情况下是不够的)。这会导致迟早会发生每个参考记录将落在窗口之外并从状态擦除的情况,不再正在考虑将来的联接结果。为了解决这个限制,Flink引入了对时态表连接的支持,以涵盖时变关系。

出租车费与转换率之间的时间表联系

探测端(Taxi Fare)上仅附加表中的每个记录与构建端(Conversion Rate)上的时间表中的记录版本相连接,该版本与探测端记录时间属性(time)找最接近匹配的主键(currency)值。还记得之前注册的时态表函数(Rates)吗?现在可用于将此连接表达为一个简单的SQL语句,否则需要使用子查询复杂语句。

定期加入与时间表加入

时态表连接既支持处理语义, 也支持事件时间语义,并有效地限制保持在状态中的数据量,同时还允许构建端上任意旧的记录,这与时间窗口连接相反。探测端记录只需要在很短的时间内保持状态,以确保存在无序记录时正确语义。本节开头提到的问题可以通过以下方式克服:

  • 缩小连接的范围:对于给定的taxiFare.time只有时间匹配版本的ratesHistory可见;
  • 从状态中修剪不需要的记录:在当前时间和水印(watermark))延迟之间的记录对于探测端和构建端都是持久的。一旦水印到达并且结果被发出,这些将被丢弃,允许连接操作在时间上向前移动,构建表在状态下“刷新”其版本。

结论

根据上面的内容可以总结到,可以使用Flink在关系和时变参数中能够表达丰富的连续流,而不必涉及语法拼写或者对性能有所影响。换句话说:流式时间旅行无需磁通电容器。将此语法扩展到批处理,以使用适当的(事件)时间语义来丰富历史数据,这也是Flink路线图的一部分!

如果想在使用Flink SQL(通常是Flink SQL)连接流方面获得一些实际操作实践,这里有一个免费培训项目,培训环境基于Docker,只需几分钟即可建立。

作者信息

本文由阿里云开发者社区组织翻译。

文章原标题《Flux capacitor, huh? Temporal Tables and Joins in Streaming SQL》

作者:morsapaes

译者:海棠

文章为简译,更为详细的内容,请查看原文

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
2月前
|
SQL IDE Java
Java连接SQL Server数据库的详细操作流程
Java连接SQL Server数据库的详细操作流程
|
2月前
|
SQL 关系型数据库 MySQL
mysql sql语句删除一个库下的所有表
mysql sql语句删除一个库下的所有表
31 1
|
6天前
|
SQL Java 数据库连接
JDBC连接SQL Server2008 完成增加、删除、查询、修改等基本信息基本格式及示例代码
这篇文章提供了使用JDBC连接SQL Server 2008数据库进行增加、删除、查询和修改操作的基本步骤和示例代码。
|
5天前
|
SQL 监控 数据库
SQL Server如何判断哪些会话/连接是长连接?
【8月更文挑战第14天】在SQL Server中,判断长连接可通过活动监视器查看持续时间和状态;查询`sys.dm_exec_sessions`获取持续时间超阈值的会话;利用性能监视器跟踪“User Connections”计数器变化;审查应用代码中连接池配置;或分析网络流量寻找持久连接。这些方法有助于管理和优化连接。
|
24天前
|
SQL 监控 关系型数据库
PolarDB产品使用问题之SQL防火墙怎么拦截没有指定WHERE条件的特定表的SQL语
PolarDB产品使用合集涵盖了从创建与管理、数据管理、性能优化与诊断、安全与合规到生态与集成、运维与支持等全方位的功能和服务,旨在帮助企业轻松构建高可用、高性能且易于管理的数据库环境,满足不同业务场景的需求。用户可以通过阿里云控制台、API、SDK等方式便捷地使用这些功能,实现数据库的高效运维与持续优化。
|
6天前
|
SQL 安全 Java
驱动程序无法通过使用安全套接字层(SSL)加密与 SQL Server 建立安全连接。错误:“The server selected protocol version TLS10 is not accepted by client
驱动程序无法通过使用安全套接字层(SSL)加密与 SQL Server 建立安全连接。错误:“The server selected protocol version TLS10 is not accepted by client
35 0
|
1月前
|
SQL 分布式计算 DataWorks
MaxCompute操作报错合集之使用sql查询一个表的分区数据时遇到报错,该如何解决
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
2月前
|
SQL 存储 测试技术
|
29天前
|
SQL 存储 数据库
MySQL设计规约问题之如何处理日志类型的表
MySQL设计规约问题之如何处理日志类型的表
|
1月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之如何使用Flink SQL连接带有Kerberos认证的Hive
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。