Database Schema

RAT uses PostgreSQL 16.4 to store platform metadata. All actual data (tables, files) lives in S3 --- Postgres is metadata only. The schema consists of 20 tables organized into core, supporting, and infrastructure groups.


Entity Relationship Diagram


Table Groups

Core Tables

The primary entities that power the platform.

TableRows (typical)Purpose
namespaces1-10Logical groupings for pipelines and data
pipelines10-1000Pipeline registry (metadata, S3 path, ownership)
runs100-100KPipeline execution history
schedules10-500Cron-based pipeline triggers

Quality Tables

Quality testing and validation.

TableRows (typical)Purpose
quality_tests50-5000Registered quality test definitions
quality_results500-50KQuality test execution results

Supporting Tables

Additional features that extend core functionality.

TableRows (typical)Purpose
pipeline_triggers10-500Event-based pipeline triggers
pipeline_versions100-10KPipeline version snapshots (publish history)
table_metadata10-1000User-authored table/column descriptions
landing_zones5-100Landing zone definitions
landing_files100-10KUploaded file tracking

Multi-User Tables (Pro)

Used by Pro plugins for access control.

TableRows (typical)Purpose
ownership10-1000Per-object ownership registry
shares10-5000Resource access grants
projects1-50Project groupings
project_members10-500Project membership

Infrastructure Tables

Platform configuration and maintenance.

TableRows (typical)Purpose
plugins0-5Registered plugin health tracking
audit_log1K-1MAudit trail for state-changing operations
platform_settings2-5System configuration (retention, feature flags)
reaper_status1 (singleton)Data retention daemon status

Core Tables

namespaces

Logical groupings for organizing pipelines and data. The Community Edition auto-creates a default namespace on first startup.

namespaces
CREATE TABLE namespaces (
    id          UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name        VARCHAR(63) UNIQUE NOT NULL,
    description TEXT NOT NULL DEFAULT '',
    created_at  TIMESTAMPTZ NOT NULL DEFAULT now(),
    created_by  VARCHAR(255)
);
ColumnTypeNotes
idUUIDAuto-generated primary key
nameVARCHAR(63)Unique namespace slug (e.g., default, ecommerce)
descriptionTEXTHuman-readable description
created_atTIMESTAMPTZCreation timestamp
created_byVARCHAR(255)NULL for Community (single user)

Seed data: INSERT INTO namespaces (name) VALUES ('default') ON CONFLICT DO NOTHING;

pipelines

The central registry of all pipelines. S3 is the source of truth for code; Postgres tracks metadata, ownership, and relationships.

pipelines
CREATE TABLE pipelines (
    id                  UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    namespace           VARCHAR(63) NOT NULL REFERENCES namespaces(name) ON DELETE CASCADE,
    layer               VARCHAR(10) NOT NULL CHECK (layer IN ('bronze', 'silver', 'gold')),
    name                VARCHAR(255) NOT NULL,
    type                VARCHAR(10) NOT NULL DEFAULT 'sql' CHECK (type IN ('sql', 'python')),
    s3_path             VARCHAR(1024) NOT NULL,
    description         TEXT DEFAULT '',
    owner               VARCHAR(255),
    published_at        TIMESTAMPTZ,
    published_versions  JSONB NOT NULL DEFAULT '{}',
    draft_dirty         BOOLEAN NOT NULL DEFAULT false,
    max_versions        INTEGER NOT NULL DEFAULT 50,
    retention_config    JSONB,
    created_at          TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at          TIMESTAMPTZ NOT NULL DEFAULT now(),
    deleted_at          TIMESTAMPTZ,
    UNIQUE(namespace, layer, name)
);
ColumnTypeNotes
idUUIDAuto-generated primary key
namespaceVARCHAR(63)FK to namespaces. Cascades on delete.
layerVARCHAR(10)Constrained to bronze, silver, gold
nameVARCHAR(255)Pipeline name (unique within namespace+layer)
typeVARCHAR(10)sql or python
s3_pathVARCHAR(1024)Full S3 key prefix for the pipeline directory
descriptionTEXTHuman-readable description
ownerVARCHAR(255)NULL for Community (single user)
published_atTIMESTAMPTZWhen the pipeline was last published
published_versionsJSONBMap of file path to S3 version ID (pinned snapshot)
draft_dirtyBOOLEANTrue when HEAD differs from published versions
max_versionsINTEGERMax version history to retain (default 50)
retention_configJSONBPer-pipeline retention overrides (NULL = system default)
created_atTIMESTAMPTZCreation timestamp
updated_atTIMESTAMPTZLast modification timestamp
deleted_atTIMESTAMPTZSoft delete timestamp (NULL = active)

