[源码解析] PyTorch 分布式(15) --- 使用分布式 RPC 框架实现参数服务器

[源码解析] PyTorch 分布式(15) --- 使用分布式 RPC 框架实现参数服务器

0x00 摘要

在前面的文章之中,我们已经学习了PyTorch 分布式的基本模块,接下来我们通过几篇文章来看看如何把这些模块应用到实践之中,顺便把PyTorch分布式逻辑整体梳理一下。本文介绍如何使用分布式 RPC 框架实现参数服务器。

本文以 https://pytorch.org/tutorials/intermediate/rpc_param_server_tutorial.html 为基础,加入了自己的理解。

PyTorch分布式其他文章如下:

深度学习利器之自动微分(1)

深度学习利器之自动微分(2)

[源码解析]深度学习利器之自动微分(3) --- 示例解读

[源码解析]PyTorch如何实现前向传播(1) --- 基础类(上)

[源码解析]PyTorch如何实现前向传播(2) --- 基础类(下)

[源码解析] PyTorch如何实现前向传播(3) --- 具体实现

[源码解析] Pytorch 如何实现后向传播 (1)---- 调用引擎

[源码解析] Pytorch 如何实现后向传播 (2)---- 引擎静态结构

[源码解析] Pytorch 如何实现后向传播 (3)---- 引擎动态逻辑

[源码解析] PyTorch 如何实现后向传播 (4)---- 具体算法

[源码解析] PyTorch 分布式(1)------历史和概述

[源码解析] PyTorch 分布式(2) ----- DataParallel(上)

[源码解析] PyTorch 分布式(3) ----- DataParallel(下)

[源码解析] PyTorch 分布式(4)------分布式应用基础概念

[源码解析] PyTorch分布式(5) ------ DistributedDataParallel 总述&如何使用

[源码解析] PyTorch分布式(6) ---DistributedDataParallel -- 初始化&store

[源码解析] PyTorch 分布式(7) ----- DistributedDataParallel 之进程组

[源码解析] PyTorch 分布式(8) -------- DistributedDataParallel之论文篇

[源码解析] PyTorch 分布式(9) ----- DistributedDataParallel 之初始化

[源码解析] PyTorch 分布式(10)------DistributedDataParallel 之 Reducer静态架构

[源码解析] PyTorch 分布式(11) ----- DistributedDataParallel 之 构建Reducer和Join操作

[源码解析] PyTorch 分布式(12) ----- DistributedDataParallel 之 前向传播

[源码解析] PyTorch 分布式(13) ----- DistributedDataParallel 之 反向传播

[源码解析] PyTorch 分布式 Autograd (1) ---- 设计

[源码解析] PyTorch 分布式 Autograd (2) ---- RPC基础

[源码解析] PyTorch 分布式 Autograd (3) ---- 上下文相关

[源码解析] PyTorch 分布式 Autograd (4) ---- 如何切入引擎

[源码解析] PyTorch 分布式 Autograd (5) ---- 引擎(上)

[源码解析] PyTorch 分布式 Autograd (6) ---- 引擎(下)

[源码解析] PyTorch分布式优化器(1)----基石篇

[源码解析] PyTorch分布式优化器(2)----数据并行优化器

[源码解析] PyTorch分布式优化器(3)---- 模型并行

[源码解析] PyTorch 分布式(14) --使用 Distributed Autograd 和 Distributed Optimizer

注:本文没有完全按照原文顺序进行翻译,而是按照自己理解的思路重新组织了文章。

0x01 综述

本教程介绍了一个使用 PyTorch 的分布式 RPC 框架实现参数服务器的简单示例。参数服务器框架是一种范式,其中包括一组用来存储参数(例如大型嵌入表)的服务器,多个训练器查询参数服务器以检索最新的参数。这些训练器可以在本地运行一个训练循环,间或与参数服务器同步以获得最新的参数。有关参数服务器方法的更多信息,请查看https://www.cs.cmu.edu/~muli/file/parameter_server_osdi14.pdf。

我们将使用分布式 RPC 框架构建一个示例,其中多个trainer使用 RPC 与同一个参数服务器进行通信,并使用RRef访问远程参数服务器实例上的状态。每个trainer将通过使用分布式 autograd 跨多个节点拼接了一个 autograd 计算图,并且以分布式方式启动每个trainer自己的反向传播。

