SQL Pipelines
SQL pipelines are the core building block of RAT. You write a SELECT statement, and RAT handles execution, deduplication, Iceberg writes, quality testing, and branch isolation. If you can write SQL, you can build data pipelines.
Basic Structure
Every SQL pipeline lives in a file called pipeline.sql inside your pipeline directory:
```
{namespace}/pipelines/{layer}/{pipeline_name}/pipeline.sql
```

For example, a pipeline called `clean_orders` in the silver layer of the `ecommerce` namespace:
```
ecommerce/pipelines/silver/clean_orders/pipeline.sql
```

At its simplest, a pipeline is just a SELECT statement:
```sql
SELECT
    id,
    TRIM(name) AS name,
    CAST(price AS DECIMAL(10, 2)) AS price,
    created_at
FROM {{ ref('bronze.raw_products') }}
WHERE price > 0
```

RAT takes the result of your SELECT and writes it to an Apache Iceberg table. The table name is derived from the pipeline's location: `{namespace}.{layer}.{pipeline_name}`.
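The mapping from file path to table name is mechanical, and can be sketched in a few lines of Python (illustrative only; `table_name_from_path` is a hypothetical helper, not part of RAT):

```python
from pathlib import Path

def table_name_from_path(pipeline_sql: str) -> str:
    """Derive {namespace}.{layer}.{pipeline_name} from the file location.

    Assumes the standard layout:
    {namespace}/pipelines/{layer}/{pipeline_name}/pipeline.sql
    """
    parts = Path(pipeline_sql).parts
    namespace, _, layer, pipeline_name = parts[-5:-1]
    return f"{namespace}.{layer}.{pipeline_name}"

print(table_name_from_path("ecommerce/pipelines/silver/clean_orders/pipeline.sql"))
# → ecommerce.silver.clean_orders
```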
Annotations
Annotations are special SQL comments at the top of your pipeline that configure how RAT processes the result. They use the `-- @key: value` format:
```sql
-- @merge_strategy: incremental
-- @unique_key: order_id
-- @watermark_column: updated_at
-- @description: Deduplicated orders with latest status
SELECT * FROM {{ ref('bronze.raw_orders') }}
{% if is_incremental() %}
WHERE updated_at > '{{ watermark_value }}'
{% endif %}
```

Available Annotations
| Annotation | Values | Description |
|---|---|---|
| `@merge_strategy` | `full_refresh`, `incremental`, `append_only`, `delete_insert`, `scd2`, `snapshot` | How new data merges with existing data. Default: `full_refresh` |
| `@unique_key` | Column name(s) | Primary key for deduplication. Comma-separated for composite keys |
| `@watermark_column` | Column name | Column used to track incremental progress |
| `@description` | Free text | Human-readable description shown in the portal |
| `@partition_column` | Column name | Partition column for the `snapshot` strategy |
| `@archive_landing_zones` | `true` / `false` | Move landing zone files to `_processed/` after a run |
| `@scd_valid_from` | Column name | SCD2: column for record start date. Default: `valid_from` |
| `@scd_valid_to` | Column name | SCD2: column for record end date. Default: `valid_to` |
| `@materialized` | `table` | Materialization type (currently only `table`) |
Annotations must appear at the top of the file before any SQL or Jinja code. The order of annotations does not matter, but by convention they are listed in this order: merge_strategy, unique_key, watermark_column, partition_column, scd_valid_from, scd_valid_to, archive_landing_zones, materialized, description.
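The `-- @key: value` format is simple enough to parse with a short regex. The sketch below is illustrative, not RAT's actual parser, and it assumes blank lines before the SQL are tolerated:

```python
import re

ANNOTATION_RE = re.compile(r"^--\s*@(\w+):\s*(.+?)\s*$")

def parse_annotations(sql: str) -> dict:
    """Collect -- @key: value comments from the top of a pipeline file.

    Parsing stops at the first line that is neither blank nor an
    annotation comment, matching the rule that annotations must
    appear before any SQL or Jinja code.
    """
    annotations = {}
    for line in sql.splitlines():
        if not line.strip():
            continue  # tolerate blank lines in the header block
        m = ANNOTATION_RE.match(line.strip())
        if m is None:
            break  # first non-annotation line ends the header block
        annotations[m.group(1)] = m.group(2)
    return annotations

sql = """\
-- @merge_strategy: incremental
-- @unique_key: order_id
SELECT * FROM t
"""
print(parse_annotations(sql))
# → {'merge_strategy': 'incremental', 'unique_key': 'order_id'}
```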
Annotations vs config.yaml
You can also configure pipelines via a config.yaml file in the same directory:
```yaml
merge_strategy: incremental
unique_key: order_id
watermark_column: updated_at
description: Deduplicated orders with latest status
```

When both exist, annotations win. The resolution order is:
1. Annotations in `pipeline.sql` (highest priority)
2. `config.yaml` in the pipeline directory
3. Default values
If you define `merge_strategy: full_refresh` in `config.yaml` but `-- @merge_strategy: incremental` in the SQL file, the pipeline will use `incremental`. This can be confusing — pick one approach and stick with it.
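The precedence rule amounts to layered dictionary merges, with later sources overriding earlier ones. A minimal sketch (`resolve_config` is a hypothetical name, not RAT's internals):

```python
def resolve_config(defaults: dict, config_yaml: dict, annotations: dict) -> dict:
    """Merge config sources: defaults < config.yaml < annotations."""
    merged = dict(defaults)
    merged.update(config_yaml)   # config.yaml overrides defaults
    merged.update(annotations)   # annotations override everything
    return merged

resolved = resolve_config(
    defaults={"merge_strategy": "full_refresh"},
    config_yaml={"merge_strategy": "full_refresh", "unique_key": "order_id"},
    annotations={"merge_strategy": "incremental"},
)
print(resolved["merge_strategy"])  # → incremental
print(resolved["unique_key"])      # → order_id
```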
Jinja Templating
RAT pipelines use Jinja2 templating to inject dynamic values and references. Jinja expressions are wrapped in {{ }} for values and {% %} for control flow.
Template Variables
| Variable | Type | Description |
|---|---|---|
| `{{ ref('layer.name') }}` | Function | Reference another table. Resolves to `iceberg_scan('<path>')` |
| `{{ ref('ns.layer.name') }}` | Function | Cross-namespace reference with explicit namespace |
| `{{ landing_zone('zone_name') }}` | Function | Resolves to the landing zone S3 path: `s3://rat/{ns}/landing/{zone}/**` |
| `{{ this }}` | String | Current pipeline's own Iceberg table (for self-referencing) |
| `{{ run_started_at }}` | String | ISO 8601 UTC timestamp of when the run started |
| `{{ watermark_value }}` | String | MAX value of `watermark_column` from the existing table |
| `{{ is_incremental() }}` | Boolean | `true` if the strategy is `incremental` and the table already exists |
| `{{ is_scd2() }}` | Boolean | `true` if the strategy is `scd2` |
The ref() Function
ref() is the most important template function. It creates a dependency between pipelines and resolves table references to Iceberg scan paths.
2-part syntax — references a table within the current namespace:
```sql
SELECT * FROM {{ ref('bronze.raw_orders') }}
```

This resolves to something like:
```sql
SELECT * FROM iceberg_scan('s3://rat/ecommerce/bronze/raw_orders/metadata/...')
```

3-part syntax — references a table in a different namespace:
```sql
SELECT
    o.*,
    c.country
FROM {{ ref('bronze.raw_orders') }} o
JOIN {{ ref('shared.silver.customers') }} c ON o.customer_id = c.id
```

`ref()` does more than resolve paths — it also registers the dependency in RAT's lineage graph. This means RAT knows which pipelines depend on which tables, enabling DAG visualization and dependency-aware scheduling.
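Because every upstream table goes through `ref()`, a pipeline's full dependency list is recoverable from the template text alone. A regex-based sketch of that extraction (illustrative; `extract_refs` is a hypothetical helper, not RAT's implementation):

```python
import re

REF_RE = re.compile(r"\{\{\s*ref\(\s*['\"]([^'\"]+)['\"]\s*\)\s*\}\}")

def extract_refs(sql_template: str) -> list:
    """Pull every ref('...') target out of a pipeline template,
    in order of appearance — the raw material for a lineage DAG."""
    return REF_RE.findall(sql_template)

template = """
SELECT o.*, c.country
FROM {{ ref('bronze.raw_orders') }} o
JOIN {{ ref('shared.silver.customers') }} c ON o.customer_id = c.id
"""
print(extract_refs(template))
# → ['bronze.raw_orders', 'shared.silver.customers']
```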
Conditional Logic
Use Jinja control flow for conditional SQL blocks. This is most commonly used with incremental pipelines:
```sql
-- @merge_strategy: incremental
-- @unique_key: user_id
-- @watermark_column: last_login_at
SELECT
    user_id,
    email,
    last_login_at,
    login_count
FROM {{ ref('bronze.raw_logins') }}
{% if is_incremental() %}
WHERE last_login_at > '{{ watermark_value }}'
{% endif %}
```

On the first run, `is_incremental()` returns false (the target table does not exist yet), so RAT loads all data. On subsequent runs, it returns true, and only rows newer than the watermark are processed.
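The first-run-versus-later-run behavior reduces to one branch: no filter when the target table does not exist, a watermark filter afterwards. A sketch of the fragment an incremental run would emit (illustrative; `compile_incremental_filter` is a hypothetical name, not RAT's compiler):

```python
def compile_incremental_filter(table_exists, watermark_column, watermark_value):
    """Return the WHERE fragment an incremental run would add.

    First run: the target table does not exist, so no filter is applied
    and all rows load. Later runs: only rows past the stored watermark.
    """
    if not table_exists or watermark_value is None:
        return ""
    return f"WHERE {watermark_column} > '{watermark_value}'"

# First run: full load, no filter.
print(repr(compile_incremental_filter(False, "last_login_at", None)))  # → ''
# Subsequent run: watermark filter applied.
print(compile_incremental_filter(True, "last_login_at", "2024-06-01 00:00:00"))
# → WHERE last_login_at > '2024-06-01 00:00:00'
```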
You can also use Jinja for other conditional patterns:
```sql
SELECT
    order_id,
    status,
    {% if is_scd2() %}
    valid_from,
    valid_to,
    {% endif %}
    updated_at
FROM {{ ref('bronze.raw_orders') }}
```

Pipeline Execution Flow
When you run a SQL pipeline, RAT goes through these phases:
1. **Create Ephemeral Branch:** RAT creates a Nessie branch named `run-{run_id}`. All writes happen on this branch, never on `main`.
2. **Parse Configuration:** RAT reads `config.yaml` and parses annotations from the SQL file. Annotations override config values.
3. **Compile Jinja Template:** Jinja expressions (`ref()`, `landing_zone()`, `is_incremental()`, etc.) are resolved into actual SQL.
4. **Execute SQL via DuckDB:** The compiled SQL runs in DuckDB with the Iceberg and S3 extensions loaded. DuckDB produces a result table.
5. **Write to Iceberg:** The result table is written to Apache Iceberg on the ephemeral branch using the configured merge strategy.
6. **Run Quality Tests:** Any quality test SQL files in the pipeline's `tests/quality/` directory are executed against the branch data.
7. **Branch Resolution:** If all quality tests pass (or fail only with `warn` severity), the branch is merged to `main`. If any `error`-severity test fails, the branch is deleted and the run fails — bad data never reaches production.
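The branch-resolution rule is the safety core of the flow: merge unless an error-severity test failed. A minimal sketch of that decision (illustrative; `TestResult` and `resolve_branch` are hypothetical names, not RAT's API):

```python
from dataclasses import dataclass

@dataclass
class TestResult:
    name: str
    passed: bool
    severity: str  # "warn" or "error"

def resolve_branch(results):
    """Decide the fate of the ephemeral run branch.

    Merge to main unless any error-severity test failed; warn-severity
    failures are reported but do not block the merge.
    """
    blocking = [r for r in results if not r.passed and r.severity == "error"]
    return "delete" if blocking else "merge"

print(resolve_branch([TestResult("not_null_id", True, "error"),
                      TestResult("row_count_drift", False, "warn")]))  # → merge
print(resolve_branch([TestResult("not_null_id", False, "error")]))     # → delete
```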
Full Examples
Bronze: Ingest from Landing Zone
This pipeline reads raw CSV files from a landing zone and loads them as-is into a Bronze table:
```sql
-- @merge_strategy: append_only
-- @archive_landing_zones: true
-- @description: Ingest raw order CSVs from the orders landing zone
SELECT
    *,
    '{{ run_started_at }}' AS _loaded_at,
    filename AS _source_file
FROM read_csv_auto(
    '{{ landing_zone("orders") }}',
    header = true,
    all_varchar = true
)
```

Using `all_varchar = true` in Bronze is a common pattern. It prevents type-casting errors on messy source data — you handle typing in the Silver layer instead.
Silver: Clean and Deduplicate
This pipeline takes raw orders from Bronze, casts types, and deduplicates using incremental processing:
```sql
-- @merge_strategy: incremental
-- @unique_key: order_id
-- @watermark_column: updated_at
-- @description: Cleaned and deduplicated orders
SELECT
    CAST(order_id AS INTEGER) AS order_id,
    CAST(customer_id AS INTEGER) AS customer_id,
    TRIM(status) AS status,
    CAST(total_amount AS DECIMAL(12, 2)) AS total_amount,
    CAST(currency AS VARCHAR(3)) AS currency,
    TRY_CAST(order_date AS DATE) AS order_date,
    CAST(updated_at AS TIMESTAMP) AS updated_at
FROM {{ ref('bronze.raw_orders') }}
WHERE order_id IS NOT NULL
    AND total_amount IS NOT NULL
    {% if is_incremental() %}
    AND CAST(updated_at AS TIMESTAMP) > '{{ watermark_value }}'
    {% endif %}
```

Gold: Business Aggregation
This pipeline aggregates clean Silver data into a Gold summary table for dashboards:
```sql
-- @merge_strategy: full_refresh
-- @description: Daily revenue by currency for executive dashboards
SELECT
    order_date,
    currency,
    COUNT(*) AS order_count,
    SUM(total_amount) AS total_revenue,
    AVG(total_amount) AS avg_order_value,
    COUNT(DISTINCT customer_id) AS unique_customers
FROM {{ ref('silver.clean_orders') }}
WHERE status NOT IN ('cancelled', 'refunded')
GROUP BY order_date, currency
ORDER BY order_date DESC, currency
```

Combining Multiple Sources
Pipelines can reference multiple tables and landing zones:
```sql
-- @merge_strategy: incremental
-- @unique_key: order_id
-- @watermark_column: updated_at
-- @description: Orders enriched with customer and product data
SELECT
    o.order_id,
    o.order_date,
    o.total_amount,
    o.updated_at,
    c.name AS customer_name,
    c.segment AS customer_segment,
    p.category AS product_category
FROM {{ ref('silver.clean_orders') }} o
LEFT JOIN {{ ref('silver.customers') }} c
    ON o.customer_id = c.customer_id
LEFT JOIN {{ ref('silver.products') }} p
    ON o.product_id = p.product_id
{% if is_incremental() %}
WHERE o.updated_at > '{{ watermark_value }}'
{% endif %}
```

Tips and Best Practices
- **Bronze pipelines should ingest data as-is.** Don't transform, filter, or cast types in Bronze — leave that for Silver.
- **Use `all_varchar = true` when reading CSVs in Bronze.** This avoids type inference errors on messy source data.
- **Never hardcode table paths.** Always use `ref()` — it registers lineage and resolves paths correctly across environments.
- **Keep all annotations at the very top of the file, before any SQL.** This is a convention that makes pipelines scannable.
- **Common mistake:** forgetting `{% if is_incremental() %}` in an incremental pipeline. Without the watermark filter, every run reprocesses all data, defeating the purpose of incremental processing. On the first run, the filter is skipped automatically (the table does not exist yet).