Indexes:

CREATE INDEX idx_pipelines_namespace ON pipelines(namespace);
CREATE INDEX idx_pipelines_layer ON pipelines(namespace, layer);
CREATE INDEX idx_pipelines_owner ON pipelines(owner) WHERE owner IS NOT NULL;

The unique constraint is (namespace, layer, name), meaning you can have a pipeline called clean_orders in both silver and gold layers of the same namespace, but not two in the same layer.

runs

Every pipeline execution is tracked as a run. Logs are stored inline as JSONB. Phase profiling data is stored for performance analysis.

runs
CREATE TABLE runs (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    pipeline_id     UUID NOT NULL REFERENCES pipelines(id) ON DELETE CASCADE,
    status          VARCHAR(20) NOT NULL DEFAULT 'pending'
                    CHECK (status IN ('pending', 'running', 'success', 'failed', 'cancelled')),
    trigger         VARCHAR(100) NOT NULL DEFAULT 'manual',
    started_at      TIMESTAMPTZ,
    finished_at     TIMESTAMPTZ,
    duration_ms     INT,
    rows_written    BIGINT,
    error           TEXT,
    logs_s3_path    VARCHAR(1024),
    logs            JSONB,
    phase_profiles  JSONB,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT now()
);
ColumnTypeNotes
statusVARCHAR(20)Lifecycle: pending -> running -> success/failed/cancelled
triggerVARCHAR(100)How the run was triggered: manual, schedule:hourly, trigger:upstream_name
duration_msINTTotal execution time in milliseconds
rows_writtenBIGINTNumber of rows written to Iceberg
errorTEXTError message if run failed
logsJSONBStructured log entries (persisted on completion)
phase_profilesJSONBPer-phase timing data (see Pipeline Execution)

Indexes:

CREATE INDEX idx_runs_pipeline ON runs(pipeline_id, created_at DESC);
CREATE INDEX idx_runs_status ON runs(status) WHERE status IN ('pending', 'running');

The idx_runs_status partial index covers only active runs, making stuck-run detection fast.

schedules

Cron-based triggers evaluated by ratd’s background scheduler every 30 seconds.

schedules
CREATE TABLE schedules (
    id            UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    pipeline_id   UUID NOT NULL REFERENCES pipelines(id) ON DELETE CASCADE,
    cron_expr     VARCHAR(100) NOT NULL,
    enabled       BOOLEAN NOT NULL DEFAULT true,
    last_run_id   UUID REFERENCES runs(id),
    last_run_at   TIMESTAMPTZ,
    next_run_at   TIMESTAMPTZ,
    created_at    TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at    TIMESTAMPTZ NOT NULL DEFAULT now()
);
ColumnTypeNotes
cron_exprVARCHAR(100)Standard 5-field cron expression
enabledBOOLEANToggle schedule on/off
last_run_idUUIDFK to the most recent run triggered by this schedule
next_run_atTIMESTAMPTZPre-computed next fire time (for efficient queries)

Index:

CREATE INDEX idx_schedules_next_run ON schedules(next_run_at) WHERE enabled = true;

