from typing import Callable, Dict, List  # isort:skip
import logging

import safitty
import torch

from catalyst import utils
from catalyst.core import _State, Callback, CallbackOrder, registry
from catalyst.utils.tools.typing import Optimizer

logger = logging.getLogger(__name__)


class OptimizerCallback(Callback):
"""
Optimizer callback, abstraction over optimizer step.
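
    Example (a minimal usage sketch, not part of this module; it assumes the
    usual ``catalyst.dl.SupervisedRunner.train`` interface and import paths,
    which may differ across Catalyst versions, and note that the default
    experiment may already add an ``OptimizerCallback`` on its own, so the
    explicit callback below is only for illustration)::

        import torch
        from torch.utils.data import DataLoader, TensorDataset
        from catalyst.dl import SupervisedRunner
        from catalyst.dl.callbacks import OptimizerCallback

        # toy data, model and optimizer
        X, y = torch.randn(64, 16), torch.randint(0, 2, (64,))
        loaders = {
            "train": DataLoader(TensorDataset(X, y), batch_size=8),
            "valid": DataLoader(TensorDataset(X, y), batch_size=8),
        }
        model = torch.nn.Linear(16, 2)
        optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

        runner = SupervisedRunner()
        runner.train(
            model=model,
            criterion=torch.nn.CrossEntropyLoss(),
            optimizer=optimizer,
            loaders=loaders,
            callbacks=[
                # accumulate gradients over 4 batches before each
                # optimizer step, emulating a 4x larger batch size
                OptimizerCallback(accumulation_steps=4),
            ],
            num_epochs=1,
        )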
"""

    def __init__(
        self,
        grad_clip_params: Dict = None,
        accumulation_steps: int = 1,
        optimizer_key: str = None,
        loss_key: str = "loss",
        decouple_weight_decay: bool = True,
        save_model_grads: bool = False,
    ):
"""
Args:
            grad_clip_params (dict): params for gradient clipping;
                see the example below
accumulation_steps (int): number of steps before
``model.zero_grad()``
            optimizer_key (str): a key to take an optimizer in case
                there are several of them and they are in a dictionary format
loss_key (str): key to get loss from ``state.loss``
            decouple_weight_decay (bool): if True, decouple weight decay
                regularization from the optimizer update and re-apply it
                manually in ``grad_step``
            save_model_grads (bool): if True, ``state.model_grads`` will
                contain the gradients computed during backward propagation
                on the current batch
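
        Example of ``grad_clip_params`` (an illustrative sketch, not the only
        option; it assumes the ``GRAD_CLIPPERS`` registry is keyed by
        ``"func"`` and exposes the ``torch.nn.utils`` clipping functions;
        check the registry in your Catalyst version)::

            callback = OptimizerCallback(
                grad_clip_params={
                    "func": "clip_grad_norm_",  # or "clip_grad_value_"
                    "max_norm": 1.0,
                },
            )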
"""
super().__init__(CallbackOrder.Optimizer)
grad_clip_params: dict = grad_clip_params or {}
self.grad_clip_fn = (
registry.GRAD_CLIPPERS.get_from_params(**grad_clip_params)
)
self.accumulation_steps: int = accumulation_steps
self.optimizer_key: str = optimizer_key
self.loss_key: str = loss_key
self.decouple_weight_decay = decouple_weight_decay
self.save_model_grads = save_model_grads
self._optimizer_wd: List[float] = [0.0]
self._accumulation_counter: int = 0

    @staticmethod
    def grad_step(
        *,
        optimizer: Optimizer,
        optimizer_wds: List[float] = None,
        grad_clip_fn: Callable = None,
    ):
"""
        Makes a gradient step for a given optimizer.

        Args:
optimizer (Optimizer): the optimizer
            optimizer_wds (List[float]): list of weight decay parameters,
                one per param group; if None, no decoupled decay is applied
grad_clip_fn (Callable): function for gradient clipping
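
        Note: the decoupled weight decay is applied here as a direct shrinkage
        of the parameters before the optimizer step, roughly
        ``p = p - lr * wd * p`` for every parameter ``p`` of a param group
        with weight decay ``wd`` and learning rate ``lr``; gradient clipping,
        if configured, is applied right before ``optimizer.step()``.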
"""
        if optimizer_wds is None:
            optimizer_wds = [0.0] * len(optimizer.param_groups)

        for group, wd in zip(optimizer.param_groups, optimizer_wds):
            if wd > 0:
                # Decoupled weight decay: shrink the weights directly,
                # p <- p - lr * wd * p, instead of folding wd * p into
                # the gradients inside the optimizer.
                for param in group["params"]:
                    param.data = param.data.add(
                        param.data, alpha=-wd * group["lr"]
                    )
if grad_clip_fn is not None:
grad_clip_fn(group["params"])
optimizer.step()

    def on_stage_start(self, state: _State):
"""On stage start event"""
optimizer = state.get_key(
key="optimizer", inner_key=self.optimizer_key
)
assert optimizer is not None
lr = optimizer.defaults["lr"]
momentum = utils.get_optimizer_momentum(optimizer)
state.set_key(lr, "lr", inner_key=self.optimizer_key)
state.set_key(momentum, "momentum", inner_key=self.optimizer_key)

    def on_epoch_start(self, state: _State):
"""On epoch start event"""
optimizer = state.get_key(
key="optimizer", inner_key=self.optimizer_key
)
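        # When decoupling, stash each param group's weight decay and zero it
        # out inside the optimizer, so that the decay is applied exactly once,
        # manually, in ``grad_step``.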
if self.decouple_weight_decay:
self._optimizer_wd = [
group.get("weight_decay", 0.0)
for group in optimizer.param_groups
]
for i in range(len(optimizer.param_groups)):
safitty.set(
optimizer.param_groups, i, "weight_decay", value=0.0)
else:
self._optimizer_wd = [0.0] * len(optimizer.param_groups)

    def _get_loss(self, state: _State) -> torch.Tensor:
        loss = state.get_key(key="loss", inner_key=self.loss_key)

        if isinstance(loss, list):
            raise ValueError(
                "Loss is a list. To aggregate losses into one value, "
                "use `CriterionAggregatorCallback`."
            )
        if isinstance(loss, dict):
            error = (
                f"Loss is a dict: {list(loss.keys())}; "
                "to aggregate losses into one value, "
                "use `CriterionAggregatorCallback`."
            )
            if self.loss_key is None:
                error += (
                    " Or try to pass `loss_key` "
                    "in the `OptimizerCallback` init."
                )
            raise ValueError(error)
return loss

    def on_batch_start(self, state: _State):
"""On batch start event"""
state.loss = None

    def on_batch_end(self, state: _State):
"""On batch end event"""
if not state.need_backward:
return
loss = self._get_loss(state)
self._accumulation_counter += 1
model = state.model
optimizer = state.get_key(
key="optimizer", inner_key=self.optimizer_key
)
        # make an optimizer step once every ``accumulation_steps`` batches
        need_gradient_step = (
            self._accumulation_counter % self.accumulation_steps == 0
        )

        # This is a very hacky check for whether we have an AMP optimizer,
        # and it may change in the future.
        # An alternative solution is to have an AmpOptimizerCallback
        # or to expose another constructor argument.
if hasattr(optimizer, "_amp_stash"):
from apex import amp
# Need to set ``delay_unscale``
# according to
# https://nvidia.github.io/apex/advanced.html#gradient-accumulation-across-iterations
delay_unscale = not need_gradient_step
with amp.scale_loss(
loss,
optimizer,
delay_unscale=delay_unscale
) as scaled_loss:
scaled_loss.backward()
else:
loss.backward()
if need_gradient_step:
self.grad_step(
optimizer=optimizer,
optimizer_wds=self._optimizer_wd,
grad_clip_fn=self.grad_clip_fn
)
            if self.save_model_grads:
                for tag, value in model.named_parameters():
                    if value.grad is None:
                        # e.g. frozen parameters never receive gradients
                        continue
                    tag = tag.replace(".", "/")
                    state.model_grads[tag] = value.grad.cpu().numpy()
utils.maybe_recursive_call(model, "zero_grad")
self._accumulation_counter = 0

    def on_epoch_end(self, state: _State):
"""On epoch end event"""
if self.decouple_weight_decay:
optimizer = state.get_key(
key="optimizer", inner_key=self.optimizer_key
)
for i, wd in enumerate(self._optimizer_wd):
safitty.set(
optimizer.param_groups, i, "weight_decay", value=wd)


__all__ = ["OptimizerCallback"]