etcd

技术栈
工具链
分布式配置中心服务发现键值存储RaftCNCF

概览

etcd

etcd 是 CoreOS 开发的分布式键值存储系统,使用 Raft 共识算法保证强一致性。它是 Kubernetes 的核心组件(存储所有集群状态),也是 CNCF 的孵化项目。

是什么

etcd = 可靠 KV 存储 + Watch 机制 + Lease(租约) + MVCC(多版本并发控制)。你可以把它想象成分布式的 /etc 目录——存储最关键的系统配置和元数据。

解决什么问题

  • 分布式配置:存储动态配置,Watch 实时推送变更
  • 服务发现:服务注册 + 租约续期 + 健康检查
  • 选主/Leader Election:利用 Lease + Transaction 实现
  • 分布式锁:原子性 CAS 操作
  • Kubernetes 状态存储:所有 K8s 对象(Pod/Service/ConfigMap)的持久化

关键特性

  • 强一致性:Raft 算法,CAP 中的 CP 系统
  • Watch API:实时监听 key 变化,长连接推送
  • TTL/Lease:key 自动过期,支持续约
  • 事务(Mini-Transaction):IF-THEN-ELSE 原子操作
  • MVCC:每次修改保存历史版本
  • gRPC 协议:高性能,支持多语言 SDK
  • TLS 双向认证:内置证书管理

安装

etcd 安装与初始化

1. 环境准备

要求 说明
操作系统 Linux(推荐)/ macOS / Docker
架构 x86_64 / ARM64
端口 2379(Client API)、2380(Peer 通信)
磁盘 SSD 强烈推荐(Raft 写延迟敏感)
集群规模 建议 3/5/7 节点(奇数)

2. 安装命令

方式一:Docker(快速体验)

# 单节点
docker run -d --name etcd \
  -p 2379:2379 \
  -p 2380:2380 \
  --env ALLOW_NONE_AUTHENTICATION=yes \
  bitnami/etcd:3.5

# 验证
docker exec etcd etcdctl put foo bar
docker exec etcd etcdctl get foo

方式二:二进制包

# 下载
ETCD_VER=v3.5.14
wget https://github.com/etcd-io/etcd/releases/download/${ETCD_VER}/etcd-${ETCD_VER}-linux-amd64.tar.gz
tar xzf etcd-${ETCD_VER}-linux-amd64.tar.gz
cd etcd-${ETCD_VER}-linux-amd64

# 启动
./etcd --data-dir=/tmp/etcd-data
# 新终端
./etcdctl put hello world
./etcdctl get hello

方式三:集群部署(3 节点)

# 节点1
etcd --name infra0 \
  --initial-advertise-peer-urls http://10.0.1.10:2380 \
  --listen-peer-urls http://10.0.1.10:2380 \
  --listen-client-urls http://10.0.1.10:2379,http://127.0.0.1:2379 \
  --advertise-client-urls http://10.0.1.10:2379 \
  --initial-cluster-token etcd-cluster-1 \
  --initial-cluster infra0=http://10.0.1.10:2380,infra1=http://10.0.1.11:2380,infra2=http://10.0.1.12:2380 \
  --initial-cluster-state new

# 节点2、3 同理,修改 IP

验证集群

etcdctl endpoint health --cluster
# http://10.0.1.10:2379 is healthy
# http://10.0.1.11:2379 is healthy
# http://10.0.1.12:2379 is healthy

etcdctl member list

3. 常见安装问题

问题 解决方案
peer URL 连接失败 检查防火墙 2380 端口;确认 listen-peer-urls 监听正确 IP
集群无法启动 检查 initial-cluster 配置一致;initial-cluster-state 首次用 new,已有数据用 existing
磁盘 I/O 延迟高 使用 SSD;确保 --data-dir 所在磁盘不与其他服务共享
etcdctl v2/v3 不兼容 设置 export ETCDCTL_API=3 使用 v3 API
内存占用高 定期压缩历史版本:etcdctl compact <revision>etcdctl defrag
认证未启用 生产环境必须启用:etcdctl user add root--auth-token=jwt

示例

目标

使用 Python + etcd3 实现分布式锁和基于 Lease 的服务注册发现,展示 etcd 的核心应用模式。

完整代码

1. 安装依赖

pip install etcd3

2. 分布式锁

# distributed_lock.py
import etcd3
import time
import uuid
from contextlib import contextmanager

class EtcdLock:
    """基于 etcd 的分布式锁"""
    
    def __init__(self, client: etcd3.Etcd3Client, name: str, ttl: int = 10):
        self.client = client
        self.name = f"/locks/{name}"
        self.ttl = ttl
        self.lease = None
    
    def acquire(self, timeout: int = 30) -> bool:
        """获取锁,阻塞直到成功或超时"""
        deadline = time.time() + timeout
        while time.time() < deadline:
            # 创建 Lease
            self.lease = self.client.lease(self.ttl)
            # 尝试原子创建(CAS:不存在时创建)
            success, _ = self.client.transaction(
                compare=[
                    self.client.transactions.create(self.name) == 0
                ],
                success=[
                    self.client.transactions.put(self.name, 
                        str(uuid.uuid4()), lease=self.lease)
                ],
                failure=[]
            )
            if success:
                return True
            time.sleep(0.1)
        return False
    
    def release(self):
        """释放锁"""
        if self.lease:
            self.lease.revoke()
            self.lease = None
    
    @contextmanager
    def __call__(self):
        """上下文管理器用法"""
        if not self.acquire():
            raise TimeoutError(f"Failed to acquire lock: {self.name}")
        try:
            yield
        finally:
            self.release()


