我的数据集有~2亿行,~10个分组变量,和~20个变量,是一个~50GB的csv。我做的第一件事是查看运行时是按顺序运行的,但是是按块运行的。它有点复杂,因为有些groupbys实际上在另一个数据集中的不同聚合级别,所以它只有~200mb。现在相关的代码是这样的:
group_cols = ['cols','to','group','by']
cols_to_summarize = ['cols','to','summarize']
groupbys = []
df = pd.read_csv("file/path/df.csv",chunksize=1000000)
for chunk in df:
chunk = chunk.merge(other_df,left_on="id",right_index=True,how="inner")
groupbys.append(chunk.groupby(group_cols)[cols_to_summarize].sum())
finalAgg = pd.concat(groupbys).groupby(group_cols)[cols_to_summarize].sum()
每个数据块大约需要5秒钟来处理,所以200个数据块大约需要15-20分钟。我正在使用的服务器有16个内核,所以我希望能加快速度,如果能达到2-3分钟就太好了。 但是,当我尝试使用多进程时,我很难得到更多的加速。基于我的谷歌搜索,我认为这将有助于在CSVs阅读,但我想知道,如果多个进程不能读取相同的CSV,也许我应该先分割它?这是我试过的,比连续运行的时间长:
def agg_chunk(start):
[pull in small dataset]
chunk = pd.read_csv("file/path/df.csv",skiprows=range(1,start+1),nrows=1000000)
chunk = chunk.merge(other_df,left_on="id",right_index=True,how="inner")
return chunk.groupby(group_cols)[cols_to_summarize].sum()
if __name__ == "__main__":
pool = mp.Pool(16)
r = list(np.array(range(200))*1000000)
groupbys = pool.map(agg_chunk,r)
finalAgg = pd.concat(groupbys).groupby(group_cols)[cols_to_summarize].sum()
有更好的方法吗?额外的[拉入小数据集]块大约需要5秒,但是将每个进程的时间加倍,然后除以16应该仍然是一个很好的加速,不是吗?相反,并行版本已经运行了半个小时,仍然没有完成。还有,有没有什么方法可以将数据集传递给每个进程,而不是让每个进程重新创建数据集? 问题来源StackOverflow 地址:/questions/59385043/python-multiprocessing-to-speed-up-i-o-and-groupby-sum
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
在处理大规模数据集时,使用Pandas直接操作CSV文件确实会遇到性能瓶颈,尤其是涉及到I/O操作和内存管理。针对您的问题,有几个策略可以尝试以提高效率:
Dask DataFrame: Dask 是一个并行计算库,设计用于处理大规模数据集。它允许您像使用Pandas一样编写代码,但会在后台进行分布式或并行计算。Dask DataFrames可以自动分割数据并在多个核心上并行处理。
import dask.dataframe as dd
# 读取csv为Dask DataFrame
df = dd.read_csv("file/path/df.csv", assume_missing=True)
# 合并其他DataFrame(确保other_df也转换为Dask DataFrame)
other_df_dask = dd.from_pandas(other_df, npartitions=16) # 假设other_df较小
df = df.merge(other_df_dask, left_on="id", right_index=True, how="inner")
# 分组聚合
finalAgg = df.groupby(group_cols)[cols_to_summarize].sum().compute()
compute()
方法会触发实际的计算过程,并将结果返回为Pandas DataFrame。
使用Parquet等列式存储格式: 列式存储格式如Parquet比CSV更高效,尤其是在处理大数据时。您可以先将CSV转换为Parquet,然后用Dask或Pandas读取。
优化内存使用: 确保在合并和分组前对数据进行适当的清洗和类型转换,避免不必要的内存消耗。例如,如果某些列可以安全地转换为更节省空间的数据类型,应该提前做这个转换。
多进程中的数据共享: 在多进程中直接共享大型数据集是不现实的,因为每个进程都有独立的内存空间。但是,通过预先分割大文件成多个小文件,然后让每个进程处理一个单独的文件,可以避免重复加载整个数据集。这正是您尝试的方法之一,但要注意的是,I/O操作可能成为新的瓶颈,特别是当磁盘速度较慢时。
考虑使用Spark: 如果有集群资源可用,Apache Spark是一个强大的工具,专为大规模数据处理而设计。它提供了丰富的数据处理功能,并且天然支持分布式计算。
综上所述,我建议首先尝试使用Dask,因为它能提供与Pandas类似的API,同时利用多核处理器进行并行计算,无需手动管理进程池和数据分割。如果Dask仍然不能满足需求,再考虑是否需要升级到更复杂的解决方案,如Spark。