email attachments tables

This commit is contained in:
Daniel O'Connell 2025-04-27 21:14:59 +02:00
parent 14aa6ff9be
commit 889df318a1
5 changed files with 234 additions and 140 deletions

View File

@ -20,10 +20,12 @@ depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.execute('CREATE EXTENSION IF NOT EXISTS pgcrypto')
op.execute("CREATE EXTENSION IF NOT EXISTS pgcrypto")
# Create enum type for github_item with IF NOT EXISTS
op.execute("DO $$ BEGIN CREATE TYPE gh_item_kind AS ENUM ('issue','pr','comment','project_card'); EXCEPTION WHEN duplicate_object THEN NULL; END $$;")
op.execute(
"DO $$ BEGIN CREATE TYPE gh_item_kind AS ENUM ('issue','pr','comment','project_card'); EXCEPTION WHEN duplicate_object THEN NULL; END $$;"
)
op.create_table(
"email_accounts",
@ -161,14 +163,32 @@ def upgrade() -> None:
sa.Column("recipients", sa.ARRAY(sa.Text()), nullable=True),
sa.Column("sent_at", sa.DateTime(timezone=True), nullable=True),
sa.Column("body_raw", sa.Text(), nullable=True),
sa.Column(
"attachments", postgresql.JSONB(astext_type=sa.Text()), nullable=True
),
sa.Column("folder", sa.Text(), nullable=True),
sa.Column("tsv", postgresql.TSVECTOR(), nullable=True),
sa.ForeignKeyConstraint(["source_id"], ["source_item.id"], ondelete="CASCADE"),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint("message_id"),
)
op.create_table(
"email_attachment",
sa.Column("id", sa.BigInteger(), nullable=False),
sa.Column("mail_message_id", sa.BigInteger(), nullable=False),
sa.Column("filename", sa.Text(), nullable=False),
sa.Column("content_type", sa.Text(), nullable=True),
sa.Column("size", sa.Integer(), nullable=True),
sa.Column("content", postgresql.BYTEA(), nullable=True),
sa.Column("file_path", sa.Text(), nullable=True),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=True,
),
sa.ForeignKeyConstraint(
["mail_message_id"], ["mail_message.id"], ondelete="CASCADE"
),
sa.PrimaryKeyConstraint("id"),
)
op.create_table(
"misc_doc",
sa.Column("id", sa.BigInteger(), nullable=False),
@ -219,18 +239,18 @@ def upgrade() -> None:
op.create_check_constraint(
"github_item_kind_check",
"github_item",
"kind IN ('issue', 'pr', 'comment', 'project_card')"
"kind IN ('issue', 'pr', 'comment', 'project_card')",
)
# Add missing constraint to source_item
op.create_check_constraint(
"source_item_embed_status_check",
"source_item",
"embed_status IN ('RAW','QUEUED','STORED','FAILED')"
"embed_status IN ('RAW','QUEUED','STORED','FAILED')",
)
# Create trigger function for vector_ids validation
op.execute('''
op.execute("""
CREATE OR REPLACE FUNCTION trg_vector_ids_not_empty()
RETURNS TRIGGER LANGUAGE plpgsql AS $$
BEGIN
@ -242,51 +262,65 @@ def upgrade() -> None:
RETURN NEW;
END;
$$;
''')
""")
# Create trigger
op.execute('''
op.execute("""
CREATE TRIGGER check_vector_ids
BEFORE UPDATE ON source_item
FOR EACH ROW EXECUTE FUNCTION trg_vector_ids_not_empty();
''')
""")
# Create indexes for source_item
op.create_index('source_modality_idx', 'source_item', ['modality'])
op.create_index('source_status_idx', 'source_item', ['embed_status'])
op.create_index('source_tags_idx', 'source_item', ['tags'], postgresql_using='gin')
op.create_index("source_modality_idx", "source_item", ["modality"])
op.create_index("source_status_idx", "source_item", ["embed_status"])
op.create_index("source_tags_idx", "source_item", ["tags"], postgresql_using="gin")
# Create indexes for mail_message
op.create_index('mail_sent_idx', 'mail_message', ['sent_at'])
op.create_index('mail_recipients_idx', 'mail_message', ['recipients'], postgresql_using='gin')
op.create_index('mail_tsv_idx', 'mail_message', ['tsv'], postgresql_using='gin')
op.create_index("mail_sent_idx", "mail_message", ["sent_at"])
op.create_index(
"mail_recipients_idx", "mail_message", ["recipients"], postgresql_using="gin"
)
op.create_index("email_attachment_filename_idx", "email_attachment", ["filename"], unique=False)
op.create_index("email_attachment_message_idx", "email_attachment", ["mail_message_id"], unique=False)
op.create_index("mail_tsv_idx", "mail_message", ["tsv"], postgresql_using="gin")
# Create index for chat_message
op.create_index('chat_channel_idx', 'chat_message', ['platform', 'channel_id'])
op.create_index("chat_channel_idx", "chat_message", ["platform", "channel_id"])
# Create indexes for git_commit
op.create_index('git_files_idx', 'git_commit', ['files_changed'], postgresql_using='gin')
op.create_index('git_date_idx', 'git_commit', ['author_date'])
op.create_index(
"git_files_idx", "git_commit", ["files_changed"], postgresql_using="gin"
)
op.create_index("git_date_idx", "git_commit", ["author_date"])
# Create index for photo
op.create_index('photo_taken_idx', 'photo', ['exif_taken_at'])
op.create_index("photo_taken_idx", "photo", ["exif_taken_at"])
# Create indexes for rss_feeds
op.create_index('rss_feeds_active_idx', 'rss_feeds', ['active', 'last_checked_at'])
op.create_index('rss_feeds_tags_idx', 'rss_feeds', ['tags'], postgresql_using='gin')
op.create_index("rss_feeds_active_idx", "rss_feeds", ["active", "last_checked_at"])
op.create_index("rss_feeds_tags_idx", "rss_feeds", ["tags"], postgresql_using="gin")
# Create indexes for email_accounts
op.create_index('email_accounts_address_idx', 'email_accounts', ['email_address'], unique=True)
op.create_index('email_accounts_active_idx', 'email_accounts', ['active', 'last_sync_at'])
op.create_index('email_accounts_tags_idx', 'email_accounts', ['tags'], postgresql_using='gin')
op.create_index(
"email_accounts_address_idx", "email_accounts", ["email_address"], unique=True
)
op.create_index(
"email_accounts_active_idx", "email_accounts", ["active", "last_sync_at"]
)
op.create_index(
"email_accounts_tags_idx", "email_accounts", ["tags"], postgresql_using="gin"
)
# Create indexes for github_item
op.create_index('gh_repo_kind_idx', 'github_item', ['repo_path', 'kind'])
op.create_index('gh_issue_lookup_idx', 'github_item', ['repo_path', 'kind', 'number'])
op.create_index('gh_labels_idx', 'github_item', ['labels'], postgresql_using='gin')
op.create_index("gh_repo_kind_idx", "github_item", ["repo_path", "kind"])
op.create_index(
"gh_issue_lookup_idx", "github_item", ["repo_path", "kind", "number"]
)
op.create_index("gh_labels_idx", "github_item", ["labels"], postgresql_using="gin")
# Create add_tags helper function
op.execute('''
op.execute("""
CREATE OR REPLACE FUNCTION add_tags(p_source BIGINT, p_tags TEXT[])
RETURNS VOID LANGUAGE SQL AS $$
UPDATE source_item
@ -294,30 +328,31 @@ def upgrade() -> None:
(SELECT ARRAY(SELECT DISTINCT unnest(tags || p_tags)))
WHERE id = p_source;
$$;
''')
""")
def downgrade() -> None:
# Drop indexes
op.drop_index('gh_tsv_idx', table_name='github_item')
op.drop_index('gh_labels_idx', table_name='github_item')
op.drop_index('gh_issue_lookup_idx', table_name='github_item')
op.drop_index('gh_repo_kind_idx', table_name='github_item')
op.drop_index('email_accounts_tags_idx', table_name='email_accounts')
op.drop_index('email_accounts_active_idx', table_name='email_accounts')
op.drop_index('email_accounts_address_idx', table_name='email_accounts')
op.drop_index('rss_feeds_tags_idx', table_name='rss_feeds')
op.drop_index('rss_feeds_active_idx', table_name='rss_feeds')
op.drop_index('photo_taken_idx', table_name='photo')
op.drop_index('git_date_idx', table_name='git_commit')
op.drop_index('git_files_idx', table_name='git_commit')
op.drop_index('chat_channel_idx', table_name='chat_message')
op.drop_index('mail_tsv_idx', table_name='mail_message')
op.drop_index('mail_recipients_idx', table_name='mail_message')
op.drop_index('mail_sent_idx', table_name='mail_message')
op.drop_index('source_tags_idx', table_name='source_item')
op.drop_index('source_status_idx', table_name='source_item')
op.drop_index('source_modality_idx', table_name='source_item')
op.drop_index("gh_labels_idx", table_name="github_item")
op.drop_index("gh_issue_lookup_idx", table_name="github_item")
op.drop_index("gh_repo_kind_idx", table_name="github_item")
op.drop_index("email_accounts_tags_idx", table_name="email_accounts")
op.drop_index("email_accounts_active_idx", table_name="email_accounts")
op.drop_index("email_accounts_address_idx", table_name="email_accounts")
op.drop_index("rss_feeds_tags_idx", table_name="rss_feeds")
op.drop_index("rss_feeds_active_idx", table_name="rss_feeds")
op.drop_index("photo_taken_idx", table_name="photo")
op.drop_index("git_date_idx", table_name="git_commit")
op.drop_index("git_files_idx", table_name="git_commit")
op.drop_index("chat_channel_idx", table_name="chat_message")
op.drop_index("mail_tsv_idx", table_name="mail_message")
op.drop_index("mail_recipients_idx", table_name="mail_message")
op.drop_index("mail_sent_idx", table_name="mail_message")
op.drop_index("email_attachment_message_idx", table_name="email_attachment")
op.drop_index("email_attachment_filename_idx", table_name="email_attachment")
op.drop_index("source_tags_idx", table_name="source_item")
op.drop_index("source_status_idx", table_name="source_item")
op.drop_index("source_modality_idx", table_name="source_item")
# Drop tables
op.drop_table("photo")
@ -328,6 +363,7 @@ def downgrade() -> None:
op.drop_table("book_doc")
op.drop_table("blog_post")
op.drop_table("github_item")
op.drop_table("email_attachment")
op.drop_table("source_item")
op.drop_table("rss_feeds")
op.drop_table("email_accounts")

View File

@ -1,13 +1,15 @@
"""
Database models for the knowledge base system.
"""
from pathlib import Path
from sqlalchemy import (
Column, ForeignKey, Integer, BigInteger, Text, DateTime, Boolean,
ARRAY, func, Numeric, CheckConstraint, Index
)
from sqlalchemy.dialects.postgresql import BYTEA, JSONB, TSVECTOR
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
from memory.common import settings
Base = declarative_base()
@ -47,9 +49,15 @@ class MailMessage(Base):
recipients = Column(ARRAY(Text))
sent_at = Column(DateTime(timezone=True))
body_raw = Column(Text)
attachments = Column(JSONB)
folder = Column(Text)
tsv = Column(TSVECTOR)
attachments = relationship("EmailAttachment", back_populates="mail_message", cascade="all, delete-orphan")
@property
def attachments_path(self) -> Path:
    """Return the directory that holds this message's on-disk attachments.

    The result is ``settings.FILE_STORAGE_DIR / <sender> / <folder>``.  A
    missing folder falls back to ``'INBOX'`` and a missing sender to
    ``'unknown'``.

    Neither value is validated before it reaches this property, so each is
    reduced to a single path component first: ``/`` and ``\\`` become ``_``.
    Without that, a value starting with a separator would make pathlib
    discard the storage root entirely, and a value of ``..`` would climb out
    of it.

    Returns:
        Path of the attachment directory (not created here).
    """
    def as_component(value, fallback):
        # Same separator replacement the email worker applies to filenames.
        name = (value or fallback).replace('/', '_').replace('\\', '_')
        # '.' and '..' contain no separator but still redirect the path.
        return fallback if name in ('.', '..') else name

    root = Path(settings.FILE_STORAGE_DIR)
    return root / as_component(self.sender, 'unknown') / as_component(self.folder, 'INBOX')
# Add indexes
__table_args__ = (
Index('mail_sent_idx', 'sent_at'),
@ -58,6 +66,27 @@ class MailMessage(Base):
)
class EmailAttachment(Base):
    """One file attached to a ``MailMessage``.

    Rows point at their parent message through ``mail_message_id``; the
    foreign key is declared with ``ondelete='CASCADE'``.  ``content`` and
    ``file_path`` are both nullable: small files are kept inline in
    ``content``, larger ones are written to disk and referenced by
    ``file_path``.
    """

    __tablename__ = 'email_attachment'

    # Lookup indexes: by owning message and by attachment filename.
    __table_args__ = (
        Index('email_attachment_message_idx', 'mail_message_id'),
        Index('email_attachment_filename_idx', 'filename'),
    )

    id = Column(BigInteger, primary_key=True)
    mail_message_id = Column(
        BigInteger,
        ForeignKey('mail_message.id', ondelete='CASCADE'),
        nullable=False,
    )
    filename = Column(Text, nullable=False)
    content_type = Column(Text)
    size = Column(Integer)
    # Inline payload, used for small files.
    content = Column(BYTEA)
    # Location on disk, used for larger files.
    file_path = Column(Text)
    created_at = Column(
        DateTime(timezone=True),
        server_default=func.now(),
    )

    # Inverse side of MailMessage.attachments.
    mail_message = relationship(
        "MailMessage",
        back_populates="attachments",
    )
class ChatMessage(Base):
__tablename__ = 'chat_message'

View File

@ -12,7 +12,7 @@ from typing import Generator, Callable, TypedDict, Literal
import pathlib
from sqlalchemy.orm import Session
from memory.common.db.models import EmailAccount, MailMessage, SourceItem
from memory.common.db.models import EmailAccount, MailMessage, SourceItem, EmailAttachment
from memory.common import settings
logger = logging.getLogger(__name__)
@ -141,7 +141,7 @@ def extract_attachments(msg: email.message.Message) -> list[Attachment]:
return attachments
def process_attachment(attachment: Attachment, message_id: str) -> Attachment | None:
def process_attachment(attachment: Attachment, message: MailMessage) -> EmailAttachment | None:
"""Process an attachment, storing large files on disk and returning metadata.
Args:
@ -151,33 +151,33 @@ def process_attachment(attachment: Attachment, message_id: str) -> Attachment |
Returns:
Processed EmailAttachment record with appropriate metadata, or None if saving to disk failed
"""
if not (content := attachment.get("content")):
return attachment
content, file_path = None, None
if not (real_content := attachment.get("content")):
"No content, so just save the metadata"
elif attachment["size"] <= settings.MAX_INLINE_ATTACHMENT_SIZE:
content = base64.b64encode(real_content)
else:
safe_filename = re.sub(r'[/\\]', '_', attachment["filename"])
user_dir = message.attachments_path
user_dir.mkdir(parents=True, exist_ok=True)
file_path = user_dir / safe_filename
try:
file_path.write_bytes(real_content)
except Exception as e:
logger.error(f"Failed to save attachment {safe_filename} to disk: {str(e)}")
return None
if attachment["size"] <= settings.MAX_INLINE_ATTACHMENT_SIZE:
return {**attachment, "content": base64.b64encode(content).decode('utf-8')}
safe_message_id = re.sub(r'[<>\s:/\\]', '_', message_id)
unique_id = str(uuid.uuid4())[:8]
safe_filename = re.sub(r'[/\\]', '_', attachment["filename"])
# Create user subdirectory
user_dir = settings.FILE_STORAGE_DIR / safe_message_id
user_dir.mkdir(parents=True, exist_ok=True)
# Final path for the attachment
file_path = user_dir / f"{unique_id}_{safe_filename}"
# Write the file
try:
file_path.write_bytes(content)
return {**attachment, "path": file_path}
except Exception as e:
logger.error(f"Failed to save attachment {safe_filename} to disk: {str(e)}")
return None
return EmailAttachment(
mail_message_id=message.id,
filename=attachment["filename"],
content_type=attachment.get("content_type"),
size=attachment.get("size"),
content=content,
file_path=file_path and str(file_path),
)
def process_attachments(attachments: list[Attachment], message_id: str) -> list[Attachment]:
def process_attachments(attachments: list[Attachment], message: MailMessage) -> list[EmailAttachment]:
"""
Process email attachments, storing large files on disk and returning metadata.
@ -193,7 +193,7 @@ def process_attachments(attachments: list[Attachment], message_id: str) -> list[
return [
attachment
for a in attachments if (attachment := process_attachment(a, message_id))
for a in attachments if (attachment := process_attachment(a, message))
]
@ -275,7 +275,7 @@ def create_mail_message(
folder: str,
) -> MailMessage:
"""
Create a new mail message record.
Create a new mail message record and associated attachments.
Args:
db_session: Database session
@ -286,12 +286,6 @@ def create_mail_message(
Returns:
Newly created MailMessage
"""
processed_attachments = process_attachments(
parsed_email["attachments"],
parsed_email["message_id"]
)
print("processed_attachments", processed_attachments)
mail_message = MailMessage(
source_id=source_id,
message_id=parsed_email["message_id"],
@ -300,9 +294,15 @@ def create_mail_message(
recipients=parsed_email["recipients"],
sent_at=parsed_email["sent_at"],
body_raw=parsed_email["body"],
attachments={"items": processed_attachments, "folder": folder}
folder=folder,
)
db_session.add(mail_message)
db_session.flush()
if parsed_email["attachments"]:
processed_attachments = process_attachments(parsed_email["attachments"], mail_message)
db_session.add_all(processed_attachments)
return mail_message

View File

@ -1,7 +1,7 @@
import pytest
from datetime import datetime, timedelta
from memory.common.db.models import EmailAccount, MailMessage, SourceItem
from memory.common.db.models import EmailAccount, MailMessage, SourceItem, EmailAttachment
from memory.workers.tasks.email import process_message
@ -82,7 +82,7 @@ def test_process_simple_email(db_session, test_email_account):
assert mail_message.sender == "alice@example.com"
assert "bob@example.com" in mail_message.recipients
assert "This is test email 1" in mail_message.body_raw
assert mail_message.attachments.get("folder") == "INBOX"
assert mail_message.folder == "INBOX"
def test_process_email_with_attachment(db_session, test_email_account):
@ -96,19 +96,24 @@ def test_process_email_with_attachment(db_session, test_email_account):
assert source_id is not None
# Check mail message specifics and attachment
# Check mail message specifics
mail_message = db_session.query(MailMessage).filter(MailMessage.source_id == source_id).first()
assert mail_message is not None
assert mail_message.subject == "Email with Attachment"
assert mail_message.sender == "eve@example.com"
assert "This email has an attachment" in mail_message.body_raw
assert mail_message.attachments.get("folder") == "Archive"
assert mail_message.folder == "Archive"
# Check attachments were processed
attachment_items = mail_message.attachments.get("items", [])
assert len(attachment_items) > 0
assert attachment_items[0]["filename"] == "test.txt"
assert attachment_items[0]["content_type"] == "text/plain"
# Check attachments were processed and stored in the EmailAttachment table
attachments = db_session.query(EmailAttachment).filter(
EmailAttachment.mail_message_id == mail_message.id
).all()
assert len(attachments) > 0
assert attachments[0].filename == "test.txt"
assert attachments[0].content_type == "text/plain"
# Either content or file_path should be set
assert attachments[0].content is not None or attachments[0].file_path is not None
def test_process_empty_message(db_session, test_email_account):

View File

@ -4,14 +4,12 @@ import email.mime.text
import email.mime.base
import base64
import pathlib
import re
from datetime import datetime
from email.utils import formatdate
from unittest.mock import ANY, MagicMock, patch
import pytest
from memory.common.db.models import SourceItem
from memory.common.db.models import MailMessage, EmailAccount
from memory.common.db.models import SourceItem, MailMessage, EmailAttachment, EmailAccount
from memory.common import settings
from memory.workers.email import (
compute_message_hash,
@ -224,17 +222,23 @@ def test_process_attachment_inline(attachment_size, max_inline_size, message_id)
"size": attachment_size,
"content": b"a" * attachment_size,
}
message = MailMessage(
id=1,
message_id=message_id,
sender="sender@example.com",
folder="INBOX",
)
with patch.object(settings, "MAX_INLINE_ATTACHMENT_SIZE", max_inline_size):
result = process_attachment(attachment, message_id)
result = process_attachment(attachment, message)
assert result is not None
# For inline attachments, content should be base64 encoded bytes
assert isinstance(result["content"], str)
assert isinstance(result.content, bytes)
# Decode the base64 string and compare with the original content
decoded_content = base64.b64decode(result["content"].encode('utf-8'))
decoded_content = base64.b64decode(result.content)
assert decoded_content == attachment["content"]
assert "path" not in result
assert result.file_path is None
@pytest.mark.parametrize(
@ -253,18 +257,18 @@ def test_process_attachment_disk(attachment_size, max_inline_size, message_id):
"size": attachment_size,
"content": b"a" * attachment_size,
}
message = MailMessage(
id=1,
message_id=message_id,
sender="sender@example.com",
folder="INBOX",
)
with patch.object(settings, "MAX_INLINE_ATTACHMENT_SIZE", max_inline_size):
result = process_attachment(attachment, message_id)
result = process_attachment(attachment, message)
assert result is not None
# For disk-stored attachments, content should not be modified and path should be set
assert "path" in result
assert isinstance(result["path"], pathlib.Path)
# Verify the path contains safe message ID
safe_message_id = re.sub(r"[<>\s:/\\]", "_", message_id)
assert safe_message_id in str(result["path"])
assert not result.content
assert result.file_path == str(settings.FILE_STORAGE_DIR / "sender@example.com" / "INBOX" / "test.txt")
def test_process_attachment_write_error():
@ -275,6 +279,12 @@ def test_process_attachment_write_error():
"size": 100,
"content": b"a" * 100,
}
message = MailMessage(
id=1,
message_id="<test@example.com>",
sender="sender@example.com",
folder="INBOX",
)
# Mock write_bytes to raise an exception
def mock_write_bytes(self, content):
@ -284,7 +294,7 @@ def test_process_attachment_write_error():
patch.object(settings, "MAX_INLINE_ATTACHMENT_SIZE", 10),
patch.object(pathlib.Path, "write_bytes", mock_write_bytes),
):
assert process_attachment(attachment, "<test@example.com>") is None
assert process_attachment(attachment, message) is None
def test_process_attachments_empty():
@ -316,20 +326,26 @@ def test_process_attachments_mixed():
"content": b"c" * 30,
},
]
message = MailMessage(
id=1,
message_id="<test@example.com>",
sender="sender@example.com",
folder="INBOX",
)
with patch.object(settings, "MAX_INLINE_ATTACHMENT_SIZE", 50):
# Process attachments
results = process_attachments(attachments, "<test@example.com>")
results = process_attachments(attachments, message)
# Verify we have all attachments processed
assert len(results) == 3
# Verify small attachments are base64 encoded
assert isinstance(results[0]["content"], str)
assert isinstance(results[2]["content"], str)
assert isinstance(results[0].content, bytes)
assert isinstance(results[2].content, bytes)
# Verify large attachment has a path
assert "path" in results[1]
assert results[1].file_path is not None
@pytest.mark.parametrize(
@ -520,7 +536,16 @@ def test_check_message_exists(
def test_create_mail_message(db_session):
source_id = 1
source_item = SourceItem(
id=1,
modality="mail",
sha256=b"test_hash_bytes" + bytes(28),
tags=["test"],
byte_length=100,
)
db_session.add(source_item)
db_session.flush()
source_id = source_item.id
parsed_email = {
"message_id": "<test@example.com>",
"subject": "Test Subject",
@ -542,6 +567,8 @@ def test_create_mail_message(db_session):
folder=folder,
)
attachments = db_session.query(EmailAttachment).filter(EmailAttachment.mail_message_id == mail_message.id).all()
# Verify the mail message was created correctly
assert isinstance(mail_message, MailMessage)
assert mail_message.source_id == source_id
@ -551,10 +578,7 @@ def test_create_mail_message(db_session):
assert mail_message.recipients == parsed_email["recipients"]
assert mail_message.sent_at == parsed_email["sent_at"]
assert mail_message.body_raw == parsed_email["body"]
assert mail_message.attachments == {
"items": parsed_email["attachments"],
"folder": folder,
}
assert mail_message.attachments == attachments
def test_fetch_email(email_provider):