#!/usr/bin/env python3
"""
Script to run Celery tasks on the Docker Compose setup from your local machine.

This script connects to the RabbitMQ broker running in Docker (expected on
localhost:15673) and sends tasks to the workers. When waiting for results it
reads them from the result backend, with the in-network ``postgres:5432``
address rewritten to localhost:15432. It requires the same dependencies as the
workers to import the task definitions.

Global options such as ``--wait`` and ``--timeout`` must come before the task
category.

Usage:
    python run_celery_task.py --help
    python run_celery_task.py email sync-all-accounts
    python run_celery_task.py email sync-account --account-id 1
    python run_celery_task.py --wait --timeout 60 email sync-account --account-id 1
    python run_celery_task.py ebook sync-book --file-path "/path/to/book.epub" --tags "fiction,scifi"
    python run_celery_task.py maintenance clean-all-collections
    python run_celery_task.py blogs sync-webpage --url "https://example.com"
    python run_celery_task.py comic sync-all-comics
    python run_celery_task.py forums sync-lesswrong --since-date "2025-01-01" --min-karma 10 --limit 50 --cooldown 0.5 --max-items 1000
"""

import json
import sys
from pathlib import Path
from typing import Any
from urllib.parse import quote

import click

# Add the src directory to the Python path so ``memory`` modules can be imported
# from a source checkout. This MUST run before any ``memory`` import below;
# otherwise those imports only work when the package is already installed.
sys.path.insert(0, str(Path(__file__).parent / "src"))

from memory.workers.tasks.blogs import (  # noqa: E402
    SYNC_ALL_ARTICLE_FEEDS,
    SYNC_ARTICLE_FEED,
    SYNC_WEBPAGE,
    SYNC_WEBSITE_ARCHIVE,
)
from memory.workers.tasks.comic import (  # noqa: E402
    SYNC_ALL_COMICS,
    SYNC_COMIC,
    SYNC_SMBC,
    SYNC_XKCD,
)
from memory.workers.tasks.ebook import SYNC_BOOK  # noqa: E402
from memory.workers.tasks.email import (  # noqa: E402
    PROCESS_EMAIL,
    SYNC_ACCOUNT,
    SYNC_ALL_ACCOUNTS,
)
from memory.workers.tasks.forums import (  # noqa: E402
    SYNC_LESSWRONG,
    SYNC_LESSWRONG_POST,
)
from memory.workers.tasks.maintenance import (  # noqa: E402
    CLEAN_ALL_COLLECTIONS,
    CLEAN_COLLECTION,
    REINGEST_CHUNK,
    REINGEST_EMPTY_SOURCE_ITEMS,
    REINGEST_ALL_EMPTY_SOURCE_ITEMS,
    REINGEST_ITEM,
    REINGEST_MISSING_CHUNKS,
    UPDATE_METADATA_FOR_ITEM,
    UPDATE_METADATA_FOR_SOURCE_ITEMS,
)

from celery import Celery  # noqa: E402

from memory.common import settings  # noqa: E402

# category -> CLI task name -> registered Celery task name (passed to send_task).
TASK_MAPPINGS = {
    "email": {
        "sync_all_accounts": SYNC_ALL_ACCOUNTS,
        "sync_account": SYNC_ACCOUNT,
        "process_message": PROCESS_EMAIL,
    },
    "ebook": {
        "sync_book": SYNC_BOOK,
    },
    "maintenance": {
        "clean_all_collections": CLEAN_ALL_COLLECTIONS,
        "clean_collection": CLEAN_COLLECTION,
        "reingest_missing_chunks": REINGEST_MISSING_CHUNKS,
        "reingest_chunk": REINGEST_CHUNK,
        "reingest_item": REINGEST_ITEM,
        "reingest_empty_source_items": REINGEST_EMPTY_SOURCE_ITEMS,
        "reingest_all_empty_source_items": REINGEST_ALL_EMPTY_SOURCE_ITEMS,
        "update_metadata_for_item": UPDATE_METADATA_FOR_ITEM,
        "update_metadata_for_source_items": UPDATE_METADATA_FOR_SOURCE_ITEMS,
    },
    "blogs": {
        "sync_webpage": SYNC_WEBPAGE,
        "sync_article_feed": SYNC_ARTICLE_FEED,
        "sync_all_article_feeds": SYNC_ALL_ARTICLE_FEEDS,
        "sync_website_archive": SYNC_WEBSITE_ARCHIVE,
    },
    "comic": {
        "sync_all_comics": SYNC_ALL_COMICS,
        "sync_smbc": SYNC_SMBC,
        "sync_xkcd": SYNC_XKCD,
        "sync_comic": SYNC_COMIC,
    },
    "forums": {
        "sync_lesswrong": SYNC_LESSWRONG,
        "sync_lesswrong_post": SYNC_LESSWRONG_POST,
    },
}

