Shortcuts

Callbacks

Main

BatchOverfitCallback

class catalyst.callbacks.batch_overfit.BatchOverfitCallback(**kwargs)[source]

Bases: catalyst.core.callback.Callback

Callback for ovefitting loaders with specified number of batches. By default we use 1 batch for loader.

For example, if you have train, train_additional, valid and valid_additional loaders and wan’t to overfit train on first 1 batch, train_additional on first 2 batches, valid - on first 20% of batches and valid_additional - on 50% batches:

from catalyst.dl import (
    SupervisedRunner, BatchOverfitCallback,
)
runner = SupervisedRunner()
runner.train(
    ...
    loaders={
        "train": ...,
        "train_additional": ...,
        "valid": ...,
        "valid_additional":...
    }
    ...
    callbacks=[
        ...
        BatchOverfitCallback(
            train_additional=2,
            valid=0.2,
            valid_additional=0.5
        ),
        ...
    ]
    ...
)

Minimal working example

import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl

# data
num_samples, num_features = int(1e4), int(1e1)
X, y = torch.rand(num_samples, num_features), torch.rand(num_samples)
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

# model, criterion, optimizer, scheduler
model = torch.nn.Linear(num_features, 1)
criterion = torch.nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [3, 6])

# model training
runner = dl.SupervisedRunner()
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    logdir="./logdir",
    num_epochs=8,
    verbose=True,
    callbacks=[dl.BatchOverfitCallback(train=10, valid=0.5)]
)
__init__(**kwargs)[source]
Parameters

kwargs – loader names and their number of batches to overfit.

on_epoch_end(runner: IRunner)[source]

Unwraps loaders for current epoch.

Parameters

runner – current runner

on_epoch_start(runner: IRunner) → None[source]

Wraps loaders for current epoch. If number-of-batches for loader is not provided then the first batch from loader will be used for overfitting.

Parameters

runner – current runner

Checkpoint

class catalyst.callbacks.checkpoint.CheckpointCallback(save_n_best: int = 1, resume: str = None, resume_dir: str = None, metrics_filename: str = '_metrics.json', load_on_stage_start: Union[str, Dict[str, str]] = None, load_on_stage_end: Union[str, Dict[str, str]] = None)[source]

Bases: catalyst.callbacks.checkpoint.BaseCheckpointCallback

Checkpoint callback to save/restore your model/criterion/optimizer/scheduler.

__init__(save_n_best: int = 1, resume: str = None, resume_dir: str = None, metrics_filename: str = '_metrics.json', load_on_stage_start: Union[str, Dict[str, str]] = None, load_on_stage_end: Union[str, Dict[str, str]] = None)[source]
Parameters
  • save_n_best – number of best checkpoint to keep, if 0 then store only last state of model and load_on_stage_end should be one of last or last_full.

  • resume – path to checkpoint to load and initialize runner state

  • resume_dir – directory with checkpoints, if specified in combination with resume than resume checkpoint will be loaded from resume_dir

  • metrics_filename – filename to save metrics in checkpoint folder. Must ends on .json or .yml

  • load_on_stage_start (str or Dict[str, str]) –

    load specified state/model at stage start.

    If passed string then will be performed initialization from specified state (best/best_full/last/last_full) or checkpoint file.

    If passed dict then will be performed initialization only for specified parts - model, criterion, optimizer, scheduler.

    Example

    >>> # possible checkpoints to use:
    >>> #   "best"/"best_full"/"last"/"last_full"
    >>> #   or path to specific checkpoint
    >>> to_load = {
    >>>    "model": "path/to/checkpoint.pth",
    >>>    "criterion": "best",
    >>>    "optimizer": "last_full",
    >>>    "scheduler": "best_full",
    >>> }
    >>> CheckpointCallback(load_on_stage_start=to_load)
    

    All other keys instead of "model", "criterion", "optimizer" and "scheduler" will be ignored.

    If None or an empty dict (or dict without mentioned above keys) then no action is required at stage start and:

    • Config API - will be used best state of model

    • Notebook API - no action will be performed (will be used the last state)

    NOTE: Loading will be performed on all stages except first.

    NOTE: Criterion, optimizer and scheduler are optional keys and should be loaded from full checkpoint.

    Model state can be loaded from any checkpoint.

    When dict contains keys for model and some other part (for example {"model": "last", "optimizer": "last"}) and they match in prefix ("best" and "best_full") then will be loaded full checkpoint because it contains required states.

  • load_on_stage_end (str or Dict[str, str]) –

    load specified state/model at stage end.

    If passed string then will be performed initialization from specified state (best/best_full/last/last_full) or checkpoint file.

    If passed dict then will be performed initialization only for specified parts - model, criterion, optimizer, scheduler. Logic for dict is the same as for load_on_stage_start.

    If None then no action is required at stage end and will be used the last runner.

    NOTE: Loading will be performed always at stage end.

on_epoch_end(runner: IRunner) → None[source]

Collect and save checkpoint after epoch.

Parameters

runner – current runner

on_stage_end(runner: IRunner) → None[source]

Show information about best checkpoints during the stage and load model specified in load_on_stage_end.

Parameters

runner – current runner

on_stage_start(runner: IRunner) → None[source]

Setup model for stage.

Note

If CheckpointCallback initialized with resume (as path to checkpoint file) or resume (as filename) and resume_dir (as directory with file) then will be performed loading checkpoint.

Parameters

runner – current runner

process_checkpoint(logdir: Union[str, pathlib.Path], checkpoint: Dict, is_best: bool, main_metric: str = 'loss', minimize_metric: bool = True) → None[source]

Save checkpoint and metrics.

Parameters
  • logdir (str or Path object) – directory for storing checkpoints

  • checkpoint – dict with checkpoint data

  • is_best – indicator to save best checkpoint, if true then will be saved two additional checkpoints - best and best_full.

  • main_metric – metric to use for selecting the best model

  • minimize_metric – indicator for selecting best metric, if true then best metric will be the metric with the lowest value, otherwise with the greatest value.

process_metrics(last_valid_metrics: Dict[str, float]) → Dict[source]

Add last validation metrics to list of previous validation metrics and keep save_n_best metrics.

Parameters

last_valid_metrics – dict with metrics from last validation step.

Returns

processed metrics

Return type

OrderedDict

truncate_checkpoints(minimize_metric: bool) → None[source]

Keep save_n_best checkpoints based on main metric.

Parameters

minimize_metric – if True then keep save_n_best checkpoints with the lowest/highest values of the main metric.

class catalyst.callbacks.checkpoint.IterationCheckpointCallback(save_n_last: int = 1, period: int = 100, stage_restart: bool = True, metrics_filename: str = '_metrics_iter.json', load_on_stage_end: str = 'best_full')[source]

Bases: catalyst.callbacks.checkpoint.BaseCheckpointCallback

Iteration checkpoint callback to save your model/criterion/optimizer.

__init__(save_n_last: int = 1, period: int = 100, stage_restart: bool = True, metrics_filename: str = '_metrics_iter.json', load_on_stage_end: str = 'best_full')[source]
Parameters
  • save_n_last – number of last checkpoint to keep

  • period – save the checkpoint every period

  • stage_restart – restart counter every stage or not

  • metrics_filename – filename to save metrics in checkpoint folder. Must ends on .json or .yml

  • load_on_stage_end – name of the model to load at the end of the stage. You can use best, best_full (default) to load the best model according to validation metrics, or last last_full to use just the last one.

on_batch_end(runner: IRunner)[source]

Save checkpoint based on batches count.

Parameters

runner – current runner

on_stage_end(runner: IRunner)[source]

Load model specified in load_on_stage_end.

Parameters

runner – current runner

on_stage_start(runner: IRunner)[source]

Reset iterations counter.

Parameters

runner – current runner

process_checkpoint(logdir: Union[str, pathlib.Path], checkpoint: Dict, batch_metrics: Dict[str, float])[source]

Save checkpoint and metrics.

Parameters
  • logdir (str or Path object) – directory for storing checkpoints

  • checkpoint – dict with checkpoint data

  • batch_metrics – dict with metrics based on a few batches

process_metrics() → Dict[source]

Update metrics with last save_n_last checkpoints.

Returns

updated metrics

truncate_checkpoints(**kwargs) → None[source]

Keep save_n_best checkpoints based on main metric.

Parameters

**kwargs – extra params

class catalyst.callbacks.checkpoint.ICheckpointCallback(order: int, node: int = <CallbackNode.All: 0>, scope: int = <CallbackScope.Stage: 0>)[source]

Bases: catalyst.core.callback.Callback

Checkpoint callback interface, abstraction over model checkpointing step.

class catalyst.callbacks.checkpoint.BaseCheckpointCallback(metrics_filename: str = '_metrics.json')[source]

Bases: catalyst.callbacks.checkpoint.ICheckpointCallback

Base class for all checkpoint callbacks.

__init__(metrics_filename: str = '_metrics.json')[source]
Parameters

metrics_filename – filename to save metrics in checkpoint folder. Must ends on .json or .yml

on_exception(runner: IRunner)[source]

Expection handler.

Parameters

runner – current runner

Control Flow

class catalyst.callbacks.control_flow.ControlFlowCallback(base_callback: catalyst.core.callback.Callback, epochs: Union[int, Sequence[int]] = None, ignore_epochs: Union[int, Sequence[int]] = None, loaders: Union[str, Sequence[str], Mapping[str, Union[int, Sequence[int]]]] = None, ignore_loaders: Union[str, Sequence[str], Mapping[str, Union[int, Sequence[int]]]] = None, filter_fn: Union[str, Callable[[str, int, str], bool]] = None, use_global_epochs: bool = False)[source]

Bases: catalyst.core.callback.CallbackWrapper

Enable/disable callback execution on different stages, loaders and epochs.

Note

Please run experiment with check option to check if everything works as expected with this callback.

For example, if you don’t want to compute loss on a validation you can ignore CriterionCallback, for notebook API need to wrap callback:

import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst.dl import (
    SupervisedRunner, AccuracyCallback,
    CriterionCallback, ControlFlowCallback,
)

num_samples, num_features = 10_000, 10
n_classes = 10
X = torch.rand(num_samples, num_features)
y = torch.randint(0, n_classes, [num_samples])
loader = DataLoader(TensorDataset(X, y), batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

model = torch.nn.Linear(num_features, n_classes)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [3, 6])

