Pipeviz
Easy, elegant lineage from a single .json
An open source JSON spec for lineage. Declare your pipelines, get beautiful graphs.
{
"pipelines": [{
"name": "etl-job",
"input_sources": ["raw_events"],
"output_sources": ["cleaned_events"]
}]
}
A pipeviz.json describing your pipelines and data sources.
Each pipeline declares what it reads and writes; Pipeviz stitches these declarations into a complete lineage graph. From there, views like critical-path and spend analysis come for free.
Other solutions ask for a lot: OpenLineage/Marquez need agents, a metadata store, and scheduler integration; Atlas wants a full governance platform; dbt couples you to its framework. Pipeviz asks for one JSON file.
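The stitching step is simple enough to sketch. The following is a minimal illustration (not the actual Pipeviz implementation, which is in ClojureScript) of how per-pipeline declarations become one graph:

```python
import json

# A tiny config in the Pipeviz format, with two chained pipelines.
config_text = """
{"pipelines": [
  {"name": "etl-job",
   "input_sources": ["raw_events"],
   "output_sources": ["cleaned_events"]},
  {"name": "reporting",
   "input_sources": ["cleaned_events"],
   "output_sources": ["daily_report"]}
]}
"""

def stitch(config):
    """Turn each pipeline's declared reads/writes into graph edges."""
    edges = []
    for p in config["pipelines"]:
        for src in p.get("input_sources", []):
            edges.append((src, p["name"]))   # datasource feeds pipeline
        for out in p.get("output_sources", []):
            edges.append((p["name"], out))   # pipeline writes datasource
    return edges

edges = stitch(json.loads(config_text))
# Chain: raw_events -> etl-job -> cleaned_events -> reporting -> daily_report
```

Because both pipelines mention cleaned_events, the graph connects them automatically; no pipeline has to know about the other.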
Why Clojure
Pipeviz is written in Clojure/ClojureScript, a Lisp that runs on the JVM and compiles to JavaScript.
Merging Pipeviz Configs
When your data platform spans multiple teams or repositories, you can maintain separate
config files and merge them into a single Pipeviz config using jq.
Basic Merge (Two Files)
Combine pipelines and datasources from two config files:
jq -s '{
pipelines: (.[0].pipelines + .[1].pipelines),
datasources: (.[0].datasources + .[1].datasources)
}' team_a.json team_b.json > combined.json
Merge Multiple Files
Merge all *.pipeviz.json files in a directory:
jq -s '{
pipelines: [.[].pipelines[]] | unique_by(.name),
datasources: [.[].datasources[]] | unique_by(.name)
}' configs/*.pipeviz.json > combined.json
Merge with Deduplication
When configs might have overlapping definitions, dedupe by name (last write wins):
jq -s '{
pipelines: (
[.[].pipelines[]] | group_by(.name) | map(last)
),
datasources: (
[.[].datasources[]] | group_by(.name) | map(last)
)
}' *.json > merged.json
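For teams that would rather script the merge than use jq, the same last-write-wins semantics can be sketched in a few lines of Python (the file contents below are hypothetical examples):

```python
def merge_configs(*configs):
    """Merge Pipeviz configs; later definitions override earlier ones by name."""
    merged = {"pipelines": {}, "datasources": {}}
    for cfg in configs:
        for key in ("pipelines", "datasources"):
            for item in cfg.get(key, []):
                merged[key][item["name"]] = item  # last write wins
    return {k: list(v.values()) for k, v in merged.items()}

team_a = {"pipelines": [{"name": "etl-job", "schedule": "hourly"}]}
team_b = {"pipelines": [{"name": "etl-job", "schedule": "daily"}],
          "datasources": [{"name": "raw_events"}]}

combined = merge_configs(team_a, team_b)
# team_b is passed last, so its "daily" schedule wins for etl-job
```

Keying each collection by name makes the override rule explicit and keeps the output free of duplicates.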
Smart Prefix (Only on Collision)
Only add prefixes when names actually collide:
# Find colliding names, prefix only those
# (datasources can be handled the same way)
jq -s '{
  pipelines: (
    [
      (.[0].pipelines | map(. + {_src: "a"})),
      (.[1].pipelines | map(. + {_src: "b"}))
    ] | add
    | group_by(.name)
    | map(
        if length > 1 then
          map(.name = ._src + "_" + .name)
        else . end
      )
    | add
    | map(del(._src))
  )
}' team_a.json team_b.json > combined.json
Self-Hosting Pipeviz
Run a server that serves your config as a live dashboard.
Quick Start
Run the server with your config file:
# Start server on port 3000 with your config
clj -M:server 3000 path/to/your-config.json
Then open http://localhost:3000 in your browser.
Server Options
# Default port (8080) with config
clj -M:server path/to/config.json
# Custom port
clj -M:server 3000 path/to/config.json
# Multiple configs (merged automatically)
clj -M:server 3000 team_a.json team_b.json
REPL-Driven Development
Connect to the nREPL for live exploration and extension:
# Start shadow-cljs in watch mode
npx shadow-cljs watch app
# Connect to the REPL from another terminal
npx shadow-cljs cljs-repl app
Extending the Server
The server exposes a simple HTTP API. Add custom endpoints in src/pipeviz/server/main.clj:
;; Example: add a custom analysis endpoint
;; (assumes [clojure.data.json :as json] is required and an `analyze` fn exists)
(defn custom-handler [config]
  (fn [req]
    (case (:uri req)
      "/api/custom" {:status 200
                     :headers {"Content-Type" "application/json"}
                     :body (json/write-str (analyze config))}
      nil)))
OpenAPI Spec
The server includes an OpenAPI specification at resources/openapi.json:
# Endpoints documented in the spec:
GET /api/config # Returns the loaded Pipeviz config
GET /api/health # Health check endpoint
Use this spec with Swagger UI or to generate API clients.
Production Deployment
Build a standalone bundle for deployment:
# Build optimized JS bundle
npx shadow-cljs release app
# Bundle into single HTML file
./scripts/bundle.sh
# Output: dist/pipeviz.html (single-file, no server needed)
pipeviz.html is a single file with all JS/CSS inlined.
Perfect for hosting on GitHub Pages, S3, or embedding in internal tools.
Pipeviz JSON Spec
Only pipelines is required. Clusters and datasources are auto-created when referenced.
Root
| Field | Type | Required | Description |
|---|---|---|---|
| pipelines | array | Yes | Jobs that transform or move data |
| datasources | array | No | Tables, files, streams, APIs |
| clusters | array | No | Groups for visual organization |
{
"clusters": [...],
"pipelines": [...],
"datasources": [...]
}
Pipeline
| Field | Type | Required | Description |
|---|---|---|---|
| name | string | Yes | Unique identifier |
| description | string | No | |
| input_sources | array | No | Datasources read from |
| output_sources | array | No | Datasources written to |
| upstream_pipelines | array | No | Pipeline or group names (orange edges) |
| cluster | string | No | Cluster name |
| group | string | No | Collapse into group node |
| schedule | string | No | Free text |
| tags | array | No | |
| links | object | No | name to URL |
| duration | number | No | Runtime in minutes (for critical path) |
| cost | number | No | Cost per run (for spend analysis) |
{
"name": "user-enrichment",
"description": "Enriches user data with events",
"input_sources": ["raw_users", "events"],
"output_sources": ["enriched_users"],
"upstream_pipelines": ["data-ingestion"],
"cluster": "user-processing",
"group": "etl-jobs",
"schedule": "Every 2 hours",
"duration": 45,
"cost": 12.50,
"tags": ["user-data", "ml"],
"links": {
"airflow": "https://...",
"docs": "https://..."
}
}
Datasource
Auto-created when referenced. Define explicitly to add metadata.
| Field | Type | Required | Description |
|---|---|---|---|
| name | string | Yes | Unique identifier |
| description | string | No | |
| type | string | No | snowflake, postgres, kafka, s3... |
| owner | string | No | |
| cluster | string | No | |
| tags | array | No | |
| metadata | object | No | Arbitrary key-value |
| links | object | No | name to URL |
| attributes | array | No | Column-level lineage |
{
"name": "raw_users",
"description": "Raw user data from prod",
"type": "snowflake",
"owner": "data-team@company.com",
"cluster": "user-processing",
"tags": ["pii", "users"],
"metadata": {
"size": "2.1TB",
"record_count": "45M"
},
"links": {
"snowflake": "https://...",
"docs": "https://..."
}
}
Cluster
Auto-created when referenced. Define explicitly for nesting.
| Field | Type | Required | Description |
|---|---|---|---|
| name | string | Yes | Unique identifier |
| description | string | No | |
| parent | string | No | Parent cluster for nesting |
{
"name": "realtime",
"description": "Real-time processing cluster",
"parent": "order-processing"
}
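Nesting via parent works like any parent-pointer tree: a cluster's full location is recovered by walking parent links to the root. A minimal sketch (not Pipeviz code):

```python
# Clusters from the example above: "realtime" nests under "order-processing".
clusters = [
    {"name": "order-processing"},
    {"name": "realtime", "parent": "order-processing"},
]

by_name = {c["name"]: c for c in clusters}

def cluster_path(name):
    """Return the ancestry path from the root cluster down to `name`."""
    path = []
    while name:
        path.append(name)
        name = by_name.get(name, {}).get("parent")  # None at the root
    return list(reversed(path))

# cluster_path("realtime") -> ["order-processing", "realtime"]
```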
Attribute Lineage
Add attributes to a datasource. Supports nesting for structs/objects. Reference upstream with source::attr or source::parent::child.
| Field | Type | Required | Description |
|---|---|---|---|
| name | string | Yes | Column/field name |
| from | string or array | No | Upstream refs |
| attributes | array | No | Nested child attributes |
{
"name": "enriched_users",
"attributes": [
{ "name": "user_id", "from": "raw_users::id" },
{
"name": "location",
"from": "raw_users::address",
"attributes": [
{ "name": "city", "from": "raw_users::address::city" },
{ "name": "zip", "from": "raw_users::address::zip" }
]
}
]
}
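The ref syntax is just ::-delimited paths, so resolving where an attribute comes from is a split plus a recursive walk over nested attributes. A sketch of that parsing, using the attributes from the example above (not the actual Pipeviz resolver):

```python
def parse_ref(ref):
    """Split "source::attr" or "source::parent::child" into (datasource, path)."""
    source, *path = ref.split("::")
    return source, path

def upstream_sources(attrs):
    """Collect every upstream datasource referenced by a (nested) attribute list."""
    sources = set()
    for a in attrs:
        refs = a.get("from", [])
        if isinstance(refs, str):  # "from" may be a string or an array
            refs = [refs]
        for ref in refs:
            sources.add(parse_ref(ref)[0])
        sources |= upstream_sources(a.get("attributes", []))
    return sources

attrs = [
    {"name": "user_id", "from": "raw_users::id"},
    {"name": "location", "from": "raw_users::address",
     "attributes": [
         {"name": "city", "from": "raw_users::address::city"},
         {"name": "zip", "from": "raw_users::address::zip"},
     ]},
]
# parse_ref("raw_users::address::city") -> ("raw_users", ["address", "city"])
# upstream_sources(attrs) -> {"raw_users"}
```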
Full Example
{
"clusters": [
{ "name": "etl", "description": "ETL pipelines" }
],
"pipelines": [
{
"name": "user-enrichment",
"description": "Enriches user data with events",
"input_sources": ["raw_users", "events"],
"output_sources": ["enriched_users"],
"cluster": "etl",
"schedule": "Every 2 hours",
"tags": ["user-data"],
"links": { "airflow": "https://..." }
}
],
"datasources": [
{
"name": "raw_users",
"type": "snowflake",
"owner": "data-team@company.com",
"attributes": [
{ "name": "id" },
{ "name": "first" },
{ "name": "last" }
]
},
{
"name": "enriched_users",
"type": "snowflake",
"attributes": [
{ "name": "user_id", "from": "raw_users::id" },
{ "name": "full_name", "from": ["raw_users::first", "raw_users::last"] }
]
}
]
}
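Since the spec is small, validating a config before loading it is straightforward. A hypothetical validator (not shipped with Pipeviz) for the rules above: pipelines is required, every pipeline needs a name, and names are unique identifiers.

```python
def validate(config):
    """Return a list of spec violations; an empty list means the config is valid."""
    errors = []
    if "pipelines" not in config:
        errors.append("missing required key: pipelines")
        return errors
    seen = set()
    for p in config["pipelines"]:
        name = p.get("name")
        if not name:
            errors.append("pipeline missing required field: name")
        elif name in seen:
            errors.append(f"duplicate pipeline name: {name}")
        else:
            seen.add(name)
    return errors

errors = validate({"pipelines": [{"name": "etl-job"}]})
# A valid config yields no errors
```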
Edge Types
| Color | Source | Description |
|---|---|---|
| Gray | input_sources / output_sources | Data flow through datasources |
| Orange | upstream_pipelines | Direct pipeline dependency |
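Both edge types derive mechanically from the config: gray edges route data flow through datasources, while orange edges connect pipelines directly via upstream_pipelines. A minimal sketch (not the Pipeviz renderer):

```python
def edges(config):
    """Derive (gray, orange) edge lists from a Pipeviz config."""
    gray, orange = [], []
    for p in config["pipelines"]:
        for src in p.get("input_sources", []):
            gray.append((src, p["name"]))
        for out in p.get("output_sources", []):
            gray.append((p["name"], out))
        for up in p.get("upstream_pipelines", []):
            orange.append((up, p["name"]))  # declared dependency, no datasource between
    return gray, orange

cfg = {"pipelines": [
    {"name": "data-ingestion", "output_sources": ["raw_users"]},
    {"name": "user-enrichment",
     "input_sources": ["raw_users"],
     "upstream_pipelines": ["data-ingestion"]},
]}
gray, orange = edges(cfg)
# gray:   ("data-ingestion", "raw_users"), ("raw_users", "user-enrichment")
# orange: ("data-ingestion", "user-enrichment")
```

Note that the two pipelines end up connected twice here: once implicitly through the shared raw_users datasource, and once explicitly through upstream_pipelines.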