Apache Flink 漫谈系列(12) - Time Interval(Time-windowed) JOIN

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介:

说什么

JOIN 算子是数据处理的核心算子,前面我们在《Apache Flink 漫谈系列(09) - JOIN 算子》介绍了UnBounded的双流JOIN,在《Apache Flink 漫谈系列(10) - JOIN LATERAL》介绍了单流与UDTF的JOIN操作,在《Apache Flink 漫谈系列(11) - Temporal Table JOIN》又介绍了单流与版本表的JOIN,本篇将介绍在UnBounded数据流上按时间维度进行数据划分进行JOIN操作 - Time Interval(Time-windowed)JOIN, 后面我们叫做Interval JOIN。

实际问题

前面章节我们介绍了Flink中对各种JOIN的支持,那么想想下面的查询需求之前介绍的JOIN能否满足?需求描述如下:

比如有一个订单表Orders(orderId, productName, orderTime)和付款表Payment(orderId, payType, payTime)。 假设我们要统计下单一小时内付款的订单信息。

传统数据库解决方式

在传统刘数据库中完成上面的需求非常简单,查询sql如下::

SELECT 
  o.orderId,
  o.productName,
  p.payType,
  o.orderTime,
  payTime
FROM
  Orders AS o JOIN Payment AS p ON 
  o.orderId = p.orderId AND p.payTime >= orderTime AND p.payTime < orderTime + 3600 // 秒

上面查询可以完美的完成查询需求,那么在Apache Flink里面应该如何完成上面的需求呢?

Apache Flink解决方式

UnBounded 双流 JOIN

上面查询需求我们很容易想到利用《Apache Flink 漫谈系列(09) - JOIN 算子》介绍了UnBounded的双流JOIN,SQL语句如下:

 SELECT 
    o.orderId,
    o.productName,
    p.payType,
    o.orderTime,
    payTime 
  FROM
    Orders AS o JOIN Payment AS p ON 
    o.orderId = p.orderId AND p.payTime >= orderTime AND p.payTime as timestamp < TIMESTAMPADD(SECOND, 3600, orderTime)

UnBounded双流JOIN可以解决上面问题,这个示例和本篇要介绍的Interval JOIN有什么关系呢?

性能问题

虽然我们利用UnBounded的JOIN能解决上面的问题,但是仔细分析用户需求,会发现这个需求场景订单信息和付款信息并不需要长期存储,比如2018-12-27 14:22:22的订单只需要保持1小时,因为超过1个小时的订单如果没有被付款就是无效订单了。同样付款信息也不需要长期保持,2018-12-27 14:22:22的订单付款信息如果是2018-12-27 15:22:22以后到达的那么我们也没有必要保存到State中。 而对于UnBounded的双流JOIN我们会一直将数据保存到State中,如下示意图:
image

这样的底层实现,对于当前需求有不必要的性能损失。所以我们有必要开发一种新的可以清除State的JOIN方式(Interval JOIN)来高性能的完成上面的查询需求。

功能扩展

目前的UnBounded的双流JOIN是后面是没有办法再进行Event-Time的Window Aggregate的。也就是下面的语句在Apache Flink上面是无法支持的:

 SELECT COUNT(*) FROM (
  SELECT 
   ...,
   payTime
   FROM Orders AS o JOIN Payment AS p ON 
    o.orderId = p.orderId 
  ) GROUP BY TUMBLE(payTime, INTERVAL '15' MINUTE)

因为在UnBounded的双流JOIN中无法保证payTime的值一定大于WaterMark(WaterMark相关可以查阅<>). Apache Flink的Interval JOIN之后可以进行Event-Time的Window Aggregate。

Interval JOIN

为了完成上面需求,并且解决性能和功能扩展的问题,Apache Flink在1.4开始开发了Time-windowed Join,也就是本文所说的Interval JOIN。接下来我们详细介绍Interval JOIN的语法,语义和实现原理。

什么是Interval JOIN

Interval JOIN 相对于UnBounded的双流JOIN来说是Bounded JOIN。就是每条流的每一条数据会与另一条流上的不同时间区域的数据进行JOIN。对应Apache Flink官方文档的 Time-windowed JOIN(release-1.7之前都叫Time-Windowed JOIN)。

Interval JOIN 语法

SELECT ... FROM t1 JOIN t2  ON t1.key = t2.key AND TIMEBOUND_EXPRESSION

TIMEBOUND_EXPRESSION 有两种写法,如下:

  • L.time between LowerBound(R.time) and UpperBound(R.time)
  • R.time between LowerBound(L.time) and UpperBound(L.time)
  • 带有时间属性(L.time/R.time)的比较表达式。