# Categories whose worker queue name differs from the category name. Any
# category not listed here is routed to a queue named after the category
# (see run_task).
QUEUE_MAPPINGS = {
    "email": "email",
    "ebook": "ebooks",
    "photo": "photo_embed",
}


def create_local_celery_app(
    host: str = "localhost",
    rabbitmq_port: int = 15673,
    postgres_port: int = 15432,
) -> Celery:
    """Create a Celery app configured to connect to the Docker services.

    The broker URL is built from ``settings.RABBITMQ_USER`` and
    ``settings.RABBITMQ_PASSWORD``, pointed at ``host:rabbitmq_port``. The
    result backend is ``settings.CELERY_RESULT_BACKEND`` with the in-network
    ``postgres:5432`` address replaced by ``host:postgres_port``.

    Args:
        host: Host on which the Docker Compose ports are published.
        rabbitmq_port: Host port forwarded to RabbitMQ.
        postgres_port: Host port forwarded to Postgres (the result backend).

    Returns:
        A Celery app whose tasks are discovered from ``memory.workers.tasks``.
    """
    # Percent-encode credentials so characters such as '@', ':', '/' or '%'
    # cannot corrupt the URL; kombu unquotes user/password when parsing it.
    user = quote(str(settings.RABBITMQ_USER), safe="")
    password = quote(str(settings.RABBITMQ_PASSWORD), safe="")
    rabbitmq_url = f"amqp://{user}:{password}@{host}:{rabbitmq_port}//"

    app = Celery(
        "memory-local",
        broker=rabbitmq_url,
        backend=settings.CELERY_RESULT_BACKEND.replace(
            "postgres:5432", f"{host}:{postgres_port}"
        ),
    )

    # Import task modules so they're registered
    app.autodiscover_tasks(["memory.workers.tasks"])

    return app


def run_task(app: Celery, category: str, task_name: str, **kwargs) -> str:
    """Submit a task by its CLI category and name.

    Args:
        app: Celery app used to send the task.
        category: Key of ``TASK_MAPPINGS`` (e.g. ``"email"``).
        task_name: Key within that category (e.g. ``"sync_account"``).
        **kwargs: Keyword arguments forwarded to the task.

    Returns:
        The id of the submitted task.

    Raises:
        ValueError: If the category or task name is unknown.
    """
    if category not in TASK_MAPPINGS:
        raise ValueError(f"Unknown category: {category}")

    tasks = TASK_MAPPINGS[category]
    if task_name not in tasks:
        raise ValueError(f"Unknown {category} task: {task_name}")

    queue_name = QUEUE_MAPPINGS.get(category) or category
    result = app.send_task(tasks[task_name], kwargs=kwargs, queue=queue_name)
    return result.id


def get_task_result(app: Celery, task_id: str, timeout: int = 300) -> Any:
    """Block until a task finishes and return its result.

    If fetching the result raises (for example on timeout, or because the task
    itself failed), the exception is not propagated. Instead this returns a
    dict with the error message and the task's current status.
    """
    result = app.AsyncResult(task_id)
    try:
        return result.get(timeout=timeout)
    except Exception as e:
        return {"error": str(e), "status": result.status}


@click.group()
@click.option("--wait", is_flag=True, help="Wait for task completion and show result")
@click.option(
    "--timeout", default=300, help="Timeout in seconds when waiting for result"
)
@click.pass_context
def cli(ctx, wait, timeout):
    """Run Celery tasks on Docker Compose setup."""
    ctx.ensure_object(dict)
    ctx.obj["wait"] = wait
    ctx.obj["timeout"] = timeout

    try:
        ctx.obj["app"] = create_local_celery_app()
    except Exception as e:
        click.echo(f"Error connecting to Celery broker: {e}", err=True)
        click.echo(
            "Make sure Docker Compose is running and RabbitMQ is accessible on localhost:15673",
            err=True,
        )
        sys.exit(1)


def execute_task(ctx, category: str, task_name: str, **kwargs):
    """Submit a task and optionally wait for and print its result.

    Options that were not supplied on the command line (``None``) are dropped,
    so the task's own defaults apply. Any exception while submitting or waiting
    is reported on stderr and the process exits with status 1.
    """
    app = ctx.obj["app"]
    wait = ctx.obj["wait"]
    timeout = ctx.obj["timeout"]

    # Filter out None values
    kwargs = {k: v for k, v in kwargs.items() if v is not None}

    try:
        task_id = run_task(app, category, task_name, **kwargs)
        click.echo("Task submitted successfully!")
        click.echo(f"Task ID: {task_id}")

        if wait:
            click.echo(f"Waiting for task completion (timeout: {timeout}s)...")
            result = get_task_result(app, task_id, timeout)
            click.echo("Task result:")
            # default=str lets non-JSON result values (dates etc.) still print.
            click.echo(json.dumps(result, indent=2, default=str))
    except Exception as e:
        click.echo(f"Error running task: {e}", err=True)
        sys.exit(1)


