开发者社区 问答 正文

Python多处理来加速I/O和Groupby/Sum

我的数据集有~2亿行,~10个分组变量,和~20个变量,是一个~50GB的csv。我做的第一件事是查看运行时是按顺序运行的,但是是按块运行的。它有点复杂,因为有些groupbys实际上在另一个数据集中的不同聚合级别,所以它只有~200mb。现在相关的代码是这样的:

group_cols = ['cols','to','group','by']
cols_to_summarize = ['cols','to','summarize']
groupbys = []
df = pd.read_csv("file/path/df.csv",chunksize=1000000)
for chunk in df:
  chunk = chunk.merge(other_df,left_on="id",right_index=True,how="inner")
  groupbys.append(chunk.groupby(group_cols)[cols_to_summarize].sum())
finalAgg = pd.concat(groupbys).groupby(group_cols)[cols_to_summarize].sum()

每个数据块大约需要5秒钟来处理,所以200个数据块大约需要15-20分钟。我正在使用的服务器有16个内核,所以我希望能加快速度,如果能达到2-3分钟就太好了。 但是,当我尝试使用多进程时,我很难得到更多的加速。基于我的谷歌搜索,我认为这将有助于在CSVs阅读,但我想知道,如果多个进程不能读取相同的CSV,也许我应该先分割它?这是我试过的,比连续运行的时间长:

def agg_chunk(start):
  [pull in small dataset]
  chunk = pd.read_csv("file/path/df.csv",skiprows=range(1,start+1),nrows=1000000)
  chunk = chunk.merge(other_df,left_on="id",right_index=True,how="inner")
  return chunk.groupby(group_cols)[cols_to_summarize].sum()
if __name__ == "__main__":
  pool = mp.Pool(16)
  r = list(np.array(range(200))*1000000)
  groupbys = pool.map(agg_chunk,r)
  finalAgg = pd.concat(groupbys).groupby(group_cols)[cols_to_summarize].sum()

有更好的方法吗?额外的[拉入小数据集]块大约需要5秒,但是将每个进程的时间加倍,然后除以16应该仍然是一个很好的加速,不是吗?相反,并行版本已经运行了半个小时,仍然没有完成。还有,有没有什么方法可以将数据集传递给每个进程,而不是让每个进程重新创建数据集? 问题来源StackOverflow 地址:/questions/59385043/python-multiprocessing-to-speed-up-i-o-and-groupby-sum

展开
收起
kun坤 2019-12-26 10:50:38 870 分享 版权
阿里云 AI 助理回答

在处理大规模数据集时,使用Pandas直接操作CSV文件确实会遇到性能瓶颈,尤其是涉及到I/O操作和内存管理。针对您的问题,有几个策略可以尝试以提高效率:

  1. Dask DataFrame: Dask 是一个并行计算库,设计用于处理大规模数据集。它允许您像使用Pandas一样编写代码,但会在后台进行分布式或并行计算。Dask DataFrames可以自动分割数据并在多个核心上并行处理。

    import dask.dataframe as dd
    
    # 读取csv为Dask DataFrame
    df = dd.read_csv("file/path/df.csv", assume_missing=True)
    
    # 合并其他DataFrame(确保other_df也转换为Dask DataFrame)
    other_df_dask = dd.from_pandas(other_df, npartitions=16) # 假设other_df较小
    df = df.merge(other_df_dask, left_on="id", right_index=True, how="inner")
    
    # 分组聚合
    finalAgg = df.groupby(group_cols)[cols_to_summarize].sum().compute()
    

    compute()方法会触发实际的计算过程,并将结果返回为Pandas DataFrame。

  2. 使用Parquet等列式存储格式: 列式存储格式如Parquet比CSV更高效,尤其是在处理大数据时。您可以先将CSV转换为Parquet,然后用Dask或Pandas读取。

  3. 优化内存使用: 确保在合并和分组前对数据进行适当的清洗和类型转换,避免不必要的内存消耗。例如,如果某些列可以安全地转换为更节省空间的数据类型,应该提前做这个转换。

  4. 多进程中的数据共享: 在多进程中直接共享大型数据集是不现实的,因为每个进程都有独立的内存空间。但是,通过预先分割大文件成多个小文件,然后让每个进程处理一个单独的文件,可以避免重复加载整个数据集。这正是您尝试的方法之一,但要注意的是,I/O操作可能成为新的瓶颈,特别是当磁盘速度较慢时。

  5. 考虑使用Spark: 如果有集群资源可用,Apache Spark是一个强大的工具,专为大规模数据处理而设计。它提供了丰富的数据处理功能,并且天然支持分布式计算。

综上所述,我建议首先尝试使用Dask,因为它能提供与Pandas类似的API,同时利用多核处理器进行并行计算,无需手动管理进程池和数据分割。如果Dask仍然不能满足需求,再考虑是否需要升级到更复杂的解决方案,如Spark。

有帮助
无帮助
AI 助理回答生成答案可能存在不准确,仅供参考
0 条回答
写回答
取消 提交回答
问答分类:
问答地址: