Python Pipelines
While SQL pipelines cover most use cases, sometimes you need the flexibility of Python. Python pipelines let you use PyArrow transformations, complex control flow, or DuckDB queries composed programmatically — all within a sandboxed execution environment.
When to Use Python vs SQL
Use SQL when your transformation is expressible as a SELECT statement. SQL pipelines are simpler, more readable, and easier to maintain. The vast majority of pipelines should be SQL.
Use Python when you need complex transformations that are awkward or impossible in SQL: pivoting with dynamic columns, custom parsing logic, row-level functions with state, or programmatic DuckDB query composition.
Common reasons to choose Python:
- Complex transformations — Multi-step logic that requires intermediate variables or loops
- Dynamic SQL generation — Building queries programmatically based on schema introspection
- Custom parsing — Handling non-standard file formats or nested structures
- PyArrow operations — Column-level operations using Arrow’s compute functions
- External data via DuckDB — Using DuckDB’s HTTP or other extensions for external reads
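As an example of the dynamic-SQL case: column names coming from config should be validated before they are interpolated into a query string. A minimal sketch, using a hypothetical `safe_ident` helper (not part of RAT):

```python
import re

IDENT_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")

def safe_ident(name: str) -> str:
    """Allow only plain column identifiers before interpolating into SQL."""
    if not IDENT_RE.match(name):
        raise ValueError(f"unsafe identifier in config: {name!r}")
    return name

# Build an aggregate list programmatically from config-driven column names
value_cols = ["revenue", "quantity"]
values_sql = ", ".join(f"SUM({safe_ident(c)}) AS {c}" for c in value_cols)
print(values_sql)  # SUM(revenue) AS revenue, SUM(quantity) AS quantity
```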
Python pipelines run in a security sandbox. You cannot access the filesystem, make HTTP calls directly, spawn processes, or import most standard library modules. If you need network access, use DuckDB’s built-in HTTP extension via duckdb_conn instead.
File Structure
A Python pipeline uses pipeline.py instead of pipeline.sql:
```text
{namespace}/pipelines/{layer}/{pipeline_name}/pipeline.py
```

For example:

```text
ecommerce/pipelines/silver/parsed_events/pipeline.py
```

The pipeline directory structure is the same as for SQL pipelines. You can still have config.yaml and quality tests alongside the Python file:
```text
ecommerce/pipelines/silver/parsed_events/
├── pipeline.py
├── config.yaml
└── tests/
    └── quality/
        └── test_no_null_event_ids.sql
```

A pipeline directory should contain either pipeline.sql or pipeline.py — never both. If both exist, RAT will use the SQL file and ignore the Python file.
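The precedence rule can be illustrated with a small sketch (a hypothetical helper, not RAT's actual code), assuming a pipeline directory path:

```python
from pathlib import Path
import tempfile

def resolve_pipeline_file(pipeline_dir: Path) -> Path:
    """Sketch of the precedence rule: pipeline.sql wins if both files exist."""
    sql = pipeline_dir / "pipeline.sql"
    py = pipeline_dir / "pipeline.py"
    if sql.exists():
        return sql  # SQL takes precedence; the Python file is ignored
    if py.exists():
        return py
    raise FileNotFoundError(f"no pipeline.sql or pipeline.py in {pipeline_dir}")

# Demonstrate with a throwaway directory containing both files
with tempfile.TemporaryDirectory() as d:
    root = Path(d)
    (root / "pipeline.sql").write_text("SELECT 1")
    (root / "pipeline.py").write_text("result = None")
    chosen = resolve_pipeline_file(root)
    print(chosen.name)  # pipeline.sql
```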
Available Globals
When your pipeline.py executes, RAT injects several globals into the execution scope. These are the tools you use to read data, query DuckDB, and produce results.
| Global | Type | Description |
|---|---|---|
| `duckdb_conn` | `SafeDuckDBConnection` | Pre-configured DuckDB connection with S3 and Iceberg extensions loaded. Use this for all queries. |
| `pa` | pyarrow module | The PyArrow module, pre-imported for you. Use `pa.Table`, `pa.array()`, etc. |
| `ref` | `Callable[[str], str]` | Same as SQL `ref()`. Returns an `iceberg_scan(...)` expression for use in SQL strings. |
| `landing_zone` | `Callable[[str], str]` | Same as SQL `landing_zone()`. Returns the S3 glob path for the named zone. |
| `this` | `str` | The current pipeline’s own Iceberg table path (for self-referencing in incremental runs). |
| `run_started_at` | `str` | ISO 8601 UTC timestamp of the current run. |
| `is_incremental` | `bool` | `True` if the strategy is incremental and the target table already exists. |
| `config` | `dict` | The parsed config.yaml contents (or an empty dict if none). |
| `result` | `None` | The output variable. You must set this to a `pa.Table`. |
| `log` | `Logger` | Logger instance. Use `log.info()`, `log.warning()`, `log.error()` for run logs. |
The result Contract
Every Python pipeline must set the result global to a PyArrow Table. This is the data that RAT writes to Iceberg.
```python
# Minimal valid pipeline
result = pa.table({
    "id": [1, 2, 3],
    "name": ["Alice", "Bob", "Charlie"],
})
```

If your pipeline finishes without setting result, or sets it to something other than a pa.Table, the run will fail with an error. The result variable is the only way to produce output.
Using DuckDB via duckdb_conn
The duckdb_conn object is a pre-configured DuckDB connection with S3 and Iceberg extensions already loaded. Use it to query referenced tables, read from landing zones, or run any DuckDB SQL.
Querying Tables
```python
# Query a referenced table
orders = duckdb_conn.execute(f"""
    SELECT order_id, customer_id, total_amount, updated_at
    FROM {ref('bronze.raw_orders')}
    WHERE total_amount > 0
""").fetch_arrow_table()

result = orders
```

Running Multiple Queries
```python
# Read multiple sources
orders = duckdb_conn.execute(f"""
    SELECT * FROM {ref('silver.clean_orders')}
""").fetch_arrow_table()

customers = duckdb_conn.execute(f"""
    SELECT customer_id, segment FROM {ref('silver.customers')}
""").fetch_arrow_table()

# Join in DuckDB using the Arrow tables
duckdb_conn.register("orders_tbl", orders)
duckdb_conn.register("customers_tbl", customers)

result = duckdb_conn.execute("""
    SELECT
        o.*,
        c.segment AS customer_segment
    FROM orders_tbl o
    LEFT JOIN customers_tbl c ON o.customer_id = c.customer_id
""").fetch_arrow_table()
```

Reading from Landing Zones
```python
# Read CSVs from a landing zone
raw_data = duckdb_conn.execute(f"""
    SELECT *
    FROM read_csv_auto('{landing_zone("events")}', header=true)
""").fetch_arrow_table()

log.info(f"Read {raw_data.num_rows} rows from landing zone")
result = raw_data
```

PyArrow Transformations
You can use PyArrow’s compute functions for column-level transformations that are easier to express in Python than SQL:
```python
import pyarrow.compute as pc

# Read source data
events = duckdb_conn.execute(f"""
    SELECT event_id, event_type, payload, created_at
    FROM {ref('bronze.raw_events')}
""").fetch_arrow_table()

log.info(f"Processing {events.num_rows} events")

# PyArrow transformations
result = events.append_column(
    "event_date",
    pc.cast(events.column("created_at"), pa.date32())
).append_column(
    "event_type_upper",
    pc.utf8_upper(events.column("event_type"))
).append_column(
    "_loaded_at",
    pa.array([run_started_at] * events.num_rows, type=pa.string())
)
```

You can import pyarrow.compute — it is part of the pyarrow package, which is pre-imported as pa. Sub-modules of allowed packages are also allowed.
Incremental Python Pipelines
Python pipelines support the same incremental pattern as SQL. Use the is_incremental flag and this reference:
```python
# config.yaml should have:
#   merge_strategy: incremental
#   unique_key: event_id
#   watermark_column: created_at

if is_incremental:
    # Get the watermark from the existing table
    watermark = duckdb_conn.execute(f"""
        SELECT MAX(created_at) AS wm FROM {this}
    """).fetchone()[0]
    log.info(f"Incremental run — watermark: {watermark}")

    result = duckdb_conn.execute(f"""
        SELECT event_id, event_type, payload, created_at
        FROM {ref('bronze.raw_events')}
        WHERE created_at > '{watermark}'
    """).fetch_arrow_table()
else:
    log.info("Full initial load")
    result = duckdb_conn.execute(f"""
        SELECT event_id, event_type, payload, created_at
        FROM {ref('bronze.raw_events')}
    """).fetch_arrow_table()

log.info(f"Produced {result.num_rows} rows")
```

Configuration with config.yaml
Since Python pipelines do not have annotation comments, all configuration goes into config.yaml:
```yaml
merge_strategy: incremental
unique_key: event_id
watermark_column: created_at
description: Parsed and enriched event stream
archive_landing_zones: true
```

You can also read custom config values via the config global:
```yaml
merge_strategy: full_refresh
description: Dynamic pivot table

# Custom config
pivot_column: category
value_columns:
  - revenue
  - quantity
```

```python
pivot_col = config.get("pivot_column", "category")
value_cols = config.get("value_columns", ["revenue"])
log.info(f"Pivoting on {pivot_col} with values {value_cols}")

# Use config to dynamically build the query
source = duckdb_conn.execute(f"""
    SELECT * FROM {ref('silver.sales')}
""").fetch_arrow_table()
duckdb_conn.register("source_tbl", source)

# Build the dynamic PIVOT query (one aggregate per value column)
values_sql = ", ".join(f"SUM({col}) AS {col}" for col in value_cols)
result = duckdb_conn.execute(f"""
    PIVOT source_tbl
    ON {pivot_col}
    USING {values_sql}
""").fetch_arrow_table()
```

Security Sandbox
Python pipelines execute in a sandboxed environment to prevent accidental or malicious operations. RAT enforces several layers of protection:
Blocked Built-in Functions
The following built-in functions are removed from the execution scope:
| Blocked | Reason |
|---|---|
| `eval()` | Arbitrary code execution |
| `exec()` | Arbitrary code execution |
| `open()` | Filesystem access |
| `__import__()` | Unrestricted module loading |
| `compile()` | Code compilation |
| `globals()` | Scope manipulation |
| `locals()` | Scope manipulation |
| `breakpoint()` | Debugger access |
| `input()` | Interactive input (would hang) |
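One way a builtin blocklist like this can be enforced (an illustrative sketch of the runner side, not RAT's actual sandbox implementation) is to execute pipeline code against a namespace whose `__builtins__` simply omits the blocked names:

```python
import builtins

BLOCKED = {"eval", "exec", "open", "__import__", "compile",
           "globals", "locals", "breakpoint", "input"}

# Copy every public builtin except the blocked ones
safe_builtins = {
    name: getattr(builtins, name)
    for name in dir(builtins)
    if name not in BLOCKED and not name.startswith("_")
}

namespace = {"__builtins__": safe_builtins}
exec("total = sum([1, 2, 3])", namespace)  # allowed builtins still work
print(namespace["total"])  # 6

try:
    exec("open('secrets.txt')", namespace)  # blocked name is simply absent
except NameError as err:
    print(err)
```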
Blocked Imports
These modules cannot be imported:
| Module | Reason |
|---|---|
| `os` | Operating system access |
| `sys` | Interpreter manipulation |
| `subprocess` | Process spawning |
| `shutil` | Filesystem operations |
| `socket` | Network access |
| `http` | HTTP client/server |
| `requests` | HTTP client |
| `urllib` | URL operations |
| `pathlib` | Filesystem paths |
| `importlib` | Dynamic imports |
| `ctypes` | C library access |
| `multiprocessing` | Process spawning |
| `threading` | Thread spawning |
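The import blocklist can similarly be sketched as a guard wrapped around `__import__` (again illustrative, not RAT's actual code):

```python
BLOCKED_MODULES = {"os", "sys", "subprocess", "shutil", "socket", "http",
                   "requests", "urllib", "pathlib", "importlib", "ctypes",
                   "multiprocessing", "threading"}

def guarded_import(name, globals=None, locals=None, fromlist=(), level=0):
    """Reject blocked top-level modules, delegate everything else."""
    root = name.split(".")[0]
    if root in BLOCKED_MODULES:
        raise ImportError(f"module {root!r} is blocked in the pipeline sandbox")
    return __import__(name, globals, locals, fromlist, level)

json_mod = guarded_import("json")    # allowed module
print(json_mod.dumps({"ok": True}))  # {"ok": true}

try:
    guarded_import("os.path")        # blocked by its top-level package
except ImportError as err:
    print(err)
```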
AST Validation
RAT performs static analysis on your Python code before execution. The AST validator blocks:
- Double-underscore (`__dunder__`) attribute access (e.g., `obj.__class__.__bases__`)
- Any attempt to circumvent the sandbox through Python’s object model
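A simplified version of such an AST check (a sketch of the idea, not the validator RAT ships) could look like:

```python
import ast

def check_no_dunder_access(source: str) -> None:
    """Reject any attribute access whose name is a __dunder__."""
    for node in ast.walk(ast.parse(source)):
        if (isinstance(node, ast.Attribute)
                and node.attr.startswith("__")
                and node.attr.endswith("__")):
            raise ValueError(f"dunder attribute access blocked: {node.attr}")

check_no_dunder_access("result = events.num_rows")  # ordinary attributes pass

try:
    check_no_dunder_access("x = obj.__class__.__bases__")
except ValueError as err:
    print(err)
```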
Why not just use a container? The sandbox is a first layer of defense inside the runner process. Container-level isolation is handled by Docker. The sandbox prevents pipelines from interfering with the runner’s own state or reading other pipelines’ data directly.
What IS Allowed
- `pyarrow` — Full PyArrow module including `pyarrow.compute`, `pyarrow.csv`, `pyarrow.parquet`
- `duckdb` queries — Via the provided `duckdb_conn` (which has S3 and Iceberg extensions)
- `json` — For parsing JSON strings
- `re` — Regular expressions
- `datetime` — Date and time operations
- `math` — Math functions
- `hashlib` — Hashing
- `base64` — Encoding/decoding
- Standard data types — `collections`, `itertools`, `functools`, `dataclasses`
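For instance, hashlib is commonly used in pipelines to derive a deterministic surrogate key from several column values (a hypothetical helper illustrating the allowed modules, not a RAT API):

```python
import hashlib

def surrogate_key(*parts) -> str:
    """Deterministic 16-character key derived from the given column values."""
    joined = "|".join("" if p is None else str(p) for p in parts)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()[:16]

key = surrogate_key("ord-1001", "cust-42", "2024-06-01")
print(len(key))  # 16
print(key == surrogate_key("ord-1001", "cust-42", "2024-06-01"))  # True
```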
Full Example: CSV Ingest with Transformation
Here is a complete Python pipeline that reads CSV files from a landing zone, parses a nested JSON column, and produces a clean Arrow table:
```python
import json

# Read raw CSVs from the landing zone
raw = duckdb_conn.execute(f"""
    SELECT *
    FROM read_csv_auto(
        '{landing_zone("events")}',
        header = true,
        all_varchar = true
    )
""").fetch_arrow_table()

log.info(f"Read {raw.num_rows} raw events from landing zone")

# Parse the JSON 'properties' column into separate fields
event_ids = []
event_types = []
user_ids = []
timestamps = []
page_urls = []
durations = []

for i in range(raw.num_rows):
    row = {col: raw.column(col)[i].as_py() for col in raw.column_names}
    event_ids.append(row.get("event_id"))
    event_types.append(row.get("event_type"))
    user_ids.append(row.get("user_id"))
    timestamps.append(row.get("timestamp"))

    # Parse nested JSON (falling back to an empty object for missing values)
    props = json.loads(row.get("properties") or "{}")
    page_urls.append(props.get("page_url"))
    durations.append(props.get("duration_ms"))

result = pa.table({
    "event_id": pa.array(event_ids, type=pa.string()),
    "event_type": pa.array(event_types, type=pa.string()),
    "user_id": pa.array(user_ids, type=pa.string()),
    "timestamp": pa.array(timestamps, type=pa.string()),
    "page_url": pa.array(page_urls, type=pa.string()),
    "duration_ms": pa.array(durations, type=pa.int64()),
    "_loaded_at": pa.array(
        [run_started_at] * raw.num_rows, type=pa.string()
    ),
})

log.info(f"Produced {result.num_rows} parsed events")
```

And the accompanying config.yaml:

```yaml
merge_strategy: append_only
archive_landing_zones: true
description: Parse raw event CSVs and extract nested JSON properties
```

Tips and Best Practices
If your transformation can be expressed as a SELECT, use SQL. Python pipelines are harder to read, test, and maintain. Use Python only when SQL genuinely cannot do the job.
Don’t loop over rows in Python when DuckDB can do it in SQL. Register Arrow tables with duckdb_conn.register() and join/filter/aggregate in SQL.
Use log.info() to emit progress messages. They appear in the portal’s run logs and help with debugging.
Python pipelines should be short — ideally under 100 lines. If your pipeline is getting long, consider splitting it into multiple pipelines with ref() between them.
Common mistake: Forgetting to set result. Every code path in your pipeline must end with result assigned to a pa.Table. If you have conditional branches, make sure each branch sets result.
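One defensive pattern that makes the mistake hard to miss (a sketch using placeholder strings instead of real pa.Table values):

```python
# Initialise result explicitly, then fail fast if no branch assigned it.
result = None

mode = "full"  # stand-in for the is_incremental flag

if mode == "incremental":
    result = "incremental rows"  # placeholder for a pa.Table
elif mode == "full":
    result = "all rows"          # placeholder for a pa.Table

if result is None:
    raise RuntimeError("pipeline finished without setting result")

print(result)  # all rows
```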