RabbitMQ

技术栈
工具链
消息队列AMQPErlang中间件MQ

概览

RabbitMQ

RabbitMQ 是最广泛部署的开源消息代理(Message Broker),由 Pivotal/VMware 维护,使用 Erlang 编写。它实现了 AMQP 0-9-1 协议,支持 STOMP、MQTT 等扩展协议。

核心价值:解耦应用、削峰填谷、异步处理、可靠投递。支持多种交换机类型(Direct、Topic、Fanout、Headers),提供消息确认、持久化、死信队列、延迟队列等企业级特性。

关键特性

  • 灵活的路由机制(Exchange + Binding + Queue)
  • 消息确认(Publisher Confirm + Consumer Ack)
  • 高可用镜像队列和 Quorum Queue
  • 管理插件和 HTTP API
  • 丰富的客户端库(Java、Python、Go、.NET 等)

安装

1. 环境准备

  • 操作系统:Linux(推荐 Ubuntu 20.04+ / CentOS 7+)、macOS 12+、Windows 10+
  • Erlang 运行时:RabbitMQ 依赖 Erlang,推荐 Erlang 25.x+
  • 端口:5672(AMQP)、15672(管理界面)、25672(集群通信)
  • 内存/磁盘:生产环境至少 4GB RAM、10GB 可用磁盘

2. 安装命令

Ubuntu / Debian

# 添加 RabbitMQ 官方仓库
curl -fsSL https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo gpg --dearmor -o /usr/share/keyrings/rabbitmq.gpg
echo "deb [signed-by=/usr/share/keyrings/rabbitmq.gpg] https://packagecloud.io/rabbitmq/rabbitmq-server/ubuntu/ $(lsb_release -cs) main" | sudo tee /etc/apt/sources.list.d/rabbitmq.list

# 安装
sudo apt update
sudo apt install -y rabbitmq-server

# 启动并设为开机自启
sudo systemctl enable rabbitmq-server
sudo systemctl start rabbitmq-server

# 启用管理插件
sudo rabbitmq-plugins enable rabbitmq_management

Docker 快速安装

docker run -d --name rabbitmq \
  -p 5672:5672 -p 15672:15672 \
  -e RABBITMQ_DEFAULT_USER=admin \
  -e RABBITMQ_DEFAULT_PASS=admin123 \
  rabbitmq:3.13-management

macOS

brew install rabbitmq
brew services start rabbitmq
rabbitmq-plugins enable rabbitmq_management

3. 常见安装问题

问题 解决方案
端口 5672 被占用 lsof -i :5672 检查占用进程,修改 RABBITMQ_NODE_PORT 环境变量
Erlang 版本不兼容 参考 RabbitMQ 官方 Erlang 兼容性矩阵,使用匹配版本
管理界面无法访问 确认已启用 rabbitmq_management 插件,检查防火墙规则
Docker 容器无法启动 检查 RABBITMQ_DEFAULT_USER / RABBITMQ_DEFAULT_PASS 是否设置
系统内存不足 RabbitMQ 会阻止发布,设置 vm_memory_high_watermark 阈值

示例

RabbitMQ Hello World - Python (pika)

目标

演示最基本的消息发送(Producer)和接收(Consumer)流程,使用 Direct Exchange + 默认队列。

环境准备

pip install pika

完整代码

发送端(producer.py)

import pika

# 建立连接
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost', port=5672)
)
channel = connection.channel()

# 声明队列(幂等操作)
channel.queue_declare(queue='hello')

# 发送消息
channel.basic_publish(
    exchange='',
    routing_key='hello',
    body='Hello World!',
    properties=pika.BasicProperties(
        delivery_mode=2,  # 消息持久化
    )
)

print("[x] Sent 'Hello World!'")
connection.close()

接收端(consumer.py)

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost', port=5672)
)
channel = connection.channel()

channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print(f"[x] Received {body}")
    ch.basic_ack(delivery_tag=method.delivery_tag)  # 手动确认

# 公平分发:每次只取一条
channel.basic_qos(prefetch_count=1)

channel.basic_consume(
    queue='hello',
    on_message_callback=callback,
    auto_ack=False  # 手动 ACK
)

print('[*] Waiting for messages. Press CTRL+C to exit.')
channel.start_consuming()

运行步骤

# 终端1:启动消费者
python consumer.py

# 终端2:发送消息
python producer.py

预期输出

  • 生产者终端:[x] Sent 'Hello World!'
  • 消费者终端:[x] Received b'Hello World!'

教程

RabbitMQ 入门教程:从概念到集群

1. 消息队列解决了什么问题?

在单体架构中,一个请求直接调用另一个服务。但在微服务架构中:

  • 服务宕机:下游服务不可用时,请求直接失败
  • 流量尖峰:秒杀场景下,瞬时流量压垮数据库
  • 紧耦合:服务间直接调用,一处变更影响全局

消息队列通过"异步解耦"解决上述问题:生产者将消息发给 Broker,消费者按自己的节奏处理。

2. 核心概念

Exchange(交换机)

收到消息后,根据路由规则分发到队列。四种类型:

类型 路由逻辑 场景
Direct 精确匹配 routing_key 单播、RPC
Topic 通配符匹配(* 一个词,# 零或多个词) 多条件订阅
Fanout 广播到所有绑定队列 推送通知
Headers 基于 Header 匹配 复杂条件路由

Queue(队列)

消息的存储容器。消费者从这里取消息。

Binding(绑定)

Exchange 和 Queue 之间的关联关系,指定 routing_key。

Message(消息)

由 Payload(业务数据)+ Properties(元数据)组成。

3. 消息确认机制

生产者确认(Publisher Confirm)

channel.confirm_delivery()
try:
    channel.basic_publish(exchange='', routing_key='hello', body='msg')
    print("消息已确认到达 Broker")
except pika.exceptions.UnroutableError:
    print("消息无法路由")

消费者确认(Consumer Ack)

  • auto_ack=True:Broker 发送后即删除(可能丢消息)
  • auto_ack=False:消费者手动 ack/nack/reject

4. 高级特性

死信队列(DLX)

消息被拒绝、过期或队列满时进入死信队列,用于异常处理。

延迟队列

通过 TTL + DLX 实现,用于订单超时取消等场景。

优先级队列

channel.queue_declare(queue='priority_queue', arguments={'x-max-priority': 10})
channel.basic_publish(..., properties=pika.BasicProperties(priority=5))

5. 集群与高可用

Quorum Queue(推荐)

基于 Raft 协议的队列类型,替代传统镜像队列:

rabbitmqctl set_policy ha-policy "^quorum\." '{"queues":{"type":"quorum"}}'

集群部署要点

  • 使用 Load Balancer 做客户端连接分发
  • 所有节点同步 Erlang Cookie
  • 使用 rabbitmqctl cluster_status 检查集群状态

6. 思考题

  1. 为什么 Topic Exchange 比 Direct Exchange 更灵活?各自的适用场景?
  2. 消息持久化(delivery_mode=2)和消息确认(ACK)各自保证什么?
  3. 如果消费者处理太慢怎么办?什么是"背压"?

参考资料

暂无参考文献