Data Flow

This page shows how data moves through the RAT platform for the most common operations. Each flow is illustrated with a Mermaid sequence diagram and explained step by step.


Manual Pipeline Run

The most common flow: a user clicks “Run” in the portal, and the pipeline executes end-to-end.

Step-by-step

Portal sends run request

The user clicks “Run” in the portal. The portal sends POST /api/v1/runs with the pipeline identifier (namespace, layer, name) and trigger type (manual).

ratd creates a pending run

ratd inserts a new row in the runs table with status=pending and dispatches the execution to the runner via the SubmitPipeline gRPC call. The runner returns a run handle immediately; execution is asynchronous.

Runner executes the 5-phase pipeline

The runner performs branch creation, config loading, SQL execution, Iceberg writes, and quality testing. See Pipeline Execution for the full breakdown.

Runner calls back to ratd

On completion (success or failure), the runner pushes the terminal status to ratd via an HTTP POST to the callback URL, which surfaces results faster than waiting for a poll cycle.

Portal picks up the result

The portal’s SWR hooks automatically revalidate the run status. The UI updates to show the completed run with duration, rows written, and any quality test results.

If the callback fails (network issue, ratd restart), ratd falls back to polling GetRunStatus on the runner every 60 seconds. The run will still be marked as complete, just with a slight delay.
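The polling fallback can be sketched as a simple loop (an illustrative Python sketch; ratd itself is written in Go, and the function name, status strings, and retry cap here are assumptions, not the real implementation):

```python
import time

def wait_for_terminal(get_status, poll_interval=60, max_polls=120):
    """Fallback path: poll the runner's GetRunStatus until the run reaches
    a terminal state. Used only when the completion callback is lost
    (network issue, ratd restart). Names and limits are hypothetical."""
    for _ in range(max_polls):
        status = get_status()
        if status in ("success", "failed"):
            return status
        time.sleep(poll_interval)  # 60 s between polls, per the doc
    return "unknown"
```

In the happy path this loop never runs; the callback delivers the terminal status first.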


Query Execution

When a user runs an interactive query in the portal’s query console.

Step-by-step

Portal sends the query

The user types SQL in the CodeMirror editor and clicks “Run”. The portal sends POST /api/v1/query/execute with the raw SQL text.

ratd validates and proxies

ratd performs basic validation (query size under 100 KB, timeout set to 30 seconds) and forwards the query to ratq via the ExecuteQuery gRPC call. ratd acts as a pure proxy: it does not execute SQL itself.

ratq enforces read-only

Before execution, ratq scans the SQL for dangerous patterns. It blocks 25+ SQL statements (CREATE, DROP, INSERT, etc.) and 20+ functions (write_parquet, read_csv, etc.). If any blocked pattern is detected, it returns an error immediately.
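The guard amounts to scanning the statement for blocked keywords before it ever reaches DuckDB. A minimal Python sketch (ratq's real blocklist covers 25+ statements and 20+ functions; the subset and helper name below are illustrative only):

```python
import re

# Illustrative subsets -- the real blocklists are larger.
BLOCKED_STATEMENTS = {"create", "drop", "insert", "update", "delete", "alter"}
BLOCKED_FUNCTIONS = {"write_parquet", "read_csv", "copy"}

def check_read_only(sql: str) -> None:
    """Reject SQL containing a blocked statement or function keyword.
    Token-based scan: naive, so a column literally named "insert" would
    also be flagged -- a conservative trade-off for a read-only console."""
    for token in re.findall(r"[a-z_]+", sql.lower()):
        if token in BLOCKED_STATEMENTS or token in BLOCKED_FUNCTIONS:
            raise ValueError(f"blocked pattern: {token}")
```

If no exception is raised, the query proceeds to DuckDB execution.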

ratq executes via DuckDB

The query runs against ratq’s persistent DuckDB connection, which has Iceberg tables pre-registered from the last catalog refresh (every 30 seconds). DuckDB reads Parquet files from MinIO via iceberg_scan().

Results return to the portal

Results are serialized as columnar data (Arrow format) from ratq to ratd, then converted to JSON for the portal. The portal renders results in a data table with type-colored values, row numbers, and zebra striping.

⚠️ Queries have a 30-second timeout and a 100 KB size limit. If your query hits the timeout, consider optimizing it with filters or limits, or creating a Gold-layer pipeline for pre-aggregation.


File Upload to Landing Zone

When a user uploads a file (CSV, Parquet, JSON) to a landing zone.

Step-by-step

User uploads a file

The user drags a file into a landing zone in the portal. The portal sends a multipart upload to POST /api/v1/storage/upload. ratd validates the file (size limits, allowed content types, no path traversal) and writes it to MinIO under the s3://rat/{namespace}/landing/{zone_name}/ prefix.
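The upload checks can be sketched as follows (a Python illustration; the allowed content types, size limit, and function name are assumptions, not ratd's actual values):

```python
import posixpath

ALLOWED_TYPES = {"text/csv", "application/json", "application/octet-stream"}  # hypothetical
MAX_SIZE = 1024**3  # hypothetical 1 GiB cap

def validate_upload(filename: str, size: int, content_type: str) -> str:
    """Validate an upload before writing it to the landing-zone prefix."""
    if size > MAX_SIZE:
        raise ValueError("file too large")
    if content_type not in ALLOWED_TYPES:
        raise ValueError("content type not allowed")
    # Path-traversal guard: the object key must stay inside the zone prefix.
    clean = posixpath.normpath(filename)
    if clean != filename or "/" in clean or clean.startswith("."):
        raise ValueError("invalid filename")
    return clean
```

Only after all three checks pass is the object written to MinIO and registered in Postgres.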

File is registered in Postgres

ratd inserts a row in the landing_files table with the file’s S3 path, size, content type, and upload timestamp.

Pipeline reads from landing zone

When a bronze pipeline uses {{ landing_zone("orders") }}, the Jinja template resolves to the S3 glob pattern s3://rat/{namespace}/landing/orders/**. DuckDB’s read_csv_auto() or read_parquet() reads all files matching the pattern.
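The template resolution is a straightforward string expansion. A minimal sketch (the helper signature is hypothetical; in the real system the namespace comes from the pipeline's context rather than an argument):

```python
def landing_zone(namespace: str, zone: str) -> str:
    """Resolve {{ landing_zone("orders") }} to the S3 glob DuckDB will scan."""
    return f"s3://rat/{namespace}/landing/{zone}/**"
```

DuckDB then expands the `**` glob, so every file under the zone prefix is read in one pass.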

Post-run archival (optional)

If the pipeline has @archive_landing_zones: true, after a successful run, the source files are moved from the landing zone to _processed/{run_id}/. This prevents reprocessing on subsequent runs.


Scheduled Run

The background scheduler in ratd evaluates cron expressions and triggers pipeline runs automatically.

Step-by-step

Scheduler ticks every 30 seconds

The ratd scheduler runs a background goroutine that ticks every 30 seconds. On each tick, it queries the schedules table for rows where enabled = true AND next_run_at <= now().

Advisory lock prevents duplicate execution

Before processing due schedules, the scheduler acquires a Postgres advisory lock. This ensures that only one ratd instance (in a multi-replica scenario) evaluates schedules. Other instances skip the tick.

Run is created and dispatched

For each due schedule, the scheduler creates a run record with the trigger set to the schedule type (e.g., schedule:hourly) and dispatches it to the runner via the WarmPoolExecutor.

Schedule is updated

After dispatching, the scheduler updates the schedule’s last_run_at to now and computes the next_run_at using the cron expression via robfig/cron/v3.

Catch-up policy

If ratd was down and missed a scheduled run, the scheduler uses a catch-up-once policy: it fires the pipeline once (not once per missed tick). This prevents a flood of duplicate runs after a restart.
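The catch-up-once policy can be sketched with a fixed interval standing in for the cron expression (the real scheduler computes occurrences via robfig/cron/v3; the function below is an illustrative assumption):

```python
from datetime import datetime, timedelta

def catch_up(next_run_at: datetime, now: datetime, interval: timedelta):
    """Catch-up-once: if one or more ticks were missed while ratd was down,
    fire a single run and advance next_run_at past `now`.
    Returns (runs_fired, new_next_run_at)."""
    if next_run_at > now:
        return 0, next_run_at           # nothing due
    while next_run_at <= now:           # skip ALL missed occurrences...
        next_run_at += interval
    return 1, next_run_at               # ...but fire exactly one run
```

However long the outage, the loop collapses every missed occurrence into a single fire, which is what prevents the post-restart flood.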

The scheduler supports standard 5-field cron expressions: minute hour day month weekday. Examples: 0 * * * * (hourly), */5 * * * * (every 5 minutes), 0 2 * * 1 (2 AM every Monday).


Pipeline Trigger (Event-Based)

When a pipeline is configured with a trigger (e.g., “run when upstream pipeline completes”).

How Triggers Work

Pipeline triggers are stored in the pipeline_triggers table. Each trigger has:

  • type --- the event type (e.g., pipeline_success, landing_zone_upload)
  • config --- JSONB configuration (e.g., which upstream pipeline to watch)
  • cooldown_seconds --- minimum time between trigger fires (prevents cascading)
  • enabled --- toggle on/off

When an event occurs (e.g., a run completes successfully), ratd evaluates all matching triggers. If a trigger matches and is not in cooldown, it creates a new run and dispatches it.
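The cooldown check on each matching trigger reduces to a timestamp comparison. An illustrative Python sketch (the helper name and the treatment of never-fired triggers are assumptions):

```python
from datetime import datetime, timedelta

def should_fire(enabled: bool, last_fired_at, cooldown_seconds: int,
                now: datetime) -> bool:
    """Decide whether a matching trigger may create a run right now."""
    if not enabled:
        return False
    if last_fired_at is None:          # never fired: no cooldown to respect
        return True
    return now - last_fired_at >= timedelta(seconds=cooldown_seconds)
```

A trigger still inside its cooldown window simply skips this event; it does not queue a deferred run.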


Schema Introspection

When a user browses table schemas in the portal.

Catalog Refresh

ratq maintains an in-memory cache of the Iceberg catalog. Every 30 seconds, it refreshes this cache by querying Nessie for the current table list on the main branch. This means:

  • New tables appear in the schema browser within 30 seconds of being merged
  • Deleted tables disappear within 30 seconds
  • Branch-isolated tables (mid-run) are never visible in the query console
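The cache itself is a small guarded structure: a background loop swaps in a fresh table list every 30 seconds while readers see a consistent snapshot. A Python sketch (ratq is not Python, and the class and method names here are hypothetical):

```python
import threading

class CatalogCache:
    """In-memory Iceberg table list, periodically refreshed from Nessie."""

    def __init__(self, fetch_tables):
        self._fetch = fetch_tables      # e.g. queries Nessie's main branch
        self._lock = threading.Lock()
        self._tables = []

    def refresh(self):
        """Called every 30 s by a background loop; swaps in the new list."""
        tables = self._fetch()          # fetch outside the lock
        with self._lock:
            self._tables = tables

    def tables(self):
        with self._lock:
            return list(self._tables)   # snapshot copy for readers
```

Because only main-branch tables are fetched, branch-isolated tables created mid-run never enter the cache.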

Log Streaming

When a user watches pipeline logs in real-time during execution.

How Log Streaming Works

  1. The portal opens a Server-Sent Events (SSE) connection to GET /api/v1/runs/{id}/logs
  2. ratd opens a gRPC streaming call to the runner’s StreamLogs RPC
  3. The runner pushes log entries as they are produced (each phase, SQL compilation, DuckDB execution, Iceberg writes, quality test results)
  4. ratd converts each gRPC LogEntry message to an SSE event and pushes it to the portal
  5. When the run completes, the stream closes

The runner keeps logs in a bounded deque (in-memory ring buffer). On run completion, logs are persisted to the runs.logs JSONB column in Postgres.
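The bounded deque drops the oldest entries once full, so a very chatty run cannot exhaust the runner's memory. A minimal Python sketch (the capacity and class name are assumptions; the runner's actual buffer is internal):

```python
from collections import deque

class LogBuffer:
    """Bounded in-memory log buffer: once full, oldest entries are dropped."""

    def __init__(self, max_entries: int = 10_000):  # capacity is hypothetical
        self._entries = deque(maxlen=max_entries)

    def append(self, entry: str):
        self._entries.append(entry)     # evicts the oldest entry when full

    def snapshot(self):
        """Entries to persist to the runs.logs JSONB column on completion."""
        return list(self._entries)
```

Streaming consumers (the SSE path) receive every entry as produced; only the persisted snapshot is subject to the bound.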


Data Flow Summary

| Operation | Path | Protocol |
| --- | --- | --- |
| Pipeline run | Portal → ratd → Runner → Nessie/MinIO → ratd (callback) | REST + gRPC + S3 + REST |
| Query | Portal → ratd → ratq → MinIO (read) | REST + gRPC + S3 |
| File upload | Portal → ratd → MinIO | REST + S3 |
| Schedule tick | ratd Scheduler → Runner → Nessie/MinIO → ratd (callback) | Internal + gRPC + S3 + REST |
| Log stream | Portal → ratd → Runner | SSE + gRPC streaming |
| Schema browse | Portal → ratd → ratq | REST + gRPC |
| Trigger fire | ratd (event) → Runner → Nessie/MinIO → ratd (callback) | Internal + gRPC + S3 + REST |