Optimize search: parallelize BM25 and embedding, fix word filter

- Run BM25 and embedding searches in parallel with asyncio.gather
- Fix word filter in both search.py and bm25.py: >= 2 chars instead of > 2
  (allows important 2-letter terms like "AI", "ML" to be included)
- Better error handling for parallel search failures

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
mruwnik 2025-12-21 15:58:39 +00:00
parent 5b997cc397
commit 548a81e21d
2 changed files with 21 additions and 21 deletions

View File

@@ -53,7 +53,7 @@ def build_tsquery(query: str) -> str:
     words = [
         w.strip().lower()
         for w in clean_query.split()
-        if w.strip() and len(w.strip()) > 2 and w.strip().lower() not in _STOPWORDS
+        if w.strip() and len(w.strip()) >= 2 and w.strip().lower() not in _STOPWORDS
     ]
     if not words:
         return ""

View File

@@ -7,7 +7,6 @@ import logging
 import math
 from collections import defaultdict
 from datetime import datetime, timezone
-from sqlalchemy.orm import load_only
 from memory.common import extract, settings
 from memory.common.db.connection import make_session
 from memory.common.db.models import Chunk, SourceItem
@@ -321,22 +320,31 @@ async def _run_searches(
     use_bm25: bool,
 ) -> dict[str, float]:
     """
-    Run embedding and optionally BM25 searches, returning fused scores.
+    Run embedding and optionally BM25 searches in parallel, returning fused scores.
     """
-    # Run embedding search
-    embedding_scores = await search_chunks_embeddings(
+    # Build tasks to run in parallel
+    embedding_task = search_chunks_embeddings(
         search_data, modalities, internal_limit, filters, timeout
     )
-    # Run BM25 search if enabled
-    bm25_scores: dict[str, float] = {}
     if use_bm25:
-        try:
-            bm25_scores = await search_bm25_chunks(
-                data, modalities, internal_limit, filters, timeout
-            )
-        except asyncio.TimeoutError:
-            logger.warning("BM25 search timed out, using embedding results only")
+        # Run both searches in parallel
+        results = await asyncio.gather(
+            embedding_task,
+            search_bm25_chunks(data, modalities, internal_limit, filters, timeout),
+            return_exceptions=True,
+        )
+        embedding_scores = results[0] if not isinstance(results[0], Exception) else {}
+        if isinstance(results[0], Exception):
+            logger.warning(f"Embedding search failed: {results[0]}")
+        bm25_scores = results[1] if not isinstance(results[1], Exception) else {}
+        if isinstance(results[1], Exception):
+            logger.warning(f"BM25 search failed: {results[1]}")
+    else:
+        embedding_scores = await embedding_task
+        bm25_scores = {}
 
     # Fuse scores from both methods using Reciprocal Rank Fusion
     return fuse_scores_rrf(embedding_scores, bm25_scores)
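The gather pattern used in this hunk can be exercised on its own. A minimal sketch, assuming two stand-in coroutines in place of `search_chunks_embeddings` and `search_bm25_chunks`; with `return_exceptions=True` a failing search comes back as an exception object instead of cancelling its sibling.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)

async def fake_embedding_search() -> dict[str, float]:
    return {"chunk-1": 0.9, "chunk-2": 0.4}

async def fake_bm25_search() -> dict[str, float]:
    raise TimeoutError("BM25 backend too slow")

async def run_both() -> tuple[dict[str, float], dict[str, float]]:
    # Both coroutines run concurrently; exceptions are returned, not raised.
    results = await asyncio.gather(
        fake_embedding_search(), fake_bm25_search(), return_exceptions=True
    )
    embedding_scores = results[0] if not isinstance(results[0], Exception) else {}
    bm25_scores = results[1] if not isinstance(results[1], Exception) else {}
    if isinstance(results[1], Exception):
        logger.warning("BM25 search failed: %s", results[1])
    return embedding_scores, bm25_scores

print(asyncio.run(run_both()))  # ({'chunk-1': 0.9, 'chunk-2': 0.4}, {})
```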
@@ -367,14 +375,6 @@ def _fetch_chunks(
     with make_session() as db:
         chunks = (
             db.query(Chunk)
-            .options(
-                load_only(
-                    Chunk.id,  # type: ignore
-                    Chunk.source_id,  # type: ignore
-                    Chunk.content,  # type: ignore
-                    Chunk.file_paths,  # type: ignore
-                )
-            )
             .filter(Chunk.id.in_(top_ids))
             .all()
         )
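For context on the removed `load_only`: it restricts the SELECT to the listed columns and defers the rest, so touching a deferred attribute later needs the session (or another round trip). Below is a sketch of both query shapes; the motivation for dropping it is an assumption rather than something the commit states, and the `fetch_chunks` wrapper and its parameter types are hypothetical.

```python
from sqlalchemy.orm import load_only

from memory.common.db.connection import make_session
from memory.common.db.models import Chunk

def fetch_chunks(top_ids: list[str], restrict_columns: bool = False) -> list[Chunk]:
    # Hypothetical wrapper; argument types are assumed for illustration.
    with make_session() as db:
        query = db.query(Chunk).filter(Chunk.id.in_(top_ids))
        if restrict_columns:
            # Old shape: only these columns are SELECTed; the rest are deferred.
            query = query.options(load_only(Chunk.id, Chunk.content))
        # New shape (restrict_columns=False): all mapped Chunk columns load eagerly.
        return query.all()
```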