生产者与消费者基础示例

知识库
知识库文档
/tech-stacks/apache-kafka/examples/生产者与消费者基础示例.md

文档

Kafka 生产者与消费者 - Python (kafka-python)

目标

演示 Kafka 的 Producer 和 Consumer 基本用法,包含分区和消费者组概念。

环境准备

pip install kafka-python
# 确保 Kafka 已启动并创建了 test-topic

完整代码

生产者(producer.py)

from kafka import KafkaProducer
import json
import time

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8') if k else None,
    acks='all',           # 等待所有副本确认
    retries=3,            # 重试次数
    compression_type='gzip'
)

for i in range(10):
    message = {"id": i, "content": f"消息 #{i}", "timestamp": time.time()}
    # key 相同则进入同一分区
    future = producer.send('test-topic', key=str(i % 3), value=message)
    metadata = future.get(timeout=10)
    print(f"Sent: {message} → partition: {metadata.partition}, offset: {metadata.offset}")

producer.flush()
producer.close()

消费者(consumer.py)

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'test-topic',
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',              # 消费者组
    auto_offset_reset='earliest',     # 从最早开始读
    enable_auto_commit=False,         # 手动提交偏移量
    value_deserializer=lambda v: json.loads(v.decode('utf-8')),
    key_deserializer=lambda k: k.decode('utf-8') if k else None
)

print("开始消费...")
for message in consumer:
    print(f"key={message.key}, value={message.value}, "
          f"partition={message.partition}, offset={message.offset}")
    # 手动提交
    consumer.commit()

运行步骤

# 先创建 topic
kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 \
  --partitions 3 --replication-factor 1

# 终端1:启动消费者
python consumer.py

# 终端2:发送消息
python producer.py

预期输出

Sent: {'id': 0, 'content': '消息 #0', ...} → partition: 0, offset: 0
Sent: {'id': 1, 'content': '消息 #1', ...} → partition: 1, offset: 0
...

消费者端会按分区顺序输出消息。

信息

路径
/tech-stacks/apache-kafka/examples/生产者与消费者基础示例.md
更新时间
2026/5/31