离线计算平台系列之一

简介: # 离线计算平台简介 在蚂蚁金服风控体系里面,有一个重要的环节就是离线仿真,在规则,模型上线之前,在离线的环境里面进行仿真验证,来对规则和模型进行效能的评估,避免人为因素造成不准确性从而造成的资损。起初为了达到这个目的,离线计算平台就这样孕育而生了,慢慢地整个离线平台覆盖了更多风控的业务,也慢慢变成目前Odps-Spark最大的用户,拥有的集群数目也是最大的。离线计算平台主要以Spark为基

离线计算平台简介

在蚂蚁金服风控体系里面,有一个重要的环节就是离线仿真,在规则,模型上线之前,在离线的环境里面进行仿真验证,来对规则和模型进行效能的评估,避免人为因素造成不准确性从而造成的资损。起初为了达到这个目的,离线计算平台就这样孕育而生了,慢慢地整个离线平台覆盖了更多风控的业务,也慢慢变成目前Odps-Spark最大的用户,拥有的集群数目也是最大的。离线计算平台主要以Spark为基础,在其上建立起来的一套平台. 后面我们团队会给大家带来一系列,关于离线平台的架构以及我们做过相应业务以及经验,希望和大家一起来探讨。
下面由我来给大家分享下,我们整个团队建立起离线计算平台里面的SparkContext管理以及几个Spark优化手段。

SparkContext 管理

在我们的离线业务场景里面,我们需要持续地接受用户提交实验任务进行分析以及运算,所以对于Spark的要求第一点就是需要Long-Live的SparkContext, 可以持续接受任务提交,而不是类似d2那样提交一次性的任务。所以我们以目前Odps-Spark Client模式为基础,建立了一套自己的SparkContext 管理系统,提供了动态新增,删除SparkContext功能,可以持续地提交任务,调度相应任务的功能。

Spark1

由于SparkContext需要一个独立的JVM进程服务,我们目前是利用RMI来完成启动SparkContext的工作。 在应用服务器上面,当它被通知自己需要启动SparkContext的时候,就会在进行相应的Spark Jar准备,然后进行打包,并且启动相应的RMI进程来当做SparkContext. 在任务执行的流程中,前端系统会向离线系统提交任务,离线系统就有一套自己的任务调度系统,根据目前管理的SparkContext里面选择一个相对比较空闲的,进行任务提交,否则就进行等待。当任务提交到RMI Server之后,就会对任务进行相应的组装以及转化成Spark可以执行的任务,进行执行并且同Spark Cluster进行交互。最终Spark任务结果会同步地返回给RMI Server, 然后再通过RMI Server异步的通知到应用服务器,并且进行任务的进一步处理。

经验分享:控制好每个SparkContext的并发度,充分利用SparkContext的能力(目前我们一个SparkContext上面并发3个任务,后续会引入动态任务大小的评估,从而实现真正的动态任务提交以及资源分配),统一管理每个SparkContext下面的并发线程,而且不是每个任务都去创建线程池,特别注意从RMI Server返回给离线系统数据,不能带有Scala相关的东西,离线系统无法解析。优雅地停掉SparkContext, 防止资源的泄露。


Spark程序调优手段

对于风控来说,查看用户历史数据来进行判断风险的行为是一种常见的手段,所以动不动就30天的交易记录,90的某相累计数据。。。 做离线的小伙伴已经哭晕在厕所,在做Spark离线的期间,我们也产出了自己的东西,这里我就介绍三种比较有意义的内容,后续欢迎大家来交流。

  • 动态加载Jar
    由于安全性的问题,目前Odps-Spark并没有实现动态加载Jar的功能,那如果真有这部分需求的时候,我们应该怎么做呢?例如有些规则脚本在我们实验的时候进行了更新,离线也需要同步更新,可是这里SparkContext已经创建出来,没有办法再新增Jar了。目前我们的解决方案是,在执行任务的时候,把这些Jar打好包,并且创建出一个ClassLoader来加载这些可变的Jar, 然后在实验的时候,把Classloader 通过broadcast机制分发到各个worker上面,然后再需要利用这些Jar的闭包里面,通过这个classloader进行加载并且执行。这里需要注意的问题是: 多线程的问题, 因为broadcast value每台worker只有一份,如果一个台worker上面有多个cpu去访问broadcast value的时候,如果没有控制好多线程的问题,就会出现一些奇葩问题,而且这种问题不好排查,因为已经在worker上面,不好去打日志进行验证。所以在处理Broadcast value的时候,尽量做到线程安全。
  • 减少shuffle key的数目
    在Spark开发中,基本上都会用到join, cogroup等操作,这类操作就会产生shuffle操作, Spark程序的性能很大程度就是取决于shuffle的性能上面,除了调优修改shuffle的参数(spark.shuffle.memoryFraction, spark.shuffle.manager, spark.shuffle.file.buffer等), 也可以利用其它手段来完成

    1. map side join, 特别是在两个RDD, 一个数据量小, 一个数据量大的情况,我们可以把数据量小的那个做成broadcast, 从而把这次shuffle操作转变成一次map操作,大大地减少shuffle中的性能消耗。
    2. 利用Bloom Filter过滤掉多余的key, 这也是我们在实践发现的,当两个RDD进行cogroup的时候,其实会有很多无用的key, 例如用userId进行关联的时候,会很多无用的userId进行干扰,但是他们也参与整个shuffle的流程中,这时候我们可以把key数目相对比较少的那个RDD的key收集来(可能会有多次collect操作,因为每次collect操作有大小限制),然后把这些做成Bloom Filter去过滤另外一个RDD里面的key, 从而达到减少shuffle中间的数据量的效果。
  • 数据倾斜的解决
    当你在logview上面发现你的2400分区的数据,2399都跑完,另外一个分区怎么跑都跑不完,并且执行时间已经远远超越同伴啦。那么恭喜你,很有可能是数据倾斜发生了。目前介绍下我们的做法

    • 首先是去采集哪些key会出现数据倾斜,这里可以使用groupbykey,然后进行count, 如果这样都会挂掉,那么进行sample抽样来解决,随机抽样10%的数据来进行判断。
    • 找到这样的key之后, 在RDD A里面过滤掉热点对应的key, 形成nonHot的RDD A, 然后针对有HotKey的RDD, 里面的每一个key打上 n 以内的随机数作为后缀。

        hasHotEvent.map(x =>   
      if (hotKeySet.value.contains(x.1)) {  
      (x.1 + "@" + Random.nextInt(hashPartitionSize), x.2)  
       else   
      x  
        
      })
    • 在RDD B里面首先同样先过滤掉热点的key, 形成nonHot的RDD B, 然后针对HotKey的RDD, 里面每一个key按顺序附加 0 - n 的后缀,每条hotKey的数据就会膨胀成n条数据

    1. hotValues = res.filter(x => hotKeySet.value.contains(x.1)).flatMap(x =>
      for (i <- 0.until(hashPartitionSize)) yield {

      (x.1 + "@" + i, x.2)  

      }

      })
    • 最后notHotKey的RDD 进行join, hotkey的RDD进行join, 最后再进行union操作就可以等到最后的结果。

