SparkStreaming使用mapWithState时,设置timeout()无法生效问题解决方案

简介: SparkStreaming使用mapWithState时,设置timeout()无法生效问题解决方案

前言

当我在测试SparkStreaming的状态操作mapWithState算子时,当我们设置timeout(3s)的时候,3s过后数据还是不会过期,不对此key进行操作,等到30s左右才会清除过期的数据。

百度了很久,关于timeout的资料很少,更没有解决这个问题的文章,所以说,百度也不是万能的,有时候还是需要靠自己。

image.png

所以我就在周末研究了一下,然后将结果整理了出来,希望能帮助大家更全面的理解Spark状态计算。

mapWithState

按理说Spark Streaming实时处理,数据就像流水,每个批次之间的数据都是独立的,处理完就处理完了,不留下任何状态。但是免不了一些有状态的操作,例如统计从流启动到现在,某个单词出现了多少次,所以状态操作就出现了。

状态操作分为updateStateByKey和mapWithState,两者有着很大的区别。简单的来说,前者每次输出的都是全量状态,后者输出的是增量状态。

过期原理

image.png

过期这一块估计很多人开始都理解错了,我刚开始理解就是数据从出现,经过多少秒之后就会过期。其实不是,这里的过期指的是空闲时间。

注释大概是这个意思:timeout()传入一个时间间隔参数,如果一个key在大于此间隔没有此key的数据流入,则被认为是空闲的,就会单独调用一次mapWithState中的func来清除这些空闲数据状态。

先写结论

使用了timeout()之后,需要使用以下代码来在间隔内清除失效key。

stream.checkpoint(Seconds(6))

checkpoint的时候,会开启全面扫描,才会对state中的失效key进行清理。

测试

val conf = new SparkConf().setMaster("local[2]").setAppName("state")
    val ssc = new StreamingContext(conf, Seconds(3))
    ssc.checkpoint("./tmp")
    val streams: DStream[(String, Int)] = ssc.socketTextStream("localhost", 9999)
      .map(x => (x, 1))
    val result = streams.mapWithState(StateSpec.function((k: String, v: Option[Int], state: State[Int]) => {
        val count = state.getOption().getOrElse(0)
        println(k)
        println(v)
        var sum = 0
        if (!state.isTimingOut()) {
          sum = count + v.get
          state.update(sum)
        } else {
          println("timeout")
        }
        Option(sum)
      })
      .timeout(Seconds(3))
    )
    // 这行代码是触发清除机制的关键
    // result.checkpoint(Seconds(6))
    result.print()
    ssc.start()
    ssc.awaitTermination()

使用上面的代码进行测试,设置过期时间为3s。但是3s过后发现key并没有过期,也不会被清除,大概30S之后被清除。

在9999端口输入一个tom后,不再进行任何操作。测试结果如下:

tom
Some(1)
-------------------------------------------
Time: 1618228587000 ms
-------------------------------------------
Some(1)
tom
None
timeout
-------------------------------------------
Time: 1618228614000 ms
-------------------------------------------
Some(0)

从测试结果可以看出,从输入到清除大概是27s。

我们现在将注释的代码放开,每6s进行checkpoint一次,输入tom:

tom
Some(1)
-------------------------------------------
Time: 1618228497000 ms
-------------------------------------------
Some(1)
tom
None
timeout
-------------------------------------------
Time: 1618228506000 ms
-------------------------------------------
Some(0)

从生成到清除用了9秒,正好是过期时间 + 下一个窗口时间,触发了checkpoint。

猜想

第一次学状态操作的时候,就考虑如何去掉一些过期的key,通过timeout()的方法没有完成自己想法,从网上也没有找到解决方案,所以就暂且搁置在一边了。后来又回过头来考虑这个问题,然后根据自己的想法去猜想、去验证。

1. 我先看的是mapWithState()的返回值

image.png

2. MapWithStateDStreamImpl

image.png

每个Dstream的计算逻辑都在compute()中,这里是调用了internalStream的getOrCompute(),根据继承关系,调用的是父类Dstream的此方法:

image.png

getOrCompute()主要功能为:计算、缓存、checkpoint。这里只需要记住几个地方:checkpointDuration,即checkpoint间隔,和调用了checkpoint()。其实真正的计算还是调用了compute(),接着去看compute()

3. InternalMapWithStateDStream

image.png

compute()里面也调用了getOrCompute()方法,其实和上面调用的一样,都是Dstream的,这里主要看的是使用createFromRDD()生成的StateRDD。

4. MapWithStateRDD

这个StateRDD就是参与状态计算的数据集合,首先看它是如何生成的: image.png

再看看StateRDD的compute()是如何计算的:

image.png

从compute()看出,当doFullScan为true的时候,才会触发过期key的清除,updateRecordWithData()负责全面扫描清除过期key

这不,思路就来了,我们只要找到开启FullScan的方法,不就可以自行触发清除机制了吗!

那么,我们先看看doFullScan的默认值:

image.png

默认是没开启的,接着通过快捷键看看哪些地方使用了doFullScan: image.png

从图中看出,有两处代码修改了doFullScan,我们找到这两处代码:

image.png

image.png

第一个基本上排除,那么就剩下第二个:checkpoint(),我们要知道的是,状态操作必须要checkpoint

还记得在2中的getOrCompute()吗,当checkpointDuration不为null的时候,调用checkpoint()。 我们来看3中InternalMapWithStateDStream是如何定义这个duration的:

image.png

image.png

如图,sideDuration是窗口时间,乘以系数10就是默认的checkpoint时长,所以当我设置窗口为3s时,checkpoint周期就是30s,30s才会清理一次过期key。