runner = SupervisedRunner()
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    logdir="./logdir",
    num_epochs=5,
    verbose=False,
    main_metric="accuracy03",
    minimize_metric=False,
    callbacks=[
        AccuracyCallback(
            accuracy_args=[1, 3, 5]
        ),
        ControlFlowCallback(
            base_callback=CriterionCallback(),
            ignore_loaders="valid"  # or loaders="train"
        )
    ]
)

In config API need to use _wrapper argument:

callbacks_params:
  ...
  loss:
    _wrapper:
       callback: ControlFlowCallback
       ignore_loaders: valid
    callback: CriterionCallback
  ...
__init__(base_callback: catalyst.core.callback.Callback, epochs: Union[int, Sequence[int]] = None, ignore_epochs: Union[int, Sequence[int]] = None, loaders: Union[str, Sequence[str], Mapping[str, Union[int, Sequence[int]]]] = None, ignore_loaders: Union[str, Sequence[str], Mapping[str, Union[int, Sequence[int]]]] = None, filter_fn: Union[str, Callable[[str, int, str], bool]] = None, use_global_epochs: bool = False)[source]
Parameters
  • base_callback – callback to wrap

  • epochs

    epochs where need to enable callback, on other epochs callback will be disabled.

    If passed int/float then callback will be enabled with period specified as epochs value (epochs expression epoch_number % epochs == 0) and disabled on other epochs.

    If passed list of epochs then will be executed callback on specified epochs.

    Default value is None.

  • ignore_epochs:

    epochs where: need to disable callback, on other epochs callback will be enabled.

    If passed int/float then callback will be disabled with period specified as epochs value (epochs expression epoch_number % epochs != 0) and enabled on other epochs.

    If passed list of epochs then will be disabled callback on specified epochs.

    Default value is None.

  • loaders (str/Sequence[str]/Mapping[str, int/Sequence[str]]) –

    loaders where should be enabled callback, on other loaders callback will be disabled.

    If passed string object then will be disabled callback for loader with specified name.

    If passed list/tuple of strings then will be disabled callback for loaders with specified names.

    If passed dictionary where key is a string and values int or list of integers then callback will be disabled on epochs (dictionary value) for specified loader (dictionary key).

    Default value is None.

  • ignore_loaders (str/Sequence[str]/Mapping[str, int/Sequence[str]]) –

    loader names where should be disabled callback, on other loaders callback will be enabled.

    If passed string object then will be disabled callback for loader with specified name.

    If passed list/tuple of strings then will be disabled callback for loaders with specified names.

    If passed dictionary where key is a string and values int or list of integers then callback will be disabled on epochs (dictionary value) for specified loader (dictionary key).

    Default value is None.

  • filter_fn (str or Callable[[str, int, str], bool]) –

    function to use instead of loaders or epochs arguments.

    If the object passed to a filter_fn is a string then it will be interpreted as python code. Expected lambda function with three arguments stage name (str), epoch number (int), loader name (str) and this function should return True if callback should be enabled on some condition.

    If passed callable object then it should accept three arguments - stage name (str), epoch number (int), loader name (str) and should return True if callback should be enabled on some condition othervise should return False.

    Default value is None.

    Examples:

    # enable callback on all loaders
    # exept "train" loader every 2 epochs
    ControlFlowCallback(
        ...
        filter_fn=lambda s, e, l: l != "train" and e % 2 == 0
        ...
    )
    # or with string equivalent
    ControlFlowCallback(
        ...
        filter_fn="lambda s, e, l: l != 'train' and e % 2 == 0"
        ...
    )
    

  • use_global_epochs – if True then will be used global epochs instead of epochs in a stage, the default value is False

on_loader_end(runner: IRunner) → None[source]

Reset status of callback

Parameters

runner – current runner

on_loader_start(runner: IRunner) → None[source]

Check if current epoch should be skipped.

Parameters

runner – current runner

Criterion

class catalyst.callbacks.criterion.CriterionCallback(input_key: Union[str, List[str], Dict[str, str]] = 'targets', output_key: Union[str, List[str], Dict[str, str]] = 'logits', prefix: str = 'loss', criterion_key: str = None, multiplier: float = 1.0, **metric_kwargs)[source]

Bases: catalyst.callbacks.metric.IBatchMetricCallback

Callback for that measures loss with specified criterion.

__init__(input_key: Union[str, List[str], Dict[str, str]] = 'targets', output_key: Union[str, List[str], Dict[str, str]] = 'logits', prefix: str = 'loss', criterion_key: str = None, multiplier: float = 1.0, **metric_kwargs)[source]
Parameters
  • input_key (Union[str, List[str], Dict[str, str]]) – key/list/dict of keys that takes values from the input dictionary If ‘__all__’, the whole input will be passed to the criterion If None, empty dict will be passed to the criterion.

  • output_key (Union[str, List[str], Dict[str, str]]) – key/list/dict of keys that takes values from the input dictionary If ‘__all__’, the whole output will be passed to the criterion If None, empty dict will be passed to the criterion.

  • prefix – prefix for metrics and output key for loss in runner.batch_metrics dictionary

  • criterion_key – A key to take a criterion in case there are several of them and they are in a dictionary format.

  • multiplier – scale factor for the output loss.

property metric_fn

Criterion function.

on_stage_start(runner: IRunner)[source]

Checks that the current stage has correct criterion.

Parameters

runner – current runner

Early Stop

class catalyst.callbacks.early_stop.CheckRunCallback(num_batch_steps: int = 3, num_epoch_steps: int = 2)[source]

Bases: catalyst.core.callback.Callback

Executes only a pipeline part from the Experiment.

Minimal working example (Notebook API):

import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl

# data
num_samples, num_features = int(1e4), int(1e1)
X, y = torch.rand(num_samples, num_features), torch.rand(num_samples)
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

# model, criterion, optimizer, scheduler
model = torch.nn.Linear(num_features, 1)
criterion = torch.nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [3, 6])

# model training
runner = dl.SupervisedRunner()
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    logdir="./logdir",
    num_epochs=8,
    verbose=True,
    callbacks=[
        dl.CheckRunCallback(num_batch_steps=3, num_epoch_steps=3)
    ]
)
__init__(num_batch_steps: int = 3, num_epoch_steps: int = 2)[source]
Parameters
  • num_batch_steps – number of batches to iterate in epoch

  • num_epoch_steps – number of epoch to perform in a stage

on_batch_end(runner: IRunner)[source]

Check if iterated specified number of batches.

Parameters

runner – current runner

on_epoch_end(runner: IRunner)[source]

Check if iterated specified number of epochs.

Parameters

runner – current runner

class catalyst.callbacks.early_stop.EarlyStoppingCallback(patience: int, metric: str = 'loss', minimize: bool = True, min_delta: float = 1e-06)[source]

Bases: catalyst.core.callback.Callback

Early exit based on metric.

Minimal working example (Notebook API):

import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl

# data
num_samples, num_features = int(1e4), int(1e1)
X, y = torch.rand(num_samples, num_features), torch.rand(num_samples)
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

# model, criterion, optimizer, scheduler
model = torch.nn.Linear(num_features, 1)
criterion = torch.nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [3, 6])

# model training
runner = dl.SupervisedRunner()
runner.train(
  model=model,
  criterion=criterion,
  optimizer=optimizer,
  scheduler=scheduler,
  loaders=loaders,
  logdir="./logdir",
  num_epochs=8,
  verbose=True,
  callbacks=[
    dl.EarlyStoppingCallback(patience=2, metric="loss", minimize=True)
  ]
)

Example of usage in config API:

stages:
  ...
  stage_N:
    ...
    callbacks_params:
      ...
      early_stopping:
        callback: EarlyStoppingCallback
        # arguments for EarlyStoppingCallback
        patience: 5
        metric: my_metric
        minimize: true
  ...
__init__(patience: int, metric: str = 'loss', minimize: bool = True, min_delta: float = 1e-06)[source]
Parameters
  • patience – number of epochs with no improvement after which training will be stopped.

  • metric – metric name to use for early stopping, default is "loss".

  • minimize – if True then expected that metric should decrease and early stopping will be performed only when metric stops decreasing. If False then expected that metric should increase. Default value True.

  • min_delta – minimum change in the monitored metric to qualify as an improvement, i.e. an absolute change of less than min_delta, will count as no improvement, default value is 1e-6.

on_epoch_end(runner: IRunner) → None[source]

Check if should be performed early stopping.

Parameters

runner – current runner

Exception

class catalyst.callbacks.exception.ExceptionCallback[source]

Bases: catalyst.core.callback.Callback

Handles python exceptions during run.

__init__()[source]

Initialisation for ExceptionCallback.

on_exception(runner: IRunner) → None[source]

Exception handle hook. # noqa: DAR401

Parameters

runner – experiment runner

Raises

Exception – if during exception handling, # noqa: DAR402 we still need to reraise exception

Logging

class catalyst.callbacks.logging.ILoggerCallback(order: int, node: int = <CallbackNode.All: 0>, scope: int = <CallbackScope.Stage: 0>)[source]

Bases: catalyst.core.callback.Callback

Logger callback interface, abstraction over logging step

class catalyst.callbacks.logging.ConsoleLogger[source]

Bases: catalyst.callbacks.logging.ILoggerCallback

Logger callback, translates runner.*_metrics to console and text file.

__init__()[source]

Init ConsoleLogger.

on_epoch_end(runner: IRunner)[source]

Translate runner.metric_manager to console and text file at the end of an epoch.

Parameters

runner – current runner instance

on_stage_end(runner: IRunner)[source]

Called at the end of each stage.

on_stage_start(runner: IRunner)[source]

Prepare runner.logdir for the current stage.

class catalyst.callbacks.logging.TensorboardLogger(metric_names: List[str] = None, log_on_batch_end: bool = True, log_on_epoch_end: bool = True)[source]

Bases: catalyst.callbacks.logging.ILoggerCallback

Logger callback, translates runner.metric_manager to tensorboard.

__init__(metric_names: List[str] = None, log_on_batch_end: bool = True, log_on_epoch_end: bool = True)[source]
Parameters
  • metric_names – list of metric names to log, if none - logs everything

  • log_on_batch_end – logs per-batch metrics if set True

  • log_on_epoch_end – logs per-epoch metrics if set True

