add ebook job

Daniel O'Connell 2025-05-24 20:21:41 +02:00
parent b292baf59d
commit 02d606deab
7 changed files with 772 additions and 19 deletions

View File

@@ -17,7 +17,7 @@ jobs:
run: |
python -m pip install --upgrade pip
pip install .[all]
pip install ruff==0.11.10 pylint
pip install ruff==0.11.10 pylint==1.1.400
- name: Run linters
run: |
ruff check .

View File

@@ -0,0 +1,107 @@
"""Add ebooks
Revision ID: fe570eab952a
Revises: b78b1fff9974
Create Date: 2025-05-23 16:37:53.354723
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision: str = "fe570eab952a"
down_revision: Union[str, None] = "b78b1fff9974"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.create_table(
"book",
sa.Column("id", sa.BigInteger(), nullable=False),
sa.Column("isbn", sa.Text(), nullable=True),
sa.Column("title", sa.Text(), nullable=False),
sa.Column("author", sa.Text(), nullable=True),
sa.Column("publisher", sa.Text(), nullable=True),
sa.Column("published", sa.DateTime(timezone=True), nullable=True),
sa.Column("language", sa.Text(), nullable=True),
sa.Column("edition", sa.Text(), nullable=True),
sa.Column("series", sa.Text(), nullable=True),
sa.Column("series_number", sa.Integer(), nullable=True),
sa.Column("total_pages", sa.Integer(), nullable=True),
sa.Column("file_path", sa.Text(), nullable=True),
sa.Column("tags", sa.ARRAY(sa.Text()), nullable=False, server_default="{}"),
sa.Column("metadata", postgresql.JSONB(astext_type=sa.Text()), nullable=True),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=True,
),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint("isbn"),
)
op.create_index("book_author_idx", "book", ["author"], unique=False)
op.create_index("book_isbn_idx", "book", ["isbn"], unique=False)
op.create_index("book_title_idx", "book", ["title"], unique=False)
op.create_table(
"book_section",
sa.Column("id", sa.BigInteger(), nullable=False),
sa.Column("book_id", sa.BigInteger(), nullable=False),
sa.Column("section_title", sa.Text(), nullable=True),
sa.Column("section_number", sa.Integer(), nullable=True),
sa.Column("section_level", sa.Integer(), nullable=True),
sa.Column("start_page", sa.Integer(), nullable=True),
sa.Column("end_page", sa.Integer(), nullable=True),
sa.Column("parent_section_id", sa.BigInteger(), nullable=True),
sa.ForeignKeyConstraint(["book_id"], ["book.id"], ondelete="CASCADE"),
sa.ForeignKeyConstraint(["id"], ["source_item.id"], ondelete="CASCADE"),
sa.ForeignKeyConstraint(
["parent_section_id"],
["book_section.id"],
),
sa.PrimaryKeyConstraint("id"),
)
op.create_index("book_section_book_idx", "book_section", ["book_id"], unique=False)
op.create_index(
"book_section_level_idx",
"book_section",
["section_level", "section_number"],
unique=False,
)
op.create_index(
"book_section_parent_idx", "book_section", ["parent_section_id"], unique=False
)
op.drop_table("book_doc")
def downgrade() -> None:
op.create_table(
"book_doc",
sa.Column("id", sa.BIGINT(), autoincrement=False, nullable=False),
sa.Column("title", sa.TEXT(), autoincrement=False, nullable=True),
sa.Column("author", sa.TEXT(), autoincrement=False, nullable=True),
sa.Column("chapter", sa.TEXT(), autoincrement=False, nullable=True),
sa.Column(
"published",
postgresql.TIMESTAMP(timezone=True),
autoincrement=False,
nullable=True,
),
sa.ForeignKeyConstraint(
["id"], ["source_item.id"], name="book_doc_id_fkey", ondelete="CASCADE"
),
sa.PrimaryKeyConstraint("id", name="book_doc_pkey"),
)
op.drop_index("book_section_parent_idx", table_name="book_section")
op.drop_index("book_section_level_idx", table_name="book_section")
op.drop_index("book_section_book_idx", table_name="book_section")
op.drop_table("book_section")
op.drop_index("book_title_idx", table_name="book")
op.drop_index("book_isbn_idx", table_name="book")
op.drop_index("book_author_idx", table_name="book")
op.drop_table("book")

dev.sh
View File

@@ -13,6 +13,9 @@ echo -e "${GREEN}Starting development environment for Memory Knowledge Base...${NC}"
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
cd "$SCRIPT_DIR"
docker volume create memory_file_storage
docker run --rm -v memory_file_storage:/data busybox chown -R 1000:1000 /data
# Create a temporary docker-compose override file to expose PostgreSQL
echo -e "${YELLOW}Creating docker-compose override to expose PostgreSQL...${NC}"
if [ ! -f docker-compose.override.yml ]; then

View File

@@ -407,19 +407,98 @@ class Comic(SourceItem):
return {k: v for k, v in payload.items() if v is not None}
class BookDoc(SourceItem):
__tablename__ = "book_doc"
class Book(Base):
"""Book-level metadata table"""
__tablename__ = "book"
id = Column(BigInteger, primary_key=True)
isbn = Column(Text, unique=True)
title = Column(Text, nullable=False)
author = Column(Text)
publisher = Column(Text)
published = Column(DateTime(timezone=True))
language = Column(Text)
edition = Column(Text)
series = Column(Text)
series_number = Column(Integer)
total_pages = Column(Integer)
file_path = Column(Text)
tags = Column(ARRAY(Text), nullable=False, server_default="{}")
# Metadata from ebook parser
book_metadata = Column(JSONB, name="metadata")
created_at = Column(DateTime(timezone=True), server_default=func.now())
__table_args__ = (
Index("book_isbn_idx", "isbn"),
Index("book_author_idx", "author"),
Index("book_title_idx", "title"),
)
def as_payload(self) -> dict:
return {
"source_id": self.id,
"isbn": self.isbn,
"title": self.title,
"author": self.author,
"publisher": self.publisher,
"published": self.published,
"language": self.language,
"edition": self.edition,
"series": self.series,
"series_number": self.series_number,
"tags": self.tags,
} | (cast(dict, self.book_metadata) or {})
class BookSection(SourceItem):
"""Individual sections/chapters of books"""
__tablename__ = "book_section"
id = Column(
BigInteger, ForeignKey("source_item.id", ondelete="CASCADE"), primary_key=True
)
title = Column(Text)
author = Column(Text)
chapter = Column(Text)
published = Column(DateTime(timezone=True))
book_id = Column(
BigInteger, ForeignKey("book.id", ondelete="CASCADE"), nullable=False
)
__mapper_args__ = {
"polymorphic_identity": "book_doc",
section_title = Column(Text)
section_number = Column(Integer)
section_level = Column(Integer) # 1=chapter, 2=section, 3=subsection
start_page = Column(Integer)
end_page = Column(Integer)
# Parent-child relationships for nested sections
parent_section_id = Column(BigInteger, ForeignKey("book_section.id"))
book = relationship("Book", backref="sections")
parent = relationship(
"BookSection",
remote_side=[id],
backref="children",
foreign_keys=[parent_section_id],
)
__mapper_args__ = {"polymorphic_identity": "book_section"}
__table_args__ = (
Index("book_section_book_idx", "book_id"),
Index("book_section_parent_idx", "parent_section_id"),
Index("book_section_level_idx", "section_level", "section_number"),
)
def as_payload(self) -> dict:
return {
"source_id": self.id,
"book_id": self.book_id,
"section_title": self.section_title,
"section_number": self.section_number,
"section_level": self.section_level,
"start_page": self.start_page,
"end_page": self.end_page,
"tags": self.tags,
}
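
For orientation, Book.as_payload merges the explicit columns with whatever the parser stored in book_metadata, so parser-specific keys ride along into the vector payload next to the fixed fields. A hedged sketch (all values made up):

book = Book(
    id=1,
    title="Example Title",
    author="A. Author",
    tags=["fiction"],
    book_metadata={"creator": "Example Press"},
)
payload = book.as_payload()
# {'source_id': 1, 'title': 'Example Title', ..., 'creator': 'Example Press'}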

View File

@@ -1,6 +1,6 @@
import logging
from dataclasses import dataclass, field
from typing import Optional, List, Dict, Any, cast
from typing import Any, cast
from pathlib import Path
import fitz # PyMuPDF
@@ -14,9 +14,9 @@ class Section:
title: str
content: str
number: Optional[int] = None
start_page: Optional[int] = None
end_page: Optional[int] = None
number: int | None = None
start_page: int | None = None
end_page: int | None = None
children: list["Section"] = field(default_factory=list)
@@ -26,11 +26,12 @@ class Ebook:
title: str
author: str
metadata: Dict[str, Any] = field(default_factory=dict)
sections: List[Section] = field(default_factory=list)
file_path: Path
metadata: dict[str, Any] = field(default_factory=dict)
sections: list[Section] = field(default_factory=list)
full_content: str = ""
file_path: Optional[Path] = None
file_type: str = ""
n_pages: int = 0
class Peekable:
@@ -65,7 +66,7 @@ class Peekable:
TOCItem = tuple[int, str, int]
def extract_epub_metadata(doc) -> Dict[str, Any]:
def extract_epub_metadata(doc) -> dict[str, Any]:
"""Extract metadata from a PyMuPDF document (EPUB)."""
if not doc.metadata:
return {}
@@ -117,7 +118,7 @@ def extract_section_pages(doc, toc: Peekable, section_num: int = 1) -> Section | None:
)
def extract_sections(doc) -> List[Section]:
def extract_sections(doc) -> list[Section]:
"""Extract all sections from a PyMuPDF document."""
toc = doc.get_toc()
if not toc:
@@ -178,4 +179,5 @@ def parse_ebook(file_path: str | Path) -> Ebook:
full_content=full_content,
file_path=path,
file_type=path.suffix.lower()[1:],
n_pages=doc.page_count,
)
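
parse_ebook returns an Ebook whose sections form a tree via Section.children; a small sketch of walking it (the file path is hypothetical):

from memory.common.parsers.ebook import Section, parse_ebook

def print_tree(section: Section, depth: int = 0) -> None:
    # Indent by nesting level; pages may be None when the TOC lacks them
    print("  " * depth + f"{section.title} (pp. {section.start_page}-{section.end_page})")
    for child in section.children:
        print_tree(child, depth + 1)

book = parse_ebook("/path/to/book.epub")  # hypothetical path
print(f"{book.title} by {book.author}, {book.n_pages} pages")
for section in book.sections:
    print_tree(section)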

View File

@@ -0,0 +1,244 @@
import hashlib
import logging
from pathlib import Path
from typing import Iterable, cast
from memory.common import embedding, qdrant, settings
from memory.common.db.connection import make_session
from memory.common.db.models import Book, BookSection
from memory.common.parsers.ebook import Ebook, parse_ebook, Section
from memory.workers.celery_app import app
logger = logging.getLogger(__name__)
SYNC_BOOK = "memory.workers.tasks.book.sync_book"
# Minimum section length to embed (avoid noise from very short sections)
MIN_SECTION_LENGTH = 100
def create_book_from_ebook(ebook, tags: Iterable[str] = []) -> Book:
"""Create a Book model from parsed ebook data."""
return Book(
title=ebook.title,
author=ebook.author,
publisher=ebook.metadata.get("creator"),
language=ebook.metadata.get("language"),
total_pages=ebook.n_pages,
file_path=ebook.file_path.as_posix(),
book_metadata=ebook.metadata,
tags=tags,
)
def section_processor(
book: Book,
all_sections: list[BookSection],
section_map: dict[
tuple[int, int | None], tuple[BookSection, tuple[int, int | None] | None]
],
):
def process_section(
section: Section,
level: int = 1,
parent_key: tuple[int, int | None] | None = None,
):
if len(section.content.strip()) >= MIN_SECTION_LENGTH:
sha256 = hashlib.sha256(
f"{book.id}:{section.title}:{section.start_page}".encode()
).digest()
book_section = BookSection(
book_id=book.id,
section_title=section.title,
section_number=section.number,
section_level=level,
start_page=section.start_page,
end_page=section.end_page,
parent_section_id=None, # Will be set after flush
content=section.content,
sha256=sha256,
modality="book",
tags=book.tags,
)
all_sections.append(book_section)
section_key = (level, section.number)
section_map[section_key] = (book_section, parent_key)
# Process children
for child in section.children:
process_section(child, level + 1, section_key)
return process_section
def create_all_sections(
ebook_sections: list[Section], book: Book
) -> tuple[list[BookSection], dict]:
"""Create all sections iteratively to handle parent-child relationships properly."""
all_sections = []
section_map = {} # Maps (level, number) to section for parent lookup
process_section = section_processor(book, all_sections, section_map)
for section in ebook_sections:
process_section(section)
return all_sections, section_map
def validate_and_parse_book(file_path: str) -> Ebook:
"""Validate file exists and parse the ebook."""
path = Path(file_path)
if not path.exists():
raise FileNotFoundError(f"Book file not found: {path}")
try:
return parse_ebook(path)
except Exception as e:
logger.error(f"Failed to parse ebook {path}: {e}")
raise
def create_book_and_sections(
ebook, session, tags: Iterable[str] = []
) -> tuple[Book, list[BookSection]]:
"""Create book and all its sections with proper relationships."""
# Create book
book = create_book_from_ebook(ebook, tags)
session.add(book)
session.flush() # Get the book ID
# Create all sections
all_sections, section_map = create_all_sections(ebook.sections, book)
session.add_all(all_sections)
session.flush()
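# Second pass: child sections were created before their parents had primary
# keys, so resolve parent_section_id now that flush() has assigned IDs.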
for book_section, parent_key in section_map.values():
if parent_key and parent_key in section_map:
parent_section = section_map[parent_key][0]
book_section.parent_section_id = cast(int, parent_section.id)
return book, all_sections
def embed_sections(all_sections: list[BookSection]) -> int:
"""Embed all sections and return count of successfully embedded sections."""
embedded_count = 0
for section in all_sections:
try:
_, chunks = embedding.embed(
"text/plain",
cast(str, section.content),
metadata=section.as_payload(),
)
if chunks:
section.chunks = chunks
section.embed_status = "QUEUED" # type: ignore
embedded_count += 1
else:
section.embed_status = "FAILED" # type: ignore
logger.warning(
f"No chunks generated for section: {section.section_title}"
)
except IOError as e:
section.embed_status = "FAILED" # type: ignore
logger.error(f"Failed to embed section {section.section_title}: {e}")
return embedded_count
def push_to_qdrant(all_sections: list[BookSection]):
"""Push embeddings to Qdrant for all successfully embedded sections."""
vector_ids = []
vectors = []
payloads = []
to_process = [s for s in all_sections if cast(str, s.embed_status) == "QUEUED"]
all_chunks = [chunk for section in to_process for chunk in section.chunks]
if not all_chunks:
return
vector_ids = [str(chunk.id) for chunk in all_chunks]
vectors = [chunk.vector for chunk in all_chunks]
payloads = [chunk.item_metadata for chunk in all_chunks]
qdrant.upsert_vectors(
client=qdrant.get_qdrant_client(),
collection_name="book",
ids=vector_ids,
vectors=vectors,
payloads=payloads,
)
for section in to_process:
section.embed_status = "STORED" # type: ignore
@app.task(name=SYNC_BOOK)
def sync_book(file_path: str, tags: Iterable[str] = []) -> dict:
"""
Synchronize a book from a file path.
Args:
file_path: Path to the ebook file
tags: Optional tags to attach to the book and its sections
Returns:
dict: Summary of what was processed
"""
ebook = validate_and_parse_book(file_path)
with make_session() as session:
# Check for existing book
existing_book = (
session.query(Book)
.filter(Book.file_path == ebook.file_path.as_posix())
.first()
)
if existing_book:
logger.info(f"Book already exists: {existing_book.title}")
return {
"book_id": existing_book.id,
"title": existing_book.title,
"author": existing_book.author,
"status": "already_exists",
"sections_processed": 0,
}
# Create book and sections with relationships
book, all_sections = create_book_and_sections(ebook, session, tags)
# Embed sections
embedded_count = embed_sections(all_sections)
session.flush()
# Push to Qdrant
try:
push_to_qdrant(all_sections)
except Exception as e:
logger.error(f"Failed to push embeddings to Qdrant: {e}")
# Mark sections as failed
for section in all_sections:
if getattr(section, "embed_status") == "STORED":
section.embed_status = "FAILED" # type: ignore
raise
session.commit()
logger.info(
f"Successfully processed book: {book.title} "
f"({embedded_count}/{len(all_sections)} sections embedded)"
)
return {
"book_id": book.id,
"title": book.title,
"author": book.author,
"status": "processed",
"total_sections": len(all_sections),
"sections_embedded": embedded_count,
}
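
The task can be queued by its registered name or run inline for debugging; a sketch assuming a configured broker and running worker (path and tags are placeholders):

from memory.workers.celery_app import app
from memory.workers.tasks import ebook

# Asynchronously, through the broker, addressing the task by name:
app.send_task(ebook.SYNC_BOOK, args=["/path/to/book.epub"], kwargs={"tags": ["fiction"]})

# Or synchronously, in-process:
summary = ebook.sync_book("/path/to/book.epub", tags=["fiction"])
print(summary["status"], summary.get("sections_embedded", 0))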

View File

@@ -0,0 +1,318 @@
import pytest
from pathlib import Path
from unittest.mock import patch, Mock
from memory.common.db.models import Book, BookSection, Chunk
from memory.common.parsers.ebook import Ebook, Section
from memory.workers.tasks import ebook
@pytest.fixture
def mock_ebook():
"""Mock ebook data for testing."""
return Ebook(
title="Test Book",
author="Test Author",
metadata={"language": "en", "creator": "Test Publisher"},
sections=[
Section(
title="Chapter 1",
content="This is the content of chapter 1. "
* 20, # Make it long enough
number=1,
start_page=1,
end_page=10,
children=[
Section(
title="Section 1.1",
content="This is section 1.1 content. " * 15,
number=1,
start_page=1,
end_page=5,
),
Section(
title="Section 1.2",
content="This is section 1.2 content. " * 15,
number=2,
start_page=6,
end_page=10,
),
],
),
Section(
title="Chapter 2",
content="This is the content of chapter 2. " * 20,
number=2,
start_page=11,
end_page=20,
),
],
file_path=Path("/test/book.epub"),
n_pages=20,
)
@pytest.fixture(autouse=True)
def mock_embedding():
"""Mock the embedding function to return dummy vectors."""
with patch("memory.workers.tasks.ebook.embedding.embed") as mock:
mock.return_value = (
"book",
[
Chunk(
vector=[0.1] * 1024,
item_metadata={"test": "data"},
content="Test content",
embedding_model="model",
)
],
)
yield mock
@pytest.fixture
def mock_qdrant():
"""Mock Qdrant operations."""
with (
patch("memory.workers.tasks.ebook.qdrant.upsert_vectors") as mock_upsert,
patch("memory.workers.tasks.ebook.qdrant.get_qdrant_client") as mock_client,
):
mock_client.return_value = Mock()
yield mock_upsert
def test_create_book_from_ebook(mock_ebook):
"""Test creating a Book model from ebook data."""
book = ebook.create_book_from_ebook(mock_ebook)
assert book.title == "Test Book" # type: ignore
assert book.author == "Test Author" # type: ignore
assert book.publisher == "Test Publisher" # type: ignore
assert book.language == "en" # type: ignore
assert book.file_path == "/test/book.epub" # type: ignore
assert book.total_pages == 20 # type: ignore
assert book.book_metadata == { # type: ignore
"language": "en",
"creator": "Test Publisher",
}
def test_validate_and_parse_book_success(mock_ebook, tmp_path):
"""Test successful book validation and parsing."""
book_file = tmp_path / "test.epub"
book_file.write_text("dummy content")
with patch("memory.workers.tasks.ebook.parse_ebook", return_value=mock_ebook):
assert ebook.validate_and_parse_book(str(book_file)) == mock_ebook
def test_validate_and_parse_book_file_not_found():
"""Test handling of missing files."""
with pytest.raises(FileNotFoundError):
ebook.validate_and_parse_book("/nonexistent/file.epub")
def test_validate_and_parse_book_parse_error(tmp_path):
"""Test handling of parsing errors."""
book_file = tmp_path / "corrupted.epub"
book_file.write_text("corrupted data")
with patch(
"memory.workers.tasks.ebook.parse_ebook", side_effect=Exception("Parse error")
):
with pytest.raises(Exception, match="Parse error"):
ebook.validate_and_parse_book(str(book_file))
def test_create_book_and_sections(mock_ebook, db_session):
"""Test creating book and sections with relationships."""
book, sections = ebook.create_book_and_sections(mock_ebook, db_session)
# Verify book creation
assert book.title == "Test Book" # type: ignore
assert book.id is not None
# Verify sections creation
assert len(sections) == 4 # Chapter 1, Section 1.1, Section 1.2, Chapter 2
# Verify parent-child relationships
chapter1 = next(s for s in sections if getattr(s, "section_title") == "Chapter 1")
section11 = next(
s for s in sections if getattr(s, "section_title") == "Section 1.1"
)
section12 = next(
s for s in sections if getattr(s, "section_title") == "Section 1.2"
)
# Children should reference chapter 1 as parent
assert getattr(section11, "parent_section_id") == chapter1.id
assert getattr(section12, "parent_section_id") == chapter1.id
# Chapter 1 should have no parent
assert getattr(chapter1, "parent_section_id") is None
def test_embed_sections(db_session, mock_embedding):
"""Test basic embedding sections workflow."""
# Create a test book first
book = Book(
title="Test Book",
author="Test Author",
file_path="/test/path",
)
db_session.add(book)
db_session.flush() # Get the book ID
# Create test sections with all required fields
sections = [
BookSection(
book_id=book.id,
section_title="Test Section",
section_number=1,
section_level=1,
start_page=1,
end_page=10,
content="Test content " * 20,
sha256=b"test_hash",
modality="book",
tags=["book"],
)
]
db_session.add_all(sections)
db_session.flush()
embedded_count = ebook.embed_sections(sections)
assert embedded_count >= 0
assert hasattr(sections[0], "embed_status")
def test_push_to_qdrant(qdrant):
"""Test pushing embeddings to Qdrant."""
# Create test sections with chunks
mock_chunk = Mock(
id="00000000-0000-0000-0000-000000000000",
vector=[0.1] * 1024,
item_metadata={"test": "data"},
)
mock_section = Mock(spec=BookSection)
mock_section.embed_status = "QUEUED"
mock_section.chunks = [mock_chunk]
sections = [mock_section]
ebook.push_to_qdrant(sections) # type: ignore
assert {r.id: r.payload for r in qdrant.scroll(collection_name="book")[0]} == {
"00000000-0000-0000-0000-000000000000": {
"test": "data",
}
}
assert mock_section.embed_status == "STORED"
@patch("memory.workers.tasks.ebook.parse_ebook")
def test_sync_book_success(mock_parse, mock_ebook, db_session, tmp_path):
"""Test successful book synchronization."""
book_file = tmp_path / "test.epub"
book_file.write_text("dummy content")
mock_ebook.file_path = book_file
mock_parse.return_value = mock_ebook
result = ebook.sync_book(str(book_file), {"source", "test"})
assert result == {
"book_id": 1,
"title": "Test Book",
"author": "Test Author",
"status": "processed",
"total_sections": 4,
"sections_embedded": 4,
}
book = db_session.query(Book).filter(Book.title == "Test Book").first()
assert book is not None
assert book.author == "Test Author"
assert set(book.tags) == {"source", "test"}
sections = (
db_session.query(BookSection).filter(BookSection.book_id == book.id).all()
)
assert len(sections) == 4
@patch("memory.workers.tasks.ebook.parse_ebook")
def test_sync_book_already_exists(mock_parse, mock_ebook, db_session, tmp_path):
"""Test that duplicate books are not processed."""
book_file = tmp_path / "test.epub"
book_file.write_text("dummy content")
existing_book = Book(
title="Existing Book",
author="Author",
file_path=str(book_file),
)
db_session.add(existing_book)
db_session.commit()
mock_ebook.file_path = book_file
mock_parse.return_value = mock_ebook
assert ebook.sync_book(str(book_file)) == {
"book_id": existing_book.id,
"title": "Existing Book",
"author": "Author",
"status": "already_exists",
"sections_processed": 0,
}
@patch("memory.workers.tasks.ebook.parse_ebook")
def test_sync_book_embedding_failure(
mock_parse, mock_ebook, db_session, tmp_path, mock_embedding
):
"""Test handling of embedding failures."""
book_file = tmp_path / "test.epub"
book_file.write_text("dummy content")
mock_ebook.file_path = book_file
mock_parse.return_value = mock_ebook
mock_embedding.side_effect = IOError("Embedding failed")
assert ebook.sync_book(str(book_file)) == {
"book_id": 1,
"title": "Test Book",
"author": "Test Author",
"status": "processed",
"sections_embedded": 0,
"total_sections": 4,
}
sections = db_session.query(BookSection).all()
for section in sections:
assert section.embed_status == "FAILED"
@patch("memory.workers.tasks.ebook.parse_ebook")
def test_sync_book_qdrant_failure(mock_parse, mock_ebook, db_session, tmp_path):
"""Test handling of Qdrant failures."""
book_file = tmp_path / "test.epub"
book_file.write_text("dummy content")
mock_ebook.file_path = book_file
mock_parse.return_value = mock_ebook
# Patch push_to_qdrant to fail and verify the error propagates out of sync_book
with patch.object(ebook, "push_to_qdrant", side_effect=Exception("Qdrant failed")):
with pytest.raises(Exception, match="Qdrant failed"):
ebook.sync_book(str(book_file))
def test_sync_book_file_not_found():
"""Test handling of missing files."""
with pytest.raises(FileNotFoundError):
ebook.sync_book("/nonexistent/file.epub")