from typing import Any, Dict, List, Optional
import re
import numpy as np
from mlflow.tracking.fluent import ActiveRun
from catalyst.core.logger import ILogger
from catalyst.settings import SETTINGS
from catalyst.typing import Directory, File, Union
if SETTINGS.mlflow_required:
import mlflow
def _get_or_start_run(run_name: Optional[str]) -> ActiveRun:
"""The function of MLflow. Gets the active run and gives it a name.
If active run does not exist, starts a new one.
Args:
run_name: Name of the run
Returns:
ActiveRun
"""
active_run = mlflow.active_run()
if active_run:
mlflow.set_tag("mlflow.runName", run_name)
return active_run
return mlflow.start_run(run_name=run_name)
def _mlflow_log_params_dict(
dictionary: Dict[str, Any],
prefix: Optional[str] = None,
log_type: Optional[str] = None,
exclude: Optional[List[str]] = None,
):
"""The function of MLflow. Logs any value by its type from dictionary recursively.
Args:
dictionary: Values to log as dictionary.
prefix: Prefix for parameter name (if the parameter is composite).
log_type: The entity of logging (param, metric, artifact, image, etc.).
exclude: Keys in the dictionary to exclude from logging.
Raises:
ValueError: If meets unknown type or log_type for logging in MLflow
(add new case if needed).
"""
for name, value in dictionary.items():
if exclude is not None and name in exclude:
continue
name = re.sub(r"\W", "", name)
name = f"{prefix}/{name}" if prefix else name
if log_type == "dict":
mlflow.log_dict(dictionary, name)
elif isinstance(value, dict):
_mlflow_log_params_dict(value, name, log_type, exclude)
elif log_type == "param":
try:
mlflow.log_param(name, value)
except mlflow.exceptions.MlflowException:
continue
else:
raise ValueError(f"Unknown type of logging value: type({value})={type(value)}")
[docs]class MLflowLogger(ILogger):
"""Mlflow logger for parameters, metrics, images and other artifacts.
Mlflow documentation: https://mlflow.org/docs/latest/index.html.
Args:
experiment: Name of the experiment in MLflow to log to.
run: Name of the run in Mlflow to log to.
tracking_uri: URI of tracking server against which to log run information related.
registry_uri: Address of local or remote model registry server.
exclude: Name of to exclude from logging.
Python API examples:
.. code-block:: python
from catalyst import dl
runner = dl.SupervisedRunner()
runner.train(
...,
loggers={"mlflow": dl.MLflowLogger(experiment="test_exp", run="test_run")}
)
.. code-block:: python
from catalyst import dl
class CustomRunner(dl.IRunner):
# ...
def get_loggers(self):
return {
"console": dl.ConsoleLogger(),
"mlflow": dl.MLflowLogger(experiment="test_exp", run="test_run")
}
# ...
runner = CustomRunner().run()
Config API example:
.. code-block:: yaml
loggers:
mlflow:
_target_: MLflowLogger
experiment: test_exp
run: test_run
...
Hydra API example:
.. code-block:: yaml
loggers:
mlflow:
_target_: catalyst.dl.MLflowLogger
experiment: test_exp
run: test_run
...
"""
def __init__(
self,
experiment: str,
run: Optional[str] = None,
tracking_uri: Optional[str] = None,
registry_uri: Optional[str] = None,
exclude: Optional[List[str]] = None,
) -> None:
self.experiment = experiment
self.run = run
self.tracking_uri = tracking_uri
self.registry_uri = registry_uri
self.exclude = exclude
self._multistage = False
mlflow.set_tracking_uri(self.tracking_uri)
mlflow.set_registry_uri(self.registry_uri)
mlflow.set_experiment(self.experiment)
_get_or_start_run(run_name=self.run)
@staticmethod
def _log_metrics(metrics: Dict[str, float], step: int, loader_key: str, suffix=""):
for key, value in metrics.items():
mlflow.log_metric(f"{key}/{loader_key}{suffix}", value, step=step)
def log_metrics(
self,
metrics: Dict[str, Any],
scope: str = None,
# experiment info
run_key: str = None,
global_epoch_step: int = 0,
global_batch_step: int = 0,
global_sample_step: int = 0,
# stage info
stage_key: str = None,
stage_epoch_len: int = 0,
stage_epoch_step: int = 0,
stage_batch_step: int = 0,
stage_sample_step: int = 0,
# loader info
loader_key: str = None,
loader_batch_len: int = 0,
loader_sample_len: int = 0,
loader_batch_step: int = 0,
loader_sample_step: int = 0,
) -> None:
"""Logs batch and epoch metrics to MLflow."""
if scope == "batch":
metrics = {k: float(v) for k, v in metrics.items()}
self._log_metrics(
metrics=metrics, step=global_batch_step, loader_key=loader_key, suffix="/batch"
)
elif scope == "epoch":
for loader_key, per_loader_metrics in metrics.items():
self._log_metrics(
metrics=per_loader_metrics,
step=global_epoch_step,
loader_key=loader_key,
suffix="/epoch",
)
def log_image(
self,
tag: str,
image: np.ndarray,
scope: str = None,
# experiment info
run_key: str = None,
global_epoch_step: int = 0,
global_batch_step: int = 0,
global_sample_step: int = 0,
# stage info
stage_key: str = None,
stage_epoch_len: int = 0,
stage_epoch_step: int = 0,
stage_batch_step: int = 0,
stage_sample_step: int = 0,
# loader info
loader_key: str = None,
loader_batch_len: int = 0,
loader_sample_len: int = 0,
loader_batch_step: int = 0,
loader_sample_step: int = 0,
) -> None:
"""Logs image to MLflow for current scope on current step."""
mlflow.log_image(image, f"{tag}_scope_{scope}_epoch_{global_epoch_step}.png")
def log_hparams(
self,
hparams: Dict,
scope: str = None,
# experiment info
run_key: str = None,
stage_key: str = None,
) -> None:
"""Logs parameters for current scope.
If there in experiment more than one stage, creates nested runs.
Note:
If the scope is "experiment", it does nothing, since overwriting parameters
in MLflow is prohibited. Thus, first, the parameters of the stage
are recorded, and only then the experiment.
Args:
hparams: Parameters to log.
scope: On which scope log parameters.
run_key: Experiment info.
stage_key: Stage info.
"""
stages = hparams.get("stages", {})
self._multistage = len(stages) > 1
if scope == "experiment":
if not self.run:
mlflow.set_tag("mlflow.runName", run_key)
if scope == "stage":
if self._multistage:
mlflow.start_run(run_name=stage_key, nested=True)
stage_params = hparams.get("stages", {}).get(stage_key, {})
_mlflow_log_params_dict(stage_params, log_type="param", exclude=self.exclude)
experiment_params = {key: value for key, value in hparams.items() if key != "stages"}
_mlflow_log_params_dict(experiment_params, log_type="param", exclude=self.exclude)
def log_artifact(
self,
tag: str,
artifact: Union[Directory, File] = None,
path_to_artifact: str = None,
scope: str = None,
# experiment info
run_key: str = None,
global_epoch_step: int = 0,
global_batch_step: int = 0,
global_sample_step: int = 0,
# stage info
stage_key: str = None,
stage_epoch_len: int = 0,
stage_epoch_step: int = 0,
stage_batch_step: int = 0,
stage_sample_step: int = 0,
# loader info
loader_key: str = None,
loader_batch_len: int = 0,
loader_sample_len: int = 0,
loader_batch_step: int = 0,
loader_sample_step: int = 0,
) -> None:
"""Logs a local file or directory as an artifact to the logger."""
mlflow.log_artifact(path_to_artifact)
def close_log(self, scope: str = None) -> None:
"""End an active MLflow run."""
if scope == "stage" and self._multistage:
mlflow.end_run()
if scope == "experiment":
mlflow.end_run()
__all__ = ["MLflowLogger"]