NNDL 实验13 卷积神经网络(4)预训练ResNet-CIFAR10

22
18·
本文介绍了如何使用PyTorch的预训练ResNet18模型对CIFAR-10数据集进行图像分类,探讨了迁移学习的概念,比较了使用与不使用预训练模型两种情况下的效果,并展示了数据处理、模型构建、训练和评估的过程。
实验任务:使用预训练ResNet18实现CIFAR-10分类
数据集下载地址:https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz
数据集:CIFAR-10数据集

使用PyTorch高层API中的ResNet18进行图像分类实验。
torchvision.models.resnet18()
什么是“预训练模型”?什么是“迁移学习”?
比较“使用预训练模型”和“不使用预训练模型”的效果。
resnet = models.resnet18(pretrained=True)
resnet = models.resnet18(pretrained=False)
损失函数:交叉熵
优化器:Adam优化器,Adam优化器的介绍参考NNDL第7.2.4.3节
评价指标:准确率
1.1数据读取
在本实验中,将原始训练集拆分成了train_set、dev_set两个部分,分别包括40 000条和10 000条样本。将data_batch_1到data_batch_4作为训练集,data_batch_5作为验证集,test_batch作为测试集。 最终的数据集构成为:
- 训练集:40 000条样本。
- 验证集:10 000条样本。
- 测试集:10 000条样本。
读取一个batch数据的代码如下所示:
import os
import pickle

import numpy as np


def load_cifar10_batch(folder_path, batch_id=1, mode='train'):
    """Load one CIFAR-10 batch file and return its images and labels.

    Args:
        folder_path: Directory that contains the extracted batch files.
        batch_id: Number used to build the ``data_batch_<batch_id>`` file
            name. Ignored when ``mode == 'test'``.
        mode: ``'test'`` reads ``test_batch``; any other value reads
            ``data_batch_<batch_id>``.

    Returns:
        A tuple ``(imgs, labels)``. ``imgs`` is a float32 array reshaped to
        ``(N, 3, 32, 32)`` and divided by 255; ``labels`` is an array built
        from ``batch['labels']``.
    """
    if mode == 'test':
        file_name = 'test_batch'
    else:
        file_name = 'data_batch_' + str(batch_id)
    file_path = os.path.join(folder_path, file_name)

    # Load the dataset file.
    # NOTE: unpickling can execute arbitrary code; only load batch files
    # obtained from a trusted source.
    with open(file_path, 'rb') as batch_file:
        batch = pickle.load(batch_file, encoding='latin1')

    imgs = batch['data'].reshape((len(batch['data']), 3, 32, 32)) / 255.
    labels = batch['labels']
    return np.array(imgs, dtype='float32'), np.array(labels)


imgs_batch, labels_batch = load_cifar10_batch(
    folder_path=r'C:\Users\320\PycharmProjects\pythonProject/cifar-10-batches-py',
    batch_id=1,
    mode='train',
)

# Check the dimensions of the data:
# Print the dimensions of X and y in each batch.
print("batch of imgs shape: ", imgs_batch.shape, "batch of labels shape: ", labels_batch.shape)
可视化观察其中的一张样本图像和对应的标签,代码如下所示:
调用matplotlib库:
import matplotlib.pyplot as plt

# Main script: show one sample image together with its label.
image, label = imgs_batch[3], labels_batch[3]
print("The label in the picture is {}".format(label))

plt.figure(figsize=(2, 2))
# Move the first axis to the end before plotting; imshow takes colour
# images with the channel axis last.
plt.imshow(image.transpose(1, 2, 0))
plt.savefig('cnn-car.pdf')

1.3 构造Dataset类
import os
import pickle

import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms


def load_cifar10_batch(folder_path, batch_id=1, mode='train'):
    """Load one CIFAR-10 batch file and return its images and labels.

    Args:
        folder_path: Directory that contains the extracted batch files.
        batch_id: Number used to build the ``data_batch_<batch_id>`` file
            name. Ignored when ``mode == 'test'``.
        mode: ``'test'`` reads ``test_batch``; any other value reads
            ``data_batch_<batch_id>``.

    Returns:
        A tuple ``(imgs, labels)``. ``imgs`` is a float32 array reshaped to
        ``(N, 3, 32, 32)`` and divided by 255; ``labels`` is an array built
        from ``batch['labels']``.
    """
    if mode == 'test':
        file_name = 'test_batch'
    else:
        file_name = 'data_batch_' + str(batch_id)
    file_path = os.path.join(folder_path, file_name)

    # Load the dataset file.
    # NOTE: unpickling can execute arbitrary code; only load batch files
    # obtained from a trusted source.
    with open(file_path, 'rb') as batch_file:
        batch = pickle.load(batch_file, encoding='latin1')

    imgs = batch['data'].reshape((len(batch['data']), 3, 32, 32)) / 255.
    labels = batch['labels']
    return np.array(imgs, dtype='float32'), np.array(labels)


