Celery 分布式任务队列实战

知识库
知识库文档
/tech-stacks/celery/tutorial/Celery 分布式任务队列实战.md

文档

Celery 分布式任务队列实战

背景

在 Web 应用中,HTTP 请求-响应周期应该是毫秒级的。但如果用户注册后需要发邮件、生成 PDF 报告、处理图像——这些操作可能耗时数秒甚至数分钟。Celery 将这些「太重」的操作从请求线程中剥离到后台 Worker 上执行。


第 1 章:Celery 架构速览

[Web App] --delay()--> [Broker (Redis)] --pull--> [Worker] --> [Result Backend]
                                                      |
                                                   [Flower 监控]
  • Producer:你的 Django/Flask/FastAPI 应用
  • Broker:消息中间件,Redis / RabbitMQ
  • Worker:实际执行任务的进程
  • Result Backend:存储任务结果(可选)

第 2 章:任务编排原语

from celery import chain, group, chord, signature

# chain — 串行执行
chain(add.s(1, 2) | mul.s(3) | log_result.s())()

# group — 并行执行
group(send_email.s(user) for user in users)()

# chord — 并行 + 回调
chord(
    [process_image.s(img) for img in images],
    merge_results.s()
)()

# 可配合使用
workflow = chord(
    group([task_a.s(i) for i in range(10)]),
    final_callback.s()
)

第 3 章:定时任务(Celery Beat)

from celery.schedules import crontab
from celery_app import app

app.conf.beat_schedule = {
    "cleanup-every-midnight": {
        "task": "tasks.clean_expired_sessions",
        "schedule": crontab(hour=0, minute=0),
    },
    "send-weekly-report": {
        "task": "tasks.send_weekly_digest",
        "schedule": crontab(hour=9, minute=0, day_of_week=1),
    },
    "refresh-cache-every-5min": {
        "task": "tasks.refresh_hot_posts",
        "schedule": 300,  # 秒
    },
}

启动 Beat:

celery -A celery_app beat --loglevel=info

第 4 章:与 Django 集成

# settings.py
CELERY_BROKER_URL = "redis://localhost:6379/0"
CELERY_RESULT_BACKEND = "redis://localhost:6379/0"
CELERY_TIMEZONE = "Asia/Shanghai"

# celery.py (项目根目录)
import os
from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mysite.settings")
app = Celery("mysite")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()  # 自动发现 tasks.py

# 在 Django model 的 save() 中触发
# user.save()
# send_welcome_email.delay(user.email, user.username)

第 5 章:监控与运维

# 启动 Flower 监控面板
celery -A celery_app flower --port=5555
# 访问 http://localhost:5555

# 查看活跃任务
celery -A celery_app inspect active

# 查看已注册任务列表
celery -A celery_app inspect registered

# 清空队列
celery -A celery_app purge

思考题

  1. Celery 任务中什么操作应该避免?(提示:数据库连接、文件描述符)
  2. 如何保证 Celery 任务的幂等性?
  3. 当 Worker 宕机时,已从 broker 取走但未完成的任务会丢失吗?acks_late 是什么?

信息

路径
/tech-stacks/celery/tutorial/Celery 分布式任务队列实战.md
更新时间
2026/5/30