Apache Flink 漫谈系列(10) - JOIN LATERAL

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 聊什么 上一篇《Apache Flink 漫谈系列 - JOIN算子》我们对最常见的JOIN做了详尽的分析,本篇介绍一个特殊的JOIN,那就是JOIN LATERAL。JOIN LATERAL为什么特殊呢,直观说因为JOIN的右边不是一个实际的物理表,而是一个VIEW或者Table-valued Funciton。

聊什么

上一篇《Apache Flink 漫谈系列 - JOIN算子》我们对最常见的JOIN做了详尽的分析,本篇介绍一个特殊的JOIN,那就是JOIN LATERAL。JOIN LATERAL为什么特殊呢,直观说因为JOIN的右边不是一个实际的物理表,而是一个VIEW或者Table-valued Funciton。如下图所示:

image

本篇会先介绍传统数据库对LATERAL JOIN的支持,然后介绍Apache Flink目前对LATERAL JOIN的支持情况。

实际问题

假设我们有两张表,一张是Customers表(消费者id, 所在城市), 一张是Orders表(订单id,消费者id),两张表的DDL(SQL Server)如下:

  • Customers
CREATE TABLE Customers (
  customerid char(5) NOT NULL,
  city varchar (10) NOT NULL
)

insert into Customers values('C001','Beijing');
insert into Customers values('C002','Beijing');
insert into Customers values('C003','Beijing');
insert into Customers values('C004','HangZhou');

查看数据:

  • Orders
CREATE TABLE Orders(
  orderid char(5) NOT NULL,
  customerid char(5) NULL
)

insert into Orders values('O001','C001');
insert into Orders values('O002','C001');
insert into Orders values('O003','C003');
insert into Orders values('O004','C001');

查看数据:

问题示例

假设我们想查询所有Customers的客户ID,地点和订单信息,我们想得到的信息是:

用INNER JOIN解决

如果大家查阅了《Apache Flink 漫谈系列 - JOIN算子》,我想看到这样的查询需求会想到INNER JOIN来解决,SQL如下:

SELECT 
    c.customerid, c.city, o.orderid 
FROM Customers c JOIN Orders o 
    ON o.customerid = c.customerid

查询结果如下:

但如果我们真的用上面的方式来解决,就不会有本篇要介绍的内容了,所以我们换一种写法。

用 Correlated subquery解决

Correlated subquery 是在subquery中使用关联表的字段,subquery可以在FROM Clause中也可以在WHERE Clause中。

  • WHERE Clause
    用WHERE Clause实现上面的查询需求,SQL如下:
SELECT 
    c.customerid, c.city
FROM Customers c WHERE c.customerid IN (
    SELECT 
      o.customerid, o.orderid
    FROM Orders o
    WHERE o.customerid = c.customerid
)

执行情况:

上面的问题是用在WHERE Clause里面subquery的查询列必须和需要比较的列对应,否则我们无法对o.orderid进行投影, 上面查询我为什么要加一个o.orderid呢,因为查询需求是需要o.orderid的,去掉o.orderid查询能成功,但是拿到的结果并不是我们想要的,如下:

SELECT 
    c.customerid, c.city
FROM Customers c WHERE c.customerid IN (
    SELECT 
      o.customerid
    FROM Orders o
    WHERE o.customerid = c.customerid
)

查询结果:

可见上面查询结果缺少了o.orderid,不能满足我们的查询需求。

  • FROM Clause
    用FROM Clause实现上面的查询需求,SQL如下:
SELECT 
    c.customerid, c.city, o.orderid 
FROM Customers c, (
    SELECT 
      o.orderid, o.customerid 
    FROM Orders o
    WHERE o.customerid = c.customerid
) as o

我们会得到如下错误:

错误信息提示我们无法识别c.customerid。在ANSI-SQL里面FROM Clause里面的subquery是无法引用左边表信息的,所以简单的用FROM Clause里面的subquery,也无法解决上面的问题,
那么上面的查询需求除了INNER JOIN 我们还可以如何解决呢?

