diff --git a/requirements-common.txt b/requirements-common.txt
index bf32273..f7597b1 100644
--- a/requirements-common.txt
+++ b/requirements-common.txt
@@ -5,4 +5,5 @@ alembic==1.13.1
 dotenv==0.9.9
 voyageai==0.3.2
 qdrant-client==1.9.0
-PyMuPDF==1.25.5
\ No newline at end of file
+PyMuPDF==1.25.5
+ebooklib==0.18.0
\ No newline at end of file
diff --git a/src/memory/common/parsers/ebook.py b/src/memory/common/parsers/ebook.py
new file mode 100644
index 0000000..33fe227
--- /dev/null
+++ b/src/memory/common/parsers/ebook.py
@@ -0,0 +1,209 @@
+import logging
+from dataclasses import dataclass, field
+from typing import Optional, List, Dict, Any, cast
+from pathlib import Path
+
+import fitz  # PyMuPDF
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class Section:
+    """Represents a chapter or section in an ebook.
+
+    ``start_page`` and ``end_page`` are 0-based, inclusive page indices.
+    """
+
+    title: str
+    content: str
+    number: Optional[int] = None
+    start_page: Optional[int] = None
+    end_page: Optional[int] = None
+    children: list["Section"] = field(default_factory=list)
+
+
+@dataclass
+class Ebook:
+    """Structured representation of an ebook."""
+
+    title: str
+    author: str
+    metadata: Dict[str, Any] = field(default_factory=dict)
+    sections: List[Section] = field(default_factory=list)
+    full_content: str = ""
+    file_path: Optional[Path] = None
+    file_type: str = ""
+
+
+class Peekable:
+    """Iterator wrapper that can look at the next item without consuming it."""
+
+    def __init__(self, items):
+        self.items = items
+        self.done = False
+        self._get_next()
+
+    def _get_next(self):
+        try:
+            self.cached = next(self.items)
+        except StopIteration:
+            self.done = True
+
+    def peek(self):
+        if self.done:
+            return None
+        return self.cached
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        if self.done:
+            raise StopIteration
+
+        item = self.cached
+        self._get_next()
+        return item
+
+
+# One simple TOC entry as returned by PyMuPDF: (level, title, 1-based page).
+TOCItem = tuple[int, str, int]
+
+
+def extract_epub_metadata(doc) -> Dict[str, Any]:
+    """Extract metadata from a PyMuPDF document (EPUB)."""
+    if not doc.metadata:
+        return {}
+
+    return {key: value for key, value in doc.metadata.items() if value}
+
+
+def get_pages(doc, start_page: int, end_page: int) -> str:
+    """Join the text of pages start_page..end_page (0-based, inclusive).
+
+    Indices outside the document are skipped.
+    """
+    pages = [
+        doc[page_num].get_text()
+        for page_num in range(start_page, end_page + 1)
+        if 0 <= page_num < doc.page_count
+    ]
+    return "\n".join(pages)
+
+
+def extract_section_pages(doc, toc: Peekable, section_num: int = 1) -> Section | None:
+    """Consume one TOC entry, plus any nested entries, and build a Section.
+
+    TOC page numbers are 1-based (PyMuPDF convention); the resulting
+    ``start_page`` / ``end_page`` are 0-based, inclusive page indices.
+    """
+    if not toc.peek():
+        return None
+    item = cast(TOCItem | None, next(toc))
+    if not item:
+        return None
+
+    level, name, page = item
+    # get_toc() reports 1-based page numbers, but doc[...] is 0-based.
+    start_page = page - 1
+    final_page = doc.page_count - 1
+
+    next_item = cast(TOCItem | None, toc.peek())
+    if not next_item:
+        return Section(
+            title=name,
+            content=get_pages(doc, start_page, final_page),
+            number=section_num,
+            start_page=start_page,
+            end_page=final_page,
+        )
+
+    children = []
+    while next_item and next_item[0] > level:
+        children.append(extract_section_pages(doc, toc, len(children) + 1))
+        next_item = cast(TOCItem | None, toc.peek())
+
+    # This section stops on the page before the next entry at the same or a
+    # shallower level: 1-based page N is index N - 1, so the end is N - 2.
+    last_page = next_item[2] - 2 if next_item else final_page
+    return Section(
+        title=name,
+        content=get_pages(doc, start_page, last_page),
+        number=section_num,
+        start_page=start_page,
+        end_page=last_page,
+        children=children,
+    )
+
+
+def extract_sections(doc) -> List[Section]:
+    """Extract all sections from a PyMuPDF document."""
+    toc = doc.get_toc()
+    if not toc:
+        # No TOC: expose the whole document as one section. Text is read
+        # page by page because get_text() is a Page method in PyMuPDF.
+        final_page = doc.page_count - 1
+        return [
+            Section(
+                title="Content",
+                content=get_pages(doc, 0, final_page),
+                number=1,
+                start_page=0,
+                end_page=final_page,
+            )
+        ]
+
+    sections = []
+    entries = Peekable(iter(toc))
+    while entries.peek():
+        section = extract_section_pages(doc, entries, len(sections) + 1)
+        if section:
+            sections.append(section)
+    return sections
+
+
+def parse_ebook(file_path: str | Path) -> Ebook:
+    """
+    Parse an ebook file and extract its content and metadata.
+
+    Args:
+        file_path: Path to the ebook file
+
+    Returns:
+        Structured ebook data
+
+    Raises:
+        FileNotFoundError: If file_path does not exist
+    """
+    path = Path(file_path)
+    if not path.exists():
+        raise FileNotFoundError(f"File not found: {path}")
+
+    try:
+        doc = fitz.open(str(path))
+    except fitz.FileNotFoundError as e:
+        logger.error(f"Error opening ebook {path}: {e}")
+        raise
+
+    # Everything needed is copied out of the document here, so it can be
+    # closed before returning instead of leaking the open file handle.
+    try:
+        metadata = extract_epub_metadata(doc)
+        sections = extract_sections(doc)
+    finally:
+        doc.close()
+
+    title = metadata.get("title", path.stem)
+    author = metadata.get("author", "Unknown")
+    full_content = "".join(section.content for section in sections)
+
+    return Ebook(
+        title=title,
+        author=author,
+        metadata=metadata,
+        sections=sections,
+        full_content=full_content,
+        file_path=path,
+        file_type=path.suffix.lower()[1:],
+    )
diff --git a/src/memory/common/parsers/email.py b/src/memory/common/parsers/email.py
index 1680f2b..ac0501b 100644
--- a/src/memory/common/parsers/email.py
+++ b/src/memory/common/parsers/email.py
@@ -3,7 +3,7 @@ import hashlib
 import logging
 from datetime import datetime
 from email.utils import parsedate_to_datetime
