1.Tensor(张量)

1.1 Tensor 是什么?

Tensor = N 维数组(支持 GPU + 自动求梯度 + 更高性能)

  • 标量(0 维)→ 3

  • 向量(1 维)→ [1,2,3]

  • 矩阵(2 维)→ [[1,2],[3,4]]

  • 三维张量(3 维)→ 图像 RGB

  • 四维张量(4 维)→ batch 图像(比如 (64,3,32,32))

1.2 创建张量

# 从数据创建
torch.tensor([1,2,3])
# 全零张量
torch.zeros(2,3)
# 全一张量
torch.ones(2,3)
# 单位矩阵 创建 ?*? 的矩阵 ?代表你输入的参数
torch.eye(4)
# 生成均匀分布矩阵:0-1 之间的数值 矩阵,包含0 不包含1
torch.rand(2,3)
# 生成标准正态分布的张量:均值为0,标准差 1 ~ -1 之间
torch.randn(2,3)
# 生成等差数列,值范围为0-10 步进2,包含开始值,不包含结尾值[0,2,4,6,8]
torch.arange(0,10,2)
# 生成线性间隔张量,start 0 end 10  分成 5个值 [ 0.0000,  2.5000,  5.0000,  7.5000, 10.0000]
torch.linspace(0,10,5)

# 从numpy 创建 张量
import numpy as np
# 根据数组创建张量
numpy_array = np.array([1,2,3,4])
# 转换成torch 的张量
a = torch.from_numpy(numpy_array)
# 创建一个形状 数据类型 设备 完全相同的张量,不过所有元素都为0
torch.zeros_like(a)
# 和上边相同,不过所以元素都是标准正态分布的
torch.rand_like(a)
# dtype 代表的是当前张量内的元素类型或者说是 精度
torch.tensor([1,2,3,4],dtype=torch.float32)

1.3 查看 Tensor 信息

out = torch.rand(2,3,4)
# 查看张量形状 torch.Size([2, 3, 4])
print(out.shape)
# 查看张量维度 3
print(out.ndim)
# 查看张量的运行设备 cpu
print(out.device)
# 查看张量的梯度
print(out.grad)
# 查看是否保存梯度
print(out.requires_grad)
# 访问张量底层一维数据存储,可以进行修改
print(out.untyped_storage())

1.4 Tensor 的数学运算

# 加减乘
a+b a-b a*b
# 矩阵运算
a@b
# 广播
a + 3
# 点积
dot = torch.dot(a, b)

1.5 张量操作(索引、切片、变形)

# 索引
tor = torch.rand(2,3,4)
print(tor)
# 取第一个维度 维度索引为0的全部元素
print(tor[0])
# 取第一个维度的全部 第二个维度 索引为0 的全部元素
print(tor[:,0])
# 取最后一个维度 下标为0的元素
print(tor[...,0])

# 重塑
tor = torch.arange(12)
# 将 1d 张量 转换成 3x4 的 2d张量
print(tor.reshape(3,4))
# 返回一个视图,与源张量共享内存地址,一个改变了,另外一个也会跟着变化
print(tor.view(3,4))
# 将张量展平为一维
print(tor.flatten())

1.6 CPU 与 GPU 切换

# 查看gpu 是否可用
torch.cuda.is_available()
# 将tensor 放在gpu 上
a = torch.ones((3,3)).cuda()
# 从gpu 转到 cpu
a = a.cpu()
# 常用写法
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
a = a.to(device)

1.7 Tensor 和 Numpy 转换

# tensor -> numpy
a.numpy()
# numpy -> tensor
torch.from_numpy(np_array)
# tensor.from_numpy() 产生的 tensor 和 numpy arrary 共享内存(改变一个,另一个也跟着转变)
# 如果不想共享内存 则使用
torch.tensor(b)

1.8 归约操作

tor = torch.randn(3, 4)
print(tor)
# 所有元素总和
print(tor.sum())
# 沿dim=0求和
print(tor.sum(dim=0))
# 取平均值
print(tor.mean())
# 标准差
print(tor.std())
# 最大值和索引
max_val,max_ids = tor.max(dim=1)

1.9 比较操作

mask = tensor > 0.5            # 布尔掩码
where = torch.where(mask, tensor, torch.zeros_like(tensor))  # 条件选择

1.10 小测试

import torch
import torch.nn.functional as F
"""
1.创建一个3x4x5的张量,然后:
tensor = torch.rand(3,4,5)
2.计算每个"面"(4x5)的平均值
tensor.mean(dim=(1, 2))
3.找出所有大于0.5的元素
mask = tensor > 0.5
tensor[mask]
4.将张量展平后重新形状为5x12
flattened = tensor.flatten()
flattened.reshape(5, 12)
5.使用广播机制实现与一个1x4x1张量的加法
tensor1 = torch.rand(1,4,1)
print(tensor1 + tensor)

# 创建模拟数据
batch_size = 8
features = 10
data = torch.randn(batch_size, features)
labels = torch.randint(0, 2, (batch_size,))
print(data)
# 任务:
# 1. 标准化每个特征(减去均值,除以标准差)
feature_means = data.mean(dim=0)
feature_stds = data.std(dim=0)
# 避免出现除零异常
feature_stds = torch.where(feature_stds == 0, torch.ones_like(feature_stds), feature_stds)
normalized_data = (data - feature_means) / feature_stds

# 2. 找出每个样本的最大特征值及其位置
max_value,max_index = data.max(dim=1)
# 3. 计算类别的one-hot编码
num_classes = 2
F.one_hot(labels, num_classes=2)
"""

2.Autograd(自动求导)

Autograd 负责自动构建计算图,并根据前向计算自动推导出梯度。

  • 深度学习训练的流程(本质)

    • 前向传播:算 output

    • 反向传播:算梯度(autograd 自动完成)

    • 梯度更新:optimizer.step()

  • 手动求导

# 方法一:直接展开(代数法)
# 1.展开平方
out = (5x+3)²
    = (5x+3) × (5x+3)      # 平方就是乘以自身
    = 5x × (5x+3) + 3 × (5x+3)  # 分配律
    = 25x² + 15x + 15x + 9  # 进一步展开
    = 25x² + 30x + 9        # 合并同类项
# 2.逐项求导
d(out)/dx = d(25x²)/dx + d(30x)/dx + d(9)/dx
          = 25 × d(x²)/dx + 30 × d(x)/dx + 0
          = 25 × (2x) + 30 × 1
          = 50x + 30

# 链式法则(复合函数求导)
# 1.识别复合结构
外层函数:f(u) = u²       # 平方函数
内层函数:u(x) = 5x + 3    # 线性函数
复合函数:f(u(x)) = (5x+3)²
# 2.计算各层导数
1. 外层导数:df/du = 2u
2. 内层导数:du/dx = 5
# 3.应用链式法则
链式法则公式:df/dx = df/du × du/dx

代入:
df/dx = (2u) × (5)
     = 2u × 5
     = 10u
     = 10 × (5x+3)   # 将 u = 5x+3 代入
     = 50x + 30

2.1 requires_grad

# 你希望 PyTorch 帮你计算梯度,就要给 Tensor 设置:requires_grad=True
import torch
x = torch.tensor([2.0, 3.0], requires_grad=True)

# 1.1 创建需要梯度的张量
x = torch.tensor(2.0, requires_grad=True)  # requires_grad=True表示跟踪该张量的所有操作
y = torch.tensor(3.0, requires_grad=True)

# 1.2 执行计算(构建计算图)
z = x**2 + y**3 + x*y

print(f"x = {x}, requires_grad = {x.requires_grad}")
print(f"y = {y}, requires_grad = {y.requires_grad}")
print(f"z = {z}")

# 1.3 计算梯度
z.backward()  # 计算z关于所有requires_grad=True的叶节点的梯度

print(f"∂z/∂x = {x.grad}")  # 2x + y = 2*2 + 3 = 7
print(f"∂z/∂y = {y.grad}")  # 3y² + x = 3*9 + 2 = 29

2.2 计算图是如何构建的

x = torch.tensor([2.0], requires_grad=True)
y = x * 3
z = y ** 2

"""
这里背后发生了什么:
PyTorch 记录了:
z 是通过 y → (平方运算) 产生
y 是通过 x → (乘法运算) 产生
计算图为:
x --(*3)--> y --(^2)--> z
"""

2.3 backward() 自动反向传播

# 对 z 求梯度:
z.backward()
# PyTorch 会自动计算:
dz/dx = dz/dy * dy/dx
dz/dx = (2y) * (3)
# backward() 会把梯度写入 x.grad

2.4 常见规则

2.4.1 .grad 只会被写入 叶子节点(leaf tensor)

x = torch.tensor([3.0], requires_grad=True)
y = x * 2
z = y ** 2

只有 x.grad 会有值
y.grad 和 z.grad 没有梯度(默认不存)

2.4.2 每次 backward() 前要梯度清零

因为 .grad 会累加

x.grad = None or optimizer.zero_grad()

2.4.3 backward() 只能对标量调用

标量:一个单独的数,只有大小 没有方向

2.4.4 线性回归

import torch

# 训练数据
x = torch.randn((10,1))
y = 3 * x + 1
# 参数(需要求梯度)
w = torch.randn((1,1), requires_grad=True)
b = torch.randn((1), requires_grad=True)
lr = 0.1
for i in range(100):
    # forward
    y_pred = x @ w + b
    # loss
    loss = ((y_pred - y)**2).mean()
    # backward
    loss.backward()
    # 梯度下降:w = w - lr * dw
    with torch.no_grad():
        w -= lr * w.grad
        b -= lr * b.grad
        w.grad = None
        b.grad = None
print("w =", w)
print("b =", b)

2.5 标量 vs 非标量梯度

# 2.1 标量输出(最常见)
x = torch.tensor([1.0, 2.0, 3.0], requires_grad=True)
y = x.sum()  # 标量输出
y.backward()  # 不需要参数
print(f"∂y/∂x = {x.grad}")  # [1., 1., 1.]

# 2.2 非标量输出(需要传入gradient参数)
x = torch.tensor([1.0, 2.0, 3.0], requires_grad=True)
y = x ** 2  # 向量输出:[1, 4, 9]

# 方法1:求和变成标量
y_sum = y.sum()
y_sum.backward()
print(f"通过求和: ∂(∑y)/∂x = {x.grad}")  # [2., 4., 6.]

# 清零梯度
x.grad.zero_()

# 方法2:直接对向量调用backward并传入梯度权重
grad_output = torch.tensor([1.0, 1.0, 1.0])
y.backward(grad_output)
print(f"直接backward: ∂y/∂x = {x.grad}")  # 相同结果 [2., 4., 6.]

2.6 梯度累加和清零

# 3.1 梯度累加特性
x = torch.tensor(2.0, requires_grad=True)

for i in range(3):
    y = x ** 2
    y.backward(retain_graph=True)  # retain_graph保持计算图
    print(f"第{i+1}次反向传播后梯度: {x.grad}")
    # 注意:梯度会累加!2 + 2 + 2 = 6

# 3.2 正确做法:每次迭代前清零梯度
x = torch.tensor(2.0, requires_grad=True)

for i in range(3):
    if x.grad is not None:
        x.grad.zero_()  # 清零梯度
    y = x ** 2
    y.backward()
    print(f"清零后第{i+1}次梯度: {x.grad}")  # 每次都是2

2.7 禁用梯度计算

# 4.1 torch.no_grad()上下文管理器
x = torch.tensor([1.0, 2.0, 3.0], requires_grad=True)

with torch.no_grad():
    y = x * 2  # 不跟踪梯度
    print(f"y.requires_grad: {y.requires_grad}")  # False

# 4.2 .detach()方法 - 从计算图中分离
x = torch.tensor([1.0, 2.0, 3.0], requires_grad=True)
y = x ** 2

y_detached = y.detach()  # 创建一个新张量,不共享计算历史
print(f"y.requires_grad: {y.requires_grad}")  # True
print(f"y_detached.requires_grad: {y_detached.requires_grad}")  # False

# 4.3 推理时禁用梯度
@torch.no_grad()  # 装饰器版本
def inference(model, input_data):
    return model(input_data)  # 不计算梯度,节省内存

2.8 自定义函数与高阶梯度

# 5.1 自定义函数(了解即可,nn.Module通常更简单)
class CustomReLU(torch.autograd.Function):
    """
    自定义ReLU激活函数
    forward: y = max(0, x)
    backward: ∂y/∂x = 1 if x > 0 else 0
    """
    @staticmethod
    def forward(ctx, input):
        ctx.save_for_backward(input)  # 保存输入用于反向传播
        return input.clamp(min=0)
    
    @staticmethod
    def backward(ctx, grad_output):
        input, = ctx.saved_tensors
        grad_input = grad_output.clone()
        grad_input[input < 0] = 0  # ReLU的导数
        return grad_input

# 使用自定义函数
x = torch.tensor([-1.0, 0.5, 2.0], requires_grad=True)
relu = CustomReLU.apply
y = relu(x)
y.sum().backward()
print(f"x: {x}")
print(f"y: {y}")
print(f"梯度: {x.grad}")  # [0., 1., 1.]

# 5.2 计算二阶导数(高阶梯度)
x = torch.tensor(2.0, requires_grad=True)

# 一阶导数
y = x ** 3
grad1 = torch.autograd.grad(y, x, create_graph=True)[0]  # create_graph=True允许计算高阶导数
print(f"一阶导数 (dy/dx): {grad1}")  # 3x² = 12

# 二阶导数
grad2 = torch.autograd.grad(grad1, x)[0]
print(f"二阶导数 (d²y/dx²): {grad2}")  # 6x = 12

2.9 常见陷阱与调试

# 7.1 原地操作会破坏梯度
x = torch.tensor([1.0, 2.0, 3.0], requires_grad=True)
y = x ** 2

# ❌ 错误的原地操作
# x[0] = 10.0  # 这会破坏计算图!

# ✅ 正确做法:创建新张量
x_new = x.clone()
x_new[0] = 10.0

# 7.2 检查梯度是否存在
x = torch.tensor([1.0, 2.0, 3.0], requires_grad=True)
y = x.sum()

print(f"y.requires_grad: {y.requires_grad}")
print(f"x.grad before backward: {x.grad}")  # None

y.backward()
print(f"x.grad after backward: {x.grad}")   # [1., 1., 1.]

# 7.3 梯度检查(数值梯度 vs 自动梯度)
def gradient_check():
    x = torch.tensor([1.5], requires_grad=True)
    y = torch.sin(x)
    
    # 自动微分
    y.backward()
    auto_grad = x.grad.item()
    
    # 数值微分(中心差分)
    eps = 1e-5
    x_plus = torch.tensor([1.5 + eps])
    x_minus = torch.tensor([1.5 - eps])
    num_grad = (torch.sin(x_plus) - torch.sin(x_minus)) / (2 * eps)
    
    print(f"自动微分梯度: {auto_grad:.8f}")
    print(f"数值微分梯度: {num_grad.item():.8f}")
    print(f"差异: {abs(auto_grad - num_grad.item()):.10f}")

gradient_check()

3.广播、常见维度操作、拼接与切分、矩阵运算

3.1 广播机制 Broadcasting

PyTorch 在计算时,如果两个张量维度不一样,它会自动扩展它们,使形状兼容。

广播规则类似 numpy:

  1. 从后往前对齐维度

  2. 如果一个维度是 1,可以扩展

  3. 如果维度不相等且都不是 1 → 报错

import torch

a = torch.randn(3, 1)   # shape (3,1)
b = torch.randn(1, 4)   # shape (1,4)

c = a + b  # 自动广播成为 (3,4)
print(c.shape)

3.2 常见维度操作

3.2.1 reshape / view(改变形状)

a = torch.arange(12)
b = a.reshape(3, 4)
# view 更严格,要求内存连续。
# 内存布局
tensor = torch.randn(3, 4)
contiguous = tensor.is_contiguous()
tensor_contiguous = tensor.contiguous()  # 转换为连续内存

view = tensor.view(12)      # 共享内存
clone = tensor.clone()      # 深拷贝,不共享内存

3.2.2 unsqueeze:增加某一维 —— squeeze:去掉维度为1的维度

# squeeze : 挤压 
# 加上un 取反
# unsqueeze : 解压 或是拓展
a = torch.randn(1,3,1,5)
b = a.squeeze()     # 去掉所有 1 维度

a = torch.tensor([1,2,3])
print(a.shape)      # (3,)
b = a.unsqueeze(0)  # (1,3)
c = a.unsqueeze(1)  # (3,1)

3.2.3 permute:改变维度顺序

x = torch.randn(2, 3, 4)
transposed = tensor.T # 转置
y = x.permute(0, 2, 1)  # (2,4,3) 中间的参数为 维度的下标

3.2.4 transpose:两维度交换

x = torch.randn(3, 4)
y = x.transpose(0, 1)  # 变成 (4,3) 
# 两个参数代表维度的索引 指要交换的两个维度

3.2.5 expand / repeat(复制维度)

expand

核心机制:基于广播机制,只能将维度大小为1的维度扩展到更大的尺寸。

内存:与原始张量共享内存(是原张量的视图)。

变化:如果你修改了原始张量,expand后的张量也会变化,反之亦然(因为它们共享内存)。

维度限制:只能扩展维度大小为1的维度,不能扩展其他维度(比如维度大小为2的维度不能直接扩展为5)。

repeat

核心机制:通过复制数据来扩展张量,可以扩展任何维度(无论原始维度大小是多少)。

内存:不共享内存,创建新的张量,数据被复制。

变化:修改原始张量不会影响repeat得到的张量,修改repeat得到的张量也不会影响原始张量。

维度灵活性:可以任意指定每个维度重复的次数。

# 原始张量
x = torch.tensor([[1, 2, 3]])  # shape: (1, 3)
# expand
y_expand = x.expand(4, 3)  # 将第0维从1扩展到4
# 可以这样理解:y_expand是x的"视图",逻辑上有4行,但实际存储只有1行数据
# repeat
y_repeat = x.repeat(4, 1)  # 第0维重复4次,第1维重复1次
# 可以这样理解:y_repeat是真正复制了4份数据,存储了4行数据
# 内存共享测试
print("原始x的内存地址:", x.storage().data_ptr())
print("expand后的内存地址:", y_expand.storage().data_ptr())
print("repeat后的内存地址:", y_repeat.storage().data_ptr())
# 你会发现expand和x相同,repeat不同
# 修改测试
x[0, 0] = 100
print("修改x后,y_expand[0, 0]:", y_expand[0, 0])  # 变成100
print("修改x后,y_repeat[0, 0]:", y_repeat[0, 0])  # 仍然是1

3.3 拼接与切分

3.3.1 cat:拼接已有维度

a = torch.ones(2,3)
b = torch.zeros(2,3)

c = torch.cat([a,b], dim=0)   # (4,3) 下标为0的维度相加
d = torch.cat([a,b], dim=1)   # (2,6) 下标为1的维度相加
stack = torch.stack([a, b])  # 新增一个维度,变成 2x2x3

3.3.2 stack:会创建新维度

a = torch.ones(3)
b = torch.zeros(3)

torch.stack([a,b], dim=0)  # (2,3)
torch.stack([a,b], dim=1)  # (3,2)

3.3.3 split & chunk:切分张量

x = torch.arange(10)

print(torch.split(x, 3))  # 按大小切 [3,3,3,1]
print(torch.chunk(x, 3))  # 均匀切 3 份 [4,3,3]

3.4 矩阵运算

3.4.1 matmul:自动选择正确的矩阵乘法

a = torch.randn(3, 4)
b = torch.randn(4, 5)

c = a @ b   # 等价 matmul

3.4.2 mm:两个 2D 矩阵

