【PyTorch与深度学习】5、深入剖析PyTorch DataLoader源码

课程地址
最近做实验发现自己还是基础框架上掌握得不好,于是开始重学一遍PyTorch框架,这个是课程笔记,此节课很详细,笔记记的比较粗

1. DataLoader

1.1 DataLoader类实现

1.1.1 构造函数__init__实现

构造函数有如下参数:

  • dataset:传入自己定义好的数据集类Dataset
  • batch_size:默认值为1,它代表着每批次训练的样本的个数
  • shuffle:布尔类型,True为打乱数据集,False为不打乱数据集
  • sampler:决定以何种方式对数据进行采样,可以不用shuffle随机打乱样本,可以用自己编写的函数去决定如何取样本,比如:你想让你的样本以一种有序的方式来组织成mini-batch,比如把长度比较接近的样本放入到一个mini-batch中,这个时候就不能用shuffle,因为一打乱,这些样本的长度就是乱的。如果传入该参数,则shuffle就没有意义。
  • batch_sampler:可以用自己编写的函数成批次地取样本。如果传入该参数,则shuffle就没有意义。
  • num_workers:默认值为0,它是指数据加载的子进程数量,以加快数据加载的速度,提高训练效率。一般数值设定取决于CPU的核心数,通常数字大到一定程度,其加载速度也不会再提高了。
  • collate_fn:聚集函数,它是对一个批次batch进行后处理,比如:我们通过shuffle打乱后得到一个批次batch,然后对这个batch我们希望对它进行一个pad,但是这个pad的长度只能通过batch去算出来,而不是预先能计算出长度,这个时候我们就要用到collate_fn参数,对之前的shuffle后的mini-batch再处理一下,把这个批次batch给它pad成一样的长度,然后再返回一个新的批次batch。

【注】在深度学习和自然语言处理(NLP)等领域中,pad(填充)是一个常见的预处理步骤,特别是在处理变长序列(如文本、时间序列等)时。当使用DataLoader从数据集中批量提取数据时,如果每个数据项(例如,句子或时间序列)的长度不同,那么为了能够在同一批次中进行高效计算(例如,通过矩阵运算),我们通常需要将这些数据项填充(或截断)到相同的长度。
这就是collate_fn参数发挥作用的地方。默认情况下,DataLoader使用了一个内置的collate_fn来将一批数据项组合成一个张量(tensor),但这个默认函数并不进行填充。为了进行填充,你需要提供一个自定义的collate_fn。

  • pin_memory:布尔类型,默认值为False,用于指定是否将数据加载到固定的内存区域(pinned memory)中。固定内存区域是指一块被操作系统锁定的内存,这样可以防止它被移动,从而提高数据传输的效率。当pin_memory参数设置为True时,PyTorch会尝试将从数据集加载的数据存储在固定的内存中,这对于GPU加速的情况下可以提高数据传输效率,因为GPU可以直接从固定内存中访问数据,而不需要进行额外的内存拷贝操作。需要注意的是,只有当你使用GPU进行训练时,才会考虑使用pin_memory参数。对于CPU训练来说,pin_memory参数的影响通常不太明显。而且这个东西对训练速度的影响还有待考究。
  • drop_last:布尔类型,默认为False,如果你的总样本数目不是每个批次batch的整数倍的话,这时候我们可以将drop_last设置为True,让最后那个小批次(样本数没达到batch-size的批次)丢掉。

