add ebook parser

This commit is contained in:
Daniel O'Connell 2025-05-21 00:49:27 +02:00
parent 4f1ca777e9
commit b292baf59d
4 changed files with 578 additions and 2 deletions

View File

@ -5,4 +5,5 @@ alembic==1.13.1
dotenv==0.9.9 dotenv==0.9.9
voyageai==0.3.2 voyageai==0.3.2
qdrant-client==1.9.0 qdrant-client==1.9.0
PyMuPDF==1.25.5 PyMuPDF==1.25.5
ebooklib==0.18.0

View File

@ -0,0 +1,181 @@
import logging
from dataclasses import dataclass, field
from typing import Optional, List, Dict, Any, cast
from pathlib import Path
import fitz # PyMuPDF
logger = logging.getLogger(__name__)
@dataclass
class Section:
"""Represents a chapter or section in an ebook."""
title: str
content: str
number: Optional[int] = None
start_page: Optional[int] = None
end_page: Optional[int] = None
children: list["Section"] = field(default_factory=list)
@dataclass
class Ebook:
    """Parsed ebook: bibliographic fields, section list and combined text."""

    title: str
    author: str
    # Arbitrary metadata key/value pairs; defaults to a fresh empty dict.
    metadata: dict[str, Any] = field(default_factory=dict)
    # Top-level sections; defaults to a fresh empty list.
    sections: list[Section] = field(default_factory=list)
    # Text of the whole book as a single string.
    full_content: str = ""
    # Location the book was read from, if any.
    file_path: Path | None = None
    # Short type tag for the file (empty when unknown).
    file_type: str = ""
class Peekable:
    """Iterator wrapper that allows looking at the next item without consuming it.

    One item is always read ahead from the underlying iterator and kept in
    ``cached`` until ``next()`` hands it out.
    """

    def __init__(self, items):
        """Wrap *items*, which may be any iterable (list, generator, iterator, ...)."""
        # iter() is a no-op on objects that are already iterators, and makes
        # plain containers such as lists usable as well.
        self.items = iter(items)
        self.done = False
        # Defined up front so the attribute exists even when *items* is empty.
        self.cached = None
        self._get_next()

    def _get_next(self):
        """Read the next item into ``cached``, or mark the wrapper as exhausted."""
        try:
            self.cached = next(self.items)
        except StopIteration:
            self.done = True
            # Do not keep the last handed-out item alive once exhausted.
            self.cached = None

    def peek(self, default=None):
        """Return the upcoming item without consuming it.

        Args:
            default: Value returned when the iterator is exhausted. Pass a
                sentinel if ``None`` can be a legitimate item.
        """
        if self.done:
            return default
        return self.cached

    def __iter__(self):
        return self

    def __next__(self):
        if self.done:
            raise StopIteration
        item = self.cached
        self._get_next()
        return item
# One table-of-contents entry; extract_section_pages unpacks it as
# (level, title, page).
TOCItem = tuple[int, str, int]
def extract_epub_metadata(doc) -> Dict[str, Any]:
    """Return the entries of ``doc.metadata`` whose values are truthy.

    A missing or empty ``doc.metadata`` yields an empty dict.
    """
    raw = doc.metadata
    if raw:
        return {name: entry for name, entry in raw.items() if entry}
    return {}
def get_pages(doc, start_page: int, end_page: int) -> str:
    """Join the text of pages ``start_page``..``end_page`` (inclusive) with newlines.

    Indices outside ``0 .. doc.page_count - 1`` are ignored, so an empty or
    fully out-of-range interval produces an empty string.
    """
    # Clamp the requested interval to the pages the document actually has.
    first = max(start_page, 0)
    last = min(end_page, doc.page_count - 1)
    return "\n".join(doc[index].get_text() for index in range(first, last + 1))
