Celery

技术栈
工具链
celerypython异步任务消息队列分布式定时任务

概览

Celery

Celery 是 Python 最流行的分布式任务队列框架,由 Ask Solem 于 2009 年创建。它让开发者可以将耗时操作从 Web 请求中剥离出来异步执行。

解决什么问题

  • 将邮件发送、图像处理、报告生成等耗时任务从请求线程中解耦
  • 实现定时任务(类似 cron)和周期性任务
  • 分布式执行任务,轻松横向扩展 Worker

关键特性

  • 支持 RabbitMQ、Redis、Amazon SQS 等多种消息代理
  • 内置 Flower 监控面板
  • Celery Beat 实现定时调度
  • 任务链(chain)、分组(group)、和弦(chord)等编排原语
  • 自动重试机制和幂等性保障

安装

环境准备

  • 操作系统: Linux / macOS / Windows(Windows 建议 WSL2)
  • Python 版本: 3.8 及以上(推荐 3.10+)
  • 消息代理: Redis(推荐)或 RabbitMQ
  • 结果后端(可选): Redis / PostgreSQL / 等

安装命令

# 基础安装
pip install celery

# 安装 Celery + Redis 全套
pip install celery redis

# 安装监控面板 Flower
pip install celery flower

# 清华镜像
pip install celery -i https://pypi.tuna.tsinghua.edu.cn/simple

启动 Redis(Docker 方式,最简单)

docker run -d --name redis-celery -p 6379:6379 redis:7-alpine

常见安装问题

Q: Windows 上 Celery 4.x 运行报错
Windows 不支持 fork,需使用 --pool=solo

celery -A app worker --pool=solo --loglevel=info

或升级到 Celery 5.x + WSL2。

Q: Redis 连接失败 Error 111
确认 Redis 已启动:redis-cli ping 应返回 PONG

Q: DLL load failed on Windows(pycurl 相关)

# 不安装默认的 pycurl 监控依赖
pip install celery[redis]

Q: 导入模块时报 ModuleNotFoundError
确认项目结构和 PYTHONPATH

export PYTHONPATH="${PYTHONPATH}:$(pwd)"

示例

Celery Hello World:异步发送邮件

目标

将耗时的邮件发送操作从 Web 请求中异步化,用户无需等待邮件发送完成就能收到 HTTP 响应。

完整代码

# celery_app.py — Celery 配置
from celery import Celery

app = Celery(
    "tasks",
    broker="redis://localhost:6379/0",      # 消息代理
    backend="redis://localhost:6379/0",     # 结果后端(可选)
)

app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="Asia/Shanghai",
    enable_utc=True,
)


# tasks.py — 任务定义
import time
from celery_app import app

@app.task(bind=True, max_retries=3, default_retry_delay=60)
def send_welcome_email(self, user_email: str, username: str):
    """模拟发送欢迎邮件(耗时操作)"""
    try:
        print(f"[开始] 发送邮件到 {user_email}")
        time.sleep(3)  # 模拟网络 IO
        print(f"[完成] {username} 的欢迎邮件已发送至 {user_email}")
        return f"ok: {user_email}"
    except Exception as exc:
        print(f"[失败] 重试中... ({self.request.retries + 1}/3)")
        raise self.retry(exc=exc)


@app.task
def generate_report(report_id: int):
    """模拟生成报表(CPU 密集型)"""
    print(f"[开始] 生成报表 #{report_id}")
    time.sleep(5)
    print(f"[完成] 报表 #{report_id} 已生成")
    return f"report_{report_id}.pdf"
# main.py — 调用任务
from tasks import send_welcome_email, generate_report

# 异步调用 — 立即返回 AsyncResult,不阻塞
result = send_welcome_email.delay("user@example.com", "张三")
print(f"任务 ID: {result.id}")
print(f"任务状态: {result.status}")  # PENDING
print("邮件任务已提交,用户可以继续浏览页面!")

# 批量异步任务
report_result = generate_report.delay(42)
print(f"报表任务 ID: {report_result.id}")

启动 Worker

# 终端 1:启动 Redis
docker run -d --name redis -p 6379:6379 redis:7-alpine

# 终端 2:启动 Worker
celery -A celery_app worker --loglevel=info

# 终端 3:运行任务
python main.py

预期输出(Worker 终端)

[开始] 发送邮件到 user@example.com
[开始] 生成报表 #42
[完成] 张三 的欢迎邮件已发送至 user@example.com
[完成] 报表 #42 已生成

教程

Celery 分布式任务队列实战

背景

在 Web 应用中,HTTP 请求-响应周期应该是毫秒级的。但如果用户注册后需要发邮件、生成 PDF 报告、处理图像——这些操作可能耗时数秒甚至数分钟。Celery 将这些「太重」的操作从请求线程中剥离到后台 Worker 上执行。


第 1 章:Celery 架构速览

[Web App] --delay()--> [Broker (Redis)] --pull--> [Worker] --> [Result Backend]
                                                      |
                                                   [Flower 监控]
  • Producer:你的 Django/Flask/FastAPI 应用
  • Broker:消息中间件,Redis / RabbitMQ
  • Worker:实际执行任务的进程
  • Result Backend:存储任务结果(可选)

第 2 章:任务编排原语

from celery import chain, group, chord, signature

# chain — 串行执行
chain(add.s(1, 2) | mul.s(3) | log_result.s())()

# group — 并行执行
group(send_email.s(user) for user in users)()

# chord — 并行 + 回调
chord(
    [process_image.s(img) for img in images],
    merge_results.s()
)()

# 可配合使用
workflow = chord(
    group([task_a.s(i) for i in range(10)]),
    final_callback.s()
)

第 3 章:定时任务(Celery Beat)

from celery.schedules import crontab
from celery_app import app

app.conf.beat_schedule = {
    "cleanup-every-midnight": {
        "task": "tasks.clean_expired_sessions",
        "schedule": crontab(hour=0, minute=0),
    },
    "send-weekly-report": {
        "task": "tasks.send_weekly_digest",
        "schedule": crontab(hour=9, minute=0, day_of_week=1),
    },
    "refresh-cache-every-5min": {
        "task": "tasks.refresh_hot_posts",
        "schedule": 300,  # 秒
    },
}

启动 Beat:

celery -A celery_app beat --loglevel=info

第 4 章:与 Django 集成

# settings.py
CELERY_BROKER_URL = "redis://localhost:6379/0"
CELERY_RESULT_BACKEND = "redis://localhost:6379/0"
CELERY_TIMEZONE = "Asia/Shanghai"

# celery.py (项目根目录)
import os
from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mysite.settings")
app = Celery("mysite")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()  # 自动发现 tasks.py

# 在 Django model 的 save() 中触发
# user.save()
# send_welcome_email.delay(user.email, user.username)

第 5 章:监控与运维

# 启动 Flower 监控面板
celery -A celery_app flower --port=5555
# 访问 http://localhost:5555

# 查看活跃任务
celery -A celery_app inspect active

# 查看已注册任务列表
celery -A celery_app inspect registered

# 清空队列
celery -A celery_app purge

思考题

  1. Celery 任务中什么操作应该避免?(提示:数据库连接、文件描述符)
  2. 如何保证 Celery 任务的幂等性?
  3. 当 Worker 宕机时,已从 broker 取走但未完成的任务会丢失吗?acks_late 是什么?