Multi-Source RAG

Real retrieval pipelines rarely draw from a single data source. This tutorial shows how to ingest PDFs, web pages, and database summaries under distinct record labels, then search across all of them with a single vector query, producing source-aware citations in the final answer.


Architecture overview

┌──────────────┐   ┌──────────────┐   ┌──────────────┐
│  PDF Chunks  │   │  Web Chunks  │   │  DB Summary  │
│  PDF_CHUNK   │   │  WEB_CHUNK   │   │  DB_SUMMARY  │
└──────┬───────┘   └──────┬───────┘   └──────┬───────┘
       │                  │                  │
       └──────────────────┼──────────────────┘
                          │
                          ▼
                ┌──────────────────┐
                │ ai.search across │
                │ all three labels │
                └────────┬─────────┘
                         │
                         ▼
                ┌──────────────────┐
                │  LLM synthesis   │
                │  with citations  │
                └──────────────────┘

Each source type lands in its own RushDB label. This keeps the data organized and lets you filter by source when needed, while a single ai.search call retrieves relevant results from all of them simultaneously.


Prerequisites

  • RushDB project with an embedding index configured (cloud managed or self-hosted with RUSHDB_EMBEDDING_MODEL set).
  • A PDF parsing library (we'll use pdf-parse for Node.js / pypdf for Python).
  • An LLM client for the synthesis step.

Step 1: Ingest PDFs as PDF_CHUNK

Chunk the PDF text and store each chunk as a separate record. Keep metadata (source title, chunk index) so you can cite the document later.

import os
from rushdb import RushDB
from pypdf import PdfReader

db = RushDB(os.environ['RUSHDB_API_KEY'])

def chunk_text(text: str, size: int = 600, overlap: int = 80) -> list[str]:
    chunks = []
    start = 0
    while start < len(text):
        chunks.append(text[start:start + size])
        start += size - overlap
    return chunks

def ingest_pdf(file_path: str, doc_title: str):
    reader = PdfReader(file_path)
    full_text = '\n'.join(page.extract_text() or '' for page in reader.pages)
    chunks = chunk_text(full_text)

    db.records.import_json(
        label='PDF_CHUNK',
        data=[
            {
                'text': chunk,
                'source': doc_title,
                'chunkIndex': i,
                'sourceType': 'pdf'
            }
            for i, chunk in enumerate(chunks)
        ]
    )
    print(f'Ingested {len(chunks)} chunks from "{doc_title}"')

ingest_pdf('./docs/annual-report.pdf', 'Annual Report 2024')
ingest_pdf('./docs/product-spec.pdf', 'Product Spec v3')

Step 2: Ingest web pages as WEB_CHUNK

Fetch and chunk web content the same way. Add a url field for citation.

import httpx
from bs4 import BeautifulSoup

def ingest_web_page(url: str):
    response = httpx.get(url, follow_redirects=True)
    soup = BeautifulSoup(response.text, 'html.parser')
    text = ' '.join(soup.get_text().split())
    chunks = chunk_text(text)

    db.records.import_json(
        label='WEB_CHUNK',
        data=[
            {
                'text': chunk,
                'url': url,
                'chunkIndex': i,
                'sourceType': 'web'
            }
            for i, chunk in enumerate(chunks)
        ]
    )
    print(f'Ingested {len(chunks)} chunks from {url}')

ingest_web_page('https://docs.rushdb.com/get-started/quickstart')
ingest_web_page('https://docs.rushdb.com/tutorials/graphrag')

Step 3: Ingest database summaries as DB_SUMMARY

For structured data, generate a natural-language summary per entity and store it as a record. This makes relational data searchable via vector similarity.

def ingest_db_summaries(products: list[dict]):
    db.records.import_json(
        label='DB_SUMMARY',
        data=[
            {
                'text': (
                    f"{p['name']}: {p['description']}. "
                    f"Category: {p['category']}. "
                    f"Price: ${p['price']}. "
                    f"In stock: {p['stock']}."
                ),
                'entityId': p['id'],
                'entityType': 'product',
                'sourceType': 'database'
            }
            for p in products
        ]
    )
    print(f'Ingested {len(products)} DB summaries')

products = fetch_products_from_database()
ingest_db_summaries(products)
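
fetch_products_from_database is a placeholder for your own data layer. A minimal sketch against a hypothetical SQLite products table (the table and column names are assumptions, not part of the tutorial's dataset):

import sqlite3

def fetch_products_from_database() -> list[dict]:
    # Hypothetical schema: products(id, name, description, category, price, stock)
    conn = sqlite3.connect('shop.db')
    conn.row_factory = sqlite3.Row
    rows = conn.execute(
        'SELECT id, name, description, category, price, stock FROM products'
    ).fetchall()
    conn.close()
    return [dict(row) for row in rows]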

Step 4: Create embedding indexes

Create one embedding index per label. Each index covers the text property.

labels = ['PDF_CHUNK', 'WEB_CHUNK', 'DB_SUMMARY']

for label in labels:
    result = db.embeddings.create_index(label=label, property_name='text')
    print(f'Created index for {label}: {result.id}')

The indexes backfill in the background. Poll status if you need to wait:

import time

def wait_for_index(index_id: str):
    while True:
        index = db.embeddings.get_index(index_id)
        if index.status == 'ready':
            break
        print(f'Index {index_id}: {index.status}')
        time.sleep(3)
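
A variant of the Step 4 loop that keeps the returned index ids and blocks until each one is ready (a sketch; it assumes the create_index result exposes .id as shown above):

index_ids = []
for label in labels:
    result = db.embeddings.create_index(label=label, property_name='text')
    index_ids.append(result.id)

for index_id in index_ids:
    wait_for_index(index_id)
    print(f'Index {index_id} is ready')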

Step 5: Search across all sources

A single ai.search call across multiple labels returns results ranked by vector similarity, regardless of source type.

def search_all_sources(query: str, limit: int = 8):
    return db.ai.search(
        query=query,
        labels=['PDF_CHUNK', 'WEB_CHUNK', 'DB_SUMMARY'],
        property_name='text',
        limit=limit
    )

hits = search_all_sources('how does the pricing model work?')
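
A quick way to eyeball the blended results; the attribute access mirrors what later steps rely on (score, text, and the __label metadata field):

for hit in hits:
    label = getattr(hit, '__label', '?')
    print(f'{(hit.score or 0):.3f}  [{label}]  {hit.text[:80]}')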

Source-specific retrieval

You can also query a single source when you know which to target:

pdf_hits = db.ai.search(
    query='annual revenue breakdown',
    labels=['PDF_CHUNK'],
    property_name='text',
    limit=5
)

web_hits = db.ai.search(
    query='quickstart guide',
    labels=['WEB_CHUNK'],
    property_name='text',
    limit=5
)

Step 6: Score filtering and de-duplication

Drop low-confidence results and remove near-duplicate chunks before sending to the LLM.

def jaccard_similarity(a: str, b: str) -> float:
    set_a = set(a.lower().split())
    set_b = set(b.lower().split())
    if not set_a or not set_b:
        return 0.0
    return len(set_a & set_b) / len(set_a | set_b)

def deduplicate_chunks(
    hits: list,
    score_threshold: float = 0.70,
    similarity_threshold: float = 0.95
) -> list:
    filtered = [h for h in hits if (h.score or 0) >= score_threshold]
    kept = []
    for hit in filtered:
        is_dup = any(
            jaccard_similarity(k.text, hit.text) >= similarity_threshold
            for k in kept
        )
        if not is_dup:
            kept.append(hit)
    return kept
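
For example, raise the score threshold when precision matters more than recall (the values here are illustrative and worth tuning against your own data):

raw_hits = search_all_sources('refund policy', limit=12)
hits = deduplicate_chunks(raw_hits, score_threshold=0.80)
print(f'{len(raw_hits)} hits -> {len(hits)} after filtering and de-duplication')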

Step 7: Build source-aware citations

Format the context for the LLM prompt with numbered citations tied to each source type.

def build_context_with_citations(hits: list) -> dict:
    citations = []
    context_lines = []

    for i, hit in enumerate(hits):
        ref = i + 1
        label = getattr(hit, '__label', '')

        if label == 'PDF_CHUNK':
            citation = f'[{ref}] PDF: {hit.source} (chunk {hit.chunkIndex})'
        elif label == 'WEB_CHUNK':
            citation = f'[{ref}] Web: {hit.url}'
        else:
            citation = f'[{ref}] Database: {hit.entityType} ID {hit.entityId}'

        citations.append(citation)
        context_lines.append(f'[{ref}] {hit.text}')

    return {
        'context': '\n\n'.join(context_lines),
        'citation_block': '\n'.join(citations)
    }

Step 8: Synthesize with the LLM

Tie everything together: retrieve across all sources, de-duplicate, build citations, and ask the LLM to answer strictly from the assembled context.

from openai import OpenAI

openai = OpenAI()

def answer_with_sources(question: str) -> dict:
    raw_hits = search_all_sources(question, limit=12)
    hits = deduplicate_chunks(raw_hits)
    ctx = build_context_with_citations(hits)

    prompt = f"""You are a helpful assistant. Answer the question below using ONLY the provided context.
After your answer, list the sources you used as [1], [2], etc.

Context:
{ctx['context']}

Question: {question}

Answer:"""

    completion = openai.chat.completions.create(
        model='gpt-4o-mini',
        messages=[{'role': 'user', 'content': prompt}]
    )

    return {
        'answer': completion.choices[0].message.content,
        'citations': ctx['citation_block']
    }

result = answer_with_sources('What is the refund policy?')
print(result['answer'])
print('\nSources:\n' + result['citations'])

Full pipeline

The complete flow in one place:

ingest_pdf()          →  PDF_CHUNK records
ingest_web_page()     →  WEB_CHUNK records
ingest_db_summaries() →  DB_SUMMARY records

db.embeddings.create_index() × 3

db.ai.search(labels=all three)

deduplicate_chunks(hits)

build_context_with_citations(hits)

LLM chat completion (prompt + context)

answer + numbered citations

Tips

  • Adjust chunk size per source type. PDFs may need smaller chunks (400–500 chars) for precision; web pages can tolerate 800–1000 chars for better context.
  • Filter by sourceType when the query implies a source. If the user asks "show me in the docs…", restrict labels to ['WEB_CHUNK'] to reduce noise (a sketch follows this list).
  • Add a retrievedAt timestamp to web chunks to detect stale content and trigger re-ingestion.
  • Use where filters alongside ai.search to scope by date, author, or any other metadata field.
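
A minimal sketch of the source-routing tip above, reusing the shape of the Step 5 helper; the trigger phrase and label choice are illustrative, not a fixed heuristic:

def routed_search(query: str, limit: int = 8):
    # Naive intent check: send doc-style questions to web chunks only.
    if 'in the docs' in query.lower():
        labels = ['WEB_CHUNK']
    else:
        labels = ['PDF_CHUNK', 'WEB_CHUNK', 'DB_SUMMARY']
    return db.ai.search(
        query=query,
        labels=labels,
        property_name='text',
        limit=limit
    )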

Next steps

  • RAG Evaluation — measure precision@k and recall@k across your pipeline
  • RAG Reranking — two-stage retrieval with cross-encoder scoring
  • GraphRAG — enrich chunks with graph context before synthesis