转载地址:https://bbs.huaweicloud.com/forum/thread-118716-1-1.html
作者:李响
梯度累积引入Mini-batch的概念,首先对每个Mini-batch的数据计算loss和梯度,但不立即更新模型参数,而是先对所得梯度进行累加,然后在指定数量(N)个Mini-batch之后,用累积后的梯度更新网络参数。下次训练前清空过往累积梯度后重新累加,如此往复。最终目的是为了达到跟直接用N*Mini-batch数据训练几乎同样的效果。
在单机模式下,主要通过将训练流程拆分为正向反向训练、参数更新和累积梯度清理三个部分实现梯度累积。
(以下以MNIST数据集为例)
导入需要的库文件:
import argparse
import os
from collections.abc import Iterable
import mindspore.nn as nn
from mindspore import ParameterTuple
from mindspore import context, DatasetHelper, save_checkpoint
from mindspore.nn import Cell
import mindspore.ops as ops
from model_zoo.official.cv.lenet.src.dataset import create_dataset
from model_zoo.official.cv.lenet.src.lenet import LeNet5
定义训练流程:
将训练流程拆分为正向反向训练、参数更新和累积梯度清理三个部分:
TrainForwardBackward计算loss和梯度,利用grad_sum实现梯度累加。
TrainOptim实现参数更新。
TrainClear实现对梯度累加变量grad_sum清零。
_sum_op = ops.MultitypeFuncGraph("grad_sum_op")
_clear_op = ops.MultitypeFuncGraph("clear_op")
@_sum_op.register("Tensor", "Tensor")
def _cumulative_grad(grad_sum, grad):
"""Apply grad sum to cumulative gradient."""
add = ops.AssignAdd()
return add(grad_sum, grad)
@_clear_op.register("Tensor", "Tensor")
def _clear_grad_sum(grad_sum, zero):
"""Apply zero to clear grad_sum."""
success = True
success = ops.depend(success, ops.assign(grad_sum, zero))
return success
class TrainForwardBackward(Cell):
def __init__(self, network, optimizer, grad_sum, sens=1.0):
super(TrainForwardBackward, self).__init__(auto_prefix=False)
self.network = network
self.network.set_grad()
self.network.add_flags(defer_inline=True)
self.weights = ParameterTuple(network.trainable_params())
self.optimizer = optimizer
self.grad_sum = grad_sum
self.grad = ops.GradOperation(get_by_list=True, sens_param=True)
self.sens = sens
self.hyper_map = ops.HyperMap()
def construct(self, *inputs):
weights = self.weights
loss = self.network(*inputs)
sens = ops.Fill()(ops.DType()(loss), ops.Shape()(loss), self.sens)
grads = self.grad(self.network, weights)(*inputs, sens)
return ops.depend(loss, self.hyper_map(ops.partial(_sum_op), self.grad_sum, grads))
class TrainOptim(Cell):
def __init__(self, optimizer, grad_sum):
super(TrainOptim, self).__init__(auto_prefix=False)
self.optimizer = optimizer
self.grad_sum = grad_sum
def construct(self):
return self.optimizer(self.grad_sum)
class TrainClear(Cell):
def __init__(self, grad_sum, zeros):
super(TrainClear, self).__init__(auto_prefix=False)
self.grad_sum = grad_sum
self.zeros = zeros
self.hyper_map = ops.HyperMap()
def construct(self):
success = self.hyper_map(ops.partial(_clear_op), self.grad_sum, self.zeros)
return success
训练并保存模型
调用网络、优化器及损失函数,然后自定义GradientAccumulation的train_process接口,进行模型训练。
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='MindSpore Grad Cumulative Example')
parser.add_argument('--device_target', type=str, default="GPU", choices=['GPU'],
help='device where the code will be implemented (default: GPU)')
parser.add_argument('--data_path', type=str, default="./Data",
help='path where the dataset is saved')
args = parser.parse_args()
context.set_context(mode=context.GRAPH_MODE, device_target=args.device_target)
ds_train = create_dataset(os.path.join(args.data_path, "train"), 32)
net = LeNet5(10)
net_loss = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction="mean")
net_opt = nn.Momentum(net.trainable_params(), 0.01, 0.9)
model = GradientAccumulation(net, net_loss, net_opt)
print("============== Starting Training ==============")
model.train_process(10, ds_train, mini_steps=4)
上面阐述的是单机模式,如果在并行模式下,则需要改变策略
在SEMI_AUTO_PARALLEL和AUTO_PARALLEL模式下使用梯度累积,主要是将累积迭代和更新迭代作为两张图下发并且交替执行。在累积迭代图上,只执行正反向运算及梯度累加。在更新迭代图上,执行正反向运算和参数更新。
定义并行训练流程
通常情况下,定义了正向网络后会使用TrainOneStepCell将网络正反向及优化器关联到一起。但是梯度累积时存在累积和更新两种情况,所以我们要基于原有类定义做一些改造。样例代码如下:
import numpy as np
import mindspore.common.dtype as mstype
from mindspore import ops, context, Tensor, Parameter
from mindspore.nn import TrainOneStepCell
from mindspore.common.initializer import initializer
zeroslike = ops.ZerosLike()
reset_accu_grads = ops.MultitypeFuncGraph("reset_accu_grads")
@reset_accu_grads.register("Tensor")
def _reset_accu_grads(accu_grad):
succ = True
return ops.depend(succ, ops.assign(accu_grad, zeroslike(accu_grad)))
cast = ops.Cast()
update_accu_grads = ops.MultitypeFuncGraph("update_accu_grads")
@update_accu_grads.register("Tensor", "Tensor")
def _update_accu_grads(accu_grad, grad):
succ = True
return ops.depend(succ, ops.assign_add(accu_grad, cast(grad, mstype.float32)))
class TrainAccuStepsCell(TrainOneStepCell):
def __init__(self, network, optimizer, sens=1.0):
super(TrainAccuStepsCell, self).__init__(network, optimizer, sens)
self.accumulation = False
self.accumulation_steps = context.get_auto_parallel_context("grad_accumulation_step")
self.accu_grads = self.weights.clone(prefix="accu_grads", init='zeros')
self.hyper_map = ops.HyperMap()
def construct(self, *inputs):
"""Defines the computation performed."""
weights = self.weights
loss = self.network(*inputs)
sens = ops.Fill()(ops.DType()(loss), ops.Shape()(loss), self.sens)
grads = self.grad(self.network, weights)(*inputs, sens)
if self.accumulation and self.accumulation_steps > 1:
accu_succ = self.hyper_map(update_accu_grads, self.accu_grads, grads)
loss = ops.depend(loss, accu_succ)
if self.accumulation:
succ = False
else:
grads = self.grad_reducer(grads)
accu_grads = ops.depend(self.accu_grads, grads)
accu_succ = self.hyper_map(reset_accu_grads, accu_grads)
loss = ops.depend(loss, accu_succ)
succ = self.optimizer(grads)
return ops.depend(loss, succ)
在TrainOneStepCell的基础上,增加累积标记accumulation和累积梯度参数accu_grads的定义,分别用于区分训练流程和保存累积梯度值。在累积迭代图上,accumulation为True,只执行正反向运算并将梯度累加到参数accu_grads。在更新迭代图上,accumulation为False,执行正反向运算和参数更新。在动态loss scale场景下,除了梯度需要累积外,溢出标志位也需要累积判断,可以基于TrainOneStepWithLossScaleCell改造,实现代码如下:
import numpy as np
import mindspore.common.dtype as mstype
from mindspore import ops, context, Tensor, Parameter
from mindspore.nn import TrainOneStepWithLossScaleCell
from mindspore.nn.wrap.loss_scale import _grad_scale
from mindspore.common.initializer import initializer
zeroslike = ops.ZerosLike()
reset_accu_grads = ops.MultitypeFuncGraph("reset_accu_grads")
@reset_accu_grads.register("Tensor")
def _reset_accu_grads(accu_grad):
succ = True
return ops.depend(succ, ops.assign(accu_grad, zeroslike(accu_grad)))
cast = ops.Cast()
update_accu_grads = ops.MultitypeFuncGraph("update_accu_grads")
@update_accu_grads.register("Tensor", "Tensor")
def _update_accu_grads(accu_grad, grad):
succ = True
return ops.depend(succ, ops.assign_add(accu_grad, cast(grad, mstype.float32)))
class TrainAccuStepsWithLossScaleCell(TrainOneStepWithLossScaleCell):
def __init__(self, network, optimizer, scale_sense):
super(TrainAccuStepsWithLossScaleCell, self).__init__(network, optimizer, scale_sense)
self.accumulation = False
self.accumulation_steps = context.get_auto_parallel_context("grad_accumulation_step")
self.one = Tensor(np.array([1]).astype(np.int32))
self.zero = Tensor(np.array([0]).astype(np.int32))
self.accu_grads = self.weights.clone(prefix="accu_grads", init='zeros')
self.accu_overflow = Parameter(initializer(0, [1], mstype.int32))
self.accu_loss = Parameter(initializer(0, [1], mstype.float32))
self.cast = ops.Cast()
self.logical_or = ops.LogicalOr()
self.not_equal = ops.NotEqual()
self.select = ops.Select()
self.reshape = ops.Reshape()
def construct(self, *inputs):
"""Defines the computation performed."""
weights = self.weights
loss = self.network(*inputs)
scaling_sens = self.scale_sense
status, scaling_sens = self.start_overflow_check(loss, scaling_sens)
scaling_sens_filled = ops.ones_like(loss) * ops.cast(scaling_sens, ops.dtype(loss))
grads = self.grad(self.network, weights)(*inputs, scaling_sens_filled)
# accumulate gradients
if self.accumulation and self.accumulation_steps > 1:
accu_succ = self.hyper_map(update_accu_grads, self.accu_grads, grads)
loss = ops.depend(loss, accu_succ)
overflow = self.get_overflow_status(status, grads)
overflow = self.logical_or(self.not_equal(self.accu_overflow, self.zero), overflow)
accu_overflow = self.select(overflow, self.one, self.zero)
if self.accumulation:
succ = False
self.accu_overflow = accu_overflow
else:
self.accu_overflow = self.zero
# apply grad reducer on grads
grads = self.grad_reducer(grads)
grads = self.hyper_map(ops.partial(_grad_scale, scaling_sens), grads)
accu_overflow = self.allreduce(accu_overflow)
overflow = self.less_equal(self.base, accu_overflow)
accu_grads = ops.depend(self.accu_grads, grads)
accu_succ = self.hyper_map(reset_accu_grads, accu_grads)
overflow = ops.depend(overflow, accu_succ)
overflow = self.reshape(overflow, (()))
overflow = self.process_loss_scale(overflow)
if overflow:
succ = False
else:
succ = self.optimizer(grads)
ret = (loss, overflow, scaling_sens)
return ops.depend(ret, succ)
定义并行训练模型
经过cell_wrapper封装的网络已经包含了正反向和优化器实现,我们还需要将数据集对接到网络并实现两张图交替执行。这里基于框架中的Model接口实现上述功能。
import math
from mindspore.train.callback import RunContext
from mindspore import context
from mindspore.context import ParallelMode
from mindspore import Model, connect_network_with_dataset
from mindspore.common.dtype import pytype_to_dtype
from mindspore._c_expression import init_exec_dataset
from mindspore.train.train_thor.dataset_helper import DatasetHelper
def _convert_type(types):
"""
Convert from numpy type to tensor type.
Args:
types (list): Numpy type list of element in dataset.
Returns:
list, list of element in dataset.
"""
ms_types = []
for np_type in types:
ms_type = pytype_to_dtype(np_type)
ms_types.append(ms_type)
return ms_types
def _get_types_and_shapes(dataset):
"""Get dataset types and shapes."""
dataset_types = _convert_type(dataset.output_types())
dataset_shapes = dataset.output_shapes()
return dataset_types, dataset_shapes
def _exec_datagraph(exec_dataset, dataset_size, phase='dataset'):
"""Initialize and execute the dataset graph."""
batch_size = exec_dataset.get_batch_size()
input_indexs = exec_dataset.input_indexs
# transform data format
dataset_types, dataset_shapes = _get_types_and_shapes(exec_dataset)
init_exec_dataset(exec_dataset.__transfer_dataset__.queue_name,
dataset_size,
batch_size,
dataset_types,
dataset_shapes,
input_indexs,
phase=phase,
need_run=False)
class Model_ACCU(Model):
def __init__(self, network, loss_fn=None, optimizer=None, metrics=None, eval_network=None,
eval_indexes=None, amp_level="O0", **kwargs):
super(Model_ACCU, self).__init__(network, loss_fn, optimizer, metrics, eval_network,
eval_indexes, amp_level, **kwargs)
self._frequency = context.get_auto_parallel_context("grad_accumulation_step")
self._train_network = self._build_train_network()
def _exec_preprocess(self, network, is_train, phase, dataset, dataset_sink_mode, sink_size=-1,
epoch_num=1, iter_first_order=1):
"""Initializes dataset."""
if dataset_sink_mode and not is_train:
dataset.__loop_size__ = 1
dataset_helper = DatasetHelper(dataset, dataset_sink_mode, sink_size, epoch_num, iter_first_order)
if dataset_sink_mode and context.get_context("device_target") != "GPU":
network = connect_network_with_dataset(network, dataset_helper)
network.set_train(is_train)
network.phase = phase
if self._parallel_mode in (ParallelMode.SEMI_AUTO_PARALLEL, ParallelMode.AUTO_PARALLEL):
network.set_auto_parallel()
return dataset_helper, network
def _train_dataset_sink_process(self, epoch, train_dataset, list_callback=None, cb_params=None, sink_size=-1):
"""
Training process. The data would be passed to network through dataset channel.
Args:
epoch (int): Total number of iterations on the data.
train_dataset (Dataset): A training dataset iterator. If there is no
loss_fn, a tuple with multiple data (data1, data2, data3, ...) should be
returned and passed to the network. Otherwise, a tuple (data, label) should
be returned. The data and label would be passed to the network and loss
function respectively.
list_callback (Callback): Executor of callback list. Default: None.
cb_params (_InternalCallbackParam): Callback parameters. Default: None.
sink_size (int): Control the amount of data in each sink. Default: -1.
"""
if sink_size == -1:
epoch_num = epoch
else:
epoch_num = math.ceil(epoch * sink_size / train_dataset.get_dataset_size())
iter_first_order = 1
iter_second_order = self._frequency - 1
train_dataset.__loop_size__ = iter_second_order
dataset_helper, train_network = self._exec_preprocess(self._train_network,
is_train=True,
phase='train',
dataset=train_dataset,
dataset_sink_mode=True,
sink_size=sink_size,
epoch_num=epoch_num,
iter_first_order=iter_first_order)
self._train_network = train_network
cb_params.train_network = self._train_network
cb_params.cur_step_num = 0
run_context = RunContext(cb_params)
list_callback.begin(run_context)
# used to stop training for early stop, such as stopAtTIme or stopATStep
should_stop = False
switch_branch_one = True
index_first_order = 0
train_network_init_flag = True
has_do_dataset_init = False
for i in range(epoch):
cb_params.cur_epoch_num = i + 1
list_callback.epoch_begin(run_context)
# for data sink dataset_helper only iter once, other wise iter epoch_size times.
for inputs in dataset_helper:
list_callback.step_begin(run_context)
if switch_branch_one:
cb_params.cur_step_num += iter_second_order
if train_network_init_flag:
self._train_network.add_flags_recursive(accumulation=True)
self._train_network.phase = 'train0'
else:
cb_params.cur_step_num += iter_first_order
if train_network_init_flag:
self._train_network.add_flags_recursive(accumulation=False)
train_network_init_flag = False
self._train_network.phase = 'train1'
if not has_do_dataset_init:
_exec_datagraph(train_dataset, iter_first_order, phase='train1_dataset')
has_do_dataset_init = True
switch_branch_one = not switch_branch_one
outputs = self._train_network(*inputs)
cb_params.net_outputs = outputs
list_callback.step_end(run_context)
list_callback.epoch_end(run_context)
should_stop = should_stop or run_context.get_stop_requested()
if should_stop:
break
dataset_helper.stop_send()
list_callback.end(run_context)
训练模型
完成上述定义后,即可利用训练接口完成模型训练。首先需要在context.set_auto_parallel_context配置grad_accumulation_step参数,使能梯度累积。其次利用改造的cell_warapper封装网络结构,传入Model_ACCU中初始化模型。
context.set_auto_parallel_context(parallel_mode=ParallelMode.AUTO_PARALLEL, gradients_mean=True, grad_accumulation_step=6)
loss_cb = LossMonitor()
data_path = os.getenv('DATA_PATH')
batch_size = 32
dataset = create_dataset(data_path, batch_size=batch_size)
num_classes = 10
net = resnet50(batch_size, num_classes)
loss = SoftmaxCrossEntropyExpand(sparse=True)
opt = Momentum(filter(lambda x: x.requires_grad, net.get_parameters()), 0.01, 0.9)
net_with_loss = nn.WithLossCell(net, loss)
net_with_loss = VirtualDatasetCell(net_with_loss)
wrap_net = TrainAccuStepsCell(net_with_loss, opt)
model = Model_ACCU(wrap_net)
model.train(epoch_size, dataset, callbacks=[loss_cb], dataset_sink_mode=True)