Source code for ai_marketplace_monitor.facebook

import datetime
import os
import re
import time
from dataclasses import dataclass
from enum import Enum
from itertools import repeat
from logging import Logger
from typing import Any, Generator, List, Tuple, Type, cast
from urllib.parse import quote

import humanize
from currency_converter import CurrencyConverter  # type: ignore
from playwright.sync_api import Browser, ElementHandle, Page  # type: ignore
from rich.pretty import pretty_repr

from .listing import Listing
from .marketplace import ItemConfig, Marketplace, MarketplaceConfig, WebPage
from .utils import (
    BaseConfig,
    CounterItem,
    KeyboardMonitor,
    Translator,
    convert_to_seconds,
    counter,
    doze,
    extract_price,
    hilight,
    is_substring,
)


class Condition(Enum):
    """Values accepted by the ``condition`` option."""

    NEW = "new"
    USED_LIKE_NEW = "used_like_new"
    USED_GOOD = "used_good"
    USED_FAIR = "used_fair"
class DateListed(Enum):
    """Values accepted by the ``date_listed`` option, expressed in days."""

    ANYTIME = 0
    PAST_24_HOURS = 1
    PAST_WEEK = 7
    PAST_MONTH = 30
class DeliveryMethod(Enum):
    """Values accepted by the ``delivery_method`` option."""

    LOCAL_PICK_UP = "local_pick_up"
    SHIPPING = "shipping"
    ALL = "all"
class Availability(Enum):
    """Values accepted by the ``availability`` option."""

    ALL = "all"
    INSTOCK = "in"
    OUTSTOCK = "out"
class Category(Enum):
    """Values accepted by the ``category`` option."""

    VEHICLES = "vehicles"
    PROPERTY_RENTALS = "propertyrentals"
    APPAREL = "apparel"
    ELECTRONICS = "electronics"
    ENTERTAINMENT = "entertainment"
    FAMILY = "family"
    FREE_STUFF = "freestuff"
    FREE = "free"
    GARDEN = "garden"
    HOBBIES = "hobbies"
    HOME_GOODS = "homegoods"
    HOME_IMPROVEMENT = "homeimprovement"
    HOME_SALES = "homesales"
    MUSICAL_INSTRUMENTS = "musicalinstruments"
    OFFICE_SUPPLIES = "officesupplies"
    PET_SUPPLIES = "petsupplies"
    SPORTING_GOODS = "sportinggoods"
    TICKETS = "tickets"
    TOYS = "toys"
    VIDEO_GAMES = "videogames"
@dataclass
class FacebookMarketItemCommonConfig(BaseConfig):
    """Item options that can be defined in marketplace

    This class defines and processes options that can be specified
    in both marketplace and item sections, specific to facebook marketplace.

    Each ``handle_<option>`` method validates one option, normalises a scalar
    into a one-element list where the option is list-valued, and raises
    ``ValueError`` when the value is not acceptable.
    """

    seller_locations: List[str] | None = None
    availability: List[str] | None = None
    condition: List[str] | None = None
    date_listed: List[int] | None = None
    delivery_method: List[str] | None = None
    category: str | None = None

    def handle_seller_locations(self: "FacebookMarketItemCommonConfig") -> None:
        """Normalise ``seller_locations`` to a list of strings.

        Raises:
            ValueError: if the value is neither a string nor a list of strings.
        """
        if self.seller_locations is None:
            return

        if isinstance(self.seller_locations, str):
            self.seller_locations = [self.seller_locations]
        if not isinstance(self.seller_locations, list) or not all(
            isinstance(x, str) for x in self.seller_locations
        ):
            raise ValueError(f"Item {hilight(self.name)} seller_locations must be a list.")

    def handle_availability(self: "FacebookMarketItemCommonConfig") -> None:
        """Normalise ``availability`` to at most two ``Availability`` values.

        Raises:
            ValueError: if the value is not a string or sequence, has more
                than two entries, or contains an unknown value.
        """
        if self.availability is None:
            return

        if isinstance(self.availability, str):
            self.availability = [self.availability]

        allowed = [x.value for x in Availability]
        # Check the container type first so that a scalar such as an int is
        # reported as a ValueError instead of failing with a TypeError below.
        if (
            not isinstance(self.availability, (list, tuple))
            or len(self.availability) > 2
            or not all(val in allowed for val in self.availability)
        ):
            raise ValueError(
                f"Item {hilight(self.name)} availability must be one or two values of 'all', 'in', and 'out'."
            )

    def handle_condition(self: "FacebookMarketItemCommonConfig") -> None:
        """Normalise ``condition`` to a list of ``Condition`` values (strings).

        A single string or a single ``Condition`` member is wrapped into a
        one-element list, in line with the other list-valued options.

        Raises:
            ValueError: if the value is not a list of known condition strings.
        """
        if self.condition is None:
            return

        if isinstance(self.condition, Condition):
            # store the string value so that the validation below accepts it
            self.condition = [self.condition.value]
        elif isinstance(self.condition, str):
            self.condition = [self.condition]

        allowed = [cond.value for cond in Condition]
        if not isinstance(self.condition, list) or not all(
            isinstance(x, str) and x in allowed for x in self.condition
        ):
            raise ValueError(
                f"Item {hilight(self.name)} condition must be one or more of that can be one of 'new', 'used_like_new', 'used_good', 'used_fair'."
            )

    def handle_date_listed(self: "FacebookMarketItemCommonConfig") -> None:
        """Normalise ``date_listed`` to a list of one or two ``DateListed`` values.

        Accepts numbers, digit strings and the labels ``All``,
        ``Last 24 hours``, ``Last 7 days`` and ``Last 30 days``
        (case-insensitive). Digit strings are held to the same set of allowed
        values as numbers.

        Raises:
            ValueError: if an entry is not recognised or more than two entries
                are given.
        """
        if self.date_listed is None:
            return

        if not isinstance(self.date_listed, list):
            self.date_listed = [self.date_listed]

        def invalid() -> ValueError:
            # built lazily; self.date_listed still holds the user-provided value here
            return ValueError(
                f"""Item {hilight(self.name)} date_listed must be one of 1, 7, and 30, or All, Last 24 hours, Last 7 days, Last 30 days.: {self.date_listed} provided."""
            )

        allowed = [x.value for x in DateListed]
        labels = {
            "all": DateListed.ANYTIME.value,
            "last 24 hours": DateListed.PAST_24_HOURS.value,
            "last 7 days": DateListed.PAST_WEEK.value,
            "last 30 days": DateListed.PAST_MONTH.value,
        }

        new_values: List[int] = []
        for val in self.date_listed:
            if isinstance(val, str):
                days = int(val) if val.isdigit() else labels.get(val.lower())
            elif isinstance(val, (int, float)):
                days = int(val)
            else:
                days = None
            if days not in allowed:
                raise invalid()
            new_values.append(days)

        # new_values should have length 1 or 2
        if len(new_values) > 2:
            raise ValueError(
                f"""Item {hilight(self.name)} date_listed must have one or two values."""
            )
        self.date_listed = new_values

    def handle_delivery_method(self: "FacebookMarketItemCommonConfig") -> None:
        """Normalise ``delivery_method`` to at most two ``DeliveryMethod`` values.

        Raises:
            ValueError: if the value is not a string or list, has more than two
                entries, or contains an unknown value.
        """
        if self.delivery_method is None:
            return

        if isinstance(self.delivery_method, str):
            self.delivery_method = [self.delivery_method]

        invalid_value = (
            f"Item {hilight(self.name)} delivery_method must be one of 'local_pick_up' and 'shipping'."
        )
        # The type check must come before len(); otherwise a scalar such as an
        # int raises TypeError instead of ValueError.
        if not isinstance(self.delivery_method, list):
            raise ValueError(invalid_value)

        if len(self.delivery_method) > 2:
            raise ValueError(
                f"Item {hilight(self.name)} delivery_method must be one or two values of 'local_pick_up' and 'shipping'."
            )

        allowed = [x.value for x in DeliveryMethod]
        if not all(val in allowed for val in self.delivery_method):
            raise ValueError(invalid_value)

    def handle_category(self: "FacebookMarketItemCommonConfig") -> None:
        """Validate that ``category`` is one of the ``Category`` values.

        Raises:
            ValueError: if the value is not a known category string.
        """
        if self.category is None:
            return

        if not isinstance(self.category, str) or self.category not in [
            x.value for x in Category
        ]:
            raise ValueError(
                f"Item {hilight(self.name)} category must be one of {', '.join(x.value for x in Category)}."
            )
