RabbitMQ 入门教程:从概念到集群
1. 消息队列解决了什么问题?
在单体架构中,一个请求直接调用另一个服务。但在微服务架构中:
- 服务宕机:下游服务不可用时,请求直接失败
- 流量尖峰:秒杀场景下,瞬时流量压垮数据库
- 紧耦合:服务间直接调用,一处变更影响全局
消息队列通过"异步解耦"解决上述问题:生产者将消息发给 Broker,消费者按自己的节奏处理。
2. 核心概念
Exchange(交换机)
收到消息后,根据路由规则分发到队列。四种类型:
| 类型 |
路由逻辑 |
场景 |
| Direct |
精确匹配 routing_key |
单播、RPC |
| Topic |
通配符匹配(* 一个词,# 零或多个词) |
多条件订阅 |
| Fanout |
广播到所有绑定队列 |
推送通知 |
| Headers |
基于 Header 匹配 |
复杂条件路由 |
Queue(队列)
消息的存储容器。消费者从这里取消息。
Binding(绑定)
Exchange 和 Queue 之间的关联关系,指定 routing_key。
Message(消息)
由 Payload(业务数据)+ Properties(元数据)组成。
3. 消息确认机制
生产者确认(Publisher Confirm)
channel.confirm_delivery()
try:
channel.basic_publish(exchange='', routing_key='hello', body='msg')
print("消息已确认到达 Broker")
except pika.exceptions.UnroutableError:
print("消息无法路由")
消费者确认(Consumer Ack)
- auto_ack=True:Broker 发送后即删除(可能丢消息)
- auto_ack=False:消费者手动 ack/nack/reject
4. 高级特性
死信队列(DLX)
消息被拒绝、过期或队列满时进入死信队列,用于异常处理。
延迟队列
通过 TTL + DLX 实现,用于订单超时取消等场景。
优先级队列
channel.queue_declare(queue='priority_queue', arguments={'x-max-priority': 10})
channel.basic_publish(..., properties=pika.BasicProperties(priority=5))
5. 集群与高可用
Quorum Queue(推荐)
基于 Raft 协议的队列类型,替代传统镜像队列:
rabbitmqctl set_policy ha-policy "^quorum\." '{"queues":{"type":"quorum"}}'
集群部署要点
- 使用 Load Balancer 做客户端连接分发
- 所有节点同步 Erlang Cookie
- 使用
rabbitmqctl cluster_status 检查集群状态
6. 思考题
- 为什么 Topic Exchange 比 Direct Exchange 更灵活?各自的适用场景?
- 消息持久化(
delivery_mode=2)和消息确认(ACK)各自保证什么?
- 如果消费者处理太慢怎么办?什么是"背压"?