Building a Graph-Backed API Layer
RushDB gives your application a graph database with a REST and SDK interface. Most production apps do not expose RushDB directly to the client — they expose a purpose-built API that translates user intent into RushDB queries, enforces access rules, and shapes the response.
This tutorial shows the patterns that work.
Why a translation layer?
- Access control: the API enforces which records a user can query, narrowing `where` clauses with tenant or permission filters before calling RushDB
- Schema stability: client contracts are stable even if your graph schema evolves
- Response shaping: the API assembles multi-query responses into clean payloads before returning them
- Query safety: user input is mapped to a validated query structure — not passed through to RushDB directly
Pattern 1: Accept user params, produce a validated SearchQuery
Never pass user-supplied JSON directly to records.find. Always construct the SearchQuery from validated inputs.
- Python
- TypeScript
- shell
from rushdb import RushDB
from typing import Optional
import os
# Shared RushDB client used by the Python examples. The API key is read from the
# RUSHDB_API_KEY environment variable (indexing os.environ raises KeyError when
# it is unset), and the API base URL is passed explicitly.
db = RushDB(os.environ["RUSHDB_API_KEY"], base_url="https://api.rushdb.com/api/v1")
def build_article_query(
    tenant_id: str,
    q: Optional[str] = None,
    category: Optional[str] = None,
    author_id: Optional[str] = None,
    since: Optional[str] = None,
    page: int = 0,
    page_size: int = 20,
) -> dict:
    """Build a tenant-scoped ARTICLE search query from individual parameters.

    Args:
        tenant_id: Tenant the query is scoped to; always written to ``where``.
        q: Optional text matched against ``title`` with ``$contains``.
        category: Optional exact match on ``category``.
        author_id: Optional exact match on ``authorId``.
        since: Optional lower bound for ``publishedAt`` (``$gte``).
        page: Zero-based page index; negative values are treated as 0.
        page_size: Requested page size; clamped to the range 1..100.

    Returns:
        A query dict with ``labels``, ``where``, ``orderBy``, ``skip`` and
        ``limit`` keys.
    """
    # Clamp paging inputs so a caller can never produce a negative skip or a
    # non-positive / oversized limit.
    limit = max(1, min(page_size, 100))
    page_index = max(0, page)

    # The tenant filter is always present; optional filters are added only
    # when the corresponding argument is truthy.
    where: dict = {"tenantId": tenant_id}
    if category:
        where["category"] = category
    if author_id:
        where["authorId"] = author_id
    if since:
        where["publishedAt"] = {"$gte": since}
    if q:
        where["title"] = {"$contains": q}

    return {
        "labels": ["ARTICLE"],
        "where": where,
        "orderBy": {"publishedAt": "desc"},
        # skip is derived from the clamped page size so skip and limit agree.
        "skip": page_index * limit,
        "limit": limit,
    }
# Usage in a FastAPI handler
# from fastapi import FastAPI, Request
# app = FastAPI()
#
# @app.get("/articles")
# def get_articles(request: Request, q: str = None, category: str = None, page: int = 0):
#     tenant_id = get_tenant_id(request)
#     query = build_article_query(tenant_id, q=q, category=category, page=page)
#     result = db.records.find(query)
#     return {"articles": result.data, "total": result.total}
import RushDB from '@rushdb/javascript-sdk'
import type { SearchQuery } from '@rushdb/javascript-sdk'
// Shared RushDB client used by the TypeScript examples. The API key is read from
// the RUSHDB_API_KEY environment variable; the non-null assertion (`!`) only
// silences the type checker and does not verify that the variable is set.
const db = new RushDB(process.env.RUSHDB_API_KEY!)
/** User-controllable search parameters accepted by buildArticleQuery. */
type ArticleSearchParams = {
// Free-text term; matched against `title` with `$contains`.
q?: string
// Exact match on the article's `category`.
category?: string
// Exact match on the article's `authorId`.
authorId?: string
// Lower bound for `publishedAt` (`$gte`); must be parseable by `new Date(...)`.
since?: string
// Zero-based page index; defaults to 0 when omitted.
page?: number
// Requested page size; defaults to 20 when omitted.
pageSize?: number
}
/**
 * Builds a tenant-scoped ARTICLE search query from user-supplied parameters.
 *
 * The `where` clause is assembled key by key from known parameters; nothing
 * from `params` is passed through as-is.
 *
 * @param params - Optional filters and paging values supplied by the client.
 * @param tenantId - Tenant to scope the query to; always written to `where`.
 * @returns A query with `labels`, `where`, `orderBy`, `skip` and `limit`.
 * @throws Error with message `Invalid since date` when `params.since` cannot
 *   be parsed by `new Date(...)`.
 */
