import copy
import hashlib
import json
import os
import random
import re
import time
from dataclasses import asdict, dataclass, fields
from enum import Enum
from logging import Logger
from pathlib import Path
from typing import Any, Dict, List, Tuple, TypeVar
import parsedatetime # type: ignore
import requests # type: ignore
import rich
from diskcache import Cache # type: ignore
from playwright.sync_api import ProxySettings
from pyparsing import (
CharsNotIn,
Keyword,
ParserElement,
ParseResults,
Word,
alphanums,
infix_notation,
opAssoc,
)
from requests.exceptions import RequestException, Timeout # type: ignore
from rich.pretty import pretty_repr
try:
    from pynput import keyboard  # type: ignore

    # Keyboard handling can be switched off explicitly by setting the
    # DISABLE_PYNPUT environment variable to 1 / y / true / yes (any case).
    pynput_enabled = os.environ.get("DISABLE_PYNPUT", "").lower() not in ("1", "y", "true", "yes")
except ImportError:
    # some platforms are not supported
    pynput_enabled = False
import io
import rich.pretty
from PIL import Image
from watchdog.events import FileSystemEvent, FileSystemEventHandler
from watchdog.observers import Observer
# home directory for all settings and caches
amm_home = Path.home() / ".ai-marketplace-monitor"
# Created at import time so the cache below always has a directory to live in.
amm_home.mkdir(parents=True, exist_ok=True)
# Module-wide diskcache stored directly in amm_home. Counter keys its entries
# by (CacheType.COUNTERS.value, ...) tuples; other CacheType values presumably
# namespace other entries the same way — verify against callers.
cache = Cache(amm_home)
# TypeVar bound to BaseConfig (forward reference, defined below); presumably
# used elsewhere to type helpers that are generic over config subclasses.
TConfigType = TypeVar("TConfigType", bound="BaseConfig")
[docs]
class SleepStatus(Enum):
    """Why a call to ``doze`` returned."""

    NOT_DISRUPTED = 0  # the full duration elapsed
    BY_KEYBOARD = 1  # the keyboard monitor was woken up
    BY_FILE_CHANGE = 2  # a watched file changed
[docs]
def aimm_event(kind: str, **details: Any) -> Dict[str, Any]:
    """Return the ``extra=`` payload for a structured log record.

    Example:
        logger.info(message, extra=aimm_event("ai_eval", score=5, ...))

    The event kind and every keyword argument are nested under a single
    "aimm" key. The web UI surfaces these structured fields in its filter
    dropdowns (kind / item / score) and in the expand-row detail pane.
    """
    event: Dict[str, Any] = {"kind": kind}
    event.update(details)
    return {"aimm": event}
[docs]
class CacheType(Enum):
    """Namespaces for entries stored in the shared diskcache.

    The ``.value`` strings are used in cache keys and tags (Counter, for
    example, uses ``COUNTERS.value`` for both). Renaming a value therefore
    presumably orphans entries that are already stored.
    """

    LISTING_DETAILS = "listing-details"
    AI_INQUIRY = "ai-inquiries"
    USER_NOTIFIED = "user-notifications"
    COUNTERS = "counters"
[docs]
class CounterItem(Enum):
    """Statistics tracked by ``Counter``.

    Each ``.value`` is the label shown when counters are printed. It is also
    part of the persisted cache key ``(CacheType.COUNTERS.value, value, item_name)``.
    """

    SEARCH_PERFORMED = "Search performed"
    LISTING_EXAMINED = "Total listing examined"
    LISTING_QUERY = "New listing fetched"
    EXCLUDED_LISTING = "Listing excluded"
    NEW_VALIDATED_LISTING = "New validated listing"
    AI_QUERY = "Total AI Queries"
    NEW_AI_QUERY = "New AI Queries"
    # This label previously carried a stray ")" ("Failed AI Queries)").
    # Counts recorded under the old label are no longer reported.
    FAILED_AI_QUERY = "Failed AI Queries"
    NOTIFICATIONS_SENT = "Notifications sent"
    REMINDERS_SENT = "Reminders sent"
