GuidesSQL Pipelines

SQL Pipelines

SQL pipelines are the core building block of RAT. You write a SELECT statement, and RAT handles execution, deduplication, Iceberg writes, quality testing, and branch isolation. If you can write SQL, you can build data pipelines.

Basic Structure

Every SQL pipeline lives in a file called pipeline.sql inside your pipeline directory:

{namespace}/pipelines/{layer}/{pipeline_name}/pipeline.sql

For example, a pipeline called clean_orders in the silver layer of the ecommerce namespace:

ecommerce/pipelines/silver/clean_orders/pipeline.sql

At its simplest, a pipeline is just a SELECT statement:

pipeline.sql
SELECT
    id,
    TRIM(name) AS name,
    CAST(price AS DECIMAL(10, 2)) AS price,
    created_at
FROM {{ ref('bronze.raw_products') }}
WHERE price > 0

RAT takes the result of your SELECT and writes it to an Apache Iceberg table. The table name is derived from the pipeline’s location: {namespace}.{layer}.{pipeline_name}.

Annotations

Annotations are special SQL comments at the top of your pipeline that configure how RAT processes the result. They use the -- @key: value format:

pipeline.sql
-- @merge_strategy: incremental
-- @unique_key: order_id
-- @watermark_column: updated_at
-- @description: Deduplicated orders with latest status
 
SELECT * FROM {{ ref('bronze.raw_orders') }}
{% if is_incremental() %}
WHERE updated_at > '{{ watermark_value }}'
{% endif %}

Available Annotations

AnnotationValuesDescription
@merge_strategyfull_refresh, incremental, append_only, delete_insert, scd2, snapshotHow new data merges with existing data. Default: full_refresh
@unique_keyColumn name(s)Primary key for dedup. Comma-separated for composite keys
@watermark_columnColumn nameColumn used to track incremental progress
@descriptionFree textHuman-readable description shown in the portal
@partition_columnColumn namePartition column for snapshot strategy
@archive_landing_zonestrue / falseMove landing zone files to _processed/ after run
@scd_valid_fromColumn nameSCD2: column for record start date. Default: valid_from
@scd_valid_toColumn nameSCD2: column for record end date. Default: valid_to
@materializedtableMaterialization type (currently only table)

Annotations must appear at the top of the file before any SQL or Jinja code. The order of annotations does not matter, but by convention they are listed in this order: merge_strategy, unique_key, watermark_column, partition_column, scd_valid_from, scd_valid_to, archive_landing_zones, materialized, description.

Annotations vs config.yaml

You can also configure pipelines via a config.yaml file in the same directory:

config.yaml
merge_strategy: incremental
unique_key: order_id
watermark_column: updated_at
description: Deduplicated orders with latest status

When both exist, annotations win. The resolution order is:

  1. Annotations in pipeline.sql (highest priority)
  2. config.yaml in the pipeline directory
  3. Default values
⚠️

If you define merge_strategy: full_refresh in config.yaml but -- @merge_strategy: incremental in the SQL file, the pipeline will use incremental. This can be confusing — pick one approach and stick with it.

Jinja Templating

RAT pipelines use Jinja2 templating to inject dynamic values and references. Jinja expressions are wrapped in {{ }} for values and {% %} for control flow.

Template Variables

VariableTypeDescription
{{ ref('layer.name') }}FunctionReference another table. Resolves to iceberg_scan('<path>')
{{ ref('ns.layer.name') }}FunctionCross-namespace reference with explicit namespace
{{ landing_zone('zone_name') }}FunctionResolves to landing zone S3 path: s3://rat/{ns}/landing/{zone}/**
{{ this }}StringCurrent pipeline’s own Iceberg table (for self-referencing)
{{ run_started_at }}StringISO 8601 UTC timestamp of when the run started
{{ watermark_value }}StringMAX value of watermark_column from the existing table
{{ is_incremental() }}Booleantrue if strategy is incremental and table already exists
{{ is_scd2() }}Booleantrue if strategy is scd2

The ref() Function

ref() is the most important template function. It creates a dependency between pipelines and resolves table references to Iceberg scan paths.

2-part syntax — references a table within the current namespace:

pipeline.sql
SELECT * FROM {{ ref('bronze.raw_orders') }}

This resolves to something like:

SELECT * FROM iceberg_scan('s3://rat/ecommerce/bronze/raw_orders/metadata/...')

3-part syntax — references a table in a different namespace:

pipeline.sql
SELECT
    o.*,
    c.country
FROM {{ ref('bronze.raw_orders') }} o
JOIN {{ ref('shared.silver.customers') }} c ON o.customer_id = c.id

ref() does more than resolve paths — it also registers the dependency in RAT’s lineage graph. This means RAT knows which pipelines depend on which tables, enabling DAG visualization and dependency-aware scheduling.

Conditional Logic

Use Jinja control flow for conditional SQL blocks. This is most commonly used with incremental pipelines:

pipeline.sql
-- @merge_strategy: incremental
-- @unique_key: user_id
-- @watermark_column: last_login_at
 
SELECT
    user_id,
    email,
    last_login_at,
    login_count
FROM {{ ref('bronze.raw_logins') }}
{% if is_incremental() %}
WHERE last_login_at > '{{ watermark_value }}'
{% endif %}

On the first run, is_incremental() returns false (the target table does not exist yet), so RAT loads all data. On subsequent runs, it returns true, and only rows newer than the watermark are processed.

