扩展到大型数据集#

pandas 提供了用于内存内分析的数据结构,这使得使用 pandas 分析大于内存数据集的数据集变得有些棘手。即使是占内存相当一部分的数据集也会变得难以处理,因为一些 pandas 操作需要进行中间复制。

本文档提供了一些将分析扩展到更大数据集的建议。它是对 提高性能 的补充,后者侧重于加快对适合内存的数据集的分析。

加载较少数据#

假设我们磁盘上的原始数据集有许多列。

In [1]: import pandas as pd

In [2]: import numpy as np

In [3]: def make_timeseries(start="2000-01-01", end="2000-12-31", freq="1D", seed=None):
   ...:     index = pd.date_range(start=start, end=end, freq=freq, name="timestamp")
   ...:     n = len(index)
   ...:     state = np.random.RandomState(seed)
   ...:     columns = {
   ...:         "name": state.choice(["Alice", "Bob", "Charlie"], size=n),
   ...:         "id": state.poisson(1000, size=n),
   ...:         "x": state.rand(n) * 2 - 1,
   ...:         "y": state.rand(n) * 2 - 1,
   ...:     }
   ...:     df = pd.DataFrame(columns, index=index, columns=sorted(columns))
   ...:     if df.index[-1] == end:
   ...:         df = df.iloc[:-1]
   ...:     return df
   ...: 

In [4]: timeseries = [
   ...:     make_timeseries(freq="1min", seed=i).rename(columns=lambda x: f"{x}_{i}")
   ...:     for i in range(10)
   ...: ]
   ...: 

In [5]: ts_wide = pd.concat(timeseries, axis=1)

In [6]: ts_wide.head()
Out[6]: 
                     id_0 name_0       x_0  ...   name_9       x_9       y_9
timestamp                                   ...                             
2000-01-01 00:00:00   977  Alice -0.821225  ...  Charlie -0.957208 -0.757508
2000-01-01 00:01:00  1018    Bob -0.219182  ...    Alice -0.414445 -0.100298
2000-01-01 00:02:00   927  Alice  0.660908  ...  Charlie -0.325838  0.581859
2000-01-01 00:03:00   997    Bob -0.852458  ...      Bob  0.992033 -0.686692
2000-01-01 00:04:00   965    Bob  0.717283  ...  Charlie -0.924556 -0.184161

[5 rows x 40 columns]

In [7]: ts_wide.to_parquet("timeseries_wide.parquet")

要加载我们想要的列,我们有两个选择。选项 1 加载所有数据,然后过滤到我们需要的部分。

In [8]: columns = ["id_0", "name_0", "x_0", "y_0"]

In [9]: pd.read_parquet("timeseries_wide.parquet")[columns]
Out[9]: 
                     id_0 name_0       x_0       y_0
timestamp                                           
2000-01-01 00:00:00   977  Alice -0.821225  0.906222
2000-01-01 00:01:00  1018    Bob -0.219182  0.350855
2000-01-01 00:02:00   927  Alice  0.660908 -0.798511
2000-01-01 00:03:00   997    Bob -0.852458  0.735260
2000-01-01 00:04:00   965    Bob  0.717283  0.393391
...                   ...    ...       ...       ...
2000-12-30 23:56:00  1037    Bob -0.814321  0.612836
2000-12-30 23:57:00   980    Bob  0.232195 -0.618828
2000-12-30 23:58:00   965  Alice -0.231131  0.026310
2000-12-30 23:59:00   984  Alice  0.942819  0.853128
2000-12-31 00:00:00  1003  Alice  0.201125 -0.136655

[525601 rows x 4 columns]

选项 2 只加载我们请求的列。

In [10]: pd.read_parquet("timeseries_wide.parquet", columns=columns)
Out[10]: 
                     id_0 name_0       x_0       y_0
timestamp                                           
2000-01-01 00:00:00   977  Alice -0.821225  0.906222
2000-01-01 00:01:00  1018    Bob -0.219182  0.350855
2000-01-01 00:02:00   927  Alice  0.660908 -0.798511
2000-01-01 00:03:00   997    Bob -0.852458  0.735260
2000-01-01 00:04:00   965    Bob  0.717283  0.393391
...                   ...    ...       ...       ...
2000-12-30 23:56:00  1037    Bob -0.814321  0.612836
2000-12-30 23:57:00   980    Bob  0.232195 -0.618828
2000-12-30 23:58:00   965  Alice -0.231131  0.026310
2000-12-30 23:59:00   984  Alice  0.942819  0.853128
2000-12-31 00:00:00  1003  Alice  0.201125 -0.136655

[525601 rows x 4 columns]

