ReferenceAPI ReferencePipelines

Pipelines

Pipelines are the core resource in RAT. A pipeline defines a data transformation — a SQL query or Python script that reads from source tables and writes to a target Iceberg table. Pipelines are organized by namespace and layer (bronze, silver, gold).


Endpoints

MethodEndpointDescription
GET/api/v1/pipelinesList all pipelines
POST/api/v1/pipelinesCreate a new pipeline
GET/api/v1/pipelines/{ns}/{layer}/{name}Get pipeline details
PUT/api/v1/pipelines/{ns}/{layer}/{name}Update a pipeline
DELETE/api/v1/pipelines/{ns}/{layer}/{name}Soft-delete a pipeline
POST/api/v1/pipelines/{ns}/{layer}/{name}/previewDry-run (preview) a pipeline
POST/api/v1/pipelines/{ns}/{layer}/{name}/publishSnapshot and version a pipeline
GET/api/v1/pipelines/{ns}/{layer}/{name}/versionsList pipeline versions
GET/api/v1/pipelines/{ns}/{layer}/{name}/versions/{number}Get a specific version
POST/api/v1/pipelines/{ns}/{layer}/{name}/rollbackRollback to a previous version

List Pipelines

GET /api/v1/pipelines

Returns a paginated list of all pipelines, optionally filtered by namespace, layer, or search term.

Query Parameters

ParameterTypeDefaultDescription
namespacestringFilter by namespace
layerstringFilter by layer (bronze, silver, gold)
searchstringSearch pipelines by name (partial match)
sortstringcreated_atSort field (name, created_at, updated_at)
orderstringdescSort direction (asc, desc)
limitinteger50Items per page (max 200)
offsetinteger0Number of items to skip

Request

curl "http://localhost:8080/api/v1/pipelines?namespace=default&layer=silver&limit=10"

Response — 200 OK

{
  "pipelines": [
    {
      "namespace": "default",
      "layer": "silver",
      "name": "orders",
      "type": "sql",
      "owner": null,
      "description": "Clean and deduplicate orders",
      "created_at": "2026-02-12T10:00:00Z",
      "updated_at": "2026-02-12T10:00:00Z",
      "last_run": {
        "run_id": "abc123",
        "status": "success",
        "finished_at": "2026-02-12T14:00:00Z"
      }
    }
  ],
  "total": 1
}

Response Fields

FieldTypeDescription
pipelinesarrayList of pipeline objects
pipelines[].namespacestringNamespace the pipeline belongs to
pipelines[].layerstringData layer: bronze, silver, or gold
pipelines[].namestringPipeline name
pipelines[].typestringPipeline type: sql or python
pipelines[].ownerstring|nullOwner user ID (Pro) or null
pipelines[].descriptionstringPipeline description
pipelines[].created_atstringISO 8601 creation timestamp
pipelines[].updated_atstringISO 8601 last update timestamp
pipelines[].last_runobject|nullMost recent run summary, or null if never run
pipelines[].last_run.run_idstringRun ID
pipelines[].last_run.statusstringRun status
pipelines[].last_run.finished_atstringISO 8601 completion timestamp
totalintegerTotal number of matching pipelines (for pagination)

Create Pipeline

POST /api/v1/pipelines

Creates a new pipeline and scaffolds the initial S3 files (pipeline.sql and config.yaml).

Request Body

FieldTypeRequiredDescription
namespacestringYesTarget namespace
layerstringYesData layer: bronze, silver, or gold
namestringYesPipeline name (lowercase, alphanumeric, hyphens, underscores)
typestringNoPipeline type: sql (default) or python
descriptionstringNoHuman-readable description
sourcestringNoSource table or landing zone name
unique_keystringNoPrimary key column for deduplication

Request

curl -X POST http://localhost:8080/api/v1/pipelines \
  -H "Content-Type: application/json" \
  -d '{
    "namespace": "default",
    "layer": "silver",
    "name": "orders",
    "type": "sql",
    "source": "raw_orders",
    "unique_key": "id",
    "description": "Clean and deduplicate orders"
  }'

Response — 201 Created

{
  "namespace": "default",
  "layer": "silver",
  "name": "orders",
  "s3_path": "default/pipelines/silver/orders/",
  "files_created": ["pipeline.sql", "config.yaml"]
}

Error Responses

StatusCodeDescription
400INVALID_ARGUMENTMissing required fields, invalid name or layer
409ALREADY_EXISTSA pipeline with the same namespace, layer, and name already exists

Get Pipeline

GET /api/v1/pipelines/{ns}/{layer}/{name}

Returns the full details for a single pipeline, including metadata and last run information.

Path Parameters

ParameterTypeDescription
nsstringNamespace
layerstringData layer
namestringPipeline name

Request

curl http://localhost:8080/api/v1/pipelines/default/silver/orders

Response — 200 OK

