FastAPI

技术栈
后端框架
pythonweb框架异步ASGIOpenAPI类型提示

概览

FastAPI 技术栈概览

FastAPI 是一个现代、高性能的 Python Web 框架,由 Sebastián Ramírez 于 2018 年发布,用于构建 RESTful API。它基于 Starlette(ASGI 服务器工具包)和 Pydantic(数据验证),充分利用 Python 3.6+ 的类型提示,提供自动生成的交互式 API 文档和极高的运行时性能。

核心特性:

  • 极致性能 — 性能媲美 Node.js 和 Go,基于 ASGI 异步架构
  • 📝 类型驱动 — 利用 Python 类型提示进行请求/响应验证、序列化和编辑器自动补全
  • 📖 自动文档 — 自动生成 OpenAPI(Swagger)和 ReDoc 交互式文档,无需额外代码
  • 🧪 Pydantic 集成 — 强大的数据验证和序列化,支持嵌套模型、枚举、正则等
  • 🔄 异步原生async/await 一等公民,轻松处理高并发 WebSocket、SSE
  • 💉 依赖注入 — 优雅的依赖注入系统,适合认证、数据库连接、权限检查
  • 🔒 安全工具 — 内置 OAuth2、JWT、API Key 等认证方案

适用场景: 高性能 REST API、微服务网关、实时 WebSocket 服务、机器学习模型服务(ML Serving)、数据管道接口。

安装

1. 环境准备

  • 操作系统: Windows 10+ / macOS 11+ / Linux(Ubuntu 20.04 推荐)
  • Python 版本: Python 3.9 及以上(推荐 3.11+,充分利用类型提示新特性)
  • 依赖项: pip、virtualenv/venv

创建并激活虚拟环境

python -m venv venv
# Windows:
venv\Scripts\activate
# macOS / Linux:
source venv/bin/activate

2. 安装命令

# 安装 FastAPI + ASGI 服务器
pip install fastapi uvicorn[standard]

# 验证安装
uvicorn --version

# 安装常用扩展
pip install pydantic[email]        # Pydantic 数据验证(FastAPI 已依赖)
pip install sqlalchemy             # ORM(如需数据库)
pip install alembic                # 数据库迁移
pip install python-jose[cryptography]  # JWT 认证
pip install passlib[bcrypt]        # 密码哈希
pip install python-multipart       # 文件上传支持
pip install httpx                  # 异步 HTTP 客户端(测试用)
pip install pytest                 # 测试框架

# 一键安装
pip install fastapi uvicorn[standard] sqlalchemy alembic python-jose[cryptography] passlib[bcrypt] python-multipart httpx pytest

# 运行第一个 FastAPI 应用
uvicorn main:app --reload
# 浏览器访问 http://127.0.0.1:8000/docs (Swagger 文档)

3. 常见安装问题

问题 1:uvicorn: command not found

# 确认虚拟环境已激活
# 或用 python -m 方式运行
python -m uvicorn main:app --reload

问题 2:pip 安装慢

pip install fastapi uvicorn -i https://pypi.tuna.tsinghua.edu.cn/simple

问题 3:端口 8000 被占用

uvicorn main:app --reload --port 8080
# 或查看占用进程
lsof -i :8000   # macOS/Linux
netstat -ano | findstr :8000  # Windows

问题 4:ImportError: No module named 'pydantic'

FastAPI 依赖 Pydantic v1 或 v2(自动安装)。若版本冲突:

pip uninstall pydantic
pip install "pydantic>=2.0"
pip install fastapi --no-deps

问题 5:文件上传报"Missing multipart boundary"

# 确保安装了 python-multipart
pip install python-multipart

示例

FastAPI Hello World —— 类型驱动的 API 开发

目标

创建一个最小的 FastAPI 应用,理解:

  • FastAPI 的装饰器路由
  • Pydantic 数据模型与类型验证
  • 自动 OpenAPI 文档(Swagger / ReDoc)
  • ASGI 异步支持

完整代码

