离线计算平台简介
在蚂蚁金服风控体系里面,有一个重要的环节就是离线仿真,在规则,模型上线之前,在离线的环境里面进行仿真验证,来对规则和模型进行效能的评估,避免人为因素造成不准确性从而造成的资损。起初为了达到这个目的,离线计算平台就这样孕育而生了,慢慢地整个离线平台覆盖了更多风控的业务,也慢慢变成目前Odps-Spark最大的用户,拥有的集群数目也是最大的。离线计算平台主要以Spark为基础,在其上建立起来的一套平台. 后面我们团队会给大家带来一系列,关于离线平台的架构以及我们做过相应业务以及经验,希望和大家一起来探讨。
下面由我来给大家分享下,我们整个团队建立起离线计算平台里面的SparkContext管理以及几个Spark优化手段。
SparkContext 管理
在我们的离线业务场景里面,我们需要持续地接受用户提交实验任务进行分析以及运算,所以对于Spark的要求第一点就是需要Long-Live的SparkContext, 可以持续接受任务提交,而不是类似d2那样提交一次性的任务。所以我们以目前Odps-Spark Client模式为基础,建立了一套自己的SparkContext 管理系统,提供了动态新增,删除SparkContext功能,可以持续地提交任务,调度相应任务的功能。
由于SparkContext需要一个独立的JVM进程服务,我们目前是利用RMI来完成启动SparkContext的工作。 在应用服务器上面,当它被通知自己需要启动SparkContext的时候,就会在进行相应的Spark Jar准备,然后进行打包,并且启动相应的RMI进程来当做SparkContext. 在任务执行的流程中,前端系统会向离线系统提交任务,离线系统就有一套自己的任务调度系统,根据目前管理的SparkContext里面选择一个相对比较空闲的,进行任务提交,否则就进行等待。当任务提交到RMI Server之后,就会对任务进行相应的组装以及转化成Spark可以执行的任务,进行执行并且同Spark Cluster进行交互。最终Spark任务结果会同步地返回给RMI Server, 然后再通过RMI Server异步的通知到应用服务器,并且进行任务的进一步处理。
经验分享:控制好每个SparkContext的并发度,充分利用SparkContext的能力(目前我们一个SparkContext上面并发3个任务,后续会引入动态任务大小的评估,从而实现真正的动态任务提交以及资源分配),统一管理每个SparkContext下面的并发线程,而且不是每个任务都去创建线程池,特别注意从RMI Server返回给离线系统数据,不能带有Scala相关的东西,离线系统无法解析。优雅地停掉SparkContext, 防止资源的泄露。
Spark程序调优手段
对于风控来说,查看用户历史数据来进行判断风险的行为是一种常见的手段,所以动不动就30天的交易记录,90的某相累计数据。。。 做离线的小伙伴已经哭晕在厕所,在做Spark离线的期间,我们也产出了自己的东西,这里我就介绍三种比较有意义的内容,后续欢迎大家来交流。
- 动态加载Jar
由于安全性的问题,目前Odps-Spark并没有实现动态加载Jar的功能,那如果真有这部分需求的时候,我们应该怎么做呢?例如有些规则脚本在我们实验的时候进行了更新,离线也需要同步更新,可是这里SparkContext已经创建出来,没有办法再新增Jar了。目前我们的解决方案是,在执行任务的时候,把这些Jar打好包,并且创建出一个ClassLoader来加载这些可变的Jar, 然后在实验的时候,把Classloader 通过broadcast机制分发到各个worker上面,然后再需要利用这些Jar的闭包里面,通过这个classloader进行加载并且执行。这里需要注意的问题是: 多线程的问题, 因为broadcast value每台worker只有一份,如果一个台worker上面有多个cpu去访问broadcast value的时候,如果没有控制好多线程的问题,就会出现一些奇葩问题,而且这种问题不好排查,因为已经在worker上面,不好去打日志进行验证。所以在处理Broadcast value的时候,尽量做到线程安全。 减少shuffle key的数目
在Spark开发中,基本上都会用到join, cogroup等操作,这类操作就会产生shuffle操作, Spark程序的性能很大程度就是取决于shuffle的性能上面,除了调优修改shuffle的参数(spark.shuffle.memoryFraction, spark.shuffle.manager, spark.shuffle.file.buffer等), 也可以利用其它手段来完成- map side join, 特别是在两个RDD, 一个数据量小, 一个数据量大的情况,我们可以把数据量小的那个做成broadcast, 从而把这次shuffle操作转变成一次map操作,大大地减少shuffle中的性能消耗。
- 利用Bloom Filter过滤掉多余的key, 这也是我们在实践发现的,当两个RDD进行cogroup的时候,其实会有很多无用的key, 例如用userId进行关联的时候,会很多无用的userId进行干扰,但是他们也参与整个shuffle的流程中,这时候我们可以把key数目相对比较少的那个RDD的key收集来(可能会有多次collect操作,因为每次collect操作有大小限制),然后把这些做成Bloom Filter去过滤另外一个RDD里面的key, 从而达到减少shuffle中间的数据量的效果。
数据倾斜的解决
当你在logview上面发现你的2400分区的数据,2399都跑完,另外一个分区怎么跑都跑不完,并且执行时间已经远远超越同伴啦。那么恭喜你,很有可能是数据倾斜发生了。目前介绍下我们的做法- 首先是去采集哪些key会出现数据倾斜,这里可以使用groupbykey,然后进行count, 如果这样都会挂掉,那么进行sample抽样来解决,随机抽样10%的数据来进行判断。
找到这样的key之后, 在RDD A里面过滤掉热点对应的key, 形成nonHot的RDD A, 然后针对有HotKey的RDD, 里面的每一个key打上 n 以内的随机数作为后缀。
hasHotEvent.map(x => if (hotKeySet.value.contains(x.1)) { (x.1 + "@" + Random.nextInt(hashPartitionSize), x.2) else x })
在RDD B里面首先同样先过滤掉热点的key, 形成nonHot的RDD B, 然后针对HotKey的RDD, 里面每一个key按顺序附加 0 - n 的后缀,每条hotKey的数据就会膨胀成n条数据
hotValues = res.filter(x => hotKeySet.value.contains(x.1)).flatMap(x =>
for (i <- 0.until(hashPartitionSize)) yield {(x.1 + "@" + i, x.2)
}
})
- 最后notHotKey的RDD 进行join, hotkey的RDD进行join, 最后再进行union操作就可以等到最后的结果。
经验总结:主要思路就是把少数的key,打散成n份去进行join,这样就不会集中大量的数据在一个worker中,并且把它高挂。如果针对RDD里面很多的key都是hot key的情况,就无需过滤hot key, 直接给一个RDD打上随机数,另外一个RDD扩容n份进行join.
To Be Continued
还有很多细节以及经验想跟大家分享,奈何实力有限,后续的分享会持续到来,大家会看到变态数据点优化,velocity数据结构优化,会针对新版的内存计算平台进行相应的改造以及相关的技术落地。 以上都是来自 蚂蚁金服-大安全-成都安全技术平台的小伙伴们。顺便打个小广告,我们在大量招人,希望回成都的同学,以及想做离线这块的同学(当然我们也很多实时的业务),可以联系@锋扬,@云震。