@dataclass
class FacebookMarketplaceConfig(MarketplaceConfig, FacebookMarketItemCommonConfig):
    """Marketplace-level options for facebook.

    Holds the options that only make sense for the marketplace as a whole:
    the login credentials and how long to wait after the login form has been
    submitted. Every option is optional.
    """

    login_wait_time: int | None = None
    password: str | None = None
    username: str | None = None

    def handle_username(self: "FacebookMarketplaceConfig") -> None:
        """Fill ``username`` from ``FACEBOOK_USERNAME`` if unset and type-check it."""
        if self.username is None:
            # fall back to the environment when the config has no username
            self.username = os.environ.get("FACEBOOK_USERNAME")

        username = self.username
        if username is None or isinstance(username, str):
            return
        raise ValueError(f"Marketplace {self.name} username must be a string.")

    def handle_password(self: "FacebookMarketplaceConfig") -> None:
        """Fill ``password`` from ``FACEBOOK_PASSWORD`` if unset and type-check it."""
        if self.password is None:
            # fall back to the environment when the config has no password
            self.password = os.environ.get("FACEBOOK_PASSWORD")

        password = self.password
        if password is None or isinstance(password, str):
            return
        raise ValueError(f"Marketplace {self.name} password must be a string.")

    def handle_login_wait_time(self: "FacebookMarketplaceConfig") -> None:
        """Convert ``login_wait_time`` to seconds and require a non-negative int.

        A string value is passed through ``convert_to_seconds``; any failure of
        that conversion is re-raised as ``ValueError``.
        """
        raw_value = self.login_wait_time
        if raw_value is None:
            return

        if isinstance(raw_value, str):
            try:
                self.login_wait_time = convert_to_seconds(raw_value)
            except KeyboardInterrupt:
                raise
            except Exception as e:
                raise ValueError(
                    f"Marketplace {self.name} login_wait_time {self.login_wait_time} is not recognized."
                ) from e

        seconds = self.login_wait_time
        if isinstance(seconds, int) and seconds >= 0:
            return
        raise ValueError(
            f"Marketplace {self.name} login_wait_time should be a non-negative number."
        )
@dataclass
class FacebookItemConfig(ItemConfig, FacebookMarketItemCommonConfig):
    """Item-section configuration for facebook.

    Declares no fields or handlers of its own; everything is inherited from
    ``ItemConfig`` and ``FacebookMarketItemCommonConfig``.
    """