# main.py
from fastapi import FastAPI, Path, Query, HTTPException
from pydantic import BaseModel, Field, EmailStr
from typing import Optional, List
from enum import Enum
from datetime import datetime

# 创建 FastAPI 实例
app = FastAPI(
    title="我的第一个 FastAPI",
    description="FastAPI Hello World 示例",
    version="1.0.0",
    docs_url="/docs",       # Swagger UI
    redoc_url="/redoc",     # ReDoc
)

# ---------- Pydantic 数据模型 ----------

class UserRole(str, Enum):
    ADMIN = "admin"
    USER = "user"
    GUEST = "guest"


class User(BaseModel):
    """用户创建请求模型"""
    username: str = Field(..., min_length=3, max_length=50, description="用户名")
    email: EmailStr = Field(..., description="邮箱地址")
    age: int = Field(ge=0, le=150, description="年龄")
    role: UserRole = Field(default=UserRole.USER, description="用户角色")
    tags: List[str] = Field(default=[], description="标签列表")


class UserResponse(BaseModel):
    """用户响应模型"""
    id: int
    username: str
    email: str
    role: UserRole
    created_at: datetime

    model_config = {"from_attributes": True}  # 支持 ORM 对象


# ---------- 路由 ----------

@app.get("/")
async def root():
    """根路由 —— Hello World"""
    return {
        "message": "🚀 Hello, FastAPI!",
        "docs": "/docs",
        "redoc": "/redoc",
    }


@app.get("/hello/{name}")
async def hello(
    name: str = Path(..., min_length=1, max_length=50, description="用户名"),
    greeting: str = Query("Hello", description="问候语"),
):
    """路径参数 + 查询参数"""
    return {"message": f"{greeting}, {name}!"}


@app.post("/users", response_model=UserResponse, status_code=201)
async def create_user(user: User):
    """创建用户 —— Pydantic 自动验证请求体"""
    # 在实际应用中此处写数据库
    if user.username == "admin":
        raise HTTPException(status_code=400, detail="用户名 'admin' 已被保留")
    
    return UserResponse(
        id=42,
        username=user.username,
        email=user.email,
        role=user.role,
        created_at=datetime.now(),
    )


@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(
    user_id: int = Path(..., ge=1, description="用户 ID"),
):
    """获取用户详情"""
    if user_id > 1000:
        raise HTTPException(status_code=404, detail="用户不存在")
    
    return UserResponse(
        id=user_id,
        username="demo_user",
        email="demo@example.com",
        role=UserRole.USER,
        created_at=datetime.now(),
    )


@app.get("/items/")
async def list_items(
    skip: int = Query(0, ge=0),
    limit: int = Query(10, ge=1, le=100),
    q: Optional[str] = Query(None, min_length=3),
):
    """分页查询 + 搜索"""
    items = [{"id": i, "name": f"Item {i}"} for i in range(1, 31)]
    if q:
        items = [i for i in items if q.lower() in i["name"].lower()]
    return {"items": items[skip : skip + limit], "total": len(items)}


# ---------- 自定义异常处理器 ----------

@app.exception_handler(HTTPException)
async def http_exception_handler(request, exc):
    from fastapi.responses import JSONResponse
    return JSONResponse(
        status_code=exc.status_code,
        content={
            "error": exc.detail,
            "path": str(request.url.path),
        },
    )

运行步骤

# 开发模式(热重载)
uvicorn main:app --reload

# 生产模式(多 worker)
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4

访问

URL 说明
http://127.0.0.1:8000/ 根路由
http://127.0.0.1:8000/hello/Vibe?greeting=Hi 路径 + 查询参数
http://127.0.0.1:8000/docs Swagger 交互文档
http://127.0.0.1:8000/redoc ReDoc 文档
http://127.0.0.1:8000/openapi.json OpenAPI 规范 JSON

测试 API

# POST /users
curl -X POST http://127.0.0.1:8000/users \
  -H "Content-Type: application/json" \
  -d '{"username":"vibe","email":"vibe@test.com","age":25,"role":"admin","tags":["python","api"]}'

