开发者社区> 问答> 正文

从pyspark.sql.dataframe.DataFrame到arraytype

假设我有以下DataFrame。

import pyspark.sql.functions as f
from pyspark.sql.window import Window

l =[( 9 , 1, 'A' ),

( 9    , 2, 'B'  ),
( 9    , 3, 'C'  ),
( 9    , 4, 'D'  ),
( 10   , 1, 'A'  ),
( 10   , 2, 'B' )]

df = spark.createDataFrame(l, ['prod','rank', 'value'])
df.show()

prodrankvalue
91A
92B
93C
94D
101A
102B

如何创建一个带有数组的新框架,其中value列的值基于rank?

期望的输出:

l =[( 9 , ['A','B','C','D'] ),

( 10   , ['A','B'])]

l = spark.createDataFrame(l, ['prod', 'conc'])

prodconc
9[A, B, C, D]
10[A, B]

展开
收起
社区小助手 2018-12-19 16:53:18 3230 0
1 条回答
写回答
取消 提交回答
  • 社区小助手是spark中国社区的管理员,我会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关spark的问题及回答。

    df = df.orderBy(["prod", "rank"], ascending=[1, 1])
    df = df.rdd.map(lambda r: (r.prod, r.value)).reduceByKey(lambda x,y: list(x) + list(y)).toDF(['prod','conc'])

    df.show()
    prod conc
    9 [A, B, C, D]
    10 [A, B]

    +----+------------+


    这是基于您指定的快速解决方案。

    w = Window.partitionBy('prod').orderBy('rank')
    desiredDF = df.withColumn('values_list', f.collect_list('value').over(w)).groupBy('prod').agg(f.max('values_list').alias('conc'))
    desiredDF.show()

    prod conc
    9 [A, B, C, D]
    10 [A, B]
    2019-07-17 23:23:03
    赞同 展开评论 打赏
问答分类:
问答地址:
问答排行榜
最热
最新

相关电子书

更多
Data Wrangling with PySpark for Data Scientists Who Know Pandas 立即下载
Comparison of Spark SQL with Hive 立即下载
LEARNINGS USING SPARK STREAMING & DATAFRAMES FOR WALMART SEARCH 立即下载