From f729122754a4f1fdc913bdce7e01068dacef0998 Mon Sep 17 00:00:00 2001
From: mruwnik
Date: Tue, 23 Dec 2025 19:11:10 +0000
Subject: [PATCH] github tracking

---
 .../20251223_120000_add_github_tracking.py    | 182 ++++
 src/memory/api/admin.py                       |  61 ++
 src/memory/common/celery_app.py               |   7 +
 src/memory/common/db/models/__init__.py       |   4 +
 src/memory/common/db/models/source_items.py   |  19 +
 src/memory/common/db/models/sources.py        | 101 ++++
 src/memory/parsers/github.py                  | 531 ++++++++++++++++++
 src/memory/workers/tasks/github.py            | 390 +++++++++++++
 8 files changed, 1295 insertions(+)
 create mode 100644 db/migrations/versions/20251223_120000_add_github_tracking.py
 create mode 100644 src/memory/parsers/github.py
 create mode 100644 src/memory/workers/tasks/github.py

diff --git a/db/migrations/versions/20251223_120000_add_github_tracking.py b/db/migrations/versions/20251223_120000_add_github_tracking.py
new file mode 100644
index 0000000..a7ad9bd
--- /dev/null
+++ b/db/migrations/versions/20251223_120000_add_github_tracking.py
@@ -0,0 +1,182 @@
+"""add github tracking
+
+Revision ID: b7c8d9e0f1a2
+Revises: a1b2c3d4e5f6
+Create Date: 2025-12-23 12:00:00.000000
+
+"""
+
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy.dialects import postgresql
+
+
+# revision identifiers, used by Alembic.
+revision: str = "b7c8d9e0f1a2"
+down_revision: Union[str, None] = "a1b2c3d4e5f6"
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+    # Create github_accounts table
+    op.create_table(
+        "github_accounts",
+        sa.Column("id", sa.BigInteger(), nullable=False),
+        sa.Column("name", sa.Text(), nullable=False),
+        sa.Column("auth_type", sa.Text(), nullable=False),
+        sa.Column("access_token", sa.Text(), nullable=True),
+        sa.Column("app_id", sa.BigInteger(), nullable=True),
+        sa.Column("installation_id", sa.BigInteger(), nullable=True),
+        sa.Column("private_key", sa.Text(), nullable=True),
+        sa.Column("active", sa.Boolean(), server_default="true", nullable=False),
+        sa.Column("last_sync_at", sa.DateTime(timezone=True), nullable=True),
+        sa.Column(
+            "created_at",
+            sa.DateTime(timezone=True),
+            server_default=sa.text("now()"),
+            nullable=False,
+        ),
+        sa.Column(
+            "updated_at",
+            sa.DateTime(timezone=True),
+            server_default=sa.text("now()"),
+            nullable=False,
+        ),
+        sa.PrimaryKeyConstraint("id"),
+        sa.CheckConstraint("auth_type IN ('pat', 'app')"),
+    )
+    op.create_index(
+        "github_accounts_active_idx", "github_accounts", ["active", "last_sync_at"]
+    )
+
+    # Create github_repos table
+    op.create_table(
+        "github_repos",
+        sa.Column("id", sa.BigInteger(), nullable=False),
+        sa.Column("account_id", sa.BigInteger(), nullable=False),
+        sa.Column("owner", sa.Text(), nullable=False),
+        sa.Column("name", sa.Text(), nullable=False),
+        sa.Column("track_issues", sa.Boolean(), server_default="true", nullable=False),
+        sa.Column("track_prs", sa.Boolean(), server_default="true", nullable=False),
+        sa.Column(
+            "track_comments", sa.Boolean(), server_default="true", nullable=False
+        ),
+        sa.Column(
+            "track_project_fields", sa.Boolean(), server_default="false", nullable=False
+        ),
+        sa.Column(
+            "labels_filter", sa.ARRAY(sa.Text()), server_default="{}", nullable=False
+        ),
+        sa.Column("state_filter", sa.Text(), nullable=True),
+        sa.Column("tags", sa.ARRAY(sa.Text()), server_default="{}", nullable=False),
+        sa.Column("check_interval", sa.Integer(), server_default="60", nullable=False),
+        sa.Column("last_sync_at", sa.DateTime(timezone=True), nullable=True),
+        sa.Column(
+            "full_sync_interval", sa.Integer(), server_default="1440", nullable=False
+        ),
+        sa.Column("last_full_sync_at", sa.DateTime(timezone=True), nullable=True),
+        sa.Column("active", sa.Boolean(), server_default="true", nullable=False),
+        sa.Column(
+            "created_at",
+            sa.DateTime(timezone=True),
+            server_default=sa.text("now()"),
+            nullable=False,
+        ),
+        sa.Column(
+            "updated_at",
+            sa.DateTime(timezone=True),
+            server_default=sa.text("now()"),
+            nullable=False,
+        ),
+        sa.ForeignKeyConstraint(
+            ["account_id"], ["github_accounts.id"], ondelete="CASCADE"
+        ),
+        sa.PrimaryKeyConstraint("id"),
+        sa.UniqueConstraint(
+            "account_id", "owner", "name", name="unique_repo_per_account"
+        ),
+    )
+    op.create_index(
+        "github_repos_active_idx", "github_repos", ["active", "last_sync_at"]
+    )
+    op.create_index("github_repos_owner_name_idx", "github_repos", ["owner", "name"])
+
+    # Add new columns to github_item table
+    op.add_column(
+        "github_item",
+        sa.Column("github_updated_at", sa.DateTime(timezone=True), nullable=True),
+    )
+    op.add_column(
+        "github_item",
+        sa.Column("content_hash", sa.Text(), nullable=True),
+    )
+    op.add_column(
+        "github_item",
+        sa.Column("repo_id", sa.BigInteger(), nullable=True),
+    )
+    op.add_column(
+        "github_item",
+        sa.Column("project_status", sa.Text(), nullable=True),
+    )
+    op.add_column(
+        "github_item",
+        sa.Column("project_priority", sa.Text(), nullable=True),
+    )
+    op.add_column(
+        "github_item",
+        sa.Column("project_fields", postgresql.JSONB(), nullable=True),
+    )
+    op.add_column(
+        "github_item",
+        sa.Column("assignees", sa.ARRAY(sa.Text()), nullable=True),
+    )
+    op.add_column(
+        "github_item",
+        sa.Column("milestone", sa.Text(), nullable=True),
+    )
+    op.add_column(
+        "github_item",
+        sa.Column("comment_count", sa.Integer(), nullable=True),
+    )
+
+    # Add foreign key and indexes for github_item
+    op.create_foreign_key(
+        "fk_github_item_repo",
+        "github_item",
+        "github_repos",
+        ["repo_id"],
+        ["id"],
+        ondelete="SET NULL",
+    )
+    op.create_index("gh_github_updated_at_idx", "github_item", ["github_updated_at"])
+    op.create_index("gh_repo_id_idx", "github_item", ["repo_id"])
+
+
+def downgrade() -> None:
+    # Drop indexes and foreign key from github_item
+    op.drop_index("gh_repo_id_idx", table_name="github_item")
+    op.drop_index("gh_github_updated_at_idx", table_name="github_item")
+    op.drop_constraint("fk_github_item_repo", "github_item", type_="foreignkey")
+
+    # Drop new columns from github_item
+    op.drop_column("github_item", "comment_count")
+    op.drop_column("github_item", "milestone")
+    op.drop_column("github_item", "assignees")
+    op.drop_column("github_item", "project_fields")
+    op.drop_column("github_item", "project_priority")
+    op.drop_column("github_item", "project_status")
+    op.drop_column("github_item", "repo_id")
+    op.drop_column("github_item", "content_hash")
+    op.drop_column("github_item", "github_updated_at")
+
+    # Drop github_repos table
+    op.drop_index("github_repos_owner_name_idx", table_name="github_repos")
+    op.drop_index("github_repos_active_idx", table_name="github_repos")
+    op.drop_table("github_repos")
+
+    # Drop github_accounts table
+    op.drop_index("github_accounts_active_idx", table_name="github_accounts")
+    op.drop_table("github_accounts")
diff --git a/src/memory/api/admin.py b/src/memory/api/admin.py
index b7c7cf2..0d92861 100644
--- a/src/memory/api/admin.py
+++ b/src/memory/api/admin.py
@@ -14,6 +14,9 @@ from memory.common.db.models import (
     BookSection,
     Chunk,
     Comic,
+    GithubAccount,
+    GithubItem,
+    GithubRepo,
     MCPServer,
     DiscordMessage,
     EmailAccount,
@@ -337,6 +340,61 @@ class ScheduledLLMCallAdmin(ModelView, model=ScheduledLLMCall):
     column_sortable_list = ["executed_at", "scheduled_time", "created_at", "updated_at"]
 
 
+class GithubAccountAdmin(ModelView, model=GithubAccount):
+    column_list = [
+        "id",
+        "name",
+        "auth_type",
+        "active",
+        "last_sync_at",
+        "created_at",
+        "updated_at",
+    ]
+    column_searchable_list = ["name", "id"]
+    # Hide sensitive columns from display
+    column_exclude_list = ["access_token", "private_key"]
+    form_excluded_columns = ["repos"]
+
+
+class GithubRepoAdmin(ModelView, model=GithubRepo):
+    column_list = [
+        "id",
+        "account",
+        "owner",
+        "name",
+        "track_issues",
+        "track_prs",
+        "track_comments",
+        "track_project_fields",
+        "labels_filter",
+        "tags",
+        "check_interval",
+        "full_sync_interval",
+        "active",
+        "last_sync_at",
+        "last_full_sync_at",
+        "created_at",
+    ]
+    column_searchable_list = ["owner", "name", "id"]
+
+
+class GithubItemAdmin(ModelView, model=GithubItem):
+    column_list = source_columns(
+        GithubItem,
+        "kind",
+        "repo_path",
+        "number",
+        "title",
+        "state",
+        "author",
+        "labels",
+        "github_updated_at",
+        "project_status",
+    )
+    column_searchable_list = ["title", "repo_path", "author", "id", "number"]
+    column_sortable_list = ["github_updated_at", "created_at"]
+
+
 def setup_admin(admin: Admin):
     """Add all admin views to the admin instance with OAuth protection."""
     admin.add_view(SourceItemAdmin)
@@ -361,3 +419,6 @@ def setup_admin(admin: Admin):
     admin.add_view(DiscordChannelAdmin)
     admin.add_view(MCPServerAdmin)
     admin.add_view(ScheduledLLMCallAdmin)
+    admin.add_view(GithubAccountAdmin)
+    admin.add_view(GithubRepoAdmin)
+    admin.add_view(GithubItemAdmin)
diff --git a/src/memory/common/celery_app.py b/src/memory/common/celery_app.py
index fd97a42..f2a4dd0 100644
--- a/src/memory/common/celery_app.py
+++ b/src/memory/common/celery_app.py
@@ -14,6 +14,7 @@ OBSERVATIONS_ROOT = "memory.workers.tasks.observations"
 SCHEDULED_CALLS_ROOT = "memory.workers.tasks.scheduled_calls"
 DISCORD_ROOT = "memory.workers.tasks.discord"
 BACKUP_ROOT = "memory.workers.tasks.backup"
+GITHUB_ROOT = "memory.workers.tasks.github"
 ADD_DISCORD_MESSAGE = f"{DISCORD_ROOT}.add_discord_message"
 EDIT_DISCORD_MESSAGE = f"{DISCORD_ROOT}.edit_discord_message"
 PROCESS_DISCORD_MESSAGE = f"{DISCORD_ROOT}.process_discord_message"
@@ -60,6 +61,11 @@ RUN_SCHEDULED_CALLS = f"{SCHEDULED_CALLS_ROOT}.run_scheduled_calls"
 BACKUP_PATH = f"{BACKUP_ROOT}.backup_path"
 BACKUP_ALL = f"{BACKUP_ROOT}.backup_all"
 
+# GitHub tasks
+SYNC_GITHUB_REPO = f"{GITHUB_ROOT}.sync_github_repo"
+SYNC_ALL_GITHUB_REPOS = f"{GITHUB_ROOT}.sync_all_github_repos"
+SYNC_GITHUB_ITEM = f"{GITHUB_ROOT}.sync_github_item"
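+# The patch registers these task names but no periodic trigger; a beat entry
+# is one way to schedule the sweep. A sketch only (schedule key and cadence
+# are assumptions, not part of this change; per-repo check_interval still
+# applies inside sync_github_repo):
+#
+#   app.conf.beat_schedule["sync-all-github-repos"] = {
+#       "task": SYNC_ALL_GITHUB_REPOS,
+#       "schedule": 600,  # seconds between sweeps
+#   }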
+ "GithubAccount", + "GithubRepo", "DiscordServer", "DiscordChannel", "DiscordUser", diff --git a/src/memory/common/db/models/source_items.py b/src/memory/common/db/models/source_items.py index d51c002..e87e17f 100644 --- a/src/memory/common/db/models/source_items.py +++ b/src/memory/common/db/models/source_items.py @@ -824,6 +824,23 @@ class GithubItem(SourceItem): payload = Column(JSONB) + # New fields for change detection and tracking + github_updated_at = Column(DateTime(timezone=True)) # GitHub's updated_at + content_hash = Column(Text) # Hash of body + comments for change detection + repo_id = Column( + BigInteger, ForeignKey("github_repos.id", ondelete="SET NULL"), nullable=True + ) + + # GitHub Projects v2 fields + project_status = Column(Text, nullable=True) + project_priority = Column(Text, nullable=True) + project_fields = Column(JSONB, nullable=True) # All project field values + + # Additional tracking + assignees = Column(ARRAY(Text), nullable=True) + milestone = Column(Text, nullable=True) + comment_count = Column(Integer, nullable=True) + __mapper_args__ = { "polymorphic_identity": "github_item", } @@ -833,6 +850,8 @@ class GithubItem(SourceItem): Index("gh_repo_kind_idx", "repo_path", "kind"), Index("gh_issue_lookup_idx", "repo_path", "kind", "number"), Index("gh_labels_idx", "labels", postgresql_using="gin"), + Index("gh_github_updated_at_idx", "github_updated_at"), + Index("gh_repo_id_idx", "repo_id"), ) diff --git a/src/memory/common/db/models/sources.py b/src/memory/common/db/models/sources.py index 77e0c3e..3f504d2 100644 --- a/src/memory/common/db/models/sources.py +++ b/src/memory/common/db/models/sources.py @@ -8,12 +8,14 @@ from sqlalchemy import ( ARRAY, BigInteger, Boolean, + CheckConstraint, Column, DateTime, ForeignKey, Index, Integer, Text, + UniqueConstraint, func, ) from sqlalchemy.dialects.postgresql import JSONB @@ -125,3 +127,102 @@ class EmailAccount(Base): Index("email_accounts_active_idx", "active", "last_sync_at"), Index("email_accounts_tags_idx", "tags", postgresql_using="gin"), ) + + +class GithubAccount(Base): + """GitHub authentication credentials for API access.""" + + __tablename__ = "github_accounts" + + id = Column(BigInteger, primary_key=True) + name = Column(Text, nullable=False) # Display name + + # Authentication - support both PAT and GitHub App + auth_type = Column(Text, nullable=False) # 'pat' or 'app' + + # For Personal Access Token auth + access_token = Column(Text, nullable=True) # PAT + + # For GitHub App auth + app_id = Column(BigInteger, nullable=True) + installation_id = Column(BigInteger, nullable=True) + private_key = Column(Text, nullable=True) # PEM key + + # Status + active = Column(Boolean, nullable=False, server_default="true") + last_sync_at = Column(DateTime(timezone=True), nullable=True) + created_at = Column( + DateTime(timezone=True), nullable=False, server_default=func.now() + ) + updated_at = Column( + DateTime(timezone=True), nullable=False, server_default=func.now() + ) + + # Relationship to repos + repos = relationship( + "GithubRepo", back_populates="account", cascade="all, delete-orphan" + ) + + __table_args__ = ( + CheckConstraint("auth_type IN ('pat', 'app')"), + Index("github_accounts_active_idx", "active", "last_sync_at"), + ) + + +class GithubRepo(Base): + """Tracked GitHub repository configuration.""" + + __tablename__ = "github_repos" + + id = Column(BigInteger, primary_key=True) + account_id = Column( + BigInteger, ForeignKey("github_accounts.id", ondelete="CASCADE"), nullable=False + ) + + # Repository 
identification + owner = Column(Text, nullable=False) # org or user + name = Column(Text, nullable=False) # repo name + + # What to track + track_issues = Column(Boolean, nullable=False, server_default="true") + track_prs = Column(Boolean, nullable=False, server_default="true") + track_comments = Column(Boolean, nullable=False, server_default="true") + track_project_fields = Column(Boolean, nullable=False, server_default="false") + + # Filtering + labels_filter = Column( + ARRAY(Text), nullable=False, server_default="{}" + ) # Empty = all labels + state_filter = Column(Text, nullable=True) # 'open', 'closed', or None for all + + # Tags to apply to all items from this repo + tags = Column(ARRAY(Text), nullable=False, server_default="{}") + + # Sync configuration + check_interval = Column(Integer, nullable=False, server_default="60") # Minutes + last_sync_at = Column(DateTime(timezone=True), nullable=True) + # Full sync interval for catching project field changes (minutes, 0 = disabled) + full_sync_interval = Column(Integer, nullable=False, server_default="1440") # Daily + last_full_sync_at = Column(DateTime(timezone=True), nullable=True) + + # Status + active = Column(Boolean, nullable=False, server_default="true") + created_at = Column( + DateTime(timezone=True), nullable=False, server_default=func.now() + ) + updated_at = Column( + DateTime(timezone=True), nullable=False, server_default=func.now() + ) + + # Relationships + account = relationship("GithubAccount", back_populates="repos") + + __table_args__ = ( + UniqueConstraint("account_id", "owner", "name", name="unique_repo_per_account"), + Index("github_repos_active_idx", "active", "last_sync_at"), + Index("github_repos_owner_name_idx", "owner", "name"), + ) + + @property + def repo_path(self) -> str: + return f"{self.owner}/{self.name}" diff --git a/src/memory/parsers/github.py b/src/memory/parsers/github.py new file mode 100644 index 0000000..ae0c8fc --- /dev/null +++ b/src/memory/parsers/github.py @@ -0,0 +1,531 @@ +"""GitHub API client for fetching issues, PRs, comments, and project fields.""" + +import hashlib +import logging +import time +from dataclasses import dataclass +from datetime import datetime +from typing import Any, Generator, TypedDict + +import requests + +logger = logging.getLogger(__name__) + +# GitHub REST API base URL +GITHUB_API_URL = "https://api.github.com" +GITHUB_GRAPHQL_URL = "https://api.github.com/graphql" + +# Rate limit handling +RATE_LIMIT_REMAINING_HEADER = "X-RateLimit-Remaining" +RATE_LIMIT_RESET_HEADER = "X-RateLimit-Reset" +MIN_RATE_LIMIT_REMAINING = 10 + + +@dataclass +class GithubCredentials: + """Credentials for GitHub API access.""" + + auth_type: str # 'pat' or 'app' + access_token: str | None = None + app_id: int | None = None + installation_id: int | None = None + private_key: str | None = None + + +class GithubComment(TypedDict): + """A comment on an issue or PR.""" + + id: int + author: str + body: str + created_at: str + updated_at: str + + +class GithubIssueData(TypedDict): + """Parsed issue/PR data ready for storage.""" + + kind: str # 'issue' or 'pr' + number: int + title: str + body: str + state: str + author: str + labels: list[str] + assignees: list[str] + milestone: str | None + created_at: datetime + closed_at: datetime | None + merged_at: datetime | None # PRs only + github_updated_at: datetime + comment_count: int + comments: list[GithubComment] + diff_summary: str | None # PRs only + project_fields: dict[str, Any] | None + content_hash: str + + +def parse_github_date(date_str: 
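+
+# Construction sketch (hypothetical token; a PAT needs read access to the
+# target repos). GithubClient, defined below, consumes this directly:
+#
+#   creds = GithubCredentials(auth_type="pat", access_token="ghp_...")
+#   client = GithubClient(creds)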
+
+
+class GithubComment(TypedDict):
+    """A comment on an issue or PR."""
+
+    id: int
+    author: str
+    body: str
+    created_at: str
+    updated_at: str
+
+
+class GithubIssueData(TypedDict):
+    """Parsed issue/PR data ready for storage."""
+
+    kind: str  # 'issue' or 'pr'
+    number: int
+    title: str
+    body: str
+    state: str
+    author: str
+    labels: list[str]
+    assignees: list[str]
+    milestone: str | None
+    created_at: datetime
+    closed_at: datetime | None
+    merged_at: datetime | None  # PRs only
+    github_updated_at: datetime
+    comment_count: int
+    comments: list[GithubComment]
+    diff_summary: str | None  # PRs only
+    project_fields: dict[str, Any] | None
+    content_hash: str
+
+
+def parse_github_date(date_str: str | None) -> datetime | None:
+    """Parse ISO date string from GitHub API to datetime."""
+    if not date_str:
+        return None
+    # GitHub uses ISO format with Z suffix
+    return datetime.fromisoformat(date_str.replace("Z", "+00:00"))
+
+
+def compute_content_hash(body: str, comments: list[GithubComment]) -> str:
+    """Compute SHA256 hash of issue/PR content for change detection."""
+    content_parts = [body or ""]
+    for comment in comments:
+        content_parts.append(comment["body"])
+    return hashlib.sha256("\n".join(content_parts).encode()).hexdigest()
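+
+# Change-detection sketch (hypothetical values): the hash covers the body plus
+# every comment body, so a new or edited comment flips it:
+#
+#   before = compute_content_hash("body", [])
+#   after = compute_content_hash("body", [GithubComment(
+#       id=1, author="octocat", body="LGTM",
+#       created_at="2025-12-23T12:00:00Z", updated_at="2025-12-23T12:00:00Z",
+#   )])
+#   assert before != after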
+
+
+class GithubClient:
+    """Client for GitHub REST and GraphQL APIs."""
+
+    def __init__(self, credentials: GithubCredentials):
+        self.credentials = credentials
+        self.session = requests.Session()
+        self._setup_auth()
+
+    def _setup_auth(self) -> None:
+        if self.credentials.auth_type == "pat":
+            self.session.headers["Authorization"] = (
+                f"Bearer {self.credentials.access_token}"
+            )
+        elif self.credentials.auth_type == "app":
+            # Generate a JWT and exchange it for an installation token
+            token = self._get_installation_token()
+            self.session.headers["Authorization"] = f"Bearer {token}"
+
+        self.session.headers["Accept"] = "application/vnd.github+json"
+        self.session.headers["X-GitHub-Api-Version"] = "2022-11-28"
+        self.session.headers["User-Agent"] = "memory-kb-github-sync"
+
+    def _get_installation_token(self) -> str:
+        """Get an installation access token for GitHub App auth."""
+        try:
+            import jwt
+        except ImportError:
+            raise ImportError("PyJWT is required for GitHub App authentication")
+
+        if not self.credentials.app_id or not self.credentials.private_key:
+            raise ValueError("app_id and private_key required for app auth")
+        if not self.credentials.installation_id:
+            raise ValueError("installation_id required for app auth")
+
+        now = int(time.time())
+        payload = {
+            "iat": now - 60,
+            "exp": now + 600,
+            "iss": self.credentials.app_id,
+        }
+        jwt_token = jwt.encode(
+            payload, self.credentials.private_key, algorithm="RS256"
+        )
+
+        response = requests.post(
+            f"{GITHUB_API_URL}/app/installations/{self.credentials.installation_id}/access_tokens",
+            headers={
+                "Authorization": f"Bearer {jwt_token}",
+                "Accept": "application/vnd.github+json",
+            },
+            timeout=30,
+        )
+        response.raise_for_status()
+        return response.json()["token"]
+
+    def _handle_rate_limit(self, response: requests.Response) -> None:
+        """Check rate limits and sleep if necessary."""
+        remaining = int(response.headers.get(RATE_LIMIT_REMAINING_HEADER, 100))
+        if remaining < MIN_RATE_LIMIT_REMAINING:
+            reset_time = int(response.headers.get(RATE_LIMIT_RESET_HEADER, 0))
+            sleep_time = max(reset_time - time.time(), 0) + 1
+            logger.warning(f"Rate limit low ({remaining}), sleeping for {sleep_time}s")
+            time.sleep(sleep_time)
+
+    def fetch_issues(
+        self,
+        owner: str,
+        repo: str,
+        since: datetime | None = None,
+        state: str = "all",
+        labels: list[str] | None = None,
+    ) -> Generator[GithubIssueData, None, None]:
+        """Fetch issues from a repository with pagination."""
+        params: dict[str, Any] = {
+            "state": state,
+            "sort": "updated",
+            "direction": "desc",
+            "per_page": 100,
+        }
+        if since:
+            params["since"] = since.isoformat()
+        if labels:
+            params["labels"] = ",".join(labels)
+
+        page = 1
+        while True:
+            params["page"] = page
+            response = self.session.get(
+                f"{GITHUB_API_URL}/repos/{owner}/{repo}/issues",
+                params=params,
+                timeout=30,
+            )
+            response.raise_for_status()
+            self._handle_rate_limit(response)
+
+            issues = response.json()
+            if not issues:
+                break
+
+            for issue in issues:
+                # Skip PRs (the issues endpoint also returns them)
+                if "pull_request" in issue:
+                    continue
+
+                yield self._parse_issue(owner, repo, issue)
+
+            page += 1
+
+    def fetch_prs(
+        self,
+        owner: str,
+        repo: str,
+        since: datetime | None = None,
+        state: str = "all",
+    ) -> Generator[GithubIssueData, None, None]:
+        """Fetch pull requests from a repository with pagination."""
+        params: dict[str, Any] = {
+            "state": state,
+            "sort": "updated",
+            "direction": "desc",
+            "per_page": 100,
+        }
+
+        page = 1
+        while True:
+            params["page"] = page
+            response = self.session.get(
+                f"{GITHUB_API_URL}/repos/{owner}/{repo}/pulls",
+                params=params,
+                timeout=30,
+            )
+            response.raise_for_status()
+            self._handle_rate_limit(response)
+
+            prs = response.json()
+            if not prs:
+                break
+
+            for pr in prs:
+                updated_at = parse_github_date(pr["updated_at"])
+                if since and updated_at and updated_at < since:
+                    return  # Stop once we've paged past the since cutoff
+
+                yield self._parse_pr(owner, repo, pr)
+
+            page += 1
+
+    def fetch_comments(
+        self,
+        owner: str,
+        repo: str,
+        issue_number: int,
+    ) -> list[GithubComment]:
+        """Fetch all comments for an issue/PR."""
+        comments: list[GithubComment] = []
+        page = 1
+
+        while True:
+            response = self.session.get(
+                f"{GITHUB_API_URL}/repos/{owner}/{repo}/issues/{issue_number}/comments",
+                params={"page": page, "per_page": 100},
+                timeout=30,
+            )
+            response.raise_for_status()
+            self._handle_rate_limit(response)
+
+            page_comments = response.json()
+            if not page_comments:
+                break
+
+            comments.extend(
+                [
+                    GithubComment(
+                        id=c["id"],
+                        author=c["user"]["login"] if c.get("user") else "ghost",
+                        body=c.get("body", ""),
+                        created_at=c["created_at"],
+                        updated_at=c["updated_at"],
+                    )
+                    for c in page_comments
+                ]
+            )
+            page += 1
+
+        return comments
+
+    def fetch_project_fields(
+        self,
+        owner: str,
+        repo: str,
+        issue_number: int,
+    ) -> dict[str, Any] | None:
+        """Fetch GitHub Projects v2 field values using GraphQL."""
+        query = """
+        query($owner: String!, $repo: String!, $number: Int!) {
+          repository(owner: $owner, name: $repo) {
+            issue(number: $number) {
+              projectItems(first: 10) {
+                nodes {
+                  project { title }
+                  fieldValues(first: 20) {
+                    nodes {
+                      ... on ProjectV2ItemFieldTextValue {
+                        text
+                        field { ... on ProjectV2Field { name } }
+                      }
+                      ... on ProjectV2ItemFieldNumberValue {
+                        number
+                        field { ... on ProjectV2Field { name } }
+                      }
+                      ... on ProjectV2ItemFieldDateValue {
+                        date
+                        field { ... on ProjectV2Field { name } }
+                      }
+                      ... on ProjectV2ItemFieldSingleSelectValue {
+                        name
+                        field { ... on ProjectV2SingleSelectField { name } }
+                      }
+                      ... on ProjectV2ItemFieldIterationValue {
+                        title
+                        field { ... on ProjectV2IterationField { name } }
+                      }
+                    }
+                  }
+                }
+              }
+            }
+          }
+        }
+        """
+
+        try:
+            response = self.session.post(
+                GITHUB_GRAPHQL_URL,
+                json={
+                    "query": query,
+                    "variables": {"owner": owner, "repo": repo, "number": issue_number},
+                },
+                timeout=30,
+            )
+            response.raise_for_status()
+        except requests.RequestException as e:
+            logger.warning(f"Failed to fetch project fields: {e}")
+            return None
+
+        data = response.json()
+        if "errors" in data:
+            logger.warning(f"GraphQL errors: {data['errors']}")
+            return None
+
+        # Parse project fields
+        issue_data = (
+            data.get("data", {}).get("repository", {}).get("issue", {})
+        )
+        if not issue_data:
+            return None
+
+        items = issue_data.get("projectItems", {}).get("nodes", [])
+        if not items:
+            return None
+
+        fields: dict[str, Any] = {}
+        for item in items:
+            project_name = item.get("project", {}).get("title", "unknown")
+            for field_value in item.get("fieldValues", {}).get("nodes", []):
+                field_info = field_value.get("field", {})
+                field_name = field_info.get("name") if field_info else None
+                if not field_name:
+                    continue
+
+                # Extract value based on type
+                value = (
+                    field_value.get("text")
+                    or field_value.get("number")
+                    or field_value.get("date")
+                    or field_value.get("name")  # Single select
+                    or field_value.get("title")  # Iteration
+                )
+
+                if value is not None:
+                    fields[f"{project_name}.{field_name}"] = value
+
+        return fields if fields else None
+
+    def fetch_pr_project_fields(
+        self,
+        owner: str,
+        repo: str,
+        pr_number: int,
+    ) -> dict[str, Any] | None:
+        """Fetch GitHub Projects v2 field values for a PR using GraphQL.
+
+        Mirrors fetch_project_fields, with the query rooted at pullRequest
+        instead of issue.
+        """
+        query = """
+        query($owner: String!, $repo: String!, $number: Int!) {
+          repository(owner: $owner, name: $repo) {
+            pullRequest(number: $number) {
+              projectItems(first: 10) {
+                nodes {
+                  project { title }
+                  fieldValues(first: 20) {
+                    nodes {
+                      ... on ProjectV2ItemFieldTextValue {
+                        text
+                        field { ... on ProjectV2Field { name } }
+                      }
+                      ... on ProjectV2ItemFieldNumberValue {
+                        number
+                        field { ... on ProjectV2Field { name } }
+                      }
+                      ... on ProjectV2ItemFieldDateValue {
+                        date
+                        field { ... on ProjectV2Field { name } }
+                      }
+                      ... on ProjectV2ItemFieldSingleSelectValue {
+                        name
+                        field { ... on ProjectV2SingleSelectField { name } }
+                      }
+                      ... on ProjectV2ItemFieldIterationValue {
+                        title
+                        field { ... on ProjectV2IterationField { name } }
+                      }
+                    }
+                  }
+                }
+              }
+            }
+          }
+        }
+        """
+
+        try:
+            response = self.session.post(
+                GITHUB_GRAPHQL_URL,
+                json={
+                    "query": query,
+                    "variables": {"owner": owner, "repo": repo, "number": pr_number},
+                },
+                timeout=30,
+            )
+            response.raise_for_status()
+        except requests.RequestException as e:
+            logger.warning(f"Failed to fetch PR project fields: {e}")
+            return None
+
+        data = response.json()
+        if "errors" in data:
+            logger.warning(f"GraphQL errors: {data['errors']}")
+            return None
+
+        # Parse project fields
+        pr_data = (
+            data.get("data", {}).get("repository", {}).get("pullRequest", {})
+        )
+        if not pr_data:
+            return None
+
+        items = pr_data.get("projectItems", {}).get("nodes", [])
+        if not items:
+            return None
+
+        fields: dict[str, Any] = {}
+        for item in items:
+            project_name = item.get("project", {}).get("title", "unknown")
+            for field_value in item.get("fieldValues", {}).get("nodes", []):
+                field_info = field_value.get("field", {})
+                field_name = field_info.get("name") if field_info else None
+                if not field_name:
+                    continue
+
+                value = (
+                    field_value.get("text")
+                    or field_value.get("number")
+                    or field_value.get("date")
+                    or field_value.get("name")
+                    or field_value.get("title")
+                )
+
+                if value is not None:
+                    fields[f"{project_name}.{field_name}"] = value
+
+        return fields if fields else None
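+
+    # Illustrative (hypothetical) shape of the mapping both fetchers return,
+    # keyed as "<project title>.<field name>":
+    #
+    #   client.fetch_project_fields("octocat", "hello-world", 42)
+    #   -> {"Roadmap.Status": "In Progress", "Roadmap.Priority": "P1"}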
+
+    def _parse_issue(
+        self, owner: str, repo: str, issue: dict[str, Any]
+    ) -> GithubIssueData:
+        """Parse raw issue data into structured format."""
+        comments = self.fetch_comments(owner, repo, issue["number"])
+        body = issue.get("body") or ""
+
+        return GithubIssueData(
+            kind="issue",
+            number=issue["number"],
+            title=issue["title"],
+            body=body,
+            state=issue["state"],
+            author=issue["user"]["login"] if issue.get("user") else "ghost",
+            labels=[label["name"] for label in issue.get("labels", [])],
+            assignees=[a["login"] for a in issue.get("assignees", [])],
+            milestone=(
+                issue["milestone"]["title"] if issue.get("milestone") else None
+            ),
+            created_at=parse_github_date(issue["created_at"]),  # type: ignore
+            closed_at=parse_github_date(issue.get("closed_at")),
+            merged_at=None,
+            github_updated_at=parse_github_date(issue["updated_at"]),  # type: ignore
+            comment_count=len(comments),
+            comments=comments,
+            diff_summary=None,
+            project_fields=None,  # Fetched separately if enabled
+            content_hash=compute_content_hash(body, comments),
+        )
+
+    def _parse_pr(
+        self, owner: str, repo: str, pr: dict[str, Any]
+    ) -> GithubIssueData:
+        """Parse raw PR data into structured format."""
+        comments = self.fetch_comments(owner, repo, pr["number"])
+        body = pr.get("body") or ""
+
+        # Get diff summary (truncated)
+        diff_summary = None
+        if diff_url := pr.get("diff_url"):
+            try:
+                diff_response = self.session.get(diff_url, timeout=30)
+                if diff_response.ok:
+                    diff_summary = diff_response.text[:5000]  # Truncate large diffs
+            except Exception as e:
+                logger.warning(f"Failed to fetch diff: {e}")
+
+        return GithubIssueData(
+            kind="pr",
+            number=pr["number"],
+            title=pr["title"],
+            body=body,
+            state=pr["state"],
+            author=pr["user"]["login"] if pr.get("user") else "ghost",
+            labels=[label["name"] for label in pr.get("labels", [])],
+            assignees=[a["login"] for a in pr.get("assignees", [])],
+            milestone=pr["milestone"]["title"] if pr.get("milestone") else None,
+            created_at=parse_github_date(pr["created_at"]),  # type: ignore
+            closed_at=parse_github_date(pr.get("closed_at")),
+            merged_at=parse_github_date(pr.get("merged_at")),
+            github_updated_at=parse_github_date(pr["updated_at"]),  # type: ignore
+            comment_count=len(comments),
+            comments=comments,
+            diff_summary=diff_summary,
+            project_fields=None,  # Fetched separately if enabled
+            content_hash=compute_content_hash(body, comments),
+        )
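+
+
+# A minimal fetch-loop sketch (hypothetical repo; assumes a valid PAT). Note
+# that each yielded item already includes its comments and content hash:
+#
+#   client = GithubClient(GithubCredentials(auth_type="pat", access_token="ghp_..."))
+#   for issue in client.fetch_issues("octocat", "hello-world", state="open"):
+#       print(issue["number"], issue["title"], issue["content_hash"][:8])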
new_data["github_updated_at"] > existing_updated: + return True + + # Check project fields changes + existing_fields = cast(dict | None, existing.project_fields) or {} + new_fields = new_data.get("project_fields") or {} + if existing_fields != new_fields: + return True + + return False + + +def _update_existing_item( + session: Any, + existing: GithubItem, + repo: GithubRepo, + issue_data: GithubIssueData, +) -> dict[str, Any]: + """Update an existing GithubItem and reindex if content changed.""" + if not _needs_reindex(existing, issue_data): + return create_task_result(existing, "unchanged") + + logger.info( + f"Content changed for {repo.repo_path}#{issue_data['number']}, reindexing" + ) + + # Delete old chunks from Qdrant + chunk_ids = [str(c.id) for c in existing.chunks if c.id] + if chunk_ids: + try: + client = qdrant.get_qdrant_client() + qdrant.delete_points(client, cast(str, existing.modality), chunk_ids) + except IOError as e: + logger.error(f"Error deleting chunks: {e}") + + # Delete chunks from database + for chunk in existing.chunks: + session.delete(chunk) + + # Update the existing item with new data + content = _build_content(issue_data) + existing.content = content # type: ignore + existing.sha256 = create_content_hash(content) # type: ignore + existing.title = issue_data["title"] # type: ignore + existing.state = issue_data["state"] # type: ignore + existing.labels = issue_data["labels"] # type: ignore + existing.assignees = issue_data["assignees"] # type: ignore + existing.milestone = issue_data["milestone"] # type: ignore + existing.closed_at = issue_data["closed_at"] # type: ignore + existing.merged_at = issue_data["merged_at"] # type: ignore + existing.github_updated_at = issue_data["github_updated_at"] # type: ignore + existing.comment_count = issue_data["comment_count"] # type: ignore + existing.diff_summary = issue_data["diff_summary"] # type: ignore + existing.content_hash = issue_data["content_hash"] # type: ignore + existing.size = len(content.encode("utf-8")) # type: ignore + + # Update project fields + project_fields = issue_data.get("project_fields") or {} + existing.project_fields = project_fields if project_fields else None # type: ignore + for key, value in project_fields.items(): + key_lower = key.lower() + if "status" in key_lower: + existing.project_status = str(value) # type: ignore + elif "priority" in key_lower: + existing.project_priority = str(value) # type: ignore + + # Update tags + repo_tags = cast(list[str], repo.tags) or [] + existing.tags = repo_tags + issue_data["labels"] # type: ignore + + session.flush() + + # Re-embed and push to Qdrant + return process_content_item(existing, session) + + +def _serialize_issue_data(data: GithubIssueData) -> dict[str, Any]: + """Serialize GithubIssueData for Celery task passing.""" + return { + **data, + "created_at": data["created_at"].isoformat() if data["created_at"] else None, + "closed_at": data["closed_at"].isoformat() if data["closed_at"] else None, + "merged_at": data["merged_at"].isoformat() if data["merged_at"] else None, + "github_updated_at": ( + data["github_updated_at"].isoformat() + if data["github_updated_at"] + else None + ), + "comments": [ + { + "id": c["id"], + "author": c["author"], + "body": c["body"], + "created_at": c["created_at"], + "updated_at": c["updated_at"], + } + for c in data["comments"] + ], + } + + +def _deserialize_issue_data(data: dict[str, Any]) -> GithubIssueData: + """Deserialize issue data from Celery task.""" + from memory.parsers.github import parse_github_date + + 
+
+
+def _deserialize_issue_data(data: dict[str, Any]) -> GithubIssueData:
+    """Deserialize issue data from a Celery task payload."""
+    from memory.parsers.github import parse_github_date
+
+    return GithubIssueData(
+        kind=data["kind"],
+        number=data["number"],
+        title=data["title"],
+        body=data["body"],
+        state=data["state"],
+        author=data["author"],
+        labels=data["labels"],
+        assignees=data["assignees"],
+        milestone=data["milestone"],
+        created_at=parse_github_date(data["created_at"]),  # type: ignore
+        closed_at=parse_github_date(data.get("closed_at")),
+        merged_at=parse_github_date(data.get("merged_at")),
+        github_updated_at=parse_github_date(data["github_updated_at"]),  # type: ignore
+        comment_count=data["comment_count"],
+        comments=data["comments"],
+        diff_summary=data.get("diff_summary"),
+        project_fields=data.get("project_fields"),
+        content_hash=data["content_hash"],
+    )
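+
+# Datetimes are ISO-serialized for the JSON Celery broker and restored here;
+# the pair should round-trip losslessly. A sketch (`issue_data` hypothetical):
+#
+#   payload = _serialize_issue_data(issue_data)
+#   assert _deserialize_issue_data(payload) == issue_data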
+
+
+@app.task(name=SYNC_GITHUB_ITEM)
+@safe_task_execution
+def sync_github_item(
+    repo_id: int,
+    issue_data_serialized: dict[str, Any],
+) -> dict[str, Any]:
+    """Sync a single GitHub issue or PR."""
+    issue_data = _deserialize_issue_data(issue_data_serialized)
+    logger.info(
+        f"Syncing {issue_data['kind']} from repo {repo_id}: #{issue_data['number']}"
+    )
+
+    with make_session() as session:
+        repo = session.get(GithubRepo, repo_id)
+        if not repo:
+            return {"status": "error", "error": "Repo not found"}
+
+        # Check for existing item
+        existing = (
+            session.query(GithubItem)
+            .filter(
+                GithubItem.repo_path == repo.repo_path,
+                GithubItem.number == issue_data["number"],
+                GithubItem.kind == issue_data["kind"],
+            )
+            .first()
+        )
+
+        if existing:
+            return _update_existing_item(session, existing, repo, issue_data)
+
+        # Create new item
+        github_item = _create_github_item(repo, issue_data)
+        return process_content_item(github_item, session)
+
+
+@app.task(name=SYNC_GITHUB_REPO)
+@safe_task_execution
+def sync_github_repo(repo_id: int, force_full: bool = False) -> dict[str, Any]:
+    """Sync all issues and PRs for a repository."""
+    logger.info(f"Syncing GitHub repo {repo_id}")
+
+    with make_session() as session:
+        repo = session.get(GithubRepo, repo_id)
+        if not repo or not cast(bool, repo.active):
+            return {"status": "error", "error": "Repo not found or inactive"}
+
+        account = repo.account
+        if not account or not cast(bool, account.active):
+            return {"status": "error", "error": "Account not found or inactive"}
+
+        now = datetime.now(timezone.utc)
+        last_sync = cast(datetime | None, repo.last_sync_at)
+        last_full_sync = cast(datetime | None, repo.last_full_sync_at)
+        full_sync_interval = cast(int, repo.full_sync_interval)
+        track_project_fields = cast(bool, repo.track_project_fields)
+
+        # Determine if we need a full sync for project fields
+        # Only do this if track_project_fields is enabled and interval > 0
+        needs_full_sync = False
+        if track_project_fields and full_sync_interval > 0:
+            if last_full_sync is None:
+                needs_full_sync = True
+            elif now - last_full_sync >= timedelta(minutes=full_sync_interval):
+                needs_full_sync = True
+
+        # Check if sync is needed based on interval
+        if last_sync and not force_full and not needs_full_sync:
+            check_interval = cast(int, repo.check_interval)
+            if now - last_sync < timedelta(minutes=check_interval):
+                return {"status": "skipped_recent_check", "repo_id": repo_id}
+
+        # Create GitHub client
+        credentials = GithubCredentials(
+            auth_type=cast(str, account.auth_type),
+            access_token=cast(str | None, account.access_token),
+            app_id=cast(int | None, account.app_id),
+            installation_id=cast(int | None, account.installation_id),
+            private_key=cast(str | None, account.private_key),
+        )
+        client = GithubClient(credentials)
+
+        owner = cast(str, repo.owner)
+        name = cast(str, repo.name)
+        labels = cast(list[str], repo.labels_filter) or None
+        state = cast(str | None, repo.state_filter) or "all"
+
+        # For full syncs triggered by full_sync_interval, only sync open issues
+        # (closed issues rarely have project field changes that matter)
+        if needs_full_sync and not force_full:
+            since = None
+            state = "open"
+            logger.info(f"Performing full sync of open issues for {repo.repo_path}")
+        elif force_full:
+            since = None
+        else:
+            since = last_sync
+
+        issues_synced = 0
+        prs_synced = 0
+        task_ids = []
+
+        # Sync issues
+        if cast(bool, repo.track_issues):
+            for issue_data in client.fetch_issues(owner, name, since, state, labels):
+                # Fetch project fields if enabled
+                if track_project_fields:
+                    issue_data["project_fields"] = client.fetch_project_fields(
+                        owner, name, issue_data["number"]
+                    )
+
+                serialized = _serialize_issue_data(issue_data)
+                task_id = sync_github_item.delay(repo.id, serialized)
+                task_ids.append(task_id.id)
+                issues_synced += 1
+
+        # Sync PRs
+        if cast(bool, repo.track_prs):
+            for pr_data in client.fetch_prs(owner, name, since, state):
+                # Fetch project fields if enabled
+                if track_project_fields:
+                    pr_data["project_fields"] = client.fetch_pr_project_fields(
+                        owner, name, pr_data["number"]
+                    )
+
+                serialized = _serialize_issue_data(pr_data)
+                task_id = sync_github_item.delay(repo.id, serialized)
+                task_ids.append(task_id.id)
+                prs_synced += 1
+
+        # Update sync timestamps
+        repo.last_sync_at = now  # type: ignore
+        if needs_full_sync or force_full:
+            repo.last_full_sync_at = now  # type: ignore
+        session.commit()
+
+        return {
+            "status": "completed",
+            "sync_type": "full" if (needs_full_sync or force_full) else "incremental",
+            "repo_id": repo_id,
+            "repo_path": repo.repo_path,
+            "issues_synced": issues_synced,
+            "prs_synced": prs_synced,
+            "task_ids": task_ids,
+        }
+
+
+@app.task(name=SYNC_ALL_GITHUB_REPOS)
+def sync_all_github_repos() -> list[dict[str, Any]]:
+    """Trigger sync for all active GitHub repos."""
+    with make_session() as session:
+        active_repos = (
+            session.query(GithubRepo)
+            .join(GithubAccount)
+            .filter(GithubRepo.active, GithubAccount.active)
+            .all()
+        )
+
+        results = [
+            {
+                "repo_id": repo.id,
+                "repo_path": repo.repo_path,
+                "task_id": sync_github_repo.delay(repo.id).id,
+            }
+            for repo in active_repos
+        ]
+        logger.info(f"Scheduled sync for {len(results)} active GitHub repos")
+        return results
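+
+
+# A minimal end-to-end sketch (hypothetical values; accounts and repos can
+# also be created through the admin views registered above):
+#
+#   with make_session() as session:
+#       account = GithubAccount(name="main", auth_type="pat", access_token="ghp_...")
+#       session.add(account)
+#       session.flush()
+#       session.add(GithubRepo(account_id=account.id, owner="octocat", name="hello-world"))
+#       session.commit()
+#
+#   sync_all_github_repos.delay()  # fans out one sync_github_repo per active repo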