function buildArticleQuery(params: ArticleSearchParams, tenantId: string): SearchQuery {
  const DEFAULT_PAGE_SIZE = 20
  const MAX_PAGE_SIZE = 100

  // Paging values may arrive as NaN (e.g. Number('abc')), fractional or
  // negative; fall back to the default for non-finite input and truncate.
  const toInt = (value: number | undefined, fallback: number): number =>
    value !== undefined && Number.isFinite(value) ? Math.trunc(value) : fallback

  const pageSize = Math.min(Math.max(toInt(params.pageSize, DEFAULT_PAGE_SIZE), 1), MAX_PAGE_SIZE)
  const page = Math.max(toInt(params.page, 0), 0)

  const where: Record<string, unknown> = {
    // Always scope to tenant — never trust user to supply this
    tenantId
  }

  if (params.category) {
    where.category = params.category
  }

  if (params.authorId) {
    where.authorId = params.authorId
  }

  if (params.since) {
    // Validate date format before using it
    const date = new Date(params.since)
    if (isNaN(date.getTime())) throw new Error('Invalid since date')
    where.publishedAt = { $gte: params.since }
  }

  if (params.q) {
    // Text search via $contains — not arbitrary Cypher
    where.title = { $contains: params.q }
  }

  return {
    labels: ['ARTICLE'],
    where,
    orderBy: { publishedAt: 'desc' },
    // skip uses the same capped page size as limit, so consecutive pages are
    // contiguous even when the client asks for more than MAX_PAGE_SIZE.
    skip: page * pageSize,
    limit: pageSize
  }
}
// Usage in a route handler (e.g. Hono, Express, Next.js)
/**
 * Handles an article search request.
 *
 * Reads `q`, `category`, `authorId`, `since`, `page` and `pageSize` from the
 * query string, builds the search through `buildArticleQuery`, and returns a
 * shaped JSON payload. The echoed `page` / `pageSize` are the normalised
 * values that were handed to the query builder, not the raw input.
 *
 * @param req - Incoming request; the tenant is resolved via `getTenantId(req)`.
 * @returns JSON response with `articles`, `total`, `page` and `pageSize`.
 */
