文档
目标
使用 Python + etcd3 实现分布式锁和基于 Lease 的服务注册发现,展示 etcd 的核心应用模式。
完整代码
1. 安装依赖
pip install etcd3
2. 分布式锁
# distributed_lock.py
import etcd3
import time
import uuid
from contextlib import contextmanager
class EtcdLock:
    """Distributed lock backed by an etcd key that is bound to a lease.

    The lock is the key ``/locks/<name>``. It is created inside a transaction
    whose compare clause only passes while the key has never been created, and
    the key is attached to a lease so that revoking the lease removes it.

    NOTE: the lease is not refreshed while the lock is held, so the protected
    work has to finish within ``ttl`` seconds.

    The instance can be used either as ``with lock:`` or as ``with lock():``.
    """

    def __init__(self, client: "etcd3.Etcd3Client", name: str, ttl: int = 10):
        """Store the client, the lock key and the lease TTL (in seconds)."""
        self.client = client
        self.name = f"/locks/{name}"
        self.ttl = ttl
        # Lease backing the lock key; only set while this instance holds the lock.
        self.lease = None

    def acquire(self, timeout: int = 30) -> bool:
        """Try to take the lock, retrying until it succeeds or *timeout* elapses.

        At least one attempt is always made, so ``timeout=0`` behaves as a
        non-blocking try-lock.

        Returns:
            True if the lock key was created by this call, False on timeout.
        """
        # Monotonic clock: the deadline must not move when the wall clock is adjusted.
        deadline = time.monotonic() + timeout
        while True:
            lease = self.client.lease(self.ttl)
            # Atomic create: the put only runs if the key does not exist yet.
            acquired, _ = self.client.transaction(
                compare=[self.client.transactions.create(self.name) == 0],
                success=[
                    self.client.transactions.put(
                        self.name, str(uuid.uuid4()), lease=lease
                    )
                ],
                failure=[],
            )
            if acquired:
                self.lease = lease
                return True
            # The attempt lost the race: drop its lease right away instead of
            # leaving one orphaned lease per retry until the TTL expires.
            lease.revoke()
            if time.monotonic() >= deadline:
                return False
            time.sleep(0.1)

    def release(self):
        """Release the lock by revoking its lease; a no-op when not held."""
        # Clear the attribute first so a failing revoke cannot leave a stale lease behind.
        lease, self.lease = self.lease, None
        if lease is not None:
            lease.revoke()

    def __enter__(self):
        """Acquire the lock with the default timeout.

        Raises:
            TimeoutError: if the lock could not be acquired in time.
        """
        if not self.acquire():
            raise TimeoutError(f"Failed to acquire lock: {self.name}")
        return self

    def __exit__(self, exc_type, exc, tb):
        """Release the lock; exceptions from the ``with`` body propagate."""
        self.release()
        return False

    @contextmanager
    def __call__(self):
        """Context-manager form: ``with lock(): ...``."""
        with self:
            yield
# ===== 使用示例 =====
def critical_section(lock_name: str):
    """Run a simulated 3-second critical section while holding ``lock_name``.

    Connects to etcd on localhost:2379, takes the named lock, prints progress
    messages and releases the lock when the block exits.

    Raises:
        TimeoutError: if the lock cannot be acquired.
    """
    client = etcd3.client(host="localhost", port=2379)
    lock = EtcdLock(client, lock_name)
    # EtcdLock provides its context manager through __call__, so the lock
    # object has to be called to obtain something usable in a with statement.
    with lock():
        print(f"[{lock_name}] Entering critical section...")
        time.sleep(3)  # simulate the protected work
        print(f"[{lock_name}] Done!")
