Source code for catalyst.contrib.dl.callbacks.periodic_loader_callback

from typing import Mapping
from collections import OrderedDict
import copy

from torch.utils.data import DataLoader

from catalyst.core.callback import Callback, CallbackOrder
from catalyst.core.runner import IRunner

class PeriodicLoaderCallback(Callback):
    """Callback for running loaders with a specified period.

    Every keyword argument maps a loader name to its period (in epochs):
    the loader is used only in epochs whose number is divisible by the
    period. Loaders without an explicit period are used in every epoch.
    To disable a loader use ``0`` as period.

    Example:
        >>> PeriodicLoaderCallback(
        >>>     train_additional=2,
        >>>     valid=3,
        >>>     valid_additional=5
        >>> )
    """

    def __init__(self, **kwargs):
        """
        Args:
            kwargs: loader names and their run periods.

        Raises:
            TypeError: if a period is neither ``int`` nor ``float``.
        """
        super().__init__(order=CallbackOrder.External)

        # name of the stage validation loader, captured in ``on_stage_start``
        self.valid_loader: str = None
        # main metric from the latest epoch where the validation loader ran
        self.valid_metrics: Mapping[str, float] = None
        # every loader seen in ``on_stage_start``, keyed by name
        self.loaders: Mapping[str, DataLoader] = OrderedDict()

        self.loader_periods = {}
        for loader, period in kwargs.items():
            if not isinstance(period, (int, float)):
                raise TypeError(
                    "Expected loader period type is int/float "
                    f"but got {type(period)}"
                )
            # float periods are truncated towards zero
            self.loader_periods[loader] = int(period)

    def on_stage_start(self, runner: IRunner) -> None:
        """Collect information about loaders.

        Stores the runner's loaders and validation loader name and, when
        every runner loader has an explicit period, checks up front that
        no epoch of the stage would be left without loaders.

        Arguments:
            runner (IRunner): current runner

        Raises:
            ValueError: if some epoch of the stage would have no loaders.
        """
        # store pointers to data loader objects
        # NOTE(review): entries are only added, never removed, so loaders
        # collected in an earlier stage stay registered - confirm intended.
        for name, loader in runner.loaders.items():
            self.loaders[name] = loader
        # stage validation loader
        self.valid_loader = copy.copy(runner.valid_loader)

        is_loaders_match = all(
            name in runner.loaders for name in self.loader_periods
        )
        is_same_loaders_number = len(self.loader_periods) == len(
            runner.loaders
        )
        # The check is only conclusive when every runner loader has an
        # explicit period: a loader without one runs every epoch.
        if not (is_same_loaders_number and is_loaders_match):
            return

        periods = list(self.loader_periods.values())
        # A loader is skipped in epoch ``n`` when its period is not
        # positive (same rule as ``on_epoch_start``) or does not divide
        # ``n``. Find the first epoch where that holds for all loaders.
        epoch_with_err = next(
            (
                epoch
                for epoch in range(1, runner.num_epochs + 1)
                if all(p <= 0 or epoch % p != 0 for p in periods)
            ),
            None,
        )
        if epoch_with_err is not None:
            raise ValueError(
                f"There will be no loaders in epoch {epoch_with_err}!"
            )

    def on_epoch_start(self, runner: IRunner) -> None:
        """Set loaders for current epoch.

        If validation is not required then the first loader
        from loaders used in current epoch will be used
        as validation loader.
        Metrics from the latest epoch with true
        validation loader will be used
        in the epochs where this loader is missing.

        Arguments:
            runner (IRunner): current runner

        Raises:
            ValueError: if no loader is scheduled for the current epoch.
        """
        epoch_num = runner.epoch

        # loaders to use in current epoch
        epoch_loaders = OrderedDict()
        for name, loader in self.loaders.items():
            # loaders without an explicit period run every epoch
            period = self.loader_periods.get(name, 1)
            # ignore disabled loaders (period is not positive)
            if period > 0 and epoch_num % period == 0:
                epoch_loaders[name] = loader

        if not epoch_loaders:
            raise ValueError(f"There is no loaders in epoch {epoch_num}!")

        # fall back to the first scheduled loader when the real
        # validation loader is skipped in this epoch
        first_loader = next(iter(epoch_loaders))
        runner.valid_loader = (
            self.valid_loader
            if self.valid_loader in epoch_loaders
            else first_loader
        )
        runner.loaders = epoch_loaders

    def on_epoch_end(self, runner: IRunner) -> None:
        """Store validation metrics and use latest validation score
        when validation loader is not required.

        Arguments:
            runner (IRunner): current runner
        """
        if self.valid_loader in runner.loaders:
            # remember the main metric of the real validation loader
            self.valid_metrics = {
                runner.main_metric: runner.valid_metrics[runner.main_metric]
            }
        elif self.valid_metrics is not None:
            # use previous score on validation
            runner.valid_metrics = self.valid_metrics

# Names exported by ``from <module> import *``.
__all__ = ["PeriodicLoaderCallback"]