Suppose I'm trying to compute some statistics on sample data consisting of pairs (values of a and b). Some pairs occur multiple times, others not at all.
from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.getOrCreate()

spark.createDataFrame([
    Row(a=5, b=10), Row(a=5, b=10), Row(a=5, b=10),
    Row(a=6, b=10), Row(a=6, b=10), Row(a=6, b=10), Row(a=6, b=10), Row(a=6, b=10), Row(a=6, b=10),
    Row(a=5, b=11), Row(a=5, b=11),
    Row(a=6, b=12), Row(a=6, b=12), Row(a=6, b=12), Row(a=6, b=12),
    Row(a=5, b=5), Row(a=5, b=5), Row(a=5, b=5), Row(a=5, b=5), Row(a=5, b=5), Row(a=5, b=5), Row(a=5, b=5),
]).registerTempTable('mydata')
First, I simply count how often each pair occurs:
spark.sql('''
SELECT a, b,
COUNT(*) as count
FROM mydata AS o
GROUP BY a, b
''').show()
Output:
a | b | count
---|---|---
6 | 12 | 4
5 | 5 | 7
6 | 10 | 6
5 | 10 | 3
5 | 11 | 2
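(As an aside, the same aggregation can also be written with the DataFrame API instead of SQL. A minimal sketch, assuming the mydata temp table registered above; this is not part of the original question:)

# Fetch the temp table back as a DataFrame and aggregate;
# groupBy(...).count() produces a column named 'count' as well.
spark.table('mydata').groupBy('a', 'b').count().show()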
Now I want to add an extra column containing the percentage: how often a pair occurs relative to the total number of pairs with the same a value. For that, I tried adding a correlated subquery that computes the total:
spark.sql('''
SELECT a, b,
COUNT(*) as count,
(COUNT(*) / (
SELECT COUNT(*) FROM mydata AS i WHERE o.a = i.a
)) as percentage
FROM mydata AS o
GROUP BY a, b
''').show()
What I expected:
a | b | count | percentage |
---|---|---|---|---
6 | 12 | 4 | 0.4 | --> 10 pairs exist with a=6 --> 4/10 = 0.4
5 | 5 | 7 | 0.5833 | --> 12 pairs exist with a=5 --> 7/12 = 0.5833
6 | 10 | 6 | 0.6 | --> ...
5 | 10 | 3 | 0.25 |
5 | 11 | 2 | 0.1666 |
What I got instead:
py4j.protocol.Py4JJavaError: An error occurred while calling o371.showString.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: count(1)#382L
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:91)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:90)
[...]
Caused by: java.lang.RuntimeException: Couldn't find count(1)#382L in [a#305L,b#306L,count(1)#379L]
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:97)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:91)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
... 80 more
This sounds a bit confusing: somehow pyspark seems to be trying to bind the count of the inner query?
Is something wrong with my subquery syntax?
Starting from your first table, you can compute the percentage with a window function: sum(count) over (partition by a) sums up count per value of a without reducing the number of result rows, which lets you divide by it directly in another column:
spark.sql('''
SELECT a, b,
COUNT(*) as count
FROM mydata AS o
GROUP BY a, b
''').registerTempTable('count')
spark.sql('''
SELECT *,
count / sum(count) over (partition by a) as percentage
FROM count
''').show()
a | b | count | percentage
---|---|---|---
6 | 12 | 4 | 0.4
6 | 10 | 6 | 0.6
5 | 5 | 7 | 0.5833333333333334
5 | 10 | 3 | 0.25
5 | 11 | 2 | 0.16666666666666666
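(For completeness, the same two-step computation can be collapsed into a single pass with the DataFrame API's window functions. A minimal sketch, assuming the mydata temp table from the question; this is an equivalent formulation, not the answer's original code:)

from pyspark.sql import Window
from pyspark.sql import functions as F

# Count each (a, b) pair, then divide by the per-a total computed over
# a window partitioned by a; no intermediate temp table is needed.
counts = spark.table('mydata').groupBy('a', 'b').count()
w = Window.partitionBy('a')
counts.withColumn('percentage', F.col('count') / F.sum('count').over(w)).show()

(An uncorrelated join against a pre-aggregated per-a total would also work; the window function simply avoids the extra self-join.)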