Amazon DynamoDB

技术栈
数据库
nosqlkey-valuedocumentserverlessawsmanaged

概览

Amazon DynamoDB 是 AWS 的全托管 NoSQL 数据库,支持键值和文档两种数据模型。无需管理服务器,自动扩缩容,提供毫秒级延迟和内置的多区域复制(Global Tables)。特别适合 Serverless 应用(与 AWS Lambda 集成)、游戏、物联网、电商等高并发场景。按读写容量(RCU/WCU)计费,也可按需付费。

安装

Amazon DynamoDB 安装与配置指南

1. 环境准备

要求 说明
AWS 账号 注册 https://aws.amazon.com/
AWS CLI 命令行管理工具
开发语言 Python / Node.js / Java / Go 等
AWS SDK boto3(Python)、aws-sdk(Node.js)等

DynamoDB 是托管服务,无需手动安装。本地开发使用 Docker 或 Local 版本。

2. 配置命令

AWS CLI 安装与配置

# Linux
curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
unzip awscliv2.zip &;& sudo ./aws/install

# macOS
brew install awscli

# 配置凭证
aws configure
# 输入: Access Key ID, Secret Access Key, Region (如 ap-northeast-1), Output (json)

Docker 本地版(开发测试用)

# Amazon DynamoDB Local
docker run -d --name dynamodb \
  -p 8000:8000 \
  amazon/dynamodb-local

# 验证
curl http://localhost:8000/shell/

# 创建表(使用 AWS CLI 指向本地)
aws dynamodb create-table \
  --table-name Students \
  --attribute-definitions \
    AttributeName=student_id,AttributeType=S \
  --key-schema \
    AttributeName=student_id,KeyType=HASH \
  --billing-mode PAY_PER_REQUEST \
  --endpoint-url http://localhost:8000

Python SDK (boto3)

pip install boto3

3. 常见配置问题

Q1: Access Denied / 权限不足

确保 IAM 用户/角色有 DynamoDB 权限:

{
  "Effect": "Allow",
  "Action": [
    "dynamodb:PutItem",
    "dynamodb:GetItem",
    "dynamodb:UpdateItem",
    "dynamodb:DeleteItem",
    "dynamodb:Query",
    "dynamodb:Scan",
    "dynamodb:BatchWriteItem"
  ],
  "Resource": "arn:aws:dynamodb:*:*:table/*"
}

Q2: 本地 DynamoDB 连接被拒

本地版默认绑定在 localhost:8000,务必在请求中加 --endpoint-url http://localhost:8000,或 SDK 中指定:

dynamodb = boto3.resource('dynamodb', endpoint_url='http://localhost:8000')

Q3: 计费模式选择

模式 适用场景
PROVISIONED 可预测流量,成本更可控
PAY_PER_REQUEST 不确定或波动的流量,按实际使用付费

学生/开发建议用 PAY_PER_REQUEST(免费额度内无需成本)。

Q4: 免费额度

AWS 免费层包含每月 25 GB 存储 + 25 读写容量单位,足够学习和小型项目。

示例

DynamoDB Hello World:学生表 CRUD 操作

目标

使用 Python (boto3) 操作 DynamoDB,完成建表、插入、查询、更新、删除等核心操作。

完整代码

# pip install boto3
import boto3
from decimal import Decimal
from datetime import datetime

# 使用本地 DynamoDB(开发环境)
dynamodb = boto3.resource(
    'dynamodb',
    endpoint_url='http://localhost:8000',
    region_name='us-east-1',
    aws_access_key_id='fake',
    aws_secret_access_key='fake'
)

# === 1. 创建表 ===
def create_table():
    table = dynamodb.create_table(
        TableName='Students',
        KeySchema=[
            {'AttributeName': 'student_id', 'KeyType': 'HASH'},  # 分区键
            {'AttributeName': 'enrolled_date', 'KeyType': 'RANGE'} # 排序键
        ],
        AttributeDefinitions=[
            {'AttributeName': 'student_id', 'AttributeType': 'S'},
            {'AttributeName': 'enrolled_date', 'AttributeType': 'S'}
        ],
        BillingMode='PAY_PER_REQUEST'  # 按需付费
    )
    table.wait_until_exists()
    print(f"表 {table.table_name} 已创建, ARN: {table.table_arn}")
    return table