[docs]
class Currency(Enum):
    """Currencies recognised by the monitor, keyed by three-letter code.

    The member order is kept as-is because iteration over the enum follows it.
    """

    USD = "USD"
    JPY = "JPY"
    BGN = "BGN"
    CYP = "CYP"
    EUR = "EUR"
    CZK = "CZK"
    DKK = "DKK"
    EEK = "EEK"
    GBP = "GBP"
    HUF = "HUF"
    LTL = "LTL"
    LVL = "LVL"
    MTL = "MTL"
    PLN = "PLN"
    ROL = "ROL"
    RON = "RON"
    SEK = "SEK"
    SIT = "SIT"
    SKK = "SKK"
    CHF = "CHF"
    ISK = "ISK"
    NOK = "NOK"
    HRK = "HRK"
    RUB = "RUB"
    TRL = "TRL"
    TRY = "TRY"
    AUD = "AUD"
    BRL = "BRL"
    CAD = "CAD"
    CNY = "CNY"
    HKD = "HKD"
    IDR = "IDR"
    ILS = "ILS"
    INR = "INR"
    KRW = "KRW"
    MXN = "MXN"
    MYR = "MYR"
    NZD = "NZD"
    PHP = "PHP"
    SGD = "SGD"
    THB = "THB"
    ZAR = "ZAR"
    # NOTE(review): the member name presumably marks ARS as not supported
    # for conversion, while Currency("ARS") still resolves — confirm.
    ARS_unsupported = "ARS"
[docs]
class KeyboardMonitor:
    """Keyboard-driven pause / wake-up / confirmation state.

    When pynput is enabled, ``start`` launches a ``keyboard.Listener`` on a
    separate thread that calls ``handle_key_press`` for every key press.
    The main code polls the resulting flags through the ``is_*`` accessors.
    """

    confirm_character = "c"

    def __init__(self: "KeyboardMonitor") -> None:
        self._paused: bool = False
        self._listener: keyboard.Listener | None = None
        self._sleeping: bool = False
        # None: no confirmation pending; False: waiting for it; True: confirmed.
        self._confirmed: bool | None = None

    def start(self: "KeyboardMonitor") -> None:
        """Start listening for key presses (does nothing when pynput is disabled)."""
        if not pynput_enabled:
            return
        self._listener = keyboard.Listener(on_press=self.handle_key_press)
        self._listener.start()  # the listener runs on its own thread

    def stop(self: "KeyboardMonitor") -> None:
        """Stop the key listener, if one was started."""
        listener = self._listener
        if listener:
            listener.stop()

    def start_sleeping(self: "KeyboardMonitor") -> None:
        """Mark the monitor as sleeping; an Esc press clears the flag."""
        self._sleeping = True

    def confirm(self: "KeyboardMonitor", msg: str | None = None) -> bool:
        """Print a prompt and wait about 10 seconds for the confirm key.

        Args:
            msg: prompt to print; defaults to a message naming the confirm key.

        Returns:
            True if the confirm character was pressed during the wait,
            otherwise the pending state (False).
            The paused and confirmation flags are reset on exit in every case.
        """
        self._confirmed = False
        prompt = msg or f"Press {hilight(self.confirm_character)} to enter interactive mode in 10 seconds: "
        rich.print(prompt, end="", flush=True)
        try:
            polls = 0
            # poll every 0.1s, at most 101 times (about 10 seconds in total)
            while self._confirmed is False and polls <= 100:
                time.sleep(0.1)
                if self._confirmed:
                    return True
                polls += 1
            return self._confirmed
        finally:
            # whether or not confirm is successful, reset paused and confirmed flag
            self._paused = False
            self._confirmed = None

    def is_sleeping(self: "KeyboardMonitor") -> bool:
        """Return True while the monitor is sleeping (not yet woken by Esc)."""
        return self._sleeping

    def is_paused(self: "KeyboardMonitor") -> bool:
        """Return True if a pause has been requested."""
        return self._paused

    def is_confirmed(self: "KeyboardMonitor") -> bool:
        """Return True only if a pending confirmation was granted."""
        return self._confirmed is True

    def set_paused(self: "KeyboardMonitor", paused: bool = True) -> None:
        """Set or clear the paused flag."""
        self._paused = paused

    if pynput_enabled:

        def handle_key_press(
            self: "KeyboardMonitor", key: keyboard.Key | keyboard.KeyCode | None
        ) -> None:
            """React to one key press (called on the listener thread).

            Esc wakes a sleeping monitor. Otherwise, if a confirmation is
            pending, the confirm character grants it. Otherwise Esc requests
            a pause, or reports that one is already pending.
            """
            is_escape = key == keyboard.Key.esc
            # a sleeping monitor is woken up by Esc
            if self._sleeping and is_escape:
                self._sleeping = False
                return
            # a pending confirmation is granted by the confirm character
            if self._confirmed is False and getattr(key, "char", "") == self.confirm_character:
                self._confirmed = True
                return
            # every remaining action is triggered by Esc only
            if not is_escape:
                return
            if self.is_paused():
                print("Still searching ... will pause as soon as I am done.")
                return
            print("Pausing search ...")
            self._paused = True
