From 1dd93929c1a486b9db3afb7ca92b114b2a5cacfb Mon Sep 17 00:00:00 2001 From: Daniel O'Connell Date: Sat, 31 May 2025 16:51:55 +0200 Subject: [PATCH] Add embedding for observations --- src/memory/common/collections.py | 11 + src/memory/common/db/models/source_item.py | 5 +- src/memory/common/db/models/source_items.py | 65 ++ src/memory/common/formatters/observation.py | 86 +++ .../test_source_item.py} | 452 ------------- .../common/db/models/test_source_items.py | 607 ++++++++++++++++++ .../common/formatters/test_observation.py | 220 +++++++ 7 files changed, 992 insertions(+), 454 deletions(-) create mode 100644 src/memory/common/formatters/observation.py rename tests/memory/common/db/{test_models.py => models/test_source_item.py} (61%) create mode 100644 tests/memory/common/db/models/test_source_items.py create mode 100644 tests/memory/common/formatters/test_observation.py diff --git a/src/memory/common/collections.py b/src/memory/common/collections.py index ae3e78b..76dffed 100644 --- a/src/memory/common/collections.py +++ b/src/memory/common/collections.py @@ -71,6 +71,17 @@ ALL_COLLECTIONS: dict[str, Collection] = { "distance": "Cosine", "model": settings.MIXED_EMBEDDING_MODEL, }, + # Observations + "semantic": { + "dimension": 1024, + "distance": "Cosine", + "model": settings.TEXT_EMBEDDING_MODEL, + }, + "temporal": { + "dimension": 1024, + "distance": "Cosine", + "model": settings.TEXT_EMBEDDING_MODEL, + }, } TEXT_COLLECTIONS = { coll diff --git a/src/memory/common/db/models/source_item.py b/src/memory/common/db/models/source_item.py index 6d8cf1d..b5656b4 100644 --- a/src/memory/common/db/models/source_item.py +++ b/src/memory/common/db/models/source_item.py @@ -103,9 +103,10 @@ def add_pics(chunk: str, images: list[Image.Image]) -> list[extract.MulitmodalCh def merge_metadata(*metadata: dict[str, Any]) -> dict[str, Any]: final = {} for m in metadata: - if tags := set(m.pop("tags", [])): + data = m.copy() + if tags := set(data.pop("tags", [])): final["tags"] = tags | final.get("tags", set()) - final |= m + final |= data return final diff --git a/src/memory/common/db/models/source_items.py b/src/memory/common/db/models/source_items.py index 2dcf47e..ceb7e7d 100644 --- a/src/memory/common/db/models/source_items.py +++ b/src/memory/common/db/models/source_items.py @@ -27,6 +27,7 @@ from sqlalchemy.orm import relationship from memory.common import settings import memory.common.extract as extract import memory.common.summarizer as summarizer +import memory.common.formatters.observation as observation from memory.common.db.models.source_item import ( SourceItem, @@ -568,3 +569,67 @@ class AgentObservation(SourceItem): def all_contradictions(self): """Get all contradictions involving this observation.""" return self.contradictions_as_first + self.contradictions_as_second + + def data_chunks(self, metadata: dict[str, Any] = {}) -> Sequence[extract.DataChunk]: + """ + Generate multiple chunks for different embedding dimensions. + Each chunk goes to a different Qdrant collection for specialized search. + """ + chunks = [] + + # 1. Semantic chunk - standard content representation + semantic_text = observation.generate_semantic_text( + cast(str, self.subject), + cast(str, self.observation_type), + cast(str, self.content), + cast(observation.Evidence, self.evidence), + ) + chunks.append( + extract.DataChunk( + data=[semantic_text], + metadata=merge_metadata(metadata, {"embedding_type": "semantic"}), + collection_name="semantic", + ) + ) + + # 2. 
Temporal chunk - time-aware representation + temporal_text = observation.generate_temporal_text( + cast(str, self.subject), + cast(str, self.content), + cast(float, self.confidence), + cast(datetime, self.inserted_at), + ) + chunks.append( + extract.DataChunk( + data=[temporal_text], + metadata=merge_metadata(metadata, {"embedding_type": "temporal"}), + collection_name="temporal", + ) + ) + + # TODO: Add more embedding dimensions here: + # 3. Epistemic chunk - belief structure focused + # epistemic_text = self._generate_epistemic_text() + # chunks.append(extract.DataChunk( + # data=[epistemic_text], + # metadata={**base_metadata, "embedding_type": "epistemic"}, + # collection_name="observations_epistemic" + # )) + # + # 4. Emotional chunk - emotional context focused + # emotional_text = self._generate_emotional_text() + # chunks.append(extract.DataChunk( + # data=[emotional_text], + # metadata={**base_metadata, "embedding_type": "emotional"}, + # collection_name="observations_emotional" + # )) + # + # 5. Relational chunk - connection patterns focused + # relational_text = self._generate_relational_text() + # chunks.append(extract.DataChunk( + # data=[relational_text], + # metadata={**base_metadata, "embedding_type": "relational"}, + # collection_name="observations_relational" + # )) + + return chunks diff --git a/src/memory/common/formatters/observation.py b/src/memory/common/formatters/observation.py new file mode 100644 index 0000000..6f2819c --- /dev/null +++ b/src/memory/common/formatters/observation.py @@ -0,0 +1,86 @@ +from datetime import datetime +from typing import TypedDict + + +class Evidence(TypedDict): + quote: str + context: str + + +def generate_semantic_text( + subject: str, observation_type: str, content: str, evidence: Evidence +) -> str: + """Generate text optimized for semantic similarity search.""" + parts = [ + f"Subject: {subject}", + f"Type: {observation_type}", + f"Observation: {content}", + ] + + if not evidence or not isinstance(evidence, dict): + return " | ".join(parts) + + if "quote" in evidence: + parts.append(f"Quote: {evidence['quote']}") + if "context" in evidence: + parts.append(f"Context: {evidence['context']}") + + return " | ".join(parts) + + +def generate_temporal_text( + subject: str, + content: str, + confidence: float, + created_at: datetime, +) -> str: + """Generate text with temporal context for time-pattern search.""" + # Add temporal markers + time_of_day = created_at.strftime("%H:%M") + day_of_week = created_at.strftime("%A") + + # Categorize time periods + hour = created_at.hour + if 5 <= hour < 12: + time_period = "morning" + elif 12 <= hour < 17: + time_period = "afternoon" + elif 17 <= hour < 22: + time_period = "evening" + else: + time_period = "late_night" + + parts = [ + f"Time: {time_of_day} on {day_of_week} ({time_period})", + f"Subject: {subject}", + f"Observation: {content}", + f"Confidence: {confidence}", + ] + + return " | ".join(parts) + + +# TODO: Add more embedding dimensions here: +# 3. Epistemic chunk - belief structure focused +# epistemic_text = self._generate_epistemic_text() +# chunks.append(extract.DataChunk( +# data=[epistemic_text], +# metadata={**base_metadata, "embedding_type": "epistemic"}, +# collection_name="observations_epistemic" +# )) +# +# 4. 
Emotional chunk - emotional context focused +# emotional_text = self._generate_emotional_text() +# chunks.append(extract.DataChunk( +# data=[emotional_text], +# metadata={**base_metadata, "embedding_type": "emotional"}, +# collection_name="observations_emotional" +# )) +# +# 5. Relational chunk - connection patterns focused +# relational_text = self._generate_relational_text() +# chunks.append(extract.DataChunk( +# data=[relational_text], +# metadata={**base_metadata, "embedding_type": "relational"}, +# collection_name="observations_relational" +# )) diff --git a/tests/memory/common/db/test_models.py b/tests/memory/common/db/models/test_source_item.py similarity index 61% rename from tests/memory/common/db/test_models.py rename to tests/memory/common/db/models/test_source_item.py index da90fa8..5e12a6c 100644 --- a/tests/memory/common/db/test_models.py +++ b/tests/memory/common/db/models/test_source_item.py @@ -587,288 +587,6 @@ def test_chunk_constraint_validation( assert chunk.id is not None -@pytest.mark.parametrize( - "modality,expected_modality", - [ - (None, "email"), # Default case - ("custom", "custom"), # Override case - ], -) -def test_mail_message_modality(modality, expected_modality): - """Test MailMessage modality setting""" - kwargs = {"sha256": b"test", "content": "test"} - if modality is not None: - kwargs["modality"] = modality - - mail_message = MailMessage(**kwargs) - # The __init__ method should set the correct modality - assert hasattr(mail_message, "modality") - - -@pytest.mark.parametrize( - "sender,folder,expected_path", - [ - ("user@example.com", "INBOX", "user_example_com/INBOX"), - ("user+tag@example.com", "Sent Items", "user_tag_example_com/Sent_Items"), - ("user@domain.co.uk", None, "user_domain_co_uk/INBOX"), - ("user@domain.co.uk", "", "user_domain_co_uk/INBOX"), - ], -) -def test_mail_message_attachments_path(sender, folder, expected_path): - """Test MailMessage.attachments_path property""" - mail_message = MailMessage( - sha256=b"test", content="test", sender=sender, folder=folder - ) - - result = mail_message.attachments_path - assert str(result) == f"{settings.FILE_STORAGE_DIR}/emails/{expected_path}" - - -@pytest.mark.parametrize( - "filename,expected", - [ - ("document.pdf", "document.pdf"), - ("file with spaces.txt", "file_with_spaces.txt"), - ("file@#$%^&*().doc", "file.doc"), - ("no-extension", "no_extension"), - ("multiple.dots.in.name.txt", "multiple_dots_in_name.txt"), - ], -) -def test_mail_message_safe_filename(tmp_path, filename, expected): - """Test MailMessage.safe_filename method""" - mail_message = MailMessage( - sha256=b"test", content="test", sender="user@example.com", folder="INBOX" - ) - - expected = settings.FILE_STORAGE_DIR / f"emails/user_example_com/INBOX/{expected}" - assert mail_message.safe_filename(filename) == expected - - -@pytest.mark.parametrize( - "sent_at,expected_date", - [ - (datetime(2023, 1, 1, 12, 0, 0), "2023-01-01T12:00:00"), - (None, None), - ], -) -def test_mail_message_as_payload(sent_at, expected_date): - """Test MailMessage.as_payload method""" - - mail_message = MailMessage( - sha256=b"test", - content="test", - message_id="", - subject="Test Subject", - sender="sender@example.com", - recipients=["recipient1@example.com", "recipient2@example.com"], - folder="INBOX", - sent_at=sent_at, - tags=["tag1", "tag2"], - size=1024, - ) - # Manually set id for testing - object.__setattr__(mail_message, "id", 123) - - payload = mail_message.as_payload() - - expected = { - "source_id": 123, - "size": 1024, - "message_id": 
"", - "subject": "Test Subject", - "sender": "sender@example.com", - "recipients": ["recipient1@example.com", "recipient2@example.com"], - "folder": "INBOX", - "tags": [ - "tag1", - "tag2", - "sender@example.com", - "recipient1@example.com", - "recipient2@example.com", - ], - "date": expected_date, - } - assert payload == expected - - -def test_mail_message_parsed_content(): - """Test MailMessage.parsed_content property with actual email parsing""" - # Use a simple email format that the parser can handle - email_content = """From: sender@example.com -To: recipient@example.com -Subject: Test Subject - -Test Body Content""" - - mail_message = MailMessage( - sha256=b"test", content=email_content, message_id="" - ) - - result = mail_message.parsed_content - - # Just test that it returns a dict-like object - assert isinstance(result, dict) - assert "body" in result - - -def test_mail_message_body_property(): - """Test MailMessage.body property with actual email parsing""" - email_content = """From: sender@example.com -To: recipient@example.com -Subject: Test Subject - -Test Body Content""" - - mail_message = MailMessage( - sha256=b"test", content=email_content, message_id="" - ) - - assert mail_message.body == "Test Body Content" - - -def test_mail_message_display_contents(): - """Test MailMessage.display_contents property with actual email parsing""" - email_content = """From: sender@example.com -To: recipient@example.com -Subject: Test Subject - -Test Body Content""" - - mail_message = MailMessage( - sha256=b"test", content=email_content, message_id="" - ) - - expected = ( - "\nSubject: Test Subject\nFrom: \nTo: \nDate: \nBody: \nTest Body Content\n" - ) - assert mail_message.display_contents == expected - - -@pytest.mark.parametrize( - "created_at,expected_date", - [ - (datetime(2023, 1, 1, 12, 0, 0), "2023-01-01T12:00:00"), - (None, None), - ], -) -def test_email_attachment_as_payload(created_at, expected_date): - """Test EmailAttachment.as_payload method""" - attachment = EmailAttachment( - sha256=b"test", - filename="document.pdf", - mime_type="application/pdf", - size=1024, - mail_message_id=123, - created_at=created_at, - tags=["pdf", "document"], - ) - # Manually set id for testing - object.__setattr__(attachment, "id", 456) - - payload = attachment.as_payload() - - expected = { - "source_id": 456, - "filename": "document.pdf", - "content_type": "application/pdf", - "size": 1024, - "created_at": expected_date, - "mail_message_id": 123, - "tags": ["pdf", "document"], - } - assert payload == expected - - -@pytest.mark.parametrize( - "has_filename,content_source,expected_content", - [ - (True, "file", b"test file content"), - (False, "content", "attachment content"), - ], -) -@patch("memory.common.extract.extract_data_chunks") -def test_email_attachment_data_chunks( - mock_extract, has_filename, content_source, expected_content, tmp_path -): - """Test EmailAttachment.data_chunks method""" - from memory.common.extract import DataChunk - - mock_extract.return_value = [ - DataChunk(data=["extracted text"], metadata={"source": content_source}) - ] - - if has_filename: - # Create a test file - test_file = tmp_path / "test.txt" - test_file.write_bytes(b"test file content") - attachment = EmailAttachment( - sha256=b"test", - filename=str(test_file), - mime_type="text/plain", - mail_message_id=123, - ) - else: - attachment = EmailAttachment( - sha256=b"test", - content="attachment content", - filename=None, - mime_type="text/plain", - mail_message_id=123, - ) - - # Mock _make_chunk to return a 
simple chunk - mock_chunk = Mock() - with patch.object(attachment, "_make_chunk", return_value=mock_chunk) as mock_make: - result = attachment.data_chunks({"extra": "metadata"}) - - # Verify the method calls - mock_extract.assert_called_once_with("text/plain", expected_content) - mock_make.assert_called_once_with( - extract.DataChunk(data=["extracted text"], metadata={"source": content_source}), - {"extra": "metadata"}, - ) - assert result == [mock_chunk] - - -def test_email_attachment_cascade_delete(db_session: Session): - """Test that EmailAttachment is deleted when MailMessage is deleted""" - mail_message = MailMessage( - sha256=b"test_email", - content="test email", - message_id="", - subject="Test", - sender="sender@example.com", - recipients=["recipient@example.com"], - folder="INBOX", - ) - db_session.add(mail_message) - db_session.commit() - - attachment = EmailAttachment( - sha256=b"test_attachment", - content="attachment content", - mail_message=mail_message, - filename="test.txt", - mime_type="text/plain", - size=100, - modality="attachment", # Set modality explicitly - ) - db_session.add(attachment) - db_session.commit() - - attachment_id = attachment.id - - # Delete the mail message - db_session.delete(mail_message) - db_session.commit() - - # Verify the attachment was also deleted - deleted_attachment = ( - db_session.query(EmailAttachment).filter_by(id=attachment_id).first() - ) - assert deleted_attachment is None - - def test_subclass_deletion_cascades_to_source_item(db_session: Session): mail_message = MailMessage( sha256=b"test_email_cascade", @@ -928,173 +646,3 @@ def test_subclass_deletion_cascades_from_source_item(db_session: Session): # Verify both the MailMessage and SourceItem records are deleted assert db_session.query(MailMessage).filter_by(id=mail_message_id).first() is None assert db_session.query(SourceItem).filter_by(id=source_item_id).first() is None - - -@pytest.mark.parametrize( - "pages,expected_chunks", - [ - # No pages - ([], []), - # Single page - (["Page 1 content"], [("Page 1 content", {"type": "page"})]), - # Multiple pages - ( - ["Page 1", "Page 2", "Page 3"], - [ - ( - "Page 1\n\nPage 2\n\nPage 3", - {"type": "section", "tags": {"tag1", "tag2"}}, - ), - ("test", {"type": "summary", "tags": {"tag1", "tag2"}}), - ], - ), - # Empty/whitespace pages filtered out - (["", " ", "Page 3"], [("Page 3", {"type": "page"})]), - # All empty - no chunks created - (["", " ", " "], []), - ], -) -def test_book_section_data_chunks(pages, expected_chunks): - """Test BookSection.data_chunks with various page combinations""" - content = "\n\n".join(pages).strip() - book_section = BookSection( - sha256=b"test_section", - content=content, - modality="book", - book_id=1, - start_page=10, - end_page=10 + len(pages), - pages=pages, - book=Book(id=1, title="Test Book", author="Test Author"), - ) - - chunks = book_section.data_chunks() - expected = [ - (c, merge_metadata(book_section.as_payload(), m)) for c, m in expected_chunks - ] - assert [(c.content, c.item_metadata) for c in chunks] == expected - for c in chunks: - assert cast(list, c.file_paths) == [] - - -@pytest.mark.parametrize( - "content,expected", - [ - ("", []), - ( - "Short content", - [ - extract.DataChunk( - data=["Short content"], metadata={"tags": ["tag1", "tag2"]} - ) - ], - ), - ( - "This is a very long piece of content that should be chunked into multiple pieces when processed.", - [ - extract.DataChunk( - data=[ - "This is a very long piece of content that should be chunked into multiple pieces when 
processed." - ], - metadata={"tags": ["tag1", "tag2"]}, - ), - extract.DataChunk( - data=["This is a very long piece of content that"], - metadata={"tags": ["tag1", "tag2"]}, - ), - extract.DataChunk( - data=["should be chunked into multiple pieces when"], - metadata={"tags": ["tag1", "tag2"]}, - ), - extract.DataChunk( - data=["processed."], - metadata={"tags": ["tag1", "tag2"]}, - ), - extract.DataChunk( - data=["test"], - metadata={"tags": ["tag1", "tag2"]}, - ), - ], - ), - ], -) -def test_blog_post_chunk_contents(content, expected, default_chunk_size): - default_chunk_size(10) - blog_post = BlogPost( - sha256=b"test_blog", - content=content, - modality="blog", - url="https://example.com/post", - images=[], - ) - - with patch.object(chunker, "DEFAULT_CHUNK_TOKENS", 10): - assert blog_post._chunk_contents() == expected - - -def test_blog_post_chunk_contents_with_images(tmp_path): - """Test BlogPost._chunk_contents with images""" - # Create test image files - img1_path = tmp_path / "img1.jpg" - img2_path = tmp_path / "img2.jpg" - for img_path in [img1_path, img2_path]: - img = Image.new("RGB", (10, 10), color="red") - img.save(img_path) - - blog_post = BlogPost( - sha256=b"test_blog", - content="Content with images", - modality="blog", - url="https://example.com/post", - images=[str(img1_path), str(img2_path)], - ) - - result = blog_post._chunk_contents() - result = [ - [i if isinstance(i, str) else getattr(i, "filename") for i in c.data] - for c in result - ] - assert result == [ - ["Content with images", img1_path.as_posix(), img2_path.as_posix()] - ] - - -def test_blog_post_chunk_contents_with_image_long_content(tmp_path, default_chunk_size): - default_chunk_size(10) - img1_path = tmp_path / "img1.jpg" - img2_path = tmp_path / "img2.jpg" - for img_path in [img1_path, img2_path]: - img = Image.new("RGB", (10, 10), color="red") - img.save(img_path) - - blog_post = BlogPost( - sha256=b"test_blog", - content=f"First picture is here: {img1_path.as_posix()}\nSecond picture is here: {img2_path.as_posix()}", - modality="blog", - url="https://example.com/post", - images=[str(img1_path), str(img2_path)], - ) - - with patch.object(chunker, "DEFAULT_CHUNK_TOKENS", 10): - result = blog_post._chunk_contents() - - result = [ - [i if isinstance(i, str) else getattr(i, "filename") for i in c.data] - for c in result - ] - assert result == [ - [ - f"First picture is here: {img1_path.as_posix()}\nSecond picture is here: {img2_path.as_posix()}", - img1_path.as_posix(), - img2_path.as_posix(), - ], - [ - f"First picture is here: {img1_path.as_posix()}", - img1_path.as_posix(), - ], - [ - f"Second picture is here: {img2_path.as_posix()}", - img2_path.as_posix(), - ], - ["test"], - ] diff --git a/tests/memory/common/db/models/test_source_items.py b/tests/memory/common/db/models/test_source_items.py new file mode 100644 index 0000000..1a1bf55 --- /dev/null +++ b/tests/memory/common/db/models/test_source_items.py @@ -0,0 +1,607 @@ +from sqlalchemy.orm import Session +from unittest.mock import patch, Mock +from typing import cast +import pytest +from PIL import Image +from datetime import datetime +import uuid +from memory.common import settings, chunker, extract +from memory.common.db.models.sources import Book +from memory.common.db.models.source_items import ( + MailMessage, + EmailAttachment, + BookSection, + BlogPost, + AgentObservation, +) +from memory.common.db.models.source_item import merge_metadata + + +@pytest.fixture +def default_chunk_size(): + chunk_length = chunker.DEFAULT_CHUNK_TOKENS + 
real_chunker = chunker.chunk_text + + def chunk_text(text: str, max_tokens: int = 0): + max_tokens = max_tokens or chunk_length + return real_chunker(text, max_tokens=max_tokens) + + def set_size(new_size: int): + nonlocal chunk_length + chunk_length = new_size + + with patch.object(chunker, "chunk_text", chunk_text): + yield set_size + + +@pytest.mark.parametrize( + "modality,expected_modality", + [ + (None, "email"), # Default case + ("custom", "custom"), # Override case + ], +) +def test_mail_message_modality(modality, expected_modality): + """Test MailMessage modality setting""" + kwargs = {"sha256": b"test", "content": "test"} + if modality is not None: + kwargs["modality"] = modality + + mail_message = MailMessage(**kwargs) + # The __init__ method should set the correct modality + assert hasattr(mail_message, "modality") + + +@pytest.mark.parametrize( + "sender,folder,expected_path", + [ + ("user@example.com", "INBOX", "user_example_com/INBOX"), + ("user+tag@example.com", "Sent Items", "user_tag_example_com/Sent_Items"), + ("user@domain.co.uk", None, "user_domain_co_uk/INBOX"), + ("user@domain.co.uk", "", "user_domain_co_uk/INBOX"), + ], +) +def test_mail_message_attachments_path(sender, folder, expected_path): + """Test MailMessage.attachments_path property""" + mail_message = MailMessage( + sha256=b"test", content="test", sender=sender, folder=folder + ) + + result = mail_message.attachments_path + assert str(result) == f"{settings.FILE_STORAGE_DIR}/emails/{expected_path}" + + +@pytest.mark.parametrize( + "filename,expected", + [ + ("document.pdf", "document.pdf"), + ("file with spaces.txt", "file_with_spaces.txt"), + ("file@#$%^&*().doc", "file.doc"), + ("no-extension", "no_extension"), + ("multiple.dots.in.name.txt", "multiple_dots_in_name.txt"), + ], +) +def test_mail_message_safe_filename(tmp_path, filename, expected): + """Test MailMessage.safe_filename method""" + mail_message = MailMessage( + sha256=b"test", content="test", sender="user@example.com", folder="INBOX" + ) + + expected = settings.FILE_STORAGE_DIR / f"emails/user_example_com/INBOX/{expected}" + assert mail_message.safe_filename(filename) == expected + + +@pytest.mark.parametrize( + "sent_at,expected_date", + [ + (datetime(2023, 1, 1, 12, 0, 0), "2023-01-01T12:00:00"), + (None, None), + ], +) +def test_mail_message_as_payload(sent_at, expected_date): + """Test MailMessage.as_payload method""" + + mail_message = MailMessage( + sha256=b"test", + content="test", + message_id="", + subject="Test Subject", + sender="sender@example.com", + recipients=["recipient1@example.com", "recipient2@example.com"], + folder="INBOX", + sent_at=sent_at, + tags=["tag1", "tag2"], + size=1024, + ) + # Manually set id for testing + object.__setattr__(mail_message, "id", 123) + + payload = mail_message.as_payload() + + expected = { + "source_id": 123, + "size": 1024, + "message_id": "", + "subject": "Test Subject", + "sender": "sender@example.com", + "recipients": ["recipient1@example.com", "recipient2@example.com"], + "folder": "INBOX", + "tags": [ + "tag1", + "tag2", + "sender@example.com", + "recipient1@example.com", + "recipient2@example.com", + ], + "date": expected_date, + } + assert payload == expected + + +def test_mail_message_parsed_content(): + """Test MailMessage.parsed_content property with actual email parsing""" + # Use a simple email format that the parser can handle + email_content = """From: sender@example.com +To: recipient@example.com +Subject: Test Subject + +Test Body Content""" + + mail_message = MailMessage( + 
sha256=b"test", content=email_content, message_id="" + ) + + result = mail_message.parsed_content + + # Just test that it returns a dict-like object + assert isinstance(result, dict) + assert "body" in result + + +def test_mail_message_body_property(): + """Test MailMessage.body property with actual email parsing""" + email_content = """From: sender@example.com +To: recipient@example.com +Subject: Test Subject + +Test Body Content""" + + mail_message = MailMessage( + sha256=b"test", content=email_content, message_id="" + ) + + assert mail_message.body == "Test Body Content" + + +def test_mail_message_display_contents(): + """Test MailMessage.display_contents property with actual email parsing""" + email_content = """From: sender@example.com +To: recipient@example.com +Subject: Test Subject + +Test Body Content""" + + mail_message = MailMessage( + sha256=b"test", content=email_content, message_id="" + ) + + expected = ( + "\nSubject: Test Subject\nFrom: \nTo: \nDate: \nBody: \nTest Body Content\n" + ) + assert mail_message.display_contents == expected + + +@pytest.mark.parametrize( + "created_at,expected_date", + [ + (datetime(2023, 1, 1, 12, 0, 0), "2023-01-01T12:00:00"), + (None, None), + ], +) +def test_email_attachment_as_payload(created_at, expected_date): + """Test EmailAttachment.as_payload method""" + attachment = EmailAttachment( + sha256=b"test", + filename="document.pdf", + mime_type="application/pdf", + size=1024, + mail_message_id=123, + created_at=created_at, + tags=["pdf", "document"], + ) + # Manually set id for testing + object.__setattr__(attachment, "id", 456) + + payload = attachment.as_payload() + + expected = { + "source_id": 456, + "filename": "document.pdf", + "content_type": "application/pdf", + "size": 1024, + "created_at": expected_date, + "mail_message_id": 123, + "tags": ["pdf", "document"], + } + assert payload == expected + + +@pytest.mark.parametrize( + "has_filename,content_source,expected_content", + [ + (True, "file", b"test file content"), + (False, "content", "attachment content"), + ], +) +@patch("memory.common.extract.extract_data_chunks") +def test_email_attachment_data_chunks( + mock_extract, has_filename, content_source, expected_content, tmp_path +): + """Test EmailAttachment.data_chunks method""" + from memory.common.extract import DataChunk + + mock_extract.return_value = [ + DataChunk(data=["extracted text"], metadata={"source": content_source}) + ] + + if has_filename: + # Create a test file + test_file = tmp_path / "test.txt" + test_file.write_bytes(b"test file content") + attachment = EmailAttachment( + sha256=b"test", + filename=str(test_file), + mime_type="text/plain", + mail_message_id=123, + ) + else: + attachment = EmailAttachment( + sha256=b"test", + content="attachment content", + filename=None, + mime_type="text/plain", + mail_message_id=123, + ) + + # Mock _make_chunk to return a simple chunk + mock_chunk = Mock() + with patch.object(attachment, "_make_chunk", return_value=mock_chunk) as mock_make: + result = attachment.data_chunks({"extra": "metadata"}) + + # Verify the method calls + mock_extract.assert_called_once_with("text/plain", expected_content) + mock_make.assert_called_once_with( + extract.DataChunk(data=["extracted text"], metadata={"source": content_source}), + {"extra": "metadata"}, + ) + assert result == [mock_chunk] + + +def test_email_attachment_cascade_delete(db_session: Session): + """Test that EmailAttachment is deleted when MailMessage is deleted""" + mail_message = MailMessage( + sha256=b"test_email", + 
content="test email", + message_id="", + subject="Test", + sender="sender@example.com", + recipients=["recipient@example.com"], + folder="INBOX", + ) + db_session.add(mail_message) + db_session.commit() + + attachment = EmailAttachment( + sha256=b"test_attachment", + content="attachment content", + mail_message=mail_message, + filename="test.txt", + mime_type="text/plain", + size=100, + modality="attachment", # Set modality explicitly + ) + db_session.add(attachment) + db_session.commit() + + attachment_id = attachment.id + + # Delete the mail message + db_session.delete(mail_message) + db_session.commit() + + # Verify the attachment was also deleted + deleted_attachment = ( + db_session.query(EmailAttachment).filter_by(id=attachment_id).first() + ) + assert deleted_attachment is None + + +@pytest.mark.parametrize( + "pages,expected_chunks", + [ + # No pages + ([], []), + # Single page + (["Page 1 content"], [("Page 1 content", {"type": "page"})]), + # Multiple pages + ( + ["Page 1", "Page 2", "Page 3"], + [ + ( + "Page 1\n\nPage 2\n\nPage 3", + {"type": "section", "tags": {"tag1", "tag2"}}, + ), + ("test", {"type": "summary", "tags": {"tag1", "tag2"}}), + ], + ), + # Empty/whitespace pages filtered out + (["", " ", "Page 3"], [("Page 3", {"type": "page"})]), + # All empty - no chunks created + (["", " ", " "], []), + ], +) +def test_book_section_data_chunks(pages, expected_chunks): + """Test BookSection.data_chunks with various page combinations""" + content = "\n\n".join(pages).strip() + book_section = BookSection( + sha256=b"test_section", + content=content, + modality="book", + book_id=1, + start_page=10, + end_page=10 + len(pages), + pages=pages, + book=Book(id=1, title="Test Book", author="Test Author"), + ) + + chunks = book_section.data_chunks() + expected = [ + (c, merge_metadata(book_section.as_payload(), m)) for c, m in expected_chunks + ] + assert [(c.content, c.item_metadata) for c in chunks] == expected + for c in chunks: + assert cast(list, c.file_paths) == [] + + +@pytest.mark.parametrize( + "content,expected", + [ + ("", []), + ( + "Short content", + [ + extract.DataChunk( + data=["Short content"], metadata={"tags": ["tag1", "tag2"]} + ) + ], + ), + ( + "This is a very long piece of content that should be chunked into multiple pieces when processed.", + [ + extract.DataChunk( + data=[ + "This is a very long piece of content that should be chunked into multiple pieces when processed." 
+ ], + metadata={"tags": ["tag1", "tag2"]}, + ), + extract.DataChunk( + data=["This is a very long piece of content that"], + metadata={"tags": ["tag1", "tag2"]}, + ), + extract.DataChunk( + data=["should be chunked into multiple pieces when"], + metadata={"tags": ["tag1", "tag2"]}, + ), + extract.DataChunk( + data=["processed."], + metadata={"tags": ["tag1", "tag2"]}, + ), + extract.DataChunk( + data=["test"], + metadata={"tags": ["tag1", "tag2"]}, + ), + ], + ), + ], +) +def test_blog_post_chunk_contents(content, expected, default_chunk_size): + default_chunk_size(10) + blog_post = BlogPost( + sha256=b"test_blog", + content=content, + modality="blog", + url="https://example.com/post", + images=[], + ) + + with patch.object(chunker, "DEFAULT_CHUNK_TOKENS", 10): + assert blog_post._chunk_contents() == expected + + +def test_blog_post_chunk_contents_with_images(tmp_path): + """Test BlogPost._chunk_contents with images""" + # Create test image files + img1_path = tmp_path / "img1.jpg" + img2_path = tmp_path / "img2.jpg" + for img_path in [img1_path, img2_path]: + img = Image.new("RGB", (10, 10), color="red") + img.save(img_path) + + blog_post = BlogPost( + sha256=b"test_blog", + content="Content with images", + modality="blog", + url="https://example.com/post", + images=[str(img1_path), str(img2_path)], + ) + + result = blog_post._chunk_contents() + result = [ + [i if isinstance(i, str) else getattr(i, "filename") for i in c.data] + for c in result + ] + assert result == [ + ["Content with images", img1_path.as_posix(), img2_path.as_posix()] + ] + + +def test_blog_post_chunk_contents_with_image_long_content(tmp_path, default_chunk_size): + default_chunk_size(10) + img1_path = tmp_path / "img1.jpg" + img2_path = tmp_path / "img2.jpg" + for img_path in [img1_path, img2_path]: + img = Image.new("RGB", (10, 10), color="red") + img.save(img_path) + + blog_post = BlogPost( + sha256=b"test_blog", + content=f"First picture is here: {img1_path.as_posix()}\nSecond picture is here: {img2_path.as_posix()}", + modality="blog", + url="https://example.com/post", + images=[str(img1_path), str(img2_path)], + ) + + with patch.object(chunker, "DEFAULT_CHUNK_TOKENS", 10): + result = blog_post._chunk_contents() + + result = [ + [i if isinstance(i, str) else getattr(i, "filename") for i in c.data] + for c in result + ] + assert result == [ + [ + f"First picture is here: {img1_path.as_posix()}\nSecond picture is here: {img2_path.as_posix()}", + img1_path.as_posix(), + img2_path.as_posix(), + ], + [ + f"First picture is here: {img1_path.as_posix()}", + img1_path.as_posix(), + ], + [ + f"Second picture is here: {img2_path.as_posix()}", + img2_path.as_posix(), + ], + ["test"], + ] + + +@pytest.mark.parametrize( + "metadata,expected_semantic_metadata,expected_temporal_metadata,observation_tags", + [ + ( + {}, + {"embedding_type": "semantic"}, + {"embedding_type": "temporal"}, + [], + ), + ( + {"extra_key": "extra_value"}, + {"extra_key": "extra_value", "embedding_type": "semantic"}, + {"extra_key": "extra_value", "embedding_type": "temporal"}, + [], + ), + ( + {"tags": ["existing_tag"], "source": "test"}, + {"tags": {"existing_tag"}, "source": "test", "embedding_type": "semantic"}, + {"tags": {"existing_tag"}, "source": "test", "embedding_type": "temporal"}, + [], + ), + ], +) +def test_agent_observation_data_chunks( + metadata, expected_semantic_metadata, expected_temporal_metadata, observation_tags +): + """Test AgentObservation.data_chunks generates correct chunks with proper metadata""" + observation = 
AgentObservation( + sha256=b"test_obs", + content="User prefers Python over JavaScript", + subject="programming preferences", + observation_type="preference", + confidence=0.9, + evidence={ + "quote": "I really like Python", + "context": "discussion about languages", + }, + agent_model="claude-3.5-sonnet", + session_id=uuid.uuid4(), + tags=observation_tags, + ) + # Set inserted_at using object.__setattr__ to bypass SQLAlchemy restrictions + object.__setattr__(observation, "inserted_at", datetime(2023, 1, 1, 12, 0, 0)) + + result = observation.data_chunks(metadata) + + # Verify chunks + assert len(result) == 2 + + semantic_chunk = result[0] + expected_semantic_text = "Subject: programming preferences | Type: preference | Observation: User prefers Python over JavaScript | Quote: I really like Python | Context: discussion about languages" + assert semantic_chunk.data == [expected_semantic_text] + assert semantic_chunk.metadata == expected_semantic_metadata + assert semantic_chunk.collection_name == "semantic" + + temporal_chunk = result[1] + expected_temporal_text = "Time: 12:00 on Sunday (afternoon) | Subject: programming preferences | Observation: User prefers Python over JavaScript | Confidence: 0.9" + assert temporal_chunk.data == [expected_temporal_text] + assert temporal_chunk.metadata == expected_temporal_metadata + assert temporal_chunk.collection_name == "temporal" + + +def test_agent_observation_data_chunks_with_none_values(): + """Test AgentObservation.data_chunks handles None values correctly""" + observation = AgentObservation( + sha256=b"test_obs", + content="Content", + subject="subject", + observation_type="belief", + confidence=0.7, + evidence=None, + agent_model="gpt-4", + session_id=None, + ) + object.__setattr__(observation, "inserted_at", datetime(2023, 2, 15, 9, 30, 0)) + + result = observation.data_chunks() + + assert len(result) == 2 + assert result[0].collection_name == "semantic" + assert result[1].collection_name == "temporal" + + # Verify content with None evidence + semantic_text = "Subject: subject | Type: belief | Observation: Content" + assert result[0].data == [semantic_text] + + temporal_text = "Time: 09:30 on Wednesday (morning) | Subject: subject | Observation: Content | Confidence: 0.7" + assert result[1].data == [temporal_text] + + +def test_agent_observation_data_chunks_merge_metadata_behavior(): + """Test that merge_metadata works correctly in data_chunks""" + observation = AgentObservation( + sha256=b"test", + content="test", + subject="test", + observation_type="test", + confidence=0.8, + evidence={}, + agent_model="test", + tags=["base_tag"], # Set base tags so they appear in both chunks + ) + object.__setattr__(observation, "inserted_at", datetime.now()) + + # Test that metadata merging preserves original values and adds new ones + input_metadata = {"existing": "value", "tags": ["tag1"]} + result = observation.data_chunks(input_metadata) + + semantic_metadata = result[0].metadata + temporal_metadata = result[1].metadata + + # Both should have the existing metadata plus embedding_type + assert semantic_metadata["existing"] == "value" + assert semantic_metadata["tags"] == {"tag1"} # Merged tags + assert semantic_metadata["embedding_type"] == "semantic" + + assert temporal_metadata["existing"] == "value" + assert temporal_metadata["tags"] == {"tag1"} # Merged tags + assert temporal_metadata["embedding_type"] == "temporal" diff --git a/tests/memory/common/formatters/test_observation.py b/tests/memory/common/formatters/test_observation.py new file mode 
100644 index 0000000..044aa5a --- /dev/null +++ b/tests/memory/common/formatters/test_observation.py @@ -0,0 +1,220 @@ +import pytest +from datetime import datetime +from typing import Any + +from memory.common.formatters.observation import ( + Evidence, + generate_semantic_text, + generate_temporal_text, +) + + +def test_generate_semantic_text_basic_functionality(): + evidence: Evidence = {"quote": "test quote", "context": "test context"} + result = generate_semantic_text( + subject="test_subject", + observation_type="test_type", + content="test_content", + evidence=evidence, + ) + assert ( + result + == "Subject: test_subject | Type: test_type | Observation: test_content | Quote: test quote | Context: test context" + ) + + +@pytest.mark.parametrize( + "evidence,expected_suffix", + [ + ({"quote": "test quote"}, " | Quote: test quote"), + ({"context": "test context"}, " | Context: test context"), + ({}, ""), + ], +) +def test_generate_semantic_text_partial_evidence( + evidence: dict[str, str], expected_suffix: str +): + result = generate_semantic_text( + subject="subject", + observation_type="type", + content="content", + evidence=evidence, # type: ignore + ) + expected = f"Subject: subject | Type: type | Observation: content{expected_suffix}" + assert result == expected + + +def test_generate_semantic_text_none_evidence(): + result = generate_semantic_text( + subject="subject", + observation_type="type", + content="content", + evidence=None, # type: ignore + ) + assert result == "Subject: subject | Type: type | Observation: content" + + +@pytest.mark.parametrize( + "invalid_evidence", + [ + "string", + 123, + ["list"], + True, + ], +) +def test_generate_semantic_text_invalid_evidence_types(invalid_evidence: Any): + result = generate_semantic_text( + subject="subject", + observation_type="type", + content="content", + evidence=invalid_evidence, # type: ignore + ) + assert result == "Subject: subject | Type: type | Observation: content" + + +def test_generate_semantic_text_empty_strings(): + evidence = {"quote": "", "context": ""} + result = generate_semantic_text( + subject="", + observation_type="", + content="", + evidence=evidence, # type: ignore + ) + assert result == "Subject: | Type: | Observation: | Quote: | Context: " + + +def test_generate_semantic_text_special_characters(): + evidence: Evidence = { + "quote": "Quote with | pipe and | symbols", + "context": "Context with special chars: @#$%", + } + result = generate_semantic_text( + subject="Subject with | pipe", + observation_type="Type with | pipe", + content="Content with | pipe", + evidence=evidence, + ) + expected = "Subject: Subject with | pipe | Type: Type with | pipe | Observation: Content with | pipe | Quote: Quote with | pipe and | symbols | Context: Context with special chars: @#$%" + assert result == expected + + +@pytest.mark.parametrize( + "hour,expected_period", + [ + (5, "morning"), + (6, "morning"), + (11, "morning"), + (12, "afternoon"), + (13, "afternoon"), + (16, "afternoon"), + (17, "evening"), + (18, "evening"), + (21, "evening"), + (22, "late_night"), + (23, "late_night"), + (0, "late_night"), + (1, "late_night"), + (4, "late_night"), + ], +) +def test_generate_temporal_text_time_periods(hour: int, expected_period: str): + test_date = datetime(2024, 1, 15, hour, 30) # Monday + result = generate_temporal_text( + subject="test_subject", + content="test_content", + confidence=0.8, + created_at=test_date, + ) + time_str = test_date.strftime("%H:%M") + expected = f"Time: {time_str} on Monday ({expected_period}) | 
Subject: test_subject | Observation: test_content | Confidence: 0.8" + assert result == expected + + +@pytest.mark.parametrize( + "weekday,day_name", + [ + (0, "Monday"), + (1, "Tuesday"), + (2, "Wednesday"), + (3, "Thursday"), + (4, "Friday"), + (5, "Saturday"), + (6, "Sunday"), + ], +) +def test_generate_temporal_text_days_of_week(weekday: int, day_name: str): + test_date = datetime(2024, 1, 15 + weekday, 10, 30) + result = generate_temporal_text( + subject="subject", content="content", confidence=0.5, created_at=test_date + ) + assert f"on {day_name}" in result + + +@pytest.mark.parametrize("confidence", [0.0, 0.1, 0.5, 0.99, 1.0]) +def test_generate_temporal_text_confidence_values(confidence: float): + test_date = datetime(2024, 1, 15, 10, 30) + result = generate_temporal_text( + subject="subject", + content="content", + confidence=confidence, + created_at=test_date, + ) + assert f"Confidence: {confidence}" in result + + +@pytest.mark.parametrize( + "test_date,expected_period", + [ + (datetime(2024, 1, 15, 5, 0), "morning"), # Start of morning + (datetime(2024, 1, 15, 11, 59), "morning"), # End of morning + (datetime(2024, 1, 15, 12, 0), "afternoon"), # Start of afternoon + (datetime(2024, 1, 15, 16, 59), "afternoon"), # End of afternoon + (datetime(2024, 1, 15, 17, 0), "evening"), # Start of evening + (datetime(2024, 1, 15, 21, 59), "evening"), # End of evening + (datetime(2024, 1, 15, 22, 0), "late_night"), # Start of late_night + (datetime(2024, 1, 15, 4, 59), "late_night"), # End of late_night + ], +) +def test_generate_temporal_text_boundary_cases( + test_date: datetime, expected_period: str +): + result = generate_temporal_text( + subject="subject", content="content", confidence=0.8, created_at=test_date + ) + assert f"({expected_period})" in result + + +def test_generate_temporal_text_complete_format(): + test_date = datetime(2024, 3, 22, 14, 45) # Friday afternoon + result = generate_temporal_text( + subject="Important observation", + content="User showed strong preference for X", + confidence=0.95, + created_at=test_date, + ) + expected = "Time: 14:45 on Friday (afternoon) | Subject: Important observation | Observation: User showed strong preference for X | Confidence: 0.95" + assert result == expected + + +def test_generate_temporal_text_empty_strings(): + test_date = datetime(2024, 1, 15, 10, 30) + result = generate_temporal_text( + subject="", content="", confidence=0.0, created_at=test_date + ) + assert ( + result + == "Time: 10:30 on Monday (morning) | Subject: | Observation: | Confidence: 0.0" + ) + + +def test_generate_temporal_text_special_characters(): + test_date = datetime(2024, 1, 15, 15, 20) + result = generate_temporal_text( + subject="Subject with | pipe", + content="Content with | pipe and @#$ symbols", + confidence=0.75, + created_at=test_date, + ) + expected = "Time: 15:20 on Monday (afternoon) | Subject: Subject with | pipe | Observation: Content with | pipe and @#$ symbols | Confidence: 0.75" + assert result == expected
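
Note for reviewers: a minimal sketch of the new formatters in use. It assumes only what this patch introduces (`memory.common.formatters.observation`), and the printed strings mirror the assertions in `tests/memory/common/formatters/test_observation.py`. These are the texts that `AgentObservation.data_chunks` wraps in `extract.DataChunk`s and routes to the new `semantic` and `temporal` collections.

```python
from datetime import datetime

from memory.common.formatters.observation import (
    Evidence,
    generate_semantic_text,
    generate_temporal_text,
)

evidence: Evidence = {
    "quote": "I really like Python",
    "context": "discussion about languages",
}

# Flattened, pipe-separated form for the "semantic" collection.
semantic = generate_semantic_text(
    subject="programming preferences",
    observation_type="preference",
    content="User prefers Python over JavaScript",
    evidence=evidence,
)
print(semantic)
# Subject: programming preferences | Type: preference |
# Observation: User prefers Python over JavaScript |
# Quote: I really like Python | Context: discussion about languages

# Time-anchored form for the "temporal" collection; the period bucket
# (morning/afternoon/evening/late_night) is derived from the hour.
temporal = generate_temporal_text(
    subject="programming preferences",
    content="User prefers Python over JavaScript",
    confidence=0.9,
    created_at=datetime(2023, 1, 1, 12, 0, 0),  # a Sunday at noon
)
print(temporal)
# Time: 12:00 on Sunday (afternoon) | Subject: programming preferences |
# Observation: User prefers Python over JavaScript | Confidence: 0.9
```

`AgentObservation.data_chunks` tags each of these strings with `embedding_type` metadata (`"semantic"` or `"temporal"`) via `merge_metadata` and assigns the matching `collection_name`, so each text lands in its own Qdrant collection for specialized search.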