github tracking

mruwnik 2025-12-23 19:11:10 +00:00
parent 0cc1d7f6c7
commit f729122754
8 changed files with 1295 additions and 0 deletions

View File

@@ -0,0 +1,182 @@
"""add github tracking
Revision ID: b7c8d9e0f1a2
Revises: a1b2c3d4e5f6
Create Date: 2025-12-23 12:00:00.000000
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision: str = "b7c8d9e0f1a2"
down_revision: Union[str, None] = "a1b2c3d4e5f6"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# Create github_accounts table
op.create_table(
"github_accounts",
sa.Column("id", sa.BigInteger(), nullable=False),
sa.Column("name", sa.Text(), nullable=False),
sa.Column("auth_type", sa.Text(), nullable=False),
sa.Column("access_token", sa.Text(), nullable=True),
sa.Column("app_id", sa.BigInteger(), nullable=True),
sa.Column("installation_id", sa.BigInteger(), nullable=True),
sa.Column("private_key", sa.Text(), nullable=True),
sa.Column("active", sa.Boolean(), server_default="true", nullable=False),
sa.Column("last_sync_at", sa.DateTime(timezone=True), nullable=True),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated_at",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.PrimaryKeyConstraint("id"),
sa.CheckConstraint("auth_type IN ('pat', 'app')"),
)
op.create_index(
"github_accounts_active_idx", "github_accounts", ["active", "last_sync_at"]
)
# Create github_repos table
op.create_table(
"github_repos",
sa.Column("id", sa.BigInteger(), nullable=False),
sa.Column("account_id", sa.BigInteger(), nullable=False),
sa.Column("owner", sa.Text(), nullable=False),
sa.Column("name", sa.Text(), nullable=False),
sa.Column("track_issues", sa.Boolean(), server_default="true", nullable=False),
sa.Column("track_prs", sa.Boolean(), server_default="true", nullable=False),
sa.Column(
"track_comments", sa.Boolean(), server_default="true", nullable=False
),
sa.Column(
"track_project_fields", sa.Boolean(), server_default="false", nullable=False
),
sa.Column(
"labels_filter", sa.ARRAY(sa.Text()), server_default="{}", nullable=False
),
sa.Column("state_filter", sa.Text(), nullable=True),
sa.Column("tags", sa.ARRAY(sa.Text()), server_default="{}", nullable=False),
sa.Column("check_interval", sa.Integer(), server_default="60", nullable=False),
sa.Column("last_sync_at", sa.DateTime(timezone=True), nullable=True),
sa.Column(
"full_sync_interval", sa.Integer(), server_default="1440", nullable=False
),
sa.Column("last_full_sync_at", sa.DateTime(timezone=True), nullable=True),
sa.Column("active", sa.Boolean(), server_default="true", nullable=False),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"updated_at",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["account_id"], ["github_accounts.id"], ondelete="CASCADE"
),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint(
"account_id", "owner", "name", name="unique_repo_per_account"
),
)
op.create_index(
"github_repos_active_idx", "github_repos", ["active", "last_sync_at"]
)
op.create_index("github_repos_owner_name_idx", "github_repos", ["owner", "name"])
# Add new columns to github_item table
op.add_column(
"github_item",
sa.Column("github_updated_at", sa.DateTime(timezone=True), nullable=True),
)
op.add_column(
"github_item",
sa.Column("content_hash", sa.Text(), nullable=True),
)
op.add_column(
"github_item",
sa.Column("repo_id", sa.BigInteger(), nullable=True),
)
op.add_column(
"github_item",
sa.Column("project_status", sa.Text(), nullable=True),
)
op.add_column(
"github_item",
sa.Column("project_priority", sa.Text(), nullable=True),
)
op.add_column(
"github_item",
sa.Column("project_fields", postgresql.JSONB(), nullable=True),
)
op.add_column(
"github_item",
sa.Column("assignees", sa.ARRAY(sa.Text()), nullable=True),
)
op.add_column(
"github_item",
sa.Column("milestone", sa.Text(), nullable=True),
)
op.add_column(
"github_item",
sa.Column("comment_count", sa.Integer(), nullable=True),
)
# Add foreign key and indexes for github_item
op.create_foreign_key(
"fk_github_item_repo",
"github_item",
"github_repos",
["repo_id"],
["id"],
ondelete="SET NULL",
)
op.create_index("gh_github_updated_at_idx", "github_item", ["github_updated_at"])
op.create_index("gh_repo_id_idx", "github_item", ["repo_id"])
def downgrade() -> None:
# Drop indexes and foreign key from github_item
op.drop_index("gh_repo_id_idx", table_name="github_item")
op.drop_index("gh_github_updated_at_idx", table_name="github_item")
op.drop_constraint("fk_github_item_repo", "github_item", type_="foreignkey")
# Drop new columns from github_item
op.drop_column("github_item", "comment_count")
op.drop_column("github_item", "milestone")
op.drop_column("github_item", "assignees")
op.drop_column("github_item", "project_fields")
op.drop_column("github_item", "project_priority")
op.drop_column("github_item", "project_status")
op.drop_column("github_item", "repo_id")
op.drop_column("github_item", "content_hash")
op.drop_column("github_item", "github_updated_at")
# Drop github_repos table
op.drop_index("github_repos_owner_name_idx", table_name="github_repos")
op.drop_index("github_repos_active_idx", table_name="github_repos")
op.drop_table("github_repos")
# Drop github_accounts table
op.drop_index("github_accounts_active_idx", table_name="github_accounts")
op.drop_table("github_accounts")

View File

@@ -14,6 +14,9 @@ from memory.common.db.models import (
BookSection,
Chunk,
Comic,
GithubAccount,
GithubItem,
GithubRepo,
MCPServer,
DiscordMessage,
EmailAccount,
@@ -337,6 +340,61 @@ class ScheduledLLMCallAdmin(ModelView, model=ScheduledLLMCall):
column_sortable_list = ["executed_at", "scheduled_time", "created_at", "updated_at"]
class GithubAccountAdmin(ModelView, model=GithubAccount):
column_list = [
"id",
"name",
"auth_type",
"active",
"last_sync_at",
"created_at",
"updated_at",
]
column_searchable_list = ["name", "id"]
# Hide sensitive columns from display
column_exclude_list = ["access_token", "private_key"]
form_excluded_columns = ["repos"]
class GithubRepoAdmin(ModelView, model=GithubRepo):
column_list = [
"id",
"account",
"owner",
"name",
"track_issues",
"track_prs",
"track_comments",
"track_project_fields",
"labels_filter",
"tags",
"check_interval",
"full_sync_interval",
"active",
"last_sync_at",
"last_full_sync_at",
"created_at",
]
column_searchable_list = ["owner", "name", "id"]
class GithubItemAdmin(ModelView, model=GithubItem):
column_list = source_columns(
GithubItem,
"kind",
"repo_path",
"number",
"title",
"state",
"author",
"labels",
"github_updated_at",
"project_status",
)
column_searchable_list = ["title", "repo_path", "author", "id", "number"]
column_sortable_list = ["github_updated_at", "created_at"]
def setup_admin(admin: Admin):
"""Add all admin views to the admin instance with OAuth protection."""
admin.add_view(SourceItemAdmin)
@@ -361,3 +419,6 @@ def setup_admin(admin: Admin):
admin.add_view(DiscordChannelAdmin)
admin.add_view(MCPServerAdmin)
admin.add_view(ScheduledLLMCallAdmin)
admin.add_view(GithubAccountAdmin)
admin.add_view(GithubRepoAdmin)
admin.add_view(GithubItemAdmin)

View File

@@ -14,6 +14,7 @@ OBSERVATIONS_ROOT = "memory.workers.tasks.observations"
SCHEDULED_CALLS_ROOT = "memory.workers.tasks.scheduled_calls"
DISCORD_ROOT = "memory.workers.tasks.discord"
BACKUP_ROOT = "memory.workers.tasks.backup"
GITHUB_ROOT = "memory.workers.tasks.github"
ADD_DISCORD_MESSAGE = f"{DISCORD_ROOT}.add_discord_message"
EDIT_DISCORD_MESSAGE = f"{DISCORD_ROOT}.edit_discord_message"
PROCESS_DISCORD_MESSAGE = f"{DISCORD_ROOT}.process_discord_message"
@@ -60,6 +61,11 @@ RUN_SCHEDULED_CALLS = f"{SCHEDULED_CALLS_ROOT}.run_scheduled_calls"
BACKUP_PATH = f"{BACKUP_ROOT}.backup_path"
BACKUP_ALL = f"{BACKUP_ROOT}.backup_all"
# GitHub tasks
SYNC_GITHUB_REPO = f"{GITHUB_ROOT}.sync_github_repo"
SYNC_ALL_GITHUB_REPOS = f"{GITHUB_ROOT}.sync_all_github_repos"
SYNC_GITHUB_ITEM = f"{GITHUB_ROOT}.sync_github_item"
def get_broker_url() -> str:
protocol = settings.CELERY_BROKER_TYPE
@@ -115,6 +121,7 @@ app.conf.update(
"queue": f"{settings.CELERY_QUEUE_PREFIX}-scheduler"
},
f"{BACKUP_ROOT}.*": {"queue": f"{settings.CELERY_QUEUE_PREFIX}-backup"},
f"{GITHUB_ROOT}.*": {"queue": f"{settings.CELERY_QUEUE_PREFIX}-github"},
},
)
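
The route above only pins GitHub tasks to a dedicated queue; nothing in this file schedules them. A hypothetical beat entry that would fan syncs out periodically (not added by this commit; the cadence is an assumption) could look like:

from celery.schedules import crontab

app.conf.beat_schedule = {
    "sync-all-github-repos": {
        "task": SYNC_ALL_GITHUB_REPOS,
        "schedule": crontab(minute="*/30"),  # assumed cadence: every 30 minutes
    },
}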

View File

@@ -50,6 +50,8 @@ from memory.common.db.models.sources import (
Book,
ArticleFeed,
EmailAccount,
GithubAccount,
GithubRepo,
)
from memory.common.db.models.users import (
User,
@@ -107,6 +109,8 @@ __all__ = [
"Book",
"ArticleFeed",
"EmailAccount",
"GithubAccount",
"GithubRepo",
"DiscordServer", "DiscordServer",
"DiscordChannel", "DiscordChannel",
"DiscordUser", "DiscordUser",

View File

@@ -824,6 +824,23 @@ class GithubItem(SourceItem):
payload = Column(JSONB)
# New fields for change detection and tracking
github_updated_at = Column(DateTime(timezone=True)) # GitHub's updated_at
content_hash = Column(Text) # Hash of body + comments for change detection
repo_id = Column(
BigInteger, ForeignKey("github_repos.id", ondelete="SET NULL"), nullable=True
)
# GitHub Projects v2 fields
project_status = Column(Text, nullable=True)
project_priority = Column(Text, nullable=True)
project_fields = Column(JSONB, nullable=True) # All project field values
# Additional tracking
assignees = Column(ARRAY(Text), nullable=True)
milestone = Column(Text, nullable=True)
comment_count = Column(Integer, nullable=True)
__mapper_args__ = {
"polymorphic_identity": "github_item",
}
@@ -833,6 +850,8 @@ class GithubItem(SourceItem):
Index("gh_repo_kind_idx", "repo_path", "kind"),
Index("gh_issue_lookup_idx", "repo_path", "kind", "number"),
Index("gh_labels_idx", "labels", postgresql_using="gin"),
Index("gh_github_updated_at_idx", "github_updated_at"),
Index("gh_repo_id_idx", "repo_id"),
)

View File

@@ -8,12 +8,14 @@ from sqlalchemy import (
ARRAY,
BigInteger,
Boolean,
CheckConstraint,
Column,
DateTime,
ForeignKey,
Index,
Integer,
Text,
UniqueConstraint,
func,
)
from sqlalchemy.dialects.postgresql import JSONB
@@ -125,3 +127,102 @@ class EmailAccount(Base):
Index("email_accounts_active_idx", "active", "last_sync_at"),
Index("email_accounts_tags_idx", "tags", postgresql_using="gin"),
)
class GithubAccount(Base):
"""GitHub authentication credentials for API access."""
__tablename__ = "github_accounts"
id = Column(BigInteger, primary_key=True)
name = Column(Text, nullable=False) # Display name
# Authentication - support both PAT and GitHub App
auth_type = Column(Text, nullable=False) # 'pat' or 'app'
# For Personal Access Token auth
access_token = Column(Text, nullable=True) # PAT
# For GitHub App auth
app_id = Column(BigInteger, nullable=True)
installation_id = Column(BigInteger, nullable=True)
private_key = Column(Text, nullable=True) # PEM key
# Status
active = Column(Boolean, nullable=False, server_default="true")
last_sync_at = Column(DateTime(timezone=True), nullable=True)
created_at = Column(
DateTime(timezone=True), nullable=False, server_default=func.now()
)
updated_at = Column(
DateTime(timezone=True), nullable=False, server_default=func.now()
)
# Relationship to repos
repos = relationship(
"GithubRepo", back_populates="account", cascade="all, delete-orphan"
)
__table_args__ = (
CheckConstraint("auth_type IN ('pat', 'app')"),
Index("github_accounts_active_idx", "active", "last_sync_at"),
)
class GithubRepo(Base):
"""Tracked GitHub repository configuration."""
__tablename__ = "github_repos"
id = Column(BigInteger, primary_key=True)
account_id = Column(
BigInteger, ForeignKey("github_accounts.id", ondelete="CASCADE"), nullable=False
)
# Repository identification
owner = Column(Text, nullable=False) # org or user
name = Column(Text, nullable=False) # repo name
# What to track
track_issues = Column(Boolean, nullable=False, server_default="true")
track_prs = Column(Boolean, nullable=False, server_default="true")
track_comments = Column(Boolean, nullable=False, server_default="true")
track_project_fields = Column(Boolean, nullable=False, server_default="false")
# Filtering
labels_filter = Column(
ARRAY(Text), nullable=False, server_default="{}"
) # Empty = all labels
state_filter = Column(Text, nullable=True) # 'open', 'closed', or None for all
# Tags to apply to all items from this repo
tags = Column(ARRAY(Text), nullable=False, server_default="{}")
# Sync configuration
check_interval = Column(Integer, nullable=False, server_default="60") # Minutes
last_sync_at = Column(DateTime(timezone=True), nullable=True)
# Full sync interval for catching project field changes (minutes, 0 = disabled)
full_sync_interval = Column(Integer, nullable=False, server_default="1440") # Daily
last_full_sync_at = Column(DateTime(timezone=True), nullable=True)
# Status
active = Column(Boolean, nullable=False, server_default="true")
created_at = Column(
DateTime(timezone=True), nullable=False, server_default=func.now()
)
updated_at = Column(
DateTime(timezone=True), nullable=False, server_default=func.now()
)
# Relationships
account = relationship("GithubAccount", back_populates="repos")
__table_args__ = (
UniqueConstraint("account_id", "owner", "name", name="unique_repo_per_account"),
Index("github_repos_active_idx", "active", "last_sync_at"),
Index("github_repos_owner_name_idx", "owner", "name"),
)
@property
def repo_path(self) -> str:
return f"{self.owner}/{self.name}"

View File

@@ -0,0 +1,531 @@
"""GitHub API client for fetching issues, PRs, comments, and project fields."""
import hashlib
import logging
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Generator, TypedDict
import requests
logger = logging.getLogger(__name__)
# GitHub REST API base URL
GITHUB_API_URL = "https://api.github.com"
GITHUB_GRAPHQL_URL = "https://api.github.com/graphql"
# Rate limit handling
RATE_LIMIT_REMAINING_HEADER = "X-RateLimit-Remaining"
RATE_LIMIT_RESET_HEADER = "X-RateLimit-Reset"
MIN_RATE_LIMIT_REMAINING = 10
@dataclass
class GithubCredentials:
"""Credentials for GitHub API access."""
auth_type: str # 'pat' or 'app'
access_token: str | None = None
app_id: int | None = None
installation_id: int | None = None
private_key: str | None = None
class GithubComment(TypedDict):
"""A comment on an issue or PR."""
id: int
author: str
body: str
created_at: str
updated_at: str
class GithubIssueData(TypedDict):
"""Parsed issue/PR data ready for storage."""
kind: str # 'issue' or 'pr'
number: int
title: str
body: str
state: str
author: str
labels: list[str]
assignees: list[str]
milestone: str | None
created_at: datetime
closed_at: datetime | None
merged_at: datetime | None # PRs only
github_updated_at: datetime
comment_count: int
comments: list[GithubComment]
diff_summary: str | None # PRs only
project_fields: dict[str, Any] | None
content_hash: str
def parse_github_date(date_str: str | None) -> datetime | None:
"""Parse ISO date string from GitHub API to datetime."""
if not date_str:
return None
# GitHub uses ISO format with Z suffix
return datetime.fromisoformat(date_str.replace("Z", "+00:00"))
def compute_content_hash(body: str, comments: list[GithubComment]) -> str:
"""Compute SHA256 hash of issue/PR content for change detection."""
content_parts = [body or ""]
for comment in comments:
content_parts.append(comment["body"])
return hashlib.sha256("\n".join(content_parts).encode()).hexdigest()
class GithubClient:
"""Client for GitHub REST and GraphQL APIs."""
def __init__(self, credentials: GithubCredentials):
self.credentials = credentials
self.session = requests.Session()
self._setup_auth()
def _setup_auth(self) -> None:
if self.credentials.auth_type == "pat":
self.session.headers["Authorization"] = (
f"Bearer {self.credentials.access_token}"
)
elif self.credentials.auth_type == "app":
# Generate JWT and get installation token
token = self._get_installation_token()
self.session.headers["Authorization"] = f"Bearer {token}"
self.session.headers["Accept"] = "application/vnd.github+json"
self.session.headers["X-GitHub-Api-Version"] = "2022-11-28"
self.session.headers["User-Agent"] = "memory-kb-github-sync"
def _get_installation_token(self) -> str:
"""Get installation access token for GitHub App."""
try:
import jwt
except ImportError:
raise ImportError("PyJWT is required for GitHub App authentication")
if not self.credentials.app_id or not self.credentials.private_key:
raise ValueError("app_id and private_key required for app auth")
if not self.credentials.installation_id:
raise ValueError("installation_id required for app auth")
now = int(time.time())
payload = {
"iat": now - 60,
"exp": now + 600,
"iss": self.credentials.app_id,
}
jwt_token = jwt.encode(
payload, self.credentials.private_key, algorithm="RS256"
)
response = requests.post(
f"{GITHUB_API_URL}/app/installations/{self.credentials.installation_id}/access_tokens",
headers={
"Authorization": f"Bearer {jwt_token}",
"Accept": "application/vnd.github+json",
},
timeout=30,
)
response.raise_for_status()
return response.json()["token"]
def _handle_rate_limit(self, response: requests.Response) -> None:
"""Check rate limits and sleep if necessary."""
remaining = int(response.headers.get(RATE_LIMIT_REMAINING_HEADER, 100))
if remaining < MIN_RATE_LIMIT_REMAINING:
reset_time = int(response.headers.get(RATE_LIMIT_RESET_HEADER, 0))
sleep_time = max(reset_time - time.time(), 0) + 1
logger.warning(f"Rate limit low ({remaining}), sleeping for {sleep_time}s")
time.sleep(sleep_time)
def fetch_issues(
self,
owner: str,
repo: str,
since: datetime | None = None,
state: str = "all",
labels: list[str] | None = None,
) -> Generator[GithubIssueData, None, None]:
"""Fetch issues from a repository with pagination."""
params: dict[str, Any] = {
"state": state,
"sort": "updated",
"direction": "desc",
"per_page": 100,
}
if since:
params["since"] = since.isoformat()
if labels:
params["labels"] = ",".join(labels)
page = 1
while True:
params["page"] = page
response = self.session.get(
f"{GITHUB_API_URL}/repos/{owner}/{repo}/issues",
params=params,
timeout=30,
)
response.raise_for_status()
self._handle_rate_limit(response)
issues = response.json()
if not issues:
break
for issue in issues:
# Skip PRs (they're included in issues endpoint)
if "pull_request" in issue:
continue
yield self._parse_issue(owner, repo, issue)
page += 1
def fetch_prs(
self,
owner: str,
repo: str,
since: datetime | None = None,
state: str = "all",
) -> Generator[GithubIssueData, None, None]:
"""Fetch pull requests from a repository with pagination."""
params: dict[str, Any] = {
"state": state,
"sort": "updated",
"direction": "desc",
"per_page": 100,
}
page = 1
while True:
params["page"] = page
response = self.session.get(
f"{GITHUB_API_URL}/repos/{owner}/{repo}/pulls",
params=params,
timeout=30,
)
response.raise_for_status()
self._handle_rate_limit(response)
prs = response.json()
if not prs:
break
for pr in prs:
updated_at = parse_github_date(pr["updated_at"])
if since and updated_at and updated_at < since:
return # Stop if we've gone past our since date
yield self._parse_pr(owner, repo, pr)
page += 1
def fetch_comments(
self,
owner: str,
repo: str,
issue_number: int,
) -> list[GithubComment]:
"""Fetch all comments for an issue/PR."""
comments: list[GithubComment] = []
page = 1
while True:
response = self.session.get(
f"{GITHUB_API_URL}/repos/{owner}/{repo}/issues/{issue_number}/comments",
params={"page": page, "per_page": 100},
timeout=30,
)
response.raise_for_status()
self._handle_rate_limit(response)
page_comments = response.json()
if not page_comments:
break
comments.extend(
[
GithubComment(
id=c["id"],
author=c["user"]["login"] if c.get("user") else "ghost",
body=c.get("body", ""),
created_at=c["created_at"],
updated_at=c["updated_at"],
)
for c in page_comments
]
)
page += 1
return comments
def fetch_project_fields(
self,
owner: str,
repo: str,
issue_number: int,
) -> dict[str, Any] | None:
"""Fetch GitHub Projects v2 field values using GraphQL."""
query = """
query($owner: String!, $repo: String!, $number: Int!) {
repository(owner: $owner, name: $repo) {
issue(number: $number) {
projectItems(first: 10) {
nodes {
project { title }
fieldValues(first: 20) {
nodes {
... on ProjectV2ItemFieldTextValue {
text
field { ... on ProjectV2Field { name } }
}
... on ProjectV2ItemFieldNumberValue {
number
field { ... on ProjectV2Field { name } }
}
... on ProjectV2ItemFieldDateValue {
date
field { ... on ProjectV2Field { name } }
}
... on ProjectV2ItemFieldSingleSelectValue {
name
field { ... on ProjectV2SingleSelectField { name } }
}
... on ProjectV2ItemFieldIterationValue {
title
field { ... on ProjectV2IterationField { name } }
}
}
}
}
}
}
}
}
"""
try:
response = self.session.post(
GITHUB_GRAPHQL_URL,
json={
"query": query,
"variables": {"owner": owner, "repo": repo, "number": issue_number},
},
timeout=30,
)
response.raise_for_status()
except requests.RequestException as e:
logger.warning(f"Failed to fetch project fields: {e}")
return None
data = response.json()
if "errors" in data:
logger.warning(f"GraphQL errors: {data['errors']}")
return None
# Parse project fields
issue_data = (
data.get("data", {}).get("repository", {}).get("issue", {})
)
if not issue_data:
return None
items = issue_data.get("projectItems", {}).get("nodes", [])
if not items:
return None
fields: dict[str, Any] = {}
for item in items:
project_name = item.get("project", {}).get("title", "unknown")
for field_value in item.get("fieldValues", {}).get("nodes", []):
field_info = field_value.get("field", {})
field_name = field_info.get("name") if field_info else None
if not field_name:
continue
# Extract value based on type
value = (
field_value.get("text")
or field_value.get("number")
or field_value.get("date")
or field_value.get("name") # Single select
or field_value.get("title") # Iteration
)
if value is not None:
fields[f"{project_name}.{field_name}"] = value
return fields if fields else None
def fetch_pr_project_fields(
self,
owner: str,
repo: str,
pr_number: int,
) -> dict[str, Any] | None:
"""Fetch GitHub Projects v2 field values for a PR using GraphQL."""
query = """
query($owner: String!, $repo: String!, $number: Int!) {
repository(owner: $owner, name: $repo) {
pullRequest(number: $number) {
projectItems(first: 10) {
nodes {
project { title }
fieldValues(first: 20) {
nodes {
... on ProjectV2ItemFieldTextValue {
text
field { ... on ProjectV2Field { name } }
}
... on ProjectV2ItemFieldNumberValue {
number
field { ... on ProjectV2Field { name } }
}
... on ProjectV2ItemFieldDateValue {
date
field { ... on ProjectV2Field { name } }
}
... on ProjectV2ItemFieldSingleSelectValue {
name
field { ... on ProjectV2SingleSelectField { name } }
}
... on ProjectV2ItemFieldIterationValue {
title
field { ... on ProjectV2IterationField { name } }
}
}
}
}
}
}
}
}
"""
try:
response = self.session.post(
GITHUB_GRAPHQL_URL,
json={
"query": query,
"variables": {"owner": owner, "repo": repo, "number": pr_number},
},
timeout=30,
)
response.raise_for_status()
except requests.RequestException as e:
logger.warning(f"Failed to fetch PR project fields: {e}")
return None
data = response.json()
if "errors" in data:
logger.warning(f"GraphQL errors: {data['errors']}")
return None
# Parse project fields
pr_data = (
data.get("data", {}).get("repository", {}).get("pullRequest", {})
)
if not pr_data:
return None
items = pr_data.get("projectItems", {}).get("nodes", [])
if not items:
return None
fields: dict[str, Any] = {}
for item in items:
project_name = item.get("project", {}).get("title", "unknown")
for field_value in item.get("fieldValues", {}).get("nodes", []):
field_info = field_value.get("field", {})
field_name = field_info.get("name") if field_info else None
if not field_name:
continue
value = (
field_value.get("text")
or field_value.get("number")
or field_value.get("date")
or field_value.get("name")
or field_value.get("title")
)
if value is not None:
fields[f"{project_name}.{field_name}"] = value
return fields if fields else None
def _parse_issue(
self, owner: str, repo: str, issue: dict[str, Any]
) -> GithubIssueData:
"""Parse raw issue data into structured format."""
comments = self.fetch_comments(owner, repo, issue["number"])
body = issue.get("body") or ""
return GithubIssueData(
kind="issue",
number=issue["number"],
title=issue["title"],
body=body,
state=issue["state"],
author=issue["user"]["login"] if issue.get("user") else "ghost",
labels=[label["name"] for label in issue.get("labels", [])],
assignees=[a["login"] for a in issue.get("assignees", [])],
milestone=(
issue["milestone"]["title"] if issue.get("milestone") else None
),
created_at=parse_github_date(issue["created_at"]), # type: ignore
closed_at=parse_github_date(issue.get("closed_at")),
merged_at=None,
github_updated_at=parse_github_date(issue["updated_at"]), # type: ignore
comment_count=len(comments),
comments=comments,
diff_summary=None,
project_fields=None, # Fetched separately if enabled
content_hash=compute_content_hash(body, comments),
)
def _parse_pr(
self, owner: str, repo: str, pr: dict[str, Any]
) -> GithubIssueData:
"""Parse raw PR data into structured format."""
comments = self.fetch_comments(owner, repo, pr["number"])
body = pr.get("body") or ""
# Get diff summary (truncated)
diff_summary = None
if diff_url := pr.get("diff_url"):
try:
diff_response = self.session.get(diff_url, timeout=30)
if diff_response.ok:
diff_summary = diff_response.text[:5000] # Truncate large diffs
except Exception as e:
logger.warning(f"Failed to fetch diff: {e}")
return GithubIssueData(
kind="pr",
number=pr["number"],
title=pr["title"],
body=body,
state=pr["state"],
author=pr["user"]["login"] if pr.get("user") else "ghost",
labels=[label["name"] for label in pr.get("labels", [])],
assignees=[a["login"] for a in pr.get("assignees", [])],
milestone=pr["milestone"]["title"] if pr.get("milestone") else None,
created_at=parse_github_date(pr["created_at"]), # type: ignore
closed_at=parse_github_date(pr.get("closed_at")),
merged_at=parse_github_date(pr.get("merged_at")),
github_updated_at=parse_github_date(pr["updated_at"]), # type: ignore
comment_count=len(comments),
comments=comments,
diff_summary=diff_summary,
project_fields=None, # Fetched separately if enabled
content_hash=compute_content_hash(body, comments),
)
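
A standalone usage sketch of the client with PAT auth; the token and repository are placeholders:

creds = GithubCredentials(auth_type="pat", access_token="ghp_...")  # placeholder token
client = GithubClient(creds)
# Iterate open issues, showing the change-detection hash computed per item
for issue in client.fetch_issues("octocat", "hello-world", state="open"):
    print(issue["number"], issue["title"], issue["content_hash"][:12])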

View File

@@ -0,0 +1,390 @@
"""Celery tasks for GitHub issue/PR syncing."""
import logging
from datetime import datetime, timedelta, timezone
from typing import Any, cast
from memory.common import qdrant
from memory.common.celery_app import (
app,
SYNC_GITHUB_REPO,
SYNC_ALL_GITHUB_REPOS,
SYNC_GITHUB_ITEM,
)
from memory.common.db.connection import make_session
from memory.common.db.models import GithubItem
from memory.common.db.models.sources import GithubAccount, GithubRepo
from memory.parsers.github import (
GithubClient,
GithubCredentials,
GithubIssueData,
parse_github_date,
)
from memory.workers.tasks.content_processing import (
create_content_hash,
create_task_result,
process_content_item,
safe_task_execution,
)
logger = logging.getLogger(__name__)
def _build_content(issue_data: GithubIssueData) -> str:
"""Build searchable content from issue/PR data."""
content_parts = [f"# {issue_data['title']}", issue_data["body"]]
for comment in issue_data["comments"]:
content_parts.append(f"\n---\n**{comment['author']}**: {comment['body']}")
return "\n\n".join(content_parts)
def _create_github_item(
repo: GithubRepo,
issue_data: GithubIssueData,
) -> GithubItem:
"""Create a GithubItem from parsed issue/PR data."""
content = _build_content(issue_data)
# Extract project status/priority if available
project_fields = issue_data.get("project_fields") or {}
project_status = None
project_priority = None
for key, value in project_fields.items():
key_lower = key.lower()
if "status" in key_lower and project_status is None:
project_status = str(value)
elif "priority" in key_lower and project_priority is None:
project_priority = str(value)
repo_tags = cast(list[str], repo.tags) or []
return GithubItem(
modality="text",
sha256=create_content_hash(content),
content=content,
kind=issue_data["kind"],
repo_path=repo.repo_path,
repo_id=repo.id,
number=issue_data["number"],
title=issue_data["title"],
state=issue_data["state"],
author=issue_data["author"],
labels=issue_data["labels"],
assignees=issue_data["assignees"],
milestone=issue_data["milestone"],
created_at=issue_data["created_at"],
closed_at=issue_data["closed_at"],
merged_at=issue_data["merged_at"],
github_updated_at=issue_data["github_updated_at"],
comment_count=issue_data["comment_count"],
diff_summary=issue_data["diff_summary"],
content_hash=issue_data["content_hash"],
project_status=project_status,
project_priority=project_priority,
project_fields=project_fields if project_fields else None,
tags=repo_tags + issue_data["labels"],
size=len(content.encode("utf-8")),
mime_type="text/markdown",
)
def _needs_reindex(existing: GithubItem, new_data: GithubIssueData) -> bool:
"""Check if an existing item needs reindexing based on content changes."""
# Compare content hash
if existing.content_hash != new_data["content_hash"]:
return True
# Check if github_updated_at is newer
existing_updated = cast(datetime | None, existing.github_updated_at)
if existing_updated and new_data["github_updated_at"] > existing_updated:
return True
# Check project fields changes
existing_fields = cast(dict | None, existing.project_fields) or {}
new_fields = new_data.get("project_fields") or {}
if existing_fields != new_fields:
return True
return False
def _update_existing_item(
session: Any,
existing: GithubItem,
repo: GithubRepo,
issue_data: GithubIssueData,
) -> dict[str, Any]:
"""Update an existing GithubItem and reindex if content changed."""
if not _needs_reindex(existing, issue_data):
return create_task_result(existing, "unchanged")
logger.info(
f"Content changed for {repo.repo_path}#{issue_data['number']}, reindexing"
)
# Delete old chunks from Qdrant
chunk_ids = [str(c.id) for c in existing.chunks if c.id]
if chunk_ids:
try:
client = qdrant.get_qdrant_client()
qdrant.delete_points(client, cast(str, existing.modality), chunk_ids)
except IOError as e:
logger.error(f"Error deleting chunks: {e}")
# Delete chunks from database
for chunk in existing.chunks:
session.delete(chunk)
# Update the existing item with new data
content = _build_content(issue_data)
existing.content = content # type: ignore
existing.sha256 = create_content_hash(content) # type: ignore
existing.title = issue_data["title"] # type: ignore
existing.state = issue_data["state"] # type: ignore
existing.labels = issue_data["labels"] # type: ignore
existing.assignees = issue_data["assignees"] # type: ignore
existing.milestone = issue_data["milestone"] # type: ignore
existing.closed_at = issue_data["closed_at"] # type: ignore
existing.merged_at = issue_data["merged_at"] # type: ignore
existing.github_updated_at = issue_data["github_updated_at"] # type: ignore
existing.comment_count = issue_data["comment_count"] # type: ignore
existing.diff_summary = issue_data["diff_summary"] # type: ignore
existing.content_hash = issue_data["content_hash"] # type: ignore
existing.size = len(content.encode("utf-8")) # type: ignore
# Update project fields
project_fields = issue_data.get("project_fields") or {}
existing.project_fields = project_fields if project_fields else None # type: ignore
for key, value in project_fields.items():
key_lower = key.lower()
if "status" in key_lower:
existing.project_status = str(value) # type: ignore
elif "priority" in key_lower:
existing.project_priority = str(value) # type: ignore
# Update tags
repo_tags = cast(list[str], repo.tags) or []
existing.tags = repo_tags + issue_data["labels"] # type: ignore
session.flush()
# Re-embed and push to Qdrant
return process_content_item(existing, session)
def _serialize_issue_data(data: GithubIssueData) -> dict[str, Any]:
"""Serialize GithubIssueData for Celery task passing."""
return {
**data,
"created_at": data["created_at"].isoformat() if data["created_at"] else None,
"closed_at": data["closed_at"].isoformat() if data["closed_at"] else None,
"merged_at": data["merged_at"].isoformat() if data["merged_at"] else None,
"github_updated_at": (
data["github_updated_at"].isoformat()
if data["github_updated_at"]
else None
),
"comments": [
{
"id": c["id"],
"author": c["author"],
"body": c["body"],
"created_at": c["created_at"],
"updated_at": c["updated_at"],
}
for c in data["comments"]
],
}
def _deserialize_issue_data(data: dict[str, Any]) -> GithubIssueData:
"""Deserialize issue data from Celery task."""
return GithubIssueData(
kind=data["kind"],
number=data["number"],
title=data["title"],
body=data["body"],
state=data["state"],
author=data["author"],
labels=data["labels"],
assignees=data["assignees"],
milestone=data["milestone"],
created_at=parse_github_date(data["created_at"]), # type: ignore
closed_at=parse_github_date(data.get("closed_at")),
merged_at=parse_github_date(data.get("merged_at")),
github_updated_at=parse_github_date(data["github_updated_at"]), # type: ignore
comment_count=data["comment_count"],
comments=data["comments"],
diff_summary=data.get("diff_summary"),
project_fields=data.get("project_fields"),
content_hash=data["content_hash"],
)
@app.task(name=SYNC_GITHUB_ITEM)
@safe_task_execution
def sync_github_item(
repo_id: int,
issue_data_serialized: dict[str, Any],
) -> dict[str, Any]:
"""Sync a single GitHub issue or PR."""
issue_data = _deserialize_issue_data(issue_data_serialized)
logger.info(f"Syncing {issue_data['kind']} from repo {repo_id}: #{issue_data['number']}")
with make_session() as session:
repo = session.get(GithubRepo, repo_id)
if not repo:
return {"status": "error", "error": "Repo not found"}
# Check for existing item
existing = (
session.query(GithubItem)
.filter(
GithubItem.repo_path == repo.repo_path,
GithubItem.number == issue_data["number"],
GithubItem.kind == issue_data["kind"],
)
.first()
)
if existing:
return _update_existing_item(session, existing, repo, issue_data)
# Create new item
github_item = _create_github_item(repo, issue_data)
return process_content_item(github_item, session)
@app.task(name=SYNC_GITHUB_REPO)
@safe_task_execution
def sync_github_repo(repo_id: int, force_full: bool = False) -> dict[str, Any]:
"""Sync all issues and PRs for a repository."""
logger.info(f"Syncing GitHub repo {repo_id}")
with make_session() as session:
repo = session.get(GithubRepo, repo_id)
if not repo or not cast(bool, repo.active):
return {"status": "error", "error": "Repo not found or inactive"}
account = repo.account
if not account or not cast(bool, account.active):
return {"status": "error", "error": "Account not found or inactive"}
now = datetime.now(timezone.utc)
last_sync = cast(datetime | None, repo.last_sync_at)
last_full_sync = cast(datetime | None, repo.last_full_sync_at)
full_sync_interval = cast(int, repo.full_sync_interval)
track_project_fields = cast(bool, repo.track_project_fields)
# Determine if we need a full sync for project fields
# Only do this if track_project_fields is enabled and interval > 0
needs_full_sync = False
if track_project_fields and full_sync_interval > 0:
if last_full_sync is None:
needs_full_sync = True
elif now - last_full_sync >= timedelta(minutes=full_sync_interval):
needs_full_sync = True
# Check if sync is needed based on interval
if last_sync and not force_full and not needs_full_sync:
check_interval = cast(int, repo.check_interval)
if now - last_sync < timedelta(minutes=check_interval):
return {"status": "skipped_recent_check", "repo_id": repo_id}
# Create GitHub client
credentials = GithubCredentials(
auth_type=cast(str, account.auth_type),
access_token=cast(str | None, account.access_token),
app_id=cast(int | None, account.app_id),
installation_id=cast(int | None, account.installation_id),
private_key=cast(str | None, account.private_key),
)
client = GithubClient(credentials)
owner = cast(str, repo.owner)
name = cast(str, repo.name)
labels = cast(list[str], repo.labels_filter) or None
state = cast(str | None, repo.state_filter) or "all"
# For full syncs triggered by full_sync_interval, only sync open issues
# (closed issues rarely have project field changes that matter)
if needs_full_sync and not force_full:
since = None
state = "open"
logger.info(f"Performing full sync of open issues for {repo.repo_path}")
elif force_full:
since = None
else:
since = last_sync
issues_synced = 0
prs_synced = 0
task_ids = []
# Sync issues
if cast(bool, repo.track_issues):
for issue_data in client.fetch_issues(owner, name, since, state, labels):
# Fetch project fields if enabled
if track_project_fields:
issue_data["project_fields"] = client.fetch_project_fields(
owner, name, issue_data["number"]
)
serialized = _serialize_issue_data(issue_data)
task_id = sync_github_item.delay(repo.id, serialized)
task_ids.append(task_id.id)
issues_synced += 1
# Sync PRs
if cast(bool, repo.track_prs):
for pr_data in client.fetch_prs(owner, name, since, state):
# Fetch project fields if enabled
if track_project_fields:
pr_data["project_fields"] = client.fetch_pr_project_fields(
owner, name, pr_data["number"]
)
serialized = _serialize_issue_data(pr_data)
task_id = sync_github_item.delay(repo.id, serialized)
task_ids.append(task_id.id)
prs_synced += 1
# Update sync timestamps
repo.last_sync_at = now # type: ignore
if needs_full_sync or force_full:
repo.last_full_sync_at = now # type: ignore
session.commit()
return {
"status": "completed",
"sync_type": "full" if (needs_full_sync or force_full) else "incremental",
"repo_id": repo_id,
"repo_path": repo.repo_path,
"issues_synced": issues_synced,
"prs_synced": prs_synced,
"task_ids": task_ids,
}
@app.task(name=SYNC_ALL_GITHUB_REPOS)
def sync_all_github_repos() -> list[dict[str, Any]]:
"""Trigger sync for all active GitHub repos."""
with make_session() as session:
active_repos = (
session.query(GithubRepo)
.join(GithubAccount)
.filter(GithubRepo.active, GithubAccount.active)
.all()
)
results = [
{
"repo_id": repo.id,
"repo_path": repo.repo_path,
"task_id": sync_github_repo.delay(repo.id).id,
}
for repo in active_repos
]
logger.info(f"Scheduled sync for {len(results)} active GitHub repos")
return results
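
For ad-hoc runs the tasks can also be invoked directly, assuming a worker is consuming the github queue; the repo id is a placeholder:

result = sync_github_repo.delay(42, force_full=True)  # placeholder repo id
print(result.id)  # Celery task id for tracking
sync_all_github_repos.delay()  # or fan out over every active repo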