如果我们测量这两个调用的内存使用情况,我们会发现指定 columns 在这种情况下大约使用 1/10 的内存。

使用 pandas.read_csv(),您可以指定 usecols 来限制加载到内存中的列。并非所有可以被 pandas 读取的文件格式都提供读取列子集的选项。

使用高效的数据类型#

默认的 pandas 数据类型不是最节省内存的。对于具有相对较少唯一值的文本数据列(通常称为“低基数”数据),尤其如此。通过使用更有效的数据类型,您可以将更大的数据集存储在内存中。

In [11]: ts = make_timeseries(freq="30s", seed=0)

In [12]: ts.to_parquet("timeseries.parquet")

In [13]: ts = pd.read_parquet("timeseries.parquet")

In [14]: ts
Out[14]: 
                       id     name         x         y
timestamp                                             
2000-01-01 00:00:00  1041    Alice  0.889987  0.281011
2000-01-01 00:00:30   988      Bob -0.455299  0.488153
2000-01-01 00:01:00  1018    Alice  0.096061  0.580473
2000-01-01 00:01:30   992      Bob  0.142482  0.041665
2000-01-01 00:02:00   960      Bob -0.036235  0.802159
...                   ...      ...       ...       ...
2000-12-30 23:58:00  1022    Alice  0.266191  0.875579
2000-12-30 23:58:30   974    Alice -0.009826  0.413686
2000-12-30 23:59:00  1028  Charlie  0.307108 -0.656789
2000-12-30 23:59:30  1002    Alice  0.202602  0.541335
2000-12-31 00:00:00   987    Alice  0.200832  0.615972

[1051201 rows x 4 columns]

现在,让我们检查数据类型和内存使用情况,看看我们应该把注意力集中在哪里。

In [15]: ts.dtypes
Out[15]: 
id        int64
name     object
x       float64
y       float64
dtype: object
In [16]: ts.memory_usage(deep=True)  # memory usage in bytes
Out[16]: 
Index     8409608
id        8409608
name     65176434
x         8409608
y         8409608
dtype: int64

name 列占用的内存比其他任何列都多。它只有几个唯一值,因此它是转换为 pandas.Categorical 的良好候选者。使用 pandas.Categorical,我们只存储每个唯一名称一次,并使用节省空间的整数来知道每行使用哪个特定名称。

In [17]: ts2 = ts.copy()

In [18]: ts2["name"] = ts2["name"].astype("category")

In [19]: ts2.memory_usage(deep=True)
Out[19]: 
Index    8409608
id       8409608
name     1051495
x        8409608
y        8409608
dtype: int64

我们可以更进一步,使用 pandas.to_numeric() 将数值列向下转换为最小的类型。

In [20]: ts2["id"] = pd.to_numeric(ts2["id"], downcast="unsigned")

In [21]: ts2[["x", "y"]] = ts2[["x", "y"]].apply(pd.to_numeric, downcast="float")

In [22]: ts2.dtypes
Out[22]: 
id        uint16
name    category
x        float32
y        float32
dtype: object
In [23]: ts2.memory_usage(deep=True)
Out[23]: 
Index    8409608
id       2102402
name     1051495
x        4204804
y        4204804
dtype: int64
In [24]: reduction = ts2.memory_usage(deep=True).sum() / ts.memory_usage(deep=True).sum()

In [25]: print(f"{reduction:0.2f}")
0.20

总的来说,我们已经将此数据集的内存占用量减少到其原始大小的 1/5。

有关 pandas.Categorical 的更多信息,请参见 分类数据,有关所有 pandas 数据类型的概述,请参见 数据类型

使用分块#

一些工作负载可以通过分块来实现,即将一个大问题分成许多小问题。例如,将单个 CSV 文件转换为 Parquet 文件,并对目录中的每个文件重复此操作。只要每个块都适合内存,您就可以处理比内存大得多的数据集。

注意

当您执行的操作需要零或最少的块间协调时,分块效果很好。对于更复杂的工作流程,您最好 使用其他库

假设我们磁盘上有一个更大的“逻辑数据集”,它是一个parquet文件的目录。目录中的每个文件代表整个数据集的不同年份。

In [26]: import pathlib

In [27]: N = 12

In [28]: starts = [f"20{i:>02d}-01-01" for i in range(N)]

In [29]: ends = [f"20{i:>02d}-12-13" for i in range(N)]

In [30]: pathlib.Path("data/timeseries").mkdir(exist_ok=True)

In [31]: for i, (start, end) in enumerate(zip(starts, ends)):
   ....:     ts = make_timeseries(start=start, end=end, freq="1min", seed=i)
   ....:     ts.to_parquet(f"data/timeseries/ts-{i:0>2d}.parquet")
   ....: 