注意:本教程介绍了分布式 RPC 框架的使用,该框架可用于将模型拆分到多台机器上,或用于实现参数服务器训练策略(trainer获取托管在一个不同机器上的参数)。如果您正在寻找跨多个 GPU 复制模型进行数据并行训练,请参阅分布式数据并行教程

0x02 基础网络

我们首先要介绍一下基础网络。让我们从熟悉的开始:导入我们所需的模块并定义一个简单的 ConvNet,它将在 MNIST 数据集上进行训练。下面的网络主要来自pytorch/examples repo 中定义的网络。

# --------- MNIST Network to train, from pytorch/examples -----
class Net(nn.Module):
def __init__(self, num_gpus=0):
super(Net, self).__init__()
print(f"Using {num_gpus} GPUs to train")
self.num_gpus = num_gpus
device = torch.device(
"cuda:0" if torch.cuda.is_available() and self.num_gpus > 0 else "cpu")
print(f"Putting first 2 convs on {str(device)}")
# Put conv layers on the first cuda device
self.conv1 = nn.Conv2d(1, 32, 3, 1).to(device)
self.conv2 = nn.Conv2d(32, 64, 3, 1).to(device)
# Put rest of the network on the 2nd cuda device, if there is one
if "cuda" in str(device) and num_gpus > 1:
device = torch.device("cuda:1") print(f"Putting rest of layers on {str(device)}")
self.dropout1 = nn.Dropout2d(0.25).to(device)
self.dropout2 = nn.Dropout2d(0.5).to(device)
self.fc1 = nn.Linear(9216, 128).to(device)
self.fc2 = nn.Linear(128, 10).to(device) def forward(self, x):
x = self.conv1(x)
x = F.relu(x)
x = self.conv2(x)
x = F.max_pool2d(x, 2) x = self.dropout1(x)
x = torch.flatten(x, 1)
# Move tensor to next device if necessary
next_device = next(self.fc1.parameters()).device
x = x.to(next_device) x = self.fc1(x)
x = F.relu(x)
x = self.dropout2(x)
x = self.fc2(x)
output = F.log_softmax(x, dim=1)
return output

0x03 辅助函数

接下来,让我们定义一些对我们脚本的其余部分有用的辅助函数。下面使用rpc_syncRRef来定义一个函数,该函数调用位于远程节点上的对象上的给定方法。我们由给定的rref参数生成远程对象的句柄,这样我们可以在拥有它的节点(rref.owner())上运行这个远程对象。在调用者节点上,我们通过使用 rpc_sync来同步运行此命令,这意味着我们将阻塞直到收到响应。

# --------- Helper Methods --------------------

# On the local node, call a method with first arg as the value held by the
# RRef. Other args are passed in as arguments to the function called.
# Useful for calling instance methods.
def call_method(method, rref, *args, **kwargs):
return method(rref.local_value(), *args, **kwargs) # Given an RRef, return the result of calling the passed in method on the value
# held by the RRef. This call is done on the remote node that owns
# the RRef. args and kwargs are passed into the method.
# Example: If the value held by the RRef is of type Foo, then
# remote_method(Foo.bar, rref, arg1, arg2) is equivalent to calling
# <foo_instance>.bar(arg1, arg2) on the remote node and getting the result
# back.
def remote_method(method, rref, *args, **kwargs):
args = [method, rref] + list(args)
return rpc.rpc_sync(rref.owner(), call_method, args=args, kwargs=kwargs)

0x04 启动

4.1 启动方式

要在本地运行此用例,需要在单独的终端窗口运行如下命令来启动master和worker:python rpc_parameter_server.py --world_size=WORLD_SIZE --rank=RANK

  • 对于 world size 大小为 2 的master节点,命令是:python rpc_parameter_server.py --world_size=2 --rank=0

  • 对于trainer,命令是:python rpc_parameter_server.py --world_size=2 --rank=1

请注意,本教程假设使用 0 到 2 个 GPU 进行训练,可以通过传递--num_gpus=N到训练脚本来配置此参数。当trainer和master在不同机器上运行时,您可以传入命令行参数--master_addr=ADDRESS--master_port=PORT来标明master worker 正在侦听的地址和端口。

