add rss fetcher

Daniel O'Connell 2025-05-26 17:24:39 +02:00
parent 482aefabe3
commit 27fbfcc548
6 changed files with 1411 additions and 36 deletions

View File

@@ -2,15 +2,17 @@ import logging
import re
from datetime import datetime
from urllib.parse import urlparse
from typing import cast
import requests
from bs4 import BeautifulSoup, Tag
from memory.common.parsers.html import (
BaseHTMLParser,
Article,
parse_date,
extract_title,
extract_date,
fetch_html,
)
@@ -618,48 +620,38 @@ def parse_webpage(url: str) -> Article:
Returns:
Article object with extracted content and metadata
"""
response = requests.get(
url,
timeout=30,
headers={
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:137.0) Gecko/20100101 Firefox/137.0"
},
)
response.raise_for_status()
parser = get_parser_for_url(url, response.text)
return parser.parse(response.text, url)
html = cast(str, fetch_html(url))
parser = get_parser_for_url(url, html)
return parser.parse(html, url)
blogs = [
"https://acoup.blog/",
"https://guzey.com/",
"https://akarlin.com/",
"https://aphyr.com/",
"https://www.applieddivinitystudies.com/",
"https://www.bitsaboutmoney.com/",
feeds = [
"https://archive.ph/o/IQUoT/https://www.bloomberg.com/opinion/authors/ARbTQlRLRjE/matthew-s-levine",
"https://www.rifters.com/crawl/",
"https://rachelbythebay.com/w/atom.xml",
"https://danluu.com/",
"https://mcfunley.com/",
"https://www.exurbe.com/",
"https://www.flyingmachinestudios.com/",
"https://guzey.com/archive",
"https://aphyr.com/posts.atom",
"https://www.applieddivinitystudies.com/atom.xml",
"https://www.imightbewrong.org/",
"https://www.kvetch.au/",
"https://www.overcomingbias.com/",
"https://www.rifters.com/crawl/",
"https://samkriss.substack.com/",
"https://www.paulgraham.com/articles.html",
"https://putanumonit.com/",
"https://www.richardhanania.com/",
"https://skunkledger.substack.com/",
"https://taipology.substack.com/",
"https://putanumonit.com/",
"https://www.flyingmachinestudios.com/",
"https://www.theintrinsicperspective.com/",
"https://www.strangeloopcanon.com/",
"https://slimemoldtimemold.com/",
"https://www.theredhandfiles.com/",
"https://rachelbythebay.com/w/",
"https://zeroinputagriculture.substack.com/",
"https://nadia.xyz/posts/",
"https://nayafia.substack.com",
"https://archive.ph/o/IQUoT/https://www.bloomberg.com/opinion/authors/ARbTQlRLRjE/matthew-s-levine",
"https://www.paulgraham.com/articles.html",
"https://mcfunley.com/writing",
"https://www.bitsaboutmoney.com/archive/",
"https://akarlin.com/archive/",
"https://www.exurbe.com/",
"https://acoup.blog/",
"https://www.theredhandfiles.com/",
]
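
A minimal sketch (not part of the commit) of how this feeds list could be consumed with the parser registry defined in the new feeds module below; the loop body is illustrative only:

from memory.common.parsers.feeds import get_feed_parser

for feed_url in feeds:
    parser = get_feed_parser(feed_url)
    if parser is None:
        continue  # no registered parser and no discoverable RSS/Atom feed
    for item in parser.parse_feed():
        print(item.title, item.url)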

View File

@@ -0,0 +1,451 @@
from datetime import datetime
import logging
import re
from dataclasses import dataclass, field
from typing import Any, Generator, Sequence, cast
from urllib.parse import urljoin, urlparse
import feedparser
from bs4 import BeautifulSoup, Tag
import requests
from memory.common.parsers.html import (
get_base_url,
to_absolute_url,
extract_title,
extract_date,
fetch_html,
)
logger = logging.getLogger(__name__)
@dataclass
class FeedItem:
"""Represents a single item from a feed."""
title: str
url: str
description: str = ""
author: str | None = None
published_date: datetime | None = None
guid: str | None = None
metadata: dict[str, Any] = field(default_factory=dict)
@dataclass
class FeedParser:
"""Base class for feed parsers."""
url: str
content: str | None = None
since: datetime | None = None
@property
def base_url(self) -> str:
"""Get the base URL of the feed."""
return get_base_url(self.url)
def fetch_items(self) -> Sequence[Any]:
"""Fetch items from the feed. Override in subclasses."""
return []
def parse_item(self, item: Any) -> FeedItem:
return FeedItem(
title=self.extract_title(item),
url=self.extract_url(item),
description=self.extract_description(item),
author=self.extract_author(item),
published_date=self.extract_date(item),
guid=self.extract_guid(item),
metadata=self.extract_metadata(item),
)
def valid_item(self, item: FeedItem) -> bool:
return True
def parse_feed(self) -> Generator[FeedItem, None, None]:
"""Parse feed content and return list of feed items."""
for item in self.fetch_items():
parsed_item = self.parse_item(item)
if self.valid_item(parsed_item):
yield parsed_item
def extract_title(self, entry: Any) -> str:
"""Extract title from feed entry. Override in subclasses."""
return "Untitled"
def extract_url(self, entry: Any) -> str:
"""Extract URL from feed entry. Override in subclasses."""
return ""
def extract_description(self, entry: Any) -> str:
"""Extract description from feed entry. Override in subclasses."""
return ""
def extract_author(self, entry: Any) -> str | None:
"""Extract author from feed entry. Override in subclasses."""
return None
def extract_date(self, entry: Any) -> datetime | None:
"""Extract publication date from feed entry. Override in subclasses."""
return None
def extract_guid(self, entry: Any) -> str | None:
"""Extract GUID from feed entry. Override in subclasses."""
return None
def extract_metadata(self, entry: Any) -> dict[str, Any]:
"""Extract additional metadata from feed entry. Override in subclasses."""
return {}
class RSSAtomParser(FeedParser):
"""Parser for RSS and Atom feeds using feedparser."""
def fetch_items(self) -> Sequence[Any]:
"""Fetch items from the feed."""
if self.since:
feed = feedparser.parse(self.content or self.url, modified=self.since)
else:
feed = feedparser.parse(self.content or self.url)
return feed.entries
def extract_title(self, entry: Any) -> str:
"""Extract title from RSS/Atom entry."""
return getattr(entry, "title", "Untitled")
def extract_url(self, entry: Any) -> str:
"""Extract URL from RSS/Atom entry."""
url = getattr(entry, "link", "")
if url and not urlparse(url).scheme:
url = urljoin(self.base_url, url)
return url
def extract_description(self, entry: Any) -> str:
"""Extract description from RSS/Atom entry."""
return getattr(entry, "summary", "") or getattr(entry, "description", "")
def extract_author(self, entry: Any) -> str | None:
"""Extract author from RSS/Atom entry."""
return getattr(entry, "author", None) or getattr(
entry, "author_detail", {}
).get("name", None)
def extract_date(self, entry: Any) -> datetime | None:
"""Extract publication date from RSS/Atom entry."""
for date_attr in ["published_parsed", "updated_parsed"]:
time_struct = getattr(entry, date_attr, None)
if not time_struct:
continue
try:
return datetime(*time_struct[:6])
except (TypeError, ValueError):
continue
return None
def extract_guid(self, entry: Any) -> str | None:
"""Extract GUID from RSS/Atom entry."""
return getattr(entry, "id", None) or getattr(entry, "guid", None)
def extract_metadata(self, entry: Any) -> dict[str, Any]:
"""Extract additional metadata from RSS/Atom entry."""
return {
attr: getattr(entry, attr)
for attr in ["tags", "category", "categories", "enclosures"]
if hasattr(entry, attr)
}
DEFAULT_SKIP_PATTERNS = [
r"^#", # Fragment-only links
r"mailto:",
r"tel:",
r"javascript:",
r"\.pdf$",
r"\.jpg$",
r"\.png$",
r"\.gif$",
]
class HTMLListParser(FeedParser):
"""Parser for HTML pages containing lists of article links.
Generic defaults are provided (list items containing links); subclasses
override the selectors and skip patterns for site-specific layouts.
"""
item_selector: str = "li"
url_selector: str = "a[href]"
skip_patterns: list[str] = DEFAULT_SKIP_PATTERNS
title_selector: str | None = None
description_selector: str | None = None
date_selector: str | None = None
date_format: str = "%Y-%m-%d"
def fetch_items(self) -> Sequence[Any]:
"""Fetch items from the HTML page."""
if not self.content:
self.content = cast(str, fetch_html(self.url))
soup = BeautifulSoup(self.content, "html.parser")
items = []
seen_urls = set()
tags = soup.select(self.item_selector)
for tag in tags:
if not isinstance(tag, Tag):
continue
url = self.extract_url(tag)
if not url or url in seen_urls or self._should_skip_url(url):
continue
seen_urls.add(url)
items.append(tag)
return items
def _should_skip_url(self, url: str) -> bool:
"""Check if URL should be skipped."""
return any(
re.search(pattern, url, re.IGNORECASE) for pattern in self.skip_patterns
)
def extract_title(self, entry: Any) -> str | None:
"""Extract title from HTML entry."""
if self.title_selector:
return extract_title(entry, self.title_selector)
return None
def extract_description(self, entry: Any) -> str | None:
"""Extract description from HTML entry."""
if not self.description_selector:
return None
desc = entry.select_one(self.description_selector)
return desc.get_text(strip=True) if desc else None
def extract_url(self, entry: Any) -> str:
"""Extract URL from HTML entry."""
if not (link := entry.select_one(self.url_selector)):
return ""
if not (href := link.get("href")):
return ""
return to_absolute_url(href, self.base_url)
def extract_date(self, entry: Any) -> datetime | None:
"""Extract publication date from HTML entry."""
if self.date_selector:
return extract_date(entry, self.date_selector, self.date_format)
return None
class DanluuParser(HTMLListParser):
skip_patterns = [r"^https://danluu.com/#"]
def valid_item(self, item: FeedItem) -> bool:
return item.url.startswith(self.base_url)
class GuzeyParser(HTMLListParser):
item_selector = "li a[href]"
skip_patterns = DEFAULT_SKIP_PATTERNS + [r"docs\.google\.com"]
def valid_item(self, item: FeedItem) -> bool:
# Only include items that are actual blog posts (relative URLs or guzey.com URLs)
return (
item.url.startswith(self.base_url)
or item.url.startswith("../")
or not item.url.startswith("http")
)
class PaulGrahamParser(HTMLListParser):
item_selector = "font a[href]"
skip_patterns = DEFAULT_SKIP_PATTERNS + [
r"\.txt$", # Skip text files
r"turbifycdn\.com", # Skip CDN links
]
def valid_item(self, item: FeedItem) -> bool:
# Only include items that are actual essays (relative URLs ending in .html)
return (
item.url.endswith(".html")
and not item.url.startswith("http")
and len(item.title) > 5 # Filter out very short titles
)
class NadiaXyzParser(HTMLListParser):
item_selector = ".blog.all li"
skip_patterns = DEFAULT_SKIP_PATTERNS + [
r"twitter\.com",
r"newsletter",
r"projects",
r"notes",
]
date_selector = ".date"
date_format = "%B %d, %Y"
description_selector = "p"
def valid_item(self, item: FeedItem) -> bool:
# Only include actual blog posts (relative URLs or nadia.xyz URLs)
return (
item.url.startswith(self.base_url)
or item.url.startswith("/")
or (not item.url.startswith("http") and item.url.endswith("/"))
)
class RedHandFilesParser(HTMLListParser):
item_selector = "article, .issue, .post"
url_selector = "a[href]"
title_selector = "h2, .issue-title"
description_selector = "p"
skip_patterns = DEFAULT_SKIP_PATTERNS + [
r"/joy",
r"/about",
r"/subscribe",
r"/ask",
r"privacy-policy",
r"#",
]
def valid_item(self, item: FeedItem) -> bool:
# Only include actual issues (should have "Issue #" in title or URL)
return (
item.url.startswith(self.base_url)
and ("issue" in item.url.lower() or "issue #" in item.title.lower())
and len(item.title) > 10
)
def extract_title(self, entry: Any) -> str:
"""Extract title, combining issue number and question."""
# Look for issue number
issue_elem = entry.select_one("h3, .issue-number")
issue_text = issue_elem.get_text(strip=True) if issue_elem else ""
# Look for the main question/title
title_elem = entry.select_one("h2, .issue-title, .question")
title_text = title_elem.get_text(strip=True) if title_elem else ""
# Combine them
if issue_text and title_text:
return f"{issue_text}: {title_text}"
elif title_text:
return title_text
elif issue_text:
return issue_text
# Fallback to any link text
link = entry.select_one(self.url_selector)
return link.get_text(strip=True) if link else "Untitled"
def extract_description(self, entry: Any) -> str:
"""Extract the question text as description."""
# Look for the question text in h2 or similar
desc_elem = entry.select_one("h2, .question, .issue-title")
if desc_elem:
text = desc_elem.get_text(strip=True)
# Clean up and truncate if too long
if len(text) > 200:
text = text[:200] + "..."
return text
return ""
class BloombergAuthorParser(HTMLListParser):
item_selector = "section#author_page article"
url_selector = "a[href]"
title_selector = "article div a"
description_selector = "article div section"
skip_patterns = DEFAULT_SKIP_PATTERNS + [
r"/authors/",
r"/topics/",
r"/subscribe",
r"/newsletter/",
r"#",
r"mailto:",
]
def valid_item(self, item: FeedItem) -> bool:
# Only include actual articles
return (
(
item.url.startswith("https://www.bloomberg.com")
or item.url.startswith("https://archive.ph")
or item.url.startswith("/")
)
and (
"opinion" in item.url.lower()
or "news" in item.url.lower()
or len(item.url.split("/")) > 4
)
and len(item.title) > 10
)
def is_rss_feed(content: str) -> bool:
"""Check if content appears to be an XML feed."""
content_lower = content.strip().lower()
return (
content_lower.startswith("<?xml")
or "<rss" in content_lower
or "<feed" in content_lower
or "<atom" in content_lower
)
def extract_url(element: Tag, base_url: str) -> str | None:
if not (href := element.get("href")):
return None
return to_absolute_url(str(href), base_url)
def find_feed_link(url: str, soup: BeautifulSoup) -> str | None:
head = soup.find("head")
if not head:
return None
for type_ in ["application/rss+xml", "application/atom+xml"]:
links = head.find_all("link", {"rel": "alternate", "type": type_}) # type: ignore
for link in links:
if not isinstance(link, Tag):
continue
if not (link_url := extract_url(link, url)):
continue
if link_url.rstrip("/") != url.rstrip("/"):
return link_url
return None
PARSER_REGISTRY = {
r"https://danluu.com": DanluuParser,
r"https://guzey.com/archive": GuzeyParser,
r"https://www.paulgraham.com/articles": PaulGrahamParser,
r"https://nadia.xyz/posts": NadiaXyzParser,
r"https://www.theredhandfiles.com": RedHandFilesParser,
r"https://archive.ph/.*?/https://www.bloomberg.com/opinion/authors/": BloombergAuthorParser,
}
def get_feed_parser(url: str, check_from: datetime | None = None) -> FeedParser | None:
for pattern, parser_class in PARSER_REGISTRY.items():
if re.search(pattern, url.rstrip("/")):
return parser_class(url=url, since=check_from)
text = cast(str, fetch_html(url))
if is_rss_feed(text):
return RSSAtomParser(url=url, content=text, since=check_from)
soup = BeautifulSoup(text, "html.parser")
if feed_link := find_feed_link(url, soup):
return RSSAtomParser(url=feed_link, since=check_from)
base = url.rstrip("/")
for path in ["/archive", "/posts", "/feed"]:
if base.endswith(path):
continue
try:
if parser := get_feed_parser(base + path, check_from):
return parser
except requests.HTTPError:
continue
return None
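
A sketch of how a new site would be wired in under the conventions above; ExampleBlogParser and its URL pattern are hypothetical:

class ExampleBlogParser(HTMLListParser):
    item_selector = "article"
    title_selector = "h2"
    skip_patterns = DEFAULT_SKIP_PATTERNS + [r"/tags/"]

    def valid_item(self, item: FeedItem) -> bool:
        # keep only same-site links, mirroring the site parsers above
        return item.url.startswith(self.base_url)

# get_feed_parser() will pick this up for any URL matching the pattern
PARSER_REGISTRY[r"https://blog.example.com"] = ExampleBlogParser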

View File

@@ -17,6 +17,20 @@ from memory.common import settings
logger = logging.getLogger(__name__)
def fetch_html(url: str, as_bytes: bool = False) -> str | bytes:
response = requests.get(
url,
timeout=30,
headers={
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:137.0) Gecko/20100101 Firefox/137.0"
},
)
response.raise_for_status()
if as_bytes:
return response.content
return response.text
@dataclass
class Article:
"""Structured representation of a web article."""
@@ -135,9 +149,7 @@ def process_image(url: str, image_dir: pathlib.Path) -> PILImage.Image | None:
# Download if not already cached
if not local_path.exists():
response = requests.get(url, timeout=30)
response.raise_for_status()
local_path.write_bytes(response.content)
local_path.write_bytes(fetch_html(url, as_bytes=True))
try:
return PILImage.open(local_path)
@@ -153,10 +165,10 @@ def process_images(
Process all images in content: download them, update URLs, and return PIL Images.
Returns:
Tuple of (updated_content, list_of_pil_images)
Tuple of (updated_content, dict_of_pil_images)
"""
if not content:
return content, []
return content, {}
images = {}
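
For reference, a minimal sketch of the two fetch_html modes introduced above (placeholder URLs):

html = fetch_html("https://example.com/post")  # str, from response.text
image_bytes = fetch_html("https://example.com/img.png", as_bytes=True)  # bytes, from response.content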

View File

@@ -0,0 +1,176 @@
import pytest
from unittest.mock import Mock, patch
from bs4 import BeautifulSoup
from memory.common.parsers.archives import (
ArchiveParser,
WordPressArchiveParser,
SubstackArchiveParser,
get_archive_parser,
)
class TestArchiveParser:
def test_init(self):
parser = ArchiveParser(url="https://example.com")
assert parser.url == "https://example.com"
assert parser._visited_urls == set()
assert parser._all_items == []
assert parser.max_pages == 100
assert parser.delay_between_requests == 1.0
def test_extract_items_from_page(self):
html = """
<div>
<li><a href="/post1">Post 1</a></li>
<li><a href="/post2">Post 2</a></li>
<li><a href="/post1">Post 1</a></li> <!-- Duplicate -->
</div>
"""
soup = BeautifulSoup(html, "html.parser")
parser = ArchiveParser(url="https://example.com")
items = parser._extract_items_from_page(soup)
assert len(items) == 2 # Duplicates should be filtered out
def test_find_next_page_url_with_selector(self):
html = '<div><a class="next" href="/page/2">Next</a></div>'
soup = BeautifulSoup(html, "html.parser")
parser = ArchiveParser(url="https://example.com")
parser.next_page_selector = ".next"
next_url = parser._find_next_page_url(soup, "https://example.com/page/1")
assert next_url == "https://example.com/page/2"
def test_find_next_page_url_heuristic(self):
html = '<div><a rel="next" href="/page/2">Next</a></div>'
soup = BeautifulSoup(html, "html.parser")
parser = ArchiveParser(url="https://example.com")
next_url = parser._find_next_page_url(soup, "https://example.com/page/1")
assert next_url == "https://example.com/page/2"
def test_find_next_page_url_contains_text(self):
html = '<div><a href="/page/2">Next →</a></div>'
soup = BeautifulSoup(html, "html.parser")
parser = ArchiveParser(url="https://example.com")
next_url = parser._find_next_page_heuristic(soup)
assert next_url == "https://example.com/page/2"
def test_find_next_numeric_page(self):
parser = ArchiveParser(url="https://example.com")
parser.page_url_pattern = "/page/{page}"
# Test with existing page number
next_url = parser._find_next_numeric_page("https://example.com/page/3")
assert next_url == "https://example.com/page/4"
# Test without page number (assume page 1)
next_url = parser._find_next_numeric_page("https://example.com/archive")
assert next_url == "https://example.com/archive/page/2"
@patch("memory.common.parsers.archives.fetch_html")
@patch("time.sleep")
def test_fetch_items_pagination(self, mock_sleep, mock_fetch):
# Mock HTML for two pages
page1_html = """
<div>
<li><a href="/post1">Post 1</a></li>
<li><a href="/post2">Post 2</a></li>
<a rel="next" href="/page/2">Next</a>
</div>
"""
page2_html = """
<div>
<li><a href="/post3">Post 3</a></li>
<li><a href="/post4">Post 4</a></li>
</div>
"""
mock_fetch.side_effect = [page1_html, page2_html]
parser = ArchiveParser(url="https://example.com/page/1")
parser.delay_between_requests = 0.1 # Speed up test
items = parser.fetch_items()
assert len(items) == 4
assert mock_fetch.call_count == 2
assert mock_sleep.call_count == 1 # One delay between requests
@patch("memory.common.parsers.archives.fetch_html")
def test_fetch_items_stops_at_max_pages(self, mock_fetch):
# Mock HTML that always has a next page
html_with_next = """
<div>
<li><a href="/post">Post</a></li>
<a rel="next" href="/page/999">Next</a>
</div>
"""
mock_fetch.return_value = html_with_next
parser = ArchiveParser(url="https://example.com/page/1")
parser.max_pages = 3
parser.delay_between_requests = 0 # No delay for test
items = parser.fetch_items()
assert mock_fetch.call_count == 3 # Should stop at max_pages
@patch("memory.common.parsers.archives.fetch_html")
def test_fetch_items_handles_duplicate_urls(self, mock_fetch):
# Mock HTML that creates a cycle
page1_html = """
<div>
<li><a href="/post1">Post 1</a></li>
<a rel="next" href="/page/2">Next</a>
</div>
"""
page2_html = """
<div>
<li><a href="/post2">Post 2</a></li>
<a rel="next" href="/page/1">Back to page 1</a>
</div>
"""
mock_fetch.side_effect = [page1_html, page2_html]
parser = ArchiveParser(url="https://example.com/page/1")
parser.delay_between_requests = 0
items = parser.fetch_items()
assert len(items) == 2
assert mock_fetch.call_count == 2 # Should stop when it hits visited URL
class TestWordPressArchiveParser:
def test_selectors(self):
parser = WordPressArchiveParser(url="https://example.wordpress.com")
assert parser.item_selector == "article, .post"
assert parser.next_page_selector == '.nav-previous a, .next a, a[rel="next"]'
assert parser.title_selector == ".entry-title a, h1 a, h2 a"
class TestSubstackArchiveParser:
def test_selectors(self):
parser = SubstackArchiveParser(url="https://example.substack.com")
assert parser.item_selector == ".post-preview, .post"
assert parser.next_page_selector == ".pagination .next"
class TestGetArchiveParser:
@pytest.mark.parametrize(
"url,expected_class",
[
("https://example.wordpress.com/archive", WordPressArchiveParser),
("https://example.substack.com/archive", SubstackArchiveParser),
("https://example.com/archive", ArchiveParser), # Default
],
)
def test_get_archive_parser(self, url, expected_class):
parser = get_archive_parser(url)
assert isinstance(parser, expected_class)
assert parser.url == url

View File

@@ -0,0 +1,738 @@
from datetime import datetime
from unittest.mock import MagicMock, patch
from typing import Any, cast
import pytest
from bs4 import BeautifulSoup, Tag
import requests
from memory.common.parsers.feeds import (
FeedItem,
FeedParser,
RSSAtomParser,
HTMLListParser,
DanluuParser,
GuzeyParser,
PaulGrahamParser,
NadiaXyzParser,
RedHandFilesParser,
BloombergAuthorParser,
is_rss_feed,
extract_url,
find_feed_link,
get_feed_parser,
DEFAULT_SKIP_PATTERNS,
PARSER_REGISTRY,
)
def test_feed_parser_base_url():
parser = FeedParser(url="https://example.com/path/to/feed")
assert parser.base_url == "https://example.com"
def test_feed_parser_parse_feed_empty():
parser = FeedParser(url="https://example.com")
items = list(parser.parse_feed())
assert items == []
def test_feed_parser_parse_feed_with_items():
class TestParser(FeedParser):
def fetch_items(self):
return ["item1", "item2"]
def extract_title(self, entry):
return f"Title for {entry}"
def extract_url(self, entry):
return f"https://example.com/{entry}"
parser = TestParser(url="https://example.com")
assert list(parser.parse_feed()) == [
FeedItem(title="Title for item1", url="https://example.com/item1"),
FeedItem(title="Title for item2", url="https://example.com/item2"),
]
def test_feed_parser_parse_feed_with_invalid_items():
class TestParser(FeedParser):
def fetch_items(self):
return ["valid", "invalid"]
def extract_title(self, entry):
return f"Title for {entry}"
def extract_url(self, entry):
return f"https://example.com/{entry}"
def valid_item(self, item):
return item.title == "Title for valid"
parser = TestParser(url="https://example.com")
assert list(parser.parse_feed()) == [
FeedItem(title="Title for valid", url="https://example.com/valid"),
]
@patch("memory.common.parsers.feeds.feedparser.parse")
@pytest.mark.parametrize("since_date", [None, datetime(2023, 1, 1)])
def test_rss_atom_parser_fetch_items(mock_parse, since_date):
mock_feed = MagicMock()
mock_feed.entries = ["entry1", "entry2"]
mock_parse.return_value = mock_feed
parser = RSSAtomParser(url="https://example.com/feed.xml", since=since_date)
items = parser.fetch_items()
if since_date:
mock_parse.assert_called_once_with(
"https://example.com/feed.xml", modified=since_date
)
else:
mock_parse.assert_called_once_with("https://example.com/feed.xml")
assert items == ["entry1", "entry2"]
@patch("memory.common.parsers.feeds.feedparser.parse")
def test_rss_atom_parser_fetch_items_with_content(mock_parse):
mock_feed = MagicMock()
mock_feed.entries = ["entry1"]
mock_parse.return_value = mock_feed
content = "<rss>...</rss>"
parser = RSSAtomParser(url="https://example.com/feed.xml", content=content)
items = parser.fetch_items()
mock_parse.assert_called_once_with(content)
assert items == ["entry1"]
@pytest.mark.parametrize(
"entry_attrs, expected",
[
({"title": "Test Title"}, "Test Title"),
({}, "Untitled"),
],
)
def test_rss_atom_parser_extract_title(entry_attrs, expected):
parser = RSSAtomParser(url="https://example.com")
entry = MagicMock()
for attr, value in entry_attrs.items():
setattr(entry, attr, value)
# Remove attributes not in entry_attrs
if "title" not in entry_attrs:
del entry.title
assert parser.extract_title(entry) == expected
@pytest.mark.parametrize(
"entry_attrs, expected",
[
({"link": "https://other.com/article"}, "https://other.com/article"),
({"link": "/article"}, "https://example.com/article"),
({}, ""),
],
)
def test_rss_atom_parser_extract_url(entry_attrs, expected):
parser = RSSAtomParser(url="https://example.com")
entry = MagicMock()
for attr, value in entry_attrs.items():
setattr(entry, attr, value)
if "link" not in entry_attrs:
del entry.link
assert parser.extract_url(entry) == expected
@pytest.mark.parametrize(
"entry_attrs, expected",
[
(
{"summary": "Test summary", "description": "Test description"},
"Test summary",
),
({"summary": "", "description": "Test description"}, "Test description"),
({}, ""),
],
)
def test_rss_atom_parser_extract_description(entry_attrs, expected):
parser = RSSAtomParser(url="https://example.com")
entry = MagicMock()
for attr, value in entry_attrs.items():
setattr(entry, attr, value)
for attr in ["summary", "description"]:
if attr not in entry_attrs:
delattr(entry, attr)
assert parser.extract_description(entry) == expected
@pytest.mark.parametrize(
"entry_attrs, expected",
[
({"author": "John Doe"}, "John Doe"),
({"author": None, "author_detail": {"name": "Jane Smith"}}, "Jane Smith"),
({"author": None, "author_detail": {}}, None),
],
)
def test_rss_atom_parser_extract_author(entry_attrs, expected):
parser = RSSAtomParser(url="https://example.com")
entry = MagicMock()
for attr, value in entry_attrs.items():
setattr(entry, attr, value)
assert parser.extract_author(entry) == expected
@pytest.mark.parametrize(
"entry_attrs, expected",
[
(
{
"published_parsed": (2023, 1, 15, 10, 30, 0, 0, 0, 0),
"updated_parsed": None,
},
datetime(2023, 1, 15, 10, 30, 0),
),
(
{
"published_parsed": None,
"updated_parsed": (2023, 2, 20, 14, 45, 30, 0, 0, 0),
},
datetime(2023, 2, 20, 14, 45, 30),
),
({"published_parsed": "invalid", "updated_parsed": None}, None),
({}, None),
],
)
def test_rss_atom_parser_extract_date(entry_attrs, expected):
parser = RSSAtomParser(url="https://example.com")
entry = MagicMock()
for attr, value in entry_attrs.items():
setattr(entry, attr, value)
for attr in ["published_parsed", "updated_parsed"]:
if attr not in entry_attrs:
delattr(entry, attr)
assert parser.extract_date(entry) == expected
@pytest.mark.parametrize(
"entry_attrs, expected",
[
({"id": "unique-id-123", "guid": "guid-456"}, "unique-id-123"),
({"id": None, "guid": "guid-456"}, "guid-456"),
({"id": None, "guid": None}, None),
],
)
def test_rss_atom_parser_extract_guid(entry_attrs, expected):
parser = RSSAtomParser(url="https://example.com")
entry = MagicMock()
for attr, value in entry_attrs.items():
setattr(entry, attr, value)
assert parser.extract_guid(entry) == expected
def test_rss_atom_parser_extract_metadata():
parser = RSSAtomParser(url="https://example.com")
entry = MagicMock()
entry.tags = ["tag1", "tag2"]
entry.category = "news"
entry.categories = ["tech", "science"]
entry.enclosures = ["file1.mp3"]
entry.other_attr = "should not be included"
metadata = parser.extract_metadata(entry)
assert metadata == {
"tags": ["tag1", "tag2"],
"category": "news",
"categories": ["tech", "science"],
"enclosures": ["file1.mp3"],
}
@patch("memory.common.parsers.feeds.fetch_html")
def test_html_list_parser_fetch_items_with_content(mock_fetch_html):
html = """
<ul>
<li><a href="/article1">Article 1</a></li>
<li><a href="/article2">Article 2</a></li>
<li><a href="mailto:test@example.com">Email</a></li>
</ul>
"""
parser = HTMLListParser(url="https://example.com", content=html)
assert [a.prettify() for a in parser.fetch_items()] == [
'<li>\n <a href="/article1">\n Article 1\n </a>\n</li>\n',
'<li>\n <a href="/article2">\n Article 2\n </a>\n</li>\n',
]
mock_fetch_html.assert_not_called()
@patch("memory.common.parsers.feeds.fetch_html")
def test_html_list_parser_fetch_items_without_content(mock_fetch_html):
html = """
<ul>
<li><a href="/article1">Article 1</a></li>
</ul>
"""
mock_fetch_html.return_value = html
parser = HTMLListParser(url="https://example.com")
assert [a.prettify() for a in parser.fetch_items()] == [
'<li>\n <a href="/article1">\n Article 1\n </a>\n</li>\n',
]
mock_fetch_html.assert_called_once_with("https://example.com")
def test_html_list_parser_fetch_items_deduplication():
html = """
<ul>
<li><a href="/article1">Article 1</a></li>
<li><a href="/article1">Article 1 Duplicate</a></li>
<li><a href="/article2">Article 2</a></li>
</ul>
"""
parser = HTMLListParser(url="https://example.com", content=html)
assert [a.prettify() for a in parser.fetch_items()] == [
'<li>\n <a href="/article1">\n Article 1\n </a>\n</li>\n',
'<li>\n <a href="/article2">\n Article 2\n </a>\n</li>\n',
]
@pytest.mark.parametrize(
"url, should_skip",
[
("#fragment", True),
("mailto:test@example.com", True),
("tel:+1234567890", True),
("javascript:void(0)", True),
("document.pdf", True),
("image.jpg", True),
("photo.png", True),
("animation.gif", True),
("https://example.com/article", False),
("/relative/path", False),
],
)
def test_html_list_parser_should_skip_url(url, should_skip):
parser = HTMLListParser(url="https://example.com")
assert parser._should_skip_url(url) == should_skip
@pytest.mark.parametrize(
"html, title_selector, expected",
[
(
'<li><h2>Custom Title</h2><a href="/link">Link</a></li>',
"h2",
"Custom Title",
),
('<li><a href="/link">Link</a></li>', None, None),
],
)
def test_html_list_parser_extract_title(html, title_selector, expected):
soup = BeautifulSoup(html, "html.parser")
item = soup.find("li")
parser = HTMLListParser(url="https://example.com")
parser.title_selector = title_selector
if expected and title_selector:
with patch("memory.common.parsers.feeds.extract_title") as mock_extract:
mock_extract.return_value = expected
title = parser.extract_title(item)
mock_extract.assert_called_once_with(item, title_selector)
assert title == expected
else:
assert parser.extract_title(item) is None
@pytest.mark.parametrize(
"html, description_selector, expected",
[
(
'<li><p>Description text</p><a href="/link">Link</a></li>',
"p",
"Description text",
),
('<li><a href="/link">Link</a></li>', None, None),
],
)
def test_html_list_parser_extract_description(html, description_selector, expected):
soup = BeautifulSoup(html, "html.parser")
item = soup.find("li")
parser = HTMLListParser(url="https://example.com")
parser.description_selector = description_selector
assert parser.extract_description(item) == expected
@pytest.mark.parametrize(
"html, expected",
[
('<li><a href="/article">Article</a></li>', "https://example.com/article"),
("<li>No link here</li>", ""),
],
)
def test_html_list_parser_extract_url(html, expected):
soup = BeautifulSoup(html, "html.parser")
item = soup.find("li")
parser = HTMLListParser(url="https://example.com")
assert parser.extract_url(item) == expected
def test_html_list_parser_extract_date_with_selector():
html = '<li><span class="date">2023-01-15</span><a href="/link">Link</a></li>'
soup = BeautifulSoup(html, "html.parser")
item = soup.find("li")
parser = HTMLListParser(url="https://example.com")
parser.date_selector = ".date"
with patch("memory.common.parsers.feeds.extract_date") as mock_extract:
mock_extract.return_value = datetime(2023, 1, 15)
date = parser.extract_date(item)
mock_extract.assert_called_once_with(item, ".date", "%Y-%m-%d")
assert date == datetime(2023, 1, 15)
def test_html_list_parser_extract_date_without_selector():
html = '<li><a href="/link">Link</a></li>'
soup = BeautifulSoup(html, "html.parser")
item = soup.find("li")
parser = HTMLListParser(url="https://example.com")
assert parser.extract_date(item) is None
@pytest.mark.parametrize(
"parser_class, url, valid_urls, invalid_urls",
[
(
DanluuParser,
"https://danluu.com",
["https://danluu.com/article"],
["https://other.com/article"],
),
(
GuzeyParser,
"https://guzey.com/archive",
["https://guzey.com/archive/article", "../relative", "relative"],
["https://other.com/article"],
),
(
PaulGrahamParser,
"https://www.paulgraham.com/articles",
[("Long enough title", "essay.html")],
[
("Short", "essay.html"),
("Long enough title", "https://other.com/essay.html"),
("Long enough title", "document.txt"),
],
),
(
NadiaXyzParser,
"https://nadia.xyz/posts",
["https://nadia.xyz/posts/article", "/article", "article/"],
["https://other.com/article"],
),
(
RedHandFilesParser,
"https://www.theredhandfiles.com",
[
(
"Issue #123: Long question",
"https://www.theredhandfiles.com/issue-123",
),
("Long enough title", "https://www.theredhandfiles.com/some-issue"),
],
[
("Short", "https://www.theredhandfiles.com/issue-123"),
("Long enough title", "https://other.com/issue"),
("Long enough title", "https://www.theredhandfiles.com/about"),
],
),
(
BloombergAuthorParser,
"https://archive.ph/123/https://www.bloomberg.com/opinion/authors/",
[
(
"Long enough title",
"https://www.bloomberg.com/opinion/articles/2023/01/15/article",
),
("Long enough title", "/news/articles/2023/01/15/article"),
(
"Long enough title",
"https://archive.ph/2023/01/15/some/article/path",
),
],
[
(
"Short",
"https://www.bloomberg.com/opinion/articles/2023/01/15/article",
),
("Long enough title", "https://other.com/article"),
("Long enough title", "https://www.bloomberg.com/simple"),
],
),
],
)
def test_specific_parsers_valid_item(parser_class, url, valid_urls, invalid_urls):
parser = parser_class(url=url)
# Test valid items
for item_data in valid_urls:
if isinstance(item_data, tuple):
title, url_val = item_data
item = FeedItem(title=title, url=url_val)
else:
item = FeedItem(title="Test", url=item_data)
assert parser.valid_item(item) is True
# Test invalid items
for item_data in invalid_urls:
if isinstance(item_data, tuple):
title, url_val = item_data
item = FeedItem(title=title, url=url_val)
else:
item = FeedItem(title="Test", url=item_data)
assert parser.valid_item(item) is False
def test_red_hand_files_extract_title():
html = """
<article>
<h3>Issue #123</h3>
<h2>What is the meaning of life?</h2>
<a href="/issue-123">Link</a>
</article>
"""
soup = BeautifulSoup(html, "html.parser")
item = soup.find("article")
parser = RedHandFilesParser(url="https://www.theredhandfiles.com")
title = parser.extract_title(item)
assert title == "Issue #123: What is the meaning of life?"
def test_red_hand_files_extract_description():
# Create a text that's definitely longer than 200 characters
long_text = "This is a very long question that should be truncated because it exceeds the maximum length limit of 200 characters and we want to make sure that the description is not too long for display purposes and this text continues to be very long indeed to ensure truncation happens"
html = f"""
<article>
<h2>{long_text}</h2>
</article>
"""
soup = BeautifulSoup(html, "html.parser")
item = soup.find("article")
parser = RedHandFilesParser(url="https://www.theredhandfiles.com")
description = parser.extract_description(item)
assert len(description) <= 203 # 200 + "..."
assert description.endswith("...")
@pytest.mark.parametrize(
"content, expected",
[
("<?xml version='1.0'?><rss>", True),
("<rss version='2.0'>", True),
("<feed xmlns='http://www.w3.org/2005/Atom'>", True),
("<atom:feed>", True),
(" <?XML version='1.0'?>", True), # Case insensitive
("<html><body>Not a feed</body></html>", False),
("Plain text content", False),
("", False),
],
)
def test_is_rss_feed(content, expected):
assert is_rss_feed(content) == expected
@pytest.mark.parametrize(
"html, expected",
[
('<a href="/relative/path">Link</a>', "https://example.com/relative/path"),
("<a>Link without href</a>", None),
],
)
def test_extract_url_function(html, expected):
soup = BeautifulSoup(html, "html.parser")
element = soup.find("a")
assert element is not None
url = extract_url(cast(Tag, element), "https://example.com")
assert url == expected
@pytest.mark.parametrize(
"html, expected",
[
(
"""
<html>
<head>
<link rel="alternate" type="application/rss+xml" href="/feed.xml">
<link rel="alternate" type="application/atom+xml" href="/atom.xml">
</head>
</html>
""",
"https://example.com/feed.xml",
),
("<html><body>No head</body></html>", None),
(
"""
<html>
<head>
<link rel="alternate" type="application/rss+xml" href="https://example.com">
</head>
</html>
""",
None,
), # Should not return same URL
],
)
def test_find_feed_link(html, expected):
soup = BeautifulSoup(html, "html.parser")
feed_link = find_feed_link("https://example.com", soup)
assert feed_link == expected
@pytest.mark.parametrize(
"url, expected_parser_class",
[
("https://danluu.com", DanluuParser),
("https://guzey.com/archive", GuzeyParser),
("https://www.paulgraham.com/articles", PaulGrahamParser),
("https://nadia.xyz/posts", NadiaXyzParser),
("https://www.theredhandfiles.com", RedHandFilesParser),
(
"https://archive.ph/abc123/https://www.bloomberg.com/opinion/authors/john-doe",
BloombergAuthorParser,
),
],
)
def test_get_feed_parser_registry(url, expected_parser_class):
parser = get_feed_parser(url)
assert parser is not None
assert isinstance(parser, expected_parser_class)
assert parser.url == url
@patch("memory.common.parsers.feeds.fetch_html")
def test_get_feed_parser_rss_content(mock_fetch_html):
mock_fetch_html.return_value = "<?xml version='1.0'?><rss>"
parser = get_feed_parser("https://example.com/unknown")
assert isinstance(parser, RSSAtomParser)
assert parser.url == "https://example.com/unknown"
@patch("memory.common.parsers.feeds.fetch_html")
def test_get_feed_parser_with_feed_link(mock_fetch_html):
html = """
<html>
<head>
<link rel="alternate" type="application/rss+xml" href="/feed.xml">
</head>
</html>
"""
mock_fetch_html.return_value = html
parser = get_feed_parser("https://example.com")
assert isinstance(parser, RSSAtomParser)
assert parser.url == "https://example.com/feed.xml"
@patch("memory.common.parsers.feeds.fetch_html")
def test_get_feed_parser_recursive_paths(mock_fetch_html):
# Mock the initial call to return HTML without feed links
html = "<html><body>No feed links</body></html>"
mock_fetch_html.return_value = html
# Patch the module attribute so the recursive calls return None instead of
# making real HTTP requests; the name imported at the top of this file still
# refers to the original function, so its body is exercised directly
with patch("memory.common.parsers.feeds.get_feed_parser") as mock_recursive:
mock_recursive.return_value = None
parser = get_feed_parser("https://example.com")
assert parser is None
@patch("memory.common.parsers.feeds.fetch_html")
def test_get_feed_parser_no_match(mock_fetch_html):
html = "<html><body>No feed links</body></html>"
mock_fetch_html.return_value = html
# Mock the recursive calls to avoid actual HTTP requests
with patch("memory.common.parsers.feeds.get_feed_parser") as mock_recursive:
mock_recursive.return_value = None
parser = get_feed_parser("https://unknown.com")
assert parser is None
def test_get_feed_parser_with_check_from():
check_from = datetime(2023, 1, 1)
parser = get_feed_parser("https://danluu.com", check_from)
assert isinstance(parser, DanluuParser)
assert parser.since == check_from
def test_parser_registry_completeness():
"""Ensure PARSER_REGISTRY contains expected parsers."""
expected_patterns = [
r"https://danluu.com",
r"https://guzey.com/archive",
r"https://www.paulgraham.com/articles",
r"https://nadia.xyz/posts",
r"https://www.theredhandfiles.com",
r"https://archive.ph/.*?/https://www.bloomberg.com/opinion/authors/",
]
assert len(PARSER_REGISTRY) == len(expected_patterns)
for pattern in expected_patterns:
assert pattern in PARSER_REGISTRY
def test_default_skip_patterns():
"""Ensure DEFAULT_SKIP_PATTERNS contains expected patterns."""
expected_patterns = [
r"^#",
r"mailto:",
r"tel:",
r"javascript:",
r"\.pdf$",
r"\.jpg$",
r"\.png$",
r"\.gif$",
]
assert DEFAULT_SKIP_PATTERNS == expected_patterns

View File

@@ -325,7 +325,13 @@ def test_process_image_success(mock_pil_open, mock_requests_get):
result = process_image(url, image_dir)
# Verify HTTP request was made
mock_requests_get.assert_called_once_with(url, timeout=30)
mock_requests_get.assert_called_once_with(
url,
timeout=30,
headers={
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:137.0) Gecko/20100101 Firefox/137.0"
},
)
mock_response.raise_for_status.assert_called_once()
# Verify image was opened