Apache Kafka

技术栈
工具链
消息队列流处理分布式Kafka事件驱动

概览

Apache Kafka

Apache Kafka 是由 LinkedIn 开发、现由 Apache 软件基金会维护的分布式事件流平台。它不仅能做消息队列,更是一种分布式的提交日志(Commit Log)。

核心价值:高吞吐、低延迟、持久化、水平扩展。适合万亿级消息场景,如日志聚合、流处理、事件溯源。

关键特性

  • 分区(Partition)+ 副本(Replica)模型,天然支持水平扩展
  • 基于磁盘的顺序读写,达到 O(1) 的复杂度
  • 消费者组(Consumer Group)实现消息的广播与单播
  • 精确一次语义(Exactly-Once Semantics)
  • Kafka Connect 连接外部系统,Kafka Streams 做流处理

安装

1. 环境准备

  • 操作系统:Linux(生产推荐)、macOS、Windows(WSL2)
  • Java:JDK 11+(推荐 JDK 17 LTS)
  • ZooKeeper 或 KRaft:Kafka 3.3+ 支持 KRaft 模式(无需 ZooKeeper)
  • 端口:9092(Kafka Broker)、2181(ZooKeeper,如使用)
  • 磁盘:SSD 推荐,生产环境至少 100GB

2. 安装命令

Docker Compose(推荐快速体验)

# docker-compose.yml
version: '3.8'
services:
  kafka:
    image: confluentinc/cp-kafka:7.6.0
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092'
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@localhost:9093'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LISTENERS: 'PLAINTEXT://:9092,CONTROLLER://:9093'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
docker compose up -d

手动安装(Linux)

# 下载 Kafka
wget https://downloads.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz
tar -xzf kafka_2.13-3.6.0.tgz
cd kafka_2.13-3.6.0

# KRaft 模式格式化存储
KAFKA_CLUSTER_ID=$(bin/kafka-storage.sh random-uuid)
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties

# 启动 Kafka
bin/kafka-server-start.sh config/kraft/server.properties

# 创建测试 Topic
bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1

macOS

brew install kafka
brew services start kafka

3. 常见安装问题

问题 解决方案
端口已占用 修改 server.propertieslistenersadvertised.listeners
__consumer_offsets 不存在 首次消费时自动创建,或手动创建 offsets.topic.replication.factor
KRaft 模式启动失败 确认已执行 kafka-storage.sh format,且 KAFKA_CLUSTER_ID 一致
生产者连接超时 检查 advertised.listeners 是否正确,Docker 中需设为宿主机可访问地址
内存不足 Kafka 默认 JVM 堆 1GB,修改 KAFKA_HEAP_OPTS 环境变量

示例

Kafka 生产者与消费者 - Python (kafka-python)

目标

演示 Kafka 的 Producer 和 Consumer 基本用法,包含分区和消费者组概念。

环境准备

pip install kafka-python
# 确保 Kafka 已启动并创建了 test-topic

完整代码

生产者(producer.py)

from kafka import KafkaProducer
import json
import time

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8') if k else None,
    acks='all',           # 等待所有副本确认
    retries=3,            # 重试次数
    compression_type='gzip'
)

for i in range(10):
    message = {"id": i, "content": f"消息 #{i}", "timestamp": time.time()}
    # key 相同则进入同一分区
    future = producer.send('test-topic', key=str(i % 3), value=message)
    metadata = future.get(timeout=10)
    print(f"Sent: {message} → partition: {metadata.partition}, offset: {metadata.offset}")

producer.flush()
producer.close()

消费者(consumer.py)

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'test-topic',
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',              # 消费者组
    auto_offset_reset='earliest',     # 从最早开始读
    enable_auto_commit=False,         # 手动提交偏移量
    value_deserializer=lambda v: json.loads(v.decode('utf-8')),
    key_deserializer=lambda k: k.decode('utf-8') if k else None
)

print("开始消费...")
for message in consumer:
    print(f"key={message.key}, value={message.value}, "
          f"partition={message.partition}, offset={message.offset}")
    # 手动提交
    consumer.commit()

运行步骤

# 先创建 topic
kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 \
  --partitions 3 --replication-factor 1

# 终端1:启动消费者
python consumer.py

# 终端2:发送消息
python producer.py

预期输出

Sent: {'id': 0, 'content': '消息 #0', ...} → partition: 0, offset: 0
Sent: {'id': 1, 'content': '消息 #1', ...} → partition: 1, offset: 0
...

消费者端会按分区顺序输出消息。

教程

Kafka 入门教程:高吞吐流处理核心概念

1. Kafka 与其他 MQ 的区别

维度 RabbitMQ Kafka
定位 消息代理(Message Broker) 事件流平台(Event Streaming)
吞吐量 万级/秒 百万级/秒
消息保留 消费后删除 基于时间/大小保留
消费模式 Push Pull
顺序保证 队列级 分区级
消息回溯 不支持 支持(重置 offset)

2. 核心概念深入

Topic & Partition

  • Topic 是消息的逻辑分类,如 orderslogs
  • Partition 是物理存储单元,每个 Partition 是一个有序不可变日志
  • 消息在 Partition 内严格有序,跨 Partition 无序

Producer 分区策略

# 1. 指定 key → hash(key) % num_partitions 决定分区
producer.send('topic', key='user_123', value=data)

# 2. 指定 partition → 直接写入
producer.send('topic', partition=0, value=data)

# 3. 都不指定 → 轮询
producer.send('topic', value=data)

Consumer Group

  • 同一 Group 内,每个 Partition 只被一个 Consumer 消费
  • 不同 Group 独立消费(广播)
  • Group 内 Consumer 数量 ≤ Partition 数量,多余的 Consumer 空闲

3. 偏移量(Offset)管理

Partition 0: [msg0] [msg1] [msg2] [msg3] [msg4] ...
                ↑         ↑
            earliest    current offset
# 重置偏移量 - 回溯消费
consumer.seek(topic_partition, offset)  # 精确重置
consumer.seek_to_beginning()            # 从头开始
consumer.seek_to_end()                  # 跳到最新

4. 精确一次语义(Exactly-Once)

Kafka 通过以下机制实现 EOS:

  • 幂等生产者enable.idempotence=true
  • 事务producer.init_transactions() + producer.commit_transaction()
  • 消费者隔离级别isolation.level=read_committed

5. Kafka Streams 简介

无需额外集群,直接在应用内做流处理:

KStream<String, Order> orders = builder.stream("orders");
KTable<String, Double> totalByUser = orders
    .groupByKey()
    .aggregate(() -> 0.0,
        (key, order, total) -> total + order.getAmount(),
        Materialized.as("order-total-store"));

6. 思考题

  1. 为什么 Kafka 要采用 Pull 模型而非 Push?
  2. Partition 数量设置多少合适?多了和少了分别有什么问题?
  3. 如何保证跨 Partition 的全局有序?有什么代价?

参考资料

暂无参考文献