# Dataflow¶

## Base dataflow¶

Catalyst uses the “key-value is all you need” approach. In other words, the output of your Dataset/Dataloader should be a key-value (python dict).

Example of dataflow:

class MyDataset:

def __get_item__(self, index):
...
return {"features": np.ndarray, "targets": np.ndarray}

class MyModel:

def forward(self, features):
...
return logits

class MyRunner:

def handle_batch(self, batch):
# on this step we also have self.batch = batch = {"features": ..., "targets": ...}
logits = self.model(batch["features"])
self.batch.update({"logits": logits})
# this is useful for other components of the pipeline

model = MyModel()
runner = MyRunner()
runner.train(...)


Note

SupervisedRunner has data preprocessing features to transform tuple/list-based data into key-value.

Such approach is easily extensible for any number of features, targets and very convenient to read, thanks to “automatic naming documentation” - keys for the values:

class MyDataset:

def __get_item__(self, index):
...
return {"features": np.ndarray, "extra_features": np.ndarray, "targets": np.ndarray}

class MyModel:

def forward(self, features, extra_features):
...
return logits

class MyRunner:

def handle_batch(self, batch):
# on this step we also have
# self.batch = batch = {"features": ..., "extra_features": ...,"targets": ...}
logits = self.model(batch["features"], batch["extra_features"])
self.batch.update()"logits": logits})
# this is useful for other components of the pipeline

model = MyModel()
runner = MyRunner()
runner.train(...)


Moreover, if some of the features are not required anymore - you don’t have to rewrite your code:

class MyDataset:

def __get_item__(self, index):
...
return {"features": np.ndarray, "extra_features": np.ndarray, "targets": np.ndarray}

class MyModel:

def forward(self, features):
...
return logits

class MyRunner:

def handle_batch(self, batch):
# on this step we also have
# self.batch = batch = {"features": ..., "extra_features": ...,"targets": ...}
logits = self.model(batch["features"])
self.batch.update({"logits": logits})
# this is useful for other components of the pipeline

model = MyModel()
runner = MyRunner()
runner.train(...)


Key-value storage also can be used to store the datasets/loaders for the experiment. In this case we also need to use OrderedDict to ensure correct epoch handling - that your model will firstly train on some train dataset and only then will be evaluated on some valid dataset:

train_loader = MyDataset(...)
model = MyModel()
runner = MyRunner()


Catalyst uses the following “automatic naming documentation” for loader keys handling:

• if loader_key starts with “train” - is’s train datasoure, we need to run forward and backward passes on it.

• if loader_key starts with “valid” - is’s validation datasoure, we need to run forward, but not the backward pass on it.

• if loader_key starts with “infer” - is’s inference datasoure, we need to run forward, but not the backward pass on it.

## Multiple datasources¶

Thanks to key-value approach, it’s possible to handle any number of datasets/loader without code changes or tricks with Datasets concatenation, etc:

train_loader = MyDataset(...)
)
model = MyModel()
runner = MyRunner()


What is even more interesting, you could also do something like:

train_loader = MyDataset(...)

)
model = MyModel()
runner = MyRunner()


Once again, it’s also valid to do something like:

train_loader = MyDataset(...)
)
model = MyModel()
runner = MyRunner()


In case of multiple loaders, you could select one of them for model selection with valid_loader, valid_metric and minimize_valid_metric params in the runner.train. For example, to use valid2 loaders as your model selection one you could do the following:

train_loader = MyDataset(...)
)
model = MyModel()
criterion = ...
optimizer = ...
runner = MyRunner()
runner.train(
model=model,
criterion=criterion,
optimizer=optimizer,
valid_metric="loss",
minimize_valid_metric=True
)


Note

By default, Catalyst doesn’t suppose to use valid_loader, valid_metric and minimize_valid_metric for model selection.

## Metric for model selection¶

Suppose, you are using set of different metrics in your pipeline:

class MyRunner:

def handle_batch(self, batch):
# on this step we also have self.batch = batch = {"features": ..., "targets": ...}
logits = self.model(batch["features"])
self.output = {"logits": logits}
# this is useful for other components of the pipeline

loaders = {"train": ..., "valid": ...}
model = ...
criterion = ...
optimizer = ...
runner = MyRunner()
runner.train(
model=model,
criterion=criterion,
optimizer=optimizer,
callbacks=[
AccuracyCallback(input_key="logits", target_key="targets", topk=(1, 3))
]
)


You could select one for model selection with valid_loader, valid_metric and minimize_valid_metric params in the runner.train. For example, to use accuracy01 metric as your model selection one you could do the following:

class MyRunner:

def handle_batch(self, batch):
# on this step we also have self.batch = batch = {"features": ..., "targets": ...}
logits = self.model(batch["features"])
self.output = {"logits": logits}
# this is useful for other components of the pipeline

loaders = {"train": ..., "valid": ...}
model = ...
criterion = ...
optimizer = ...
runner = MyRunner()
# as far as we would like to maximize our model accuracy...
runner.train(
model=model,
criterion=criterion,
optimizer=optimizer,
valid_metric="accuracy01",
minimize_valid_metric=False,
callbacks=[
AccuracyCallback(input_key="logits", target_key="targets", topk=(1, 3))
]
)


Note

By default, Catalyst doesn’t suppose to use valid_loader, valid_metric and minimize_valid_metric for model selection.

## Use part of the data¶

If you would like to use only some part of your data from the loader (for example, you would like to overfit for one small portion of the data to check your pipeline), you could use BatchLimitLoaderWrapper:

train_loader = BatchLimitLoaderWrapper(MyDataset(...), num_batches=1)
model = MyModel()
runner = MyRunner()


As a more user-friendly approach with runner.train:

train_loader = MyDataset(...)
model = MyModel()
runner = MyRunner()
# here we overfit for one batch per loader


And more convenient and customizable way:

train_loader = MyDataset(...)
# here we overfit for 10 batches in train loader
# and half of the valid loader