Shortcuts

Source code for catalyst.loggers.mlflow

from typing import Any, Dict, Optional

import numpy as np

from catalyst.core.logger import ILogger
from catalyst.settings import SETTINGS
from catalyst.typing import Directory, File, Number

if SETTINGS.mlflow_required:
    import mlflow

EXPERIMENT_PARAMS = (
    "shared",
    "args",
    "runner",
    "engine",
    "model",
    "stages",
)
STAGE_PARAMS = ("data", "criterion", "optimizer", "scheduler", "stage")
EXCLUDE_PARAMS = ("loggers", "transform", "callbacks")


def _get_or_start_run(run_name):
    """The function of MLflow. Gets the active run and gives it a name.
    If active run does not exist, starts a new one.
    """
    active_run = mlflow.active_run()
    if active_run:
        mlflow.set_tag("mlflow.runName", run_name)
        return active_run
    return mlflow.start_run(run_name=run_name)


def _mlflow_log_dict(dictionary: Dict[str, Any], prefix: str = "", log_type: Optional[str] = None):
    """The function of MLflow. Logs any value by its type from dictionary recursively.

    Args:
        dictionary: Values to log as dictionary.
        prefix: Prefix for parameter name (if the parameter is composite).
        log_type: The entity of logging (param, metric, artifact, image, etc.).

    Raises:
        ValueError: If meets unknown type or log_type for logging in MLflow
            (add new case if needed).
    """
    for name, value in dictionary.items():
        if name in EXCLUDE_PARAMS:
            continue

        name = name.replace("*", "")
        if prefix not in STAGE_PARAMS and prefix:
            name = f"{prefix}/{name}"

        if log_type == "dict":
            mlflow.log_dict(dictionary, name)
        elif isinstance(value, dict):
            _mlflow_log_dict(value, name, log_type)
        elif log_type == "param":
            try:
                mlflow.log_param(name, value)
            except mlflow.exceptions.MlflowException:
                continue
        elif isinstance(value, (Directory, File)) or log_type == "artifact":
            mlflow.log_artifact(value)
        elif isinstance(value, Number):
            mlflow.log_metric(name, value)
        else:
            raise ValueError(f"Unknown type of logging value: {type(value)}")


[docs]class MLflowLogger(ILogger): """Mlflow logger for parameters, metrics, images and other artifacts. Mlflow documentation: https://mlflow.org/docs/latest/index.html. Args: experiment: Name of the experiment in MLflow to log to. run: Name of the run in Mlflow to log to. tracking_uri: URI of tracking server against which to log run information related. registry_uri: Address of local or remote model registry server. Python API examples: .. code-block:: python from catalyst import dl runner = dl.SupervisedRunner() runner.train( ..., loggers={"mlflow": dl.MLflowLogger(experiment="test_exp", run="test_run")} ) .. code-block:: python from catalyst import dl class CustomRunner(dl.IRunner): # ... def get_loggers(self): return { "console": dl.ConsoleLogger(), "mlflow": dl.MLflowLogger(experiment="test_exp", run="test_run") } # ... runner = CustomRunner().run() Config API example: .. code-block:: yaml loggers: mlflow: _target_: MLflowLogger experiment: test_exp run: test_run ... Hydra API example: .. code-block:: yaml loggers: mlflow: _target_: catalyst.dl.MLflowLogger experiment: test_exp run: test_run ... """ def __init__( self, experiment: str, run: Optional[str] = None, tracking_uri: Optional[str] = None, registry_uri: Optional[str] = None, ) -> None: self.experiment = experiment self.run = run self.tracking_uri = tracking_uri self.registry_uri = registry_uri self._multistage = False mlflow.set_tracking_uri(self.tracking_uri) mlflow.set_registry_uri(self.registry_uri) mlflow.set_experiment(self.experiment) _get_or_start_run(run_name=self.run) @staticmethod def _log_metrics(metrics: Dict[str, float], step: int, loader_key: str, suffix=""): for key, value in metrics.items(): mlflow.log_metric(f"{key}/{loader_key}{suffix}", value, step=step) def log_metrics( self, metrics: Dict[str, Any], scope: str = None, # experiment info run_key: str = None, global_epoch_step: int = 0, global_batch_step: int = 0, global_sample_step: int = 0, # stage info stage_key: str = None, stage_epoch_len: int = 0, stage_epoch_step: int = 0, stage_batch_step: int = 0, stage_sample_step: int = 0, # loader info loader_key: str = None, loader_batch_len: int = 0, loader_sample_len: int = 0, loader_batch_step: int = 0, loader_sample_step: int = 0, ) -> None: """Logs batch and epoch metrics to MLflow.""" if scope == "batch": metrics = {k: float(v) for k, v in metrics.items()} self._log_metrics( metrics=metrics, step=global_batch_step, loader_key=loader_key, suffix="/batch" ) elif scope == "epoch": for loader_key, per_loader_metrics in metrics.items(): self._log_metrics( metrics=per_loader_metrics, step=global_epoch_step, loader_key=loader_key, suffix="/epoch", ) def log_image( self, tag: str, image: np.ndarray, scope: str = None, # experiment info run_key: str = None, global_epoch_step: int = 0, global_batch_step: int = 0, global_sample_step: int = 0, # stage info stage_key: str = None, stage_epoch_len: int = 0, stage_epoch_step: int = 0, stage_batch_step: int = 0, stage_sample_step: int = 0, # loader info loader_key: str = None, loader_batch_len: int = 0, loader_sample_len: int = 0, loader_batch_step: int = 0, loader_sample_step: int = 0, ) -> None: """Logs image to MLflow for current scope on current step.""" mlflow.log_image(image, f"{tag}_scope_{scope}_epoch_{global_epoch_step}.png") def log_hparams( self, hparams: Dict, scope: str = None, # experiment info run_key: str = None, stage_key: str = None, ) -> None: """Logs parameters for current scope. If there in experiment more than one stage, creates nested runs. If the scope is "experiment", it does nothing, since overwriting parameters in MLflow is prohibited. Thus, first, the parameters of the stage are recorded, and only then the experiment. Args: hparams: Parameters to log. scope: On which scope log parameters. run_key: Experiment info. stage_key: Stage info. """ stages = set(hparams.get("stages", {})) - set(STAGE_PARAMS) - set(EXCLUDE_PARAMS) self._multistage = len(stages) > 1 if scope == "experiment": if self._multistage: mlflow.set_tag("mlflow.runName", run_key) if scope == "stage": if self._multistage: mlflow.start_run(run_name=stage_key, nested=True) scope_params = hparams.get("stages", {}).get(run_key, {}) _mlflow_log_dict(scope_params, log_type="param") for key in STAGE_PARAMS: stage_params = hparams.get("stages", {}).get(key, {}) _mlflow_log_dict(stage_params, log_type="param") for key in EXPERIMENT_PARAMS: exp_params = hparams.get(key, {}) _mlflow_log_dict(exp_params, log_type="param") def close_log(self) -> None: """Finds all **running** runs and ends them.""" all_runs = mlflow.search_runs() for _ in all_runs[all_runs.status == "RUNNING"]: mlflow.end_run()
__all__ = ["MLflowLogger"]