class FacebookMarketplace(Marketplace):
    """Marketplace implementation that drives Facebook Marketplace through a browser page.

    ``login`` opens the login page and fills in the configured credentials,
    ``search`` runs the searches of one item and yields the listings that pass
    ``check_listing``, and ``get_listing_details`` loads (or reads from cache)
    the detail page of a single listing.
    """

    # page opened by login()
    initial_url = "https://www.facebook.com/login/device-based/regular/login/"
    # marketplace name; __init__ asserts that the name passed in matches it
    name = "facebook"

    def __init__(
        self: "FacebookMarketplace",
        name: str,
        browser: Browser | None,
        keyboard_monitor: KeyboardMonitor | None = None,
        logger: Logger | None = None,
    ) -> None:
        """Initialise the marketplace; the page is created lazily by ``login``."""
        assert name == self.name
        super().__init__(name, browser, keyboard_monitor, logger)
        self.page: Page | None = None

    @classmethod
    def get_config(cls: Type["FacebookMarketplace"], **kwargs: Any) -> FacebookMarketplaceConfig:
        """Build the marketplace configuration object from keyword options."""
        return FacebookMarketplaceConfig(**kwargs)

    @classmethod
    def get_item_config(cls: Type["FacebookMarketplace"], **kwargs: Any) -> FacebookItemConfig:
        """Build an item configuration object from keyword options."""
        return FacebookItemConfig(**kwargs)

    def login(self: "FacebookMarketplace") -> None:
        """Open the login page, dismiss the cookie dialog and submit credentials.

        Failures while handling the cookie dialog or while typing the
        credentials are logged and do not abort the login. Afterwards the
        method waits ``login_wait_time`` seconds (60 when not configured) so
        that additional information can be entered manually.
        """
        assert self.browser is not None

        self.page = self.create_page(swap_proxy=True)

        # Navigate to the URL, no timeout
        self.goto_url(self.initial_url)

        if self.logger:
            self.logger.debug("[Login] Checking for cookie consent pop-up...")
        try:
            allow_button_locator = self.page.get_by_role(
                "button",
                name=re.compile(r"Allow all cookies|Allow cookies|Accept All", re.IGNORECASE),
            )
            if allow_button_locator.is_visible():
                allow_button_locator.click()
                self.page.wait_for_timeout(2000)  # 2 seconds
                if self.logger:
                    self.logger.debug(
                        f"""{hilight("[Login]", "succ")} Allow all cookies' button clicked."""
                    )
            elif self.logger:
                self.logger.debug(
                    f"{hilight('[Login]', 'succ')} Cookie consent pop-up not found or not visible within timeout."
                )
        except Exception as e:
            # best effort: the dialog is optional, so only warn
            if self.logger:
                self.logger.warning(
                    f"{hilight('[Login]', 'fail')} Could not handle cookie pop-up (or it was not present): {e!s}"
                )

        self.config: FacebookMarketplaceConfig
        try:
            if self.config.username:
                time.sleep(2)
                selector = self.page.wait_for_selector('input[name="email"]')
                if selector is not None:
                    selector.type(self.config.username, delay=250)
            if self.config.password:
                time.sleep(2)
                selector = self.page.wait_for_selector('input[name="pass"]')
                if selector is not None:
                    selector.type(self.config.password, delay=250)
            # the form is only submitted when both credentials are available
            if self.config.username and self.config.password:
                time.sleep(2)
                selector = self.page.wait_for_selector('button[name="login"]')
                if selector is not None:
                    selector.click()
        except KeyboardInterrupt:
            raise
        except Exception as e:
            if self.logger:
                self.logger.error(f"""{hilight("[Login]", "fail")} {e}""")

        # in case there is a need to enter additional information
        login_wait_time = (
            60 if self.config.login_wait_time is None else self.config.login_wait_time
        )
        if login_wait_time > 0:
            if self.logger:
                self.logger.info(
                    f"""{hilight("[Login]", "info")} Waiting {humanize.naturaldelta(login_wait_time)}"""
                    + (
                        f""" or press {hilight("Esc")} when you are ready."""
                        if self.keyboard_monitor is not None
                        else ""
                    )
                )
            doze(login_wait_time, keyboard_monitor=self.keyboard_monitor)

    def _price_in_currency(
        self: "FacebookMarketplace", price_spec: str, currency: str | None
    ) -> str:
        """Return the numeric part of *price_spec*, converted to *currency* if needed.

        *price_spec* is either all digits or ``"<amount> <currency code>"``.
        The amount is converted only when *currency* is set and differs from
        the code in *price_spec*.
        """
        if price_spec.isdigit():
            return price_spec

        price, cur = price_spec.split(" ", 1)
        if currency and cur != currency:
            c = CurrencyConverter()
            price = str(int(c.convert(int(price), cur, currency)))
            if self.logger:
                self.logger.debug(
                    f"""{hilight("[Search]", "info")} Converting price {price_spec} to {price} {currency}"""
                )
        return price

    def search(
        self: "FacebookMarketplace", item_config: FacebookItemConfig
    ) -> Generator[Listing, None, None]:
        """Search every configured city and phrase for *item_config*.

        Item-level options take precedence over marketplace-level ones. For
        options that may hold two values (``date_listed``, ``delivery_method``,
        ``availability``) the first value is used for the first search of the
        item and the last value for all later searches.

        Yields:
            Listings that pass ``check_listing`` after their details were
            retrieved. The generator stops early when the keyboard monitor
            reports a pause.
        """
        if not self.page:
            self.login()
            assert self.page is not None

        # first search of this item uses the first value, later searches the last
        pick = 0 if item_config.searched_count == 0 else -1

        # filters shared by all cities; per-city filters are added to a copy below
        options = []

        condition = item_config.condition or self.config.condition
        if condition:
            options.append(f"itemCondition={'%2C'.join(condition)}")

        # date_listed can take values from item_config, or marketplace config
        if item_config.date_listed:
            date_listed = item_config.date_listed[pick]
        elif self.config.date_listed:
            date_listed = self.config.date_listed[pick]
        else:
            date_listed = DateListed.ANYTIME.value
        if date_listed is not None and date_listed != DateListed.ANYTIME.value:
            options.append(f"daysSinceListed={date_listed}")

        # delivery_method can take values from item_config, or marketplace config
        if item_config.delivery_method:
            delivery_method = item_config.delivery_method[pick]
        elif self.config.delivery_method:
            delivery_method = self.config.delivery_method[pick]
        else:
            delivery_method = DeliveryMethod.ALL.value
        if delivery_method is not None and delivery_method != DeliveryMethod.ALL.value:
            options.append(f"deliveryMethod={delivery_method}")

        # availability can take values from item_config, or marketplace config
        if item_config.availability:
            availability = item_config.availability[pick]
        elif self.config.availability:
            availability = self.config.availability[pick]
        else:
            availability = Availability.ALL.value
        if availability is not None and availability != Availability.ALL.value:
            options.append(f"availability={availability}")

        # search multiple keywords and cities
        # there is a small chance that search by different keywords and city will return the same items.
        seen_urls = set()
        search_city = item_config.search_city or self.config.search_city or []
        city_name = item_config.city_name or self.config.city_name
        radiuses = item_config.radius or self.config.radius
        currencies = item_config.currency or self.config.currency

        # this should not happen because `Config.validate_items` has checked this
        if not search_city:
            if self.logger:
                self.logger.error(
                    f"""{hilight("[Search]", "fail")} No search city provided for {item_config.name}"""
                )

        # increase the searched_count to differentiate first and subsequent searches
        item_config.searched_count += 1

        # zip() stops at the shortest input, so a missing or empty per-city
        # list must be replaced by an endless stream of None; otherwise no
        # city would be searched at all.
        for city, cname, radius, currency in zip(
            search_city,
            city_name if city_name else repeat(None),
            radiuses if radiuses else repeat(None),
            currencies if currencies else repeat(None),
        ):
            marketplace_url = f"https://www.facebook.com/marketplace/{city}/search?"

            # Start from a copy so that radius/price/category of one city do
            # not accumulate in the query string of the next city.
            city_options = list(options)

            if radius:
                city_options.append(f"radius={radius}")

            max_price = item_config.max_price or self.config.max_price
            if max_price:
                city_options.append(f"maxPrice={self._price_in_currency(max_price, currency)}")

            min_price = item_config.min_price or self.config.min_price
            if min_price:
                city_options.append(f"minPrice={self._price_in_currency(min_price, currency)}")

            category = item_config.category or self.config.category
            if category:
                city_options.append(f"category={category}")
                if category in (Category.FREE_STUFF.value, Category.FREE.value):
                    # price limits are dropped for the free categories
                    city_options = [
                        x for x in city_options if not x.startswith(("minPrice=", "maxPrice="))
                    ]

            for search_phrase in item_config.search_phrases:
                if self.logger:
                    self.logger.info(
                        f"""{hilight("[Search]", "info")} Searching {item_config.marketplace} for """
                        f"""{hilight(item_config.name)} from {hilight(cname or city)}"""
                        + (f" with radius={radius}" if radius else " with default radius")
                    )

                self.goto_url(
                    marketplace_url + "&".join([f"query={quote(search_phrase)}", *city_options])
                )

                found_listings = FacebookSearchResultPage(
                    self.page, self.translator, self.logger
                ).get_listings()
                time.sleep(5)
                # report a failure only when nothing came back
                if not found_listings and self.logger:
                    self.logger.error(
                        f"""{hilight("[Search]", "fail")} Failed to get search results for {search_phrase} from {city}"""
                    )

                counter.increment(CounterItem.SEARCH_PERFORMED, item_config.name)

                # go to each item and get the description
                # if we have not done that before
                for listing in found_listings:
                    listing_key = listing.post_url.split("?")[0]
                    if listing_key in seen_urls:
                        continue
                    if self.keyboard_monitor is not None and self.keyboard_monitor.is_paused():
                        return
                    counter.increment(CounterItem.LISTING_EXAMINED, item_config.name)
                    seen_urls.add(listing_key)

                    # filter by title and location; skip keyword filtering since we do not have description yet.
                    if not self.check_listing(listing, item_config, description_available=False):
                        counter.increment(CounterItem.EXCLUDED_LISTING, item_config.name)
                        continue

                    try:
                        details, from_cache = self.get_listing_details(
                            listing.post_url,
                            item_config,
                            price=listing.price,
                            title=listing.title,
                        )
                        # pause only after a real page load
                        if not from_cache:
                            time.sleep(5)
                    except KeyboardInterrupt:
                        raise
                    except Exception as e:
                        if self.logger:
                            self.logger.error(
                                f"""{hilight("[Retrieve]", "fail")} Failed to get item details: {e}"""
                            )
                        continue

                    # currently we trust the other items from summary page a bit better
                    # so we do not copy title, description etc from the detailed result
                    for attr in ("condition", "seller", "description"):
                        # other attributes should be consistent
                        setattr(listing, attr, getattr(details, attr))

                    listing.name = item_config.name
                    if self.logger:
                        self.logger.debug(
                            f"""{hilight("[Retrieve]", "succ")} New item "{listing.title}" from {listing.post_url} is sold by "{listing.seller}" and with description "{listing.description[:100]}..." """
                        )

                    # Warn if we never managed to extract a description for keyword-based filtering
                    if (
                        not (listing.description or "").strip()
                        and item_config.keywords
                        and self.logger
                    ):
                        self.logger.debug(
                            f"""{hilight("[Error]", "fail")} Failed to extract description for {hilight(listing.title)} at {listing.post_url}. Keyword filtering will only apply to title."""
                        )

                    if self.check_listing(listing, item_config):
                        yield listing
                    else:
                        counter.increment(CounterItem.EXCLUDED_LISTING, item_config.name)

    def get_listing_details(
        self: "FacebookMarketplace",
        post_url: str,
        item_config: ItemConfig,
        price: str | None = None,
        title: str | None = None,
    ) -> Tuple[Listing, bool]:
        """Return the details of a listing and whether they came from the cache.

        A cached entry is reused when *price* and *title* (if given) match the
        cached values; otherwise the listing page is loaded, parsed with
        ``parse_listing`` and written to the cache.

        Returns:
            ``(details, from_cache)``.

        Raises:
            ValueError: if ``parse_listing`` returns ``None`` for the page.
        """
        assert post_url.startswith("https://www.facebook.com")
        details = Listing.from_cache(post_url)
        if (
            details is not None
            and (price is None or details.price == price)
            and (title is None or details.title == title)
        ):
            # if the price and title are the same, we assume everything else is unchanged.
            return details, True

        if not self.page:
            self.login()

        assert self.page is not None
        self.goto_url(post_url)
        counter.increment(CounterItem.LISTING_QUERY, item_config.name)
        details = parse_listing(self.page, post_url, self.translator, self.logger)
        if details is None:
            raise ValueError(
                f"Failed to get item details of listing {post_url}. "
                "The listing might be missing key information (e.g. seller) or not in English. "
                "Please add option language to your marketplace configuration if the latter is the case. "
                "See https://github.com/BoPeng/ai-marketplace-monitor?tab=readme-ov-file#support-for-non-english-languages for details."
            )
        details.to_cache(post_url)
        return details, False

    def check_listing(
        self: "FacebookMarketplace",
        item: Listing,
        item_config: FacebookItemConfig,
        description_available: bool = True,
    ) -> bool:
        """Return ``True`` when *item* passes the keyword, location and seller filters.

        Args:
            item: listing to check.
            item_config: item configuration that provides the filters.
            description_available: when ``False`` the required-keyword filter
                is skipped; the other filters still apply.
        """
        searchable_text = item.title + " " + item.description

        # antikeywords come from item_config only
        antikeywords = item_config.antikeywords
        if antikeywords and is_substring(antikeywords, searchable_text, logger=self.logger):
            if self.logger:
                self.logger.info(
                    f"""{hilight("[Skip]", "fail")} Exclude {hilight(item.title)} due to {hilight("excluded keywords", "fail")}: {", ".join(antikeywords)}"""
                )
            return False

        # if the return description does not contain any of the search keywords
        keywords = item_config.keywords
        if (
            description_available
            and keywords
            and not is_substring(keywords, searchable_text, logger=self.logger)
        ):
            if self.logger:
                self.logger.info(
                    f"""{hilight("[Skip]", "fail")} Exclude {hilight(item.title)} {hilight("without required keywords", "fail")} in title and description."""
                )
            return False

        # get locations from either marketplace config or item config
        if item_config.seller_locations is not None:
            allowed_locations = item_config.seller_locations
        else:
            allowed_locations = self.config.seller_locations or []
        if allowed_locations and not is_substring(
            allowed_locations, item.location, logger=self.logger
        ):
            if self.logger:
                self.logger.info(
                    f"""{hilight("[Skip]", "fail")} Exclude {hilight("out of area", "fail")} item {hilight(item.title)} from location {hilight(item.location)}"""
                )
            return False

        # get exclude_sellers from either marketplace config or item config
        if item_config.exclude_sellers is not None:
            exclude_sellers = item_config.exclude_sellers
        else:
            exclude_sellers = self.config.exclude_sellers or []
        if (
            item.seller
            and exclude_sellers
            and is_substring(exclude_sellers, item.seller, logger=self.logger)
        ):
            if self.logger:
                # "fail" is the style name used by every other message in this file
                self.logger.info(
                    f"""{hilight("[Skip]", "fail")} Exclude {hilight(item.title)} sold by {hilight("banned seller", "fail")} {hilight(item.seller)}"""
                )
            return False

        return True