# ===== 使用示例 =====
def critical_section(lock_name: str):
    client = etcd3.client(host="localhost", port=2379)
    lock = EtcdLock(client, lock_name)
    
    with lock:
        print(f"[{lock_name}] Entering critical section...")
        time.sleep(3)  # 模拟关键操作
        print(f"[{lock_name}] Done!")


if __name__ == "__main__":
    # 测试:两个"进程"抢锁
    from threading import Thread
    t1 = Thread(target=critical_section, args=("my-resource",))
    t2 = Thread(target=critical_section, args=("my-resource",))
    t1.start(); t2.start()
    t1.join(); t2.join()

3. 服务注册发现

# service_discovery.py
import etcd3
import socket
import json
import time
import threading

class ServiceRegistry:
    """基于 etcd Lease 的服务注册"""
    
    def __init__(self, client: etcd3.Etcd3Client, service_name: str, ttl: int = 10):
        self.client = client
        self.prefix = f"/services/{service_name}"
        self.ttl = ttl
    
    def register(self, host: str, port: int, metadata: dict = None):
        """注册服务实例(带 Lease 自动过期)"""
        instance_id = f"{host}:{port}"
        key = f"{self.prefix}/{instance_id}"
        value = json.dumps({
            "host": host,
            "port": port,
            "metadata": metadata or {},
            "registered_at": time.time()
        })
        
        # 创建 Lease
        lease = self.client.lease(self.ttl)
        
        # 注册
        self.client.put(key, value, lease=lease)
        print(f"[Register] {instance_id} registered (TTL={self.ttl}s)")
        
        # 续约线程
        def keepalive():
            while True:
                try:
                    lease.refresh()
                    time.sleep(self.ttl / 3)
                except Exception:
                    print(f"[Register] Lease lost for {instance_id}")
                    break
        
        threading.Thread(target=keepalive, daemon=True).start()
    
    def discover(self) -> list:
        """发现所有健康实例"""
        instances = []
        for value, metadata in self.client.get_prefix(self.prefix):
            info = json.loads(value.decode())
            instances.append(info)
        return instances
    
    def watch(self, callback):
        """实时监听服务变化"""
        events, cancel = self.client.watch_prefix(self.prefix)
        for event in events:
            callback(event)


# ===== 使用示例 =====
def start_service():
    client = etcd3.client(host="localhost", port=2379)
    registry = ServiceRegistry(client, "order-service")
    
    hostname = socket.gethostname()
    port = 8080
    
    # 注册
    registry.register(hostname, port, {"version": "v1.2.3", "region": "cn-east"})
    
    # 模拟运行
    time.sleep(30)


def discover_services():
    client = etcd3.client(host="localhost", port=2379)
    registry = ServiceRegistry(client, "order-service")
    
    # 轮询发现
    while True:
        instances = registry.discover()
        print(f"[Discover] Found {len(instances)} instances:")
        for inst in instances:
            print(f"  - {inst['host']}:{inst['port']} v{inst['metadata']['version']}")
        time.sleep(5)


if __name__ == "__main__":
    import sys
    if sys.argv[1] == "server":
        start_service()
    else:
        discover_services()

运行步骤

# 1. 启动 etcd
docker run -d --name etcd -p 2379:2379 bitnami/etcd:3.5

# 2. 测试分布式锁
python distributed_lock.py
# [my-resource] Entering critical section... (第一个线程)
# [my-resource] Done!
# [my-resource] Entering critical section... (第二个线程,等待后)
# [my-resource] Done!

# 3. 测试服务发现(终端1)
python service_discovery.py server
# [Register] hostname:8080 registered (TTL=10s)

# 4. 测试服务发现(终端2)
python service_discovery.py discover
# [Discover] Found 1 instances:
#   - hostname:8080 v1.2.3

# 5. 停止 server → 10s 后自动消失

预期输出

# 分布式锁
[my-resource] Entering critical section...
[my-resource] Done!
[my-resource] Entering critical section...
[my-resource] Done!

# 服务发现
[Register] MacBook-Pro:8080 registered (TTL=10s)
[Discover] Found 1 instances:
  - MacBook-Pro:8080 v1.2.3

关键点

  • Lease + 事务(CAS)实现安全分布式锁,避免死锁
  • Lease 自动过期防止实例宕机后"幽灵注册"
  • Watch 机制实时推送变更,避免轮询开销
  • etcd 保证强一致性,分布式锁在 CP 场景下安全可靠

教程

前言

