Skip to main content

Building a Graph-Backed API Layer

RushDB gives your application a graph database with a REST and SDK interface. Most production apps do not expose RushDB directly to the client — they expose a purpose-built API that translates user intent into RushDB queries, enforces access rules, and shapes the response.

This tutorial shows the patterns that work.


Why a translation layer?

  • Access control: the API enforces which records a user can query, narrowing where clauses with tenant or permission filters before calling RushDB
  • Schema stability: client contracts are stable even if your graph schema evolves
  • Response shaping: the API assembles multi-query responses into clean payloads before returning them
  • Query safety: user input is mapped to a validated query structure — not passed through to RushDB directly

Pattern 1: Accept user params, produce a validated SearchQuery

Never pass user-supplied JSON directly to records.find. Always construct the SearchQuery from validated inputs.

from rushdb import RushDB
from typing import Optional
import os

db = RushDB(os.environ["RUSHDB_API_KEY"], base_url="https://api.rushdb.com/api/v1")


def build_article_query(
tenant_id: str,
q: Optional[str] = None,
category: Optional[str] = None,
author_id: Optional[str] = None,
since: Optional[str] = None,
page: int = 0,
page_size: int = 20,
) -> dict:
where: dict = {"tenantId": tenant_id}

if category:
where["category"] = category
if author_id:
where["authorId"] = author_id
if since:
where["publishedAt"] = {"$gte": since}
if q:
where["title"] = {"$contains": q}

return {
"labels": ["ARTICLE"],
"where": where,
"orderBy": {"publishedAt": "desc"},
"skip": page * min(page_size, 100),
"limit": min(page_size, 100),
}


# Usage in a FastAPI handler
# from fastapi import FastAPI, Request
# app = FastAPI()
#
# @app.get("/articles")
# def get_articles(q: str = None, category: str = None, page: int = 0):
# tenant_id = get_tenant_id(request)
# query = build_article_query(tenant_id, q=q, category=category, page=page)
# result = db.records.find(query)
# return {"articles": result.data, "total": result.total}

Pattern 2: Multi-query response assembly

Some API responses require data from more than one query. Run them concurrently and assemble the response.

from concurrent.futures import ThreadPoolExecutor, as_completed


def get_project_detail(project_id: str, tenant_id: str):
def fetch_project():
return db.records.find({
"labels": ["PROJECT"],
"where": {"__id": project_id, "tenantId": tenant_id}
})

def fetch_tasks():
return db.records.find({
"labels": ["TASK"],
"where": {
"PROJECT": {
"$relation": {"type": "CONTAINS", "direction": "in"},
"__id": project_id
},
"status": {"$in": ["open", "in_progress"]}
},
"orderBy": {"dueDate": "asc"},
"limit": 10
})

def fetch_members():
return db.records.find({
"labels": ["MEMBER"],
"where": {
"PROJECT": {
"$relation": {"type": "ASSIGNED_TO", "direction": "out"},
"__id": project_id
},
"isActive": True
}
})

with ThreadPoolExecutor(max_workers=3) as executor:
fut_project = executor.submit(fetch_project)
fut_tasks = executor.submit(fetch_tasks)
fut_members = executor.submit(fetch_members)

project_result = fut_project.result()
tasks_result = fut_tasks.result()
members_result = fut_members.result()

if not project_result.data:
return None

return {
"project": project_result.data[0].data,
"openTasks": [t.data for t in tasks_result.data],
"members": [m.data for m in members_result.data]
}

Pattern 3: Metrics endpoints

Expose KPI endpoints that return counts, sums, and distributions — not raw records. Use the select and groupBy clauses for all metrics.

def get_task_status_breakdown(tenant_id: str, project_id: str | None = None):
where: dict = {"tenantId": tenant_id}
if project_id:
where["PROJECT"] = {
"__id": project_id,
"$relation": {"type": "CONTAINS", "direction": "in"}
}

result = db.records.find({
"labels": ["TASK"],
"where": where,
"select": {
"count": {"$count": "*"},
"status": "$record.status"
},
"groupBy": ["status", "count"],
"orderBy": {"count": "desc"}
})

return {"breakdown": result.data, "total": result.total}

Pattern 4: Safe delete with preview

Always offer a count-first preview before executing a delete, especially for bulk operations.

def delete_articles(tenant_id: str, filter_: dict, confirm: bool = False):
where = {"tenantId": tenant_id, **filter_}

if not confirm:
# Preview
preview = db.records.find({
"labels": ["ARTICLE"],
"where": where,
"select": {"count": {"$count": "*"}},
"groupBy": ["count"]
})
count = preview.data[0]["count"] if preview.data else 0
return {"dryRun": True, "count": count}

db.records.delete({"labels": ["ARTICLE"], "where": where})
return {"deleted": True}

Response shaping conventions

The patterns above follow these conventions:

  1. Never return __id as the external ID field name — map it to id in the response shape
  2. Never return internal RushDB metadata (__labels, __proptypes) in public API responses
  3. Always cap limit at a maximum page size (100 or less) even if the user passes a higher value
  4. Always inject tenant scope from the session token, never from user-supplied parameters
  5. Metrics responses return structured objects, not raw RushDB result arrays

Production caveat

SearchQuery where clauses support arbitrary nesting and traversal. In a public-facing API, always validate the filter shape that users supply — never spread user-provided objects directly into where without validation. An attacker who can inject arbitrary $or or traversal keys into your query can scan records they should not see. Build your where object programmatically from validated, typed parameters.


Next steps