Source code for catalyst.loggers.mlflow

from typing import Any, Dict, List, Optional, TYPE_CHECKING
import re

import numpy as np

from catalyst.core.logger import ILogger
from catalyst.settings import SETTINGS

if SETTINGS.mlflow_required:
    import mlflow
    from mlflow.tracking.fluent import ActiveRun
    from catalyst.core.runner import IRunner

def _get_or_start_run(run_name: Optional[str]) -> "ActiveRun":
    """The function of MLflow. Gets the active run and gives it a name.
    If active run does not exist, starts a new one.

        run_name: Name of the run

    active_run = mlflow.active_run()
    if active_run:
        mlflow.set_tag("mlflow.runName", run_name)
        return active_run
    return mlflow.start_run(run_name=run_name)

def _mlflow_log_params_dict(
    dictionary: Dict[str, Any],
    prefix: Optional[str] = None,
    log_type: Optional[str] = None,
    exclude: Optional[List[str]] = None,
    """The function of MLflow. Logs any value by its type from dictionary recursively.

        dictionary: Values to log as dictionary.
        prefix: Prefix for parameter name (if the parameter is composite).
        log_type: The entity of logging (param, metric, artifact, image, etc.).
        exclude: Keys in the dictionary to exclude from logging.

        ValueError: If meets unknown type or log_type for logging in MLflow
            (add new case if needed).
    for name, value in dictionary.items():
        if exclude is not None and name in exclude:

        name = re.sub(r"\W", "", name)
        name = f"{prefix}/{name}" if prefix else name

        if log_type == "dict":
            mlflow.log_dict(dictionary, name)
        elif isinstance(value, dict):
            _mlflow_log_params_dict(value, name, log_type, exclude)
        elif log_type == "param":
                mlflow.log_param(name, value)
            except mlflow.exceptions.MlflowException:
            raise ValueError(
                f"Unknown type of logging value: type({value})={type(value)}"

[docs]class MLflowLogger(ILogger): """Mlflow logger for parameters, metrics, images and other artifacts. Mlflow documentation: Args: experiment: Name of the experiment in MLflow to log to. run: Name of the run in Mlflow to log to. tracking_uri: URI of tracking server against which to log run information related. registry_uri: Address of local or remote model registry server. exclude: Name of to exclude from logging. log_batch_metrics: boolean flag to log batch metrics (default: SETTINGS.log_batch_metrics or False). log_epoch_metrics: boolean flag to log epoch metrics (default: SETTINGS.log_epoch_metrics or True). Python API examples: .. code-block:: python from catalyst import dl runner = dl.SupervisedRunner() runner.train( ..., loggers={"mlflow": dl.MLflowLogger(experiment="test_exp", run="test_run")} ) .. code-block:: python from catalyst import dl class CustomRunner(dl.IRunner): # ... def get_loggers(self): return { "console": dl.ConsoleLogger(), "mlflow": dl.MLflowLogger(experiment="test_exp", run="test_run") } # ... runner = CustomRunner().run() """ def __init__( self, experiment: str, run: Optional[str] = None, tracking_uri: Optional[str] = None, registry_uri: Optional[str] = None, exclude: Optional[List[str]] = None, log_batch_metrics: bool = SETTINGS.log_batch_metrics, log_epoch_metrics: bool = SETTINGS.log_epoch_metrics, ) -> None: super().__init__( log_batch_metrics=log_batch_metrics, log_epoch_metrics=log_epoch_metrics ) self.experiment = experiment = run self.tracking_uri = tracking_uri self.registry_uri = registry_uri self.exclude = exclude mlflow.set_tracking_uri(self.tracking_uri) mlflow.set_registry_uri(self.registry_uri) mlflow.set_experiment(self.experiment) _get_or_start_run( @property def logger(self): """Internal logger/experiment/etc. from the monitoring system.""" return mlflow @staticmethod def _log_metrics(metrics: Dict[str, float], step: int, loader_key: str, suffix=""): for key, value in metrics.items(): mlflow.log_metric(f"{key}/{loader_key}{suffix}", value, step=step) def log_artifact( self, tag: str, runner: "IRunner", artifact: object = None, path_to_artifact: str = None, scope: str = None, ) -> None: """Logs a local file or directory as an artifact to the logger.""" mlflow.log_artifact(path_to_artifact) def log_image( self, tag: str, image: np.ndarray, runner: "IRunner", scope: str = None, ) -> None: """Logs image to MLflow for current scope on current step.""" if scope == "batch" or scope == "loader": log_path = "_".join( [tag, f"epoch-{runner.epoch_step:04d}", f"loader-{runner.loader_key}"] ) elif scope == "epoch": log_path = "_".join([tag, f"epoch-{runner.epoch_step:04d}"]) elif scope == "experiment" or scope is None: log_path = tag mlflow.log_image(image, f"{log_path}.png") def log_hparams(self, hparams: Dict, runner: "IRunner" = None) -> None: """Logs parameters for current scope. Args: hparams: Parameters to log. runner: experiment runner """ _mlflow_log_params_dict(hparams, log_type="param", exclude=self.exclude) def log_metrics( self, metrics: Dict[str, float], scope: str, runner: "IRunner", ) -> None: """Logs batch and epoch metrics to MLflow.""" if scope == "batch" and self.log_batch_metrics: metrics = {k: float(v) for k, v in metrics.items()} self._log_metrics( metrics=metrics, step=runner.batch_step, loader_key=runner.loader_key, suffix="/batch", ) elif scope == "epoch" and self.log_epoch_metrics: for loader_key, per_loader_metrics in metrics.items(): self._log_metrics( metrics=per_loader_metrics, step=runner.epoch_step, loader_key=loader_key, suffix="/epoch", ) def close_log(self) -> None: """End an active MLflow run.""" mlflow.end_run()
__all__ = ["MLflowLogger"]