# Pipeline Patterns
This page collects common SQL patterns for RAT pipelines. Each pattern is a proven approach to a recurring data engineering task — from basic staging CTEs to complex incremental logic.
## Staging Pattern (CTE-Based Cleanup)
The staging pattern uses Common Table Expressions (CTEs) to build a clean transformation in stages. Each CTE handles one concern, making the pipeline readable and debuggable.
```sql
-- @merge_strategy: incremental
-- @unique_key: order_id
-- @watermark_column: updated_at
WITH source AS (
    -- Step 1: Read raw data
    SELECT *
    FROM {{ ref('bronze.raw_orders') }}
    {% if is_incremental() %}
    WHERE updated_at > '{{ watermark_value }}'
    {% endif %}
),

renamed AS (
    -- Step 2: Rename and cast columns
    SELECT
        CAST(order_id AS INTEGER) AS order_id,
        CAST(customer_id AS INTEGER) AS customer_id,
        CAST(product_id AS VARCHAR) AS product_id,
        CAST(quantity AS INTEGER) AS quantity,
        CAST(unit_price AS DECIMAL(10,2)) AS unit_price,
        UPPER(TRIM(status)) AS status,
        CAST(order_date AS DATE) AS order_date,
        CAST(updated_at AS TIMESTAMP) AS updated_at
    FROM source
),

filtered AS (
    -- Step 3: Remove invalid records
    SELECT *
    FROM renamed
    WHERE order_id IS NOT NULL
      AND customer_id IS NOT NULL
      AND quantity > 0
      AND unit_price >= 0
),

enriched AS (
    -- Step 4: Add computed columns
    SELECT
        *,
        quantity * unit_price AS line_total,
        CASE
            WHEN status IN ('SHIPPED', 'DELIVERED') THEN 'fulfilled'
            WHEN status = 'CANCELLED' THEN 'cancelled'
            ELSE 'open'
        END AS status_category
    FROM filtered
)

-- Step 5: Final select
SELECT * FROM enriched
```

Why this works well:

- Each CTE is testable independently (add `SELECT * FROM renamed LIMIT 10` during development)
- Concerns are separated: source reading, renaming, filtering, enrichment
- Easy to add or remove stages without reorganizing the query
## Deduplication Patterns
### Deduplicate on Unique Key (Latest Wins)
When source data contains duplicate records, keep only the most recent version of each key:
```sql
-- @merge_strategy: incremental
-- @unique_key: event_id
-- @watermark_column: event_timestamp
WITH ranked AS (
    SELECT
        *,
        ROW_NUMBER() OVER (
            PARTITION BY event_id
            ORDER BY event_timestamp DESC
        ) AS rn
    FROM {{ ref('bronze.raw_events') }}
    {% if is_incremental() %}
    WHERE event_timestamp > '{{ watermark_value }}'
    {% endif %}
)
SELECT
    event_id,
    event_type,
    user_id,
    payload,
    event_timestamp
FROM ranked
WHERE rn = 1
```

### Deduplicate with Tie-Breaking
When the primary ordering column might have ties, add secondary sort columns:
```sql
WITH ranked AS (
    SELECT
        *,
        ROW_NUMBER() OVER (
            PARTITION BY customer_id
            ORDER BY
                updated_at DESC,      -- prefer most recent
                source_priority ASC,  -- prefer higher-priority source
                _ingested_at DESC     -- tie-break on ingestion time
        ) AS rn
    FROM {{ ref('bronze.raw_customers') }}
)
SELECT * FROM ranked WHERE rn = 1
```

### Deduplicate Across Sources
When ingesting the same entity from multiple sources:
```sql
-- @merge_strategy: incremental
-- @unique_key: customer_id
WITH all_sources AS (
    SELECT *, 'crm' AS source, 1 AS source_priority
    FROM {{ ref('bronze.crm_customers') }}
    UNION ALL
    SELECT *, 'web' AS source, 2 AS source_priority
    FROM {{ ref('bronze.web_customers') }}
),

ranked AS (
    SELECT
        *,
        ROW_NUMBER() OVER (
            PARTITION BY customer_id
            ORDER BY source_priority, updated_at DESC
        ) AS rn
    FROM all_sources
)
SELECT customer_id, name, email, address, source, updated_at
FROM ranked
WHERE rn = 1
```

## Join Patterns
### Inner Join (Multiple `ref()` Tables)
```sql
-- @merge_strategy: full_refresh
SELECT
    o.order_id,
    o.order_date,
    c.customer_name,
    c.customer_email,
    p.product_name,
    p.category,
    o.quantity,
    o.line_total
FROM {{ ref('silver.clean_orders') }} o
INNER JOIN {{ ref('silver.clean_customers') }} c
    ON o.customer_id = c.customer_id
INNER JOIN {{ ref('silver.clean_products') }} p
    ON o.product_id = p.product_id
```

### Left Join with Null Handling
```sql
-- @merge_strategy: full_refresh
SELECT
    c.customer_id,
    c.customer_name,
    c.signup_date,
    COALESCE(order_stats.total_orders, 0) AS total_orders,
    COALESCE(order_stats.total_revenue, 0) AS total_revenue,
    order_stats.first_order_date,
    order_stats.last_order_date,
    CASE
        WHEN order_stats.total_orders IS NULL THEN 'never_ordered'
        WHEN order_stats.last_order_date < CURRENT_DATE - INTERVAL '90 days' THEN 'churned'
        ELSE 'active'
    END AS customer_status
FROM {{ ref('silver.clean_customers') }} c
LEFT JOIN (
    SELECT
        customer_id,
        COUNT(*) AS total_orders,
        SUM(line_total) AS total_revenue,
        MIN(order_date) AS first_order_date,
        MAX(order_date) AS last_order_date
    FROM {{ ref('silver.clean_orders') }}
    WHERE status != 'CANCELLED'
    GROUP BY customer_id
) order_stats ON c.customer_id = order_stats.customer_id
```

### Self-Join (Comparing Periods)
```sql
-- @merge_strategy: full_refresh
WITH monthly AS (
    SELECT
        DATE_TRUNC('month', order_date) AS month,
        SUM(line_total) AS revenue,
        COUNT(DISTINCT order_id) AS orders
    FROM {{ ref('silver.clean_orders') }}
    WHERE status NOT IN ('CANCELLED', 'REFUNDED')
    GROUP BY DATE_TRUNC('month', order_date)
)
SELECT
    curr.month,
    curr.revenue AS current_revenue,
    prev.revenue AS previous_revenue,
    curr.revenue - COALESCE(prev.revenue, 0) AS revenue_change,
    ROUND(
        (curr.revenue - COALESCE(prev.revenue, curr.revenue))
        / NULLIF(prev.revenue, 0) * 100, 2
    ) AS revenue_change_pct
FROM monthly curr
LEFT JOIN monthly prev
    ON curr.month = prev.month + INTERVAL '1 month'
ORDER BY curr.month DESC
```

## Aggregation Patterns
### Daily Rollup
```sql
-- @merge_strategy: snapshot
-- @partition_column: metric_date
SELECT
    CURRENT_DATE - INTERVAL '1 day' AS metric_date,
    COUNT(DISTINCT order_id) AS total_orders,
    COUNT(DISTINCT customer_id) AS unique_customers,
    SUM(line_total) AS total_revenue,
    AVG(line_total) AS avg_order_value,
    PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY line_total) AS median_order_value,
    MAX(line_total) AS max_order_value
FROM {{ ref('silver.clean_orders') }}
WHERE order_date = CURRENT_DATE - INTERVAL '1 day'
  AND status NOT IN ('CANCELLED', 'REFUNDED')
```

### Pivot / Wide Aggregation
```sql
-- @merge_strategy: full_refresh
SELECT
    DATE_TRUNC('day', order_date) AS day,
    COUNT(*) FILTER (WHERE status = 'PENDING') AS pending_count,
    COUNT(*) FILTER (WHERE status = 'CONFIRMED') AS confirmed_count,
    COUNT(*) FILTER (WHERE status = 'SHIPPED') AS shipped_count,
    COUNT(*) FILTER (WHERE status = 'DELIVERED') AS delivered_count,
    COUNT(*) FILTER (WHERE status = 'CANCELLED') AS cancelled_count,
    COUNT(*) AS total_count
FROM {{ ref('silver.clean_orders') }}
GROUP BY DATE_TRUNC('day', order_date)
ORDER BY day DESC
```

### Running Totals / Window Functions
```sql
-- @merge_strategy: full_refresh
WITH daily AS (
    SELECT
        order_date,
        SUM(line_total) AS daily_revenue
    FROM {{ ref('silver.clean_orders') }}
    WHERE status NOT IN ('CANCELLED', 'REFUNDED')
    GROUP BY order_date
)
SELECT
    order_date,
    daily_revenue,
    SUM(daily_revenue) OVER (ORDER BY order_date) AS cumulative_revenue,
    AVG(daily_revenue) OVER (
        ORDER BY order_date
        ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
    ) AS rolling_7d_avg
FROM daily
ORDER BY order_date
```

## Conditional Incremental Logic
The `{% if is_incremental() %}` block lets you write different logic for full vs. incremental runs. The first run processes all data; subsequent runs process only new or changed records.
### Basic Watermark Pattern
```sql
-- @merge_strategy: incremental
-- @unique_key: order_id
-- @watermark_column: updated_at
SELECT *
FROM {{ ref('bronze.raw_orders') }}
{% if is_incremental() %}
WHERE updated_at > '{{ watermark_value }}'
{% endif %}
```

### Multi-Source Incremental
When joining multiple sources incrementally, filter each source independently:
```sql
-- @merge_strategy: incremental
-- @unique_key: order_id
-- @watermark_column: updated_at
SELECT
    o.order_id,
    o.customer_id,
    c.customer_name,
    o.total_amount,
    o.updated_at
FROM {{ ref('bronze.raw_orders') }} o
JOIN {{ ref('silver.clean_customers') }} c
    ON o.customer_id = c.customer_id
{% if is_incremental() %}
WHERE o.updated_at > '{{ watermark_value }}'
{% endif %}
```

When joining an incremental source with a reference table, only the primary source needs the watermark filter. The reference table (`clean_customers` above) is always read in full because you want to join against the latest customer data.
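If changes to the reference table should also re-select affected rows, one common variant is to widen the incremental filter with an `OR` on the reference side. This is a sketch, assuming `clean_customers` also carries an `updated_at` column; note that comparing it against the orders watermark is only an approximation, and it reprocesses every order of a changed customer:

```sql
-- @merge_strategy: incremental
-- @unique_key: order_id
-- @watermark_column: updated_at
SELECT
    o.order_id,
    o.customer_id,
    c.customer_name,
    o.total_amount,
    o.updated_at
FROM {{ ref('bronze.raw_orders') }} o
JOIN {{ ref('silver.clean_customers') }} c
    ON o.customer_id = c.customer_id
{% if is_incremental() %}
WHERE o.updated_at > '{{ watermark_value }}'
   OR c.updated_at > '{{ watermark_value }}'  -- also pick up customer-side changes (approximate)
{% endif %}
```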
### Conditional Merge Strategy Logic
Check the current strategy inside the template:
```sql
-- @merge_strategy: incremental
-- @unique_key: event_id
-- @watermark_column: event_time
SELECT
    event_id,
    event_type,
    user_id,
    event_time,
    {% if is_incremental() %}
    'incremental' AS load_type
    {% else %}
    'full' AS load_type
    {% endif %}
FROM {{ ref('bronze.raw_events') }}
{% if is_incremental() %}
WHERE event_time > '{{ watermark_value }}'
{% endif %}
```

## Landing Zone Ingestion Patterns
### CSV Ingestion
```sql
-- @merge_strategy: append_only
-- @archive_landing_zones: true
SELECT *
FROM read_csv_auto(
    {{ landing_zone('raw_orders') }},
    header = true,
    all_varchar = true,
    ignore_errors = true
)
```

Using `all_varchar = true` reads all columns as strings. This prevents type inference errors on messy CSV data. Cast to proper types in the Silver layer instead.
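The downstream Silver pipeline then owns the typing. A minimal sketch (the column names are illustrative) using DuckDB's `TRY_CAST`, which yields NULL instead of failing on malformed values:

```sql
-- @merge_strategy: full_refresh
SELECT
    TRY_CAST(order_id AS INTEGER) AS order_id,
    TRY_CAST(quantity AS INTEGER) AS quantity,
    TRY_CAST(unit_price AS DECIMAL(10,2)) AS unit_price,
    TRY_CAST(order_date AS DATE) AS order_date
FROM {{ ref('bronze.raw_orders') }}
-- Drop rows whose key is unusable; malformed values in other columns become NULL
WHERE TRY_CAST(order_id AS INTEGER) IS NOT NULL
```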
### Parquet Ingestion
```sql
-- @merge_strategy: append_only
-- @archive_landing_zones: true
SELECT *
FROM read_parquet(
    {{ landing_zone('raw_events') }},
    hive_partitioning = false
)
```

### JSON Ingestion
```sql
-- @merge_strategy: append_only
-- @archive_landing_zones: true
SELECT *
FROM read_json_auto(
    {{ landing_zone('api_responses') }},
    maximum_object_size = 10485760
)
```

### CSV with Explicit Schema
When CSV files have known schema issues, define the schema explicitly:
```sql
-- @merge_strategy: full_refresh
SELECT *
FROM read_csv(
    {{ landing_zone('raw_products') }},
    columns = {
        'product_id': 'VARCHAR',
        'name': 'VARCHAR',
        'category': 'VARCHAR',
        'price': 'VARCHAR',
        'stock': 'VARCHAR'
    },
    header = true,
    delim = ',',
    quote = '"',
    escape = '"',
    null_padding = true,
    ignore_errors = true
)
```

### Multi-Format Ingestion
When a landing zone receives mixed file types:
```sql
-- @merge_strategy: append_only
SELECT *, 'csv' AS _source_format
FROM read_csv_auto(
    {{ landing_zone('raw_data') }},
    filename = true,
    header = true,
    ignore_errors = true
)
WHERE filename LIKE '%.csv'
```

DuckDB's `read_csv_auto` and `read_parquet` interpret the landing zone glob pattern differently. If your landing zone contains a mix of file types, filter by filename to avoid parse errors.
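The `filename = true` flag also gives you cheap lineage: DuckDB exposes the matched path as a `filename` column, which you can parse into audit fields. A sketch, assuming file names embed a date like `orders_2024-01-15.csv` (the `_source_file` and `_file_date` names are illustrative):

```sql
-- @merge_strategy: append_only
SELECT
    *,
    filename AS _source_file,
    -- Extract an embedded YYYY-MM-DD date from the path; NULL if absent
    TRY_CAST(regexp_extract(filename, '(\d{4}-\d{2}-\d{2})', 1) AS DATE) AS _file_date
FROM read_csv_auto(
    {{ landing_zone('raw_data') }},
    filename = true,
    header = true,
    ignore_errors = true
)
WHERE filename LIKE '%.csv'
```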
## The `this` Reference

The `{{ this }}` variable resolves to the current pipeline's own output table. It is useful for self-referencing patterns such as deduplicating against existing data:
```sql
-- @merge_strategy: incremental
-- @unique_key: customer_id
SELECT
    new.customer_id,
    new.customer_name,
    new.updated_at
FROM {{ ref('bronze.raw_customers') }} new
{% if is_incremental() %}
LEFT JOIN {{ this }} existing
    ON new.customer_id = existing.customer_id
WHERE existing.customer_id IS NULL
   OR new.updated_at > existing.updated_at
{% endif %}
```

## Template Variables Reference
All variables available in pipeline SQL templates:
| Variable | Type | Description |
|---|---|---|
| `{{ ref('layer.name') }}` | Function | Resolves to `iceberg_scan()` of the referenced table |
| `{{ ref('ns.layer.name') }}` | Function | Cross-namespace reference |
| `{{ landing_zone('zone') }}` | Function | Resolves to the S3 glob for landing zone files |
| `{{ this }}` | String | Current pipeline's own output table reference |
| `{{ run_started_at }}` | String | ISO timestamp of the current run start |
| `{{ watermark_value }}` | String | Max value of the watermark column (incremental only) |
| `{% if is_incremental() %}` | Function | True when merge_strategy is `incremental` |
| `{% if is_scd2() %}` | Function | True when merge_strategy is `scd2` |
| `{% if is_snapshot() %}` | Function | True when merge_strategy is `snapshot` |
| `{% if is_append_only() %}` | Function | True when merge_strategy is `append_only` |
| `{% if is_delete_insert() %}` | Function | True when merge_strategy is `delete_insert` |
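Several of these variables compose naturally into audit columns. A sketch (the `_loaded_at` and `_load_type` column names are illustrative, not a RAT convention):

```sql
-- @merge_strategy: incremental
-- @unique_key: order_id
-- @watermark_column: updated_at
SELECT
    *,
    -- Stamp every row with the run that loaded it
    CAST('{{ run_started_at }}' AS TIMESTAMP) AS _loaded_at,
    {% if is_incremental() %} 'incremental' {% else %} 'full' {% endif %} AS _load_type
FROM {{ ref('bronze.raw_orders') }}
{% if is_incremental() %}
WHERE updated_at > '{{ watermark_value }}'
{% endif %}
```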