Source code for catalyst.utils.pandas

from typing import Dict, List, Optional, Tuple, Union  # isort:skip
from collections import defaultdict

import numpy as np
import pandas as pd
from sklearn.model_selection import StratifiedKFold, train_test_split
from sklearn.utils import shuffle
from tqdm.auto import tqdm

from catalyst.utils import args_are_not_none

tqdm.pandas()


def dataframe_to_list(dataframe: pd.DataFrame) -> List[dict]:
    """
    Converts dataframe to a list of rows (without indexes)

    Args:
        dataframe (DataFrame): input dataframe

    Returns:
        (List[dict]): list of rows, one ``{column: value}`` dict per row,
            in the row order of ``dataframe``
    """
    # ``orient="records"`` never keys rows by their index label, so rows that
    # share an index value (e.g. after ``pd.concat`` of several frames) are
    # all kept instead of colliding on the same key.
    return dataframe.to_dict(orient="records")
def folds_to_list(folds: Union[list, str, pd.Series]) -> List[int]:
    """
    Normalizes a folds specification into a sorted list of unique ints.

    Args:
        folds (Union[list, str, pd.Series]): either a list of numbers,
            a single string with numbers separated by commas,
            or a pandas series

    Returns:
        List[int]: sorted list of unique ints

    Examples:
        >>> folds_to_list("1,2,1,3,4,2,4,6")
        [1, 2, 3, 4, 6]
        >>> folds_to_list([1, 2, 3.0, 5])
        [1, 2, 3, 5]

    Raises:
        ValueError: if a value in the string or array cannot be cast to int
    """
    if isinstance(folds, pd.Series):
        raw_values = sorted(folds.unique())
    elif isinstance(folds, str):
        raw_values = folds.split(",")
    else:
        raw_values = folds

    # a set drops the duplicates once every value has been cast to int
    unique_folds = set()
    for value in raw_values:
        unique_folds.add(int(value))
    return sorted(unique_folds)