构造函数的具体代码和注释如下:

    def __init__(self, dataset: Dataset[T_co], batch_size: Optional[int] = 1,
                 shuffle: bool = False, sampler: Union[Sampler, Iterable, None] = None,
                 batch_sampler: Union[Sampler[Sequence], Iterable[Sequence], None] = None,
                 num_workers: int = 0, collate_fn: Optional[_collate_fn_t] = None,
                 pin_memory: bool = False, drop_last: bool = False,
                 timeout: float = 0, worker_init_fn: Optional[_worker_init_fn_t] = None,
                 multiprocessing_context=None, generator=None,
                 *, prefetch_factor: int = 2,
                 persistent_workers: bool = False):
        torch._C._log_api_usage_once("python.data_loader")

        if num_workers < 0:
            raise ValueError('num_workers option should be non-negative; '
                             'use num_workers=0 to disable multiprocessing.')

        if timeout < 0:
            raise ValueError('timeout option should be non-negative')

        if num_workers == 0 and prefetch_factor != 2:
            raise ValueError('prefetch_factor option could only be specified in multiprocessing.'
                             'let num_workers > 0 to enable multiprocessing.')
        assert prefetch_factor > 0

        if persistent_workers and num_workers == 0:
            raise ValueError('persistent_workers option needs num_workers > 0')
		# 设置成员函数
        self.dataset = dataset
        self.num_workers = num_workers
        self.prefetch_factor = prefetch_factor
        self.pin_memory = pin_memory
        self.timeout = timeout
        self.worker_init_fn = worker_init_fn
        self.multiprocessing_context = multiprocessing_context

        # 这里不用看,一般我们都是用Dataset类,而不是IterableDataset,所以直接看这个if条件后面对应的else条件
        if isinstance(dataset, IterableDataset):
            self._dataset_kind = _DatasetKind.Iterable
            
            if isinstance(dataset, IterDataPipe):
                torch.utils.data.graph_settings.apply_shuffle_settings(dataset, shuffle=shuffle)
            elif shuffle is not False:
                raise ValueError(
                    "DataLoader with IterableDataset: expected unspecified "
                    "shuffle option, but got shuffle={}".format(shuffle))

            if sampler is not None:
                # See NOTE [ Custom Samplers and IterableDataset ]
                raise ValueError(
                    "DataLoader with IterableDataset: expected unspecified "
                    "sampler option, but got sampler={}".format(sampler))
            elif batch_sampler is not None:
                # See NOTE [ Custom Samplers and IterableDataset ]
                raise ValueError(
                    "DataLoader with IterableDataset: expected unspecified "
                    "batch_sampler option, but got batch_sampler={}".format(batch_sampler))
        # 直接跳到else条件
        else:
        	# 设置数据集的种类是DatasetKind.Map类型
            self._dataset_kind = _DatasetKind.Map


		# 如果你设置了sampler(默认为None),如果你传入了自定义的sampler且shuffle设置为True的话,这种情况是没有意义的,shuffle是官方提供的一种随机采用党的sampler,你都自定义sampler了,就不需要shuffle来随机打乱。所以shuffle和sampler是互斥的,不能同时去设置
        if sampler is not None and shuffle:
            raise ValueError('sampler option is mutually exclusive with '
                             'shuffle')
        # batch_sampler是批次级别的采样,sampler是样本级的采样,
        if batch_sampler is not None:
            # 如果你设置了batch_size不是1,或者你设置了shuffle或者你设置了sampler,或者你设置了drop_last,这些都与batch_sampler是互斥的,总结一句话就是:你只要设置了batch_sampler就不需要设置batch_size了,因为你设置了batch_sampler就已经告诉PyTorch框架你的batch_size和以什么样的方式去构成mini-batch
            if batch_size != 1 or shuffle or sampler is not None or drop_last:
                raise ValueError('batch_sampler option is mutually exclusive '
                                 'with batch_size, shuffle, sampler, and '
                                 'drop_last')
            batch_size = None
            drop_last = False
        # 如果batch_size是None,同时如果有drop_last,这时候会报错
        elif batch_size is None:
            # no auto_collation
            if drop_last:
                raise ValueError('batch_size=None option disables auto-batching '
                                 'and is mutually exclusive with drop_last')
		# 如果你没有设置sampler的话
        if sampler is None:  # give default samplers
            if self._dataset_kind == _DatasetKind.Iterable:
                # See NOTE [ Custom Samplers and IterableDataset ]
                sampler = _InfiniteConstantSampler()
            else:  # map-style(常用的),如果你设置了shuffle的话,它就会用内置的一个叫random sample的类来去对我们这个Dataset进行一个随机的打乱。具体实现在下面的章节
                if shuffle:
                    sampler = RandomSampler(dataset, generator=generator)  # type: ignore[arg-type]
                # 如果没有设置shuffle为True的话,它就用SequentialSampler即按原本的顺序来采样
                else:
                    sampler = SequentialSampler(dataset)  # type: ignore[arg-type]
		# 如果你的batch_size不是None并且batch_sampler也不是None
		# 它就默认给你构造一个batch_sampler
		# BatchSampler源码实现见下面的章节
        if batch_size is not None and batch_sampler is None:
            # auto_collation without custom batch_sampler
            batch_sampler = BatchSampler(sampler, batch_size, drop_last)

        self.batch_size = batch_size
        self.drop_last = drop_last
        self.sampler = sampler
        self.batch_sampler = batch_sampler
        self.generator = generator
		# 如果collate_fn参数为None,则如果设置了auto_collatoion,就调用默认的default_collate
        if collate_fn is None:
        	# _auto_collation是根据batch_sampler是否为None来去设置的,如果batch_sampler不是None,_auto_collation设置为True,如果batch_sampler是None的话,它就会调用_utils.collate.default_convert这个函数,否则调用_utils.collate.default_collate函数。
        	# _utils.collate.default_collate函数是以batch作为输入,它相当于什么都没做,最后返回了个batch,如果自己要实现这个collate_fn,要以batch做输入,然后再做处理。
            if self._auto_collation:
                collate_fn = _utils.collate.default_collate
            else:
                collate_fn = _utils.collate.default_convert

        self.collate_fn = collate_fn
        self.persistent_workers = persistent_workers

        self.__initialized = True
        self._IterableDataset_len_called = None  # See NOTE [ IterableDataset and __len__ ]

        self._iterator = None

        self.check_worker_number_rationality()

        torch.set_vital('Dataloader', 'enabled', 'True')  # type: ignore[attr-defined]

