Source code for pythia.modules.metrics

# Copyright (c) Facebook, Inc. and its affiliates.
"""
The metrics module contains implementations of various metrics commonly used to
measure how well our models are performing, e.g. accuracy, vqa_accuracy,
r@1, etc.

For implementing your own metric, you need to follow these steps:

1. Create your own metric class and inherit ``BaseMetric`` class.
2. In the ``__init__`` function of your class, make sure to call
   ``super().__init__('name')`` where 'name' is the name of your metric. If
   you require any parameters in your ``__init__`` function, you can use
   keyword arguments to represent them, and the metric constructor will take
   care of providing them to your class from the config.
3. Implement a ``calculate`` function which takes a ``SampleList`` and a
   ``model_output`` dict as input and returns a float tensor/number.
4. Register your metric with the key 'name' using the decorator
   ``@registry.register_metric('name')``.

Example::

    import torch

    from pythia.common.registry import registry
    from pythia.modules.metrics import BaseMetric

    @registry.register_metric("some")
    class SomeMetric(BaseMetric):
        def __init__(self, some_param=None):
            super().__init__("some")
            ...

        def calculate(self, sample_list, model_output):
            metric = torch.tensor(2, dtype=torch.float)
            return metric

Example config for above metric::

    model_attributes:
        pythia:
            metrics:
            - type: some
              params:
                some_param: a
"""

import collections

import torch

from pythia.common.registry import registry


class Metrics:
    """Internally used by Pythia, Metrics acts as a wrapper for handling the
    calculation of all of the metrics specified by the model in the config.
    It initializes all of the metrics and, when called, runs ``calculate`` on
    each of them one by one and returns a dict with properly namespaced keys.
    For example, a dict returned by the Metrics class:
    ``{'val/vqa_accuracy': 0.3, 'val/r@1': 0.8}``

    Args:
        metric_list (List[ConfigNode]): List of ConfigNodes where each ConfigNode
                                        specifies name and parameters of the
                                        metrics used.
    """

    def __init__(self, metric_list):
        if not isinstance(metric_list, list):
            metric_list = [metric_list]

        self.writer = registry.get("writer")
        self.metrics = self._init_metrics(metric_list)

    def _init_metrics(self, metric_list):
        metrics = {}
        for metric in metric_list:
            params = {}
            if isinstance(metric, collections.abc.Mapping):
                if not hasattr(metric, "type"):
                    raise ValueError(
                        "Metric {} needs to have 'type' attribute".format(metric)
                    )
                # Read the params before overwriting `metric` with its type string
                params = getattr(metric, "params", {})
                metric = metric.type
            else:
                if not isinstance(metric, str):
                    raise TypeError(
                        "Metric {} has inappropriate type. "
                        "'dict' or 'str' allowed".format(metric)
                    )

            metric_cls = registry.get_metric_class(metric)
            if metric_cls is None:
                raise ValueError(
                    "No metric named {} registered to registry".format(metric)
                )
            metrics[metric] = metric_cls(**params)

        return metrics

    def __call__(self, sample_list, model_output, *args, **kwargs):
        values = {}
        if not hasattr(sample_list, "targets"):
            return values

        dataset_type = sample_list.dataset_type

        with torch.no_grad():
            for metric_name, metric_object in self.metrics.items():
                key = "{}/{}".format(dataset_type, metric_name)
                values[key] = metric_object._calculate_with_checks(
                    sample_list, model_output, *args, **kwargs
                )

                if not isinstance(values[key], torch.Tensor):
                    values[key] = torch.tensor(values[key], dtype=torch.float)
                else:
                    values[key] = values[key].float()

                if values[key].dim() == 0:
                    values[key] = values[key].view(1)

        registry.register(
            "{}.{}.{}".format("metrics", sample_list.dataset_name, dataset_type),
            values,
        )

        return values


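# A minimal usage sketch (not part of the original module): it assumes the
# "accuracy" metric defined further below has been registered; during actual
# training the trainer calls the wrapper with a real SampleList and model output.
def _example_metrics_wrapper():
    # Build the wrapper the same way a model config would, from a list of metric
    # names (dicts with "type"/"params" keys are also accepted).
    metrics = Metrics(["accuracy"])
    # Each name has been resolved to a metric instance through the registry.
    assert isinstance(metrics.metrics["accuracy"], BaseMetric)
    # Calling ``metrics(sample_list, model_output)`` would then return a dict
    # such as {"val/accuracy": tensor([0.5])}.

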
class BaseMetric:
    """Base class to be inherited by all metrics registered to Pythia. See
    the description on top of the file for more information. Child class must
    implement the ``calculate`` function.

    Args:
        name (str): Name of the metric.

    """

    def __init__(self, name, *args, **kwargs):
        self.name = name

    def calculate(self, sample_list, model_output, *args, **kwargs):
        """Abstract method to be implemented by the child class. Takes in
        a ``SampleList`` and a dict returned by the model as output and
        returns a float tensor/number indicating the value for this metric.

        Args:
            sample_list (SampleList): SampleList provided by the dataloader for
                                      the current iteration.
            model_output (Dict): Output dict from the model for the current
                                 SampleList.

        Returns:
            torch.Tensor|float: Value of the metric.

        """
        # Override in your child class
        raise NotImplementedError(
            "'calculate' must be implemented in the child class"
        )

    def __call__(self, *args, **kwargs):
        return self.calculate(*args, **kwargs)

    def _calculate_with_checks(self, *args, **kwargs):
        value = self.calculate(*args, **kwargs)
        return value


[docs]@registry.register_metric("accuracy") class Accuracy(BaseMetric): """Metric for calculating accuracy. **Key:** ``accuracy`` """ def __init__(self): super().__init__("accuracy")
[docs] def calculate(self, sample_list, model_output, *args, **kwargs): """Calculate accuracy and return it back. Args: sample_list (SampleList): SampleList provided by DataLoader for current iteration model_output (Dict): Dict returned by model. Returns: torch.FloatTensor: accuracy. """ output = model_output["scores"] expected = sample_list["targets"] assert output.dim() <= 2, ("Output from model shouldn't have " " more than dim 2 for accuracy") assert expected.dim() <= 2, ("Expected target shouldn't have " "more than dim 2 for accuracy") if output.dim() == 2: output = torch.max(output, 1)[1] # If more than 1 if expected.dim() == 2: expected = torch.max(expected, 1)[1] correct = (expected == output.squeeze()).sum().float() total = len(expected) value = correct / total return value
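# A minimal sketch of the computation above (not part of the original module):
# plain dicts stand in for the SampleList and model output, since ``calculate``
# only reads the "targets" and "scores" keys here.
def _example_accuracy():
    scores = torch.tensor([[0.1, 0.9], [0.8, 0.2], [0.3, 0.7]])  # model scores
    targets = torch.tensor([1, 0, 0])                            # class indices
    value = Accuracy().calculate({"targets": targets}, {"scores": scores})
    # Two of the three argmax predictions match the targets, so accuracy is 2/3.
    assert torch.isclose(value, torch.tensor(2.0 / 3.0))

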
[docs]@registry.register_metric("caption_bleu4") class CaptionBleu4Metric(BaseMetric): """Metric for calculating caption accuracy using BLEU4 Score. **Key:** ``caption_bleu4`` """ import nltk.translate.bleu_score as bleu_score def __init__(self): super().__init__("caption_bleu4") self.caption_processor = registry.get("coco_caption_processor")
[docs] def calculate(self, sample_list, model_output, *args, **kwargs): """Calculate accuracy and return it back. Args: sample_list (SampleList): SampleList provided by DataLoader for current iteration model_output (Dict): Dict returned by model. Returns: torch.FloatTensor: bleu4 score. """ # Create reference and hypotheses captions. references = [] hypotheses = [] # References targets = sample_list.answers for j, p in enumerate(targets): img_captions = [ self.caption_processor(c)["tokens"] for c in targets[j].tolist() ] references.append(img_captions) # Hypotheses scores = torch.max(model_output["scores"], dim=-1)[1] scores = scores.tolist() predictions = [] for j, p in enumerate(scores): caption = self.caption_processor(scores[j])["tokens"] predictions.append(caption) hypotheses.extend(predictions) assert len(references) == len(hypotheses) bleu4 = self.bleu_score.corpus_bleu(references, hypotheses) return targets.new_tensor(bleu4, dtype=torch.float)
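# A minimal sketch of the underlying BLEU-4 computation (not part of the
# original module): instantiating CaptionBleu4Metric requires a registered
# "coco_caption_processor", so this calls NLTK's corpus BLEU directly on
# already-tokenized captions instead.
def _example_corpus_bleu4():
    import nltk.translate.bleu_score as bleu_score

    references = [[["a", "dog", "runs", "in", "the", "park"]]]  # reference sets per image
    hypotheses = [["a", "dog", "runs", "in", "the", "park"]]    # one hypothesis per image
    return bleu_score.corpus_bleu(references, hypotheses)       # 1.0 for an exact match

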
[docs]@registry.register_metric("vqa_accuracy") class VQAAccuracy(BaseMetric): """ Calculate VQAAccuracy. Find more information here_ **Key**: ``vqa_accuracy``. .. _here: https://visualqa.org/evaluation.html """ def __init__(self): super().__init__("vqa_accuracy") def _masked_unk_softmax(self, x, dim, mask_idx): x1 = torch.nn.functional.softmax(x, dim=dim) x1[:, mask_idx] = 0 x1_sum = torch.sum(x1, dim=1, keepdim=True) y = x1 / x1_sum return y
[docs] def calculate(self, sample_list, model_output, *args, **kwargs): """Calculate vqa accuracy and return it back. Args: sample_list (SampleList): SampleList provided by DataLoader for current iteration model_output (Dict): Dict returned by model. Returns: torch.FloatTensor: VQA Accuracy """ output = model_output["scores"] expected = sample_list["targets"] output = self._masked_unk_softmax(output, 1, 0) output = output.argmax(dim=1) # argmax one_hots = expected.new_zeros(*expected.size()) one_hots.scatter_(1, output.view(-1, 1), 1) scores = one_hots * expected accuracy = torch.sum(scores) / expected.size(0) return accuracy
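# A minimal sketch of the soft VQA accuracy above (not part of the original
# module): index 0 plays the role of <unk> and is masked out before the argmax,
# and plain dicts stand in for the SampleList and model output.
def _example_vqa_accuracy():
    scores = torch.tensor([[2.0, 0.5, 0.1], [0.2, 0.1, 3.0]])   # scores over answers
    targets = torch.tensor([[0.0, 0.9, 0.3], [0.0, 0.0, 1.0]])  # soft answer targets
    value = VQAAccuracy().calculate({"targets": targets}, {"scores": scores})
    # Row 1 picks answer 1 (target 0.9), row 2 picks answer 2 (target 1.0),
    # so the accuracy is (0.9 + 1.0) / 2 = 0.95.
    assert torch.isclose(value, torch.tensor(0.95))

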
class RecallAtK(BaseMetric):
    def __init__(self, name="recall@k"):
        super().__init__(name)

    def score_to_ranks(self, scores):
        # sort in descending order - largest score gets highest rank
        sorted_ranks, ranked_idx = scores.sort(1, descending=True)

        # convert from ranked_idx to ranks
        ranks = ranked_idx.clone().fill_(0)
        for i in range(ranked_idx.size(0)):
            for j in range(ranked_idx.size(1)):
                ranks[i][ranked_idx[i][j]] = j
        ranks += 1
        return ranks

    def get_gt_ranks(self, ranks, ans_ind):
        _, ans_ind = ans_ind.max(dim=1)
        ans_ind = ans_ind.view(-1)
        gt_ranks = torch.LongTensor(ans_ind.size(0))

        for i in range(ans_ind.size(0)):
            gt_ranks[i] = int(ranks[i, ans_ind[i].long()])
        return gt_ranks

    def get_ranks(self, sample_list, model_output, *args, **kwargs):
        output = model_output["scores"]
        expected = sample_list["targets"]

        ranks = self.score_to_ranks(output)
        gt_ranks = self.get_gt_ranks(ranks, expected)

        ranks = self.process_ranks(gt_ranks)
        return ranks.float()

    def calculate(self, sample_list, model_output, k, *args, **kwargs):
        ranks = self.get_ranks(sample_list, model_output)
        recall = float(torch.sum(torch.le(ranks, k))) / ranks.size(0)
        return recall


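# A minimal, self-contained sketch of the rank-based recall idea used above (not
# part of the original module, and it does not rely on ``process_ranks``, which
# is not shown in this listing): the rank of the ground-truth candidate is one
# plus the number of candidates scored strictly higher than it.
def _example_recall_at_k(k=1):
    scores = torch.tensor([[0.1, 0.9, 0.0], [0.7, 0.2, 0.1]])   # candidate scores
    targets = torch.tensor([[0.0, 1.0, 0.0], [0.0, 0.0, 1.0]])  # one-hot ground truth
    gt_scores = (scores * targets).sum(dim=1, keepdim=True)
    gt_ranks = 1 + (scores > gt_scores).sum(dim=1)
    return (gt_ranks <= k).float().mean()  # 0.5 for k=1, 1.0 for k=3

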
[docs]@registry.register_metric("r@1") class RecallAt1(RecallAtK): """ Calculate Recall@1 which specifies how many time the chosen candidate was rank 1. **Key**: ``r@1``. """ def __init__(self): super().__init__("r@1")
[docs] def calculate(self, sample_list, model_output, *args, **kwargs): """Calculate Recall@1 and return it back. Args: sample_list (SampleList): SampleList provided by DataLoader for current iteration model_output (Dict): Dict returned by model. Returns: torch.FloatTensor: Recall@1 """ return self.calculate(sample_list, model_output, k=1)
[docs]@registry.register_metric("r@5") class RecallAt5(RecallAtK): """ Calculate Recall@5 which specifies how many time the chosen candidate was among first 5 rank. **Key**: ``r@5``. """ def __init__(self): super().__init__("r@5")
[docs] def calculate(self, sample_list, model_output, *args, **kwargs): """Calculate Recall@5 and return it back. Args: sample_list (SampleList): SampleList provided by DataLoader for current iteration model_output (Dict): Dict returned by model. Returns: torch.FloatTensor: Recall@5 """ return self.calculate(sample_list, model_output, k=5)
[docs]@registry.register_metric("r@10") class RecallAt10(RecallAtK): """ Calculate Recall@10 which specifies how many time the chosen candidate was among first 10 ranks. **Key**: ``r@10``. """ def __init__(self): super().__init__("r@10")
[docs] def calculate(self, sample_list, model_output, *args, **kwargs): """Calculate Recall@10 and return it back. Args: sample_list (SampleList): SampleList provided by DataLoader for current iteration model_output (Dict): Dict returned by model. Returns: torch.FloatTensor: Recall@10 """ return self.calculate(sample_list, model_output, k=10)
[docs]@registry.register_metric("mean_r") class MeanRank(RecallAtK): """ Calculate MeanRank which specifies what was the average rank of the chosen candidate. **Key**: ``mean_r``. """ def __init__(self): super().__init__("mean_r")
[docs] def calculate(self, sample_list, model_output, *args, **kwargs): """Calculate Mean Rank and return it back. Args: sample_list (SampleList): SampleList provided by DataLoader for current iteration model_output (Dict): Dict returned by model. Returns: torch.FloatTensor: mean rank """ ranks = self.get_ranks(sample_list, model_output) return torch.mean(ranks)
[docs]@registry.register_metric("mean_rr") class MeanReciprocalRank(RecallAtK): """ Calculate reciprocal of mean rank.. **Key**: ``mean_rr``. """ def __init__(self): super().__init__("mean_rr")
[docs] def calculate(self, sample_list, model_output, *args, **kwargs): """Calculate Mean Reciprocal Rank and return it back. Args: sample_list (SampleList): SampleList provided by DataLoader for current iteration model_output (Dict): Dict returned by model. Returns: torch.FloatTensor: Mean Reciprocal Rank """ ranks = self.get_ranks(sample_list, model_output) return torch.mean(ranks.reciprocal())