etcd 分布式锁与服务发现实战

知识库
知识库文档
/tech-stacks/etcd/examples/etcd 分布式锁与服务发现实战.md

文档

目标

使用 Python + etcd3 实现分布式锁和基于 Lease 的服务注册发现,展示 etcd 的核心应用模式。

完整代码

1. 安装依赖

# python-etcd3 (0.12) ships protobuf stubs generated by an old protoc; protobuf >= 4
# rejects them at import ("Descriptors cannot be created directly"), so pin protobuf 3.x.
pip install etcd3 "protobuf<4"

2. 分布式锁

# distributed_lock.py
import etcd3
import time
import uuid
from contextlib import contextmanager

class EtcdLock:
    """Distributed mutex backed by a single etcd key bound to a Lease.

    The lock is held while the key ``/locks/<name>`` exists. The key is
    written together with a Lease of ``ttl`` seconds, so if the holder
    crashes the key (and therefore the lock) disappears once the Lease
    expires instead of deadlocking other clients.

    NOTE: this class never refreshes the Lease. If the critical section runs
    longer than ``ttl`` seconds the lock silently expires and another client
    may acquire it; keep ``ttl`` well above the expected hold time, or
    refresh ``self.lease`` yourself while holding the lock.

    Usage::

        with EtcdLock(client, "res"):   # or: with lock():
            ...
    """

    def __init__(self, client: "etcd3.Etcd3Client", name: str, ttl: int = 10):
        self.client = client
        self.name = f"/locks/{name}"
        self.ttl = ttl
        # Lease backing the lock key while the lock is held; None otherwise.
        self.lease = None

    def acquire(self, timeout: int = 30) -> bool:
        """Block until the lock is acquired or ``timeout`` seconds elapse.

        Returns True on success (``self.lease`` is then set), False on
        timeout (``self.lease`` is left untouched). Each attempt uses a
        fresh Lease, since a single Lease could expire during a long wait.
        The Lease of a failed attempt is revoked right away rather than
        being left behind on the server until its TTL runs out.
        """
        # monotonic() is immune to wall-clock jumps, unlike time.time().
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            lease = self.client.lease(self.ttl)
            try:
                # Atomic create-if-absent: create_revision == 0 means the key
                # does not exist yet, so only one client can win the put.
                success, _ = self.client.transaction(
                    compare=[self.client.transactions.create(self.name) == 0],
                    success=[
                        self.client.transactions.put(
                            self.name, str(uuid.uuid4()), lease=lease
                        )
                    ],
                    failure=[],
                )
            except Exception:
                lease.revoke()
                raise
            if success:
                self.lease = lease
                return True
            # Someone else holds the lock: drop this attempt's Lease and retry.
            lease.revoke()
            time.sleep(0.1)
        return False

    def release(self):
        """Release the lock by revoking its Lease, which deletes the key.

        A no-op when the lock is not held. ``self.lease`` is cleared before
        revoking, so the object is not left claiming a lease if revoke raises.
        """
        lease, self.lease = self.lease, None
        if lease is not None:
            lease.revoke()

    def __enter__(self):
        """Acquire with the default timeout; raise TimeoutError on failure."""
        if not self.acquire():
            raise TimeoutError(f"Failed to acquire lock: {self.name}")
        return self

    def __exit__(self, exc_type, exc, tb):
        """Always release; never suppress the exception (returns False)."""
        self.release()
        return False

    @contextmanager
    def __call__(self):
        """Context-manager form ``with lock(): ...`` (same semantics as ``with lock:``)."""
        with self:
            yield


# ===== 使用示例 =====
def critical_section(lock_name: str):
    """Demo worker: hold the etcd lock ``lock_name`` for ~3 s, then release it.

    Uses the ``lock()`` context-manager form (``EtcdLock.__call__``), which
    acquires with the default timeout, raises TimeoutError if that fails,
    and releases the lock on exit. The etcd client is closed afterwards.
    """
    client = etcd3.client(host="localhost", port=2379)
    try:
        lock = EtcdLock(client, lock_name)
        with lock():
            print(f"[{lock_name}] Entering critical section...")
            time.sleep(3)  # simulate the protected work
            print(f"[{lock_name}] Done!")
    finally:
        client.close()