class FacebookSearchResultPage(WebPage):
    """Search result page; ``get_listings`` extracts a summary ``Listing`` per grid item."""

    def _get_listings_elements_by_children_counts(self: "FacebookSearchResultPage"):
        """Locate the result grid by walking up from the first image.

        The first ancestor with more than 10 direct children is treated as the
        grid. Returns its children that have text content, or an empty list
        when no such ancestor exists.
        """
        parent: ElementHandle | None = self.page.locator("img").first.element_handle()
        # look for parent of parent until it has more than 10 children
        children = []
        while parent:
            children = parent.query_selector_all(":scope > *")
            if len(children) > 10:
                break
            parent = parent.query_selector("xpath=..")
        else:
            # ran out of ancestors: what is left in `children` belongs to the
            # topmost element, not to a listing grid
            children = []

        # find each listing
        valid_listings = []
        try:
            for listing in children:
                if not listing.text_content():
                    continue
                valid_listings.append(listing)
        except Exception as e:
            # this error should be tolerated
            if self.logger:
                self.logger.debug(
                    f"{hilight('[Retrieve]', 'fail')} Some grid item cannot be read: {e}"
                )
        return valid_listings

    def _get_listing_elements_by_traversing_header(self: "FacebookSearchResultPage"):
        """Locate the result grid below the "Collection of Marketplace items" element.

        Returns the element handles of grid items that have text content, or an
        empty list when the labelled element is not on the page.
        """
        heading = self.page.locator(
            f'[aria-label="{self.translator("Collection of Marketplace items")}"]'
        )
        # a locator object is truthy even when it matches nothing, so ask for the match count
        if heading.count() == 0:
            return []

        grid_items = heading.locator(
            ":scope > :first-child > :first-child > :nth-child(3) > :first-child > :nth-child(2) > div"
        )
        # find each listing
        valid_listings = []
        try:
            for listing in grid_items.all():
                if not listing.text_content():
                    continue
                valid_listings.append(listing.element_handle())
        except Exception as e:
            # this error should be tolerated
            if self.logger:
                self.logger.debug(
                    f"{hilight('[Retrieve]', 'fail')} Some grid item cannot be read: {e}"
                )
        return valid_listings

    def get_listings(self: "FacebookSearchResultPage") -> List[Listing]:
        """Return the listings shown on the page.

        Returns an empty list when the page shows the "Browse Marketplace"
        text or when the grid cannot be located; in the latter case the page
        content is saved to a ``debug_<timestamp>.html`` file. Grid items that
        cannot be parsed are logged and skipped.
        """
        # if no result is found
        btn = self.page.locator(f"""span:has-text('{self.translator("Browse Marketplace")}')""")
        if btn.count() > 0:
            if self.logger:
                msg = self._parent_with_cond(
                    btn.first,
                    lambda x: len(x) == 3
                    and self.translator("Browse Marketplace") in (x[-1].text_content() or ""),
                    1,
                )
                self.logger.info(f"{hilight('[Retrieve]', 'dim')} {msg}")
            return []

        # find the grid box
        try:
            valid_listings = (
                self._get_listing_elements_by_traversing_header()
                or self._get_listings_elements_by_children_counts()
            )
        except KeyboardInterrupt:
            raise
        except Exception as e:
            filename = datetime.datetime.now().strftime("debug_%Y%m%d_%H%M%S.html")
            if self.logger:
                self.logger.error(
                    f"{hilight('[Retrieve]', 'fail')} failed to parse searching result. Page saved to {filename}: {e}"
                )
            with open(filename, "w", encoding="utf-8") as f:
                f.write(self.page.content())
            return []

        listings: List[Listing] = []
        for idx, listing in enumerate(valid_listings):
            try:
                atag = listing.query_selector(
                    ":scope > :first-child > :first-child > :first-child > :first-child > :first-child > :first-child > :first-child > :first-child"
                )
                if not atag:
                    continue
                post_url = atag.get_attribute("href") or ""
                # a listing without a link can neither be opened nor cached
                if not post_url:
                    continue
                details_divs = atag.query_selector_all(":scope > :first-child > div")
                # the text block is the second div; skip items that do not have one
                if len(details_divs) < 2:
                    continue
                details = details_divs[1]
                divs = details.query_selector_all(":scope > div")
                raw_price = "" if len(divs) < 1 else divs[0].text_content() or ""
                title = "" if len(divs) < 2 else divs[1].text_content() or ""
                # location can be empty in some rare cases
                location = "" if len(divs) < 3 else (divs[2].text_content() or "")

                # get image; the src attribute may be missing on the img element
                img = listing.query_selector("img")
                image = (img.get_attribute("src") or "") if img else ""
                price = extract_price(raw_price)

                if post_url.startswith("/"):
                    post_url = f"https://www.facebook.com{post_url}"

                if image.startswith("/"):
                    image = f"https://www.facebook.com{image}"

                listings.append(
                    Listing(
                        marketplace="facebook",
                        name="",
                        id=post_url.split("?")[0].rstrip("/").split("/")[-1],
                        title=title,
                        image=image,
                        price=price,
                        # all the ?referral_code&referral_sotry_type etc
                        # could be helpful for live navigation, but will be stripped
                        # for caching item details.
                        post_url=post_url,
                        location=location,
                        condition="",
                        seller="",
                        description="",
                    )
                )
            except KeyboardInterrupt:
                raise
            except Exception as e:
                if self.logger:
                    self.logger.error(
                        f"{hilight('[Retrieve]', 'fail')} Failed to parse search results {idx + 1} listing: {e}"
                    )
                continue
        return listings
