# flake8: noqa
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from collections import defaultdict
import glob
import itertools
import os
import numpy as np
import pandas as pd
from sklearn.model_selection import StratifiedKFold, train_test_split
from sklearn.utils import shuffle
from catalyst.utils.misc import args_are_not_none
DictDataset = Dict[str, List[Any]]
[docs]def create_dataset(
dirs: str,
extension: str = None,
process_fn: Callable[[str], object] = None,
recursive: bool = False,
) -> DictDataset:
"""
Create dataset (dict like `{key: [values]}`) from vctk-like dataset::
dataset/
cat/
*.ext
dog/
*.ext
Args:
dirs: path to dirs, for example /home/user/data/**
extension: data extension you are looking for
process_fn (Callable[[str], object]): function(path_to_file) -> object
process function for found files, by default
recursive: enables recursive globbing
Returns:
dict: dataset
"""
extension = extension or "*"
dataset = defaultdict(list)
dirs = [os.path.expanduser(k) for k in dirs.strip().split()]
dirs = itertools.chain(*(glob.glob(d) for d in dirs))
dirs = [d for d in dirs if os.path.isdir(d)]
for d in sorted(dirs):
label = os.path.basename(d.rstrip("/"))
pathname = d + ("/**/" if recursive else "/") + extension
files = glob.iglob(pathname, recursive=recursive)
files = sorted(filter(os.path.isfile, files))
if process_fn is None:
dataset[label].extend(files)
else:
dataset[label].extend([process_fn(x) for x in files])
return dataset
[docs]def split_dataset_train_test(
dataset: pd.DataFrame, **train_test_split_args
) -> Tuple[DictDataset, DictDataset]:
"""Split dataset in train and test parts.
Args:
dataset: dict like dataset
**train_test_split_args:
test_size : float, int, or None (default is None)
If float, should be between 0.0 and 1.0 and represent the
proportion of the dataset to include in the test split. If
int, represents the absolute number of test samples. If None,
the value is automatically set
to the complement of the train size.
If train size is also None, test size is set to 0.25.
train_size : float, int, or None (default is None)
If float, should be between 0.0 and 1.0 and represent the
proportion of the dataset to include in the train split. If
int, represents the absolute number of train samples. If None,
the value is automatically set
to the complement of the test size.
random_state : int or RandomState
Pseudo-random number generator state used for random sampling.
stratify : array-like or None (default is None)
If not None, data is split in a stratified fashion,
using this as the class labels.
Returns:
train and test dicts
"""
train_dataset = defaultdict(list)
test_dataset = defaultdict(list)
for key, value in dataset.items():
train_ids, test_ids = train_test_split(range(len(value)), **train_test_split_args)
train_dataset[key].extend([value[i] for i in train_ids])
test_dataset[key].extend([value[i] for i in test_ids])
return train_dataset, test_dataset
[docs]def create_dataframe(dataset: DictDataset, **dataframe_args) -> pd.DataFrame:
"""Create pd.DataFrame from dict like `{key: [values]}`.
Args:
dataset: dict like `{key: [values]}`
**dataframe_args:
index : Index or array-like
Index to use for resulting frame.
Will default to np.arange(n) if no indexing information
part of input data and no index provided
columns : Index or array-like
Column labels to use for resulting frame. Will default to
np.arange(n) if no column labels are provided
dtype : dtype, default None
Data type to force, otherwise infer
Returns:
pd.DataFrame: dataframe from giving dataset
"""
data = [(key, value) for key, values in dataset.items() for value in values]
df = pd.DataFrame(data, **dataframe_args)
return df
[docs]def dataframe_to_list(dataframe: pd.DataFrame) -> List[dict]:
"""Converts dataframe to a list of rows (without indexes).
Args:
dataframe: input dataframe
Returns:
List[dict]: list of rows
"""
result = list(dataframe.to_dict(orient="index").values())
return result
[docs]def folds_to_list(folds: Union[list, str, pd.Series]) -> List[int]:
"""This function formats string or either list of numbers
into a list of unique int.
Examples:
>>> folds_to_list("1,2,1,3,4,2,4,6")
[1, 2, 3, 4, 6]
>>> folds_to_list([1, 2, 3.0, 5])
[1, 2, 3, 5]
Args:
folds (Union[list, str, pd.Series]): Either list of numbers or
one string with numbers separated by commas or
pandas series
Returns:
List[int]: list of unique ints
Raises:
ValueError: if value in string or array cannot be casted to int
"""
if isinstance(folds, str):
folds = folds.split(",")
elif isinstance(folds, pd.Series):
folds = sorted(folds.unique())
return sorted({int(x) for x in folds})
[docs]def split_dataframe_train_test(
dataframe: pd.DataFrame, **train_test_split_args
) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""Split dataframe in train and test part.
Args:
dataframe: pd.DataFrame to split
**train_test_split_args:
test_size : float, int, or None (default is None)
If float, should be between 0.0 and 1.0 and represent the
proportion of the dataset to include in the test split. If
int, represents the absolute number of test samples. If None,
the value is automatically set
to the complement of the train size.
If train size is also None, test size is set to 0.25.
train_size : float, int, or None (default is None)
If float, should be between 0.0 and 1.0 and represent the
proportion of the dataset to include in the train split. If
int, represents the absolute number of train samples. If None,
the value is automatically set
to the complement of the test size.
random_state : int or RandomState
Pseudo-random number generator state used for random sampling.
stratify : array-like or None (default is None)
If not None, data is split in a stratified fashion,
using this as the class labels.
Returns:
train and test DataFrames
.. note::
It exist cause sklearn `split` is overcomplicated.
"""
df_train, df_test = train_test_split(dataframe, **train_test_split_args)
return df_train, df_test
[docs]def split_dataframe_on_folds(
dataframe: pd.DataFrame, random_state: int = 42, n_folds: int = 5
) -> pd.DataFrame:
"""Splits DataFrame into `N` folds.
Args:
dataframe: a dataset
random_state: seed for random shuffle
n_folds: number of result folds
Returns:
pd.DataFrame: new dataframe with `fold` column
"""
dataframe = shuffle(dataframe, random_state=random_state)
df_tmp = []
for i, df_el in enumerate(np.array_split(dataframe, n_folds)):
df_el["fold"] = i
df_tmp.append(df_el)
dataframe = pd.concat(df_tmp)
return dataframe
[docs]def split_dataframe_on_stratified_folds(
dataframe: pd.DataFrame, class_column: str, random_state: int = 42, n_folds: int = 5,
) -> pd.DataFrame:
"""Splits DataFrame into `N` stratified folds.
Also see :class:`catalyst.data.sampler.BalanceClassSampler`
Args:
dataframe: a dataset
class_column: which column to use for split
random_state: seed for random shuffle
n_folds: number of result folds
Returns:
pd.DataFrame: new dataframe with `fold` column
"""
skf = StratifiedKFold(n_splits=n_folds, shuffle=True, random_state=random_state)
fold_column = np.zeros(len(dataframe), dtype=int)
for i, (_, test_index) in enumerate(skf.split(range(len(dataframe)), dataframe[class_column])):
fold_column[test_index] = i
dataframe["fold"] = fold_column
return dataframe
[docs]def split_dataframe_on_column_folds(
dataframe: pd.DataFrame, column: str, random_state: int = 42, n_folds: int = 5,
) -> pd.DataFrame:
"""Splits DataFrame into `N` folds.
Args:
dataframe: a dataset
column: which column to use
random_state: seed for random shuffle
n_folds: number of result folds
Returns:
pd.DataFrame: new dataframe with `fold` column
"""
df_tmp = []
labels = shuffle(sorted(dataframe[column].unique()), random_state=random_state)
for i, fold_labels in enumerate(np.array_split(labels, n_folds)):
df_label = dataframe[dataframe[column].isin(fold_labels)]
df_label["fold"] = i
df_tmp.append(df_label)
dataframe = pd.concat(df_tmp)
return dataframe
[docs]def map_dataframe(
dataframe: pd.DataFrame,
tag_column: str,
class_column: str,
tag2class: Dict[str, int],
verbose: bool = False,
) -> pd.DataFrame:
"""This function maps tags from ``tag_column`` to ints into
``class_column`` using ``tag2class`` dictionary.
Args:
dataframe: input dataframe
tag_column: column with tags
class_column (str) output column with classes
tag2class (Dict[str, int]): mapping from tags to class labels
verbose: flag if true, uses tqdm
Returns:
pd.DataFrame: updated dataframe with ``class_column``
"""
dataframe: pd.DataFrame = dataframe.copy()
def map_label(x):
return tag2class[str(x)]
if verbose:
series: pd.Series = dataframe[tag_column].progress_apply(map_label)
else:
series: pd.Series = dataframe[tag_column].apply(map_label)
dataframe.loc[series.index, class_column] = series
return dataframe
[docs]def get_dataset_labeling(dataframe: pd.DataFrame, tag_column: str) -> Dict[str, int]:
"""Prepares a mapping using unique values from ``tag_column``.
.. code-block:: javascript
{
"class_name_0": 0,
"class_name_1": 1,
...
"class_name_N": N
}
Args:
dataframe: a dataset
tag_column: which column to use
Returns:
Dict[str, int]: mapping from tag to labels
"""
tag_to_labels = {
str(class_name): label
for label, class_name in enumerate(sorted(dataframe[tag_column].unique()))
}
return tag_to_labels
[docs]def split_dataframe(
dataframe: pd.DataFrame,
train_folds: List[int],
valid_folds: Optional[List[int]] = None,
infer_folds: Optional[List[int]] = None,
tag2class: Optional[Dict[str, int]] = None,
tag_column: str = None,
class_column: str = None,
seed: int = 42,
n_folds: int = 5,
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
"""Split a Pandas DataFrame into folds.
Args:
dataframe: input dataframe
train_folds: train folds
valid_folds (List[int], optional): valid folds.
If none takes all folds not included in ``train_folds``
infer_folds (List[int], optional): infer folds.
If none takes all folds not included in ``train_folds``
and ``valid_folds``
tag2class (Dict[str, int], optional): mapping from label names into int
tag_column (str, optional): column with label names
class_column (str, optional): column to use for split
seed: seed for split
n_folds: number of folds
Returns:
tuple: tuple with 4 dataframes
whole dataframe, train part, valid part and infer part
"""
if args_are_not_none(tag2class, tag_column, class_column):
dataframe = map_dataframe(dataframe, tag_column, class_column, tag2class)
if class_column is not None:
df_all = split_dataframe_on_stratified_folds(
dataframe, class_column=class_column, random_state=seed, n_folds=n_folds,
)
else:
df_all = split_dataframe_on_folds(dataframe, random_state=seed, n_folds=n_folds)
fold_series = df_all["fold"]
train_folds = folds_to_list(train_folds)
df_train = df_all[fold_series.isin(train_folds)]
if valid_folds is None:
mask = ~fold_series.isin(train_folds)
valid_folds = df_all[mask]["fold"]
valid_folds = folds_to_list(valid_folds)
df_valid = df_all[fold_series.isin(valid_folds)]
infer_folds = folds_to_list(infer_folds or [])
df_infer = df_all[fold_series.isin(infer_folds)]
return df_all, df_train, df_valid, df_infer
# def merge_multiple_fold_csv(fold_name: str, paths: Optional[str]) -> pd.DataFrame:
# """Reads csv into one DataFrame with column ``fold``.
#
# Args:
# fold_name: current fold name
# paths: paths to csv separated by commas
#
# Returns:
# pd.DataFrame: merged dataframes with column ``fold`` == ``fold_name``
# """
# result = pd.DataFrame()
# if paths is not None:
# for csv_path in paths.split(","):
# dataframe = pd.read_csv(csv_path)
# dataframe["fold"] = fold_name
# result = result.append(dataframe, ignore_index=True)
#
# return result
# def read_multiple_dataframes(
# in_csv_train: str = None,
# in_csv_valid: str = None,
# in_csv_infer: str = None,
# tag2class: Optional[Dict[str, int]] = None,
# class_column: str = None,
# tag_column: str = None,
# ) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
# """This function reads train/valid/infer dataframes from giving paths.
#
# Args:
# in_csv_train: paths to train csv separated by commas
# in_csv_valid: paths to valid csv separated by commas
# in_csv_infer: paths to infer csv separated by commas
# tag2class (Dict[str, int], optional): mapping from label names into int
# tag_column (str, optional): column with label names
# class_column (str, optional): column to use for split
#
# Returns:
# tuple: tuple with 4 dataframes
# whole dataframe, train part, valid part and infer part
# """
# assert any(x is not None for x in (in_csv_train, in_csv_valid, in_csv_infer))
#
# result_df = None
# fold_dfs = {}
# for fold_df, fold_name in zip(
# (in_csv_train, in_csv_valid, in_csv_infer), ("train", "valid", "infer")
# ):
# if fold_df is not None:
# fold_df = merge_multiple_fold_csv(fold_name=fold_name, paths=fold_df)
# if args_are_not_none(tag2class, tag_column, class_column):
# fold_df = map_dataframe(fold_df, tag_column, class_column, tag2class)
# fold_dfs[fold_name] = fold_df
#
# result_df = (
# fold_df if result_df is None else result_df.append(fold_df, ignore_index=True)
# )
#
# output = (
# result_df,
# fold_dfs.get("train", None),
# fold_dfs.get("valid", None),
# fold_dfs.get("infer", None),
# )
#
# return output
# def read_csv_data(
# in_csv: str = None,
# train_folds: Optional[List[int]] = None,
# valid_folds: Optional[List[int]] = None,
# infer_folds: Optional[List[int]] = None,
# seed: int = 42,
# n_folds: int = 5,
# in_csv_train: str = None,
# in_csv_valid: str = None,
# in_csv_infer: str = None,
# tag2class: Optional[Dict[str, int]] = None,
# class_column: str = None,
# tag_column: str = None,
# ) -> Tuple[pd.DataFrame, List[dict], List[dict], List[dict]]:
# """
# From giving path ``in_csv`` reads a dataframe
# and split it to train/valid/infer folds
# or from several paths ``in_csv_train``, ``in_csv_valid``, ``in_csv_infer``
# reads independent folds.
#
# .. note::
# This function can be used with different combinations of params.
# First block is used to get dataset from one `csv`:
# in_csv, train_folds, valid_folds, infer_folds, seed, n_folds
# Second includes paths to different csv for train/valid and infer parts:
# in_csv_train, in_csv_valid, in_csv_infer
# The other params (tag2class, tag_column, class_column) are optional
# for any previous block
#
# Args:
# in_csv: paths to whole dataset
# train_folds: train folds
# valid_folds (List[int], optional): valid folds.
# If none takes all folds not included in ``train_folds``
# infer_folds (List[int], optional): infer folds.
# If none takes all folds not included in ``train_folds``
# and ``valid_folds``
# seed: seed for split
# n_folds: number of folds
#
# in_csv_train: paths to train csv separated by commas
# in_csv_valid: paths to valid csv separated by commas
# in_csv_infer: paths to infer csv separated by commas
#
# tag2class (Dict[str, int]): mapping from label names into ints
# tag_column: column with label names
# class_column: column to use for split
#
# Returns:
# Tuple[pd.DataFrame, List[dict], List[dict], List[dict]]:
# tuple with 4 elements
# (whole dataframe,
# list with train data,
# list with valid data
# and list with infer data)
# """
# from_one_df: bool = in_csv is not None
# from_multiple_df: bool = (
# in_csv_train is not None or in_csv_valid is not None or in_csv_infer is not None
# )
#
# if from_one_df == from_multiple_df:
# raise ValueError(
# "You should pass `in_csv` " "or `in_csv_train` with `in_csv_valid` but not both!"
# )
#
# if from_one_df:
# dataframe: pd.DataFrame = pd.read_csv(in_csv)
# dataframe, df_train, df_valid, df_infer = split_dataframe(
# dataframe,
# train_folds=train_folds,
# valid_folds=valid_folds,
# infer_folds=infer_folds,
# tag2class=tag2class,
# class_column=class_column,
# tag_column=tag_column,
# seed=seed,
# n_folds=n_folds,
# )
# else:
# dataframe, df_train, df_valid, df_infer = read_multiple_dataframes(
# in_csv_train=in_csv_train,
# in_csv_valid=in_csv_valid,
# in_csv_infer=in_csv_infer,
# tag2class=tag2class,
# class_column=class_column,
# tag_column=tag_column,
# )
#
# for data in [df_train, df_valid, df_infer]:
# if data is not None and "fold" in data.columns:
# del data["fold"]
#
# result = (
# dataframe,
# dataframe_to_list(df_train) if df_train is not None else None,
# dataframe_to_list(df_valid) if df_valid is not None else None,
# dataframe_to_list(df_infer) if df_infer is not None else None,
# )
#
# return result
[docs]def balance_classes(
dataframe: pd.DataFrame,
class_column: str = "label",
random_state: int = 42,
how: str = "downsampling",
) -> pd.DataFrame:
"""Balance classes in dataframe by ``class_column``.
See also :class:`catalyst.data.sampler.BalanceClassSampler`.
Args:
dataframe: a dataset
class_column: which column to use for split
random_state: seed for random shuffle
how: strategy to sample, must be one on ["downsampling", "upsampling"]
Returns:
pd.DataFrame: new dataframe with balanced ``class_column``
Raises:
NotImplementedError:
if `how` is not in ["upsampling", "downsampling", int]
"""
cnt = defaultdict(lambda: 0.0)
for label in sorted(dataframe[class_column].unique()):
cnt[label] = len(dataframe[dataframe[class_column] == label])
if isinstance(how, int) or how == "upsampling":
samples_per_class = how if isinstance(how, int) else max(cnt.values())
balanced_dfs = {}
for label in sorted(dataframe[class_column].unique()):
df_class_column = dataframe[dataframe[class_column] == label]
if samples_per_class <= len(df_class_column):
balanced_dfs[label] = df_class_column.sample(
samples_per_class, replace=True, random_state=random_state
)
else:
df_class_column_additional = df_class_column.sample(
samples_per_class - len(df_class_column),
replace=True,
random_state=random_state,
)
balanced_dfs[label] = pd.concat((df_class_column, df_class_column_additional))
elif how == "downsampling":
samples_per_class = min(cnt.values())
balanced_dfs = {}
for label in sorted(dataframe[class_column].unique()):
balanced_dfs[label] = dataframe[dataframe[class_column] == label].sample(
samples_per_class, replace=False, random_state=random_state
)
else:
raise NotImplementedError()
balanced_df = pd.concat(balanced_dfs.values())
return balanced_df
__all__ = [
"dataframe_to_list",
"folds_to_list",
"split_dataframe",
"split_dataframe_on_column_folds",
"split_dataframe_on_folds",
"split_dataframe_on_stratified_folds",
"split_dataframe_train_test",
"separate_tags",
"map_dataframe",
"get_dataset_labeling",
"balance_classes",
"create_dataset",
"create_dataframe",
"split_dataset_train_test",
]