Skip to main content

Incident Response Graphs

When something breaks in production, you need to answer four questions quickly:

  1. What is failing? — which services and components are affected
  2. Why? — the root cause and the causal chain to the symptom
  3. Who is affected? — impacted customers or downstream systems
  4. What happened during the response? — the investigation and resolution timeline

A flat alert table can answer the first question. It cannot answer the rest. A graph can answer all four from a single traversal.


Graph shape

Label | What it represents
ALERT | A threshold breach or anomaly detection signal
INCIDENT | The declared incident record — owns status, severity, timestamps
SERVICE | A component in your infrastructure or application graph
INCIDENT_EVENT | An ordered entry in the investigation timeline
ACTION | A remediation step taken to resolve the incident
CUSTOMER | An end-user or tenant affected by the incident

Step 1: Declare an incident from an alert

When an alert fires, create an INCIDENT record and link it to the alert and all known affected services in a single transaction.

import os
from datetime import datetime, timezone
from rushdb import RushDB

# Shared client used by every snippet below. The API key is read from the
# RUSHDB_API_KEY environment variable; a missing variable raises KeyError here.
db = RushDB(
    os.environ["RUSHDB_API_KEY"],
    base_url="https://api.rushdb.com/api/v1",
)


def declare_incident(
    alert_id: str,
    affected_service_ids: list[str],
    severity: str,
    summary: str,
    owner_id: str
):
    """Declare an incident from an alert and link it into the graph.

    Inside a single transaction this creates an INCIDENT record, attaches the
    alert to it (TRIGGERED), attaches the incident to every affected service
    (AFFECTS), and creates the first INCIDENT_EVENT ("declared") attached via
    HAS_EVENT.

    Args:
        alert_id: ID of the alert record that triggered the incident.
        affected_service_ids: IDs of the services known to be affected; may
            be empty, in which case no AFFECTS links are created.
        severity: Severity label stored on the incident (e.g. "P1").
        summary: Human-readable description of the incident.
        owner_id: ID of the incident owner; also recorded as the actor of
            the "declared" event.

    Returns:
        The INCIDENT record returned by ``db.records.create``.

    Raises:
        Exception: Any error raised while writing is re-raised after the
            transaction has been rolled back.
    """
    # One timestamp for both the incident and its "declared" event, so the
    # timeline's first entry agrees exactly with INCIDENT.declaredAt.
    declared_at = datetime.now(timezone.utc).isoformat()

    tx = db.transactions.begin()
    try:
        incident = db.records.create("INCIDENT", {
            "summary": summary,
            "severity": severity,
            "status": "active",
            "ownerId": owner_id,
            "declaredAt": declared_at,
            "resolvedAt": None,
            "rootCauseId": None
        }, transaction=tx)

        db.records.attach(alert_id, incident.id, {"type": "TRIGGERED", "direction": "out"}, transaction=tx)

        for service_id in affected_service_ids:
            db.records.attach(incident.id, service_id, {"type": "AFFECTS", "direction": "out"}, transaction=tx)

        open_event = db.records.create("INCIDENT_EVENT", {
            "type": "declared",
            "note": f"Incident declared. Severity: {severity}. Owner: {owner_id}",
            "actorId": owner_id,
            "occurredAt": declared_at
        }, transaction=tx)
        db.records.attach(incident.id, open_event.id, {"type": "HAS_EVENT", "direction": "out"}, transaction=tx)

        db.transactions.commit(tx)
    except Exception:
        db.transactions.rollback(tx)
        raise

    # Reporting happens outside the try block: once the commit has succeeded,
    # a later failure must not trigger a rollback of the committed transaction.
    print(f"Incident {incident.id} declared ({severity})")
    return incident


# Example: declare a P1 for the checkout outage raised by alert-9f3a.
# The returned record's id is reused by every later step.
incident = declare_incident(
    alert_id="alert-9f3a",
    affected_service_ids=["svc-api-gateway", "svc-checkout"],
    severity="P1",
    summary="Checkout service returning 503s — payment flow unavailable",
    owner_id="oncall-eng-007",
)

Step 2: Record the investigation timeline

As the incident evolves, append INCIDENT_EVENT records for every significant update — hypothesis formed, action taken, escalation. This creates an ordered, queryable timeline.