on_batch_end(runner: IRunner)[source]

Translate batch metrics to tensorboard.

on_epoch_end(runner: IRunner)[source]

Translate epoch metrics to tensorboard.

on_loader_start(runner: IRunner)[source]

Prepare tensorboard writers for the current stage.

on_stage_end(runner: IRunner)[source]

Close opened tensorboard writers.

on_stage_start(runner: IRunner) → None[source]

Stage start hook. Check logdir correctness.

Parameters

runner – current runner

class catalyst.callbacks.logging.VerboseLogger(always_show: List[str] = None, never_show: List[str] = None)[source]

Bases: catalyst.callbacks.logging.ILoggerCallback

Logs the params into console.

__init__(always_show: List[str] = None, never_show: List[str] = None)[source]
Parameters
  • always_show – list of metrics to always show if None default is ["_timer/_fps"] to remove always_show metrics set it to an empty list []

  • never_show – list of metrics which will not be shown

on_batch_end(runner: IRunner)[source]

Update tqdm progress bar at the end of each batch.

on_exception(runner: IRunner)[source]

Called if an Exception was raised.

on_loader_end(runner: IRunner)[source]

Cleanup and close tqdm progress bar.

on_loader_start(runner: IRunner)[source]

Init tqdm progress bar.

Metric

class catalyst.callbacks.metric.IMetricCallback(prefix: str, input_key: Union[str, List[str], Dict[str, str]] = 'targets', output_key: Union[str, List[str], Dict[str, str]] = 'logits', multiplier: float = 1.0, **metrics_kwargs)[source]

Bases: abc.ABC, catalyst.core.callback.Callback

Callback abstraction for metric computation.

__init__(prefix: str, input_key: Union[str, List[str], Dict[str, str]] = 'targets', output_key: Union[str, List[str], Dict[str, str]] = 'logits', multiplier: float = 1.0, **metrics_kwargs)[source]
Parameters
  • prefix – key prefix to store computed batch/loader/epoch metrics

  • input_key – input key to use for metric calculation; specifies our y_true

  • output_key – output key to use for metric calculation; specifies our y_pred

  • multiplier – scalar for metric reweighting

  • **metrics_kwargs – extra metric params to pass for metric computation

abstract property metric_fn

Specifies used metric function.

class catalyst.callbacks.metric.IBatchMetricCallback(prefix: str, input_key: Union[str, List[str], Dict[str, str]] = 'targets', output_key: Union[str, List[str], Dict[str, str]] = 'logits', multiplier: float = 1.0, **metrics_kwargs)[source]

Bases: catalyst.callbacks.metric.IMetricCallback

Batch-based metric callback. Computes metric on batch and saves for logging.

on_batch_end(runner: IRunner) → None[source]

Computes metrics and add them to batch metrics.

class catalyst.callbacks.metric.ILoaderMetricCallback(**kwargs)[source]

Bases: catalyst.callbacks.metric.IMetricCallback

Loader-based metric callback. Stores input/output values during loaders run and computes metric in the end.

__init__(**kwargs)[source]

Init.

Parameters

**kwargsIMetricCallback params.

on_batch_end(runner: IRunner) → None[source]

Stores new input/output for the metric computation.

on_loader_end(runner: IRunner)[source]

Computes loader-based metric.

Parameters

runner – current runner

on_loader_start(runner: IRunner)[source]

Reinitialises internal storage.

class catalyst.callbacks.metric.BatchMetricCallback(prefix: str, metric_fn: Callable, input_key: Union[str, List[str], Dict[str, str]] = 'targets', output_key: Union[str, List[str], Dict[str, str]] = 'logits', multiplier: float = 1.0, **metric_kwargs)[source]

Bases: catalyst.callbacks.metric.IBatchMetricCallback

A callback that returns single metric on runner.on_batch_end.

__init__(prefix: str, metric_fn: Callable, input_key: Union[str, List[str], Dict[str, str]] = 'targets', output_key: Union[str, List[str], Dict[str, str]] = 'logits', multiplier: float = 1.0, **metric_kwargs)[source]

Init.

Parameters
  • prefix – key prefix to store computed batch/loader/epoch metrics

  • input_key – input key to use for metric calculation; specifies our y_true

  • output_key – output key to use for metric calculation; specifies our y_pred

  • multiplier – scalar for metric reweighting

  • **metrics_kwargs – extra metric params to pass for metric computation

property metric_fn

Specifies used metric function.

class catalyst.callbacks.metric.LoaderMetricCallback(prefix: str, metric_fn: Callable, input_key: Union[str, List[str], Dict[str, str]] = 'targets', output_key: Union[str, List[str], Dict[str, str]] = 'logits', multiplier: float = 1.0, **metric_kwargs)[source]

Bases: catalyst.callbacks.metric.ILoaderMetricCallback

A callback that returns single metric on runner.on_batch_end.

__init__(prefix: str, metric_fn: Callable, input_key: Union[str, List[str], Dict[str, str]] = 'targets', output_key: Union[str, List[str], Dict[str, str]] = 'logits', multiplier: float = 1.0, **metric_kwargs)[source]

Init.

Parameters
  • prefix – key prefix to store computed batch/loader/epoch metrics

  • input_key – input key to use for metric calculation; specifies our y_true

  • output_key – output key to use for metric calculation; specifies our y_pred

  • multiplier – scalar for metric reweighting

  • **metrics_kwargs – extra metric params to pass for metric computation

property metric_fn

Specifies used metric function.

catalyst.callbacks.metric.MetricCallback

alias of catalyst.callbacks.metric.BatchMetricCallback

class catalyst.callbacks.metric.MetricAggregationCallback(prefix: str, metrics: Union[str, List[str], Dict[str, float]] = None, mode: str = 'mean', scope: str = 'batch', multiplier: float = 1.0)[source]

Bases: catalyst.core.callback.Callback

A callback to aggregate several metrics in one value.

__init__(prefix: str, metrics: Union[str, List[str], Dict[str, float]] = None, mode: str = 'mean', scope: str = 'batch', multiplier: float = 1.0) → None[source]
Parameters
  • prefix – new key for aggregated metric.

  • metrics (Union[str, List[str], Dict[str, float]]) – If not None, it aggregates only the values from the metric by these keys. for weighted_sum aggregation it must be a Dict[str, float].

  • mode – function for aggregation. Must be either sum, mean or weighted_sum.

  • multiplier – scale factor for the aggregated metric.

on_batch_end(runner: IRunner) → None[source]

Computes the metric and add it to the batch metrics.

Parameters

runner – current runner

on_epoch_end(runner: IRunner)[source]

Computes the metric and add it to the epoch metrics.

Parameters

runner – current runner

on_loader_end(runner: IRunner)[source]

Computes the metric and add it to the loader metrics.

Parameters

runner – current runner

class catalyst.callbacks.metric.MetricManagerCallback[source]

Bases: catalyst.core.callback.Callback

Prepares metrics for logging, transferring values from PyTorch to numpy.

__init__()[source]

Init.

on_batch_end(runner: IRunner) → None[source]

Batch end hook.

Parameters

runner – current runner

on_batch_start(runner: IRunner) → None[source]

Batch start hook.

Parameters

runner – current runner

on_epoch_start(runner: IRunner) → None[source]

Epoch start hook.

Parameters

runner – current runner

on_loader_end(runner: IRunner) → None[source]

Loader end hook.

Parameters

runner – current runner

on_loader_start(runner: IRunner) → None[source]

Loader start hook.

Parameters

runner – current runner

static to_single_value(value: Any) → float[source]

Convert any value to float.

Parameters

value – some value

Returns

result

Optimizer

class catalyst.callbacks.optimizer.IOptimizerCallback(order: int, node: int = <CallbackNode.All: 0>, scope: int = <CallbackScope.Stage: 0>)[source]

Bases: catalyst.core.callback.Callback

Optimizer callback interface, abstraction over optimizer step.

class catalyst.callbacks.optimizer.AMPOptimizerCallback(metric_key: str = None, optimizer_key: str = None, accumulation_steps: int = 1, grad_clip_params: Dict = None, loss_key: str = None)[source]

Bases: catalyst.callbacks.optimizer.IOptimizerCallback

Optimizer callback with native torch amp support.

__init__(metric_key: str = None, optimizer_key: str = None, accumulation_steps: int = 1, grad_clip_params: Dict = None, loss_key: str = None)[source]
Parameters
  • loss_key – key to get loss from runner.batch_metrics

  • optimizer_key – A key to take a optimizer in case there are several of them and they are in a dictionary format.

  • accumulation_steps – number of steps before model.zero_grad()

  • grad_clip_params – params for gradient clipping

  • decouple_weight_decay – If True - decouple weight decay regularization.

grad_step(*, optimizer: torch.optim.optimizer.Optimizer, grad_clip_fn: Callable = None) → None[source]

Makes a gradient step for a given optimizer.

Parameters
  • optimizer – the optimizer

  • grad_clip_fn – function for gradient clipping

on_batch_end(runner: IRunner) → None[source]

On batch end event

Parameters

runner – current runner

on_batch_start(runner: IRunner) → None[source]

On batch start event

Parameters

runner – current runner

on_epoch_end(runner: IRunner) → None[source]

On epoch end event.

Parameters

runner – current runner

on_stage_end(runner: IRunner) → None[source]

On stage end event.

Parameters

runner – current runner

on_stage_start(runner: IRunner) → None[source]

Checks that the current stage has correct optimizer.

Parameters

runner (IRunner) – current runner

class catalyst.callbacks.optimizer.OptimizerCallback(metric_key: str = None, optimizer_key: str = None, accumulation_steps: int = 1, grad_clip_params: Dict = None, decouple_weight_decay: bool = True, loss_key: str = None, use_fast_zero_grad: bool = False, xla_barrier: bool = True)[source]

Bases: catalyst.callbacks.optimizer.IOptimizerCallback

Optimizer callback, abstraction over optimizer step.

