文档
Amazon DynamoDB 从零到实战:Serverless 后端
1. 背景与概念
1.1 DynamoDB 设计哲学
DynamoDB 继承自 Amazon 的 Dynamo 论文,核心理念:
- 永远可写(Always Writable):无主从切换
- 最终一致:默认最终一致,可选强一致
- 单表设计:不像 SQL 多表 JOIN,最佳实践是"一张表囊括所有实体"
1.2 关键概念
| 概念 | 说明 |
|---|---|
| Partition Key | 决定数据物理存储分区 |
| Sort Key | 分区内排序,支持范围查询 |
| GSI | 全局二级索引(可自定义分区/排序键) |
| LSI | 本地二级索引(同分区键,不同排序键) |
| RCU/WCU | 读/写容量单位(1 RCU = 4KB 强一致读/秒) |
| DAX | DynamoDB 缓存加速层(微秒级) |
| Streams | 变更数据捕获(触发 Lambda) |
2. 分步实战:构建 Serverless 任务管理 API
场景
用 DynamoDB + AWS Lambda + API Gateway 构建无服务器任务管理 API。支持按用户查询、按状态过滤、分页。
步骤一:单表设计
# 表设计: Tasks
# PK (Partition Key): USER#<user_id>
# SK (Sort Key): TASK#<task_id>
# 属性: title, status, priority, created_at, due_date
# 一条典型记录:
{
"PK": "USER#alice",
"SK": "TASK#001",
"title": "完成实验报告",
"status": "IN_PROGRESS",
"priority": "HIGH",
"created_at": "2024-09-01T10:00:00Z",
"due_date": "2024-09-15T00:00:00Z"
}
步骤二:DynamoDB 操作封装
import boto3
import uuid
from datetime import datetime
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('Tasks')
class TaskRepository:
@staticmethod
def create(user_id, title, due_date=None):
task = {
'PK': f'USER#{user_id}',
'SK': f'TASK#{uuid.uuid4().hex[:8]}',
'title': title,
'status': 'TODO',
'priority': 'MEDIUM',
'created_at': datetime.utcnow().isoformat(),
'due_date': due_date
}
table.put_item(Item=task)
return task
@staticmethod
def list_by_user(user_id, status=None, limit=20):
expr = 'PK = :pk'
values = {':pk': f'USER#{user_id}'}
if status:
expr += ' AND #st = :status'
values[':status'] = status
response = table.query(
KeyConditionExpression=expr,
ExpressionAttributeNames={'#st': 'status'} if status else {},
ExpressionAttributeValues=values,
Limit=limit,
ScanIndexForward=False # 最新的在前
)
return response['Items']
@staticmethod
def update_status(user_id, task_id, new_status):
response = table.update_item(
Key={'PK': f'USER#{user_id}', 'SK': f'TASK#{task_id}'},
UpdateExpression='SET #st = :status, updated_at = :ts',
ExpressionAttributeNames={'#st': 'status'},
ExpressionAttributeValues={
':status': new_status,
':ts': datetime.utcnow().isoformat()
},
ReturnValues='ALL_NEW'
)
return response['Attributes']
@staticmethod
def delete(user_id, task_id):
table.delete_item(
Key={'PK': f'USER#{user_id}', 'SK': f'TASK#{task_id}'}
)
步骤三:GSI 实现按状态全局查询
# 创建 GSI(在表创建时或通过 update_table)
# GSI: StatusIndex
# PK: status (字符串)
# SK: created_at (时间排序)
# 查询所有用户的 IN_PROGRESS 任务
def list_all_in_progress():
response = table.query(
IndexName='StatusIndex',
KeyConditionExpression='#st = :status',
ExpressionAttributeNames={'#st': 'status'},
ExpressionAttributeValues={':status': 'IN_PROGRESS'}
)
return response['Items']
步骤四:集成 Lambda
# lambda_function.py
import json
from task_repository import TaskRepository
def lambda_handler(event, context):
http_method = event['httpMethod']
path = event['path']
user_id = event['requestContext']['authorizer']['claims']['sub']
if http_method == 'GET' and path == '/tasks':
tasks = TaskRepository.list_by_user(user_id)
return {'statusCode': 200, 'body': json.dumps(tasks)}
elif http_method == 'POST' and path == '/tasks':
body = json.loads(event['body'])
task = TaskRepository.create(user_id, body['title'])
return {'statusCode': 201, 'body': json.dumps(task)}
elif http_method == 'PATCH' and '/tasks/' in path:
task_id = path.split('/')[-1]
body = json.loads(event['body'])
updated = TaskRepository.update_status(user_id, task_id, body['status'])
return {'statusCode': 200, 'body': json.dumps(updated)}
return {'statusCode': 404, 'body': 'Not Found'}
3. 思考题
- 为什么 DynamoDB 推荐"单表设计"?在多表设计下 GSI 能否跨表?
- RCU/WCU 计费模式下,一次 Scan 全表 1GB 数据消耗多少 RCU?
- DynamoDB Streams + Lambda 可以实现哪些模式?(CQRS、事件溯源、实时通知)