4.2 启动脚本

首先,我们来看看启动参数服务器和训练器所需要的各种参数。

  • world_size对应于将参与训练的节点总数,是所有训练器和参数服务器的总和。
  • 我们还必须为每个单独的进程传递一个唯一的rank,该值从 0(在其中将运行一个参数服务器)到world_size - 1
  • master_addrmaster_port被用来标示 rank 0 进程,其将被各个节点用于发现彼此。
  • 要在本地测试此示例,只需传入localhost和相同的master_port到所有的实例即可。

请注意,出于演示目的,此示例仅支持 0-2 个 GPU,但可以扩展该模式以使用其他 GPU。

# --------- Launcher --------------------
if __name__ == '__main__':
parser = argparse.ArgumentParser(
description="Parameter-Server RPC based training")
parser.add_argument(
"--world_size",
type=int,
default=4,
help="""Total number of participating processes. Should be the sum of
master node and all training nodes.""")
parser.add_argument(
"--rank",
type=int,
default=None,
help="Global rank of this process. Pass in 0 for master.")
parser.add_argument(
"--num_gpus",
type=int,
default=0,
help="""Number of GPUs to use for training, currently supports between 0
and 2 GPUs. Note that this argument will be passed to the parameter servers.""")
parser.add_argument(
"--master_addr",
type=str,
default="localhost",
help="""Address of master, will default to localhost if not provided.
Master must be able to accept network traffic on the address + port.""")
parser.add_argument(
"--master_port",
type=str,
default="29500",
help="""Port that master is listening on, will default to 29500 if not
provided. Master must be able to accept network traffic on the host and port.""") args = parser.parse_args()
assert args.rank is not None, "must provide rank argument."
assert args.num_gpus <= 3, f"Only 0-2 GPUs currently supported (got {args.num_gpus})."
os.environ['MASTER_ADDR'] = args.master_addr
os.environ['MASTER_PORT'] = args.master_port

现在,我们将根据命令行参数创建一个参数服务器进程或者或训练器进程。如果传入的rank为 0,我们将创建一个 ParameterServer,否则创建一个 TrainerNet

请注意,我们使用torch.multiprocessing启动与我们要执行的函数相对应的子进程,并在主线程使用p.join() 来等待进程结束。我们也使用PyTorch dataloaders 来生成从MNIST数据集加载数据的训练 data loaders 和测试 data loaders。

processes = []
world_size = args.world_size
if args.rank == 0:
# 这里是参数服务器
p = mp.Process(target=run_parameter_server, args=(0, world_size))
p.start()
processes.append(p)
else:
# 这里是trainer
# Get data to train on
train_loader = torch.utils.data.DataLoader(
datasets.MNIST('../data', train=True, download=True,
transform=transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
])),
batch_size=32, shuffle=True)
test_loader = torch.utils.data.DataLoader(
datasets.MNIST('../data', train=False,
transform=transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
])),
batch_size=32, shuffle=True)
# start training worker on this node
p = mp.Process(
target=run_worker, # 启动trainer
args=(
args.rank,
world_size, args.num_gpus,
train_loader,
test_loader))
p.start()
processes.append(p) for p in processes:
p.join()

目前逻辑如下,我们假设一个master,一个 worker,其将会在自己进程之中运行不同的代码:

rpc_parameter_server.py      Master      +    Worker         rpc_parameter_server.py
+ | +
| | |
| rank == 0 | |
| | |
v | v
|
run_parameter_server | mp.Process(run_worker)
|
|
|
+

4.3 启动参数服务器

首先,我们将初始化我们的参数服务器。请注意,所有进程中只有一个参数服务器实例,所有训练器都将与同一个参数服务器对话并更新同一个存储模型。如 run_parameter_server中所见,服务器本身不执行任何独立行动,它只是等待来自训练器的请求,并通过运行请求的函数来响应它们。

代码中主要两步:为参数服务器初始化rpc 和 rpc.shutdown()。注意,这里没有真正初始化参数服务器

注意,rpc.shutdown()不会立即关闭参数服务器。相反,它将等待所有worker(在本例中为trainer)也调用rpc.shutdown()。这保证了在所有 trainer完成训练过程之前参数服务器不会宕机。

