import re
import time
from dataclasses import asdict, dataclass, field
from enum import Enum
from logging import Logger
from typing import Any, ClassVar, Generic, Optional, Type, TypeVar
from diskcache import Cache # type: ignore
from openai import OpenAI # type: ignore
from rich.pretty import pretty_repr
from .listing import Listing
from .marketplace import TItemConfig, TMarketplaceConfig
from .utils import BaseConfig, CacheType, CounterItem, cache, counter, hilight
[docs]
class AIServiceProvider(Enum):
OPENAI = "OpenAI"
DEEPSEEK = "DeepSeek"
ANTHROPIC = "Anthropic"
OLLAMA = "Ollama"
[docs]
@dataclass
class AIResponse:
score: int
comment: str
name: str = ""
NOT_EVALUATED: ClassVar = "Not evaluated by AI"
@property
def conclusion(self: "AIResponse") -> str:
return {
1: "No match",
2: "Potential match",
3: "Poor match",
4: "Good match",
5: "Great deal",
}[self.score]
@property
def style(self: "AIResponse") -> str:
if self.comment == self.NOT_EVALUATED:
return "dim"
if self.score < 3:
return "fail"
if self.score > 3:
return "succ"
return "name"
@property
def stars(self: "AIResponse") -> str:
full_stars = self.score
empty_stars = 5 - full_stars
return (
'<span style="color: #FFD700; font-size: 20px;">★</span>' * full_stars
+ '<span style="color: #D3D3D3; font-size: 20px;">☆</span>' * empty_stars
)
[docs]
@classmethod
def from_cache(
cls: Type["AIResponse"],
listing: Listing,
item_config: TItemConfig,
marketplace_config: TMarketplaceConfig,
local_cache: Cache | None = None,
) -> Optional["AIResponse"]:
res = (cache if local_cache is None else local_cache).get(
(CacheType.AI_INQUIRY.value, item_config.hash, marketplace_config.hash, listing.hash)
)
if res is None:
return None
return AIResponse(**res)
[docs]
def to_cache(
self: "AIResponse",
listing: Listing,
item_config: TItemConfig,
marketplace_config: TMarketplaceConfig,
local_cache: Cache | None = None,
) -> None:
(cache if local_cache is None else local_cache).set(
(CacheType.AI_INQUIRY.value, item_config.hash, marketplace_config.hash, listing.hash),
asdict(self),
tag=CacheType.AI_INQUIRY.value,
)
[docs]
@dataclass
class AIConfig(BaseConfig):
# this argument is required
api_key: str | None = None
provider: str | None = None
model: str | None = None
base_url: str | None = None
max_retries: int = 10
timeout: int | None = None
[docs]
def handle_provider(self: "AIConfig") -> None:
if self.provider is None:
return
if self.provider.lower() not in [x.value.lower() for x in AIServiceProvider]:
raise ValueError(
f"""AIConfig requires a valid service provider. Valid providers are {hilight(", ".join([x.value for x in AIServiceProvider]))}"""
)
[docs]
def handle_api_key(self: "AIConfig") -> None:
if self.api_key is None:
return
if not isinstance(self.api_key, str):
raise ValueError("AIConfig requires a string api_key.")
self.api_key = self.api_key.strip()
[docs]
def handle_max_retries(self: "AIConfig") -> None:
if not isinstance(self.max_retries, int) or self.max_retries < 0:
raise ValueError("AIConfig requires a positive integer max_retries.")
[docs]
def handle_timeout(self: "AIConfig") -> None:
if self.timeout is None:
return
if not isinstance(self.timeout, int) or self.timeout < 0:
raise ValueError("AIConfig requires a positive integer timeout.")
[docs]
@dataclass
class OpenAIConfig(AIConfig):
[docs]
def handle_api_key(self: "OpenAIConfig") -> None:
if self.api_key is None:
raise ValueError("OpenAI requires a string api_key.")
[docs]
@dataclass
class DeekSeekConfig(OpenAIConfig):
pass
[docs]
@dataclass
class OllamaConfig(OpenAIConfig):
api_key: str | None = field(default="ollama") # required but not used.
[docs]
def handle_base_url(self: "OllamaConfig") -> None:
if self.base_url is None:
raise ValueError("Ollama requires a string base_url.")
[docs]
def handle_model(self: "OllamaConfig") -> None:
if self.model is None:
raise ValueError("Ollama requires a string model.")
[docs]
@dataclass
class AnthropicConfig(AIConfig):
[docs]
def handle_api_key(self: "AnthropicConfig") -> None:
if self.api_key is None:
raise ValueError("Anthropic requires a string api_key.")
TAIConfig = TypeVar("TAIConfig", bound=AIConfig)
[docs]
class AIBackend(Generic[TAIConfig]):
def __init__(self: "AIBackend", config: AIConfig, logger: Logger | None = None) -> None:
self.config = config
self.logger = logger
self.client: Any = None
[docs]
@classmethod
def get_config(cls: Type["AIBackend"], **kwargs: Any) -> TAIConfig:
raise NotImplementedError("get_config method must be implemented by subclasses.")
[docs]
def connect(self: "AIBackend") -> None:
raise NotImplementedError("Connect method must be implemented by subclasses.")
[docs]
def get_prompt(
self: "AIBackend",
listing: Listing,
item_config: TItemConfig,
marketplace_config: TMarketplaceConfig,
) -> str:
prompt = (
f"""A user wants to buy a {item_config.name} from Facebook Marketplace. """
f"""Search phrases: "{'" and "'.join(item_config.search_phrases)}", """
)
if item_config.description:
prompt += f"""Description: "{item_config.description}", """
#
max_price = item_config.max_price or 0
min_price = item_config.min_price or 0
if max_price and min_price:
prompt += f"""Price range: {min_price} to {max_price}. """
elif max_price:
prompt += f"""Max price {max_price}. """
elif min_price:
prompt += f"""Min price {min_price}. """
#
if item_config.antikeywords:
prompt += f"""Exclude keywords "{'" and "'.join(item_config.antikeywords)}" in title or description."""
#
prompt += (
f"""\n\nThe user found a listing titled "{listing.title}" in {listing.condition} condition, """
f"""priced at {listing.price}, located in {listing.location}, """
f"""posted at {listing.post_url} with description "{listing.description}"\n\n"""
)
# prompt
if item_config.prompt is not None:
prompt += item_config.prompt
elif marketplace_config.prompt is not None:
prompt += marketplace_config.prompt
else:
prompt += (
"Evaluate how well this listing matches the user's criteria. Assess the description, MSRP, model year, "
"condition, and seller's credibility."
)
# extra_prompt
prompt += "\n"
if item_config.extra_prompt is not None:
prompt += f"\n{item_config.extra_prompt.strip()}\n"
elif marketplace_config.extra_prompt is not None:
prompt += f"\n{marketplace_config.extra_prompt.strip()}\n"
# rating_prompt
if item_config.rating_prompt is not None:
prompt += f"\n{item_config.rating_prompt.strip()}\n"
elif marketplace_config.rating_prompt is not None:
prompt += f"\n{marketplace_config.rating_prompt.strip()}\n"
else:
prompt += (
"\nRate from 1 to 5 based on the following: \n"
"1 - No match: Missing key details, wrong category/brand, or suspicious activity (e.g., external links).\n"
"2 - Potential match: Lacks essential info (e.g., condition, brand, or model); needs clarification.\n"
"3 - Poor match: Some mismatches or missing details; acceptable but not ideal.\n"
"4 - Good match: Mostly meets criteria with clear, relevant details.\n"
"5 - Great deal: Fully matches criteria, with excellent condition or price.\n"
"Conclude with:\n"
'"Rating <1-5>: <summary>"\n'
"where <1-5> is the rating and <summary> is a brief recommendation (max 30 words)."
)
if self.logger:
self.logger.debug(f"""{hilight("[AI-Prompt]", "info")} {prompt}""")
return prompt
[docs]
def evaluate(
self: "AIBackend",
listing: Listing,
item_config: TItemConfig,
marketplace_config: TMarketplaceConfig,
) -> AIResponse:
raise NotImplementedError("Confirm method must be implemented by subclasses.")
[docs]
class OpenAIBackend(AIBackend):
default_model = "gpt-4o"
# the default is f"https://api.openai.com/v1"
base_url: str | None = None
[docs]
@classmethod
def get_config(cls: Type["OpenAIBackend"], **kwargs: Any) -> OpenAIConfig:
return OpenAIConfig(**kwargs)
[docs]
def connect(self: "OpenAIBackend") -> None:
if self.client is None:
self.client = OpenAI(
api_key=self.config.api_key,
base_url=self.config.base_url or self.base_url,
timeout=self.config.timeout,
default_headers={
"X-Title": "AI Marketplace Monitor",
"HTTP-Referer": "https://github.com/BoPeng/ai-marketplace-monitor",
},
)
if self.logger:
self.logger.info(f"""{hilight("[AI]", "name")} {self.config.name} connected.""")
[docs]
def evaluate(
self: "OpenAIBackend",
listing: Listing,
item_config: TItemConfig,
marketplace_config: TMarketplaceConfig,
) -> AIResponse:
# ask openai to confirm the item is correct
counter.increment(CounterItem.AI_QUERY, item_config.name)
prompt = self.get_prompt(listing, item_config, marketplace_config)
res: AIResponse | None = AIResponse.from_cache(listing, item_config, marketplace_config)
if res is not None:
if self.logger:
self.logger.debug(
f"""{hilight("[AI]", res.style)} {self.config.name} previously concluded {hilight(f"{res.conclusion} ({res.score}): {res.comment}", res.style)} for listing {hilight(listing.title)}."""
)
return res
self.connect()
retries = 0
while retries < self.config.max_retries:
self.connect()
assert self.client is not None
try:
response = self.client.chat.completions.create(
model=self.config.model or self.default_model,
messages=[
{
"role": "system",
"content": "You are a helpful assistant that can confirm if a user's search criteria matches the item he is interested in.",
},
{"role": "user", "content": prompt},
],
stream=False,
)
break
except KeyboardInterrupt:
raise
except Exception as e:
if self.logger:
self.logger.error(
f"""{hilight("[AI-Error]", "fail")} {self.config.name} failed to evaluate {hilight(listing.title)}: {e}"""
)
retries += 1
# try to initiate a connection
self.client = None
time.sleep(5)
# check if the response is yes
if self.logger:
self.logger.debug(f"""{hilight("[AI-Response]", "info")} {pretty_repr(response)}""")
answer = response.choices[0].message.content or ""
if (
answer is None
or not answer.strip()
or re.search(r"Rating[^1-5]*[1-5]", answer, re.DOTALL) is None
):
counter.increment(CounterItem.FAILED_AI_QUERY, item_config.name)
raise ValueError(f"Empty or invalid response from {self.config.name}: {response}")
lines = answer.split("\n")
# if any of the lines contains "Rating: ", extract the rating from it.
score: int = 1
comment = ""
rating_line = None
for idx, line in enumerate(lines):
matched = re.match(r".*Rating[^1-5]*([1-5])[:\s]*(.*)", line)
if matched:
score = int(matched.group(1))
comment = matched.group(2).strip()
rating_line = idx
continue
if rating_line is not None:
# if the AI puts comment after Rating, we need to include them
comment += " " + line
# if the AI puts the rating at the end, let us try to use the line before the Rating line
if len(comment.strip()) < 5 and rating_line is not None and rating_line > 0:
comment = lines[rating_line - 1]
# remove multiple spaces, take first 30 words
comment = " ".join([x for x in comment.split() if x.strip()]).strip()
res = AIResponse(name=self.config.name, score=score, comment=comment)
res.to_cache(listing, item_config, marketplace_config)
counter.increment(CounterItem.NEW_AI_QUERY, item_config.name)
return res
[docs]
class DeepSeekBackend(OpenAIBackend):
default_model = "deepseek-chat"
base_url = "https://api.deepseek.com"
[docs]
@classmethod
def get_config(cls: Type["DeepSeekBackend"], **kwargs: Any) -> DeekSeekConfig:
return DeekSeekConfig(**kwargs)
[docs]
class OllamaBackend(OpenAIBackend):
default_model = "deepseek-r1:14b"
[docs]
@classmethod
def get_config(cls: Type["OllamaBackend"], **kwargs: Any) -> OllamaConfig:
return OllamaConfig(**kwargs)
[docs]
class AnthropicBackend(AIBackend):
default_model = "claude-sonnet-4-20250514"
[docs]
@classmethod
def get_config(cls: Type["AnthropicBackend"], **kwargs: Any) -> AnthropicConfig:
return AnthropicConfig(**kwargs)
[docs]
def connect(self: "AnthropicBackend") -> None:
if self.client is None:
import anthropic # type: ignore
self.client = anthropic.Anthropic(
api_key=self.config.api_key,
timeout=self.config.timeout,
)
if self.logger:
self.logger.info(f"""{hilight("[AI]", "name")} {self.config.name} connected.""")
[docs]
def evaluate(
self: "AnthropicBackend",
listing: Listing,
item_config: TItemConfig,
marketplace_config: TMarketplaceConfig,
) -> AIResponse:
counter.increment(CounterItem.AI_QUERY, item_config.name)
prompt = self.get_prompt(listing, item_config, marketplace_config)
res: AIResponse | None = AIResponse.from_cache(listing, item_config, marketplace_config)
if res is not None:
if self.logger:
self.logger.debug(
f"""{hilight("[AI]", res.style)} {self.config.name} previously concluded {hilight(f"{res.conclusion} ({res.score}): {res.comment}", res.style)} for listing {hilight(listing.title)}."""
)
return res
self.connect()
retries = 0
while retries < self.config.max_retries:
self.connect()
assert self.client is not None
try:
response = self.client.messages.create(
model=self.config.model or self.default_model,
max_tokens=1024,
system="You are a helpful assistant that can confirm if a user's search criteria matches the item he is interested in.",
messages=[
{"role": "user", "content": prompt},
],
)
break
except KeyboardInterrupt:
raise
except Exception as e:
if self.logger:
self.logger.error(
f"""{hilight("[AI-Error]", "fail")} {self.config.name} failed to evaluate {hilight(listing.title)}: {e}"""
)
retries += 1
self.client = None
time.sleep(5)
if self.logger:
self.logger.debug(f"""{hilight("[AI-Response]", "info")} {pretty_repr(response)}""")
answer = response.content[0].text if response.content else ""
if (
answer is None
or not answer.strip()
or re.search(r"Rating[^1-5]*[1-5]", answer, re.DOTALL) is None
):
counter.increment(CounterItem.FAILED_AI_QUERY, item_config.name)
raise ValueError(f"Empty or invalid response from {self.config.name}: {response}")
lines = answer.split("\n")
score: int = 1
comment = ""
rating_line = None
for idx, line in enumerate(lines):
matched = re.match(r".*Rating[^1-5]*([1-5])[:\s]*(.*)", line)
if matched:
score = int(matched.group(1))
comment = matched.group(2).strip()
rating_line = idx
continue
if rating_line is not None:
comment += " " + line
if len(comment.strip()) < 5 and rating_line is not None and rating_line > 0:
comment = lines[rating_line - 1]
comment = " ".join([x for x in comment.split() if x.strip()]).strip()
res = AIResponse(name=self.config.name, score=score, comment=comment)
res.to_cache(listing, item_config, marketplace_config)
counter.increment(CounterItem.NEW_AI_QUERY, item_config.name)
return res