__init__(metric_key: str = None, optimizer_key: str = None, accumulation_steps: int = 1, grad_clip_params: Dict = None, decouple_weight_decay: bool = True, loss_key: str = None, use_fast_zero_grad: bool = False, xla_barrier: bool = True)[source]
Parameters
  • loss_key – key to get loss from runner.batch_metrics

  • optimizer_key – A key to take a optimizer in case there are several of them and they are in a dictionary format.

  • accumulation_steps – number of steps before model.zero_grad()

  • grad_clip_params – params for gradient clipping

  • decouple_weight_decay – If True - decouple weight decay regularization.

  • use_fast_zero_grad – boost optiomizer.zero_grad(), default is False.

  • xla_barrier

    barrier option for xla. Here you can find more about usage of barrier flag and examples.

    Default is True.

grad_step(*, optimizer: torch.optim.optimizer.Optimizer, optimizer_wds: List[float] = 0, grad_clip_fn: Callable = None) → None[source]

Makes a gradient step for a given optimizer.

Parameters
  • optimizer – the optimizer

  • optimizer_wds – list of weight decay parameters for each param group

  • grad_clip_fn – function for gradient clipping

on_batch_end(runner: IRunner) → None[source]

On batch end event

Parameters

runner – current runner

on_epoch_end(runner: IRunner) → None[source]

On epoch end event.

Parameters

runner – current runner

on_epoch_start(runner: IRunner) → None[source]

On epoch start event.

Parameters

runner – current runner

on_stage_start(runner: IRunner) → None[source]

Checks that the current stage has correct optimizer.

Parameters

runner (IRunner) – current runner

PeriodicLoaderCallback

class catalyst.callbacks.periodic_loader.PeriodicLoaderCallback(**kwargs)[source]

Bases: catalyst.core.callback.Callback

Callback for runing loaders with specified period. To disable loader use 0 as period (if specified 0 for validation loader then will be raised an error).

For example, if you have train, train_additional, valid and valid_additional loaders and wan’t to use train_additional every 2 epochs, valid - every 3 epochs and valid_additional - every 5 epochs:

from catalyst.dl import (
    SupervisedRunner, PeriodicLoaderRunnerCallback,
)
runner = SupervisedRunner()
runner.train(
    ...
    loaders={
        "train": ...,
        "train_additional": ...,
        "valid": ...,
        "valid_additional":...
    }
    ...
    callbacks=[
        ...
        PeriodicLoaderRunnerCallback(
            train_additional=2,
            valid=3,
            valid_additional=5
        ),
        ...
    ]
    ...
)
__init__(**kwargs)[source]
Parameters

kwargs – loader names and their run periods.

on_epoch_end(runner: IRunner) → None[source]

Check if validation metric should be dropped for current epoch.

Parameters

runner – current runner

on_epoch_start(runner: IRunner) → None[source]

Set loaders for current epoch. If validation is not required then the first loader from loaders used in current epoch will be used as validation loader. Metrics from the latest epoch with true validation loader will be used in the epochs where this loader is missing.

Parameters

runner – current runner

Raises

ValueError – if there are no loaders in epoch

on_stage_start(runner: IRunner) → None[source]

Collect information about loaders.

Parameters

runner – current runner

Raises

ValueError – if there are no loaders in epoch

Pruning

class catalyst.callbacks.pruning.PruningCallback(pruning_fn: Union[Callable, str], keys_to_prune: Optional[List[str]] = None, amount: Union[int, float, None] = 0.5, prune_on_epoch_end: Optional[bool] = False, prune_on_stage_end: Optional[bool] = True, remove_reparametrization_on_stage_end: Optional[bool] = True, reinitialize_after_pruning: Optional[bool] = False, layers_to_prune: Optional[List[str]] = None, dim: Optional[int] = None, l_norm: Optional[int] = None)[source]

Bases: catalyst.core.callback.Callback

Pruning Callback

This callback is designed to prune network parameters during and/or after training.

Parameters
  • pruning_fn – function from torch.nn.utils.prune module or your based on BasePruningMethod. Can be string e.g. “l1_unstructured”. See pytorch docs for more details.

  • keys_to_prune – list of strings. Determines which tensor in modules will be pruned.

  • amount – quantity of parameters to prune. If float, should be between 0.0 and 1.0 and represent the fraction of parameters to prune. If int, it represents the absolute number of parameters to prune.

  • prune_on_epoch_end – bool flag determines call or not to call pruning_fn on epoch end.

  • prune_on_stage_end – bool flag determines call or not to call pruning_fn on stage end.

  • remove_reparametrization_on_stage_end – if True then all reparametrization pre-hooks and tensors with mask will be removed on stage end.

  • reinitialize_after_pruning – if True then will reinitialize model after pruning. (Lottery Ticket Hypothesis)

  • layers_to_prune – list of strings - module names to be pruned. If None provided then will try to prune every module in model.

  • dim – if you are using structured pruning method you need to specify dimension.

  • l_norm – if you are using ln_structured you need to specify l_norm.

__init__(pruning_fn: Union[Callable, str], keys_to_prune: Optional[List[str]] = None, amount: Union[int, float, None] = 0.5, prune_on_epoch_end: Optional[bool] = False, prune_on_stage_end: Optional[bool] = True, remove_reparametrization_on_stage_end: Optional[bool] = True, reinitialize_after_pruning: Optional[bool] = False, layers_to_prune: Optional[List[str]] = None, dim: Optional[int] = None, l_norm: Optional[int] = None) → None[source]

Init method for pruning callback

on_epoch_end(runner: IRunner) → None[source]

On epoch end action.

Active if prune_on_epoch_end is True.

Parameters

runner – runner for your experiment

on_stage_end(runner: IRunner) → None[source]

On stage end action.

Active if prune_on_stage_end or remove_reparametrization is True.

Parameters

runner – runner for your experiment

Quantization

class catalyst.callbacks.quantization.DynamicQuantizationCallback(metric: str = 'loss', minimize: bool = True, min_delta: float = 1e-06, mode: str = 'best', do_once: bool = True, qconfig_spec: Union[Set, Dict, None] = None, dtype: Optional[torch.dtype] = torch.qint8, out_dir: Union[str, pathlib.Path] = None, out_model: Union[str, pathlib.Path] = None, backend: str = None)[source]

Bases: catalyst.core.callback.Callback

Dynamic Quantization Callback

This callback applying dynamic quantization to the model.

Parameters
  • metric – Metric key we should trace model based on

  • minimize – Whether do we minimize metric or not

  • min_delta – Minimum value of change for metric to be considered as improved

  • mode – One of best or last

  • do_once – Whether do we trace once per stage or every epoch

  • qconfig_spec – torch.quantization.quantize_dynamic parameter, you can define layers to be quantize

  • dtype – type of the model parameters, default int8

  • out_dir (Union[str, Path]) – Directory to save model to

  • out_model (Union[str, Path]) – Path to save model to (overrides out_dir argument)

  • backend – defines backend for quantization

__init__(metric: str = 'loss', minimize: bool = True, min_delta: float = 1e-06, mode: str = 'best', do_once: bool = True, qconfig_spec: Union[Set, Dict, None] = None, dtype: Optional[torch.dtype] = torch.qint8, out_dir: Union[str, pathlib.Path] = None, out_model: Union[str, pathlib.Path] = None, backend: str = None)[source]

Init method for callback

on_epoch_end(runner: IRunner)[source]

Performing model quantization on epoch end if condition metric is improved

Parameters

runner – current runner

on_stage_end(runner: IRunner) → None[source]

On stage end action.

Parameters

runner – runner of your experiment

Scheduler

class catalyst.callbacks.scheduler.ISchedulerCallback(order: int, node: int = <CallbackNode.All: 0>, scope: int = <CallbackScope.Stage: 0>)[source]

Bases: catalyst.core.callback.Callback

Scheduler callback interface, abstraction over scheduler step.

class catalyst.callbacks.scheduler.SchedulerCallback(scheduler_key: str = None, mode: str = None, reduced_metric: str = None)[source]

Bases: catalyst.callbacks.scheduler.ISchedulerCallback

Callback for wrapping schedulers.

Notebook API example:

import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst.dl import (
    SupervisedRunner, AccuracyCallback,
    CriterionCallback, SchedulerCallback,
)

num_samples, num_features = 10_000, 10
n_classes = 10
X = torch.rand(num_samples, num_features)
y = torch.randint(0, n_classes, [num_samples])
loader = DataLoader(TensorDataset(X, y), batch_size=32, num_workers=1)
loaders = {"train": loader, "valid": loader}

model = torch.nn.Linear(num_features, n_classes)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters())
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [3, 6])

runner = SupervisedRunner()
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders=loaders,
    logdir="./logdir",
    num_epochs=5,
    verbose=False,
    main_metric="accuracy03",
    minimize_metric=False,
    callbacks=[
        AccuracyCallback(
            accuracy_args=[1, 3, 5]
        ),
        SchedulerCallback(reduced_metric="loss")
    ]
)

Config API usage example:

stages:
  ...
  scheduler_params:
    scheduler: MultiStepLR
    milestones: [1]
    gamma: 0.3
  ...
  stage_N:
    ...
    callbacks_params:
      ...
      scheduler:
        callback: SchedulerCallback
        # arguments for SchedulerCallback
        reduced_metric: loss
  ...
__init__(scheduler_key: str = None, mode: str = None, reduced_metric: str = None)[source]
Parameters
  • scheduler_key – scheduler name, if None, default is None.

  • mode – scheduler mode, should be one of "epoch" or "batch", default is None. If None and object is instance of BatchScheduler or OneCycleLRWithWarmup then will be used "batch" otherwise - "epoch".

  • reduced_metric – metric name to forward to scheduler object, if None then will be used main metric specified in experiment.

on_batch_end(runner: IRunner) → None[source]

Batch end hook.

Parameters

runner – current runner

on_epoch_end(runner: IRunner) → None[source]

Epoch end hook.

Parameters

runner – current runner

on_loader_start(runner: IRunner) → None[source]

Loader start hook.

Parameters

runner – current runner

on_stage_start(runner: IRunner) → None[source]

Stage start hook.

Parameters

runner – current runner

step_batch(runner: IRunner) → None[source]

Update learning rate and momentum in runner.

Parameters

runner – current runner

step_epoch(runner: IRunner) → None[source]

Update momentum in runner.

Parameters

runner – current runner

class catalyst.callbacks.scheduler.ILRUpdater(optimizer_key: str = None)[source]