def run_parameter_server(rank, world_size):
# The parameter server just acts as a host for the model and responds to
# requests from trainers, hence it does not need to run a loop.
# rpc.shutdown() will wait for all workers to complete by default, which
# in this case means that the parameter server will wait for all trainers
# to complete, and then exit.
print("PS master initializing RPC")
rpc.init_rpc(name="parameter_server", rank=rank, world_size=world_size)
print("RPC initialized! Running parameter server...")
rpc.shutdown() # 保证不会宕机
print("RPC shutdown on parameter server.")

逻辑拓展为:

          rpc_parameter_server.py      Master      +    Worker    rpc_parameter_server.py
+ | +
| | |
| rank == 0 | |
| | |
v | v
|
run_parameter_server | mp.Process(run_worker)
+ |
| |
| |
v |
|
rpc.init_rpc("parameter_server", rank, world_size) |
|
+

4.4 启动worker

run_worker 函数其内部逻辑也是启动 rpc,然后进入了主循环。

# Main loop for trainers.
def run_worker(rank, world_size, num_gpus, train_loader, test_loader):
print(f"Worker rank {rank} initializing RPC")
rpc.init_rpc(
name=f"trainer_{rank}",
rank=rank,
world_size=world_size) print(f"Worker {rank} done initializing RPC") run_training_loop(rank, num_gpus, train_loader, test_loader)
rpc.shutdown()

逻辑拓展为:

 rpc_parameter_server.py      Master      +    Worker    rpc_parameter_server.py
+ | +
| | |
| rank == 0 | |
| | |
v | v
|
run_parameter_server | mp.Process(run_worker)
+ | +
| | |
| | |
v | |
| v
rpc.init_rpc("parameter_server" | run_worker
,rank,world_size) | +
| |
| |
| |
| v
| rpc.init_rpc(f"trainer_{rank}"
| ,rank,world_size)
| +
| |
| |
| v
+ run_training_loop

4.5 建立参数服务器

但是目前代码没有建立参数服务器,get_parameter_server 这里才是构建参数服务器的内容。

param_server = None
global_lock = Lock() def get_parameter_server(num_gpus=0):
global param_server
# Ensure that we get only one handle to the ParameterServer.
with global_lock:
if not param_server:
# construct it once
param_server = ParameterServer(num_gpus=num_gpus)
return param_server

究竟什么地方建立了参数服务器?其实是在 worker 主循环之中,TrainerNet 初始化生成的,后续会讲到。

0x05 TrainerNet

接下来,我们将定义我们的TrainerNet类。这也是nn.Module 的子类,我们的__init__方法将使用rpc.remoteAPI 来获取到我们的参数服务器的 RRef 或远程引用。请注意,这里我们没有将参数服务器复制到我们的本地进程,相反,我们可以将self.param_server_rref视为指向位于另一个独立进程上的参数服务器的分布式共享指针

注意,TrainerNet是为了训练用,不是业务函数,和前面的 class Net(nn.Module) 一定要区分清楚。TrainerNet 只是为了整体代码逻辑需要而实现的一个中转站或者adapter,TrainNet 拥有一个指向 ParameterServer 的 param_server_rref,所以通过 TrainerNet 就能使用 ParameterServer。

5.1 总体代码

# --------- Trainers --------------------

# nn.Module corresponding to the network trained by this trainer. The
# forward() method simply invokes the network on the given parameter
# server.
class TrainerNet(nn.Module):
def __init__(self, num_gpus=0):
super().__init__()
self.num_gpus = num_gpus
self.param_server_rref = rpc.remote(
"parameter_server", get_parameter_server, args=(num_gpus,)) # 生成参数服务器 def get_global_param_rrefs(self): # 此函数后续会用到
remote_params = remote_method(
ParameterServer.get_param_rrefs,
self.param_server_rref)
return remote_params def forward(self, x):
model_output = remote_method(
ParameterServer.forward, self.param_server_rref, x)
return model_output

5.2 生成参数服务器

初始化方法之中通过如下代码生成参数服务器。

def __init__(self, num_gpus=0):
super().__init__()
self.num_gpus = num_gpus
self.param_server_rref = rpc.remote(
"parameter_server", get_parameter_server, args=(num_gpus,))