[docs]
class Counter:
    """Persistent statistics counters backed by the shared diskcache.

    Each count is stored under the key
    ``(CacheType.COUNTERS.value, <CounterItem value>, item_name)`` and
    tagged with ``CacheType.COUNTERS.value``.
    """

    def increment(self: "Counter", counter_key: CounterItem, item_name: str, by: int = 1) -> None:
        """Add ``by`` to ``counter_key`` for ``item_name``, creating the entry if needed."""
        key = (CacheType.COUNTERS.value, counter_key.value, item_name)
        try:
            # default=None makes incr() raise KeyError for a missing key, so
            # the first write goes through set() and can attach the tag.
            cache.incr(key, by, default=None)
        except KeyError:
            # if key does not exist, set it to by, and set tag
            cache.set(key, by, tag=CacheType.COUNTERS.value)

    def __str__(self: "Counter") -> str:
        """Return pretty form of all non-zero counters, per item plus a "Total" entry.

        Item names are listed in sorted order so the output is stable.
        """
        prefix = CacheType.COUNTERS.value
        # This scans every key in the cache. That is slow, but it only runs
        # when statistics are printed. See
        # https://github.com/grantjenks/python-diskcache/issues/341 for details.
        counters: Dict[Tuple[Any, ...], int] = {}
        for key in cache.iterkeys():
            # Other cache users may store keys that are not 3-tuples
            # (strings, ints, ...). Indexing those blindly could raise.
            if not (isinstance(key, tuple) and len(key) == 3 and key[0] == prefix):
                continue
            value = cache.get(key)
            # the entry may have disappeared since iterkeys() listed it
            if value is not None:
                counters[key] = value

        item_names = sorted({key[2] for key in counters}, key=str)
        cnts: Dict[str, Dict[str, int]] = {}
        # per-item statistics (non-zero entries only)
        for item_name in item_names:
            per_item: Dict[str, int] = {}
            for item in CounterItem:
                value = counters.get((prefix, item.value, item_name), 0)
                if value:
                    per_item[item.value] = value
            cnts[item_name] = per_item

        # total statistics, each sum computed once
        totals: Dict[str, int] = {}
        for item in CounterItem:
            total = sum(counters.get((prefix, item.value, name), 0) for name in item_names)
            if total:
                totals[item.value] = total
        cnts["Total"] = totals
        return pretty_repr(cnts)
# Module-wide Counter instance. The counts live in the shared diskcache,
# so this object carries no state of its own.
counter = Counter()
[docs]
def hash_dict(obj: Dict[str, Any]) -> str:
    """Return the SHA-256 hex digest of ``obj`` serialized with ``json.dumps``.

    ``json.dumps`` is called without ``sort_keys``. Dicts with equal contents
    but a different key insertion order therefore hash differently.
    Every value must be JSON-serializable (otherwise ``TypeError``).
    """
    digest = hashlib.sha256()
    digest.update(json.dumps(obj).encode("utf-8"))
    return digest.hexdigest()
