PyTorch Tensor 基础 —— 从 NumPy 到 GPU
目标
- 理解 Tensor 的创建、操作、形状变换
- 掌握 CPU ↔ GPU 迁移
- 理解 Tensor 与 NumPy ndarray 的互操作
- 掌握 autograd 自动微分基础
完整代码
import torch
import numpy as np
# ============================================================
# 1. Tensor creation
# ============================================================
# Build a tensor from a Python list.
a = torch.tensor([1, 2, 3, 4])
print(f"从列表创建: {a}")
# Build a tensor from a NumPy array; from_numpy shares memory with the array,
# so the in-place write to np_arr below is visible through b.
np_arr = np.array([5, 6, 7, 8])
b = torch.from_numpy(np_arr)
np_arr[0] = 99
print(f"NumPy → Tensor (共享内存): {b}") # b[0] is now 99 as well
# Special-purpose constructors
zeros = torch.zeros(3, 4) # all zeros, 3×4
ones = torch.ones(2, 3) # all ones, 2×3
rand = torch.rand(3, 3) # uniform on [0, 1)
randn = torch.randn(3, 3) # standard normal distribution
arange = torch.arange(0, 10, 2) # like Python's range(0, 10, 2)
linspace = torch.linspace(0, 1, 5) # 5 evenly spaced points from 0 to 1
# Explicit dtype selection
int_tensor = torch.tensor([1, 2, 3], dtype=torch.int32)
float_tensor = torch.tensor([1, 2, 3], dtype=torch.float32)
print(f"\nint32: {int_tensor.dtype}, float32: {float_tensor.dtype}")
# ============================================================
# 2. Tensor attributes
# ============================================================
x = torch.randn(4, 3, 28, 28) # (batch, channel, height, width)
# Inspect the tensor's metadata: shape, rank, element count, dtype and device.
print(f"\n形状: {x.shape}")
print(f"维度数: {x.dim()}")
print(f"元素总数: {x.numel()}") # 4 * 3 * 28 * 28 elements
print(f"数据类型: {x.dtype}")
print(f"所在设备: {x.device}")
# ============================================================
# 3. Indexing and slicing
# ============================================================
t = torch.arange(1, 13).reshape(3, 4) # values 1..12 as 3 rows × 4 columns
print(f"\n原始张量:\n{t}")
print(f"第1行: {t[0]}") # first row (index 0)
print(f"第2列: {t[:, 1]}") # second column (index 1)
print(f"前两行后两列:\n{t[:2, -2:]}") # first two rows, last two columns
print(f"按条件筛选: {t[t > 5]}") # boolean mask selects elements greater than 5
# Fancy indexing with an index tensor
indices = torch.tensor([0, 2])
print(f"选取第0和第2行:\n{t[indices]}") # picks rows 0 and 2
# ============================================================
# 4. Shape operations
# ============================================================
a = torch.arange(8)
print(f"\n原始: {a}")
# reshape / view (view needs a compatible, e.g. contiguous, memory layout)
print(f"reshape 2×4:\n{a.reshape(2, 4)}")
# Labelled "view" because this line demonstrates Tensor.view, not reshape.
print(f"view 4×2:\n{a.view(4, 2)}")
# Transpose
m = torch.randn(3, 4)
print(f"转置:\n{m.T} 形状: {m.T.shape}")
# Insert a size-1 dimension
x = torch.tensor([1, 2, 3])
print(f"unsqueeze(0): {x.unsqueeze(0).shape}") # (1, 3)
print(f"unsqueeze(1): {x.unsqueeze(1).shape}") # (3, 1)
# Drop all size-1 dimensions
y = torch.randn(1, 3, 1, 5)
print(f"squeeze: {y.squeeze().shape}") # (3, 5)
# Concatenate along an existing dimension / stack along a new one
a = torch.randn(2, 3)
b = torch.randn(2, 3)
print(f"dim=0 拼接: {torch.cat([a, b], dim=0).shape}") # (4, 3)
print(f"dim=1 拼接: {torch.cat([a, b], dim=1).shape}") # (2, 6)
print(f"stack 新维度: {torch.stack([a, b], dim=0).shape}") # (2, 2, 3)
# ============================================================
# 5. Math operations
# ============================================================
a = torch.randn(3, 4)
b = torch.randn(3, 4)
# Element-wise arithmetic on two tensors of the same shape
add = a + b
mul = a * b # element-wise product (not matrix multiplication)
div = a / b
# Matrix multiplication: (3, 4) @ (4, 5) -> (3, 5), spelled three ways
c = torch.randn(4, 5)
matmul1 = a @ c # the @ operator (Python 3.5+)
matmul2 = torch.matmul(a, c)
matmul3 = torch.mm(a, c) # 2-D inputs only
# Reductions
print(f"\n求和: {a.sum()}, 均值: {a.mean()}, 最大值: {a.max()}")
print(f"按维度求和: {a.sum(dim=0).shape}") # (4,)
print(f"按维度求和保持维度: {a.sum(dim=0, keepdim=True).shape}") # (1, 4)
# ============================================================
# 6. Moving tensors to the GPU
# ============================================================
if torch.cuda.is_available():
    device = torch.device("cuda")
    # Use a dedicated 2-D float tensor here: the `x` left over from the
    # shape-operations section is a 1-D integer tensor, which can neither be
    # indexed with two slices nor matrix-multiplied with a float matrix.
    x = torch.randn(4, 3)
    x_gpu = x.to(device)
    print(f"\n✅ GPU Tensor: {x_gpu.device}")
    # Compute on the GPU: (3, 3) slice @ (3, 3) matrix -> (3, 3)
    y_gpu = torch.randn(3, 3, device="cuda")
    result = x_gpu[:3, :3] @ y_gpu
    # Copy the result back to the CPU
    result_cpu = result.cpu()
    print(f"✅ 回传 CPU: {result_cpu.device}")
    # Tensors can also be created directly on the GPU
    gpu_tensor = torch.ones(4, 4, device="cuda")
