Shortcuts

Callbacks

Run

BatchOverfitCallback

class catalyst.callbacks.batch_overfit.BatchOverfitCallback(**kwargs)[source]

Bases: catalyst.core.callback.Callback

Callback to overfit loaders with specified number of batches. By default we use 1 batch for loader.

Parameters

kwargs – loader names and their number of batches to overfit.

For example, if you have train, train_additional, valid and valid_additional loaders and want to overfit train on the first batch, train_additional on the first 2 batches, valid on the first 20% of batches and valid_additional on 50% of batches:

from catalyst.dl import (
    SupervisedRunner, BatchOverfitCallback,
)
runner = SupervisedRunner()
runner.train(
    ...
    loaders={
        "train": ...,
        "train_additional": ...,
        "valid": ...,
        "valid_additional":...
    }
    ...
    callbacks=[
        ...
        BatchOverfitCallback(
            train_additional=2,
            valid=0.2,
            valid_additional=0.5
        ),
        ...
    ]
    ...
)

Minimal working example

import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl

# data
num_samples, num_features = int(1e4), int(1e1)
X, y = torch.rand(num_samples, num_features), torch.rand(num_samples)
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

# model, criterion, optimizer, scheduler
model = torch.nn.Linear(num_features, 1)
criterion = torch.nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [3, 6])

# model training
runner = dl.SupervisedRunner()
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    logdir="./logdir",
    num_epochs=8,
    verbose=True,
    callbacks=[dl.BatchOverfitCallback(train=10, valid=0.5)]
)
__init__(**kwargs)[source]

Init.

BatchTransformCallback

class catalyst.callbacks.batch_transform.BatchTransformCallback(transform: Callable, scope: str, input_key: Optional[Union[str, List[str]]] = None, output_key: Optional[Union[str, List[str]]] = None)[source]

Bases: catalyst.core.callback.Callback

Preprocess your batch with specified function.

Parameters
  • transform (Callable) – Function to apply.

  • scope (str) – "on_batch_end" (post-processing model output) or "on_batch_start" (pre-processing model input).

  • input_key (Union[List[str], str], optional) – Keys in batch dict to apply function. Defaults to None.

  • output_key (Union[List[str], str], optional) – Keys for output. If None, the function is applied in place to the keys given by input_key. Defaults to None.

Raises

TypeError – When keys is not str or a list. When scope is not in ["on_batch_end", "on_batch_start"].

Examples

import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl

# sample data
num_users, num_features, num_items = int(1e4), int(1e1), 10
X = torch.rand(num_users, num_features)
y = (torch.rand(num_users, num_items) > 0.5).to(torch.float32)

# pytorch loaders
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

# model, criterion, optimizer, scheduler
model = torch.nn.Linear(num_features, num_items)
criterion = torch.nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [2])

# model training
runner = dl.SupervisedRunner()
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    num_epochs=3,
    verbose=True,
    callbacks=[
        dl.BatchTransformCallback(
            transform=torch.sigmoid,
            scope="on_batch_end",
            input_key="logits",
            output_key="scores",
        ),
        dl.CriterionCallback(
            input_key="logits", target_key="targets", metric_key="loss"
        ),
# uncomment for extra metrics:
#       dl.AUCCallback(
#           input_key="scores", target_key="targets"
#       ),
#       dl.HitrateCallback(
#           input_key="scores", target_key="targets", topk_args=(1, 3, 5)
#       ),
#       dl.MRRCallback(
#           input_key="scores", target_key="targets", topk_args=(1, 3, 5)
#       ),
#       dl.MAPCallback(input_key="scores", target_key="targets", topk_args=(1, 3, 5)),
#       dl.NDCGCallback(
#           input_key="scores", target_key="targets", topk_args=(1, 3, 5)
#       ),
        dl.OptimizerCallback(metric_key="loss"),
        dl.SchedulerCallback(),
        dl.CheckpointCallback(
            logdir="./logs", loader_key="valid", metric_key="map01", minimize=False
        ),
    ]
)
class CustomRunner(dl.Runner):

    def handle_batch(self, batch):
        logits = self.model(
            batch["features"].view(batch["features"].size(0), -1)
        )

        loss = F.cross_entropy(logits, batch["targets"])
        accuracy01, accuracy03 = metrics.accuracy(
            logits, batch["targets"], topk=(1, 3)
        )
        self.batch_metrics.update(
            {"loss": loss, "accuracy01": accuracy01, "accuracy03": accuracy03}
        )

        if self.is_train_loader:
            loss.backward()
            self.optimizer.step()
            self.optimizer.zero_grad()


class MnistDataset(torch.utils.data.Dataset):
    def __init__(self, dataset):
        self.dataset = dataset

    def __getitem__(self, item):
        return {"features": self.dataset[item][0], "targets": self.dataset[item][1]}

    def __len__(self):
        return len(self.dataset)

model = torch.nn.Linear(28 * 28, 10)
optimizer = torch.optim.Adam(model.parameters(), lr=0.02)

loaders = {
    "train": DataLoader(
        MnistDataset(
            MNIST(os.getcwd(), train=False, download=True, transform=ToTensor())
        ),
        batch_size=32,
    ),
    "valid": DataLoader(
        MnistDataset(
            MNIST(os.getcwd(), train=False, download=True, transform=ToTensor())
        ),
        batch_size=32,
    ),
}

transrorms = [
    augmentation.RandomAffine(degrees=(-15, 20), scale=(0.75, 1.25)),
]

runner = CustomRunner()

# model training
runner.train(
    model=model,
    optimizer=optimizer,
    loaders=loaders,
    logdir="./logs",
    num_epochs=5,
    verbose=False,
    load_best_on_end=True,
    check=True,
    callbacks=[
        BatchTransformCallback(
            transform=transrorms, scope="on_batch_start", input_key="features"
        )
    ],
)
__init__(transform: Callable, scope: str, input_key: Optional[Union[str, List[str]]] = None, output_key: Optional[Union[str, List[str]]] = None)[source]

Preprocess your batch with specified function.

Parameters
  • transform (Callable) – Function to apply.

  • scope (str) – "on_batch_end" (post-processing model output) or "on_batch_start" (pre-processing model input).

  • input_key (Union[List[str], str], optional) – Keys in batch dict to apply function. Defaults to None.

  • output_key (Union[List[str], str], optional) – Keys for output. If None, the function is applied in place to the keys given by input_key. Defaults to None.

Raises

TypeError – When keys is not str or a list. When scope is not in ["on_batch_end", "on_batch_start"].

CheckpointCallback

class catalyst.callbacks.checkpoint.CheckpointCallback(logdir: Optional[str] = None, loader_key: Optional[str] = None, metric_key: Optional[str] = None, minimize: Optional[bool] = None, min_delta: float = 1e-06, save_n_best: int = 1, load_on_stage_start: Optional[Union[str, Dict[str, str]]] = None, load_on_stage_end: Optional[Union[str, Dict[str, str]]] = None, metrics_filename: str = '_metrics.json', mode: str = 'all', use_logdir_postfix: bool = False, use_runner_logdir: bool = False)[source]

Bases: catalyst.callbacks.checkpoint.ICheckpointCallback

Checkpoint callback to save/restore your model/criterion/optimizer/scheduler.

Parameters
  • logdir – directory to store checkpoints

  • loader_key – loader key for best model selection (based on metric score over the dataset)

  • metric_key – metric key for best model selection (based on metric score over the dataset)

  • minimize – boolean flag to minimize the required metric

  • min_delta – minimal delta for metric improvement

  • save_n_best – number of best checkpoints to keep; if 0, only the last state of the model is stored and load_on_stage_end should be one of last or last_full.

  • load_on_stage_start (str or Dict[str, str]) –

    load specified state/model at stage start.

    If passed string then will be performed initialization from specified state (best/best_full/last/last_full) or checkpoint file.

    If passed dict then will be performed initialization only for specified parts - model, criterion, optimizer, scheduler.

    Example

    >>> # possible checkpoints to use:
    >>> #   "best"/"best_full"/"last"/"last_full"
    >>> #   or path to specific checkpoint
    >>> to_load = {
    >>>    "model": "path/to/checkpoint.pth",
    >>>    "criterion": "best",
    >>>    "optimizer": "last_full",
    >>>    "scheduler": "best_full",
    >>> }
    >>> CheckpointCallback(load_on_stage_start=to_load)
    

    All keys other than "model", "criterion", "optimizer" and "scheduler" will be ignored.

    If None or an empty dict (or dict without mentioned above keys) then no action is required at stage start and:

    • Config API - will be used best state of model

    • Notebook API - no action will be performed (will be used the last state)

    NOTE: Loading will be performed on all stages except first.

    NOTE: Criterion, optimizer and scheduler are optional keys and should be loaded from full checkpoint.

    Model state can be loaded from any checkpoint.

    When dict contains keys for model and some other part (for example {"model": "last", "optimizer": "last"}) and they match in prefix ("best" and "best_full") then will be loaded full checkpoint because it contains required states.

  • load_on_stage_end (str or Dict[str, str]) –

    load specified state/model at stage end.

    If passed string then will be performed initialization from specified state (best/best_full/last/last_full) or checkpoint file.

    If passed dict then will be performed initialization only for specified parts - model, criterion, optimizer, scheduler. Logic for dict is the same as for load_on_stage_start.

    If None then no action is required at stage end and will be used the last runner.

    NOTE: Loading will be performed always at stage end.

  • metrics_filename – filename to save metrics in checkpoint folder. Must end with .json or .yml

  • mode – checkpointing mode, could be all, full, model

  • use_logdir_postfix – boolean flag to use extra prefix checkpoints for logdir

  • use_runner_logdir – boolean flag to use runner._logdir as logdir

Examples:

import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl

# sample data
num_users, num_features, num_items = int(1e4), int(1e1), 10
X = torch.rand(num_users, num_features)
y = (torch.rand(num_users, num_items) > 0.5).to(torch.float32)

# pytorch loaders
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

# model, criterion, optimizer, scheduler
model = torch.nn.Linear(num_features, num_items)
criterion = torch.nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [2])

