Data¶
The data subpackage provides data preprocessors and dataloader abstractions.
Collate Functions¶
FilteringCollateFn¶
Dataset¶
PyTorch Extensions¶
DatasetFromSampler¶
ListDataset¶
-
class
catalyst.data.dataset.torch.
ListDataset
(list_data: List[Dict], open_fn: Callable, dict_transform: Optional[Callable] = None)[source]¶ Bases:
torch.utils.data.dataset.Dataset
General purpose dataset class with several data sources list_data.
-
__init__
(list_data: List[Dict], open_fn: Callable, dict_transform: Optional[Callable] = None)[source]¶ - Parameters
list_data – list of dicts that store your data annotations (for example, paths to images, labels, bboxes, etc.)
open_fn – function that opens an annotation dict and converts it to the data needed by your network (for example, opens an image by path or tokenizes a read string)
dict_transform – transforms to apply to the dict (for example, normalize the image, add blur, crop/resize, etc.)
-
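The way the three constructor arguments fit together can be sketched in plain Python. This is a minimal illustration under the assumption that an item is produced by opening the annotation dict and then transforming the result; `ListDatasetSketch`, the toy annotations, and both lambdas are hypothetical and are not the Catalyst source.

```python
from typing import Any, Callable, Dict, List, Optional


class ListDatasetSketch:
    """Hypothetical stand-in for ListDataset; not the Catalyst source."""

    def __init__(
        self,
        list_data: List[Dict],
        open_fn: Callable[[Dict], Dict],
        dict_transform: Optional[Callable[[Dict], Dict]] = None,
    ):
        self.data = list_data
        self.open_fn = open_fn
        # Fall back to the identity when no transform is given.
        self.dict_transform = dict_transform or (lambda d: d)

    def __len__(self) -> int:
        return len(self.data)

    def __getitem__(self, index: int) -> Any:
        # 1) take the annotation dict, 2) open it, 3) transform the result.
        return self.dict_transform(self.open_fn(self.data[index]))


# Toy annotations: "text" plays the role of a file path or raw string.
annotations = [{"text": "a b", "label": 0}, {"text": "c d e", "label": 1}]
open_fn = lambda row: {"features": row["text"].split(), "targets": row["label"]}
add_length = lambda d: {**d, "length": len(d["features"])}

dataset = ListDatasetSketch(annotations, open_fn, add_length)
sample = dataset[1]
```

Here `sample` holds the tokenized text, the label, and the length added by the transform.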
MergeDataset¶
NumpyDataset¶
-
class
catalyst.data.dataset.torch.
NumpyDataset
(numpy_data: numpy.ndarray, numpy_key: str = 'features', dict_transform: Optional[Callable] = None)[source]¶ Bases:
torch.utils.data.dataset.Dataset
General purpose dataset class to use with numpy_data.
-
__init__
(numpy_data: numpy.ndarray, numpy_key: str = 'features', dict_transform: Optional[Callable] = None)[source]¶ General purpose dataset class to use with numpy_data.
- Parameters
numpy_data – numpy data (for example, embeddings, features, etc.)
numpy_key – key to use for output dictionary
dict_transform – transforms to use on dict. (for example normalize vector, etc)
-
PathsDataset¶
-
class
catalyst.data.dataset.torch.
PathsDataset
(filenames: List[Union[str, pathlib.Path]], open_fn: Callable[[dict], dict], label_fn: Callable[[Union[str, pathlib.Path]], Any], features_key: str = 'features', target_key: str = 'targets', **list_dataset_params)[source]¶ Bases:
catalyst.data.dataset.torch.ListDataset
Dataset that derives features and targets from samples' filesystem paths.
Examples
>>> label_fn = lambda x: Path(x).name.split("_")[0]
>>> dataset = PathsDataset(
>>>     filenames=Path("/path/to/images/").glob("*.jpg"),
>>>     label_fn=label_fn,
>>>     open_fn=open_fn,
>>> )
-
__init__
(filenames: List[Union[str, pathlib.Path]], open_fn: Callable[[dict], dict], label_fn: Callable[[Union[str, pathlib.Path]], Any], features_key: str = 'features', target_key: str = 'targets', **list_dataset_params)[source]¶ - Parameters
filenames – list of file paths that store information about your dataset samples; it could be images, texts or any other files in general.
open_fn – function that opens an annotation dict and converts it to the data needed by your network (for example, opens an image by path or tokenizes a read string)
label_fn – function that extracts the target value from a sample path (for example, your sample could be an image file like /path/to/your/image_1.png, where the target is encoded as a part of the file path)
features_key – key to use to store sample features
target_key – key to use to store target label
list_dataset_params – base class initialization parameters.
-
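As a rough illustration of how `label_fn`, `features_key` and `target_key` interact, the sketch below builds annotation dicts from file paths. The paths are hypothetical, and the assumption that one dict is produced per file is ours; the library may do this differently.

```python
from pathlib import PurePosixPath

# Hypothetical sample paths; the class is encoded in the file name.
filenames = [PurePosixPath("/data/cat_001.jpg"), PurePosixPath("/data/dog_002.jpg")]

# label_fn receives a path and returns the target: here, the part of the
# file name before the first underscore.
label_fn = lambda path: PurePosixPath(path).name.split("_")[0]

features_key, target_key = "features", "targets"

# One annotation dict per file: the path goes under features_key and the
# derived label under target_key (assumed layout).
list_data = [
    {features_key: str(path), target_key: label_fn(path)} for path in filenames
]
```

A list like `list_data` is then the kind of input that `open_fn` would receive item by item.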
Metric Learning Datasets¶
MetricLearningTrainDataset¶
QueryGalleryDataset¶
-
class
catalyst.data.dataset.metric_learning.
QueryGalleryDataset
[source]¶ Bases:
torch.utils.data.dataset.Dataset
,abc.ABC
QueryGalleryDataset for CMCScoreCallback
-
__init__
()¶ Initialize self. See help(type(self)) for accurate signature.
-
abstract property
gallery_size
¶ Query/Gallery dataset should have property gallery size.
- Returns
gallery size
- Raises
NotImplementedError – you should implement it
-
abstract property
query_size
¶ Query/Gallery dataset should have property query size.
- Returns
query size
- Raises
NotImplementedError – you should implement it
-
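A subclass has to provide both properties before it can be instantiated. The sketch below mirrors that contract with the standard `abc` module; `QueryGallerySketch` and `ToyQueryGallery` are hypothetical names and the boolean-flag storage is an assumption made only for illustration.

```python
from abc import ABC, abstractmethod


class QueryGallerySketch(ABC):
    """Hypothetical mirror of the two abstract properties described above."""

    @property
    @abstractmethod
    def query_size(self) -> int:
        raise NotImplementedError

    @property
    @abstractmethod
    def gallery_size(self) -> int:
        raise NotImplementedError


class ToyQueryGallery(QueryGallerySketch):
    def __init__(self, is_query):
        # One boolean flag per sample: True for query, False for gallery.
        self.is_query = list(is_query)

    @property
    def query_size(self) -> int:
        return sum(self.is_query)

    @property
    def gallery_size(self) -> int:
        return len(self.is_query) - sum(self.is_query)


toy = ToyQueryGallery([True, False, False, True, False])
```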
In-batch Samplers¶
IInbatchTripletSampler¶
-
class
catalyst.data.
IInbatchTripletSampler
[source]¶ An abstraction of inbatch triplet sampler.
-
abstract
sample
(features: torch.Tensor, labels: Union[List[int], torch.Tensor]) → Tuple[torch.Tensor, torch.Tensor, torch.Tensor][source]¶ This method includes the logic of sampling/selecting triplets.
- Parameters
features – tensor of features
labels – labels of the samples in the batch, list or Tensor of shape (batch_size;)
Returns: the batch of triplets
- Raises
NotImplementedError – you should implement it
-
InBatchTripletsSampler¶
-
class
catalyst.data.
InBatchTripletsSampler
[source]¶ Base class for triplet samplers. We expect that the child instances of this class will be used for forming triplets inside the batches. (Note: it is assumed that the set of output features is a subset of the sample features inside the batch.) The batches must contain at least 2 samples for each class and at least 2 different classes; such behaviour can be guaranteed by using catalyst.data.sampler.BalanceBatchSampler.
However, you are free to form such batches in any other way.
-
sample
(features: torch.Tensor, labels: Union[List[int], torch.Tensor]) → Tuple[torch.Tensor, torch.Tensor, torch.Tensor][source]¶ - Parameters
features – has the shape of [batch_size, feature_size]
labels – labels of the samples in the batch
- Returns
the batch of triplets in the following order: (anchor, positive, negative)
-
AllTripletsSampler¶
HardTripletsSampler¶
-
class
catalyst.data.
HardTripletsSampler
(norm_required: bool = False)[source]¶ This sampler selects hardest triplets based on distances between features: the hardest positive sample has the maximal distance to the anchor sample, the hardest negative sample has the minimal distance to the anchor sample.
Note that a typical triplet loss chart is as follows:
1. Falling: the loss decreases to a value equal to the margin.
2. Long plateau: the loss oscillates near the margin.
3. Falling: the loss decreases to zero.
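The selection rule for the hardest positive and the hardest negative can be sketched without any tensor library. This is a minimal pure-Python illustration, assuming Euclidean distance and returning index triplets; `hardest_triplets` is a hypothetical helper, not the sampler's implementation.

```python
import math


def hardest_triplets(features, labels):
    """Return (anchor, positive, negative) index triplets, one per anchor."""
    triplets = []
    for a, (fa, la) in enumerate(zip(features, labels)):
        # Distances from the anchor to every other sample in the batch.
        dist = {i: math.dist(fa, fi) for i, fi in enumerate(features) if i != a}
        positives = [i for i in dist if labels[i] == la]
        negatives = [i for i in dist if labels[i] != la]
        if not positives or not negatives:
            continue  # needs >= 2 samples per class and >= 2 classes
        hardest_pos = max(positives, key=dist.get)  # farthest same-class sample
        hardest_neg = min(negatives, key=dist.get)  # closest other-class sample
        triplets.append((a, hardest_pos, hardest_neg))
    return triplets


# Six one-dimensional-like points, three per class.
features = [(0.0, 0.0), (1.0, 0.0), (3.0, 0.0), (5.0, 0.0), (6.0, 0.0), (9.0, 0.0)]
labels = [0, 0, 0, 1, 1, 1]
triplets = hardest_triplets(features, labels)
```

For anchor 0 the farthest same-class point is index 2 and the closest point of the other class is index 3, so the first triplet is `(0, 2, 3)`.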
HardClusterSampler¶
-
class
catalyst.data.
HardClusterSampler
[source]¶ This sampler selects the hardest triplets based on distance to mean vectors: the anchor is the mean vector of the features of the i-th class in the batch, the hardest positive sample is the sample of the anchor's class that is most distant from the anchor, and the hardest negative sample is the closest mean vector of another class.
The batch must contain k samples for p classes in it (k > 1, p > 1).
-
sample
(features: torch.Tensor, labels: Union[List[int], torch.Tensor]) → Tuple[torch.Tensor, torch.Tensor, torch.Tensor][source]¶ This method samples the hardest triplets in the batch.
- Parameters
features – tensor of shape (batch_size; embed_dim) that contains k samples for each of p classes
labels – labels of the batch, list or tensor of size (batch_size)
- Returns
p triplets of (mean_vector, positive, negative_mean_vector)
-
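The mean-vector variant can be sketched the same way. The code below is a minimal pure-Python illustration, assuming Euclidean distance; `hard_cluster_triplets` is a hypothetical helper and not the library code.

```python
import math
from collections import defaultdict


def hard_cluster_triplets(features, labels):
    """Return one (mean_vector, positive, negative_mean_vector) per class."""
    groups = defaultdict(list)
    for feature, label in zip(features, labels):
        groups[label].append(feature)
    # Anchor of each class: the mean of its feature vectors in the batch.
    means = {
        label: tuple(sum(col) / len(vecs) for col in zip(*vecs))
        for label, vecs in groups.items()
    }
    triplets = []
    for label, mean in means.items():
        # Hardest positive: the class member farthest from its own mean.
        positive = max(groups[label], key=lambda f: math.dist(f, mean))
        # Hardest negative: the closest mean vector of any other class.
        negative = min(
            (m for other, m in means.items() if other != label),
            key=lambda m: math.dist(m, mean),
        )
        triplets.append((mean, positive, negative))
    return triplets


# k = 2 samples for each of p = 3 classes.
features = [(0.0, 0.0), (2.0, 0.0), (10.0, 0.0), (14.0, 0.0), (4.0, 0.0), (5.0, 0.0)]
labels = [0, 0, 1, 1, 2, 2]
triplets = hard_cluster_triplets(features, labels)
```

The result contains p triplets, one per class, which matches the return description above.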
Loader¶
BatchLimitLoaderWrapper¶
-
class
catalyst.data.loader.
BatchLimitLoaderWrapper
(loader: torch.utils.data.dataloader.DataLoader, num_batches: Union[int, float])[source]¶ Loader wrapper. Limits number of batches used per each iteration.
For example, if you have some loader and want to use only the first 5 batches:
    import torch
    from torch.utils.data import DataLoader, TensorDataset
    from catalyst.data.loader import BatchLimitLoaderWrapper

    num_samples, num_features = int(1e4), int(1e1)
    X, y = torch.rand(num_samples, num_features), torch.rand(num_samples)
    dataset = TensorDataset(X, y)
    loader = DataLoader(dataset, batch_size=32, num_workers=1)
    loader = BatchLimitLoaderWrapper(loader, num_batches=5)
or if you would like to use only some portion of Dataloader (we use 30% in the example below):
    import torch
    from torch.utils.data import DataLoader, TensorDataset
    from catalyst.data.loader import BatchLimitLoaderWrapper

    num_samples, num_features = int(1e4), int(1e1)
    X, y = torch.rand(num_samples, num_features), torch.rand(num_samples)
    dataset = TensorDataset(X, y)
    loader = DataLoader(dataset, batch_size=32, num_workers=1)
    loader = BatchLimitLoaderWrapper(loader, num_batches=0.3)
Note
Generally speaking, this wrapper could be used with any iterator-like object. No DataLoader-specific code is used.
-
__init__
(loader: torch.utils.data.dataloader.DataLoader, num_batches: Union[int, float])[source]¶ Loader wrapper. Limits number of batches used per each iteration.
- Parameters
loader – torch dataloader.
num_batches (Union[int, float]) – number of batches to use (int), or portion of iterator (float, should be in [0;1] range)
-
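Since the wrapper is described as working with any iterator-like object, the idea can be sketched with `itertools.islice`. This is a simplified illustration, assuming that a float is interpreted as a fraction of `len(loader)`; `limit_batches` is a hypothetical helper and does not reproduce the wrapper's per-epoch behaviour.

```python
from itertools import islice


def limit_batches(loader, num_batches):
    """Yield only the first batches of any sized iterable."""
    if isinstance(num_batches, float):
        # A float is read as a fraction of the full length (assumption).
        num_batches = int(len(loader) * num_batches)
    yield from islice(loader, num_batches)


batches = [[i, i + 1] for i in range(0, 20, 2)]  # 10 toy "batches"
first_five = list(limit_batches(batches, 5))
first_30_percent = list(limit_batches(batches, 0.3))
```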
BatchPrefetchLoaderWrapper¶
-
class
catalyst.data.loader.
BatchPrefetchLoaderWrapper
(loader: torch.utils.data.dataloader.DataLoader, num_prefetches: int = None)[source]¶ Loader wrapper. Prefetches specified number of batches on the GPU.
Base usage:
    import torch
    from torch.utils.data import DataLoader, TensorDataset
    from catalyst.data import BatchPrefetchLoaderWrapper

    num_samples, num_features = int(1e4), int(1e1)
    X, y = torch.rand(num_samples, num_features), torch.rand(num_samples)
    dataset = TensorDataset(X, y)
    loader = DataLoader(dataset, batch_size=32, num_workers=1)
    loader = BatchPrefetchLoaderWrapper(loader)
Minimal working example:
    import os
    import torch
    from torch.nn import functional as F
    from torch.utils.data import DataLoader
    from catalyst import dl, metrics
    from catalyst.data.cv import ToTensor
    from catalyst.contrib.datasets import MNIST
    from catalyst.data import BatchPrefetchLoaderWrapper

    class CustomRunner(dl.Runner):

        def handle_batch(self, batch):
            # model train/valid step
            x, y = batch
            y_hat = self.model(x.view(x.size(0), -1))

            loss = F.cross_entropy(y_hat, y)
            accuracy01 = metrics.accuracy(y_hat, y, topk=(1, ))
            self.batch_metrics.update(
                {"loss": loss, "accuracy01": accuracy01}
            )

            if self.is_train_loader:
                loss.backward()
                self.optimizer.step()
                self.optimizer.zero_grad()

    model = torch.nn.Linear(28 * 28, 10)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.02)

    batch_size = 32
    loaders = {
        "train": DataLoader(
            MNIST(
                os.getcwd(),
                train=True,
                download=True,
                transform=ToTensor()
            ),
            batch_size=batch_size),
        "valid": DataLoader(
            MNIST(
                os.getcwd(),
                train=False,
                download=True,
                transform=ToTensor()
            ),
            batch_size=batch_size),
    }
    loaders = {
        k: BatchPrefetchLoaderWrapper(v) for k, v in loaders.items()
    }

    runner = CustomRunner()
    # model training
    runner.train(
        model=model,
        optimizer=optimizer,
        loaders=loaders,
        logdir="./logs",
        num_epochs=5,
        verbose=True,
        load_best_on_end=True,
    )
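The general idea of prefetching, loading the next batches while the current one is being consumed, can be illustrated on the CPU with a bounded queue and a worker thread. This is only a sketch of the buffering pattern under our own assumptions; `prefetch` is a hypothetical helper and says nothing about how the wrapper moves data to the GPU.

```python
import queue
import threading


def prefetch(iterable, num_prefetches=2):
    """Yield items from `iterable` while a worker thread loads ahead."""
    buffer = queue.Queue(maxsize=num_prefetches)
    sentinel = object()  # marks the end of the stream

    def worker():
        for item in iterable:
            buffer.put(item)  # blocks while the buffer is full
        buffer.put(sentinel)

    threading.Thread(target=worker, daemon=True).start()
    while True:
        item = buffer.get()
        if item is sentinel:
            return
        yield item


batches = list(prefetch(range(5), num_prefetches=2))
```

The consumer sees the items in their original order; only the timing of the loading changes.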
Samplers¶
BalanceBatchSampler¶
-
class
catalyst.data.sampler.
BalanceBatchSampler
(labels: Union[List[int], numpy.ndarray], p: int, k: int)[source]¶ This kind of sampler can be used for both metric learning and classification task.
Sampler with the given strategy for the C unique classes dataset:
- Selection P of C classes for the 1st batch
- Selection K instances for each class for the 1st batch
- Selection P of C - P remaining classes for 2nd batch
- Selection K instances for each class for the 2nd batch
- …
The epoch ends when there are no classes left. So, the batch size is P * K except the last one.
Thus, in each epoch, all the classes will be selected once, but this does not mean that all the instances will be selected during the epoch.
One of the purposes of this sampler is to be used for forming triplets and pos/neg pairs inside the batch. To guarantee the existence of these pairs in the batch, P and K should be > 1. (1)
Behavior in corner cases:
- If a class does not contain K instances, a choice will be made with repetition.
- If C % P == 1, then one of the classes should be dropped, otherwise statement (1) will not be met.
This type of sampling can be found in the classical paper of Person Re-Id, where P equals 32 and K equals 4: In Defense of the Triplet Loss for Person Re-Identification.
- Parameters
labels – list of class labels for each element in the dataset
p – number of classes in a batch, should be > 1
k – number of instances of each class in a batch, should be > 1
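The P x K strategy and its two corner cases can be sketched in plain Python. This is a minimal illustration under our own assumptions (a seeded `random.Random`, indices returned as lists); `balanced_batches` is a hypothetical helper, not the sampler's implementation.

```python
import random
from collections import defaultdict


def balanced_batches(labels, p, k, seed=0):
    """Return batches of dataset indices: p classes per batch, k items each."""
    rng = random.Random(seed)
    by_class = defaultdict(list)
    for index, label in enumerate(labels):
        by_class[label].append(index)

    classes = list(by_class)
    rng.shuffle(classes)
    batches = []
    for start in range(0, len(classes), p):
        chosen = classes[start:start + p]
        if len(chosen) < 2:
            break  # a single leftover class cannot form pos/neg pairs
        batch = []
        for label in chosen:
            pool = by_class[label]
            if len(pool) >= k:
                batch += rng.sample(pool, k)     # without repetition
            else:
                batch += rng.choices(pool, k=k)  # with repetition
        batches.append(batch)
    return batches


# 5 classes; classes 3 and 4 have a single instance each.
labels = [0, 0, 0, 1, 1, 2, 2, 2, 3, 4]
batches = balanced_batches(labels, p=2, k=2)
```

With C = 5 and P = 2, C % P == 1, so the leftover class is dropped and two batches of P * K = 4 indices remain.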
BalanceClassSampler¶
-
class
catalyst.data.sampler.
BalanceClassSampler
(labels: List[int], mode: Union[str, int] = 'downsampling')[source]¶ Allows you to create stratified sample on unbalanced classes.
- Parameters
labels – list of class labels for each element in the dataset
mode – Strategy to balance classes. Must be one of [downsampling, upsampling]
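The two modes can be sketched as follows. This is a simplified illustration, assuming that downsampling targets the size of the rarest class and upsampling the size of the most frequent one; `balanced_indices` is a hypothetical helper, not the library code.

```python
import random
from collections import defaultdict


def balanced_indices(labels, mode="downsampling", seed=0):
    """Return shuffled dataset indices with the same count for every class."""
    rng = random.Random(seed)
    by_class = defaultdict(list)
    for index, label in enumerate(labels):
        by_class[label].append(index)
    sizes = [len(pool) for pool in by_class.values()]
    # Target size per class: the rarest class for downsampling,
    # the most frequent class for upsampling (assumption).
    per_class = min(sizes) if mode == "downsampling" else max(sizes)
    indices = []
    for pool in by_class.values():
        if len(pool) >= per_class:
            indices += rng.sample(pool, per_class)     # subsample
        else:
            indices += rng.choices(pool, k=per_class)  # repeat items
    rng.shuffle(indices)
    return indices


labels = [0] * 6 + [1] * 2
down = balanced_indices(labels, "downsampling")
up = balanced_indices(labels, "upsampling")
```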
DistributedSamplerWrapper¶
-
class
catalyst.data.sampler.
DistributedSamplerWrapper
(sampler, num_replicas: Optional[int] = None, rank: Optional[int] = None, shuffle: bool = True)[source]¶ Wrapper over Sampler for distributed training. Allows you to use any sampler in distributed mode.
It is especially useful in conjunction with torch.nn.parallel.DistributedDataParallel. In such case, each process can pass a DistributedSamplerWrapper instance as a DataLoader sampler, and load a subset of subsampled data of the original dataset that is exclusive to it.
Note
Sampler is assumed to be of constant size.
-
__init__
(sampler, num_replicas: Optional[int] = None, rank: Optional[int] = None, shuffle: bool = True)[source]¶ - Parameters
sampler – Sampler used for subsampling
num_replicas (int, optional) – Number of processes participating in distributed training
rank (int, optional) – Rank of the current process within
num_replicas
shuffle (bool, optional) – If true (default), sampler will shuffle the indices
-
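How each process ends up with its own exclusive subset can be sketched with a strided slice over the indices produced by the wrapped sampler. The padding and striding below follow the scheme commonly used for distributed samplers and are an assumption here; `shard_indices` is a hypothetical helper.

```python
import math


def shard_indices(indices, num_replicas, rank):
    """Return the part of `indices` that process `rank` would iterate over."""
    indices = list(indices)
    per_replica = math.ceil(len(indices) / num_replicas)
    total = per_replica * num_replicas
    # Pad by repeating the head so every process gets the same count.
    indices += indices[: total - len(indices)]
    return indices[rank:total:num_replicas]


sampled = [7, 3, 9, 1, 5]  # indices produced by some wrapped sampler
shards = [shard_indices(sampled, num_replicas=2, rank=r) for r in range(2)]
```

Each rank receives three indices; the padded element `7` appears in both shards because five indices do not divide evenly between two processes.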
DynamicBalanceClassSampler¶
-
class
catalyst.data.sampler.
DynamicBalanceClassSampler
(labels: List[Union[str, int]], exp_lambda: float = 0.9, start_epoch: int = 0, max_d: Optional[int] = None, mode: Union[str, int] = 'downsampling', ignore_warning: bool = False)[source]¶ This kind of sampler can be used for classification tasks with significant class imbalance.
The idea of this sampler is that we start with the original class distribution and gradually move to a uniform class distribution, as with downsampling.
Let's define :math:`D_i = \#C_i / \#C_{min}`, where :math:`\#C_i` is the size of class i and :math:`\#C_{min}` is the size of the rarest class, so :math:`D_i` defines the class distribution. Also define :math:`g(n_{epoch})` as an exponential scheduler. On each epoch, the current :math:`D_i` is calculated as :math:`current\ D_i = D_i^{g(n_{epoch})}`; after this, data is sampled according to this distribution.
Notes
At the end of training, epochs will contain only min_size_class * n_classes examples, so it may not be necessary to run validation on each epoch. For this reason, use ControlFlowCallback.
Examples
>>> import torch
>>> import numpy as np

>>> from catalyst.data import DynamicBalanceClassSampler
>>> from torch.utils import data

>>> features = torch.Tensor(np.random.random((200, 100)))
>>> labels = np.random.randint(0, 4, size=(200,))
>>> sampler = DynamicBalanceClassSampler(labels)
>>> labels = torch.LongTensor(labels)
>>> dataset = data.TensorDataset(features, labels)
>>> loader = data.dataloader.DataLoader(dataset, sampler=sampler, batch_size=8)

>>> for batch in loader:
>>>     b_features, b_labels = batch
Sampler was inspired by https://arxiv.org/abs/1901.06783
-
__init__
(labels: List[Union[str, int]], exp_lambda: float = 0.9, start_epoch: int = 0, max_d: Optional[int] = None, mode: Union[str, int] = 'downsampling', ignore_warning: bool = False)[source]¶ - Parameters
labels – list of labels for each element in the dataset
exp_lambda – exponent figure for schedule
start_epoch – start epoch number, can be useful for multi-stage experiments
max_d – if not None, limit on the difference between the most frequent and the rarest classes, heuristic
mode – number of samples per class at the end of training. Must be "downsampling" or a number. Before changing it, make sure that you understand how it works
ignore_warning – ignore warning about min class size
-
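A small worked computation may help to see how the class ratios shrink over time. The sketch assumes the scheduler has the form g(n_epoch) = exp_lambda ** n_epoch, which is our reading of "exponential scheduler" and is not confirmed by the text above; `class_ratios` and the class sizes are hypothetical.

```python
def class_ratios(class_sizes, epoch, exp_lambda=0.9):
    """Return the current D_i for every class at the given epoch."""
    smallest = min(class_sizes.values())
    g = exp_lambda ** epoch  # assumed form of the exponential scheduler
    return {c: (size / smallest) ** g for c, size in class_sizes.items()}


sizes = {"a": 800, "b": 200, "c": 100}
start = class_ratios(sizes, epoch=0)  # original distribution
late = class_ratios(sizes, epoch=50)  # close to uniform
```

At epoch 0 the exponent is 1, so the ratios equal the original D_i. As the epoch grows the exponent tends to 0 and every ratio tends to 1, that is, to a uniform class distribution.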
DynamicLenBatchSampler¶
-
class
catalyst.data.sampler.
DynamicLenBatchSampler
(sampler: torch.utils.data.sampler.Sampler[int][int], batch_size: int, drop_last: bool)[source]¶ A dynamic batch length data sampler. Should be used with catalyst.utils.trim_tensors.
Adapted from Dynamic minibatch trimming to improve BERT training speed.
- Parameters
sampler – Base sampler.
batch_size – Size of minibatch.
drop_last – If True, the sampler will drop the last batch if its size would be less than batch_size.
Usage example:
>>> from torch.utils import data
>>> from catalyst.data import DynamicLenBatchSampler
>>> from catalyst import utils

>>> dataset = data.TensorDataset(
>>>     input_ids, input_mask, segment_ids, labels
>>> )

>>> sampler_ = data.RandomSampler(dataset)
>>> sampler = DynamicLenBatchSampler(
>>>     sampler_, batch_size=16, drop_last=False
>>> )
>>> loader = data.DataLoader(dataset, batch_sampler=sampler)

>>> for batch in loader:
>>>     tensors = utils.trim_tensors(batch)
>>>     b_input_ids, b_input_mask, b_segment_ids, b_labels = \
>>>         tuple(t.to(device) for t in tensors)
-
__init__
(sampler: torch.utils.data.sampler.Sampler[int][int], batch_size: int, drop_last: bool) → None¶
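The trimming step that accompanies this sampler can be sketched on plain lists. The code assumes right-padded sequences with a padding id of 0 and cuts every sequence to the longest real length in the batch; `trim_batch` is a hypothetical helper and not `catalyst.utils.trim_tensors`.

```python
def trim_batch(input_ids, *others, pad_id=0):
    """Cut right-padded sequences down to the longest real length in the batch."""
    # Longest number of non-padding tokens among the rows of input_ids.
    max_len = max(sum(token != pad_id for token in row) for row in input_ids)
    return tuple([row[:max_len] for row in seq] for seq in (input_ids, *others))


input_ids = [[5, 6, 0, 0, 0], [7, 8, 9, 0, 0]]
input_mask = [[1, 1, 0, 0, 0], [1, 1, 1, 0, 0]]
trimmed_ids, trimmed_mask = trim_batch(input_ids, input_mask)
```

Batches that group sequences of similar length lose less computation to padding after such a trim, which is the motivation for pairing the two.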
MiniEpochSampler¶
-
class
catalyst.data.sampler.
MiniEpochSampler
(data_len: int, mini_epoch_len: int, drop_last: bool = False, shuffle: str = None)[source]¶ Sampler that iterates over the dataset in mini epochs of mini_epoch_len samples each.
- Parameters
data_len – Size of the dataset
mini_epoch_len – Num samples from the dataset used in one mini epoch.
drop_last – If True, the sampler will drop the last batches if their size would be less than batches_per_epoch
shuffle – one of "per_mini_epoch", "per_epoch", or None. The sampler will shuffle indices:
"per_mini_epoch" – every mini epoch (every __iter__ call)
"per_epoch" – every real epoch
None – don't shuffle
Example
>>> MiniEpochSampler(len(dataset), mini_epoch_len=100)
>>> MiniEpochSampler(len(dataset), mini_epoch_len=100, drop_last=True)
>>> MiniEpochSampler(len(dataset), mini_epoch_len=100,
>>>     shuffle="per_epoch")
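The notion of a mini epoch can be sketched as a sampler whose every `__iter__` call returns the next slice of the dataset indices. The sketch covers only `shuffle=None` and `shuffle="per_epoch"` and ignores `drop_last`; `MiniEpochSketch` and its `seed` argument are hypothetical and the wrap-around logic is an assumption.

```python
import random


class MiniEpochSketch:
    """Each iteration yields the next `mini_epoch_len` indices of the dataset."""

    def __init__(self, data_len, mini_epoch_len, shuffle=None, seed=0):
        self.indices = list(range(data_len))
        self.mini_epoch_len = mini_epoch_len
        self.shuffle = shuffle
        self.rng = random.Random(seed)
        self.position = 0

    def __iter__(self):
        if self.position >= len(self.indices):
            self.position = 0  # a new real epoch starts
            if self.shuffle == "per_epoch":
                self.rng.shuffle(self.indices)
        chunk = self.indices[self.position:self.position + self.mini_epoch_len]
        self.position += self.mini_epoch_len
        return iter(chunk)


sampler = MiniEpochSketch(data_len=10, mini_epoch_len=4)
mini_epochs = [list(sampler) for _ in range(4)]
```

With 10 samples and mini epochs of 4, the third mini epoch is shorter and the fourth starts a new pass over the data.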
Transforms¶
Normalize¶
-
class
catalyst.data.transforms.
Normalize
(mean, std, inplace=False)[source]¶ Bases:
object
Normalize a tensor image with mean and standard deviation.
Given mean: (mean[1],...,mean[n]) and std: (std[1],...,std[n]) for n channels, this transform will normalize each channel of the input torch.*Tensor, i.e., output[channel] = (input[channel] - mean[channel]) / std[channel]
Note
This transform acts out of place, i.e., it does not mutate the input tensor.
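The per-channel formula can be checked by hand on nested lists. This is a minimal sketch of the arithmetic only; `normalize` is a hypothetical helper that works on plain Python lists rather than tensors.

```python
def normalize(image, mean, std):
    """image: list of channels, each a flat list of pixel values."""
    # Build new lists so the input is left untouched (out of place).
    return [
        [(value - m) / s for value in channel]
        for channel, m, s in zip(image, mean, std)
    ]


image = [[0.0, 0.5, 1.0], [0.25, 0.5, 0.75]]  # 2 channels, 3 pixels each
normalized = normalize(image, mean=[0.5, 0.5], std=[0.5, 0.25])
```

Both channels map to values centred on zero, and `image` itself is unchanged.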
ToTensor¶
-
class
catalyst.data.transforms.
ToTensor
[source]¶ Bases:
object
Convert a numpy.ndarray to a tensor. Converts a numpy.ndarray (H x W x C) in the range [0, 255] to a torch.FloatTensor of shape (C x H x W) in the range [0.0, 1.0] if the numpy.ndarray has dtype = np.uint8. In other cases, tensors are returned without scaling.
-
__init__
()¶ Initialize self. See help(type(self)) for accurate signature.
-
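The layout change from H x W x C to C x H x W together with the scaling to [0.0, 1.0] can be sketched on nested lists. The code illustrates the uint8 case only; `to_chw_float` is a hypothetical helper and does not use NumPy or torch.

```python
def to_chw_float(image_hwc):
    """image_hwc: nested lists [H][W][C] of integers in [0, 255]."""
    height, width = len(image_hwc), len(image_hwc[0])
    channels = len(image_hwc[0][0])
    # Move the channel axis to the front and scale each value to [0.0, 1.0].
    return [
        [[image_hwc[y][x][c] / 255 for x in range(width)] for y in range(height)]
        for c in range(channels)
    ]


image = [[[0, 255, 51], [255, 0, 102]]]  # H=1, W=2, C=3
tensor_like = to_chw_float(image)
```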
Contrib¶
Augmentors¶
Augmentor¶
-
class
catalyst.contrib.data.augmentor.
Augmentor
(dict_key: str, augment_fn: Callable, input_key: str = None, output_key: str = None, **kwargs)[source]¶ Augmentation abstraction to use with data dictionaries.
-
__init__
(dict_key: str, augment_fn: Callable, input_key: str = None, output_key: str = None, **kwargs)[source]¶ Augmentation abstraction to use with data dictionaries.
- Parameters
dict_key – key to transform
augment_fn – augmentation function to use
input_key – augment_fn input key
output_key – augment_fn output key
**kwargs – default kwargs for the augmentation function
-
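How the four arguments might cooperate is sketched below. The sketch assumes that `input_key` names the keyword under which the value is passed to `augment_fn`, and that `output_key` selects a field from the returned dict; both behaviours are assumptions, and `AugmentorSketch` and `flip` are hypothetical.

```python
class AugmentorSketch:
    """Hypothetical stand-in for Augmentor; not the Catalyst source."""

    def __init__(self, dict_key, augment_fn, input_key=None, output_key=None, **kwargs):
        self.dict_key = dict_key
        self.augment_fn = augment_fn
        self.input_key = input_key
        self.output_key = output_key
        self.kwargs = kwargs  # default kwargs forwarded on every call

    def __call__(self, dict_):
        value = dict_[self.dict_key]
        if self.input_key is not None:
            # Pass the value as a keyword argument named input_key.
            output = self.augment_fn(**{self.input_key: value}, **self.kwargs)
        else:
            output = self.augment_fn(value, **self.kwargs)
        if self.output_key is not None:
            output = output[self.output_key]
        return {**dict_, self.dict_key: output}


def flip(image, times=1):
    # Toy augmentation in a keyword-in, dict-out style.
    for _ in range(times):
        image = image[::-1]
    return {"image": image}


augmentor = AugmentorSketch("features", flip, input_key="image", output_key="image")
result = augmentor({"features": [1, 2, 3], "targets": 0})
```

Only the value under `dict_key` is changed; the other entries of the data dictionary pass through untouched.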
AugmentorKeys¶
Readers¶
Readers are the abstraction for your dataset. They can open an element from the dataset and transform it into the data needed by your network, for example, open an image by path, or read a string and tokenize it.
IReader¶
ImageReader (CV)¶
-
class
catalyst.contrib.data.cv.reader.
ImageReader
(input_key: str, output_key: Optional[str] = None, rootpath: Optional[str] = None, grayscale: bool = False)[source]¶ Bases:
catalyst.contrib.data.reader.IReader
Image reader abstraction. Reads images from a csv dataset.
-
__init__
(input_key: str, output_key: Optional[str] = None, rootpath: Optional[str] = None, grayscale: bool = False)[source]¶ - Parameters
input_key – key to use from annotation dict
output_key – key to use to store the result, default:
input_key
rootpath – path to images dataset root directory (so you can use relative paths in annotations)
grayscale – flag if you need to work only with grayscale images
-
LambdaReader¶
-
class
catalyst.contrib.data.reader.
LambdaReader
(input_key: str, output_key: Optional[str] = None, lambda_fn: Optional[Callable] = None, **kwargs)[source]¶ Reader abstraction with a lambda encoder. Can read an element from the dataset and apply the lambda_fn function to it.
-
__init__
(input_key: str, output_key: Optional[str] = None, lambda_fn: Optional[Callable] = None, **kwargs)[source]¶ - Parameters
input_key – input key to use from annotation dict
output_key – output key to use to store the result
lambda_fn – encode function to use to prepare your data (for example convert chars/words/tokens to indices, etc)
kwargs – kwargs for encode function
-
MaskReader (CV)¶
-
class
catalyst.contrib.data.cv.reader.
MaskReader
(input_key: str, output_key: Optional[str] = None, rootpath: Optional[str] = None, clip_range: Tuple[Union[int, float], Union[int, float]] = (0, 1))[source]¶ Bases:
catalyst.contrib.data.reader.IReader
Mask reader abstraction. Reads masks from a csv dataset.
-
__init__
(input_key: str, output_key: Optional[str] = None, rootpath: Optional[str] = None, clip_range: Tuple[Union[int, float], Union[int, float]] = (0, 1))[source]¶ - Parameters
input_key – key to use from annotation dict
output_key – key to use to store the result, default:
input_key
rootpath – path to images dataset root directory (so you can use relative paths in annotations)
clip_range (Tuple[int, int]) – lower and upper interval edges, image values outside the interval are clipped to the interval edges
-
ScalarReader¶
-
class
catalyst.contrib.data.reader.
ScalarReader
(input_key: str, output_key: Optional[str] = None, dtype: Type = <class 'numpy.float32'>, default_value: float = None, one_hot_classes: int = None, smoothing: float = None)[source]¶ Numeric data reader abstraction. Reads a single float, int, str or other from data
-
__init__
(input_key: str, output_key: Optional[str] = None, dtype: Type = <class 'numpy.float32'>, default_value: float = None, one_hot_classes: int = None, smoothing: float = None)[source]¶ - Parameters
input_key – input key to use from annotation dict
output_key – output key to use to store the result, default:
input_key
dtype – datatype of scalar values to use
default_value – default value to use if something goes wrong
one_hot_classes – number of one-hot classes
smoothing (float, optional) – if specified applies label smoothing to one_hot classes
-
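The interplay of `one_hot_classes` and `smoothing` can be illustrated with one common formulation of label smoothing, in which the target class keeps 1 - smoothing and the remaining mass is spread evenly over the other classes. Whether the reader uses exactly this formula is an assumption; `one_hot` is a hypothetical helper.

```python
def one_hot(label, num_classes, smoothing=None):
    """Return a one-hot list, optionally with label smoothing applied."""
    if smoothing is None:
        return [1.0 if i == label else 0.0 for i in range(num_classes)]
    # Assumed formulation: spread `smoothing` evenly over the other classes.
    off_value = smoothing / (num_classes - 1)
    return [1.0 - smoothing if i == label else off_value for i in range(num_classes)]


plain = one_hot(2, 4)
smoothed = one_hot(2, 4, smoothing=0.3)
```

The smoothed vector still sums to one, with most of the mass on the original class.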
ReaderCompose¶
Datasets (CV)¶
ImageFolderDataset¶
-
class
catalyst.contrib.data.cv.dataset.
ImageFolderDataset
(rootpath: str, target_key: str = 'targets', dir2class: Optional[Mapping[str, int]] = None, dict_transform: Optional[Callable[[Dict], Dict]] = None)[source]¶ Bases:
catalyst.data.dataset.torch.PathsDataset
Dataset class that derives targets from samples' filesystem paths. The dataset structure should be the following:
    rootpath/
    |-- class1/  # folder of N images
    |   |-- image11
    |   |-- image12
    |   ...
    |   `-- image1N
    ...
    `-- classM/  # folder of K images
        |-- imageM1
        |-- imageM2
        ...
        `-- imageMK
-
__init__
(rootpath: str, target_key: str = 'targets', dir2class: Optional[Mapping[str, int]] = None, dict_transform: Optional[Callable[[Dict], Dict]] = None) → None[source]¶ Constructor method for the ImageFolderDataset class.
- Parameters
rootpath – root directory of dataset
target_key – key to use to store target label
dir2class (Mapping[str, int], optional) – mapping from folder name to class index
dict_transform (Callable[[Dict], Dict], optional) – transforms to use on dict
-
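The relation between the folder layout and the `dir2class` mapping can be sketched with the standard library. The example creates a throwaway directory tree and derives a class index from each file's parent folder; the folder names and the hand-written mapping are hypothetical, and nothing here reflects how the dataset behaves when `dir2class` is omitted.

```python
import tempfile
from pathlib import Path

# Hand-written mapping from folder name to class index (hypothetical).
dir2class = {"cats": 0, "dogs": 1}

with tempfile.TemporaryDirectory() as rootpath:
    # Build rootpath/<class>/<image> with empty placeholder files.
    for class_dir, name in [("cats", "a.jpg"), ("cats", "b.jpg"), ("dogs", "c.jpg")]:
        folder = Path(rootpath) / class_dir
        folder.mkdir(exist_ok=True)
        (folder / name).touch()

    files = sorted(Path(rootpath).glob("*/*"))
    # The target of each sample comes from the name of its parent folder.
    samples = [(f.name, dir2class[f.parent.name]) for f in files]
```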