
Incremental Pipelines

Incremental pipelines are the backbone of efficient data processing. Instead of reprocessing all data on every run, they detect what has changed since the last run and process only the new or updated rows. This dramatically reduces execution time and resource usage for large datasets.

Why Incremental?

Consider a table with 100 million rows where 10,000 new rows arrive every hour. With full_refresh, you reprocess all 100 million rows every hour. With incremental, you process only the 10,000 new rows.

Metric                  | full_refresh  | incremental
------------------------|---------------|----------------
Rows processed per run  | 100,000,000   | ~10,000
Run time                | 15 minutes    | 5 seconds
DuckDB memory           | 8 GB          | 50 MB
Iceberg files written   | Full rewrite  | Append + merge

How Incremental Processing Works

Incremental processing in RAT uses three mechanisms working together:

  1. Watermark column — A timestamp or monotonically increasing column that tracks “recency”
  2. Watermark value — The MAX() of the watermark column from the existing target table
  3. Unique key — The column(s) used to match incoming rows with existing rows for upsert
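
The interplay of these three pieces can be sketched in a few lines of Python. This is an illustrative mock, not RAT's internals; render_query stands in for the Jinja expansion of the is_incremental() conditional, and the table and column names are invented.

```python
def render_query(table_exists, watermark_value):
    """Mimic how the watermark filter is added only on incremental runs."""
    sql = (
        "SELECT order_id, customer_id, updated_at\n"
        "FROM bronze_raw_orders"
    )
    # is_incremental() is true only when the target table already exists.
    if table_exists and watermark_value is not None:
        # Process only rows newer than MAX(updated_at) from the target.
        sql += "\nWHERE updated_at > '{}'".format(watermark_value)
    return sql

print(render_query(False, None))                  # first run: no filter
print(render_query(True, "2024-03-15 14:30:00"))  # later runs: filtered
```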

Setting Up an Incremental Pipeline

Choose your watermark column

The watermark column should be a timestamp or numeric column that increases with new or updated data. Good choices:

  • updated_at — A timestamp updated whenever the row changes
  • created_at — If rows are immutable (never updated, only inserted)
  • event_time — For event streams
  • An auto-incrementing id — If no timestamp is available
⚠️ The watermark column must be monotonically increasing for new data. If you backfill old data with timestamps earlier than the current watermark, those rows will be skipped on incremental runs.

Choose your unique key

The unique key identifies each row for upsert matching. It can be a single column or composite:

  • order_id — Single-column key
  • order_id, product_id — Composite key for order line items
  • user_id, event_date — Composite key for daily user snapshots

Add the annotations

pipeline.sql
-- @merge_strategy: incremental
-- @unique_key: order_id
-- @watermark_column: updated_at

Write the SQL with watermark filter

pipeline.sql
-- @merge_strategy: incremental
-- @unique_key: order_id
-- @watermark_column: updated_at
 
SELECT
    order_id,
    customer_id,
    status,
    total_amount,
    updated_at
FROM {{ ref('bronze.raw_orders') }}
{% if is_incremental() %}
WHERE updated_at > '{{ watermark_value }}'
{% endif %}

First Run vs Subsequent Runs

RAT behaves differently depending on whether the target table already exists:

First Run (Table Does Not Exist)

  • is_incremental() returns false
  • The {% if is_incremental() %} block is skipped
  • Your query runs without the watermark filter — processing all source rows
  • RAT creates the target Iceberg table and writes all results
  • This is effectively a full_refresh for the first run

Subsequent Runs (Table Exists)

  • is_incremental() returns true
  • RAT reads MAX(watermark_column) from the existing target table
  • watermark_value is set to that value
  • The {% if is_incremental() %} block is included
  • Your query processes only rows newer than the watermark
  • Results are deduplicated by unique_key and merged into the existing table

How watermark_value Works

The watermark_value template variable contains the result of:

SELECT MAX(watermark_column) FROM target_table

This query runs against the existing Iceberg table before pipeline execution begins. The value is then injected into your Jinja template as a string.
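
The lookup itself can be reproduced with any SQL engine. Below is a sketch using Python's built-in sqlite3 as a stand-in for the Iceberg target table (the table and column names are illustrative):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE target_table (order_id INTEGER, updated_at TEXT)")
conn.executemany(
    "INSERT INTO target_table VALUES (?, ?)",
    [(1, "2024-03-15 09:00:00"), (2, "2024-03-15 14:30:00")],
)

# The same MAX() lookup RAT runs before pipeline execution begins.
(watermark_value,) = conn.execute(
    "SELECT MAX(updated_at) FROM target_table"
).fetchone()
print(watermark_value)  # 2024-03-15 14:30:00
```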

Timestamp Watermarks

For timestamp columns, watermark_value is an ISO 8601 string:

-- watermark_value = '2024-03-15 14:30:00'
WHERE updated_at > '{{ watermark_value }}'

Numeric Watermarks

For numeric columns (like auto-incrementing IDs), the value is a number:

-- watermark_value = '5847293'
WHERE id > {{ watermark_value }}

Note the subtle difference: timestamp watermarks need single quotes around the value (> '{{ watermark_value }}') because they’re strings in SQL. Numeric watermarks do not.

NULL Watermarks

If the target table exists but is empty (all rows were deleted), watermark_value will be NULL. Your SQL should handle this gracefully. The simplest approach is to use COALESCE:

{% if is_incremental() %}
WHERE updated_at > COALESCE('{{ watermark_value }}', '1970-01-01')
{% endif %}
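
The NULL case is easy to verify on a toy table. A sketch using sqlite3 in place of DuckDB: MAX() over an empty table yields NULL, and COALESCE substitutes the epoch fallback so the filter matches every row.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE target_table (updated_at TEXT)")

# Empty table: MAX() returns NULL (None in Python).
(wm,) = conn.execute("SELECT MAX(updated_at) FROM target_table").fetchone()
print(wm)  # None

# COALESCE falls back to the epoch, so no rows are filtered out.
(effective,) = conn.execute(
    "SELECT COALESCE(?, '1970-01-01')", (wm,)
).fetchone()
print(effective)  # 1970-01-01
```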

Unique Keys and Dedup Behavior

When the pipeline output contains multiple rows with the same unique_key, RAT deduplicates them using ROW_NUMBER():

-- RAT internally does the equivalent of (schematic; the ordering column is
-- the row's position in the result set, shown here as _row_position):
SELECT * FROM (
    SELECT *, ROW_NUMBER() OVER (
        PARTITION BY order_id
        ORDER BY _row_position DESC  -- last row wins
    ) AS _rn
    FROM pipeline_output
) WHERE _rn = 1

The last row in the result set for each key is kept. All other duplicates are discarded before the merge.
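
Last-row-wins dedup is equivalent to building a dict keyed by the unique key, in result-set order. A minimal pure-Python sketch (rows and columns are invented):

```python
pipeline_output = [
    {"order_id": 1, "status": "PENDING"},
    {"order_id": 2, "status": "SHIPPED"},
    {"order_id": 1, "status": "DELIVERED"},  # duplicate key: this row wins
]

deduped = {}
for row in pipeline_output:          # iterate in result-set order
    deduped[row["order_id"]] = row   # later rows overwrite earlier ones

print(sorted(r["status"] for r in deduped.values()))
# ['DELIVERED', 'SHIPPED'] -- one row per key, last occurrence kept
```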

Composite Unique Keys

For composite keys, all columns must match for two rows to be considered the same:

-- @unique_key: order_id, product_id
 
-- These are DIFFERENT rows (different product_id):
-- order_id=1, product_id=100
-- order_id=1, product_id=200
 
-- These are the SAME row (dedup applies):
-- order_id=1, product_id=100, qty=5
-- order_id=1, product_id=100, qty=3  <-- this one wins (last)

The Merge Operation

After dedup, RAT performs an Iceberg merge (upsert):

  • Matching keys (row exists in target): The existing row is updated with all columns from the new row
  • Non-matching keys (row does not exist in target): The new row is inserted
  • Keys only in target (not in pipeline output): Left untouched

This means incremental is truly an upsert — it never deletes rows that are not in the current pipeline output.
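
The three merge cases can be modeled with dicts keyed by the unique key; dict.update has the same upsert semantics. A sketch with made-up rows:

```python
target = {
    1: {"order_id": 1, "status": "PENDING"},
    2: {"order_id": 2, "status": "SHIPPED"},   # key only in target
}
incoming = {
    1: {"order_id": 1, "status": "DELIVERED"}, # matching key: update
    3: {"order_id": 3, "status": "PENDING"},   # non-matching key: insert
}

target.update(incoming)  # upsert: no deletes, ever

assert target[1]["status"] == "DELIVERED"  # matching key updated
assert target[2]["status"] == "SHIPPED"    # key only in target: untouched
assert 3 in target                         # non-matching key inserted
```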

The {% if is_incremental() %} Pattern

This Jinja conditional is the core pattern for incremental pipelines. Let’s break down why it works:

pipeline.sql
-- @merge_strategy: incremental
-- @unique_key: user_id
-- @watermark_column: last_active_at
 
SELECT
    user_id,
    email,
    plan,
    last_active_at
FROM {{ ref('bronze.raw_users') }}
 
{% if is_incremental() %}
-- This block is ONLY included when the target table already exists
WHERE last_active_at > '{{ watermark_value }}'
{% endif %}

First run — The SQL executed is:

SELECT user_id, email, plan, last_active_at
FROM iceberg_scan('...')
-- No WHERE clause — all rows processed

Second run — The SQL executed is:

SELECT user_id, email, plan, last_active_at
FROM iceberg_scan('...')
WHERE last_active_at > '2024-03-15 14:30:00'
-- Only new/updated rows processed

delete_insert vs incremental

Both strategies use a unique_key, but they behave differently. Choosing the wrong one is a common mistake.

Aspect                  | incremental                   | delete_insert
------------------------|-------------------------------|----------------------------------
Dedup incoming rows     | Yes (ROW_NUMBER, last wins)   | No
Multiple rows per key   | Keeps 1 per unique key        | Keeps all rows per key value
Operation               | MERGE (upsert)                | DELETE matching + INSERT all
Missing keys in output  | Left untouched                | Left untouched
Best for                | Entity tables (1 row per key) | Batch tables (many rows per key)

When to Use incremental

Use incremental when each unique key should have exactly one row in the target table. Classic entity/dimension pattern:

pipeline.sql
-- @merge_strategy: incremental
-- @unique_key: customer_id
-- @watermark_column: updated_at
 
-- One row per customer, latest state wins
SELECT customer_id, name, email, plan, updated_at
FROM {{ ref('bronze.raw_customers') }}
{% if is_incremental() %}
WHERE updated_at > '{{ watermark_value }}'
{% endif %}

When to Use delete_insert

Use delete_insert when a key maps to multiple rows and you want to replace all rows for that key as a batch:

pipeline.sql
-- @merge_strategy: delete_insert
-- @unique_key: report_date
 
-- Many rows per date — replace the entire day's data
SELECT
    report_date,
    product_id,
    units_sold,
    revenue
FROM {{ ref('bronze.raw_daily_sales') }}
WHERE report_date = CURRENT_DATE

If you used incremental here, each (report_date, product_id) would need to be a composite unique key. With delete_insert, you simply say “delete everything for this date and insert the new batch.”
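
The difference is easy to see in miniature. A pure-Python sketch of delete_insert with report_date as the key (the data is invented): every target row whose key appears in the output batch is deleted, then the whole batch is inserted with no dedup.

```python
target = [
    {"report_date": "2024-03-14", "product_id": 1, "units_sold": 10},
    {"report_date": "2024-03-15", "product_id": 1, "units_sold": 3},
    {"report_date": "2024-03-15", "product_id": 2, "units_sold": 7},
]
output = [  # recomputed batch for 2024-03-15
    {"report_date": "2024-03-15", "product_id": 1, "units_sold": 5},
    {"report_date": "2024-03-15", "product_id": 2, "units_sold": 8},
    {"report_date": "2024-03-15", "product_id": 3, "units_sold": 2},
]

batch_keys = {row["report_date"] for row in output}
target = [r for r in target if r["report_date"] not in batch_keys]  # DELETE matching
target.extend(output)                                               # INSERT all
print(len(target))  # 4: the untouched 03-14 row plus three fresh 03-15 rows
```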

Full Example: Incremental Order Updates

Here is a complete incremental pipeline with all the pieces:

ecommerce/pipelines/silver/orders/pipeline.sql
-- @merge_strategy: incremental
-- @unique_key: order_id
-- @watermark_column: updated_at
-- @description: Clean orders with incremental updates via watermark
 
SELECT
    CAST(order_id AS INTEGER) AS order_id,
    CAST(customer_id AS INTEGER) AS customer_id,
    TRIM(UPPER(status)) AS status,
    CAST(total_amount AS DECIMAL(12, 2)) AS total_amount,
    CAST(currency AS VARCHAR(3)) AS currency,
    TRY_CAST(order_date AS DATE) AS order_date,
    TRY_CAST(shipped_date AS DATE) AS shipped_date,
    CAST(updated_at AS TIMESTAMP) AS updated_at,
    '{{ run_started_at }}' AS _loaded_at
FROM {{ ref('bronze.raw_orders') }}
WHERE order_id IS NOT NULL
  AND total_amount IS NOT NULL
{% if is_incremental() %}
  AND CAST(updated_at AS TIMESTAMP) > '{{ watermark_value }}'
{% endif %}

What happens on each run:

Run 1 (table does not exist):

  1. is_incremental() = false -> no WHERE filter on watermark
  2. All 1,000,000 rows from bronze.raw_orders are processed
  3. Dedup by order_id (ROW_NUMBER) removes duplicates
  4. Target table silver.orders is created with ~950,000 unique rows
  5. Watermark after run: 2024-03-15 14:30:00

Run 2 (15 minutes later, 500 new/updated rows in source):

  1. is_incremental() = true
  2. watermark_value = 2024-03-15 14:30:00
  3. WHERE filter: updated_at > '2024-03-15 14:30:00'
  4. 500 rows processed (instead of 1,000,500)
  5. Dedup by order_id -> ~480 unique rows
  6. Merge: 200 existing orders updated, 280 new orders inserted
  7. Watermark after run: 2024-03-15 14:45:00

Advanced Patterns

Multiple Watermark Conditions

Sometimes you need to filter on more than just the watermark:

pipeline.sql
-- @merge_strategy: incremental
-- @unique_key: event_id
-- @watermark_column: event_time
 
SELECT *
FROM {{ ref('bronze.raw_events') }}
{% if is_incremental() %}
WHERE event_time > '{{ watermark_value }}'
  AND event_type != 'heartbeat'  -- additional filter
{% else %}
WHERE event_type != 'heartbeat'  -- same filter on first run too
{% endif %}

Late-Arriving Data

If your source data can arrive late (e.g., events with timestamps hours in the past), add a buffer to the watermark:

pipeline.sql
{% if is_incremental() %}
-- Look back 2 hours from watermark to catch late arrivals
WHERE event_time > TIMESTAMP '{{ watermark_value }}' - INTERVAL 2 HOUR
{% endif %}
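
The lookback arithmetic is simple to check. A sketch of the cutoff computation using Python's datetime, with an illustrative watermark value:

```python
from datetime import datetime, timedelta

watermark_value = "2024-03-15 14:30:00"
wm = datetime.strptime(watermark_value, "%Y-%m-%d %H:%M:%S")

# Look back 2 hours: rows newer than this cutoff are reprocessed.
cutoff = wm - timedelta(hours=2)
print(cutoff)  # 2024-03-15 12:30:00
```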

Late-arriving data with timestamps before the buffer window will be missed. If you need to catch all late data, consider using delete_insert with a date-based key instead.

Incremental with Cross-Namespace References

pipeline.sql
-- @merge_strategy: incremental
-- @unique_key: order_id
-- @watermark_column: updated_at
 
SELECT
    o.order_id,
    o.total_amount,
    o.updated_at,
    r.exchange_rate
FROM {{ ref('silver.orders') }} o
JOIN {{ ref('finance.silver.exchange_rates') }} r
    ON o.currency = r.currency
    AND o.order_date = r.rate_date
{% if is_incremental() %}
WHERE o.updated_at > '{{ watermark_value }}'
{% endif %}

Troubleshooting

Pipeline reprocesses all data every run

Symptom: Run logs show millions of rows processed even though only a few changed.

Cause: Missing {% if is_incremental() %} block, or the watermark filter is not in the query.

Fix: Add the Jinja conditional with a watermark filter.

Duplicate rows in target table

Symptom: Same unique_key appears multiple times in the target table.

Cause: The unique_key annotation does not match the actual unique key of the data, or the key is not truly unique in the source.

Fix: Verify your unique key and add columns to make it truly unique (e.g., order_id, line_item_id).

Watermark not advancing

Symptom: watermark_value stays the same across runs.

Cause: No new rows have a watermark_column value greater than the current watermark, or the pipeline is not writing the watermark column to the target.

Fix: Ensure the watermark_column is included in your SELECT and that new data has increasing values.

Late-arriving data is missed

Symptom: Rows with timestamps before the watermark are never picked up.

Cause: By design, the watermark filter excludes rows older than MAX(watermark_column).

Fix: Add a lookback buffer (see “Late-Arriving Data” above) or use delete_insert with a date-based key for full daily reloads.