Flink SQL 功能解密系列 —— 维表 JOIN 与异步优化

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 流计算中一个常见的需求就是为数据流补齐字段。因为数据采集端采集到的数据往往比较有限,在做数据分析之前,就要先将所需的维度信息补全。比如采集到的交易日志中只记录了商品 id,但是在做业务时需要根据店铺维度或者行业纬度进行聚合,这就需要先将交易日志与商品维表进行关联,补全所需的维度信息。

引子

流计算中一个常见的需求就是为数据流补齐字段。因为数据采集端采集到的数据往往比较有限,在做数据分析之前,就要先将所需的维度信息补全。比如采集到的交易日志中只记录了商品 id,但是在做业务时需要根据店铺维度或者行业纬度进行聚合,这就需要先将交易日志与商品维表进行关联,补全所需的维度信息。这里所说的维表与数据仓库中的概念类似,是维度属性的集合,比如商品维,地点维,用户维等等。

在流计算中,这是一个典型的 stream-to-table jon 的问题。本文主要讲解在 Flink SQL 中是如何解决这个问题的,用户如何简单上手使用这个功能。

维表 JOIN 语法

由于维表是一张不断变化的表(静态表只是动态表的一种特例)。那如何 JOIN 一张不断变化的表呢?如果用传统的 JOIN 语法SELECT * FROM T JOIN dim_table on T.id = dim_table.id来表达维表 JOIN,是不完整的。因为维表是一直在更新变化的,如果用这个语法那么关联上的是哪个时刻的维表呢?我们是不知道的,结果是不确定的。所以 Flink SQL 的维表 JOIN 语法引入了 SQL:2011 Temporal Table 的标准语法,用来声明关联的是维表哪个时刻的快照。维表 JOIN 语法/示例如下。

假设我们有一个 Orders 订单数据流,希望根据产品 ID 补全流上的产品维度信息,所以需要跟 Products 维度表进行关联。Orders 和 Products 的 DDL 声明语句如下:

CREATE TABLE Orders (
  orderId VARCHAR,          -- 订单 id
  productId VARCHAR,        -- 产品 id
  units INT,                -- 购买数量
  orderTime TIMESTAMP       -- 下单时间
) with (
  type = 'tt',              -- tt 日志流
  ...
)

CREATE TABLE Products (
  productId VARCHAR,        -- 产品 id
  name VARCHAR,             -- 产品名称
  unitPrice DOUBLE          -- 单价
  PERIOD FOR SYSTEM_TIME,   -- 这是一张随系统时间而变化的表,用来声明维表
  PRIMARY KEY (productId)   -- 维表必须声明主键
) with (
  type = 'alihbase',        -- HBase 数据源
  ...
)

如上声明了一张来自 TT 的 Orders 订单数据流,和一张存储于 HBase 中的 Products 产品维表。为了补齐订单流的产品信息,需要 JOIN 维表,这里可以分为 JOIN 当前表和 JOIN 历史表。

JOIN 当前维表

SELECT *
FROM Orders AS o
[LEFT] JOIN Products FOR SYSTEM_TIME AS OF PROCTIME() AS p
ON o.productId = p.productId

Flink SQL 支持 LEFT JOIN 和 INNER JOIN 的维表关联。如上语法所示的,维表 JOIN 语法与传统的 JOIN 语法并无二异。只是 Products 维表后面需要跟上 FOR SYSTEM_TIME AS OF PROCTIME() 的关键字,其含义是每条到达的数据所关联上的是到达时刻的维表快照,也就是说,当数据到达时,我们会根据数据上的 key 去查询远程数据库,拿到匹配的结果后关联输出。这里的 PROCTIME 即 processing time。使用 JOIN 当前维表功能需要注意的是,如果维表插入了一条数据能匹配上之前左表的数据时,JOIN的结果流,不会发出更新的数据以弥补之前的未匹配。JOIN行为只发生在处理时间(processing time),即使维表中的数据都被删了,之前JOIN流已经发出的关联上的数据也不会被撤回或改变。

JOIN 历史维表

SELECT *
FROM Orders AS o
[LEFT] JOIN Products FOR SYSTEM_TIME AS OF o.orderTime AS p
ON o.productId = p.productId

有时候想关联上的维度数据,并不是当前时刻的值,而是某个历史时刻的值。比如,产品的价格一直在发生变化,订单流希望补全的是下单时的价格,而不是当前的价格,那就是 JOIN 历史维表。语法上只需要将上文的 PROCTIME() 改成 o.orderTime 即可。含义是关联上的是下单时刻的 Products 维表。

