Pandas

技术栈
工具链
python数据分析DataFrame数据清洗表格处理SQL

概览

Pandas 技术栈概览

Pandas 是 Python 数据分析的核心库,由 Wes McKinney 于 2008 年创建。它提供两种主要数据结构:Series(一维)和 DataFrame(二维表格),并围绕它们构建了丰富的数据读取、清洗、转换、聚合、可视化和导出功能。

核心特性:

  • 📊 DataFrame — 类似 Excel 表格或 SQL 表的二维数据结构,支持列名索引
  • 🧹 数据清洗 — 缺失值处理、重复删除、类型转换、字符串操作
  • 🔀 数据合并 — merge/join/concat,类似 SQL JOIN 的操作
  • 📊 分组聚合 — groupby → agg/transform/filter,秒杀 Excel 透视表
  • 📈 时间序列 — 日期范围生成、重采样、滚动窗口、时区处理
  • 📁 多格式读写 — CSV、Excel、JSON、Parquet、SQL、HDF5 等 20+ 种格式
  • 🎨 内置可视化 — 基于 Matplotlib 的快速绘图 API

适用场景: 数据清洗与预处理、探索性数据分析(EDA)、特征工程、报表生成、金融数据分析、时间序列分析。

安装

1. 环境准备

  • 操作系统: Windows 10+ / macOS 11+ / Linux
  • Python 版本: Python 3.9 - 3.12(推荐 3.10+)
  • 依赖项: pip、NumPy(Pandas 自动安装)

2. 安装命令

# === pip 安装 ===
pip install pandas

# === 安装完整数据分析栈 ===
pip install pandas numpy matplotlib seaborn jupyter openpyxl

# === 验证安装 ===
python -c "import pandas as pd; print(f'Pandas {pd.__version__}'); print(pd.DataFrame({'A':[1,2,3]}))"

# === 安装可选依赖(增强性能)===
pip install pyarrow       # Parquet/Apache Arrow 支持,读写更快
pip install fastparquet    # 替代 pyarrow 的 Parquet 库
pip install tables         # HDF5 支持
pip install sqlalchemy     # SQL 数据库读写
pip install xlsxwriter     # Excel 写入(带格式)
pip install lxml           # read_html() 支持

3. 常见安装问题

问题 1:pip 安装速度慢

pip install pandas -i https://pypi.tuna.tsinghua.edu.cn/simple

问题 2:导入时 DLL 加载失败(Windows)

# 通常需要安装 Visual C++ Redistributable
# 从微软官网下载 VC_redist.x64.exe 安装

问题 3:内存不足(处理大文件)

# Pandas 默认加载全部数据到内存
# 对于超大文件,使用分块读取:
for chunk in pd.read_csv("large_file.csv", chunksize=100000):
    process(chunk)

# 或使用 Dask(类 Pandas API 但惰性计算 + 分布式)
pip install dask[dataframe]

问题 4:Excel 读写报错

# 需要安装 openpyxl(.xlsx)或 xlrd(旧 .xls)
pip install openpyxl

示例

Pandas DataFrame 核心操作 —— 数据读写、清洗、聚合

目标

  • 掌握 DataFrame 的创建、读取、写入
  • 掌握数据筛选、排序、分组聚合
  • 理解链式操作与管道风格

完整代码

import pandas as pd
import numpy as np

# ============================================================
# 1. DataFrame 创建
# ============================================================

# 从字典创建
df = pd.DataFrame({
    "name": ["Alice", "Bob", "Charlie", "Diana", "Eve"],
    "age": [25, 32, 28, 35, 22],
    "city": ["Beijing", "Shanghai", "Beijing", "Shenzhen", "Shanghai"],
    "salary": [15000, 22000, 18000, 25000, 12000],
    "department": ["Engineering", "Sales", "Engineering", "HR", "Sales"],
})

print(f"原始数据:\n{df}\n")

# ============================================================
# 2. 基本属性与信息
# ============================================================
print(f"形状: {df.shape}")
print(f"列名: {df.columns.tolist()}")
print(f"数据类型:\n{df.dtypes}")
print(f"描述统计:\n{df.describe()}")
print(f"\n缺失值:\n{df.isnull().sum()}")