就调用到了之前提到的 get_parameter_server,正式构建了参数服务器,注意,这里是在 worker 调用 get_parameter_server,但是 get_parameter_server 在 master 之上运行,在 master 之上建立参数服务器。

此时逻辑拓展如下:

 rpc_parameter_server.py      Master      +    Worker    rpc_parameter_server.py
+ | +
| | |
| rank == 0 | |
| | |
v | v
|
run_parameter_server | mp.Process(run_worker)
+ | +
| | |
| | |
v | |
| v
rpc.init_rpc("parameter_server" | run_worker
,rank,world_size) | +
+ | |
| | |
| | |
| | v
| | rpc.init_rpc(f"trainer_{rank}"
| | ,rank,world_size)
| | +
| | |
| | |
| | v
| | run_training_loop
| | +
| | |
| | |
| | v
| | net = TrainerNet(num_gpus=num_gpus)
| | +
| | |
| | |
v | v
get_parameter_server <-------------------+ self.param_server_rref = rpc.remote(
+ | "parameter_server",
| | get_parameter_server,
| | args=(num_gpus,))
v |
ParameterServer |
+

到目前为止,我们已经完成了初始化阶段,下面就看看具体运行阶段。

5.3 建立rref

在 TrainerNet 之中,还有一个get_global_param_rrefs 方法, 我们随后会介绍如何使用。但是这里会先分析一下这个方法。

为何要提供这个方法?我们有必要通读DistributedOptimizer的文档,特别是 API 签名。我们优化器需要优化一些远程参数,但是如何在本地构建优化器时候传入这些参数?这些远程参数在本地就是 RRef,所以在本地构建优化器时候,我们就传递这个代表远端参数的RRef列表。

由于与给定TrainerNet交互的唯一远程节点是 ParameterServer,所以我们只需要在 ParameterServer上调用 一个 remote_method 。于是,我们使用在ParameterServer类中定义的get_param_rrefs方法。这个方法会返回一个RRefs的列表,这个列表指向需要优化的参数

请注意,在这种情况下,我们TrainerNet没有定义自己的参数;如果定义了它自己的参数(需要优化),我们也需要将每个参数包装在一个RRef之中,并将其包含在 DistributedOptimizer 输入中。

class TrainerNet(nn.Module):
...
def get_global_param_rrefs(self):
remote_params = remote_method(
ParameterServer.get_param_rrefs,
self.param_server_rref)
return remote_params

5.4 前向函数

我们再看看forward方法,该方法将调用(同步)RPC 来运行定义在ParameterServer之上的模型网络的前向传播。我们传入了self.param_server_rref,它是ParameterServer的一个远端handle,用来运行RPC。调用forward将向正在运行ParameterServer的节点发送一个 RPC ,以调用参数服务器的forward函数,并返回对应于模型输出的结果Tensor

class TrainerNet(nn.Module):
...
def forward(self, x):
model_output = remote_method(
ParameterServer.forward, self.param_server_rref, x)
return model_output

0x06 参数服务器

我们接下来看看参数服务器。

6.1 总体代码

参数服务器总体代码如下:

# --------- Parameter Server --------------------
class ParameterServer(nn.Module):
def __init__(self, num_gpus=0):
super().__init__()
model = Net(num_gpus=num_gpus)
self.model = model
self.input_device = torch.device(
"cuda:0" if torch.cuda.is_available() and num_gpus > 0 else "cpu") def forward(self, inp):
inp = inp.to(self.input_device)
out = self.model(inp)
# This output is forwarded over RPC, which as of 1.5.0 only accepts CPU tensors.
# Tensors must be moved in and out of GPU memory due to this.
out = out.to("cpu")
return out # Use dist autograd to retrieve gradients accumulated for this model.
# Primarily used for verification.
def get_dist_gradients(self, cid):
grads = dist_autograd.get_gradients(cid)
# This output is forwarded over RPC, which as of 1.5.0 only accepts CPU tensors.
# Tensors must be moved in and out of GPU memory due to this.
cpu_grads = {}
for k, v in grads.items():
k_cpu, v_cpu = k.to("cpu"), v.to("cpu")
cpu_grads[k_cpu] = v_cpu
return cpu_grads # Wrap local parameters in a RRef. Needed for building the
# DistributedOptimizer which optimizes parameters remotely.
def get_param_rrefs(self):
param_rrefs = [rpc.RRef(param) for param in self.model.parameters()]
return param_rrefs