# GET /users/1
curl http://127.0.0.1:8000/users/1

# GET /items 分页
curl "http://127.0.0.1:8000/items/?skip=0&limit=5&q=item"

# 验证错误示例(年龄超限)
curl -X POST http://127.0.0.1:8000/users \
  -H "Content-Type: application/json" \
  -d '{"username":"abc","email":"bad","age":200,"role":"god"}'

关键要点

概念 说明
Body(...) / Path(...) / Query(...) 参数来源标注 + 验证规则
... (Ellipsis) 表示字段必填
Field(ge=0, le=150) 数值范围约束
response_model 响应模型,自动过滤多余字段、转换类型
Enum 枚举类型,Swagger 自动显示下拉选择
HTTPException 标准 HTTP 异常,返回结构化错误
async def 异步视图,支持非阻塞 IO(数据库、HTTP 调用)

FastAPI 依赖注入与 JWT 认证

目标

  • 掌握 FastAPI 依赖注入系统(Depends
  • 实现 JWT (OAuth2 Password Flow) 认证
  • 实现基于角色的权限控制
  • 理解子依赖(依赖的依赖)

完整代码

# main.py
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from typing import Optional, List
from datetime import datetime, timedelta, timezone
from passlib.context import CryptContext
from jose import JWTError, jwt
import os

# ---------- 配置 ----------
SECRET_KEY = os.getenv("SECRET_KEY", "dev-secret-change-in-production-12345")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# ---------- 密码哈希 ----------
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# ---------- OAuth2 方案 ----------
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")

# ---------- 模拟用户数据库 ----------
fake_users_db = {
    "alice": {
        "username": "alice",
        "full_name": "Alice Wang",
        "email": "alice@example.com",
        "hashed_password": pwd_context.hash("secret123"),
        "role": "admin",
        "disabled": False,
    },
    "bob": {
        "username": "bob",
        "full_name": "Bob Li",
        "email": "bob@example.com",
        "hashed_password": pwd_context.hash("secret456"),
        "role": "user",
        "disabled": False,
    },
}

# ---------- Pydantic 模型 ----------
class Token(BaseModel):
    access_token: str
    token_type: str = "bearer"


class TokenData(BaseModel):
    username: Optional[str] = None


class User(BaseModel):
    username: str
    email: str
    full_name: str
    role: str
    disabled: bool


class UserOut(BaseModel):
    username: str
    email: str
    full_name: str
    role: str


# ---------- 工具函数 ----------
def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_user(username: str) -> Optional[dict]:
    return fake_users_db.get(username)


def authenticate_user(username: str, password: str):
    user = get_user(username)
    if not user or not verify_password(password, user["hashed_password"]):
        return None
    return user


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=15))
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


# ---------- 依赖项(核心:依赖注入链)----------

async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
    """从 JWT Token 解析当前用户 —— 核心认证依赖"""
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="无法验证凭据",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception

    user = get_user(token_data.username)
    if user is None:
        raise credentials_exception
    return User(**user)


async def get_current_active_user(current_user: User = Depends(get_current_user)):
    """依赖 get_current_user —— 检查用户是否启用"""
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="用户已被禁用")
    return current_user


def require_role(required_role: str):
    """依赖工厂 —— 生成角色检查依赖"""
    async def role_checker(current_user: User = Depends(get_current_active_user)):
        if current_user.role != required_role and current_user.role != "admin":
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"需要 {required_role} 或 admin 权限",
            )
        return current_user
    return role_checker


# ---------- FastAPI 应用 ----------
app = FastAPI(title="FastAPI 认证示例", version="2.0.0")


# ---------- 路由 ----------