class FacebookItemPage(WebPage):
    """Base class for listing detail pages.

    ``parse`` assembles a ``Listing`` from the ``get_*`` accessors. The
    accessors defined here all raise ``NotImplementedError`` and
    ``verify_layout`` accepts every page.
    """

    def verify_layout(self: "FacebookItemPage") -> bool:
        """Report whether the page matches this layout; always ``True`` here."""
        return True

    def get_title(self: "FacebookItemPage") -> str:
        """Return the listing title; not implemented in the base class."""
        raise NotImplementedError("get_title is not implemented for this page")

    def get_price(self: "FacebookItemPage") -> str:
        """Return the raw price text; not implemented in the base class."""
        raise NotImplementedError("get_price is not implemented for this page")

    def get_image_url(self: "FacebookItemPage") -> str:
        """Return the image URL; not implemented in the base class."""
        raise NotImplementedError("get_image_url is not implemented for this page")

    def get_seller(self: "FacebookItemPage") -> str:
        """Return the seller; not implemented in the base class."""
        raise NotImplementedError("get_seller is not implemented for this page")

    def get_description(self: "FacebookItemPage") -> str:
        """Return the description; not implemented in the base class."""
        raise NotImplementedError("get_description is not implemented for this page")

    def get_location(self: "FacebookItemPage") -> str:
        """Return the location; not implemented in the base class."""
        raise NotImplementedError("get_location is not implemented for this page")

    def get_condition(self: "FacebookItemPage") -> str:
        """Return the condition; not implemented in the base class."""
        raise NotImplementedError("get_condition is not implemented for this page")

    def _expand_see_more(self: "FacebookItemPage") -> None:
        """Click every visible "See more" button so truncated text is expanded.

        Best effort: any error other than ``KeyboardInterrupt`` is logged at
        debug level and otherwise ignored.
        """
        try:
            expanders = self.page.locator(
                f'div[role="button"]:has(span:text("{self.translator("See more")}"))'
            )
            # give the buttons a moment to show up before counting them
            expanders.first.wait_for(state="visible", timeout=8000)
            position = 0
            total = expanders.count()
            while position < total:
                expanders.nth(position).click(timeout=2000)
                # let the DOM settle after the click
                self.page.wait_for_timeout(500)
                position += 1
        except KeyboardInterrupt:
            raise
        except Exception as e:
            if self.logger:
                self.logger.debug(f"{hilight('[Retrieve]', 'fail')} See more expansion: {e}")

    def parse(self: "FacebookItemPage", post_url: str) -> Listing:
        """Build a ``Listing`` from the page.

        Raises:
            ValueError: if ``verify_layout`` rejects the page, or if title,
                price or description is empty.
        """
        if not self.verify_layout():
            raise ValueError("Layout mismatch")

        # truncated sections must be expanded before any text is read
        self._expand_see_more()

        title = self.get_title()
        price = self.get_price()
        description = self.get_description()

        # remove the button captions that remain in the expanded text
        captions = (self.translator("See more"), self.translator("See less"))
        for caption in captions:
            description = description.replace(caption, "").strip()

        if not (title and price and description):
            raise ValueError(f"Failed to parse {post_url}")

        if self.logger:
            self.logger.info(f"{hilight('[Retrieve]', 'succ')} Parsing {hilight(title)}")

        listing_id = post_url.split("?")[0].rstrip("/").split("/")[-1]
        # the accessors are called in the same order as the keyword arguments below
        image = self.get_image_url()
        parsed_price = extract_price(price)
        location = self.get_location()
        condition = self.get_condition()
        seller = self.get_seller()
        res = Listing(
            marketplace="facebook",
            name="",
            id=listing_id,
            title=title,
            image=image,
            price=parsed_price,
            post_url=post_url,
            location=location,
            condition=condition,
            description=description,
            seller=seller,
        )
        if self.logger:
            self.logger.debug(f"{hilight('[Retrieve]', 'succ')} {pretty_repr(res)}")
        return cast(Listing, res)
