02-进阶实战-WebSocket-后台任务-部署

知识库
知识库文档
/tech-stacks/fastapi/tutorial/02-进阶实战-WebSocket-后台任务-部署.md

文档

FastAPI 进阶实战 —— WebSocket、后台任务与部署

本章目标

  • 实现 WebSocket 实时通信
  • 使用 BackgroundTasks 处理异步后处理
  • FastAPI + Celery 重型任务
  • Docker + Nginx + Uvicorn 生产部署

1. WebSocket 实时通信

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List, Dict
import json

app = FastAPI()


# 连接管理器
class ConnectionManager:
    def __init__(self):
        self.active_connections: Dict[str, List[WebSocket]] = {}

    async def connect(self, room: str, websocket: WebSocket):
        await websocket.accept()
        if room not in self.active_connections:
            self.active_connections[room] = []
        self.active_connections[room].append(websocket)
        await self.broadcast(room, {"type": "system", "message": f"用户加入房间 {room}"})

    def disconnect(self, room: str, websocket: WebSocket):
        if room in self.active_connections:
            self.active_connections[room].remove(websocket)
            if not self.active_connections[room]:
                del self.active_connections[room]

    async def broadcast(self, room: str, message: dict):
        if room in self.active_connections:
            for connection in self.active_connections[room]:
                await connection.send_json(message)

    async def send_personal(self, message: dict, websocket: WebSocket):
        await websocket.send_json(message)


manager = ConnectionManager()


@app.get("/")
async def get_chat_page():
    """返回简易聊天页面 HTML"""
    return """
    <!DOCTYPE html>
    <html>
    <head><title>FastAPI Chat</title></head>
    <body>
        <h2>💬 实时聊天</h2>
        <input id="room" placeholder="房间名" value="general"><br>
        <input id="name" placeholder="你的昵称" value="Vibe"><br>
        <textarea id="messages" rows="15" cols="50" readonly></textarea><br>
        <input id="input" placeholder="输入消息..." size="40">
        <button onclick="send()">发送</button>
        <script>
            let ws;
            function connect() {
                const room = document.getElementById('room').value;
                ws = new WebSocket(`ws://localhost:8000/ws/${room}?name=${document.getElementById('name').value}`);
                ws.onmessage = (e) => {
                    const data = JSON.parse(e.data);
                    document.getElementById('messages').value +=
                        `[${data.type}] ${data.name || ''}: ${data.message}\\n`;
                };
                ws.onclose = () => document.getElementById('messages').value += '❌ 连接断开\\n';
            }
            function send() {
                const msg = document.getElementById('input').value;
                ws.send(msg);
                document.getElementById('input').value = '';
            }
            connect();
        </script>
    </body>
    </html>
    """


@app.websocket("/ws/{room}")
async def websocket_endpoint(
    websocket: WebSocket,
    room: str,
    name: str = "Anonymous",
):
    await manager.connect(room, websocket)
    try:
        while True:
            data = await websocket.receive_text()
            await manager.broadcast(room, {
                "type": "chat",
                "name": name,
                "message": data,
            })
    except WebSocketDisconnect:
        manager.disconnect(room, websocket)
        await manager.broadcast(room, {
            "type": "system",
            "message": f"{name} 离开了房间",
        })

2. BackgroundTasks —— 轻量后处理

from fastapi import BackgroundTasks


def send_email_notification(email: str, message: str):
    """模拟发送邮件(后台执行)"""
    import time
    time.sleep(3)  # 模拟耗时
    print(f"✅ 已发送邮件到 {email}: {message}")


def generate_thumbnail(image_path: str):
    """模拟生成缩略图"""
    import time
    time.sleep(2)
    print(f"✅ 缩略图已生成: {image_path}")


@app.post("/register")
async def register(email: str, background_tasks: BackgroundTasks):
    # 注册逻辑...
    background_tasks.add_task(send_email_notification, email, "欢迎注册!")
    return {"message": "注册成功,欢迎邮件将在后台发送"}


@app.post("/upload-image")
async def upload_image(background_tasks: BackgroundTasks):
    # 文件保存逻辑...
    background_tasks.add_task(generate_thumbnail, "/uploads/photo.jpg")
    return {"message": "上传成功,缩略图正在后台生成"}

BackgroundTasks vs Celery

维度 BackgroundTasks Celery
复杂度 零配置 需 Redis/RabbitMQ
可靠性 进程重启丢失 持久化到队列
重试机制 ❌ 无 ✅ 内置重试
进度追踪 ❌ 无 ✅ AsyncResult
适用场景 轻量后处理 重型长时间任务

3. Docker 生产部署

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# 安装系统依赖
RUN apt-get update &;& apt-get install -y --no-install-recommends \
    curl \
    &;& rm -rf /var/lib/apt/lists/*

# 安装 Python 依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple

COPY . .

EXPOSE 8000

# 使用 Uvicorn 多 worker
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
# docker-compose.yml
version: "3.8"

services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql+asyncpg://user:pass@db:5432/fastapi
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      db:
        condition: service_healthy
      redis:
        condition: service_started

  db:
    image: postgres:15-alpine
    environment:
      POSTGRES_USER: user
      POSTGRES_PASSWORD: pass
      POSTGRES_DB: fastapi
    volumes:
      - pgdata:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U user -d fastapi"]
      interval: 5s
      timeout: 5s
      retries: 5

  redis:
    image: redis:7-alpine

  celery:
    build: .
    command: celery -A app.tasks worker --loglevel=info
    environment:
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - redis

volumes:
  pgdata:

4. Nginx 反向代理配置

upstream fastapi {
    server 127.0.0.1:8000;
}

server {
    listen 443 ssl http2;
    server_name api.example.com;

    # ... SSL 配置 ...

    location / {
        proxy_pass http://fastapi;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;

        # WebSocket 支持
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
    }
}

5. 健康检查与优雅关闭

from contextlib import asynccontextmanager
from fastapi import FastAPI


@asynccontextmanager
async def lifespan(app: FastAPI):
    """应用启动/关闭时的处理"""
    # 启动:初始化连接池
    print("🚀 应用启动中...")
    # await init_db_pool()
    yield
    # 关闭:释放资源
    print("👋 应用正在关闭...")
    # await close_db_pool()


app = FastAPI(lifespan=lifespan)


@app.get("/health")
async def health():
    """Kubernetes / Docker 健康检查端点"""
    return {
        "status": "healthy",
        "version": "2.0.0",
    }

思考题

  1. WebSocket 与 HTTP/2 Server-Sent Events (SSE) 有何异同?什么场景选哪个?
  2. BackgroundTasks 在 uvicorn worker 重启时会丢失,如何保障任务可靠性?
  3. FastAPI + Uvicorn 的多 worker 模式下 WebSocket 有什么限制?如何解决?
  4. lifespan vs 全局 @app.on_event("startup") 有何区别?FastAPI 推荐哪种?

信息

路径
/tech-stacks/fastapi/tutorial/02-进阶实战-WebSocket-后台任务-部署.md
更新时间
2026/5/30