开发者社区> 问答> 正文

计算dataframe列中的剩余金额

我有一个“容量”数据帧:

scala> sql("create table capacity (id String, capacity Int)");
scala> sql("insert into capacity values ('A', 50), ('B', 100)");
scala> sql("select * from capacity").show(false)

id capacity
A 50
B 100

我有另一个“使用过的”数据框,其中包含以下信息:

scala> sql ("create table used (id String, capacityId String, used Int)");
scala> sql ("insert into used values ('item1', 'A', 10), ('item2', 'A', 20), ('item3', 'A', 10), ('item4', 'B', 30), ('item5', 'B', 40), ('item6', 'B', 40)")
scala> sql("select * from used order by capacityId").show(false)

id capacityId used
item1 A 10
item3 A 10
item2 A 20
item6 B 40
item4 B 30
item5 B 40

“used”数据帧的列“capacityId”是“capacity”数据帧的列“id”的外键。我想计算“capacityLeft”列,它是该时间点的剩余量。

id capacityId used capacityLeft
item1 A 10 40 <- 50(capacity of 'A')-10
item3 A 10 30 <- 40-10
item2 A 20 10 <- 30-20
item6 B 40 60 <- 100(capacity of 'B')-40
item4 B 30 30 <- 60-30
item5 B 40 -10 <- 30-40

在实际的Senario中,“createdDate”列用于排序“used”数据帧列。

Spark版本:2.2

展开
收起
社区小助手 2018-12-12 11:20:12 2020 0
1 条回答
写回答
取消 提交回答
  • 社区小助手是spark中国社区的管理员,我会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关spark的问题及回答。

    这可以通过在Spark中使用窗口函数来解决。请注意,要使其工作,需要存在一个跟踪每个行的行顺序的列capacityId。

    首先将两个数据框连接在一起:

    val df = used.join(capacity.withColumnRenamed("id", "capacityId"), Seq("capacityId"), "inner")
    这里capacity重命名数据框中的id以匹配数据框中的id名称,used以便不保留重复的列。

    现在创建一个窗口并计算使用过的列的cumsum。取值capacity并减去cumsum以获得剩余金额:

    val w = Window.partitionBy("capacityId").orderBy("createdDate")
    val df2 = df.withColumn("capacityLeft", $"capacity" - sum($"used").over(w))
    结果数据框与示例createdDate列:

    capacityIdidusedcreatedDatecapacitycapacityLeft
    Bitem640110060
    Bitem430210030
    Bitem5403100-10
    Aitem11015040
    Aitem31025030
    Aitem22035010

    现在可以删除任何不需要的列drop。

    2019-07-17 23:20:03
    赞同 展开评论 打赏
问答分类:
问答地址:
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载