ZooKeeper Python 客户端操作与分布式锁
目标
使用 Python kazoo 库操作 ZooKeeper ZNode,并实现一个分布式锁。
环境准备
pip install kazoo
完整代码
基础操作(zk_basic.py)
from kazoo.client import KazooClient
import time
# 连接
zk = KazooClient(hosts='localhost:2181')
zk.start()
# 创建节点
zk.ensure_path('/services/users')
zk.create('/services/users/node1', b'http://192.168.1.10:8080', ephemeral=True)
# 读取节点
data, stat = zk.get('/services/users/node1')
print(f"数据: {data.decode()}, 版本: {stat.version}")
# 更新节点
zk.set('/services/users/node1', b'http://192.168.1.11:8080')
# 监听节点变化
@zk.DataWatch('/services/users/node1')
def watch_node(data, stat):
print(f"节点变化: {data.decode() if data else '已删除'}")
# 获取子节点
children = zk.get_children('/services/users')
print(f"子节点: {children}")
# 删除
zk.delete('/services/users/node1')
zk.stop()
分布式锁(zk_lock.py)
from kazoo.client import KazooClient
from kazoo.recipe.lock import Lock
import threading
import time
def worker(worker_id, lock):
print(f"Worker-{worker_id} 尝试获取锁...")
with lock:
print(f"Worker-{worker_id} 获得锁,开始处理")
time.sleep(2)
print(f"Worker-{worker_id} 处理完成,释放锁")
# 连接
zk = KazooClient(hosts='localhost:2181')
zk.start()
# 创建锁
lock = Lock(zk, '/locks/resource-a')
# 多线程竞争
threads = []
for i in range(5):
t = threading.Thread(target=worker, args=(i, lock))
threads.append(t)
t.start()
for t in threads:
t.join()
print("所有 Worker 完成")
zk.stop()
Leader 选举(zk_leader.py)
from kazoo.client import KazooClient
from kazoo.recipe.election import Election
zk = KazooClient(hosts='localhost:2181')
zk.start()
election = Election(zk, '/elections/leader')
def leader_callback():
print("✅ 我成为了 Leader!")
# 做 Leader 的工作...
def follower_callback():
print("👤 我是 Follower")
# 参与选举
election.run(leader_callback)
# 阻塞等待
import time
time.sleep(60)
zk.stop()
运行步骤
docker ps | grep zookeeper
python zk_basic.py
python zk_lock.py
python zk_leader.py
预期输出
分布式锁示例中,5 个 Worker 串行执行(不会同时输出"获得锁")。Leader 选举中,只有一个实例成为 Leader。