Skip to content
话题
Pandas
Pandas Apply: 使用自定义函数转换DataFrame

Pandas Apply: 使用自定义函数转换DataFrame和Series

Updated on

数据转换是每个数据分析工作流程的核心。虽然pandas为常见操作提供了数百个内置方法,但现实世界的数据往往需要标准函数无法处理的自定义逻辑。这造成了一个困境:如何高效地在数千或数百万行上应用复杂的用户定义转换?

apply()方法通过允许您在DataFrame列、行或Series元素上执行任何Python函数来解决这个问题。无论您需要清理不一致的字符串格式、实现条件业务逻辑,还是为机器学习模型设计特征,apply()都提供了处理pandas内置工具包之外操作的灵活性。然而,这种强大功能伴随着许多数据科学家忽视的性能权衡,导致代码运行速度比优化替代方案慢10-100倍。

本指南揭示了如何有效使用apply()、何时完全避免它,以及哪些向量化替代方案能在短时间内提供相同结果。

📚

理解pandas apply()基础

apply()方法有两种形式:DataFrame.apply()用于对整个列或行的操作,Series.apply()用于对单列的逐元素转换。

Series.apply()语法

处理单列(Series)时,apply()对每个元素执行函数:

import pandas as pd
import numpy as np
 
# 创建示例数据
df = pd.DataFrame({
    'price': [29.99, 45.50, 15.75, 89.00],
    'quantity': [2, 1, 5, 3],
    'product': ['widget', 'gadget', 'tool', 'device']
})
 
# 对Series应用函数
def add_tax(price):
    return price * 1.08
 
df['price_with_tax'] = df['price'].apply(add_tax)
print(df)

输出:

   price  quantity product  price_with_tax
0  29.99         2  widget        32.38920
1  45.50         1  gadget        49.14000
2  15.75         5    tool        17.01000
3  89.00         3  device        96.12000

带axis参数的DataFrame.apply()

axis参数控制apply()是处理列还是行:

  • axis=0(默认):对每列应用函数(垂直操作)
  • axis=1:对每行应用函数(水平操作)
# axis=0: 处理每列
def get_range(column):
    return column.max() - column.min()
 
ranges = df[['price', 'quantity']].apply(get_range, axis=0)
print(ranges)
# 输出:
# price       73.25
# quantity     4.00
# axis=1: 处理每行
def calculate_total(row):
    return row['price'] * row['quantity']
 
df['total'] = df.apply(calculate_total, axis=1)
print(df)

输出:

   price  quantity product   total
0  29.99         2  widget   59.98
1  45.50         1  gadget   45.50
2  15.75         5    tool   78.75
3  89.00         3  device  267.00

Lambda函数 vs 命名函数

Lambda函数提供简洁的内联转换,而命名函数为复杂逻辑提供更好的可读性。

Lambda函数

适用于简单的单行操作:

# 将产品名称转换为大写
df['product_upper'] = df['product'].apply(lambda x: x.upper())
 
# 计算折扣价格
df['discounted'] = df['price'].apply(lambda x: x * 0.9 if x > 30 else x)
 
# 组合多列
df['description'] = df.apply(
    lambda row: f"{row['quantity']}x {row['product']} @ ${row['price']}",
    axis=1
)
print(df['description'])

输出:

0    2x widget @ $29.99
1    1x gadget @ $45.5
2       5x tool @ $15.75
3    3x device @ $89.0

命名函数

对于多步骤转换和可重用逻辑至关重要:

def categorize_price(price):
    """按价格等级分类产品"""
    if price < 20:
        return 'Budget'
    elif price < 50:
        return 'Standard'
    else:
        return 'Premium'
 
df['tier'] = df['price'].apply(categorize_price)
 
def validate_order(row):
    """对订单数据应用业务规则"""
    if row['quantity'] > 10:
        return 'Bulk Order - Review Required'
    elif row['price'] * row['quantity'] > 200:
        return 'High Value - Priority Shipping'
    else:
        return 'Standard Processing'
 
df['order_status'] = df.apply(validate_order, axis=1)
print(df[['product', 'tier', 'order_status']])

理解result_type参数

