End-to-End Data Lineage: From Source to Answer

When a dashboard shows a number, a sales rep acts on it. When a model generates a recommendation, an engineer ships it. Lineage answers the question that always follows: where did this come from?

This tutorial models a data pipeline as a graph:

  • SOURCE nodes represent raw data origins (database dumps, API responses, uploaded files)
  • ARTIFACT nodes represent transformed or derived outputs (cleaned datasets, enriched records, summaries)
  • PIPELINE_RUN nodes capture when and how a transformation happened
  • ANSWER nodes represent final outputs (responses, reports, or decisions) and link back to the artifacts they drew from

Every node is connected by typed relationships, so any output can be traced back to its origin in a single traversal query.
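The labels and relationships used in this tutorial imply a small schema. FEEDS runs from a SOURCE or ARTIFACT into a PIPELINE_RUN. PRODUCED runs from a PIPELINE_RUN to an ARTIFACT. CITED_BY runs from an ARTIFACT to an ANSWER. Below is a minimal sketch of checking that rule in application code before creating an edge. `ALLOWED_EDGES` and `check_edge` are hypothetical names, and the sketch does not assume RushDB enforces this schema for you.

```python
# Hypothetical lineage schema: relationship type -> allowed (from_label, to_label) pairs.
# It mirrors the edges this tutorial creates; it is application-side, not a RushDB feature.
ALLOWED_EDGES: dict[str, set[tuple[str, str]]] = {
    "FEEDS": {("SOURCE", "PIPELINE_RUN"), ("ARTIFACT", "PIPELINE_RUN")},
    "PRODUCED": {("PIPELINE_RUN", "ARTIFACT")},
    "CITED_BY": {("ARTIFACT", "ANSWER")},
}


def check_edge(from_label: str, rel_type: str, to_label: str) -> bool:
    """Return True if an edge of rel_type may run from from_label to to_label."""
    return (from_label, to_label) in ALLOWED_EDGES.get(rel_type, set())


# A run may consume a source, but nothing can "produce" a raw source.
print(check_edge("SOURCE", "FEEDS", "PIPELINE_RUN"))    # True
print(check_edge("PIPELINE_RUN", "PRODUCED", "SOURCE"))  # False
```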


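Lineage graph shape

The steps below build this chain, with relationship types in brackets:

SOURCE -[FEEDS]-> PIPELINE_RUN -[PRODUCED]-> ARTIFACT -[FEEDS]-> PIPELINE_RUN -[PRODUCED]-> ARTIFACT -[CITED_BY]-> ANSWER
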

Step 1: Register source records

from rushdb import RushDB

# Replace the placeholder with your RushDB API key
db = RushDB("RUSHDB_API_KEY", base_url="https://api.rushdb.com/api/v1")

# Each raw input becomes a SOURCE record; rowCount and checksum let you
# confirm later that a pipeline read exactly this snapshot.
crm_export = db.records.create("SOURCE", {
    "name": "crm-export-2025-03-01",
    "origin": "Salesforce",
    "format": "csv",
    "rowCount": 14200,
    "capturedAt": "2025-03-01T00:00:00Z",
    "checksum": "sha256:abc123"
})

billing_snapshot = db.records.create("SOURCE", {
    "name": "billing-snapshot-Q1",
    "origin": "Stripe",
    "format": "json",
    "rowCount": 3800,
    "capturedAt": "2025-03-31T23:59:59Z",
    "checksum": "sha256:def456"
})
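The checksum and rowCount values above are hard-coded. One way to derive them from a raw export is sketched below. It assumes a UTF-8 CSV with a single header row and reuses the `sha256:<hex>` format of the example values. `describe_csv_source` is a hypothetical helper, not part of the RushDB SDK.

```python
import csv
import hashlib
import io
from datetime import datetime, timezone


def describe_csv_source(name: str, origin: str, raw: bytes) -> dict:
    """Build SOURCE properties for a raw CSV export (hypothetical helper).

    Assumes UTF-8 text with one header row; the "sha256:<hex>" checksum
    format copies the example values in this tutorial.
    """
    rows = list(csv.reader(io.StringIO(raw.decode("utf-8"))))
    return {
        "name": name,
        "origin": origin,
        "format": "csv",
        "rowCount": max(len(rows) - 1, 0),  # data rows only, header excluded
        "capturedAt": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        # Hash the exact bytes so any later change to the file is detectable
        "checksum": "sha256:" + hashlib.sha256(raw).hexdigest(),
    }


payload = b"id,email\n1,a@example.com\n2,b@example.com\n"
props = describe_csv_source("crm-export-sample", "Salesforce", payload)
print(props["rowCount"])  # 2
print(props["checksum"])
```

The returned dict can then be passed as the second argument to `db.records.create("SOURCE", ...)` in place of a hand-written literal.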

Step 2: Record a pipeline run

A PIPELINE_RUN captures the job identity, code version, and run metadata. It connects upstream sources to downstream artifacts.

enrich_run = db.records.create("PIPELINE_RUN", {
    "runId": "enrich-contacts-v2-20250301",
    "pipelineName": "enrich-contacts",
    "version": "v2.4.1",
    "startedAt": "2025-03-01T01:00:00Z",
    "finishedAt": "2025-03-01T01:47:23Z",
    "status": "success",
    "triggeredBy": "scheduler"
})

# Both sources feed the run: SOURCE -[FEEDS]-> PIPELINE_RUN
db.records.attach(crm_export.id, enrich_run.id, {"type": "FEEDS"})
db.records.attach(billing_snapshot.id, enrich_run.id, {"type": "FEEDS"})
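This step hard-codes startedAt, finishedAt, and status. The sketch below captures them around the actual job instead. It assumes a job signals failure by raising an exception. `run_with_metadata`, `duration_seconds`, and the extra `error` property are all hypothetical.

```python
from datetime import datetime, timezone
from typing import Callable


def _utc_now() -> str:
    # ISO-8601 UTC with a trailing "Z", matching the timestamps used in this tutorial
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def _parse_utc(value: str) -> datetime:
    # fromisoformat() only accepts a trailing "Z" from Python 3.11 on, so normalize it
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def run_with_metadata(run_id: str, pipeline: str, version: str,
                      job: Callable[[], object],
                      triggered_by: str = "scheduler") -> dict:
    """Execute job() and return PIPELINE_RUN properties describing the attempt.

    Hypothetical helper: status is "success" if job() returns and "failed" if it
    raises. The exception is recorded rather than re-raised so the run is never lost.
    """
    props = {
        "runId": run_id,
        "pipelineName": pipeline,
        "version": version,
        "startedAt": _utc_now(),
        "triggeredBy": triggered_by,
    }
    try:
        job()
        props["status"] = "success"
    except Exception as exc:
        props["status"] = "failed"
        props["error"] = repr(exc)  # hypothetical extra property
    finally:
        props["finishedAt"] = _utc_now()
    return props


def duration_seconds(props: dict) -> float:
    """Seconds elapsed between startedAt and finishedAt."""
    return (_parse_utc(props["finishedAt"]) - _parse_utc(props["startedAt"])).total_seconds()


ok = run_with_metadata("demo-ok", "enrich-contacts", "v2.4.1", lambda: None)
bad = run_with_metadata("demo-bad", "enrich-contacts", "v2.4.1", lambda: 1 / 0)
print(ok["status"], bad["status"])  # success failed
print(duration_seconds({"startedAt": "2025-03-01T01:00:00Z",
                        "finishedAt": "2025-03-01T01:47:23Z"}))  # 2843.0
```

The returned dict can be passed to `db.records.create("PIPELINE_RUN", ...)` once the job ends. Recording failed runs as well as successful ones is what makes a later query on `"status": "failed"` meaningful.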

Step 3: Register the produced artifact

enriched = db.records.create("ARTIFACT", {
    "name": "contacts-enriched-20250301",
    "type": "dataset",
    "rowCount": 13940,
    "storagePath": "s3://data-lake/enriched/contacts-2025-03-01.parquet",
    "createdAt": "2025-03-01T01:47:23Z",
    "schema": "contacts-enriched-v3"
})

# The run produced the dataset: PIPELINE_RUN -[PRODUCED]-> ARTIFACT
db.records.attach(enrich_run.id, enriched.id, {"type": "PRODUCED"})
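Each artifact in this tutorial comes from exactly one run, so a useful integrity check is that every ARTIFACT has exactly one incoming PRODUCED edge. An artifact with none has no recoverable lineage. The sketch below runs that check over an in-memory edge list whose keys mirror the variables above. The edge-list representation and `producer_violations` are assumptions for illustration, and `orphan` is an invented record.

```python
from collections import Counter

# Hypothetical in-memory snapshot: node key -> label, plus (from, rel_type, to) edges.
# Keys mirror the variables in Steps 1-3; "orphan" is an invented artifact with no run.
LABELS = {
    "crm_export": "SOURCE",
    "billing_snapshot": "SOURCE",
    "enrich_run": "PIPELINE_RUN",
    "enriched": "ARTIFACT",
    "orphan": "ARTIFACT",
}
EDGES = [
    ("crm_export", "FEEDS", "enrich_run"),
    ("billing_snapshot", "FEEDS", "enrich_run"),
    ("enrich_run", "PRODUCED", "enriched"),
]


def producer_violations(labels: dict[str, str],
                        edges: list[tuple[str, str, str]]) -> dict[str, int]:
    """Map every ARTIFACT whose count of incoming PRODUCED edges is not 1 to that count."""
    produced_in = Counter(dst for _, rel, dst in edges if rel == "PRODUCED")
    return {
        node: produced_in[node]
        for node, label in labels.items()
        if label == "ARTIFACT" and produced_in[node] != 1
    }


print(producer_violations(LABELS, EDGES))  # {'orphan': 0}
```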

Step 4: Chain another pipeline run on top

A second pipeline reads the enriched artifact and produces a summary.

summary_run = db.records.create("PIPELINE_RUN", {
    "runId": "summary-q1-20250401",
    "pipelineName": "generate-summary",
    "version": "v1.2.0",
    # startedAt and finishedAt are omitted here for brevity; record them as in Step 2
    "status": "success",
    "triggeredBy": "manual"
})

summary_artifact = db.records.create("ARTIFACT", {
    "name": "q1-contact-summary",
    "type": "report",
    "storagePath": "s3://reports/q1-2025/contact-summary.json",
    "createdAt": "2025-04-01T08:11:42Z"
})

# ARTIFACT -[FEEDS]-> PIPELINE_RUN -[PRODUCED]-> ARTIFACT
db.records.attach(enriched.id, summary_run.id, {"type": "FEEDS"})
db.records.attach(summary_run.id, summary_artifact.id, {"type": "PRODUCED"})
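Once runs are chained, a downstream run depends on the artifacts of an upstream one. Re-running the pipeline, for example after replacing a bad source, therefore has to follow dependency order. A topological sort (Kahn's algorithm) over the lineage edges gives that order and also exposes accidental cycles. The sketch below uses a hypothetical in-memory copy of the records from Steps 1-4, and `run_order` is an illustrative name.

```python
from collections import defaultdict, deque

# Hypothetical in-memory copy of the records from Steps 1-4: (from, rel_type, to) edges.
LABELS = {
    "crm_export": "SOURCE", "billing_snapshot": "SOURCE",
    "enrich_run": "PIPELINE_RUN", "enriched": "ARTIFACT",
    "summary_run": "PIPELINE_RUN", "summary_artifact": "ARTIFACT",
}
EDGES = [
    ("crm_export", "FEEDS", "enrich_run"),
    ("billing_snapshot", "FEEDS", "enrich_run"),
    ("enrich_run", "PRODUCED", "enriched"),
    ("enriched", "FEEDS", "summary_run"),
    ("summary_run", "PRODUCED", "summary_artifact"),
]


def run_order(labels: dict[str, str], edges: list[tuple[str, str, str]]) -> list[str]:
    """Return PIPELINE_RUN keys in dependency order (Kahn's algorithm)."""
    children: dict[str, list[str]] = defaultdict(list)
    in_degree = {node: 0 for node in labels}
    for src, _, dst in edges:
        children[src].append(dst)
        in_degree[dst] += 1
    # Start from nodes with no inputs; sorted() keeps the result deterministic
    queue = deque(sorted(n for n, d in in_degree.items() if d == 0))
    order: list[str] = []
    while queue:
        node = queue.popleft()
        order.append(node)
        for child in children[node]:
            in_degree[child] -= 1
            if in_degree[child] == 0:
                queue.append(child)
    if len(order) != len(labels):
        raise ValueError("lineage graph contains a cycle")
    return [n for n in order if labels[n] == "PIPELINE_RUN"]


print(run_order(LABELS, EDGES))  # ['enrich_run', 'summary_run']
```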

Step 5: Link an answer to the artifacts it cites

An ANSWER is the final output (an LLM response, a dashboard stat, or a filed report), linked to the artifacts it drew from.

answer = db.records.create("ANSWER", {
    "answerId": "report-q1-retention",
    "type": "retention-report",
    "generatedAt": "2025-04-02T09:00:00Z",
    "generatedBy": "analytics-agent-v3",
    "content": "Q1 2025: churn rate dropped 4.2% YoY."
})

# The summary report is cited by the answer: ARTIFACT -[CITED_BY]-> ANSWER
db.records.attach(summary_artifact.id, answer.id, {"type": "CITED_BY"})
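An answer can only have drawn from artifacts that already existed when it was generated. Comparing each cited artifact's createdAt with the answer's generatedAt is therefore a cheap consistency check on CITED_BY edges. Below is a minimal sketch, assuming the timestamp format used above. `citations_precede_answer` and the `q2-contact-summary` record are hypothetical.

```python
from datetime import datetime


def _ts(value: str) -> datetime:
    # Accept the "...Z" UTC timestamps used throughout this tutorial
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def citations_precede_answer(answer: dict, cited: list[dict]) -> list[str]:
    """Return names of cited artifacts created after the answer was generated.

    An empty list means the answer could have used every artifact it cites.
    """
    generated = _ts(answer["generatedAt"])
    return [a["name"] for a in cited if _ts(a["createdAt"]) > generated]


answer = {"answerId": "report-q1-retention", "generatedAt": "2025-04-02T09:00:00Z"}
summary = {"name": "q1-contact-summary", "createdAt": "2025-04-01T08:11:42Z"}
late = {"name": "q2-contact-summary", "createdAt": "2025-07-01T00:00:00Z"}  # hypothetical

print(citations_precede_answer(answer, [summary]))        # []
print(citations_precede_answer(answer, [summary, late]))  # ['q2-contact-summary']
```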

Step 6: Trace an answer back to its raw sources

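Given an answer ID, walk the lineage chain backwards to recover the raw sources behind it. The nested filter mirrors the chain built in Steps 1-5, which is two pipeline runs deep. Each level names the next label downstream and the relationship that connects it, and the whole traversal is narrowed to a single answerId.

lineage = db.records.find({
    "labels": ["SOURCE"],
    "where": {
        # SOURCE -[FEEDS]-> PIPELINE_RUN (first run)
        "PIPELINE_RUN": {
            "$alias": "$run",
            "$relation": {"type": "FEEDS", "direction": "out"},
            # PIPELINE_RUN -[PRODUCED]-> ARTIFACT (intermediate dataset)
            "ARTIFACT": {
                "$alias": "$artifact",
                "$relation": {"type": "PRODUCED", "direction": "out"},
                # ARTIFACT -[FEEDS]-> PIPELINE_RUN (second run)
                "PIPELINE_RUN": {
                    "$relation": {"type": "FEEDS", "direction": "out"},
                    # PIPELINE_RUN -[PRODUCED]-> ARTIFACT (final report)
                    "ARTIFACT": {
                        "$alias": "$finalArt",
                        "$relation": {"type": "PRODUCED", "direction": "out"},
                        # ARTIFACT -[CITED_BY]-> ANSWER, narrowed to one answer
                        "ANSWER": {
                            "$relation": {"type": "CITED_BY", "direction": "out"},
                            "answerId": "report-q1-retention"
                        }
                    }
                }
            }
        }
    },
    "select": {
        "sourceName": "$record.name",
        "origin": "$record.origin",
        "capturedAt": "$record.capturedAt",
        "runId": "$run.runId"
    }
})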
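The nested filter hard-codes exactly two pipeline runs, so it will not match sources behind a longer or shorter chain. If you can export the lineage edges, for instance in an offline audit job, a breadth-first walk over reversed edges finds every upstream SOURCE regardless of depth. The sketch below is an in-memory illustration, not a RushDB query. Its node keys mirror the variables from Steps 1-5, and `upstream_sources` is a hypothetical name.

```python
from collections import defaultdict, deque

# Hypothetical in-memory copy of the lineage from Steps 1-5: (from, rel_type, to).
EDGES = [
    ("crm_export", "FEEDS", "enrich_run"),
    ("billing_snapshot", "FEEDS", "enrich_run"),
    ("enrich_run", "PRODUCED", "enriched"),
    ("enriched", "FEEDS", "summary_run"),
    ("summary_run", "PRODUCED", "summary_artifact"),
    ("summary_artifact", "CITED_BY", "answer"),
]
LABELS = {
    "crm_export": "SOURCE", "billing_snapshot": "SOURCE",
    "enrich_run": "PIPELINE_RUN", "summary_run": "PIPELINE_RUN",
    "enriched": "ARTIFACT", "summary_artifact": "ARTIFACT",
    "answer": "ANSWER",
}


def upstream_sources(labels: dict[str, str], edges: list[tuple[str, str, str]],
                     start: str) -> set[str]:
    """Breadth-first walk against edge direction, collecting every SOURCE reached."""
    parents: dict[str, list[str]] = defaultdict(list)
    for src, _, dst in edges:
        parents[dst].append(src)
    seen, queue, found = {start}, deque([start]), set()
    while queue:
        node = queue.popleft()
        if labels[node] == "SOURCE":
            found.add(node)
        for parent in parents[node]:
            if parent not in seen:  # shared ancestors are visited once
                seen.add(parent)
                queue.append(parent)
    return found


print(sorted(upstream_sources(LABELS, EDGES, "answer")))  # ['billing_snapshot', 'crm_export']
```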

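Step 7: Find artifacts produced by failed pipeline runs

Start from ARTIFACT and keep only artifacts whose producing PIPELINE_RUN (matched through the incoming PRODUCED edge) has status "failed". The failed run's runId and finishedAt are returned alongside each artifact name.

failed_downstream = db.records.find({
    "labels": ["ARTIFACT"],
    "where": {
        # PIPELINE_RUN -[PRODUCED]-> ARTIFACT, restricted to failed runs
        "PIPELINE_RUN": {
            "$alias": "$run",
            "$relation": {"type": "PRODUCED", "direction": "in"},
            "status": "failed"
        }
    },
    "select": {
        "artifactName": "$record.name",
        "failedRunId": "$run.runId",
        "failedAt": "$run.finishedAt"
    },
    "orderBy": {"failedAt": "desc"}
})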
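This query looks one hop upstream, so it lists only artifacts that a failed run produced directly. Anything built on top of them is also suspect, including later runs, their outputs, and answers that cite those outputs. The sketch below follows edges forward from each failed run to collect that full impact set. The in-memory graph, `impacted_by_failures`, and the `STATUS` map (which pretends enrich_run failed) are all hypothetical.

```python
from collections import defaultdict, deque

# Hypothetical in-memory copy of the lineage from Steps 1-5: (from, rel_type, to).
EDGES = [
    ("crm_export", "FEEDS", "enrich_run"),
    ("billing_snapshot", "FEEDS", "enrich_run"),
    ("enrich_run", "PRODUCED", "enriched"),
    ("enriched", "FEEDS", "summary_run"),
    ("summary_run", "PRODUCED", "summary_artifact"),
    ("summary_artifact", "CITED_BY", "answer"),
]
LABELS = {
    "crm_export": "SOURCE", "billing_snapshot": "SOURCE",
    "enrich_run": "PIPELINE_RUN", "summary_run": "PIPELINE_RUN",
    "enriched": "ARTIFACT", "summary_artifact": "ARTIFACT",
    "answer": "ANSWER",
}
# Hypothetical run outcomes: pretend the enrichment run failed
STATUS = {"enrich_run": "failed", "summary_run": "success"}


def impacted_by_failures(labels: dict[str, str], edges: list[tuple[str, str, str]],
                         status: dict[str, str]) -> dict[str, set[str]]:
    """For each failed run, every ARTIFACT or ANSWER reachable downstream of it."""
    children: dict[str, list[str]] = defaultdict(list)
    for src, _, dst in edges:
        children[src].append(dst)
    impact: dict[str, set[str]] = {}
    for run, outcome in status.items():
        if outcome != "failed":
            continue
        seen: set[str] = set()
        queue = deque([run])
        while queue:
            for child in children[queue.popleft()]:
                if child not in seen:
                    seen.add(child)
                    queue.append(child)
        impact[run] = {n for n in seen if labels[n] in ("ARTIFACT", "ANSWER")}
    return impact


for run, hit in impacted_by_failures(LABELS, EDGES, STATUS).items():
    print(run, sorted(hit))  # enrich_run ['answer', 'enriched', 'summary_artifact']
```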

Production caveat

Multi-hop lineage queries can expand a large part of the reachable subgraph unless you scope them. Anchor every traversal with a selective filter, such as a unique answerId or runId or a narrow date range on capturedAt, so there are few candidate paths to expand; the Step 6 query does this with answerId on the ANSWER level. Deep chains (more than four hops) can be expensive. For very long chains, consider materializing intermediate lineage summaries as ARTIFACT metadata instead of relying on traversal alone.
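One way to materialize such summaries is to precompute the set of upstream sources for every ARTIFACT once and store it on the record, so reads need no multi-hop traversal. The sketch below memoizes the computation per node, so shared ancestors are resolved only once. It assumes the lineage graph is acyclic and uses a hypothetical in-memory copy of the records from Steps 1-5. `materialize_upstream_sources` is an illustrative name.

```python
from collections import defaultdict

# Hypothetical in-memory copy of the lineage from Steps 1-5: (from, rel_type, to).
EDGES = [
    ("crm_export", "FEEDS", "enrich_run"),
    ("billing_snapshot", "FEEDS", "enrich_run"),
    ("enrich_run", "PRODUCED", "enriched"),
    ("enriched", "FEEDS", "summary_run"),
    ("summary_run", "PRODUCED", "summary_artifact"),
    ("summary_artifact", "CITED_BY", "answer"),
]
LABELS = {
    "crm_export": "SOURCE", "billing_snapshot": "SOURCE",
    "enrich_run": "PIPELINE_RUN", "summary_run": "PIPELINE_RUN",
    "enriched": "ARTIFACT", "summary_artifact": "ARTIFACT",
    "answer": "ANSWER",
}


def materialize_upstream_sources(labels: dict[str, str],
                                 edges: list[tuple[str, str, str]]) -> dict[str, list[str]]:
    """Precompute, for every ARTIFACT, the sorted SOURCE keys it ultimately derives from."""
    parents: dict[str, list[str]] = defaultdict(list)
    for src, _, dst in edges:
        parents[dst].append(src)
    cache: dict[str, frozenset[str]] = {}

    def sources_of(node: str) -> frozenset[str]:
        # Memoized recursion: each node is resolved once (assumes no cycles)
        if node not in cache:
            if labels[node] == "SOURCE":
                cache[node] = frozenset([node])
            else:
                found: set[str] = set()
                for parent in parents[node]:
                    found |= sources_of(parent)
                cache[node] = frozenset(found)
        return cache[node]

    return {node: sorted(sources_of(node))
            for node, label in labels.items() if label == "ARTIFACT"}


summaries = materialize_upstream_sources(LABELS, EDGES)
print(summaries["summary_artifact"])  # ['billing_snapshot', 'crm_export']
```

Each list could then be written back to the matching ARTIFACT as a property (hypothetically named `upstreamSources`). It would need refreshing whenever a new run consumes or produces that artifact.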


Next steps