Source code for catalyst.dl.utils.wizard

from collections import OrderedDict
import pathlib

from prompt_toolkit import prompt
import yaml

from catalyst.dl import registry
from catalyst.dl.utils import clone_pipeline
from catalyst.dl.utils.pipelines import URL as pipeline_urls
from catalyst.utils.scripts import import_module

def _represent_ordered_dict(dumper, data):
    """
    Represent an ``OrderedDict`` as a plain YAML mapping.

    The items are passed to the dumper as an item view, so the mapping
    is emitted under the standard ``map`` tag in the dict's own order.
    """
    return dumper.represent_mapping("tag:yaml.org,2002:map", data.items())


# Register the representer so ``yaml.dump`` handles OrderedDict configs
yaml.add_representer(OrderedDict, _represent_ordered_dict)


class Wizard:
    """
    Class for Catalyst Config API Wizard.

    The instance of this class will be created and called from
    cli command: ``catalyst-dl init --interactive``.

    With help of this Wizard user will be able to setup pipeline from
    available templates and make choices of what predefined classes to use
    in different parts of pipeline.
    """

    # Registry sections whose initial content is remembered in ``__init__``
    _REGISTRY_KEYS = (
        "MODELS", "CRITERIONS", "OPTIMIZERS", "SCHEDULERS", "CALLBACKS"
    )
    # Answers accepted as "yes" / "True" for yes-no questions
    _AFFIRMATIVE = frozenset(("y", "yes", "t", "true", "1"))

    def __init__(self):
        """
        Initialization of instance of this class will print welcome message
        and logo of Catalyst in ASCII format.

        Also here we'll save all classes of Catalyst own pipeline parts
        to be able to put user's modules on top of lists to ease the choice.
        """
        self.__sep("Welcome to Catalyst Config API wizard!")
        print("""
                       ___________
                      (_         _)
                        |       |
                        |       |
                        |       |
                       /         \\
                      /    (      \\
                     /    /  (#    \\
                    /     #   #     \\
                   /      #    #     \\
                  /      #######      \\
                 (_____________________)
        \n""")

        self._cfg = OrderedDict([
            ("model_params", OrderedDict()),
            ("args", OrderedDict()),
            ("stages", OrderedDict())
        ])

        self.pipeline_path = pathlib.Path("./")
        # Snapshot taken before user's expdir is imported: everything that
        # shows up in the registry later is considered a user module
        self.__before_export = {
            key: registry.__dict__[key].all() for key in self._REGISTRY_KEYS
        }

    @staticmethod
    def __sep(step_name: str = None):
        """
        Separator between Wizard sections

        Args:
            step_name: optional title printed inside the separator
        """
        if step_name is None:
            print("\n" + "="*100 + "\n")
        else:
            msg = "\n" + "="*100 + "\n"
            msg += "="*10 + " " + step_name + " "
            msg += "="*(100 - len(step_name) - 12)
            msg += "\n" + "="*100 + "\n"
            print(msg)

    @staticmethod
    def _export_step():
        """
        Prints the final menu and returns user's raw answer
        """
        print("Config is complete. What is next?\n\n"
              "1. Preview config in YAML format\n"
              "2. Save config to file\n"
              "3. Discard changes and exit\n")
        return prompt("Enter the number: ")

    @staticmethod
    def __res(result, is_yaml=False):
        """
        Echoes accepted value back to user, optionally dumped as YAML
        """
        if is_yaml:
            print(f"->\n{yaml.dump(result, default_flow_style=False)}")
        else:
            print(f"-> {result}")

    @classmethod
    def _parse_bool(cls, text):
        """
        Converts a yes/no or True/False answer into ``bool``.

        ``bool(text)`` can't be used for that: it is ``True`` for every
        non-empty string, including ``"False"``.

        Args:
            text: raw answer entered by user

        Returns:
            ``True`` if the answer (case and surrounding whitespace ignored)
            is one of ``y``, ``yes``, ``t``, ``true``, ``1``;
            ``False`` otherwise
        """
        return text.strip().lower() in cls._AFFIRMATIVE

    @staticmethod
    def _parse_init_args(text):
        """
        Parses ``key=value`` pairs separated by commas.

        Values stay weak-typed: strings made only of digits become ``int``,
        everything else is kept as string. Only the first ``=`` of a pair
        separates key from value, so values may contain ``=`` themselves.
        Empty pairs are ignored, pairs without ``=`` or without a key are
        reported and skipped.

        Args:
            text: raw string, e.g. ``lr=0.001,beta=3``

        Returns:
            ``OrderedDict`` with parsed arguments in the order they
            were entered
        """
        parsed = OrderedDict()
        for chunk in text.split(","):
            chunk = chunk.strip()
            if not chunk:
                continue
            key, sep, val = chunk.partition("=")
            key, val = key.strip(), val.strip()
            if not sep or not key:
                print(f"Skipping malformed argument `{chunk}`, "
                      "expected key=value")
                continue
            # We can add regex to parse params properly into types we need
            parsed[key] = int(val) if val.isdigit() else val
        return parsed

    @staticmethod
    def _prompt_int(message, default=""):
        """
        Asks the question until user enters a valid integer.

        Args:
            message: text of the question
            default: value pre-filled in the prompt

        Returns:
            entered value converted to ``int``
        """
        while True:
            raw = prompt(message, default=default)
            try:
                return int(raw)
            except ValueError:
                print(f"`{raw}` is not a valid integer, try again")

    def __sorted_for_user(self, key):
        """
        Here we put user's modules of specific part of pipeline
        on top of modules predefined in Catalyst

        Args:
            key: name of the registry section, e.g. ``MODELS``

        Returns:
            sorted user's modules followed by sorted modules whose names
            start with an uppercase letter; user's modules are not repeated
            in the second part
        """
        modules = registry.__dict__[key].all()
        user_modules = set(modules) - set(self.__before_export[key])
        predefined = [
            m for m in modules
            if m not in user_modules and m[:1].isupper()
        ]
        return sorted(user_modules) + sorted(predefined)

    def _preview(self):
        """
        Showing user final config in YAML format
        """
        self.__sep()
        print(yaml.dump(self._cfg, default_flow_style=False))
        self.__sep()

    def _dump_step(self):
        """
        Asking where and saving final config converted into YAML
        """
        path = prompt("Enter config path: ", default="./configs/config.yml")
        self.__res(path)
        path = pathlib.Path(path)
        # Default target lives in ``./configs`` which may not exist yet
        path.parent.mkdir(parents=True, exist_ok=True)
        with path.open(mode="w") as stream:
            yaml.dump(self._cfg, stream, default_flow_style=False)
        print(f"Config was written to {path}")

    def _skip_override_stages_common(self, param_name):
        """
        Stages could have common params, in that case we will ask
        user if it should be overriden for specific step. If not - we'll
        just skip entire params section for stage

        Args:
            param_name: name of the params section, e.g. ``data_params``

        Returns:
            ``True`` if the common section exists and user doesn't want
            to override it, ``False`` otherwise
        """
        if param_name not in self._cfg["stages"]:
            return False
        common = self._cfg["stages"][param_name]
        print("You have common setting for all stages:\n" +
              yaml.dump(common, default_flow_style=False))
        res = prompt("Do you want to override it? (y/N): ", default="N")
        self.__res(res)
        # Anything that is not an explicit "yes" keeps the common setting
        return not self._parse_bool(res)

    def _callbacks_step(self, stage):
        """
        Asks for callback section names in a loop and stores the chosen
        callbacks under ``callbacks`` key of the stage (only if at least
        one callback was added).

        Args:
            stage: dict-like config section to be filled
        """
        self.__sep("Callbacks")
        print("Let's add some callbacks!\n\n"
              "!!! Remember that Catalyst will add Criterion, Optimizer and "
              "Checkpoint callbacks for you\n"
              "with default settings if name of the step is NOT started "
              "with ``infer``.\n")
        opts = OrderedDict()
        while True:
            callback = prompt("Enter callback section name, e.g. "
                              "'loss_aggregator'"
                              "(or hit Enter to stop adding callbacks): ")
            if not callback:
                if opts:
                    stage["callbacks"] = opts
                return
            self.__res(callback)
            callback_params = OrderedDict()
            self._basic_params_step("callback", callback_params)
            # Nothing is stored when user chose to skip the callback class
            if "callback_params" in callback_params:
                opts[callback] = callback_params["callback_params"]

    def _basic_params_step(self, param, stage, optional=False):
        """
        Step #x

        Models, criterions, callbacks, schedulers could be choosen from list
        of predefined in Catalyst as well as from imported from user expdir.
        Also it even could not exist yet, so we provide a way to enter class
        name of the entity.
        Also we request args or params of those modules, but they are
        weak-typed now and will be all strings/ints in final config.

        Args:
            param: name of pipeline part, e.g. ``model`` or ``criterion``
            stage: dict-like config section where ``{param}_params``
                will be stored
            optional: if ``True`` the "Skip this param" entry is shown
                in the list of choices
        """
        self.__sep(f"{param}_params")
        if self._skip_override_stages_common(f"{param}_params"):
            return
        opts = OrderedDict()
        modules = self.__sorted_for_user(f"{param.upper()}S")
        msg = f"What {param} you'll be using:\n\n"
        if modules:
            if optional:
                msg += "0: Skip this param\n"
            msg += "\n".join([f"{n+1}: {m}" for n, m in enumerate(modules)])
            print(msg)
            while True:
                module = prompt("\nEnter number from list above or "
                                f"class name of {param} you'll be using: ")
                if not module.isdigit():
                    # Not a number - treated as a class name
                    break
                index = int(module)
                if index == 0:
                    self.__res("Skipping...")
                    return
                if index <= len(modules):
                    module = modules[index - 1]
                    break
                print(f"Unknown option `{index}`")
            self.__res(module)
        else:
            module = prompt(f"Enter class name of {param} "
                            "you'll be using: ")
            self.__res(module)
        opts[param] = module
        res = prompt("If there are arguments you want to provide during "
                     f"{param} initialization, provide them here in "
                     "following format:\n\nlr=0.001,beta=3.41\n\n"
                     "Or just skip this step (press Enter): ")
        if res:
            opts.update(self._parse_init_args(res))
        self.__res(opts, is_yaml=True)
        stage[f"{param}_params"] = opts

    def _state_params_step(self, stage):
        """
        Step #5.b

        ``state_params`` of Experiment.

        Args:
            stage: dict-like config section to be filled
        """
        self.__sep("state_params")
        if self._skip_override_stages_common("state_params"):
            return
        opts = OrderedDict()
        opts["num_epochs"] = self._prompt_int(
            "How much epochs you want to run this stage: ", default="1")
        self.__res(opts["num_epochs"])
        opts["main_metric"] = prompt("What is the main_metric?: ",
                                     default="loss")
        self.__res(opts["main_metric"])
        # The answer is parsed, not cast: ``bool("False")`` would be True
        minimize = self._parse_bool(
            prompt("Will it be minimized (True/False): ", default="True"))
        opts["minimize_metric"] = minimize
        self.__res(opts["minimize_metric"])
        stage["state_params"] = opts

    def _data_params_step(self, stage):
        """
        Step #5.a

        Here we'll store required ``data_params``.
        Right now experiment couldn't be run without ``num_worker`` param,
        but it's rarely when user needs batch_size of 1

        Args:
            stage: dict-like config section to be filled
        """
        self.__sep("data_params")
        if self._skip_override_stages_common("data_params"):
            return
        opts = OrderedDict()
        opts["batch_size"] = self._prompt_int("What is the batch_size?: ",
                                              default="1")
        self.__res(opts["batch_size"])
        opts["num_workers"] = self._prompt_int("What is the num_workers?: ",
                                               default="1")
        self.__res(opts["num_workers"])
        stage["data_params"] = opts

    def _stage_step(self, stage):
        """
        Step #5

        For stages' common params and for every stage params we'll run this
        method to gather all we need to know about the stage and
        its settings

        Args:
            stage: dict-like config section to be filled
        """
        self._data_params_step(stage)
        self._state_params_step(stage)
        self._basic_params_step("criterion", stage)
        self._basic_params_step("optimizer", stage)
        self._basic_params_step("scheduler", stage, optional=True)
        self._callbacks_step(stage)

    def _stages_step(self):
        """
        Step #4

        Stages params. We need to understand how much stages will be there,
        what are their names and if user wants to predefine something common
        for all stages
        """
        self.__sep("stages")
        cnt = self._prompt_int(
            "How much stages your exepriment will contain: ")
        self.__res(cnt)
        # At least one stage is always configured
        cnt = max(cnt, 1)
        if cnt > 1:
            # Hint shows capital Y because "y" is the pre-filled default
            res = prompt("Do you want to assign some common settings "
                         "for all stages? (Y/n): ", default="y")
            self.__res(res)
            if self._parse_bool(res):
                self._stage_step(self._cfg["stages"])
            print(f"\nNow we'll configure all {cnt} stages one-by-one\n")
        for stage_id in range(cnt):
            name = prompt("What would be the name of this stage: ",
                          default=f"stage{stage_id + 1}")
            self.__res(name)
            stage = OrderedDict()
            self._stage_step(stage)
            self._cfg["stages"][name] = stage

    def _model_step(self):
        """
        Step #3

        We need to user choose its model for experiment
        """
        self._basic_params_step("model", self._cfg)

    def __export_user_modules(self):
        """
        Private method to try to export user's modules.
        We need this to add user's modules to list of choices
        for pipeline parts
        """
        # Resolved before ``try`` so error messages can always refer to it
        expdir = pathlib.Path(self._cfg["args"]["expdir"])
        try:
            # We need to import module to add possible modules to registry
            import_module(expdir)
        except OSError:
            print(f"There is no modules to import found: {expdir}")
        except Exception as err:
            # Best effort on purpose: broken user code must not stop wizard
            print("Unexpected error when tried to import modules from "
                  f"{expdir}: {err}")
        else:
            self.__res(f"Modules from {expdir} exported")

    def _args_step(self):
        """
        Step #2

        ``args`` section where two params required:
        expdir - where all user modules stored
        logdir - where Catalyst will write logs
        """
        self.__sep("args")
        self._cfg["args"]["expdir"] = prompt(
            "Provide expdir for your experiment "
            "(where is the `__init__.py` with your modules stored): ",
            default=str(self.pipeline_path/"src"))
        self.__res(self._cfg["args"]["expdir"])
        self._cfg["args"]["logdir"] = prompt(
            "Provide logdir for your experiment "
            "(where Catalyst supposed to save its logs): ",
            default=str(self.pipeline_path/"logs/experiment"))
        self.__res(self._cfg["args"]["logdir"])
        self.__export_user_modules()

    def _pipeline_step(self):
        """
        Step #1

        User can choose which pipeline to clone and if not skipped - where.
        Then pipeline will be copied in requested directory
        """
        self.__sep("Pipeline templates")
        opts = list(pipeline_urls.keys()) + ["empty"]
        opts = [opt.capitalize() for opt in opts]
        msg = "0: Skip this step\n"
        msg += "\n".join([f"{n + 1}: {v}" for n, v in enumerate(opts)])
        print(msg)
        while True:
            res = self._prompt_int("\nChoose pipeline template you want to "
                                   "init your project from: ")
            if 0 <= res <= len(opts):
                break
            print(f"Unknown option `{res}`")
        if res == 0:
            self.__res("Skipped...")
            return
        pipeline = opts[res - 1]
        self.__res(pipeline)
        out_dir = prompt(f"Where we need to copy {pipeline} "
                         "template files?: ", default="./")
        self.pipeline_path = pathlib.Path(out_dir)
        clone_pipeline(pipeline.lower(), self.pipeline_path)
        self.__res(f"{pipeline} cloned to {self.pipeline_path}")

    def run(self):
        """
        Walks user through predefined wizard steps
        """
        self._pipeline_step()
        self._args_step()
        self._model_step()
        self._stages_step()
        while True:
            res = self._export_step()
            if res == "1":
                self._preview()
            elif res == "2":
                self._dump_step()
                return
            elif res == "3":
                return
            else:
                print(f"Unknown option `{res}`")
def run_wizard():
    """
    Entry point of the interactive wizard: builds a ``Wizard`` and
    immediately walks through its steps.
    """
    Wizard().run()