Comparison of PyTorch's Dataset and TorchData API

Deep neural networks take a long time to train. Training speed is affected by factors such as model complexity, batch size, the GPU, and the size of the training dataset.

In PyTorch, torch.utils.data.Dataset and torch.utils.data.DataLoader are commonly used to load datasets and generate batches. Starting with version 1.11, PyTorch also offers the TorchData library, which implements a different way of loading datasets.

In this article, we compare how these two approaches behave when the dataset is relatively large, taking the face-image datasets CelebA and DigiFace1M as examples; their characteristics are listed below. We train a ResNet-50 model for one epoch with each data loading method and compare both the usage and the time taken.

The information on the two datasets is as follows:

  • CelebA (aligned): 202,599 images, 1.4 GB in total, image size 178x218
  • DigiFace1M: 720,000 images, 14.6 GB in total, image size 112x112

The environment we use is as follows:

CPU: Intel(R) Core(TM) i9-9900K @ 3.60GHz (8 cores / 16 threads)

GPU: GeForce RTX 2080 Ti 11 GB

Driver 515.65.01 / CUDA 11.7 / cuDNN 8.4.0.27

Docker 20.10.21

PyTorch 1.12.1

TorchData 0.4.1

The training code is as follows:

 def train(data_loader: torch.utils.data.DataLoader, cfg: Config):
     # create model
     model = resnet50(num_classes=cfg.n_celeba_classes + cfg.n_digiface1m_classes, pretrained=True)
     torch.cuda.set_device(cfg.gpu)
     model = model.cuda(cfg.gpu)
     model.train()
 
     # define loss function (criterion) and optimizer
     criterion = torch.nn.CrossEntropyLoss().cuda(cfg.gpu)
     optimizer = torch.optim.SGD(model.parameters(), lr=0.1,
                                 momentum=0.9,
                                 weight_decay=1e-4)
 
     start_time = time.time()
     for _ in range(cfg.epochs):
         scaler = torch.cuda.amp.GradScaler(enabled=cfg.use_amp)
         for batch_idx, (images, target) in enumerate(data_loader):
             images = images.cuda(cfg.gpu, non_blocking=True)
             target = target.cuda(cfg.gpu, non_blocking=True)
 
             # compute output
             with torch.cuda.amp.autocast(enabled=cfg.use_amp):
                 output = model(images)
                 loss = criterion(output, target)
 
             # compute gradient
             scaler.scale(loss).backward()
 
             # do SGD step
             scaler.step(optimizer)
             scaler.update()
             optimizer.zero_grad()
 
             print(batch_idx, loss.item())
     print(f'{time.time() - start_time} sec')
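The train() function above (and the code below) relies on a Config object that the article never shows. A minimal sketch that matches the fields used throughout the code could look like the following; the transform, use_amp and the two class counts are our assumptions rather than values taken from the original code (batch_size = 600 and a single training epoch come from the experiment description below):

 from dataclasses import dataclass, field

 import torchvision.transforms as T


 @dataclass
 class Config:
     data_path: str = 'data'                     # root folder containing CelebA/ and DigiFace1M/
     prepared_data_path: str = 'prepared_data'   # where the .pickle shards are written
     n_celeba_classes: int = 10177               # CelebA has 10,177 identities
     n_digiface1m_classes: int = 10000           # assumption for the 720,000-image DigiFace1M subset
     gpu: int = 0
     epochs: int = 1                             # one training epoch, as in the comparison below
     batch_size: int = 600
     n_workers: int = 10
     use_amp: bool = True                        # assumption: mixed precision enabled
     # assumption: resize both datasets to a common size and convert to tensors
     transform: T.Compose = field(default_factory=lambda: T.Compose([
         T.Resize((112, 112)),
         T.ToTensor(),
     ]))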

Dataset

Let's first look at Dataset, which has been around since PyTorch was released and which most of us are already familiar with. PyTorch supports two kinds of datasets: map-style and iterable-style. A map-style dataset is convenient when the number of elements is known in advance; it implements the __getitem__() and __len__() methods. If random access by index is too expensive or not available, you can use an iterable-style dataset, which implements the __iter__() method instead. In our case map-style is sufficient, because for CelebA and DigiFace1M we know the total number of images in advance.
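For contrast, a minimal iterable-style dataset only needs __iter__(). The toy class below is not used anywhere else in the article; it merely shows the shape of such a dataset:

 import torch

 class SquaresStream(torch.utils.data.IterableDataset):
     """Toy iterable-style dataset: samples are produced by __iter__, not looked up by index."""

     def __init__(self, n: int) -> None:
         self.n = n

     def __iter__(self):
         for i in range(self.n):
             # yield one (feature, label) pair at a time; no __len__ or __getitem__ is required
             yield torch.tensor([float(i)]), i * i

Note that a DataLoader cannot use shuffle=True with an iterable-style dataset, which is another reason why map-style is the more convenient choice here.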

Next we create the CelebADataset class. For CelebA, the class labels are stored in the identity_CelebA.txt file. The face images in CelebA and DigiFace1M are cropped differently, so to reduce this difference the CelebA images are cropped slightly on all sides in __getitem__() right after they are loaded.

 import os
 from typing import Tuple

 import torch
 from PIL import Image

 class CelebADataset(torch.utils.data.Dataset):
     def __init__(self, data_path: str, transform) -> None:
         self.data_path = data_path
         self.transform = transform
         self.image_names, self.labels = self.load_labels(f'{data_path}/identity_CelebA.txt')
 
     def __len__(self) -> int:
         return len(self.image_names)
 
     def __getitem__(self, idx: int) -> Tuple[torch.Tensor, int]:
         image_path = f'{self.data_path}/img_align_celeba/{self.image_names[idx]}'
         image = Image.open(image_path)
         left, right, top, bottom = 25, 153, 45, 173
         image = image.crop((left, top, right, bottom))
         if self.transform is not None:
             image = self.transform(image)
         label = self.labels[idx]
         return image, label
 
     @staticmethod
     def load_labels(labels_path: str) -> Tuple[list, list]:
         image_names, labels = [], []
         with open(labels_path, 'r', encoding='utf-8') as labels_file:
             lines = labels_file.readlines()
             for line in lines:
                 file_name, class_id = line.split(' ')
                 image_names.append(file_name)
                 labels.append(int(class_id[:-1]))  # strip the trailing '\n' from the class id
         return image_names, labels

For the DigiFace1M dataset, all images of one identity are stored in a separate folder whose name is the class id. Because the class ids in the two datasets overlap, the DigiFace1M labels have to be shifted by the number of CelebA classes; that is what the add_to_class argument does (for example, an image from folder 42/ gets the label 42 + n_celeba_classes). In addition, DigiFace1M images are stored in RGBA format, so they have to be converted to RGB.

 class DigiFace1M(torch.utils.data.Dataset):
     def __init__(self, data_path: str, transform, add_to_class: int = 0) -> None:
         self.data_path = data_path
         self.transform = transform
         self.image_paths, self.labels = self.load_labels(data_path, add_to_class)
 
     def __len__(self):
         return len(self.image_paths)
 
     def __getitem__(self, idx: int) -> Tuple[torch.Tensor, int]:
         image = Image.open(self.image_paths[idx]).convert('RGB')
         if self.transform is not None:
             image = self.transform(image)
         label = self.labels[idx]
         return image, label
 
     @staticmethod
     def load_labels(data_path: str, add_to_class: int) -> Tuple[list, list]:
         image_paths, labels = [], []
         for root, _, files in os.walk(data_path):
             for file_name in files:
                 if file_name.endswith('.png'):
                     image_paths.append(f'{root}/{file_name}')
                     labels.append(int(os.path.basename(root)) + add_to_class)
         return image_paths, labels

Now we can combine the two datasets into one with torch.utils.data.ConcatDataset, create a DataLoader, and start training.

 def main():
     cfg = Config()
     celeba_dataset = CelebADataset(f'{cfg.data_path}/CelebA', cfg.transform)
     digiface_dataset = DigiFace1M(f'{cfg.data_path}/DigiFace1M', cfg.transform, cfg.n_celeba_classes)
     dataset = torch.utils.data.ConcatDataset([celeba_dataset, digiface_dataset])
 
     loader = torch.utils.data.DataLoader(
         dataset=dataset,
         batch_size=cfg.batch_size,
         shuffle=True,
         drop_last=True,
         num_workers=cfg.n_workers)
 
     utils.train(loader, cfg)

TorchData API

Like Dataset, TorchData supports both map-style (MapDataPipe) and iterable-style (IterDataPipe) pipelines, but the official recommendation is to use IterDataPipe and convert it to a MapDataPipe only when necessary.

TorchData provides a set of ready-made data loading primitives that make it easy to assemble a processing pipeline. Here are the ones we will use (a toy chain combining them is shown right after the list):

  • IterableWrapper: wraps an iterable object into an IterDataPipe
  • FileLister: given a root directory, yields the pathnames (path + filename) of the files inside it
  • Filter: filters elements of the source DataPipe according to the given filter_fn (functional name: filter)
  • Mapper: applies a function to each item of the source DataPipe (functional name: map)
  • Concater: concatenates several iterable DataPipes (functional name: concat)
  • Shuffler: shuffles the order of the input DataPipe's elements (functional name: shuffle)
  • ShardingFilter: allows the DataPipe to be sharded between workers (functional name: sharding_filter)
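As promised above, here is a toy chain that combines these primitives through their functional names; the directory name, the file mask and the lambdas are placeholders, not part of the article's pipeline:

 from torchdata.datapipes.iter import FileLister, IterableWrapper

 extra = IterableWrapper([('manual/img_0.png', 0)])              # IterableWrapper over a plain list
 files = FileLister('some_dir', masks='*.png', recursive=True)   # FileLister yields file paths
 files = files.filter(lambda path: 'skip' not in path)           # Filter (functional name: filter)
 files = files.map(lambda path: (path, 0))                       # Mapper: attach a dummy label
 dp = files.concat(extra)                                        # Concater (functional name: concat)
 dp = dp.shuffle()                                               # Shuffler (functional name: shuffle)
 dp = dp.sharding_filter()                                       # ShardingFilter

 for path, label in dp:                                          # a DataPipe is itself iterable
     print(path, label)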

To build the data processing pipeline for CelebA and DigiFace1M with TorchData, we perform the following steps:

  • For the CelebA dataset: build a list of tuples (file_name, label, 'CelebA') and wrap it into an IterDataPipe with IterableWrapper.
  • For DigiFace1M: use FileLister to create an IterDataPipe that yields the paths of all image files, then apply Mapper with collate_ann. This function takes an image path as input and returns a tuple (file_name, label, 'DigiFace1M').
  • After these two steps we have two DataPipes yielding elements of the same form (file_name, label, data_name); we join them into one pipeline with Concater.
  • Use Shuffler to shuffle the order, which plays the same role as shuffle=True in DataLoader.
  • Split the pipeline between workers with ShardingFilter; each worker then receives 1/n of the original DataPipe's elements, where n is the number of workers (num_workers in DataLoader).
  • The last step is to read the images from disk.

The complete code is as follows:

 import os
 from typing import Iterator, List, Tuple

 import torch
 import torchdata.datapipes.iter
 from PIL import Image

 # N_CELEBA_CLASSES (the number of CelebA identities) is assumed to be defined at module level


 @torchdata.datapipes.functional_datapipe("load_image")
 class ImageLoader(torchdata.datapipes.iter.IterDataPipe):
     def __init__(self, source_datapipe, **kwargs) -> None:
         self.source_datapipe = source_datapipe
         self.transform = kwargs['transform']

     def __iter__(self) -> Iterator[Tuple[torch.Tensor, int]]:
         for file_name, label, data_name in self.source_datapipe:
             image = Image.open(file_name)
             if data_name == 'DigiFace1M':
                 image = image.convert('RGB')
             elif data_name == 'CelebA':
                 left, right, top, bottom = 25, 153, 45, 173
                 image = image.crop((left, top, right, bottom))
             if self.transform is not None:
                 image = self.transform(image)
             yield image, label
 
 
 def collate_ann(file_path):
     label = int(os.path.basename(os.path.dirname(file_path))) + N_CELEBA_CLASSES
     data_name = os.path.basename(os.path.dirname(os.path.dirname(file_path)))
     return file_path, label, data_name
 
 
 def load_celeba_labels(labels_path: str) -> List[Tuple[str, int, str]]:
     labels = []
     data_path = os.path.split(labels_path)[0]
     with open(labels_path, 'r', encoding='utf-8') as labels_file:
         lines = labels_file.readlines()
         for line in lines:
             file_name, class_id = line.split(' ')
             class_id = int(class_id[:-1])
             labels.append((f'{data_path}/img_align_celeba/{file_name}', class_id, 'CelebA'))
     return labels
 
 
 def build_datapipes(cfg: Config) -> torchdata.datapipes.iter.IterDataPipe:
     celeba_dp = torchdata.datapipes.iter.IterableWrapper(
         load_celeba_labels(
             labels_path=f'{cfg.data_path}/CelebA/identity_CelebA.txt'))
 
     digiface_dp = torchdata.datapipes.iter.FileLister(f'{cfg.data_path}/DigiFace1M', masks='*.png', recursive=True)
     digiface_dp = digiface_dp.map(collate_ann)
 
     datapipe = celeba_dp.concat(digiface_dp)
     datapipe = datapipe.shuffle(buffer_size=100000)
     datapipe = datapipe.sharding_filter()
     datapipe = datapipe.load_image(transform=cfg.transform)
     return datapipe

PyTorch's DataLoader supports both Datasets and DataPipes, so we can pass our DataPipe to it directly:

 def main():
     cfg = Config()
     datapipe = build_datapipes(cfg)
     loader = torch.utils.data.DataLoader(
         dataset=datapipe,
         batch_size=cfg.batch_size,
         shuffle=True,
         drop_last=True,
         num_workers=cfg.n_workers)
 
     utils.train(loader, cfg)

A little trick to speed up data reading

One of the slowest operations in producing a batch is reading images from disk. To reduce the time this takes, all images can be loaded once and grouped into small shards, say 10,000 images each, saved as .pickle files. At training time each worker then only has to read its own pickle files.

 import os
 import pickle

 import numpy as np
 import torch
 from tqdm import tqdm

 import dataset_example


 def prepare_data():
     cfg = Config()
     cfg.transform = None
     os.makedirs(cfg.prepared_data_path, exist_ok=True)
     celeba_dataset = dataset_example.CelebADataset(f'{cfg.data_path}/CelebA', cfg.transform)
     digiface_dataset = dataset_example.DigiFace1M(f'{cfg.data_path}/DigiFace1M', cfg.transform, cfg.n_celeba_classes)
     dataset = torch.utils.data.ConcatDataset([celeba_dataset, digiface_dataset])
 
     shard_size = 10000
     next_shard = 0
     data = []
     shuffled_idxs = np.arange(len(dataset))
     np.random.shuffle(shuffled_idxs)
     for idx in tqdm(shuffled_idxs):
         data.append(dataset[idx])
         if len(data) == shard_size:
             with open(f'{cfg.prepared_data_path}/{next_shard}_shard.pickle', 'wb') as _file:
                 pickle.dump(data, _file)
             next_shard += 1
             data = []
     with open(f'{cfg.prepared_data_path}/{next_shard}_shard.pickle', 'wb') as _file:
         pickle.dump(data, _file)
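As a quick sanity check (not part of the original code), one shard can be read back directly; since cfg.transform was set to None before the shards were written, each element is a (PIL image, label) pair:

 import pickle

 cfg = Config()
 with open(f'{cfg.prepared_data_path}/0_shard.pickle', 'rb') as _file:
     shard = pickle.load(_file)

 print(len(shard))         # up to 10,000 (image, label) pairs per shard
 image, label = shard[0]
 print(image.size, label)  # PIL image size and the integer class id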

Below, FileLister collects the paths of all the .pickle shards, ShardingFilter splits them between the workers, and each worker loads the data from its own .pickle files.

 import pickle
 from typing import Iterator, Tuple

 import torch
 import torchdata.datapipes.iter


 @torchdata.datapipes.functional_datapipe("load_pickle_data")
 class PickleDataLoader(torchdata.datapipes.iter.IterDataPipe):
     def __init__(self, source_datapipe, **kwargs) -> None:
         self.source_datapipe = source_datapipe
         self.transform = kwargs['transform']

     def __iter__(self) -> Iterator[Tuple[torch.Tensor, int]]:
         for file_name in self.source_datapipe:
             with open(file_name, 'rb') as _file:
                 pickle_data = pickle.load(_file)
                 for image, label in pickle_data:
                     image = self.transform(image)
                     yield image, label
 
 
 def build_datapipes(cfg: Config) -> torchdata.datapipes.iter.IterDataPipe:
     datapipe = torchdata.datapipes.iter.FileLister(cfg.prepared_data_path, masks='*.pickle')
     datapipe = datapipe.shuffle()
     datapipe = datapipe.sharding_filter()
     datapipe = datapipe.load_pickle_data(transform=cfg.transform)
     return datapipe

Data loading comparison

We compare three different data loading methods. For all tests, batch_size = 600.
n workers | Datasets, sec | DataPipes, sec | DataPipe + pickle, sec
10        | 3581          | 7986           | 758
5         | 10034         | 2993           | 760
When training with a DataPipe on unprepared data (without pickle), the first few hundred batches are generated very quickly and GPU usage stays close to 100%, but then the speed gradually drops, and in the end this method turns out to be slower than the Dataset with n_workers = 10. One would expect both approaches to take roughly the same time, since they perform the same operations, but in practice they do not.

The optimal num_workers for the DataLoader has no fixed value: it depends on the task (image size, complexity of the image preprocessing, etc.) and on the hardware configuration (HDD vs SSD).

When training on a dataset made up of a large number of small images, data preparation pays off: combining the small files into a few large ones reduces the time spent reading from disk. However, with this method the data must be thoroughly shuffled before it is written to the shards, otherwise training convergence suffers. You also need to choose a reasonable shard size: large enough to avoid excessive small reads from disk, and small enough that the shard order can still be shuffled effectively by the Shuffler in the DataPipe.
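For a sense of scale: CelebA and DigiFace1M together contain 202,599 + 720,000 = 922,599 images, so with shard_size = 10,000 the preparation step above produces 93 pickle files (92 full shards plus one partial one). The Shuffler in the pickle pipeline then only permutes these 93 file paths, which is exactly why the images have to be thoroughly mixed before they are written to the shards.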

Finally, the full code for this article is available here; those who are interested can run the tests and compare for themselves:

https://avoid.overfit.cn/post/d431289d4723430b882e189008aeb959

Author: Karina Ovchinnikova

