文档
Flask 进阶实战 —— 异步任务、Docker 部署与性能优化
本章目标
- 掌握 Celery + Redis 异步任务队列
- 使用 Docker 容器化 Flask 应用
- 了解 Gunicorn 生产部署
- 常见性能优化策略
1. Celery 异步任务
1.1 为什么需要异步任务?
Web 请求应该在毫秒级完成。以下场景必须异步处理:
- 发送邮件验证码(可能耗时 2~5 秒)
- 生成报表/PDF(可能耗时 10 秒+)
- 图像/视频处理
- 调用第三方 API(超时风险)
1.2 架构概览
用户请求 → Flask (快速返回 202 Accepted)
│
↓
Redis/RabbitMQ (消息队列/代理)
│
↓
Celery Worker (后台异步执行)
│
↓
Redis/DB (结果存储)
│
↓
用户轮询/WebSocket 获取结果
1.3 配置与使用
# tasks.py
from celery import Celery
celery = Celery(
"my_flask_tasks",
broker="redis://localhost:6379/0", # 消息代理
backend="redis://localhost:6379/1", # 结果后端
)
# 配置序列化
celery.conf.update(
task_serializer="json",
accept_content=["json"],
result_expires=3600, # 结果过期时间(秒)
)
@celery.task(bind=True, max_retries=3)
def send_welcome_email(self, user_email, username):
"""发送欢迎邮件(带重试机制)"""
try:
# 模拟发送邮件
import smtplib
import time
time.sleep(2) # 模拟耗时操作
print(f"✅ 已发送欢迎邮件至 {user_email}")
return {"status": "ok", "email": user_email}
except Exception as exc:
print(f"❌ 发送失败,将重试: {exc}")
raise self.retry(exc=exc, countdown=60) # 60秒后重试
@celery.task
def generate_report(report_id):
"""生成报表——耗时任务"""
import time
time.sleep(5) # 模拟复杂计算
# ... 实际报表生成逻辑
return {"report_id": report_id, "status": "done"}
# app/routes.py 中使用异步任务
from tasks import send_welcome_email, generate_report
@main_bp.route("/register", methods=["POST"])
def register():
username = request.form["username"]
email = request.form["email"]
# 创建用户... (省略)
# 异步发送欢迎邮件
task = send_welcome_email.delay(email, username)
return jsonify({
"message": "注册成功,欢迎邮件正在发送中",
"task_id": task.id,
}), 202
@main_bp.route("/task/<task_id>")
def get_task_status(task_id):
"""查询异步任务状态"""
from celery.result import AsyncResult
result = AsyncResult(task_id, app=celery)
if result.state == "PENDING":
response = {"state": "PENDING", "progress": "等待执行..."}
elif result.state == "SUCCESS":
response = {"state": "SUCCESS", "result": result.get()}
elif result.state == "FAILURE":
response = {"state": "FAILURE", "error": str(result.info)}
else:
response = {"state": result.state}
return jsonify(response)
1.4 启动命令
# 需要先安装并启动 Redis
sudo apt install redis-server # Linux
brew install redis # macOS
# 启动 Celery Worker
celery -A tasks.celery worker --loglevel=info --concurrency=4
# 启动 Celery Beat(定时任务调度,可选)
celery -A tasks.celery beat --loglevel=info
# 同时启动 Worker 和 Beat
celery -A tasks.celery worker -B --loglevel=info
2. Docker 容器化部署
# Dockerfile
FROM python:3.11-slim
# 设置工作目录
WORKDIR /app
# 安装系统依赖
RUN apt-get update && apt-get install -y --no-install-recommends \
gcc \
&& rm -rf /var/lib/apt/lists/*
# 复制并安装 Python 依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple
# 复制应用代码
COPY . .
# 创建非 root 用户
RUN useradd -m flaskuser && chown -R flaskuser:flaskuser /app
USER flaskuser
# 暴露端口
EXPOSE 8000
# 生产环境使用 Gunicorn 启动
CMD ["gunicorn", "-w", "4", "-b", "0.0.0.0:8000", "run:app"]
# docker-compose.yml
version: "3.8"
services:
web:
build: .
ports:
- "8000:8000"
environment:
- FLASK_ENV=production
- DATABASE_URL=postgresql://user:pass@db:5432/flaskdb
- REDIS_URL=redis://redis:6379/0
depends_on:
- db
- redis
db:
image: postgres:15-alpine
environment:
POSTGRES_USER: user
POSTGRES_PASSWORD: pass
POSTGRES_DB: flaskdb
volumes:
- postgres_data:/var/lib/postgresql/data
redis:
image: redis:7-alpine
celery:
build: .
command: celery -A tasks.celery worker --loglevel=info
environment:
- REDIS_URL=redis://redis:6379/0
depends_on:
- redis
volumes:
postgres_data:
3. Gunicorn 生产部署
# 安装
pip install gunicorn
# 启动(4 个 worker 进程)
gunicorn -w 4 -b 0.0.0.0:8000 "run:create_app()"
# 带参数详解
gunicorn \
-w 4 \ # worker 数量:通常 CPU 核数 × 2 + 1
-b 0.0.0.0:8000 \ # 绑定地址
-k gevent \ # Worker 类型:gevent(异步IO,适合IO密集)
--max-requests 10000 \ # 每个 worker 处理 10000 请求后重启(防内存泄漏)
--max-requests-jitter 500 \ # 随机抖动避免同时重启
--access-logfile - \ # 访问日志输出到 stdout
--error-logfile - \ # 错误日志输出到 stderr
--log-level info \
"run:create_app()"
Worker 类型选择
| Worker 类型 | 适用场景 |
|---|---|
sync(默认) |
CPU 密集型、低并发 |
gevent |
IO 密集型、高并发(数据库查询、API 调用) |
gthread |
多线程,共享内存,适合中等并发 |
uvicorn.workers.UvicornWorker |
ASGI,FastAPI 部署(Flask 2.0+ 支持 async) |
4. 性能优化清单
4.1 数据库层面
# 使用连接池(Flask-SQLAlchemy 默认已具备)
app.config["SQLALCHEMY_ENGINE_OPTIONS"] = {
"pool_size": 10, # 连接池大小
"pool_recycle": 3600, # 连接回收时间
"pool_pre_ping": True, # 使用前检查连接有效性
}
# N+1 查询优化 —— 使用 joinedload
# ❌ 坏:每条 Post 触发一次 User 查询
posts = Post.query.all()
# ✅ 好:一次 JOIN 查询全部加载
from sqlalchemy.orm import joinedload
posts = Post.query.options(joinedload(Post.author)).all()
4.2 缓存策略
# Flask-Caching
from flask_caching import Cache
cache = Cache(config={"CACHE_TYPE": "RedisCache", "CACHE_REDIS_URL": "redis://localhost:6379/2"})
@main_bp.route("/hot-posts")
@cache.cached(timeout=300) # 缓存 5 分钟
def hot_posts():
return jsonify(get_expensive_data())
4.3 静态文件优化
# Nginx 配置:让 Nginx 直接服务静态文件,不经过 Flask
location /static/ {
alias /app/static/;
expires 30d; # 设置缓存
add_header Cache-Control "public, immutable";
}
location / {
proxy_pass http://127.0.0.1:8000; # 动态请求代理到 Gunicorn
}
思考题
- Celery 的
broker和backend各起什么作用?可以用同一个 Redis 实例吗? - 为什么生产环境不推荐用
flask run,而要用 Gunicorn? - Docker Compose 中
depends_on是否保证服务已就绪?如何实现真正的启动顺序控制? - 什么场景适合用
geventworker 而非默认syncworker?