def extract_section_pages(doc, toc: Peekable, section_num: int = 1) -> Section | None:
    """Consume one TOC entry, plus every deeper-level entry after it, into a Section.

    Args:
        doc: Document whose pages are read through ``get_pages``;
            ``doc.page_count`` bounds the section when no later entry does.
        toc: Stream of ``(level, title, page)`` entries positioned at the entry
            to extract. On return it is positioned at the first entry that is
            not a descendant of the extracted one.
        section_num: Ordinal stored in ``Section.number``.

    Returns:
        The populated Section (children are numbered from 1), or ``None`` when
        ``toc`` has no entry left.

    NOTE(review): the TOC page value is used directly as an index into ``doc``.
    PyMuPDF documents ``get_toc()`` pages as 1-based while ``doc[i]`` is
    0-based -- confirm which convention callers rely on.
    """
    if not toc.peek():
        return None

    level, name, page = cast(TOCItem, next(toc))

    # Every following entry with a deeper level is nested under this section.
    children = []
    upcoming = cast(TOCItem | None, toc.peek())
    while upcoming and upcoming[0] > level:
        children.append(extract_section_pages(doc, toc, len(children) + 1))
        upcoming = cast(TOCItem | None, toc.peek())

    if upcoming:
        # The section ends on the page before the next entry starts. When both
        # entries start on the same page, clamp to the start page so the
        # section keeps that page instead of getting an empty, inverted range.
        last_page = max(page, upcoming[2] - 1)
    else:
        # No later entry: the section runs to the end of the document.
        last_page = doc.page_count

    return Section(
        title=name,
        content=get_pages(doc, page, last_page),
        number=section_num,
        start_page=page,
        end_page=last_page,
        children=children,
    )
def extract_sections(doc) -> List[Section]:
    """Build the list of top-level sections of a document from its table of contents.

    Args:
        doc: Document providing ``get_toc()``, ``page_count`` and page access.

    Returns:
        One Section per top-level TOC entry, numbered from 1, each carrying its
        nested children. When the document has no TOC, a single section titled
        "Content" holding ``doc.get_text()`` is returned.
    """
    # Fetch the TOC once and reuse it for both the emptiness check and parsing.
    entries = doc.get_toc()
    if not entries:
        # NOTE(review): relies on the document object exposing get_text();
        # confirm this exists on real PyMuPDF documents (pages certainly do).
        return [
            Section(
                title="Content",
                content=doc.get_text(),
                number=1,
                start_page=0,
                end_page=doc.page_count,
            )
        ]

    toc = Peekable(iter(entries))
    sections = []
    while toc.peek():
        section = extract_section_pages(doc, toc, len(sections) + 1)
        if section:
            sections.append(section)
    return sections
def parse_ebook(file_path: str | Path) -> Ebook:
    """
    Parse an ebook file and extract its content and metadata.

    Args:
        file_path: Path to the ebook file

    Returns:
        Structured ebook data. ``title`` falls back to the file stem and
        ``author`` to "Unknown" when the metadata lacks them; ``file_type`` is
        the lower-cased file suffix without the leading dot.

    Raises:
        FileNotFoundError: If ``file_path`` does not exist.
        fitz.FileNotFoundError: Re-raised (after logging) when PyMuPDF cannot
            open the file.
    """
    path = Path(file_path)
    if not path.exists():
        raise FileNotFoundError(f"File not found: {path}")

    try:
        doc = fitz.open(str(path))
    except fitz.FileNotFoundError as e:
        logger.error("Error opening ebook %s: %s", path, e)
        raise

    # Everything needed is copied out as plain Python data, so the document
    # handle can be released as soon as extraction finishes, even on error.
    try:
        metadata = extract_epub_metadata(doc)
        sections = extract_sections(doc)
    finally:
        doc.close()

    return Ebook(
        title=metadata.get("title", path.stem),
        author=metadata.get("author", "Unknown"),
        metadata=metadata,
        sections=sections,
        # Joining an empty list yields "", so no separate emptiness check is needed.
        full_content="".join(section.content for section in sections),
        file_path=path,
        file_type=path.suffix.lower()[1:],
    )

View File

@ -3,7 +3,7 @@ import hashlib
import logging import logging
from datetime import datetime from datetime import datetime
from email.utils import parsedate_to_datetime from email.utils import parsedate_to_datetime
from typing import TypedDict, Literal from typing import TypedDict
import pathlib import pathlib
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)

View File