经验总结:主要思路就是把少数的key,打散成n份去进行join,这样就不会集中大量的数据在一个worker中,并且把它高挂。如果针对RDD里面很多的key都是hot key的情况,就无需过滤hot key, 直接给一个RDD打上随机数,另外一个RDD扩容n份进行join.


To Be Continued

还有很多细节以及经验想跟大家分享,奈何实力有限,后续的分享会持续到来,大家会看到变态数据点优化,velocity数据结构优化,会针对新版的内存计算平台进行相应的改造以及相关的技术落地。 以上都是来自 蚂蚁金服-大安全-成都安全技术平台的小伙伴们。顺便打个小广告,我们在大量招人,希望回成都的同学,以及想做离线这块的同学(当然我们也很多实时的业务),可以联系@锋扬,@云震。

目录
相关文章
|
5月前
|
消息中间件 存储 NoSQL
离线与实时数据开发方案
离线与实时数据开发方案
61 0
|
存储 JSON 固态存储
【离线】esrally实践总结
1.真正的离线安装esrally 2.术语介绍,官方数据集、track介绍 3.官方数据集下载 4.离线使用esrally测试现有ES测试集群 5.对比两次race(测试)的结果 6.测试时间太长怎么办? 7.报告分析
2806 2
【离线】esrally实践总结
|
8月前
|
存储 JSON 自然语言处理
【ODPS新品发布第2期】实时数仓Hologres:推出计算组实例/支持JSON数据/向量计算+大模型等新能力
本期将重点介绍Hologres推出计算组实例,Hologres支持JSON数据 ,Hologres向量计算+大模型能力,Hologres数据同步新能力,Hologres数据分层存储
|
8月前
|
canal SQL 弹性计算
实时数据及离线数据上云方案
本实验通过使用CANAL、DataHub、DataWorks、MaxCompute服务,实现数据上云,解决了数据孤岛问题,同时把数据迁移到云计算平台,对后续数据的计算和应用提供了第一步开山之路。
161 0
|
SQL 存储 运维
如何选择数据集成方式-离线&实时
如何在项目交付过程中合理经济的选择数据集成方案
948 1
如何选择数据集成方式-离线&实时
|
人工智能 文字识别 API
FastDeploy完成实时扣图模型部署
FastDeploy完成实时扣图模型部署
501 0
FastDeploy完成实时扣图模型部署
|
边缘计算 Kubernetes 物联网
k3s 离线部署指南
k3s 离线部署指南
|
机器学习/深度学习 人工智能 分布式计算
离线实时一体化新能力解读| 学习笔记
快速学习离线实时一体化新能力解读
355 0
离线实时一体化新能力解读| 学习笔记
|
数据采集 消息中间件 数据可视化
离线计算-本地数据注册成表|学习笔记
快速学习离线计算-本地数据注册成表
105 0
离线计算-本地数据注册成表|学习笔记
EMQ
|
SQL 存储 运维
流批结合计算以及更多原生分析能力支持
十月,eKuiper发布了1.7.0版本:引入了查询表和可更新Sink的概念,支持数据流与外部存储的数据一起计算,进一步完善了流批结合的实时计算能力。
EMQ
653 0
流批结合计算以及更多原生分析能力支持