result_type参数(仅DataFrame.apply())控制当函数返回多个值时apply()如何格式化输出:

result_type行为用例
None(默认)自动推断输出格式通用目的
'expand'将类列表结果拆分为单独的列多个返回值
'reduce'尝试返回Series而不是DataFrame聚合操作
'broadcast'返回原始形状的DataFrame逐元素转换
def get_stats(column):
    """返回多个统计数据"""
    return [column.mean(), column.std(), column.max()]
 
# 默认行为(推断结构)
stats = df[['price', 'quantity']].apply(get_stats)
print(stats)
 
# 扩展为单独的行
stats_expanded = df[['price', 'quantity']].apply(get_stats, result_type='expand')
print(stats_expanded)

方法比较: apply vs map vs applymap vs transform

了解何时使用每种方法可以防止性能瓶颈:

方法操作对象输入函数接收输出类型最适合
Series.apply()单列单独的每个元素Series一列上的逐元素转换
Series.map()单列每个元素(也接受dict/Series)Series替换/查找操作
DataFrame.apply()整个DataFrame完整列(axis=0)或行(axis=1)Series/DataFrame按列/行操作
DataFrame.applymap()(已弃用)整个DataFrame单独的每个元素DataFrame所有列的逐元素(改用map())
DataFrame.transform()DataFrame/GroupBy完整列/组与输入相同形状保持DataFrame形状的操作
# Series.apply() - 逐元素自定义函数
df['price_rounded'] = df['price'].apply(lambda x: round(x, 0))
 
# Series.map() - 替换映射
tier_map = {'Budget': 1, 'Standard': 2, 'Premium': 3}
df['tier_code'] = df['tier'].map(tier_map)
 
# DataFrame.apply() - 按列聚合
totals = df[['price', 'quantity']].apply(sum, axis=0)
 
# DataFrame.transform() - 使用groupby保持形状
df['price_norm'] = df.groupby('tier')['price'].transform(
    lambda x: (x - x.mean()) / x.std()
)

性能: 为什么apply()很慢

apply()方法在Python循环中处理数据,绕过了pandas优化的C/Cython实现。对于大型数据集,这会造成严重的性能损失。

性能比较

import time
 
# 创建大型数据集
large_df = pd.DataFrame({
    'values': np.random.randn(100000)
})
 
# 方法1: 使用lambda的apply()
start = time.time()
result1 = large_df['values'].apply(lambda x: x ** 2)
apply_time = time.time() - start
 
# 方法2: 向量化操作
start = time.time()
result2 = large_df['values'] ** 2
vectorized_time = time.time() - start
 
print(f"apply()时间: {apply_time:.4f}s")
print(f"向量化时间: {vectorized_time:.4f}s")
print(f"加速: {apply_time/vectorized_time:.1f}x")

典型输出:

apply()时间: 0.0847s
向量化时间: 0.0012s
加速: 70.6x

何时避免apply()

在以下情况下改用向量化操作:

# 不要: 对算术运算使用apply
df['total'] = df.apply(lambda row: row['price'] * row['quantity'], axis=1)
 
# 要: 使用向量化乘法
df['total'] = df['price'] * df['quantity']
 
# 不要: 对字符串方法使用apply
df['upper'] = df['product'].apply(lambda x: x.upper())
 
# 要: 使用内置字符串访问器
df['upper'] = df['product'].str.upper()
 
# 不要: 对条件使用apply
df['expensive'] = df['price'].apply(lambda x: 'Yes' if x > 50 else 'No')
 
# 要: 使用np.where或直接比较
df['expensive'] = np.where(df['price'] > 50, 'Yes', 'No')

向量化替代方案

操作慢(apply)快(向量化)
算术运算.apply(lambda x: x * 2)* 2
条件语句.apply(lambda x: 'A' if x > 10 else 'B')np.where(condition, 'A', 'B')
字符串操作.apply(lambda x: x.lower()).str.lower()
日期操作.apply(lambda x: x.year).dt.year
多个条件.apply(complex_if_elif_else)np.select([cond1, cond2], [val1, val2], default)
# 使用np.select处理复杂条件
conditions = [
    df['price'] < 20,
    (df['price'] >= 20) & (df['price'] < 50),
    df['price'] >= 50
]
choices = ['Budget', 'Standard', 'Premium']
df['tier_fast'] = np.select(conditions, choices, default='Unknown')

