Source code for catalyst.engines.deepspeed

from typing import Any, Dict, Optional, Union
import os

import torch

from catalyst.engines.torch import DistributedDataParallelEngine
from catalyst.settings import SETTINGS

if SETTINGS.deepspeed_required:
    import deepspeed

class DistributedDataParallelDeepSpeedEngine(DistributedDataParallelEngine):
    """Distributed DeepSpeed MultiGPU training device engine.

    Args:
        address: master node (rank 0)'s address,
            should be either the IP address or the hostname of node 0,
            for single node multi-proc training, can simply be
            the loopback address
        port: master node (rank 0)'s free port that needs to be used
            for communication during distributed training
        world_size: the number of processes to use for distributed training.
            Should be less or equal to the number of GPUs
        workers_dist_rank: the rank of the first process to run on the node.
            It should be a number between `number of initialized processes`
            and `world_size - 1`, the other processes on the node will have
            ranks `# of initialized processes + 1`,
            `# of initialized processes + 2`, ...,
            `# of initialized processes + num_node_workers - 1`
        num_node_workers: the number of processes to launch on the node.
            For GPU training, this is recommended to be set to the number
            of GPUs on the current node so that each process can be bound
            to a single GPU
        process_group_kwargs: extra keyword arguments for the process group
            initialization; in ``setup_process`` they are forwarded to
            ``deepspeed.init_distributed``
            (``dist_backend`` defaults to ``"nccl"``)
        train_batch_size: shortcut for train batch size
            for deepspeed scaling (default: 256)
            for proper configuration,
            please use deepspeed_kwargs['config'] instead
        deepspeed_kwargs: parameters for `deepspeed.initialize`.
            The dictionary passed in is copied, never modified in place.

    Examples:

    .. code-block:: python

        from catalyst import dl

        runner = dl.SupervisedRunner()
        runner.train(
            engine=dl.DistributedDataParallelDeepSpeedEngine(),
            ...
        )

    .. code-block:: python

        from catalyst import dl

        class MyRunner(dl.IRunner):
            # ...
            def get_engine(self):
                return dl.DistributedDataParallelDeepSpeedEngine(
                    address="127.0.0.1",
                    port=23234,
                    process_group_kwargs={"dist_backend": "nccl"},
                    deepspeed_kwargs={"config": {"train_batch_size": 64}}
                )
            # ...

    .. code-block:: yaml

        args:
            logs: ...

        model:
            _target_: ...
            ...

        engine:
            _target_: DistributedDataParallelDeepSpeedEngine
            address: 127.0.0.1
            port: 23234
            process_group_kwargs:
                dist_backend: nccl
            deepspeed_kwargs:
                config:
                    train_batch_size: 64

        stages:
            ...

    """

    def __init__(
        self,
        address: str = "",
        port: Union[str, int] = 2112,
        world_size: Optional[int] = None,
        workers_dist_rank: int = 0,
        num_node_workers: Optional[int] = None,
        process_group_kwargs: Optional[Dict[str, Any]] = None,
        train_batch_size: int = 256,
        deepspeed_kwargs: Optional[Dict[str, Any]] = None,
    ):
        """Init."""
        # NOTE(review): ``address`` defaults to an empty string, which
        # ``setup_process`` exports verbatim as MASTER_ADDR - confirm that
        # this is the intended default.
        super().__init__(
            address=address,
            port=port,
            world_size=world_size,
            workers_dist_rank=workers_dist_rank,
            num_node_workers=num_node_workers,
            process_group_kwargs=process_group_kwargs,
        )
        # Replace whatever the parent stored: DeepSpeed names the backend
        # argument ``dist_backend``; user-supplied keys take precedence.
        process_group_kwargs = process_group_kwargs or {}
        self.process_group_kwargs = {
            "dist_backend": "nccl",
            **process_group_kwargs,
        }
        self._backend = self.process_group_kwargs["dist_backend"]

        # Work on shallow copies so the dictionaries owned by the caller are
        # never mutated (they may be shared between several engines).
        self.deepspeed_kwargs = dict(deepspeed_kwargs or {})
        config = self.deepspeed_kwargs.get("config")
        if config is None:
            config = {}
        if isinstance(config, dict):
            config = dict(config)
            # An explicit ``train_batch_size`` in the config wins over the
            # shortcut argument.
            config.setdefault("train_batch_size", train_batch_size)
        # A non-dict config (presumably a path to a DeepSpeed config file -
        # verify against ``deepspeed.initialize``) is passed through as is.
        self.deepspeed_kwargs["config"] = config

    def __repr__(self):  # noqa: D105
        return (
            f"{self.__class__.__name__}(address={self.address}, "
            f"port={self.port}, "
            f"process_group_kwargs={self.process_group_kwargs}, "
            f"deepspeed_kwargs={self.deepspeed_kwargs})"
        )

    @property
    def rank(self) -> int:
        """Process rank for distributed training."""
        return self._rank

    @property
    def world_size(self) -> int:
        """Process world size for distributed training."""
        return self._world_size

    @property
    def backend(self) -> Optional[str]:
        """String identifier for distributed backend."""
        return self._backend

    def setup_process(self, rank: int = -1, world_size: int = 1):
        """Initialize DDP variables and processes.

        Computes the global rank, binds the process to its CUDA device
        (when CUDA is available), exports the rendezvous environment
        variables and calls ``deepspeed.init_distributed``.

        Args:
            rank: process rank. Default is `-1`.
            world_size: number of devices in network to expect for train.
                Default is `1`. Not read by this method: the exported
                ``WORLD_SIZE`` comes from ``self._world_size``.
        """
        # Global rank = offset of this node's first worker + local rank.
        self._rank = self.workers_global_rank + rank

        if torch.cuda.is_available():
            torch.cuda.set_device(int(rank))
            self._device = f"cuda:{int(rank)}"

        os.environ["MASTER_ADDR"] = str(self.address)
        os.environ["MASTER_PORT"] = str(self.port)
        os.environ["WORLD_SIZE"] = str(self._world_size)
        os.environ["RANK"] = str(self._rank)
        os.environ["LOCAL_RANK"] = str(rank)
        deepspeed.init_distributed(**self.process_group_kwargs)

    def init_components(
        self, model_fn=None, criterion_fn=None, optimizer_fn=None, scheduler_fn=None
    ):
        """Inits the runs components.

        Every factory is called unconditionally, so despite the ``None``
        defaults all four must be provided.

        Args:
            model_fn: callable without arguments that returns the model
            criterion_fn: callable without arguments that returns the criterion
            optimizer_fn: callable that builds the optimizer from the model
            scheduler_fn: callable that builds the scheduler from the optimizer

        Returns:
            tuple of ``(model, criterion, optimizer, scheduler)`` where model,
            optimizer and scheduler are the objects returned by
            ``deepspeed.initialize``.
        """
        model = model_fn()
        model = self.sync_device(model)

        criterion = criterion_fn()
        criterion = self.sync_device(criterion)

        optimizer = optimizer_fn(model)
        optimizer = self.sync_device(optimizer)

        scheduler = scheduler_fn(optimizer)
        scheduler = self.sync_device(scheduler)

        # The third returned value is intentionally discarded.
        model, optimizer, _, scheduler = deepspeed.initialize(
            model=model,
            optimizer=optimizer,
            lr_scheduler=scheduler,
            **self.deepspeed_kwargs,
        )
        return model, criterion, optimizer, scheduler

    def zero_grad(self, loss, model, optimizer) -> None:
        """Abstraction over ``model.zero_grad()`` step."""
        model.zero_grad()

    def backward_loss(self, loss, model, optimizer) -> None:
        """Abstraction over ``loss.backward()`` step.

        The backward pass is delegated to ``model.backward(loss)``.
        """
        model.backward(loss)

    def optimizer_step(self, loss, model, optimizer) -> None:
        """Abstraction over ``optimizer.step()`` step.

        The step is delegated to ``model.step()``; ``optimizer`` is unused.
        """
        model.step()

# Public names exported by ``from catalyst.engines.deepspeed import *``.
__all__ = [
    "DistributedDataParallelDeepSpeedEngine",
]