
Source code for catalyst.contrib.callbacks.wandb_logger

# flake8: noqa
# @TODO: code formatting issue for 20.07 release
from typing import Dict, List, TYPE_CHECKING

import wandb

from catalyst.core.callback import (
    Callback,
    CallbackNode,
    CallbackOrder,
    CallbackScope,
)
from catalyst.utils.dict import split_dict_to_subdicts

if TYPE_CHECKING:
    from catalyst.core.runner import IRunner


class WandbLogger(Callback):
    """Logger callback, translates ``runner.*_metrics`` to Weights & Biases.

    Read about Weights & Biases here: https://docs.wandb.com/

    Example:

    .. code-block:: python

        from catalyst import dl
        import torch
        import torch.nn as nn
        import torch.optim as optim
        from torch.utils.data import DataLoader, TensorDataset

        class Projector(nn.Module):
            def __init__(self, input_size):
                super().__init__()
                self.linear = nn.Linear(input_size, 1)

            def forward(self, X):
                return self.linear(X).squeeze(-1)

        X = torch.rand(16, 10)
        y = torch.rand(X.shape[0])
        model = Projector(X.shape[1])
        dataset = TensorDataset(X, y)
        loader = DataLoader(dataset, batch_size=8)
        runner = dl.SupervisedRunner()

        runner.train(
            model=model,
            loaders={"train": loader, "valid": loader},
            criterion=nn.MSELoss(),
            optimizer=optim.Adam(model.parameters()),
            logdir="log_example",
            callbacks=[
                dl.callbacks.WandbLogger(project="wandb_logger_example")
            ],
            num_epochs=10,
        )
    """
    def __init__(
        self,
        metric_names: List[str] = None,
        log_on_batch_end: bool = False,
        log_on_epoch_end: bool = True,
        log: str = None,
        **logging_params,
    ):
        """
        Args:
            metric_names: list of metric names to log;
                if None, logs everything
            log_on_batch_end: if True, logs per-batch metrics
            log_on_epoch_end: if True, logs per-epoch metrics
            log: ``wandb.watch`` parameter; can be "all", "gradients"
                or "parameters"
            **logging_params: any parameters of ``wandb.init`` except
                ``reinit``, which is automatically set to True,
                and ``dir``, which is set to ``<logdir>``
        """
        super().__init__(
            order=CallbackOrder.logging,
            node=CallbackNode.master,
            scope=CallbackScope.experiment,
        )
        self.metrics_to_log = metric_names
        self.log_on_batch_end = log_on_batch_end
        self.log_on_epoch_end = log_on_epoch_end
        self.log = log

        if not (self.log_on_batch_end or self.log_on_epoch_end):
            raise ValueError("You have to log something!")

        # suffixes are needed only when both logging modes are enabled,
        # to disambiguate batch-level from epoch-level metrics
        if self.log_on_batch_end != self.log_on_epoch_end:
            self.batch_log_suffix = ""
            self.epoch_log_suffix = ""
        else:
            self.batch_log_suffix = "_batch"
            self.epoch_log_suffix = "_epoch"

        self.logging_params = logging_params
    def _log_metrics(
        self,
        metrics: Dict[str, float],
        step: int,
        mode: str,
        suffix: str = "",
        commit: bool = True,
    ):
        if self.metrics_to_log is None:
            metrics_to_log = sorted(metrics.keys())
        else:
            metrics_to_log = self.metrics_to_log

        def key_locate(key: str):
            """
            Wandb reserves metric names starting with ``_`` for its own
            service purposes, so we cannot send such metric names as-is.

            Args:
                key: metric name

            Returns:
                formatted metric name
            """
            if key.startswith("_"):
                return key[1:]
            return key

        metrics = {
            f"{key_locate(key)}/{mode}{suffix}": value
            for key, value in metrics.items()
            if key in metrics_to_log
        }
        wandb.log(metrics, step=step, commit=commit)
    def on_stage_start(self, runner: "IRunner"):
        """Initialize Weights & Biases."""
        wandb.init(**self.logging_params, reinit=True, dir=str(runner.logdir))
        wandb.watch(
            models=runner.model, criterion=runner.criterion, log=self.log
        )
    def on_stage_end(self, runner: "IRunner"):
        """Finish logging to Weights & Biases."""
        wandb.join()
    def on_batch_end(self, runner: "IRunner"):
        """Translate batch metrics to Weights & Biases."""
        if self.log_on_batch_end:
            mode = runner.loader_name
            metrics = runner.batch_metrics
            self._log_metrics(
                metrics=metrics,
                step=runner.global_sample_step,
                mode=mode,
                suffix=self.batch_log_suffix,
                commit=True,
            )
    def on_loader_end(self, runner: "IRunner"):
        """Translate loader metrics to Weights & Biases."""
        if self.log_on_epoch_end:
            mode = runner.loader_name
            metrics = runner.loader_metrics
            self._log_metrics(
                metrics=metrics,
                step=runner.global_epoch,
                mode=mode,
                suffix=self.epoch_log_suffix,
                commit=False,
            )
    def on_epoch_end(self, runner: "IRunner"):
        """Translate epoch metrics to Weights & Biases."""
        extra_mode = "_base"
        splitted_epoch_metrics = split_dict_to_subdicts(
            dct=runner.epoch_metrics,
            prefixes=list(runner.loaders.keys()),
            extra_key=extra_mode,
        )
        if self.log_on_epoch_end:
            if extra_mode in splitted_epoch_metrics.keys():
                # "_base" collects metrics without a loader prefix,
                # e.g. if we are using OptimizerCallback
                self._log_metrics(
                    metrics=splitted_epoch_metrics[extra_mode],
                    step=runner.global_epoch,
                    mode=extra_mode,
                    suffix=self.epoch_log_suffix,
                    commit=True,
                )
__all__ = ["WandbLogger"]
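
For reference, a minimal sketch (not part of the module) of how the constructor flags shape metric names in the W&B UI, following the ``f"{key_locate(key)}/{mode}{suffix}"`` pattern in ``_log_metrics``. The project name and metric names below are hypothetical; the callback arguments are the real ones documented above.

    # a minimal sketch, assuming a hypothetical project "my_project"
    from catalyst import dl

    # Default configuration: epoch-level logging only, so no suffix is
    # appended and a "loss" metric from the "train" loader shows up
    # in W&B as "loss/train".
    logger = dl.callbacks.WandbLogger(project="my_project")

    # Both modes enabled: suffixes disambiguate the two streams, so the
    # same metric shows up as "loss/train_batch" and "loss/train_epoch".
    logger = dl.callbacks.WandbLogger(
        project="my_project",
        log_on_batch_end=True,
        log_on_epoch_end=True,
    )

    # A hypothetical metric named "_timer" would be logged as
    # "timer/train": key_locate() strips the leading underscore,
    # which wandb reserves for its own service keys.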