@ -0,0 +1,394 @@
from unittest.mock import MagicMock, patch
import pytest
import fitz
from memory.common.parsers.ebook import (
Peekable,
extract_epub_metadata,
get_pages,
extract_section_pages,
extract_sections,
parse_ebook,
Section,
)
def test_peekable_peek():
    """peek() keeps returning the head item until something consumes it."""
    peekable = Peekable(iter([1, 2, 3]))
    first_look = peekable.peek()
    second_look = peekable.peek()
    assert first_look == 1
    assert second_look == 1  # repeated peeks must not advance the iterator
def test_peekable_iteration():
    """Iterating a Peekable yields every wrapped item, in order."""
    wrapped = Peekable(iter([1, 2, 3]))
    collected = list(wrapped)
    assert collected == [1, 2, 3]
def test_peekable_empty():
    """An empty source peeks as None and iterates to nothing."""
    empty = Peekable(iter([]))
    head = empty.peek()
    assert head is None
    remaining = list(empty)
    assert remaining == []
@pytest.fixture
def mock_doc():
    """Fake five-page document with metadata and a two-chapter nested TOC."""

    def make_page(index):
        # Each lookup returns a fresh page mock whose text names its index.
        return MagicMock(get_text=lambda: f"Content of page {index}")

    fake = MagicMock()
    fake.page_count = 5
    fake.metadata = {
        "title": "Test Book",
        "author": "Test Author",
        "creator": "Test Creator",
        "producer": "Test Producer",
    }
    fake.__getitem__.side_effect = make_page
    fake.get_toc.return_value = [
        [1, "Chapter 1", 0],
        [2, "Section 1.1", 1],
        [2, "Section 1.2", 2],
        [1, "Chapter 2", 3],
        [2, "Section 2.1", 4],
    ]
    return fake
@pytest.mark.parametrize(
    "metadata_input,expected",
    [
        pytest.param(
            {"title": "Book", "author": "Author"},
            {"title": "Book", "author": "Author"},
            id="all-values-kept",
        ),
        pytest.param(
            {"title": "", "author": "Author"},
            {"author": "Author"},
            id="empty-string-filtered",
        ),
        pytest.param(
            {"title": None, "author": "Author"},
            {"author": "Author"},
            id="none-filtered",
        ),
        pytest.param({}, {}, id="empty-dict"),
    ],
)
def test_extract_epub_metadata(metadata_input, expected):
    """Only truthy metadata values survive extraction."""
    fake_doc = MagicMock()
    fake_doc.metadata = metadata_input
    extracted = extract_epub_metadata(fake_doc)
    assert extracted == expected
@pytest.mark.parametrize(
    "start_page,end_page,expected_content",
    [
        pytest.param(
            0,
            2,
            "Content of page 0\nContent of page 1\nContent of page 2",
            id="first-three-pages",
        ),
        pytest.param(3, 4, "Content of page 3\nContent of page 4", id="last-two-pages"),
        pytest.param(4, 4, "Content of page 4", id="single-page"),
        pytest.param(
            0,
            10,
            "Content of page 0\nContent of page 1\nContent of page 2\nContent of page 3\nContent of page 4",
            id="end-out-of-range",
        ),
        pytest.param(5, 10, "", id="completely-out-of-range"),
        pytest.param(3, 2, "", id="start-after-end"),
        pytest.param(
            -1,
            2,
            "Content of page 0\nContent of page 1\nContent of page 2",
            id="negative-start",
        ),
    ],
)
def test_get_pages(mock_doc, start_page, end_page, expected_content):
    """get_pages joins in-range pages with newlines and ignores the rest."""
    text = get_pages(mock_doc, start_page, end_page)
    assert text == expected_content
@pytest.fixture
def mock_toc_items():
    """Peekable over a small TOC of (level, title, start page) entries."""
    chapter_one = (1, "Chapter 1", 0)
    section_one_one = (2, "Section 1.1", 1)
    section_one_two = (2, "Section 1.2", 2)
    chapter_two = (1, "Chapter 2", 3)
    entries = [chapter_one, section_one_one, section_one_two, chapter_two]
    return Peekable(iter(entries))
