在PySpark中,重分区模块有一个可选的列参数,该参数会通过该键重分区数据框架。
我的问题是 - 当没有密钥时,Spark如何重新分配?我无法深入研究源代码,以找到Spark本身的位置。
def repartition(self, numPartitions, *cols):
"""
Returns a new :class:`DataFrame` partitioned by the given partitioning expressions. The
resulting DataFrame is hash partitioned.
:param numPartitions:
can be an int to specify the target number of partitions or a Column.
If it is a Column, it will be used as the first partitioning column. If not specified,
the default number of partitions is used.
.. versionchanged:: 1.6
Added optional arguments to specify the partitioning columns. Also made numPartitions
optional if partitioning columns are specified.
>>> df.repartition(10).rdd.getNumPartitions()
10
>>> data = df.union(df).repartition("age")
>>> data.show()
+---+-----+
|age| name|
+---+-----+
| 5| Bob|
| 5| Bob|
| 2|Alice|
| 2|Alice|
+---+-----+
>>> data = data.repartition(7, "age")
>>> data.show()
+---+-----+
|age| name|
+---+-----+
| 2|Alice|
| 5| Bob|
| 2|Alice|
| 5| Bob|
+---+-----+
>>> data.rdd.getNumPartitions()
7
"""
if isinstance(numPartitions, int):
if len(cols) == 0:
return DataFrame(self._jdf.repartition(numPartitions), self.sql_ctx)
else:
return DataFrame(
self._jdf.repartition(numPartitions, self._jcols(*cols)), self.sql_ctx)
elif isinstance(numPartitions, (basestring, Column)):
cols = (numPartitions, ) + cols
return DataFrame(self._jdf.repartition(self._jcols(*cols)), self.sql_ctx)
else:
raise TypeError("numPartitions should be an int or Column")
例如:调用这些行完全没问题,但我不知道它的作用。它是整条线的哈希值吗?或许数据框中的第一栏?
df_2 = df_1\
.where(sf.col('some_column') == 1)\
.repartition(32)\
.alias('df_2')
默认情况下,如果没有指定分区,则分区不是基于数据的特征,而是以跨节点的随机和统一方式分布。
后面的重新分区算法df.repartition执行完整数据混洗,并在分区之间平均分配数据。为了减少数据混洗,最好使用df.coalesce
以下是如何使用https://hackernoon.com/managing-spark-partitions-with-coalesce-and-repartition-4050c57ad5c4重新分区的一些很好的解释DataFrame
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。