diff --git a/src/memory/common/parsers/blogs.py b/src/memory/common/parsers/blogs.py
index 02ab59e..1fb0ad4 100644
--- a/src/memory/common/parsers/blogs.py
+++ b/src/memory/common/parsers/blogs.py
@@ -2,15 +2,17 @@ import logging
import re
from datetime import datetime
from urllib.parse import urlparse
+from typing import cast
-import requests
from bs4 import BeautifulSoup, Tag
+
from memory.common.parsers.html import (
BaseHTMLParser,
Article,
parse_date,
extract_title,
extract_date,
+ fetch_html,
)
@@ -618,48 +620,38 @@ def parse_webpage(url: str) -> Article:
Returns:
Article object with extracted content and metadata
"""
- response = requests.get(
- url,
- timeout=30,
- headers={
- "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:137.0) Gecko/20100101 Firefox/137.0"
- },
- )
- response.raise_for_status()
-
- parser = get_parser_for_url(url, response.text)
-
- return parser.parse(response.text, url)
+ html = cast(str, fetch_html(url))
+ parser = get_parser_for_url(url, html)
+ return parser.parse(html, url)
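+
+
+# A minimal, hypothetical call-site sketch (not part of this change): fetch_html()
+# calls raise_for_status(), so HTTP failures surface as requests.HTTPError:
+#
+#     try:
+#         article = parse_webpage("https://danluu.com/some-post/")
+#     except requests.HTTPError:
+#         article = None
+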
-blogs = [
- "https://acoup.blog/",
- "https://guzey.com/",
- "https://akarlin.com/",
- "https://aphyr.com/",
- "https://www.applieddivinitystudies.com/",
- "https://www.bitsaboutmoney.com/",
+feeds = [
+ "https://archive.ph/o/IQUoT/https://www.bloomberg.com/opinion/authors/ARbTQlRLRjE/matthew-s-levine",
+ "https://www.rifters.com/crawl/",
+ "https://rachelbythebay.com/w/atom.xml",
"https://danluu.com/",
- "https://mcfunley.com/",
- "https://www.exurbe.com/",
- "https://www.flyingmachinestudios.com/",
+ "https://guzey.com/archive",
+ "https://aphyr.com/posts.atom",
+ "https://www.applieddivinitystudies.com/atom.xml",
"https://www.imightbewrong.org/",
"https://www.kvetch.au/",
"https://www.overcomingbias.com/",
- "https://www.rifters.com/crawl/",
"https://samkriss.substack.com/",
- "https://www.paulgraham.com/articles.html",
- "https://putanumonit.com/",
"https://www.richardhanania.com/",
"https://skunkledger.substack.com/",
"https://taipology.substack.com/",
+ "https://putanumonit.com/",
+ "https://www.flyingmachinestudios.com/",
"https://www.theintrinsicperspective.com/",
"https://www.strangeloopcanon.com/",
"https://slimemoldtimemold.com/",
- "https://www.theredhandfiles.com/",
- "https://rachelbythebay.com/w/",
"https://zeroinputagriculture.substack.com/",
- "https://nadia.xyz/posts/",
"https://nayafia.substack.com",
- "https://archive.ph/o/IQUoT/https://www.bloomberg.com/opinion/authors/ARbTQlRLRjE/matthew-s-levine",
+ "https://www.paulgraham.com/articles.html",
+ "https://mcfunley.com/writing",
+ "https://www.bitsaboutmoney.com/archive/",
+ "https://akarlin.com/archive/",
+ "https://www.exurbe.com/",
+ "https://acoup.blog/",
+ "https://www.theredhandfiles.com/",
]
diff --git a/src/memory/common/parsers/feeds.py b/src/memory/common/parsers/feeds.py
new file mode 100644
index 0000000..8087272
--- /dev/null
+++ b/src/memory/common/parsers/feeds.py
@@ -0,0 +1,451 @@
+from datetime import datetime
+import logging
+import re
+from dataclasses import dataclass, field
+from typing import Any, Generator, Sequence, cast
+from urllib.parse import urljoin, urlparse
+
+import feedparser
+from bs4 import BeautifulSoup, Tag
+import requests
+
+from memory.common.parsers.html import (
+ get_base_url,
+ to_absolute_url,
+ extract_title,
+ extract_date,
+ fetch_html,
+)
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class FeedItem:
+ """Represents a single item from a feed."""
+
+ title: str
+ url: str
+ description: str = ""
+ author: str | None = None
+ published_date: datetime | None = None
+ guid: str | None = None
+ metadata: dict[str, Any] = field(default_factory=dict)
+
+
+@dataclass
+class FeedParser:
+ """Base class for feed parsers."""
+
+ url: str
+ content: str | None = None
+ since: datetime | None = None
+
+ @property
+ def base_url(self) -> str:
+ """Get the base URL of the feed."""
+ return get_base_url(self.url)
+
+ def fetch_items(self) -> Sequence[Any]:
+ """Fetch items from the feed. Override in subclasses."""
+ return []
+
+ def parse_item(self, item: Any) -> FeedItem:
+ return FeedItem(
+ title=self.extract_title(item),
+ url=self.extract_url(item),
+ description=self.extract_description(item),
+ author=self.extract_author(item),
+ published_date=self.extract_date(item),
+ guid=self.extract_guid(item),
+ metadata=self.extract_metadata(item),
+ )
+
+ def valid_item(self, item: FeedItem) -> bool:
+ return True
+
+ def parse_feed(self) -> Generator[FeedItem, None, None]:
+ """Parse feed content and return list of feed items."""
+ for item in self.fetch_items():
+ parsed_item = self.parse_item(item)
+ if self.valid_item(parsed_item):
+ yield parsed_item
+
+ def extract_title(self, entry: Any) -> str:
+ """Extract title from feed entry. Override in subclasses."""
+ return "Untitled"
+
+ def extract_url(self, entry: Any) -> str:
+ """Extract URL from feed entry. Override in subclasses."""
+ return ""
+
+ def extract_description(self, entry: Any) -> str:
+ """Extract description from feed entry. Override in subclasses."""
+ return ""
+
+ def extract_author(self, entry: Any) -> str | None:
+ """Extract author from feed entry. Override in subclasses."""
+ return None
+
+ def extract_date(self, entry: Any) -> datetime | None:
+ """Extract publication date from feed entry. Override in subclasses."""
+ return None
+
+ def extract_guid(self, entry: Any) -> str | None:
+ """Extract GUID from feed entry. Override in subclasses."""
+ return None
+
+ def extract_metadata(self, entry: Any) -> dict[str, Any]:
+ """Extract additional metadata from feed entry. Override in subclasses."""
+ return {}
+
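+# A minimal, hypothetical usage sketch of the FeedParser contract: subclasses
+# override fetch_items() and the extract_* hooks, and parse_feed() yields only
+# the items that pass valid_item():
+#
+#     parser = RSSAtomParser(url="https://example.com/feed.xml")
+#     for item in parser.parse_feed():
+#         print(item.published_date, item.title, item.url)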
+
+class RSSAtomParser(FeedParser):
+ """Parser for RSS and Atom feeds using feedparser."""
+
+ def fetch_items(self) -> Sequence[Any]:
+ """Fetch items from the feed."""
+ if self.since:
+ feed = feedparser.parse(self.content or self.url, modified=self.since)
+ else:
+ feed = feedparser.parse(self.content or self.url)
+ return feed.entries
+
+ def extract_title(self, entry: Any) -> str:
+ """Extract title from RSS/Atom entry."""
+ return getattr(entry, "title", "Untitled")
+
+ def extract_url(self, entry: Any) -> str:
+ """Extract URL from RSS/Atom entry."""
+ url = getattr(entry, "link", "")
+ if url and not urlparse(url).scheme:
+ url = urljoin(self.base_url, url)
+ return url
+
+ def extract_description(self, entry: Any) -> str:
+ """Extract description from RSS/Atom entry."""
+ return getattr(entry, "summary", "") or getattr(entry, "description", "")
+
+ def extract_author(self, entry: Any) -> str | None:
+ """Extract author from RSS/Atom entry."""
+ return getattr(entry, "author", None) or getattr(
+ entry, "author_detail", {}
+ ).get("name", None)
+
+ def extract_date(self, entry: Any) -> datetime | None:
+ """Extract publication date from RSS/Atom entry."""
+ for date_attr in ["published_parsed", "updated_parsed"]:
+ time_struct = getattr(entry, date_attr, None)
+ if not time_struct:
+ continue
+ try:
+ return datetime(*time_struct[:6])
+ except (TypeError, ValueError):
+ continue
+ return None
+
+ def extract_guid(self, entry: Any) -> str | None:
+ """Extract GUID from RSS/Atom entry."""
+ return getattr(entry, "id", None) or getattr(entry, "guid", None)
+
+ def extract_metadata(self, entry: Any) -> dict[str, Any]:
+ """Extract additional metadata from RSS/Atom entry."""
+ return {
+ attr: getattr(entry, attr)
+ for attr in ["tags", "category", "categories", "enclosures"]
+ if hasattr(entry, attr)
+ }
+
+
+DEFAULT_SKIP_PATTERNS = [
+ r"^#", # Fragment-only links
+ r"mailto:",
+ r"tel:",
+ r"javascript:",
+ r"\.pdf$",
+ r"\.jpg$",
+ r"\.png$",
+ r"\.gif$",
+]
+
+
+class HTMLListParser(FeedParser):
+ """Parser for HTML pages containing lists of article links.
+
+    Subclasses typically override the selectors below; the defaults target
+    simple <li><a href> listings.
+ """
+
+ item_selector: str = "li"
+ url_selector: str = "a[href]"
+ skip_patterns: list[str] = DEFAULT_SKIP_PATTERNS
+ title_selector: str | None = None
+ description_selector: str | None = None
+ date_selector: str | None = None
+ date_format: str = "%Y-%m-%d"
+
+ def fetch_items(self) -> Sequence[Any]:
+ """Fetch items from the HTML page."""
+ if not self.content:
+ self.content = cast(str, fetch_html(self.url))
+
+ soup = BeautifulSoup(self.content, "html.parser")
+ items = []
+ seen_urls = set()
+
+ tags = soup.select(self.item_selector)
+
+ for tag in tags:
+ if not isinstance(tag, Tag):
+ continue
+
+ url = self.extract_url(tag)
+ if url in seen_urls or self._should_skip_url(url):
+ continue
+ seen_urls.add(url)
+ items.append(tag)
+
+ return items
+
+ def _should_skip_url(self, url: str) -> bool:
+ """Check if URL should be skipped."""
+ return any(
+ re.search(pattern, url, re.IGNORECASE) for pattern in self.skip_patterns
+ )
+
+ def extract_title(self, entry: Any) -> str | None:
+ """Extract title from HTML entry."""
+ if self.title_selector:
+ return extract_title(entry, self.title_selector)
+
+ def extract_description(self, entry: Any) -> str | None:
+ """Extract description from HTML entry."""
+ if not self.description_selector:
+ return None
+ desc = entry.select_one(self.description_selector)
+ return desc and desc.get_text(strip=True)
+
+ def extract_url(self, entry: Any) -> str:
+ """Extract URL from HTML entry."""
+ if not (link := entry.select_one(self.url_selector)):
+ return ""
+ if not (href := link.get("href")):
+ return ""
+ return to_absolute_url(href, self.base_url)
+
+ def extract_date(self, entry: Any) -> datetime | None:
+ if self.date_selector:
+ return extract_date(entry, self.date_selector, self.date_format)
+
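+# Sketch of how a site-specific list page is wired up (selector values here are
+# hypothetical; see the concrete subclasses below for real examples):
+#
+#     class ExampleBlogParser(HTMLListParser):
+#         item_selector = ".archive li"
+#         title_selector = "h2"
+#         date_selector = ".date"
+#         date_format = "%d %B %Y"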
+
+class DanluuParser(HTMLListParser):
+ skip_patterns = [r"^https://danluu.com/#"]
+
+ def valid_item(self, item: FeedItem) -> bool:
+ return item.url.startswith(self.base_url)
+
+
+class GuzeyParser(HTMLListParser):
+ item_selector = "li a[href]"
+ skip_patterns = DEFAULT_SKIP_PATTERNS + [r"docs\.google\.com"]
+
+ def valid_item(self, item: FeedItem) -> bool:
+ # Only include items that are actual blog posts (relative URLs or guzey.com URLs)
+ return (
+ item.url.startswith(self.base_url)
+ or item.url.startswith("../")
+ or not item.url.startswith("http")
+ )
+
+
+class PaulGrahamParser(HTMLListParser):
+ item_selector = "font a[href]"
+ skip_patterns = DEFAULT_SKIP_PATTERNS + [
+ r"\.txt$", # Skip text files
+ r"turbifycdn\.com", # Skip CDN links
+ ]
+
+ def valid_item(self, item: FeedItem) -> bool:
+ # Only include items that are actual essays (relative URLs ending in .html)
+ return (
+ item.url.endswith(".html")
+ and not item.url.startswith("http")
+ and len(item.title) > 5 # Filter out very short titles
+ )
+
+
+class NadiaXyzParser(HTMLListParser):
+ item_selector = ".blog.all li"
+ skip_patterns = DEFAULT_SKIP_PATTERNS + [
+ r"twitter\.com",
+ r"newsletter",
+ r"projects",
+ r"notes",
+ ]
+ date_selector = ".date"
+ date_format = "%B %d, %Y"
+ description_selector = "p"
+
+ def valid_item(self, item: FeedItem) -> bool:
+ # Only include actual blog posts (relative URLs or nadia.xyz URLs)
+ return (
+ item.url.startswith(self.base_url)
+ or item.url.startswith("/")
+ or (not item.url.startswith("http") and item.url.endswith("/"))
+ )
+
+
+class RedHandFilesParser(HTMLListParser):
+ item_selector = "article, .issue, .post"
+ url_selector = "a[href]"
+ title_selector = "h2, .issue-title"
+ description_selector = "p"
+ skip_patterns = DEFAULT_SKIP_PATTERNS + [
+ r"/joy",
+ r"/about",
+ r"/subscribe",
+ r"/ask",
+ r"privacy-policy",
+ r"#",
+ ]
+
+ def valid_item(self, item: FeedItem) -> bool:
+ # Only include actual issues (should have "Issue #" in title or URL)
+ return (
+ item.url.startswith(self.base_url)
+ and ("issue" in item.url.lower() or "issue #" in item.title.lower())
+ and len(item.title) > 10
+ )
+
+ def extract_title(self, entry: Any) -> str:
+ """Extract title, combining issue number and question."""
+ # Look for issue number
+ issue_elem = entry.select_one("h3, .issue-number")
+ issue_text = issue_elem.get_text(strip=True) if issue_elem else ""
+
+ # Look for the main question/title
+ title_elem = entry.select_one("h2, .issue-title, .question")
+ title_text = title_elem.get_text(strip=True) if title_elem else ""
+
+ # Combine them
+ if issue_text and title_text:
+ return f"{issue_text}: {title_text}"
+ elif title_text:
+ return title_text
+ elif issue_text:
+ return issue_text
+
+ # Fallback to any link text
+ link = entry.select_one(self.url_selector)
+ return link.get_text(strip=True) if link else "Untitled"
+
+ def extract_description(self, entry: Any) -> str:
+ """Extract the question text as description."""
+ # Look for the question text in h2 or similar
+ desc_elem = entry.select_one("h2, .question, .issue-title")
+ if desc_elem:
+ text = desc_elem.get_text(strip=True)
+ # Clean up and truncate if too long
+ if len(text) > 200:
+ text = text[:200] + "..."
+ return text
+ return ""
+
+
+class BloombergAuthorParser(HTMLListParser):
+ item_selector = "section#author_page article"
+ url_selector = "a[href]"
+ title_selector = "article div a"
+ description_selector = "article div section"
+ skip_patterns = DEFAULT_SKIP_PATTERNS + [
+ r"/authors/",
+ r"/topics/",
+ r"/subscribe",
+ r"/newsletter/",
+ r"#",
+ r"mailto:",
+ ]
+
+ def valid_item(self, item: FeedItem) -> bool:
+ # Only include actual articles
+ return (
+ (
+ item.url.startswith("https://www.bloomberg.com")
+ or item.url.startswith("https://archive.ph")
+ or item.url.startswith("/")
+ )
+ and (
+ "opinion" in item.url.lower()
+ or "news" in item.url.lower()
+ or len(item.url.split("/")) > 4
+ )
+ and len(item.title) > 10
+ )
+
+
+def is_rss_feed(content: str) -> bool:
+ """Check if content appears to be an XML feed."""
+ content_lower = content.strip().lower()
+ return (
+        content_lower.startswith("<?xml")
+        or content_lower.startswith("<rss")
+        or content_lower.startswith("<feed")
+    )
+
+
+def extract_url(element: Tag, base_url: str) -> str | None:
+ if not (href := element.get("href")):
+ return None
+
+ return to_absolute_url(str(href), base_url)
+
+
+def find_feed_link(url: str, soup: BeautifulSoup) -> str | None:
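+    """Return an RSS/Atom feed URL advertised via <link rel="alternate"> in the
+    page <head>, skipping links that point back to the page itself."""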
+ head = soup.find("head")
+ if not head:
+ return None
+ for type_ in ["application/rss+xml", "application/atom+xml"]:
+ links = head.find_all("link", {"rel": "alternate", "type": type_}) # type: ignore
+ for link in links:
+ if not isinstance(link, Tag):
+ continue
+ if not (link_url := extract_url(link, url)):
+ continue
+ if link_url.rstrip("/") != url.rstrip("/"):
+ return link_url
+ return None
+
+
+PARSER_REGISTRY = {
+ r"https://danluu.com": DanluuParser,
+ r"https://guzey.com/archive": GuzeyParser,
+ r"https://www.paulgraham.com/articles": PaulGrahamParser,
+ r"https://nadia.xyz/posts": NadiaXyzParser,
+ r"https://www.theredhandfiles.com": RedHandFilesParser,
+ r"https://archive.ph/.*?/https://www.bloomberg.com/opinion/authors/": BloombergAuthorParser,
+}
+
+
+def get_feed_parser(url: str, check_from: datetime | None = None) -> FeedParser | None:
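+    """Resolve a feed parser for a URL.
+
+    Resolution order: an explicit PARSER_REGISTRY match, then raw RSS/Atom
+    content, then a feed advertised in the page's <head>, and finally a few
+    common archive paths ("/archive", "/posts", "/feed") tried recursively.
+    """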
+ for pattern, parser_class in PARSER_REGISTRY.items():
+ if re.search(pattern, url.rstrip("/")):
+ return parser_class(url=url, since=check_from)
+
+ text = cast(str, fetch_html(url))
+ if is_rss_feed(text):
+ return RSSAtomParser(url=url, content=text, since=check_from)
+
+ soup = BeautifulSoup(text, "html.parser")
+ if feed_link := find_feed_link(url, soup):
+ return RSSAtomParser(url=feed_link, since=check_from)
+
+ for path in ["/archive", "/posts", "/feed"]:
+ if url.rstrip("/").endswith(path):
+ continue
+ try:
+ if parser := get_feed_parser(url + path, check_from):
+ return parser
+ except requests.HTTPError:
+ continue
+
+ return None
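+
+
+# Hypothetical end-to-end sketch: resolve a parser for a page and walk its items.
+#
+#     if parser := get_feed_parser("https://www.paulgraham.com/articles.html"):
+#         for item in parser.parse_feed():
+#             print(item.url)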
diff --git a/src/memory/common/parsers/html.py b/src/memory/common/parsers/html.py
index ab64d56..1842456 100644
--- a/src/memory/common/parsers/html.py
+++ b/src/memory/common/parsers/html.py
@@ -17,6 +17,20 @@ from memory.common import settings
logger = logging.getLogger(__name__)
+def fetch_html(url: str, as_bytes: bool = False) -> str | bytes:
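+    """Fetch a URL with a browser-like User-Agent and return the body as text,
+    or as raw bytes when as_bytes=True. Raises requests.HTTPError on non-2xx
+    responses."""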
+ response = requests.get(
+ url,
+ timeout=30,
+ headers={
+ "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:137.0) Gecko/20100101 Firefox/137.0"
+ },
+ )
+ response.raise_for_status()
+ if as_bytes:
+ return response.content
+ return response.text
+
+
@dataclass
class Article:
"""Structured representation of a web article."""
@@ -135,9 +149,7 @@ def process_image(url: str, image_dir: pathlib.Path) -> PILImage.Image | None:
# Download if not already cached
if not local_path.exists():
- response = requests.get(url, timeout=30)
- response.raise_for_status()
- local_path.write_bytes(response.content)
+ local_path.write_bytes(fetch_html(url, as_bytes=True))
try:
return PILImage.open(local_path)
@@ -153,10 +165,10 @@ def process_images(
Process all images in content: download them, update URLs, and return PIL Images.
Returns:
- Tuple of (updated_content, list_of_pil_images)
+ Tuple of (updated_content, dict_of_pil_images)
"""
if not content:
- return content, []
+ return content, {}
images = {}
diff --git a/tests/memory/common/parsers/test_archives.py b/tests/memory/common/parsers/test_archives.py
new file mode 100644
index 0000000..b5df1b1
--- /dev/null
+++ b/tests/memory/common/parsers/test_archives.py
@@ -0,0 +1,176 @@
+import pytest
+from unittest.mock import Mock, patch
+from bs4 import BeautifulSoup
+
+from memory.common.parsers.archives import (
+ ArchiveParser,
+ WordPressArchiveParser,
+ SubstackArchiveParser,
+ get_archive_parser,
+)
+
+
+class TestArchiveParser:
+ def test_init(self):
+ parser = ArchiveParser(url="https://example.com")
+ assert parser.url == "https://example.com"
+ assert parser._visited_urls == set()
+ assert parser._all_items == []
+ assert parser.max_pages == 100
+ assert parser.delay_between_requests == 1.0
+
+ def test_extract_items_from_page(self):
+ html = """
+
+ """
+ soup = BeautifulSoup(html, "html.parser")
+ parser = ArchiveParser(url="https://example.com")
+
+ items = parser._extract_items_from_page(soup)
+ assert len(items) == 2 # Duplicates should be filtered out
+
+ def test_find_next_page_url_with_selector(self):
+        html = '<a class="next" href="/page/2">Next</a>'
+ soup = BeautifulSoup(html, "html.parser")
+ parser = ArchiveParser(url="https://example.com")
+ parser.next_page_selector = ".next"
+
+ next_url = parser._find_next_page_url(soup, "https://example.com/page/1")
+ assert next_url == "https://example.com/page/2"
+
+ def test_find_next_page_url_heuristic(self):
+        html = '<a href="/page/2" rel="next">Next</a>'
+ soup = BeautifulSoup(html, "html.parser")
+ parser = ArchiveParser(url="https://example.com")
+
+ next_url = parser._find_next_page_url(soup, "https://example.com/page/1")
+ assert next_url == "https://example.com/page/2"
+
+ def test_find_next_page_url_contains_text(self):
+        html = '<a href="/page/2">Next page</a>'
+ soup = BeautifulSoup(html, "html.parser")
+ parser = ArchiveParser(url="https://example.com")
+
+ next_url = parser._find_next_page_heuristic(soup)
+ assert next_url == "https://example.com/page/2"
+
+ def test_find_next_numeric_page(self):
+ parser = ArchiveParser(url="https://example.com")
+ parser.page_url_pattern = "/page/{page}"
+
+ # Test with existing page number
+ next_url = parser._find_next_numeric_page("https://example.com/page/3")
+ assert next_url == "https://example.com/page/4"
+
+ # Test without page number (assume page 1)
+ next_url = parser._find_next_numeric_page("https://example.com/archive")
+ assert next_url == "https://example.com/archive/page/2"
+
+ @patch("memory.common.parsers.archives.fetch_html")
+ @patch("time.sleep")
+ def test_fetch_items_pagination(self, mock_sleep, mock_fetch):
+ # Mock HTML for two pages
+ page1_html = """
+
+ """
+ page2_html = """
+
+ """
+
+ mock_fetch.side_effect = [page1_html, page2_html]
+
+ parser = ArchiveParser(url="https://example.com/page/1")
+ parser.delay_between_requests = 0.1 # Speed up test
+
+ items = parser.fetch_items()
+
+ assert len(items) == 4
+ assert mock_fetch.call_count == 2
+ assert mock_sleep.call_count == 1 # One delay between requests
+
+ @patch("memory.common.parsers.archives.fetch_html")
+ def test_fetch_items_stops_at_max_pages(self, mock_fetch):
+ # Mock HTML that always has a next page
+ html_with_next = """
+
+ """
+
+ mock_fetch.return_value = html_with_next
+
+ parser = ArchiveParser(url="https://example.com/page/1")
+ parser.max_pages = 3
+ parser.delay_between_requests = 0 # No delay for test
+
+ items = parser.fetch_items()
+
+ assert mock_fetch.call_count == 3 # Should stop at max_pages
+
+ @patch("memory.common.parsers.archives.fetch_html")
+ def test_fetch_items_handles_duplicate_urls(self, mock_fetch):
+ # Mock HTML that creates a cycle
+ page1_html = """
+
+ """
+ page2_html = """
+
+ """
+
+ mock_fetch.side_effect = [page1_html, page2_html]
+
+ parser = ArchiveParser(url="https://example.com/page/1")
+ parser.delay_between_requests = 0
+
+ items = parser.fetch_items()
+
+ assert len(items) == 2
+ assert mock_fetch.call_count == 2 # Should stop when it hits visited URL
+
+
+class TestWordPressArchiveParser:
+ def test_selectors(self):
+ parser = WordPressArchiveParser(url="https://example.wordpress.com")
+ assert parser.item_selector == "article, .post"
+ assert parser.next_page_selector == '.nav-previous a, .next a, a[rel="next"]'
+ assert parser.title_selector == ".entry-title a, h1 a, h2 a"
+
+
+class TestSubstackArchiveParser:
+ def test_selectors(self):
+ parser = SubstackArchiveParser(url="https://example.substack.com")
+ assert parser.item_selector == ".post-preview, .post"
+ assert parser.next_page_selector == ".pagination .next"
+
+
+class TestGetArchiveParser:
+ @pytest.mark.parametrize(
+ "url,expected_class",
+ [
+ ("https://example.wordpress.com/archive", WordPressArchiveParser),
+ ("https://example.substack.com/archive", SubstackArchiveParser),
+ ("https://example.com/archive", ArchiveParser), # Default
+ ],
+ )
+ def test_get_archive_parser(self, url, expected_class):
+ parser = get_archive_parser(url)
+ assert isinstance(parser, expected_class)
+ assert parser.url == url
diff --git a/tests/memory/common/parsers/test_feeds.py b/tests/memory/common/parsers/test_feeds.py
new file mode 100644
index 0000000..b1d30b6
--- /dev/null
+++ b/tests/memory/common/parsers/test_feeds.py
@@ -0,0 +1,738 @@
+from datetime import datetime
+from unittest.mock import MagicMock, patch
+from typing import Any, cast
+
+import pytest
+from bs4 import BeautifulSoup, Tag
+import requests
+
+from memory.common.parsers.feeds import (
+ FeedItem,
+ FeedParser,
+ RSSAtomParser,
+ HTMLListParser,
+ DanluuParser,
+ GuzeyParser,
+ PaulGrahamParser,
+ NadiaXyzParser,
+ RedHandFilesParser,
+ BloombergAuthorParser,
+ is_rss_feed,
+ extract_url,
+ find_feed_link,
+ get_feed_parser,
+ DEFAULT_SKIP_PATTERNS,
+ PARSER_REGISTRY,
+)
+
+
+def test_feed_parser_base_url():
+ parser = FeedParser(url="https://example.com/path/to/feed")
+ assert parser.base_url == "https://example.com"
+
+
+def test_feed_parser_parse_feed_empty():
+ parser = FeedParser(url="https://example.com")
+ items = list(parser.parse_feed())
+ assert items == []
+
+
+def test_feed_parser_parse_feed_with_items():
+ class TestParser(FeedParser):
+ def fetch_items(self):
+ return ["item1", "item2"]
+
+ def extract_title(self, entry):
+ return f"Title for {entry}"
+
+ def extract_url(self, entry):
+ return f"https://example.com/{entry}"
+
+ parser = TestParser(url="https://example.com")
+ assert list(parser.parse_feed()) == [
+ FeedItem(title="Title for item1", url="https://example.com/item1"),
+ FeedItem(title="Title for item2", url="https://example.com/item2"),
+ ]
+
+
+def test_feed_parser_parse_feed_with_invalid_items():
+ class TestParser(FeedParser):
+ def fetch_items(self):
+ return ["valid", "invalid"]
+
+ def extract_title(self, entry):
+ return f"Title for {entry}"
+
+ def extract_url(self, entry):
+ return f"https://example.com/{entry}"
+
+ def valid_item(self, item):
+ return item.title == "Title for valid"
+
+ parser = TestParser(url="https://example.com")
+ assert list(parser.parse_feed()) == [
+ FeedItem(title="Title for valid", url="https://example.com/valid"),
+ ]
+
+
+@patch("memory.common.parsers.feeds.feedparser.parse")
+@pytest.mark.parametrize("since_date", [None, datetime(2023, 1, 1)])
+def test_rss_atom_parser_fetch_items(mock_parse, since_date):
+ mock_feed = MagicMock()
+ mock_feed.entries = ["entry1", "entry2"]
+ mock_parse.return_value = mock_feed
+
+ parser = RSSAtomParser(url="https://example.com/feed.xml", since=since_date)
+ items = parser.fetch_items()
+
+ if since_date:
+ mock_parse.assert_called_once_with(
+ "https://example.com/feed.xml", modified=since_date
+ )
+ else:
+ mock_parse.assert_called_once_with("https://example.com/feed.xml")
+ assert items == ["entry1", "entry2"]
+
+
+@patch("memory.common.parsers.feeds.feedparser.parse")
+def test_rss_atom_parser_fetch_items_with_content(mock_parse):
+ mock_feed = MagicMock()
+ mock_feed.entries = ["entry1"]
+ mock_parse.return_value = mock_feed
+
+ content = "... "
+ parser = RSSAtomParser(url="https://example.com/feed.xml", content=content)
+ items = parser.fetch_items()
+
+ mock_parse.assert_called_once_with(content)
+ assert items == ["entry1"]
+
+
+@pytest.mark.parametrize(
+ "entry_attrs, expected",
+ [
+ ({"title": "Test Title"}, "Test Title"),
+ ({}, "Untitled"),
+ ],
+)
+def test_rss_atom_parser_extract_title(entry_attrs, expected):
+ parser = RSSAtomParser(url="https://example.com")
+ entry = MagicMock()
+
+ for attr, value in entry_attrs.items():
+ setattr(entry, attr, value)
+
+ # Remove attributes not in entry_attrs
+ if "title" not in entry_attrs:
+ del entry.title
+
+ assert parser.extract_title(entry) == expected
+
+
+@pytest.mark.parametrize(
+ "entry_attrs, expected",
+ [
+ ({"link": "https://other.com/article"}, "https://other.com/article"),
+ ({"link": "/article"}, "https://example.com/article"),
+ ({}, ""),
+ ],
+)
+def test_rss_atom_parser_extract_url(entry_attrs, expected):
+ parser = RSSAtomParser(url="https://example.com")
+ entry = MagicMock()
+
+ for attr, value in entry_attrs.items():
+ setattr(entry, attr, value)
+
+ if "link" not in entry_attrs:
+ del entry.link
+
+ assert parser.extract_url(entry) == expected
+
+
+@pytest.mark.parametrize(
+ "entry_attrs, expected",
+ [
+ (
+ {"summary": "Test summary", "description": "Test description"},
+ "Test summary",
+ ),
+ ({"summary": "", "description": "Test description"}, "Test description"),
+ ({}, ""),
+ ],
+)
+def test_rss_atom_parser_extract_description(entry_attrs, expected):
+ parser = RSSAtomParser(url="https://example.com")
+ entry = MagicMock()
+
+ for attr, value in entry_attrs.items():
+ setattr(entry, attr, value)
+
+ for attr in ["summary", "description"]:
+ if attr not in entry_attrs:
+ delattr(entry, attr)
+
+ assert parser.extract_description(entry) == expected
+
+
+@pytest.mark.parametrize(
+ "entry_attrs, expected",
+ [
+ ({"author": "John Doe"}, "John Doe"),
+ ({"author": None, "author_detail": {"name": "Jane Smith"}}, "Jane Smith"),
+ ({"author": None, "author_detail": {}}, None),
+ ],
+)
+def test_rss_atom_parser_extract_author(entry_attrs, expected):
+ parser = RSSAtomParser(url="https://example.com")
+ entry = MagicMock()
+
+ for attr, value in entry_attrs.items():
+ setattr(entry, attr, value)
+
+ assert parser.extract_author(entry) == expected
+
+
+@pytest.mark.parametrize(
+ "entry_attrs, expected",
+ [
+ (
+ {
+ "published_parsed": (2023, 1, 15, 10, 30, 0, 0, 0, 0),
+ "updated_parsed": None,
+ },
+ datetime(2023, 1, 15, 10, 30, 0),
+ ),
+ (
+ {
+ "published_parsed": None,
+ "updated_parsed": (2023, 2, 20, 14, 45, 30, 0, 0, 0),
+ },
+ datetime(2023, 2, 20, 14, 45, 30),
+ ),
+ ({"published_parsed": "invalid", "updated_parsed": None}, None),
+ ({}, None),
+ ],
+)
+def test_rss_atom_parser_extract_date(entry_attrs, expected):
+ parser = RSSAtomParser(url="https://example.com")
+ entry = MagicMock()
+
+ for attr, value in entry_attrs.items():
+ setattr(entry, attr, value)
+
+ for attr in ["published_parsed", "updated_parsed"]:
+ if attr not in entry_attrs:
+ delattr(entry, attr)
+
+ assert parser.extract_date(entry) == expected
+
+
+@pytest.mark.parametrize(
+ "entry_attrs, expected",
+ [
+ ({"id": "unique-id-123", "guid": "guid-456"}, "unique-id-123"),
+ ({"id": None, "guid": "guid-456"}, "guid-456"),
+ ({"id": None, "guid": None}, None),
+ ],
+)
+def test_rss_atom_parser_extract_guid(entry_attrs, expected):
+ parser = RSSAtomParser(url="https://example.com")
+ entry = MagicMock()
+
+ for attr, value in entry_attrs.items():
+ setattr(entry, attr, value)
+
+ assert parser.extract_guid(entry) == expected
+
+
+def test_rss_atom_parser_extract_metadata():
+ parser = RSSAtomParser(url="https://example.com")
+
+ entry = MagicMock()
+ entry.tags = ["tag1", "tag2"]
+ entry.category = "news"
+ entry.categories = ["tech", "science"]
+ entry.enclosures = ["file1.mp3"]
+ entry.other_attr = "should not be included"
+
+ metadata = parser.extract_metadata(entry)
+
+ assert metadata == {
+ "tags": ["tag1", "tag2"],
+ "category": "news",
+ "categories": ["tech", "science"],
+ "enclosures": ["file1.mp3"],
+ }
+
+
+@patch("memory.common.parsers.feeds.fetch_html")
+def test_html_list_parser_fetch_items_with_content(mock_fetch_html):
+ html = """
+    <ul>
+        <li><a href="/article1">Article 1</a></li>
+        <li><a href="/article2">Article 2</a></li>
+    </ul>
+ """
+
+ parser = HTMLListParser(url="https://example.com", content=html)
+ assert [a.prettify() for a in parser.fetch_items()] == [
+        '<li>\n <a href="/article1">\n  Article 1\n </a>\n</li>\n',
+        '<li>\n <a href="/article2">\n  Article 2\n </a>\n</li>\n',
+ ]
+
+ mock_fetch_html.assert_not_called()
+
+
+@patch("memory.common.parsers.feeds.fetch_html")
+def test_html_list_parser_fetch_items_without_content(mock_fetch_html):
+ html = """
+    <ul>
+        <li><a href="/article1">Article 1</a></li>
+    </ul>
+ """
+ mock_fetch_html.return_value = html
+
+ parser = HTMLListParser(url="https://example.com")
+ assert [a.prettify() for a in parser.fetch_items()] == [
+        '<li>\n <a href="/article1">\n  Article 1\n </a>\n</li>\n',
+ ]
+
+ mock_fetch_html.assert_called_once_with("https://example.com")
+
+
+def test_html_list_parser_fetch_items_deduplication():
+ html = """
+    <ul>
+        <li><a href="/article1">Article 1</a></li>
+        <li><a href="/article1">Article 1 duplicate</a></li>
+        <li><a href="/article2">Article 2</a></li>
+    </ul>
+ """
+
+ parser = HTMLListParser(url="https://example.com", content=html)
+ assert [a.prettify() for a in parser.fetch_items()] == [
+        '<li>\n <a href="/article1">\n  Article 1\n </a>\n</li>\n',
+        '<li>\n <a href="/article2">\n  Article 2\n </a>\n</li>\n',
+ ]
+
+
+@pytest.mark.parametrize(
+ "url, should_skip",
+ [
+ ("#fragment", True),
+ ("mailto:test@example.com", True),
+ ("tel:+1234567890", True),
+ ("javascript:void(0)", True),
+ ("document.pdf", True),
+ ("image.jpg", True),
+ ("photo.png", True),
+ ("animation.gif", True),
+ ("https://example.com/article", False),
+ ("/relative/path", False),
+ ],
+)
+def test_html_list_parser_should_skip_url(url, should_skip):
+ parser = HTMLListParser(url="https://example.com")
+ assert parser._should_skip_url(url) == should_skip
+
+
+@pytest.mark.parametrize(
+ "html, title_selector, expected",
+ [
+ (
+            '<li><h2>Custom Title</h2><a href="/article">Link</a></li>',
+ "h2",
+ "Custom Title",
+ ),
+        ('<li><a href="/article">Link</a></li>', None, None),
+ ],
+)
+def test_html_list_parser_extract_title(html, title_selector, expected):
+ soup = BeautifulSoup(html, "html.parser")
+ item = soup.find("li")
+
+ parser = HTMLListParser(url="https://example.com")
+ parser.title_selector = title_selector
+
+ if expected and title_selector:
+ with patch("memory.common.parsers.feeds.extract_title") as mock_extract:
+ mock_extract.return_value = expected
+ title = parser.extract_title(item)
+ mock_extract.assert_called_once_with(item, title_selector)
+ assert title == expected
+ else:
+ assert parser.extract_title(item) is None
+
+
+@pytest.mark.parametrize(
+ "html, description_selector, expected",
+ [
+ (
+            '<li><p>Description text</p><a href="/article">Link</a></li>',
+ "p",
+ "Description text",
+ ),
+        ('<li><a href="/article">Link</a></li>', None, None),
+ ],
+)
+def test_html_list_parser_extract_description(html, description_selector, expected):
+ soup = BeautifulSoup(html, "html.parser")
+ item = soup.find("li")
+
+ parser = HTMLListParser(url="https://example.com")
+ parser.description_selector = description_selector
+
+ assert parser.extract_description(item) == expected
+
+
+@pytest.mark.parametrize(
+ "html, expected",
+ [
+        ('<li><a href="/article">Article</a></li>', "https://example.com/article"),
+        ("<li>No link here</li>", ""),
+ ],
+)
+def test_html_list_parser_extract_url(html, expected):
+ soup = BeautifulSoup(html, "html.parser")
+ item = soup.find("li")
+
+ parser = HTMLListParser(url="https://example.com")
+ assert parser.extract_url(item) == expected
+
+
+def test_html_list_parser_extract_date_with_selector():
+    html = '<li><span class="date">2023-01-15</span><a href="/article">Link</a></li>'
+ soup = BeautifulSoup(html, "html.parser")
+ item = soup.find("li")
+
+ parser = HTMLListParser(url="https://example.com")
+ parser.date_selector = ".date"
+
+ with patch("memory.common.parsers.feeds.extract_date") as mock_extract:
+ mock_extract.return_value = datetime(2023, 1, 15)
+ date = parser.extract_date(item)
+ mock_extract.assert_called_once_with(item, ".date", "%Y-%m-%d")
+ assert date == datetime(2023, 1, 15)
+
+
+def test_html_list_parser_extract_date_without_selector():
+    html = '<li><a href="/article">Link</a></li>'
+ soup = BeautifulSoup(html, "html.parser")
+ item = soup.find("li")
+
+ parser = HTMLListParser(url="https://example.com")
+ assert parser.extract_date(item) is None
+
+
+@pytest.mark.parametrize(
+ "parser_class, url, valid_urls, invalid_urls",
+ [
+ (
+ DanluuParser,
+ "https://danluu.com",
+ ["https://danluu.com/article"],
+ ["https://other.com/article"],
+ ),
+ (
+ GuzeyParser,
+ "https://guzey.com/archive",
+ ["https://guzey.com/archive/article", "../relative", "relative"],
+ ["https://other.com/article"],
+ ),
+ (
+ PaulGrahamParser,
+ "https://www.paulgraham.com/articles",
+ [("Long enough title", "essay.html")],
+ [
+ ("Short", "essay.html"),
+ ("Long enough title", "https://other.com/essay.html"),
+ ("Long enough title", "document.txt"),
+ ],
+ ),
+ (
+ NadiaXyzParser,
+ "https://nadia.xyz/posts",
+ ["https://nadia.xyz/posts/article", "/article", "article/"],
+ ["https://other.com/article"],
+ ),
+ (
+ RedHandFilesParser,
+ "https://www.theredhandfiles.com",
+ [
+ (
+ "Issue #123: Long question",
+ "https://www.theredhandfiles.com/issue-123",
+ ),
+ ("Long enough title", "https://www.theredhandfiles.com/some-issue"),
+ ],
+ [
+ ("Short", "https://www.theredhandfiles.com/issue-123"),
+ ("Long enough title", "https://other.com/issue"),
+ ("Long enough title", "https://www.theredhandfiles.com/about"),
+ ],
+ ),
+ (
+ BloombergAuthorParser,
+ "https://archive.ph/123/https://www.bloomberg.com/opinion/authors/",
+ [
+ (
+ "Long enough title",
+ "https://www.bloomberg.com/opinion/articles/2023/01/15/article",
+ ),
+ ("Long enough title", "/news/articles/2023/01/15/article"),
+ (
+ "Long enough title",
+ "https://archive.ph/2023/01/15/some/article/path",
+ ),
+ ],
+ [
+ (
+ "Short",
+ "https://www.bloomberg.com/opinion/articles/2023/01/15/article",
+ ),
+ ("Long enough title", "https://other.com/article"),
+ ("Long enough title", "https://www.bloomberg.com/simple"),
+ ],
+ ),
+ ],
+)
+def test_specific_parsers_valid_item(parser_class, url, valid_urls, invalid_urls):
+ parser = parser_class(url=url)
+
+ # Test valid items
+ for item_data in valid_urls:
+ if isinstance(item_data, tuple):
+ title, url_val = item_data
+ item = FeedItem(title=title, url=url_val)
+ else:
+ item = FeedItem(title="Test", url=item_data)
+ assert parser.valid_item(item) is True
+
+ # Test invalid items
+ for item_data in invalid_urls:
+ if isinstance(item_data, tuple):
+ title, url_val = item_data
+ item = FeedItem(title=title, url=url_val)
+ else:
+ item = FeedItem(title="Test", url=item_data)
+ assert parser.valid_item(item) is False
+
+
+def test_red_hand_files_extract_title():
+ html = """
+    <article>
+        <h3>Issue #123</h3>
+        <h2>What is the meaning of life?</h2>
+        <a href="/issue-123">Link</a>
+    </article>
+ """
+ soup = BeautifulSoup(html, "html.parser")
+ item = soup.find("article")
+
+ parser = RedHandFilesParser(url="https://www.theredhandfiles.com")
+ title = parser.extract_title(item)
+ assert title == "Issue #123: What is the meaning of life?"
+
+
+def test_red_hand_files_extract_description():
+ # Create a text that's definitely longer than 200 characters
+ long_text = "This is a very long question that should be truncated because it exceeds the maximum length limit of 200 characters and we want to make sure that the description is not too long for display purposes and this text continues to be very long indeed to ensure truncation happens"
+ html = f"""
+    <article>
+        <h2>{long_text}</h2>
+    </article>
+ """
+ soup = BeautifulSoup(html, "html.parser")
+ item = soup.find("article")
+
+ parser = RedHandFilesParser(url="https://www.theredhandfiles.com")
+ description = parser.extract_description(item)
+ assert len(description) <= 203 # 200 + "..."
+ assert description.endswith("...")
+
+
+@pytest.mark.parametrize(
+ "content, expected",
+ [
+ ("", True),
+ ("", True),
+ ("", True),
+ ("", True),
+ (" ", True), # Case insensitive
+ ("Not a feed", False),
+ ("Plain text content", False),
+ ("", False),
+ ],
+)
+def test_is_rss_feed(content, expected):
+ assert is_rss_feed(content) == expected
+
+
+@pytest.mark.parametrize(
+ "html, expected",
+ [
+        ('<a href="/relative/path">Link</a>', "https://example.com/relative/path"),
+        ("<a>Link without href</a>", None),
+ ],
+)
+def test_extract_url_function(html, expected):
+ soup = BeautifulSoup(html, "html.parser")
+ element = soup.find("a")
+ assert element is not None
+
+ url = extract_url(cast(Tag, element), "https://example.com")
+ assert url == expected
+
+
+@pytest.mark.parametrize(
+ "html, expected",
+ [
+ (
+ """
+            <html>
+              <head>
+                <link rel="alternate" type="application/rss+xml" href="/feed.xml">
+              </head>
+            </html>
+ """,
+ "https://example.com/feed.xml",
+ ),
+ ("No head", None),
+ (
+ """
+            <html>
+              <head>
+                <link rel="alternate" type="application/rss+xml" href="https://example.com">
+              </head>
+            </html>
+ """,
+ None,
+ ), # Should not return same URL
+ ],
+)
+def test_find_feed_link(html, expected):
+ soup = BeautifulSoup(html, "html.parser")
+ feed_link = find_feed_link("https://example.com", soup)
+ assert feed_link == expected
+
+
+@pytest.mark.parametrize(
+ "url, expected_parser_class",
+ [
+ ("https://danluu.com", DanluuParser),
+ ("https://guzey.com/archive", GuzeyParser),
+ ("https://www.paulgraham.com/articles", PaulGrahamParser),
+ ("https://nadia.xyz/posts", NadiaXyzParser),
+ ("https://www.theredhandfiles.com", RedHandFilesParser),
+ (
+ "https://archive.ph/abc123/https://www.bloomberg.com/opinion/authors/john-doe",
+ BloombergAuthorParser,
+ ),
+ ],
+)
+def test_get_feed_parser_registry(url, expected_parser_class):
+ parser = get_feed_parser(url)
+ assert parser is not None
+ assert isinstance(parser, expected_parser_class)
+ assert parser.url == url
+
+
+@patch("memory.common.parsers.feeds.fetch_html")
+def test_get_feed_parser_rss_content(mock_fetch_html):
+    mock_fetch_html.return_value = "<?xml version='1.0'?><rss><channel></channel></rss>"
+
+ parser = get_feed_parser("https://example.com/unknown")
+ assert isinstance(parser, RSSAtomParser)
+ assert parser.url == "https://example.com/unknown"
+
+
+@patch("memory.common.parsers.feeds.fetch_html")
+def test_get_feed_parser_with_feed_link(mock_fetch_html):
+ html = """
+    <html>
+      <head>
+        <link rel="alternate" type="application/rss+xml" href="/feed.xml">
+      </head>
+    </html>
+ """
+ mock_fetch_html.return_value = html
+
+ parser = get_feed_parser("https://example.com")
+ assert isinstance(parser, RSSAtomParser)
+ assert parser.url == "https://example.com/feed.xml"
+
+
+@patch("memory.common.parsers.feeds.fetch_html")
+def test_get_feed_parser_recursive_paths(mock_fetch_html):
+ # Mock the initial call to return HTML without feed links
+ html = "No feed links"
+ mock_fetch_html.return_value = html
+
+ # Mock the recursive calls to avoid actual HTTP requests
+ with patch("memory.common.parsers.feeds.get_feed_parser") as mock_recursive:
+ # Set up the mock to return None for recursive calls
+ mock_recursive.return_value = None
+
+ # Call the original function directly
+ from memory.common.parsers.feeds import (
+ get_feed_parser as original_get_feed_parser,
+ )
+
+ parser = original_get_feed_parser("https://example.com")
+
+ assert parser is None
+
+
+@patch("memory.common.parsers.feeds.fetch_html")
+def test_get_feed_parser_no_match(mock_fetch_html):
+ html = "No feed links"
+ mock_fetch_html.return_value = html
+
+ # Mock the recursive calls to avoid actual HTTP requests
+ with patch("memory.common.parsers.feeds.get_feed_parser") as mock_recursive:
+ mock_recursive.return_value = None
+ parser = get_feed_parser("https://unknown.com")
+
+ assert parser is None
+
+
+def test_get_feed_parser_with_check_from():
+ check_from = datetime(2023, 1, 1)
+ parser = get_feed_parser("https://danluu.com", check_from)
+ assert isinstance(parser, DanluuParser)
+ assert parser.since == check_from
+
+
+def test_parser_registry_completeness():
+ """Ensure PARSER_REGISTRY contains expected parsers."""
+ expected_patterns = [
+ r"https://danluu.com",
+ r"https://guzey.com/archive",
+ r"https://www.paulgraham.com/articles",
+ r"https://nadia.xyz/posts",
+ r"https://www.theredhandfiles.com",
+ r"https://archive.ph/.*?/https://www.bloomberg.com/opinion/authors/",
+ ]
+
+ assert len(PARSER_REGISTRY) == len(expected_patterns)
+ for pattern in expected_patterns:
+ assert pattern in PARSER_REGISTRY
+
+
+def test_default_skip_patterns():
+ """Ensure DEFAULT_SKIP_PATTERNS contains expected patterns."""
+ expected_patterns = [
+ r"^#",
+ r"mailto:",
+ r"tel:",
+ r"javascript:",
+ r"\.pdf$",
+ r"\.jpg$",
+ r"\.png$",
+ r"\.gif$",
+ ]
+
+ assert DEFAULT_SKIP_PATTERNS == expected_patterns
diff --git a/tests/memory/common/parsers/test_html.py b/tests/memory/common/parsers/test_html.py
index 0ad7e1f..dbba9ca 100644
--- a/tests/memory/common/parsers/test_html.py
+++ b/tests/memory/common/parsers/test_html.py
@@ -325,7 +325,13 @@ def test_process_image_success(mock_pil_open, mock_requests_get):
result = process_image(url, image_dir)
# Verify HTTP request was made
- mock_requests_get.assert_called_once_with(url, timeout=30)
+ mock_requests_get.assert_called_once_with(
+ url,
+ timeout=30,
+ headers={
+ "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:137.0) Gecko/20100101 Firefox/137.0"
+ },
+ )
mock_response.raise_for_status.assert_called_once()
# Verify image was opened