Data Flow
This page shows how data moves through the RAT platform for the most common operations. Each flow is illustrated with a Mermaid sequence diagram and explained step by step.
Manual Pipeline Run
The most common flow: a user clicks “Run” in the portal, and the pipeline executes end-to-end.
Step-by-step
Portal sends run request
The user clicks “Run” in the portal. The portal sends POST /api/v1/runs with the pipeline identifier (namespace, layer, name) and trigger type (manual).
ratd creates a pending run
ratd inserts a new row in the runs table with status=pending and dispatches the execution to the runner via the SubmitPipeline gRPC call. The runner returns a run handle immediately --- execution is asynchronous.
Runner executes the 5-phase pipeline
The runner performs branch creation, config loading, SQL execution, Iceberg writes, and quality testing. See Pipeline Execution for the full breakdown.
Runner calls back to ratd
On completion (success or failure), the runner pushes the terminal status to ratd via an HTTP POST to the callback URL. This is faster than polling.
Portal picks up the result
The portal’s SWR hooks automatically revalidate the run status. The UI updates to show the completed run with duration, rows written, and any quality test results.
If the callback fails (network issue, ratd restart), ratd falls back to polling GetRunStatus on the runner every 60 seconds. The run will still be marked as complete, just with a slight delay.
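The callback-with-polling-fallback behavior can be sketched as follows. This is a minimal Python illustration, not ratd's actual code; the function name, the `max_polls` bound, and the status strings are assumptions.

```python
import time

POLL_INTERVAL_SECONDS = 60  # fallback polling cadence described above


def resolve_terminal_status(callback_status, poll_runner, max_polls=5,
                            sleep=time.sleep):
    """Return the run's terminal status.

    Prefer the status pushed by the runner's callback; if no callback
    arrived (network issue, ratd restart), fall back to polling the
    runner's GetRunStatus until a terminal status appears.
    """
    if callback_status in ("success", "failed"):
        return callback_status          # fast path: callback delivered
    for _ in range(max_polls):          # slow path: poll every 60 seconds
        status = poll_runner()
        if status in ("success", "failed"):
            return status
        sleep(POLL_INTERVAL_SECONDS)
    return "unknown"
```

Either way the run reaches a terminal state; the fallback only adds up to one polling interval of delay.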
Query Execution
When a user runs an interactive query in the portal’s query console.
Step-by-step
Portal sends the query
The user types SQL in the CodeMirror editor and clicks “Run”. The portal sends POST /api/v1/query/execute with the raw SQL text.
ratd validates and proxies
ratd performs basic validation (query size under 100 KB, timeout set to 30 seconds) and forwards the query to ratq via the ExecuteQuery gRPC call. ratd acts as a proxy --- it does not execute SQL itself.
ratq enforces read-only
Before execution, ratq scans the SQL for dangerous patterns. It blocks 25+ SQL statement types (CREATE, DROP, INSERT, etc.) and 20+ functions (write_parquet, read_csv, etc.). If any blocked pattern is detected, it returns an error immediately.
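The shape of that guard can be sketched like this. The blocklists below are a small illustrative subset (the real ratq lists are larger), and the function name and exact matching rules are assumptions:

```python
import re

# Illustrative subset of ratq's blocklists; the real lists cover 25+
# statement types and 20+ functions.
BLOCKED_STATEMENTS = {"CREATE", "DROP", "INSERT", "UPDATE", "DELETE", "ALTER"}
BLOCKED_FUNCTIONS = {"write_parquet", "read_csv", "copy"}


def check_read_only(sql: str):
    """Return an error string if the SQL matches a blocked pattern, else None."""
    first_word = sql.lstrip().split(None, 1)[0].upper() if sql.strip() else ""
    if first_word in BLOCKED_STATEMENTS:
        return f"blocked statement: {first_word}"
    for fn in BLOCKED_FUNCTIONS:
        # Match the function name followed by an opening parenthesis.
        if re.search(rf"\b{re.escape(fn)}\s*\(", sql, re.IGNORECASE):
            return f"blocked function: {fn}"
    return None
```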
ratq executes via DuckDB
The query runs against ratq’s persistent DuckDB connection, which has Iceberg tables pre-registered from the last catalog refresh (every 30 seconds). DuckDB reads Parquet files from MinIO via iceberg_scan().
Results return to the portal
Results are serialized as columnar data (Arrow format) from ratq to ratd, then converted to JSON for the portal. The portal renders results in a data table with type-colored values, row numbers, and zebra striping.
Queries have a 30-second timeout and a 100 KB size limit. If your query hits the timeout, consider optimizing it with filters, limits, or by creating a Gold-layer pipeline for pre-aggregation.
File Upload to Landing Zone
When a user uploads a file (CSV, Parquet, JSON) to a landing zone.
Step-by-step
User uploads a file
The user drags a file into a landing zone in the portal. The portal sends a multipart upload to POST /api/v1/storage/upload. ratd validates the file (size limits, allowed content types, no path traversal) and writes it to MinIO at s3://rat/{namespace}/landing/{zone_name}/{filename}.
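A sketch of the validation checks, assuming a 100 MB size limit and the listed content types (both assumptions; the actual limits and the function name are illustrative):

```python
import posixpath

MAX_UPLOAD_BYTES = 100 * 1024 * 1024  # assumed limit, for illustration
ALLOWED_TYPES = {"text/csv", "application/json",
                 "application/vnd.apache.parquet"}


def validate_upload(filename: str, size: int, content_type: str):
    """Reject oversized files, unexpected content types, and path traversal."""
    if size > MAX_UPLOAD_BYTES:
        return "file too large"
    if content_type not in ALLOWED_TYPES:
        return "content type not allowed"
    # Path traversal: the stored name must normalize to a bare basename.
    normalized = posixpath.normpath(filename)
    if normalized != posixpath.basename(normalized) or normalized.startswith("."):
        return "invalid filename"
    return None
```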
File is registered in Postgres
ratd inserts a row in the landing_files table with the file’s S3 path, size, content type, and upload timestamp.
Pipeline reads from landing zone
When a bronze pipeline uses {{ landing_zone("orders") }}, the Jinja template resolves to the S3 glob pattern s3://rat/{namespace}/landing/orders/**. DuckDB’s read_csv_auto() or read_parquet() reads all files matching the pattern.
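The template resolution itself is simple string expansion; a minimal Python sketch (the function is illustrative, standing in for the Jinja helper):

```python
def landing_zone(namespace: str, zone_name: str) -> str:
    """Resolve a landing-zone reference to its S3 glob, as described above."""
    return f"s3://rat/{namespace}/landing/{zone_name}/**"


# The resolved glob is what DuckDB's reader functions ultimately see:
sql = f"SELECT * FROM read_csv_auto('{landing_zone('sales', 'orders')}')"
```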
Post-run archival (optional)
If the pipeline has @archive_landing_zones: true, after a successful run, the source files are moved from the landing zone to _processed/{run_id}/. This prevents reprocessing on subsequent runs.
Scheduled Run
The background scheduler in ratd evaluates cron expressions and triggers pipeline runs automatically.
Step-by-step
Scheduler ticks every 30 seconds
The ratd scheduler runs a background goroutine that ticks every 30 seconds. On each tick, it queries the schedules table for rows where enabled = true AND next_run_at <= now().
Advisory lock prevents duplicate execution
Before processing due schedules, the scheduler acquires a Postgres advisory lock. This ensures that only one ratd instance (in a multi-replica scenario) evaluates schedules. Other instances skip the tick.
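One tick can be sketched as follows. The lock key (42), the column list, and the db helper methods are assumptions for illustration; only the WHERE clause and the try-lock/unlock pattern come from the description above.

```python
# Illustrative SQL for one scheduler tick; the advisory lock key 42
# and the selected columns are assumptions.
ADVISORY_LOCK_SQL = "SELECT pg_try_advisory_lock(42)"
DUE_SCHEDULES_SQL = """
    SELECT id, pipeline_id, cron_expr
    FROM schedules
    WHERE enabled = true AND next_run_at <= now()
"""


def tick(db):
    """Run one scheduler tick; skip entirely if another replica holds the lock."""
    if not db.query_one(ADVISORY_LOCK_SQL):
        return []                      # another ratd instance owns this tick
    try:
        return db.query_all(DUE_SCHEDULES_SQL)
    finally:
        db.execute("SELECT pg_advisory_unlock(42)")
```

pg_try_advisory_lock returns immediately rather than blocking, which is why losing replicas can simply skip the tick and retry 30 seconds later.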
Run is created and dispatched
For each due schedule, the scheduler creates a run record with the trigger set to the schedule type (e.g., schedule:hourly) and dispatches it to the runner via the WarmPoolExecutor.
Schedule is updated
After dispatching, the scheduler updates the schedule’s last_run_at to now and computes the next_run_at using the cron expression via robfig/cron/v3.
Catch-up policy
If ratd was down and missed a scheduled run, the scheduler uses a catch-up-once policy: it fires the pipeline once (not once per missed tick). This prevents a flood of duplicate runs after a restart.
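The catch-up-once policy can be sketched with a fixed interval (the real scheduler derives the next tick from the cron expression via robfig/cron/v3; the function name is illustrative):

```python
from datetime import datetime, timedelta


def next_after_catch_up(next_run_at, now, interval):
    """Catch-up-once: if ticks were missed while ratd was down, fire one
    run now and advance next_run_at past `now`, rather than firing once
    per missed tick."""
    fire = next_run_at <= now          # at least one tick was missed
    next_run = next_run_at
    while next_run <= now:             # skip all missed ticks at once
        next_run += interval
    return fire, next_run
```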
The scheduler supports standard 5-field cron expressions: minute hour day month weekday. Examples: 0 * * * * (hourly), */5 * * * * (every 5 minutes), 0 2 * * 1 (2 AM every Monday).
Pipeline Trigger (Event-Based)
When a pipeline is configured with a trigger (e.g., “run when upstream pipeline completes”).
How Triggers Work
Pipeline triggers are stored in the pipeline_triggers table. Each trigger has:
- type --- the event type (e.g., pipeline_success, landing_zone_upload)
- config --- JSONB configuration (e.g., which upstream pipeline to watch)
- cooldown_seconds --- minimum time between trigger fires (prevents cascading)
- enabled --- toggle on/off
When an event occurs (e.g., a run completes successfully), ratd evaluates all matching triggers. If a trigger matches and is not in cooldown, it creates a new run and dispatches it.
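The per-trigger evaluation reduces to three checks; a Python sketch (the last_fired_at field and dict representation are assumptions for illustration):

```python
from datetime import datetime, timedelta


def should_fire(trigger, event_type, now):
    """Evaluate one trigger row against an event: the type must match,
    the trigger must be enabled, and the cooldown window must have
    elapsed since the last fire."""
    if not trigger["enabled"] or trigger["type"] != event_type:
        return False
    last = trigger.get("last_fired_at")
    cooldown = timedelta(seconds=trigger["cooldown_seconds"])
    return last is None or now - last >= cooldown
```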
Schema Introspection
When a user browses table schemas in the portal.
Catalog Refresh
ratq maintains an in-memory cache of the Iceberg catalog. Every 30 seconds, it refreshes this cache by querying Nessie for the current table list on the main branch. This means:
- New tables appear in the schema browser within 30 seconds of being merged
- Deleted tables disappear within 30 seconds
- Branch-isolated tables (mid-run) are never visible in the query console
Log Streaming
When a user watches pipeline logs in real-time during execution.
How Log Streaming Works
- The portal opens a Server-Sent Events (SSE) connection to GET /api/v1/runs/{id}/logs
- ratd opens a gRPC streaming call to the runner's StreamLogs RPC
- The runner pushes log entries as they are produced (each phase, SQL compilation, DuckDB execution, Iceberg writes, quality test results)
- ratd converts each gRPC LogEntry message to an SSE event and pushes it to the portal
- When the run completes, the stream closes
The runner keeps logs in a bounded deque (in-memory ring buffer). On run completion, logs are persisted to the runs.logs JSONB column in Postgres.
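The bounded-deque-then-persist pattern can be sketched like this; the buffer size and the class and field names are assumptions, and only the eviction and JSONB serialization behavior come from the description above:

```python
import json
from collections import deque

MAX_BUFFERED_LOG_LINES = 1000  # assumed bound, for illustration


class RunLogBuffer:
    """In-memory ring buffer for a run's logs: once the bound is reached,
    the oldest entries are evicted, and the survivors are serialized to
    JSON for the runs.logs JSONB column on completion."""

    def __init__(self, maxlen=MAX_BUFFERED_LOG_LINES):
        self._entries = deque(maxlen=maxlen)

    def append(self, phase: str, message: str):
        self._entries.append({"phase": phase, "message": message})

    def to_jsonb(self) -> str:
        return json.dumps(list(self._entries))
```

The bound keeps a chatty run from exhausting the runner's memory, at the cost of dropping the earliest lines of an unusually verbose run.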
Data Flow Summary
| Operation | Path | Protocol |
|---|---|---|
| Pipeline run | Portal → ratd → Runner → Nessie/MinIO → ratd (callback) | REST + gRPC + S3 + REST |
| Query | Portal → ratd → ratq → MinIO (read) | REST + gRPC + S3 |
| File upload | Portal → ratd → MinIO | REST + S3 |
| Schedule tick | ratd Scheduler → Runner → Nessie/MinIO → ratd (callback) | Internal + gRPC + S3 + REST |
| Log stream | Portal → ratd → Runner | SSE + gRPC streaming |
| Schema browse | Portal → ratd → ratq | REST + gRPC |
| Trigger fire | ratd (event) → Runner → Nessie/MinIO → ratd (callback) | Internal + gRPC + S3 + REST |