SQLAlchemy

技术栈
工具链
sqlalchemypythonORM数据库SQL会话管理

概览

SQLAlchemy

SQLAlchemy 是 Python 最强大的数据库工具包和 ORM 框架,由 Mike Bayer 于 2006 年创建。它提供 Core(底层 SQL 表达式)和 ORM(面向对象映射)两层 API。

解决什么问题

  • 用 Python 对象代替原生 SQL 操作数据库
  • 提供统一的数据库抽象层,无缝切换 MySQL / PostgreSQL / SQLite
  • 通过 Session 管理事务和对象生命周期

关键特性

  • 两层架构:Core(SQL Expression Language)+ ORM(声明式映射)
  • 支持 MySQL、PostgreSQL、SQLite、Oracle、SQL Server 等主流数据库
  • Alembic 数据库迁移工具
  • 强大的 relationship 定义,支持 eager loading、lazy loading
  • 原生 SQL 与 ORM 混合使用,灵活度极高

安装

环境准备

  • 操作系统: Windows / macOS / Linux 均可
  • Python 版本: 3.7 及以上(推荐 3.10+)
  • 数据库驱动: 根据目标数据库安装对应的 DBAPI 驱动

安装命令

# 基础安装
pip install sqlalchemy

# 加 Alembic 数据库迁移工具
pip install sqlalchemy alembic

# 按数据库装驱动
pip install psycopg2-binary    # PostgreSQL
pip install pymysql            # MySQL / MariaDB
pip install aioodbc            # SQL Server

# 验证安装
python -c "import sqlalchemy; print(sqlalchemy.__version__)"

常用组合安装

# 全栈 Web + 数据库
pip install sqlalchemy alembic psycopg2-binary

常见安装问题

Q: psycopg2 安装失败(缺少 pg_config)

# macOS
brew install postgresql

# Ubuntu/Debian
sudo apt install libpq-dev python3-dev

# 或直接用二进制包
pip install psycopg2-binary

Q: MySQL 连接报 cryptography 错误

pip install cryptography pymysql
# 连接串使用
# mysql+pymysql://user:pass@localhost/db

Q: 导入 SQLAlchemy 时绿色线程警告

pip install greenlet

Q: Alembic 初始化找不到 SQLAlchemy 模型
确保 alembic env.py 中正确导入了 Base metadata:

from myapp.models import Base
target_metadata = Base.metadata

示例

SQLAlchemy Hello World:声明式 ORM 入门

目标

使用 SQLAlchemy 2.0 声明式模型创建表、插入数据、查询数据,展示 Core 和 ORM 两种模式。

完整代码

from sqlalchemy import create_engine, select, func
from sqlalchemy.orm import Session, DeclarativeBase, Mapped, mapped_column, relationship
from sqlalchemy import String, Integer, ForeignKey, DateTime
from datetime import datetime


# === 1. 连接数据库 ===
engine = create_engine("sqlite:///demo.db", echo=True)


# === 2. 声明式模型 ===
class Base(DeclarativeBase):
    pass


class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(50))
    email: Mapped[str] = mapped_column(String(100), unique=True)
    created_at: Mapped[datetime] = mapped_column(default=datetime.now)

    posts: Mapped[list["Post"]] = relationship(back_populates="author")

    def __repr__(self):
        return f"<User(id={self.id}, name='{self.name}')>"


class Post(Base):
    __tablename__ = "posts"

    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(100))
    content: Mapped[str] = mapped_column(String(500))
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"))

    author: Mapped["User"] = relationship(back_populates="posts")


# === 3. 建表 ===
Base.metadata.create_all(engine)


