Core¶
Runner¶
class catalyst.core.runner.IRunner(model: Union[torch.nn.modules.module.Module, Dict[str, torch.nn.modules.module.Module]] = None, engine: catalyst.core.engine.IEngine = None)[source]¶
Bases: catalyst.core.callback.ICallback, catalyst.core.logger.ILogger, abc.ABC
An abstraction that contains all the logic of how to run the experiment: its stages, epochs, loaders and batches.
IRunner supports deep learning pipeline configuration with pure Python code. Please check the examples for intuition.
- Parameters
model – Torch model object
engine – IEngine instance
This is an abstraction; please check out its implementations for more details.
Note
To learn more about Catalyst Core concepts, please check out the Core concepts section of the documentation.
Note
Please follow the minimal examples sections for use cases.
Examples:
import os

from torch import nn, optim
from torch.utils.data import DataLoader

from catalyst import dl, utils
from catalyst.contrib.datasets import MNIST
from catalyst.data import ToTensor


class CustomRunner(dl.IRunner):
    def __init__(self, logdir, device):
        super().__init__()
        self._logdir = logdir
        self._device = device

    def get_engine(self):
        return dl.DeviceEngine(self._device)

    def get_loggers(self):
        return {
            "console": dl.ConsoleLogger(),
            "csv": dl.CSVLogger(logdir=self._logdir),
            "tensorboard": dl.TensorboardLogger(logdir=self._logdir),
        }

    @property
    def stages(self):
        return ["train_freezed", "train_unfreezed"]

    def get_stage_len(self, stage: str) -> int:
        return 3

    def get_loaders(self, stage: str):
        loaders = {
            "train": DataLoader(
                MNIST(os.getcwd(), train=True, download=True, transform=ToTensor()),
                batch_size=32,
            ),
            "valid": DataLoader(
                MNIST(os.getcwd(), train=False),
                batch_size=32,
            ),
        }
        return loaders

    def get_model(self, stage: str):
        model = (
            self.model
            if self.model is not None
            else nn.Sequential(
                nn.Flatten(), nn.Linear(784, 128), nn.ReLU(), nn.Linear(128, 10)
            )
        )
        if stage == "train_freezed":
            # freeze layer
            utils.set_requires_grad(model[1], False)
        else:
            utils.set_requires_grad(model, True)
        return model

    def get_criterion(self, stage: str):
        return nn.CrossEntropyLoss()

    def get_optimizer(self, stage: str, model):
        if stage == "train_freezed":
            return optim.Adam(model.parameters(), lr=1e-3)
        else:
            return optim.SGD(model.parameters(), lr=1e-1)

    def get_scheduler(self, stage: str, optimizer):
        return None

    def get_callbacks(self, stage: str):
        return {
            "criterion": dl.CriterionCallback(
                metric_key="loss", input_key="logits", target_key="targets"
            ),
            "optimizer": dl.OptimizerCallback(metric_key="loss"),
            "accuracy": dl.AccuracyCallback(
                input_key="logits", target_key="targets", topk_args=(1, 3, 5)
            ),
            "classification": dl.PrecisionRecallF1SupportCallback(
                input_key="logits", target_key="targets", num_classes=10
            ),
            "checkpoint": dl.CheckpointCallback(
                self._logdir,
                loader_key="valid",
                metric_key="loss",
                minimize=True,
                save_n_best=3,
            ),
        }

    def handle_batch(self, batch):
        x, y = batch
        logits = self.model(x)
        self.batch = {
            "features": x,
            "targets": y,
            "logits": logits,
        }


runner = CustomRunner("./logs", "cpu")
runner.run()
-
get_callbacks
(stage: str) → OrderedDict[str, ICallback][source]¶ Returns callbacks for a given stage.
- Parameters
stage – stage name of interest like “pretrain” / “train” / “finetune” / etc
- Returns
Ordered dictionary with callbacks for the current stage.
- Return type
OrderedDict[str, Callback]
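For intuition, a minimal sketch of an override inside a custom runner (MyRunner is hypothetical; the callbacks mirror the class-level example above, which returns a plain dict, so OrderedDict here only makes the ordering explicit):

from collections import OrderedDict

from catalyst import dl

class MyRunner(dl.IRunner):
    ...

    def get_callbacks(self, stage: str):
        # keys are arbitrary names, values are Callback instances
        return OrderedDict({
            "criterion": dl.CriterionCallback(
                metric_key="loss", input_key="logits", target_key="targets"
            ),
            "optimizer": dl.OptimizerCallback(metric_key="loss"),
        })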
-
get_criterion
(stage: str) → Optional[torch.nn.modules.module.Module][source]¶ Returns the criterion for a given stage and epoch.
Example:
# for a typical classification task
>>> runner.get_criterion(stage="train")
nn.CrossEntropyLoss()
- Parameters
stage – stage name of interest like “pretrain” / “train” / “finetune” / etc
- Returns
Criterion: criterion for a given stage.
-
get_datasets
(stage: str) → OrderedDict[str, Dataset][source]¶ Returns the datasets for a given stage and epoch.
Note
For Deep Learning cases you typically have the same dataset for the whole stage.
For Reinforcement Learning it is common to change the dataset (experiment) every training epoch.
- Parameters
stage – stage name of interest, like “pretrain” / “train” / “finetune” / etc
- Returns
OrderedDict[str, Dataset]: ordered dictionary with datasets for the current stage and epoch.
Note
We need an ordered dictionary to guarantee the correct dataflow and ordering of our training datasets, for example to run the train loader before the validation one :)
Example:
>>> runner.get_datasets(stage="training")
OrderedDict({
    "train": CsvDataset(in_csv=in_csv_train, ...),
    "valid": CsvDataset(in_csv=in_csv_valid, ...),
})
-
abstract
get_loaders
(stage: str) → OrderedDict[str, DataLoader][source]¶ Returns the loaders for a given stage.
Note
Wrapper for catalyst.core.experiment.IExperiment.get_datasets. For most of your experiments you only need to override the get_datasets method.
- Parameters
stage – stage name of interest, like “pretrain” / “train” / “finetune” / etc
- Returns
OrderedDict[str, DataLoader]: ordered dictionary with loaders for the current stage and epoch.
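As a sketch, an override can simply wrap get_datasets with standard PyTorch loaders (MyRunner, the batch size and the shuffle rule are illustrative assumptions):

from collections import OrderedDict

from torch.utils.data import DataLoader

from catalyst import dl

class MyRunner(dl.IRunner):
    ...

    def get_loaders(self, stage: str):
        # wrap every dataset returned by get_datasets into a DataLoader,
        # preserving the key order of the ordered dictionary
        datasets = self.get_datasets(stage=stage)
        return OrderedDict(
            (key, DataLoader(dataset, batch_size=32, shuffle=(key == "train")))
            for key, dataset in datasets.items()
        )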
-
abstract
get_model
(stage: str) → torch.nn.modules.module.Module[source]¶ Returns the model for a given stage and epoch.
Example:
# suppose we have a typical MNIST model, like
# nn.Sequential(nn.Linear(28*28, 128), nn.Linear(128, 10))
>>> runner.get_model(stage="train")
Sequential(
  (0): Linear(in_features=784, out_features=128, bias=True)
  (1): Linear(in_features=128, out_features=10, bias=True)
)
- Parameters
stage – stage name of interest like “pretrain” / “train” / “finetune” / etc
- Returns
Model: model for a given stage.
-
get_optimizer
(stage: str, model: torch.nn.modules.module.Module) → Optional[torch.optim.optimizer.Optimizer][source]¶ Returns the optimizer for a given stage and model.
Example:
>>> runner.get_optimizer(model=model, stage="train")
torch.optim.Adam(model.parameters())
- Parameters
stage – stage name of interest like “pretrain” / “train” / “finetune” / etc
model – model to optimize with stage optimizer
- Returns
Optimizer: optimizer for a given stage and model.
-
get_scheduler
(stage: str, optimizer: torch.optim.optimizer.Optimizer) → Optional[torch.optim.lr_scheduler._LRScheduler][source]¶ Returns the scheduler for a given stage and optimizer.
- Example:
>>> runner.get_scheduler(stage="training", optimizer=optimizer)
torch.optim.lr_scheduler.StepLR(optimizer)
- Parameters
stage – stage name of interest like “pretrain” / “train” / “finetune” / etc
optimizer – optimizer to schedule with stage scheduler
- Returns
Scheduler: scheduler for a given stage and optimizer.
-
get_stage_len
(stage: str) → int[source]¶ Returns number of epochs for the selected stage.
- Parameters
stage – current stage
- Returns
number of epochs in stage
Example:
>>> runner.get_stage_len("pretraining")
3
-
abstract
handle_batch
(batch: Mapping[str, Any]) → None[source]¶ Inner method to handle the specified data batch. Used to implement the train/valid/infer logic during the experiment run.
- Parameters
batch (Mapping[str, Any]) – dictionary with data batches from DataLoader.
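A minimal sketch for a classification setup, mirroring the class-level example above (the self.batch keys are the ones the example callbacks read):

def handle_batch(self, batch):
    # unpack the loader batch, run the model and store everything
    # the criterion/metric callbacks need into self.batch
    x, y = batch
    logits = self.model(x)
    self.batch = {"features": x, "targets": y, "logits": logits}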
-
property
hparams
¶ Returns hyper-parameters for current run.
- Example:
>>> runner.hparams
OrderedDict([('optimizer', 'Adam'), ('lr', 0.02), ('betas', (0.9, 0.999)),
             ('eps', 1e-08), ('weight_decay', 0), ('amsgrad', False),
             ('train_batch_size', 32)])
- Returns
dictionary with hyperparameters
-
log_metrics
(*args, **kwargs) → None[source]¶ Logs batch, loader and epoch metrics to available loggers.
-
run
() → catalyst.core.runner.IRunner[source]¶ Runs the experiment.
- Returns
self, IRunner instance after the experiment
-
property
seed
¶ Experiment’s seed for reproducibility.
-
abstract property
stages
¶ Run’s stage names.
Example:
>>> runner.stages
["pretraining", "finetuning"]
Engine¶
class catalyst.core.engine.IEngine[source]¶
Bases: abc.ABC
An abstraction that syncs experiment run with different hardware-specific configurations.
CPU
GPU
DataParallel (deepspeed, fairscale, nvidia, torch)
AMP (deepspeed, fairscale, nvidia, torch)
DDP (deepspeed, fairscale, nvidia, torch)
XLA
This is an abstraction; please check out its implementations for more details.
-
autocast
(*args, **kwargs)[source]¶ AMP scaling context manager. The default autocast context does not scale anything.
- Parameters
*args – some args
**kwargs – some kwargs
- Returns
context
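A usage sketch, assuming the runner exposes its engine as self.engine and autocast is used as a context manager around the forward pass (with the default engine the context is a no-op):

def handle_batch(self, batch):
    x, y = batch
    # mixed-precision forward pass on AMP-capable engines,
    # plain forward pass otherwise
    with self.engine.autocast():
        logits = self.model(x)
    self.batch = {"features": x, "targets": y, "logits": logits}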
-
autocast_loader
(loader: torch.utils.data.dataloader.DataLoader)[source]¶ Loader wrapper for the distributed mode.
-
abstract property
backend
¶ String identifier for distributed backend.
-
abstract
backward_loss
(loss: torch.Tensor, model: torch.nn.modules.module.Module, optimizer: torch.optim.optimizer.Optimizer) → None[source]¶ Abstraction over the loss.backward() step. Should be overridden when loss scaling is required, for example with APEX or AMP.
- Parameters
loss – tensor with loss value.
model – model module.
optimizer – model optimizer.
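For intuition, two hedged sketches of possible overrides; the AMP variant assumes the engine keeps a torch.cuda.amp.GradScaler as self.scaler, which is an illustrative attribute name, not part of the interface:

# plain engine: no scaling needed
def backward_loss(self, loss, model, optimizer):
    loss.backward()

# hypothetical AMP engine: scale the loss so small gradients
# do not underflow in float16
def backward_loss(self, loss, model, optimizer):
    self.scaler.scale(loss).backward()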
-
barrier
() → None[source]¶ Synchronizes all processes.
This collective blocks processes until all of them enter the function.
-
ddp_sync_run
(function: Callable)[source]¶ Function wrapper for synchronous run in the distributed mode.
-
abstract
deinit_components
(runner=None)[source]¶ Deinitializes the run's components. In distributed mode, this should destroy the process group.
-
abstract property
device
¶ Pytorch device.
-
abstract
init_components
(model_fn: Callable = None, criterion_fn: Callable = None, optimizer_fn: Callable = None, scheduler_fn: Callable = None)[source]¶ Initializes the run's components.
-
property
is_ddp
¶ Boolean flag for distributed run.
-
property
is_master_process
¶ Checks if the current process is the master process. Should be implemented only for distributed training (DDP); for non-distributed training it should always return True.
- Returns
True if the current process is the master process, False otherwise.
-
property
is_worker_process
¶ Checks if the current process is a worker process. Should be implemented only for distributed training (DDP); for non-distributed training it should always return False.
- Returns
True if the current process is a worker process, False otherwise.
-
abstract
load_checkpoint
(path: str) → Dict[source]¶ Loads a checkpoint from the given path.
- Parameters
path – checkpoint file to load
-
abstract
optimizer_step
(loss: torch.Tensor, model: torch.nn.modules.module.Module, optimizer: torch.optim.optimizer.Optimizer) → None[source]¶ Abstraction over the optimizer.step() step. Should be overridden when gradient scaling is required, for example with AMP.
- Parameters
loss – tensor with loss value.
model – model module.
optimizer – model optimizer.
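Under the same assumption as above (a torch.cuda.amp.GradScaler stored as self.scaler on a hypothetical AMP engine), a matching override could look like:

# plain engine: a regular optimizer step
def optimizer_step(self, loss, model, optimizer):
    optimizer.step()

# hypothetical AMP engine: step through the scaler, then update its scale factor
def optimizer_step(self, loss, model, optimizer):
    self.scaler.step(optimizer)
    self.scaler.update()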
-
abstract
pack_checkpoint
(model: torch.nn.modules.module.Module = None, criterion: torch.nn.modules.module.Module = None, optimizer: torch.optim.optimizer.Optimizer = None, scheduler: torch.optim.lr_scheduler._LRScheduler = None, **kwargs) → Dict[source]¶ Packs model, criterion, optimizer, scheduler and some extra info **kwargs into a torch-based checkpoint.
- Parameters
model – torch model
criterion – torch criterion
optimizer – torch optimizer
scheduler – torch scheduler
**kwargs – some extra info to pack
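A usage sketch combining pack_checkpoint with save_checkpoint (the file path and the extra epoch field are illustrative):

# pack the current components together with arbitrary extra info ...
checkpoint = engine.pack_checkpoint(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    epoch=5,  # goes into **kwargs
)
# ... and persist them to disk
engine.save_checkpoint(checkpoint, "./logs/checkpoint.pth")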
-
abstract property
rank
¶ Process rank for distributed training.
-
abstract
save_checkpoint
(checkpoint: Dict, path: str) → None[source]¶ Saves checkpoint to a file.
- Parameters
checkpoint – data to save.
path – filepath where checkpoint should be stored.
-
setup_process
(rank: int = -1, world_size: int = 1)[source]¶ Initialize DDP variables and processes.
- Parameters
rank – process rank. Default is -1.
world_size – number of devices in the network to expect for training. Default is 1.
-
spawn
(fn: Callable, *args: Any, **kwargs: Any) → None[source]¶ Spawning abstraction for nprocs process creation with the specified fn and args/kwargs.
- Parameters
fn (function) – Function called as the entrypoint of the spawned process. This function must be defined at the top level of a module so it can be pickled and spawned; this is a requirement imposed by multiprocessing. The function is called as fn(i, *args), where i is the process index and args is the passed-through tuple of arguments.
*args – Arguments passed to the spawn method.
**kwargs – Keyword-arguments passed to spawn method.
- Returns
wrapped function (if needed).
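A sketch of the entrypoint requirement in practice (the worker function and its argument are illustrative):

# must live at module top level so multiprocessing can pickle it
def _worker(process_index, world_size):
    print(f"worker {process_index} of {world_size} started")

# each spawned process calls _worker(i, world_size),
# where i is the process index
engine.spawn(_worker, world_size)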
-
abstract
sync_device
(tensor_or_module: Union[Dict, List, Tuple, numpy.ndarray, torch.Tensor, torch.nn.modules.module.Module]) → Union[Dict, List, Tuple, torch.Tensor, torch.nn.modules.module.Module][source]¶ Moves tensor_or_module to the Engine's device.
- Parameters
tensor_or_module – tensor or module to move
-
abstract
sync_metrics
(metrics: Dict) → Dict[source]¶ Syncs metrics over world_size in the distributed mode.
-
abstract
sync_tensor
(tensor: torch.Tensor, mode: str) → torch.Tensor[source]¶ Syncs tensor over world_size in the distributed mode.
-
abstract
unpack_checkpoint
(checkpoint: Dict, model: torch.nn.modules.module.Module = None, criterion: torch.nn.modules.module.Module = None, optimizer: torch.optim.optimizer.Optimizer = None, scheduler: torch.optim.lr_scheduler._LRScheduler = None, **kwargs) → None[source]¶ Loads a checkpoint from file and unpacks its content into the model (if not None), criterion (if not None), optimizer (if not None) and scheduler (if not None).
- Parameters
checkpoint – checkpoint to load
model – model whose state should be updated
criterion – criterion whose state should be updated
optimizer – optimizer whose state should be updated
scheduler – scheduler whose state should be updated
kwargs – extra arguments
-
abstract property
world_size
¶ Process world size for distributed training.
-
abstract
zero_grad
(loss: torch.Tensor, model: torch.nn.modules.module.Module, optimizer: torch.optim.optimizer.Optimizer) → None[source]¶ Abstraction over the model.zero_grad() step. Should be overridden when you need to pass arguments to model.zero_grad(), such as set_to_none=True, or when you need a custom scheme that replaces or improves the .zero_grad() method.
- Parameters
loss – tensor with loss value.
model – model module.
optimizer – model optimizer.
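A sketch of an override that passes set_to_none=True, as mentioned above (setting gradients to None instead of zeroing them is a standard PyTorch option that frees gradient memory):

def zero_grad(self, loss, model, optimizer):
    # release gradient buffers instead of filling them with zeros
    model.zero_grad(set_to_none=True)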
Callback¶
ICallback¶
class catalyst.core.callback.ICallback[source]¶
Bases: object
A callable abstraction for deep learning runs.
-
on_batch_end
(runner: IRunner) → None[source]¶ Event handler for batch end.
- Parameters
runner – IRunner instance.
-
on_batch_start
(runner: IRunner) → None[source]¶ Event handler for batch start.
- Parameters
runner – IRunner instance.
-
on_epoch_end
(runner: IRunner) → None[source]¶ Event handler for epoch end.
- Parameters
runner – IRunner instance.
-
on_epoch_start
(runner: IRunner) → None[source]¶ Event handler for epoch start.
- Parameters
runner – IRunner instance.
-
on_exception
(runner: IRunner) → None[source]¶ Event handler for exception case.
- Parameters
runner – IRunner instance.
-
on_experiment_end
(runner: IRunner) → None[source]¶ Event handler for experiment end.
- Parameters
runner – IRunner instance.
Note
This event works only with IRunner.
-
on_experiment_start
(runner: IRunner) → None[source]¶ Event handler for experiment start.
- Parameters
runner – IRunner instance.
Note
This event works only with IRunner.
-
on_loader_end
(runner: IRunner) → None[source]¶ Event handler for loader end.
- Parameters
runner – IRunner instance.
-
on_loader_start
(runner: IRunner) → None[source]¶ Event handler for loader start.
- Parameters
runner – IRunner instance.
-
CallbackNode¶
class catalyst.core.callback.CallbackNode[source]¶
Bases: enum.IntFlag
Callback node usage flag during distributed training.
All (0) - use on all nodes, both master and worker.
Master (1) - use only on the master node.
Worker (2) - use only on worker nodes.
-
All
= 0¶
-
Master
= 1¶
-
Worker
= 2¶
-
all
= 0¶
-
master
= 1¶
-
worker
= 2¶
CallbackOrder¶
class catalyst.core.callback.CallbackOrder[source]¶
Bases: enum.IntFlag
Callback usage order during training.
Catalyst executes Callbacks with low CallbackOrder before Callbacks with high CallbackOrder.
Predefined orders:
Internal (0) - some Catalyst Extras, like PhaseCallbacks (used in GANs).
Metric (20) - Callbacks with metrics and losses computation.
MetricAggregation (40) - metric aggregation callbacks, for example summing different losses into one.
Optimizer (60) - optimizer step, requires computed metrics for optimization.
Scheduler (80) - scheduler step; in the ReduceLROnPlateau case it requires computed validation metrics to schedule the optimizer.
External (100) - additional callbacks with custom logic, like InferenceCallbacks.
Nevertheless, you can always create a custom Callback with any order, for example:
>>> class MyCustomCallback(Callback):
>>>     def __init__(self):
>>>         super().__init__(order=33)
>>>     ...
# MyCustomCallback will be executed after all `Metric`-Callbacks
# but before all `MetricAggregation`-Callbacks.
-
External
= 100¶
-
ExternalExtra
= 120¶
-
Internal
= 0¶
-
Metric
= 20¶
-
MetricAggregation
= 40¶
-
Optimizer
= 60¶
-
Scheduler
= 80¶
-
external
= 100¶
-
external_extra
= 120¶
-
internal
= 0¶
-
metric
= 20¶
-
metric_aggregation
= 40¶
-
optimizer
= 60¶
-
scheduler
= 80¶
CallbackScope¶
Callback¶
class catalyst.core.callback.Callback(order: int, node: int = <CallbackNode.All: 0>, scope: int = <CallbackScope.Stage: 0>)[source]¶
Bases: catalyst.core.callback.ICallback
An abstraction that lets you customize your experiment run logic.
- Parameters
order – flag from CallbackOrder
node – flag from CallbackNode
scope – flag from CallbackScope
To give users maximum flexibility and extensibility, Catalyst supports callback execution anywhere in the training loop:
-- stage start
---- epoch start
------ loader start
-------- batch start
---------- batch handler (Runner logic)
-------- batch end
------ loader end
---- epoch end
-- stage end
exception – if an Exception was raised
This is an abstraction; please check out its implementations for more details.
Note
To learn more about Catalyst Core concepts, please check out the Core concepts section of the documentation.
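A minimal sketch of a custom callback built on the events above (the timing logic and the chosen order are illustrative):

import time

from catalyst.core.callback import Callback, CallbackOrder

class LoaderTimerCallback(Callback):
    """Hypothetical callback that reports how long each loader takes."""

    def __init__(self):
        super().__init__(order=CallbackOrder.External)

    def on_loader_start(self, runner):
        self._start = time.time()

    def on_loader_end(self, runner):
        print(f"loader finished in {time.time() - self._start:.2f}s")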
CallbackList¶
class catalyst.core.callback.CallbackList(callbacks: List[catalyst.core.callback.Callback], order: int, node: int = <CallbackNode.All: 0>, scope: int = <CallbackScope.Stage: 0>)[source]¶
Bases: catalyst.core.callback.Callback
Callback wrapper for a list of Callbacks.
- Parameters
callbacks – list of callbacks
order – flag from CallbackOrder
node – flag from CallbackNode
scope – flag from CallbackScope
-
on_batch_end
(runner: IRunner) → None[source]¶ Event handler for batch end.
- Parameters
runner – IRunner instance.
-
on_batch_start
(runner: IRunner) → None[source]¶ Event handler for batch start.
- Parameters
runner – IRunner instance.
-
on_epoch_end
(runner: IRunner) → None[source]¶ Event handler for epoch end.
- Parameters
runner – IRunner instance.
-
on_epoch_start
(runner: IRunner) → None[source]¶ Event handler for epoch start.
- Parameters
runner – IRunner instance.
-
on_exception
(runner: IRunner) → None[source]¶ Event handler for exception case.
- Parameters
runner – IRunner instance.
-
on_experiment_end
(runner: IRunner) → None[source]¶ Event handler for experiment end.
- Parameters
runner – IRunner instance.
Note
This event works only with IRunner.
-
on_experiment_start
(runner: IRunner) → None[source]¶ Event handler for experiment start.
- Parameters
runner – IRunner instance.
Note
This event works only with IRunner.
-
on_loader_end
(runner: IRunner) → None[source]¶ Event handler for loader end.
- Parameters
runner – IRunner instance.
-
on_loader_start
(runner: IRunner) → None[source]¶ Event handler for loader start.
- Parameters
runner – IRunner instance.
CallbackWrapper¶
class catalyst.core.callback.CallbackWrapper(base_callback: catalyst.core.callback.Callback, enable_callback: bool = True)[source]¶
Bases: catalyst.core.callback.Callback
Enable/disable callback execution.
- Parameters
base_callback – callback to wrap
enable_callback – indicator to enable/disable the callback; if True, the callback will be enabled. Default is True.
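A usage sketch (the wrapped callback mirrors the runner example above):

from catalyst import dl
from catalyst.core.callback import CallbackWrapper

# keep the callback in the pipeline but skip its execution for now
accuracy = dl.AccuracyCallback(input_key="logits", target_key="targets")
disabled_accuracy = CallbackWrapper(base_callback=accuracy, enable_callback=False)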
Logger¶
class catalyst.core.logger.ILogger(log_batch_metrics: bool, log_epoch_metrics: bool)[source]¶
Bases: object
An abstraction that syncs experiment run with monitoring tools.
- Parameters
log_batch_metrics – boolean flag to log batch metrics.
log_epoch_metrics – boolean flag to log epoch metrics.
This is an abstraction; please check out its implementations for more details.
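A minimal sketch of a custom logger that prints to stdout; the log_metrics override below spells out only the arguments it uses and folds the rest of the signature (see below) into **kwargs:

from catalyst.core.logger import ILogger

class PrintLogger(ILogger):
    """Hypothetical logger that prints every metrics dictionary."""

    def __init__(self):
        super().__init__(log_batch_metrics=False, log_epoch_metrics=True)

    def log_metrics(self, metrics, scope=None, **kwargs):
        # print the metrics together with the scope they were logged under
        print(scope, metrics)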
-
log_artifact
(tag: str, artifact: object = None, path_to_artifact: str = None, scope: str = None, run_key: str = None, global_epoch_step: int = 0, global_batch_step: int = 0, global_sample_step: int = 0, stage_key: str = None, stage_epoch_len: int = 0, stage_epoch_step: int = 0, stage_batch_step: int = 0, stage_sample_step: int = 0, loader_key: str = None, loader_batch_len: int = 0, loader_sample_len: int = 0, loader_batch_step: int = 0, loader_sample_step: int = 0) → None[source]¶ Logs artifact (arbitrary file like audio, video, model weights) to the logger.
-
property
log_batch_metrics
¶ Boolean flag to log batch metrics.
- Returns
boolean flag to log batch metrics.
- Return type
bool
-
property
log_epoch_metrics
¶ Boolean flag to log epoch metrics.
- Returns
boolean flag to log epoch metrics.
- Return type
bool
-
log_hparams
(hparams: Dict, scope: str = None, run_key: str = None, stage_key: str = None) → None[source]¶ Logs hyperparameters to the logger.
-
log_image
(tag: str, image: numpy.ndarray, scope: str = None, run_key: str = None, global_epoch_step: int = 0, global_batch_step: int = 0, global_sample_step: int = 0, stage_key: str = None, stage_epoch_len: int = 0, stage_epoch_step: int = 0, stage_batch_step: int = 0, stage_sample_step: int = 0, loader_key: str = None, loader_batch_len: int = 0, loader_sample_len: int = 0, loader_batch_step: int = 0, loader_sample_step: int = 0) → None[source]¶ Logs image to the logger.
-
log_metrics
(metrics: Dict[str, float], scope: str = None, run_key: str = None, global_epoch_step: int = 0, global_batch_step: int = 0, global_sample_step: int = 0, stage_key: str = None, stage_epoch_len: int = 0, stage_epoch_step: int = 0, stage_batch_step: int = 0, stage_sample_step: int = 0, loader_key: str = None, loader_batch_len: int = 0, loader_sample_len: int = 0, loader_batch_step: int = 0, loader_sample_step: int = 0) → None[source]¶ Logs metrics to the logger.
Trial¶
Scripts¶
You can use Catalyst scripts with catalyst-dl in your terminal. For example:
$ catalyst-dl run --help