02-进阶实战-Celery异步任务与部署

知识库
知识库文档
/tech-stacks/flask/tutorial/02-进阶实战-Celery异步任务与部署.md

文档

Flask 进阶实战 —— 异步任务、Docker 部署与性能优化

本章目标

  • 掌握 Celery + Redis 异步任务队列
  • 使用 Docker 容器化 Flask 应用
  • 了解 Gunicorn 生产部署
  • 常见性能优化策略

1. Celery 异步任务

1.1 为什么需要异步任务?

Web 请求应该在毫秒级完成。以下场景必须异步处理:

  • 发送邮件验证码(可能耗时 2~5 秒)
  • 生成报表/PDF(可能耗时 10 秒+)
  • 图像/视频处理
  • 调用第三方 API(超时风险)

1.2 架构概览

用户请求 → Flask (快速返回 202 Accepted)
                │
                ↓
         Redis/RabbitMQ (消息队列/代理)
                │
                ↓
         Celery Worker (后台异步执行)
                │
                ↓
         Redis/DB (结果存储)
                │
                ↓
         用户轮询/WebSocket 获取结果

1.3 配置与使用

# tasks.py
from celery import Celery

celery = Celery(
    "my_flask_tasks",
    broker="redis://localhost:6379/0",       # 消息代理
    backend="redis://localhost:6379/1",      # 结果后端
)

# 配置序列化
celery.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_expires=3600,  # 结果过期时间(秒)
)


@celery.task(bind=True, max_retries=3)
def send_welcome_email(self, user_email, username):
    """发送欢迎邮件(带重试机制)"""
    try:
        # 模拟发送邮件
        import smtplib
        import time
        time.sleep(2)  # 模拟耗时操作
        print(f"✅ 已发送欢迎邮件至 {user_email}")
        return {"status": "ok", "email": user_email}
    except Exception as exc:
        print(f"❌ 发送失败,将重试: {exc}")
        raise self.retry(exc=exc, countdown=60)  # 60秒后重试


@celery.task
def generate_report(report_id):
    """生成报表——耗时任务"""
    import time
    time.sleep(5)  # 模拟复杂计算
    # ... 实际报表生成逻辑
    return {"report_id": report_id, "status": "done"}
# app/routes.py 中使用异步任务
from tasks import send_welcome_email, generate_report

@main_bp.route("/register", methods=["POST"])
def register():
    username = request.form["username"]
    email = request.form["email"]

    # 创建用户... (省略)
    
    # 异步发送欢迎邮件
    task = send_welcome_email.delay(email, username)
    
    return jsonify({
        "message": "注册成功,欢迎邮件正在发送中",
        "task_id": task.id,
    }), 202


@main_bp.route("/task/<task_id>")
def get_task_status(task_id):
    """查询异步任务状态"""
    from celery.result import AsyncResult
    result = AsyncResult(task_id, app=celery)
    
    if result.state == "PENDING":
        response = {"state": "PENDING", "progress": "等待执行..."}
    elif result.state == "SUCCESS":
        response = {"state": "SUCCESS", "result": result.get()}
    elif result.state == "FAILURE":
        response = {"state": "FAILURE", "error": str(result.info)}
    else:
        response = {"state": result.state}
    
    return jsonify(response)

1.4 启动命令

# 需要先安装并启动 Redis
sudo apt install redis-server  # Linux
brew install redis              # macOS

# 启动 Celery Worker
celery -A tasks.celery worker --loglevel=info --concurrency=4

# 启动 Celery Beat(定时任务调度,可选)
celery -A tasks.celery beat --loglevel=info

# 同时启动 Worker 和 Beat
celery -A tasks.celery worker -B --loglevel=info

2. Docker 容器化部署

# Dockerfile
FROM python:3.11-slim

# 设置工作目录
WORKDIR /app