# === 4. CRUD 操作 ===
with Session(engine) as session:
    # CREATE
    alice = User(name="Alice", email="alice@example.com")
    bob = User(name="Bob", email="bob@example.com")
    session.add_all([alice, bob])
    session.flush()  # 获取 id

    session.add_all([
        Post(title="Python Tips", content="Use list comprehensions!", user_id=alice.id),
        Post(title="SQL Guide", content="Always use indexes.", user_id=alice.id),
        Post(title="Flask vs FastAPI", content="Depends...", user_id=bob.id),
    ])
    session.commit()

    # READ — 查询所有用户
    users = session.execute(select(User)).scalars().all()
    print("\n所有用户:", users)

    # READ — 带条件的查询
    alice = session.execute(
        select(User).where(User.name == "Alice")
    ).scalar_one()
    print(f"\n{alice.name} 的文章:")
    for post in alice.posts:
        print(f"  - {post.title}")

    # AGGREGATE
    from sqlalchemy import func
    count_stmt = select(User.name, func.count(Post.id)).join(Post).group_by(User.name)
    for name, cnt in session.execute(count_stmt):
        print(f"  {name}: {cnt} 篇文章")

    # UPDATE
    session.execute(select(Post)).scalars().first()
    bob_post = session.execute(
        select(Post).where(Post.author.has(name="Bob"))
    ).scalar_one()
    bob_post.title = "Flask vs FastAPI (Updated)"
    session.commit()

    # DELETE
    session.delete(bob)
    session.commit()
    remaining = session.execute(select(func.count(User.id))).scalar()
    print(f"\n剩余用户数: {remaining}")

运行步骤

pip install sqlalchemy
python hello_sqlalchemy.py

预期输出

所有用户: [<User(id=1, name='Alice')>, <User(id=2, name='Bob')>]

Alice 的文章:
  - Python Tips
  - SQL Guide
  Alice: 2 篇文章
  Bob: 1 篇文章

剩余用户数: 1

教程

SQLAlchemy 数据库实战指南

背景

数据库是大多数应用的持久化层。SQLAlchemy 让你既能在高级抽象上操作对象,又能随时下沉到原生 SQL 级别——它不是把你束缚在 ORM 里,而是给你一个从 Python 到 SQL 的完整工具箱。


第 1 章:Core vs ORM

# Core 层 — 直接 SQL 表达式
from sqlalchemy import text, select

with engine.connect() as conn:
    # 原生 SQL
    rows = conn.execute(text("SELECT * FROM users WHERE active = :active"), {"active": True})

    # SQL 表达式
    stmt = select(User).where(User.email.like("%@example.com"))
    result = conn.execute(stmt)


# ORM 层 — 对象操作
with Session(engine) as session:
    user = session.get(User, 1)
    user.name = "New Name"
    session.commit()  # 自动生成 UPDATE

第 2 章:关系加载策略

from sqlalchemy.orm import joinedload, selectinload, lazyload

# N+1 问题:默认 lazy loading,每次访问 post.author 发一条 SQL
# 解决方案 1:joinedload(JOIN 一次查出)
users = session.execute(
    select(User).options(joinedload(User.posts))
).unique().scalars().all()

# 解决方案 2:selectinload(IN 子查询,适合大数据量)
users = session.execute(
    select(User).options(selectinload(User.posts))
).unique().scalars().all()

第 3 章:事务与隔离级别

from sqlalchemy import select, update

# 显式事务
with Session(engine) as session:
    with session.begin():
        user = session.get(User, 1)
        user.balance -= 100
        session.add(
            TransactionLog(user_id=1, amount=-100, type="withdraw")
        )
    # 自动 commit,出错自动 rollback


# 悲观锁(SELECT ... FOR UPDATE)
stmt = select(User).where(User.id == 1).with_for_update()
user = session.execute(stmt).scalar_one()

第 4 章:Alembic 数据库迁移

# 初始化
alembic init alembic

# 生成迁移脚本
alembic revision --autogenerate -m "add users table"

# 应用迁移
alembic upgrade head

# 回滚
alembic downgrade -1

# 查看历史
alembic history

第 5 章:与 FastAPI 集成

from fastapi import FastAPI, Depends
from sqlalchemy.orm import Session

app = FastAPI()

def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

@app.get("/users/{user_id}")
def read_user(user_id: int, db: Session = Depends(get_db)):
    user = db.get(User, user_id)
    if not user:
        raise HTTPException(status_code=404)
    return {"id": user.id, "name": user.name}

思考题

  1. joinedloadselectinload 各自的性能特征是什么?在什么情况下该选哪个?
  2. SQLAlchemy Session 的 expire_on_commit 属性有什么作用?
  3. AsyncSession + asyncpg 模式下,relationship 的 lazy loading 还能工作吗?