@cli.group()
@click.pass_context
def email(ctx):
    """Email-related tasks."""
    pass


@email.command("sync-all-accounts")
@click.option("--since-date", help="Sync items since this date (ISO format)")
@click.pass_context
def email_sync_all_accounts(ctx, since_date):
    """Sync all email accounts."""
    execute_task(ctx, "email", "sync_all_accounts", since_date=since_date)


@email.command("sync-account")
@click.option("--account-id", type=int, required=True, help="Email account ID")
@click.option("--since-date", help="Sync items since this date (ISO format)")
@click.pass_context
def email_sync_account(ctx, account_id, since_date):
    """Sync a specific email account."""
    execute_task(
        ctx, "email", "sync_account", account_id=account_id, since_date=since_date
    )


@email.command("process-message")
@click.option("--message-id", required=True, help="Email message ID")
@click.option("--folder", help="Email folder name")
@click.option("--raw-email", help="Raw email content")
@click.pass_context
def email_process_message(ctx, message_id, folder, raw_email):
    """Process a specific email message."""
    execute_task(
        ctx,
        "email",
        "process_message",
        message_id=message_id,
        folder=folder,
        raw_email=raw_email,
    )


@cli.group()
@click.pass_context
def ebook(ctx):
    """Ebook-related tasks."""
    pass


@ebook.command("sync-book")
@click.option("--file-path", required=True, help="Path to ebook file")
@click.option("--tags", help="Comma-separated tags")
@click.pass_context
def ebook_sync_book(ctx, file_path, tags):
    """Sync an ebook."""
    execute_task(ctx, "ebook", "sync_book", file_path=file_path, tags=tags)


@cli.group()
@click.pass_context
def maintenance(ctx):
    """Maintenance tasks."""
    pass


@maintenance.command("clean-all-collections")
@click.pass_context
def maintenance_clean_all_collections(ctx):
    """Clean all collections."""
    execute_task(ctx, "maintenance", "clean_all_collections")


@maintenance.command("clean-collection")
@click.option("--collection", required=True, help="Collection name to clean")
@click.pass_context
def maintenance_clean_collection(ctx, collection):
    """Clean a specific collection."""
    execute_task(ctx, "maintenance", "clean_collection", collection=collection)


@maintenance.command("reingest-missing-chunks")
@click.option("--minutes-ago", type=int, help="Minutes ago to reingest chunks")
@click.pass_context
def maintenance_reingest_missing_chunks(ctx, minutes_ago):
    """Reingest missing chunks."""
    execute_task(ctx, "maintenance", "reingest_missing_chunks", minutes_ago=minutes_ago)
# --- Remaining maintenance commands -----------------------------------------
# Every command below is a thin Click wrapper: it forwards its options as task
# keyword arguments to execute_task, which drops options left unset (None).
# Command docstrings double as Click help text, so they are kept verbatim.


@maintenance.command("reingest-item")
@click.option("--item-id", required=True, help="Item ID to reingest")
@click.option("--item-type", required=True, help="Item type to reingest")
@click.pass_context
def maintenance_reingest_item(ctx, item_id, item_type):
    """Reingest a specific item."""
    task_kwargs = {"item_id": item_id, "item_type": item_type}
    execute_task(ctx, category="maintenance", task_name="reingest_item", **task_kwargs)


@maintenance.command("update-metadata-for-item")
@click.option("--item-id", required=True, help="Item ID to update metadata for")
@click.option("--item-type", required=True, help="Item type to update metadata for")
@click.pass_context
def maintenance_update_metadata_for_item(ctx, item_id, item_type):
    """Update metadata for a specific item."""
    task_kwargs = {"item_id": item_id, "item_type": item_type}
    execute_task(
        ctx,
        category="maintenance",
        task_name="update_metadata_for_item",
        **task_kwargs,
    )


@maintenance.command("update-metadata-for-source-items")
@click.option("--item-type", required=True, help="Item type to update metadata for")
@click.pass_context
def maintenance_update_metadata_for_source_items(ctx, item_type):
    """Update metadata for all items of a specific type."""
    execute_task(
        ctx,
        category="maintenance",
        task_name="update_metadata_for_source_items",
        item_type=item_type,
    )


@maintenance.command("reingest-empty-source-items")
@click.option("--item-type", required=True, help="Item type to reingest")
@click.pass_context
def maintenance_reingest_empty_source_items(ctx, item_type):
    """Reingest empty source items."""
    execute_task(
        ctx,
        category="maintenance",
        task_name="reingest_empty_source_items",
        item_type=item_type,
    )