而通过checkpoint(interval)可以设置checkpoint的间隔,所以覆盖了上面程序中默认的30s。 image.png

5.MapWithStateRDDRecord

最后提一提,FullScan是在这个类中开启的,所以先看看这个Record的注释介绍: image.png

意思就是负责存储StateRDD的状态KV,updateRecordWithData()负责清除过期的Record,我们来看看这个方法的实现:

image.png

removeTimedoutData就是是否开启全面扫描,即doFullScan的值。

结语

写完看起来感觉真的是简简单单,逻辑看起来也比较清晰,但是自己去解决这个问题的时候也是花了一下午时间,过期key的清除与checkpoint有关也是我凭空弄猜想,然后分析了两次,某一瞬间才找到他们之间的关系。所以说,猜想和运气还是很重要的。

当然,找不到关于这块的文章和资料可能是因为这个知识点太小了。所以这次过后,要开始系统阅读Spark源码了,也希望在某一天能结合着自己的理解,写一下Spark的文章。



95后小程序员,写的都是日常工作中的亲身实践,置身于初学者的角度从0写到1,详细且认真。

文章会在公众号 [入门到放弃之路] 首发,期待你的关注。


相关文章
|
2天前
|
存储 缓存 关系型数据库
MySQL事务日志-Redo Log工作原理分析
事务的隔离性和原子性分别通过锁和事务日志实现,而持久性则依赖于事务日志中的`Redo Log`。在MySQL中,`Redo Log`确保已提交事务的数据能持久保存,即使系统崩溃也能通过重做日志恢复数据。其工作原理是记录数据在内存中的更改,待事务提交时写入磁盘。此外,`Redo Log`采用简单的物理日志格式和高效的顺序IO,确保快速提交。通过不同的落盘策略,可在性能和安全性之间做出权衡。
1519 4
|
29天前
|
弹性计算 人工智能 架构师
阿里云携手Altair共拓云上工业仿真新机遇
2024年9月12日,「2024 Altair 技术大会杭州站」成功召开,阿里云弹性计算产品运营与生态负责人何川,与Altair中国技术总监赵阳在会上联合发布了最新的“云上CAE一体机”。
阿里云携手Altair共拓云上工业仿真新机遇
|
5天前
|
人工智能 Rust Java
10月更文挑战赛火热启动,坚持热爱坚持创作!
开发者社区10月更文挑战,寻找热爱技术内容创作的你,欢迎来创作!
502 19
|
2天前
|
存储 SQL 关系型数据库
彻底搞懂InnoDB的MVCC多版本并发控制
本文详细介绍了InnoDB存储引擎中的两种并发控制方法:MVCC(多版本并发控制)和LBCC(基于锁的并发控制)。MVCC通过记录版本信息和使用快照读取机制,实现了高并发下的读写操作,而LBCC则通过加锁机制控制并发访问。文章深入探讨了MVCC的工作原理,包括插入、删除、修改流程及查询过程中的快照读取机制。通过多个案例演示了不同隔离级别下MVCC的具体表现,并解释了事务ID的分配和管理方式。最后,对比了四种隔离级别的性能特点,帮助读者理解如何根据具体需求选择合适的隔离级别以优化数据库性能。
179 1
|
8天前
|
JSON 自然语言处理 数据管理
阿里云百炼产品月刊【2024年9月】
阿里云百炼产品月刊【2024年9月】,涵盖本月产品和功能发布、活动,应用实践等内容,帮助您快速了解阿里云百炼产品的最新动态。
阿里云百炼产品月刊【2024年9月】
|
21天前
|
存储 关系型数据库 分布式数据库
GraphRAG:基于PolarDB+通义千问+LangChain的知识图谱+大模型最佳实践
本文介绍了如何使用PolarDB、通义千问和LangChain搭建GraphRAG系统,结合知识图谱和向量检索提升问答质量。通过实例展示了单独使用向量检索和图检索的局限性,并通过图+向量联合搜索增强了问答准确性。PolarDB支持AGE图引擎和pgvector插件,实现图数据和向量数据的统一存储与检索,提升了RAG系统的性能和效果。
|
9天前
|
Linux 虚拟化 开发者
一键将CentOs的yum源更换为国内阿里yum源
一键将CentOs的yum源更换为国内阿里yum源
457 5
|
7天前
|
存储 人工智能 搜索推荐
数据治理,是时候打破刻板印象了
瓴羊智能数据建设与治理产品Datapin全面升级,可演进扩展的数据架构体系为企业数据治理预留发展空间,推出敏捷版用以解决企业数据量不大但需构建数据的场景问题,基于大模型打造的DataAgent更是为企业用好数据资产提供了便利。
314 2
|
23天前
|
人工智能 IDE 程序员
期盼已久!通义灵码 AI 程序员开启邀测,全流程开发仅用几分钟
在云栖大会上,阿里云云原生应用平台负责人丁宇宣布,「通义灵码」完成全面升级,并正式发布 AI 程序员。
|
25天前
|
机器学习/深度学习 算法 大数据
【BetterBench博士】2024 “华为杯”第二十一届中国研究生数学建模竞赛 选题分析
2024“华为杯”数学建模竞赛,对ABCDEF每个题进行详细的分析,涵盖风电场功率优化、WLAN网络吞吐量、磁性元件损耗建模、地理环境问题、高速公路应急车道启用和X射线脉冲星建模等多领域问题,解析了问题类型、专业和技能的需要。
2608 22
【BetterBench博士】2024 “华为杯”第二十一届中国研究生数学建模竞赛 选题分析