# ============================================================
# 3. 数据筛选
# ============================================================

# 按条件筛选
engineers = df[df["department"] == "Engineering"]
print(f"Engineering 部门:\n{engineers}\n")

# 多条件(用 & 不是 and,用 | 不是 or)
high_paid = df[(df["salary"] > 15000) & (df["city"] == "Beijing")]
print(f"北京高薪员工:\n{high_paid}\n")

# query() 方法(更易读)
result = df.query("salary > 15000 and city == 'Beijing'")
print(f"query 方法:\n{result}\n")

# 按位置:iloc
print(f"前两行:\n{df.iloc[:2]}")
print(f"第1,3行和第0,2列:\n{df.iloc[[0,2], [0,2]]}\n")

# 按标签:loc
print(f"name 和 salary 列:\n{df.loc[:, ["name", "salary"]]}")

# ============================================================
# 4. 数据排序
# ============================================================
sorted_df = df.sort_values("salary", ascending=False)
print(f"\n按工资降序:\n{sorted_df}")

# 多列排序
sorted_df = df.sort_values(["department", "salary"], ascending=[True, False])
print(f"\n按部门+工资排序:\n{sorted_df}")

# ============================================================
# 5. 分组聚合(核心!)
# ============================================================

# 单列聚合
print(f"\n按部门平均工资:\n{df.groupby("department")["salary"].mean()}")

# 多列多聚合
stats = df.groupby("city").agg({
    "salary": ["mean", "min", "max", "count"],
    "age": "mean",
})
print(f"\n按城市多聚合:\n{stats}")

# 自定义聚合
def salary_range(x):
    return x.max() - x.min()

custom_agg = df.groupby("department").agg(
    avg_salary=("salary", "mean"),
    total_salary=("salary", "sum"),
    salary_spread=("salary", salary_range),
    headcount=("name", "count"),
)
print(f"\n自定义聚合:\n{custom_agg}")

# transform —— 保持原形状
df["dept_avg"] = df.groupby("department")["salary"].transform("mean")
df["salary_pct"] = df["salary"] / df["dept_avg"] * 100
print(f"\n带部门均值:\n{df}")

# ============================================================
# 6. 链式操作(推荐风格)
# ============================================================

result = (
    df
    .query("age >= 25")
    .groupby("city")
    .agg(avg_salary=("salary", "mean"), count=("name", "count"))
    .query("count >= 1")
    .sort_values("avg_salary", ascending=False)
    .reset_index()
)
print(f"\n链式操作结果:\n{result}")

# ============================================================
# 7. 表连接(类似 SQL JOIN)
# ============================================================

departments = pd.DataFrame({
    "department": ["Engineering", "Sales", "HR", "Finance"],
    "manager": ["张工", "李经理", "王主管", "赵总监"],
    "floor": [3, 5, 4, 6],
})

# INNER JOIN
merged = pd.merge(df, departments, on="department", how="inner")
print(f"\nINNER JOIN:\n{merged}")

# LEFT JOIN
merged = pd.merge(df, departments, on="department", how="left")
print(f"\nLEFT JOIN:\n{merged}")

# ============================================================
# 8. 数据透视表
# ============================================================
pivot = df.pivot_table(
    values="salary",
    index="department",
    columns="city",
    aggfunc="mean",
    fill_value=0,
)
print(f"\n数据透视表:\n{pivot}")

关键要点

操作 方法
筛选 df[condition], df.query(), df.loc[], df.iloc[]
排序 df.sort_values()
分组 df.groupby().agg()
窗口 df["col"].transform()
合并 pd.merge(), pd.concat()
透视 df.pivot_table()
链式 .pipe() 或在括号中连续调用

Pandas 时间序列与文件 I/O 实战

目标

  • 掌握时间序列数据的处理:重采样、滚动窗口、shift
  • 掌握多格式文件读写与性能优化
  • 实战:分析电商订单数据

完整代码

1. 时间序列处理

