我有一个“容量”数据帧:
scala> sql("create table capacity (id String, capacity Int)");
scala> sql("insert into capacity values ('A', 50), ('B', 100)");
scala> sql("select * from capacity").show(false)
id | capacity |
---|---|
A | 50 |
B | 100 |
我有另一个“使用过的”数据框,其中包含以下信息:
scala> sql ("create table used (id String, capacityId String, used Int)");
scala> sql ("insert into used values ('item1', 'A', 10), ('item2', 'A', 20), ('item3', 'A', 10), ('item4', 'B', 30), ('item5', 'B', 40), ('item6', 'B', 40)")
scala> sql("select * from used order by capacityId").show(false)
id | capacityId | used |
---|---|---|
item1 | A | 10 |
item3 | A | 10 |
item2 | A | 20 |
item6 | B | 40 |
item4 | B | 30 |
item5 | B | 40 |
“used”数据帧的列“capacityId”是“capacity”数据帧的列“id”的外键。我想计算“capacityLeft”列,它是该时间点的剩余量。
id | capacityId | used | capacityLeft | |
---|---|---|---|---|
item1 | A | 10 | 40 | <- 50(capacity of 'A')-10 |
item3 | A | 10 | 30 | <- 40-10 |
item2 | A | 20 | 10 | <- 30-20 |
item6 | B | 40 | 60 | <- 100(capacity of 'B')-40 |
item4 | B | 30 | 30 | <- 60-30 |
item5 | B | 40 | -10 | <- 30-40 |
在实际的Senario中,“createdDate”列用于排序“used”数据帧列。
Spark版本:2.2
这可以通过在Spark中使用窗口函数来解决。请注意,要使其工作,需要存在一个跟踪每个行的行顺序的列capacityId。
首先将两个数据框连接在一起:
val df = used.join(capacity.withColumnRenamed("id", "capacityId"), Seq("capacityId"), "inner")
这里capacity重命名数据框中的id以匹配数据框中的id名称,used以便不保留重复的列。
现在创建一个窗口并计算使用过的列的cumsum。取值capacity并减去cumsum以获得剩余金额:
val w = Window.partitionBy("capacityId").orderBy("createdDate")
val df2 = df.withColumn("capacityLeft", $"capacity" - sum($"used").over(w))
结果数据框与示例createdDate列:
capacityId | id | used | createdDate | capacity | capacityLeft |
---|---|---|---|---|---|
B | item6 | 40 | 1 | 100 | 60 |
B | item4 | 30 | 2 | 100 | 30 |
B | item5 | 40 | 3 | 100 | -10 |
A | item1 | 10 | 1 | 50 | 40 |
A | item3 | 10 | 2 | 50 | 30 |
A | item2 | 20 | 3 | 50 | 10 |
现在可以删除任何不需要的列drop。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。