if __name__ == "__main__":
    # Demo: two workers compete for the same lock.
    from threading import Thread

    workers = [
        Thread(target=critical_section, args=("my-resource",))
        for _ in range(2)
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
3. 服务注册发现
# service_discovery.py
import etcd3
import socket
import json
import time
import threading
class ServiceRegistry:
    """Service registration and discovery on top of etcd leases.

    Every instance is stored under ``/services/<service_name>/<host>:<port>``
    as a JSON document and is attached to a lease that a background thread
    keeps refreshing.
    """

    def __init__(self, client: "etcd3.Etcd3Client", service_name: str, ttl: int = 10):
        """Store the client, the key prefix of the service and the lease TTL (seconds)."""
        self.client = client
        self.prefix = f"/services/{service_name}"
        self.ttl = ttl

    def _scan_prefix(self) -> str:
        """Return the prefix used for range reads and watches.

        The trailing slash keeps ``/services/foo`` from also matching the keys
        of a sibling service such as ``/services/foo-v2``.
        """
        return f"{self.prefix}/"

    def register(self, host: str, port: int, metadata: dict = None):
        """Register one service instance under a lease and keep the lease alive.

        Args:
            host: Host name or address advertised for the instance.
            port: Port advertised for the instance.
            metadata: Optional extra attributes; stored as ``{}`` when omitted.
        """
        instance_id = f"{host}:{port}"
        key = f"{self.prefix}/{instance_id}"
        value = json.dumps({
            "host": host,
            "port": port,
            "metadata": metadata or {},
            "registered_at": time.time(),
        })

        lease = self.client.lease(self.ttl)
        self.client.put(key, value, lease=lease)
        print(f"[Register] {instance_id} registered (TTL={self.ttl}s)")

        def keepalive():
            # Refresh three times per TTL period; stop on the first failure.
            while True:
                try:
                    lease.refresh()
                    time.sleep(self.ttl / 3)
                except Exception:
                    print(f"[Register] Lease lost for {instance_id}")
                    break

        # Daemon thread: it must not keep the process alive on shutdown.
        threading.Thread(target=keepalive, daemon=True).start()

    def discover(self) -> list:
        """Return the decoded JSON document of every instance of this service."""
        return [
            json.loads(raw.decode())
            for raw, _ in self.client.get_prefix(self._scan_prefix())
        ]

    def watch(self, callback):
        """Call ``callback(event)`` for every change under this service's keys.

        Blocks while the event iterator yields. The watch is cancelled when
        iteration ends or when the callback raises.
        """
        events, cancel = self.client.watch_prefix(self._scan_prefix())
        try:
            for event in events:
                callback(event)
        finally:
            cancel()
# ===== 使用示例 =====
def start_service():
    """Register this host as an ``order-service`` instance on port 8080.

    Connects to etcd on localhost:2379, registers the instance with sample
    metadata and then sleeps for 30 seconds to simulate a running service.
    """
    registry = ServiceRegistry(
        etcd3.client(host="localhost", port=2379),
        "order-service",
    )
    listen_port = 8080
    registry.register(
        socket.gethostname(),
        listen_port,
        {"version": "v1.2.3", "region": "cn-east"},
    )
    # Simulate the service doing work.
    time.sleep(30)
def discover_services():
    """Poll etcd every 5 seconds and print the known ``order-service`` instances.

    Runs until interrupted. Instances registered without a ``version`` entry
    in their metadata are printed with the version ``unknown``.
    """
    client = etcd3.client(host="localhost", port=2379)
    registry = ServiceRegistry(client, "order-service")
    while True:
        instances = registry.discover()
        print(f"[Discover] Found {len(instances)} instances:")
        for inst in instances:
            # Metadata is optional at registration time, so do not assume
            # that a "version" entry is present.
            version = inst.get("metadata", {}).get("version", "unknown")
            print(f" - {inst['host']}:{inst['port']} v{version}")
        time.sleep(5)
if __name__ == "__main__":
    import sys

    # Usage: python service_discovery.py [server|discover]
    # Fall back to discovery when no mode is given instead of failing with
    # an IndexError on sys.argv[1].
    mode = sys.argv[1] if len(sys.argv) > 1 else "discover"
    if mode == "server":
        start_service()
    else:
        discover_services()
运行步骤
# 1. 启动 etcd
docker run -d --name etcd -p 2379:2379 -e ALLOW_NONE_AUTHENTICATION=yes bitnami/etcd:3.5
# 2. 测试分布式锁
python distributed_lock.py
# [my-resource] Entering critical section... (第一个线程)
# [my-resource] Done!
# [my-resource] Entering critical section... (第二个线程,等待后)
# [my-resource] Done!
# 3. 测试服务发现(终端1)
python service_discovery.py server
# [Register] hostname:8080 registered (TTL=10s)
# 4. 测试服务发现(终端2)
python service_discovery.py discover
# [Discover] Found 1 instances:
# - hostname:8080 v1.2.3
# 5. 停止 server → 10s 后自动消失
预期输出
# 分布式锁
[my-resource] Entering critical section...
[my-resource] Done!
[my-resource] Entering critical section...
[my-resource] Done!
# 服务发现
[Register] MacBook-Pro:8080 registered (TTL=10s)
[Discover] Found 1 instances:
- MacBook-Pro:8080 v1.2.3
关键点
- Lease + 事务(CAS)实现分布式锁:Lease 到期后锁键会被自动删除,持有者崩溃也不会造成死锁;注意示例没有为锁的 Lease 续约,临界区耗时必须小于 TTL,否则锁会提前失效
- Lease 自动过期防止实例宕机后"幽灵注册"
- Watch 机制实时推送变更,避免轮询开销
- etcd 保证强一致性,分布式锁在 CP 场景下安全可靠