@app.post("/auth/login", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    """OAuth2 密码流登录 —— 获取 JWT Token"""
    user = authenticate_user(form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="用户名或密码错误",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token = create_access_token(
        data={"sub": user["username"], "role": user["role"]},
        expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
    )
    return Token(access_token=access_token)


@app.get("/users/me", response_model=UserOut)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    """获取当前用户信息(需认证)"""
    return current_user


@app.get("/admin/dashboard")
async def admin_dashboard(
    current_user: User = Depends(require_role("admin")),
):
    """管理员专属 —— 需要 admin 角色"""
    return {
        "message": f"欢迎管理员 {current_user.full_name}",
        "users": list(fake_users_db.keys()),
    }


@app.get("/public/health")
async def health_check():
    """公开接口 —— 无需认证"""
    return {"status": "healthy", "users_count": len(fake_users_db)}

测试

# 1. 获取 Token(alice / secret123)
curl -X POST http://127.0.0.1:8000/auth/login \
  -H "Content-Type: application/x-www-form-urlencoded" \
  -d "username=alice&password=secret123"

# 2. 使用 Token 访问受保护接口
TOKEN="<从上面获取的 access_token>"
curl http://127.0.0.1:8000/users/me \
  -H "Authorization: Bearer $TOKEN"

# 3. 访问管理员接口
curl http://127.0.0.1:8000/admin/dashboard \
  -H "Authorization: Bearer $TOKEN"

# 4. 测试 Bob(普通用户)无法访问管理员接口
# 先获取 bob 的 token...

依赖注入原理图

请求到达
    │
    ▼
Depends(oauth2_scheme)     ← 从 Header 提取 Bearer Token
    │
    ▼
Depends(get_current_user)   ← 解码 JWT → 查询用户
    │         │
    │         └──→ 可能被 get_current_active_user 依赖
    │             可能被 require_role("admin") 依赖
    │
    ▼
路由处理函数                 ← 获得已验证的用户对象

关键要点

概念 说明
Depends() 依赖注入的核心,可嵌套形成依赖链
OAuth2PasswordBearer 标准 OAuth2 密码流,Swagger 自动显示 🔓 按钮
依赖工厂 返回 callable 的函数,用于参数化依赖(如 require_role
OAuth2PasswordRequestForm 标准登录表单(username + password + scope)
子依赖 get_current_active_userget_current_useroauth2_scheme
python-jose JWT 编码/解码库
passlib[bcrypt] 密码哈希,bcrypt 算法(自动加盐)

教程

FastAPI 入门教程 —— 现代 Python API 开发

本章目标

  • 理解 FastAPI 的异步架构与性能优势
  • 掌握 Pydantic v2 数据模型深度用法
  • 构建完整 CRUD API(含 SQLAlchemy 异步数据库)
  • 理解 FastAPI 与 Flask/Django 的定位差异

1. FastAPI 为何这么快?

传统 WSGI(Flask/Django)
 请求 → 线程池 → 阻塞等待(DB、网络)→ 响应
        每个请求占一个线程

FastAPI ASGI
 请求 → 事件循环 → async/await(非阻塞)→ 响应
        一个线程处理成千上万并发

性能基准(每秒请求数,粗略对比):

  • Flask(sync):~5,000 req/s
  • FastAPI(async):~25,000 req/s
  • Node.js(Express):~20,000 req/s

2. Pydantic v2 深度用法

from pydantic import BaseModel, Field, field_validator, model_validator
from typing import Optional, List, Union
from datetime import date
from enum import Enum
import re


class ProductCategory(str, Enum):
    ELECTRONICS = "electronics"
    CLOTHING = "clothing"
    FOOD = "food"


class ProductCreate(BaseModel):
    """创建产品请求模型"""
    name: str = Field(..., min_length=1, max_length=200)
    price: float = Field(..., gt=0, description="价格必须大于0")
    category: ProductCategory
    tags: List[str] = Field(default=[], max_length=10)
    discount_percent: Optional[float] = Field(default=None, ge=0, le=100)
    release_date: Optional[date] = None

    # 字段验证器(v2 风格)
    @field_validator("name")
    @classmethod
    def name_must_not_be_empty(cls, v: str) -> str:
        if not v.strip():
            raise ValueError("名称不能为空或全是空格")
        return v.strip()

    @field_validator("tags")
    @classmethod
    def tags_must_be_unique(cls, v: List[str]) -> List[str]:
        if len(v) != len(set(v)):
            raise ValueError("标签不能重复")
        return [tag.strip().lower() for tag in v]

    # 模型级验证器(跨字段)
    @model_validator(mode="after")
    def validate_pricing(self):
        if self.discount_percent is not None and self.price < 10:
            raise ValueError("价格低于10元的产品不能设置折扣")
        return self


class ProductResponse(BaseModel):
    """产品响应模型"""
    id: int
    name: str
    price: float
    category: ProductCategory
    tags: List[str]
    final_price: float  # 计算字段

    model_config = {"from_attributes": True}

Pydantic v2 vs v1 关键变化

变化 v1 v2
字段验证器 @validator("field") @field_validator("field")
根验证器 @root_validator @model_validator(mode="after")
ORM 模式 class Config: orm_mode = True model_config = {"from_attributes": True}
正则 Field(regex=...) Field(pattern=...)

3. 异步 SQLAlchemy 集成

# database.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy import String, Float, Enum as SAEnum, DateTime, func
import enum

DATABASE_URL = "sqlite+aiosqlite:///./products.db"

engine = create_async_engine(DATABASE_URL, echo=False)
async_session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)


class Base(DeclarativeBase):
    pass


class ProductCategory(str, enum.Enum):
    ELECTRONICS = "electronics"
    CLOTHING = "clothing"
    FOOD = "food"


class Product(Base):
    __tablename__ = "products"

    id: Mapped[int] = mapped_column(primary_key=True, index=True)
    name: Mapped[str] = mapped_column(String(200))
    price: Mapped[float] = mapped_column(Float)
    category: Mapped[ProductCategory] = mapped_column(SAEnum(ProductCategory))
    tags: Mapped[str | None] = mapped_column(String(500), nullable=True)
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now()
    )


# 依赖项 —— 获取数据库会话
async def get_db() -> AsyncSession:
    async with async_session() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()
# main.py - CRUD 路由
@app.post("/products", response_model=ProductResponse, status_code=201)
async def create_product(
    product: ProductCreate,
    db: AsyncSession = Depends(get_db),
):
    db_product = Product(
        name=product.name,
        price=product.price,
        category=product.category,
        tags=",".join(product.tags),
    )
    db.add(db_product)
    await db.flush()
    await db.refresh(db_product)
    return db_product


@app.get("/products", response_model=List[ProductResponse])
async def list_products(
    skip: int = 0,
    limit: int = 20,
    category: Optional[ProductCategory] = None,
    db: AsyncSession = Depends(get_db),
):
    from sqlalchemy import select
    stmt = select(Product).offset(skip).limit(limit)
    if category:
        stmt = stmt.where(Product.category == category)
    result = await db.execute(stmt)
    return result.scalars().all()

4. FastAPI 与 Flask/Django 对比

维度 FastAPI Flask Django
类型驱动 ✅ 原生 ❌(需 DRF)
异步原生 ✅ ASGI ⚠️ 2.0+ 部分支持 ⚠️ 3.1+ 部分支持
自动文档 ✅ OpenAPI 内置 ❌ 需插件 ❌ 需 DRF + drf-spectacular
数据验证 ✅ Pydantic ❌ 需手动 ✅ DRF Serializer
性能 ⭐⭐⭐⭐⭐ ⭐⭐⭐ ⭐⭐⭐
生态成熟度 ⭐⭐⭐⭐ ⭐⭐⭐⭐⭐ ⭐⭐⭐⭐⭐
学习曲线
适合场景 API/微服务/ML 小型应用/原型 全栈大型应用

5. 项目结构最佳实践

fastapi_project/
├── app/
│   ├── __init__.py
│   ├── main.py              # FastAPI 实例创建 + 路由注册
│   ├── core/
│   │   ├── config.py        # 配置(Pydantic Settings)
│   │   └── security.py      # 认证/密码工具
│   ├── models/              # SQLAlchemy 模型
│   ├── schemas/             # Pydantic 请求/响应模型
│   ├── api/
│   │   ├── __init__.py
│   │   ├── deps.py          # 共享依赖项
│   │   └── v1/
│   │       ├── __init__.py
│   │       ├── router.py    # 聚合子路由
│   │       └── endpoints/
│   │           ├── users.py
│   │           └── products.py
│   └── services/            # 业务逻辑层
├── alembic/                 # 数据库迁移
├── tests/
├── requirements.txt
└── .env

思考题

  1. FastAPI 的异步是否意味着所有代码都必须是 async def?什么场景同步函数也能获益?
  2. Pydantic v2 的 model_validatorfield_validator 分别何时使用?
  3. FastAPI 的依赖注入与类的构造函数注入有何异同?为什么它更适合 Web 场景?
  4. 将 Flask 项目迁移到 FastAPI 时,哪些部分最需要重写?

FastAPI 进阶实战 —— WebSocket、后台任务与部署

本章目标

  • 实现 WebSocket 实时通信
  • 使用 BackgroundTasks 处理异步后处理
  • FastAPI + Celery 重型任务
  • Docker + Nginx + Uvicorn 生产部署

1. WebSocket 实时通信

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List, Dict
import json

app = FastAPI()


# 连接管理器
class ConnectionManager:
    def __init__(self):
        self.active_connections: Dict[str, List[WebSocket]] = {}

    async def connect(self, room: str, websocket: WebSocket):
        await websocket.accept()
        if room not in self.active_connections:
            self.active_connections[room] = []
        self.active_connections[room].append(websocket)
        await self.broadcast(room, {"type": "system", "message": f"用户加入房间 {room}"})

    def disconnect(self, room: str, websocket: WebSocket):
        if room in self.active_connections:
            self.active_connections[room].remove(websocket)
            if not self.active_connections[room]:
                del self.active_connections[room]

    async def broadcast(self, room: str, message: dict):
        if room in self.active_connections:
            for connection in self.active_connections[room]:
                await connection.send_json(message)

    async def send_personal(self, message: dict, websocket: WebSocket):
        await websocket.send_json(message)


manager = ConnectionManager()


@app.get("/")
async def get_chat_page():
    """返回简易聊天页面 HTML"""
    return """
    <!DOCTYPE html>
    <html>
    <head><title>FastAPI Chat</title></head>
    <body>
        <h2>💬 实时聊天</h2>
        <input id="room" placeholder="房间名" value="general"><br>
        <input id="name" placeholder="你的昵称" value="Vibe"><br>
        <textarea id="messages" rows="15" cols="50" readonly></textarea><br>
        <input id="input" placeholder="输入消息..." size="40">
        <button onclick="send()">发送</button>
        <script>
            let ws;
            function connect() {
                const room = document.getElementById('room').value;
                ws = new WebSocket(`ws://localhost:8000/ws/${room}?name=${document.getElementById('name').value}`);
                ws.onmessage = (e) => {
                    const data = JSON.parse(e.data);
                    document.getElementById('messages').value +=
                        `[${data.type}] ${data.name || ''}: ${data.message}\\n`;
                };
                ws.onclose = () => document.getElementById('messages').value += '❌ 连接断开\\n';
            }
            function send() {
                const msg = document.getElementById('input').value;
                ws.send(msg);
                document.getElementById('input').value = '';
            }
            connect();
        </script>
    </body>
    </html>
    """


@app.websocket("/ws/{room}")
async def websocket_endpoint(
    websocket: WebSocket,
    room: str,
    name: str = "Anonymous",
):
    await manager.connect(room, websocket)
    try:
        while True:
            data = await websocket.receive_text()
            await manager.broadcast(room, {
                "type": "chat",
                "name": name,
                "message": data,
            })
    except WebSocketDisconnect:
        manager.disconnect(room, websocket)
        await manager.broadcast(room, {
            "type": "system",
            "message": f"{name} 离开了房间",
        })

2. BackgroundTasks —— 轻量后处理

from fastapi import BackgroundTasks


def send_email_notification(email: str, message: str):
    """模拟发送邮件(后台执行)"""
    import time
    time.sleep(3)  # 模拟耗时
    print(f"✅ 已发送邮件到 {email}: {message}")


def generate_thumbnail(image_path: str):
    """模拟生成缩略图"""
    import time
    time.sleep(2)
    print(f"✅ 缩略图已生成: {image_path}")


@app.post("/register")
async def register(email: str, background_tasks: BackgroundTasks):
    # 注册逻辑...
    background_tasks.add_task(send_email_notification, email, "欢迎注册!")
    return {"message": "注册成功,欢迎邮件将在后台发送"}


@app.post("/upload-image")
async def upload_image(background_tasks: BackgroundTasks):
    # 文件保存逻辑...
    background_tasks.add_task(generate_thumbnail, "/uploads/photo.jpg")
    return {"message": "上传成功,缩略图正在后台生成"}

BackgroundTasks vs Celery

维度 BackgroundTasks Celery
复杂度 零配置 需 Redis/RabbitMQ
可靠性 进程重启丢失 持久化到队列
重试机制 ❌ 无 ✅ 内置重试
进度追踪 ❌ 无 ✅ AsyncResult
适用场景 轻量后处理 重型长时间任务

3. Docker 生产部署

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# 安装系统依赖
RUN apt-get update &;& apt-get install -y --no-install-recommends \
    curl \
    &;& rm -rf /var/lib/apt/lists/*

# 安装 Python 依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple

COPY . .

EXPOSE 8000

# 使用 Uvicorn 多 worker
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
# docker-compose.yml
version: "3.8"

services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql+asyncpg://user:pass@db:5432/fastapi
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      db:
        condition: service_healthy
      redis:
        condition: service_started

  db:
    image: postgres:15-alpine
    environment:
      POSTGRES_USER: user
      POSTGRES_PASSWORD: pass
      POSTGRES_DB: fastapi
    volumes:
      - pgdata:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U user -d fastapi"]
      interval: 5s
      timeout: 5s
      retries: 5

  redis:
    image: redis:7-alpine

  celery:
    build: .
    command: celery -A app.tasks worker --loglevel=info
    environment:
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - redis

volumes:
  pgdata:

4. Nginx 反向代理配置

upstream fastapi {
    server 127.0.0.1:8000;
}

server {
    listen 443 ssl http2;
    server_name api.example.com;

    # ... SSL 配置 ...

    location / {
        proxy_pass http://fastapi;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;

        # WebSocket 支持
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
    }
}

5. 健康检查与优雅关闭

from contextlib import asynccontextmanager
from fastapi import FastAPI


@asynccontextmanager
async def lifespan(app: FastAPI):
    """应用启动/关闭时的处理"""
    # 启动:初始化连接池
    print("🚀 应用启动中...")
    # await init_db_pool()
    yield
    # 关闭:释放资源
    print("👋 应用正在关闭...")
    # await close_db_pool()


app = FastAPI(lifespan=lifespan)


@app.get("/health")
async def health():
    """Kubernetes / Docker 健康检查端点"""
    return {
        "status": "healthy",
        "version": "2.0.0",
    }

思考题

  1. WebSocket 与 HTTP/2 Server-Sent Events (SSE) 有何异同?什么场景选哪个?
  2. BackgroundTasks 在 uvicorn worker 重启时会丢失,如何保障任务可靠性?
  3. FastAPI + Uvicorn 的多 worker 模式下 WebSocket 有什么限制?如何解决?
  4. lifespan vs 全局 @app.on_event("startup") 有何区别?FastAPI 推荐哪种?

参考资料

  1. [1] Sebastián Ramírez. FastAPI 官方文档. 2024.
  2. [2] François Voron. Building Data Science Applications with FastAPI. 2021.
  3. [3] Bill Lubanovic. FastAPI: Modern Python Web Development. 2023.
  4. [4] Pydantic Team. Pydantic v2 官方文档. 2024.