
SQL Templating

RAT pipelines use Jinja2 templating to inject dynamic values, resolve table references, and enable conditional logic in SQL. This page is the complete reference for every template function, variable, and behavior.


Template Environment

RAT uses a Jinja2 SandboxedEnvironment with StrictUndefined. This means:

  • Sandboxed: Arbitrary Python code cannot be executed from templates. Only whitelisted functions and filters are available.
  • StrictUndefined: Referencing an undefined variable raises an error instead of silently producing an empty string. This catches typos early.
# Internal implementation (for reference)
from jinja2.sandbox import SandboxedEnvironment
from jinja2 import StrictUndefined
 
env = SandboxedEnvironment(undefined=StrictUndefined)

Standard Jinja2 filters like |default, |upper, |lower, |replace, and |trim are available. Custom Python functions or imports are not permitted inside templates.
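These behaviors can be demonstrated directly with Jinja2, independent of RAT's internals (a minimal sketch):

```python
from jinja2 import StrictUndefined
from jinja2.exceptions import SecurityError, UndefinedError
from jinja2.sandbox import SandboxedEnvironment

env = SandboxedEnvironment(undefined=StrictUndefined)

# Defined variables render normally.
print(env.from_string("SELECT * FROM {{ t }}").render(t="orders"))  # SELECT * FROM orders

# A typo raises UndefinedError instead of silently producing an empty string.
try:
    env.from_string("SELECT {{ watermak_value }}").render(watermark_value="2026-01-01")
except UndefinedError as e:
    print("blocked:", e)

# The sandbox rejects unsafe attribute access such as dunder lookups.
try:
    env.from_string("{{ ''.__class__ }}").render()
except SecurityError as e:
    print("blocked:", e)
```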


Template Functions

ref()

Resolves a reference to another Iceberg table. This is the most important template function — it creates lineage dependencies and resolves table paths.

Signatures:

ref('layer.name')            → 2-part (same namespace)
ref('namespace.layer.name')  → 3-part (cross-namespace)

Parameters:

Parameter   Type     Description
reference   string   Dot-separated table reference: either layer.name or namespace.layer.name

Returns: A DuckDB iceberg_scan() expression pointing to the table’s Iceberg metadata.

Examples:

pipeline.sql
-- 2-part: references bronze.raw_orders in the current namespace
SELECT * FROM {{ ref('bronze.raw_orders') }}
 
-- 3-part: references a table in a different namespace
SELECT * FROM {{ ref('shared.silver.customers') }}

Resolved output:

-- 2-part resolves to (example):
SELECT * FROM iceberg_scan('s3://rat/ecommerce/bronze/raw_orders/metadata/v1.metadata.json')
 
-- 3-part resolves to:
SELECT * FROM iceberg_scan('s3://rat/shared/silver/customers/metadata/v1.metadata.json')

Resolution Logic

1. Parse the reference

The reference string is split on dots. With 2 parts, the current pipeline’s namespace is used; with 3 parts, the first part is the target namespace.

2. Locate the metadata file

RAT looks for the Iceberg metadata file at the standard location:

s3://{bucket}/{namespace}/{layer}/{name}/metadata/

It reads the latest v*.metadata.json file (the Nessie catalog determines which version is current on the active branch).

3. Return the iceberg_scan expression

The resolved path is wrapped in iceberg_scan(), which DuckDB uses to read Iceberg tables natively.

4. Register the lineage edge

The dependency from the current pipeline to the referenced table is recorded in RAT’s lineage graph. This powers the DAG visualization in the portal and dependency-aware scheduling.
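The four steps above can be sketched as follows. This is a simplified illustration, not RAT's implementation: resolve_ref is a hypothetical helper, the bucket default is assumed, and the metadata file is pinned to v1.metadata.json here, whereas in RAT the Nessie catalog selects the current version on the active branch.

```python
def resolve_ref(reference: str, current_namespace: str, bucket: str = "rat") -> str:
    """Sketch of ref(): parse the reference, build the metadata path,
    and wrap it in a DuckDB iceberg_scan() expression."""
    parts = reference.split(".")
    if len(parts) == 2:
        # 2-part: layer.name resolves inside the current namespace
        namespace = current_namespace
        layer, name = parts
    elif len(parts) == 3:
        # 3-part: namespace.layer.name is fully qualified
        namespace, layer, name = parts
    else:
        raise ValueError(f"expected layer.name or namespace.layer.name, got {reference!r}")
    # RAT reads the catalog's current v*.metadata.json; 'v1' is a placeholder here.
    metadata = f"s3://{bucket}/{namespace}/{layer}/{name}/metadata/v1.metadata.json"
    return f"iceberg_scan('{metadata}')"

print(resolve_ref("bronze.raw_orders", "ecommerce"))
# iceberg_scan('s3://rat/ecommerce/bronze/raw_orders/metadata/v1.metadata.json')
print(resolve_ref("shared.silver.customers", "ecommerce"))
# iceberg_scan('s3://rat/shared/silver/customers/metadata/v1.metadata.json')
```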

⚠️

If the referenced table does not exist (has never been materialized), ref() raises a RefResolutionError during template compilation. The run fails before any SQL is executed. This is intentional — it prevents pipelines from running against missing upstream data.

2-Part vs 3-Part References

Syntax   Example                          Namespace                      Use Case
2-part   ref('bronze.raw_orders')         Current pipeline’s namespace   Referencing tables in the same namespace (most common)
3-part   ref('shared.silver.customers')   Explicitly specified           Cross-namespace references, shared datasets

landing_zone()

Resolves to the S3 path of a landing zone directory. Landing zones are where external data files (CSV, JSON, Parquet) are uploaded before being ingested by Bronze pipelines.

Signature:

landing_zone('zone_name')

Parameters:

Parameter   Type     Description
zone_name   string   Name of the landing zone directory

Returns: An S3 glob path pointing to all files in the landing zone.

Example:

pipeline.sql
SELECT *
FROM read_csv_auto(
    '{{ landing_zone("orders") }}',
    header = true,
    all_varchar = true
)

Resolved output:

SELECT *
FROM read_csv_auto(
    's3://rat/ecommerce/landing/orders/**',
    header = true,
    all_varchar = true
)

Resolution Logic

The landing zone path follows this pattern:

s3://{bucket}/{namespace}/landing/{zone_name}/**

The ** glob matches all files recursively within the directory.

Preview Mode (_samples)

When a pipeline is previewed (dry-run) from the portal rather than fully executed, landing_zone() resolves to a _samples subdirectory instead:

# Normal execution
s3://rat/ecommerce/landing/orders/**

# Preview mode
s3://rat/ecommerce/landing/orders/_samples/**

Place a small representative sample of your landing zone data in the _samples/ subdirectory. This enables fast previews in the portal editor without scanning the full landing zone, which could contain gigabytes of data.
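Path construction for both modes can be sketched as below; landing_zone_path is a hypothetical helper mirroring the documented pattern, not RAT's actual code.

```python
def landing_zone_path(zone_name: str, namespace: str,
                      preview: bool = False, bucket: str = "rat") -> str:
    """Build the landing zone glob; preview mode targets the _samples subdirectory."""
    base = f"s3://{bucket}/{namespace}/landing/{zone_name}"
    return f"{base}/_samples/**" if preview else f"{base}/**"

print(landing_zone_path("orders", "ecommerce"))                # s3://rat/ecommerce/landing/orders/**
print(landing_zone_path("orders", "ecommerce", preview=True))  # s3://rat/ecommerce/landing/orders/_samples/**
```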


this

A variable (not a function) that resolves to the current pipeline’s own Iceberg table. Used for self-referencing patterns like incremental reads.

Type: string

Example:

pipeline.sql
-- @merge_strategy: incremental
-- @unique_key: user_id
 
SELECT * FROM {{ ref('bronze.raw_users') }}
{% if is_incremental() %}
WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}

Resolved output (assuming the current pipeline materializes silver.dim_users):

SELECT * FROM iceberg_scan('s3://rat/ecommerce/bronze/raw_users/metadata/...')
WHERE updated_at > (SELECT MAX(updated_at) FROM iceberg_scan('s3://rat/ecommerce/silver/dim_users/metadata/...'))

⚠️

Using {{ this }} only makes sense in incremental or append-only pipelines where the target table already exists. On the first run, is_incremental() returns false, so the {{ this }} branch is skipped.


run_started_at

A variable that contains the ISO 8601 UTC timestamp of when the current run started.

Type: string (ISO 8601 format: YYYY-MM-DDTHH:MM:SSZ)

Example:

pipeline.sql
SELECT
    *,
    '{{ run_started_at }}' AS _loaded_at
FROM {{ ref('bronze.raw_events') }}

Resolved output:

SELECT
    *,
    '2026-02-16T09:30:00Z' AS _loaded_at
FROM iceberg_scan('...')

This is commonly used in Bronze pipelines to stamp each row with when it was ingested.


watermark_value

A variable that contains the maximum value of the watermark_column from the existing target table. Used in incremental pipelines to process only new data.

Type: string (the actual value from the table, typically a timestamp or integer)

Example:

pipeline.sql
-- @merge_strategy: incremental
-- @unique_key: order_id
-- @watermark_column: updated_at
 
SELECT * FROM {{ ref('bronze.raw_orders') }}
{% if is_incremental() %}
WHERE updated_at > '{{ watermark_value }}'
{% endif %}

The watermark_value is computed by running SELECT MAX(watermark_column) FROM target_table before the pipeline SQL is executed. On the first run (when the table does not exist), this variable is not available — but is_incremental() returns false, so the block is skipped.
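The pre-run watermark query can be illustrated with an in-memory SQLite table standing in for the Iceberg target (a sketch; RAT runs the equivalent query through DuckDB, and the table and column names below are invented for the example):

```python
import sqlite3

def compute_watermark(conn, table: str, watermark_column: str):
    """Sketch: run SELECT MAX(watermark_column) before rendering the pipeline SQL.
    Returns None when the table is empty."""
    row = conn.execute(f"SELECT MAX({watermark_column}) FROM {table}").fetchone()
    return row[0]

# An in-memory table stands in for the existing target table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dim_users (user_id INTEGER, updated_at TEXT)")
conn.executemany("INSERT INTO dim_users VALUES (?, ?)",
                 [(1, "2026-02-15T10:00:00Z"), (2, "2026-02-16T08:00:00Z")])
print(compute_watermark(conn, "dim_users", "updated_at"))  # 2026-02-16T08:00:00Z
```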


is_incremental()

Returns true if the pipeline’s merge strategy is incremental and the target table already exists (has been materialized at least once).

Returns: boolean

Example:

pipeline.sql
-- @merge_strategy: incremental
-- @unique_key: id
-- @watermark_column: updated_at
 
SELECT * FROM {{ ref('bronze.raw_data') }}
{% if is_incremental() %}
WHERE updated_at > '{{ watermark_value }}'
{% endif %}

Run          Table Exists   Strategy       is_incremental()   Behavior
First        No             incremental    false              Full load (no WHERE clause)
Subsequent   Yes            incremental    true               Incremental (watermark filter applied)
Any          Yes/No         full_refresh   false              Always full load
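The decision in the table above reduces to a single conjunction; is_incremental_for is a hypothetical stand-in for the engine's check:

```python
def is_incremental_for(merge_strategy: str, table_exists: bool) -> bool:
    # True only when the strategy is incremental AND the target has been materialized.
    return merge_strategy == "incremental" and table_exists

print(is_incremental_for("incremental", False))   # False: first run -> full load
print(is_incremental_for("incremental", True))    # True: watermark filter applied
print(is_incremental_for("full_refresh", True))   # False: always full load
```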

is_scd2()

Returns true if the pipeline’s merge strategy is scd2 (Slowly Changing Dimension Type 2).

Returns: boolean

Example:

pipeline.sql
-- @merge_strategy: scd2
-- @unique_key: customer_id
-- @scd_valid_from: valid_from
-- @scd_valid_to: valid_to
 
SELECT
    customer_id,
    name,
    email,
    tier,
    {% if is_scd2() %}
    CURRENT_TIMESTAMP AS valid_from,
    NULL AS valid_to,
    {% endif %}
    updated_at
FROM {{ ref('bronze.raw_customers') }}

is_snapshot()

Returns true if the pipeline’s merge strategy is snapshot.

Returns: boolean

Example:

pipeline.sql
-- @merge_strategy: snapshot
-- @partition_column: snapshot_date
 
SELECT
    *
    {% if is_snapshot() %}
    , CURRENT_DATE AS snapshot_date
    {% endif %}
FROM {{ ref('silver.inventory') }}

is_append_only()

Returns true if the pipeline’s merge strategy is append_only.

Returns: boolean


is_delete_insert()

Returns true if the pipeline’s merge strategy is delete_insert.

Returns: boolean


Template Function Summary

Function / Variable    Type       Description
ref('layer.name')      function   Reference a table in the current namespace
ref('ns.layer.name')   function   Reference a table in another namespace
landing_zone('name')   function   Resolve a landing zone S3 path
this                   variable   Current pipeline’s own Iceberg table
run_started_at         variable   ISO 8601 UTC timestamp of run start
watermark_value        variable   MAX of watermark_column from existing table
is_incremental()       function   true if incremental strategy and table exists
is_scd2()              function   true if scd2 strategy
is_snapshot()          function   true if snapshot strategy
is_append_only()       function   true if append_only strategy
is_delete_insert()     function   true if delete_insert strategy

Metadata Extraction

RAT extracts metadata annotations from SQL comments at the top of pipeline files. This metadata configures pipeline behavior without needing a separate config.yaml file.

Parsing Rules

  1. Comment prefix: Annotations use -- @key: value in SQL files and # @key: value in Python files.
  2. Top of file only: The parser reads from the first line downward; extraction stops at the first line that is not an annotation comment (see “Extraction Stops At” below).
  3. One annotation per line: Each @key: value must be on its own line.
  4. Whitespace: Leading and trailing whitespace around values is trimmed.
  5. Case-insensitive keys: @Merge_Strategy and @merge_strategy are equivalent.
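A minimal sketch of these rules; extract_metadata and its regex are illustrative, not RAT's actual parser:

```python
import re

# Matches '-- @key: value' (SQL) and '# @key: value' (Python) annotation lines.
ANNOTATION = re.compile(r"^(?:--|#)\s*@(\w+):\s*(.+?)\s*$")

def extract_metadata(source: str) -> dict:
    """Read annotation comments from the top of the file; stop at the first
    line (blank, plain comment, or code) that is not an annotation."""
    meta = {}
    for line in source.splitlines():
        m = ANNOTATION.match(line.strip())
        if not m:
            break  # first non-annotation line ends extraction
        meta[m.group(1).lower()] = m.group(2)  # keys are case-insensitive
    return meta

sql = "-- @Merge_Strategy: incremental\n-- @unique_key: customer_id\n-- a comment\nSELECT 1"
print(extract_metadata(sql))  # {'merge_strategy': 'incremental', 'unique_key': 'customer_id'}
```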

Example — SQL pipeline:

pipeline.sql
-- @description: Clean and deduplicate customer records
-- @merge_strategy: incremental
-- @unique_key: customer_id
-- @watermark_column: updated_at
 
SELECT * FROM {{ ref('bronze.raw_customers') }}
{% if is_incremental() %}
WHERE updated_at > '{{ watermark_value }}'
{% endif %}

Example — Python pipeline:

pipeline.py
# @description: Fetch data from external REST API
# @merge_strategy: full_refresh
# @max_retries: 3
# @retry_delay_seconds: 30
 
import requests
 
def run(context):
    response = requests.get("https://api.example.com/data")
    return response.json()

Extraction Stops At

The parser stops extracting annotations when it encounters:

  • A line that is not a comment (SQL: not starting with --, Python: not starting with #)
  • A line that is a comment but does not contain @key: value
  • An empty line after the annotation block

pipeline.sql
-- @merge_strategy: incremental    ← extracted
-- @unique_key: id                 ← extracted
-- This is a regular comment       ← NOT extracted (no @key pattern), extraction stops
 
SELECT * FROM {{ ref('bronze.data') }}
-- @description: ignored           ← NOT extracted (below SQL, too late)

Template Validation

RAT validates templates during the compilation phase (before SQL execution). There are two categories of template issues:

Errors (Block Execution)

These issues cause the run to fail immediately:

Error                 Cause
UndefinedError        Referencing an undefined variable (e.g., typo in {{ watermak_value }})
RefResolutionError    ref() points to a table that does not exist
TemplateSyntaxError   Invalid Jinja syntax (unclosed {% %}, mismatched tags)
SecurityError         Attempting to use blocked Python constructs in the sandbox
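How the three Jinja-level errors surface at compile time can be sketched with plain Jinja2; collect_template_errors is a hypothetical helper (RefResolutionError is RAT-specific and not covered here):

```python
from jinja2 import StrictUndefined
from jinja2.exceptions import SecurityError, TemplateSyntaxError, UndefinedError
from jinja2.sandbox import SandboxedEnvironment

def collect_template_errors(source: str, context: dict) -> list:
    """Compile and render with a trial context; return blocking errors (empty list = OK)."""
    env = SandboxedEnvironment(undefined=StrictUndefined)
    try:
        env.from_string(source).render(**context)
    except TemplateSyntaxError as e:
        return [f"TemplateSyntaxError: {e}"]
    except SecurityError as e:
        return [f"SecurityError: {e}"]
    except UndefinedError as e:
        return [f"UndefinedError: {e}"]
    return []

print(collect_template_errors("SELECT {{ watermak_value }}", {"watermark_value": "x"}))
print(collect_template_errors("{% if x %}", {}))            # unclosed tag
print(collect_template_errors("SELECT {{ x }}", {"x": 1}))  # []
```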

Warnings (Non-Blocking)

These issues are logged but do not prevent execution:

Warning                    Cause
Unused watermark           @watermark_column set but no {% if is_incremental() %} block found
Missing watermark filter   is_incremental() used but no watermark_value reference in the WHERE clause
Unused annotation          An annotation key is not recognized

Jinja Control Flow Reference

Standard Jinja2 control flow is supported:

Conditionals

pipeline.sql
{% if is_incremental() %}
WHERE updated_at > '{{ watermark_value }}'
{% endif %}
 
{% if is_scd2() %}
-- SCD2 specific columns
{% elif is_snapshot() %}
-- Snapshot specific columns
{% else %}
-- Default columns
{% endif %}

Comments

pipeline.sql
{# This is a Jinja comment — it will not appear in the compiled SQL #}
SELECT * FROM {{ ref('bronze.data') }}

Whitespace Control

Use the - modifier to strip whitespace around tags:

pipeline.sql
SELECT
    id,
    name
    {%- if is_scd2() -%}
    , valid_from
    , valid_to
    {%- endif %}
FROM {{ ref('bronze.data') }}
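The effect of the - modifier is easiest to see by rendering a small template with and without it (a standalone Jinja2 demonstration, not RAT-specific):

```python
from jinja2 import StrictUndefined
from jinja2.sandbox import SandboxedEnvironment

env = SandboxedEnvironment(undefined=StrictUndefined)

plain = env.from_string("a\n{% if true %}\nb\n{% endif %}\nc").render()
stripped = env.from_string("a\n{%- if true -%}\nb\n{%- endif -%}\nc").render()

print(repr(plain))     # 'a\n\nb\n\nc' -- the tag lines leave blank lines behind
print(repr(stripped))  # 'abc'        -- '-' strips the whitespace around each tag
```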

Complete Example

Here is a fully-annotated incremental pipeline using most template features:

ecommerce/pipelines/silver/enriched_orders/pipeline.sql
-- @description: Orders enriched with customer data, incrementally processed
-- @merge_strategy: incremental
-- @unique_key: order_id
-- @watermark_column: updated_at
-- @materialized: table
 
{# Join orders with customers and only process new records #}
SELECT
    o.order_id,
    o.customer_id,
    o.status,
    o.total_amount,
    o.currency,
    o.order_date,
    o.updated_at,
    c.name AS customer_name,
    c.segment AS customer_segment,
    c.country AS customer_country,
    '{{ run_started_at }}' AS _processed_at
FROM {{ ref('bronze.raw_orders') }} o
LEFT JOIN {{ ref('silver.customers') }} c
    ON o.customer_id = c.customer_id
WHERE o.order_id IS NOT NULL
{% if is_incremental() %}
  AND o.updated_at > '{{ watermark_value }}'
{% endif %}