参考了PyTorch官方文档和Ray Tune官方文档
1、Hyperparameter tuning with Ray Tune — PyTorch Tutorials 1.9.1+cu102 documentation
2、How to use Tune with PyTorch — Ray v1.7.0
以PyTorch中的CIFAR 10图片分类为例,示范如何将Ray Tune融入PyTorch模型训练过程中。
Code:
from functools import partial
import numpy as np
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import random_split
import torchvision
import torchvision.transforms as transforms
from ray import tune
from ray.tune import CLIReporter
from ray.tune.schedulers import ASHAScheduler
# 定义神经网络模型
class Net(nn.Module):
def __init__(self, l1=120, l2=84):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(3, 6, 5)
self.pool = nn.MaxPool2d(2, 2)
self.conv2 = nn.Conv2d(6, 16, 5)
self.fc1 = nn.Linear(16 * 5 * 5, l1) # 参数待指定
self.fc2 = nn.Linear(l1, l2) # 参数待指定
self.fc3 = nn.Linear(l2, 10) # 参数待指定
def forward(self, x):
x = self.pool(F.relu(self.conv1(x)))
x = self.pool(F.relu(self.conv2(x)))
x = x.view(-1, 16 * 5 * 5)
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
x = self.fc3(x)
return x
# 封装数据加载过程,传递全局数据路径,以保证不同实验间共享数据路径
def load_data(data_dir="/home/taoshouzheng/Local_Connection/Algorithms/ray/"):
transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])
trainset = torchvision.datasets.CIFAR10(
root=data_dir, train=True, download=True, transform=transform)
testset = torchvision.datasets.CIFAR10(
root=data_dir, train=False, download=True, transform=transform)
return trainset, testset
# 封装训练脚本
# config参数用于指定超参数
# checkpoint_dir参数用于存储检查点
# data_dir参数用于指定数据加载和存储路径
def train_cifar(config, checkpoint_dir=None, data_dir=None):
# 模型实例化
net = Net(config["l1"], config["l2"]) # 2个超参数
# 这种写法保证没有GPU可用时模型也可以训练
device = "cpu"
if torch.cuda.is_available():
device = "cuda:0"
if torch.cuda.device_count() > 1:
# 将模型封装到nn.DataParallel中以支持多GPU并行训练
net = nn.DataParallel(net)
net.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=config["lr"], momentum=0.9) # 1个超参数
# 用于存储检查点
if checkpoint_dir:
# 模型的状态、优化器的状态
model_state, optimizer_state = torch.load(
os.path.join(checkpoint_dir, "checkpoint"))
net.load_state_dict(model_state)
optimizer.load_state_dict(optimizer_state)
trainset, testset = load_data(data_dir)
test_abs = int(len(trainset) * 0.8)
# 将训练数据划分为训练集(80%)和验证集(20%)
train_subset, val_subset = random_split(
trainset, [test_abs, len(trainset) - test_abs])
trainloader = torch.utils.data.DataLoader(
train_subset,
batch_size=int(config["batch_size"]), # 1个超参数
shuffle=True,
num_workers=8)
valloader = torch.utils.data.DataLoader(
val_subset,
batch_size=int(config["batch_size"]),
shuffle=True,
num_workers=8)
for epoch in range(10): # loop over the dataset multiple times
running_loss = 0.0
epoch_steps = 0
# 训练循环
for i, data in enumerate(trainloader, 0):
# get the inputs; data is a list of [inputs, labels]
inputs, labels = data
inputs, labels = inputs.to(device), labels.to(device)
# zero the parameter gradients
optimizer.zero_grad()
# forward + backward + optimize
outputs = net(inputs)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
# print statistics
running_loss += loss.item()
epoch_steps += 1
if i % 2000 == 1999: # print every 2000 mini-batches
print("[%d, %5d] loss: %.3f" % (epoch + 1, i + 1,
running_loss / epoch_steps))
running_loss = 0.0
# 验证循环
# Validation loss
val_loss = 0.0
val_steps = 0
total = 0
correct = 0
for i, data in enumerate(valloader, 0):
with torch.no_grad():
inputs, labels = data
inputs, labels = inputs.to(device), labels.to(device)
outputs = net(inputs)
_, predicted = torch.max(outputs.data, 1)
total += labels.size(0)
correct += (predicted == labels).sum().item()
loss = criterion(outputs, labels)
val_loss += loss.cpu().numpy()
val_steps += 1
# 保存检查点
# ray.tune.checkpoint_dir(step)返回检查点路径
with tune.checkpoint_dir(epoch) as checkpoint_dir:
path = os.path.join(checkpoint_dir, "checkpoint")
torch.save((net.state_dict(), optimizer.state_dict()), path)
# 打印平均损失和平均精度
tune.report(loss=(val_loss / val_steps), accuracy=correct / total)
print("Finished Training")
# 测试集精度
def test_accuracy(net, device="cpu"):
trainset, testset = load_data()
testloader = torch.utils.data.DataLoader(
testset, batch_size=4, shuffle=False, num_workers=2)
correct = 0
total = 0
with torch.no_grad():
for data in testloader:
images, labels = data
images, labels = images.to(device), labels.to(device)
outputs = net(images)
_, predicted = torch.max(outputs.data, 1)
total += labels.size(0)
correct += (predicted == labels).sum().item()
return correct / total
def main(num_samples=10, max_num_epochs=10, gpus_per_trial=2):
# 全局文件路径
data_dir = os.path.abspath("/home/taoshouzheng/Local_Connection/Algorithms/ray/")
# 加载训练数据
load_data(data_dir)
# 配置超参数搜索空间
# 每次实验,Ray Tune会随机采样超参数组合,并行训练模型,找到最优参数组合
config = {
# 自定义采样方法
"l1": tune.sample_from(lambda _: 2 ** np.random.randint(2, 9)),
"l2": tune.sample_from(lambda _: 2 ** np.random.randint(2, 9)),
# 随机分布采样
"lr": tune.loguniform(1e-4, 1e-1),
# 从类别型值中随机选择
"batch_size": tune.choice([2, 4, 8, 16])
}
# ASHAScheduler会根据指定标准提前中止坏实验
scheduler = ASHAScheduler(
metric="loss",
mode="min",
max_t=max_num_epochs,
grace_period=1,
reduction_factor=2)
# 在命令行打印实验报告
reporter = CLIReporter(
# parameter_columns=["l1", "l2", "lr", "batch_size"],
metric_columns=["loss", "accuracy", "training_iteration"])
# 执行训练过程
result = tune.run(
partial(train_cifar, data_dir=data_dir),
# 指定训练资源
resources_per_trial={"cpu": 8, "gpu": gpus_per_trial},
config=config,
num_samples=num_samples,
scheduler=scheduler,
progress_reporter=reporter)
# 找出最佳实验
best_trial = result.get_best_trial("loss", "min", "last")
# 打印最佳实验的参数配置
print("Best trial config: {}".format(best_trial.config))
print("Best trial final validation loss: {}".format(
best_trial.last_result["loss"]))
print("Best trial final validation accuracy: {}".format(
best_trial.last_result["accuracy"]))
# 打印最优超参数组合对应的模型在测试集上的性能
best_trained_model = Net(best_trial.config["l1"], best_trial.config["l2"])
device = "cpu"
if torch.cuda.is_available():
device = "cuda:0"
if gpus_per_trial > 1:
best_trained_model = nn.DataParallel(best_trained_model)
best_trained_model.to(device)
best_checkpoint_dir = best_trial.checkpoint.value
model_state, optimizer_state = torch.load(os.path.join(
best_checkpoint_dir, "checkpoint"))
best_trained_model.load_state_dict(model_state)
test_acc = test_accuracy(best_trained_model, device)
print("Best trial test set accuracy: {}".format(test_acc))
if __name__ == "__main__":
# You can change the number of GPUs per trial here:
main(num_samples=10, max_num_epochs=10, gpus_per_trial=0)