class FacebookRegularItemPage(FacebookItemPage):
    """Page model for an item page that carries a "Condition" entry.

    Every accessor is best effort: a scraping failure is logged and turned
    into a fallback value instead of being raised.
    """

    def verify_layout(self: "FacebookRegularItemPage") -> bool:
        """Return True if any ``<li>`` on the page contains the "Condition" label."""
        for list_item in self.page.query_selector_all("li"):
            if self.translator("Condition") in (list_item.text_content() or ""):
                return True
        return False

    def get_title(self: "FacebookRegularItemPage") -> str:
        """Return the text of the last ``<h1>`` on the page, or "" on failure."""
        try:
            headings = self.page.query_selector_all("h1")
            heading_text = headings[-1].text_content()
            if heading_text:
                return heading_text
            return self.translator("**unspecified**")
        except KeyboardInterrupt:
            raise
        except Exception as e:
            if self.logger:
                self.logger.debug(f"{hilight('[Retrieve]', 'fail')} {e}")
            return ""

    def get_price(self: "FacebookRegularItemPage") -> str:
        """Return the text of the element right after the ``<h1>``, or "" on failure."""
        try:
            after_heading = self.page.locator("h1 + *")
            price_text = after_heading.text_content()
            if price_text:
                return price_text
            return self.translator("**unspecified**")
        except KeyboardInterrupt:
            raise
        except Exception as e:
            if self.logger:
                self.logger.debug(f"{hilight('[Retrieve]', 'fail')} {e}")
            return ""

    def get_image_url(self: "FacebookRegularItemPage") -> str:
        """Return the ``src`` of the first ``<img>`` on the page, or "" if unavailable."""
        try:
            first_image = self.page.locator("img").first
            return first_image.get_attribute("src") or ""
        except KeyboardInterrupt:
            raise
        except Exception as e:
            if self.logger:
                self.logger.debug(f"{hilight('[Retrieve]', 'fail')} {e}")
            return ""

    def get_seller(self: "FacebookRegularItemPage") -> str:
        """Return the text of the last seller-profile link, or the "unspecified" marker."""
        try:
            # The marketplace profile link is preferred; Facebook sometimes
            # uses a different link structure for the seller name, hence the
            # more general second pattern.
            profile_xpaths = (
                "//a[contains(@href, '/marketplace/profile')]",
                "//a[contains(@href, '/profile')]",
            )
            for xpath in profile_xpaths:
                profile_links = self.page.locator(xpath)
                if profile_links.count() != 0:
                    break
            else:
                return self.translator("**unspecified**")

            # Use a short timeout to avoid a 30s delay when seller data is not
            # present (e.g. in anonymous/not-logged-in mode). See #289.
            seller_name = profile_links.last.text_content(timeout=3000)
            if seller_name:
                return seller_name
            return self.translator("**unspecified**")
        except KeyboardInterrupt:
            raise
        except Exception as e:
            if self.logger:
                self.logger.debug(
                    f"{hilight('[Retrieve]', 'fail')} get_seller failed: {type(e).__name__}: {e}"
                )
            return self.translator("**unspecified**")

    def get_description(self: "FacebookRegularItemPage") -> str:
        """Return the text of the element following the list that holds "Condition"."""
        try:
            # From the "Condition" span, go up to the nearest <ul> and take
            # its next sibling.
            selector = f'span:text("{self.translator("Condition")}") >> xpath=ancestor::ul[1] >> xpath=following-sibling::*[1]'
            description_text = self.page.locator(selector).text_content()
            if description_text:
                return description_text
            return self.translator("**unspecified**")
        except KeyboardInterrupt:
            raise
        except Exception as e:
            if self.logger:
                self.logger.debug(f"{hilight('[Retrieve]', 'fail')} {e}")
            return ""

    def get_condition(self: "FacebookRegularItemPage") -> str:
        """Return the value shown next to the "Condition" label, or "" on failure."""
        try:
            if self.logger:
                self.logger.debug(f"{hilight('[Debug]', 'info')} Getting condition info...")

            def starts_with_condition_label(children: Any) -> Any:
                # at least two children, the first one holding the label
                return len(children) >= 2 and self.translator("Condition") in (
                    children[0].text_content() or ""
                )

            condition_text = self.translator("Condition")
            # "Condition" may appear both in the label and in the description
            # text; taking .first avoids a strict mode violation in that case.
            label_span = self.page.locator(f'span:text("{condition_text}")').first
            return self._parent_with_cond(label_span, starts_with_condition_label, 1)
        except KeyboardInterrupt:
            raise
        except Exception as e:
            if self.logger:
                self.logger.error(
                    f"{hilight('[Error]', 'fail')} get_condition failed: {type(e).__name__}: {e}"
                )
            return ""

    def get_location(self: "FacebookRegularItemPage") -> str:
        """Return the neighbor of the "Location is approximate" note, or "" on failure."""
        try:

            def ends_with_approximate_note(children: Any) -> Any:
                # exactly two children, the second one holding the note
                return len(children) == 2 and self.translator("Location is approximate") in (
                    children[1].text_content() or ""
                )

            note_span = self.page.locator(
                f'span:text("{self.translator("Location is approximate")}")'
            )
            return self._parent_with_cond(note_span, ends_with_approximate_note, 0)
        except KeyboardInterrupt:
            raise
        except Exception as e:
            if self.logger:
                self.logger.debug(f"{hilight('[Retrieve]', 'fail')} {e}")
            return ""
