Engines¶
For an overview of the engines, see the examples/engines section.
AMP¶
AMPEngine¶
class catalyst.engines.amp.AMPEngine(device: str = 'cuda', scaler_kwargs: Dict[str, Any] = None)
Bases: catalyst.engines.torch.DeviceEngine
PyTorch AMP single-device training engine.
- Parameters
device – device to use, default is “cuda”.
scaler_kwargs – parameters for torch.cuda.amp.GradScaler. Possible parameters: https://pytorch.org/docs/stable/amp.html#torch.cuda.amp.GradScaler
Examples:
    from catalyst import dl

    runner = dl.SupervisedRunner()
    runner.train(
        engine=dl.AMPEngine("cuda:1"),
        ...
    )

    from catalyst import dl

    class MyRunner(dl.IRunner):
        # ...
        def get_engine(self):
            return dl.AMPEngine("cuda:1")
        # ...

    args:
        logs: ...

    model:
        _target_: ...
        ...

    engine:
        _target_: AMPEngine
        device: cuda:1

    stages:
        ...
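The scaler_kwargs above tune dynamic loss scaling. As a rough illustration of what growth_factor, backoff_factor and growth_interval control, the sketch below models the scale update in plain Python. ScaleState is a hypothetical stand-in written for this page, not part of Catalyst or PyTorch; the default values mirror the documented torch.cuda.amp.GradScaler defaults.

```python
from dataclasses import dataclass


@dataclass
class ScaleState:
    """Toy model of loss-scale bookkeeping (hypothetical, not the real GradScaler)."""

    scale: float = 65536.0       # GradScaler default init_scale
    growth_factor: float = 2.0   # GradScaler default growth_factor
    backoff_factor: float = 0.5  # GradScaler default backoff_factor
    growth_interval: int = 2000  # GradScaler default growth_interval
    good_steps: int = 0          # consecutive steps without inf/NaN gradients

    def update(self, found_inf: bool) -> float:
        """Apply one scale update after an optimizer step and return the new scale."""
        if found_inf:
            # Overflow detected: shrink the scale and restart the streak.
            self.scale *= self.backoff_factor
            self.good_steps = 0
        else:
            self.good_steps += 1
            if self.good_steps == self.growth_interval:
                # A full streak of clean steps: grow the scale.
                self.scale *= self.growth_factor
                self.good_steps = 0
        return self.scale


# Two clean steps grow the scale, one overflow halves it.
state = ScaleState(growth_factor=1.5, growth_interval=2)
history = [state.update(found_inf) for found_inf in (False, False, True, False)]
```

A smaller growth_factor, as in scaler_kwargs={"growth_factor": 1.5}, makes the scale recover more gently after an overflow.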
DataParallelAMPEngine¶
class catalyst.engines.amp.DataParallelAMPEngine(scaler_kwargs: Dict[str, Any] = None)
Bases: catalyst.engines.amp.AMPEngine
AMP multi-GPU training device engine.
- Parameters
scaler_kwargs – parameters for torch.cuda.amp.GradScaler. Possible parameters: https://pytorch.org/docs/stable/amp.html#torch.cuda.amp.GradScaler
Examples:
    from catalyst import dl

    runner = dl.SupervisedRunner()
    runner.train(
        engine=dl.DataParallelAMPEngine(),
        ...
    )

    from catalyst import dl

    class MyRunner(dl.IRunner):
        # ...
        def get_engine(self):
            return dl.DataParallelAMPEngine()
        # ...

    args:
        logs: ...

    model:
        _target_: ...
        ...

    engine:
        _target_: DataParallelAMPEngine

    stages:
        ...
DistributedDataParallelAMPEngine¶
class catalyst.engines.amp.DistributedDataParallelAMPEngine(address: str = None, port: Union[str, int] = None, sync_bn: bool = False, ddp_kwargs: Dict[str, Any] = None, process_group_kwargs: Dict[str, Any] = None, scaler_kwargs: Dict[str, Any] = None)
Bases: catalyst.engines.torch.DistributedDataParallelEngine
Distributed AMP multi-GPU training device engine.
- Parameters
address – address to use for backend.
port – port to use for backend.
sync_bn – boolean flag for batchnorm synchronization during distributed training. If True, applies PyTorch convert_sync_batchnorm to the model for native torch distributed only. Default is False.
ddp_kwargs – parameters for torch.nn.parallel.DistributedDataParallel. More info here: https://pytorch.org/docs/stable/generated/torch.nn.parallel.DistributedDataParallel.html#torch.nn.parallel.DistributedDataParallel
process_group_kwargs – parameters for torch.distributed.init_process_group. More info here: https://pytorch.org/docs/stable/distributed.html#torch.distributed.init_process_group
scaler_kwargs – parameters for torch.cuda.amp.GradScaler. Possible parameters: https://pytorch.org/docs/stable/amp.html#torch.cuda.amp.GradScaler
Examples:
    from catalyst import dl

    runner = dl.SupervisedRunner()
    runner.train(
        engine=dl.DistributedDataParallelAMPEngine(),
        ...
    )

    from catalyst import dl

    class MyRunner(dl.IRunner):
        # ...
        def get_engine(self):
            return dl.DistributedDataParallelAMPEngine(
                address="0.0.0.0",
                port=23234,
                ddp_kwargs={"find_unused_parameters": False},
                process_group_kwargs={"port": 12345},
                scaler_kwargs={"growth_factor": 1.5}
            )
        # ...

    args:
        logs: ...

    model:
        _target_: ...
        ...

    engine:
        _target_: DistributedDataParallelAMPEngine
        address: 0.0.0.0
        port: 23234
        ddp_kwargs:
            find_unused_parameters: false
        process_group_kwargs:
            port: 12345
        scaler_kwargs:
            growth_factor: 1.5

    stages:
        ...
Apex¶
APEXEngine¶
class catalyst.engines.apex.APEXEngine(device: str = 'cuda', apex_kwargs: Dict[str, Any] = None)
Bases: catalyst.engines.torch.DeviceEngine
Apex single-device training engine.
- Parameters
device – device to use, default is “cuda”.
apex_kwargs – parameters for apex.amp.initialize except models and optimizers (they will be forwarded automatically). Docs for apex.amp.initialize: https://nvidia.github.io/apex/amp.html#apex.amp.initialize
Examples:
    from catalyst import dl

    runner = dl.SupervisedRunner()
    runner.train(
        engine=dl.APEXEngine(apex_kwargs=dict(opt_level="O1", keep_batchnorm_fp32=False)),
        ...
    )

    from catalyst import dl

    class MyRunner(dl.IRunner):
        # ...
        def get_engine(self):
            return dl.APEXEngine(apex_kwargs=dict(opt_level="O1", keep_batchnorm_fp32=False))
        # ...

    args:
        logs: ...

    model:
        _target_: ...
        ...

    engine:
        _target_: APEXEngine
        apex_kwargs:
            opt_level: O1
            keep_batchnorm_fp32: false

    stages:
        ...
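Because models and optimizers are forwarded by the engine, they should not appear in apex_kwargs. A minimal sketch of a pre-flight check is shown below; check_apex_kwargs and RESERVED_KEYS are hypothetical names for illustration and are not part of Catalyst or Apex.

```python
from typing import Any, Dict, Optional

# Keys the engine is documented to supply itself (hypothetical constant).
RESERVED_KEYS = {"models", "optimizers"}


def check_apex_kwargs(apex_kwargs: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Return a copy of apex_kwargs, rejecting keys the engine fills in itself."""
    kwargs = dict(apex_kwargs or {})
    clash = RESERVED_KEYS.intersection(kwargs)
    if clash:
        raise ValueError(f"do not pass {sorted(clash)} in apex_kwargs")
    return kwargs


# The kwargs from the examples above pass the check unchanged.
checked = check_apex_kwargs({"opt_level": "O1", "keep_batchnorm_fp32": False})
```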
DataParallelApexEngine¶
catalyst.engines.apex.DataParallelApexEngine
alias of catalyst.engines.apex.DataParallelAPEXEngine
DistributedDataParallelApexEngine¶
catalyst.engines.apex.DistributedDataParallelApexEngine
alias of catalyst.engines.apex.DistributedDataParallelAPEXEngine
DeepSpeed¶
DistributedDataParallelDeepSpeedEngine¶
class catalyst.engines.deepspeed.DistributedDataParallelDeepSpeedEngine(address: str = None, port: Union[str, int] = None, process_group_kwargs: Dict[str, Any] = None, deepspeed_kwargs: Dict[str, Any] = None, train_batch_size: int = 256)
Bases: catalyst.engines.torch.DeviceEngine
Distributed DeepSpeed multi-GPU training device engine.
- Parameters
address – address to use for backend.
port – port to use for backend.
process_group_kwargs – parameters for torch.distributed.init_process_group. More info here: https://pytorch.org/docs/stable/distributed.html#torch.distributed.init_process_group
deepspeed_kwargs – parameters for deepspeed.initialize. More info here: https://deepspeed.readthedocs.io/en/latest/initialize.html
train_batch_size – shortcut for the train batch size used for DeepSpeed scaling (default: 256). For proper configuration, use deepspeed_kwargs[‘config’] instead.
Examples:
    from catalyst import dl

    runner = dl.SupervisedRunner()
    runner.train(
        engine=dl.DistributedDataParallelDeepSpeedEngine(),
        ...
    )

    from catalyst import dl

    class MyRunner(dl.IRunner):
        # ...
        def get_engine(self):
            return dl.DistributedDataParallelDeepSpeedEngine(
                address="0.0.0.0",
                port=23234,
                process_group_kwargs={"port": 12345},
                deepspeed_kwargs={"config": {"train_batch_size": 64}}
            )
        # ...

    args:
        logs: ...

    model:
        _target_: ...
        ...

    engine:
        _target_: DistributedDataParallelDeepSpeedEngine
        address: 0.0.0.0
        port: 23234
        process_group_kwargs:
            port: 12345
        deepspeed_kwargs:
            config:
                train_batch_size: 64

    stages:
        ...
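In a DeepSpeed config, train_batch_size is the effective global batch: the per-GPU micro batch times the gradient accumulation steps times the number of GPUs. The sketch below builds a consistent config dictionary from those three numbers. deepspeed_batch_config is a hypothetical helper, and the values are illustrative only.

```python
from typing import Any, Dict


def deepspeed_batch_config(
    micro_batch_per_gpu: int, grad_accum_steps: int, world_size: int
) -> Dict[str, Any]:
    """Build the batch-related part of a DeepSpeed config (hypothetical helper)."""
    if min(micro_batch_per_gpu, grad_accum_steps, world_size) < 1:
        raise ValueError("all arguments must be positive integers")
    return {
        # Global batch size = micro batch * accumulation steps * number of GPUs.
        "train_batch_size": micro_batch_per_gpu * grad_accum_steps * world_size,
        "train_micro_batch_size_per_gpu": micro_batch_per_gpu,
        "gradient_accumulation_steps": grad_accum_steps,
    }


# 8 samples per GPU, 2 accumulation steps, 4 GPUs give a global batch of 64.
config = deepspeed_batch_config(micro_batch_per_gpu=8, grad_accum_steps=2, world_size=4)
deepspeed_kwargs = {"config": config}
```

The resulting deepspeed_kwargs has the same shape as the dictionary passed in the examples above.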
FairScale¶
PipelineParallelFairScaleEngine¶
class catalyst.engines.fairscale.PipelineParallelFairScaleEngine(pipe_kwargs: Dict[str, Any] = None)
Bases: catalyst.engines.torch.DeviceEngine
FairScale multi-GPU training device engine.
- Parameters
pipe_kwargs – parameters for fairscale.nn.Pipe. Docs for fairscale.nn.Pipe: https://fairscale.readthedocs.io/en/latest/api/nn/pipe.html
Examples:
    from catalyst import dl

    runner = dl.SupervisedRunner()
    runner.train(
        engine=dl.PipelineParallelFairScaleEngine(),
        ...
    )

    from catalyst import dl

    class MyRunner(dl.IRunner):
        # ...
        def get_engine(self):
            return dl.PipelineParallelFairScaleEngine(
                pipe_kwargs={"balance": [3, 1]}
            )
        # ...

    args:
        logs: ...

    model:
        _target_: ...
        ...

    engine:
        _target_: PipelineParallelFairScaleEngine
        pipe_kwargs:
            balance: [3, 1]

    stages:
        ...
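The balance entry in pipe_kwargs lists how many consecutive layers of a sequential model go to each pipeline partition, so its sum has to match the number of layers. The sketch below shows that split on a list of layer names. split_by_balance and the layer names are hypothetical and only illustrate the bookkeeping, not FairScale's own code.

```python
from typing import List, Sequence


def split_by_balance(layers: Sequence[str], balance: Sequence[int]) -> List[List[str]]:
    """Split consecutive layers into partitions of the sizes given by balance."""
    if sum(balance) != len(layers):
        raise ValueError("sum(balance) must equal the number of layers")
    partitions: List[List[str]] = []
    start = 0
    for size in balance:
        partitions.append(list(layers[start:start + size]))
        start += size
    return partitions


# balance=[3, 1]: the first three layers form one partition, the last layer another.
parts = split_by_balance(["conv", "relu", "pool", "linear"], [3, 1])
```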
Torch¶
DeviceEngine¶
class catalyst.engines.torch.DeviceEngine(device: str = None)
Bases: catalyst.core.engine.IEngine
Single-device training engine.
- Parameters
device – device to use, default is “cpu”.
Examples:
    from catalyst import dl

    runner = dl.SupervisedRunner()
    runner.train(
        engine=dl.DeviceEngine("cuda:1"),
        ...
    )

    from catalyst import dl

    class MyRunner(dl.IRunner):
        # ...
        def get_engine(self):
            return dl.DeviceEngine("cuda:1")
        # ...

    args:
        logs: ...

    model:
        _target_: ...
        ...

    engine:
        _target_: DeviceEngine
        device: cuda:1

    stages:
        ...
DataParallelEngine¶
class catalyst.engines.torch.DataParallelEngine
Bases: catalyst.engines.torch.DeviceEngine
Multi-GPU training device engine.
Examples:
    from catalyst import dl

    runner = dl.SupervisedRunner()
    runner.train(
        engine=dl.DataParallelEngine(),
        ...
    )

    from catalyst import dl

    class MyRunner(dl.IRunner):
        # ...
        def get_engine(self):
            return dl.DataParallelEngine()
        # ...

    args:
        logs: ...

    model:
        _target_: ...
        ...

    engine:
        _target_: DataParallelEngine

    stages:
        ...
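Data-parallel training on several GPUs usually means each batch is cut along its first dimension and one piece is sent to each device. The sketch below computes the piece sizes with torch.chunk-style rounding. It is an assumption that this matches what the engine does internally, and chunk_sizes is a hypothetical helper.

```python
import math
from typing import List


def chunk_sizes(batch_size: int, num_devices: int) -> List[int]:
    """Sizes of the per-device pieces when a batch is split chunk-style."""
    if batch_size < 1 or num_devices < 1:
        raise ValueError("batch_size and num_devices must be positive")
    step = math.ceil(batch_size / num_devices)  # size of every piece but the last
    sizes: List[int] = []
    remaining = batch_size
    while remaining > 0:
        take = min(step, remaining)
        sizes.append(take)
        remaining -= take
    return sizes


even = chunk_sizes(32, 4)    # batch divides evenly across 4 devices
uneven = chunk_sizes(10, 4)  # last device receives a smaller piece
```

Under this assumption, batch sizes that are a multiple of the device count keep the per-device load balanced.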
DistributedDataParallelEngine¶
class catalyst.engines.torch.DistributedDataParallelEngine(address: str = None, port: Union[str, int] = None, sync_bn: bool = False, ddp_kwargs: Dict[str, Any] = None, process_group_kwargs: Dict[str, Any] = None)
Bases: catalyst.engines.torch.DeviceEngine
Distributed multi-GPU training device engine.
- Parameters
address – address to use for backend.
port – port to use for backend.
sync_bn – boolean flag for batchnorm synchronization during distributed training. If True, applies PyTorch convert_sync_batchnorm to the model for native torch distributed only. Default is False.
ddp_kwargs – parameters for torch.nn.parallel.DistributedDataParallel. More info here: https://pytorch.org/docs/stable/generated/torch.nn.parallel.DistributedDataParallel.html#torch.nn.parallel.DistributedDataParallel
process_group_kwargs – parameters for torch.distributed.init_process_group. More info here: https://pytorch.org/docs/stable/distributed.html#torch.distributed.init_process_group
Examples:
    from catalyst import dl

    runner = dl.SupervisedRunner()
    runner.train(
        engine=dl.DistributedDataParallelEngine(),
        ...
    )

    from catalyst import dl

    class MyRunner(dl.IRunner):
        # ...
        def get_engine(self):
            return dl.DistributedDataParallelEngine(
                address="0.0.0.0",
                port=23234,
                ddp_kwargs={"find_unused_parameters": False},
                process_group_kwargs={"backend": "nccl"},
            )
        # ...

    args:
        logs: ...

    model:
        _target_: ...
        ...

    engine:
        _target_: DistributedDataParallelEngine
        address: 0.0.0.0
        port: 23234
        ddp_kwargs:
            find_unused_parameters: false
        process_group_kwargs:
            backend: nccl

    stages:
        ...
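The address and port parameters describe where the worker processes meet. With PyTorch's env:// initialization, that rendezvous information is read from the MASTER_ADDR, MASTER_PORT, RANK and WORLD_SIZE environment variables. The sketch below builds such a mapping for one process. ddp_env is a hypothetical helper, and it is an assumption that the engine forwards address and port in this way.

```python
from typing import Dict, Union


def ddp_env(address: str, port: Union[str, int], rank: int, world_size: int) -> Dict[str, str]:
    """Environment variables for env:// rendezvous of one process (hypothetical helper)."""
    if not 0 < int(port) < 65536:
        raise ValueError("port must be in the range 1..65535")
    if not 0 <= rank < world_size:
        raise ValueError("rank must satisfy 0 <= rank < world_size")
    return {
        "MASTER_ADDR": str(address),
        "MASTER_PORT": str(port),  # environment values are always strings
        "RANK": str(rank),
        "WORLD_SIZE": str(world_size),
    }


# Settings for the first of two processes, using the address and port from the examples.
env = ddp_env("0.0.0.0", 23234, rank=0, world_size=2)
```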
XLA¶
XLAEngine¶
class catalyst.engines.xla.XLAEngine
Bases: catalyst.engines.torch.DeviceEngine
XLA single-TPU training device engine.
Examples:
    import os
    from datetime import datetime

    import torch
    from torch import nn, optim
    from torch.utils.data import DataLoader

    from catalyst import dl
    from catalyst.contrib.datasets import CIFAR10
    from catalyst.contrib.nn import ResidualBlock
    from catalyst.data import transforms


    def conv_block(in_channels, out_channels, pool=False):
        layers = [
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
        ]
        if pool:
            layers.append(nn.MaxPool2d(2))
        return nn.Sequential(*layers)


    def resnet9(in_channels: int, num_classes: int, size: int = 16):
        sz, sz2, sz4, sz8 = size, size * 2, size * 4, size * 8
        return nn.Sequential(
            conv_block(in_channels, sz),
            conv_block(sz, sz2, pool=True),
            ResidualBlock(nn.Sequential(conv_block(sz2, sz2), conv_block(sz2, sz2))),
            conv_block(sz2, sz4, pool=True),
            conv_block(sz4, sz8, pool=True),
            ResidualBlock(nn.Sequential(conv_block(sz8, sz8), conv_block(sz8, sz8))),
            nn.Sequential(
                nn.MaxPool2d(4), nn.Flatten(), nn.Dropout(0.2), nn.Linear(sz8, num_classes)
            ),
        )


    class CustomRunner(dl.IRunner):
        def __init__(self, logdir):
            super().__init__()
            self._logdir = logdir

        def get_engine(self):
            return dl.XLAEngine()

        def get_loggers(self):
            return {
                "console": dl.ConsoleLogger(),
                "csv": dl.CSVLogger(logdir=self._logdir),
                "tensorboard": dl.TensorboardLogger(logdir=self._logdir),
            }

        @property
        def stages(self):
            return ["train"]

        def get_stage_len(self, stage: str) -> int:
            return 3

        def get_loaders(self, stage: str):
            transform = transforms.Compose(
                [transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]
            )
            train_data = CIFAR10(os.getcwd(), train=False, download=True, transform=transform)
            valid_data = CIFAR10(os.getcwd(), train=False, download=True, transform=transform)

            if self.engine.is_ddp:
                train_sampler = torch.utils.data.distributed.DistributedSampler(
                    train_data,
                    num_replicas=self.engine.world_size,
                    rank=self.engine.rank,
                    shuffle=True
                )
                valid_sampler = torch.utils.data.distributed.DistributedSampler(
                    valid_data,
                    num_replicas=self.engine.world_size,
                    rank=self.engine.rank,
                    shuffle=False
                )
            else:
                train_sampler = valid_sampler = None

            return {
                "train": DataLoader(train_data, batch_size=32, sampler=train_sampler),
                "valid": DataLoader(valid_data, batch_size=32, sampler=valid_sampler),
            }

        def get_model(self, stage: str):
            model = self.model if self.model is not None else resnet9(in_channels=3, num_classes=10)
            return model

        def get_criterion(self, stage: str):
            return nn.CrossEntropyLoss()

        def get_optimizer(self, stage: str, model):
            return optim.Adam(model.parameters(), lr=1e-3)

        def get_scheduler(self, stage: str, optimizer):
            return optim.lr_scheduler.MultiStepLR(optimizer, [5, 8], gamma=0.3)

        def get_callbacks(self, stage: str):
            return {
                "criterion": dl.CriterionCallback(
                    metric_key="loss", input_key="logits", target_key="targets"
                ),
                "optimizer": dl.OptimizerCallback(metric_key="loss"),
                "scheduler": dl.SchedulerCallback(loader_key="valid", metric_key="loss"),
                "accuracy": dl.AccuracyCallback(
                    input_key="logits", target_key="targets", topk_args=(1, 3, 5)
                ),
                "checkpoint": dl.CheckpointCallback(
                    self._logdir,
                    loader_key="valid",
                    metric_key="accuracy",
                    minimize=False,
                    save_n_best=1,
                ),
                "tqdm": dl.TqdmCallback(),
            }

        def handle_batch(self, batch):
            x, y = batch
            logits = self.model(x)

            self.batch = {
                "features": x,
                "targets": y,
                "logits": logits,
            }


    logdir = f"logs/{datetime.now().strftime('%Y%m%d-%H%M%S')}"
    runner = CustomRunner(logdir)
    runner.run()
DistributedXLAEngine¶
class catalyst.engines.xla.DistributedXLAEngine
Bases: catalyst.engines.torch.DeviceEngine
Distributed XLA multi-TPU training device engine.
Examples:
    import os
    from datetime import datetime

    import torch
    from torch import nn, optim
    from torch.utils.data import DataLoader

    from catalyst import dl
    from catalyst.contrib.datasets import CIFAR10
    from catalyst.contrib.nn import ResidualBlock
    from catalyst.data import transforms


    def conv_block(in_channels, out_channels, pool=False):
        layers = [
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
        ]
        if pool:
            layers.append(nn.MaxPool2d(2))
        return nn.Sequential(*layers)


    def resnet9(in_channels: int, num_classes: int, size: int = 16):
        sz, sz2, sz4, sz8 = size, size * 2, size * 4, size * 8
        return nn.Sequential(
            conv_block(in_channels, sz),
            conv_block(sz, sz2, pool=True),
            ResidualBlock(nn.Sequential(conv_block(sz2, sz2), conv_block(sz2, sz2))),
            conv_block(sz2, sz4, pool=True),
            conv_block(sz4, sz8, pool=True),
            ResidualBlock(nn.Sequential(conv_block(sz8, sz8), conv_block(sz8, sz8))),
            nn.Sequential(
                nn.MaxPool2d(4), nn.Flatten(), nn.Dropout(0.2), nn.Linear(sz8, num_classes)
            ),
        )


    class CustomRunner(dl.IRunner):
        def __init__(self, logdir):
            super().__init__()
            self._logdir = logdir

        def get_engine(self):
            return dl.DistributedXLAEngine()

        def get_loggers(self):
            return {
                "console": dl.ConsoleLogger(),
                "csv": dl.CSVLogger(logdir=self._logdir),
                "tensorboard": dl.TensorboardLogger(logdir=self._logdir),
            }

        @property
        def stages(self):
            return ["train"]

        def get_stage_len(self, stage: str) -> int:
            return 3

        def get_loaders(self, stage: str):
            transform = transforms.Compose(
                [transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]
            )
            train_data = CIFAR10(os.getcwd(), train=False, download=True, transform=transform)
            valid_data = CIFAR10(os.getcwd(), train=False, download=True, transform=transform)

            if self.engine.is_ddp:
                train_sampler = torch.utils.data.distributed.DistributedSampler(
                    train_data,
                    num_replicas=self.engine.world_size,
                    rank=self.engine.rank,
                    shuffle=True
                )
                valid_sampler = torch.utils.data.distributed.DistributedSampler(
                    valid_data,
                    num_replicas=self.engine.world_size,
                    rank=self.engine.rank,
                    shuffle=False
                )
            else:
                train_sampler = valid_sampler = None

            return {
                "train": DataLoader(train_data, batch_size=32, sampler=train_sampler),
                "valid": DataLoader(valid_data, batch_size=32, sampler=valid_sampler),
            }

        def get_model(self, stage: str):
            model = self.model if self.model is not None else resnet9(in_channels=3, num_classes=10)
            return model

        def get_criterion(self, stage: str):
            return nn.CrossEntropyLoss()

        def get_optimizer(self, stage: str, model):
            return optim.Adam(model.parameters(), lr=1e-3)

        def get_scheduler(self, stage: str, optimizer):
            return optim.lr_scheduler.MultiStepLR(optimizer, [5, 8], gamma=0.3)

        def get_callbacks(self, stage: str):
            return {
                "criterion": dl.CriterionCallback(
                    metric_key="loss", input_key="logits", target_key="targets"
                ),
                "optimizer": dl.OptimizerCallback(metric_key="loss"),
                "scheduler": dl.SchedulerCallback(loader_key="valid", metric_key="loss"),
                "accuracy": dl.AccuracyCallback(
                    input_key="logits", target_key="targets", topk_args=(1, 3, 5)
                ),
                "checkpoint": dl.CheckpointCallback(
                    self._logdir,
                    loader_key="valid",
                    metric_key="accuracy",
                    minimize=False,
                    save_n_best=1,
                ),
                "tqdm": dl.TqdmCallback(),
            }

        def handle_batch(self, batch):
            x, y = batch
            logits = self.model(x)

            self.batch = {
                "features": x,
                "targets": y,
                "logits": logits,
            }


    logdir = f"logs/{datetime.now().strftime('%Y%m%d-%H%M%S')}"
    runner = CustomRunner(logdir)
    runner.run()
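In the example above, each process builds a DistributedSampler from its rank and the world size so that it reads a different slice of the dataset. The sketch below reproduces the unshuffled index assignment in plain Python, in the style of DistributedSampler with drop_last=False: indices are padded by wrapping around until they divide evenly, then dealt out round-robin. shard_indices is a hypothetical helper written for illustration.

```python
import math
from typing import List


def shard_indices(dataset_len: int, num_replicas: int, rank: int) -> List[int]:
    """Indices one rank would read, without shuffling (hypothetical helper)."""
    if dataset_len < 1:
        raise ValueError("dataset_len must be positive")
    if not 0 <= rank < num_replicas:
        raise ValueError("rank must satisfy 0 <= rank < num_replicas")
    num_samples = math.ceil(dataset_len / num_replicas)  # samples per rank
    total_size = num_samples * num_replicas
    indices = list(range(dataset_len))
    padding = total_size - dataset_len
    if padding:
        # Wrap around to the start so every rank gets the same number of samples.
        repeats = math.ceil(padding / dataset_len)
        indices += (indices * repeats)[:padding]
    # Deal the indices out round-robin: rank r takes r, r + n, r + 2n, ...
    return indices[rank:total_size:num_replicas]


# 10 samples over 4 ranks: each rank reads 3 indices, two of them padded repeats.
shards = [shard_indices(10, 4, rank) for rank in range(4)]
```

With shuffle=True the real sampler permutes the indices before this split, which the sketch leaves out.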