Regular Joins
Interval Joins
Temporal Joins
Lookup Join
Regular Joins
常规 JOIN(Regular JOIN)是语法最简单的一类 JOIN,和传统数据库的 JOIN 语法完全一致。对于左表和右表的任何变动,都会触发实时计算和更新,因此它的结果是“逐步逼近”最终的精确值,也就是下游可能看到变来变去的结果。为了支持结果的更新,下游目的表需要 定义主键 (PRIMARY KEY NOT ENFORCED)。
不支持时间窗口以及时间属性,任何一侧数据流有更改都是可见的,直接影响整个 join 结果。如果有一侧数据流增加一个新纪录,那么它将会把另一侧的所有的过去和将来的数据合并在一起,因为regular join 没有剔除策略,这就影响最新输出的结果; 正因为历史数据不会被清理,所以 regular join 支持数据流的任何更新操作(插入、更新、删除)。
这种 JOIN 要求 JOIN 两边数据都永久保留在 Flink state 中,才能保证输出结果的准确性,这将导致 State 的无限膨胀。
可以配置 state 的TTL(time-to-live:table.exec.state.ttl)来避免其无限增长,但请注意这可能会影响查询结果的准备性。
使用语法:目前,仅支持等值连接,INNER JOIN,LEFT JOIN,RIGHT JOIN,FULL OUTER JOIN
SELECT columns FROM t1 [AS <alias1>] [LEFT/INNER/FULL OUTER] JOIN t2 ON t1.column1 = t2.key-name1
适用场景:离线场景和小数据量场景。
Interval Joins
时间区间 JOIN 是另一种关联策略,它与上述的常规 JOIN 不同之处在于,左右表仅在某个时间范围(给定上界和下界)内进行关联,且只支持普通 Append 数据流,不支持含 Retract 的动态表。如下图(来自 Flink 官方文档)。它的好处是由于给定了关联的区间,因此只需要保留很少的状态,内存压力较小。但是缺点是如果关联的数据晚到或者早到,导致落不到 JOIN 区间内,就可能导致结果不准确。此外,只有当区间过了以后,JOIN 结果才会输出,因此会有一定的延迟存在。
如何设置边界条件:
right.timestamp ∈ [left.timestamp + lowerBound, left.timestamp + upperBound]
Regular Join 会产生回撤流,但是在实时数仓中一般写入的 sink 都是类似于 Kafka 这样的消息队列,然后后面接 clickhouse 等引擎,这些引擎又不具备处理回撤流的能力。 Interval Join 就是用于消灭回撤流的。
使用interval join,需要定义好时间属性字段(即处理时间或事件时间), 且将该时间戳字段用作 WATERMARK FOR 语句指定的时间字段。如果表实在没有时间戳字段,则可以使用 PROCTIME() 函数来生成一个Processing Time。如果定义的是 Processing Time,则Flink 框架本身根据系统划分的时间窗口定时清理数据;如果定义的是 Event Time,Flink 框架分配 Event Time 窗口并根据设置的 watermark 来清理数据。
特别注意:请不要直接使用未定义 WATERMARK 或 PROCTIME() 的原始 TIMESTAMP 类型字段,否则可能会退回到上述的 “常规 JOIN”。
使用语法:
--写法1 SELECT columns FROM t1 [AS <alias1>] [LEFT/INNER/FULL OUTER] JOIN t2 ON t1.column1 = t2.key-name1 AND t1.timestamp BETWEEN t2.timestamp AND BETWEEN t2.timestamp + + INTERVAL '10' MINUTE; --写法2 SELECT columns FROM t1 [AS <alias1>] [LEFT/INNER/FULL OUTER] JOIN t2 ON t1.column1 = t2.key-name1 AND t2.timestamp <= t1.timestamp and t1.timestamp <= t2.timestamp + + INTERVAL ’10' MINUTE ;
以下谓词是有效区间连接条件的示例:
ltime = rtime
ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND
对于流式查询,与常规联接相比,间隔联接仅支持具有时间属性的仅追加表。由于时间属性是准单调递增的,因此 Flink 可以从其状态中移除旧值而不影响结果的正确性。
使用案例:
CREATE TABLE `Order` ( id INT, product_id INT, quantity INT, order_time TIMESTAMP(3), WATERMARK FOR order_time AS order_time, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'datagen', 'fields.id.kind' = 'sequence', 'fields.id.start' = '1', 'fields.id.end' = '100000', 'fields.product_id.min' = '1', 'fields.product_id.max' = '100', 'rows-per-second' = '1' ); CREATE TABLE `Product` ( id INT, name VARCHAR, price DOUBLE, record_time TIMESTAMP(3), WATERMARK FOR record_time AS record_time, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'datagen', 'fields.id.min' = '1', 'fields.id.max' = '100', 'rows-per-second' = '1' ); CREATE TABLE `OrderDetails` ( id INT, product_name VARCHAR, total_price DOUBLE, order_time TIMESTAMP ) WITH ( 'connector' = 'print' ); INSERT INTO `OrderDetails` SELECT o.id, p.name, o.quantity * p.price, o.order_time FROM `Order` o, `Product` p WHERE o.product_id = p.id AND o.order_time BETWEEN p.record_time - INTERVAL '5' MINUTE AND p.record_time;
适用场景:双流join场
Temporal Joins
Temporal table 是随时间演变的表——在 Flink 中也称为动态表。时态表中的行与一个或多个时态周期相关联,并且所有 Flink 表都是时态的(动态的)。时态表包含一个或多个版本表快照,它可以是跟踪更改的更改历史表(例如数据库更改日志,包含所有快照)或实现更改的更改维度表(例如包含最新快照的数据库表)
.
Flink SQL 流批一体的核心是:流表二象性。围绕这一核心有若干概念,例如,动态表(Dynamic Table)/时态表(Temporal Table)、版本(Version)、版本表(Version Table)、普通表、连续查询、物化视图/虚拟视图、CDC(Change Data Capture)、Changelog Stream。版本: 时态表可以划分成一系列带版本的表快照集合,表快照中的版本代表了快照中所有记录的有效区间,有效区间的开始时间和结束时间可以通过用户指定,根据时态表是否可以追踪自身的历史版本与否,时态表可以分为 版本表 和 普通表。
版本表: 如果时态表中的记录可以追踪和并访问它的历史版本,这种表我们称之为版本表,来自数据库的 changelog 可以定义成版本表。
普通表: 如果时态表中的记录仅仅可以追踪并和它的最新版本,这种表我们称之为普通表,来自数据库 或 HBase 的表可以定义成普通表。
在 Flink 中,定义了主键约束和事件时间属性的表就是版本表。
-- 定义一张版本表 CREATE TABLE product_changelog ( product_id STRING, product_name STRING, product_price DECIMAL(10, 4), update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL, PRIMARY KEY(product_id) NOT ENFORCED, -- (1) 定义主键约束 WATERMARK FOR update_time AS update_time -- (2) 通过 watermark 定义事件时间 ) WITH ( 'connector' = 'kafka', 'topic' = 'products', 'scan.startup.mode' = 'earliest-offset', 'properties.bootstrap.servers' = 'localhost:9092', 'value.format' = 'debezium-json' );
Flink 也支持定义版本视图只要一个视图包含主键和事件时间便是一个版本视图。
CREATE VIEW versioned_rates AS SELECT currency, rate, currency_time -- (1) `currency_time` 保留了事件时间 FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY currency -- (2) `currency` 是去重 query 的 unique key,可以作为主键 ORDER BY currency_time DESC) AS rowNum FROM RatesHistory ) WHERE rowNum = 1;
时态表 JOIN 分为 事件时间(Event Time) 和 处理时间(Processing Time)两种类型,且只支持 INNER 和 LEFT JOIN。
由于时态表 JOIN 需要得知不同时刻下右表的不同版本,因此它的右表必须是 Changelog 动态表(即 Upsert、Retract 数据流,而非 Append 数据流),且两侧的源表都必须定义 WATERMARK FOR。随着 Watermark 水位推进,Flink 可以逐步清理失效的数据,因此时态表 JOIN 的内存压力相对也不大。此外,还要求时态表的主键必须包含在 JOIN 等值条件中。
时态表是仅追加(append-only)表上的参数化视图,它将append-only表的行解释为表的变更日志,并在特定时间点提供该表的特定版本。将append- only表解释为变更日志需要指定主键属性和时间戳属性。主键确定覆盖哪些行,时间戳确定行有效的时间。
Flink使用FOR SYSTEM_TIME AS OF的SQL语法查询时态表,该语法于SQL:2011中提出。
SELECT [column_list] FROM table1 [AS <alias1>] [LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS <alias2>] ON table1.column-name1 = table2.column-name1 # 例如: SELECT * FROM LatestRates FOR SYSTEM_TIME AS OF TIME '10:15'; currency rate ======== ====== US Dollar 102 Euro 114 Yen 1
与Regular Joins相比,尽管构建端发生了变化,但之前的时态表结果不会受到影响。与Interval Joins 相比,时态表连接不定义记录将在其中连接的时间窗口。
来自探测端的记录总是在时间属性指定的时间与构建端的版本连接。
因此,构建端的行可能是任意旧的。随着时间的推移,不再需要的记录版本(对于给定的主键)将从状态中删除。
时态表总结:
Temporal Table 可提供历史某个时间点上的数据。
Temporal Table 根据时间来跟踪版本。
Temporal Table 需要提供时间属性和主键,join时主键必须在关联条件中。
Temporal Table 一般和关键词 LATERAL TABLE 结合使用。
Temporal Table 在基于 ProcessingTime 时间属性处理时,每个主键只保存最新版本的数据。
Temporal Table 在基于 EventTime 时间属性处理时,每个主键保存从上个 Watermark 到当前系统时间的所有版本。
Append-Only 表 Join 右侧 Temporal Table ,本质上还是左表驱动 Join ,即从左表拿到 Key ,根据 Key 和时间(可能是历史时间)去右侧 Temporal Table 表中查询。
Temporal Table Join 目前只支持 Inner Join和 LEFT JOIN。
Temporal Table Join 时,右侧 Temporal Table 表返回最新一个版本的数据。
适用场景:流和维度表 join
Temporal Table DDL 和 Temporal Table Function 的主要区别是:
- 时态表 DDL 可以在 SQL 中定义,但时态表函数不能;
- temporal table DDL 和 temporal table function 都支持 temporal join versioned table,但只有 temporal table function 可以 temporal join 任何 table/view 的最新版本。
Loopup Join
loopup join通常用于使用从外部系统查询的数据来丰富表。联接要求一个表具有处理时间属性,而另一个表由查找源连接器支持。
CREATE TABLE orders ( order_id STRING, price DECIMAL(32,2), total DECIMAL(32,2), currency STRING, order_time TIMESTAMP(3), proc_time as PROCTIME() ) WITH (/* ... */); -- Customers is backed by the JDBC connector and can be used for lookup joins CREATE TEMPORARY TABLE Customers ( id INT, name STRING, country STRING, zip STRING ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://mysqlhost:3306/customerdb', 'table-name' = 'customers' ); -- enrich each order with customer information SELECT o.order_id, o.total, c.country, c.zip FROM Orders AS o JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c ON o.customer_id = c.id;