Shortcuts

Source code for catalyst.contrib.callbacks.telegram_logger

# flake8: noqa
# @TODO: code formatting issue for 20.07 release
from typing import List, TYPE_CHECKING
import logging
from urllib.parse import quote_plus
from urllib.request import Request, urlopen

from catalyst.core.callback import Callback, CallbackNode, CallbackOrder
from catalyst.settings import SETTINGS
from catalyst.utils.misc import format_metric, is_exception

if TYPE_CHECKING:
    from catalyst.core.runner import IRunner


class TelegramLogger(Callback):
    """Logger callback that sends run notifications to a Telegram chat.

    Messages are delivered through the Telegram Bot API ``sendMessage``
    endpoint. Depending on the ``log_on_*`` flags the callback reports
    stage start/end, loader start/end (with the values found in
    ``runner.loader_metrics``) and raised exceptions.
    """

    def __init__(
        self,
        token: str = None,
        chat_id: str = None,
        metric_names: List[str] = None,
        log_on_stage_start: bool = True,
        log_on_loader_start: bool = True,
        log_on_loader_end: bool = True,
        log_on_stage_end: bool = True,
        log_on_exception: bool = True,
    ):
        """
        Args:
            token: telegram bot's token,
                see https://core.telegram.org/bots.
                Falls back to ``SETTINGS.telegram_logger_token``.
            chat_id: Chat unique identifier.
                Falls back to ``SETTINGS.telegram_logger_chat_id``.
            metric_names: List of metric names to log.
                if none - logs everything.
            log_on_stage_start: send notification on stage start
            log_on_loader_start: send notification on loader start
            log_on_loader_end: send notification on loader end
            log_on_stage_end: send notification on stage end
            log_on_exception: send notification on exception

        Raises:
            AssertionError: if neither the arguments nor ``SETTINGS``
                provide both a token and a chat id.
        """
        super().__init__(order=CallbackOrder.logging, node=CallbackNode.master)
        # @TODO: replace this logic with global catalyst config at ~/.catalyst
        self._token = token or SETTINGS.telegram_logger_token
        self._chat_id = chat_id or SETTINGS.telegram_logger_chat_id
        # Raised explicitly (instead of a bare ``assert``) so the check is
        # not stripped under ``python -O``; the exception type is kept for
        # backward compatibility with existing callers.
        if self._token is None or self._chat_id is None:
            raise AssertionError(
                "TelegramLogger requires both a bot token and a chat id "
                "(pass them explicitly or configure them in SETTINGS)"
            )
        self._base_url = (
            f"https://api.telegram.org/bot{self._token}/sendMessage"
        )

        self.log_on_stage_start = log_on_stage_start
        self.log_on_loader_start = log_on_loader_start
        self.log_on_loader_end = log_on_loader_end
        self.log_on_stage_end = log_on_stage_end
        self.log_on_exception = log_on_exception

        self.metrics_to_log = metric_names

    def _send_text(self, text: str):
        """Send ``text`` to the configured chat (best effort).

        Any failure while building or performing the request is logged as
        a warning and swallowed, so a delivery problem never propagates to
        the caller.

        Args:
            text: message body; it is URL-encoded before sending.
        """
        try:
            url = (
                f"{self._base_url}?"
                # chat_id is encoded as well, so ids such as "@channel"
                # form a valid query string; numeric ids are unchanged.
                f"chat_id={quote_plus(str(self._chat_id), safe='')}&"
                f"disable_web_page_preview=1&"
                f"text={quote_plus(text, safe='')}"
            )
            request = Request(url)
            # The context manager closes the HTTP response, releasing the
            # underlying socket instead of leaking one per message.
            with urlopen(request):  # noqa: S310
                pass
        except Exception as e:
            logging.getLogger(__name__).warning("telegram.send.error:%s", e)

    def on_stage_start(self, runner: "IRunner"):
        """Notify about starting a new stage.

        Args:
            runner: current runner; ``runner.stage_name`` is reported.
        """
        if self.log_on_stage_start:
            text = f"{runner.stage_name} stage was started"

            self._send_text(text)

    def on_loader_start(self, runner: "IRunner"):
        """Notify about starting running the new loader.

        Args:
            runner: current runner; ``runner.loader_name`` and
                ``runner.global_epoch`` are reported.
        """
        if self.log_on_loader_start:
            text = (
                f"{runner.loader_name} {runner.global_epoch} epoch has started"
            )

            self._send_text(text)

    def on_loader_end(self, runner: "IRunner"):
        """Send the finished loader's metrics to the telegram chat.

        Reports every entry of ``runner.loader_metrics`` (sorted by name)
        when ``metric_names`` was not given; otherwise only the requested
        names, in the given order, skipping those absent from the metrics.

        Args:
            runner: current runner providing ``loader_name``,
                ``global_epoch`` and ``loader_metrics``.
        """
        if not self.log_on_loader_end:
            return

        metrics = runner.loader_metrics

        if self.metrics_to_log is None:
            metrics_to_log = sorted(metrics.keys())
        else:
            metrics_to_log = self.metrics_to_log

        rows: List[str] = [
            f"{runner.loader_name} {runner.global_epoch}"
            f" epoch was finished:"
        ]
        rows.extend(
            format_metric(name, metrics[name])
            for name in metrics_to_log
            if name in metrics
        )

        self._send_text("\n".join(rows))

    def on_stage_end(self, runner: "IRunner"):
        """Notify about finishing a stage.

        Args:
            runner: current runner; ``runner.stage_name`` is reported.
        """
        if self.log_on_stage_end:
            text = f"{runner.stage_name} stage was finished"

            self._send_text(text)

    def on_exception(self, runner: "IRunner"):
        """Notify about raised ``Exception``.

        ``KeyboardInterrupt`` is not reported.

        Args:
            runner: current runner; ``runner.exception`` is inspected.
        """
        if not self.log_on_exception:
            return

        exception = runner.exception
        if is_exception(exception) and not isinstance(
            exception, KeyboardInterrupt
        ):
            text = (
                f"`{type(exception).__name__}` exception was raised:\n"
                f"{exception}"
            )

            self._send_text(text)
# Names exported by ``from <this module> import *``.
__all__ = ["TelegramLogger"]