# model training
runner = dl.SupervisedRunner(
    input_key="features", output_key="logits", target_key="targets", loss_key="loss"
)
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    num_epochs=3,
    verbose=True,
    callbacks=[
        dl.BatchTransformCallback(
            transform=torch.sigmoid,
            scope="on_batch_end",
            input_key="logits",
            output_key="scores"
        ),
        dl.CriterionCallback(
            input_key="logits", target_key="targets", metric_key="loss"
        ),
        dl.AUCCallback(input_key="scores", target_key="targets"),
        dl.HitrateCallback(
            input_key="scores", target_key="targets", topk_args=(1, 3, 5)
        ),
        dl.MRRCallback(input_key="scores", target_key="targets", topk_args=(1, 3, 5)),
        dl.MAPCallback(input_key="scores", target_key="targets", topk_args=(1, 3, 5)),
        dl.NDCGCallback(input_key="scores", target_key="targets", topk_args=(1, 3, 5)),
        dl.OptimizerCallback(metric_key="loss"),
        dl.SchedulerCallback(),
        dl.CheckpointCallback(
            logdir="./logs", loader_key="valid", metric_key="loss", minimize=True
        ),
    ]
)

Note

Please follow the minimal examples sections for more use cases.

__init__(logdir: Optional[str] = None, loader_key: Optional[str] = None, metric_key: Optional[str] = None, minimize: Optional[bool] = None, min_delta: float = 1e-06, save_n_best: int = 1, load_on_stage_start: Optional[Union[str, Dict[str, str]]] = None, load_on_stage_end: Optional[Union[str, Dict[str, str]]] = None, metrics_filename: str = '_metrics.json', mode: str = 'all', use_logdir_postfix: bool = False, use_runner_logdir: bool = False)[source]

Init.

ControlFlowCallback

class catalyst.callbacks.control_flow.ControlFlowCallback(base_callback: catalyst.core.callback.Callback, epochs: Optional[Union[int, Sequence[int]]] = None, ignore_epochs: Optional[Union[int, Sequence[int]]] = None, loaders: Optional[Union[str, Sequence[str], Mapping[str, Union[int, Sequence[int]]]]] = None, ignore_loaders: Optional[Union[str, Sequence[str], Mapping[str, Union[int, Sequence[int]]]]] = None, filter_fn: Optional[Union[str, Callable[[str, int, str], bool]]] = None, use_global_epochs: bool = False)[source]

Bases: catalyst.core.callback.CallbackWrapper

Enable/disable callback execution on different stages, loaders and epochs.

Parameters
  • base_callback – callback to wrap

  • epochs

    epochs where need to enable callback, on other epochs callback will be disabled.

    If passed int/float then callback will be enabled with period specified as epochs value (epochs expression epoch_number % epochs == 0) and disabled on other epochs.

    If passed list of epochs then will be executed callback on specified epochs.

    Default value is None.

  • ignore_epochs

    epochs where need to disable callback, on other epochs callback will be enabled.

    If passed int/float then callback will be disabled with period specified as epochs value (epochs expression epoch_number % epochs != 0) and enabled on other epochs.

    If passed list of epochs then will be disabled callback on specified epochs.

    Default value is None.

  • loaders (str/Sequence[str]/Mapping[str, int/Sequence[int]]) –

    loaders where should be enabled callback, on other loaders callback will be disabled.

    If passed string object then will be enabled callback for loader with specified name.

    If passed list/tuple of strings then will be enabled callback for loaders with specified names.

    If passed dictionary where key is a string and values int or list of integers then callback will be enabled on epochs (dictionary value) for specified loader (dictionary key).

    Default value is None.

  • ignore_loaders (str/Sequence[str]/Mapping[str, int/Sequence[int]]) –

    loader names where should be disabled callback, on other loaders callback will be enabled.

    If passed string object then will be disabled callback for loader with specified name.

    If passed list/tuple of strings then will be disabled callback for loaders with specified names.

    If passed dictionary where key is a string and values int or list of integers then callback will be disabled on epochs (dictionary value) for specified loader (dictionary key).

    Default value is None.

  • filter_fn (str or Callable[[str, int, str], bool]) –

    function to use instead of loaders or epochs arguments.

    If the object passed to a filter_fn is a string then it will be interpreted as python code. Expected lambda function with three arguments stage name (str), epoch number (int), loader name (str) and this function should return True if callback should be enabled on some condition.

    If passed callable object then it should accept three arguments - stage name (str), epoch number (int), loader name (str) and should return True if callback should be enabled on some condition, otherwise should return False.

    Default value is None.

    Examples:

    # enable callback on all loaders
    # except "train" loader every 2 epochs
    ControlFlowCallback(
        ...
        filter_fn=lambda s, e, l: l != "train" and e % 2 == 0
        ...
    )
    # or with string equivalent
    ControlFlowCallback(
        ...
        filter_fn="lambda s, e, l: l != 'train' and e % 2 == 0"
        ...
    )
    

  • use_global_epochs – if True then will be used global epochs instead of epochs in a stage, the default value is False

Note

Please run experiment with check option to check if everything works as expected with this callback.

For example, if you don’t want to compute loss on a validation you can ignore CriterionCallback, for notebook API need to wrap callback:

import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst.dl import (
    SupervisedRunner, AccuracyCallback,
    CriterionCallback, ControlFlowCallback,
)

num_samples, num_features = 10_000, 10
n_classes = 10
X = torch.rand(num_samples, num_features)
y = torch.randint(0, n_classes, [num_samples])
loader = DataLoader(TensorDataset(X, y), batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

model = torch.nn.Linear(num_features, n_classes)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [3, 6])

runner = SupervisedRunner()
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    logdir="./logdir",
    num_epochs=5,
    verbose=False,
    valid_metric="accuracy03",
    minimize_metric=False,
    callbacks=[
        AccuracyCallback(
            accuracy_args=[1, 3, 5]
        ),
        ControlFlowCallback(
            base_callback=CriterionCallback(),
            ignore_loaders="valid"  # or loaders="train"
        )
    ]
)

In config API need to use _wrapper argument:

callbacks_params:
  ...
  loss:
    _wrapper:
       callback: ControlFlowCallback
       ignore_loaders: valid
    callback: CriterionCallback
  ...
__init__(base_callback: catalyst.core.callback.Callback, epochs: Optional[Union[int, Sequence[int]]] = None, ignore_epochs: Optional[Union[int, Sequence[int]]] = None, loaders: Optional[Union[str, Sequence[str], Mapping[str, Union[int, Sequence[int]]]]] = None, ignore_loaders: Optional[Union[str, Sequence[str], Mapping[str, Union[int, Sequence[int]]]]] = None, filter_fn: Optional[Union[str, Callable[[str, int, str], bool]]] = None, use_global_epochs: bool = False)[source]

Init.

CriterionCallback

class catalyst.callbacks.criterion.CriterionCallback(input_key: str, target_key: str, metric_key: str, criterion_key: Optional[str] = None, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Bases: catalyst.callbacks.metrics.functional_metric.FunctionalMetricCallback, catalyst.callbacks.criterion.ICriterionCallback

Criterion callback, abstraction over criterion step.

Parameters
  • input_key – input key to use for metric calculation, specifies our y_pred

  • target_key – output key to use for metric calculation, specifies our y_true

  • metric_key – key to store computed metric in runner.batch_metrics dictionary

  • criterion_key – A key to take a criterion in case there are several of them, and they are in a dictionary format.

Examples:

import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl

# sample data
num_users, num_features, num_items = int(1e4), int(1e1), 10
X = torch.rand(num_users, num_features)
y = (torch.rand(num_users, num_items) > 0.5).to(torch.float32)

# pytorch loaders
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

# model, criterion, optimizer, scheduler
model = torch.nn.Linear(num_features, num_items)
criterion = torch.nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [2])

# model training
runner = dl.SupervisedRunner(
    input_key="features", output_key="logits", target_key="targets", loss_key="loss"
)
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    num_epochs=3,
    verbose=True,
    callbacks=[
        dl.BatchTransformCallback(
            transform=torch.sigmoid,
            scope="on_batch_end",
            input_key="logits",
            output_key="scores"
        ),
        dl.CriterionCallback(
            input_key="logits", target_key="targets", metric_key="loss"
        ),
        dl.AUCCallback(input_key="scores", target_key="targets"),
        dl.HitrateCallback(
            input_key="scores", target_key="targets", topk_args=(1, 3, 5)
        ),
        dl.MRRCallback(input_key="scores", target_key="targets", topk_args=(1, 3, 5)),
        dl.MAPCallback(input_key="scores", target_key="targets", topk_args=(1, 3, 5)),
        dl.NDCGCallback(input_key="scores", target_key="targets", topk_args=(1, 3, 5)),
        dl.OptimizerCallback(metric_key="loss"),
        dl.SchedulerCallback(),
        dl.CheckpointCallback(
            logdir="./logs", loader_key="valid", metric_key="loss", minimize=True
        ),
    ]
)

Note

Please follow the minimal examples sections for more use cases.

__init__(input_key: str, target_key: str, metric_key: str, criterion_key: Optional[str] = None, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Init.

Metric – BatchMetricCallback

class catalyst.callbacks.metric.BatchMetricCallback(metric: catalyst.metrics._metric.ICallbackBatchMetric, input_key: Union[str, Iterable[str], Dict[str, str]], target_key: Union[str, Iterable[str], Dict[str, str]], log_on_batch: bool = True)[source]

Bases: catalyst.callbacks.metric.MetricCallback

BatchMetricCallback implements batch-based metrics update and computation over loader

Parameters
  • metric – metric to calculate in callback

  • input_key – keys of tensors that should be used as inputs in metric calculation

  • target_key – keys of tensors that should be used as targets in metric calculation

  • log_on_batch – boolean flag to log computed metrics every batch

__init__(metric: catalyst.metrics._metric.ICallbackBatchMetric, input_key: Union[str, Iterable[str], Dict[str, str]], target_key: Union[str, Iterable[str], Dict[str, str]], log_on_batch: bool = True) → None[source]

Init BatchMetricCallback

Metric – LoaderMetricCallback

class catalyst.callbacks.metric.LoaderMetricCallback(metric: catalyst.metrics._metric.ICallbackLoaderMetric, input_key: Union[str, Iterable[str], Dict[str, str]], target_key: Union[str, Iterable[str], Dict[str, str]])[source]

Bases: catalyst.callbacks.metric.MetricCallback

LoaderMetricCallback implements loader-based metrics update and computation over loader

Parameters
  • metric – metric to calculate in callback

  • input_key – keys of tensors that should be used as inputs in metric calculation

  • target_key – keys of tensors that should be used as targets in metric calculation

Metric – MetricAggregationCallback

class catalyst.callbacks.metric_aggregation.MetricAggregationCallback(metric_key: str, metrics: Optional[Union[str, List[str], Dict[str, float]]] = None, mode: Union[str, Callable] = 'mean', scope: str = 'batch', multiplier: float = 1.0)[source]

Bases: catalyst.core.callback.Callback

A callback to aggregate several metrics in one value.

Parameters
  • metric_key – new key for aggregated metric.

  • metrics (Union[str, List[str], Dict[str, float]]) – If not None, it aggregates only the values from the metric by these keys. for weighted_sum aggregation it must be a Dict[str, float].

  • mode – function for aggregation. Must be either sum, mean or weighted_sum or user’s function to aggregate metrics. This function must get dict of metrics and runner and return aggregated metric. It can be useful for complicated fine tuning with different losses that depend on epochs and loader or something else

  • scope – type of metric. Must be either batch or loader

  • multiplier – scale factor for the aggregated metric.

Python example - loss is a weighted sum of cross entropy loss and binary cross entropy loss:

import torch
from torch.utils.data import DataLoader, TensorDataset

from catalyst import dl

# data
num_samples, num_features, num_classes = int(1e4), int(1e1), 4
X = torch.rand(num_samples, num_features)
y = (torch.rand(num_samples,) * num_classes).to(torch.int64)
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

# model, criterion, optimizer, scheduler
model = torch.nn.Linear(num_features, num_classes)
criterion = {"ce": torch.nn.CrossEntropyLoss(), "bce": torch.nn.BCEWithLogitsLoss()}
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [2])