{
  "namespace": "default",
  "layer": "silver",
  "name": "orders",
  "type": "sql",
  "owner": null,
  "description": "Clean and deduplicate orders",
  "s3_path": "default/pipelines/silver/orders/",
  "created_at": "2026-02-12T10:00:00Z",
  "updated_at": "2026-02-12T10:00:00Z",
  "last_run": {
    "run_id": "abc123",
    "status": "success",
    "finished_at": "2026-02-12T14:00:00Z"
  }
}

Error Responses

StatusCodeDescription
404NOT_FOUNDPipeline not found

Update Pipeline

PUT /api/v1/pipelines/{ns}/{layer}/{name}

Updates a pipeline’s mutable fields. This is a partial update — only provided fields are changed.

Path Parameters

ParameterTypeDescription
nsstringNamespace
layerstringData layer
namestringPipeline name

Request Body

FieldTypeRequiredDescription
descriptionstringNoUpdated description
typestringNoPipeline type: sql or python
ownerstringNoOwner user ID

Request

curl -X PUT http://localhost:8080/api/v1/pipelines/default/silver/orders \
  -H "Content-Type: application/json" \
  -d '{
    "description": "Updated description",
    "type": "python",
    "owner": "user-id"
  }'

Response — 200 OK

Returns the full pipeline object with updated fields.

Error Responses

StatusCodeDescription
400INVALID_ARGUMENTInvalid field values
403AUTHORIZATIONInsufficient permissions (Pro: ownership/ACL check)
404NOT_FOUNDPipeline not found

Delete Pipeline

DELETE /api/v1/pipelines/{ns}/{layer}/{name}

Soft-deletes a pipeline record and cleans up S3 files under the pipeline prefix. Soft-deleted pipelines are permanently purged by the reaper after the configured retention period.

Path Parameters

ParameterTypeDescription
nsstringNamespace
layerstringData layer
namestringPipeline name

Request

curl -X DELETE http://localhost:8080/api/v1/pipelines/default/silver/orders

Response — 204 No Content

No response body.

Error Responses

StatusCodeDescription
403AUTHORIZATIONInsufficient permissions (Pro: ownership/ACL check)
404NOT_FOUNDPipeline not found

Preview Pipeline (Dry Run)

POST /api/v1/pipelines/{ns}/{layer}/{name}/preview

Executes a pipeline in preview mode — returns sample rows, profiling statistics, EXPLAIN ANALYZE output, and execution logs without writing to the data lake or creating a run record.

Path Parameters

ParameterTypeDescription
nsstringNamespace
layerstringData layer
namestringPipeline name

Query Parameters

ParameterTypeDefaultDescription
limitinteger100Maximum rows to return (max 1000)

Request Body (Optional)

FieldTypeRequiredDescription
limitintegerNoMaximum rows to return (overrides query param, max 1000)
codestringNoOverride SQL/Python code for the preview
sample_filesarrayNoList of S3 paths to sample files for landing zone previews

Request

curl -X POST http://localhost:8080/api/v1/pipelines/default/silver/orders/preview \
  -H "Content-Type: application/json" \
  -d '{
    "limit": 100,
    "sample_files": ["default/landing/raw-uploads/_samples/sample.csv"],
    "code": "SELECT * FROM {{ ref('\''bronze.raw_orders'\'') }} LIMIT 10"
  }'

Response — 200 OK

{
  "columns": [
    { "name": "id", "type": "VARCHAR" },
    { "name": "amount", "type": "DECIMAL(14,2)" }
  ],
  "rows": [
    ["ORD-001", 129.99],
    ["ORD-002", 45.50]
  ],
  "total_row_count": 12340,
  "phases": [
    { "name": "template_render", "duration_ms": 5, "metadata": {} },
    { "name": "sql_execute", "duration_ms": 120, "metadata": {} },
    { "name": "profiling", "duration_ms": 30, "metadata": {} }
  ],
  "explain_output": "EXPLAIN ANALYZE output...",
  "memory_peak_bytes": 52428800,
  "logs": [
    { "timestamp": "2026-02-12T14:00:01Z", "level": "info", "message": "Rendering template..." }
  ],
  "warnings": ["Column 'legacy_id' has 45% null values"]
}

Response Fields

FieldTypeDescription
columnsarrayColumn definitions with name and type
rowsarrayResult rows (up to limit)
total_row_countintegerTotal rows the query would produce
phasesarrayExecution phases with timing information
phases[].namestringPhase name: template_render, sql_execute, profiling
phases[].duration_msintegerPhase duration in milliseconds
phases[].metadataobjectPhase-specific metadata
explain_outputstringDuckDB EXPLAIN ANALYZE output
memory_peak_bytesintegerPeak memory usage in bytes
logsarrayExecution log entries
warningsarrayData quality warnings (e.g., high null percentages)

Error Responses