import pandas as pd
import numpy as np
from datetime import datetime, timedelta

# ============================================================
# 日期范围生成
# ============================================================
# 日频
dates = pd.date_range("2024-01-01", "2024-01-31", freq="D")
print(f"日频: {len(dates)} 天, 首日: {dates[0]}, 末日: {dates[-1]}")

# 小时频
hours = pd.date_range("2024-01-01", periods=48, freq="h")
print(f"小时频: {hours[:5]}")

# 工作日
business_days = pd.date_range("2024-01-01", "2024-01-31", freq="B")
print(f"工作日: {len(business_days)} 天")

# ============================================================
# 创建时间序列 DataFrame
# ============================================================
np.random.seed(42)
df_ts = pd.DataFrame({
    "date": pd.date_range("2024-01-01", periods=90, freq="D"),
    "sales": np.random.randint(100, 500, 90),
    "customers": np.random.randint(10, 50, 90),
})
df_ts = df_ts.set_index("date")
print(f"\n时间序列数据:\n{df_ts.head(10)}")

# ============================================================
# 重采样(resample)
# ============================================================
# 按周聚合
weekly = df_ts.resample("W").agg({
    "sales": "sum",
    "customers": "sum",
})
print(f"\n周汇总:\n{weekly}")

# 按月聚合
monthly = df_ts.resample("ME").agg({
    "sales": ["sum", "mean", "max"],
})
print(f"\n月汇总:\n{monthly}")

# ============================================================
# 滚动窗口
# ============================================================
df_ts["sales_7d_avg"] = df_ts["sales"].rolling(window=7).mean()
df_ts["sales_7d_std"] = df_ts["sales"].rolling(window=7).std()
df_ts["sales_ma_ratio"] = df_ts["sales"] / df_ts["sales_7d_avg"]

print(f"\n7天移动平均:\n{df_ts[["sales", "sales_7d_avg", "sales_ma_ratio"]].head(10)}")

# 指数加权移动平均(近期权重更高)
df_ts["sales_ewm"] = df_ts["sales"].ewm(span=7).mean()

# ============================================================
# shift / diff / pct_change
# ============================================================
df_ts["prev_day"] = df_ts["sales"].shift(1)
df_ts["day_diff"] = df_ts["sales"].diff()
df_ts["pct_change"] = df_ts["sales"].pct_change() * 100

print(f"\n日环比:\n{df_ts[["sales", "prev_day", "day_diff", "pct_change"]].head(10)}")

2. 电商订单分析实战

# ============================================================
# 模拟电商数据
# ============================================================
np.random.seed(42)
n = 1000

orders = pd.DataFrame({
    "order_id": range(1, n + 1),
    "customer_id": np.random.randint(1, 201, n),
    "product": np.random.choice(["手机", "笔记本", "耳机", "平板", "手表"], n, p=[0.3, 0.2, 0.25, 0.15, 0.1]),
    "amount": np.random.uniform(50, 5000, n).round(2),
    "quantity": np.random.randint(1, 5, n),
    "order_date": pd.date_range("2024-01-01", periods=n, freq="h") + pd.to_timedelta(np.random.randint(0, 720, n), unit="h"),
    "is_returned": np.random.choice([True, False], n, p=[0.05, 0.95]),
})

orders["month"] = orders["order_date"].dt.to_period("M")
orders["day_of_week"] = orders["order_date"].dt.day_name()
orders["hour"] = orders["order_date"].dt.hour

print(f"原始数据:\n{orders.head()}\n")

# Q1: 每种产品的总销售额和退货率
product_stats = orders.groupby("product").agg(
    total_revenue=("amount", "sum"),
    avg_order=("amount", "mean"),
    order_count=("order_id", "count"),
    return_rate=("is_returned", "mean"),
).sort_values("total_revenue", ascending=False)
product_stats["return_rate"] = (product_stats["return_rate"] * 100).round(1)
print(f"产品统计:\n{product_stats}\n")

# Q2: 月度销售趋势
monthly_revenue = orders.groupby("month")["amount"].sum()
print(f"月度销售额:\n{monthly_revenue}\n")