JOIN LATERAL

我们分析上面的需求,本质上是根据左表Customers的customerid,去查询右表的Orders信息,就像一个For循环一样,外层是遍历左表Customers所有数据,内层是根据左表Customers的每一个Customerid去右表Orders中进行遍历查询,然后再将符合条件的左右表数据进行JOIN,这种根据左表逐条数据动态生成右表进行JOIN的语义,SQL标准里面提出了LATERAL关键字,也叫做 lateral drive table

CROSS APPLY和LATERAL

上面的示例我们用的是SQL Server进行测试的,这里在多提一下在SQL Server里面是如何支持 LATERAL 的呢?SQL Server是用自己的方言 CROSS APPLY 来支持的。那么为啥不用ANSI-SQL的LATERAL而用CROSS APPLY呢? 可能的原因是当时SQL Server为了解决TVF问题而引入的,同时LATERAL是SQL2003引入的,而CROSS APPLY是SQL Server 2005就支持了,SQL Server 2005的开发是在2000年就进行了,这个可能也有个时间差,等LATERAL出来的时候,CROSS APPLY在SQL Server里面已经开发完成了。所以种种原因SQL Server里面就采用了CROSS APPLY,但CROSS APPLY的语义与LATERAL却完全一致,同时后续支持LATERAL的Oracle12和PostgreSQL94同时支持了LATERALCROSS APPLY

问题解决

那么我们回到上面的问题,我们用SQL Server的CROSS APPLY来解决上面问题,SQL如下:

上面得到的结果完全满足查询需求。

JOIN LATERAL 与 INNER JOIN 关系

上面的查询需求并没有体现JOIN LATERALINNER JOIN的区别,我们还是以SQL Server中两个查询执行Plan来观察一下:

上面我们发现经过SQL Server优化器优化之后的两个执行plan完全一致,那么为啥还要再造一个LATERAL 出来呢?

性能方面

我们将上面的查询需求稍微改变一下,我们查询所有Customer和Customers的第一份订单信息。

  • LATERAL 的写法
SELECT 
    c.customerid, c.city, o.orderid 
FROM Customers c CROSS APPLY (
    SELECT 
     TOP(1) o.orderid, o.customerid 
    FROM Orders o 
    WHERE o.customerid = c.customerid
    ORDER BY o.customerid, o.orderid
) as o

查询结果:

我们发现虽然C001的Customer有三笔订单,但是我们查询的TOP1信息。

  • JOIN 写法
SELECT  c.customerid, c.city, o.orderid
 FROM    Customers c
  JOIN (
   SELECT 
     o2.*, 
     ROW_NUMBER() OVER (
        PARTITION BY customerid 
        ORDER BY orderid
      ) AS rn
   FROM    Orders o2
 ) o
ON c.customerid = o.customerid AND o.rn = 1

查询结果:

如上我们都完成了查询需求,我们在来看一下执行Plan,如下:

我们直观发现完成相同功能,使用CROSS APPLY进行查询,执行Plan简单许多。

功能方面

在功能方面INNER JOIN本身在ANSI-SQL中是不允许 JOIN 一个Function的,这也是SQL Server当时引入CROSS APPLY的根本原因。我们以一个SQL Server中DMV(相当于TVF)查询为例:

SELECT 
   name, log_backup_time 
FROM sys.databases AS s
 CROSS APPLY sys.dm_db_log_stats(s.database_id); 

查询结果:

Apache Flink对 LATERAL的支持

前面我花费了大量的章节来向大家介绍ANSI-SQL和传统数据库以SQL Server为例如何支持LATERAL的,接下来我们看看Apache Flink对LATERAL的支持情况。

Calcite

Apache Flink 利用 Calcite进行SQL的解析和优化,目前Calcite完全支持LATERAL语法,示例如下:

SELECT 
    e.NAME, e.DEPTNO, d.NAME 