The partial index on next_run_at makes the scheduler’s tick query efficient --- it only scans enabled schedules.


Quality Tables

quality_tests

Registered quality test definitions. The SQL content is stored in S3; Postgres tracks metadata.

quality_tests
CREATE TABLE quality_tests (
    id            UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    pipeline_id   UUID NOT NULL REFERENCES pipelines(id) ON DELETE CASCADE,
    name          VARCHAR(255) NOT NULL,
    description   TEXT DEFAULT '',
    severity      VARCHAR(10) NOT NULL DEFAULT 'error'
                  CHECK (severity IN ('error', 'warn')),
    s3_path       VARCHAR(1024) NOT NULL,
    created_at    TIMESTAMPTZ NOT NULL DEFAULT now(),
    UNIQUE(pipeline_id, name)
);
ColumnTypeNotes
severityVARCHAR(10)error = blocks merge, warn = log only
s3_pathVARCHAR(1024)Path to the test SQL file in S3

quality_results

Test execution results. One row per test per run.

quality_results
CREATE TABLE quality_results (
    id            UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    test_id       UUID NOT NULL REFERENCES quality_tests(id) ON DELETE CASCADE,
    run_id        UUID REFERENCES runs(id),
    status        VARCHAR(10) NOT NULL
                  CHECK (status IN ('passed', 'failed', 'warned', 'error')),
    value         NUMERIC,
    expected      NUMERIC,
    duration_ms   INT,
    ran_at        TIMESTAMPTZ NOT NULL DEFAULT now()
);
ColumnTypeNotes
statusVARCHAR(10)passed (0 violations), failed (>0 + error severity), warned (>0 + warn severity), error (test SQL failed)
valueNUMERICActual count of violations
expectedNUMERICExpected count (usually 0)
duration_msINTTest execution time

Index:

CREATE INDEX idx_quality_results_test ON quality_results(test_id, ran_at DESC);

Supporting Tables

pipeline_triggers

Event-based triggers for pipelines. More flexible than cron schedules --- triggers fire in response to events (upstream pipeline completion, landing zone uploads, etc.).

pipeline_triggers
CREATE TABLE pipeline_triggers (
    id                UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    pipeline_id       UUID NOT NULL REFERENCES pipelines(id) ON DELETE CASCADE,
    type              VARCHAR(50) NOT NULL,
    config            JSONB NOT NULL DEFAULT '{}',
    enabled           BOOLEAN NOT NULL DEFAULT true,
    cooldown_seconds  INT NOT NULL DEFAULT 0,
    last_triggered_at TIMESTAMPTZ,
    last_run_id       UUID REFERENCES runs(id),
    created_at        TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at        TIMESTAMPTZ NOT NULL DEFAULT now()
);
ColumnTypeNotes
typeVARCHAR(50)Event type: pipeline_success, landing_zone_upload, webhook
configJSONBTrigger-specific config (e.g., which upstream pipeline to watch)
cooldown_secondsINTMinimum time between fires (prevents cascading)

Indexes:

CREATE INDEX idx_pipeline_triggers_pipeline ON pipeline_triggers(pipeline_id);
CREATE INDEX idx_pipeline_triggers_type ON pipeline_triggers(type) WHERE enabled = true;
CREATE INDEX idx_pipeline_triggers_config
    ON pipeline_triggers USING gin (config jsonb_path_ops) WHERE enabled = true;

The GIN index on config enables efficient JSONB containment queries (e.g., “find all triggers watching pipeline X”).

pipeline_versions

Version history for published pipelines. Each publish creates a snapshot of S3 version IDs. Rollback creates a new version that re-pins old version IDs (git-revert style).

