# Ondatra Labs
> The data runtime for teams that ship. One binary replaces ingestion, transformation, and orchestration.
- License: MIT
- Language: Go
- Repository: https://github.com/ondatra-labs/ondatrasql
- Discord: https://discord.gg/TQEqsa9q
---
## DuckDB Configuration
> How the runtime boots — DuckDB configuration via SQL files
URL: https://ondatra.sh/duckdb-configuration/
OndatraSQL prepares DuckDB before running your pipeline. Everything in `config/` is executed as SQL — no YAML, no config DSL.
This is where you configure DuckDB, define credentials, attach your catalog, and connect external sources.
## Mental Model
The runtime loads configuration in two phases:
**Before the catalog is attached** → configure DuckDB itself
**After the catalog is attached** → define logic that depends on your data
## Loading Order
**Pre-catalog:**
| File | What it does |
|------|--------------|
| [`settings.sql`](settings/) | Configure DuckDB (memory, threads, etc.) |
| [`secrets.sql`](secrets/) | Credentials — not stored in data, only used at runtime |
| [`extensions.sql`](extensions/) | Load DuckDB extensions |
**Catalog:**
| File | What it does |
|------|--------------|
| [`catalog.sql`](catalog/) | **Attach DuckLake** — without this, there's no database to write to |
**Post-catalog:**
| File | What it does |
|------|--------------|
| [`macros.sql`](macros/) | Reusable SQL logic |
| [`variables.sql`](variables/) | Runtime variables |
| [`sources.sql`](sources/) | External database connections |
All files except `catalog.sql` are optional. Missing files are silently skipped.
{{< callout type="info" >}}
Schemas are created automatically. Organize models into directories — `models/staging/orders.sql` creates the `staging` schema on first run.
{{< /callout >}}
## Common Patterns
**Connect to PostgreSQL:**
```sql
-- config/sources.sql
ATTACH 'postgresql://${PG_USER}:${PG_PASSWORD}@${PG_HOST}/db' AS source (READ_ONLY);
```
**Add a reusable macro:**
```sql
-- config/macros.sql
CREATE MACRO is_active(status) AS status = 'active';
```
**Use cloud storage:**
```sql
-- config/secrets.sql
CREATE SECRET aws_chain (TYPE S3, PROVIDER credential_chain);
```
**Set DuckDB memory limit:**
```sql
-- config/settings.sql
SET memory_limit = '4GB';
SET threads = 4;
```
## Built-in Runtime
OndatraSQL automatically provides variables and macros after config loads. No setup required. These power change detection, lineage, and incremental execution.
### Variables
| Variable | Description |
|----------|-------------|
| `ondatra_run_time` | When the current run started (UTC) |
| `ondatra_load_id` | Unique identifier for this run |
| `curr_snapshot` | Current DuckLake snapshot ID |
| `prev_snapshot` | Previous snapshot ID (for CDC) |
| `dag_start_snapshot` | Snapshot at DAG start (consistent CDC across models) |
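As an illustrative sketch, these variables can be read with DuckDB's `getvariable()` inside any model. The model and source table names below are hypothetical:

```sql
-- models/staging/order_loads.sql (hypothetical)
-- @kind: append
SELECT
    order_id,
    getvariable('ondatra_load_id')  AS load_id,
    getvariable('ondatra_run_time') AS loaded_at
FROM raw.orders
```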
### Macros
**Runtime**
`ondatra_now()` · `ondatra_load_id()` · `last_snapshot()` · `prev_snapshot()` · `dag_start_snapshot()`
**Change Data Capture**
`ondatra_cdc_changes()` · `ondatra_cdc_deletes()` · `ondatra_cdc_updates_before()` · `ondatra_cdc_summary()` · `ondatra_cdc_empty()`
**Schema & Metadata**
`ondatra_table_exists()` · `ondatra_column_exists()` · `ondatra_column_type()` · `ondatra_get_columns()` · `ondatra_schema_hash()`
**Data Quality**
`ondatra_row_count()` · `ondatra_has_rows()` · `ondatra_min_value()` · `ondatra_max_value()` · `ondatra_distinct_count()` · `ondatra_null_count()` · `ondatra_duplicate_count()` · `ondatra_table_diff()` · `ondatra_compare_tables()`
**Metadata**
`ondatra_get_sql_hash()` · `ondatra_get_commit_info()` · `ondatra_get_downstream()`
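As a hedged illustration — the exact signatures aren't listed on this page, so the arguments below are assumptions based on the macro names — the data-quality macros might be combined with audit directives:

```sql
-- Hypothetical usage; argument shapes are assumptions
-- @audit: ondatra_row_count('staging.orders') > 0
-- @audit: ondatra_duplicate_count('staging.orders', 'order_id') = 0
SELECT * FROM staging.orders
```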
## Editing
```bash
ondatrasql edit settings
ondatrasql edit secrets
ondatrasql edit catalog
ondatrasql edit macros
ondatrasql edit variables
ondatrasql edit sources
```
## Sandbox Mode
In [sandbox mode](/concepts/sandbox/), `catalog.sql` is skipped. Instead:
- **Production catalog** → attached read-only
- **Sandbox catalog** → attached writable
Models read from production, write to sandbox. All other config files load normally.
---
### settings.sql
> Execution settings — how the runtime uses DuckDB
URL: https://ondatra.sh/duckdb-configuration/settings/
**Phase:** Pre-catalog | **Order:** 1 | **Required:** No
OndatraSQL runs directly on DuckDB. This file controls how the runtime executes queries — memory usage, parallelism, and disk spill behavior.
You usually don't need to change this. Defaults work for most cases.
```sql
-- Limit how much memory the runtime can use
SET memory_limit = '8GB';
-- Control parallel execution
SET threads = 4;
-- Where to spill when memory is exceeded
SET temp_directory = '/tmp/duckdb';
-- Timestamp interpretation
SET TimeZone = 'Europe/Stockholm';
```
This file runs before the DuckLake catalog is attached.
## Common Settings
Most projects never change these. Adjust only if needed.
| Setting | What it controls | When to change |
|---|---|---|
| `memory_limit` | Max memory used by the runtime | Large datasets or limited RAM |
| `threads` | Parallel execution | Limit CPU on shared machines |
| `temp_directory` | Disk spill location | Slow disks or constrained environments |
| `TimeZone` | Timestamp interpretation | Working across regions |
For advanced tuning, see the [DuckDB configuration reference](https://duckdb.org/docs/stable/configuration/overview).
---
### secrets.sql
> Access external data without adding infrastructure
URL: https://ondatra.sh/duckdb-configuration/secrets/
**Phase:** Pre-catalog | **Order:** 2 | **Required:** No
OndatraSQL runs locally, but your data doesn't have to.
This file defines how the runtime connects to external systems — cloud storage, databases, and APIs — using DuckDB's built-in [secrets manager](https://duckdb.org/docs/stable/configuration/secrets_manager).
No separate credential system. No service configuration. Just SQL.
You only need this if you connect to external data sources.
Local pipelines (CSV, Parquet, APIs via Starlark) do not require any secrets.
{{< callout type="warning" >}}
Never commit this file to version control.
Use environment variables (`.env`) for sensitive values, or rely on credential providers (like AWS credential chain) when available.
{{< /callout >}}
## Common Use Cases
### Cloud storage
```sql
-- Use AWS credential chain (recommended)
CREATE SECRET aws_chain (
    TYPE s3,
    PROVIDER credential_chain
);
```
### Explicit credentials (dev/testing)
```sql
CREATE SECRET s3_secret (
    TYPE s3,
    KEY_ID '${AWS_ACCESS_KEY_ID}',
    SECRET '${AWS_SECRET_ACCESS_KEY}',
    REGION 'eu-north-1'
);
```
### Scoped credentials
```sql
CREATE SECRET prod_bucket (
    TYPE s3,
    KEY_ID '${AWS_ACCESS_KEY_ID}',
    SECRET '${AWS_SECRET_ACCESS_KEY}',
    SCOPE 's3://prod-data/'
);
```
### Databases
```sql
CREATE SECRET pg_secret (
    TYPE postgres,
    HOST '${PG_HOST}',
    PORT 5432,
    DATABASE 'warehouse',
    USER 'readonly',
    PASSWORD '${PG_PASSWORD}'
);
```
Secrets are used by DuckDB itself — not by OndatraSQL. Once defined, they apply automatically to any query that accesses external data.
Supported providers include S3, GCS, R2, Azure, PostgreSQL, and MySQL.
---
### extensions.sql
> Optional capabilities for the runtime
URL: https://ondatra.sh/duckdb-configuration/extensions/
**Phase:** Pre-catalog | **Order:** 3 | **Required:** No
OndatraSQL runs directly on DuckDB, and inherits its extension system.
Extensions add capabilities like cloud storage, external databases, or geospatial functions — without adding services or dependencies.
Most projects don't need this. Extensions are loaded only when required.
Use this file when an extension must be available before the pipeline runs. For example:
- Accessing S3 or cloud storage
- Connecting to external databases
- Using geospatial or specialized functions
There are two ways to use extensions:
- **`extensions.sql`** — load once for the entire runtime (global)
- **`@extension` directive** — load only for a specific model (local)
Prefer `@extension` unless the extension is required early (e.g. in `catalog.sql`).
## Common Extensions
### Cloud storage
```sql
-- Required for S3 / HTTP access
INSTALL httpfs;
LOAD httpfs;
```
### External databases
```sql
-- PostgreSQL connector
INSTALL postgres;
LOAD postgres;
-- MySQL connector
INSTALL mysql;
LOAD mysql;
```
### Specialized functions
```sql
-- Geospatial support
INSTALL spatial;
LOAD spatial;
```
Extensions are part of DuckDB — not a separate plugin system. They are loaded into the runtime and used transparently by your models.
{{< callout type="info" >}}
DuckDB automatically loads most core extensions on first use.
You only need to install/load extensions explicitly when:
- They must be available before execution (e.g. `catalog.sql`)
- You use community extensions
{{< /callout >}}
For advanced use cases, see the [DuckDB extensions documentation](https://duckdb.org/docs/stable/extensions/overview).
---
### catalog.sql
> Where your data lives — the only required configuration
URL: https://ondatra.sh/duckdb-configuration/catalog/
**Phase:** Catalog | **Order:** 4 | **Required:** Yes
The catalog is where your data lives.
OndatraSQL writes tables to DuckLake — a versioned storage layer built on Parquet. This file tells the runtime where to store metadata and data files.
This is the only required configuration.
You don't provision a warehouse. You point the runtime at a location.
OndatraSQL handles:
- Table creation
- Schema evolution
- Snapshot management
- Change detection
You only define where the data should live.
## Mental Model
- The **catalog** stores metadata (tables, schemas, snapshots)
- The **data path** stores Parquet files
- Together, they form your warehouse
## Recommended Backends
If you're unsure, use SQLite.
### SQLite (default)
Zero configuration. Embedded. Works out of the box.
This is the default and the right choice for most use cases — including production on a single machine.
```sql
-- Local storage (default — created by ondatrasql init)
ATTACH 'ducklake:sqlite:ducklake.sqlite' AS lake
    (DATA_PATH 'ducklake.sqlite.files');
```
```sql
-- Cloud storage (SQLite catalog + S3 data)
-- Requires: httpfs in extensions.sql
ATTACH 'ducklake:sqlite:ducklake.sqlite' AS lake
    (DATA_PATH 's3://my-bucket/data/');
```
### PostgreSQL
Use this when multiple users or systems need to access the same catalog.
The data remains in DuckLake — PostgreSQL only stores metadata. This allows external tools (Metabase, Grafana, psql, pg_duckdb) to query your data without copying it.
```sql
-- Multi-user (PostgreSQL catalog + local data)
-- Requires: postgres in extensions.sql, credentials in secrets.sql
ATTACH 'ducklake:postgres:dbname=ducklake_catalog host=localhost' AS lake
    (DATA_PATH '/data/warehouse/');
```
```sql
-- Multi-user (PostgreSQL catalog + S3 data)
ATTACH 'ducklake:postgres:dbname=ducklake_catalog host=db.prod.internal' AS lake
    (DATA_PATH 's3://my-bucket/data/');
```
### Which to choose?
| | SQLite | PostgreSQL |
|---|---|---|
| **Setup** | Zero config | Requires a PostgreSQL server |
| **Read access** | OndatraSQL only | Any PostgreSQL client (pg_duckdb, Metabase, psql) |
| **Multi-machine** | No (local file) | Yes |
| **Performance** | Fast (embedded) | Fast, but metadata access adds network round-trips |
| **Cloud data** | Yes (S3/GCS via DATA_PATH) | Yes |
| **Best for** | Development, single-machine prod | Shared warehouse, BI tools, team access |
{{< callout type="warning" >}}
DuckLake also supports DuckDB and MySQL as catalog backends, but OndatraSQL only supports SQLite and PostgreSQL. Features like sandbox mode, CDC, and run-type detection are tested and maintained for these two backends only.
{{< /callout >}}
## Advanced Options
### Data Inlining
DuckLake automatically inlines small writes — inserts or deletes affecting fewer than 10 rows are stored directly in the catalog metadata instead of creating new Parquet files. This is **enabled by default** and requires no configuration.
This is particularly useful for incremental `append` models that add a few rows per run. Instead of creating a new Parquet file for each run, the data is stored in the catalog until it accumulates enough to flush.
Override the threshold via the `DATA_INLINING_ROW_LIMIT` option on ATTACH:
```sql
-- Increase threshold to 50 rows
ATTACH 'ducklake:sqlite:ducklake.sqlite' AS lake
    (DATA_PATH 'ducklake.sqlite.files', DATA_INLINING_ROW_LIMIT 50);
```
```sql
-- Disable inlining entirely
ATTACH 'ducklake:sqlite:ducklake.sqlite' AS lake
    (DATA_PATH 'ducklake.sqlite.files', DATA_INLINING_ROW_LIMIT 0);
```
You can also set it per-table after attachment:
```sql
CALL lake.set_option('data_inlining_row_limit', 50, table_name => 'staging.events');
```
Inlined data behaves identically to Parquet data — time travel, queries, and schema evolution all work the same. To manually flush inlined data to Parquet files, use:
```sql
CALL ducklake_flush_inlined_data('lake');
```
{{< callout type="info" >}}
The default threshold of 10 rows is a good balance for most workloads. Only change it if you have many models that consistently produce small batches (increase) or if you want all data in Parquet immediately (set to 0).
{{< /callout >}}
### Encryption
DuckLake can automatically encrypt all Parquet data files using the `ENCRYPTED` option. Each file gets a unique encryption key, stored in the catalog. Decryption is transparent — queries work identically.
This is useful when storing data on untrusted storage (S3, shared filesystems) where the catalog database is the trusted component.
```sql
-- Encrypted local storage
ATTACH 'ducklake:sqlite:ducklake.sqlite' AS lake
    (DATA_PATH 'ducklake.sqlite.files', ENCRYPTED);
```
```sql
-- Encrypted cloud storage
ATTACH 'ducklake:sqlite:ducklake.sqlite' AS lake
    (DATA_PATH 's3://my-bucket/data/', ENCRYPTED);
```
```sql
-- Encrypted multi-user (PostgreSQL catalog controls key access)
ATTACH 'ducklake:postgres:dbname=ducklake_catalog host=localhost' AS lake
    (DATA_PATH 's3://my-bucket/data/', ENCRYPTED);
```
{{< callout type="info" >}}
Encryption keys are stored in the catalog database. Anyone with catalog access can decrypt the data. Use this to protect data at rest on untrusted storage — not as a substitute for catalog access control.
{{< /callout >}}
---
### macros.sql
> Reuse SQL without templating
URL: https://ondatra.sh/duckdb-configuration/macros/
**Phase:** Post-catalog | **Order:** 5 | **Required:** No
Macros let you reuse logic across models.
Unlike templating systems, these are real SQL functions — executed by DuckDB, not preprocessed.
No Jinja. No string templating. Just SQL.
Use macros when you want to:
- Reuse calculations across models
- Encapsulate business logic
- Define small helper functions
- Avoid repeating SQL
Built-in `ondatra_*` macros are created automatically — this file is for your own macros.
## Examples
### Pure computation
```sql
-- Simple reusable logic
CREATE OR REPLACE MACRO safe_divide(a, b) AS
    CASE WHEN b = 0 THEN NULL ELSE a / b END;

CREATE OR REPLACE MACRO cents_to_dollars(cents) AS
    cents / 100.0;
```
### Business logic
```sql
-- Read from a table
CREATE OR REPLACE MACRO default_currency() AS (
    SELECT currency FROM mart.company_settings LIMIT 1
);
```
### Table macros
```sql
-- Return a result set
CREATE OR REPLACE MACRO recent_orders(n) AS TABLE
    SELECT * FROM mart.orders ORDER BY order_date DESC LIMIT n;
```
## Usage in Models
Macros are available in all models automatically.
```sql
-- models/mart/revenue.sql
-- @kind: table
SELECT
    order_date,
    safe_divide(total_amount, quantity) AS unit_price,
    cents_to_dollars(total_amount) AS amount_dollars
FROM staging.orders
WHERE currency = default_currency()
```
```sql
-- Table macros work as FROM sources
-- models/mart/recent_summary.sql
-- @kind: view
SELECT
    COUNT(*) AS order_count,
    SUM(amount) AS total
FROM recent_orders(100)
```
Table references work the same as in models — no special prefixes needed.
Macros are part of the runtime — not a separate layer. They run inside DuckDB and behave like native SQL functions.
{{< callout type="info" >}}
Macros are stored in DuckDB's in-memory catalog and made available to all models automatically. You don't need to configure anything.
{{< /callout >}}
DuckDB supports both **scalar macros** (return a single value) and **table macros** (return a result set). For advanced usage, see the [DuckDB macro documentation](https://duckdb.org/docs/stable/sql/statements/create_macro).
---
### variables.sql
> Configure your pipeline with SQL
URL: https://ondatra.sh/duckdb-configuration/variables/
**Phase:** Post-catalog | **Order:** 6 | **Required:** No
Variables let you define configuration inside your pipeline.
No YAML files. No separate config system. Just SQL.
Use variables for:
- Business logic (currency, tax rates, thresholds)
- Environment-specific settings
- Retention policies and limits
- Values reused across models and scripts
Variables are part of the runtime:
- **`.env`** — external configuration (secrets, credentials)
- **`variables.sql`** — pipeline logic and behavior
Built-in variables (`ondatra_run_time`, `ondatra_load_id`, snapshots) are set automatically.
## Examples
### Business config
```sql
SET VARIABLE default_currency = 'SEK';
SET VARIABLE vat_rate = 0.25;
SET VARIABLE fiscal_year_start = DATE '2024-07-01';
```
### Policies
```sql
SET VARIABLE archive_after_days = 365;
SET VARIABLE delete_after_days = 730;
```
### Dynamic values
```sql
-- Derived from data
SET VARIABLE latest_order_date = (
    SELECT MAX(order_date) FROM mart.orders
);
```
## Usage
Variables are available everywhere:
- SQL models — `getvariable('name')`
- Scripts — `getvariable("name")`
```sql
SELECT *
FROM staging.orders
WHERE currency = getvariable('default_currency')
```
```python
currency = getvariable("default_currency") # "SEK"
```
Variables are evaluated at runtime — not templated. They behave like values in the database, not string substitutions.
If a variable is not set:
- SQL — returns `NULL`
- Scripts — returns empty string
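In SQL, the `NULL` behavior pairs naturally with `coalesce()`. A minimal sketch of falling back to a default when a variable was never set:

```sql
-- Returns 'USD' if default_currency was never set
SELECT coalesce(getvariable('default_currency'), 'USD') AS currency;
```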
For advanced usage, see the [DuckDB variable documentation](https://duckdb.org/docs/stable/sql/statements/set_variable).
---
### sources.sql
> No connectors. Just queries.
URL: https://ondatra.sh/duckdb-configuration/sources/
**Phase:** Post-catalog | **Order:** 7 | **Required:** No
Query external data directly from your models.
No connectors. No ingestion tools. No sync jobs. Just attach a database and use it in SQL.
Use sources when your data already exists somewhere else:
- Application databases (PostgreSQL, MySQL)
- Operational systems
- Existing warehouses
You don't need to copy the data first.
## Mental Model
External data is just another table. Once attached, you query it like any other model.
Sources are read at query time — not synced.
## Examples
### PostgreSQL
```sql
-- Attach a PostgreSQL database (read-only)
-- Requires: postgres in extensions.sql, credentials in secrets.sql
ATTACH 'postgresql://user:pass@host:5432/warehouse' AS warehouse (READ_ONLY);
```
### MySQL
```sql
-- Attach a MySQL database (read-only)
-- Requires: mysql in extensions.sql, credentials in secrets.sql
ATTACH 'mysql:host=db.example.com port=3306 database=app' AS app_db (READ_ONLY);
```
## Usage in Models
Once attached, external tables behave like local tables. No ingestion step required.
```sql
-- models/raw/customers.sql
-- @kind: table
SELECT
    customer_id,
    name,
    email,
    created_at
FROM warehouse.public.customers
WHERE active = true
```
```sql
-- models/raw/products.sql
-- @kind: table
SELECT *
FROM app_db.products
```
## Files
For files (Parquet, CSV, JSON), no setup is required. Read them directly in your models:
```sql
SELECT * FROM read_parquet('data/*.parquet')
SELECT * FROM read_csv('data/events.csv')
SELECT * FROM read_json('s3://bucket/api-dump.json')
```
---
## Getting Started
> Run your first data pipeline in under 5 minutes
URL: https://ondatra.sh/getting-started/
Run your first data pipeline in under 5 minutes. No Kafka. No Airflow. No dbt.
## 1. Install
```bash
curl -fsSL https://ondatra.sh/install.sh | sh
```
One binary. No dependencies.
## 2. Create a Project
```bash
mkdir my-pipeline && cd my-pipeline
ondatrasql init
```
## 3. Write Your First Model
Create `models/staging/orders.sql`:
```sql
-- @kind: merge
-- @unique_key: order_id
SELECT * FROM (VALUES
    (1, 'Alice',   100, '2026-01-15'),
    (2, 'Bob',     200, '2026-02-20'),
    (3, 'Charlie', 150, '2026-03-10')
) AS t(order_id, customer, amount, order_date)
```
The file path becomes the table name: `staging.orders`.
## 4. Run It
```bash
ondatrasql run
```
```
Running 1 models...
[OK] staging.orders (merge, backfill, 3 rows, 180ms)
```
## 5. Query the Result
```bash
ondatrasql sql "SELECT * FROM staging.orders"
```
```
| order_id | customer | amount | order_date |
| -------- | -------- | ------ | ---------- |
| 1        | Alice    | 100    | 2026-01-15 |
| 2        | Bob      | 200    | 2026-02-20 |
| 3        | Charlie  | 150    | 2026-03-10 |
```
### What Just Happened
You didn't:
- set up a database
- configure a pipeline
- write orchestration logic
- install a warehouse
You wrote SQL, ran one command, and got a table.
## 6. Add a Downstream Model
Create `models/mart/revenue.sql`:
```sql
-- @kind: table
SELECT
    order_date,
    COUNT(*) AS orders,
    SUM(amount) AS total
FROM staging.orders
GROUP BY order_date
```
Run again:
```bash
ondatrasql run
```
```
Running 2 models...
[OK] staging.orders (merge, skip — unchanged)
[OK] mart.revenue (table, backfill, 3 rows, 150ms)
```
OndatraSQL detected the dependency, skipped what hadn't changed, and built only what was needed. No configuration required.
## 7. Preview Changes Safely
```bash
ondatrasql sandbox
```
See what will change — row counts, schema diffs, downstream impact — before committing.
## What Else Can You Do
- **Ingest from APIs** — [Starlark scripts](/scripting/) with built-in HTTP and OAuth
- **Collect events** — [POST to an embedded endpoint](/concepts/events/), no Kafka
- **Validate data** — [26 constraints + 17 audits](/validation/) with automatic rollback
- **Track lineage** — [column-level](/lineage/), extracted from SQL AST
## That's the Model
You don't build pipelines. You run them.
{{< callout type="info" >}}
**Other install methods:** [Windows (WSL2)](https://learn.microsoft.com/en-us/windows/wsl/install), [build from source](/cli/#building-from-source) (Go 1.25+ and gcc/clang).
{{< /callout >}}
---
## CLI Reference
> Every command in one page
URL: https://ondatra.sh/cli/
One binary. These are the commands.
## Overview
Three core commands:
| Command | What it does |
|---|---|
| `run` | Execute your pipeline |
| `sandbox` | Preview changes safely |
| `daemon` | Collect events via HTTP |
Everything else is optional.
## Most Common Commands
```bash
ondatrasql run # Run your entire pipeline
ondatrasql sandbox # Preview before committing
ondatrasql run staging.orders # Run a single model
ondatrasql sql "SELECT * FROM staging.orders LIMIT 10"
```
## Example Workflow
```bash
# 1. Create a model
ondatrasql new staging/orders.sql
# 2. Edit it
ondatrasql edit staging.orders
# 3. Preview changes
ondatrasql sandbox
# 4. Run pipeline
ondatrasql run
# 5. Query results
ondatrasql sql "SELECT * FROM staging.orders"
```
---
## Core Commands
### run
Run your entire pipeline in one command.
```bash
ondatrasql run # All models in DAG order
ondatrasql run staging.orders # Single model
```
What it does:
- Builds the DAG automatically
- Detects what changed
- Runs only what's needed
- Writes results to DuckLake
No configuration required.
### sandbox
Preview all changes before committing.
```bash
ondatrasql sandbox # All models
ondatrasql sandbox staging.orders # One model
```
Shows row count diffs, schema evolution, downstream propagation — without touching production. See [Sandbox Mode](/concepts/sandbox/).
### daemon
Start the event collection endpoint.
```bash
COLLECT_PORT=8080 ondatrasql daemon
```
Receives events via HTTP, buffers them durably on disk. `ondatrasql run` flushes them to DuckLake.
```bash
# Custom ports
COLLECT_PORT=9090 COLLECT_ADMIN_PORT=9091 ondatrasql daemon
```
See [Event Collection](/concepts/events/).
---
## Development
### init
Create a new project in the current directory.
```bash
ondatrasql init
```
### new
Create a model file.
```bash
ondatrasql new staging/orders.sql
ondatrasql new raw/api_users.star
ondatrasql new raw/gam_report.yaml
```
### edit
Open in `$EDITOR`.
```bash
ondatrasql edit staging.orders # Model
ondatrasql edit env # .env
ondatrasql edit catalog # config/catalog.sql
ondatrasql edit macros # config/macros.sql
ondatrasql edit secrets # config/secrets.sql
ondatrasql edit settings # config/settings.sql
ondatrasql edit variables # config/variables.sql
ondatrasql edit sources # config/sources.sql
ondatrasql edit extensions # config/extensions.sql
```
---
## Querying
### sql
Run arbitrary SQL — no external database required.
```bash
ondatrasql sql "SELECT COUNT(*) FROM staging.orders"
ondatrasql sql "SELECT * FROM staging.orders" --format json
```
### query
Query a table directly.
```bash
ondatrasql query staging.orders
ondatrasql query staging.orders --limit 10 --format csv
```
Supports `--format csv|json|markdown` and `--limit N`.
---
## Introspection
### stats
Project overview — all models, kinds, status.
```bash
ondatrasql stats
```
### history
Run history for a model.
```bash
ondatrasql history staging.orders
ondatrasql history staging.orders --limit 5
```
### describe
Schema, dependencies, directives, SQL definition.
```bash
ondatrasql describe staging.orders
```
### lineage
Column-level lineage extracted from SQL AST.
```bash
ondatrasql lineage overview # All models
ondatrasql lineage staging.orders # One model
ondatrasql lineage staging.orders.total # One column
```
---
## Automation
### --json
Machine-readable output. Use it for CI/CD, alerts, or orchestrator integration.
```bash
ondatrasql run --json 2>/dev/null | jq -s '.'
```
Every model emits one JSON object:
```json
{
  "model": "staging.orders",
  "kind": "table",
  "run_type": "incremental",
  "rows_affected": 42,
  "duration_ms": 567,
  "status": "ok"
}
```
| Field | Description |
|---|---|
| `model` | Target table |
| `kind` | `table`, `view`, `append`, `merge`, `scd2`, `partition`, `events` |
| `run_type` | `skip`, `backfill`, `incremental`, `full`, `create`, `flush` |
| `run_reason` | Why this run type was chosen |
| `rows_affected` | Rows written (0 for skip) |
| `duration_ms` | Execution time |
| `status` | `ok` or `error` |
| `errors` | Error messages (when `status` is `error`) |
| `warnings` | Warnings (schema evolution, validation) |
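A minimal Python sketch of consuming this stream in CI. The sample lines below are fabricated to match the documented shape — they are not real runtime output:

```python
import json

# One JSON object per line, as emitted by `ondatrasql run --json`
# (illustrative samples matching the documented fields).
lines = [
    '{"model": "staging.orders", "status": "ok", "rows_affected": 42}',
    '{"model": "mart.revenue", "status": "error", "errors": ["audit failed"]}',
]

results = [json.loads(line) for line in lines]
failed = [r["model"] for r in results if r["status"] == "error"]

if failed:
    print("failed models:", ", ".join(failed))
```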
### version
```bash
ondatrasql version
```
---
## Maintenance
DuckLake file management. Each command accepts a `sandbox` argument to preview its effect first.
```bash
ondatrasql merge # Merge small Parquet files
ondatrasql expire # Expire old snapshots
ondatrasql cleanup # Delete unreferenced files
ondatrasql orphaned # Delete orphaned files
```
Preview first:
```bash
ondatrasql merge sandbox
ondatrasql expire sandbox
```
---
## Building from Source
Requires Go 1.25+ and a C compiler (`gcc` or `clang`) for the embedded DuckDB driver.
```bash
go install github.com/ondatra-labs/ondatrasql/cmd/ondatrasql@latest
```
If you see `cgo: C compiler not found`:
```bash
# Ubuntu/Debian
sudo apt install build-essential
# macOS
xcode-select --install
```
---
## Concepts
> How OndatraSQL works — the mental model
URL: https://ondatra.sh/concepts/
OndatraSQL runs your data pipeline.
You write SQL files. Run one command. Your data is materialized.
## The Mental Model
Think of OndatraSQL as a program that runs your data.
- **Files** = models
- **`ondatrasql run`** = execution
- **Output** = tables in DuckLake
You don't configure pipelines. You run them.
## What Happens When You Run
```bash
ondatrasql run
```
OndatraSQL:
1. Finds all models
2. Builds a dependency graph
3. Detects what changed
4. Runs only what's needed
5. Writes results to DuckLake
Done.
## Example
```
raw.events → staging.sessions → mart.daily_traffic
```
You don't define this graph. It's extracted from your SQL.
## The Runtime Model
Everything is built into one system:
- **Dependency graph** → automatic
- **Change detection** → automatic
- **Incremental loading** → automatic
- **Schema evolution** → automatic
- **Validation** → automatic
No config files. No orchestration layer.
## Data Flow
1. Data is collected or loaded
2. Stored durably on disk
3. Loaded into DuckDB
4. Transformed via SQL
5. Written to DuckLake
If anything fails, nothing is lost.
## Change Detection
Every model gets a run type:
| Run Type | Meaning |
|---|---|
| `skip` | Nothing changed |
| `incremental` | New data in source |
| `full` | Upstream changed |
| `backfill` | First run or definition changed |
You don't write incremental logic. It's built in.
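The table above can be read as a simple decision order. This is only a conceptual sketch of the documented behavior, not OndatraSQL's actual implementation:

```python
def run_type(first_run: bool, definition_changed: bool,
             upstream_changed: bool, new_source_data: bool) -> str:
    """Conceptual mapping of the run-type table above (illustrative only)."""
    if first_run or definition_changed:
        return "backfill"      # first run, or the model definition changed
    if upstream_changed:
        return "full"          # an upstream model changed
    if new_source_data:
        return "incremental"   # new data in the source
    return "skip"              # nothing changed
```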
## Safety
- **Constraints** block bad data before it's written
- **Audits** catch regressions after
- **Failures** roll back automatically via time-travel
Every run is reproducible.
## What OndatraSQL Is Not
OndatraSQL runs on a single machine. It is not:
- A distributed system
- A streaming platform
- A cloud warehouse
- An orchestrator
You don't orchestrate data pipelines. You run them.
---
### Models
> Everything is a model — ingestion, transformation, and events in one system
URL: https://ondatra.sh/concepts/models/
Everything in OndatraSQL is a model.
A model can transform data (SQL), ingest data (API scripts), or collect events (HTTP). All models run in the same pipeline, with the same execution model.
## Mental Model
- **SQL** = transformations
- **Scripts** = ingestion
- **Events** = streaming input
Different inputs — same pipeline.
## Three Formats — One Runtime
| Type | You write | OndatraSQL does |
|---|---|---|
| `.sql` | A SELECT statement | Materializes, tracks changes, evolves schema |
| `.star` | API logic | Runs it, buffers output, materializes |
| `.yaml` | Config for a source function | Calls it, materializes |
| `.sql` (events) | A column schema | Receives HTTP events, buffers, flushes |
File path = table name: `models/staging/orders.sql` → `staging.orders`.
## SQL Models
Write a SELECT. Everything else is automatic.
- Table creation
- Incremental logic
- Schema evolution
- Change detection
```sql
-- @kind: merge
-- @unique_key: order_id
SELECT order_id, customer_id, total, updated_at
FROM raw.orders
```
### Views
No materialization. Resolves live.
```sql
-- @kind: view
SELECT order_id, customer_id, total
FROM raw.orders
WHERE total > 0
```
## Starlark Scripts
Fetch data from APIs without leaving the pipeline. HTTP, OAuth, pagination — all built in. No Python, no dependencies.
```python
# @kind: append
# @incremental: updated_at
resp = http.get("https://api.example.com/users")
for user in resp.json:
    save.row(user)
```
### Read From Your Data
Scripts can query DuckDB directly. This automatically creates dependencies in the DAG.
```python
rows = query("SELECT * FROM mart.customers WHERE synced = false")
for row in rows:
    http.post("https://api.hubspot.com/contacts", json=row)
    save.row({"id": row["id"], "synced_at": str(time.now())})
```
### Shared Libraries
Put reusable logic in `lib/`. Import with `load()`.
```python
load("lib/pagination.star", "paginate")
for page in paginate("https://api.example.com/users"):
    for user in page:
        save.row(user)
```
See [Scripting](/scripting/) for all built-in modules.
## Events Models
Define a schema. Send events via HTTP. No Kafka. No ingestion system.
```sql
-- @kind: events
event_name VARCHAR NOT NULL,
page_url VARCHAR,
user_id VARCHAR,
received_at TIMESTAMPTZ
```
Then:
```bash
curl -X POST localhost:8080/collect/raw/events \
  -d '{"event_name":"pageview","page_url":"/home"}'
```
See [Event Collection](/concepts/events/).
## YAML Models
Use configuration instead of code when the logic already exists in `lib/`.
```yaml
kind: append
incremental: report_date
source: gam_report
config:
network_code: ${GAM_NETWORK_CODE}
dimensions:
- AD_UNIT_NAME
- DATE
```
OndatraSQL calls your source function automatically. See [Blueprints](/blueprints/) for examples.
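Conceptually, the runtime resolves `source:` to a registered function and passes `config` as its arguments. A minimal sketch of that dispatch (the registry shape and function names here are assumptions, not the real API):

```python
def run_yaml_model(model, registry):
    """Look up the source function named in the YAML model and call it
    with the model's config keys as keyword arguments."""
    fn = registry[model["source"]]
    return fn(**model.get("config", {}))

# Hypothetical source function, as it might live in lib/
def gam_report(network_code, dimensions):
    return {"network": network_code, "dims": dimensions}
```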
## Directives
Control behavior with comments:
```sql
-- @kind: merge
-- @unique_key: order_id
-- @incremental: updated_at
-- @constraint: order_id NOT NULL
-- @audit: row_count > 0
```
See [Directives](/concepts/directives/) for the full list.
## Why Models Matter
Most tools split this into multiple systems:
| Task | Traditional stack | OndatraSQL |
|---|---|---|
| Transform data | dbt | SQL model |
| Ingest APIs | Python + Airflow | Starlark model |
| Collect events | Kafka | Events model |
| Schedule runs | Airflow | `ondatrasql run` |
OndatraSQL unifies everything into models. One abstraction, one pipeline, one binary.
---
### Directives
> Directives turn SQL into a data pipeline
URL: https://ondatra.sh/concepts/directives/
Directives turn SQL into a data pipeline. They are comments that control how data is materialized, how changes are detected, and how data is validated.
You write SQL. Directives define everything else.
## Mental Model
- **SQL** → what the data is
- **Directives** → how it behaves
Together, they define a complete pipeline step.
## Before vs After
Without directives:
```sql
SELECT * FROM raw.orders
```
Just a query.
With directives:
```sql
-- @kind: merge
-- @unique_key: order_id
-- @incremental: updated_at
-- @constraint: total >= 0
-- @audit: row_count > 0
SELECT * FROM raw.orders
```
A fully managed pipeline — updates existing rows, blocks invalid data, verifies the result, tracks changes automatically.
## Grouped Reference
### Core
Control how data is materialized:
| Directive | What it does |
|---|---|
| `@kind` | Materialization strategy — `table`, `merge`, `append`, `scd2`, `partition`, `view`, `events` |
| `@unique_key` | Row matching key (required for merge, scd2, partition) |
| `@incremental` | Cursor column for incremental state |
| `@incremental_initial` | Starting cursor value (default: `1970-01-01T00:00:00Z`) |
### Storage & Performance
| Directive | What it does |
|---|---|
| `@partitioned_by` | DuckLake file partitioning — organizes data into separate files per key |
| `@sorted_by` | DuckLake sorted table — better statistics for faster queries |
### Validation
| Directive | What it does |
|---|---|
| `@constraint` | Block bad data before insert (26 patterns) |
| `@audit` | Verify results after insert, rollback on failure (17 patterns) |
| `@warning` | Log issues without failing |
### Metadata
| Directive | What it does |
|---|---|
| `@description` | Table comment in DuckLake catalog |
| `@column` | Column comment + masking tags |
### Runtime
| Directive | What it does |
|---|---|
| `@extension` | Load a DuckDB extension before execution |
## Syntax by File Type
Same directives, all formats:
{{< tabs >}}
{{< tab "SQL" >}}
```sql
-- @kind: merge
-- @unique_key: order_id
-- @constraint: order_id NOT NULL
SELECT * FROM raw.orders
```
{{< /tab >}}
{{< tab "Starlark" >}}
```python
# @kind: append
# @incremental: updated_at
resp = http.get("https://api.example.com/orders")
for order in resp.json:
save.row(order)
```
{{< /tab >}}
{{< tab "YAML" >}}
```yaml
kind: merge
unique_key: order_id
constraints:
- order_id NOT NULL
```
{{< /tab >}}
{{< /tabs >}}
## Column Masking
Tags in `@column` directives trigger automatic masking during materialization:
```sql
-- @column: email = Contact email | mask_email | PII
-- @column: ssn = SSN | mask_ssn | sensitive
```
Define masking macros in `config/macros.sql`:
```sql
CREATE OR REPLACE MACRO mask_email(val) AS
regexp_replace(val::VARCHAR, '(.).*@', '\1***@');
CREATE OR REPLACE MACRO mask_ssn(val) AS
'***-**-' || val[-4:];
```
Tags with `mask`, `hash`, or `redact` prefixes are applied as macro calls. Other tags (`PII`, `sensitive`) are metadata-only — stored for governance, no execution effect.
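The replacement semantics of those macros carry over to any regex engine, so they are easy to sanity-check in plain Python (illustrative only: materialization runs the DuckDB macros, not Python):

```python
import re

def mask_email(val):
    # Keep the first character of the local part, mask the rest.
    return re.sub(r'(.).*@', r'\1***@', val)

def mask_ssn(val):
    # Keep only the last four digits.
    return '***-**-' + val[-4:]
```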
## Directive Compatibility
| Directive | table | view | append | merge | scd2 | partition | events |
|---|---|---|---|---|---|---|---|
| `@description` | Yes | Yes | Yes | Yes | Yes | Yes | Yes |
| `@unique_key` | — | — | — | Required | Required | Required | — |
| `@partitioned_by` | Hint | — | Hint | Hint | Hint | — | — |
| `@sorted_by` | Yes | — | Yes | Yes | Yes | Yes | — |
| `@incremental` | — | — | Yes | Yes | Yes | Yes | — |
| `@column` | Yes | — | Yes | Yes | Yes | Yes | — |
| `@constraint` | Yes | — | Yes | Yes | Yes | Yes | — |
| `@audit` | Yes | — | Yes | Yes | Yes | Yes | — |
| `@warning` | Yes | — | Yes | Yes | Yes | Yes | — |
| `@extension` | Yes | Yes | Yes | Yes | Yes | Yes | — |
## Why Directives Exist
Most tools split pipeline configuration across multiple systems — SQL for transformations, config files for behavior, separate tools for validation.
Directives unify everything inside the model. One file defines what the data is and how it behaves.
---
### Model Kinds
> Choose data behavior with one directive — not pipeline logic
URL: https://ondatra.sh/concepts/kinds/
Model kinds define how your data behaves. With one directive, you choose how data is stored, how changes are handled, and how updates are applied.
## Mental Model
Same SQL — different behavior:
```sql
SELECT * FROM raw.orders
-- @kind: append → only adds new rows
-- @kind: merge → updates existing, inserts new
-- @kind: scd2 → keeps full history of every change
-- @kind: table → rebuilds when something changed
```
## Which Kind Should You Use?
| Kind | Use for |
|---|---|
| `table` | Aggregates, marts, lookup tables |
| `view` | Lightweight transforms, staging |
| `append` | Event logs, fact tables, audit trails |
| `merge` | User/customer tables (upsert) |
| `scd2` | History tracking, dimension tables |
| `partition` | Large partitioned fact tables |
| `events` | HTTP event ingestion |
## view
No storage. Always up-to-date. Resolves at query time.
```sql
-- @kind: view
SELECT order_id, customer_id, total
FROM raw.orders WHERE total > 0
```
Recreated if definition changes. Skipped otherwise.
## table (default)
Rebuilds only when something changed. Tracks upstream dependencies — if nothing changed upstream, nothing runs.
```sql
-- @kind: table
SELECT * FROM raw.products WHERE is_active = true
```
| Condition | What happens |
|---|---|
| First run | Full build |
| SQL changed | Rebuild |
| Upstream changed | Rebuild |
| Nothing changed | Skip — no query, no I/O |
## append
Add new data. Never updates existing rows. [Smart CDC](/concepts/cdc/) on incremental runs.
```sql
-- @kind: append
SELECT * FROM raw.events
```
Source tables are rewritten to change-only queries via DuckLake time-travel.
## merge
Like an UPSERT. Updates existing rows, inserts new ones. [Smart CDC](/concepts/cdc/) on incremental runs.
```sql
-- @kind: merge
-- @unique_key: customer_id
SELECT customer_id, name, email, tier
FROM raw.customers
```
Requires `@unique_key`.
## scd2
Keeps every version of a row. Full history, permanent. Uses full-state comparison — not Smart CDC.
```sql
-- @kind: scd2
-- @unique_key: product_id
SELECT product_id, name, price, category
FROM raw.products
```
OndatraSQL adds `valid_from_snapshot`, `valid_to_snapshot`, and `is_current` automatically. When a row changes, the current version is closed and a new version is inserted.
Requires `@unique_key`. History survives snapshot expiration.
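The close-and-insert mechanics can be sketched in a few lines of Python. Column names follow the docs; the comparison and versioning logic below is an illustration of the general SCD2 technique, not OndatraSQL internals:

```python
def scd2_apply(history, new_row, key, snapshot):
    """Close the current version if attributes changed, then insert
    the new version with is_current = True."""
    current = next((r for r in history
                    if r[key] == new_row[key] and r["is_current"]), None)
    if current is not None:
        changed = any(current.get(k) != v for k, v in new_row.items())
        if not changed:
            return history  # identical row: nothing to do
        current["is_current"] = False
        current["valid_to_snapshot"] = snapshot
    history.append(dict(new_row,
                        valid_from_snapshot=snapshot,
                        valid_to_snapshot=None,
                        is_current=True))
    return history
```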
## partition
Replaces only affected partitions. [Smart CDC](/concepts/cdc/) on incremental runs.
```sql
-- @kind: partition
-- @unique_key: year, month
SELECT order_id, total,
EXTRACT(YEAR FROM order_date) AS year,
EXTRACT(MONTH FROM order_date) AS month
FROM raw.orders
```
Requires `@unique_key` (supports multiple columns).
## events
Send events via HTTP. No Kafka.
```sql
-- @kind: events
event_name VARCHAR NOT NULL,
page_url VARCHAR,
user_id VARCHAR,
received_at TIMESTAMPTZ
```
Defines a column schema, not a SELECT. See [Event Collection](/concepts/events/).
## How Changes Are Detected
| Kind | Strategy |
|---|---|
| `view` | Definition hash |
| `table` | Dependency tracking — skip if unchanged |
| `append` / `merge` / `partition` | Smart CDC via time-travel |
| `scd2` | Full-state comparison |
| `events` | Always flush |
## Storage Hints
DuckLake optimizations for any materializing kind:
```sql
-- @sorted_by: updated_at → better statistics, faster queries
-- @partitioned_by: region → automatic file organization
```
## Why Model Kinds Exist
In most tools, materialization logic is spread across SQL, orchestration config, and custom scripts. Model kinds encode it directly in the model — one line replaces pipeline logic.
| Need | Traditional stack | OndatraSQL |
|---|---|---|
| Append data | Custom SQL + scheduler | `@kind: append` |
| Upsert rows | MERGE + orchestration | `@kind: merge` |
| Track history | Complex SCD logic | `@kind: scd2` |
| Collect events | Kafka + consumer | `@kind: events` |
---
### Smart CDC
> Write once. Run incrementally forever.
URL: https://ondatra.sh/concepts/cdc/
Write normal SQL. OndatraSQL automatically processes only changed data.
No `is_incremental()`. No manual filters. No extra logic.
## Mental Model
Your query, but only for new data. You don't change your SQL — OndatraSQL changes how it runs.
## Before and After
Without Smart CDC:
```sql
SELECT * FROM raw.events
WHERE event_time > (SELECT MAX(event_time) FROM target)
```
Manual logic. Easy to get wrong.
With Smart CDC:
```sql
-- @kind: append
SELECT * FROM raw.events
```
That's it.
## How It Works
OndatraSQL:
1. Finds your source tables (from SQL AST)
2. Checks what changed since last run (DuckLake snapshots)
3. Runs your query only on that data
First run: all rows. Every run after: only changes.
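The three steps reduce to "diff by snapshot". A toy sketch with rows tagged by the snapshot that wrote them (the bookkeeping here is an assumption about the general technique; real detection reads DuckLake snapshot metadata):

```python
def changed_rows(rows, last_snapshot):
    """First run (last_snapshot is None): return everything.
    Later runs: only rows written after last_snapshot."""
    if last_snapshot is None:
        return rows
    return [r for r in rows if r["snapshot"] > last_snapshot]
```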
## Example
```sql
-- @kind: merge
-- @unique_key: customer_id
SELECT * FROM raw.customers
```
- First run → all rows
- Next runs → only changed rows
## Not All Tables Are Treated the Same
- **Main table** → only changed rows
- **Joins** → adjusted for correctness
You always get correct results — even with joins and aggregations.
## Where It Applies
Smart CDC works with:
- `append`
- `merge`
- `partition`
Other kinds use different strategies:
- `scd2` → full comparison (needs complete source)
- `table` → dependency tracking (skip or rebuild)
## Why This Matters
Most tools require writing incremental filters, managing state manually, and debugging edge cases.
Smart CDC removes all of that.
| Task | dbt | OndatraSQL |
|---|---|---|
| Incremental logic | Manual (`is_incremental()`) | Automatic |
| State handling | User-defined | Built-in |
| Edge cases | Your problem | Handled |
Write once. Run incrementally forever.
---
### Schema Evolution
> No migrations. Ever.
URL: https://ondatra.sh/concepts/schema-evolution/
Change your SQL. OndatraSQL updates the table automatically.
No migrations. No ALTER TABLE. No manual steps.
## Mental Model
Your SQL defines the schema. When the query changes, the table adapts.
## Before and After
Before:
```sql
SELECT order_id, total FROM raw.orders
```
After:
```sql
SELECT order_id, total, currency FROM raw.orders
```
OndatraSQL automatically adds the new column, updates the table, and keeps existing data.
## Additive Changes
Safe changes are applied automatically:
- **New columns** → added
- **Renamed columns** → detected via AST lineage
- **Types widened** → promoted safely (e.g. `INTEGER` → `BIGINT`)
No rebuild needed.
### Safe Type Promotions
| From | To |
|---|---|
| `TINYINT` | `SMALLINT`, `INTEGER`, `BIGINT`, `FLOAT`, `DOUBLE` |
| `SMALLINT` | `INTEGER`, `BIGINT`, `FLOAT`, `DOUBLE` |
| `INTEGER` | `BIGINT`, `FLOAT`, `DOUBLE` |
| `BIGINT` | `DOUBLE` |
| `FLOAT` | `DOUBLE` |
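The table above encodes naturally as a lookup. A sketch of the safe-promotion check (the mapping mirrors the table; the function name is illustrative):

```python
# Safe type promotions, per the table above
SAFE_PROMOTIONS = {
    "TINYINT": {"SMALLINT", "INTEGER", "BIGINT", "FLOAT", "DOUBLE"},
    "SMALLINT": {"INTEGER", "BIGINT", "FLOAT", "DOUBLE"},
    "INTEGER": {"BIGINT", "FLOAT", "DOUBLE"},
    "BIGINT": {"DOUBLE"},
    "FLOAT": {"DOUBLE"},
}

def is_safe_promotion(src, dst):
    """True if dst is the same type as src, or a listed widening."""
    return src == dst or dst in SAFE_PROMOTIONS.get(src, set())
```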
## Destructive Changes
If a change isn't safe:
- **Column removed**
- **Type narrowed**
OndatraSQL rebuilds the table automatically. No manual intervention.
## How It Works
1. Query runs into a temp table
2. Schemas compared
3. Changes applied — or table rebuilt
You modify your SQL. The table follows.
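Steps 1 through 3 amount to a schema diff that yields a plan. A minimal sketch (the plan vocabulary and the partial widening set are illustrative):

```python
# A few widening pairs for illustration; see the promotion table above
WIDENING = {("SMALLINT", "INTEGER"), ("INTEGER", "BIGINT"),
            ("INTEGER", "DOUBLE"), ("FLOAT", "DOUBLE")}

def is_widening(src, dst):
    return (src, dst) in WIDENING

def evolution_plan(old, new):
    """Compare column -> type mappings.
    Destructive change: ('rebuild', reason).
    Additive change: ('alter', [added columns]). Otherwise: ('noop', None)."""
    removed = set(old) - set(new)
    if removed:
        return ("rebuild", f"columns removed: {sorted(removed)}")
    for col, typ in new.items():
        if col in old and old[col] != typ and not is_widening(old[col], typ):
            return ("rebuild", f"type narrowed: {col}")
    added = sorted(set(new) - set(old))
    return ("alter", added) if added else ("noop", None)
```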
## Why This Matters
In most systems, schema changes require migration scripts, manual ALTER TABLE, and coordination between teams.
OndatraSQL removes all of that.
| Task | Traditional approach | OndatraSQL |
|---|---|---|
| Add column | Migration script | Update your SQL |
| Rename column | Manual ALTER | Automatic |
| Type change | Risky migration | Handled |
| Drop column | Careful coordination | Automatic rebuild |
No migrations. Ever.
---
### Incremental Models
> Incremental is not a feature — it's the default
URL: https://ondatra.sh/concepts/incremental/
Incremental processing is automatic. Write a normal query. OndatraSQL runs it on new data only.
## Mental Model
- **SQL models** → incremental by default
- **Scripts** → use a cursor
You don't write incremental logic — you just define the model.
## Before and After
Traditional approach:
```sql
SELECT * FROM raw.events
WHERE event_time > (SELECT MAX(event_time) FROM target)
```
Manual. Fragile.
OndatraSQL:
```sql
-- @kind: append
SELECT * FROM raw.events
```
Done. [Smart CDC](/concepts/cdc/) handles the rest.
## SQL Models
```sql
-- @kind: append
-- @incremental: event_time
SELECT * FROM raw.events
```
- First run → all data
- Next runs → only new data
Automatic via Smart CDC. No manual filters.
### Custom Start Value
```sql
-- @incremental: updated_at
-- @incremental_initial: "2024-01-01"
```
Default: `1970-01-01T00:00:00Z`.
## Starlark Scripts
Scripts don't have SQL to analyze — you control the cursor.
```python
# @kind: append
# @incremental: updated_at
url = "https://api.example.com/events"
if not incremental.is_backfill:
url += "?since=" + incremental.last_value
resp = http.get(url)
for item in resp.json:
save.row(item)
```
| Property | Description |
|---|---|
| `incremental.is_backfill` | `True` on first run |
| `incremental.last_value` | MAX(cursor) from previous run |
| `incremental.initial_value` | Starting value |
| `incremental.last_run` | Last successful run timestamp |
## SQL vs Scripts
| Model type | Incremental behavior |
|---|---|
| SQL | Automatic (Smart CDC) |
| Scripts | Manual cursor |
| YAML | Uses script logic |
## When Backfill Happens
- Model SQL or directives changed
- Target table doesn't exist
- `@kind`, `@unique_key`, `@incremental` changed
## Why This Matters
Most tools require writing incremental filters and managing state manually.
OndatraSQL makes incremental the default. Combined with [Smart CDC](/concepts/cdc/) and [Schema Evolution](/concepts/schema-evolution/), this eliminates most pipeline maintenance work.
---
### Dependency Graph
> Your queries define the graph
URL: https://ondatra.sh/concepts/dag/
Dependencies are automatic. Write queries. OndatraSQL figures out the order.
## Mental Model
- Files are nodes
- SQL references create edges
- Execution is automatic
You never define the DAG.
## How Dependencies Are Detected
- **SQL models** → table references from `FROM`, `JOIN`, CTEs, subqueries
- **Starlark models** → `query("...")` calls (followed through `load()` imports)
- **YAML models** → same as Starlark via source functions
Only references to other models create dependencies.
### Limitation
Only static SQL is detected:
```python
query("SELECT * FROM staging.orders") # tracked
query("SELECT * FROM " + table) # not tracked
```
Dynamic queries are silently skipped.
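A naive version of static detection can be written with a regex. The real runtime parses the SQL AST, so this sketch only illustrates the idea:

```python
import re

def table_refs(sql):
    """Collect schema.table identifiers that follow FROM or JOIN.
    Naive: no CTE handling, no quoted or numeric identifiers."""
    return set(re.findall(r'\b(?:FROM|JOIN)\s+([a-z_]+\.[a-z_]+)', sql, re.I))
```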
## Execution Order
If a model reads another model, it runs after it.
```
raw.orders → staging.orders
raw.customers → staging.customers
staging.* → mart.revenue
```
{{< mermaid >}}
graph LR
RO["raw.orders"] --> SO["staging.orders"]
RC["raw.customers"] --> SO
RC --> SC["staging.customers"]
SO --> MR["mart.revenue"]
SC --> MR
style RO fill:#1a2332,stroke:#4a6fa5,color:#e0e0e0
style RC fill:#1a2332,stroke:#4a6fa5,color:#e0e0e0
style SO fill:#1a2332,stroke:#3ecfcf,color:#e0e0e0
style SC fill:#1a2332,stroke:#3ecfcf,color:#e0e0e0
style MR fill:#1a2332,stroke:#e8854a,color:#e0e0e0
{{< /mermaid >}}
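The execution order above is a plain topological sort. With Python's standard library, using the model names from the example (the dependency map is hand-written here; OndatraSQL infers it from your SQL):

```python
from graphlib import TopologicalSorter

# model -> set of models it reads from
deps = {
    "staging.orders":    {"raw.orders", "raw.customers"},
    "staging.customers": {"raw.customers"},
    "mart.revenue":      {"staging.orders", "staging.customers"},
}

order = list(TopologicalSorter(deps).static_order())
```

`TopologicalSorter` raises `CycleError` on circular dependencies, analogous to the build-time cycle check described below.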
## Run Types
Each model gets a run type before execution:
| Run Type | Meaning |
|---|---|
| `skip` | Nothing changed |
| `backfill` | First run or definition changed |
| `incremental` | New/changed data |
| `full` | Upstream changed |
These decisions propagate through the graph.
## Propagation
When a model runs, downstream models are re-evaluated. Changes flow through the DAG.
If a model fails, downstream models do **not** run.
## Traditional Approach
```python
@task
def staging_orders():
...
@task
def mart_revenue():
staging_orders()
```
Manual DAG. Maintained by hand.
**OndatraSQL:**
```sql
SELECT * FROM staging.orders
```
DAG inferred automatically.
## Commands
**Run:**
```bash
ondatrasql run # All models in DAG order
ondatrasql run staging.orders # Single model
```
**Preview:**
```bash
ondatrasql sandbox
```
**Inspect:**
```bash
ondatrasql lineage overview # All dependencies
ondatrasql lineage staging.orders # One model
```
Circular dependencies are detected at build time.
## Why This Matters
Most tools require defining dependencies manually, maintaining DAGs, and debugging execution order.
OndatraSQL builds the graph from your code. Your queries define the graph.
---
### Sandbox Mode
> See the result before you commit it
URL: https://ondatra.sh/concepts/sandbox/
See the result before you commit it. No surprises. No blind runs.
```bash
ondatrasql sandbox
```
## Mental Model
- **Production** = read-only
- **Sandbox** = writable copy
- Your pipeline runs against both
You see the diff before anything is committed.
## How It Works
1. Production catalog mounted read-only
2. Temporary sandbox catalog created
3. Models run normally — reads from production, writes to sandbox
4. Results compared
5. Sandbox discarded
## What Makes It Different
This is not just a query preview. It's a full simulation.
- Runs the entire DAG — not just one model
- Applies schema evolution
- Executes validation (constraints + audits)
- Shows downstream impact
This is a real run — just not committed.
## Without Sandbox
1. Run pipeline
2. Something breaks
3. Debug in production
## With Sandbox
1. Preview everything
2. See impact
3. Then commit
## What You See
For each model:
- Row changes (added / removed / percentage)
- Schema changes (new columns, type changes)
- Sample diffs
- Validation results
- Downstream effects
```text
╠═══ Changes ══════════════════════════════════════════════════╣
║ staging.orders ║
║ Rows: 100 → 142 (+42, +42.0%) ║
║ ║
║ mart.revenue ║
║ SCHEMA EVOLUTION: + Added: region (VARCHAR) ║
║ Rows: 5 → 6 (+1, +20.0%) ║
```
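The row line in that report is simple arithmetic. A sketch of how such a summary can be computed (the formatting is illustrative, matched to the sample above):

```python
def row_change(before, after):
    """Return a 'Rows: before → after (+delta, +pct%)' style summary."""
    delta = after - before
    pct = (delta / before * 100.0) if before else float("inf")
    sign = "+" if delta >= 0 else ""
    return f"Rows: {before} → {after} ({sign}{delta}, {sign}{pct:.1f}%)"
```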
## Failures Are Safe
If a model fails:
- Nothing is committed
- Downstream models are not affected
- Production never sees it
You fix it before it's real.
## Full DAG Preview
The entire pipeline executes:
```text
[OK] raw.customers (45ms)
[OK] staging.orders (95ms)
[OK] mart.revenue (54ms)
[FAIL] mart.kpis — constraint violated: total >= 0
```
You see exactly where things break.
## Commands
```bash
ondatrasql sandbox # All models
ondatrasql sandbox staging.orders # One model
```
Maintenance commands also support preview:
```bash
ondatrasql merge sandbox
ondatrasql expire sandbox
```
## When to Use It
- Before deploying changes
- After modifying SQL
- When adding new models
- When debugging data issues
Or simply: whenever you want certainty.
## Why This Matters
Most tools run pipelines blind. You find out something broke after it's committed.
Sandbox lets you run your pipeline before you run your pipeline.
---
### Maintenance
> Keep your data fast and your storage under control
URL: https://ondatra.sh/concepts/maintenance/
Keep your data fast and your storage under control. Built-in maintenance — no separate tooling needed.
## Mental Model
- Data is immutable (for time travel and CDC)
- Old data accumulates over time
- Maintenance removes what's no longer needed
## What Maintenance Does
| Command | What it does |
|---|---|
| `merge` | Combine small files → faster queries |
| `expire` | Remove old snapshot metadata |
| `cleanup` | Delete unused files → free storage |
| `orphaned` | Remove stray files from failed writes |
## Typical Workflow
```bash
ondatrasql merge
ondatrasql expire
ondatrasql cleanup
ondatrasql orphaned
```
Run weekly or as needed. Each step builds on the previous one.
## Safe by Default
All commands support sandbox — preview before deleting anything:
```bash
ondatrasql merge sandbox
ondatrasql cleanup sandbox
```
## When to Run What
| Command | When |
|---|---|
| `merge` | Queries slowing down (many small files) |
| `expire` | Regularly (e.g. 30-day retention) |
| `cleanup` | After expire |
| `orphaned` | After failures or crashes |
## One-Command Option
DuckLake's `CHECKPOINT` runs all steps at once:
```sql
CHECKPOINT;
```
Configure defaults:
```sql
CALL lake.set_option('expire_older_than', '30 days');
CALL lake.set_option('delete_older_than', '7 days');
```
## Scheduling
Use cron or any workflow scheduler:
```bash
# Weekly maintenance (Sunday 3 AM)
0 3 * * 0 cd /path/to/project && ondatrasql merge && ondatrasql expire && ondatrasql cleanup && ondatrasql orphaned
```
## Customizing
The SQL files in `sql/` are fully editable:
```bash
ondatrasql edit merge
ondatrasql edit expire
ondatrasql edit cleanup
ondatrasql edit orphaned
```
Adjust retention periods, add table-specific operations, or switch to `CHECKPOINT`.
## Why This Matters
Without maintenance, queries slow down and storage grows indefinitely. OndatraSQL keeps your data efficient with a few commands — no vacuum jobs, no external tools.
## Further Reading
- [DuckLake: Recommended Maintenance](https://ducklake.select/docs/stable/duckdb/maintenance/recommended_maintenance)
- [DuckLake: CHECKPOINT](https://ducklake.select/docs/stable/duckdb/maintenance/checkpoint)
---
### Event Collection
> Built-in event ingestion — no Kafka, no message broker
URL: https://ondatra.sh/concepts/events/
Ingest events directly over HTTP — no Kafka, no message broker, no infrastructure.
OndatraSQL provides a built-in ingestion endpoint with durable buffering and exactly-once delivery into DuckLake. Just POST events. They are stored, recovered on crash, and safely materialized during pipeline runs.
## Mental Model
Your app sends events via HTTP. OndatraSQL buffers them on disk. The pipeline flushes them into DuckLake.
No external systems. No moving parts.
## Why This Exists
Most event pipelines require Kafka (or similar), a consumer service, and a data warehouse loader. OndatraSQL replaces all of that with a single binary.
## Quick Start
### 1. Define the Schema
```sql
-- models/raw/events.sql
-- @kind: events
event_name VARCHAR NOT NULL,
page_url VARCHAR,
user_id VARCHAR,
received_at TIMESTAMPTZ
```
### 2. Start the Daemon
```bash
ondatrasql daemon
```
### 3. Send Events
```bash
curl -X POST localhost:8080/collect/raw/events \
-d '{"event_name":"pageview","page_url":"/home","user_id":"u42"}'
```
That's it. Events are buffered durably on disk.
### 4. Flush to DuckLake
```bash
ondatrasql run
```
Events are flushed, transformed, and queryable.
## Exactly-Once Delivery
Events are inserted into DuckLake in a single transaction. If a crash happens — no data is lost, no duplicates are created.
## Crash Recovery
If the system crashes:
- Events on disk → still there
- Committed batches → detected and cleaned up
- Uncommitted batches → retried automatically
No manual intervention required.
## When to Use This
Use event collection when:
- You don't want Kafka
- You run on a single machine or small cluster
- You want simple ingestion without infrastructure
## Not Designed For
- High-scale distributed streaming
- Real-time sub-second processing
- Multi-region ingestion
## Architecture
Two processes work together:
**Daemon** — receives events via HTTP, stores them durably on disk. Runs continuously.
**Runner** — flushes events into DuckLake. Runs as part of the pipeline (cron or manual).
{{< mermaid >}}
sequenceDiagram
participant App as Browser / App
participant Daemon as ondatrasql daemon
participant Runner as ondatrasql run
participant DL as DuckLake
App->>Daemon: POST /collect/raw/events
Daemon-->>App: 202 Accepted
Note over Runner: ondatrasql run
Runner->>Daemon: claim events
Runner->>DL: BEGIN: INSERT + ack → COMMIT
Runner->>Daemon: ack (cleanup)
{{< /mermaid >}}
## Flush Protocol
{{< mermaid >}}
flowchart TD
CLAIM["1. Claim batch from buffer"] --> TEMP["2. Load into temp table"]
TEMP --> BEGIN["3. BEGIN transaction"]
BEGIN --> INSERT["4. INSERT INTO target"]
INSERT --> ACK_REC["5. Write ack record"]
ACK_REC --> COMMIT["6. COMMIT"]
COMMIT --> CRASH_NOTE["crash window — safe: recovery detects ack record"]
CRASH_NOTE --> BADGER_ACK["7. Ack buffer (cleanup)"]
BADGER_ACK --> DELETE_ACK["8. Delete ack record"]
style CLAIM fill:#1a2332,stroke:#4a6fa5,color:#e0e0e0
style TEMP fill:#1a2332,stroke:#4a6fa5,color:#e0e0e0
style BEGIN fill:#1e2d3d,stroke:#3ecfcf,color:#e0e0e0
style INSERT fill:#1e2d3d,stroke:#3ecfcf,color:#e0e0e0
style ACK_REC fill:#1e2d3d,stroke:#3ecfcf,color:#e0e0e0
style COMMIT fill:#1a2332,stroke:#3ecfcf,color:#e0e0e0
style CRASH_NOTE fill:#1e2d3d,stroke:#e8854a,color:#e0e0e0
style BADGER_ACK fill:#1a2332,stroke:#555,color:#e0e0e0
style DELETE_ACK fill:#1a2332,stroke:#555,color:#e0e0e0
{{< /mermaid >}}
Steps 3–6 are one transaction. Steps 7–8 are idempotent cleanup.
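The idempotence argument can be made concrete with a toy model of steps 4 through 8, where the ack record commits together with the insert, so cleanup is always safe to repeat (in-memory stand-ins for DuckLake and the buffer; not the real protocol code):

```python
def flush_batch(batch_id, rows, buffer, target, ack_records):
    """Steps 4-6: insert rows and write the ack record atomically
    (modeled as two operations that always happen together).
    Steps 7-8: cleanup, safe to re-run after a crash."""
    if batch_id not in ack_records:          # not yet committed
        target.extend(rows)                  # 4. INSERT INTO target
        ack_records.add(batch_id)            # 5-6. ack record + COMMIT
    # crash window here is safe: the ack record proves the commit
    buffer.pop(batch_id, None)               # 7. ack buffer (idempotent)
    ack_records.discard(batch_id)            # 8. delete ack record
```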
## Endpoints
**Public** (COLLECT_PORT, default 8080):
| Method | Path | Description |
|---|---|---|
| `POST` | `/collect/{schema}/{table}` | Single event |
| `POST` | `/collect/{schema}/{table}/batch` | Batch (atomic) |
| `GET` | `/health` | Health check |
**Admin** (COLLECT_ADMIN_PORT, default 8081, localhost only):
| Method | Path | Description |
|---|---|---|
| `POST` | `/flush/.../claim` | Claim events |
| `POST` | `/flush/.../ack` | Acknowledge flush |
| `POST` | `/flush/.../nack` | Return to queue |
| `GET` | `/flush/.../inflight` | List inflight claims |
## Validation
`NOT NULL` columns must be present. Unknown fields are accepted but ignored during flush. `received_at` is auto-populated with server timestamp if not provided.
## Configuration
```bash
COLLECT_PORT=9090 COLLECT_ADMIN_PORT=9091 ondatrasql daemon
```
| Variable | Default | Description |
|---|---|---|
| `COLLECT_PORT` | `8080` | Public endpoint |
| `COLLECT_ADMIN_PORT` | `COLLECT_PORT + 1` | Internal flush API |
## Performance
Single machine (Xeon W-2295), ~200 byte payloads:
| Stage | Throughput |
|---|---|
| HTTP ingest | ~23,000 events/sec |
| Flush to DuckLake | ~12,000 events/sec |
No external systems. No cluster.
## Full Example
```sql
-- models/raw/events.sql
-- @kind: events
event_name VARCHAR NOT NULL,
page_url VARCHAR,
user_id VARCHAR,
session_id VARCHAR,
received_at TIMESTAMPTZ
```
```sql
-- models/staging/sessions.sql
-- @kind: view
SELECT
session_id, user_id,
MIN(received_at) AS started_at,
MAX(received_at) AS ended_at,
COUNT(*) AS page_views
FROM raw.events
WHERE event_name = 'pageview'
GROUP BY session_id, user_id
```
```sql
-- models/mart/daily_traffic.sql
-- @kind: merge
-- @unique_key: day
SELECT
DATE_TRUNC('day', started_at) AS day,
COUNT(DISTINCT session_id) AS sessions,
COUNT(DISTINCT user_id) AS visitors,
SUM(page_views) AS total_views
FROM staging.sessions
GROUP BY 1
```
```bash
# Start daemon
ondatrasql daemon &
# Send events
curl -X POST localhost:8080/collect/raw/events \
-d '{"event_name":"pageview","page_url":"/home","user_id":"u1","session_id":"s1"}'
# Run pipeline
ondatrasql run
# Query results
ondatrasql sql "SELECT * FROM mart.daily_traffic"
```
---
## Validation
> Built-in data validation — no external tools required
URL: https://ondatra.sh/validation/
OndatraSQL validates data as part of execution — not as a separate step. No external tools. No test framework. No extra runs.
Every model can enforce:
- **Constraints** → block bad data before it's written
- **Audits** → detect regressions after materialization
- **Warnings** → monitor without failing
```sql
-- @constraint: user_id NOT NULL
-- @constraint: email EMAIL
-- @audit: row_count > 0
-- @audit: row_count_change < 20%
-- @warning: null_count(email) < 5%
```
This ensures no invalid rows enter the table, no empty datasets are produced, sudden data drops fail the run, and minor issues are logged without blocking.
## When to Use What
| Type | When it runs | On failure | Use for |
|---|---|---|---|
| Constraint | Before insert | Nothing written | Enforce correctness |
| Audit | After insert | Rolled back | Detect anomalies |
| Warning | After insert | Logged only | Monitor trends |
## Mental Model
Validation happens in two stages:
**Before data is written** → validate individual rows. NULL checks, uniqueness, ranges, formats. If a constraint fails, nothing is committed.
**After data is written** → validate the result. Row count changes, distribution shifts, historical comparisons, cross-table reconciliation. These need the committed data and DuckLake snapshots.
Warnings use the same patterns as audits but only log — they never block.
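The two stages can be sketched as: row predicates gate the write, aggregate predicates gate the commit. The predicate plumbing below is illustrative, not the real engine:

```python
def run_with_validation(rows, constraints, audits):
    """Constraints run per row before anything is written;
    audits run on the full result and roll back on failure."""
    for row in rows:                          # stage 1: before write
        for name, check in constraints:
            if not check(row):
                raise ValueError(f"constraint failed: {name}")
    committed = list(rows)                    # write
    for name, check in audits:                # stage 2: after write
        if not check(committed):
            raise ValueError(f"audit failed, rolled back: {name}")
    return committed
```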
## Automatic Rollback
If an audit fails, the transaction is rolled back via DuckLake time-travel. No bad data is committed. Your data remains consistent — automatically.
## Built on DuckLake
Audits can use time-travel to compare against previous versions:
```sql
-- This happens automatically inside audit patterns
SELECT COUNT(*) FROM orders AT (VERSION => prev_snapshot)
```
This enables regression detection, historical comparisons, and reproducible validation — without external tooling.
## Usage
Define rules as directives in any model:
```sql
-- @constraint: id PRIMARY KEY
-- @audit: row_count > 0
-- @warning: row_count_change < 50%
```
See [Constraints](constraints/), [Audits](audits/), and [Warnings](warnings/) for all available patterns.
---
### Constraints
> Validation is part of execution — not a separate step
URL: https://ondatra.sh/validation/constraints/
Constraints stop bad data before it enters your tables.
If a single row violates a constraint, the entire model fails and nothing is written.
No separate testing framework. No post-hoc checks. Validation is part of execution.
## Why This Matters
Most data tools validate data after it's already written. By then, bad data is already in your tables.
OndatraSQL validates before insertion — so invalid data never lands.
## Mental Model
Constraints are enforced on the result of your query before it is written.
If they fail:
- Nothing is inserted
- Downstream models do not run
## Usage
Add constraints directly in your model:
```sql
-- @constraint: column_name PATTERN
```
You can define multiple constraints per model.
```sql
-- @constraint: email EMAIL
-- @constraint: total >= 0
-- @constraint: status IN ('pending', 'shipped', 'cancelled')
-- @constraint: customer_id REFERENCES customers(id)
```
If any of these fail, nothing is written and downstream models do not run.
## Common Patterns
| Pattern | Example | What it checks |
|---|---|---|
| `PRIMARY KEY` | `id PRIMARY KEY` | NOT NULL + UNIQUE |
| `NOT NULL` | `email NOT NULL` | No NULL values |
| `UNIQUE` | `code UNIQUE` | No duplicates |
| `IN (...)` | `status IN ('active', 'inactive')` | Allowed values |
| `>= N` | `total >= 0` | Minimum value |
| `EMAIL` | `email EMAIL` | Email format |
| `REFERENCES` | `customer_id REFERENCES customers(id)` | Foreign key |
| `NULL_PERCENT` | `email NULL_PERCENT < 10` | Max NULL percentage |
## All 26 Patterns
### Identity & Nullability
| Pattern | Example | Description |
|---|---|---|
| `PRIMARY KEY` | `id PRIMARY KEY` | NOT NULL + UNIQUE combined |
| `NOT NULL` | `email NOT NULL` | Column must not be NULL |
| `UNIQUE` | `code UNIQUE` | No duplicate values |
| `(col1, col2) UNIQUE` | `(year, month) UNIQUE` | Composite uniqueness |
| `NOT EMPTY` | `name NOT EMPTY` | Not NULL and not empty string |
### Comparison
| Pattern | Example | Description |
|---|---|---|
| `>= N` | `age >= 0` | Greater than or equal |
| `<= N` | `score <= 100` | Less than or equal |
| `> N` | `amount > 0` | Greater than |
| `< N` | `discount < 1` | Less than |
| `= N` | `version = 1` | Equal to |
| `!= N` | `status != 0` | Not equal to |
| `BETWEEN x AND y` | `rating BETWEEN 1 AND 5` | Range check |
### Membership
| Pattern | Example | Description |
|---|---|---|
| `IN (...)` | `status IN ('active', 'inactive')` | Allowed values (auto-quotes unquoted strings) |
| `NOT IN (...)` | `role NOT IN ('deleted')` | Forbidden values |
### Pattern Matching
| Pattern | Example | Description |
|---|---|---|
| `LIKE` | `email LIKE '%@%'` | SQL LIKE pattern |
| `NOT LIKE` | `name NOT LIKE 'test%'` | Inverse LIKE |
| `MATCHES` | `phone MATCHES '^\+[0-9]+'` | Regular expression |
| `EMAIL` | `email EMAIL` | Email format validation |
| `UUID` | `id UUID` | UUID format validation |
### String Length
| Pattern | Example | Description |
|---|---|---|
| `LENGTH BETWEEN a AND b` | `name LENGTH BETWEEN 1 AND 255` | String length range |
| `LENGTH = N` | `code LENGTH = 3` | Exact string length |
### Referential & Custom
| Pattern | Example | Description |
|---|---|---|
| `REFERENCES` | `customer_id REFERENCES customers(id)` | Foreign key check |
| `CHECK` | `total CHECK (total >= 0)` | Custom SQL expression |
### Statistical
| Pattern | Example | Description |
|---|---|---|
| `AT_LEAST_ONE` | `email AT_LEAST_ONE` | At least one non-NULL value |
| `NOT_CONSTANT` | `status NOT_CONSTANT` | At least 2 distinct values |
| `NULL_PERCENT` | `email NULL_PERCENT < 10` | Maximum NULL percentage |
| `DISTINCT_COUNT OP N` | `status DISTINCT_COUNT >= 3` | Number of distinct values (cardinality) |
| `DUPLICATE_PERCENT < N` | `email DUPLICATE_PERCENT < 5` | Maximum duplicate percentage |
| `SEQUENTIAL` | `id SEQUENTIAL` | No gaps in integer sequence |
| `SEQUENTIAL(N)` | `id SEQUENTIAL(5)` | Gaps allowed only by step N |
| `NO_OVERLAP` | `(start_date, end_date) NO_OVERLAP` | Time intervals don't overlap |
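The statistical checks reduce to simple dataset-level computations. For example, `SEQUENTIAL` passes when the sorted integer values have no gaps, and `SEQUENTIAL(N)` when consecutive values differ by exactly N. A sketch of that logic, under the assumption that "step N" means a fixed difference of N:

```python
def is_sequential(values, step=1):
    """True when consecutive sorted values differ by exactly `step`.
    Sketch of the SEQUENTIAL / SEQUENTIAL(N) constraint semantics."""
    ordered = sorted(values)
    return all(b - a == step for a, b in zip(ordered, ordered[1:]))

assert is_sequential([3, 1, 2, 4])             # no gaps
assert not is_sequential([1, 2, 4])            # gap at 3
assert is_sequential([5, 10, 15, 20], step=5)  # SEQUENTIAL(5)
```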
## Full Example
```sql
-- @kind: merge
-- @unique_key: order_id
-- @constraint: order_id PRIMARY KEY
-- @constraint: total >= 0
-- @constraint: status IN ('pending', 'shipped', 'delivered', 'cancelled')
-- @constraint: status DISTINCT_COUNT >= 2
-- @constraint: customer_id REFERENCES customers(id)
-- @constraint: email EMAIL
-- @constraint: email DUPLICATE_PERCENT < 10
-- @constraint: order_date NOT NULL
SELECT order_id, customer_id, email, total, status, order_date
FROM raw.orders
```
## Compared to Traditional Validation
Traditional approach:
- Write tests separately (dbt tests, Great Expectations)
- Run after data is written
- Failures require cleanup
OndatraSQL:
- Define constraints in the model
- Run before data is written
- Fail fast — no bad data enters the table
---
### Audits
> Detect regressions — rolled back automatically if they fail
URL: https://ondatra.sh/validation/audits/
Audits detect regressions after data is written.
If an audit fails, the transaction is rolled back and the model fails.
Constraints stop bad rows. Audits catch bad results.
## Why This Matters
Some data issues can't be detected row by row.
- Row counts suddenly drop
- Distributions drift
- Freshness breaks
- Aggregates don't match source systems
These are not row-level problems — they are dataset-level problems. That's what audits are for.
## When Audits Run
Audits run inside the same transaction as the data write.
This means:
- Either everything succeeds
- Or nothing is committed
If any audit fails:
- The transaction is rolled back
- No data is committed
- Downstream models do not run
## Usage
Add audits directly in your model:
```sql
-- @audit: PATTERN
```
You can define multiple audits per model.
```sql
-- @audit: row_count > 0
-- @audit: row_count_change < 20%
-- @audit: freshness(updated_at, 24h)
-- @audit: reconcile_sum(total, raw.orders.total)
-- @audit: distribution(status) STABLE(0.1)
```
If any of these fail, the entire model is rolled back.
## Constraints vs Audits
**Constraints:**
- Run before insert
- Validate individual rows
- Prevent bad data from entering
**Audits:**
- Run after insert
- Validate the dataset as a whole
- Catch regressions and anomalies
## Common Patterns
| Pattern | Example | What it catches |
|---|---|---|
| `row_count > 0` | `row_count > 0` | Empty results |
| `freshness(col, duration)` | `freshness(updated_at, 24h)` | Stale data |
| `row_count_change < N%` | `row_count_change < 20%` | Sudden drops or spikes |
| `min(col) OP N` | `min(price) >= 0` | Out-of-range values |
| `max(col) OP N` | `max(price) <= 10000` | Out-of-range values |
| `reconcile_sum(col, other.col)` | `reconcile_sum(total, raw.orders.total)` | Source/target mismatch |
| `distribution(col) STABLE(N)` | `distribution(status) STABLE(0.1)` | Category drift |
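A sketch of the row-count-change computation, assuming it compares the current count against the previous run's count as a percentage:

```python
def row_count_change(previous: int, current: int) -> float:
    """Percent change in row count relative to the previous run.
    Sketch of the `row_count_change < N%` audit; the real implementation
    reads the previous count from run metadata."""
    if previous == 0:
        return float("inf") if current else 0.0
    return abs(current - previous) / previous * 100

# `row_count_change < 20%` would pass here: a 10% drop.
assert row_count_change(1000, 900) == 10.0
assert row_count_change(1000, 1300) == 30.0  # a 30% spike would fail
```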
## All Patterns
### Row Count
| Pattern | Example | Description |
|---|---|---|
| `row_count OP N` | `row_count >= 100` | Check total row count (supports `>=`, `<=`, `>`, `<`, `=`) |
| `row_count_change < N%` | `row_count_change < 10%` | Row count change from previous run |
### Freshness
| Pattern | Example | Description |
|---|---|---|
| `freshness(col, duration)` | `freshness(updated_at, 24h)` | Maximum age of most recent record (supports `h` for hours, `d` for days) |
### Statistical
| Pattern | Example | Description |
|---|---|---|
| `mean(col) ...` | `mean(price) BETWEEN 10 AND 100` | Average value check (supports BETWEEN and comparisons) |
| `stddev(col) < N` | `stddev(amount) < 25` | Standard deviation threshold |
| `min(col) OP N` | `min(price) >= 0` | Minimum value check |
| `max(col) OP N` | `max(price) <= 10000` | Maximum value check |
| `sum(col) OP N` | `sum(amount) >= 0` | Aggregate sum check |
| `zscore(col) < N` | `zscore(amount) < 3.0` | Statistical outlier detection |
| `percentile(col, p) OP N` | `percentile(latency, 0.95) <= 500` | Quantile validation |
### Reconciliation
| Pattern | Example | Description |
|---|---|---|
| `reconcile_count(table)` | `reconcile_count(raw.orders)` | Row counts must match |
| `reconcile_sum(col, other.col)` | `reconcile_sum(total, raw.orders.total)` | Aggregate sums must match |
### Schema
| Pattern | Example | Description |
|---|---|---|
| `column_exists(col)` | `column_exists(email)` | Verify column exists in output |
| `column_type(col, type)` | `column_type(price, DECIMAL(18,2))` | Verify column type (supports precision) |
### Data Comparison
| Pattern | Example | Description |
|---|---|---|
| `golden('path')` | `golden('tests/expected.csv')` | Compare output with golden dataset |
| `distribution(col) STABLE(threshold)` | `distribution(status) STABLE(0.1)` | Categorical distribution stability |
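Most statistical audits are plain aggregates over the written dataset. As an illustration, `zscore(col) < N` can be read as "no value's standard score exceeds N". A sketch of that computation (assumed semantics; the runtime's exact formula may differ):

```python
import statistics

def max_zscore(values):
    """Largest absolute standard score in the dataset.
    Sketch of the `zscore(col) < N` audit pattern."""
    mean = statistics.mean(values)
    stdev = statistics.pstdev(values)  # population standard deviation
    if stdev == 0:
        return 0.0
    return max(abs(v - mean) / stdev for v in values)

baseline = [10, 12, 11, 13, 12, 11] * 2
assert max_zscore(baseline) < 3.0            # audit would pass
assert max_zscore(baseline + [1000]) > 3.0   # an extreme outlier trips it
```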
## Full Example
```sql
-- @kind: append
-- @incremental: event_time
-- @audit: row_count > 0
-- @audit: freshness(event_time, 24h)
-- @audit: row_count_change < 50%
-- @audit: min(amount) >= 0
-- @audit: zscore(amount) < 3.0
-- @audit: reconcile_sum(amount, raw.events.amount)
SELECT event_id, event_time, amount, category
FROM raw.events
```
## Compared to Traditional Validation
Traditional approach:
- Run tests after data is written
- Failures require manual cleanup
- Data may already be used downstream
OndatraSQL:
- Audits run inside the transaction
- Failures automatically roll back
- Bad data never persists
---
### Warnings
> Non-blocking validation — observe before you enforce
URL: https://ondatra.sh/validation/warnings/
Warnings surface issues without stopping the pipeline.
They let you observe data quality before enforcing it.
Use warnings when you're not ready to fail — yet.
## Why Warnings Exist
Not all data issues should block your pipeline.
- You're introducing a new check
- You're unsure about thresholds
- You're monitoring a known issue
- You want visibility before enforcement
Warnings let you see problems without breaking production.
## Constraints vs Audits vs Warnings
**Constraints:**
- Fail before insert
- Block bad rows
**Audits:**
- Fail after insert
- Roll back bad results
**Warnings:**
- Do not fail
- Log issues for visibility
## How Warnings Run
Warnings run after data is written.
They evaluate the same patterns as constraints and audits, but:
- They do not fail the model
- They do not trigger rollback
- They are logged in execution output
Warnings are not ignored — they are signals. They show up in logs, CLI output, and metadata, and can be tracked over time.
## From Warning to Enforcement
A common workflow:
1. Start with a warning
2. Observe behavior over time
3. Tune thresholds
4. Promote to constraint or audit
Validation evolves with your data — no rewrites needed.
## Usage
```sql
-- @warning: PATTERN
```
Warnings support the same patterns as constraints and audits.
You can use:
- Row-level checks (`NOT NULL`, `UNIQUE`, `IN`, etc.)
- Dataset checks (`row_count_change`, `freshness`, `distribution`, etc.)
### Dataset checks
```sql
-- @warning: row_count_change < 50%
-- @warning: mean(amount) BETWEEN 0 AND 1000
-- @warning: freshness(updated_at, 48h)
-- @warning: distribution(status) STABLE(0.2)
-- @warning: min(price) >= 0
```
### Row-level checks
```sql
-- @warning: email NOT NULL
-- @warning: status IN ('active', 'inactive', 'pending')
-- @warning: price >= 0
-- @warning: sku UNIQUE
-- @warning: email MATCHES '^[^@]+@[^@]+$'
```
## Example
```sql
-- @kind: merge
-- @unique_key: product_id
-- @constraint: product_id PRIMARY KEY
-- @audit: row_count > 0
-- @warning: row_count_change < 50%
-- @warning: mean(price) BETWEEN 1 AND 10000
-- @warning: description NOT NULL
SELECT product_id, name, price, description
FROM raw.products
```
In this example:
- **Constraints** enforce correctness — primary key must be valid
- **Audits** enforce expectations — row count must exist
- **Warnings** monitor anomalies — without failing the run
Large row count swings, unusual price averages, and missing descriptions generate log warnings but do not block execution.
---
## Scripting
> Built-in scripting for ingestion — no Python required
URL: https://ondatra.sh/scripting/
OndatraSQL uses Starlark for data ingestion and custom logic. No Python. No dependencies. No runtime setup.
Write scripts that call APIs, handle pagination and auth, and emit rows directly into your pipeline. All in the same system as your SQL models.
## Mental Model
Starlark models produce rows. SQL models transform them. Both are part of the same DAG.
## How It Works
```python
# @kind: append
# @incremental: updated_at
resp = http.get("https://api.example.com/data")
for item in resp.json:
    save.row({
        "id": item["id"],
        "name": item["name"],
        "updated_at": item["updated_at"],
    })
```
What happens:
1. Script fetches data
2. `save.row()` emits rows
3. OndatraSQL materializes the result into DuckLake
Same directives, same DAG, same validation as SQL models.
## Why Starlark
- No Python environment to manage
- No dependency conflicts
- Deterministic execution
- Runs the same everywhere
## Built-in Modules
Everything you need is built in — no imports required. No SDKs, no client libraries, no setup.
| Module | What it does |
|---|---|
| [http](/scripting/http/) | API requests with retry, digest auth, mTLS |
| [oauth](/scripting/oauth/) | OAuth 2.0 flows with auto-refresh |
| [save](/scripting/save/) | Emit rows to the pipeline |
| [query](/scripting/modules/#query) | Read DuckDB tables |
| [incremental](/scripting/modules/#incremental) | Cursor state for incremental loads |
| [env](/scripting/modules/#env) | Access environment variables |
| [xml, csv](/scripting/modules/#data-formats) | Parse and encode data formats |
| [url, crypto](/scripting/modules/) | URL building, hashing, signing |
| [time, math, json, re](/scripting/modules/#standard-library) | Standard library |
See [Language Reference](/scripting/language/) for Starlark syntax.
## Shared Libraries
Reuse logic across models. Write once, use everywhere.
```python
load("lib/helpers.star", "paginate")
for page in paginate("https://api.example.com/users"):
    for user in page:
        save.row(user)
```
### How load() Works
- Resolved relative to project root
- Executed once per run, cached across all `load()` calls
- Nested loads supported (A loads B loads C)
- Import cycles detected
- Path traversal blocked
Library modules have access to all built-ins except `save` (must be passed as a parameter to functions that write data).
## YAML Models
Configure ingestion without writing code. Use when the logic already exists in `lib/`.
```yaml
kind: append
incremental: report_date
source: gam_report
config:
network_code: ${GAM_NETWORK_CODE}
dimensions:
- AD_UNIT_NAME
- DATE
```
```python
# lib/gam_report.star
def gam_report(save, network_code="", dimensions=None):
    start = incremental.initial_value if incremental.is_backfill else incremental.last_value
    # ... fetch and paginate API ...
    for row in results:
        save.row(row)
```
See [Models](/concepts/models/#yaml-models) for the full YAML reference.
## Example: Paginated API
```python
# @kind: append
page = 1
while True:
    resp = http.get("https://api.example.com/users?page=" + str(page))
    if len(resp.json) == 0:
        break
    for user in resp.json:
        save.row(user)
    page += 1
    sleep(0.1)  # rate limit
```
## Global Functions
```python
abort() # clean exit — 0 rows, no error
fail("something went wrong") # stop with error
sleep(1.5) # rate limiting
print("debug info") # stderr (secrets auto-redacted)
getvariable("currency") # read DuckDB session variable
```
Useful for retries, rate limits, and conditional execution.
## Secret Redaction
Secrets are automatically removed from logs. No configuration required.
```
Bearer eyJhbG... → Bearer [REDACTED]
token=abc123 → token=[REDACTED]
```
Safe by default — no accidental leaks in logs.
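A sketch of how the masking shown above might work with regular expressions (illustrative patterns only; OndatraSQL's actual redaction rules are internal to the runtime):

```python
import re

# Hypothetical patterns that reproduce the examples above.
PATTERNS = [
    (re.compile(r"(Bearer )\S+"), r"\1[REDACTED]"),
    (re.compile(r"(token=)\S+"), r"\1[REDACTED]"),
]

def redact(line: str) -> str:
    """Mask secret-bearing substrings before a line reaches the logs."""
    for pattern, repl in PATTERNS:
        line = pattern.sub(repl, line)
    return line

assert redact("Bearer eyJhbG...") == "Bearer [REDACTED]"
assert redact("token=abc123") == "token=[REDACTED]"
```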
---
### HTTP Module
> Built-in HTTP client for API ingestion
URL: https://ondatra.sh/scripting/http/
Fetch data from APIs directly inside your pipeline.
No Python. No requests. No separate ingestion tool.
The `http` module is built into OndatraSQL and designed for production data ingestion. It is a predeclared global — no import needed.
## Why This Exists
Most data stacks require a separate ingestion layer:
- Python scripts
- Airbyte connectors
- Custom services
With OndatraSQL, ingestion is part of the pipeline. You fetch data, transform it, and materialize it — in one runtime.
## Mental Model
- `http.get()` fetches data
- `save.row()` emits rows
- OndatraSQL handles storage, validation, and materialization
You don't write scripts around APIs — you write models.
## Production-Ready by Default
- Automatic retries with exponential backoff
- Rate limit handling (`Retry-After`)
- OAuth, Basic, and Digest authentication
- mTLS support (client certificates)
- Built-in timeout and error handling
## HTTP Methods
All methods follow the same pattern:
```python
http.get(url, ...)
http.post(url, ...)
http.put(url, ...)
http.patch(url, ...)
http.delete(url, ...)
```
### GET
```python
resp = http.get(url)
resp = http.get(url, headers={"Authorization": "Bearer " + token})
resp = http.get(url, params={"page": "1", "limit": "50"})
resp = http.get(url, headers=headers, timeout=60, retry=3)
```
### POST
```python
# JSON body (sets Content-Type: application/json automatically)
resp = http.post(url, json={"key": "value"})
# Form data (sets Content-Type: application/x-www-form-urlencoded automatically)
resp = http.post(url, data={"key": "value"})
# With headers and options
resp = http.post(url, json=payload, headers=headers, timeout=60)
```
### PUT / PATCH / DELETE
```python
resp = http.put(url, json=payload, headers=headers)
resp = http.patch(url, json=patch)
resp = http.delete(url, headers=headers)
```
`json=` and `data=` are mutually exclusive — use one or the other.
## Response Object
| Field | Type | Description |
|---|---|---|
| `status_code` | int | HTTP status code |
| `text` | string | Raw response body |
| `ok` | bool | `True` if status code is 200–299 |
| `json` | any | Parsed JSON, or `None` if the response is not JSON |
| `headers` | dict | Response headers |
## Options
| Kwarg | Default | Description |
|---|---|---|
| `headers` | `None` | Request headers dict |
| `params` | `None` | Query parameters dict (appended to URL) |
| `json` | `None` | JSON body (dict, list, or string) |
| `data` | `None` | Form-encoded body (dict) |
| `timeout` | `30` | Request timeout in seconds |
| `retry` | `0` | Number of retry attempts |
| `backoff` | `1` | Initial backoff between retries in seconds |
| `auth` | `None` | Authentication tuple — see [Authentication](#authentication) |
| `cert` | `None` | Path to client certificate file (PEM) — see [mTLS](#mtls) |
| `key` | `None` | Path to client private key file (PEM) |
| `ca` | `None` | Path to custom CA certificate file (PEM) |
## Retry and Backoff
APIs fail. OndatraSQL retries automatically.
- Retries on 429 and 5xx errors
- Exponential backoff with jitter
- Respects `Retry-After` headers
When the server sends a `Retry-After` header (on 429 or 503), OndatraSQL respects it — both integer seconds and HTTP-date (RFC 1123) formats are supported. The `Retry-After` value overrides the calculated backoff for that retry.
```python
# Retry up to 3 times with 2s initial backoff (2s, 4s, 8s + jitter)
resp = http.get(url, retry=3, backoff=2)
```
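The delay schedule above can be sketched as a small function: exponential growth from the initial backoff, with a server-sent `Retry-After` taking precedence (jitter omitted for clarity; function name is illustrative):

```python
def retry_delay(attempt, backoff, retry_after=None):
    """Delay in seconds before retry `attempt` (1-based). Sketch of the
    documented behavior: doubling from `backoff`, overridden by a
    Retry-After header. The real client also adds jitter."""
    if retry_after is not None:
        return retry_after
    return backoff * (2 ** (attempt - 1))

# retry=3, backoff=2 gives 2s, 4s, 8s (plus jitter in the real client)
assert [retry_delay(n, 2) for n in (1, 2, 3)] == [2, 4, 8]
assert retry_delay(2, 2, retry_after=30) == 30
```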
## Authentication {#authentication}
### Basic Auth
```python
# Basic auth (default scheme)
resp = http.get(url, auth=("username", "password"))
resp = http.get(url, auth=("username", "password", "basic"))
```
### Digest Auth
[HTTP Digest Authentication](https://datatracker.ietf.org/doc/html/rfc7616) (RFC 7616) is a challenge-response scheme. The client first receives a 401 with a nonce from the server, then re-sends the request with a computed hash. OndatraSQL handles this automatically.
```python
# Digest auth — the challenge-response is handled transparently
resp = http.get(url, auth=("username", "password", "digest"))
```
Supports MD5 and SHA-256 algorithms with `qop=auth`.
When using digest auth, at least one retry is required for the challenge-response flow. OndatraSQL automatically sets `retry=1` if it's not already >= 1.
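For intuition, the RFC 7616 response value (MD5 algorithm with `qop=auth`) is a chain of nested hashes over the credentials, the server's nonce, and the request. A sketch of the computation OndatraSQL performs transparently after the 401 challenge:

```python
import hashlib

def md5_hex(s: str) -> str:
    return hashlib.md5(s.encode()).hexdigest()

def digest_response(user, password, realm, method, uri,
                    nonce, nc, cnonce, qop="auth"):
    """RFC 7616 response hash (MD5, qop=auth): the value the client
    sends back after receiving the server's nonce challenge."""
    ha1 = md5_hex(f"{user}:{realm}:{password}")   # credentials hash
    ha2 = md5_hex(f"{method}:{uri}")              # request hash
    return md5_hex(f"{ha1}:{nonce}:{nc}:{cnonce}:{qop}:{ha2}")

resp = digest_response("alice", "s3cret", "example-realm",
                       "GET", "/api/data",
                       "dcd98b7102dd2f0e8b11d0f600bfb0c093",
                       "00000001", "0a4f113b")
assert len(resp) == 32  # 32-hex-digit MD5 response
```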
## mTLS (Mutual TLS) {#mtls}
For APIs that require client certificate authentication (PSD2/Open Banking, government APIs, enterprise service-to-service), use the `cert` and `key` kwargs.
```python
# Client certificate + private key
resp = http.get(url, cert="/path/to/client.crt", key="/path/to/client.key")
# With custom CA (for self-signed server certificates)
resp = http.get(url,
cert="/path/to/client.crt",
key="/path/to/client.key",
ca="/path/to/ca.crt",
)
# Combine with other kwargs
resp = http.post(url,
json=payload,
cert="client.crt",
key="client.key",
timeout=30,
)
```
- `cert` and `key` must be provided together
- `ca` is optional — use it when the server uses a self-signed or private CA certificate
- All paths are PEM-encoded files
## Example: Paginated API
Typical ingestion loop:
```python
# @kind: append
token = oauth.token(
    token_url="https://auth.example.com/token",
    client_id=env.get("CLIENT_ID"),
    client_secret=env.get("CLIENT_SECRET"),
)
page = 1
while True:
    resp = http.get(
        "https://api.example.com/users",
        headers={"Authorization": "Bearer " + token.access_token},
        params={"page": str(page)},
        retry=3,
        backoff=2,
    )
    if not resp.ok:
        fail("API error: " + str(resp.status_code))
    if len(resp.json["data"]) == 0:
        break
    for user in resp.json["data"]:
        save.row({
            "id": user["id"],
            "name": user["name"],
            "email": user["email"],
        })
    page = page + 1
    sleep(0.1)  # Rate limiting
```
## Example: HMAC Request Signing {#hmac-signing}
Some APIs (AWS, Stripe webhooks, payment providers) require HMAC-signed requests. Use the [crypto module](/scripting/modules/#crypto) to compute signatures.
```python
# @kind: append
payload = json.encode({"event": "payment", "amount": 100})
timestamp = str(int(time.now().unix))
# Sign the request
signature = crypto.hmac_sha256(
    env.get("API_SECRET"),
    timestamp + "." + payload,
)
resp = http.post("https://api.example.com/webhook",
    json=json.decode(payload),
    headers={
        "X-Signature": signature,
        "X-Timestamp": timestamp,
    },
)
```
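For context, the receiving side recomputes and compares the same signature. A sketch of that server-side counterpart (not part of OndatraSQL; assumes a hex-encoded signature and a constant-time comparison):

```python
import hashlib
import hmac

def sign(secret: str, timestamp: str, body: str) -> str:
    """Hex HMAC-SHA256 over 'timestamp.body', mirroring the signing
    scheme in the example above."""
    return hmac.new(secret.encode(), f"{timestamp}.{body}".encode(),
                    hashlib.sha256).hexdigest()

def verify(secret: str, timestamp: str, body: str, signature: str) -> bool:
    # compare_digest avoids leaking information through timing
    return hmac.compare_digest(sign(secret, timestamp, body), signature)

sig = sign("s3cret", "1700000000", '{"event":"payment"}')
assert verify("s3cret", "1700000000", '{"event":"payment"}', sig)
assert not verify("wrong", "1700000000", '{"event":"payment"}', sig)
```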
## Integrated with the Pipeline
Data fetched via http is:
- Buffered durably
- Validated via constraints and audits
- Versioned in DuckLake
- Tracked in lineage
No separate ingestion layer required.
## Compared to Traditional Ingestion
Traditional approach:
- Python scripts with requests
- Airbyte connectors
- Separate orchestration
OndatraSQL:
- HTTP client built into the runtime
- Runs inside your pipeline
- No extra services or languages
---
### Save Module
> Emit rows that become the model output
URL: https://ondatra.sh/scripting/save/
Turn script output into a table.
Every row you emit becomes part of the model's result.
`save` is how scripts produce data in OndatraSQL. It is a predeclared global — no import needed.
## Why This Exists
In SQL models, your `SELECT` defines the output.
In scripts, you generate rows manually.
`save` bridges that gap — it lets you emit rows that OndatraSQL materializes.
## Mental Model
- `http.get()` fetches data
- You transform it in Starlark
- `save.row()` emits rows
- OndatraSQL materializes the result
Scripts behave like `SELECT` statements — but with full control.
Rows are collected incrementally during execution. You don't need to build the full dataset in memory — just emit rows as you process them.
## Methods
Emit rows one by one or in batches:
### row
Emit a single row:
```python
save.row({
    "id": 1,
    "name": "Alice",
    "email": "alice@example.com",
})
```
### rows
Emit multiple rows:
```python
save.rows([
    {"id": 1, "name": "Alice"},
    {"id": 2, "name": "Bob"},
])
```
### count
Get the current number of collected rows:
```python
n = save.count()
```
## Column Types
Column types are inferred automatically from values:
| Starlark Type | DuckDB Type |
|---|---|
| `int` | `BIGINT` |
| `float` | `DOUBLE` |
| `string` | `VARCHAR` |
| `bool` | `BOOLEAN` |
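The table above can be pictured as a type switch. One subtlety worth noting: `bool` must be checked before `int`, because in Python-family languages `bool` is a subtype of `int`. A sketch (the real inference is internal to the runtime):

```python
def duckdb_type(value):
    """Map a value to its inferred DuckDB column type, per the table
    above. Illustrative sketch only."""
    if isinstance(value, bool):  # must precede int: bool subclasses int
        return "BOOLEAN"
    if isinstance(value, int):
        return "BIGINT"
    if isinstance(value, float):
        return "DOUBLE"
    if isinstance(value, str):
        return "VARCHAR"
    raise TypeError("unsupported column type: " + type(value).__name__)

assert duckdb_type(True) == "BOOLEAN"
assert duckdb_type(1) == "BIGINT"
assert duckdb_type(1.5) == "DOUBLE"
assert duckdb_type("x") == "VARCHAR"
```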
## Example
```python
# @kind: merge
# @unique_key: product_id
resp = http.get("https://api.example.com/products")
for product in resp.json:
    save.row({
        "product_id": product["id"],
        "name": product["name"],
        "price": product["price"],
        "in_stock": product["stock"] > 0,
        "updated_at": product["updated_at"],
    })
print("Saved", save.count(), "products")
```
Each `save.row()` call adds one row to the output table. At the end of the script, OndatraSQL materializes the collected rows.
## Integrated with the Pipeline
Rows collected via `save` are:
- Validated with constraints and audits
- Written using the model's `@kind` (append, merge, etc.)
- Tracked in lineage and metadata
- Versioned in DuckLake
You don't handle storage — only data.
## Compared to Traditional Ingestion
Traditional approach:
- Fetch data in Python
- Store in memory or files
- Load into a database separately
OndatraSQL:
- Fetch, transform, emit rows
- One step, one runtime
---
### OAuth Module
> Built-in OAuth for API authentication
URL: https://ondatra.sh/scripting/oauth/
Handle OAuth automatically in your pipeline.
No token storage. No refresh logic. No glue code.
The `oauth` module manages tokens for you — including automatic refresh. It is a predeclared global — no import needed.
## Why This Exists
Most API ingestion requires manual token handling:
- Store tokens
- Track expiration
- Refresh before expiry
- Retry failed requests
With OndatraSQL, this is built in. You request a token. The runtime keeps it valid.
## Mental Model
- `oauth.token()` creates a managed token
- `token.access_token` always returns a valid token
- Refresh happens automatically when needed
You don't manage authentication — you use it.
## Automatic Token Refresh
Tokens are refreshed automatically before they expire.
- No manual refresh logic
- No expiry tracking
- No retry handling required
Accessing `token.access_token` always gives you a valid token — even for long-running scripts.
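The refresh behavior can be pictured as a wrapper that re-fetches whenever the cached token is close to expiry. A sketch with an injectable fetcher and clock (class and parameter names are hypothetical, for illustration only):

```python
import time as _time

class ManagedToken:
    """Sketch of oauth.token()'s refresh behavior: access_token is
    re-fetched once the cached token is within `margin` seconds of
    expiring. Fetcher and clock are injectable for testing."""
    def __init__(self, fetch, margin=60, clock=_time.time):
        self._fetch, self._margin, self._clock = fetch, margin, clock
        self._token, self._expires_at = None, 0.0

    @property
    def access_token(self):
        if self._clock() >= self._expires_at - self._margin:
            self._token, ttl = self._fetch()          # refresh
            self._expires_at = self._clock() + ttl
        return self._token

# Fake fetcher and clock to show a refresh happening
issued = []
def fake_fetch():
    issued.append(len(issued))
    return "tok-%d" % len(issued), 3600  # token, TTL in seconds

now = [0.0]
token = ManagedToken(fake_fetch, clock=lambda: now[0])
assert token.access_token == "tok-1"
now[0] = 1000.0
assert token.access_token == "tok-1"  # still valid, no refresh
now[0] = 3590.0                       # within 60s of expiry
assert token.access_token == "tok-2"  # refreshed automatically
```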
## oauth.token
Returns a managed token that stays valid automatically.
### Client Credentials
```python
token = oauth.token(
    token_url="https://auth.example.com/oauth/token",
    client_id=env.get("CLIENT_ID"),
    client_secret=env.get("CLIENT_SECRET"),
    scope="read write",
)
resp = http.get(url, headers={"Authorization": "Bearer " + token.access_token})
```
| Argument | Required | Description |
|---|---|---|
| `token_url` | yes | OAuth token endpoint |
| `client_id` | yes | Client ID |
| `client_secret` | yes | Client secret |
| `scope` | no | Requested scope |
### Google Service Account
Use service accounts for Google APIs (BigQuery, Ad Manager, etc.).
```python
token = oauth.token(
    google_service_account=env.get("GOOGLE_KEY_JSON"),
    scope="https://www.googleapis.com/auth/bigquery.readonly",
)
# Or from file:
token = oauth.token(
    google_key_file="service-account.json",
    scope="https://www.googleapis.com/auth/admanager",
)
```
| Argument | Required | Description |
|---|---|---|
| `google_service_account` | no | Inline JSON key string |
| `google_key_file` | no | Path to JSON key file |
| `scope` | no | OAuth scope |
One of `google_service_account` or `google_key_file` is required for the Google flow.
## Works with the HTTP Module
Use the token directly in requests:
```python
resp = http.get(url, headers={
    "Authorization": "Bearer " + token.access_token,
})
```
Authentication, retries, and data ingestion work together seamlessly.
## Full Example
```python
# @kind: append
token = oauth.token(
    token_url="https://auth.example.com/oauth/token",
    client_id=env.get("CLIENT_ID"),
    client_secret=env.get("CLIENT_SECRET"),
    scope="read:users",
)
page = 1
while True:
    resp = http.get(
        "https://api.example.com/users",
        headers={"Authorization": "Bearer " + token.access_token},
        params={"page": str(page)},
    )
    users = resp.json["data"]
    if not users:
        break
    for user in users:
        save.row({"id": user["id"], "name": user["name"]})
    page += 1
```
## oauth.basic_auth
HTTP Basic Authentication header:
```python
header = oauth.basic_auth(env.get("USERNAME"), env.get("PASSWORD"))
# "Basic base64encoded..."
```
## Compared to Traditional OAuth
Traditional approach:
- Use OAuth libraries
- Store tokens in files or databases
- Implement refresh logic
- Handle expiration errors
OndatraSQL:
- Request a token once
- Use `token.access_token`
- Refresh handled automatically
---
### Runtime Modules
> A built-in runtime for writing data pipelines
URL: https://ondatra.sh/scripting/modules/
OndatraSQL includes a built-in runtime for writing data pipelines.
No imports. No dependencies. Everything you need is available by default.
All modules are predeclared globals, available in both model scripts and shared library modules loaded via `load()`.
## Mental Model
Scripts in OndatraSQL are not standalone programs.
They run inside a data pipeline runtime:
- `query()` reads from DuckDB
- `http` fetches external data
- `save.row()` writes output
- `incremental` provides state
You're not wiring systems together — you're writing the pipeline itself.
## Core Runtime
### query() — Read from Your Pipeline {#query}
Use `query()` to read from any table in your pipeline. Dependencies are detected automatically.
```python
rows = query("SELECT id, name FROM staging.customers WHERE active = true")
for row in rows:
    print(row["id"], row["name"])
```
Returns a `list` of `dict`, where every value is a `string`. An empty result set returns an empty list.
```python
# CTE queries work too
rows = query("""
    WITH recent AS (
        SELECT * FROM raw.orders
        WHERE created_at > '2026-01-01'
    )
    SELECT customer_id, COUNT(*) AS cnt
    FROM recent
    GROUP BY 1
""")
```
This enables:
- **Reverse ETL** — read from DuckDB, push to external API
- **Enrichment workflows** — join API data with existing tables
- **Feedback loops** — read, send, write back
Without leaving the pipeline.
**Reverse ETL:**
```python
rows = query("SELECT * FROM mart.customers WHERE synced = false")
for row in rows:
    http.post("https://api.hubspot.com/contacts", json=row)
    save.row({"id": row["id"], "synced_at": str(time.now())})
```
**Cross-source enrichment:**
```python
existing = query("SELECT id, email FROM staging.users")
for user in existing:
    profile = http.get("https://api.example.com/profiles/" + user["id"])
    save.row({"id": user["id"], "email": user["email"], "bio": profile.json.get("bio", "")})
```
Table references in `query()` string literals are automatically extracted for DAG ordering. If your script queries `staging.customers`, OndatraSQL ensures that model runs before yours.
{{< callout type="warning" >}}
Only string literals are analyzed. Dynamic SQL (`query("SELECT * FROM " + table)`) won't be detected — the dependency must be declared manually or restructured as a literal string.
{{< /callout >}}
`query()` rejects mutating statements — `INSERT`, `UPDATE`, `DELETE`, `DROP`, `CREATE`, etc. are blocked before execution.
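A sketch of such a guard based on the statement's leading keyword (illustrative only; the runtime's actual check may be more thorough, e.g. aware of multi-statement strings):

```python
BLOCKED = {"INSERT", "UPDATE", "DELETE", "DROP", "CREATE", "ALTER", "TRUNCATE"}

def assert_read_only(sql: str):
    """Reject mutating statements before execution, as query() does.
    Keyword-based sketch; hypothetical helper, not the real implementation."""
    first = sql.lstrip().split(None, 1)[0].upper()
    if first in BLOCKED:
        raise ValueError("query() is read-only, got " + first)

assert_read_only("SELECT 1")                              # fine
assert_read_only("WITH x AS (SELECT 1) SELECT * FROM x")  # CTEs are fine
try:
    assert_read_only("DROP TABLE users")
    raise AssertionError("should have raised")
except ValueError:
    pass
```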
### incremental — Pipeline State {#incremental}
Access the state of previous runs. Used for cursor-based ingestion from APIs.
| Field | Type | Description |
|---|---|---|
| `is_backfill` | bool | `True` if target table doesn't exist |
| `cursor` | string | Cursor column name |
| `last_value` | string | MAX(cursor) from target |
| `last_run` | string | Timestamp of last commit |
| `initial_value` | string | Starting cursor value |
The `incremental` module is read-only state — available directly from library functions without being passed as a parameter. See [Incremental Models](/concepts/incremental/) for details.
```python
# @kind: append
# @incremental: updated_at
url_str = "https://api.example.com/orders"
if not incremental.is_backfill:
    url_str = url_str + "?since=" + incremental.last_value
resp = http.get(url_str)
for order in resp.json:
    save.row(order)
```
## External Access
### env
Access environment variables (from `.env` or shell).
```python
value = env.get("API_KEY")
value = env.get("API_KEY", default="fallback")
env.set("TEMP_VAR", "some_value")
```
| Method | Description |
|---|---|
| `get(name, default?)` | Get variable value with optional fallback |
| `set(name, value)` | Set variable for current session |
### url
Build and parse URLs.
```python
full = url.build("https://api.com/data", params={"page": "1", "limit": "50"})
# "https://api.com/data?limit=50&page=1"
encoded = url.encode("hello world")
# "hello+world"
params = url.encode_params({"foo": "bar", "baz": "qux"})
# "baz=qux&foo=bar"
parsed = url.parse("https://api.com/data?page=1")
# parsed.scheme == "https", parsed.host == "api.com", parsed.path == "/data"
```
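The sorted-parameter output shown above can be reproduced with any standard URL library. A sketch, assuming `url.build` sorts parameters by key before encoding (the `build` helper here is illustrative):

```python
from urllib.parse import quote_plus, urlencode

def build(base, params):
    """Sketch of url.build(): append query parameters sorted by key,
    matching the output shown above."""
    return base + "?" + urlencode(sorted(params.items()))

assert build("https://api.com/data", {"page": "1", "limit": "50"}) == \
    "https://api.com/data?limit=50&page=1"
assert quote_plus("hello world") == "hello+world"  # url.encode equivalent
```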
### crypto
Cryptographic functions for hashing, encoding, and signing.
```python
# Base64
encoded = crypto.base64_encode("hello")
decoded = crypto.base64_decode(encoded)
# Hashing
sha = crypto.sha256("data")
md = crypto.md5("data")
# HMAC signing
sig = crypto.hmac_sha256("secret_key", "message")
# UUID generation
id = crypto.uuid()
# Random string (alphanumeric, default length 32)
s = crypto.random_string()
s = crypto.random_string(length=16)
```
#### HMAC Request Signing {#hmac-signing}
Use `crypto.hmac_sha256` to sign API requests. This is the same mechanism used by AWS Signature v4, Stripe webhook verification, and similar schemes.
```python
# Sign a request payload
secret = env.get("API_SECRET")
timestamp = str(int(time.now().unix))
body = json.encode({"event": "payment", "amount": 100})
signature = crypto.hmac_sha256(secret, timestamp + "." + body)
resp = http.post("https://api.example.com/events",
    json=json.decode(body),
    headers={
        "X-Signature": signature,
        "X-Timestamp": timestamp,
    },
)
```
## Data Handling {#data-formats}
### xml
Parse and encode XML data.
```python
data = xml.decode('<user><name>Alice</name><age>30</age></user>')
# {"user": {"name": "Alice", "age": "30"}}
output = xml.encode({"user": {"name": "Alice", "age": "30"}})
```
XML attributes are prefixed with `@`:
```python
data = xml.decode('<item id="42"><name>Widget</name></item>')
# {"item": {"@id": "42", "name": "Widget"}}
```
| Method | Description |
|---|---|
| `decode(string)` | Parse XML string to dict |
| `encode(dict)` | Convert dict to XML string |
### csv
Parse and encode CSV data.
```python
# With header row (default) — returns list of dicts
rows = csv.decode("name,age\nAlice,30\nBob,25")
# [{"name": "Alice", "age": "30"}, {"name": "Bob", "age": "25"}]
# Without header — returns list of lists
rows = csv.decode("Alice,30\nBob,25", header=False)
# Tab-separated
rows = csv.decode(data, delimiter="\t")
# Encode list of dicts to CSV
output = csv.encode([{"name": "Alice", "age": "30"}])
# Encode list of lists with explicit header
output = csv.encode([["Alice", "30"]], header=["name", "age"])
```
| Method | Description |
|---|---|
| `decode(data, delimiter?, header?)` | Parse CSV string. `delimiter` defaults to `","`, `header` defaults to `True` |
| `encode(rows, header?)` | Encode rows to CSV string. `header` is optional list of column names |
### json
JSON encoding and decoding.
```python
s = json.encode({"name": "Alice", "age": 30})
data = json.decode('{"name": "Alice", "age": 30}')
```
## Standard Library {#standard-library}
### time
Date and time operations. See the [full API reference](https://pkg.go.dev/go.starlark.net/lib/time).
```python
now = time.now()
t = time.parse_time("2024-03-15T10:30:00Z")
t = time.time(year=2024, month=3, day=15)
t.year, t.month, t.day, t.hour, t.minute, t.second
t.unix # Unix timestamp (seconds)
t.format("2006-01-02") # Go layout
tomorrow = t + time.parse_duration("24h")
yesterday = t - time.parse_duration("24h")
```
### math
Mathematical functions. See the [full API reference](https://pkg.go.dev/go.starlark.net/lib/math).
```python
math.ceil(1.5) # 2
math.floor(1.5) # 1
math.round(1.5) # 2
math.sqrt(16) # 4.0
math.abs(-5) # 5
math.pow(2, 10) # 1024.0
math.pi # 3.141592653589793
math.e # 2.718281828459045
```
### re
Regular expressions (Python `re`-compatible). See the [full API reference](https://github.com/magnetde/starlark-re).
```python
m = re.search(r"(\d+)-(\d+)", "order-123-456")
m.group(1) # "123"
re.findall(r"\d+", "a1b2c3") # ["1", "2", "3"]
re.split(r"\s+", "hello world") # ["hello", "world"]
re.sub(r"\d+", "X", "a1b2c3") # "aXbXcX"
```
## Putting It Together
A typical ingestion model:
1. Authenticate (`oauth`)
2. Read state (`incremental`)
3. Fetch data (`http`)
4. Transform (Starlark)
5. Write output (`save`)
All inside one runtime.
## What You Don't Need
- No Python environment
- No requests library
- No OAuth libraries
- No state storage
- No SDKs
Everything is built in.
---
### Language Reference
> A simpler alternative to Python for pipelines
URL: https://ondatra.sh/scripting/language/
OndatraSQL scripts use [Starlark](https://github.com/google/starlark-go) — a Python-like language designed for data pipelines.
No packages. No virtual environments. No runtime setup.
If you know Python, you can write pipelines immediately.
## Why Starlark?
Starlark is a restricted, deterministic subset of Python.
- No hidden state
- No external dependencies
- No runtime environment
This makes pipelines reproducible and easy to reason about.
## Differences from Python
Starlark is intentionally simpler:
| Python | Starlark |
| --- | --- |
| `import module` | `load("lib/module.star", "func")` |
| `try / except` | Not available — errors stop the script |
| `class Foo:` | Not available — use dicts and functions |
| `with open(f):` | Not available — no file I/O |
| `for c in "hello":` | `for c in "hello".elems():` |
| `2 ** 10` | `math.pow(2, 10)` |
| `yield / async` | Not available |
| `global / nonlocal` | Not available |
| `x is y` | Use `==` |
This keeps pipelines predictable and portable.
## In OndatraSQL
Starlark is used for:
- Data ingestion (HTTP APIs, files)
- Custom logic between models
- Reverse ETL workflows
It runs inside the OndatraSQL runtime — with access to:
- `http` (API calls)
- `query()` (read from DuckDB)
- `save` (write output)
- `incremental` (pipeline state)
## Example: A Simple Pipeline Script
```python
# @kind: append
resp = http.get("https://api.example.com/users")
for user in resp.json:
    save.row({
        "id": user["id"],
        "name": user["name"],
    })
```
This fetches data from an API, transforms it, and writes it into your pipeline. All in one script.
## Basics
Everything works like Python.
```python
# Numbers
count = 42
rate = 0.95
# Strings
name = "Alice"
query = 'SELECT * FROM users'
# Booleans
active = True
deleted = False
# None
result = None
# Lists (mutable)
items = [1, 2, 3]
items.append(4)
# Tuples (immutable)
pair = (1, 2)
# Dicts (mutable, ordered)
user = {"name": "Alice", "age": 30}
# Sets (mutable)
tags = set(["a", "b", "c"])
```
## Control Flow
```python
# If / elif / else
if resp.ok:
    process(resp.json)
elif resp.status_code == 429:
    sleep(1)
else:
    fail("Error: " + str(resp.status_code))
# For loops
for item in items:
    save.row(item)
for i, item in enumerate(items):
    print(i, item)
for i in range(10):
    print(i)
# While loops
page = 1
while True:
    resp = http.get(url + "?page=" + str(page))
    if len(resp.json) == 0:
        break
    for item in resp.json:
        save.row(item)
    page += 1
```
## Functions and Lambdas
```python
def fetch_page(url, page=1, limit=50):
    return http.get(
        url + "?page=" + str(page) + "&limit=" + str(limit),
    )
# Lambda expressions
double = lambda x: x * 2
sorted(users, key=lambda u: u["name"])
```
## load() — Shared Libraries
Import functions and values from other Starlark files using `load()`:
```python
load("lib/pagination.star", "paginate", "DEFAULT_PAGE_SIZE")
load("lib/auth.star", "get_token")
token = get_token()
for page in paginate(url, page_size=DEFAULT_PAGE_SIZE):
    process(page)
```
### Syntax
```python
load("path/to/module.star", "name1", "name2") # import by name
load("path/to/module.star", alias="original_name") # import with alias
```
Paths are relative to the project root. Only files inside the project directory can be loaded — path traversal and symlinks pointing outside the project are rejected.
### Caching
Each module is executed once per model run. Multiple `load()` calls — including from different nested modules — share the cached result.
### Library Scope
Loaded modules have access to all built-in modules (`http`, `oauth`, `json`, etc.) except `save` and `incremental`. Functions that need `save` should accept it as a parameter:
```python
# lib/fetcher.star
def fetch_users(save, url):
    resp = http.get(url)
    for user in resp.json:
        save.row(user)
```
This is the pattern used by [YAML models](/concepts/models/#yaml-models), where `save` is passed automatically.
## String Formatting
```python
# %-operator
msg = "Fetched %d rows from %s" % (count, source)
price = "Total: $%.2f" % amount
# .format() method
msg = "Page {} of {}".format(page, total)
msg = "Page {page} of {total}".format(page=1, total=10)
```
Format specifiers: `%s` (string), `%d` (integer), `%f` (float), `%x` (hex), `%o` (octal).
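These specifiers follow standard printf-style semantics, the same as Python's:

```python
# Each assertion shows a specifier and the string it produces.
assert "%s" % "users" == "users"   # string
assert "%d" % 42 == "42"           # integer
assert "%.2f" % 1.5 == "1.50"      # float with precision
assert "%x" % 255 == "ff"          # hex
assert "%o" % 8 == "10"            # octal
```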
## Comprehensions
```python
# List comprehension
ids = [item["id"] for item in resp.json]
active = [u for u in users if u["status"] == "active"]
# Dict comprehension
lookup = {row.id: row.name for row in duckdb.sql("SELECT id, name FROM t")}
# Nested
flat = [cell for row in matrix for cell in row]
```
## Built-in Functions
| Function | Description |
| --- | --- |
| `abs(x)` | Absolute value |
| `all(x)` | `True` if all elements are truthy |
| `any(x)` | `True` if any element is truthy |
| `bool(x)` | Convert to boolean |
| `chr(i)` | Unicode code point to string |
| `dict(pairs)` | Create dict |
| `dir(x)` | List attributes/methods |
| `enumerate(x)` | Yields `(index, value)` pairs |
| `float(x)` | Convert to float |
| `getattr(x, name)` | Get attribute by name |
| `hasattr(x, name)` | Check if attribute exists |
| `hash(x)` | Hash a string |
| `int(x)` | Convert to int |
| `len(x)` | Number of elements |
| `list(x)` | Create list from iterable |
| `max(x)` | Largest value |
| `min(x)` | Smallest value |
| `ord(s)` | String to Unicode code point |
| `print(*args)` | Print to stderr |
| `range(n)` | Sequence of integers |
| `repr(x)` | Debug representation |
| `reversed(x)` | Reversed sequence |
| `set(x)` | Create set |
| `sorted(x, key?, reverse?)` | Sorted list |
| `str(x)` | Convert to string |
| `tuple(x)` | Create tuple |
| `type(x)` | Type name as string |
| `zip(a, b)` | Zip two sequences |
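A few of these combine naturally for everyday pipeline chores (the semantics shown here match Python, so you can try them in either language):

```python
rows = [{"id": 3, "ok": True}, {"id": 1, "ok": True}, {"id": 2, "ok": False}]

# Collect and sort IDs from a result set.
ids = sorted([r["id"] for r in rows])
assert ids == [1, 2, 3]

# Check for failures across all rows.
assert any(not r["ok"] for r in rows)      # at least one row failed
assert not all(r["ok"] for r in rows)      # so "all ok" is False

# Pair up two parallel sequences.
pairs = list(zip(["a", "b"], [1, 2]))
assert pairs == [("a", 1), ("b", 2)]
```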
## String Methods
| Method | Description |
| --- | --- |
| `s.capitalize()` | First letter uppercase |
| `s.count(sub)` | Count occurrences |
| `s.endswith(suffix)` | Check suffix |
| `s.find(sub)` | Find index, -1 if not found |
| `s.format(*args, **kwargs)` | Format string |
| `s.index(sub)` | Find index, error if not found |
| `s.isalnum()` | Alphanumeric only? |
| `s.isalpha()` | Letters only? |
| `s.isdigit()` | Digits only? |
| `s.islower()` | Lowercase only? |
| `s.isspace()` | Whitespace only? |
| `s.istitle()` | Title case? |
| `s.isupper()` | Uppercase only? |
| `s.join(iterable)` | Join strings |
| `s.lower()` | To lowercase |
| `s.lstrip(chars?)` | Strip left |
| `s.upper()` | To uppercase |
| `s.strip(chars?)` | Strip both sides |
| `s.rstrip(chars?)` | Strip right |
| `s.replace(old, new)` | Replace substring |
| `s.split(sep?)` | Split string |
| `s.splitlines()` | Split on newlines |
| `s.startswith(prefix)` | Check prefix |
| `s.removeprefix(prefix)` | Remove prefix |
| `s.removesuffix(suffix)` | Remove suffix |
| `s.partition(sep)` | Split into 3 parts |
| `s.rpartition(sep)` | Split into 3 parts from right |
| `s.title()` | Title Case |
| `s.elems()` | Iterate bytes |
| `s.codepoints()` | Iterate unicode characters |
**Note:** Strings are not directly iterable. Use `s.elems()` or `s.codepoints()` to iterate over characters.
## List Methods
| Method | Description |
| --- | --- |
| `l.append(x)` | Add element |
| `l.clear()` | Remove all elements |
| `l.extend(iterable)` | Add all elements from iterable |
| `l.index(x)` | Find index |
| `l.insert(i, x)` | Insert at position |
| `l.pop(i?)` | Remove and return element |
| `l.remove(x)` | Remove first occurrence |
Supports slicing: `l[1:3]`, `l[::-1]`, `l[::2]`.
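Slicing follows Python semantics:

```python
l = [1, 2, 3, 4, 5]
assert l[1:3] == [2, 3]            # elements at index 1 and 2
assert l[::-1] == [5, 4, 3, 2, 1]  # reversed copy
assert l[::2] == [1, 3, 5]         # every other element
```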
## Dict Methods
| Method | Description |
| --- | --- |
| `d.clear()` | Remove all entries |
| `d.get(key, default?)` | Get with fallback |
| `d.items()` | List of `(key, value)` tuples |
| `d.keys()` | List of keys |
| `d.pop(key, default?)` | Remove and return value |
| `d.popitem()` | Remove and return first entry |
| `d.setdefault(key, default?)` | Get or set default |
| `d.update(pairs)` | Merge entries |
| `d.values()` | List of values |
## Set Methods
| Method | Description |
| --- | --- |
| `s.add(x)` | Add element |
| `s.clear()` | Remove all elements |
| `s.discard(x)` | Remove if present |
| `s.remove(x)` | Remove, error if missing |
| `s.pop()` | Remove and return an element |
| `s.union(other)` | Union (`s \| other`) |
| `s.intersection(other)` | Intersection (`s & other`) |
| `s.difference(other)` | Difference |
| `s.symmetric_difference(other)` | Symmetric difference |
| `s.issubset(other)` | Subset check |
| `s.issuperset(other)` | Superset check |
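The method forms mirror the operators (semantics match Python sets):

```python
a = set(["a", "b", "c"])
b = set(["b", "c", "d"])

assert a.union(b) == set(["a", "b", "c", "d"])    # a | b
assert a.intersection(b) == set(["b", "c"])       # a & b
assert a.difference(b) == set(["a"])              # in a, not in b
assert a.symmetric_difference(b) == set(["a", "d"])
assert set(["b"]).issubset(a)
```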
You don't need more than this. Combined with the [built-in runtime modules](/scripting/modules/), this is enough to build complete data pipelines.
---
## Examples
> Real pipelines. No infrastructure.
URL: https://ondatra.sh/examples/
These examples show how to build complete data workflows with OndatraSQL — from ingestion to analytics.
Each example is a working model you can run locally.
## Ingestion
Fetch data from APIs or collect events — no connectors, no brokers.
- [Ingest from a REST API](ingest-rest-api) — No Python. No connectors. No scheduler.
- [YAML Model with Shared Library](yaml-model) — Reusable source functions for any API.
- [Track Product Analytics](track-product-analytics) — No Kafka. No external event tools.
## Transformation
Use SQL models to clean, join, and reshape your data.
- [Build Daily Metrics](build-daily-metrics) — Three-layer pipeline: raw → staging → mart.
- [Incremental SQL Model](incremental-model) — Append new data automatically with cursor tracking.
## Change Tracking
Handle updates, history, and incremental processing automatically.
- [SCD2 Dimension](scd2-dimension) — Track full history of dimension changes.
## Validation
Ensure your data is correct before and after it's written.
- [Validated Model](validated-model) — Constraints, audits, and warnings in one model.
## Try It Yourself
Pick any example, copy it into a new project, and run:
```bash
ondatrasql run
```
That's it.
---
### Ingest from a REST API
> Fetch data from a real API — no Python, no pip, no setup
URL: https://ondatra.sh/examples/ingest-rest-api/
Fetch European country data from the [REST Countries API](https://restcountries.com). No Python, no pip.
## 1. Write the Script
```python
# models/raw/countries.star
# @kind: table
resp = http.get(
    "https://restcountries.com/v3.1/region/europe?fields=name,capital,population,area,region,subregion",
    retry=3,
)
if not resp.ok:
    fail("API returned " + str(resp.status_code))
for country in resp.json:
    capital = ""
    if country.get("capital") and len(country["capital"]) > 0:
        capital = country["capital"][0]
    save.row({
        "name": country["name"]["common"],
        "capital": capital,
        "population": country["population"],
        "area_km2": country.get("area", 0),
        "subregion": country.get("subregion", ""),
    })
print("Fetched " + str(save.count()) + " countries")
```
No API key needed. No environment setup. Just run it.
## 2. Run
```bash
ondatrasql run raw.countries
```
```
Fetched 53 countries
[OK] raw.countries (table, backfill, 53 rows, 799ms)
```
## 3. Transform Downstream
```sql
-- models/staging/stg_countries.sql
-- @kind: table
-- @constraint: name NOT NULL
-- @constraint: population >= 0
-- @audit: row_count > 0
SELECT
name,
capital,
population,
area_km2,
subregion,
CASE
WHEN population > 50000000 THEN 'large'
WHEN population > 10000000 THEN 'medium'
ELSE 'small'
END AS size_category
FROM raw.countries
WHERE population > 0
```
```sql
-- models/mart/population_by_subregion.sql
-- @kind: table
-- @audit: row_count > 0
SELECT
subregion,
COUNT(*) AS countries,
SUM(population) AS total_population,
ROUND(AVG(population)) AS avg_population,
ROUND(SUM(area_km2)) AS total_area_km2
FROM staging.stg_countries
GROUP BY subregion
ORDER BY total_population DESC
```
## 4. Run the Full Pipeline
```bash
ondatrasql run
```
```
Running 3 models...
Fetched 53 countries
[OK] raw.countries (table, full, 53 rows, 820ms)
[OK] staging.stg_countries (table, backfill, 53 rows, 440ms)
[OK] mart.population_by_subregion (table, backfill, 6 rows, 243ms)
```
OndatraSQL detects that `stg_countries` depends on `countries` and runs them in the right order.
## What's Built In
- `http.get`, `http.post` with retry, timeout, headers
- `oauth.token` for OAuth 2.0 flows
- `env.get` for secrets from `.env`
- `incremental` for cursor-based loading
- `save.row` and `save.rows` for output
No external packages. No virtual environments. Everything in one binary.
---
### YAML Model with Shared Library
> Reusable source function with YAML configuration — one connector, many models
URL: https://ondatra.sh/examples/yaml-model/
## Overview
This example shows the three-layer pattern: a Starlark source function for connector logic, YAML models for configuration, and SQL for downstream transformations.
Uses the [Open-Meteo API](https://open-meteo.com) — no API key required.
## Source Function
Create a reusable weather connector in `lib/`:
```python
# lib/open_meteo.star
def open_meteo(save, latitude="59.33", longitude="18.07", city="Stockholm", past_days="7"):
    """Fetch weather data from Open-Meteo API."""
    resp = http.get(
        "https://api.open-meteo.com/v1/forecast",
        params={
            "latitude": latitude,
            "longitude": longitude,
            "daily": "temperature_2m_max,temperature_2m_min,precipitation_sum",
            "timezone": "auto",
            "past_days": past_days,
        },
        retry=3,
    )
    if not resp.ok:
        fail("Open-Meteo API returned " + str(resp.status_code))
    days = resp.json["daily"]["time"]
    temp_max = resp.json["daily"]["temperature_2m_max"]
    temp_min = resp.json["daily"]["temperature_2m_min"]
    precip = resp.json["daily"]["precipitation_sum"]
    for i in range(len(days)):
        save.row({
            "date": days[i],
            "temp_max_c": temp_max[i],
            "temp_min_c": temp_min[i],
            "precip_mm": precip[i],
            "city": city,
        })
    print("Fetched " + str(save.count()) + " days for " + city)
```
The function receives `save` as its first argument for collecting rows. All other built-in modules (`http`, `json`, etc.) are available as globals.
## YAML Models
Reference the source function from YAML models — one connector, many cities:
```yaml
# models/raw/weather_stockholm.yaml
kind: table
source: open_meteo
config:
  latitude: "59.33"
  longitude: "18.07"
  city: Stockholm
```
```yaml
# models/raw/weather_london.yaml
kind: table
source: open_meteo
config:
  latitude: "51.51"
  longitude: "-0.13"
  city: London
```
```yaml
# models/raw/weather_berlin.yaml
kind: table
source: open_meteo
config:
  latitude: "52.52"
  longitude: "13.41"
  city: Berlin
```
No code to write — the YAML model specifies what to fetch, the source function handles how.
## Running
```bash
# Run one city
ondatrasql run raw.weather_london
```
```
Fetched 14 days for London
[OK] raw.weather_london (table, backfill, 14 rows, 463ms)
```
```bash
# Run all models
ondatrasql run
```
One connector, many models — each with its own configuration.
## Using load() Directly
If you need more control than YAML provides, use `load()` in a Starlark script:
```python
# models/raw/weather_custom.star
# @kind: table
load("lib/open_meteo.star", "open_meteo")
open_meteo(save, latitude="48.86", longitude="2.35", city="Paris", past_days="14")
```
---
### Track Product Analytics
> Collect events from your app, transform them into metrics, and query the results
URL: https://ondatra.sh/examples/track-product-analytics/
Collect pageview events from your app via HTTP. Transform them into daily metrics. Query the results. No Kafka, no external tools.
## 1. Define the Event Schema
```sql
-- models/raw/events.sql
-- @kind: events
event_name VARCHAR NOT NULL,
page_url VARCHAR,
user_id VARCHAR,
session_id VARCHAR,
received_at TIMESTAMPTZ
```
## 2. Configure Ports
Add to `.env`:
```
COLLECT_PORT=8080
COLLECT_ADMIN_PORT=8081
```
The public port receives events. The admin port is used by `ondatrasql run` to flush events into the pipeline.
## 3. Start the Daemon
```bash
ondatrasql daemon
```
```
Event daemon starting...
Public: :8080 (POST /collect/{schema}/{table})
Admin: 127.0.0.1:8081 (flush endpoints)
Store: .ondatra/events
Models: raw.events
```
## 4. Send Events
From your app, tracking script, or curl:
```bash
curl -X POST localhost:8080/collect/raw/events \
-H "Content-Type: application/json" \
-d '{"event_name":"pageview","page_url":"/pricing","user_id":"u42","session_id":"s1"}'
```
Events are buffered durably on disk. They survive crashes.
## 5. Transform with SQL
Sessions:
```sql
-- models/staging/sessions.sql
-- @kind: view
SELECT
session_id,
user_id,
MIN(received_at) AS started_at,
MAX(received_at) AS ended_at,
COUNT(*) AS page_views
FROM raw.events
WHERE event_name = 'pageview'
GROUP BY session_id, user_id
```
Daily metrics:
```sql
-- models/mart/daily_traffic.sql
-- @kind: merge
-- @unique_key: day
SELECT
DATE_TRUNC('day', started_at) AS day,
COUNT(DISTINCT session_id) AS sessions,
COUNT(DISTINCT user_id) AS visitors,
SUM(page_views) AS total_views
FROM staging.sessions
GROUP BY 1
```
## 6. Run the Pipeline
In a separate terminal (same project directory):
```bash
ondatrasql run
```
```
Running 3 models...
[OK] raw.events (events, flush, 3 rows, 380ms — event flush)
[OK] staging.sessions (view, create, 0 rows, 158ms — first run)
[OK] mart.daily_traffic (merge, backfill, 1 rows, 182ms — first run)
```
Events are flushed from the buffer into `raw.events`. The view resolves live. The mart is materialized.
## 7. Query
```bash
ondatrasql sql "SELECT * FROM mart.daily_traffic ORDER BY day DESC LIMIT 7"
```
## What You Get
- Events buffered durably — no data loss on crash
- Automatic flush on pipeline run
- One command runs everything — flush, transform, materialize
- No Kafka, no Airflow, no external database
---
### Build Daily Metrics
> Create a multi-layer SQL pipeline with incremental updates and validation
URL: https://ondatra.sh/examples/build-daily-metrics/
Build a three-layer pipeline: raw → staging → mart. Incremental updates. Validation. Run with one command.
## 1. Raw Layer
Source data. In production this comes from an API or events. For this guide, inline data:
```sql
-- models/raw/orders.sql
-- @kind: table
SELECT * FROM (VALUES
(1, 'Alice', 100, 'shipped', '2026-03-28'),
(2, 'Bob', 250, 'shipped', '2026-03-28'),
(3, 'Charlie', 75, 'pending', '2026-03-29'),
(4, 'Alice', 200, 'shipped', '2026-03-29'),
(5, 'Diana', 150, 'cancelled','2026-03-30')
) AS t(order_id, customer, amount, status, order_date)
```
## 2. Staging Layer
Clean and validate:
```sql
-- models/staging/stg_orders.sql
-- @kind: table
-- @constraint: order_id NOT NULL
-- @constraint: amount > 0
SELECT
order_id,
LOWER(TRIM(customer)) AS customer,
CAST(amount AS DOUBLE) AS amount,
status,
CAST(order_date AS DATE) AS order_date
FROM raw.orders
```
Constraints run before materialization. Bad data is blocked.
## 3. Mart Layer
Aggregate into daily metrics:
```sql
-- models/mart/daily_revenue.sql
-- @kind: merge
-- @unique_key: order_date
-- @audit: row_count > 0
SELECT
order_date,
COUNT(*) AS orders,
SUM(amount) AS revenue,
SUM(CASE WHEN status = 'shipped' THEN amount ELSE 0 END) AS shipped_revenue,
ROUND(SUM(CASE WHEN status = 'shipped' THEN amount ELSE 0 END) / SUM(amount) * 100, 1) AS ship_rate
FROM staging.stg_orders
GROUP BY order_date
```
The audit checks that at least one row exists after materialization. If it fails, the table is rolled back.
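To make the numbers concrete, here is the same aggregation over the sample rows in plain Python (illustration only; the model itself does this in SQL). For 2026-03-29, revenue is 75 + 200 = 275 and shipped revenue is 200, so `ship_rate` is 72.7.

```python
# Sample rows from models/raw/orders.sql: (customer, amount, status, order_date)
orders = [
    ("Alice", 100, "shipped", "2026-03-28"),
    ("Bob", 250, "shipped", "2026-03-28"),
    ("Charlie", 75, "pending", "2026-03-29"),
    ("Alice", 200, "shipped", "2026-03-29"),
    ("Diana", 150, "cancelled", "2026-03-30"),
]

daily = {}
for _, amount, status, day in orders:
    d = daily.setdefault(day, {"orders": 0, "revenue": 0, "shipped": 0})
    d["orders"] += 1
    d["revenue"] += amount
    if status == "shipped":
        d["shipped"] += amount

for d in daily.values():
    # Same formula as the SQL: shipped revenue over total revenue, as a percent.
    d["ship_rate"] = round(d["shipped"] / d["revenue"] * 100, 1)

assert daily["2026-03-29"] == {"orders": 2, "revenue": 275, "shipped": 200, "ship_rate": 72.7}
```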
## 4. Run
```bash
ondatrasql run
```
```
Running 3 models...
[OK] raw.orders (table, backfill, 5 rows, 156ms — first run)
[OK] staging.stg_orders (table, backfill, 5 rows, 233ms — first run)
[OK] mart.daily_revenue (merge, backfill, 3 rows, 296ms — first run)
```
## 5. Query
```bash
ondatrasql sql "SELECT * FROM mart.daily_revenue ORDER BY order_date"
```
## 6. Run Again
Change nothing. Run again:
```bash
ondatrasql run
```
```
Running 3 models...
[OK] raw.orders (table, skip, 0 rows, 0s — no dependencies)
[OK] staging.stg_orders (table, skip, 0 rows, 0s — deps unchanged)
[OK] mart.daily_revenue (merge, incremental, 0 rows, 296ms — unchanged)
```
Nothing changed, nothing ran. No wasted I/O.
## 7. Preview Changes
Edit `raw/orders.sql` to add a row. Then:
```bash
ondatrasql sandbox
```
See the row count diff, downstream propagation, and validation results — before committing anything.
## The DAG
OndatraSQL detected the dependency chain automatically:
```
raw.orders → staging.stg_orders → mart.daily_revenue
```
You never declared it. The SQL AST did.
---
### Incremental SQL Model
> Build an incremental append model with cursor tracking
URL: https://ondatra.sh/examples/incremental-model/
This example shows a SQL model that incrementally appends new events using cursor tracking.
## 1. Source Data
```sql
-- models/raw/events.sql
-- @kind: table
SELECT * FROM (VALUES
(1, 'u1', 'click', '/pricing', '{}', '2026-03-28 10:00:00'),
(2, 'u2', 'view', '/features', '{}', '2026-03-28 11:00:00'),
(3, 'u1', 'purchase', '/checkout', '{}', '2026-03-29 09:00:00'),
(4, 'u3', 'signup', '/register', '{}', '2026-03-29 14:00:00'),
(5, 'u2', 'view', '/docs', '{}', '2026-03-30 08:00:00')
) AS t(event_id, user_id, event_type, page_url, metadata, event_time)
```
## 2. Incremental Model
```sql
-- models/staging/stg_events.sql
-- @kind: append
-- @incremental: event_time
-- @constraint: event_id NOT NULL
-- @constraint: event_type IN ('click', 'view', 'purchase', 'signup')
-- @audit: row_count > 0
SELECT
event_id,
user_id,
event_type,
page_url,
metadata,
event_time
FROM raw.events
ORDER BY event_time
```
## 3. Run
```bash
ondatrasql run
```
```
Running 2 models...
[OK] raw.events (table, backfill, 5 rows, 187ms — first run)
[OK] staging.stg_events (append, backfill, 5 rows, 352ms — first run)
```
## 4. Run Again
```bash
ondatrasql run
```
```
Running 2 models...
[OK] raw.events (table, skip, 0 rows, 0s — no dependencies)
[OK] staging.stg_events (append, incremental, 0 rows, 259ms — unchanged)
```
Nothing changed, nothing ran.
## How It Works
1. **First run (backfill):** All data is loaded into the target table
2. **After first run:** OndatraSQL records `MAX(event_time)` from the target table
3. **Subsequent runs:** Smart CDC automatically detects changes in `raw.events` using DuckLake time-travel and only appends new rows
4. **No manual filtering:** You write a plain SELECT — OndatraSQL handles incremental logic via CDC
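The cursor behavior described above reduces to "append only rows newer than the maximum already loaded." A simplified plain-Python sketch (the runtime detects changes via DuckLake CDC rather than rescanning the source like this):

```python
def incremental_append(target_rows, source_rows, cursor_col):
    # Cursor = highest value of the incremental column already in the target.
    cursor = max((r[cursor_col] for r in target_rows), default=None)
    # Append only rows strictly newer than the cursor.
    fresh = [r for r in source_rows if cursor is None or r[cursor_col] > cursor]
    return target_rows + fresh
```

On the first run the cursor is empty, so everything is backfilled; on later runs an unchanged source appends nothing.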
---
### SCD2 Dimension
> Track dimension history with Slowly Changing Dimension Type 2
URL: https://ondatra.sh/examples/scd2-dimension/
This example shows how to track the complete history of a dimension table using SCD2.
## 1. Source Data
```sql
-- models/raw/customers.sql
-- @kind: table
SELECT * FROM (VALUES
(1, 'Alice', 'alice@example.com', 'gold', 'Sweden', '2026-03-28 10:00:00'),
(2, 'Bob', 'bob@example.com', 'silver', 'Norway', '2026-03-28 10:00:00'),
(3, 'Carol', 'carol@example.com', 'bronze', 'Denmark', '2026-03-29 12:00:00')
) AS t(customer_id, name, email, tier, country, updated_at)
```
## 2. SCD2 Model
```sql
-- models/staging/stg_customers.sql
-- @kind: scd2
-- @unique_key: customer_id
-- @constraint: customer_id NOT NULL
-- @audit: row_count > 0
SELECT
customer_id,
name,
email,
tier,
country,
updated_at
FROM raw.customers
```
## 3. First Run
```bash
ondatrasql run
```
```
Running 2 models...
[OK] raw.customers (table, backfill, 3 rows, 174ms — first run)
[OK] staging.stg_customers (scd2, backfill, 3 rows, 269ms — first run)
```
All rows are inserted with `is_current = true`:
| customer_id | name | tier | is_current |
|---|---|---|---|
| 1 | Alice | gold | true |
| 2 | Bob | silver | true |
| 3 | Carol | bronze | true |
## 4. When Data Changes
Change Alice's tier from "gold" to "platinum" in `raw/customers.sql`, then run again:
```bash
ondatrasql run
```
```
Running 2 models...
[OK] raw.customers (table, backfill, 3 rows, 158ms — sql changed)
[OK] staging.stg_customers (scd2, incremental, 1 rows, 360ms — unchanged)
```
The old version is closed and a new version is inserted:
| customer_id | name | tier | is_current |
|---|---|---|---|
| 1 | Alice | gold | **false** |
| 1 | Alice | platinum | **true** |
| 2 | Bob | silver | true |
| 3 | Carol | bronze | true |
## Auto-Generated Columns
OndatraSQL automatically adds three columns to the output:
| Column | Type | Description |
|---|---|---|
| `valid_from_snapshot` | BIGINT | DuckLake snapshot ID when this version became active |
| `valid_to_snapshot` | BIGINT | Snapshot ID when superseded (NULL if current) |
| `is_current` | BOOLEAN | Whether this is the active version |
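The close-and-insert behavior can be sketched in plain Python. This is a simplified stand-in: an integer takes the place of the DuckLake snapshot ID, and every non-key column counts as tracked:

```python
def scd2_merge(history, incoming, key, snapshot):
    # Index the currently active version of each key.
    current = {r[key]: r for r in history if r["is_current"]}
    for row in incoming:
        old = current.get(row[key])
        attrs = [c for c in row if c != key]
        if old and all(old[c] == row[c] for c in attrs):
            continue  # unchanged: keep the existing current version
        if old:
            # Close the superseded version.
            old["is_current"] = False
            old["valid_to_snapshot"] = snapshot
        # Insert the new current version.
        history.append(dict(row, is_current=True,
                            valid_from_snapshot=snapshot,
                            valid_to_snapshot=None))
    return history
```

Running it with Alice's tier change reproduces the table above: the "gold" row is closed and a "platinum" row becomes current.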
## Querying SCD2 Data
```sql
-- Current state
SELECT * FROM staging.stg_customers WHERE is_current = true;
-- Full history for a customer
SELECT * FROM staging.stg_customers
WHERE customer_id = 1
ORDER BY valid_from_snapshot;
```
---
### Validated Model
> Combine constraints and audits for comprehensive data quality
URL: https://ondatra.sh/examples/validated-model/
This example demonstrates a model with multiple layers of validation: constraints, audits, and warnings.
## 1. Source Data
```sql
-- models/raw/orders.sql
-- @kind: table
SELECT * FROM (VALUES
(1, 101, 'alice@example.com', 150.00, 'shipped', '2026-03-28 10:00:00'),
(2, 102, 'bob@example.com', 250.00, 'pending', '2026-03-28 11:00:00'),
(3, 101, 'alice@example.com', 75.50, 'delivered', '2026-03-29 09:00:00'),
(4, 103, 'carol@example.com', 500.00, 'processing', '2026-03-29 14:00:00'),
(5, 102, 'bob@example.com', 120.00, 'shipped', '2026-03-30 08:00:00')
) AS t(order_id, customer_id, email, total, status, updated_at)
```
```sql
-- models/staging/stg_customers.sql
-- @kind: table
SELECT * FROM (VALUES
(101, 'Alice'),
(102, 'Bob'),
(103, 'Carol')
) AS t(customer_id, name)
```
## 2. Validated Model
```sql
-- models/staging/stg_orders.sql
-- @kind: merge
-- @unique_key: order_id
-- @incremental: updated_at
-- Constraints (checked BEFORE insert — fail prevents materialization)
-- @constraint: order_id PRIMARY KEY
-- @constraint: customer_id NOT NULL
-- @constraint: total >= 0
-- @constraint: status IN ('pending', 'processing', 'shipped', 'delivered', 'cancelled')
-- @constraint: email EMAIL
-- @constraint: customer_id REFERENCES staging.stg_customers(customer_id)
-- Audits (checked AFTER insert — fail triggers rollback)
-- @audit: row_count > 0
-- @audit: mean(total) BETWEEN 10 AND 5000
-- @audit: min(total) >= 0
-- Warnings (logged only — don't fail the model)
-- @warning: total NULL_PERCENT < 5
-- @warning: email NOT EMPTY
SELECT
order_id,
customer_id,
email,
total,
status,
updated_at
FROM raw.orders
```
## 3. Run
```bash
ondatrasql run
```
```
Running 3 models...
[OK] raw.orders (table, backfill, 5 rows, 215ms — first run)
[OK] staging.stg_customers (table, backfill, 3 rows, 172ms — first run)
[OK] staging.stg_orders (merge, backfill, 5 rows, 500ms — first run)
```
## Validation Flow
1. **SQL executes** — the SELECT runs and results are stored in a temp table
2. **Constraints check** — each constraint is validated against the temp data:
- `order_id` must be non-null and unique
- `total` must be >= 0
- `status` must be one of the allowed values
- `customer_id` must exist in `stg_customers`
- If any constraint fails, **nothing is written**
3. **Materialization** — data is merged into the target table
4. **Audits check** — post-insert validations run:
- At least 1 row must exist
- Average total must be between 10 and 5000
- Minimum total must be >= 0
- If any audit fails, the transaction **rolls back**
5. **Warnings log** — soft checks run and violations are logged (but don't fail)
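Condensed into a sketch (a hypothetical plain-Python stand-in; the runtime evaluates these checks as SQL against DuckLake):

```python
def run_validated_model(rows, constraints, audits, warnings, log):
    # 1-2. Constraints run per row BEFORE anything is written.
    for name, check in constraints:
        if not all(check(r) for r in rows):
            return None  # nothing is written
    table = list(rows)  # 3. materialize into the target
    # 4. Audits run on the materialized table; a failure rolls back.
    for name, check in audits:
        if not check(table):
            return None  # rollback
    # 5. Warnings are soft: violations are logged, the model still succeeds.
    for name, check in warnings:
        if not check(table):
            log.append("warning: " + name)
    return table
```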
---
## Lineage & Metadata
> Built-in lineage and metadata — no external systems required
URL: https://ondatra.sh/lineage/
OndatraSQL automatically tracks how your data is built. No external lineage tool. No metadata store. No integration required.
Every run records:
- Column-level lineage
- Execution details
- Schema changes
- Dependencies
- Git commit
All stored in DuckLake. All queryable with SQL.
## Mental Model
Every pipeline run creates a snapshot. Each snapshot includes the data, how it was produced, and where it came from.
## Column Lineage
Every column is traced back to its source — across CTEs, joins, subqueries. No annotations. No manual mapping.
```bash
ondatrasql lineage mart.revenue.total_amount
```
Shows source columns, transformations applied, upstream tables.
### Transformation Types
Each column is classified by how it was created:
| Type | Meaning | Example |
|---|---|---|
| `IDENTITY` | Direct copy | `SELECT name` |
| `AGGREGATION` | Aggregated value | `SUM(amount)` |
| `ARITHMETIC` | Computed | `price * quantity` |
| `CONDITIONAL` | Logic applied | `CASE WHEN ...` |
| `CAST` | Type conversion | `CAST(id AS VARCHAR)` |
| `FUNCTION` | Function call | `UPPER(name)` |
## CLI
```bash
ondatrasql lineage overview # All models with dependencies
ondatrasql lineage staging.orders # Column lineage for one model
ondatrasql lineage staging.orders.total # Trace one column
```
Use these to explore dependencies, debug transformations, and understand the impact of changes.
## Commit Metadata
Every run stores execution metadata inside DuckLake. No separate system required. This acts as a built-in metadata store.
### Execution
| Field | Description |
|---|---|
| `run_type` | `backfill`, `incremental`, `full`, `skip`, `flush` |
| `rows_affected` | Rows written |
| `duration_ms` | Execution time |
| `steps` | Sub-step breakdown with timing |
### Lineage
| Field | Description |
|---|---|
| `column_lineage` | Source columns with transformation types |
| `depends` | Upstream table dependencies |
### Schema
| Field | Description |
|---|---|
| `columns` | Output column definitions |
| `schema_hash` | Detects schema evolution |
### Versioning
| Field | Description |
|---|---|
| `sql_hash` | Triggers backfill on change |
| `dag_run_id` | Run identifier |
| `source_file` | Model source path |
### Git
| Field | Description |
|---|---|
| `git_commit` | Commit SHA |
| `git_branch` | Branch name |
| `git_repo_url` | Repository URL |
Trace data back to the code change that produced it.
## Query Metadata
Everything is just SQL:
```sql
SELECT
commit_extra_info->>'model' AS model,
commit_extra_info->>'run_type' AS run_type,
commit_extra_info->>'rows_affected' AS rows,
commit_extra_info->>'duration_ms' AS ms
FROM lake.snapshots()
ORDER BY snapshot_id DESC
LIMIT 10;
```
Build dashboards, alerts, or debugging tools directly in SQL.
## Why This Matters
- Debug pipelines faster
- Understand data origins
- Track changes over time
- Build observability without extra tools
---
## Blueprint: Google Ad Manager
> Fetch historical GAM reports incrementally with one YAML model and one shared source function.
URL: https://ondatra.sh/blueprints/google-ad-manager/
Ingest Google Ad Manager reports without building a connector.
No Python connector. No Airflow DAG. No manual cursor tracking.
## What This Blueprint Gives You
- Incremental loading by date
- OAuth with Google service account
- Report creation + polling handled automatically
- Dynamic schema from dimensions and metrics
- Reusable source function for multiple GAM reports
## Quick Start
### 1. Set up credentials
Add to `.env`:
```bash
GAM_NETWORK_CODE=12345678
GAM_KEY_FILE=service-account.json
```
### 2. Add the source function
Copy [`lib/gam_report.star`](#source-function) into your project.
### 3. Create a model
```yaml
# models/raw/gam_report.yaml
kind: append
incremental: report_date
incremental_initial: "2026-02-28"
source: gam_report
config:
network_code: ${GAM_NETWORK_CODE}
key_file: ${GAM_KEY_FILE}
dimensions:
- AD_UNIT_NAME
- DATE
- ORDER_NAME
- LINE_ITEM_NAME
- CREATIVE_NAME
metrics:
- AD_SERVER_IMPRESSIONS
- AD_SERVER_CLICKS
- AD_SERVER_CTR
- AD_SERVER_REVENUE
```
### 4. Run
```bash
ondatrasql run raw.gam_report
```
That's the whole model. Most users only change YAML — not code.
This creates a versioned append table in DuckLake, keyed by `report_date` for incremental loading.
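After a run, you can sanity-check the incremental window directly in SQL. A minimal sketch, assuming the model lands at `raw.gam_report` in the attached catalog:

```sql
-- verify the loaded date range and row count
SELECT
    min(report_date) AS first_day,
    max(report_date) AS last_day,
    count(*)         AS row_count
FROM raw.gam_report;
```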
## How It Works
1. Authenticates with a Google service account
2. Computes the next date range from incremental state
3. Creates and runs a GAM report
4. Polls until the report is ready
5. Fetches rows page by page
6. Maps dimensions and metrics into typed output columns
7. Emits rows into the pipeline
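Step 2 is what makes the model incremental: a backfill starts at `incremental_initial`, while a normal run resumes the day after the last loaded `report_date` and stops at yesterday. A plain-Python sketch of that window logic (illustrative only; the runtime itself exposes this through the Starlark `incremental` object used in the source function):

```python
from datetime import date, datetime, timedelta

def next_window(is_backfill, initial_value, last_value=None, today=None):
    """Compute the [start, end] date range for the next incremental run."""
    today = today or date.today()
    end = (today - timedelta(days=1)).isoformat()  # reports lag by one day
    if is_backfill:
        start = initial_value
    else:
        last = datetime.strptime(last_value, "%Y-%m-%d").date()
        start = (last + timedelta(days=1)).isoformat()
    return start, end  # caller skips the run entirely if start > end

start, end = next_window(False, "2026-02-28", last_value="2026-03-05",
                         today=date(2026, 3, 10))
# start == "2026-03-06", end == "2026-03-09"
```

If `start` is later than `end`, there is nothing new to fetch, which is why the source function aborts in that case.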
## Customization
Most users never need to touch the source code. Change dimensions, metrics, filters, and date behavior in YAML only.
**Simpler report** — fewer columns:
```yaml
# models/raw/gam_simple.yaml
kind: append
incremental: report_date
incremental_initial: "2026-01-01"
source: gam_report
config:
network_code: ${GAM_NETWORK_CODE}
key_file: ${GAM_KEY_FILE}
dimensions:
- DATE
- AD_UNIT_NAME
metrics:
- AD_SERVER_IMPRESSIONS
```
**With filters** — only specific orders:
```yaml
# models/raw/gam_campaign.yaml
kind: append
incremental: report_date
incremental_initial: "2026-01-01"
source: gam_report
config:
network_code: ${GAM_NETWORK_CODE}
key_file: ${GAM_KEY_FILE}
dimensions:
- DATE
- LINE_ITEM_NAME
metrics:
- AD_SERVER_IMPRESSIONS
- AD_SERVER_REVENUE
filters:
- dimension: ORDER_NAME
operator: CONTAINS
values: ["Q1_2026"]
currency_code: "EUR"
```
**Multiple networks** — duplicate the YAML with a different `network_code`:
```yaml
# models/raw/gam_report_us.yaml
kind: append
incremental: report_date
incremental_initial: "2026-02-28"
source: gam_report
config:
network_code: ${GAM_NETWORK_CODE_US}
key_file: ${GAM_KEY_FILE}
dimensions:
- AD_UNIT_NAME
- DATE
metrics:
- AD_SERVER_IMPRESSIONS
- AD_SERVER_CLICKS
```
The source function in `lib/gam_report.star` is shared — no code duplication.
## What This Replaces
Traditionally, this workflow requires:
- A custom Python connector
- OAuth/token handling
- Polling logic
- Pagination code
- Incremental state storage
- A scheduler
With OndatraSQL, it's one source function plus one YAML model.
## Output Schema
Column names and types are derived from your config:
| Config | Column | Type |
|---|---|---|
| `DATE` | `report_date` | VARCHAR (YYYY-MM-DD) |
| `AD_UNIT_NAME` | `ad_unit_name` | VARCHAR |
| `ORDER_NAME` | `order_name` | VARCHAR |
| `LINE_ITEM_NAME` | `line_item_name` | VARCHAR |
| `CREATIVE_NAME` | `creative_name` | VARCHAR |
| `AD_SERVER_IMPRESSIONS` | `impressions` | BIGINT |
| `AD_SERVER_CLICKS` | `clicks` | BIGINT |
| `AD_SERVER_CTR` | `ctr` | DOUBLE |
| `AD_SERVER_REVENUE` | `revenue` | DOUBLE |
Non-`DATE` dimensions returned as `intValue` become BIGINT. Metrics returned as `intValue` become BIGINT; metrics returned as `doubleValue` become DOUBLE.
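The naming rules are mechanical. A plain-Python sketch of how the source function derives column names and formats `DATE` values (it mirrors the mapping loops in `lib/gam_report.star`):

```python
def dimension_column(dim):
    """GAM dimension enum -> output column name (DATE is special-cased)."""
    col = dim.lower()
    return "report_date" if col == "date" else col

def metric_column(metric):
    """GAM metric enum -> output column name (common prefixes stripped)."""
    col = metric.lower()
    for prefix in ("ad_server_", "ad_exchange_", "total_"):
        if col.startswith(prefix):
            return col[len(prefix):]
    return col

def format_gam_date(raw):
    """GAM returns DATE as an intValue string like '20260228'."""
    return raw[:4] + "-" + raw[4:6] + "-" + raw[6:8]

assert dimension_column("DATE") == "report_date"
assert metric_column("AD_SERVER_IMPRESSIONS") == "impressions"
assert format_gam_date("20260228") == "2026-02-28"
```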
## Before You Start
You need:
- A GCP project with the [Ad Manager API](https://console.cloud.google.com/) enabled
- A service account JSON key (create under **Credentials > Service Account**)
- API access enabled in your GAM network (**Admin > Global settings**, [docs](https://support.google.com/admanager/answer/3088588))
- The service account email added as an API user in GAM
- Your GAM network code (found under **Admin > Global settings**)
## Full Config Reference
All [GAM reportDefinition fields](https://developers.google.com/ad-manager/api/beta/reference/rest/v1/networks.reports#ReportDefinition) are supported as optional config keys:
```yaml
config:
network_code: ${GAM_NETWORK_CODE}
key_file: ${GAM_KEY_FILE}
dimensions: [...]
metrics: [...]
report_type: HISTORICAL
# Filtering and sorting
filters:
- dimension: ORDER_NAME
operator: CONTAINS
values: ["Campaign_2026"]
sorts:
- field: AD_SERVER_IMPRESSIONS
descending: true
# Time zone and currency
time_zone_source: PROVIDED
time_zone: "Europe/Stockholm"
currency_code: "SEK"
# Comparison and time periods
time_period_column: QUARTERS
comparison_date_range:
fixed:
start_date: "2025-01-01"
end_date: "2025-12-31"
# Custom dimensions
cms_metadata_dimension_key_ids: ["12345"]
custom_dimension_key_ids: ["67890"]
```
Most users only need `dimensions`, `metrics`, and optionally `filters`.
## Source Function
`lib/gam_report.star`
The source function handles all API interaction. It maps dimensions and metrics dynamically — column names are derived from the config.
```python
def gam_report(save, network_code="", key_file="service-account.json",
dimensions=None, metrics=None, report_type="HISTORICAL",
filters=None, sorts=None,
time_zone_source=None, time_zone=None, currency_code=None,
time_period_column=None, comparison_date_range=None,
cms_metadata_dimension_key_ids=None, custom_dimension_key_ids=None,
ekv_dimension_key_ids=None, line_item_custom_field_ids=None,
order_custom_field_ids=None, creative_custom_field_ids=None,
flags=None, expanded_compatibility=None):
"""Fetch GAM historical report. Dimensions and metrics are mapped dynamically."""
scope = "https://www.googleapis.com/auth/admanager"
base_url = "https://admanager.googleapis.com/v1/networks/" + network_code
one_day = time.parse_duration("24h")
yesterday = (time.now() - one_day).format("2006-01-02")
start_date = ""
if incremental.is_backfill:
start_date = incremental.initial_value
else:
last = time.parse_time(incremental.last_value + "T00:00:00Z")
start_date = (last + one_day).format("2006-01-02")
end_date = yesterday
print("Range: " + start_date + " to " + end_date +
" (backfill: " + str(incremental.is_backfill) + ")")
if start_date > end_date:
print("Already up to date")
abort()
token = oauth.token(google_key_file=key_file, scope=scope)
headers = {
"Authorization": "Bearer " + token.access_token,
"Content-Type": "application/json",
}
# --- Build report definition ---
start_t = time.parse_time(start_date + "T00:00:00Z")
end_t = time.parse_time(end_date + "T00:00:00Z")
report_def = {
"dimensions": dimensions,
"metrics": metrics,
"dateRange": {
"fixed": {
"startDate": {"year": start_t.year, "month": start_t.month,
"day": start_t.day},
"endDate": {"year": end_t.year, "month": end_t.month,
"day": end_t.day},
},
},
"reportType": report_type,
}
# Optional fields — only include if provided
if filters:
report_def["filters"] = filters
if sorts:
report_def["sorts"] = sorts
if time_zone_source:
report_def["timeZoneSource"] = time_zone_source
if time_zone:
report_def["timeZone"] = time_zone
if currency_code:
report_def["currencyCode"] = currency_code
if time_period_column:
report_def["timePeriodColumn"] = time_period_column
if comparison_date_range:
report_def["comparisonDateRange"] = comparison_date_range
if cms_metadata_dimension_key_ids:
report_def["cmsMetadataDimensionKeyIds"] = cms_metadata_dimension_key_ids
if custom_dimension_key_ids:
report_def["customDimensionKeyIds"] = custom_dimension_key_ids
if ekv_dimension_key_ids:
report_def["ekvDimensionKeyIds"] = ekv_dimension_key_ids
if line_item_custom_field_ids:
report_def["lineItemCustomFieldIds"] = line_item_custom_field_ids
if order_custom_field_ids:
report_def["orderCustomFieldIds"] = order_custom_field_ids
if creative_custom_field_ids:
report_def["creativeCustomFieldIds"] = creative_custom_field_ids
if flags:
report_def["flags"] = flags
if expanded_compatibility != None:
report_def["expandedCompatibility"] = expanded_compatibility
create_body = json.encode({
"reportDefinition": report_def,
"visibility": "HIDDEN",
})
# --- Create report ---
create_resp = http.post(base_url + "/reports", json=create_body,
headers=headers, timeout=60)
if not create_resp.ok:
fail("Create report failed: " + str(create_resp.status_code) +
" " + create_resp.text)
report_name = create_resp.json["name"]
# --- Run report ---
run_resp = http.post(
"https://admanager.googleapis.com/v1/" + report_name + ":run",
json={}, headers=headers, timeout=60,
)
if not run_resp.ok:
fail("Run report failed: " + str(run_resp.status_code) +
" " + run_resp.text)
operation_name = run_resp.json["name"]
# --- Poll until done ---
poll_url = "https://admanager.googleapis.com/v1/" + operation_name
last_poll = None
wait = 2.0
for i in range(60):
sleep(wait)
last_poll = http.get(poll_url, headers=headers)
if not last_poll.ok:
fail("Poll failed: " + str(last_poll.status_code) +
" " + last_poll.text)
if "done" in last_poll.json and last_poll.json["done"]:
break
if i == 59:
fail("Report timed out")
if wait < 10.0:
wait = wait * 2
# --- Column name mapping ---
dim_cols = []
for d in dimensions:
col = d.lower()
if col == "date":
col = "report_date"
dim_cols.append(col)
metric_cols = []
for m in metrics:
col = m.lower()
for prefix in ["ad_server_", "ad_exchange_", "total_"]:
if col.startswith(prefix):
col = col[len(prefix):]
break
metric_cols.append(col)
# --- Fetch result rows ---
result_name = last_poll.json["response"]["reportResult"]
fetch_base = ("https://admanager.googleapis.com/v1/" +
result_name + ":fetchRows")
page_token = ""
while True:
params = {"pageSize": "10000"}
if page_token != "":
params["pageToken"] = page_token
res = http.get(fetch_base, headers=headers, params=params, timeout=120)
if not res.ok:
fail("Fetch rows failed: " + str(res.status_code) +
" " + res.text)
rows = res.json.get("rows")
if rows == None or len(rows) == 0:
break
for row in rows:
dims = row["dimensionValues"]
mets = row["metricValueGroups"][0]["primaryValues"]
record = {}
for i, col in enumerate(dim_cols):
val = dims[i]
if "intValue" in val:
raw = val["intValue"]
if dimensions[i] == "DATE":
record[col] = raw[:4] + "-" + raw[4:6] + "-" + raw[6:8]
else:
record[col] = int(raw)
else:
record[col] = val.get("stringValue", "")
for i, col in enumerate(metric_cols):
val = mets[i]
if "doubleValue" in val:
record[col] = float(val["doubleValue"])
elif "intValue" in val:
record[col] = int(val["intValue"])
else:
record[col] = 0
save.row(record)
page_token = res.json.get("nextPageToken", "")
if page_token == "":
break
print("Saved " + str(save.count()) + " rows")
```
## Using as a Standalone Script
Use this only if you want full control in Starlark. For most cases, YAML is simpler and preferred.
```python
# models/raw/gam_report.star
# @kind: append
# @incremental: report_date
# @incremental_initial: 2026-02-28
load("lib/gam_report.star", "gam_report")
gam_report(
save,
network_code=env.get("GAM_NETWORK_CODE"),
key_file=env.get("GAM_KEY_FILE"),
dimensions=["AD_UNIT_NAME", "DATE", "ORDER_NAME", "LINE_ITEM_NAME", "CREATIVE_NAME"],
metrics=["AD_SERVER_IMPRESSIONS", "AD_SERVER_CLICKS", "AD_SERVER_CTR", "AD_SERVER_REVENUE"],
)
```