SQL Templating
RAT pipelines use Jinja2 templating to inject dynamic values, resolve table references, and enable conditional logic in SQL. This page is the complete reference for every template function, variable, and behavior.
Template Environment
RAT uses a Jinja2 SandboxedEnvironment with StrictUndefined. This means:
- Sandboxed: Arbitrary Python code cannot be executed from templates. Only whitelisted functions and filters are available.
- StrictUndefined: Referencing an undefined variable raises an error instead of silently producing an empty string. This catches typos early.
# Internal implementation (for reference)
from jinja2.sandbox import SandboxedEnvironment
from jinja2 import StrictUndefined
env = SandboxedEnvironment(undefined=StrictUndefined)
Standard Jinja2 filters like |default, |upper, |lower, |replace, and |trim are available. Custom Python functions or imports are not permitted inside templates.
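The following is a minimal sketch of what these two settings mean in practice, assuming only the environment construction shown above. The template strings and variable names are illustrative and are not taken from RAT's source.

```python
from jinja2 import StrictUndefined
from jinja2.exceptions import SecurityError, UndefinedError
from jinja2.sandbox import SandboxedEnvironment

env = SandboxedEnvironment(undefined=StrictUndefined)

# A defined variable combined with a built-in filter renders normally.
rendered = env.from_string("SELECT '{{ name | upper }}'").render(name="orders")

# A misspelled variable raises instead of rendering an empty string.
try:
    env.from_string("{{ watermak_value }}").render(watermark_value="2026-01-01")
    undefined_raised = False
except UndefinedError:
    undefined_raised = True

# Reaching into Python internals through dunder attributes is blocked.
try:
    env.from_string("{{ ''.__class__.__mro__ }}").render()
    sandbox_blocked = False
except SecurityError:
    sandbox_blocked = True

print(rendered, undefined_raised, sandbox_blocked)
```

Both failures surface while the template is rendered, which matches the behavior described above: the problem is reported before any SQL could run.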
Template Functions
ref()
Resolves a reference to another Iceberg table. This is the most important template function: it creates lineage dependencies and resolves table paths.
Signatures:
ref('layer.name') → 2-part (same namespace)
ref('namespace.layer.name') → 3-part (cross-namespace)
Parameters:
| Parameter | Type | Description |
|---|---|---|
| reference | string | Dot-separated table reference. Either layer.name or namespace.layer.name |
Returns: A DuckDB iceberg_scan() expression pointing to the table’s Iceberg metadata.
Examples:
-- 2-part: references bronze.raw_orders in the current namespace
SELECT * FROM {{ ref('bronze.raw_orders') }}
-- 3-part: references a table in a different namespace
SELECT * FROM {{ ref('shared.silver.customers') }}
Resolved output:
-- 2-part resolves to (example):
SELECT * FROM iceberg_scan('s3://rat/ecommerce/bronze/raw_orders/metadata/v1.metadata.json')
-- 3-part resolves to:
SELECT * FROM iceberg_scan('s3://rat/shared/silver/customers/metadata/v1.metadata.json')
Resolution Logic
Parse the reference
The reference string is split by . (dot). If there are 2 parts, the current pipeline’s namespace is used. If there are 3 parts, the first part is the target namespace.
Locate the metadata file
RAT looks for the Iceberg metadata file at the standard location:
s3://{bucket}/{namespace}/{layer}/{name}/metadata/
It reads the latest v*.metadata.json file (the Nessie catalog determines which version is current on the active branch).
Return the iceberg_scan expression
The resolved path is wrapped in iceberg_scan() which DuckDB uses to read Iceberg tables natively.
Register the lineage edge
The dependency from the current pipeline to the referenced table is recorded in RAT’s lineage graph. This powers the DAG visualization in the portal and dependency-aware scheduling.
If the referenced table does not exist (has never been materialized), ref() raises a RefResolutionError during template compilation. The run fails before any SQL is executed. This is intentional: it prevents pipelines from running against missing upstream data.
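The four resolution steps and the failure case can be sketched as follows. This is an illustrative model, not RAT's implementation: the RefResolver class, its catalog dictionary (a stand-in for the Nessie lookup), and the lineage list are hypothetical names introduced for this example.

```python
from dataclasses import dataclass, field


class RefResolutionError(Exception):
    """Raised when a referenced table has never been materialized."""


@dataclass
class RefResolver:
    bucket: str
    current_namespace: str
    current_pipeline: str
    # Hypothetical stand-in for the catalog: maps (namespace, layer, name)
    # to the file name of the metadata version that is currently active.
    catalog: dict
    lineage: list = field(default_factory=list)

    def ref(self, reference: str) -> str:
        # Step 1: parse the reference into namespace, layer, and name.
        parts = reference.split(".")
        if len(parts) == 2:
            namespace = self.current_namespace
            layer, name = parts
        elif len(parts) == 3:
            namespace, layer, name = parts
        else:
            raise ValueError(f"expected 2 or 3 parts, got {reference!r}")

        # Step 2: locate the metadata file, failing if the table is unknown.
        key = (namespace, layer, name)
        if key not in self.catalog:
            raise RefResolutionError(f"table {'.'.join(key)} does not exist")
        path = (
            f"s3://{self.bucket}/{namespace}/{layer}/{name}"
            f"/metadata/{self.catalog[key]}"
        )

        # Step 4: record the lineage edge (pipeline -> referenced table).
        self.lineage.append((self.current_pipeline, ".".join(key)))

        # Step 3: return the expression DuckDB will read from.
        return f"iceberg_scan('{path}')"


resolver = RefResolver(
    bucket="rat",
    current_namespace="ecommerce",
    current_pipeline="ecommerce.silver.orders",
    catalog={
        ("ecommerce", "bronze", "raw_orders"): "v1.metadata.json",
        ("shared", "silver", "customers"): "v1.metadata.json",
    },
)
print(resolver.ref("bronze.raw_orders"))
print(resolver.ref("shared.silver.customers"))
```

Raising before the expression is returned mirrors the documented behavior that a missing upstream table fails compilation rather than execution.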
2-Part vs 3-Part References
| Syntax | Example | Namespace | Use Case |
|---|---|---|---|
| 2-part | ref('bronze.raw_orders') | Current pipeline’s namespace | Referencing tables in the same namespace (most common) |
| 3-part | ref('shared.silver.customers') | Explicitly specified | Cross-namespace references, shared datasets |
landing_zone()
Resolves to the S3 path of a landing zone directory. Landing zones are where external data files (CSV, JSON, Parquet) are uploaded before being ingested by Bronze pipelines.
Signature:
landing_zone('zone_name')
Parameters:
| Parameter | Type | Description |
|---|---|---|
| zone_name | string | Name of the landing zone directory |
Returns: An S3 glob path pointing to all files in the landing zone.
Example:
SELECT *
FROM read_csv_auto(
'{{ landing_zone("orders") }}',
header = true,
all_varchar = true
)
Resolved output:
SELECT *
FROM read_csv_auto(
's3://rat/ecommerce/landing/orders/**',
header = true,
all_varchar = true
)
Resolution Logic
The landing zone path follows this pattern:
s3://{bucket}/{namespace}/landing/{zone_name}/**
The ** glob matches all files recursively within the directory.
Preview Mode (_samples)
When a pipeline is previewed (dry-run) from the portal rather than fully executed, landing_zone() resolves to a _samples subdirectory instead:
# Normal execution
s3://rat/ecommerce/landing/orders/**
# Preview mode
s3://rat/ecommerce/landing/orders/_samples/**
Place a small representative sample of your landing zone data in the _samples/ subdirectory. This enables fast previews in the portal editor without scanning the full landing zone, which could contain gigabytes of data.
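The path pattern and the preview-mode switch can be expressed as a small helper. The function name landing_zone_path and its preview flag are assumptions made for this sketch; only the path layout comes from the description above.

```python
def landing_zone_path(bucket: str, namespace: str, zone_name: str,
                      preview: bool = False) -> str:
    """Build the glob path for a landing zone (hypothetical helper)."""
    base = f"s3://{bucket}/{namespace}/landing/{zone_name}"
    # Preview runs read only the _samples subdirectory.
    if preview:
        return f"{base}/_samples/**"
    return f"{base}/**"


print(landing_zone_path("rat", "ecommerce", "orders"))
print(landing_zone_path("rat", "ecommerce", "orders", preview=True))
```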
this
A variable (not a function) that resolves to the current pipeline’s own Iceberg table. Used for self-referencing patterns like incremental reads.
Type: string
Example:
-- @merge_strategy: incremental
-- @unique_key: user_id
SELECT * FROM {{ ref('bronze.raw_users') }}
{% if is_incremental() %}
WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}
Resolved output:
SELECT * FROM iceberg_scan('s3://rat/ecommerce/bronze/raw_users/metadata/...')
WHERE updated_at > (SELECT MAX(updated_at) FROM iceberg_scan('s3://rat/ecommerce/silver/dim_users/metadata/...'))
Using {{ this }} only makes sense in incremental or append-only pipelines where the target table already exists. On the first run, is_incremental() returns false, so the {{ this }} branch is skipped.
run_started_at
A variable that contains the ISO 8601 UTC timestamp of when the current run started.
Type: string (ISO 8601 format: YYYY-MM-DDTHH:MM:SSZ)
Example:
SELECT
*,
'{{ run_started_at }}' AS _loaded_at
FROM {{ ref('bronze.raw_events') }}
Resolved output:
SELECT
*,
'2026-02-16T09:30:00Z' AS _loaded_at
FROM iceberg_scan('...')
This is commonly used in Bronze pipelines to stamp each row with when it was ingested.
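As an illustration of the documented format, a timestamp in the YYYY-MM-DDTHH:MM:SSZ shape can be produced with the standard library as below. The helper name format_run_started_at is hypothetical; how RAT actually captures the start time is not specified here.

```python
from datetime import datetime, timedelta, timezone


def format_run_started_at(started: datetime) -> str:
    """Format a timezone-aware datetime as ISO 8601 UTC with a Z suffix."""
    if started.tzinfo is None:
        raise ValueError("started must be timezone-aware")
    # Convert to UTC first so any local offset is normalized away.
    return started.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


utc_start = datetime(2026, 2, 16, 9, 30, 0, tzinfo=timezone.utc)
cet_start = datetime(2026, 2, 16, 10, 30, 0, tzinfo=timezone(timedelta(hours=1)))
print(format_run_started_at(utc_start))
print(format_run_started_at(cet_start))
```

Because the value is injected as a quoted string, the resulting _loaded_at column is text unless the SQL casts it to a timestamp.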
watermark_value
A variable that contains the maximum value of the watermark_column from the existing target table. Used in incremental pipelines to process only new data.
Type: string (the actual value from the table, typically a timestamp or integer)
Example:
-- @merge_strategy: incremental
-- @unique_key: order_id
-- @watermark_column: updated_at
SELECT * FROM {{ ref('bronze.raw_orders') }}
{% if is_incremental() %}
WHERE updated_at > '{{ watermark_value }}'
{% endif %}
The watermark_value is computed by running SELECT MAX(watermark_column) FROM target_table before the pipeline SQL is executed. On the first run (when the table does not exist), this variable is not available, but is_incremental() returns false, so the block is skipped.
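The pre-run lookup can be sketched as follows. To keep the example self-contained it uses the standard-library sqlite3 module as a stand-in for DuckDB and Iceberg, so the table-existence check is SQLite-specific; compute_watermark is a hypothetical name, and the identifiers passed to it are assumed to be trusted.

```python
import sqlite3
from typing import Optional


def compute_watermark(conn: sqlite3.Connection, target_table: str,
                      watermark_column: str) -> Optional[str]:
    """Return MAX(watermark_column) as a string, or None if unavailable."""
    # First run: the target table does not exist yet, so there is no value.
    exists = conn.execute(
        "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?",
        (target_table,),
    ).fetchone()
    if exists is None:
        return None
    # Identifiers cannot be bound as parameters; they are assumed trusted.
    row = conn.execute(
        f'SELECT MAX("{watermark_column}") FROM "{target_table}"'
    ).fetchone()
    return None if row[0] is None else str(row[0])


conn = sqlite3.connect(":memory:")
before = compute_watermark(conn, "orders", "updated_at")

conn.execute("CREATE TABLE orders (order_id INTEGER, updated_at TEXT)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [(1, "2026-02-14T00:00:00Z"), (2, "2026-02-15T00:00:00Z")],
)
after = compute_watermark(conn, "orders", "updated_at")
print(before, after)
```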
is_incremental()
Returns true if the pipeline’s merge strategy is incremental and the target table already exists (has been materialized at least once).
Returns: boolean
Example:
-- @merge_strategy: incremental
-- @unique_key: id
-- @watermark_column: updated_at
SELECT * FROM {{ ref('bronze.raw_data') }}
{% if is_incremental() %}
WHERE updated_at > '{{ watermark_value }}'
{% endif %}
| Run | Table Exists | Strategy | is_incremental() | Behavior |
|---|---|---|---|---|
| First | No | incremental | false | Full load (no WHERE clause) |
| Subsequent | Yes | incremental | true | Incremental (watermark filter applied) |
| Any | Yes/No | full_refresh | false | Always full load |
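The first two rows of the table can be reproduced with a small rendering sketch. It assumes the same SandboxedEnvironment and StrictUndefined setup described earlier and passes is_incremental as a plain callable; the table name raw_data stands in for a resolved ref() expression.

```python
from jinja2 import StrictUndefined
from jinja2.sandbox import SandboxedEnvironment

TEMPLATE = (
    "SELECT * FROM raw_data\n"
    "{% if is_incremental() %}"
    "WHERE updated_at > '{{ watermark_value }}'\n"
    "{% endif %}"
)

env = SandboxedEnvironment(undefined=StrictUndefined)
template = env.from_string(TEMPLATE)

# First run: no watermark_value is supplied. The block is skipped, so the
# undefined variable is never evaluated and StrictUndefined does not raise.
first_run = template.render(is_incremental=lambda: False)

# Subsequent run: the watermark filter is compiled into the SQL.
later_run = template.render(
    is_incremental=lambda: True,
    watermark_value="2026-02-15T00:00:00Z",
)

print(first_run)
print(later_run)
```

This is why the first run can omit watermark_value even under StrictUndefined: the variable only raises when it is actually rendered.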
is_scd2()
Returns true if the pipeline’s merge strategy is scd2 (Slowly Changing Dimension Type 2).
Returns: boolean
Example:
-- @merge_strategy: scd2
-- @unique_key: customer_id
-- @scd_valid_from: valid_from
-- @scd_valid_to: valid_to
SELECT
customer_id,
name,
email,
tier,
{% if is_scd2() %}
CURRENT_TIMESTAMP AS valid_from,
NULL AS valid_to,
{% endif %}
updated_at
FROM {{ ref('bronze.raw_customers') }}
is_snapshot()
Returns true if the pipeline’s merge strategy is snapshot.
Returns: boolean
Example:
-- @merge_strategy: snapshot
-- @partition_column: snapshot_date
SELECT
*,
{% if is_snapshot() %}
CURRENT_DATE AS snapshot_date,
{% endif %}
FROM {{ ref('silver.inventory') }}
is_append_only()
Returns true if the pipeline’s merge strategy is append_only.
Returns: boolean
is_delete_insert()
Returns true if the pipeline’s merge strategy is delete_insert.
Returns: boolean
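Taken together, the five strategy functions behave like simple predicates over the pipeline's merge strategy. The sketch below models them that way; strategy_predicates and KNOWN_STRATEGIES are hypothetical names, and the strategy list contains only the values mentioned on this page, so the real set may differ.

```python
KNOWN_STRATEGIES = (
    "full_refresh",
    "incremental",
    "scd2",
    "snapshot",
    "append_only",
    "delete_insert",
)


def strategy_predicates(strategy: str, table_exists: bool) -> dict:
    """Build template callables for one pipeline run (illustrative only)."""
    if strategy not in KNOWN_STRATEGIES:
        raise ValueError(f"unknown merge strategy: {strategy!r}")
    return {
        # Only is_incremental() also depends on whether the target exists.
        "is_incremental": lambda: strategy == "incremental" and table_exists,
        "is_scd2": lambda: strategy == "scd2",
        "is_snapshot": lambda: strategy == "snapshot",
        "is_append_only": lambda: strategy == "append_only",
        "is_delete_insert": lambda: strategy == "delete_insert",
    }


predicates = strategy_predicates("append_only", table_exists=True)
print({name: fn() for name, fn in predicates.items()})
```

A dictionary of this shape could be passed directly as the render context, which is how the earlier examples supply is_incremental.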
Template Function Summary
| Function / Variable | Type | Description |
|---|---|---|
| ref('layer.name') | function | Reference a table in the current namespace |
| ref('ns.layer.name') | function | Reference a table in another namespace |
| landing_zone('name') | function | Resolve a landing zone S3 path |
| this | variable | Current pipeline’s own Iceberg table |
| run_started_at | variable | ISO 8601 UTC timestamp of run start |
| watermark_value | variable | MAX of watermark_column from existing table |
| is_incremental() | function | true if incremental strategy and table exists |
| is_scd2() | function | true if scd2 strategy |
| is_snapshot() | function | true if snapshot strategy |
| is_append_only() | function | true if append_only strategy |
| is_delete_insert() | function | true if delete_insert strategy |
Metadata Extraction
RAT extracts metadata annotations from SQL comments at the top of pipeline files. This metadata configures pipeline behavior without needing a separate config.yaml file.
Parsing Rules
- Comment prefix: Annotations use -- @key: value in SQL files and # @key: value in Python files.
- Top of file only: The parser reads from the first line downward. The first non-comment, non-blank line stops metadata extraction.
- One annotation per line: Each @key: value must be on its own line.
- Whitespace: Leading and trailing whitespace around values is trimmed.
- Case-insensitive keys: @Merge_Strategy and @merge_strategy are equivalent.
Example (SQL pipeline):
-- @description: Clean and deduplicate customer records
-- @merge_strategy: incremental
-- @unique_key: customer_id
-- @watermark_column: updated_at
SELECT * FROM {{ ref('bronze.raw_customers') }}
{% if is_incremental() %}
WHERE updated_at > '{{ watermark_value }}'
{% endif %}
Example (Python pipeline):
# @description: Fetch data from external REST API
# @merge_strategy: full_refresh
# @max_retries: 3
# @retry_delay_seconds: 30
import requests
def run(context):
    response = requests.get("https://api.example.com/data")
    return response.json()
Extraction Stops At
The parser stops extracting annotations when it encounters:
- A line that is not a comment (SQL: not starting with --, Python: not starting with #)
- A line that is a comment but does not contain @key: value
- An empty line after the annotation block
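The parsing rules and stop conditions can be combined into a short parser sketch. This is one possible reading, not RAT's code: extract_metadata is a hypothetical name, the key pattern is an assumption, and blank lines before the first annotation are skipped while a blank line after the block ends extraction.

```python
import re

# Assumed key syntax: letters, digits, and underscores after the @ sign.
ANNOTATION = re.compile(r"^@([A-Za-z_][A-Za-z0-9_]*)\s*:\s*(.*)$")


def extract_metadata(source: str, comment_prefix: str = "--") -> dict:
    """Collect @key: value annotations from the top of a pipeline file."""
    metadata = {}
    for line in source.splitlines():
        stripped = line.strip()
        if not stripped:
            if metadata:
                break  # empty line after the annotation block
            continue  # leading blank lines are skipped (assumption)
        if not stripped.startswith(comment_prefix):
            break  # first non-comment line stops extraction
        match = ANNOTATION.match(stripped[len(comment_prefix):].strip())
        if match is None:
            break  # a comment without the @key: value pattern
        # Keys are case-insensitive; values are trimmed.
        metadata[match.group(1).lower()] = match.group(2).strip()
    return metadata


sql_source = """-- @merge_strategy: incremental
-- @unique_key: id
-- This is a regular comment
SELECT * FROM some_table
-- @description: ignored
"""
python_source = "# @Max_Retries:   3  \nimport requests\n"

print(extract_metadata(sql_source))
print(extract_metadata(python_source, comment_prefix="#"))
```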
-- @merge_strategy: incremental ← extracted
-- @unique_key: id ← extracted
-- This is a regular comment ← NOT extracted (no @key pattern), extraction stops
SELECT * FROM {{ ref('bronze.data') }}
-- @description: ignored ← NOT extracted (below SQL, too late)
Template Validation
RAT validates templates during the compilation phase (before SQL execution). There are two categories of template issues:
Errors (Block Execution)
These issues cause the run to fail immediately:
| Error | Cause |
|---|---|
| UndefinedError | Referencing an undefined variable (e.g., typo in {{ watermak_value }}) |
| RefResolutionError | ref() points to a table that does not exist |
| TemplateSyntaxError | Invalid Jinja syntax (unclosed {% %}, mismatched tags) |
| SecurityError | Attempting to use blocked Python constructs in the sandbox |
Warnings (Non-Blocking)
These issues are logged but do not prevent execution:
| Warning | Cause |
|---|---|
| Unused watermark | @watermark_column set but no {% if is_incremental() %} block found |
| Missing watermark filter | is_incremental() used but no watermark_value reference in the WHERE clause |
| Unused annotation | An annotation key is not recognized |
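The three warnings could be approximated with simple text checks, as in the sketch below. It is deliberately cruder than the rules in the table: it looks for watermark_value anywhere in the source rather than specifically in a WHERE clause. The function name template_warnings and the known_keys set are assumptions made for this example.

```python
import re

# Matches an opening if-tag that calls is_incremental(), with or without
# the whitespace-control dash.
INCREMENTAL_BLOCK = re.compile(r"\{%-?\s*if\s+is_incremental\(\)")


def template_warnings(source: str, metadata: dict, known_keys: set) -> list:
    """Return non-blocking warnings for a pipeline template (sketch)."""
    warnings = []
    uses_incremental = INCREMENTAL_BLOCK.search(source) is not None
    uses_watermark = "watermark_value" in source

    if "watermark_column" in metadata and not uses_incremental:
        warnings.append("Unused watermark")
    if uses_incremental and not uses_watermark:
        warnings.append("Missing watermark filter")
    for key in metadata:
        if key not in known_keys:
            warnings.append(f"Unused annotation: {key}")
    return warnings


# Hypothetical subset of recognized annotation keys.
KNOWN_KEYS = {"description", "merge_strategy", "unique_key", "watermark_column"}

unused = template_warnings(
    "SELECT 1",
    {"merge_strategy": "incremental", "watermark_column": "updated_at"},
    KNOWN_KEYS,
)
missing = template_warnings(
    "SELECT 1 {% if is_incremental() %}WHERE id > 0{% endif %}",
    {"merge_strategy": "incremental"},
    KNOWN_KEYS,
)
typo = template_warnings("SELECT 1", {"merge_stratgy": "incremental"}, KNOWN_KEYS)
print(unused, missing, typo)
```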
Jinja Control Flow Reference
Standard Jinja2 control flow is supported:
Conditionals
{% if is_incremental() %}
WHERE updated_at > '{{ watermark_value }}'
{% endif %}
{% if is_scd2() %}
-- SCD2 specific columns
{% elif is_snapshot() %}
-- Snapshot specific columns
{% else %}
-- Default columns
{% endif %}
Comments
{# This is a Jinja comment — it will not appear in the compiled SQL #}
SELECT * FROM {{ ref('bronze.data') }}
Whitespace Control
Use the - modifier to strip whitespace around tags:
SELECT
id,
name
{%- if is_scd2() -%}
, valid_from
, valid_to
{%- endif %}
FROM {{ ref('bronze.data') }}
Complete Example
Here is a fully annotated incremental pipeline using most template features:
-- @description: Orders enriched with customer data, incrementally processed
-- @merge_strategy: incremental
-- @unique_key: order_id
-- @watermark_column: updated_at
-- @materialized: table
{# Join orders with customers and only process new records #}
SELECT
o.order_id,
o.customer_id,
o.status,
o.total_amount,
o.currency,
o.order_date,
o.updated_at,
c.name AS customer_name,
c.segment AS customer_segment,
c.country AS customer_country,
'{{ run_started_at }}' AS _processed_at
FROM {{ ref('bronze.raw_orders') }} o
LEFT JOIN {{ ref('silver.customers') }} c
ON o.customer_id = c.customer_id
WHERE o.order_id IS NOT NULL
{% if is_incremental() %}
AND o.updated_at > '{{ watermark_value }}'
{% endif %}