Source code for catalyst.utils.scripts

from typing import Callable, Dict, Union
from importlib.util import module_from_spec, spec_from_file_location
import os
import pathlib
import shutil
import subprocess
import sys
import warnings

import torch
import torch.distributed

from catalyst.registry import EXPERIMENTS, RUNNERS
from catalyst.utils.distributed import (
    get_distributed_env,
    get_distributed_params,
)
from catalyst.utils.misc import get_utcnow_time


def import_module(expdir: Union[str, pathlib.Path]):
    """
    Imports python module by path.

    Args:
        expdir: path to python module.

    Returns:
        Imported module.
    """
    if not isinstance(expdir, pathlib.Path):
        expdir = pathlib.Path(expdir)
    sys.path.insert(0, str(expdir.absolute()))
    sys.path.insert(0, os.path.dirname(str(expdir.absolute())))
    s = spec_from_file_location(
        expdir.name,
        str(expdir.absolute() / "__init__.py"),
        submodule_search_locations=[expdir.absolute()],
    )
    m = module_from_spec(s)
    s.loader.exec_module(m)
    sys.modules[expdir.name] = m
    return m

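# Usage sketch (hypothetical path): ``./my_exp`` is assumed to be a package
# directory with an ``__init__.py`` that defines the experiment components.
#
# >>> module = import_module("./my_exp")
# >>> Experiment = getattr(module, "Experiment", None)  # class from __init__.py, if any
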
def prepare_config_api_components(expdir: pathlib.Path, config: Dict):
    """
    Imports and creates the core Config API components - Experiment, Runner
    and Config - from ``expdir`` (experiment directory)
    and ``config`` (experiment config).

    Args:
        expdir: experiment directory path
        config: dictionary with experiment Config

    Returns:
        Experiment, Runner, Config for Config API usage.
    """
    if not isinstance(expdir, pathlib.Path):
        expdir = pathlib.Path(expdir)
    m = import_module(expdir)
    experiment_fn = getattr(m, "Experiment", None)
    runner_fn = getattr(m, "Runner", None)

    experiment_params = config.get("experiment_params", {})
    experiment_from_config = experiment_params.pop("experiment", None)
    assert any(
        x is None for x in (experiment_fn, experiment_from_config)
    ), "Experiment is set both in code and config."
    if experiment_fn is None and experiment_from_config is not None:
        experiment_fn = EXPERIMENTS.get(experiment_from_config)

    runner_params = config.get("runner_params", {})
    runner_from_config = runner_params.pop("runner", None)
    assert any(
        x is None for x in (runner_fn, runner_from_config)
    ), "Runner is set both in code and config."
    if runner_fn is None and runner_from_config is not None:
        runner_fn = RUNNERS.get(runner_from_config)

    experiment = experiment_fn(config)
    runner = runner_fn(**runner_params)

    return experiment, runner, config

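# Usage sketch (minimal illustration, hypothetical path): the Experiment and
# Runner classes are assumed to come from the ``./my_exp`` code, so the config
# only carries their parameters.
#
# >>> config = {"experiment_params": {}, "runner_params": {}}
# >>> experiment, runner, config = prepare_config_api_components(
# ...     pathlib.Path("./my_exp"), config
# ... )
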
def _tricky_dir_copy(dir_from: str, dir_to: str) -> None:
    # Replaces ``dir_to`` with an exact copy of ``dir_from``:
    # any existing destination content is removed first.
    os.makedirs(dir_to, exist_ok=True)
    shutil.rmtree(dir_to)
    shutil.copytree(dir_from, dir_to)

def dump_code(
    expdir: Union[str, pathlib.Path], logdir: Union[str, pathlib.Path]
) -> None:
    """
    Dumps Catalyst code for reproducibility.

    Args:
        expdir (Union[str, pathlib.Path]): experiment dir path
        logdir (Union[str, pathlib.Path]): logging dir path
    """
    expdir = expdir[:-1] if expdir.endswith("/") else expdir
    new_src_dir = "code"  # @TODO: hardcoded

    old_pro_dir = os.path.dirname(os.path.abspath(__file__)) + "/../"
    new_pro_dir = os.path.join(logdir, new_src_dir, "catalyst")
    _tricky_dir_copy(old_pro_dir, new_pro_dir)

    old_expdir = os.path.abspath(expdir)
    new_expdir = os.path.basename(old_expdir)
    new_expdir = os.path.join(logdir, new_src_dir, new_expdir)
    _tricky_dir_copy(old_expdir, new_expdir)

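# Usage sketch (hypothetical paths): copies the installed ``catalyst`` package
# and the experiment sources into ``<logdir>/code`` so the run can be reproduced.
#
# >>> dump_code(expdir="./my_exp", logdir="./logs/run-01")
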
def dump_python_files(src: pathlib.Path, dst: pathlib.Path) -> None:
    """
    Dumps python code (``*.py`` and ``*.ipynb``) files.

    Args:
        src: source code path
        dst: destination code path
    """
    py_files = list(src.glob("*.py"))
    ipynb_files = list(src.glob("*.ipynb"))
    py_files += ipynb_files
    py_files = list(set(py_files))
    for py_file in py_files:
        shutil.copy2(f"{str(py_file.absolute())}", f"{dst}/{py_file.name}")

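# Usage sketch (hypothetical paths): copies ``*.py`` and ``*.ipynb`` files from
# ``src`` into ``dst``; the destination directory is assumed to exist already.
#
# >>> dump_python_files(pathlib.Path("./my_exp"), pathlib.Path("./logs/code"))
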
def dump_experiment_code(src: pathlib.Path, dst: pathlib.Path) -> None:
    """
    Dumps your experiment code for Config API use cases.

    Args:
        src: source code path
        dst: destination code path
    """
    utcnow = get_utcnow_time()
    dst = dst.joinpath("code")
    dst = dst.joinpath(f"code-{utcnow}") if dst.exists() else dst
    os.makedirs(dst, exist_ok=True)
    dump_python_files(src, dst)

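# Usage sketch (hypothetical paths): writes the experiment sources into
# ``<dst>/code`` (or into a timestamped ``code-<utcnow>`` subfolder when
# ``code`` already exists).
#
# >>> dump_experiment_code(pathlib.Path("./my_exp"), pathlib.Path("./logs/run-01"))
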
def distributed_cmd_run(
    worker_fn: Callable, distributed: bool = True, *args, **kwargs
) -> None:
    """
    Runs ``worker_fn`` in a distributed setup when requested.

    Args:
        worker_fn: worker fn to run in distributed mode
        distributed: if ``True``, run ``worker_fn`` in distributed mode
        args: additional parameters for worker_fn
        kwargs: additional key-value parameters for worker_fn
    """
    distributed_params = get_distributed_params()
    local_rank = distributed_params["local_rank"]
    world_size = distributed_params["world_size"]

    if distributed and torch.distributed.is_initialized():
        warnings.warn(
            "Looks like you are trying to call distributed setup twice, "
            "switching to normal run for correct distributed training."
        )

    if (
        not distributed
        or torch.distributed.is_initialized()
        or world_size <= 1
    ):
        worker_fn(*args, **kwargs)
    elif local_rank is not None:
        torch.cuda.set_device(int(local_rank))

        torch.distributed.init_process_group(
            backend="nccl", init_method="env://"
        )
        worker_fn(*args, **kwargs)
    else:
        workers = []
        try:
            for local_rank in range(torch.cuda.device_count()):
                rank = distributed_params["start_rank"] + local_rank
                env = get_distributed_env(local_rank, rank, world_size)
                cmd = [sys.executable] + sys.argv.copy()
                workers.append(subprocess.Popen(cmd, env=env))
            for worker in workers:
                worker.wait()
        finally:
            for worker in workers:
                worker.kill()

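# Usage sketch (hypothetical ``train`` worker): with ``distributed=True`` and
# several visible GPUs, the current script is re-launched once per device and
# each child process joins the NCCL process group before running the worker.
#
# >>> def train(logdir="./logs"):
# ...     pass  # build and fit a model here
# >>> distributed_cmd_run(train, distributed=True, logdir="./logs")
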
__all__ = [
    "import_module",
    "dump_code",
    "dump_python_files",
    "prepare_config_api_components",
    "dump_experiment_code",
    "distributed_cmd_run",
]