imgs_batch, labels_batch = load_cifar10_batch(
    folder_path=r'C:\Users\320\PycharmProjects\pythonProject/cifar-10-batches-py',
    batch_id=1,
    mode='train',
)


class CIFAR10Dataset(Dataset):
    """CIFAR-10 split backed by the pickled batch files.

    Args:
        folder_path: Directory that contains the extracted batch files.
        mode: ``'train'`` concatenates ``data_batch_1`` to ``data_batch_4``,
            ``'dev'`` loads ``data_batch_5`` and ``'test'`` loads
            ``test_batch``.

    Raises:
        ValueError: If ``mode`` is not one of the three supported values.
    """

    def __init__(self, folder_path=r'C:\Users\320\PycharmProjects\pythonProject\cifar-10-batches-py', mode='train'):
        if mode == 'train':
            # Load batch1-batch4 as the training set; concatenate once
            # instead of re-copying the growing array on every iteration.
            batches = [
                load_cifar10_batch(folder_path=folder_path, batch_id=i, mode='train')
                for i in range(1, 5)
            ]
            self.imgs = np.concatenate([imgs for imgs, _ in batches])
            self.labels = np.concatenate([labels for _, labels in batches])
        elif mode == 'dev':
            # Load batch5 as the validation set.
            self.imgs, self.labels = load_cifar10_batch(folder_path=folder_path, batch_id=5, mode='dev')
        elif mode == 'test':
            # Load the test set.
            self.imgs, self.labels = load_cifar10_batch(folder_path=folder_path, mode='test')
        else:
            raise ValueError(f"mode must be 'train', 'dev' or 'test', got {mode!r}")

        # The loader already returns float32 arrays laid out as (3, 32, 32),
        # so only per-channel normalisation is required. Resize/ToTensor are
        # left out: ToTensor interprets ndarrays as H x W x C and would
        # scramble the axes of these channel-first samples.
        self.transforms = transforms.Compose([
            transforms.Normalize(mean=[0.4914, 0.4822, 0.4465], std=[0.2023, 0.1994, 0.2010]),
        ])

    def __getitem__(self, idx):
        """Return the normalised image tensor and the label at ``idx``."""
        img, label = self.imgs[idx], self.labels[idx]
        # The attribute is ``self.transforms`` (plural); Normalize works on
        # tensors, so convert the ndarray first.
        img = self.transforms(torch.from_numpy(img))
        return img, label

    def __len__(self):
        """Return the number of samples in this split."""
        return len(self.imgs)


train_dataset = CIFAR10Dataset(folder_path=r'C:\Users\320\PycharmProjects\pythonProject\cifar-10-batches-py', mode='train')
dev_dataset = CIFAR10Dataset(folder_path=r'C:\Users\320\PycharmProjects\pythonProject\cifar-10-batches-py', mode='dev')
test_dataset = CIFAR10Dataset(folder_path=r'C:\Users\320\PycharmProjects\pythonProject/cifar-10-batches-py', mode='test')

# 2.1 Model construction
使用PyTorch API中的ResNet18进行图像分类实验
from torchvision.models import resnet18

# Build ResNet18 with pretrained weights requested (pretrained=True).
# NOTE(review): newer torchvision releases deprecate `pretrained=` in favour
# of the `weights=` argument -- confirm against the installed version.
resnet18_model = resnet18(pretrained=True)