export async function getArticles(req: Request): Promise<Response> {
  const tenantId = getTenantId(req) // from session/JWT
  const search = new URL(req.url).searchParams

  // Query-string numbers are untrusted: a missing, empty or non-numeric value
  // falls back to the default, and the result is truncated and clamped.
  const readInt = (name: string, fallback: number, min: number, max: number): number => {
    const raw = search.get(name)
    if (raw === null || raw.trim() === '') return fallback
    const parsed = Number(raw)
    if (!Number.isFinite(parsed)) return fallback
    return Math.min(Math.max(Math.trunc(parsed), min), max)
  }

  const params: ArticleSearchParams = {
    q: search.get('q') ?? undefined,
    category: search.get('category') ?? undefined,
    authorId: search.get('authorId') ?? undefined,
    since: search.get('since') ?? undefined,
    page: readInt('page', 0, 0, Number.MAX_SAFE_INTEGER),
    pageSize: readInt('pageSize', 20, 1, 100)
  }

  const query = buildArticleQuery(params, tenantId)
  const result = await db.records.find(query)

  return Response.json({
    // Expose `id` rather than the internal `__id`, and only the public fields.
    articles: result.data.map(r => ({
      id: r.__id,
      title: r.title,
      category: r.category,
      publishedAt: r.publishedAt
    })),
    total: result.total,
    page: params.page,
    pageSize: params.pageSize
  })
}
# The shell equivalent is building the query JSON and posting it
# BASE, TOKEN and H are reused by the later curl examples.
BASE="https://api.rushdb.com/api/v1"
# Read the API key from the environment (the same RUSHDB_API_KEY variable the
# SDK examples use) and stop with a clear message when it is not set, instead
# of sending a placeholder string as the bearer token.
TOKEN="${RUSHDB_API_KEY:?Set RUSHDB_API_KEY to your RushDB API key}"
H='Content-Type: application/json'
TENANT_ID="tenant-42"
CATEGORY="infrastructure"
# The payload is a double-quoted string, so $TENANT_ID and $CATEGORY are
# interpolated by the shell; the values must not contain characters that
# would break the JSON (such as double quotes).
curl -s -X POST "$BASE/records/search" \
  -H "$H" -H "Authorization: Bearer $TOKEN" \
  -d "{
    \"labels\": [\"ARTICLE\"],
    \"where\": {
      \"tenantId\": \"$TENANT_ID\",
      \"category\": \"$CATEGORY\"
    },
    \"orderBy\": {\"publishedAt\": \"desc\"},
    \"limit\": 20
  }"
Pattern 2: Multi-query response assembly
Some API responses require data from more than one query. Run them concurrently and assemble the response.
- Python
- TypeScript
- shell
from concurrent.futures import ThreadPoolExecutor, as_completed
def get_project_detail(project_id: str, tenant_id: str):
    """Fetch a project with its open tasks and active members.

    Three searches (PROJECT, TASK, MEMBER) are submitted to a thread pool so
    they run concurrently, then combined into one payload.

    Args:
        project_id: ``__id`` of the project to load.
        tenant_id: Tenant the project lookup is scoped to.

    Returns:
        ``None`` when the project search returns no records, otherwise a dict
        with ``project``, ``openTasks`` and ``members`` keys.
    """
    project_query = {
        "labels": ["PROJECT"],
        "where": {"__id": project_id, "tenantId": tenant_id},
    }
    # Up to ten open / in-progress tasks, earliest due date first.
    open_tasks_query = {
        "labels": ["TASK"],
        "where": {
            "PROJECT": {
                "$relation": {"type": "CONTAINS", "direction": "in"},
                "__id": project_id,
            },
            "status": {"$in": ["open", "in_progress"]},
        },
        "orderBy": {"dueDate": "asc"},
        "limit": 10,
    }
    active_members_query = {
        "labels": ["MEMBER"],
        "where": {
            "PROJECT": {
                "$relation": {"type": "ASSIGNED_TO", "direction": "out"},
                "__id": project_id,
            },
            "isActive": True,
        },
    }

    # One worker per query; results are collected in submission order.
    with ThreadPoolExecutor(max_workers=3) as pool:
        pending = [
            pool.submit(db.records.find, query)
            for query in (project_query, open_tasks_query, active_members_query)
        ]
        projects, tasks, members = (future.result() for future in pending)

    if not projects.data:
        return None

    return {
        "project": projects.data[0].data,
        "openTasks": [task.data for task in tasks.data],
        "members": [member.data for member in members.data],
    }
// GET /projects/:id — returns project + open tasks + active members
/**
 * Loads one project together with its open tasks and active members.
 *
 * @param projectId - `__id` of the project to load.
 * @param tenantId - Tenant the project lookup is scoped to.
 * @returns `null` when the project search returns no record, otherwise the
 *   shaped `project`, `openTasks` and `members`.
 */