# Q3: 星期几销售额最高
dow_sales = orders.groupby("day_of_week")["amount"].mean().round(2)
# 恢复正确的星期顺序
day_order = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]
dow_sales = dow_sales.reindex(day_order)
print(f"星期均值:\n{dow_sales}\n")

# Q4: Top 10 客户
top_customers = orders.groupby("customer_id").agg(
    total_spent=("amount", "sum"),
    order_count=("order_id", "count"),
).sort_values("total_spent", ascending=False).head(10)
print(f"Top 10 客户:\n{top_customers}")

3. 高性能文件读写

# ============================================================
# 读取与写入优化
# ============================================================

# CSV —— 指定数据类型加速读取
df = pd.read_csv("large_file.csv",
    dtype={"customer_id": "int32", "product": "category"},
    parse_dates=["order_date"],
    usecols=["customer_id", "product", "amount", "order_date"],
    nrows=100_000,  # 限制行数
)
df.to_csv("output.csv", index=False)

# Parquet —— 推荐!体积小,读取快
df.to_parquet("output.parquet", compression="snappy")
df = pd.read_parquet("output.parquet")

# Excel
df.to_excel("output.xlsx", sheet_name="Orders", index=False)
df = pd.read_excel("output.xlsx", sheet_name="Orders")

# 从 SQL 读取
from sqlalchemy import create_engine
engine = create_engine("sqlite:///orders.db")
df.to_sql("orders", engine, if_exists="replace", index=False)
df = pd.read_sql("SELECT product, SUM(amount) FROM orders GROUP BY product", engine)

4. 内存优化技巧

# 查看内存使用
print(orders.memory_usage(deep=True))

# 优化数据类型
orders_optimized = orders.copy()
orders_optimized["customer_id"] = orders_optimized["customer_id"].astype("int32")
orders_optimized["product"] = orders_optimized["product"].astype("category")
orders_optimized["is_returned"] = orders_optimized["is_returned"].astype("bool")
orders_optimized["amount"] = orders_optimized["amount"].astype("float32")

print(f"\n优化前: {orders.memory_usage(deep=True).sum() / 1024:.1f} KB")
print(f"优化后: {orders_optimized.memory_usage(deep=True).sum() / 1024:.1f} KB")

数据格式对比

格式 读取速度 写入速度 文件大小 适用场景
CSV 通用交换格式
Parquet 大数据分析(推荐)
Excel 很慢 很慢 与业务人员交换
JSON API 数据
Feather 极快 极快 中间缓存

教程

Pandas 入门教程 —— 从 Excel 思维到 DataFrame 思维

本章目标

  • 理解 Series 和 DataFrame 的数据结构
  • 掌握数据清洗的标准流程(ETL)
  • 学会用 groupby+agg 替代 Excel 透视表
  • 理解链式操作与管道编程风格

1. Pandas 的核心数据结构

Series —— 带索引的一维数组

import pandas as pd

s = pd.Series([10, 20, 30, 40], index=["a", "b", "c", "d"])
print(s)
# a    10
# b    20
# c    30
# d    40

# Series 同时具备 dict 和 ndarray 的特性
print(s["b"])        # 像 dict
print(s[s > 20])     # 像 ndarray 布尔索引
print(s.values)      # 底层 NumPy 数组

DataFrame —— 带行列标签的二维表

df = pd.DataFrame({
    "name": ["Alice", "Bob"],
    "age": [25, 30],
    "salary": [50000, 60000],
})
# 每列是一个 Series
# 行索引 (index) 默认 0, 1, 2...
# 列索引 (columns) 是字典的 key

2. 数据清洗标准流程(EDA Pipeline)

# 第1步:加载
df = pd.read_csv("raw_data.csv")

# 第2步:表面检查
print(df.shape)
print(df.head())
print(df.info())
print(df.describe())
print(df.isnull().sum())

# 第3步:缺失值处理
df["age"].fillna(df["age"].median(), inplace=True)   # 数值:中位数
df["city"].fillna("Unknown", inplace=True)            # 分类:特定值

