文档
Celery Hello World:异步发送邮件
目标
将耗时的邮件发送操作从 Web 请求中异步化,用户无需等待邮件发送完成就能收到 HTTP 响应。
完整代码
# celery_app.py — Celery 配置
from celery import Celery
app = Celery(
"tasks",
broker="redis://localhost:6379/0", # 消息代理
backend="redis://localhost:6379/0", # 结果后端(可选)
)
app.conf.update(
task_serializer="json",
accept_content=["json"],
result_serializer="json",
timezone="Asia/Shanghai",
enable_utc=True,
)
# tasks.py — 任务定义
import time
from celery_app import app
@app.task(bind=True, max_retries=3, default_retry_delay=60)
def send_welcome_email(self, user_email: str, username: str):
"""模拟发送欢迎邮件(耗时操作)"""
try:
print(f"[开始] 发送邮件到 {user_email}")
time.sleep(3) # 模拟网络 IO
print(f"[完成] {username} 的欢迎邮件已发送至 {user_email}")
return f"ok: {user_email}"
except Exception as exc:
print(f"[失败] 重试中... ({self.request.retries + 1}/3)")
raise self.retry(exc=exc)
@app.task
def generate_report(report_id: int):
"""模拟生成报表(CPU 密集型)"""
print(f"[开始] 生成报表 #{report_id}")
time.sleep(5)
print(f"[完成] 报表 #{report_id} 已生成")
return f"report_{report_id}.pdf"
# main.py — 调用任务
from tasks import send_welcome_email, generate_report
# 异步调用 — 立即返回 AsyncResult,不阻塞
result = send_welcome_email.delay("user@example.com", "张三")
print(f"任务 ID: {result.id}")
print(f"任务状态: {result.status}") # PENDING
print("邮件任务已提交,用户可以继续浏览页面!")
# 批量异步任务
report_result = generate_report.delay(42)
print(f"报表任务 ID: {report_result.id}")
启动 Worker
# 终端 1:启动 Redis
docker run -d --name redis -p 6379:6379 redis:7-alpine
# 终端 2:启动 Worker
celery -A celery_app worker --loglevel=info
# 终端 3:运行任务
python main.py
预期输出(Worker 终端)
[开始] 发送邮件到 user@example.com
[开始] 生成报表 #42
[完成] 张三 的欢迎邮件已发送至 user@example.com
[完成] 报表 #42 已生成