from typing import Dict, List, Optional, Tuple, Union # isort:skip
from collections import defaultdict
import numpy as np
import pandas as pd
from sklearn.model_selection import StratifiedKFold, train_test_split
from sklearn.utils import shuffle
from tqdm.auto import tqdm
from catalyst.utils import args_are_not_none
# Register tqdm's ``progress_apply`` on pandas objects; ``map_dataframe``
# calls it when ``verbose=True``.
tqdm.pandas()
def dataframe_to_list(dataframe: pd.DataFrame) -> List[dict]:
    """
    Converts dataframe to a list of rows (without indexes)

    Args:
        dataframe (DataFrame): input dataframe

    Returns:
        (List[dict]): list of rows, one ``{column: value}`` dict per row,
            in the row order of ``dataframe``
    """
    # ``orient="records"`` ignores the index entirely, so frames with
    # repeated index labels (e.g. after upsampling or concatenation) keep
    # every row. ``orient="index"`` keys rows by label and therefore cannot
    # represent duplicates.
    result = dataframe.to_dict(orient="records")
    return result
def folds_to_list(folds: Union[list, str, pd.Series]) -> List[int]:
    """
    Normalizes a fold specification into a sorted list of unique ints.

    Args:
        folds (Union[list, str, pd.Series]): fold numbers given as
            a list of numbers, a single comma-separated string,
            or a pandas series

    Returns:
        List[int]: unique fold numbers in ascending order

    Examples:
        >>> folds_to_list("1,2,1,3,4,2,4,6")
        [1, 2, 3, 4, 6]
        >>> folds_to_list([1, 2, 3.0, 5])
        [1, 2, 3, 5]

    Raises:
        ValueError: if a value in the string or array cannot be cast to int
    """
    if isinstance(folds, pd.Series):
        candidates = sorted(folds.unique())
    elif isinstance(folds, str):
        candidates = folds.split(",")
    else:
        candidates = folds

    # Deduplicate after the int conversion so that e.g. ``3`` and ``3.0``
    # collapse into a single fold.
    unique_folds = set()
    for value in candidates:
        unique_folds.add(int(value))
    return sorted(unique_folds)