[docs]
@dataclass
class BaseConfig:
name: str
enabled: bool | None = None
def __post_init__(self: "BaseConfig") -> None:
"""Handle all methods that start with 'handle_' in the dataclass."""
for f in fields(self):
# test the type of field f, if it is a string or a list of string
# try to expand the string with environment variables
fvalue = getattr(self, f.name)
if isinstance(fvalue, str):
setattr(self, f.name, self._value_from_environ(fvalue))
elif isinstance(fvalue, list) and all(isinstance(x, str) for x in fvalue):
setattr(self, f.name, [self._value_from_environ(x) for x in fvalue])
handle_method = getattr(self, f"handle_{f.name}", None)
if handle_method:
handle_method()
def _value_from_environ(self: "BaseConfig", key: str) -> str | None:
"""Replace key with value from an environment variable if it has a format of ${KEY}.
Returns None (with a warning) when the variable is not set, so
that optional credentials degrade gracefully to anonymous mode.
"""
if not isinstance(key, str) or not key.startswith("${") or not key.endswith("}"):
return key
var_name = key[2:-1]
if var_name not in os.environ:
import warnings
warnings.warn(
f"Environment variable {var_name} is not set — ignored.",
stacklevel=2,
)
return None
return os.environ[var_name]
[docs]
def handle_enabled(self: "BaseConfig") -> None:
if self.enabled is None:
return
if not isinstance(self.enabled, bool):
raise ValueError(f"Item {hilight(self.name)} enabled must be a boolean.")
@property
def hash(self: "BaseConfig") -> str:
return hash_dict(asdict(self))
[docs]
@dataclass
class MonitorConfig(BaseConfig):
    """Proxy settings for a monitor configuration.

    The ``handle_<field>`` validators below are invoked by
    ``BaseConfig.__post_init__`` after ``${VAR}`` references have been expanded.
    """

    proxy_server: List[str] | None = None
    proxy_bypass: str | None = None
    proxy_username: str | None = None
    proxy_password: str | None = None

    def _check_optional_str(self: "MonitorConfig", field_name: str) -> None:
        """Raise ValueError unless attribute ``field_name`` is None or a string."""
        value = getattr(self, field_name)
        if value is not None and not isinstance(value, str):
            raise ValueError(f"Item {hilight(self.name)} {field_name} must be a string.")

    def handle_proxy_server(self: "MonitorConfig") -> None:
        """Normalize ``proxy_server`` to a list of http:// or https:// URLs.

        Raises:
            ValueError: if the value is neither a string nor a list of strings,
                or if any entry lacks an http:// / https:// scheme.
        """
        if self.proxy_server is None:
            return
        # a single server may be given as a plain string
        if isinstance(self.proxy_server, str):
            self.proxy_server = [self.proxy_server]
        # Check the container type first. Iterating e.g. an int would raise
        # TypeError instead of the ValueError used for configuration errors.
        if not isinstance(self.proxy_server, list) or not all(
            isinstance(x, str) for x in self.proxy_server
        ):
            raise ValueError(
                f"Item {hilight(self.name)} proxy_server must be a string or a list of strings."
            )
        if not all(x.startswith(("http://", "https://")) for x in self.proxy_server):
            raise ValueError(
                f"Item {hilight(self.name)} proxy_server must start with http:// or https://"
            )

    def handle_proxy_bypass(self: "MonitorConfig") -> None:
        """Validate that ``proxy_bypass`` is None or a string."""
        self._check_optional_str("proxy_bypass")

    def handle_proxy_username(self: "MonitorConfig") -> None:
        """Validate that ``proxy_username`` is None or a string."""
        self._check_optional_str("proxy_username")

    def handle_proxy_password(self: "MonitorConfig") -> None:
        """Validate that ``proxy_password`` is None or a string."""
        self._check_optional_str("proxy_password")

    def get_proxy_options(self: "MonitorConfig") -> ProxySettings | None:
        """Build Playwright proxy settings, or None when no proxy server is configured.

        A server is chosen at random from ``proxy_server`` on every call.
        Credentials are included only when both username and password are set.
        """
        if not self.proxy_server:
            return None
        res = ProxySettings(server=random.choice(self.proxy_server))
        if self.proxy_username and self.proxy_password:
            res["username"] = self.proxy_username
            res["password"] = self.proxy_password
        if self.proxy_bypass:
            res["bypass"] = self.proxy_bypass
        return res