Interval JOIN 语义

Interval JOIN 的语义就是每条数据对应一个 Interval 的数据区间,比如有一个订单表Orders(orderId, productName, orderTime)和付款表Payment(orderId, payType, payTime)。 假设我们要统计在下单一小时内付款的订单信息。SQL查询如下:

SELECT 
  o.orderId,
  o.productName,
  p.payType,
  o.orderTime,
  cast(payTime as timestamp) as payTime
FROM
  Orders AS o JOIN Payment AS p ON 
  o.orderId = p.orderId AND 
  p.payTime BETWEEN orderTime AND 
  orderTime + INTERVAL '1' HOUR
  • Orders订单数据
orderId productName orderTime
001 iphone 2018-12-26 04:53:22.0
002 mac 2018-12-26 04:53:23.0
003 book 2018-12-26 04:53:24.0
004 cup 2018-12-26 04:53:38.0
  • Payment付款数据
orderId payType payTime
001 alipay 2018-12-26 05:51:41.0
002 card 2018-12-26 05:53:22.0
003 card 2018-12-26 05:53:30.0
004 alipay 2018-12-26 05:53:31.0

符合语义的预期结果是 订单id为003的信息不出现在结果表中,因为下单时间2018-12-26 04:53:24.0, 付款时间是 2018-12-26 05:53:30.0超过了1小时付款。
那么预期的结果信息如下:

orderId productName payType orderTime payTime
001 iphone alipay 2018-12-26 04:53:22.0 2018-12-26 05:51:41.0
002 mac card 2018-12-26 04:53:23.0 2018-12-26 05:53:22.0
004 cup alipay 2018-12-26 04:53:38.0 2018-12-26 05:53:31.0

这样Id为003的订单是无效订单,可以更新库存继续售卖。

接下来我们以图示的方式直观说明Interval JOIN的语义,我们对上面的示例需求稍微变化一下: 订单可以预付款(不管是否合理,我们只是为了说明语义)也就是订单 前后 1小时的付款都是有效的。SQL语句如下:

SELECT
  ...
FROM
  Orders AS o JOIN Payment AS p ON
  o.orderId = p.orderId AND
  p.payTime BETWEEN orderTime - INTERVAL '1' HOUR AND
  orderTime + INTERVAL '1' HOUR

这样的查询语义示意图如下:
image

上图有几个关键点,如下:

  • 数据JOIN的区间 - 比如Order时间为3的订单会在付款时间为[2, 4]区间进行JOIN。
  • WaterMark - 比如图示Order最后一条数据时间是3,Payment最后一条数据时间是5,那么WaterMark是根据实际最小值减去UpperBound生成,即:Min(3,5)-1 = 2
  • 过期数据 - 出于性能和存储的考虑,要将过期数据清除,如图当WaterMark是2的时候时间为2以前的数据过期了,可以被清除。

Interval JOIN 实现原理

由于Interval JOIN和双流JOIN类似都要存储左右两边的数据,所以底层实现中仍然是利用State进行数据的存储。流计算的特点是数据不停的流入,我们可以不停的进行增量计算,也就是我们每条数据流入都可以进行JOIN计算。我们还是以具体示例和图示来说明内部计算逻辑,如下图:

image

简单解释一下每条记录的处理逻辑如下:
image

实际的内部逻辑会比描述的复杂的多,大家可以根据如上简述理解内部原理即可。

示例代码

我们还是以订单和付款示例,将完整代码分享给大家,具体如下(代码基于flink-1.7.0):

import java.sql.Timestamp

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

import scala.collection.mutable

object SimpleTimeIntervalJoin {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // 构造订单数据
    val ordersData = new mutable.MutableList[(String, String, Timestamp)]
    ordersData.+=(("001", "iphone", new Timestamp(1545800002000L)))
    ordersData.+=(("002", "mac", new Timestamp(1545800003000L)))
    ordersData.+=(("003", "book", new Timestamp(1545800004000L)))
    ordersData.+=(("004", "cup", new Timestamp(1545800018000L)))

