用户定义函数(UDF)#

在 pandas 中,用户定义函数(UDF)提供了一种扩展库功能的方式,允许用户将自定义计算应用于其数据。虽然 pandas 提供了许多内置的数据操作函数,但当内置方法不足以满足需求时,UDF 提供了灵活性。这些函数可以在不同级别应用:逐元素、逐行、逐列或逐组,并且根据所使用的方法,其行为有所不同。

这是一个应用于 Series 的 UDF 的简单示例

In [1]: s = pd.Series([1, 2, 3])

# Simple UDF that adds 1 to a value
In [2]: def add_one(x):
   ...:     return x + 1
   ...: 

# Apply the function element-wise using .map
In [3]: s.map(add_one)
Out[3]: 
0    2
1    3
2    4
dtype: int64

为什么不使用用户定义函数#

尽管 UDF 提供了灵活性,但它们也带来了显著的缺点,主要与性能和行为有关。在使用 UDF 时,pandas 必须对结果进行推断,而这种推断可能不正确。此外,与矢量化操作不同,UDF 速度较慢,因为 pandas 无法优化它们的计算,从而导致处理效率低下。

注意

总的来说,大多数任务都可以并且应该使用 pandas 的内置方法或矢量化操作来完成。

尽管存在缺点,但在以下情况下 UDF 仍可能很有用:

  • 需要自定义计算:实现 pandas 内置方法无法处理的复杂逻辑或领域特定计算。

  • 扩展 pandas 的功能:应用 pandas 中未提供的外部库或专用算法。

  • 处理复杂的分组操作:对标准方法不支持的分组数据执行操作。

例如:

from sklearn.linear_model import LinearRegression

# Sample data
df = pd.DataFrame({
    'group': ['A', 'A', 'A', 'B', 'B', 'B'],
    'x': [1, 2, 3, 1, 2, 3],
    'y': [2, 4, 6, 1, 2, 1.5]
})

# Function to fit a model to each group
def fit_model(group):
    model = LinearRegression()
    model.fit(group[['x']], group['y'])
    group['y_pred'] = model.predict(group[['x']])
    return group

result = df.groupby('group').apply(fit_model)

支持用户定义函数的的方法#

用户定义函数可以应用于各种 pandas 方法

方法

函数输入

函数输出

描述

Series.map() 和 DataFrame.map()

标量

标量

将函数应用于每个元素

Series.apply() 和 DataFrame.apply() (axis=0)

列(Series)

列(Series)

将函数应用于每一列

Series.apply() 和 DataFrame.apply() (axis=1)

行(Series)

行(Series)

将函数应用于每一行

Series.pipe() 和 DataFrame.pipe()

Series 或 DataFrame

Series 或 DataFrame

链接函数以应用于 Series 或 Dataframe

Series.filter() 和 DataFrame.filter()

Series 或 DataFrame

布尔值

仅在 groupby 中支持 UDF。为每个组调用函数,如果函数返回 False,则从结果中删除该组。

Series.agg() 和 DataFrame.agg()

Series 或 DataFrame

标量或 Series

聚合并汇总值,例如,sum 或自定义缩减器。

Series.transform() 和 DataFrame.transform() (axis=0)

列(Series)

列(Series)

apply() (axis=0) 相同,但如果函数更改数据形状,则会引发异常。

Series.transform() 和 DataFrame.transform() (axis=1)

行(Series)

行(Series)

apply() (axis=1) 相同,但如果函数更改数据形状,则会引发异常。

在 pandas 中应用 UDF 时,选择适合特定任务的方法至关重要。每种方法都有其优点,并且是为不同的用例设计的。理解每种方法的目的和行为将帮助您做出明智的决定,确保代码更高效、更易于维护。

注意

其中一些方法也可以应用于 groupby、resample 和各种窗口对象。有关详细信息,请参阅 Group by: split-apply-combineresample()rolling()expanding()ewm()

Series.map()DataFrame.map()#

map() 方法专门用于应用逐元素的 UDF。这意味着函数将针对 SeriesDataFrame 中的每个元素调用,并将单个值或单元格作为函数参数。

In [4]: temperature_celsius = pd.DataFrame({
   ...:     "NYC": [14, 21, 23],
   ...:     "Los Angeles": [22, 28, 31],
   ...: })
   ...: 

In [5]: def to_fahrenheit(value):
   ...:     return value * (9 / 5) + 32
   ...: 

In [6]: temperature_celsius.map(to_fahrenheit)
Out[6]: 
    NYC  Los Angeles
0  57.2         71.6
1  69.8         82.4
2  73.4         87.8

在此示例中,函数 to_fahrenheit 将被调用 6 次,即 DataFrame 中每个值调用一次。每次调用的结果将返回到结果 DataFrame 的相应单元格中。

总的来说,map 会很慢,因为它不利用矢量化。相反,每次值都需要一个 Python 函数调用,如果处理中等或大型数据,这将大大降低速度。

