BytePS Source Code Analysis

 

# Import BytePS
import byteps.torch as bps

 

# Initialize BytePS
bps.init()

 

# Pin this process to one GPU using its local rank, so that processes and GPUs map one-to-one.
torch.cuda.set_device(bps.local_rank())

 

local_rank:
"""A function that returns the local BytePS rank of the calling process, within the
   node that it is running on. For example, if there are seven processes running
   on a node, their local ranks will be zero through six, inclusive.
   Returns:
     An integer scalar with the local BytePS rank of the calling process.
"""

 

# During push and pull, compress the 32-bit gradients down to 16 bits. (Note: how is the resulting precision loss handled?)
compression = bps.Compression.fp16 if args.fp16_pushpull else bps.Compression.none

 

compression:
"""Optional gradient compression algorithm used during push_pull."""
"""Compress all floating point gradients to 16-bit."""
Note: so is this all that "gradient compression" means here? (See the sketch below.)
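Answering the note above: with Compression.fp16, "compression" just means casting each gradient tensor to 16-bit floats before it is pushed and casting it back after it is pulled; the precision loss is simply accepted. A minimal sketch of that round trip (my own illustration, not the BytePS implementation):

import torch

# Cast down before communication, cast back up afterwards.
def compress(tensor):
    ctx = tensor.dtype                      # remember the original dtype
    if tensor.dtype in (torch.float32, torch.float64):
        tensor = tensor.to(torch.float16)   # halves the bytes on the wire
    return tensor, ctx

def decompress(tensor, ctx):
    return tensor.to(ctx)                   # restore the original dtype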

 

# Wrap the optimizer so that gradients are synchronized with push_pull before each update
optimizer = bps.DistributedOptimizer(optimizer, named_parameters=model.named_parameters(), compression=compression)

 

# Broadcast the model parameters from the root rank to all other processes
bps.broadcast_parameters(model.state_dict(), root_rank=0)

 

root_rank:
The rank of the process from which parameters will be broadcast to all other processes.
Note: is this root_rank a local or a global rank? It is compared against the global rank (bps.rank()); in practice the root is usually process 0.
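My understanding of how BytePS turns push_pull into a broadcast, which is also why the comparison is against the global rank. A conceptual sketch only (not the actual library code; it assumes the synchronous bps.push_pull wrapper):

import torch
import byteps.torch as bps

# Non-root processes zero their copies, so a summing push_pull leaves exactly
# the root's values on every process.
def broadcast_like_pushpull(named_tensors, root_rank=0):
    for name, tensor in named_tensors:
        if bps.rank() != root_rank:          # global rank comparison
            tensor.zero_()
        tensor.copy_(bps.push_pull(tensor, average=False, name=name))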

 

push_pull_async:
"""
    A function that performs asynchronous averaging or summation of the input tensor
    over all the BytePS processes. The input tensor is not modified.
    The reduction operation is keyed by the name. If name is not provided, an incremented
    auto-generated name is used. The tensor type and shape must be the same on all
    BytePS processes for a given name. The reduction will not start until all processes
    are ready to send and receive the tensor.
    Arguments:
        tensor: A tensor to average or sum.
        average: A flag indicating whether to compute average or summation,
                 defaults to average.
        name: A name of the reduction operation.
    Returns:
        A handle to the push_pull operation that can be used with `poll()` or
        `synchronize()`.
"""

 

# Broadcast the optimizer state (e.g. momentum buffers) from the root rank to all other processes
bps.broadcast_optimizer_state(optimizer, root_rank=0)

 

Python calls into the C/C++ core through the ctypes library (a generic sketch of this pattern follows these notes).

Communication between nodes uses a key-value format.

Within a node, only process 0 (local rank 0) participates in network communication.

The scheduler and the servers reuse MXNet code directly; they do not use BytePS itself.

Workers do not communicate with one another, and neither do servers. (Note: the inter-server communication described in Mu Li's Parameter Server paper is only for replication and fault tolerance.)
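A generic illustration of the ctypes pattern mentioned above; the library and function names below are hypothetical, not the actual BytePS symbols:

import ctypes

# Load a shared library and call an exported C function. byteps.torch does
# essentially this to reach its C++ core (names here are made up).
lib = ctypes.CDLL("libexample_core.so")
lib.example_init.argtypes = []
lib.example_init.restype = ctypes.c_int
status = lib.example_init()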


 

rank:
# A function that returns the BytePS rank of the calling process.
Note: the global rank of the calling process, commonly used to gate log printing (see the sketch after local_size below).

 

size:
# A function that returns the number of BytePS processes.

 

local_size:
# A function that returns the number of BytePS processes within the node the current process is running on.
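A small example that ties rank, size, local_rank and local_size together for rank-gated logging (it assumes the import and bps.init() from the top of this post):

# Print job-level info once per job, and node-level info once per node.
if bps.rank() == 0:
    print("total processes:", bps.size())
if bps.local_rank() == 0:
    print("processes on this node:", bps.local_size())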

 

"""
    An optimizer that wraps another torch.optim.Optimizer, using push_pull to
    average gradient values before applying gradients to model weights.
    push_pull operations are executed after each gradient is computed by `loss.backward()`
    in parallel with each other. The `step()` method ensures that all push_pull operations are
    finished before applying gradients to the model.
    DistributedOptimizer exposes the `synchronize()` method, which forces push_pull operations
    to finish before continuing the execution. It's useful in conjunction with gradient
    clipping, or other operations that modify gradients in place before `step()` is executed.
    Example of gradient clipping:
    ```
    output = model(data)
    loss = F.nll_loss(output, target)
    loss.backward()
    optimizer.synchronize()
    torch.nn.utils.clip_grad_norm(model.parameters(), args.clip)
    optimizer.step()
    ```
    Arguments:
        optimizer: Optimizer to use for computing gradients and applying updates.
        named_parameters: A mapping between parameter names and values. Used for naming of
                          push_pull operations. Typically just `model.named_parameters()`.
        compression: Compression algorithm used during push_pull to reduce the amount
                     of data sent during each parameter update step.  Defaults to
                     not using compression.
        backward_passes_per_step: Number of expected backward passes to perform
                                  before calling step()/synchronize(). This
                                  allows accumulating gradients over multiple
                                  mini-batches before executing averaging and
                                  applying them.
    """
    # We dynamically create a new class that inherits from the optimizer that was passed in.
    # The goal is to override the `step()` method with a push_pull implementation.
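A stripped-down sketch of that dynamic-class trick (my own simplification; the real _DistributedOptimizer also hooks the gradients so push_pull starts during backward):

# Swap in a subclass of the wrapped optimizer's own class whose step() first
# waits for gradient synchronization. wrap_optimizer is a hypothetical name.
def wrap_optimizer(optimizer):
    base_cls = optimizer.__class__

    class _Wrapped(base_cls):
        def step(self, closure=None):
            self.synchronize()       # placeholder: wait on pending push_pull handles
            return super().step(closure)

        def synchronize(self):
            pass                     # the real method blocks on outstanding handles

    optimizer.__class__ = _Wrapped   # rebind the instance to the new subclass
    return optimizer

# usage: opt = wrap_optimizer(torch.optim.SGD(model.parameters(), lr=0.1))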

 

communicator.h and communicator.cc: communication, e.g. BytePSCommSocket. (Note: why use sockets here rather than ps-lite directly?)

global.h and global.cc: global state, e.g. global initialization in BytePSGlobal::Init and the worker's ps-lite singleton in BytePSGlobal::GetPS.

core_loops.h and core_loops.cc: event loops such as PushLoop and PullLoop, which process the tasks in the queues (a generic sketch of this loop shape follows the file list).

logging.h and logging.cc: logging utilities.

nccl_manager.h and nccl_manager.cc: manages NCCL for intra-node GPU reduce/broadcast.

operations.h and operations.cc: core operations, e.g. initializing tensors and enqueuing push/pull tasks.

ready_table.h and ready_table.cc: tracks which keys/partitions are ready to move to the next stage.

schedule_queue.h and schedule_queue.cc: the scheduled task queues that the core loops consume.

shared_memory.h and shared_memory.cc: shared memory.

ops.h and ops.cc: e.g. DoPushPull.

adapter.h and adapter.cc: adapts tensor data types between C++ and Python.
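The core loops are essentially worker threads draining task queues. A generic sketch of that shape, written in Python for brevity (the queue and task names are illustrative, not BytePS symbols):

import queue
import threading

push_queue = queue.Queue()

def push_loop():
    # Mirrors the shape of PushLoop: block on the queue, process one task,
    # then move on to the next.
    while True:
        task = push_queue.get()
        if task is None:         # shutdown signal
            break
        task()                   # e.g. push one tensor partition to the servers
        push_queue.task_done()

worker = threading.Thread(target=push_loop, daemon=True)
worker.start()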

 

// Total key space is 0 to 2^64 - 1
// It will be divided to N PS servers, for now we assume N <= 2^16

 

ps::KVWorker inherits from SimpleApp; it Pushes key-value data to the servers and Pulls it back from them, and also provides a Wait function.

ps is_recovery: whether the node is a recovered one. (Note: presumably it may have dropped out and rejoined mid-run?)

ps::Postoffice: the global management singleton.

ps::StartAsync: asynchronous node initialization.

ZPush/ZPull: zero-copy Push/Pull. These behave like Push/Pull except that the data is not copied into internal buffers, for better performance; it is the caller's responsibility to keep the contents unchanged until the operation has actually finished.

 

ADD_NODE: control command used when a node registers itself with the scheduler.

BARRIER: control command that blocks until all nodes in the group have reached the barrier.

 

Tensor Partition

Partitioning a tensor lets multiple servers share the computation and the network bandwidth in parallel, and it also helps the asynchronous pipeline.

_name_to_cxt: a hash table holding the tensors that have been initialized (and can therefore be used for PS communication).

declared_key: the index assigned to each initialized tensor, incremented from 0.

GetPartitionBound: the size in bytes of each partition chunk.

key = declared_key * 2^16 + part_num.
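A small worked sketch of that key layout; the partition size below is an arbitrary example, not the actual GetPartitionBound value:

# Each declared tensor owns a 2^16-wide key range; each partition gets one key in it.
PART_BOUND = 4 * 1024 * 1024                             # example partition size in bytes

def partition_keys(declared_key, tensor_bytes, bound=PART_BOUND):
    num_parts = (tensor_bytes + bound - 1) // bound      # ceiling division
    return [(declared_key << 16) + part for part in range(num_parts)]

# e.g. a 10 MB tensor declared with key 3:
# partition_keys(3, 10 * 1024 * 1024) -> [196608, 196609, 196610]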


 

 

References

https://github.com/bytedance/byteps

https://pytorch.org/docs/stable/distributed.html

https://www.cs.cmu.edu/~muli/file/parameter_server_nips14.pdf

https://mxnet.incubator.apache.org/versions/master/faq/distributed_training.html

 
