Shortcuts

Distributed training tutorial

If you have multiple GPUs, the most reliable way to utilize their full potential during training is to use the distributed package from PyTorch. For such a case, there are many distributed helpers in Catalyst to make this engineering stuff a bit more user-friendly.

Please note that due to PyTorch multiprocessing realization, GPU-based distributed training doesn’t work in a notebook, so prepare a script to run the training. Nevertheless, XLA-based training could be run directly in the notebook.

Prepare your script

Let’s start with a simple script for ResNet9 model training on CIFAR10:

import os

from torch import nn, optim
from torch.utils.data import DataLoader

from catalyst import dl
from catalyst.contrib import CIFAR10, Compose, ImageToTensor, NormalizeImage, ResidualBlock

def conv_block(in_channels, out_channels, pool=False):
    layers = [
        nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
        nn.BatchNorm2d(out_channels),
        nn.ReLU(inplace=True),
    ]
    if pool:
        layers.append(nn.MaxPool2d(2))
    return nn.Sequential(*layers)


def resnet9(in_channels: int, num_classes: int, size: int = 16):
    sz, sz2, sz4, sz8 = size, size * 2, size * 4, size * 8
    return nn.Sequential(
        conv_block(in_channels, sz),
        conv_block(sz, sz2, pool=True),
        ResidualBlock(nn.Sequential(conv_block(sz2, sz2), conv_block(sz2, sz2))),
        conv_block(sz2, sz4, pool=True),
        conv_block(sz4, sz8, pool=True),
        ResidualBlock(nn.Sequential(conv_block(sz8, sz8), conv_block(sz8, sz8))),
        nn.Sequential(
            nn.MaxPool2d(4), nn.Flatten(),
            nn.Dropout(0.2), nn.Linear(sz8, num_classes)
        ),
    )