pipeline_versions
CREATE TABLE pipeline_versions (
    id                  UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    pipeline_id         UUID NOT NULL REFERENCES pipelines(id),
    version_number      INTEGER NOT NULL,
    message             TEXT NOT NULL DEFAULT '',
    published_versions  JSONB NOT NULL DEFAULT '{}',
    created_at          TIMESTAMPTZ NOT NULL DEFAULT now(),
    UNIQUE (pipeline_id, version_number)
);
ColumnTypeNotes
version_numberINTEGERAuto-incrementing per pipeline
messageTEXTPublish message (like a commit message)
published_versionsJSONBMap of {file_path: s3_version_id}

Index:

CREATE INDEX idx_pipeline_versions_pipeline
    ON pipeline_versions (pipeline_id, version_number DESC);

table_metadata

User-authored metadata for Iceberg tables. Separate from the pipeline table because metadata can exist for tables created outside RAT.

table_metadata
CREATE TABLE table_metadata (
    id                    UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    namespace             VARCHAR(63) NOT NULL,
    layer                 VARCHAR(10) NOT NULL,
    name                  VARCHAR(255) NOT NULL,
    description           TEXT NOT NULL DEFAULT '',
    owner                 VARCHAR(255),
    column_descriptions   JSONB NOT NULL DEFAULT '{}',
    created_at            TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at            TIMESTAMPTZ NOT NULL DEFAULT now(),
    UNIQUE(namespace, layer, name)
);
ColumnTypeNotes
column_descriptionsJSONBMap of {column_name: description}

landing_zones

Landing zone definitions. Each zone is a directory in S3 where users upload raw files.

landing_zones
CREATE TABLE landing_zones (
    id                    UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    namespace             VARCHAR(63) NOT NULL REFERENCES namespaces(name) ON DELETE CASCADE,
    name                  VARCHAR(255) NOT NULL,
    description           TEXT NOT NULL DEFAULT '',
    owner                 VARCHAR(255),
    expected_schema       TEXT NOT NULL DEFAULT '',
    processed_max_age_days INT,
    auto_purge            BOOLEAN NOT NULL DEFAULT false,
    created_at            TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at            TIMESTAMPTZ NOT NULL DEFAULT now(),
    UNIQUE(namespace, name)
);
ColumnTypeNotes
expected_schemaTEXTExpected file schema description (documentation)
processed_max_age_daysINTDays to keep processed files (NULL = system default)
auto_purgeBOOLEANAuto-delete processed files after max age

landing_files

Individual uploaded files within landing zones.

landing_files
CREATE TABLE landing_files (
    id            UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    zone_id       UUID NOT NULL REFERENCES landing_zones(id) ON DELETE CASCADE,
    filename      VARCHAR(1024) NOT NULL,
    s3_path       VARCHAR(1024) NOT NULL,
    size_bytes    BIGINT NOT NULL DEFAULT 0,
    content_type  VARCHAR(128) NOT NULL DEFAULT '',
    uploaded_by   VARCHAR(255),
    uploaded_at   TIMESTAMPTZ NOT NULL DEFAULT now()
);

Index:

CREATE INDEX idx_landing_files_zone ON landing_files(zone_id, uploaded_at DESC);

Multi-User Tables (Pro)

These tables are created in the Community Edition but only populated by Pro plugins.

ownership

Per-object ownership. Every pipeline, table, or namespace has a single owner.

ownership
CREATE TABLE ownership (
    id            UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    object_type   VARCHAR(20) NOT NULL CHECK (object_type IN ('pipeline', 'table', 'namespace')),
    object_id     UUID NOT NULL,
    owner         VARCHAR(255) NOT NULL,
    created_at    TIMESTAMPTZ NOT NULL DEFAULT now(),
    UNIQUE(object_type, object_id)
);

shares

Access grants. Owners share resources with users or roles.

