Distributed training (multi-GPU, multi-node)
Catalyst can automatically scale your experiments across multiple GPUs and nodes through its distributed training support.
Notebook API
Suppose you have the following MNIST classification pipeline:
import os
from torch import nn, optim
from torch.utils.data import DataLoader
from catalyst import dl
from catalyst.data import ToTensor
from catalyst.contrib.datasets import MNIST

model = nn.Sequential(nn.Flatten(), nn.Linear(28 * 28, 10))
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.02)

loaders = {
    "train": DataLoader(
        MNIST(os.getcwd(), train=True, download=True, transform=ToTensor()), batch_size=32
    ),
    # the validation set needs the same transform as the training set
    "valid": DataLoader(
        MNIST(os.getcwd(), train=False, transform=ToTensor()), batch_size=32
    ),
}

runner = dl.SupervisedRunner()
# model training
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    loaders=loaders,
    num_epochs=1,
    logdir="./logs",
    valid_loader="valid",
    valid_metric="loss",
    minimize_valid_metric=True,
    verbose=True,
    callbacks=[
        dl.AccuracyCallback(input_key="logits", target_key="targets", num_classes=10),
        dl.PrecisionRecallF1SupportCallback(
            input_key="logits", target_key="targets", num_classes=10
        ),
        dl.AUCCallback(input_key="logits", target_key="targets"),
        # catalyst[ml] required
        # dl.ConfusionMatrixCallback(
        #     input_key="logits", target_key="targets", num_classes=10
        # ),
    ]
)
For DDP training to work correctly, you need to separate the creation of the dataset(s) from the training call. This lets Catalyst transfer your datasets to distributed mode without re-creating the data on each worker.
The recommended pattern for this case is:
import os

from torch import nn, optim
from torch.utils.data import DataLoader, DistributedSampler

from catalyst import dl
from catalyst.data import ToTensor
from catalyst.contrib.datasets import MNIST


class CustomRunner(dl.SupervisedRunner):
    def get_engine(self):
        # run the experiment with DistributedDataParallel
        return dl.DistributedDataParallelEngine()

    def get_loaders(self, stage: str):
        # datasets are created inside the runner, not at module level
        train_data = MNIST(os.getcwd(), train=True, download=True, transform=ToTensor())
        valid_data = MNIST(os.getcwd(), train=False, transform=ToTensor())
        loaders = {
            "train": DataLoader(
                train_data, sampler=DistributedSampler(dataset=train_data), batch_size=32
            ),
            "valid": DataLoader(
                valid_data, sampler=DistributedSampler(dataset=valid_data), batch_size=32
            ),
        }
        return loaders


# model, criterion, optimizer
model = nn.Sequential(nn.Flatten(), nn.Linear(28 * 28, 10))
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.02)

runner = CustomRunner()
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    logdir="./logs",
    valid_loader="valid",
    valid_metric="loss",
    minimize_valid_metric=True,
    verbose=True,
    num_epochs=8,
    callbacks=[
        dl.AccuracyCallback(input_key="logits", target_key="targets", num_classes=10),
        dl.PrecisionRecallF1SupportCallback(
            input_key="logits", target_key="targets", num_classes=10
        ),
        dl.AUCCallback(input_key="logits", target_key="targets"),
        # catalyst[ml] required
        # dl.ConfusionMatrixCallback(
        #     input_key="logits", target_key="targets", num_classes=10
        # ),
    ]
)
Config API
To run a Catalyst experiment in DDP mode with the Config API,
you only need to pass the required flag to the run command:
catalyst-dl run -C=/path/to/configs --distributed
Launch your training
In your terminal, run the following command (replace {script_name} with the name of your script, ending in .py):
python {script_name}
You can restrict the available GPUs with the CUDA_VISIBLE_DEVICES
environment variable, for example:
# run only on GPUs 1 and 2 (device indices are zero-based)
CUDA_VISIBLE_DEVICES="1,2" python {script_name}
# run only on GPUs 0, 1 and 3
CUDA_VISIBLE_DEVICES="0,1,3" python {script_name}
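To double-check which devices a launch will see, a script can read the variable back at startup. The helper below is a minimal sketch using only the standard library; the name visible_gpu_ids is hypothetical and not part of Catalyst. Note that inside the process CUDA renumbers the visible devices from 0, so with "0,1,3" the physical GPU 3 appears as device 2.

```python
import os


def visible_gpu_ids(environ=None):
    """Parse CUDA_VISIBLE_DEVICES into a list of device identifiers.

    Returns None when the variable is unset (no restriction applied).
    Hypothetical helper for illustration, not a Catalyst API.
    """
    environ = os.environ if environ is None else environ
    raw = environ.get("CUDA_VISIBLE_DEVICES")
    if raw is None:
        return None
    # Identifiers are kept as strings because the variable may also hold UUIDs.
    return [item.strip() for item in raw.split(",") if item.strip()]


print(visible_gpu_ids({"CUDA_VISIBLE_DEVICES": "0,1,3"}))  # ['0', '1', '3']
```

An empty string yields an empty list, which corresponds to hiding every GPU from the process.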
With this setup, the same model is copied to every available GPU.
During training, the full dataset is randomly split between the GPUs,
and the split changes at each epoch.
Each GPU takes a batch from its fraction of the dataset,
passes it through the model, computes the loss, and back-propagates to calculate the gradients.
The GPUs then share their gradients and average them,
so training is equivalent to training with a batch size of batch_size x num_gpus
(where batch_size is the value you used in your script).
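The per-epoch split can be illustrated without any GPU. The sketch below loosely mirrors what a distributed sampler does (shared shuffle, padding, strided selection); it is a simplified illustration in plain Python, not the actual PyTorch or Catalyst code, and shard_indices is a hypothetical name.

```python
import random


def shard_indices(dataset_len, num_replicas, rank, epoch, seed=0):
    """Return the dataset indices one worker would process in a given epoch."""
    # Every worker shuffles with the same seed, so all ranks agree on the order.
    order = list(range(dataset_len))
    random.Random(seed + epoch).shuffle(order)
    # Pad by wrapping around so every worker receives the same number of samples.
    per_replica = -(-dataset_len // num_replicas)  # ceiling division
    total = per_replica * num_replicas
    padded = [order[i % dataset_len] for i in range(total)]
    # Each rank takes every num_replicas-th index, starting at its own rank.
    return padded[rank::num_replicas]


num_gpus, batch_size = 4, 32
epoch_0 = [shard_indices(10, num_gpus, rank, epoch=0) for rank in range(num_gpus)]
epoch_1 = [shard_indices(10, num_gpus, rank, epoch=1) for rank in range(num_gpus)]
effective_batch_size = batch_size * num_gpus

print(epoch_0)
print(epoch_1)
print(effective_batch_size)  # 128
```

Seeding the shuffle with the epoch number is what makes the split change between epochs while staying consistent across workers.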
Because every GPU holds the same averaged gradients at this stage, each one performs the same update, so the model copies remain identical after the step. Training then continues with the next batch until the desired number of iterations is reached.
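The equivalence between averaged per-GPU gradients and one large batch can be checked on a toy problem. The following is a minimal sketch in plain Python, assuming a one-parameter linear model with a mean squared error loss and equally sized shards; the GPUs are simulated by list slices, and none of the names come from Catalyst.

```python
def grad_mse(w, xs, ys):
    """Gradient of mean((w * x - y) ** 2) with respect to w."""
    return sum(2 * (w * x - y) * x for x, y in zip(xs, ys)) / len(xs)


xs = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]
ys = [2.0, 4.0, 6.0, 8.0, 10.0, 12.0, 14.0, 16.0]
w, lr, num_gpus = 0.5, 0.01, 4
per_gpu = len(xs) // num_gpus

# Each simulated GPU computes a gradient on its own slice of the global batch.
local_grads = [
    grad_mse(w, xs[r * per_gpu:(r + 1) * per_gpu], ys[r * per_gpu:(r + 1) * per_gpu])
    for r in range(num_gpus)
]

# Averaging step: every worker ends up with the same averaged gradient.
avg_grad = sum(local_grads) / num_gpus
# Reference: the gradient computed on the whole batch at once.
full_grad = grad_mse(w, xs, ys)

# All replicas apply the same update, so their weights stay identical.
replica_weights = [w - lr * avg_grad for _ in range(num_gpus)]

print(avg_grad, full_grad)  # -76.5 -76.5
```

The match relies on the loss being a mean over samples and on every shard having the same size; with uneven shards a plain average would weight samples differently.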
During training, Catalyst automatically averages all metrics across the workers
and logs them on the master node only.
The same logic applies to model checkpointing.
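The metric handling described here can be pictured with a small simulation. This is a sketch under the assumption that each worker reports a dictionary of scalar metrics; average_metrics and log_on_master are hypothetical names, and the code does not reproduce Catalyst's internal implementation.

```python
def average_metrics(per_rank_metrics):
    """Average each metric over all workers, as a mean reduction would."""
    world_size = len(per_rank_metrics)
    keys = per_rank_metrics[0].keys()
    return {k: sum(m[k] for m in per_rank_metrics) / world_size for k in keys}


def log_on_master(rank, metrics, sink):
    """Append metrics to the sink on rank 0 only; other ranks stay silent."""
    if rank == 0:
        sink.append(metrics)


# Metrics as two simulated workers might report them for one epoch.
per_rank = [{"loss": 0.5, "accuracy": 0.75}, {"loss": 0.25, "accuracy": 0.25}]
synced = average_metrics(per_rank)

log = []
for rank in range(len(per_rank)):
    log_on_master(rank, synced, log)

print(log)  # [{'loss': 0.375, 'accuracy': 0.5}]
```

Restricting writes to a single rank avoids duplicated log entries and prevents several processes from writing the same checkpoint file at once.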
If you haven’t found the answer to your question, feel free to join our Slack to discuss it.