@maintenance.command("reingest-all-empty-source-items")
@click.pass_context
def maintenance_reingest_all_empty_source_items(ctx):
    """Reingest all empty source items."""
    # No options: the task runs with its own defaults.
    execute_task(
        ctx, category="maintenance", task_name="reingest_all_empty_source_items"
    )


@maintenance.command("reingest-chunk")
@click.option("--chunk-id", required=True, help="Chunk ID to reingest")
@click.pass_context
def maintenance_reingest_chunk(ctx, chunk_id):
    """Reingest a specific chunk."""
    execute_task(ctx, category="maintenance", task_name="reingest_chunk", chunk_id=chunk_id)


# --- Blog commands -----------------------------------------------------------


@cli.group()
@click.pass_context
def blogs(ctx):
    """Blog-related tasks."""


@blogs.command("sync-webpage")
@click.option("--url", required=True, help="URL to sync")
@click.pass_context
def blogs_sync_webpage(ctx, url):
    """Sync a webpage."""
    execute_task(ctx, category="blogs", task_name="sync_webpage", url=url)


@blogs.command("sync-article-feed")
@click.option("--feed-id", type=int, required=True, help="Feed ID to sync")
@click.pass_context
def blogs_sync_article_feed(ctx, feed_id):
    """Sync an article feed."""
    execute_task(ctx, category="blogs", task_name="sync_article_feed", feed_id=feed_id)


@blogs.command("sync-all-article-feeds")
@click.pass_context
def blogs_sync_all_article_feeds(ctx):
    """Sync all article feeds."""
    execute_task(ctx, category="blogs", task_name="sync_all_article_feeds")


@blogs.command("sync-website-archive")
@click.option("--url", required=True, help="URL to sync")
@click.pass_context
def blogs_sync_website_archive(ctx, url):
    """Sync a website archive."""
    execute_task(ctx, category="blogs", task_name="sync_website_archive", url=url)


# --- Comic commands ----------------------------------------------------------


@cli.group()
@click.pass_context
def comic(ctx):
    """Comic-related tasks."""


@comic.command("sync-all-comics")
@click.pass_context
def comic_sync_all_comics(ctx):
    """Sync all comics."""
    execute_task(ctx, category="comic", task_name="sync_all_comics")


@comic.command("sync-smbc")
@click.pass_context
def comic_sync_smbc(ctx):
    """Sync SMBC comics."""
    execute_task(ctx, category="comic", task_name="sync_smbc")


@comic.command("sync-xkcd")
@click.pass_context
def comic_sync_xkcd(ctx):
    """Sync XKCD comics."""
    execute_task(ctx, category="comic", task_name="sync_xkcd")


@comic.command("sync-comic")
@click.option("--image-url", required=True, help="Image URL to sync")
@click.option("--title", help="Comic title")
@click.option("--author", help="Comic author")
@click.option("--published-date", help="Comic published date")
@click.pass_context
def comic_sync_comic(ctx, image_url, title, author, published_date):
    """Sync a specific comic."""
    # Only --image-url is mandatory; the optional fields are forwarded when set.
    task_kwargs = {
        "image_url": image_url,
        "title": title,
        "author": author,
        "published_date": published_date,
    }
    execute_task(ctx, category="comic", task_name="sync_comic", **task_kwargs)


# --- Forum commands ----------------------------------------------------------


@cli.group()
@click.pass_context
def forums(ctx):
    """Forum-related tasks."""


@forums.command("sync-lesswrong")
@click.option("--since-date", help="Sync items since this date (ISO format)")
@click.option("--min-karma", type=int, help="Minimum karma to sync")
@click.option("--limit", type=int, help="Limit the number of posts to sync")
@click.option("--cooldown", type=float, help="Cooldown between posts")
@click.option("--max-items", type=int, help="Maximum number of posts to sync")
@click.pass_context
def forums_sync_lesswrong(ctx, since_date, min_karma, limit, cooldown, max_items):
    """Sync LessWrong posts."""
    task_kwargs = {
        "since_date": since_date,
        "min_karma": min_karma,
        "limit": limit,
        "cooldown": cooldown,
        "max_items": max_items,
    }
    execute_task(ctx, category="forums", task_name="sync_lesswrong", **task_kwargs)


@forums.command("sync-lesswrong-post")
@click.option("--url", required=True, help="LessWrong post URL")
@click.pass_context
def forums_sync_lesswrong_post(ctx, url):
    """Sync a specific LessWrong post."""
    execute_task(ctx, category="forums", task_name="sync_lesswrong_post", url=url)


if __name__ == "__main__":
    cli()