# runner
class CustomRunner(dl.Runner):
    def handle_batch(self, batch):
        x, y = batch
        logits = self.model(x)
        num_classes = logits.shape[-1]
        targets_onehot = torch.nn.functional.one_hot(y, num_classes=num_classes)
        self.batch = {
            "features": x,
            "logits": logits,
            "targets": y,
            "targets_onehot": targets_onehot.float(),
        }


# training
runner = CustomRunner()
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    logdir="./logdir",
    valid_loader="valid",
    valid_metric="loss",
    minimize_valid_metric=True,
    num_epochs=3,
    callbacks=[
        dl.AccuracyCallback(
            input_key="logits", target_key="targets", num_classes=num_classes
        ),
        dl.CriterionCallback(
            input_key="logits",
            target_key="targets",
            metric_key="loss_ce",
            criterion_key="ce",
        ),
        dl.CriterionCallback(
            input_key="logits",
            target_key="targets_onehot",
            metric_key="loss_bce",
            criterion_key="bce",
        ),
        # loss aggregation
        dl.MetricAggregationCallback(
            metric_key="loss",
            metrics={"loss_ce": 0.6, "loss_bce": 0.4},
            mode="weighted_sum",
        ),
        dl.OptimizerCallback(metric_key="loss"),
    ],
)
__init__(metric_key: str, metrics: Optional[Union[str, List[str], Dict[str, float]]] = None, mode: Union[str, Callable] = 'mean', scope: str = 'batch', multiplier: float = 1.0) → None[source]

Init.

Misc – CheckRunCallback

class catalyst.callbacks.misc.CheckRunCallback(num_batch_steps: int = 3, num_epoch_steps: int = 3)[source]

Bases: catalyst.core.callback.Callback

Executes only a pipeline part from the run.

Parameters
  • num_batch_steps – number of batches to iterate in epoch

  • num_epoch_steps – number of epochs to perform in a stage

Minimal working example (Notebook API):

import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl

# data
num_samples, num_features = int(1e4), int(1e1)
X, y = torch.rand(num_samples, num_features), torch.rand(num_samples)
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

# model, criterion, optimizer, scheduler
model = torch.nn.Linear(num_features, 1)
criterion = torch.nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [3, 6])

# model training
runner = dl.SupervisedRunner()
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    logdir="./logdir",
    num_epochs=8,
    verbose=True,
    callbacks=[
        dl.CheckRunCallback(num_batch_steps=3, num_epoch_steps=3)
    ]
)
__init__(num_batch_steps: int = 3, num_epoch_steps: int = 3)[source]

Init.

Misc – EarlyStoppingCallback

class catalyst.callbacks.misc.EarlyStoppingCallback(patience: int, loader_key: str, metric_key: str, minimize: bool, min_delta: float = 1e-06)[source]

Bases: catalyst.callbacks.misc.IEpochMetricHandlerCallback

Stage early stop based on metric.

Parameters
  • patience – number of epochs with no improvement after which training will be stopped.

  • loader_key – loader key for early stopping (based on metric score over the dataset)

  • metric_key – metric key for early stopping (based on metric score over the dataset)

  • minimize – if True then expected that metric should decrease and early stopping will be performed only when metric stops decreasing. If False then expected that metric should increase. This argument is required (the signature defines no default value).

  • min_delta – minimum change in the monitored metric to qualify as an improvement, i.e. an absolute change of less than min_delta, will count as no improvement, default value is 1e-6.

__init__(patience: int, loader_key: str, metric_key: str, minimize: bool, min_delta: float = 1e-06)[source]

Init.

handle_score_is_better(runner: catalyst.core.runner.IRunner)[source]

Event handler.

handle_score_is_not_better(runner: catalyst.core.runner.IRunner)[source]

Event handler.

Misc – TimerCallback

class catalyst.callbacks.misc.TimerCallback[source]

Bases: catalyst.core.callback.Callback

Logs pipeline execution time.

__init__()[source]

Initialisation for TimerCallback.

Misc – TqdmCallback

class catalyst.callbacks.misc.TqdmCallback[source]

Bases: catalyst.core.callback.Callback

Logs the params into tqdm console.

on_exception(runner: catalyst.core.runner.IRunner)[source]

Called if an Exception was raised.

OnnxCallback

class catalyst.callbacks.onnx.OnnxCallback(input_key: str, logdir: Optional[Union[str, pathlib.Path]] = None, filename: str = 'onnx.py', method_name: str = 'forward', input_names: Optional[Iterable] = None, output_names: Optional[List[str]] = None, dynamic_axes: Optional[Union[Dict[str, int], Dict[str, Dict[str, int]]]] = None, opset_version: int = 9, do_constant_folding: bool = False, verbose: bool = False)[source]

Bases: catalyst.core.callback.Callback

Callback for converting model to onnx runtime.

Parameters
  • input_key – input key from runner.batch to use for onnx export

  • logdir – path to folder for saving

  • filename – filename

  • method_name (str, optional) – Forward pass method to be converted. Defaults to “forward”.

  • input_names (Iterable, optional) – name of inputs in graph. Defaults to None.

  • output_names (List[str], optional) – name of outputs in graph. Defaults to None.

  • dynamic_axes (Union[Dict[str, int], Dict[str, Dict[str, int]]], optional) – axes with dynamic shapes. Defaults to None.

  • opset_version (int, optional) – Defaults to 9.

  • do_constant_folding (bool, optional) – If True, the constant-folding optimization is applied to the model during export. Defaults to False.

  • verbose (bool, default False) – if specified, we will print out a debug description of the trace being exported.

Example

import os

import torch
from torch import nn
from torch.utils.data import DataLoader

from catalyst import dl
from catalyst.data.transforms import ToTensor
from catalyst.contrib.datasets import MNIST
from catalyst.contrib.nn.modules import Flatten

loaders = {
    "train": DataLoader(
        MNIST(
            os.getcwd(), train=False, download=True, transform=ToTensor()
        ),
        batch_size=32,
    ),
    "valid": DataLoader(
        MNIST(
            os.getcwd(), train=False, download=True, transform=ToTensor()
        ),
        batch_size=32,
    ),
}

model = nn.Sequential(Flatten(), nn.Linear(784, 512), nn.ReLU(), nn.Linear(512, 10))
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-2)
runner = dl.SupervisedRunner()
runner.train(
    model=model,
    callbacks=[dl.OnnxCallback(input_key="features", logdir="./logs")],
    loaders=loaders,
    criterion=criterion,
    optimizer=optimizer,
    num_epochs=1,
    logdir="./logs",
)
__init__(input_key: str, logdir: Optional[Union[str, pathlib.Path]] = None, filename: str = 'onnx.py', method_name: str = 'forward', input_names: Optional[Iterable] = None, output_names: Optional[List[str]] = None, dynamic_axes: Optional[Union[Dict[str, int], Dict[str, Dict[str, int]]]] = None, opset_version: int = 9, do_constant_folding: bool = False, verbose: bool = False)[source]

Init.

OptimizerCallback

class catalyst.callbacks.optimizer.OptimizerCallback(metric_key: str, model_key: Optional[str] = None, optimizer_key: Optional[str] = None, accumulation_steps: int = 1, grad_clip_fn: Optional[Union[str, Callable]] = None, grad_clip_params: Optional[Dict] = None)[source]

Bases: catalyst.callbacks.optimizer.IOptimizerCallback

Optimizer callback, abstraction over optimizer step.

Parameters
  • metric_key – a key to get loss from runner.batch_metrics

  • model_key – a key to select a model from runner.model in case there are several of them and they are in a dictionary format.

  • optimizer_key – a key to select an optimizer from runner.optimizer in case there are several of them and they are in a dictionary format.

  • accumulation_steps – number of steps before model.zero_grad()

  • grad_clip_fn – callable gradient clipping function or its name

  • grad_clip_params – key-value parameters for grad_clip_fn

Examples:

import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl

# sample data
num_users, num_features, num_items = int(1e4), int(1e1), 10
X = torch.rand(num_users, num_features)
y = (torch.rand(num_users, num_items) > 0.5).to(torch.float32)

# pytorch loaders
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

# model, criterion, optimizer, scheduler
model = torch.nn.Linear(num_features, num_items)
criterion = torch.nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [2])

# model training
runner = dl.SupervisedRunner(
    input_key="features", output_key="logits", target_key="targets", loss_key="loss"
)
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    num_epochs=3,
    verbose=True,
    callbacks=[
        dl.BatchTransformCallback(
            transform=torch.sigmoid,
            scope="on_batch_end",
            input_key="logits",
            output_key="scores"
        ),
        dl.CriterionCallback(
            input_key="logits", target_key="targets", metric_key="loss"
        ),
        dl.AUCCallback(input_key="scores", target_key="targets"),
        dl.HitrateCallback(
            input_key="scores", target_key="targets", topk_args=(1, 3, 5)
        ),
        dl.MRRCallback(input_key="scores", target_key="targets", topk_args=(1, 3, 5)),
        dl.MAPCallback(input_key="scores", target_key="targets", topk_args=(1, 3, 5)),
        dl.NDCGCallback(input_key="scores", target_key="targets", topk_args=(1, 3, 5)),
        dl.OptimizerCallback(metric_key="loss"),
        dl.SchedulerCallback(),
        dl.CheckpointCallback(
            logdir="./logs", loader_key="valid", metric_key="loss", minimize=True
        ),
    ]
)

