initial structure

Daniel O'Connell 2025-04-27 14:31:53 +02:00
commit a003ada9b7
29 changed files with 1278 additions and 0 deletions

5
.gitignore vendored Normal file

@ -0,0 +1,5 @@
.env
.DS_Store
secrets/
acme.json
__pycache__/

229
db/schema.sql Normal file

@ -0,0 +1,229 @@
/*========================================================================
Knowledge-Base schema first-run script
---------------------------------------------------------------
PostgreSQL 15+
Creates every table, index, trigger and helper in one pass
No ALTER statements or later migrations required
Enable pgcrypto for UUID helpers (safe to re-run)
========================================================================*/
-------------------------------------------------------------------------------
-- 0. EXTENSIONS
-------------------------------------------------------------------------------
CREATE EXTENSION IF NOT EXISTS pgcrypto; -- gen_random_uuid(), crypt()
-------------------------------------------------------------------------------
-- 1. CANONICAL ARTEFACT TABLE (everything points here)
-------------------------------------------------------------------------------
CREATE TABLE source_item (
id BIGSERIAL PRIMARY KEY,
modality TEXT NOT NULL, -- 'mail'|'chat'|...
sha256 BYTEA UNIQUE NOT NULL, -- 32-byte blob
inserted_at TIMESTAMPTZ DEFAULT NOW(),
tags TEXT[] NOT NULL DEFAULT '{}', -- flexible labels
lang TEXT, -- ISO-639-1 or NULL
model_hash TEXT, -- embedding model ver.
vector_ids TEXT[] NOT NULL DEFAULT '{}', -- 0-N Qdrant IDs
embed_status TEXT NOT NULL DEFAULT 'RAW'
CHECK (embed_status IN ('RAW','QUEUED','STORED','FAILED')),
byte_length INTEGER, -- original size
mime_type TEXT
);
CREATE INDEX source_modality_idx ON source_item (modality);
CREATE INDEX source_status_idx ON source_item (embed_status);
CREATE INDEX source_tags_idx ON source_item USING GIN (tags);
-- 1.a Trigger: vector_ids must be present when embed_status = STORED
CREATE OR REPLACE FUNCTION trg_vector_ids_not_empty()
RETURNS TRIGGER LANGUAGE plpgsql AS $$
BEGIN
IF NEW.embed_status = 'STORED'
AND (NEW.vector_ids IS NULL OR array_length(NEW.vector_ids,1) = 0) THEN
RAISE EXCEPTION
USING MESSAGE = 'vector_ids must not be empty when embed_status = STORED';
END IF;
RETURN NEW;
END;
$$;
CREATE TRIGGER check_vector_ids
BEFORE UPDATE ON source_item
FOR EACH ROW EXECUTE FUNCTION trg_vector_ids_not_empty();
-------------------------------------------------------------------------------
-- 2. MAIL MESSAGES
-------------------------------------------------------------------------------
CREATE TABLE mail_message (
id BIGSERIAL PRIMARY KEY,
source_id BIGINT NOT NULL REFERENCES source_item ON DELETE CASCADE,
message_id TEXT UNIQUE,
subject TEXT,
sender TEXT,
recipients TEXT[],
sent_at TIMESTAMPTZ,
body_raw TEXT,
attachments JSONB
);
CREATE INDEX mail_sent_idx ON mail_message (sent_at);
CREATE INDEX mail_recipients_idx ON mail_message USING GIN (recipients);
ALTER TABLE mail_message
ADD COLUMN tsv tsvector
GENERATED ALWAYS AS (
to_tsvector('english',
coalesce(subject,'') || ' ' || coalesce(body_raw,'')))
STORED;
CREATE INDEX mail_tsv_idx ON mail_message USING GIN (tsv);
-------------------------------------------------------------------------------
-- 3. CHAT (Slack / Discord)
-------------------------------------------------------------------------------
CREATE TABLE chat_message (
id BIGSERIAL PRIMARY KEY,
source_id BIGINT NOT NULL REFERENCES source_item ON DELETE CASCADE,
platform TEXT CHECK (platform IN ('slack','discord')),
channel_id TEXT,
author TEXT,
sent_at TIMESTAMPTZ,
body_raw TEXT
);
CREATE INDEX chat_channel_idx ON chat_message (platform, channel_id);
-------------------------------------------------------------------------------
-- 4. GIT COMMITS (local repos)
-------------------------------------------------------------------------------
CREATE TABLE git_commit (
id BIGSERIAL PRIMARY KEY,
source_id BIGINT NOT NULL REFERENCES source_item ON DELETE CASCADE,
repo_path TEXT,
commit_sha TEXT UNIQUE,
author_name TEXT,
author_email TEXT,
author_date TIMESTAMPTZ,
msg_raw TEXT,
diff_summary TEXT,
files_changed TEXT[]
);
CREATE INDEX git_files_idx ON git_commit USING GIN (files_changed);
CREATE INDEX git_date_idx ON git_commit (author_date);
-------------------------------------------------------------------------------
-- 5. PHOTOS
-------------------------------------------------------------------------------
CREATE TABLE photo (
id BIGSERIAL PRIMARY KEY,
source_id BIGINT NOT NULL REFERENCES source_item ON DELETE CASCADE,
file_path TEXT,
exif_taken_at TIMESTAMPTZ,
exif_lat NUMERIC(9,6),
exif_lon NUMERIC(9,6),
camera_make TEXT,
camera_model TEXT
);
CREATE INDEX photo_taken_idx ON photo (exif_taken_at);
-------------------------------------------------------------------------------
-- 6. BOOKS, BLOG POSTS, MISC DOCS
-------------------------------------------------------------------------------
CREATE TABLE book_doc (
id BIGSERIAL PRIMARY KEY,
source_id BIGINT NOT NULL REFERENCES source_item ON DELETE CASCADE,
title TEXT,
author TEXT,
chapter TEXT,
published DATE
);
CREATE TABLE blog_post (
id BIGSERIAL PRIMARY KEY,
source_id BIGINT NOT NULL REFERENCES source_item ON DELETE CASCADE,
url TEXT UNIQUE,
title TEXT,
published TIMESTAMPTZ
);
CREATE TABLE misc_doc (
id BIGSERIAL PRIMARY KEY,
source_id BIGINT NOT NULL REFERENCES source_item ON DELETE CASCADE,
path TEXT,
mime_type TEXT
);
-------------------------------------------------------------------------------
-- 6.5 RSS FEEDS
-------------------------------------------------------------------------------
CREATE TABLE rss_feeds (
id BIGSERIAL PRIMARY KEY,
url TEXT UNIQUE NOT NULL,
title TEXT,
description TEXT,
tags TEXT[] NOT NULL DEFAULT '{}',
last_checked_at TIMESTAMPTZ,
active BOOLEAN NOT NULL DEFAULT TRUE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX rss_feeds_active_idx ON rss_feeds (active, last_checked_at);
CREATE INDEX rss_feeds_tags_idx ON rss_feeds USING GIN (tags);
-------------------------------------------------------------------------------
-- 7. GITHUB ITEMS (issues, PRs, comments, project cards)
-------------------------------------------------------------------------------
CREATE TYPE gh_item_kind AS ENUM ('issue','pr','comment','project_card');
CREATE TABLE github_item (
id BIGSERIAL PRIMARY KEY,
source_id BIGINT NOT NULL REFERENCES source_item ON DELETE CASCADE,
kind gh_item_kind NOT NULL,
repo_path TEXT NOT NULL, -- "owner/repo"
number INTEGER, -- issue/PR number (NULL for commit comment)
parent_number INTEGER, -- comment → its issue/PR
commit_sha TEXT, -- for commit comments
state TEXT, -- 'open'|'closed'|'merged'
title TEXT,
body_raw TEXT,
labels TEXT[],
author TEXT,
created_at TIMESTAMPTZ,
closed_at TIMESTAMPTZ,
merged_at TIMESTAMPTZ,
diff_summary TEXT, -- PR only
payload JSONB -- extra GitHub fields
);
CREATE INDEX gh_repo_kind_idx ON github_item (repo_path, kind);
CREATE INDEX gh_issue_lookup_idx ON github_item (repo_path, kind, number);
CREATE INDEX gh_labels_idx ON github_item USING GIN (labels);
CREATE INDEX gh_tsv_idx ON github_item
    USING GIN (to_tsvector('english',
               coalesce(title,'') || ' ' || coalesce(body_raw,'')))
    WHERE kind IN ('issue','pr');
-------------------------------------------------------------------------------
-- 8. HELPER FUNCTION: add_tags
-------------------------------------------------------------------------------
CREATE OR REPLACE FUNCTION add_tags(p_source BIGINT, p_tags TEXT[])
RETURNS VOID LANGUAGE SQL AS $$
UPDATE source_item
SET tags =
(SELECT ARRAY(SELECT DISTINCT unnest(tags || p_tags)))
WHERE id = p_source;
$$;
-------------------------------------------------------------------------------
-- 9. (optional) PARTITION STUBS: create per-year partitions later
-------------------------------------------------------------------------------
/*
-- example:
CREATE TABLE mail_message_2026 PARTITION OF mail_message
FOR VALUES FROM ('2026-01-01') TO ('2027-01-01');
*/
-- =========================================================================
-- Schema creation complete
-- =========================================================================

401
design/plan.md Normal file

@ -0,0 +1,401 @@
# Personal Multimodal Knowledge-Base — Design Document v1.0
*(self-hosted, privacy-first)*
---
## 1 Purpose
Build a single private system that lets **one user** ask natural-language questions (text or image) and instantly search:
* E-mail (IMAP or local mbox)
* Slack & Discord messages
* Git commit history
* Photos (≈ 200 GB)
* Books (EPUB/PDF), important blog posts / RSS items
* Misc. documents & meeting transcripts
…and receive answers that combine the most relevant passages, pictures and commits.
---
## 2 High-level Architecture
```
                    ┌──────────────────────────┐
     Internet  →    │   Ingestion Workers      │
                    │   (Celery queues)        │
                    │   mail / chat / git / ...│
                    └─────┬──────────┬─────────┘
                          │TEXT      │IMAGE/ATT
                          ▼          ▼
                    ┌──────────┐  ┌──────────┐
                    │Embedding │  │OCR /     │
                    │Workers   │  │Vision    │
                    └────┬─────┘  └────┬─────┘
                         │ vectors     │ captions/tags
                         ▼             ▼
        Postgres 15 (canonical) ◄───► Qdrant 1.9 (vectors)
          • raw bodies / metadata        • per-modality collections
          • tags[] array, GIN index      • payload filter index
          • LISTEN/NOTIFY queue
                         ▲                     ▲
                         │                     │
                         └──────────┬──────────┘
                             FastAPI “/chat”
                             (router + merge)
                             + LangChain agent
                             + GPT-4o or local LLaVA
```
*Everything runs in Docker-Compose on a low-power x86 mini-PC (NUC 11/12; 16 GB RAM, 1 TB NVMe).*

---

# Personal Multimodal Knowledge-Base — **Design Document v1.1**
*(self-hosted, privacy-first; incorporates external feedback except the “LLaVA-speed” concern, which is intentionally ignored)*
---
## 1 Purpose
Provide a **single-user** system that answers natural-language questions about the owner's entire digital corpus—e-mails, chats, code history, photos, books, blog posts, RSS items and ad-hoc documents—while keeping all data fully under personal control.
---
## 2 Target Workload & Service-Levels
| Metric | Year-1 | 5-year |
|--------|--------|--------|
| Text artefacts | ≈ 5 M | ≈ 25 M |
| Photos | ≈ 200 k (≈ 200 GB) | ≈ 600 k (≈ 600 GB) |
| Concurrency | 1 interactive seat + background jobs |
| **p95 answer latency** | ≤ 2 s (GPT-4o) |
| Uptime goal | “Home-lab” best-effort, but automatic recovery from single-component failures |
---
## 3 Hardware Specification ‼ BREAKING
| Component | Spec | Notes |
|-----------|------|-------|
| CPU | 8-core / 16-thread (NUC 13 Pro i7 or similar) |
| **RAM** | **32 GB ECC** |
| **GPU** | **Low-profile RTX A2000 (6 GB)** — accelerates CLIP & local LLaVA |
| Storage | 2 TB NVMe (data) + 2 TB SATA SSD (offline backup/ZFS snapshot target) |
| Power | ≈ 10 W idle, 55 W peak |
---
## 4 Software Stack
| Layer | Tech |
|-------|------|
| OS | Ubuntu 22.04 LTS (automatic security updates) |
| Container runtime | Docker 24 + docker-compose v2 |
| **Message broker** | RabbitMQ 3.13 (priority queues, DLQ) |
| Database | PostgreSQL 15 |
| Vector DB | Qdrant 1.9 |
| Task engine | Celery 5 (broker = RabbitMQ, result-backend = Postgres) |
| Web/API | FastAPI + Uvicorn |
| Back-end LLMs | GPT-4o (API) **and** optional on-device LLaVA-1.6-Q4 (GPU) |
| Embeddings | OpenAI *text-embedding-3-small* (1536 d) • OpenCLIP ViT-B/32 (512 d) |
---
## 5 Data Sources & Ingestion Queues
| Source | Trigger | Parser | Default **tags[]** |
|--------|---------|--------|--------------------|
| **E-mail IMAP** | UID poll 10 min | `imap_tools` | `work` if address ends `@corp.com` |
| **Slack** | Socket-mode WS | `slack_sdk` | `work` on `#proj-*` |
| **Discord** | Gateway WS | `discord.py` | `personal` |
| **Git** | `post-receive` hook / hourly fetch | `GitPython` + LLM diff summary | `work` if remote host in allow-list |
| **Photos** | `watchdog` folder | `Pillow`, EXIF; CLIP embed; FaceNet | `personal` unless GPS in office polygon |
| **Books (EPUB/PDF)** | Nightly folder scan | `ebooklib`, `pdfminer`, OCR | `reference` |
| **Blog / RSS** | `feedparser` 30 min | `trafilatura` | `reference` |
| **Misc docs / transcripts** | `watchdog` inbox | PDF→OCR, DOCX→txt, VTT→txt | deduced from path |
---
## 6 Data Model
### 6.1 PostgreSQL (tables share columns)
```sql
id bigserial primary key,
sha256 bytea unique,
inserted_at timestamptz default now(),
tags text[] not null default '{}', -- flexible labelling
lang text, -- detected language
body_raw text, -- TOAST/LZ4
vector_ids text[], -- 0-N vectors in Qdrant
model_hash text -- hash of embedding model
```
*GIN index on `tags`; range or JSONB indexes where relevant.*
### 6.2 Qdrant (collections)
| Collection | Model | Dim |
|------------|-------|-----|
| `mail`, `chat`, `git`, `book`, `blog`, `doc` | *text-embedding-3-small* | 1536 |
| `photo` | OpenCLIP ViT-B/32 | 512 |
Payload fields: `tags`, per-domain metadata (EXIF, author, files_changed[] …).
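A one-off bootstrap of these collections could look roughly like the sketch below (qdrant-client; collection names and dimensions come from the table above, the URL is the compose-internal default):
```python
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams

client = QdrantClient(url="http://qdrant:6333")

# 1536-d text collections and the 512-d photo collection, cosine distance throughout
for name in ["mail", "chat", "git", "book", "blog", "doc"]:
    client.recreate_collection(
        collection_name=name,
        vectors_config=VectorParams(size=1536, distance=Distance.COSINE),
    )
client.recreate_collection(
    collection_name="photo",
    vectors_config=VectorParams(size=512, distance=Distance.COSINE),
)
```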
---
## 7 Task Queues & Concurrency
| Celery queue | Priority | Concurrency | Typical load |
|--------------|----------|-------------|--------------|
| `interactive` | 9 | auto (1 per core) | query embedding + GPT-4o calls |
| `medium_embed` | 5 | 4 | mail/chat embeddings |
| `low_ocr` | 2 | **≤ physical cores 2** | PDF/image OCR |
| `photo_embed_gpu` | 5 | GPU | CLIP image vectors |
| `git_summary` | 4 | 2 | LLM diff summaries |
All queues have a DLQ routed to the `failed_tasks` exchange (RabbitMQ).
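A sketch of how that topology might be declared for Celery with kombu (queue names and priorities mirror the table; the dead-letter wiring uses RabbitMQ's standard `x-dead-letter-exchange` argument):
```python
from kombu import Exchange, Queue
from memory.workers.celery_app import app

def dlq_queue(name: str, priority: int) -> Queue:
    # every queue dead-letters rejected/expired messages to the failed_tasks exchange
    return Queue(
        name,
        Exchange(name),
        routing_key=name,
        queue_arguments={
            "x-max-priority": priority,
            "x-dead-letter-exchange": "failed_tasks",
        },
    )

app.conf.task_queues = [
    dlq_queue("interactive", 9),
    dlq_queue("medium_embed", 5),
    dlq_queue("low_ocr", 2),
    dlq_queue("photo_embed_gpu", 5),
    dlq_queue("git_summary", 4),
]
```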
---
## 8 Vector Consistency & Repair
* **Up-front write:** worker inserts into Postgres, then Qdrant; the returned `vector_id` is stored in `vector_ids[]`.
* **Audit Cron (5 min):**
* Find rows where `vector_ids = '{}'` or with `model_hash ≠ CURRENT_HASH`.
* Re-enqueue to appropriate embed queue.
* **Qdrant-centric diff (hourly):** dump collection IDs → compare against Postgres; orphans are deleted, missing vectors are re-enqueued.
* **Disaster Re-build:** documented script streams `id,chunk_text` to embed queue (rate-limited).
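The audit step could be as small as the sketch below (the SQL mirrors the rule above; `CURRENT_HASH` and `enqueue_embed` are placeholders, not part of the committed code):
```python
from sqlalchemy import text
from memory.common.db import get_scoped_session

CURRENT_HASH = "<sha256 of the active embedding model>"  # placeholder

def audit_vectors(enqueue_embed) -> int:
    """Re-queue rows whose vectors are missing or built with a stale model."""
    db = get_scoped_session()
    try:
        rows = db.execute(
            text("""
                SELECT id, modality FROM source_item
                WHERE vector_ids = '{}' OR model_hash IS DISTINCT FROM :h
            """),
            {"h": CURRENT_HASH},
        ).fetchall()
        for row in rows:
            enqueue_embed(row.id, row.modality)  # push back onto the right embed queue
        return len(rows)
    finally:
        db.close()
```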
---
## 9 Embedding-Model Versioning
* Compute `MODEL_HASH = sha256(model_name + version + weights_SHA)` at worker start.
* Model change → hashes differ → audit cron flags rows → background re-embed queue.
* Router refuses to mix hashes unless `ALLOW_MIXED_MODE=1`.
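In code the hash is a one-liner; the weights digest is whatever identifier the provider exposes (assumed here):
```python
import hashlib

def model_hash(model_name: str, version: str, weights_sha: str) -> str:
    # any change to name, version or weights yields a new hash and forces re-embedding
    return hashlib.sha256(f"{model_name}:{version}:{weights_sha}".encode()).hexdigest()

MODEL_HASH = model_hash("text-embedding-3-small", "2024-01", "<weights digest>")
```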
---
## 10 Security Hardening
1. **JWT auth** on all API routes (HS256 secret in Docker secret store).
2. **Rate limiter** (`slowapi`): 60 req / min / IP.
3. **Filesystem isolation**
* Containers run as UID 1000, read-only bind mounts for `/photos`, `/books`.
4. **TLS everywhere** (Traefik + Let's Encrypt on LAN or Tailscale certs).
5. **Input sanitisation**: Markdown-escape bodies; regex filter for SSNs/credit-card patterns before LLM prompt.
6. **Resource quotas** in compose (`mem_limit`, `pids_limit`).
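A minimal FastAPI guard for point 1 might look like this (python-jose is already pinned in `requirements-api.txt`; the claim layout and secret path are assumptions):
```python
import os

from fastapi import Depends, HTTPException
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt

bearer = HTTPBearer()

def jwt_secret() -> str:
    # HS256 secret mounted as a Docker secret
    with open(os.getenv("JWT_SECRET_FILE", "/run/secrets/jwt_secret")) as f:
        return f.read().strip()

def require_user(creds: HTTPAuthorizationCredentials = Depends(bearer)) -> dict:
    try:
        return jwt.decode(creds.credentials, jwt_secret(), algorithms=["HS256"])
    except JWTError:
        raise HTTPException(status_code=401, detail="Invalid or missing token")
```
Routes would then declare `user: dict = Depends(require_user)`.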
---
## 11 Backup & Restore
| Layer | Tool | Frequency | Storage cost (Glacier DA) |
|-------|------|-----------|---------------------------|
| Postgres basebackup + WAL | `pgBackRest` | nightly | included in dataset |
| Qdrant | `qdrant-backup` tar of collection dir | nightly | vectors + graph ≈ 20 GB year-5 |
| Files / attachments | Restic dedup | nightly | 400 GB -> ~€1.2 / mo |
**Grandfather-father-son** pruning (`7-4-6`).
Restore script: ① create fresh volumes, ② `pgbackrest restore`, ③ `qdrant-restore`, ④ run audit cron to verify.
---
## 12 Monitoring & Alerting
* **Prometheus exporters**
* node-exporter, postgres-exporter, rabbitmq-exporter, qdrant-exporter, cadvisor.
* **Grafana dashboards**: CPU, RAM, queue depth, DLQ count, GPT-4o latency.
* **Alertmanager rules**
* `vector_audit_missing > 500` → warn
* `node_filesystem_free_percent < 15` → critical
* `rabbitmq_queue_messages{queue="failed_tasks"} > 0` → critical
* `pg_up == 0` → critical
---
## 13 Query Flow
1. **Embed** user text with *text-embedding-3* and CLIP-text (one call each).
2. **Determine scope** from conversation memory (`tags = 'work'` etc.).
3. **Async search** each relevant collection (max 3 per batch) with payload filter.
4. **Merge** top-k by score.
5. Build **evidence JSON** (snippets, thumbnails, commit summaries).
6. **LLM**
* default: GPT-4o (vision) via API
* offline mode: local LLaVA-1.6 Q4 on GPU
7. Stream answer + thumbnails.
*Expected p95 latency on spec hardware: **~2 s** (cloud) | **~1.4 s** (local LLaVA).*
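Steps 2-3 in miniature, using qdrant-client (parameter and collection names are illustrative):
```python
from qdrant_client import QdrantClient
from qdrant_client.models import FieldCondition, Filter, MatchValue

def scoped_search(client: QdrantClient, collection: str,
                  query_vector: list[float], tag: str, k: int = 8):
    # restrict the ANN search to artefacts carrying the conversation's active tag
    scope = Filter(must=[FieldCondition(key="tags", match=MatchValue(value=tag))])
    return client.search(
        collection_name=collection,
        query_vector=query_vector,
        query_filter=scope,
        limit=k,
    )
```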
---
## 14 Internationalisation
* Tesseract language packs specified via `OCR_LANGS=eng+pol+deu`.
* `langdetect` sets `lang` column; router boosts same-language chunks.
---
## 15 Road-map
| Phase | Milestones |
|-------|------------|
| **0** | Hardware build, RabbitMQ, base compose up |
| **1** | Mail, chat, photo, git ingestion & audit loop |
| **2** | Backup scripts, security hardening, monitoring |
| **3** | Books/RSS/misc docs + international OCR |
| **4** | Tag-based multi-user RLS (optional) |
---
*End of Design Document v1.1*
---
## 3 Data Sources & Ingestion
| Source | Trigger | Parser / Notes | Stored **tags** (default rules) |
|--------|---------|----------------|---------------------------------|
| **E-mail** (IMAP, mbox) | UID poll 10 min | `imap_tools`, strip quotes | `work` if address ends “@corp.com” |
| **Slack** | Socket-mode WS | `slack_sdk`, flatten blocks | `work` if channel `#proj-*` |
| **Discord** | Gateway WS | `discord.py`, role IDs | `personal` otherwise |
| **Git commits** | `post-receive` hook or hourly fetch | `GitPython` → diff; 3-sentence summary via LLM | `work` if remote in `github.com/corp` |
| **Photos** | `watchdog` on folder | `Pillow`, EXIF; CLIP embed; FaceNet & optional YOLO tagger | `personal` unless GPS inside office |
| **Books** (EPUB/PDF) | Nightly scan of `/books` | `ebooklib` / `pdfminer` (+OCR) | `reference` |
| **Blog / RSS** | `feedparser` every 30 min | `trafilatura` HTML clean | `reference` |
| **Misc. docs / transcripts** | `watchdog` on `/kb-inbox` | PDF→OCR, DOCX→txt, VTT/SRT stitch | inferred from path (`/work/` etc.) |
---
## 4 Storage Model
### 4.1 PostgreSQL (system-of-record)
* Base tables: `mail_msg`, `chat_msg`, `git_commit`, `photo`, `book_doc`, `blog_post`, `misc_doc`, `attachment`.
* Common columns: `id bigserial`, `sha256 bytea`, `inserted_at timestamptz`, `tags text[] NOT NULL DEFAULT '{}'`, `vector_ids text[]`.
* All large bodies are **TOAST/LZ4** compressed; photos/attachments > 5 MB stay on disk with a path pointer.
* GIN indexes on `tags` for millisecond filtering.
* LISTEN/NOTIFY drives Celery dispatch, so Redis (Celery's default broker) is not strictly required.
### 4.2 Qdrant (similarity index)
| Collection | Model | Dim | Distance | Extra payload |
|------------|-------|-----|----------|---------------|
| `mail` | `text-embedding-3-small` | 1536 | Cosine | `tags`, `folder`, `from` |
| `chat` | same | 1536 | Cosine | `channel_id`, `platform` |
| `git` | same | 1536 | Cosine | `files_changed[]`, `author`, `tags` |
| `photo` | OpenCLIP ViT-B/32 | 512 | Cosine | `exif_date`, `face_id`, `tags` |
| `book`, `blog`, `doc` | same | 1536 | Cosine | `title`, `source_url`, `tags` |
---
## 5 Workers & Queues
| Queue | Concurrency | Task | Key libs |
|-------|-------------|------|----------|
| `text` | 4 CPU | Chunk + embed text | OpenAI Python SDK |
| `image` | 2 CPU / GPU | Embed photo (CLIP) | `open_clip_torch` |
| `ocr` | 8 CPU | OCR PDF/image | `ocrmypdf`, `tesseract-ocr` |
| `git` | 2 CPU | Diff-summary → embed | GPT-4o mini or Phi-3-mini |
| `rss` | 1 CPU | Fetch feed, parse article | `feedparser`, `trafilatura` |
| `docs` | 2 CPU | Misc file parsing | `pdfminer`, `python-docx` |
Every queue auto-retries 3× with exponential back-off.
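Per task, that policy is just decorator options; a hedged sketch (task name and body are placeholders):
```python
from memory.workers.celery_app import app

@app.task(
    name="kb.text.embed_chunk",          # hypothetical task name
    autoretry_for=(Exception,),
    retry_backoff=True,                  # 1 s, 2 s, 4 s, ...
    retry_kwargs={"max_retries": 3},
    acks_late=True,
)
def embed_chunk(source_id: int, chunk: str) -> None:
    ...  # chunk -> OpenAI embedding -> Qdrant upsert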
---
## 6 Tagging Framework
* YAML rule file; fields `sender_regex`, `path_regex`, `channel_regex`, `gps_polygon`, `add_tags[]`.
* Workers call `apply_tags()` before inserting into Postgres/Qdrant.
* CLI utility `retag add/remove <tag> (--where …)` for bulk fixes.
* Tags are free-form strings; new tags require **no schema or index change** — Qdrant builds a bitmap index on first use.
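`apply_tags()` could be as simple as the sketch below (PyYAML assumed; the rule-file path is an assumption and the `gps_polygon` check is omitted):
```python
import re
import yaml

with open("tag_rules.yaml") as f:        # assumed rule-file location
    RULES = yaml.safe_load(f) or []

def apply_tags(meta: dict) -> list[str]:
    """Return the artefact's tags after applying every matching rule."""
    tags: set[str] = set(meta.get("tags", []))
    for rule in RULES:
        for field, key in (("sender", "sender_regex"),
                           ("path", "path_regex"),
                           ("channel", "channel_regex")):
            pattern = rule.get(key)
            if pattern and re.search(pattern, meta.get(field, "")):
                tags.update(rule.get("add_tags", []))
    return sorted(tags)
```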
---
## 7 Query & Chat Flow
1. **Router** embeds user text with
* CLIP-text → hits `photo`
* text-embed-3 → hits all text collections.
2. Applies a user- or conversation-scoped filter, e.g. `{"tags":{"value":"work"}}`.
3. Parallel search (async) → merge top-k by score.
4. Build “evidence bundle” (snippets, thumbs, commit msgs).
5. Feed bundle + question to LLM:
* cloud GPT-4o (vision) **or**
* local LLaVA-1.6 + captions.
6. Stream answer & thumbnails back.
Expected latency (NUC, GPT-4o): **≈ 1.3 s p95**.
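Steps 3-4 sketched with the async client (AsyncQdrantClient ships with qdrant-client; names are illustrative):
```python
import asyncio

from qdrant_client import AsyncQdrantClient

async def merged_search(client: AsyncQdrantClient, collections: list[str],
                        query_vector: list[float], k: int = 8):
    # fan the same query vector out to every relevant collection in parallel
    batches = await asyncio.gather(*[
        client.search(collection_name=c, query_vector=query_vector, limit=k)
        for c in collections
    ])
    # flatten and keep the k best hits overall, tagged with their collection
    hits = [(c, hit) for c, batch in zip(collections, batches) for hit in batch]
    hits.sort(key=lambda pair: pair[1].score, reverse=True)
    return hits[:k]
```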
---
## 8 Back-ups & DR
| Layer | Method | Retention | Cost |
|-------|--------|-----------|------|
| NVMe dataset | Restic dedup ⇒ **S3 Glacier Deep Archive** | 7 daily / 4 weekly / 6 monthly | First snapshot 250 GB → €0.9 / mo; delta ≈ €0.02 / mo |
| Local roll-back | ZFS hourly snapshots (compressed) | 7 days | disk-only |
| Restore test | Quarterly scripted restore to `/tmp/restore-test` | — | — |
---
## 9 Security
* Full-disk LUKS; if on AWS use encrypted EBS + **customer-managed KMS key**.
* Instance in private subnet; access via Tailscale SSH or AWS SSM.
* Docker containers run as non-root; seccomp default profile.
* TLS termination in Traefik with auto-renewing Let's Encrypt cert on LAN.
---
## 10 Hardware & Performance
| Component | Spec | Head-room |
|-----------|------|-----------|
| Mini-PC | 4-core i5 (11th gen) / 16 GB RAM | p95 memory < 9 GB |
| Storage | 1 TB NVMe + ext. 1 TB SATA for backups | 5-year growth ≤ 400 GB |
| Power | 6 W idle → €1.5 / mo | — |
| GPU (optional) | Used RTX 2060 / T600 | Speeds 1st photo embed to < 1 h |
---
## 11 LLM & Model Abstraction
```python
from abc import ABC

class EmbedProvider(ABC):
    def embed(self, text: str) -> list[float]: ...

provider = OpenAIProvider(model="text-embedding-3-small")
# swap later:
# provider = OllamaProvider(model="nomic-embed-text")
# injected via environment: EMBED_BACKEND="openai"  (or "ollama")
```
Same interface for diff-summariser and chat-LLM; switching is one `docker-compose.yml` env var.
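A possible `OpenAIProvider` behind that interface (openai 1.x client; error handling and batching left out):
```python
from openai import OpenAI

class OpenAIProvider(EmbedProvider):
    def __init__(self, model: str = "text-embedding-3-small"):
        self.client = OpenAI()   # reads OPENAI_API_KEY from the environment
        self.model = model

    def embed(self, text: str) -> list[float]:
        resp = self.client.embeddings.create(model=self.model, input=text)
        return resp.data[0].embedding
```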
---
## 12 Monitoring & Ops
* **Prometheus + Grafana**: node load, Postgres WAL lag, queue depth.
* **Watchtower** auto-updates images weekly (except Postgres & Qdrant).
* Alertmanager e-mails if free disk < 15 % or any Celery worker dies.
---
## 13 Roadmap / Open Items
| Phase | Deliverable |
|-------|-------------|
| **0** (done) | Design document v1.0 |
| **1** | Dockerfiles & compose stack; mail + chat + photo ingestion |
| **2** | Git summariser + OCR worker; tag rules config |
| **3** | Books, RSS, misc docs workers |
| **4** | Live chat UI & LLaVA offline option |
| **5** | Multi-user RLS & optional code-search add-on |
---
*Document ends — save for future implementation.*

239
docker-compose.yaml Normal file

@ -0,0 +1,239 @@
version: "3.9"
# --------------------------------------------------------------------- networks
networks:
kbnet: # internal overlay NOT exposed
driver: bridge
# --------------------------------------------------------------------- secrets
secrets:
postgres_password: {file: ./secrets/postgres_password.txt}
jwt_secret: {file: ./secrets/jwt_secret.txt}
openai_key: {file: ./secrets/openai_key.txt}
# --------------------------------------------------------------------- volumes
volumes:
db_data: {} # Postgres
qdrant_data: {} # Qdrant
rabbitmq_data: {} # RabbitMQ
# ------------------------------ X-templates ----------------------------
x-common-env: &env
RABBITMQ_USER: kb
TZ: "Etc/UTC"
x-worker-base: &worker-base
build:
context: .
dockerfile: docker/workers/Dockerfile
restart: unless-stopped
networks: [kbnet]
security_opt: ["no-new-privileges=true"]
depends_on: [postgres, rabbitmq, qdrant]
env_file: [.env]
environment: &worker-env
<<: *env
POSTGRES_PASSWORD_FILE: /run/secrets/postgres_password
# DSNs are built in worker entrypoint from user + pw files
QDRANT_URL: http://qdrant:6333
OPENAI_API_KEY_FILE: /run/secrets/openai_key
secrets: [postgres_password, openai_key]
read_only: true
tmpfs: [/tmp,/var/tmp]
cap_drop: [ALL]
logging:
options: {max-size: "10m", max-file: "3"}
# ================================ SERVICES ============================
services:
# ----------------------------------------------------------------- data layer
postgres:
image: postgres:15
restart: unless-stopped
networks: [kbnet]
environment:
<<: *env
POSTGRES_USER: kb
POSTGRES_PASSWORD_FILE: /run/secrets/postgres_password
POSTGRES_DB: kb
secrets: [postgres_password]
volumes:
- db_data:/var/lib/postgresql/data:rw
- ./db:/docker-entrypoint-initdb.d:ro
healthcheck:
test: ["CMD-SHELL", "pg_isready -U kb"]
interval: 10s
timeout: 5s
retries: 5
mem_limit: 4g
cpus: "1.5"
security_opt: ["no-new-privileges=true"]
rabbitmq:
image: rabbitmq:3.13-management
restart: unless-stopped
networks: [kbnet]
environment:
<<: *env
RABBITMQ_DEFAULT_USER: "kb"
RABBITMQ_DEFAULT_PASS: "${RABBITMQ_PASSWORD}"
volumes:
- rabbitmq_data:/var/lib/rabbitmq:rw
healthcheck:
test: ["CMD", "rabbitmq-diagnostics", "ping"]
interval: 15s
timeout: 5s
retries: 5
mem_limit: 512m
cpus: "0.5"
security_opt: ["no-new-privileges=true"]
ports: # UI only on localhost
- "127.0.0.1:15672:15672"
qdrant:
image: qdrant/qdrant:v1.14.0
restart: unless-stopped
networks: [kbnet]
volumes:
- qdrant_data:/qdrant/storage:rw
tmpfs:
- /tmp
- /var/tmp
- /qdrant/snapshots:rw
healthcheck:
test: ["CMD", "wget", "-q", "-T", "2", "-O", "-", "localhost:6333/ready"]
interval: 15s
timeout: 5s
retries: 5
mem_limit: 4g
cpus: "2"
security_opt: ["no-new-privileges=true"]
cap_drop: [ALL]
# ------------------------------------------------------------ API / gateway
# api:
# build:
# context: .
# dockerfile: docker/api/Dockerfile
# restart: unless-stopped
# networks: [kbnet]
# depends_on: [postgres, rabbitmq, qdrant]
# environment:
# <<: *env
# JWT_SECRET_FILE: /run/secrets/jwt_secret
# OPENAI_API_KEY_FILE: /run/secrets/openai_key
# POSTGRES_PASSWORD_FILE: /run/secrets/postgres_password
# QDRANT_URL: http://qdrant:6333
# secrets: [jwt_secret, openai_key, postgres_password]
# healthcheck:
# test: ["CMD-SHELL", "curl -fs http://localhost:8000/health || exit 1"]
# interval: 15s
# timeout: 5s
# retries: 5
# mem_limit: 768m
# cpus: "1"
# labels:
# - "traefik.enable=true"
# - "traefik.http.routers.kb.rule=Host(`${TRAEFIK_DOMAIN}`)"
# - "traefik.http.routers.kb.entrypoints=websecure"
# - "traefik.http.services.kb.loadbalancer.server.port=8000"
traefik:
image: traefik:v3.0
restart: unless-stopped
networks: [kbnet]
command:
- "--providers.docker=true"
- "--providers.docker.network=kbnet"
- "--entrypoints.web.address=:80"
- "--entrypoints.websecure.address=:443"
# - "--certificatesresolvers.le.acme.httpchallenge=true"
# - "--certificatesresolvers.le.acme.httpchallenge.entrypoint=web"
# - "--certificatesresolvers.le.acme.email=${LE_EMAIL}"
# - "--certificatesresolvers.le.acme.storage=/acme.json"
- "--log.level=INFO"
ports:
- "80:80"
- "443:443"
volumes:
- /var/run/docker.sock:/var/run/docker.sock:ro
# - ./acme.json:/acme.json:rw
# ------------------------------------------------------------ Celery workers
worker-text:
<<: *worker-base
environment:
<<: *worker-env
QUEUES: "medium_embed"
deploy: {resources: {limits: {cpus: "2", memory: 3g}}}
worker-photo:
<<: *worker-base
environment:
<<: *worker-env
QUEUES: "photo_embed"
deploy: {resources: {limits: {cpus: "4", memory: 4g}}}
worker-ocr:
<<: *worker-base
environment:
<<: *worker-env
QUEUES: "low_ocr"
deploy: {resources: {limits: {cpus: "4", memory: 4g}}}
worker-git:
<<: *worker-base
environment:
<<: *worker-env
QUEUES: "git_summary"
deploy: {resources: {limits: {cpus: "1", memory: 1g}}}
worker-rss:
<<: *worker-base
environment:
<<: *worker-env
QUEUES: "rss"
deploy: {resources: {limits: {cpus: "0.5", memory: 512m}}}
worker-docs:
<<: *worker-base
environment:
<<: *worker-env
QUEUES: "docs"
deploy: {resources: {limits: {cpus: "1", memory: 1g}}}
# ------------------------------------------------------------ watchtower (auto-update)
watchtower:
image: containrrr/watchtower
restart: unless-stopped
command: ["--schedule", "0 0 4 * * *", "--cleanup"]
volumes: ["/var/run/docker.sock:/var/run/docker.sock:ro"]
networks: [kbnet]
# ------------------------------------------------------------------- profiles: observability (opt-in)
# services:
# prometheus:
# image: prom/prometheus:v2.52
# profiles: ["obs"]
# networks: [kbnet]
# volumes: [./observability/prometheus.yml:/etc/prometheus/prometheus.yml:ro]
# restart: unless-stopped
# ports: ["127.0.0.1:9090:9090"]
# grafana:
# image: grafana/grafana:10
# profiles: ["obs"]
# networks: [kbnet]
# volumes: [./observability/grafana:/var/lib/grafana]
# restart: unless-stopped
# environment:
# GF_SECURITY_ADMIN_USER: admin
# GF_SECURITY_ADMIN_PASSWORD_FILE: /run/secrets/grafana_pw
# secrets: [grafana_pw]
# ports: ["127.0.0.1:3000:3000"]
# secrets: # extra secret for Grafana, not needed otherwise
# grafana_pw:
# file: ./secrets/grafana_pw.txt

24
docker/api/Dockerfile Normal file

@ -0,0 +1,24 @@
FROM python:3.10-slim
WORKDIR /app
# Copy requirements files and setup
COPY requirements-*.txt ./
COPY setup.py ./
COPY src/ ./src/
# Install the package with API dependencies
RUN pip install -e ".[api]"
# Run as non-root user
RUN useradd -m appuser
USER appuser
# Set environment variables
ENV PORT=8000
ENV PYTHONPATH="/app"
EXPOSE 8000
# Run the API
CMD ["uvicorn", "memory.api.app:app", "--host", "0.0.0.0", "--port", "8000"]

28
docker/workers/Dockerfile Normal file

@ -0,0 +1,28 @@
FROM python:3.11-slim
WORKDIR /app
# Copy requirements files and setup
COPY requirements-*.txt ./
COPY setup.py ./
COPY src/ ./src/
# Install dependencies
RUN apt-get update && apt-get install -y \
libpq-dev gcc && \
pip install -e ".[workers]" && \
apt-get purge -y gcc && apt-get autoremove -y && rm -rf /var/lib/apt/lists/*
# Create and copy entrypoint script
COPY docker/workers/entry.sh ./entry.sh
RUN chmod +x entry.sh
# Create user and set permissions
RUN useradd -m kb && chown -R kb /app
USER kb
# Default queues to process
ENV QUEUES="medium_embed,photo_embed,low_ocr,git_summary,rss,docs"
ENV PYTHONPATH="/app"
ENTRYPOINT ["./entry.sh"]

14
docker/workers/entry.sh Normal file

@ -0,0 +1,14 @@
#!/usr/bin/env bash
set -euo pipefail
QUEUES=${QUEUES:-default}
CONCURRENCY=${CONCURRENCY:-2}
LOGLEVEL=${LOGLEVEL:-INFO}
HOSTNAME="${QUEUES%@*}@$(hostname)"
exec celery -A memory.workers.celery_app worker \
-Q "${QUEUES}" \
--concurrency="${CONCURRENCY}" \
--hostname="${HOSTNAME}" \
--loglevel="${LOGLEVEL}"

4
requirements-api.txt Normal file

@ -0,0 +1,4 @@
fastapi==0.112.2
uvicorn==0.29.0
python-jose==3.3.0
python-multipart==0.0.9

3
requirements-common.txt Normal file

@ -0,0 +1,3 @@
sqlalchemy==2.0.30
psycopg2-binary==2.9.9
pydantic==2.7.1

4
requirements-workers.txt Normal file

@ -0,0 +1,4 @@
celery==5.3.6
openai==1.25.0
pillow==10.3.0
qdrant-client==1.9.0

31
setup.py Normal file

@ -0,0 +1,31 @@
import pathlib
from setuptools import setup, find_namespace_packages
def read_requirements(filename: str) -> list[str]:
"""Read requirements from file, ignoring comments and -r directives."""
filename = pathlib.Path(filename)
return [
line.strip()
for line in filename.read_text().splitlines()
if line.strip() and not line.strip().startswith(('#', '-r'))
]
# Read requirements files
common_requires = read_requirements('requirements-common.txt')
api_requires = read_requirements('requirements-api.txt')
workers_requires = read_requirements('requirements-workers.txt')
setup(
name="memory",
version="0.1.0",
package_dir={"": "src"},
packages=find_namespace_packages(where="src"),
python_requires=">=3.10",
extras_require={
"api": api_requires + common_requires,
"workers": workers_requires + common_requires,
"common": common_requires,
},
)

5
src/__init__.py Normal file

@ -0,0 +1,5 @@
"""
Memory knowledge-base application.
"""
__version__ = "0.1.0"

1
src/memory/__init__.py Normal file

@ -0,0 +1 @@


50
src/memory/api/app.py Normal file

@ -0,0 +1,50 @@
"""
FastAPI application for the knowledge base.
"""
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session
from memory.common.db import get_scoped_session
from memory.common.db.models import SourceItem
app = FastAPI(title="Knowledge Base API")
def get_db():
"""Database session dependency"""
db = get_scoped_session()
try:
yield db
finally:
db.close()
@app.get("/health")
def health_check():
"""Simple health check endpoint"""
return {"status": "healthy"}
@app.get("/sources")
def list_sources(
tag: str | None = None,
limit: int = 100,
db: Session = Depends(get_db)
):
"""List source items, optionally filtered by tag"""
query = db.query(SourceItem)
if tag:
query = query.filter(SourceItem.tags.contains([tag]))
return query.limit(limit).all()
@app.get("/sources/{source_id}")
def get_source(source_id: int, db: Session = Depends(get_db)):
"""Get a specific source by ID"""
source = db.query(SourceItem).filter(SourceItem.id == source_id).first()
if not source:
raise HTTPException(status_code=404, detail="Source not found")
return source

src/memory/common/db/__init__.py Normal file

@ -0,0 +1,12 @@
"""
Database utilities package.
"""
from memory.common.db.models import Base
from memory.common.db.connection import get_engine, get_session_factory, get_scoped_session
__all__ = [
"Base",
"get_engine",
"get_session_factory",
"get_scoped_session",
]

src/memory/common/db/connection.py Normal file

@ -0,0 +1,32 @@
"""
Database connection utilities.
"""
import os
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
def get_engine():
"""Create SQLAlchemy engine from environment variables"""
user = os.getenv("POSTGRES_USER", "kb")
password = os.getenv("POSTGRES_PASSWORD", "kb")
host = os.getenv("POSTGRES_HOST", "postgres")
port = os.getenv("POSTGRES_PORT", "5432")
db = os.getenv("POSTGRES_DB", "kb")
return create_engine(f"postgresql://{user}:{password}@{host}:{port}/{db}")
def get_session_factory():
"""Create a session factory for SQLAlchemy sessions"""
engine = get_engine()
session_factory = sessionmaker(bind=engine)
return session_factory
def get_scoped_session():
"""Create a thread-local scoped session factory"""
engine = get_engine()
session_factory = sessionmaker(bind=engine)
return scoped_session(session_factory)

src/memory/common/db/models.py Normal file

@ -0,0 +1,127 @@
"""
Database models for the knowledge base system.
"""
from sqlalchemy import (
Column, ForeignKey, Integer, BigInteger, Text, Date, DateTime, Boolean, Float,
ARRAY, func
)
from sqlalchemy.dialects.postgresql import BYTEA, JSONB, TSVECTOR
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class SourceItem(Base):
__tablename__ = 'source_item'
id = Column(BigInteger, primary_key=True)
modality = Column(Text, nullable=False)
sha256 = Column(BYTEA, nullable=False, unique=True)
inserted_at = Column(DateTime(timezone=True), server_default=func.now())
tags = Column(ARRAY(Text), nullable=False, server_default='{}')
lang = Column(Text)
model_hash = Column(Text)
vector_ids = Column(ARRAY(Text), nullable=False, server_default='{}')
embed_status = Column(Text, nullable=False, server_default='RAW')
byte_length = Column(Integer)
mime_type = Column(Text)
class MailMessage(Base):
__tablename__ = 'mail_message'
id = Column(BigInteger, primary_key=True)
source_id = Column(BigInteger, ForeignKey('source_item.id', ondelete='CASCADE'), nullable=False)
message_id = Column(Text, unique=True)
subject = Column(Text)
sender = Column(Text)
recipients = Column(ARRAY(Text))
sent_at = Column(DateTime(timezone=True))
body_raw = Column(Text)
attachments = Column(JSONB)
tsv = Column(TSVECTOR)
class ChatMessage(Base):
__tablename__ = 'chat_message'
id = Column(BigInteger, primary_key=True)
source_id = Column(BigInteger, ForeignKey('source_item.id', ondelete='CASCADE'), nullable=False)
platform = Column(Text)
channel_id = Column(Text)
author = Column(Text)
sent_at = Column(DateTime(timezone=True))
body_raw = Column(Text)
class GitCommit(Base):
__tablename__ = 'git_commit'
id = Column(BigInteger, primary_key=True)
source_id = Column(BigInteger, ForeignKey('source_item.id', ondelete='CASCADE'), nullable=False)
repo_path = Column(Text)
commit_sha = Column(Text, unique=True)
author_name = Column(Text)
author_email = Column(Text)
author_date = Column(DateTime(timezone=True))
msg_raw = Column(Text)
diff_summary = Column(Text)
files_changed = Column(ARRAY(Text))
class Photo(Base):
__tablename__ = 'photo'
id = Column(BigInteger, primary_key=True)
source_id = Column(BigInteger, ForeignKey('source_item.id', ondelete='CASCADE'), nullable=False)
file_path = Column(Text)
exif_taken_at = Column(DateTime(timezone=True))
exif_lat = Column(Float)
exif_lon = Column(Float)
camera_make = Column(Text)
camera_model = Column(Text)
class BookDoc(Base):
__tablename__ = 'book_doc'
id = Column(BigInteger, primary_key=True)
source_id = Column(BigInteger, ForeignKey('source_item.id', ondelete='CASCADE'), nullable=False)
title = Column(Text)
author = Column(Text)
chapter = Column(Text)
published = Column(Date)  # matches the DATE column in db/schema.sql
class BlogPost(Base):
__tablename__ = 'blog_post'
id = Column(BigInteger, primary_key=True)
source_id = Column(BigInteger, ForeignKey('source_item.id', ondelete='CASCADE'), nullable=False)
url = Column(Text, unique=True)
title = Column(Text)
published = Column(DateTime(timezone=True))
class MiscDoc(Base):
__tablename__ = 'misc_doc'
id = Column(BigInteger, primary_key=True)
source_id = Column(BigInteger, ForeignKey('source_item.id', ondelete='CASCADE'), nullable=False)
path = Column(Text)
mime_type = Column(Text)
class RssFeed(Base):
__tablename__ = 'rss_feeds'
id = Column(BigInteger, primary_key=True)
url = Column(Text, nullable=False, unique=True)
title = Column(Text)
description = Column(Text)
tags = Column(ARRAY(Text), nullable=False, server_default='{}')
last_checked_at = Column(DateTime(timezone=True))
active = Column(Boolean, nullable=False, server_default='true')
created_at = Column(DateTime(timezone=True), nullable=False, server_default=func.now())
updated_at = Column(DateTime(timezone=True), nullable=False, server_default=func.now())

src/memory/workers/celery_app.py Normal file

@ -0,0 +1,33 @@
import os
from celery import Celery
def rabbit_url() -> str:
user = os.getenv("RABBITMQ_USER", "guest")
password = os.getenv("RABBITMQ_PASSWORD", "guest")
return f"amqp://{user}:{password}@rabbitmq:5672//"
app = Celery("memory",
broker=rabbit_url(),
backend=os.getenv("CELERY_RESULT_BACKEND",
"db+postgresql://kb:kb@postgres/kb"))
app.autodiscover_tasks(["memory.workers.tasks"])
app.conf.update(
task_acks_late=True,
task_reject_on_worker_lost=True,
worker_prefetch_multiplier=1,
task_routes={
# Task routing configuration
"memory.workers.tasks.text.*": {"queue": "medium_embed"},
"memory.workers.tasks.photo.*": {"queue": "photo_embed"},
"memory.workers.tasks.ocr.*": {"queue": "low_ocr"},
"memory.workers.tasks.git.*": {"queue": "git_summary"},
"memory.workers.tasks.rss.*": {"queue": "rss"},
"memory.workers.tasks.docs.*": {"queue": "docs"},
},
)

src/memory/workers/tasks/__init__.py Normal file

@ -0,0 +1,4 @@
"""
Import sub-modules so Celery can register their @app.task decorators.
"""
from memory.workers.tasks import text, photo, ocr, git, rss, docs # noqa


@ -0,0 +1,5 @@
from memory.workers.celery_app import app
@app.task(name="kb.text.ping")
def ping():
return "pong"


@ -0,0 +1,5 @@
from memory.workers.celery_app import app
@app.task(name="kb.text.ping")
def ping():
return "pong"


@ -0,0 +1,5 @@
from memory.workers.celery_app import app
@app.task(name="kb.text.ping")
def ping():
return "pong"


@ -0,0 +1,5 @@
from memory.workers.celery_app import app
@app.task(name="kb.text.ping")
def ping():
return "pong"


@ -0,0 +1,6 @@
from memory.workers.celery_app import app
@app.task(name="kb.text.ping")
def ping():
return "pong"


@ -0,0 +1,5 @@
from memory.workers.celery_app import app
@app.task(name="memory.text.ping")
def ping():
return "pong"

1
tests/__init__.py Normal file

@ -0,0 +1 @@