6.2 初始化

前面提到了,参数服务器的正式初始化是在 TrainerNet 之中完成的,我们接下来看看如何初始化。

参数服务器是nn.Module的派生类,其保存上面定义的模型网络的句柄。这里就是用前面介绍的业务模型网络 class Net(nn.Module) 来生成了内部的 model 成员变量。 参数服务器后续就是用这个来进行具体参数处理。我们还将保存一个输入设备,在调用模型之前,我们的输入需要传输到这个设备之上。

# --------- Parameter Server --------------------
class ParameterServer(nn.Module):
def __init__(self, num_gpus=0):
super().__init__()
model = Net(num_gpus=num_gpus) # 我们的业务网络模型
self.model = model # self.model就是handle
self.input_device = torch.device( # 输入设备
"cuda:0" if torch.cuda.is_available() and num_gpus > 0 else "cpu")

6.3 前向函数

接下来,我们将定义前向传播函数。请注意,无论模型输出的设备如何,我们都将输出移动到 CPU,因为分布式 RPC 框架目前仅支持通过 RPC 发送 CPU 张量。我们刻意禁止通过 RPC 发送 CUDA 张量,因为调用方/被调用方可能使用不同的设备(CPU/GPU),可能会在未来版本中支持 RPC 发送 CUDA 张量。

class ParameterServer(nn.Module):
...
def forward(self, inp):
inp = inp.to(self.input_device)
out = self.model(inp)
# This output is forwarded over RPC, which as of 1.5.0 only accepts CPU tensors.
# Tensors must be moved in and out of GPU memory due to this.
out = out.to("cpu")
return out

6.4 杂项函数

接下来,我们将定义一些对训练和验证有用的杂项函数。

  • get_dist_gradients 将接收一个分布式 Autograd 上下文 ID 并调用dist_autograd.get_gradients以检索由分布式 autograd 计算的梯度。更多信息可以在分布式 autograd 文档中找到。请注意,我们还遍历生成的字典并将每个张量转换为 CPU 张量,因为该框架目前仅支持通过 RPC 发送张量。
  • get_param_rrefs将遍历我们的模型参数并将它们包装为(本地)RRef。该方法将由 trainer 节点通过 RPC 调用,并将返回要分布式优化的参数列表。这参数列表将作为分布式优化器的输入,因此,参数服务器必须把必须优化的所有参数转换为RRefs列表。对应代码就是获取 Net 的参数,最终返回给 worker 端的 DistributedOptimizer。
# Use dist autograd to retrieve gradients accumulated for this model.
# Primarily used for verification.
def get_dist_gradients(self, cid):
grads = dist_autograd.get_gradients(cid)
# This output is forwarded over RPC, which as of 1.5.0 only accepts CPU tensors.
# Tensors must be moved in and out of GPU memory due to this.
cpu_grads = {}
for k, v in grads.items():
k_cpu, v_cpu = k.to("cpu"), v.to("cpu")
cpu_grads[k_cpu] = v_cpu
return cpu_grads # Wrap local parameters in a RRef. Needed for building the
# DistributedOptimizer which optimizes paramters remotely.
def get_param_rrefs(self):
param_rrefs = [rpc.RRef(param) for param in self.model.parameters()]
return param_rrefs

6.5 逻辑关系

我们需要一个逻辑关系图来梳理一下:

  1. 生成 DistributedOptimizer 的时候,调用 TrainerNet 的 get_global_param_rrefs 方法来获取需要分布式优化的参数。
  2. TrainerNet 调用 ParameterServer 的 get_param_rrefs 方法来取参数服务器获取。
  3. ParameterServer 调用 Net 的 parameters() 方法获取最终需要的参数。
  4. 这些参数原路返回,最终给了 DistributedOptimizer,DistributedOptimizer 以后就是优化这些参数。
                        Master             +      Worker