Note

Please follow the minimal examples sections for more use cases.

__init__(metric_key: str, model_key: Optional[str] = None, optimizer_key: Optional[str] = None, accumulation_steps: int = 1, grad_clip_fn: Optional[Union[str, Callable]] = None, grad_clip_params: Optional[Dict] = None)[source]

Init.

OptunaPruningCallback

class catalyst.callbacks.optuna.OptunaPruningCallback(loader_key: str, metric_key: str, minimize: bool, min_delta: float = 1e-06, trial: Optional[optuna.trial._trial.Trial] = None)[source]

Bases: catalyst.core.callback.Callback

Optuna callback for pruning unpromising runs. This callback can be used for early stopping (pruning) unpromising runs.

Parameters
  • loader_key – loader key for best model selection (based on metric score over the dataset)

  • metric_key – metric key for best model selection (based on metric score over the dataset)

  • minimize – boolean flag to minimize the required metric

  • min_delta – minimal delta for metric improvement

  • trial – Optuna.Trial for experiment.

import optuna

from catalyst.dl import SupervisedRunner, OptunaPruningCallback

# some python code ...

def objective(trial: optuna.Trial):
    # standard optuna code for model and/or optimizer suggestion ...
    runner = SupervisedRunner()
    runner.train(
        model=model,
        loaders=loaders,
        criterion=criterion,
        optimizer=optimizer,
        callbacks=[
            OptunaPruningCallback(
                loader_key="valid", metric_key="loss", minimize=True, trial=trial
            ),
            # some other callbacks ...
        ],
        num_epochs=num_epochs,
    )
    return runner.best_valid_metrics[runner.valid_metric]

study = optuna.create_study()
study.optimize(objective, n_trials=100, timeout=600)

Config API is supported through catalyst-dl tune command.

__init__(loader_key: str, metric_key: str, minimize: bool, min_delta: float = 1e-06, trial: Optional[optuna.trial._trial.Trial] = None)[source]

Init.

PeriodicLoaderCallback

class catalyst.callbacks.periodic_loader.PeriodicLoaderCallback(valid_loader_key: str, valid_metric_key: str, minimize: bool = True, **kwargs)[source]

Bases: catalyst.core.callback.Callback

Callback for running loaders with a specified period. To disable a loader, use 0 as its period (specifying 0 for the validation loader raises an error).

Parameters

kwargs – loader names and their run periods.

For example, if you have train, train_additional, valid and valid_additional loaders and want to use train_additional every 2 epochs, valid - every 3 epochs and valid_additional - every 5 epochs:

from catalyst.dl import (
    SupervisedRunner, PeriodicLoaderCallback,
)
runner = SupervisedRunner()
runner.train(
    ...
    loaders={
        "train": ...,
        "train_additional": ...,
        "valid": ...,
        "valid_additional":...
    }
    ...
    callbacks=[
        ...
        PeriodicLoaderCallback(
            valid_loader_key="valid",
            valid_metric_key="loss",
            minimize=True,
            train_additional=2,
            valid=3,
            valid_additional=5
        ),
        ...
    ]
    ...
)
__init__(valid_loader_key: str, valid_metric_key: str, minimize: bool = True, **kwargs)[source]

Init.

PruningCallback

class catalyst.callbacks.pruning.PruningCallback(pruning_fn: Union[Callable, str], amount: Union[int, float], keys_to_prune: Optional[List[str]] = None, prune_on_epoch_end: Optional[bool] = False, prune_on_stage_end: Optional[bool] = True, remove_reparametrization_on_stage_end: Optional[bool] = True, layers_to_prune: Optional[List[str]] = None, dim: Optional[int] = None, l_norm: Optional[int] = None)[source]

Bases: catalyst.core.callback.Callback

This callback prunes network parameters during and/or after training.

Parameters
  • pruning_fn – function from torch.nn.utils.prune module or your own based on BasePruningMethod. Can be string e.g. “l1_unstructured”. See pytorch docs for more details.

  • amount – quantity of parameters to prune. If float, should be between 0.0 and 1.0 and represent the fraction of parameters to prune. If int, it represents the absolute number of parameters to prune.

  • keys_to_prune – list of strings. Determines which tensor in modules will be pruned.

  • prune_on_epoch_end – bool flag that determines whether to call pruning_fn on epoch end.

  • prune_on_stage_end – bool flag that determines whether to call pruning_fn on stage end.

  • remove_reparametrization_on_stage_end – if True then all reparametrization pre-hooks and tensors with mask will be removed on stage end.

  • layers_to_prune – list of strings - module names to be pruned. If None provided then will try to prune every module in model.

  • dim – if you are using structured pruning method you need to specify dimension.

  • l_norm – if you are using ln_structured you need to specify l_norm.

__init__(pruning_fn: Union[Callable, str], amount: Union[int, float], keys_to_prune: Optional[List[str]] = None, prune_on_epoch_end: Optional[bool] = False, prune_on_stage_end: Optional[bool] = True, remove_reparametrization_on_stage_end: Optional[bool] = True, layers_to_prune: Optional[List[str]] = None, dim: Optional[int] = None, l_norm: Optional[int] = None) → None[source]

Init.

QuantizationCallback

class catalyst.callbacks.quantization.QuantizationCallback(logdir: Optional[Union[str, pathlib.Path]] = None, filename: str = 'quantized.pth', qconfig_spec: Optional[Dict] = None, dtype: Optional[Union[str, torch.dtype]] = 'qint8')[source]

Bases: catalyst.core.callback.Callback

Callback for model quantization.

Parameters
  • logdir – path to folder for saving

  • filename – filename

  • qconfig_spec (Dict, optional) – quantization config in PyTorch format. Defaults to None.

  • dtype (Union[str, Optional[torch.dtype]], optional) – Type of weights after quantization. Defaults to “qint8”.

Example

import os

import torch
from torch import nn
from torch.utils.data import DataLoader

from catalyst import dl
from catalyst.data.transforms import ToTensor
from catalyst.contrib.datasets import MNIST
from catalyst.contrib.nn.modules import Flatten

loaders = {
    "train": DataLoader(
        MNIST(os.getcwd(),
        train=False,
        download=True,
        transform=ToTensor()),
        batch_size=32,
    ),
    "valid": DataLoader(
        MNIST(os.getcwd(),
        train=False,
        download=True,
        transform=ToTensor()),
        batch_size=32,
    ),
}

model = nn.Sequential(Flatten(), nn.Linear(784, 512), nn.ReLU(), nn.Linear(512, 10))
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-2)
runner = dl.SupervisedRunner()
runner.train(
    model=model,
    callbacks=[dl.QuantizationCallback(logdir="./logs")],
    loaders=loaders,
    criterion=criterion,
    optimizer=optimizer,
    num_epochs=1,
    logdir="./logs",
)
__init__(logdir: Optional[Union[str, pathlib.Path]] = None, filename: str = 'quantized.pth', qconfig_spec: Optional[Dict] = None, dtype: Optional[Union[str, torch.dtype]] = 'qint8')[source]

Init.

Scheduler – SchedulerCallback

class catalyst.callbacks.scheduler.SchedulerCallback(scheduler_key: Optional[str] = None, mode: Optional[str] = None, loader_key: Optional[str] = None, metric_key: Optional[str] = None)[source]

Bases: catalyst.callbacks.scheduler.ISchedulerCallback

Scheduler callback, abstraction over scheduler step.

Parameters
  • scheduler_key – scheduler name, if None, default is None.

  • mode – scheduler mode, should be one of "epoch" or "batch", default is None. If None and the scheduler is an instance of BatchScheduler or OneCycleLRWithWarmup, "batch" is used; otherwise "epoch".

  • loader_key – loader name to look after for ReduceLROnPlateau scheduler

  • metric_key – metric name to forward to the scheduler object; if None, the main metric specified in the experiment is used.

Examples:

import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl

# sample data
num_users, num_features, num_items = int(1e4), int(1e1), 10
X = torch.rand(num_users, num_features)
y = (torch.rand(num_users, num_items) > 0.5).to(torch.float32)

# pytorch loaders
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

# model, criterion, optimizer, scheduler
model = torch.nn.Linear(num_features, num_items)
criterion = torch.nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [2])

# model training
runner = dl.SupervisedRunner(
    input_key="features", output_key="logits", target_key="targets", loss_key="loss"
)
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    num_epochs=3,
    verbose=True,
    callbacks=[
        dl.BatchTransformCallback(
            transform=torch.sigmoid,
            scope="on_batch_end",
            input_key="logits",
            output_key="scores"
        ),
        dl.CriterionCallback(
            input_key="logits", target_key="targets", metric_key="loss"
        ),
        dl.AUCCallback(input_key="scores", target_key="targets"),
        dl.HitrateCallback(
            input_key="scores", target_key="targets", topk_args=(1, 3, 5)
        ),
        dl.MRRCallback(input_key="scores", target_key="targets", topk_args=(1, 3, 5)),
        dl.MAPCallback(input_key="scores", target_key="targets", topk_args=(1, 3, 5)),
        dl.NDCGCallback(input_key="scores", target_key="targets", topk_args=(1, 3, 5)),
        dl.OptimizerCallback(metric_key="loss"),
        dl.SchedulerCallback(),
        dl.CheckpointCallback(
            logdir="./logs", loader_key="valid", metric_key="loss", minimize=True
        ),
    ]
)

Note

Please follow the minimal examples sections for more use cases.

__init__(scheduler_key: Optional[str] = None, mode: Optional[str] = None, loader_key: Optional[str] = None, metric_key: Optional[str] = None)[source]

Init.

make_batch_step(runner: IRunner) → None[source]

Perform scheduler step and update batch metrics

Parameters

runner – current runner

make_epoch_step(runner: IRunner) → None[source]

Perform scheduler step and update epoch metrics

Parameters

runner – current runner

Scheduler – LRFinder

