一、核心逻辑
先明确整体训练框架:数据准备→模型 / 优化器 / 损失函数初始化→多轮次训练(训练 + 验证)→保存最优模型→可视化,以下按函数拆分讲解。
(一)数据处理函数 train_val_data_process
def train_val_data_process():
train_data = FashionMNIST(root='./data',
train=True,
transform=transforms.Compose([transforms.Resize(size=28), transforms.ToTensor()]),
download=True)
# 划分训练/验证集(8:2)
train_data, val_data = Data.random_split(train_data, [round(0.8*len(train_data)), round(0.2*len(train_data))])
# 构建训练集DataLoader
train_dataloader = Data.DataLoader(dataset=train_data,
batch_size=32,
shuffle=True,
num_workers=2)
# 构建验证集DataLoader
val_dataloader = Data.DataLoader(dataset=val_data,
batch_size=32,
shuffle=True,
num_workers=2)
return train_dataloader, val_dataloader逻辑:
- 加载 FashionMNIST 训练集:transform将图片转为模型可接受的 Tensor 格式,download=True自动下载数据集;
- 拆分训练 / 验证集:避免用测试集验证,保证评估客观性;
- 封装 DataLoader:核心是batch_size=32(批次加载)、shuffle=True(打乱数据)、num_workers=2(多线程加速)。
(二)核心训练函数 train_model_process
1. 初始化核心组件
def train_model_process(model, train_dataloader, val_dataloader, num_epochs):
# 自动选择GPU/CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Adam优化器(自适应学习率,更新模型参数)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# 分类损失函数:交叉熵
criterion = nn.CrossEntropyLoss()
# 模型迁移到指定设备(GPU/CPU)
model = model.to(device)
# 保存最优模型参数(避免过拟合覆盖)
best_model_wts = copy.deepcopy(model.state_dict())关键:所有计算(模型 / 数据 / 损失)必须在同一设备,否则报错。
2. 训练过程参数初始化
best_acc = 0.0 # 最优验证准确率
train_loss_all = [] # 训练损失记录
val_loss_all = [] # 验证损失记录
train_acc_all = [] # 训练准确率记录
val_acc_all = [] # 验证准确率记录
since = time.time() # 训练开始时间(统计耗时)逻辑:每轮 epoch 重置累加变量,避免跨轮污染;记录指标用于后续可视化。
3. 多轮次(epoch)训练循环
for epoch in range(num_epochs):
print("Epoch {}/{}".format(epoch, num_epochs-1))
print("-"*10)
# 每轮重置累加变量
train_loss = 0.0
train_corrects = 0
val_loss = 0.0
val_corrects = 0
train_num = 0
val_num = 0epoch:每轮完整遍历一次训练集 + 验证集,是训练的基本单位。
4. 训练阶段(核心:前向→反向→参数更新)
for step, (b_x, b_y) in enumerate(train_dataloader):
# 数据迁移到设备(和模型同设备)
b_x = b_x.to(device)
b_y = b_y.to(device)
# 模型设为训练模式(启用Dropout/BatchNorm训练行为)
model.train()
# 前向传播:模型输出原始得分(logits,未做softmax)
output = model(b_x)
# 预测类别:取logits最大值对应的索引
pre_lab = torch.argmax(output, dim=1)
# 计算批次损失(交叉熵内置softmax,无需手动加)
loss = criterion(output, b_y)
# 梯度清零(PyTorch梯度累加,必须手动清空)
optimizer.zero_grad()
# 反向传播:计算参数梯度
loss.backward()
# 参数更新:Adam根据梯度调整权重
optimizer.step()
# 累加训练损失(批次平均损失×批次大小=总损失)
train_loss += loss.item() * b_x.size(0)
# 累加正确数(预测=真实标签)
train_corrects += torch.sum(pre_lab == b_y.data)
# 累加训练样本数
train_num += b_x.size(0)核心闭环:前向传播(算预测)→ 损失计算(衡量误差)→ 反向传播(算梯度)→ 参数更新(降低误差);
model.train():启用 Dropout(随机失活神经元)、BatchNorm(更新均值 / 方差),仅训练阶段生效。
5. 验证阶段(仅前向传播,无参数更新)
for step, (b_x, b_y) in enumerate(val_dataloader):
b_x = b_x.to(device)
b_y = b_y.to(device)
# 模型设为评估模式(关闭Dropout,固定BatchNorm)
model.eval()
output = model(b_x)
pre_lab = torch.argmax(output, dim=1)
loss = criterion(output, b_y)
# 仅累加指标,无反向传播/参数更新
val_loss += loss.item() * b_x.size(0)
val_corrects += torch.sum(pre_lab == b_y.data)
val_num += b_x.size(0)model.eval():避免验证阶段的随机性,保证评估结果稳定;
无优化器操作:验证仅评估模型性能,不更新参数。
6. 每轮指标计算与最优模型保存
# 计算每轮平均损失/准确率
train_loss_all.append(train_loss / train_num)
train_acc_all.append(train_corrects.double().item() / train_num)
val_loss_all.append(val_loss / val_num)
val_acc_all.append(val_corrects.double().item() / val_num)
# 打印轮次指标
print("{} train loss:{:.4f} train acc: {:.4f}".format(epoch, train_loss_all[-1], train_acc_all[-1]))
print("{} val loss:{:.4f} val acc: {:.4f}".format(epoch, val_loss_all[-1], val_acc_all[-1]))
# 保存最优模型(按验证准确率)
if val_acc_all[-1] > best_acc:
best_acc = val_acc_all[-1]
best_model_wts = copy.deepcopy(model.state_dict())
# 统计耗时
time_use = time.time() - since
print("训练和验证耗费的时间{:.0f}m{:.0f}s".format(time_use//60, time_use%60))最优模型保存:用copy.deepcopy避免浅拷贝导致参数覆盖;
耗时统计:评估训练效率,调整超参数(如 batch_size、epoch 数)。
7. 训练完成后保存模型 + 返回指标
# 保存最优模型参数
torch.save(best_model_wts, "C:/Users/86159/Desktop/LeNet/best_model.pth")
# 封装训练过程为DataFrame(用于可视化)
train_process = pd.DataFrame(data={"epoch":range(num_epochs),
"train_loss_all":train_loss_all,
"val_loss_all":val_loss_all,
"train_acc_all":train_acc_all,
"val_acc_all":val_acc_all,})
return train_process(三)可视化函数 matplot_acc_loss
def matplot_acc_loss(train_process):
plt.figure(figsize=(12, 4))
# 损失曲线
plt.subplot(1, 2, 1)
plt.plot(train_process['epoch'], train_process.train_loss_all, "ro-", label="Train loss")
plt.plot(train_process['epoch'], train_process.val_loss_all, "bs-", label="Val loss")
plt.legend()
plt.xlabel("epoch")
plt.ylabel("Loss")
# 准确率曲线
plt.subplot(1, 2, 2)
plt.plot(train_process['epoch'], train_process.train_acc_all, "ro-", label="Train acc")
plt.plot(train_process['epoch'], train_process.val_acc_all, "bs-", label="Val acc")
plt.xlabel("epoch")
plt.ylabel("acc")
plt.legend()
plt.show()作用:直观观察训练趋势(如训练损失下降、验证损失上升→过拟合)。
(四)主入口
if __name__ == '__main__':
LeNet = LeNet() # 加载模型
train_data, val_data = train_val_data_process() # 加载数据
train_process = train_model_process(LeNet, train_data, val_data, num_epochs=20) # 启动训练
matplot_acc_loss(train_process) # 可视化二、重点问题
问题 1:分类转回归的修改(代码 + 损失函数)
分类任务预测离散类别(如 0-9),回归任务预测连续值(如房价、温度),核心修改如下:
| 维度 | 分类任务 | 回归任务 |
|---|---|---|
| 模型输出层 | 维度 = 类别数(10) | 维度 = 1(单值回归) |
| 损失函数 | nn.CrossEntropyLoss() | nn.MSELoss()<br/>(MSE)/nn.L1Loss()<br/>(MAE) |
| 标签类型 | 整数(0-9) | 浮点型(如 3.14) |
| 评估指标 | 准确率 | MAE/RMSE(平均绝对误差 / 均方根误差) |
| 预测逻辑 | torch.argmax(output, 1) | output.squeeze()<br/>(直接取输出值) |
核心修改示例:
# 1. 回归损失函数
criterion = nn.MSELoss() # 替换交叉熵
# 2. 模型输出层修改(LeNet)
# 分类:self.fc3 = nn.Linear(84, 10)
# 回归:self.fc3 = nn.Linear(84, 1)
# 3. 训练阶段(回归)
for step, (b_x, b_y) in enumerate(train_dataloader):
b_x = b_x.to(device)
b_y = b_y.float().to(device) # 回归标签转浮点型
model.train()
output = model(b_x)
# 匹配维度:output.squeeze()去掉1维,和b_y对齐
loss = criterion(output.squeeze(), b_y)
optimizer.zero_grad()
loss.backward()
optimizer.step()
# 回归指标:MAE(替换准确率)
train_mae += torch.sum(torch.abs(output.squeeze() - b_y.data))
train_loss += loss.item() * b_x.size(0)
train_num += b_x.size(0)问题 2:交叉熵损失的内置逻辑
nn.CrossEntropyLoss() 是 PyTorch 分类任务的核心损失,内置 2 个关键步骤,无需手动实现:
- Step1:对模型输出(logits)做 Softmax将 “原始得分” 转为概率分布(所有类别概率和为 1)。
- Step2:对 Softmax 输出做负对数似然损失(NLLLoss)惩罚错误预测(正确类别概率越低,损失越大)。
关键注意:
- 模型输出无需手动加 Softmax,否则会导致 Softmax 被应用两次,损失计算错误;
- 等价于:
nn.CrossEntropyLoss() = nn.LogSoftmax() + nn.NLLLoss()。
完整代码如下:
import copy
import time
import torch
from torchvision.datasets import FashionMNIST
from torchvision import transforms
import torch.utils.data as Data
import numpy as np
import matplotlib.pyplot as plt
from model import LeNet
import torch.nn as nn
import pandas as pd
def train_val_data_process():
train_data = FashionMNIST(root='./data',
train=True,
transform=transforms.Compose([transforms.Resize(size=28), transforms.ToTensor()]),
download=True)
train_data, val_data = Data.random_split(train_data, [round(0.8*len(train_data)), round(0.2*len(train_data))])
train_dataloader = Data.DataLoader(dataset=train_data,
batch_size=32,
shuffle=True,
num_workers=2)
val_dataloader = Data.DataLoader(dataset=val_data,
batch_size=32,
shuffle=True,
num_workers=2)
return train_dataloader, val_dataloader
def train_model_process(model, train_dataloader, val_dataloader, num_epochs):
# 设定训练所用到的设备,有GPU用GPU没有GPU用CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# 使用Adam优化器,学习率为0.001
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# 损失函数为交叉熵函数
criterion = nn.CrossEntropyLoss()
# 将模型放入到训练设备中
model = model.to(device)
# 复制当前模型的参数
best_model_wts = copy.deepcopy(model.state_dict())
# 初始化参数
# 最高准确度
best_acc = 0.0
# 训练集损失列表
train_loss_all = []
# 验证集损失列表
val_loss_all = []
# 训练集准确度列表
train_acc_all = []
# 验证集准确度列表
val_acc_all = []
# 当前时间
since = time.time()
for epoch in range(num_epochs):
print("Epoch {}/{}".format(epoch, num_epochs-1))
print("-"*10)
# 初始化参数
# 训练集损失函数
train_loss = 0.0
# 训练集准确度
train_corrects = 0
# 验证集损失函数
val_loss = 0.0
# 验证集准确度
val_corrects = 0
# 训练集样本数量
train_num = 0
# 验证集样本数量
val_num = 0
# 对每一个mini-batch训练和计算
for step, (b_x, b_y) in enumerate(train_dataloader):
# 将特征放入到训练设备中
b_x = b_x.to(device)
# 将标签放入到训练设备中
b_y = b_y.to(device)
# 设置模型为训练模式
model.train()
# 前向传播过程,输入为一个batch,输出为一个batch中对应的预测
output = model(b_x)
# 查找每一行中最大值对应的行标
pre_lab = torch.argmax(output, dim=1)
# 计算每一个batch的损失函数
loss = criterion(output, b_y)
# 将梯度初始化为0
optimizer.zero_grad()
# 反向传播计算
loss.backward()
# 根据网络反向传播的梯度信息来更新网络的参数,以起到降低loss函数计算值的作用
optimizer.step()
# 对损失函数进行累加
train_loss += loss.item() * b_x.size(0)
# 如果预测正确,则准确度train_corrects加1
train_corrects += torch.sum(pre_lab == b_y.data)
# 当前用于训练的样本数量
train_num += b_x.size(0)
for step, (b_x, b_y) in enumerate(val_dataloader):
# 将特征放入到验证设备中
b_x = b_x.to(device)
# 将标签放入到验证设备中
b_y = b_y.to(device)
# 设置模型为评估模式
model.eval()
# 前向传播过程,输入为一个batch,输出为一个batch中对应的预测
output = model(b_x)
# 查找每一行中最大值对应的行标
pre_lab = torch.argmax(output, dim=1)
# 计算每一个batch的损失函数
loss = criterion(output, b_y)
# 对损失函数进行累加
val_loss += loss.item() * b_x.size(0)
# 如果预测正确,则准确度train_corrects加1
val_corrects += torch.sum(pre_lab == b_y.data)
# 当前用于验证的样本数量
val_num += b_x.size(0)
# 计算并保存每一次迭代的loss值和准确率
# 计算并保存训练集的loss值
train_loss_all.append(train_loss / train_num)
# 计算并保存训练集的准确率
train_acc_all.append(train_corrects.double().item() / train_num)
# 计算并保存验证集的loss值
val_loss_all.append(val_loss / val_num)
# 计算并保存验证集的准确率
val_acc_all.append(val_corrects.double().item() / val_num)
print("{} train loss:{:.4f} train acc: {:.4f}".format(epoch, train_loss_all[-1], train_acc_all[-1]))
print("{} val loss:{:.4f} val acc: {:.4f}".format(epoch, val_loss_all[-1], val_acc_all[-1]))
if val_acc_all[-1] > best_acc:
# 保存当前最高准确度
best_acc = val_acc_all[-1]
# 保存当前最高准确度的模型参数
best_model_wts = copy.deepcopy(model.state_dict())
# 计算训练和验证的耗时
time_use = time.time() - since
print("训练和验证耗费的时间{:.0f}m{:.0f}s".format(time_use//60, time_use%60))
# 选择最优参数,保存最优参数的模型
torch.save(best_model_wts, "./model.pth")
train_process = pd.DataFrame(data={"epoch":range(num_epochs),
"train_loss_all":train_loss_all,
"val_loss_all":val_loss_all,
"train_acc_all":train_acc_all,
"val_acc_all":val_acc_all,})
return train_process
def matplot_acc_loss(train_process):
# 显示每一次迭代后的训练集和验证集的损失函数和准确率
plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.plot(train_process['epoch'], train_process.train_loss_all, "ro-", label="Train loss")
plt.plot(train_process['epoch'], train_process.val_loss_all, "bs-", label="Val loss")
plt.legend()
plt.xlabel("epoch")
plt.ylabel("Loss")
plt.subplot(1, 2, 2)
plt.plot(train_process['epoch'], train_process.train_acc_all, "ro-", label="Train acc")
plt.plot(train_process['epoch'], train_process.val_acc_all, "bs-", label="Val acc")
plt.xlabel("epoch")
plt.ylabel("acc")
plt.legend()
plt.show()
if __name__ == '__main__':
# 加载需要的模型
LeNet = LeNet()
# 加载数据集
train_data, val_data = train_val_data_process()
# 利用现有的模型进行模型的训练
train_process = train_model_process(LeNet, train_data, val_data, num_epochs=20)
matplot_acc_loss(train_process)