# Incremental Pipelines
Incremental pipelines are the backbone of efficient data processing. Instead of reprocessing all data on every run, they detect what has changed since the last run and process only the new or updated rows. This dramatically reduces execution time and resource usage for large datasets.
## Why Incremental?
Consider a table with 100 million rows where 10,000 new rows arrive every hour. With `full_refresh`, you reprocess all 100 million rows every hour. With `incremental`, you process only the 10,000 new rows.
| Metric | full_refresh | incremental |
|---|---|---|
| Rows processed per run | 100,000,000 | ~10,000 |
| Run time | 15 minutes | 5 seconds |
| DuckDB memory | 8 GB | 50 MB |
| Iceberg files written | Full rewrite | Append + merge |
## How Incremental Processing Works
Incremental processing in RAT uses three mechanisms working together:
- **Watermark column** — A timestamp or monotonically increasing column that tracks “recency”
- **Watermark value** — The `MAX()` of the watermark column from the existing target table
- **Unique key** — The column(s) used to match incoming rows with existing rows for upsert
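The three mechanisms can be sketched end to end. This is an illustrative stand-in, not RAT's implementation: it uses Python's stdlib `sqlite3` in place of DuckDB and Iceberg, and the `target`/`source` tables and their columns are hypothetical.

```python
# Illustrative only: sqlite3 stands in for DuckDB/Iceberg, and all
# table/column names are hypothetical.
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE target (order_id INTEGER PRIMARY KEY, updated_at TEXT)")
con.execute("CREATE TABLE source (order_id INTEGER, updated_at TEXT)")
con.executemany("INSERT INTO target VALUES (?, ?)",
                [(1, "2024-03-15 14:00:00"), (2, "2024-03-15 14:30:00")])
con.executemany("INSERT INTO source VALUES (?, ?)",
                [(2, "2024-03-15 15:00:00"),    # updated since last run
                 (3, "2024-03-15 15:05:00"),    # brand new
                 (1, "2024-03-15 14:00:00")])   # unchanged, filtered out

# Watermark value: MAX() of the watermark column in the existing target
(watermark,) = con.execute("SELECT MAX(updated_at) FROM target").fetchone()

# Watermark column + value: keep only rows newer than the watermark
new_rows = con.execute(
    "SELECT order_id, updated_at FROM source WHERE updated_at > ?",
    (watermark,)).fetchall()

# Unique key: upsert the new rows into the target by order_id
con.executemany(
    "INSERT INTO target VALUES (?, ?) "
    "ON CONFLICT(order_id) DO UPDATE SET updated_at = excluded.updated_at",
    new_rows)

final = sorted(con.execute("SELECT * FROM target").fetchall())
print(final)  # row 2 updated, row 3 inserted, row 1 untouched
```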
## Setting Up an Incremental Pipeline
### Choose your watermark column
The watermark column should be a timestamp or numeric column that increases with new or updated data. Good choices:
- `updated_at` — A timestamp updated whenever the row changes
- `created_at` — If rows are immutable (never updated, only inserted)
- `event_time` — For event streams
- An auto-incrementing `id` — If no timestamp is available
The watermark column must be monotonically increasing for new data. If you backfill old data with timestamps earlier than the current watermark, those rows will be skipped on incremental runs.
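That pitfall can be seen in a few lines. A hypothetical sketch, not RAT code, relying on the fact that ISO 8601 strings compare correctly as plain strings:

```python
# Hypothetical data; ISO 8601 timestamps compare correctly as strings.
watermark_value = "2024-03-15 14:30:00"

source = [
    ("backfilled", "2024-03-01 09:00:00"),  # older than the watermark
    ("fresh",      "2024-03-15 15:00:00"),  # newer than the watermark
]

# The incremental filter keeps only rows past the watermark, so the
# backfilled row is silently skipped.
picked = [name for name, ts in source if ts > watermark_value]
print(picked)  # ['fresh']
```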
### Choose your unique key
The unique key identifies each row for upsert matching. It can be a single column or composite:
- `order_id` — Single-column key
- `order_id, product_id` — Composite key for order line items
- `user_id, event_date` — Composite key for daily user snapshots
### Add the annotations
```sql
-- @merge_strategy: incremental
-- @unique_key: order_id
-- @watermark_column: updated_at
```

### Write the SQL with watermark filter
```sql
-- @merge_strategy: incremental
-- @unique_key: order_id
-- @watermark_column: updated_at
SELECT
    order_id,
    customer_id,
    status,
    total_amount,
    updated_at
FROM {{ ref('bronze.raw_orders') }}
{% if is_incremental() %}
WHERE updated_at > '{{ watermark_value }}'
{% endif %}
```

## First Run vs Subsequent Runs
RAT behaves differently depending on whether the target table already exists:
### First Run (Table Does Not Exist)
- `is_incremental()` returns `false`
- The `{% if is_incremental() %}` block is skipped
- Your query runs without the watermark filter — processing all source rows
- RAT creates the target Iceberg table and writes all results
- This is effectively a `full_refresh` for the first run
### Subsequent Runs (Table Exists)
- `is_incremental()` returns `true`
- RAT reads `MAX(watermark_column)` from the existing target table
- `watermark_value` is set to that value
- The `{% if is_incremental() %}` block is included
- Your query processes only rows newer than the watermark
- Results are deduplicated by `unique_key` and merged into the existing table
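The two behaviors can be mimicked with a toy renderer. This is a simplified stand-in for the Jinja rendering, not RAT's actual renderer, and the query text is hypothetical:

```python
# Toy renderer for the first-run / subsequent-run difference.
def render(table_exists, watermark_value=None):
    base = "SELECT order_id, updated_at FROM raw_orders"
    if not table_exists:  # first run: is_incremental() is false
        return base
    # subsequent runs: the watermark filter is included
    return base + "\nWHERE updated_at > '" + watermark_value + "'"

first_run = render(False)
second_run = render(True, "2024-03-15 14:30:00")
print(first_run)
print(second_run)
```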
## How `watermark_value` Works
The `watermark_value` template variable contains the result of:

```sql
SELECT MAX(watermark_column) FROM target_table
```

This query runs against the existing Iceberg table before pipeline execution begins. The value is then injected into your Jinja template as a string.
### Timestamp Watermarks
For timestamp columns, `watermark_value` is an ISO 8601 string:

```sql
-- watermark_value = '2024-03-15 14:30:00'
WHERE updated_at > '{{ watermark_value }}'
```

### Numeric Watermarks
For numeric columns (like auto-incrementing IDs), the value is a number:

```sql
-- watermark_value = '5847293'
WHERE id > {{ watermark_value }}
```

Note the subtle difference: timestamp watermarks need single quotes around the value (`> '{{ watermark_value }}'`) because they’re strings in SQL. Numeric watermarks do not.
### NULL Watermarks
If the target table exists but is empty (all rows were deleted), `watermark_value` will be `NULL`. Your SQL should handle this gracefully. The simplest approach is to use `COALESCE`:
```sql
{% if is_incremental() %}
WHERE updated_at > COALESCE('{{ watermark_value }}', '1970-01-01')
{% endif %}
```

## Unique Keys and Dedup Behavior
When the pipeline output contains multiple rows with the same `unique_key`, RAT deduplicates them using `ROW_NUMBER()`:
```sql
-- RAT internally does this:
SELECT * FROM (
    SELECT *, ROW_NUMBER() OVER (
        PARTITION BY order_id
        ORDER BY order_id  -- last row wins
    ) AS _rn
    FROM pipeline_output
) WHERE _rn = 1
```

The last row in the result set for each key is kept. All other duplicates are discarded before the merge.
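The same last-row-wins dedup can be reproduced with stdlib `sqlite3` (window functions need SQLite 3.25+). This is a sketch, not RAT's internal code; here `rowid` stands in for position in the result set:

```python
# Last-row-wins dedup via ROW_NUMBER(); rowid approximates result-set order.
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE pipeline_output (order_id INTEGER, qty INTEGER)")
con.executemany("INSERT INTO pipeline_output VALUES (?, ?)",
                [(1, 5), (2, 7), (1, 3)])  # order_id=1 appears twice

deduped = con.execute("""
    SELECT order_id, qty FROM (
        SELECT order_id, qty,
               ROW_NUMBER() OVER (PARTITION BY order_id
                                  ORDER BY rowid DESC) AS _rn
        FROM pipeline_output
    ) WHERE _rn = 1
    ORDER BY order_id
""").fetchall()
print(deduped)  # the later (1, 3) beats (1, 5)
```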
### Composite Unique Keys
For composite keys, all columns must match for two rows to be considered the same:
```sql
-- @unique_key: order_id, product_id

-- These are DIFFERENT rows (different product_id):
--   order_id=1, product_id=100
--   order_id=1, product_id=200

-- These are the SAME row (dedup applies):
--   order_id=1, product_id=100, qty=5
--   order_id=1, product_id=100, qty=3  <-- this one wins (last)
```

## The Merge Operation
After dedup, RAT performs an Iceberg merge (upsert):
- Matching keys (row exists in target): The existing row is updated with all columns from the new row
- Non-matching keys (row does not exist in target): The new row is inserted
- Keys only in target (not in pipeline output): Left untouched
This means `incremental` is truly an upsert — it never deletes rows that are not in the current pipeline output.
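The three merge cases can be illustrated with an upsert in stdlib `sqlite3`. A sketch under the assumption that `INSERT ... ON CONFLICT DO UPDATE` approximates the Iceberg merge; table and column names are hypothetical:

```python
# Sketch of merge semantics: update matched keys, insert new keys,
# leave absent keys untouched. sqlite3 stands in for the Iceberg merge.
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE target (order_id INTEGER PRIMARY KEY, status TEXT)")
con.executemany("INSERT INTO target VALUES (?, ?)",
                [(1, "SHIPPED"), (2, "PENDING")])

batch = [(2, "DELIVERED"),  # matching key  -> existing row updated
         (3, "PENDING")]    # new key       -> row inserted
con.executemany(
    "INSERT INTO target VALUES (?, ?) "
    "ON CONFLICT(order_id) DO UPDATE SET status = excluded.status",
    batch)

# order_id=1 was absent from the batch, yet it survives: no deletes.
final = sorted(con.execute("SELECT * FROM target").fetchall())
print(final)
```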
## The `{% if is_incremental() %}` Pattern
This Jinja conditional is the core pattern for incremental pipelines. Let’s break down why it works:
```sql
-- @merge_strategy: incremental
-- @unique_key: user_id
-- @watermark_column: last_active_at
SELECT
    user_id,
    email,
    plan,
    last_active_at
FROM {{ ref('bronze.raw_users') }}
{% if is_incremental() %}
-- This block is ONLY included when the target table already exists
WHERE last_active_at > '{{ watermark_value }}'
{% endif %}
```

**First run** — The SQL executed is:
```sql
SELECT user_id, email, plan, last_active_at
FROM iceberg_scan('...')
-- No WHERE clause — all rows processed
```

**Second run** — The SQL executed is:
```sql
SELECT user_id, email, plan, last_active_at
FROM iceberg_scan('...')
WHERE last_active_at > '2024-03-15 14:30:00'
-- Only new/updated rows processed
```

## `delete_insert` vs `incremental`
Both strategies use a `unique_key`, but they behave differently. Choosing the wrong one is a common mistake.
| Aspect | incremental | delete_insert |
|---|---|---|
| Dedup incoming rows | Yes (ROW_NUMBER, last wins) | No |
| Multiple rows per key | Keeps 1 per unique key | Keeps all rows per key value |
| Operation | MERGE (upsert) | DELETE matching + INSERT all |
| Missing keys in output | Left untouched | Left untouched |
| Best for | Entity tables (1 row per key) | Batch tables (many rows per key) |
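The `delete_insert` side of the table can be sketched the same way: delete every row for the incoming key, then insert the whole batch. Illustrative stdlib `sqlite3` only, with hypothetical tables and values, not RAT's implementation:

```python
# delete_insert semantics: DELETE matching key, then INSERT the whole batch.
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE daily_sales "
            "(report_date TEXT, product_id INTEGER, units INTEGER)")
con.executemany("INSERT INTO daily_sales VALUES (?, ?, ?)",
                [("2024-03-14", 100, 3),    # other key: left untouched
                 ("2024-03-15", 100, 9)])   # stale row for today's key

# New batch for 2024-03-15: several rows share the key report_date
batch = [("2024-03-15", 100, 10), ("2024-03-15", 200, 4)]

con.execute("DELETE FROM daily_sales WHERE report_date = ?", ("2024-03-15",))
con.executemany("INSERT INTO daily_sales VALUES (?, ?, ?)", batch)

rows = sorted(con.execute("SELECT * FROM daily_sales").fetchall())
print(rows)  # the stale (100, 9) row is gone; both batch rows remain
```

Note that unlike the merge, no dedup happens: every row in the batch is kept, which is exactly why this strategy suits many-rows-per-key tables.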
### When to Use `incremental`

Use `incremental` when each unique key should have exactly one row in the target table. Classic entity/dimension pattern:
```sql
-- @merge_strategy: incremental
-- @unique_key: customer_id
-- @watermark_column: updated_at

-- One row per customer, latest state wins
SELECT customer_id, name, email, plan, updated_at
FROM {{ ref('bronze.raw_customers') }}
{% if is_incremental() %}
WHERE updated_at > '{{ watermark_value }}'
{% endif %}
```

### When to Use `delete_insert`
Use `delete_insert` when a key maps to multiple rows and you want to replace all rows for that key as a batch:
```sql
-- @merge_strategy: delete_insert
-- @unique_key: report_date

-- Many rows per date — replace the entire day's data
SELECT
    report_date,
    product_id,
    units_sold,
    revenue
FROM {{ ref('bronze.raw_daily_sales') }}
WHERE report_date = CURRENT_DATE
```

If you used `incremental` here, each `(report_date, product_id)` would need to be a composite unique key. With `delete_insert`, you simply say “delete everything for this date and insert the new batch.”
## Full Example: Incremental Order Updates
Here is a complete incremental pipeline with all the pieces:
```sql
-- @merge_strategy: incremental
-- @unique_key: order_id
-- @watermark_column: updated_at
-- @description: Clean orders with incremental updates via watermark
SELECT
    CAST(order_id AS INTEGER) AS order_id,
    CAST(customer_id AS INTEGER) AS customer_id,
    TRIM(UPPER(status)) AS status,
    CAST(total_amount AS DECIMAL(12, 2)) AS total_amount,
    CAST(currency AS VARCHAR(3)) AS currency,
    TRY_CAST(order_date AS DATE) AS order_date,
    TRY_CAST(shipped_date AS DATE) AS shipped_date,
    CAST(updated_at AS TIMESTAMP) AS updated_at,
    '{{ run_started_at }}' AS _loaded_at
FROM {{ ref('bronze.raw_orders') }}
WHERE order_id IS NOT NULL
  AND total_amount IS NOT NULL
{% if is_incremental() %}
  AND CAST(updated_at AS TIMESTAMP) > '{{ watermark_value }}'
{% endif %}
```

What happens on each run:
**Run 1 (table does not exist):**
- `is_incremental()` = `false` -> no WHERE filter on watermark
- All 1,000,000 rows from `bronze.raw_orders` are processed
- Dedup by `order_id` (ROW_NUMBER) removes duplicates
- Target table `silver.orders` is created with ~950,000 unique rows
- Watermark after run: `2024-03-15 14:30:00`
**Run 2 (15 minutes later, 500 new/updated rows in source):**
- `is_incremental()` = `true`
- `watermark_value` = `2024-03-15 14:30:00`
- WHERE filter: `updated_at > '2024-03-15 14:30:00'`
- 500 rows processed (instead of 1,000,500)
- Dedup by `order_id` -> ~480 unique rows
- Merge: 200 existing orders updated, 280 new orders inserted
- Watermark after run: `2024-03-15 14:45:00`
## Advanced Patterns
### Multiple Watermark Conditions
Sometimes you need to filter on more than just the watermark:
```sql
-- @merge_strategy: incremental
-- @unique_key: event_id
-- @watermark_column: event_time
SELECT *
FROM {{ ref('bronze.raw_events') }}
{% if is_incremental() %}
WHERE event_time > '{{ watermark_value }}'
  AND event_type != 'heartbeat'  -- additional filter
{% else %}
WHERE event_type != 'heartbeat'  -- same filter on first run too
{% endif %}
```

### Late-Arriving Data
If your source data can arrive late (e.g., events with timestamps hours in the past), add a buffer to the watermark:
```sql
{% if is_incremental() %}
-- Look back 2 hours from watermark to catch late arrivals
WHERE event_time > TIMESTAMP '{{ watermark_value }}' - INTERVAL 2 HOUR
{% endif %}
```

Late-arriving data with timestamps before the buffer window will still be missed. If you need to catch all late data, consider using `delete_insert` with a date-based key instead.
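The buffer arithmetic itself is simple. A stdlib sketch, where the 2-hour window and the event data are hypothetical:

```python
# Lookback buffer: subtract the interval from the watermark before filtering.
from datetime import datetime, timedelta

watermark_value = "2024-03-15 14:30:00"
cutoff = datetime.fromisoformat(watermark_value) - timedelta(hours=2)

events = [
    ("e1", "2024-03-15 15:00:00"),  # new since the watermark
    ("e2", "2024-03-15 13:10:00"),  # late arrival inside the buffer
    ("e3", "2024-03-15 11:00:00"),  # outside the buffer: still missed
]
kept = [name for name, ts in events if datetime.fromisoformat(ts) > cutoff]
print(kept)  # ['e1', 'e2']
```

Rows re-processed inside the buffer are absorbed by the unique-key dedup and merge, so the overlap does not create duplicates in the target.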
### Incremental with Cross-Namespace References
```sql
-- @merge_strategy: incremental
-- @unique_key: order_id
-- @watermark_column: updated_at
SELECT
    o.order_id,
    o.total_amount,
    o.updated_at,
    r.exchange_rate
FROM {{ ref('silver.orders') }} o
JOIN {{ ref('finance.silver.exchange_rates') }} r
  ON o.currency = r.currency
 AND o.order_date = r.rate_date
{% if is_incremental() %}
WHERE o.updated_at > '{{ watermark_value }}'
{% endif %}
```

## Troubleshooting
### Pipeline reprocesses all data every run
**Symptom:** Run logs show millions of rows processed even though only a few changed.

**Cause:** Missing `{% if is_incremental() %}` block, or the watermark filter is not in the query.

**Fix:** Add the Jinja conditional with a watermark filter.
### Duplicate rows in target table
**Symptom:** The same `unique_key` appears multiple times in the target table.

**Cause:** The `unique_key` annotation does not match the actual unique key of the data, or the key is not truly unique in the source.

**Fix:** Verify your unique key and add columns to make it truly unique (e.g., `order_id, line_item_id`).
### Watermark not advancing
**Symptom:** `watermark_value` stays the same across runs.

**Cause:** No new rows have a `watermark_column` value greater than the current watermark, or the pipeline is not writing the watermark column to the target.

**Fix:** Ensure the `watermark_column` is included in your SELECT and that new data has increasing values.
### Late-arriving data is missed
**Symptom:** Rows with timestamps before the watermark are never picked up.

**Cause:** By design, the watermark filter excludes rows older than `MAX(watermark_column)`.

**Fix:** Add a lookback buffer (see “Late-Arriving Data” above) or use `delete_insert` with a date-based key for full daily reloads.