[docs]
def calculate_file_hash(file_paths: List[Path]) -> str:
    """Return one SHA-256 hex digest over the concatenated contents of ``file_paths``.

    Files are read in the given order, 8192 bytes at a time.

    Raises:
        FileNotFoundError: if a path does not exist (checked just before
            that file is read).
    """
    digest = hashlib.sha256()
    for path in file_paths:
        if not path.exists():
            raise FileNotFoundError(f"File not found: {path}")
        with open(path, "rb") as handle:
            for block in iter(lambda: handle.read(8192), b""):
                digest.update(block)
    return digest.hexdigest()
[docs]
def merge_dicts(dicts: list) -> dict:
    """Merge a list of dictionaries into a single dictionary, including nested dictionaries.

    Later dictionaries win for scalar values. Nested dicts are merged
    recursively, and lists under the same key are concatenated in order.
    The inputs are never modified: values are deep-copied into the result,
    so the result shares no mutable objects with the arguments.

    :param dicts: A list of dictionaries to merge.
    :return: A single merged dictionary.
    """

    def merge(target: dict, source: dict) -> dict:
        # ``target`` only ever holds our own deep copies, so mutating it in
        # place cannot leak changes back into the caller's dictionaries.
        for key, value in source.items():
            if key in target and isinstance(target[key], dict) and isinstance(value, dict):
                merge(target[key], value)
            elif key in target and isinstance(target[key], list) and isinstance(value, list):
                target[key].extend(copy.deepcopy(value))
            else:
                target[key] = copy.deepcopy(value)
        return target

    result: Dict[str, Any] = {}
    for dictionary in dicts:
        merge(result, dictionary)
    return result
[docs]
def normalize_string(string: str) -> str:
    """Collapse every whitespace run to a single space and lower-case the result.

    Whitespace runs include spaces, tabs and newlines. Leading and trailing
    whitespace is collapsed but not stripped.
    """
    collapsed = re.sub(r"\s+", " ", string)
    return collapsed.lower()