data
└── timeseries
    ├── ts-00.parquet
    ├── ts-01.parquet
    ├── ts-02.parquet
    ├── ts-03.parquet
    ├── ts-04.parquet
    ├── ts-05.parquet
    ├── ts-06.parquet
    ├── ts-07.parquet
    ├── ts-08.parquet
    ├── ts-09.parquet
    ├── ts-10.parquet
    └── ts-11.parquet

现在我们将实现一个超出内存的 pandas.Series.value_counts()。此工作流程的峰值内存使用量是单个最大块,加上一个小的系列,用于存储到目前为止的唯一值计数。只要每个单独的文件都适合内存,这将适用于任意大小的数据集。

In [32]: %%time
   ....: files = pathlib.Path("data/timeseries/").glob("ts*.parquet")
   ....: counts = pd.Series(dtype=int)
   ....: for path in files:
   ....:     df = pd.read_parquet(path)
   ....:     counts = counts.add(df["name"].value_counts(), fill_value=0)
   ....: counts.astype(int)
   ....: 
CPU times: user 744 ms, sys: 28.7 ms, total: 773 ms
Wall time: 551 ms
Out[32]: 
name
Alice      1994645
Bob        1993692
Charlie    1994875
dtype: int64

一些读取器,比如 pandas.read_csv(),提供参数来控制读取单个文件时的 chunksize

手动分块对于不需要太复杂操作的工作流程来说是一个不错的选择。一些操作,比如 pandas.DataFrame.groupby(),很难按块进行。在这些情况下,您可能最好切换到另一个为您实现这些超出内存算法的库。

使用 Dask#

pandas 只是提供 DataFrame API 的一个库。由于其受欢迎程度,pandas 的 API 已成为其他库实现的一种标准。pandas 文档在 生态系统页面 中维护了一个实现 DataFrame API 的库列表。

例如,Dask,一个并行计算库,有 dask.dataframe,一个类似 pandas 的 API,用于并行处理大于内存的数据集。Dask 可以使用单台机器上的多个线程或进程,或者使用机器集群来并行处理数据。

我们将导入 dask.dataframe 并注意到 API 与 pandas 类似。我们可以使用 Dask 的 read_parquet 函数,但提供要读取的文件的通配符字符串。

In [33]: import dask.dataframe as dd

In [34]: ddf = dd.read_parquet("data/timeseries/ts*.parquet", engine="pyarrow")

In [35]: ddf
Out[35]: 
Dask DataFrame Structure:
                   id    name        x        y
npartitions=12                                 
                int64  string  float64  float64
                  ...     ...      ...      ...
...               ...     ...      ...      ...
                  ...     ...      ...      ...
                  ...     ...      ...      ...
Dask Name: read-parquet, 1 graph layer

检查 ddf 对象,我们看到了一些东西

  • 有一些熟悉的属性,比如 .columns.dtypes

  • 有一些熟悉的方法,例如 .groupby.sum 等。

  • 还有一些新的属性,例如 .npartitions.divisions

分区和分隔是 Dask 并行计算的方式。一个 **Dask** DataFrame 由许多 pandas pandas.DataFrame 组成。对 Dask DataFrame 的单个方法调用最终会进行许多 pandas 方法调用,而 Dask 知道如何协调所有操作以获得结果。

In [36]: ddf.columns
Out[36]: Index(['id', 'name', 'x', 'y'], dtype='object')

In [37]: ddf.dtypes
Out[37]: 
id                int64
name    string[pyarrow]
x               float64
y               float64
dtype: object

In [38]: ddf.npartitions
Out[38]: 12

一个主要区别是:dask.dataframe API 是 *延迟* 的。如果您查看上面的 repr,您会注意到值实际上并没有打印出来;只有列名和数据类型。这是因为 Dask 还没有实际读取数据。与其立即执行,不如构建一个 **任务图**。

In [39]: ddf
Out[39]: 
Dask DataFrame Structure:
                   id    name        x        y
npartitions=12                                 
                int64  string  float64  float64
                  ...     ...      ...      ...
...               ...     ...      ...      ...
                  ...     ...      ...      ...
                  ...     ...      ...      ...
Dask Name: read-parquet, 1 graph layer

In [40]: ddf["name"]
Out[40]: 
Dask Series Structure:
npartitions=12
    string
       ...
     ...  
       ...
       ...
Name: name, dtype: string
Dask Name: getitem, 2 graph layers

In [41]: ddf["name"].value_counts()
Out[41]: 
Dask Series Structure:
npartitions=1
    int64[pyarrow]
               ...
Name: count, dtype: int64[pyarrow]
Dask Name: value-counts-agg, 4 graph layers