shares
CREATE TABLE shares (
    id            UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    object_type   VARCHAR(20) NOT NULL CHECK (object_type IN ('pipeline', 'table', 'namespace')),
    object_id     UUID NOT NULL,
    shared_with   VARCHAR(255) NOT NULL,
    access_level  VARCHAR(10) NOT NULL CHECK (access_level IN ('read', 'write')),
    granted_by    VARCHAR(255) NOT NULL,
    granted_at    TIMESTAMPTZ NOT NULL DEFAULT now(),
    UNIQUE(object_type, object_id, shared_with)
);

The shared_with field can be a username (e.g., alice) or a role reference (e.g., role:data-engineer). The enforcement plugin resolves role memberships at query time.

projects and project_members

Soft groupings for organizing pipelines and tables. No isolation --- purely organizational.

projects
CREATE TABLE projects (
    id            UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name          VARCHAR(255) UNIQUE NOT NULL,
    description   TEXT DEFAULT '',
    created_by    VARCHAR(255),
    created_at    TIMESTAMPTZ NOT NULL DEFAULT now()
);
 
CREATE TABLE project_members (
    project_id    UUID NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
    object_type   VARCHAR(20) NOT NULL CHECK (object_type IN ('pipeline', 'table')),
    object_id     UUID NOT NULL,
    added_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
    PRIMARY KEY (project_id, object_type, object_id)
);

Infrastructure Tables

plugins

Tracks registered plugins and their health status.

plugins
CREATE TABLE plugins (
    name          VARCHAR(63) PRIMARY KEY,
    slot          VARCHAR(20) NOT NULL,
    image         VARCHAR(500) NOT NULL,
    status        VARCHAR(20) NOT NULL DEFAULT 'unknown'
                  CHECK (status IN ('healthy', 'degraded', 'down', 'unknown')),
    config        JSONB DEFAULT '{}',
    last_health   TIMESTAMPTZ,
    created_at    TIMESTAMPTZ NOT NULL DEFAULT now()
);

audit_log

Audit trail for all state-changing operations. Written by the audit middleware on POST/PUT/DELETE requests.

audit_log
CREATE TABLE audit_log (
    id            UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    actor         VARCHAR(255) NOT NULL,
    action        VARCHAR(100) NOT NULL,
    object_type   VARCHAR(20),
    object_id     UUID,
    details       JSONB,
    timestamp     TIMESTAMPTZ NOT NULL DEFAULT now()
);

Indexes:

CREATE INDEX idx_audit_log_actor ON audit_log(actor, timestamp DESC);
CREATE INDEX idx_audit_log_action ON audit_log(action, timestamp DESC);
CREATE INDEX idx_audit_log_time ON audit_log(timestamp DESC);

Three indexes cover the most common query patterns: filter by actor, filter by action, and sort by time.

platform_settings

Key-value store for system configuration. Uses JSONB values for flexibility.

platform_settings
CREATE TABLE platform_settings (
    key         VARCHAR(63) PRIMARY KEY,
    value       JSONB NOT NULL,
    updated_at  TIMESTAMPTZ NOT NULL DEFAULT now()
);

Default entries:

retention settings (key: 'retention')
{
    "runs_max_per_pipeline": 100,
    "runs_max_age_days": 90,
    "logs_max_age_days": 30,
    "quality_results_max_per_test": 100,
    "soft_delete_purge_days": 30,
    "stuck_run_timeout_minutes": 120,
    "audit_log_max_age_days": 365,
    "nessie_orphan_branch_max_age_hours": 6,
    "reaper_interval_minutes": 60,
    "iceberg_snapshot_max_age_days": 7,
    "iceberg_orphan_file_max_age_days": 3
}
feature flags (key: 'feature_flags')
{
    "quality_tests": true,
    "landing_zones": true,
    "audit_log": true,
    "pipeline_triggers": true,
    "pipeline_versions": true
}

reaper_status

Singleton table (exactly 1 row, enforced by CHECK (id = 1)) tracking the data retention daemon’s status and counters.