class catalyst.callbacks.scheduler.LRFinder(final_lr: float, scale: str = 'log', num_steps: Optional[int] = None, optimizer_key: Optional[str] = None)[source]

Bases: catalyst.callbacks.scheduler.ILRUpdater

Helps you find an optimal learning rate for a model, as per suggestion of Cyclical Learning Rates for Training Neural Networks paper. Learning rate is increased in linear or log scale, depending on user input.

See How Do You Find A Good Learning Rate article for details.

__init__(final_lr: float, scale: str = 'log', num_steps: Optional[int] = None, optimizer_key: Optional[str] = None)[source]
Parameters
  • final_lr – final learning rate to try with

  • scale – learning rate increasing scale (“log” or “linear”)

  • num_steps – number of batches to try, if None - whole loader would be used.

  • optimizer_key – which optimizer key to use for learning rate scheduling

calc_lr()[source]

Calculates learning rate.

Returns

learning rate.

calc_momentum()[source]

Calculates new momentum.

Tracing

class catalyst.callbacks.tracing.TracingCallback(input_key: Union[str, List[str]], logdir: Optional[Union[str, pathlib.Path]] = None, filename: str = 'traced_model.pth', method_name: str = 'forward')[source]

Bases: catalyst.core.callback.Callback

Callback for model tracing.

Parameters
  • input_key – input key from runner.batch to use for model tracing

  • logdir – path to folder for saving

  • filename – filename

  • method_name – Model’s method name that will be used as entrypoint during tracing

Example

import os

import torch
from torch import nn
from torch.utils.data import DataLoader

from catalyst import dl
from catalyst.data.transforms import ToTensor
from catalyst.contrib.datasets import MNIST
from catalyst.contrib.nn.modules import Flatten

loaders = {
    "train": DataLoader(
        MNIST(
            os.getcwd(), train=False, download=True, transform=ToTensor()
        ),
        batch_size=32,
    ),
    "valid": DataLoader(
        MNIST(
            os.getcwd(), train=False, download=True, transform=ToTensor()
        ),
        batch_size=32,
    ),
}

model = nn.Sequential(
    Flatten(), nn.Linear(784, 512), nn.ReLU(), nn.Linear(512, 10)
)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-2)
runner = dl.SupervisedRunner()
runner.train(
    model=model,
    callbacks=[dl.TracingCallback(input_key="features", logdir="./logs")],
    loaders=loaders,
    criterion=criterion,
    optimizer=optimizer,
    num_epochs=1,
    logdir="./logs",
)
__init__(input_key: Union[str, List[str]], logdir: Optional[Union[str, pathlib.Path]] = None, filename: str = 'traced_model.pth', method_name: str = 'forward')[source]

Callback for model tracing.

Parameters
  • input_key – input key from runner.batch to use for model tracing

  • logdir – path to folder for saving

  • filename – filename

  • method_name – Model’s method name that will be used as entrypoint during tracing

Example

import os

import torch
from torch import nn
from torch.utils.data import DataLoader

from catalyst import dl
from catalyst.data.transforms import ToTensor
from catalyst.contrib.datasets import MNIST
from catalyst.contrib.nn.modules import Flatten

loaders = {
    "train": DataLoader(
        MNIST(
            os.getcwd(), train=False, download=True, transform=ToTensor()
        ),
        batch_size=32,
    ),
    "valid": DataLoader(
        MNIST(
            os.getcwd(), train=False, download=True, transform=ToTensor()
        ),
        batch_size=32,
    ),
}

model = nn.Sequential(
    Flatten(), nn.Linear(784, 512), nn.ReLU(), nn.Linear(512, 10)
)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-2)
runner = dl.SupervisedRunner()
runner.train(
    model=model,
    callbacks=[dl.TracingCallback(input_key="features", logdir="./logs")],
    loaders=loaders,
    criterion=criterion,
    optimizer=optimizer,
    num_epochs=1,
    logdir="./logs",
)

Metric

Accuracy - AccuracyCallback

class catalyst.callbacks.metrics.accuracy.AccuracyCallback(input_key: str, target_key: str, topk_args: Optional[List[int]] = None, num_classes: Optional[int] = None, log_on_batch: bool = True, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Bases: catalyst.callbacks.metric.BatchMetricCallback

Accuracy metric callback. Computes multiclass accuracy@topk for the specified values of topk.

Parameters
  • input_key – input key to use for metric calculation, specifies our y_pred

  • target_key – output key to use for metric calculation, specifies our y_true

  • topk_args – specifies which accuracy@K to log: [1] - accuracy [1, 3] - accuracy at 1 and 3 [1, 3, 5] - accuracy at 1, 3 and 5

  • num_classes – number of classes used to calculate the default topk_args if topk_args is None

  • log_on_batch – boolean flag to log computed metrics every batch

  • prefix – metric prefix

  • suffix – metric suffix

Examples:

import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl

# sample data
num_samples, num_features, num_classes = int(1e4), int(1e1), 4
X = torch.rand(num_samples, num_features)
y = (torch.rand(num_samples,) * num_classes).to(torch.int64)

# pytorch loaders
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

# model, criterion, optimizer, scheduler
model = torch.nn.Linear(num_features, num_classes)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [2])

# model training
runner = dl.SupervisedRunner(
    input_key="features", output_key="logits", target_key="targets", loss_key="loss"
)
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    logdir="./logdir",
    num_epochs=3,
    valid_loader="valid",
    valid_metric="accuracy03",
    minimize_valid_metric=False,
    verbose=True,
    callbacks=[
        dl.AccuracyCallback(
            input_key="logits", target_key="targets", num_classes=num_classes
        ),
        dl.PrecisionRecallF1SupportCallback(
            input_key="logits", target_key="targets", num_classes=num_classes
        ),
        dl.AUCCallback(input_key="logits", target_key="targets"),
    ],
)

Note

Please follow the minimal examples sections for more use cases.

__init__(input_key: str, target_key: str, topk_args: Optional[List[int]] = None, num_classes: Optional[int] = None, log_on_batch: bool = True, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Init.

Accuracy - MultilabelAccuracyCallback

class catalyst.callbacks.metrics.accuracy.MultilabelAccuracyCallback(input_key: str, target_key: str, threshold: Union[float, torch.Tensor] = 0.5, log_on_batch: bool = True, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Bases: catalyst.callbacks.metric.BatchMetricCallback

Multilabel accuracy metric callback. Computes multilabel accuracy for the specified threshold.

Parameters
  • input_key – input key to use for metric calculation, specifies our y_pred

  • target_key – output key to use for metric calculation, specifies our y_true

  • threshold – thresholds for model scores

  • log_on_batch – boolean flag to log computed metrics every batch

  • prefix – metric prefix

  • suffix – metric suffix

Examples:

import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl

# sample data
num_samples, num_features, num_classes = int(1e4), int(1e1), 4
X = torch.rand(num_samples, num_features)
y = (torch.rand(num_samples, num_classes) > 0.5).to(torch.float32)

# pytorch loaders
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

# model, criterion, optimizer, scheduler
model = torch.nn.Linear(num_features, num_classes)
criterion = torch.nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [2])

# model training
runner = dl.SupervisedRunner(
    input_key="features", output_key="logits", target_key="targets", loss_key="loss"
)
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    logdir="./logdir",
    num_epochs=3,
    valid_loader="valid",
    valid_metric="accuracy",
    minimize_valid_metric=False,
    verbose=True,
    callbacks=[
        dl.AUCCallback(input_key="logits", target_key="targets"),
        dl.MultilabelAccuracyCallback(
            input_key="logits", target_key="targets", threshold=0.5
        )
    ]
)

Note

Please follow the minimal examples sections for more use cases.

__init__(input_key: str, target_key: str, threshold: Union[float, torch.Tensor] = 0.5, log_on_batch: bool = True, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Init.

AUCCallback

class catalyst.callbacks.metrics.auc.AUCCallback(input_key: str, target_key: str, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Bases: catalyst.callbacks.metric.LoaderMetricCallback

ROC-AUC metric callback.

Parameters
  • input_key – input key to use for auc calculation, specifies our y_pred.

  • target_key – output key to use for auc calculation, specifies our y_true.

  • prefix – metric prefix

  • suffix – metric suffix

Examples:

import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl

# sample data
num_samples, num_features, num_classes = int(1e4), int(1e1), 4
X = torch.rand(num_samples, num_features)
y = (torch.rand(num_samples,) * num_classes).to(torch.int64)

# pytorch loaders
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

# model, criterion, optimizer, scheduler
model = torch.nn.Linear(num_features, num_classes)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [2])

# model training
runner = dl.SupervisedRunner(
    input_key="features", output_key="logits", target_key="targets", loss_key="loss"
)
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    logdir="./logdir",
    num_epochs=3,
    valid_loader="valid",
    valid_metric="accuracy03",
    minimize_valid_metric=False,
    verbose=True,
    callbacks=[
        dl.AccuracyCallback(
            input_key="logits", target_key="targets", num_classes=num_classes
        ),
        dl.PrecisionRecallF1SupportCallback(
            input_key="logits", target_key="targets", num_classes=num_classes
        ),
        dl.AUCCallback(input_key="logits", target_key="targets"),
    ],
)

Note

Please follow the minimal examples sections for more use cases.

__init__(input_key: str, target_key: str, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Init.

Classification – PrecisionRecallF1SupportCallback

class catalyst.callbacks.metrics.classification.PrecisionRecallF1SupportCallback(input_key: str, target_key: str, num_classes: int, zero_division: int = 0, log_on_batch: bool = True, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Bases: catalyst.callbacks.metric.BatchMetricCallback

Multiclass PrecisionRecallF1Support metric callback.

Parameters
  • input_key – input key to use for metric calculation, specifies our y_pred

  • target_key – output key to use for metric calculation, specifies our y_true

  • num_classes – number of classes

  • zero_division – value to set in case of zero division during metrics (precision, recall) computation; should be one of 0 or 1

  • log_on_batch – boolean flag to log computed metrics every batch

  • prefix – metric prefix

  • suffix – metric suffix

Examples:

import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl

# sample data
num_samples, num_features, num_classes = int(1e4), int(1e1), 4
X = torch.rand(num_samples, num_features)
y = (torch.rand(num_samples,) * num_classes).to(torch.int64)

# pytorch loaders
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

# model, criterion, optimizer, scheduler
model = torch.nn.Linear(num_features, num_classes)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [2])

# model training
runner = dl.SupervisedRunner(
    input_key="features", output_key="logits", target_key="targets", loss_key="loss"
)
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    logdir="./logdir",
    num_epochs=3,
    valid_loader="valid",
    valid_metric="accuracy03",
    minimize_valid_metric=False,
    verbose=True,
    callbacks=[
        dl.AccuracyCallback(
            input_key="logits", target_key="targets", num_classes=num_classes
        ),
        dl.PrecisionRecallF1SupportCallback(
            input_key="logits", target_key="targets", num_classes=num_classes
        ),
        dl.AUCCallback(input_key="logits", target_key="targets"),
    ],
)