# 只能对2d张量相乘,第一个张量的列 必须要等与 第二个张量的行
c = torch.mm(a, b)

3.4.3 bmm:批矩阵乘法

# 只能对3d张量相乘,第一个张量的最后一个维度 必须要等于 第二个张量的 倒数第二个维度
a = torch.randn(10, 3, 4)
b = torch.randn(10, 4, 5)

c = torch.bmm(a, b)  # (10,3,5)

4.计算图、Autograd机制、requires_grad意义、detach 与 torch.no_grad 的区别、梯度清零的深层原因

4.1 计算图(Computational Graph)

PyTorch 会在你执行操作时,动态构建一个有向图:

  • 节点:Tensor

  • 边:操作(如 +, matmul, relu, sigmoid, mean)

x = torch.tensor([2.0], requires_grad=True)
y = x * 3
z = y + 5
out = z ** 2

# 计算图
x → (*) → y → (+) → z → (**) → out

每一步都会产生一个新 Tensor,它记录:

  • 是由哪种操作产生的:Tensor.grad_fn

  • 输入是谁(也就是“父节点”)

4.2 backward() 做了什么?

# 代表给 out 注入一个梯度 1(d(out)/d(out) = 1)
out.backward()

然后 PyTorch 会沿计算图反向传播,逐步计算:

  • d(out)/d(z)

  • d(out)/d(y)

  • d(out)/d(x)

最终存储在 x.grad;这就是自动求导的核心

4.3 requires_grad = True 的意义

Tensor 是否参与反向传播取决于:

  • x = torch.tensor([1.0], requires_grad=True)

    • 如果 True → PyTorch 跟踪它

    • 如果 False → 认为它是常量,不计算梯度

4.4 detach vs no_grad

4.4.1 detach()

从计算图中切断一个 Tensor 不影响其他人,只影响自己。

  • y = x * 3

    z = y.detach()

    • zy 数值一样,但:

    • z 不会被追踪梯度

    • y 仍然会被追踪

  • 适用于:

    • 做 forward 但不想反向传播到某一层

    • RNN 训练中清空历史图

4.4.2 with torch.no_grad()

临时关闭整个当前 block 的梯度追踪

with torch.no_grad():
    w -= lr * w.grad

常用于:

  • 权重更新(optimizer step)

  • 推理阶段(不训练)

  • 节省显存

4.5 为什么必须梯度清零 zero_grad()?

因为梯度会累加!

每次 backward() 都会把梯度累加到 .grad

w.grad += 当前梯度

因此在每个 batch 前必须:

w.grad = None
# 或
w.grad.zero_()

4.6 手写线性回归,最好全部理解

import torch

# 机器学习通常要求输入为二维,每行是一个样本。
x =  torch.linspace(0,10,20).unsqueeze(1)

# 创建标准答案 让模型学习的目标规律
y = x * 3 + 2 # 3 代表预设的 权重  2 代表 预设的 偏置

# 设置可学习参数 w 和 b
# w 称为 权重或斜率,表示x每变化一个单位,y会变化多少
w = torch.randn(1,requires_grad=True) # 开启梯度追踪
# b 称为 偏置或截距,表示当x为0时y的值。
b = torch.randn(1,requires_grad=True)
# 学习率:控制每次参数更新的步长。太大容易“跨过”最优点,太小则学习太慢。0.01是一个常用值
lr =0.01
# 一个epoch代表着一次完整的数据训练
for epoch in range(200):
    # 使用当前的 权重 和 偏置 计算模型的 预测值
    y_pred = x * w + b
    # 计算损失 衡量预测值与真实值差距,平方可放大大误差并确保正值。
    loss = ((y_pred - y) ** 2).mean()
    # 反向传播,自动计算梯度 PyTorch沿计算图反向计算loss对w和b的导数
    loss.backward()

    # 关闭梯度上下文,确保更新参数时不记录梯度,避免干扰到下次计算
    """
    为什么必须要关闭梯度上下文
    1.避免梯度被跟踪导致计算图不断变大
    2.避免 w和b 的更新操作参与到梯度计算中
    """
    with torch.no_grad():
        # 沿梯度反方向调整参数,使loss减小
        # loss 的 越来越小,那么梯度也会越来越小
        w -= lr * w.grad
        b -= lr * b.grad

    # PyTorch梯度会累积,必须手动清零防止影响下次迭代。
    # backward() 是累加梯度,如果不清零,会让多次训练的梯度叠加,导致优化方向错误。
    w.grad.zero_()
    b.grad.zero_()

    if epoch % 10 == 0:
        print(f"epoch {epoch}, loss={loss.item():.4f}, w={w.item():.4f}, b={b.item():.4f}")

5.神经网络(nn.Module + nn.Linear)

模型训练完整流程:

  • 使用 MSE loss 和 SGD 优化器

import torch
from torch import nn


# 所有模型都必须继承 nn.Module
class my_model(nn.Module):
    def __init__(self):
        super().__init__()
        # 定义网络层结构
        # fc1: 输入1个特征 → 输出10个特征(10个神经元) nn.Linear 他在内部自动创建了 w权重 b偏置
        # 并且默认带有 requires_grad=True
        self.fc1 = nn.Linear(in_features=1, out_features=10)
        # ReLU激活函数(非线性变换)
        self.relu = nn.ReLU()
        # fc2: 输入10个特征 → 输出1个特征
        self.fc2 = nn.Linear(in_features=10, out_features=1)

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        return x

model = my_model()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# to(device) 使用GPU进行训练
x = torch.tensor([[1.0],[2.0],[3.0]]).to(device).to(device)
y = torch.tensor([[2.0],[4.0],[6.0]]).to(device).to(device)
criterion = nn.MSELoss().to(device) # 损失函数
optimizer = torch.optim.SGD(model.parameters(), lr=0.01) # 优化器

for epoch in range(200):

    # 1. forward 调用 model(x)就相当于调用了 forward方法 -> forward 是定义前向计算逻辑
    pred = model(x)
    # 2. 计算损失
    loss = criterion(pred, y)
    # 3. backward
    optimizer.zero_grad()  # 清零梯度
    loss.view()
    # 4. 更新参数
    optimizer.step()

    if epoch % 40 == 0:
        print(epoch, loss.item())
        learned_weight = model.fc1.weight.data
        learned_bias = model.fc1.bias.data
        print(learned_weight, learned_bias)

标准流程:forward(计算图) -> loss(损失值) -> zero_grad(清空梯度) -> backward(反向传播) -> step(更新参数)

  • 核心区别对比:

特性

model.train() (训练模式)

model.eval() (评估模式)

主要目的

准备模型进行训练,启用所有正则化和学习机制。

准备模型进行测试、验证或推理,关闭训练专用行为以获得稳定输出。

Dropout层

启用。按概率随机“关闭”神经元,防止过拟合。

关闭。所有神经元都参与计算,保证输出的确定性和可重复性。

BatchNorm层

使用当前批次的统计量(均值/方差)进行归一化,并更新其运行统计量

使用训练阶段积累的全局运行统计量进行归一化,停止更新。

梯度计算

默认进行梯度计算和存储,为反向传播做准备。

通常与torch.no_grad()联用,不计算或存储梯度,节省内存/算力。

其他影响

可能影响其他行为与训练阶段相关的层(如InstanceNorm, DropPath等)。

确保这些层切换到确定性推理状态。

5.1 nn.Module 基础

5.1.1 最简单的神经网络

import torch
import torch.nn as nn
import torch.nn.functional as F

# 继承nn.Module定义网络
class SimpleNet(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()  # 必须调用父类初始化
        self.fc1 = nn.Linear(input_size, hidden_size)  # 全连接层1
        self.fc2 = nn.Linear(hidden_size, output_size)  # 全连接层2
        
    def forward(self, x):
        """定义前向传播"""
        x = self.fc1(x)
        x = F.relu(x)  # 激活函数
        x = self.fc2(x)
        return x

# 使用网络
model = SimpleNet(10, 20, 5)  # 10维输入 -> 20维隐藏层 -> 5维输出
print(model)

5,1,2 探索模型结构

# 查看模型结构
print(f"模型结构:\n{model}")

# 查看所有参数
print(f"\n所有参数名称:")
for name, param in model.named_parameters():
    print(f"  {name}: {param.shape}")

# 访问特定层
print(f"\n第一层权重形状: {model.fc1.weight.shape}")  # (20, 10)
print(f"第一层偏置形状: {model.fc1.bias.shape}")      # (20,)

# 计算参数量
total_params = sum(p.numel() for p in model.parameters())
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"\n总参数: {total_params}, 可训练参数: {trainable_params}")

5.2 常用神经网络层

5.2.1 全连接层

class FullyConnectedNet(nn.Module):
    def __init__(self):
        super().__init__()
        # 线性层: y = xW^T + b
        self.fc1 = nn.Linear(784, 256)
        self.fc2 = nn.Linear(256, 128)
        self.fc3 = nn.Linear(128, 10)
        
        # Dropout层防止过拟合
        self.dropout = nn.Dropout(0.2)
        
        # 批标准化层
        self.bn1 = nn.BatchNorm1d(256)
        self.bn2 = nn.BatchNorm1d(128)
        
    def forward(self, x):
        x = x.view(-1, 784)  # 展平图像
        x = self.fc1(x)
        x = self.bn1(x)
        x = F.relu(x)
        x = self.dropout(x)
        
        x = self.fc2(x)
        x = self.bn2(x)
        x = F.relu(x)
        x = self.dropout(x)
        
        x = self.fc3(x)
        return F.log_softmax(x, dim=1)  # 分类输出

5.2.2 卷积层(用于图像)

class CNN(nn.Module):
    def __init__(self):
        super().__init__()
        # 卷积层参数: (输入通道, 输出通道, 卷积核大小, 步长, 填充)
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        
        # 池化层
        self.pool = nn.MaxPool2d(2, 2)
        
        # 全连接层
        self.fc1 = nn.Linear(128 * 3 * 3, 256)  # 计算展平后的大小
        self.fc2 = nn.Linear(256, 10)
        
        # Dropout
        self.dropout = nn.Dropout(0.5)
        
    def forward(self, x):
        # 输入: (batch_size, 1, 28, 28)
        x = self.pool(F.relu(self.conv1(x)))  # -> (batch_size, 32, 14, 14)
        x = self.pool(F.relu(self.conv2(x)))  # -> (batch_size, 64, 7, 7)
        x = self.pool(F.relu(self.conv3(x)))  # -> (batch_size, 128, 3, 3)
        
        x = x.view(-1, 128 * 3 * 3)  # 展平
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)

5.2.3 循环层(用于序列数据)

class RNNModel(nn.Module):
    def __init__(self, vocab_size, embed_size, hidden_size, num_layers):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers, 
                           batch_first=True, dropout=0.2)
        self.fc = nn.Linear(hidden_size, vocab_size)
        
    def forward(self, x, hidden=None):
        # x: (batch_size, seq_length)
        embedded = self.embedding(x)  # -> (batch_size, seq_length, embed_size)
        
        # LSTM处理
        lstm_out, hidden = self.lstm(embedded, hidden)
        # lstm_out: (batch_size, seq_length, hidden_size)
        
        # 只取最后一个时间步的输出
        last_output = lstm_out[:, -1, :]
        
        # 分类
        output = self.fc(last_output)
        return output, hidden

5.3 模型构建技巧

5.3.1 nn.Sequential - 快速构建

# 方法1: 直接使用nn.Sequential
model = nn.Sequential(
    nn.Linear(784, 256),
    nn.ReLU(),
    nn.Dropout(0.2),
    nn.Linear(256, 128),
    nn.ReLU(),
    nn.Dropout(0.2),
    nn.Linear(128, 10),
    nn.LogSoftmax(dim=1)
)

# 方法2: 在Module中使用Sequential
class SequentialNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 64, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(64, 128, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
        )
        self.classifier = nn.Sequential(
            nn.Linear(128 * 7 * 7, 256),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(256, 10)
        )
        
    def forward(self, x):
        x = self.features(x)
        x = x.view(x.size(0), -1)
        x = self.classifier(x)
        return x

5.3.2 参数初始化

def init_weights(m):
    """自定义权重初始化"""
    if isinstance(m, nn.Linear):
        nn.init.xavier_uniform_(m.weight)  # Xavier初始化
        nn.init.constant_(m.bias, 0)       # 偏置初始化为0
    elif isinstance(m, nn.Conv2d):
        nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
    elif isinstance(m, nn.BatchNorm2d):
        nn.init.constant_(m.weight, 1)
        nn.init.constant_(m.bias, 0)

# 应用初始化
model = CNN()
model.apply(init_weights)

# 查看初始化后的权重
for name, param in model.named_parameters():
    if 'weight' in name:
        print(f"{name} 均值: {param.mean():.6f}, 标准差: {param.std():.6f}")

5.3.3 模块化设计

# 基础块(复用)
class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, 3, stride, padding=1)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, 3, 1, padding=1)
        self.bn2 = nn.BatchNorm2d(out_channels)
        
        # 快捷连接
        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, 1, stride),
                nn.BatchNorm2d(out_channels)
            )
            
    def forward(self, x):
        residual = self.shortcut(x)
        x = F.relu(self.bn1(self.conv1(x)))
        x = self.bn2(self.conv2(x))
        x += residual
        return F.relu(x)

# 使用基础块构建复杂网络
class ResNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 64, 7, 2, padding=3)
        self.bn1 = nn.BatchNorm2d(64)
        self.pool = nn.MaxPool2d(3, 2, padding=1)
        
        # 重复使用残差块
        self.layer1 = self._make_layer(64, 64, 2, stride=1)
        self.layer2 = self._make_layer(64, 128, 2, stride=2)
        self.layer3 = self._make_layer(128, 256, 2, stride=2)
        
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(256, 10)
        
    def _make_layer(self, in_channels, out_channels, blocks, stride):
        layers = []
        layers.append(ResidualBlock(in_channels, out_channels, stride))
        for _ in range(1, blocks):
            layers.append(ResidualBlock(out_channels, out_channels))
        return nn.Sequential(*layers)
    
    def forward(self, x):
        x = F.relu(self.bn1(self.conv1(x)))
        x = self.pool(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x

5.4 训练循环完整示例

5.4.1 完整训练流程

import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

# 1. 数据准备
def create_sample_data():
    """创建模拟数据"""
    torch.manual_seed(42)
    X = torch.randn(1000, 784)  # 1000个样本,784特征
    y = torch.randint(0, 10, (1000,))  # 10个类别
    return X, y

X, y = create_sample_data()
dataset = TensorDataset(X, y)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

# 2. 模型定义
class Classifier(nn.Module):
    def __init__(self):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(784, 256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(128, 10)
        )
    
    def forward(self, x):
        return self.net(x)

# 3. 初始化
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = Classifier().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.5)

# 4. 训练循环
epochs = 5
for epoch in range(epochs):
    model.train()  # 训练模式(启用dropout等)
    running_loss = 0.0
    correct = 0
    total = 0
    
    for batch_idx, (inputs, labels) in enumerate(dataloader):
        inputs, labels = inputs.to(device), labels.to(device)
        
        # 清零梯度
        optimizer.zero_grad()
        
        # 前向传播
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        
        # 反向传播
        loss.backward()
        
        # 梯度裁剪(防止梯度爆炸)
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        
        # 更新参数
        optimizer.step()
        
        # 统计
        running_loss += loss.item()
        _, predicted = outputs.max(1)
        total += labels.size(0)
        correct += predicted.eq(labels).sum().item()
        
        if batch_idx % 50 == 0:
            print(f'Epoch: {epoch+1}, Batch: {batch_idx}, Loss: {loss.item():.4f}')
    
    scheduler.step()  # 更新学习率
    
    # 每个epoch的统计
    epoch_loss = running_loss / len(dataloader)
    epoch_acc = 100. * correct / total
    print(f'Epoch {epoch+1} 完成 | Loss: {epoch_loss:.4f} | Acc: {epoch_acc:.2f}%')
    
    # 保存检查点
    if (epoch + 1) % 2 == 0:
        torch.save({
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'loss': epoch_loss,
        }, f'checkpoint_epoch_{epoch+1}.pth')

5.4.2 推理模式

# 切换到评估模式
model.eval()  # 禁用dropout、batchnorm使用运行统计

# 推理(不计算梯度)
with torch.no_grad():
    test_input = torch.randn(1, 784).to(device)
    output = model(test_input)
    probabilities = F.softmax(output, dim=1)
    _, predicted = output.max(1)
    print(f"预测类别: {predicted.item()}, 概率分布: {probabilities.squeeze()}")

# 切换回训练模式
model.train()

5.5 高级功能

5.5.1 钩子函数(Hooks)

class DebugNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(10, 5)
        self.fc2 = nn.Linear(5, 2)
        
        # 注册前向钩子
        self.fc1.register_forward_hook(self.forward_hook)
        self.fc1.register_backward_hook(self.backward_hook)
        
    def forward_hook(self, module, input, output):
        print(f"fc1 前向传播:")
        print(f"  输入形状: {input[0].shape}")
        print(f"  输出形状: {output.shape}")
        print(f"  输出均值: {output.mean():.4f}")
        
    def backward_hook(self, module, grad_input, grad_output):
        print(f"fc1 反向传播:")
        print(f"  梯度输入形状: {[g.shape for g in grad_input if g is not None]}")
        print(f"  梯度输出形状: {[g.shape for g in grad_output if g is not None]}")
        
    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

# 测试钩子
model = DebugNet()
x = torch.randn(4, 10)
output = model(x)
loss = output.sum()
loss.backward()

5.5.2 自定义层

class CustomLinear(nn.Module):
    """自定义线性层,带噪声注入"""
    def __init__(self, in_features, out_features, noise_std=0.1):
        super().__init__()
        self.weight = nn.Parameter(torch.randn(out_features, in_features) * 0.01)
        self.bias = nn.Parameter(torch.zeros(out_features))
        self.noise_std = noise_std
        
    def forward(self, x):
        if self.training and self.noise_std > 0:
            # 训练时添加噪声
            noise = torch.randn_like(x) * self.noise_std
            x = x + noise
        return F.linear(x, self.weight, self.bias)
    
    def extra_repr(self):
        return f'in_features={self.weight.shape[1]}, out_features={self.weight.shape[0]}, noise_std={self.noise_std}'

# 使用自定义层
class CustomNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.layer1 = CustomLinear(10, 20, noise_std=0.05)
        self.layer2 = nn.Linear(20, 5)
        
    def forward(self, x):
        x = F.relu(self.layer1(x))
        x = self.layer2(x)
        return x

5.6 模型分析和可视化

from torchsummary import summary

# 模型摘要
model = CNN()
print("CNN模型摘要:")
summary(model, input_size=(1, 28, 28))  # 输入形状

# 计算FLOPs(需要thop库)
try:
    from thop import profile
    input_tensor = torch.randn(1, 1, 28, 28)
    flops, params = profile(model, inputs=(input_tensor,))
    print(f"FLOPs: {flops:,}, 参数数量: {params:,}")
except:
    print("安装thop: pip install thop")

5.7 小练习

class CIFAR10CNN(nn.Module):
    def __init__(self):
        super().__init__()
        # 任务:设计一个适合32x32彩色图像的CNN
        # 提示:CIFAR-10图像是3通道、32x32大小,有10个类别
        # 尝试包含:卷积层、池化层、批标准化层、全连接层
        
    def forward(self, x):
        # 实现前向传播
        pass

# 测试你的设计
model = CIFAR10CNN()
test_input = torch.randn(4, 3, 32, 32)  # batch_size=4, 3通道, 32x32
output = model(test_input)
print(f"输出形状: {output.shape}")  # 应为(4, 10)


