Shortcuts

Source code for catalyst.runners.runner

from typing import Any, Dict, Generator, Iterable, List, Mapping, Optional, Union
from collections import OrderedDict
import os

import torch
from torch import nn, optim
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch.utils.data import DataLoader

from catalyst.callbacks.batch_overfit import BatchOverfitCallback
from catalyst.callbacks.checkpoint import CheckpointCallback, ICheckpointCallback
from catalyst.callbacks.criterion import CriterionCallback, ICriterionCallback
from catalyst.callbacks.misc import CheckRunCallback, TimerCallback, TqdmCallback
from catalyst.callbacks.optimizer import IOptimizerCallback, OptimizerCallback
from catalyst.callbacks.scheduler import ISchedulerCallback, SchedulerCallback
from catalyst.core.callback import Callback
from catalyst.core.logger import ILogger
from catalyst.core.misc import callback_isinstance, sort_callbacks_by_order
from catalyst.core.runner import IRunner, RunnerException
from catalyst.core.trial import ITrial
from catalyst.data.loader import ILoaderWrapper
from catalyst.engines import IEngine
from catalyst.loggers.console import ConsoleLogger
from catalyst.loggers.csv import CSVLogger
from catalyst.loggers.tensorboard import TensorboardLogger
from catalyst.runners.supervised import ISupervisedRunner
from catalyst.typing import (
    Criterion,
    Model,
    Optimizer,
    RunnerCriterion,
    RunnerModel,
    RunnerOptimizer,
    RunnerScheduler,
    Scheduler,
)
from catalyst.utils.data import get_loaders_from_params
from catalyst.utils.misc import maybe_recursive_call, set_global_seed
from catalyst.utils.torch import get_available_engine


def _process_loaders(
    loaders: "OrderedDict[str, DataLoader]", initial_seed: int
) -> "OrderedDict[str, DataLoader]":
    if not isinstance(loaders[list(loaders.keys())[0]], (DataLoader, ILoaderWrapper)):
        loaders = get_loaders_from_params(initial_seed=initial_seed, **loaders)
    return loaders