# === 2. 插入数据 ===
def insert_student(table):
    items = [
        {
            'student_id': 'S2024001',
            'enrolled_date': '2024-09-01',
            'name': '张三',
            'age': 21,
            'major': '计算机科学',
            'gpa': Decimal('3.8'),
            'courses': {'数据库原理', '算法导论'}
        },
        {
            'student_id': 'S2024001',
            'enrolled_date': '2024-09-15',
            'name': '张三',
            'age': 21,
            'major': '计算机科学',
            'gpa': Decimal('3.9'),
            'courses': {'操作系统', '计算机网络'}
        },
        {
            'student_id': 'S2024002',
            'enrolled_date': '2024-09-01',
            'name': '李四',
            'age': 22,
            'major': '数学',
            'gpa': Decimal('3.5'),
            'courses': {'高等代数'}
        }
    ]
    
    for item in items:
        table.put_item(Item=item)
    print(f"插入 {len(items)} 条记录")

# === 3. 查询 ===
def query_students(table, student_id):
    """按分区键查询某学生的所有记录"""
    response = table.query(
        KeyConditionExpression=boto3.dynamodb.conditions.Key('student_id').eq(student_id)
    )
    print(f"\n查询 {student_id} 的记录:")
    for item in response['Items']:
        print(f"  日期: {item['enrolled_date']}, GPA: {item['gpa']}")

# === 4. 获取单条记录 ===
def get_item(table, student_id, enrolled_date):
    response = table.get_item(
        Key={
            'student_id': student_id,
            'enrolled_date': enrolled_date
        }
    )
    if 'Item' in response:
        item = response['Item']
        print(f"\n获取到: {item['name']}, GPA: {item['gpa']}")
    else:
        print("未找到记录")

# === 5. 更新记录 ===
def update_gpa(table, student_id, enrolled_date, new_gpa):
    response = table.update_item(
        Key={
            'student_id': student_id,
            'enrolled_date': enrolled_date
        },
        UpdateExpression='SET gpa = :gpa, #ts = :ts',
        ExpressionAttributeValues={
            ':gpa': Decimal(str(new_gpa)),
            ':ts': datetime.now().isoformat()
        },
        ExpressionAttributeNames={
            '#ts': 'updated_at'  # 新增字段
        },
        ReturnValues='UPDATED_NEW'
    )
    print(f"\n更新后: {response['Attributes']}")

# === 6. 使用 GSI(全局二级索引)按专业查询 ===
def create_gsi(table):
    """添加全局二级索引:按 major 查询"""
    table.update(
        AttributeDefinitions=[
            {'AttributeName': 'major', 'AttributeType': 'S'}
        ],
        GlobalSecondaryIndexUpdates=[
            {
                'Create': {
                    'IndexName': 'MajorIndex',
                    'KeySchema': [
                        {'AttributeName': 'major', 'KeyType': 'HASH'},
                        {'AttributeName': 'gpa', 'KeyType': 'RANGE'}
                    ],
                    'Projection': {'ProjectionType': 'ALL'}
                }
            }
        ]
    )
    print("GSI MajorIndex 创建中(约 1 分钟)...")

# === 主流程 ===
if __name__ == '__main__':
    try:
        table = create_table()
    except Exception:
        table = dynamodb.Table('Students')
        print("表已存在")

    insert_student(table)
    query_students(table, 'S2024001')
    get_item(table, 'S2024001', '2024-09-01')
    update_gpa(table, 'S2024002', '2024-09-01', 3.7)

预期输出

表 Students 已创建
插入 3 条记录

查询 S2024001 的记录:
  日期: 2024-09-01, GPA: 3.8
  日期: 2024-09-15, GPA: 3.9

获取到: 张三, GPA: 3.8

更新后: {'gpa': Decimal('3.7'), 'updated_at': '2024-...'}

关键点

  • DynamoDB 的 PRIMARY KEY = Partition Key (HASH) + 可选的 Sort Key (RANGE)
  • query() 需要完整 Partition Key,scan() 全表扫描(较慢)
  • GSI 允许按非主键字段查询,但最终一致
  • Decimal 类型用于精确数值(boto3 要求)
  • 集合类型(如 courses)支持 {'数据库原理', '算法导论'}

教程

Amazon DynamoDB 从零到实战:Serverless 后端

1. 背景与概念

1.1 DynamoDB 设计哲学

DynamoDB 继承自 Amazon 的 Dynamo 论文,核心理念:

  • 永远可写(Always Writable):无主从切换
  • 最终一致:默认最终一致,可选强一致
  • 单表设计:不像 SQL 多表 JOIN,最佳实践是"一张表囊括所有实体"

1.2 关键概念

概念 说明
Partition Key 决定数据物理存储分区
Sort Key 分区内排序,支持范围查询
GSI 全局二级索引(可自定义分区/排序键)
LSI 本地二级索引(同分区键,不同排序键)
RCU/WCU 读/写容量单位(1 RCU = 4KB 强一致读/秒)
DAX DynamoDB 缓存加速层(微秒级)
Streams 变更数据捕获(触发 Lambda)

