Engines
See the engines overview in the examples/engines section.
AMP
AMPEngine
- class catalyst.engines.amp.AMPEngine(device: str = 'cuda', scaler_kwargs: Optional[Dict[str, Any]] = None)[source]
Bases: catalyst.engines.torch.DeviceEngine
PyTorch AMP single training device engine.
- Parameters
device – device to use; default is “cuda”.
scaler_kwargs – parameters for torch.cuda.amp.GradScaler. Possible parameters: https://pytorch.org/docs/stable/amp.html#torch.cuda.amp.GradScaler
Examples:
from catalyst import dl

runner = dl.SupervisedRunner()
runner.train(
    engine=dl.AMPEngine("cuda:1"),
    ...
)
from catalyst import dl

class MyRunner(dl.IRunner):
    # ...
    def get_engine(self):
        return dl.AMPEngine("cuda:1")
    # ...
args:
    logs: ...

model:
    _target_: ...
    ...

engine:
    _target_: AMPEngine
    device: cuda:1

stages:
    ...
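The scaler_kwargs argument is forwarded to torch.cuda.amp.GradScaler. A minimal sketch; init_scale and growth_interval are standard GradScaler parameters, and the values below are illustrative only, not recommendations:

from catalyst import dl

# hedged sketch: tuning GradScaler behavior via scaler_kwargs
# (the values are for illustration, not tuning advice)
runner = dl.SupervisedRunner()
runner.train(
    engine=dl.AMPEngine(
        "cuda:0",
        scaler_kwargs={"init_scale": 2.0 ** 16, "growth_interval": 1000},
    ),
    ...
)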
DataParallelAMPEngine
- class catalyst.engines.amp.DataParallelAMPEngine(scaler_kwargs: Optional[Dict[str, Any]] = None)[source]
Bases: catalyst.engines.amp.AMPEngine
AMP multi-GPU training device engine.
- Parameters
scaler_kwargs – parameters for torch.cuda.amp.GradScaler. Possible parameters: https://pytorch.org/docs/stable/amp.html#torch.cuda.amp.GradScaler
Examples:
from catalyst import dl

runner = dl.SupervisedRunner()
runner.train(
    engine=dl.DataParallelAMPEngine(),
    ...
)
from catalyst import dl

class MyRunner(dl.IRunner):
    # ...
    def get_engine(self):
        return dl.DataParallelAMPEngine()
    # ...
args:
    logs: ...

model:
    _target_: ...
    ...

engine:
    _target_: DataParallelAMPEngine

stages:
    ...
DistributedDataParallelAMPEngine
- class catalyst.engines.amp.DistributedDataParallelAMPEngine(address: Optional[str] = None, port: Optional[Union[str, int]] = None, sync_bn: bool = False, ddp_kwargs: Optional[Dict[str, Any]] = None, process_group_kwargs: Optional[Dict[str, Any]] = None, scaler_kwargs: Optional[Dict[str, Any]] = None)[source]
Bases: catalyst.engines.torch.DistributedDataParallelEngine
Distributed AMP multi-GPU training device engine.
- Parameters
address – address to use for backend.
port – port to use for backend.
sync_bn – boolean flag for batch norm synchronization during distributed training. If True, applies PyTorch convert_sync_batchnorm to the model (native torch distributed only). Default is False.
ddp_kwargs – parameters for torch.nn.parallel.DistributedDataParallel. More info here: https://pytorch.org/docs/stable/generated/torch.nn.parallel.DistributedDataParallel.html#torch.nn.parallel.DistributedDataParallel
process_group_kwargs – parameters for torch.distributed.init_process_group. More info here: https://pytorch.org/docs/stable/distributed.html#torch.distributed.init_process_group
scaler_kwargs – parameters for torch.cuda.amp.GradScaler. Possible parameters: https://pytorch.org/docs/stable/amp.html#torch.cuda.amp.GradScaler
Examples:
from catalyst import dl

runner = dl.SupervisedRunner()
runner.train(
    engine=dl.DistributedDataParallelAMPEngine(),
    ...
)
from catalyst import dl

class MyRunner(dl.IRunner):
    # ...
    def get_engine(self):
        return dl.DistributedDataParallelAMPEngine(
            address="0.0.0.0",
            port=23234,
            ddp_kwargs={"find_unused_parameters": False},
            process_group_kwargs={"port": 12345},
            scaler_kwargs={"growth_factor": 1.5},
        )
    # ...
args:
    logs: ...

model:
    _target_: ...
    ...

engine:
    _target_: DistributedDataParallelAMPEngine
    address: 0.0.0.0
    port: 23234
    ddp_kwargs:
        find_unused_parameters: false
    process_group_kwargs:
        port: 12345
    scaler_kwargs:
        growth_factor: 1.5

stages:
    ...
Apex
APEXEngine
- class catalyst.engines.apex.APEXEngine(device: str = 'cuda', apex_kwargs: Optional[Dict[str, Any]] = None)[source]
Bases: catalyst.engines.torch.DeviceEngine
Apex single training device engine.
- Parameters
device – device to use; default is “cuda”.
apex_kwargs – parameters for apex.amp.initialize, except models and optimizers (they are forwarded automatically). Docs for apex.amp.initialize: https://nvidia.github.io/apex/amp.html#apex.amp.initialize
Examples:
from catalyst import dl

runner = dl.SupervisedRunner()
runner.train(
    engine=dl.APEXEngine(apex_kwargs=dict(opt_level="O1", keep_batchnorm_fp32=False)),
    ...
)
from catalyst import dl

class MyRunner(dl.IRunner):
    # ...
    def get_engine(self):
        return dl.APEXEngine(apex_kwargs=dict(opt_level="O1", keep_batchnorm_fp32=False))
    # ...
args:
    logs: ...

model:
    _target_: ...
    ...

engine:
    _target_: APEXEngine
    apex_kwargs:
        opt_level: O1
        keep_batchnorm_fp32: false

stages:
    ...
DataParallelApexEngine
- catalyst.engines.apex.DataParallelApexEngine
alias of catalyst.engines.apex.DataParallelAPEXEngine
DistributedDataParallelApexEngine
- catalyst.engines.apex.DistributedDataParallelApexEngine
alias of catalyst.engines.apex.DistributedDataParallelAPEXEngine
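Both aliases are interchangeable with the classes they point to. A minimal sketch, assuming DataParallelAPEXEngine accepts the same apex_kwargs as APEXEngine above:

from catalyst import dl

# hedged sketch: the alias resolves to DataParallelAPEXEngine;
# opt_level "O1" is one of the standard apex.amp optimization levels
runner = dl.SupervisedRunner()
runner.train(
    engine=dl.DataParallelApexEngine(apex_kwargs=dict(opt_level="O1")),
    ...
)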
DeepSpeed
DistributedDataParallelDeepSpeedEngine
- class catalyst.engines.deepspeed.DistributedDataParallelDeepSpeedEngine(address: Optional[str] = None, port: Optional[Union[str, int]] = None, process_group_kwargs: Optional[Dict[str, Any]] = None, deepspeed_kwargs: Optional[Dict[str, Any]] = None, train_batch_size: int = 256)[source]
Bases: catalyst.engines.torch.DeviceEngine
Distributed DeepSpeed multi-GPU training device engine.
- Parameters
address – address to use for backend.
port – port to use for backend.
process_group_kwargs – parameters for torch.distributed.init_process_group. More info here: https://pytorch.org/docs/stable/distributed.html#torch.distributed.init_process_group
deepspeed_kwargs – parameters for deepspeed.initialize. More info here: https://deepspeed.readthedocs.io/en/latest/initialize.html
train_batch_size – shortcut for the train batch size used in DeepSpeed scaling (default: 256); for proper configuration, use deepspeed_kwargs['config'] instead.
Examples:
from catalyst import dl

runner = dl.SupervisedRunner()
runner.train(
    engine=dl.DistributedDataParallelDeepSpeedEngine(),
    ...
)
from catalyst import dl

class MyRunner(dl.IRunner):
    # ...
    def get_engine(self):
        return dl.DistributedDataParallelDeepSpeedEngine(
            address="0.0.0.0",
            port=23234,
            process_group_kwargs={"port": 12345},
            deepspeed_kwargs={"config": {"train_batch_size": 64}},
        )
    # ...
args:
    logs: ...

model:
    _target_: ...
    ...

engine:
    _target_: DistributedDataParallelDeepSpeedEngine
    address: 0.0.0.0
    port: 23234
    process_group_kwargs:
        port: 12345
    deepspeed_kwargs:
        config:
            train_batch_size: 64

stages:
    ...
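For quick experiments, the train_batch_size shortcut avoids writing a full config. A minimal sketch; the batch size is illustrative, and per the parameter docs above a full DeepSpeed config via deepspeed_kwargs["config"] is preferred:

from catalyst import dl

# hedged sketch: train_batch_size is only a convenience shortcut
runner = dl.SupervisedRunner()
runner.train(
    engine=dl.DistributedDataParallelDeepSpeedEngine(train_batch_size=64),
    ...
)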
FairScale
PipelineParallelFairScaleEngine
- class catalyst.engines.fairscale.PipelineParallelFairScaleEngine(pipe_kwargs: Optional[Dict[str, Any]] = None)[source]
Bases: catalyst.engines.torch.DeviceEngine
FairScale multi-GPU training device engine.
- Parameters
pipe_kwargs – parameters for fairscale.nn.Pipe. Docs for fairscale.nn.Pipe: https://fairscale.readthedocs.io/en/latest/api/nn/pipe.html
Examples:
from catalyst import dl

runner = dl.SupervisedRunner()
runner.train(
    engine=dl.PipelineParallelFairScaleEngine(),
    ...
)
from catalyst import dl

class MyRunner(dl.IRunner):
    # ...
    def get_engine(self):
        return dl.PipelineParallelFairScaleEngine(
            pipe_kwargs={"balance": [3, 1]}
        )
    # ...
args:
    logs: ...

model:
    _target_: ...
    ...

engine:
    _target_: PipelineParallelFairScaleEngine
    pipe_kwargs:
        balance: [3, 1]

stages:
    ...
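The balance list controls how many consecutive modules of the model land on each device. A minimal sketch, assuming the model is an nn.Sequential (which fairscale.nn.Pipe requires); the toy layers are illustrative only:

from torch import nn

# hedged sketch: with pipe_kwargs={"balance": [3, 1]}, the first three
# modules run on the first device and the last module on the second
model = nn.Sequential(
    nn.Linear(32, 64),  # device 0
    nn.ReLU(),          # device 0
    nn.Linear(64, 64),  # device 0
    nn.Linear(64, 10),  # device 1
)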
Torch
DeviceEngine
- class catalyst.engines.torch.DeviceEngine(device: Optional[str] = None)[source]
Bases: catalyst.core.engine.IEngine
Single training device engine.
- Parameters
device – device to use; default is “cpu”.
Examples:
from catalyst import dl

runner = dl.SupervisedRunner()
runner.train(
    engine=dl.DeviceEngine("cuda:1"),
    ...
)
from catalyst import dl

class MyRunner(dl.IRunner):
    # ...
    def get_engine(self):
        return dl.DeviceEngine("cuda:1")
    # ...
args:
    logs: ...

model:
    _target_: ...
    ...

engine:
    _target_: DeviceEngine
    device: cuda:1

stages:
    ...
DataParallelEngine
- class catalyst.engines.torch.DataParallelEngine[source]
Bases: catalyst.engines.torch.DeviceEngine
Multi-GPU training device engine.
Examples:
from catalyst import dl

runner = dl.SupervisedRunner()
runner.train(
    engine=dl.DataParallelEngine(),
    ...
)
from catalyst import dl

class MyRunner(dl.IRunner):
    # ...
    def get_engine(self):
        return dl.DataParallelEngine()
    # ...
args:
    logs: ...

model:
    _target_: ...
    ...

engine:
    _target_: DataParallelEngine

stages:
    ...
DistributedDataParallelEngine
- class catalyst.engines.torch.DistributedDataParallelEngine(address: Optional[str] = None, port: Optional[Union[str, int]] = None, sync_bn: bool = False, ddp_kwargs: Optional[Dict[str, Any]] = None, process_group_kwargs: Optional[Dict[str, Any]] = None)[source]
Bases: catalyst.engines.torch.DeviceEngine
Distributed multi-GPU training device engine.
- Parameters
address – address to use for backend.
port – port to use for backend.
sync_bn – boolean flag for batch norm synchronization during distributed training. If True, applies PyTorch convert_sync_batchnorm to the model (native torch distributed only). Default is False.
ddp_kwargs – parameters for torch.nn.parallel.DistributedDataParallel. More info here: https://pytorch.org/docs/stable/generated/torch.nn.parallel.DistributedDataParallel.html#torch.nn.parallel.DistributedDataParallel
process_group_kwargs – parameters for torch.distributed.init_process_group. More info here: https://pytorch.org/docs/stable/distributed.html#torch.distributed.init_process_group
Examples:
from catalyst import dl

runner = dl.SupervisedRunner()
runner.train(
    engine=dl.DistributedDataParallelEngine(),
    ...
)
from catalyst import dl

class MyRunner(dl.IRunner):
    # ...
    def get_engine(self):
        return dl.DistributedDataParallelEngine(
            address="0.0.0.0",
            port=23234,
            ddp_kwargs={"find_unused_parameters": False},
            process_group_kwargs={"backend": "nccl"},
        )
    # ...
args:
    logs: ...

model:
    _target_: ...
    ...

engine:
    _target_: DistributedDataParallelEngine
    address: 0.0.0.0
    port: 23234
    ddp_kwargs:
        find_unused_parameters: false
    process_group_kwargs:
        backend: nccl

stages:
    ...
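Synchronized batch norm is opt-in via sync_bn. A minimal sketch:

from catalyst import dl

# hedged sketch: sync_bn=True applies PyTorch convert_sync_batchnorm,
# so BatchNorm statistics are synchronized across processes
runner = dl.SupervisedRunner()
runner.train(
    engine=dl.DistributedDataParallelEngine(sync_bn=True),
    ...
)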
XLA
XLAEngine
- class catalyst.engines.xla.XLAEngine[source]
Bases: catalyst.engines.torch.DeviceEngine
XLA single-TPU training device engine.
Examples:
import os
from datetime import datetime

import torch
from torch import nn, optim
from torch.utils.data import DataLoader

from catalyst import dl
from catalyst.contrib.datasets import CIFAR10
from catalyst.contrib.nn import ResidualBlock
from catalyst.data import transforms


def conv_block(in_channels, out_channels, pool=False):
    layers = [
        nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
        nn.BatchNorm2d(out_channels),
        nn.ReLU(inplace=True),
    ]
    if pool:
        layers.append(nn.MaxPool2d(2))
    return nn.Sequential(*layers)


def resnet9(in_channels: int, num_classes: int, size: int = 16):
    sz, sz2, sz4, sz8 = size, size * 2, size * 4, size * 8
    return nn.Sequential(
        conv_block(in_channels, sz),
        conv_block(sz, sz2, pool=True),
        ResidualBlock(nn.Sequential(conv_block(sz2, sz2), conv_block(sz2, sz2))),
        conv_block(sz2, sz4, pool=True),
        conv_block(sz4, sz8, pool=True),
        ResidualBlock(nn.Sequential(conv_block(sz8, sz8), conv_block(sz8, sz8))),
        nn.Sequential(
            nn.MaxPool2d(4), nn.Flatten(), nn.Dropout(0.2), nn.Linear(sz8, num_classes)
        ),
    )


class CustomRunner(dl.IRunner):
    def __init__(self, logdir):
        super().__init__()
        self._logdir = logdir

    def get_engine(self):
        return dl.XLAEngine()

    def get_loggers(self):
        return {
            "console": dl.ConsoleLogger(),
            "csv": dl.CSVLogger(logdir=self._logdir),
            "tensorboard": dl.TensorboardLogger(logdir=self._logdir),
        }

    @property
    def stages(self):
        return ["train"]

    def get_stage_len(self, stage: str) -> int:
        return 3

    def get_loaders(self, stage: str):
        transform = transforms.Compose(
            [transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]
        )
        # train split for training, test split for validation
        train_data = CIFAR10(os.getcwd(), train=True, download=True, transform=transform)
        valid_data = CIFAR10(os.getcwd(), train=False, download=True, transform=transform)
        if self.engine.is_ddp:
            train_sampler = torch.utils.data.distributed.DistributedSampler(
                train_data,
                num_replicas=self.engine.world_size,
                rank=self.engine.rank,
                shuffle=True,
            )
            valid_sampler = torch.utils.data.distributed.DistributedSampler(
                valid_data,
                num_replicas=self.engine.world_size,
                rank=self.engine.rank,
                shuffle=False,
            )
        else:
            train_sampler = valid_sampler = None
        return {
            "train": DataLoader(train_data, batch_size=32, sampler=train_sampler),
            "valid": DataLoader(valid_data, batch_size=32, sampler=valid_sampler),
        }

    def get_model(self, stage: str):
        model = self.model if self.model is not None else resnet9(in_channels=3, num_classes=10)
        return model

    def get_criterion(self, stage: str):
        return nn.CrossEntropyLoss()

    def get_optimizer(self, stage: str, model):
        return optim.Adam(model.parameters(), lr=1e-3)

    def get_scheduler(self, stage: str, optimizer):
        return optim.lr_scheduler.MultiStepLR(optimizer, [5, 8], gamma=0.3)

    def get_callbacks(self, stage: str):
        return {
            "criterion": dl.CriterionCallback(
                metric_key="loss", input_key="logits", target_key="targets"
            ),
            "optimizer": dl.OptimizerCallback(metric_key="loss"),
            "scheduler": dl.SchedulerCallback(loader_key="valid", metric_key="loss"),
            "accuracy": dl.AccuracyCallback(
                input_key="logits", target_key="targets", topk_args=(1, 3, 5)
            ),
            "checkpoint": dl.CheckpointCallback(
                self._logdir,
                loader_key="valid",
                metric_key="accuracy",
                minimize=False,
                save_n_best=1,
            ),
            "tqdm": dl.TqdmCallback(),
        }

    def handle_batch(self, batch):
        x, y = batch
        logits = self.model(x)
        self.batch = {
            "features": x,
            "targets": y,
            "logits": logits,
        }


logdir = f"logs/{datetime.now().strftime('%Y%m%d-%H%M%S')}"
runner = CustomRunner(logdir)
runner.run()
DistributedXLAEngine
- class catalyst.engines.xla.DistributedXLAEngine[source]
Bases: catalyst.engines.torch.DeviceEngine
Distributed XLA multi-TPU training device engine.
Examples:
import os
from datetime import datetime

import torch
from torch import nn, optim
from torch.utils.data import DataLoader

from catalyst import dl
from catalyst.contrib.datasets import CIFAR10
from catalyst.contrib.nn import ResidualBlock
from catalyst.data import transforms


def conv_block(in_channels, out_channels, pool=False):
    layers = [
        nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
        nn.BatchNorm2d(out_channels),
        nn.ReLU(inplace=True),
    ]
    if pool:
        layers.append(nn.MaxPool2d(2))
    return nn.Sequential(*layers)


def resnet9(in_channels: int, num_classes: int, size: int = 16):
    sz, sz2, sz4, sz8 = size, size * 2, size * 4, size * 8
    return nn.Sequential(
        conv_block(in_channels, sz),
        conv_block(sz, sz2, pool=True),
        ResidualBlock(nn.Sequential(conv_block(sz2, sz2), conv_block(sz2, sz2))),
        conv_block(sz2, sz4, pool=True),
        conv_block(sz4, sz8, pool=True),
        ResidualBlock(nn.Sequential(conv_block(sz8, sz8), conv_block(sz8, sz8))),
        nn.Sequential(
            nn.MaxPool2d(4), nn.Flatten(), nn.Dropout(0.2), nn.Linear(sz8, num_classes)
        ),
    )


class CustomRunner(dl.IRunner):
    def __init__(self, logdir):
        super().__init__()
        self._logdir = logdir

    def get_engine(self):
        return dl.DistributedXLAEngine()

    def get_loggers(self):
        return {
            "console": dl.ConsoleLogger(),
            "csv": dl.CSVLogger(logdir=self._logdir),
            "tensorboard": dl.TensorboardLogger(logdir=self._logdir),
        }

    @property
    def stages(self):
        return ["train"]

    def get_stage_len(self, stage: str) -> int:
        return 3

    def get_loaders(self, stage: str):
        transform = transforms.Compose(
            [transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]
        )
        # train split for training, test split for validation
        train_data = CIFAR10(os.getcwd(), train=True, download=True, transform=transform)
        valid_data = CIFAR10(os.getcwd(), train=False, download=True, transform=transform)
        if self.engine.is_ddp:
            train_sampler = torch.utils.data.distributed.DistributedSampler(
                train_data,
                num_replicas=self.engine.world_size,
                rank=self.engine.rank,
                shuffle=True,
            )
            valid_sampler = torch.utils.data.distributed.DistributedSampler(
                valid_data,
                num_replicas=self.engine.world_size,
                rank=self.engine.rank,
                shuffle=False,
            )
        else:
            train_sampler = valid_sampler = None
        return {
            "train": DataLoader(train_data, batch_size=32, sampler=train_sampler),
            "valid": DataLoader(valid_data, batch_size=32, sampler=valid_sampler),
        }

    def get_model(self, stage: str):
        model = self.model if self.model is not None else resnet9(in_channels=3, num_classes=10)
        return model

    def get_criterion(self, stage: str):
        return nn.CrossEntropyLoss()

    def get_optimizer(self, stage: str, model):
        return optim.Adam(model.parameters(), lr=1e-3)

    def get_scheduler(self, stage: str, optimizer):
        return optim.lr_scheduler.MultiStepLR(optimizer, [5, 8], gamma=0.3)

    def get_callbacks(self, stage: str):
        return {
            "criterion": dl.CriterionCallback(
                metric_key="loss", input_key="logits", target_key="targets"
            ),
            "optimizer": dl.OptimizerCallback(metric_key="loss"),
            "scheduler": dl.SchedulerCallback(loader_key="valid", metric_key="loss"),
            "accuracy": dl.AccuracyCallback(
                input_key="logits", target_key="targets", topk_args=(1, 3, 5)
            ),
            "checkpoint": dl.CheckpointCallback(
                self._logdir,
                loader_key="valid",
                metric_key="accuracy",
                minimize=False,
                save_n_best=1,
            ),
            "tqdm": dl.TqdmCallback(),
        }

    def handle_batch(self, batch):
        x, y = batch
        logits = self.model(x)
        self.batch = {
            "features": x,
            "targets": y,
            "logits": logits,
        }


logdir = f"logs/{datetime.now().strftime('%Y%m%d-%H%M%S')}"
runner = CustomRunner(logdir)
runner.run()