# 3.1 Model training
复用RunnerV3类,实例化RunnerV3类,并传入训练配置。
使用训练集和验证集进行模型训练,共训练30个epoch。
在实验中,保存准确率最高的模型作为最佳模型。代码实现如下:
其中RunnerV3和Accuracy代码可以直接调用飞桨中的nndl
RunnerV3代码:
class RunnerV3(object):
    """Training/evaluation driver for a model, optimizer, loss and metric.

    Args:
        model: Module to train; called as ``model(X)``.
        optimizer: Optimizer whose ``step()`` / ``zero_grad()`` are called
            once per training batch.
        loss_fn: Callable invoked as ``loss_fn(logits, labels)``.
        metric: Object providing ``reset()``, ``update(logits, labels)`` and
            ``accumulate()``; used only to compute the evaluation metric.
    """

    def __init__(self, model, optimizer, loss_fn, metric, **kwargs):
        self.model = model
        self.optimizer = optimizer
        self.loss_fn = loss_fn
        self.metric = metric  # used only to compute the evaluation metric

        # Metric values recorded during training.
        self.dev_scores = []

        # Loss values recorded during training.
        self.train_epoch_losses = []  # one entry per epoch
        self.train_step_losses = []  # one (global_step, loss) entry per step
        self.dev_losses = []  # one (global_step, loss) entry per evaluation
        # Best metric value seen so far.
        self.best_score = 0

    def _device(self):
        """Return the device of the model's first parameter (CPU if none)."""
        first_param = next(self.model.parameters(), None)
        if first_param is None:
            return torch.device("cpu")
        return first_param.device

    def train(self, train_loader, dev_loader=None, **kwargs):
        """Train the model and optionally evaluate it every ``eval_steps``.

        Args:
            train_loader: Sized iterable yielding ``(X, y)`` batches.
            dev_loader: Sized iterable yielding ``(X, y)`` batches; required
                when ``eval_steps`` is non-zero.
            **kwargs: ``num_epochs`` (default 0), ``log_steps`` (default
                100), ``eval_steps`` (default 0), ``save_path`` (default
                ``"best_model.pdparams"``) and ``custom_print_log``
                (callable receiving the runner after each backward pass).

        Raises:
            RuntimeError: If ``eval_steps`` is set but the metric or
                ``dev_loader`` is ``None``.
        """
        # Switch the model to training mode.
        self.model.train()

        num_epochs = kwargs.get("num_epochs", 0)
        log_steps = kwargs.get("log_steps", 100)
        eval_steps = kwargs.get("eval_steps", 0)
        save_path = kwargs.get("save_path", "best_model.pdparams")
        custom_print_log = kwargs.get("custom_print_log", None)

        # Total number of training steps.
        num_training_steps = num_epochs * len(train_loader)

        if eval_steps:
            if self.metric is None:
                raise RuntimeError('Error: Metric can not be None!')
            if dev_loader is None:
                raise RuntimeError('Error: dev_loader can not be None!')

        # Move batches to wherever the model lives instead of relying on a
        # module-level ``device`` variable.
        device = self._device()

        # Number of steps executed so far.
        global_step = 0

        for epoch in range(num_epochs):
            # Running sum of the training loss; plain floats are accumulated
            # so that no tensors are retained across steps.
            total_loss = 0.0
            for X, y in train_loader:
                logits = self.model(X.to(device))
                loss = self.loss_fn(logits, y.long().to(device))  # mean-reduced by default
                loss_value = loss.item()
                total_loss += loss_value

                # Record the loss of every step.
                self.train_step_losses.append((global_step, loss_value))

                if log_steps and global_step % log_steps == 0:
                    print(
                        f"[Train] epoch: {epoch}/{num_epochs}, step: {global_step}/{num_training_steps}, loss: {loss_value:.5f}")

                # Back-propagate to compute the gradient of every parameter.
                loss.backward()

                if custom_print_log:
                    custom_print_log(self)

                # Mini-batch gradient descent update, then clear gradients.
                self.optimizer.step()
                self.optimizer.zero_grad()

                # Evaluate every ``eval_steps`` steps and on the last step.
                if eval_steps > 0 and global_step > 0 and \
                        (global_step % eval_steps == 0 or global_step == (num_training_steps - 1)):
                    dev_score, dev_loss = self.evaluate(dev_loader, global_step=global_step)
                    print(f"[Evaluate] dev score: {dev_score:.5f}, dev loss: {dev_loss:.5f}")

                    # evaluate() switched to eval mode; switch back.
                    self.model.train()

                    # Save the model whenever the metric improves.
                    if dev_score > self.best_score:
                        self.save_model(save_path)
                        print(
                            f"[Evaluate] best accuracy performence has been updated: {self.best_score:.5f} --> {dev_score:.5f}")
                        self.best_score = dev_score

                global_step += 1

            # Mean training loss of the current epoch.
            trn_loss = total_loss / len(train_loader)
            self.train_epoch_losses.append(trn_loss)

        print("[Train] Training done!")

    # Evaluation runs under torch.no_grad() so no gradients are computed or stored.
    @torch.no_grad()
    def evaluate(self, dev_loader, **kwargs):
        """Evaluate on ``dev_loader`` and return ``(dev_score, dev_loss)``.

        When a ``global_step`` keyword other than -1 is supplied, the loss
        and score are also appended to ``dev_losses`` / ``dev_scores``.
        """
        assert self.metric is not None

        # Switch the model to evaluation mode.
        self.model.eval()

        global_step = kwargs.get("global_step", -1)
        device = self._device()

        # Running sum of the loss over the evaluation batches.
        total_loss = 0

        # Reset the metric.
        self.metric.reset()

        for X, y in dev_loader:
            logits = self.model(X.to(device))
            total_loss += self.loss_fn(logits, y.long().to(device)).item()
            self.metric.update(logits, y.to(device))

        dev_loss = (total_loss / len(dev_loader))
        dev_score = self.metric.accumulate()

        # Record the validation loss and score.
        if global_step != -1:
            self.dev_losses.append((global_step, dev_loss))
            self.dev_scores.append(dev_score)

        return dev_score, dev_loss

    # Prediction runs under torch.no_grad() so no gradients are computed or stored.
    @torch.no_grad()
    def predict(self, x, **kwargs):
        """Run a forward pass in evaluation mode and return the logits."""
        self.model.eval()
        logits = self.model(x.to(self._device()))
        return logits

    def save_model(self, save_path):
        """Save the model's ``state_dict`` to ``save_path``."""
        torch.save(self.model.state_dict(), save_path)

    def load_model(self, model_path):
        """Load a ``state_dict`` from ``model_path`` into the model."""
        # map_location lets a checkpoint saved on another device be loaded
        # onto the device the model currently uses.
        state_dict = torch.load(model_path, map_location=self._device())
        self.model.load_state_dict(state_dict)