pandas apply()的常见用例

尽管有性能限制,apply()对于缺乏向量化等效项的操作仍然至关重要。

1. 使用复杂逻辑清理字符串

# 混乱数据示例
messy_df = pd.DataFrame({
    'email': ['John.Doe@COMPANY.com', ' jane_smith@test.co.uk ', 'ADMIN@Site.NET']
})
 
def clean_email(email):
    """标准化电子邮件格式"""
    email = email.strip().lower()
    # 删除额外的点
    username, domain = email.split('@')
    username = username.replace('..', '.')
    return f"{username}@{domain}"
 
messy_df['email_clean'] = messy_df['email'].apply(clean_email)
print(messy_df)

2. 使用外部数据的条件逻辑

# 基于外部查找的价格调整
discount_rules = {
    'widget': 0.10,
    'gadget': 0.15,
    'device': 0.05
}
 
def apply_discount(row):
    """应用产品特定折扣与最小逻辑"""
    base_price = row['price']
    discount = discount_rules.get(row['product'], 0)
    discounted = base_price * (1 - discount)
    # 最低价格
    return max(discounted, 9.99)
 
df['final_price'] = df.apply(apply_discount, axis=1)

3. ML的特征工程

# 创建交互特征
def create_features(row):
    """生成用于预测建模的特征"""
    features = {}
    features['price_per_unit'] = row['price'] / row['quantity']
    features['is_bulk'] = 1 if row['quantity'] > 5 else 0
    features['revenue_tier'] = pd.cut(
        [row['price'] * row['quantity']],
        bins=[0, 50, 150, 300],
        labels=['Low', 'Medium', 'High']
    )[0]
    return pd.Series(features)
 
feature_df = df.apply(create_features, axis=1)
df = pd.concat([df, feature_df], axis=1)

4. API调用和外部查找

def geocode_address(address):
    """调用外部API进行地理编码"""
    # 实际API调用的占位符
    # 实际上: requests.get(f"api.geocode.com?q={address}")
    return {'lat': 40.7128, 'lon': -74.0060}
 
# 使用速率限制应用
import time
 
def safe_geocode(address):
    time.sleep(0.1)  # 速率限制
    return geocode_address(address)
 
# df['coords'] = df['address'].apply(safe_geocode)

高级技巧

传递额外参数

函数可以通过argskwargs接收额外参数:

def apply_markup(price, markup_pct, min_profit):
    """添加带有最小利润的百分比加价"""
    markup_amount = price * (markup_pct / 100)
    return price + max(markup_amount, min_profit)
 
# 传递额外参数
df['retail_price'] = df['price'].apply(
    apply_markup,
    args=(25,),  # markup_pct=25
    min_profit=5.0  # 关键字参数
)

将apply()与groupby()一起使用

结合groupby和apply进行复杂聚合:

# 组级转换
def normalize_group(group):
    """组内Z分数标准化"""
    return (group - group.mean()) / group.std()
 
df['price_normalized'] = df.groupby('tier')['price'].apply(normalize_group)
 
# 自定义组聚合
def group_summary(group):
    """为组创建汇总统计"""
    return pd.Series({
        'total_revenue': (group['price'] * group['quantity']).sum(),
        'avg_price': group['price'].mean(),
        'item_count': len(group)
    })
 
tier_summary = df.groupby('tier').apply(group_summary)
print(tier_summary)

使用tqdm的进度条

监控长时间运行的apply操作:

from tqdm import tqdm
tqdm.pandas()
 
# 使用progress_apply代替apply
# df['result'] = df['column'].progress_apply(slow_function)

优雅地处理错误

def safe_transform(value):
    """应用带有错误处理的转换"""
    try:
        return complex_operation(value)
    except Exception as e:
        return None  # 或np.nan,或默认值
 
df['result'] = df['column'].apply(safe_transform)

常见错误和调试

错误1: 在可以向量化时使用axis=1