|
|
+--------------------+ | +----------------------------------------+
| ParameterServer | | | run_training_loop |
| | 4 | | +-------------------------+ |
| | +--------------------------> | TrainerNet | |
| | | | | | |
| | <---------------------------+ | | |
| model | 2 get_param_rrefs | | | | |
| ^ + | | | | | |
| | | | | | | | |
| | | | | | +---+----+----------------+ |
+--------------------+ | | | ^ |
| | | | | | |
| | | | 4 | 1 | get_global_param_rrefs |
4 | 3 | model.parameters() | | | | |
| | | | v | |
| v | | +---+----+----------------+ |
+--------+---+-------+ | | | DistributedOptimizer | |
| Net | | | | | |
| | | | | | |
| | | | | | |
| | | | +-------------------------+ |
+--------------------+ | +----------------------------------------+
+

0x07 worker 主循环

现在,初始化完毕,参数服务器也分析完毕,我们接下来看看 worker 主循环,它将创建我们的网络和优化器,通过网络运行一些输入并计算损失。训练循环看起来很像本地训练程序,但由于我们的模型网络是分布式的,所以做了一些修改。

7.1 总体代码

在主循环之中,我们初始化"TrainerNet"并构建"DistributedOptimizer"。

请注意,如上所述,我们必须传入所有的需要优化的全局(跨参与分布式训练的所有节点)参数。此外,我们传入要使用的本地优化器,在本例中为SGD。另外,我们可以使用与创建本地优化器相同的方式来配置底层优化器算法。例如,我们可以传入一个自定义学习率,其将用作所有本地优化器的学习率。

worker 主循环 run_training_loop 代码如下,其中 model_output = net(data) 会调用 TrainerNet 的forward 方法。

def run_training_loop(rank, num_gpus, train_loader, test_loader):
# Runs the typical neural network forward + backward + optimizer step, but
# in a distributed fashion.
net = TrainerNet(num_gpus=num_gpus)
# Build DistributedOptimizer.
param_rrefs = net.get_global_param_rrefs()
opt = DistributedOptimizer(optim.SGD, param_rrefs, lr=0.03)
for i, (data, target) in enumerate(train_loader):
with dist_autograd.context() as cid:
# 1. 调用 TrainerNet 的 forward
model_output = net(data)
target = target.to(model_output.device)
# 2. 计算损失
loss = F.nll_loss(model_output, target)
if i % 5 == 0:
print(f"Rank {rank} training batch {i} loss {loss.item()}")
# 3. 反向传播
dist_autograd.backward(cid, [loss])
# Ensure that dist autograd ran successfully and gradients were
# returned.
# 4. 进行验证
assert remote_method(
ParameterServer.get_dist_gradients,
net.param_server_rref,
cid) != {}
opt.step(cid) print("Training complete!")
print("Getting accuracy....")
get_accuracy(test_loader, net)

7.2 训练

我们需要对 run_training_loop 之中具体的训练代码再做一下分析,就是主训练循环。

我们通过PyTorch的DataLoader给出的iterables进行循环。在编写典型的前向/后向/优化器循环之前,我们首先将逻辑包装在Distributed Autograd context之中, 请注意,需要记录在模型的前向传播中调用的RPC,以便可以构造一个适当的计算图,其包括后向传播中所有参与的分布式工作进程。Distributed Autograd context 将返回一个"context_id",该id是一个标识符,用来标示与特定迭代对应的梯度累积和优化。

与调用典型的loss.backward()来启动本地工作进程的向后传播不同,我们在上下文中调用dist_autograd.backward() ,调用时传入 loss 和 context_id,这是我们希望向后传播开始的根。此外,我们将这个"context_id"传递到优化器调用中,因为优化器调用需要能够在所有节点上查找这个特定向后传递计算的相应梯度。

for i, (data, target) in enumerate(train_loader):
with dist_autograd.context() as cid:
# 1. 调用 TrainerNet 的 forward
model_output = net(data)
target = target.to(model_output.device)
# 2. 计算损失
loss = F.nll_loss(model_output, target)
if i % 5 == 0:
print(f"Rank {rank} training batch {i} loss {loss.item()}")
# 3. 反向传播
dist_autograd.backward(cid, [loss])
# Ensure that dist autograd ran successfully and gradients were
# returned.
# 4. 进行验证
assert remote_method(
ParameterServer.get_dist_gradients,
net.param_server_rref,
cid) != {}
# 5. 更新
opt.step(cid)

