联接 Join 集合
可以使用具有SQL连接语法的JOIN语句,使用ksqlDB实时合并事件流。ksqlDB连接和关系数据库连接的相似之处在于,它们都基于通用值组合了来自两个或多个源的数据。ksqlDB连接的结果是一个新的流或表,其中填充了您在SELECT语句中指定的列值。
使用ksqlDB,无需围绕连接流和表编写低级逻辑,因此可以专注于用于组合流数据的业务逻辑。
可以通过以下方式联接流和表:
连接多个流以创建新的流。连接多个表以创建一个新表。连接多个流和表以创建一个新的流。
Join 语句
ksqlDB JOIN子句具有SQL JOIN子句的熟悉语法。以下示例创建一个pageviews_enriched流,该流是pageviews流和users表的组合:
CREATE STREAM pageviews_enriched AS
SELECT
users.userid AS userid,
pageid,
regionid,
gender
FROM pageviews
LEFT JOIN users ON pageviews.userid=users.userid
EMIT CHANGES;
当连接两个流时,必须指定WITHIN子句以匹配在指定时间间隔内都出现的记录。有关有效时间单位,请参见时间单位。
这里有一个例子流,信息流加入结合orders,payments与shipments流。结果shipped_orders流包含下订单后1小时内支付的所有订单,并在收到付款后2小时内发货。
CREATE STREAM shipped_orders AS
SELECT
o.id as orderId
o.itemid as itemId,
s.id as shipmentId,
p.id as paymentId
FROM orders o
INNER JOIN payments p WITHIN 1 HOURS ON p.id=o.id
INNER JOIN shipments s WITHIN 2 HOURS ON s.id=o.id;
Join和Windows
ksqlDB允许将具有相同键的记录进行分组,以便将有状态操作(例如联接)组合到windows中。您为窗口指定保留期,此保留期控制ksqlDB等待无序记录的时间。如果记录在窗口的保留期过后到达,则该记录将被丢弃,并且不会在该窗口中进行处理。
注意:仅对流连接流支持窗口。
每个记录键都跟踪Windows。在联接操作中,ksqlDB使用窗口状态存储区将到目前为止收到的所有记录存储在定义的窗口边界内。在指定的窗口保留期之后,将清除状态存储中的旧记录。
Join 需求
ksqlDB应用程序必须满足特定的需求,Join才能成功。
共分区的数据
连接时必须对输入数据进行共分区。这样可以确保在处理过程中,从连接的两侧将具有相同键的输入记录传递到同一流任务。联接时,用户有责任确保数据进行分区。
Join功能
ksqlDB支持大量的流和表联接操作,包括INNER,LEFT OUTER和FULL OUTER。通常,LEFT OUTER缩短为LEFT JOIN,而FULL OUTER缩短为OUTER JOIN。
不支持RIGHT OUTER JOIN。而是交换操作数并使用LEFT JOIN。
下表显示了支持的组合。
类型
INNER
LEFT OUTER
FULL OUTER
流-流
窗口式的
支持的
支持的
支持的
表-表
非窗口式
支持的
支持的
支持的
流-表
非窗口式
支持的
支持的
不支持
流流联接
ksqlDB支持流之间的INNER,LEFT OUTER和FULL OUTER连接。
所有这些操作都支持乱序记录。
要加入两个流,必须使用WITHIN子句指定一个窗口方案。一侧的新输入记录为另一侧的每个匹配记录生成联接输出,并且联接窗口中可以有多个此类匹配记录。
连接仅在将流标记为要重新分区时才对流进行数据重新分区。如果两个流都被标记,则都将被重新分区。
重要的提示
Kafka保证来自一个源分区的任何两个消息的相对顺序,前提是它们在重新分区后也都位于同一个分区中。否则,Kafka可能会交错插入事件。用例将确定这些订购保证是否可接受。
LEFT OUTER联接将在结果流中包含leftRecord-NULL记录,这意味着该联接包含从没有进行匹配的右侧流中选择的字段的NULL值。
FULL OUTER联接将在结果流中包含leftRecord-NULL或NULL-rightRecord记录,这意味着联接中包含来自未进行匹配的流的字段的NULL值。
流流连接的语义
下表显示了各种流-流连接变体的语义。在表中,每一行代表一个新的传入记录。适用以下假设:
所有记录具有相同的键。所有记录都属于一个连接窗口。所有记录均按时间戳顺序处理。
收到新输入后,将在表中列出的条件下触发联接。具有NULL键或NULL值的输入记录将被忽略,并且不会触发联接。
时间戳记
左流
右流
内部联接 Inner
左联接 left
右联接 right
1个
Null
2个
Null
3
A
[A,Null]
[A,Null]
4
一个
[A,a]
[A,a]
[A,a]
5
B
[B,a]
[B,a]
[B,a]
6
b
[A,b],[B,b]
[A,b],[B,b]
[A,b],[B,b]
7
Null
8
Null
9
C
[C,a],[C,b]
[C,a],[C,b]
[C,a],[C,b]
10
C
[A,c],[B,c],[C,c]
[A,c],[B,c],[C,c]
[A,c],[B,c],[C,c]
11
Null
12
Null
13
Null
14
d
[A,d],[B,d],[C,d]
[A,d],[B,d],[C,d]
[A,d],[B,d],[C,d]
15
D
[D,a],[D,b],[D,c],[D,d]
[D,a],[D,b],[D,c],[D,d]
[D,a],[D,b],[D,c],[D,d]
流表联接
ksqlDB仅支持流和表之间的INNER和LEFT连接。
流表联接始终是非窗口联接。当新记录到达流上时,您可以针对表执行表查找。只有到达流侧的事件才触发下游更新并产生联接输出。表端的更新不会产生更新的联接输出。
流表联接仅在将流标记为要重新分区时才对流进行数据重新分区。
重要提示
ksqlDB当前在时间同步方面提供了最大的努力,但是没有保证,这可能会导致结果丢失或left Record-NULL结果。
流表联接的语义
下表显示了各种流表连接变体的语义。在表中,每一行代表一个新的传入记录。适用以下假设:
所有记录具有相同的键。所有记录均按时间戳顺序处理。
仅左侧流的输入记录会触发联接。右侧表的输入记录仅更新内部右侧连接状态。
具有NULL值的表的输入记录被解释为对应键的逻辑删除,表示从表中删除了键。逻辑删除不会触发联接。
时间戳记
左流
右表
内部联接
左联接
1
Null
2
Null(墓碑)
3
A
[A,Null]
4
一个
5
B
[B,a]
[B,a]
6
b
7
Null
8
Null(墓碑)
9
C
[C,Null]
10
C
11
Null
12
Null
13
Null
14
d
15
D
[D,d]
[D,d]
请注意,即使表端稍后已填充,如果表端尚未包含键的值,INNER JOIN也不会产生任何输出。对于LEFT JOIN,相同的场景将导致leftRecord-NULL的输出。因此,重要的是在接收流事件之前加载表数据。
ksqlDB尝试按事件时间顺序处理连接的两端,但是不能提供有力的保证,尤其是在存在乱序行的情况下。
为了最大程度地提高联接的可预测性,请确保源主题中提供了历史表数据,查询正在运行,并且ksqlDB在开始生成流之前有足够的时间来处理表数据。
表表联接
ksqlDB支持表之间的INNER,LEFT OUTER和FULL OUTER连接。不支持与多个记录匹配的联接(一对多)。
表-表联接始终是非窗口联接。
表-表联接最终是一致的。
重要提示
ksqlDB当前在时间同步方面提供了最大的努力,但是没有保证,这可能会导致结果丢失或leftRecord-NULL结果。
表-表联接只能在其PRIMARY KEY字段上联接,并且不支持一对多(1:N)联接。
表-表联接的语义
下表显示了各种表-表连接变体的语义。在表中,二手手游账号买卖平台每一行代表一个新的传入记录。适用以下假设:
所有记录具有相同的键。所有记录均按时间戳顺序处理。
具有NULL值的输入记录被解释为相应键的逻辑删除,表示从表中删除了键。逻辑删除不会触发联接。如果接收到输入逻辑删除,则输出逻辑删除将直接转发到连接结果表(如果连接结果表中已经存在相应的键)。
时间戳记
左表
右表
内部联接
左联接
外连接
1
Null(墓碑)
2
Null(墓碑)
3
A
[A,Null]
[A,Null]
4
一个
[A,a]
[A,a]
[A,a]
5
B
[B,a]
[B,a]
[B,a]
6
b
[B,b]
[B,b]
[B,b]
7
Null(墓碑)
Null(墓碑)
Null(墓碑)
[null,b]
8
Null(墓碑)
Null(墓碑)
9
C
[C,Null]
[C,Null]
10
C
[C,c]
[C,c]
[C,c]
11
Null(墓碑)
Null(墓碑)
[C,Null]
[C,Null]
12
Null(墓碑)
Null(墓碑)
Null(墓碑)
13
Null(墓碑)
14
d
[n,d]
15
D
[D,d]
[D,d]
[D,d]
N向联接 Join
ksqlDB支持在单个语句中连接两个以上的源。这些连接在语义上等效于连续连接N个源,并且连接的顺序由写入连接的顺序控制。
以下面的查询为例,其中A是事件流,B并且C都是表:
CREATE STREAM joined AS
SELECT *
FROM A
JOIN B ON A.id=Bduct_id
JOIN C ON A.id=C.purchased_id;
该查询的输出是流,中间连接结果将是stream A ? B。如果C是流而不是表,那么您将通过添加一个WITHIN子句来相应地重写连接,因为A ? Bwith与C是流-流连接:
CREATE STREAM joined AS
SELECT *
FROM A
JOIN B ON A.id=Bduct_id
JOIN C WITHIN 10 SECONDS ON A.id=C.purchased_id;
N向联接的局限性
前面各节中对N向联接的每个中间步骤所描述的限制和限制。例如,FULL OUTER不支持流和表之间的联接。这意味着,如果N向联接中的任何阶段都解析为FULL OUTER流和表之间的联接,则整个查询将失败:
--- This JOIN fails with the following exception:
--- Join between invalid operands requested: left type: KTABLE, right type: KSTREAM
CREATE STREAM joined AS
SELECT *
FROM A
JOIN B WITHIN 10 SECONDS ON A.id=Bduct_id
FULL OUTER JOIN C ON A.id=C.purchased_id;分区要求
使用ksqlDB联接流数据时,必须确保流和表是共分区的,这意味着联接两侧的输入记录的分区配置都相同。
要连接两个数据源,流或表,ksqlDB需要根据连接列比较它们的记录。为确保具有相同联接列的记录在同一流任务上共置一处,联接列必须与源分区所在的列重合。
键 Keys
表始终按其分区PRIMARY KEY,而ksqlDB不允许对表进行重新分区,这意味着您只能将表的主键用作连接列。
流没有主键,但是有一个可选的 KEY 列。当存在KEY列时,定义了分区列。
流允许对除键列之外的表达式进行联接。当连接条件与KEY列不同时,ksqlDB在内部对流进行重新分区,这将隐式定义正确的键和分区。
重要提示
Kafka保证来自一个源分区的任何两个消息的相对顺序,前提是它们在重新分区后都位于同一个分区中。否则,Kafka可能会交错插入事件。用例将确定这些订购保证是否可接受。
以下示例显示了一个users表,该表clicks在点击的userId列上与流连接在一起。该users表具有id相同SQL类型的正确主键。该clicks流没有定义的键,因此ksqlDBuserId在执行连接之前在连接列()上对其内部重新分区以分配键。
-- clicks stream, with no or unknown key.
-- the schema of stream clicks is: USERID BIGINT | URL STRING
CREATE STREAM clicks (
userId BIGINT,
url STRING
) WITH (
kafka_topic='clickstream',
value_format='json'
);
-- users table, with userId primary key.
-- the schema of table users is: USERID BIGINT PRIMARY KEY | FULLNAME STRING
CREATE TABLE users (
id BIGINT PRIMARY KEY,
fullName STRING
) WITH (
kafka_topic='users',
value_format='json'
);
-- join of users table with clicks stream, joining on the table's primary key and the stream's userId column:
-- join will automatically repartition clicks stream:
SELECT
c.userId,
c.url,
u.fullName
FROM clicks c
JOIN users u ON c.userId=u.id;
共分区要求
使用ksqlDB联接流数据时,必须确保流和表是共分区的,这意味着联接两侧的输入记录的分区配置都相同。
联接的输入记录必须具有相同的键架构。输入记录的两侧必须具有相同数量的分区。连接的两端必须具有相同的分区策略。
当您对输入进行共同分区时,在连接过程中,来自连接两侧的具有相同键的记录将被传递到同一流任务。
记录具有相同的键Schema
为了使联接生效,两端的键必须具有相同的SQL类型。
例如,您可以STRING将以用户ID为关键字的用户点击流与也以STRING用户ID为键的用户配置文件表结合在一起。双方具有完全相同的用户ID的记录将被合并。
如果您希望加入的列的架构不匹配,则CAST一侧可能会匹配另一侧。例如,如果INT联接的一侧有一个userId列,而另一侧是a LONG,那么您可以选择将INT一侧转换为a LONG:
-- stream with INT userId
CREATE STREAM clicks (
userId INT KEY,
url STRING
) WITH (
kafka_topic='clickstream',
value_format='json'
);
-- table with BIGINT id stored in the key:
CREATE TABLE users (
id BIGINT PRIMARY KEY,
fullName STRING
) WITH (
kafka_topic='users',
value_format='json'
);
-- Join utilising a CAST to convert the left sides join column to match the rights type.
SELECT
clicks.url,
users.fullName
FROM clicks
JOIN users ON CAST(clicks.userId AS BIGINT)=users.id;
在现有Kafka主题之上创建的表(例如,使用CREATE TABLE语句创建的表)将根据Kafka主题中记录的键中所保存的数据进行键入。ksqlDB在PRIMARY KEY列中显示此数据。
在ksqlDB内部从其他源创建的表(例如,使用CREATE TABLE AS SELECT语句创建的表)将从其源复制键,除非存在显式GROUP BY或JOIN子句,否则该显式或子句可以更改键入该表的内容。
注意
如果联接需要,则ksqlDB会自动对流进行重新分区,但是ksqlDB会拒绝不是键的表列上的任何联接。这是因为ksqlDB不支持外键上的联接,并且对表的主题进行重新分区有可能对事件进行重新排序并错误地解释逻辑删除,这可能导致意外或意外的副作用。
如果在多个联接中使用相同的源,并且需要对数据进行重新分区,则您可能希望手动重新分区,以避免ksqlDB多次重新分区。
要对流进行重新分区,请使用PARTITION BY子句。请注意,只有在重新分区之后,Kafka才能保证来自一个源分区的任何两个消息的相对顺序,它们也都位于同一个分区中。否则,Kafka可能会交错插入邮事件。用例将确定这些订购保证是否可接受。
重要提示
如果PARTITION BY表达式的计算结果为NULL,则结果行将产生一个随机分区。您可能需要使用COALESCE来包装表达式并将所有NULL值转换为默认值,例如PARTITION BY COALESCE(MY_UDF_THAT_MAY_FAIL(Col0), 0)。
例如,如果您需要对要由product_id字段进行键控的流进行重新分区,并且需要将键分布在6个分区上才能进行联接,请使用以下SQL语句:
CREATE STREAM products_rekeyed
WITH (PARTITIONS=6) AS
SELECT *
FROM products
PARTITION BY product_id;
记录具有相同数量的分区
联接的输入记录两侧必须具有相同数量的分区。
ksqlDB会检查这部分分区需求,并拒绝分区计数不同的任何连接。
使用DESCRIBE EXTENDED CLI中的命令确定源下的Kafka主题,并使用CLI中的SHOW TOPICS命令列出主题及其分区数。
如果联接的两面具有不同的分区数,则可能要更改源主题的分区数,或重新分配一面以匹配另一面的分区数。
以下示例创建一个重新分区的流,并使用指定数量的分区来维护现有键。
CREATE STREAM products_rekeyed
WITH (PARTITIONS=6) AS
SELECT * FROM products;
记录具有相同的分区策略
连接两侧的记录必须具有相同的分区策略。如果在所有应用程序中使用默认分区程序设置,并且生产者未指定显式分区,则无需担心分区策略。
但是,如果您的记录的生产者应用程序在配置中指定了自定义分区程序,则必须对联接两侧的记录使用相同的自定义分区程序逻辑。写入联接输入的应用程序必须具有相同的分区策略,以便具有相同键的记录将传递到相同的分区号。
这意味着输入记录必须在连接的两侧都位于同一分区中。例如,在流表联接中,如果具有userId键值的键alice123在流的分区1中,但alice123在表的分区2中,则即使两端都由键键入,联接也不会匹配userId。
ksqlDB无法验证两个连接输入的分区策略是否相同,因此必须确保这一点。
该DefaultPartitioner类实现了以下的分区策略:
如果生产者在记录中指定了分区,请使用它。如果生产者指定键而不是分区,请根据键的哈希值选择一个分区。如果生产者未指定分区或键,请以循环方式选择一个分区。
自定义分区程序类实现了Partitioner接口,并在生产者配置属性中分配partitioner.class。
综合键列
某些联接的结果中有一个合成键列。此列并非来自任何来源。这是一个示例,可以帮助解释什么是合成键列以及为什么需要它们:
CREATE TABLE OUTPUT AS
SELECT * FROM L FULL OUTER JOIN R ON L.ID=R.ID;
前面的语句似乎很简单:创建一个新表,该表是对两个源表进行完全外部联接并在它们的ID列上联接的结果。但是在完全外部联接中,一个L.ID或R.ID可能会丢失(NULL),或者两个都可能具有相同的值。由于生成给ApacheKafka?的数据应始终具有非空消息键,因此ksql选择要使用的第一个非空键:
L.ID
R.ID
Kafka消息键
10
null
10
null
7
7
8
8
8
Kafka消息键中存储的数据可能与两个源ID列都不匹配。相反,它是一个新列:合成列,这意味着该列不属于任何一个源表。
哪些联接导致合成键列?
结果中的键列与任何源列都不匹配的任何联接都称为具有合成键列。
下列类型的联接会导致将合成键列添加到结果模式:
FULL OUTER JOIN,例如:
sql CREATE TABLE OUTPUT AS SELECT * FROM L FULL OUTER JOIN R ON L.ID=R.ID;在联接ON条件中使用的所有表达式都不是简单列引用的任何联接。例如:
sql -- join on expressions other than column references:
CREATE TABLE OUTPUT AS SELECT * FROM L JOIN R ON ABS(L.ID)=ABS(R.ID);
合成键列分配了什么名称?
合成键列的默认名称为ROWKEY。但是,如果联接中使用的任何源已经包含名为的列ROWKEY,则合成键列的名称为ROWKEY_1,或者ROWKEY_2存在存在名为的源列ROWKEY_1等。
-- given sources:
CREATE STREAM S1 (ROWKEY INT KEY, V0 STRING) WITH (...);
CREATE STREAM S2 (ID INT KEY, ROWKEY_1 INT) WITH (...);
CREATE STREAM OUTPUT AS
SELECT *
FROM S1 JOIN S2
WITHIN 30 SECONDS
ON ABS(S1.ROWKEY)=ABS(S2.ID);
-- result in OUTPUT with synthetic key column name: ROWKEY_2
与其他任何键列一样,合成键列必须包含在流查询的投影中。如果投影缺少合成键,则将返回类似以下的错误,指示丢失的键列的名称:
Key missing from projection.
The query used to build OUTPUT
must include the join expression ROWKEY in its projection.
ROWKEY was added as a synthetic key column because the join criteria did not match any source column. This expression must be included in the projection and may be aliased.
(可选)可以为投影中的键列提供别名。推荐这样做,因为不能保证系统生成的名称在版本之间保持一致。例如:
CREATE STREAM OUTPUT AS
SELECT ROWKEY AS ID, S1.C0, S2.C1 FROM S1 FULL OUTER JOIN S2 ON S1.ID=S2.ID;