memory/tests/providers/email_provider.py
Daniel O'Connell 03b5c908ee email provider
2025-04-27 19:03:50 +02:00

184 lines
7.6 KiB
Python

import email
from datetime import datetime
from typing import Any
class MockEmailProvider:
"""
Mock IMAP email provider for integration testing.
Can be initialized with predefined emails to return.
"""
def __init__(self, emails_by_folder: dict[str, list[dict[str, Any]]] = None):
"""
Initialize with a dictionary of emails organized by folder.
Args:
emails_by_folder: A dictionary mapping folder names to lists of email dictionaries.
Each email dict should have: 'uid', 'flags', 'date', 'from', 'to', 'subject',
'message_id', 'body', and optionally 'attachments'.
"""
self.emails_by_folder = emails_by_folder or {
"INBOX": [],
"Sent": [],
"Archive": []
}
self.current_folder = None
self.is_connected = False
def _generate_email_string(self, email_data: dict[str, Any]) -> str:
"""Generate a raw email string from the provided email data."""
msg = email.message.EmailMessage()
msg["From"] = email_data.get("from", "sender@example.com")
msg["To"] = email_data.get("to", "recipient@example.com")
msg["Subject"] = email_data.get("subject", "Test Subject")
msg["Message-ID"] = email_data.get("message_id", f"<test-{email_data['uid']}@example.com>")
msg["Date"] = email_data.get("date", datetime.now().strftime("%a, %d %b %Y %H:%M:%S +0000"))
# Set the body content
msg.set_content(email_data.get("body", f"This is test email body {email_data['uid']}"))
# Add attachments if present
for attachment in email_data.get("attachments", []):
if isinstance(attachment, dict) and "filename" in attachment and "content" in attachment:
msg.add_attachment(
attachment["content"],
maintype=attachment.get("maintype", "application"),
subtype=attachment.get("subtype", "octet-stream"),
filename=attachment["filename"]
)
return msg.as_string()
def login(self, username: str, password: str) -> tuple[str, list[bytes]]:
"""Mock login method."""
self.is_connected = True
return ('OK', [b'Login successful'])
def logout(self) -> tuple[str, list[bytes]]:
"""Mock logout method."""
self.is_connected = False
return ('OK', [b'Logout successful'])
def select(self, folder: str, readonly: bool = False) -> tuple[str, list[bytes]]:
"""
Select a folder and make it the current active folder.
Args:
folder: Folder name to select
readonly: Whether to open in readonly mode
Returns:
IMAP-style response with message count
"""
folder_name = folder.decode() if isinstance(folder, bytes) else folder
self.current_folder = folder_name
message_count = len(self.emails_by_folder.get(folder_name, []))
return ('OK', [str(message_count).encode()])
def list(self, directory: str = '', pattern: str = '*') -> tuple[str, list[bytes]]:
"""List available folders."""
folders = []
for folder in self.emails_by_folder.keys():
folders.append(f'(\\HasNoChildren) "/" "{folder}"'.encode())
return ('OK', folders)
def search(self, charset, criteria):
"""
Handle SEARCH command to find email UIDs.
Args:
charset: Character set (ignored in mock)
criteria: Search criteria (ignored in mock, we return all emails)
Returns:
All email UIDs in the current folder
"""
if not self.current_folder or self.current_folder not in self.emails_by_folder:
return ('OK', [b''])
uids = [str(email["uid"]).encode() for email in self.emails_by_folder[self.current_folder]]
return ('OK', [b' '.join(uids) if uids else b''])
def fetch(self, message_set, message_parts) -> tuple[str, list]:
"""
Handle FETCH command to retrieve email data.
Args:
message_set: Message numbers/UIDs to fetch
message_parts: Parts of the message to fetch
Returns:
Email data in IMAP format
"""
if not self.current_folder or self.current_folder not in self.emails_by_folder:
return ('OK', [None])
# For simplicity, we'll just match the UID with the ID provided
uid = int(message_set.decode() if isinstance(message_set, bytes) else message_set)
# Find the email with the matching UID
for email_data in self.emails_by_folder[self.current_folder]:
if email_data["uid"] == uid:
# Generate email content
email_string = self._generate_email_string(email_data)
flags = email_data.get("flags", "\\Seen")
date = email_data.get("date_internal", "01-Jan-2023 00:00:00 +0000")
# Format the response as expected by the IMAP client
response = [(
f'{uid} (UID {uid} FLAGS ({flags}) INTERNALDATE "{date}" RFC822 '
f'{{{len(email_string)}}}'.encode(),
email_string.encode()
)]
return ('OK', response)
# No matching email found
return ('NO', [b'Email not found'])
# def uid(self, command: str, *args) -> tuple[str, list]:
# """
# Handle UID-based commands like SEARCH and FETCH.
# Args:
# command: The IMAP command (SEARCH, FETCH, etc.)
# *args: Additional arguments for the command
# Returns:
# IMAP-style response appropriate for the command
# """
# if not self.current_folder or self.current_folder not in self.emails_by_folder:
# return ('OK', [b''])
# if command == 'SEARCH':
# # For simplicity, return all UIDs in the current folder
# # A real implementation would parse the search criteria
# uids = [str(email["uid"]).encode() for email in self.emails_by_folder[self.current_folder]]
# return ('OK', [b' '.join(uids) if uids else b''])
# elif command == 'FETCH':
# # Parse the UID from the arguments
# uid_arg = args[0].decode() if isinstance(args[0], bytes) else args[0]
# uid = int(uid_arg)
# # Find the email with the matching UID
# for email_data in self.emails_by_folder[self.current_folder]:
# if email_data["uid"] == uid:
# # Generate email content
# email_string = self._generate_email_string(email_data)
# flags = email_data.get("flags", "\\Seen")
# date = email_data.get("date_internal", "01-Jan-2023 00:00:00 +0000")
# # Format the response as expected by the IMAP client
# response = [(
# f'{uid} (UID {uid} FLAGS ({flags}) INTERNALDATE "{date}" RFC822 '
# f'{{{len(email_string)}}}'.encode(),
# email_string.encode()
# )]
# return ('OK', response)
# # No matching email found
# return ('NO', [b'Email not found'])
# return ('NO', [f'Command {command} not implemented'.encode()])