From 876fa8772564465c157a250c64a9c94150432ff6 Mon Sep 17 00:00:00 2001 From: Daniel O'Connell Date: Tue, 27 May 2025 01:24:57 +0200 Subject: [PATCH] Add archives fetcher --- src/memory/common/parsers/archives.py | 301 +++++++++ src/memory/common/parsers/blogs.py | 33 +- src/memory/common/parsers/feeds.py | 80 ++- src/memory/common/parsers/html.py | 50 +- tests/memory/common/parsers/test_archives.py | 668 ++++++++++++++----- tests/memory/common/parsers/test_feeds.py | 206 +++++- tests/memory/common/parsers/test_html.py | 196 +++++- 7 files changed, 1306 insertions(+), 228 deletions(-) create mode 100644 src/memory/common/parsers/archives.py diff --git a/src/memory/common/parsers/archives.py b/src/memory/common/parsers/archives.py new file mode 100644 index 0000000..afc0cb4 --- /dev/null +++ b/src/memory/common/parsers/archives.py @@ -0,0 +1,301 @@ +from dataclasses import dataclass, field +import logging +import re +import time +from urllib.parse import parse_qs, urlencode, urlparse, urlunparse +from typing import Generator, cast + +from bs4 import BeautifulSoup +from memory.common.parsers.blogs import is_substack + +from memory.common.parsers.feeds import ( + DanluuParser, + HTMLListParser, + RiftersParser, + FeedItem, + FeedParser, + SubstackAPIParser, +) +from memory.common.parsers.html import ( + fetch_html, + extract_url, + get_base_url, +) + +logger = logging.getLogger(__name__) + + +@dataclass +class ArchiveFetcher: + """Fetches complete backlogs from sites with pagination.""" + + parser_class: type[FeedParser] + start_url: str + max_pages: int = 100 + delay_between_requests: float = 1.0 + parser_kwargs: dict = field(default_factory=dict) + + def make_parser(self, url: str) -> FeedParser: + parser = self.parser_class(url=url) + for key, value in self.parser_kwargs.items(): + setattr(parser, key, value) + return parser + + def fetch_all_items(self) -> Generator[FeedItem, None, None]: + """Fetch all items from all pages.""" + visited_urls = set() + current_url = self.start_url + page_count = 0 + total_items = 0 + + while current_url and page_count < self.max_pages: + if current_url in visited_urls: + logger.warning(f"Already visited {current_url}, stopping") + break + + logger.info(f"Fetching page {page_count + 1}: {current_url}") + visited_urls.add(current_url) + + try: + parser = self.make_parser(current_url) + + items = parser.parse_feed() + if not items: + break + + prev_items = total_items + for item in items: + total_items += 1 + yield item + + if prev_items == total_items: + logger.warning(f"No new items found on page {page_count + 1}") + break + + current_url = self._find_next_page(parser, page_count) + if not current_url: + logger.info("No more pages found") + break + + page_count += 1 + + if self.delay_between_requests > 0: + time.sleep(self.delay_between_requests) + + except Exception as e: + logger.error(f"Error processing {current_url}: {e}") + break + + def _find_next_page(self, parser: FeedParser, current_page: int = 0) -> str | None: + return None + + +@dataclass +class LinkFetcher(ArchiveFetcher): + per_page: int = 10 + + def _find_next_page(self, parser: FeedParser, current_page: int = 0): + next_page = current_page + 1 + parsed = urlparse(self.start_url) + params = parse_qs(parsed.query) + params["offset"] = [str(next_page * self.per_page)] + params["limit"] = [str(self.per_page)] + + new_query = urlencode(params, doseq=True) + return urlunparse(parsed._replace(query=new_query)) + + +@dataclass +class HTMLArchiveFetcher(ArchiveFetcher): + next_page_selectors: 
list[str] = field( + default_factory=lambda: [ + 'a[rel="next"]', + ".next a", + "a.next", + ".pagination .next", + ".pager .next", + "nav.page a:last-of-type", + ".navigation a:last-of-type", + ] + ) + + def _find_next_page(self, parser: FeedParser, current_page: int = 0) -> str | None: + if not parser.content: + return None + soup = BeautifulSoup(parser.content, "html.parser") + selectors = ",".join(self.next_page_selectors) + return extract_url(soup, selectors, parser.url) + + +def html_parser(**kwargs) -> type[HTMLListParser]: + class ConfiguredHTMLListParser(HTMLListParser): + def __init__(self, url: str): + super().__init__(url) + for key, value in kwargs.items(): + setattr(self, key, value) + + return ConfiguredHTMLListParser + + +@dataclass +class SubstackArchiveFetcher(LinkFetcher): + def __post_init__(self): + if "api/v1/archive" not in self.start_url: + base_url = get_base_url(self.start_url) + self.start_url = f"{base_url}/api/v1/archive" + + +@dataclass +class ACOUPArchiveFetcher(HTMLArchiveFetcher): + def _find_next_page(self, parser: FeedParser, current_page: int = 0) -> str | None: + if not parser.content: + return None + soup = BeautifulSoup(parser.content, "html.parser") + urls = reversed([i.attrs.get("href") for i in soup.select(".widget_archive a")]) + urls = (cast(str, u) for u in urls if u) + for url in urls: + if url.rstrip("/") == parser.url.rstrip("/"): + return next(urls, None) + + +@dataclass +class HTMLNextUrlArchiveFetcher(HTMLArchiveFetcher): + next_url: str = "" + + def __post_init__(self): + if not self.next_url: + self.next_url = self.start_url + if not self.next_url.startswith("http") and not self.next_url.startswith("/"): + self.next_url = f"{self.start_url}/{self.next_url}" + + def _find_next_page(self, parser: FeedParser, current_page: int = 0) -> str | None: + return f"{self.next_url}/{current_page + 1}" + + +FETCHER_REGISTRY = { + r"https://putanumonit.com": ( + "https://putanumonit.com/full-archive", + html_parser( + item_selector="article p", title_selector="a strong", url_selector="a" + ), + ), + r"https://danluu.com": DanluuParser, + r"https://www.rifters.com": RiftersParser, + r"https://rachelbythebay.com": html_parser( + item_selector="div.post", + url_selector="a", + ), + r"https://guzey.com": ( + "https://guzey.com/archive/", + html_parser(item_selector="article li"), + ), + r"https://aphyr.com": html_parser( + item_selector="article.post", + title_selector="h1", + url_selector="h1 a", + description_selector=".body", + date_selector=".meta time", + ), + r"https://www.applieddivinitystudies.com": html_parser( + item_selector="article.article", + title_selector="header.article-header h1", + url_selector="header.article-header h1 a", + description_selector=".article-entry", + date_selector=".article-meta time", + ), + r"https://www.flyingmachinestudios.com": html_parser( + item_selector="#main #articles li", + title_selector="header .title", + description_selector="p", + date_selector="header .date", + date_format="%d %B %Y", + ), + r"https://slimemoldtimemold.com": html_parser( + item_selector="article .wp-block-list li", title_selector="a" + ), + r"https://www.paulgraham.com": ( + "https://www.paulgraham.com/articles.html", + html_parser(item_selector="img + font"), + ), + r"https://slatestarcodex.com": ( + "https://slatestarcodex.com/archives/", + html_parser(item_selector="#sya_container li"), + ), + r"https://mcfunley.com": ( + "https://mcfunley.com/writing", + html_parser(item_selector="article", title_selector="h6"), + ), + 
r"https://www.bitsaboutmoney.com": HTMLArchiveFetcher( + html_parser( + item_selector="article", + title_selector="h1", + description_selector="p", + date_selector="time", + ), + "https://www.bitsaboutmoney.com/archive/", + next_page_selectors=["nav.pagination a.older-posts"], + ), + r"https://acoup.blog": ACOUPArchiveFetcher( + html_parser( + item_selector="article", + title_selector="a", + description_selector=".entry-content", + date_selector=".published-on time", + ), + "https://acoup.blog/2019/05/", + ), + r"https://www.theredhandfiles.com": html_parser( + item_selector="article", title_selector="h3", description_selector="h2" + ), +} + + +def get_archive_fetcher(url: str) -> ArchiveFetcher | None: + for pattern, fetcher in FETCHER_REGISTRY.items(): + if re.search(pattern, url.rstrip("/")): + if isinstance(fetcher, ArchiveFetcher): + return fetcher + elif isinstance(fetcher, tuple): + base_url, html_fetcher = fetcher + return HTMLArchiveFetcher(html_fetcher, base_url) + else: + return HTMLArchiveFetcher(fetcher, url) + + html = fetch_html(url) + soup = BeautifulSoup(html, "html.parser") + if is_substack(soup): + return SubstackArchiveFetcher(SubstackAPIParser, url) + + +feeds = [ + "https://archive.ph/o/IQUoT/https://www.bloomberg.com/opinion/authors/ARbTQlRLRjE/matthew-s-levine", + "https://www.rifters.com/crawl/", + "https://rachelbythebay.com/w/", + "https://danluu.com/", + "https://guzey.com", + "https://aphyr.com/", + "https://www.applieddivinitystudies.com/", + "https://www.imightbewrong.org/", + "https://www.kvetch.au/", + "https://www.overcomingbias.com/", + "https://samkriss.substack.com/", + "https://www.richardhanania.com/", + "https://skunkledger.substack.com/", + "https://taipology.substack.com/", + "https://putanumonit.com/", + "https://www.flyingmachinestudios.com/", + "https://www.theintrinsicperspective.com/", + "https://www.strangeloopcanon.com/", + "https://slimemoldtimemold.com/", + "https://zeroinputagriculture.substack.com/", + "https://nayafia.substack.com", + "https://www.paulgraham.com/articles.html", + "https://mcfunley.com/writing", + "https://www.bitsaboutmoney.com/", + "https://akarlin.com", + "https://www.exurbe.com/", + "https://acoup.blog/", + "https://www.theredhandfiles.com/", + "https://karlin.blog/", + "https://slatestarcodex.com/", +] diff --git a/src/memory/common/parsers/blogs.py b/src/memory/common/parsers/blogs.py index 1fb0ad4..021e718 100644 --- a/src/memory/common/parsers/blogs.py +++ b/src/memory/common/parsers/blogs.py @@ -13,6 +13,9 @@ from memory.common.parsers.html import ( extract_title, extract_date, fetch_html, + is_wordpress, + is_substack, + is_bloomberg, ) @@ -587,24 +590,13 @@ def get_parser_for_url(url: str, html: str) -> BaseHTMLParser: return parser_class(url) soup = BeautifulSoup(html, "html.parser") - body_select = "body" - # Check if this is an archived page - if contents := soup.select_one("#CONTENT .html"): - body_select = ".body" - soup = contents - - if soup.select_one(f"{body_select} .wp-singular"): + if is_wordpress(soup): return WordPressParser(url) - if any( - "https://substackcdn.com" == a.attrs.get("href") # type: ignore - for a in soup.find_all("link", {"rel": "preconnect"}) - if hasattr(a, "attrs") # type: ignore - ): + if is_substack(soup): return SubstackParser(url) - urls = [a.attrs.get("href") for a in soup.select(f"{body_select} a")] # type: ignore - if any(u.endswith("https://www.bloomberg.com/company/") for u in urls[:5] if u): # type: ignore + if is_bloomberg(soup): return BloombergParser(url) return 
BaseHTMLParser(url) @@ -628,11 +620,11 @@ def parse_webpage(url: str) -> Article: feeds = [ "https://archive.ph/o/IQUoT/https://www.bloomberg.com/opinion/authors/ARbTQlRLRjE/matthew-s-levine", "https://www.rifters.com/crawl/", - "https://rachelbythebay.com/w/atom.xml", + "https://rachelbythebay.com/w/", "https://danluu.com/", - "https://guzey.com/archive", - "https://aphyr.com/posts.atom", - "https://www.applieddivinitystudies.com/atom.xml", + "https://guzey.come", + "https://aphyr.com/", + "https://www.applieddivinitystudies.com/", "https://www.imightbewrong.org/", "https://www.kvetch.au/", "https://www.overcomingbias.com/", @@ -649,9 +641,10 @@ feeds = [ "https://nayafia.substack.com", "https://www.paulgraham.com/articles.html", "https://mcfunley.com/writing", - "https://www.bitsaboutmoney.com/archive/", - "https://akarlin.com/archive/", + "https://www.bitsaboutmoney.com/", + "https://akarlin.com", "https://www.exurbe.com/", "https://acoup.blog/", "https://www.theredhandfiles.com/", + "https://karlin.blog/", ] diff --git a/src/memory/common/parsers/feeds.py b/src/memory/common/parsers/feeds.py index 8087272..6ea25a4 100644 --- a/src/memory/common/parsers/feeds.py +++ b/src/memory/common/parsers/feeds.py @@ -1,5 +1,6 @@ from datetime import datetime import logging +import json import re from dataclasses import dataclass, field from typing import Any, Generator, Sequence, cast @@ -20,6 +21,20 @@ from memory.common.parsers.html import ( logger = logging.getLogger(__name__) +ObjectPath = list[str | int] + + +def select_in(data: Any, path: ObjectPath) -> Any: + if not path: + return data + + key, *rest = path + try: + return select_in(data[key], rest) + except (KeyError, TypeError, IndexError): + return None + + @dataclass class FeedItem: """Represents a single item from a feed.""" @@ -62,7 +77,7 @@ class FeedParser: ) def valid_item(self, item: FeedItem) -> bool: - return True + return bool(item.url) def parse_feed(self) -> Generator[FeedItem, None, None]: """Parse feed content and return list of feed items.""" @@ -100,6 +115,46 @@ class FeedParser: return {} +class JSONParser(FeedParser): + title_path: ObjectPath = ["title"] + url_path: ObjectPath = ["url"] + description_path: ObjectPath = ["description"] + date_path: ObjectPath = ["date"] + author_path: ObjectPath = ["author"] + guid_path: ObjectPath = ["guid"] + metadata_path: ObjectPath = ["metadata"] + + def fetch_items(self) -> Sequence[Any]: + if not self.content: + self.content = cast(str, fetch_html(self.url)) + try: + return json.loads(self.content) + except json.JSONDecodeError as e: + logger.error(f"Error parsing JSON: {e}") + return [] + + def extract_title(self, entry: Any) -> str: + return select_in(entry, self.title_path) + + def extract_url(self, entry: Any) -> str: + return select_in(entry, self.url_path) + + def extract_description(self, entry: Any) -> str: + return select_in(entry, self.description_path) + + def extract_date(self, entry: Any) -> datetime: + return select_in(entry, self.date_path) + + def extract_author(self, entry: Any) -> str: + return select_in(entry, self.author_path) + + def extract_guid(self, entry: Any) -> str: + return select_in(entry, self.guid_path) + + def extract_metadata(self, entry: Any) -> dict[str, Any]: + return select_in(entry, self.metadata_path) + + class RSSAtomParser(FeedParser): """Parser for RSS and Atom feeds using feedparser.""" @@ -237,8 +292,14 @@ class HTMLListParser(FeedParser): return extract_date(entry, self.date_selector, self.date_format) +class 
SubstackAPIParser(JSONParser): + url_path = ["canonical_url"] + author_path = ["publishedBylines", 0, "name"] + date_path = ["post_date"] + + class DanluuParser(HTMLListParser): - skip_patterns = [r"^https://danluu.com/#"] + skip_patterns = DEFAULT_SKIP_PATTERNS + [r"^https://danluu\.com/?#"] def valid_item(self, item: FeedItem) -> bool: return item.url.startswith(self.base_url) @@ -351,6 +412,13 @@ class RedHandFilesParser(HTMLListParser): return "" +class RiftersParser(HTMLListParser): + item_selector = "#content .post" + title_selector = "h2 a" + url_selector = "h2 a" + description_selector = ".entry-content" + + class BloombergAuthorParser(HTMLListParser): item_selector = "section#author_page article" url_selector = "a[href]" @@ -393,7 +461,7 @@ def is_rss_feed(content: str) -> bool: ) -def extract_url(element: Tag, base_url: str) -> str | None: +def clean_url(element: Tag, base_url: str) -> str | None: if not (href := element.get("href")): return None @@ -409,14 +477,14 @@ def find_feed_link(url: str, soup: BeautifulSoup) -> str | None: for link in links: if not isinstance(link, Tag): continue - if not (link_url := extract_url(link, url)): + if not (link_url := clean_url(link, url)): continue if link_url.rstrip("/") != url.rstrip("/"): return link_url return None -PARSER_REGISTRY = { +FEED_REGISTRY = { r"https://danluu.com": DanluuParser, r"https://guzey.com/archive": GuzeyParser, r"https://www.paulgraham.com/articles": PaulGrahamParser, @@ -427,7 +495,7 @@ PARSER_REGISTRY = { def get_feed_parser(url: str, check_from: datetime | None = None) -> FeedParser | None: - for pattern, parser_class in PARSER_REGISTRY.items(): + for pattern, parser_class in FEED_REGISTRY.items(): if re.search(pattern, url.rstrip("/")): return parser_class(url=url, since=check_from) diff --git a/src/memory/common/parsers/html.py b/src/memory/common/parsers/html.py index 1842456..5a46533 100644 --- a/src/memory/common/parsers/html.py +++ b/src/memory/common/parsers/html.py @@ -110,7 +110,14 @@ def extract_date( datetime_attr = element.get("datetime") if datetime_attr: - for format in ["%Y-%m-%dT%H:%M:%S", "%Y-%m-%d", date_format]: + for format in [ + "%Y-%m-%dT%H:%M:%S.%fZ", + "%Y-%m-%dT%H:%M:%S%z", + "%Y-%m-%dT%H:%M:%S.%f", + "%Y-%m-%dT%H:%M:%S", + "%Y-%m-%d", + date_format, + ]: if date := parse_date(str(datetime_attr), format): return date @@ -277,6 +284,47 @@ def extract_metadata(soup: BeautifulSoup) -> dict[str, Any]: return metadata +def extract_url(soup: BeautifulSoup, selectors: str, base_url: str = "") -> str | None: + for selector in selectors.split(","): + next_link = soup.select_one(selector) + if not (next_link and isinstance(next_link, Tag)): + continue + + if not (href := next_link.get("href")): + continue + + return to_absolute_url(str(href), base_url) + + return None + + +def is_substack(soup: BeautifulSoup | Tag) -> bool: + return any( + "https://substackcdn.com" == a.attrs.get("href") # type: ignore + for a in soup.find_all("link", {"rel": "preconnect"}) + if hasattr(a, "attrs") # type: ignore + ) + + +def is_wordpress(soup: BeautifulSoup | Tag) -> bool: + body_select = "body" + # Check if this is an archived page + if contents := soup.select_one("#CONTENT .html"): + body_select = "#CONTENT .html" + soup = contents + return bool(soup.select_one(f"{body_select} .wp-singular")) + + +def is_bloomberg(soup: BeautifulSoup | Tag) -> bool: + body_select = "body" + # Check if this is an archived page + if contents := soup.select_one("#CONTENT .html"): + body_select = "#CONTENT .html" + soup = contents 
+ urls = [a.attrs.get("href") for a in soup.select(f"{body_select} a")] # type: ignore + return any(u.endswith("https://www.bloomberg.com/company/") for u in urls[:5] if u) # type: ignore + + class BaseHTMLParser: """Base class for parsing HTML content from websites.""" diff --git a/tests/memory/common/parsers/test_archives.py b/tests/memory/common/parsers/test_archives.py index b5df1b1..e7284dd 100644 --- a/tests/memory/common/parsers/test_archives.py +++ b/tests/memory/common/parsers/test_archives.py @@ -1,176 +1,512 @@ +from unittest.mock import patch +from urllib.parse import urlparse, parse_qs + import pytest -from unittest.mock import Mock, patch -from bs4 import BeautifulSoup from memory.common.parsers.archives import ( - ArchiveParser, - WordPressArchiveParser, - SubstackArchiveParser, - get_archive_parser, + ArchiveFetcher, + LinkFetcher, + HTMLArchiveFetcher, + SubstackArchiveFetcher, + ACOUPArchiveFetcher, + HTMLNextUrlArchiveFetcher, + html_parser, + get_archive_fetcher, + FETCHER_REGISTRY, +) +from memory.common.parsers.feeds import ( + FeedItem, + FeedParser, + HTMLListParser, + DanluuParser, + SubstackAPIParser, ) -class TestArchiveParser: - def test_init(self): - parser = ArchiveParser(url="https://example.com") - assert parser.url == "https://example.com" - assert parser._visited_urls == set() - assert parser._all_items == [] - assert parser.max_pages == 100 - assert parser.delay_between_requests == 1.0 +class MockParser(FeedParser): + def __init__( + self, url: str, items: list[FeedItem] | None = None, content: str = "" + ): + super().__init__(url) + self.items = items or [] + self.content = content - def test_extract_items_from_page(self): - html = """ -
-        <ul>
-            <li><a href="/post1">Post 1</a></li>
-            <li><a href="/post2">Post 2</a></li>
-            <li><a href="/post1">Post 1</a></li>
-        </ul>
    - """ - soup = BeautifulSoup(html, "html.parser") - parser = ArchiveParser(url="https://example.com") - - items = parser._extract_items_from_page(soup) - assert len(items) == 2 # Duplicates should be filtered out - - def test_find_next_page_url_with_selector(self): - html = '
    ' - soup = BeautifulSoup(html, "html.parser") - parser = ArchiveParser(url="https://example.com") - parser.next_page_selector = ".next" - - next_url = parser._find_next_page_url(soup, "https://example.com/page/1") - assert next_url == "https://example.com/page/2" - - def test_find_next_page_url_heuristic(self): - html = '
    ' - soup = BeautifulSoup(html, "html.parser") - parser = ArchiveParser(url="https://example.com") - - next_url = parser._find_next_page_url(soup, "https://example.com/page/1") - assert next_url == "https://example.com/page/2" - - def test_find_next_page_url_contains_text(self): - html = '
<a href="/page/2">Next →</a>
    ' - soup = BeautifulSoup(html, "html.parser") - parser = ArchiveParser(url="https://example.com") - - next_url = parser._find_next_page_heuristic(soup) - assert next_url == "https://example.com/page/2" - - def test_find_next_numeric_page(self): - parser = ArchiveParser(url="https://example.com") - parser.page_url_pattern = "/page/{page}" - - # Test with existing page number - next_url = parser._find_next_numeric_page("https://example.com/page/3") - assert next_url == "https://example.com/page/4" - - # Test without page number (assume page 1) - next_url = parser._find_next_numeric_page("https://example.com/archive") - assert next_url == "https://example.com/archive/page/2" - - @patch("memory.common.parsers.archives.fetch_html") - @patch("time.sleep") - def test_fetch_items_pagination(self, mock_sleep, mock_fetch): - # Mock HTML for two pages - page1_html = """ -
-        <ul>
-            <li><a href="/post1">Post 1</a></li>
-            <li><a href="/post2">Post 2</a></li>
-        </ul>
-        <a href="/page/2" rel="next">Next</a>
    - """ - page2_html = """ -
-        <ul>
-            <li><a href="/post3">Post 3</a></li>
-            <li><a href="/post4">Post 4</a></li>
-        </ul>
    - """ - - mock_fetch.side_effect = [page1_html, page2_html] - - parser = ArchiveParser(url="https://example.com/page/1") - parser.delay_between_requests = 0.1 # Speed up test - - items = parser.fetch_items() - - assert len(items) == 4 - assert mock_fetch.call_count == 2 - assert mock_sleep.call_count == 1 # One delay between requests - - @patch("memory.common.parsers.archives.fetch_html") - def test_fetch_items_stops_at_max_pages(self, mock_fetch): - # Mock HTML that always has a next page - html_with_next = """ -
-        <ul>
-            <li><a href="/post">Post</a></li>
-        </ul>
-        <a href="/page/2" rel="next">Next</a>
    - """ - - mock_fetch.return_value = html_with_next - - parser = ArchiveParser(url="https://example.com/page/1") - parser.max_pages = 3 - parser.delay_between_requests = 0 # No delay for test - - items = parser.fetch_items() - - assert mock_fetch.call_count == 3 # Should stop at max_pages - - @patch("memory.common.parsers.archives.fetch_html") - def test_fetch_items_handles_duplicate_urls(self, mock_fetch): - # Mock HTML that creates a cycle - page1_html = """ -
-        <ul>
-            <li><a href="/post1">Post 1</a></li>
-        </ul>
-        <a href="/page/2" rel="next">Next</a>
    - """ - page2_html = """ -
-        <ul>
-            <li><a href="/post2">Post 2</a></li>
-        </ul>
-        <a href="/page/1" rel="next">Next</a>
    - """ - - mock_fetch.side_effect = [page1_html, page2_html] - - parser = ArchiveParser(url="https://example.com/page/1") - parser.delay_between_requests = 0 - - items = parser.fetch_items() - - assert len(items) == 2 - assert mock_fetch.call_count == 2 # Should stop when it hits visited URL + def parse_feed(self): + return self.items -class TestWordPressArchiveParser: - def test_selectors(self): - parser = WordPressArchiveParser(url="https://example.wordpress.com") - assert parser.item_selector == "article, .post" - assert parser.next_page_selector == '.nav-previous a, .next a, a[rel="next"]' - assert parser.title_selector == ".entry-title a, h1 a, h2 a" - - -class TestSubstackArchiveParser: - def test_selectors(self): - parser = SubstackArchiveParser(url="https://example.substack.com") - assert parser.item_selector == ".post-preview, .post" - assert parser.next_page_selector == ".pagination .next" - - -class TestGetArchiveParser: - @pytest.mark.parametrize( - "url,expected_class", - [ - ("https://example.wordpress.com/archive", WordPressArchiveParser), - ("https://example.substack.com/archive", SubstackArchiveParser), - ("https://example.com/archive", ArchiveParser), # Default - ], +def test_archive_fetcher_make_parser(): + fetcher = ArchiveFetcher( + parser_class=MockParser, + start_url="https://example.com", + parser_kwargs={"custom_attr": "value"}, ) - def test_get_archive_parser(self, url, expected_class): - parser = get_archive_parser(url) - assert isinstance(parser, expected_class) - assert parser.url == url + + parser = fetcher.make_parser("https://example.com/page1") + + assert isinstance(parser, MockParser) + assert parser.url == "https://example.com/page1" + assert getattr(parser, "custom_attr") == "value" + + +def test_archive_fetcher_find_next_page_base(): + fetcher = ArchiveFetcher(MockParser, "https://example.com") + parser = MockParser("https://example.com") + + assert fetcher._find_next_page(parser, 0) is None + + +@patch("memory.common.parsers.archives.time.sleep") +def test_archive_fetcher_fetch_all_items_single_page(mock_sleep): + items = [ + FeedItem(title="Item 1", url="https://example.com/1"), + FeedItem(title="Item 2", url="https://example.com/2"), + ] + + fetcher = ArchiveFetcher( + parser_class=MockParser, + start_url="https://example.com", + delay_between_requests=0.5, + ) + + with patch.object(fetcher, "make_parser") as mock_make_parser: + mock_parser = MockParser("https://example.com", items) + mock_make_parser.return_value = mock_parser + + result = list(fetcher.fetch_all_items()) + + assert result == items + mock_make_parser.assert_called_once_with("https://example.com") + mock_sleep.assert_not_called() # No delay for single page + + +@patch("memory.common.parsers.archives.time.sleep") +def test_archive_fetcher_fetch_all_items_multiple_pages(mock_sleep): + page1_items = [FeedItem(title="Item 1", url="https://example.com/1")] + page2_items = [FeedItem(title="Item 2", url="https://example.com/2")] + + class TestFetcher(ArchiveFetcher): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.call_count = 0 + + def _find_next_page(self, parser, current_page=0): + self.call_count += 1 + if self.call_count == 1: + return "https://example.com/page2" + return None + + fetcher = TestFetcher( + parser_class=MockParser, + start_url="https://example.com", + delay_between_requests=0.1, + ) + + with patch.object(fetcher, "make_parser") as mock_make_parser: + mock_make_parser.side_effect = [ + MockParser("https://example.com", page1_items), + 
MockParser("https://example.com/page2", page2_items), + ] + + result = list(fetcher.fetch_all_items()) + + assert result == page1_items + page2_items + assert mock_make_parser.call_count == 2 + mock_sleep.assert_called_once_with(0.1) + + +def test_archive_fetcher_fetch_all_items_max_pages(): + class TestFetcher(ArchiveFetcher): + def _find_next_page(self, parser, current_page=0): + return f"https://example.com/page{current_page + 2}" + + fetcher = TestFetcher( + parser_class=MockParser, + start_url="https://example.com", + max_pages=2, + delay_between_requests=0, + ) + + items = [FeedItem(title="Item", url="https://example.com/item")] + + with patch.object(fetcher, "make_parser") as mock_make_parser: + mock_make_parser.return_value = MockParser("https://example.com", items) + + result = list(fetcher.fetch_all_items()) + + assert len(result) == 2 # 2 pages * 1 item per page + assert mock_make_parser.call_count == 2 + + +def test_archive_fetcher_fetch_all_items_visited_url(): + class TestFetcher(ArchiveFetcher): + def _find_next_page(self, parser, current_page=0): + return "https://example.com" # Return same URL to trigger visited check + + fetcher = TestFetcher(MockParser, "https://example.com", delay_between_requests=0) + items = [FeedItem(title="Item", url="https://example.com/item")] + + with patch.object(fetcher, "make_parser") as mock_make_parser: + mock_make_parser.return_value = MockParser("https://example.com", items) + + result = list(fetcher.fetch_all_items()) + + assert len(result) == 1 # Only first page processed + mock_make_parser.assert_called_once() + + +def test_archive_fetcher_fetch_all_items_no_items(): + fetcher = ArchiveFetcher( + MockParser, "https://example.com", delay_between_requests=0 + ) + + with patch.object(fetcher, "make_parser") as mock_make_parser: + mock_make_parser.return_value = MockParser("https://example.com", []) + + result = list(fetcher.fetch_all_items()) + + assert result == [] + mock_make_parser.assert_called_once() + + +def test_archive_fetcher_fetch_all_items_exception(): + fetcher = ArchiveFetcher( + MockParser, "https://example.com", delay_between_requests=0 + ) + + with patch.object(fetcher, "make_parser") as mock_make_parser: + mock_make_parser.side_effect = Exception("Network error") + + result = list(fetcher.fetch_all_items()) + + assert result == [] + + +@pytest.mark.parametrize( + "start_url, per_page, current_page, expected_params", + [ + ("https://example.com", 10, 0, {"offset": ["10"], "limit": ["10"]}), + ( + "https://example.com?existing=value", + 20, + 1, + {"existing": ["value"], "offset": ["40"], "limit": ["20"]}, + ), + ( + "https://example.com?offset=0&limit=5", + 15, + 2, + {"offset": ["45"], "limit": ["15"]}, + ), + ], +) +def test_link_fetcher_find_next_page( + start_url, per_page, current_page, expected_params +): + fetcher = LinkFetcher(MockParser, start_url, per_page=per_page) + parser = MockParser(start_url) + + next_url = fetcher._find_next_page(parser, current_page) + + assert next_url is not None + parsed = urlparse(next_url) + params = parse_qs(parsed.query) + + for key, value in expected_params.items(): + assert params[key] == value + + +@pytest.mark.parametrize( + "html, selectors, expected_url", + [ + ( + '', + ['a[rel="next"]'], + "https://example.com/page2", + ), + ( + '', + [".next a"], + "https://example.com/page2", + ), + ( + '', + ["a.next"], + "https://example.com/page2", + ), + ( + '', + [".pagination .next"], + None, # This won't match because it's looking for .pagination .next directly + ), + ( + '', + 
[".pagination.next"], + None, # This selector isn't in default list + ), + ( + '', + ["nav.page a:last-of-type"], + "https://example.com/page2", + ), + ("
    No next link
    ", ['a[rel="next"]'], None), + ], +) +def test_html_archive_fetcher_find_next_page(html, selectors, expected_url): + fetcher = HTMLArchiveFetcher( + MockParser, "https://example.com", next_page_selectors=selectors + ) + parser = MockParser("https://example.com", content=html) + + with patch("memory.common.parsers.archives.extract_url") as mock_extract: + mock_extract.return_value = expected_url + + result = fetcher._find_next_page(parser) + + if expected_url: + mock_extract.assert_called_once() + assert result == expected_url + else: + # extract_url might still be called but return None + assert result is None + + +def test_html_archive_fetcher_find_next_page_no_content(): + fetcher = HTMLArchiveFetcher(MockParser, "https://example.com") + parser = MockParser("https://example.com", content="") + + result = fetcher._find_next_page(parser) + + assert result is None + + +def test_html_parser_factory(): + CustomParser = html_parser( + item_selector="article", title_selector="h1", custom_attr="value" + ) + + parser = CustomParser("https://example.com") + + assert isinstance(parser, HTMLListParser) + assert parser.item_selector == "article" + assert parser.title_selector == "h1" + assert getattr(parser, "custom_attr") == "value" + + +@pytest.mark.parametrize( + "start_url, expected_api_url", + [ + ("https://example.substack.com", "https://example.substack.com/api/v1/archive"), + ( + "https://example.substack.com/posts", + "https://example.substack.com/api/v1/archive", + ), + ( + "https://example.substack.com/api/v1/archive", + "https://example.substack.com/api/v1/archive", + ), + ], +) +def test_substack_archive_fetcher_post_init(start_url, expected_api_url): + with patch("memory.common.parsers.archives.get_base_url") as mock_get_base: + mock_get_base.return_value = "https://example.substack.com" + + fetcher = SubstackArchiveFetcher(SubstackAPIParser, start_url) + + assert fetcher.start_url == expected_api_url + + +def test_acoup_archive_fetcher_find_next_page(): + html = """ +
+    <div class="widget_archive">
+        <a href="https://acoup.blog/2019/04/">April 2019</a>
+        <a href="https://acoup.blog/2019/05/">May 2019</a>
+        <a href="https://acoup.blog/2019/06/">June 2019</a>
+    </div>
    + """ + + fetcher = ACOUPArchiveFetcher(MockParser, "https://acoup.blog/2019/05/") + parser = MockParser("https://acoup.blog/2019/05/", content=html) + + result = fetcher._find_next_page(parser) + + assert result == "https://acoup.blog/2019/04/" + + +def test_acoup_archive_fetcher_find_next_page_no_match(): + html = """ +
+    <div class="widget_archive">
+        <a href="https://acoup.blog/2019/04/">April 2019</a>
+        <a href="https://acoup.blog/2019/06/">June 2019</a>
+    </div>
    + """ + + fetcher = ACOUPArchiveFetcher(MockParser, "https://acoup.blog/2019/05/") + parser = MockParser("https://acoup.blog/2019/05/", content=html) + + result = fetcher._find_next_page(parser) + + assert result is None + + +def test_acoup_archive_fetcher_find_next_page_no_content(): + fetcher = ACOUPArchiveFetcher(MockParser, "https://acoup.blog/2019/05/") + parser = MockParser("https://acoup.blog/2019/05/", content="") + + result = fetcher._find_next_page(parser) + + assert result is None + + +@pytest.mark.parametrize( + "start_url, next_url, expected_next_url", + [ + ( + "https://example.com", + "", + "https://example.com", + ), # Empty next_url defaults to start_url + ( + "https://example.com", + "https://other.com/archive", + "https://other.com/archive", # Full URL is preserved + ), + ( + "https://example.com", + "/archive", + "/archive", + ), # Absolute path is preserved + ( + "https://example.com", + "archive", + "https://example.com/archive", + ), # Relative path gets prepended + ], +) +def test_html_next_url_archive_fetcher_post_init( + start_url, next_url, expected_next_url +): + fetcher = HTMLNextUrlArchiveFetcher(MockParser, start_url, next_url=next_url) + + assert fetcher.next_url == expected_next_url + + +def test_html_next_url_archive_fetcher_find_next_page(): + fetcher = HTMLNextUrlArchiveFetcher( + MockParser, "https://example.com", next_url="https://example.com/archive" + ) + parser = MockParser("https://example.com") + + result = fetcher._find_next_page(parser, 2) + + assert result == "https://example.com/archive/3" + + +@pytest.mark.parametrize( + "url, expected_fetcher_type", + [ + ("https://danluu.com", HTMLArchiveFetcher), + ("https://www.rifters.com", HTMLArchiveFetcher), + ("https://putanumonit.com", HTMLArchiveFetcher), + ("https://acoup.blog", ACOUPArchiveFetcher), + ("https://unknown.com", None), + ], +) +def test_get_archive_fetcher_registry_matches(url, expected_fetcher_type): + with patch("memory.common.parsers.archives.fetch_html") as mock_fetch: + mock_fetch.return_value = "Not substack" + + with patch("memory.common.parsers.archives.is_substack") as mock_is_substack: + mock_is_substack.return_value = False + + fetcher = get_archive_fetcher(url) + + if expected_fetcher_type: + assert isinstance(fetcher, expected_fetcher_type) + else: + assert fetcher is None + + +def test_get_archive_fetcher_tuple_registry(): + url = "https://putanumonit.com" + + with patch("memory.common.parsers.archives.fetch_html") as mock_fetch: + mock_fetch.return_value = "Not substack" + + fetcher = get_archive_fetcher(url) + + assert isinstance(fetcher, HTMLArchiveFetcher) + assert fetcher.start_url == "https://putanumonit.com/full-archive" + + +def test_get_archive_fetcher_direct_parser_registry(): + url = "https://danluu.com" + + with patch("memory.common.parsers.archives.fetch_html") as mock_fetch: + mock_fetch.return_value = "Not substack" + + fetcher = get_archive_fetcher(url) + + assert isinstance(fetcher, HTMLArchiveFetcher) + assert fetcher.parser_class == DanluuParser + assert fetcher.start_url == url + + +def test_get_archive_fetcher_substack(): + url = "https://example.substack.com" + + with patch("memory.common.parsers.archives.fetch_html") as mock_fetch: + mock_fetch.return_value = "Substack content" + + with patch("memory.common.parsers.archives.is_substack") as mock_is_substack: + mock_is_substack.return_value = True + + fetcher = get_archive_fetcher(url) + + assert isinstance(fetcher, SubstackArchiveFetcher) + assert fetcher.parser_class == SubstackAPIParser + + 
+def test_get_archive_fetcher_no_match(): + url = "https://unknown.com" + + with patch("memory.common.parsers.archives.fetch_html") as mock_fetch: + mock_fetch.return_value = "Regular website" + + with patch("memory.common.parsers.archives.is_substack") as mock_is_substack: + mock_is_substack.return_value = False + + fetcher = get_archive_fetcher(url) + + assert fetcher is None + + +def test_fetcher_registry_structure(): + """Test that FETCHER_REGISTRY has expected structure.""" + assert isinstance(FETCHER_REGISTRY, dict) + + for pattern, fetcher in FETCHER_REGISTRY.items(): + assert isinstance(pattern, str) + assert ( + isinstance(fetcher, type) + and issubclass(fetcher, FeedParser) + or isinstance(fetcher, tuple) + or isinstance(fetcher, ArchiveFetcher) + ) + + +@pytest.mark.parametrize( + "pattern, test_url, should_match", + [ + (r"https://danluu.com", "https://danluu.com", True), + (r"https://danluu.com", "https://danluu.com/", True), + (r"https://danluu.com", "https://other.com", False), + (r"https://www.rifters.com", "https://www.rifters.com/crawl", True), + (r"https://putanumonit.com", "https://putanumonit.com/archive", True), + ], +) +def test_registry_pattern_matching(pattern, test_url, should_match): + import re + + match = re.search(pattern, test_url.rstrip("/")) + assert bool(match) == should_match diff --git a/tests/memory/common/parsers/test_feeds.py b/tests/memory/common/parsers/test_feeds.py index b1d30b6..b8cf541 100644 --- a/tests/memory/common/parsers/test_feeds.py +++ b/tests/memory/common/parsers/test_feeds.py @@ -1,10 +1,10 @@ from datetime import datetime from unittest.mock import MagicMock, patch -from typing import Any, cast +from typing import cast +import json import pytest from bs4 import BeautifulSoup, Tag -import requests from memory.common.parsers.feeds import ( FeedItem, @@ -17,15 +17,160 @@ from memory.common.parsers.feeds import ( NadiaXyzParser, RedHandFilesParser, BloombergAuthorParser, + JSONParser, + SubstackAPIParser, + select_in, + clean_url, is_rss_feed, - extract_url, find_feed_link, get_feed_parser, - DEFAULT_SKIP_PATTERNS, - PARSER_REGISTRY, ) +@pytest.mark.parametrize( + "data, path, expected", + [ + # Basic dictionary access + ({"key": "value"}, ["key"], "value"), + ({"nested": {"key": "value"}}, ["nested", "key"], "value"), + # List access + (["a", "b", "c"], [1], "b"), + ([{"key": "value"}], [0, "key"], "value"), + # Mixed access + ( + {"items": [{"name": "first"}, {"name": "second"}]}, + ["items", 1, "name"], + "second", + ), + # Empty path returns original data + ({"key": "value"}, [], {"key": "value"}), + # Missing keys return None + ({"key": "value"}, ["missing"], None), + ({"nested": {}}, ["nested", "missing"], None), + # Index out of bounds returns None + (["a", "b"], [5], None), + # Type errors return None + ("string", ["key"], None), + (123, [0], None), + (None, ["key"], None), + # Deep nesting + ({"a": {"b": {"c": {"d": "deep"}}}}, ["a", "b", "c", "d"], "deep"), + ], +) +def test_select_in(data, path, expected): + assert select_in(data, path) == expected + + +@patch("memory.common.parsers.feeds.fetch_html") +def test_json_parser_fetch_items_with_content(mock_fetch_html): + content = json.dumps( + [ + {"title": "Article 1", "url": "https://example.com/1"}, + {"title": "Article 2", "url": "https://example.com/2"}, + ] + ) + + parser = JSONParser(url="https://example.com/feed.json", content=content) + items = parser.fetch_items() + + assert items == [ + {"title": "Article 1", "url": "https://example.com/1"}, + {"title": "Article 2", 
"url": "https://example.com/2"}, + ] + mock_fetch_html.assert_not_called() + + +@patch("memory.common.parsers.feeds.fetch_html") +def test_json_parser_fetch_items_without_content(mock_fetch_html): + content = json.dumps([{"title": "Article", "url": "https://example.com/1"}]) + mock_fetch_html.return_value = content + + parser = JSONParser(url="https://example.com/feed.json") + items = parser.fetch_items() + + assert items == [{"title": "Article", "url": "https://example.com/1"}] + mock_fetch_html.assert_called_once_with("https://example.com/feed.json") + + +@patch("memory.common.parsers.feeds.fetch_html") +def test_json_parser_fetch_items_invalid_json(mock_fetch_html): + mock_fetch_html.return_value = "invalid json content" + + parser = JSONParser(url="https://example.com/feed.json") + items = parser.fetch_items() + + assert items == [] + + +def test_json_parser_extract_methods(): + parser = JSONParser(url="https://example.com") + + entry = { + "title": "Test Title", + "url": "https://example.com/article", + "description": "Test description", + "date": "2023-01-15", + "author": "John Doe", + "guid": "unique-123", + "metadata": {"tags": ["tech", "news"]}, + } + + assert parser.extract_title(entry) == "Test Title" + assert parser.extract_url(entry) == "https://example.com/article" + assert parser.extract_description(entry) == "Test description" + assert parser.extract_date(entry) == "2023-01-15" + assert parser.extract_author(entry) == "John Doe" + assert parser.extract_guid(entry) == "unique-123" + assert parser.extract_metadata(entry) == {"tags": ["tech", "news"]} + + +def test_json_parser_custom_paths(): + parser = JSONParser(url="https://example.com") + parser.title_path = ["content", "headline"] + parser.url_path = ["links", "canonical"] + parser.author_path = ["byline", "name"] + + entry = { + "content": {"headline": "Custom Title"}, + "links": {"canonical": "https://example.com/custom"}, + "byline": {"name": "Jane Smith"}, + } + + assert parser.extract_title(entry) == "Custom Title" + assert parser.extract_url(entry) == "https://example.com/custom" + assert parser.extract_author(entry) == "Jane Smith" + + +def test_json_parser_missing_fields(): + parser = JSONParser(url="https://example.com") + + entry = {} # Empty entry + + assert parser.extract_title(entry) is None + assert parser.extract_url(entry) is None + assert parser.extract_description(entry) is None + assert parser.extract_date(entry) is None + assert parser.extract_author(entry) is None + assert parser.extract_guid(entry) is None + assert parser.extract_metadata(entry) is None + + +def test_json_parser_nested_paths(): + parser = JSONParser(url="https://example.com") + parser.title_path = ["article", "header", "title"] + parser.author_path = ["article", "byline", 0, "name"] + + entry = { + "article": { + "header": {"title": "Nested Title"}, + "byline": [{"name": "First Author"}, {"name": "Second Author"}], + } + } + + assert parser.extract_title(entry) == "Nested Title" + assert parser.extract_author(entry) == "First Author" + + def test_feed_parser_base_url(): parser = FeedParser(url="https://example.com/path/to/feed") assert parser.base_url == "https://example.com" @@ -582,7 +727,7 @@ def test_extract_url_function(html, expected): element = soup.find("a") assert element is not None - url = extract_url(cast(Tag, element), "https://example.com") + url = clean_url(cast(Tag, element), "https://example.com") assert url == expected @@ -706,33 +851,30 @@ def test_get_feed_parser_with_check_from(): assert parser.since == 
check_from -def test_parser_registry_completeness(): - """Ensure PARSER_REGISTRY contains expected parsers.""" - expected_patterns = [ - r"https://danluu.com", - r"https://guzey.com/archive", - r"https://www.paulgraham.com/articles", - r"https://nadia.xyz/posts", - r"https://www.theredhandfiles.com", - r"https://archive.ph/.*?/https://www.bloomberg.com/opinion/authors/", - ] +def test_substack_api_parser(): + parser = SubstackAPIParser(url="https://example.substack.com/api/v1/posts") - assert len(PARSER_REGISTRY) == len(expected_patterns) - for pattern in expected_patterns: - assert pattern in PARSER_REGISTRY + entry = { + "title": "Substack Post", + "canonical_url": "https://example.substack.com/p/post-slug", + "publishedBylines": [{"name": "Author Name"}], + "post_date": "2023-01-15T10:30:00Z", + } + + assert parser.extract_title(entry) == "Substack Post" + assert parser.extract_url(entry) == "https://example.substack.com/p/post-slug" + assert parser.extract_author(entry) == "Author Name" + assert parser.extract_date(entry) == "2023-01-15T10:30:00Z" -def test_default_skip_patterns(): - """Ensure DEFAULT_SKIP_PATTERNS contains expected patterns.""" - expected_patterns = [ - r"^#", - r"mailto:", - r"tel:", - r"javascript:", - r"\.pdf$", - r"\.jpg$", - r"\.png$", - r"\.gif$", - ] +def test_substack_api_parser_missing_bylines(): + parser = SubstackAPIParser(url="https://example.substack.com/api/v1/posts") - assert DEFAULT_SKIP_PATTERNS == expected_patterns + entry = { + "title": "Post Without Author", + "canonical_url": "https://example.substack.com/p/post", + "publishedBylines": [], + "post_date": "2023-01-15T10:30:00Z", + } + + assert parser.extract_author(entry) is None diff --git a/tests/memory/common/parsers/test_html.py b/tests/memory/common/parsers/test_html.py index dbba9ca..44262e1 100644 --- a/tests/memory/common/parsers/test_html.py +++ b/tests/memory/common/parsers/test_html.py @@ -23,7 +23,11 @@ from memory.common.parsers.html import ( extract_meta_by_pattern, extract_metadata, extract_title, + extract_url, get_base_url, + is_bloomberg, + is_substack, + is_wordpress, parse_date, process_image, process_images, @@ -454,7 +458,7 @@ def test_process_images_empty(): None, "https://example.com", pathlib.Path("/tmp") ) assert result_content is None - assert result_images == [] + assert result_images == {} @patch("memory.common.parsers.html.process_image") @@ -503,6 +507,191 @@ def test_process_images_no_filename(mock_process_image): assert not images +@pytest.mark.parametrize( + "html, selectors, base_url, expected", + [ + # Basic URL extraction + ( + 'Next', + "a", + "https://example.com", + "https://example.com/next-page", + ), + # Multiple selectors - should pick first matching + ( + '
<div><a href="/first">First</a><a href="/second">Second</a></div>
    ', + "a", + "https://example.com", + "https://example.com/first", + ), + # Multiple selectors with comma separation - span doesn't have href, so falls back to a + ( + '
<div><span class="next">Span</span><a href="/link">Link</a></div>
    ', + ".next, a", + "https://example.com", + "https://example.com/link", + ), + # Absolute URL should remain unchanged + ( + 'External', + "a", + "https://example.com", + "https://other.com/page", + ), + # No href attribute + ("No href", "a", "https://example.com", None), + # No matching element + ("

    No links

    ", "a", "https://example.com", None), + # Empty href + ('Empty', "a", "https://example.com", None), + ], +) +def test_extract_url(html, selectors, base_url, expected): + soup = BeautifulSoup(html, "html.parser") + assert extract_url(soup, selectors, base_url) == expected + + +@pytest.mark.parametrize( + "html, expected", + [ + # Substack with preconnect link + ( + """ + + + + """, + True, + ), + # Multiple preconnect links, one is Substack + ( + """ + + + + + """, + True, + ), + # No Substack preconnect + ( + """ + + + + """, + False, + ), + # No preconnect links at all + ("", False), + # Preconnect without href + ('', False), + # Different rel attribute + ('', False), + ], +) +def test_is_substack(html, expected): + soup = BeautifulSoup(html, "html.parser") + assert is_substack(soup) == expected + + +@pytest.mark.parametrize( + "html, expected", + [ + # WordPress with wp-singular class on body should be False (looks for content inside body) + ('Content', False), + # WordPress with nested wp-singular + ('
<body><div class="wp-singular">Content</div></body>
    ', True), + # Archived page with WordPress content + ( + """ +
+            <div id="CONTENT">
+                <div class="html">
+                    <div class="wp-singular">Content</div>
+                </div>
+            </div>
    + """, + True, + ), + # No WordPress indicators + ('
    Regular content
    ', False), + # Empty body + ("", False), + # No body tag + ("
    No body
    ", False), + ], +) +def test_is_wordpress(html, expected): + soup = BeautifulSoup(html, "html.parser") + assert is_wordpress(soup) == expected + + +@pytest.mark.parametrize( + "html, expected", + [ + # Bloomberg with company link + ( + """ + + Bloomberg + + """, + True, + ), + # Bloomberg link among other links + ( + """ + + Example + Bloomberg + Other + + """, + True, + ), + # Archived page with Bloomberg content + ( + """ +
+            <div id="CONTENT">
+                <div class="html">
+                    <a href="https://www.bloomberg.com/company/">Bloomberg</a>
+                </div>
+            </div>
    + """, + True, + ), + # No Bloomberg links + ( + """ + + Example + Other + + """, + False, + ), + # Bloomberg link but not company page + ( + """ + + Bloomberg News + + """, + False, + ), + # No links at all + ("

    No links

    ", False), + # Links without href + ("No href", False), + ], +) +def test_is_bloomberg(html, expected): + soup = BeautifulSoup(html, "html.parser") + assert is_bloomberg(soup) == expected + + class TestBaseHTMLParser: def test_init_with_base_url(self): parser = BaseHTMLParser("https://example.com/path") @@ -584,7 +773,7 @@ class TestBaseHTMLParser: def test_parse_with_images(self, mock_process_images): # Mock the image processing to return test data mock_image = MagicMock(spec=PILImage.Image) - mock_process_images.return_value = (MagicMock(), [mock_image]) + mock_process_images.return_value = (MagicMock(), {"test_image.jpg": mock_image}) html = """
    @@ -600,5 +789,6 @@ class TestBaseHTMLParser: article = parser.parse(html, "https://example.com/article") assert len(article.images) == 1 - assert article.images[0] == mock_image + assert "test_image.jpg" in article.images + assert article.images["test_image.jpg"] == mock_image mock_process_images.assert_called_once()