diff --git a/docker-compose.yaml b/docker-compose.yaml index 5545432..31ef6b0 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -206,7 +206,7 @@ services: <<: *worker-base environment: <<: *worker-env - QUEUES: "backup,blogs,comic,discord,ebooks,email,forums,github,people,photo_embed,maintenance,notes,scheduler" + QUEUES: "backup,blogs,comic,discord,ebooks,email,forums,github,google,people,photo_embed,maintenance,notes,scheduler" ingest-hub: <<: *worker-base diff --git a/docker/workers/Dockerfile b/docker/workers/Dockerfile index 525ae81..3046d49 100644 --- a/docker/workers/Dockerfile +++ b/docker/workers/Dockerfile @@ -44,7 +44,7 @@ RUN git config --global user.email "${GIT_USER_EMAIL}" && \ git config --global user.name "${GIT_USER_NAME}" # Default queues to process -ENV QUEUES="backup,blogs,comic,discord,ebooks,email,forums,github,people,photo_embed,maintenance" +ENV QUEUES="backup,blogs,comic,discord,ebooks,email,forums,github,google,people,photo_embed,maintenance,notes,scheduler" ENV PYTHONPATH="/app" ENTRYPOINT ["./entry.sh"] \ No newline at end of file diff --git a/src/memory/common/collections.py b/src/memory/common/collections.py index e916672..922d888 100644 --- a/src/memory/common/collections.py +++ b/src/memory/common/collections.py @@ -85,7 +85,7 @@ ALL_COLLECTIONS: dict[str, Collection] = { "doc": { "dimension": 1024, "distance": "Cosine", - "text": False, + "text": True, "multimodal": True, }, # Observations diff --git a/src/memory/parsers/google_drive.py b/src/memory/parsers/google_drive.py index d89e38e..f05ea1d 100644 --- a/src/memory/parsers/google_drive.py +++ b/src/memory/parsers/google_drive.py @@ -106,6 +106,24 @@ class GoogleDriveClient: self._service = build("drive", "v3", credentials=creds) return self._service + def get_file_metadata(self, file_id: str) -> dict: + """Get metadata for a single file or folder.""" + service = self._get_service() + return ( + service.files() + .get( + fileId=file_id, + fields="id, name, mimeType, modifiedTime, createdTime, owners, lastModifyingUser, parents, size", + supportsAllDrives=True, + ) + .execute() + ) + + def is_folder(self, file_id: str) -> bool: + """Check if a file ID refers to a folder.""" + metadata = self.get_file_metadata(file_id) + return metadata.get("mimeType") == "application/vnd.google-apps.folder" + def list_files_in_folder( self, folder_id: str, @@ -141,6 +159,8 @@ class GoogleDriveClient: fields="nextPageToken, files(id, name, mimeType, modifiedTime, createdTime, owners, lastModifyingUser, parents, size)", pageToken=page_token, pageSize=page_size, + supportsAllDrives=True, + includeItemsFromAllDrives=True, ) .execute() ) @@ -191,7 +211,11 @@ class GoogleDriveClient: while current_id: try: file = ( - service.files().get(fileId=current_id, fields="name, parents").execute() + service.files().get( + fileId=current_id, + fields="name, parents", + supportsAllDrives=True, + ).execute() ) path_parts.insert(0, file["name"]) parents = file.get("parents", []) diff --git a/src/memory/workers/tasks/__init__.py b/src/memory/workers/tasks/__init__.py index f7c0a97..7e9cd0f 100644 --- a/src/memory/workers/tasks/__init__.py +++ b/src/memory/workers/tasks/__init__.py @@ -11,6 +11,7 @@ from memory.workers.tasks import ( email, forums, github, + google_drive, maintenance, notes, observations, @@ -28,6 +29,7 @@ __all__ = [ "email", "forums", "github", + "google_drive", "maintenance", "notes", "observations", diff --git a/src/memory/workers/tasks/google_drive.py b/src/memory/workers/tasks/google_drive.py index 1166c6a..0fb30b0 100644 --- a/src/memory/workers/tasks/google_drive.py +++ b/src/memory/workers/tasks/google_drive.py @@ -237,25 +237,66 @@ def sync_google_folder(folder_id: int, force_full: bool = False) -> dict[str, An docs_synced = 0 task_ids = [] + is_single_doc = False try: - # Get folder path for context - folder_path = client.get_folder_path(cast(str, folder.folder_id)) + google_id = cast(str, folder.folder_id) + + # Check if this is a single document or a folder + file_metadata = client.get_file_metadata(google_id) + is_folder = file_metadata.get("mimeType") == "application/vnd.google-apps.folder" + is_single_doc = not is_folder + + if is_folder: + # It's a folder - list and sync all files inside + folder_path = client.get_folder_path(google_id) + + for file_meta in client.list_files_in_folder( + google_id, + recursive=cast(bool, folder.recursive), + since=since, + ): + try: + file_data = client.fetch_file(file_meta, folder_path) + serialized = _serialize_file_data(file_data) + task = sync_google_doc.delay(folder.id, serialized) + task_ids.append(task.id) + docs_synced += 1 + except Exception as e: + logger.error(f"Error fetching file {file_meta.get('name')}: {e}") + continue + else: + # It's a single document - sync it directly + logger.info(f"Syncing single document: {file_metadata.get('name')}") + folder_path = client.get_folder_path(google_id) + + # Check if we need to sync based on modification time + if since and file_metadata.get("modifiedTime"): + from memory.parsers.google_drive import parse_google_date + modified_at = parse_google_date(file_metadata.get("modifiedTime")) + if modified_at and modified_at <= since: + logger.info(f"Document not modified since last sync, skipping") + folder.last_sync_at = now + session.commit() + return { + "status": "completed", + "sync_type": "incremental", + "folder_id": folder_id, + "folder_name": folder.folder_name, + "docs_synced": 0, + "task_ids": [], + "is_single_doc": True, + } - for file_meta in client.list_files_in_folder( - cast(str, folder.folder_id), - recursive=cast(bool, folder.recursive), - since=since, - ): try: - file_data = client.fetch_file(file_meta, folder_path) + file_data = client.fetch_file(file_metadata, folder_path) serialized = _serialize_file_data(file_data) task = sync_google_doc.delay(folder.id, serialized) task_ids.append(task.id) - docs_synced += 1 + docs_synced = 1 except Exception as e: - logger.error(f"Error fetching file {file_meta.get('name')}: {e}") - continue + logger.error(f"Error fetching document {file_metadata.get('name')}: {e}") + raise # Update sync timestamps folder.last_sync_at = now @@ -275,6 +316,7 @@ def sync_google_folder(folder_id: int, force_full: bool = False) -> dict[str, An "folder_name": folder.folder_name, "docs_synced": docs_synced, "task_ids": task_ids, + "is_single_doc": is_single_doc, }