def log_incident_event(incident_id: str, event_type: str, note: str, actor_id: str):
    """Append a timeline entry to an incident.

    Creates an INCIDENT_EVENT record stamped with the current UTC time and
    attaches it to the incident with a HAS_EVENT relationship, both inside
    one transaction.

    Args:
        incident_id: ID of the incident the event belongs to.
        event_type: Stored in the event's ``type`` field.
        note: Free-text description stored in the event's ``note`` field.
        actor_id: Stored in the event's ``actorId`` field.

    Returns:
        The INCIDENT_EVENT record returned by ``db.records.create``.

    Raises:
        Exception: Any error raised while writing is re-raised after the
            transaction has been rolled back.
    """
    txn = db.transactions.begin()
    try:
        payload = {
            "type": event_type,
            "note": note,
            "actorId": actor_id,
            "occurredAt": datetime.now(timezone.utc).isoformat(),
        }
        entry = db.records.create("INCIDENT_EVENT", payload, transaction=txn)

        has_event = {"type": "HAS_EVENT", "direction": "out"}
        db.records.attach(incident_id, entry.id, has_event, transaction=txn)

        db.transactions.commit(txn)
    except Exception:
        db.transactions.rollback(txn)
        raise
    else:
        return entry


# Example: three timeline entries logged in order by the on-call engineer.
for event_type, note in (
    ("hypothesis", "Suspect database connection pool exhaustion on checkout-db"),
    ("action_taken", "Increased connection pool limit and restarted checkout service"),
    ("escalation", "Escalated to DBA team for root cause confirmation"),
):
    log_incident_event(incident.id, event_type, note, "oncall-eng-007")

Step 3: Identify root cause and measure blast radius

Mark the root cause service and traverse the graph to find which customers are downstream of the affected services.

def set_root_cause(incident_id: str, root_cause_service_id: str, note: str, actor_id: str):
    """Record the root-cause service of an incident and log it on the timeline.

    In one transaction, sets the incident's ``rootCauseId`` field and attaches
    the incident to the service with a ROOT_CAUSE relationship. After that
    transaction commits, a "root_cause_identified" event is appended through
    ``log_incident_event`` (which uses its own transaction).

    Args:
        incident_id: ID of the incident being updated.
        root_cause_service_id: ID of the service identified as the root cause.
        note: Text of the timeline entry.
        actor_id: ID of whoever identified the root cause.

    Raises:
        Exception: Errors from the update/attach/commit are re-raised after a
            rollback. Errors from the timeline write are raised as-is; the
            root-cause update stays committed in that case.
    """
    tx = db.transactions.begin()
    try:
        db.records.update(incident_id, {"rootCauseId": root_cause_service_id}, transaction=tx)
        db.records.attach(
            incident_id, root_cause_service_id,
            {"type": "ROOT_CAUSE", "direction": "out"},
            transaction=tx
        )
        db.transactions.commit(tx)
    except Exception:
        db.transactions.rollback(tx)
        raise

    # Kept outside the try block: tx is already committed here, so a failure
    # while logging must not attempt to roll it back (which could also mask
    # the real error).
    log_incident_event(incident_id, "root_cause_identified", note, actor_id)


# Example: the DBA lead pins the outage on the checkout database.
set_root_cause(
    incident_id=incident.id,
    root_cause_service_id="svc-checkout-db",
    note="Root cause: connection pool exhaustion on checkout-db due to slow query backlog",
    actor_id="dba-lead",
)

# Blast radius: count CUSTOMER records related (SERVES) to a SERVICE that this
# incident is related to (AFFECTS).
blast_radius = db.records.find({
    "labels": ["CUSTOMER"],
    "where": {
        "SERVICE": {
            "$relation": {"type": "SERVES", "direction": "in"},
            "INCIDENT": {
                "$relation": {"type": "AFFECTS", "direction": "in"},
                "__id": incident.id
            }
        }
    },
    "select": {"count": {"$count": "*"}}
})

# Report 0 when the query returns no rows, instead of raising IndexError on
# blast_radius.data[0].
rows = blast_radius.data
affected_customers = rows[0].data.get('count', 0) if rows else 0

print(f"Blast radius: {affected_customers} customers affected")

Step 4: Resolve the incident and attach the post-mortem action

When the incident is resolved, update its status, record resolution time, and link the remediation action.