export async function getProjectDetail(projectId: string, tenantId: string) {
  // Start all three searches before awaiting anything so they run concurrently.
  const projectLookup = db.records.find({
    labels: ['PROJECT'],
    where: { __id: projectId, tenantId }
  })

  // Up to ten open / in-progress tasks, earliest due date first.
  const openTasksLookup = db.records.find({
    labels: ['TASK'],
    where: {
      PROJECT: {
        $relation: { type: 'CONTAINS', direction: 'in' },
        __id: projectId
      },
      status: { $in: ['open', 'in_progress'] }
    },
    orderBy: { dueDate: 'asc' },
    limit: 10
  })

  const activeMembersLookup = db.records.find({
    labels: ['MEMBER'],
    where: {
      PROJECT: {
        $relation: { type: 'ASSIGNED_TO', direction: 'out' },
        __id: projectId
      },
      isActive: true
    }
  })

  const [projects, tasks, members] = await Promise.all([
    projectLookup,
    openTasksLookup,
    activeMembersLookup
  ])

  const found = projects.data[0]
  if (!found) {
    return null
  }

  // Shape each record explicitly so only the public fields leave the API.
  const openTasks = tasks.data.map(task => ({
    id: task.__id,
    title: task.title,
    dueDate: task.dueDate,
    status: task.status
  }))
  const activeMembers = members.data.map(member => ({
    id: member.__id,
    name: member.name,
    role: member.role
  }))

  return {
    project: { id: found.__id, name: found.name, status: found.status },
    openTasks,
    members: activeMembers
  }
}
# Relies on BASE, H and TOKEN as set in the first shell example.
PROJECT_ID="proj-123"
TENANT_ID="tenant-42"
# Run concurrently with background processes
# Request 1: the project itself, scoped to the tenant.
curl -s -X POST "$BASE/records/search" \
-H "$H" -H "Authorization: Bearer $TOKEN" \
-d "{\"labels\":[\"PROJECT\"],\"where\":{\"__id\":\"$PROJECT_ID\",\"tenantId\":\"$TENANT_ID\"}}" &
# Request 2: up to 10 open / in-progress tasks related to the project, ordered
# by dueDate. "\$in" is escaped so the shell does not expand it as a variable.
curl -s -X POST "$BASE/records/search" \
-H "$H" -H "Authorization: Bearer $TOKEN" \
-d "{\"labels\":[\"TASK\"],\"where\":{\"PROJECT\":{\"__id\":\"$PROJECT_ID\"},\"status\":{\"\$in\":[\"open\",\"in_progress\"]}},\"limit\":10,\"orderBy\":{\"dueDate\":\"asc\"}}" &
# Block until both background requests have finished.
# NOTE(review): both responses go to the same stdout, so their output may
# interleave — redirect each to its own file if that matters.
wait
Pattern 3: Metrics endpoints
Expose KPI endpoints that return counts, sums, and distributions — not raw records. Use the select and groupBy clauses for all metrics.
- Python
- TypeScript
- shell
def get_task_status_breakdown(tenant_id: str, project_id: str | None = None):
where: dict = {"tenantId": tenant_id}
if project_id:
where["PROJECT"] = {
"__id": project_id,
"$relation": {"type": "CONTAINS", "direction": "in"}
}
result = db.records.find({
"labels": ["TASK"],
"where": where,
"select": {
"count": {"$count": "*"},
"status": "$record.status"
},
"groupBy": ["status", "count"],
"orderBy": {"count": "desc"}
})
return {"breakdown": result.data, "total": result.total}
// GET /analytics/tasks-by-status?projectId=...
/**
 * Counts a tenant's TASK records per status.
 *
 * NOTE(review): `tenantId` is expected to come from the authenticated session
 * rather than the query string, per this tutorial's tenant-scoping convention.
 *
 * @param tenantId - Tenant the query is scoped to.
 * @param projectId - When truthy, only tasks related to this project through
 *   an incoming `CONTAINS` relation are counted.
 * @returns `breakdown` (the result's `data`) and `total` (the result's `total`).
 */