FROM EMPS e, LATERAL (
    SELECT 
    *
    FORM DEPTS d 
    WHERE e.DEPTNO=d.DEPTNO
 ) as d;

查询结果:

我使用的是Calcite官方自带测试数据。

Flink

截止到Flink-1.6.2,Apache Flink 中有两种场景使用LATERAL,如下:

  • UDTF(TVF) - User-defined Table Funciton
  • Temporal Table - 涉及内容会在后续篇章单独介绍。

本篇我们以在TVF(UDTF)为例说明 Apache Fink中如何支持LATERAL

UDTF

UDTF- User-defined Table Function是Apache Flink中三大用户自定义函数(UDF,UDTF,UDAGG)之一。 自定义接口如下:

  • 基类
/**
  * Base class for all user-defined functions such as scalar functions, table functions,
  * or aggregation functions.
  */
abstract class UserDefinedFunction extends Serializable {
  // 关键是FunctionContext中提供了若干高级属性(在UDX篇会详细介绍)
  def open(context: FunctionContext): Unit = {}
  def close(): Unit = {}
}
  • TableFunction
/**
  * Base class for a user-defined table function (UDTF). A user-defined table functions works on
  * zero, one, or multiple scalar values as input and returns multiple rows as output.
  *
  * The behavior of a [[TableFunction]] can be defined by implementing a custom evaluation
  * method. An evaluation method must be declared publicly, not static and named "eval".
  * Evaluation methods can also be overloaded by implementing multiple methods named "eval".
  *
  * User-defined functions must have a default constructor and must be instantiable during runtime.
  *
  * By default the result type of an evaluation method is determined by Flink's type extraction
  * facilities. This is sufficient for basic types or simple POJOs but might be wrong for more
  * complex, custom, or composite types. In these cases [[TypeInformation]] of the result type
  * can be manually defined by overriding [[getResultType()]].
  */
abstract class TableFunction[T] extends UserDefinedFunction {

  // 对于泛型T,如果是基础类型那么Flink框架可以自动识别,
  // 对于用户自定义的复杂对象,需要用户overwrite这个实现。
  def getResultType: TypeInformation[T] = null
}

上面定义的核心是要求用户实现eval方法,我们写一个具体示例。

  • 示例
// 定义一个简单的UDTF返回类型,对应接口上的 T 
case class SimpleUser(name: String, age: Int)
// 继承TableFunction,并实现evale方法
// 核心功能是解析以#分割的字符串
class SplitTVF extends TableFunction[SimpleUser] {
  // make sure input element's format is "<string>#<int>"
  def eval(user: String): Unit = {
    if (user.contains("#")) {
      val splits = user.split("#")
      collect(SimpleUser(splits(0), splits(1).toInt))
    }
  }
}

示例(完整的ITCase):

  • 测试数据
    我们构造一个只包含一个data字段的用户表,用户表数据如下:
data
Sunny#8
Kevin#36
Panpan#36
  • 查询需求
    查询的需求是将data字段flatten成为name和age两个字段的表,期望得到:
name age
Sunny 8
Kevin 36
Panpan 36
  • 查询示例
    我们以ITCase方式完成如上查询需求,完整代码如下:
@Test
  def testLateralTVF(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)
    env.setStateBackend(getStateBackend)
    StreamITCase.clear

    val userData = new mutable.MutableList[(String)]
    userData.+=(("Sunny#8"))
    userData.+=(("Kevin#36"))
    userData.+=(("Panpan#36"))

    val SQLQuery = "SELECT data, name, age FROM userTab, LATERAL TABLE(splitTVF(data)) AS T(name, age)"

    val users = env.fromCollection(userData).toTable(tEnv, 'data)

    val tvf = new SplitTVF()
    tEnv.registerTable("userTab", users)
    tEnv.registerFunction("splitTVF", tvf)

    val result = tEnv.SQLQuery(SQLQuery).toAppendStream[Row]
    result.addSink(new StreamITCase.StringSink[Row])
    env.execute()
    StreamITCase.testResults.foreach(println(_))
  }