class CIFAR10CNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3,32,3,1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(2,2),
            nn.Conv2d(32,64,3,1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2,2),
            nn.Conv2d(64,128,3,1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(2,2)
        )

        self.classifier = nn.Sequential(
            nn.Linear(128*2*2,256),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(256, 10)
        )

    def forward(self, x):
        x = self.features(x)
        x = x.view(x.size(0), -1)
        return self.classifier(x)

model = CIFAR10CNN()
test_input = torch.randn(4, 3, 32, 32)
output = model(test_input)
print(f"输出形状: {output.shape}")

6.Dataset & DataLoader

手喂数据的坏处:

  • 数据量大时无法全部放内存

  • 每次都整批训练 → 不稳定

  • 没有打乱数据 → 容易过拟合

  • 不能动态扩展数据源(图片/文本/数据库)

import torch
from torch.utils.data import Dataset, DataLoader

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# 自定义数据集
class LinearDataset(Dataset):
    def __init__(self):
        # 100个样本
        self.x = torch.linspace(0, 10, 100).unsqueeze(1).to(device)
        self.y = 2 * self.x + 1

    def __len__(self):
        return len(self.x)

    def __getitem__(self, idx):
        return self.x[idx], self.y[idx]

# 实例化数据集 & 数据加载器
dataset = LinearDataset()
loader = DataLoader(dataset, batch_size=10, shuffle=True)

# 简单模型
model = torch.nn.Linear(1, 1).to(device)
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
criterion = torch.nn.MSELoss().to(device)

# 训练
for epoch in range(5):
    for x, y in loader:
        batch_x = x.to(device)
        batch_y = y.to(device)
        pred = model(batch_x)
        loss = criterion(pred, batch_y)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    print(f"Epoch {epoch}: loss={loss.item():.4f}")

shuffle=True

每轮打乱数据,防止模型记顺序

batch_size=10

mini-batch 训练,提高效率、泛化性(训练的步数 = 总量/当前批次数量)

__getitem__

返回一条数据 (特征, 标签)

使用batch(mini-batch)的好处

Mini-Batch 可以在内存可控范围内,使用 GPU 高效并行,并让梯度既有稳定性又保留随机性(提高收敛质量)

训练完成后取预测结果

x_test = dataset.x[:5].to(device)
y_test = dataset.y[:5].to(device)
pred = model(x_test)

print("True:", y_test.view(-1).tolist())
print("Pred:", pred.view(-1).tolist())

6.1 基础概念:Dataset 和 DataLoader

6.1.1 核心组件关系

原始数据 → Dataset → DataLoader → 训练循环
     ↓           ↓          ↓
     预处理      单样本访问   批量加载
                索引访问     shuffle
                          多线程

6.1.2 创建自定义Dataset

import torch
from torch.utils.data import Dataset, DataLoader
import numpy as np
from PIL import Image
import os

# 基础Dataset示例
class CustomDataset(Dataset):
    def __init__(self, data, labels, transform=None):
        """
        Args:
            data: 特征数据
            labels: 标签数据
            transform: 数据预处理/增强函数
        """
        self.data = data
        self.labels = labels
        self.transform = transform
        
    def __len__(self):
        """返回数据集大小"""
        return len(self.data)
    
    def __getitem__(self, idx):
        """根据索引返回一个样本"""
        sample = self.data[idx]
        label = self.labels[idx]
        
        if self.transform:
            sample = self.transform(sample)
            
        return sample, label

# 使用示例
X = torch.randn(100, 3, 32, 32)  # 100个RGB图像,32x32
y = torch.randint(0, 10, (100,))  # 10个类别

dataset = CustomDataset(X, y)
print(f"数据集大小: {len(dataset)}")
print(f"第一个样本: {dataset[0][0].shape}, 标签: {dataset[0][1]}")

6.2 实际应用:图像数据集

6.2.1 图像文件Dataset

class ImageFolderDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        """
        Args:
            root_dir: 数据根目录,结构应为:
                root_dir/
                ├── class1/
                │   ├── img1.jpg
                │   └── img2.jpg
                └── class2/
                    ├── img1.jpg
                    └── img2.jpg
            transform: 图像预处理
        """
        self.root_dir = root_dir
        self.transform = transform
        self.classes = sorted(os.listdir(root_dir))
        self.class_to_idx = {cls: i for i, cls in enumerate(self.classes)}
        
        # 收集所有图像路径和标签
        self.image_paths = []
        self.labels = []
        
        for class_name in self.classes:
            class_dir = os.path.join(root_dir, class_name)
            if os.path.isdir(class_dir):
                for img_name in os.listdir(class_dir):
                    img_path = os.path.join(class_dir, img_name)
                    if img_path.lower().endswith(('.png', '.jpg', '.jpeg')):
                        self.image_paths.append(img_path)
                        self.labels.append(self.class_to_idx[class_name])
    
    def __len__(self):
        return len(self.image_paths)
    
    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        label = self.labels[idx]
        
        # 加载图像
        image = Image.open(img_path).convert('RGB')
        
        if self.transform:
            image = self.transform(image)
            
        return image, label

# 使用示例
# dataset = ImageFolderDataset('data/train', transform=...)

6.2.2 使用torchvision.datasets

import torchvision
import torchvision.transforms as transforms

# CIFAR-10数据集
transform_train = transforms.Compose([
    transforms.RandomCrop(32, padding=4),      # 随机裁剪
    transforms.RandomHorizontalFlip(),         # 随机水平翻转
    transforms.ToTensor(),                     # 转换为Tensor (0-1)
    transforms.Normalize((0.4914, 0.4822, 0.4465),  # 标准化 (均值, 标准差)
                         (0.2023, 0.1994, 0.2010)),
])

transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465),
                         (0.2023, 0.1994, 0.2010)),
])

# 下载并加载数据集
train_dataset = torchvision.datasets.CIFAR10(
    root='./data',
    train=True,
    download=True,
    transform=transform_train
)

test_dataset = torchvision.datasets.CIFAR10(
    root='./data',
    train=False,
    download=True,
    transform=transform_test
)

print(f"训练集大小: {len(train_dataset)}")
print(f"测试集大小: {len(test_dataset)}")
print(f"类别: {train_dataset.classes}")

6.3 DataLoader:批量加载数据

6.3.1 基础DataLoader使用

from torch.utils.data import DataLoader

# 创建DataLoader
train_loader = DataLoader(
    dataset=train_dataset,
    batch_size=64,          # 批量大小
    shuffle=True,           # 每个epoch打乱数据
    num_workers=4,          # 多进程加载数据
    pin_memory=True,        # 如果使用GPU,加速数据传输
    drop_last=True          # 丢弃最后不完整的批次
)

test_loader = DataLoader(
    dataset=test_dataset,
    batch_size=64,
    shuffle=False,          # 测试集不需要打乱
    num_workers=2,
    pin_memory=True
)

# 查看一个批次的数据
for batch_idx, (images, labels) in enumerate(train_loader):
    print(f"批次 {batch_idx}:")
    print(f"  图像形状: {images.shape}")  # [64, 3, 32, 32]
    print(f"  标签形状: {labels.shape}")  # [64]
    print(f"  图像范围: [{images.min():.3f}, {images.max():.3f}]")
    
    # 只查看第一个批次
    if batch_idx == 0:
        # 显示一个样本
        print(f"\n第一个样本:")
        print(f"  标签: {labels[0]} -> {train_dataset.classes[labels[0]]}")
        break

6.3.2 高级DataLoader功能

# 1. 使用sampler自定义采样策略
from torch.utils.data.sampler import WeightedRandomSampler

# 创建加权采样器(处理类别不平衡)
class_counts = [5000] * 10  # 假设CIFAR-10每类5000样本
weights = 1. / torch.tensor(class_counts, dtype=torch.float)
samples_weights = weights[list(range(len(train_dataset)))]  # 简化示例

sampler = WeightedRandomSampler(
    weights=samples_weights,
    num_samples=len(train_dataset),  # 采样数量
    replacement=True  # 有放回采样
)

weighted_loader = DataLoader(
    train_dataset,
    batch_size=64,
    sampler=sampler,  # 使用sampler时不能指定shuffle=True
    num_workers=4
)

# 2. 使用collate_fn自定义批次组织方式
def custom_collate_fn(batch):
    """自定义批次处理函数"""
    # batch是[(image1, label1), (image2, label2), ...]的列表
    
    # 分离图像和标签
    images = [item[0] for item in batch]
    labels = [item[1] for item in batch]
    
    # 处理不同大小的图像
    # 假设images大小不同,需要padding
    max_height = max([img.shape[1] for img in images])
    max_width = max([img.shape[2] for img in images])
    
    padded_images = []
    for img in images:
        # 对每个图像进行padding
        pad_height = max_height - img.shape[1]
        pad_width = max_width - img.shape[2]
        padded = torch.nn.functional.pad(
            img, 
            (0, pad_width, 0, pad_height),  # 左,右,上,下填充
            mode='constant', 
            value=0
        )
        padded_images.append(padded)
    
    # 堆叠成批次
    images_batch = torch.stack(padded_images)
    labels_batch = torch.tensor(labels)
    
    return images_batch, labels_batch

custom_loader = DataLoader(
    train_dataset,
    batch_size=32,
    collate_fn=custom_collate_fn,
    shuffle=True
)

6.4 数据预处理与增强

6.4.1 torchvision.transforms 详解

from torchvision import transforms
from PIL import Image
import matplotlib.pyplot as plt
import numpy as np

# 创建示例图像
example_image = Image.new('RGB', (256, 256), color='red')
for i in range(50, 200):
    for j in range(50, 200):
        example_image.putpixel((i, j), (0, 255, 0))

# 定义各种变换
transforms_list = {
    "原始图像": transforms.ToTensor(),
    "调整大小": transforms.Compose([
        transforms.Resize((128, 128)),
        transforms.ToTensor()
    ]),
    "随机裁剪": transforms.Compose([
        transforms.RandomCrop(200),
        transforms.ToTensor()
    ]),
    "中心裁剪": transforms.Compose([
        transforms.CenterCrop(100),
        transforms.ToTensor()
    ]),
    "随机翻转": transforms.Compose([
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.RandomVerticalFlip(p=0.5),
        transforms.ToTensor()
    ]),
    "颜色调整": transforms.Compose([
        transforms.ColorJitter(
            brightness=0.5,    # 亮度
            contrast=0.5,      # 对比度
            saturation=0.5,    # 饱和度
            hue=0.1           # 色调
        ),
        transforms.ToTensor()
    ]),
    "随机旋转": transforms.Compose([
        transforms.RandomRotation(45),  # 旋转 ±45度
        transforms.ToTensor()
    ]),
    "随机透视": transforms.Compose([
        transforms.RandomPerspective(distortion_scale=0.5, p=0.5),
        transforms.ToTensor()
    ]),
    "灰度化": transforms.Compose([
        transforms.Grayscale(num_output_channels=3),  # 保持3通道
        transforms.ToTensor()
    ]),
    "高斯模糊": transforms.Compose([
        transforms.GaussianBlur(kernel_size=5, sigma=(0.1, 2.0)),
        transforms.ToTensor()
    ]),
    "标准化": transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
    ])
}

# 应用变换并显示
fig, axes = plt.subplots(3, 4, figsize=(15, 10))
axes = axes.ravel()

for idx, (name, transform) in enumerate(list(transforms_list.items())[:12]):
    transformed = transform(example_image)
    
    # 转换回图像显示
    img_display = transformed.permute(1, 2, 0).numpy()  # (H, W, C)
    
    # 如果是标准化后的图像,反标准化
    if '标准化' in name:
        img_display = img_display * 0.5 + 0.5
    
    axes[idx].imshow(np.clip(img_display, 0, 1))
    axes[idx].set_title(name)
    axes[idx].axis('off')

plt.tight_layout()
plt.show()

6.4.2 自定义变换

class AddNoise(object):
    """添加高斯噪声"""
    def __init__(self, mean=0., std=0.1):
        self.mean = mean
        self.std = std
        
    def __call__(self, tensor):
        noise = torch.randn_like(tensor) * self.std + self.mean
        return tensor + noise
    
    def __repr__(self):
        return f"{self.__class__.__name__}(mean={self.mean}, std={self.std})"

class RandomErasing(object):
    """随机擦除(Cutout)"""
    def __init__(self, probability=0.5, sl=0.02, sh=0.4, r1=0.3):
        self.probability = probability
        self.sl = sl
        self.sh = sh
        self.r1 = r1
        
    def __call__(self, img):
        if torch.rand(1) > self.probability:
            return img
            
        img_h, img_w = img.shape[1], img.shape[2]
        area = img_h * img_w
        
        target_area = torch.empty(1).uniform_(self.sl, self.sh).item() * area
        aspect_ratio = torch.empty(1).uniform_(self.r1, 1/self.r1).item()
        
        h = int(round((target_area * aspect_ratio) ** 0.5))
        w = int(round((target_area / aspect_ratio) ** 0.5))
        
        if h < img_h and w < img_w:
            top = torch.randint(0, img_h - h, (1,)).item()
            left = torch.randint(0, img_w - w, (1,)).item()
            
            img[:, top:top+h, left:left+w] = torch.rand(3, h, w)
            
        return img

# 组合使用自定义变换
custom_transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    AddNoise(std=0.05),
    RandomErasing(probability=0.3),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

6.4.3 针对不同任务的数据增强策略

# 图像分类增强
classification_transform = transforms.Compose([
    transforms.RandomResizedCrop(224),      # 随机缩放裁剪
    transforms.RandomHorizontalFlip(p=0.5), # 水平翻转
    transforms.ColorJitter(
        brightness=0.2, contrast=0.2,
        saturation=0.2, hue=0.1
    ),
    transforms.RandomRotation(10),          # 轻微旋转
    transforms.RandomAffine(
        degrees=0, translate=(0.1, 0.1),   # 平移
        scale=(0.9, 1.1)                   # 缩放
    ),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225]
    ),
])

# 目标检测增强(需要同时变换图像和边界框)
class DetectionTransform:
    def __init__(self):
        self.image_transform = transforms.Compose([
            transforms.ColorJitter(brightness=0.2, contrast=0.2),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ])
    
    def __call__(self, image, boxes):
        # 随机水平翻转
        if torch.rand(1) > 0.5:
            image = transforms.functional.hflip(image)
            width = image.width
            boxes = boxes.clone()
            boxes[:, [0, 2]] = width - boxes[:, [2, 0]]  # 翻转x坐标
        
        # 应用图像变换
        image = self.image_transform(image)
        
        return image, boxes

# 分割任务增强
class SegmentationTransform:
    def __init__(self):
        self.base_transform = transforms.Compose([
            transforms.RandomHorizontalFlip(p=0.5),
            transforms.RandomRotation(10),
        ])
    
    def __call__(self, image, mask):
        # 同时应用相同的空间变换到图像和掩码
        seed = torch.randint(0, 100000, (1,)).item()
        
        torch.manual_seed(seed)
        image = self.base_transform(image)
        
        torch.manual_seed(seed)  # 使用相同的随机种子
        mask = self.base_transform(mask)
        
        return image, mask

6.5 数据集分割

6.5.1 使用torch.utils.data.random_split

from torch.utils.data import random_split

# 假设有一个完整的数据集
full_dataset = torchvision.datasets.CIFAR10(
    root='./data',
    train=True,
    download=True,
    transform=transform_train
)

# 定义分割比例
train_size = int(0.8 * len(full_dataset))
val_size = len(full_dataset) - train_size

# 随机分割
train_dataset, val_dataset = random_split(
    full_dataset,
    [train_size, val_size],
    generator=torch.Generator().manual_seed(42)  # 设置随机种子确保可重复性
)

print(f"训练集大小: {len(train_dataset)}")
print(f"验证集大小: {len(val_dataset)}")

# 创建对应的DataLoader
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)

6.5.2 分层分割(保持类别比例)

from sklearn.model_selection import StratifiedShuffleSplit
import numpy as np

# 获取所有标签
labels = [label for _, label in full_dataset]

# 使用StratifiedShuffleSplit保持类别比例
sss = StratifiedShuffleSplit(n_splits=1, test_size=0.2, random_state=42)

for train_idx, val_idx in sss.split(np.zeros(len(labels)), labels):
    # 创建训练集和验证集
    train_subset = torch.utils.data.Subset(full_dataset, train_idx)
    val_subset = torch.utils.data.Subset(full_dataset, val_idx)

print(f"训练集类别分布:")
train_labels = [labels[i] for i in train_idx]
print(f"  {np.bincount(train_labels)}")

print(f"验证集类别分布:")
val_labels = [labels[i] for i in val_idx]
print(f"  {np.bincount(val_labels)}")

6.6 实践项目:完整的图像分类流水线

6.6.1 数据流水线构建

import torch
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, random_split
import matplotlib.pyplot as plt
import numpy as np

def build_data_pipeline(dataset_name='CIFAR10', batch_size=64, num_workers=4):
    """构建完整的数据流水线"""
    
    # 数据增强和预处理
    train_transform = transforms.Compose([
        transforms.RandomCrop(32, padding=4),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(15),
        transforms.ColorJitter(brightness=0.2, contrast=0.2),
        transforms.ToTensor(),
        transforms.Normalize((0.4914, 0.4822, 0.4465), 
                           (0.247, 0.243, 0.261))
    ])
    
    test_transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.4914, 0.4822, 0.4465),
                           (0.247, 0.243, 0.261))
    ])
    
    # 加载数据集
    if dataset_name == 'CIFAR10':
        trainset = torchvision.datasets.CIFAR10(
            root='./data', train=True, 
            download=True, transform=train_transform)
        
        testset = torchvision.datasets.CIFAR10(
            root='./data', train=False,
            download=True, transform=test_transform)
        
        classes = ('plane', 'car', 'bird', 'cat', 'deer',
                  'dog', 'frog', 'horse', 'ship', 'truck')
    
    elif dataset_name == 'MNIST':
        trainset = torchvision.datasets.MNIST(
            root='./data', train=True,
            download=True, transform=transforms.ToTensor())
        
        testset = torchvision.datasets.MNIST(
            root='./data', train=False,
            download=True, transform=transforms.ToTensor())
        
        classes = tuple(str(i) for i in range(10))
    
    else:
        raise ValueError(f"不支持的dataset: {dataset_name}")
    
    # 分割训练集和验证集
    train_size = int(0.8 * len(trainset))
    val_size = len(trainset) - train_size
    trainset, valset = random_split(trainset, [train_size, val_size])
    
    # 创建DataLoader
    trainloader = DataLoader(
        trainset, batch_size=batch_size,
        shuffle=True, num_workers=num_workers,
        pin_memory=True, drop_last=True
    )
    
    valloader = DataLoader(
        valset, batch_size=batch_size,
        shuffle=False, num_workers=num_workers,
        pin_memory=True
    )
    
    testloader = DataLoader(
        testset, batch_size=batch_size,
        shuffle=False, num_workers=num_workers,
        pin_memory=True
    )
    
    return trainloader, valloader, testloader, classes

# 使用数据流水线
trainloader, valloader, testloader, classes = build_data_pipeline()

# 显示一些样本
def imshow(img, mean=None, std=None):
    """显示图像"""
    if mean and std:
        # 反标准化
        img = img * std + mean
    img = img.clamp(0, 1)
    npimg = img.numpy()
    plt.imshow(np.transpose(npimg, (1, 2, 0)))
    plt.axis('off')

# 获取一个批次
dataiter = iter(trainloader)
images, labels = next(dataiter)