1.1.2 _get_iterator函数

    def _get_iterator(self) -> '_BaseDataLoaderIter':
    	# 如果设置num_workers为0的话,它就走单个样本处理过程
        if self.num_workers == 0:
            return _SingleProcessDataLoaderIter(self)
        else:
        # 如果num_workers不为0,说明是多进程读取样本
            self.check_worker_number_rationality()
            return _MultiProcessingDataLoaderIter(self)

一般迭代用,是在__iter__方法中实现的,使得DataLoader能变成一个可迭代的对象。

1.2 RandomSampler 类的实现

重点看中文注释

class RandomSampler(Sampler[int]):
    r"""Samples elements randomly. If without replacement, then sample from a shuffled dataset.
    If with replacement, then user can specify :attr:`num_samples` to draw.

    Args:
        data_source (Dataset): dataset to sample from
        replacement (bool): samples are drawn on-demand with replacement if ``True``, default=``False``
        num_samples (int): number of samples to draw, default=`len(dataset)`.
        generator (Generator): Generator used in sampling.
    """
    data_source: Sized
    replacement: bool

    def __init__(self, data_source: Sized, replacement: bool = False,
                 num_samples: Optional[int] = None, generator=None) -> None:
        self.data_source = data_source
        self.replacement = replacement
        self._num_samples = num_samples
        self.generator = generator

        if not isinstance(self.replacement, bool):
            raise TypeError("replacement should be a boolean value, but got "
                            "replacement={}".format(self.replacement))

        if not isinstance(self.num_samples, int) or self.num_samples <= 0:
            raise ValueError("num_samples should be a positive integer "
                             "value, but got num_samples={}".format(self.num_samples))

    @property
    def num_samples(self) -> int:
        # dataset size might change at runtime
        if self._num_samples is None:
            return len(self.data_source)
        return self._num_samples
	
	# 首先看__iter__方法
    def __iter__(self) -> Iterator[int]:
    	# 获取数据集的大小
        n = len(self.data_source)
        # 如果没有传入generator的话,他就会随机生成一个种子,去构建一个生成器generator
        if self.generator is None:
       		# 设置随机数的种子
            seed = int(torch.empty((), dtype=torch.int64).random_().item())
            generator = torch.Generator()
            generator.manual_seed(seed)
        else:
            generator = self.generator

        if self.replacement:
            for _ in range(self.num_samples // 32):
                yield from torch.randint(high=n, size=(32,), dtype=torch.int64, generator=generator).tolist()
            # 返回0到n-1的列表的随机组合,n是数据集长度
            yield from torch.randint(high=n, size=(self.num_samples % 32,), dtype=torch.int64, generator=generator).tolist()
        else:
            for _ in range(self.num_samples // n):
                yield from torch.randperm(n, generator=generator).tolist()
            yield from torch.randperm(n, generator=generator).tolist()[:self.num_samples % n]

    def __len__(self) -> int:
        return self.num_samples

1.3 SequentialSampler类的实现

class SequentialSampler(Sampler[int]):
    r"""Samples elements sequentially, always in the same order.

    Args:
        data_source (Dataset): dataset to sample from
    """
    data_source: Sized

    def __init__(self, data_source: Sized) -> None:
        self.data_source = data_source
	# 如果迭代它,返回的是有序的索引
    def __iter__(self) -> Iterator[int]:
        return iter(range(len(self.data_source)))

    def __len__(self) -> int:
        return len(self.data_source)

1.4 BatchSampler类的实现

也是直接看__iter__函数

class BatchSampler(Sampler[List[int]]):

    def __init__(self, sampler: Union[Sampler[int], Iterable[int]], batch_size: int, drop_last: bool) -> None:
        # Since collections.abc.Iterable does not check for `__getitem__`, which
        # is one way for an object to be an iterable, we don't do an `isinstance`
        # check here.
        if not isinstance(batch_size, int) or isinstance(batch_size, bool) or \
                batch_size <= 0:
            raise ValueError("batch_size should be a positive integer value, "
                             "but got batch_size={}".format(batch_size))
        if not isinstance(drop_last, bool):
            raise ValueError("drop_last should be a boolean value, but got "
                             "drop_last={}".format(drop_last))
        self.sampler = sampler
        self.batch_size = batch_size
        self.drop_last = drop_last
	# 先看iter函数
    def __iter__(self) -> Iterator[List[int]]:
    	# 先创建一个空列表batch
        batch = []
        # 对sampler进行一个迭代,去元素的索引
        for idx in self.sampler:
        	# 将其索引添加到列表中
            batch.append(idx)
            # 如果列表长度等于batch_size,这时候就返回列表,相当于返回一个批次batch,然后把batch置为空
            if len(batch) == self.batch_size:
                yield batch
                batch = []
        # 如果drop_last(是否丢弃最后的不够一个批次数量的元素)设置为False,那我们就把最后这个不够数量的批次也返回
        if len(batch) > 0 and not self.drop_last:
            yield batch

    def __len__(self) -> int:
        # Can only be called if self.sampler has __len__ implemented
        # We cannot enforce this condition, so we turn off typechecking for the
        # implementation below.
        # Somewhat related: see NOTE [ Lack of Default `__len__` in Python Abstract Base Classes ]
        if self.drop_last:
            return len(self.sampler) // self.batch_size  # type: ignore[arg-type]
        else:
            return (len(self.sampler) + self.batch_size - 1) // self.batch_size  # type: ignore[arg-type]

1.5 其他

这个UP讲的太详细了,没全记录,部分细节可以看看视频

上一篇:【算法小白周赛1D】K阶恒星系 - 题解和代码-数据范围


下一篇:模型智能体开发之metagpt-单智能体实践