02-time-series-and-io

知识库
知识库文档
/tech-stacks/pandas/examples/02-time-series-and-io.md

文档

Pandas 时间序列与文件 I/O 实战

目标

  • 掌握时间序列数据的处理:重采样、滚动窗口、shift
  • 掌握多格式文件读写与性能优化
  • 实战:分析电商订单数据

完整代码

1. 时间序列处理

import pandas as pd
import numpy as np
from datetime import datetime, timedelta

# ============================================================
# 日期范围生成
# ============================================================
# 日频
dates = pd.date_range("2024-01-01", "2024-01-31", freq="D")
print(f"日频: {len(dates)} 天, 首日: {dates[0]}, 末日: {dates[-1]}")

# 小时频
hours = pd.date_range("2024-01-01", periods=48, freq="h")
print(f"小时频: {hours[:5]}")

# 工作日
business_days = pd.date_range("2024-01-01", "2024-01-31", freq="B")
print(f"工作日: {len(business_days)} 天")

# ============================================================
# 创建时间序列 DataFrame
# ============================================================
np.random.seed(42)
df_ts = pd.DataFrame({
    "date": pd.date_range("2024-01-01", periods=90, freq="D"),
    "sales": np.random.randint(100, 500, 90),
    "customers": np.random.randint(10, 50, 90),
})
df_ts = df_ts.set_index("date")
print(f"\n时间序列数据:\n{df_ts.head(10)}")

# ============================================================
# 重采样(resample)
# ============================================================
# 按周聚合
weekly = df_ts.resample("W").agg({
    "sales": "sum",
    "customers": "sum",
})
print(f"\n周汇总:\n{weekly}")

# 按月聚合
monthly = df_ts.resample("ME").agg({
    "sales": ["sum", "mean", "max"],
})
print(f"\n月汇总:\n{monthly}")

# ============================================================
# 滚动窗口
# ============================================================
df_ts["sales_7d_avg"] = df_ts["sales"].rolling(window=7).mean()
df_ts["sales_7d_std"] = df_ts["sales"].rolling(window=7).std()
df_ts["sales_ma_ratio"] = df_ts["sales"] / df_ts["sales_7d_avg"]

print(f"\n7天移动平均:\n{df_ts[["sales", "sales_7d_avg", "sales_ma_ratio"]].head(10)}")

# 指数加权移动平均(近期权重更高)
df_ts["sales_ewm"] = df_ts["sales"].ewm(span=7).mean()

# ============================================================
# shift / diff / pct_change
# ============================================================
df_ts["prev_day"] = df_ts["sales"].shift(1)
df_ts["day_diff"] = df_ts["sales"].diff()
df_ts["pct_change"] = df_ts["sales"].pct_change() * 100

print(f"\n日环比:\n{df_ts[["sales", "prev_day", "day_diff", "pct_change"]].head(10)}")

2. 电商订单分析实战

# ============================================================
# 模拟电商数据
# ============================================================
np.random.seed(42)
n = 1000

orders = pd.DataFrame({
    "order_id": range(1, n + 1),
    "customer_id": np.random.randint(1, 201, n),
    "product": np.random.choice(["手机", "笔记本", "耳机", "平板", "手表"], n, p=[0.3, 0.2, 0.25, 0.15, 0.1]),
    "amount": np.random.uniform(50, 5000, n).round(2),
    "quantity": np.random.randint(1, 5, n),
    "order_date": pd.date_range("2024-01-01", periods=n, freq="h") + pd.to_timedelta(np.random.randint(0, 720, n), unit="h"),
    "is_returned": np.random.choice([True, False], n, p=[0.05, 0.95]),
})

orders["month"] = orders["order_date"].dt.to_period("M")
orders["day_of_week"] = orders["order_date"].dt.day_name()
orders["hour"] = orders["order_date"].dt.hour

print(f"原始数据:\n{orders.head()}\n")

# Q1: 每种产品的总销售额和退货率
product_stats = orders.groupby("product").agg(
    total_revenue=("amount", "sum"),
    avg_order=("amount", "mean"),
    order_count=("order_id", "count"),
    return_rate=("is_returned", "mean"),
).sort_values("total_revenue", ascending=False)
product_stats["return_rate"] = (product_stats["return_rate"] * 100).round(1)
print(f"产品统计:\n{product_stats}\n")

# Q2: 月度销售趋势
monthly_revenue = orders.groupby("month")["amount"].sum()
print(f"月度销售额:\n{monthly_revenue}\n")

# Q3: 星期几销售额最高
dow_sales = orders.groupby("day_of_week")["amount"].mean().round(2)
# 恢复正确的星期顺序
day_order = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]
dow_sales = dow_sales.reindex(day_order)
print(f"星期均值:\n{dow_sales}\n")

# Q4: Top 10 客户
top_customers = orders.groupby("customer_id").agg(
    total_spent=("amount", "sum"),
    order_count=("order_id", "count"),
).sort_values("total_spent", ascending=False).head(10)
print(f"Top 10 客户:\n{top_customers}")

3. 高性能文件读写

# ============================================================
# 读取与写入优化
# ============================================================

# CSV —— 指定数据类型加速读取
df = pd.read_csv("large_file.csv",
    dtype={"customer_id": "int32", "product": "category"},
    parse_dates=["order_date"],
    usecols=["customer_id", "product", "amount", "order_date"],
    nrows=100_000,  # 限制行数
)
df.to_csv("output.csv", index=False)

# Parquet —— 推荐!体积小,读取快
df.to_parquet("output.parquet", compression="snappy")
df = pd.read_parquet("output.parquet")

# Excel
df.to_excel("output.xlsx", sheet_name="Orders", index=False)
df = pd.read_excel("output.xlsx", sheet_name="Orders")

# 从 SQL 读取
from sqlalchemy import create_engine
engine = create_engine("sqlite:///orders.db")
df.to_sql("orders", engine, if_exists="replace", index=False)
df = pd.read_sql("SELECT product, SUM(amount) FROM orders GROUP BY product", engine)

4. 内存优化技巧

# 查看内存使用
print(orders.memory_usage(deep=True))

# 优化数据类型
orders_optimized = orders.copy()
orders_optimized["customer_id"] = orders_optimized["customer_id"].astype("int32")
orders_optimized["product"] = orders_optimized["product"].astype("category")
orders_optimized["is_returned"] = orders_optimized["is_returned"].astype("bool")
orders_optimized["amount"] = orders_optimized["amount"].astype("float32")

print(f"\n优化前: {orders.memory_usage(deep=True).sum() / 1024:.1f} KB")
print(f"优化后: {orders_optimized.memory_usage(deep=True).sum() / 1024:.1f} KB")

数据格式对比

格式 读取速度 写入速度 文件大小 适用场景
CSV 通用交换格式
Parquet 大数据分析(推荐)
Excel 很慢 很慢 与业务人员交换
JSON API 数据
Feather 极快 极快 中间缓存

信息

路径
/tech-stacks/pandas/examples/02-time-series-and-io.md
更新时间
2026/5/30