# 显示
fig, axes = plt.subplots(4, 8, figsize=(12, 6))
for i in range(32):
    ax = axes[i//8, i%8]
    imshow(images[i], mean=(0.4914, 0.4822, 0.4465), 
           std=(0.247, 0.243, 0.261))
    ax.set_title(classes[labels[i]])
plt.tight_layout()
plt.show()

6.6.2 批处理可视化

def show_batch_statistics(dataloader, num_batches=5):
    """显示批次统计信息"""
    print(f"检查 {num_batches} 个批次...")
    
    for batch_idx, (images, labels) in enumerate(dataloader):
        if batch_idx >= num_batches:
            break
            
        print(f"\n批次 {batch_idx}:")
        print(f"  图像形状: {images.shape}")
        print(f"  标签形状: {labels.shape}")
        print(f"  图像值范围: [{images.min():.3f}, {images.max():.3f}]")
        print(f"  图像均值: {images.mean():.3f}, 标准差: {images.std():.3f}")
        print(f"  标签分布: {torch.bincount(labels).tolist()}")
        
        # 计算每个通道的统计
        if images.shape[1] == 3:  # RGB图像
            for ch in range(3):
                channel_data = images[:, ch, :, :]
                print(f"  通道{ch} - 均值: {channel_data.mean():.3f}, "
                      f"标准差: {channel_data.std():.3f}")

# 检查训练数据
show_batch_statistics(trainloader)

6.6.3 高级数据流水线:多任务学习

class MultiTaskDataset(Dataset):
    """支持多任务学习的数据集"""
    def __init__(self, images, labels_dict, transform=None):
        """
        Args:
            images: 图像数据
            labels_dict: 字典,键为任务名,值为对应的标签
            transform: 图像变换
        """
        self.images = images
        self.labels_dict = labels_dict
        self.transform = transform
        
    def __len__(self):
        return len(self.images)
    
    def __getitem__(self, idx):
        image = self.images[idx]
        
        if self.transform:
            image = self.transform(image)
        
        # 返回图像和所有任务的标签
        labels = {task: labels[idx] for task, labels in self.labels_dict.items()}
        
        return image, labels

# 使用示例
images = torch.randn(1000, 3, 32, 32)
labels_dict = {
    'classification': torch.randint(0, 10, (1000,)),
    'regression': torch.randn(1000, 1),
    'segmentation': torch.randn(1000, 32, 32)
}

multi_dataset = MultiTaskDataset(images, labels_dict)
image, labels = multi_dataset[0]

print(f"图像形状: {image.shape}")
print(f"分类标签: {labels['classification']}")
print(f"回归标签: {labels['regression'].shape}")
print(f"分割标签: {labels['segmentation'].shape}")

6.7 性能优化技巧

6.7.1 数据加载优化

def optimize_dataloader(dataset, batch_size=64):
    """优化DataLoader配置"""
    import psutil
    
    # 自动设置num_workers
    num_workers = min(8, psutil.cpu_count(logical=False))  # 物理核心数
    
    # 根据GPU内存自动调整batch_size
    if torch.cuda.is_available():
        gpu_memory = torch.cuda.get_device_properties(0).total_memory
        # 简单启发式:GPU内存的20%用于批次数据
        max_batch_memory = gpu_memory * 0.2
        sample_memory = dataset[0][0].element_size() * dataset[0][0].nelement()
        max_batch_size = int(max_batch_memory / sample_memory)
        batch_size = min(batch_size, max_batch_size)
    
    print(f"优化配置:")
    print(f"  num_workers: {num_workers}")
    print(f"  batch_size: {batch_size}")
    
    loader = DataLoader(
        dataset,
        batch_size=batch_size,
        shuffle=True,
        num_workers=num_workers,
        pin_memory=torch.cuda.is_available(),
        prefetch_factor=2 if num_workers > 0 else None,
        persistent_workers=True if num_workers > 0 else False
    )
    
    return loader

6.7.2 数据缓存

class CachedDataset(Dataset):
    """缓存数据集,加速数据加载"""
    def __init__(self, dataset, cache_size=1000):
        self.dataset = dataset
        self.cache = {}
        self.cache_size = cache_size
        
    def __len__(self):
        return len(self.dataset)
    
    def __getitem__(self, idx):
        if idx in self.cache:
            return self.cache[idx]
        
        sample = self.dataset[idx]
        
        # 如果缓存未满,添加新样本
        if len(self.cache) < self.cache_size:
            self.cache[idx] = sample
        
        return sample

7.模型保存和加载

7.1 保存模型

# 训练完成后
# 保存位置
save_path = "net_weights.pth"
torch.save({
    # 权重和偏置以及 缓冲张量
    'model_state_dict': model.state_dict(),
    # 模型的类名或结构定义
    'model_architecture': model.__class__.__name__
}, save_path)

7.2 测试模型

model.eval()
with torch.no_grad():
    test_x = torch.tensor([[1.0], [3.0], [5.0], [7.0], [10.0]])
    test_x = test_x.to(device)
    predictions = model(test_x)

    print("\n测试结果:")
    print("-" * 40)
    for i in range(len(test_x)):
        x_val = test_x[i].cpu().item()
        y_pred = predictions[i].cpu().item()
        y_true = 3 * x_val + 2
        error = abs(y_pred - y_true)
        print(f"输入 x={x_val:4.1f}: 预测={y_pred:7.3f}, 真实={y_true:7.3f}, 误差={error:7.3f}")
    print("-" * 40)

7.3 加载模型

# 加载模型
    checkpoint = torch.load(save_path, map_location=device)

    # 创建新模型实例
    new_model = Net()

    # 加载权重
    new_model.load_state_dict(checkpoint['model_state_dict'])
    new_model.to(device)
    new_model.eval()

    print("模型加载成功!")

7.4 完整步骤

import torch
from torch import nn, optim
from torch.utils.data import DataLoader, Dataset
import matplotlib.pyplot as plt
import os
os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'

# 设置设备
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"使用设备: {device}")


# 定义模型
class Net(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(1, 10)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(10, 1)

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        return x


class MyDataset(Dataset):
    def __init__(self, device='cpu'):
        # 数据保持在CPU上,训练时再移动到设备
        self.x = torch.linspace(1, 10, 100).unsqueeze(1)
        self.y = self.x * 3 + 2
        self.device = device

    def __len__(self):
        return len(self.x)

    def __getitem__(self, index):
        return self.x[index], self.y[index]


# 创建数据集(保持在CPU)
dataset = MyDataset()

# 创建数据加载器
loader = DataLoader(dataset, batch_size=20, shuffle=True)

# 创建模型并移动到设备
model = Net().to(device)
model.train()

# 损失函数和优化器
criterion = nn.MSELoss()
opt = optim.Adam(model.parameters(), lr=0.01)

# 训练模型
losses = []
print("开始训练...")

for epoch in range(100):
    epoch_loss = 0
    batch_count = 0

    for x, y in loader:
        # 将数据移动到设备
        x = x.to(device)
        y = y.to(device)

        # 前向传播
        pred = model(x)
        loss = criterion(pred, y)

        # 反向传播
        opt.zero_grad()
        loss.backward()
        opt.step()

        epoch_loss += loss.item()
        batch_count += 1

    # 计算平均损失
    avg_loss = epoch_loss / batch_count
    losses.append(avg_loss)

    # 每10个epoch打印一次
    if epoch % 10 == 0:
        print(f"Epoch {epoch:3d}: 平均损失 = {avg_loss:.6f}")

print("训练完成!")

plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'DejaVu Sans']  # 设置支持中文的字体
plt.rcParams['axes.unicode_minus'] = False  # 正确显示负号
# 可视化损失曲线
plt.figure(figsize=(10, 4))
plt.plot(losses)
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('训练损失曲线')
plt.grid(True)
plt.show()

# 保存模型
save_path = "net_weights.pth"
torch.save({
    # 权重和偏置以及 缓冲张量
    'model_state_dict': model.state_dict(),
    # 模型的类名或结构定义
    'model_architecture': model.__class__.__name__
}, save_path)
print(f"模型保存到: {save_path}")

# 测试训练好的模型
model.eval()
with torch.no_grad():
    test_x = torch.tensor([[1.0], [3.0], [5.0], [7.0], [10.0]])
    test_x = test_x.to(device)
    predictions = model(test_x)

    print("\n测试结果:")
    print("-" * 40)
    for i in range(len(test_x)):
        x_val = test_x[i].cpu().item()
        y_pred = predictions[i].cpu().item()
        y_true = 3 * x_val + 2
        error = abs(y_pred - y_true)
        print(f"输入 x={x_val:4.1f}: 预测={y_pred:7.3f}, 真实={y_true:7.3f}, 误差={error:7.3f}")
    print("-" * 40)

# 加载模型并推理
print("\n加载模型并推理...")
try:
    # 加载模型
    checkpoint = torch.load(save_path, map_location=device)

    # 创建新模型实例
    new_model = Net()

    # 加载权重
    new_model.load_state_dict(checkpoint['model_state_dict'])
    new_model.to(device)
    new_model.eval()

    print("模型加载成功!")

    # 测试推理
    with torch.no_grad():
        # 单个样本推理
        x_single = torch.tensor([[3.0]]).to(device)
        pred_single = new_model(x_single)
        print(f"\n单个样本推理:")
        print(f"输入: {x_single.cpu().item():.1f}")
        print(f"预测: {pred_single.cpu().item():.3f}")
        print(f"真实: {3 * 3 + 2:.3f}")

        # 批量推理
        x_batch = torch.tensor([[1.0], [2.0], [3.0], [4.0], [5.0]]).to(device)
        pred_batch = new_model(x_batch)

        print(f"\n批量推理:")
        for i in range(len(x_batch)):
            x_val = x_batch[i].cpu().item()
            y_pred = pred_batch[i].cpu().item()
            print(f"  x={x_val:.1f}: 预测={y_pred:.3f}, 真实={3 * x_val + 2:.3f}")

except Exception as e:
    print(f"加载模型时出错: {e}")
    print("尝试使用兼容方式加载...")

    # 尝试直接加载权重
    new_model = Net()
    new_model.load_state_dict(torch.load(save_path, map_location='cpu'))
    new_model.eval()

    # CPU推理
    x = torch.tensor([[3.0]])
    with torch.no_grad():
        print(f"输入: {x.item():.1f}")
        print(f"预测: {new_model(x).item():.3f}")
        print(f"真实: {3 * 3 + 2:.3f}")

# 查看模型参数
print("\n模型参数:")
print(f"第一层权重形状: {model.fc1.weight.shape}")
print(f"第一层偏置形状: {model.fc1.bias.shape}")
print(f"第二层权重形状: {model.fc2.weight.shape}")
print(f"第二层偏置形状: {model.fc2.bias.shape}")

# 导出为ONNX格式(可选)
print("\n导出为ONNX格式...")
try:
    dummy_input = torch.randn(1, 1).to(device)
    torch.onnx.export(
        model,
        dummy_input,
        "model.onnx",
        input_names=['input'],
        output_names=['output'],
        dynamic_axes={'input': {0: 'batch_size'}, 'output': {0: 'batch_size'}}
    )
    print("ONNX模型导出成功!")
except Exception as e:
    print(f"ONNX导出失败: {e}")

8.训练过程可视化(TensorBoard)

8.1 安装

pip install tensorboard

8.2 主要命令

初始化记录器

writer = SummaryWriter("./logs")

记录标量

writer.add_scalar("loss", loss, step)

记录模型结构

writer.add_graph(model, input)

结束记录

writer.close()

打开 TensorBoard

tensorboard --logdir=logs

8.3 速查

8.3.1.TensorBoard记录方法速查

记录内容

方法

频率建议

用途

损失值

add_scalar("Loss/Train", loss, step)

每个batch或epoch

监控训练进度

准确率

add_scalar("Accuracy/Train", acc, step)

每个epoch

分类任务评估

学习率

add_scalar("Learning_Rate", lr, step)

每个epoch

调度器监控

参数分布

add_histogram("Weights/fc1", param, epoch)

每10-20epoch

参数健康检查

梯度分布

add_histogram("Gradients/fc1", grad, step)

每50-100step

梯度问题诊断

模型图

add_graph(model, dummy_input)

训练前一次

模型结构验证

预测图像

add_figure("Predictions", fig, step)

训练结束

结果可视化

超参数

add_hparams(hparams, metrics)

训练结束

实验记录

文本信息

add_text("Config", config_str)

训练开始

实验说明

8.3.2.常见问题诊断

现象

可能原因

解决方案

损失不下降

学习率太小、模型太简单、数据有问题

增大学习率、增加模型复杂度、检查数据

损失震荡

学习率太大、批次太小

减小学习率、增大批次大小

验证损失上升

过拟合

增加正则化、早停、数据增强

梯度为0

梯度消失、激活函数问题

使用BatchNorm、换激活函数、调整初始化

权重全0

初始化问题、激活函数问题

调整权重初始化、检查激活函数

训练很慢

模型太大、学习率太小

减少模型复杂度、适当增大学习率

8.4 如何使用

# 避免matplotlib.pyplot 报错
import os
os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'

# 创建日志目录(如果不存在)
log_dir = "./logs"
os.makedirs(log_dir, exist_ok=True)

# 初始化TensorBoard writer
writer = SummaryWriter(log_dir=log_dir)

# 定义模型
# 定义数据集
......
......
......

# 使用虚拟输入记录模型图
dummy_input = torch.randn(1, 1).to(device)  # 标准输入形状
writer.add_graph(model, dummy_input)
print("模型图已记录到TensorBoard")
# 全局步数计数器
global_step = 0

print("开始训练...")
......
......
        opt.step()

        writer.add_scalar("Loss/train", loss.item(), global_step)

        # 记录梯度信息(每10个batch记录一次)
        if batch_idx % 10 == 0:
            for name, param in model.named_parameters():
                if param.grad is not None:
                    writer.add_histogram(f"Gradients/{name}", param.grad, global_step)


        epoch_loss += loss.item()
        batch_count += 1
        global_step += 1

    # 计算平均损失
    avg_loss = epoch_loss / batch_count

    # 记录模型参数(每10个epoch记录一次)
    if epoch % 10 == 0:
        for name, param in model.named_parameters():
            writer.add_histogram(f"Parameters/{name}", param.data, epoch)

    # 每10个epoch打印一次
    if epoch % 10 == 0:
        print(f"Epoch {epoch:3d}: 平均损失 = {avg_loss:.6f}")
        # 在TensorBoard中添加文本信息
        writer.add_text("Training Info",
                        f"Epoch {epoch}, Loss: {avg_loss:.6f}",epoch)

print("训练完成!")

print("开始测试")
......
......
    # 计算并记录最终损失
    final_loss = criterion(test_y_pred, test_y_true)
    writer.add_scalar("Loss/Final", final_loss.item(), 0)
    # 记录预测值与真实值的对比(作为图像)
    import matplotlib.pyplot as plt

    plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'DejaVu Sans']  # 设置支持中文的字体
    plt.rcParams['axes.unicode_minus'] = False  # 正确显示负号

    fig, ax = plt.subplots(figsize=(10, 6))

    # 原始训练数据
    train_x = dataset.x.numpy()
    train_y = dataset.y.numpy()
    ax.scatter(train_x, train_y, alpha=0.3, label='训练数据', color='blue')

    # 预测曲线
    test_x_np = test_x.cpu().numpy()
    test_y_pred_np = test_y_pred.cpu().numpy()
    test_y_true_np = test_y_true.cpu().numpy()

    ax.plot(test_x_np, test_y_true_np, 'r-', label='真实函数 (y=3x+2)', linewidth=2)
    ax.plot(test_x_np, test_y_pred_np, 'g--', label='模型预测', linewidth=2)

    ax.set_xlabel('x')
    ax.set_ylabel('y')
    ax.set_title('模型预测 vs 真实函数')
    ax.legend()
    ax.grid(True)

    # 将图像添加到TensorBoard
    writer.add_figure("Predictions/Final", fig, global_step)
    plt.close(fig)

# ----------------- 记录模型参数总结 -----------------
# 记录模型参数数量
total_params = sum(p.numel() for p in model.parameters())
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
writer.add_text("Model Info",
                f"总参数: {total_params}\n可训练参数: {trainable_params}",
                0)

# ----------------- 记录超参数和最终指标 -----------------
hparams = {
    "learning_rate": 0.001,
    "batch_size": 20,
    "epochs": 100,
    "optimizer": "Adam",
    "loss_function": "MSE"
}

final_metrics = {
    "final_loss": final_loss.item(),
    "avg_epoch_loss": avg_loss
}

writer.add_hparams(hparams, final_metrics)

# ----------------- 关闭writer -----------------
writer.close()

完整示例:

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from torch.utils.tensorboard import SummaryWriter
import numpy as np
import os
from datetime import datetime

# ============================
# 1. 设置TensorBoard
# ============================

# 创建带时间戳的日志目录,避免覆盖
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
log_dir = f"./runs/experiment_{timestamp}"
os.makedirs(log_dir, exist_ok=True)

# 创建SummaryWriter
writer = SummaryWriter(log_dir=log_dir)
print(f"📁 TensorBoard日志目录: {log_dir}")
print(f"🔗 启动命令: tensorboard --logdir=runs")
print(f"🌐 访问地址: http://localhost:6006")

# ============================
# 2. 创建数据和模型
# ============================

# 创建模拟数据:y = 3x + 2 + 噪声
n_samples = 500
x_data = torch.rand(n_samples, 1) * 10  # 0-10的随机数
y_data = 3 * x_data + 2 + torch.randn(n_samples, 1) * 1.5  # 加噪声

# 划分训练集和验证集 (80%/20%)
split_idx = int(0.8 * n_samples)
x_train, x_val = x_data[:split_idx], x_data[split_idx:]
y_train, y_val = y_data[:split_idx], y_data[split_idx:]

# 创建数据集和数据加载器
train_dataset = TensorDataset(x_train, y_train)
val_dataset = TensorDataset(x_val, y_val)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32)

# 定义模型
class RegressionModel(nn.Module):
    def __init__(self, hidden_size=20):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(1, hidden_size),
            nn.ReLU(),
            nn.Dropout(0.1),  # 添加dropout防止过拟合
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, 1)
        )
    
    def forward(self, x):
        return self.net(x)

# 创建模型
model = RegressionModel(hidden_size=20)

# ============================
# 3. 记录模型图到TensorBoard
# ============================

# 记录模型计算图
dummy_input = torch.randn(1, 1)
writer.add_graph(model, dummy_input)
print("✅ 模型图已记录到TensorBoard")

# ============================
# 4. 记录超参数到TensorBoard
# ============================

# 定义超参数
hyper_params = {
    "learning_rate": 0.01,
    "batch_size": 32,
    "hidden_size": 20,
    "optimizer": "Adam",
    "loss_function": "MSE",
    "epochs": 200,
    "dropout_rate": 0.1
}

# 将超参数添加到TensorBoard
writer.add_text("Hyperparameters", 
               "\n".join([f"{k}: {v}" for k, v in hyper_params.items()]))

# ============================
# 5. 训练循环(重点:各种记录方式)
# ============================

# 设置训练参数
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=hyper_params["learning_rate"])
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=50, gamma=0.5)

print("🚀 开始训练...")

# 记录最佳验证损失
best_val_loss = float('inf')
global_step = 0  # 全局步数计数器

