diff --git a/db/migrations/versions/20250527_013945_rename_rss_feed.py b/db/migrations/versions/20250527_015138_rename_rss_feed.py similarity index 94% rename from db/migrations/versions/20250527_013945_rename_rss_feed.py rename to db/migrations/versions/20250527_015138_rename_rss_feed.py index 8482f55..4fe395f 100644 --- a/db/migrations/versions/20250527_013945_rename_rss_feed.py +++ b/db/migrations/versions/20250527_015138_rename_rss_feed.py @@ -1,8 +1,8 @@ """Rename rss feed -Revision ID: f8e6a7f80928 +Revision ID: 1b535e1b044e Revises: d897c6353a84 -Create Date: 2025-05-27 01:39:45.722077 +Create Date: 2025-05-27 01:51:38.553777 """ @@ -13,7 +13,7 @@ import sqlalchemy as sa from sqlalchemy.dialects import postgresql # revision identifiers, used by Alembic. -revision: str = "f8e6a7f80928" +revision: str = "1b535e1b044e" down_revision: Union[str, None] = "d897c6353a84" branch_labels: Union[str, Sequence[str], None] = None depends_on: Union[str, Sequence[str], None] = None @@ -27,6 +27,9 @@ def upgrade() -> None: sa.Column("title", sa.Text(), nullable=True), sa.Column("description", sa.Text(), nullable=True), sa.Column("tags", sa.ARRAY(sa.Text()), server_default="{}", nullable=False), + sa.Column( + "check_interval", sa.Integer(), server_default="3600", nullable=False + ), sa.Column("last_checked_at", sa.DateTime(timezone=True), nullable=True), sa.Column("active", sa.Boolean(), server_default="true", nullable=False), sa.Column( diff --git a/docker-compose.yaml b/docker-compose.yaml index f9bd21a..63f9406 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -76,6 +76,9 @@ services: mem_limit: 4g cpus: "1.5" security_opt: [ "no-new-privileges=true" ] + ports: + # PostgreSQL port for local Celery result backend + - "15432:5432" rabbitmq: image: rabbitmq:3.13-management @@ -97,7 +100,9 @@ services: security_opt: [ "no-new-privileges=true" ] ports: # UI only on localhost - - "127.0.0.1:15672:15672" + - "15672:15672" + # AMQP port for local Celery clients + - "15673:5672" qdrant: image: qdrant/qdrant:v1.14.0 @@ -114,6 +119,8 @@ services: interval: 15s timeout: 5s retries: 5 + ports: + - "6333:6333" mem_limit: 4g cpus: "2" security_opt: [ "no-new-privileges=true" ] @@ -181,7 +188,24 @@ services: environment: <<: *worker-env QUEUES: "medium_embed" - # deploy: { resources: { limits: { cpus: "2", memory: 3g } } } + + worker-ebook: + <<: *worker-base + environment: + <<: *worker-env + QUEUES: "ebooks" + + worker-comic: + <<: *worker-base + environment: + <<: *worker-env + QUEUES: "comic" + + worker-blogs: + <<: *worker-base + environment: + <<: *worker-env + QUEUES: "blogs" worker-photo: <<: *worker-base diff --git a/docker/ingest_hub/Dockerfile b/docker/ingest_hub/Dockerfile index 18659de..2c88d17 100644 --- a/docker/ingest_hub/Dockerfile +++ b/docker/ingest_hub/Dockerfile @@ -2,28 +2,33 @@ FROM python:3.11-slim WORKDIR /app -# Copy requirements files and setup -COPY requirements-*.txt ./ -COPY setup.py ./ -COPY src/ ./src/ - # Install dependencies RUN apt-get update && apt-get install -y \ libpq-dev gcc supervisor && \ pip install -e ".[workers]" && \ apt-get purge -y gcc && apt-get autoremove -y && rm -rf /var/lib/apt/lists/* +COPY requirements-*.txt ./ +RUN pip install --no-cache-dir -r requirements-common.txt +RUN pip install --no-cache-dir -r requirements-parsers.txt +RUN pip install --no-cache-dir -r requirements-workers.txt + +# Copy requirements files and setup +COPY setup.py ./ +COPY src/ ./src/ + # Create and copy entrypoint script -COPY docker/ingest_hub/supervisor.conf /etc/supervisor/conf.d/supervisor.conf COPY docker/workers/entry.sh ./entry.sh RUN chmod +x entry.sh -# Create required tmpfs directories for supervisor -RUN mkdir -p /var/log/supervisor /var/run/supervisor - # Create storage directory RUN mkdir -p /app/memory_files +COPY docker/ingest_hub/supervisor.conf /etc/supervisor/conf.d/supervisor.conf + +# Create required tmpfs directories for supervisor +RUN mkdir -p /var/log/supervisor /var/run/supervisor + # Create user and set permissions RUN useradd -m kb && chown -R kb /app /var/log/supervisor /var/run/supervisor /app/memory_files USER kb diff --git a/docker/workers/Dockerfile b/docker/workers/Dockerfile index 07336e4..b0f4e6d 100644 --- a/docker/workers/Dockerfile +++ b/docker/workers/Dockerfile @@ -2,12 +2,6 @@ FROM python:3.11-slim WORKDIR /app -# Copy requirements files and setup -COPY requirements-*.txt ./ -COPY setup.py ./ -COPY src/ ./src/ - -# Install dependencies RUN apt-get update && apt-get install -y \ libpq-dev gcc pandoc \ texlive-xetex texlive-fonts-recommended texlive-plain-generic \ @@ -18,15 +12,25 @@ RUN apt-get update && apt-get install -y \ # For optional LibreOffice support (uncomment if needed) # libreoffice-writer \ && apt-get purge -y gcc && apt-get autoremove -y && rm -rf /var/lib/apt/lists/* + +COPY requirements-*.txt ./ +RUN pip install --no-cache-dir -r requirements-common.txt +RUN pip install --no-cache-dir -r requirements-parsers.txt +RUN pip install --no-cache-dir -r requirements-workers.txt + +# Install Python dependencies +COPY setup.py ./ +COPY src/ ./src/ RUN pip install -e ".[workers]" -# Create and copy entrypoint script +# Copy entrypoint scripts and set permissions COPY docker/workers/entry.sh ./entry.sh -COPY docker/workers/unnest-table.lua ./unnest-table.lua RUN chmod +x entry.sh RUN mkdir -p /app/memory_files +COPY docker/workers/unnest-table.lua ./unnest-table.lua + # Create user and set permissions RUN useradd -m kb RUN mkdir -p /var/cache/fontconfig /home/kb/.cache/fontconfig && \ @@ -35,7 +39,7 @@ RUN mkdir -p /var/cache/fontconfig /home/kb/.cache/fontconfig && \ USER kb # Default queues to process -ENV QUEUES="docs,email,maintenance" +ENV QUEUES="ebooks,email,comic,blogs,photo_embed,maintenance" ENV PYTHONPATH="/app" ENTRYPOINT ["./entry.sh"] \ No newline at end of file diff --git a/requirements-common.txt b/requirements-common.txt index a275cd0..27b9e47 100644 --- a/requirements-common.txt +++ b/requirements-common.txt @@ -4,9 +4,4 @@ pydantic==2.7.1 alembic==1.13.1 dotenv==0.9.9 voyageai==0.3.2 -qdrant-client==1.9.0 -PyMuPDF==1.25.5 -ebooklib==0.18.0 -beautifulsoup4==4.13.4 -markdownify==0.13.1 -pillow==10.4.0 \ No newline at end of file +qdrant-client==1.9.0 \ No newline at end of file diff --git a/requirements-parsers.txt b/requirements-parsers.txt new file mode 100644 index 0000000..a864ebd --- /dev/null +++ b/requirements-parsers.txt @@ -0,0 +1,5 @@ +PyMuPDF==1.25.5 +ebooklib==0.18.0 +beautifulsoup4==4.13.4 +markdownify==0.13.1 +pillow==10.4.0 \ No newline at end of file diff --git a/requirements-workers.txt b/requirements-workers.txt index ba9dfc8..cbf9e95 100644 --- a/requirements-workers.txt +++ b/requirements-workers.txt @@ -1,6 +1,6 @@ celery==5.3.6 openai==1.25.0 -pillow==10.3.0 +pillow==10.4.0 pypandoc==1.15.0 beautifulsoup4==4.13.4 feedparser==6.0.10 \ No newline at end of file diff --git a/setup.py b/setup.py index 7a527e3..1cc6a87 100644 --- a/setup.py +++ b/setup.py @@ -14,6 +14,7 @@ def read_requirements(filename: str) -> list[str]: # Read requirements files common_requires = read_requirements("requirements-common.txt") +parsers_requires = read_requirements("requirements-parsers.txt") api_requires = read_requirements("requirements-api.txt") workers_requires = read_requirements("requirements-workers.txt") dev_requires = read_requirements("requirements-dev.txt") @@ -26,9 +27,13 @@ setup( python_requires=">=3.10", extras_require={ "api": api_requires + common_requires, - "workers": workers_requires + common_requires, + "workers": workers_requires + common_requires + parsers_requires, "common": common_requires, "dev": dev_requires, - "all": api_requires + workers_requires + common_requires + dev_requires, + "all": api_requires + + workers_requires + + common_requires + + dev_requires + + parsers_requires, }, ) diff --git a/src/memory/api/app.py b/src/memory/api/app.py index 30dbeb3..172e788 100644 --- a/src/memory/api/app.py +++ b/src/memory/api/app.py @@ -15,7 +15,7 @@ from PIL import Image from pydantic import BaseModel from memory.common import embedding, qdrant, extract, settings -from memory.common.collections import get_modality, TEXT_COLLECTIONS +from memory.common.collections import get_modality, TEXT_COLLECTIONS, ALL_COLLECTIONS from memory.common.db.connection import make_session from memory.common.db.models import Chunk, SourceItem @@ -95,8 +95,8 @@ def group_chunks(chunks: list[tuple[SourceItem, AnnotatedChunk]]) -> list[Search return [ SearchResult( id=source.id, - size=source.size, - mime_type=source.mime_type, + size=source.size or len(source.content), + mime_type=source.mime_type or "text/plain", filename=source.filename and source.filename.replace( str(settings.FILE_STORAGE_DIR).lstrip("/"), "/files" @@ -110,7 +110,7 @@ def group_chunks(chunks: list[tuple[SourceItem, AnnotatedChunk]]) -> list[Search def query_chunks( client: qdrant_client.QdrantClient, - upload_data: list[tuple[str, list[extract.Page]]], + upload_data: list[extract.DataChunk], allowed_modalities: set[str], embedder: Callable, min_score: float = 0.0, @@ -119,15 +119,9 @@ def query_chunks( if not upload_data: return {} - chunks = [ - chunk - for content_type, pages in upload_data - if get_modality(content_type) in allowed_modalities - for page in pages - for chunk in page["contents"] - ] - + chunks = [chunk for data_chunk in upload_data for chunk in data_chunk.data] if not chunks: + logger.error(f"No chunks to embed for {allowed_modalities}") return {} vector = embedder(chunks, input_type="query")[0] @@ -143,18 +137,18 @@ def query_chunks( ) if r.score >= min_score ] - for collection in embedding.ALL_COLLECTIONS + for collection in allowed_modalities } -async def input_type(item: str | UploadFile) -> tuple[str, list[extract.Page]]: +async def input_type(item: str | UploadFile) -> list[extract.DataChunk]: if not item: - return "text/plain", [] + return [] if isinstance(item, str): - return "text/plain", extract.extract_text(item) + return extract.extract_text(item) content_type = item.content_type or "application/octet-stream" - return content_type, extract.extract_content(content_type, await item.read()) + return extract.extract_data_chunks(content_type, await item.read()) @app.post("/search", response_model=list[SearchResult]) @@ -179,13 +173,15 @@ async def search( Returns: - List of search results sorted by score """ - upload_data = [await input_type(item) for item in [query, *files]] + upload_data = [ + chunk for item in [query, *files] for chunk in await input_type(item) + ] logger.error( f"Querying chunks for {modalities}, query: {query}, previews: {previews}, upload_data: {upload_data}" ) client = qdrant.get_qdrant_client() - allowed_modalities = set(modalities or embedding.ALL_COLLECTIONS.keys()) + allowed_modalities = set(modalities or ALL_COLLECTIONS.keys()) text_results = query_chunks( client, upload_data, @@ -212,6 +208,7 @@ async def search( } with make_session() as db: chunks = db.query(Chunk).filter(Chunk.id.in_(found_chunks.keys())).all() + logger.error(f"Found chunks: {chunks}") results = group_chunks( [ @@ -245,3 +242,16 @@ def get_file_by_path(path: str): raise HTTPException(status_code=404, detail=f"File not found at path: {path}") return FileResponse(path=file_path, filename=file_path.name) + + +def main(): + """Run the FastAPI server in debug mode with auto-reloading.""" + import uvicorn + + uvicorn.run( + "memory.api.app:app", host="0.0.0.0", port=8000, reload=True, log_level="debug" + ) + + +if __name__ == "__main__": + main() diff --git a/src/memory/common/db/models.py b/src/memory/common/db/models.py index ab38440..3ceda92 100644 --- a/src/memory/common/db/models.py +++ b/src/memory/common/db/models.py @@ -33,7 +33,6 @@ from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import Session, relationship from memory.common import settings -from memory.common.parsers.email import EmailMessage, parse_email_message import memory.common.extract as extract import memory.common.collections as collections import memory.common.chunker as chunker @@ -126,8 +125,8 @@ class Chunk(Base): embedding_model = Column(Text) created_at = Column(DateTime(timezone=True), server_default=func.now()) checked_at = Column(DateTime(timezone=True), server_default=func.now()) - vector: ClassVar[list[float] | None] = None - item_metadata: ClassVar[dict[str, Any] | None] = None + vector: list[float] = [] + item_metadata: dict[str, Any] = {} images: list[Image.Image] = [] # One of file_path or content must be populated @@ -275,7 +274,7 @@ class MailMessage(SourceItem): def attachments_path(self) -> pathlib.Path: clean_sender = clean_filename(cast(str, self.sender)) clean_folder = clean_filename(cast(str | None, self.folder) or "INBOX") - return pathlib.Path(settings.FILE_STORAGE_DIR) / clean_sender / clean_folder + return pathlib.Path(settings.EMAIL_STORAGE_DIR) / clean_sender / clean_folder def safe_filename(self, filename: str) -> pathlib.Path: suffix = pathlib.Path(filename).suffix @@ -297,7 +296,9 @@ class MailMessage(SourceItem): } @property - def parsed_content(self) -> EmailMessage: + def parsed_content(self): + from memory.parsers.email import parse_email_message + return parse_email_message(cast(str, self.content), cast(str, self.message_id)) @property @@ -563,6 +564,8 @@ class BookSection(SourceItem): def as_payload(self) -> dict: vals = { + "title": self.book.title, + "author": self.book.author, "source_id": self.id, "book_id": self.book_id, "section_title": self.section_title, @@ -636,7 +639,9 @@ class BlogPost(SourceItem): return {k: v for k, v in payload.items() if v} def _chunk_contents(self) -> Sequence[Sequence[extract.MulitmodalChunk]]: - images = [Image.open(image) for image in self.images] + images = [ + Image.open(settings.FILE_STORAGE_DIR / image) for image in self.images + ] content = cast(str, self.content) full_text = [content.strip(), *images] @@ -705,6 +710,9 @@ class ArticleFeed(Base): title = Column(Text) description = Column(Text) tags = Column(ARRAY(Text), nullable=False, server_default="{}") + check_interval = Column( + Integer, nullable=False, server_default="3600", doc="Seconds between checks" + ) last_checked_at = Column(DateTime(timezone=True)) active = Column(Boolean, nullable=False, server_default="true") created_at = Column( diff --git a/src/memory/common/embedding.py b/src/memory/common/embedding.py index 95acbc4..38d3741 100644 --- a/src/memory/common/embedding.py +++ b/src/memory/common/embedding.py @@ -20,6 +20,7 @@ def embed_chunks( model: str = settings.TEXT_EMBEDDING_MODEL, input_type: Literal["document", "query"] = "document", ) -> list[Vector]: + logger.debug(f"Embedding chunks: {model} - {str(chunks)[:100]}") vo = voyageai.Client() # type: ignore if model == settings.MIXED_EMBEDDING_MODEL: return vo.multimodal_embed( @@ -71,18 +72,24 @@ def embed_mixed( return embed_chunks([chunks], model, input_type) -def embed_chunk(chunk: Chunk) -> Chunk: - model = cast(str, chunk.embedding_model) - if model == settings.TEXT_EMBEDDING_MODEL: - content = cast(str, chunk.content) - elif model == settings.MIXED_EMBEDDING_MODEL: - content = [cast(str, chunk.content)] + chunk.images - else: - raise ValueError(f"Unsupported model: {chunk.embedding_model}") - vectors = embed_chunks([content], model) # type: ignore - chunk.vector = vectors[0] # type: ignore - return chunk +def embed_by_model(chunks: list[Chunk], model: str) -> list[Chunk]: + model_chunks = [ + chunk for chunk in chunks if cast(str, chunk.embedding_model) == model + ] + if not model_chunks: + return [] + + vectors = embed_chunks([chunk.content for chunk in model_chunks], model) # type: ignore + for chunk, vector in zip(model_chunks, vectors): + chunk.vector = vector + return model_chunks def embed_source_item(item: SourceItem) -> list[Chunk]: - return [embed_chunk(chunk) for chunk in item.data_chunks()] + chunks = list(item.data_chunks()) + if not chunks: + return [] + + text_chunks = embed_by_model(chunks, settings.TEXT_EMBEDDING_MODEL) + mixed_chunks = embed_by_model(chunks, settings.MIXED_EMBEDDING_MODEL) + return text_chunks + mixed_chunks diff --git a/src/memory/common/extract.py b/src/memory/common/extract.py index 9631f37..785ba7d 100644 --- a/src/memory/common/extract.py +++ b/src/memory/common/extract.py @@ -20,6 +20,7 @@ MulitmodalChunk = Image.Image | str class DataChunk: data: Sequence[MulitmodalChunk] metadata: dict[str, Any] = field(default_factory=dict) + mime_type: str = "text/plain" @contextmanager @@ -36,7 +37,9 @@ def as_file(content: bytes | str | pathlib.Path) -> Generator[pathlib.Path, None def page_to_image(page: pymupdf.Page) -> Image.Image: pix = page.get_pixmap() # type: ignore - return Image.frombytes("RGB", [pix.width, pix.height], pix.samples) + image = Image.frombytes("RGB", [pix.width, pix.height], pix.samples) + image.format = "jpeg" + return image def doc_to_images(content: bytes | str | pathlib.Path) -> list[DataChunk]: @@ -50,6 +53,7 @@ def doc_to_images(content: bytes | str | pathlib.Path) -> list[DataChunk]: "width": page.rect.width, "height": page.rect.height, }, + mime_type="image/jpeg", ) for page in pdf.pages() ] @@ -99,7 +103,8 @@ def extract_image(content: bytes | str | pathlib.Path) -> list[DataChunk]: image = Image.open(io.BytesIO(content)) else: raise ValueError(f"Unsupported content type: {type(content)}") - return [DataChunk(data=[image])] + image_format = image.format or "jpeg" + return [DataChunk(data=[image], mime_type=f"image/{image_format.lower()}")] def extract_text( @@ -112,7 +117,7 @@ def extract_text( content = cast(str, content) chunks = chunker.chunk_text(content, chunk_size or chunker.DEFAULT_CHUNK_TOKENS) - return [DataChunk(data=[c]) for c in chunks] + return [DataChunk(data=[c], mime_type="text/plain") for c in chunks if c.strip()] def extract_data_chunks( diff --git a/src/memory/common/settings.py b/src/memory/common/settings.py index 48f7790..51f145f 100644 --- a/src/memory/common/settings.py +++ b/src/memory/common/settings.py @@ -39,20 +39,32 @@ CELERY_RESULT_BACKEND = os.getenv("CELERY_RESULT_BACKEND", f"db+{DB_URL}") # File storage settings FILE_STORAGE_DIR = pathlib.Path(os.getenv("FILE_STORAGE_DIR", "/tmp/memory_files")) +EBOOK_STORAGE_DIR = pathlib.Path( + os.getenv("EBOOK_STORAGE_DIR", FILE_STORAGE_DIR / "ebooks") +) +EMAIL_STORAGE_DIR = pathlib.Path( + os.getenv("EMAIL_STORAGE_DIR", FILE_STORAGE_DIR / "emails") +) CHUNK_STORAGE_DIR = pathlib.Path( os.getenv("CHUNK_STORAGE_DIR", FILE_STORAGE_DIR / "chunks") ) COMIC_STORAGE_DIR = pathlib.Path( os.getenv("COMIC_STORAGE_DIR", FILE_STORAGE_DIR / "comics") ) +PHOTO_STORAGE_DIR = pathlib.Path( + os.getenv("PHOTO_STORAGE_DIR", FILE_STORAGE_DIR / "photos") +) WEBPAGE_STORAGE_DIR = pathlib.Path( os.getenv("WEBPAGE_STORAGE_DIR", FILE_STORAGE_DIR / "webpages") ) storage_dirs = [ FILE_STORAGE_DIR, + EBOOK_STORAGE_DIR, + EMAIL_STORAGE_DIR, CHUNK_STORAGE_DIR, COMIC_STORAGE_DIR, + PHOTO_STORAGE_DIR, WEBPAGE_STORAGE_DIR, ] for dir in storage_dirs: @@ -83,3 +95,4 @@ CHUNK_REINGEST_SINCE_MINUTES = int(os.getenv("CHUNK_REINGEST_SINCE_MINUTES", 60 # Embedding settings TEXT_EMBEDDING_MODEL = os.getenv("TEXT_EMBEDDING_MODEL", "voyage-3-large") MIXED_EMBEDDING_MODEL = os.getenv("MIXED_EMBEDDING_MODEL", "voyage-multimodal-3") +EMBEDDING_MAX_WORKERS = int(os.getenv("EMBEDDING_MAX_WORKERS", 50)) diff --git a/src/memory/common/parsers/archives.py b/src/memory/parsers/archives.py similarity index 98% rename from src/memory/common/parsers/archives.py rename to src/memory/parsers/archives.py index afc0cb4..762d45b 100644 --- a/src/memory/common/parsers/archives.py +++ b/src/memory/parsers/archives.py @@ -1,26 +1,22 @@ -from dataclasses import dataclass, field import logging import re import time -from urllib.parse import parse_qs, urlencode, urlparse, urlunparse +from dataclasses import dataclass, field from typing import Generator, cast +from urllib.parse import parse_qs, urlencode, urlparse, urlunparse from bs4 import BeautifulSoup -from memory.common.parsers.blogs import is_substack -from memory.common.parsers.feeds import ( +from memory.parsers.blogs import is_substack +from memory.parsers.feeds import ( DanluuParser, - HTMLListParser, - RiftersParser, FeedItem, FeedParser, + HTMLListParser, + RiftersParser, SubstackAPIParser, ) -from memory.common.parsers.html import ( - fetch_html, - extract_url, - get_base_url, -) +from memory.parsers.html import extract_url, fetch_html, get_base_url logger = logging.getLogger(__name__) diff --git a/src/memory/common/parsers/blogs.py b/src/memory/parsers/blogs.py similarity index 99% rename from src/memory/common/parsers/blogs.py rename to src/memory/parsers/blogs.py index c99b2e3..631e19f 100644 --- a/src/memory/common/parsers/blogs.py +++ b/src/memory/parsers/blogs.py @@ -6,7 +6,7 @@ from typing import cast from bs4 import BeautifulSoup, Tag -from memory.common.parsers.html import ( +from memory.parsers.html import ( BaseHTMLParser, Article, parse_date, diff --git a/src/memory/common/parsers/comics.py b/src/memory/parsers/comics.py similarity index 100% rename from src/memory/common/parsers/comics.py rename to src/memory/parsers/comics.py diff --git a/src/memory/common/parsers/ebook.py b/src/memory/parsers/ebook.py similarity index 100% rename from src/memory/common/parsers/ebook.py rename to src/memory/parsers/ebook.py diff --git a/src/memory/common/parsers/email.py b/src/memory/parsers/email.py similarity index 100% rename from src/memory/common/parsers/email.py rename to src/memory/parsers/email.py index 89c7a0a..b728bb6 100644 --- a/src/memory/common/parsers/email.py +++ b/src/memory/parsers/email.py @@ -1,10 +1,10 @@ import email import hashlib import logging +import pathlib from datetime import datetime from email.utils import parsedate_to_datetime from typing import TypedDict -import pathlib logger = logging.getLogger(__name__) diff --git a/src/memory/common/parsers/feeds.py b/src/memory/parsers/feeds.py similarity index 99% rename from src/memory/common/parsers/feeds.py rename to src/memory/parsers/feeds.py index d352c7e..1612017 100644 --- a/src/memory/common/parsers/feeds.py +++ b/src/memory/parsers/feeds.py @@ -10,7 +10,7 @@ import feedparser from bs4 import BeautifulSoup, Tag import requests -from memory.common.parsers.html import ( +from memory.parsers.html import ( get_base_url, to_absolute_url, extract_title, diff --git a/src/memory/common/parsers/html.py b/src/memory/parsers/html.py similarity index 99% rename from src/memory/common/parsers/html.py rename to src/memory/parsers/html.py index 5a46533..0d8ca54 100644 --- a/src/memory/common/parsers/html.py +++ b/src/memory/parsers/html.py @@ -1,11 +1,11 @@ -from datetime import datetime +import hashlib import logging +import pathlib import re from dataclasses import dataclass, field -import pathlib +from datetime import datetime from typing import Any from urllib.parse import urljoin, urlparse -import hashlib import requests from bs4 import BeautifulSoup, Tag @@ -153,6 +153,7 @@ def process_image(url: str, image_dir: pathlib.Path) -> PILImage.Image | None: ext = pathlib.Path(urlparse(url).path).suffix or ".jpg" filename = f"{url_hash}{ext}" local_path = image_dir / filename + local_path.parent.mkdir(parents=True, exist_ok=True) # Download if not already cached if not local_path.exists(): diff --git a/src/memory/workers/celery_app.py b/src/memory/workers/celery_app.py index 2da9d44..82f61b2 100644 --- a/src/memory/workers/celery_app.py +++ b/src/memory/workers/celery_app.py @@ -21,12 +21,11 @@ app.conf.update( task_reject_on_worker_lost=True, worker_prefetch_multiplier=1, task_routes={ - "memory.workers.tasks.text.*": {"queue": "medium_embed"}, "memory.workers.tasks.email.*": {"queue": "email"}, "memory.workers.tasks.photo.*": {"queue": "photo_embed"}, "memory.workers.tasks.comic.*": {"queue": "comic"}, + "memory.workers.tasks.ebook.*": {"queue": "ebooks"}, "memory.workers.tasks.blogs.*": {"queue": "blogs"}, - "memory.workers.tasks.docs.*": {"queue": "docs"}, "memory.workers.tasks.maintenance.*": {"queue": "maintenance"}, }, ) diff --git a/src/memory/workers/email.py b/src/memory/workers/email.py index 493637f..f1133e7 100644 --- a/src/memory/workers/email.py +++ b/src/memory/workers/email.py @@ -1,7 +1,6 @@ import hashlib import imaplib import logging -import pathlib import re from collections import defaultdict from contextlib import contextmanager @@ -16,7 +15,7 @@ from memory.common.db.models import ( EmailAttachment, MailMessage, ) -from memory.common.parsers.email import ( +from memory.parsers.email import ( Attachment, EmailMessage, RawEmailResponse, diff --git a/src/memory/workers/tasks/__init__.py b/src/memory/workers/tasks/__init__.py index 1e6ad91..d0036db 100644 --- a/src/memory/workers/tasks/__init__.py +++ b/src/memory/workers/tasks/__init__.py @@ -2,26 +2,42 @@ Import sub-modules so Celery can register their @app.task decorators. """ -from memory.workers.tasks import docs, email, comic, blogs # noqa -from memory.workers.tasks.blogs import SYNC_WEBPAGE +from memory.workers.tasks import email, comic, blogs, ebook # noqa +from memory.workers.tasks.blogs import ( + SYNC_WEBPAGE, + SYNC_ARTICLE_FEED, + SYNC_ALL_ARTICLE_FEEDS, + SYNC_WEBSITE_ARCHIVE, +) +from memory.workers.tasks.comic import SYNC_ALL_COMICS, SYNC_SMBC, SYNC_XKCD +from memory.workers.tasks.ebook import SYNC_BOOK from memory.workers.tasks.email import SYNC_ACCOUNT, SYNC_ALL_ACCOUNTS, PROCESS_EMAIL from memory.workers.tasks.maintenance import ( CLEAN_ALL_COLLECTIONS, CLEAN_COLLECTION, REINGEST_MISSING_CHUNKS, + REINGEST_CHUNK, ) __all__ = [ - "docs", "email", "comic", "blogs", + "ebook", "SYNC_WEBPAGE", + "SYNC_ARTICLE_FEED", + "SYNC_ALL_ARTICLE_FEEDS", + "SYNC_WEBSITE_ARCHIVE", + "SYNC_ALL_COMICS", + "SYNC_SMBC", + "SYNC_XKCD", + "SYNC_BOOK", "SYNC_ACCOUNT", "SYNC_ALL_ACCOUNTS", "PROCESS_EMAIL", "CLEAN_ALL_COLLECTIONS", "CLEAN_COLLECTION", "REINGEST_MISSING_CHUNKS", + "REINGEST_CHUNK", ] diff --git a/src/memory/workers/tasks/blogs.py b/src/memory/workers/tasks/blogs.py index 6506972..7b4f9cd 100644 --- a/src/memory/workers/tasks/blogs.py +++ b/src/memory/workers/tasks/blogs.py @@ -1,9 +1,12 @@ import logging -from typing import Iterable +from datetime import datetime, timedelta +from typing import Iterable, cast from memory.common.db.connection import make_session -from memory.common.db.models import BlogPost -from memory.common.parsers.blogs import parse_webpage +from memory.common.db.models import ArticleFeed, BlogPost +from memory.parsers.blogs import parse_webpage +from memory.parsers.feeds import get_feed_parser +from memory.parsers.archives import get_archive_fetcher from memory.workers.celery_app import app from memory.workers.tasks.content_processing import ( check_content_exists, @@ -16,6 +19,9 @@ from memory.workers.tasks.content_processing import ( logger = logging.getLogger(__name__) SYNC_WEBPAGE = "memory.workers.tasks.blogs.sync_webpage" +SYNC_ARTICLE_FEED = "memory.workers.tasks.blogs.sync_article_feed" +SYNC_ALL_ARTICLE_FEEDS = "memory.workers.tasks.blogs.sync_all_article_feeds" +SYNC_WEBSITE_ARCHIVE = "memory.workers.tasks.blogs.sync_website_archive" @app.task(name=SYNC_WEBPAGE) @@ -31,7 +37,9 @@ def sync_webpage(url: str, tags: Iterable[str] = []) -> dict: Returns: dict: Summary of what was processed """ + logger.info(f"Syncing webpage: {url}") article = parse_webpage(url) + logger.debug(f"Article: {article.title} - {article.url}") if not article.content: logger.warning(f"Article content too short or empty: {url}") @@ -57,10 +65,158 @@ def sync_webpage(url: str, tags: Iterable[str] = []) -> dict: with make_session() as session: existing_post = check_content_exists( - session, BlogPost, url=url, sha256=create_content_hash(article.content) + session, BlogPost, url=article.url, sha256=blog_post.sha256 ) if existing_post: logger.info(f"Blog post already exists: {existing_post.title}") - return create_task_result(existing_post, "already_exists", url=url) + return create_task_result(existing_post, "already_exists", url=article.url) return process_content_item(blog_post, "blog", session, tags) + + +@app.task(name=SYNC_ARTICLE_FEED) +@safe_task_execution +def sync_article_feed(feed_id: int) -> dict: + """ + Synchronize articles from a specific ArticleFeed. + + Args: + feed_id: ID of the ArticleFeed to sync + + Returns: + dict: Summary of sync operation including stats + """ + with make_session() as session: + feed = session.query(ArticleFeed).filter(ArticleFeed.id == feed_id).first() + if not feed or not cast(bool, feed.active): + logger.warning(f"Feed {feed_id} not found or inactive") + return {"status": "error", "error": "Feed not found or inactive"} + + last_checked_at = cast(datetime | None, feed.last_checked_at) + if last_checked_at and datetime.now() - last_checked_at < timedelta( + seconds=cast(int, feed.check_interval) + ): + logger.info(f"Feed {feed_id} checked too recently, skipping") + return {"status": "skipped_recent_check", "feed_id": feed_id} + + logger.info(f"Syncing feed: {feed.title} ({feed.url})") + + parser = get_feed_parser(cast(str, feed.url), last_checked_at) + if not parser: + logger.error(f"No parser available for feed: {feed.url}") + return {"status": "error", "error": "No parser available for feed"} + + articles_found = 0 + new_articles = 0 + errors = 0 + task_ids = [] + + try: + for feed_item in parser.parse_feed(): + articles_found += 1 + + existing = check_content_exists(session, BlogPost, url=feed_item.url) + if existing: + continue + + feed_tags = cast(list[str] | None, feed.tags) or [] + task_ids.append(sync_webpage.delay(feed_item.url, feed_tags).id) + new_articles += 1 + + logger.info(f"Scheduled sync for: {feed_item.title} ({feed_item.url})") + + except Exception as e: + logger.error(f"Error parsing feed {feed.url}: {e}") + errors += 1 + + feed.last_checked_at = datetime.now() # type: ignore + session.commit() + + return { + "status": "completed", + "feed_id": feed_id, + "feed_title": feed.title, + "feed_url": feed.url, + "articles_found": articles_found, + "new_articles": new_articles, + "errors": errors, + "task_ids": task_ids, + } + + +@app.task(name=SYNC_ALL_ARTICLE_FEEDS) +def sync_all_article_feeds() -> list[dict]: + """ + Trigger sync for all active ArticleFeeds. + + Returns: + List of task results for each feed sync + """ + with make_session() as session: + active_feeds = session.query(ArticleFeed).filter(ArticleFeed.active).all() + + results = [ + { + "feed_id": feed.id, + "feed_title": feed.title, + "feed_url": feed.url, + "task_id": sync_article_feed.delay(feed.id).id, + } + for feed in active_feeds + ] + logger.info(f"Scheduled sync for {len(results)} active feeds") + return results + + +@app.task(name=SYNC_WEBSITE_ARCHIVE) +@safe_task_execution +def sync_website_archive( + url: str, tags: Iterable[str] = [], max_pages: int = 100 +) -> dict: + """ + Synchronize all articles from a website's archive. + + Args: + url: Base URL of the website to sync + tags: Additional tags to apply to all articles + max_pages: Maximum number of pages to process + + Returns: + dict: Summary of archive sync operation + """ + logger.info(f"Starting archive sync for: {url}") + + # Get archive fetcher for the website + fetcher = get_archive_fetcher(url) + if not fetcher: + logger.error(f"No archive fetcher available for: {url}") + return {"status": "error", "error": "No archive fetcher available"} + + # Override max_pages if provided + fetcher.max_pages = max_pages + + articles_found = 0 + new_articles = 0 + task_ids = [] + + for feed_item in fetcher.fetch_all_items(): + articles_found += 1 + + with make_session() as session: + existing = check_content_exists(session, BlogPost, url=feed_item.url) + if existing: + continue + + task_ids.append(sync_webpage.delay(feed_item.url, list(tags)).id) + new_articles += 1 + + logger.info(f"Scheduled sync for: {feed_item.title} ({feed_item.url})") + + return { + "status": "completed", + "website_url": url, + "articles_found": articles_found, + "new_articles": new_articles, + "task_ids": task_ids, + "max_pages_processed": fetcher.max_pages, + } diff --git a/src/memory/workers/tasks/comic.py b/src/memory/workers/tasks/comic.py index ea650d4..580b868 100644 --- a/src/memory/workers/tasks/comic.py +++ b/src/memory/workers/tasks/comic.py @@ -8,7 +8,7 @@ import requests from memory.common import settings from memory.common.db.connection import make_session from memory.common.db.models import Comic, clean_filename -from memory.common.parsers import comics +from memory.parsers import comics from memory.workers.celery_app import app from memory.workers.tasks.content_processing import ( check_content_exists, diff --git a/src/memory/workers/tasks/content_processing.py b/src/memory/workers/tasks/content_processing.py index 3c3fb2c..4a00a91 100644 --- a/src/memory/workers/tasks/content_processing.py +++ b/src/memory/workers/tasks/content_processing.py @@ -221,6 +221,7 @@ def process_content_item( try: push_to_qdrant([item], collection_name) status = "processed" + item.embed_status = "STORED" # type: ignore logger.info( f"Successfully processed {type(item).__name__}: {getattr(item, 'title', 'unknown')} ({chunks_count} chunks embedded)" ) @@ -228,7 +229,6 @@ def process_content_item( logger.error(f"Failed to push embeddings to Qdrant: {e}") item.embed_status = "FAILED" # type: ignore status = "failed" - session.commit() return create_task_result(item, status, content_length=getattr(item, "size", 0)) diff --git a/src/memory/workers/tasks/docs.py b/src/memory/workers/tasks/docs.py deleted file mode 100644 index 32cf5ea..0000000 --- a/src/memory/workers/tasks/docs.py +++ /dev/null @@ -1,5 +0,0 @@ -from memory.workers.celery_app import app - -@app.task(name="kb.text.ping") -def ping(): - return "pong" \ No newline at end of file diff --git a/src/memory/workers/tasks/ebook.py b/src/memory/workers/tasks/ebook.py index bb2a8d9..1835fbb 100644 --- a/src/memory/workers/tasks/ebook.py +++ b/src/memory/workers/tasks/ebook.py @@ -1,9 +1,10 @@ import logging -from pathlib import Path +import pathlib from typing import Iterable, cast +import memory.common.settings as settings +from memory.parsers.ebook import Ebook, parse_ebook, Section from memory.common.db.models import Book, BookSection -from memory.common.parsers.ebook import Ebook, parse_ebook, Section from memory.common.db.connection import make_session from memory.workers.celery_app import app from memory.workers.tasks.content_processing import ( @@ -16,7 +17,7 @@ from memory.workers.tasks.content_processing import ( logger = logging.getLogger(__name__) -SYNC_BOOK = "memory.workers.tasks.book.sync_book" +SYNC_BOOK = "memory.workers.tasks.ebook.sync_book" # Minimum section length to embed (avoid noise from very short sections) MIN_SECTION_LENGTH = 100 @@ -59,6 +60,8 @@ def section_processor( end_page=section.end_page, parent_section_id=None, # Will be set after flush content=content, + size=len(content), + mime_type="text/plain", sha256=create_content_hash( f"{book.id}:{section.title}:{section.start_page}" ), @@ -94,7 +97,13 @@ def create_all_sections( def validate_and_parse_book(file_path: str) -> Ebook: """Validate file exists and parse the ebook.""" - path = Path(file_path) + logger.info(f"Validating and parsing book from {file_path}") + path = pathlib.Path(file_path) + if not path.is_absolute(): + path = settings.EBOOK_STORAGE_DIR / path + + logger.info(f"Resolved path: {path}") + if not path.exists(): raise FileNotFoundError(f"Book file not found: {path}") @@ -145,6 +154,7 @@ def sync_book(file_path: str, tags: Iterable[str] = []) -> dict: dict: Summary of what was processed """ ebook = validate_and_parse_book(file_path) + logger.info(f"Ebook parsed: {ebook.title}") with make_session() as session: # Check for existing book @@ -161,14 +171,22 @@ def sync_book(file_path: str, tags: Iterable[str] = []) -> dict: "sections_processed": 0, } + logger.info("Creating book and sections with relationships") # Create book and sections with relationships book, all_sections = create_book_and_sections(ebook, session, tags) # Embed sections + logger.info("Embedding sections") embedded_count = sum(embed_source_item(section) for section in all_sections) session.flush() + for section in all_sections: + logger.info( + f"Embedded section: {section.section_title} - {section.content[:100]}" + ) + logger.info("Pushing to Qdrant") push_to_qdrant(all_sections, "book") + logger.info("Committing session") session.commit() diff --git a/src/memory/workers/tasks/email.py b/src/memory/workers/tasks/email.py index 97efd72..531e2c7 100644 --- a/src/memory/workers/tasks/email.py +++ b/src/memory/workers/tasks/email.py @@ -10,7 +10,7 @@ from memory.workers.email import ( process_folder, vectorize_email, ) -from memory.common.parsers.email import parse_email_message +from memory.parsers.email import parse_email_message from memory.workers.tasks.content_processing import ( check_content_exists, safe_task_execution, diff --git a/src/memory/workers/tasks/maintenance.py b/src/memory/workers/tasks/maintenance.py index f8c978d..b309ead 100644 --- a/src/memory/workers/tasks/maintenance.py +++ b/src/memory/workers/tasks/maintenance.py @@ -23,6 +23,10 @@ REINGEST_CHUNK = "memory.workers.tasks.maintenance.reingest_chunk" @app.task(name=CLEAN_COLLECTION) def clean_collection(collection: str) -> dict[str, int]: logger.info(f"Cleaning collection {collection}") + + if collection not in collections.ALL_COLLECTIONS: + raise ValueError(f"Unsupported collection {collection}") + client = qdrant.get_qdrant_client() batches, deleted, checked = 0, 0, 0 for batch in qdrant.batch_ids(client, collection): @@ -47,7 +51,7 @@ def clean_collection(collection: str) -> dict[str, int]: @app.task(name=CLEAN_ALL_COLLECTIONS) def clean_all_collections(): logger.info("Cleaning all collections") - for collection in embedding.ALL_COLLECTIONS: + for collection in collections.ALL_COLLECTIONS: clean_collection.delay(collection) # type: ignore @@ -111,10 +115,12 @@ def check_batch(batch: Sequence[Chunk]) -> dict: @app.task(name=REINGEST_MISSING_CHUNKS) -def reingest_missing_chunks(batch_size: int = 1000): +def reingest_missing_chunks( + batch_size: int = 1000, minutes_ago: int = settings.CHUNK_REINGEST_SINCE_MINUTES +): logger.info("Reingesting missing chunks") total_stats = defaultdict(lambda: {"missing": 0, "correct": 0, "total": 0}) - since = datetime.now() - timedelta(minutes=settings.CHUNK_REINGEST_SINCE_MINUTES) + since = datetime.now() - timedelta(minutes=minutes_ago) with make_session() as session: total_count = session.query(Chunk).filter(Chunk.checked_at < since).count() diff --git a/tests/memory/common/db/test_models.py b/tests/memory/common/db/test_models.py index c5e8017..817efc3 100644 --- a/tests/memory/common/db/test_models.py +++ b/tests/memory/common/db/test_models.py @@ -753,7 +753,65 @@ def test_email_attachment_cascade_delete(db_session: Session): assert deleted_attachment is None -# BookSection tests +def test_subclass_deletion_cascades_to_source_item(db_session: Session): + mail_message = MailMessage( + sha256=b"test_email_cascade", + content="test email content", + message_id="", + subject="Cascade Test", + sender="sender@example.com", + recipients=["recipient@example.com"], + folder="INBOX", + ) + db_session.add(mail_message) + db_session.commit() + + source_item_id = mail_message.id + mail_message_id = mail_message.id + + # Verify both records exist + assert db_session.query(SourceItem).filter_by(id=source_item_id).first() is not None + assert ( + db_session.query(MailMessage).filter_by(id=mail_message_id).first() is not None + ) + + # Delete the MailMessage subclass + db_session.delete(mail_message) + db_session.commit() + + # Verify both the MailMessage and SourceItem records are deleted + assert db_session.query(MailMessage).filter_by(id=mail_message_id).first() is None + assert db_session.query(SourceItem).filter_by(id=source_item_id).first() is None + + +def test_subclass_deletion_cascades_from_source_item(db_session: Session): + mail_message = MailMessage( + sha256=b"test_email_cascade", + content="test email content", + message_id="", + subject="Cascade Test", + sender="sender@example.com", + recipients=["recipient@example.com"], + folder="INBOX", + ) + db_session.add(mail_message) + db_session.commit() + + source_item_id = mail_message.id + mail_message_id = mail_message.id + + # Verify both records exist + source_item = db_session.query(SourceItem).get(source_item_id) + assert source_item + assert db_session.query(MailMessage).get(mail_message_id) + + # Delete the MailMessage subclass + db_session.delete(source_item) + db_session.commit() + + # Verify both the MailMessage and SourceItem records are deleted + assert db_session.query(MailMessage).filter_by(id=mail_message_id).first() is None + assert db_session.query(SourceItem).filter_by(id=source_item_id).first() is None @pytest.mark.parametrize( diff --git a/tests/memory/common/parsers/test_archives.py b/tests/memory/parsers/test_archives.py similarity index 93% rename from tests/memory/common/parsers/test_archives.py rename to tests/memory/parsers/test_archives.py index e7284dd..8942f1e 100644 --- a/tests/memory/common/parsers/test_archives.py +++ b/tests/memory/parsers/test_archives.py @@ -3,7 +3,7 @@ from urllib.parse import urlparse, parse_qs import pytest -from memory.common.parsers.archives import ( +from memory.parsers.archives import ( ArchiveFetcher, LinkFetcher, HTMLArchiveFetcher, @@ -14,7 +14,7 @@ from memory.common.parsers.archives import ( get_archive_fetcher, FETCHER_REGISTRY, ) -from memory.common.parsers.feeds import ( +from memory.parsers.feeds import ( FeedItem, FeedParser, HTMLListParser, @@ -56,7 +56,7 @@ def test_archive_fetcher_find_next_page_base(): assert fetcher._find_next_page(parser, 0) is None -@patch("memory.common.parsers.archives.time.sleep") +@patch("memory.parsers.archives.time.sleep") def test_archive_fetcher_fetch_all_items_single_page(mock_sleep): items = [ FeedItem(title="Item 1", url="https://example.com/1"), @@ -80,7 +80,7 @@ def test_archive_fetcher_fetch_all_items_single_page(mock_sleep): mock_sleep.assert_not_called() # No delay for single page -@patch("memory.common.parsers.archives.time.sleep") +@patch("memory.parsers.archives.time.sleep") def test_archive_fetcher_fetch_all_items_multiple_pages(mock_sleep): page1_items = [FeedItem(title="Item 1", url="https://example.com/1")] page2_items = [FeedItem(title="Item 2", url="https://example.com/2")] @@ -258,7 +258,7 @@ def test_html_archive_fetcher_find_next_page(html, selectors, expected_url): ) parser = MockParser("https://example.com", content=html) - with patch("memory.common.parsers.archives.extract_url") as mock_extract: + with patch("memory.parsers.archives.extract_url") as mock_extract: mock_extract.return_value = expected_url result = fetcher._find_next_page(parser) @@ -308,7 +308,7 @@ def test_html_parser_factory(): ], ) def test_substack_archive_fetcher_post_init(start_url, expected_api_url): - with patch("memory.common.parsers.archives.get_base_url") as mock_get_base: + with patch("memory.parsers.archives.get_base_url") as mock_get_base: mock_get_base.return_value = "https://example.substack.com" fetcher = SubstackArchiveFetcher(SubstackAPIParser, start_url) @@ -413,10 +413,10 @@ def test_html_next_url_archive_fetcher_find_next_page(): ], ) def test_get_archive_fetcher_registry_matches(url, expected_fetcher_type): - with patch("memory.common.parsers.archives.fetch_html") as mock_fetch: + with patch("memory.parsers.archives.fetch_html") as mock_fetch: mock_fetch.return_value = "Not substack" - with patch("memory.common.parsers.archives.is_substack") as mock_is_substack: + with patch("memory.parsers.archives.is_substack") as mock_is_substack: mock_is_substack.return_value = False fetcher = get_archive_fetcher(url) @@ -430,7 +430,7 @@ def test_get_archive_fetcher_registry_matches(url, expected_fetcher_type): def test_get_archive_fetcher_tuple_registry(): url = "https://putanumonit.com" - with patch("memory.common.parsers.archives.fetch_html") as mock_fetch: + with patch("memory.parsers.archives.fetch_html") as mock_fetch: mock_fetch.return_value = "Not substack" fetcher = get_archive_fetcher(url) @@ -442,7 +442,7 @@ def test_get_archive_fetcher_tuple_registry(): def test_get_archive_fetcher_direct_parser_registry(): url = "https://danluu.com" - with patch("memory.common.parsers.archives.fetch_html") as mock_fetch: + with patch("memory.parsers.archives.fetch_html") as mock_fetch: mock_fetch.return_value = "Not substack" fetcher = get_archive_fetcher(url) @@ -455,10 +455,10 @@ def test_get_archive_fetcher_direct_parser_registry(): def test_get_archive_fetcher_substack(): url = "https://example.substack.com" - with patch("memory.common.parsers.archives.fetch_html") as mock_fetch: + with patch("memory.parsers.archives.fetch_html") as mock_fetch: mock_fetch.return_value = "Substack content" - with patch("memory.common.parsers.archives.is_substack") as mock_is_substack: + with patch("memory.parsers.archives.is_substack") as mock_is_substack: mock_is_substack.return_value = True fetcher = get_archive_fetcher(url) @@ -470,10 +470,10 @@ def test_get_archive_fetcher_substack(): def test_get_archive_fetcher_no_match(): url = "https://unknown.com" - with patch("memory.common.parsers.archives.fetch_html") as mock_fetch: + with patch("memory.parsers.archives.fetch_html") as mock_fetch: mock_fetch.return_value = "Regular website" - with patch("memory.common.parsers.archives.is_substack") as mock_is_substack: + with patch("memory.parsers.archives.is_substack") as mock_is_substack: mock_is_substack.return_value = False fetcher = get_archive_fetcher(url) diff --git a/tests/memory/common/parsers/test_comic.py b/tests/memory/parsers/test_comic.py similarity index 99% rename from tests/memory/common/parsers/test_comic.py rename to tests/memory/parsers/test_comic.py index 3f1bea7..570fa7c 100644 --- a/tests/memory/common/parsers/test_comic.py +++ b/tests/memory/parsers/test_comic.py @@ -2,7 +2,7 @@ from unittest.mock import patch import pytest -from memory.common.parsers.comics import extract_smbc, extract_xkcd +from memory.parsers.comics import extract_smbc, extract_xkcd MOCK_SMBC_HTML = """ diff --git a/tests/memory/common/parsers/test_ebook.py b/tests/memory/parsers/test_ebook.py similarity index 98% rename from tests/memory/common/parsers/test_ebook.py rename to tests/memory/parsers/test_ebook.py index 44a170d..cfd8b59 100644 --- a/tests/memory/common/parsers/test_ebook.py +++ b/tests/memory/parsers/test_ebook.py @@ -3,7 +3,7 @@ from unittest.mock import MagicMock, patch import pytest import fitz -from memory.common.parsers.ebook import ( +from memory.parsers.ebook import ( Peekable, extract_epub_metadata, get_pages, @@ -381,7 +381,7 @@ def test_parse_ebook_full_content_generation(mock_open, mock_doc, tmp_path): section2.pages = ["Content of section 2"] # Mock extract_sections to return our sections - with patch("memory.common.parsers.ebook.extract_sections") as mock_extract: + with patch("memory.parsers.ebook.extract_sections") as mock_extract: mock_extract.return_value = [section1, section2] mock_open.return_value = mock_doc diff --git a/tests/memory/common/parsers/test_email_parsers.py b/tests/memory/parsers/test_email_parsers.py similarity index 99% rename from tests/memory/common/parsers/test_email_parsers.py rename to tests/memory/parsers/test_email_parsers.py index ccaaf63..012dd54 100644 --- a/tests/memory/common/parsers/test_email_parsers.py +++ b/tests/memory/parsers/test_email_parsers.py @@ -7,7 +7,7 @@ from datetime import datetime from email.utils import formatdate from unittest.mock import ANY import pytest -from memory.common.parsers.email import ( +from memory.parsers.email import ( compute_message_hash, extract_attachments, extract_body, diff --git a/tests/memory/common/parsers/test_feeds.py b/tests/memory/parsers/test_feeds.py similarity index 96% rename from tests/memory/common/parsers/test_feeds.py rename to tests/memory/parsers/test_feeds.py index 7981bfc..d7a528c 100644 --- a/tests/memory/common/parsers/test_feeds.py +++ b/tests/memory/parsers/test_feeds.py @@ -6,7 +6,7 @@ import json import pytest from bs4 import BeautifulSoup, Tag -from memory.common.parsers.feeds import ( +from memory.parsers.feeds import ( FeedItem, FeedParser, RSSAtomParser, @@ -61,7 +61,7 @@ def test_select_in(data, path, expected): assert select_in(data, path) == expected -@patch("memory.common.parsers.feeds.fetch_html") +@patch("memory.parsers.feeds.fetch_html") def test_json_parser_fetch_items_with_content(mock_fetch_html): content = json.dumps( [ @@ -80,7 +80,7 @@ def test_json_parser_fetch_items_with_content(mock_fetch_html): mock_fetch_html.assert_not_called() -@patch("memory.common.parsers.feeds.fetch_html") +@patch("memory.parsers.feeds.fetch_html") def test_json_parser_fetch_items_without_content(mock_fetch_html): content = json.dumps([{"title": "Article", "url": "https://example.com/1"}]) mock_fetch_html.return_value = content @@ -92,7 +92,7 @@ def test_json_parser_fetch_items_without_content(mock_fetch_html): mock_fetch_html.assert_called_once_with("https://example.com/feed.json") -@patch("memory.common.parsers.feeds.fetch_html") +@patch("memory.parsers.feeds.fetch_html") def test_json_parser_fetch_items_invalid_json(mock_fetch_html): mock_fetch_html.return_value = "invalid json content" @@ -220,7 +220,7 @@ def test_feed_parser_parse_feed_with_invalid_items(): ] -@patch("memory.common.parsers.feeds.feedparser.parse") +@patch("memory.parsers.feeds.feedparser.parse") @pytest.mark.parametrize("since_date", [None, datetime(2023, 1, 1)]) def test_rss_atom_parser_fetch_items(mock_parse, since_date): mock_feed = MagicMock() @@ -239,7 +239,7 @@ def test_rss_atom_parser_fetch_items(mock_parse, since_date): assert items == ["entry1", "entry2"] -@patch("memory.common.parsers.feeds.feedparser.parse") +@patch("memory.parsers.feeds.feedparser.parse") def test_rss_atom_parser_fetch_items_with_content(mock_parse): mock_feed = MagicMock() mock_feed.entries = ["entry1"] @@ -411,7 +411,7 @@ def test_rss_atom_parser_extract_metadata(): } -@patch("memory.common.parsers.feeds.fetch_html") +@patch("memory.parsers.feeds.fetch_html") def test_html_list_parser_fetch_items_with_content(mock_fetch_html): html = """