# ============================================================
# 7. autograd: automatic differentiation
# ============================================================
# Leaf tensors whose gradients should be tracked
x = torch.tensor([2.0, 3.0], requires_grad=True)
w = torch.tensor([0.5, 1.0], requires_grad=True)
b = torch.tensor(0.1, requires_grad=True)
# Forward pass: y = w·x + b
y = (w * x).sum() + b
print(f"\ny = {y.item():.4f}")
# Backward pass: populates .grad on x, w and b
y.backward()
print(f"∂y/∂x = {x.grad}") # expected to equal w = [0.5, 1.0]
print(f"∂y/∂w = {w.grad}") # expected to equal x = [2.0, 3.0]
print(f"∂y/∂b = {b.grad}") # expected to equal 1.0
# Reset the gradients in place (mandatory inside a training loop)
x.grad.zero_()
w.grad.zero_()
b.grad.zero_()
运行输出示例
从列表创建: tensor([1, 2, 3, 4])
NumPy → Tensor (共享内存): tensor([99, 6, 7, 8])
形状: torch.Size([4, 3, 28, 28])
维度数: 4
元素总数: 9408
y = 4.1000
∂y/∂x = tensor([0.5000, 1.0000])
∂y/∂w = tensor([2., 3.])
∂y/∂b = tensor(1.)
关键要点
| 概念 | 说明 |
| --- | --- |
| `torch.tensor()` vs `torch.Tensor()` | 前者是工厂函数(复制数据并自动推断 dtype),后者是类构造器(默认 float32,仅给定形状时内容未初始化) |
| `from_numpy()` | 与 NumPy 共享内存,修改会互相影响 |
| `.to(device)` | 通用设备迁移(CPU / CUDA / MPS) |
| `requires_grad=True` | 标记需要追踪梯度 |
| `.backward()` | 自动计算计算图中所有 requires_grad=True 叶子张量的梯度(结果累加到 .grad) |
| `.grad` | 存储计算出的梯度 |
| `torch.no_grad()` | 上下文管理器,禁用梯度计算(推理时用) |
PyTorch 神经网络 —— MNIST 手写数字识别
目标
- 构建完整的训练/验证/测试 Pipeline
- 掌握 nn.Module、DataLoader、optimizer 三大组件
- 理解训练循环(forward → loss → backward → step)
- 使用 GPU 加速训练
完整代码
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import matplotlib.pyplot as plt
from tqdm import tqdm
# ============================================================
# 0. Configuration
# ============================================================
# Prefer CUDA when a GPU is visible, otherwise fall back to the CPU.
_device_name = "cuda" if torch.cuda.is_available() else "cpu"
DEVICE = torch.device(_device_name)
BATCH_SIZE, EPOCHS = 64, 5  # mini-batch size, number of training epochs
LR = 1e-3  # learning rate
print(f"使用设备: {DEVICE}")
# ============================================================
# 1. Data loading and preprocessing
# ============================================================
# ToTensor maps 0-255 pixels to 0-1 and HWC to CHW; Normalize then applies
# the MNIST mean and standard deviation.
_MNIST_MEAN, _MNIST_STD = (0.1307,), (0.3081,)
transform = transforms.Compose(
    [transforms.ToTensor(), transforms.Normalize(_MNIST_MEAN, _MNIST_STD)]
)
# Arguments shared by the train and test splits.
_mnist_kwargs = {"root": "./data", "download": True, "transform": transform}
train_dataset = datasets.MNIST(train=True, **_mnist_kwargs)
test_dataset = datasets.MNIST(train=False, **_mnist_kwargs)
# Only the training split is shuffled.
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=BATCH_SIZE)
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=BATCH_SIZE)
print(f"训练集大小: {len(train_dataset)}, 测试集大小: {len(test_dataset)}")
# ============================================================
# 2. Model definition
# ============================================================
class CNN(nn.Module):
    """Simple convolutional network for single-channel 28×28 images.

    Three conv -> batch-norm -> ReLU -> 2×2 max-pool stages are followed by
    a two-layer fully connected head with dropout in between. The per-sample
    shape comments below assume a (1, 28, 28) input; ``fc1`` is sized for
    the resulting 64 × 3 × 3 feature map.

    Args:
        num_classes: Number of output scores produced by the final layer.
    """

    def __init__(self, num_classes: int = 10) -> None:
        super().__init__()
        # Input: (1, 28, 28)
        self.conv1 = nn.Conv2d(1, 16, kernel_size=3, padding=1)   # -> (16, 28, 28)
        self.bn1 = nn.BatchNorm2d(16)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, padding=1)  # -> (32, 14, 14)
        self.bn2 = nn.BatchNorm2d(32)
        self.conv3 = nn.Conv2d(32, 64, kernel_size=3, padding=1)  # -> (64, 7, 7)
        self.bn3 = nn.BatchNorm2d(64)
        self.pool = nn.MaxPool2d(2, 2)  # halves height and width on every use
        self.dropout = nn.Dropout(0.3)
        self.fc1 = nn.Linear(64 * 3 * 3, 128)
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return raw class scores of shape ``(batch, num_classes)``."""
        x = F.relu(self.bn1(self.conv1(x)))
        x = self.pool(x)  # (16, 14, 14)
        x = F.relu(self.bn2(self.conv2(x)))
        x = self.pool(x)  # (32, 7, 7)
        x = F.relu(self.bn3(self.conv3(x)))
        x = self.pool(x)  # (64, 3, 3)
        x = x.view(x.size(0), -1)  # flatten everything except the batch dim
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x
# Instantiate the model and move it to the configured device
model = CNN(num_classes=10).to(DEVICE)
print(f"模型参数量: {sum(p.numel() for p in model.parameters()):,}")
# ============================================================
# 3. Loss function and optimizer
# ============================================================
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=LR)
# Multiply the learning rate by 0.5 after every 2 calls to scheduler.step()
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=2, gamma=0.5)
# ============================================================
# 4. Training and evaluation functions
# ============================================================
def train_epoch(model, loader, criterion, optimizer, device):
    """Train ``model`` for one pass over ``loader`` and report its metrics.

    Args:
        model: Module to optimise; it is switched to training mode.
        loader: Iterable yielding ``(images, labels)`` batches.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
        optimizer: Optimizer whose ``zero_grad``/``step`` run once per batch.
        device: Device every batch is moved to before the forward pass.

    Returns:
        ``(avg_loss, accuracy)``: the sample-weighted mean loss and the
        fraction of samples whose arg-max prediction equals the label.

    Raises:
        ZeroDivisionError: If ``loader`` yields no samples.
    """
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    for images, labels in tqdm(loader, desc="训练", leave=False):
        images, labels = images.to(device), labels.to(device)
        # forward
        outputs = model(images)
        loss = criterion(outputs, labels)
        # backward: clear stale gradients, backpropagate, update parameters
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # statistics; assumes criterion returns the per-batch mean (the
        # default for nn.CrossEntropyLoss), hence the batch-size weighting
        running_loss += loss.item() * images.size(0)
        predicted = outputs.argmax(dim=1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
    avg_loss = running_loss / total
    accuracy = correct / total
    return avg_loss, accuracy
@torch.no_grad()
def evaluate(model, loader, criterion, device):
    """Evaluate ``model`` on ``loader`` with gradient tracking disabled.

    Args:
        model: Module to evaluate; it is switched to eval mode.
        loader: Iterable yielding ``(images, labels)`` batches.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
        device: Device every batch is moved to before the forward pass.

    Returns:
        ``(avg_loss, accuracy)``: the sample-weighted mean loss and the
        fraction of samples whose arg-max prediction equals the label.

    Raises:
        ZeroDivisionError: If ``loader`` yields no samples.
    """
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0
    for images, labels in loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        loss = criterion(outputs, labels)
        # Assumes criterion returns the per-batch mean (the default for
        # nn.CrossEntropyLoss), hence the batch-size weighting.
        running_loss += loss.item() * images.size(0)
        predicted = outputs.argmax(dim=1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
    avg_loss = running_loss / total
    accuracy = correct / total
    return avg_loss, accuracy
# ============================================================
# 5. Training loop
# ============================================================
# Per-epoch metrics, appended in epoch order.
history = {"train_loss": [], "train_acc": [], "test_loss": [], "test_acc": []}
for epoch in range(1, EPOCHS + 1):
    print(f"\n{'='*40}\nEpoch {epoch}/{EPOCHS}")
    train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, DEVICE)
    test_loss, test_acc = evaluate(model, test_loader, criterion, DEVICE)
    # Advance the learning-rate schedule once per epoch.
    scheduler.step()
    history["train_loss"].append(train_loss)
    history["train_acc"].append(train_acc)
    history["test_loss"].append(test_loss)
    history["test_acc"].append(test_acc)
    print(f"Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.2%}")
    print(f"Test Loss: {test_loss:.4f} | Test Acc: {test_acc:.2%}")
# test_acc still holds the value from the last epoch.
print(f"\n✅ 训练完成!最终测试准确率: {test_acc:.2%}")
# ============================================================
# 6. Save the model
# ============================================================
# Bundle the weights, optimizer state and bookkeeping values into one dict.
checkpoint = {
    "epoch": EPOCHS,
    "model_state_dict": model.state_dict(),
    "optimizer_state_dict": optimizer.state_dict(),
    "test_acc": test_acc,
}
torch.save(checkpoint, "mnist_cnn.pth")
print("模型已保存为 mnist_cnn.pth")
# ============================================================
# 7. Predict a single sample
# ============================================================
model.eval()
sample, label = test_dataset[0]
with torch.no_grad():
    # unsqueeze(0) adds the batch dimension the model's forward pass expects
    output = model(sample.unsqueeze(0).to(DEVICE))
    prob = F.softmax(output, dim=1)
    pred = torch.argmax(prob, dim=1).item()
print(f"\n实际数字: {label}")
print(f"预测数字: {pred}")
# Move to the CPU before converting to NumPy
print(f"各类概率: {prob.cpu().numpy().round(4)}")
预期输出(Epoch 5)
训练集大小: 60000, 测试集大小: 10000
模型参数量: 98,666
Epoch 5/5
Train Loss: 0.0123 | Train Acc: 99.52%
Test Loss: 0.0214 | Test Acc: 99.31%
✅ 训练完成!最终测试准确率: 99.31%
训练 Pipeline 图解
for epoch in range(EPOCHS):
    for batch in DataLoader:
        images, labels → to(device)
        # ① 前向传播
        outputs = model(images)
        loss = criterion(outputs, labels)
        # ② 反向传播
        optimizer.zero_grad()  # 清空旧梯度
        loss.backward()        # 计算新梯度
        optimizer.step()       # 更新参数
    # ③ 学习率调整(每个 epoch 一次)
    scheduler.step()
    # ④ 验证(torch.no_grad())
    evaluate(model, test_loader)
关键要点
| 概念 | 说明 |
| --- | --- |
| `nn.Module` | 所有神经网络层的基类,定义 forward() |
| `DataLoader` | 自动批处理、打乱、多进程加载(num_workers) |
| `transforms.Compose` | 数据预处理流水线 |
| `optimizer.zero_grad()` | 必须!否则梯度会累积 |
| `model.train()` / `model.eval()` | 切换 Dropout/BN 行为 |
| `torch.no_grad()` | 推理时禁用梯度计算,节省内存 |
| `state_dict` | 模型的参数字典,用于保存和加载 |
| `torch.save(obj, path)` | 通用序列化保存(模型/字典/任意对象) |