for epoch in range(hyper_params["epochs"]):
    # ==================== 训练阶段 ====================
    model.train()
    train_losses = []
    
    for batch_idx, (x_batch, y_batch) in enumerate(train_loader):
        x_batch, y_batch = x_batch.to(device), y_batch.to(device)
        
        # 前向传播
        pred = model(x_batch)
        loss = criterion(pred, y_batch)
        
        # 反向传播
        optimizer.zero_grad()
        loss.backward()
        
        # 📊 记录梯度信息(每50个batch记录一次)
        if batch_idx % 50 == 0:
            total_grad_norm = 0
            for name, param in model.named_parameters():
                if param.grad is not None:
                    # 记录每个参数的梯度分布
                    writer.add_histogram(f"Gradients/{name}", param.grad, global_step)
                    # 计算梯度范数
                    grad_norm = param.grad.norm().item()
                    total_grad_norm += grad_norm
            
            # 记录总梯度范数
            writer.add_scalar("Gradients/Total_Norm", total_grad_norm, global_step)
        
        optimizer.step()
        
        # 记录batch损失
        train_losses.append(loss.item())
        
        # 📈 记录每个batch的损失(高频率)
        writer.add_scalar("Loss/Batch_Train", loss.item(), global_step)
        
        global_step += 1
    
    # ==================== 验证阶段 ====================
    model.eval()
    val_losses = []
    
    with torch.no_grad():
        for x_batch, y_batch in val_loader:
            x_batch, y_batch = x_batch.to(device), y_batch.to(device)
            pred = model(x_batch)
            loss = criterion(pred, y_batch)
            val_losses.append(loss.item())
    
    # 计算epoch的平均损失
    avg_train_loss = np.mean(train_losses)
    avg_val_loss = np.mean(val_losses)
    
    # 📈 记录每个epoch的平均损失
    writer.add_scalar("Loss/Epoch_Train", avg_train_loss, epoch)
    writer.add_scalar("Loss/Epoch_Val", avg_val_loss, epoch)
    
    # 📊 记录模型参数分布(每20个epoch记录一次)
    if epoch % 20 == 0:
        for name, param in model.named_parameters():
            writer.add_histogram(f"Parameters/{name}", param.data, epoch)
    
    # 记录学习率
    current_lr = optimizer.param_groups[0]['lr']
    writer.add_scalar("Learning_Rate", current_lr, epoch)
    
    # 更新学习率
    scheduler.step()
    
    # 保存最佳模型
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        torch.save(model.state_dict(), f"{log_dir}/best_model.pth")
        writer.add_text("Best_Model", f"Epoch {epoch}: Val Loss = {avg_val_loss:.4f}")
    
    # 打印进度
    if epoch % 20 == 0:
        print(f"Epoch {epoch:3d}/{hyper_params['epochs']}: "
              f"Train Loss = {avg_train_loss:.4f}, "
              f"Val Loss = {avg_val_loss:.4f}, "
              f"LR = {current_lr:.5f}")
    
    # 📊 记录过拟合程度(训练损失/验证损失比率)
    overfit_ratio = avg_train_loss / avg_val_loss if avg_val_loss > 0 else 1.0
    writer.add_scalar("Metrics/Overfit_Ratio", overfit_ratio, epoch)

# ============================
# 6. 训练后分析
# ============================

print("✅ 训练完成!")

# 加载最佳模型
model.load_state_dict(torch.load(f"{log_dir}/best_model.pth"))
model.eval()

# 记录最终预测结果
with torch.no_grad():
    # 生成测试数据
    test_x = torch.linspace(0, 10, 100).unsqueeze(1).to(device)
    test_y_true = 3 * test_x + 2
    test_y_pred = model(test_x)
    
    # 计算R²分数
    ss_res = ((test_y_true - test_y_pred) ** 2).sum().item()
    ss_tot = ((test_y_true - test_y_true.mean()) ** 2).sum().item()
    r2_score = 1 - ss_res / ss_tot if ss_tot > 0 else 0
    
    # 记录R²分数
    writer.add_scalar("Metrics/R2_Score", r2_score, 0)
    
    # 📊 创建可视化预测图
    import matplotlib.pyplot as plt
    
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 6))
    
    # 左图:预测vs真实
    ax1.scatter(x_train.cpu().numpy(), y_train.cpu().numpy(), 
                alpha=0.3, label='训练数据', color='blue')
    ax1.scatter(x_val.cpu().numpy(), y_val.cpu().numpy(),
                alpha=0.5, label='验证数据', color='green')
    ax1.plot(test_x.cpu().numpy(), test_y_true.cpu().numpy(),
             'r-', label='真实函数', linewidth=3)
    ax1.plot(test_x.cpu().numpy(), test_y_pred.cpu().numpy(),
             'g--', label='模型预测', linewidth=2)
    ax1.set_xlabel('x')
    ax1.set_ylabel('y')
    ax1.set_title(f'模型预测 (R² = {r2_score:.3f})')
    ax1.legend()
    ax1.grid(True)
    
    # 右图:残差分析
    residuals = test_y_pred - test_y_true
    ax2.hist(residuals.cpu().numpy(), bins=20, alpha=0.7, color='orange')
    ax2.axvline(x=0, color='red', linestyle='--', linewidth=2)
    ax2.set_xlabel('残差 (预测 - 真实)')
    ax2.set_ylabel('频数')
    ax2.set_title('残差分布')
    ax2.grid(True)
    
    # 将图像添加到TensorBoard
    writer.add_figure("Analysis/Predictions_and_Residuals", fig, 0)
    plt.close(fig)
    
    print(f"📊 模型性能:")
    print(f"   - 训练样本数: {len(x_train)}")
    print(f"   - 验证样本数: {len(x_val)}")
    print(f"   - 最佳验证损失: {best_val_loss:.4f}")
    print(f"   - R²分数: {r2_score:.4f}")

# ============================
# 7. 记录最终结果和关闭writer
# ============================

# 记录最终超参数和指标
final_metrics = {
    "best_val_loss": best_val_loss,
    "r2_score": r2_score,
    "total_steps": global_step
}

# 添加超参数和指标的综合视图
writer.add_hparams(hyper_params, final_metrics)

# 添加总结文本
summary_text = f"""
训练总结:
- 总训练步数: {global_step}
- 最佳验证损失: {best_val_loss:.4f}
- R²分数: {r2_score:.4f}
- 训练时间: {timestamp}

模型结构:
- 输入层: 1个神经元
- 隐藏层1: 20个神经元 (ReLU激活)
- Dropout: 0.1
- 隐藏层2: 20个神经元 (ReLU激活)
- 输出层: 1个神经元
"""
writer.add_text("Summary", summary_text)

# 强制写入并关闭writer
writer.flush()
writer.close()

print("📤 TensorBoard数据已保存")
print("=" * 60)
print("🎯 如何在TensorBoard中查看:")
print("1. 打开终端,运行: tensorboard --logdir=runs")
print("2. 浏览器打开: http://localhost:6006")
print("\n📊 推荐查看顺序:")
print("1. SCALARS面板 - 看损失曲线、学习率变化")
print("2. HISTOGRAMS面板 - 看参数和梯度分布")
print("3. GRAPHS面板 - 看模型结构")
print("4. IMAGES面板 - 看预测结果图")
print("5. HPARAMS面板 - 看超参数效果")

8.5 如何解读

8.5.1 标量图(Scalars)- 看趋势

# 这些图表显示数值随时间的变化
"""
📈 常见标量图:
1. Loss (损失) - 训练的心脏指标
   - 训练损失应该稳步下降
   - 验证损失应该先降后平或微升(防过拟合)

2. Accuracy (准确率) - 分类任务的核心
   - 训练准确率稳步上升
   - 验证准确率应该同步上升

3. Learning Rate (学习率) - 训练的"油门"
   - 如果使用调度器,可以看到学习率变化

4. Gradient Norm (梯度范数) - 训练的"稳定性"
   - 太大可能梯度爆炸,太小可能梯度消失
"""

# TensorBoard中的样子:
# 横轴:训练步数(Step)或轮数(Epoch)
# 纵轴:数值(Value)
# 多条曲线:可以对比训练集/验证集
  • 理想情况:训练和验证损失同步下降,最后平稳

  • ⚠️ 过拟合:训练损失下降,验证损失上升(gap增大)

  • ⚠️ 欠拟合:训练和验证损失都很高,下降缓慢

  • ⚠️ 震荡:曲线剧烈波动(学习率可能太大)

8.5.2 直方图(Histograms)- 看分布

"""
📊 常见直方图:
1. 权重分布 - 参数的"健康状态"
   - 应该是钟形分布(正态或接近正态)
   - 不应该全是0或极端值

2. 梯度分布 - 训练的"反馈信号"
   - 应该是中心在0附近的对称分布
   - 不应该全是0(梯度消失)或极大值(梯度爆炸)

3. 激活值分布 - 神经元的"活跃程度"
   - ReLU网络应该有大量0值(稀疏性)
   - 不应该全是0(神经元死亡)
"""

# TensorBoard中的样子:
# 横轴:参数值范围
# 纵轴:数量/频率
# 随时间变化:可以看到分布如何演变
  • 健康的权重:钟形分布,均值在0附近

  • ⚠️ 权重过大:分布范围很宽,可能需权重衰减

  • ⚠️ 权重全0:模型可能没有学习

  • ⚠️ 梯度全0:可能梯度消失,考虑换激活函数

8.5.3 模型图(Graphs)- 看结构

"""
🏗️ 模型计算图:
1. 操作节点 - 计算步骤
2. 数据边 - 数据流向
3. 参数节点 - 可学习参数

# TensorBoard中的样子:
# 节点:各种计算操作(MatMul, Add, ReLU等)
# 边:数据流向
# 颜色:不同类型操作
"""
  • 检查数据流向是否正确

  • 查看参数数量和各层维度

  • 确认是否有意外的操作

8.5.4 PR曲线和ROC曲线 - 看分类性能

"""
🎯 分类任务专用:
1. PR曲线(Precision-Recall)
   - 横轴:召回率(Recall)
   - 纵轴:精确率(Precision)
   - 曲线越高越好(左上角最好)

2. ROC曲线(Receiver Operating Characteristic)
   - 横轴:假正率(False Positive Rate)
   - 纵轴:真正率(True Positive Rate)
   - 曲线越高越好(左上角最好)
   - AUC面积:0.5-1.0,越大越好
"""

8.5.5 常见模式识别

# 健康训练(理想):
# 训练损失 ↘ ↘ ↘ → → (稳定下降后平稳)
# 验证损失 ↘ ↘ ↘ → → (同步下降)
# 学习率     → → ↘ ↘ ↘ (逐渐降低)

# 过拟合:
# 训练损失 ↘ ↘ ↘ ↘ ↘ (持续下降)
# 验证损失 ↘ ↘ ↗ ↗ ↗ (先降后升)
# 差距      → → ↗ ↗ ↗ (越来越大)

# 欠拟合:
# 训练损失 → → → → → (基本不变)
# 验证损失 → → → → → (基本不变)
# 学习率可能需要增加

# 学习率太大:
# 训练损失 ↘ ↗ ↘ ↗ ↘ (剧烈震荡)
# 验证损失 ↗ ↘ ↗ ↘ ↗ (同步震荡)

9.非线性函数拟合(逻辑回归)

9.1 y = sin(x)

# 定义一个三层神经网络
class MLP(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(1, 32 ) 
        self.act1 = nn.Tanh()
        self.fc2 = nn.Linear(32, 32 )
        self.act2 = nn.Tanh()
        self.out = nn.Linear(32, 1)

    def forward(self, x):
        x = self.act1(self.fc1(x))
        x = self.act2(self.fc2(x))
        return self.out(x)
  • 激活函数:

    • Tanh 在 [-1,1] 范围更适合拟合sin

    • ReLU 也可以,但效果略差

  • 隐层神经元:10~64 都可以
    推荐:32 → 32 → 1

输入 → 32 → Tanh → 32 → Tanh → 输出

9.2 分类

类型

输入

输出

示例

损失函数

回归

数值

数值

预测房价

MSELoss

分类

数值

类别

猫/狗识别

CrossEntropyLoss

import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt

plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'DejaVu Sans']  # 设置支持中文的字体
plt.rcParams['axes.unicode_minus'] = False  # 正确显示负号

# ====== 1️⃣ 生成数据 ======
torch.manual_seed(42)
N = 200

# 生成 -1 到 1 的 200 个数
X = torch.randn(N, 1)
y = (X > 0).long().view(-1)   # 大于0类=1,小于0类=0   标签必须是 long 类型!

# 拆分训练和测试集
train_X, test_X = X[:150], X[150:]
train_y, test_y = y[:150], y[150:]


