Source code for catalyst.core.callbacks.scheduler

import safitty

import torch

from catalyst import utils
from catalyst.contrib.nn.schedulers import BatchScheduler, OneCycleLRWithWarmup
from catalyst.core import _State, Callback, CallbackOrder


[docs]class SchedulerCallback(Callback): def __init__( self, scheduler_key: str = None, mode: str = None, reduce_metric: str = "loss" ): super().__init__(CallbackOrder.Scheduler) self.scheduler_key = scheduler_key self.mode = mode self.reduce_metric = reduce_metric
[docs] def step(self, state: _State): scheduler = state.get_key( key="scheduler", inner_key=self.scheduler_key ) valid_metric = \ safitty.get(state.metric_manager.valid_values, self.reduce_metric) lr, momentum = self._scheduler_step( scheduler=scheduler, valid_metric=valid_metric ) state.set_key(lr, key="lr", inner_key=self.scheduler_key) state.set_key(momentum, key="momentum", inner_key=self.scheduler_key)
[docs] def on_stage_start(self, state: _State): scheduler = state.get_key( key="scheduler", inner_key=self.scheduler_key ) assert scheduler is not None if self.mode is None: if isinstance(scheduler, BatchScheduler): self.mode = "batch" else: self.mode = "epoch" if isinstance(scheduler, OneCycleLRWithWarmup) and \ self.mode == "batch": scheduler.reset()
[docs] def on_loader_start(self, state: _State): scheduler = state.get_key( key="scheduler", inner_key=self.scheduler_key ) if state.loader_name.startswith("train") and \ isinstance(scheduler, OneCycleLRWithWarmup) and \ self.mode == "batch": scheduler.recalculate( loader_len=state.loader_len, current_step=state.stage_epoch )
[docs] def on_batch_end(self, state: _State): if self.mode == "batch": self.step(state=state)
[docs] def on_epoch_end(self, state: _State): if self.mode == "epoch": self.step(state=state)
@staticmethod def _scheduler_step( scheduler, valid_metric=None, ): if isinstance(scheduler, torch.optim.lr_scheduler.ReduceLROnPlateau): scheduler.step(valid_metric) lr = safitty.get(scheduler.optimizer.param_groups, 0, "lr") else: scheduler.step() lr = scheduler.get_lr()[0] momentum = utils.get_optimizer_momentum(scheduler.optimizer) return lr, momentum
[docs]class LRUpdater(Callback): """Basic class that all Lr updaters inherit from"""
[docs] def __init__(self, optimizer_key: str = None): """ Args: optimizer_key: which optimizer key to use for learning rate scheduling """ super().__init__(CallbackOrder.Scheduler) self.init_lr = 0 self.optimizer_key = optimizer_key
[docs] def calc_lr(self): return None
[docs] def calc_momentum(self): return None
@staticmethod def _update_lr(optimizer, new_lr): for pg in optimizer.param_groups: pg["lr"] = new_lr @staticmethod def _update_momentum(optimizer, new_momentum): if "betas" in optimizer.param_groups[0]: for pg in optimizer.param_groups: pg["betas"] = (new_momentum, pg["betas"][1]) else: for pg in optimizer.param_groups: pg["momentum"] = new_momentum def _update_optimizer(self, optimizer): new_lr = self.calc_lr() if new_lr is not None: self._update_lr(optimizer, new_lr) new_momentum = self.calc_momentum() if new_momentum is not None: self._update_momentum(optimizer, new_momentum) else: new_momentum = utils.get_optimizer_momentum(optimizer) return new_lr, new_momentum
[docs] def update_optimizer(self, state: _State): if not state.need_backward: return optimizer = state.get_key( key="optimizer", inner_key=self.optimizer_key ) lr, momentum = self._update_optimizer(optimizer=optimizer) state.set_key(lr, key="lr", inner_key=self.optimizer_key) state.set_key(momentum, key="momentum", inner_key=self.optimizer_key)
[docs] def on_stage_start(self, state: _State): optimizer = state.get_key( key="optimizer", inner_key=self.optimizer_key ) self.init_lr = optimizer.defaults["lr"]
[docs] def on_loader_start(self, state: _State): if state.need_backward: self.update_optimizer(state=state)
[docs] def on_batch_end(self, state: _State): if state.need_backward: self.update_optimizer(state=state)
__all__ = ["SchedulerCallback", "LRUpdater"]