何时使用:对 DataFrames 或 Series 应用逐元素 UDF 时,请使用 map()

Series.apply()DataFrame.apply()#

apply() 方法允许您为整列或整行应用 UDF。这与 map() 的不同之处在于,函数是为每一列(或每一行)调用的,而不是为每个单独的元素调用的。

In [7]: temperature_celsius = pd.DataFrame({
   ...:     "NYC": [14, 21, 23],
   ...:     "Los Angeles": [22, 28, 31],
   ...: })
   ...: 

In [8]: def to_fahrenheit(column):
   ...:     return column * (9 / 5) + 32
   ...: 

In [9]: temperature_celsius.apply(to_fahrenheit)
Out[9]: 
    NYC  Los Angeles
0  57.2         71.6
1  69.8         82.4
2  73.4         87.8

在此示例中,to_fahrenheit 将只调用两次,而不是像使用 map() 那样调用 6 次。这会比使用 map() 快,因为每列的操作都是矢量化的,并且大大减少了在 Python 中迭代数据和调用 Python 函数的开销。

在某些情况下,函数可能需要所有数据才能计算结果。因此,需要 apply(),因为使用 map() 时,函数只能一次访问一个元素。

In [10]: temperature = pd.DataFrame({
   ....:     "NYC": [14, 21, 23],
   ....:     "Los Angeles": [22, 28, 31],
   ....: })
   ....: 

In [11]: def normalize(column):
   ....:     return column / column.mean()
   ....: 

In [12]: temperature.apply(normalize)
Out[12]: 
        NYC  Los Angeles
0  0.724138     0.814815
1  1.086207     1.037037
2  1.189655     1.148148

在此示例中,normalize 函数需要计算整列的平均值才能将其除以每个元素。因此,我们不能为每个元素调用函数,而是需要函数接收整列。

通过指定 axis=1apply() 也可以按行执行函数。

In [13]: temperature = pd.DataFrame({
   ....:     "NYC": [14, 21, 23],
   ....:     "Los Angeles": [22, 28, 31],
   ....: })
   ....: 

In [14]: def hotter(row):
   ....:     return row["Los Angeles"] - row["NYC"]
   ....: 

In [15]: temperature.apply(hotter, axis=1)
Out[15]: 
0    8
1    7
2    8
dtype: int64

在此示例中,函数 hotter 将被调用 3 次,即每行调用一次。每次调用都将接收整行作为参数,从而允许执行需要行中多个值的计算。

apply 也可用于 SeriesGroupBy.apply()DataFrameGroupBy.apply()Rolling.apply()Expanding.apply()Resampler.apply()。您可以在 Flexible apply 中阅读有关 groupby 操作中 apply 的更多信息。

何时使用:当没有可用的替代矢量化方法或 UDF 方法时,apply() 是合适的,但请考虑尽可能使用矢量化操作来优化性能。

Series.pipe()DataFrame.pipe()#

pipe 方法与 mapapply 类似,但函数接收的是调用它的整个 SeriesDataFrame

In [16]: temperature = pd.DataFrame({
   ....:     "NYC": [14, 21, 23],
   ....:     "Los Angeles": [22, 28, 31],
   ....: })
   ....: 

In [17]: def normalize(df):
   ....:     return df / df.mean().mean()
   ....: 

In [18]: temperature.pipe(normalize)
Out[18]: 
        NYC  Los Angeles
0  0.604317     0.949640
1  0.906475     1.208633
2  0.992806     1.338129

这等同于将 DataFrame 作为参数调用 normalize 函数。

In [19]: normalize(temperature)
Out[19]: 
        NYC  Los Angeles
0  0.604317     0.949640
1  0.906475     1.208633
2  0.992806     1.338129

使用 pipe 的主要优势是可读性。它允许进行方法链式调用,并在调用多个函数时使代码更清晰。

In [20]: temperature_celsius = pd.DataFrame({
   ....:     "NYC": [14, 21, 23],
   ....:     "Los Angeles": [22, 28, 31],
   ....: })
   ....: 

In [21]: def multiply_by_9(value):
   ....:     return value * 9
   ....: 

In [22]: def divide_by_5(value):
   ....:     return value / 5
   ....: 

In [23]: def add_32(value):
   ....:     return value + 32
   ....: 

# Without `pipe`:
In [24]: fahrenheit = add_32(divide_by_5(multiply_by_9(temperature_celsius)))

# With `pipe`:
In [25]: fahrenheit = (temperature_celsius.pipe(multiply_by_9)
   ....:                                  .pipe(divide_by_5)
   ....:                                  .pipe(add_32))
   ....: 

pipe 也可用于 SeriesGroupBy.pipe()DataFrameGroupBy.pipe()Resampler.pipe()。您可以在 Piping function calls 中阅读有关 groupby 操作中 pipe 的更多信息。

何时使用:当您需要创建操作管道并希望代码保持可读性和可维护性时,请使用 pipe()

Series.filter()DataFrame.filter()#