StatusCodeDescription
400INVALID_ARGUMENTInvalid request body
404NOT_FOUNDPipeline not found
503UNAVAILABLEExecutor (runner) not available

Publish Pipeline

POST /api/v1/pipelines/{ns}/{layer}/{name}/publish

Snapshots the current HEAD S3 version IDs as the “published” versions for a pipeline. Creates a version history record. Templates are validated against the runner before publishing.

Path Parameters

ParameterTypeDescription
nsstringNamespace
layerstringData layer
namestringPipeline name

Request Body (Optional)

FieldTypeRequiredDescription
messagestringNoPublish message describing the changes

Request

curl -X POST http://localhost:8080/api/v1/pipelines/default/silver/orders/publish \
  -H "Content-Type: application/json" \
  -d '{"message": "Fix null handling in orders pipeline"}'

Response — 200 OK

{
  "status": "published",
  "version": 3,
  "message": "Fix null handling in orders pipeline",
  "versions": {
    "default/pipelines/silver/orders/pipeline.sql": "version-id-1",
    "default/pipelines/silver/orders/config.yaml": "version-id-2"
  }
}

Error Responses

StatusCodeDescription
404NOT_FOUNDPipeline not found
422INVALID_ARGUMENTTemplate validation failed

Template Validation Failure — 422

When template validation fails, the response includes detailed error information:

{
  "error": "template validation failed",
  "validation": {
    "valid": false,
    "files": [
      {
        "path": "pipeline.sql",
        "valid": false,
        "errors": ["ref('missing_table') references a table that does not exist"],
        "warnings": []
      }
    ]
  }
}

Template validation is a soft dependency — if the runner is unavailable, publishing proceeds without validation. This ensures you can still publish when the runner is temporarily down.


List Versions

GET /api/v1/pipelines/{ns}/{layer}/{name}/versions

Returns the version history for a pipeline. Only available when a VersionStore is configured.

Path Parameters

ParameterTypeDescription
nsstringNamespace
layerstringData layer
namestringPipeline name

Request

curl http://localhost:8080/api/v1/pipelines/default/silver/orders/versions

Response — 200 OK

{
  "versions": [
    {
      "id": "version-uuid",
      "pipeline_id": "pipeline-uuid",
      "version_number": 3,
      "message": "Fix null handling",
      "published_versions": {
        "default/pipelines/silver/orders/pipeline.sql": "version-id-1"
      },
      "created_at": "2026-02-14T10:00:00Z"
    }
  ],
  "total": 3
}

Response Fields

FieldTypeDescription
versionsarrayList of version objects (newest first)
versions[].idstringVersion UUID
versions[].pipeline_idstringPipeline UUID
versions[].version_numberintegerSequential version number
versions[].messagestringPublish message
versions[].published_versionsobjectMap of S3 file paths to version IDs
versions[].created_atstringISO 8601 creation timestamp
totalintegerTotal number of versions

Error Responses

StatusCodeDescription
404NOT_FOUNDPipeline not found

Get Version

GET /api/v1/pipelines/{ns}/{layer}/{name}/versions/{number}

Returns a specific version by its version number.

Path Parameters

ParameterTypeDescription
nsstringNamespace
layerstringData layer
namestringPipeline name
numberintegerVersion number (1-based)

Request

curl http://localhost:8080/api/v1/pipelines/default/silver/orders/versions/2

Response — 200 OK

Returns a single version object (same shape as the items in the list response).

Error Responses

StatusCodeDescription
400INVALID_ARGUMENTInvalid version number
404NOT_FOUNDPipeline or version not found

Rollback Pipeline

POST /api/v1/pipelines/{ns}/{layer}/{name}/rollback

Creates a new version that re-pins an old version’s file snapshots as the current published state. The operation is atomic — version creation, publish, and prune happen in one transaction.

Versions are pruned to keep the last max_versions (default 50) per pipeline.

Path Parameters

ParameterTypeDescription
nsstringNamespace
layerstringData layer
namestringPipeline name

Request Body

FieldTypeRequiredDescription
versionintegerYesVersion number to rollback to (must be >= 1)
messagestringNoRollback message

Request

curl -X POST http://localhost:8080/api/v1/pipelines/default/silver/orders/rollback \
  -H "Content-Type: application/json" \
  -d '{
    "version": 2,
    "message": "Rollback to v2 due to regression"
  }'

Response — 200 OK

{
  "status": "rolled_back",
  "from_version": 2,
  "new_version": 4,
  "message": "Rollback to v2 due to regression"
}

Response Fields

FieldTypeDescription
statusstringAlways rolled_back
from_versionintegerThe version number that was rolled back to
new_versionintegerThe new version number created by the rollback
messagestringRollback message

Error Responses

StatusCodeDescription
400INVALID_ARGUMENTInvalid version number (must be >= 1)
404NOT_FOUNDPipeline or target version not found