import sys
import time
from logging import Logger
from pathlib import Path
from typing import ClassVar, List
import humanize
import inflect
import rich
import schedule # type: ignore
from playwright.sync_api import Browser, Playwright, sync_playwright
from rich.pretty import pretty_repr
from rich.prompt import Prompt
from .ai import AIBackend, AIResponse
from .config import Config, supported_ai_backends, supported_marketplaces
from .listing import Listing
from .marketplace import Marketplace, TItemConfig, TMarketplaceConfig
from .notification import NotificationStatus
from .user import User
from .utils import (
CounterItem,
KeyboardMonitor,
SleepStatus,
Translator,
aimm_event,
amm_home,
cache,
calculate_file_hash,
counter,
doze,
hilight,
)
[docs]
class MarketplaceMonitor:
active_marketplaces: ClassVar = {}
def __init__(
self: "MarketplaceMonitor",
config_files: List[Path] | None,
headless: bool | None,
logger: Logger | None,
) -> None:
for file_path in config_files or []:
if not file_path.exists():
raise FileNotFoundError(f"Config file {file_path} not found.")
default_config = amm_home / "config.toml"
self.config_files = ([default_config] if default_config.exists() else []) + (
[x.expanduser().resolve() for x in config_files or []]
)
#
self.config: Config | None = None
self.config_hash: str | None = None
self.headless = headless
# When True, start_monitor blocks until every enabled marketplace
# has a username + password in the config. The web UI sets this
# so Playwright doesn't race the web UI for Facebook credentials.
self.defer_login_until_credentials: bool = False
self.ai_agents: List[AIBackend] = []
self.keyboard_monitor: KeyboardMonitor | None = None
self.playwright: Playwright = sync_playwright().start()
self.browser: Browser | None = None
self.logger = logger
[docs]
def load_config_file(self: "MarketplaceMonitor") -> Config:
"""Load the configuration file."""
last_invalid_hash = None
while True:
new_file_hash = calculate_file_hash(self.config_files)
config_changed = self.config_hash is None or new_file_hash != self.config_hash
if not config_changed:
assert self.config is not None
return self.config
try:
# if the config file is ok, break
assert self.logger is not None
self.config = Config(self.config_files, self.logger)
self.config_hash = new_file_hash
# self.logger.debug(self.config)
assert self.config is not None
return self.config
except KeyboardInterrupt:
raise
except Exception as e:
if last_invalid_hash != new_file_hash:
last_invalid_hash = new_file_hash
if self.logger:
self.logger.error(
f"""{hilight("[Config]", "fail")} Error parsing:\n\n{hilight(str(e), "fail")}\n\nPlease fix the configuration and I will try again as soon as you are done."""
)
doze(60, self.config_files, self.keyboard_monitor)
continue
def _launch_browser(self: "MarketplaceMonitor") -> Browser:
"""Launch a browser, preferring Chromium if available, otherwise any installed browser."""
# Try browsers in order of preference
browser_types = [
("chromium", self.playwright.chromium),
("firefox", self.playwright.firefox),
("webkit", self.playwright.webkit),
]
for browser_name, browser_type in browser_types:
try:
if self.logger:
self.logger.debug(f"Attempting to launch {browser_name} browser...")
browser = browser_type.launch(headless=self.headless)
if self.logger:
self.logger.info(
f"""{hilight("[Browser]", "info")} Successfully launched {browser_name} browser.""",
extra=aimm_event("browser_ready", engine=browser_name),
)
return browser
except Exception as e:
if self.logger:
self.logger.debug(f"Failed to launch {browser_name}: {e}")
continue
# If all fail, raise an error
raise RuntimeError(
"No browser could be launched. Please ensure Chromium, Firefox, or WebKit is installed."
)
[docs]
def load_ai_agents(self: "MarketplaceMonitor") -> None:
"""Load the AI agent."""
assert self.config is not None
for ai_config in (self.config.ai or {}).values():
if ai_config.enabled is False:
continue
if (
ai_config.provider is not None
and ai_config.provider.lower() in supported_ai_backends
):
ai_class = supported_ai_backends[ai_config.provider.lower()]
elif ai_config.name.lower() in supported_ai_backends:
ai_class = supported_ai_backends[ai_config.name.lower()]
else:
if self.logger:
self.logger.error(
f"""{hilight("[Config]", "fail")} Cannot determine an AI service provider from service name or provider."""
)
continue
try:
self.ai_agents.append(ai_class(config=ai_config, logger=self.logger))
# self.ai_agents[-1].connect()
# self.logger.info(
# f"""{hilight("[AI]", "succ")} Connected to {hilight(ai_config.name)}"""
# )
except KeyboardInterrupt:
raise
except Exception as e:
if self.logger:
self.logger.error(
f"""{hilight("[AI]", "fail")} Failed to connect to {hilight(ai_config.name, "fail")}: {e}"""
)
continue
[docs]
def search_item(
self: "MarketplaceMonitor",
marketplace_config: TMarketplaceConfig,
marketplace: Marketplace,
item_config: TItemConfig,
) -> None:
"""Search for an item on the marketplace."""
new_listings: List[Listing] = []
listing_ratings = []
# users to notify is determined from item, then marketplace, then all users
assert self.config is not None
users_to_notify = (
item_config.notify or marketplace_config.notify or list(self.config.user.keys())
)
for listing in marketplace.search(item_config):
# duplicated ID should not happen, but sellers could repost the same listing,
# potentially under different seller names
if listing.id in [x.id for x in new_listings] or listing.content in [
x.content for x in new_listings
]:
if self.logger:
self.logger.debug(f"Found duplicated result for {listing}")
continue
# if everyone has been notified
if all(
User(self.config.user[user], self.logger).notification_status(listing)
== NotificationStatus.NOTIFIED
for user in users_to_notify
):
if self.logger:
self.logger.info(
f"""{hilight("[Skip]", "info")} Already sent notification for item {hilight(listing.title)}, skipping.""",
extra=aimm_event(
"listing_skip",
reason="already_notified",
listing_id=listing.id,
title=listing.title,
item=item_config.name,
),
)
continue
# for x in self.find_new_items(found_items)
res = self.evaluate_by_ai(
listing, item_config=item_config, marketplace_config=marketplace_config
)
if self.logger:
if res.comment == AIResponse.NOT_EVALUATED:
if res.name:
self.logger.info(
f"""{hilight("[AI]", res.style)} {res.name or "AI"} did not evaluate {hilight(listing.title)}."""
)
else:
self.logger.info(
f"""{hilight("[AI]", res.style)} No AI available to evaluate {hilight(listing.title)}."""
)
else:
self.logger.info(
f"""{hilight("[AI]", res.style)} {res.name or "AI"} concludes {hilight(f"{res.conclusion} ({res.score}): {res.comment}", res.style)} for listing {hilight(listing.title)}.""",
extra=aimm_event(
"ai_eval",
listing_id=listing.id,
title=listing.title,
url=getattr(listing, "post_url", None)
or getattr(listing, "url", None),
price=getattr(listing, "price", None),
score=res.score,
conclusion=res.conclusion,
comment=res.comment,
ai_name=res.name,
item=item_config.name,
),
)
if item_config.rating:
acceptable_rating = item_config.rating[
0 if item_config.searched_count == 0 else -1
]
elif marketplace_config.rating:
acceptable_rating = marketplace_config.rating[
0 if item_config.searched_count == 0 else -1
]
else:
acceptable_rating = 3
if res.score < acceptable_rating:
if self.logger:
self.logger.info(
f"""{hilight("[Skip]", "fail")} Rating {hilight(f"{res.conclusion} ({res.score})")} for {listing.title} is below threshold {acceptable_rating}.""",
extra=aimm_event(
"listing_skip",
reason="below_threshold",
listing_id=listing.id,
title=listing.title,
item=item_config.name,
score=res.score,
threshold=acceptable_rating,
),
)
counter.increment(CounterItem.EXCLUDED_LISTING, item_config.name)
continue
new_listings.append(listing)
listing_ratings.append(res)
p = inflect.engine()
if self.logger:
self.logger.info(
f"""{hilight("[Search]", "succ" if len(new_listings) > 0 else "fail")} {hilight(str(len(new_listings)))} new {p.plural_noun("listing", len(new_listings))} for {item_config.name} {p.plural_verb("is", len(new_listings))} found.""",
extra=aimm_event(
"search_summary",
item=item_config.name,
marketplace=marketplace_config.name,
new_count=len(new_listings),
),
)
if new_listings:
counter.increment(
CounterItem.NEW_VALIDATED_LISTING, item_config.name, len(new_listings)
)
for user in users_to_notify:
User(self.config.user[user], logger=self.logger).notify(
new_listings, listing_ratings, item_config
)
time.sleep(5)
def _select_translator(
self: "MarketplaceMonitor", language: str | None = None
) -> Translator | None:
"""Select the language for the marketplace."""
# self.config.translator.get(marketplace_config.language, None)
assert self.config is not None
if not language:
return None
if language in self.config.translator:
return self.config.translator[language]
# if there is no exact match, we are going to match the language code
# e.g. 'en' to 'en_US'
if "_" in language:
# if a more general languge exists?
if language.split("_")[0] in self.config.translator:
translator = self.config.translator[language.split("_")[0]]
if self.logger:
self.logger.info(
f"""{hilight("[Translator]", "info")} Using language {language.split("_")[0]} (locale {translator.locale}) for {language} translation."""
)
return translator
# if not, we are going to match the language code
# e.g. 'en' to 'en_US'
for name, translator in self.config.translator.items():
if name.startswith(language.split("_")[0] + "_"):
if self.logger:
self.logger.info(
f"""{hilight("[Translator]", "info")} Using language {name} (locale {translator.locale}) for {language} translation."""
)
return translator
# if there is no match, we are going to match the language code
# e.g. 'en' to 'en_US'
for name, translator in self.config.translator.items():
if name.startswith(language + "_"):
if self.logger:
self.logger.info(
f"""{hilight("[Translator]", "info")} Using language {name} (locale {translator.locale}) for {language} translation."""
)
return translator
raise RuntimeError(f"Cannot find translator for language {language}.")
[docs]
def schedule_jobs(self: "MarketplaceMonitor") -> None:
"""Schedule jobs to run periodically."""
# we reload the config file each time when a scan action is completed
# this allows users to add/remove products dynamically.
self.load_config_file()
self.load_ai_agents()
assert self.config is not None
for marketplace_config in self.config.marketplace.values():
if marketplace_config.enabled is False:
continue
marketplace_class = supported_marketplaces[marketplace_config.name]
if marketplace_config.name in self.active_marketplaces:
marketplace = self.active_marketplaces[marketplace_config.name]
else:
marketplace = marketplace_class(
marketplace_config.name, self.browser, self.keyboard_monitor, self.logger
)
self.active_marketplaces[marketplace_config.name] = marketplace
# Configure might have been changed
marketplace.configure(
marketplace_config,
translator=self._select_translator(marketplace_config.language),
)
for item_config in self.config.item.values():
if item_config.enabled is False:
continue
if (
item_config.marketplace is None
or item_config.marketplace == marketplace_config.name
):
# wait for some time before next search
# interval (in minutes) can be defined both for the marketplace
# if there is any configuration file change, stop sleeping and search again
scheduled = None
start_at_list = item_config.start_at or marketplace_config.start_at
if start_at_list is not None and start_at_list:
for start_at in start_at_list:
if start_at.startswith("*:*:"):
# '*:*:12' to ':12'
if self.logger:
self.logger.info(
f"""{hilight("[Schedule]", "info")} Scheduling to search for {item_config.name} every minute at {start_at[3:]}s"""
)
scheduled = schedule.every().minute.at(start_at[3:])
elif start_at.startswith("*:"):
# '*:12:12' or '*:12'
if self.logger:
self.logger.info(
f"""{hilight("[Schedule]", "info")} Scheduling to search for {item_config.name} every hour at {start_at[1:]}m"""
)
scheduled = schedule.every().hour.at(
start_at[1:] if start_at.count(":") == 1 else start_at[2:]
)
else:
# '12:12:12' or '12:12'
if self.logger:
self.logger.info(
f"""{hilight("[Schedule]", "ss")} Scheduling to search for {item_config.name} every day at {start_at}"""
)
scheduled = schedule.every().day.at(start_at)
else:
search_interval = max(
item_config.search_interval
or marketplace_config.search_interval
or 30 * 60,
1,
)
max_search_interval = max(
item_config.max_search_interval
or marketplace_config.max_search_interval
or 60 * 60,
search_interval,
)
if self.logger:
self.logger.info(
f"""{hilight("[Schedule]", "info")} Scheduling to search for {item_config.name} every {humanize.naturaldelta(search_interval)} {"" if search_interval == max_search_interval else f"to {humanize.naturaldelta(max_search_interval)}"}"""
)
scheduled = schedule.every(search_interval).to(max_search_interval).seconds
if scheduled is None:
raise ValueError(
f"Cannot determine a schedule for {item_config.name} from configuration file."
)
scheduled.do(
self.search_item,
marketplace_config,
marketplace,
item_config,
).tag(item_config.name)
[docs]
def handle_pause(self: "MarketplaceMonitor") -> None:
"""Handle interruption signal."""
if self.keyboard_monitor is None or not self.keyboard_monitor.is_paused():
return
rich.print(counter)
if not self.keyboard_monitor.confirm():
return
# now we should go to an interactive session
while True:
while True:
url = (
Prompt.ask(
f"""\nEnter an {hilight("ID")} or a {hilight("URL")} to check, or {hilight("exit")}."""
)
.strip("\x1b")
.strip()
)
if not url.isnumeric() and not url.startswith("https://"):
if url.endswith("exit"):
url = "exit"
break
if url:
print(f'Invalid input "{url}". Please try again.')
else:
break
if url == "exit":
break
try:
self.check_items([url], for_item=None)
except KeyboardInterrupt:
raise
except Exception as e:
if self.logger:
self.logger.debug(f"Failed to check item {url}: {e}")
def _has_marketplace_credentials(self: "MarketplaceMonitor") -> bool:
"""True if every enabled marketplace has a username and password.
Used to defer launching the Playwright browser until the user has
provided credentials (typically via the web UI), to avoid the
confusing state of two places asking for Facebook login at once.
"""
assert self.config is not None
for mp in self.config.marketplace.values():
if getattr(mp, "enabled", True) is False:
continue
if not getattr(mp, "username", None) or not getattr(mp, "password", None):
return False
return True
def _wait_for_marketplace_credentials(self: "MarketplaceMonitor") -> None:
"""Block until config has marketplace credentials.
Reloads the config whenever the file changes on disk.
No-op if credentials are already present.
"""
assert self.config is not None
while not self._has_marketplace_credentials():
if self.logger:
self.logger.info(
f"""{hilight("[Login]", "info")} Waiting for Facebook credentials. Sign in via the web UI or add username/password under [marketplace.facebook] in your config. The Playwright browser will launch once credentials are available.""",
extra=aimm_event("credentials_wait", status="waiting"),
)
# doze wakes up on file change OR keyboard interrupt OR timeout.
doze(300, self.config_files, self.keyboard_monitor)
# File may have changed — reload the config (non-fatal on parse error).
try:
self.load_config_file()
except KeyboardInterrupt:
raise
except Exception as e:
if self.logger:
self.logger.debug(f"Config reload failed during credential wait: {e}")
continue
if self.logger:
self.logger.info(
f"""{hilight("[Login]", "succ")} Facebook credentials found — launching browser.""",
extra=aimm_event("credentials_wait", status="found"),
)
[docs]
def start_monitor(self: "MarketplaceMonitor") -> None:
"""Main function to monitor the marketplace."""
# start a browser with playwright, cannot use with statement since the jobs will be
# executed outside of the scope by schedule job runner
self.keyboard_monitor = KeyboardMonitor()
self.keyboard_monitor.start()
# Open a new browser page.
self.load_config_file()
assert self.config is not None
# If requested (by the web UI), defer browser launch until
# marketplace credentials are set. Without this, Playwright
# navigates to the Facebook login page and waits for manual
# input even though the user has a web UI open that's also
# asking for those same credentials.
if self.defer_login_until_credentials:
self._wait_for_marketplace_credentials()
self.browser = self._launch_browser()
#
assert self.browser is not None
while True:
self.handle_pause()
self.schedule_jobs()
if not schedule.get_jobs():
# this actually should not happen because at least one item is required for the configuration file
if self.logger:
self.logger.error(
"No search job is defined. Please add search items to your config file."
)
self.handle_pause()
if doze(60, self.config_files, self.keyboard_monitor) == SleepStatus.BY_KEYBOARD:
self.keyboard_monitor.set_paused(True)
continue
# run all jobs at the first time, then on their own schedule
# we could have used schedule.run_all() but we would like to check if
# configuration file has been changed, if so, clear all jobs and restart
for job in schedule.get_jobs():
job.run()
self.handle_pause()
# if configuration file has been changed, clear all scheduled jobs and restart
new_file_hash = calculate_file_hash(self.config_files)
assert self.config_hash is not None
if new_file_hash != self.config_hash:
if self.logger:
self.logger.info(
f"""{hilight("[Config]", "info")} Config file changed, restarting monitor."""
)
schedule.clear()
break
if not schedule.get_jobs():
continue
# subsequent runs will be scheduled runs
while True:
next_job: schedule.Job | None = None
for job in schedule.jobs:
if job.next_run is None:
continue
if next_job is None or (
next_job.next_run and next_job.next_run > job.next_run
):
next_job = job
if next_job is None:
# no more job
if self.logger:
self.logger.warning(
f"""{hilight("[Schedule]", "fail")} No more active search job."""
)
sys.exit(0)
# assert next_job is not None
assert next_job.next_run is not None
idle_seconds = schedule.idle_seconds() or 0
if idle_seconds > 60:
# the sleep time might not be enough, causing this message
# to be sent repeatedly. Having a idle_seconds > 60 helps
# to reduce the frequency of this message.
if self.logger:
self.logger.info(
f"""{hilight("[Schedule]", "info")} Next job to search {hilight(str(next(iter(next_job.tags))))} scheduled to run in {humanize.naturaldelta(idle_seconds)} at {next_job.next_run.strftime("%Y-%m-%d %H:%M:%S")}"""
)
# sleep at most 1 hr, and print updated "next job" message
res = doze(
min(max(5, int(idle_seconds)), 60 * 60),
self.config_files,
self.keyboard_monitor,
)
if res == SleepStatus.BY_FILE_CHANGE:
# if configuration file has been changed, clear all scheduled jobs and restart
new_file_hash = calculate_file_hash(self.config_files)
assert self.config_hash is not None
if new_file_hash != self.config_hash:
if self.logger:
self.logger.info(
f"""{hilight("[Config]", "info")} Config file changed, restarting monitor."""
)
schedule.clear()
break
elif res == SleepStatus.BY_KEYBOARD:
self.keyboard_monitor.set_paused(True)
self.handle_pause()
schedule.run_pending()
[docs]
def stop_monitor(self: "MarketplaceMonitor") -> None:
"""Stop the monitor."""
for marketplace in self.active_marketplaces.values():
marketplace.stop()
self.playwright.stop()
if self.keyboard_monitor:
self.keyboard_monitor.stop()
cache.close()
[docs]
def check_items(
self: "MarketplaceMonitor", items: List[str] | None = None, for_item: str | None = None
) -> None:
"""Main function to monitor the marketplace."""
# we reload the config file each time when a scan action is completed
# this allows users to add/remove products dynamically.
self.load_config_file()
if for_item is not None:
assert self.config is not None
if for_item not in self.config.item:
raise ValueError(
f"Item {for_item} not found in config, available items are {', '.join(self.config.item.keys())}."
)
self.load_ai_agents()
post_urls = []
for post_url in items or []:
if post_url.isnumeric():
post_url = f"https://www.facebook.com/marketplace/item/{post_url}/"
if not post_url.startswith("https://www.facebook.com/marketplace/item"):
raise ValueError(f"URL {post_url} is not a valid Facebook Marketplace URL.")
post_urls.append(post_url)
if not post_urls:
raise ValueError("No URLs to check.")
# Open a new browser page.
for post_url in post_urls or []:
# check if item in config
assert self.config is not None
# which marketplace to check it?
for marketplace_config in self.config.marketplace.values():
if marketplace_config.enabled is False:
continue
marketplace_class = supported_marketplaces[marketplace_config.name]
if marketplace_config.name in self.active_marketplaces:
marketplace = self.active_marketplaces[marketplace_config.name]
else:
marketplace = marketplace_class(
marketplace_config.name, None, None, self.logger
)
self.active_marketplaces[marketplace_config.name] = marketplace
# Configure might have been changed
marketplace.configure(
marketplace_config,
translator=self._select_translator(marketplace_config.language),
)
# do we need a browser?
if Listing.from_cache(post_url) is None:
if self.browser is None:
if self.logger:
self.logger.info(
f"""{hilight("[Search]", "info")} Starting a browser because the item was not checked before."""
)
self.browser = self._launch_browser()
marketplace.set_browser(self.browser)
# ignore enabled
if for_item is None:
# get by asking user
name = None
item_names = list(self.config.item.keys())
if len(item_names) > 1:
name = Prompt.ask(
f"""Enter name of {hilight("search item")}""", choices=item_names
)
item_config = self.config.item[name or item_names[0]]
else:
item_config = self.config.item[for_item]
# do not search, get the item details directly
listing_result = marketplace.get_listing_details(post_url, item_config)
# get_listing_details returns a tuple (Listing, bool) - unpack it properly
if isinstance(listing_result, tuple) and len(listing_result) == 2:
listing, from_cache = listing_result
else:
# Fallback - treat as direct listing (shouldn't happen but defensive)
listing = listing_result
if self.logger:
self.logger.info(
f"""{hilight("[Retrieve]", "succ")} Details of the item is found: {pretty_repr(listing)}"""
)
if self.logger:
self.logger.info(
f"""{hilight("[Search]", "succ")} Checking {post_url} for item {item_config.name} with configuration {pretty_repr(item_config)}"""
)
marketplace.check_listing(listing, item_config)
rating = self.evaluate_by_ai(
listing, item_config=item_config, marketplace_config=marketplace_config
)
if self.logger:
if rating.comment == AIResponse.NOT_EVALUATED:
if rating.name:
self.logger.info(
f"""{hilight("[AI]", rating.style)} {rating.name or "AI"} did not evaluate {hilight(listing.title)}."""
)
else:
self.logger.info(
f"""{hilight("[AI]", rating.style)} No AI available to evaluate {hilight(listing.title)}."""
)
else:
self.logger.info(
f"""{hilight("[AI]", rating.style)} {rating.name or "AI"} concludes {hilight(f"{rating.conclusion} ({rating.score}): {rating.comment}", rating.style)} for listing {hilight(listing.title)}."""
)
# notification status?
users_to_notify = (
item_config.notify
or marketplace_config.notify
or list(self.config.user.keys())
)
# for notification usages
listing.name = item_config.name
for user in users_to_notify:
ns = User(self.config.user[user], self.logger).notification_status(listing)
if self.logger:
if ns == NotificationStatus.NOTIFIED:
self.logger.info(
f"""{hilight("[Notify]", "succ")} Notified {user} about {post_url}."""
)
elif ns == NotificationStatus.EXPIRED:
self.logger.info(
f"""{hilight("[Notify]", "info")} Already notified {user} about {post_url}. The notification is ow expired."""
)
elif ns == NotificationStatus.LISTING_CHANGED:
self.logger.info(
f"""{hilight("[Notify]", "info")} Already notified {user} about {post_url}, but the listing is now changed."""
)
elif ns == NotificationStatus.LISTING_DISCOUNTED:
self.logger.info(
f"""{hilight("[Notify]", "info")} Already notified {user} about {post_url}, but the listing is now discounted."""
)
else:
self.logger.info(
f"""{hilight("[Notify]", "info")} Not notified {user} about {post_url} yet."""
)
# testing notification
# User(self.config.user[user], logger=self.logger).notify(
# [listing], [rating], item_config, force=True
# )
[docs]
def evaluate_by_ai(
self: "MarketplaceMonitor",
item: Listing,
item_config: TItemConfig,
marketplace_config: TMarketplaceConfig,
) -> AIResponse:
if item_config.ai is not None:
ai_agents = item_config.ai
elif marketplace_config.ai is not None:
ai_agents = marketplace_config.ai
else:
ai_agents = None
#
for agent in self.ai_agents:
if ai_agents is not None and agent.config.name not in ai_agents:
continue
try:
return agent.evaluate(item, item_config, marketplace_config)
except KeyboardInterrupt:
raise
except Exception as e:
if self.logger:
self.logger.error(
f"""{hilight("[AI]", "fail")} Failed to get an answer from {agent.config.name}: {e}"""
)
continue
return AIResponse(5, AIResponse.NOT_EVALUATED)