filter 方法用于选择满足特定条件的行子集。Series.filter()DataFrame.filter() 不支持用户定义函数,但 SeriesGroupBy.filter()DataFrameGroupBy.filter() 支持。您可以在 Filtration 中阅读有关 groupby 操作中 filter 的更多信息。

Series.agg()DataFrame.agg()#

agg 方法用于将一组数据点聚合为一个。pandas 中已经实现了最常见的聚合函数,如 minmaxmeansum 等。agg 允许实现其他自定义聚合函数。

In [26]: temperature = pd.DataFrame({
   ....:     "NYC": [14, 21, 23],
   ....:     "Los Angeles": [22, 28, 31],
   ....: })
   ....: 

In [27]: def highest_jump(column):
   ....:     return column.pct_change().max()
   ....: 

In [28]: temperature.agg(highest_jump)
Out[28]: 
NYC            0.500000
Los Angeles    0.272727
dtype: float64

何时使用:对于执行自定义聚合,其中操作为每个输入返回一个标量值,请使用 agg()

Series.transform()DataFrame.transform()#

transform` 方法类似于聚合,但区别在于结果会广播到原始数据。

In [29]: temperature = pd.DataFrame({
   ....:     "NYC": [14, 21, 23],
   ....:     "Los Angeles": [22, 28, 31]},
   ....:     index=pd.date_range("2000-01-01", "2000-01-03"))
   ....: 

In [30]: def warm_up_all_days(column):
   ....:     return pd.Series(column.max(), index=column.index)
   ....: 

In [31]: temperature.transform(warm_up_all_days)
Out[31]: 
            NYC  Los Angeles
2000-01-01   23           31
2000-01-02   23           31
2000-01-03   23           31

在此示例中,warm_up_all_days 函数像聚合一样计算 max,但它不是仅返回最大值,而是返回一个与原始数据形状相同的 DataFrame,其中每天的值被替换为该城市的最大温度。

transform 也可用于 SeriesGroupBy.transform()DataFrameGroupBy.transform()Resampler.transform(),在这些场景下更为常见。您可以在 Transformation 中阅读有关 groupby 操作中 transform 的更多信息。

何时使用:当您需要执行一个将在 DataFrame 的原始结构中返回的聚合时。

性能#

虽然 UDF 提供了灵活性,但通常不鼓励使用它们,因为它们会引入性能问题,尤其是在纯 Python 中编写时。为了提高效率,对于常见操作,请考虑使用内置的 NumPypandas 函数,而不是 UDF。

注意

如果性能至关重要,请在考虑使用 UDF 之前,先探索**矢量化操作**。

矢量化操作#

下面是使用 UDF 与使用矢量化操作的比较

# User-defined function
def calc_ratio(row):
    return 100 * (row["one"] / row["two"])

df["new_col"] = df.apply(calc_ratio, axis=1)

# Vectorized Operation
df["new_col2"] = 100 * (df["one"] / df["two"])

测量每个操作所需的时间

User-defined function:  5.6435 secs
Vectorized:             0.0043 secs

pandas 中的矢量化操作比使用 DataFrame.apply() 和 UDF 显著更快,因为它们通过 NumPy 利用高度优化的 C 函数一次处理整个数组。这种方法避免了在 Python 中循环遍历行并为每一行进行单独函数调用的开销,这既慢又不高效。此外,NumPy 数组受益于内存效率和 CPU 级优化,因此在可能的情况下,矢量化操作是首选。

通过 UDF 提高性能#

在必须使用 UDF 的场景中,仍然有方法可以缓解其性能上的不足。一种方法是使用 **Numba**,这是一个即时(JIT)编译器,可以通过在运行时将 Python 函数编译成优化的机器代码来显著加速数值 Python 代码。

通过使用 @numba.jit 装饰您的 UDF,您可以获得接近矢量化操作的性能,尤其是在计算密集型任务中。

注意

您还可以参考关于 Enhancing performance 的用户指南,以获取更详细的 **Numba** 使用指南。

使用 DataFrame.pipe() 实现可组合逻辑#

另一种有用的模式,可以提高可读性和可组合性,尤其是在混合矢量化逻辑和 UDF 时,是使用 DataFrame.pipe() 方法。

DataFrame.pipe() 本身不直接提高性能,但它通过将整个对象传递给函数,实现了更清晰的方法链式调用。这在链接自定义转换时特别有用。

def add_ratio_column(df):
    df["ratio"] = 100 * (df["one"] / df["two"])
    return df

df = (
    df
    .query("one > 0")
    .pipe(add_ratio_column)
    .dropna()
)

这在功能上等同于调用 add_ratio_column(df),但保持了代码的清晰和可组合性。传递给 DataFrame.pipe() 的函数可以使用矢量化操作、逐行 UDF 或任何其他逻辑;DataFrame.pipe() 对此是无关紧要的。

注意

虽然 DataFrame.pipe() 本身不提高性能,但它促进了清晰、模块化的设计,并允许将矢量化和基于 UDF 的逻辑组合到方法链中。