Shortcuts

Core

Runner

class catalyst.core.runner.IRunner(model: Optional[Union[torch.nn.modules.module.Module, Dict[str, torch.nn.modules.module.Module]]] = None, engine: Optional[catalyst.core.engine.IEngine] = None)[source]

Bases: catalyst.core.callback.ICallback, catalyst.core.logger.ILogger, abc.ABC

An abstraction that contains all the logic of how to run the experiment, stages, epochs, loaders and batches.

IRunner supports the logic for deep learning pipeline configuration with pure python code. Please check the examples for intuition.

Parameters
  • model – Torch model object

  • engine – IEngine instance

Abstraction, please check out implementations for more details:

Note

To learn more about Catalyst Core concepts, please check out

Note

Please follow the minimal examples sections for use cases.

Examples:

import os
from torch import nn, optim
from torch.utils.data import DataLoader
from catalyst import dl, utils
from catalyst.contrib.datasets import MNIST
from catalyst.data import ToTensor


class CustomRunner(dl.IRunner):
    def __init__(self, logdir, device):
        super().__init__()
        self._logdir = logdir
        self._device = device

    def get_engine(self):
        return dl.DeviceEngine(self._device)

    def get_loggers(self):
        return {
            "console": dl.ConsoleLogger(),
            "csv": dl.CSVLogger(logdir=self._logdir),
            "tensorboard": dl.TensorboardLogger(logdir=self._logdir),
        }

    @property
    def stages(self):
        return ["train_freezed", "train_unfreezed"]

    def get_stage_len(self, stage: str) -> int:
        return 3

    def get_loaders(self, stage: str):
        loaders = {
            "train": DataLoader(
                MNIST(os.getcwd(), train=True, download=True, transform=ToTensor()),
                batch_size=32
            ),
            "valid": DataLoader(
                MNIST(os.getcwd(), train=False, download=True, transform=ToTensor()),
                batch_size=32
            ),
        }
        return loaders

    def get_model(self, stage: str):
        model = (
            self.model
            if self.model is not None
            else nn.Sequential(
                nn.Flatten(), nn.Linear(784, 128), nn.ReLU(), nn.Linear(128, 10)
            )
        )
        if stage == "train_freezed":
            # freeze layer
            utils.set_requires_grad(model[1], False)
        else:
            utils.set_requires_grad(model, True)
        return model

    def get_criterion(self, stage: str):
        return nn.CrossEntropyLoss()

    def get_optimizer(self, stage: str, model):
        if stage == "train_freezed":
            return optim.Adam(model.parameters(), lr=1e-3)
        else:
            return optim.SGD(model.parameters(), lr=1e-1)

    def get_scheduler(self, stage: str, optimizer):
        return None

    def get_callbacks(self, stage: str):
        return {
            "criterion": dl.CriterionCallback(
                metric_key="loss", input_key="logits", target_key="targets"
            ),
            "optimizer": dl.OptimizerCallback(metric_key="loss"),
            "accuracy": dl.AccuracyCallback(
                input_key="logits", target_key="targets", topk_args=(1, 3, 5)
            ),
            "classification": dl.PrecisionRecallF1SupportCallback(
                input_key="logits", target_key="targets", num_classes=10
            ),
            "checkpoint": dl.CheckpointCallback(
                self._logdir,
                loader_key="valid",
                metric_key="loss",
                minimize=True,
                save_n_best=3,
            ),
        }

    def handle_batch(self, batch):
        x, y = batch
        logits = self.model(x)

        self.batch = {
            "features": x,
            "targets": y,
            "logits": logits,
        }

runner = CustomRunner("./logs", "cpu")
runner.run()
get_callbacks(stage: str) OrderedDict[str, ICallback][source]

Returns callbacks for a given stage.

Parameters

stage – stage name of interest like “pretrain” / “train” / “finetune” / etc

Returns

Ordered dictionary with callbacks for the current stage.

Return type

OrderedDict[str, Callback]
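
Example (a minimal sketch of a possible override; the concrete callback set is illustrative, not required):

def get_callbacks(self, stage: str):
    return {
        "criterion": dl.CriterionCallback(
            metric_key="loss", input_key="logits", target_key="targets"
        ),
        "optimizer": dl.OptimizerCallback(metric_key="loss"),
    }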

get_criterion(stage: str) Optional[torch.nn.modules.module.Module][source]

Returns the criterion for a given stage and epoch.

Example:

# for typical classification task
>>> runner.get_criterion(stage="train")
nn.CrossEntropyLoss()
Parameters

stage – stage name of interest like “pretrain” / “train” / “finetune” / etc

Returns

Criterion: criterion for a given stage.

get_datasets(stage: str) OrderedDict[str, Dataset][source]

Returns the datasets for a given stage and epoch.

Note

For Deep Learning cases you have the same dataset during the whole stage.

For Reinforcement Learning it’s common to change the dataset (experiment) every training epoch.

Parameters

stage – stage name of interest, like “pretrain” / “train” / “finetune” / etc

Returns

OrderedDict[str, Dataset]: Ordered dictionary with datasets for the current stage and epoch.

Note

We need an ordered dictionary to guarantee the correct dataflow and the order of our training datasets, for example, to run the train loader before the validation one.

Example:

>>> runner.get_datasets(stage="training")
OrderedDict({
    "train": CsvDataset(in_csv=in_csv_train, ...),
    "valid": CsvDataset(in_csv=in_csv_valid, ...),
})
abstract get_engine() catalyst.core.engine.IEngine[source]

Returns the engine for the run.

abstract get_loaders(stage: str) OrderedDict[str, DataLoader][source]

Returns the loaders for a given stage.

Note

Wrapper for catalyst.core.experiment.IExperiment.get_datasets. For most of your experiments you only need to override the get_datasets method.

Parameters

stage – stage name of interest, like “pretrain” / “train” / “finetune” / etc

Returns

OrderedDict[str, DataLoader]: Ordered dictionary with loaders for the current stage and epoch.
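
Example (a minimal sketch of an override that builds loaders on top of get_datasets; the batch size is an arbitrary example value):

from collections import OrderedDict
from torch.utils.data import DataLoader

def get_loaders(self, stage: str):
    datasets = self.get_datasets(stage=stage)
    return OrderedDict(
        (key, DataLoader(dataset, batch_size=32, shuffle=(key == "train")))
        for key, dataset in datasets.items()
    )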

get_loggers() Dict[str, catalyst.core.logger.ILogger][source]

Returns the loggers for the run.

abstract get_model(stage: str) torch.nn.modules.module.Module[source]

Returns the model for a given stage and epoch.

Example:

# suppose we have typical MNIST model, like
# nn.Sequential(nn.Linear(28*28, 128), nn.Linear(128, 10))
>>> runner.get_model(stage="train")
Sequential(
  (0): Linear(in_features=784, out_features=128, bias=True)
  (1): Linear(in_features=128, out_features=10, bias=True)
)
Parameters

stage – stage name of interest like “pretrain” / “train” / “finetune” / etc

Returns

Model: model for a given stage.

get_optimizer(stage: str, model: torch.nn.modules.module.Module) Optional[torch.optim.optimizer.Optimizer][source]

Returns the optimizer for a given stage and model.

Example:

>>> runner.get_optimizer(model=model, stage="train")
torch.optim.Adam(model.parameters())
Parameters
  • stage – stage name of interest like “pretrain” / “train” / “finetune” / etc

  • model – model to optimize with stage optimizer

Returns

Optimizer: optimizer for a given stage and model.

get_scheduler(stage: str, optimizer: torch.optim.optimizer.Optimizer) Optional[torch.optim.lr_scheduler._LRScheduler][source]

Returns the scheduler for a given stage and optimizer.

Example:

>>> runner.get_scheduler(stage="training", optimizer=optimizer)
torch.optim.lr_scheduler.StepLR(optimizer)
Parameters
  • stage – stage name of interest like “pretrain” / “train” / “finetune” / etc

  • optimizer – optimizer to schedule with stage scheduler

Returns

Scheduler: scheduler for a given stage and optimizer.

get_stage_len(stage: str) int[source]

Returns number of epochs for the selected stage.

Parameters

stage – current stage

Returns

number of epochs in stage

Example:

>>> runner.get_stage_len("pretraining")
3
get_trial() Optional[catalyst.core.trial.ITrial][source]

Returns the trial for the run.

abstract handle_batch(batch: Mapping[str, Any]) None[source]

Inner method to handle the specified data batch. Used to run the train/valid/infer logic during the experiment run.

Parameters

batch (Mapping[str, Any]) – dictionary with data batches from DataLoader.

property hparams: collections.OrderedDict

Returns hyper-parameters for current run.

Example:

>>> runner.hparams
OrderedDict([('optimizer', 'Adam'),
 ('lr', 0.02),
 ('betas', (0.9, 0.999)),
 ('eps', 1e-08),
 ('weight_decay', 0),
 ('amsgrad', False),
 ('train_batch_size', 32)])
Returns

dictionary with hyperparameters

log_hparams(*args, **kwargs) None[source]

Logs hyperparameters to available loggers.

log_image(*args, **kwargs) None[source]

Logs image to available loggers.

log_metrics(*args, **kwargs) None[source]

Logs batch, loader and epoch metrics to available loggers.

run() catalyst.core.runner.IRunner[source]

Runs the experiment.

Returns

self, IRunner instance after the experiment

property seed: int

Experiment’s seed for reproducibility.

abstract property stages: Iterable[str]

Run’s stage names.

Example:

>>> runner.stages
["pretraining", "finetuning"]
class catalyst.core.runner.RunnerError[source]

Bases: Exception

Exception class for all runner errors.

Engine

class catalyst.core.engine.IEngine[source]

Bases: abc.ABC

An abstraction that syncs experiment run with different hardware-specific configurations.

  • CPU

  • GPU

  • DataParallel (deepspeed, fairscale, nvidia, torch)

  • AMP (deepspeed, fairscale, nvidia, torch)

  • DDP (deepspeed, fairscale, nvidia, torch)

  • XLA

Abstraction, please check out implementations for more details:

autocast(*args, **kwargs)[source]

AMP scaling context. The default autocast context does not scale anything.

Parameters
  • *args – some args

  • **kwargs – some kwargs

Returns

context
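
Example (a hedged usage sketch inside IRunner.handle_batch; with the default engine this context is a no-op, while AMP engines cast/scale inside it):

def handle_batch(self, batch):
    x, y = batch
    with self.engine.autocast():
        logits = self.model(x)  # forward pass, possibly under mixed precision
    self.batch = {"features": x, "targets": y, "logits": logits}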

autocast_loader(loader: torch.utils.data.dataloader.DataLoader)[source]

Loader wrapper for the distributed mode.

abstract property backend: Optional[str]

String identifier for distributed backend.

abstract backward_loss(loss: torch.Tensor, model: torch.nn.modules.module.Module, optimizer: torch.optim.optimizer.Optimizer) None[source]

Abstraction over the loss.backward() step. Should be overloaded when loss scaling is required, for example with APEX or AMP.

Parameters
  • loss – tensor with loss value.

  • model – model module.

  • optimizer – model optimizer.

barrier() None[source]

Synchronizes all processes.

This collective blocks processes until all of them enter the function.

cleanup_process()[source]

Clean DDP variables and processes.

ddp_sync_run(function: Callable)[source]

Function wrapper for synchronous run in the distributed mode.

abstract deinit_components(runner=None)[source]

Deinitializes the run's components. In distributed mode it should destroy the process group.

abstract property device: Union[str, torch.device]

Pytorch device.

abstract init_components(model_fn: Optional[Callable] = None, criterion_fn: Optional[Callable] = None, optimizer_fn: Optional[Callable] = None, scheduler_fn: Optional[Callable] = None)[source]

Initializes the run's components.

property is_ddp: bool

Boolean flag for distributed run.

property is_master_process: bool

Checks if the current process is the master process. Should be implemented only for distributed training (DDP); for non-distributed training it should always return True.

Returns

True if the current process is the master process, False otherwise.

property is_worker_process: bool

Checks if the current process is a worker process. Should be implemented only for distributed training (DDP); for non-distributed training it should always return False.

Returns

True if the current process is a worker process, False otherwise.
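
Example (a small sketch of the usual guard built on these flags, e.g. to run one-off side effects only once under DDP):

if engine.is_master_process:
    # executed once per run in DDP, and always in the non-distributed case
    print("running on the master process")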

abstract load_checkpoint(path: str) Dict[source]

Load checkpoint from path.

Parameters

path – checkpoint file to load

abstract optimizer_step(loss: torch.Tensor, model: torch.nn.modules.module.Module, optimizer: torch.optim.optimizer.Optimizer) None[source]

Abstraction over the optimizer.step() step. Should be overloaded when gradient scaling is required, for example with AMP.

Parameters
  • loss – tensor with loss value.

  • model – model module.

  • optimizer – model optimizer.

abstract pack_checkpoint(model: Optional[torch.nn.modules.module.Module] = None, criterion: Optional[torch.nn.modules.module.Module] = None, optimizer: Optional[torch.optim.optimizer.Optimizer] = None, scheduler: Optional[torch.optim.lr_scheduler._LRScheduler] = None, **kwargs) Dict[source]

Packs the model, criterion, optimizer, scheduler and some extra info from **kwargs into a torch-based checkpoint.

Parameters
  • model – torch model

  • criterion – torch criterion

  • optimizer – torch optimizer

  • scheduler – torch scheduler

  • **kwargs – some extra info to pack

abstract property rank: int

Process rank for distributed training.

abstract save_checkpoint(checkpoint: Dict, path: str) None[source]

Saves checkpoint to a file.

Parameters
  • checkpoint – data to save.

  • path – filepath where checkpoint should be stored.

setup_process(rank: int = -1, world_size: int = 1)[source]

Initialize DDP variables and processes.

Parameters
  • rank – process rank. Default is -1.

  • world_size – number of devices in the network to expect for training. Default is 1.

spawn(fn: Callable, *args: Any, **kwargs: Any) None[source]

Spawn abstraction that creates nprocs processes with the specified fn and args/kwargs.

Parameters
  • fn (function) – Function is called as the entrypoint of the spawned process. This function must be defined at the top level of a module so it can be pickled and spawned. This is a requirement imposed by multiprocessing. The function is called as fn(i, *args), where i is the process index and args is the passed through tuple of arguments.

  • *args – Arguments passed to spawn method.

  • **kwargs – Keyword-arguments passed to spawn method.

Returns

wrapped function (if needed).

abstract sync_device(tensor_or_module: Union[Dict, List, Tuple, numpy.ndarray, torch.Tensor, torch.nn.modules.module.Module]) Union[Dict, List, Tuple, torch.Tensor, torch.nn.modules.module.Module][source]

Moves tensor_or_module to Engine’s device.

Parameters

tensor_or_module – tensor or module to move to the Engine's device

abstract sync_metrics(metrics: Dict) Dict[source]

Syncs metrics over world_size in the distributed mode.

abstract sync_tensor(tensor: torch.Tensor, mode: str) torch.Tensor[source]

Syncs tensor over world_size in the distributed mode.

abstract unpack_checkpoint(checkpoint: Dict, model: Optional[torch.nn.modules.module.Module] = None, criterion: Optional[torch.nn.modules.module.Module] = None, optimizer: Optional[torch.optim.optimizer.Optimizer] = None, scheduler: Optional[torch.optim.lr_scheduler._LRScheduler] = None, **kwargs) None[source]

Unpacks the checkpoint content into the given model (if not None), criterion (if not None), optimizer (if not None) and scheduler (if not None).

Parameters
  • checkpoint – checkpoint to load

  • model – model where should be updated state

  • criterion – criterion where should be updated state

  • optimizer – optimizer where should be updated state

  • scheduler – scheduler where should be updated state

  • kwargs – extra arguments
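
Example (a sketch of the pack/save/load/unpack round trip these methods are designed for; the file name is an arbitrary example):

# save
checkpoint = engine.pack_checkpoint(model=model, criterion=criterion, optimizer=optimizer)
engine.save_checkpoint(checkpoint, "checkpoint.pth")

# restore
checkpoint = engine.load_checkpoint("checkpoint.pth")
engine.unpack_checkpoint(checkpoint, model=model, criterion=criterion, optimizer=optimizer)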

abstract property world_size: int

Process world size for distributed training.

abstract zero_grad(loss: torch.Tensor, model: torch.nn.modules.module.Module, optimizer: torch.optim.optimizer.Optimizer) None[source]

Abstraction over the model.zero_grad() step. Should be overloaded when you need to pass arguments to model.zero_grad() (such as set_to_none=True) or when you use a custom scheme that replaces/improves the .zero_grad() method.

Parameters
  • loss – tensor with loss value.

  • model – model module.

  • optimizer – model optimizer.
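
Taken together, zero_grad, backward_loss and optimizer_step describe a single optimization step. A hedged sketch of how they are typically composed (the helper function is hypothetical, not part of the public API):

def optimization_step(engine, loss, model, optimizer):
    engine.zero_grad(loss, model, optimizer)       # reset accumulated gradients
    engine.backward_loss(loss, model, optimizer)   # loss.backward(), with scaling if required
    engine.optimizer_step(loss, model, optimizer)  # optimizer.step(), with scaling if required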

Callback

ICallback

class catalyst.core.callback.ICallback[source]

Bases: object

A callable abstraction for deep learning runs.

on_batch_end(runner: IRunner) None[source]

Event handler for batch end.

Parameters

runner – IRunner instance.

on_batch_start(runner: IRunner) None[source]

Event handler for batch start.

Parameters

runner – IRunner instance.

on_epoch_end(runner: IRunner) None[source]

Event handler for epoch end.

Parameters

runner – IRunner instance.

on_epoch_start(runner: IRunner) None[source]

Event handler for epoch start.

Parameters

runner – IRunner instance.

on_exception(runner: IRunner) None[source]

Event handler for exception case.

Parameters

runner – IRunner instance.

on_experiment_end(runner: IRunner) None[source]

Event handler for experiment end.

Parameters

runner – IRunner instance.

Note

This event works only on IRunner.

on_experiment_start(runner: IRunner) None[source]

Event handler for experiment start.

Parameters

runner – IRunner instance.

Note

This event works only on IRunner.

on_loader_end(runner: IRunner) None[source]

Event handler for loader end.

Parameters

runner – IRunner instance.

on_loader_start(runner: IRunner) None[source]

Event handler for loader start.

Parameters

runner – IRunner instance.

on_stage_end(runner: IRunner) None[source]

Event handler for stage end.

Parameters

runner – IRunner instance.

on_stage_start(runner: IRunner) None[source]

Event handler for stage start.

Parameters

runner – IRunner instance.

CallbackNode

class catalyst.core.callback.CallbackNode(value)[source]

Bases: enum.IntFlag

Callback node usage flag during distributed training.

  • All (0) - use on all nodes, both master and worker.

  • Master (1) - use only on the master node.

  • Worker (2) - use only on worker nodes.

All = 0
Master = 1
Worker = 2
all = 0
master = 1
worker = 2
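
Example (a small sketch of passing the flag to a Callback, e.g. to restrict a custom callback to the master node; the class name is hypothetical):

from catalyst.core.callback import Callback, CallbackNode, CallbackOrder

class MasterOnlyCallback(Callback):
    def __init__(self):
        super().__init__(order=CallbackOrder.External, node=CallbackNode.Master)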

CallbackOrder

class catalyst.core.callback.CallbackOrder(value)[source]

Bases: enum.IntFlag

Callback usage order during training.

Catalyst executes Callbacks with low CallbackOrder before Callbacks with high CallbackOrder.

Predefined orders:

  • Internal (0) - some Catalyst Extras, like PhaseCallbacks (used in GANs).

  • Metric (20) - Callbacks with metric and loss computation.

  • MetricAggregation (40) - metric aggregation callbacks, e.g. summing different losses into one.

  • Optimizer (60) - optimizer step, requires computed metrics for optimization.

  • Scheduler (80) - scheduler step; in the ReduceLROnPlateau case it requires computed validation metrics for the optimizer schedule.

  • External (100) - additional callbacks with custom logic, like InferenceCallbacks.

Nevertheless, you can always create a custom Callback with any order, for example:

>>> class MyCustomCallback(Callback):
>>>     def __init__(self):
>>>         super().__init__(order=33)
>>>     ...
# MyCustomCallback will be executed after all `Metric`-Callbacks
# but before all `MetricAggregation`-Callbacks.
External = 100
ExternalExtra = 120
Internal = 0
Metric = 20
MetricAggregation = 40
Optimizer = 60
Scheduler = 80
external = 100
external_extra = 120
internal = 0
metric = 20
metric_aggregation = 40
optimizer = 60
scheduler = 80

CallbackScope

class catalyst.core.callback.CallbackScope(value)[source]

Bases: enum.IntFlag

Callback scope usage flag during training.

  • Stage (0) - use Callback only during one experiment stage.

  • Experiment (1) - use Callback during whole experiment run.

Experiment = 1
Stage = 0
experiment = 1
stage = 0

Callback

class catalyst.core.callback.Callback(order: int, node: int = CallbackNode.All, scope: int = CallbackScope.Stage)[source]

Bases: catalyst.core.callback.ICallback

An abstraction that lets you customize your experiment run logic.

Parameters
  • order – flag from CallbackOrder

  • node – flag from CallbackNode

  • scope – flag from CallbackScope

To give users maximum flexibility and extensibility, Catalyst supports callback execution anywhere in the training loop:

-- stage start
---- epoch start
------ loader start
-------- batch start
---------- batch handler (Runner logic)
-------- batch end
------ loader end
---- epoch end
-- stage end

exception – if an Exception was raised at any point of the run (handled by on_exception)

Abstraction, please check out implementations for more details:

Note

To learn more about Catalyst Core concepts, please check out

__init__(order: int, node: int = CallbackNode.All, scope: int = CallbackScope.Stage)[source]

Callback initializer.
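
Example (a minimal sketch of a custom Callback hooking into the loader events above; the class and its timing logic are purely illustrative):

import time

from catalyst.core.callback import Callback, CallbackOrder


class LoaderTimerCallback(Callback):
    def __init__(self):
        super().__init__(order=CallbackOrder.External)

    def on_loader_start(self, runner):
        self._start = time.time()

    def on_loader_end(self, runner):
        print(f"loader took {time.time() - self._start:.2f} seconds")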

CallbackList

class catalyst.core.callback.CallbackList(callbacks: List[catalyst.core.callback.Callback], order: int, node: int = CallbackNode.All, scope: int = CallbackScope.Stage)[source]

Bases: catalyst.core.callback.Callback

Callback wrapper for a list of Callbacks

Parameters
  • callbacks – list of callbacks

  • order – flag from CallbackOrder

  • node – flag from CallbackNode

  • scope – flag from CallbackScope

__init__(callbacks: List[catalyst.core.callback.Callback], order: int, node: int = CallbackNode.All, scope: int = CallbackScope.Stage)[source]

Init.

on_batch_end(runner: IRunner) None[source]

Event handler for batch end.

Parameters

runner – IRunner instance.

on_batch_start(runner: IRunner) None[source]

Event handler for batch start.

Parameters

runner – IRunner instance.

on_epoch_end(runner: IRunner) None[source]

Event handler for epoch end.

Parameters

runner – IRunner instance.

on_epoch_start(runner: IRunner) None[source]

Event handler for epoch start.

Parameters

runner – IRunner instance.

on_exception(runner: IRunner) None[source]

Event handler for exception case.

Parameters

runner – IRunner instance.

on_experiment_end(runner: IRunner) None[source]

Event handler for experiment end.

Parameters

runner – IRunner instance.

Note

This event works only on IRunner.

on_experiment_start(runner: IRunner) None[source]

Event handler for experiment start.

Parameters

runner – IRunner instance.

Note

This event works only on IRunner.

on_loader_end(runner: IRunner) None[source]

Event handler for loader end.

Parameters

runner – IRunner instance.

on_loader_start(runner: IRunner) None[source]

Event handler for loader start.

Parameters

runner – IRunner instance.

on_stage_end(runner: IRunner) None[source]

Event handler for stage end.

Parameters

runner – IRunner instance.

on_stage_start(runner: IRunner) None[source]

Event handler for stage start.

Parameters

runner – IRunner instance.

CallbackWrapper

class catalyst.core.callback.CallbackWrapper(base_callback: catalyst.core.callback.Callback, enable_callback: bool = True)[source]

Bases: catalyst.core.callback.Callback

Enable/disable callback execution.

Parameters
  • base_callback – callback to wrap

  • enable_callback – flag that enables/disables the wrapped callback; if True, the callback is enabled. Default is True.

__init__(base_callback: catalyst.core.callback.Callback, enable_callback: bool = True)[source]

Init.

on_exception(runner: IRunner) None[source]

Runs base_callback (if possible).

Parameters

runner – current runner
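
Example (a hedged usage sketch: wrapping an existing callback so it can be switched off without removing it from get_callbacks; AccuracyCallback is just an example of a wrapped callback):

from catalyst import dl
from catalyst.core.callback import CallbackWrapper

accuracy = dl.AccuracyCallback(input_key="logits", target_key="targets")
wrapped = CallbackWrapper(base_callback=accuracy, enable_callback=False)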

Logger

class catalyst.core.logger.ILogger[source]

Bases: object

An abstraction that syncs experiment run with monitoring tools.

Abstraction, please check out implementations for more details:

close_log(scope: Optional[str] = None) None[source]

Closes the logger.

flush_log() None[source]

Flushes the logger.

log_artifact(tag: str, artifact: object = None, path_to_artifact: Optional[str] = None, scope: Optional[str] = None, run_key: Optional[str] = None, global_epoch_step: int = 0, global_batch_step: int = 0, global_sample_step: int = 0, stage_key: Optional[str] = None, stage_epoch_len: int = 0, stage_epoch_step: int = 0, stage_batch_step: int = 0, stage_sample_step: int = 0, loader_key: Optional[str] = None, loader_batch_len: int = 0, loader_sample_len: int = 0, loader_batch_step: int = 0, loader_sample_step: int = 0) None[source]

Logs artifact (arbitrary file like audio, video, model weights) to the logger.

log_hparams(hparams: Dict, scope: Optional[str] = None, run_key: Optional[str] = None, stage_key: Optional[str] = None) None[source]

Logs hyperparameters to the logger.

log_image(tag: str, image: numpy.ndarray, scope: Optional[str] = None, run_key: Optional[str] = None, global_epoch_step: int = 0, global_batch_step: int = 0, global_sample_step: int = 0, stage_key: Optional[str] = None, stage_epoch_len: int = 0, stage_epoch_step: int = 0, stage_batch_step: int = 0, stage_sample_step: int = 0, loader_key: Optional[str] = None, loader_batch_len: int = 0, loader_sample_len: int = 0, loader_batch_step: int = 0, loader_sample_step: int = 0) None[source]

Logs image to the logger.

log_metrics(metrics: Dict[str, float], scope: Optional[str] = None, run_key: Optional[str] = None, global_epoch_step: int = 0, global_batch_step: int = 0, global_sample_step: int = 0, stage_key: Optional[str] = None, stage_epoch_len: int = 0, stage_epoch_step: int = 0, stage_batch_step: int = 0, stage_sample_step: int = 0, loader_key: Optional[str] = None, loader_batch_len: int = 0, loader_sample_len: int = 0, loader_batch_step: int = 0, loader_sample_step: int = 0) None[source]

Logs metrics to the logger.
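
Example (a minimal sketch of a custom logger built on this interface; it overrides log_metrics only and ignores the bookkeeping keyword arguments; the class name is hypothetical):

from catalyst.core.logger import ILogger

class PrintLogger(ILogger):
    def log_metrics(self, metrics, scope=None, **kwargs):
        # print every metrics dictionary together with its scope
        print(scope, metrics)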

Trial

class catalyst.core.trial.ITrial[source]

Bases: abc.ABC

An abstraction that syncs experiment run with different hyperparameter search systems.

Scripts

You can use Catalyst scripts with catalyst-dl in your terminal. For example:

$ catalyst-dl run --help