如果你用过 Redis,你可能会问:"有了 Redis 为什么还要 etcd?"答案在于一个词:一致性。Redis 是 AP 系统(最终一致),而 etcd 是 CP 系统(强一致)。当你的分布式锁、选主、配置变更必须是"绝对正确"的时候,etcd 是答案。


第一章:理解 Raft

1.1 为什么需要共识算法

问题:3 个节点存储同一数据,如何确保所有节点返回相同值?

方案1:主从复制(MySQL)
  → 主写入,从复制。主挂 → 可能丢数据

方案2:Raft
  → 多数派(quorum)写入:3 节点中 2 个确认即成功
  → 保证:任何时刻,多数派中至少有一个节点有最新数据

1.2 Raft 三阶段

阶段1:Leader 选举
  所有节点 → Candidate → 获得多数票 → Leader
  心跳间隔:T(通常 100-500ms)

阶段2:日志复制
  Client → Leader → 写入日志 → 复制到 Follower → 多数确认 → 提交

阶段3:安全
  新 Leader 一定包含所有已提交的日志
  利用 Term(任期号)和 Index(日志索引)保证

1.3 Quorum 数学

N 节点集群需要 (N/2)+1 节点确认:

N=3 → 需 2 确认 → 容忍 1 节点故障
N=5 → 需 3 确认 → 容忍 2 节点故障
N=7 → 需 4 确认 → 容忍 3 节点故障

第二章:etcd 核心 API

2.1 KV 操作

etcdctl put /config/app/timeout "30s"
etcdctl get /config/app/timeout
etcdctl del /config/app/timeout
etcdctl get /config/ --prefix      # 前缀查询

2.2 Watch(长连接推送)

# 终端1:监听变化
etcdctl watch /config/ --prefix

# 终端2:修改值
etcdctl put /config/app/timeout "60s"

# 终端1 实时输出:
# PUT /config/app/timeout
# 60s

2.3 Lease(租约)

# 创建 30 秒租约
etcdctl lease grant 30
# lease 694d8f7a3b2e1c5d granted with TTL(30s)

# 用租约写 key(到期自动删除)
etcdctl put /services/app/instance1 "192.168.1.1:8080" --lease=694d8f7a3b2e1c5d

# 续约
etcdctl lease keep-alive 694d8f7a3b2e1c5d

2.4 事务

# 原子 CAS:如果 /lock 不存在则写入
etcdctl txn <;<EOF
compares:
  create("/lock/my-resource") = "0"

success requests (get, put, del):
  put /lock/my-resource "holder-abc"

failure requests (get, put, del):
  get /lock/my-resource
EOF

第三章:etcd vs Redis vs ZooKeeper vs Consul

特性 etcd Redis ZooKeeper Consul
CAP CP AP CP CP
共识算法 Raft 无(主从) ZAB Raft
配置存储 ✅ KV ✅ KV ✅ 树形 ✅ KV
Watch ✅ Pub/Sub
Lease/TTL ✅ EXPIRE ✅ Ephemeral
事务 ✅ MULTI/EXEC ✅ Multi
K8s 原生 ✅(核心依赖)
多数据中心
侧重点 一致性 + 简洁 性能 + 丰富数据结构 配置 + 选举 服务发现 + Mesh

第四章:生产运维要点

4.1 数据压缩与碎片整理

# 查看当前 revision
etcdctl endpoint status --write-out=table

# 压缩旧版本(保留最近 1000 个)
rev=$(etcdctl endpoint status --write-out=json | jq '.[0].Status.header.revision')
etcdctl compact $(($rev - 1000))

# 碎片整理
etcdctl defrag --cluster

4.2 备份与恢复

# 备份
etcdctl snapshot save backup.db

# 恢复
etcdctl snapshot restore backup.db \
  --data-dir=/var/lib/etcd-new \
  --name=infra0 \
  --initial-cluster=infra0=http://10.0.0.1:2380 \
  --initial-advertise-peer-urls=http://10.0.0.1:2380

4.3 安全加固

# 启用认证
etcdctl user add root
etcdctl auth enable

# 创建角色和用户
etcdctl role add app-role
etcdctl role grant-permission app-role readwrite /config/app/ --prefix
etcdctl user add app-user
etcdctl user grant-role app-user app-role

思考题

  1. 为什么 etcd 推荐奇数节点(3/5/7)?4 节点和 3 节点的容错能力一样吗?
  2. Raft 的 Leader 选举中如果出现"脑裂"(两个 Leader),etcd 怎么处理?客户端怎么知道谁是真正的 Leader?
  3. etcd 的 MVCC 机制和数据库的 MVCC 有什么异同?为什么要保存历史版本?
  4. Kubernetes 中所有资源都存储在 etcd 中,etcd 挂了对 K8s 集群有什么影响?已有 Pod 还能继续运行吗?

下一步

  • 学习 etcd + gRPC 实现命名解析(Name Resolver)
  • 学习 Kubernetes CRD + etcd 原理
  • 学习 etcd 性能调优:磁盘/网络/配置参数

参考资料

暂无参考文献