Note

Please follow the minimal examples sections for more use cases.

__init__(input_key: str, target_key: str, num_classes: int, zero_division: int = 0, log_on_batch: bool = True, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Init.

Classification – MultilabelPrecisionRecallF1SupportCallback

class catalyst.callbacks.metrics.classification.MultilabelPrecisionRecallF1SupportCallback(input_key: str, target_key: str, num_classes: int, zero_division: int = 0, log_on_batch: bool = True, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Bases: catalyst.callbacks.metric.BatchMetricCallback

Multilabel PrecisionRecallF1Support metric callback.

Parameters
  • input_key – input key to use for metric calculation, specifies our y_pred

  • target_key – output key to use for metric calculation, specifies our y_true

  • num_classes – number of classes

  • zero_division – value to set in case of zero division during metrics (precision, recall) computation; should be one of 0 or 1

  • log_on_batch – boolean flag to log computed metrics every batch

  • prefix – metric prefix

  • suffix – metric suffix

Examples:

import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl

# sample data
num_samples, num_features, num_classes = int(1e4), int(1e1), 4
X = torch.rand(num_samples, num_features)
y = (torch.rand(num_samples, num_classes) > 0.5).to(torch.float32)

# pytorch loaders
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

# model, criterion, optimizer, scheduler
model = torch.nn.Linear(num_features, num_classes)
criterion = torch.nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [2])

# model training
runner = dl.SupervisedRunner(
    input_key="features", output_key="logits", target_key="targets", loss_key="loss"
)
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    logdir="./logdir",
    num_epochs=3,
    valid_loader="valid",
    valid_metric="accuracy",
    minimize_valid_metric=False,
    verbose=True,
    callbacks=[
        dl.BatchTransformCallback(
            transform=torch.sigmoid,
            scope="on_batch_end",
            input_key="logits",
            output_key="scores"
        ),
        dl.AUCCallback(input_key="scores", target_key="targets"),
        dl.MultilabelAccuracyCallback(
            input_key="scores", target_key="targets", threshold=0.5
        ),
        dl.MultilabelPrecisionRecallF1SupportCallback(
            input_key="scores", target_key="targets", num_classes=num_classes
        ),
    ]
)

Note

Please follow the minimal examples sections for more use cases.