def resolve_incident(incident_id: str, resolution: dict, actor_id: str):
    """Mark an incident resolved and attach the remediation ACTION.

    In one transaction, sets the incident's ``status`` to "resolved" and its
    ``resolvedAt`` timestamp, creates an ACTION record from ``resolution``,
    and attaches the incident to it with a RESOLVED_BY relationship. After the
    commit, a "resolved" event is appended through ``log_incident_event``
    (which uses its own transaction).

    Args:
        incident_id: ID of the incident being resolved.
        resolution: Dict with keys ``"summary"`` (str), ``"actionsTaken"``
            and ``"preventionSteps"`` (iterables of str, stored joined
            with "; ").
        actor_id: ID of whoever resolved the incident; stored as the
            ACTION's ``createdBy``.

    Raises:
        KeyError: If ``resolution`` lacks one of the required keys (the
            transaction is rolled back first).
        Exception: Other errors during the transactional writes are re-raised
            after a rollback. Errors from the timeline write are raised
            as-is; the resolution stays committed in that case.
    """
    # Single timestamp shared by INCIDENT.resolvedAt and ACTION.createdAt.
    resolved_at = datetime.now(timezone.utc).isoformat()
    tx = db.transactions.begin()
    try:
        db.records.update(incident_id, {"status": "resolved", "resolvedAt": resolved_at}, transaction=tx)

        action = db.records.create("ACTION", {
            "summary": resolution["summary"],
            "actionsTaken": "; ".join(resolution["actionsTaken"]),
            "preventionSteps": "; ".join(resolution["preventionSteps"]),
            "createdAt": resolved_at,
            "createdBy": actor_id
        }, transaction=tx)

        db.records.attach(incident_id, action.id, {"type": "RESOLVED_BY", "direction": "out"}, transaction=tx)
        db.transactions.commit(tx)
    except Exception:
        db.transactions.rollback(tx)
        raise

    # Kept outside the try block: tx is already committed here, so a failure
    # while logging or printing must not attempt to roll it back.
    log_incident_event(incident_id, "resolved", resolution["summary"], actor_id)
    print(f"Incident {incident_id} resolved")


# Example: close out the checkout incident with what was done and what will
# prevent a repeat.
resolution = {
    "summary": "Increased DB connection pool, optimised slow query, deployed fix",
    "actionsTaken": ["Increased pool limit to 200", "Killed blocking queries", "Deployed query index"],
    "preventionSteps": ["Add pool alert at 80%", "Weekly slow query review"],
}
resolve_incident(incident.id, resolution, "oncall-eng-007")

Step 5: Query the full incident timeline

Reconstruct the complete investigation log ordered by occurrence time — useful for post-mortems and SLA reporting.

# Fetch every INCIDENT_EVENT attached to this incident via HAS_EVENT,
# ordered by occurredAt ascending.
timeline = db.records.find({
    "labels": ["INCIDENT_EVENT"],
    "where": {
        "INCIDENT": {
            "$relation": {"type": "HAS_EVENT", "direction": "in"},
            "__id": incident.id
        }
    },
    "orderBy": {"occurredAt": "asc"}
})

print(f"\nIncident timeline ({len(timeline.data)} events):")
for event in timeline.data:
    occurred_at = event.data.get('occurredAt')
    kind = event.data.get('type', '').upper()
    note = event.data.get('note')
    # Separate the event type from its note; previously the two were printed
    # run together (e.g. "HYPOTHESISSuspect ...").
    print(f" [{occurred_at}] {kind}: {note}")

Design rules

  1. Declare incidents immediately, refine later — create the INCIDENT record as soon as an alert fires; add ROOT_CAUSE and resolution details as they are discovered
  2. Log every decision as an INCIDENT_EVENT — hypotheses, actions taken, and escalations are part of the post-mortem record; do not rely on Slack or memory
  3. Use transactions for declaration — the incident + its initial events and service links must land atomically or not at all
  4. Store rootCauseId as a field — this enables simple cross-incident root cause aggregations without graph traversal
  5. Link AFFECTS broadly, refine to ROOT_CAUSE narrowly — at declaration time you may not know the root cause; AFFECTS edges are cheap to add as more services are confirmed impacted
  6. Never delete INCIDENT records — historical incidents are a training set for prevention; archive instead of delete

Next steps