概览
Amazon DynamoDB 是 AWS 的全托管 NoSQL 数据库,支持键值和文档两种数据模型。无需管理服务器,自动扩缩容,提供毫秒级延迟和内置的多区域复制(Global Tables)。特别适合 Serverless 应用(与 AWS Lambda 集成)、游戏、物联网、电商等高并发场景。按读写容量(RCU/WCU)计费,也可按需付费。
Amazon DynamoDB 是 AWS 的全托管 NoSQL 数据库,支持键值和文档两种数据模型。无需管理服务器,自动扩缩容,提供毫秒级延迟和内置的多区域复制(Global Tables)。特别适合 Serverless 应用(与 AWS Lambda 集成)、游戏、物联网、电商等高并发场景。按读写容量(RCU/WCU)计费,也可按需付费。
| 要求 | 说明 |
|---|---|
| AWS 账号 | 注册 https://aws.amazon.com/ |
| AWS CLI | 命令行管理工具 |
| 开发语言 | Python / Node.js / Java / Go 等 |
| AWS SDK | boto3(Python)、aws-sdk(Node.js)等 |
DynamoDB 是托管服务,无需手动安装。本地开发使用 Docker 或 Local 版本。
# Linux
curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
unzip awscliv2.zip && sudo ./aws/install
# macOS
brew install awscli
# 配置凭证
aws configure
# 输入: Access Key ID, Secret Access Key, Region (如 ap-northeast-1), Output (json)
# Amazon DynamoDB Local
docker run -d --name dynamodb \
-p 8000:8000 \
amazon/dynamodb-local
# 验证
curl http://localhost:8000/shell/
# 创建表(使用 AWS CLI 指向本地)
aws dynamodb create-table \
--table-name Students \
--attribute-definitions \
AttributeName=student_id,AttributeType=S \
--key-schema \
AttributeName=student_id,KeyType=HASH \
--billing-mode PAY_PER_REQUEST \
--endpoint-url http://localhost:8000
pip install boto3
确保 IAM 用户/角色有 DynamoDB 权限:
{
"Effect": "Allow",
"Action": [
"dynamodb:PutItem",
"dynamodb:GetItem",
"dynamodb:UpdateItem",
"dynamodb:DeleteItem",
"dynamodb:Query",
"dynamodb:Scan",
"dynamodb:BatchWriteItem"
],
"Resource": "arn:aws:dynamodb:*:*:table/*"
}
本地版默认绑定在 localhost:8000,务必在请求中加 --endpoint-url http://localhost:8000,或 SDK 中指定:
dynamodb = boto3.resource('dynamodb', endpoint_url='http://localhost:8000')
| 模式 | 适用场景 |
|---|---|
| PROVISIONED | 可预测流量,成本更可控 |
| PAY_PER_REQUEST | 不确定或波动的流量,按实际使用付费 |
学生/开发建议用 PAY_PER_REQUEST(免费额度内无需成本)。
AWS 免费层包含每月 25 GB 存储 + 25 读写容量单位,足够学习和小型项目。
使用 Python (boto3) 操作 DynamoDB,完成建表、插入、查询、更新、删除等核心操作。
# pip install boto3
import boto3
from decimal import Decimal
from datetime import datetime
# 使用本地 DynamoDB(开发环境)
dynamodb = boto3.resource(
'dynamodb',
endpoint_url='http://localhost:8000',
region_name='us-east-1',
aws_access_key_id='fake',
aws_secret_access_key='fake'
)
# === 1. 创建表 ===
def create_table():
table = dynamodb.create_table(
TableName='Students',
KeySchema=[
{'AttributeName': 'student_id', 'KeyType': 'HASH'}, # 分区键
{'AttributeName': 'enrolled_date', 'KeyType': 'RANGE'} # 排序键
],
AttributeDefinitions=[
{'AttributeName': 'student_id', 'AttributeType': 'S'},
{'AttributeName': 'enrolled_date', 'AttributeType': 'S'}
],
BillingMode='PAY_PER_REQUEST' # 按需付费
)
table.wait_until_exists()
print(f"表 {table.table_name} 已创建, ARN: {table.table_arn}")
return table
# === 2. 插入数据 ===
def insert_student(table):
items = [
{
'student_id': 'S2024001',
'enrolled_date': '2024-09-01',
'name': '张三',
'age': 21,
'major': '计算机科学',
'gpa': Decimal('3.8'),
'courses': {'数据库原理', '算法导论'}
},
{
'student_id': 'S2024001',
'enrolled_date': '2024-09-15',
'name': '张三',
'age': 21,
'major': '计算机科学',
'gpa': Decimal('3.9'),
'courses': {'操作系统', '计算机网络'}
},
{
'student_id': 'S2024002',
'enrolled_date': '2024-09-01',
'name': '李四',
'age': 22,
'major': '数学',
'gpa': Decimal('3.5'),
'courses': {'高等代数'}
}
]
for item in items:
table.put_item(Item=item)
print(f"插入 {len(items)} 条记录")
# === 3. 查询 ===
def query_students(table, student_id):
"""按分区键查询某学生的所有记录"""
response = table.query(
KeyConditionExpression=boto3.dynamodb.conditions.Key('student_id').eq(student_id)
)
print(f"\n查询 {student_id} 的记录:")
for item in response['Items']:
print(f" 日期: {item['enrolled_date']}, GPA: {item['gpa']}")
# === 4. 获取单条记录 ===
def get_item(table, student_id, enrolled_date):
response = table.get_item(
Key={
'student_id': student_id,
'enrolled_date': enrolled_date
}
)
if 'Item' in response:
item = response['Item']
print(f"\n获取到: {item['name']}, GPA: {item['gpa']}")
else:
print("未找到记录")
# === 5. 更新记录 ===
def update_gpa(table, student_id, enrolled_date, new_gpa):
response = table.update_item(
Key={
'student_id': student_id,
'enrolled_date': enrolled_date
},
UpdateExpression='SET gpa = :gpa, #ts = :ts',
ExpressionAttributeValues={
':gpa': Decimal(str(new_gpa)),
':ts': datetime.now().isoformat()
},
ExpressionAttributeNames={
'#ts': 'updated_at' # 新增字段
},
ReturnValues='UPDATED_NEW'
)
print(f"\n更新后: {response['Attributes']}")
# === 6. 使用 GSI(全局二级索引)按专业查询 ===
def create_gsi(table):
"""添加全局二级索引:按 major 查询"""
table.update(
AttributeDefinitions=[
{'AttributeName': 'major', 'AttributeType': 'S'}
],
GlobalSecondaryIndexUpdates=[
{
'Create': {
'IndexName': 'MajorIndex',
'KeySchema': [
{'AttributeName': 'major', 'KeyType': 'HASH'},
{'AttributeName': 'gpa', 'KeyType': 'RANGE'}
],
'Projection': {'ProjectionType': 'ALL'}
}
}
]
)
print("GSI MajorIndex 创建中(约 1 分钟)...")
# === 主流程 ===
if __name__ == '__main__':
try:
table = create_table()
except Exception:
table = dynamodb.Table('Students')
print("表已存在")
insert_student(table)
query_students(table, 'S2024001')
get_item(table, 'S2024001', '2024-09-01')
update_gpa(table, 'S2024002', '2024-09-01', 3.7)
表 Students 已创建
插入 3 条记录
查询 S2024001 的记录:
日期: 2024-09-01, GPA: 3.8
日期: 2024-09-15, GPA: 3.9
获取到: 张三, GPA: 3.8
更新后: {'gpa': Decimal('3.7'), 'updated_at': '2024-...'}
query() 需要完整 Partition Key,scan() 全表扫描(较慢)courses)支持 {'数据库原理', '算法导论'}DynamoDB 继承自 Amazon 的 Dynamo 论文,核心理念:
| 概念 | 说明 |
|---|---|
| Partition Key | 决定数据物理存储分区 |
| Sort Key | 分区内排序,支持范围查询 |
| GSI | 全局二级索引(可自定义分区/排序键) |
| LSI | 本地二级索引(同分区键,不同排序键) |
| RCU/WCU | 读/写容量单位(1 RCU = 4KB 强一致读/秒) |
| DAX | DynamoDB 缓存加速层(微秒级) |
| Streams | 变更数据捕获(触发 Lambda) |
用 DynamoDB + AWS Lambda + API Gateway 构建无服务器任务管理 API。支持按用户查询、按状态过滤、分页。
# 表设计: Tasks
# PK (Partition Key): USER#<user_id>
# SK (Sort Key): TASK#<task_id>
# 属性: title, status, priority, created_at, due_date
# 一条典型记录:
{
"PK": "USER#alice",
"SK": "TASK#001",
"title": "完成实验报告",
"status": "IN_PROGRESS",
"priority": "HIGH",
"created_at": "2024-09-01T10:00:00Z",
"due_date": "2024-09-15T00:00:00Z"
}
import boto3
import uuid
from datetime import datetime
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('Tasks')
class TaskRepository:
@staticmethod
def create(user_id, title, due_date=None):
task = {
'PK': f'USER#{user_id}',
'SK': f'TASK#{uuid.uuid4().hex[:8]}',
'title': title,
'status': 'TODO',
'priority': 'MEDIUM',
'created_at': datetime.utcnow().isoformat(),
'due_date': due_date
}
table.put_item(Item=task)
return task
@staticmethod
def list_by_user(user_id, status=None, limit=20):
expr = 'PK = :pk'
values = {':pk': f'USER#{user_id}'}
if status:
expr += ' AND #st = :status'
values[':status'] = status
response = table.query(
KeyConditionExpression=expr,
ExpressionAttributeNames={'#st': 'status'} if status else {},
ExpressionAttributeValues=values,
Limit=limit,
ScanIndexForward=False # 最新的在前
)
return response['Items']
@staticmethod
def update_status(user_id, task_id, new_status):
response = table.update_item(
Key={'PK': f'USER#{user_id}', 'SK': f'TASK#{task_id}'},
UpdateExpression='SET #st = :status, updated_at = :ts',
ExpressionAttributeNames={'#st': 'status'},
ExpressionAttributeValues={
':status': new_status,
':ts': datetime.utcnow().isoformat()
},
ReturnValues='ALL_NEW'
)
return response['Attributes']
@staticmethod
def delete(user_id, task_id):
table.delete_item(
Key={'PK': f'USER#{user_id}', 'SK': f'TASK#{task_id}'}
)
# 创建 GSI(在表创建时或通过 update_table)
# GSI: StatusIndex
# PK: status (字符串)
# SK: created_at (时间排序)
# 查询所有用户的 IN_PROGRESS 任务
def list_all_in_progress():
response = table.query(
IndexName='StatusIndex',
KeyConditionExpression='#st = :status',
ExpressionAttributeNames={'#st': 'status'},
ExpressionAttributeValues={':status': 'IN_PROGRESS'}
)
return response['Items']
# lambda_function.py
import json
from task_repository import TaskRepository
def lambda_handler(event, context):
http_method = event['httpMethod']
path = event['path']
user_id = event['requestContext']['authorizer']['claims']['sub']
if http_method == 'GET' and path == '/tasks':
tasks = TaskRepository.list_by_user(user_id)
return {'statusCode': 200, 'body': json.dumps(tasks)}
elif http_method == 'POST' and path == '/tasks':
body = json.loads(event['body'])
task = TaskRepository.create(user_id, body['title'])
return {'statusCode': 201, 'body': json.dumps(task)}
elif http_method == 'PATCH' and '/tasks/' in path:
task_id = path.split('/')[-1]
body = json.loads(event['body'])
updated = TaskRepository.update_status(user_id, task_id, body['status'])
return {'statusCode': 200, 'body': json.dumps(updated)}
return {'statusCode': 404, 'body': 'Not Found'}