FastAPI 依赖注入与 JWT 认证
目标
- 掌握 FastAPI 依赖注入系统(
Depends)
- 实现 JWT (OAuth2 Password Flow) 认证
- 实现基于角色的权限控制
- 理解子依赖(依赖的依赖)
完整代码
# main.py
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from typing import Optional, List
from datetime import datetime, timedelta, timezone
from passlib.context import CryptContext
from jose import JWTError, jwt
import os
# ---------- 配置 ----------
SECRET_KEY = os.getenv("SECRET_KEY", "dev-secret-change-in-production-12345")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# ---------- 密码哈希 ----------
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# ---------- OAuth2 方案 ----------
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
# ---------- 模拟用户数据库 ----------
fake_users_db = {
"alice": {
"username": "alice",
"full_name": "Alice Wang",
"email": "alice@example.com",
"hashed_password": pwd_context.hash("secret123"),
"role": "admin",
"disabled": False,
},
"bob": {
"username": "bob",
"full_name": "Bob Li",
"email": "bob@example.com",
"hashed_password": pwd_context.hash("secret456"),
"role": "user",
"disabled": False,
},
}
# ---------- Pydantic 模型 ----------
class Token(BaseModel):
access_token: str
token_type: str = "bearer"
class TokenData(BaseModel):
username: Optional[str] = None
class User(BaseModel):
username: str
email: str
full_name: str
role: str
disabled: bool
class UserOut(BaseModel):
username: str
email: str
full_name: str
role: str
# ---------- 工具函数 ----------
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_user(username: str) -> Optional[dict]:
return fake_users_db.get(username)
def authenticate_user(username: str, password: str):
user = get_user(username)
if not user or not verify_password(password, user["hashed_password"]):
return None
return user
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=15))
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
# ---------- 依赖项(核心:依赖注入链)----------
async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
"""从 JWT Token 解析当前用户 —— 核心认证依赖"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="无法验证凭据",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = get_user(token_data.username)
if user is None:
raise credentials_exception
return User(**user)
async def get_current_active_user(current_user: User = Depends(get_current_user)):
"""依赖 get_current_user —— 检查用户是否启用"""
if current_user.disabled:
raise HTTPException(status_code=400, detail="用户已被禁用")
return current_user
def require_role(required_role: str):
"""依赖工厂 —— 生成角色检查依赖"""
async def role_checker(current_user: User = Depends(get_current_active_user)):
if current_user.role != required_role and current_user.role != "admin":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"需要 {required_role} 或 admin 权限",
)
return current_user
return role_checker
# ---------- FastAPI 应用 ----------
app = FastAPI(title="FastAPI 认证示例", version="2.0.0")
# ---------- 路由 ----------
@app.post("/auth/login", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
"""OAuth2 密码流登录 —— 获取 JWT Token"""
user = authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="用户名或密码错误",
headers={"WWW-Authenticate": "Bearer"},
)
access_token = create_access_token(
data={"sub": user["username"], "role": user["role"]},
expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
)
return Token(access_token=access_token)
@app.get("/users/me", response_model=UserOut)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
"""获取当前用户信息(需认证)"""
return current_user
@app.get("/admin/dashboard")
async def admin_dashboard(
current_user: User = Depends(require_role("admin")),
):
"""管理员专属 —— 需要 admin 角色"""
return {
"message": f"欢迎管理员 {current_user.full_name}",
"users": list(fake_users_db.keys()),
}
@app.get("/public/health")
async def health_check():
"""公开接口 —— 无需认证"""
return {"status": "healthy", "users_count": len(fake_users_db)}
测试
curl -X POST http:
-H "Content-Type: application/x-www-form-urlencoded" \
-d "username=alice&password=secret123"
TOKEN="<从上面获取的 access_token>"
curl http:
-H "Authorization: Bearer $TOKEN"
curl http:
-H "Authorization: Bearer $TOKEN"
依赖注入原理图
请求到达
│
▼
Depends(oauth2_scheme) ← 从 Header 提取 Bearer Token
│
▼
Depends(get_current_user) ← 解码 JWT → 查询用户
│ │
│ └──→ 可能被 get_current_active_user 依赖
│ 可能被 require_role("admin") 依赖
│
▼
路由处理函数 ← 获得已验证的用户对象
关键要点
| 概念 |
说明 |
Depends() |
依赖注入的核心,可嵌套形成依赖链 |
OAuth2PasswordBearer |
标准 OAuth2 密码流,Swagger 自动显示 🔓 按钮 |
| 依赖工厂 |
返回 callable 的函数,用于参数化依赖(如 require_role) |
OAuth2PasswordRequestForm |
标准登录表单(username + password + scope) |
| 子依赖 |
get_current_active_user → get_current_user → oauth2_scheme |
python-jose |
JWT 编码/解码库 |
passlib[bcrypt] |
密码哈希,bcrypt 算法(自动加盐) |