    // 构造付款表
    val paymentData = new mutable.MutableList[(String, String, Timestamp)]
    paymentData.+=(("001", "alipay", new Timestamp(1545803501000L)))
    paymentData.+=(("002", "card", new Timestamp(1545803602000L)))
    paymentData.+=(("003", "card", new Timestamp(1545803610000L)))
    paymentData.+=(("004", "alipay", new Timestamp(1545803611000L)))
    val orders = env
      .fromCollection(ordersData)
      .assignTimestampsAndWatermarks(new TimestampExtractor[String, String]())
      .toTable(tEnv, 'orderId, 'productName, 'orderTime.rowtime)
    val ratesHistory = env
      .fromCollection(paymentData)
      .assignTimestampsAndWatermarks(new TimestampExtractor[String, String]())
      .toTable(tEnv, 'orderId, 'payType, 'payTime.rowtime)

    tEnv.registerTable("Orders", orders)
    tEnv.registerTable("Payment", ratesHistory)

    var sqlQuery =
      """
        |SELECT
        |  o.orderId,
        |  o.productName,
        |  p.payType,
        |  o.orderTime,
        |  cast(payTime as timestamp) as payTime
        |FROM
        |  Orders AS o JOIN Payment AS p ON o.orderId = p.orderId AND
        | p.payTime BETWEEN orderTime AND orderTime + INTERVAL '1' HOUR
        |""".stripMargin
    tEnv.registerTable("TemporalJoinResult", tEnv.sqlQuery(sqlQuery))

    val result = tEnv.scan("TemporalJoinResult").toAppendStream[Row]
    result.print()
    env.execute()
  }

}

class TimestampExtractor[T1, T2]
  extends BoundedOutOfOrdernessTimestampExtractor[(T1, T2, Timestamp)](Time.seconds(10)) {
  override def extractTimestamp(element: (T1, T2, Timestamp)): Long = {
    element._3.getTime
  }
}

运行结果如下:
image

小节

本篇由实际业务需求场景切入,介绍了相同业务需求既可以利用Unbounded 双流JOIN实现,也可以利用Time Interval JOIN来实现,Time Interval JOIN 性能优于UnBounded的双流JOIN,并且Interval JOIN之后可以进行Window Aggregate算子计算。然后介绍了Interval JOIN的语法,语义和实现原理,最后将订单和付款的完整示例代码分享给大家。期望本篇能够让大家对Apache Flink Time Interval JOIN有一个具体的了解!

关于点赞和评论

本系列文章难免有很多缺陷和不足,真诚希望读者对有收获的篇章给予点赞鼓励,对有不足的篇章给予反馈和建议,先行感谢大家!

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
28天前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
575 13
Apache Flink 2.0-preview released
|
1月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
63 3
|
1月前
|
分布式计算 监控 大数据
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
55 1
|
1月前
|
数据挖掘 物联网 数据处理
深入探讨Apache Flink:实时数据流处理的强大框架
在数据驱动时代,企业需高效处理实时数据流。Apache Flink作为开源流处理框架,以其高性能和灵活性成为首选平台。本文详细介绍Flink的核心特性和应用场景,包括实时流处理、强大的状态管理、灵活的窗口机制及批处理兼容性。无论在实时数据分析、金融服务、物联网还是广告技术领域,Flink均展现出巨大潜力,是企业实时数据处理的理想选择。随着大数据需求增长,Flink将继续在数据处理领域发挥重要作用。
|
2月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
4月前
|
存储 监控 大数据
阿里云实时计算Flink在多行业的应用和实践
本文整理自 Flink Forward Asia 2023 中闭门会的分享。主要分享实时计算在各行业的应用实践,对回归实时计算的重点场景进行介绍以及企业如何使用实时计算技术,并且提供一些在技术架构上的参考建议。
820 7
阿里云实时计算Flink在多行业的应用和实践
|
15天前
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
679 10
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
|
3月前
|
SQL 消息中间件 Kafka
实时计算 Flink版产品使用问题之如何在EMR-Flink的Flink SOL中针对source表单独设置并行度
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
12天前
|
SQL 运维 数据可视化
阿里云实时计算Flink版产品体验测评
阿里云实时计算Flink基于Apache Flink构建,提供一站式实时大数据分析平台,支持端到端亚秒级实时数据分析,适用于实时大屏、实时报表、实时ETL和风控监测等场景,具备高性价比、开发效率、运维管理和企业安全等优势。
zdl
|
3天前
|
消息中间件 运维 大数据
大数据实时计算产品的对比测评:实时计算Flink版 VS 自建Flink集群
本文介绍了实时计算Flink版与自建Flink集群的对比,涵盖部署成本、性能表现、易用性和企业级能力等方面。实时计算Flink版作为全托管服务,显著降低了运维成本,提供了强大的集成能力和弹性扩展,特别适合中小型团队和业务波动大的场景。文中还提出了改进建议,并探讨了与其他产品的联动可能性。总结指出,实时计算Flink版在简化运维、降低成本和提升易用性方面表现出色,是大数据实时计算的优选方案。
zdl
19 0

相关产品

  • 实时计算 Flink版
  • 推荐镜像

    更多