# ====== 2️⃣ 构建分类模型 ======
class Classifier(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(1, 16)   # TODO:你可以修改隐藏层神经元数
        self.act1 = nn.Tanh()         # TODO:选择 Tanh 或 ReLU
        self.out = nn.Linear(16, 2)   # 输出 2 类(0 和 1)

    def forward(self, x):
        x = self.act1(self.fc1(x))
        return self.out(x)


model = Classifier()

loss_fn = nn.CrossEntropyLoss()      # 分类任务必用交叉熵
optimizer = optim.Adam(model.parameters(), lr=0.01)


# ====== 3️⃣ 训练 ======
for epoch in range(1000):
    optimizer.zero_grad()
    output = model(train_X)

    loss = loss_fn(output, train_y)
    loss.backward()
    optimizer.step()

    if epoch % 200 == 0:
        print(f"epoch={epoch}, loss={loss.item():.4f}")


# ====== 4️⃣ 测试 ======
with torch.no_grad():
    pred = torch.argmax(model(test_X), dim=1)
    accuracy = (pred == test_y).float().mean().item()
    print(f"\n测试集准确率:{accuracy:.3f}")

# ====== 5️⃣ 可视化决策边界 ======
plt.figure(figsize=(12, 4))

# 子图1:原始数据和预测
plt.subplot(1, 2, 1)
plt.scatter(train_X.numpy(), train_y.numpy(), alpha=0.5, label='训练数据')
plt.scatter(test_X.numpy(), test_y.numpy(), alpha=0.5, label='测试数据', marker='x')

# 生成测试点并预测
x_test_range = torch.linspace(-3, 3, 300).reshape(-1, 1)
with torch.no_grad():
    pred_probs = torch.softmax(model(x_test_range), dim=1)
    pred_class = torch.argmax(pred_probs, dim=1)

# 绘制决策边界(概率变化处)
plt.plot(x_test_range.numpy(), pred_class.numpy(), 'r-', linewidth=2, label='决策边界')
plt.axvline(x=0, color='green', linestyle='--', alpha=0.5, label='真实边界 x=0')
plt.xlabel('X')
plt.ylabel('y / 预测类别')
plt.title('数据点与决策边界')
plt.legend()
plt.grid(True, alpha=0.3)

# 子图2:类别1的预测概率
plt.subplot(1, 2, 2)
plt.plot(x_test_range.numpy(), pred_probs[:, 1].numpy(), 'b-', linewidth=2, label='P(y=1|x)')
plt.axvline(x=0, color='green', linestyle='--', alpha=0.5, label='真实边界 x=0')
plt.axhline(y=0.5, color='red', linestyle='--', alpha=0.5, label='决策阈值 0.5')
plt.xlabel('X')
plt.ylabel('预测概率')
plt.title('类别1的概率曲线')
plt.legend()
plt.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# ====== 6️⃣ 检查边界精确位置 ======
# 找到概率最接近0.5的点,即决策边界
prob_diff = torch.abs(pred_probs[:, 1] - 0.5)
boundary_idx = torch.argmin(prob_diff)
boundary_x = x_test_range[boundary_idx].item()
print(f"\n模型决策边界位置: x ≈ {boundary_x:.6f}")
print(f"与真实边界 x=0 的偏差: {abs(boundary_x):.6f}")
  • 分类一般使用的损失函数为:CrossEntropyLoss(nn.CrossEntropyLoss(),它内部已包含 Softmax)

特性

Softmax

CrossEntropyLoss

作用

将 logits 转换为概率分布

衡量预测概率与真实分布的差异

输出范围

[0, 1],总和为 1

标量值(损失值)

何时使用

需要获得概率时(如展示给用户)

训练时计算损失

PyTorch 特点

torch.softmax()

nn.CrossEntropyLoss() 已包含 Softmax

  • 准确值的计算

1.pred = torch.argmax(model(test_X), dim=1)
# 假设 test_X 有 50 个样本(因为训练150个,测试50个)
# test_X 形状: (50, 1)

output = model(test_X)  
# output 形状: (50, 2) - 每个样本对应2个类别的logits

pred = torch.argmax(output, dim=1)
# pred 形状: (50,) - 每个样本的预测类别索引

# 假设 model(test_X) 返回前3个样本的输出:
# 样本1: [2.1, -0.3]  # 类别0的logit=2.1, 类别1的logit=-0.3
# 样本2: [0.5, 1.8]   # 类别0=0.5, 类别1=1.8
# 样本3: [-1.2, 0.4]  # 类别0=-1.2, 类别1=0.4

# torch.argmax(..., dim=1) 对每个样本取最大值的索引:
# 样本1: max([2.1, -0.3]) 在索引0 → 预测为0
# 样本2: max([0.5, 1.8]) 在索引1 → 预测为1  
# 样本3: max([-1.2, 0.4]) 在索引1 → 预测为1

# 最终 pred = [0, 1, 1, ...]


2.accuracy = (pred == test_y).float().mean().item()
# 假设:
# pred = [0, 1, 1, 0, 1]  (模型预测的5个样本)
# test_y = [0, 1, 0, 0, 1] (真实的5个标签)

# 第一步: (pred == test_y) - 逐元素比较
comparison = pred == test_y
# 结果: [True, True, False, True, True]

# 第二步: .float() - 布尔值转浮点数
float_comparison = comparison.float()
# 结果: [1.0, 1.0, 0.0, 1.0, 1.0]

# 第三步: .mean() - 计算平均值
accuracy_tensor = float_comparison.mean()
# 结果: (1.0 + 1.0 + 0.0 + 1.0 + 1.0) / 5 = 0.8

# 第四步: .item() - 张量转Python标量
accuracy_value = accuracy_tensor.item()
# 结果: 0.8 (Python浮点数)

9.2.1 多分类

class MLP(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(28*28, 256)
        self.act1 = nn.ReLU()
        self.fc2 = nn.Linear(256, 128)
        self.act2 = nn.ReLU()
        self.out = nn.Linear(128, 10)

    def forward(self, x):
        x = x.view(x.size(0), -1)  # 展平成 (B, 784)
        x = self.act1(self.fc1(x))
        x = self.act2(self.fc2(x))
        return self.out(x)

Tanh vs Sigmoid vs ReLU

  • Tanh (双曲正切):输出范围[-1, 1]

    • 优点:以0为中心,梯度更大(导数范围0~1)

    • 缺点:梯度消失问题(特别是深层网络)

    • 适合:RNN、中间层、输出需要负值时

  • Sigmoid:输出范围(0, 1)

    • 优点:输出可解释为概率

    • 缺点:梯度消失严重,不以0为中心

    • 适合:二分类输出层(配合BCE损失)

  • ReLU:max(0, x)

    • 优点:计算简单,缓解梯度消失

    • 缺点:"神经元死亡"问题(负值梯度为0)

    • 适合:大多数现代神经网络的隐藏层

  • Softmax:输出总和为1的概率分布

    • 只适合多分类的输出层

神经元数量与训练的关系

# 模型结构分析
self.fc1 = nn.Linear(28*28, 256)  # 第一层:784 → 256个神经元
self.fc2 = nn.Linear(256, 512)     # 第二层:256 → 512个神经元
self.out = nn.Linear(512, 10)      # 输出层:512 → 10个神经元(对应10个数字)

# 神经元数量的影响:
# 1. 太少:欠拟合(模型太简单,无法学习复杂模式)
#    - 示例:784 → 16 → 16 → 10
#    - 结果:训练和测试准确率都低
    
# 2. 太多:过拟合(模型太复杂,记忆训练数据)
#    - 示例:784 → 2048 → 2048 → 10
#    - 结果:训练准确率高,测试准确率低
    
# 3. 合适:平衡(如示例的256 → 512)
#    - 需要根据任务复杂度调整
#    - 可以通过实验(超参数调优)确定

经验法则:

  • 第一层通常比输入维度小(降维)

  • 后续层可以逐渐增加(学习更抽象特征)

  • MNIST相对简单,256-512足够

  • 更复杂任务(如CIFAR-10)可能需要1024+个神经元

为什么需要展平(flatten)

def forward(self, x):
    x = x.view(x.size(0), -1)  # 关键步骤:展平
    
    # MNIST图像形状变化:
    # 输入:x.shape = [batch_size, 1, 28, 28]
    #       批量大小, 通道数, 高度, 宽度
    
    # 展平后:x.shape = [batch_size, 784]
    #         批量大小, 28*28=784个像素
  1. 全连接层要求nn.Linear只能处理一维输入

  2. 保持批处理x.size(0)保留batch维度

  3. 自动计算-1让PyTorch自动计算展平后的维度

10.CNN卷积神经网络

10.1 数据处理

# 导入必要的库
import torch
from torch import nn, optim  # nn: 神经网络模块,optim: 优化器
from torchvision import datasets, transforms  # 计算机视觉数据集和变换
from torch.utils.data import DataLoader  # 数据加载器

# ------- 数据处理 -------
transform = transforms.ToTensor()  # 将PIL图像或numpy数组转换为PyTorch张量
# ToTensor()完成:
# 1. 将图像从[0,255]缩放到[0.0,1.0]
# 2. 将形状从(H,W)改为(C,H,W) - 添加通道维度

train_data = datasets.MNIST(
    root="./mnist",        # 数据存储路径
    train=True,            # 加载训练集(False则加载测试集)
    transform=transform,   # 应用上述变换
    download=True          # 如果本地没有数据则下载
)

test_data = datasets.MNIST(
    root="./mnist",
    train=False,           # 加载测试集
    transform=transform,
    download=True
)

# DataLoader:将数据分批加载,支持shuffle和多线程
train_loader = DataLoader(
    train_data, 
    batch_size=64,  # 每批64个样本
    shuffle=True    # 每个epoch打乱数据顺序
)

test_loader = DataLoader(
    test_data, 
    batch_size=64   # 测试时不需要shuffle
)

10.2 CNN模型定义部分

# ------- CNN模型 -------
class CNN(nn.Module):  # 继承nn.Module基类
    def __init__(self):
        super().__init__()  # 调用父类初始化
        
        # 卷积层1: 输入1通道(灰度图), 输出16通道, 3x3卷积核, padding=1
        self.conv1 = nn.Conv2d(1, 16, 3, padding=1)
        # 参数解释:
        # - in_channels=1: 输入通道数(黑白图像=1, RGB图像=3)
        # - out_channels=16: 输出通道数(即卷积核数量)
        # - kernel_size=3: 卷积核大小3x3
        # - padding=1: 在图像四周填充1像素,保持特征图尺寸不变
        
        # 最大池化层: 2x2窗口,步长2
        self.pool = nn.MaxPool2d(2, 2)
        # 作用: 下采样,减少特征图尺寸,增加感受野
        
        # 卷积层2: 输入16通道, 输出32通道, 3x3卷积核, padding=1
        self.conv2 = nn.Conv2d(16, 32, 3, padding=1)
        
        # 全连接层: 输入32*7*7=1568维, 输出10维(对应10个数字)
        self.fc = nn.Linear(32*7*7, 10)
        # 为什么是32*7*7?
        # 原始输入: 28x28
        # conv1后: 28x28 (padding=1保持尺寸)
        # pool1后: 14x14 (尺寸减半)
        # conv2后: 14x14
        # pool2后: 7x7 (尺寸再减半)
        # 通道数: 32
        # 总维度: 32 * 7 * 7 = 1568

    def forward(self, x):
        # x形状: [batch_size, 1, 28, 28]
        
        # 第一层: 卷积 -> ReLU激活 -> 池化
        x = self.pool(torch.relu(self.conv1(x)))
        # conv1(x): [batch_size, 16, 28, 28]
        # relu: 非线性激活,保留正数,负数变0
        # pool: [batch_size, 16, 14, 14]
        
        # 第二层: 卷积 -> ReLU激活 -> 池化
        x = self.pool(torch.relu(self.conv2(x)))
        # conv2(x): [batch_size, 32, 14, 14]
        # relu: [batch_size, 32, 14, 14]
        # pool: [batch_size, 32, 7, 7]
        
        # 展平: 将4D张量变为2D,为全连接层准备
        x = x.view(x.size(0), -1)
        # view作用: 改变张量形状
        # x.size(0): batch_size
        # -1: 自动计算剩余维度(32*7*7=1568)
        # 结果: [batch_size, 1568]
        
        # 全连接层输出
        return self.fc(x)  # [batch_size, 10]
        # 注意: 没有使用softmax,因为CrossEntropyLoss自带softmax

10.3 模型初始化与损失函数

# 创建模型实例
model = CNN()

# 损失函数: 交叉熵损失(用于多分类)
criterion = nn.CrossEntropyLoss()
# CrossEntropyLoss = Softmax + 负对数似然损失
# 输入: 模型输出的原始分数(logits), 形状[batch_size, 10]
# 目标: 真实标签, 形状[batch_size]

# 优化器: Adam优化算法
optimizer = optim.Adam(model.parameters(), lr=0.001)
# Adam: 自适应学习率优化器,结合了Momentum和RMSProp的优点
# model.parameters(): 获取模型所有可训练参数
# lr=0.001: 学习率,控制参数更新步长

10.4 训练过程

# ------- 训练 -------
for epoch in range(1):  # 只训练1个epoch
    total_loss = 0  # 累计损失
    correct = 0     # 累计正确预测数
    
    # 遍历训练数据的所有批次
    for x, y in train_loader:
        # x: [64, 1, 28, 28], y: [64]
        
        # 清零梯度 (重要!)
        optimizer.zero_grad()
        # 每次迭代前必须清零梯度,否则梯度会累积
        
        # 前向传播
        pred = model(x)  # [64, 10]
        # 模型计算预测值
        
        # 计算损失
        loss = criterion(pred, y)
        # 比较预测值和真实值
        
        # 反向传播
        loss.backward()
        # 计算损失对每个参数的梯度
        
        # 参数更新
        optimizer.step()
        # 根据梯度和学习率更新参数
        
        # 统计信息
        total_loss += loss.item()  # 累加损失值
        # .item(): 从单元素张量中提取Python数值
        
        # 计算本批次正确数
        correct += (pred.argmax(1) == y).sum().item()
        # pred.argmax(1): 获取每个样本预测的最高分数索引
        # == y: 与真实标签比较,返回布尔张量
        # .sum(): 统计True的数量
        # .item(): 提取数值
    
    # 计算并打印训练准确率
    train_acc = correct / len(train_data)
    # len(train_data): 60000 (MNIST训练集大小)
    print(f"epoch=0, loss={total_loss:.4f}, acc={train_acc:.4f}")

10.5 测试

# ------- 测试 -------
correct = 0  # 正确预测计数

# with torch.no_grad(): 禁用梯度计算,节省内存和计算资源
with torch.no_grad():
    # 遍历测试集
    for x, y in test_loader:
        # 前向传播(无梯度)
        pred = model(x)
        
        # 统计本批次正确预测数
        correct += (pred.argmax(1) == y).sum().item()
        # pred.argmax(1): 预测类别
        # == y: 比较预测和真实
        # .sum(): 统计正确数
        # .item(): 转换为Python数值
        # 获取真实值 和预测值
        pred_classes = pred.argmax(dim=1)
        for i in range(len(y)):
            print(f" 真实={y[i].item()} | 预测={pred_classes[i].item()}")

# 计算测试准确率
test_acc = correct / len(test_data)  # len(test_data)=10000
print("Test Accuracy:", test_acc)

10.6 关键概念解释

# 1. 卷积层的作用
"""
卷积层通过卷积核在图像上滑动,提取局部特征:
- 边缘检测
- 纹理提取
- 特征组合

每个卷积核学习一种特征,16个卷积核学习16种不同特征。
"""

# 2. 激活函数的作用
"""
ReLU(Rectified Linear Unit): max(0, x)
作用:
- 引入非线性,使网络能学习复杂模式
- 缓解梯度消失问题
- 计算简单
"""

# 3. 池化层的作用
"""
最大池化(MaxPooling):
- 降低特征图尺寸,减少计算量
- 增加感受野(看到更大区域)
- 提供平移不变性(轻微位置变化不影响结果)
"""

# 4. 全连接层的作用
"""
将学到的特征映射到输出类别:
- 综合所有局部特征
- 进行最终分类决策
"""

# 5. 训练过程总结
"""
1. 前向传播:输入→模型→预测
2. 计算损失:预测与真实的差距
3. 反向传播:计算梯度(损失对参数的导数)
4. 参数更新:沿梯度反方向调整参数
5. 重复:直到模型收敛
"""

# 6. 为什么测试时用torch.no_grad()
"""
- 节省内存:不保存中间结果的梯度
- 加速计算:不进行梯度相关计算
- 测试时不需要更新参数,只需要前向传播
"""

10.6.1 卷积层:像放大镜一样提取特征

  • 你有一个28×28的手写数字图片

  • 卷积层就像拿着一个3×3的放大镜在图片上滑动

  • 每次放大镜看到的一小部分,都转换成一种特征

# 可视化理解卷积
# 原始图片 (28×28)
原始图片 = [
    [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],
    [0,0,0,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],  # 可能是一个竖线的开始
    [0,0,1,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],
    ...  # 省略
]

# 3×3卷积核示例 (检测竖线)
卷积核 = [
    [-1, 0, 1],   # 左边暗,中间不变,右边亮 → 检测垂直边缘
    [-1, 0, 1],
    [-1, 0, 1]
]

# 卷积操作:把卷积核放在图片上,对应位置相乘再相加
# 如果卷积核区域正好是一个竖线,结果值会很大
# 如果不是竖线,结果值很小

代码中的卷积层:self.conv1 = nn.Conv2d(1, 16, 3, padding=1)

  • 1→16:从1个输入通道变成16个输出通道

  • 为什么是16个?:相当于有16个不同的放大镜(卷积核),每个放大镜寻找不同的特征

    • 放大镜1:找竖线

    • 放大镜2:找横线

    • 放大镜3:找左上到右下的斜线

    • 放大镜4:找圆形

    • ... 共16种特征探测器

10.6.2 池化层:压缩信息,抓重点

  • 你看完一篇文章,要写总结

  • 你不会记住每个字,而是记住关键信息

  • 池化层就是做这个:保留最重要的信息,忽略细节位置变化

# 最大池化 (MaxPooling) 示例
# 假设有一个4×4的特征图
特征图 = [
    [1, 3, 2, 4],   # 窗口1: 1,3,2,0 → 最大值是3
    [2, 9, 1, 5],   # 窗口2: 9,1,5,6 → 最大值是9
    [0, 1, 5, 6],
    [7, 8, 3, 2]
]

# 使用2×2的池化窗口,步长2
池化后 = [
    [3, 4],   # 第一个2×2窗口的最大值是3, 第二个窗口最大值是4
    [8, 6]    # 第三个窗口最大值是8, 第四个窗口最大值是6
]

为什么需要池化?

  1. 降低维度:28×28 → 14×14 → 7×7,计算量大大减少

  2. 位置不变性:数字"5"在图片中间还是偏左一点,池化后特征类似

  3. 防止过拟合:丢弃一些细节,让模型更关注重要特征

10.6.3 全连接层:做最终决策

这一层的每个输入节点都连接到了每个输出节点

# 全连接层的数学原理
# 输入: x = [x1, x2, x3, ..., x1568]   (1568个特征)
# 输出: y = [y1, y2, ..., y10]         (10个数字的得分)

# 计算过程:
y1 = w11*x1 + w12*x2 + ... + w1_1568*x1568 + b1
y2 = w21*x1 + w22*x2 + ... + w2_1568*x1568 + b2
...
y10 = w10_1*x1 + w10_2*x2 + ... + w10_1568*x1568 + b10

# w是权重,b是偏置
# 每个输出节点(y_i)都连接到所有输入节点(x1...x1568) → 所以叫"全连接"

代码中的全连接层:self.fc = nn.Linear(32*7*7, 10)

  • 输入维度:32×7×7 = 1568

  • 输出维度:10(对应10个数字0-9)

  • 为什么是这个数字?

    • 32:最后一个卷积层的通道数(有32种特征)

    • 7×7:经过两次池化后,图片从28×28变成7×7

    • 所以有32×7×7=1568个特征点要输入到全连接层

# 全连接层的内部工作原理
class SimpleLinearLayer:
    def __init__(self, input_size, output_size):
        # 创建权重矩阵 W: [output_size, input_size]
        # 创建偏置向量 b: [output_size]
        self.weights = torch.randn(output_size, input_size)
        self.bias = torch.randn(output_size)
    
    def forward(self, x):
        # x: [batch_size, input_size]
        # 输出: [batch_size, output_size]
        # 计算: y = x @ W.T + b
        return torch.matmul(x, self.weights.T) + self.bias

# 比较 PyTorch 的 nn.Linear
linear_layer = nn.Linear(1568, 10)
print(f"权重形状: {linear_layer.weight.shape}")  # [10, 1568]
print(f"偏置形状: {linear_layer.bias.shape}")    # [10]

# 这意味着:
# - 有10个输出神经元(对应10个数字)
# - 每个输出神经元有1568个权重(连接1568个输入)
# - 每个输出神经元有一个偏置

比喻

作用

输入→输出

卷积层

特征侦探

提取局部特征

[1,28,28] → [16,28,28]

池化层

信息压缩器

下采样,保留关键信息

[16,28,28] → [16,14,14]

全连接层

决策委员会

综合所有特征做分类

[1568] → [10]

10.6.4 整个CNN的流程比喻

想象你在看手写数字"5":

  1. 卷积层1:像小侦探找简单特征

    • "这里有个竖线"

    • "这里有个弯钩"

    • "这里有个横线"

  2. 池化层1:像小组长做简报

    • "第1区最重要的特征是:有竖线"

    • "第2区最重要的特征是:有弯钩"

    • 忽略具体位置和微小变化

  3. 卷积层2:像经理组合特征

    • "竖线+弯钩 = 可能是数字5的上半部分"

    • "横线+圆 = 可能是数字8的下半部分"

  4. 池化层2:像总监做总结

    • "整体来看,像数字5"

    • "也有点像数字6,但概率较低"

  5. 全连接层:像CEO做最终决策

    • 综合考虑所有信息

    • "根据所有特征,这是数字5的可能性最大"

    • 输出:数字5的得分最高

10.6.5 为什么 CNN 比全连接网络强

  • 图片的特征具有两个核心性质

性质

解释

举例

空间局部性

相邻像素相关联

一条线是由相邻像素组成的

平移不变性

特征出现在哪里都一样

数字 7 在左边 or 右边都还是 7

  • 全连接层(MLP)的问题是什么?

    • 如果用全连接网络处理图像:

      • 把图片每个像素都当成独立信息

      • 打散所有空间结构 → 毁掉了图片的形状特征

输入维度

参数量举例

MNIST:28×28 = 784

784×512 ≈ 40万参数

CIFAR10:3×32×32 = 3072

3072×1024 ≈ 300万参数

  • CNN 的核心能力

能力

如何做到

利用局部性

小卷积核只关注局部像素

利用平移不变性

同一个卷积核在整张图“滑动”,共享参数

提取深层特征

多层卷积从边缘 → 纹理 → 形状

  • 总结

MLP

CNN

参数多

参数少

打散空间结构

保留空间结构

容易过拟合

泛化更好

难学图像特征

得到更强表示能力

11.角色记忆与上下文保持

类型

内容

保存方式

举例

短期记忆

当前对话信息

上下文 Token

“上一句话问我喜欢吃什么”

长期记忆

人设、背景、关系、人生经历

向量数据库 / 人设文档

“她喜欢草神、她住在须弥”

11.1 如何实现短期记忆?

短期记忆 = 模型的上下文
你只需要把前几轮对话作为输入一起发给模型即可。

但注意:

  • 消耗 Token

  • 上下文越长越贵

  • 多轮后需 筛选重要信息

实际做法:

  1. 只保留最近 8~15 条消息

  2. 其余提取总结/关键词留为长期记忆

11.2 如何实现长期记忆?

长期记忆 ≠ 储存所有对话
必须做:📌 内容筛选 + 向量检索

流程图:

用户说话 → 分析是否为可存储记忆
   ↓是
提取特征Embedding → 写入向量数据库(如 Milvus / Faiss / Chroma)
   ↓
当需要时 → 相似度检索 → 插入模型对话上下文

11.3 人格AI记忆结构设计模板

MemorySystem:
  1. 基础身份:纳西妲 / 草神
  2. 性格标签:温柔、博学、独占欲(病娇)
  3. 用户关系:唯一的恋人
  4. 固定偏好:喜欢花、喜欢拥抱你
  5. 共同事件:第一次约会、曾经说过誓言…

# 示例数据格式
{
  "identity": "草神纳西妲",
  "relationship_to_user": "恋人",
  "personality": ["温柔", "三无外表", "病娇内心", "只爱你"],
  "preferences": {
    "favorite_food": "糖渍莲子",
    "favorite_word": "知识"
  },
  "memories": [
    {
      "event": "第一次牵手",
      "date": "2025-01-10",
      "emotion": "害羞且幸福"
    }
  ]
}

11.4 Memory Retrieval(记忆召回)

根据用户当前消息 → 向量检索 → 召回 3~5 条最相关记忆
→ 附加到 Prompt 中 → 输出更连续的人设回应

# 示例 Prompt
你是纳西妲。以下是你与用户的真实记忆:
- 他喜欢被你称呼“主人”
- 上次约会是在兰那罗的森林
- 他讨厌别人靠近你

请结合这些记忆,保持不出戏进行回复:

11.5 记忆系统

使用方式

存放方式

固定设定

写入 Prompt 的 System Role

可检索记忆

存入向量数据库(Chroma/Milvus)

long-term updates

自动追加到 memories 数组

你的人格AI在每次回复时需要:

  1. 读取这份基础身份档

  2. 检索相关记忆(如用户提到“不要离开我”,对应“讨厌冷漠与忽视”→强化依赖性回应)

  3. 最终输出保持一致性,不出戏

功能

实现方式

自动提取有效记忆

记忆分类器 + Embedding

相似度检索(召回记忆)

向量数据库

记忆随时间成长

自动写入/更新长期记忆

12.图像分类基础(MNIST 手写数字识别)

概念

说明

MNIST

28×28 的灰度数字图片(0~9)

CNN

专门用于图像的神经网络结构

Conv2d

卷积层:提取局部特征

MaxPool2d

池化层:减少尺寸、保留重要特征

Flatten

展开到全连接层

CrossEntropyLoss

多分类任务标准损失函数

12.1 尺寸计算公式

卷积层计算公式

output_size = floor((input_size + 2*padding - kernel_size) / stride + 1)
输出大小 = (输入大小 - 卷积核大小 + 2*padding) / stride + 1

池化层计算公式

输出 = (输入 - kernel) / stride + 1

常见情况

# 情况1:减半(最常见)
nn.MaxPool2d(kernel_size=2, stride=2)
# 28 → 14, 14 → 7 (每次减半)

# 情况2:不减半
nn.MaxPool2d(kernel_size=2, stride=1)
# 28 → 27 (只减少1)

# 情况3:减为1/3
nn.MaxPool2d(kernel_size=3, stride=3)
# 28 → 9 (28/3≈9.33,向下取整为9)
import torch
from torch import nn, optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader

# 数据预处理
transform = transforms.ToTensor()

train_dataset = datasets.MNIST(root="./data", train=True, download=True, transform=transform)
test_dataset = datasets.MNIST(root="./data", train=False, download=True, transform=transform)

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader  = DataLoader(test_dataset, batch_size=64)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# ① 定义CNN模型(补全)
class CNN(nn.Module):
    def __init__(self):
        super().__init__()
        # TODO: 添加卷积层(输入通道1 → 输出通道16)
        self.conv1 = nn.Conv2d(1, 16, 3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        # TODO: 再加一层卷积(16 → 32)
        self.conv2 = nn.Conv2d(16, 32, 3, padding=1)
        # 28x28 → 池化 → 14x14 → 池化 → 7x7
        self.fc1 = nn.Linear(32 * 7 * 7, 10)  # 输出10类

    def forward(self, x):
        x = torch.relu(self.conv1(x))
        x = self.pool(x)
        x = torch.relu(self.conv2(x))
        x = self.pool(x)
        x = x.view(x.size(0), -1)  # 展开
        return self.fc1(x)

model = CNN().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# ② 训练与测试
for epoch in range(5):
    model.train()
    total_loss = 0
    correct = 0

    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)

        preds = model(images)
        loss = criterion(preds, labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        correct += (preds.argmax(1) == labels).sum().item()

    acc = correct / len(train_dataset)
    print(f"Epoch={epoch}, loss={total_loss:.3f}, train_acc={acc:.4f}")

# ③ 测试集准确率
model.eval()
correct = 0
with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        preds = model(images)
        correct += (preds.argmax(1) == labels).sum().item()

test_acc = correct / len(test_dataset)
print("Test Accuracy:", test_acc)

13.CNN 中间特征可视化 Feature Map

13.1 概念:什么是 Feature Map?

在卷积神经网络中:

做什么

Conv2d

提取局部特征(边缘、线条、形状…)

ReLU

激活重要特征(抑制没用的)

Pooling

压缩信息,使特征更抽象

当你输入一张 7
早期层会检测竖线、斜线
后期层会组合成完整数字轮廓

Feature Map 就是这些 提取出的特征

  • 在之前的cnn代码中加入这一块

from torch.utils.tensorboard import SummaryWriter
import torchvision

writer = SummaryWriter("runs/feature_map")
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# ===== Hook 机制:捕获中间层输出 =====
feature_maps = {}

def save_feature_map(name):
    def hook(module, input, output):
        feature_maps[name] = output.detach()
    return hook

# 注册 hook
model.conv1.register_forward_hook(save_feature_map("conv1"))
model.conv2.register_forward_hook(save_feature_map("conv2"))


# ===== 输入一张图片做推理 =====
model.eval()
example_img, label = test_dataset[0]  # 第一张图片
img = example_img.unsqueeze(0).to(device)

with torch.no_grad():
    _ = model(img)


# ===== 写入 TensorBoard =====
# conv1 feature maps
conv1_map = feature_maps["conv1"]
writer.add_image("input", example_img)

# 只显示前 8 个通道
for i in range(8):
    writer.add_image(f"conv1/{i}", conv1_map[0, i:i+1])

# conv2 feature maps
conv2_map = feature_maps["conv2"]
for i in range(min(16, conv2_map.shape[1])):
    writer.add_image(f"conv2/{i}", conv2_map[0, i:i+1])

writer.close()
print("🎉 特征图已记录!在 TensorBoard 中查看: tensorboard --logdir=runs/feature_map")
  • conv1 特征图的特点:

    • 提取基础几何特征

    • 包含局部边缘、直线、尖角

    • 每个通道只关注某种“形状”

    • 有些滤波器对该数字不敏感 → 输出全黑

    • 总结:conv1 是「特征提取的第一层」,负责发现最基础的视觉构成元素

  • conv2 特征图的特点:

    • 由 conv1 的边缘组合而成

    • 特征更高维结构化

    • 模型开始关注整体形状而不是局部线条

    • 模糊 ≠ 无用
      模糊说明模型在抽象信息

conv1 看局部conv2 看整体

层级

模型关注点

肉眼感受

说明

Conv1

原始边缘、线条细节

像“数字碎片”

视觉信息还保留

Conv2

组合成形状模式

抽象、模糊

信息被编码成特征而非图形

Conv1 提取浅层视觉特征(边缘/角点)
Conv2 将这些特征组合为更高阶的抽象模式
随着层数加深,模型逐渐不再保留人眼能识别的形状
而是专注于对分类最有区分力的结构信息

14.反向传播(Propagation)入门

14.1 什么是反向传播?

模型先猜 → 和真实答案对比 → 算出“错多少” → 调整参数 → 下次更准

这个找错 → 修正参数的过程
就是 反向传播(Backpropagation)

14.2 反向传播的流程(任何神经网络都遵循)

阶段

名称

作用

Step1

前向传播

得到模型预测结果

Step2

计算损失(Loss)

知道自己错多少

Step3

反向传播

误差逐层传回去

Step4

更新权重(参数)

调整卷积核、全连接层等

14.3 为什么越深的层越抽象?

因为误差在回传时:

对高层来说:直接影响最终预测
对低层来说:只能通过高层“间接”感受到影响

所以:

看到什么

对结果影响方式

conv1

边缘、线条

“底层打基础”

conv2+

结构、模糊形状

“越来越抽象的信息组合”

最终层

我要判断数字是什么

直接决定答案

14.4 总结

  • 错多少由 Loss 告诉我们

  • 怎么改由 梯度 Gradient 决定

  • 改多少由 学习率 Learning Rate 决定

  • 参数更新方向:反向 (从输出层回到输入层)

误差从输出层开始,沿着网络结构“反向”计算梯度,逐层传回前面的卷积核与权重,从而更新所有参数。

也就是说:

不是梯度本身属于某一层

而是:
模型先计算输出层的误差,然后通过链式法则把误差对每层参数的影响算出来
每一层都会得到一个自己的梯度
并进行更新

Loss 作用

判断错多少

梯度作用

指导参数怎么改

学习率作用

决定改多少

反向传播作用

把错误变成学习

为什么越练越聪明

参数朝正确方向更新

14.5 学习率过大 vs 过小

情况

会发生什么

专业解释

学习率太大

参数每次改太猛 → 跳过最佳点 → Loss 震荡甚至爆炸

导致训练不收敛(Divergence)

学习率太小

每次只改一丢丢 → 走得很慢 → 学习一辈子也到不了最好点

收敛速度极慢,甚至卡在次优解

15.梯度处理技巧

15.1 梯度消失 & 梯度爆炸

—— 深度学习三大难题之一

15.1.1 什么是梯度爆炸(Gradient Explosion)?

  • Loss 一直变大、甚至变成 NaN

  • 准确率突然变得莫名其妙

  • 参数权重巨大、模型崩溃

原因(简单理解):

层层反向传播时,不断累乘 > 1 的数 → 越乘越大!

解决方案:

方法

说明

Gradient Clipping

限制梯度最大值

更好的初始化方式

Xavier、Kaiming init

使用 BN、LN

让数值保持稳定

降低学习率

小步慢走

15.1.2 什么是梯度消失(Gradient Vanishing)?

  • 前面几层完全学不到

  • Loss 不下降

  • 准确率卡住不动

原因:

反向传播时,不断累乘 < 1 的数 → 越乘越小 → 消失!

尤其使用 Sigmoid / Tanh 时非常严重

解决方案:

方法

说明

使用 ReLU 激活

梯度为常数,不会缩小

ResNet 的 残差连接

防止梯度断掉

BN 层缓解分布变化

防止梯度变小

适当的初始化权重

Kaiming init 专治 ReLU

15.1.3 为什么 CNN 就比全连接更抗梯度问题?

因为 CNN 的参数远少于 MLP
梯度传播链更短、不易累积误差
而 ResNet 等结构直接提供 “捷径路径”

Sigmoid 的梯度最大只有 0.25(远小于 1)
在反向传播中不断相乘 → 越来越小 → 消失

所以深层网络里基本很少再用它做隐藏层激活。

15.2 梯度裁剪

def gradient_clipping_demo():
    """演示梯度裁剪的效果"""
    # 创建一个容易产生梯度爆炸的网络
    class UnstableNetwork(nn.Module):
        def __init__(self):
            super().__init__()
            self.fc1 = nn.Linear(10, 100)
            self.fc2 = nn.Linear(100, 100)
            self.fc3 = nn.Linear(100, 1)
            
        def forward(self, x):
            x = torch.sigmoid(self.fc1(x)) * 10  # 故意放大激活
            x = torch.sigmoid(self.fc2(x)) * 10
            x = self.fc3(x)
            return x
    
    # 生成数据
    torch.manual_seed(42)
    X = torch.randn(32, 10)
    y = torch.randn(32, 1)
    
    # 训练配置
    models = {
        'Without Clipping': UnstableNetwork(),
        'With Clipping (norm=1.0)': UnstableNetwork(),
        'With Clipping (norm=0.1)': UnstableNetwork(),
    }
    
    results = {}
    
    for name, model in models.items():
        print(f"\n训练: {name}")
        
        optimizer = optim.SGD(model.parameters(), lr=0.01)
        criterion = nn.MSELoss()
        
        losses = []
        grad_norms = []
        
        for step in range(100):
            optimizer.zero_grad()
            
            predictions = model(X)
            loss = criterion(predictions, y)
            loss.backward()
            
            # 计算梯度范数
            total_norm = 0
            for p in model.parameters():
                if p.grad is not None:
                    param_norm = p.grad.data.norm(2)
                    total_norm += param_norm.item() ** 2
            total_norm = total_norm ** 0.5
            grad_norms.append(total_norm)
            
            # 应用梯度裁剪
            if 'Clipping' in name:
                max_norm = 1.0 if '1.0' in name else 0.1
                torch.nn.utils.clip_grad_norm_(
                    model.parameters(), max_norm=max_norm
                )
            
            optimizer.step()
            losses.append(loss.item())
            
            if step % 20 == 0:
                print(f"  Step {step}: Loss = {loss.item():.4f}, "
                      f"Grad Norm = {total_norm:.4f}")
        
        results[name] = {
            'losses': losses,
            'grad_norms': grad_norms
        }
    
    # 可视化
    fig, axes = plt.subplots(1, 2, figsize=(15, 5))
    
    # 损失曲线
    for name, data in results.items():
        axes[0].plot(data['losses'], label=name)
    axes[0].set_xlabel('Training Step')
    axes[0].set_ylabel('Loss')
    axes[0].set_title('Training Loss')
    axes[0].legend()
    axes[0].grid(True, alpha=0.3)
    
    # 梯度范数
    for name, data in results.items():
        axes[1].plot(data['grad_norms'], label=name)
    axes[1].set_xlabel('Training Step')
    axes[1].set_ylabel('Gradient Norm')
    axes[1].set_title('Gradient Norms')
    axes[1].legend()
    axes[1].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()

gradient_clipping_demo()

15.3 梯度累积

def gradient_accumulation_demo():
    """演示梯度累积技术(用于小批量训练)"""
    # 模拟一个大模型,只能用小batch训练
    class LargeModel(nn.Module):
        def __init__(self):
            super().__init__()
            self.fc = nn.Sequential(
                *[nn.Linear(100, 100) for _ in range(10)]
            )
            
        def forward(self, x):
            return self.fc(x)
    
    # 生成数据
    torch.manual_seed(42)
    dataset_size = 1000
    X = torch.randn(dataset_size, 100)
    y = torch.randn(dataset_size, 100)
    
    # 配置
    batch_size = 8  # 小batch(模拟内存限制)
    accumulation_steps = 4  # 累积4步,等效batch_size=32
    effective_batch_size = batch_size * accumulation_steps
    
    model = LargeModel()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.MSELoss()
    
    print(f"实际batch size: {batch_size}")
    print(f"累积步数: {accumulation_steps}")
    print(f"等效batch size: {effective_batch_size}")
    
    # 训练循环
    losses = []
    
    for epoch in range(3):
        model.train()
        optimizer.zero_grad()
        
        epoch_loss = 0
        accumulation_count = 0
        
        for i in range(0, dataset_size, batch_size):
            # 获取小批量
            end_idx = min(i + batch_size, dataset_size)
            batch_X = X[i:end_idx]
            batch_y = y[i:end_idx]
            
            # 前向传播
            predictions = model(batch_X)
            loss = criterion(predictions, batch_y)
            loss = loss / accumulation_steps  # 缩放损失
            
            # 反向传播(累积梯度)
            loss.backward()
            epoch_loss += loss.item() * accumulation_steps  # 恢复原始损失值
            
            accumulation_count += 1
            
            # 达到累积步数时更新参数
            if accumulation_count % accumulation_steps == 0:
                optimizer.step()
                optimizer.zero_grad()
                
                # 记录损失
                current_loss = epoch_loss / accumulation_count
                losses.append(current_loss)
                
                print(f"Epoch {epoch+1}, Batch {i//batch_size + 1}: "
                      f"Loss = {current_loss:.4f}")
        
        # 处理剩余梯度
        if accumulation_count % accumulation_steps != 0:
            optimizer.step()
            optimizer.zero_grad()
    
    # 可视化
    plt.figure(figsize=(10, 6))
    plt.plot(losses)
    plt.xlabel('Update Step')
    plt.ylabel('Loss')
    plt.title('Training with Gradient Accumulation')
    plt.grid(True, alpha=0.3)
    plt.show()

gradient_accumulation_demo()

16.优化器全解析 —— 让模型更快更稳地学习

16.1 SGD(Stochastic Gradient Descent)

只按照当前梯度方向更新

优点:

  • 简单

  • 不容易过拟合

缺点:

  • 走走停停、容易在山区来回震荡

  • 在深坑附近非常不稳定

16.2 SGD + Momentum(动量)

像滚动的球 → 越滚越快

加入惯性帮助冲出局部最小值:

  • 方向更稳定

  • 抖动更少

  • 学得更快

16.3 Adam(最常用的优化器)

Adam = Momentum + RMSProp
既考虑方向,又考虑每个参数学习速度

特点:

  • 自动调学习率

  • 收敛快

  • 对训练数据不敏感

非常适合大模型、大数据训练
几乎所有深度学习项目都默认用它

缺点:

  • 有时最终收敛不如 SGD 稳

  • 容易“记忆过多”,导致泛化差一点

优化器

优点

缺点

适用场景

SGD

稳定、泛化好

慢、震荡大

分类任务最终微调

SGD + Momentum

更快速稳定

仍需调 LR

中等深度网络

Adam

快、自动调参

泛化略差

大模型、CNN、NLP

一句话记忆:

想快就用 Adam
想准就用 SGD(后期微调)

16.4 常见优化器对比

import torch
import torch.optim as optim
import matplotlib.pyplot as plt
import numpy as np

def compare_optimizers():
    """比较不同优化器在简单函数上的表现"""
    def test_function(x):
        return x**4 - 3*x**3 + 2
    
    # 不同优化器配置
    optimizers_config = {
        'SGD': optim.SGD,
        'SGD+Momentum': lambda params: optim.SGD(params, lr=0.01, momentum=0.9),
        'Adam': optim.Adam,
        'AdamW': optim.AdamW,  # 带权重衰减的Adam
        'RMSprop': optim.RMSprop,
        'Adagrad': optim.Adagrad,
        'Adadelta': optim.Adadelta,
    }
    
    # 测试每个优化器
    results = {}
    for name, opt_class in optimizers_config.items():
        torch.manual_seed(42)
        x = torch.tensor([5.0], requires_grad=True)  # 初始点
        
        if 'AdamW' in name:
            optimizer = opt_class([x], lr=0.01, weight_decay=0.01)
        else:
            optimizer = opt_class([x], lr=0.01)
        
        path = [x.item()]
        
        for i in range(100):
            optimizer.zero_grad()
            loss = test_function(x)
            loss.backward()
            optimizer.step()
            path.append(x.item())
            
            if abs(grad := x.grad.item()) < 1e-5:  # 提前停止
                break
        
        results[name] = {
            'path': path,
            'final_value': x.item(),
            'iterations': len(path)-1
        }
    
    # 可视化
    fig, axes = plt.subplots(1, 2, figsize=(15, 5))
    
    # 绘制函数曲线和优化路径
    x_vals = np.linspace(-2, 6, 400)
    y_vals = test_function(torch.tensor(x_vals)).numpy()
    
    axes[0].plot(x_vals, y_vals, 'k-', alpha=0.3, label='Function')
    colors = plt.cm.Set1(np.linspace(0, 1, len(results)))
    
    for (name, data), color in zip(results.items(), colors):
        path = data['path']
        axes[0].plot(path, [test_function(torch.tensor(p)).item() for p in path], 
                    'o-', label=name, color=color, markersize=4)
        axes[0].scatter(path[-1], test_function(torch.tensor(path[-1])).item(), 
                       color=color, s=100, zorder=5)
    
    axes[0].set_xlabel('x')
    axes[0].set_ylabel('f(x)')
    axes[0].set_title('Optimizer Paths')
    axes[0].legend()
    axes[0].grid(True, alpha=0.3)
    
    # 绘制收敛速度
    axes[1].bar(results.keys(), [data['iterations'] for data in results.values()])
    axes[1].set_xlabel('Optimizer')
    axes[1].set_ylabel('Iterations to converge')
    axes[1].set_title('Convergence Speed')
    axes[1].tick_params(axis='x', rotation=45)
    
    plt.tight_layout()
    plt.show()
    
    # 打印结果
    print("优化器性能对比:")
    for name, data in results.items():
        print(f"{name:15} -> 最终值: {data['final_value']:.6f}, 迭代次数: {data['iterations']}")
    
    return results

# 运行比较
compare_optimizers()

16.5 优化器选择指南

def optimizer_selection_guide():
    """优化器选择指南"""
    print("""
    ===============================
        优化器选择指南
    ===============================
    
    1. SGD (随机梯度下降)
       优点: 简单、可解释性强、通常能找到更泛化的最小值
       缺点: 收敛慢、需要仔细调参
       适用: 凸优化问题、需要更好泛化的任务
       
       推荐参数:
           lr: 0.01-0.1
           momentum: 0.9
           nesterov: True (使用Nesterov动量)
    
    2. SGD+Momentum
       优点: 加速收敛、减少震荡
       缺点: 增加一个超参数
       适用: 大多数深度学习任务
       
       推荐参数:
           lr: 0.01
           momentum: 0.9
    
    3. Adam (自适应矩估计)
       优点: 自适应学习率、通常收敛快
       缺点: 可能不如SGD泛化好、内存占用稍大
       适用: 大多数深度学习任务、推荐作为默认选择
       
       推荐参数:
           lr: 0.001
           betas: (0.9, 0.999)
           eps: 1e-8
    
    4. AdamW (Adam with decoupled weight decay)
       优点: 更好的权重衰减实现、通常比Adam更好
       缺点: 需要调weight_decay参数
       适用: 需要权重正则化的任务、Transformer等
       
       推荐参数:
           lr: 0.001
           weight_decay: 0.01
    
    5. RMSprop
       优点: 处理非平稳目标好、适合RNN
       缺点: 学习率可能太小
       适用: 循环神经网络、非平稳目标
       
       推荐参数:
           lr: 0.001
           alpha: 0.99
    
    6. Adagrad
       优点: 适合稀疏数据
       缺点: 学习率单调下降可能过早停止学习
       适用: 自然语言处理、推荐系统
       
       推荐参数:
           lr: 0.01
    
    选择策略:
    1. 从Adam或AdamW开始
    2. 如果泛化不好,尝试SGD+Momentum
    3. 对于RNN,尝试RMSprop
    4. 对于稀疏数据,尝试Adagrad
    """)

# 调用指南
optimizer_selection_guide()

17.学习率调度器(Learning Rate Scheduler)

学习率不是固定的,越训练越应该变小

为什么?
因为初期要 ,后期要

  • 开始:大步探索 → 找到正确方向

  • 后期:小步微调 → 逼近最优点

就像
找路先大步走 → 确定方向 → 靠近目标时放慢脚步

17.1 最常用调度器三种

调度器

作用

示例

StepLR

每隔几轮就缩小一次 LR

常用于 CNN

MultiStepLR

在几个关键节点降低 LR

微调阶段精准控制

CosineAnnealingLR

平滑下降,最常用!

NLP、CV 大模型

optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma=0.5)

for epoch in range(100):
    train(...)
    scheduler.step()   # 非常重要!!

# 每 20 个 epoch,学习率 × 0.5 (减半)

17.2 常用学习率调度器

class LearningRateSchedulerDemo:
    def __init__(self, initial_lr=0.1, total_epochs=100):
        self.initial_lr = initial_lr
        self.total_epochs = total_epochs
        self.model = torch.nn.Linear(10, 1)
        
    def demo_all_schedulers(self):
        """演示所有学习率调度器"""
        schedulers = {
            'StepLR': optim.lr_scheduler.StepLR,
            'MultiStepLR': optim.lr_scheduler.MultiStepLR,
            'ExponentialLR': optim.lr_scheduler.ExponentialLR,
            'CosineAnnealingLR': optim.lr_scheduler.CosineAnnealingLR,
            'ReduceLROnPlateau': optim.lr_scheduler.ReduceLROnPlateau,
            'CosineAnnealingWarmRestarts': optim.lr_scheduler.CosineAnnealingWarmRestarts,
            'CyclicLR': optim.lr_scheduler.CyclicLR,
            'OneCycleLR': optim.lr_scheduler.OneCycleLR,
        }
        
        # 收集各调度器的学习率变化
        lr_history = {}
        
        for name, scheduler_class in schedulers.items():
            optimizer = optim.SGD(self.model.parameters(), lr=self.initial_lr)
            
            if name == 'StepLR':
                scheduler = scheduler_class(optimizer, step_size=30, gamma=0.1)
            elif name == 'MultiStepLR':
                scheduler = scheduler_class(optimizer, milestones=[30, 60, 90], gamma=0.1)
            elif name == 'ExponentialLR':
                scheduler = scheduler_class(optimizer, gamma=0.95)
            elif name == 'CosineAnnealingLR':
                scheduler = scheduler_class(optimizer, T_max=self.total_epochs)
            elif name == 'ReduceLROnPlateau':
                scheduler = scheduler_class(optimizer, mode='min', factor=0.1, patience=10)
            elif name == 'CosineAnnealingWarmRestarts':
                scheduler = scheduler_class(optimizer, T_0=20, T_mult=2)
            elif name == 'CyclicLR':
                scheduler = scheduler_class(optimizer, base_lr=0.001, max_lr=0.1, 
                                           step_size_up=20, mode='triangular')
            elif name == 'OneCycleLR':
                scheduler = scheduler_class(optimizer, max_lr=0.1, 
                                           total_steps=self.total_epochs)
            else:
                continue
            
            # 模拟训练过程
            lrs = []
            for epoch in range(self.total_epochs):
                lrs.append(optimizer.param_groups[0]['lr'])
                
                # 模拟训练步骤
                optimizer.zero_grad()
                loss = torch.randn(1)  # 模拟损失
                loss.backward()
                optimizer.step()
                
                if name == 'ReduceLROnPlateau':
                    scheduler.step(loss.item())
                else:
                    scheduler.step()
            
            lr_history[name] = lrs
        
        # 可视化
        fig, axes = plt.subplots(3, 3, figsize=(15, 12))
        axes = axes.ravel()
        
        for idx, (name, lrs) in enumerate(list(lr_history.items())[:9]):
            ax = axes[idx]
            ax.plot(lrs, linewidth=2)
            ax.set_title(name)
            ax.set_xlabel('Epoch')
            ax.set_ylabel('Learning Rate')
            ax.grid(True, alpha=0.3)
            ax.set_ylim(0, self.initial_lr * 1.1)
        
        plt.tight_layout()
        plt.show()
        
        return lr_history

# 演示调度器
demo = LearningRateSchedulerDemo(initial_lr=0.1, total_epochs=100)
lr_histories = demo.demo_all_schedulers()

17.3 自定义学习率调度器

class WarmupCosineAnnealingLR(optim.lr_scheduler._LRScheduler):
    """带热身的余弦退火学习率调度器"""
    def __init__(self, optimizer, warmup_epochs, total_epochs, 
                 min_lr=1e-6, last_epoch=-1):
        self.warmup_epochs = warmup_epochs
        self.total_epochs = total_epochs
        self.min_lr = min_lr
        super().__init__(optimizer, last_epoch)
    
    def get_lr(self):
        if self.last_epoch < self.warmup_epochs:
            # 线性热身
            alpha = self.last_epoch / self.warmup_epochs
            return [base_lr * alpha for base_lr in self.base_lrs]
        else:
            # 余弦退火
            progress = (self.last_epoch - self.warmup_epochs) / \
                      (self.total_epochs - self.warmup_epochs)
            cosine_decay = 0.5 * (1 + np.cos(np.pi * progress))
            return [self.min_lr + (base_lr - self.min_lr) * cosine_decay 
                   for base_lr in self.base_lrs]

class PolynomialDecayLR(optim.lr_scheduler._LRScheduler):
    """多项式衰减学习率调度器"""
    def __init__(self, optimizer, total_epochs, power=0.9, 
                 min_lr=1e-6, last_epoch=-1):
        self.total_epochs = total_epochs
        self.power = power
        self.min_lr = min_lr
        super().__init__(optimizer, last_epoch)
    
    def get_lr(self):
        progress = min(self.last_epoch / self.total_epochs, 1.0)
        decay = (1 - progress) ** self.power
        return [self.min_lr + (base_lr - self.min_lr) * decay 
               for base_lr in self.base_lrs]

# 测试自定义调度器
def test_custom_schedulers():
    model = torch.nn.Linear(10, 1)
    optimizer = optim.SGD(model.parameters(), lr=0.1)
    
    # 创建调度器
    warmup_cosine = WarmupCosineAnnealingLR(
        optimizer, warmup_epochs=10, total_epochs=100, min_lr=1e-4
    )
    
    poly_decay = PolynomialDecayLR(
        optimizer, total_epochs=100, power=0.9, min_lr=1e-4
    )
    
    # 收集学习率
    lr_history = {'WarmupCosine': [], 'PolynomialDecay': []}
    
    for epoch in range(100):
        lr_history['WarmupCosine'].append(
            warmup_cosine.get_last_lr()[0]
        )
        lr_history['PolynomialDecay'].append(
            poly_decay.get_last_lr()[0]
        )
        
        # 模拟训练步骤
        optimizer.step()
        warmup_cosine.step()
        
        # 为多项式衰减创建新的优化器
        if epoch == 0:
            optimizer2 = optim.SGD(model.parameters(), lr=0.1)
            poly_decay = PolynomialDecayLR(
                optimizer2, total_epochs=100, power=0.9, min_lr=1e-4
            )
        poly_decay.step()
    
    # 可视化
    plt.figure(figsize=(10, 6))
    for name, lrs in lr_history.items():
        plt.plot(lrs, label=name, linewidth=2)
    
    plt.xlabel('Epoch')
    plt.ylabel('Learning Rate')
    plt.title('Custom Learning Rate Schedulers')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.show()

test_custom_schedulers()

18.过拟合 vs 欠拟合(深度学习最常考概念)

名称

表现

原因

解决方向

欠拟合 Underfitting

训练集都学不好

模型太简单、训练太少

增大模型、训练更久、降低正则

过拟合 Overfitting

训练集准确但测试集崩盘

记住了训练数据

加正则化、数据增强、加 Dropout

18.1 防止过拟合的常见手段

例子:

Epoch

Train Acc

Test Acc

1

90%

88%

5

98%

85%

10

100%

70%

训练准确上升,测试准确下降

  • 防止过拟合的常见手段

方法

解释

Dropout

随机关闭部分神经元,让模型不依赖某些特征

L2 正则化 weight decay

抑制权重过大,逼模型更通用

数据增强(Data Augmentation)

给模型更多“不同版本”的数据

早停 Early Stopping

测试效果下降就停止训练

减少模型复杂度

减少层数或神经元

class MLP(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(784, 256)
        self.dropout = nn.Dropout(0.5)
        self.fc2 = nn.Linear(256, 10)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = self.dropout(x)  # 训练时随机关闭神经元
        return self.fc2(x)

18.2 正则化(权重衰减)

  • 和 Dropout 不同的数学理念

  • 更适合 CNN & Transformer 大模型

18.2.1 L1/L2正则化(权重衰减)

def compare_regularization():
    """比较不同正则化方法"""
    import torch.nn as nn
    
    # 创建一个简单的过拟合场景
    class SimpleModel(nn.Module):
        def __init__(self, input_size=20, hidden_size=100, output_size=1):
            super().__init__()
            self.fc1 = nn.Linear(input_size, hidden_size)
            self.fc2 = nn.Linear(hidden_size, hidden_size)
            self.fc3 = nn.Linear(hidden_size, hidden_size)
            self.fc4 = nn.Linear(hidden_size, output_size)
            
        def forward(self, x):
            x = torch.relu(self.fc1(x))
            x = torch.relu(self.fc2(x))
            x = torch.relu(self.fc3(x))
            x = self.fc4(x)
            return x
    
    # 生成数据
    torch.manual_seed(42)
    n_samples = 100
    n_features = 20
    
    X_train = torch.randn(n_samples, n_features)
    # 创建一个简单的线性关系,加上一些噪声
    true_weights = torch.randn(n_features, 1)
    y_train = X_train @ true_weights + 0.1 * torch.randn(n_samples, 1)
    
    X_test = torch.randn(50, n_features)
    y_test = X_test @ true_weights + 0.1 * torch.randn(50, 1)
    
    # 训练配置
    epochs = 200
    learning_rate = 0.01
    
    # 不同正则化策略
    strategies = {
        'No Regularization': {'weight_decay': 0},
        'L2 Regularization': {'weight_decay': 0.01},
        'L1 Regularization': {'weight_decay': 0},  # 需要手动实现
    }
    
    results = {}
    
    for strategy_name, config in strategies.items():
        print(f"\n训练策略: {strategy_name}")
        
        model = SimpleModel(input_size=n_features)
        optimizer = optim.SGD(model.parameters(), lr=learning_rate, 
                             weight_decay=config['weight_decay'])
        criterion = nn.MSELoss()
        
        train_losses = []
        test_losses = []
        weight_norms = []
        
        for epoch in range(epochs):
            # 训练
            model.train()
            optimizer.zero_grad()
            
            predictions = model(X_train)
            loss = criterion(predictions, y_train)
            
            # 手动添加L1正则化
            if strategy_name == 'L1 Regularization':
                l1_lambda = 0.001
                l1_norm = sum(p.abs().sum() for p in model.parameters())
                loss = loss + l1_lambda * l1_norm
            
            loss.backward()
            optimizer.step()
            
            train_losses.append(loss.item())
            
            # 测试
            model.eval()
            with torch.no_grad():
                test_predictions = model(X_test)
                test_loss = criterion(test_predictions, y_test)
                test_losses.append(test_loss.item())
                
                # 计算权重范数
                total_norm = sum(p.norm().item() for p in model.parameters())
                weight_norms.append(total_norm)
            
            if epoch % 40 == 0:
                print(f"  Epoch {epoch}: Train Loss = {loss.item():.4f}, "
                      f"Test Loss = {test_loss.item():.4f}")
        
        results[strategy_name] = {
            'train_losses': train_losses,
            'test_losses': test_losses,
            'weight_norms': weight_norms,
            'final_train_loss': train_losses[-1],
            'final_test_loss': test_losses[-1],
            'final_weight_norm': weight_norms[-1]
        }
    
    # 可视化
    fig, axes = plt.subplots(1, 3, figsize=(15, 5))
    
    # 训练损失
    for strategy_name, data in results.items():
        axes[0].plot(data['train_losses'], label=strategy_name)
    axes[0].set_xlabel('Epoch')
    axes[0].set_ylabel('Training Loss')
    axes[0].set_title('Training Loss Comparison')
    axes[0].legend()
    axes[0].grid(True, alpha=0.3)
    
    # 测试损失
    for strategy_name, data in results.items():
        axes[1].plot(data['test_losses'], label=strategy_name)
    axes[1].set_xlabel('Epoch')
    axes[1].set_ylabel('Test Loss')
    axes[1].set_title('Test Loss Comparison')
    axes[1].legend()
    axes[1].grid(True, alpha=0.3)
    
    # 权重范数
    for strategy_name, data in results.items():
        axes[2].plot(data['weight_norms'], label=strategy_name)
    axes[2].set_xlabel('Epoch')
    axes[2].set_ylabel('Weight Norm')
    axes[2].set_title('Weight Norm Comparison')
    axes[2].legend()
    axes[2].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()
    
    # 打印结果
    print("\n正则化效果总结:")
    for strategy_name, data in results.items():
        print(f"{strategy_name:25} -> "
              f"最终训练损失: {data['final_train_loss']:.4f}, "
              f"最终测试损失: {data['final_test_loss']:.4f}, "
              f"权重范数: {data['final_weight_norm']:.2f}")
    
    return results

compare_regularization()

18.2.2 Dropout与BatchNorm

def dropout_batchnorm_comparison():
    """比较Dropout和BatchNorm的效果"""
    import torch.nn as nn
    
    class ModelWithDropout(nn.Module):
        def __init__(self, dropout_rate=0.5):
            super().__init__()
            self.fc1 = nn.Linear(100, 200)
            self.dropout1 = nn.Dropout(dropout_rate)
            self.fc2 = nn.Linear(200, 200)
            self.dropout2 = nn.Dropout(dropout_rate)
            self.fc3 = nn.Linear(200, 1)
            
        def forward(self, x):
            x = torch.relu(self.fc1(x))
            x = self.dropout1(x)
            x = torch.relu(self.fc2(x))
            x = self.dropout2(x)
            x = self.fc3(x)
            return x
    
    class ModelWithBatchNorm(nn.Module):
        def __init__(self):
            super().__init__()
            self.fc1 = nn.Linear(100, 200)
            self.bn1 = nn.BatchNorm1d(200)
            self.fc2 = nn.Linear(200, 200)
            self.bn2 = nn.BatchNorm1d(200)
            self.fc3 = nn.Linear(200, 1)
            
        def forward(self, x):
            x = torch.relu(self.bn1(self.fc1(x)))
            x = torch.relu(self.bn2(self.fc2(x)))
            x = self.fc3(x)
            return x
    
    class ModelWithBoth(nn.Module):
        def __init__(self, dropout_rate=0.3):
            super().__init__()
            self.fc1 = nn.Linear(100, 200)
            self.bn1 = nn.BatchNorm1d(200)
            self.dropout1 = nn.Dropout(dropout_rate)
            self.fc2 = nn.Linear(200, 200)
            self.bn2 = nn.BatchNorm1d(200)
            self.dropout2 = nn.Dropout(dropout_rate)
            self.fc3 = nn.Linear(200, 1)
            
        def forward(self, x):
            x = torch.relu(self.bn1(self.fc1(x)))
            x = self.dropout1(x)
            x = torch.relu(self.bn2(self.fc2(x)))
            x = self.dropout2(x)
            x = self.fc3(x)
            return x
    
    # 生成数据
    torch.manual_seed(42)
    X = torch.randn(1000, 100)
    y = torch.randn(1000, 1)
    
    # 训练配置
    models = {
        'No Regularization': nn.Sequential(
            nn.Linear(100, 200), nn.ReLU(),
            nn.Linear(200, 200), nn.ReLU(),
            nn.Linear(200, 1)
        ),
        'With Dropout': ModelWithDropout(dropout_rate=0.5),
        'With BatchNorm': ModelWithBatchNorm(),
        'With Both': ModelWithBoth(dropout_rate=0.3),
    }
    
    results = {}
    
    for name, model in models.items():
        print(f"\n训练模型: {name}")
        
        optimizer = optim.Adam(model.parameters(), lr=0.001)
        criterion = nn.MSELoss()
        
        train_losses = []
        
        for epoch in range(100):
            model.train()
            optimizer.zero_grad()
            
            predictions = model(X)
            loss = criterion(predictions, y)
            loss.backward()
            optimizer.step()
            
            train_losses.append(loss.item())
            
            if epoch % 20 == 0:
                print(f"  Epoch {epoch}: Loss = {loss.item():.4f}")
        
        results[name] = train_losses
    
    # 可视化
    plt.figure(figsize=(10, 6))
    for name, losses in results.items():
        plt.plot(losses, label=name, linewidth=2)
    
    plt.xlabel('Epoch')
    plt.ylabel('Training Loss')
    plt.title('Effect of Dropout and BatchNorm')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.show()
    
    print("\n技巧总结:")
    print("1. Dropout: 防止过拟合,训练时随机丢弃神经元")
    print("2. BatchNorm: 稳定训练,加速收敛,减少对初始化的敏感度")
    print("3. 组合使用: Dropout在BatchNorm后使用效果更好")

dropout_batchnorm_comparison()

Dropout :训练时随机关闭一部分神经元,让模型不要死记硬背某些特征

  • Dropout 的作用

效果

原因

防止过拟合

不让模型依赖少数特征

强迫学习更鲁棒的特征

每次训练网络结构都不同

提升泛化能力

测试时不会关闭(更强)

class CNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 8, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(8, 16, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        self.dropout = nn.Dropout(0.5)  # ← 加入 Dropout
        self.fc = nn.Linear(16 * 7 * 7, 10)

    def forward(self, x):
        x = torch.relu(self.conv1(x))
        x = self.pool(torch.relu(self.conv2(x)))
        x = x.view(x.size(0), -1)
        x = self.dropout(x)  # ← 在全连接层前随机屏蔽神经元
        return self.fc(x)
  • Dropout 只在训练模式下生效

    • model.train() → 开启 dropout

    • model.eval() → 关闭 dropout

18.3

19.高级训练技巧

后续再补上,先去看ts

主题

已掌握

还需要补充

为什么非常重要

Numpy

❌ 广播规则更详细、视图 vs 拷贝、axis深度理解

Transformer中大量矩阵操作依赖它

PyTorch Tensor 基础

❌ inplace运算的风险、数据类型(float16/bfloat16)

训练显存优化必备

Autograd

❌ retain_graph、梯度裁剪、复杂网络梯度可视化

防梯度爆炸/消失

Optimizer

✔ (SGD/Adam)

❌ 学习率调度器(Scheduler)

加速训练收敛

Loss

基本

❌ CrossEntropy正确使用方法、KLDivLoss

训练语言模型会用到

nn.Module

❌ weight initialization 权重初始化策略

模型训练能否稳定取决于它

Dataset + Loader

❌ 多进程加载(num_workers)、pin_memory、collate_fn

NLP 数据加载会依赖这些

正/测试模式切换

❌ model.eval() + no_grad() 在推理必须同时使用

性能和输出稳定性依赖它

可视化

❌ 训练中实时查看曲线、保存训练对比

微调模型调参必用

任务类型

❌ 多分类、语言任务(对于你目标最关键)

人格AI属于语言模型