Spark Scala:如何同时过滤RDD和更新计数器-问答-阿里云开发者社区-阿里云

开发者社区> 问答> 正文

Spark Scala:如何同时过滤RDD和更新计数器

社区小助手 2018-12-19 16:46:33 1270

我的初始RDD是记录类型,记录的布局是:

a_key, b_key,c_key,f_name,l_name,address
现在我必须:

删除具有a_key或b_key或c_key为空/空的记录
我必须同时更新无效记录的计数器。
我这样试过:

sc.register( recordStatsAccumulator, "Stat accumulator for " + filename )

val nullFilteredRecords = records.map{ record =>

if( record.A_KEY.isEmpty ||

record.B_KEY.isEmpty ||
record.C_KEY.isEmpty )

{

recordStatsAccumulator.add( ValidationLoggingUtil.INVALID )

}

record

}
.filter( record =>

!record.A_KEY.isEmpty &&
  !record.B_KEY.isEmpty &&
  !record.C_KEY.isEmpty

)
但是,这段代码效率不高,因为整个RDD都是两次。首先,更新无效记录的计数器,然后再次删除无效记录。

有没有更好/更有效的方法来做到这一点?

分布式计算 Scala Spark
分享到
取消 提交回答
全部回答(1)
  • 社区小助手
    2019-07-17 23:23:02

    我想你可以一步完成这两项操作。像这样:

    val nullFilteredRecords = records.filter { record =>
    if( record.A_KEY.isEmpty ||

    record.B_KEY.isEmpty ||
    record.C_KEY.isEmpty ) {
    recordStatsAccumulator.add( ValidationLoggingUtil.INVALID )

    }
    !record.A_KEY.isEmpty && !record.B_KEY.isEmpty && !record.C_KEY.isEmpty
    }

    0 0
大数据
使用钉钉扫一扫加入圈子
+ 订阅

大数据计算实践乐园,近距离学习前沿技术

推荐文章
相似问题