Pandas 时间序列与文件 I/O 实战
目标
- 掌握时间序列数据的处理:重采样、滚动窗口、shift
- 掌握多格式文件读写与性能优化
- 实战:分析电商订单数据
完整代码
1. 时间序列处理
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
# ============================================================
# 日期范围生成
# ============================================================
# 日频
dates = pd.date_range("2024-01-01", "2024-01-31", freq="D")
print(f"日频: {len(dates)} 天, 首日: {dates[0]}, 末日: {dates[-1]}")
# 小时频
hours = pd.date_range("2024-01-01", periods=48, freq="h")
print(f"小时频: {hours[:5]}")
# 工作日
business_days = pd.date_range("2024-01-01", "2024-01-31", freq="B")
print(f"工作日: {len(business_days)} 天")
# ============================================================
# 创建时间序列 DataFrame
# ============================================================
np.random.seed(42)
df_ts = pd.DataFrame({
"date": pd.date_range("2024-01-01", periods=90, freq="D"),
"sales": np.random.randint(100, 500, 90),
"customers": np.random.randint(10, 50, 90),
})
df_ts = df_ts.set_index("date")
print(f"\n时间序列数据:\n{df_ts.head(10)}")
# ============================================================
# 重采样(resample)
# ============================================================
# 按周聚合
weekly = df_ts.resample("W").agg({
"sales": "sum",
"customers": "sum",
})
print(f"\n周汇总:\n{weekly}")
# 按月聚合
monthly = df_ts.resample("ME").agg({
"sales": ["sum", "mean", "max"],
})
print(f"\n月汇总:\n{monthly}")
# ============================================================
# 滚动窗口
# ============================================================
df_ts["sales_7d_avg"] = df_ts["sales"].rolling(window=7).mean()
df_ts["sales_7d_std"] = df_ts["sales"].rolling(window=7).std()
df_ts["sales_ma_ratio"] = df_ts["sales"] / df_ts["sales_7d_avg"]
print(f"\n7天移动平均:\n{df_ts[["sales", "sales_7d_avg", "sales_ma_ratio"]].head(10)}")
# 指数加权移动平均(近期权重更高)
df_ts["sales_ewm"] = df_ts["sales"].ewm(span=7).mean()
# ============================================================
# shift / diff / pct_change
# ============================================================
df_ts["prev_day"] = df_ts["sales"].shift(1)
df_ts["day_diff"] = df_ts["sales"].diff()
df_ts["pct_change"] = df_ts["sales"].pct_change() * 100
print(f"\n日环比:\n{df_ts[["sales", "prev_day", "day_diff", "pct_change"]].head(10)}")
2. 电商订单分析实战
# ============================================================
# 模拟电商数据
# ============================================================
np.random.seed(42)
n = 1000
orders = pd.DataFrame({
"order_id": range(1, n + 1),
"customer_id": np.random.randint(1, 201, n),
"product": np.random.choice(["手机", "笔记本", "耳机", "平板", "手表"], n, p=[0.3, 0.2, 0.25, 0.15, 0.1]),
"amount": np.random.uniform(50, 5000, n).round(2),
"quantity": np.random.randint(1, 5, n),
"order_date": pd.date_range("2024-01-01", periods=n, freq="h") + pd.to_timedelta(np.random.randint(0, 720, n), unit="h"),
"is_returned": np.random.choice([True, False], n, p=[0.05, 0.95]),
})
orders["month"] = orders["order_date"].dt.to_period("M")
orders["day_of_week"] = orders["order_date"].dt.day_name()
orders["hour"] = orders["order_date"].dt.hour
print(f"原始数据:\n{orders.head()}\n")
# Q1: 每种产品的总销售额和退货率
product_stats = orders.groupby("product").agg(
total_revenue=("amount", "sum"),
avg_order=("amount", "mean"),
order_count=("order_id", "count"),
return_rate=("is_returned", "mean"),
).sort_values("total_revenue", ascending=False)
product_stats["return_rate"] = (product_stats["return_rate"] * 100).round(1)
print(f"产品统计:\n{product_stats}\n")
# Q2: 月度销售趋势
monthly_revenue = orders.groupby("month")["amount"].sum()
print(f"月度销售额:\n{monthly_revenue}\n")
# Q3: 星期几销售额最高
dow_sales = orders.groupby("day_of_week")["amount"].mean().round(2)
# 恢复正确的星期顺序
day_order = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]
dow_sales = dow_sales.reindex(day_order)
print(f"星期均值:\n{dow_sales}\n")
# Q4: Top 10 客户
top_customers = orders.groupby("customer_id").agg(
total_spent=("amount", "sum"),
order_count=("order_id", "count"),
).sort_values("total_spent", ascending=False).head(10)
print(f"Top 10 客户:\n{top_customers}")
3. 高性能文件读写
# ============================================================
# 读取与写入优化
# ============================================================
# CSV —— 指定数据类型加速读取
df = pd.read_csv("large_file.csv",
dtype={"customer_id": "int32", "product": "category"},
parse_dates=["order_date"],
usecols=["customer_id", "product", "amount", "order_date"],
nrows=100_000, # 限制行数
)
df.to_csv("output.csv", index=False)
# Parquet —— 推荐!体积小,读取快
df.to_parquet("output.parquet", compression="snappy")
df = pd.read_parquet("output.parquet")
# Excel
df.to_excel("output.xlsx", sheet_name="Orders", index=False)
df = pd.read_excel("output.xlsx", sheet_name="Orders")
# 从 SQL 读取
from sqlalchemy import create_engine
engine = create_engine("sqlite:///orders.db")
df.to_sql("orders", engine, if_exists="replace", index=False)
df = pd.read_sql("SELECT product, SUM(amount) FROM orders GROUP BY product", engine)
4. 内存优化技巧
# 查看内存使用
print(orders.memory_usage(deep=True))
# 优化数据类型
orders_optimized = orders.copy()
orders_optimized["customer_id"] = orders_optimized["customer_id"].astype("int32")
orders_optimized["product"] = orders_optimized["product"].astype("category")
orders_optimized["is_returned"] = orders_optimized["is_returned"].astype("bool")
orders_optimized["amount"] = orders_optimized["amount"].astype("float32")
print(f"\n优化前: {orders.memory_usage(deep=True).sum() / 1024:.1f} KB")
print(f"优化后: {orders_optimized.memory_usage(deep=True).sum() / 1024:.1f} KB")
数据格式对比
| 格式 |
读取速度 |
写入速度 |
文件大小 |
适用场景 |
| CSV |
慢 |
中 |
大 |
通用交换格式 |
| Parquet |
快 |
快 |
小 |
大数据分析(推荐) |
| Excel |
很慢 |
很慢 |
中 |
与业务人员交换 |
| JSON |
中 |
中 |
大 |
API 数据 |
| Feather |
极快 |
极快 |
中 |
中间缓存 |