2. 分步实战:构建 Serverless 任务管理 API

场景

用 DynamoDB + AWS Lambda + API Gateway 构建无服务器任务管理 API。支持按用户查询、按状态过滤、分页。

步骤一:单表设计

# 表设计: Tasks
# PK (Partition Key): USER#<user_id>
# SK (Sort Key):     TASK#<task_id>
# 属性: title, status, priority, created_at, due_date

# 一条典型记录:
{
  "PK": "USER#alice",
  "SK": "TASK#001",
  "title": "完成实验报告",
  "status": "IN_PROGRESS",
  "priority": "HIGH",
  "created_at": "2024-09-01T10:00:00Z",
  "due_date": "2024-09-15T00:00:00Z"
}

步骤二:DynamoDB 操作封装

import boto3
import uuid
from datetime import datetime

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('Tasks')

class TaskRepository:
    @staticmethod
    def create(user_id, title, due_date=None):
        task = {
            'PK': f'USER#{user_id}',
            'SK': f'TASK#{uuid.uuid4().hex[:8]}',
            'title': title,
            'status': 'TODO',
            'priority': 'MEDIUM',
            'created_at': datetime.utcnow().isoformat(),
            'due_date': due_date
        }
        table.put_item(Item=task)
        return task

    @staticmethod
    def list_by_user(user_id, status=None, limit=20):
        expr = 'PK = :pk'
        values = {':pk': f'USER#{user_id}'}

        if status:
            expr += ' AND #st = :status'
            values[':status'] = status

        response = table.query(
            KeyConditionExpression=expr,
            ExpressionAttributeNames={'#st': 'status'} if status else {},
            ExpressionAttributeValues=values,
            Limit=limit,
            ScanIndexForward=False  # 最新的在前
        )
        return response['Items']

    @staticmethod
    def update_status(user_id, task_id, new_status):
        response = table.update_item(
            Key={'PK': f'USER#{user_id}', 'SK': f'TASK#{task_id}'},
            UpdateExpression='SET #st = :status, updated_at = :ts',
            ExpressionAttributeNames={'#st': 'status'},
            ExpressionAttributeValues={
                ':status': new_status,
                ':ts': datetime.utcnow().isoformat()
            },
            ReturnValues='ALL_NEW'
        )
        return response['Attributes']

    @staticmethod
    def delete(user_id, task_id):
        table.delete_item(
            Key={'PK': f'USER#{user_id}', 'SK': f'TASK#{task_id}'}
        )

步骤三:GSI 实现按状态全局查询

# 创建 GSI(在表创建时或通过 update_table)
# GSI: StatusIndex
#   PK: status (字符串)
#   SK: created_at (时间排序)

# 查询所有用户的 IN_PROGRESS 任务
def list_all_in_progress():
    response = table.query(
        IndexName='StatusIndex',
        KeyConditionExpression='#st = :status',
        ExpressionAttributeNames={'#st': 'status'},
        ExpressionAttributeValues={':status': 'IN_PROGRESS'}
    )
    return response['Items']

步骤四:集成 Lambda

# lambda_function.py
import json
from task_repository import TaskRepository

def lambda_handler(event, context):
    http_method = event['httpMethod']
    path = event['path']
    user_id = event['requestContext']['authorizer']['claims']['sub']

    if http_method == 'GET' and path == '/tasks':
        tasks = TaskRepository.list_by_user(user_id)
        return {'statusCode': 200, 'body': json.dumps(tasks)}

    elif http_method == 'POST' and path == '/tasks':
        body = json.loads(event['body'])
        task = TaskRepository.create(user_id, body['title'])
        return {'statusCode': 201, 'body': json.dumps(task)}

    elif http_method == 'PATCH' and '/tasks/' in path:
        task_id = path.split('/')[-1]
        body = json.loads(event['body'])
        updated = TaskRepository.update_status(user_id, task_id, body['status'])
        return {'statusCode': 200, 'body': json.dumps(updated)}

    return {'statusCode': 404, 'body': 'Not Found'}

3. 思考题

  1. 为什么 DynamoDB 推荐"单表设计"?在多表设计下 GSI 能否跨表?
  2. RCU/WCU 计费模式下,一次 Scan 全表 1GB 数据消耗多少 RCU?
  3. DynamoDB Streams + Lambda 可以实现哪些模式?(CQRS、事件溯源、实时通知)

参考资料

  1. [1] Alex DeBrie. The DynamoDB Book. 2023.
  2. [2] Amazon Web Services. Amazon DynamoDB 开发者指南. 2024.
  3. [3] Rick Houlihan. DynamoDB Single-Table Design 最佳实践. 2020.