In Spark, I want to compute, for each value, how many values are less than or equal to it. I tried to achieve this with rank, but rank produces [1,2,2,2,3,4] -> [1,2,2,2,5,6], whereas what I want is [1,2,2,2,3,4] -> [1,4,4,4,5,6].
I can accomplish this by ranking, grouping by rank, and then adjusting each rank value by the number of items in its group, but this is somewhat inefficient. Is there a better way?
Edit: added a minimal example of what I am trying to accomplish
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.rank
import org.apache.spark.sql.expressions.Window

object Question extends App {
  val spark = SparkSession.builder.appName("Question").master("local[*]").getOrCreate()
  import spark.implicits._

  val win = Window.orderBy($"nums".asc)

  Seq(1, 2, 2, 2, 3, 4)
    .toDF("nums")
    .select($"nums", rank.over(win).alias("rank"))
    .as[(Int, Int)]
    .groupByKey(_._2)                                         // group rows by their rank
    .mapGroups((rank, rows) => (rank, rows.toList.map(_._1))) // collect each group's values
    .map(x => (x._1 + x._2.length - 1, x._2))                 // shift rank up by (group size - 1)
    .flatMap(x => x._2.map(num => (num, x._1)))               // back to one row per value
    .toDF("nums", "rank")
    .show(false)
}
Output:
nums | rank
---|---
1 | 1
2 | 4
2 | 4
2 | 4
3 | 5
4 | 6
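To restate the goal: the desired "rank" of each element is just the number of elements less than or equal to it. A quick plain-Scala sanity check of the intended semantics, independent of Spark:

val xs = Seq(1, 2, 2, 2, 3, 4)
xs.map(x => xs.count(_ <= x))  // List(1, 4, 4, 4, 5, 6)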
Using window functions:
scala> val df = Seq(1, 2, 2, 2, 3, 4).toDF("nums")
df: org.apache.spark.sql.DataFrame = [nums: int]
scala> df.createOrReplaceTempView("tbl")
scala> spark.sql(" with tab1(select nums, rank() over(order by nums) rk, count(*) over(partition by nums) cn from tbl) select nums, rk+cn-1 as rk2 from tab1 ").show(false)
18/11/28 02:20:55 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

nums | rk2
---|---
1 | 1
2 | 4
2 | 4
2 | 4
3 | 5
4 | 6
scala>
Note that df is not partitioned on any column, so Spark moves all the data to a single partition.
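Not part of the original answer: a sketch of the same rk + cn - 1 trick expressed with the DataFrame API instead of SQL, assuming the same df as above.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, count, rank}

// rank() needs a global ordering; count("*") per distinct value counts the ties.
val rankWin  = Window.orderBy(col("nums"))
val countWin = Window.partitionBy(col("nums"))

df.select(
    col("nums"),
    (rank().over(rankWin) + count("*").over(countWin) - 1).alias("rk2")
  )
  .show(false)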
EDIT1:
scala> spark.sql(" select nums, rank() over(order by nums) + count(*) over(partition by nums) -1 as rk2 from tbl ").show
18/11/28 23:20:09 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

nums | rk2
---|---
1 | 1
2 | 4
2 | 4
2 | 4
3 | 5
4 | 6
scala>
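As an aside (not part of the original answer): since an ordered window with no explicit frame defaults to RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, which includes peer rows with equal values, a plain count(*) over the ordered window should already produce the desired "number of values less than or equal to the current value":

// A sketch, assuming the same tbl view as above. The default RANGE frame
// counts all rows ordered at or before the current value, ties included.
spark.sql("select nums, count(*) over(order by nums) as rk2 from tbl").show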