You can also use Jinja for other conditional patterns:

pipeline.sql
SELECT
    order_id,
    status,
    {% if is_scd2() %}
    valid_from,
    valid_to,
    {% endif %}
    updated_at
FROM {{ ref('bronze.raw_orders') }}

Pipeline Execution Flow

When you run a SQL pipeline, RAT goes through these phases:

Create Ephemeral Branch

RAT creates a Nessie branch named run-{run_id}. All writes happen on this branch, never on main.

Parse Configuration

RAT reads config.yaml and parses annotations from the SQL file. Annotations override config values.

Compile Jinja Template

Jinja expressions (ref(), landing_zone(), is_incremental(), etc.) are resolved into actual SQL.

Execute SQL via DuckDB

The compiled SQL runs in DuckDB with Iceberg and S3 extensions loaded. DuckDB produces a result table.

Write to Iceberg

The result table is written to Apache Iceberg on the ephemeral branch using the configured merge strategy.

Run Quality Tests

Any quality test SQL files in the pipeline’s tests/quality/ directory are executed against the branch data.

Branch Resolution

If all quality tests pass (or only have warn severity), the branch is merged to main. If any error severity test fails, the branch is deleted and the run fails — bad data never reaches production.

Full Examples

Bronze: Ingest from Landing Zone

This pipeline reads raw CSV files from a landing zone and loads them as-is into a Bronze table:

ecommerce/pipelines/bronze/raw_orders/pipeline.sql
-- @merge_strategy: append_only
-- @archive_landing_zones: true
-- @description: Ingest raw order CSVs from the orders landing zone
 
SELECT
    *,
    '{{ run_started_at }}' AS _loaded_at,
    filename AS _source_file
FROM read_csv_auto(
    '{{ landing_zone("orders") }}',
    header = true,
    all_varchar = true
)

Using all_varchar = true in Bronze is a common pattern. It prevents type-casting errors on messy source data — you handle typing in the Silver layer instead.

Silver: Clean and Deduplicate

This pipeline takes raw orders from Bronze, casts types, and deduplicates using incremental processing:

ecommerce/pipelines/silver/clean_orders/pipeline.sql
-- @merge_strategy: incremental
-- @unique_key: order_id
-- @watermark_column: updated_at
-- @description: Cleaned and deduplicated orders
 
SELECT
    CAST(order_id AS INTEGER) AS order_id,
    CAST(customer_id AS INTEGER) AS customer_id,
    TRIM(status) AS status,
    CAST(total_amount AS DECIMAL(12, 2)) AS total_amount,
    CAST(currency AS VARCHAR(3)) AS currency,
    TRY_CAST(order_date AS DATE) AS order_date,
    CAST(updated_at AS TIMESTAMP) AS updated_at
FROM {{ ref('bronze.raw_orders') }}
WHERE order_id IS NOT NULL
  AND total_amount IS NOT NULL
{% if is_incremental() %}
  AND CAST(updated_at AS TIMESTAMP) > '{{ watermark_value }}'
{% endif %}

Gold: Business Aggregation

This pipeline aggregates clean Silver data into a Gold summary table for dashboards:

ecommerce/pipelines/gold/daily_revenue/pipeline.sql
-- @merge_strategy: full_refresh
-- @description: Daily revenue by currency for executive dashboards
 
SELECT
    order_date,
    currency,
    COUNT(*) AS order_count,
    SUM(total_amount) AS total_revenue,
    AVG(total_amount) AS avg_order_value,
    COUNT(DISTINCT customer_id) AS unique_customers
FROM {{ ref('silver.clean_orders') }}
WHERE status NOT IN ('cancelled', 'refunded')
GROUP BY order_date, currency
ORDER BY order_date DESC, currency

Combining Multiple Sources

Pipelines can reference multiple tables and landing zones:

ecommerce/pipelines/silver/enriched_orders/pipeline.sql
-- @merge_strategy: incremental
-- @unique_key: order_id
-- @watermark_column: updated_at
-- @description: Orders enriched with customer and product data
 
SELECT
    o.order_id,
    o.order_date,
    o.total_amount,
    o.updated_at,
    c.name AS customer_name,
    c.segment AS customer_segment,
    p.category AS product_category
FROM {{ ref('silver.clean_orders') }} o
LEFT JOIN {{ ref('silver.customers') }} c
    ON o.customer_id = c.customer_id
LEFT JOIN {{ ref('silver.products') }} p
    ON o.product_id = p.product_id
{% if is_incremental() %}
WHERE o.updated_at > '{{ watermark_value }}'
{% endif %}

Tips and Best Practices

Bronze pipelines should ingest data as-is. Don’t transform, filter, or cast types in Bronze — leave that for Silver.

Keep Bronze raw

Use all_varchar = true when reading CSVs in Bronze. This avoids type inference errors on messy source data.

Use all_varchar in Bronze

Never hardcode table paths. Always use ref() — it registers lineage and resolves paths correctly across environments.

Always use ref()

Keep all annotations at the very top of the file, before any SQL. This is a convention that makes pipelines scannable.

Annotations at the top
⚠️

Common mistake: Forgetting {% if is_incremental() %} in an incremental pipeline. Without the watermark filter, every run reprocesses all data — defeating the purpose of incremental processing. On the first run, the filter is skipped automatically (the table does not exist yet).