我在spark中有一个DataFrame,如下所示:
0 | true
1 | true
2 | false
3 | true
4 | true
5 | true
6 | false
7 | false
8 | true
9 | false
我希望得到另一个列,如果它有当前rowNumber flag == false,或者是下一个false值的rowNumber,那么输出将是这样的:
0 | true | 2
1 | true | 2
2 | false | 2
3 | true | 6
4 | true | 6
5 | true | 6
6 | false | 6
7 | false | 7
8 | true | 9
9 | false | 9
我想以矢量化的方式(不是按行迭代)来做这件事。所以我有效地想要逻辑:
对于每一行,获取min id大于或等于当前rowNum,其中有一个标志== false
我认为你的意思是你想循环数据框并有一个带退出的子循环。我找不到这样的例子,事实上我不确定它是否符合SPARK范式。获得相同的结果,处理更少:
import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.sql.expressions.Window
val df = Seq((0, true), (1, true), (2,false), (3, true), (4,true), (5,true), (6,false), (7,false), (8,true), (9,false)).toDF("id","flag")
@transient val w1 = org.apache.spark.sql.expressions.Window.orderBy("id1")
val ids = df.where("flag = false")
.select($"id".as("id1"))
val ids2 = ids.select($"*", lag("id1",1,-1).over(w1).alias("prev_id"))
val ids3 = ids2.withColumn("prev_id1", col("prev_id")+1).drop("prev_id")
// Less and better performance at scale, this is better theoretically for Catalyst to bound partitions? Less work to do in any event.
// Some understanding of data required! And no grouping and min.
val withNextFalse = df.join(ids3, df("id") >= ids3("prev_id1") && df("id") <= ids3("id1"))
.select($"id", $"flag", $"id1".alias("nextOrCurrentFalse"))
.orderBy(asc("id"),asc("id"))
withNextFalse.show(false)
返回:
id | flag | nextOrCurrentFalse |
---|---|---|
0 | true | 2 |
1 | true | 2 |
2 | false | 2 |
3 | true | 6 |
4 | true | 6 |
5 | true | 6 |
6 | false | 6 |
7 | false | 7 |
8 | true | 9 |
9 | false | 9 |
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。