扩展到大型数据集#
pandas 提供了用于内存内分析的数据结构,这使得使用 pandas 分析比内存大的数据集变得有些棘手。即使是内存中占相当大比例的数据集也会变得难以处理,因为某些 pandas 操作需要创建中间副本。
本文档提供了一些将分析扩展到大型数据集的建议。它是对提升性能的补充,后者侧重于加速对内存中数据集的分析。
加载更少数据#
假设我们磁盘上的原始数据集有许多列。
In [1]: import pandas as pd
In [2]: import numpy as np
In [3]: def make_timeseries(start="2000-01-01", end="2000-12-31", freq="1D", seed=None):
...: index = pd.date_range(start=start, end=end, freq=freq, name="timestamp")
...: n = len(index)
...: state = np.random.RandomState(seed)
...: columns = {
...: "name": state.choice(["Alice", "Bob", "Charlie"], size=n),
...: "id": state.poisson(1000, size=n),
...: "x": state.rand(n) * 2 - 1,
...: "y": state.rand(n) * 2 - 1,
...: }
...: df = pd.DataFrame(columns, index=index, columns=sorted(columns))
...: if df.index[-1] == end:
...: df = df.iloc[:-1]
...: return df
...:
In [4]: timeseries = [
...: make_timeseries(freq="1min", seed=i).rename(columns=lambda x: f"{x}_{i}")
...: for i in range(10)
...: ]
...:
In [5]: ts_wide = pd.concat(timeseries, axis=1)
In [6]: ts_wide.head()
Out[6]:
id_0 name_0 x_0 ... name_9 x_9 y_9
timestamp ...
2000-01-01 00:00:00 977 Alice -0.821225 ... Charlie -0.957208 -0.757508
2000-01-01 00:01:00 1018 Bob -0.219182 ... Alice -0.414445 -0.100298
2000-01-01 00:02:00 927 Alice 0.660908 ... Charlie -0.325838 0.581859
2000-01-01 00:03:00 997 Bob -0.852458 ... Bob 0.992033 -0.686692
2000-01-01 00:04:00 965 Bob 0.717283 ... Charlie -0.924556 -0.184161
[5 rows x 40 columns]
In [7]: ts_wide.to_parquet("timeseries_wide.parquet")
要加载我们想要的列,我们有两种选择。选项 1 是加载所有数据,然后筛选出我们需要的部分。
In [8]: columns = ["id_0", "name_0", "x_0", "y_0"]
In [9]: pd.read_parquet("timeseries_wide.parquet")[columns]
Out[9]:
id_0 name_0 x_0 y_0
timestamp
2000-01-01 00:00:00 977 Alice -0.821225 0.906222
2000-01-01 00:01:00 1018 Bob -0.219182 0.350855
2000-01-01 00:02:00 927 Alice 0.660908 -0.798511
2000-01-01 00:03:00 997 Bob -0.852458 0.735260
2000-01-01 00:04:00 965 Bob 0.717283 0.393391
... ... ... ... ...
2000-12-30 23:56:00 1037 Bob -0.814321 0.612836
2000-12-30 23:57:00 980 Bob 0.232195 -0.618828
2000-12-30 23:58:00 965 Alice -0.231131 0.026310
2000-12-30 23:59:00 984 Alice 0.942819 0.853128
2000-12-31 00:00:00 1003 Alice 0.201125 -0.136655
[525601 rows x 4 columns]
选项 2 是只加载我们请求的列。
In [10]: pd.read_parquet("timeseries_wide.parquet", columns=columns)
Out[10]:
id_0 name_0 x_0 y_0
timestamp
2000-01-01 00:00:00 977 Alice -0.821225 0.906222
2000-01-01 00:01:00 1018 Bob -0.219182 0.350855
2000-01-01 00:02:00 927 Alice 0.660908 -0.798511
2000-01-01 00:03:00 997 Bob -0.852458 0.735260
2000-01-01 00:04:00 965 Bob 0.717283 0.393391
... ... ... ... ...
2000-12-30 23:56:00 1037 Bob -0.814321 0.612836
2000-12-30 23:57:00 980 Bob 0.232195 -0.618828
2000-12-30 23:58:00 965 Alice -0.231131 0.026310
2000-12-30 23:59:00 984 Alice 0.942819 0.853128
2000-12-31 00:00:00 1003 Alice 0.201125 -0.136655
[525601 rows x 4 columns]
如果我们测量这两个调用的内存使用量,我们会发现在本例中指定 columns
大约只使用了 1/10 的内存。
使用 pandas.read_csv()
,你可以指定 usecols
来限制读取到内存中的列。并非所有 pandas 可读取的文件格式都提供读取列子集的选项。
使用高效数据类型#
默认的 pandas 数据类型并不是内存效率最高的。对于具有相对较少唯一值的文本数据列(通常称为“低基数”数据)尤其如此。通过使用更高效的数据类型,你可以在内存中存储更大的数据集。
In [11]: ts = make_timeseries(freq="30s", seed=0)
In [12]: ts.to_parquet("timeseries.parquet")
In [13]: ts = pd.read_parquet("timeseries.parquet")
In [14]: ts
Out[14]:
id name x y
timestamp
2000-01-01 00:00:00 1041 Alice 0.889987 0.281011
2000-01-01 00:00:30 988 Bob -0.455299 0.488153
2000-01-01 00:01:00 1018 Alice 0.096061 0.580473
2000-01-01 00:01:30 992 Bob 0.142482 0.041665
2000-01-01 00:02:00 960 Bob -0.036235 0.802159
... ... ... ... ...
2000-12-30 23:58:00 1022 Alice 0.266191 0.875579
2000-12-30 23:58:30 974 Alice -0.009826 0.413686
2000-12-30 23:59:00 1028 Charlie 0.307108 -0.656789
2000-12-30 23:59:30 1002 Alice 0.202602 0.541335
2000-12-31 00:00:00 987 Alice 0.200832 0.615972
[1051201 rows x 4 columns]
现在,让我们检查数据类型和内存使用情况,看看我们应该把注意力集中在哪里。
In [15]: ts.dtypes
Out[15]:
id int64
name object
x float64
y float64
dtype: object
In [16]: ts.memory_usage(deep=True) # memory usage in bytes
Out[16]:
Index 8409608
id 8409608
name 65176434
x 8409608
y 8409608
dtype: int64
name
列占用的内存比其他任何列都多。它只有少量唯一值,因此很适合转换为 pandas.Categorical
。使用 pandas.Categorical
,我们只存储每个唯一名称一次,并使用节省空间的整数来表示每行中使用的具体名称。
In [17]: ts2 = ts.copy()
In [18]: ts2["name"] = ts2["name"].astype("category")
In [19]: ts2.memory_usage(deep=True)
Out[19]:
Index 8409608
id 8409608
name 1051495
x 8409608
y 8409608
dtype: int64
我们可以更进一步,使用 pandas.to_numeric()
将数值列向下转换为它们的最小类型。
In [20]: ts2["id"] = pd.to_numeric(ts2["id"], downcast="unsigned")
In [21]: ts2[["x", "y"]] = ts2[["x", "y"]].apply(pd.to_numeric, downcast="float")
In [22]: ts2.dtypes
Out[22]:
id uint16
name category
x float32
y float32
dtype: object
In [23]: ts2.memory_usage(deep=True)
Out[23]:
Index 8409608
id 2102402
name 1051495
x 4204804
y 4204804
dtype: int64
In [24]: reduction = ts2.memory_usage(deep=True).sum() / ts.memory_usage(deep=True).sum()
In [25]: print(f"{reduction:0.2f}")
0.20
总而言之,我们将这个数据集的内存占用量减少到了其原始大小的 1/5。
有关 pandas.Categorical
的更多信息,请参阅 分类数据;有关所有 pandas dtypes 的概述,请参阅 dtypes。
使用分块#
有些工作负载可以通过分块来实现,即将一个大问题拆分成许多小问题。例如,将单个 CSV 文件转换为 Parquet 文件,并对目录中的每个文件重复此操作。只要每个分块都能装入内存,你就可以处理比内存大得多的数据集。
注意
当你执行的操作需要零或最少的块间协调时,分块效果很好。对于更复杂的工作流程,你最好使用其他库。
假设我们磁盘上有一个更大的“逻辑数据集”,它是一个 Parquet 文件目录。目录中的每个文件代表整个数据集的不同年份。
In [26]: import pathlib
In [27]: N = 12
In [28]: starts = [f"20{i:>02d}-01-01" for i in range(N)]
In [29]: ends = [f"20{i:>02d}-12-13" for i in range(N)]
In [30]: pathlib.Path("data/timeseries").mkdir(exist_ok=True)
In [31]: for i, (start, end) in enumerate(zip(starts, ends)):
....: ts = make_timeseries(start=start, end=end, freq="1min", seed=i)
....: ts.to_parquet(f"data/timeseries/ts-{i:0>2d}.parquet")
....:
data
└── timeseries
├── ts-00.parquet
├── ts-01.parquet
├── ts-02.parquet
├── ts-03.parquet
├── ts-04.parquet
├── ts-05.parquet
├── ts-06.parquet
├── ts-07.parquet
├── ts-08.parquet
├── ts-09.parquet
├── ts-10.parquet
└── ts-11.parquet
现在我们将实现一个核外(out-of-core)的 pandas.Series.value_counts()
。此工作流的峰值内存使用量是单个最大分块,加上一个存储到目前为止唯一值计数的少量系列。只要每个独立文件都能装入内存,这将适用于任意大小的数据集。
In [32]: %%time
....: files = pathlib.Path("data/timeseries/").glob("ts*.parquet")
....: counts = pd.Series(dtype=int)
....: for path in files:
....: df = pd.read_parquet(path)
....: counts = counts.add(df["name"].value_counts(), fill_value=0)
....: counts.astype(int)
....:
CPU times: user 1.01 s, sys: 37.3 ms, total: 1.04 s
Wall time: 1.03 s
Out[32]:
name
Alice 1994645
Bob 1993692
Charlie 1994875
dtype: int64
一些读取器,例如 pandas.read_csv()
,提供了参数来控制读取单个文件时的 chunksize
。
对于不需要过于复杂操作的工作流,手动分块是一个不错的选择。但某些操作,如 pandas.DataFrame.groupby()
,很难进行分块处理。在这种情况下,你最好切换到其他为你实现这些核外算法的库。
使用其他库#
还有其他一些库提供了与 pandas 相似的 API,并能很好地与 pandas DataFrame 配合使用,它们可以通过并行运行时、分布式内存、集群等方式为你提供扩展大型数据集处理和分析的能力。你可以在生态系统页面找到更多信息。