if __name__ == "__main__":
    # experiment setup
    logdir = "./logdir1"
    num_epochs = 10

    # data
    transform = Compose([
        ImageToTensor(),
        NormalizeImage((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    ])
    train_data = CIFAR10(
        os.getcwd(), train=True, download=True, transform=transform
    )
    valid_data = CIFAR10(
        os.getcwd(), train=False, download=True, transform=transform
    )
    loaders = {
        "train": DataLoader(train_data, batch_size=32, num_workers=4),
        "valid": DataLoader(valid_data, batch_size=32, num_workers=4),
    }

    # model, criterion, optimizer, scheduler
    model = resnet9(in_channels=3, num_classes=10)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=1e-3)
    scheduler = optim.lr_scheduler.MultiStepLR(optimizer, [5, 8], gamma=0.3)

    # model training
    runner = dl.SupervisedRunner()
    runner.train(
        model=model,
        criterion=criterion,
        optimizer=optimizer,
        scheduler=scheduler,
        loaders=loaders,
        logdir=logdir,
        num_epochs=num_epochs,
        verbose=True,
        valid_loader="valid",
        valid_metric="loss",
        minimize_valid_metric=True,
    )

By default, without any additional specifications, Catalyst will utilize all available resources in - DataParallel setup if there are several GPUs available, - GPU` setup if there is only one GPU available, - CPU setup if there is no GPU available.

Fast DDP

Tranks to Catalyst Python API, you could run the same code without any change and get the distributed setup with only one line of code. Just pass ddp=True flag during .train call:

import os

from torch import nn, optim
from torch.utils.data import DataLoader

from catalyst import dl
from catalyst.contrib import CIFAR10, Compose, ImageToTensor, NormalizeImage, ResidualBlock

def conv_block(in_channels, out_channels, pool=False):
    layers = [
        nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
        nn.BatchNorm2d(out_channels),
        nn.ReLU(inplace=True),
    ]
    if pool:
        layers.append(nn.MaxPool2d(2))
    return nn.Sequential(*layers)

def resnet9(in_channels: int, num_classes: int, size: int = 16):
    sz, sz2, sz4, sz8 = size, size * 2, size * 4, size * 8
    return nn.Sequential(
        conv_block(in_channels, sz),
        conv_block(sz, sz2, pool=True),
        ResidualBlock(nn.Sequential(conv_block(sz2, sz2), conv_block(sz2, sz2))),
        conv_block(sz2, sz4, pool=True),
        conv_block(sz4, sz8, pool=True),
        ResidualBlock(nn.Sequential(conv_block(sz8, sz8), conv_block(sz8, sz8))),
        nn.Sequential(
            nn.MaxPool2d(4), nn.Flatten(),
            nn.Dropout(0.2), nn.Linear(sz8, num_classes)
        ),
    )

if __name__ == "__main__":
    # experiment setup
    logdir = "./logdir2"
    num_epochs = 10

    # data
    transform = Compose([
        ImageToTensor(),
        NormalizeImage((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    ])
    train_data = CIFAR10(
        os.getcwd(), train=True, download=True, transform=transform
    )
    valid_data = CIFAR10(
        os.getcwd(), train=False, download=True, transform=transform
    )
    loaders = {
        "train": DataLoader(train_data, batch_size=32, num_workers=4),
        "valid": DataLoader(valid_data, batch_size=32, num_workers=4),
    }

    # model, criterion, optimizer, scheduler
    model = resnet9(in_channels=3, num_classes=10)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=1e-3)
    scheduler = optim.lr_scheduler.MultiStepLR(optimizer, [5, 8], gamma=0.3)

    # model training
    runner = dl.SupervisedRunner()
    runner.train(
        model=model,
        criterion=criterion,
        optimizer=optimizer,
        scheduler=scheduler,
        loaders=loaders,
        logdir=logdir,
        num_epochs=num_epochs,
        verbose=True,
        valid_loader="valid",
        valid_metric="loss",
        minimize_valid_metric=True,
        ddp=True,   # <-- here is the trick,
        amp=False,  # <-- here is another trick ;)
    )

Please note that you could also specify automatic mixed-precision usage with the amp flag in the same way.

In this way, Catalyst will automatically try to make your loaders work in a distributed setup and run experiment training.

Nevertheless, it has several disadvantages,
  • without proper specification, loaders will be created again and again for each distributed worker,

  • you can’t understand what is going under the hood of ddp=True,

  • we can’t always transfer your loaders to distributed mode correctly due to a large variety of data processing pipelines available.

For such a reason, Catalyst API also provides a proper “low-level” API for your data preparation for the distributed setup.

DDP under the hood

Let’s create our CustomSupervisedRunner and pass the data preparation under CustomSupervisedRunner.get_loaders.

import os

from torch import nn, optim
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

from catalyst import dl
from catalyst.contrib import CIFAR10, Compose, ImageToTensor, NormalizeImage, ResidualBlock

def conv_block(in_channels, out_channels, pool=False):
    layers = [
        nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
        nn.BatchNorm2d(out_channels),
        nn.ReLU(inplace=True),
    ]
    if pool:
        layers.append(nn.MaxPool2d(2))
    return nn.Sequential(*layers)

def resnet9(in_channels: int, num_classes: int, size: int = 16):
    sz, sz2, sz4, sz8 = size, size * 2, size * 4, size * 8
    return nn.Sequential(
        conv_block(in_channels, sz),
        conv_block(sz, sz2, pool=True),
        ResidualBlock(nn.Sequential(conv_block(sz2, sz2), conv_block(sz2, sz2))),
        conv_block(sz2, sz4, pool=True),
        conv_block(sz4, sz8, pool=True),
        ResidualBlock(nn.Sequential(conv_block(sz8, sz8), conv_block(sz8, sz8))),
        nn.Sequential(
            nn.MaxPool2d(4), nn.Flatten(),
            nn.Dropout(0.2), nn.Linear(sz8, num_classes)
        ),
    )

class CustomSupervisedRunner(dl.SupervisedRunner):
    # here is the trick:
    def get_loaders(self, stage: str):
        transform = Compose([
            ImageToTensor(),
            NormalizeImage((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
        ])
        train_data = CIFAR10(
            os.getcwd(), train=True, download=True, transform=transform
        )
        valid_data = CIFAR10(
            os.getcwd(), train=False, download=True, transform=transform
        )
        if self.engine.is_ddp:
            train_sampler = DistributedSampler(
                train_data,
                num_replicas=self.engine.world_size,
                rank=self.engine.rank,
                shuffle=True,
            )
            valid_sampler = DistributedSampler(
                valid_data,
                num_replicas=self.engine.world_size,
                rank=self.engine.rank,
                shuffle=False,
            )
        else:
            train_sampler = valid_sampler = None

        train_loader = DataLoader(
            train_data, batch_size=32, sampler=train_sampler, num_workers=4
        )
        valid_loader = DataLoader(
            valid_data, batch_size=32, sampler=train_sampler, num_workers=4
        )
        return {"train": train_loader, "valid": valid_loader}

if __name__ == "__main__":
    # experiment setup
    logdir = "./logdir2"
    num_epochs = 10

    # model, criterion, optimizer, scheduler
    model = resnet9(in_channels=3, num_classes=10)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=1e-3)
    scheduler = optim.lr_scheduler.MultiStepLR(optimizer, [5, 8], gamma=0.3)

    # model training
    runner = CustomSupervisedRunner()
    runner.train(
        model=model,
        criterion=criterion,
        optimizer=optimizer,
        scheduler=scheduler,
        loaders=None,  # <-- here is the trick
        logdir=logdir,
        num_epochs=num_epochs,
        verbose=True,
        valid_loader="valid",
        valid_metric="loss",
        minimize_valid_metric=True,
        ddp=True,   # <-- now it works like a charm
        amp=False,  # <-- you can still use this trick here ;)
    )

As you can see, we have the same code, except that the CustomSupervisedRunner now knows all the details about data preprocessing under distributed setup. And thanks to the pure PyTorch, the code is easily readable and straightforward.

Runner under the hood

As an extra point, you could also specify the whole experiment within Runner methods:

import os

from torch import nn, optim
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

from catalyst import dl, utils
from catalyst.contrib import CIFAR10, Compose, ImageToTensor, NormalizeImage, ResidualBlock

def conv_block(in_channels, out_channels, pool=False):
    layers = [
        nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
        nn.BatchNorm2d(out_channels),
        nn.ReLU(inplace=True),
    ]
    if pool:
        layers.append(nn.MaxPool2d(2))
    return nn.Sequential(*layers)

def resnet9(in_channels: int, num_classes: int, size: int = 16):
    sz, sz2, sz4, sz8 = size, size * 2, size * 4, size * 8
    return nn.Sequential(
        conv_block(in_channels, sz),
        conv_block(sz, sz2, pool=True),
        ResidualBlock(nn.Sequential(conv_block(sz2, sz2), conv_block(sz2, sz2))),
        conv_block(sz2, sz4, pool=True),
        conv_block(sz4, sz8, pool=True),
        ResidualBlock(nn.Sequential(conv_block(sz8, sz8), conv_block(sz8, sz8))),
        nn.Sequential(
            nn.MaxPool2d(4), nn.Flatten(),
            nn.Dropout(0.2), nn.Linear(sz8, num_classes)
        ),
    )

class CustomRunner(dl.IRunner):
    def __init__(self, logdir: str):
        super().__init__()
        self._logdir = logdir

    def get_engine(self):
        return dl.DistributedDataParallelAMPEngine()

    def get_loggers(self):
        return {
            "console": dl.ConsoleLogger(),
            "csv": dl.CSVLogger(logdir=self._logdir),
            "tensorboard": dl.TensorboardLogger(logdir=self._logdir),
        }

    @property
    def stages(self):
        return ["train"]

    def get_stage_len(self, stage: str) -> int:
        return 10

    def get_loaders(self, stage: str):
        transform = Compose([
            ImageToTensor(),
            NormalizeImage((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
        ])
        train_data = CIFAR10(
            os.getcwd(), train=True, download=True, transform=transform
        )
        valid_data = CIFAR10(
            os.getcwd(), train=False, download=True, transform=transform
        )
        if self.engine.is_ddp:
            train_sampler = DistributedSampler(
                train_data,
                num_replicas=self.engine.world_size,
                rank=self.engine.rank,
                shuffle=True,
            )
            valid_sampler = DistributedSampler(
                valid_data,
                num_replicas=self.engine.world_size,
                rank=self.engine.rank,
                shuffle=False,
            )
        else:
            train_sampler = valid_sampler = None

        train_loader = DataLoader(
            train_data, batch_size=32, sampler=train_sampler, num_workers=4
        )
        valid_loader = DataLoader(
            valid_data, batch_size=32, sampler=train_sampler, num_workers=4
        )
        return {"train": train_loader, "valid": valid_loader}

    def get_model(self, stage: str):
        model = (
            self.model
            if self.model is not None
            else resnet9(in_channels=3, num_classes=10)
        )
        return model

    def get_criterion(self, stage: str):
        return nn.CrossEntropyLoss()

    def get_optimizer(self, stage: str, model):
        return optim.Adam(model.parameters(), lr=1e-3)

    def get_scheduler(self, stage: str, optimizer):
        return optim.lr_scheduler.MultiStepLR(optimizer, [5, 8], gamma=0.3)

    def get_callbacks(self, stage: str):
        return {
            "criterion": dl.CriterionCallback(
                metric_key="loss", input_key="logits", target_key="targets"
            ),
            "optimizer": dl.OptimizerCallback(metric_key="loss"),
            "scheduler": dl.SchedulerCallback(loader_key="valid", metric_key="loss"),
            "accuracy": dl.AccuracyCallback(
                input_key="logits", target_key="targets", topk_args=(1, 3, 5)
            ),
            "checkpoint": dl.CheckpointCallback(
                self._logdir,
                loader_key="valid",
                metric_key="accuracy",
                minimize=False,
                save_n_best=1,
            ),
            # "tqdm": dl.TqdmCallback(),
        }

    def handle_batch(self, batch):
        x, y = batch
        logits = self.model(x)
        self.batch = {
            "features": x,
            "targets": y,
            "logits": logits,
        }

 if __name__ == "__main__":
    # experiment setup
    logdir = "./logdir3"

    runner = CustomRunner(logdir)
    runner.run()

With such low-level runner specification, you could customize every detail you want: - hardware acceleration setup with get_engine, - data preparation with get_loaders, - experiment components with get_model, get_optimizer, get_criterion, get_schduler, - you main training/evaluating logic withing handle_batch, - all extra components with get_callbacks.

Launch your training

In your terminal, run:

python {script_name}.py

You can vary available GPUs with CUDA_VIBIBLE_DEVICES option, for example,

# run only on 1st and 2nd GPUs
CUDA_VISIBLE_DEVICES="1,2" python {script_name}.py
# run only on 0, 1st and 3rd GPUs
CUDA_VISIBLE_DEVICES="0,1,3" python {script_name}.py

What is going under the hood? - the same model will be copied on all your available GPUs, - then, during training, the full dataset will randomly be split between available GPUs (that will change at each epoch), - each GPU will grab a batch (on that fractioned dataset), - and pass it through the model, compute the loss, then back-propagate the gradients, - then they will share their results and average them, - since they all have the same gradients at this stage, they will all perform the same update, so the models will still be the same after the gradient step. - then training continues with the next batch until the number of desired iterations is done.

With such specification, the distributed training is “equivalent” to training with a batch size of `batch_size x num_gpus (where batch_size is what you used in your script).

During training Catalyst will automatically average all metrics and log them on rank-zero node only. Same logic used for model checkpointing.

Resume

During this tutorial, we have: - review how to run distributed training with Catalyst into one single line, - how to adapt your custom data preprocessing for the distributed training, - and even specify the whole custom Runner if it’s required.

Finally, we have reviewed the internals or distributed training and understood its “magic” under the hood.