export async function getTaskStatusBreakdown(tenantId: string, projectId?: string) {
  const where: Record<string, unknown> = { tenantId }
  if (projectId) {
    where.PROJECT = { __id: projectId, $relation: { type: 'CONTAINS', direction: 'in' } }
  }

  // Aggregate instead of returning raw records: one row per status with its
  // count, largest group first.
  const { data, total } = await db.records.find({
    labels: ['TASK'],
    where,
    select: {
      count: { $count: '*' },
      status: '$record.status'
    },
    groupBy: ['status', 'count'],
    orderBy: { count: 'desc' }
  })

  return { breakdown: data, total }
}
# Relies on BASE, H and TOKEN as set in the first shell example.
TENANT_ID="tenant-42"
# Count TASK records per status for one tenant.
# The payload is a double-quoted shell string, so every JSON "$" operator is
# written as \$ — an unescaped $count would be expanded by the shell (to an
# empty string when unset) and the request would carry an empty key instead.
curl -s -X POST "$BASE/records/search" \
  -H "$H" -H "Authorization: Bearer $TOKEN" \
  -d "{
    \"labels\": [\"TASK\"],
    \"where\": {\"tenantId\": \"$TENANT_ID\"},
    \"select\": {
      \"count\": {\"\$count\": \"*\"},
      \"status\": \"\$record.status\"
    },
    \"groupBy\": [\"status\", \"count\"],
    \"orderBy\": {\"count\": \"desc\"}
  }"
Pattern 4: Safe delete with preview
Always offer a count-first preview before executing a delete, especially for bulk operations.
- Python
- TypeScript
- shell
def delete_articles(tenant_id: str, filter_: dict, confirm: bool = False):
    """Preview or execute a tenant-scoped bulk delete of ARTICLE records.

    Args:
        tenant_id: Tenant the operation is scoped to.
        filter_: Additional ``where`` conditions. A ``tenantId`` key in this
            dict is ignored in favour of ``tenant_id``.
        confirm: The delete runs only when this is exactly ``True``; any other
            value produces a dry-run count.

    Returns:
        ``{"dryRun": True, "count": n}`` for a preview, or
        ``{"deleted": True}`` after the delete has been issued.
    """
    # The tenant scope is written last so a "tenantId" key inside the
    # caller-supplied filter cannot override it.
    where = {**filter_, "tenantId": tenant_id}

    # Require a real boolean True: a truthy string such as "false" coming from
    # a loosely parsed request must not trigger the delete.
    if confirm is not True:
        # Preview
        preview = db.records.find({
            "labels": ["ARTICLE"],
            "where": where,
            "select": {"count": {"$count": "*"}},
            "groupBy": ["count"],
        })
        count = preview.data[0]["count"] if preview.data else 0
        return {"dryRun": True, "count": count}

    db.records.delete({"labels": ["ARTICLE"], "where": where})
    return {"deleted": True}
/** Result of a dry-run delete: how many records would be removed. */
type DeletePreview = { count: number; dryRun: true }

/**
 * Counts the ARTICLE records a tenant-scoped delete would remove, without
 * deleting anything.
 *
 * @param tenantId - Tenant the count is scoped to.
 * @param filter - Additional `where` conditions. A `tenantId` key in this
 *   object is ignored in favour of the `tenantId` argument.
 * @returns The matching record count (0 when the search returns no numeric
 *   count) flagged with `dryRun: true`.
 */
async function previewDelete(tenantId: string, filter: Record<string, unknown>): Promise<DeletePreview> {
  const result = await db.records.find({
    labels: ['ARTICLE'],
    // tenantId is written after the spread so a `tenantId` key inside the
    // caller-supplied filter cannot override the tenant scope.
    where: { ...filter, tenantId },
    select: { count: { $count: '*' } },
    groupBy: ['count']
  })

  // Narrow at runtime instead of asserting the type.
  const count: unknown = result.data[0]?.count
  return { count: typeof count === 'number' ? count : 0, dryRun: true }
}
/**
 * Deletes the ARTICLE records matching a tenant-scoped filter.
 *
 * @param tenantId - Tenant the delete is scoped to.
 * @param filter - Additional `where` conditions. A `tenantId` key in this
 *   object is ignored in favour of the `tenantId` argument.
 * @returns Whatever `db.records.deleteMany` resolves to.
 */