def test_extract_section_pages(mock_doc, mock_toc_items):
    """The first chapter is extracted together with its two nested sections."""
    section_one_one = Section(
        title="Section 1.1",
        number=1,
        start_page=1,
        end_page=1,
        content="Content of page 1",
    )
    section_one_two = Section(
        title="Section 1.2",
        number=2,
        start_page=2,
        end_page=2,
        content="Content of page 2",
    )
    expected = Section(
        title="Chapter 1",
        number=1,
        start_page=0,
        end_page=2,
        content="Content of page 0\nContent of page 1\nContent of page 2",
        children=[section_one_one, section_one_two],
    )

    result = extract_section_pages(mock_doc, mock_toc_items)

    assert result == expected
def test_extract_sections(mock_doc):
    """The fixture's flat TOC is rebuilt into two chapters with nested sections."""
    chapter_one = Section(
        title="Chapter 1",
        number=1,
        start_page=0,
        end_page=2,
        content="Content of page 0\nContent of page 1\nContent of page 2",
        children=[
            Section(
                title="Section 1.1",
                number=1,
                start_page=1,
                end_page=1,
                content="Content of page 1",
            ),
            Section(
                title="Section 1.2",
                number=2,
                start_page=2,
                end_page=2,
                content="Content of page 2",
            ),
        ],
    )
    chapter_two = Section(
        title="Chapter 2",
        number=2,
        start_page=3,
        end_page=5,
        content="Content of page 3\nContent of page 4",
        children=[
            Section(
                title="Section 2.1",
                number=1,
                start_page=4,
                end_page=5,
                content="Content of page 4",
            ),
        ],
    )

    result = extract_sections(mock_doc)

    assert result == [chapter_one, chapter_two]
def test_extract_sections_no_toc(mock_doc):
    """Without a TOC the whole document becomes a single "Content" section."""
    mock_doc.get_toc.return_value = []
    mock_doc.get_text.return_value = "Full document content"

    fallback = Section(
        title="Content",
        number=1,
        start_page=0,
        end_page=5,
        content="Full document content",
        children=[],
    )

    result = extract_sections(mock_doc)

    assert result == [fallback]
@patch("fitz.open")
def test_parse_ebook(mock_open, mock_doc, tmp_path):
    """Parsing an existing file fills title, author, sections, path and type."""
    mock_open.return_value = mock_doc
    # parse_ebook checks for existence, so the file has to be on disk.
    book_path = tmp_path / "test.epub"
    book_path.touch()

    parsed = parse_ebook(book_path)

    assert parsed.title == "Test Book"
    assert parsed.author == "Test Author"
    assert len(parsed.sections) == 2
    assert parsed.file_path == book_path
    assert parsed.file_type == "epub"
@patch("fitz.open")
def test_parse_ebook_file_not_found(mock_open, tmp_path):
    """A path that does not exist raises the builtin FileNotFoundError."""
    missing_path = tmp_path / "does_not_exist.epub"

    with pytest.raises(FileNotFoundError):
        parse_ebook(missing_path)
@patch("fitz.open")
def test_parse_ebook_fitz_error(mock_open, tmp_path):
    """A fitz.FileNotFoundError raised by fitz.open propagates to the caller."""
    # The file must exist on disk, otherwise the builtin FileNotFoundError
    # would be raised before fitz.open is ever reached.
    book_path = tmp_path / "test.epub"
    book_path.touch()
    mock_open.side_effect = fitz.FileNotFoundError("File not found by PyMuPDF")

    with pytest.raises(fitz.FileNotFoundError):
        parse_ebook(book_path)
@patch("fitz.open")
def test_parse_ebook_no_metadata(mock_open, mock_doc, tmp_path):
    """Empty metadata falls back to the file stem and an "Unknown" author."""
    mock_doc.metadata = {}
    mock_open.return_value = mock_doc
    book_path = tmp_path / "test.epub"
    book_path.touch()

    parsed = parse_ebook(book_path)

    assert parsed.title == "test"  # taken from the file stem
    assert parsed.author == "Unknown"
@pytest.mark.parametrize(
    "file_suffix,expected_type",
    [
        pytest.param(".epub", "epub", id="epub"),
        pytest.param(".pdf", "pdf", id="pdf"),
        pytest.param(".mobi", "mobi", id="mobi"),
        pytest.param(".EPUB", "epub", id="uppercase-suffix"),
        pytest.param("", "", id="no-extension"),
    ],
)
@patch("fitz.open")
def test_parse_ebook_file_types(
    mock_open, mock_doc, tmp_path, file_suffix, expected_type
):
    """file_type is the lower-cased suffix without its leading dot."""
    mock_open.return_value = mock_doc
    # The file only needs to exist; its name carries the suffix under test.
    book_path = tmp_path / f"test{file_suffix}"
    book_path.touch()

    parsed = parse_ebook(book_path)

    assert parsed.file_type == expected_type