这些调用中的每一个都是即时的,因为结果还没有被计算出来。我们只是在构建一个计算列表,以便在有人需要结果时执行。Dask 知道 pandas.Series.value_counts 的返回类型是一个 pandas pandas.Series,它具有特定的数据类型和名称。因此,Dask 版本返回一个具有相同数据类型和相同名称的 Dask Series。

要获得实际结果,您可以调用 .compute()

In [42]: %time ddf["name"].value_counts().compute()
CPU times: user 538 ms, sys: 45.8 ms, total: 584 ms
Wall time: 165 ms
Out[42]: 
name
Charlie    1994875
Alice      1994645
Bob        1993692
Name: count, dtype: int64[pyarrow]

此时,您将获得与 pandas 相同的结果,在本例中是一个具体的 pandas pandas.Series,其中包含每个 name 的计数。

调用 .compute 会导致整个任务图被执行。这包括读取数据、选择列以及执行 value_counts。执行尽可能地并行进行,Dask 试图保持整体内存占用量较小。您可以处理比内存大得多的数据集,只要每个分区(一个普通的 pandas pandas.DataFrame)适合内存即可。

默认情况下,dask.dataframe 操作使用线程池来并行执行操作。我们也可以连接到集群以在多台机器上分配工作。在这种情况下,我们将连接到一个由这台机器上的多个进程组成的本地“集群”。

>>> from dask.distributed import Client, LocalCluster

>>> cluster = LocalCluster()
>>> client = Client(cluster)
>>> client
<Client: 'tcp://127.0.0.1:53349' processes=4 threads=8, memory=17.18 GB>

创建此 client 后,所有 Dask 的计算都将在集群上进行(在本例中只是进程)。

Dask 实现了 pandas API 中最常用的部分。例如,我们可以进行熟悉的 groupby 聚合。

In [43]: %time ddf.groupby("name")[["x", "y"]].mean().compute().head()
CPU times: user 1.04 s, sys: 66.7 ms, total: 1.1 s
Wall time: 319 ms
Out[43]: 
                x         y
name                       
Alice   -0.000224 -0.000194
Bob     -0.000746  0.000349
Charlie  0.000604  0.000250

分组和聚合是在内存之外并行完成的。

当 Dask 知道数据集的 divisions 时,可以进行某些优化。当读取由 dask 编写的 parquet 数据集时,分区将自动知道。在本例中,由于我们手动创建了 parquet 文件,因此我们需要手动提供分区。

In [44]: N = 12

In [45]: starts = [f"20{i:>02d}-01-01" for i in range(N)]

In [46]: ends = [f"20{i:>02d}-12-13" for i in range(N)]

In [47]: divisions = tuple(pd.to_datetime(starts)) + (pd.Timestamp(ends[-1]),)

In [48]: ddf.divisions = divisions

In [49]: ddf
Out[49]: 
Dask DataFrame Structure:
                   id    name        x        y
npartitions=12                                 
2000-01-01      int64  string  float64  float64
2001-01-01        ...     ...      ...      ...
...               ...     ...      ...      ...
2011-01-01        ...     ...      ...      ...
2011-12-13        ...     ...      ...      ...
Dask Name: read-parquet, 1 graph layer

现在我们可以做一些事情,比如使用 .loc 进行快速随机访问。

In [50]: ddf.loc["2002-01-01 12:01":"2002-01-01 12:05"].compute()
Out[50]: 
                       id     name         x         y
timestamp                                             
2002-01-01 12:01:00   971      Bob -0.659481  0.556184
2002-01-01 12:02:00  1015  Charlie  0.120131 -0.609522
2002-01-01 12:03:00   991      Bob -0.357816  0.811362
2002-01-01 12:04:00   984    Alice -0.608760  0.034187
2002-01-01 12:05:00   998  Charlie  0.551662 -0.461972

Dask 知道只在第 3 个分区中查找以选择 2002 年的值。它不需要查看任何其他数据。

许多工作流程涉及大量数据,并以减少大小的方式处理它,使其适合内存。在本例中,我们将对数据进行重采样,使其频率为每天,并取平均值。一旦我们取了平均值,我们就知道结果将适合内存,因此我们可以安全地调用 compute 而不必担心内存不足。此时它只是一个普通的 pandas 对象。

In [51]: ddf[["x", "y"]].resample("1D").mean().cumsum().compute().plot()
Out[51]: <Axes: xlabel='timestamp'>
../_images/dask_resample.png

这些 Dask 示例都是使用一台机器上的多个进程完成的。Dask 可以 部署在集群上 以扩展到更大的数据集。

您可以在 https://examples.dask.org 找到更多 Dask 示例。