02-dependency-injection-auth

知识库
知识库文档
/tech-stacks/fastapi/examples/02-dependency-injection-auth.md

文档

FastAPI 依赖注入与 JWT 认证

目标

  • 掌握 FastAPI 依赖注入系统(Depends)
  • 实现 JWT (OAuth2 Password Flow) 认证
  • 实现基于角色的权限控制
  • 理解子依赖(依赖的依赖)

完整代码

# main.py
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from typing import Optional, List
from datetime import datetime, timedelta, timezone
from passlib.context import CryptContext
from jose import JWTError, jwt
import os

# ---------- Configuration ----------
# Key used to sign and verify JWTs, read from the SECRET_KEY environment
# variable.
# NOTE(review): the fallback is a hard-coded development value. Anyone who can
# read this file can forge tokens unless SECRET_KEY is set in the environment.
SECRET_KEY = os.getenv("SECRET_KEY", "dev-secret-change-in-production-12345")
# Signing algorithm handed to jwt.encode and (as the only accepted one) to
# jwt.decode.
ALGORITHM = "HS256"
# Lifetime, in minutes, of the access tokens issued by the login route.
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# ---------- Password hashing ----------
# Shared passlib context; every hash/verify call in this module goes through it.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# ---------- OAuth2 scheme ----------
# Dependency that supplies the bearer token string to get_current_user;
# tokenUrl points at the login route defined below.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")

# ---------- Mock user database ----------
# In-memory stand-in for a real user table, keyed by username. The passwords
# are hashed once, when the module is imported.
fake_users_db = {
    record["username"]: record
    for record in (
        dict(
            username="alice",
            full_name="Alice Wang",
            email="alice@example.com",
            hashed_password=pwd_context.hash("secret123"),
            role="admin",
            disabled=False,
        ),
        dict(
            username="bob",
            full_name="Bob Li",
            email="bob@example.com",
            hashed_password=pwd_context.hash("secret456"),
            role="user",
            disabled=False,
        ),
    )
}

# ---------- Pydantic models ----------
class Token(BaseModel):
    # Response body of the login route (used there as response_model).
    access_token: str  # the encoded JWT returned by create_access_token
    token_type: str = "bearer"  # login never overrides this default


class TokenData(BaseModel):
    # Carrier for the identity extracted from a decoded token; get_current_user
    # fills `username` from the JWT "sub" claim.
    username: Optional[str] = None


class User(BaseModel):
    # Internal representation of an account; get_current_user builds it from a
    # fake_users_db entry via User(**user).
    # NOTE(review): those entries also carry "hashed_password", which has no
    # field here; presumably it is dropped by pydantic's default handling of
    # extra keys. Confirm against the pydantic version in use.
    username: str
    email: str
    full_name: str
    role: str  # compared against the required role in require_role
    disabled: bool  # True makes get_current_active_user reject the request


class UserOut(BaseModel):
    # Public view of a user, used as response_model for GET /users/me.
    # Has the same fields as User except `disabled`.
    username: str
    email: str
    full_name: str
    role: str


# ---------- Helper functions ----------
def verify_password(plain_password, hashed_password):
    """Check a plain-text password against a stored hash.

    Delegates to the module-level ``pwd_context`` and returns whatever its
    ``verify`` call reports.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match


def get_user(username: str) -> Optional[dict]:
    """Return the ``fake_users_db`` record for *username*, or None if absent."""
    try:
        return fake_users_db[username]
    except KeyError:
        return None


def authenticate_user(username: str, password: str) -> Optional[dict]:
    """Validate a username/password pair against the user store.

    Args:
        username: Login name to look up via ``get_user``.
        password: Plain-text password supplied by the client.

    Returns:
        The stored user record (a dict) when the credentials match,
        otherwise None.
    """
    user = get_user(username)
    if not user:
        # Spend roughly the same time as a real hash check so that response
        # timing does not reveal whether the username exists.
        pwd_context.dummy_verify()
        return None
    if not verify_password(password, user["hashed_password"]):
        return None
    return user


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Encode *data* as a signed JWT with an expiry claim.

    The caller's dict is not modified: the claims are copied and an ``exp``
    entry is added, set to the current UTC time plus *expires_delta*. When
    *expires_delta* is missing or falsy, a 15-minute lifetime is used. The
    token is signed with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    lifetime = expires_delta if expires_delta else timedelta(minutes=15)
    claims = dict(data)
    claims["exp"] = datetime.now(timezone.utc) + lifetime
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)


# ---------- Dependencies (core: the dependency-injection chain) ----------

async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
    """Resolve the current user from a JWT bearer token (core auth dependency).

    Args:
        token: Bearer token string supplied by ``oauth2_scheme``.

    Returns:
        A ``User`` built from the matching ``fake_users_db`` record.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when the
            token cannot be decoded, has no usable ``sub`` claim, or names a
            user that is not in the store.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="无法验证凭据",
        headers={"WWW-Authenticate": "Bearer"},
    )
    # Only the decode call can raise JWTError, so keep the try body to it.
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError as exc:
        # Chain the original error so tracebacks keep the root cause.
        raise credentials_exception from exc

    username = payload.get("sub")
    # Reject a missing or non-string subject here, so a malformed claim yields
    # a 401 rather than failing later during model validation.
    if not isinstance(username, str):
        raise credentials_exception
    token_data = TokenData(username=username)

    user = get_user(token_data.username)
    if user is None:
        raise credentials_exception
    return User(**user)


