扩展到大型数据集
pandas 提供了用于内存分析的数据结构,这使得使用 pandas 分析大于内存数据集的数据集有些棘手。即使是占用相当大内存的数据集也变得难以处理,因为一些 pandas 操作需要进行中间复制。
本文提供了一些建议,以便将您的分析扩展到更大的数据集。这是对提高性能的补充,后者侧重于加快适���内存的数据集的分析。
加载更少的数据
假设我们在磁盘上的原始数据集有许多列。
In [1]: import pandas as pd In [2]: import numpy as np In [3]: def make_timeseries(start="2000-01-01", end="2000-12-31", freq="1D", seed=None): ...: index = pd.date_range(start=start, end=end, freq=freq, name="timestamp") ...: n = len(index) ...: state = np.random.RandomState(seed) ...: columns = { ...: "name": state.choice(["Alice", "Bob", "Charlie"], size=n), ...: "id": state.poisson(1000, size=n), ...: "x": state.rand(n) * 2 - 1, ...: "y": state.rand(n) * 2 - 1, ...: } ...: df = pd.DataFrame(columns, index=index, columns=sorted(columns)) ...: if df.index[-1] == end: ...: df = df.iloc[:-1] ...: return df ...: In [4]: timeseries = [ ...: make_timeseries(freq="1min", seed=i).rename(columns=lambda x: f"{x}_{i}") ...: for i in range(10) ...: ] ...: In [5]: ts_wide = pd.concat(timeseries, axis=1) In [6]: ts_wide.head() Out[6]: id_0 name_0 x_0 ... name_9 x_9 y_9 timestamp ... 2000-01-01 00:00:00 977 Alice -0.821225 ... Charlie -0.957208 -0.757508 2000-01-01 00:01:00 1018 Bob -0.219182 ... Alice -0.414445 -0.100298 2000-01-01 00:02:00 927 Alice 0.660908 ... Charlie -0.325838 0.581859 2000-01-01 00:03:00 997 Bob -0.852458 ... Bob 0.992033 -0.686692 2000-01-01 00:04:00 965 Bob 0.717283 ... Charlie -0.924556 -0.184161 [5 rows x 40 columns] In [7]: ts_wide.to_parquet("timeseries_wide.parquet")
要加载我们想要的列,我们有两个选项。选项 1 加载所有数据,然后筛选我们需要的数据。
In [8]: columns = ["id_0", "name_0", "x_0", "y_0"] In [9]: pd.read_parquet("timeseries_wide.parquet")[columns] Out[9]: id_0 name_0 x_0 y_0 timestamp 2000-01-01 00:00:00 977 Alice -0.821225 0.906222 2000-01-01 00:01:00 1018 Bob -0.219182 0.350855 2000-01-01 00:02:00 927 Alice 0.660908 -0.798511 2000-01-01 00:03:00 997 Bob -0.852458 0.735260 2000-01-01 00:04:00 965 Bob 0.717283 0.393391 ... ... ... ... ... 2000-12-30 23:56:00 1037 Bob -0.814321 0.612836 2000-12-30 23:57:00 980 Bob 0.232195 -0.618828 2000-12-30 23:58:00 965 Alice -0.231131 0.026310 2000-12-30 23:59:00 984 Alice 0.942819 0.853128 2000-12-31 00:00:00 1003 Alice 0.201125 -0.136655 [525601 rows x 4 columns]
选项 2 仅加载我们请求的列。
In [10]: pd.read_parquet("timeseries_wide.parquet", columns=columns) Out[10]: id_0 name_0 x_0 y_0 timestamp 2000-01-01 00:00:00 977 Alice -0.821225 0.906222 2000-01-01 00:01:00 1018 Bob -0.219182 0.350855 2000-01-01 00:02:00 927 Alice 0.660908 -0.798511 2000-01-01 00:03:00 997 Bob -0.852458 0.735260 2000-01-01 00:04:00 965 Bob 0.717283 0.393391 ... ... ... ... ... 2000-12-30 23:56:00 1037 Bob -0.814321 0.612836 2000-12-30 23:57:00 980 Bob 0.232195 -0.618828 2000-12-30 23:58:00 965 Alice -0.231131 0.026310 2000-12-30 23:59:00 984 Alice 0.942819 0.853128 2000-12-31 00:00:00 1003 Alice 0.201125 -0.136655 [525601 rows x 4 columns]
如果我们测量这两个调用的内存使用情况,我们会发现在这种情况下指定columns
使用的内存约为 1/10。
使用pandas.read_csv()
,您可以指定usecols
来限制读入内存的列。并非所有可以被 pandas 读取的文件格式都提供读取子集列的选项。
使用高效的数据类型
默认的 pandas 数据类型并不是最节省内存的。特别是对于具有相对少量唯一值的文本数据列(通常称为“低基数”数据),这一点尤为明显。通过使用更高效的数据类型,您可以在内存中存储更大的数据集。
In [11]: ts = make_timeseries(freq="30s", seed=0) In [12]: ts.to_parquet("timeseries.parquet") In [13]: ts = pd.read_parquet("timeseries.parquet") In [14]: ts Out[14]: id name x y timestamp 2000-01-01 00:00:00 1041 Alice 0.889987 0.281011 2000-01-01 00:00:30 988 Bob -0.455299 0.488153 2000-01-01 00:01:00 1018 Alice 0.096061 0.580473 2000-01-01 00:01:30 992 Bob 0.142482 0.041665 2000-01-01 00:02:00 960 Bob -0.036235 0.802159 ... ... ... ... ... 2000-12-30 23:58:00 1022 Alice 0.266191 0.875579 2000-12-30 23:58:30 974 Alice -0.009826 0.413686 2000-12-30 23:59:00 1028 Charlie 0.307108 -0.656789 2000-12-30 23:59:30 1002 Alice 0.202602 0.541335 2000-12-31 00:00:00 987 Alice 0.200832 0.615972 [1051201 rows x 4 columns]
现在,让我们检查数据类型和内存使用情况,看看我们应该关注哪些方面。
In [15]: ts.dtypes Out[15]: id int64 name object x float64 y float64 dtype: object
In [16]: ts.memory_usage(deep=True) # memory usage in bytes Out[16]: Index 8409608 id 8409608 name 65176434 x 8409608 y 8409608 dtype: int64
name
列占用的内存比其他任何列都多得多。它只有几个唯一值,因此很适合转换为pandas.Categorical
。使用pandas.Categorical
,我们只需一次存储每个唯一名称,并使用节省空间的整数来知道每行中使用了哪个特定名称。
In [17]: ts2 = ts.copy() In [18]: ts2["name"] = ts2["name"].astype("category") In [19]: ts2.memory_usage(deep=True) Out[19]: Index 8409608 id 8409608 name 1051495 x 8409608 y 8409608 dtype: int64
我们可以进一步将数值列降级为它们的最小类型,使用pandas.to_numeric()
。
In [20]: ts2["id"] = pd.to_numeric(ts2["id"], downcast="unsigned") In [21]: ts2[["x", "y"]] = ts2[["x", "y"]].apply(pd.to_numeric, downcast="float") In [22]: ts2.dtypes Out[22]: id uint16 name category x float32 y float32 dtype: object
In [23]: ts2.memory_usage(deep=True) Out[23]: Index 8409608 id 2102402 name 1051495 x 4204804 y 4204804 dtype: int64
In [24]: reduction = ts2.memory_usage(deep=True).sum() / ts.memory_usage(deep=True).sum() In [25]: print(f"{reduction:0.2f}") 0.20
总的来说,我们将这个数据集的内存占用减少到原始大小的 1/5。
有关pandas.Categorical
的更多信息,请参阅分类数据,有关 pandas 所有数据类型的概述,请参阅数据类型。
使用分块加载
通过将一个大问题分成一堆小问题,一些工作负载可以通过分块来实现。例如,将单个 CSV 文件转换为 Parquet 文件,并为目录中的每个文件重复此操作。只要每个块适合内存,您就可以处理比内存大得多的数据集。
注意
当你执行的操作需要零或最小的块之间协调时,分块工作效果很好。对于更复杂的工作流程,最好使用其他库。
假设我们在磁盘上有一个更大的“逻辑数据集”,它是一个 parquet 文件目录。目录中的每个文件代表整个数据集的不同年份。
In [26]: import pathlib In [27]: N = 12 In [28]: starts = [f"20{i:>02d}-01-01" for i in range(N)] In [29]: ends = [f"20{i:>02d}-12-13" for i in range(N)] In [30]: pathlib.Path("data/timeseries").mkdir(exist_ok=True) In [31]: for i, (start, end) in enumerate(zip(starts, ends)): ....: ts = make_timeseries(start=start, end=end, freq="1min", seed=i) ....: ts.to_parquet(f"data/timeseries/ts-{i:0>2d}.parquet") ....:
data └── timeseries ├── ts-00.parquet ├── ts-01.parquet ├── ts-02.parquet ├── ts-03.parquet ├── ts-04.parquet ├── ts-05.parquet ├── ts-06.parquet ├── ts-07.parquet ├── ts-08.parquet ├── ts-09.parquet ├── ts-10.parquet └── ts-11.parquet
现在我们将实现一个分布式的pandas.Series.value_counts()
。这个工作流程的峰值内存使用量是最大块的内存,再加上一个小系列存储到目前为止的唯一值计数。只要每个单独的文件都适合内存,这将适用于任意大小的数据集。
In [32]: %%time ....: files = pathlib.Path("data/timeseries/").glob("ts*.parquet") ....: counts = pd.Series(dtype=int) ....: for path in files: ....: df = pd.read_parquet(path) ....: counts = counts.add(df["name"].value_counts(), fill_value=0) ....: counts.astype(int) ....: CPU times: user 760 ms, sys: 26.1 ms, total: 786 ms Wall time: 559 ms Out[32]: name Alice 1994645 Bob 1993692 Charlie 1994875 dtype: int64
一些读取器,比如pandas.read_csv()
,在读取单个文件时提供了控制chunksize
的参数。
手动分块是一个适合不需要太复杂操作的工作流程的选择。一些操作,比如pandas.DataFrame.groupby()
,在块方式下要困难得多。在这些情况下,最好切换到一个实现这些分布式算法的不同库。
使用其他库
还有其他类似于 pandas 并与 pandas DataFrame 很好配合的库,可以通过并行运行时、分布式内存、集群等功能来扩展大型数据集的处理和分析能力。您可以在生态系统页面找到更多信息。
加载更少的数据
假设我们在磁盘上的原始数据集有许多列。
In [1]: import pandas as pd In [2]: import numpy as np In [3]: def make_timeseries(start="2000-01-01", end="2000-12-31", freq="1D", seed=None): ...: index = pd.date_range(start=start, end=end, freq=freq, name="timestamp") ...: n = len(index) ...: state = np.random.RandomState(seed) ...: columns = { ...: "name": state.choice(["Alice", "Bob", "Charlie"], size=n), ...: "id": state.poisson(1000, size=n), ...: "x": state.rand(n) * 2 - 1, ...: "y": state.rand(n) * 2 - 1, ...: } ...: df = pd.DataFrame(columns, index=index, columns=sorted(columns)) ...: if df.index[-1] == end: ...: df = df.iloc[:-1] ...: return df ...: In [4]: timeseries = [ ...: make_timeseries(freq="1min", seed=i).rename(columns=lambda x: f"{x}_{i}") ...: for i in range(10) ...: ] ...: In [5]: ts_wide = pd.concat(timeseries, axis=1) In [6]: ts_wide.head() Out[6]: id_0 name_0 x_0 ... name_9 x_9 y_9 timestamp ... 2000-01-01 00:00:00 977 Alice -0.821225 ... Charlie -0.957208 -0.757508 2000-01-01 00:01:00 1018 Bob -0.219182 ... Alice -0.414445 -0.100298 2000-01-01 00:02:00 927 Alice 0.660908 ... Charlie -0.325838 0.581859 2000-01-01 00:03:00 997 Bob -0.852458 ... Bob 0.992033 -0.686692 2000-01-01 00:04:00 965 Bob 0.717283 ... Charlie -0.924556 -0.184161 [5 rows x 40 columns] In [7]: ts_wide.to_parquet("timeseries_wide.parquet")
要加载我们想要的列,我们有两个选项。选项 1 加载所有数据,然后筛选我们需要的数据。
In [8]: columns = ["id_0", "name_0", "x_0", "y_0"] In [9]: pd.read_parquet("timeseries_wide.parquet")[columns] Out[9]: id_0 name_0 x_0 y_0 timestamp 2000-01-01 00:00:00 977 Alice -0.821225 0.906222 2000-01-01 00:01:00 1018 Bob -0.219182 0.350855 2000-01-01 00:02:00 927 Alice 0.660908 -0.798511 2000-01-01 00:03:00 997 Bob -0.852458 0.735260 2000-01-01 00:04:00 965 Bob 0.717283 0.393391 ... ... ... ... ... 2000-12-30 23:56:00 1037 Bob -0.814321 0.612836 2000-12-30 23:57:00 980 Bob 0.232195 -0.618828 2000-12-30 23:58:00 965 Alice -0.231131 0.026310 2000-12-30 23:59:00 984 Alice 0.942819 0.853128 2000-12-31 00:00:00 1003 Alice 0.201125 -0.136655 [525601 rows x 4 columns]
选项 2 只加载我们请求的列。
In [10]: pd.read_parquet("timeseries_wide.parquet", columns=columns) Out[10]: id_0 name_0 x_0 y_0 timestamp 2000-01-01 00:00:00 977 Alice -0.821225 0.906222 2000-01-01 00:01:00 1018 Bob -0.219182 0.350855 2000-01-01 00:02:00 927 Alice 0.660908 -0.798511 2000-01-01 00:03:00 997 Bob -0.852458 0.735260 2000-01-01 00:04:00 965 Bob 0.717283 0.393391 ... ... ... ... ... 2000-12-30 23:56:00 1037 Bob -0.814321 0.612836 2000-12-30 23:57:00 980 Bob 0.232195 -0.618828 2000-12-30 23:58:00 965 Alice -0.231131 0.026310 2000-12-30 23:59:00 984 Alice 0.942819 0.853128 2000-12-31 00:00:00 1003 Alice 0.201125 -0.136655 [525601 rows x 4 columns]
如果我们测量这两个调用的内存使用情况,我们会发现在这种情况下指定columns
使用的内存约为 1/10。
使用pandas.read_csv()
,您可以指定usecols
来限制读入内存的列。并非所有可以被 pandas 读取的文件格式都提供了读取子集列的选项。
使用高效的数据类型
默认的 pandas 数据类型不是最节省内存的。对于具有相对少量唯一值的文本数据列(通常称为“低基数”数据),这一点尤为明显。通过使用更高效的数据类型,您可以在内存中存储更大的数据集。
In [11]: ts = make_timeseries(freq="30s", seed=0) In [12]: ts.to_parquet("timeseries.parquet") In [13]: ts = pd.read_parquet("timeseries.parquet") In [14]: ts Out[14]: id name x y timestamp 2000-01-01 00:00:00 1041 Alice 0.889987 0.281011 2000-01-01 00:00:30 988 Bob -0.455299 0.488153 2000-01-01 00:01:00 1018 Alice 0.096061 0.580473 2000-01-01 00:01:30 992 Bob 0.142482 0.041665 2000-01-01 00:02:00 960 Bob -0.036235 0.802159 ... ... ... ... ... 2000-12-30 23:58:00 1022 Alice 0.266191 0.875579 2000-12-30 23:58:30 974 Alice -0.009826 0.413686 2000-12-30 23:59:00 1028 Charlie 0.307108 -0.656789 2000-12-30 23:59:30 1002 Alice 0.202602 0.541335 2000-12-31 00:00:00 987 Alice 0.200832 0.615972 [1051201 rows x 4 columns]
现在,让我们检查数据类型和内存使用情况,看看我们应该把注意力放在哪里。
In [15]: ts.dtypes Out[15]: id int64 name object x float64 y float64 dtype: object
In [16]: ts.memory_usage(deep=True) # memory usage in bytes Out[16]: Index 8409608 id 8409608 name 65176434 x 8409608 y 8409608 dtype: int64
name
列占用的内存比其他任何列都多。它只有很少的唯一值,因此很适合转换为pandas.Categorical
。使用pandas.Categorical
,我们只需一次存储每个唯一名称,并使用空间高效的整数来知道每行中使用了哪个特定名称。
In [17]: ts2 = ts.copy() In [18]: ts2["name"] = ts2["name"].astype("category") In [19]: ts2.memory_usage(deep=True) Out[19]: Index 8409608 id 8409608 name 1051495 x 8409608 y 8409608 dtype: int64
我们可以进一步将数值列降级为它们的最小类型,使用pandas.to_numeric()
。
In [20]: ts2["id"] = pd.to_numeric(ts2["id"], downcast="unsigned") In [21]: ts2[["x", "y"]] = ts2[["x", "y"]].apply(pd.to_numeric, downcast="float") In [22]: ts2.dtypes Out[22]: id uint16 name category x float32 y float32 dtype: object
In [23]: ts2.memory_usage(deep=True) Out[23]: Index 8409608 id 2102402 name 1051495 x 4204804 y 4204804 dtype: int64
In [24]: reduction = ts2.memory_usage(deep=True).sum() / ts.memory_usage(deep=True).sum() In [25]: print(f"{reduction:0.2f}") 0.20
总的来说,我们已将此数据集的内存占用减少到原始大小的 1/5。
请查看 Categorical data 以了解更多关于pandas.Categorical
和 dtypes 以获得 pandas 所有 dtypes 的概述。
使用分块
通过将一个大问题分解为一堆小问题,可以使用分块来实现某些工作负载。例如,将单个 CSV 文件转换为 Parquet 文件,并为目录中的每个文件重复此操作。只要每个块适合内存,您就可以处理比内存大得多的数据集。
注意
当您执行的操作需要零或最小的分块之间协调时,分块效果很好。对于更复杂的工作流程,最好使用其他库。
假设我们在磁盘上有一个更大的“逻辑数据集”,它是一个 parquet 文件目录。目录中的每个文件代表整个数据集的不同年份。
In [26]: import pathlib In [27]: N = 12 In [28]: starts = [f"20{i:>02d}-01-01" for i in range(N)] In [29]: ends = [f"20{i:>02d}-12-13" for i in range(N)] In [30]: pathlib.Path("data/timeseries").mkdir(exist_ok=True) In [31]: for i, (start, end) in enumerate(zip(starts, ends)): ....: ts = make_timeseries(start=start, end=end, freq="1min", seed=i) ....: ts.to_parquet(f"data/timeseries/ts-{i:0>2d}.parquet") ....:
data └── timeseries ├── ts-00.parquet ├── ts-01.parquet ├── ts-02.parquet ├── ts-03.parquet ├── ts-04.parquet ├── ts-05.parquet ├── ts-06.parquet ├── ts-07.parquet ├── ts-08.parquet ├── ts-09.parquet ├── ts-10.parquet └── ts-11.parquet
现在我们将实现一个基于磁盘的pandas.Series.value_counts()
。此工作流的峰值内存使用量是最大的单个块,再加上一个小系列,用于存储到目前为止的唯一值计数。只要每个单独的文件都适合内存,这将适用于任意大小的数据集。
In [32]: %%time ....: files = pathlib.Path("data/timeseries/").glob("ts*.parquet") ....: counts = pd.Series(dtype=int) ....: for path in files: ....: df = pd.read_parquet(path) ....: counts = counts.add(df["name"].value_counts(), fill_value=0) ....: counts.astype(int) ....: CPU times: user 760 ms, sys: 26.1 ms, total: 786 ms Wall time: 559 ms Out[32]: name Alice 1994645 Bob 1993692 Charlie 1994875 dtype: int64
一些读取器,如pandas.read_csv()
,在读取单个文件时提供控制chunksize
的参数。
手动分块是一个适用于不需要太复杂操作的工作流程的选择。一些操作,比如pandas.DataFrame.groupby()
,在分块方式下要困难得多。在这些情况下,最好切换到另一个库,该库为您实现这些基于外存储算法。
使用其他库
还有其他库提供类似于 pandas 的 API,并与 pandas DataFrame 很好地配合,可以通过并行运行时、分布式内存、集群等功能来扩展大型数据集的处理和分析能力。您可以在生态系统页面找到更多信息。
Pandas 2.2 中文官方教程和指南(二十四)(2)https://developer.aliyun.com/article/1508865