我们拓展前面的逻辑(忽略初始化部分),下面图上的数值对应训练代码注释中的数值

目前总体思路如下:

  • Master 之上运行了 ParameterServer,其包含了Net 这个业务模型。
  • Worker 之上运行了 trainer,其包含了 TrainerNet,TrainerNet 只是为了整体代码逻辑需要而实现的一个中转站或者adapter,TrainNet 拥有一个指向 ParameterServer 的 param_server_rref,所以通过 TrainerNet 就能使用 ParameterServer。
  • 具体 Forward 操作流程是: TrainerNet.forward --> ParameterServer.forward ---> Net.forward()。
  • 具体 Backward 是被dist_autograd.backward 调用 dist.autograd 引擎自动完成的。
  • 优化器更新则是DistributedOptimizer自动完成。

我们拓展前面的逻辑(忽略初始化部分)图如下,下面图上的数值对应训练代码注释中的数值

             Master      +    Worker
|
ParameterServer | run_training_loop TrainerNet
+ | + +
| | | |
| | v |
| | net = TrainerNet |
| | + |
| | | |
+ | | |
model = Net(num_gpus) | v |
+ | param_server_rref = rpc.remote( |
| | "parameter_server", |
| | get_parameter_server,) |
| | + |
| | | |
| | | |
| | opt = DistributedOptimizer(param_rrefs) |
| | + |
| | | |
| | | |
| | v |
| | model_output = net(data) |
| | + |
| | | 1 |
| | +----------------------------> |
| | | +
| | | ParameterServer.forward
| | | +
| | | 2 |
| <--------------------------------------------------------------+
+ | | |
forward | | |
+ | | |
| | | |
+ | | |
out = self.model(inp) | | |
+ | | |
| return out | 3 | |
+------------------------------> | |
| | | |
| | + |
| | F.nll_loss |
| | + |
| | dist_autograd.backward |
| 4 | + |
| <----------------------get_dist_gradients |
| | + |
| | | |
| 5 | + |
| <-----------------------+ opt.step |
| | | |
| | | |
v + v v

7.3 准确性

下面是在我们完成训练后计算模型的精度,方法很像传统的局部模型。但是请注意,我们传递到这个函数的网络是TraineNet的一个实例,因此前向传播将以透明的方式调用RPC。

def get_accuracy(test_loader, model):
model.eval()
correct_sum = 0
# Use GPU to evaluate if possible
device = torch.device("cuda:0" if model.num_gpus > 0
and torch.cuda.is_available() else "cpu")
with torch.no_grad():
for i, (data, target) in enumerate(test_loader):
out = model(data, -1)
pred = out.argmax(dim=1, keepdim=True)
pred, target = pred.to(device), target.to(device)
correct = pred.eq(target.view_as(pred)).sum().item()
correct_sum += correct print(f"Accuracy {correct_sum / len(test_loader.dataset)}")

0x08 总结

我们之前介绍过参数服务器的经典实现 ps-lite,现在通过前文和本文的学习,大家可以看到不同于ps-lite的思路。

  • ps-lite 是类似传统服务器实现,有自己主动的业务循环,可以响应用户的显式请求,也有自己明确的逻辑,本地也有自己的KV存储。
  • PyTorch 这两篇官方文档之中,参数服务器则是另外一种思路,其上没有主动的循环,没有KV存储,没有服务器逻辑,而是可以直接存储业务模型,业务驱动由trainer完成。

具体如何实现就要看使用者自己业务需求了。

ps-lite 文章链接如下,大家有兴趣可以对比一下。

[源码解析] 机器学习参数服务器ps-lite (1) ----- PostOffice

[源码解析] 机器学习参数服务器ps-lite(2) ----- 通信模块Van

[源码解析] 机器学习参数服务器ps-lite 之(3) ----- 代理人Customer

[源码解析]机器学习参数服务器ps-lite(4) ----- 应用节点实现

0xFF 参考

https://pytorch.apachecn.org/docs/1.7/65.html

https://pytorch.org/tutorials/intermediate/rpc_param_server_tutorial.html

上一篇:[源码解析] PyTorch 分布式(18) --- 使用 RPC 的分布式管道并行


下一篇:CRM总结