async def get_current_active_user(current_user: User = Depends(get_current_user)):
    """Sub-dependency on ``get_current_user`` that rejects disabled accounts.

    Returns the authenticated user unchanged when the account is enabled;
    raises a 400 HTTPException otherwise.
    """
    if not current_user.disabled:
        return current_user
    raise HTTPException(status_code=400, detail="用户已被禁用")


def require_role(required_role: str):
    """Dependency factory: build a role-checking dependency.

    The returned coroutine depends on ``get_current_active_user`` and lets the
    request through when the user's role equals *required_role* or is
    ``"admin"``; any other role gets a 403 HTTPException.
    """
    async def role_checker(current_user: User = Depends(get_current_active_user)):
        # "admin" is accepted whatever role was requested.
        if current_user.role in (required_role, "admin"):
            return current_user
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"需要 {required_role} 或 admin 权限",
        )
    return role_checker


# ---------- FastAPI application ----------
# Application instance that the route decorators below register against.
app = FastAPI(title="FastAPI 认证示例", version="2.0.0")


# ---------- Routes ----------

@app.post("/auth/login", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    """OAuth2 password-flow login: exchange credentials for a JWT.

    Responds with 401 when the username/password pair is rejected.
    """
    account = authenticate_user(form_data.username, form_data.password)
    if account:
        # The token carries the username as "sub" plus the user's role.
        claims = {"sub": account["username"], "role": account["role"]}
        lifetime = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
        signed = create_access_token(data=claims, expires_delta=lifetime)
        return Token(access_token=signed)
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="用户名或密码错误",
        headers={"WWW-Authenticate": "Bearer"},
    )


@app.get("/users/me", response_model=UserOut)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    """Return the current user's profile (authentication required)."""
    # The User instance is returned as-is; response_model=UserOut declares the
    # shape of the response, and UserOut has no `disabled` field.
    return current_user


@app.get("/admin/dashboard")
async def admin_dashboard(
    current_user: User = Depends(require_role("admin")),
):
    """Admin-only endpoint: requires the admin role.

    Returns a greeting for the caller plus every username in the store.
    """
    greeting = f"欢迎管理员 {current_user.full_name}"
    usernames = list(fake_users_db)
    return {"message": greeting, "users": usernames}


@app.get("/public/health")
async def health_check():
    """Public endpoint: no authentication required.

    Reports a fixed status string and the number of users in the store.
    """
    user_total = len(fake_users_db)
    return dict(status="healthy", users_count=user_total)

测试

# 1. Obtain a token (alice / secret123)
curl -X POST http://127.0.0.1:8000/auth/login \
  -H "Content-Type: application/x-www-form-urlencoded" \
  -d "username=alice&password=secret123"

# 2. Call a protected endpoint with the token
#    (replace the placeholder with the access_token value from step 1)
TOKEN="<从上面获取的 access_token>"
curl http://127.0.0.1:8000/users/me \
  -H "Authorization: Bearer $TOKEN"

# 3. Call the admin-only endpoint
curl http://127.0.0.1:8000/admin/dashboard \
  -H "Authorization: Bearer $TOKEN"

# 4. Check that Bob (a regular user) cannot reach the admin endpoint
# First obtain bob's token: repeat step 1 with username=bob and
# password=secret456, then repeat step 3 with that token. Bob's role is
# "user", so require_role("admin") answers 403.

依赖注入原理图

请求到达
    │
    ▼
Depends(oauth2_scheme)     ← 从 Header 提取 Bearer Token
    │
    ▼
Depends(get_current_user)   ← 解码 JWT → 查询用户
    │         │
    │         └──→ 可能被 get_current_active_user 依赖
    │             可能被 require_role("admin") 依赖
    │
    ▼
路由处理函数                 ← 获得已验证的用户对象

关键要点

概念 说明
Depends() 依赖注入的核心,可嵌套形成依赖链
OAuth2PasswordBearer 标准 OAuth2 密码流,Swagger 自动显示 🔓 按钮
依赖工厂 返回 callable 的函数,用于参数化依赖(如 require_role)
OAuth2PasswordRequestForm 标准登录表单(username + password + scope)
子依赖 get_current_active_user → get_current_user → oauth2_scheme
python-jose JWT 编码/解码库
passlib[bcrypt] 密码哈希,bcrypt 算法(自动加盐)

信息

路径
/tech-stacks/fastapi/examples/02-dependency-injection-auth.md
更新时间
2026/5/30