FastAPI 进阶实战 —— WebSocket、后台任务与部署
本章目标
- 实现 WebSocket 实时通信
- 使用 BackgroundTasks 处理异步后处理
- FastAPI + Celery 重型任务
- Docker + Nginx + Uvicorn 生产部署
1. WebSocket 实时通信
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List, Dict
import json
app = FastAPI()
# 连接管理器
class ConnectionManager:
def __init__(self):
self.active_connections: Dict[str, List[WebSocket]] = {}
async def connect(self, room: str, websocket: WebSocket):
await websocket.accept()
if room not in self.active_connections:
self.active_connections[room] = []
self.active_connections[room].append(websocket)
await self.broadcast(room, {"type": "system", "message": f"用户加入房间 {room}"})
def disconnect(self, room: str, websocket: WebSocket):
if room in self.active_connections:
self.active_connections[room].remove(websocket)
if not self.active_connections[room]:
del self.active_connections[room]
async def broadcast(self, room: str, message: dict):
if room in self.active_connections:
for connection in self.active_connections[room]:
await connection.send_json(message)
async def send_personal(self, message: dict, websocket: WebSocket):
await websocket.send_json(message)
manager = ConnectionManager()
@app.get("/")
async def get_chat_page():
"""返回简易聊天页面 HTML"""
return """
<!DOCTYPE html>
<html>
<head><title>FastAPI Chat</title></head>
<body>
<h2>💬 实时聊天</h2>
<input id="room" placeholder="房间名" value="general"><br>
<input id="name" placeholder="你的昵称" value="Vibe"><br>
<textarea id="messages" rows="15" cols="50" readonly></textarea><br>
<input id="input" placeholder="输入消息..." size="40">
<button onclick="send()">发送</button>
<script>
let ws;
function connect() {
const room = document.getElementById('room').value;
ws = new WebSocket(`ws://localhost:8000/ws/${room}?name=${document.getElementById('name').value}`);
ws.onmessage = (e) => {
const data = JSON.parse(e.data);
document.getElementById('messages').value +=
`[${data.type}] ${data.name || ''}: ${data.message}\\n`;
};
ws.onclose = () => document.getElementById('messages').value += '❌ 连接断开\\n';
}
function send() {
const msg = document.getElementById('input').value;
ws.send(msg);
document.getElementById('input').value = '';
}
connect();
</script>
</body>
</html>
"""
@app.websocket("/ws/{room}")
async def websocket_endpoint(
websocket: WebSocket,
room: str,
name: str = "Anonymous",
):
await manager.connect(room, websocket)
try:
while True:
data = await websocket.receive_text()
await manager.broadcast(room, {
"type": "chat",
"name": name,
"message": data,
})
except WebSocketDisconnect:
manager.disconnect(room, websocket)
await manager.broadcast(room, {
"type": "system",
"message": f"{name} 离开了房间",
})
2. BackgroundTasks —— 轻量后处理
from fastapi import BackgroundTasks
def send_email_notification(email: str, message: str):
"""模拟发送邮件(后台执行)"""
import time
time.sleep(3) # 模拟耗时
print(f"✅ 已发送邮件到 {email}: {message}")
def generate_thumbnail(image_path: str):
"""模拟生成缩略图"""
import time
time.sleep(2)
print(f"✅ 缩略图已生成: {image_path}")
@app.post("/register")
async def register(email: str, background_tasks: BackgroundTasks):
# 注册逻辑...
background_tasks.add_task(send_email_notification, email, "欢迎注册!")
return {"message": "注册成功,欢迎邮件将在后台发送"}
@app.post("/upload-image")
async def upload_image(background_tasks: BackgroundTasks):
# 文件保存逻辑...
background_tasks.add_task(generate_thumbnail, "/uploads/photo.jpg")
return {"message": "上传成功,缩略图正在后台生成"}
BackgroundTasks vs Celery
| 维度 |
BackgroundTasks |
Celery |
| 复杂度 |
零配置 |
需 Redis/RabbitMQ |
| 可靠性 |
进程重启丢失 |
持久化到队列 |
| 重试机制 |
❌ 无 |
✅ 内置重试 |
| 进度追踪 |
❌ 无 |
✅ AsyncResult |
| 适用场景 |
轻量后处理 |
重型长时间任务 |
3. Docker 生产部署
FROM python:3.11-slim
WORKDIR /app
RUN apt-get update &
curl \
&
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt -i https:
COPY . .
EXPOSE 8000
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
version: "3.8"
services:
api:
build: .
ports:
- "8000:8000"
environment:
- DATABASE_URL=postgresql+asyncpg:
- REDIS_URL=redis:
depends_on:
db:
condition: service_healthy
redis:
condition: service_started
db:
image: postgres:15-alpine
environment:
POSTGRES_USER: user
POSTGRES_PASSWORD: pass
POSTGRES_DB: fastapi
volumes:
- pgdata:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U user -d fastapi"]
interval: 5s
timeout: 5s
retries: 5
redis:
image: redis:7-alpine
celery:
build: .
command: celery -A app.tasks worker --loglevel=info
environment:
- REDIS_URL=redis:
depends_on:
- redis
volumes:
pgdata:
4. Nginx 反向代理配置
upstream fastapi {
server 127.0.0.1:8000;
}
server {
listen 443 ssl http2;
server_name api.example.com;
# ... SSL 配置 ...
location / {
proxy_pass http:
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# WebSocket 支持
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
}
5. 健康检查与优雅关闭
from contextlib import asynccontextmanager
from fastapi import FastAPI
@asynccontextmanager
async def lifespan(app: FastAPI):
"""应用启动/关闭时的处理"""
# 启动:初始化连接池
print("🚀 应用启动中...")
# await init_db_pool()
yield
# 关闭:释放资源
print("👋 应用正在关闭...")
# await close_db_pool()
app = FastAPI(lifespan=lifespan)
@app.get("/health")
async def health():
"""Kubernetes / Docker 健康检查端点"""
return {
"status": "healthy",
"version": "2.0.0",
}
思考题
- WebSocket 与 HTTP/2 Server-Sent Events (SSE) 有何异同?什么场景选哪个?
- BackgroundTasks 在 uvicorn worker 重启时会丢失,如何保障任务可靠性?
- FastAPI + Uvicorn 的多 worker 模式下 WebSocket 有什么限制?如何解决?
- lifespan vs 全局
@app.on_event("startup") 有何区别?FastAPI 推荐哪种?