Bases: abc.ABC, catalyst.core.callback.Callback

Basic class that all Lr updaters inherit from.

__init__(optimizer_key: str = None)[source]
Parameters

optimizer_key – which optimizer key to use for learning rate scheduling

abstract calc_lr()[source]

Interface for calculating learning rate.

abstract calc_momentum()[source]

Interface for calculating momentum

on_batch_end(runner: IRunner) → None[source]

Batch end hook.

Parameters

runner – current runner

on_loader_start(runner: IRunner) → None[source]

Loader start hook.

Parameters

runner – current runner

on_stage_start(runner: IRunner) → None[source]

Stage start hook.

Parameters

runner – current runner

update_optimizer(runner: IRunner) → None[source]

Update learning rate and momentum in runner.

Parameters

runner – current runner

class catalyst.callbacks.scheduler.LRFinder(final_lr, scale: str = 'log', num_steps: Optional[int] = None, optimizer_key: str = None)[source]

Bases: catalyst.callbacks.scheduler.ILRUpdater

Helps you find an optimal learning rate for a model, as per suggestion of Cyclical Learning Rates for Training Neural Networks paper. Learning rate is increased in linear or log scale, depending on user input.

See How Do You Find A Good Learning Rate article for details.

__init__(final_lr, scale: str = 'log', num_steps: Optional[int] = None, optimizer_key: str = None)[source]
Parameters
  • final_lr – final learning rate to try with

  • scale – learning rate increasing scale (“log” or “linear”)

  • num_steps – number of batches to try; if None - whole loader would be used.

  • optimizer_key – which optimizer key to use for learning rate scheduling

calc_lr()[source]

Calculates learning rate.

Returns

learning rate.

calc_momentum()[source]

Calculates new momentum.

on_batch_end(runner: IRunner)[source]

Batch end hook. Make scheduler step and stops iterating if needed.

Parameters

runner – current runner

Raises

NotImplementedError – at the end of LRFinder

on_loader_start(runner: IRunner)[source]

Loader start hook. Updates scheduler statistics.

Parameters

runner – current runner

Timer

class catalyst.callbacks.timer.TimerCallback[source]

Bases: catalyst.core.callback.Callback

Logs pipeline execution time.

__init__()[source]

Initialisation for TimerCallback.

on_batch_end(runner: IRunner) → None[source]

Batch end hook.

Parameters

runner – current runner

on_batch_start(runner: IRunner) → None[source]

Batch start hook.

Parameters

runner – current runner

on_loader_end(runner: IRunner) → None[source]

Loader end hook.

Parameters

runner – current runner

on_loader_start(runner: IRunner) → None[source]

Loader start hook.

Parameters

runner – current runner

Tracing

class catalyst.callbacks.tracing.TracingCallback(metric: str = 'loss', minimize: bool = True, min_delta: float = 1e-06, mode: str = 'best', do_once: bool = True, method_name: str = 'forward', requires_grad: bool = False, opt_level: str = None, trace_mode: str = 'eval', out_dir: Union[str, pathlib.Path] = None, out_model: Union[str, pathlib.Path] = None)[source]

Bases: catalyst.core.callback.Callback

Traces model during training if metric provided is improved.

__init__(metric: str = 'loss', minimize: bool = True, min_delta: float = 1e-06, mode: str = 'best', do_once: bool = True, method_name: str = 'forward', requires_grad: bool = False, opt_level: str = None, trace_mode: str = 'eval', out_dir: Union[str, pathlib.Path] = None, out_model: Union[str, pathlib.Path] = None)[source]
Parameters
  • metric – Metric key we should trace model based on

  • minimize – Whether do we minimize metric or not

  • min_delta – Minimum value of change for metric to be considered as improved

  • mode – One of best or last

  • do_once – Whether do we trace once per stage or every epoch

  • method_name – Model’s method name that will be used as entrypoint during tracing

  • requires_grad – Flag to use grads

  • opt_level – AMP FP16 init level

  • trace_mode – Mode for model to trace (train or eval)

  • out_dir (Union[str, Path]) – Directory to save model to

  • out_model (Union[str, Path]) – Path to save model to (overrides out_dir argument)

on_epoch_end(runner: IRunner)[source]

Performing model tracing on epoch end if condition metric is improved

Parameters

runner – current runner

on_stage_end(runner: IRunner)[source]

Performing model tracing on stage end if do_once is True.

Parameters

runner – current runner

catalyst.callbacks.tracing.TracerCallback

alias of catalyst.callbacks.tracing.TracingCallback

Validation

class catalyst.callbacks.validation.ValidationManagerCallback[source]

Bases: catalyst.core.callback.Callback

A callback to aggregate runner.valid_metrics from runner.epoch_metrics.

__init__()[source]

Initialisation for ValidationManagerCallback.

on_epoch_end(runner: IRunner) → None[source]

Epoch end hook.

Parameters

runner – current runner

on_epoch_start(runner: IRunner) → None[source]

Epoch start hook.

Parameters

runner – current runner

Metrics

Accuracy

class catalyst.callbacks.metrics.accuracy.AccuracyCallback(input_key: str = 'targets', output_key: str = 'logits', prefix: str = 'accuracy', multiplier: float = 1.0, topk_args: List[int] = None, num_classes: int = None, accuracy_args: List[int] = None, **kwargs)[source]

Bases: catalyst.callbacks.metric.BatchMetricCallback

Accuracy metric callback.

Computes multi-class accuracy@topk for the specified values of topk.

Note

For multi-label accuracy please use catalyst.callbacks.metrics.MultiLabelAccuracyCallback

__init__(input_key: str = 'targets', output_key: str = 'logits', prefix: str = 'accuracy', multiplier: float = 1.0, topk_args: List[int] = None, num_classes: int = None, accuracy_args: List[int] = None, **kwargs)[source]
Parameters
  • input_key – input key to use for accuracy calculation; specifies our y_true

  • output_key – output key to use for accuracy calculation; specifies our y_pred

  • prefix – key for the metric’s name

  • topk_args – specifies which accuracy@K to log: [1] - accuracy [1, 3] - accuracy at 1 and 3 [1, 3, 5] - accuracy at 1, 3 and 5

  • num_classes – number of classes to calculate topk_args if accuracy_args is None

  • activation – An torch.nn activation applied to the outputs. Must be one of "none", "Sigmoid", or "Softmax"

class catalyst.callbacks.metrics.accuracy.MultiLabelAccuracyCallback(input_key: str = 'targets', output_key: str = 'logits', prefix: str = 'multi_label_accuracy', threshold: float = None, activation: str = 'Sigmoid')[source]

Bases: catalyst.callbacks.metric.BatchMetricCallback

Accuracy metric callback. Computes multi-class accuracy@topk for the specified values of topk.

Note

For multi-label accuracy please use catalyst.callbacks.metrics.MultiLabelAccuracyCallback

__init__(input_key: str = 'targets', output_key: str = 'logits', prefix: str = 'multi_label_accuracy', threshold: float = None, activation: str = 'Sigmoid')[source]
Parameters
  • input_key – input key to use for accuracy calculation; specifies our y_true

  • output_key – output key to use for accuracy calculation; specifies our y_pred

  • prefix – key for the metric’s name

  • threshold – threshold for for model output

  • activation – An torch.nn activation applied to the outputs. Must be one of "none", "Sigmoid", or "Softmax"

AUC

class catalyst.callbacks.metrics.auc.AUCCallback(input_key: str = 'targets', output_key: str = 'logits', prefix: str = 'auc', multiplier: float = 1.0, class_args: List[str] = None, **kwargs)[source]

Bases: catalyst.callbacks.metric.LoaderMetricCallback

Calculates the AUC per class for each loader.

Note

Currently, supports binary and multi-label cases.

__init__(input_key: str = 'targets', output_key: str = 'logits', prefix: str = 'auc', multiplier: float = 1.0, class_args: List[str] = None, **kwargs)[source]
Parameters
  • input_key – input key to use for auc calculation specifies our y_true.

  • output_key – output key to use for auc calculation; specifies our y_pred.

  • prefix – metric’s name.

  • multiplier – scale factor for the metric.

  • class_args – class names to display in the logs. If None, defaults to indices for each class, starting from 0

CMC score

class catalyst.callbacks.metrics.cmc_score.CMCScoreCallback(embeddings_key: str = 'logits', labels_key: str = 'targets', is_query_key: str = 'is_query', prefix: str = 'cmc', topk_args: List[int] = None, num_classes: int = None)[source]

Bases: catalyst.core.callback.Callback

Cumulative Matching Characteristics callback.

Note

You should use it with ControlFlowCallback and add all query/gallery sets to loaders. Loaders should contain “is_query” and “label” key.

An usage example can be found in Readme.md under “CV - MNIST with Metric Learning”.

__init__(embeddings_key: str = 'logits', labels_key: str = 'targets', is_query_key: str = 'is_query', prefix: str = 'cmc', topk_args: List[int] = None, num_classes: int = None)[source]

This callback was designed to count cumulative matching characteristics. If current object is from query your dataset should output True in is_query_key and false if current object is from gallery. You can see QueryGalleryDataset in catalyst.contrib.datasets.metric_learning for more information. On batch end callback accumulate all embeddings

Parameters
  • embeddings_key – embeddings key in output dict

  • labels_key – labels key in output dict

  • is_query_key – bool key True if current object is from query

  • prefix – key for the metric’s name

  • topk_args – specifies which cmc@K to log. [1] - cmc@1 [1, 3] - cmc@1 and cmc@3 [1, 3, 5] - cmc@1, cmc@3 and cmc@5

  • num_classes – number of classes to calculate accuracy_args if topk_args is None

on_batch_end(runner: IRunner)[source]

On batch end action

on_loader_end(runner: IRunner)[source]

On loader end action

on_loader_start(runner: IRunner)[source]

On loader start action

Dice

class catalyst.callbacks.metrics.dice.DiceCallback(input_key: str = 'targets', output_key: str = 'logits', prefix: str = 'dice', eps: float = 1e-07, threshold: float = None, activation: str = 'Sigmoid')[source]

Bases: catalyst.callbacks.metric.BatchMetricCallback

Dice metric callback.

