Core¶
Runner¶
- 
class catalyst.core.runner.IRunner(model: Optional[Union[torch.nn.modules.module.Module, Dict[str, torch.nn.modules.module.Module]]] = None, engine: Optional[catalyst.core.engine.IEngine] = None)[source]¶
- Bases: catalyst.core.callback.ICallback, catalyst.core.logger.ILogger, abc.ABC
- An abstraction that contains all the logic of how to run the experiment: stages, epochs, loaders and batches.
- IRunner supports the logic for deep learning pipeline configuration with pure Python code. Please check the examples for intuition.
- Parameters
- model – Torch model object 
- engine – IEngine instance 
 
- Abstraction, please check out the implementations for more details.
- Note
- To learn more about Catalyst Core concepts, please check out the Core documentation.
- Note
- Please follow the minimal examples sections for use cases.
- Examples:

    import os

    from torch import nn, optim
    from torch.utils.data import DataLoader

    from catalyst import dl, utils
    from catalyst.contrib.datasets import MNIST
    from catalyst.data.transforms import ToTensor


    class CustomRunner(dl.IRunner):
        def __init__(self, logdir, device):
            super().__init__()
            self._logdir = logdir
            self._device = device

        def get_engine(self):
            return dl.DeviceEngine(self._device)

        def get_loggers(self):
            return {
                "console": dl.ConsoleLogger(),
                "csv": dl.CSVLogger(logdir=self._logdir),
                "tensorboard": dl.TensorboardLogger(logdir=self._logdir),
            }

        @property
        def stages(self):
            return ["train_freezed", "train_unfreezed"]

        def get_stage_len(self, stage: str) -> int:
            return 3

        def get_loaders(self, stage: str):
            loaders = {
                "train": DataLoader(
                    MNIST(os.getcwd(), train=True, download=True, transform=ToTensor()),
                    batch_size=32,
                ),
                "valid": DataLoader(
                    MNIST(os.getcwd(), train=False, download=True, transform=ToTensor()),
                    batch_size=32,
                ),
            }
            return loaders

        def get_model(self, stage: str):
            model = (
                self.model
                if self.model is not None
                else nn.Sequential(
                    nn.Flatten(), nn.Linear(784, 128), nn.ReLU(), nn.Linear(128, 10)
                )
            )
            if stage == "train_freezed":
                # freeze layer
                utils.set_requires_grad(model[1], False)
            else:
                utils.set_requires_grad(model, True)
            return model

        def get_criterion(self, stage: str):
            return nn.CrossEntropyLoss()

        def get_optimizer(self, stage: str, model):
            if stage == "train_freezed":
                return optim.Adam(model.parameters(), lr=1e-3)
            else:
                return optim.SGD(model.parameters(), lr=1e-1)

        def get_scheduler(self, stage: str, optimizer):
            return None

        def get_callbacks(self, stage: str):
            return {
                "criterion": dl.CriterionCallback(
                    metric_key="loss", input_key="logits", target_key="targets"
                ),
                "optimizer": dl.OptimizerCallback(metric_key="loss"),
                "accuracy": dl.AccuracyCallback(
                    input_key="logits", target_key="targets", topk_args=(1, 3, 5)
                ),
                "classification": dl.PrecisionRecallF1SupportCallback(
                    input_key="logits", target_key="targets", num_classes=10
                ),
                "checkpoint": dl.CheckpointCallback(
                    self._logdir,
                    loader_key="valid",
                    metric_key="loss",
                    minimize=True,
                    save_n_best=3,
                ),
            }

        def handle_batch(self, batch):
            x, y = batch
            logits = self.model(x)
            self.batch = {
                "features": x,
                "targets": y,
                "logits": logits,
            }


    runner = CustomRunner("./logs", "cpu")
    runner.run()
get_callbacks(stage: str) → OrderedDict[str, ICallback][source]¶
- Returns callbacks for a given stage. - Parameters
- stage – stage name of interest like “pretrain” / “train” / “finetune” / etc 
- Returns
- Ordered dictionary with callbacks for the current stage.
- Return type
- OrderedDict[str, Callback] 
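- Example (a minimal sketch of a get_callbacks override; the callback set below is illustrative and mirrors the CustomRunner example above):

    def get_callbacks(self, stage: str):
        # callbacks are returned as a dictionary so they can be addressed by name
        return {
            "criterion": dl.CriterionCallback(
                metric_key="loss", input_key="logits", target_key="targets"
            ),
            "optimizer": dl.OptimizerCallback(metric_key="loss"),
        }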
 
 - 
get_criterion(stage: str) → Optional[torch.nn.modules.module.Module][source]¶
- Returns the criterion for a given stage and epoch.
- Example:

    # for a typical classification task
    >>> runner.get_criterion(stage="train")
    nn.CrossEntropyLoss()

- Parameters
- stage – stage name of interest like “pretrain” / “train” / “finetune” / etc 
- Returns
- Criterion: criterion for a given stage. 
 
 - 
get_datasets(stage: str) → OrderedDict[str, Dataset][source]¶
- Returns the datasets for a given stage and epoch.
- Note
- For deep learning cases you have the same dataset during the whole stage.
- For reinforcement learning it’s common to change the dataset (experiment) every training epoch.
- Parameters
- stage – stage name of interest, like “pretrain” / “train” / “finetune” / etc 
- Returns
- OrderedDict[str, Dataset]: Ordered dictionary
- with datasets for current stage and epoch. 
 
- Note
- We need an ordered dictionary to guarantee the correct dataflow and order of our training datasets, for example, to run the train loader before the validation one.
- Example:

    >>> runner.get_datasets(stage="training")
    OrderedDict({
        "train": CsvDataset(in_csv=in_csv_train, ...),
        "valid": CsvDataset(in_csv=in_csv_valid, ...),
    })
 - 
abstract get_engine() → catalyst.core.engine.IEngine[source]¶
- Returns the engine for the run. 
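- Example (a minimal sketch, mirroring the CustomRunner example above, where the device string is stored on the runner):

    def get_engine(self):
        # single-device engine; concrete runners may return a distributed engine instead
        return dl.DeviceEngine(self._device)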
 - 
abstract get_loaders(stage: str) → OrderedDict[str, DataLoader][source]¶
- Returns the loaders for a given stage.
- Note
- Wrapper for catalyst.core.experiment.IExperiment.get_datasets. For most of your experiments you only need to override the get_datasets method.
- Parameters
- stage – stage name of interest, like “pretrain” / “train” / “finetune” / etc 
- Returns
- OrderedDict[str, DataLoader]: Ordered dictionary
- with loaders for current stage and epoch. 
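- Example (a sketch of a typical override that wraps get_datasets with DataLoaders, assuming the imports from the example above; the batch size and shuffle policy are illustrative):

    def get_loaders(self, stage: str):
        datasets = self.get_datasets(stage=stage)
        # dict insertion order is preserved, so "train" runs before "valid"
        return {
            name: DataLoader(dataset, batch_size=32, shuffle=(name == "train"))
            for name, dataset in datasets.items()
        }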
 
 
 - 
get_loggers() → Dict[str, catalyst.core.logger.ILogger][source]¶
- Returns the loggers for the run. 
 - 
abstract get_model(stage: str) → torch.nn.modules.module.Module[source]¶
- Returns the model for a given stage and epoch.
- Example:

    # suppose we have a typical MNIST model, like
    # nn.Sequential(nn.Linear(28*28, 128), nn.Linear(128, 10))
    >>> runner.get_model(stage="train")
    Sequential(
      (0): Linear(in_features=784, out_features=128, bias=True)
      (1): Linear(in_features=128, out_features=10, bias=True)
    )

- Parameters
- stage – stage name of interest like “pretrain” / “train” / “finetune” / etc 
- Returns
- Model: model for a given stage. 
 
 - 
get_optimizer(stage: str, model: torch.nn.modules.module.Module) → Optional[torch.optim.optimizer.Optimizer][source]¶
- Returns the optimizer for a given stage and model.
- Example:

    >>> runner.get_optimizer(model=model, stage="train")
    torch.optim.Adam(model.parameters())

- Parameters
- stage – stage name of interest like “pretrain” / “train” / “finetune” / etc 
- model – model to optimize with stage optimizer 
 
- Returns
- Optimizer: optimizer for a given stage and model. 
 
 - 
get_scheduler(stage: str, optimizer: torch.optim.optimizer.Optimizer) → Optional[torch.optim.lr_scheduler._LRScheduler][source]¶
- Returns the scheduler for a given stage and optimizer.
- Example:

    >>> runner.get_scheduler(stage="training", optimizer=optimizer)
    torch.optim.lr_scheduler.StepLR(optimizer)
 - Parameters
- stage – stage name of interest like “pretrain” / “train” / “finetune” / etc 
- optimizer – optimizer to schedule with stage scheduler 
 
- Returns
- Scheduler: scheduler for a given stage and optimizer. 
 
 - 
get_stage_len(stage: str) → int[source]¶
- Returns number of epochs for the selected stage. - Parameters
- stage – current stage 
- Returns
- number of epochs in stage 
- Example:

    >>> runner.get_stage_len("pretraining")
    3
 - 
get_trial() → Optional[catalyst.core.trial.ITrial][source]¶
- Returns the trial for the run. 
 - 
abstract handle_batch(batch: Mapping[str, Any]) → None[source]¶
- Inner method to handle the specified data batch. Used to implement the train/valid/infer logic during the experiment run.
- Parameters
- batch (Mapping[str, Any]) – dictionary with data batches from DataLoader. 
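- Example (a minimal sketch for a supervised classification step, matching the CustomRunner example above):

    def handle_batch(self, batch):
        x, y = batch
        logits = self.model(x)
        # everything stored in self.batch becomes available to metric and criterion callbacks
        self.batch = {"features": x, "targets": y, "logits": logits}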
 
 - 
property hparams¶
- Returns the hyperparameters for the current run.
- Example:

    >>> runner.hparams
    OrderedDict([('optimizer', 'Adam'),
                 ('lr', 0.02),
                 ('betas', (0.9, 0.999)),
                 ('eps', 1e-08),
                 ('weight_decay', 0),
                 ('amsgrad', False),
                 ('train_batch_size', 32)])
 - Returns
- dictionary with hyperparameters 
 
 - 
log_metrics(*args, **kwargs) → None[source]¶
- Logs batch, loader and epoch metrics to available loggers. 
 - 
run() → catalyst.core.runner.IRunner[source]¶
- Runs the experiment. - Returns
- self, IRunner instance after the experiment 
 
 - 
property seed¶
- Experiment’s seed for reproducibility. 
 - 
abstract property stages¶
- Run’s stage names.
- Example:

    >>> runner.stages
    ["pretraining", "finetuning"]
 
Engine¶
- 
class catalyst.core.engine.IEngine[source]¶
- Bases: abc.ABC
- An abstraction that syncs the experiment run with different hardware-specific configurations:
- cpu
- single-gpu 
- multi-gpu 
- amp (nvidia, torch) 
- ddp (torch, etc) 
- Abstraction, please check out the implementations for more details.
autocast(*args, **kwargs)[source]¶
- AMP scaling context. Default autocast context does not scale anything. - Parameters
- *args – some args 
- **kwargs – some kwargs 
 
- Returns
- context 
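- Example (a sketch of how the context is typically used inside a runner’s handle_batch; whether anything is actually scaled depends on the concrete engine):

    def handle_batch(self, batch):
        x, y = batch
        with self.engine.autocast():
            # forward pass runs under the engine-specific autocast context
            logits = self.model(x)
        self.batch = {"features": x, "targets": y, "logits": logits}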
 
 - 
abstract backward_loss(loss, model, optimizer) → None[source]¶
- Abstraction over the loss.backward() step. Should be overridden when loss scaling is required, for example with APEX or AMP.
- Parameters
- loss – tensor with loss value. 
- model – model module. 
- optimizer – model optimizer. 
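- Example (a sketch of an AMP-style override; it assumes the engine keeps a torch.cuda.amp.GradScaler in self.scaler and illustrates the idea rather than Catalyst’s actual AMP engine code):

    def backward_loss(self, loss, model, optimizer):
        # scale the loss before backward to avoid fp16 gradient underflow
        self.scaler.scale(loss).backward()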
 
 
 - 
abstract deinit_components()[source]¶
- Deinitializes the run’s components. In distributed mode it should destroy the process group.
 - 
abstract init_components(model_fn=None, criterion_fn=None, optimizer_fn=None, scheduler_fn=None)[source]¶
- Initializes the run’s components.
 - 
property is_ddp¶
- Boolean flag for distributed run. 
 - 
property is_master_process¶
- Checks if the current process is the master process. Should be implemented only for distributed training (ddp); for non-distributed training it should always return True.
- Returns
- True if the current process is the master process, False otherwise.
 
 - 
property is_worker_process¶
- Checks if the current process is a worker process. Should be implemented only for distributed training (ddp); for non-distributed training it should always return False.
- Returns
- True if the current process is a worker process, False otherwise.
 
 - 
abstract load_checkpoint(path: str) → Dict[source]¶
- Load checkpoint from path. - Parameters
- path – checkpoint file to load 
 
 - 
abstract optimizer_step(loss, model, optimizer) → None[source]¶
- Abstraction over the optimizer.step() step. Should be overridden when gradient scaling is required, for example with AMP.
- Parameters
- loss – tensor with loss value. 
- model – model module. 
- optimizer – model optimizer. 
 
 
 - 
abstract pack_checkpoint(model: Optional[torch.nn.modules.module.Module] = None, criterion: Optional[torch.nn.modules.module.Module] = None, optimizer: Optional[torch.optim.optimizer.Optimizer] = None, scheduler: Optional[torch.optim.lr_scheduler._LRScheduler] = None, **kwargs) → Dict[source]¶
- Packs model, criterion, optimizer, scheduler and some extra info **kwargs into a torch-based checkpoint.
- Parameters
- model – torch model 
- criterion – torch criterion 
- optimizer – torch optimizer 
- scheduler – torch scheduler 
- **kwargs – some extra info to pack 
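- Example (a sketch of what a typical implementation packs; the exact dictionary keys are illustrative):

    def pack_checkpoint(self, model=None, criterion=None, optimizer=None, scheduler=None, **kwargs):
        checkpoint = dict(**kwargs)
        for key, value in (
            ("model", model),
            ("criterion", criterion),
            ("optimizer", optimizer),
            ("scheduler", scheduler),
        ):
            if value is not None:
                # store plain state dicts so the checkpoint stays torch-loadable
                checkpoint[f"{key}_state_dict"] = value.state_dict()
        return checkpoint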
 
 
 - 
abstract property rank¶
- Process rank for distributed training. 
 - 
abstract save_checkpoint(checkpoint: Dict, path: str) → None[source]¶
- Saves checkpoint to a file. - Parameters
- checkpoint – data to save. 
- path – filepath where checkpoint should be stored. 
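- Example (a sketch of a straightforward implementation; a distributed engine would typically save only from the master process):

    import torch

    def save_checkpoint(self, checkpoint, path):
        # plain single-process save with torch
        torch.save(checkpoint, path)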
 
 
 - 
abstract sync_device(tensor_or_module: Any) → Any[source]¶
- Moves tensor_or_module to the Engine’s device.
- Parameters
- tensor_or_module – tensor or module to move
 
 - 
abstract sync_tensor(tensor: Any, mode: str) → Any[source]¶
- Syncs tensor over world_size in distributed mode.
 - 
abstract unpack_checkpoint(checkpoint: Dict, model: Optional[torch.nn.modules.module.Module] = None, criterion: Optional[torch.nn.modules.module.Module] = None, optimizer: Optional[torch.optim.optimizer.Optimizer] = None, scheduler: Optional[torch.optim.lr_scheduler._LRScheduler] = None, **kwargs) → None[source]¶
- Loads a checkpoint from file and unpacks its content into the model (if not None), criterion (if not None), optimizer (if not None) and scheduler (if not None).
- Parameters
- checkpoint – checkpoint to load 
- model – model whose state should be updated
- criterion – criterion whose state should be updated
- optimizer – optimizer whose state should be updated
- scheduler – scheduler whose state should be updated
- kwargs – extra arguments 
 
 
 - 
abstract property world_size¶
- Process world size for distributed training. 
 - 
abstract zero_grad(loss, model, optimizer) → None[source]¶
- Abstraction over the model.zero_grad() step. Should be overridden when you need to pass arguments to model.zero_grad(), like set_to_none=True, or when you need a custom scheme that replaces or improves the .zero_grad() method.
- Parameters
- loss – tensor with loss value. 
- model – model module. 
- optimizer – model optimizer. 
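- Example (a sketch of an override that passes set_to_none=True; calling zero_grad on the optimizer instead of the model is an equally common variant):

    def zero_grad(self, loss, model, optimizer):
        # set_to_none=True frees gradient tensors instead of filling them with zeros
        model.zero_grad(set_to_none=True)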
 
 
 
Callback¶
ICallback¶
- 
class catalyst.core.callback.ICallback[source]¶
- Bases: object
- A callable abstraction for deep learning runs.
on_batch_end(runner: IRunner) → None[source]¶
- Event handler for batch end. - Parameters
- runner – IRunner instance. 
 
 - 
on_batch_start(runner: IRunner) → None[source]¶
- Event handler for batch start. - Parameters
- runner – IRunner instance. 
 
 - 
on_epoch_end(runner: IRunner) → None[source]¶
- Event handler for epoch end. - Parameters
- runner – IRunner instance. 
 
 - 
on_epoch_start(runner: IRunner) → None[source]¶
- Event handler for epoch start. - Parameters
- runner – IRunner instance. 
 
 - 
on_exception(runner: IRunner) → None[source]¶
- Event handler for exception case. - Parameters
- runner – IRunner instance. 
 
 - 
on_experiment_end(runner: IRunner) → None[source]¶
- Event handler for experiment end. - Parameters
- runner – IRunner instance. 
- Note
- This event works only on IRunner.
 - 
on_experiment_start(runner: IRunner) → None[source]¶
- Event handler for experiment start. - Parameters
- runner – IRunner instance. 
- Note
- This event works only on IRunner.
 - 
on_loader_end(runner: IRunner) → None[source]¶
- Event handler for loader end. - Parameters
- runner – IRunner instance. 
 
 - 
on_loader_start(runner: IRunner) → None[source]¶
- Event handler for loader start. - Parameters
- runner – IRunner instance. 
 
 
- 
CallbackNode¶
- 
class catalyst.core.callback.CallbackNode(value)[source]¶
- Bases: enum.IntFlag
- Callback node usage flag during distributed training.
- All (0) - use on all nodes, both master and worker.
- Master (1) - use only on master node. 
- Worker (2) - use only on worker nodes.
 - 
All= 0¶
 - 
Master= 1¶
 - 
Worker= 2¶
 - 
all= 0¶
 - 
master= 1¶
 - 
worker= 2¶
 
CallbackOrder¶
- 
class catalyst.core.callback.CallbackOrder(value)[source]¶
- Bases: enum.IntFlag
- Callback usage order during training.
- Catalyst executes Callbacks with low CallbackOrder before Callbacks with high CallbackOrder.
- Predefined orders:
- Internal (0) - some Catalyst Extras, like PhaseCallbacks (used in GANs).
- Metric (20) - Callbacks with metrics and losses computation. 
- MetricAggregation (40) - metrics aggregation callbacks, like sum different losses into one. 
- Optimizer (60) - optimizer step, requires computed metrics for optimization. 
- Scheduler (80) - scheduler step, in ReduceLROnPlateau case requires computed validation metrics for optimizer schedule. 
- External (100) - additional callbacks with custom logic, like InferenceCallbacks 
- Nevertheless, you can always create a custom Callback with any order, for example:

    >>> class MyCustomCallback(Callback):
    >>>     def __init__(self):
    >>>         super().__init__(order=33)
    >>>     ...
    # MyCustomCallback will be executed after all `Metric`-Callbacks
    # but before all `MetricAggregation`-Callbacks.
External= 100¶
 - 
ExternalExtra= 120¶
 - 
Internal= 0¶
 - 
Metric= 20¶
 - 
MetricAggregation= 40¶
 - 
Optimizer= 60¶
 - 
Scheduler= 80¶
 - 
external= 100¶
 - 
external_extra= 120¶
 - 
internal= 0¶
 - 
metric= 20¶
 - 
metric_aggregation= 40¶
 - 
optimizer= 60¶
 - 
scheduler= 80¶
 
Callback¶
- 
class catalyst.core.callback.Callback(order: int, node: int = <CallbackNode.All: 0>, scope: int = <CallbackScope.Stage: 0>)[source]¶
- Bases: catalyst.core.callback.ICallback
- An abstraction that lets you customize your experiment run logic.
- Parameters
- order – flag from CallbackOrder
- node – flag from CallbackNode
- scope – flag from CallbackScope
 
- To give users maximum flexibility and extensibility, Catalyst supports callback execution anywhere in the training loop:

    -- stage start
    ---- epoch start
    ------ loader start
    -------- batch start
    ---------- batch handler (Runner logic)
    -------- batch end
    ------ loader end
    ---- epoch end
    -- stage end

    exception – if an Exception was raised

- Abstraction, please check out the implementations for more details.
- Note
- To learn more about Catalyst Core concepts, please check out the Core documentation.
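- Example (a minimal sketch of a custom callback; the order value and the printed message are illustrative, and runner.loader_key is assumed to hold the current loader name):

    from catalyst.core.callback import Callback, CallbackOrder

    class BatchCounterCallback(Callback):
        def __init__(self):
            super().__init__(order=CallbackOrder.External)
            self.counter = 0

        def on_loader_start(self, runner):
            self.counter = 0

        def on_batch_end(self, runner):
            self.counter += 1

        def on_loader_end(self, runner):
            print(f"{runner.loader_key}: {self.counter} batches seen")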
CallbackList¶
- 
class catalyst.core.callback.CallbackList(callbacks: List[catalyst.core.callback.Callback], order: int, node: int = <CallbackNode.All: 0>, scope: int = <CallbackScope.Stage: 0>)[source]¶
- Bases: catalyst.core.callback.Callback
- Callback wrapper for a list of Callbacks.
- Parameters
- callbacks – list of callbacks 
- order – flag from CallbackOrder
- node – flag from CallbackNode
- scope – flag from CallbackScope
 
 - 
__init__(callbacks: List[catalyst.core.callback.Callback], order: int, node: int = <CallbackNode.All: 0>, scope: int = <CallbackScope.Stage: 0>)[source]¶
- Init. 
 - 
on_batch_end(runner: IRunner) → None[source]¶
- Event handler for batch end. - Parameters
- runner – IRunner instance. 
 
 - 
on_batch_start(runner: IRunner) → None[source]¶
- Event handler for batch start. - Parameters
- runner – IRunner instance. 
 
 - 
on_epoch_end(runner: IRunner) → None[source]¶
- Event handler for epoch end. - Parameters
- runner – IRunner instance. 
 
 - 
on_epoch_start(runner: IRunner) → None[source]¶
- Event handler for epoch start. - Parameters
- runner – IRunner instance. 
 
 - 
on_exception(runner: IRunner) → None[source]¶
- Event handler for exception case. - Parameters
- runner – IRunner instance. 
 
 - 
on_experiment_end(runner: IRunner) → None[source]¶
- Event handler for experiment end. - Parameters
- runner – IRunner instance. 
- Note
- This event works only on IRunner.
 - 
on_experiment_start(runner: IRunner) → None[source]¶
- Event handler for experiment start. - Parameters
- runner – IRunner instance. 
- Note
- This event works only on IRunner.
 - 
on_loader_end(runner: IRunner) → None[source]¶
- Event handler for loader end. - Parameters
- runner – IRunner instance. 
 
 - 
on_loader_start(runner: IRunner) → None[source]¶
- Event handler for loader start. - Parameters
- runner – IRunner instance. 
 
 
CallbackWrapper¶
- 
class catalyst.core.callback.CallbackWrapper(base_callback: catalyst.core.callback.Callback, enable_callback: bool = True)[source]¶
- Bases: catalyst.core.callback.Callback
- Enable/disable callback execution.
- Parameters
- base_callback – callback to wrap 
- enable_callback – indicator to enable/disable the callback; if True, the callback will be enabled (default: True)
 
 - 
__init__(base_callback: catalyst.core.callback.Callback, enable_callback: bool = True)[source]¶
- Init. 
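- Example (a sketch of disabling a callback without removing it from the callback collection; the wrapped CheckpointCallback arguments mirror the runner example above):

    from catalyst import dl
    from catalyst.core.callback import CallbackWrapper

    checkpoint_callback = CallbackWrapper(
        base_callback=dl.CheckpointCallback(
            "./logs", loader_key="valid", metric_key="loss", minimize=True
        ),
        enable_callback=False,  # keep the callback registered but skip its execution
    )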
 
Logger¶
- 
class catalyst.core.logger.ILogger[source]¶
- Bases: object
- An abstraction that syncs the experiment run with monitoring tools.
- Abstraction, please check out the implementations for more details.
log_artifact(tag: str, artifact: Optional[object] = None, path_to_artifact: Optional[str] = None, scope: Optional[str] = None, run_key: Optional[str] = None, global_epoch_step: int = 0, global_batch_step: int = 0, global_sample_step: int = 0, stage_key: Optional[str] = None, stage_epoch_len: int = 0, stage_epoch_step: int = 0, stage_batch_step: int = 0, stage_sample_step: int = 0, loader_key: Optional[str] = None, loader_batch_len: int = 0, loader_sample_len: int = 0, loader_batch_step: int = 0, loader_sample_step: int = 0) → None[source]¶
- Logs artifact (arbitrary file like audio, video, model weights) to the logger. 
 - 
log_hparams(hparams: Dict, scope: Optional[str] = None, run_key: Optional[str] = None, stage_key: Optional[str] = None) → None[source]¶
- Logs hyperparameters to the logger. 
 - 
log_image(tag: str, image: numpy.ndarray, scope: Optional[str] = None, run_key: Optional[str] = None, global_epoch_step: int = 0, global_batch_step: int = 0, global_sample_step: int = 0, stage_key: Optional[str] = None, stage_epoch_len: int = 0, stage_epoch_step: int = 0, stage_batch_step: int = 0, stage_sample_step: int = 0, loader_key: Optional[str] = None, loader_batch_len: int = 0, loader_sample_len: int = 0, loader_batch_step: int = 0, loader_sample_step: int = 0) → None[source]¶
- Logs image to the logger. 
 - 
log_metrics(metrics: Dict[str, float], scope: Optional[str] = None, run_key: Optional[str] = None, global_epoch_step: int = 0, global_batch_step: int = 0, global_sample_step: int = 0, stage_key: Optional[str] = None, stage_epoch_len: int = 0, stage_epoch_step: int = 0, stage_batch_step: int = 0, stage_sample_step: int = 0, loader_key: Optional[str] = None, loader_batch_len: int = 0, loader_sample_len: int = 0, loader_batch_step: int = 0, loader_sample_step: int = 0) → None[source]¶
- Logs metrics to the logger. 
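- Example (a minimal sketch of a custom logger that prints loader-level metrics; it assumes the base ILogger methods are no-ops and that Catalyst passes the step arguments as keywords):

    from catalyst.core.logger import ILogger

    class PrintLogger(ILogger):
        def log_metrics(self, metrics, scope=None, run_key=None, **kwargs):
            # only react to aggregated loader metrics, ignore per-batch calls
            if scope == "loader":
                print(kwargs.get("loader_key"), metrics)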
 