
RAG Evaluation

Retrieval quality degrades silently: a new embedding model changes the vector space, a chunk-size change reduces recall, or an index backfill problem leaves some records un-vectorized. This tutorial builds a lightweight evaluation harness that measures Precision@k and Recall@k for your RushDB vector search pipeline and plugs into CI, so regressions are caught before they reach production.


Concepts

| Metric      | Definition                                            | Formula                                 |
| ----------- | ----------------------------------------------------- | --------------------------------------- |
| Precision@k | Of the k results returned, how many are relevant?     | relevant hits in top k / k              |
| Recall@k    | Of all relevant items, how many appear in the top k?  | relevant hits in top k / total relevant |

A well-tuned RAG retriever should maintain Precision@5 ≥ 0.6 and Recall@5 ≥ 0.5 for your domain. The exact thresholds depend on your corpus — establish a baseline first, then track drift.
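
A quick worked example: if a query has four relevant records in the corpus and three of them appear in the top 5 results, Precision@5 = 3 / 5 = 0.6 and Recall@5 = 3 / 4 = 0.75.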


Step 1: Build a ground-truth dataset

Create a small set of evaluation queries paired with expected top record IDs. Aim for 20–50 queries covering your key topic areas.

# eval_dataset.py
EVAL_DATASET = [
    {
        'query': 'how to set up a self-hosted RushDB instance',
        'expected_ids': [
            'record-id-docker-guide',
            'record-id-env-vars',
            'record-id-first-boot'
        ]
    },
    {
        'query': 'connecting a Neo4j Aura database',
        'expected_ids': [
            'record-id-aura-setup',
            'record-id-bolt-url'
        ]
    },
    {
        'query': 'external embedding indexes BYOV',
        'expected_ids': [
            'record-id-byov-intro',
            'record-id-external-index',
            'record-id-vectors-key'
        ]
    }
]

Bootstrapping expected IDs

If you don't have ground-truth IDs yet, run a baseline search, manually review the results, and promote the relevant ones to your dataset:

import os
from rushdb import RushDB

db = RushDB(os.environ['RUSHDB_API_KEY'])

hits = db.ai.search(
    query='how to set up a self-hosted instance',
    labels=['DOC_CHUNK'],
    property_name='text',
    limit=10
)

for h in hits:
    print(h.__id, f'{h.__score:.3f}', h.text[:80])

# Manually note which IDs are relevant → add to EVAL_DATASET

Step 2: Implement the metrics

# metrics.py
def precision_at_k(retrieved_ids: list[str], relevant_ids: list[str], k: int) -> float:
    top_k = retrieved_ids[:k]
    relevant_set = set(relevant_ids)
    hits = sum(1 for id_ in top_k if id_ in relevant_set)
    return hits / k

def recall_at_k(retrieved_ids: list[str], relevant_ids: list[str], k: int) -> float:
    if not relevant_ids:
        return 1.0
    top_k = retrieved_ids[:k]
    relevant_set = set(relevant_ids)
    hits = sum(1 for id_ in top_k if id_ in relevant_set)
    return hits / len(relevant_ids)

def mean_metric(values: list[float]) -> float:
    return sum(values) / len(values) if values else 0.0
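
A quick sanity check with made-up record IDs (purely illustrative) confirms the functions behave as expected:

from metrics import precision_at_k, recall_at_k

retrieved = ['a', 'b', 'c', 'd', 'e']   # hypothetical record IDs
relevant = ['b', 'e', 'x', 'y']

assert precision_at_k(retrieved, relevant, 5) == 0.4  # 2 of the 5 retrieved are relevant
assert recall_at_k(retrieved, relevant, 5) == 0.5     # 2 of the 4 relevant were retrieved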

Step 3: Run the evaluation

# evaluate.py
import os
from rushdb import RushDB
from eval_dataset import EVAL_DATASET
from metrics import precision_at_k, recall_at_k, mean_metric

db = RushDB(os.environ['RUSHDB_API_KEY'])
K = 5

def evaluate():
    precisions = []
    recalls = []

    for item in EVAL_DATASET:
        hits = db.ai.search(
            query=item['query'],
            labels=['DOC_CHUNK'],
            property_name='text',
            limit=K
        )
        retrieved_ids = [h.__id for h in hits]
        precisions.append(precision_at_k(retrieved_ids, item['expected_ids'], K))
        recalls.append(recall_at_k(retrieved_ids, item['expected_ids'], K))

    results = {
        'k': K,
        'queries': len(EVAL_DATASET),
        'mean_precision': mean_metric(precisions),
        'mean_recall': mean_metric(recalls),
        'per_query': [
            {
                'query': EVAL_DATASET[i]['query'],
                'precision': precisions[i],
                'recall': recalls[i]
            }
            for i in range(len(EVAL_DATASET))
        ]
    }

    print(f"\nEvaluation Results (k={K})")
    print(f"Mean Precision@{K}: {results['mean_precision']:.3f}")
    print(f"Mean Recall@{K}: {results['mean_recall']:.3f}")

    return results
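
evaluate.py is imported by the CI script in Step 5. If you also want to run it on its own, a standard entry-point guard (not shown in the module above) can be appended:

# Optional addition at the bottom of evaluate.py
if __name__ == '__main__':
    evaluate()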

Step 4: Score drift detection

Track __score for a fixed set of queries to detect when your embedding model or index quality changes.

# score_drift.py
import json

SNAPSHOT_FILE = './eval_score_snapshot.json'

def capture_score_snapshot(db, queries: list[str]) -> dict:
    snapshot = {}
    for query in queries:
        hits = db.ai.search(
            query=query,
            labels=['DOC_CHUNK'],
            property_name='text',
            limit=5
        )
        snapshot[query] = [{'id': h.__id, 'score': h.__score} for h in hits]

    with open(SNAPSHOT_FILE, 'w') as f:
        json.dump(snapshot, f, indent=2)
    print('Score snapshot saved.')
    return snapshot

def compare_snapshots(baseline: dict, current: dict, drift_threshold: float = 0.05) -> list:
    drifts = []
    for query, base_results in baseline.items():
        if query not in current:
            continue
        base_scores = {r['id']: r['score'] for r in base_results}
        for result in current[query]:
            id_ = result['id']
            if id_ in base_scores:
                delta = abs(result['score'] - base_scores[id_])
                if delta > drift_threshold:
                    drifts.append({
                        'query': query,
                        'id': id_,
                        'baseline': base_scores[id_],
                        'current': result['score'],
                        'delta': delta
                    })
    return drifts
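
Wiring the two helpers together, a minimal driver might look like the sketch below. The file name drift_check.py is hypothetical, and it assumes a baseline snapshot was captured earlier and committed alongside the evaluation dataset:

# drift_check.py — minimal sketch combining the snapshot helpers
import json
import os
from rushdb import RushDB
from eval_dataset import EVAL_DATASET
from score_drift import SNAPSHOT_FILE, capture_score_snapshot, compare_snapshots

db = RushDB(os.environ['RUSHDB_API_KEY'])
queries = [item['query'] for item in EVAL_DATASET]

# Read the baseline into memory first; capture_score_snapshot overwrites SNAPSHOT_FILE
with open(SNAPSHOT_FILE) as f:
    baseline = json.load(f)

current = capture_score_snapshot(db, queries)
drifts = compare_snapshots(baseline, current)

for d in drifts:
    print(f"drift on \"{d['query']}\" ({d['id']}): "
          f"{d['baseline']:.3f} → {d['current']:.3f} (Δ {d['delta']:.3f})")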

Step 5: CI regression test

Plug the evaluation harness into your CI pipeline so merges that degrade retrieval quality are blocked.

# eval_ci.py  (run with: python eval_ci.py)
import sys
from evaluate import evaluate

PRECISION_THRESHOLD = 0.60
RECALL_THRESHOLD = 0.50

results = evaluate()

passed = (
    results['mean_precision'] >= PRECISION_THRESHOLD and
    results['mean_recall'] >= RECALL_THRESHOLD
)

if not passed:
    print('\n❌ Retrieval regression detected!')
    print(f"  Precision@{results['k']}: {results['mean_precision']:.3f} (threshold: {PRECISION_THRESHOLD})")
    print(f"  Recall@{results['k']}: {results['mean_recall']:.3f} (threshold: {RECALL_THRESHOLD})")

    worst = sorted(results['per_query'], key=lambda q: q['precision'])[:3]
    print('\nWorst-performing queries:')
    for q in worst:
        print(f"  \"{q['query']}\" → P={q['precision']:.2f}, R={q['recall']:.2f}")

    sys.exit(1)

print(f"\n✓ Retrieval quality OK (P@{results['k']}={results['mean_precision']:.3f}, R@{results['k']}={results['mean_recall']:.3f})")
sys.exit(0)

GitHub Actions integration

# .github/workflows/rag-eval.yml
name: RAG Evaluation

on:
  pull_request:
    paths:
      - 'src/**' # Adjust to your source paths

jobs:
  eval:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: pip install -r requirements.txt # Assumes a requirements file listing the harness dependencies

      - name: Run RAG evaluation
        env:
          RUSHDB_API_KEY: ${{ secrets.RUSHDB_API_KEY }}
        run: python eval_ci.py

Step 6: Track results over time

Save evaluation results as JSON artifacts to build a history of retrieval quality.

import json, os
from datetime import datetime, timezone
from evaluate import evaluate

results = evaluate()

entry = {
    'timestamp': datetime.now(timezone.utc).isoformat(),
    'commit': os.environ.get('GITHUB_SHA', 'local'),
    **results
}

with open('./eval_history.jsonl', 'a') as f:
    f.write(json.dumps(entry) + '\n')
print('Results saved to eval_history.jsonl')
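
Once a few runs have accumulated, the history file can be reviewed directly. Here is a minimal sketch (the file name and column layout are just one way to do it) that prints the recorded precision and recall per commit:

# print_history.py — minimal sketch for summarizing the recorded history
import json

with open('./eval_history.jsonl') as f:
    entries = [json.loads(line) for line in f if line.strip()]

print(f"{'timestamp':<28} {'commit':<12} {'P@k':>6} {'R@k':>6}")
for e in entries:
    print(f"{e['timestamp']:<28} {e['commit'][:10]:<12} "
          f"{e['mean_precision']:>6.3f} {e['mean_recall']:>6.3f}")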

Interpreting results

| Signal                          | Likely cause                | Action                                                                 |
| ------------------------------- | --------------------------- | ---------------------------------------------------------------------- |
| Low Precision@k (< 0.4)         | Too many off-topic results  | Reduce limit, tighten where filters, or increase the score threshold   |
| Low Recall@k (< 0.4)            | Missing relevant chunks     | Check index status (ready?), increase limit, review chunking strategy  |
| Score drift across a PR         | Embedding model update      | Review BYOV or model config change; re-evaluate thresholds             |
| Specific query clusters failing | Coverage gap in corpus      | Ingest additional source material for those topics                     |
| Single label dominating results | Label imbalance             | Balance the corpus, or search labels individually and then merge       |
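
Several of the actions above amount to post-filtering the search results before they are scored. A minimal sketch of a score cutoff (the 0.75 value is purely illustrative; tune it against your baseline):

import os
from rushdb import RushDB

db = RushDB(os.environ['RUSHDB_API_KEY'])

MIN_SCORE = 0.75  # illustrative cutoff, not a recommended default

hits = db.ai.search(
    query='how to set up a self-hosted instance',
    labels=['DOC_CHUNK'],
    property_name='text',
    limit=10
)

# Drop low-confidence hits before computing Precision@k / Recall@k
retrieved_ids = [h.__id for h in hits if h.__score >= MIN_SCORE]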

Tips

  • Start with k=5 or k=10. Going higher inflates recall trivially and makes the metric less useful for catching real degradation.
  • Weight your queries. If some queries are business-critical, run a weighted mean rather than a simple average (see the sketch after this list).
  • Keep ground truth in version control. It becomes part of your specification — changes to expected IDs should be deliberate.
  • Re-validate ground truth after major corpus updates. Expected IDs may no longer exist after a data migration.
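
For the weighted-mean tip, a minimal sketch: the weight key is a hypothetical addition to each EVAL_DATASET entry (defaulting to 1.0 when absent), and precisions/recalls are the per-query lists built inside evaluate():

def weighted_mean(values: list[float], weights: list[float]) -> float:
    total = sum(weights)
    return sum(v * w for v, w in zip(values, weights)) / total if total else 0.0

# 'weight' is a hypothetical extra key on each EVAL_DATASET entry
weights = [item.get('weight', 1.0) for item in EVAL_DATASET]
weighted_precision = weighted_mean(precisions, weights)
weighted_recall = weighted_mean(recalls, weights)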

Next steps

  • RAG Reranking — improve Precision@k by adding a second retrieval stage
  • Multi-Source RAG — extend evaluation to cross-label retrieval
  • GraphRAG — measure enrichment quality alongside base retrieval metrics