async function executeDelete(tenantId: string, filter: Record<string, unknown>) {
  return db.records.deleteMany({
    labels: ['ARTICLE'],
    // tenantId is written after the spread so a `tenantId` key inside the
    // caller-supplied filter cannot widen the delete to another tenant.
    where: { ...filter, tenantId }
  })
}
// API handler
/**
 * Handles a bulk article delete request with a count-first preview.
 *
 * The JSON body may carry `filter` (an object of extra `where` conditions)
 * and `confirm`. The delete is executed only when `confirm` is exactly
 * `true`; otherwise a preview is returned.
 *
 * @param req - Incoming request; the tenant is resolved via `getTenantId(req)`.
 * @returns `{ preview }` for a dry run, `{ deleted: true }` after a delete,
 *   or a 400 response with `{ error }` when the body is malformed.
 */
export async function deleteArticlesHandler(req: Request): Promise<Response> {
  const tenantId = getTenantId(req)

  const isPlainObject = (value: unknown): value is Record<string, unknown> =>
    typeof value === 'object' && value !== null && !Array.isArray(value)

  let body: unknown
  try {
    body = await req.json()
  } catch {
    return Response.json({ error: 'Request body must be valid JSON' }, { status: 400 })
  }
  if (!isPlainObject(body)) {
    return Response.json({ error: 'Request body must be a JSON object' }, { status: 400 })
  }

  // A missing filter means "no extra conditions"; anything that is present
  // but not an object is rejected rather than spread into the query.
  const rawFilter = body.filter ?? {}
  if (!isPlainObject(rawFilter)) {
    return Response.json({ error: 'filter must be a JSON object' }, { status: 400 })
  }

  // Tenant scope comes from the session only: drop any client-supplied
  // tenantId before the filter reaches the query helpers.
  const filter = Object.fromEntries(
    Object.entries(rawFilter).filter(([key]) => key !== 'tenantId')
  )

  // Require a real boolean: truthy strings such as "false" must not delete.
  if (body.confirm !== true) {
    const preview = await previewDelete(tenantId, filter)
    return Response.json({ preview })
  }

  await executeDelete(tenantId, filter)
  return Response.json({ deleted: true })
}
# Relies on BASE, H and TOKEN as set in the first shell example.
TENANT_ID="tenant-42"
# Preview: count matching records first
# "\$count" is escaped because the payload is a double-quoted shell string;
# an unescaped $count would be expanded by the shell and break the query.
curl -s -X POST "$BASE/records/search" \
  -H "$H" -H "Authorization: Bearer $TOKEN" \
  -d "{
    \"labels\": [\"ARTICLE\"],
    \"where\": {\"tenantId\": \"$TENANT_ID\", \"status\": \"archived\"},
    \"select\": {\"count\": {\"\$count\": \"*\"}},
    \"groupBy\": [\"count\"]
  }"
# Execute delete only after confirming the count
# The where clause must match the preview exactly so the delete removes the
# same records that were counted.
curl -s -X DELETE "$BASE/records" \
  -H "$H" -H "Authorization: Bearer $TOKEN" \
  -d "{
    \"labels\": [\"ARTICLE\"],
    \"where\": {\"tenantId\": \"$TENANT_ID\", \"status\": \"archived\"}
  }"
Response shaping conventions
The patterns above follow these conventions:
- Never return `__id` as the external ID field name — map it to `id` in the response shape
- Never return internal RushDB metadata (`__labels`, `__proptypes`) in public API responses
- Always cap `limit` at a maximum page size (100 or less) even if the user passes a higher value
- Always inject tenant scope from the session token, never from user-supplied parameters
- Metrics responses return structured objects, not raw RushDB result arrays
Production caveat
SearchQuery where clauses support arbitrary nesting and traversal. In a public-facing API, always validate the filter shape that users supply — never spread user-provided objects directly into where without validation. An attacker who can inject arbitrary $or or traversal keys into your query can scan records they should not see. Build your where object programmatically from validated, typed parameters.
Next steps
- Hybrid Retrieval: Structured Filters Plus Semantic Search — combining filter and vector search in one handler
- Semantic Search for Multi-Tenant Products — tenant isolation at the storage layer
- Query Optimization and KU Efficiency — measuring and reducing compute cost