文档
Celery 分布式任务队列实战
背景
在 Web 应用中,HTTP 请求-响应周期应该是毫秒级的。但如果用户注册后需要发邮件、生成 PDF 报告、处理图像——这些操作可能耗时数秒甚至数分钟。Celery 将这些「太重」的操作从请求线程中剥离到后台 Worker 上执行。
第 1 章:Celery 架构速览
[Web App] --delay()--> [Broker (Redis)] --pull--> [Worker] --> [Result Backend]
|
[Flower 监控]
- Producer:你的 Django/Flask/FastAPI 应用
- Broker:消息中间件,Redis / RabbitMQ
- Worker:实际执行任务的进程
- Result Backend:存储任务结果(可选)
第 2 章:任务编排原语
from celery import chain, group, chord, signature
# chain — 串行执行
chain(add.s(1, 2) | mul.s(3) | log_result.s())()
# group — 并行执行
group(send_email.s(user) for user in users)()
# chord — 并行 + 回调
chord(
[process_image.s(img) for img in images],
merge_results.s()
)()
# 可配合使用
workflow = chord(
group([task_a.s(i) for i in range(10)]),
final_callback.s()
)
第 3 章:定时任务(Celery Beat)
from celery.schedules import crontab
from celery_app import app
app.conf.beat_schedule = {
"cleanup-every-midnight": {
"task": "tasks.clean_expired_sessions",
"schedule": crontab(hour=0, minute=0),
},
"send-weekly-report": {
"task": "tasks.send_weekly_digest",
"schedule": crontab(hour=9, minute=0, day_of_week=1),
},
"refresh-cache-every-5min": {
"task": "tasks.refresh_hot_posts",
"schedule": 300, # 秒
},
}
启动 Beat:
celery -A celery_app beat --loglevel=info
第 4 章:与 Django 集成
# settings.py
CELERY_BROKER_URL = "redis://localhost:6379/0"
CELERY_RESULT_BACKEND = "redis://localhost:6379/0"
CELERY_TIMEZONE = "Asia/Shanghai"
# celery.py (项目根目录)
import os
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mysite.settings")
app = Celery("mysite")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks() # 自动发现 tasks.py
# 在 Django model 的 save() 中触发
# user.save()
# send_welcome_email.delay(user.email, user.username)
第 5 章:监控与运维
# 启动 Flower 监控面板
celery -A celery_app flower --port=5555
# 访问 http://localhost:5555
# 查看活跃任务
celery -A celery_app inspect active
# 查看已注册任务列表
celery -A celery_app inspect registered
# 清空队列
celery -A celery_app purge
思考题
- Celery 任务中什么操作应该避免?(提示:数据库连接、文件描述符)
- 如何保证 Celery 任务的幂等性?
- 当 Worker 宕机时,已从 broker 取走但未完成的任务会丢失吗?
acks_late是什么?