
Pipeline Patterns

This page collects common SQL patterns for RAT pipelines. Each pattern is a proven approach to a recurring data engineering task — from basic staging CTEs to complex incremental logic.


Staging Pattern (CTE-Based Cleanup)

The staging pattern uses Common Table Expressions (CTEs) to build a clean transformation in stages. Each CTE handles one concern, making the pipeline readable and debuggable.

silver/clean_orders/pipeline.sql
-- @merge_strategy: incremental
-- @unique_key: order_id
-- @watermark_column: updated_at
 
WITH source AS (
    -- Step 1: Read raw data
    SELECT *
    FROM {{ ref('bronze.raw_orders') }}
    {% if is_incremental() %}
    WHERE updated_at > '{{ watermark_value }}'
    {% endif %}
),
 
renamed AS (
    -- Step 2: Rename and cast columns
    SELECT
        CAST(order_id AS INTEGER) AS order_id,
        CAST(customer_id AS INTEGER) AS customer_id,
        CAST(product_id AS VARCHAR) AS product_id,
        CAST(quantity AS INTEGER) AS quantity,
        CAST(unit_price AS DECIMAL(10,2)) AS unit_price,
        UPPER(TRIM(status)) AS status,
        CAST(order_date AS DATE) AS order_date,
        CAST(updated_at AS TIMESTAMP) AS updated_at
    FROM source
),
 
filtered AS (
    -- Step 3: Remove invalid records
    SELECT *
    FROM renamed
    WHERE order_id IS NOT NULL
      AND customer_id IS NOT NULL
      AND quantity > 0
      AND unit_price >= 0
),
 
enriched AS (
    -- Step 4: Add computed columns
    SELECT
        *,
        quantity * unit_price AS line_total,
        CASE
            WHEN status IN ('SHIPPED', 'DELIVERED') THEN 'fulfilled'
            WHEN status = 'CANCELLED' THEN 'cancelled'
            ELSE 'open'
        END AS status_category
    FROM filtered
)
 
-- Step 5: Final select
SELECT * FROM enriched

Why this works well:

  • Each CTE is testable independently (add SELECT * FROM renamed LIMIT 10 during development)
  • Concerns are separated: source reading, renaming, filtering, enrichment
  • Easy to add or remove stages without reorganizing the query
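For example, to spot-check the renamed stage of the pipeline above during development, temporarily swap the final select, inspect the output, then restore it:

```sql
-- Development-only: point the final select at an intermediate CTE
-- to inspect it, then restore the original SELECT * FROM enriched
SELECT * FROM renamed LIMIT 10
```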

Deduplication Patterns

Deduplicate on Unique Key (Latest Wins)

When source data contains duplicate records, keep only the most recent version of each key:

silver/clean_events/pipeline.sql
-- @merge_strategy: incremental
-- @unique_key: event_id
-- @watermark_column: event_timestamp
 
WITH ranked AS (
    SELECT
        *,
        ROW_NUMBER() OVER (
            PARTITION BY event_id
            ORDER BY event_timestamp DESC
        ) AS rn
    FROM {{ ref('bronze.raw_events') }}
    {% if is_incremental() %}
    WHERE event_timestamp > '{{ watermark_value }}'
    {% endif %}
)
 
SELECT
    event_id,
    event_type,
    user_id,
    payload,
    event_timestamp
FROM ranked
WHERE rn = 1

Deduplicate with Tie-Breaking

When the primary ordering column might have ties, add secondary sort columns:

pipeline.sql
WITH ranked AS (
    SELECT
        *,
        ROW_NUMBER() OVER (
            PARTITION BY customer_id
            ORDER BY
                updated_at DESC,       -- prefer most recent
                source_priority ASC,   -- prefer higher-priority source
                _ingested_at DESC      -- tie-break on ingestion time
        ) AS rn
    FROM {{ ref('bronze.raw_customers') }}
)
 
SELECT * FROM ranked WHERE rn = 1
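On engines that support it (DuckDB does), the same deduplication can be written without the wrapper CTE using QUALIFY. A sketch of the equivalent query:

```sql
SELECT *
FROM {{ ref('bronze.raw_customers') }}
-- QUALIFY filters on window-function results directly,
-- so no wrapper CTE is needed
QUALIFY ROW_NUMBER() OVER (
    PARTITION BY customer_id
    ORDER BY
        updated_at DESC,       -- prefer most recent
        source_priority ASC,   -- prefer higher-priority source
        _ingested_at DESC      -- tie-break on ingestion time
) = 1
```

This form also avoids leaking the rn helper column into the output, which SELECT * FROM ranked WHERE rn = 1 does include.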

Deduplicate Across Sources

When ingesting the same entity from multiple sources:

silver/unified_customers/pipeline.sql
-- @merge_strategy: incremental
-- @unique_key: customer_id
 
WITH all_sources AS (
    SELECT *, 'crm' AS source, 1 AS source_priority
    FROM {{ ref('bronze.crm_customers') }}
    UNION ALL
    SELECT *, 'web' AS source, 2 AS source_priority
    FROM {{ ref('bronze.web_customers') }}
),
 
ranked AS (
    SELECT
        *,
        ROW_NUMBER() OVER (
            PARTITION BY customer_id
            ORDER BY source_priority, updated_at DESC
        ) AS rn
    FROM all_sources
)
 
SELECT customer_id, name, email, address, source, updated_at
FROM ranked
WHERE rn = 1

Join Patterns

Inner Join (Multiple ref() Tables)

gold/order_summary/pipeline.sql
-- @merge_strategy: full_refresh
 
SELECT
    o.order_id,
    o.order_date,
    c.customer_name,
    c.customer_email,
    p.product_name,
    p.category,
    o.quantity,
    o.line_total
FROM {{ ref('silver.clean_orders') }} o
INNER JOIN {{ ref('silver.clean_customers') }} c
    ON o.customer_id = c.customer_id
INNER JOIN {{ ref('silver.clean_products') }} p
    ON o.product_id = p.product_id

Left Join with Null Handling

gold/customer_orders/pipeline.sql
-- @merge_strategy: full_refresh
 
SELECT
    c.customer_id,
    c.customer_name,
    c.signup_date,
    COALESCE(order_stats.total_orders, 0) AS total_orders,
    COALESCE(order_stats.total_revenue, 0) AS total_revenue,
    order_stats.first_order_date,
    order_stats.last_order_date,
    CASE
        WHEN order_stats.total_orders IS NULL THEN 'never_ordered'
        WHEN order_stats.last_order_date < CURRENT_DATE - INTERVAL '90 days' THEN 'churned'
        ELSE 'active'
    END AS customer_status
FROM {{ ref('silver.clean_customers') }} c
LEFT JOIN (
    SELECT
        customer_id,
        COUNT(*) AS total_orders,
        SUM(line_total) AS total_revenue,
        MIN(order_date) AS first_order_date,
        MAX(order_date) AS last_order_date
    FROM {{ ref('silver.clean_orders') }}
    WHERE status != 'CANCELLED'
    GROUP BY customer_id
) order_stats ON c.customer_id = order_stats.customer_id

Self-Join (Comparing Periods)

gold/month_over_month/pipeline.sql
-- @merge_strategy: full_refresh
 
WITH monthly AS (
    SELECT
        DATE_TRUNC('month', order_date) AS month,
        SUM(line_total) AS revenue,
        COUNT(DISTINCT order_id) AS orders
    FROM {{ ref('silver.clean_orders') }}
    WHERE status NOT IN ('CANCELLED', 'REFUNDED')
    GROUP BY DATE_TRUNC('month', order_date)
)
 
SELECT
    curr.month,
    curr.revenue AS current_revenue,
    prev.revenue AS previous_revenue,
    curr.revenue - COALESCE(prev.revenue, 0) AS revenue_change,
    ROUND(
        (curr.revenue - prev.revenue)
        / NULLIF(prev.revenue, 0) * 100, 2
    ) AS revenue_change_pct
FROM monthly curr
LEFT JOIN monthly prev
    ON curr.month = prev.month + INTERVAL '1 month'
ORDER BY curr.month DESC
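The previous-month lookup can also be expressed with LAG instead of a self-join. A sketch (note that LAG reads the previous row, so it only corresponds to the prior calendar month when the monthly series has no gaps):

```sql
SELECT
    month,
    revenue AS current_revenue,
    -- LAG takes the previous row's revenue in month order
    LAG(revenue) OVER (ORDER BY month) AS previous_revenue
FROM monthly
ORDER BY month DESC
```

The self-join form above is the safer choice when months can be missing, because it matches the prior calendar month explicitly.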

Aggregation Patterns

Daily Rollup

gold/daily_metrics/pipeline.sql
-- @merge_strategy: snapshot
-- @partition_column: metric_date
 
SELECT
    CURRENT_DATE - INTERVAL '1 day' AS metric_date,
    COUNT(DISTINCT order_id) AS total_orders,
    COUNT(DISTINCT customer_id) AS unique_customers,
    SUM(line_total) AS total_revenue,
    AVG(line_total) AS avg_order_value,
    PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY line_total) AS median_order_value,
    MAX(line_total) AS max_order_value
FROM {{ ref('silver.clean_orders') }}
WHERE order_date = CURRENT_DATE - INTERVAL '1 day'
  AND status NOT IN ('CANCELLED', 'REFUNDED')

Pivot / Wide Aggregation

gold/status_breakdown/pipeline.sql
-- @merge_strategy: full_refresh
 
SELECT
    DATE_TRUNC('day', order_date) AS day,
    COUNT(*) FILTER (WHERE status = 'PENDING') AS pending_count,
    COUNT(*) FILTER (WHERE status = 'CONFIRMED') AS confirmed_count,
    COUNT(*) FILTER (WHERE status = 'SHIPPED') AS shipped_count,
    COUNT(*) FILTER (WHERE status = 'DELIVERED') AS delivered_count,
    COUNT(*) FILTER (WHERE status = 'CANCELLED') AS cancelled_count,
    COUNT(*) AS total_count
FROM {{ ref('silver.clean_orders') }}
GROUP BY DATE_TRUNC('day', order_date)
ORDER BY day DESC

Running Totals / Window Functions

gold/cumulative_revenue/pipeline.sql
-- @merge_strategy: full_refresh
 
WITH daily AS (
    SELECT
        order_date,
        SUM(line_total) AS daily_revenue
    FROM {{ ref('silver.clean_orders') }}
    WHERE status NOT IN ('CANCELLED', 'REFUNDED')
    GROUP BY order_date
)
 
SELECT
    order_date,
    daily_revenue,
    SUM(daily_revenue) OVER (ORDER BY order_date) AS cumulative_revenue,
    AVG(daily_revenue) OVER (
        ORDER BY order_date
        ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
    ) AS rolling_7d_avg
FROM daily
ORDER BY order_date

Conditional Incremental Logic

The {% if is_incremental() %} block lets you write different logic for full vs. incremental runs. The first run processes all data; subsequent runs process only new/changed records.

Basic Watermark Pattern

pipeline.sql
-- @merge_strategy: incremental
-- @unique_key: order_id
-- @watermark_column: updated_at
 
SELECT *
FROM {{ ref('bronze.raw_orders') }}
{% if is_incremental() %}
WHERE updated_at > '{{ watermark_value }}'
{% endif %}

Multi-Source Incremental

When a pipeline joins multiple sources, apply the watermark filter to the incremental (driving) source:

pipeline.sql
-- @merge_strategy: incremental
-- @unique_key: order_id
-- @watermark_column: updated_at
 
SELECT
    o.order_id,
    o.customer_id,
    c.customer_name,
    o.total_amount,
    o.updated_at
FROM {{ ref('bronze.raw_orders') }} o
JOIN {{ ref('silver.clean_customers') }} c
    ON o.customer_id = c.customer_id
{% if is_incremental() %}
WHERE o.updated_at > '{{ watermark_value }}'
{% endif %}

When joining an incremental source with a reference table, only the primary source needs the watermark filter. The reference table (clean_customers above) is always read in full because you want to join against the latest customer data.

Conditional Merge Strategy Logic

Check the current strategy inside the template:

pipeline.sql
-- @merge_strategy: incremental
-- @unique_key: event_id
-- @watermark_column: event_time
 
SELECT
    event_id,
    event_type,
    user_id,
    event_time,
    {% if is_incremental() %}
    'incremental' AS load_type
    {% else %}
    'full' AS load_type
    {% endif %}
FROM {{ ref('bronze.raw_events') }}
{% if is_incremental() %}
WHERE event_time > '{{ watermark_value }}'
{% endif %}

Landing Zone Ingestion Patterns

CSV Ingestion

bronze/ingest_orders/pipeline.sql
-- @merge_strategy: append_only
-- @archive_landing_zones: true
 
SELECT *
FROM read_csv_auto(
    {{ landing_zone('raw_orders') }},
    header = true,
    all_varchar = true,
    ignore_errors = true
)

Using all_varchar = true reads all columns as strings. This prevents type inference errors on messy CSV data. Cast to proper types in the Silver layer instead.
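A Silver-layer model can then apply the real types. A minimal sketch using DuckDB's TRY_CAST, which returns NULL instead of failing on malformed values (the model name and column list here are illustrative):

```sql
-- Hypothetical Silver model downstream of bronze/ingest_orders
-- @merge_strategy: full_refresh

SELECT
    TRY_CAST(order_id AS INTEGER) AS order_id,
    TRY_CAST(quantity AS INTEGER) AS quantity,
    -- yields NULL rather than a pipeline failure on bad input
    TRY_CAST(unit_price AS DECIMAL(10,2)) AS unit_price,
    TRY_CAST(order_date AS DATE) AS order_date
FROM {{ ref('bronze.ingest_orders') }}
```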

Parquet Ingestion

bronze/ingest_events/pipeline.sql
-- @merge_strategy: append_only
-- @archive_landing_zones: true
 
SELECT *
FROM read_parquet(
    {{ landing_zone('raw_events') }},
    hive_partitioning = false
)

JSON Ingestion

bronze/ingest_api_responses/pipeline.sql
-- @merge_strategy: append_only
-- @archive_landing_zones: true
 
SELECT *
FROM read_json_auto(
    {{ landing_zone('api_responses') }},
    maximum_object_size = 10485760
)

CSV with Explicit Schema

When CSV files have known schema issues, define the schema explicitly:

bronze/ingest_products/pipeline.sql
-- @merge_strategy: full_refresh
 
SELECT *
FROM read_csv(
    {{ landing_zone('raw_products') }},
    columns = {
        'product_id': 'VARCHAR',
        'name': 'VARCHAR',
        'category': 'VARCHAR',
        'price': 'VARCHAR',
        'stock': 'VARCHAR'
    },
    header = true,
    delim = ',',
    quote = '"',
    escape = '"',
    null_padding = true,
    ignore_errors = true
)

Multi-Format Ingestion

When a landing zone receives mixed file types:

bronze/ingest_mixed/pipeline.sql
-- @merge_strategy: append_only
 
SELECT *, 'csv' AS _source_format
FROM read_csv_auto(
    {{ landing_zone('raw_data') }},
    filename = true,
    header = true,
    ignore_errors = true
)
WHERE filename LIKE '%.csv'
⚠️ DuckDB’s read_csv_auto and read_parquet interpret the landing zone glob pattern differently. If your landing zone contains a mix of file types, filter by filename to avoid parse errors.


The this Reference

The {{ this }} variable resolves to the current pipeline’s own output table. It is useful for self-referencing patterns like deduplication against existing data:

pipeline.sql
-- @merge_strategy: incremental
-- @unique_key: customer_id
 
SELECT
    new.customer_id,
    new.customer_name,
    new.updated_at
FROM {{ ref('bronze.raw_customers') }} new
{% if is_incremental() %}
LEFT JOIN {{ this }} existing
    ON new.customer_id = existing.customer_id
WHERE existing.customer_id IS NULL
   OR new.updated_at > existing.updated_at
{% endif %}

Template Variables Reference

All variables available in pipeline SQL templates:

  • {{ ref('layer.name') }} (function): resolves to iceberg_scan() of the referenced table
  • {{ ref('ns.layer.name') }} (function): cross-namespace reference
  • {{ landing_zone('zone') }} (function): resolves to the S3 glob for landing zone files
  • {{ this }} (string): current pipeline’s own output table reference
  • {{ run_started_at }} (string): ISO timestamp of the current run start
  • {{ watermark_value }} (string): max value of the watermark column (incremental only)
  • {% if is_incremental() %} (function): true when merge_strategy is incremental
  • {% if is_scd2() %} (function): true when merge_strategy is scd2
  • {% if is_snapshot() %} (function): true when merge_strategy is snapshot
  • {% if is_append_only() %} (function): true when merge_strategy is append_only
  • {% if is_delete_insert() %} (function): true when merge_strategy is delete_insert
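For example, {{ run_started_at }} can stamp each row with a load-audit timestamp (a sketch; the _loaded_at column name is illustrative):

```sql
-- @merge_strategy: append_only

SELECT
    event_id,
    event_type,
    -- audit column: when this run began (run_started_at is a string)
    CAST('{{ run_started_at }}' AS TIMESTAMP) AS _loaded_at
FROM {{ ref('bronze.raw_events') }}
```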