我正在使用Spark 1.60和Scala 2.10.5
我有这样的数据帧,
id | needed |
---|---|
1 | 2 |
1 | 0 |
1 | 3 |
2 | 0 |
2 | 0 |
3 | 1 |
3 | 2 |
从这个df我创建了rdd这样的,
val dfRDD = df.rdd
从我rdd,我想分组id和计数needed是> 0。
((1,2),(2,0),(3,2))
所以,我试过这样的,
val groupedDF = dfRDD.map(x =>(x(0), x(1) > 0)).count.redueByKey(_+_)
在这种情况下,我收到一个错误:
错误:值>不是任何数据
问题是,在你map调用Row的apply 方法时,正如你在其scaladoc中看到的那样,该方法返回Any - 正如你可以看到的那样,错误和scaladoc中没有这样的方法在Any中 <
您可以使用该getAs[T]方法修复它。
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
val spark =
SparkSession
.builder
.master("local[*]")
.getOrCreate()
import spark.implicits._
val df =
List(
(1, 2),
(1, 0),
(1, 3),
(2, 0),
(2, 0),
(3, 1),
(3, 2)
).toDF("id", "needed")
val rdd: RDD[(Int, Int)] = df.rdd.map(row => (row.getAsInt, row.getAsInt))
从那里你可以继续聚合,你的逻辑中有一些错误。
首先,您不需要count通话。
第二,如果你需要计算的次数"needed"大于你不能做的次数_ + _,因为这是所需值的总和。
val grouped: RDD[(Int, Int)] = rdd.reduceByKey { (acc, v) => if (v > 0) acc + 1 else acc }
val result: Array[(Int, Int)] = grouped.collect()
// Array((1,3), (2,0), (3,2))
PS:你应该告诉你的教授升级到Spark 2和Scala 2.11;)
编辑
在上面的例子中使用case类。
final case class Data(id: Int, needed: Int)
val rdd: RDD[Data] = df.as[Data].rdd
val grouped: RDD[(Int, Int)] = rdd.map(d => d.id -> d.needed).reduceByKey { (acc, v) => if (v > 0) acc + 1 else acc }
val result: Array[(Int, Int)] = grouped.collect()
// Array((1,3), (2,0), (3,2))
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。