Engines¶
An overview of the available engines can be found in the examples/engines section; the general pattern for plugging an engine into a runner is sketched below.
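Every engine plugs into a runner the same way: pass an instance to runner.train, or return one from IRunner.get_engine, as shown throughout this page. A minimal sketch of that pattern, using a toy dataset and model of our own (not part of the API) and the DeviceEngine documented below:

import torch
from torch import nn, optim
from torch.utils.data import DataLoader, TensorDataset

from catalyst import dl

# toy data and model, just to show where the engine plugs in
X, y = torch.randn(64, 10), torch.randint(0, 2, (64,))
loaders = {"train": DataLoader(TensorDataset(X, y), batch_size=16)}
model = nn.Linear(10, 2)

runner = dl.SupervisedRunner()
runner.train(
    model=model,
    criterion=nn.CrossEntropyLoss(),
    optimizer=optim.Adam(model.parameters()),
    loaders=loaders,
    engine=dl.DeviceEngine("cpu"),  # swap for any engine on this page
    num_epochs=1,
)

Swapping the engine argument is the only change needed to move between the backends documented below.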
AMP¶
AMPEngine¶
class catalyst.engines.amp.AMPEngine(device: str = 'cuda', scaler_kwargs: Dict[str, Any] = None)
Bases: catalyst.engines.torch.DeviceEngine
PyTorch AMP single training device engine.
- Parameters
device – device to use, default is “cuda”.
scaler_kwargs – parameters for torch.cuda.amp.GradScaler; see the sketch after the examples below. Possible parameters: https://pytorch.org/docs/stable/amp.html#torch.cuda.amp.GradScaler
Examples:
from catalyst import dl

runner = dl.SupervisedRunner()
runner.train(
    engine=dl.AMPEngine("cuda:1"),
    ...
)

from catalyst import dl

class MyRunner(dl.IRunner):
    # ...
    def get_engine(self):
        return dl.AMPEngine("cuda:1")
    # ...

args:
    logs: ...

model:
    _target_: ...
    ...

engine:
    _target_: AMPEngine
    device: cuda:1

stages:
    ...
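The examples above keep scaler_kwargs at its default. A minimal sketch of passing GradScaler options through the engine; the init_scale and growth_interval values below are illustrative, chosen from the torch.cuda.amp.GradScaler arguments linked above:

from catalyst import dl

# illustrative GradScaler settings; any argument accepted by
# torch.cuda.amp.GradScaler can be passed through scaler_kwargs
engine = dl.AMPEngine(
    device="cuda:0",
    scaler_kwargs={"init_scale": 2.0 ** 10, "growth_interval": 1000},
)

# plug into runner.train(engine=engine, ...) exactly as in the examples above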
DataParallelAMPEngine¶
class catalyst.engines.amp.DataParallelAMPEngine(scaler_kwargs: Dict[str, Any] = None)
Bases: catalyst.engines.amp.AMPEngine
AMP multi-gpu training device engine.
- Parameters
scaler_kwargs – parameters for torch.cuda.amp.GradScaler. Possible parameters: https://pytorch.org/docs/stable/amp.html#torch.cuda.amp.GradScaler
Examples:
from catalyst import dl

runner = dl.SupervisedRunner()
runner.train(
    engine=dl.DataParallelAMPEngine(),
    ...
)

from catalyst import dl

class MyRunner(dl.IRunner):
    # ...
    def get_engine(self):
        return dl.DataParallelAMPEngine()
    # ...

args:
    logs: ...

model:
    _target_: ...
    ...

engine:
    _target_: DataParallelAMPEngine

stages:
    ...
DistributedDataParallelAMPEngine¶
class catalyst.engines.amp.DistributedDataParallelAMPEngine(address: str = None, port: Union[str, int] = None, sync_bn: bool = False, ddp_kwargs: Dict[str, Any] = None, process_group_kwargs: Dict[str, Any] = None, scaler_kwargs: Dict[str, Any] = None)
Bases: catalyst.engines.torch.DistributedDataParallelEngine
Distributed AMP multi-gpu training device engine.
- Parameters
address – address to use for backend.
port – port to use for backend.
sync_bn – boolean flag for batchnorm synchronization during distributed training. If True, applies PyTorch convert_sync_batchnorm to the model (native torch distributed only). Default is False.
ddp_kwargs – parameters for torch.nn.parallel.DistributedDataParallel. More info here: https://pytorch.org/docs/stable/generated/torch.nn.parallel.DistributedDataParallel.html#torch.nn.parallel.DistributedDataParallel
process_group_kwargs – parameters for torch.distributed.init_process_group. More info here: https://pytorch.org/docs/stable/distributed.html#torch.distributed.init_process_group
scaler_kwargs – parameters for torch.cuda.amp.GradScaler. Possible parameters: https://pytorch.org/docs/stable/amp.html#torch.cuda.amp.GradScaler
Examples:
from catalyst import dl

runner = dl.SupervisedRunner()
runner.train(
    engine=dl.DistributedDataParallelAMPEngine(),
    ...
)

from catalyst import dl

class MyRunner(dl.IRunner):
    # ...
    def get_engine(self):
        return dl.DistributedDataParallelAMPEngine(
            address="0.0.0.0",
            port=23234,
            ddp_kwargs={"find_unused_parameters": False},
            process_group_kwargs={"port": 12345},
            scaler_kwargs={"growth_factor": 1.5},
        )
    # ...

args:
    logs: ...

model:
    _target_: ...
    ...

engine:
    _target_: DistributedDataParallelAMPEngine
    address: 0.0.0.0
    port: 23234
    ddp_kwargs:
        find_unused_parameters: false
    process_group_kwargs:
        port: 12345
    scaler_kwargs:
        growth_factor: 1.5

stages:
    ...
Apex¶
APEXEngine¶
class catalyst.engines.apex.APEXEngine(device: str = 'cuda', apex_kwargs: Dict[str, Any] = None)
Bases: catalyst.engines.torch.DeviceEngine
Apex single training device engine.
- Parameters
device – device to use, default is “cuda”.
apex_kwargs – parameters for apex.amp.initialize except models and optimizers (they will be forwarded automatically); see the sketch after the examples below. Docs for apex.amp.initialize: https://nvidia.github.io/apex/amp.html#apex.amp.initialize
Examples:
from catalyst import dl

runner = dl.SupervisedRunner()
runner.train(
    engine=dl.APEXEngine(apex_kwargs=dict(opt_level="O1", keep_batchnorm_fp32=False)),
    ...
)

from catalyst import dl

class MyRunner(dl.IRunner):
    # ...
    def get_engine(self):
        return dl.APEXEngine(apex_kwargs=dict(opt_level="O1", keep_batchnorm_fp32=False))
    # ...

args:
    logs: ...

model:
    _target_: ...
    ...

engine:
    _target_: APEXEngine
    apex_kwargs:
        opt_level: O1
        keep_batchnorm_fp32: false

stages:
    ...
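The apex_kwargs shown above are forwarded to apex.amp.initialize, so any of its keyword arguments can be used. A sketch with a different opt level and dynamic loss scaling; opt_level and loss_scale are documented apex.amp.initialize parameters, and the concrete values here are illustrative:

from catalyst import dl

# "O0"-"O3" are the apex.amp optimization levels; loss_scale may be a
# number or "dynamic" (see the apex docs linked above)
engine = dl.APEXEngine(
    apex_kwargs=dict(opt_level="O2", loss_scale="dynamic"),
)

# plug into runner.train(engine=engine, ...) as in the examples above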
DataParallelApexEngine¶
catalyst.engines.apex.DataParallelApexEngine
Alias of catalyst.engines.apex.DataParallelAPEXEngine.
DistributedDataParallelApexEngine¶
catalyst.engines.apex.DistributedDataParallelApexEngine
Alias of catalyst.engines.apex.DistributedDataParallelAPEXEngine.
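Both entries above are plain aliases, so either spelling resolves to the same class. A quick sanity check, assuming apex is installed so the module imports cleanly:

from catalyst.engines.apex import (
    DataParallelApexEngine,
    DataParallelAPEXEngine,
    DistributedDataParallelApexEngine,
    DistributedDataParallelAPEXEngine,
)

# the mixed-case spellings point at the same class objects
assert DataParallelApexEngine is DataParallelAPEXEngine
assert DistributedDataParallelApexEngine is DistributedDataParallelAPEXEngine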
DeepSpeed¶
DistributedDataParallelDeepSpeedEngine¶
class catalyst.engines.deepspeed.DistributedDataParallelDeepSpeedEngine(address: str = None, port: Union[str, int] = None, process_group_kwargs: Dict[str, Any] = None, deepspeed_kwargs: Dict[str, Any] = None, train_batch_size: int = 256)
Bases: catalyst.engines.torch.DeviceEngine
Distributed DeepSpeed MultiGPU training device engine.
- Parameters
address – address to use for backend.
port – port to use for backend.
process_group_kwargs – parameters for torch.distributed.init_process_group. More info here: https://pytorch.org/docs/stable/distributed.html#torch.distributed.init_process_group
deepspeed_kwargs – parameters for deepspeed.initialize. More info here: https://deepspeed.readthedocs.io/en/latest/initialize.html
train_batch_size – shortcut for the train batch size used for DeepSpeed scaling (default: 256). For proper configuration, please use deepspeed_kwargs[‘config’] instead; see the config sketch after the examples below.
Examples:
from catalyst import dl

runner = dl.SupervisedRunner()
runner.train(
    engine=dl.DistributedDataParallelDeepSpeedEngine(),
    ...
)

from catalyst import dl

class MyRunner(dl.IRunner):
    # ...
    def get_engine(self):
        return dl.DistributedDataParallelDeepSpeedEngine(
            address="0.0.0.0",
            port=23234,
            process_group_kwargs={"port": 12345},
            deepspeed_kwargs={"config": {"train_batch_size": 64}},
        )
    # ...

args:
    logs: ...

model:
    _target_: ...
    ...

engine:
    _target_: DistributedDataParallelDeepSpeedEngine
    address: 0.0.0.0
    port: 23234
    process_group_kwargs:
        port: 12345
    deepspeed_kwargs:
        config:
            train_batch_size: 64

stages:
    ...
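As noted for train_batch_size above, the preferred way to configure DeepSpeed is through deepspeed_kwargs["config"]. A sketch of a slightly fuller config; train_batch_size and fp16 are standard DeepSpeed configuration fields, and the values here are illustrative:

from catalyst import dl

# illustrative DeepSpeed config; the full schema is described in the
# deepspeed.initialize docs linked above
engine = dl.DistributedDataParallelDeepSpeedEngine(
    deepspeed_kwargs={
        "config": {
            "train_batch_size": 64,
            "fp16": {"enabled": True},
        }
    },
)

# return this engine from IRunner.get_engine, as in the examples above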
FairScale¶
PipelineParallelFairScaleEngine¶
class catalyst.engines.fairscale.PipelineParallelFairScaleEngine(pipe_kwargs: Dict[str, Any] = None)
Bases: catalyst.engines.torch.DeviceEngine
FairScale multi-gpu training device engine.
- Parameters
pipe_kwargs – parameters for fairscale.nn.Pipe; see the balance sketch after the examples below. Docs for fairscale.nn.Pipe: https://fairscale.readthedocs.io/en/latest/api/nn/pipe.html
Examples:
from catalyst import dl

runner = dl.SupervisedRunner()
runner.train(
    engine=dl.PipelineParallelFairScaleEngine(),
    ...
)

from catalyst import dl

class MyRunner(dl.IRunner):
    # ...
    def get_engine(self):
        return dl.PipelineParallelFairScaleEngine(
            pipe_kwargs={"balance": [3, 1]}
        )
    # ...

args:
    logs: ...

model:
    _target_: ...
    ...

engine:
    _target_: PipelineParallelFairScaleEngine
    pipe_kwargs:
        balance: [3, 1]

stages:
    ...
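A note on pipe_kwargs: fairscale.nn.Pipe partitions a sequential model, and balance gives the number of top-level blocks per partition, so its entries should sum to the number of blocks. A sketch under that assumption, with a hypothetical four-block model matching the balance=[3, 1] used above:

from torch import nn

from catalyst import dl

# hypothetical four-block nn.Sequential: balance=[3, 1] places the first
# three blocks in one pipeline partition and the last block in another
model = nn.Sequential(
    nn.Sequential(nn.Linear(32, 64), nn.ReLU()),
    nn.Sequential(nn.Linear(64, 64), nn.ReLU()),
    nn.Sequential(nn.Linear(64, 64), nn.ReLU()),
    nn.Linear(64, 10),
)

engine = dl.PipelineParallelFairScaleEngine(pipe_kwargs={"balance": [3, 1]})
# the engine passes pipe_kwargs to fairscale.nn.Pipe when wrapping the model,
# as in the get_engine example above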
Torch¶
DeviceEngine¶
class catalyst.engines.torch.DeviceEngine(device: str = None)
Bases: catalyst.core.engine.IEngine
Single training device engine.
- Parameters
device – device to use, default is “cpu”.
Examples:
from catalyst import dl

runner = dl.SupervisedRunner()
runner.train(
    engine=dl.DeviceEngine("cuda:1"),
    ...
)

from catalyst import dl

class MyRunner(dl.IRunner):
    # ...
    def get_engine(self):
        return dl.DeviceEngine("cuda:1")
    # ...

args:
    logs: ...

model:
    _target_: ...
    ...

engine:
    _target_: DeviceEngine
    device: cuda:1

stages:
    ...
DataParallelEngine¶
class catalyst.engines.torch.DataParallelEngine
Bases: catalyst.engines.torch.DeviceEngine
MultiGPU training device engine.
Examples:
from catalyst import dl

runner = dl.SupervisedRunner()
runner.train(
    engine=dl.DataParallelEngine(),
    ...
)

from catalyst import dl

class MyRunner(dl.IRunner):
    # ...
    def get_engine(self):
        return dl.DataParallelEngine()
    # ...

args:
    logs: ...

model:
    _target_: ...
    ...

engine:
    _target_: DataParallelEngine

stages:
    ...
DistributedDataParallelEngine¶
class catalyst.engines.torch.DistributedDataParallelEngine(address: str = None, port: Union[str, int] = None, sync_bn: bool = False, ddp_kwargs: Dict[str, Any] = None, process_group_kwargs: Dict[str, Any] = None)
Bases: catalyst.engines.torch.DeviceEngine
Distributed MultiGPU training device engine.
- Parameters
address – address to use for backend.
port – port to use for backend.
sync_bn – boolean flag for batchnorm synchronization during distributed training. If True, applies PyTorch convert_sync_batchnorm to the model (native torch distributed only). Default is False. See the sync_bn sketch after the examples below.
ddp_kwargs – parameters for torch.nn.parallel.DistributedDataParallel. More info here: https://pytorch.org/docs/stable/generated/torch.nn.parallel.DistributedDataParallel.html#torch.nn.parallel.DistributedDataParallel
process_group_kwargs – parameters for torch.distributed.init_process_group. More info here: https://pytorch.org/docs/stable/distributed.html#torch.distributed.init_process_group
Examples:
from catalyst import dl

runner = dl.SupervisedRunner()
runner.train(
    engine=dl.DistributedDataParallelEngine(),
    ...
)

from catalyst import dl

class MyRunner(dl.IRunner):
    # ...
    def get_engine(self):
        return dl.DistributedDataParallelEngine(
            address="0.0.0.0",
            port=23234,
            ddp_kwargs={"find_unused_parameters": False},
            process_group_kwargs={"backend": "nccl"},
        )
    # ...

args:
    logs: ...

model:
    _target_: ...
    ...

engine:
    _target_: DistributedDataParallelEngine
    address: 0.0.0.0
    port: 23234
    ddp_kwargs:
        find_unused_parameters: false
    process_group_kwargs:
        backend: nccl

stages:
    ...
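The sync_bn flag documented above is not used in the examples. A sketch enabling synchronized batchnorm alongside the other options; the address, port, and kwargs values are the same illustrative ones as above:

from catalyst import dl

# sync_bn=True converts the model's batchnorm layers to synchronized
# batchnorm for native torch distributed training
engine = dl.DistributedDataParallelEngine(
    address="0.0.0.0",
    port=23234,
    sync_bn=True,
    ddp_kwargs={"find_unused_parameters": False},
    process_group_kwargs={"backend": "nccl"},
)

# return this engine from IRunner.get_engine, as in the examples above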
XLA¶
XLAEngine¶
class catalyst.engines.xla.XLAEngine
Bases: catalyst.engines.torch.DeviceEngine
XLA SingleTPU training device engine.
Examples:
import os
from datetime import datetime

import torch
from torch import nn, optim
from torch.utils.data import DataLoader

from catalyst import dl
from catalyst.contrib import (
    ImageToTensor, NormalizeImage, Compose, CIFAR10, ResidualBlock
)

def conv_block(in_channels, out_channels, pool=False):
    layers = [
        nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
        nn.BatchNorm2d(out_channels),
        nn.ReLU(inplace=True),
    ]
    if pool:
        layers.append(nn.MaxPool2d(2))
    return nn.Sequential(*layers)

def resnet9(in_channels: int, num_classes: int, size: int = 16):
    sz, sz2, sz4, sz8 = size, size * 2, size * 4, size * 8
    return nn.Sequential(
        conv_block(in_channels, sz),
        conv_block(sz, sz2, pool=True),
        ResidualBlock(nn.Sequential(conv_block(sz2, sz2), conv_block(sz2, sz2))),
        conv_block(sz2, sz4, pool=True),
        conv_block(sz4, sz8, pool=True),
        ResidualBlock(nn.Sequential(conv_block(sz8, sz8), conv_block(sz8, sz8))),
        nn.Sequential(
            nn.MaxPool2d(4), nn.Flatten(), nn.Dropout(0.2), nn.Linear(sz8, num_classes)
        ),
    )

class CustomRunner(dl.IRunner):
    def __init__(self, logdir):
        super().__init__()
        self._logdir = logdir

    def get_engine(self):
        return dl.XLAEngine()

    def get_loggers(self):
        return {
            "console": dl.ConsoleLogger(),
            "csv": dl.CSVLogger(logdir=self._logdir),
            "tensorboard": dl.TensorboardLogger(logdir=self._logdir),
        }

    @property
    def stages(self):
        return ["train"]

    def get_stage_len(self, stage: str) -> int:
        return 3

    def get_loaders(self, stage: str):
        transform = Compose(
            [ImageToTensor(), NormalizeImage((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]
        )
        train_data = CIFAR10(os.getcwd(), train=False, download=True, transform=transform)
        valid_data = CIFAR10(os.getcwd(), train=False, download=True, transform=transform)
        if self.engine.is_ddp:
            train_sampler = torch.utils.data.distributed.DistributedSampler(
                train_data,
                num_replicas=self.engine.world_size,
                rank=self.engine.rank,
                shuffle=True,
            )
            valid_sampler = torch.utils.data.distributed.DistributedSampler(
                valid_data,
                num_replicas=self.engine.world_size,
                rank=self.engine.rank,
                shuffle=False,
            )
        else:
            train_sampler = valid_sampler = None

        return {
            "train": DataLoader(train_data, batch_size=32, sampler=train_sampler),
            "valid": DataLoader(valid_data, batch_size=32, sampler=valid_sampler),
        }

    def get_model(self, stage: str):
        model = self.model if self.model is not None else resnet9(in_channels=3, num_classes=10)
        return model

    def get_criterion(self, stage: str):
        return nn.CrossEntropyLoss()

    def get_optimizer(self, stage: str, model):
        return optim.Adam(model.parameters(), lr=1e-3)

    def get_scheduler(self, stage: str, optimizer):
        return optim.lr_scheduler.MultiStepLR(optimizer, [5, 8], gamma=0.3)

    def get_callbacks(self, stage: str):
        return {
            "criterion": dl.CriterionCallback(
                metric_key="loss", input_key="logits", target_key="targets"
            ),
            "optimizer": dl.OptimizerCallback(metric_key="loss"),
            "scheduler": dl.SchedulerCallback(loader_key="valid", metric_key="loss"),
            "accuracy": dl.AccuracyCallback(
                input_key="logits", target_key="targets", topk_args=(1, 3, 5)
            ),
            "checkpoint": dl.CheckpointCallback(
                self._logdir,
                loader_key="valid",
                metric_key="accuracy",
                minimize=False,
                save_n_best=1,
            ),
            "tqdm": dl.TqdmCallback(),
        }

    def handle_batch(self, batch):
        x, y = batch
        logits = self.model(x)
        self.batch = {
            "features": x,
            "targets": y,
            "logits": logits,
        }

logdir = f"logs/{datetime.now().strftime('%Y%m%d-%H%M%S')}"
runner = CustomRunner(logdir)
runner.run()
DistributedXLAEngine¶
class catalyst.engines.xla.DistributedXLAEngine
Bases: catalyst.engines.torch.DeviceEngine
Distributed XLA MultiTPU training device engine.
Examples:
import os
from datetime import datetime

import torch
from torch import nn, optim
from torch.utils.data import DataLoader

from catalyst import dl
from catalyst.contrib import (
    ImageToTensor, NormalizeImage, Compose, CIFAR10, ResidualBlock
)

def conv_block(in_channels, out_channels, pool=False):
    layers = [
        nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
        nn.BatchNorm2d(out_channels),
        nn.ReLU(inplace=True),
    ]
    if pool:
        layers.append(nn.MaxPool2d(2))
    return nn.Sequential(*layers)

def resnet9(in_channels: int, num_classes: int, size: int = 16):
    sz, sz2, sz4, sz8 = size, size * 2, size * 4, size * 8
    return nn.Sequential(
        conv_block(in_channels, sz),
        conv_block(sz, sz2, pool=True),
        ResidualBlock(nn.Sequential(conv_block(sz2, sz2), conv_block(sz2, sz2))),
        conv_block(sz2, sz4, pool=True),
        conv_block(sz4, sz8, pool=True),
        ResidualBlock(nn.Sequential(conv_block(sz8, sz8), conv_block(sz8, sz8))),
        nn.Sequential(
            nn.MaxPool2d(4), nn.Flatten(), nn.Dropout(0.2), nn.Linear(sz8, num_classes)
        ),
    )

class CustomRunner(dl.IRunner):
    def __init__(self, logdir):
        super().__init__()
        self._logdir = logdir

    def get_engine(self):
        return dl.DistributedXLAEngine()

    def get_loggers(self):
        return {
            "console": dl.ConsoleLogger(),
            "csv": dl.CSVLogger(logdir=self._logdir),
            "tensorboard": dl.TensorboardLogger(logdir=self._logdir),
        }

    @property
    def stages(self):
        return ["train"]

    def get_stage_len(self, stage: str) -> int:
        return 3

    def get_loaders(self, stage: str):
        transform = Compose(
            [ImageToTensor(), NormalizeImage((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]
        )
        train_data = CIFAR10(os.getcwd(), train=False, download=True, transform=transform)
        valid_data = CIFAR10(os.getcwd(), train=False, download=True, transform=transform)
        if self.engine.is_ddp:
            train_sampler = torch.utils.data.distributed.DistributedSampler(
                train_data,
                num_replicas=self.engine.world_size,
                rank=self.engine.rank,
                shuffle=True,
            )
            valid_sampler = torch.utils.data.distributed.DistributedSampler(
                valid_data,
                num_replicas=self.engine.world_size,
                rank=self.engine.rank,
                shuffle=False,
            )
        else:
            train_sampler = valid_sampler = None

        return {
            "train": DataLoader(train_data, batch_size=32, sampler=train_sampler),
            "valid": DataLoader(valid_data, batch_size=32, sampler=valid_sampler),
        }

    def get_model(self, stage: str):
        model = self.model if self.model is not None else resnet9(in_channels=3, num_classes=10)
        return model

    def get_criterion(self, stage: str):
        return nn.CrossEntropyLoss()

    def get_optimizer(self, stage: str, model):
        return optim.Adam(model.parameters(), lr=1e-3)

    def get_scheduler(self, stage: str, optimizer):
        return optim.lr_scheduler.MultiStepLR(optimizer, [5, 8], gamma=0.3)

    def get_callbacks(self, stage: str):
        return {
            "criterion": dl.CriterionCallback(
                metric_key="loss", input_key="logits", target_key="targets"
            ),
            "optimizer": dl.OptimizerCallback(metric_key="loss"),
            "scheduler": dl.SchedulerCallback(loader_key="valid", metric_key="loss"),
            "accuracy": dl.AccuracyCallback(
                input_key="logits", target_key="targets", topk_args=(1, 3, 5)
            ),
            "checkpoint": dl.CheckpointCallback(
                self._logdir,
                loader_key="valid",
                metric_key="accuracy",
                minimize=False,
                save_n_best=1,
            ),
            "tqdm": dl.TqdmCallback(),
        }

    def handle_batch(self, batch):
        x, y = batch
        logits = self.model(x)
        self.batch = {
            "features": x,
            "targets": y,
            "logits": logits,
        }

logdir = f"logs/{datetime.now().strftime('%Y%m%d-%H%M%S')}"
runner = CustomRunner(logdir)
runner.run()