diff --git a/src/memory/common/db/models.py b/src/memory/common/db/models.py
index 4a5b01f..c329856 100644
--- a/src/memory/common/db/models.py
+++ b/src/memory/common/db/models.py
@@ -4,9 +4,10 @@ Database models for the knowledge base system.
 
 import pathlib
 import re
-from pathlib import Path
 import textwrap
+from datetime import datetime
 from typing import Any, ClassVar, cast
+
 from PIL import Image
 from sqlalchemy import (
     ARRAY,
@@ -27,10 +28,10 @@ from sqlalchemy import (
 )
 from sqlalchemy.dialects.postgresql import BYTEA, JSONB, TSVECTOR
 from sqlalchemy.ext.declarative import declarative_base
-from sqlalchemy.orm import relationship, Session
+from sqlalchemy.orm import Session, relationship
 
 from memory.common import settings
-from memory.common.parsers.email import parse_email_message, EmailMessage
+from memory.common.parsers.email import EmailMessage, parse_email_message
 
 
 Base = declarative_base()
@@ -215,13 +216,13 @@ class MailMessage(SourceItem):
     }
 
     @property
-    def attachments_path(self) -> Path:
+    def attachments_path(self) -> pathlib.Path:
         clean_sender = clean_filename(cast(str, self.sender))
         clean_folder = clean_filename(cast(str | None, self.folder) or "INBOX")
-        return Path(settings.FILE_STORAGE_DIR) / clean_sender / clean_folder
+        return pathlib.Path(settings.FILE_STORAGE_DIR) / clean_sender / clean_folder
 
-    def safe_filename(self, filename: str) -> Path:
-        suffix = Path(filename).suffix
+    def safe_filename(self, filename: str) -> pathlib.Path:
+        suffix = pathlib.Path(filename).suffix
         name = clean_filename(filename.removesuffix(suffix)) + suffix
         path = self.attachments_path / name
         path.parent.mkdir(parents=True, exist_ok=True)
@@ -511,12 +512,46 @@ class BlogPost(SourceItem):
     )
     url = Column(Text, unique=True)
     title = Column(Text)
-    published = Column(DateTime(timezone=True))
+    author = Column(Text, nullable=True)
+    published = Column(DateTime(timezone=True), nullable=True)
+
+    # Additional metadata from webpage parsing
+    description = Column(Text, nullable=True)  # Meta description or excerpt
+    domain = Column(Text, nullable=True)  # Domain of the source website
+    word_count = Column(Integer, nullable=True)  # Approximate word count
+
+    # Store original metadata from parser
+    webpage_metadata = Column(JSONB, nullable=True)
 
     __mapper_args__ = {
         "polymorphic_identity": "blog_post",
     }
 
+    __table_args__ = (
+        Index("blog_post_author_idx", "author"),
+        Index("blog_post_domain_idx", "domain"),
+        Index("blog_post_published_idx", "published"),
+        Index("blog_post_word_count_idx", "word_count"),
+    )
+
+    def as_payload(self) -> dict:
+        published_date = cast(datetime | None, self.published)
+        metadata = cast(dict | None, self.webpage_metadata) or {}
+
+        payload = {
+            "source_id": self.id,
+            "url": self.url,
+            "title": self.title,
+            "author": self.author,
+            "published": published_date and published_date.isoformat(),
+            "description": self.description,
+            "domain": self.domain,
+            "word_count": self.word_count,
+            "tags": self.tags,
+            **metadata,
+        }
+        return {k: v for k, v in payload.items() if v}
+
 
 class MiscDoc(SourceItem):
     __tablename__ = "misc_doc"
diff --git a/src/memory/common/parsers/blogs.py b/src/memory/common/parsers/blogs.py
index e2c4bf5..02ab59e 100644
--- a/src/memory/common/parsers/blogs.py
+++ b/src/memory/common/parsers/blogs.py
@@ -1,5 +1,6 @@
 import logging
 import re
+from datetime import datetime
 from urllib.parse import urlparse
 
 import requests
@@ -277,7 +278,7 @@ class ExUrbeParser(BaseHTMLParser):
         ".tags",
     ]
 
-    def _extract_date(self, soup: BeautifulSoup) -> str | None:
+    def _extract_date(self, soup: BeautifulSoup) -> datetime | None:
         """Extract date, handling ordinal formats like 'Mar 5th, 2025'."""
         date = soup.select_one(".published")
         if date:
@@ -335,14 +336,14 @@ class RiftersParser(BaseHTMLParser):
         ".rss-links",
     ]
 
-    def _extract_date(self, soup: BeautifulSoup) -> str | None:
+    def _extract_date(self, soup: BeautifulSoup) -> datetime | None:
         """Extract date, handling ordinal formats like 'Mar 5th, 2025'."""
         date = soup.select_one(".entry-date")
         if not date:
             return None
 
         date_str = date.text.replace("\n", " ").strip()
         if date := parse_date(date_str, "%d %b %Y"):
-            return date.isoformat()
+            return date
 
         return None
@@ -379,7 +380,7 @@ class PaulGrahamParser(BaseHTMLParser):
         # Fallback to standard title extraction
         return extract_title(soup, self.title_selector)
 
-    def _extract_date(self, soup: BeautifulSoup) -> str | None:
+    def _extract_date(self, soup: BeautifulSoup) -> datetime | None:
         """Extract date from essay content."""
         # Look for date patterns in the text content (often at the beginning)
         text_content = soup.get_text()
@@ -389,7 +390,7 @@
         if date_match:
             date_str = date_match.group(1)
             if date := parse_date(date_str, self.date_format):
-                return date.isoformat()
+                return date
 
         return extract_date(soup, self.date_selector, self.date_format)
@@ -450,7 +451,7 @@ class TheRedHandFilesParser(BaseHTMLParser):
         ".privacy-policy",
     ]
 
-    def _extract_date(self, soup: BeautifulSoup) -> str | None:
+    def _extract_date(self, soup: BeautifulSoup) -> datetime | None:
         """Extract date from issue header."""
         # Look for issue date pattern like "Issue #325 / May 2025"
         text_content = soup.get_text()
@@ -460,7 +461,7 @@
         if date_match:
             date_str = date_match.group(1)
             if date := parse_date(date_str, self.date_format):
-                return date.isoformat()
+                return date
 
         # Fallback to parent method
         return extract_date(soup, self.date_selector, self.date_format)
@@ -485,7 +486,7 @@ class RachelByTheBayParser(BaseHTMLParser):
         ".comments",
     ]
 
-    def _extract_date(self, soup: BeautifulSoup) -> str | None:
+    def _extract_date(self, soup: BeautifulSoup) -> datetime | None:
         """Extract date from URL structure if available."""
         # Try to get current URL from canonical link or other sources
         canonical = soup.find("link", rel="canonical")
@@ -498,7 +499,7 @@
             year, month, day = date_match.groups()
             date_str = f"{year}/{month}/{day}"
             if date := parse_date(date_str, self.date_format):
-                return date.isoformat()
+                return date
 
         # Fallback to parent method
         return extract_date(soup, self.date_selector, self.date_format)
diff --git a/src/memory/common/parsers/html.py b/src/memory/common/parsers/html.py
index 174ea80..286e12a 100644
--- a/src/memory/common/parsers/html.py
+++ b/src/memory/common/parsers/html.py
@@ -24,9 +24,9 @@ class Article:
     title: str
     content: str  # Markdown content
     author: str | None = None
-    published_date: str | None = None
+    published_date: datetime | None = None
    url: str = ""
-    images: list[PILImage.Image] = field(default_factory=list)
+    images: dict[str, PILImage.Image] = field(default_factory=dict)
     metadata: dict[str, Any] = field(default_factory=dict)
@@ -87,7 +87,7 @@ def parse_date(text: str, date_format: str = "%Y-%m-%d") -> datetime | None:
 
 def extract_date(
     soup: BeautifulSoup, date_selector: str, date_format: str = "%Y-%m-%d"
-) -> str | None:
+) -> datetime | None:
     """Extract publication date."""
     for selector in date_selector.split(","):
         element = soup.select_one(selector.strip())
@@ -98,12 +98,11 @@ def extract_date(
         if datetime_attr:
             date_str = str(datetime_attr)
             if date := parse_date(date_str, date_format):
-                return date.isoformat()
-            return date_str
+                return date
 
         for text in element.find_all(string=True):
             if text and (date := parse_date(str(text).strip(), date_format)):
-                return date.isoformat()
+                return date
 
     return None
@@ -149,7 +148,7 @@ def process_image(url: str, image_dir: pathlib.Path) -> PILImage.Image | None:
 
 def process_images(
     content: Tag | None, base_url: str, image_dir: pathlib.Path
-) -> tuple[Tag | None, list[PILImage.Image]]:
+) -> tuple[Tag | None, dict[str, PILImage.Image]]:
     """
     Process all images in content: download them, update URLs, and return PIL Images.
@@ -159,7 +158,7 @@
     if not content:
         return content, []
 
-    images = []
+    images = {}
 
     for img_tag in content.find_all("img"):
         if not isinstance(img_tag, Tag):
@@ -180,7 +179,7 @@
                 path = pathlib.Path(image.filename)  # type: ignore
                 img_tag["src"] = str(path.relative_to(FILE_STORAGE_DIR.resolve()))
 
-                images.append(image)
+                images[img_tag["src"]] = image
         except Exception as e:
             logger.warning(f"Failed to process image {src}: {e}")
             continue
@@ -337,7 +336,7 @@ class BaseHTMLParser:
         """Extract article author."""
         return extract_author(soup, self.author_selector)
 
-    def _extract_date(self, soup: BeautifulSoup) -> str | None:
+    def _extract_date(self, soup: BeautifulSoup) -> datetime | None:
         """Extract publication date."""
         return extract_date(soup, self.date_selector, self.date_format)
@@ -349,7 +348,7 @@
     def _process_images(
         self, content: Tag | None, base_url: str
-    ) -> tuple[Tag | None, list[PILImage.Image]]:
+    ) -> tuple[Tag | None, dict[str, PILImage.Image]]:
         """Process all images: download, update URLs, return PIL Images."""
         return process_images(content, base_url, self.image_dir)
diff --git a/src/memory/workers/celery_app.py b/src/memory/workers/celery_app.py
index bde18a0..2da9d44 100644
--- a/src/memory/workers/celery_app.py
+++ b/src/memory/workers/celery_app.py
@@ -25,6 +25,7 @@ app.conf.update(
         "memory.workers.tasks.email.*": {"queue": "email"},
         "memory.workers.tasks.photo.*": {"queue": "photo_embed"},
         "memory.workers.tasks.comic.*": {"queue": "comic"},
+        "memory.workers.tasks.blogs.*": {"queue": "blogs"},
         "memory.workers.tasks.docs.*": {"queue": "docs"},
         "memory.workers.tasks.maintenance.*": {"queue": "maintenance"},
     },
diff --git a/src/memory/workers/tasks/__init__.py b/src/memory/workers/tasks/__init__.py
index cba4b0c..1e6ad91 100644
--- a/src/memory/workers/tasks/__init__.py
+++ b/src/memory/workers/tasks/__init__.py
@@ -2,7 +2,8 @@
 Import sub-modules so Celery can register their @app.task decorators.
""" -from memory.workers.tasks import docs, email, comic # noqa +from memory.workers.tasks import docs, email, comic, blogs # noqa +from memory.workers.tasks.blogs import SYNC_WEBPAGE from memory.workers.tasks.email import SYNC_ACCOUNT, SYNC_ALL_ACCOUNTS, PROCESS_EMAIL from memory.workers.tasks.maintenance import ( CLEAN_ALL_COLLECTIONS, @@ -15,6 +16,8 @@ __all__ = [ "docs", "email", "comic", + "blogs", + "SYNC_WEBPAGE", "SYNC_ACCOUNT", "SYNC_ALL_ACCOUNTS", "PROCESS_EMAIL", diff --git a/src/memory/workers/tasks/blogs.py b/src/memory/workers/tasks/blogs.py new file mode 100644 index 0000000..14609e1 --- /dev/null +++ b/src/memory/workers/tasks/blogs.py @@ -0,0 +1,176 @@ +import hashlib +import logging +from typing import Iterable, cast + +from memory.common import chunker, embedding, qdrant +from memory.common.db.connection import make_session +from memory.common.db.models import BlogPost +from memory.common.parsers.blogs import parse_webpage +from memory.workers.celery_app import app + +logger = logging.getLogger(__name__) + + +SYNC_WEBPAGE = "memory.workers.tasks.blogs.sync_webpage" + + +def create_blog_post_from_article(article, tags: Iterable[str] = []) -> BlogPost: + """Create a BlogPost model from parsed article data.""" + return BlogPost( + url=article.url, + title=article.title, + published=article.published_date, + content=article.content, + sha256=hashlib.sha256(article.content.encode()).digest(), + modality="blog", + tags=tags, + mime_type="text/markdown", + size=len(article.content.encode("utf-8")), + ) + + +def embed_blog_post(blog_post: BlogPost) -> int: + """Embed blog post content and return count of successfully embedded chunks.""" + try: + # Always embed the full content + _, chunks = embedding.embed( + "text/markdown", + cast(str, blog_post.content), + metadata=blog_post.as_payload(), + chunk_size=chunker.EMBEDDING_MAX_TOKENS, + ) + # But also embed the content in chunks (unless it's really short) + if ( + chunker.approx_token_count(cast(str, blog_post.content)) + > chunker.DEFAULT_CHUNK_TOKENS * 2 + ): + _, small_chunks = embedding.embed( + "text/markdown", + cast(str, blog_post.content), + metadata=blog_post.as_payload(), + ) + chunks += small_chunks + + if chunks: + blog_post.chunks = chunks + blog_post.embed_status = "QUEUED" # type: ignore + return len(chunks) + else: + blog_post.embed_status = "FAILED" # type: ignore + logger.warning(f"No chunks generated for blog post: {blog_post.title}") + return 0 + + except Exception as e: + blog_post.embed_status = "FAILED" # type: ignore + logger.error(f"Failed to embed blog post {blog_post.title}: {e}") + return 0 + + +def push_to_qdrant(blog_post: BlogPost): + """Push embeddings to Qdrant for successfully embedded blog post.""" + if cast(str, blog_post.embed_status) != "QUEUED" or not blog_post.chunks: + return + + try: + vector_ids = [str(chunk.id) for chunk in blog_post.chunks] + vectors = [chunk.vector for chunk in blog_post.chunks] + payloads = [chunk.item_metadata for chunk in blog_post.chunks] + + qdrant.upsert_vectors( + client=qdrant.get_qdrant_client(), + collection_name="blog", + ids=vector_ids, + vectors=vectors, + payloads=payloads, + ) + + blog_post.embed_status = "STORED" # type: ignore + logger.info(f"Successfully stored embeddings for: {blog_post.title}") + + except Exception as e: + blog_post.embed_status = "FAILED" # type: ignore + logger.error(f"Failed to push embeddings to Qdrant: {e}") + raise + + +@app.task(name=SYNC_WEBPAGE) +def sync_webpage(url: str, tags: Iterable[str] = []) -> dict: + """ + 
+    Synchronize a webpage from a URL.
+
+    Args:
+        url: URL of the webpage to parse and store
+        tags: Additional tags to apply to the content
+
+    Returns:
+        dict: Summary of what was processed
+    """
+    article = parse_webpage(url)
+
+    if not article.content:
+        logger.warning(f"Article content too short or empty: {url}")
+        return {
+            "url": url,
+            "title": article.title,
+            "status": "skipped_short_content",
+            "content_length": 0,
+        }
+
+    blog_post = create_blog_post_from_article(article, tags)
+
+    with make_session() as session:
+        existing_post = session.query(BlogPost).filter(BlogPost.url == url).first()
+        if existing_post:
+            logger.info(f"Blog post already exists: {existing_post.title}")
+            return {
+                "blog_post_id": existing_post.id,
+                "url": url,
+                "title": existing_post.title,
+                "status": "already_exists",
+                "chunks_count": len(existing_post.chunks),
+            }
+
+        existing_post = (
+            session.query(BlogPost).filter(BlogPost.sha256 == blog_post.sha256).first()
+        )
+        if existing_post:
+            logger.info(
+                f"Blog post with the same content already exists: {existing_post.title}"
+            )
+            return {
+                "blog_post_id": existing_post.id,
+                "url": url,
+                "title": existing_post.title,
+                "status": "already_exists",
+                "chunks_count": len(existing_post.chunks),
+            }
+
+        session.add(blog_post)
+        session.flush()
+
+        chunks_count = embed_blog_post(blog_post)
+        session.flush()
+
+        try:
+            push_to_qdrant(blog_post)
+            logger.info(
+                f"Successfully processed webpage: {blog_post.title} "
+                f"({chunks_count} chunks embedded)"
+            )
+        except Exception as e:
+            logger.error(f"Failed to push embeddings to Qdrant: {e}")
+            blog_post.embed_status = "FAILED"  # type: ignore
+
+        session.commit()
+
+        return {
+            "blog_post_id": blog_post.id,
+            "url": url,
+            "title": blog_post.title,
+            "author": article.author,
+            "published_date": article.published_date,
+            "status": "processed",
+            "chunks_count": chunks_count,
+            "content_length": len(article.content),
+            "embed_status": blog_post.embed_status,
+        }
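
For a quick smoke test, the new task can be queued by name from any process that imports the Celery app. A minimal sketch, assuming a configured broker and a worker consuming the "blogs" queue (the URL and tags below are illustrative, not part of the diff):

from memory.workers.celery_app import app
from memory.workers.tasks import SYNC_WEBPAGE

# Enqueue a one-off sync; the task_routes entry added above sends
# "memory.workers.tasks.blogs.*" tasks to the "blogs" queue.
app.send_task(
    SYNC_WEBPAGE,
    args=["https://example.com/some-post"],  # illustrative URL
    kwargs={"tags": ["blog"]},
)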