RAG Reranking

Vector similarity retrieval is fast but imprecise: cosine distance is a proxy for relevance, not a direct measure of it. Reranking adds a second, slower, but more accurate scoring stage that re-orders the initial candidates before they reach the LLM.

This tutorial covers two complementary techniques:

  • LLM-based reranking — ask the model to score each candidate for relevance
  • Reciprocal Rank Fusion (RRF) — combine results from multiple retrieval strategies without needing a trained cross-encoder

Two-stage retrieval pattern

Stage 1 (fast, broad):    db.ai.search → top-N candidates (N = 20–50)
Stage 2 (slow, precise):  reranker → top-k final results (k = 3–5)
Stage 3 (synthesis):      LLM(query + top-k) → answer

The key insight is to over-fetch in stage 1 (retrieve more candidates than you'll ultimately use) so the reranker has enough material to work with.


Approach 1: LLM-based reranking

Ask the LLM to score each candidate passage for relevance to the query on a 0–10 scale, then sort descending. This is simple to implement, costs a small number of tokens per candidate, and works well out of the box.

import os
from concurrent.futures import ThreadPoolExecutor
from rushdb import RushDB
from openai import OpenAI

db = RushDB(os.environ['RUSHDB_API_KEY'])
openai = OpenAI()

def retrieve_candidates(query: str, candidate_count: int = 25) -> list:
    return db.ai.search(
        query=query,
        labels=['DOC_CHUNK'],
        property_name='text',
        limit=candidate_count
    )

def score_candidate(query: str, candidate) -> dict:
    prompt = f"""Rate how relevant the following passage is to the query on a scale of 0 to 10.
Respond with ONLY a single integer (0–10). No explanation.

Query: {query}

Passage: {candidate.text}

Relevance score:"""

    completion = openai.chat.completions.create(
        model='gpt-4o-mini',
        messages=[{'role': 'user', 'content': prompt}],
        max_tokens=5,
        temperature=0
    )
    # Parse defensively: anything that isn't a clean integer scores 0.
    raw = (completion.choices[0].message.content or '0').strip()
    try:
        score = int(raw)
    except ValueError:
        score = 0
    return {'record': candidate, 'rerank_score': score}

def rerank_with_llm(query: str, candidates: list, top_k: int = 5) -> list:
    # Each scoring call is independent, so run them in parallel.
    with ThreadPoolExecutor(max_workers=10) as executor:
        scored = list(executor.map(lambda c: score_candidate(query, c), candidates))

    scored.sort(key=lambda x: x['rerank_score'], reverse=True)
    return [item['record'] for item in scored[:top_k]]

def retrieve_and_rerank(query: str, top_k: int = 5) -> list:
    candidates = retrieve_candidates(query, 25)
    return rerank_with_llm(query, candidates, top_k)

top_chunks = retrieve_and_rerank('how does billing work for BYOC projects?')
for chunk in top_chunks:
    print(f'{chunk.text[:80]}...')

Approach 2: Reciprocal Rank Fusion (RRF)

RRF merges ranked result lists from multiple retrieval strategies without needing scores to be on the same scale. It's particularly useful when combining:

  • Vector similarity (semantic)
  • Keyword/property filters (exact)
  • Different vector indexes (e.g., title vs body)

RRF formula:

RRF(d) = Σ_r  1 / (k + rank_r(d))

Where k = 60 is the standard smoothing constant, r iterates over the result lists, and rank_r(d) is d's 1-based rank in list r; a list that doesn't contain d contributes nothing to the sum.
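
A worked example with k = 60: a chunk ranked 1st in the semantic list and 3rd in the keyword list scores 1/(60+1) + 1/(60+3) ≈ 0.0164 + 0.0159 = 0.0323, so it outranks a chunk that appears at rank 1 in only one of the lists (≈ 0.0164).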

# rrf.py
from concurrent.futures import ThreadPoolExecutor

def rrf_merge(ranked_lists: list[list[str]], k: int = 60) -> list[str]:
    """Reciprocal Rank Fusion over multiple ranked ID lists."""
    scores: dict[str, float] = {}
    for ranked in ranked_lists:
        for rank, id_ in enumerate(ranked, start=1):
            scores[id_] = scores.get(id_, 0.0) + 1 / (k + rank)
    return sorted(scores, key=lambda id_: scores[id_], reverse=True)

def hybrid_search(db, query: str, top_k: int = 5) -> list:
    with ThreadPoolExecutor(max_workers=2) as executor:
        body_future = executor.submit(
            db.ai.search,
            query=query, labels=['DOC_CHUNK'], property_name='text', limit=20
        )
        title_future = executor.submit(
            db.ai.search,
            query=query, labels=['DOC_CHUNK'], property_name='title', limit=20
        )
        body_hits = body_future.result()
        title_hits = title_future.result()

    body_ids = [h.__id for h in body_hits]
    title_ids = [h.__id for h in title_hits]

    merged_ids = rrf_merge([body_ids, title_ids])[:top_k]

    record_map = {h.__id: h for h in [*body_hits, *title_hits]}
    return [record_map[id_] for id_ in merged_ids if id_ in record_map]
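
A quick usage sketch, assuming the db client from Approach 1 and DOC_CHUNK records that carry both text and title properties:

results = hybrid_search(db, 'how does billing work for BYOC projects?')
for r in results:
    print(f'{r.text[:80]}...')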

Approach 3: Combined pipeline (RRF → LLM rerank)

For maximum precision, use RRF to merge multiple retrieval strategies, then LLM-rerank the merged candidates:

def precision_pipeline(db, query: str, final_top_k: int = 5) -> list:
    # Stage 1a: semantic search
    semantic_hits = db.ai.search(
        query=query, labels=['DOC_CHUNK'], property_name='text', limit=20
    )

    # Stage 1b: keyword filter on the first few query words
    words = query.split()[:3]
    keyword_result = db.records.find(
        labels=['DOC_CHUNK'],
        where={'text': {'$contains': ' '.join(words)}},
        limit=20
    )
    keyword_hits = keyword_result.data if keyword_result else []

    # Stage 2: RRF merge of both ranked ID lists
    semantic_ids = [h.__id for h in semantic_hits]
    keyword_ids = [h.__id for h in keyword_hits]
    merged_ids = rrf_merge([semantic_ids, keyword_ids])[:25]

    all_records = [*semantic_hits, *keyword_hits]
    record_map = {r.__id: r for r in all_records}
    merged = [record_map[id_] for id_ in merged_ids if id_ in record_map]

    # Stage 3: LLM rerank (uses the module-level OpenAI client from Approach 1)
    return rerank_with_llm(query, merged, final_top_k)
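
As with the earlier helpers, a usage sketch (assumes the module-level db and openai clients set up in Approach 1):

top_chunks = precision_pipeline(db, 'how does billing work for BYOC projects?')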

Cost and latency trade-offs

Strategy                   | Latency         | Cost | Precision gain
---------------------------|-----------------|------|---------------
Vector only (baseline)     | ~50–200 ms      | $    | Baseline
RRF (multi-index)          | ~100–400 ms     | $    | Low–Medium
LLM rerank (gpt-4o-mini)   | +500–2000 ms    | $$   | Medium–High
LLM rerank (gpt-4o)        | +1000–4000 ms   | $$$  | High
RRF + LLM rerank           | +600–2500 ms    | $$   | High

Practical guidance (encoded as a routing sketch after this list):

  • Use vector only for real-time type-ahead or high-volume search where Precision@5 > 0.6 is already achieved.
  • Use RRF when you have multiple meaningful retrieval signals (titles, bodies, semantic, exact match) and want precision gains at near-zero marginal cost.
  • Use LLM rerank when you need the highest possible precision for low-traffic, high-stakes queries (support tickets, legal research, medical Q&A).
  • Use RRF + LLM rerank for regulated domains or when evaluation shows vector-only Precision@5 < 0.55.
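
A hypothetical router that encodes these thresholds (choose_strategy and its inputs are illustrative, not part of any SDK; tune the numbers against your own evaluation):

from enum import Enum

class Strategy(Enum):
    VECTOR_ONLY = 'vector_only'
    RRF = 'rrf'
    LLM_RERANK = 'llm_rerank'
    RRF_LLM_RERANK = 'rrf_llm_rerank'

def choose_strategy(baseline_precision_at_5: float,
                    latency_budget_ms: int,
                    multi_signal: bool,
                    high_stakes: bool) -> Strategy:
    """Illustrative routing based on the guidance above."""
    if high_stakes and baseline_precision_at_5 < 0.55:
        return Strategy.RRF_LLM_RERANK           # regulated / weak baseline
    if high_stakes and latency_budget_ms >= 1000:
        return Strategy.LLM_RERANK               # precision over speed
    if multi_signal and latency_budget_ms >= 400:
        return Strategy.RRF                      # cheap gains from extra signals
    return Strategy.VECTOR_ONLY                  # fast path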

Caching rerank scores

Reranker calls are expensive. Cache results for identical (query, candidate-set) pairs to avoid re-scoring on repeated queries.

import hashlib

_rerank_cache: dict[str, list] = {}

def cached_rerank(query: str, candidates: list, top_k: int = 5) -> list:
    # Key on the query plus the sorted candidate IDs; md5 here is a cheap
    # cache key, not a security measure.
    ids_key = ','.join(sorted(c.__id for c in candidates))
    cache_key = hashlib.md5(f'{query}|{ids_key}'.encode()).hexdigest()

    if cache_key in _rerank_cache:
        return _rerank_cache[cache_key]

    result = rerank_with_llm(query, candidates, top_k)
    _rerank_cache[cache_key] = result
    return result
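
The dict above grows without bound in a long-running process. A minimal sketch of an LRU cap, assuming in-process caching is sufficient (the 1024-entry limit is an arbitrary example):

from collections import OrderedDict

_MAX_CACHE_ENTRIES = 1024  # arbitrary example cap
_rerank_cache: OrderedDict[str, list] = OrderedDict()

def _cache_put(key: str, value: list) -> None:
    _rerank_cache[key] = value
    _rerank_cache.move_to_end(key)               # mark as most recently used
    if len(_rerank_cache) > _MAX_CACHE_ENTRIES:
        _rerank_cache.popitem(last=False)        # evict least recently used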

Full pipeline summary

User query
    ▼
db.ai.search(limit=20..50)          ← Stage 1: fast broad retrieval
    ├── optional: db.records.find(keyword filter)
    ▼
rrf_merge([semantic, keyword])      ← Stage 2: fuse result lists
    ▼
rerank_with_llm(merged, top_k=5)    ← Stage 3: precise reordering
    ▼
build_prompt(top_chunks)
    ▼
LLM chat completion                 ← Final answer with citations
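
The last two boxes aren't implemented above. A minimal sketch of prompt assembly and synthesis, reusing the module-level openai client from Approach 1 (build_prompt is an illustrative helper, not an SDK function):

def build_prompt(query: str, chunks: list) -> str:
    # Number the chunks so the model can cite them as [1], [2], ...
    context = '\n\n'.join(f'[{i}] {c.text}' for i, c in enumerate(chunks, start=1))
    return (
        'Answer the question using ONLY the numbered context below. '
        'Cite sources as [n].\n\n'
        f'Context:\n{context}\n\nQuestion: {query}'
    )

def answer(query: str) -> str:
    top_chunks = retrieve_and_rerank(query)  # or hybrid_search / precision_pipeline
    completion = openai.chat.completions.create(
        model='gpt-4o-mini',
        messages=[{'role': 'user', 'content': build_prompt(query, top_chunks)}],
        temperature=0
    )
    return completion.choices[0].message.content or ''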

Next steps

  • RAG Evaluation — measure the Precision@k impact of adding reranking
  • Multi-Source RAG — apply RRF across PDF, web, and database labels
  • GraphRAG — add graph-enriched context alongside reranked chunks