Flink 在获取维度数据时,会根据左流的时间去查对应时刻的快照数据。因此 JOIN 历史维表需要外部存储支持多版本存储,如 HBase,或者存储的数据中带有多版本信息。

注:JOIN 历史维表功能目前暂未开放

维表优化

在实际使用的过程中,会遇到许多性能问题。为了解决这些性能问题,我们做了大量的优化,性能得到大幅提升,在双11的洪峰下表现特别淡定。

我们的优化主要是为了解决两方面的问题:
1. 提高吞吐。维表的IO请求严重阻塞了数据流的计算处理。
2. 降低维表数据库读压力。如 HBase 也只能承受单机每秒 20 万的读请求。

Async IO

我在 《Flink 原理与实现:Aysnc I/O》 中介绍了 Async IO 的使用场景和实现原理。原始的维表 JOIN 是同步访问的方式,来一条数据,去数据库查询一次,等待返回后输出关联结果。可以发现网络等待时间极大地阻碍了吞吐和延迟。为了解决同步访问的问题,异步模式可以并发地处理多个请求和回复,从而连续的请求之间不需要阻塞等待。


缓存

数据库的维表查询请求,有大量相同 key 的重复请求。如何减少重复请求?本地缓存是常用的方案。Flink SQL 目前提供两种缓存方案:LRU 和 ALL。(详见文档

LRU

通过 cache='LRU'参数可以开启 LRU 缓存优化,Blink 会为每个 JoinTable 节点创建一个 LRU 本地缓存。当每个数据进来的时候,先去缓存中查询,如果存在则直接关联输出,减少了一次 IO 请求。如果不存在,再发起数据库查询请求(异步或同步方式),请求返回的结果会先存入缓存中以备下次查询。

为了防止缓存无限制增长,所以使用的是 LRU 缓存,并且可以通过 cacheSize 调整缓存的大小。为了定期更新维表数据,可以通过 cacheTTLMs 调整缓存的失效时间。cacheTTLMs 是作用于每条缓存数据上的,也就是某条缓存数据在指定 timeout 时间内没有被访问,则会从缓存中移除。

ALL

Async 和 LRU-Cache 能极大提高吞吐率并降低数据库的读压力,但是仍然会有大量的 IO 请求存在,尤其是当 miss key(维表中不存在的 key)很多的时候。如果维表数据不大(通常百万级以内),那么其实可以将整个维表缓存到本地。那么 miss key 永远不会去请求数据库。因为本地缓存就是维表的镜像,缓存中不存在那么远程数据库中也不存在。

ALL cache 可以通过 cache='ALL'参数开启,通过cacheTTLMs控制缓存的刷新间隔。Flink SQL 会为 JoinTable 节点起一个异步线程去同步缓存。在 Job 刚启动时,会先阻塞主数据流,直到缓存数据加载完毕,保证主数据流流过时缓存就已经 ready。在之后的更新缓存的过程中,不会阻塞主数据流。因为异步更新线程会将维表数据加载到临时缓存中,加载完毕后再与主缓存做原子替换。只有替换操作是加了锁的。

因为几乎没有 IO 操作,所以使用 cache ALL 的维表 JOIN 性能可以非常高。但是由于内存需要能同时容纳下两份维表拷贝,因此需要加大内存的配置。

缓存未命中 key

在使用 LRU 缓存时,如果存在大量的 invalid key ,或者数据库中不存在的 key。由于命中不了缓存,导致缓存的收益较低,仍然会有大量请求打到数据库。因此我们将未命中的 key 也加进了缓存,提高了未命中 key 和 invalid key 情况下的缓存命中率。

Distribute By 提高缓存命中率

默认 JoinTable 节点与上游节点之间的数据是通过 shuffle 传输的。这在缓存大小有限、key总量大、热点不明显的情况下, 缓存的收益可能较低。这种情况下可以将上游节点与 JoinTable 节点的数据传输改成按 key 分区。这样通常可以缩小单个节点的 key 个数,提高缓存的命中率。比如一段时间内 JoinTable 节点总共需要处理100万个key, 节点并发100, 在数据不倾斜时单节点平均只需处理1万个key = 100万/100并发. 如果不做 key 分区, 单节点实际处理的key个数可能远大于1万。使用上也非常简单,在维表的 DDL 参数中加上partitionedJoin='true'即可。

最佳实践

在使用维表 JOIN 时,如果维表数据不大,或者 miss key (维表中不存在的 key)非常多,则可以使用 ALL cache,但是可能需要适当调大节点的内存,因为内存需要能同时容纳下两份维表拷贝。如果用不了 ALL cache,则可以使用 Async + LRU 来提高节点的吞吐。

未来工作

  1. 使用 SideInput 减少对数据库的全量读取
  2. 引入 Partitioned-ALL-cache 支持超大维表
  3. 使用批量请求提高单次 IO 的吞吐
  4. Multi-Join 优化

流计算中维表 JOIN 是一个非常常见的需求,遇到的挑战也非常多。比如超大维表问题,ALL cache 无法装下整个维表。未来我们打算引入 Partitioned-ALL-cache,也就是上游数据到 JoinTable 节点根据 JOIN key 分区,那么每个节点只需要加载属于该分区key的缓存数据,从而做到了缓存的水平扩展。从而遇到超大维表时可以通过扩并发也能够全量缓存下维表数据。另外,ALL cache 现在每个节点是都会起一个线程去加载全量维表数据,如果有1000个节点,则会全量读数据库1000次。未来打算通过  Side Input功能做到只需要全量读取一次,维表数据会自动分发到各个节点。

另外,Async 极大地提高了吞吐,但是每一次 IO 请求只取了单 key 的数据,效率比较低。未来计划使用 Batch Get 来提高每次 IO 请求的吞吐。

目前在 Table API 上已经支持了 Multi-Join 的优化,能极大提高多维表连续 JOIN 时的性能,减少网络数据的传输开销。未来会在 SQL 上也支持 Multi-Join 的优化。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
166 15
|
17天前
|
SQL
SQL JOIN
【11月更文挑战第06天】
34 4
|
18天前
|
SQL 关系型数据库 MySQL
SQL中,可以使用 `ORDER BY` 子句来实现排序功能
【10月更文挑战第26天】SQL中,可以使用 `ORDER BY` 子句来实现排序功能
46 5
|
23天前
|
SQL 关系型数据库 MySQL
图解 SQL 里的各种 JOIN
用文氏图表示 SQL 里的各种 JOIN,一下子就理解了。
33 2
|
1月前
|
SQL 分布式计算 Java
Hadoop-11-MapReduce JOIN 操作的Java实现 Driver Mapper Reducer具体实现逻辑 模拟SQL进行联表操作
Hadoop-11-MapReduce JOIN 操作的Java实现 Driver Mapper Reducer具体实现逻辑 模拟SQL进行联表操作
31 3
|
19天前
|
SQL 关系型数据库 MySQL
MySql5.6版本开启慢SQL功能-本次采用永久生效方式
MySql5.6版本开启慢SQL功能-本次采用永久生效方式
33 0
|
1月前
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
45 0
|
2月前
|
SQL 运维 程序员
一个功能丰富的SQL审核查询平台
一个功能丰富的SQL审核查询平台
|
2月前
|
SQL 安全 数据处理
揭秘数据脱敏神器:Flink SQL的神秘力量,守护你的数据宝藏!
【9月更文挑战第7天】在大数据时代,数据管理和处理尤为重要,尤其在保障数据安全与隐私方面。本文探讨如何利用Flink SQL实现数据脱敏,为实时数据处理提供有效的隐私保护方案。数据脱敏涉及在处理、存储或传输前对敏感数据进行加密、遮蔽或替换,以遵守数据保护法规(如GDPR)。Flink SQL通过内置函数和表达式支持这一过程。
74 2
|
2月前
|
SQL 大数据 数据处理
奇迹降临!解锁 Flink SQL 简单高效的终极秘籍,开启数据处理的传奇之旅!
【9月更文挑战第7天】在大数据处理领域,Flink SQL 因其强大功能与简洁语法成为开发者首选。本文分享了编写高效 Flink SQL 的实用技巧:理解数据特征及业务需求;灵活运用窗口函数(如 TUMBLE 和 HOP);优化连接操作,优先采用等值连接;合理选择数据类型以减少计算资源消耗。结合实际案例(如实时电商数据分析),并通过定期性能测试与调优,助力开发者在大数据处理中更得心应手,挖掘更多价值信息。
46 1

相关产品

  • 实时计算 Flink版