FastAPI Hello World —— 类型驱动的 API 开发
目标
创建一个最小的 FastAPI 应用,理解:
- FastAPI 的装饰器路由
- Pydantic 数据模型与类型验证
- 自动 OpenAPI 文档(Swagger / ReDoc)
- ASGI 异步支持
完整代码
# main.py
from fastapi import FastAPI, Path, Query, HTTPException
from pydantic import BaseModel, Field, EmailStr
from typing import Optional, List
from enum import Enum
from datetime import datetime
# 创建 FastAPI 实例
app = FastAPI(
title="我的第一个 FastAPI",
description="FastAPI Hello World 示例",
version="1.0.0",
docs_url="/docs", # Swagger UI
redoc_url="/redoc", # ReDoc
)
# ---------- Pydantic 数据模型 ----------
class UserRole(str, Enum):
ADMIN = "admin"
USER = "user"
GUEST = "guest"
class User(BaseModel):
"""用户创建请求模型"""
username: str = Field(..., min_length=3, max_length=50, description="用户名")
email: EmailStr = Field(..., description="邮箱地址")
age: int = Field(ge=0, le=150, description="年龄")
role: UserRole = Field(default=UserRole.USER, description="用户角色")
tags: List[str] = Field(default=[], description="标签列表")
class UserResponse(BaseModel):
"""用户响应模型"""
id: int
username: str
email: str
role: UserRole
created_at: datetime
model_config = {"from_attributes": True} # 支持 ORM 对象
# ---------- 路由 ----------
@app.get("/")
async def root():
"""根路由 —— Hello World"""
return {
"message": "🚀 Hello, FastAPI!",
"docs": "/docs",
"redoc": "/redoc",
}
@app.get("/hello/{name}")
async def hello(
name: str = Path(..., min_length=1, max_length=50, description="用户名"),
greeting: str = Query("Hello", description="问候语"),
):
"""路径参数 + 查询参数"""
return {"message": f"{greeting}, {name}!"}
@app.post("/users", response_model=UserResponse, status_code=201)
async def create_user(user: User):
"""创建用户 —— Pydantic 自动验证请求体"""
# 在实际应用中此处写数据库
if user.username == "admin":
raise HTTPException(status_code=400, detail="用户名 'admin' 已被保留")
return UserResponse(
id=42,
username=user.username,
email=user.email,
role=user.role,
created_at=datetime.now(),
)
@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(
user_id: int = Path(..., ge=1, description="用户 ID"),
):
"""获取用户详情"""
if user_id > 1000:
raise HTTPException(status_code=404, detail="用户不存在")
return UserResponse(
id=user_id,
username="demo_user",
email="demo@example.com",
role=UserRole.USER,
created_at=datetime.now(),
)
@app.get("/items/")
async def list_items(
skip: int = Query(0, ge=0),
limit: int = Query(10, ge=1, le=100),
q: Optional[str] = Query(None, min_length=3),
):
"""分页查询 + 搜索"""
items = [{"id": i, "name": f"Item {i}"} for i in range(1, 31)]
if q:
items = [i for i in items if q.lower() in i["name"].lower()]
return {"items": items[skip : skip + limit], "total": len(items)}
# ---------- 自定义异常处理器 ----------
@app.exception_handler(HTTPException)
async def http_exception_handler(request, exc):
from fastapi.responses import JSONResponse
return JSONResponse(
status_code=exc.status_code,
content={
"error": exc.detail,
"path": str(request.url.path),
},
)
运行步骤
uvicorn main:app --reload
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4
访问
测试 API
curl -X POST http:
-H "Content-Type: application/json" \
-d '{"username":"vibe","email":"vibe@test.com","age":25,"role":"admin","tags":["python","api"]}'
curl http:
curl "http://127.0.0.1:8000/items/?skip=0&limit=5&q=item"
curl -X POST http:
-H "Content-Type: application/json" \
-d '{"username":"abc","email":"bad","age":200,"role":"god"}'
关键要点
| 概念 |
说明 |
Body(...) / Path(...) / Query(...) |
参数来源标注 + 验证规则 |
... (Ellipsis) |
表示字段必填 |
Field(ge=0, le=150) |
数值范围约束 |
response_model |
响应模型,自动过滤多余字段、转换类型 |
Enum |
枚举类型,Swagger 自动显示下拉选择 |
HTTPException |
标准 HTTP 异常,返回结构化错误 |
async def |
异步视图,支持非阻塞 IO(数据库、HTTP 调用) |
FastAPI 依赖注入与 JWT 认证
目标
- 掌握 FastAPI 依赖注入系统(
Depends)
- 实现 JWT (OAuth2 Password Flow) 认证
- 实现基于角色的权限控制
- 理解子依赖(依赖的依赖)
完整代码
# main.py
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from typing import Optional, List
from datetime import datetime, timedelta, timezone
from passlib.context import CryptContext
from jose import JWTError, jwt
import os
# ---------- 配置 ----------
SECRET_KEY = os.getenv("SECRET_KEY", "dev-secret-change-in-production-12345")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# ---------- 密码哈希 ----------
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# ---------- OAuth2 方案 ----------
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
# ---------- 模拟用户数据库 ----------
fake_users_db = {
"alice": {
"username": "alice",
"full_name": "Alice Wang",
"email": "alice@example.com",
"hashed_password": pwd_context.hash("secret123"),
"role": "admin",
"disabled": False,
},
"bob": {
"username": "bob",
"full_name": "Bob Li",
"email": "bob@example.com",
"hashed_password": pwd_context.hash("secret456"),
"role": "user",
"disabled": False,
},
}
# ---------- Pydantic 模型 ----------
class Token(BaseModel):
access_token: str
token_type: str = "bearer"
class TokenData(BaseModel):
username: Optional[str] = None
class User(BaseModel):
username: str
email: str
full_name: str
role: str
disabled: bool
class UserOut(BaseModel):
username: str
email: str
full_name: str
role: str
# ---------- 工具函数 ----------
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_user(username: str) -> Optional[dict]:
return fake_users_db.get(username)
def authenticate_user(username: str, password: str):
user = get_user(username)
if not user or not verify_password(password, user["hashed_password"]):
return None
return user
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=15))
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
# ---------- 依赖项(核心:依赖注入链)----------
async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
"""从 JWT Token 解析当前用户 —— 核心认证依赖"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="无法验证凭据",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = get_user(token_data.username)
if user is None:
raise credentials_exception
return User(**user)
async def get_current_active_user(current_user: User = Depends(get_current_user)):
"""依赖 get_current_user —— 检查用户是否启用"""
if current_user.disabled:
raise HTTPException(status_code=400, detail="用户已被禁用")
return current_user
def require_role(required_role: str):
"""依赖工厂 —— 生成角色检查依赖"""
async def role_checker(current_user: User = Depends(get_current_active_user)):
if current_user.role != required_role and current_user.role != "admin":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"需要 {required_role} 或 admin 权限",
)
return current_user
return role_checker
# ---------- FastAPI 应用 ----------
app = FastAPI(title="FastAPI 认证示例", version="2.0.0")
# ---------- 路由 ----------
@app.post("/auth/login", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
"""OAuth2 密码流登录 —— 获取 JWT Token"""
user = authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="用户名或密码错误",
headers={"WWW-Authenticate": "Bearer"},
)
access_token = create_access_token(
data={"sub": user["username"], "role": user["role"]},
expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
)
return Token(access_token=access_token)
@app.get("/users/me", response_model=UserOut)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
"""获取当前用户信息(需认证)"""
return current_user
@app.get("/admin/dashboard")
async def admin_dashboard(
current_user: User = Depends(require_role("admin")),
):
"""管理员专属 —— 需要 admin 角色"""
return {
"message": f"欢迎管理员 {current_user.full_name}",
"users": list(fake_users_db.keys()),
}
@app.get("/public/health")
async def health_check():
"""公开接口 —— 无需认证"""
return {"status": "healthy", "users_count": len(fake_users_db)}
测试
curl -X POST http:
-H "Content-Type: application/x-www-form-urlencoded" \
-d "username=alice&password=secret123"
TOKEN="<从上面获取的 access_token>"
curl http:
-H "Authorization: Bearer $TOKEN"
curl http:
-H "Authorization: Bearer $TOKEN"
依赖注入原理图
请求到达
│
▼
Depends(oauth2_scheme) ← 从 Header 提取 Bearer Token
│
▼
Depends(get_current_user) ← 解码 JWT → 查询用户
│ │
│ └──→ 可能被 get_current_active_user 依赖
│ 可能被 require_role("admin") 依赖
│
▼
路由处理函数 ← 获得已验证的用户对象
关键要点
| 概念 |
说明 |
Depends() |
依赖注入的核心,可嵌套形成依赖链 |
OAuth2PasswordBearer |
标准 OAuth2 密码流,Swagger 自动显示 🔓 按钮 |
| 依赖工厂 |
返回 callable 的函数,用于参数化依赖(如 require_role) |
OAuth2PasswordRequestForm |
标准登录表单(username + password + scope) |
| 子依赖 |
get_current_active_user → get_current_user → oauth2_scheme |
python-jose |
JWT 编码/解码库 |
passlib[bcrypt] |
密码哈希,bcrypt 算法(自动加盐) |