运行结果:

上面的核心语句是:

val SQLQuery = "SELECT data, name, age FROM userTab, LATERAL TABLE(splitTVF(data)) AS T(name, age)"

如果大家想运行上面的示例,请查阅《Apache Flink 漫谈系列 - SQL概览》中 源码方式 搭建测试环境。

小结

本篇重点向大家介绍了一种新的JOIN类型 - JOIN LATERAL。并向大家介绍了SQL Server中对LATERAL的支持方式,详细分析了JOIN LATERALINNER JOIN的区别与联系,最后切入到Apache Flink中,以UDTF示例说明了Apache Flink中对JOIN LATERAL的支持,后续篇章会介绍Apache Flink中另一种使用LATERAL的场景,就是Temporal JION,Temporal JION也是一种新的JOIN类型,我们下一篇再见!

关于点赞和评论

本系列文章难免有很多缺陷和不足,真诚希望读者对有收获的篇章给予点赞鼓励,对有不足的篇章给予反馈和建议,先行感谢大家!

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
1月前
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。
|
2月前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
121 2
|
25天前
|
消息中间件 资源调度 API
Apache Flink 流批融合技术介绍
本文源自阿里云高级研发工程师周云峰在Apache Asia Community OverCode 2024的分享,内容涵盖从“流批一体”到“流批融合”的演进、技术解决方案及社区进展。流批一体已在API、算子和引擎层面实现统一,但用户仍需手动配置作业模式。流批融合旨在通过动态调整优化策略,自动适应不同场景需求。文章详细介绍了如何通过量化指标(如isProcessingBacklog和isInsertOnly)实现这一目标,并展示了针对不同场景的具体优化措施。此外,还概述了社区当前进展及未来规划,包括将优化方案推向Flink社区、动态调整算子流程结构等。
316 31
Apache Flink 流批融合技术介绍
|
3天前
|
消息中间件 druid Kafka
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
18 0
|
2月前
|
Java 微服务 Spring
驾驭复杂性:Spring Cloud在微服务构建中的决胜法则
【8月更文挑战第31天】Spring Cloud是在Spring Framework基础上打造的微服务解决方案,提供服务发现、配置管理、消息路由等功能,适用于构建复杂的微服务架构。本文介绍如何利用Spring Cloud搭建微服务,包括Eureka服务发现、Config Server配置管理和Zuul API网关等组件的配置与使用。通过Spring Cloud,可实现快速开发、自动化配置,并提升系统的伸缩性和容错性,尽管仍需面对分布式事务等挑战,但其强大的社区支持有助于解决问题。
54 0
|
20天前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
3月前
|
存储 监控 大数据
阿里云实时计算Flink在多行业的应用和实践
本文整理自 Flink Forward Asia 2023 中闭门会的分享。主要分享实时计算在各行业的应用实践,对回归实时计算的重点场景进行介绍以及企业如何使用实时计算技术,并且提供一些在技术架构上的参考建议。
747 7
阿里云实时计算Flink在多行业的应用和实践
|
2月前
|
SQL 消息中间件 Kafka
实时计算 Flink版产品使用问题之如何在EMR-Flink的Flink SOL中针对source表单独设置并行度
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
21天前
|
存储 运维 监控
阿里云实时计算Flink版的评测
阿里云实时计算Flink版的评测
48 15
|
20天前
|
运维 分布式计算 监控
评测报告:阿里云实时计算Flink版
本评测主要针对阿里云实时计算Flink版在用户行为分析中的应用。作为一名数据分析师,我利用该服务处理了大量日志数据,包括用户点击流和登录行为。Flink的强大实时处理能力让我能够迅速洞察用户行为变化,及时调整营销策略。此外,其卓越的性能和稳定性显著降低了运维负担,提升了项目效率。产品文档详尽且易于理解,但建议增加故障排查示例。

相关产品

  • 实时计算 Flink版
  • 推荐镜像

    更多