Pipeline Execution
Every pipeline run in RAT follows a six-phase execution model, numbered Phase 0 through Phase 5. Each phase has a clear responsibility, and the pipeline can fail, be cancelled, or recover at any point. This page walks through every phase in detail.
Execution Overview
| Phase | Name | What Happens |
|---|---|---|
| 0 | Branch Creation | Create Nessie branch run-{run_id} from main |
| 1 | Config & Detection | Detect pipeline type, read config, merge annotations |
| 2 | Build Result | Compile Jinja templates, execute SQL/Python via DuckDB |
| 3 | Iceberg Write | Write PyArrow table to Iceberg on the branch |
| 4 | Quality Tests | Discover and execute quality test SQL files |
| 5 | Branch Resolution | Merge to main (pass) or delete branch (fail) |
Full Sequence Diagram
Phase 0: Branch Creation
Every pipeline run starts by creating an isolated Nessie branch. This ensures that no writes affect the production main branch until the run is fully validated.
Branch name: run-{run_id}
Parent: main
Why Branch Per Run?
- Isolation --- concurrent pipeline runs never interfere with each other
- Atomicity --- a failed run leaves zero trace on the production catalog
- Quality gating --- quality tests run against the branch data before it reaches main
- Auditability --- every run’s writes are on a named branch that can be inspected
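Phase 0 itself is a single REST call against Nessie. Below is a minimal sketch of how that request might be assembled, assuming the Nessie v2 reference-creation endpoint; the exact URL and payload shape are assumptions for illustration, not the runner's actual code.

```python
import uuid

def branch_request(run_id: str, base_url: str, parent: str = "main") -> dict:
    """Assemble the HTTP request that creates the run's isolation branch (illustrative)."""
    return {
        "method": "POST",
        # Assumed Nessie v2 shape: references are created via /trees,
        # with the new branch name and type passed as query parameters
        "url": f"{base_url}/api/v2/trees?name=run-{run_id}&type=BRANCH",
        # Body identifies the source reference to branch from
        "json": {"name": parent, "type": "BRANCH"},
    }

req = branch_request(uuid.uuid4().hex[:8], "http://nessie:19120")
```

The returned dict would be handed to an HTTP client; a non-2xx response here is what triggers the fallback behavior described below.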
Fallback Behavior
If Nessie is temporarily unavailable when the run starts, the runner falls back to writing directly to the main branch. This is a degraded mode --- quality tests still run, but there is no branch isolation. The run logs a warning.
Fallback to direct-main writes means a failed quality test cannot roll back the data. This mode exists to prevent total outages when Nessie has transient issues. In production, ensure Nessie is healthy and monitored.
Branch Cleanup
Branches are always cleaned up, regardless of outcome:
| Outcome | Branch Action |
|---|---|
| Success (all tests pass) | Merge to main, then delete branch |
| Failure (test failed) | Delete branch (no merge) |
| Error (crash/timeout) | Reaper cleans up orphaned branches after 6 hours |
| Cancellation | Delete branch |
Phase 1: Type Detection and Config Loading
The runner reads the pipeline files from S3 and builds the execution configuration.
Type Detection
The runner checks for the pipeline file in order:
1. pipeline.sql --- SQL pipeline (DuckDB execution)
2. pipeline.py --- Python pipeline (sandboxed exec)
The type determines the execution path in Phase 2.
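The check above can be sketched as a small helper, assuming the runner already has the list of object keys in the pipeline's S3 directory (the helper itself is illustrative, not the runner's code):

```python
def detect_pipeline_type(keys: list[str]) -> str:
    """Return the execution path based on which pipeline file exists, checked in order."""
    if "pipeline.sql" in keys:
        return "sql"     # DuckDB execution path
    if "pipeline.py" in keys:
        return "python"  # sandboxed exec path
    raise FileNotFoundError("no pipeline.sql or pipeline.py in pipeline directory")
```

Because pipeline.sql is checked first, a directory containing both files is treated as a SQL pipeline.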
Config Loading
Configuration comes from up to three sources, merged in priority order:
Annotation Parsing
Annotations are SQL comments at the top of the file using the -- @key: value format:
```sql
-- @merge_strategy: incremental
-- @unique_key: order_id
-- @watermark_column: updated_at
-- @description: Deduplicated orders
SELECT * FROM {{ ref('bronze.raw_orders') }}
```

The parser extracts these before Jinja compilation. Recognized annotations:
| Annotation | Default | Description |
|---|---|---|
| @merge_strategy | full_refresh | How new data merges with existing data |
| @unique_key | (none) | Primary key column(s) for dedup |
| @watermark_column | (none) | Column tracking incremental progress |
| @description | "" | Human-readable description |
| @partition_column | (none) | Partition column for snapshot strategy |
| @archive_landing_zones | false | Move landing zone files to _processed/ after run |
| @scd_valid_from | valid_from | SCD2 record start date column |
| @scd_valid_to | valid_to | SCD2 record end date column |
| @materialized | table | Materialization type |
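A minimal sketch of the annotation parser, assuming annotations only appear above the first SQL statement. The defaults shown are a subset of the table above; this is an illustration, not the runner's actual implementation.

```python
import re

# Matches comment lines of the form: -- @key: value
ANNOTATION = re.compile(r"^--\s*@(\w+):\s*(.+?)\s*$")
DEFAULTS = {"merge_strategy": "full_refresh", "description": "", "materialized": "table"}

def parse_annotations(sql: str) -> dict:
    """Extract -- @key: value annotations from the top of a pipeline.sql file."""
    config = dict(DEFAULTS)
    for line in sql.splitlines():
        m = ANNOTATION.match(line.strip())
        if m:
            config[m.group(1)] = m.group(2)
        elif line.strip() and not line.strip().startswith("--"):
            break  # stop at the first non-comment line: annotations live above the SQL
    return config
```

Annotation values override the defaults, which is why an unannotated pipeline falls back to full_refresh.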
Phase 2: Build Result
This is where the actual data processing happens. The runner compiles Jinja templates into raw SQL, executes it via DuckDB, and produces a PyArrow Table.
DuckDB Initialization
Each run gets a fresh DuckDB connection with extensions loaded:
```python
import duckdb

conn = duckdb.connect(":memory:")
conn.execute("INSTALL httpfs; LOAD httpfs;")
conn.execute("INSTALL iceberg; LOAD iceberg;")
conn.execute(f"SET s3_endpoint='{s3_endpoint}';")
conn.execute(f"SET s3_access_key_id='{s3_key}';")
conn.execute(f"SET s3_secret_access_key='{s3_secret}';")
conn.execute("SET s3_url_style='path';")
```

Jinja Compilation
The runner builds a Jinja context with all available template variables and compiles the SQL:
| Template | Resolves To |
|---|---|
| {{ ref('bronze.raw_orders') }} | iceberg_scan('s3://rat/{ns}/warehouse/bronze/raw_orders/metadata/...') |
| {{ ref('other_ns.silver.users') }} | iceberg_scan('s3://rat/other_ns/warehouse/silver/users/metadata/...') |
| {{ landing_zone('orders') }} | 's3://rat/{ns}/landing/orders/**' |
| {{ this }} | The current pipeline’s own Iceberg table path |
| {{ run_started_at }} | ISO 8601 UTC timestamp |
| {{ watermark_value }} | MAX of watermark_column from existing table |
| {{ is_incremental() }} | true if strategy=incremental and table exists |
| {{ is_scd2() }} | true if strategy=scd2 |
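How ref() resolution might look under the hood, as a dependency-free sketch. The default namespace and the metadata file name below are placeholders; the real runner resolves the current metadata location from the catalog, not from a fixed path.

```python
def ref(identifier: str, default_ns: str = "my_ns") -> str:
    """Resolve a ref() identifier to an iceberg_scan() call (illustrative only)."""
    parts = identifier.split(".")
    if len(parts) == 2:
        ns, (layer, name) = default_ns, parts  # 'bronze.raw_orders' -> current namespace
    else:
        ns, layer, name = parts                # 'other_ns.silver.users' -> explicit namespace
    # 'v1.metadata.json' is a stand-in for the catalog-resolved metadata file
    path = f"s3://rat/{ns}/warehouse/{layer}/{name}/metadata/v1.metadata.json"
    return f"iceberg_scan('{path}')"
```

The two-part vs. three-part split is what lets pipelines reference tables in other namespaces without any extra configuration.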
SQL Execution
The compiled SQL is executed in DuckDB. DuckDB reads source data from MinIO via S3 protocol (using iceberg_scan() for Iceberg tables or read_csv_auto()/read_parquet() for landing zone files).
The result is fetched as a PyArrow Table --- an in-memory columnar representation that is efficient for both DuckDB and Iceberg writes.
Python Pipeline Execution
For pipeline.py files, the runner uses exec() with a sandboxed environment:
- 26 blocked builtins (exec, eval, compile, __import__, open, input, etc.)
- 30+ blocked imports (os, sys, subprocess, socket, shutil, etc.)
- AST validation (checks the abstract syntax tree before execution)
- SQL filtering on any SQL strings constructed in Python
The Python pipeline must produce a PyArrow Table or DuckDB relation as its output.
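A heavily simplified sketch of the sandbox idea: strip dangerous builtins and guard imports. The blocked names below are just the examples from the lists above, and the real runner adds AST validation and SQL filtering on top of this.

```python
import builtins

BLOCKED_BUILTINS = {"exec", "eval", "compile", "__import__", "open", "input"}
BLOCKED_IMPORTS = {"os", "sys", "subprocess", "socket", "shutil"}

def run_sandboxed(source: str) -> dict:
    """Execute pipeline code with a restricted builtins table (illustrative)."""
    safe = {k: v for k, v in vars(builtins).items() if k not in BLOCKED_BUILTINS}

    def guarded_import(name, *args, **kwargs):
        # The import statement routes through __builtins__["__import__"]
        if name.split(".")[0] in BLOCKED_IMPORTS:
            raise ImportError(f"import of '{name}' is blocked")
        return __import__(name, *args, **kwargs)

    safe["__import__"] = guarded_import
    scope = {"__builtins__": safe}
    exec(source, scope)  # the sandbox boundary; the real runner validates the AST first
    return scope
```

A pipeline that only uses allowed builtins runs normally, while an attempt to import os fails with ImportError before any code in that module executes.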
Phase 3: Iceberg Write
The PyArrow Table from Phase 2 is written to an Apache Iceberg table on the Nessie branch. The write strategy depends on the merge_strategy annotation.
Merge Strategy Dispatch
| Strategy | Behavior | When to Use |
|---|---|---|
| full_refresh | Drop and replace the entire table | Small tables, Gold aggregations, no history needed |
| incremental | Upsert by unique_key, using watermark_column | Large tables with updates, deduplication |
| append_only | Append new rows, never update existing | Event logs, raw ingestion |
| delete_insert | Delete matching rows by unique_key, then insert new | Batch updates where upsert is complex |
| scd2 | Slowly Changing Dimension Type 2 with validity ranges | Dimension tables requiring full history |
| snapshot | Partition-level overwrite by partition_column | Daily/monthly snapshots |
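Dispatch on the annotation is essentially a table lookup. A sketch with stub handlers; the handler names are hypothetical, and each would receive the PyArrow table and the parsed config:

```python
def write_full_refresh(table, data, cfg): ...  # hypothetical handler stubs
def write_incremental(table, data, cfg): ...
def write_append_only(table, data, cfg): ...
def write_delete_insert(table, data, cfg): ...
def write_scd2(table, data, cfg): ...
def write_snapshot(table, data, cfg): ...

STRATEGIES = {
    "full_refresh": write_full_refresh,
    "incremental": write_incremental,
    "append_only": write_append_only,
    "delete_insert": write_delete_insert,
    "scd2": write_scd2,
    "snapshot": write_snapshot,
}

def pick_strategy(cfg: dict):
    """Look up the write handler, defaulting to full_refresh per the annotation table."""
    name = cfg.get("merge_strategy", "full_refresh")
    if name not in STRATEGIES:
        raise ValueError(f"unknown merge_strategy: {name}")
    return STRATEGIES[name]
```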
Write Paths
The runner uses PyIceberg for Iceberg writes. There are two paths:
Optimized path (overwrite) --- used by full_refresh and snapshot:

```python
table.overwrite(arrow_table)  # Atomic replace
```

Merge path (upsert) --- used by incremental, delete_insert, scd2:

```python
# Read existing data from branch
existing = table.scan().to_arrow()
# Merge new data with existing based on unique_key
merged = merge_tables(existing, new_data, unique_key, strategy)
# Overwrite with merged result
table.overwrite(merged)
```

Commit to Branch
After writing Parquet files to MinIO, the runner commits the Iceberg metadata to the Nessie branch. This is a single atomic operation --- either all the metadata is committed or none of it is.
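The merge_tables step in the upsert path boils down to "the newest version of a key wins." A dependency-free sketch of that semantics for the incremental strategy (the real runner works on PyArrow tables, not lists of dicts):

```python
def merge_rows(existing: list[dict], incoming: list[dict], unique_key: str) -> list[dict]:
    """Upsert: incoming rows replace existing rows that share the same unique key."""
    merged = {row[unique_key]: row for row in existing}
    for row in incoming:
        merged[row[unique_key]] = row  # newer version wins
    return list(merged.values())
```

Rows present only in the existing table survive untouched; rows sharing a key are replaced; genuinely new keys are appended.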
Phase 4: Quality Tests
Quality tests are SQL files that validate the data written on the branch. They run before the branch is merged to main, acting as a gate.
Test Discovery
The runner looks for quality test files in the pipeline’s S3 directory:
```
{namespace}/pipelines/{layer}/{name}/tests/quality/*.sql
```

Each .sql file is a separate quality test. The file name becomes the test name.
Test Execution
Each test SQL is executed against the branch data (not main). A quality test is expected to return rows that violate the quality rule:
- 0 rows returned = test passes (no violations)
- >0 rows returned = test fails (violations found)
```sql
-- Fail if any order has a NULL order_id
SELECT order_id, customer_id
FROM {{ this }}
WHERE order_id IS NULL
```

```sql
-- Fail if any order has a non-positive total
SELECT order_id, total_amount
FROM {{ this }}
WHERE total_amount <= 0
```

Test Severity
Each quality test has a severity level:
| Severity | On Failure |
|---|---|
| error (default) | Block merge --- branch is deleted, run fails |
| warn | Log warning --- branch is still merged, run succeeds with warnings |
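The gate logic follows directly from this table: any error-severity test with violations blocks the merge. A sketch, assuming results are dicts carrying a violation count and a severity:

```python
def resolve_branch(results: list[dict]) -> str:
    """Return 'merge' when no error-severity test found violations, else 'delete'."""
    for r in results:
        if r["violations"] > 0 and r.get("severity", "error") == "error":
            return "delete"  # a blocking failure: branch is discarded
    return "merge"           # clean, or warnings only
```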
Test Results
Quality test results are recorded in the quality_results table with:
- Status: passed, failed, warned, error
- Value: actual count of violations
- Duration: execution time
- Run ID: link to the pipeline run
Phase 5: Branch Resolution
The final phase determines the outcome of the run based on quality test results.
Decision Logic
Merge to Main
When all quality tests pass (or only warn-severity tests fail), the runner merges the branch to main:
```
POST /api/v2/trees/main/merge
Body: { fromRef: "run-{id}", fromHash: "{branch_hash}" }
```

The merge uses Nessie’s optimistic concurrency. If another run merged to main between branch creation and this merge, Nessie resolves the conflict (Iceberg tables in different namespaces/layers never conflict). If there is a genuine conflict (same table modified), the merge fails and the runner retries (see Nessie Branching).
Delete Branch
After merging (or on failure), the branch is always deleted:
```
DELETE /api/v2/trees/run-{id}
```

Post-Success Tasks
After a successful merge, the runner performs optional cleanup:
- Landing zone archival --- if @archive_landing_zones: true, source files are moved to _processed/{run_id}/
- Iceberg maintenance --- snapshot expiration and orphan file cleanup on the table (if configured)
Callback to ratd
The runner POSTs the terminal status to ratd:
```json
{
  "status": "success",
  "rows_written": 15482,
  "duration_ms": 3200,
  "phase_profiles": {
    "branch_creation": 120,
    "config_loading": 45,
    "build_result": 2100,
    "iceberg_write": 650,
    "quality_tests": 180,
    "branch_resolution": 105
  }
}
```

ratd updates the runs table and stores the phase profiling data in the phase_profiles JSONB column.
Error Handling
Phase-Level Error Handling
| Phase | On Error |
|---|---|
| Phase 0 (Branch) | Fall back to direct main writes. Log warning. |
| Phase 1 (Config) | Run fails. No branch cleanup needed (no writes yet). |
| Phase 2 (Execute) | Run fails. Delete branch. |
| Phase 3 (Write) | Run fails. Delete branch. |
| Phase 4 (Quality) | Depends on severity. Error = fail + delete. Warn = merge. |
| Phase 5 (Merge) | Retry up to 3 times. If all retries fail, delete branch and fail. |
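The Phase 5 retry can be sketched as an exponential-backoff loop. The exception type and delay values here are illustrative; this page only fixes the attempt count at 3.

```python
import time

class NessieConflict(Exception):
    """Stand-in for the conflict error Nessie returns on a racing merge."""

def merge_with_retry(merge_fn, attempts: int = 3, base_delay: float = 0.5):
    """Call merge_fn, retrying on conflict with exponentially growing delays."""
    for attempt in range(attempts):
        try:
            return merge_fn()
        except NessieConflict:
            if attempt == attempts - 1:
                raise  # retries exhausted: delete branch, fail the run
            time.sleep(base_delay * (2 ** attempt))  # e.g. 0.5s, then 1s
```

If all attempts raise, the final exception propagates and the runner falls through to branch deletion.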
Common Errors
- DuckDB OutOfMemoryError → Run fails, branch deleted
- Nessie ConflictException → Retry merge (3 attempts, exponential backoff)
- S3 ConnectionError → Run fails, branch deleted
- Jinja TemplateError → Run fails (Phase 2), branch deleted
- Quality test timeout → Test counts as error, blocks merge

Cancellation
Pipeline runs can be cancelled at any point via the CancelRun gRPC call.
Cancellation is cooperative --- the runner checks a cancel flag between phases and at key checkpoints within phases. DuckDB queries can be interrupted mid-execution.
Crash Recovery
If the runner crashes mid-execution, the run is left in running state with an orphaned Nessie branch. The reaper daemon in ratd handles recovery:
1. Stuck run detection --- the reaper queries for runs with status=running and started_at older than 30 minutes (configurable via stuck_run_timeout_minutes). These runs are marked as failed with an error message indicating the timeout.
2. Orphan branch cleanup --- the reaper queries Nessie for branches matching run-* that are older than 6 hours (configurable via nessie_orphan_branch_max_age_hours). These branches are deleted.
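The stuck-run check can be sketched in Python terms (the real reaper does this in SQL against the runs table; the run shape here is illustrative):

```python
from datetime import datetime, timedelta, timezone

def stuck_runs(runs: list[dict], timeout_minutes: int = 30, now=None) -> list[dict]:
    """Runs still 'running' whose started_at is older than the stuck-run timeout."""
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(minutes=timeout_minutes)
    return [r for r in runs if r["status"] == "running" and r["started_at"] < cutoff]
```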
The reaper runs every 60 minutes (configurable) and performs 6 cleanup tasks. See the Reaper section for all tasks.
Phase Profiling
Every run records per-phase timing in the phase_profiles JSONB column:
```json
{
  "branch_creation": 120,
  "config_loading": 45,
  "build_result": 2100,
  "iceberg_write": 650,
  "quality_tests": 180,
  "branch_resolution": 105
}
```

This data is displayed in the portal’s run detail page, helping identify bottlenecks. Common patterns:
- build_result dominates --- the SQL query is slow. Optimize the query or add filters.
- iceberg_write dominates --- large result set. Consider incremental strategy to reduce write volume.
- quality_tests dominates --- complex quality tests. Optimize test SQL or reduce test count.
- branch_resolution dominates --- merge conflicts. Check for concurrent writes to the same table.
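Applying the patterns above is a one-liner once the profile payload is in hand. A sketch:

```python
def dominant_phase(profiles: dict) -> str:
    """Name of the phase that consumed the most time in a phase_profiles payload."""
    return max(profiles, key=profiles.get)
```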