# 第4步:重复值检查
df.drop_duplicates(subset=["email"], keep="first", inplace=True)

# 第5步:类型转换
df["date"] = pd.to_datetime(df["date"])
df["category"] = df["category"].astype("category")

# 第6步:异常值处理
q1 = df["price"].quantile(0.25)
q3 = df["price"].quantile(0.75)
iqr = q3 - q1
df = df[(df["price"] >= q1 - 1.5 * iqr) & (df["price"] <= q3 + 1.5 * iqr)]

# 第7步:保存
df.to_parquet("clean_data.parquet")

3. groupby 深度指南

groupby 三部曲:split → apply → combine

原始数据            Split               Apply            Combine
┌──┬──┬──┐     ┌──┬──┬──┐
│A│X│10│     │A│X│10│  → mean() →  A: 20
│B│Y│30│  →  │A│X│30│  → sum()  →  B: 105
│A│Z│20│     │B│Y│30│
│B│Y│75│     │B│Z│75│
└──┴──┴──┘     └──┴──┴──┘

agg vs transform vs apply

df = pd.DataFrame({
    "team": ["A", "A", "B", "B"],
    "score": [10, 20, 30, 40],
})

# agg: 聚合为一行(shape 缩小)
print(df.groupby("team").agg(avg_score=("score", "mean")))
#       avg_score
# team
# A          15.0
# B          35.0

# transform: 保持原形状(广播回来)
df["team_avg"] = df.groupby("team")["score"].transform("mean")
#   team  score  team_avg
# 0    A     10      15.0
# 1    A     20      15.0
# 2    B     30      35.0
# 3    B     40      35.0

# apply: 最灵活,但最慢
def top_score(group):
    return group.nlargest(1, "score")

print(df.groupby("team").apply(top_score))

4. 链式操作与管道

方法链(Method Chaining)

# ❌ 传统风格 —— 中间变量地狱
df1 = df[df["age"] > 25]
df2 = df1.groupby("city")
df3 = df2["salary"].mean()
df4 = df3.sort_values(ascending=False)
result = df4.head(5)

# ✅ 链式风格 —— 清晰的数据流
result = (
    df
    .query("age > 25")
    .groupby("city")["salary"]
    .mean()
    .sort_values(ascending=False)
    .head(5)
)

pipe —— 自定义函数链入管道

def remove_outliers(df, column):
    q1, q3 = df[column].quantile([0.25, 0.75])
    iqr = q3 - q1
    return df[df[column].between(q1 - 1.5*iqr, q3 + 1.5*iqr)]

def add_features(df):
    df["income_per_age"] = df["salary"] / df["age"]
    return df

clean_df = (
    df
    .pipe(remove_outliers, "salary")
    .pipe(add_features)
    .dropna()
)

5. Pandas 与 SQL 对照表

SQL Pandas
SELECT col1, col2 df[["col1", "col2"]]
WHERE condition df[df["col"] > 10]df.query()
GROUP BY df.groupby("col")
HAVING .filter(lambda g: g["val"].sum() > 100)
ORDER BY df.sort_values("col")
LIMIT n df.head(n)
JOIN ... ON pd.merge(left, right, on="key")
UNION ALL pd.concat([df1, df2])
COUNT(DISTINCT) df["col"].nunique()
CASE WHEN np.select()pd.cut()

思考题

  1. 什么情况下 df.groupby().apply()agg() 更好?性能上有何代价?
  2. pd.cutpd.qcut 的区别是什么?各自适用什么场景?
  3. 为什么 pd.concat([df1, df2]) 有时会导致索引重复?如何修复?
  4. Pandas 的 inplace=True 参数有什么优缺点?为什么很多开发者建议避免使用?

参考资料

  1. [1] Wes McKinney. Python for Data Analysis (3rd Edition). 2022.
  2. [2] Pandas Development Team. Pandas 官方文档. 2024.
  3. [3] Matt Harrison. Effective Pandas. 2021.
  4. [4] Matt Harrison, Theodore Petrou. Pandas Cookbook (2nd Edition). 2021.