__init__(input_key: str, target_key: str, num_classes: int, zero_division: int = 0, log_on_batch: bool = True, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Init.

CMCScoreCallback

class catalyst.callbacks.metrics.cmc_score.CMCScoreCallback(embeddings_key: str, labels_key: str, is_query_key: str, topk_args: Optional[List[int]] = None, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Bases: catalyst.callbacks.metric.LoaderMetricCallback

Cumulative Matching Characteristics callback.

This callback was designed to count cumulative matching characteristics. If the current object is from the query set, your dataset should output True in is_query_key, and False if the current object is from the gallery. You can see QueryGalleryDataset in catalyst.contrib.datasets.metric_learning for more information. On batch end, the callback accumulates all embeddings.

Parameters
  • embeddings_key – embeddings key in output dict

  • labels_key – labels key in output dict

  • is_query_key – bool key True if current object is from query

  • topk_args – specifies which cmc@K to log. [1] - cmc@1 [1, 3] - cmc@1 and cmc@3 [1, 3, 5] - cmc@1, cmc@3 and cmc@5

  • prefix – metric prefix

  • suffix – metric suffix

Note

You should use it with ControlFlowCallback and add all query/gallery sets to loaders. Loaders should contain “is_query” and “label” keys.

Examples:

import os
from torch.optim import Adam
from torch.utils.data import DataLoader
from catalyst import data, dl
from catalyst.contrib import datasets, models, nn
from catalyst.data.transforms import Compose, Normalize, ToTensor


# 1. train and valid loaders
transforms = Compose([ToTensor(), Normalize((0.1307,), (0.3081,))])

train_dataset = datasets.MnistMLDataset(
    root=os.getcwd(), download=True, transform=transforms
    )
sampler = data.BalanceBatchSampler(labels=train_dataset.get_labels(), p=5, k=10)
train_loader = DataLoader(
    dataset=train_dataset, sampler=sampler, batch_size=sampler.batch_size
    )

valid_dataset = datasets.MnistQGDataset(
    root=os.getcwd(), transform=transforms, gallery_fraq=0.2
)
valid_loader = DataLoader(dataset=valid_dataset, batch_size=1024)

# 2. model and optimizer
model = models.MnistSimpleNet(out_features=16)
optimizer = Adam(model.parameters(), lr=0.001)

# 3. criterion with triplets sampling
sampler_inbatch = data.HardTripletsSampler(norm_required=False)
criterion = nn.TripletMarginLossWithSampler(margin=0.5, sampler_inbatch=sampler_inbatch)

# 4. training with catalyst Runner
class CustomRunner(dl.SupervisedRunner):
    """Runner that turns image batches into embeddings for metric learning."""

    def handle_batch(self, batch) -> None:
        """Run the model on one batch and store the result in ``self.batch``.

        Batches from the train loader produce ``embeddings`` and ``targets``.
        Batches from any other loader additionally pass through the
        ``is_query`` flag read from the input batch.
        """
        # Both branches need the same image/target conversions.
        images = batch["features"].float()
        targets = batch["targets"].long()

        if self.is_train_loader:
            features = self.model(images)
            self.batch = {"embeddings": features, "targets": targets}
        else:
            is_query = batch["is_query"].bool()
            features = self.model(images)
            self.batch = {
                "embeddings": features, "targets": targets, "is_query": is_query
            }

callbacks = [
    dl.ControlFlowCallback(
        dl.CriterionCallback(
            input_key="embeddings", target_key="targets", metric_key="loss"
        ),
        loaders="train",
    ),
    dl.ControlFlowCallback(
        dl.CMCScoreCallback(
            embeddings_key="embeddings",
            labels_key="targets",
            is_query_key="is_query",
            topk_args=[1],
        ),
        loaders="valid",
    ),
    dl.PeriodicLoaderCallback(
        valid_loader_key="valid", valid_metric_key="cmc01", minimize=False, valid=2
    ),
]

runner = CustomRunner(input_key="features", output_key="embeddings")
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    callbacks=callbacks,
    loaders={"train": train_loader, "valid": valid_loader},
    verbose=False,
    logdir="./logs",
    valid_loader="valid",
    valid_metric="cmc01",
    minimize_valid_metric=False,
    num_epochs=10,
)

Note

Please follow the minimal examples sections for more use cases.

__init__(embeddings_key: str, labels_key: str, is_query_key: str, topk_args: Optional[List[int]] = None, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Init.

ReidCMCScoreCallback

class catalyst.callbacks.metrics.cmc_score.ReidCMCScoreCallback(embeddings_key: str, pids_key: str, cids_key: str, is_query_key: str, topk_args: Optional[List[int]] = None, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Bases: catalyst.callbacks.metric.LoaderMetricCallback

Cumulative Matching Characteristics callback for the reID case. See CMCScoreCallback’s docs for more information about cmc-based callbacks.

Parameters
  • embeddings_key – embeddings key in output dict

  • pids_key – pids key in output dict

  • cids_key – cids key in output dict

  • is_query_key – bool key True if current object is from query

  • topk_args – specifies which cmc@K to log: [1] - cmc@1; [1, 3] - cmc@1 and cmc@3; [1, 3, 5] - cmc@1, cmc@3 and cmc@5

  • prefix – metric prefix

  • suffix – metric suffix

__init__(embeddings_key: str, pids_key: str, cids_key: str, is_query_key: str, topk_args: Optional[List[int]] = None, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Init.

ConfusionMatrixCallback

class catalyst.callbacks.metrics.confusion_matrix.ConfusionMatrixCallback(input_key: str, target_key: str, prefix: Optional[str] = None, class_names: Optional[List[str]] = None, num_classes: Optional[int] = None, normalized: bool = False, plot_params: Optional[Dict] = None)[source]

Bases: catalyst.core.callback.Callback

Callback to plot your confusion matrix to the loggers.

Parameters
  • input_key – key to use from runner.batch, specifies our y_pred

  • target_key – key to use from runner.batch, specifies our y_true

  • prefix – plot name for monitoring tools

  • class_names – list with class names

  • num_classes – number of classes

  • normalized – boolean flag for confusion matrix normalization

  • plot_params – extra params for plt.figure rendering

Note

catalyst[ml] is required for this callback.

Examples:

import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl

# sample data
num_samples, num_features, num_classes = int(1e4), int(1e1), 4
X = torch.rand(num_samples, num_features)
y = (torch.rand(num_samples,) * num_classes).to(torch.int64)

# pytorch loaders
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

# model, criterion, optimizer, scheduler
model = torch.nn.Linear(num_features, num_classes)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [2])

# model training
runner = dl.SupervisedRunner(
    input_key="features", output_key="logits", target_key="targets", loss_key="loss"
)
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    logdir="./logdir",
    num_epochs=3,
    valid_loader="valid",
    valid_metric="accuracy03",
    minimize_valid_metric=False,
    verbose=True,
    callbacks=[
        dl.AccuracyCallback(
            input_key="logits", target_key="targets", num_classes=num_classes
        ),
        dl.PrecisionRecallF1SupportCallback(
            input_key="logits", target_key="targets", num_classes=num_classes
        ),
        dl.AUCCallback(input_key="logits", target_key="targets"),
        dl.ConfusionMatrixCallback(
            input_key="logits", target_key="targets", num_classes=num_classes
        ),
    ],
)

Note

Please follow the minimal examples sections for more use cases.

__init__(input_key: str, target_key: str, prefix: Optional[str] = None, class_names: Optional[List[str]] = None, num_classes: Optional[int] = None, normalized: bool = False, plot_params: Optional[Dict] = None)[source]

Callback initialisation.

FunctionalMetricCallback

class catalyst.callbacks.metrics.functional_metric.FunctionalMetricCallback(input_key: Union[str, Iterable[str], Dict[str, str]], target_key: Union[str, Iterable[str], Dict[str, str]], metric_fn: Callable, metric_key: str, compute_on_call: bool = True, log_on_batch: bool = True, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Bases: catalyst.callbacks.metric.FunctionalBatchMetricCallback

Parameters
  • input_key – input key to use for metric calculation, specifies our y_pred

  • target_key – output key to use for metric calculation, specifies our y_true

  • metric_fn – metric function that takes outputs and targets and returns the score as a torch.Tensor

  • metric_key – key to store computed metric in runner.batch_metrics dictionary

  • compute_on_call – Computes and returns metric value during metric call. Used for per-batch logging. default: True

  • log_on_batch – boolean flag to log computed metrics every batch

  • prefix – metric prefix

  • suffix – metric suffix

__init__(input_key: Union[str, Iterable[str], Dict[str, str]], target_key: Union[str, Iterable[str], Dict[str, str]], metric_fn: Callable, metric_key: str, compute_on_call: bool = True, log_on_batch: bool = True, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Init.

RecSys – HitrateCallback

class catalyst.callbacks.metrics.recsys.HitrateCallback(input_key: str, target_key: str, topk_args: Optional[List[int]] = None, log_on_batch: bool = True, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Bases: catalyst.callbacks.metric.BatchMetricCallback

Hitrate metric callback. Computes HR@topk for the specified values of topk.

Parameters
  • input_key – input key to use for metric calculation, specifies our y_pred

  • target_key – output key to use for metric calculation, specifies our y_true

  • topk_args – specifies which HR@K to log: [1] - HR at 1; [1, 3] - HR at 1 and 3; [1, 3, 5] - HR at 1, 3 and 5

  • log_on_batch – boolean flag to log computed metrics every batch

  • prefix – metric prefix

  • suffix – metric suffix

Examples:

import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl

# sample data
num_users, num_features, num_items = int(1e4), int(1e1), 10
X = torch.rand(num_users, num_features)
y = (torch.rand(num_users, num_items) > 0.5).to(torch.float32)

# pytorch loaders
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

# model, criterion, optimizer, scheduler
model = torch.nn.Linear(num_features, num_items)
criterion = torch.nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [2])

# model training
runner = dl.SupervisedRunner(
    input_key="features", output_key="logits", target_key="targets", loss_key="loss"
)
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    num_epochs=3,
    verbose=True,
    callbacks=[
        dl.BatchTransformCallback(
            transform=torch.sigmoid,
            scope="on_batch_end",
            input_key="logits",
            output_key="scores"
        ),
        dl.CriterionCallback(
            input_key="logits", target_key="targets", metric_key="loss"
        ),
        dl.AUCCallback(input_key="scores", target_key="targets"),
        dl.HitrateCallback(
            input_key="scores", target_key="targets", topk_args=(1, 3, 5)
        ),
        dl.MRRCallback(input_key="scores", target_key="targets", topk_args=(1, 3, 5)),
        dl.MAPCallback(input_key="scores", target_key="targets", topk_args=(1, 3, 5)),
        dl.NDCGCallback(input_key="scores", target_key="targets", topk_args=(1, 3, 5)),
        dl.OptimizerCallback(metric_key="loss"),
        dl.SchedulerCallback(),
        dl.CheckpointCallback(
            logdir="./logs", loader_key="valid", metric_key="loss", minimize=True
        ),
    ]
)

Note

Please follow the minimal examples sections for more use cases.

__init__(input_key: str, target_key: str, topk_args: Optional[List[int]] = None, log_on_batch: bool = True, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Init.

RecSys – MAPCallback

class catalyst.callbacks.metrics.recsys.MAPCallback(input_key: str, target_key: str, topk_args: Optional[List[int]] = None, log_on_batch: bool = True, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Bases: catalyst.callbacks.metric.BatchMetricCallback

MAP metric callback. Computes MAP@topk for the specified values of topk.

Parameters
  • input_key – input key to use for metric calculation, specifies our y_pred

  • target_key – output key to use for metric calculation, specifies our y_true

  • prefix – key for the metric’s name

  • topk_args – specifies which MAP@K to log: [1] - MAP at 1; [1, 3] - MAP at 1 and 3; [1, 3, 5] - MAP at 1, 3 and 5

  • log_on_batch – boolean flag to log computed metrics every batch

  • prefix – metric prefix

  • suffix – metric suffix

Examples:

import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl

# sample data
num_users, num_features, num_items = int(1e4), int(1e1), 10
X = torch.rand(num_users, num_features)
y = (torch.rand(num_users, num_items) > 0.5).to(torch.float32)

# pytorch loaders
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

# model, criterion, optimizer, scheduler
model = torch.nn.Linear(num_features, num_items)
criterion = torch.nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [2])

# model training
runner = dl.SupervisedRunner(
    input_key="features", output_key="logits", target_key="targets", loss_key="loss"
)
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    num_epochs=3,
    verbose=True,
    callbacks=[
        dl.BatchTransformCallback(
            transform=torch.sigmoid,
            scope="on_batch_end",
            input_key="logits",
            output_key="scores"
        ),
        dl.CriterionCallback(
            input_key="logits", target_key="targets", metric_key="loss"
        ),
        dl.AUCCallback(input_key="scores", target_key="targets"),
        dl.HitrateCallback(
            input_key="scores", target_key="targets", topk_args=(1, 3, 5)
        ),
        dl.MRRCallback(input_key="scores", target_key="targets", topk_args=(1, 3, 5)),
        dl.MAPCallback(input_key="scores", target_key="targets", topk_args=(1, 3, 5)),
        dl.NDCGCallback(input_key="scores", target_key="targets", topk_args=(1, 3, 5)),
        dl.OptimizerCallback(metric_key="loss"),
        dl.SchedulerCallback(),
        dl.CheckpointCallback(
            logdir="./logs", loader_key="valid", metric_key="loss", minimize=True
        ),
    ]
)

Note

Please follow the minimal examples sections for more use cases.

__init__(input_key: str, target_key: str, topk_args: Optional[List[int]] = None, log_on_batch: bool = True, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Init.

RecSys – MRRCallback

class catalyst.callbacks.metrics.recsys.MRRCallback(input_key: str, target_key: str, topk_args: Optional[List[int]] = None, log_on_batch: bool = True, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Bases: catalyst.callbacks.metric.BatchMetricCallback

MRR metric callback. Computes MRR@topk for the specified values of topk.

Parameters
  • input_key – input key to use for metric calculation, specifies our y_pred

  • target_key – output key to use for metric calculation, specifies our y_true

  • prefix – key for the metric’s name

  • topk_args – specifies which MRR@K to log: [1] - MRR at 1; [1, 3] - MRR at 1 and 3; [1, 3, 5] - MRR at 1, 3 and 5

  • log_on_batch – boolean flag to log computed metrics every batch

  • prefix – metric prefix

  • suffix – metric suffix

Examples:

import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl

# sample data
num_users, num_features, num_items = int(1e4), int(1e1), 10
X = torch.rand(num_users, num_features)
y = (torch.rand(num_users, num_items) > 0.5).to(torch.float32)

# pytorch loaders
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

# model, criterion, optimizer, scheduler
model = torch.nn.Linear(num_features, num_items)
criterion = torch.nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [2])

# model training
runner = dl.SupervisedRunner(
    input_key="features", output_key="logits", target_key="targets", loss_key="loss"
)
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    num_epochs=3,
    verbose=True,
    callbacks=[
        dl.BatchTransformCallback(
            transform=torch.sigmoid,
            scope="on_batch_end",
            input_key="logits",
            output_key="scores"
        ),
        dl.CriterionCallback(
            input_key="logits", target_key="targets", metric_key="loss"
        ),
        dl.AUCCallback(input_key="scores", target_key="targets"),
        dl.HitrateCallback(
            input_key="scores", target_key="targets", topk_args=(1, 3, 5)
        ),
        dl.MRRCallback(input_key="scores", target_key="targets", topk_args=(1, 3, 5)),
        dl.MAPCallback(input_key="scores", target_key="targets", topk_args=(1, 3, 5)),
        dl.NDCGCallback(input_key="scores", target_key="targets", topk_args=(1, 3, 5)),
        dl.OptimizerCallback(metric_key="loss"),
        dl.SchedulerCallback(),
        dl.CheckpointCallback(
            logdir="./logs", loader_key="valid", metric_key="loss", minimize=True
        ),
    ]
)

Note

Please follow the minimal examples sections for more use cases.

__init__(input_key: str, target_key: str, topk_args: Optional[List[int]] = None, log_on_batch: bool = True, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Init.

RecSys – NDCGCallback

class catalyst.callbacks.metrics.recsys.NDCGCallback(input_key: str, target_key: str, topk_args: Optional[List[int]] = None, log_on_batch: bool = True, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Bases: catalyst.callbacks.metric.BatchMetricCallback

NDCG metric callback. Computes NDCG@topk for the specified values of topk.

Parameters
  • input_key – input key to use for metric calculation, specifies our y_pred

  • target_key – output key to use for metric calculation, specifies our y_true

  • prefix – key for the metric’s name

  • topk_args – specifies which NDCG@K to log: [1] - NDCG at 1; [1, 3] - NDCG at 1 and 3; [1, 3, 5] - NDCG at 1, 3 and 5

  • log_on_batch – boolean flag to log computed metrics every batch

  • prefix – metric prefix

  • suffix – metric suffix

Examples:

import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl

# sample data
num_users, num_features, num_items = int(1e4), int(1e1), 10
X = torch.rand(num_users, num_features)
y = (torch.rand(num_users, num_items) > 0.5).to(torch.float32)

# pytorch loaders
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

# model, criterion, optimizer, scheduler
model = torch.nn.Linear(num_features, num_items)
criterion = torch.nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [2])

# model training
runner = dl.SupervisedRunner(
    input_key="features", output_key="logits", target_key="targets", loss_key="loss"
)
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    num_epochs=3,
    verbose=True,
    callbacks=[
        dl.BatchTransformCallback(
            transform=torch.sigmoid,
            scope="on_batch_end",
            input_key="logits",
            output_key="scores"
        ),
        dl.CriterionCallback(
            input_key="logits", target_key="targets", metric_key="loss"
        ),
        dl.AUCCallback(input_key="scores", target_key="targets"),
        dl.HitrateCallback(
            input_key="scores", target_key="targets", topk_args=(1, 3, 5)
        ),
        dl.MRRCallback(input_key="scores", target_key="targets", topk_args=(1, 3, 5)),
        dl.MAPCallback(input_key="scores", target_key="targets", topk_args=(1, 3, 5)),
        dl.NDCGCallback(input_key="scores", target_key="targets", topk_args=(1, 3, 5)),
        dl.OptimizerCallback(metric_key="loss"),
        dl.SchedulerCallback(),
        dl.CheckpointCallback(
            logdir="./logs", loader_key="valid", metric_key="loss", minimize=True
        ),
    ]
)

Note

Please follow the minimal examples sections for more use cases.

__init__(input_key: str, target_key: str, topk_args: Optional[List[int]] = None, log_on_batch: bool = True, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Init.

Segmentation – DiceCallback

class catalyst.callbacks.metrics.segmentation.DiceCallback(input_key: str, target_key: str, class_dim: int = 1, weights: Optional[List[float]] = None, class_names: Optional[List[str]] = None, threshold: Optional[float] = None, log_on_batch: bool = True, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Bases: catalyst.callbacks.metric.BatchMetricCallback

Dice metric callback.

Parameters
  • input_key – input key to use for metric calculation, specifies our y_pred

  • target_key – output key to use for metric calculation, specifies our y_true

  • class_dim – indicates class dimension (K) for outputs and targets tensors (default = 1)

  • weights – class weights

  • class_names – class names

  • threshold – threshold for outputs binarization

  • log_on_batch – boolean flag to log computed metrics every batch

  • prefix – metric prefix

  • suffix – metric suffix

Examples:

import os
import torch
from torch import nn
from torch.utils.data import DataLoader
from catalyst import dl
from catalyst.data.transforms import ToTensor
from catalyst.contrib.datasets import MNIST
from catalyst.contrib.nn import IoULoss


model = nn.Sequential(
    nn.Conv2d(1, 1, 3, 1, 1), nn.ReLU(),
    nn.Conv2d(1, 1, 3, 1, 1), nn.Sigmoid(),
)
criterion = IoULoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.02)

loaders = {
    "train": DataLoader(
        MNIST(os.getcwd(), train=True, download=True, transform=ToTensor()),
        batch_size=32
    ),
    "valid": DataLoader(
        MNIST(os.getcwd(), train=False, download=True, transform=ToTensor()),
        batch_size=32
    ),
}

class CustomRunner(dl.SupervisedRunner):
    """Runner that feeds the model a noisy input and targets the clean one."""

    def handle_batch(self, batch):
        """Add random noise to the input, run the model, and fill ``self.batch``."""
        clean = batch[self._input_key]
        # Additive noise from torch.rand_like, clipped back into [0, 1].
        noisy = torch.clamp(clean + torch.rand_like(clean), min=0, max=1)
        restored = self.model(noisy)
        # The clean tensor serves as both the stored input and the target.
        self.batch = {
            self._input_key: clean,
            self._output_key: restored,
            self._target_key: clean,
        }

runner = CustomRunner(
    input_key="features", output_key="scores", target_key="targets", loss_key="loss"
)
# model training
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    loaders=loaders,
    num_epochs=1,
    callbacks=[
        dl.IOUCallback(input_key="scores", target_key="targets"),
        dl.DiceCallback(input_key="scores", target_key="targets"),
        dl.TrevskyCallback(input_key="scores", target_key="targets", alpha=0.2),
    ],
    logdir="./logdir",
    valid_loader="valid",
    valid_metric="loss",
    minimize_valid_metric=True,
    verbose=True,
)

Note

Please follow the minimal examples sections for more use cases.

__init__(input_key: str, target_key: str, class_dim: int = 1, weights: Optional[List[float]] = None, class_names: Optional[List[str]] = None, threshold: Optional[float] = None, log_on_batch: bool = True, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Init.

Segmentation – IOUCallback

class catalyst.callbacks.metrics.segmentation.IOUCallback(input_key: str, target_key: str, class_dim: int = 1, weights: Optional[List[float]] = None, class_names: Optional[List[str]] = None, threshold: Optional[float] = None, log_on_batch: bool = True, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Bases: catalyst.callbacks.metric.BatchMetricCallback

IOU metric callback.

Parameters
  • input_key – input key to use for metric calculation, specifies our y_pred

  • target_key – output key to use for metric calculation, specifies our y_true

  • class_dim – indicates class dimension (K) for outputs and targets tensors (default = 1)

  • weights – class weights

  • class_names – class names

  • threshold – threshold for outputs binarization

  • log_on_batch – boolean flag to log computed metrics every batch

  • prefix – metric prefix

  • suffix – metric suffix

Examples:

import os
import torch
from torch import nn
from torch.utils.data import DataLoader
from catalyst import dl
from catalyst.data.transforms import ToTensor
from catalyst.contrib.datasets import MNIST
from catalyst.contrib.nn import IoULoss


model = nn.Sequential(
    nn.Conv2d(1, 1, 3, 1, 1), nn.ReLU(),
    nn.Conv2d(1, 1, 3, 1, 1), nn.Sigmoid(),
)
criterion = IoULoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.02)

loaders = {
    "train": DataLoader(
        MNIST(os.getcwd(), train=True, download=True, transform=ToTensor()),
        batch_size=32
    ),
    "valid": DataLoader(
        MNIST(os.getcwd(), train=False, download=True, transform=ToTensor()),
        batch_size=32
    ),
}

class CustomRunner(dl.SupervisedRunner):
    """Runner that feeds the model a noisy input and targets the clean one."""

    def handle_batch(self, batch):
        """Add random noise to the input, run the model, and fill ``self.batch``."""
        clean = batch[self._input_key]
        # Additive noise from torch.rand_like, clipped back into [0, 1].
        noisy = torch.clamp(clean + torch.rand_like(clean), min=0, max=1)
        restored = self.model(noisy)
        # The clean tensor serves as both the stored input and the target.
        self.batch = {
            self._input_key: clean,
            self._output_key: restored,
            self._target_key: clean,
        }

runner = CustomRunner(
    input_key="features", output_key="scores", target_key="targets", loss_key="loss"
)
# model training
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    loaders=loaders,
    num_epochs=1,
    callbacks=[
        dl.IOUCallback(input_key="scores", target_key="targets"),
        dl.DiceCallback(input_key="scores", target_key="targets"),
        dl.TrevskyCallback(input_key="scores", target_key="targets", alpha=0.2),
    ],
    logdir="./logdir",
    valid_loader="valid",
    valid_metric="loss",
    minimize_valid_metric=True,
    verbose=True,
)

Note

Please follow the minimal examples sections for more use cases.

__init__(input_key: str, target_key: str, class_dim: int = 1, weights: Optional[List[float]] = None, class_names: Optional[List[str]] = None, threshold: Optional[float] = None, log_on_batch: bool = True, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Init.

Segmentation – TrevskyCallback

class catalyst.callbacks.metrics.segmentation.TrevskyCallback(input_key: str, target_key: str, alpha: float, beta: Optional[float] = None, class_dim: int = 1, weights: Optional[List[float]] = None, class_names: Optional[List[str]] = None, threshold: Optional[float] = None, log_on_batch: bool = True, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Bases: catalyst.callbacks.metric.BatchMetricCallback

Trevsky metric callback.

Parameters
  • input_key – input key to use for metric calculation, specifies our y_pred

  • target_key – output key to use for metric calculation, specifies our y_true

  • alpha – false negative coefficient; the bigger alpha, the bigger the penalty for false negatives. If beta is None, alpha must be in (0, 1)

  • beta – false positive coefficient; the bigger beta, the bigger the penalty for false positives. Must be in (0, 1); if None, beta = (1 - alpha)

  • class_dim – indicates class dimension (K) for outputs and targets tensors (default = 1)

  • weights – class weights

  • class_names – class names

  • threshold – threshold for outputs binarization

  • log_on_batch – boolean flag to log computed metrics every batch

  • prefix – metric prefix

  • suffix – metric suffix

Examples:

import os
import torch
from torch import nn
from torch.utils.data import DataLoader
from catalyst import dl
from catalyst.data.transforms import ToTensor
from catalyst.contrib.datasets import MNIST
from catalyst.contrib.nn import IoULoss


model = nn.Sequential(
    nn.Conv2d(1, 1, 3, 1, 1), nn.ReLU(),
    nn.Conv2d(1, 1, 3, 1, 1), nn.Sigmoid(),
)
criterion = IoULoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.02)

loaders = {
    "train": DataLoader(
        MNIST(os.getcwd(), train=True, download=True, transform=ToTensor()),
        batch_size=32
    ),
    "valid": DataLoader(
        MNIST(os.getcwd(), train=False, download=True, transform=ToTensor()),
        batch_size=32
    ),
}

class CustomRunner(dl.SupervisedRunner):
    """Runner that feeds the model a noisy input and targets the clean one."""

    def handle_batch(self, batch):
        """Add random noise to the input, run the model, and fill ``self.batch``."""
        clean = batch[self._input_key]
        # Additive noise from torch.rand_like, clipped back into [0, 1].
        noisy = torch.clamp(clean + torch.rand_like(clean), min=0, max=1)
        restored = self.model(noisy)
        # The clean tensor serves as both the stored input and the target.
        self.batch = {
            self._input_key: clean,
            self._output_key: restored,
            self._target_key: clean,
        }

runner = CustomRunner(
    input_key="features", output_key="scores", target_key="targets", loss_key="loss"
)
# model training
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    loaders=loaders,
    num_epochs=1,
    callbacks=[
        dl.IOUCallback(input_key="scores", target_key="targets"),
        dl.DiceCallback(input_key="scores", target_key="targets"),
        dl.TrevskyCallback(input_key="scores", target_key="targets", alpha=0.2),
    ],
    logdir="./logdir",
    valid_loader="valid",
    valid_metric="loss",
    minimize_valid_metric=True,
    verbose=True,
)

Note

Please follow the minimal examples sections for more use cases.

__init__(input_key: str, target_key: str, alpha: float, beta: Optional[float] = None, class_dim: int = 1, weights: Optional[List[float]] = None, class_names: Optional[List[str]] = None, threshold: Optional[float] = None, log_on_batch: bool = True, prefix: Optional[str] = None, suffix: Optional[str] = None)[source]

Init.