if __name__ == "__main__":
    # Demo: two workers standing in for two processes contend for one lock.
    from threading import Thread

    workers = [
        Thread(target=critical_section, args=("my-resource",))
        for _ in range(2)
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()

3. 服务注册发现

# service_discovery.py
import etcd3
import socket
import json
import time
import threading

class ServiceRegistry:
    """Service registration and discovery built on etcd Leases.

    Each instance is stored at ``/services/<service_name>/<host>:<port>`` as a
    JSON document attached to a Lease of ``ttl`` seconds. A background daemon
    thread keeps refreshing that Lease; if the process dies the refreshes stop
    and etcd deletes the key once the Lease expires.
    """

    def __init__(self, client: "etcd3.Etcd3Client", service_name: str, ttl: int = 10):
        self.client = client
        self.prefix = f"/services/{service_name}"
        self.ttl = ttl

    def _scan_prefix(self) -> str:
        """Key prefix that matches only this service's instances.

        The trailing slash matters: ``/services/order-service`` alone would
        also match keys under ``/services/order-service-v2``.
        """
        return f"{self.prefix}/"

    def register(self, host: str, port: int, metadata: dict = None):
        """Register ``host:port`` under a Lease and keep it alive in the background.

        Args:
            host: Host name or address advertised to consumers.
            port: Port advertised to consumers.
            metadata: Optional extra fields; stored as ``{}`` when omitted.

        Returns:
            The etcd Lease backing the registration. Revoking it deregisters
            the instance immediately and stops the keepalive thread on its
            next refresh.
        """
        instance_id = f"{host}:{port}"
        key = f"{self.prefix}/{instance_id}"
        value = json.dumps({
            "host": host,
            "port": port,
            "metadata": metadata or {},
            "registered_at": time.time(),
        })

        lease = self.client.lease(self.ttl)
        self.client.put(key, value, lease=lease)
        print(f"[Register] {instance_id} registered (TTL={self.ttl}s)")

        def keepalive():
            # Refresh every ttl/3 so a missed refresh still fits inside the TTL.
            while True:
                try:
                    responses = lease.refresh()
                    # etcd answers a keep-alive for an expired/unknown lease
                    # with TTL 0 rather than an error, so check it explicitly.
                    alive = not responses or responses[0].TTL > 0
                except Exception:
                    alive = False
                if not alive:
                    print(f"[Register] Lease lost for {instance_id}")
                    return
                time.sleep(self.ttl / 3)

        threading.Thread(target=keepalive, daemon=True).start()
        return lease

    def discover(self) -> list:
        """Return the decoded JSON documents of all currently registered instances."""
        return [
            json.loads(value.decode())
            for value, _meta in self.client.get_prefix(self._scan_prefix())
        ]

    def watch(self, callback):
        """Call ``callback(event)`` for every change under this service; blocks.

        The underlying etcd watch is cancelled when the event stream ends or
        when ``callback`` raises.
        """
        events, cancel = self.client.watch_prefix(self._scan_prefix())
        try:
            for event in events:
                callback(event)
        finally:
            cancel()


# ===== 使用示例 =====
def start_service():
    """Register this host as an ``order-service`` instance on port 8080, then idle 30 s."""
    registry = ServiceRegistry(
        etcd3.client(host="localhost", port=2379),
        "order-service",
    )
    instance_meta = {"version": "v1.2.3", "region": "cn-east"}
    registry.register(socket.gethostname(), 8080, instance_meta)

    # Keep the process (and the daemon keepalive thread) running for a while.
    time.sleep(30)


def discover_services():
    """Poll etcd every 5 s and print the live ``order-service`` instances; never returns."""
    client = etcd3.client(host="localhost", port=2379)
    registry = ServiceRegistry(client, "order-service")

    while True:
        instances = registry.discover()
        print(f"[Discover] Found {len(instances)} instances:")
        for inst in instances:
            # register() stores metadata as {} when none is given, so an
            # instance may have no "version"; don't crash the poller on it.
            version = inst.get("metadata", {}).get("version", "?")
            print(f"  - {inst['host']}:{inst['port']} v{version}")
        time.sleep(5)


if __name__ == "__main__":
    import sys

    # Usage: python service_discovery.py [server|discover]
    # Without an argument, default to discover mode instead of raising IndexError.
    mode = sys.argv[1] if len(sys.argv) > 1 else "discover"
    if mode == "server":
        start_service()
    else:
        discover_services()

运行步骤

# 1. Start etcd. The bitnami image refuses to start without a root password
#    unless unauthenticated access is explicitly allowed (fine for a local demo).
docker run -d --name etcd -p 2379:2379 -e ALLOW_NONE_AUTHENTICATION=yes bitnami/etcd:3.5

# 2. Test the distributed lock
python distributed_lock.py
# [my-resource] Entering critical section... (first thread)
# [my-resource] Done!
# [my-resource] Entering critical section... (second thread, after waiting)
# [my-resource] Done!

# 3. Test service discovery (terminal 1)
python service_discovery.py server
# [Register] hostname:8080 registered (TTL=10s)

# 4. Test service discovery (terminal 2)
python service_discovery.py discover
# [Discover] Found 1 instances:
#   - hostname:8080 v1.2.3

# 5. Stop the server -> its instance disappears once the lease TTL (at most ~10s) runs out

预期输出

# 分布式锁
[my-resource] Entering critical section...
[my-resource] Done!
[my-resource] Entering critical section...
[my-resource] Done!

# 服务发现
[Register] MacBook-Pro:8080 registered (TTL=10s)
[Discover] Found 1 instances:
  - MacBook-Pro:8080 v1.2.3

关键点

  • Lease + 事务(CAS)实现互斥锁;持锁方崩溃后锁随 Lease 过期自动释放,避免死锁(示例未续约 Lease,临界区耗时须小于 TTL,否则需自行续约)
  • Lease 自动过期防止实例宕机后"幽灵注册"
  • Watch 机制(ServiceRegistry.watch)可实时推送变更,替代示例 discover_services 中的轮询,避免轮询开销
  • etcd 基于 Raft 保证强一致性(CP),锁归属的判定可靠;但 Lease 过期后旧持有者可能仍在执行,关键写入应以锁 key 的 revision 作为 fencing token 进行校验

信息

路径
/tech-stacks/etcd/examples/etcd 分布式锁与服务发现实战.md
更新时间
2026/5/31