Source code for catalyst.contrib.dl.runner.neptune

from pathlib import Path

import neptune

from catalyst.dl import utils
from catalyst.dl.core import Experiment, Runner
from catalyst.dl.experiment import ConfigExperiment
from catalyst.dl.runner import SupervisedRunner


class NeptuneRunner(Runner):
    """
    Runner wrapper with Neptune integration hooks.
    Read about Neptune here https://neptune.ai

    ``monitoring_params`` must contain an ``"init"`` dict (forwarded to
    ``neptune.init``) and a ``"create_experiment"`` dict (forwarded to
    ``neptune.create_experiment``). Two optional top-level flags control
    metric logging: ``"log_on_batch_end"`` (default ``False``) and
    ``"log_on_epoch_end"`` (default ``True``).

    Examples:
        Initialize runner::

            from catalyst.dl import SupervisedNeptuneRunner
            runner = SupervisedNeptuneRunner()

        Pass `monitoring_params` and train model::

            runner.train(
                model=model,
                criterion=criterion,
                optimizer=optimizer,
                loaders=loaders,
                logdir=logdir,
                num_epochs=num_epochs,
                verbose=True,
                monitoring_params={
                    "init": {
                        "project_qualified_name": "neptune-ai/catalyst",
                        "api_token": os.getenv('NEPTUNE_API_TOKEN'),  # api key
                    },
                    "create_experiment": {
                        "name": "catalyst-example",  # experiment name
                        "params": {"epoch_nr":10},  # immutable
                        "properties": {"data_source": "cifar10"} ,  # mutable
                        "tags": ["resnet", "no-augmentations"],
                        "upload_source_files": ["**/*.py"]  # grep-like
                    }
                })
    """

    def _init(
        self,
        log_on_batch_end: bool = False,
        log_on_epoch_end: bool = True,
    ):
        """Store the logging flags after running the parent ``_init``.

        Args:
            log_on_batch_end: send batch metrics to Neptune after each batch
            log_on_epoch_end: send epoch metrics to Neptune after each epoch
        """
        super()._init()
        self.log_on_batch_end = log_on_batch_end
        self.log_on_epoch_end = log_on_epoch_end

    def _pre_experiment_hook(self, experiment: Experiment):
        """Open the Neptune experiment and record the run configuration.

        Args:
            experiment: experiment whose ``monitoring_params`` and ``logdir``
                configure the Neptune session

        Raises:
            KeyError: if ``monitoring_params`` has no ``"init"`` or
                ``"create_experiment"`` entry
        """
        monitoring_params = experiment.monitoring_params
        # Kept for backward compatibility; this class never reads it back.
        monitoring_params["dir"] = str(Path(experiment.logdir).absolute())

        neptune.init(**monitoring_params["init"])
        self._neptune_experiment = neptune.create_experiment(
            **monitoring_params["create_experiment"]
        )

        # Read the flags with ``get`` rather than ``pop``: the dict belongs
        # to the experiment, and removing the keys made a second run of the
        # same experiment silently fall back to the defaults.
        log_on_batch_end: bool = \
            monitoring_params.get("log_on_batch_end", False)
        log_on_epoch_end: bool = \
            monitoring_params.get("log_on_epoch_end", True)

        self._init(
            log_on_batch_end=log_on_batch_end,
            log_on_epoch_end=log_on_epoch_end,
        )
        self._neptune_experiment.set_property(
            "log_on_batch_end", self.log_on_batch_end
        )
        self._neptune_experiment.set_property(
            "log_on_epoch_end", self.log_on_epoch_end
        )

        # Config-driven experiments also publish their flattened stage
        # configuration as Neptune properties.
        if isinstance(experiment, ConfigExperiment):
            exp_config = utils.flatten_dict(experiment.stages_config)
            for name, value in exp_config.items():
                self._neptune_experiment.set_property(name, value)

    def _post_experiment_hook(self, experiment: Experiment):
        """Stop the Neptune experiment opened by the pre-experiment hook.

        Args:
            experiment: the experiment that has just been run (unused here)
        """
        # @TODO: add params for artefacts logging
        # logdir_src = Path(experiment.logdir)
        # self._neptune_experiment.set_property("logdir", logdir_src)
        #
        # checkpoints_src = logdir_src.joinpath("checkpoints")
        # self._neptune_experiment.log_artifact(checkpoints_src)
        self._neptune_experiment.stop()

    def _run_batch(self, batch):
        """Run one batch, then log its metrics if batch logging is enabled.

        Metrics are sent as ``batch_<loader name>_<metric name>``.

        Args:
            batch: batch passed through to the parent ``_run_batch``
        """
        super()._run_batch(batch=batch)
        if self.log_on_batch_end:
            mode = self.state.loader_name
            metrics = self.state.batch_metrics
            for name, value in metrics.items():
                self._neptune_experiment.log_metric(
                    f"batch_{mode}_{name}", value
                )

    def _run_epoch(self, stage: str, epoch: int):
        """Run one epoch, then log its metrics if epoch logging is enabled.

        Metrics are sent as ``epoch_<group>_<metric name>``.

        Args:
            stage: stage name passed through to the parent ``_run_epoch``
            epoch: epoch index passed through to the parent ``_run_epoch``
        """
        super()._run_epoch(stage=stage, epoch=epoch)
        if self.log_on_epoch_end:
            # Presumably groups metrics by loader-name prefix and puts the
            # remainder under "_base" - verify against the helper.
            mode_metrics = utils.split_dict_to_subdicts(
                dct=self.state.epoch_metrics,
                prefixes=list(self.state.loaders.keys()),
                extra_key="_base",
            )
            for mode, metrics in mode_metrics.items():
                for name, value in metrics.items():
                    self._neptune_experiment.log_metric(
                        f"epoch_{mode}_{name}", value
                    )

    def run_experiment(self, experiment: Experiment):
        """Run ``experiment`` wrapped in the Neptune pre/post hooks.

        Args:
            experiment: experiment to run; its ``monitoring_params`` must
                provide the Neptune configuration
        """
        self._pre_experiment_hook(experiment=experiment)
        # NOTE(review): if the parent run raises, the post hook below is not
        # reached from here - confirm whether that is the intended behavior.
        super().run_experiment(experiment=experiment)
        self._post_experiment_hook(experiment=experiment)
class SupervisedNeptuneRunner(NeptuneRunner, SupervisedRunner):
    """Combine ``NeptuneRunner`` with ``SupervisedRunner``.

    Adds no members of its own; all behavior comes from the two bases.
    """
# Names exported by ``from <this module> import *``.
__all__ = [
    "NeptuneRunner",
    "SupervisedNeptuneRunner",
]