# Enable packrat memoization (a global pyparsing setting) to speed up parsing
# with the recursive infix grammar defined below.
ParserElement.enable_packrat()
# Quoted operands: the parse action returns only the text between the quotes,
# and leaveWhitespace() keeps spaces right after the opening quote.
double_quoted_string = ('"' + CharsNotIn('"').leaveWhitespace() + '"').setParseAction(
    lambda t: t[1]
)  # removes quotes, keeps only the content
single_quoted_string = ("'" + CharsNotIn("'").leaveWhitespace() + "'").setParseAction(
    lambda t: t[1]
)  # removes quotes, keeps only the content
# Punctuation allowed inside unquoted words. Parentheses are not included,
# which leaves them available for grouping sub-expressions.
special_chars = "!@#$%^&*-_=+[]{}|;:'\",.<>?/\\`~"
unquoted_string = Word(alphanums + special_chars)
# An operand is a quoted phrase (which may contain spaces) or a single bare word.
operand = double_quoted_string | single_quoted_string | unquoted_string
# Operators are whole, case-sensitive keywords: only upper-case AND / OR / NOT
# act as operators.
and_op = Keyword("AND")
or_op = Keyword("OR")
not_op = Keyword("NOT")
# Define the grammar for parsing. Precedence runs from the first entry (NOT,
# binds tightest) to the last (OR).
expr = infix_notation(
    operand,
    [
        (not_op, 1, opAssoc.RIGHT),
        (and_op, 2, opAssoc.LEFT),
        (or_op, 2, opAssoc.LEFT),
    ],
)
[docs]
def is_substring(
    var1: str | List[str], var2: str | List[str], logger: Logger | None = None
) -> bool:
    """Check whether the search expression ``var1`` matches the text in ``var2``.

    Both sides are normalized with ``normalize_string`` (whitespace collapsed,
    lower-cased) before the substring tests.

    var1: a single string, or a list of strings for which a condition of OR
        is assumed. A string is parsed for "AND", "OR" and "NOT" (upper-case,
        with parentheses for grouping), and the result of the logical
        expression is returned. Each operand is tested as a substring. If
        parsing fails, var1 is treated as a literal string.
    var2: one or more strings to search. An operand matches if it is a
        substring of any of them.
    logger: optional logger for parse-fallback warnings and invalid-expression errors.
    """
    if isinstance(var1, list):
        return any(is_substring(x, var2, logger) for x in var1)

    # Normalize the searched text once, rather than once per operand.
    haystacks = (
        [normalize_string(var2)] if isinstance(var2, str) else [normalize_string(s) for s in var2]
    )

    def contains(needle: str) -> bool:
        """True if the normalized ``needle`` occurs in any normalized haystack."""
        normalized = normalize_string(needle)
        return any(normalized in haystack for haystack in haystacks)

    # parse the expression
    try:
        parsed = expr.parseString(var1, parseAll=True)[0]
    except Exception:
        # treat var1 as literal string for searching; warn only when it
        # looked like it was meant to be a logical expression
        if any(x in var1 for x in (" AND ", " OR ", " NOT ", "(NOT ")) or var1.startswith("NOT "):
            if logger:
                logger.warning(
                    f"Failed to parse {var1} as a logical expression. Treating it as literal string."
                )
        return contains(var1)

    def evaluate_expression(parsed_expression: str | ParseResults | list) -> bool:
        """Recursively evaluate a parsed (sub-)expression against the haystacks."""
        if isinstance(parsed_expression, str):
            return contains(parsed_expression)
        if len(parsed_expression) == 1:
            return evaluate_expression(parsed_expression[0])
        if parsed_expression[0] == "NOT":
            return not evaluate_expression(parsed_expression[1])
        # Left-associative binary chains arrive flat ([a, AND, b, AND, c]):
        # evaluate everything before the last operator, then combine it with
        # the last operand.
        if parsed_expression[-2] == "AND":
            return evaluate_expression(parsed_expression[:-2]) and evaluate_expression(
                parsed_expression[-1]
            )
        if parsed_expression[-2] == "OR":
            return evaluate_expression(parsed_expression[:-2]) or evaluate_expression(
                parsed_expression[-1]
            )
        if logger:
            logger.error(f"Invalid expression: {parsed_expression}")
        return False

    return evaluate_expression(parsed)
[docs]
class ChangeHandler(FileSystemEventHandler):
    """Watchdog event handler that sets ``changed`` when a watched file is touched."""

    def __init__(self: "ChangeHandler", files: List[str]) -> None:
        super().__init__()
        self.changed = False
        # Normalize to real paths — on macOS /var/folders is a symlink
        # to /private/var/folders and watchdog reports the resolved form.
        self.files = {os.path.realpath(f) for f in files}

    def _mark_if_watched(self: "ChangeHandler", path: "str | bytes | None") -> None:
        """Set ``changed`` if ``path`` resolves to one of the watched files."""
        if not path:
            return
        # Event paths may be bytes. Decode them with the filesystem encoding,
        # otherwise realpath() returns bytes that never match the str set.
        if os.path.realpath(os.fsdecode(path)) in self.files:
            self.changed = True

    def on_modified(self: "ChangeHandler", event: FileSystemEvent) -> None:
        """Flag in-place modifications of a watched file."""
        if not event.is_directory:
            self._mark_if_watched(event.src_path)

    def on_created(self: "ChangeHandler", event: FileSystemEvent) -> None:
        """Flag creation of a watched file."""
        # Atomic writes via os.replace() may appear as a create on the
        # destination path (depending on platform + watchdog backend).
        if not event.is_directory:
            self._mark_if_watched(event.src_path)

    def on_deleted(self: "ChangeHandler", event: FileSystemEvent) -> None:
        """Flag deletion of a watched file."""
        # On macOS, os.replace() over an existing file fires a 'deleted'
        # event on the destination path, not 'moved'. Treat it as a change.
        if not event.is_directory:
            self._mark_if_watched(event.src_path)

    def on_moved(self: "ChangeHandler", event: FileSystemEvent) -> None:
        """Flag a move whose source or destination is a watched file."""
        # On Linux (inotify), atomic writes via tempfile + os.replace()
        # land here: src_path is the temp file, dest_path is the real one.
        if not event.is_directory:
            self._mark_if_watched(getattr(event, "dest_path", None))
            self._mark_if_watched(event.src_path)
