Consul 服务注册发现与健康检查
目标
使用 Python consul 库注册服务、查询服务、配置健康检查。
环境准备
pip install python-consul
docker run -d --name consul -p 8500:8500 consul:latest agent -dev -client=0.0.0.0
完整代码
服务注册(register.py)
import consul
import socket
import time
c = consul.Consul(host='localhost', port=8500)
service_id = f"order-service-{socket.gethostname()}"
# 注册服务(含健康检查)
c.agent.service.register(
name='order-service',
service_id=service_id,
address='192.168.1.10',
port=8080,
tags=['v1.0', 'production'],
check=consul.Check.http(
url='http://192.168.1.10:8080/health',
interval='10s',
timeout='3s',
deregister='30s' # 故障 30s 后自动注销
)
)
print(f"✅ 服务已注册: {service_id}")
# 保持运行
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
c.agent.service.deregister(service_id)
print(f"👋 服务已注销: {service_id}")
服务发现(discover.py)
import consul
import random
c = consul.Consul(host='localhost', port=8500)
# 方式一:HTTP API 查询健康服务
def get_healthy_service(name):
_, nodes = c.health.service(name, passing=True)
if not nodes:
raise Exception(f"没有健康的 {name} 实例")
# 随机选一个(客户端负载均衡)
node = random.choice(nodes)
return f"http://{node['Service']['Address']}:{node['Service']['Port']}"
# 方式二:DNS 查询
def dns_discover(name):
import dns.resolver
answers = dns.resolver.query(f"{name}.service.consul", 'A')
for rdata in answers:
print(f"DNS 解析: {rdata.address}")
# 使用
try:
url = get_healthy_service('order-service')
print(f"🔗 发现服务: {url}")
except Exception as e:
print(f"❌ {e}")
# 列出所有服务
_, services = c.agent.services()
for sid, svc in services.items():
print(f" {sid}: {svc['Address']}:{svc['Port']} [{','.join(svc.get('Tags', []))}]")
KV 配置读写(config.py)
import consul
c = consul.Consul(host='localhost', port=8500)
# 写入配置
c.kv.put('app/db/url', 'mysql://localhost:3306/mydb')
c.kv.put('app/db/max_connections', '100')
c.kv.put('app/feature/ai_recommend', 'true')
# 读取配置
_, data = c.kv.get('app/db/url')
print(f"DB URL: {data['Value'].decode()}")
# 递归读取
_, keys = c.kv.get('app/', recurse=True)
for k in keys:
print(f" {k['Key']} = {k['Value']}")
# 监听配置变更
index = None
print("👂 监听配置变更...")
while True:
_, data = c.kv.get('app/db/url', index=index, wait='30s')
if data:
index = data['ModifyIndex'] + 1 # 下次从该版本之后监听
print(f"配置变更: {data['Value'].decode()}")
运行步骤
python register.py
python discover.py
python config.py
预期输出
discover.py 输出已注册的服务实例信息。config.py 能读写 KV 并实时监听变更。