from typing import Any, Dict, List, Optional
from collections import OrderedDict
from copy import deepcopy
import logging
import os
from torch.utils.data import DataLoader, Dataset
from catalyst.callbacks import CheckpointCallback, ICheckpointCallback
from catalyst.callbacks.batch_overfit import BatchOverfitCallback
from catalyst.callbacks.misc import CheckRunCallback, TimerCallback, TqdmCallback
from catalyst.core._misc import callback_isinstance
from catalyst.core.callback import Callback
from catalyst.core.logger import ILogger
from catalyst.core.runner import IRunner
from catalyst.core.trial import ITrial
from catalyst.engines import IEngine
from catalyst.loggers.console import ConsoleLogger
from catalyst.loggers.csv import CSVLogger
from catalyst.loggers.tensorboard import TensorboardLogger
from catalyst.registry import REGISTRY
from catalyst.runners._misc import (
do_lr_linear_scaling,
get_loaders_from_params,
get_model_parameters,
)
from catalyst.runners.self_supervised import ISelfSupervisedRunner
from catalyst.runners.supervised import ISupervisedRunner
from catalyst.typing import (
RunnerCriterion,
RunnerModel,
RunnerOptimizer,
RunnerScheduler,
Sampler,
Scheduler,
)
from catalyst.utils.misc import get_by_keys, get_short_hash, get_utcnow_time
from catalyst.utils.torch import get_available_engine
logger = logging.getLogger(__name__)
class ConfigRunner(IRunner):
"""Runner created from a dictionary configuration file.
Used for Catalyst Config API.
Args:
config: dictionary with parameters
.. note::
Please follow the `minimal examples`_ sections for use cases.
.. _`minimal examples`: https://github.com/catalyst-team/catalyst#minimal-examples
Examples:
.. code-block:: python
dataset = SomeDataset()
runner = SupervisedConfigRunner(
config={
"args": {"logdir": logdir},
"model": {"_target_": "SomeModel", "in_features": 4, "out_features": 2},
"engine": {"_target_": "DeviceEngine", "device": device},
"stages": {
"stage1": {
"num_epochs": 10,
"criterion": {"_target_": "MSELoss"},
"optimizer": {"_target_": "Adam", "lr": 1e-3},
"loaders": {"batch_size": 4, "num_workers": 0},
"callbacks": {
"criterion": {
"_target_": "CriterionCallback",
"metric_key": "loss",
"input_key": "logits",
"target_key": "targets",
},
"optimizer": {
"_target_": "OptimizerCallback",
"metric_key": "loss"
},
},
},
},
}
)
runner.get_datasets = lambda *args, **kwargs: {
"train": dataset,
"valid": dataset,
}
runner.run()
"""
def __init__(self, config: Dict):
"""Init."""
super().__init__()
self._config: Dict = deepcopy(config)
self._stage_config: Dict = self._config["stages"]
self._apex: bool = get_by_keys(self._config, "args", "apex", default=False)
self._amp: bool = get_by_keys(self._config, "args", "amp", default=False)
self._ddp: bool = get_by_keys(self._config, "args", "ddp", default=False)
self._fp16: bool = get_by_keys(self._config, "args", "fp16", default=False)
self._seed: int = get_by_keys(self._config, "args", "seed", default=42)
self._verbose: bool = get_by_keys(self._config, "args", "verbose", default=False)
self._timeit: bool = get_by_keys(self._config, "args", "timeit", default=False)
self._check: bool = get_by_keys(self._config, "args", "check", default=False)
self._overfit: bool = get_by_keys(self._config, "args", "overfit", default=False)
self._resume: str = get_by_keys(self._config, "args", "resume")
self._name: str = self._get_run_name()
self._logdir: str = self._get_run_logdir()
# @TODO: hack for catalyst-dl tune, could be done better
self._trial = None
def _get_run_name(self) -> str:
timestamp = get_utcnow_time()
config_hash = get_short_hash(self._config)
default_name = f"{timestamp}-{config_hash}"
name = get_by_keys(self._config, "args", "name", default=default_name)
return name
def _get_logdir(self, config: Dict) -> str:
timestamp = get_utcnow_time()
config_hash = get_short_hash(config)
logdir = f"{timestamp}.{config_hash}"
return logdir
def _get_run_logdir(self) -> str:
output = None
exclude_tag = "none"
logdir: str = get_by_keys(self._config, "args", "logdir", default=None)
baselogdir: str = get_by_keys(self._config, "args", "baselogdir", default=None)
if logdir is not None and logdir.lower() != exclude_tag:
output = logdir
elif baselogdir is not None and baselogdir.lower() != exclude_tag:
logdir = self._get_logdir(self._config)
output = f"{baselogdir}/{logdir}"
return output
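# Illustration only (not part of the original source): with
#   "args": {"logdir": "./logs/run1"}
# the path is used as-is; with
#   "args": {"baselogdir": "./logs"}
# the run logdir becomes "./logs/<timestamp>.<config_hash>"; when both keys are
# absent or equal to the exclude tag "none", the method returns None and the
# file-based loggers and the checkpoint callback are skipped.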
@property
def logdir(self) -> str:
"""Experiment's logdir for artefacts and logging."""
return self._logdir
@property
def seed(self) -> int:
"""Experiment's seed for reproducibility."""
return self._seed
@property
def name(self) -> str:
"""Returns run name for monitoring tools."""
return self._name
@property
def hparams(self) -> Dict:
"""Returns hyper parameters"""
return OrderedDict(self._config)
@property
def stages(self) -> List[str]:
"""Experiment's stage names."""
stages_keys = list(self._stage_config.keys())
return stages_keys
def get_stage_len(self, stage: str) -> int:
"""Returns number of epochs for the selected stage.
Args:
stage: current stage
Returns:
number of epochs in stage
Example::
>>> runner.get_stage_len("pretraining")
3
"""
return get_by_keys(self._stage_config, stage, "num_epochs", default=1)
def get_trial(self) -> ITrial:
"""Returns the trial for the run."""
return self._trial
def get_engine(self) -> IEngine:
"""Returns the engine for the run."""
engine_params = self._config.get("engine", None)
if engine_params is not None:
engine = REGISTRY.get_from_params(**engine_params)
else:
engine = get_available_engine(
fp16=self._fp16, ddp=self._ddp, amp=self._amp, apex=self._apex
)
return engine
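# A minimal sketch of the "engine" section consumed above; the "DeviceEngine"
# target mirrors the class docstring example and is illustrative only:
#
#   "engine": {"_target_": "DeviceEngine", "device": "cuda:0"}
#
# When the section is omitted, the engine is derived from the "args" flags
# (fp16 / ddp / amp / apex) via ``get_available_engine``.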
def get_loggers(self) -> Dict[str, ILogger]:
"""Returns the loggers for the run."""
loggers_params = self._config.get("loggers", {})
loggers = REGISTRY.get_from_params(**loggers_params)
is_logger_exists = lambda logger_fn: any(
isinstance(x, logger_fn) for x in loggers.values()
)
if not is_logger_exists(ConsoleLogger):
loggers["_console"] = ConsoleLogger()
if self._logdir is not None and not is_logger_exists(CSVLogger):
loggers["_csv"] = CSVLogger(logdir=self._logdir, use_logdir_postfix=True)
if self._logdir is not None and not is_logger_exists(TensorboardLogger):
loggers["_tensorboard"] = TensorboardLogger(
logdir=self._logdir, use_logdir_postfix=True
)
return loggers
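# Hedged example of a "loggers" section, built only from loggers imported in
# this module; ``get_loggers`` above appends console, CSV and Tensorboard
# fallbacks when no instance of the corresponding class is configured:
#
#   "loggers": {
#       "tensorboard": {"_target_": "TensorboardLogger", "logdir": "./logs/tb"},
#       "csv": {"_target_": "CSVLogger", "logdir": "./logs"},
#   }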
def get_datasets(self, stage: str) -> "OrderedDict[str, Dataset]":
"""
Returns datasets for a given stage.
Args:
stage: stage name
Returns:
Dict: datasets objects
"""
datasets_params = self._stage_config[stage]["loaders"]["datasets"]
datasets = REGISTRY.get_from_params(**datasets_params)
return OrderedDict(datasets)
def get_samplers(self, stage: str) -> "OrderedDict[str, Sampler]":
"""
Returns samplers for a given stage.
Args:
stage: stage name
Returns:
Dict of samplers
"""
samplers_params = get_by_keys(self._stage_config, stage, "loaders", "samplers", default={})
samplers = REGISTRY.get_from_params(**samplers_params)
return OrderedDict(samplers)
def _get_loaders_from_params(self, **params) -> "Optional[OrderedDict[str, DataLoader]]":
"""Creates dataloaders from ``**params`` parameters."""
loaders = dict(REGISTRY.get_from_params(**params))
return loaders if all(isinstance(dl, DataLoader) for dl in loaders.values()) else None
def get_loaders(self, stage: str) -> "OrderedDict[str, DataLoader]":
"""
Returns loaders for a given stage.
Args:
stage: stage name
Returns:
Dict: loaders objects
"""
loaders_params = deepcopy(self._stage_config[stage]["loaders"])
loaders = self._get_loaders_from_params(**loaders_params)
if loaders is None:
# config is parsed manually in the `get_datasets` and `get_samplers` methods
loaders_params.pop("datasets", None)
loaders_params.pop("samplers", None)
loaders = get_loaders_from_params(
datasets=self.get_datasets(stage=stage),
samplers=self.get_samplers(stage=stage),
initial_seed=self.seed,
**loaders_params,
)
return loaders
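# Sketch of a "loaders" section in the form that falls back to
# ``get_datasets``/``get_samplers``; the dataset target is hypothetical:
#
#   "loaders": {
#       "batch_size": 32,
#       "num_workers": 2,
#       "datasets": {
#           "train": {"_target_": "SomeDataset"},
#           "valid": {"_target_": "SomeDataset"},
#       },
#   }
#
# Keys other than "datasets" and "samplers" (batch_size, num_workers, ...) are
# forwarded to ``get_loaders_from_params`` together with ``initial_seed``.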
@staticmethod
def _get_model_from_params(**params) -> RunnerModel:
params = deepcopy(params)
is_key_value = params.pop("_key_value", False)
if is_key_value:
model = {
model_key: ConfigRunner._get_model_from_params(**model_params)
for model_key, model_params in params.items()
}
else:
model = REGISTRY.get_from_params(**params)
return model
def get_model(self, stage: str) -> RunnerModel:
"""Returns the model for a given stage."""
assert "model" in self._config, "config must contain 'model' key"
model_params: Dict = self._config["model"]
model: RunnerModel = (
self._get_model_from_params(**model_params) if self.model is None else self.model
)
return model
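# Assumed example of the "_key_value" form handled by
# ``_get_model_from_params``; the model targets are hypothetical:
#
#   "model": {
#       "_key_value": True,
#       "encoder": {"_target_": "SomeEncoder"},
#       "head": {"_target_": "SomeHead"},
#   }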
def get_criterion(self, stage: str) -> RunnerCriterion:
"""Returns the criterion for a given stage."""
criterion_params = get_by_keys(self._stage_config, stage, "criterion", default={})
criterion = REGISTRY.get_from_params(**criterion_params)
return criterion or None
def _get_optimizer_from_params(
self, model: RunnerModel, stage: str, **params
) -> RunnerOptimizer:
# @TODO 1: refactor; this method is too long
params = deepcopy(params)
# learning rate linear scaling
lr_scaling_params = params.pop("lr_linear_scaling", None)
if lr_scaling_params:
loaders_params = dict(self._stage_config[stage]["loaders"])
lr, lr_scaling = do_lr_linear_scaling(
lr_scaling_params=lr_scaling_params,
batch_size=loaders_params.get("batch_size", 1),
per_gpu_scaling=loaders_params.get("per_gpu_scaling", False),
)
params["lr"] = lr
else:
lr_scaling = 1.0
# getting layer-wise parameters
layerwise_params = params.pop("layerwise_params", OrderedDict())
no_bias_weight_decay = params.pop("no_bias_weight_decay", True)
# getting model parameters
model_key = params.pop("_model", None)
model_params = get_model_parameters(
models=model,
models_keys=model_key,
layerwise_params=layerwise_params,
no_bias_weight_decay=no_bias_weight_decay,
lr_scaling=lr_scaling,
)
# instantiate optimizer
# use `shared_params` to pass model params to the nested optimizers
optimizer = REGISTRY.get_from_params(**params, shared_params={"params": model_params})
return optimizer
def get_optimizer(self, model: RunnerModel, stage: str) -> RunnerOptimizer:
"""
Returns the optimizer for a given stage and epoch.
Args:
model: model or a dict of models
stage: current stage name
Returns:
optimizer for selected stage and epoch
"""
if "optimizer" not in self._stage_config[stage]:
return None
optimizer_params = get_by_keys(self._stage_config, stage, "optimizer", default={})
optimizer_params = deepcopy(optimizer_params)
is_key_value = optimizer_params.pop("_key_value", False)
if is_key_value:
optimizer = {}
for key, params in optimizer_params.items():
optimizer[key] = self._get_optimizer_from_params(
model=model, stage=stage, **params
)
else:
optimizer = self._get_optimizer_from_params(
model=model, stage=stage, **optimizer_params
)
return optimizer
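# Illustrative "optimizer" section exercising the keys consumed above; all
# values (and the layerwise pattern) are assumptions, not defaults:
#
#   "optimizer": {
#       "_target_": "Adam",
#       "lr": 1e-3,
#       "no_bias_weight_decay": True,
#       "layerwise_params": {"encoder.*": {"lr": 1e-4}},
#   }
#
# For several optimizers, set "_key_value": True and nest one such section per
# key; "_model" can then select a sub-model when the model itself is a dict.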
@staticmethod
def _get_scheduler_from_params(*, optimizer: RunnerOptimizer, **params) -> RunnerScheduler:
params = deepcopy(params)
is_key_value = params.pop("_key_value", False)
if is_key_value:
scheduler: Dict[str, Scheduler] = {}
for key, scheduler_params in params.items():
scheduler_params = deepcopy(scheduler_params)
optimizer_key = scheduler_params.pop("_optimizer", None)
optim = optimizer[optimizer_key] if optimizer_key else optimizer
scheduler[key] = ConfigRunner._get_scheduler_from_params(
**scheduler_params, optimizer=optim
)
else:
optimizer_key = params.pop("_optimizer", None)
optimizer = optimizer[optimizer_key] if optimizer_key else optimizer
scheduler = REGISTRY.get_from_params(**params, optimizer=optimizer)
return scheduler
def get_scheduler(self, optimizer: RunnerOptimizer, stage: str) -> RunnerScheduler:
"""Returns the scheduler for a given stage."""
if "scheduler" not in self._stage_config[stage]:
return None
scheduler_params = get_by_keys(self._stage_config, stage, "scheduler", default={})
scheduler = self._get_scheduler_from_params(optimizer=optimizer, **scheduler_params)
return scheduler
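# Hedged sketch of a key-value "scheduler" section; "_optimizer" picks which
# optimizer to bind when the optimizer itself is a dict (the StepLR target is
# only an example of a registered torch scheduler):
#
#   "scheduler": {
#       "_key_value": True,
#       "main": {"_target_": "StepLR", "step_size": 10, "_optimizer": "main"},
#   }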
def get_callbacks(self, stage: str) -> "OrderedDict[str, Callback]":
"""Returns the callbacks for a given stage."""
callbacks_params = get_by_keys(self._stage_config, stage, "callbacks", default={})
callbacks = OrderedDict(REGISTRY.get_from_params(**callbacks_params))
is_callback_exists = lambda callback_fn: any(
callback_isinstance(x, callback_fn) for x in callbacks.values()
)
if self._verbose and not is_callback_exists(TqdmCallback):
callbacks["_verbose"] = TqdmCallback()
if self._timeit and not is_callback_exists(TimerCallback):
callbacks["_timer"] = TimerCallback()
if self._check and not is_callback_exists(CheckRunCallback):
callbacks["_check"] = CheckRunCallback()
if self._overfit and not is_callback_exists(BatchOverfitCallback):
callbacks["_overfit"] = BatchOverfitCallback()
if self._logdir is not None and not is_callback_exists(ICheckpointCallback):
callbacks["_checkpoint"] = CheckpointCallback(
logdir=os.path.join(self._logdir, "checkpoints"), resume=self._resume
)
return callbacks
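# The "args" flags below (all optional; values are illustrative) trigger the
# default callbacks added above when no user-defined counterpart is present:
#
#   "args": {"verbose": True, "timeit": True, "check": True, "overfit": False}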
class SelfSupervisedConfigRunner(ISelfSupervisedRunner, ConfigRunner):
"""ConfigRunner for contrastive tasks
Args:
config: dictionary with parameters
input_key: key in ``runner.batch`` dict mapping for model input
target_key: key in ``runner.batch`` dict mapping for target
loss_key: key for ``runner.batch_metrics`` to store criterion loss output
augemention_prefix: key prefix in ``runner.batch`` for sampled augmentations
projection_prefix: key for ``runner.batch`` to store model projection
embedding_prefix: key for ``runner.batch`` to store model embeddings
.. note::
Please follow the `minimal examples`_ sections for use cases.
.. _`minimal examples`: https://github.com/catalyst-team/catalyst#minimal-examples
Examples:
.. code-block:: python
dataset = SomeDataset()
runner = SelfSupervisedConfigRunner(
config={
"args": {"logdir": logdir},
"model": {"_target_": "SomeContrastiveModel", ...},
"engine": {"_target_": "DeviceEngine", "device": device},
"stages": {
"stage1": {
"num_epochs": 10,
"criterion": {"_target_": "NTXentLoss", "tau": 0.1},
"optimizer": {"_target_": "Adam", "lr": 1e-3},
"loaders": {"batch_size": 4, "num_workers": 0},
"callbacks": {
"criterion": {
"_target_": "CriterionCallback",
"metric_key": "loss",
"input_key": "logits",
"target_key": "targets",
},
"optimizer": {
"_target_": "OptimizerCallback",
"metric_key": "loss"
},
},
},
},
}
)
runner.get_datasets = lambda *args, **kwargs: {
"train": dataset,
"valid": dataset,
}
runner.run()
"""
def __init__(
self,
config: Dict = None,
input_key: str = "features",
target_key: str = "target",
loss_key: str = "loss",
augemention_prefix: str = "augment",
projection_prefix: str = "projection",
embedding_prefix: str = "embedding",
):
"""Init."""
ISelfSupervisedRunner.__init__(
self,
input_key=input_key,
target_key=target_key,
loss_key=loss_key,
augemention_prefix=augemention_prefix,
projection_prefix=projection_prefix,
embedding_prefix=embedding_prefix,
)
ConfigRunner.__init__(self, config=config)
class SupervisedConfigRunner(ISupervisedRunner, ConfigRunner):
"""ConfigRunner for supervised tasks
Args:
config: dictionary with parameters
input_key: key in ``runner.batch`` dict mapping for model input
output_key: key for ``runner.batch`` to store model output
target_key: key in ``runner.batch`` dict mapping for target
loss_key: key for ``runner.batch_metrics`` to store criterion loss output
.. note::
Please follow the `minimal examples`_ sections for use cases.
.. _`minimal examples`: https://github.com/catalyst-team/catalyst#minimal-examples
Examples:
.. code-block:: python
dataset = SomeDataset()
runner = SupervisedConfigRunner(
config={
"args": {"logdir": logdir},
"model": {"_target_": "SomeModel", "in_features": 4, "out_features": 2},
"engine": {"_target_": "DeviceEngine", "device": device},
"stages": {
"stage1": {
"num_epochs": 10,
"criterion": {"_target_": "MSELoss"},
"optimizer": {"_target_": "Adam", "lr": 1e-3},
"loaders": {
"batch_size": 4,
"num_workers": 0,
"datasets": {
"train": {
"_target_": "SelfSupervisedDatasetWrapper",
"dataset": dataset
},
"transforms": ...,
"transform_original": ...,
},
},
"callbacks": {
"criterion": {
"_target_": "CriterionCallback",
"metric_key": "loss",
"input_key": "logits",
"target_key": "targets",
},
"optimizer": {
"_target_": "OptimizerCallback",
"metric_key": "loss"
},
},
},
},
}
)
runner.run()
"""
def __init__(
self,
config: Dict = None,
input_key: Any = "features",
output_key: Any = "logits",
target_key: str = "targets",
loss_key: str = "loss",
):
"""Init."""
ISupervisedRunner.__init__(
self,
input_key=input_key,
output_key=output_key,
target_key=target_key,
loss_key=loss_key,
)
ConfigRunner.__init__(self, config=config)
__all__ = ["ConfigRunner", "SupervisedConfigRunner", "SelfSupervisedConfigRunner"]