import smtplib
import ssl
import time
from dataclasses import dataclass
from email.mime.image import MIMEImage
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formataddr
from logging import Logger
from pathlib import Path
from typing import ClassVar, List, Tuple
import inflect
from jinja2 import Environment, FileSystemLoader, select_autoescape
from markupsafe import Markup, escape
from .ai import AIResponse # type: ignore
from .listing import Listing
from .notification import NotificationConfig, NotificationStatus
from .utils import fetch_with_retry, hilight, resize_image_data
[docs]
@dataclass
class EmailNotificationConfig(NotificationConfig):
notify_method = "email"
required_fields: ClassVar[List[str]] = ["email", "smtp_password"]
email: List[str] | None = None
smtp_server: str | None = None
smtp_port: int | None = None
smtp_username: str | None = None
smtp_password: str | None = None
smtp_from: str | None = None
[docs]
def handle_email(self: "EmailNotificationConfig") -> None:
if self.email is None:
return
if isinstance(self.email, str):
self.email = [self.email]
if not isinstance(self.email, list) or not all(
(isinstance(x, str) and "@" in x and "." in x.split("@")[1]) for x in self.email
):
raise ValueError(
f"Item {hilight(self.name)} email must be a string or list of string."
)
[docs]
def handle_smtp_server(self: "EmailNotificationConfig") -> None:
if self.smtp_server is None:
return
if not isinstance(self.smtp_server, str):
raise ValueError("user requires a string smtp_server.")
self.smtp_server = self.smtp_server.strip()
[docs]
def handle_smtp_port(self: "EmailNotificationConfig") -> None:
if self.smtp_port is None:
return
if not isinstance(self.smtp_port, int):
raise ValueError("user requires an integer smtp_port.")
if self.smtp_port < 1 or self.smtp_port > 65535:
raise ValueError("user requires an integer smtp_port between 1 and 65535.")
[docs]
def handle_smtp_username(self: "EmailNotificationConfig") -> None:
if self.smtp_username is None:
return
# smtp_username should be a string
if not isinstance(self.smtp_username, str) or not self.smtp_username:
raise ValueError("A non-empty value is requires for option smtp_username.")
self.smtp_username = self.smtp_username.strip()
[docs]
def handle_smtp_password(self: "EmailNotificationConfig") -> None:
if self.smtp_password is None:
return
# smtp_password should be a string
if not isinstance(self.smtp_password, str) or not self.smtp_password:
raise ValueError("A non-empty value is is required for option smtp_password.")
self.smtp_password = self.smtp_password.strip()
[docs]
def handle_smtp_from(self: "EmailNotificationConfig") -> None:
if self.smtp_from is None:
return
# smtp_from should be a string
if not isinstance(self.smtp_from, str):
raise ValueError("user requires a string smtp_from.")
self.smtp_from = self.smtp_from.strip()
[docs]
def get_title(
self: "EmailNotificationConfig",
listings: List[Listing],
notification_status: List[NotificationStatus],
force: bool = False,
) -> str:
p = inflect.engine()
n_new = len([x for x in notification_status if x == NotificationStatus.NOT_NOTIFIED])
n_notified = len([x for x in notification_status if x == NotificationStatus.NOTIFIED])
n_expired = len([x for x in notification_status if x == NotificationStatus.EXPIRED])
n_updated = len(
[x for x in notification_status if x == NotificationStatus.LISTING_CHANGED]
)
n_discounted = len(
[x for x in notification_status if x == NotificationStatus.LISTING_DISCOUNTED]
)
title = "Found "
cnts = []
if n_new > 0:
cnts.append(f"{n_new} new ")
if n_updated > 0:
cnts.append(f"{n_updated} updated ")
if n_discounted > 0:
cnts.append(f"{n_discounted} discounted ")
if n_expired > 0 or (force and n_notified > 0):
cnts.append(f"{n_expired + (n_notified if force else 0)} revisitable ")
if len(cnts) > 1:
cnts[-1] = f"and {cnts[-1]}"
elif len(cnts) == 0:
# no new items
return ""
title += " ".join(cnts)
title += f"{listings[0].name} {p.plural_noun('listing', len(listings) - (0 if force else n_notified))} from {listings[0].marketplace}"
return title
[docs]
def get_text_message(
self: "EmailNotificationConfig",
listings: List[Listing],
ratings: List[AIResponse],
notification_status: List[NotificationStatus],
force: bool = False,
logger: Logger | None = None,
) -> str:
messages = []
for listing, rating, ns in zip(listings, ratings, notification_status):
prefix = ""
if ns == NotificationStatus.NOTIFIED:
if force:
prefix = "[NOTIFIED] "
else:
continue
if ns == NotificationStatus.EXPIRED:
prefix = "[REMINDER] "
elif ns == NotificationStatus.LISTING_CHANGED:
prefix = "[lISTING UPDATED] "
elif ns == NotificationStatus.LISTING_DISCOUNTED:
prefix = "[lISTING DISCOUNTED] "
messages.append(
(
f"{prefix}{listing.title}\n{listing.price}, {listing.location}\n"
f"{listing.post_url.split('?')[0]}"
)
if rating.comment == AIResponse.NOT_EVALUATED
else (
f"{prefix} [{rating.conclusion} ({rating.score})] {listing.title}\n"
f"{listing.price}, {listing.location}\n"
f"{listing.post_url.split('?')[0]}\n"
f"\nAI: {rating.comment}"
)
)
message = "\n\n".join(messages)
return message
[docs]
def get_html_message(
self: "EmailNotificationConfig",
listings: List[Listing],
ratings: List[AIResponse],
notification_status: List[NotificationStatus],
force: bool = False,
logger: Logger | None = None,
) -> Tuple[str, list[Tuple[bytes, str, str]]]: # Return HTML and image data
template_dir = Path(__file__).parent
# Set up Jinja2 environment
env = Environment(
loader=FileSystemLoader(template_dir), autoescape=select_autoescape(["html", "xml"])
)
# Add custom filter for hashing
env.filters["hash"] = hash
def bold_headers(text: str) -> Markup:
"""Escape text then bold known section headers."""
safe_text = escape(text)
for header in ("About this vehicle", "Seller's description", "Description"):
bold = Markup(f"<b>{header}</b>") # noqa: S704 — header is a literal
safe_text = safe_text.replace(escape(header), bold)
return Markup(safe_text) # noqa: S704 — safe_text was escaped above
env.filters["bold_headers"] = bold_headers
# Load template
template = env.get_template("email.html.j2")
# Prepare images list for attachments
images = []
valid_image_hashes = set() # Track which images were successfully processed
# Process images first
for listing in listings:
if listing.image:
result = fetch_with_retry(listing.image, logger=logger)
if result:
image_data, content_type = result
image_data = resize_image_data(image_data)
if image_data and len(image_data) <= 1024 * 1024:
image_hash = hash(listing.image)
images.append((image_data, content_type, f"image_{image_hash}"))
valid_image_hashes.add(image_hash) # Track valid image
else:
if logger:
logger.debug(f"Image too large: {len(image_data)} bytes, skipped.")
else:
if logger:
logger.debug(f"Failed to fetch image: {listing.image}")
# Render template
html = template.render(
listings=zip(listings, ratings, notification_status),
force=force,
item_name=listings[0].name.capitalize(),
NotificationStatus=NotificationStatus, # Pass enum for comparison
valid_image_hashes=valid_image_hashes, # Pass set of valid image hashes
)
return html, images
[docs]
def notify(
self: "EmailNotificationConfig",
listings: List[Listing],
ratings: List[AIResponse],
notification_status: List[NotificationStatus],
force: bool = False,
logger: Logger | None = None,
) -> bool:
if not self._has_required_fields():
if logger:
logger.debug(
f"Missing required fields {', '.join(self.required_fields)}. No {self.notify_method} notification sent."
)
return False
title = self.get_title(listings, notification_status, force=force)
if not title:
if logger:
logger.debug("No new listings. No email sent.")
return False
message = self.get_text_message(
listings, ratings, notification_status, force, logger=logger
)
html_message, images = self.get_html_message(
listings, ratings, notification_status, force, logger=logger
)
return self.send_email_message(title, message, html_message, images, logger=logger)
[docs]
def send_email_message(
self: "EmailNotificationConfig",
title: str,
message: str,
html: str,
images: List[Tuple[bytes, str, str]],
logger: Logger | None = None,
) -> bool:
if not self.email:
if logger:
logger.debug("No recipients specified. No email sent.")
return False
sender = self.smtp_from or self.smtp_username or self.email[0]
if self.smtp_server:
smtp_server = self.smtp_server
else:
smtp_server = f"""smtp.{sender.split("@")[1]}"""
# s.starttls()
msg = MIMEMultipart("related")
msg["Subject"] = title
# can use the humanized version of self.name as well
msg["From"] = formataddr(("AI Marketplace Monitor", sender))
msg["To"] = ", ".join(self.email)
# Create alternative part
alt_part = MIMEMultipart("alternative")
msg.attach(alt_part)
alt_part.attach(MIMEText(message, "plain"))
alt_part.attach(MIMEText(html, "html")) # HTML part last = preferred
# Attach images
for image_data, _, cid in images:
image = MIMEImage(image_data)
image.add_header("Content-ID", f"<{cid}>")
image.add_header("Content-Disposition", "inline")
msg.attach(image)
for attempt in range(self.max_retries):
try:
smtp_port = self.smtp_port or 587
smtp_username = self.smtp_username or sender
if not smtp_username:
if logger:
logger.error("No smtp username.")
return False
smtp_password = self.smtp_password
if not smtp_password:
if logger:
logger.error("No smtp password.")
return False
context = ssl.create_default_context()
with smtplib.SMTP(smtp_server, smtp_port) as smtp:
# smtp.set_debuglevel(1)
smtp.ehlo() # Can be omitted
smtp.starttls(context=context)
smtp.ehlo() # Can be omitted
try:
smtp.login(smtp_username, smtp_password)
except KeyboardInterrupt:
raise
except Exception as e:
if logger:
logger.error(
f"Failed to login to smtp server {smtp_server}:{smtp_port} with username {smtp_username}: {e}"
)
return False
smtp.send_message(msg)
if logger:
logger.info(
f"""{hilight("[Notify]", "succ")} Sent {self.name} an email with title {hilight(title)}"""
)
return True
except KeyboardInterrupt:
raise
except Exception as e:
if logger:
logger.debug(
f"""{hilight("[Notify]", "fail")} Attempt {attempt + 1} failed: {e}"""
)
if attempt < self.max_retries - 1:
if logger:
logger.debug(
f"""{hilight("[Notify]", "fail")} Retrying in {self.retry_delay} seconds..."""
)
time.sleep(self.retry_delay)
else:
if logger:
logger.error(
f"""{hilight("[Notify]", "fail")} Max retries reached. Failed to push note to {self.name}."""
)
return False
return False