[docs]class Runner(IRunner): """Single-stage deep learning Runner with user-friendly API. Runner supports the logic for deep learning pipeline configuration with pure python code. Please check the examples for intuition. Args: *args: `IRunner` args (model, engine) **kwargs: `IRunner` kwargs (model, engine) .. note:: IRunner supports only base user-friendly callbacks, like TqdmCallback, TimerCallback, CheckRunCallback, BatchOverfitCallback, and CheckpointCallback. It does not automatically add Criterion, Optimizer or Scheduler callbacks. That means, that you have do optimization step by yourself during ``handle_batch`` method or specify the required callbacks in ``.train`` or ``get_callbacks`` methods. For more easy-to-go supervised use case please follow :py:mod:`catalyst.runners.runner.SupervisedRunner`. .. note:: Please follow the `minimal examples`_ sections for use cases. .. _`minimal examples`: https://github.com/catalyst-team/catalyst#minimal-examples Examples: .. code-block:: python import os from torch import nn, optim from torch.nn import functional as F from torch.utils.data import DataLoader from catalyst import dl, metrics from catalyst.data import ToTensor from catalyst.contrib.datasets import MNIST model = nn.Sequential(nn.Flatten(), nn.Linear(28 * 28, 10)) optimizer = optim.Adam(model.parameters(), lr=0.02) loaders = { "train": DataLoader( MNIST(os.getcwd(), train=True, download=True, transform=ToTensor()), batch_size=32 ), "valid": DataLoader( MNIST(os.getcwd(), train=False, download=True, transform=ToTensor()), batch_size=32 ), } class CustomRunner(dl.Runner): def predict_batch(self, batch): # model inference step return self.model(batch[0].to(self.device)) def on_loader_start(self, runner): super().on_loader_start(runner) self.meters = { key: metrics.AdditiveMetric(compute_on_call=False) for key in ["loss", "accuracy01", "accuracy03"] } def handle_batch(self, batch): # model train/valid step # unpack the batch x, y = batch # run model forward pass logits = self.model(x) # compute the loss loss = F.cross_entropy(logits, y) # compute other metrics of interest accuracy01, accuracy03 = metrics.accuracy(logits, y, topk=(1, 3)) # log metrics self.batch_metrics.update( {"loss": loss, "accuracy01": accuracy01, "accuracy03": accuracy03} ) for key in ["loss", "accuracy01", "accuracy03"]: self.meters[key].update(self.batch_metrics[key].item(), self.batch_size) # run model backward pass if self.is_train_loader: loss.backward() self.optimizer.step() self.optimizer.zero_grad() def on_loader_end(self, runner): for key in ["loss", "accuracy01", "accuracy03"]: self.loader_metrics[key] = self.meters[key].compute()[0] super().on_loader_end(runner) runner = CustomRunner() # model training runner.train( model=model, optimizer=optimizer, loaders=loaders, logdir="./logs", num_epochs=5, verbose=True, valid_loader="valid", valid_metric="loss", minimize_valid_metric=True, ) # model inference for logits in runner.predict_loader(loader=loaders["valid"]): assert logits.detach().cpu().numpy().shape[-1] == 10 """ def __init__(self, *args, **kwargs): """Init.""" super().__init__(*args, **kwargs) # the core self._trial: ITrial = None self._engine: IEngine = self.engine self._model: RunnerModel = self.model # the data self._loaders: Dict[str, DataLoader] = None # the components self._criterion: RunnerCriterion = None self._optimizer: RunnerOptimizer = None self._scheduler: RunnerScheduler = None # the callbacks self._callbacks: Dict[str, Callback] = {} # the loggers self._loggers: Dict[str, ILogger] = {} # extra self._seed = 42 self._hparams: Dict = None self._stage: str = "stage" self._num_epochs: int = 1 # model selection self._logdir = None self._valid_loader = None self._valid_metric = None self._minimize_valid_metric = None # extras self._verbose = False self._timeit = False self._check = False self._overfit = False self._load_best_on_end = False @property def seed(self) -> int: """Experiment's initial seed value.""" return self._seed @property def name(self) -> str: """Returns run name.""" return "experiment" if self._trial is None else f"experiment_{self._trial.number}" @property def hparams(self) -> Dict: """Returns hyperparameters.""" if self._hparams is not None: return self._hparams elif self._trial is not None: return self._trial.params else: return {} @property def stages(self) -> Iterable[str]: """Experiment's stage names (array with one value).""" return [self._stage]
[docs] def get_stage_len(self, stage: str) -> int: """Returns the stage length in epochs for a given stage.""" return self._num_epochs
[docs] def get_trial(self) -> ITrial: """Returns the trial for a run.""" return self._trial
[docs] def get_engine(self) -> IEngine: """Returns the engine for a run.""" return self._engine or get_available_engine()
[docs] def get_loggers(self) -> Dict[str, ILogger]: """Returns the logger for a run.""" loggers = self._loggers or {} is_logger_exists = lambda logger_fn: any( isinstance(x, logger_fn) for x in loggers.values() ) if not is_logger_exists(ConsoleLogger): loggers["_console"] = ConsoleLogger() if self._logdir is not None and not is_logger_exists(CSVLogger): loggers["_csv"] = CSVLogger(logdir=self._logdir, use_logdir_postfix=True) if self._logdir is not None and not is_logger_exists(TensorboardLogger): loggers["_tensorboard"] = TensorboardLogger( logdir=self._logdir, use_logdir_postfix=True ) return loggers
[docs] def get_loaders(self, stage: str) -> "OrderedDict[str, DataLoader]": """Returns the loaders for a given stage.""" self._loaders = _process_loaders(loaders=self._loaders, initial_seed=self.seed) return self._loaders
[docs] def get_model(self, stage: str) -> Model: """Returns the model for a given stage.""" model = ( self._model() if callable(self._model) and not isinstance(self._model, nn.Module) else self._model ) return model
[docs] def get_criterion(self, stage: str) -> Criterion: """Returns the criterion for a given stage.""" return ( self._criterion() if callable(self._criterion) and not isinstance(self._criterion, nn.Module) else self._criterion )
[docs] def get_optimizer(self, stage: str, model: Model) -> Optimizer: """Returns the optimizer for a given stage.""" return ( self._optimizer(model) if callable(self._optimizer) and not isinstance(self._optimizer, optim.Optimizer) else self._optimizer )
[docs] def get_scheduler(self, stage: str, optimizer: Optimizer) -> Scheduler: """Returns the scheduler for a given stage.""" return ( self._scheduler(optimizer) if callable(self._scheduler) and not isinstance( self._scheduler, (optim.lr_scheduler.ReduceLROnPlateau, optim.lr_scheduler._LRScheduler), ) else self._scheduler )
[docs] def get_callbacks(self, stage: str) -> "OrderedDict[str, Callback]": """Returns the callbacks for a given stage.""" callbacks = sort_callbacks_by_order(self._callbacks) is_callback_exists = lambda callback_fn: any( callback_isinstance(x, callback_fn) for x in callbacks.values() ) if self._verbose and not is_callback_exists(TqdmCallback): callbacks["_verbose"] = TqdmCallback() if self._timeit and not is_callback_exists(TimerCallback): callbacks["_timer"] = TimerCallback() if self._check and not is_callback_exists(CheckRunCallback): callbacks["_check"] = CheckRunCallback() if self._overfit and not is_callback_exists(BatchOverfitCallback): callbacks["_overfit"] = BatchOverfitCallback() if self._logdir is not None and not is_callback_exists(ICheckpointCallback): callbacks["_checkpoint"] = CheckpointCallback( logdir=os.path.join(self._logdir, "checkpoints"), loader_key=self._valid_loader, metric_key=self._valid_metric, minimize=self._minimize_valid_metric, ) return callbacks
[docs] def train( self, *, # the data loaders: "OrderedDict[str, DataLoader]", # the core model: Model, engine: Union["IEngine", str] = None, trial: ITrial = None, # the components criterion: Criterion = None, optimizer: Optimizer = None, scheduler: Scheduler = None, # the callbacks callbacks: "Union[List[Callback], OrderedDict[str, Callback]]" = None, # the loggers loggers: "Dict[str, ILogger]" = None, # experiment info seed: int = 42, hparams: Dict[str, Any] = None, # stage info num_epochs: int = 1, # extra info (callbacks info) logdir: str = None, valid_loader: str = None, valid_metric: str = None, minimize_valid_metric: bool = True, verbose: bool = False, timeit: bool = False, check: bool = False, overfit: bool = False, load_best_on_end: bool = False, # engine extra params, fp16: bool = False, amp: bool = False, apex: bool = False, ddp: bool = False, ) -> None: """ Starts the train stage of the model. Args: loaders: dictionary with one or several ``torch.utils.data.DataLoader`` for training, validation or inference model: model to train engine: engine to use for model training trial: trial to use during model training criterion: criterion function for training optimizer: optimizer for training scheduler: scheduler for training callbacks: list or dictionary with Catalyst callbacks loggers: dictionary with Catalyst loggers seed: experiment's initial seed value hparams: hyperparameters for the run num_epochs: number of training epochs logdir: path to output directory valid_loader: loader name used to calculate the metrics and save the checkpoints. For example, you can pass `train` and then the metrics will be taken from `train` loader. valid_metric: the key to the name of the metric by which the checkpoints will be selected. minimize_valid_metric: flag to indicate whether the ``valid_metric`` should be minimized or not (default: True). verbose: if `True`, it displays the status of the training to the console. timeit: if True, computes the execution time of training process and displays it to the console. check: if True, then only checks that pipeline is working (3 epochs only with 3 batches per loader) overfit: if True, then takes only one batch per loader for model overfitting, for advance usage please check ``BatchOverfitCallback`` load_best_on_end: if True, Runner will load best checkpoint state (model, optimizer, etc) according to validation metrics. Requires specified ``logdir``. fp16: boolean flag to use half-precision training (AMP > APEX) amp: boolean flag to use amp half-precision apex: boolean flag to use apex half-precision ddp: if `True` will start training in distributed mode. Note: Works only with python scripts. No jupyter support. .. note:: Please follow the `minimal examples`_ sections for use cases. .. _`minimal examples`: https://github.com/catalyst-team/catalyst#minimal-examples Examples: .. code-block:: python import os from torch import nn, optim from torch.nn import functional as F from torch.utils.data import DataLoader from catalyst import dl, metrics from catalyst.data import ToTensor from catalyst.contrib.datasets import MNIST model = nn.Sequential(nn.Flatten(), nn.Linear(28 * 28, 10)) optimizer = optim.Adam(model.parameters(), lr=0.02) loaders = { "train": DataLoader( MNIST(os.getcwd(), train=True, download=True, transform=ToTensor()), batch_size=32 ), "valid": DataLoader( MNIST(os.getcwd(), train=False, download=True, transform=ToTensor()), batch_size=32 ), } class CustomRunner(dl.Runner): def predict_batch(self, batch): # model inference step return self.model(batch[0].to(self.device)) def on_loader_start(self, runner): super().on_loader_start(runner) self.meters = { key: metrics.AdditiveMetric(compute_on_call=False) for key in ["loss", "accuracy01", "accuracy03"] } def handle_batch(self, batch): # model train/valid step # unpack the batch x, y = batch # run model forward pass logits = self.model(x) # compute the loss loss = F.cross_entropy(logits, y) # compute other metrics of interest accuracy01, accuracy03 = metrics.accuracy(logits, y, topk=(1, 3)) # log metrics self.batch_metrics.update( {"loss": loss, "accuracy01": accuracy01, "accuracy03": accuracy03} ) for key in ["loss", "accuracy01", "accuracy03"]: self.meters[key].update( self.batch_metrics[key].item(), self.batch_size ) # run model backward pass if self.is_train_loader: loss.backward() self.optimizer.step() self.optimizer.zero_grad() def on_loader_end(self, runner): for key in ["loss", "accuracy01", "accuracy03"]: self.loader_metrics[key] = self.meters[key].compute()[0] super().on_loader_end(runner) runner = CustomRunner() # model training runner.train( model=model, optimizer=optimizer, loaders=loaders, logdir="./logs", num_epochs=5, verbose=True, valid_loader="valid", valid_metric="loss", minimize_valid_metric=True, ) # model inference for logits in runner.predict_loader(loader=loaders["valid"]): assert logits.detach().cpu().numpy().shape[-1] == 10 """ # experiment setup self._engine = engine or get_available_engine(fp16=fp16, ddp=ddp, amp=amp, apex=apex) self._trial = trial self._loggers = loggers # the data self._loaders = loaders # the components self._model = model self._criterion = criterion self._optimizer = optimizer self._scheduler = scheduler # the callbacks self._callbacks = callbacks # extra self._stage = "train" self._seed = seed self._hparams = hparams self._num_epochs = num_epochs self._logdir = logdir self._valid_loader = valid_loader self._valid_metric = valid_metric self._minimize_valid_metric = minimize_valid_metric self._verbose = verbose self._timeit = timeit self._check = check self._overfit = overfit self._load_best_on_end = load_best_on_end # run self.run()
[docs] @torch.no_grad() def predict_batch(self, batch: Mapping[str, Any], **kwargs) -> Mapping[str, Any]: """ Run model inference on specified data batch. Args: batch: dictionary with data batches from DataLoader. **kwargs: additional kwargs to pass to the model Returns: Mapping: model output dictionary Raises: NotImplementedError: if not implemented yet """ raise NotImplementedError("Please implement `runner.predict_batch` method") return None # noqa: WPS427
[docs] @torch.no_grad() def predict_loader( self, *, loader: DataLoader, model: Model = None, engine: Union["IEngine", str] = None, seed: int = 42, # engine extra params, fp16: bool = False, amp: bool = False, apex: bool = False, ddp: bool = False, ) -> Generator: """ Runs model inference on PyTorch DataLoader and returns python generator with model predictions from `runner.predict_batch`. Args: loader: loader to predict model: model to use for prediction engine: engine to use for prediction seed: random seed to use before prediction fp16: boolean flag to use half-precision training (AMP > APEX) amp: boolean flag to use amp half-precision apex: boolean flag to use apex half-precision ddp: if `True` will start training in distributed mode. Note: Works only with python scripts. No jupyter support. Yields: bathes with model predictions .. note:: Please follow the `minimal examples`_ sections for use cases. .. _`minimal examples`: https://github.com/catalyst-team/catalyst#minimal-examples Examples: .. code-block:: python import os from torch import nn, optim from torch.nn import functional as F from torch.utils.data import DataLoader from catalyst import dl, metrics from catalyst.data import ToTensor from catalyst.contrib.datasets import MNIST model = nn.Sequential(nn.Flatten(), nn.Linear(28 * 28, 10)) optimizer = optim.Adam(model.parameters(), lr=0.02) loaders = { "train": DataLoader( MNIST(os.getcwd(), train=True, download=True, transform=ToTensor()), batch_size=32 ), "valid": DataLoader( MNIST(os.getcwd(), train=False, download=True, transform=ToTensor()), batch_size=32 ), } class CustomRunner(dl.Runner): def predict_batch(self, batch): # model inference step return self.model(batch[0].to(self.device)) def on_loader_start(self, runner): super().on_loader_start(runner) self.meters = { key: metrics.AdditiveMetric(compute_on_call=False) for key in ["loss", "accuracy01", "accuracy03"] } def handle_batch(self, batch): # model train/valid step # unpack the batch x, y = batch # run model forward pass logits = self.model(x) # compute the loss loss = F.cross_entropy(logits, y) # compute other metrics of interest accuracy01, accuracy03 = metrics.accuracy(logits, y, topk=(1, 3)) # log metrics self.batch_metrics.update( {"loss": loss, "accuracy01": accuracy01, "accuracy03": accuracy03} ) for key in ["loss", "accuracy01", "accuracy03"]: self.meters[key].update( self.batch_metrics[key].item(), self.batch_size ) # run model backward pass if self.is_train_loader: loss.backward() self.optimizer.step() self.optimizer.zero_grad() def on_loader_end(self, runner): for key in ["loss", "accuracy01", "accuracy03"]: self.loader_metrics[key] = self.meters[key].compute()[0] super().on_loader_end(runner) runner = CustomRunner() # model training runner.train( model=model, optimizer=optimizer, loaders=loaders, logdir="./logs", num_epochs=5, verbose=True, valid_loader="valid", valid_metric="loss", minimize_valid_metric=True, ) # model inference for logits in runner.predict_loader(loader=loaders["valid"]): assert logits.detach().cpu().numpy().shape[-1] == 10 """ self.engine = engine or get_available_engine(fp16=fp16, ddp=ddp, amp=amp, apex=apex) if model is not None: self.model = model assert self.model is not None # if resume is not None: # checkpoint = load_checkpoint(resume) # unpack_checkpoint(checkpoint, model=self.model) self.model = self.engine.sync_device(self.model) maybe_recursive_call(self.model, "train", mode=False) set_global_seed(seed) for batch in loader: yield self.predict_batch(batch)
[docs] def evaluate_loader( self, loader: DataLoader, callbacks: "Union[List[Callback], OrderedDict[str, Callback]]" = None, model: Optional[Model] = None, seed: int = 42, verbose: bool = False, ) -> Dict[str, Any]: """ Evaluates data from loader with given model and returns obtained metrics. # noqa: DAR401 Args: loader: loader to predict callbacks: list or dictionary with catalyst callbacks model: model, compatible with current runner. If `None` simply takes current model from runner. seed: random seed to use before prediction verbose: if `True`, it displays the status of the evaluation to the console. Returns: Dict with metrics counted on the loader. """ if isinstance(callbacks, List): for callback in callbacks: if isinstance(callback, CheckpointCallback): raise RunnerException( "CheckpointCallback isn`t allowed for evaluation loader method" ) else: for callback in callbacks.values(): if isinstance(callback, CheckpointCallback): raise RunnerException( "CheckpointCallback isn`t allowed for evaluation loader method" ) if model is None: model = self.model assert model is not None self.train( model=model, loaders=OrderedDict([("valid", loader)]), num_epochs=1, verbose=verbose, callbacks=callbacks, valid_loader="valid", seed=seed, ) return self.loader_metrics
[docs]class SupervisedRunner(ISupervisedRunner, Runner): """Runner for experiments with supervised model. Args: model: Torch model instance engine: IEngine instance input_key: key in ``runner.batch`` dict mapping for model input output_key: key for ``runner.batch`` to store model output target_key: key in ``runner.batch`` dict mapping for target loss_key: key for ``runner.batch_metrics`` to store criterion loss output .. note:: Please follow the `minimal examples`_ sections for use cases. .. _`minimal examples`: https://github.com/catalyst-team/catalyst#minimal-examples Examples: .. code-block:: python import os from torch import nn, optim from torch.utils.data import DataLoader from catalyst import dl, utils from catalyst.data import ToTensor from catalyst.contrib.datasets import MNIST model = nn.Sequential(nn.Flatten(), nn.Linear(28 * 28, 10)) criterion = nn.CrossEntropyLoss() optimizer = optim.Adam(model.parameters(), lr=0.02) loaders = { "train": DataLoader( MNIST(os.getcwd(), train=True, download=True, transform=ToTensor()), batch_size=32 ), "valid": DataLoader( MNIST(os.getcwd(), train=False, download=True, transform=ToTensor()), batch_size=32 ), } runner = dl.SupervisedRunner( input_key="features", output_key="logits", target_key="targets", loss_key="loss" ) # model training runner.train( model=model, criterion=criterion, optimizer=optimizer, loaders=loaders, num_epochs=1, callbacks=[ dl.AccuracyCallback(input_key="logits", target_key="targets", topk_args=(1, 3)), dl.PrecisionRecallF1SupportCallback( input_key="logits", target_key="targets", num_classes=10 ), dl.AUCCallback(input_key="logits", target_key="targets"), ], logdir="./logs", valid_loader="valid", valid_metric="loss", minimize_valid_metric=True, verbose=True, load_best_on_end=True, ) # model inference for prediction in runner.predict_loader(loader=loaders["valid"]): assert prediction["logits"].detach().cpu().numpy().shape[-1] == 10 """ def __init__( self, model: RunnerModel = None, engine: IEngine = None, input_key: Any = "features", output_key: Any = "logits", target_key: str = "targets", loss_key: str = "loss", ): """Init.""" ISupervisedRunner.__init__( self, input_key=input_key, output_key=output_key, target_key=target_key, loss_key=loss_key, ) Runner.__init__(self, model=model, engine=engine)
[docs] @torch.no_grad() def predict_batch(self, batch: Mapping[str, Any], **kwargs) -> Mapping[str, Any]: """ Run model inference on specified data batch. .. warning:: You should not override this method. If you need specific model call, override forward() method Args: batch: dictionary with data batch from DataLoader. **kwargs: additional kwargs to pass to the model Returns: Mapping[str, Any]: model output dictionary """ batch = self._process_batch(batch) batch = self.engine.sync_device(tensor_or_module=batch) output = self.forward(batch, **kwargs) return output
[docs] def get_callbacks(self, stage: str) -> "OrderedDict[str, Callback]": """Prepares the callbacks for selected stage. Args: stage: stage name Returns: dictionary with stage callbacks """ callbacks = super().get_callbacks(stage=stage) is_callback_exists = lambda callback_fn: any( callback_isinstance(x, callback_fn) for x in callbacks.values() ) if isinstance(self._criterion, Criterion) and not is_callback_exists(ICriterionCallback): callbacks["_criterion"] = CriterionCallback( input_key=self._output_key, target_key=self._target_key, metric_key=self._loss_key, ) if isinstance(self._optimizer, Optimizer) and not is_callback_exists(IOptimizerCallback): callbacks["_optimizer"] = OptimizerCallback(metric_key=self._loss_key) if isinstance(self._scheduler, (Scheduler, ReduceLROnPlateau)) and not is_callback_exists( ISchedulerCallback ): callbacks["_scheduler"] = SchedulerCallback( loader_key=self._valid_loader, metric_key=self._valid_metric ) return callbacks
__all__ = ["Runner", "SupervisedRunner"]