[docs]
def doze(
    duration: int, files: List[Path] | None = None, keyboard_monitor: KeyboardMonitor | None = None
) -> SleepStatus:
    """Sleep for ``duration`` seconds while monitoring the given files and the keyboard.

    Disruptions are checked roughly once per second.

    Return:
        SleepStatus.NOT_DISRUPTED: if doze was done naturally.
        SleepStatus.BY_KEYBOARD: if doze was disrupted by keyboard.
        SleepStatus.BY_FILE_CHANGE: if doze was disrupted by file change.

    Raises:
        FileNotFoundError: if any of ``files`` does not exist. This is checked
            before any observer is started or the monitor is put to sleep.
    """
    watched = list(files or [])
    # Validate everything up front. Raising halfway through the observer
    # setup would leave the observers that were already started running forever.
    for filename in watched:
        if not filename.exists():
            raise FileNotFoundError(f"File not found: {filename}")

    event_handler = ChangeHandler([str(x) for x in watched])
    observers = []
    # NOTE(review): nothing here clears the monitor's sleeping flag when the
    # doze ends on its own or because of a file change. KeyboardMonitor's
    # key handler then consumes the next Esc as a wake-up. Confirm whether
    # callers reset it.
    if keyboard_monitor:
        keyboard_monitor.start_sleeping()
    try:
        for filename in watched:
            observer = Observer()
            # we can only monitor a directory; ChangeHandler filters the
            # events down to the watched files
            observer.schedule(event_handler, str(filename.parent), recursive=False)
            observer.start()
            observers.append(observer)
        # monotonic clock: unaffected by wall-clock adjustments during the wait
        deadline = time.monotonic() + duration
        while time.monotonic() < deadline:
            if event_handler.changed:
                return SleepStatus.BY_FILE_CHANGE
            time.sleep(1)
            if keyboard_monitor and not keyboard_monitor.is_sleeping():
                return SleepStatus.BY_KEYBOARD
        return SleepStatus.NOT_DISRUPTED
    finally:
        for observer in observers:
            observer.stop()
            observer.join()
[docs]
def convert_to_seconds(time_str: str) -> int:
    """Convert a natural-language time expression to a number of seconds from now.

    The text is parsed with parsedatetime relative to the current local time.
    If it cannot be parsed, parsedatetime presumably returns the source time
    unchanged, giving 0.
    """
    cal = parsedatetime.Calendar(version=parsedatetime.VERSION_CONTEXT_STYLE)
    # Sample "now" once and parse relative to it. Reading the clock twice
    # (inside parse() and again afterwards) could make the result off by a
    # second when a second boundary falls between the two reads.
    now = time.localtime()
    time_struct, _ = cal.parse(time_str, sourceTime=now)
    return int(time.mktime(time_struct) - time.mktime(now))