Parameters
  • input_key – input key to use for iou calculation specifies our y_true

  • output_key – output key to use for iou calculation; specifies our y_pred

  • prefix – key to store in logs

  • eps – epsilon to avoid zero division

  • threshold – threshold for outputs binarization

  • activation – An torch.nn activation applied to the outputs. Must be one of 'none', 'Sigmoid', 'Softmax2d'

__init__(input_key: str = 'targets', output_key: str = 'logits', prefix: str = 'dice', eps: float = 1e-07, threshold: float = None, activation: str = 'Sigmoid')[source]
Parameters
  • input_key – input key to use for iou calculation specifies our y_true

  • output_key – output key to use for iou calculation; specifies our y_pred

  • prefix – key to store in logs

  • eps – epsilon to avoid zero division

  • threshold – threshold for outputs binarization

  • activation – An torch.nn activation applied to the outputs. Must be one of 'none', 'Sigmoid', 'Softmax2d'

class catalyst.callbacks.metrics.dice.MultiClassDiceMetricCallback(input_key: str = 'targets', output_key: str = 'logits', prefix: str = 'dice', class_names=None)[source]

Bases: catalyst.core.callback.Callback

Global Multi-Class Dice Metric Callback: calculates the exact dice score across multiple batches. This callback is good for getting the dice score with small batch sizes where the batchwise dice is noisier.

__init__(input_key: str = 'targets', output_key: str = 'logits', prefix: str = 'dice', class_names=None)[source]
Parameters
  • input_key – input key to use for dice calculation; specifies our y_true

  • output_key – output key to use for dice calculation; specifies our y_pred

  • prefix – prefix for printing the metric

  • class_names – if dictionary, should be: {class_id: class_name, …} where class_id is an integer This allows you to ignore class indices. if list, make sure it corresponds to the number of classes

on_batch_end(runner: IRunner)[source]

Records the confusion matrix at the end of each batch.

Parameters

runner – current runner

on_loader_end(runner: IRunner)[source]

@TODO: Docs. Contribution is welcome.

Parameters

runner – current runner

catalyst.callbacks.metrics.dice.MulticlassDiceMetricCallback

alias of catalyst.callbacks.metrics.dice.MultiClassDiceMetricCallback

F1 score

class catalyst.callbacks.metrics.f1_score.F1ScoreCallback(input_key: str = 'targets', output_key: str = 'logits', prefix: str = 'f1_score', beta: float = 1.0, eps: float = 1e-07, threshold: float = None, activation: str = 'Sigmoid')[source]

Bases: catalyst.callbacks.metric.BatchMetricCallback

F1 score metric callback.

__init__(input_key: str = 'targets', output_key: str = 'logits', prefix: str = 'f1_score', beta: float = 1.0, eps: float = 1e-07, threshold: float = None, activation: str = 'Sigmoid')[source]
Parameters
  • input_key – input key to use for iou calculation specifies our y_true

  • output_key – output key to use for iou calculation; specifies our y_pred

  • prefix – key to store in logs

  • beta – beta param for f_score

  • eps – epsilon to avoid zero division

  • threshold – threshold for outputs binarization

  • activation – An torch.nn activation applied to the outputs. Must be one of 'none', 'Sigmoid', or 'Softmax2d'

IOU

class catalyst.callbacks.metrics.iou.IouCallback(input_key: str = 'targets', output_key: str = 'logits', prefix: str = 'iou', eps: float = 1e-07, threshold: float = None, activation: str = 'Sigmoid')[source]

Bases: catalyst.callbacks.metric.BatchMetricCallback

IoU (Jaccard) metric callback.

Parameters
  • input_key – input key to use for iou calculation specifies our y_true

  • output_key – output key to use for iou calculation; specifies our y_pred

  • prefix – key to store in logs

  • eps – epsilon to avoid zero division

  • threshold – threshold for outputs binarization

  • activation – An torch.nn activation applied to the outputs. Must be one of 'none', 'Sigmoid', 'Softmax2d'

__init__(input_key: str = 'targets', output_key: str = 'logits', prefix: str = 'iou', eps: float = 1e-07, threshold: float = None, activation: str = 'Sigmoid')[source]
Parameters
  • input_key – input key to use for iou calculation specifies our y_true

  • output_key – output key to use for iou calculation; specifies our y_pred

  • prefix – key to store in logs

  • eps – epsilon to avoid zero division

  • threshold – threshold for outputs binarization

  • activation – An torch.nn activation applied to the outputs. Must be one of 'none', 'Sigmoid', 'Softmax2d'

catalyst.callbacks.metrics.iou.JaccardCallback

alias of catalyst.callbacks.metrics.iou.IouCallback

class catalyst.callbacks.metrics.iou.ClasswiseIouCallback(input_key: str = 'targets', output_key: str = 'logits', prefix: str = 'iou', classes: List[str] = None, num_classes: int = None, eps: float = 1e-07, threshold: float = None, activation: str = 'Sigmoid')[source]

Bases: catalyst.callbacks.metric.BatchMetricCallback

Classwise IoU (Jaccard) metric callback.

__init__(input_key: str = 'targets', output_key: str = 'logits', prefix: str = 'iou', classes: List[str] = None, num_classes: int = None, eps: float = 1e-07, threshold: float = None, activation: str = 'Sigmoid')[source]
Parameters
  • input_key – input key to use for iou calculation specifies our y_true

  • output_key – output key to use for iou calculation; specifies our y_pred

  • prefix – key to store in logs (will be prefix_class_name)

  • classes – list of class names You should specify either ‘classes’ or ‘num_classes’

  • num_classes – number of classes You should specify either ‘classes’ or ‘num_classes’

  • eps – epsilon to avoid zero division

  • threshold – threshold for outputs binarization

  • activation – An torch.nn activation applied to the outputs. Must be one of 'none', 'Sigmoid', 'Softmax2d'

catalyst.callbacks.metrics.iou.ClasswiseJaccardCallback

alias of catalyst.callbacks.metrics.iou.ClasswiseIouCallback

MRR

class catalyst.callbacks.metrics.mrr.MRRCallback(input_key: str = 'targets', output_key: str = 'logits', prefix: str = 'mrr')[source]

Bases: catalyst.callbacks.metric.BatchMetricCallback

Calculates the AUC per class for each loader.

Note

Currently, supports binary and multi-label cases.

__init__(input_key: str = 'targets', output_key: str = 'logits', prefix: str = 'mrr')[source]
Parameters
  • input_key (str) – input key to use for mrr calculation specifies our y_true

  • output_key (str) – output key to use for mrr calculation; specifies our y_pred

  • prefix (str) – name to display for mrr when printing

Global precision, recall and F1-score

class catalyst.callbacks.metrics.ppv_tpr_f1.PrecisionRecallF1ScoreCallback(input_key: str = 'targets', output_key: str = 'logits', class_names: List[str] = None, num_classes: int = 2, threshold: float = 0.5, activation: str = 'Sigmoid')[source]

Bases: catalyst.callbacks.meter.MeterMetricsCallback

Calculates the global precision (positive predictive value or ppv), recall (true positive rate or tpr), and F1-score per class for each loader.

Note

Currently, supports binary and multi-label cases.

__init__(input_key: str = 'targets', output_key: str = 'logits', class_names: List[str] = None, num_classes: int = 2, threshold: float = 0.5, activation: str = 'Sigmoid')[source]
Parameters
  • input_key – input key to use for metric calculation specifies our y_true

  • output_key – output key to use for metric calculation; specifies our y_pred

  • class_names – class names to display in the logs. If None, defaults to indices for each class, starting from 0.

  • num_classes – Number of classes; must be > 1

  • threshold – threshold for outputs binarization

  • activation – An torch.nn activation applied to the outputs. Must be one of 'none', 'Sigmoid', 'Softmax2d'

Precision

class catalyst.callbacks.metrics.precision.AveragePrecisionCallback(input_key: str = 'targets', output_key: str = 'logits', prefix: str = 'average_precision', multiplier: float = 1.0, class_args: List[str] = None, **kwargs)[source]

Bases: catalyst.callbacks.metric.LoaderMetricCallback

AveragePrecision metric callback.

__init__(input_key: str = 'targets', output_key: str = 'logits', prefix: str = 'average_precision', multiplier: float = 1.0, class_args: List[str] = None, **kwargs)[source]
Parameters
  • input_key – input key to use for calculation mean average precision; specifies our y_true.

  • output_key – output key to use for calculation mean average precision; specifies our y_pred.

  • prefix – metric’s name.

  • multiplier – scale factor for the metric.

  • class_args – class names to display in the logs. If None, defaults to indices for each class, starting from 0

Contrib

AlchemyLogger

class catalyst.contrib.callbacks.alchemy_logger.AlchemyLogger(metric_names: List[str] = None, log_on_batch_end: bool = True, log_on_epoch_end: bool = True, **logging_params)[source]

Bases: catalyst.core.callback.Callback

Logger callback, translates runner.*_metrics to Alchemy. Read about Alchemy here https://alchemy.host

Example

from catalyst.dl import SupervisedRunner, AlchemyLogger

runner = SupervisedRunner()

runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    loaders=loaders,
    logdir=logdir,
    num_epochs=num_epochs,
    verbose=True,
    callbacks={
        "logger": AlchemyLogger(
            token="...", # your Alchemy token
            project="your_project_name",
            experiment="your_experiment_name",
            group="your_experiment_group_name",
        )
    }
)

Powered by Catalyst.Ecosystem.

__init__(metric_names: List[str] = None, log_on_batch_end: bool = True, log_on_epoch_end: bool = True, **logging_params)[source]
Parameters
  • metric_names – list of metric names to log, if none - logs everything

  • log_on_batch_end – logs per-batch metrics if set True

  • log_on_epoch_end – logs per-epoch metrics if set True

on_batch_end(runner: IRunner)[source]

Translate batch metrics to Alchemy.

on_epoch_end(runner: IRunner)[source]

Translate epoch metrics to Alchemy.

on_loader_end(runner: IRunner)[source]

Translate loader metrics to Alchemy.

ConfusionMatrixCallback

class catalyst.contrib.callbacks.confusion_matrix_logger.ConfusionMatrixCallback(input_key: str = 'targets', output_key: str = 'logits', prefix: str = 'confusion_matrix', mode: str = 'tnt', class_names: List[str] = None, num_classes: int = None, plot_params: Dict = None, tensorboard_callback_name: str = '_tensorboard', version: str = None)[source]

