
Pipeline Execution

Every pipeline run in RAT follows a six-phase execution model (Phases 0 through 5). Each phase has a clear responsibility, and the pipeline can fail, be cancelled, or recover at any point. This page walks through every phase in detail.


Execution Overview

Phase | Name | What Happens
0 | Branch Creation | Create Nessie branch run-{run_id} from main
1 | Config & Detection | Detect pipeline type, read config, merge annotations
2 | Build Result | Compile Jinja templates, execute SQL/Python via DuckDB
3 | Iceberg Write | Write PyArrow table to Iceberg on the branch
4 | Quality Tests | Discover and execute quality test SQL files
5 | Branch Resolution | Merge to main (pass) or delete branch (fail)
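
The phase sequence above can be sketched as a simple driver loop. This is illustrative only; the `run_pipeline` helper and the `phase_impls` mapping are hypothetical, not the runner's actual internals:

```python
import time

# Phase names match the phase_profiles keys documented below.
PHASES = [
    "branch_creation",    # Phase 0
    "config_loading",     # Phase 1
    "build_result",       # Phase 2
    "iceberg_write",      # Phase 3
    "quality_tests",      # Phase 4
    "branch_resolution",  # Phase 5
]

def run_pipeline(run_id, phase_impls, profile):
    """Execute each phase in order, recording per-phase duration in ms.

    A phase that raises aborts the run at that point (error handling
    per phase is covered later on this page).
    """
    for phase in PHASES:
        start = time.monotonic()
        phase_impls[phase](run_id)
        profile[phase] = int((time.monotonic() - start) * 1000)
```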

Full Sequence Diagram


Phase 0: Branch Creation

Every pipeline run starts by creating an isolated Nessie branch. This ensures that no writes affect the production main branch until the run is fully validated.

Branch name: run-{run_id}
Parent: main

Why Branch Per Run?

  • Isolation --- concurrent pipeline runs never interfere with each other
  • Atomicity --- a failed run leaves zero trace on the production catalog
  • Quality gating --- quality tests run against the branch data before it reaches main
  • Auditability --- every run’s writes are on a named branch that can be inspected

Fallback Behavior

If Nessie is temporarily unavailable when the run starts, the runner falls back to writing directly to the main branch. This is a degraded mode --- quality tests still run, but there is no branch isolation. The run logs a warning.

⚠️

Fallback to direct-main writes means a failed quality test cannot roll back the data. This mode exists to prevent total outages when Nessie has transient issues. In production, ensure Nessie is healthy and monitored.
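
The branch-or-fallback decision can be sketched as below. The Nessie endpoint shape and the injected `post` callable are assumptions for illustration, not the verbatim API call the runner makes:

```python
import logging

log = logging.getLogger("runner")

def create_run_branch(post, nessie_url, run_id):
    """Create run-{run_id} from main; fall back to direct-main writes
    if Nessie is unavailable (degraded mode: no isolation, no rollback).

    `post` is an injected HTTP POST callable (e.g. requests.post).
    The /api/v2/trees request shape below is an assumption.
    """
    branch = f"run-{run_id}"
    try:
        resp = post(
            f"{nessie_url}/api/v2/trees",
            params={"name": branch, "type": "BRANCH"},
            json={"sourceRefName": "main"},
            timeout=5,
        )
        resp.raise_for_status()
        return branch          # writes go to the isolated run branch
    except Exception as exc:
        log.warning("Nessie unavailable (%s); falling back to main", exc)
        return "main"          # degraded mode described above
```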

Branch Cleanup

Branches are always cleaned up, regardless of outcome:

Outcome | Branch Action
Success (all tests pass) | Merge to main, then delete branch
Failure (test failed) | Delete branch (no merge)
Error (crash/timeout) | Reaper cleans up orphaned branches after 6 hours
Cancellation | Delete branch

Phase 1: Type Detection and Config Loading

The runner reads the pipeline files from S3 and builds the execution configuration.

Type Detection

The runner checks for the pipeline file in this order:

  1. pipeline.sql --- SQL pipeline (DuckDB execution)
  2. pipeline.py --- Python pipeline (sandboxed exec)

The type determines the execution path in Phase 2.
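
The check order can be sketched as below; the `exists` predicate (e.g. a wrapper around an S3 HEAD request) is a hypothetical stand-in:

```python
def detect_pipeline_type(exists):
    """Return (type, filename) using the documented check order:
    pipeline.sql first, then pipeline.py."""
    for filename, kind in (("pipeline.sql", "sql"), ("pipeline.py", "python")):
        if exists(filename):
            return kind, filename
    raise FileNotFoundError("no pipeline.sql or pipeline.py found")
```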

Config Loading

Configuration comes from up to three sources, merged in priority order:

Annotation Parsing

Annotations are SQL comments at the top of the file using the -- @key: value format:

pipeline.sql
-- @merge_strategy: incremental
-- @unique_key: order_id
-- @watermark_column: updated_at
-- @description: Deduplicated orders
 
SELECT * FROM {{ ref('bronze.raw_orders') }}

The parser extracts these before Jinja compilation. Recognized annotations:

Annotation | Default | Description
@merge_strategy | full_refresh | How new data merges with existing data
@unique_key | (none) | Primary key column(s) for dedup
@watermark_column | (none) | Column tracking incremental progress
@description | "" | Human-readable description
@partition_column | (none) | Partition column for snapshot strategy
@archive_landing_zones | false | Move landing zone files to _processed/ after run
@scd_valid_from | valid_from | SCD2 record start date column
@scd_valid_to | valid_to | SCD2 record end date column
@materialized | table | Materialization type
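
A minimal annotation parser for the `-- @key: value` format might look like this (a sketch; the defaults shown are the subset with concrete values in the table above):

```python
import re

# Matches "-- @key: value" comment lines at the top of pipeline.sql.
_ANNOTATION = re.compile(r"^--\s*@(\w+):\s*(.+?)\s*$")

DEFAULTS = {
    "merge_strategy": "full_refresh",
    "description": "",
    "scd_valid_from": "valid_from",
    "scd_valid_to": "valid_to",
    "materialized": "table",
    "archive_landing_zones": "false",
}

def parse_annotations(sql):
    """Extract leading annotations (before Jinja compilation) and
    apply defaults. Parsing stops at the first non-annotation line."""
    config = dict(DEFAULTS)
    for line in sql.splitlines():
        line = line.strip()
        if not line:
            continue
        m = _ANNOTATION.match(line)
        if m is None:
            break  # annotations must precede the SQL body
        config[m.group(1)] = m.group(2)
    return config
```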

Phase 2: Build Result

This is where the actual data processing happens. The runner compiles Jinja templates into raw SQL, executes it via DuckDB, and produces a PyArrow Table.

DuckDB Initialization

Each run gets a fresh DuckDB connection with extensions loaded:

Engine initialization (simplified)
conn = duckdb.connect(":memory:")
conn.execute("INSTALL httpfs; LOAD httpfs;")
conn.execute("INSTALL iceberg; LOAD iceberg;")
conn.execute(f"SET s3_endpoint='{s3_endpoint}';")
conn.execute(f"SET s3_access_key_id='{s3_key}';")
conn.execute(f"SET s3_secret_access_key='{s3_secret}';")
conn.execute("SET s3_url_style='path';")

Jinja Compilation

The runner builds a Jinja context with all available template variables and compiles the SQL:

Template | Resolves To
{{ ref('bronze.raw_orders') }} | iceberg_scan('s3://rat/{ns}/warehouse/bronze/raw_orders/metadata/...')
{{ ref('other_ns.silver.users') }} | iceberg_scan('s3://rat/other_ns/warehouse/silver/users/metadata/...')
{{ landing_zone('orders') }} | 's3://rat/{ns}/landing/orders/**'
{{ this }} | The current pipeline’s own Iceberg table path
{{ run_started_at }} | ISO 8601 UTC timestamp
{{ watermark_value }} | MAX of watermark_column from existing table
{{ is_incremental() }} | true if strategy=incremental and table exists
{{ is_scd2() }} | true if strategy=scd2
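
Compilation can be sketched with Jinja2 directly. This covers only a subset of the documented context; the `table_paths` mapping (ref name to Iceberg metadata path) is a hypothetical shape:

```python
from jinja2 import Environment

def compile_sql(sql, namespace, table_paths, run_started_at):
    """Render pipeline SQL with a minimal subset of the template context."""
    ctx = {
        "ref": lambda name: f"iceberg_scan('{table_paths[name]}')",
        "landing_zone": lambda z: f"'s3://rat/{namespace}/landing/{z}/**'",
        "run_started_at": run_started_at,
        "is_incremental": lambda: False,  # simplified: full-refresh context
        "is_scd2": lambda: False,
    }
    return Environment().from_string(sql).render(**ctx)
```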

SQL Execution

The compiled SQL is executed in DuckDB. DuckDB reads source data from MinIO via S3 protocol (using iceberg_scan() for Iceberg tables or read_csv_auto()/read_parquet() for landing zone files).

The result is fetched as a PyArrow Table --- an in-memory columnar representation that is efficient for both DuckDB and Iceberg writes.

Python Pipeline Execution

For pipeline.py files, the runner uses exec() with a sandboxed environment:

  • 26 blocked builtins (exec, eval, compile, __import__, open, input, etc.)
  • 30+ blocked imports (os, sys, subprocess, socket, shutil, etc.)
  • AST validation (checks the abstract syntax tree before execution)
  • SQL filtering on any SQL strings constructed in Python

The Python pipeline must produce a PyArrow Table or DuckDB relation as its output.
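
The AST validation step can be sketched as below. Only a subset of the documented blocklists is shown, and the function name is hypothetical:

```python
import ast

BLOCKED_IMPORTS = {"os", "sys", "subprocess", "socket", "shutil"}      # subset of 30+
BLOCKED_BUILTINS = {"exec", "eval", "compile", "__import__", "open", "input"}  # subset of 26

def validate_pipeline_source(source):
    """Reject pipeline.py sources that import blocked modules or call
    blocked builtins, by walking the AST before any execution."""
    tree = ast.parse(source)
    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            for alias in node.names:
                if alias.name.split(".")[0] in BLOCKED_IMPORTS:
                    raise ValueError(f"blocked import: {alias.name}")
        elif isinstance(node, ast.ImportFrom):
            if node.module and node.module.split(".")[0] in BLOCKED_IMPORTS:
                raise ValueError(f"blocked import: {node.module}")
        elif isinstance(node, ast.Call):
            fn = node.func
            if isinstance(fn, ast.Name) and fn.id in BLOCKED_BUILTINS:
                raise ValueError(f"blocked builtin: {fn.id}")
    return True
```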


Phase 3: Iceberg Write

The PyArrow Table from Phase 2 is written to an Apache Iceberg table on the Nessie branch. The write strategy depends on the merge_strategy annotation.

Merge Strategy Dispatch

Strategy | Behavior | When to Use
full_refresh | Drop and replace the entire table | Small tables, Gold aggregations, no history needed
incremental | Upsert by unique_key, using watermark_column | Large tables with updates, deduplication
append_only | Append new rows, never update existing | Event logs, raw ingestion
delete_insert | Delete matching rows by unique_key, then insert new | Batch updates where upsert is complex
scd2 | Slowly Changing Dimension Type 2 with validity ranges | Dimension tables requiring full history
snapshot | Partition-level overwrite by partition_column | Daily/monthly snapshots

Write Paths

The runner uses PyIceberg for Iceberg writes. There are two paths:

Optimized path (overwrite) --- used by full_refresh and snapshot:

Optimized overwrite (simplified)
table.overwrite(arrow_table)  # Atomic replace

Merge path (upsert) --- used by incremental, delete_insert, scd2:

Merge path (simplified)
# Read existing data from branch
existing = table.scan().to_arrow()
# Merge new data with existing based on unique_key
merged = merge_tables(existing, new_data, unique_key, strategy)
# Overwrite with merged result
table.overwrite(merged)
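
The upsert inside `merge_tables` can be sketched on plain dict rows for clarity (the runner operates on PyArrow tables; this shows only the incremental strategy's key-collision rule):

```python
def merge_rows(existing, new, unique_key):
    """Incremental-style upsert: new rows replace existing rows sharing
    the same unique_key; all other existing rows are kept."""
    merged = {row[unique_key]: row for row in existing}
    for row in new:
        merged[row[unique_key]] = row  # new data wins on key collision
    return list(merged.values())
```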

Commit to Branch

After writing Parquet files to MinIO, the runner commits the Iceberg metadata to the Nessie branch. This is a single atomic operation --- either all the metadata is committed or none of it is.


Phase 4: Quality Tests

Quality tests are SQL files that validate the data written on the branch. They run before the branch is merged to main, acting as a gate.

Test Discovery

The runner looks for quality test files in the pipeline’s S3 directory:

{namespace}/pipelines/{layer}/{name}/tests/quality/*.sql

Each .sql file is a separate quality test. The file name becomes the test name.

Test Execution

Each test SQL is executed against the branch data (not main). A quality test is expected to return rows that violate the quality rule:

  • 0 rows returned = test passes (no violations)
  • >0 rows returned = test fails (violations found)

tests/quality/no_null_order_ids.sql
-- Fail if any order has a NULL order_id
SELECT order_id, customer_id
FROM {{ this }}
WHERE order_id IS NULL

tests/quality/positive_amounts.sql
-- Fail if any order has a non-positive total
SELECT order_id, total_amount
FROM {{ this }}
WHERE total_amount <= 0
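
The zero-rows-means-pass convention, combined with the severity rules described next, reduces to a small decision function (a sketch; names and result shape are hypothetical):

```python
def evaluate_quality_test(name, violation_count, severity="error"):
    """Map a test's violation row count and severity to an outcome.
    Only error-severity failures block the merge to main."""
    if violation_count == 0:
        return {"test": name, "status": "passed", "blocks_merge": False}
    if severity == "warn":
        return {"test": name, "status": "warned", "blocks_merge": False}
    return {"test": name, "status": "failed", "blocks_merge": True}
```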

Test Severity

Each quality test has a severity level:

Severity | On Failure
error (default) | Block merge --- branch is deleted, run fails
warn | Log warning --- branch is still merged, run succeeds with warnings

Test Results

Quality test results are recorded in the quality_results table with:

  • Status: passed, failed, warned, error
  • Value: actual count of violations
  • Duration: execution time
  • Run ID: link to the pipeline run

Phase 5: Branch Resolution

The final phase determines the outcome of the run based on quality test results.

Decision Logic

Merge to Main

When all quality tests pass (or only warn-severity tests fail), the runner merges the branch to main:

POST /api/v2/trees/main/merge
Body: { fromRef: "run-{id}", fromHash: "{branch_hash}" }

The merge uses Nessie’s optimistic concurrency. If another run merged to main between branch creation and this merge, Nessie resolves the conflict (Iceberg tables in different namespaces/layers never conflict). If there is a genuine conflict (same table modified), the merge fails and the runner retries (see Nessie Branching).
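
The retry-on-conflict behavior can be sketched as below; the injected `merge` callable and the backoff constants are illustrative (the documented limit is 3 attempts with exponential backoff):

```python
import time

def merge_branch_with_retry(merge, branch, attempts=3,
                            base_delay=1.0, sleep=time.sleep):
    """Retry the branch merge on conflict with exponential backoff.
    If all attempts fail, the exception propagates and the run fails."""
    for attempt in range(attempts):
        try:
            return merge(branch)
        except Exception:
            if attempt == attempts - 1:
                raise                          # retries exhausted
            sleep(base_delay * (2 ** attempt))  # 1s, 2s, ...
```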

Delete Branch

After merging (or on failure), the branch is always deleted:

DELETE /api/v2/trees/run-{id}

Post-Success Tasks

After a successful merge, the runner performs optional cleanup:

  1. Landing zone archival --- if @archive_landing_zones: true, source files are moved to _processed/{run_id}/
  2. Iceberg maintenance --- snapshot expiration and orphan file cleanup on the table (if configured)

Callback to ratd

The runner POSTs the terminal status to ratd:

POST {RATD_CALLBACK_URL}/api/v1/internal/runs/{id}/callback
{
  "status": "success",
  "rows_written": 15482,
  "duration_ms": 3200,
  "phase_profiles": {
    "branch_creation": 120,
    "config_loading": 45,
    "build_result": 2100,
    "iceberg_write": 650,
    "quality_tests": 180,
    "branch_resolution": 105
  }
}

ratd updates the runs table and stores the phase profiling data in the phase_profiles JSONB column.


Error Handling

Phase-Level Error Handling

Phase | On Error
Phase 0 (Branch) | Fall back to direct main writes. Log warning.
Phase 1 (Config) | Run fails. No branch cleanup needed (no writes yet).
Phase 2 (Execute) | Run fails. Delete branch.
Phase 3 (Write) | Run fails. Delete branch.
Phase 4 (Quality) | Depends on severity. Error = fail + delete. Warn = merge.
Phase 5 (Merge) | Retry up to 3 times. If all retries fail, delete branch and fail.

Common Errors

DuckDB OutOfMemoryError    → Run fails, branch deleted
Nessie ConflictException   → Retry merge (3 attempts, exponential backoff)
S3 ConnectionError         → Run fails, branch deleted
Jinja TemplateError        → Run fails (Phase 2), branch deleted
Quality test timeout       → Test counts as error, blocks merge

Cancellation

Pipeline runs can be cancelled at any point via the CancelRun gRPC call.

Cancellation is cooperative --- the runner checks a cancel flag between phases and at key checkpoints within phases. DuckDB queries can be interrupted mid-execution.
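
A cooperative cancel flag of this kind can be sketched with a `threading.Event` (names are illustrative, not the runner's actual types):

```python
import threading

class CancelToken:
    """Cooperative cancel flag checked between phases and at checkpoints."""
    def __init__(self):
        self._event = threading.Event()

    def cancel(self):
        self._event.set()

    def check(self):
        if self._event.is_set():
            raise RuntimeError("run cancelled")

def run_phases(phases, token):
    """Run phase callables in order, honoring cancellation between phases."""
    for phase in phases:
        token.check()  # checkpoint before each phase
        phase()
```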


Crash Recovery

If the runner crashes mid-execution, the run is left in running state with an orphaned Nessie branch. The reaper daemon in ratd handles recovery:

  1. Stuck run detection --- the reaper queries for runs with status=running and started_at older than 30 minutes (configurable via stuck_run_timeout_minutes). These runs are marked as failed with an error message indicating the timeout.

  2. Orphan branch cleanup --- the reaper queries Nessie for branches matching run-* that are older than 6 hours (configurable via nessie_orphan_branch_max_age_hours). These branches are deleted.

The reaper runs every 60 minutes (configurable) and performs 6 cleanup tasks. See the Reaper section for all tasks.
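
The two cutoffs above can be sketched as a small helper (the config option names come from the docs; the function itself is hypothetical):

```python
from datetime import datetime, timedelta, timezone

def reaper_cutoffs(now=None, stuck_run_timeout_minutes=30,
                   nessie_orphan_branch_max_age_hours=6):
    """Compute the timestamps before which stuck runs are failed and
    orphaned run-* branches are deleted."""
    now = now or datetime.now(timezone.utc)
    return {
        "stuck_runs_before": now - timedelta(minutes=stuck_run_timeout_minutes),
        "orphan_branches_before": now - timedelta(hours=nessie_orphan_branch_max_age_hours),
    }
```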


Phase Profiling

Every run records per-phase timing in the phase_profiles JSONB column:

Example phase_profiles
{
  "branch_creation": 120,
  "config_loading": 45,
  "build_result": 2100,
  "iceberg_write": 650,
  "quality_tests": 180,
  "branch_resolution": 105
}

This data is displayed in the portal’s run detail page, helping identify bottlenecks. Common patterns:

  • build_result dominates --- the SQL query is slow. Optimize the query or add filters.
  • iceberg_write dominates --- large result set. Consider incremental strategy to reduce write volume.
  • quality_tests dominates --- complex quality tests. Optimize test SQL or reduce test count.
  • branch_resolution dominates --- merge conflicts. Check for concurrent writes to the same table.