概览
Redis 是最流行的内存键值数据库,由 Salvatore Sanfilippo 于 2009 年创建。它以亚毫秒级延迟著称,不仅支持 String/Hash/List/Set/SortedSet 等丰富数据结构,还提供发布订阅、Lua 脚本、事务、持久化、主从复制和集群模式。Redis 广泛用于缓存、会话存储、排行榜、消息队列、实时计数等场景,是现代高并发架构的标配组件。
Redis 是最流行的内存键值数据库,由 Salvatore Sanfilippo 于 2009 年创建。它以亚毫秒级延迟著称,不仅支持 String/Hash/List/Set/SortedSet 等丰富数据结构,还提供发布订阅、Lua 脚本、事务、持久化、主从复制和集群模式。Redis 广泛用于缓存、会话存储、排行榜、消息队列、实时计数等场景,是现代高并发架构的标配组件。
gcc(源码编译时需要),tcl(测试套件)sudo apt update
sudo apt install redis-server -y
sudo systemctl start redis-server
sudo systemctl enable redis-server
# 验证
redis-cli ping # 应返回 PONG
brew install redis
brew services start redis
redis-cli ping
docker run --name redis-dev \
-p 6379:6379 -v ~/redis-data:/data \
-d redis:7-alpine redis-server --appendonly yes
# 进入 CLI
docker exec -it redis-dev redis-cli
wget https://download.redis.io/redis-stable.tar.gz
tar xzf redis-stable.tar.gz
cd redis-stable
make -j$(nproc)
sudo make install
redis-server --daemonize yes
redis-cli
127.0.0.1:6379> SET mykey "Hello Redis"
OK
127.0.0.1:6379> GET mykey
"Hello Redis"
Q: 内存分配器错误 (MALLOC=libc)
# 编译时遇到 jemalloc 问题:
make MALLOC=libc
# 或安装 jemalloc:
sudo apt install libjemalloc-dev
Q: 无法远程连接
编辑 /etc/redis/redis.conf:
bind 0.0.0.0 # 改成监听所有接口
protected-mode no # 开发环境关闭保护模式
重启:sudo systemctl restart redis-server
Q: 数据持久化配置
Redis 默认 RDB 快照。开启 AOF 追加日志更安全:
appendonly yes
appendfsync everysec # 每秒同步(性能与安全的平衡)
通过 redis-cli 和 Python redis 库,完成 Redis 五种核心数据结构的操作,体验缓存加速效果。
redis-cli ping # 确保返回 PONG
pip install redis
redis-cli
# === String(字符串)===
SET username "alice"
GET username
INCR page_views # 原子自增计数器
INCRBY page_views 10 # 一次性 +10
EXPIRE page_views 3600 # 1小时后过期
# === Hash(哈希 - 对象存储)===
HSET user:1001 name "张三" age "21" email "zs@example.com"
HGET user:1001 name
HGETALL user:1001
HINCRBY user:1001 age 1 # 年龄 +1
# === List(列表 - 消息队列)===
LPUSH queue:tasks "task1" "task2" "task3"
RPOP queue:tasks # 取出最后一个
LLEN queue:tasks
LRANGE queue:tasks 0 -1 # 查看全部
# === Set(集合 - 去重/交并差)===
SADD skills:alice "Python" "Redis" "MySQL"
SADD skills:bob "Python" "MongoDB" "Redis"
SINTER skills:alice skills:bob # 共同技能
SUNION skills:alice skills:bob # 所有技能
# === Sorted Set(有序集合 - 排行榜)===
ZADD leaderboard 92 "Alice" 88 "Bob" 95 "Charlie" 87 "David"
ZRANGE leaderboard 0 -1 WITHSCORES # 全部排名
ZREVRANGE leaderboard 0 2 WITHSCORES # Top 3
ZINCRBY leaderboard 5 "Alice" # Alice 加分
import redis
import time
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# === 缓存模式 ===
def get_user(user_id):
"""先从缓存取,未命中再查'数据库'"""
cache_key = f"user:{user_id}"
cached = r.hgetall(cache_key)
if cached:
print(f"[缓存命中] {cached}")
return cached
# 模拟数据库查询
print(f"[数据库查询] user:{user_id}")
time.sleep(0.1) # 模拟慢查询
user_data = {"name": "张三", "age": 21, "email": f"user{user_id}@example.com"}
# 写入缓存(30秒过期)
r.hset(cache_key, mapping=user_data)
r.expire(cache_key, 30)
return user_data
print(get_user(1001)) # 第一次:数据库
print(get_user(1001)) # 第二次:缓存命中
# === 分布式锁 ===
def do_critical_task():
lock_key = "lock:task"
# 尝试获取锁(SET NX + EX = 原子操作)
acquired = r.set(lock_key, "locked", nx=True, ex=10)
if not acquired:
print("任务正在执行中,跳过")
return
try:
print("执行关键任务...")
time.sleep(1)
finally:
r.delete(lock_key)
# === 发布订阅 ===
def subscriber():
pubsub = r.pubsub()
pubsub.subscribe("channel:news")
for msg in pubsub.listen():
if msg['type'] == 'message':
print(f"收到消息: {msg['data']}")
# 另一个进程发布:
# r.publish("channel:news", "Hello Redis!")
import timeit
def without_cache():
time.sleep(0.05) # 模拟每次 DB 查询
return {"data": "result"}
def with_cache():
cached = r.get("expensive_data")
if cached:
return cached
data = without_cache()
r.setex("expensive_data", 60, str(data))
return data
# 预热缓存
with_cache()
t1 = timeit.timeit(without_cache, number=100)
t2 = timeit.timeit(with_cache, number=100)
print(f"无缓存 100次: {t1:.2f}s")
print(f"有缓存 100次: {t2:.2f}s (加速 {t1/t2:.0f}x)")
# redis-cli
127.0.0.1:6379> ZREVRANGE leaderboard 0 2 WITHSCORES
1) "Charlie" 2) "95"
3) "Alice" 4) "92"
5) "Bob" 6) "88"
# Python
[数据库查询] user:1001
{'name': '张三', 'age': 21, 'email': 'user1001@example.com'}
[缓存命中] {'name': '张三', 'age': 21, 'email': 'user1001@example.com'}
无缓存 100次: 5.10s
有缓存 100次: 0.05s (加速 102x)
演示 Redis 五大核心数据结构:String、Hash、List、Set、Sorted Set,以及分布式锁。
pip install redis
# 确保 Redis 运行
docker run -d --name redis -p 6379:6379 redis:7-alpine
import redis
import json
import time
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# ===== 1. String =====
r.set('app:config:timeout', '30', ex=3600)
r.setnx('app:config:version', '1.0.0') # 不存在才设置
r.incr('app:counter:visitors') # 原子递增
print(f"访客数: {r.get('app:counter:visitors')}")
# ===== 2. Hash(对象存储)=====
r.hset('user:1001', mapping={
'name': 'Alice', 'email': 'alice@example.com',
'age': 28, 'plan': 'premium'
})
print(f"用户名: {r.hget('user:1001', 'name')}")
print(f"用户信息: {r.hgetall('user:1001')}")
# ===== 3. List(消息队列)=====
r.lpush('task:queue', 'task1', 'task2', 'task3')
r.rpush('task:queue', 'task4')
task = r.blpop('task:queue', timeout=5) # 阻塞弹出
print(f"执行任务: {task}")
# ===== 4. Set(去重集合)=====
r.sadd('user:tags:1001', 'python', 'redis', 'docker')
r.sadd('user:tags:1002', 'python', 'go', 'k8s')
common = r.sinter('user:tags:1001', 'user:tags:1002') # 交集
print(f"共同标签: {common}")
# ===== 5. Sorted Set(排行榜)=====
scores = {'player_a': 9800, 'player_b': 8500, 'player_c': 9200, 'player_d': 7600}
r.zadd('game:leaderboard', scores)
# Top 3
top3 = r.zrevrange('game:leaderboard', 0, 2, withscores=True)
print(f"🏆 Top 3: {top3}")
# 玩家 B 的排名
rank = r.zrevrank('game:leaderboard', 'player_b')
print(f"player_b 排名: {rank + 1}")
# ===== 6. 分布式锁 =====
lock_key = 'lock:resource-a'
lock_value = f'{time.time_ns()}'
# 获取锁(NX + PX 原子操作)
acquired = r.set(lock_key, lock_value, nx=True, px=30000)
if acquired:
try:
print("🔒 获得锁,开始处理...")
time.sleep(1)
finally:
# Lua 脚本安全释放(只释放自己的锁)
release_script = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
"""
r.eval(release_script, 1, lock_key, lock_value)
print("🔓 释放锁")
else:
print("❌ 锁被占用")
# ===== 7. Pipeline(批量操作)=====
pipe = r.pipeline()
for i in range(100):
pipe.set(f'batch:key:{i}', f'value-{i}')
pipe.execute()
print("批量写入 100 条完成")
# ===== 8. Pub/Sub(发布订阅)=====
def subscriber():
import threading
def listen():
pubsub = r.pubsub()
pubsub.subscribe('channel:notifications')
for msg in pubsub.listen():
if msg['type'] == 'message':
print(f"📨 收到消息: {msg['data']}")
t = threading.Thread(target=listen, daemon=True)
t.start()
return t
# 在另一个进程启动 subscriber(),这里演示发布
t = subscriber()
r.publish('channel:notifications', '系统将在 10 分钟后维护')
time.sleep(0.5)
依次输出访客数、用户信息、任务、排行榜 Top3、分布式锁状态、批量操作、Pub/Sub 消息。
Redis(Remote Dictionary Server)是一个内存中的数据结构服务器。它不是关系型数据库,而是提供了 Key-Value 存储 + 丰富数据结构的内存中间件。
| 维度 | MySQL | Redis |
|---|---|---|
| 存储 | 磁盘(持久) | 内存(也可持久化) |
| 速度 | 毫秒级 | 微秒级(~100k ops/s) |
| 数据模型 | 表/行/列 | String/Hash/List/Set/ZSet/Stream... |
| 查询方式 | SQL | 基于 Key + 原子命令 |
| Schema | 严格 | 无 |
| 容量 | TB/PB 级 | 受限于内存(GB 级) |
SET session:abc123 "user:1001" EX 3600 # 会话存储(1小时过期)
INCR post:42:views # 文章浏览计数
SETBIT online_users 1001 1 # 位图:用户在线状态
HSET product:2001 name "机械键盘" price 299 stock 50
HINCRBY product:2001 stock -1 # 扣库存(原子操作)
# 一个 Hash 可存储 40 亿字段,比 String 存储多个键节省内存
LPUSH events:user:1001 "login" "view_homepage" "search"
LTRIM events:user:1001 0 99 # 只保留最近 100 条
# 阻塞队列(BRPOP)可实现消息队列
SADD followers:1001 2001 2002 2003 # 我的粉丝
SADD following:1001 2002 2004 # 我关注的
SINTER followers:1001 following:1001 # 互关好友
SDIFF following:1001 followers:1001 # 我关注但没关注我的
ZADD leaderboard:weekly 95 "Alice" 92 "Bob" 88 "Charlie"
ZREVRANGE leaderboard:weekly 0 9 WITHSCORES # Top 10
ZRANK leaderboard:weekly "Alice" # Alice 的排名(0-based)
读:先查 Redis → 命中返回 | 未命中 → 查 DB → 回写 Redis
写:更新 DB → 删除/更新 Redis 缓存
def get_article(article_id):
cache_key = f"article:{article_id}"
# 1. 尝试缓存
cached = r.get(cache_key)
if cached:
return json.loads(cached)
# 2. 查数据库
article = db.query("SELECT * FROM articles WHERE id = ?", article_id)
# 3. 回写缓存(TTL 5 分钟)
r.setex(cache_key, 300, json.dumps(article))
return article
| 问题 | 现象 | 解决方案 |
|---|---|---|
| 穿透 | 查询不存在的数据,每次都打到 DB | 布隆过滤器 / 缓存空值 |
| 击穿 | 热点 Key 过期瞬间大量请求 | 互斥锁 / 永不过期 |
| 雪崩 | 大量 Key 同时过期 | TTL 加随机值 / 多级缓存 |
import redis
r = redis.Redis(decode_responses=True)
def record_score(game_id, user, score):
"""记录分数,保留最高分"""
key = f"leaderboard:{game_id}"
r.zadd(key, {user: score}) # ZADD 自动更新高分
def get_top_10(game_id):
key = f"leaderboard:{game_id}"
return r.zrevrange(key, 0, 9, withscores=True)
def get_user_rank(game_id, user):
"""返回排名(从1开始)"""
key = f"leaderboard:{game_id}"
rank = r.zrevrank(key, user)
if rank is None:
return None
return rank + 1
# 使用
record_score("tetris", "alice", 1500)
record_score("tetris", "bob", 1200)
record_score("tetris", "charlie", 1800)
for place, (user, score) in enumerate(get_top_10("tetris"), 1):
print(f"第{place}名: {user} - {int(score)}分")
print(f"Alice 排名: 第{get_user_rank('tetris', 'alice')}名")
RDB:定时快照(适合备份/灾备,可能丢最后几分钟数据)
AOF:追加日志(更安全,everysec 策略丢最多 1 秒数据)
生产建议:同时开启 RDB + AOF
# redis.conf
save 900 1 # 15分钟内有1次修改则快照
save 300 10
appendonly yes
appendfsync everysec