Engines

An overview of the available engines can be found in the examples/engines section.
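
As a quick orientation, the common pattern is to pick an engine that matches the available hardware and pass it to the runner. A minimal sketch (the GPU-count heuristic below is only illustrative, not a Catalyst requirement):

import torch
from catalyst import dl

# pick an engine for the available hardware (illustrative heuristic)
if torch.cuda.device_count() > 1:
    engine = dl.DataParallelEngine()       # several GPUs, single process
elif torch.cuda.is_available():
    engine = dl.DeviceEngine("cuda:0")     # single GPU
else:
    engine = dl.DeviceEngine("cpu")        # CPU fallback

runner = dl.SupervisedRunner()
# the engine is then passed to runner.train(engine=engine, ...)
# together with the model, loaders and other training arguments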

AMP

AMPEngine

class catalyst.engines.amp.AMPEngine(device: str = 'cuda', scaler_kwargs: Optional[Dict[str, Any]] = None)[source]

Bases: catalyst.engines.torch.DeviceEngine

PyTorch AMP (automatic mixed precision) single training device engine.

Parameters

device – device to use, default is "cuda".

scaler_kwargs – parameters for torch.cuda.amp.GradScaler. Possible parameters: https://pytorch.org/docs/stable/amp.html#torch.cuda.amp.GradScaler

Examples:

from catalyst import dl

runner = dl.SupervisedRunner()
runner.train(
    engine=dl.AMPEngine("cuda:1"),
    ...
)

from catalyst import dl

class MyRunner(dl.IRunner):
    # ...
    def get_engine(self):
        return dl.AMPEngine("cuda:1")
    # ...

args:
    logs: ...

model:
    _target_: ...
    ...

engine:
    _target_: AMPEngine
    device: cuda:1

stages:
    ...

DataParallelAMPEngine

class catalyst.engines.amp.DataParallelAMPEngine(scaler_kwargs: Optional[Dict[str, Any]] = None)[source]

Bases: catalyst.engines.amp.AMPEngine

AMP multi-gpu training device engine.

Parameters

scaler_kwargs – parameters for torch.cuda.amp.GradScaler. Possible parameters: https://pytorch.org/docs/stable/amp.html#torch.cuda.amp.GradScaler

Examples:

from catalyst import dl

runner = dl.SupervisedRunner()
runner.train(
    engine=dl.DataParallelAMPEngine(),
    ...
)

from catalyst import dl

class MyRunner(dl.IRunner):
    # ...
    def get_engine(self):
        return dl.DataParallelAMPEngine()
    # ...

args:
    logs: ...

model:
    _target_: ...
    ...

engine:
    _target_: DataParallelAMPEngine

stages:
    ...
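
The scaler_kwargs argument described above is forwarded to torch.cuda.amp.GradScaler, so any of its constructor options can be tuned. A minimal sketch (the growth_factor value is illustrative):

from catalyst import dl

# tune the AMP gradient scaler through scaler_kwargs
engine = dl.DataParallelAMPEngine(scaler_kwargs={"growth_factor": 1.5})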

DistributedDataParallelAMPEngine

class catalyst.engines.amp.DistributedDataParallelAMPEngine(address: Optional[str] = None, port: Optional[Union[str, int]] = None, sync_bn: bool = False, ddp_kwargs: Optional[Dict[str, Any]] = None, process_group_kwargs: Optional[Dict[str, Any]] = None, scaler_kwargs: Optional[Dict[str, Any]] = None)[source]

Bases: catalyst.engines.torch.DistributedDataParallelEngine

Distributed AMP multi-gpu training device engine.

Parameters

address – master node address for distributed setup, default is None.

port – master node port for distributed setup, default is None.

sync_bn – if True, BatchNorm layers are converted to their synchronized version for distributed training, default is False.

ddp_kwargs – parameters for torch.nn.parallel.DistributedDataParallel.

process_group_kwargs – parameters for torch.distributed.init_process_group.

scaler_kwargs – parameters for torch.cuda.amp.GradScaler. Possible parameters: https://pytorch.org/docs/stable/amp.html#torch.cuda.amp.GradScaler

Examples:

from catalyst import dl

runner = dl.SupervisedRunner()
runner.train(
    engine=dl.DistributedDataParallelAMPEngine(),
    ...
)

from catalyst import dl

class MyRunner(dl.IRunner):
    # ...
    def get_engine(self):
        return dl.DistributedDataParallelAMPEngine(
            address="0.0.0.0",
            port=23234,
            ddp_kwargs={"find_unused_parameters": False},
            process_group_kwargs={"port": 12345},
            scaler_kwargs={"growth_factor": 1.5}
        )
    # ...

args:
    logs: ...

model:
    _target_: ...
    ...

engine:
    _target_: DistributedDataParallelAMPEngine
    address: 0.0.0.0
    port: 23234
    ddp_kwargs:
        find_unused_parameters: false
    process_group_kwargs:
        port: 12345
    scaler_kwargs:
        growth_factor: 1.5

stages:
    ...

Apex

APEXEngine

class catalyst.engines.apex.APEXEngine(device: str = 'cuda', apex_kwargs: Optional[Dict[str, Any]] = None)[source]

Bases: catalyst.engines.torch.DeviceEngine

Apex single training device engine.

Parameters

device – device to use, default is "cuda".

apex_kwargs – parameters for apex.amp.initialize (for example, opt_level or keep_batchnorm_fp32).

Examples:

from catalyst import dl

runner = dl.SupervisedRunner()
runner.train(
    engine=dl.APEXEngine(apex_kwargs=dict(opt_level="O1", keep_batchnorm_fp32=False)),
    ...
)

from catalyst import dl

class MyRunner(dl.IRunner):
    # ...
    def get_engine(self):
        return dl.APEXEngine(apex_kwargs=dict(opt_level="O1", keep_batchnorm_fp32=False))
    # ...

args:
    logs: ...

model:
    _target_: ...
    ...

engine:
    _target_: APEXEngine
    apex_kwargs:
        opt_level: O1
        keep_batchnorm_fp32: false

stages:
    ...
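
The opt_level entry in apex_kwargs selects the Apex mixed-precision mode: "O0" (pure FP32), "O1" (patched mixed precision), "O2" ("almost FP16" with FP32 master weights) and "O3" (pure FP16). A minimal sketch picking a different level (the choice is illustrative):

from catalyst import dl

# "O2" casts the model to FP16 but keeps FP32 master weights
engine = dl.APEXEngine(apex_kwargs=dict(opt_level="O2"))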

DataParallelApexEngine

catalyst.engines.apex.DataParallelApexEngine

alias of catalyst.engines.apex.DataParallelAPEXEngine
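
The alias has no separate example in this section; a brief sketch, assuming the class is exported through catalyst.dl like the other engines and accepts apex_kwargs in the same way as APEXEngine above:

from catalyst import dl

runner = dl.SupervisedRunner()
runner.train(
    engine=dl.DataParallelAPEXEngine(apex_kwargs=dict(opt_level="O1")),
    # ... the remaining runner.train arguments (model, loaders, etc.)
)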

DistributedDataParallelApexEngine

catalyst.engines.apex.DistributedDataParallelApexEngine

alias of catalyst.engines.apex.DistributedDataParallelAPEXEngine
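
Likewise, a brief sketch for the distributed variant, assuming it accepts the same distributed-setup arguments as the other DistributedDataParallel* engines in this section plus apex_kwargs:

from catalyst import dl

runner = dl.SupervisedRunner()
runner.train(
    engine=dl.DistributedDataParallelAPEXEngine(
        address="0.0.0.0",
        port=23234,
        apex_kwargs=dict(opt_level="O1"),
    ),
    # ... the remaining runner.train arguments (model, loaders, etc.)
)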

DeepSpeed

DistributedDataParallelDeepSpeedEngine

class catalyst.engines.deepspeed.DistributedDataParallelDeepSpeedEngine(address: Optional[str] = None, port: Optional[Union[str, int]] = None, process_group_kwargs: Optional[Dict[str, Any]] = None, deepspeed_kwargs: Optional[Dict[str, Any]] = None, train_batch_size: int = 256)[source]

Bases: catalyst.engines.torch.DeviceEngine

Distributed DeepSpeed MultiGPU training device engine.

Parameters

address – master node address for distributed setup, default is None.

port – master node port for distributed setup, default is None.

process_group_kwargs – parameters for torch.distributed.init_process_group.

deepspeed_kwargs – parameters passed to DeepSpeed initialization, including the config dictionary (see the examples below).

train_batch_size – train batch size used for the default DeepSpeed config, default is 256.

Examples:

from catalyst import dl

runner = dl.SupervisedRunner()
runner.train(
    engine=dl.DistributedDataParallelDeepSpeedEngine(),
    ...
)

from catalyst import dl

class MyRunner(dl.IRunner):
    # ...
    def get_engine(self):
        return dl.DistributedDataParallelDeepSpeedEngine(
            address="0.0.0.0",
            port=23234,
            process_group_kwargs={"port": 12345},
            deepspeed_kwargs={"config": {"train_batch_size": 64}}
        )
    # ...

args:
    logs: ...

model:
    _target_: ...
    ...

engine:
    _target_: DistributedDataParallelDeepSpeedEngine
    address: 0.0.0.0
    port: 23234
    process_group_kwargs:
        port: 12345
    deepspeed_kwargs:
        config:
            train_batch_size: 64

stages:
    ...
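
DeepSpeed-specific behaviour is controlled through the config dictionary inside deepspeed_kwargs; a minimal sketch with a couple of common DeepSpeed config blocks (the values are illustrative):

from catalyst import dl

engine = dl.DistributedDataParallelDeepSpeedEngine(
    deepspeed_kwargs={
        "config": {
            "train_batch_size": 64,
            "fp16": {"enabled": True},          # DeepSpeed mixed-precision block
            "zero_optimization": {"stage": 2},  # ZeRO optimizer-state/gradient sharding
        }
    }
)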

FairScale

PipelineParallelFairScaleEngine

class catalyst.engines.fairscale.PipelineParallelFairScaleEngine(pipe_kwargs: Optional[Dict[str, Any]] = None)[source]

Bases: catalyst.engines.torch.DeviceEngine

FairScale pipeline-parallel multi-GPU training device engine.

Parameters

pipe_kwargs – parameters for fairscale.nn.Pipe. Docs for fairscale.nn.Pipe: https://fairscale.readthedocs.io/en/latest/api/nn/pipe.html

Examples:

from catalyst import dl

runner = dl.SupervisedRunner()
runner.train(
    engine=dl.PipelineParallelFairScaleEngine(),
    ...
)

from catalyst import dl

class MyRunner(dl.IRunner):
    # ...
    def get_engine(self):
        return dl.PipelineParallelFairScaleEngine(
            pipe_kwargs={"balance": [3, 1]}
        )
    # ...

args:
    logs: ...

model:
    _target_: ...
    ...

engine:
    _target_: PipelineParallelFairScaleEngine
    pipe_kwargs:
        balance: [3, 1]

stages:
    ...
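
The balance list in pipe_kwargs describes how many consecutive layers of a sequential model go to each pipeline stage (fairscale.nn.Pipe works on nn.Sequential modules). A minimal sketch, with an illustrative four-layer model split 3/1:

from torch import nn
from catalyst import dl

# four sequential layers: balance=[3, 1] places the first three layers
# on the first pipeline stage and the last layer on the second one
model = nn.Sequential(nn.Linear(32, 64), nn.ReLU(), nn.Linear(64, 64), nn.Linear(64, 10))
engine = dl.PipelineParallelFairScaleEngine(pipe_kwargs={"balance": [3, 1]})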

SharedDataParallelFairScaleEngine

class catalyst.engines.fairscale.SharedDataParallelFairScaleEngine(address: Optional[str] = None, port: Optional[Union[str, int]] = None, sync_bn: bool = False, ddp_kwargs: Optional[Dict[str, Any]] = None, process_group_kwargs: Optional[Dict[str, Any]] = None)[source]

Bases: catalyst.engines.torch.DistributedDataParallelEngine

Distributed FairScale MultiGPU training device engine.

Parameters

address – master node address for distributed setup, default is None.

port – master node port for distributed setup, default is None.

sync_bn – if True, BatchNorm layers are converted to their synchronized version for distributed training, default is False.

ddp_kwargs – parameters for the FairScale sharded data parallel wrapper.

process_group_kwargs – parameters for torch.distributed.init_process_group.

Examples:

from catalyst import dl

runner = dl.SupervisedRunner()
runner.train(
    engine=dl.SharedDataParallelFairScaleEngine(),
    ...
)

from catalyst import dl

class MyRunner(dl.IRunner):
    # ...
    def get_engine(self):
        return dl.SharedDataParallelFairScaleEngine(
            address="0.0.0.0",
            port=23234,
            ddp_kwargs={"find_unused_parameters": False},
            process_group_kwargs={"port": 12345},
        )
    # ...

args:
    logs: ...

model:
    _target_: ...
    ...

engine:
    _target_: SharedDataParallelFairScaleEngine
    address: 0.0.0.0
    port: 23234
    ddp_kwargs:
        find_unused_parameters: false
    process_group_kwargs:
        port: 12345

stages:
    ...

SharedDataParallelFairScaleAMPEngine

class catalyst.engines.fairscale.SharedDataParallelFairScaleAMPEngine(address: Optional[str] = None, port: Optional[Union[str, int]] = None, sync_bn: bool = False, ddp_kwargs: Optional[Dict[str, Any]] = None, process_group_kwargs: Optional[Dict[str, Any]] = None, scaler_kwargs: Optional[Dict[str, Any]] = None)[source]

Bases: catalyst.engines.fairscale.SharedDataParallelFairScaleEngine

Distributed FairScale MultiGPU training device engine with AMP (mixed precision) support.

Parameters

address – master node address for distributed setup, default is None.

port – master node port for distributed setup, default is None.

sync_bn – if True, BatchNorm layers are converted to their synchronized version for distributed training, default is False.

ddp_kwargs – parameters for the FairScale sharded data parallel wrapper.

process_group_kwargs – parameters for torch.distributed.init_process_group.

scaler_kwargs – parameters for the AMP gradient scaler.

Examples:

from catalyst import dl

runner = dl.SupervisedRunner()
runner.train(
    engine=dl.SharedDataParallelFairScaleAMPEngine(),
    ...
)

from catalyst import dl

class MyRunner(dl.IRunner):
    # ...
    def get_engine(self):
        return dl.SharedDataParallelFairScaleAMPEngine(
            address="0.0.0.0",
            port=23234,
            ddp_kwargs={"find_unused_parameters": False},
            process_group_kwargs={"port": 12345},
            scaler_kwargs={"growth_factor": 1.5}
        )
    # ...

args:
    logs: ...

model:
    _target_: ...
    ...

engine:
    _target_: SharedDataParallelFairScaleAMPEngine
    address: 0.0.0.0
    port: 23234
    ddp_kwargs:
        find_unused_parameters: false
    process_group_kwargs:
        port: 12345
    scaler_kwargs:
        growth_factor: 1.5

stages:
    ...

FullySharedDataParallelFairScaleEngine

class catalyst.engines.fairscale.FullySharedDataParallelFairScaleEngine(address: Optional[str] = None, port: Optional[Union[str, int]] = None, sync_bn: bool = False, ddp_kwargs: Optional[Dict[str, Any]] = None, process_group_kwargs: Optional[Dict[str, Any]] = None)[source]

Bases: catalyst.engines.fairscale.SharedDataParallelFairScaleEngine

Distributed FairScale MultiGPU training device engine with fully sharded data parallelism.

Parameters

address – master node address for distributed setup, default is None.

port – master node port for distributed setup, default is None.

sync_bn – if True, BatchNorm layers are converted to their synchronized version for distributed training, default is False.

ddp_kwargs – parameters for the FairScale fully sharded data parallel wrapper.

process_group_kwargs – parameters for torch.distributed.init_process_group.

Examples:

from catalyst import dl

runner = dl.SupervisedRunner()
runner.train(
    engine=dl.FullySharedDataParallelFairScaleEngine(),
    ...
)

from catalyst import dl

class MyRunner(dl.IRunner):
    # ...
    def get_engine(self):
        return dl.FullySharedDataParallelFairScaleEngine(
            address="0.0.0.0",
            port=23234,
            ddp_kwargs={"find_unused_parameters": False},
            process_group_kwargs={"port": 12345},
        )
    # ...

args:
    logs: ...

model:
    _target_: ...
    ...

engine:
    _target_: FullySharedDataParallelFairScaleEngine
    address: 0.0.0.0
    port: 23234
    ddp_kwargs:
        find_unused_parameters: false
    process_group_kwargs:
        port: 12345

stages:
    ...

Torch

DeviceEngine

class catalyst.engines.torch.DeviceEngine(device: Optional[str] = None)[source]

Bases: catalyst.core.engine.IEngine

Single training device engine.

Parameters

device – device to use, default is "cpu".

Examples:

from catalyst import dl

runner = dl.SupervisedRunner()
runner.train(
    engine=dl.DeviceEngine("cuda:1"),
    ...
)

from catalyst import dl

class MyRunner(dl.IRunner):
    # ...
    def get_engine(self):
        return dl.DeviceEngine("cuda:1")
    # ...

args:
    logs: ...

model:
    _target_: ...
    ...

engine:
    _target_: DeviceEngine
    device: cuda:1

stages:
    ...

DataParallelEngine

class catalyst.engines.torch.DataParallelEngine[source]

Bases: catalyst.engines.torch.DeviceEngine

MultiGPU training device engine.

Examples:

from catalyst import dl

runner = dl.SupervisedRunner()
runner.train(
    engine=dl.DataParallelEngine(),
    ...
)

from catalyst import dl

class MyRunner(dl.IRunner):
    # ...
    def get_engine(self):
        return dl.DataParallelEngine()
    # ...

args:
    logs: ...

model:
    _target_: ...
    ...

engine:
    _target_: DataParallelEngine

stages:
    ...

DistributedDataParallelEngine

class catalyst.engines.torch.DistributedDataParallelEngine(address: Optional[str] = None, port: Optional[Union[str, int]] = None, sync_bn: bool = False, ddp_kwargs: Optional[Dict[str, Any]] = None, process_group_kwargs: Optional[Dict[str, Any]] = None)[source]

Bases: catalyst.engines.torch.DeviceEngine

Distributed MultiGPU training device engine.

Parameters

address – master node address for distributed setup, default is None.

port – master node port for distributed setup, default is None.

sync_bn – if True, BatchNorm layers are converted to their synchronized version for distributed training, default is False.

ddp_kwargs – parameters for torch.nn.parallel.DistributedDataParallel.

process_group_kwargs – parameters for torch.distributed.init_process_group.

Examples:

from catalyst import dl

runner = dl.SupervisedRunner()
runner.train(
    engine=dl.DistributedDataParallelEngine(),
    ...
)

from catalyst import dl

class MyRunner(dl.IRunner):
    # ...
    def get_engine(self):
        return dl.DistributedDataParallelEngine(
            address="0.0.0.0",
            port=23234,
            ddp_kwargs={"find_unused_parameters": False},
            process_group_kwargs={"backend": "nccl"},
        )
    # ...

args:
    logs: ...

model:
    _target_: ...
    ...

engine:
    _target_: DistributedDataParallelEngine
    address: 0.0.0.0
    port: 23234
    ddp_kwargs:
        find_unused_parameters: false
    process_group_kwargs:
        backend: nccl

stages:
    ...
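
For models containing BatchNorm layers, the sync_bn flag listed above can be enabled so that batch statistics are synchronized across processes; a one-line sketch:

from catalyst import dl

# synchronize BatchNorm statistics across the distributed processes
engine = dl.DistributedDataParallelEngine(sync_bn=True)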

XLA

XLAEngine

class catalyst.engines.xla.XLAEngine[source]

Bases: catalyst.engines.torch.DeviceEngine

XLA SingleTPU training device engine.

Examples:

import os
from datetime import datetime

import torch
from torch import nn, optim
from torch.utils.data import DataLoader

from catalyst import dl
from catalyst.contrib.datasets import CIFAR10
from catalyst.contrib.nn import ResidualBlock
from catalyst.data import transforms

def conv_block(in_channels, out_channels, pool=False):
    layers = [
        nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
        nn.BatchNorm2d(out_channels),
        nn.ReLU(inplace=True),
    ]
    if pool:
        layers.append(nn.MaxPool2d(2))
    return nn.Sequential(*layers)


def resnet9(in_channels: int, num_classes: int, size: int = 16):
    sz, sz2, sz4, sz8 = size, size * 2, size * 4, size * 8
    return nn.Sequential(
        conv_block(in_channels, sz),
        conv_block(sz, sz2, pool=True),
        ResidualBlock(nn.Sequential(conv_block(sz2, sz2), conv_block(sz2, sz2))),
        conv_block(sz2, sz4, pool=True),
        conv_block(sz4, sz8, pool=True),
        ResidualBlock(nn.Sequential(conv_block(sz8, sz8), conv_block(sz8, sz8))),
        nn.Sequential(
            nn.MaxPool2d(4), nn.Flatten(), nn.Dropout(0.2), nn.Linear(sz8, num_classes)
        ),
    )

class CustomRunner(dl.IRunner):
    def __init__(self, logdir):
        super().__init__()
        self._logdir = logdir

    def get_engine(self):
        return dl.XLAEngine()

    def get_loggers(self):
        return {
            "console": dl.ConsoleLogger(),
            "csv": dl.CSVLogger(logdir=self._logdir),
            "tensorboard": dl.TensorboardLogger(logdir=self._logdir),
        }

    @property
    def stages(self):
        return ["train"]

    def get_stage_len(self, stage: str) -> int:
        return 3

    def get_loaders(self, stage: str):
        transform = transforms.Compose(
            [transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]
        )
        train_data = CIFAR10(os.getcwd(), train=False, download=True, transform=transform)
        valid_data = CIFAR10(os.getcwd(), train=False, download=True, transform=transform)

        if self.engine.is_ddp:
            train_sampler = torch.utils.data.distributed.DistributedSampler(
                train_data,
                num_replicas=self.engine.world_size,
                rank=self.engine.rank,
                shuffle=True
            )
            valid_sampler = torch.utils.data.distributed.DistributedSampler(
                valid_data,
                num_replicas=self.engine.world_size,
                rank=self.engine.rank,
                shuffle=False
            )
        else:
            train_sampler = valid_sampler = None

        return {
            "train": DataLoader(train_data, batch_size=32, sampler=train_sampler),
            "valid": DataLoader(valid_data, batch_size=32, sampler=valid_sampler),
        }

    def get_model(self, stage: str):
        model = (
            self.model
            if self.model is not None
            else resnet9(in_channels=3, num_classes=10)
        )
        return model

    def get_criterion(self, stage: str):
        return nn.CrossEntropyLoss()

    def get_optimizer(self, stage: str, model):
        return optim.Adam(model.parameters(), lr=1e-3)

    def get_scheduler(self, stage: str, optimizer):
        return optim.lr_scheduler.MultiStepLR(optimizer, [5, 8], gamma=0.3)

    def get_callbacks(self, stage: str):
        return {
            "criterion": dl.CriterionCallback(
                metric_key="loss", input_key="logits", target_key="targets"
            ),
            "optimizer": dl.OptimizerCallback(metric_key="loss"),
            "scheduler": dl.SchedulerCallback(loader_key="valid", metric_key="loss"),
            "accuracy": dl.AccuracyCallback(
                input_key="logits", target_key="targets", topk_args=(1, 3, 5)
            ),
            "checkpoint": dl.CheckpointCallback(
                self._logdir,
                loader_key="valid",
                metric_key="accuracy",
                minimize=False,
                save_n_best=1,
            ),
            "tqdm": dl.TqdmCallback(),
        }

    def handle_batch(self, batch):
        x, y = batch
        logits = self.model(x)

        self.batch = {
            "features": x,
            "targets": y,
            "logits": logits,
        }

logdir = f"logs/{datetime.now().strftime('%Y%m%d-%H%M%S')}"
runner = CustomRunner(logdir)
runner.run()

DistributedXLAEngine

class catalyst.engines.xla.DistributedXLAEngine[source]

Bases: catalyst.engines.torch.DeviceEngine

Distributed XLA MultiTPU training device engine.

Examples:

import os
from datetime import datetime

import torch
from torch import nn, optim
from torch.utils.data import DataLoader

from catalyst import dl
from catalyst.contrib.datasets import CIFAR10
from catalyst.contrib.nn import ResidualBlock
from catalyst.data import transforms

def conv_block(in_channels, out_channels, pool=False):
    layers = [
        nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
        nn.BatchNorm2d(out_channels),
        nn.ReLU(inplace=True),
    ]
    if pool:
        layers.append(nn.MaxPool2d(2))
    return nn.Sequential(*layers)


def resnet9(in_channels: int, num_classes: int, size: int = 16):
    sz, sz2, sz4, sz8 = size, size * 2, size * 4, size * 8
    return nn.Sequential(
        conv_block(in_channels, sz),
        conv_block(sz, sz2, pool=True),
        ResidualBlock(nn.Sequential(conv_block(sz2, sz2), conv_block(sz2, sz2))),
        conv_block(sz2, sz4, pool=True),
        conv_block(sz4, sz8, pool=True),
        ResidualBlock(nn.Sequential(conv_block(sz8, sz8), conv_block(sz8, sz8))),
        nn.Sequential(
            nn.MaxPool2d(4), nn.Flatten(), nn.Dropout(0.2), nn.Linear(sz8, num_classes)
        ),
    )

class CustomRunner(dl.IRunner):
    def __init__(self, logdir):
        super().__init__()
        self._logdir = logdir

    def get_engine(self):
        return dl.DistributedXLAEngine()

    def get_loggers(self):
        return {
            "console": dl.ConsoleLogger(),
            "csv": dl.CSVLogger(logdir=self._logdir),
            "tensorboard": dl.TensorboardLogger(logdir=self._logdir),
        }

    @property
    def stages(self):
        return ["train"]

    def get_stage_len(self, stage: str) -> int:
        return 3

    def get_loaders(self, stage: str):
        transform = transforms.Compose(
            [transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]
        )
        train_data = CIFAR10(os.getcwd(), train=False, download=True, transform=transform)
        valid_data = CIFAR10(os.getcwd(), train=False, download=True, transform=transform)

        if self.engine.is_ddp:
            train_sampler = torch.utils.data.distributed.DistributedSampler(
                train_data,
                num_replicas=self.engine.world_size,
                rank=self.engine.rank,
                shuffle=True
            )
            valid_sampler = torch.utils.data.distributed.DistributedSampler(
                valid_data,
                num_replicas=self.engine.world_size,
                rank=self.engine.rank,
                shuffle=False
            )
        else:
            train_sampler = valid_sampler = None

        return {
            "train": DataLoader(train_data, batch_size=32, sampler=train_sampler),
            "valid": DataLoader(valid_data, batch_size=32, sampler=valid_sampler),
        }

    def get_model(self, stage: str):
        model = (
            self.model
            if self.model is not None
            else resnet9(in_channels=3, num_classes=10)
        )
        return model

    def get_criterion(self, stage: str):
        return nn.CrossEntropyLoss()

    def get_optimizer(self, stage: str, model):
        return optim.Adam(model.parameters(), lr=1e-3)

    def get_scheduler(self, stage: str, optimizer):
        return optim.lr_scheduler.MultiStepLR(optimizer, [5, 8], gamma=0.3)

    def get_callbacks(self, stage: str):
        return {
            "criterion": dl.CriterionCallback(
                metric_key="loss", input_key="logits", target_key="targets"
            ),
            "optimizer": dl.OptimizerCallback(metric_key="loss"),
            "scheduler": dl.SchedulerCallback(loader_key="valid", metric_key="loss"),
            "accuracy": dl.AccuracyCallback(
                input_key="logits", target_key="targets", topk_args=(1, 3, 5)
            ),
            "checkpoint": dl.CheckpointCallback(
                self._logdir,
                loader_key="valid",
                metric_key="accuracy",
                minimize=False,
                save_n_best=1,
            ),
            "tqdm": dl.TqdmCallback(),
        }

    def handle_batch(self, batch):
        x, y = batch
        logits = self.model(x)

        self.batch = {
            "features": x,
            "targets": y,
            "logits": logits,
        }

logdir = f"logs/{datetime.now().strftime('%Y%m%d-%H%M%S')}"
runner = CustomRunner(logdir)
runner.run()