Bases: catalyst.core.callback.Callback

Callback to plot your confusion matrix to the Tensorboard.

Parameters
  • input_key – key to use from runner.input, specifies our y_true

  • output_key – key to use from runner.output, specifies our y_pred

  • prefix – tensorboard plot name

  • mode – Strategy to compute confusion matrix. Must be one of [tnt, sklearn]

  • class_names – list with class names

  • num_classes – number of classes

  • plot_params – extra params for plt.figure rendering

  • tensorboard_callback_name – name of the tensorboard logger callback

__init__(input_key: str = 'targets', output_key: str = 'logits', prefix: str = 'confusion_matrix', mode: str = 'tnt', class_names: List[str] = None, num_classes: int = None, plot_params: Dict = None, tensorboard_callback_name: str = '_tensorboard', version: str = None)[source]

Callback initialisation.

on_batch_end(runner: IRunner)[source]

Batch end hook.

Parameters

runner – current runner

on_loader_end(runner: IRunner)[source]

Loader end hook.

Parameters

runner – current runner

on_loader_start(runner: IRunner)[source]

Loader start hook.

Parameters

runner – current runner

CutmixCallback

class catalyst.contrib.callbacks.cutmix_callback.CutmixCallback(fields: List[str] = ('features', ), alpha=1.0, on_train_only=True, **kwargs)[source]

Bases: catalyst.callbacks.criterion.CriterionCallback

Callback to do Cutmix augmentation that has been proposed in CutMix: Regularization Strategy to Train Strong Classifiers with Localizable Features.

Warning

catalyst.contrib.callbacks.CutmixCallback is inherited from catalyst.callbacks.CriterionCallback and does its work. You may not use them together.

__init__(fields: List[str] = ('features', ), alpha=1.0, on_train_only=True, **kwargs)[source]
Parameters
  • fields – list of features which must be affected.

  • alpha – beta distribution parameter.

  • on_train_only – Apply to train only. So, if on_train_only is True, use a standard output/metric for validation.

on_batch_start(runner: IRunner) → None[source]

Mixes data according to Cutmix algorithm.

Parameters

runner – current runner

on_loader_start(runner: IRunner) → None[source]

Checks if it is needed for the loader.

Parameters

runner – current runner

GradNormLogger

class catalyst.contrib.callbacks.gradnorm_logger.GradNormLogger(norm_type: int = 2, accumulation_steps: int = 1)[source]

Bases: catalyst.core.callback.Callback

Callback for logging model gradients.

__init__(norm_type: int = 2, accumulation_steps: int = 1)[source]
Parameters
  • norm_type – norm type used to calculate norm of gradients. If OptimizerCallback provides non-default argument grad_clip_params with custom norm type, then corresponding norm type should be used in this class.

  • accumulation_steps – number of steps before model.zero_grad(). Should be the same as in OptimizerCallback.

static grad_norm(*, model: torch.nn.modules.module.Module, prefix: str, norm_type: int) → Dict[source]

Computes gradient norms for a given model.

Parameters
  • model – model which gradients to be saved.

  • prefix – prefix for keys in resulting dictionary.

  • norm_type – norm type of gradient norm.

Returns

dictionary in which gradient norms are stored.

Return type

Dict

on_batch_end(runner: IRunner) → None[source]

On batch end event

Parameters

runner – current runner

InferCallback

class catalyst.contrib.callbacks.inference_callback.InferCallback(out_dir=None, out_prefix=None)[source]

Bases: catalyst.core.callback.Callback

@TODO: Docs. Contribution is welcome.

__init__(out_dir=None, out_prefix=None)[source]
Parameters

@TODO – Docs. Contribution is welcome

on_batch_end(runner: IRunner)[source]

Batch end hook.

Parameters

runner – current runner

on_loader_end(runner: IRunner)[source]

Loader end hook.

Parameters

runner – current runner

on_loader_start(runner: IRunner)[source]

Loader start hook.

Parameters

runner – current runner

on_stage_start(runner: IRunner)[source]

Stage start hook.

Parameters

runner – current runner

KNNMetricCallback

class catalyst.contrib.callbacks.knn_metric.KNNMetricCallback(input_key: str = 'logits', output_key: str = 'targets', prefix: str = 'knn', num_classes: int = 2, class_names: dict = None, cv_loader_names: Dict[str, List[str]] = None, metric_fn: str = 'f1-score', knn_metric: str = 'euclidean', num_neighbors: int = 5)[source]

Bases: catalyst.core.callback.Callback

A callback that returns single metric on runner.on_loader_end.

__init__(input_key: str = 'logits', output_key: str = 'targets', prefix: str = 'knn', num_classes: int = 2, class_names: dict = None, cv_loader_names: Dict[str, List[str]] = None, metric_fn: str = 'f1-score', knn_metric: str = 'euclidean', num_neighbors: int = 5)[source]

Returns metric value calculated using kNN algorithm.

Parameters
  • input_key – input key to get features.

  • output_key – output key to get targets.

  • prefix – key to store in logs.

  • num_classes – Number of classes; must be > 1.

  • class_names – of indexes and class names.

  • cv_loader_names – dict with keys and values of loader_names for which cross validation should be calculated. For example {“train” : [“valid”, “test”]}.

  • metric_fn – one of accuracy, precision, recall, f1-score. default is f1-score.

  • knn_metric – look sklearn.neighbors.NearestNeighbors parameter.

  • num_neighbors – number of neighbors, default is 5.

on_batch_end(runner: IRunner) → None[source]

Batch end hook.

Parameters

runner – current runner

on_epoch_end(runner: IRunner) → None[source]

Epoch end hook.

Parameters

runner – current runner

on_loader_end(runner: IRunner) → None[source]

Loader end hook.

Parameters

runner – current runner

Raises

Warning – if targets has more classes than num classes

BatchTransformCallback

class catalyst.contrib.callbacks.kornia_transform.BatchTransformCallback(transform: Sequence[Union[dict, torch.nn.modules.module.Module]], input_key: Union[str, int] = 'image', additional_input_key: Optional[str] = None, output_key: Union[str, int, None] = None, additional_output_key: Optional[str] = None)[source]

Bases: catalyst.core.callback.Callback

Callback to perform data augmentations on GPU using kornia library.

Look at Kornia: an Open Source Differentiable Computer Vision Library for PyTorch for details.

Usage example for notebook API:

import os

from kornia import augmentation
import torch
from torch.nn import functional as F
from torch.utils.data import DataLoader

from catalyst import dl
from catalyst.contrib.data.transforms import ToTensor
from catalyst.contrib.datasets import MNIST
from catalyst.contrib.callbacks.kornia_transform import (
    BatchTransformCallback
)
from catalyst import metrics


class CustomRunner(dl.Runner):
    def predict_batch(self, batch):
        # model inference step
        return self.model(
            batch[0].to(self.device).view(batch[0].size(0), -1)
        )

    def _handle_batch(self, batch):
        # model train/valid step
        x, y = batch
        y_hat = self.model(x.view(x.size(0), -1))

        loss = F.cross_entropy(y_hat, y)
        accuracy01, *_ = metrics.accuracy(y_hat, y)
        self.batch_metrics.update(
            {"loss": loss, "accuracy01": accuracy01}
        )

        if self.is_train_loader:
            loss.backward()
            self.optimizer.step()
            self.optimizer.zero_grad()

model = torch.nn.Linear(28 * 28, 10)
optimizer = torch.optim.Adam(model.parameters(), lr=0.02)

loaders = {
    "train": DataLoader(
        MNIST(os.getcwd(), train=True, transform=ToTensor()),
        batch_size=32,
    ),
    "valid": DataLoader(
        MNIST(os.getcwd(), train=False, transform=ToTensor()),
        batch_size=32,
    ),
}
transrorms = [
    augmentation.RandomAffine(degrees=(-15, 20), scale=(0.75, 1.25)),
]

runner = CustomRunner()

# model training
runner.train(
    model=model,
    optimizer=optimizer,
    loaders=loaders,
    logdir="./logs",
    num_epochs=5,
    verbose=True,
    callbacks=[BatchTransformCallback(transrorms, input_key=0)],
)

To apply augmentations only during specific loader e.g. only during training catalyst.core.callbacks.control_flow.ControlFlowCallback callback can be used. For config API it can look like this:

callbacks_params:
  ...
  train_transforms:
    _wrapper:
      callback: ControlFlowCallback
      loaders: train
    callback: BatchTransformCallback
    transforms:
      - transform: kornia.RandomAffine
        degrees: [-15, 20]
        scale: [0.75, 1.25]
        return_transform: true
      - transform: kornia.ColorJitter
        brightness: 0.1
        contrast: 0.1
        saturation: 0.1
        return_transform: false
    input_key: image
    additional_input_key: mask
  ...
__init__(transform: Sequence[Union[dict, torch.nn.modules.module.Module]], input_key: Union[str, int] = 'image', additional_input_key: Optional[str] = None, output_key: Union[str, int, None] = None, additional_output_key: Optional[str] = None) → None[source]

Constructor method for the BatchTransformCallback callback.

Parameters
  • transform (Sequence[Union[dict, nn.Module]]) –

    define augmentations to apply on a batch

    If a sequence of transforms passed, then each element should be either kornia.augmentation.AugmentationBase2D, kornia.augmentation.AugmentationBase3D, or nn.Module compatible with kornia interface.

    If a sequence of params (dict) passed, then each element of the sequence must contain 'transform' key with an augmentation name as a value. Please note that in this case to use custom augmentation you should add it to the TRANSFORMS registry first.

  • input_key (Union[str, int]) – key in batch dict mapping to transform, e.g. ‘image’

  • additional_input_key (Optional[Union[str, int]]) – key of an additional target in batch dict mapping to transform, e.g. ‘mask’

  • output_key – key to use to store the result of the transform, defaults to input_key if not provided

  • additional_output_key – key to use to store the result of additional target transformation, defaults to additional_input_key if not provided

on_batch_start(runner: IRunner) → None[source]

Apply transforms.

Parameters

runner – сurrent runner

InferMaskCallback

class catalyst.contrib.callbacks.mask_inference.InferMaskCallback(out_dir=None, out_prefix=None, input_key=None, output_key=None, name_key=None, mean=None, std=None, threshold: float = 0.5, mask_strength: float = 0.5, mask_type: str = 'soft')[source]