# Accuracy:
class Accuracy():
    """Running classification accuracy accumulated over batches.

    Args:
        is_logist: Only used for single-column (binary) outputs. When true,
            outputs are thresholded at 0; otherwise they are thresholded
            at 0.5.
    """

    def __init__(self, is_logist=True):
        # Number of correctly predicted samples.
        self.num_correct = 0
        # Total number of samples seen.
        self.num_count = 0

        self.is_logist = is_logist

    def update(self, outputs, labels):
        """Add one batch of predictions to the running counts.

        Args:
            outputs: Tensor whose second dimension is 1 (binary task) or
                greater than 1 (multi-class task).
            labels: Tensor of ground-truth labels, one per sample.
        """
        # shape[1] == 1 means binary classification, shape[1] > 1 multi-class.
        if outputs.shape[1] == 1:
            outputs = torch.squeeze(outputs, dim=-1)
            # Logits are thresholded at 0, other scores at 0.5.
            threshold = 0 if self.is_logist else 0.5
            preds = (outputs >= threshold).float()
        else:
            # Multi-class: the index of the largest score is the prediction.
            preds = torch.argmax(outputs, dim=1)

        # Flatten instead of squeezing so that a one-sample batch stays 1-D
        # and can still be counted.
        labels = labels.reshape(-1)
        batch_correct = (preds == labels).sum().item()
        batch_count = labels.numel()

        # Update num_correct and num_count.
        self.num_correct += batch_correct
        self.num_count += batch_count

    def accumulate(self):
        """Return the accuracy over all accumulated samples (0 if none)."""
        if self.num_count == 0:
            return 0
        return self.num_correct / self.num_count

    def reset(self):
        """Reset the correct and total counters to zero."""
        self.num_correct = 0
        self.num_count = 0

    def name(self):
        """Return the metric name."""
        return "Accuracy"

# (Site notice) Container images related to this article that you may be interested in

PyTorch 2.5
PyTorch 是一个开源的 Python 机器学习库,基于 Torch 库,底层由 C++ 实现,应用于人工智能领域,如计算机视觉和自然语言处理
















345





