class FacebookRentalItemPage(FacebookRegularItemPage):
    """Page model for item pages that have a "Description" header and no condition box."""

    def verify_layout(self: "FacebookRentalItemPage") -> bool:
        """Return True if any ``<h2>`` on the page contains the "Description" text."""
        for heading in self.page.query_selector_all("h2"):
            if self.translator("Description") in (heading.text_content() or ""):
                return True
        return False

    def get_description(self: "FacebookRentalItemPage") -> str:
        """Return the content that follows the "Description" header, or "" on failure."""
        # some pages do not have a condition box and appears to have a "Description" header
        # See https://github.com/BoPeng/ai-marketplace-monitor/issues/29 for details.
        try:

            def led_by_description_header(children: Any) -> Any:
                # more than one child, the first being exactly the header text
                return len(children) > 1 and children[0].text_content() == self.translator(
                    "Description"
                )

            header_selector = f'h2:has(span:text("{self.translator("Description")}"))'
            header = self.page.query_selector(header_selector)
            return self._parent_with_cond(header, led_by_description_header, 1)
        except KeyboardInterrupt:
            raise
        except Exception as e:
            if self.logger:
                self.logger.debug(f"{hilight('[Retrieve]', 'fail')} {e}")
            return ""

    def get_condition(self: "FacebookRentalItemPage") -> str:
        """Return the "unspecified" marker; rental items carry no condition information."""
        return self.translator("**unspecified**")
# (keyword, emoji) pairs; keywords are matched case-insensitively as substrings
# and the first matching pair in this order wins.
_VEHICLE_EMOJI_PATTERNS = [
    ("Driven", "🚗"),
    ("transmission", "⚙️"),
    ("color", "🎨"),
    ("safety rating", "⭐"),
    ("NHTSA", "⭐"),
    ("Fuel type", "⛽"),
    ("MPG", "⛽"),
    ("owner", "👤"),
    ("paid off", "💰"),
    ("Clean title", "✅"),
    ("no significant damage", "✅"),
    ("Salvage", "⚠️"),
    ("accident", "⚠️"),
]


def _add_vehicle_emojis(text: str) -> str:
    """Prefix each recognised vehicle attribute line with its emoji.

    Lines are stripped, blank lines are dropped, and lines that match no
    keyword are kept without a prefix.
    """
    decorated = []
    for raw_line in text.split("\n"):
        content = raw_line.strip()
        if not content:
            continue
        lowered = content.lower()
        prefix = next(
            (
                icon + " "
                for keyword, icon in _VEHICLE_EMOJI_PATTERNS
                if keyword.lower() in lowered
            ),
            "",
        )
        decorated.append(prefix + content)
    return "\n".join(decorated)
class FacebookAutoItemWithAboutAndDescriptionPage(FacebookRegularItemPage):
    """Page model for vehicle pages with both "About this vehicle" and "Seller's description"."""

    def _has_about_this_vehicle(self: "FacebookAutoItemWithAboutAndDescriptionPage") -> bool:
        """Return True if any ``<h2>`` contains the "About this vehicle" text."""
        return any(
            self.translator("About this vehicle") in (x.text_content() or "")
            for x in self.page.query_selector_all("h2")
        )

    def _has_seller_description(self: "FacebookAutoItemWithAboutAndDescriptionPage") -> bool:
        """Return True if any ``<h2>`` contains the "Seller's description" text."""
        return any(
            self.translator("Seller's description") in (x.text_content() or "")
            for x in self.page.query_selector_all("h2")
        )

    def _get_about_this_vehicle(self: "FacebookAutoItemWithAboutAndDescriptionPage") -> str:
        """Return the "About this vehicle" section text with emoji prefixes, or "" on failure."""
        try:
            about_element = self.page.locator(
                f'h2:has(span:text("{self.translator("About this vehicle")}"))'
            )
            return self._parent_with_cond(
                # start from About this vehicle
                about_element,
                # find an array of elements with the first one being "About this vehicle"
                # and the second child has actual content (not just whitespace)
                lambda x: len(x) > 1
                and self.translator("About this vehicle") in (x[0].text_content() or "")
                and (x[1].text_content() or "").replace("\xa0", "").strip(),
                # Extract all texts, using inner_text to preserve line breaks, and add emojis
                lambda x: _add_vehicle_emojis(
                    "\n".join([child.inner_text() or "" for child in x])
                ),
            )
        except KeyboardInterrupt:
            raise
        except Exception as e:
            if self.logger:
                self.logger.debug(f"{hilight('[Retrieve]', 'fail')} {e}")
            return ""

    def _get_seller_description(self: "FacebookAutoItemWithAboutAndDescriptionPage") -> str:
        """Return the seller's description preceded by its heading, or "" on failure."""
        try:
            description_header = self.page.query_selector(
                f"""h2:has(span:text("{self.translator("Seller's description")}"))"""
            )
            return self._parent_with_cond(
                # start from the description header
                description_header,
                # find an array of elements with the first one being "Seller's description"
                # and the second child has actual content (not just whitespace)
                lambda x: len(x) > 1
                and self.translator("Seller's description") in (x[0].text_content() or "")
                and (x[1].text_content() or "").replace("\xa0", "").strip(),
                # then, drill down from the second child
                lambda x: self._children_with_cond(
                    x[1],
                    # find an array of elements
                    lambda y: len(y) > 1,
                    # and return the texts.
                    lambda y: f"""\n\n{self.translator("Seller's description")}\n\n{y[0].text_content() or self.translator("**unspecified**")}""",
                ),
            )
        except KeyboardInterrupt:
            raise
        except Exception as e:
            if self.logger:
                self.logger.debug(f"{hilight('[Retrieve]', 'fail')} {e}")
            return ""

    def verify_layout(self: "FacebookAutoItemWithAboutAndDescriptionPage") -> bool:
        """Return True if both the "About this vehicle" and "Seller's description" headers exist."""
        return self._has_about_this_vehicle() and self._has_seller_description()

    def get_description(self: "FacebookAutoItemWithAboutAndDescriptionPage") -> str:
        """Return the "About this vehicle" text followed by the seller's description."""
        return self._get_about_this_vehicle() + self._get_seller_description()

    def get_price(self: "FacebookAutoItemWithAboutAndDescriptionPage") -> str:
        """Return the first dollar amount found in the description, or the "unspecified" marker."""
        description = self.get_description()
        # Regular expression for text that looks like a price. The leading
        # digit run is unbounded so that an amount written without thousands
        # separators ("$15000") is matched in full rather than cut to "$150".
        price_pattern = r"\$\d+(?:,\d{3})*(?:\.\d{2})?(?:,\d{2})?"
        match = re.search(price_pattern, description)
        return match.group(0) if match else self.translator("**unspecified**")

    def get_condition(self: "FacebookAutoItemWithAboutAndDescriptionPage") -> str:
        """Return the "unspecified" marker; this layout carries no condition information."""
        return self.translator("**unspecified**")
