Skip to main content

Importing from MongoDB

This tutorial walks through moving MongoDB data into RushDB. It covers:

  • One-shot bulk import — dump a collection and push it in one call
  • Embedded documents and arrays — how importJson handles nesting automatically
  • Incremental sync with upsert — run the same script on a schedule without creating duplicates
  • Cross-collection references — link documents by ObjectId using relationships.createMany
  • Change streams — react to live MongoDB writes and mirror them into RushDB

Prerequisites

npm install @rushdb/javascript-sdk mongodb dotenv
RUSHDB_API_KEY=your_rushdb_key
MONGO_URI=mongodb+srv://user:pass@cluster.mongodb.net/acme

Why importJson instead of createMany?

MongoDB documents are rarely flat. A typical users document might look like:

{
"_id": "64f1a...",
"name": "Ada Lovelace",
"email": "ada@example.com",
"address": { "city": "London", "country": "UK" },
"orders": [
{ "_id": "64f2b...", "total": 149.99, "status": "shipped" },
{ "_id": "64f2c...", "total": 29.99, "status": "delivered" }
]
}

records.createMany only accepts flat rows. records.importJson handles nested objects and arrays by recursively creating child records and linking them in the graph. The key you use for nested arrays becomes the label of the child records.


1. One-shot bulk import

The simplest path: dump a collection, reshape each document, push everything to RushDB.

from pymongo import MongoClient
from rushdb import RushDB
import os

db = RushDB(os.environ["RUSHDB_API_KEY"])

mongo = MongoClient(os.environ["MONGO_URI"])
mdb = mongo.get_default_database()

# 1. Fetch every user document from MongoDB.
users = list(mdb["users"].find({}))

# 2. Reshape: stringify ObjectIds, flatten the embedded address, and expose
#    the embedded orders under the "Order" key so importJson labels the
#    child records "Order".
payload = []
for doc in users:
    address = doc.get("address", {})
    payload.append(
        {
            "mongoId": str(doc["_id"]),
            "name": doc.get("name"),
            "email": doc.get("email"),
            "city": address.get("city"),
            "country": address.get("country"),
            "Order": [
                {"mongoId": str(order["_id"]), "total": order["total"], "status": order["status"]}
                for order in doc.get("orders", [])
            ],
        }
    )

# 3. Import everything in a single call; returnResult=False skips echoing
#    the created records back over the wire.
db.records.import_json(
    label="User",
    data=payload,
    options={"suggestTypes": True, "returnResult": False},
)

print(f"Imported {len(users)} users with their orders")
mongo.close()

After this runs, RushDB contains:

  • One User record per MongoDB user document
  • One Order record per embedded order, automatically linked to its parent user

2. Incremental sync with upsert

Add mergeBy and mergeStrategy to make subsequent runs idempotent. The script can run on a cron without creating duplicates.

from datetime import datetime, timedelta, timezone

def incremental_sync():
    """Upsert users updated in the last hour from MongoDB into RushDB.

    Idempotent: ``mergeBy=["mongoId"]`` updates existing User records in
    place instead of creating duplicates, so this function can safely run
    on a cron schedule.
    """
    # Context manager guarantees the Mongo connection is closed even if
    # the import call raises (the original leaked it on error).
    with MongoClient(os.environ["MONGO_URI"]) as mongo:
        mdb = mongo.get_default_database()

        # Only fetch documents touched in the last hour — assumes the
        # collection maintains an `updatedAt` timestamp on writes.
        since = datetime.now(timezone.utc) - timedelta(hours=1)
        users = list(mdb["users"].find({"updatedAt": {"$gte": since}}))

        if not users:
            print("No updates since", since.isoformat())
            return

        payload = [
            {
                "mongoId": str(u["_id"]),
                "name": u.get("name"),
                "email": u.get("email"),
                "city": u.get("address", {}).get("city"),
                "country": u.get("address", {}).get("country"),
                "Order": [
                    {"mongoId": str(o["_id"]), "total": o["total"], "status": o["status"]}
                    for o in u.get("orders", [])
                ],
            }
            for u in users
        ]

        # mergeBy makes re-runs update-in-place; "append" preserves fields
        # already stored in RushDB that this payload does not provide.
        db.records.import_json(
            label="User",
            data=payload,
            options={"suggestTypes": True, "mergeBy": ["mongoId"], "mergeStrategy": "append"},
        )

        print(f"Synced {len(users)} updated users")