def test_extract_section_pages_empty():
    """An exhausted TOC yields no section at all."""
    fake_doc = MagicMock()
    fake_doc.page_count = 5
    exhausted = Peekable(iter([]))

    result = extract_section_pages(fake_doc, exhausted)

    assert result is None
def test_extract_section_pages_deeply_nested():
    """A four-level TOC is rebuilt into a correctly nested section tree."""

    def make_page(index):
        return MagicMock(get_text=lambda: f"Content of page {index}")

    fake_doc = MagicMock()
    fake_doc.page_count = 10
    fake_doc.__getitem__.side_effect = make_page

    entries = [
        (1, "Chapter 1", 0),
        (2, "Section 1.1", 1),
        (3, "Subsection 1.1.1", 2),
        (4, "Sub-subsection 1.1.1.1", 3),
        (3, "Subsection 1.1.2", 4),
        (2, "Section 1.2", 5),
        (1, "Chapter 2", 6),
    ]

    chapter = extract_section_pages(fake_doc, Peekable(iter(entries)))

    # Level 1: the chapter runs up to the page before "Chapter 2".
    assert chapter is not None
    assert chapter.title == "Chapter 1"
    assert chapter.start_page == 0
    assert chapter.end_page == 5
    assert len(chapter.children) == 2

    # Level 2: the first section ends on the page before "Section 1.2".
    level_two = chapter.children[0]
    assert level_two.title == "Section 1.1"
    assert level_two.start_page == 1
    assert level_two.end_page == 4
    assert len(level_two.children) == 2

    # Level 3: the first subsection ends on the page before "Subsection 1.1.2".
    level_three = level_two.children[0]
    assert level_three.title == "Subsection 1.1.1"
    assert level_three.start_page == 2
    assert level_three.end_page == 3
    assert len(level_three.children) == 1

    # Level 4: a single-page leaf with no children.
    level_four = level_three.children[0]
    assert level_four.title == "Sub-subsection 1.1.1.1"
    assert level_four.start_page == 3
    assert level_four.end_page == 3
    assert len(level_four.children) == 0
def test_extract_sections_with_different_toc_formats():
    """TOC entries are handled the same whether given as tuples or as lists."""
    fake_doc = MagicMock()
    fake_doc.page_count = 5
    fake_doc.__getitem__.side_effect = lambda i: MagicMock(
        get_text=lambda: f"Content of page {i}"
    )

    toc_as_tuples = [
        (1, "Chapter 1", 0),
        (1, "Chapter 2", 3),
    ]
    toc_as_lists = [
        [1, "Chapter 1", 0],
        [1, "Chapter 2", 3],
    ]

    # Tuple form first, then list form; both must give the same two chapters.
    for toc_variant in (toc_as_tuples, toc_as_lists):
        fake_doc.get_toc.return_value = toc_variant
        sections = extract_sections(fake_doc)
        assert len(sections) == 2
        assert sections[0].title == "Chapter 1"
        assert sections[1].title == "Chapter 2"
@patch("fitz.open")
def test_parse_ebook_full_content_generation(mock_open, mock_doc, tmp_path):
    """full_content is the section contents joined with no separator."""
    mock_doc.metadata = {"title": "Test Book", "author": "Test Author"}
    mock_doc.page_count = 3
    mock_open.return_value = mock_doc

    # Stand-in sections: only their ``content`` attribute is set here.
    first = MagicMock()
    first.content = "Content of section 1"
    second = MagicMock()
    second.content = "Content of section 2"

    book_path = tmp_path / "test.epub"
    book_path.touch()

    # Replace section extraction so the parser sees exactly these sections.
    with patch("memory.common.parsers.ebook.extract_sections") as mock_extract:
        mock_extract.return_value = [first, second]
        parsed = parse_ebook(book_path)

    assert parsed.full_content == "Content of section 1Content of section 2"