Source code for catalyst.core.utils.callbacks
from typing import Dict, List, Union
from collections import OrderedDict
from catalyst.core.callback import CallbackNode
from catalyst.core.utils import get_rank
[docs]def sort_callbacks_by_order(
callbacks: Union[List, Dict, OrderedDict]
) -> OrderedDict:
"""Creates an sequence of callbacks and sort them.
Args:
callbacks: either list of callbacks or ordered dict
Returns:
sequence of callbacks sorted by ``callback order``
"""
if callbacks is None:
output = OrderedDict()
elif isinstance(callbacks, (dict, OrderedDict)):
output = [(k, v) for k, v in callbacks.items()]
output = sorted(output, key=lambda x: x[1].order)
output = OrderedDict(output)
elif isinstance(callbacks, list):
output = sorted(callbacks, key=lambda x: x.order)
output = OrderedDict([(i, value) for i, value in enumerate(output)])
else:
raise TypeError(
f"Callbacks must be either Dict/OrderedDict or list, "
f"got {type(callbacks)}"
)
return output
[docs]def filter_callbacks_by_node(
callbacks: Union[Dict, OrderedDict]
) -> Union[Dict, OrderedDict]:
"""
Filters callbacks based on running node.
Deletes worker-only callbacks from ``CallbackNode.Master``
and master-only callbacks from ``CallbackNode.Worker``.
Args:
callbacks (Union[Dict, OrderedDict]): callbacks
Returns:
Union[Dict, OrderedDict]: filtered callbacks dictionary.
"""
# distributed run setting
output = callbacks.copy()
rank = get_rank()
if rank == 0: # master node
# remove worker-only callbacks on master node
for k in list(
filter(lambda c: output[c].node == CallbackNode.Worker, output,)
):
del output[k]
elif rank > 0: # worker node
# remove master-only callbacks on worker nodes
for k in list(
filter(lambda c: output[c].node == CallbackNode.Master, output,)
):
del output[k]
return output
__all__ = ["sort_callbacks_by_order", "filter_callbacks_by_node"]