mergeStrategy options
  • append — adds/updates provided fields, keeps any others already in RushDB. Best for incremental enrichment.
  • rewrite — replaces all own properties with the incoming set. Best when RushDB should be an exact mirror of the source.

3. Cross-collection references

When orders live in a separate collection and reference users by userId (an ObjectId), use the "import then link" pattern.

def import_with_references():
    """Import users and orders from separate collections, then link them.

    Pattern: import both sides flat with create_many, storing the Mongo
    ObjectId references as strings, then create PLACED relationships by
    joining User.mongoId = Order.userMongoId.
    """
    with MongoClient(os.environ["MONGO_URI"]) as mongo:
        mdb = mongo.get_default_database()

        users = list(mdb["users"].find({}))
        orders = list(mdb["orders"].find({}))

        # 1) Import Users
        db.records.create_many(
            label="User",
            data=[
                {"mongoId": str(u["_id"]), "name": u["name"], "email": u["email"]}
                for u in users
            ],
            options={"suggestTypes": True, "mergeBy": ["mongoId"], "mergeStrategy": "append"},
        )

        def _order_row(o):
            # Single lookup for createdAt (original fetched it twice);
            # userId is stringified so the join below compares equal types.
            created = o.get("createdAt")
            return {
                "mongoId": str(o["_id"]),
                "userMongoId": str(o["userId"]),
                "total": o["total"],
                "status": o["status"],
                "createdAt": created.isoformat() if created else None,
            }

        # 2) Import Orders
        db.records.create_many(
            label="Order",
            data=[_order_row(o) for o in orders],
            options={"suggestTypes": True, "mergeBy": ["mongoId"], "mergeStrategy": "append"},
        )

        # 3) Link: (User)-[:PLACED]->(Order) wherever the key values match.
        db.relationships.create_many(
            source={"label": "User", "key": "mongoId"},
            target={"label": "Order", "key": "userMongoId"},
            type="PLACED",
            direction="out",
        )

        print("Import and link complete")
Why store the reference as a string?

RushDB joins on property value equality. MongoDB's ObjectId must be converted to String() before storing so the join User.mongoId = Order.userMongoId works correctly — both sides must be the same type.


4. Deeply nested collections

If your documents have multi-level nesting (e.g. orders containing line items containing product info), nest the keys accordingly. importJson handles arbitrary depth.

def _product_children(item):
    # A single embedded product becomes a one-element list so importJson
    # creates a linked Product child record; no product -> no child.
    product = item.get("product")
    if not product:
        return []
    return [{"mongoId": str(product["_id"]), "name": product["name"], "sku": product["sku"]}]

# Mirror the document structure with nested keys: Order -> LineItem -> Product.
payload = [
    {
        "mongoId": str(o["_id"]),
        "total": o["total"],
        "LineItem": [
            {
                "mongoId": str(item["_id"]),
                "quantity": item["qty"],
                "unitPrice": item["price"],
                "Product": _product_children(item),
            }
            for item in o.get("items", [])
        ],
    }
    for o in orders
]

db.records.import_json(
    label="Order",
    data=payload,
    options={"suggestTypes": True, "mergeBy": ["mongoId"], "mergeStrategy": "append"},
)

This produces the graph: Order → LineItem → Product.


5. Change streams (real-time sync)

MongoDB change streams let you mirror writes into RushDB as they happen, without polling.

import os
from pymongo import MongoClient
from rushdb import RushDB

db = RushDB(os.environ["RUSHDB_API_KEY"])