reaper_status
CREATE TABLE reaper_status (
    id               INT PRIMARY KEY DEFAULT 1 CHECK (id = 1),
    last_run_at      TIMESTAMPTZ,
    runs_pruned      INT NOT NULL DEFAULT 0,
    logs_pruned      INT NOT NULL DEFAULT 0,
    quality_pruned   INT NOT NULL DEFAULT 0,
    pipelines_purged INT NOT NULL DEFAULT 0,
    runs_failed      INT NOT NULL DEFAULT 0,
    branches_cleaned INT NOT NULL DEFAULT 0,
    lz_files_cleaned INT NOT NULL DEFAULT 0,
    audit_pruned     INT NOT NULL DEFAULT 0,
    updated_at       TIMESTAMPTZ NOT NULL DEFAULT now()
);
ColumnTypeNotes
runs_prunedINTCumulative count of runs deleted by the reaper
runs_failedINTCumulative count of stuck runs marked as failed
branches_cleanedINTCumulative count of orphan Nessie branches deleted
pipelines_purgedINTCumulative count of soft-deleted pipelines permanently removed
lz_files_cleanedINTCumulative count of processed landing zone files purged
audit_prunedINTCumulative count of old audit log entries deleted

Schema Migrations

Schema migrations are managed by ratd on startup. Migration files live in platform/internal/postgres/migrations/ and are applied in order:

MigrationDescription
001_initial.sqlCore tables: namespaces, pipelines, runs, schedules, quality_tests, quality_results, plugins
002_landing_zones.sqlLanding zone tables
002_pipeline_unique_partial.sqlPartial unique index for soft deletes
003_audit_log.sqlAudit log table
004_pipeline_landing_zone.sqlPipeline-landing zone relationship (deprecated)
005_drop_pipeline_landing_zone.sqlDrop deprecated relationship
006_pipeline_triggers.sqlEvent-based pipeline triggers
007_run_logs.sqlAdd JSONB logs column to runs
008_phase_profiles.sqlAdd JSONB phase_profiles column to runs
009_pipeline_versioning.sqlAdd versioning columns to pipelines
010_pipeline_versions.sqlPipeline version history table
011_namespace_description.sqlAdd description to namespaces
012_table_metadata.sqlTable metadata with column descriptions
013_landing_zone_schema.sqlAdd expected_schema to landing zones
014_retention_settings.sqlPlatform settings + reaper status tables
015_feature_flags.sqlFeature flags in platform settings

Migrations are idempotent (IF NOT EXISTS, ON CONFLICT DO NOTHING) and forward-only. Rollback is not supported --- instead, create a new migration that reverses the change.


Key Design Decisions

UUIDs for Primary Keys

All tables use UUID primary keys generated by gen_random_uuid(). This provides:

  • Globally unique IDs without coordination
  • Safe for distributed systems (no sequence conflicts)
  • No information leakage (unlike auto-incrementing integers)

Soft Deletes

The pipelines table uses soft deletes (deleted_at column). This allows:

  • Undo within a grace period
  • Audit trail (deleted pipelines are still queryable)
  • The reaper permanently purges soft-deleted pipelines after 30 days

JSONB for Flexible Data

Several columns use JSONB for data that varies per record:

  • phase_profiles --- per-phase timing (varies by pipeline type)
  • published_versions --- file-to-version mapping (varies by pipeline)
  • config on triggers --- trigger-specific configuration
  • details on audit log --- request-specific details
  • retention_config on pipelines --- per-pipeline overrides

Timestamps

All mutable tables have created_at and updated_at columns. All timestamps use TIMESTAMPTZ (timezone-aware) to avoid UTC/local confusion.

Foreign Key Cascades

Most foreign keys use ON DELETE CASCADE, meaning:

  • Deleting a namespace deletes all its pipelines, landing zones
  • Deleting a pipeline deletes all its runs, schedules, quality tests, triggers, versions
  • Deleting a landing zone deletes all its files
  • Deleting a quality test deletes all its results