-from typing import TypedDict, Literal
+from typing import TypedDict
 import pathlib
 
 logger = logging.getLogger(__name__)
diff --git a/tests/memory/common/parsers/test_ebook.py b/tests/memory/common/parsers/test_ebook.py
new file mode 100644
index 0000000..2172ce9
--- /dev/null
+++ b/tests/memory/common/parsers/test_ebook.py
@@ -0,0 +1,394 @@
+from unittest.mock import MagicMock, patch
+
+import pytest
+import fitz
+
+from memory.common.parsers.ebook import (
+    Peekable,
+    extract_epub_metadata,
+    get_pages,
+    extract_section_pages,
+    extract_sections,
+    parse_ebook,
+    Section,
+)
+
+
+def test_peekable_peek():
+    p = Peekable(iter([1, 2, 3]))
+    assert p.peek() == 1
+    assert p.peek() == 1  # Multiple peeks don't advance
+
+
+def test_peekable_iteration():
+    p = Peekable(iter([1, 2, 3]))
+    assert list(p) == [1, 2, 3]
+
+
+def test_peekable_empty():
+    p = Peekable(iter([]))
+    assert p.peek() is None
+    assert list(p) == []
+
+
+@pytest.fixture
+def mock_doc():
+    doc = MagicMock()
+    doc.metadata = {
+        "title": "Test Book",
+        "author": "Test Author",
+        "creator": "Test Creator",
+        "producer": "Test Producer",
+    }
+    doc.page_count = 5
+
+    # Mock pages
+    doc.__getitem__.side_effect = lambda i: MagicMock(
+        get_text=lambda: f"Content of page {i}"
+    )
+
+    # Mock TOC (PyMuPDF reports 1-based page numbers)
+    doc.get_toc.return_value = [
+        [1, "Chapter 1", 1],
+        [2, "Section 1.1", 2],
+        [2, "Section 1.2", 3],
+        [1, "Chapter 2", 4],
+        [2, "Section 2.1", 5],
+    ]
+
+    return doc
+
+
+@pytest.mark.parametrize(
+    "metadata_input,expected",
+    [
+        ({"title": "Book", "author": "Author"}, {"title": "Book", "author": "Author"}),
+        (
+            {"title": "", "author": "Author"},
+            {"author": "Author"},
+        ),  # Empty strings should be filtered
+        (
+            {"title": None, "author": "Author"},
+            {"author": "Author"},
+        ),  # None values should be filtered
+        ({}, {}),  # Empty dict
+    ],
+)
+def test_extract_epub_metadata(metadata_input, expected):
+    doc = MagicMock()
+    doc.metadata = metadata_input
+    assert extract_epub_metadata(doc) == expected
+
+
+@pytest.mark.parametrize(
+    "start_page,end_page,expected_content",
+    [
+        (0, 2, "Content of page 0\nContent of page 1\nContent of page 2"),
+        (3, 4, "Content of page 3\nContent of page 4"),
+        (4, 4, "Content of page 4"),
+        (
+            0,
+            10,
+            "Content of page 0\nContent of page 1\nContent of page 2\nContent of page 3\nContent of page 4",
+        ),  # Out of range
+        (5, 10, ""),  # Completely out of range
+        (3, 2, ""),  # Invalid range (start > end)
+        (
+            -1,
+            2,
+            "Content of page 0\nContent of page 1\nContent of page 2",
+        ),  # Negative start
+    ],
+)
+def test_get_pages(mock_doc, start_page, end_page, expected_content):
+    assert get_pages(mock_doc, start_page, end_page) == expected_content
+
+
+@pytest.fixture
+def mock_toc_items():
+    items = [
+        (1, "Chapter 1", 1),  # Level 1, starts on page 1 (1-based)
+        (2, "Section 1.1", 2),  # Level 2, starts on page 2 (1-based)
+        (2, "Section 1.2", 3),  # Level 2, starts on page 3 (1-based)
+        (1, "Chapter 2", 4),  # Level 1, starts on page 4 (1-based)
+    ]
+    return Peekable(iter(items))
+
+
+def test_extract_section_pages(mock_doc, mock_toc_items):
+    assert extract_section_pages(mock_doc, mock_toc_items) == Section(
+        title="Chapter 1",
+        number=1,
+        start_page=0,
+        end_page=2,
+        content="Content of page 0\nContent of page 1\nContent of page 2",
+        children=[
+            Section(
+                title="Section 1.1",
+                number=1,
+                start_page=1,
+                end_page=1,
+                content="Content of page 1",
+            ),
+            Section(
+                title="Section 1.2",
+                number=2,
+                start_page=2,
+                end_page=2,
+                content="Content of page 2",
+            ),
+        ],
+    )
+
+
+def test_extract_sections(mock_doc):
+    assert extract_sections(mock_doc) == [
+        Section(
+            title="Chapter 1",
+            number=1,
+            start_page=0,
+            end_page=2,
+            content="Content of page 0\nContent of page 1\nContent of page 2",
+            children=[
+                Section(
+                    title="Section 1.1",
+                    number=1,
+                    start_page=1,
+                    end_page=1,
+                    content="Content of page 1",
+                ),
+                Section(
+                    title="Section 1.2",
+                    number=2,
+                    start_page=2,
+                    end_page=2,
+                    content="Content of page 2",
+                ),
+            ],
+        ),
+        Section(
+            title="Chapter 2",
+            number=2,
+            start_page=3,
+            end_page=4,
+            content="Content of page 3\nContent of page 4",
+            children=[
+                Section(
+                    title="Section 2.1",
+                    number=1,
+                    start_page=4,
+                    end_page=4,
+                    content="Content of page 4",
+                ),
+            ],
+        ),
+    ]
+
+
+def test_extract_sections_no_toc(mock_doc):
+    mock_doc.get_toc.return_value = []
+
+    assert extract_sections(mock_doc) == [
+        Section(
+            title="Content",
+            number=1,
+            start_page=0,
+            end_page=4,
+            content="\n".join(f"Content of page {i}" for i in range(5)),
+            children=[],
+        ),
+    ]
+
+
+@patch("fitz.open")
+def test_parse_ebook(mock_open, mock_doc, tmp_path):
+    mock_open.return_value = mock_doc
+
+    # Create a test file
+    test_file = tmp_path / "test.epub"
+    test_file.touch()
+
+    ebook = parse_ebook(test_file)
+
+    assert ebook.title == "Test Book"
+    assert ebook.author == "Test Author"
+    assert len(ebook.sections) == 2
+    assert ebook.file_path == test_file
+    assert ebook.file_type == "epub"
+    mock_doc.close.assert_called_once()
+
+
+@patch("fitz.open")
+def test_parse_ebook_file_not_found(mock_open, tmp_path):
+    non_existent_file = tmp_path / "does_not_exist.epub"
+
+    with pytest.raises(FileNotFoundError):
+        parse_ebook(non_existent_file)
+
+
+@patch("fitz.open")
+def test_parse_ebook_fitz_error(mock_open, tmp_path):
+    # Create a test file to avoid FileNotFoundError
+    test_file = tmp_path / "test.epub"
+    test_file.touch()
+
+    # Mock the fitz.open to raise the FileNotFoundError
+    mock_open.side_effect = fitz.FileNotFoundError("File not found by PyMuPDF")
+
+    with pytest.raises(fitz.FileNotFoundError):
+        parse_ebook(test_file)
+
+
+@patch("fitz.open")
+def test_parse_ebook_no_metadata(mock_open, mock_doc, tmp_path):
+    mock_doc.metadata = {}
+    mock_open.return_value = mock_doc
+
+    test_file = tmp_path / "test.epub"
+    test_file.touch()
+
+    ebook = parse_ebook(test_file)
+
+    assert ebook.title == "test"  # Should use file stem
+    assert ebook.author == "Unknown"
+
+
+@pytest.mark.parametrize(
+    "file_suffix,expected_type",
+    [
+        (".epub", "epub"),
+        (".pdf", "pdf"),
+        (".mobi", "mobi"),
+        (".EPUB", "epub"),  # Test case insensitivity
+        ("", ""),  # No extension
+    ],
+)
+@patch("fitz.open")
+def test_parse_ebook_file_types(
+    mock_open, mock_doc, tmp_path, file_suffix, expected_type
+):
+    mock_open.return_value = mock_doc
+
+    # Create a test file with the given suffix
+    test_file = tmp_path / f"test{file_suffix}"
+    test_file.touch()
+
+    ebook = parse_ebook(test_file)
+    assert ebook.file_type == expected_type
+
+
+def test_extract_section_pages_empty():
+    """Test with empty TOC."""
+    doc = MagicMock()
+    doc.page_count = 5
+
+    empty_toc = Peekable(iter([]))
+    assert extract_section_pages(doc, empty_toc) is None
+
+
+def test_extract_section_pages_deeply_nested():
+    """Test with deeply nested TOC structure."""
+    doc = MagicMock()
+    doc.page_count = 10
+    doc.__getitem__.side_effect = lambda i: MagicMock(
+        get_text=lambda: f"Content of page {i}"
+    )
+
+    # Create a deeply nested TOC structure (1-based page numbers)
+    items = [
+        (1, "Chapter 1", 1),
+        (2, "Section 1.1", 2),
+        (3, "Subsection 1.1.1", 3),
+        (4, "Sub-subsection 1.1.1.1", 4),
+        (3, "Subsection 1.1.2", 5),
+        (2, "Section 1.2", 6),
+        (1, "Chapter 2", 7),
+    ]
+    toc = Peekable(iter(items))
+
+    # Extract the top-level section
+    section = extract_section_pages(doc, toc)
+    assert section is not None
+    assert section.title == "Chapter 1"
+    assert section.start_page == 0
+    assert section.end_page == 5  # Before Chapter 2
+    assert len(section.children) == 2  # Two level-2 sections
+
+    # Check first level-2 section
+    section_1_1 = section.children[0]
+    assert section_1_1.title == "Section 1.1"
+    assert section_1_1.start_page == 1
+    assert section_1_1.end_page == 4  # Before Section 1.2
+    assert len(section_1_1.children) == 2  # Two level-3 sections
+
+    # Check first level-3 section
+    subsection_1_1_1 = section_1_1.children[0]
+    assert subsection_1_1_1.title == "Subsection 1.1.1"
+    assert subsection_1_1_1.start_page == 2
+    assert subsection_1_1_1.end_page == 3  # Before Subsection 1.1.2
+    assert len(subsection_1_1_1.children) == 1  # One level-4 section
+
+    # Check level-4 section
+    subsubsection = subsection_1_1_1.children[0]
+    assert subsubsection.title == "Sub-subsection 1.1.1.1"
+    assert subsubsection.start_page == 3
+    assert subsubsection.end_page == 3  # Just one page
+    assert len(subsubsection.children) == 0  # No children
+
+
+def test_extract_sections_with_different_toc_formats():
+    """Test ability to handle different TOC formats."""
+    doc = MagicMock()
+    doc.page_count = 5
+    doc.__getitem__.side_effect = lambda i: MagicMock(
+        get_text=lambda: f"Content of page {i}"
+    )
+
+    # Test with tuple format TOC
+    doc.get_toc.return_value = [
+        (1, "Chapter 1", 1),
+        (1, "Chapter 2", 4),
+    ]
+
+    sections = extract_sections(doc)
+    assert len(sections) == 2
+    assert sections[0].title == "Chapter 1"
+    assert sections[1].title == "Chapter 2"
+
+    # Test with list format TOC (same representation in PyMuPDF)
+    doc.get_toc.return_value = [
+        [1, "Chapter 1", 1],
+        [1, "Chapter 2", 4],
+    ]
+
+    sections = extract_sections(doc)
+    assert len(sections) == 2
+    assert sections[0].title == "Chapter 1"
+    assert sections[1].title == "Chapter 2"
+
+
+@patch("fitz.open")
+def test_parse_ebook_full_content_generation(mock_open, mock_doc, tmp_path):
+    """Test full content is correctly concatenated from sections."""
+    # Setup mock document with sections
+    mock_doc.metadata = {"title": "Test Book", "author": "Test Author"}
+    mock_doc.page_count = 3
+
+    # Create sections with specific content
+    section1 = MagicMock()
+    section1.content = "Content of section 1"
+    section2 = MagicMock()
+    section2.content = "Content of section 2"
+
+    # Mock extract_sections to return our sections
+    with patch("memory.common.parsers.ebook.extract_sections") as mock_extract:
+        mock_extract.return_value = [section1, section2]
+
+        mock_open.return_value = mock_doc
+        test_file = tmp_path / "test.epub"
+        test_file.touch()
+
+        ebook = parse_ebook(test_file)
+
+        # Check the full content is concatenated correctly
+        assert ebook.full_content == "Content of section 1Content of section 2"