def watch_collection():
    """Mirror live writes on the Mongo `users` collection into RushDB.

    Uses a MongoDB change stream, so no polling is needed. Requires the
    server to run as a replica set (change streams are unavailable on a
    standalone mongod).
    """
    mongo = MongoClient(os.environ["MONGO_URI"])
    collection = mongo.get_default_database()["users"]

    # full_document="updateLookup" makes update events carry the whole
    # post-update document instead of only the changed fields.
    with collection.watch(full_document="updateLookup") as stream:
        print("Watching users collection for changes...")
        for event in stream:
            op = event["operationType"]

            if op in ("insert", "replace", "update"):
                doc = event.get("fullDocument")
                if not doc:
                    # fullDocument can still be missing (e.g. the document
                    # was deleted before the lookup ran) — skip the event.
                    continue
                db.records.upsert(
                    label="User",
                    data={
                        "mongoId": str(doc["_id"]),
                        "name": doc.get("name"),
                        "email": doc.get("email"),
                        # address may be absent or explicitly null.
                        "city": (doc.get("address") or {}).get("city"),
                    },
                    options={"suggestTypes": True, "mergeBy": ["mongoId"], "mergeStrategy": "append"},
                )

            elif op == "delete":
                # Delete events only carry the documentKey; remove the
                # mirrored record by its stored mongoId.
                mongo_id = str(event["documentKey"]["_id"])
                db.records.delete({"labels": ["User"], "where": {"mongoId": mongo_id}})

watch_collection()
Replica set required

Change streams require MongoDB to be running as a replica set (or MongoDB Atlas). They are not available on standalone mongod instances.


6. Batching large collections

For collections with millions of documents, process in batches to avoid memory pressure and respect API rate limits.

def import_large_collection(batch_size=500):
    """Import the Mongo `products` collection into RushDB in batches.

    Batching keeps memory bounded and respects API rate limits; mergeBy
    makes re-runs idempotent.

    Args:
        batch_size: number of documents fetched and imported per request.
    """
    mongo = MongoClient(os.environ["MONGO_URI"])
    collection = mongo.get_default_database()["products"]

    skip = 0
    imported = 0

    while True:
        # Sort by _id so skip/limit pages are stable across iterations —
        # without an explicit sort MongoDB may return documents in varying
        # order, which can skip or duplicate rows between batches.
        batch = list(collection.find({}).sort("_id", 1).skip(skip).limit(batch_size))
        if not batch:
            break

        payload = [
            {
                "mongoId": str(p["_id"]),
                "sku": p["sku"],
                "name": p["name"],
                "price": p["price"],
                "category": p["category"],
                "tags": p.get("tags", []),
            }
            for p in batch
        ]

        db.records.create_many(
            label="Product",
            data=payload,
            options={"suggestTypes": True, "mergeBy": ["mongoId"], "mergeStrategy": "append"},
        )

        imported += len(batch)
        skip += batch_size
        print(f"Imported {imported} products")

    mongo.close()

7. Full import example

from pymongo import MongoClient
from rushdb import RushDB

db = RushDB("RUSHDB_API_KEY")
mongo = MongoClient("mongodb+srv://...")
mdb = mongo["acme"]

# Fetch and reshape
users = list(mdb["users"].find({}))

def _as_user_row(u):
    # One RushDB User per Mongo document; embedded orders become Order children.
    return {
        "mongoId": str(u["_id"]),
        "name": u.get("name"),
        "email": u.get("email"),
        "Order": [
            {"mongoId": str(o["_id"]), "total": o["total"], "status": o["status"]}
            for o in u.get("orders", [])
        ],
    }

payload = [_as_user_row(u) for u in users]

# Upsert into RushDB
db.records.import_json(
    label="User",
    data=payload,
    options={"suggestTypes": True, "mergeBy": ["mongoId"], "mergeStrategy": "append"},
)

print(f"Imported {len(users)} users")
mongo.close()

Troubleshooting

| Symptom | Likely cause | Fix |
| --- | --- | --- |
| `createMany` throws "not a flat object" | Document has embedded objects or arrays | Use `records.importJson` instead |
| Child records created with wrong label | Nested array key name not matching desired label | Rename the key in the reshape step (e.g. rename `orders` → `Order`) |
| Duplicate records after re-import | `mergeBy` not set | Add `mergeBy: ['mongoId']` to options |
| Join not linking records | ObjectId not converted to string | Ensure both sides use `String(objectId)` |
| Change stream `fullDocument` is null | `updateLookup` not enabled or update is partial | Use `{ fullDocument: 'updateLookup' }` in the watch options |
| Import dies on large collections | Memory exhausted on `.toArray()` | Use cursor pagination with `.skip()` / `.limit()` batching |

See also