Bases: catalyst.core.callback.Callback

@TODO: Docs. Contribution is welcome.

__init__(out_dir=None, out_prefix=None, input_key=None, output_key=None, name_key=None, mean=None, std=None, threshold: float = 0.5, mask_strength: float = 0.5, mask_type: str = 'soft')[source]
Parameters

@TODO – Docs. Contribution is welcome

on_batch_end(runner: IRunner)[source]

Batch end hook.

Parameters

runner – current runner

on_loader_start(runner: IRunner)[source]

Loader start hook.

Parameters

runner – current runner

on_stage_start(runner: IRunner)[source]

Stage start hook.

Parameters

runner – current runner

MixupCallback

class catalyst.contrib.callbacks.mixup_callback.MixupCallback(input_key: str = 'targets', output_key: str = 'logits', fields: List[str] = ('features', ), alpha=1.0, on_train_only=True, **kwargs)[source]

Bases: catalyst.callbacks.criterion.CriterionCallback

Callback to do mixup augmentation.

More details about mixin can be found in the paper mixup: Beyond Empirical Risk Minimization.

Warning

catalyst.contrib.callbacks.MixupCallback is inherited from catalyst.callbacks.CriterionCallback and does its work. You may not use them together.

__init__(input_key: str = 'targets', output_key: str = 'logits', fields: List[str] = ('features', ), alpha=1.0, on_train_only=True, **kwargs)[source]
Parameters
  • fields – list of features which must be affected.

  • alpha – beta distribution a=b parameters. Must be >=0. The more alpha closer to zero the less effect of the mixup.

  • on_train_only – Apply to train only. As the mixup use the proxy inputs, the targets are also proxy. We are not interested in them, are we? So, if on_train_only is True, use a standard output/metric for validation.

on_batch_start(runner: catalyst.core.runner.IRunner)[source]

Batch start hook.

Parameters

runner – current runner

on_loader_start(runner: catalyst.core.runner.IRunner)[source]

Loader start hook.

Parameters

runner – current runner

NeptuneLogger

class catalyst.contrib.callbacks.neptune_logger.NeptuneLogger(metric_names: List[str] = None, log_on_batch_end: bool = True, log_on_epoch_end: bool = True, offline_mode: bool = False, **logging_params)[source]

Bases: catalyst.core.callback.Callback

Logger callback, translates runner.*_metrics to Neptune. Read about Neptune here https://neptune.ai

Example

from catalyst.runners import SupervisedRunner
from catalyst.contrib.callbacks import NeptuneLogger

runner = SupervisedRunner()

runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    loaders=loaders,
    logdir=logdir,
    num_epochs=num_epochs,
    verbose=True,
    callbacks=[
        NeptuneLogger(
            api_token="...", # your Neptune token
            project_name="your_project_name",
            offline_mode=False, # turn off neptune for debug
            name="your_experiment_name",
            params={...},  # your hyperparameters
            tags=["resnet", "no-augmentations"], # tags
            upload_source_files=["*.py"], # files to save
             )
        ]
     )

You can see an example experiment here: https://ui.neptune.ai/o/shared/org/catalyst-integration/e/CAT-13/charts

You can log your experiments without registering. Just use “ANONYMOUS” token:

runner.train(
    ...
    callbacks=[
        "NepuneLogger(
            api_token="ANONYMOUS",
            project_name="shared/catalyst-integration",
            ...
             )
        ]
    )
__init__(metric_names: List[str] = None, log_on_batch_end: bool = True, log_on_epoch_end: bool = True, offline_mode: bool = False, **logging_params)[source]
Parameters
  • metric_names – list of metric names to log, if none - logs everything

  • log_on_batch_end – logs per-batch metrics if set True

  • log_on_epoch_end – logs per-epoch metrics if set True

  • offline_mode – whether logging to Neptune server should be turned off. It is useful for debugging

on_batch_end(runner: IRunner)[source]

Log batch metrics to Neptune.

on_loader_end(runner: IRunner)[source]

Translate epoch metrics to Neptune.

OptunaCallback

catalyst.contrib.callbacks.optuna_callback.OptunaCallback

alias of catalyst.contrib.callbacks.optuna_callback.OptunaPruningCallback

class catalyst.contrib.callbacks.optuna_callback.OptunaPruningCallback(trial: optuna.trial._trial.Trial = None)[source]

Bases: catalyst.core.callback.Callback

Optuna callback for pruning unpromising runs

import optuna

from catalyst.dl import SupervisedRunner, OptunaPruningCallback

# some python code ...

def objective(trial: optuna.Trial):
    # standard optuna code for model and/or optimizer suggestion ...
    runner = SupervisedRunner()
    runner.train(
        model=model,
        loaders=loaders,
        criterion=criterion,
        optimizer=optimizer,
        callbacks=[
            OptunaPruningCallback(trial)
            # some other callbacks ...
        ],
        num_epochs=num_epochs,
    )
    return runner.best_valid_metrics[runner.main_metric]

study = optuna.create_study()
study.optimize(objective, n_trials=100, timeout=600)

Config API is supported through catalyst-dl tune command.

__init__(trial: optuna.trial._trial.Trial = None)[source]

This callback can be used for early stopping (pruning) unpromising runs.

Parameters

trial – Optuna.Trial for experiment.

on_epoch_end(runner: IRunner)[source]

On epoch end hook.

Considering prune or not to prune current run at current epoch.

Parameters

runner – runner for current experiment

Raises

TrialPruned – if current run should be pruned

on_stage_start(runner: IRunner)[source]

On stage start hook. Takes optuna_trial from Experiment for future usage if needed.

Parameters

runner – runner for current experiment

Raises

NotImplementedError – if no Optuna trial was found on stage start.

PerplexityMetricCallback

class catalyst.contrib.callbacks.perplexity_metric.PerplexityMetricCallback(input_key: str = 'targets', output_key: str = 'logits', prefix: str = 'perplexity', ignore_index: int = None)[source]

Bases: catalyst.callbacks.metric.BatchMetricCallback

Perplexity is a very popular metric in NLP especially in Language Modeling task. It is 2^cross_entropy.

__init__(input_key: str = 'targets', output_key: str = 'logits', prefix: str = 'perplexity', ignore_index: int = None)[source]
Parameters
  • input_key – input key to use for perplexity calculation, target tokens

  • output_key – output key to use for perplexity calculation, logits of the predicted tokens

  • ignore_index – index to ignore, usually pad_index

metric_fn(outputs, targets)[source]

Calculate perplexity

Parameters
  • outputs – model output

  • targets – model targets

Returns

computed perplexity metric

TelegramLogger

class catalyst.contrib.callbacks.telegram_logger.TelegramLogger(token: str = None, chat_id: str = None, metric_names: List[str] = None, log_on_stage_start: bool = True, log_on_loader_start: bool = True, log_on_loader_end: bool = True, log_on_stage_end: bool = True, log_on_exception: bool = True)[source]

Bases: catalyst.core.callback.Callback

Logger callback, translates runner.metric_manager to telegram channel.

__init__(token: str = None, chat_id: str = None, metric_names: List[str] = None, log_on_stage_start: bool = True, log_on_loader_start: bool = True, log_on_loader_end: bool = True, log_on_stage_end: bool = True, log_on_exception: bool = True)[source]
Parameters
  • token – telegram bot’s token, see https://core.telegram.org/bots

  • chat_id – Chat unique identifier

  • metric_names – List of metric names to log. if none - logs everything.

  • log_on_stage_start – send notification on stage start

  • log_on_loader_start – send notification on loader start

  • log_on_loader_end – send notification on loader end

  • log_on_stage_end – send notification on stage end

  • log_on_exception – send notification on exception

on_exception(runner: IRunner)[source]

Notify about raised Exception.

on_loader_end(runner: IRunner)[source]

Translate runner.metric_manager to telegram channel.

on_loader_start(runner: IRunner)[source]

Notify about starting running the new loader.

on_stage_end(runner: IRunner)[source]

Notify about finishing a stage.

on_stage_start(runner: IRunner)[source]

Notify about starting a new stage.

WandbLogger

class catalyst.contrib.callbacks.wandb_logger.WandbLogger(metric_names: List[str] = None, log_on_batch_end: bool = False, log_on_epoch_end: bool = True, log: str = None, **logging_params)[source]

Bases: catalyst.core.callback.Callback

Logger callback, translates runner.*_metrics to Weights & Biases. Read about Weights & Biases here https://docs.wandb.com/

Example

from catalyst import dl
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

class Projector(nn.Module):
    def __init__(self, input_size):
        super().__init__()
        self.linear = nn.Linear(input_size, 1)

    def forward(self, X):
        return self.linear(X).squeeze(-1)

X = torch.rand(16, 10)
y = torch.rand(X.shape[0])
model = Projector(X.shape[1])
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=8)
runner = dl.SupervisedRunner()

runner.train(
    model=model,
    loaders={
        "train": loader,
        "valid": loader
    },
    criterion=nn.MSELoss(),
    optimizer=optim.Adam(model.parameters()),
    logdir="log_example",
    callbacks=[
        dl.callbacks.WandbLogger(
            project="wandb_logger_example"
        )
    ],
    num_epochs=10
)
__init__(metric_names: List[str] = None, log_on_batch_end: bool = False, log_on_epoch_end: bool = True, log: str = None, **logging_params)[source]
Parameters
  • metric_names – list of metric names to log, if None - logs everything

  • log_on_batch_end – logs per-batch metrics if set True

  • log_on_epoch_end – logs per-epoch metrics if set True

  • log – wandb.watch parameter. Can be “all”, “gradients” or “parameters”

  • **logging_params – any parameters of function wandb.init except reinit which is automatically set to True and dir which is set to <logdir>

on_batch_end(runner: IRunner)[source]

Translate batch metrics to Weights & Biases.

on_epoch_end(runner: IRunner)[source]

Translate epoch metrics to Weights & Biases.

on_loader_end(runner: IRunner)[source]

Translate loader metrics to Weights & Biases.

on_stage_end(runner: IRunner)[source]

Finish logging to Weights & Biases.

on_stage_start(runner: IRunner)[source]

Initialize Weights & Biases.