from typing import Any, Dict, Union
import copy
import os

import torch
import torch.distributed as dist

from catalyst.engines.torch import DeviceEngine
from catalyst.settings import SETTINGS
from catalyst.utils.distributed import ddp_reduce

if SETTINGS.deepspeed_required:
    import deepspeed

class DistributedDataParallelDeepSpeedEngine(DeviceEngine):
    """Distributed DeepSpeed MultiGPU training device engine.

    Args:
        address: master node address, exported as ``MASTER_ADDR``
            in ``setup_process``. Default is ``"localhost"``.
        port: master node port, exported as ``MASTER_PORT``
            in ``setup_process``. Default is ``12345``.
        process_group_kwargs: keyword arguments forwarded to
            ``deepspeed.init_distributed`` when the process is set up.
            A truthy ``world_size`` entry is additionally used as the initial
            world size (otherwise ``torch.cuda.device_count()`` is used).
            More info here:
            https://deepspeed.readthedocs.io/en/latest/initialize.html
        deepspeed_kwargs: parameters for ``deepspeed.initialize``.
            If ``config`` is missing or is a dict without ``train_batch_size``,
            ``train_batch_size`` defaults to ``256``.
            More info here:
            https://deepspeed.readthedocs.io/en/latest/initialize.html

    Examples:

    .. code-block:: python

        from catalyst import dl

        runner = dl.SupervisedRunner()
        runner.train(
            engine=dl.DistributedDataParallelDeepSpeedEngine(),
            ...
        )

    .. code-block:: python

        from catalyst import dl

        class MyRunner(dl.IRunner):
            # ...
            def get_engine(self):
                return dl.DistributedDataParallelDeepSpeedEngine(
                    address="localhost",
                    port=23234,
                    process_group_kwargs={"dist_backend": "nccl"},
                    deepspeed_kwargs={"config": {"train_batch_size": 64}},
                )
            # ...

    .. code-block:: yaml

        args:
            logs: ...

        model:
            _target_: ...
            ...

        engine:
            _target_: DistributedDataParallelDeepSpeedEngine
            address: localhost
            port: 23234
            process_group_kwargs:
                dist_backend: nccl
            deepspeed_kwargs:
                config:
                    train_batch_size: 64

        stages:
            ...

    """

    def __init__(
        self,
        address: str = None,
        port: Union[str, int] = None,
        process_group_kwargs: Dict[str, Any] = None,
        deepspeed_kwargs: Dict[str, Any] = None,
    ):
        """Init."""
        super().__init__()
        self.address = address or "localhost"
        self.port = port or 12345
        self._rank = 0
        self._device = None

        if process_group_kwargs is None:
            process_group_kwargs = {}
        self.process_group_kwargs = copy.deepcopy(process_group_kwargs)
        self._world_size = (
            self.process_group_kwargs.get("world_size", None) or torch.cuda.device_count()
        )

        self.deepspeed_kwargs = self._prepare_deepspeed_kwargs(deepspeed_kwargs)

    @staticmethod
    def _prepare_deepspeed_kwargs(deepspeed_kwargs: Dict[str, Any] = None) -> Dict[str, Any]:
        """Return a private copy of ``deepspeed_kwargs`` with config defaults applied.

        The caller's dict (and its nested ``config`` dict) is never modified.

        Args:
            deepspeed_kwargs: user-supplied kwargs for ``deepspeed.initialize``
                or ``None``.

        Returns:
            New dict that always contains a ``config`` entry. When ``config`` is
            a dict, ``train_batch_size`` is set to ``256`` unless already given.
        """
        # Shallow copy on purpose: other entries (e.g. ``model_parameters``)
        # may hold objects that cannot or should not be deep-copied.
        prepared = dict(deepspeed_kwargs) if deepspeed_kwargs else {}

        config = prepared.get("config")
        if config is None:
            config = {}
        if isinstance(config, dict):
            config = copy.deepcopy(config)
            config.setdefault("train_batch_size", 256)
        # A non-dict config (``deepspeed.initialize`` also accepts a path to a
        # config file) cannot take a default batch size and is forwarded as is.
        prepared["config"] = config
        return prepared

    def __repr__(self):  # noqa: D105
        return (
            f"{self.__class__.__name__}(address={self.address}, "
            f"port={self.port}, "
            f"process_group_kwargs={self.process_group_kwargs}, "
            f"deepspeed_kwargs={self.deepspeed_kwargs})"
        )

    @property
    def rank(self) -> int:
        """Process rank for distributed training."""
        return self._rank

    @property
    def world_size(self) -> int:
        """Process world size for distributed training."""
        return self._world_size

    def setup_process(self, rank: int = -1, world_size: int = 1):
        """Initialize DDP variables and processes.

        Selects the CUDA device with index ``rank``, exports the rendezvous
        environment variables (``RANK``, ``LOCAL_RANK``, ``WORLD_SIZE``,
        ``MASTER_ADDR``, ``MASTER_PORT``) and calls
        ``deepspeed.init_distributed`` with ``process_group_kwargs``.

        Args:
            rank: process rank. Default is `-1`.
            world_size: number of devices in network to expect for train.
                Default is `1`.
        """
        self._rank = rank
        self._world_size = world_size
        torch.cuda.set_device(int(self._rank))
        self._device = f"cuda:{int(self._rank)}"

        # The same value is used for the global and the local rank.
        os.environ["RANK"] = str(rank)
        os.environ["LOCAL_RANK"] = str(rank)
        os.environ["WORLD_SIZE"] = str(world_size)
        os.environ["MASTER_ADDR"] = str(self.address)
        os.environ["MASTER_PORT"] = str(self.port)
        deepspeed.init_distributed(**self.process_group_kwargs)

    def cleanup_process(self):
        """Clean DDP variables and processes."""
        # Wait for every process before tearing the group down.
        dist.barrier()
        dist.destroy_process_group()

    # @TODO: add all_gather
    def sync_tensor(self, tensor: torch.Tensor, mode: str) -> torch.Tensor:
        """Syncs ``tensor`` over ``world_size`` in distributed mode.

        Args:
            tensor: tensor to sync across the processes.
            mode: tensor synchronization type,
                should be one of 'sum' or 'mean'.

        Returns:
            torch.Tensor with synchronized values.
        """
        return ddp_reduce(tensor, mode, self.world_size)

    def init_components(
        self, model_fn=None, criterion_fn=None, optimizer_fn=None, scheduler_fn=None,
    ):
        """Inits the runs components.

        Each factory is called without arguments, its result is passed through
        ``sync_device``, and then model, optimizer and scheduler are wrapped
        by ``deepspeed.initialize`` using ``deepspeed_kwargs``.

        Args:
            model_fn: callable returning the model.
            criterion_fn: callable returning the criterion.
            optimizer_fn: callable returning the optimizer.
            scheduler_fn: callable returning the scheduler.

        Returns:
            tuple of (model, criterion, optimizer, scheduler), where model,
            optimizer and scheduler are the objects returned
            by ``deepspeed.initialize``.
        """
        model = self.sync_device(model_fn())
        criterion = self.sync_device(criterion_fn())
        optimizer = self.sync_device(optimizer_fn())
        scheduler = self.sync_device(scheduler_fn())

        # The third returned item is not used by this engine.
        model, optimizer, _, scheduler = deepspeed.initialize(
            model=model, optimizer=optimizer, lr_scheduler=scheduler, **self.deepspeed_kwargs,
        )

        return model, criterion, optimizer, scheduler

    def zero_grad(self, loss, model, optimizer) -> None:
        """Abstraction over ``model.zero_grad()`` step."""
        model.zero_grad()

    def backward_loss(self, loss, model, optimizer) -> None:
        """Abstraction over ``loss.backward()`` step."""
        # Backward goes through the model object rather than ``loss.backward()``.
        model.backward(loss)

    def optimizer_step(self, loss, model, optimizer) -> None:
        """Abstraction over ``optimizer.step()`` step."""
        # The step goes through the model object rather than ``optimizer.step()``.
        model.step()


# Public API of this module: only the DeepSpeed engine class is exported.
__all__ = ["DistributedDataParallelDeepSpeedEngine"]