
Source code for catalyst.loggers.mlflow

from typing import Any, Dict, List, Optional
import re

import numpy as np

from catalyst.core.logger import ILogger
from catalyst.settings import SETTINGS
from catalyst.typing import Directory, File, Union

if SETTINGS.mlflow_required:
    import mlflow


def _get_or_start_run(run_name):
    """The function of MLflow. Gets the active run and gives it a name.
    If active run does not exist, starts a new one.
    """
    active_run = mlflow.active_run()
    if active_run:
        mlflow.set_tag("mlflow.runName", run_name)
        return active_run
    return mlflow.start_run(run_name=run_name)

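
# Minimal usage sketch (not part of the original module), assuming an MLflow
# tracking backend is configured and no run is active yet; the experiment and
# run names below are hypothetical.
def _example_get_or_start_run():
    mlflow.set_experiment("demo_exp")
    first = _get_or_start_run(run_name="demo_run")   # no active run -> starts a new one
    second = _get_or_start_run(run_name="demo_run")  # reuses and renames the active run
    assert first.info.run_id == second.info.run_id
    mlflow.end_run()
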

def _mlflow_log_params_dict(
    dictionary: Dict[str, Any],
    prefix: Optional[str] = None,
    log_type: Optional[str] = None,
    exclude: Optional[List[str]] = None,
):
    """The function of MLflow. Logs any value by its type from dictionary recursively.

    Args:
        dictionary: Values to log as dictionary.
        prefix: Prefix for parameter name (if the parameter is composite).
        log_type: The entity of logging (param, metric, artifact, image, etc.).
        exclude: Keys in the dictionary to exclude from logging.

    Raises:
        ValueError: If an unknown value type or ``log_type`` is encountered
            while logging to MLflow (add a new case if needed).
    """
    for name, value in dictionary.items():
        if exclude is not None and name in exclude:
            continue

        name = re.sub(r"\W", "", name)
        name = f"{prefix}/{name}" if prefix else name

        if log_type == "dict":
            mlflow.log_dict(dictionary, name)
        elif isinstance(value, dict):
            _mlflow_log_params_dict(value, name, log_type, exclude)
        elif log_type == "param":
            try:
                mlflow.log_param(name, value)
            except mlflow.exceptions.MlflowException:
                continue
        else:
            raise ValueError(f"Unknown type of logging value: type({value})={type(value)}")

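
# Illustration only (not part of the original module): a nested parameter
# dictionary is flattened into "prefix/name" keys, non-word characters are
# stripped from each key, and excluded keys are skipped. The dictionary below
# is hypothetical.
def _example_log_params_flattening():
    params = {
        "optimizer": {"lr": 1e-3, "weight decay": 1e-4},
        "num_epochs": 10,
        "seed": 42,
    }
    # With exclude=["seed"], this logs the MLflow params:
    #   optimizer/lr = 0.001
    #   optimizer/weightdecay = 0.0001   (the space is removed by re.sub(r"\W", "", ...))
    #   num_epochs = 10
    _mlflow_log_params_dict(params, log_type="param", exclude=["seed"])
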

class MLflowLogger(ILogger):
    """MLflow logger for parameters, metrics, images and other artifacts.

    MLflow documentation: https://mlflow.org/docs/latest/index.html.

    Args:
        experiment: Name of the experiment in MLflow to log to.
        run: Name of the run in MLflow to log to.
        tracking_uri: URI of the tracking server against which to log run information.
        registry_uri: Address of a local or remote model registry server.
        exclude: Names of parameters to exclude from logging.

    Python API examples:

    .. code-block:: python

        from catalyst import dl

        runner = dl.SupervisedRunner()
        runner.train(
            ...,
            loggers={"mlflow": dl.MLflowLogger(experiment="test_exp", run="test_run")}
        )

    .. code-block:: python

        from catalyst import dl

        class CustomRunner(dl.IRunner):
            # ...

            def get_loggers(self):
                return {
                    "console": dl.ConsoleLogger(),
                    "mlflow": dl.MLflowLogger(experiment="test_exp", run="test_run")
                }

            # ...

        runner = CustomRunner().run()

    Config API example:

    .. code-block:: yaml

        loggers:
            mlflow:
                _target_: MLflowLogger
                experiment: test_exp
                run: test_run
        ...

    Hydra API example:

    .. code-block:: yaml

        loggers:
            mlflow:
                _target_: catalyst.dl.MLflowLogger
                experiment: test_exp
                run: test_run
        ...
    """

    def __init__(
        self,
        experiment: str,
        run: Optional[str] = None,
        tracking_uri: Optional[str] = None,
        registry_uri: Optional[str] = None,
        exclude: Optional[List[str]] = None,
    ) -> None:
        self.experiment = experiment
        self.run = run
        self.tracking_uri = tracking_uri
        self.registry_uri = registry_uri
        self.exclude = exclude
        self._multistage = False

        mlflow.set_tracking_uri(self.tracking_uri)
        mlflow.set_registry_uri(self.registry_uri)
        mlflow.set_experiment(self.experiment)
        _get_or_start_run(run_name=self.run)

    @staticmethod
    def _log_metrics(metrics: Dict[str, float], step: int, loader_key: str, suffix=""):
        for key, value in metrics.items():
            mlflow.log_metric(f"{key}/{loader_key}{suffix}", value, step=step)

    def log_metrics(
        self,
        metrics: Dict[str, Any],
        scope: str = None,
        # experiment info
        run_key: str = None,
        global_epoch_step: int = 0,
        global_batch_step: int = 0,
        global_sample_step: int = 0,
        # stage info
        stage_key: str = None,
        stage_epoch_len: int = 0,
        stage_epoch_step: int = 0,
        stage_batch_step: int = 0,
        stage_sample_step: int = 0,
        # loader info
        loader_key: str = None,
        loader_batch_len: int = 0,
        loader_sample_len: int = 0,
        loader_batch_step: int = 0,
        loader_sample_step: int = 0,
    ) -> None:
        """Logs batch and epoch metrics to MLflow."""
        if scope == "batch":
            metrics = {k: float(v) for k, v in metrics.items()}
            self._log_metrics(
                metrics=metrics, step=global_batch_step, loader_key=loader_key, suffix="/batch"
            )
        elif scope == "epoch":
            for loader_key, per_loader_metrics in metrics.items():
                self._log_metrics(
                    metrics=per_loader_metrics,
                    step=global_epoch_step,
                    loader_key=loader_key,
                    suffix="/epoch",
                )

    def log_image(
        self,
        tag: str,
        image: np.ndarray,
        scope: str = None,
        # experiment info
        run_key: str = None,
        global_epoch_step: int = 0,
        global_batch_step: int = 0,
        global_sample_step: int = 0,
        # stage info
        stage_key: str = None,
        stage_epoch_len: int = 0,
        stage_epoch_step: int = 0,
        stage_batch_step: int = 0,
        stage_sample_step: int = 0,
        # loader info
        loader_key: str = None,
        loader_batch_len: int = 0,
        loader_sample_len: int = 0,
        loader_batch_step: int = 0,
        loader_sample_step: int = 0,
    ) -> None:
        """Logs image to MLflow for current scope on current step."""
        mlflow.log_image(image, f"{tag}_scope_{scope}_epoch_{global_epoch_step}.png")

    def log_hparams(
        self,
        hparams: Dict,
        scope: str = None,
        # experiment info
        run_key: str = None,
        stage_key: str = None,
    ) -> None:
        """Logs parameters for the current scope.

        If the experiment has more than one stage, creates nested runs.

        Note:
            If the scope is "experiment", it does nothing, since overwriting
            parameters in MLflow is prohibited. Thus, the stage parameters are
            recorded first, and only then the experiment parameters.

        Args:
            hparams: Parameters to log.
            scope: Scope on which to log the parameters.
            run_key: Experiment info.
            stage_key: Stage info.
        """
        stages = hparams.get("stages", {})
        self._multistage = len(stages) > 1

        if scope == "experiment":
            if not self.run:
                mlflow.set_tag("mlflow.runName", run_key)

        if scope == "stage":
            if self._multistage:
                mlflow.start_run(run_name=stage_key, nested=True)

            stage_params = hparams.get("stages", {}).get(stage_key, {})
            _mlflow_log_params_dict(stage_params, log_type="param", exclude=self.exclude)

            experiment_params = {key: value for key, value in hparams.items() if key != "stages"}
            _mlflow_log_params_dict(experiment_params, log_type="param", exclude=self.exclude)

    def log_artifact(
        self,
        tag: str,
        artifact: Union[Directory, File] = None,
        path_to_artifact: str = None,
        scope: str = None,
        # experiment info
        run_key: str = None,
        global_epoch_step: int = 0,
        global_batch_step: int = 0,
        global_sample_step: int = 0,
        # stage info
        stage_key: str = None,
        stage_epoch_len: int = 0,
        stage_epoch_step: int = 0,
        stage_batch_step: int = 0,
        stage_sample_step: int = 0,
        # loader info
        loader_key: str = None,
        loader_batch_len: int = 0,
        loader_sample_len: int = 0,
        loader_batch_step: int = 0,
        loader_sample_step: int = 0,
    ) -> None:
        """Logs a local file or directory as an artifact to MLflow."""
        mlflow.log_artifact(path_to_artifact)

    def close_log(self, scope: str = None) -> None:
        """Ends an active MLflow run."""
        if scope == "stage" and self._multistage:
            mlflow.end_run()
        if scope == "experiment":
            mlflow.end_run()


__all__ = ["MLflowLogger"]
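

# Standalone usage sketch (not part of the original module): shows how the
# logger names metrics per loader and scope. The experiment/run names and
# metric values are hypothetical; an MLflow tracking backend must be reachable.
def _example_mlflow_logger():
    logger = MLflowLogger(experiment="demo_exp", run="demo_run")

    # "batch" scope logs keys like "loss/train/batch" at ``global_batch_step``.
    logger.log_metrics(
        metrics={"loss": 0.42},
        scope="batch",
        loader_key="train",
        global_batch_step=10,
    )

    # "epoch" scope expects per-loader dicts and logs keys like "loss/valid/epoch".
    logger.log_metrics(
        metrics={"valid": {"loss": 0.37}},
        scope="epoch",
        global_epoch_step=1,
    )

    logger.close_log(scope="experiment")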