class FacebookAutoItemWithDescriptionPage(FacebookAutoItemWithAboutAndDescriptionPage):
    """Page model for vehicle pages with "Seller's description" but no "About this vehicle"."""

    def verify_layout(self: "FacebookAutoItemWithDescriptionPage") -> bool:
        """Return True if the seller's description header exists and the about header does not."""
        if not self._has_seller_description():
            return False
        return not self._has_about_this_vehicle()

    def _drill_seller_description(self, pick: Any) -> Any:
        """Locate the "Seller's description" section and apply ``pick`` to its parts.

        Starting from the description header, the search selects a group of
        elements whose first member holds the header text and whose second
        member has real content, then descends from that second member to a
        group of more than two elements, which is handed to ``pick``.
        """
        header_selector = f"""h2:has(span:text("{self.translator("Seller's description")}"))"""
        section_header = self.page.query_selector(header_selector)

        def is_description_section(siblings: Any) -> Any:
            # first sibling is the header, second has content beyond whitespace
            return (
                len(siblings) > 1
                and self.translator("Seller's description") in (siblings[0].text_content() or "")
                and (siblings[1].text_content() or "").replace("\xa0", "").strip()
            )

        def has_enough_parts(parts: Any) -> bool:
            return len(parts) > 2

        def descend(siblings: Any) -> Any:
            return self._children_with_cond(siblings[1], has_enough_parts, pick)

        return self._parent_with_cond(section_header, is_description_section, descend)

    def get_description(self: "FacebookAutoItemWithDescriptionPage") -> str:
        """Return the seller's description preceded by its heading, or "" on failure."""
        try:

            def format_description(parts: Any) -> str:
                # the description body is the second part of the section
                heading = self.translator("Seller's description")
                body = parts[1].text_content() or self.translator("**unspecified**")
                return f"""\n\n{heading}\n\n{body}"""

            return self._drill_seller_description(format_description)
        except KeyboardInterrupt:
            raise
        except Exception as e:
            if self.logger:
                self.logger.debug(f"{hilight('[Retrieve]', 'fail')} {e}")
            return ""

    def get_condition(self: "FacebookAutoItemWithDescriptionPage") -> str:
        """Return the condition text without its "Condition" label, or "" on failure."""
        try:

            def first_part_text(parts: Any) -> str:
                # the condition is the first part of the section
                return parts[0].text_content() or self.translator("**unspecified**")

            condition = self._drill_seller_description(first_part_text)
            # drop the leading label when it is present
            condition = condition.removeprefix(self.translator("Condition"))
            return condition.strip()
        except KeyboardInterrupt:
            raise
        except Exception as e:
            if self.logger:
                self.logger.debug(f"{hilight('[Retrieve]', 'fail')} {e}")
            return ""

    def get_price(self: "FacebookAutoItemWithDescriptionPage") -> str:
        """Return the content of the element that follows the page header, or "" on failure."""
        # for this page, price is after header
        try:
            last_heading = self.page.query_selector_all("h1")[-1]
            header = last_heading.text_content()

            def led_by_header(children: Any) -> Any:
                # more than one child, the first one containing the header text
                return len(children) > 1 and header in (children[0].text_content() or "")

            # start from the header, then take the element after it
            return self._parent_with_cond(last_heading, led_by_header, 1)
        except KeyboardInterrupt:
            raise
        except Exception as e:
            if self.logger:
                self.logger.debug(f"{hilight('[Retrieve]', 'fail')} {e}")
            return ""
def parse_listing(
    page: Page, post_url: str, translator: Translator | None = None, logger: Logger | None = None
) -> Listing | None:
    """Parse an item page by trying each supported layout in turn.

    Args:
        page: The page showing the item.
        post_url: URL of the item, forwarded to each layout's ``parse``.
        translator: Optional translator handed to each page model.
        logger: Optional logger handed to each page model; also receives a
            debug message for every layout that fails.

    Returns:
        The ``Listing`` from the first layout whose ``parse`` succeeds, or
        ``None`` if every layout fails.
    """
    supported_facebook_item_layouts = [
        FacebookRentalItemPage,
        FacebookAutoItemWithAboutAndDescriptionPage,
        FacebookAutoItemWithDescriptionPage,
        FacebookRegularItemPage,
    ]

    for page_model in supported_facebook_item_layouts:
        try:
            return page_model(page, translator, logger).parse(post_url)
        except KeyboardInterrupt:
            raise
        except Exception as e:
            # Try the next page layout, but record why this one was rejected
            # so that a final None result can be diagnosed from the debug log.
            if logger:
                logger.debug(
                    f"{page_model.__name__} could not parse {post_url}: {type(e).__name__}: {e}"
                )
            continue
    return None