
Python Pipelines

While SQL pipelines cover most use cases, sometimes you need the flexibility of Python. Python pipelines let you use PyArrow transformations, complex control flow, or DuckDB queries composed programmatically — all within a sandboxed execution environment.

When to Use Python vs SQL

Use SQL when...

Your transformation is expressible as a SELECT statement. SQL pipelines are simpler, more readable, and easier to maintain. The vast majority of pipelines should be SQL.

Use Python when...

You need complex transformations that are awkward or impossible in SQL: pivoting with dynamic columns, custom parsing logic, row-level functions with state, or programmatic DuckDB query composition.

Common reasons to choose Python:

  • Complex transformations — Multi-step logic that requires intermediate variables or loops
  • Dynamic SQL generation — Building queries programmatically based on schema introspection
  • Custom parsing — Handling non-standard file formats or nested structures
  • PyArrow operations — Column-level operations using Arrow’s compute functions
  • External data via DuckDB — Using DuckDB’s HTTP or other extensions for external reads
⚠️ Python pipelines run in a security sandbox. You cannot access the filesystem, make HTTP calls directly, spawn processes, or import most standard library modules. If you need network access, use DuckDB's built-in HTTP extension via duckdb_conn instead.

File Structure

A Python pipeline uses pipeline.py instead of pipeline.sql:

{namespace}/pipelines/{layer}/{pipeline_name}/pipeline.py

For example:

ecommerce/pipelines/silver/parsed_events/pipeline.py

The pipeline directory structure is the same as SQL pipelines. You can still have config.yaml and quality tests alongside the Python file:

ecommerce/pipelines/silver/parsed_events/
├── pipeline.py
├── config.yaml
└── tests/
    └── quality/
        └── test_no_null_event_ids.sql

A pipeline directory should contain either pipeline.sql or pipeline.py — never both. If both exist, RAT will use the SQL file and ignore the Python file.

Available Globals

When your pipeline.py executes, RAT injects several globals into the execution scope. These are the tools you use to read data, query DuckDB, and produce results.

  • duckdb_conn (SafeDuckDBConnection) — Pre-configured DuckDB connection with S3 and Iceberg extensions loaded. Use this for all queries.
  • pa (pyarrow module) — The PyArrow module, pre-imported for you. Use pa.Table, pa.array(), etc.
  • ref (Callable[[str], str]) — Same as SQL ref(). Returns an iceberg_scan(...) expression for use in SQL strings.
  • landing_zone (Callable[[str], str]) — Same as SQL landing_zone(). Returns the S3 glob path for the named zone.
  • this (str) — The current pipeline's own Iceberg table path (for self-referencing in incremental runs).
  • run_started_at (str) — ISO 8601 UTC timestamp of the current run.
  • is_incremental (bool) — True if the strategy is incremental and the target table already exists.
  • config (dict) — The parsed config.yaml contents (or an empty dict if none).
  • result (None) — The output variable. You must set this to a pa.Table.
  • log (Logger) — Logger instance. Use log.info(), log.warning(), log.error() for run logs.

The result Contract

Every Python pipeline must set the result global to a PyArrow Table. This is the data that RAT writes to Iceberg.

pipeline.py
# Minimal valid pipeline
result = pa.table({
    "id": [1, 2, 3],
    "name": ["Alice", "Bob", "Charlie"],
})
🚫 If your pipeline finishes without setting result, or sets it to something other than a pa.Table, the run will fail with an error. The result variable is the only way to produce output.

Using DuckDB via duckdb_conn

The duckdb_conn object is a pre-configured DuckDB connection with S3 and Iceberg extensions already loaded. Use it to query referenced tables, read from landing zones, or run any DuckDB SQL.

Querying Tables

pipeline.py
# Query a referenced table
orders = duckdb_conn.execute(f"""
    SELECT order_id, customer_id, total_amount, updated_at
    FROM {ref('bronze.raw_orders')}
    WHERE total_amount > 0
""").fetch_arrow_table()
 
result = orders

Running Multiple Queries

pipeline.py
# Read multiple sources
orders = duckdb_conn.execute(f"""
    SELECT * FROM {ref('silver.clean_orders')}
""").fetch_arrow_table()
 
customers = duckdb_conn.execute(f"""
    SELECT customer_id, segment FROM {ref('silver.customers')}
""").fetch_arrow_table()
 
# Join in DuckDB using the Arrow tables
duckdb_conn.register("orders_tbl", orders)
duckdb_conn.register("customers_tbl", customers)
 
result = duckdb_conn.execute("""
    SELECT
        o.*,
        c.segment AS customer_segment
    FROM orders_tbl o
    LEFT JOIN customers_tbl c ON o.customer_id = c.customer_id
""").fetch_arrow_table()

Reading from Landing Zones

pipeline.py
# Read CSVs from a landing zone
raw_data = duckdb_conn.execute(f"""
    SELECT *
    FROM read_csv_auto('{landing_zone("events")}', header=true)
""").fetch_arrow_table()
 
log.info(f"Read {raw_data.num_rows} rows from landing zone")
result = raw_data

PyArrow Transformations

You can use PyArrow’s compute functions for column-level transformations that are easier to express in Python than SQL:

pipeline.py
import pyarrow.compute as pc
 
# Read source data
events = duckdb_conn.execute(f"""
    SELECT event_id, event_type, payload, created_at
    FROM {ref('bronze.raw_events')}
""").fetch_arrow_table()
 
log.info(f"Processing {events.num_rows} events")
 
# PyArrow transformations
result = events.append_column(
    "event_date",
    pc.cast(events.column("created_at"), pa.date32())
).append_column(
    "event_type_upper",
    pc.utf8_upper(events.column("event_type"))
).append_column(
    "_loaded_at",
    pa.array([run_started_at] * events.num_rows, type=pa.string())
)

You can import pyarrow.compute — it is part of the pyarrow package, which is pre-imported as pa. Submodules of allowed packages are also allowed.

Incremental Python Pipelines

Python pipelines support the same incremental pattern as SQL. Use the is_incremental flag and this reference:

pipeline.py
# config.yaml should have:
# merge_strategy: incremental
# unique_key: event_id
# watermark_column: created_at
 
if is_incremental:
    # Get watermark from existing table
    watermark = duckdb_conn.execute(f"""
        SELECT MAX(created_at) AS wm FROM {this}
    """).fetchone()[0]
 
    log.info(f"Incremental run — watermark: {watermark}")
 
    result = duckdb_conn.execute(f"""
        SELECT event_id, event_type, payload, created_at
        FROM {ref('bronze.raw_events')}
        WHERE created_at > '{watermark}'
    """).fetch_arrow_table()
else:
    log.info("Full initial load")
    result = duckdb_conn.execute(f"""
        SELECT event_id, event_type, payload, created_at
        FROM {ref('bronze.raw_events')}
    """).fetch_arrow_table()
 
log.info(f"Produced {result.num_rows} rows")

Configuration with config.yaml

Since Python pipelines do not have annotation comments, all configuration goes into config.yaml:

config.yaml
merge_strategy: incremental
unique_key: event_id
watermark_column: created_at
description: Parsed and enriched event stream
archive_landing_zones: true

You can also read custom config values via the config global:

config.yaml
merge_strategy: full_refresh
description: Dynamic pivot table
 
# Custom config
pivot_column: category
value_columns:
  - revenue
  - quantity
pipeline.py
pivot_col = config.get("pivot_column", "category")
value_cols = config.get("value_columns", ["revenue"])
 
log.info(f"Pivoting on {pivot_col} with values {value_cols}")
 
# Use config to dynamically build query
source = duckdb_conn.execute(f"""
    SELECT * FROM {ref('silver.sales')}
""").fetch_arrow_table()
 
duckdb_conn.register("source_tbl", source)
 
# Build dynamic PIVOT query (one aggregate per value column —
# DuckDB's PIVOT ... USING takes a list of aggregate expressions)
using_sql = ", ".join(f"SUM({col}) AS {col}" for col in value_cols)
result = duckdb_conn.execute(f"""
    PIVOT source_tbl
    ON {pivot_col}
    USING {using_sql}
""").fetch_arrow_table()

Security Sandbox

Python pipelines execute in a sandboxed environment to prevent accidental or malicious operations. RAT enforces several layers of protection:

Blocked Built-in Functions

The following built-in functions are removed from the execution scope:

  • eval() — Arbitrary code execution
  • exec() — Arbitrary code execution
  • open() — Filesystem access
  • __import__() — Unrestricted module loading
  • compile() — Code compilation
  • globals() — Scope manipulation
  • locals() — Scope manipulation
  • breakpoint() — Debugger access
  • input() — Interactive input (would hang)
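As an illustration of the mechanism (a conceptual sketch, not RAT's actual implementation), removing built-ins from an execution scope can look like this; the names BLOCKED and safe_builtins are invented for the sketch:

```python
import builtins

# Names removed from the pipeline's execution scope (per the list above)
BLOCKED = {"eval", "exec", "open", "__import__", "compile",
           "globals", "locals", "breakpoint", "input"}

# Every other builtin stays available
safe_builtins = {
    name: getattr(builtins, name)
    for name in dir(builtins)
    if not name.startswith("_") and name not in BLOCKED
}

scope = {"__builtins__": safe_builtins, "result": None}

# Allowed builtins like len() still work inside the sandboxed scope...
exec("result = len([1, 2, 3])", scope)

# ...but a blocked name such as open() is simply not defined there.
try:
    exec("open('/etc/passwd')", scope)
    blocked = False
except NameError:
    blocked = True
```

Inside the sandboxed scope, `open` raises NameError rather than touching the filesystem, which is why the error you see mentions an undefined name rather than a permission failure.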

Blocked Imports

These modules cannot be imported:

  • os — Operating system access
  • sys — Interpreter manipulation
  • subprocess — Process spawning
  • shutil — Filesystem operations
  • socket — Network access
  • http — HTTP client/server
  • requests — HTTP client
  • urllib — URL operations
  • pathlib — Filesystem paths
  • importlib — Dynamic imports
  • ctypes — C library access
  • multiprocessing — Process spawning
  • threading — Thread spawning

AST Validation

RAT performs static analysis on your Python code before execution. The AST validator blocks:

  • Double-underscore (__dunder__) attribute access (e.g., obj.__class__.__bases__)
  • Any attempt to circumvent the sandbox through Python’s object model
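A minimal sketch of what such a static check can look like (illustrative only, not RAT's actual validator; BLOCKED_MODULES here is a shortened stand-in list):

```python
import ast

BLOCKED_MODULES = {"os", "sys", "subprocess", "socket", "requests"}

def find_violations(source: str) -> list[str]:
    """Scan pipeline source before execution and report sandbox violations."""
    violations = []
    for node in ast.walk(ast.parse(source)):
        # Block dunder attribute access like obj.__class__.__bases__
        if isinstance(node, ast.Attribute) and node.attr.startswith("__"):
            violations.append(f"dunder attribute: {node.attr}")
        # Block imports of disallowed modules (import os, import os.path, ...)
        if isinstance(node, ast.Import):
            for alias in node.names:
                if alias.name.split(".")[0] in BLOCKED_MODULES:
                    violations.append(f"blocked import: {alias.name}")
        # ...and from-imports (from subprocess import run)
        if isinstance(node, ast.ImportFrom):
            if node.module and node.module.split(".")[0] in BLOCKED_MODULES:
                violations.append(f"blocked import: {node.module}")
    return violations
```

Because this runs on the parse tree before any code executes, an attempt like `().__class__.__bases__` is rejected even though it never calls a blocked builtin.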

Why not just use a container? The sandbox is a first layer of defense inside the runner process. Container-level isolation is handled by Docker. The sandbox prevents pipelines from interfering with the runner’s own state or reading other pipelines’ data directly.

What IS Allowed

  • pyarrow — Full PyArrow module including pyarrow.compute, pyarrow.csv, pyarrow.parquet
  • duckdb queries — Via the provided duckdb_conn (which has S3 and Iceberg extensions)
  • json — For parsing JSON strings
  • re — Regular expressions
  • datetime — Date and time operations
  • math — Math functions
  • hashlib — Hashing
  • base64 — Encoding/decoding
  • Standard data types — collections, itertools, functools, dataclasses
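Since hashlib is on the allowed list, one common pattern is deriving a deterministic surrogate key from business columns before writing to Iceberg. A sketch (the helper name and truncation length are illustrative, not a RAT convention):

```python
import hashlib

def surrogate_key(*parts: str) -> str:
    """Deterministic 16-hex-char key from one or more business columns.

    Hypothetical helper: the same inputs always hash to the same key,
    so re-processing a file yields identical keys for merge/dedup.
    """
    joined = "|".join(parts)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()[:16]

key = surrogate_key("order-123", "2024-01-15")
```

A key like this pairs naturally with an incremental merge_strategy, where unique_key points at the derived column.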

Full Example: CSV Ingest with Transformation

Here is a complete Python pipeline that reads CSV files from a landing zone, parses a nested JSON column, and produces a clean Arrow table:

ecommerce/pipelines/bronze/parsed_events/pipeline.py
import json
import pyarrow.compute as pc
 
# Read raw CSVs from landing zone
raw = duckdb_conn.execute(f"""
    SELECT *
    FROM read_csv_auto(
        '{landing_zone("events")}',
        header = true,
        all_varchar = true
    )
""").fetch_arrow_table()
 
log.info(f"Read {raw.num_rows} raw events from landing zone")
 
# Parse the JSON 'properties' column into separate fields
event_ids = []
event_types = []
user_ids = []
timestamps = []
page_urls = []
durations = []
 
for i in range(raw.num_rows):
    row = {col: raw.column(col)[i].as_py() for col in raw.column_names}
 
    event_ids.append(row.get("event_id"))
    event_types.append(row.get("event_type"))
    user_ids.append(row.get("user_id"))
    timestamps.append(row.get("timestamp"))
 
    # Parse nested JSON (fall back to "{}" when the column is missing or null)
    props = json.loads(row.get("properties") or "{}")
    page_urls.append(props.get("page_url"))
    durations.append(props.get("duration_ms"))
 
result = pa.table({
    "event_id": pa.array(event_ids, type=pa.string()),
    "event_type": pa.array(event_types, type=pa.string()),
    "user_id": pa.array(user_ids, type=pa.string()),
    "timestamp": pa.array(timestamps, type=pa.string()),
    "page_url": pa.array(page_urls, type=pa.string()),
    "duration_ms": pa.array(durations, type=pa.string()),
    "_loaded_at": pa.array(
        [run_started_at] * raw.num_rows, type=pa.string()
    ),
})
 
log.info(f"Produced {result.num_rows} parsed events")
ecommerce/pipelines/bronze/parsed_events/config.yaml
merge_strategy: append_only
archive_landing_zones: true
description: Parse raw event CSVs and extract nested JSON properties

Tips and Best Practices

Prefer SQL

If your transformation can be expressed as a SELECT, use SQL. Python pipelines are harder to read, test, and maintain. Use Python only when SQL genuinely cannot do the job.

Use DuckDB for heavy lifting

Don't loop over rows in Python when DuckDB can do it in SQL. Register Arrow tables with duckdb_conn.register() and join/filter/aggregate in SQL.

Log progress

Use log.info() to emit progress messages. They appear in the portal's run logs and help with debugging.

Keep it small

Python pipelines should be short — ideally under 100 lines. If your pipeline is getting long, consider splitting it into multiple pipelines with ref() between them.
⚠️ Common mistake: Forgetting to set result. Every code path in your pipeline must end with result assigned to a pa.Table. If you have conditional branches, make sure each branch sets result.