Pandas DataFrame 核心操作 —— 数据读写、清洗、聚合
目标
- 掌握 DataFrame 的创建、读取、写入
- 掌握数据筛选、排序、分组聚合
- 理解链式操作与管道风格
完整代码
import pandas as pd
import numpy as np
# ============================================================
# 1. DataFrame 创建
# ============================================================
# 从字典创建
df = pd.DataFrame({
"name": ["Alice", "Bob", "Charlie", "Diana", "Eve"],
"age": [25, 32, 28, 35, 22],
"city": ["Beijing", "Shanghai", "Beijing", "Shenzhen", "Shanghai"],
"salary": [15000, 22000, 18000, 25000, 12000],
"department": ["Engineering", "Sales", "Engineering", "HR", "Sales"],
})
print(f"原始数据:\n{df}\n")
# ============================================================
# 2. 基本属性与信息
# ============================================================
print(f"形状: {df.shape}")
print(f"列名: {df.columns.tolist()}")
print(f"数据类型:\n{df.dtypes}")
print(f"描述统计:\n{df.describe()}")
print(f"\n缺失值:\n{df.isnull().sum()}")
# ============================================================
# 3. 数据筛选
# ============================================================
# 按条件筛选
engineers = df[df["department"] == "Engineering"]
print(f"Engineering 部门:\n{engineers}\n")
# 多条件(用 & 不是 and,用 | 不是 or)
high_paid = df[(df["salary"] > 15000) & (df["city"] == "Beijing")]
print(f"北京高薪员工:\n{high_paid}\n")
# query() 方法(更易读)
result = df.query("salary > 15000 and city == 'Beijing'")
print(f"query 方法:\n{result}\n")
# 按位置:iloc
print(f"前两行:\n{df.iloc[:2]}")
print(f"第1,3行和第0,2列:\n{df.iloc[[0,2], [0,2]]}\n")
# 按标签:loc
print(f"name 和 salary 列:\n{df.loc[:, ["name", "salary"]]}")
# ============================================================
# 4. 数据排序
# ============================================================
sorted_df = df.sort_values("salary", ascending=False)
print(f"\n按工资降序:\n{sorted_df}")
# 多列排序
sorted_df = df.sort_values(["department", "salary"], ascending=[True, False])
print(f"\n按部门+工资排序:\n{sorted_df}")
# ============================================================
# 5. 分组聚合(核心!)
# ============================================================
# 单列聚合
print(f"\n按部门平均工资:\n{df.groupby("department")["salary"].mean()}")
# 多列多聚合
stats = df.groupby("city").agg({
"salary": ["mean", "min", "max", "count"],
"age": "mean",
})
print(f"\n按城市多聚合:\n{stats}")
# 自定义聚合
def salary_range(x):
return x.max() - x.min()
custom_agg = df.groupby("department").agg(
avg_salary=("salary", "mean"),
total_salary=("salary", "sum"),
salary_spread=("salary", salary_range),
headcount=("name", "count"),
)
print(f"\n自定义聚合:\n{custom_agg}")
# transform —— 保持原形状
df["dept_avg"] = df.groupby("department")["salary"].transform("mean")
df["salary_pct"] = df["salary"] / df["dept_avg"] * 100
print(f"\n带部门均值:\n{df}")
# ============================================================
# 6. 链式操作(推荐风格)
# ============================================================
result = (
df
.query("age >= 25")
.groupby("city")
.agg(avg_salary=("salary", "mean"), count=("name", "count"))
.query("count >= 1")
.sort_values("avg_salary", ascending=False)
.reset_index()
)
print(f"\n链式操作结果:\n{result}")
# ============================================================
# 7. 表连接(类似 SQL JOIN)
# ============================================================
departments = pd.DataFrame({
"department": ["Engineering", "Sales", "HR", "Finance"],
"manager": ["张工", "李经理", "王主管", "赵总监"],
"floor": [3, 5, 4, 6],
})
# INNER JOIN
merged = pd.merge(df, departments, on="department", how="inner")
print(f"\nINNER JOIN:\n{merged}")
# LEFT JOIN
merged = pd.merge(df, departments, on="department", how="left")
print(f"\nLEFT JOIN:\n{merged}")
# ============================================================
# 8. 数据透视表
# ============================================================
pivot = df.pivot_table(
values="salary",
index="department",
columns="city",
aggfunc="mean",
fill_value=0,
)
print(f"\n数据透视表:\n{pivot}")
关键要点
| 操作 |
方法 |
| 筛选 |
df[condition], df.query(), df.loc[], df.iloc[] |
| 排序 |
df.sort_values() |
| 分组 |
df.groupby().agg() |
| 窗口 |
df["col"].transform() |
| 合并 |
pd.merge(), pd.concat() |
| 透视 |
df.pivot_table() |
| 链式 |
.pipe() 或在括号中连续调用 |
Pandas 时间序列与文件 I/O 实战
目标
- 掌握时间序列数据的处理:重采样、滚动窗口、shift
- 掌握多格式文件读写与性能优化
- 实战:分析电商订单数据
完整代码
1. 时间序列处理
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
# ============================================================
# 日期范围生成
# ============================================================
# 日频
dates = pd.date_range("2024-01-01", "2024-01-31", freq="D")
print(f"日频: {len(dates)} 天, 首日: {dates[0]}, 末日: {dates[-1]}")
# 小时频
hours = pd.date_range("2024-01-01", periods=48, freq="h")
print(f"小时频: {hours[:5]}")
# 工作日
business_days = pd.date_range("2024-01-01", "2024-01-31", freq="B")
print(f"工作日: {len(business_days)} 天")
# ============================================================
# 创建时间序列 DataFrame
# ============================================================
np.random.seed(42)
df_ts = pd.DataFrame({
"date": pd.date_range("2024-01-01", periods=90, freq="D"),
"sales": np.random.randint(100, 500, 90),
"customers": np.random.randint(10, 50, 90),
})
df_ts = df_ts.set_index("date")
print(f"\n时间序列数据:\n{df_ts.head(10)}")
# ============================================================
# 重采样(resample)
# ============================================================
# 按周聚合
weekly = df_ts.resample("W").agg({
"sales": "sum",
"customers": "sum",
})
print(f"\n周汇总:\n{weekly}")
# 按月聚合
monthly = df_ts.resample("ME").agg({
"sales": ["sum", "mean", "max"],
})
print(f"\n月汇总:\n{monthly}")
# ============================================================
# 滚动窗口
# ============================================================
df_ts["sales_7d_avg"] = df_ts["sales"].rolling(window=7).mean()
df_ts["sales_7d_std"] = df_ts["sales"].rolling(window=7).std()
df_ts["sales_ma_ratio"] = df_ts["sales"] / df_ts["sales_7d_avg"]
print(f"\n7天移动平均:\n{df_ts[["sales", "sales_7d_avg", "sales_ma_ratio"]].head(10)}")
# 指数加权移动平均(近期权重更高)
df_ts["sales_ewm"] = df_ts["sales"].ewm(span=7).mean()
# ============================================================
# shift / diff / pct_change
# ============================================================
df_ts["prev_day"] = df_ts["sales"].shift(1)
df_ts["day_diff"] = df_ts["sales"].diff()
df_ts["pct_change"] = df_ts["sales"].pct_change() * 100
print(f"\n日环比:\n{df_ts[["sales", "prev_day", "day_diff", "pct_change"]].head(10)}")
2. 电商订单分析实战
# ============================================================
# 模拟电商数据
# ============================================================
np.random.seed(42)
n = 1000
orders = pd.DataFrame({
"order_id": range(1, n + 1),
"customer_id": np.random.randint(1, 201, n),
"product": np.random.choice(["手机", "笔记本", "耳机", "平板", "手表"], n, p=[0.3, 0.2, 0.25, 0.15, 0.1]),
"amount": np.random.uniform(50, 5000, n).round(2),
"quantity": np.random.randint(1, 5, n),
"order_date": pd.date_range("2024-01-01", periods=n, freq="h") + pd.to_timedelta(np.random.randint(0, 720, n), unit="h"),
"is_returned": np.random.choice([True, False], n, p=[0.05, 0.95]),
})
orders["month"] = orders["order_date"].dt.to_period("M")
orders["day_of_week"] = orders["order_date"].dt.day_name()
orders["hour"] = orders["order_date"].dt.hour
print(f"原始数据:\n{orders.head()}\n")
# Q1: 每种产品的总销售额和退货率
product_stats = orders.groupby("product").agg(
total_revenue=("amount", "sum"),
avg_order=("amount", "mean"),
order_count=("order_id", "count"),
return_rate=("is_returned", "mean"),
).sort_values("total_revenue", ascending=False)
product_stats["return_rate"] = (product_stats["return_rate"] * 100).round(1)
print(f"产品统计:\n{product_stats}\n")
# Q2: 月度销售趋势
monthly_revenue = orders.groupby("month")["amount"].sum()
print(f"月度销售额:\n{monthly_revenue}\n")
# Q3: 星期几销售额最高
dow_sales = orders.groupby("day_of_week")["amount"].mean().round(2)
# 恢复正确的星期顺序
day_order = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]
dow_sales = dow_sales.reindex(day_order)
print(f"星期均值:\n{dow_sales}\n")
# Q4: Top 10 客户
top_customers = orders.groupby("customer_id").agg(
total_spent=("amount", "sum"),
order_count=("order_id", "count"),
).sort_values("total_spent", ascending=False).head(10)
print(f"Top 10 客户:\n{top_customers}")
3. 高性能文件读写
# ============================================================
# 读取与写入优化
# ============================================================
# CSV —— 指定数据类型加速读取
df = pd.read_csv("large_file.csv",
dtype={"customer_id": "int32", "product": "category"},
parse_dates=["order_date"],
usecols=["customer_id", "product", "amount", "order_date"],
nrows=100_000, # 限制行数
)
df.to_csv("output.csv", index=False)
# Parquet —— 推荐!体积小,读取快
df.to_parquet("output.parquet", compression="snappy")
df = pd.read_parquet("output.parquet")
# Excel
df.to_excel("output.xlsx", sheet_name="Orders", index=False)
df = pd.read_excel("output.xlsx", sheet_name="Orders")
# 从 SQL 读取
from sqlalchemy import create_engine
engine = create_engine("sqlite:///orders.db")
df.to_sql("orders", engine, if_exists="replace", index=False)
df = pd.read_sql("SELECT product, SUM(amount) FROM orders GROUP BY product", engine)
4. 内存优化技巧
# 查看内存使用
print(orders.memory_usage(deep=True))
# 优化数据类型
orders_optimized = orders.copy()
orders_optimized["customer_id"] = orders_optimized["customer_id"].astype("int32")
orders_optimized["product"] = orders_optimized["product"].astype("category")
orders_optimized["is_returned"] = orders_optimized["is_returned"].astype("bool")
orders_optimized["amount"] = orders_optimized["amount"].astype("float32")
print(f"\n优化前: {orders.memory_usage(deep=True).sum() / 1024:.1f} KB")
print(f"优化后: {orders_optimized.memory_usage(deep=True).sum() / 1024:.1f} KB")
数据格式对比
| 格式 |
读取速度 |
写入速度 |
文件大小 |
适用场景 |
| CSV |
慢 |
中 |
大 |
通用交换格式 |
| Parquet |
快 |
快 |
小 |
大数据分析(推荐) |
| Excel |
很慢 |
很慢 |
中 |
与业务人员交换 |
| JSON |
中 |
中 |
大 |
API 数据 |
| Feather |
极快 |
极快 |
中 |
中间缓存 |