Source code for ai_marketplace_monitor.telegram

import textwrap
import time
from dataclasses import dataclass
from datetime import timedelta
from logging import Logger
from typing import TYPE_CHECKING, ClassVar, List

from .notification import PushNotificationConfig

if TYPE_CHECKING:
    import telegram


@dataclass
class TelegramNotificationConfig(PushNotificationConfig):
    """Push-notification configuration that delivers messages through a Telegram bot.

    Messages are formatted as MarkdownV2 (bold title, blank line, body) and sent
    with ``telegram.Bot.send_message``. Messages longer than Telegram's 4096
    character limit are split into numbered parts. Rate limiting is performed in
    the async send path (see ``_wait_for_rate_limit``).
    """

    notify_method = "telegram"
    required_fields: ClassVar[List[str]] = ["telegram_token", "telegram_chat_id"]

    telegram_token: str | None = None
    telegram_chat_id: str | None = None

    # Enable rate limiting with Telegram-specific settings
    rate_limit_enabled: bool = True
    global_rate_limit: int = 30  # Telegram's higher limit
    # Telegram handles rate limiting in its own async _send_message_async
    # path — tell the base class not to also apply sync rate limiting.
    _handles_own_rate_limiting: bool = True

    def handle_telegram_token(self: "TelegramNotificationConfig") -> None:
        """Validate and normalize ``telegram_token``.

        A ``None`` token is accepted as "not configured". Otherwise the token is
        stripped of surrounding whitespace and must look like
        ``<numeric bot id>:<secret>``, where the secret contains only
        alphanumeric characters, underscores and hyphens.

        Raises:
            ValueError: If the token is not a non-empty string or does not match
                the expected format.
        """
        if self.telegram_token is None:
            return

        if not isinstance(self.telegram_token, str) or not self.telegram_token:
            raise ValueError("An non-empty telegram_token is needed.")

        self.telegram_token = self.telegram_token.strip()

        # Validate token format: numbers:letters_and_numbers
        if ":" not in self.telegram_token:
            raise ValueError(
                "telegram_token must contain a colon (:) separating bot ID and secret."
            )

        bot_id, secret = self.telegram_token.split(":", 1)
        if not bot_id.isdigit():
            raise ValueError("telegram_token bot ID (before colon) must be numeric.")

        if not secret or not secret.replace("_", "").replace("-", "").isalnum():
            raise ValueError(
                "telegram_token secret (after colon) must contain only alphanumeric characters, underscores, and hyphens."
            )

    def handle_telegram_chat_id(self: "TelegramNotificationConfig") -> None:
        """Validate and normalize ``telegram_chat_id``.

        A ``None`` chat id is accepted as "not configured". Otherwise the value
        is stripped of surrounding whitespace and must be either ``@username``
        (alphanumeric characters and underscores) or something ``int()`` can
        parse (negative values denote groups).

        Raises:
            ValueError: If the chat id is not a non-empty string or matches
                neither accepted format.
        """
        if self.telegram_chat_id is None:
            return

        if not isinstance(self.telegram_chat_id, str) or not self.telegram_chat_id:
            raise ValueError("An non-empty telegram_chat_id is needed.")

        self.telegram_chat_id = self.telegram_chat_id.strip()

        # Validate chat ID format: numeric (negative for groups) or @username
        if self.telegram_chat_id.startswith("@"):
            # Username format
            username = self.telegram_chat_id[1:]
            if not username or not username.replace("_", "").isalnum():
                raise ValueError(
                    "telegram_chat_id username must contain only alphanumeric characters and underscores."
                )
        else:
            # Numeric format (can be negative for groups)
            try:
                int(self.telegram_chat_id)
            except ValueError as e:
                raise ValueError(
                    "telegram_chat_id must be numeric or start with @ for usernames."
                ) from e

    def send_message(
        self: "TelegramNotificationConfig",
        title: str,
        message: str,
        logger: Logger | None = None,
    ) -> bool:
        """Send a message synchronously by driving the async Telegram send path.

        When no event loop is running in the calling thread the coroutine is run
        with ``asyncio.run``. When a loop is already running, the coroutine is
        executed on a fresh event loop in a worker thread and the caller waits
        at most 60 seconds for the result.

        Args:
            title: Message title (rendered in bold).
            message: Message body.
            logger: Optional logger for debug/error output.

        Returns:
            The boolean result of ``_send_message_async``.

        Raises:
            Exception: Any exception from the send path (including the timeout
                raised by ``Future.result``) is logged and re-raised.
        """
        import asyncio
        import concurrent.futures

        try:
            # Keep this probe as the only statement inside the RuntimeError
            # handler: a RuntimeError raised later while sending must not be
            # mistaken for "no loop running" and re-routed to asyncio.run(),
            # which would itself fail inside a running loop.
            try:
                asyncio.get_running_loop()
            except RuntimeError:
                loop_running = False
            else:
                loop_running = True

            if not loop_running:
                # No event loop running, safe to use asyncio.run
                if logger:
                    logger.debug("No event loop running, using asyncio.run")
                return asyncio.run(self._send_message_async(title, message, logger))

            if logger:
                logger.debug("Event loop already running, using alternative async execution")

            def run_async() -> bool:
                # Create a new event loop for this thread to avoid conflicts
                # with the loop already running in the caller's thread.
                new_loop = asyncio.new_event_loop()
                asyncio.set_event_loop(new_loop)
                try:
                    return new_loop.run_until_complete(
                        self._send_message_async(title, message, logger)
                    )
                finally:
                    new_loop.close()

            executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
            try:
                future = executor.submit(run_async)
                return future.result(timeout=60)  # 60 second timeout for notification
            finally:
                # wait=False so the timeout above really bounds the call; a
                # `with` block would join the worker thread on exit and block
                # until the send finished regardless of the timeout.
                executor.shutdown(wait=False)

        except Exception as e:
            if logger:
                logger.error(f"Telegram notification failed: {e}")
            raise

    def _split_message_at_boundaries(
        self: "TelegramNotificationConfig", text: str, max_length: int
    ) -> List[str]:
        """Split ``text`` into chunks of at most ``max_length`` characters at whitespace.

        This is a thin wrapper around ``textwrap.wrap`` and inherits its
        behaviour: runs of whitespace (including newlines) are collapsed, an
        empty or whitespace-only ``text`` yields an empty list, and because
        ``break_long_words`` is disabled a single word longer than
        ``max_length`` is returned as one over-length chunk.

        Raises:
            ValueError: If ``max_length`` is not positive (raised by ``textwrap``).
        """
        return textwrap.wrap(
            text, width=max_length, break_long_words=False, break_on_hyphens=False
        )

    def _split_for_markdown_v2(
        self: "TelegramNotificationConfig", text: str, max_length: int
    ) -> List[str]:
        """Split ``text`` so every chunk still fits ``max_length`` after MarkdownV2 escaping.

        Chunks are first produced by ``_split_message_at_boundaries``. Escaping
        prefixes special characters with a backslash, so a chunk that fits
        unescaped may not fit once escaped. Any such chunk (and any single word
        longer than ``max_length``) is re-wrapped at half the width with hard
        breaks allowed; since escaping adds at most one character per input
        character, the re-wrapped pieces fit the budget.
        """
        from telegram.helpers import escape_markdown

        # Guard against a zero width when max_length is 1.
        fallback_width = max(1, max_length // 2)
        parts: List[str] = []
        for part in self._split_message_at_boundaries(text, max_length):
            if len(escape_markdown(part, version=2)) <= max_length:
                parts.append(part)
                continue
            parts.extend(
                textwrap.wrap(
                    part,
                    width=fallback_width,
                    break_long_words=True,
                    break_on_hyphens=False,
                )
            )
        return parts

    def _is_group_chat(self: "TelegramNotificationConfig") -> bool:
        """Determine if the chat_id represents a group chat (negative numeric ID)."""
        if self.telegram_chat_id is None:
            return False

        # Group chats have negative IDs, individual chats have positive IDs
        # Usernames (@username) are treated as individual chats for rate limiting
        if self.telegram_chat_id.startswith("@"):
            return False

        try:
            chat_id_int = int(self.telegram_chat_id)
            return chat_id_int < 0
        except ValueError:
            # If we can't parse as int, default to individual chat
            return False

    def _get_wait_time(self: "TelegramNotificationConfig") -> float:
        """Return the seconds to wait before this instance may send again.

        Returns 0.0 when rate limiting is disabled or nothing has been sent
        yet. Otherwise the minimum interval is 3.0 seconds for group chats and
        1.1 seconds for individual chats, measured from ``_last_send_time``.
        """
        if not self.rate_limit_enabled or self._last_send_time is None:
            return 0.0

        elapsed = time.time() - self._last_send_time
        # Use different intervals: 1.1s for individual chats, 3.0s for groups
        min_interval = 3.0 if self._is_group_chat() else 1.1
        return max(0.0, min_interval - elapsed)

    async def _wait_for_rate_limit(
        self: "TelegramNotificationConfig", logger: Logger | None = None
    ) -> None:
        """Sleep until both the per-instance and global rate limits allow a send.

        Does nothing when rate limiting is disabled. After any required wait,
        records the send time on this instance and via
        ``_record_global_send_time``.
        """
        if not self.rate_limit_enabled:
            return

        import asyncio

        # Check both per-instance and global rate limits
        instance_wait = self._get_wait_time()
        global_wait = self._get_global_wait_time()

        # Use the longer of the two wait times
        wait_time = max(instance_wait, global_wait)

        if wait_time > 0:
            if logger:
                if global_wait > instance_wait:
                    logger.debug(
                        f"Global rate limiting: waiting {wait_time:.1f} seconds (limit: {self.global_rate_limit} msg/sec)"
                    )
                else:
                    chat_type = "group" if self._is_group_chat() else "individual"
                    logger.debug(
                        f"Rate limiting {chat_type} chat {self.telegram_chat_id}: waiting {wait_time:.1f} seconds"
                    )
            await asyncio.sleep(wait_time)

        # Record both per-instance and global send times
        self._last_send_time = time.time()
        self._record_global_send_time()

    async def _send_single_message_with_retry(
        self: "TelegramNotificationConfig",
        bot: "telegram.Bot",
        chat_id: str,
        text: str,
        logger: Logger | None = None,
        max_retries: int = 3,
    ) -> bool:
        """Send a single MarkdownV2 message, retrying on Telegram errors.

        ``telegram.error.RetryAfter`` is retried after the delay Telegram asks
        for; other ``telegram.error.TelegramError`` subclasses are retried with
        exponential backoff. Any other exception aborts immediately.

        Args:
            bot: Bot used to send the message.
            chat_id: Target chat id or ``@username``.
            text: Already-escaped MarkdownV2 text.
            logger: Optional logger for warnings/errors.
            max_retries: Number of retries after the first attempt.

        Returns:
            True if the message was sent, False otherwise.
        """
        import asyncio

        import telegram

        for attempt in range(max_retries + 1):
            try:
                await bot.send_message(chat_id=chat_id, text=text, parse_mode="MarkdownV2")
                return True

            except telegram.error.RetryAfter as e:
                # Handle HTTP 429 with Retry-After header
                retry_after = e.retry_after
                # Convert timedelta to float seconds if needed for asyncio.sleep compatibility
                sleep_duration = (
                    retry_after.total_seconds()
                    if isinstance(retry_after, timedelta)
                    else float(retry_after)
                )
                if logger:
                    logger.warning(
                        f"Telegram rate limit hit (429), waiting {sleep_duration} seconds (attempt {attempt + 1}/{max_retries + 1})"
                    )

                if attempt < max_retries:
                    await asyncio.sleep(sleep_duration)
                    continue
                else:
                    if logger:
                        logger.error(f"Max retries ({max_retries}) reached for 429 errors")
                    return False

            except telegram.error.TelegramError as e:
                # Handle other Telegram errors with exponential backoff
                if attempt < max_retries:
                    backoff_time = 2**attempt  # Exponential backoff: 1s, 2s, 4s
                    if logger:
                        logger.warning(
                            f"Telegram error: {e}, retrying in {backoff_time} seconds (attempt {attempt + 1}/{max_retries + 1})"
                        )
                    await asyncio.sleep(backoff_time)
                    continue
                else:
                    if logger:
                        logger.error(
                            f"Max retries ({max_retries}) reached for Telegram errors: {e}"
                        )
                    return False

            except Exception as e:
                # Handle unexpected errors
                if logger:
                    logger.error(f"Unexpected error sending Telegram message: {e}")
                return False

        # Only reachable when the loop body never ran (negative max_retries).
        return False

    async def _send_message_async(
        self: "TelegramNotificationConfig",
        title: str,
        message: str,
        logger: Logger | None = None,
    ) -> bool:
        """Private async method to send messages using telegram.Bot.

        The title is rendered in bold above the body. If the formatted message
        exceeds 4096 characters, the body is split into parts, each suffixed
        with an ``(i/total)`` indicator; only the first part carries the title.

        Returns:
            True if every part was sent. False if the telegram library is
            missing, the token or chat id is not configured, any part failed to
            send, or an unexpected exception occurred (which is logged).
        """
        try:
            import telegram
            from telegram.helpers import escape_markdown
        except ImportError:
            if logger:
                logger.error("python-telegram-bot library is required for Telegram notifications")
            return False

        # Check for required Telegram configuration
        if self.telegram_token is None:
            if logger:
                logger.error("telegram_token is required but not configured")
            return False

        if self.telegram_chat_id is None:
            if logger:
                logger.error("telegram_chat_id is required but not configured")
            return False

        # Wait for rate limits before sending
        await self._wait_for_rate_limit(logger)

        try:
            bot = telegram.Bot(token=self.telegram_token)

            # Format message with MarkdownV2 escaping
            escaped_title = escape_markdown(title, version=2)
            escaped_message = escape_markdown(message, version=2)
            formatted_message = f"*{escaped_title}*\n\n{escaped_message}"

            # Telegram message length limit is 4096 characters
            max_message_length = 4096

            # Check if message needs splitting
            if len(formatted_message) <= max_message_length:
                return await self._send_single_message_with_retry(
                    bot, self.telegram_chat_id, formatted_message, logger
                )

            # Reserve space for title formatting and continuation indicators
            title_with_formatting = f"*{escaped_title}*\n\n"
            continuation_space = 15  # Space for " \(1/999\)" indicator
            available_for_message = (
                max_message_length - len(title_with_formatting) - continuation_space
            )

            # Split the original message (before escaping) to avoid breaking
            # escape sequences, while making sure each part still fits the
            # budget once it has been escaped.
            message_parts = self._split_for_markdown_v2(message, available_for_message)
            if not message_parts:
                # A whitespace-only body wraps to nothing; still send the title.
                message_parts = [""]
            total_parts = len(message_parts)

            # Send first message with title
            escaped_first_part = escape_markdown(message_parts[0], version=2)
            first_message = f"{title_with_formatting}{escaped_first_part}"
            if total_parts > 1:
                first_message += f" \\(1/{total_parts}\\)"

            success = await self._send_single_message_with_retry(
                bot, self.telegram_chat_id, first_message, logger
            )
            if not success:
                return False

            # Send remaining parts without title
            for i, part in enumerate(message_parts[1:], 2):
                # Wait for rate limits before sending each additional part
                await self._wait_for_rate_limit(logger)

                escaped_part = escape_markdown(part, version=2)
                continuation_message = f"{escaped_part} \\({i}/{total_parts}\\)"

                success = await self._send_single_message_with_retry(
                    bot, self.telegram_chat_id, continuation_message, logger
                )
                if not success:
                    return False

            return True

        except Exception as e:
            if logger:
                logger.error(f"Failed to send Telegram message: {e}")
            return False