# 安装系统依赖
RUN apt-get update &;& apt-get install -y --no-install-recommends \
    gcc \
    &;& rm -rf /var/lib/apt/lists/*

# 复制并安装 Python 依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple

# 复制应用代码
COPY . .

# 创建非 root 用户
RUN useradd -m flaskuser &;& chown -R flaskuser:flaskuser /app
USER flaskuser

# 暴露端口
EXPOSE 8000

# 生产环境使用 Gunicorn 启动
CMD ["gunicorn", "-w", "4", "-b", "0.0.0.0:8000", "run:app"]
# docker-compose.yml
version: "3.8"

services:
  web:
    build: .
    ports:
      - "8000:8000"
    environment:
      - FLASK_ENV=production
      - DATABASE_URL=postgresql://user:pass@db:5432/flaskdb
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - db
      - redis

  db:
    image: postgres:15-alpine
    environment:
      POSTGRES_USER: user
      POSTGRES_PASSWORD: pass
      POSTGRES_DB: flaskdb
    volumes:
      - postgres_data:/var/lib/postgresql/data

  redis:
    image: redis:7-alpine

  celery:
    build: .
    command: celery -A tasks.celery worker --loglevel=info
    environment:
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - redis

volumes:
  postgres_data:

3. Gunicorn 生产部署

# 安装
pip install gunicorn

# 启动(4 个 worker 进程)
gunicorn -w 4 -b 0.0.0.0:8000 "run:create_app()"

# 带参数详解
gunicorn \
  -w 4 \                    # worker 数量:通常 CPU 核数 × 2 + 1
  -b 0.0.0.0:8000 \         # 绑定地址
  -k gevent \               # Worker 类型:gevent(异步IO,适合IO密集)
  --max-requests 10000 \    # 每个 worker 处理 10000 请求后重启(防内存泄漏)
  --max-requests-jitter 500 \  # 随机抖动避免同时重启
  --access-logfile - \      # 访问日志输出到 stdout
  --error-logfile - \       # 错误日志输出到 stderr
  --log-level info \
  "run:create_app()"

Worker 类型选择

Worker 类型 适用场景
sync(默认) CPU 密集型、低并发
gevent IO 密集型、高并发(数据库查询、API 调用)
gthread 多线程,共享内存,适合中等并发
uvicorn.workers.UvicornWorker ASGI,FastAPI 部署(Flask 2.0+ 支持 async)

4. 性能优化清单

4.1 数据库层面

# 使用连接池(Flask-SQLAlchemy 默认已具备)
app.config["SQLALCHEMY_ENGINE_OPTIONS"] = {
    "pool_size": 10,           # 连接池大小
    "pool_recycle": 3600,      # 连接回收时间
    "pool_pre_ping": True,     # 使用前检查连接有效性
}

# N+1 查询优化 —— 使用 joinedload
# ❌ 坏:每条 Post 触发一次 User 查询
posts = Post.query.all()
# ✅ 好:一次 JOIN 查询全部加载
from sqlalchemy.orm import joinedload
posts = Post.query.options(joinedload(Post.author)).all()

4.2 缓存策略

# Flask-Caching
from flask_caching import Cache
cache = Cache(config={"CACHE_TYPE": "RedisCache", "CACHE_REDIS_URL": "redis://localhost:6379/2"})

@main_bp.route("/hot-posts")
@cache.cached(timeout=300)  # 缓存 5 分钟
def hot_posts():
    return jsonify(get_expensive_data())

4.3 静态文件优化

# Nginx 配置:让 Nginx 直接服务静态文件,不经过 Flask
location /static/ {
    alias /app/static/;
    expires 30d;          # 设置缓存
    add_header Cache-Control "public, immutable";
}

location / {
    proxy_pass http://127.0.0.1:8000;  # 动态请求代理到 Gunicorn
}

思考题

  1. Celery 的 brokerbackend 各起什么作用?可以用同一个 Redis 实例吗?
  2. 为什么生产环境不推荐用 flask run,而要用 Gunicorn?
  3. Docker Compose 中 depends_on 是否保证服务已就绪?如何实现真正的启动顺序控制?
  4. 什么场景适合用 gevent worker 而非默认 sync worker?

信息

路径
/tech-stacks/flask/tutorial/02-进阶实战-Celery异步任务与部署.md
更新时间
2026/5/30