diff --git a/src/memory/common/parsers/archives.py b/src/memory/common/parsers/archives.py
new file mode 100644
index 0000000..afc0cb4
--- /dev/null
+++ b/src/memory/common/parsers/archives.py
@@ -0,0 +1,301 @@
+from dataclasses import dataclass, field
+import logging
+import re
+import time
+from urllib.parse import parse_qs, urlencode, urlparse, urlunparse
+from typing import Generator, cast
+
+from bs4 import BeautifulSoup
+from memory.common.parsers.blogs import is_substack
+
+from memory.common.parsers.feeds import (
+ DanluuParser,
+ HTMLListParser,
+ RiftersParser,
+ FeedItem,
+ FeedParser,
+ SubstackAPIParser,
+)
+from memory.common.parsers.html import (
+ fetch_html,
+ extract_url,
+ get_base_url,
+)
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class ArchiveFetcher:
+ """Fetches complete backlogs from sites with pagination."""
+
+ parser_class: type[FeedParser]
+ start_url: str
+ max_pages: int = 100
+ delay_between_requests: float = 1.0
+ parser_kwargs: dict = field(default_factory=dict)
+
+ def make_parser(self, url: str) -> FeedParser:
+ parser = self.parser_class(url=url)
+ for key, value in self.parser_kwargs.items():
+ setattr(parser, key, value)
+ return parser
+
+ def fetch_all_items(self) -> Generator[FeedItem, None, None]:
+ """Fetch all items from all pages."""
+ visited_urls = set()
+ current_url = self.start_url
+ page_count = 0
+ total_items = 0
+
+ while current_url and page_count < self.max_pages:
+ if current_url in visited_urls:
+ logger.warning(f"Already visited {current_url}, stopping")
+ break
+
+ logger.info(f"Fetching page {page_count + 1}: {current_url}")
+ visited_urls.add(current_url)
+
+ try:
+ parser = self.make_parser(current_url)
+
+ items = parser.parse_feed()
+ if not items:
+ break
+
+ prev_items = total_items
+ for item in items:
+ total_items += 1
+ yield item
+
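+                # The page yielded nothing new, so treat the archive as exhausted.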
+ if prev_items == total_items:
+ logger.warning(f"No new items found on page {page_count + 1}")
+ break
+
+ current_url = self._find_next_page(parser, page_count)
+ if not current_url:
+ logger.info("No more pages found")
+ break
+
+ page_count += 1
+
+ if self.delay_between_requests > 0:
+ time.sleep(self.delay_between_requests)
+
+ except Exception as e:
+ logger.error(f"Error processing {current_url}: {e}")
+ break
+
+ def _find_next_page(self, parser: FeedParser, current_page: int = 0) -> str | None:
+ return None
+
+
+@dataclass
+class LinkFetcher(ArchiveFetcher):
+ per_page: int = 10
+
+ def _find_next_page(self, parser: FeedParser, current_page: int = 0):
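+        # Offset/limit pagination: with per_page=10, current_page=0 yields
+        # "...?offset=10&limit=10" for the next page, current_page=1 yields offset=20, etc.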
+ next_page = current_page + 1
+ parsed = urlparse(self.start_url)
+ params = parse_qs(parsed.query)
+ params["offset"] = [str(next_page * self.per_page)]
+ params["limit"] = [str(self.per_page)]
+
+ new_query = urlencode(params, doseq=True)
+ return urlunparse(parsed._replace(query=new_query))
+
+
+@dataclass
+class HTMLArchiveFetcher(ArchiveFetcher):
+ next_page_selectors: list[str] = field(
+ default_factory=lambda: [
+ 'a[rel="next"]',
+ ".next a",
+ "a.next",
+ ".pagination .next",
+ ".pager .next",
+ "nav.page a:last-of-type",
+ ".navigation a:last-of-type",
+ ]
+ )
+
+ def _find_next_page(self, parser: FeedParser, current_page: int = 0) -> str | None:
+ if not parser.content:
+ return None
+ soup = BeautifulSoup(parser.content, "html.parser")
+ selectors = ",".join(self.next_page_selectors)
+ return extract_url(soup, selectors, parser.url)
+
+
+def html_parser(**kwargs) -> type[HTMLListParser]:
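+    # Build an HTMLListParser subclass with the given selector overrides baked in,
+    # e.g. html_parser(item_selector="article", title_selector="h1").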
+ class ConfiguredHTMLListParser(HTMLListParser):
+ def __init__(self, url: str):
+ super().__init__(url)
+ for key, value in kwargs.items():
+ setattr(self, key, value)
+
+ return ConfiguredHTMLListParser
+
+
+@dataclass
+class SubstackArchiveFetcher(LinkFetcher):
+ def __post_init__(self):
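+        # Normalize any Substack URL to its archive API endpoint, e.g.
+        # "https://example.substack.com/about" -> "https://example.substack.com/api/v1/archive".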
+ if "api/v1/archive" not in self.start_url:
+ base_url = get_base_url(self.start_url)
+ self.start_url = f"{base_url}/api/v1/archive"
+
+
+@dataclass
+class ACOUPArchiveFetcher(HTMLArchiveFetcher):
+ def _find_next_page(self, parser: FeedParser, current_page: int = 0) -> str | None:
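+        # Scan the WordPress monthly-archive widget links (in reverse order) and
+        # return the one that immediately follows the current page's URL.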
+ if not parser.content:
+ return None
+ soup = BeautifulSoup(parser.content, "html.parser")
+ urls = reversed([i.attrs.get("href") for i in soup.select(".widget_archive a")])
+ urls = (cast(str, u) for u in urls if u)
+ for url in urls:
+ if url.rstrip("/") == parser.url.rstrip("/"):
+ return next(urls, None)
+
+
+@dataclass
+class HTMLNextUrlArchiveFetcher(HTMLArchiveFetcher):
+ next_url: str = ""
+
+ def __post_init__(self):
+ if not self.next_url:
+ self.next_url = self.start_url
+ if not self.next_url.startswith("http") and not self.next_url.startswith("/"):
+ self.next_url = f"{self.start_url}/{self.next_url}"
+
+ def _find_next_page(self, parser: FeedParser, current_page: int = 0) -> str | None:
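+        # Pages are addressed as "<next_url>/<page number>", e.g. ".../archive/1", ".../archive/2".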
+ return f"{self.next_url}/{current_page + 1}"
+
+
+FETCHER_REGISTRY = {
+ r"https://putanumonit.com": (
+ "https://putanumonit.com/full-archive",
+ html_parser(
+ item_selector="article p", title_selector="a strong", url_selector="a"
+ ),
+ ),
+ r"https://danluu.com": DanluuParser,
+ r"https://www.rifters.com": RiftersParser,
+ r"https://rachelbythebay.com": html_parser(
+ item_selector="div.post",
+ url_selector="a",
+ ),
+ r"https://guzey.com": (
+ "https://guzey.com/archive/",
+ html_parser(item_selector="article li"),
+ ),
+ r"https://aphyr.com": html_parser(
+ item_selector="article.post",
+ title_selector="h1",
+ url_selector="h1 a",
+ description_selector=".body",
+ date_selector=".meta time",
+ ),
+ r"https://www.applieddivinitystudies.com": html_parser(
+ item_selector="article.article",
+ title_selector="header.article-header h1",
+ url_selector="header.article-header h1 a",
+ description_selector=".article-entry",
+ date_selector=".article-meta time",
+ ),
+ r"https://www.flyingmachinestudios.com": html_parser(
+ item_selector="#main #articles li",
+ title_selector="header .title",
+ description_selector="p",
+ date_selector="header .date",
+ date_format="%d %B %Y",
+ ),
+ r"https://slimemoldtimemold.com": html_parser(
+ item_selector="article .wp-block-list li", title_selector="a"
+ ),
+ r"https://www.paulgraham.com": (
+ "https://www.paulgraham.com/articles.html",
+ html_parser(item_selector="img + font"),
+ ),
+ r"https://slatestarcodex.com": (
+ "https://slatestarcodex.com/archives/",
+ html_parser(item_selector="#sya_container li"),
+ ),
+ r"https://mcfunley.com": (
+ "https://mcfunley.com/writing",
+ html_parser(item_selector="article", title_selector="h6"),
+ ),
+ r"https://www.bitsaboutmoney.com": HTMLArchiveFetcher(
+ html_parser(
+ item_selector="article",
+ title_selector="h1",
+ description_selector="p",
+ date_selector="time",
+ ),
+ "https://www.bitsaboutmoney.com/archive/",
+ next_page_selectors=["nav.pagination a.older-posts"],
+ ),
+ r"https://acoup.blog": ACOUPArchiveFetcher(
+ html_parser(
+ item_selector="article",
+ title_selector="a",
+ description_selector=".entry-content",
+ date_selector=".published-on time",
+ ),
+ "https://acoup.blog/2019/05/",
+ ),
+ r"https://www.theredhandfiles.com": html_parser(
+ item_selector="article", title_selector="h3", description_selector="h2"
+ ),
+}
+
+
+def get_archive_fetcher(url: str) -> ArchiveFetcher | None:
+ for pattern, fetcher in FETCHER_REGISTRY.items():
+ if re.search(pattern, url.rstrip("/")):
+ if isinstance(fetcher, ArchiveFetcher):
+ return fetcher
+ elif isinstance(fetcher, tuple):
+ base_url, html_fetcher = fetcher
+ return HTMLArchiveFetcher(html_fetcher, base_url)
+ else:
+ return HTMLArchiveFetcher(fetcher, url)
+
+ html = fetch_html(url)
+ soup = BeautifulSoup(html, "html.parser")
+ if is_substack(soup):
+ return SubstackArchiveFetcher(SubstackAPIParser, url)
+
+
+feeds = [
+ "https://archive.ph/o/IQUoT/https://www.bloomberg.com/opinion/authors/ARbTQlRLRjE/matthew-s-levine",
+ "https://www.rifters.com/crawl/",
+ "https://rachelbythebay.com/w/",
+ "https://danluu.com/",
+ "https://guzey.com",
+ "https://aphyr.com/",
+ "https://www.applieddivinitystudies.com/",
+ "https://www.imightbewrong.org/",
+ "https://www.kvetch.au/",
+ "https://www.overcomingbias.com/",
+ "https://samkriss.substack.com/",
+ "https://www.richardhanania.com/",
+ "https://skunkledger.substack.com/",
+ "https://taipology.substack.com/",
+ "https://putanumonit.com/",
+ "https://www.flyingmachinestudios.com/",
+ "https://www.theintrinsicperspective.com/",
+ "https://www.strangeloopcanon.com/",
+ "https://slimemoldtimemold.com/",
+ "https://zeroinputagriculture.substack.com/",
+ "https://nayafia.substack.com",
+ "https://www.paulgraham.com/articles.html",
+ "https://mcfunley.com/writing",
+ "https://www.bitsaboutmoney.com/",
+ "https://akarlin.com",
+ "https://www.exurbe.com/",
+ "https://acoup.blog/",
+ "https://www.theredhandfiles.com/",
+ "https://karlin.blog/",
+ "https://slatestarcodex.com/",
+]
diff --git a/src/memory/common/parsers/blogs.py b/src/memory/common/parsers/blogs.py
index 1fb0ad4..021e718 100644
--- a/src/memory/common/parsers/blogs.py
+++ b/src/memory/common/parsers/blogs.py
@@ -13,6 +13,9 @@ from memory.common.parsers.html import (
extract_title,
extract_date,
fetch_html,
+ is_wordpress,
+ is_substack,
+ is_bloomberg,
)
@@ -587,24 +590,13 @@ def get_parser_for_url(url: str, html: str) -> BaseHTMLParser:
return parser_class(url)
soup = BeautifulSoup(html, "html.parser")
- body_select = "body"
- # Check if this is an archived page
- if contents := soup.select_one("#CONTENT .html"):
- body_select = ".body"
- soup = contents
-
- if soup.select_one(f"{body_select} .wp-singular"):
+ if is_wordpress(soup):
return WordPressParser(url)
- if any(
- "https://substackcdn.com" == a.attrs.get("href") # type: ignore
- for a in soup.find_all("link", {"rel": "preconnect"})
- if hasattr(a, "attrs") # type: ignore
- ):
+ if is_substack(soup):
return SubstackParser(url)
- urls = [a.attrs.get("href") for a in soup.select(f"{body_select} a")] # type: ignore
- if any(u.endswith("https://www.bloomberg.com/company/") for u in urls[:5] if u): # type: ignore
+ if is_bloomberg(soup):
return BloombergParser(url)
return BaseHTMLParser(url)
@@ -628,11 +620,11 @@ def parse_webpage(url: str) -> Article:
feeds = [
"https://archive.ph/o/IQUoT/https://www.bloomberg.com/opinion/authors/ARbTQlRLRjE/matthew-s-levine",
"https://www.rifters.com/crawl/",
- "https://rachelbythebay.com/w/atom.xml",
+ "https://rachelbythebay.com/w/",
"https://danluu.com/",
- "https://guzey.com/archive",
- "https://aphyr.com/posts.atom",
- "https://www.applieddivinitystudies.com/atom.xml",
+ "https://guzey.come",
+ "https://aphyr.com/",
+ "https://www.applieddivinitystudies.com/",
"https://www.imightbewrong.org/",
"https://www.kvetch.au/",
"https://www.overcomingbias.com/",
@@ -649,9 +641,10 @@ feeds = [
"https://nayafia.substack.com",
"https://www.paulgraham.com/articles.html",
"https://mcfunley.com/writing",
- "https://www.bitsaboutmoney.com/archive/",
- "https://akarlin.com/archive/",
+ "https://www.bitsaboutmoney.com/",
+ "https://akarlin.com",
"https://www.exurbe.com/",
"https://acoup.blog/",
"https://www.theredhandfiles.com/",
+ "https://karlin.blog/",
]
diff --git a/src/memory/common/parsers/feeds.py b/src/memory/common/parsers/feeds.py
index 8087272..6ea25a4 100644
--- a/src/memory/common/parsers/feeds.py
+++ b/src/memory/common/parsers/feeds.py
@@ -1,5 +1,6 @@
from datetime import datetime
import logging
+import json
import re
from dataclasses import dataclass, field
from typing import Any, Generator, Sequence, cast
@@ -20,6 +21,20 @@ from memory.common.parsers.html import (
logger = logging.getLogger(__name__)
+ObjectPath = list[str | int]
+
+
+def select_in(data: Any, path: ObjectPath) -> Any:
+ if not path:
+ return data
+
+ key, *rest = path
+ try:
+ return select_in(data[key], rest)
+ except (KeyError, TypeError, IndexError):
+ return None
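+# For example, select_in({"posts": [{"title": "First"}]}, ["posts", 0, "title"])
+# returns "First"; any missing key or index resolves to None instead of raising.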
+
+
@dataclass
class FeedItem:
"""Represents a single item from a feed."""
@@ -62,7 +77,7 @@ class FeedParser:
)
def valid_item(self, item: FeedItem) -> bool:
- return True
+ return bool(item.url)
def parse_feed(self) -> Generator[FeedItem, None, None]:
"""Parse feed content and return list of feed items."""
@@ -100,6 +115,46 @@ class FeedParser:
return {}
+class JSONParser(FeedParser):
+ title_path: ObjectPath = ["title"]
+ url_path: ObjectPath = ["url"]
+ description_path: ObjectPath = ["description"]
+ date_path: ObjectPath = ["date"]
+ author_path: ObjectPath = ["author"]
+ guid_path: ObjectPath = ["guid"]
+ metadata_path: ObjectPath = ["metadata"]
+
+ def fetch_items(self) -> Sequence[Any]:
+ if not self.content:
+ self.content = cast(str, fetch_html(self.url))
+ try:
+ return json.loads(self.content)
+ except json.JSONDecodeError as e:
+ logger.error(f"Error parsing JSON: {e}")
+ return []
+
+ def extract_title(self, entry: Any) -> str:
+ return select_in(entry, self.title_path)
+
+ def extract_url(self, entry: Any) -> str:
+ return select_in(entry, self.url_path)
+
+ def extract_description(self, entry: Any) -> str:
+ return select_in(entry, self.description_path)
+
+ def extract_date(self, entry: Any) -> datetime:
+ return select_in(entry, self.date_path)
+
+ def extract_author(self, entry: Any) -> str:
+ return select_in(entry, self.author_path)
+
+ def extract_guid(self, entry: Any) -> str:
+ return select_in(entry, self.guid_path)
+
+ def extract_metadata(self, entry: Any) -> dict[str, Any]:
+ return select_in(entry, self.metadata_path)
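+    # Subclasses only need to point the *_path attributes at the right keys; see
+    # SubstackAPIParser below, which reads ["canonical_url"] for the URL and
+    # ["publishedBylines", 0, "name"] for the author.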
+
+
class RSSAtomParser(FeedParser):
"""Parser for RSS and Atom feeds using feedparser."""
@@ -237,8 +292,14 @@ class HTMLListParser(FeedParser):
return extract_date(entry, self.date_selector, self.date_format)
+class SubstackAPIParser(JSONParser):
+ url_path = ["canonical_url"]
+ author_path = ["publishedBylines", 0, "name"]
+ date_path = ["post_date"]
+
+
class DanluuParser(HTMLListParser):
- skip_patterns = [r"^https://danluu.com/#"]
+ skip_patterns = DEFAULT_SKIP_PATTERNS + [r"^https://danluu\.com/?#"]
def valid_item(self, item: FeedItem) -> bool:
return item.url.startswith(self.base_url)
@@ -351,6 +412,13 @@ class RedHandFilesParser(HTMLListParser):
return ""
+class RiftersParser(HTMLListParser):
+ item_selector = "#content .post"
+ title_selector = "h2 a"
+ url_selector = "h2 a"
+ description_selector = ".entry-content"
+
+
class BloombergAuthorParser(HTMLListParser):
item_selector = "section#author_page article"
url_selector = "a[href]"
@@ -393,7 +461,7 @@ def is_rss_feed(content: str) -> bool:
)
-def extract_url(element: Tag, base_url: str) -> str | None:
+def clean_url(element: Tag, base_url: str) -> str | None:
if not (href := element.get("href")):
return None
@@ -409,14 +477,14 @@ def find_feed_link(url: str, soup: BeautifulSoup) -> str | None:
for link in links:
if not isinstance(link, Tag):
continue
- if not (link_url := extract_url(link, url)):
+ if not (link_url := clean_url(link, url)):
continue
if link_url.rstrip("/") != url.rstrip("/"):
return link_url
return None
-PARSER_REGISTRY = {
+FEED_REGISTRY = {
r"https://danluu.com": DanluuParser,
r"https://guzey.com/archive": GuzeyParser,
r"https://www.paulgraham.com/articles": PaulGrahamParser,
@@ -427,7 +495,7 @@ PARSER_REGISTRY = {
def get_feed_parser(url: str, check_from: datetime | None = None) -> FeedParser | None:
- for pattern, parser_class in PARSER_REGISTRY.items():
+ for pattern, parser_class in FEED_REGISTRY.items():
if re.search(pattern, url.rstrip("/")):
return parser_class(url=url, since=check_from)
diff --git a/src/memory/common/parsers/html.py b/src/memory/common/parsers/html.py
index 1842456..5a46533 100644
--- a/src/memory/common/parsers/html.py
+++ b/src/memory/common/parsers/html.py
@@ -110,7 +110,14 @@ def extract_date(
datetime_attr = element.get("datetime")
if datetime_attr:
- for format in ["%Y-%m-%dT%H:%M:%S", "%Y-%m-%d", date_format]:
+ for format in [
+ "%Y-%m-%dT%H:%M:%S.%fZ",
+ "%Y-%m-%dT%H:%M:%S%z",
+ "%Y-%m-%dT%H:%M:%S.%f",
+ "%Y-%m-%dT%H:%M:%S",
+ "%Y-%m-%d",
+ date_format,
+ ]:
if date := parse_date(str(datetime_attr), format):
return date
@@ -277,6 +284,47 @@ def extract_metadata(soup: BeautifulSoup) -> dict[str, Any]:
return metadata
+def extract_url(soup: BeautifulSoup, selectors: str, base_url: str = "") -> str | None:
+ for selector in selectors.split(","):
+ next_link = soup.select_one(selector)
+ if not (next_link and isinstance(next_link, Tag)):
+ continue
+
+ if not (href := next_link.get("href")):
+ continue
+
+ return to_absolute_url(str(href), base_url)
+
+ return None
+
+
+def is_substack(soup: BeautifulSoup | Tag) -> bool:
+ return any(
+ "https://substackcdn.com" == a.attrs.get("href") # type: ignore
+ for a in soup.find_all("link", {"rel": "preconnect"})
+ if hasattr(a, "attrs") # type: ignore
+ )
+
+
+def is_wordpress(soup: BeautifulSoup | Tag) -> bool:
+ body_select = "body"
+ # Check if this is an archived page
+ if contents := soup.select_one("#CONTENT .html"):
+ body_select = "#CONTENT .html"
+ soup = contents
+ return bool(soup.select_one(f"{body_select} .wp-singular"))
+
+
+def is_bloomberg(soup: BeautifulSoup | Tag) -> bool:
+ body_select = "body"
+ # Check if this is an archived page
+ if contents := soup.select_one("#CONTENT .html"):
+ body_select = "#CONTENT .html"
+ soup = contents
+ urls = [a.attrs.get("href") for a in soup.select(f"{body_select} a")] # type: ignore
+ return any(u.endswith("https://www.bloomberg.com/company/") for u in urls[:5] if u) # type: ignore
+
+
class BaseHTMLParser:
"""Base class for parsing HTML content from websites."""
diff --git a/tests/memory/common/parsers/test_archives.py b/tests/memory/common/parsers/test_archives.py
index b5df1b1..e7284dd 100644
--- a/tests/memory/common/parsers/test_archives.py
+++ b/tests/memory/common/parsers/test_archives.py
@@ -1,176 +1,512 @@
+from unittest.mock import patch
+from urllib.parse import urlparse, parse_qs
+
import pytest
-from unittest.mock import Mock, patch
-from bs4 import BeautifulSoup
from memory.common.parsers.archives import (
- ArchiveParser,
- WordPressArchiveParser,
- SubstackArchiveParser,
- get_archive_parser,
+ ArchiveFetcher,
+ LinkFetcher,
+ HTMLArchiveFetcher,
+ SubstackArchiveFetcher,
+ ACOUPArchiveFetcher,
+ HTMLNextUrlArchiveFetcher,
+ html_parser,
+ get_archive_fetcher,
+ FETCHER_REGISTRY,
+)
+from memory.common.parsers.feeds import (
+ FeedItem,
+ FeedParser,
+ HTMLListParser,
+ DanluuParser,
+ SubstackAPIParser,
)
-class TestArchiveParser:
- def test_init(self):
- parser = ArchiveParser(url="https://example.com")
- assert parser.url == "https://example.com"
- assert parser._visited_urls == set()
- assert parser._all_items == []
- assert parser.max_pages == 100
- assert parser.delay_between_requests == 1.0
+class MockParser(FeedParser):
+ def __init__(
+ self, url: str, items: list[FeedItem] | None = None, content: str = ""
+ ):
+ super().__init__(url)
+ self.items = items or []
+ self.content = content
- def test_extract_items_from_page(self):
- html = """
-
- """
- soup = BeautifulSoup(html, "html.parser")
- parser = ArchiveParser(url="https://example.com")
-
- items = parser._extract_items_from_page(soup)
- assert len(items) == 2 # Duplicates should be filtered out
-
- def test_find_next_page_url_with_selector(self):
- html = ''
- soup = BeautifulSoup(html, "html.parser")
- parser = ArchiveParser(url="https://example.com")
- parser.next_page_selector = ".next"
-
- next_url = parser._find_next_page_url(soup, "https://example.com/page/1")
- assert next_url == "https://example.com/page/2"
-
- def test_find_next_page_url_heuristic(self):
- html = ''
- soup = BeautifulSoup(html, "html.parser")
- parser = ArchiveParser(url="https://example.com")
-
- next_url = parser._find_next_page_url(soup, "https://example.com/page/1")
- assert next_url == "https://example.com/page/2"
-
- def test_find_next_page_url_contains_text(self):
- html = ''
- soup = BeautifulSoup(html, "html.parser")
- parser = ArchiveParser(url="https://example.com")
-
- next_url = parser._find_next_page_heuristic(soup)
- assert next_url == "https://example.com/page/2"
-
- def test_find_next_numeric_page(self):
- parser = ArchiveParser(url="https://example.com")
- parser.page_url_pattern = "/page/{page}"
-
- # Test with existing page number
- next_url = parser._find_next_numeric_page("https://example.com/page/3")
- assert next_url == "https://example.com/page/4"
-
- # Test without page number (assume page 1)
- next_url = parser._find_next_numeric_page("https://example.com/archive")
- assert next_url == "https://example.com/archive/page/2"
-
- @patch("memory.common.parsers.archives.fetch_html")
- @patch("time.sleep")
- def test_fetch_items_pagination(self, mock_sleep, mock_fetch):
- # Mock HTML for two pages
- page1_html = """
-
- """
- page2_html = """
-
- """
-
- mock_fetch.side_effect = [page1_html, page2_html]
-
- parser = ArchiveParser(url="https://example.com/page/1")
- parser.delay_between_requests = 0.1 # Speed up test
-
- items = parser.fetch_items()
-
- assert len(items) == 4
- assert mock_fetch.call_count == 2
- assert mock_sleep.call_count == 1 # One delay between requests
-
- @patch("memory.common.parsers.archives.fetch_html")
- def test_fetch_items_stops_at_max_pages(self, mock_fetch):
- # Mock HTML that always has a next page
- html_with_next = """
-
- """
-
- mock_fetch.return_value = html_with_next
-
- parser = ArchiveParser(url="https://example.com/page/1")
- parser.max_pages = 3
- parser.delay_between_requests = 0 # No delay for test
-
- items = parser.fetch_items()
-
- assert mock_fetch.call_count == 3 # Should stop at max_pages
-
- @patch("memory.common.parsers.archives.fetch_html")
- def test_fetch_items_handles_duplicate_urls(self, mock_fetch):
- # Mock HTML that creates a cycle
- page1_html = """
-
- """
- page2_html = """
-
- """
-
- mock_fetch.side_effect = [page1_html, page2_html]
-
- parser = ArchiveParser(url="https://example.com/page/1")
- parser.delay_between_requests = 0
-
- items = parser.fetch_items()
-
- assert len(items) == 2
- assert mock_fetch.call_count == 2 # Should stop when it hits visited URL
+ def parse_feed(self):
+ return self.items
-class TestWordPressArchiveParser:
- def test_selectors(self):
- parser = WordPressArchiveParser(url="https://example.wordpress.com")
- assert parser.item_selector == "article, .post"
- assert parser.next_page_selector == '.nav-previous a, .next a, a[rel="next"]'
- assert parser.title_selector == ".entry-title a, h1 a, h2 a"
-
-
-class TestSubstackArchiveParser:
- def test_selectors(self):
- parser = SubstackArchiveParser(url="https://example.substack.com")
- assert parser.item_selector == ".post-preview, .post"
- assert parser.next_page_selector == ".pagination .next"
-
-
-class TestGetArchiveParser:
- @pytest.mark.parametrize(
- "url,expected_class",
- [
- ("https://example.wordpress.com/archive", WordPressArchiveParser),
- ("https://example.substack.com/archive", SubstackArchiveParser),
- ("https://example.com/archive", ArchiveParser), # Default
- ],
+def test_archive_fetcher_make_parser():
+ fetcher = ArchiveFetcher(
+ parser_class=MockParser,
+ start_url="https://example.com",
+ parser_kwargs={"custom_attr": "value"},
)
- def test_get_archive_parser(self, url, expected_class):
- parser = get_archive_parser(url)
- assert isinstance(parser, expected_class)
- assert parser.url == url
+
+ parser = fetcher.make_parser("https://example.com/page1")
+
+ assert isinstance(parser, MockParser)
+ assert parser.url == "https://example.com/page1"
+ assert getattr(parser, "custom_attr") == "value"
+
+
+def test_archive_fetcher_find_next_page_base():
+ fetcher = ArchiveFetcher(MockParser, "https://example.com")
+ parser = MockParser("https://example.com")
+
+ assert fetcher._find_next_page(parser, 0) is None
+
+
+@patch("memory.common.parsers.archives.time.sleep")
+def test_archive_fetcher_fetch_all_items_single_page(mock_sleep):
+ items = [
+ FeedItem(title="Item 1", url="https://example.com/1"),
+ FeedItem(title="Item 2", url="https://example.com/2"),
+ ]
+
+ fetcher = ArchiveFetcher(
+ parser_class=MockParser,
+ start_url="https://example.com",
+ delay_between_requests=0.5,
+ )
+
+ with patch.object(fetcher, "make_parser") as mock_make_parser:
+ mock_parser = MockParser("https://example.com", items)
+ mock_make_parser.return_value = mock_parser
+
+ result = list(fetcher.fetch_all_items())
+
+ assert result == items
+ mock_make_parser.assert_called_once_with("https://example.com")
+ mock_sleep.assert_not_called() # No delay for single page
+
+
+@patch("memory.common.parsers.archives.time.sleep")
+def test_archive_fetcher_fetch_all_items_multiple_pages(mock_sleep):
+ page1_items = [FeedItem(title="Item 1", url="https://example.com/1")]
+ page2_items = [FeedItem(title="Item 2", url="https://example.com/2")]
+
+ class TestFetcher(ArchiveFetcher):
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.call_count = 0
+
+ def _find_next_page(self, parser, current_page=0):
+ self.call_count += 1
+ if self.call_count == 1:
+ return "https://example.com/page2"
+ return None
+
+ fetcher = TestFetcher(
+ parser_class=MockParser,
+ start_url="https://example.com",
+ delay_between_requests=0.1,
+ )
+
+ with patch.object(fetcher, "make_parser") as mock_make_parser:
+ mock_make_parser.side_effect = [
+ MockParser("https://example.com", page1_items),
+ MockParser("https://example.com/page2", page2_items),
+ ]
+
+ result = list(fetcher.fetch_all_items())
+
+ assert result == page1_items + page2_items
+ assert mock_make_parser.call_count == 2
+ mock_sleep.assert_called_once_with(0.1)
+
+
+def test_archive_fetcher_fetch_all_items_max_pages():
+ class TestFetcher(ArchiveFetcher):
+ def _find_next_page(self, parser, current_page=0):
+ return f"https://example.com/page{current_page + 2}"
+
+ fetcher = TestFetcher(
+ parser_class=MockParser,
+ start_url="https://example.com",
+ max_pages=2,
+ delay_between_requests=0,
+ )
+
+ items = [FeedItem(title="Item", url="https://example.com/item")]
+
+ with patch.object(fetcher, "make_parser") as mock_make_parser:
+ mock_make_parser.return_value = MockParser("https://example.com", items)
+
+ result = list(fetcher.fetch_all_items())
+
+ assert len(result) == 2 # 2 pages * 1 item per page
+ assert mock_make_parser.call_count == 2
+
+
+def test_archive_fetcher_fetch_all_items_visited_url():
+ class TestFetcher(ArchiveFetcher):
+ def _find_next_page(self, parser, current_page=0):
+ return "https://example.com" # Return same URL to trigger visited check
+
+ fetcher = TestFetcher(MockParser, "https://example.com", delay_between_requests=0)
+ items = [FeedItem(title="Item", url="https://example.com/item")]
+
+ with patch.object(fetcher, "make_parser") as mock_make_parser:
+ mock_make_parser.return_value = MockParser("https://example.com", items)
+
+ result = list(fetcher.fetch_all_items())
+
+ assert len(result) == 1 # Only first page processed
+ mock_make_parser.assert_called_once()
+
+
+def test_archive_fetcher_fetch_all_items_no_items():
+ fetcher = ArchiveFetcher(
+ MockParser, "https://example.com", delay_between_requests=0
+ )
+
+ with patch.object(fetcher, "make_parser") as mock_make_parser:
+ mock_make_parser.return_value = MockParser("https://example.com", [])
+
+ result = list(fetcher.fetch_all_items())
+
+ assert result == []
+ mock_make_parser.assert_called_once()
+
+
+def test_archive_fetcher_fetch_all_items_exception():
+ fetcher = ArchiveFetcher(
+ MockParser, "https://example.com", delay_between_requests=0
+ )
+
+ with patch.object(fetcher, "make_parser") as mock_make_parser:
+ mock_make_parser.side_effect = Exception("Network error")
+
+ result = list(fetcher.fetch_all_items())
+
+ assert result == []
+
+
+@pytest.mark.parametrize(
+ "start_url, per_page, current_page, expected_params",
+ [
+ ("https://example.com", 10, 0, {"offset": ["10"], "limit": ["10"]}),
+ (
+ "https://example.com?existing=value",
+ 20,
+ 1,
+ {"existing": ["value"], "offset": ["40"], "limit": ["20"]},
+ ),
+ (
+ "https://example.com?offset=0&limit=5",
+ 15,
+ 2,
+ {"offset": ["45"], "limit": ["15"]},
+ ),
+ ],
+)
+def test_link_fetcher_find_next_page(
+ start_url, per_page, current_page, expected_params
+):
+ fetcher = LinkFetcher(MockParser, start_url, per_page=per_page)
+ parser = MockParser(start_url)
+
+ next_url = fetcher._find_next_page(parser, current_page)
+
+ assert next_url is not None
+ parsed = urlparse(next_url)
+ params = parse_qs(parsed.query)
+
+ for key, value in expected_params.items():
+ assert params[key] == value
+
+
+@pytest.mark.parametrize(
+ "html, selectors, expected_url",
+ [
+        (
+            '<a rel="next" href="https://example.com/page2">Next</a>',
+            ['a[rel="next"]'],
+            "https://example.com/page2",
+        ),
+        (
+            '<div class="next"><a href="https://example.com/page2">Next</a></div>',
+            [".next a"],
+            "https://example.com/page2",
+        ),
+        (
+            '<a class="next" href="https://example.com/page2">Next</a>',
+            ["a.next"],
+            "https://example.com/page2",
+        ),
+        (
+            '<div class="pagination next"><a href="https://example.com/page2">Next</a></div>',
+            [".pagination .next"],
+            None,  # This won't match because it's looking for .pagination .next directly
+        ),
+        (
+            '<div class="pagination"><a class="next" href="https://example.com/page2">Next</a></div>',
+            [".pagination.next"],
+            None,  # This selector isn't in default list
+        ),
+        (
+            '<nav class="page"><a href="https://example.com/page1">1</a> <a href="https://example.com/page2">2</a></nav>',
+            ["nav.page a:last-of-type"],
+            "https://example.com/page2",
+        ),
+        ("<p>No next link</p>", ['a[rel="next"]'], None),
+ ],
+)
+def test_html_archive_fetcher_find_next_page(html, selectors, expected_url):
+ fetcher = HTMLArchiveFetcher(
+ MockParser, "https://example.com", next_page_selectors=selectors
+ )
+ parser = MockParser("https://example.com", content=html)
+
+ with patch("memory.common.parsers.archives.extract_url") as mock_extract:
+ mock_extract.return_value = expected_url
+
+ result = fetcher._find_next_page(parser)
+
+ if expected_url:
+ mock_extract.assert_called_once()
+ assert result == expected_url
+ else:
+ # extract_url might still be called but return None
+ assert result is None
+
+
+def test_html_archive_fetcher_find_next_page_no_content():
+ fetcher = HTMLArchiveFetcher(MockParser, "https://example.com")
+ parser = MockParser("https://example.com", content="")
+
+ result = fetcher._find_next_page(parser)
+
+ assert result is None
+
+
+def test_html_parser_factory():
+ CustomParser = html_parser(
+ item_selector="article", title_selector="h1", custom_attr="value"
+ )
+
+ parser = CustomParser("https://example.com")
+
+ assert isinstance(parser, HTMLListParser)
+ assert parser.item_selector == "article"
+ assert parser.title_selector == "h1"
+ assert getattr(parser, "custom_attr") == "value"
+
+
+@pytest.mark.parametrize(
+ "start_url, expected_api_url",
+ [
+ ("https://example.substack.com", "https://example.substack.com/api/v1/archive"),
+ (
+ "https://example.substack.com/posts",
+ "https://example.substack.com/api/v1/archive",
+ ),
+ (
+ "https://example.substack.com/api/v1/archive",
+ "https://example.substack.com/api/v1/archive",
+ ),
+ ],
+)
+def test_substack_archive_fetcher_post_init(start_url, expected_api_url):
+ with patch("memory.common.parsers.archives.get_base_url") as mock_get_base:
+ mock_get_base.return_value = "https://example.substack.com"
+
+ fetcher = SubstackArchiveFetcher(SubstackAPIParser, start_url)
+
+ assert fetcher.start_url == expected_api_url
+
+
+def test_acoup_archive_fetcher_find_next_page():
+ html = """
+
+ """
+
+ fetcher = ACOUPArchiveFetcher(MockParser, "https://acoup.blog/2019/05/")
+ parser = MockParser("https://acoup.blog/2019/05/", content=html)
+
+ result = fetcher._find_next_page(parser)
+
+ assert result == "https://acoup.blog/2019/04/"
+
+
+def test_acoup_archive_fetcher_find_next_page_no_match():
+ html = """
+
+ """
+
+ fetcher = ACOUPArchiveFetcher(MockParser, "https://acoup.blog/2019/05/")
+ parser = MockParser("https://acoup.blog/2019/05/", content=html)
+
+ result = fetcher._find_next_page(parser)
+
+ assert result is None
+
+
+def test_acoup_archive_fetcher_find_next_page_no_content():
+ fetcher = ACOUPArchiveFetcher(MockParser, "https://acoup.blog/2019/05/")
+ parser = MockParser("https://acoup.blog/2019/05/", content="")
+
+ result = fetcher._find_next_page(parser)
+
+ assert result is None
+
+
+@pytest.mark.parametrize(
+ "start_url, next_url, expected_next_url",
+ [
+ (
+ "https://example.com",
+ "",
+ "https://example.com",
+ ), # Empty next_url defaults to start_url
+ (
+ "https://example.com",
+ "https://other.com/archive",
+ "https://other.com/archive", # Full URL is preserved
+ ),
+ (
+ "https://example.com",
+ "/archive",
+ "/archive",
+ ), # Absolute path is preserved
+ (
+ "https://example.com",
+ "archive",
+ "https://example.com/archive",
+ ), # Relative path gets prepended
+ ],
+)
+def test_html_next_url_archive_fetcher_post_init(
+ start_url, next_url, expected_next_url
+):
+ fetcher = HTMLNextUrlArchiveFetcher(MockParser, start_url, next_url=next_url)
+
+ assert fetcher.next_url == expected_next_url
+
+
+def test_html_next_url_archive_fetcher_find_next_page():
+ fetcher = HTMLNextUrlArchiveFetcher(
+ MockParser, "https://example.com", next_url="https://example.com/archive"
+ )
+ parser = MockParser("https://example.com")
+
+ result = fetcher._find_next_page(parser, 2)
+
+ assert result == "https://example.com/archive/3"
+
+
+@pytest.mark.parametrize(
+ "url, expected_fetcher_type",
+ [
+ ("https://danluu.com", HTMLArchiveFetcher),
+ ("https://www.rifters.com", HTMLArchiveFetcher),
+ ("https://putanumonit.com", HTMLArchiveFetcher),
+ ("https://acoup.blog", ACOUPArchiveFetcher),
+ ("https://unknown.com", None),
+ ],
+)
+def test_get_archive_fetcher_registry_matches(url, expected_fetcher_type):
+ with patch("memory.common.parsers.archives.fetch_html") as mock_fetch:
+ mock_fetch.return_value = "Not substack"
+
+ with patch("memory.common.parsers.archives.is_substack") as mock_is_substack:
+ mock_is_substack.return_value = False
+
+ fetcher = get_archive_fetcher(url)
+
+ if expected_fetcher_type:
+ assert isinstance(fetcher, expected_fetcher_type)
+ else:
+ assert fetcher is None
+
+
+def test_get_archive_fetcher_tuple_registry():
+ url = "https://putanumonit.com"
+
+ with patch("memory.common.parsers.archives.fetch_html") as mock_fetch:
+ mock_fetch.return_value = "Not substack"
+
+ fetcher = get_archive_fetcher(url)
+
+ assert isinstance(fetcher, HTMLArchiveFetcher)
+ assert fetcher.start_url == "https://putanumonit.com/full-archive"
+
+
+def test_get_archive_fetcher_direct_parser_registry():
+ url = "https://danluu.com"
+
+ with patch("memory.common.parsers.archives.fetch_html") as mock_fetch:
+ mock_fetch.return_value = "Not substack"
+
+ fetcher = get_archive_fetcher(url)
+
+ assert isinstance(fetcher, HTMLArchiveFetcher)
+ assert fetcher.parser_class == DanluuParser
+ assert fetcher.start_url == url
+
+
+def test_get_archive_fetcher_substack():
+ url = "https://example.substack.com"
+
+ with patch("memory.common.parsers.archives.fetch_html") as mock_fetch:
+ mock_fetch.return_value = "Substack content"
+
+ with patch("memory.common.parsers.archives.is_substack") as mock_is_substack:
+ mock_is_substack.return_value = True
+
+ fetcher = get_archive_fetcher(url)
+
+ assert isinstance(fetcher, SubstackArchiveFetcher)
+ assert fetcher.parser_class == SubstackAPIParser
+
+
+def test_get_archive_fetcher_no_match():
+ url = "https://unknown.com"
+
+ with patch("memory.common.parsers.archives.fetch_html") as mock_fetch:
+ mock_fetch.return_value = "Regular website"
+
+ with patch("memory.common.parsers.archives.is_substack") as mock_is_substack:
+ mock_is_substack.return_value = False
+
+ fetcher = get_archive_fetcher(url)
+
+ assert fetcher is None
+
+
+def test_fetcher_registry_structure():
+ """Test that FETCHER_REGISTRY has expected structure."""
+ assert isinstance(FETCHER_REGISTRY, dict)
+
+ for pattern, fetcher in FETCHER_REGISTRY.items():
+ assert isinstance(pattern, str)
+ assert (
+ isinstance(fetcher, type)
+ and issubclass(fetcher, FeedParser)
+ or isinstance(fetcher, tuple)
+ or isinstance(fetcher, ArchiveFetcher)
+ )
+
+
+@pytest.mark.parametrize(
+ "pattern, test_url, should_match",
+ [
+ (r"https://danluu.com", "https://danluu.com", True),
+ (r"https://danluu.com", "https://danluu.com/", True),
+ (r"https://danluu.com", "https://other.com", False),
+ (r"https://www.rifters.com", "https://www.rifters.com/crawl", True),
+ (r"https://putanumonit.com", "https://putanumonit.com/archive", True),
+ ],
+)
+def test_registry_pattern_matching(pattern, test_url, should_match):
+ import re
+
+ match = re.search(pattern, test_url.rstrip("/"))
+ assert bool(match) == should_match
diff --git a/tests/memory/common/parsers/test_feeds.py b/tests/memory/common/parsers/test_feeds.py
index b1d30b6..b8cf541 100644
--- a/tests/memory/common/parsers/test_feeds.py
+++ b/tests/memory/common/parsers/test_feeds.py
@@ -1,10 +1,10 @@
from datetime import datetime
from unittest.mock import MagicMock, patch
-from typing import Any, cast
+from typing import cast
+import json
import pytest
from bs4 import BeautifulSoup, Tag
-import requests
from memory.common.parsers.feeds import (
FeedItem,
@@ -17,15 +17,160 @@ from memory.common.parsers.feeds import (
NadiaXyzParser,
RedHandFilesParser,
BloombergAuthorParser,
+ JSONParser,
+ SubstackAPIParser,
+ select_in,
+ clean_url,
is_rss_feed,
- extract_url,
find_feed_link,
get_feed_parser,
- DEFAULT_SKIP_PATTERNS,
- PARSER_REGISTRY,
)
+@pytest.mark.parametrize(
+ "data, path, expected",
+ [
+ # Basic dictionary access
+ ({"key": "value"}, ["key"], "value"),
+ ({"nested": {"key": "value"}}, ["nested", "key"], "value"),
+ # List access
+ (["a", "b", "c"], [1], "b"),
+ ([{"key": "value"}], [0, "key"], "value"),
+ # Mixed access
+ (
+ {"items": [{"name": "first"}, {"name": "second"}]},
+ ["items", 1, "name"],
+ "second",
+ ),
+ # Empty path returns original data
+ ({"key": "value"}, [], {"key": "value"}),
+ # Missing keys return None
+ ({"key": "value"}, ["missing"], None),
+ ({"nested": {}}, ["nested", "missing"], None),
+ # Index out of bounds returns None
+ (["a", "b"], [5], None),
+ # Type errors return None
+ ("string", ["key"], None),
+ (123, [0], None),
+ (None, ["key"], None),
+ # Deep nesting
+ ({"a": {"b": {"c": {"d": "deep"}}}}, ["a", "b", "c", "d"], "deep"),
+ ],
+)
+def test_select_in(data, path, expected):
+ assert select_in(data, path) == expected
+
+
+@patch("memory.common.parsers.feeds.fetch_html")
+def test_json_parser_fetch_items_with_content(mock_fetch_html):
+ content = json.dumps(
+ [
+ {"title": "Article 1", "url": "https://example.com/1"},
+ {"title": "Article 2", "url": "https://example.com/2"},
+ ]
+ )
+
+ parser = JSONParser(url="https://example.com/feed.json", content=content)
+ items = parser.fetch_items()
+
+ assert items == [
+ {"title": "Article 1", "url": "https://example.com/1"},
+ {"title": "Article 2", "url": "https://example.com/2"},
+ ]
+ mock_fetch_html.assert_not_called()
+
+
+@patch("memory.common.parsers.feeds.fetch_html")
+def test_json_parser_fetch_items_without_content(mock_fetch_html):
+ content = json.dumps([{"title": "Article", "url": "https://example.com/1"}])
+ mock_fetch_html.return_value = content
+
+ parser = JSONParser(url="https://example.com/feed.json")
+ items = parser.fetch_items()
+
+ assert items == [{"title": "Article", "url": "https://example.com/1"}]
+ mock_fetch_html.assert_called_once_with("https://example.com/feed.json")
+
+
+@patch("memory.common.parsers.feeds.fetch_html")
+def test_json_parser_fetch_items_invalid_json(mock_fetch_html):
+ mock_fetch_html.return_value = "invalid json content"
+
+ parser = JSONParser(url="https://example.com/feed.json")
+ items = parser.fetch_items()
+
+ assert items == []
+
+
+def test_json_parser_extract_methods():
+ parser = JSONParser(url="https://example.com")
+
+ entry = {
+ "title": "Test Title",
+ "url": "https://example.com/article",
+ "description": "Test description",
+ "date": "2023-01-15",
+ "author": "John Doe",
+ "guid": "unique-123",
+ "metadata": {"tags": ["tech", "news"]},
+ }
+
+ assert parser.extract_title(entry) == "Test Title"
+ assert parser.extract_url(entry) == "https://example.com/article"
+ assert parser.extract_description(entry) == "Test description"
+ assert parser.extract_date(entry) == "2023-01-15"
+ assert parser.extract_author(entry) == "John Doe"
+ assert parser.extract_guid(entry) == "unique-123"
+ assert parser.extract_metadata(entry) == {"tags": ["tech", "news"]}
+
+
+def test_json_parser_custom_paths():
+ parser = JSONParser(url="https://example.com")
+ parser.title_path = ["content", "headline"]
+ parser.url_path = ["links", "canonical"]
+ parser.author_path = ["byline", "name"]
+
+ entry = {
+ "content": {"headline": "Custom Title"},
+ "links": {"canonical": "https://example.com/custom"},
+ "byline": {"name": "Jane Smith"},
+ }
+
+ assert parser.extract_title(entry) == "Custom Title"
+ assert parser.extract_url(entry) == "https://example.com/custom"
+ assert parser.extract_author(entry) == "Jane Smith"
+
+
+def test_json_parser_missing_fields():
+ parser = JSONParser(url="https://example.com")
+
+ entry = {} # Empty entry
+
+ assert parser.extract_title(entry) is None
+ assert parser.extract_url(entry) is None
+ assert parser.extract_description(entry) is None
+ assert parser.extract_date(entry) is None
+ assert parser.extract_author(entry) is None
+ assert parser.extract_guid(entry) is None
+ assert parser.extract_metadata(entry) is None
+
+
+def test_json_parser_nested_paths():
+ parser = JSONParser(url="https://example.com")
+ parser.title_path = ["article", "header", "title"]
+ parser.author_path = ["article", "byline", 0, "name"]
+
+ entry = {
+ "article": {
+ "header": {"title": "Nested Title"},
+ "byline": [{"name": "First Author"}, {"name": "Second Author"}],
+ }
+ }
+
+ assert parser.extract_title(entry) == "Nested Title"
+ assert parser.extract_author(entry) == "First Author"
+
+
def test_feed_parser_base_url():
parser = FeedParser(url="https://example.com/path/to/feed")
assert parser.base_url == "https://example.com"
@@ -582,7 +727,7 @@ def test_extract_url_function(html, expected):
element = soup.find("a")
assert element is not None
- url = extract_url(cast(Tag, element), "https://example.com")
+ url = clean_url(cast(Tag, element), "https://example.com")
assert url == expected
@@ -706,33 +851,30 @@ def test_get_feed_parser_with_check_from():
assert parser.since == check_from
-def test_parser_registry_completeness():
- """Ensure PARSER_REGISTRY contains expected parsers."""
- expected_patterns = [
- r"https://danluu.com",
- r"https://guzey.com/archive",
- r"https://www.paulgraham.com/articles",
- r"https://nadia.xyz/posts",
- r"https://www.theredhandfiles.com",
- r"https://archive.ph/.*?/https://www.bloomberg.com/opinion/authors/",
- ]
+def test_substack_api_parser():
+ parser = SubstackAPIParser(url="https://example.substack.com/api/v1/posts")
- assert len(PARSER_REGISTRY) == len(expected_patterns)
- for pattern in expected_patterns:
- assert pattern in PARSER_REGISTRY
+ entry = {
+ "title": "Substack Post",
+ "canonical_url": "https://example.substack.com/p/post-slug",
+ "publishedBylines": [{"name": "Author Name"}],
+ "post_date": "2023-01-15T10:30:00Z",
+ }
+
+ assert parser.extract_title(entry) == "Substack Post"
+ assert parser.extract_url(entry) == "https://example.substack.com/p/post-slug"
+ assert parser.extract_author(entry) == "Author Name"
+ assert parser.extract_date(entry) == "2023-01-15T10:30:00Z"
-def test_default_skip_patterns():
- """Ensure DEFAULT_SKIP_PATTERNS contains expected patterns."""
- expected_patterns = [
- r"^#",
- r"mailto:",
- r"tel:",
- r"javascript:",
- r"\.pdf$",
- r"\.jpg$",
- r"\.png$",
- r"\.gif$",
- ]
+def test_substack_api_parser_missing_bylines():
+ parser = SubstackAPIParser(url="https://example.substack.com/api/v1/posts")
- assert DEFAULT_SKIP_PATTERNS == expected_patterns
+ entry = {
+ "title": "Post Without Author",
+ "canonical_url": "https://example.substack.com/p/post",
+ "publishedBylines": [],
+ "post_date": "2023-01-15T10:30:00Z",
+ }
+
+ assert parser.extract_author(entry) is None
diff --git a/tests/memory/common/parsers/test_html.py b/tests/memory/common/parsers/test_html.py
index dbba9ca..44262e1 100644
--- a/tests/memory/common/parsers/test_html.py
+++ b/tests/memory/common/parsers/test_html.py
@@ -23,7 +23,11 @@ from memory.common.parsers.html import (
extract_meta_by_pattern,
extract_metadata,
extract_title,
+ extract_url,
get_base_url,
+ is_bloomberg,
+ is_substack,
+ is_wordpress,
parse_date,
process_image,
process_images,
@@ -454,7 +458,7 @@ def test_process_images_empty():
None, "https://example.com", pathlib.Path("/tmp")
)
assert result_content is None
- assert result_images == []
+ assert result_images == {}
@patch("memory.common.parsers.html.process_image")
@@ -503,6 +507,191 @@ def test_process_images_no_filename(mock_process_image):
assert not images
+@pytest.mark.parametrize(
+ "html, selectors, base_url, expected",
+ [
+        # Basic URL extraction
+        (
+            '<a href="/next-page">Next</a>',
+            "a",
+            "https://example.com",
+            "https://example.com/next-page",
+        ),
+        # Multiple selectors - should pick first matching
+        (
+            '<a href="/first">First</a><a href="/second">Second</a>',
+            "a",
+            "https://example.com",
+            "https://example.com/first",
+        ),
+        # Multiple selectors with comma separation - span doesn't have href, so falls back to a
+        (
+            '<span class="next">No link</span><a href="/link">Link</a>',
+            ".next, a",
+            "https://example.com",
+            "https://example.com/link",
+        ),
+        # Absolute URL should remain unchanged
+        (
+            '<a href="https://other.com/page">External</a>',
+            "a",
+            "https://example.com",
+            "https://other.com/page",
+        ),
+        # No href attribute
+        ("<a>No href</a>", "a", "https://example.com", None),
+        # No matching element
+        ("<p>No links</p>", "a", "https://example.com", None),
+        # Empty href
+        ('<a href="">Empty</a>', "a", "https://example.com", None),
+ ],
+)
+def test_extract_url(html, selectors, base_url, expected):
+ soup = BeautifulSoup(html, "html.parser")
+ assert extract_url(soup, selectors, base_url) == expected
+
+
+@pytest.mark.parametrize(
+ "html, expected",
+ [
+        # Substack with preconnect link
+        (
+            """
+            <head>
+                <link rel="preconnect" href="https://substackcdn.com">
+            </head>
+            """,
+            True,
+        ),
+        # Multiple preconnect links, one is Substack
+        (
+            """
+            <head>
+                <link rel="preconnect" href="https://fonts.googleapis.com">
+                <link rel="preconnect" href="https://substackcdn.com">
+            </head>
+            """,
+            True,
+        ),
+        # No Substack preconnect
+        (
+            """
+            <head>
+                <link rel="preconnect" href="https://fonts.googleapis.com">
+            </head>
+            """,
+            False,
+        ),
+        # No preconnect links at all
+        ("<head><title>No links</title></head>", False),
+        # Preconnect without href
+        ('<link rel="preconnect">', False),
+        # Different rel attribute
+        ('<link rel="stylesheet" href="https://substackcdn.com">', False),
+ ],
+)
+def test_is_substack(html, expected):
+ soup = BeautifulSoup(html, "html.parser")
+ assert is_substack(soup) == expected
+
+
+@pytest.mark.parametrize(
+ "html, expected",
+ [
+        # WordPress with wp-singular class on body should be False (looks for content inside body)
+        ('<body class="wp-singular">Content</body>', False),
+        # WordPress with nested wp-singular
+        ('<body><div class="wp-singular">Content</div></body>', True),
+        # Archived page with WordPress content
+        (
+            """
+            <div id="CONTENT"><div class="html"><div class="wp-singular">Content</div></div></div>
+            """,
+            True,
+        ),
+        # No WordPress indicators
+        ('<body><p>Regular content</p></body>', False),
+        # Empty body
+        ("<body></body>", False),
+        # No body tag
+        ("<div>No body</div>", False),
+ ],
+)
+def test_is_wordpress(html, expected):
+ soup = BeautifulSoup(html, "html.parser")
+ assert is_wordpress(soup) == expected
+
+
+@pytest.mark.parametrize(
+ "html, expected",
+ [
+        # Bloomberg with company link
+        (
+            """
+            <body>
+                <a href="https://www.bloomberg.com/company/">Bloomberg</a>
+            </body>
+            """,
+            True,
+        ),
+        # Bloomberg link among other links
+        (
+            """
+            <body>
+                <a href="https://example.com">Example</a>
+                <a href="https://www.bloomberg.com/company/">Bloomberg</a>
+                <a href="https://other.com">Other</a>
+            </body>
+            """,
+            True,
+        ),
+        # Archived page with Bloomberg content
+        (
+            """
+            <div id="CONTENT"><div class="html"><a href="https://www.bloomberg.com/company/">Bloomberg</a></div></div>
+            """,
+            True,
+        ),
+        # No Bloomberg links
+        (
+            """
+            <body>
+                <a href="https://example.com">Example</a>
+                <a href="https://other.com">Other</a>
+            </body>
+            """,
+            False,
+        ),
+        # Bloomberg link but not company page
+        (
+            """
+            <body>
+                <a href="https://www.bloomberg.com/news/">Bloomberg News</a>
+            </body>
+            """,
+            False,
+        ),
+        # No links at all
+        ("<body><p>No links</p></body>", False),
+        # Links without href
+        ("<body><a>No href</a></body>", False),
+ ],
+)
+def test_is_bloomberg(html, expected):
+ soup = BeautifulSoup(html, "html.parser")
+ assert is_bloomberg(soup) == expected
+
+
class TestBaseHTMLParser:
def test_init_with_base_url(self):
parser = BaseHTMLParser("https://example.com/path")
@@ -584,7 +773,7 @@ class TestBaseHTMLParser:
def test_parse_with_images(self, mock_process_images):
# Mock the image processing to return test data
mock_image = MagicMock(spec=PILImage.Image)
- mock_process_images.return_value = (MagicMock(), [mock_image])
+ mock_process_images.return_value = (MagicMock(), {"test_image.jpg": mock_image})
html = """
@@ -600,5 +789,6 @@ class TestBaseHTMLParser:
article = parser.parse(html, "https://example.com/article")
assert len(article.images) == 1
- assert article.images[0] == mock_image
+ assert "test_image.jpg" in article.images
+ assert article.images["test_image.jpg"] == mock_image
mock_process_images.assert_called_once()