Shortcuts

Source code for catalyst.metrics._mrr

from typing import Any, Dict, List

import torch

from catalyst.metrics._additive import AdditiveMetric
from catalyst.metrics._metric import ICallbackBatchMetric
from catalyst.metrics.functional._mrr import mrr


[docs]class MRRMetric(ICallbackBatchMetric): """ Calculates the Mean Reciprocal Rank (MRR) score given model outputs and targets The precision metric summarizes the fraction of relevant items Computes mean value of map and it's approximate std value Args: topk_args: list of `topk` for mrr@topk computing compute_on_call: if True, computes and returns metric value during metric call prefix: metric prefix suffix: metric suffix Examples: .. code-block:: python import torch from catalyst import metrics outputs = torch.Tensor([ [4.0, 2.0, 3.0, 1.0], [1.0, 2.0, 3.0, 4.0], ]) targets = torch.tensor([ [0, 0, 1.0, 1.0], [0, 0, 1.0, 1.0], ]) metric = metrics.MRRMetric(topk_args=[1, 3]) metric.reset() metric.update(outputs, targets) metric.compute() # ((0.5, 0.75), (0.0, 0.0)) # mean, std for @01, @03 metric.compute_key_value() # { # 'mrr01': 0.5, # 'mrr03': 0.75, # 'mrr': 0.5, # 'mrr01/std': 0.0, # 'mrr03/std': 0.0, # 'mrr/std': 0.0 # } metric.reset() metric(outputs, targets) # ((0.5, 0.75), (0.0, 0.0)) # mean, std for @01, @03 .. code-block:: python import torch from torch.utils.data import DataLoader, TensorDataset from catalyst import dl # sample data num_users, num_features, num_items = int(1e4), int(1e1), 10 X = torch.rand(num_users, num_features) y = (torch.rand(num_users, num_items) > 0.5).to(torch.float32) # pytorch loaders dataset = TensorDataset(X, y) loader = DataLoader(dataset, batch_size=32, num_workers=1) loaders = {"train": loader, "valid": loader} # model, criterion, optimizer, scheduler model = torch.nn.Linear(num_features, num_items) criterion = torch.nn.BCEWithLogitsLoss() optimizer = torch.optim.Adam(model.parameters()) scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [2]) # model training runner = dl.SupervisedRunner( input_key="features", output_key="logits", target_key="targets", loss_key="loss" ) runner.train( model=model, criterion=criterion, optimizer=optimizer, scheduler=scheduler, loaders=loaders, num_epochs=3, verbose=True, callbacks=[ dl.BatchTransformCallback( transform=torch.sigmoid, scope="on_batch_end", input_key="logits", output_key="scores" ), dl.CriterionCallback( input_key="logits", target_key="targets", metric_key="loss" ), dl.AUCCallback(input_key="scores", target_key="targets"), dl.HitrateCallback( input_key="scores", target_key="targets", topk_args=(1, 3, 5) ), dl.MRRCallback(input_key="scores", target_key="targets", topk_args=(1, 3, 5)), dl.MAPCallback(input_key="scores", target_key="targets", topk_args=(1, 3, 5)), dl.NDCGCallback(input_key="scores", target_key="targets", topk_args=(1, 3, 5)), dl.OptimizerCallback(metric_key="loss"), dl.SchedulerCallback(), dl.CheckpointCallback( logdir="./logs", loader_key="valid", metric_key="loss", minimize=True ), ] ) .. note:: Please follow the `minimal examples`_ sections for more use cases. .. _`minimal examples`: https://github.com/catalyst-team/catalyst#minimal-examples """ def __init__( self, topk_args: List[int] = None, compute_on_call: bool = True, prefix: str = None, suffix: str = None, ): """Init MRRMetric""" super().__init__(compute_on_call=compute_on_call, prefix=prefix, suffix=suffix) self.metric_name_mean = f"{self.prefix}mrr{self.suffix}" self.metric_name_std = f"{self.prefix}mrr{self.suffix}/std" self.topk_args: List[int] = topk_args or [1] self.additive_metrics: List[AdditiveMetric] = [ AdditiveMetric() for _ in range(len(self.topk_args)) ] def reset(self) -> None: """Reset all fields""" for metric in self.additive_metrics: metric.reset() def update(self, logits: torch.Tensor, targets: torch.Tensor) -> List[float]: """ Update metric value with map for new data and return intermediate metrics values. Args: logits (torch.Tensor): tensor of logits targets (torch.Tensor): tensor of targets Returns: list of map@k values """ values = mrr(logits, targets, topk=self.topk_args) values = [v.item() for v in values] for value, metric in zip(values, self.additive_metrics): metric.update(value, len(targets)) return values def update_key_value(self, logits: torch.Tensor, targets: torch.Tensor) -> Dict[str, float]: """ Update metric value with mrr for new data and return intermediate metrics values in key-value format. Args: logits (torch.Tensor): tensor of logits targets (torch.Tensor): tensor of targets Returns: dict of mrr@k values """ values = self.update(logits=logits, targets=targets) output = { f"{self.prefix}mrr{key:02d}{self.suffix}": value for key, value in zip(self.topk_args, values) } output[self.metric_name_mean] = output[f"{self.prefix}mrr01{self.suffix}"] return output def compute(self) -> Any: """ Compute mrr for all data Returns: list of mean values, list of std values """ means, stds = zip(*(metric.compute() for metric in self.additive_metrics)) return means, stds def compute_key_value(self) -> Dict[str, float]: """ Compute mrr for all data and return results in key-value format Returns: dict of metrics """ means, stds = self.compute() output_mean = { f"{self.prefix}mrr{key:02d}{self.suffix}": value for key, value in zip(self.topk_args, means) } output_std = { f"{self.prefix}mrr{key:02d}{self.suffix}/std": value for key, value in zip(self.topk_args, stds) } output_mean[self.metric_name_mean] = output_mean[f"{self.prefix}mrr01{self.suffix}"] output_std[self.metric_name_std] = output_std[f"{self.prefix}mrr01{self.suffix}/std"] return {**output_mean, **output_std}
__all__ = ["MRRMetric"]