Callbacks

Run-based

BackwardCallback

class catalyst.callbacks.backward.BackwardCallback(metric_key: str, grad_clip_fn: Optional[Union[str, Callable]] = None, grad_clip_params: Optional[Dict] = None, log_gradient: bool = False)[source]

Bases: catalyst.core.callback.IBackwardCallback

Optimizer callback, abstraction over backward step.

Parameters
  • metric_key – a key to get loss from runner.batch_metrics

  • grad_clip_fn – callable gradient clipping function or its name

  • grad_clip_params – key-value parameters for grad_clip_fn

  • log_gradient – boolean flag to log gradient norm to runner.batch_metrics

Note

Please follow the minimal examples sections for more use cases.

__init__(metric_key: str, grad_clip_fn: Optional[Union[str, Callable]] = None, grad_clip_params: Optional[Dict] = None, log_gradient: bool = False)[source]

Init.

BatchOverfitCallback

class catalyst.callbacks.batch_overfit.BatchOverfitCallback(**kwargs)[source]

Bases: catalyst.core.callback.Callback

Callback to overfit loaders on a specified number of batches. By default, 1 batch is used per loader.

Parameters

kwargs – loader names and their number of batches to overfit.
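How each keyword value might be interpreted can be sketched as follows. This is a minimal illustration, not Catalyst's implementation: the helper name `batches_to_overfit` is hypothetical, and rounding fractions down is an assumption.

```python
from typing import Union


def batches_to_overfit(loader_len: int, value: Union[int, float] = 1) -> int:
    """Hypothetical helper: resolve a BatchOverfitCallback-style value to a batch count."""
    if isinstance(value, float):
        # a float is read as a fraction of the loader (assumed to round down)
        return int(loader_len * value)
    # an int is read as an absolute number of batches
    return value


# a loader with 100 batches
print(batches_to_overfit(100))       # default value: a single batch
print(batches_to_overfit(100, 2))    # absolute number of batches
print(batches_to_overfit(100, 0.2))  # fraction of the loader
```

Under these assumptions an int selects a fixed number of leading batches, while a float scales with the loader length.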

For example, if you have train, train_additional, valid and valid_additional loaders and want to overfit train on the first batch, train_additional on the first 2 batches, valid on the first 20% of batches and valid_additional on the first 50% of batches:

from catalyst.dl import SupervisedRunner, BatchOverfitCallback
runner = SupervisedRunner()
runner.train(
    # ...
    loaders={
        "train": ...,
        "train_additional": ...,
        "valid": ...,
        "valid_additional": ...,
    },
    # ...
    callbacks=[
        # ...
        BatchOverfitCallback(
            train_additional=2,
            valid=0.2,
            valid_additional=0.5
        ),
        # ...
    ],
    # ...
)

Minimal working example

import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl

# data
num_samples, num_features = int(1e4), int(1e1)
X, y = torch.rand(num_samples, num_features), torch.rand(num_samples)
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

# model, criterion, optimizer, scheduler
model = torch.nn.Linear(num_features, 1)
criterion = torch.nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [3, 6])

# model training
runner = dl.SupervisedRunner()
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    logdir="./logdir",
    num_epochs=8,
    verbose=True,
    callbacks=[dl.BatchOverfitCallback(train=10, valid=0.5)]
)

__init__(**kwargs)[source]

Init.

BatchTransformCallback

class catalyst.callbacks.batch_transform.BatchTransformCallback(transform: Union[Callable, str], scope: str, input_key: Optional[Union[List[str], str]] = None, output_key: Optional[Union[List[str], str]] = None, transform_kwargs: Optional[Dict[str, Any]] = None)[source]

Bases: catalyst.core.callback.Callback

Preprocess your batch with specified function.

Parameters
  • transform – Function to apply. If a string is given, the function is looked up in the registry.

  • scope – "on_batch_end" (post-processing model output) or "on_batch_start" (pre-processing model input).

  • input_key – Keys in the batch dict to apply the function to. Defaults to None.

  • output_key – Keys for the output. If None, the function is applied in place to keys_to_apply. Defaults to None.

  • transform_kwargs – Kwargs for transform.

Raises

TypeError – When keys is not str or a list. When scope is not in ["on_batch_end", "on_batch_start"].
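The relationship between input_key and output_key can be sketched with a plain dictionary. This is only an illustration of the behaviour described above, not the callback's code: `apply_transform` is a hypothetical helper, and a single string key is assumed.

```python
from typing import Any, Callable, Dict, Optional


def apply_transform(
    batch: Dict[str, Any],
    transform: Callable[[Any], Any],
    input_key: str,
    output_key: Optional[str] = None,
) -> Dict[str, Any]:
    """Hypothetical helper mirroring the input_key/output_key behaviour."""
    # with no output_key the result overwrites the input key (in-place behaviour)
    target_key = input_key if output_key is None else output_key
    batch[target_key] = transform(batch[input_key])
    return batch


batch = {"logits": [-2.0, 0.0, 2.0]}
# write the result under a new key and keep the input untouched
apply_transform(batch, lambda xs: [x > 0 for x in xs], "logits", "predictions")
print(batch)
```

Omitting output_key in this sketch would overwrite `batch["logits"]` instead of adding a new entry.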

Examples

import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl

# sample data
num_users, num_features, num_items = int(1e4), int(1e1), 10
X = torch.rand(num_users, num_features)
y = (torch.rand(num_users, num_items) > 0.5).to(torch.float32)

# pytorch loaders
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

# model, criterion, optimizer, scheduler
model = torch.nn.Linear(num_features, num_items)
criterion = torch.nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [2])

# model training
runner = dl.SupervisedRunner()
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    num_epochs=3,
    verbose=True,
    callbacks=[
        dl.BatchTransformCallback(
            input_key="logits",
            output_key="scores",
            transform="F.sigmoid",
            scope="on_batch_end",  # required argument: post-process model output
        ),
        dl.CriterionCallback(
            input_key="logits", target_key="targets", metric_key="loss"
        ),
        dl.OptimizerCallback(metric_key="loss"),
        dl.SchedulerCallback(),
        dl.CheckpointCallback(
            logdir="./logs",
            loader_key="valid",
            metric_key="map01",
            minimize=False
        ),
    ]
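)

# imports used by the example below;
# kornia as the source of `augmentation` is an assumption
import os

import torch
from torch.nn import functional as F
from torch.utils.data import DataLoader

from kornia import augmentation

from catalyst import dl, metrics
from catalyst.callbacks import BatchTransformCallback
from catalyst.contrib.datasets import MNIST

class CustomRunner(dl.Runner):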

    def handle_batch(self, batch):
        logits = self.model(
            batch["features"].view(batch["features"].size(0), -1)
        )

        loss = F.cross_entropy(logits, batch["targets"])
        accuracy01, accuracy03 = metrics.accuracy(
            logits, batch["targets"], topk=(1, 3)
        )
        self.batch_metrics.update({
            "loss": loss,
            "accuracy01":accuracy01,
            "accuracy03": accuracy03
        })

        if self.is_train_loader:
            self.engine.backward(loss)
            self.optimizer.step()
            self.optimizer.zero_grad()


class MnistDataset(torch.utils.data.Dataset):
    def __init__(self, dataset):
        self.dataset = dataset

    def __getitem__(self, item):
        return {
            "features": self.dataset[item][0],
            "targets": self.dataset[item][1]
        }

    def __len__(self):
        return len(self.dataset)

model = torch.nn.Linear(28 * 28, 10)
optimizer = torch.optim.Adam(model.parameters(), lr=0.02)

loaders = {
    "train": DataLoader(
        MnistDataset(
            MNIST(os.getcwd(), train=False)
        ),
        batch_size=32,
    ),
    "valid": DataLoader(
        MnistDataset(
            MNIST(os.getcwd(), train=False)
        ),
        batch_size=32,
    ),
}

# BatchTransformCallback expects a single callable,
# so the augmentations are chained into one module
transforms = torch.nn.Sequential(
    augmentation.RandomAffine(degrees=(-15, 20), scale=(0.75, 1.25)),
)

runner = CustomRunner()

# model training
runner.train(
    model=model,
    optimizer=optimizer,
    loaders=loaders,
    logdir="./logs",
    num_epochs=5,
    verbose=False,
    load_best_on_end=True,
    check=True,
    callbacks=[
        BatchTransformCallback(
            transform=transforms,
            scope="on_batch_start",
            input_key="features"
        )
    ],
)

...
callbacks:
    transform:
        _target_: BatchTransformCallback
        transform: catalyst.ToTensor
        scope: on_batch_start
        input_key: features

__init__(transform: Union[Callable, str], scope: str, input_key: Optional[Union[List[str], str]] = None, output_key: Optional[Union[List[str], str]] = None, transform_kwargs: Optional[Dict[str, Any]] = None)[source]

Preprocess your batch with specified function.

Parameters
  • transform – Function to apply. If a string is given, the function is looked up in the registry.

  • scope – "on_batch_end" (post-processing model output) or "on_batch_start" (pre-processing model input).

  • input_key – Keys in the batch dict to apply the function to. Defaults to None.

  • output_key – Keys for the output. If None, the function is applied in place to keys_to_apply. Defaults to None.

  • transform_kwargs – Kwargs for transform.

Raises

TypeError – When keys is not str or a list. When scope is not in ["on_batch_end", "on_batch_start"].

CheckpointCallback

class catalyst.callbacks.checkpoint.CheckpointCallback(logdir: str, loader_key: Optional[str] = None, metric_key: Optional[str] = None, minimize: Optional[bool] = None, topk: int = 1, mode: str = 'model', save_last: bool = True, save_best: bool = True, resume_model: Optional[str] = None, resume_runner: Optional[str] = None, load_best_on_end: bool = False)[source]

Bases: catalyst.core.callback.ICheckpointCallback

Checkpoint callback to save/restore your model/runner.

Parameters
  • logdir – directory to store checkpoints

  • loader_key – loader key for best model selection (based on metric score over the dataset)

  • metric_key – metric key for best model selection (based on metric score over the dataset)

  • minimize – boolean flag to minimize the required metric

  • topk – number of best checkpoints to keep

  • mode – checkpoint type to save, model or runner. (default: model)

  • save_last – boolean flag to save extra last checkpoint as {mode}.last.pth

  • save_best – boolean flag to save extra best checkpoint as {mode}.best.pth

  • resume_model – path to model checkpoint to load on experiment start

  • resume_runner – path to runner checkpoint to load on experiment start

  • load_best_on_end – boolean flag to load best model on experiment end
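How topk and minimize interact when selecting checkpoints can be sketched as below. This is an illustration under assumptions, not the callback's code: `keep_topk` is a hypothetical helper and the (epoch, metric) history is made up.

```python
from typing import List, Tuple


def keep_topk(
    scores: List[Tuple[int, float]], topk: int = 1, minimize: bool = True
) -> List[Tuple[int, float]]:
    """Hypothetical helper: pick the (epoch, metric) pairs whose checkpoints would be kept."""
    # sort ascending when a lower metric is better, descending otherwise
    ranked = sorted(scores, key=lambda pair: pair[1], reverse=not minimize)
    return ranked[:topk]


history = [(1, 0.90), (2, 0.45), (3, 0.60), (4, 0.40)]  # made-up (epoch, loss) values
print(keep_topk(history, topk=2, minimize=True))
```

With minimize=False the same history would be ranked from the highest metric value down.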

__init__(logdir: str, loader_key: Optional[str] = None, metric_key: Optional[str] = None, minimize: Optional[bool] = None, topk: int = 1, mode: str = 'model', save_last: bool = True, save_best: bool = True, resume_model: Optional[str] = None, resume_runner: Optional[str] = None, load_best_on_end: bool = False)[source]

Init.

on_epoch_end_best(runner: catalyst.core.runner.IRunner) → None[source]

Event handler.

on_epoch_end_last(runner: catalyst.core.runner.IRunner) → None[source]

Event handler.

CheckRunCallback

class catalyst.callbacks.misc.CheckRunCallback(num_batch_steps: int = 3, num_epoch_steps: int = 3)[source]

Bases: catalyst.core.callback.Callback

Executes only a pipeline part from the run.

Parameters
  • num_batch_steps – number of batches to iterate in epoch

  • num_epoch_steps – number of epochs to perform in an experiment

Minimal working example (Notebook API):

import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl

# data
num_samples, num_features = int(1e4), int(1e1)
X, y = torch.rand(num_samples, num_features), torch.rand(num_samples)
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

# model, criterion, optimizer, scheduler
model = torch.nn.Linear(num_features, 1)
criterion = torch.nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [3, 6])

# model training
runner = dl.SupervisedRunner()
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    logdir="./logdir",
    num_epochs=8,
    verbose=True,
    callbacks=[
        dl.CheckRunCallback(num_batch_steps=3, num_epoch_steps=3)
    ]
)

__init__(num_batch_steps: int = 3, num_epoch_steps: int = 3)[source]

Init.

ControlFlowCallbackWrapper

class catalyst.callbacks.control_flow.ControlFlowCallbackWrapper(base_callback: catalyst.core.callback.Callback, epochs: Optional[Union[int, Sequence[int]]] = None, ignore_epochs: Optional[Union[int, Sequence[int]]] = None, loaders: Optional[Union[str, Sequence[str], Mapping[str, Union[int, Sequence[int]]]]] = None, ignore_loaders: Optional[Union[str, Sequence[str], Mapping[str, Union[int, Sequence[int]]]]] = None, filter_fn: Optional[Union[str, Callable[[int, str], bool]]] = None)[source]

Bases: catalyst.core.callback.CallbackWrapper

Enable/disable callback execution on different epochs and loaders.

Parameters
  • base_callback – callback to wrap

  • epochs –

    epochs on which the callback is enabled; on all other epochs it is disabled.

    If an int is passed, the callback is enabled with that period (on epochs where epoch_number % epochs == 0) and disabled on other epochs.

    If a list of epochs is passed, the callback is executed only on the specified epochs.

    Default value is None.

  • ignore_epochs –

    epochs on which the callback is disabled; on all other epochs it is enabled.

    If an int is passed, the callback is disabled with that period (it runs only on epochs where epoch_number % ignore_epochs != 0).

    If a list of epochs is passed, the callback is disabled on the specified epochs.

    Default value is None.

  • loaders (str/Sequence[str]/Mapping[str, int/Sequence[int]]) –

    loaders on which the callback is enabled; on all other loaders it is disabled.

    If a string is passed, the callback is enabled only for the loader with that name.

    If a list/tuple of strings is passed, the callback is enabled only for the loaders with those names.

    If a dictionary is passed whose keys are strings and whose values are an int or a list of ints, the callback is enabled on the given epochs (dictionary value) for the given loader (dictionary key).

    Default value is None.

  • ignore_loaders (str/Sequence[str]/Mapping[str, int/Sequence[int]]) –

    loaders on which the callback is disabled; on all other loaders it is enabled.

    If a string is passed, the callback is disabled for the loader with that name.

    If a list/tuple of strings is passed, the callback is disabled for the loaders with those names.

    If a dictionary is passed whose keys are strings and whose values are an int or a list of ints, the callback is disabled on the given epochs (dictionary value) for the given loader (dictionary key).

    Default value is None.

  • filter_fn (str or Callable[[int, str], bool]) –

    function to use instead of the loaders or epochs arguments.

    If a string is passed, it is interpreted as Python code and is expected to be a lambda function with two arguments: epoch number (int) and loader name (str). The function should return True when the callback should be enabled.

    If a callable is passed, it should accept two arguments: epoch number (int) and loader name (str). It should return True when the callback should be enabled and False otherwise.

    Default value is None.

    Examples:

    # enable callback on all loaders
    # except the "train" loader, every 2 epochs
    ControlFlowCallbackWrapper(
        # ...
        filter_fn=lambda e, l: l != "train" and e % 2 == 0,
        # ...
    )
    # or with the string equivalent
    ControlFlowCallbackWrapper(
        # ...
        filter_fn="lambda e, l: l != 'train' and e % 2 == 0",
        # ...
    )
    

Note

Please run experiment with check option to check if everything works as expected with this callback.
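The epochs and ignore_epochs rules described in the parameter list can be sketched as a predicate with the same shape as filter_fn. This is an illustrative sketch, not the wrapper's code: `make_epoch_filter` is a hypothetical helper, and giving epochs precedence over ignore_epochs is an assumption.

```python
from typing import Callable, Optional, Sequence, Union


def make_epoch_filter(
    epochs: Optional[Union[int, Sequence[int]]] = None,
    ignore_epochs: Optional[Union[int, Sequence[int]]] = None,
) -> Callable[[int, str], bool]:
    """Hypothetical helper: build a filter_fn-style predicate (True means enabled)."""
    if epochs is not None:
        if isinstance(epochs, int):
            # periodic: enabled every `epochs` epochs
            return lambda epoch, loader: epoch % epochs == 0
        return lambda epoch, loader: epoch in epochs
    if ignore_epochs is not None:
        if isinstance(ignore_epochs, int):
            # periodic: disabled every `ignore_epochs` epochs
            return lambda epoch, loader: epoch % ignore_epochs != 0
        return lambda epoch, loader: epoch not in ignore_epochs
    # nothing specified: the callback is always enabled
    return lambda epoch, loader: True


every_second = make_epoch_filter(epochs=2)
skip_listed = make_epoch_filter(ignore_epochs=[1, 2])
print([e for e in range(1, 7) if every_second(e, "train")])
print([e for e in range(1, 7) if skip_listed(e, "train")])
```

The returned predicates take the same two arguments, epoch number and loader name, that a filter_fn callable receives.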

For example, if you don’t want to compute the loss on validation, you can disable CriterionCallback for that loader. With the notebook API, wrap the callback:

import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst.dl import (
    SupervisedRunner, AccuracyCallback,
    CriterionCallback, ControlFlowCallbackWrapper,
)

num_samples, num_features = 10_000, 10
n_classes = 10
X = torch.rand(num_samples, num_features)
y = torch.randint(0, n_classes, [num_samples])
loader = DataLoader(TensorDataset(X, y), batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

model = torch.nn.Linear(num_features, n_classes)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [3, 6])

runner = SupervisedRunner()
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    logdir="./logdir",
    num_epochs=5,
    verbose=False,
    valid_loader="valid",
    valid_metric="accuracy03",
    minimize_valid_metric=False,
    callbacks=[
        AccuracyCallback(
            input_key="logits",
            target_key="targets",
            topk=(1, 3, 5),
            num_classes=n_classes,
        ),
        ControlFlowCallbackWrapper(
            base_callback=CriterionCallback(
                input_key="logits", target_key="targets", metric_key="loss"
            ),
            ignore_loaders="valid"  # or loaders="train"
        )
    ]
)

__init__(base_callback: catalyst.core.callback.Callback, epochs: Optional[Union[int, Sequence[int]]] = None, ignore_epochs: Optional[Union[int, Sequence[int]]] = None, loaders: Optional[Union[str, Sequence[str], Mapping[str, Union[int, Sequence[int]]]]] = None, ignore_loaders: Optional[Union[str, Sequence[str], Mapping[str, Union[int, Sequence[int]]]]] = None, filter_fn: Optional[Union[str, Callable[[int, str], bool]]] = None)[source]

Init.

CriterionCallback

class catalyst.callbacks.criterion.CriterionCallback(input_key: str, target_key: str, metric_key: str, criterion_key: Optional[str] = None, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Bases: catalyst.callbacks.metrics.functional_metric.FunctionalMetricCallback, catalyst.core.callback.ICriterionCallback

Criterion callback, abstraction over criterion step.

Parameters
  • input_key – input key to use for metric calculation, specifies our y_pred

  • target_key – output key to use for metric calculation, specifies our y_true

  • metric_key – key to store computed metric in runner.batch_metrics dictionary

  • criterion_key – A key to take a criterion in case there are several of them, and they are in a dictionary format.

Note

Please follow the minimal examples sections for more use cases.

__init__(input_key: str, target_key: str, metric_key: str, criterion_key: Optional[str] = None, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Init.

EarlyStoppingCallback

class catalyst.callbacks.misc.EarlyStoppingCallback(patience: int, loader_key: str, metric_key: str, minimize: bool, min_delta: float = 1e-06)[source]

Bases: catalyst.callbacks.misc.IEpochMetricHandlerCallback

Early stop based on metric.

Parameters
  • patience – number of epochs with no improvement after which training will be stopped.

  • loader_key – loader key for early stopping (based on metric score over the dataset)

  • metric_key – metric key for early stopping (based on metric score over the dataset)

  • minimize – if True, the metric is expected to decrease and training stops once it stops decreasing. If False, the metric is expected to increase.

  • min_delta – minimum change in the monitored metric to qualify as an improvement, i.e. an absolute change of less than min_delta counts as no improvement. Default value is 1e-6.
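The interplay of patience, minimize and min_delta can be sketched in plain Python. This is a minimal illustration of the rule described above, not the callback's implementation: `stopping_epoch` is a hypothetical helper, the loss values are made up, and stopping as soon as the count of non-improving epochs reaches patience is an assumption.

```python
from typing import Iterable, Optional


def stopping_epoch(
    values: Iterable[float],
    patience: int,
    minimize: bool = True,
    min_delta: float = 1e-6,
) -> Optional[int]:
    """Hypothetical helper: return the 1-based epoch at which training would stop, or None."""
    best: Optional[float] = None
    bad_epochs = 0
    for epoch, value in enumerate(values, start=1):
        if best is None:
            improved = True
        elif minimize:
            improved = best - value > min_delta
        else:
            improved = value - best > min_delta
        if improved:
            best, bad_epochs = value, 0
        else:
            # a change smaller than min_delta counts as no improvement
            bad_epochs += 1
            if bad_epochs >= patience:
                return epoch
    return None


losses = [1.0, 0.8, 0.795, 0.799, 0.797, 0.5]  # made-up validation losses
print(stopping_epoch(losses, patience=3, min_delta=1e-2))
```

With a smaller min_delta the small drop at the third epoch would count as an improvement and reset the counter.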

Minimal working example (Notebook API):

import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl

# data
num_samples, num_features = int(1e4), int(1e1)
X, y = torch.rand(num_samples, num_features), torch.rand(num_samples)
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

# model, criterion, optimizer, scheduler
model = torch.nn.Linear(num_features, 1)
criterion = torch.nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [3, 6])

# model training
runner = dl.SupervisedRunner()
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    logdir="./logdir",
    num_epochs=100,
    callbacks=[
        dl.EarlyStoppingCallback(
            loader_key="valid",
            metric_key="loss",
            minimize=True,
            patience=3,
            min_delta=1e-2
        )
    ]
)

__init__(patience: int, loader_key: str, metric_key: str, minimize: bool, min_delta: float = 1e-06)[source]

Init.

LRFinder

class catalyst.callbacks.scheduler.LRFinder(final_lr: float, scale: str = 'log', num_steps: Optional[int] = None, optimizer_key: Optional[str] = None)[source]

Bases: catalyst.callbacks.scheduler.ILRUpdater

Helps you find an optimal learning rate for a model, as suggested in the paper Cyclical Learning Rates for Training Neural Networks. The learning rate is increased on a linear or log scale, depending on user input.

See How Do You Find A Good Learning Rate article for details.

__init__(final_lr: float, scale: str = 'log', num_steps: Optional[int] = None, optimizer_key: Optional[str] = None)[source]

Parameters
  • final_lr – final learning rate to try with

  • scale – learning rate increasing scale (“log” or “linear”)

  • num_steps – number of batches to try; if None, the whole loader is used.

  • optimizer_key – which optimizer key to use for learning rate scheduling

Raises

NotImplementedError – if invalid scale value.
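The difference between the two scales can be sketched as follows. This is an illustration under assumptions, not the LRFinder code: `lr_at_step` is a hypothetical helper, and the starting learning rate `init_lr` is assumed to come from the optimizer.

```python
def lr_at_step(
    init_lr: float, final_lr: float, step: int, num_steps: int, scale: str = "log"
) -> float:
    """Hypothetical helper: learning rate after `step` of `num_steps` updates."""
    if scale == "log":
        # multiply by a constant factor on every step
        factor = (final_lr / init_lr) ** (1 / num_steps)
        return init_lr * factor ** step
    if scale == "linear":
        # add a constant increment on every step
        increment = (final_lr - init_lr) / num_steps
        return init_lr + increment * step
    raise NotImplementedError(f"Not supported scale: {scale}")


# sweep from 1e-4 to 1.0 in 4 steps on both scales
for step in range(5):
    print(step, lr_at_step(1e-4, 1.0, step, 4, "log"), lr_at_step(1e-4, 1.0, step, 4, "linear"))
```

The log scale spends more steps at small learning rates, which is usually the region of interest when the sweep spans several orders of magnitude.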

MetricAggregationCallback

class catalyst.callbacks.metric_aggregation.MetricAggregationCallback(metric_key: str, metrics: Optional[Union[str, List[str], Dict[str, float]]] = None, mode: Union[str, Callable] = 'mean', scope: str = 'batch', multiplier: float = 1.0)[source]

Bases: catalyst.core.callback.Callback

A callback to aggregate several metrics in one value.

Parameters
  • metric_key – new key for aggregated metric.

  • metrics (Union[str, List[str], Dict[str, float]]) – if not None, only the metrics with these keys are aggregated. For weighted_sum aggregation it must be a Dict[str, float].

  • mode – function for aggregation. Must be either sum, mean or weighted_sum, or a user-defined function to aggregate metrics. Such a function must take a dict of metrics and the runner and return the aggregated metric. This can be useful for complicated fine-tuning with different losses that depend on the epoch and loader, or something else.

  • scope – type of metric. Must be either batch or loader

  • multiplier – scale factor for the aggregated metric.
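The three built-in modes and the multiplier can be sketched on a plain dictionary of metric values. This is a minimal sketch, not the callback's code: `aggregate` is a hypothetical helper and the metric values are made up.

```python
from typing import Dict, List, Optional, Union


def aggregate(
    batch_metrics: Dict[str, float],
    metrics: Optional[Union[List[str], Dict[str, float]]] = None,
    mode: str = "mean",
    multiplier: float = 1.0,
) -> float:
    """Hypothetical helper: combine several metric values into one."""
    keys = list(batch_metrics) if metrics is None else list(metrics)
    if mode == "weighted_sum":
        if not isinstance(metrics, dict):
            raise ValueError("weighted_sum needs a dict of metric weights")
        total = sum(batch_metrics[key] * metrics[key] for key in keys)
    elif mode in ("sum", "mean"):
        total = sum(batch_metrics[key] for key in keys)
        if mode == "mean":
            total /= len(keys)
    else:
        raise ValueError(f"unknown mode: {mode}")
    return total * multiplier


values = {"loss_ce": 1.0, "loss_bce": 0.5}  # made-up per-batch losses
print(aggregate(values, {"loss_ce": 0.6, "loss_bce": 0.4}, mode="weighted_sum"))
print(aggregate(values, mode="mean", multiplier=2.0))
```

A custom callable passed as mode would replace the branching in this sketch entirely.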

Python example: the loss is a weighted sum of a cross-entropy loss and a binary cross-entropy loss:

import torch
from torch.utils.data import DataLoader, TensorDataset

from catalyst import dl

# data
num_samples, num_features, num_classes = int(1e4), int(1e1), 4
X = torch.rand(num_samples, num_features)
y = (torch.rand(num_samples,) * num_classes).to(torch.int64)
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

# model, criterion, optimizer, scheduler
model = torch.nn.Linear(num_features, num_classes)
criterion = {
    "ce": torch.nn.CrossEntropyLoss(),
    "bce": torch.nn.BCEWithLogitsLoss()
}
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [2])

# runner
class CustomRunner(dl.Runner):
    def handle_batch(self, batch):
        x, y = batch
        logits = self.model(x)
        num_classes = logits.shape[-1]
        targets_onehot = torch.nn.functional.one_hot(y, num_classes=num_classes)
        self.batch = {
            "features": x,
            "logits": logits,
            "targets": y,
            "targets_onehot": targets_onehot.float(),
        }


# training
runner = CustomRunner()
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    logdir="./logdir",
    valid_loader="valid",
    valid_metric="loss",
    minimize_valid_metric=True,
    num_epochs=3,
    callbacks=[
        dl.AccuracyCallback(
            input_key="logits", target_key="targets", num_classes=num_classes
        ),
        dl.CriterionCallback(
            input_key="logits",
            target_key="targets",
            metric_key="loss_ce",
            criterion_key="ce",
        ),
        dl.CriterionCallback(
            input_key="logits",
            target_key="targets_onehot",
            metric_key="loss_bce",
            criterion_key="bce",
        ),
        # loss aggregation
        dl.MetricAggregationCallback(
            metric_key="loss",
            metrics={"loss_ce": 0.6, "loss_bce": 0.4},
            mode="weighted_sum",
        ),
        dl.OptimizerCallback(metric_key="loss"),
    ],
)

__init__(metric_key: str, metrics: Optional[Union[str, List[str], Dict[str, float]]] = None, mode: Union[str, Callable] = 'mean', scope: str = 'batch', multiplier: float = 1.0) → None[source]

Init.

MixupCallback

class catalyst.callbacks.mixup.MixupCallback(keys: Union[str, List[str]], alpha=0.2, mode='replace', on_train_only=True)[source]

Bases: catalyst.core.callback.Callback

Callback to do mixup augmentation. More details about mixup can be found in the paper mixup: Beyond Empirical Risk Minimization: https://arxiv.org/abs/1710.09412 .

Parameters
  • keys – batch keys to which you want to apply augmentation

  • alpha – beta distribution a=b parameters. Must be >=0. The closer alpha is to zero, the weaker the mixup effect.

  • mode – determines how the mixed batch is used. Must be in [“replace”, “add”]. If “replace”, the batch is replaced with a mixed one and the batch size does not change. If “add”, the mixed examples are concatenated to the current ones and the batch size doubles.

  • on_train_only – apply to train only. As mixup uses proxy inputs, the targets are also proxies, which are of no interest at validation time. So, if on_train_only is True, the standard output/metric is used for validation.
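The effect of alpha and mode can be sketched on lists of floats using only the standard library. This is an illustration, not the callback's implementation: `mixup_batch` is a hypothetical helper, one mixing coefficient per batch is an assumption, and `random.betavariate` needs alpha > 0.

```python
import random
from typing import List


def mixup_batch(
    batch: List[List[float]], alpha: float = 0.2, mode: str = "replace", seed: int = 0
) -> List[List[float]]:
    """Hypothetical helper: mix each example with a randomly chosen partner."""
    rng = random.Random(seed)
    # mixing coefficient drawn from Beta(alpha, alpha)
    lam = rng.betavariate(alpha, alpha)
    partners = batch[:]
    rng.shuffle(partners)
    mixed = [
        [lam * a + (1 - lam) * b for a, b in zip(example, partner)]
        for example, partner in zip(batch, partners)
    ]
    if mode == "replace":
        return mixed  # same batch size
    if mode == "add":
        return batch + mixed  # batch size doubles
    raise ValueError(f"unknown mode: {mode}")


batch = [[0.0, 0.0], [1.0, 1.0], [2.0, 2.0], [3.0, 3.0]]
print(len(mixup_batch(batch, mode="replace")), len(mixup_batch(batch, mode="add")))
```

The same coefficient has to be applied to every key listed in keys (inputs and targets alike) so that mixed inputs stay paired with correspondingly mixed targets.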

Examples:

from typing import Any, Dict
import os

import numpy as np
import torch
from torch import nn
from torch.utils.data import DataLoader

from catalyst import dl
from catalyst.callbacks import MixupCallback
from catalyst.contrib.datasets import MNIST


class SimpleNet(nn.Module):
    def __init__(self, in_channels, in_hw, out_features):
        super().__init__()
        self.encoder = nn.Sequential(nn.Conv2d(in_channels,
                                               in_channels, 3, 1, 1), nn.Tanh())
        self.clf = nn.Linear(in_channels * in_hw * in_hw, out_features)

    def forward(self, x):
        features = self.encoder(x)
        features = features.view(features.size(0), -1)
        logits = self.clf(features)
        return logits


class SimpleDataset(torch.utils.data.Dataset):
    def __init__(self, train: bool = False):
        self.mnist = MNIST(os.getcwd(), train=train)

    def __len__(self) -> int:
        return len(self.mnist)

    def __getitem__(self, idx: int) -> Dict[str, Any]:
        x, y = self.mnist.__getitem__(idx)
        y_one_hot = np.zeros(10)
        y_one_hot[y] = 1
        return {"image": x,
                "clf_targets": y,
                "clf_targets_one_hot": torch.Tensor(y_one_hot)}


model = SimpleNet(1, 28, 10)
criterion = torch.nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.02)

loaders = {
    "train": DataLoader(SimpleDataset(train=True), batch_size=32),
    "valid": DataLoader(SimpleDataset(train=False), batch_size=32),
}


class CustomRunner(dl.Runner):
    def handle_batch(self, batch):
        image = batch["image"]
        clf_logits = self.model(image)
        self.batch["clf_logits"] = clf_logits


runner = CustomRunner()
runner.train(
    loaders=loaders,
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    logdir="./logdir14",
    num_epochs=2,
    verbose=True,
    valid_loader="valid",
    valid_metric="loss",
    minimize_valid_metric=True,
    callbacks={
        "mixup": MixupCallback(keys=["image", "clf_targets_one_hot"]),
        "criterion": dl.CriterionCallback(
            metric_key="loss",
            input_key="clf_logits",
            target_key="clf_targets_one_hot"
        ),
        "backward": dl.BackwardCallback(metric_key="loss"),
        "optimizer": dl.OptimizerCallback(metric_key="loss"),
        "classification": dl.ControlFlowCallbackWrapper(
            dl.PrecisionRecallF1SupportCallback(
                input_key="clf_logits", target_key="clf_targets", num_classes=10
            ),
            ignore_loaders="train",
        ),
    },
)

__init__(keys: Union[str, List[str]], alpha=0.2, mode='replace', on_train_only=True)[source]

Init.

OptimizerCallback

class catalyst.callbacks.optimizer.OptimizerCallback(metric_key: str, optimizer_key: Optional[str] = None, accumulation_steps: int = 1, grad_clip_fn: Optional[Union[str, Callable]] = None, grad_clip_params: Optional[Dict] = None)[source]

Bases: catalyst.core.callback.IOptimizerCallback

Optimizer callback, abstraction over optimizer step.

Parameters
  • metric_key – a key to get loss from runner.batch_metrics

  • model_key – a key to select a model from runner.model in case there are several of them and they are in a dictionary format.

  • optimizer_key – a key to select an optimizer from runner.optimizer in case there are several of them and they are in a dictionary format.

  • accumulation_steps – number of steps before optimizer.step()

  • grad_clip_fn – callable gradient clipping function or its name

  • grad_clip_params – key-value parameters for grad_clip_fn
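The effect of accumulation_steps can be sketched by listing the batches on which an optimizer step would happen. This is a simplified illustration, not the callback's code: `optimizer_step_batches` is a hypothetical helper, and stepping on every batch whose 1-based index is a multiple of accumulation_steps is an assumption.

```python
from typing import List


def optimizer_step_batches(num_batches: int, accumulation_steps: int = 1) -> List[int]:
    """Hypothetical helper: 1-based batch indices at which optimizer.step() would run."""
    return [
        index
        for index in range(1, num_batches + 1)
        if index % accumulation_steps == 0
    ]


print(optimizer_step_batches(8, accumulation_steps=4))
print(optimizer_step_batches(3))
```

Gradients from the batches in between are accumulated, so a larger accumulation_steps emulates a larger effective batch size.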

Note

Please follow the minimal examples sections for more use cases.

__init__(metric_key: str, optimizer_key: Optional[str] = None, accumulation_steps: int = 1, grad_clip_fn: Optional[Union[str, Callable]] = None, grad_clip_params: Optional[Dict] = None)[source]

Init.

OptunaPruningCallback

class catalyst.callbacks.optuna.OptunaPruningCallback(trial: optuna.trial._trial.Trial, loader_key: str, metric_key: str, minimize: bool, min_delta: float = 1e-06)[source]

Bases: catalyst.core.callback.Callback

Optuna callback for pruning (early stopping) unpromising runs.

Parameters
  • trial – Optuna.Trial for the experiment.

  • loader_key – loader key for best model selection (based on metric score over the dataset)

  • metric_key – metric key for best model selection (based on metric score over the dataset)

  • minimize – boolean flag to minimize the required metric

  • min_delta – minimum change in the metric to count as an improvement

import optuna

from catalyst.dl import SupervisedRunner, OptunaPruningCallback

# some python code ...

def objective(trial: optuna.Trial):
    # standard optuna code for model and/or optimizer suggestion ...
    runner = SupervisedRunner()
    runner.train(
        model=model,
        loaders=loaders,
        criterion=criterion,
        optimizer=optimizer,
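        callbacks=[
            # loader_key, metric_key and minimize are required;
            # the values here are illustrative
            OptunaPruningCallback(
                trial, loader_key="valid", metric_key="loss", minimize=True
            ),
            # some other callbacks ...
        ],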
        num_epochs=num_epochs,
    )
    return runner.best_valid_metrics[runner.valid_metric]

study = optuna.create_study()
study.optimize(objective, n_trials=100, timeout=600)

__init__(trial: optuna.trial._trial.Trial, loader_key: str, metric_key: str, minimize: bool, min_delta: float = 1e-06)[source]

Init.

PeriodicLoaderCallback

class catalyst.callbacks.periodic_loader.PeriodicLoaderCallback(valid_loader_key: str, valid_metric_key: str, minimize: bool, **kwargs)[source]

Bases: catalyst.core.callback.Callback

Callback for running loaders with a specified period. To disable a loader, use 0 as its period (specifying 0 for the validation loader raises an error).

Parameters

kwargs – loader names and their run periods.

For example, if you have train, train_additional, valid and valid_additional loaders and want to use train_additional every 2 epochs, valid every 3 epochs and valid_additional every 5 epochs:

from catalyst.dl import SupervisedRunner, PeriodicLoaderCallback
runner = SupervisedRunner()
runner.train(
    # ...
    loaders={
        "train": ...,
        "train_additional": ...,
        "valid": ...,
        "valid_additional": ...,
    },
    # ...
    callbacks=[
        # ...
        PeriodicLoaderCallback(
            # required arguments; the values here are illustrative
            valid_loader_key="valid",
            valid_metric_key="loss",
            minimize=True,
            train_additional=2,
            valid=3,
            valid_additional=5
        ),
        # ...
    ],
    # ...
)

__init__(valid_loader_key: str, valid_metric_key: str, minimize: bool, **kwargs)[source]

Init.
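Which loaders run on a given epoch can be sketched as follows. This is an illustrative sketch, not the callback's code: `loaders_for_epoch` is a hypothetical helper, and treating loaders without a period as running every epoch is an assumption.

```python
from typing import Dict, List, Sequence


def loaders_for_epoch(
    epoch: int, loader_names: Sequence[str], periods: Dict[str, int]
) -> List[str]:
    """Hypothetical helper: names of the loaders that would run on `epoch`."""
    active = []
    for name in loader_names:
        period = periods.get(name, 1)  # no period given: run every epoch
        # a period of 0 disables the loader entirely
        if period > 0 and epoch % period == 0:
            active.append(name)
    return active


names = ["train", "train_additional", "valid", "valid_additional"]
periods = {"train_additional": 2, "valid": 3, "valid_additional": 5}
for epoch in (1, 6, 10):
    print(epoch, loaders_for_epoch(epoch, names, periods))
```

The periods here match the example for this callback: train_additional every 2 epochs, valid every 3 and valid_additional every 5.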

ProfilerCallback

class catalyst.callbacks.profiler.ProfilerCallback(loader_key: Optional[str] = None, epoch: int = 1, num_batches: Optional[int] = None, profiler_kwargs: Optional[Dict[str, Any]] = None, tensorboard_path: Optional[str] = None, export_chrome_trace_path: Optional[str] = None, export_stacks_kwargs: Optional[Dict[str, Any]] = None)[source]

Bases: catalyst.core.callback.Callback

Profile specified epoch or some fixed number of batches.

Parameters
  • loader_key – name of the loader to use for profiling. If None, the first loader from the experiment is used.

  • epoch – epoch number to use for profiling.

  • num_batches – number of batches of the epoch to profile. If None, all batches in the loader are used.

  • profiler_kwargs – arguments to pass to the profiler. See the PyTorch profiler docs for the available arguments.

  • tensorboard_path – path where logs for TensorBoard are stored. If None, no TensorBoard logs are written.

  • export_chrome_trace_path – path to export the Chrome trace to. If None, the Chrome trace is not exported to a file.

  • export_stacks_kwargs – arguments to pass to the profiler.export_stacks method. If None, profiler.export_stacks is not called.

Example of using FlameGraph tool for profiler.export_stacks:

git clone https://github.com/brendangregg/FlameGraph
cd FlameGraph
./flamegraph.pl --title "CPU time" --countname "us." profiler.stacks > perf_viz.svg

Note

Export to TensorBoard and export to a Chrome trace are mutually exclusive; specifying both raises an error.
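The mutual-exclusion rule can be illustrated with a small validation helper. This is only a sketch under stated assumptions: the function name check_export_targets is hypothetical, and the use of ValueError is an assumption, since the text above only says that an error is raised.

```python
from typing import Optional


def check_export_targets(
    tensorboard_path: Optional[str] = None,
    export_chrome_trace_path: Optional[str] = None,
) -> Optional[str]:
    """Return the active export target, or None if neither path is set."""
    if tensorboard_path is not None and export_chrome_trace_path is not None:
        # assumption: the exact exception type is not stated in the docs
        raise ValueError(
            "tensorboard_path and export_chrome_trace_path are mutually exclusive"
        )
    if tensorboard_path is not None:
        return "tensorboard"
    if export_chrome_trace_path is not None:
        return "chrome_trace"
    return None


print(check_export_targets(tensorboard_path="./logs/tb_profile"))
print(check_export_targets(export_chrome_trace_path="./trace.json"))
```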

Example

import os

import torch
from torch import nn
from torch.utils.data import DataLoader

from catalyst import dl
from catalyst.data import ToTensor
from catalyst.contrib.datasets import MNIST

loaders = {
    "train": DataLoader(MNIST(os.getcwd(), train=False), batch_size=32),
    "valid": DataLoader(MNIST(os.getcwd(), train=False), batch_size=32),
}

model = nn.Sequential(nn.Flatten(), nn.Linear(784, 10))
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-2)
runner = dl.SupervisedRunner()
runner.train(
    model=model,
    callbacks=[dl.ProfilerCallback(
        loader_key="train", epoch=3,
        profiler_kwargs=dict(
            activities=[
                torch.profiler.ProfilerActivity.CPU,
                torch.profiler.ProfilerActivity.CUDA,
            ],
            on_trace_ready=torch.profiler.tensorboard_trace_handler(
                "./logs/tb_profile"
            ),
            with_stack=True,
            with_flops=True,
        )
    )],
    loaders=loaders,
    criterion=criterion,
    optimizer=optimizer,
    num_epochs=5,
    logdir="./logs",
)

SchedulerCallback

class catalyst.callbacks.scheduler.SchedulerCallback(scheduler_key: Optional[str] = None, mode: Optional[str] = None, loader_key: Optional[str] = None, metric_key: Optional[str] = None)[source]

Bases: catalyst.core.callback.ISchedulerCallback

Scheduler callback, abstraction over scheduler step.

Parameters
  • scheduler_key – scheduler name; default is None.

  • mode – scheduler mode, one of "epoch" or "batch"; default is None. If None and the scheduler is an instance of BatchScheduler or OneCycleLRWithWarmup, "batch" is used; otherwise "epoch".

  • loader_key – name of the loader whose metric is tracked for the ReduceLROnPlateau scheduler

  • metric_key – name of the metric to forward to the scheduler object. If None, the main metric specified in the experiment is used.
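The mode rule above can be written as a short function. This is a minimal sketch, not the callback's code: the two classes are local stand-ins defined only for this example (they are not Catalyst's scheduler classes), and resolve_mode is a hypothetical name.

```python
from typing import Optional


class BatchScheduler:
    """Local stand-in for a scheduler that must step after every batch."""


class EpochScheduler:
    """Local stand-in for a scheduler that steps once per epoch."""


def resolve_mode(scheduler: object, mode: Optional[str] = None) -> str:
    """Pick the stepping mode: an explicit value wins, otherwise infer it."""
    if mode is not None:
        if mode not in ("epoch", "batch"):
            raise ValueError(f"mode must be 'epoch' or 'batch', got {mode!r}")
        return mode
    # no explicit mode: batch-based schedulers step per batch, others per epoch
    return "batch" if isinstance(scheduler, BatchScheduler) else "epoch"


print(resolve_mode(BatchScheduler()))
print(resolve_mode(EpochScheduler()))
print(resolve_mode(BatchScheduler(), mode="epoch"))
```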

Note

Please follow the minimal examples sections for more use cases.

__init__(scheduler_key: Optional[str] = None, mode: Optional[str] = None, loader_key: Optional[str] = None, metric_key: Optional[str] = None)[source]

Init.

TimerCallback

class catalyst.callbacks.misc.TimerCallback[source]

Bases: catalyst.core.callback.Callback

Logs pipeline execution time.

import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl

# data
num_samples, num_features = int(1e4), int(1e1)
X, y = torch.rand(num_samples, num_features), torch.rand(num_samples)
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

# model, criterion, optimizer, scheduler
model = torch.nn.Linear(num_features, 1)
criterion = torch.nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [3, 6])

# model training
runner = dl.SupervisedRunner()
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    logdir="./logdir",
    num_epochs=1,
    verbose=True,
    callbacks=[dl.TimerCallback()]
)

You should see extra metrics, such as:

  • _timer/_fps - number of samples handled per second during the run.

  • _timer/batch_time - time required to handle a single batch.

  • _timer/data_time - time required to prepare the data for a single batch.

  • _timer/model_time - time required for the model forward pass on a single batch.
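To make the relationship between these timings concrete, here is a rough sketch of how such values could be measured around a batch loop with time.perf_counter. It is not how TimerCallback is implemented; the function time_batches and the way the phases are split are assumptions for illustration only.

```python
import time
from typing import Callable, Dict, Iterable, List, Sequence


def time_batches(
    batches: Iterable[Sequence], step: Callable[[Sequence], object]
) -> List[Dict[str, float]]:
    """Measure data, model and total time for each batch."""
    records = []
    iterator = iter(batches)
    while True:
        start = time.perf_counter()
        try:
            batch = next(iterator)  # data preparation phase
        except StopIteration:
            break
        data_done = time.perf_counter()
        step(batch)  # model forward phase
        model_done = time.perf_counter()
        batch_time = model_done - start
        records.append(
            {
                "_timer/data_time": data_done - start,
                "_timer/model_time": model_done - data_done,
                "_timer/batch_time": batch_time,
                # samples per second; guard against a zero-length interval
                "_timer/_fps": len(batch) / batch_time
                if batch_time > 0
                else float("inf"),
            }
        )
    return records


records = time_batches([[1, 2, 3], [4, 5]], sum)
for record in records:
    print(record)
```

In this sketch the batch time is the sum of the data and model times, and the throughput is the batch size divided by the batch time.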

Moreover, you can enable it through the timeit=True flag:

import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl

# data
num_samples, num_features = int(1e4), int(1e1)
X, y = torch.rand(num_samples, num_features), torch.rand(num_samples)
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

# model, criterion, optimizer, scheduler
model = torch.nn.Linear(num_features, 1)
criterion = torch.nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [3, 6])

# model training
runner = dl.SupervisedRunner()
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    logdir="./logdir",
    num_epochs=1,
    verbose=True,
    timeit=True,
)
__init__()[source]

Init.

TqdmCallback

class catalyst.callbacks.misc.TqdmCallback[source]

Bases: catalyst.core.callback.Callback

Logs the params into tqdm console.

Minimal working example (Notebook API):

import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl

# data
num_samples, num_features = int(1e4), int(1e1)
X, y = torch.rand(num_samples, num_features), torch.rand(num_samples)
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

# model, criterion, optimizer, scheduler
model = torch.nn.Linear(num_features, 1)
criterion = torch.nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [3, 6])

# model training
runner = dl.SupervisedRunner()
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    logdir="./logdir",
    num_epochs=1,
    callbacks=[dl.TqdmCallback()]
)

You should see a tqdm progress bar during the training.

Moreover, you can enable it through the verbose=True flag:

import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl

# data
num_samples, num_features = int(1e4), int(1e1)
X, y = torch.rand(num_samples, num_features), torch.rand(num_samples)
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

# model, criterion, optimizer, scheduler
model = torch.nn.Linear(num_features, 1)
criterion = torch.nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [3, 6])

# model training
runner = dl.SupervisedRunner()
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    logdir="./logdir",
    num_epochs=1,
    verbose=True,
)
on_exception(runner: catalyst.core.runner.IRunner)[source]

Called if an Exception was raised.

Metric-based Interfaces

BatchMetricCallback

class catalyst.callbacks.metric.BatchMetricCallback(metric: catalyst.metrics._metric.ICallbackBatchMetric, input_key: Union[str, Iterable[str], Dict[str, str]], target_key: Union[str, Iterable[str], Dict[str, str]], log_on_batch: bool = True)[source]

Bases: catalyst.callbacks.metric._MetricCallback

BatchMetricCallback implements batch-based metrics update and computation over loader

Parameters
  • metric – metric to calculate in callback

  • input_key – keys of tensors that should be used as inputs in metric calculation

  • target_key – keys of tensors that should be used as targets in metric calculation

  • log_on_batch – boolean flag to log computed metrics every batch

__init__(metric: catalyst.metrics._metric.ICallbackBatchMetric, input_key: Union[str, Iterable[str], Dict[str, str]], target_key: Union[str, Iterable[str], Dict[str, str]], log_on_batch: bool = True) -> None[source]

Init BatchMetricCallback

LoaderMetricCallback

class catalyst.callbacks.metric.LoaderMetricCallback(metric: catalyst.metrics._metric.ICallbackLoaderMetric, input_key: Union[str, Iterable[str], Dict[str, str]], target_key: Union[str, Iterable[str], Dict[str, str]])[source]

Bases: catalyst.callbacks.metric._MetricCallback

LoaderMetricCallback implements loader-based metrics update and computation over loader

Parameters
  • metric – metric to calculate in callback

  • input_key – keys of tensors that should be used as inputs in metric calculation

  • target_key – keys of tensors that should be used as targets in metric calculation
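The difference between a batch-level value and a loader-level value can be sketched with a small accumulating metric. The class RunningAccuracy and its reset/update/compute methods are hypothetical names for this example; they are not claimed to match Catalyst's metric interfaces.

```python
from typing import Sequence


class RunningAccuracy:
    """Accumulates correct predictions across batches of one loader."""

    def __init__(self) -> None:
        self.reset()

    def reset(self) -> None:
        """Clear the accumulated statistics, e.g. at loader start."""
        self.correct = 0
        self.total = 0

    def update(self, predictions: Sequence[int], targets: Sequence[int]) -> float:
        """Add one batch and return the accuracy of that batch alone."""
        batch_correct = sum(int(p == t) for p, t in zip(predictions, targets))
        self.correct += batch_correct
        self.total += len(targets)
        return batch_correct / len(targets)

    def compute(self) -> float:
        """Return the accuracy over everything seen since the last reset."""
        return self.correct / self.total


metric = RunningAccuracy()
batch_values = [
    metric.update([1, 0, 1, 1], [1, 0, 0, 1]),  # 3 of 4 correct
    metric.update([0], [1]),  # 0 of 1 correct
]
print(batch_values, metric.compute())
```

With unequal batch sizes, the mean of the per-batch values (0.375 here) differs from the value computed over the whole loader (0.6), which is why a metric that needs all samples is computed at loader level.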

Metric-based

AccuracyCallback

class catalyst.callbacks.metrics.accuracy.AccuracyCallback(input_key: str, target_key: str, topk: Optional[Iterable[int]] = None, num_classes: Optional[int] = None, log_on_batch: bool = True, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Bases: catalyst.callbacks.metric.BatchMetricCallback

Accuracy metric callback. Computes multiclass accuracy@topk for the specified values of topk.

Parameters
  • input_key – input key to use for metric calculation, specifies our y_pred

  • target_key – output key to use for metric calculation, specifies our y_true

  • topk – specifies which accuracy@K to log

  • num_classes – number of classes, used to calculate topk if topk is None

  • log_on_batch – boolean flag to log computed metrics every batch

  • prefix – metric prefix

  • suffix – metric suffix
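As an illustration of accuracy@topk, the sketch below computes it in plain Python on nested lists rather than tensors. It is not the callback's implementation; the function topk_accuracy is hypothetical, and the zero-padded key format is modelled on the "accuracy03" name used in the example on this page.

```python
from typing import Dict, Sequence


def topk_accuracy(
    logits: Sequence[Sequence[float]],
    targets: Sequence[int],
    topk: Sequence[int] = (1,),
) -> Dict[str, float]:
    """Share of samples whose true class is among the k highest scores."""
    results = {}
    for k in topk:
        hits = 0
        for scores, target in zip(logits, targets):
            # class indices sorted by score, best first
            ranked = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)
            hits += int(target in ranked[:k])
        results[f"accuracy{k:02d}"] = hits / len(targets)
    return results


logits = [[0.1, 0.7, 0.2], [0.5, 0.3, 0.2], [0.2, 0.3, 0.5]]
targets = [1, 1, 0]
print(topk_accuracy(logits, targets, topk=(1, 3)))
```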

Examples:

import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl

# sample data
num_samples, num_features, num_classes = int(1e4), int(1e1), 4
X = torch.rand(num_samples, num_features)
y = (torch.rand(num_samples,) * num_classes).to(torch.int64)

# pytorch loaders
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

# model, criterion, optimizer, scheduler
model = torch.nn.Linear(num_features, num_classes)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [2])

# model training
runner = dl.SupervisedRunner(
    input_key="features",
    output_key="logits",
    target_key="targets",
    loss_key="loss"
)
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    logdir="./logdir",
    num_epochs=3,
    valid_loader="valid",
    valid_metric="accuracy03",
    minimize_valid_metric=False,
    verbose=True,
    callbacks=[
        dl.AccuracyCallback(
            input_key="logits", target_key="targets", num_classes=num_classes
        ),
        dl.PrecisionRecallF1SupportCallback(
            input_key="logits", target_key="targets", num_classes=num_classes
        ),
        dl.AUCCallback(input_key="logits", target_key="targets"),
    ],
)

Note

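Metric names depend on input parameters:

  • topk = (1,) -> "accuracy01"

  • topk = (1, 3) -> "accuracy01", "accuracy03"

  • topk = (1, 3, 5) -> "accuracy01", "accuracy03", "accuracy05"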

You can find them in runner.batch_metrics, runner.loader_metrics or runner.epoch_metrics.

Note

Please follow the minimal examples sections for more use cases.

__init__(input_key: str, target_key: str, topk: Optional[Iterable[int]] = None, num_classes: Optional[int] = None, log_on_batch: bool = True, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Init.

AUCCallback

class catalyst.callbacks.metrics.auc.AUCCallback(input_key: str, target_key: str, compute_per_class_metrics: bool = False, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Bases: catalyst.callbacks.metric.LoaderMetricCallback

ROC-AUC metric callback.

Parameters
  • input_key – input key to use for auc calculation, specifies our y_pred.

  • target_key – output key to use for auc calculation, specifies our y_true.

  • compute_per_class_metrics – boolean flag to compute per-class metrics (default: SETTINGS.compute_per_class_metrics or False).

  • prefix – metric prefix

  • suffix – metric suffix
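For intuition, binary ROC-AUC equals the probability that a randomly chosen positive sample is scored higher than a randomly chosen negative one, with ties counted as one half. The sketch below computes it that way in plain Python. It covers only the binary case and is not the callback's implementation; binary_roc_auc is a hypothetical name.

```python
from typing import Sequence


def binary_roc_auc(scores: Sequence[float], labels: Sequence[int]) -> float:
    """ROC-AUC as the share of (positive, negative) pairs ranked correctly."""
    positives = [s for s, y in zip(scores, labels) if y == 1]
    negatives = [s for s, y in zip(scores, labels) if y == 0]
    if not positives or not negatives:
        raise ValueError("need at least one positive and one negative sample")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5  # ties count as half a win
    return wins / (len(positives) * len(negatives))


print(binary_roc_auc([0.1, 0.4, 0.35, 0.8], [0, 0, 1, 1]))  # 0.75
```

The pairwise loop is quadratic in the number of samples, so it is suitable for illustration only.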

Examples:

import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl

# sample data
num_samples, num_features, num_classes = int(1e4), int(1e1), 4
X = torch.rand(num_samples, num_features)
y = (torch.rand(num_samples,) * num_classes).to(torch.int64)

# pytorch loaders
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

# model, criterion, optimizer, scheduler
model = torch.nn.Linear(num_features, num_classes)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [2])

# model training
runner = dl.SupervisedRunner(
    input_key="features",
    output_key="logits",
    target_key="targets",
    loss_key="loss"
)
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    logdir="./logdir",
    num_epochs=3,
    valid_loader="valid",
    valid_metric="accuracy03",
    minimize_valid_metric=False,
    verbose=True,
    callbacks=[
        dl.AccuracyCallback(
            input_key="logits", target_key="targets", num_classes=num_classes
        ),
        dl.PrecisionRecallF1SupportCallback(
            input_key="logits", target_key="targets", num_classes=num_classes
        ),
        dl.AUCCallback(input_key="logits", target_key="targets"),
    ],
)

Note

Please follow the minimal examples sections for more use cases.

__init__(input_key: str, target_key: str, compute_per_class_metrics: bool = False, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Init.

CMCScoreCallback

class catalyst.callbacks.metrics.cmc_score.CMCScoreCallback(embeddings_key: str, labels_key: str, is_query_key: str, topk: Optional[Iterable[int]] = None, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Bases: catalyst.callbacks.metric.LoaderMetricCallback

Cumulative Matching Characteristics callback.

This callback computes cumulative matching characteristics. Your dataset should output True under is_query_key if the current object is from the query set and False if it is from the gallery. See QueryGalleryDataset in catalyst.contrib.datasets.metric_learning for more information. On batch end, the callback accumulates all embeddings.

Parameters
  • embeddings_key – embeddings key in output dict

  • labels_key – labels key in output dict

  • is_query_key – bool key True if current object is from query

  • topk – specifies which cmc@K to log

  • prefix – metric prefix

  • suffix – metric suffix

Note

You should use it with ControlFlowCallback and add all query/gallery sets to loaders. Loaders should contain the "is_query" and "label" keys.
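The query/gallery logic can be sketched as follows: each query embedding is compared with every gallery embedding, and the query counts as a hit if a gallery item with the same label appears among its k nearest neighbours. This is a plain-Python sketch, not Catalyst's code; it assumes Euclidean distance, and cmc_score is a hypothetical name.

```python
import math
from typing import Sequence


def cmc_score(
    embeddings: Sequence[Sequence[float]],
    labels: Sequence[int],
    is_query: Sequence[bool],
    topk: int = 1,
) -> float:
    """Share of queries with a same-label gallery item in their top k."""
    queries = [(e, l) for e, l, q in zip(embeddings, labels, is_query) if q]
    gallery = [(e, l) for e, l, q in zip(embeddings, labels, is_query) if not q]
    hits = 0
    for query_embedding, query_label in queries:
        # gallery items sorted by distance to the query, closest first
        ranked = sorted(gallery, key=lambda item: math.dist(query_embedding, item[0]))
        hits += int(any(label == query_label for _, label in ranked[:topk]))
    return hits / len(queries)


embeddings = [[0.0, 0.0], [0.3, 0.0], [0.1, 0.0], [0.9, 1.0], [0.25, 0.5]]
labels = [0, 1, 0, 1, 1]
is_query = [True, True, False, False, False]
print(cmc_score(embeddings, labels, is_query, topk=1))  # 0.5
print(cmc_score(embeddings, labels, is_query, topk=2))  # 1.0
```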

Examples:

import os
from torch.optim import Adam
from torch.utils.data import DataLoader
from catalyst import dl
from catalyst.contrib import data, datasets, models, nn


# 1. train and valid loaders
transforms = data.Compose([
    data.ImageToTensor(), data.NormalizeImage((0.1307,), (0.3081,))
])

train_dataset = datasets.MnistMLDataset(
    root=os.getcwd(), download=True, transform=transforms
)
sampler = data.BatchBalanceClassSampler(
    labels=train_dataset.get_labels(), num_classes=5, num_samples=10
)
train_loader = DataLoader(dataset=train_dataset, batch_sampler=sampler)

valid_dataset = datasets.MnistQGDataset(
    root=os.getcwd(), transform=transforms, gallery_fraq=0.2
)
valid_loader = DataLoader(dataset=valid_dataset, batch_size=1024)

# 2. model and optimizer
model = models.MnistSimpleNet(out_features=16)
optimizer = Adam(model.parameters(), lr=0.001)

# 3. criterion with triplets sampling
sampler_inbatch = data.HardTripletsSampler(norm_required=False)
criterion = nn.TripletMarginLossWithSampler(
    margin=0.5, sampler_inbatch=sampler_inbatch
)

# 4. training with catalyst Runner
class CustomRunner(dl.SupervisedRunner):
    def handle_batch(self, batch) -> None:
        if self.is_train_loader:
            images, targets = batch["features"].float(), batch["targets"].long()
            features = self.model(images)
            self.batch = {"embeddings": features, "targets": targets}
        else:
            images, targets, is_query = (
                batch["features"].float(),
                batch["targets"].long(),
                batch["is_query"].bool()
            )
            features = self.model(images)
            self.batch = {
                "embeddings": features, "targets": targets, "is_query": is_query
            }

callbacks = [
    dl.ControlFlowCallback(
        dl.CriterionCallback(
            input_key="embeddings", target_key="targets", metric_key="loss"
        ),
        loaders="train",
    ),
    dl.ControlFlowCallback(
        dl.CMCScoreCallback(
            embeddings_key="embeddings",
            labels_key="targets",
            is_query_key="is_query",
            topk=[1],
        ),
        loaders="valid",
    ),
    dl.PeriodicLoaderCallback(
        valid_loader_key="valid",
        valid_metric_key="cmc01",
        minimize=False,
        valid=2
    ),
]

runner = CustomRunner(input_key="features", output_key="embeddings")
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    callbacks=callbacks,
    loaders={"train": train_loader, "valid": valid_loader},
    verbose=False,
    logdir="./logs",
    valid_loader="valid",
    valid_metric="cmc01",
    minimize_valid_metric=False,
    num_epochs=10,
)

Note

Metric names depend on input parameters:

  • topk = (1,) or None -> "cmc01"

  • topk = (1, 3) -> "cmc01", "cmc03"

  • topk = (1, 3, 5) -> "cmc01", "cmc03", "cmc05"

You can find them in runner.batch_metrics, runner.loader_metrics or runner.epoch_metrics.

Note

Please follow the minimal examples sections for more use cases.

__init__(embeddings_key: str, labels_key: str, is_query_key: str, topk: Optional[Iterable[int]] = None, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Init.

ConfusionMatrixCallback

class catalyst.callbacks.metrics.confusion_matrix.ConfusionMatrixCallback(input_key: str, target_key: str, prefix: Optional[str] = None, class_names: Optional[List[str]] = None, num_classes: Optional[int] = None, normalize: bool = False, plot_params: Optional[Dict] = None)[source]

Bases: catalyst.core.callback.Callback

Callback to plot your confusion matrix to the loggers.

Parameters
  • input_key – key to use from runner.batch, specifies our y_pred

  • target_key – key to use from runner.batch, specifies our y_true

  • prefix – plot name for monitoring tools

  • class_names – list with class names

  • num_classes – number of classes

  • normalize – boolean flag for confusion matrix normalization

  • plot_params – extra params for plt.figure rendering
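What the plotted matrix contains can be sketched in a few lines. This example builds the matrix from class indices with rows as true classes and columns as predicted classes. The row-wise normalization is an assumption about what normalize means, and confusion_matrix here is a hypothetical helper, not the callback's code.

```python
from typing import List, Sequence


def confusion_matrix(
    y_pred: Sequence[int],
    y_true: Sequence[int],
    num_classes: int,
    normalize: bool = False,
) -> List[List[float]]:
    """Count predictions per (true class, predicted class) pair."""
    matrix = [[0.0] * num_classes for _ in range(num_classes)]
    for predicted, actual in zip(y_pred, y_true):
        matrix[actual][predicted] += 1
    if normalize:
        # assumption: each row is scaled to sum to 1 (per true class)
        for row in matrix:
            total = sum(row)
            if total > 0:
                row[:] = [value / total for value in row]
    return matrix


y_true = [0, 0, 1, 2, 2, 2]
y_pred = [0, 1, 1, 2, 2, 0]
print(confusion_matrix(y_pred, y_true, num_classes=3))
print(confusion_matrix(y_pred, y_true, num_classes=3, normalize=True))
```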

Note

catalyst[ml] required for this callback

Examples:

import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl

# sample data
num_samples, num_features, num_classes = int(1e4), int(1e1), 4
X = torch.rand(num_samples, num_features)
y = (torch.rand(num_samples,) * num_classes).to(torch.int64)

# pytorch loaders
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

# model, criterion, optimizer, scheduler
model = torch.nn.Linear(num_features, num_classes)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [2])

# model training
runner = dl.SupervisedRunner(
    input_key="features",
    output_key="logits",
    target_key="targets",
    loss_key="loss"
)
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    logdir="./logdir",
    num_epochs=3,
    valid_loader="valid",
    valid_metric="accuracy03",
    minimize_valid_metric=False,
    verbose=True,
    callbacks=[
        dl.AccuracyCallback(
            input_key="logits", target_key="targets", num_classes=num_classes
        ),
        dl.PrecisionRecallF1SupportCallback(
            input_key="logits", target_key="targets", num_classes=num_classes
        ),
        dl.AUCCallback(input_key="logits", target_key="targets"),
        dl.ConfusionMatrixCallback(
            input_key="logits", target_key="targets", num_classes=num_classes
        ),
    ],
)

Note

Please follow the minimal examples sections for more use cases.

__init__(input_key: str, target_key: str, prefix: Optional[str] = None, class_names: Optional[List[str]] = None, num_classes: Optional[int] = None, normalize: bool = False, plot_params: Optional[Dict] = None)[source]

Callback initialisation.

DiceCallback

class catalyst.callbacks.metrics.segmentation.DiceCallback(input_key: str, target_key: str, class_dim: int = 1, weights: Optional[List[float]] = None, class_names: Optional[List[str]] = None, threshold: Optional[float] = None, log_on_batch: bool = True, compute_per_class_metrics: bool = False, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Bases: catalyst.callbacks.metric.BatchMetricCallback

Dice metric callback.

Parameters
  • input_key – input key to use for metric calculation, specifies our y_pred

  • target_key – output key to use for metric calculation, specifies our y_true

  • class_dim – indicates class dimension (K) for outputs and targets tensors (default = 1)

  • weights – class weights

  • class_names – class names

  • threshold – threshold for outputs binarization

  • log_on_batch – boolean flag to log computed metrics every batch

  • compute_per_class_metrics – boolean flag to compute per-class metrics (default: SETTINGS.compute_per_class_metrics or False).

  • prefix – metric prefix

  • suffix – metric suffix
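For reference, the Dice score of a binary mask is twice the overlap divided by the total size of both masks. The sketch below computes it on flat lists, with optional thresholding of the outputs as the threshold parameter describes. It is an illustration, not the callback's implementation: dice_score is a hypothetical name, and returning 1.0 for two empty masks is an assumed convention.

```python
from typing import Optional, Sequence


def dice_score(
    outputs: Sequence[float],
    targets: Sequence[float],
    threshold: Optional[float] = None,
) -> float:
    """Dice = 2 * |A and B| / (|A| + |B|) for flat binary masks."""
    if threshold is not None:
        # binarize the raw outputs before comparing with the targets
        outputs = [1.0 if value > threshold else 0.0 for value in outputs]
    intersection = sum(o * t for o, t in zip(outputs, targets))
    denominator = sum(outputs) + sum(targets)
    if denominator == 0:
        return 1.0  # assumption: two empty masks count as a perfect match
    return 2 * intersection / denominator


print(dice_score([0.9, 0.8, 0.2, 0.6], [1, 0, 0, 1], threshold=0.5))  # 0.8
```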

Examples:

import os
import torch
from torch import nn
from torch.utils.data import DataLoader
from catalyst import dl
from catalyst.data import ToTensor
from catalyst.contrib import MNIST, IoULoss


model = nn.Sequential(
    nn.Conv2d(1, 1, 3, 1, 1), nn.ReLU(),
    nn.Conv2d(1, 1, 3, 1, 1), nn.Sigmoid(),
)
criterion = IoULoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.02)

loaders = {
    "train": DataLoader(
        MNIST(os.getcwd(), train=True, download=True, transform=ToTensor()),
        batch_size=32
    ),
    "valid": DataLoader(
        MNIST(os.getcwd(), train=False),
        batch_size=32
    ),
}

class CustomRunner(dl.SupervisedRunner):
    def handle_batch(self, batch):
        x = batch[self._input_key]
        x_noise = (x + torch.rand_like(x)).clamp_(0, 1)
        x_ = self.model(x_noise)
        self.batch = {
            self._input_key: x, self._output_key: x_, self._target_key: x
        }

runner = CustomRunner(
    input_key="features",
    output_key="scores",
    target_key="targets",
    loss_key="loss"
)
# model training
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    loaders=loaders,
    num_epochs=1,
    callbacks=[
        dl.IOUCallback(input_key="scores", target_key="targets"),
        dl.DiceCallback(input_key="scores", target_key="targets"),
        dl.TrevskyCallback(input_key="scores", target_key="targets", alpha=0.2),
    ],
    logdir="./logdir",
    valid_loader="valid",
    valid_metric="loss",
    minimize_valid_metric=True,
    verbose=True,
)

Note

Please follow the minimal examples sections for more use cases.

__init__(input_key: str, target_key: str, class_dim: int = 1, weights: Optional[List[float]] = None, class_names: Optional[List[str]] = None, threshold: Optional[float] = None, log_on_batch: bool = True, compute_per_class_metrics: bool = False, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Init.

FunctionalMetricCallback

class catalyst.callbacks.metrics.functional_metric.FunctionalMetricCallback(input_key: Union[str, Iterable[str], Dict[str, str]], target_key: Union[str, Iterable[str], Dict[str, str]], metric_fn: Callable, metric_key: str, compute_on_call: bool = True, log_on_batch: bool = True, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Bases: catalyst.callbacks.metric.FunctionalBatchMetricCallback

Parameters
  • input_key – input key to use for metric calculation, specifies our y_pred

  • target_key – output key to use for metric calculation, specifies our y_true

  • metric_fn – metric function that takes outputs and targets and returns the score as a torch.Tensor

  • metric_key – key to store computed metric in runner.batch_metrics dictionary

  • compute_on_call – Computes and returns metric value during metric call. Used for per-batch logging. default: True

  • log_on_batch – boolean flag to log computed metrics every batch

  • prefix – metric prefix

  • suffix – metric suffix

__init__(input_key: Union[str, Iterable[str], Dict[str, str]], target_key: Union[str, Iterable[str], Dict[str, str]], metric_fn: Callable, metric_key: str, compute_on_call: bool = True, log_on_batch: bool = True, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Init.

HitrateCallback

class catalyst.callbacks.metrics.recsys.HitrateCallback(input_key: str, target_key: str, topk: Optional[Iterable[int]] = None, log_on_batch: bool = True, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Bases: catalyst.callbacks.metric.BatchMetricCallback

Hitrate metric callback. Computes HR@topk for the specified values of topk.

Parameters
  • input_key – input key to use for metric calculation, specifies our y_pred

  • target_key – output key to use for metric calculation, specifies our y_true

  • topk – specifies which HR@K to log

  • log_on_batch – boolean flag to log computed metrics every batch

  • prefix – metric prefix

  • suffix – metric suffix
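As a rough illustration of HR@k, the sketch below uses the common textbook definition: a user counts as a hit when at least one relevant item appears among the k highest-scored items. Catalyst's own normalization may differ, so treat this as an assumption. The function hitrate_at_k is a hypothetical name.

```python
from typing import Sequence


def hitrate_at_k(
    scores: Sequence[Sequence[float]],
    targets: Sequence[Sequence[float]],
    k: int,
) -> float:
    """Share of users with at least one relevant item in their top k."""
    hits = 0
    for user_scores, user_targets in zip(scores, targets):
        # item indices sorted by predicted score, best first
        ranked = sorted(
            range(len(user_scores)), key=lambda i: user_scores[i], reverse=True
        )
        hits += int(any(user_targets[item] > 0 for item in ranked[:k]))
    return hits / len(scores)


scores = [[0.9, 0.1, 0.5], [0.2, 0.8, 0.3]]
targets = [[0, 0, 1], [1, 0, 0]]
for k in (1, 2, 3):
    print(k, hitrate_at_k(scores, targets, k))
```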

Examples:

import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl

# sample data
num_users, num_features, num_items = int(1e4), int(1e1), 10
X = torch.rand(num_users, num_features)
y = (torch.rand(num_users, num_items) > 0.5).to(torch.float32)

# pytorch loaders
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

# model, criterion, optimizer, scheduler
model = torch.nn.Linear(num_features, num_items)
criterion = torch.nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [2])

# model training
runner = dl.SupervisedRunner(
    input_key="features",
    output_key="logits",
    target_key="targets",
    loss_key="loss"
)
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    num_epochs=3,
    verbose=True,
    callbacks=[
        dl.BatchTransformCallback(
            transform=torch.sigmoid,
            scope="on_batch_end",
            input_key="logits",
            output_key="scores"
        ),
        dl.CriterionCallback(
            input_key="logits", target_key="targets", metric_key="loss"
        ),
        dl.AUCCallback(input_key="scores", target_key="targets"),
        dl.HitrateCallback(
            input_key="scores", target_key="targets", topk=(1, 3, 5)
        ),
        dl.MRRCallback(input_key="scores", target_key="targets", topk=(1, 3, 5)),
        dl.MAPCallback(input_key="scores", target_key="targets", topk=(1, 3, 5)),
        dl.NDCGCallback(input_key="scores", target_key="targets", topk=(1, 3)),
        dl.OptimizerCallback(metric_key="loss"),
        dl.SchedulerCallback(),
        dl.CheckpointCallback(
            logdir="./logs", loader_key="valid", metric_key="loss", minimize=True
        ),
    ]
)

Note

Metric names depend on input parameters:

  • topk = (1,) or None -> "hitrate01"

  • topk = (1, 3) -> "hitrate01", "hitrate03"

  • topk = (1, 3, 5) -> "hitrate01", "hitrate03", "hitrate05"

You can find them in runner.batch_metrics, runner.loader_metrics or runner.epoch_metrics.

Note

Please follow the minimal examples sections for more use cases.

__init__(input_key: str, target_key: str, topk: Optional[Iterable[int]] = None, log_on_batch: bool = True, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Init.

IOUCallback

class catalyst.callbacks.metrics.segmentation.IOUCallback(input_key: str, target_key: str, class_dim: int = 1, weights: Optional[List[float]] = None, class_names: Optional[List[str]] = None, threshold: Optional[float] = None, log_on_batch: bool = True, compute_per_class_metrics: bool = False, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Bases: catalyst.callbacks.metric.BatchMetricCallback

IOU metric callback.

Parameters
  • input_key – input key to use for metric calculation, specifies our y_pred

  • target_key – output key to use for metric calculation, specifies our y_true

  • class_dim – indicates class dimension (K) for outputs and targets tensors (default = 1)

  • weights – class weights

  • class_names – class names

  • threshold – threshold for outputs binarization

  • log_on_batch – boolean flag to log computed metrics every batch

  • compute_per_class_metrics – boolean flag to compute per-class metrics (default: SETTINGS.compute_per_class_metrics or False).

  • prefix – metric prefix

  • suffix – metric suffix
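IoU (intersection over union) is closely related to Dice: for the same pair of masks, Dice = 2 * IoU / (1 + IoU). The plain-Python sketch below computes IoU on flat lists and checks that relation. It is an illustration only; iou_score is a hypothetical name, and returning 1.0 for two empty masks is an assumed convention.

```python
from typing import Optional, Sequence


def iou_score(
    outputs: Sequence[float],
    targets: Sequence[float],
    threshold: Optional[float] = None,
) -> float:
    """IoU = |A and B| / |A or B| for flat binary masks."""
    if threshold is not None:
        # binarize the raw outputs before comparing with the targets
        outputs = [1.0 if value > threshold else 0.0 for value in outputs]
    intersection = sum(o * t for o, t in zip(outputs, targets))
    union = sum(outputs) + sum(targets) - intersection
    if union == 0:
        return 1.0  # assumption: two empty masks count as a perfect match
    return intersection / union


iou = iou_score([0.9, 0.8, 0.2, 0.6], [1, 0, 0, 1], threshold=0.5)
dice_from_iou = 2 * iou / (1 + iou)
print(iou, dice_from_iou)
```

Here the overlap is 2 pixels and the union is 3, so IoU is 2/3 and the corresponding Dice score is 0.8.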

Examples:

import os
import torch
from torch import nn
from torch.utils.data import DataLoader
from catalyst import dl
from catalyst.data import ToTensor
from catalyst.contrib import MNIST, IoULoss


model = nn.Sequential(
    nn.Conv2d(1, 1, 3, 1, 1), nn.ReLU(),
    nn.Conv2d(1, 1, 3, 1, 1), nn.Sigmoid(),
)
criterion = IoULoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.02)

loaders = {
    "train": DataLoader(
        MNIST(os.getcwd(), train=True, download=True, transform=ToTensor()),
        batch_size=32
    ),
    "valid": DataLoader(
        MNIST(os.getcwd(), train=False),
        batch_size=32
    ),
}

class CustomRunner(dl.SupervisedRunner):
    def handle_batch(self, batch):
        x = batch[self._input_key]
        x_noise = (x + torch.rand_like(x)).clamp_(0, 1)
        x_ = self.model(x_noise)
        self.batch = {
            self._input_key: x, self._output_key: x_, self._target_key: x
        }

runner = CustomRunner(
    input_key="features",
    output_key="scores",
    target_key="targets",
    loss_key="loss"
)
# model training
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    loaders=loaders,
    num_epochs=1,
    callbacks=[
        dl.IOUCallback(input_key="scores", target_key="targets"),
        dl.DiceCallback(input_key="scores", target_key="targets"),
        dl.TrevskyCallback(input_key="scores", target_key="targets", alpha=0.2),
    ],
    logdir="./logdir",
    valid_loader="valid",
    valid_metric="loss",
    minimize_valid_metric=True,
    verbose=True,
)

Note

Please follow the minimal examples sections for more use cases.

__init__(input_key: str, target_key: str, class_dim: int = 1, weights: Optional[List[float]] = None, class_names: Optional[List[str]] = None, threshold: Optional[float] = None, log_on_batch: bool = True, compute_per_class_metrics: bool = False, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Init.

MAPCallback

class catalyst.callbacks.metrics.recsys.MAPCallback(input_key: str, target_key: str, topk: Optional[Iterable[int]] = None, log_on_batch: bool = True, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Bases: catalyst.callbacks.metric.BatchMetricCallback

MAP metric callback. Computes MAP@topk for the specified values of topk.

Parameters
  • input_key – input key to use for metric calculation, specifies our y_pred

  • target_key – output key to use for metric calculation, specifies our y_true

  • topk – specifies which MAP@K to log

  • log_on_batch – boolean flag to log computed metrics every batch

  • prefix – metric prefix

  • suffix – metric suffix

Examples:

import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl

# sample data
num_users, num_features, num_items = int(1e4), int(1e1), 10
X = torch.rand(num_users, num_features)
y = (torch.rand(num_users, num_items) > 0.5).to(torch.float32)

# pytorch loaders
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

# model, criterion, optimizer, scheduler
model = torch.nn.Linear(num_features, num_items)
criterion = torch.nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [2])

# model training
runner = dl.SupervisedRunner(
    input_key="features",
    output_key="logits",
    target_key="targets",
    loss_key="loss"
)
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    num_epochs=3,
    verbose=True,
    callbacks=[
        dl.BatchTransformCallback(
            transform=torch.sigmoid,
            scope="on_batch_end",
            input_key="logits",
            output_key="scores"
        ),
        dl.CriterionCallback(
            input_key="logits", target_key="targets", metric_key="loss"
        ),
        dl.AUCCallback(input_key="scores", target_key="targets"),
        dl.HitrateCallback(
            input_key="scores", target_key="targets", topk=(1, 3, 5)
        ),
        dl.MRRCallback(input_key="scores", target_key="targets", topk=(1, 3, 5)),
        dl.MAPCallback(input_key="scores", target_key="targets", topk=(1, 3, 5)),
        dl.NDCGCallback(input_key="scores", target_key="targets", topk=(1, 3)),
        dl.OptimizerCallback(metric_key="loss"),
        dl.SchedulerCallback(),
        dl.CheckpointCallback(
            logdir="./logs", loader_key="valid", metric_key="loss", minimize=True
        ),
    ]
)

Note

Metric names depending on input parameters:

  • topk = (1,) or None —> "map01"

  • topk = (1, 3) —> "map01", "map03"

  • topk = (1, 3, 5) —> "map01", "map03", "map05"

You can find them in runner.batch_metrics, runner.loader_metrics or runner.epoch_metrics.
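
For intuition, the metric and the naming scheme above can be sketched in plain Python. This is a minimal illustration, not Catalyst's implementation: the helper names are hypothetical, the zero-padded key format is inferred from the names listed in the note, and the normalisation by the number of hits inside the top k is one common convention that may differ from the library's.

```python
from typing import Dict, Iterable, Optional, Sequence


def average_precision_at_k(scores: Sequence[float], targets: Sequence[int], k: int) -> float:
    """AP@k for one row: mean of precision@rank over the relevant items in the top k."""
    # Rank item indices by descending score and keep the first k.
    order = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)[:k]
    hits, precision_sum = 0, 0.0
    for rank, item in enumerate(order, start=1):
        if targets[item] > 0:
            hits += 1
            precision_sum += hits / rank
    # Assumed normalisation: the number of relevant items found in the top k.
    return precision_sum / hits if hits else 0.0


def map_at_topk(batch_scores, batch_targets, topk: Optional[Iterable[int]] = None) -> Dict[str, float]:
    """Average AP@k over the batch for every k, keyed "map01", "map03", ..."""
    ks = (1,) if topk is None else tuple(topk)
    metrics = {}
    for k in ks:
        values = [average_precision_at_k(s, t, k) for s, t in zip(batch_scores, batch_targets)]
        metrics[f"map{k:02d}"] = sum(values) / len(values)
    return metrics


scores = [[0.9, 0.1, 0.8], [0.2, 0.7, 0.1]]
targets = [[0, 1, 1], [0, 1, 0]]
print(map_at_topk(scores, targets, topk=(1, 3)))
```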

Note

Please follow the minimal examples sections for more use cases.

__init__(input_key: str, target_key: str, topk: Optional[Iterable[int]] = None, log_on_batch: bool = True, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Init.

MultilabelAccuracyCallback

class catalyst.callbacks.metrics.accuracy.MultilabelAccuracyCallback(input_key: str, target_key: str, threshold: Union[float, torch.Tensor] = 0.5, log_on_batch: bool = True, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Bases: catalyst.callbacks.metric.BatchMetricCallback

Multilabel accuracy metric callback. Computes multilabel accuracy for the specified threshold.

Parameters
  • input_key – input key to use for metric calculation, specifies our y_pred

  • target_key – output key to use for metric calculation, specifies our y_true

  • threshold – thresholds for model scores

  • log_on_batch – boolean flag to log computed metrics every batch

  • prefix – metric prefix

  • suffix – metric suffix
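
The effect of threshold can be sketched without any framework. The function below is a hypothetical helper, not the library's code: it assumes scores are binarized with a strict "greater than" comparison and that accuracy is the fraction of matching (sample, class) cells; a list is accepted as a stand-in for a per-class threshold tensor.

```python
def multilabel_accuracy(scores, targets, threshold=0.5):
    """Fraction of (sample, class) cells where the thresholded score equals the target."""
    num_classes = len(scores[0])
    # A single float is broadcast to every class; a list gives one threshold per class.
    if isinstance(threshold, (list, tuple)):
        thresholds = list(threshold)
    else:
        thresholds = [threshold] * num_classes

    correct, total = 0, 0
    for row_scores, row_targets in zip(scores, targets):
        for score, target, thr in zip(row_scores, row_targets, thresholds):
            predicted = 1 if score > thr else 0  # assumed: strict comparison
            correct += int(predicted == int(target))
            total += 1
    return correct / total


scores = [[0.9, 0.2], [0.4, 0.6]]
targets = [[1, 0], [1, 1]]
print(multilabel_accuracy(scores, targets, threshold=0.5))
```

Note that the scores are expected to be comparable with the threshold, which is why the larger examples in this section apply a sigmoid before thresholding at 0.5.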

Examples:

import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl

# sample data
num_samples, num_features, num_classes = int(1e4), int(1e1), 4
X = torch.rand(num_samples, num_features)
y = (torch.rand(num_samples, num_classes) > 0.5).to(torch.float32)

# pytorch loaders
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

# model, criterion, optimizer, scheduler
model = torch.nn.Linear(num_features, num_classes)
criterion = torch.nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [2])

# model training
runner = dl.SupervisedRunner(
    input_key="features",
    output_key="logits",
    target_key="targets",
    loss_key="loss"
)
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    logdir="./logdir",
    num_epochs=3,
    valid_loader="valid",
    valid_metric="accuracy",
    minimize_valid_metric=False,
    verbose=True,
    callbacks=[
        dl.AUCCallback(input_key="logits", target_key="targets"),
        dl.MultilabelAccuracyCallback(
            input_key="logits", target_key="targets", threshold=0.5
        )
    ]
)

Note

Please follow the minimal examples sections for more use cases.

__init__(input_key: str, target_key: str, threshold: Union[float, torch.Tensor] = 0.5, log_on_batch: bool = True, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Init.

MultilabelPrecisionRecallF1SupportCallback

class catalyst.callbacks.metrics.classification.MultilabelPrecisionRecallF1SupportCallback(input_key: str, target_key: str, num_classes: Optional[int] = None, zero_division: int = 0, log_on_batch: bool = True, compute_per_class_metrics: bool = False, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Bases: catalyst.callbacks.metric.BatchMetricCallback

Multilabel PrecisionRecallF1Support metric callback.

Parameters
  • input_key – input key to use for metric calculation, specifies our y_pred

  • target_key – output key to use for metric calculation, specifies our y_true

  • num_classes – number of classes

  • zero_division – value to set in case of zero division during metrics (precision, recall) computation; should be one of 0 or 1

  • log_on_batch – boolean flag to log computed metrics every batch

  • compute_per_class_metrics – boolean flag to compute per-class metrics (default: SETTINGS.compute_per_class_metrics or False).

  • prefix – metric prefix

  • suffix – metric suffix
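
The role of zero_division is easiest to see on a class that never occurs. The sketch below is illustrative only: the function names are hypothetical, inputs are assumed to be already binarized 0/1 rows, and the same fallback value is assumed to apply to F1 when precision and recall are both zero.

```python
def safe_div(numerator, denominator, zero_division=0):
    """Return numerator / denominator, or zero_division when the denominator is 0."""
    return numerator / denominator if denominator else float(zero_division)


def multilabel_prf_support(preds, targets, num_classes, zero_division=0):
    """Per-class precision, recall, F1 and support for binarized multilabel rows."""
    report = []
    for c in range(num_classes):
        tp = sum(1 for p, t in zip(preds, targets) if p[c] == 1 and t[c] == 1)
        fp = sum(1 for p, t in zip(preds, targets) if p[c] == 1 and t[c] == 0)
        fn = sum(1 for p, t in zip(preds, targets) if p[c] == 0 and t[c] == 1)
        precision = safe_div(tp, tp + fp, zero_division)
        recall = safe_div(tp, tp + fn, zero_division)
        f1 = safe_div(2 * precision * recall, precision + recall, zero_division)
        # Support is the number of true occurrences of the class.
        report.append({"precision": precision, "recall": recall, "f1": f1, "support": tp + fn})
    return report


preds = [[1, 0, 0], [1, 1, 0], [0, 1, 0]]
targets = [[1, 0, 0], [0, 1, 0], [1, 1, 0]]
for row in multilabel_prf_support(preds, targets, num_classes=3, zero_division=0):
    print(row)
```

The third class has no positives in either preds or targets, so its precision and recall are undefined and take the zero_division value.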

Examples:

import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl

# sample data
num_samples, num_features, num_classes = int(1e4), int(1e1), 4
X = torch.rand(num_samples, num_features)
y = (torch.rand(num_samples, num_classes) > 0.5).to(torch.float32)

# pytorch loaders
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

# model, criterion, optimizer, scheduler
model = torch.nn.Linear(num_features, num_classes)
criterion = torch.nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [2])

# model training
runner = dl.SupervisedRunner(
    input_key="features",
    output_key="logits",
    target_key="targets",
    loss_key="loss"
)
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    logdir="./logdir",
    num_epochs=3,
    valid_loader="valid",
    valid_metric="accuracy",
    minimize_valid_metric=False,
    verbose=True,
    callbacks=[
        dl.BatchTransformCallback(
            transform=torch.sigmoid,
            scope="on_batch_end",
            input_key="logits",
            output_key="scores"
        ),
        dl.AUCCallback(input_key="scores", target_key="targets"),
        dl.MultilabelAccuracyCallback(
            input_key="scores", target_key="targets", threshold=0.5
        ),
        dl.MultilabelPrecisionRecallF1SupportCallback(
            input_key="scores", target_key="targets", num_classes=num_classes
        ),
    ]
)

Note

Please follow the minimal examples sections for more use cases.

__init__(input_key: str, target_key: str, num_classes: Optional[int] = None, zero_division: int = 0, log_on_batch: bool = True, compute_per_class_metrics: bool = False, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Init.

MRRCallback

class catalyst.callbacks.metrics.recsys.MRRCallback(input_key: str, target_key: str, topk: Optional[Iterable[int]] = None, log_on_batch: bool = True, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Bases: catalyst.callbacks.metric.BatchMetricCallback

MRR metric callback. Computes MRR@topk for the specified values of topk.

Parameters
  • input_key – input key to use for metric calculation, specifies our y_pred

  • target_key – output key to use for metric calculation, specifies our y_true

  • topk – specifies which MRR@K to log

  • log_on_batch – boolean flag to log computed metrics every batch

  • prefix – metric prefix

  • suffix – metric suffix

Examples:

import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl

# sample data
num_users, num_features, num_items = int(1e4), int(1e1), 10
X = torch.rand(num_users, num_features)
y = (torch.rand(num_users, num_items) > 0.5).to(torch.float32)

# pytorch loaders
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

# model, criterion, optimizer, scheduler
model = torch.nn.Linear(num_features, num_items)
criterion = torch.nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [2])

# model training
runner = dl.SupervisedRunner(
    input_key="features",
    output_key="logits",
    target_key="targets",
    loss_key="loss"
)
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    num_epochs=3,
    verbose=True,
    callbacks=[
        dl.BatchTransformCallback(
            transform=torch.sigmoid,
            scope="on_batch_end",
            input_key="logits",
            output_key="scores"
        ),
        dl.CriterionCallback(
            input_key="logits", target_key="targets", metric_key="loss"
        ),
        dl.AUCCallback(input_key="scores", target_key="targets"),
        dl.HitrateCallback(
            input_key="scores", target_key="targets", topk=(1, 3, 5)
        ),
        dl.MRRCallback(input_key="scores", target_key="targets", topk=(1, 3, 5)),
        dl.MAPCallback(input_key="scores", target_key="targets", topk=(1, 3, 5)),
        dl.NDCGCallback(input_key="scores", target_key="targets", topk=(1, 3)),
        dl.OptimizerCallback(metric_key="loss"),
        dl.SchedulerCallback(),
        dl.CheckpointCallback(
            logdir="./logs", loader_key="valid", metric_key="loss", minimize=True
        ),
    ]
)

Note

Metric names depending on input parameters:

  • topk = (1,) or None —> "mrr01"

  • topk = (1, 3) —> "mrr01", "mrr03"

  • topk = (1, 3, 5) —> "mrr01", "mrr03", "mrr05"

You can find them in runner.batch_metrics, runner.loader_metrics or runner.epoch_metrics.
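
As a rough illustration of what MRR@k measures, here is a plain-Python sketch. The helper names are hypothetical and this is not Catalyst's implementation; it assumes a target greater than 0 marks a relevant item and that a row with no relevant item in the top k contributes 0.

```python
def reciprocal_rank_at_k(scores, targets, k):
    """1 / rank of the first relevant item within the top k, or 0.0 if there is none."""
    order = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)[:k]
    for rank, item in enumerate(order, start=1):
        if targets[item] > 0:
            return 1.0 / rank
    return 0.0


def mrr_at_topk(batch_scores, batch_targets, topk=None):
    """Mean reciprocal rank over the batch for every k, keyed "mrr01", "mrr03", ..."""
    ks = (1,) if topk is None else tuple(topk)
    metrics = {}
    for k in ks:
        values = [reciprocal_rank_at_k(s, t, k) for s, t in zip(batch_scores, batch_targets)]
        metrics[f"mrr{k:02d}"] = sum(values) / len(values)
    return metrics


scores = [[0.9, 0.1, 0.8], [0.2, 0.7, 0.1]]
targets = [[0, 1, 1], [0, 1, 0]]
print(mrr_at_topk(scores, targets, topk=(1, 3)))
```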

Note

Please follow the minimal examples sections for more use cases.

__init__(input_key: str, target_key: str, topk: Optional[Iterable[int]] = None, log_on_batch: bool = True, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Init.

NDCGCallback

class catalyst.callbacks.metrics.recsys.NDCGCallback(input_key: str, target_key: str, topk: Optional[Iterable[int]] = None, log_on_batch: bool = True, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Bases: catalyst.callbacks.metric.BatchMetricCallback

NDCG metric callback. Computes NDCG@topk for the specified values of topk.

Parameters
  • input_key – input key to use for metric calculation, specifies our y_pred

  • target_key – output key to use for metric calculation, specifies our y_true

  • topk – specifies which NDCG@K to log

  • log_on_batch – boolean flag to log computed metrics every batch

  • prefix – metric prefix

  • suffix – metric suffix

Examples:

import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl

# sample data
num_users, num_features, num_items = int(1e4), int(1e1), 10
X = torch.rand(num_users, num_features)
y = (torch.rand(num_users, num_items) > 0.5).to(torch.float32)

# pytorch loaders
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

# model, criterion, optimizer, scheduler
model = torch.nn.Linear(num_features, num_items)
criterion = torch.nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [2])

# model training
runner = dl.SupervisedRunner(
    input_key="features",
    output_key="logits",
    target_key="targets",
    loss_key="loss"
)
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    num_epochs=3,
    verbose=True,
    callbacks=[
        dl.BatchTransformCallback(
            transform=torch.sigmoid,
            scope="on_batch_end",
            input_key="logits",
            output_key="scores"
        ),
        dl.CriterionCallback(
            input_key="logits", target_key="targets", metric_key="loss"
        ),
        dl.AUCCallback(input_key="scores", target_key="targets"),
        dl.HitrateCallback(
            input_key="scores", target_key="targets", topk=(1, 3, 5)
        ),
        dl.MRRCallback(input_key="scores", target_key="targets", topk=(1, 3, 5)),
        dl.MAPCallback(input_key="scores", target_key="targets", topk=(1, 3, 5)),
        dl.NDCGCallback(input_key="scores", target_key="targets", topk=(1, 3)),
        dl.OptimizerCallback(metric_key="loss"),
        dl.SchedulerCallback(),
        dl.CheckpointCallback(
            logdir="./logs", loader_key="valid", metric_key="loss", minimize=True
        ),
    ]
)

Note

Metric names depending on input parameters:

  • topk = (1,) or None —> "ndcg01"

  • topk = (1, 3) —> "ndcg01", "ndcg03"

  • topk = (1, 3, 5) —> "ndcg01", "ndcg03", "ndcg05"

You can find them in runner.batch_metrics, runner.loader_metrics or runner.epoch_metrics.
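
The quantity behind these names can be sketched as follows. This is a minimal illustration with hypothetical helper names; it assumes a linear gain (the relevance value itself) and a log2 position discount, which may not match the gain function used by the library.

```python
import math


def dcg_at_k(relevances, k):
    """Discounted cumulative gain of the first k relevances (linear gain, log2 discount)."""
    return sum(rel / math.log2(pos + 1) for pos, rel in enumerate(relevances[:k], start=1))


def ndcg_at_k(scores, targets, k):
    """DCG@k of the predicted ranking divided by DCG@k of the ideal ranking."""
    order = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)
    ranked = [targets[i] for i in order]
    ideal = sorted(targets, reverse=True)
    ideal_dcg = dcg_at_k(ideal, k)
    return dcg_at_k(ranked, k) / ideal_dcg if ideal_dcg > 0 else 0.0


scores = [0.9, 0.1, 0.8]
targets = [0, 1, 1]
for k in (1, 3):
    print(f"ndcg{k:02d}", ndcg_at_k(scores, targets, k))
```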

Note

Please follow the minimal examples sections for more use cases.

__init__(input_key: str, target_key: str, topk: Optional[Iterable[int]] = None, log_on_batch: bool = True, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Init.

PrecisionRecallF1SupportCallback

class catalyst.callbacks.metrics.classification.PrecisionRecallF1SupportCallback(input_key: str, target_key: str, num_classes: Optional[int] = None, zero_division: int = 0, log_on_batch: bool = True, compute_per_class_metrics: bool = False, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Bases: catalyst.callbacks.metric.BatchMetricCallback

Multiclass PrecisionRecallF1Support metric callback.

Parameters
  • input_key – input key to use for metric calculation, specifies our y_pred

  • target_key – output key to use for metric calculation, specifies our y_true

  • num_classes – number of classes

  • zero_division – value to set in case of zero division during metrics (precision, recall) computation; should be one of 0 or 1

  • log_on_batch – boolean flag to log computed metrics every batch

  • compute_per_class_metrics – boolean flag to compute per-class metrics (default: SETTINGS.compute_per_class_metrics or False).

  • prefix – metric prefix

  • suffix – metric suffix
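
For the multiclass case, predictions are class indices rather than independent flags. The sketch below is illustrative, with hypothetical names, and is not the library's code: it assumes the predicted class is the argmax of the logits and shows per-class values together with their macro average (the unweighted mean over classes).

```python
def multiclass_prf(logits, targets, num_classes, zero_division=0):
    """Per-class precision/recall/F1/support from argmax predictions, plus macro averages."""
    preds = [max(range(num_classes), key=lambda c: row[c]) for row in logits]

    def ratio(numerator, denominator):
        return numerator / denominator if denominator else float(zero_division)

    per_class = []
    for c in range(num_classes):
        tp = sum(1 for p, t in zip(preds, targets) if p == c and t == c)
        fp = sum(1 for p, t in zip(preds, targets) if p == c and t != c)
        fn = sum(1 for p, t in zip(preds, targets) if p != c and t == c)
        precision, recall = ratio(tp, tp + fp), ratio(tp, tp + fn)
        f1 = ratio(2 * precision * recall, precision + recall)
        per_class.append({"precision": precision, "recall": recall, "f1": f1, "support": tp + fn})

    # Macro average: every class counts equally, regardless of its support.
    macro = {
        name: sum(row[name] for row in per_class) / num_classes
        for name in ("precision", "recall", "f1")
    }
    return per_class, macro


logits = [[2.0, 1.0, 0.0], [0.0, 3.0, 1.0], [1.0, 0.0, 2.0], [0.0, 2.0, 1.0]]
targets = [0, 1, 1, 2]
per_class, macro = multiclass_prf(logits, targets, num_classes=3)
print(per_class)
print(macro)
```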

Examples:

import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl

# sample data
num_samples, num_features, num_classes = int(1e4), int(1e1), 4
X = torch.rand(num_samples, num_features)
y = (torch.rand(num_samples,) * num_classes).to(torch.int64)

# pytorch loaders
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

# model, criterion, optimizer, scheduler
model = torch.nn.Linear(num_features, num_classes)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [2])

# model training
runner = dl.SupervisedRunner(
    input_key="features",
    output_key="logits",
    target_key="targets",
    loss_key="loss"
)
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    logdir="./logdir",
    num_epochs=3,
    valid_loader="valid",
    valid_metric="accuracy03",
    minimize_valid_metric=False,
    verbose=True,
    callbacks=[
        dl.AccuracyCallback(
            input_key="logits", target_key="targets", num_classes=num_classes
        ),
        dl.PrecisionRecallF1SupportCallback(
            input_key="logits", target_key="targets", num_classes=num_classes
        ),
        dl.AUCCallback(input_key="logits", target_key="targets"),
    ],
)

Note

Please follow the minimal examples sections for more use cases.

__init__(input_key: str, target_key: str, num_classes: Optional[int] = None, zero_division: int = 0, log_on_batch: bool = True, compute_per_class_metrics: bool = False, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Init.

R2SquaredCallback

class catalyst.callbacks.metrics.r2_squared.R2SquaredCallback(input_key: str, target_key: str, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Bases: catalyst.callbacks.metric.LoaderMetricCallback

R2 Squared metric callback.

Parameters
  • input_key – input key to use for r2squared calculation, specifies our y_pred

  • target_key – output key to use for r2squared calculation, specifies our y_true

  • prefix – metric prefix

  • suffix – metric suffix
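
As a reminder of what the metric measures, the coefficient of determination can be sketched in a few lines. The function name is hypothetical and this is the textbook formula rather than Catalyst's code; it assumes the targets are not all identical, since the total sum of squares would otherwise be zero.

```python
def r2_squared(y_pred, y_true):
    """R^2 = 1 - SS_res / SS_tot, computed over all samples at once."""
    mean_true = sum(y_true) / len(y_true)
    ss_res = sum((t - p) ** 2 for p, t in zip(y_pred, y_true))
    ss_tot = sum((t - mean_true) ** 2 for t in y_true)
    if ss_tot == 0:
        raise ValueError("R^2 is undefined when all targets are identical")
    return 1.0 - ss_res / ss_tot


y_true = [1.0, 2.0, 3.0, 4.0]
print(r2_squared([1.0, 2.0, 3.0, 5.0], y_true))  # imperfect predictions
print(r2_squared([2.5, 2.5, 2.5, 2.5], y_true))  # always predicting the mean
```

A model that always predicts the mean of the targets scores 0, and a perfect model scores 1. Since the callback derives from LoaderMetricCallback, the value is presumably computed over a whole loader rather than averaged across batches.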

Examples:

import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl

# data
num_samples, num_features = int(1e4), int(1e1)
X, y = torch.rand(num_samples, num_features), torch.rand(num_samples)
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

# model, criterion, optimizer, scheduler
model = torch.nn.Linear(num_features, 1)
criterion = torch.nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [3, 6])

# model training
runner = dl.SupervisedRunner()
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    logdir="./logdir",
    valid_loader="valid",
    valid_metric="loss",
    minimize_valid_metric=True,
    num_epochs=8,
    verbose=True,
    callbacks=[
        dl.R2SquaredCallback(input_key="logits", target_key="targets")
    ]
)

Note

Please follow the minimal examples sections for more use cases.

__init__(input_key: str, target_key: str, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Init.

ReidCMCScoreCallback

class catalyst.callbacks.metrics.cmc_score.ReidCMCScoreCallback(embeddings_key: str, pids_key: str, cids_key: str, is_query_key: str, topk: Optional[Iterable[int]] = None, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Bases: catalyst.callbacks.metric.LoaderMetricCallback

Cumulative Matching Characteristics callback for reID case. See CMCScoreCallback’s docs for more information about CMC-based callbacks.

Parameters
  • embeddings_key – embeddings key in output dict

  • pids_key – pids key in output dict

  • cids_key – cids key in output dict

  • is_query_key – key of the boolean flag that is True if the current object is from the query

  • topk – specifies which cmc@K to log:
      [1] - cmc@1
      [1, 3] - cmc@1 and cmc@3
      [1, 3, 5] - cmc@1, cmc@3 and cmc@5

  • prefix – metric prefix

  • suffix – metric suffix
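
How the four keys interact can be sketched in plain Python. This is an illustration under stated assumptions, not the library's implementation: the function name is hypothetical, pids and cids are taken to be person and camera identifiers, distances are Euclidean, and gallery samples sharing both pid and cid with the query are skipped (a common reID protocol that Catalyst may or may not follow exactly).

```python
import math


def reid_cmc_at_k(embeddings, pids, cids, is_query, k):
    """Share of queries whose k nearest valid gallery samples contain the same pid."""
    queries = [i for i, flag in enumerate(is_query) if flag]
    gallery = [i for i, flag in enumerate(is_query) if not flag]
    hits = 0
    for q in queries:
        # Assumed convention: ignore gallery shots of the same person from the same camera.
        candidates = [g for g in gallery if not (pids[g] == pids[q] and cids[g] == cids[q])]
        candidates.sort(key=lambda g: math.dist(embeddings[q], embeddings[g]))
        if any(pids[g] == pids[q] for g in candidates[:k]):
            hits += 1
    return hits / len(queries)


embeddings = [(0, 0), (0, 0.1), (0, 0.5), (0, 1.0), (5, 5), (5, 5.1)]
pids = [0, 0, 1, 0, 1, 2]
cids = [0, 0, 1, 1, 0, 1]
is_query = [True, False, False, False, True, False]
for k in (1, 2, 3):
    print(f"cmc@{k}", reid_cmc_at_k(embeddings, pids, cids, is_query, k))
```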

__init__(embeddings_key: str, pids_key: str, cids_key: str, is_query_key: str, topk: Optional[Iterable[int]] = None, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Init.

SklearnBatchCallback

class catalyst.callbacks.metrics.scikit_learn.SklearnBatchCallback(keys: Mapping[str, Any], metric_fn: Union[Callable, str], metric_key: str, log_on_batch: bool = True, **metric_kwargs)[source]

Bases: catalyst.callbacks.metric.FunctionalBatchMetricCallback

SklearnBatchCallback implements an integration of batch-based Sklearn metrics.

Parameters
  • keys – a dictionary containing:
      - a mapping between metric_fn arguments and keys in runner.batch
      - other arguments needed for metric_fn

  • metric_fn – metric function that gets outputs, targets, and other arguments given in keys and returns score

  • metric_key – key to store computed metric in runner.batch_metrics dictionary

  • log_on_batch – boolean flag to log computed metrics every batch

  • metric_kwargs – additional parameters for metric_fn
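
The keys mapping can be pictured as a small dispatch step. The sketch below is a hypothetical stand-in, not the callback's code: compute_batch_metric and the toy accuracy function are made up for illustration, and only the argument-to-batch-key part of keys is modelled.

```python
def compute_batch_metric(batch, keys, metric_fn, **metric_kwargs):
    """Look up each metric_fn argument in the batch, then call metric_fn."""
    # keys maps an argument name of metric_fn to the batch key holding its value.
    kwargs = {argument: batch[batch_key] for argument, batch_key in keys.items()}
    return metric_fn(**kwargs, **metric_kwargs)


def accuracy(y_true, y_pred, normalize=True):
    """Toy metric with a scikit-learn-like signature (hypothetical stand-in)."""
    matches = sum(int(t == p) for t, p in zip(y_true, y_pred))
    return matches / len(y_true) if normalize else matches


batch = {"preds": [1, 0, 1, 1], "labels": [1, 1, 1, 0]}
keys = {"y_pred": "preds", "y_true": "labels"}
print(compute_batch_metric(batch, keys, accuracy))
print(compute_batch_metric(batch, keys, accuracy, normalize=False))
```

Extra keyword arguments such as normalize play the role of metric_kwargs and are forwarded unchanged.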

Note

catalyst[ml] required for this callback

Examples:

import sklearn
import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl
from functools import partial

# sample data
num_samples, num_features, num_classes = int(1e4), int(1e1), 4
X = torch.rand(num_samples, num_features)
y = (torch.rand(num_samples, num_classes) > 0.5).to(torch.float32)

# pytorch loaders
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

# model, criterion, optimizer, scheduler
model = torch.nn.Linear(num_features, num_classes)
criterion = torch.nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [2])

# model training
runner = dl.SupervisedRunner(
    input_key="features",
    output_key="logits",
    target_key="targets",
    loss_key="loss"
)
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    num_epochs=3,
    verbose=True,
    callbacks=[
        dl.BatchTransformCallback(
            input_key="targets",
            output_key="labels",
            transform=partial(torch.argmax, dim=1),
            scope="on_batch_end",
        ),
        dl.BatchTransformCallback(
            input_key="logits",
            output_key="scores",
            transform=partial(torch.softmax, dim=1),
            scope="on_batch_end",
        ),
        dl.BatchTransformCallback(
            input_key="scores",
            output_key="preds",
            transform=partial(torch.argmax, dim=1),
            scope="on_batch_end",
        ),
        dl.MultilabelAccuracyCallback(
            input_key="logits", target_key="targets", threshold=0.5
        ),
        dl.SklearnBatchCallback(
            keys={"y_pred": "preds", "y_true": "labels"},
            metric_fn="f1_score",
            metric_key="sk_f1",
            average="macro",
            zero_division=1,
        )
    ]
)

Note

Please follow the minimal examples sections for more use cases.

__init__(keys: Mapping[str, Any], metric_fn: Union[Callable, str], metric_key: str, log_on_batch: bool = True, **metric_kwargs)[source]

Init.

SklearnLoaderCallback

class catalyst.callbacks.metrics.scikit_learn.SklearnLoaderCallback(keys: Mapping[str, Any], metric_fn: Union[Callable, str], metric_key: str, **metric_kwargs)[source]

Bases: catalyst.callbacks.metric.LoaderMetricCallback

SklearnLoaderCallback implements an integration of loader-based Sklearn metrics.

Parameters
  • keys – a mapping between metric_fn arguments and keys in runner.batch

  • metric_fn – metric function that gets outputs, targets, and other arguments given in keys and returns score

  • metric_key – key to store computed metric in runner.loader_metrics dictionary

  • metric_kwargs – additional parameters for metric_fn
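
The difference between a loader-based and a batch-based metric matters for scores that do not average cleanly across batches. The sketch below uses a hypothetical LoaderMetric class and a toy precision function to show the accumulate-then-compute pattern; it is an assumption about the general idea, not the callback's implementation.

```python
class LoaderMetric:
    """Collect metric_fn inputs over all batches, then evaluate once per loader."""

    def __init__(self, keys, metric_fn, **metric_kwargs):
        self.keys = keys
        self.metric_fn = metric_fn
        self.metric_kwargs = metric_kwargs
        self.storage = {argument: [] for argument in keys}

    def update(self, batch):
        for argument, batch_key in self.keys.items():
            self.storage[argument].extend(batch[batch_key])

    def compute(self):
        return self.metric_fn(**self.storage, **self.metric_kwargs)


def precision(y_true, y_pred):
    """Toy precision: true positives divided by predicted positives."""
    predicted_positive = sum(y_pred)
    true_positive = sum(1 for t, p in zip(y_true, y_pred) if t == 1 and p == 1)
    return true_positive / predicted_positive if predicted_positive else 0.0


batches = [
    {"labels": [1, 1], "preds": [1, 1]},
    {"labels": [0, 0, 0, 1], "preds": [1, 1, 1, 1]},
]
metric = LoaderMetric(keys={"y_true": "labels", "y_pred": "preds"}, metric_fn=precision)
for batch in batches:
    metric.update(batch)

batch_mean = sum(precision(b["labels"], b["preds"]) for b in batches) / len(batches)
print("mean of batch values:", batch_mean)
print("loader-level value:", metric.compute())
```

The two numbers differ because the batches have different sizes and different shares of positives, which is the usual reason to prefer a loader-level computation for metrics such as ROC AUC.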

Note

catalyst[ml] required for this callback

Examples:

import sklearn
import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl
from functools import partial

# sample data
num_samples, num_features, num_classes = int(1e4), int(1e1), 4
X = torch.rand(num_samples, num_features)
y = (torch.rand(num_samples, num_classes) > 0.5).to(torch.float32)

# pytorch loaders
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

# model, criterion, optimizer, scheduler
model = torch.nn.Linear(num_features, num_classes)
criterion = torch.nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [2])

# model training
runner = dl.SupervisedRunner(
    input_key="features",
    output_key="logits",
    target_key="targets",
    loss_key="loss"
)
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    num_epochs=3,
    verbose=True,
    callbacks=[
        dl.BatchTransformCallback(
            input_key="targets",
            output_key="labels",
            transform=partial(torch.argmax, dim=1),
            scope="on_batch_end",
        ),
        dl.BatchTransformCallback(
            input_key="logits",
            output_key="scores",
            transform=partial(torch.softmax, dim=1),
            scope="on_batch_end",
        ),
        dl.BatchTransformCallback(
            input_key="scores",
            output_key="preds",
            transform=partial(torch.argmax, dim=1),
            scope="on_batch_end",
        ),
        dl.MultilabelAccuracyCallback(
            input_key="logits", target_key="targets", threshold=0.5
        ),
        dl.SklearnLoaderCallback(
            keys={"y_score": "scores", "y_true": "labels"},
            metric_fn="roc_auc_score",
            metric_key="roc_auc_score",
            average="macro",
            multi_class="ovo"
        )
    ]
)

Note

Please follow the minimal examples sections for more use cases.

__init__(keys: Mapping[str, Any], metric_fn: Union[Callable, str], metric_key: str, **metric_kwargs)[source]

Init.

SklearnModelCallback

class catalyst.callbacks.sklearn_model.SklearnModelCallback(feature_key: str, target_key: Optional[str], train_loader: str, valid_loaders: Union[str, List[str]], model_fn: Union[Callable, str], predict_method: str = 'predict', predict_key: str = 'sklearn_predict', **model_kwargs)[source]

Bases: catalyst.core.callback.Callback

Callback to train a classifier on the train loader and to give predictions on the valid loader.

Parameters
  • feature_key – keys of tensors that should be used as features for the classifier fit

  • target_key – keys of tensors that should be used as targets for the classifier fit

  • train_loader – train loader name

  • valid_loaders – valid loaders on which the model should make predictions

  • model_fn – factory that produces objects with .fit and .predict methods

  • predict_method – predict method name for the classifier

  • predict_key – key to store computed classifier predicts in runner.batch

  • model_kwargs – additional parameters for model_fn

Note

catalyst[ml] required for this callback
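
The fit-then-predict flow described by the parameters can be sketched without scikit-learn. Everything below is hypothetical and only mirrors the parameter names: fit_then_predict is not a Catalyst function, NearestCentroid is a toy model exposing .fit and .predict, and batches are plain dictionaries standing in for runner.batch.

```python
import math


class NearestCentroid:
    """Toy classifier with .fit/.predict, standing in for a scikit-learn model."""

    def fit(self, X, y):
        sums, counts = {}, {}
        for row, label in zip(X, y):
            acc = sums.setdefault(label, [0.0] * len(row))
            for j, value in enumerate(row):
                acc[j] += value
            counts[label] = counts.get(label, 0) + 1
        self.centroids = {label: [v / counts[label] for v in acc] for label, acc in sums.items()}
        return self

    def predict(self, X):
        return [min(self.centroids, key=lambda c: math.dist(row, self.centroids[c])) for row in X]


def fit_then_predict(model_fn, train_batches, valid_batches, feature_key, target_key,
                     predict_method="predict", predict_key="sklearn_predict", **model_kwargs):
    """Fit on all train batches, then store predictions in every valid batch."""
    model = model_fn(**model_kwargs)
    features = [row for batch in train_batches for row in batch[feature_key]]
    targets = [label for batch in train_batches for label in batch[target_key]]
    model.fit(features, targets)
    for batch in valid_batches:
        batch[predict_key] = getattr(model, predict_method)(batch[feature_key])
    return valid_batches


train = [
    {"embeddings": [[0.0, 0.0], [0.0, 1.0]], "targets": [0, 0]},
    {"embeddings": [[5.0, 5.0], [6.0, 5.0]], "targets": [1, 1]},
]
valid = [{"embeddings": [[0.2, 0.4], [5.5, 4.8]]}]
print(fit_then_predict(NearestCentroid, train, valid, "embeddings", "targets"))
```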

TrevskyCallback

class catalyst.callbacks.metrics.segmentation.TrevskyCallback(input_key: str, target_key: str, alpha: float, beta: Optional[float] = None, class_dim: int = 1, weights: Optional[List[float]] = None, class_names: Optional[List[str]] = None, threshold: Optional[float] = None, log_on_batch: bool = True, compute_per_class_metrics: bool = False, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Bases: catalyst.callbacks.metric.BatchMetricCallback

Trevsky metric callback.

Parameters
  • input_key – input key to use for metric calculation, specifies our y_pred

  • target_key – output key to use for metric calculation, specifies our y_true

  • alpha – false negative coefficient; the bigger alpha, the bigger the penalty for false negatives. If beta is None, alpha must be in (0, 1)

  • beta – false positive coefficient; the bigger beta, the bigger the penalty for false positives. Must be in (0, 1); if None, beta = (1 - alpha)

  • class_dim – indicates class dimension (K) for outputs and targets tensors (default = 1)

  • weights – class weights

  • class_names – class names

  • threshold – threshold for outputs binarization

  • log_on_batch – boolean flag to log computed metrics every batch

  • compute_per_class_metrics – boolean flag to compute per-class metrics (default: SETTINGS.compute_per_class_metrics or False).

  • prefix – metric prefix

  • suffix – metric suffix
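
The interplay of alpha, beta and threshold can be sketched on flattened masks. The function below is a hypothetical helper based on the usual Tversky index formula, TP / (TP + alpha * FN + beta * FP); the eps stabiliser and the strict threshold comparison are assumptions, and the library's per-class handling is not modelled.

```python
def tversky_index(outputs, targets, alpha, beta=None, threshold=None, eps=1e-7):
    """Tversky index for flattened outputs and binary targets."""
    if beta is None:
        beta = 1.0 - alpha  # default stated in the parameter description
    if threshold is not None:
        outputs = [1.0 if o > threshold else 0.0 for o in outputs]
    tp = sum(o * t for o, t in zip(outputs, targets))
    fp = sum(o * (1 - t) for o, t in zip(outputs, targets))
    fn = sum((1 - o) * t for o, t in zip(outputs, targets))
    # alpha weights false negatives, beta weights false positives.
    return tp / (tp + alpha * fn + beta * fp + eps)


outputs = [0.9, 0.8, 0.1, 0.3]
targets = [1, 0, 1, 1]
print(tversky_index(outputs, targets, alpha=0.2, threshold=0.5))
print(tversky_index(outputs, targets, alpha=0.5, threshold=0.5))            # Dice-like weighting
print(tversky_index(outputs, targets, alpha=1.0, beta=1.0, threshold=0.5))  # IoU-like weighting
```

With alpha = beta = 0.5 the formula reduces to the Dice score, and with alpha = beta = 1 to IoU, which is why the three callbacks appear together in the example for this class.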

Examples:

import os
import torch
from torch import nn
from torch.utils.data import DataLoader
from catalyst import dl
from catalyst.contrib import MNIST, IoULoss

model = nn.Sequential(
    nn.Conv2d(1, 1, 3, 1, 1), nn.ReLU(),
    nn.Conv2d(1, 1, 3, 1, 1), nn.Sigmoid(),
)
criterion = IoULoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.02)

loaders = {
    "train": DataLoader(
        MNIST(os.getcwd(), train=True),
        batch_size=32
    ),
    "valid": DataLoader(
        MNIST(os.getcwd(), train=False),
        batch_size=32
    ),
}

class CustomRunner(dl.SupervisedRunner):
    def handle_batch(self, batch):
        x = batch[self._input_key]
        x_noise = (x + torch.rand_like(x)).clamp_(0, 1)
        x_ = self.model(x_noise)
        self.batch = {
            self._input_key: x, self._output_key: x_, self._target_key: x
        }

runner = CustomRunner(
    input_key="features",
    output_key="scores",
    target_key="targets",
    loss_key="loss"
)
# model training
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    loaders=loaders,
    num_epochs=1,
    callbacks=[
        dl.IOUCallback(input_key="scores", target_key="targets"),
        dl.DiceCallback(input_key="scores", target_key="targets"),
        dl.TrevskyCallback(input_key="scores", target_key="targets", alpha=0.2),
    ],
    logdir="./logdir",
    valid_loader="valid",
    valid_metric="loss",
    minimize_valid_metric=True,
    verbose=True,
)

Note

Please follow the minimal examples sections for more use cases.

__init__(input_key: str, target_key: str, alpha: float, beta: Optional[float] = None, class_dim: int = 1, weights: Optional[List[float]] = None, class_names: Optional[List[str]] = None, threshold: Optional[float] = None, log_on_batch: bool = True, compute_per_class_metrics: bool = False, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Init.