Multi-Source RAG
Real retrieval pipelines rarely draw from a single data source. This tutorial shows how to ingest PDFs, web pages, and database summaries under distinct record labels, then search across all of them in a single vector query, with source-aware citations in the final answer.
Architecture overview
┌──────────────┐   ┌──────────────┐   ┌──────────────┐
│  PDF Chunks  │   │  Web Chunks  │   │  DB Summary  │
│  PDF_CHUNK   │   │  WEB_CHUNK   │   │  DB_SUMMARY  │
└──────┬───────┘   └──────┬───────┘   └──────┬───────┘
       │                  │                  │
       └──────────────────┼──────────────────┘
                          │
                 ai.search across
                 all three labels
                          │
                   LLM synthesis
                   with citations
Each source type lands in its own RushDB label. This keeps data organized and lets you filter by source when needed, while a single ai.search call retrieves relevant chunks from all of them simultaneously.
Prerequisites
- RushDB project with an embedding index configured (cloud managed or self-hosted with RUSHDB_EMBEDDING_MODEL set).
- A PDF parsing library (we'll use pdf-parse for Node.js / pypdf for Python).
- An LLM client for the synthesis step.
Step 1: Ingest PDFs as PDF_CHUNK
Chunk the PDF text and store each chunk as a separate record. Keep metadata (document title, chunk index) so you can cite the document later.
Python:

import os
from rushdb import RushDB
from pypdf import PdfReader

db = RushDB(os.environ['RUSHDB_API_KEY'])

def chunk_text(text: str, size: int = 600, overlap: int = 80) -> list[str]:
    chunks = []
    start = 0
    while start < len(text):
        chunks.append(text[start:start + size])
        start += size - overlap
    return chunks

def ingest_pdf(file_path: str, doc_title: str):
    reader = PdfReader(file_path)
    full_text = '\n'.join(page.extract_text() or '' for page in reader.pages)
    chunks = chunk_text(full_text)
    db.records.import_json(
        label='PDF_CHUNK',
        data=[
            {
                'text': chunk,
                'source': doc_title,
                'chunkIndex': i,
                'sourceType': 'pdf'
            }
            for i, chunk in enumerate(chunks)
        ]
    )
    print(f'Ingested {len(chunks)} chunks from "{doc_title}"')

ingest_pdf('./docs/annual-report.pdf', 'Annual Report 2024')
ingest_pdf('./docs/product-spec.pdf', 'Product Spec v3')
JavaScript:

import RushDB from '@rushdb/javascript-sdk'
import pdfParse from 'pdf-parse'
import fs from 'node:fs'

const db = new RushDB(process.env.RUSHDB_API_KEY)

function chunkText(text, size = 600, overlap = 80) {
  const chunks = []
  let start = 0
  while (start < text.length) {
    chunks.push(text.slice(start, start + size))
    start += size - overlap
  }
  return chunks
}

async function ingestPdf(filePath, docTitle) {
  const buffer = fs.readFileSync(filePath)
  const { text } = await pdfParse(buffer)
  const chunks = chunkText(text)
  await db.records.importJson({
    label: 'PDF_CHUNK',
    data: chunks.map((chunk, i) => ({
      text: chunk,
      source: docTitle,
      chunkIndex: i,
      sourceType: 'pdf'
    }))
  })
  console.log(`Ingested ${chunks.length} chunks from "${docTitle}"`)
}

await ingestPdf('./docs/annual-report.pdf', 'Annual Report 2024')
await ingestPdf('./docs/product-spec.pdf', 'Product Spec v3')
Step 2: Ingest web pages as WEB_CHUNK
Fetch and chunk web content the same way. Add a url field for citation.
Python:

import httpx
from bs4 import BeautifulSoup

def ingest_web_page(url: str):
    response = httpx.get(url, follow_redirects=True)
    soup = BeautifulSoup(response.text, 'html.parser')
    text = ' '.join(soup.get_text().split())
    chunks = chunk_text(text)
    db.records.import_json(
        label='WEB_CHUNK',
        data=[
            {
                'text': chunk,
                'url': url,
                'chunkIndex': i,
                'sourceType': 'web'
            }
            for i, chunk in enumerate(chunks)
        ]
    )
    print(f'Ingested {len(chunks)} chunks from {url}')

ingest_web_page('https://docs.rushdb.com/get-started/quickstart')
ingest_web_page('https://docs.rushdb.com/tutorials/graphrag')
JavaScript:

import { JSDOM } from 'jsdom'

async function ingestWebPage(url) {
  const res = await fetch(url)
  const html = await res.text()
  const dom = new JSDOM(html)
  const text = dom.window.document.body.textContent ?? ''
  const chunks = chunkText(text.replace(/\s+/g, ' ').trim())
  await db.records.importJson({
    label: 'WEB_CHUNK',
    data: chunks.map((chunk, i) => ({
      text: chunk,
      url,
      chunkIndex: i,
      sourceType: 'web'
    }))
  })
  console.log(`Ingested ${chunks.length} chunks from ${url}`)
}

await ingestWebPage('https://docs.rushdb.com/get-started/quickstart')
await ingestWebPage('https://docs.rushdb.com/tutorials/graphrag')
Step 3: Ingest database summaries as DB_SUMMARY
For structured data, generate a natural-language summary per entity and store it as a record. This makes relational data searchable via vector similarity.
Python:

# Example: products fetched from a SQL database
def ingest_db_summaries(products: list[dict]):
    db.records.import_json(
        label='DB_SUMMARY',
        data=[
            {
                'text': (
                    f"{p['name']}: {p['description']}. "
                    f"Category: {p['category']}. "
                    f"Price: ${p['price']}. "
                    f"In stock: {p['stock']}."
                ),
                'entityId': p['id'],
                'entityType': 'product',
                'sourceType': 'database'
            }
            for p in products
        ]
    )
    print(f'Ingested {len(products)} DB summaries')

products = fetch_products_from_database()
ingest_db_summaries(products)
JavaScript:

// Example: products from a SQL database
async function ingestDbSummaries(products) {
  await db.records.importJson({
    label: 'DB_SUMMARY',
    data: products.map(p => ({
      text: `${p.name}: ${p.description}. Category: ${p.category}. Price: $${p.price}. In stock: ${p.stock}.`,
      entityId: p.id,
      entityType: 'product',
      sourceType: 'database'
    }))
  })
  console.log(`Ingested ${products.length} DB summaries`)
}

const products = await fetchProductsFromDatabase()
await ingestDbSummaries(products)
Step 4: Create embedding indexes
Create one embedding index per label. Each index covers the text property.
Python:

labels = ['PDF_CHUNK', 'WEB_CHUNK', 'DB_SUMMARY']

for label in labels:
    result = db.embeddings.create_index(label=label, property_name='text')
    print(f'Created index for {label}: {result.id}')
JavaScript:

const labels = ['PDF_CHUNK', 'WEB_CHUNK', 'DB_SUMMARY']

for (const label of labels) {
  const { id } = await db.embeddings.createIndex({
    label,
    propertyName: 'text'
  })
  console.log(`Created index for ${label}: ${id}`)
}
The indexes backfill in the background. Poll status if you need to wait:
Python:

import time

def wait_for_index(index_id: str):
    while True:
        index = db.embeddings.get_index(index_id)
        if index.status == 'ready':
            break
        print(f'Index {index_id}: {index.status}')
        time.sleep(3)
JavaScript:

import { setTimeout } from 'timers/promises'

async function waitForIndex(indexId) {
  while (true) {
    const { status } = await db.embeddings.getIndex(indexId)
    if (status === 'ready') break
    console.log(`Index ${indexId}: ${status}`)
    await setTimeout(3000)
  }
}
Step 5: Cross-source search
A single ai.search call across multiple labels returns results ranked by vector similarity regardless of source type.
Python:

def search_all_sources(query: str, limit: int = 8):
    return db.ai.search(
        query=query,
        labels=['PDF_CHUNK', 'WEB_CHUNK', 'DB_SUMMARY'],
        property_name='text',
        limit=limit
    )

hits = search_all_sources('how does the pricing model work?')
JavaScript:

async function searchAllSources(query, limit = 8) {
  const results = await db.ai.search({
    query,
    labels: ['PDF_CHUNK', 'WEB_CHUNK', 'DB_SUMMARY'],
    propertyName: 'text',
    limit
  })
  return results
}

const hits = await searchAllSources('how does the pricing model work?')
Source-specific retrieval
You can also query a single source when you know which to target:
Python:

# Only search the PDF corpus
pdf_hits = db.ai.search(
    query='annual revenue breakdown',
    labels=['PDF_CHUNK'],
    property_name='text',
    limit=5
)

# Only search web content
web_hits = db.ai.search(
    query='quickstart guide',
    labels=['WEB_CHUNK'],
    property_name='text',
    limit=5
)
JavaScript:

// Only search the PDF corpus
const pdfHits = await db.ai.search({
  query: 'annual revenue breakdown',
  labels: ['PDF_CHUNK'],
  propertyName: 'text',
  limit: 5
})

// Only search web content
const webHits = await db.ai.search({
  query: 'quickstart guide',
  labels: ['WEB_CHUNK'],
  propertyName: 'text',
  limit: 5
})
Step 6: Score filtering and de-duplication
Drop low-confidence results and remove near-duplicate chunks before sending to the LLM.
Python:

# Simple Jaccard similarity on word sets
def jaccard_similarity(a: str, b: str) -> float:
    set_a = set(a.lower().split())
    set_b = set(b.lower().split())
    if not set_a or not set_b:
        return 0.0
    return len(set_a & set_b) / len(set_a | set_b)

def deduplicate_chunks(
    hits: list,
    score_threshold: float = 0.70,
    similarity_threshold: float = 0.95
) -> list:
    # Drop low-score results
    filtered = [h for h in hits if (h.score or 0) >= score_threshold]
    # Remove near-duplicates: skip a chunk whose text is too similar to one already kept
    kept = []
    for hit in filtered:
        is_dup = any(
            jaccard_similarity(k.text, hit.text) >= similarity_threshold
            for k in kept
        )
        if not is_dup:
            kept.append(hit)
    return kept
JavaScript:

function deduplicateChunks(hits, scoreThreshold = 0.70, similarityThreshold = 0.95) {
  // Drop low-score results
  const filtered = hits.filter(h => (h.__score ?? 0) >= scoreThreshold)
  // Remove near-duplicates: skip a chunk if its text is too similar to one already kept
  const kept = []
  for (const hit of filtered) {
    const isDuplicate = kept.some(k => jaccardSimilarity(k.text, hit.text) >= similarityThreshold)
    if (!isDuplicate) kept.push(hit)
  }
  return kept
}

// Simple Jaccard similarity on word sets
function jaccardSimilarity(a, b) {
  const setA = new Set(a.toLowerCase().split(/\s+/))
  const setB = new Set(b.toLowerCase().split(/\s+/))
  const intersection = [...setA].filter(w => setB.has(w)).length
  const union = new Set([...setA, ...setB]).size
  // Guard against empty inputs (matches the Python version)
  return union === 0 ? 0 : intersection / union
}
Step 7: Build source-aware citations
Format the context for the LLM prompt with numbered citations tied to each source type.
Python:

def build_context_with_citations(hits: list) -> dict:
    citations = []
    context_lines = []
    for i, hit in enumerate(hits):
        ref = i + 1
        label = getattr(hit, '__label', '')
        if label == 'PDF_CHUNK':
            citation = f'[{ref}] PDF: {hit.source} (chunk {hit.chunkIndex})'
        elif label == 'WEB_CHUNK':
            citation = f'[{ref}] Web: {hit.url}'
        else:
            citation = f'[{ref}] Database: {hit.entityType} ID {hit.entityId}'
        citations.append(citation)
        context_lines.append(f'[{ref}] {hit.text}')
    return {
        'context': '\n\n'.join(context_lines),
        'citation_block': '\n'.join(citations)
    }
JavaScript:

function buildContextWithCitations(hits) {
  const citations = []
  const contextLines = []
  hits.forEach((hit, i) => {
    const ref = i + 1
    const sourceLabel = hit.__label
    let citation
    if (sourceLabel === 'PDF_CHUNK') {
      citation = `[${ref}] PDF: ${hit.source} (chunk ${hit.chunkIndex})`
    } else if (sourceLabel === 'WEB_CHUNK') {
      citation = `[${ref}] Web: ${hit.url}`
    } else {
      citation = `[${ref}] Database: ${hit.entityType} ID ${hit.entityId}`
    }
    citations.push(citation)
    contextLines.push(`[${ref}] ${hit.text}`)
  })
  return {
    context: contextLines.join('\n\n'),
    citationBlock: citations.join('\n')
  }
}
Step 8: Synthesize with the LLM
Python:

from openai import OpenAI

openai = OpenAI()

def answer_with_sources(question: str) -> dict:
    raw_hits = search_all_sources(question, limit=12)
    hits = deduplicate_chunks(raw_hits)
    ctx = build_context_with_citations(hits)
    prompt = f"""You are a helpful assistant. Answer the question below using ONLY the provided context.
After your answer, list the sources you used as [1], [2], etc.

Context:
{ctx['context']}

Question: {question}

Answer:"""
    completion = openai.chat.completions.create(
        model='gpt-4o-mini',
        messages=[{'role': 'user', 'content': prompt}]
    )
    return {
        'answer': completion.choices[0].message.content,
        'citations': ctx['citation_block']
    }

result = answer_with_sources('What is the refund policy?')
print(result['answer'])
print('\nSources:\n' + result['citations'])
JavaScript:

import OpenAI from 'openai'

const openai = new OpenAI()

async function answerWithSources(question) {
  const rawHits = await searchAllSources(question, 12)
  const hits = deduplicateChunks(rawHits)
  const { context, citationBlock } = buildContextWithCitations(hits)
  const prompt = `You are a helpful assistant. Answer the question below using ONLY the provided context.
After your answer, list the sources you used as [1], [2], etc.

Context:
${context}

Question: ${question}

Answer:`
  const completion = await openai.chat.completions.create({
    model: 'gpt-4o-mini',
    messages: [{ role: 'user', content: prompt }]
  })
  return {
    answer: completion.choices[0].message.content,
    citations: citationBlock
  }
}

const { answer, citations } = await answerWithSources('What is the refund policy?')
console.log(answer)
console.log('\nSources:\n' + citations)
Full pipeline
The complete flow in one place:
ingestPdf()         → PDF_CHUNK records
ingestWebPage()     → WEB_CHUNK records
ingestDbSummaries() → DB_SUMMARY records
          ↓
db.embeddings.createIndex() × 3
          ↓
db.ai.search(labels: all three)
          ↓
deduplicateChunks(hits)
          ↓
buildContextWithCitations(hits)
          ↓
LLM.chat(prompt + context)
          ↓
answer + numbered citations
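If you want the flow as runnable code, here is a minimal sketch (Python) that ties the steps together using the helpers defined earlier in this tutorial. It assumes the ingestion calls from Steps 1–3 have already run:

# Minimal end-to-end sketch using the helpers defined above.
# Assumes PDF, web, and DB ingestion (Steps 1-3) has already run.
for label in ['PDF_CHUNK', 'WEB_CHUNK', 'DB_SUMMARY']:
    index = db.embeddings.create_index(label=label, property_name='text')
    wait_for_index(index.id)  # block until the backfill finishes

result = answer_with_sources('How does the pricing model work?')
print(result['answer'])
print('\nSources:\n' + result['citations'])

Treat the index-creation loop as one-time setup rather than part of every query; only the final answer_with_sources call runs per question.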
Tips
- Adjust chunk size per source type. PDFs may need smaller chunks (400–500 chars) for precision; web pages can tolerate 800–1000 chars for better context.
- Filter by sourceType when the query implies a source. If the user asks "show me in the docs…", restrict labels to ['WEB_CHUNK'] to reduce noise.
- Add a retrievedAt timestamp to web chunks to detect stale content and trigger re-ingestion.
- Use where filters alongside ai.search to scope by date, author, or any other metadata field (see the sketch after this list).
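As an illustration of the last two tips, here is a hedged sketch (Python) that scopes a web-only search to recently retrieved chunks. The exact shape of the where clause that ai.search accepts is an assumption here, so check the search reference for your SDK version:

# Hypothetical example: restrict web results to chunks retrieved after a cutoff.
# Assumes ai.search accepts a `where` clause shaped like RushDB's standard
# record filters, and that web chunks were stored with a `retrievedAt` field.
fresh_web_hits = db.ai.search(
    query='quickstart guide',
    labels=['WEB_CHUNK'],
    property_name='text',
    where={'retrievedAt': {'$gte': '2024-06-01T00:00:00Z'}},
    limit=5
)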
Next steps
- RAG Evaluation — measure precision@k and recall@k across your pipeline
- RAG Reranking — two-stage retrieval with cross-encoder scoring
- GraphRAG — enrich chunks with graph context before synthesis