[docs]
def hilight(text: str, style: str = "name") -> str:
    """Wrap ``text`` in rich console markup for the given semantic style.

    Styles: "name" (cyan), "fail" (red), "info" (blue), "succ" (green) and
    "dim" (gray). Any other style falls back to blue.
    """
    palette = {
        "name": "cyan",
        "fail": "red",
        "info": "blue",
        "succ": "green",
        "dim": "gray",
    }
    color = palette[style] if style in palette else "blue"
    opening, closing = f"[{color}]", f"[/{color}]"
    return f"{opening}{text}{closing}"
[docs]
def fetch_with_retry(
    url: str,
    timeout: int = 10,
    max_retries: int = 3,
    backoff_factor: float = 1.5,
    logger: Logger | None = None,
) -> Tuple[bytes, str] | None:
    """Fetch URL content, retrying timeouts with exponential backoff.

    Only timeouts are retried. Any other request error (including 4XX/5XX
    responses) fails immediately.

    Args:
        url: URL to fetch
        timeout: Timeout in seconds
        max_retries: Maximum number of attempts
        backoff_factor: Base of the exponential backoff (wait = backoff_factor ** attempt)
        logger: logger object

    Returns:
        Tuple of (content, content_type) if successful, None if failed.
        When the server sends no Content-Type header, "application/octet-stream"
        is reported.
    """
    if logger:
        logger.debug(f"Fetching {url} with timeout {timeout}s")
    for attempt in range(max_retries):
        try:
            # The context manager releases the connection even when
            # raise_for_status() raises. With stream=True the body is only
            # read by .content.
            with requests.get(url, timeout=timeout, stream=True) as response:
                response.raise_for_status()  # Raises exception for 4XX/5XX status codes
                content = response.content
                # A missing header would otherwise raise an uncaught KeyError.
                content_type = response.headers.get("Content-Type", "application/octet-stream")
            return content, content_type
        except Timeout:
            if attempt < max_retries - 1:
                wait_time = backoff_factor**attempt
                if logger:
                    logger.warning(
                        f"Timeout fetching {url} (attempt {attempt + 1}/{max_retries}). "
                        f"Waiting {wait_time:.1f}s before retry"
                    )
                time.sleep(wait_time)
            elif logger:
                # last attempt: nothing left to wait for
                logger.warning(f"Timeout fetching {url} (attempt {attempt + 1}/{max_retries}).")
        except RequestException as e:
            if logger:
                logger.error(f"Error fetching {url}: {e!s}")
            return None
    if logger:
        logger.error(f"Failed to fetch {url} after {max_retries} attempts")
    return None
[docs]
def resize_image_data(image_data: bytes, max_width: int = 800, max_height: int = 600) -> bytes:
    """Downscale encoded image bytes so they fit within ``max_width`` x ``max_height``.

    The aspect ratio and the original image format are preserved. The input
    bytes are returned unchanged when any of these hold:
      - the image is a GIF;
      - it already fits;
      - it cannot be decoded, resized or re-encoded.
    The whole operation is best-effort.
    """
    try:
        # Image.open only reads the header. Pixel data is decoded later (in
        # resize), so decode errors must be caught here as well.
        with Image.open(io.BytesIO(image_data)) as image:
            if image.format == "GIF":
                return image_data
            # Calculate new dimensions maintaining aspect ratio
            width, height = image.size
            ratio = min(max_width / width, max_height / height)
            if ratio >= 1:
                return image_data
            # Never shrink a side to 0 pixels (possible for extreme aspect ratios).
            new_size = (max(1, int(width * ratio)), max(1, int(height * ratio)))
            resized_image = image.resize(new_size, Image.Resampling.LANCZOS)
            # Convert back to bytes in the original format
            buffer = io.BytesIO()
            resized_image.save(buffer, format=image.format)
            return buffer.getvalue()
    except Exception:
        # unreadable / unsupported / unwritable image: fall back to the original bytes
        return image_data
[docs]
class Translator:
def __init__(
self: "Translator", locale: str | None = None, dictionary: Dict[str, str] | None = None
) -> None:
self.locale = locale
self._dictionary: Dict[str, str] = copy.deepcopy(dictionary or {})
def __call__(self: "Translator", word: str) -> str:
"""Return translated version"""
return self._dictionary.get(word, word)