Pipelines
Pipelines are the core resource in RAT. A pipeline defines a data transformation — a SQL query or Python script that reads from source tables and writes to a target Iceberg table. Pipelines are organized by namespace and layer (bronze, silver, gold).
Endpoints
| Method | Endpoint | Description |
|---|---|---|
GET | /api/v1/pipelines | List all pipelines |
POST | /api/v1/pipelines | Create a new pipeline |
GET | /api/v1/pipelines/{ns}/{layer}/{name} | Get pipeline details |
PUT | /api/v1/pipelines/{ns}/{layer}/{name} | Update a pipeline |
DELETE | /api/v1/pipelines/{ns}/{layer}/{name} | Soft-delete a pipeline |
POST | /api/v1/pipelines/{ns}/{layer}/{name}/preview | Dry-run (preview) a pipeline |
POST | /api/v1/pipelines/{ns}/{layer}/{name}/publish | Snapshot and version a pipeline |
GET | /api/v1/pipelines/{ns}/{layer}/{name}/versions | List pipeline versions |
GET | /api/v1/pipelines/{ns}/{layer}/{name}/versions/{number} | Get a specific version |
POST | /api/v1/pipelines/{ns}/{layer}/{name}/rollback | Rollback to a previous version |
List Pipelines
GET /api/v1/pipelinesReturns a paginated list of all pipelines, optionally filtered by namespace, layer, or search term.
Query Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
namespace | string | — | Filter by namespace |
layer | string | — | Filter by layer (bronze, silver, gold) |
search | string | — | Search pipelines by name (partial match) |
sort | string | created_at | Sort field (name, created_at, updated_at) |
order | string | desc | Sort direction (asc, desc) |
limit | integer | 50 | Items per page (max 200) |
offset | integer | 0 | Number of items to skip |
Request
curl "http://localhost:8080/api/v1/pipelines?namespace=default&layer=silver&limit=10"Response — 200 OK
{
"pipelines": [
{
"namespace": "default",
"layer": "silver",
"name": "orders",
"type": "sql",
"owner": null,
"description": "Clean and deduplicate orders",
"created_at": "2026-02-12T10:00:00Z",
"updated_at": "2026-02-12T10:00:00Z",
"last_run": {
"run_id": "abc123",
"status": "success",
"finished_at": "2026-02-12T14:00:00Z"
}
}
],
"total": 1
}Response Fields
| Field | Type | Description |
|---|---|---|
pipelines | array | List of pipeline objects |
pipelines[].namespace | string | Namespace the pipeline belongs to |
pipelines[].layer | string | Data layer: bronze, silver, or gold |
pipelines[].name | string | Pipeline name |
pipelines[].type | string | Pipeline type: sql or python |
pipelines[].owner | string|null | Owner user ID (Pro) or null |
pipelines[].description | string | Pipeline description |
pipelines[].created_at | string | ISO 8601 creation timestamp |
pipelines[].updated_at | string | ISO 8601 last update timestamp |
pipelines[].last_run | object|null | Most recent run summary, or null if never run |
pipelines[].last_run.run_id | string | Run ID |
pipelines[].last_run.status | string | Run status |
pipelines[].last_run.finished_at | string | ISO 8601 completion timestamp |
total | integer | Total number of matching pipelines (for pagination) |
Create Pipeline
POST /api/v1/pipelinesCreates a new pipeline and scaffolds the initial S3 files (pipeline.sql and config.yaml).
Request Body
| Field | Type | Required | Description |
|---|---|---|---|
namespace | string | Yes | Target namespace |
layer | string | Yes | Data layer: bronze, silver, or gold |
name | string | Yes | Pipeline name (lowercase, alphanumeric, hyphens, underscores) |
type | string | No | Pipeline type: sql (default) or python |
description | string | No | Human-readable description |
source | string | No | Source table or landing zone name |
unique_key | string | No | Primary key column for deduplication |
Request
curl -X POST http://localhost:8080/api/v1/pipelines \
-H "Content-Type: application/json" \
-d '{
"namespace": "default",
"layer": "silver",
"name": "orders",
"type": "sql",
"source": "raw_orders",
"unique_key": "id",
"description": "Clean and deduplicate orders"
}'Response — 201 Created
{
"namespace": "default",
"layer": "silver",
"name": "orders",
"s3_path": "default/pipelines/silver/orders/",
"files_created": ["pipeline.sql", "config.yaml"]
}Error Responses
| Status | Code | Description |
|---|---|---|
400 | INVALID_ARGUMENT | Missing required fields, invalid name or layer |
409 | ALREADY_EXISTS | A pipeline with the same namespace, layer, and name already exists |
Get Pipeline
GET /api/v1/pipelines/{ns}/{layer}/{name}Returns the full details for a single pipeline, including metadata and last run information.
Path Parameters
| Parameter | Type | Description |
|---|---|---|
ns | string | Namespace |
layer | string | Data layer |
name | string | Pipeline name |
Request
curl http://localhost:8080/api/v1/pipelines/default/silver/ordersResponse — 200 OK
{
"namespace": "default",
"layer": "silver",
"name": "orders",
"type": "sql",
"owner": null,
"description": "Clean and deduplicate orders",
"s3_path": "default/pipelines/silver/orders/",
"created_at": "2026-02-12T10:00:00Z",
"updated_at": "2026-02-12T10:00:00Z",
"last_run": {
"run_id": "abc123",
"status": "success",
"finished_at": "2026-02-12T14:00:00Z"
}
}Error Responses
| Status | Code | Description |
|---|---|---|
404 | NOT_FOUND | Pipeline not found |
Update Pipeline
PUT /api/v1/pipelines/{ns}/{layer}/{name}Updates a pipeline’s mutable fields. This is a partial update — only provided fields are changed.
Path Parameters
| Parameter | Type | Description |
|---|---|---|
ns | string | Namespace |
layer | string | Data layer |
name | string | Pipeline name |
Request Body
| Field | Type | Required | Description |
|---|---|---|---|
description | string | No | Updated description |
type | string | No | Pipeline type: sql or python |
owner | string | No | Owner user ID |
Request
curl -X PUT http://localhost:8080/api/v1/pipelines/default/silver/orders \
-H "Content-Type: application/json" \
-d '{
"description": "Updated description",
"type": "python",
"owner": "user-id"
}'Response — 200 OK
Returns the full pipeline object with updated fields.
Error Responses
| Status | Code | Description |
|---|---|---|
400 | INVALID_ARGUMENT | Invalid field values |
403 | AUTHORIZATION | Insufficient permissions (Pro: ownership/ACL check) |
404 | NOT_FOUND | Pipeline not found |
Delete Pipeline
DELETE /api/v1/pipelines/{ns}/{layer}/{name}Soft-deletes a pipeline record and cleans up S3 files under the pipeline prefix. Soft-deleted pipelines are permanently purged by the reaper after the configured retention period.
Path Parameters
| Parameter | Type | Description |
|---|---|---|
ns | string | Namespace |
layer | string | Data layer |
name | string | Pipeline name |
Request
curl -X DELETE http://localhost:8080/api/v1/pipelines/default/silver/ordersResponse — 204 No Content
No response body.
Error Responses
| Status | Code | Description |
|---|---|---|
403 | AUTHORIZATION | Insufficient permissions (Pro: ownership/ACL check) |
404 | NOT_FOUND | Pipeline not found |
Preview Pipeline (Dry Run)
POST /api/v1/pipelines/{ns}/{layer}/{name}/previewExecutes a pipeline in preview mode — returns sample rows, profiling statistics, EXPLAIN ANALYZE output, and execution logs without writing to the data lake or creating a run record.
Path Parameters
| Parameter | Type | Description |
|---|---|---|
ns | string | Namespace |
layer | string | Data layer |
name | string | Pipeline name |
Query Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
limit | integer | 100 | Maximum rows to return (max 1000) |
Request Body (Optional)
| Field | Type | Required | Description |
|---|---|---|---|
limit | integer | No | Maximum rows to return (overrides query param, max 1000) |
code | string | No | Override SQL/Python code for the preview |
sample_files | array | No | List of S3 paths to sample files for landing zone previews |
Request
curl -X POST http://localhost:8080/api/v1/pipelines/default/silver/orders/preview \
-H "Content-Type: application/json" \
-d '{
"limit": 100,
"sample_files": ["default/landing/raw-uploads/_samples/sample.csv"],
"code": "SELECT * FROM {{ ref('\''bronze.raw_orders'\'') }} LIMIT 10"
}'Response — 200 OK
{
"columns": [
{ "name": "id", "type": "VARCHAR" },
{ "name": "amount", "type": "DECIMAL(14,2)" }
],
"rows": [
["ORD-001", 129.99],
["ORD-002", 45.50]
],
"total_row_count": 12340,
"phases": [
{ "name": "template_render", "duration_ms": 5, "metadata": {} },
{ "name": "sql_execute", "duration_ms": 120, "metadata": {} },
{ "name": "profiling", "duration_ms": 30, "metadata": {} }
],
"explain_output": "EXPLAIN ANALYZE output...",
"memory_peak_bytes": 52428800,
"logs": [
{ "timestamp": "2026-02-12T14:00:01Z", "level": "info", "message": "Rendering template..." }
],
"warnings": ["Column 'legacy_id' has 45% null values"]
}Response Fields
| Field | Type | Description |
|---|---|---|
columns | array | Column definitions with name and type |
rows | array | Result rows (up to limit) |
total_row_count | integer | Total rows the query would produce |
phases | array | Execution phases with timing information |
phases[].name | string | Phase name: template_render, sql_execute, profiling |
phases[].duration_ms | integer | Phase duration in milliseconds |
phases[].metadata | object | Phase-specific metadata |
explain_output | string | DuckDB EXPLAIN ANALYZE output |
memory_peak_bytes | integer | Peak memory usage in bytes |
logs | array | Execution log entries |
warnings | array | Data quality warnings (e.g., high null percentages) |
Error Responses
| Status | Code | Description |
|---|---|---|
400 | INVALID_ARGUMENT | Invalid request body |
404 | NOT_FOUND | Pipeline not found |
503 | UNAVAILABLE | Executor (runner) not available |
Publish Pipeline
POST /api/v1/pipelines/{ns}/{layer}/{name}/publishSnapshots the current HEAD S3 version IDs as the “published” versions for a pipeline. Creates a version history record. Templates are validated against the runner before publishing.
Path Parameters
| Parameter | Type | Description |
|---|---|---|
ns | string | Namespace |
layer | string | Data layer |
name | string | Pipeline name |
Request Body (Optional)
| Field | Type | Required | Description |
|---|---|---|---|
message | string | No | Publish message describing the changes |
Request
curl -X POST http://localhost:8080/api/v1/pipelines/default/silver/orders/publish \
-H "Content-Type: application/json" \
-d '{"message": "Fix null handling in orders pipeline"}'Response — 200 OK
{
"status": "published",
"version": 3,
"message": "Fix null handling in orders pipeline",
"versions": {
"default/pipelines/silver/orders/pipeline.sql": "version-id-1",
"default/pipelines/silver/orders/config.yaml": "version-id-2"
}
}Error Responses
| Status | Code | Description |
|---|---|---|
404 | NOT_FOUND | Pipeline not found |
422 | INVALID_ARGUMENT | Template validation failed |
Template Validation Failure — 422
When template validation fails, the response includes detailed error information:
{
"error": "template validation failed",
"validation": {
"valid": false,
"files": [
{
"path": "pipeline.sql",
"valid": false,
"errors": ["ref('missing_table') references a table that does not exist"],
"warnings": []
}
]
}
}Template validation is a soft dependency — if the runner is unavailable, publishing proceeds without validation. This ensures you can still publish when the runner is temporarily down.
List Versions
GET /api/v1/pipelines/{ns}/{layer}/{name}/versionsReturns the version history for a pipeline. Only available when a VersionStore is configured.
Path Parameters
| Parameter | Type | Description |
|---|---|---|
ns | string | Namespace |
layer | string | Data layer |
name | string | Pipeline name |
Request
curl http://localhost:8080/api/v1/pipelines/default/silver/orders/versionsResponse — 200 OK
{
"versions": [
{
"id": "version-uuid",
"pipeline_id": "pipeline-uuid",
"version_number": 3,
"message": "Fix null handling",
"published_versions": {
"default/pipelines/silver/orders/pipeline.sql": "version-id-1"
},
"created_at": "2026-02-14T10:00:00Z"
}
],
"total": 3
}Response Fields
| Field | Type | Description |
|---|---|---|
versions | array | List of version objects (newest first) |
versions[].id | string | Version UUID |
versions[].pipeline_id | string | Pipeline UUID |
versions[].version_number | integer | Sequential version number |
versions[].message | string | Publish message |
versions[].published_versions | object | Map of S3 file paths to version IDs |
versions[].created_at | string | ISO 8601 creation timestamp |
total | integer | Total number of versions |
Error Responses
| Status | Code | Description |
|---|---|---|
404 | NOT_FOUND | Pipeline not found |
Get Version
GET /api/v1/pipelines/{ns}/{layer}/{name}/versions/{number}Returns a specific version by its version number.
Path Parameters
| Parameter | Type | Description |
|---|---|---|
ns | string | Namespace |
layer | string | Data layer |
name | string | Pipeline name |
number | integer | Version number (1-based) |
Request
curl http://localhost:8080/api/v1/pipelines/default/silver/orders/versions/2Response — 200 OK
Returns a single version object (same shape as the items in the list response).
Error Responses
| Status | Code | Description |
|---|---|---|
400 | INVALID_ARGUMENT | Invalid version number |
404 | NOT_FOUND | Pipeline or version not found |
Rollback Pipeline
POST /api/v1/pipelines/{ns}/{layer}/{name}/rollbackCreates a new version that re-pins an old version’s file snapshots as the current published state. The operation is atomic — version creation, publish, and prune happen in one transaction.
Versions are pruned to keep the last max_versions (default 50) per pipeline.
Path Parameters
| Parameter | Type | Description |
|---|---|---|
ns | string | Namespace |
layer | string | Data layer |
name | string | Pipeline name |
Request Body
| Field | Type | Required | Description |
|---|---|---|---|
version | integer | Yes | Version number to rollback to (must be >= 1) |
message | string | No | Rollback message |
Request
curl -X POST http://localhost:8080/api/v1/pipelines/default/silver/orders/rollback \
-H "Content-Type: application/json" \
-d '{
"version": 2,
"message": "Rollback to v2 due to regression"
}'Response — 200 OK
{
"status": "rolled_back",
"from_version": 2,
"new_version": 4,
"message": "Rollback to v2 due to regression"
}Response Fields
| Field | Type | Description |
|---|---|---|
status | string | Always rolled_back |
from_version | integer | The version number that was rolled back to |
new_version | integer | The new version number created by the rollback |
message | string | Rollback message |
Error Responses
| Status | Code | Description |
|---|---|---|
400 | INVALID_ARGUMENT | Invalid version number (must be >= 1) |
404 | NOT_FOUND | Pipeline or target version not found |