Source code for catalyst.contrib.callbacks.alchemy_logger
# flake8: noqa
# @TODO: code formatting issue for 20.07 release
from typing import Dict, List, TYPE_CHECKING
from alchemy import Logger
from catalyst import utils
from catalyst.core.callback import (
Callback,
CallbackNode,
CallbackOrder,
CallbackScope,
)
if TYPE_CHECKING:
from catalyst.core.runner import IRunner
[docs]class AlchemyLogger(Callback):
"""Logger callback, translates ``runner.*_metrics`` to Alchemy.
Read about Alchemy here https://alchemy.host
Example:
.. code-block:: python
from catalyst.dl import SupervisedRunner, AlchemyLogger
runner = SupervisedRunner()
runner.train(
model=model,
criterion=criterion,
optimizer=optimizer,
loaders=loaders,
logdir=logdir,
num_epochs=num_epochs,
verbose=True,
callbacks={
"logger": AlchemyLogger(
token="...", # your Alchemy token
project="your_project_name",
experiment="your_experiment_name",
group="your_experiment_group_name",
)
}
)
Powered by Catalyst.Ecosystem.
"""
[docs] def __init__(
self,
metric_names: List[str] = None,
log_on_batch_end: bool = True,
log_on_epoch_end: bool = True,
**logging_params,
):
"""
Args:
metric_names: list of metric names to log,
if none - logs everything
log_on_batch_end: logs per-batch metrics if set True
log_on_epoch_end: logs per-epoch metrics if set True
"""
super().__init__(
order=CallbackOrder.logging,
node=CallbackNode.master,
scope=CallbackScope.experiment,
)
self.metrics_to_log = metric_names
self.log_on_batch_end = log_on_batch_end
self.log_on_epoch_end = log_on_epoch_end
if not (self.log_on_batch_end or self.log_on_epoch_end):
raise ValueError("You have to log something!")
if (self.log_on_batch_end and not self.log_on_epoch_end) or (
not self.log_on_batch_end and self.log_on_epoch_end
):
self.batch_log_suffix = ""
self.epoch_log_suffix = ""
else:
self.batch_log_suffix = "_batch"
self.epoch_log_suffix = "_epoch"
self.logger = Logger(**logging_params)
def __del__(self):
"""@TODO: Docs. Contribution is welcome."""
self.logger.close()
def _log_metrics(
self, metrics: Dict[str, float], step: int, mode: str, suffix=""
):
if self.metrics_to_log is None:
metrics_to_log = sorted(metrics.keys())
else:
metrics_to_log = self.metrics_to_log
for name in metrics_to_log:
if name in metrics:
metric_name = f"{name}/{mode}{suffix}"
metric_value = metrics[name]
self.logger.log_scalar(
name=metric_name, value=metric_value, step=step,
)
[docs] def on_batch_end(self, runner: "IRunner"):
"""Translate batch metrics to Alchemy."""
if self.log_on_batch_end:
mode = runner.loader_name
metrics = runner.batch_metrics
self._log_metrics(
metrics=metrics,
step=runner.global_sample_step,
mode=mode,
suffix=self.batch_log_suffix,
)
[docs] def on_loader_end(self, runner: "IRunner"):
"""Translate loader metrics to Alchemy."""
if self.log_on_epoch_end:
mode = runner.loader_name
metrics = runner.loader_metrics
self._log_metrics(
metrics=metrics,
step=runner.global_epoch,
mode=mode,
suffix=self.epoch_log_suffix,
)
[docs] def on_epoch_end(self, runner: "IRunner"):
"""Translate epoch metrics to Alchemy."""
extra_mode = "_base"
splitted_epoch_metrics = utils.split_dict_to_subdicts(
dct=runner.epoch_metrics,
prefixes=list(runner.loaders.keys()),
extra_key=extra_mode,
)
if self.log_on_epoch_end:
self._log_metrics(
metrics=splitted_epoch_metrics[extra_mode],
step=runner.global_epoch,
mode=extra_mode,
suffix=self.epoch_log_suffix,
)
__all__ = ["AlchemyLogger"]