
Event-Driven Ingestion from Webhooks and Queues

Webhooks and message queues deliver events at least once — occasionally more than once, occasionally out of order. Naive insertion on every delivery duplicates records, splits relationship graphs, and corrupts summary metrics.

This tutorial shows idempotent ingestion patterns: look up before you write, write only once, always link atomically.


The fundamental problem

Because delivery is at-least-once, the same event can arrive twice — and inserting on every delivery means two records for one real-world fact. The solution is find-then-create: search for the record by its natural key before inserting. If it already exists, update it. If it does not, create it. Do both inside a transaction so no two workers race to create the same record simultaneously.


Step 1: Idempotent upsert of a single event

from rushdb import RushDB
import os

# Single shared client for all handlers below. The API key is read from the
# environment so it never lands in source control; base_url pins the public
# RushDB API endpoint.
db = RushDB(os.environ["RUSHDB_API_KEY"], base_url="https://api.rushdb.com/api/v1")

def upsert_order(event: dict) -> str:
    """Idempotently upsert a single ORDER record keyed by its orderId.

    Looks the order up by its natural key, updates it if present, creates it
    otherwise, and commits everything in one transaction.

    Returns:
        The RushDB record id of the created or updated order.

    Raises:
        Whatever the SDK raises on a failed call; the transaction is rolled
        back first so the queue can safely redeliver the event.
    """
    tx = db.transactions.begin()
    try:
        # The natural-key lookup must happen *inside* the transaction —
        # otherwise two workers handling a duplicate delivery can both miss
        # the lookup and create duplicate ORDER records, which is exactly
        # the race this pattern exists to prevent.
        existing = db.records.find({
            "labels": ["ORDER"],
            "where": {"orderId": event["orderId"]}
        }, transaction=tx)

        if existing.data:
            order_id = existing.data[0].id
            db.records.update(order_id, {"status": event["status"]}, transaction=tx)
        else:
            order_id = db.records.create("ORDER", event, transaction=tx).id

        db.transactions.commit(tx)
        return order_id
    except Exception:
        # Leave no partial state behind; re-raise so the caller / queue
        # retries the delivery.
        db.transactions.rollback(tx)
        raise
Always use a natural key

Pick a field that is unique and immutable per event — orderId, eventId, messageId. Never use a mutable field like status or updatedAt as the deduplication key.


Step 2: Idempotent upsert with relationship creation

Events often carry implicit relationships. A checkout.completed webhook references both a customer ID and the order itself. Link them atomically.

def handle_checkout(event: dict) -> None:
    """Idempotently upsert an ORDER and atomically link it to its CUSTOMER.

    Upserts the order by its natural key (orderId), verifies the referenced
    customer exists, and creates the CUSTOMER -[PLACED]-> ORDER edge only if
    it is not already present — all inside one transaction.

    Raises:
        ValueError: if the referenced CUSTOMER record does not exist.
    """
    tx = db.transactions.begin()
    try:
        # Every read runs inside the transaction; without `transaction=tx`
        # these lookups race outside the very transaction that is supposed
        # to make duplicate deliveries safe.
        existing = db.records.find(
            {"labels": ["ORDER"], "where": {"orderId": event["orderId"]}},
            transaction=tx,
        )

        if existing.data:
            order = existing.data[0]
            db.records.update(order.id, {"status": event["status"]}, transaction=tx)
        else:
            order = db.records.create("ORDER", {
                "orderId": event["orderId"],
                "status": event["status"],
                "totalUsd": event["totalUsd"],
                "createdAt": event["createdAt"]
            }, transaction=tx)

        customers = db.records.find(
            {"labels": ["CUSTOMER"], "where": {"customerId": event["customerId"]}},
            transaction=tx,
        )
        if not customers.data:
            raise ValueError(f"CUSTOMER {event['customerId']} not found")

        # Redelivered webhooks must not create a second PLACED edge, so
        # check for an existing relationship before attaching.
        already_linked = db.records.find({
            "labels": ["CUSTOMER"],
            "where": {
                "customerId": event["customerId"],
                "ORDER": {
                    "$relation": {"type": "PLACED", "direction": "out"},
                    "orderId": event["orderId"]
                }
            }
        }, transaction=tx)

        if not already_linked.data:
            db.records.attach(customers.data[0].id, order.id, {"type": "PLACED", "direction": "out"}, transaction=tx)

        db.transactions.commit(tx)
    except Exception:
        db.transactions.rollback(tx)
        raise

Step 3: Bulk ingestion from a queue batch

Message queues often deliver events in batches. Use `import_json` for the record layer, then link in a second pass.

def flush_pageviews(events: list[dict]) -> None:
    """Bulk-insert a queue batch of PAGEVIEW events in one import call.

    Deduplicates in memory first: events sharing the same
    sessionId:url:timestamp composite key collapse to the last one seen.
    An empty batch is a no-op.
    """
    if not events:
        return

    # Dict comprehension keyed on the composite natural key — later
    # duplicates overwrite earlier ones, matching last-write-wins.
    unique = {
        f"{e['sessionId']}:{e['url']}:{e['timestamp']}": e
        for e in events
    }

    db.records.import_json({"label": "PAGEVIEW", "data": list(unique.values())})

Step 4: Out-of-order event handling

Some pipelines deliver events out of chronological order. Guard against overwriting a later state with an earlier one.

def apply_order_status(order_id: str, new_status: str, event_at: str) -> None:
    """Apply a status event to an ORDER, skipping events older than the last applied.

    Creates the order if it does not exist; otherwise updates it only when
    `event_at` is strictly newer than the stored `lastEventAt` (ISO-8601
    strings compare chronologically as plain strings).
    """
    found = db.records.find({"labels": ["ORDER"], "where": {"orderId": order_id}})

    # First event ever seen for this order: create it and stop.
    if not found.data:
        db.records.create("ORDER", {"orderId": order_id, "status": new_status, "lastEventAt": event_at})
        return

    record = found.data[0]
    last_event_at = record.data.get("lastEventAt")

    # Guard clause: drop stale (out-of-order) events instead of letting an
    # earlier state overwrite a later one.
    if last_event_at and event_at <= last_event_at:
        print(f"Skipping stale event for {order_id}: {event_at} <= {last_event_at}")
        return

    db.records.update(record.id, {"status": new_status, "lastEventAt": event_at})

Production checklist

| Concern | Practice |
| --- | --- |
| Duplicate events | Find by natural key before creating |
| Duplicate edges | Check the relationship exists before `attach()` |
| Out-of-order state | Compare `lastEventAt` timestamps before updating |
| Partial failures | Wrap multi-record writes in a transaction |
| Large queue spikes | Use `import_json` for batches; don't loop `create()` |

Next steps