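Documentation
Kafka Producer and Consumer - Python (kafka-python)
Goal
Demonstrate basic usage of a Kafka Producer and Consumer, including the concepts of partitions and consumer groups.
Environment setup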
pip install kafka-python
# Make sure Kafka is running and test-topic has been created
Complete code
Producer (producer.py)
from kafka import KafkaProducer
import json
import time

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    # Serialize values as UTF-8 encoded JSON
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    # Encode string keys as UTF-8; an empty or missing key is sent as None
    key_serializer=lambda k: k.encode('utf-8') if k else None,
    acks='all',  # wait for all in-sync replicas to acknowledge
    retries=3,  # number of retries
    compression_type='gzip'
)

for i in range(10):
    message = {"id": i, "content": f"消息 #{i}", "timestamp": time.time()}
    # Messages with the same key go to the same partition
    future = producer.send('test-topic', key=str(i % 3), value=message)
    # Block until the broker acknowledges this send (raises on failure)
    metadata = future.get(timeout=10)
    print(f"Sent: {message} → partition: {metadata.partition}, offset: {metadata.offset}")

producer.flush()
producer.close()
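The "same key, same partition" behaviour noted in the producer can be illustrated with a small standalone sketch. It is only an approximation of the idea: kafka-python's default partitioner hashes the key bytes with murmur2, while this sketch substitutes zlib.crc32 to stay short, so the partition numbers it prints will generally differ from what a real broker reports. The names pick_partition and NUM_PARTITIONS are hypothetical and exist only for this illustration.

```python
import zlib

NUM_PARTITIONS = 3  # assumption: matches --partitions 3 used for test-topic


def pick_partition(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Map a key to a partition with a deterministic hash.

    crc32 is a stand-in here, NOT the murmur2 hash Kafka clients use.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# Same keys as producer.py: str(i % 3) only ever yields "0", "1", "2".
assignments = {}
for i in range(10):
    key = str(i % 3)
    assignments.setdefault(key, set()).add(pick_partition(key))

for key, partitions in sorted(assignments.items()):
    print(f"key={key} -> partitions={sorted(partitions)}")
```

Each key maps to exactly one partition because the hash is deterministic. Two different keys may still land on the same partition, so a key-to-partition mapping is stable but not necessarily one-to-one.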
Consumer (consumer.py)
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'test-topic',
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',  # consumer group
    auto_offset_reset='earliest',  # start from the earliest offset if the group has no committed offset
    enable_auto_commit=False,  # commit offsets manually
    value_deserializer=lambda v: json.loads(v.decode('utf-8')),
    key_deserializer=lambda k: k.decode('utf-8') if k else None
)

print("Starting to consume...")
try:
    for message in consumer:
        print(f"key={message.key}, value={message.value}, "
              f"partition={message.partition}, offset={message.offset}")
        # Commit manually once the message has been processed
        consumer.commit()
except KeyboardInterrupt:
    pass  # Ctrl+C ends the otherwise endless loop
finally:
    consumer.close()
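Committing after every message is the simplest form of manual commit, but each commit is a round trip to the broker. One possible variation, sketched below, commits once per batch of processed messages. The function name consume_with_batched_commits, the handle callback, and the FakeConsumer stand-in are all hypothetical; the function only assumes an object that is iterable and exposes commit(), which a KafkaConsumer created with enable_auto_commit=False does.

```python
def consume_with_batched_commits(consumer, handle, batch_size=100):
    """Process messages and commit once per `batch_size` messages.

    Processing happens before the commit, so a crash can cause
    re-delivery (at-least-once) but not silent loss.
    """
    pending = 0
    for message in consumer:
        handle(message)        # process first ...
        pending += 1
        if pending >= batch_size:
            consumer.commit()  # ... then commit what has been processed
            pending = 0
    # Reached only when iteration ends normally (for a KafkaConsumer,
    # e.g. when consumer_timeout_ms is set): commit the remaining tail.
    if pending:
        consumer.commit()


class FakeConsumer:
    """Stand-in so the sketch runs without a broker (not part of kafka-python)."""

    def __init__(self, messages):
        self.messages = list(messages)
        self.commits = 0

    def __iter__(self):
        return iter(self.messages)

    def commit(self):
        self.commits += 1


fake = FakeConsumer(range(250))
handled = []
consume_with_batched_commits(fake, handled.append, batch_size=100)
print(f"handled={len(handled)}, commits={fake.commits}")
```

The trade-off is that a larger batch means fewer commits but more messages replayed after a failure, so the handler should tolerate duplicates.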
Run steps
# Create the topic first
kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 \
--partitions 3 --replication-factor 1
# Terminal 1: start the consumer
python consumer.py
# Terminal 2: send messages
python producer.py
Expected output
Sent: {'id': 0, 'content': '消息 #0', ...} → partition: 0, offset: 0
Sent: {'id': 1, 'content': '消息 #1', ...} → partition: 1, offset: 0
...
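The partition numbers and offsets above are illustrative: the partition is chosen by hashing the key, and offsets start at 0 only on a fresh topic.
On the consumer side, messages are printed in offset order within each partition; ordering across partitions is not guaranteed.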
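Because test-topic has 3 partitions and consumer.py joins the group my-group, starting additional copies of consumer.py splits the partitions among them, with each partition read by exactly one group member. The sketch below illustrates that idea with a simplified round-robin deal. It is not the assignor kafka-python or the broker actually runs, and assign_round_robin and the consumer names c1 to c4 are hypothetical.

```python
def assign_round_robin(partitions, consumers):
    """Deal partitions out to group members one at a time (simplified)."""
    members = sorted(consumers)
    if not members:
        return {}
    assignment = {member: [] for member in members}
    for index, partition in enumerate(sorted(partitions)):
        assignment[members[index % len(members)]].append(partition)
    return assignment


partitions = [0, 1, 2]  # assumption: test-topic was created with 3 partitions
for group in (["c1"], ["c1", "c2"], ["c1", "c2", "c3", "c4"]):
    print(f"{len(group)} consumer(s): {assign_round_robin(partitions, group)}")
```

With four consumers and three partitions, one member receives no partition and stays idle, which is why running more consumers than partitions in one group does not add throughput.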