def split_dataframe_train_test(
    dataframe: pd.DataFrame, **train_test_split_args
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Split dataframe in train and test part.

    Args:
        dataframe: pd.DataFrame to split
        **train_test_split_args: keyword arguments forwarded unchanged to
            ``sklearn.model_selection.train_test_split``:

            test_size : float, int, or None (default is None)
                If float, should be between 0.0 and 1.0 and represent the
                proportion of the dataset to include in the test split.
                If int, represents the absolute number of test samples.
                If None, the value is automatically set
                to the complement of the train size.
                If train size is also None, test size is set to 0.25.

            train_size : float, int, or None (default is None)
                If float, should be between 0.0 and 1.0 and represent the
                proportion of the dataset to include in the train split.
                If int, represents the absolute number of train samples.
                If None, the value is automatically set
                to the complement of the test size.

            random_state : int or RandomState
                Pseudo-random number generator state used for
                random sampling.

            stratify : array-like or None (default is None)
                If not None, data is split in a stratified fashion,
                using this as the class labels.

    Returns:
        train and test DataFrames

    PS. It exists because sklearn `split` is overcomplicated.
    """
    parts = train_test_split(dataframe, **train_test_split_args)
    train_part, test_part = parts
    return train_part, test_part
def split_dataframe_on_folds(
    dataframe: pd.DataFrame, random_state: int = 42, n_folds: int = 5
) -> pd.DataFrame:
    """
    Splits DataFrame into `N` folds.

    Rows are shuffled with ``random_state`` and then cut into ``n_folds``
    consecutive chunks of (almost) equal size; chunk ``i`` gets ``fold == i``.

    Args:
        dataframe: a dataset
        random_state: seed for random shuffle
        n_folds: number of result folds

    Returns:
        pd.DataFrame: new dataframe with `fold` column
            (rows are in shuffled order, the original index is kept)
    """
    shuffled = shuffle(dataframe, random_state=random_state)

    # Split the row *positions* instead of the frame itself:
    # ``np.array_split`` on a DataFrame goes through the deprecated
    # ``DataFrame.swapaxes`` and hands back slices, on which a column
    # assignment triggers ``SettingWithCopyWarning``.
    position_chunks = np.array_split(np.arange(len(shuffled)), n_folds)

    # ``assign`` returns a fresh frame, so neither the input nor the
    # shuffled intermediate is modified.
    fold_frames = [
        shuffled.iloc[positions].assign(fold=fold_id)
        for fold_id, positions in enumerate(position_chunks)
    ]
    return pd.concat(fold_frames)
def split_dataframe_on_stratified_folds(
    dataframe: pd.DataFrame,
    class_column: str,
    random_state: int = 42,
    n_folds: int = 5
) -> pd.DataFrame:
    """
    Splits DataFrame into `N` stratified folds.

    Also see :class:`catalyst.data.sampler.BalanceClassSampler`

    Args:
        dataframe: a dataset
        class_column: which column to use for split
        random_state: seed for random shuffle
        n_folds: number of result folds

    Returns:
        pd.DataFrame: new dataframe with `fold` column
            (the input dataframe is left untouched)
    """
    skf = StratifiedKFold(
        n_splits=n_folds, shuffle=True, random_state=random_state
    )
    fold_column = np.zeros(len(dataframe), dtype=int)
    # every row lands in the test part of exactly one split;
    # the number of that split becomes the row's fold
    splits = skf.split(range(len(dataframe)), dataframe[class_column])
    for fold_id, (_, test_index) in enumerate(splits):
        fold_column[test_index] = fold_id

    # ``assign`` builds a new frame instead of adding the column to the
    # caller's object, matching the documented "new dataframe" contract
    # and the behaviour of the other fold-splitting helpers.
    return dataframe.assign(fold=fold_column)
def split_dataframe_on_column_folds(
    dataframe: pd.DataFrame,
    column: str,
    random_state: int = 42,
    n_folds: int = 5
) -> pd.DataFrame:
    """
    Splits DataFrame into `N` folds by the values of ``column``.

    The unique values of ``column`` are shuffled and cut into ``n_folds``
    groups; all rows sharing a value end up in the same fold.

    Args:
        dataframe: a dataset
        column: which column to use
        random_state: seed for random shuffle
        n_folds: number of result folds

    Returns:
        pd.DataFrame: new dataframe with `fold` column
            (rows are grouped fold by fold, the original index is kept)
    """
    labels = shuffle(
        sorted(dataframe[column].unique()), random_state=random_state
    )

    fold_frames = []
    for fold_id, fold_labels in enumerate(np.array_split(labels, n_folds)):
        in_fold = dataframe[column].isin(fold_labels)
        # ``assign`` creates the column on a new frame; assigning into the
        # boolean-mask selection directly raises SettingWithCopyWarning.
        fold_frames.append(dataframe[in_fold].assign(fold=fold_id))

    return pd.concat(fold_frames)
def map_dataframe(
    dataframe: pd.DataFrame,
    tag_column: str,
    class_column: str,
    tag2class: Dict[str, int],
    verbose: bool = False
) -> pd.DataFrame:
    """
    Translates the tags stored in ``tag_column`` into ints written to
    ``class_column``, using the ``tag2class`` dictionary.
    Each tag is converted with ``str`` before the lookup.

    Args:
        dataframe (pd.DataFrame): input dataframe
        tag_column (str): column with tags
        class_column (str): output column with classes
        tag2class (Dict[str, int]): mapping from tags to class labels
        verbose: flag if true, uses tqdm

    Returns:
        pd.DataFrame: copy of the dataframe with ``class_column`` filled in
    """

    def tag_to_class(tag):
        return tag2class[str(tag)]

    # work on a copy so the caller's frame is not modified
    mapped_frame = dataframe.copy()
    tags = mapped_frame[tag_column]

    if verbose:
        classes = tags.progress_apply(tag_to_class)
    else:
        classes = tags.apply(tag_to_class)

    mapped_frame.loc[classes.index, class_column] = classes
    return mapped_frame
def separate_tags(
    dataframe: pd.DataFrame,
    tag_column: str = "tag",
    tag_delim: str = ","
) -> pd.DataFrame:
    """
    Separates values in ``tag_column`` column: a row whose tag holds
    several values joined by ``tag_delim`` is repeated once per value.

    Args:
        dataframe: a dataset
        tag_column: column name to separate values
        tag_delim: delimiter to separate values

    Returns:
        pd.DataFrame: new dataframe with a fresh ``0..N-1`` index;
            for an empty input the original columns are preserved
    """
    rows = []
    for _, row in dataframe.iterrows():
        for class_name in row[tag_column].split(tag_delim):
            # copy of the row with only the tag replaced by a single value
            rows.append({**row, tag_column: class_name})

    if not rows:
        # building a frame from an empty list would drop every column
        return pd.DataFrame(columns=dataframe.columns)
    return pd.DataFrame(rows)
def get_dataset_labeling(
    dataframe: pd.DataFrame, tag_column: str
) -> Dict[str, int]:
    """
    Builds a mapping from the unique values of ``tag_column``
    (sorted, converted to ``str``) to consecutive ints starting at 0.

    .. code-block:: javascript

        {
            "class_name_0": 0,
            "class_name_1": 1,
            ...
            "class_name_N": N
        }

    Args:
        dataframe: a dataset
        tag_column: which column to use

    Returns:
        Dict[str, int]: mapping from tag to labels
    """
    unique_tags = sorted(dataframe[tag_column].unique())

    tag_to_labels = {}
    for label, class_name in enumerate(unique_tags):
        tag_to_labels[str(class_name)] = label
    return tag_to_labels
def split_dataframe(
    dataframe: pd.DataFrame,
    train_folds: List[int],
    valid_folds: Optional[List[int]] = None,
    infer_folds: Optional[List[int]] = None,
    tag2class: Optional[Dict[str, int]] = None,
    tag_column: str = None,
    class_column: str = None,
    seed: int = 42,
    n_folds: int = 5
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Split a Pandas DataFrame into folds.

    When ``tag2class``, ``tag_column`` and ``class_column`` are all given,
    tags are first mapped to classes. The folds are stratified by
    ``class_column`` when it is given and plain shuffled folds otherwise.

    Args:
        dataframe (pd.DataFrame): input dataframe
        train_folds (List[int]): train folds
        valid_folds (List[int], optional): valid folds.
            If none takes all folds not included in ``train_folds``
        infer_folds (List[int], optional): infer folds.
            If none (or empty) the infer part contains no rows
        tag2class (Dict[str, int], optional): mapping from label names into
            int
        tag_column (str, optional): column with label names
        class_column (str, optional): column to use for split
        seed (int): seed for split
        n_folds (int): number of folds

    Returns:
        (tuple): tuple with 4 dataframes
            whole dataframe, train part, valid part and infer part
    """
    if args_are_not_none(tag2class, tag_column, class_column):
        dataframe = map_dataframe(
            dataframe, tag_column, class_column, tag2class
        )

    if class_column is None:
        result_dataframe = split_dataframe_on_folds(
            dataframe, random_state=seed, n_folds=n_folds
        )
    else:
        result_dataframe = split_dataframe_on_stratified_folds(
            dataframe,
            class_column=class_column,
            random_state=seed,
            n_folds=n_folds
        )

    fold_series = result_dataframe["fold"]

    # train part
    train_folds = folds_to_list(train_folds)
    in_train = fold_series.isin(train_folds)
    df_train = result_dataframe[in_train]

    # valid part: defaults to every fold that is not used for training
    if valid_folds is None:
        valid_folds = result_dataframe[~in_train]["fold"]
    valid_folds = folds_to_list(valid_folds)
    in_valid = fold_series.isin(valid_folds)
    df_valid = result_dataframe[in_valid]

    # infer part: no folds selected unless they are passed explicitly
    infer_folds = folds_to_list(infer_folds or [])
    in_infer = fold_series.isin(infer_folds)
    df_infer = result_dataframe[in_infer]

    return result_dataframe, df_train, df_valid, df_infer
def merge_multiple_fold_csv(
    fold_name: str, paths: Optional[str]
) -> pd.DataFrame:
    """
    Reads csv into one DataFrame with column ``fold``

    Args:
        fold_name (str): current fold name
        paths (str): paths to csv separated by commas;
            ``None`` yields an empty dataframe

    Returns:
        pd.DataFrame: merged dataframes with column ``fold`` == ``fold_name``
            and a fresh ``0..N-1`` index
    """
    if paths is None:
        return pd.DataFrame()

    frames = []
    for csv_path in paths.split(","):
        dataframe = pd.read_csv(csv_path)
        dataframe["fold"] = fold_name
        frames.append(dataframe)

    # ``DataFrame.append`` was removed in pandas 2.0 and copied the whole
    # accumulated frame on every call; a single ``concat`` does the same job.
    return pd.concat(frames, ignore_index=True)
def read_multiple_dataframes(
    in_csv_train: str = None,
    in_csv_valid: str = None,
    in_csv_infer: str = None,
    tag2class: Optional[Dict[str, int]] = None,
    class_column: str = None,
    tag_column: str = None
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """This function reads train/valid/infer dataframes from giving paths

    At least one of ``in_csv_train``, ``in_csv_valid``, ``in_csv_infer``
    must be given. When ``tag2class``, ``tag_column`` and ``class_column``
    are all given, tags are mapped to classes in every part.

    Args:
        in_csv_train (str): paths to train csv separated by commas
        in_csv_valid (str): paths to valid csv separated by commas
        in_csv_infer (str): paths to infer csv separated by commas
        tag2class (Dict[str, int], optional): mapping from label names into
            int
        tag_column (str, optional): column with label names
        class_column (str, optional): column to use for split

    Returns:
        (tuple): tuple with 4 dataframes
            whole dataframe, train part, valid part and infer part;
            a part whose paths were not given is ``None``
    """
    sources = (
        ("train", in_csv_train),
        ("valid", in_csv_valid),
        ("infer", in_csv_infer),
    )
    assert any(paths is not None for _, paths in sources), \
        "at least one of in_csv_train/in_csv_valid/in_csv_infer is required"

    need_mapping = args_are_not_none(tag2class, tag_column, class_column)

    fold_dfs = {}
    for fold_name, paths in sources:
        if paths is None:
            continue
        fold_df = merge_multiple_fold_csv(fold_name=fold_name, paths=paths)
        if need_mapping:
            fold_df = map_dataframe(
                fold_df, tag_column, class_column, tag2class
            )
        fold_dfs[fold_name] = fold_df

    frames = list(fold_dfs.values())
    if not frames:
        # only reachable when asserts are disabled (``python -O``)
        result_df = None
    elif len(frames) == 1:
        # a single part is returned as-is (same object), as before
        result_df = frames[0]
    else:
        # ``DataFrame.append`` was removed in pandas 2.0
        result_df = pd.concat(frames, ignore_index=True)

    output = (
        result_df,
        fold_dfs.get("train", None),
        fold_dfs.get("valid", None),
        fold_dfs.get("infer", None),
    )
    return output
def read_csv_data(
    in_csv: str = None,
    train_folds: Optional[List[int]] = None,
    valid_folds: Optional[List[int]] = None,
    infer_folds: Optional[List[int]] = None,
    seed: int = 42,
    n_folds: int = 5,
    in_csv_train: str = None,
    in_csv_valid: str = None,
    in_csv_infer: str = None,
    tag2class: Optional[Dict[str, int]] = None,
    class_column: str = None,
    tag_column: str = None,
) -> Tuple[pd.DataFrame, List[dict], List[dict], List[dict]]:
    """
    Reads a dataframe from ``in_csv`` and splits it into
    train/valid/infer folds, or reads independent folds from the paths
    ``in_csv_train``, ``in_csv_valid``, ``in_csv_infer``.

    Note:
        This function can be used with different combinations of params.
            First block is used to get dataset from one `csv`:
                in_csv, train_folds, valid_folds, infer_folds, seed, n_folds
            Second includes paths to different csv for train/valid and infer
            parts:
                in_csv_train, in_csv_valid, in_csv_infer
            The other params (tag2class, tag_column, class_column)
            are optional for any previous block

    Args:
        in_csv (str): paths to whole dataset
        train_folds (List[int]): train folds
        valid_folds (List[int], optional): valid folds.
            If none takes all folds not included in ``train_folds``
        infer_folds (List[int], optional): infer folds,
            forwarded to ``split_dataframe``
        seed (int): seed for split
        n_folds (int): number of folds

        in_csv_train (str): paths to train csv separated by commas
        in_csv_valid (str): paths to valid csv separated by commas
        in_csv_infer (str): paths to infer csv separated by commas

        tag2class (Dict[str, int]): mapping from label names into ints
        tag_column (str): column with label names
        class_column (str): column to use for split

    Returns:
        (Tuple[pd.DataFrame, List[dict], List[dict], List[dict]]):
            tuple with 4 elements
            (whole dataframe,
            list with train data,
            list with valid data
            and list with infer data);
            a list is ``None`` when its part was not produced

    Raises:
        ValueError: if both or neither of the two input modes are used
    """
    has_single_source = in_csv is not None
    has_split_sources = any(
        path is not None
        for path in (in_csv_train, in_csv_valid, in_csv_infer)
    )

    # exactly one of the two input modes has to be selected
    if has_single_source == has_split_sources:
        raise ValueError(
            "You should pass `in_csv` "
            "or `in_csv_train` with `in_csv_valid` but not both!"
        )

    if not has_single_source:
        dataframe, df_train, df_valid, df_infer = read_multiple_dataframes(
            in_csv_train=in_csv_train,
            in_csv_valid=in_csv_valid,
            in_csv_infer=in_csv_infer,
            tag2class=tag2class,
            class_column=class_column,
            tag_column=tag_column
        )
    else:
        dataframe = pd.read_csv(in_csv)
        dataframe, df_train, df_valid, df_infer = split_dataframe(
            dataframe,
            train_folds=train_folds,
            valid_folds=valid_folds,
            infer_folds=infer_folds,
            tag2class=tag2class,
            class_column=class_column,
            tag_column=tag_column,
            seed=seed,
            n_folds=n_folds
        )

    parts = (df_train, df_valid, df_infer)

    # the helper ``fold`` column is removed from the parts in place
    for part in parts:
        if part is None:
            continue
        if "fold" in part.columns:
            del part["fold"]

    train_rows, valid_rows, infer_rows = (
        None if part is None else dataframe_to_list(part) for part in parts
    )
    return dataframe, train_rows, valid_rows, infer_rows
def balance_classes(
    dataframe: pd.DataFrame,
    class_column: str = "label",
    random_state: int = 42,
    how: Union[int, str] = "downsampling"
) -> pd.DataFrame:
    """
    Balance classes in dataframe by ``class_column``

    See also :class:`catalyst.data.sampler.BalanceClassSampler`

    Args:
        dataframe: a dataset
        class_column: which column to use for split
        random_state: seed for random shuffle
        how: strategy to sample. ``"downsampling"`` shrinks every class to
            the size of the smallest one, ``"upsampling"`` grows every class
            to the size of the largest one, an ``int`` sets the number of
            samples per class explicitly

    Returns:
        pd.DataFrame: new dataframe with balanced ``class_column``
            (rows are grouped class by class, the original index is kept)

    Raises:
        NotImplementedError: if ``how`` is not a supported strategy
    """
    fixed_size = isinstance(how, int)
    if not fixed_size and how not in ("upsampling", "downsampling"):
        raise NotImplementedError(
            f"Unknown balancing strategy {how!r}, expected "
            "'downsampling', 'upsampling' or an int"
        )

    class_frames = {
        label: dataframe[dataframe[class_column] == label]
        for label in sorted(dataframe[class_column].unique())
    }
    if not class_frames:
        # nothing to balance (and nothing to concatenate) in an empty frame
        return dataframe.copy()

    class_sizes = [len(frame) for frame in class_frames.values()]
    if fixed_size:
        samples_per_class = how
    elif how == "upsampling":
        samples_per_class = max(class_sizes)
    else:
        samples_per_class = min(class_sizes)

    balanced_dfs = []
    for class_frame in class_frames.values():
        missing = samples_per_class - len(class_frame)
        if missing <= 0:
            # The class is large enough: draw WITHOUT replacement, so no row
            # is duplicated while another one is dropped.
            balanced = class_frame.sample(
                samples_per_class, replace=False, random_state=random_state
            )
        else:
            # The class is too small: keep every original row and top it up
            # with rows drawn with replacement.
            extra = class_frame.sample(
                missing, replace=True, random_state=random_state
            )
            balanced = pd.concat((class_frame, extra))
        balanced_dfs.append(balanced)

    return pd.concat(balanced_dfs)