# 错误: 慢速逐行操作
df['total'] = df.apply(lambda row: row['price'] * row['quantity'], axis=1)
 
# 正确: 快速向量化操作
df['total'] = df['price'] * df['quantity']

错误2: 忘记return语句

# 错误: 没有返回值
def broken_function(x):
    x * 2  # 缺少return!
 
# 正确: 明确的return
def working_function(x):
    return x * 2

错误3: 在apply()内修改DataFrame

# 错误: 在迭代期间尝试修改DataFrame
def bad_function(row):
    df.loc[row.name, 'new_col'] = row['price'] * 2  # 不安全!
    return row['price']
 
# 正确: 返回值并在之后分配
def good_function(row):
    return row['price'] * 2
 
df['new_col'] = df.apply(good_function, axis=1)

调试apply()函数

# 首先在单行/元素上测试函数
test_row = df.iloc[0]
result = your_function(test_row)
print(f"测试结果: {result}")
 
# 在函数内添加调试打印
def debug_function(row):
    print(f"处理行{row.name}: {row.to_dict()}")
    result = complex_logic(row)
    print(f"结果: {result}")
    return result
 
# 在小子集上测试
df.head(3).apply(debug_function, axis=1)

使用PyGWalker可视化结果

使用apply()转换DataFrame后,可视化结果有助于验证转换并发现模式。PyGWalker直接在Jupyter笔记本中将pandas DataFrame转换为类似Tableau的交互式界面。

import pygwalker as pyg
 
# 交互式可视化转换后的数据
walker = pyg.walk(df)

PyGWalker支持对已应用转换的拖放分析:

  • 使用并排图表比较原始列与转换后的列
  • 通过过滤和分组验证条件逻辑
  • 通过分布图发现计算字段中的异常值
  • 导出可视化用于文档

github.com/Kanaries/pygwalker (opens in a new tab)探索PyGWalker,将数据探索从静态图转变为交互式分析。

FAQ

如何一次对多列应用函数?

使用axis=0的DataFrame.apply()处理每个选定列,或直接对多列使用向量化操作:

# 应用于多列
df[['price', 'quantity']] = df[['price', 'quantity']].apply(lambda x: x * 1.1)
 
# 或向量化(更快)
df[['price', 'quantity']] = df[['price', 'quantity']] * 1.1

apply()和map()有什么区别?

apply()逐元素执行任何可调用函数,而map()针对通过字典、Series或函数进行替换进行了优化。对查找和替换使用map()(更快),对需要复杂逻辑的自定义转换使用apply()

为什么我的apply()函数这么慢?

apply()在Python循环中执行而不是编译的C代码,使其比向量化操作慢10-100倍。在使用apply()之前,始终检查pandas是否有内置方法(.str.dt、算术运算符)或np.where()/np.select()是否可以替代您的逻辑。

我可以使用访问多列的lambda函数与apply()吗?

可以,使用axis=1处理行:

df['result'] = df.apply(lambda row: row['col1'] + row['col2'] * 0.5, axis=1)

但是,对于大型DataFrame这很慢。优先使用: df['result'] = df['col1'] + df['col2'] * 0.5

如何从单个apply()调用返回多列?

返回带有命名索引的pd.Series,pandas会自动将其扩展为列:

def multi_output(row):
    return pd.Series({
        'sum': row['a'] + row['b'],
        'product': row['a'] * row['b']
    })
 
df[['sum', 'product']] = df.apply(multi_output, axis=1)

结论

pandas apply()方法为超出pandas内置操作的自定义转换提供了重要的灵活性。虽然其Python循环实现会造成性能权衡,但理解何时使用apply()与向量化替代方案可以区分高效的数据工作流程和在生产数据集上停滞的工作流程。

关键要点: 仅当向量化方法、字符串访问器或NumPy函数无法实现您的目标时才使用apply()。在应用于整个DataFrame之前,先在单行上测试函数。对于大型数据集上的逐行操作,研究Cython、Numba JIT编译或切换到polars以实现并行执行。

掌握apply()的强大功能和局限性,您的数据转换工具包将能够处理pandas抛出的任何挑战。

📚