def split_dataframe_train_test(
    dataframe: pd.DataFrame, **train_test_split_args
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Splits a dataframe into a train part and a test part.

    A thin convenience wrapper: every keyword argument is forwarded
    unchanged to sklearn's ``train_test_split``.

    Args:
        dataframe: pd.DataFrame to split
        **train_test_split_args:
            test_size : float, int, or None (default is None)
                If float, should be between 0.0 and 1.0 and represent the
                proportion of the dataset to include in the test split.
                If int, represents the absolute number of test samples.
                If None, the value is set to the complement of the
                train size. If train size is also None, test size is
                set to 0.25.
            train_size : float, int, or None (default is None)
                If float, should be between 0.0 and 1.0 and represent the
                proportion of the dataset to include in the train split.
                If int, represents the absolute number of train samples.
                If None, the value is set to the complement of the
                test size.
            random_state : int or RandomState
                Pseudo-random number generator state used for
                random sampling.
            stratify : array-like or None (default is None)
                If not None, data is split in a stratified fashion,
                using this as the class labels.

    Returns:
        train and test DataFrames
    """
    parts = train_test_split(dataframe, **train_test_split_args)
    train_part, test_part = parts
    return train_part, test_part
def split_dataframe_on_folds(
    dataframe: pd.DataFrame, random_state: int = 42, n_folds: int = 5
) -> pd.DataFrame:
    """
    Splits DataFrame into `N` folds.

    Rows are shuffled first; the shuffled rows are then cut into
    ``n_folds`` consecutive, near-equal chunks and each chunk gets its
    ordinal number as fold id.

    Args:
        dataframe: a dataset
        random_state: seed for random shuffle
        n_folds: number of result folds

    Returns:
        pd.DataFrame: new (shuffled) dataframe with `fold` column
    """
    dataframe = shuffle(dataframe, random_state=random_state)

    # Split row *positions* rather than the DataFrame itself:
    # ``np.array_split`` on a DataFrame goes through deprecated pandas
    # internals, and writing a column onto each resulting piece triggers
    # chained-assignment warnings. Chunk sizes are the same either way.
    fold_ids = np.empty(len(dataframe), dtype=int)
    positions = np.arange(len(dataframe))
    for i, fold_positions in enumerate(np.array_split(positions, n_folds)):
        fold_ids[fold_positions] = i

    # ``assign`` returns a new frame, so nothing derived from the caller's
    # dataframe is modified in place.
    dataframe = dataframe.assign(fold=fold_ids)
    return dataframe
def split_dataframe_on_stratified_folds(
    dataframe: pd.DataFrame,
    class_column: str,
    random_state: int = 42,
    n_folds: int = 5
) -> pd.DataFrame:
    """
    Splits DataFrame into `N` stratified folds.

    Also see :class:`catalyst.data.sampler.BalanceClassSampler`

    Args:
        dataframe: a dataset
        class_column: which column to use for split
        random_state: seed for random shuffle
        n_folds: number of result folds

    Returns:
        pd.DataFrame: new dataframe with `fold` column
            (the input dataframe is left untouched)
    """
    skf = StratifiedKFold(
        n_splits=n_folds, shuffle=True, random_state=random_state
    )
    fold_column = np.zeros(len(dataframe), dtype=int)
    # Only the test indices of each split are needed: across the splits
    # they partition all rows, so every position gets exactly one fold id.
    for i, (_, test_index) in enumerate(
        skf.split(range(len(dataframe)), dataframe[class_column])
    ):
        fold_column[test_index] = i

    # ``assign`` builds a new frame instead of adding the column to the
    # caller's object, matching the documented "new dataframe" contract
    # and the behaviour of the other fold splitters.
    return dataframe.assign(fold=fold_column)
def split_dataframe_on_column_folds(
    dataframe: pd.DataFrame,
    column: str,
    random_state: int = 42,
    n_folds: int = 5
) -> pd.DataFrame:
    """
    Splits DataFrame into `N` folds by the values of ``column``.

    The unique values of ``column`` are shuffled and divided into
    ``n_folds`` groups; all rows sharing a value land in the same fold.

    Args:
        dataframe: a dataset
        column: which column to use
        random_state: seed for random shuffle
        n_folds: number of result folds

    Returns:
        pd.DataFrame: new dataframe with `fold` column,
            rows grouped by fold
    """
    # Sort before shuffling so the result depends only on the set of
    # values and the seed, not on the order rows appear in the input.
    labels = shuffle(
        sorted(dataframe[column].unique()), random_state=random_state
    )

    df_tmp = []
    for i, fold_labels in enumerate(np.array_split(labels, n_folds)):
        in_fold = dataframe[column].isin(fold_labels)
        # ``assign`` creates a fresh frame; setting the column directly on
        # the boolean-filtered slice would be a chained assignment.
        df_tmp.append(dataframe[in_fold].assign(fold=i))

    dataframe = pd.concat(df_tmp)
    return dataframe
def map_dataframe(
    dataframe: pd.DataFrame,
    tag_column: str,
    class_column: str,
    tag2class: Dict[str, int],
    verbose: bool = False
) -> pd.DataFrame:
    """
    This function maps tags from ``tag_column`` to ints into ``class_column``
    using the ``tag2class`` dictionary.

    Args:
        dataframe (pd.DataFrame): input dataframe
        tag_column (str): column with tags
        class_column (str): output column with classes
        tag2class (Dict[str, int]): mapping from tags to class labels;
            tags are converted with ``str`` before the lookup
        verbose: flag if true, uses tqdm

    Returns:
        pd.DataFrame: copy of the dataframe with ``class_column``
            added (or overwritten)

    Raises:
        KeyError: if a tag is missing from ``tag2class``
    """
    # Work on a copy so the caller's dataframe is never modified.
    dataframe: pd.DataFrame = dataframe.copy()

    def map_label(x):
        return tag2class[str(x)]

    tags = dataframe[tag_column]
    if verbose:
        # ``progress_apply`` is provided by ``tqdm.pandas()``.
        series: pd.Series = tags.progress_apply(map_label)
    else:
        series: pd.Series = tags.apply(map_label)

    # ``series`` shares the dataframe's index, so plain column assignment
    # lines rows up one-to-one. Unlike ``.loc[series.index, ...]`` it also
    # works with repeated index labels and keeps the mapped dtype.
    dataframe[class_column] = series
    return dataframe
def get_dataset_labeling(
    dataframe: pd.DataFrame, tag_column: str
) -> Dict[str, int]:
    """
    Builds a tag-to-label mapping from the unique values of ``tag_column``.

    Unique values are sorted and numbered from zero; keys are the
    string form of each value.

    .. code-block:: javascript

        {
            "class_name_0": 0,
            "class_name_1": 1,
            ...
            "class_name_N": N
        }

    Args:
        dataframe: a dataset
        tag_column: which column to use

    Returns:
        Dict[str, int]: mapping from tag to labels
    """
    ordered_tags = sorted(dataframe[tag_column].unique())
    tag_to_labels = {}
    for position, tag in enumerate(ordered_tags):
        tag_to_labels[str(tag)] = position
    return tag_to_labels
def split_dataframe(
    dataframe: pd.DataFrame,
    train_folds: List[int],
    valid_folds: Optional[List[int]] = None,
    infer_folds: Optional[List[int]] = None,
    tag2class: Optional[Dict[str, int]] = None,
    tag_column: str = None,
    class_column: str = None,
    seed: int = 42,
    n_folds: int = 5
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Assigns folds to a Pandas DataFrame and selects train/valid/infer parts.

    Args:
        dataframe (pd.DataFrame): input dataframe
        train_folds (List[int]): train folds
        valid_folds (List[int], optional): valid folds.
            If none takes all folds not included in ``train_folds``
        infer_folds (List[int], optional): infer folds.
            If none (or empty) the infer part selects no folds
        tag2class (Dict[str, int], optional): mapping from label names into int
        tag_column (str, optional): column with label names
        class_column (str, optional): column to use for split;
            when given the folds are stratified by it
        seed (int): seed for split
        n_folds (int): number of folds

    Returns:
        (tuple): tuple with 4 dataframes
            whole dataframe, train part, valid part and infer part
    """
    # Tag -> class mapping is applied only when ``args_are_not_none``
    # accepts the three related arguments.
    if args_are_not_none(tag2class, tag_column, class_column):
        dataframe = map_dataframe(
            dataframe, tag_column, class_column, tag2class
        )

    if class_column is None:
        result_dataframe = split_dataframe_on_folds(
            dataframe, random_state=seed, n_folds=n_folds
        )
    else:
        result_dataframe = split_dataframe_on_stratified_folds(
            dataframe,
            class_column=class_column,
            random_state=seed,
            n_folds=n_folds
        )

    fold_series = result_dataframe["fold"]

    def rows_in(folds):
        # Rows whose fold id is one of ``folds``.
        return result_dataframe[fold_series.isin(folds)]

    train_folds = folds_to_list(train_folds)
    df_train = rows_in(train_folds)

    if valid_folds is None:
        # Default validation set: every fold that is not used for training.
        not_train = ~fold_series.isin(train_folds)
        valid_folds = result_dataframe[not_train]["fold"]
    valid_folds = folds_to_list(valid_folds)
    df_valid = rows_in(valid_folds)

    infer_folds = folds_to_list(infer_folds or [])
    df_infer = rows_in(infer_folds)

    return result_dataframe, df_train, df_valid, df_infer
def merge_multiple_fold_csv(
    fold_name: str, paths: Optional[str]
) -> pd.DataFrame:
    """
    Reads csv into one DataFrame with column ``fold``

    Args:
        fold_name (str): current fold name
        paths (str): paths to csv separated by commas;
            ``None`` yields an empty dataframe

    Returns:
        pd.DataFrame: merged dataframes with column ``fold`` == ``fold_name``
            and a fresh ``0..n-1`` index
    """
    if paths is None:
        return pd.DataFrame()

    # Collect all frames and concatenate once: ``DataFrame.append`` was
    # removed in pandas 2.0 and copied the accumulated data on every call.
    frames = []
    for csv_path in paths.split(","):
        frame = pd.read_csv(csv_path)
        frame["fold"] = fold_name
        frames.append(frame)

    return pd.concat(frames, ignore_index=True)
def read_multiple_dataframes(
    in_csv_train: str = None,
    in_csv_valid: str = None,
    in_csv_infer: str = None,
    tag2class: Optional[Dict[str, int]] = None,
    class_column: str = None,
    tag_column: str = None
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """This function reads train/valid/infer dataframes from the given paths

    Args:
        in_csv_train (str): paths to train csv separated by commas
        in_csv_valid (str): paths to valid csv separated by commas
        in_csv_infer (str): paths to infer csv separated by commas
        tag2class (Dict[str, int], optional): mapping from label names into int
        tag_column (str, optional): column with label names
        class_column (str, optional): column to use for split

    Returns:
        (tuple): tuple with 4 dataframes
            whole dataframe, train part, valid part and infer part;
            a part whose paths were not given is ``None``
    """
    assert any(
        [x is not None for x in (in_csv_train, in_csv_valid, in_csv_infer)]
    ), "at least one of in_csv_train, in_csv_valid, in_csv_infer is required"

    fold_dfs = {}
    for fold_paths, fold_name in zip(
        (in_csv_train, in_csv_valid, in_csv_infer),
        ("train", "valid", "infer")
    ):
        if fold_paths is None:
            continue
        fold_df = merge_multiple_fold_csv(
            fold_name=fold_name, paths=fold_paths
        )
        if args_are_not_none(tag2class, tag_column, class_column):
            fold_df = map_dataframe(
                fold_df, tag_column, class_column, tag2class
            )
        fold_dfs[fold_name] = fold_df

    # One concat over all parts (train, valid, infer order) replaces the
    # ``DataFrame.append`` chain, which no longer exists in pandas 2.0.
    # It also makes the whole dataframe a separate object even when only
    # one part was read, so later edits to a part cannot leak into it.
    result_df = (
        pd.concat(list(fold_dfs.values()), ignore_index=True)
        if fold_dfs else None
    )

    output = (
        result_df,
        fold_dfs.get("train", None),
        fold_dfs.get("valid", None),
        fold_dfs.get("infer", None),
    )
    return output
def read_csv_data(
    in_csv: str = None,
    train_folds: Optional[List[int]] = None,
    valid_folds: Optional[List[int]] = None,
    infer_folds: Optional[List[int]] = None,
    seed: int = 42,
    n_folds: int = 5,
    in_csv_train: str = None,
    in_csv_valid: str = None,
    in_csv_infer: str = None,
    tag2class: Optional[Dict[str, int]] = None,
    class_column: str = None,
    tag_column: str = None,
) -> Tuple[pd.DataFrame, List[dict], List[dict], List[dict]]:
    """
    Reads a dataframe from the given path ``in_csv``
    and splits it into train/valid/infer folds,
    or reads independent folds from several paths
    ``in_csv_train``, ``in_csv_valid``, ``in_csv_infer``.

    Note:
        This function can be used with different combinations of params.
            First block is used to get dataset from one `csv`:
                in_csv, train_folds, valid_folds, infer_folds, seed, n_folds
            Second includes paths to different csv for train/valid
            and infer parts:
                in_csv_train, in_csv_valid, in_csv_infer
            The other params (tag2class, tag_column, class_column)
            are optional for any previous block

    Args:
        in_csv (str): paths to whole dataset
        train_folds (List[int]): train folds
        valid_folds (List[int], optional): valid folds.
            If none takes all folds not included in ``train_folds``
        infer_folds (List[int], optional): infer folds;
            passed through to ``split_dataframe``
        seed (int): seed for split
        n_folds (int): number of folds
        in_csv_train (str): paths to train csv separated by commas
        in_csv_valid (str): paths to valid csv separated by commas
        in_csv_infer (str): paths to infer csv separated by commas
        tag2class (Dict[str, int]): mapping from label names into ints
        tag_column (str): column with label names
        class_column (str): column to use for split

    Returns:
        (Tuple[pd.DataFrame, List[dict], List[dict], List[dict]]):
            tuple with 4 elements
            (whole dataframe,
            list with train data,
            list with valid data
            and list with infer data);
            a part that was not read is ``None``

    Raises:
        ValueError: if both or neither of the two input modes is used
    """
    from_one_df: bool = in_csv is not None
    from_multiple_df: bool = \
        in_csv_train is not None \
        or in_csv_valid is not None \
        or in_csv_infer is not None

    # Exactly one of the two input modes must be selected.
    if from_one_df == from_multiple_df:
        raise ValueError(
            "You should pass `in_csv` "
            "or `in_csv_train` with `in_csv_valid` but not both!"
        )

    if from_one_df:
        dataframe: pd.DataFrame = pd.read_csv(in_csv)
        dataframe, df_train, df_valid, df_infer = split_dataframe(
            dataframe,
            train_folds=train_folds,
            valid_folds=valid_folds,
            infer_folds=infer_folds,
            tag2class=tag2class,
            class_column=class_column,
            tag_column=tag_column,
            seed=seed,
            n_folds=n_folds
        )
    else:
        dataframe, df_train, df_valid, df_infer = read_multiple_dataframes(
            in_csv_train=in_csv_train,
            in_csv_valid=in_csv_valid,
            in_csv_infer=in_csv_infer,
            tag2class=tag2class,
            class_column=class_column,
            tag_column=tag_column
        )

    parts = []
    for data in (df_train, df_valid, df_infer):
        if data is None:
            parts.append(None)
            continue
        if "fold" in data.columns:
            # ``drop`` returns a new frame. Deleting the column in place
            # would modify a slice of (or the very same object as) the
            # whole dataframe that is returned alongside the parts.
            data = data.drop(columns="fold")
        parts.append(dataframe_to_list(data))

    result = (dataframe, parts[0], parts[1], parts[2])
    return result
def balance_classes(
    dataframe: pd.DataFrame,
    class_column: str = "label",
    random_state: int = 42,
    how: Union[str, int] = "downsampling"
) -> pd.DataFrame:
    """
    Balance classes in dataframe by ``class_column``

    See also :class:`catalyst.data.sampler.BalanceClassSampler`

    Args:
        dataframe: a dataset
        class_column: which column to use for split
        random_state: seed for random shuffle
        how: strategy to sample,
            must be one of ["downsampling", "upsampling"]
            or an int with the exact number of samples per class

    Returns:
        pd.DataFrame: new dataframe with balanced ``class_column``

    Raises:
        NotImplementedError: if ``how`` is not a supported strategy
    """
    # Filter the rows of every class once and reuse them below.
    labels = sorted(dataframe[class_column].unique())
    class_frames = {
        label: dataframe[dataframe[class_column] == label]
        for label in labels
    }
    class_sizes = [len(frame) for frame in class_frames.values()]

    balanced_dfs = {}
    if isinstance(how, int) or how == "upsampling":
        samples_per_class = how if isinstance(how, int) else max(class_sizes)
        for label, df_class in class_frames.items():
            n_missing = samples_per_class - len(df_class)
            if n_missing <= 0:
                # Enough rows already: draw without replacement so no row
                # is duplicated while another one is dropped (for the
                # largest class this keeps every original row).
                balanced_dfs[label] = df_class.sample(
                    samples_per_class,
                    replace=False,
                    random_state=random_state
                )
            else:
                # Keep all original rows and top up with random repeats.
                df_additional = df_class.sample(
                    n_missing, replace=True, random_state=random_state
                )
                balanced_dfs[label] = pd.concat((df_class, df_additional))
    elif how == "downsampling":
        samples_per_class = min(class_sizes)
        for label, df_class in class_frames.items():
            balanced_dfs[label] = df_class.sample(
                samples_per_class, replace=False, random_state=random_state
            )
    else:
        raise NotImplementedError(
            f"Unknown balancing strategy: {how!r}, expected "
            "'downsampling', 'upsampling' or an int"
        )

    balanced_df = pd.concat(list(balanced_dfs.values()))
    return balanced_df