# Ondatra Labs

> The data runtime for teams that ship. One binary replaces ingestion, transformation, and orchestration.

- License: MIT
- Language: Go
- Repository: https://github.com/ondatra-labs/ondatrasql
- Discord: https://discord.gg/TQEqsa9q

---

## DuckDB Configuration

> How the runtime boots — DuckDB configuration via SQL files

URL: https://ondatra.sh/duckdb-configuration/

OndatraSQL prepares DuckDB before running your pipeline. Everything in `config/` is executed as SQL — no YAML, no config DSL. This is where you configure DuckDB, define credentials, attach your catalog, and connect external sources.

## Mental Model

The runtime loads configuration in two phases:

**Before the catalog is attached** → configure DuckDB itself
**After the catalog is attached** → define logic that depends on your data

## Loading Order

**Pre-catalog:**

| File | What it does |
|------|--------------|
| [`settings.sql`](settings/) | Configure DuckDB (memory, threads, etc.) |
| [`secrets.sql`](secrets/) | Credentials — not stored in data, only used at runtime |
| [`extensions.sql`](extensions/) | Load DuckDB extensions |

**Catalog:**

| File | What it does |
|------|--------------|
| [`catalog.sql`](catalog/) | **Attach DuckLake** — without this, there's no database to write to |

**Post-catalog:**

| File | What it does |
|------|--------------|
| [`macros.sql`](macros/) | Reusable SQL logic |
| [`variables.sql`](variables/) | Runtime variables |
| [`sources.sql`](sources/) | External database connections |

All files except `catalog.sql` are optional. Missing files are silently skipped.

{{< callout type="info" >}}
Schemas are created automatically. Organize models into directories — `models/staging/orders.sql` creates the `staging` schema on first run.
{{< /callout >}}

## Common Patterns

**Connect to PostgreSQL:**

```sql
-- config/sources.sql
ATTACH 'postgresql://${PG_USER}:${PG_PASSWORD}@${PG_HOST}/db' AS source (READ_ONLY);
```

**Add a reusable macro:**

```sql
-- config/macros.sql
CREATE MACRO is_active(status) AS status = 'active';
```

**Use cloud storage:**

```sql
-- config/secrets.sql
CREATE SECRET aws_chain (TYPE S3, PROVIDER credential_chain);
```

**Set DuckDB memory limit:**

```sql
-- config/settings.sql
SET memory_limit = '4GB';
SET threads = 4;
```

## Built-in Runtime

OndatraSQL automatically provides variables and macros after config loads. No setup required. These power change detection, lineage, and incremental execution.

### Variables

| Variable | Description |
|----------|-------------|
| `ondatra_run_time` | When the current run started (UTC) |
| `ondatra_load_id` | Unique identifier for this run |
| `curr_snapshot` | Current DuckLake snapshot ID |
| `prev_snapshot` | Previous snapshot ID (for CDC) |
| `dag_start_snapshot` | Snapshot at DAG start (consistent CDC across models) |

### Macros

**Runtime**

`ondatra_now()` · `ondatra_load_id()` · `last_snapshot()` · `prev_snapshot()` · `dag_start_snapshot()`

**Change Data Capture**

`ondatra_cdc_changes()` · `ondatra_cdc_deletes()` · `ondatra_cdc_updates_before()` · `ondatra_cdc_summary()` · `ondatra_cdc_empty()`

**Schema & Metadata**

`ondatra_table_exists()` · `ondatra_column_exists()` · `ondatra_column_type()` · `ondatra_get_columns()` · `ondatra_schema_hash()`

**Data Quality**

`ondatra_row_count()` · `ondatra_has_rows()` · `ondatra_min_value()` · `ondatra_max_value()` · `ondatra_distinct_count()` · `ondatra_null_count()` · `ondatra_duplicate_count()` · `ondatra_table_diff()` · `ondatra_compare_tables()`

**Metadata**

`ondatra_get_sql_hash()` · `ondatra_get_commit_info()` · `ondatra_get_downstream()`

## Editing

```bash
ondatrasql edit settings
ondatrasql edit secrets
ondatrasql edit catalog
ondatrasql edit macros
ondatrasql edit variables
ondatrasql edit sources
```

## Sandbox Mode

In [sandbox mode](/concepts/sandbox/), `catalog.sql` is skipped. Instead:

- **Production catalog** → attached read-only
- **Sandbox catalog** → attached writable

Models read from production, write to sandbox. All other config files load normally.

---

### settings.sql

> Execution settings — how the runtime uses DuckDB

URL: https://ondatra.sh/duckdb-configuration/settings/

**Phase:** Pre-catalog | **Order:** 1 | **Required:** No

OndatraSQL runs directly on DuckDB. This file controls how the runtime executes queries — memory usage, parallelism, and disk spill behavior. You usually don't need to change this. Defaults work for most cases.

```sql
-- Limit how much memory the runtime can use
SET memory_limit = '8GB';

-- Control parallel execution
SET threads = 4;

-- Where to spill when memory is exceeded
SET temp_directory = '/tmp/duckdb';

-- Timestamp interpretation
SET TimeZone = 'Europe/Stockholm';
```

This file runs before the DuckLake catalog is attached.

## Common Settings

Most projects never change these. Adjust only if needed.

| Setting | What it controls | When to change |
|---|---|---|
| `memory_limit` | Max memory used by the runtime | Large datasets or limited RAM |
| `threads` | Parallel execution | Limit CPU on shared machines |
| `temp_directory` | Disk spill location | Slow disks or constrained environments |
| `TimeZone` | Timestamp interpretation | Working across regions |

For advanced tuning, see the [DuckDB configuration reference](https://duckdb.org/docs/stable/configuration/overview).

---

### secrets.sql

> Access external data without adding infrastructure

URL: https://ondatra.sh/duckdb-configuration/secrets/

**Secrets — access external data without adding infrastructure**

**Phase:** Pre-catalog | **Order:** 2 | **Required:** No

OndatraSQL runs locally, but your data doesn't have to.
This file defines how the runtime connects to external systems — cloud storage, databases, and APIs — using DuckDB's built-in [secrets manager](https://duckdb.org/docs/stable/configuration/secrets_manager). No separate credential system. No service configuration. Just SQL.

You only need this if you connect to external data sources. Local pipelines (CSV, Parquet, APIs via Starlark) do not require any secrets.

{{< callout type="warning" >}}
Never commit this file to version control. Use environment variables (`.env`) for sensitive values, or rely on credential providers (like AWS credential chain) when available.
{{< /callout >}}

## Common Use Cases

### Cloud storage

```sql
-- Use AWS credential chain (recommended)
CREATE SECRET aws_chain (
    TYPE s3,
    PROVIDER credential_chain
);
```

### Explicit credentials (dev/testing)

```sql
CREATE SECRET s3_secret (
    TYPE s3,
    KEY_ID '${AWS_ACCESS_KEY_ID}',
    SECRET '${AWS_SECRET_ACCESS_KEY}',
    REGION 'eu-north-1'
);
```

### Scoped credentials

```sql
CREATE SECRET prod_bucket (
    TYPE s3,
    KEY_ID '${AWS_ACCESS_KEY_ID}',
    SECRET '${AWS_SECRET_ACCESS_KEY}',
    SCOPE 's3://prod-data/'
);
```

### Databases

```sql
CREATE SECRET pg_secret (
    TYPE postgres,
    HOST '${PG_HOST}',
    PORT 5432,
    DATABASE 'warehouse',
    USER 'readonly',
    PASSWORD '${PG_PASSWORD}'
);
```

Secrets are used by DuckDB itself — not by OndatraSQL. Once defined, they apply automatically to any query that accesses external data. Supported providers include S3, GCS, R2, Azure, PostgreSQL, and MySQL.

---

### extensions.sql

> Optional capabilities for the runtime

URL: https://ondatra.sh/duckdb-configuration/extensions/

**Extensions — optional capabilities for the runtime**

**Phase:** Pre-catalog | **Order:** 3 | **Required:** No

OndatraSQL runs directly on DuckDB and inherits its extension system. Extensions add capabilities like cloud storage, external databases, or geospatial functions — without adding services or dependencies. Most projects don't need this.
Extensions are loaded only when required. Use this file when an extension must be available before the pipeline runs. For example:

- Accessing S3 or cloud storage
- Connecting to external databases
- Using geospatial or specialized functions

There are two ways to use extensions:

- **`extensions.sql`** — load once for the entire runtime (global)
- **`@extension` directive** — load only for a specific model (local)

Prefer `@extension` unless the extension is required early (e.g. in `catalog.sql`).

## Common Extensions

### Cloud storage

```sql
-- Required for S3 / HTTP access
INSTALL httpfs;
LOAD httpfs;
```

### External databases

```sql
-- PostgreSQL connector
INSTALL postgres;
LOAD postgres;

-- MySQL connector
INSTALL mysql;
LOAD mysql;
```

### Specialized functions

```sql
-- Geospatial support
INSTALL spatial;
LOAD spatial;
```

Extensions are part of DuckDB — not a separate plugin system. They are loaded into the runtime and used transparently by your models.

{{< callout type="info" >}}
DuckDB automatically loads most core extensions on first use. You only need to install/load extensions explicitly when:

- They must be available before execution (e.g. `catalog.sql`)
- You use community extensions
{{< /callout >}}

For advanced use cases, see the [DuckDB extensions documentation](https://duckdb.org/docs/stable/extensions/overview).

---

### catalog.sql

> Where your data lives — the only required configuration

URL: https://ondatra.sh/duckdb-configuration/catalog/

**Catalog — where your data lives**

**Phase:** Catalog | **Order:** 4 | **Required:** Yes

The catalog is where your data lives. OndatraSQL writes tables to DuckLake — a versioned storage layer built on Parquet. This file tells the runtime where to store metadata and data files. This is the only required configuration. You don't provision a warehouse. You point the runtime at a location.
OndatraSQL handles:

- Table creation
- Schema evolution
- Snapshot management
- Change detection

You only define where the data should live.

## Mental Model

- The **catalog** stores metadata (tables, schemas, snapshots)
- The **data path** stores Parquet files
- Together, they form your warehouse

## Recommended Backends

If you're unsure, use SQLite.

### SQLite (default)

Zero configuration. Embedded. Works out of the box. This is the default and the right choice for most use cases — including production on a single machine.

```sql
-- Local storage (default — created by ondatrasql init)
ATTACH 'ducklake:sqlite:ducklake.sqlite' AS lake (DATA_PATH 'ducklake.sqlite.files');
```

```sql
-- Cloud storage (SQLite catalog + S3 data)
-- Requires: httpfs in extensions.sql
ATTACH 'ducklake:sqlite:ducklake.sqlite' AS lake (DATA_PATH 's3://my-bucket/data/');
```

### PostgreSQL

Use this when multiple users or systems need to access the same catalog. The data remains in DuckLake — PostgreSQL only stores metadata. This allows external tools (Metabase, Grafana, psql, pg_duckdb) to query your data without copying it.

```sql
-- Multi-user (PostgreSQL catalog + local data)
-- Requires: postgres in extensions.sql, credentials in secrets.sql
ATTACH 'ducklake:postgres:dbname=ducklake_catalog host=localhost' AS lake (DATA_PATH '/data/warehouse/');
```

```sql
-- Multi-user (PostgreSQL catalog + S3 data)
ATTACH 'ducklake:postgres:dbname=ducklake_catalog host=db.prod.internal' AS lake (DATA_PATH 's3://my-bucket/data/');
```

### Which to choose?
| | SQLite | PostgreSQL |
|---|---|---|
| **Setup** | Zero config | Requires a PostgreSQL server |
| **Read access** | OndatraSQL only | Any PostgreSQL client (pg_duckdb, Metabase, psql) |
| **Multi-machine** | No (local file) | Yes |
| **Performance** | Fast (embedded) | Slightly slower (network round-trips for metadata) |
| **Cloud data** | Yes (S3/GCS via DATA_PATH) | Yes |
| **Best for** | Development, single-machine prod | Shared warehouse, BI tools, team access |

{{< callout type="warning" >}}
DuckLake also supports DuckDB and MySQL as catalog backends, but OndatraSQL only supports SQLite and PostgreSQL. Features like sandbox mode, CDC, and run-type detection are tested and maintained for these two backends only.
{{< /callout >}}

## Advanced Options

### Data Inlining

DuckLake automatically inlines small writes — inserts or deletes affecting fewer than 10 rows are stored directly in the catalog metadata instead of creating new Parquet files. This is **enabled by default** and requires no configuration.

This is particularly useful for incremental `append` models that add a few rows per run. Instead of creating a new Parquet file for each run, the data is stored in the catalog until it accumulates enough to flush.

Override the threshold via the `DATA_INLINING_ROW_LIMIT` option on ATTACH:

```sql
-- Increase threshold to 50 rows
ATTACH 'ducklake:sqlite:ducklake.sqlite' AS lake
    (DATA_PATH 'ducklake.sqlite.files', DATA_INLINING_ROW_LIMIT 50);
```

```sql
-- Disable inlining entirely
ATTACH 'ducklake:sqlite:ducklake.sqlite' AS lake
    (DATA_PATH 'ducklake.sqlite.files', DATA_INLINING_ROW_LIMIT 0);
```

You can also set it per-table after attachment:

```sql
CALL lake.set_option('data_inlining_row_limit', 50, table_name => 'staging.events');
```

Inlined data behaves identically to Parquet data — time travel, queries, and schema evolution all work the same.
To manually flush inlined data to Parquet files, use:

```sql
CALL ducklake_flush_inlined_data('lake');
```

{{< callout type="info" >}}
The default threshold of 10 rows is a good balance for most workloads. Only change it if you have many models that consistently produce small batches (increase) or if you want all data in Parquet immediately (set to 0).
{{< /callout >}}

### Encryption

DuckLake can automatically encrypt all Parquet data files using the `ENCRYPTED` option. Each file gets a unique encryption key, stored in the catalog. Decryption is transparent — queries work identically. This is useful when storing data on untrusted storage (S3, shared filesystems) where the catalog database is the trusted component.

```sql
-- Encrypted local storage
ATTACH 'ducklake:sqlite:ducklake.sqlite' AS lake
    (DATA_PATH 'ducklake.sqlite.files', ENCRYPTED);
```

```sql
-- Encrypted cloud storage
ATTACH 'ducklake:sqlite:ducklake.sqlite' AS lake
    (DATA_PATH 's3://my-bucket/data/', ENCRYPTED);
```

```sql
-- Encrypted multi-user (PostgreSQL catalog controls key access)
ATTACH 'ducklake:postgres:dbname=ducklake_catalog host=localhost' AS lake
    (DATA_PATH 's3://my-bucket/data/', ENCRYPTED);
```

{{< callout type="info" >}}
Encryption keys are stored in the catalog database. Anyone with catalog access can decrypt the data. Use this to protect data at rest on untrusted storage — not as a substitute for catalog access control.
{{< /callout >}}

---

### macros.sql

> Reuse SQL without templating

URL: https://ondatra.sh/duckdb-configuration/macros/

**Macros — reuse SQL without templating**

**Phase:** Post-catalog | **Order:** 5 | **Required:** No

Macros let you reuse logic across models. Unlike templating systems, these are real SQL functions — executed by DuckDB, not preprocessed. No Jinja. No string templating. Just SQL.
Use macros when you want to:

- Reuse calculations across models
- Encapsulate business logic
- Define small helper functions
- Avoid repeating SQL

Built-in `ondatra_*` macros are created automatically — this file is for your own macros.

## Examples

### Pure computation

```sql
-- Simple reusable logic
CREATE OR REPLACE MACRO safe_divide(a, b) AS
    CASE WHEN b = 0 THEN NULL ELSE a / b END;

CREATE OR REPLACE MACRO cents_to_dollars(cents) AS
    cents / 100.0;
```

### Business logic

```sql
-- Read from a table
CREATE OR REPLACE MACRO default_currency() AS (
    SELECT currency FROM mart.company_settings LIMIT 1
);
```

### Table macros

```sql
-- Return a result set
CREATE OR REPLACE MACRO recent_orders(n) AS TABLE
    SELECT * FROM mart.orders
    ORDER BY order_date DESC
    LIMIT n;
```

## Usage in Models

Macros are available in all models automatically.

```sql
-- models/mart/revenue.sql
-- @kind: table
SELECT
    order_date,
    safe_divide(total_amount, quantity) AS unit_price,
    cents_to_dollars(total_amount) AS amount_dollars
FROM staging.orders
WHERE currency = default_currency()
```

```sql
-- Table macros work as FROM sources
-- models/mart/recent_summary.sql
-- @kind: view
SELECT COUNT(*) AS order_count, SUM(amount) AS total
FROM recent_orders(100)
```

Table references work the same as in models — no special prefixes needed. Macros are part of the runtime — not a separate layer. They run inside DuckDB and behave like native SQL functions.

{{< callout type="info" >}}
Macros are stored in DuckDB's in-memory catalog and made available to all models automatically. You don't need to configure anything.
{{< /callout >}}

DuckDB supports both **scalar macros** (return a single value) and **table macros** (return a result set). For advanced usage, see the [DuckDB macro documentation](https://duckdb.org/docs/stable/sql/statements/create_macro).
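DuckDB macros can also take optional named parameters with defaults, which keeps call sites short while leaving behavior tunable. A minimal sketch (the macro name and rate are illustrative, not part of the built-in runtime):

```sql
-- config/macros.sql (hypothetical addition)
-- Optional named parameter with a default value
CREATE OR REPLACE MACRO with_vat(amount, rate := 0.25) AS
    amount * (1 + rate);

SELECT with_vat(100);                -- uses the default rate
SELECT with_vat(100, rate := 0.12);  -- overrides it per call
```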
---

### variables.sql

> Configure your pipeline with SQL

URL: https://ondatra.sh/duckdb-configuration/variables/

**Variables — configure your pipeline with SQL**

**Phase:** Post-catalog | **Order:** 6 | **Required:** No

Variables let you define configuration inside your pipeline. No YAML files. No separate config system. Just SQL.

Use variables for:

- Business logic (currency, tax rates, thresholds)
- Environment-specific settings
- Retention policies and limits
- Values reused across models and scripts

Variables are part of the runtime:

- **`.env`** — external configuration (secrets, credentials)
- **`variables.sql`** — pipeline logic and behavior

Built-in variables (`ondatra_run_time`, `ondatra_load_id`, snapshots) are set automatically.

## Examples

### Business config

```sql
SET VARIABLE default_currency = 'SEK';
SET VARIABLE vat_rate = 0.25;
SET VARIABLE fiscal_year_start = DATE '2024-07-01';
```

### Policies

```sql
SET VARIABLE archive_after_days = 365;
SET VARIABLE delete_after_days = 730;
```

### Dynamic values

```sql
-- Derived from data
SET VARIABLE latest_order_date = (
    SELECT MAX(order_date) FROM mart.orders
);
```

## Usage

Variables are available everywhere:

- SQL models — `getvariable('name')`
- Scripts — `getvariable("name")`

```sql
SELECT * FROM staging.orders
WHERE currency = getvariable('default_currency')
```

```python
currency = getvariable("default_currency")  # "SEK"
```

Variables are evaluated at runtime — not templated. They behave like values in the database, not string substitutions.

If a variable is not set:

- SQL — returns `NULL`
- Scripts — returns empty string

For advanced usage, see the [DuckDB variable documentation](https://duckdb.org/docs/stable/sql/statements/set_variable).

---

### sources.sql

> No connectors. Just queries.

URL: https://ondatra.sh/duckdb-configuration/sources/

**Sources — no connectors. Just queries.**

**Phase:** Post-catalog | **Order:** 7 | **Required:** No

Query external data directly from your models.
No connectors. No ingestion tools. No sync jobs. Just attach a database and use it in SQL.

Use sources when your data already exists somewhere else:

- Application databases (PostgreSQL, MySQL)
- Operational systems
- Existing warehouses

You don't need to copy the data first.

## Mental Model

External data is just another table. Once attached, you query it like any other model. Sources are read at query time — not synced.

## Examples

### PostgreSQL

```sql
-- Attach a PostgreSQL database (read-only)
-- Requires: postgres in extensions.sql, credentials in secrets.sql
ATTACH 'postgresql://user:pass@host:5432/warehouse' AS warehouse (READ_ONLY);
```

### MySQL

```sql
-- Attach a MySQL database (read-only)
-- Requires: mysql in extensions.sql, credentials in secrets.sql
ATTACH 'mysql:host=db.example.com port=3306 database=app' AS app_db (READ_ONLY);
```

## Usage in Models

Once attached, external tables behave like local tables. No ingestion step required.

```sql
-- models/raw/customers.sql
-- @kind: table
SELECT customer_id, name, email, created_at
FROM warehouse.public.customers
WHERE active = true
```

```sql
-- models/raw/products.sql
-- @kind: table
SELECT * FROM app_db.products
```

## Files

For files (Parquet, CSV, JSON), no setup is required. Read them directly in your models:

```sql
SELECT * FROM read_parquet('data/*.parquet')
SELECT * FROM read_csv('data/events.csv')
SELECT * FROM read_json('s3://bucket/api-dump.json')
```

---

## Getting Started

> Run your first data pipeline in under 5 minutes

URL: https://ondatra.sh/getting-started/

Run your first data pipeline in under 5 minutes. No Kafka. No Airflow. No dbt.

## 1. Install

```bash
curl -fsSL https://ondatra.sh/install.sh | sh
```

One binary. No dependencies.

## 2. Create a Project

```bash
mkdir my-pipeline && cd my-pipeline
ondatrasql init
```

## 3. Write Your First Model

Create `models/staging/orders.sql`:

```sql
-- @kind: merge
-- @unique_key: order_id
SELECT * FROM (VALUES
    (1, 'Alice', 100, '2026-01-15'),
    (2, 'Bob', 200, '2026-02-20'),
    (3, 'Charlie', 150, '2026-03-10')
) AS t(order_id, customer, amount, order_date)
```

The file path becomes the table name: `staging.orders`.

## 4. Run It

```bash
ondatrasql run
```

```
Running 1 model...
[OK] staging.orders (merge, backfill, 3 rows, 180ms)
```

## 5. Query the Result

```bash
ondatrasql sql "SELECT * FROM staging.orders"
```

```
| order_id | customer | amount | order_date |
| -------- | -------- | ------ | ---------- |
| 1        | Alice    | 100    | 2026-01-15 |
| 2        | Bob      | 200    | 2026-02-20 |
| 3        | Charlie  | 150    | 2026-03-10 |
```

### What Just Happened

You didn't:

- set up a database
- configure a pipeline
- write orchestration logic
- install a warehouse

You wrote SQL, ran one command, and got a table.

## 6. Add a Downstream Model

Create `models/mart/revenue.sql`:

```sql
-- @kind: table
SELECT order_date, COUNT(*) AS orders, SUM(amount) AS total
FROM staging.orders
GROUP BY order_date
```

Run again:

```bash
ondatrasql run
```

```
Running 2 models...
[OK] staging.orders (merge, skip — unchanged)
[OK] mart.revenue (table, backfill, 3 rows, 150ms)
```

OndatraSQL detected the dependency, skipped what hadn't changed, and built only what was needed. No configuration required.

## 7. Preview Changes Safely

```bash
ondatrasql sandbox
```

See what will change — row counts, schema diffs, downstream impact — before committing.

## What Else Can You Do

- **Ingest from APIs** — [Starlark scripts](/scripting/) with built-in HTTP and OAuth
- **Collect events** — [POST to an embedded endpoint](/concepts/events/), no Kafka
- **Validate data** — [26 constraints + 17 audits](/validation/) with automatic rollback
- **Track lineage** — [column-level](/lineage/), extracted from SQL AST

## That's the Model

You don't build pipelines. You run them.
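From here, adding a cursor column makes loads incremental instead of recomputing from scratch. A sketch using the `@incremental` directive covered in the concepts section (the `raw.orders` source and `updated_at` column are illustrative, not created by this tutorial):

```sql
-- models/staging/orders.sql (hypothetical incremental variant)
-- @kind: merge
-- @unique_key: order_id
-- @incremental: updated_at
SELECT order_id, customer, amount, order_date, updated_at
FROM raw.orders
```

With a cursor column in place, each run only needs to merge rows newer than the stored cursor value.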
{{< callout type="info" >}}
**Other install methods:** [Windows (WSL2)](https://learn.microsoft.com/en-us/windows/wsl/install), [build from source](/cli/#building-from-source) (Go 1.25+ and gcc/clang).
{{< /callout >}}

---

## CLI Reference

> Every command in one page

URL: https://ondatra.sh/cli/

One binary. These are the commands.

## Overview

Three core commands:

| Command | What it does |
|---|---|
| `run` | Execute your pipeline |
| `sandbox` | Preview changes safely |
| `daemon` | Collect events via HTTP |

Everything else is optional.

## Most Common Commands

```bash
ondatrasql run                  # Run your entire pipeline
ondatrasql sandbox              # Preview before committing
ondatrasql run staging.orders   # Run a single model
ondatrasql sql "SELECT * FROM staging.orders LIMIT 10"
```

## Example Workflow

```bash
# 1. Create a model
ondatrasql new staging/orders.sql

# 2. Edit it
ondatrasql edit staging.orders

# 3. Preview changes
ondatrasql sandbox

# 4. Run pipeline
ondatrasql run

# 5. Query results
ondatrasql sql "SELECT * FROM staging.orders"
```

---

## Core Commands

### run

Run your entire pipeline in one command.

```bash
ondatrasql run                  # All models in DAG order
ondatrasql run staging.orders   # Single model
```

What it does:

- Builds the DAG automatically
- Detects what changed
- Runs only what's needed
- Writes results to DuckLake

No configuration required.

### sandbox

Preview all changes before committing.

```bash
ondatrasql sandbox                  # All models
ondatrasql sandbox staging.orders   # One model
```

Shows row count diffs, schema evolution, downstream propagation — without touching production. See [Sandbox Mode](/concepts/sandbox/).

### daemon

Start the event collection endpoint.

```bash
COLLECT_PORT=8080 ondatrasql daemon
```

Receives events via HTTP, buffers them durably on disk. `ondatrasql run` flushes them to DuckLake.

```bash
# Custom ports
COLLECT_PORT=9090 COLLECT_ADMIN_PORT=9091 ondatrasql daemon
```

See [Event Collection](/concepts/events/).
---

## Development

### init

Create a new project in the current directory.

```bash
ondatrasql init
```

### new

Create a model file.

```bash
ondatrasql new staging/orders.sql
ondatrasql new raw/api_users.star
ondatrasql new raw/gam_report.yaml
```

### edit

Open in `$EDITOR`.

```bash
ondatrasql edit staging.orders   # Model
ondatrasql edit env              # .env
ondatrasql edit catalog          # config/catalog.sql
ondatrasql edit macros           # config/macros.sql
ondatrasql edit secrets          # config/secrets.sql
ondatrasql edit settings         # config/settings.sql
ondatrasql edit variables        # config/variables.sql
ondatrasql edit sources          # config/sources.sql
ondatrasql edit extensions       # config/extensions.sql
```

---

## Querying

### sql

Run arbitrary SQL — no external database required.

```bash
ondatrasql sql "SELECT COUNT(*) FROM staging.orders"
ondatrasql sql "SELECT * FROM staging.orders" --format json
```

### query

Query a table directly.

```bash
ondatrasql query staging.orders
ondatrasql query staging.orders --limit 10 --format csv
```

Supports `--format csv|json|markdown` and `--limit N`.

---

## Introspection

### stats

Project overview — all models, kinds, status.

```bash
ondatrasql stats
```

### history

Run history for a model.

```bash
ondatrasql history staging.orders
ondatrasql history staging.orders --limit 5
```

### describe

Schema, dependencies, directives, SQL definition.

```bash
ondatrasql describe staging.orders
```

### lineage

Column-level lineage extracted from SQL AST.

```bash
ondatrasql lineage overview              # All models
ondatrasql lineage staging.orders        # One model
ondatrasql lineage staging.orders.total  # One column
```

---

## Automation

### --json

Machine-readable output. Use it for CI/CD, alerts, or orchestrator integration.

```bash
ondatrasql run --json 2>/dev/null | jq -s '.'
```

Every model emits one JSON object:

```json
{
  "model": "staging.orders",
  "kind": "table",
  "run_type": "incremental",
  "rows_affected": 42,
  "duration_ms": 567,
  "status": "ok"
}
```

| Field | Description |
|---|---|
| `model` | Target table |
| `kind` | `table`, `view`, `append`, `merge`, `scd2`, `partition`, `events` |
| `run_type` | `skip`, `backfill`, `incremental`, `full`, `create`, `flush` |
| `run_reason` | Why this run type was chosen |
| `rows_affected` | Rows written (0 for skip) |
| `duration_ms` | Execution time |
| `status` | `ok` or `error` |
| `errors` | Error messages (when `status` is `error`) |
| `warnings` | Warnings (schema evolution, validation) |

### version

```bash
ondatrasql version
```

---

## Maintenance

DuckLake file management. All support `sandbox` to preview.

```bash
ondatrasql merge      # Merge small Parquet files
ondatrasql expire     # Expire old snapshots
ondatrasql cleanup    # Delete unreferenced files
ondatrasql orphaned   # Delete orphaned files
```

Preview first:

```bash
ondatrasql merge sandbox
ondatrasql expire sandbox
```

---

## Building from Source

Requires Go 1.25+ and a C compiler (`gcc` or `clang`) for the embedded DuckDB driver.

```bash
go install github.com/ondatra-labs/ondatrasql/cmd/ondatrasql@latest
```

If you see `cgo: C compiler not found`:

```bash
# Ubuntu/Debian
sudo apt install build-essential

# macOS
xcode-select --install
```

---

## Concepts

> How OndatraSQL works — the mental model

URL: https://ondatra.sh/concepts/

OndatraSQL runs your data pipeline. You write SQL files. Run one command. Your data is materialized.

## The Mental Model

Think of OndatraSQL as a program that runs your data.

- **Files** = models
- **`ondatrasql run`** = execution
- **Output** = tables in DuckLake

You don't configure pipelines. You run them.

## What Happens When You Run

```bash
ondatrasql run
```

OndatraSQL:

1. Finds all models
2. Builds a dependency graph
3. Detects what changed
4. Runs only what's needed
5. Writes results to DuckLake

Done.
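Steps 1 and 2 need no configuration because the graph is read straight out of your SQL: any model that selects from another model's table becomes its downstream. A sketch (the model path and columns are illustrative):

```sql
-- models/staging/sessions.sql (hypothetical)
-- @kind: table
SELECT user_id, MIN(event_time) AS session_start
FROM raw.events  -- this table reference alone creates the raw.events → staging.sessions edge
GROUP BY user_id
```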
## Example

```
raw.events → staging.sessions → mart.daily_traffic
```

You don't define this graph. It's extracted from your SQL.

## The Runtime Model

Everything is built into one system:

- **Dependency graph** → automatic
- **Change detection** → automatic
- **Incremental loading** → automatic
- **Schema evolution** → automatic
- **Validation** → automatic

No config files. No orchestration layer.

## Data Flow

1. Data is collected or loaded
2. Stored durably on disk
3. Loaded into DuckDB
4. Transformed via SQL
5. Written to DuckLake

If anything fails, nothing is lost.

## Change Detection

Every model gets a run type:

| Run Type | Meaning |
|---|---|
| `skip` | Nothing changed |
| `incremental` | New data in source |
| `full` | Upstream changed |
| `backfill` | First run or definition changed |

You don't write incremental logic. It's built in.

## Safety

- **Constraints** block bad data before it's written
- **Audits** catch regressions after
- **Failures** roll back automatically via time-travel

Every run is reproducible.

## What OndatraSQL Is Not

OndatraSQL runs on a single machine. It is not:

- A distributed system
- A streaming platform
- A cloud warehouse
- An orchestrator

You don't orchestrate data pipelines. You run them.

---

### Models

> Everything is a model — ingestion, transformation, and events in one system

URL: https://ondatra.sh/concepts/models/

Everything in OndatraSQL is a model. A model can transform data (SQL), ingest data (API scripts), or collect events (HTTP). All models run in the same pipeline, with the same execution model.

## Mental Model

- **SQL** = transformations
- **Scripts** = ingestion
- **Events** = streaming input

Different inputs — same pipeline.
## Three Formats — One Runtime

| Type | You write | OndatraSQL does |
|---|---|---|
| `.sql` | A SELECT statement | Materializes, tracks changes, evolves schema |
| `.star` | API logic | Runs it, buffers output, materializes |
| `.yaml` | Config for a source function | Calls it, materializes |
| `.sql` (events) | A column schema | Receives HTTP events, buffers, flushes |

File path = table name: `models/staging/orders.sql` → `staging.orders`.

## SQL Models

Write a SELECT. Everything else is automatic.

- Table creation
- Incremental logic
- Schema evolution
- Change detection

```sql
-- @kind: merge
-- @unique_key: order_id
SELECT order_id, customer_id, total, updated_at
FROM raw.orders
```

### Views

No materialization. Resolves live.

```sql
-- @kind: view
SELECT order_id, customer_id, total
FROM raw.orders
WHERE total > 0
```

## Starlark Scripts

Fetch data from APIs without leaving the pipeline. HTTP, OAuth, pagination — all built in. No Python, no dependencies.

```python
# @kind: append
# @incremental: updated_at
resp = http.get("https://api.example.com/users")
for user in resp.json:
    save.row(user)
```

### Read From Your Data

Scripts can query DuckDB directly. This automatically creates dependencies in the DAG.

```python
rows = query("SELECT * FROM mart.customers WHERE synced = false")
for row in rows:
    http.post("https://api.hubspot.com/contacts", json=row)
    save.row({"id": row["id"], "synced_at": str(time.now())})
```

### Shared Libraries

Put reusable logic in `lib/`. Import with `load()`.

```python
load("lib/pagination.star", "paginate")

for page in paginate("https://api.example.com/users"):
    for user in page:
        save.row(user)
```

See [Scripting](/scripting/) for all built-in modules.

## Events Models

Define a schema. Send events via HTTP. No Kafka. No ingestion system.
```sql
-- @kind: events
event_name VARCHAR NOT NULL,
page_url VARCHAR,
user_id VARCHAR,
received_at TIMESTAMPTZ
```

Then:

```bash
curl -X POST localhost:8080/collect/raw/events \
  -d '{"event_name":"pageview","page_url":"/home"}'
```

See [Event Collection](/concepts/events/).

## YAML Models

Use configuration instead of code. When the logic already exists in `lib/`.

```yaml
kind: append
incremental: report_date
source: gam_report
config:
  network_code: ${GAM_NETWORK_CODE}
  dimensions:
    - AD_UNIT_NAME
    - DATE
```

OndatraSQL calls your source function automatically. See [Blueprints](/blueprints/) for examples.

## Directives

Control behavior with comments:

```sql
-- @kind: merge
-- @unique_key: order_id
-- @incremental: updated_at
-- @constraint: order_id NOT NULL
-- @audit: row_count > 0
```

See [Directives](/concepts/directives/) for the full list.

## Why Models Matter

Most tools split this into multiple systems:

| Task | Traditional stack | OndatraSQL |
|---|---|---|
| Transform data | dbt | SQL model |
| Ingest APIs | Python + Airflow | Starlark model |
| Collect events | Kafka | Events model |
| Schedule runs | Airflow | `ondatrasql run` |

OndatraSQL unifies everything into models. One abstraction, one pipeline, one binary.

---

### Directives

> Directives turn SQL into a data pipeline

URL: https://ondatra.sh/concepts/directives/

Directives turn SQL into a data pipeline. They are comments that control how data is materialized, how changes are detected, and how data is validated. You write SQL. Directives define everything else.

## Mental Model

- **SQL** → what the data is
- **Directives** → how it behaves

Together, they define a complete pipeline step.

## Before vs After

Without directives:

```sql
SELECT * FROM raw.orders
```

Just a query.
With directives: ```sql -- @kind: merge -- @unique_key: order_id -- @incremental: updated_at -- @constraint: total >= 0 -- @audit: row_count > 0 SELECT * FROM raw.orders ``` A fully managed pipeline — updates existing rows, blocks invalid data, verifies the result, tracks changes automatically. ## Grouped Reference ### Core Control how data is materialized: | Directive | What it does | |---|---| | `@kind` | Materialization strategy — `table`, `merge`, `append`, `scd2`, `partition`, `view`, `events` | | `@unique_key` | Row matching key (required for merge, scd2, partition) | | `@incremental` | Cursor column for incremental state | | `@incremental_initial` | Starting cursor value (default: `1970-01-01T00:00:00Z`) | ### Storage & Performance | Directive | What it does | |---|---| | `@partitioned_by` | DuckLake file partitioning — organizes data into separate files per key | | `@sorted_by` | DuckLake sorted table — better statistics for faster queries | ### Validation | Directive | What it does | |---|---| | `@constraint` | Block bad data before insert (26 patterns) | | `@audit` | Verify results after insert, rollback on failure (17 patterns) | | `@warning` | Log issues without failing | ### Metadata | Directive | What it does | |---|---| | `@description` | Table comment in DuckLake catalog | | `@column` | Column comment + masking tags | ### Runtime | Directive | What it does | |---|---| | `@extension` | Load a DuckDB extension before execution | ## Syntax by File Type Same directives, all formats: {{< tabs >}} {{< tab "SQL" >}} ```sql -- @kind: merge -- @unique_key: order_id -- @constraint: order_id NOT NULL SELECT * FROM raw.orders ``` {{< /tab >}} {{< tab "Starlark" >}} ```python # @kind: append # @incremental: updated_at resp = http.get("https://api.example.com/orders") for order in resp.json: save.row(order) ``` {{< /tab >}} {{< tab "YAML" >}} ```yaml kind: merge unique_key: order_id constraints: - order_id NOT NULL ``` {{< /tab >}} {{< /tabs >}} ## Column Masking 
Tags in `@column` directives trigger automatic masking during materialization: ```sql -- @column: email = Contact email | mask_email | PII -- @column: ssn = SSN | mask_ssn | sensitive ``` Define masking macros in `config/macros.sql`: ```sql CREATE OR REPLACE MACRO mask_email(val) AS regexp_replace(val::VARCHAR, '(.).*@', '\1***@'); CREATE OR REPLACE MACRO mask_ssn(val) AS '***-**-' || val[-4:]; ``` Tags with `mask`, `hash`, or `redact` prefixes are applied as macro calls. Other tags (`PII`, `sensitive`) are metadata-only — stored for governance, no execution effect. ## Directive Compatibility | Directive | table | view | append | merge | scd2 | partition | events | |---|---|---|---|---|---|---|---| | `@description` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `@unique_key` | — | — | — | Required | Required | Required | — | | `@partitioned_by` | Hint | — | Hint | Hint | Hint | — | — | | `@sorted_by` | Yes | — | Yes | Yes | Yes | Yes | — | | `@incremental` | — | — | Yes | Yes | Yes | Yes | — | | `@column` | Yes | — | Yes | Yes | Yes | Yes | — | | `@constraint` | Yes | — | Yes | Yes | Yes | Yes | — | | `@audit` | Yes | — | Yes | Yes | Yes | Yes | — | | `@warning` | Yes | — | Yes | Yes | Yes | Yes | — | | `@extension` | Yes | Yes | Yes | Yes | Yes | Yes | — | ## Why Directives Exist Most tools split pipeline configuration across multiple systems — SQL for transformations, config files for behavior, separate tools for validation. Directives unify everything inside the model. One file defines what the data is and how it behaves. --- ### Model Kinds > Choose data behavior with one directive — not pipeline logic URL: https://ondatra.sh/concepts/kinds/ Model kinds define how your data behaves. With one directive, you choose how data is stored, how changes are handled, and how updates are applied. 
## Mental Model Same SQL — different behavior: ```sql SELECT * FROM raw.orders -- @kind: append → only adds new rows -- @kind: merge → updates existing, inserts new -- @kind: scd2 → keeps full history of every change -- @kind: table → rebuilds when something changed ``` ## Which Kind Should You Use? | Kind | Use for | |---|---| | `table` | Aggregates, marts, lookup tables | | `view` | Lightweight transforms, staging | | `append` | Event logs, fact tables, audit trails | | `merge` | User/customer tables (upsert) | | `scd2` | History tracking, dimension tables | | `partition` | Large partitioned fact tables | | `events` | HTTP event ingestion | ## view No storage. Always up-to-date. Resolves at query time. ```sql -- @kind: view SELECT order_id, customer_id, total FROM raw.orders WHERE total > 0 ``` Recreated if definition changes. Skipped otherwise. ## table (default) Rebuilds only when something changed. Tracks upstream dependencies — if nothing changed upstream, nothing runs. ```sql -- @kind: table SELECT * FROM raw.products WHERE is_active = true ``` | Condition | What happens | |---|---| | First run | Full build | | SQL changed | Rebuild | | Upstream changed | Rebuild | | Nothing changed | Skip — no query, no I/O | ## append Add new data. Never updates existing rows. [Smart CDC](/concepts/cdc/) on incremental runs. ```sql -- @kind: append SELECT * FROM raw.events ``` Source tables are rewritten to change-only queries via DuckLake time-travel. ## merge Like an UPSERT. Updates existing rows, inserts new ones. [Smart CDC](/concepts/cdc/) on incremental runs. ```sql -- @kind: merge -- @unique_key: customer_id SELECT customer_id, name, email, tier FROM raw.customers ``` Requires `@unique_key`. ## scd2 Keeps every version of a row. Full history, permanent. Uses full-state comparison — not Smart CDC. 
```sql -- @kind: scd2 -- @unique_key: product_id SELECT product_id, name, price, category FROM raw.products ``` OndatraSQL adds `valid_from_snapshot`, `valid_to_snapshot`, and `is_current` automatically. Changed rows get closed and a new version inserted. Requires `@unique_key`. History survives snapshot expiration. ## partition Replaces only affected partitions. [Smart CDC](/concepts/cdc/) on incremental runs. ```sql -- @kind: partition -- @unique_key: year, month SELECT order_id, total, EXTRACT(YEAR FROM order_date) AS year, EXTRACT(MONTH FROM order_date) AS month FROM raw.orders ``` Requires `@unique_key` (supports multiple columns). ## events Send events via HTTP. No Kafka. ```sql -- @kind: events event_name VARCHAR NOT NULL, page_url VARCHAR, user_id VARCHAR, received_at TIMESTAMPTZ ``` Defines a column schema, not a SELECT. See [Event Collection](/concepts/events/). ## How Changes Are Detected | Kind | Strategy | |---|---| | `view` | Definition hash | | `table` | Dependency tracking — skip if unchanged | | `append` / `merge` / `partition` | Smart CDC via time-travel | | `scd2` | Full-state comparison | | `events` | Always flush | ## Storage Hints DuckLake optimizations for any materializing kind: ```sql -- @sorted_by: updated_at → better statistics, faster queries -- @partitioned_by: region → automatic file organization ``` ## Why Model Kinds Exist In most tools, materialization logic is spread across SQL, orchestration config, and custom scripts. Model kinds encode it directly in the model — one line replaces pipeline logic. | Need | Traditional stack | OndatraSQL | |---|---|---| | Append data | Custom SQL + scheduler | `@kind: append` | | Upsert rows | MERGE + orchestration | `@kind: merge` | | Track history | Complex SCD logic | `@kind: scd2` | | Collect events | Kafka + consumer | `@kind: events` | --- ### Smart CDC > Write once. Run incrementally forever. URL: https://ondatra.sh/concepts/cdc/ Write normal SQL. 
OndatraSQL automatically processes only changed data. No `is_incremental()`. No manual filters. No extra logic. ## Mental Model Your query, but only for new data. You don't change your SQL — OndatraSQL changes how it runs. ## Before and After Without Smart CDC: ```sql SELECT * FROM raw.events WHERE event_time > (SELECT MAX(event_time) FROM target) ``` Manual logic. Easy to get wrong. With Smart CDC: ```sql -- @kind: append SELECT * FROM raw.events ``` That's it. ## How It Works OndatraSQL: 1. Finds your source tables (from SQL AST) 2. Checks what changed since last run (DuckLake snapshots) 3. Runs your query only on that data First run: all rows. Every run after: only changes. ## Example ```sql -- @kind: merge -- @unique_key: customer_id SELECT * FROM raw.customers ``` - First run → all rows - Next runs → only changed rows ## Not All Tables Are Treated the Same - **Main table** → only changed rows - **Joins** → adjusted for correctness You always get correct results — even with joins and aggregations. ## Where It Applies Smart CDC works with: - `append` - `merge` - `partition` Other kinds use different strategies: - `scd2` → full comparison (needs complete source) - `table` → dependency tracking (skip or rebuild) ## Why This Matters Most tools require writing incremental filters, managing state manually, and debugging edge cases. Smart CDC removes all of that. | Task | dbt | OndatraSQL | |---|---|---| | Incremental logic | Manual (`is_incremental()`) | Automatic | | State handling | User-defined | Built-in | | Edge cases | Your problem | Handled | Write once. Run incrementally forever. --- ### Schema Evolution > No migrations. Ever. URL: https://ondatra.sh/concepts/schema-evolution/ Change your SQL. OndatraSQL updates the table automatically. No migrations. No ALTER TABLE. No manual steps. ## Mental Model Your SQL defines the schema. When the query changes, the table adapts. 
## Before and After Before: ```sql SELECT order_id, total FROM raw.orders ``` After: ```sql SELECT order_id, total, currency FROM raw.orders ``` OndatraSQL automatically adds the new column, updates the table, and keeps existing data. ## Additive Changes Safe changes are applied automatically: - **New columns** → added - **Renamed columns** → detected via AST lineage - **Types widened** → promoted safely (e.g. `INTEGER` → `BIGINT`) No rebuild needed. ### Safe Type Promotions | From | To | |---|---| | `TINYINT` | `SMALLINT`, `INTEGER`, `BIGINT`, `FLOAT`, `DOUBLE` | | `SMALLINT` | `INTEGER`, `BIGINT`, `FLOAT`, `DOUBLE` | | `INTEGER` | `BIGINT`, `FLOAT`, `DOUBLE` | | `BIGINT` | `DOUBLE` | | `FLOAT` | `DOUBLE` | ## Destructive Changes If a change isn't safe: - **Column removed** - **Type narrowed** OndatraSQL rebuilds the table automatically. No manual intervention. ## How It Works 1. Query runs into a temp table 2. Schemas compared 3. Changes applied — or table rebuilt You modify your SQL. The table follows. ## Why This Matters In most systems, schema changes require migration scripts, manual ALTER TABLE, and coordination between teams. OndatraSQL removes all of that. | Task | Traditional approach | OndatraSQL | |---|---|---| | Add column | Migration script | Update your SQL | | Rename column | Manual ALTER | Automatic | | Type change | Risky migration | Handled | | Drop column | Careful coordination | Automatic rebuild | No migrations. Ever. --- ### Incremental Models > Incremental is not a feature — it's the default URL: https://ondatra.sh/concepts/incremental/ Incremental processing is automatic. Write a normal query. OndatraSQL runs it on new data only. ## Mental Model - **SQL models** → incremental by default - **Scripts** → use a cursor You don't write incremental logic — you just define the model. ## Before and After Traditional approach: ```sql SELECT * FROM raw.events WHERE event_time > (SELECT MAX(event_time) FROM target) ``` Manual. Fragile. 
OndatraSQL: ```sql -- @kind: append SELECT * FROM raw.events ``` Done. [Smart CDC](/concepts/cdc/) handles the rest. ## SQL Models ```sql -- @kind: append -- @incremental: event_time SELECT * FROM raw.events ``` - First run → all data - Next runs → only new data Automatic via Smart CDC. No manual filters. ### Custom Start Value ```sql -- @incremental: updated_at -- @incremental_initial: "2024-01-01" ``` Default: `1970-01-01T00:00:00Z`. ## Starlark Scripts Scripts don't have SQL to analyze — you control the cursor. ```python # @kind: append # @incremental: updated_at url = "https://api.example.com/events" if not incremental.is_backfill: url += "?since=" + incremental.last_value resp = http.get(url) for item in resp.json: save.row(item) ``` | Property | Description | |---|---| | `incremental.is_backfill` | `True` on first run | | `incremental.last_value` | MAX(cursor) from previous run | | `incremental.initial_value` | Starting value | | `incremental.last_run` | Last successful run timestamp | ## SQL vs Scripts | Model type | Incremental behavior | |---|---| | SQL | Automatic (Smart CDC) | | Scripts | Manual cursor | | YAML | Uses script logic | ## When Backfill Happens - Model SQL or directives changed - Target table doesn't exist - `@kind`, `@unique_key`, `@incremental` changed ## Why This Matters Most tools require writing incremental filters and managing state manually. OndatraSQL makes incremental the default. Combined with [Smart CDC](/concepts/cdc/) and [Schema Evolution](/concepts/schema-evolution/), this eliminates most pipeline maintenance work. --- ### Dependency Graph > Your queries define the graph URL: https://ondatra.sh/concepts/dag/ Dependencies are automatic. Write queries. OndatraSQL figures out the order. ## Mental Model - Files are nodes - SQL references create edges - Execution is automatic You never define the DAG. 
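To make the mental model concrete: two model files are enough to create an edge. The table reference in the second file's `FROM` clause is the entire dependency declaration (paths and names below are illustrative):

```sql
-- models/staging/orders.sql
SELECT order_id, customer_id, total
FROM raw.orders

-- models/mart/revenue.sql
-- reads staging.orders, so it always runs after it
SELECT customer_id, SUM(total) AS revenue
FROM staging.orders
GROUP BY customer_id
```

Since file path = table name, moving or renaming a model file moves the table — and every edge pointing at it — automatically.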
## How Dependencies Are Detected - **SQL models** → table references from `FROM`, `JOIN`, CTEs, subqueries - **Starlark models** → `query("...")` calls (followed through `load()` imports) - **YAML models** → same as Starlark via source functions Only references to other models create dependencies. ### Limitation Only static SQL is detected: ```python query("SELECT * FROM staging.orders") # tracked query("SELECT * FROM " + table) # not tracked ``` Dynamic queries are silently skipped. ## Execution Order If a model reads another model, it runs after it. ``` raw.orders → staging.orders raw.customers → staging.customers staging.* → mart.revenue ``` {{< mermaid >}} graph LR RO["raw.orders"] --> SO["staging.orders"] RC["raw.customers"] --> SO RC --> SC["staging.customers"] SO --> MR["mart.revenue"] SC --> MR style RO fill:#1a2332,stroke:#4a6fa5,color:#e0e0e0 style RC fill:#1a2332,stroke:#4a6fa5,color:#e0e0e0 style SO fill:#1a2332,stroke:#3ecfcf,color:#e0e0e0 style SC fill:#1a2332,stroke:#3ecfcf,color:#e0e0e0 style MR fill:#1a2332,stroke:#e8854a,color:#e0e0e0 {{< /mermaid >}} ## Run Types Each model gets a run type before execution: | Run Type | Meaning | |---|---| | `skip` | Nothing changed | | `backfill` | First run or definition changed | | `incremental` | New/changed data | | `full` | Upstream changed | These decisions propagate through the graph. ## Propagation When a model runs, downstream models are re-evaluated. Changes flow through the DAG. If a model fails, downstream models do **not** run. ## Traditional Approach ```python @task def staging_orders(): ... @task def mart_revenue(): staging_orders() ``` Manual DAG. Maintained by hand. **OndatraSQL:** ```sql SELECT * FROM staging.orders ``` DAG inferred automatically. 
## Commands **Run:** ```bash ondatrasql run # All models in DAG order ondatrasql run staging.orders # Single model ``` **Preview:** ```bash ondatrasql sandbox ``` **Inspect:** ```bash ondatrasql lineage overview # All dependencies ondatrasql lineage staging.orders # One model ``` Circular dependencies are detected at build time. ## Why This Matters Most tools require defining dependencies manually, maintaining DAGs, and debugging execution order. OndatraSQL builds the graph from your code. Your queries define the graph. --- ### Sandbox Mode > See the result before you commit it URL: https://ondatra.sh/concepts/sandbox/ See the result before you commit it. No surprises. No blind runs. ```bash ondatrasql sandbox ``` ## Mental Model - **Production** = read-only - **Sandbox** = writable copy - Your pipeline runs against both You see the diff before anything is committed. ## How It Works 1. Production catalog mounted read-only 2. Temporary sandbox catalog created 3. Models run normally — reads from production, writes to sandbox 4. Results compared 5. Sandbox discarded ## What Makes It Different This is not just a query preview. It's a full simulation. - Runs the entire DAG — not just one model - Applies schema evolution - Executes validation (constraints + audits) - Shows downstream impact This is a real run — just not committed. ## Without Sandbox 1. Run pipeline 2. Something breaks 3. Debug in production ## With Sandbox 1. Preview everything 2. See impact 3. 
Then commit ## What You See For each model: - Row changes (added / removed / percentage) - Schema changes (new columns, type changes) - Sample diffs - Validation results - Downstream effects ```text ╠═══ Changes ══════════════════════════════════════════════════╣ ║ staging.orders ║ ║ Rows: 100 → 142 (+42, +42.0%) ║ ║ ║ ║ mart.revenue ║ ║ SCHEMA EVOLUTION: + Added: region (VARCHAR) ║ ║ Rows: 5 → 6 (+1, +20.0%) ║ ``` ## Failures Are Safe If a model fails: - Nothing is committed - Downstream models are not affected - Production never sees it You fix it before it's real. ## Full DAG Preview The entire pipeline executes: ```text [OK] raw.customers (45ms) [OK] staging.orders (95ms) [OK] mart.revenue (54ms) [FAIL] mart.kpis — constraint violated: total >= 0 ``` You see exactly where things break. ## Commands ```bash ondatrasql sandbox # All models ondatrasql sandbox staging.orders # One model ``` Maintenance commands also support preview: ```bash ondatrasql merge sandbox ondatrasql expire sandbox ``` ## When to Use It - Before deploying changes - After modifying SQL - When adding new models - When debugging data issues Or simply: whenever you want certainty. ## Why This Matters Most tools run pipelines blind. You find out something broke after it's committed. Sandbox lets you run your pipeline before you run your pipeline. --- ### Maintenance > Keep your data fast and your storage under control URL: https://ondatra.sh/concepts/maintenance/ Keep your data fast and your storage under control. Built-in maintenance — no separate tooling needed. 
## Mental Model - Data is immutable (for time travel and CDC) - Old data accumulates over time - Maintenance removes what's no longer needed ## What Maintenance Does | Command | What it does | |---|---| | `merge` | Combine small files → faster queries | | `expire` | Remove old snapshot metadata | | `cleanup` | Delete unused files → free storage | | `orphaned` | Remove stray files from failed writes | ## Typical Workflow ```bash ondatrasql merge ondatrasql expire ondatrasql cleanup ondatrasql orphaned ``` Run weekly or as needed. Each step builds on the previous one. ## Safe by Default All commands support sandbox — preview before deleting anything: ```bash ondatrasql merge sandbox ondatrasql cleanup sandbox ``` ## When to Run What | Command | When | |---|---| | `merge` | Queries slowing down (many small files) | | `expire` | Regularly (e.g. 30-day retention) | | `cleanup` | After expire | | `orphaned` | After failures or crashes | ## One-Command Option DuckLake's `CHECKPOINT` runs all steps at once: ```sql CHECKPOINT; ``` Configure defaults: ```sql CALL lake.set_option('expire_older_than', '30 days'); CALL lake.set_option('delete_older_than', '7 days'); ``` ## Scheduling Use cron or any workflow scheduler: ```bash # Weekly maintenance (Sunday 3 AM) 0 3 * * 0 cd /path/to/project && ondatrasql merge && ondatrasql expire && ondatrasql cleanup && ondatrasql orphaned ``` ## Customizing The SQL files in `sql/` are fully editable: ```bash ondatrasql edit merge ondatrasql edit expire ondatrasql edit cleanup ondatrasql edit orphaned ``` Adjust retention periods, add table-specific operations, or switch to `CHECKPOINT`. ## Why This Matters Without maintenance, queries slow down and storage grows indefinitely. OndatraSQL keeps your data efficient with a few commands — no vacuum jobs, no external tools. 
## Further Reading - [DuckLake: Recommended Maintenance](https://ducklake.select/docs/stable/duckdb/maintenance/recommended_maintenance) - [DuckLake: CHECKPOINT](https://ducklake.select/docs/stable/duckdb/maintenance/checkpoint) --- ### Event Collection > Built-in event ingestion — no Kafka, no message broker URL: https://ondatra.sh/concepts/events/ Ingest events directly over HTTP — no Kafka, no message broker, no infrastructure. OndatraSQL provides a built-in ingestion endpoint with durable buffering and exactly-once delivery into DuckLake. Just POST events. They are stored, recovered on crash, and safely materialized during pipeline runs. ## Mental Model Your app sends events via HTTP. OndatraSQL buffers them on disk. The pipeline flushes them into DuckLake. No external systems. No moving parts. ## Why This Exists Most event pipelines require Kafka (or similar), a consumer service, and a data warehouse loader. OndatraSQL replaces all of that with a single binary. ## Quick Start ### 1. Define the Schema ```sql -- models/raw/events.sql -- @kind: events event_name VARCHAR NOT NULL, page_url VARCHAR, user_id VARCHAR, received_at TIMESTAMPTZ ``` ### 2. Start the Daemon ```bash ondatrasql daemon ``` ### 3. Send Events ```bash curl -X POST localhost:8080/collect/raw/events \ -d '{"event_name":"pageview","page_url":"/home","user_id":"u42"}' ``` That's it. Events are buffered durably on disk. ### 4. Flush to DuckLake ```bash ondatrasql run ``` Events are flushed, transformed, and queryable. ## Exactly-Once Delivery Events are inserted into DuckLake in a single transaction. If a crash happens — no data is lost, no duplicates are created. ## Crash Recovery If the system crashes: - Events on disk → still there - Committed batches → detected and cleaned up - Uncommitted batches → retried automatically No manual intervention required. 
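Because flushed events land in an ordinary table, downstream models can validate them like any other source. The `events` kind itself takes no constraints — a downstream model does. A sketch, with hypothetical allowed values:

```sql
-- models/staging/events_clean.sql (illustrative)
-- @kind: append
-- @constraint: event_name NOT NULL
-- @constraint: event_name IN ('pageview', 'click')
SELECT event_name, page_url, user_id, received_at
FROM raw.events
```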
## When to Use This Use event collection when: - You don't want Kafka - You run on a single machine or small cluster - You want simple ingestion without infrastructure ## Not Designed For - High-scale distributed streaming - Real-time sub-second processing - Multi-region ingestion ## Architecture Two processes work together: **Daemon** — receives events via HTTP, stores them durably on disk. Runs continuously. **Runner** — flushes events into DuckLake. Runs as part of the pipeline (cron or manual). {{< mermaid >}} sequenceDiagram participant App as Browser / App participant Daemon as ondatrasql daemon participant Runner as ondatrasql run participant DL as DuckLake App->>Daemon: POST /collect/raw/events Daemon-->>App: 202 Accepted Note over Runner: ondatrasql run Runner->>Daemon: claim events Runner->>DL: BEGIN: INSERT + ack → COMMIT Runner->>Daemon: ack (cleanup) {{< /mermaid >}} ## Flush Protocol {{< mermaid >}} flowchart TD CLAIM["1. Claim batch from buffer"] --> TEMP["2. Load into temp table"] TEMP --> BEGIN["3. BEGIN transaction"] BEGIN --> INSERT["4. INSERT INTO target"] INSERT --> ACK_REC["5. Write ack record"] ACK_REC --> COMMIT["6. COMMIT"] COMMIT --> CRASH_NOTE["crash window — safe: recovery detects ack record"] CRASH_NOTE --> BADGER_ACK["7. Ack buffer (cleanup)"] BADGER_ACK --> DELETE_ACK["8. Delete ack record"] style CLAIM fill:#1a2332,stroke:#4a6fa5,color:#e0e0e0 style TEMP fill:#1a2332,stroke:#4a6fa5,color:#e0e0e0 style BEGIN fill:#1e2d3d,stroke:#3ecfcf,color:#e0e0e0 style INSERT fill:#1e2d3d,stroke:#3ecfcf,color:#e0e0e0 style ACK_REC fill:#1e2d3d,stroke:#3ecfcf,color:#e0e0e0 style COMMIT fill:#1a2332,stroke:#3ecfcf,color:#e0e0e0 style CRASH_NOTE fill:#1e2d3d,stroke:#e8854a,color:#e0e0e0 style BADGER_ACK fill:#1a2332,stroke:#555,color:#e0e0e0 style DELETE_ACK fill:#1a2332,stroke:#555,color:#e0e0e0 {{< /mermaid >}} Steps 3–6 are one transaction. Steps 7–8 are idempotent cleanup. 
## Endpoints **Public** (COLLECT_PORT, default 8080): | Method | Path | Description | |---|---|---| | `POST` | `/collect/{schema}/{table}` | Single event | | `POST` | `/collect/{schema}/{table}/batch` | Batch (atomic) | | `GET` | `/health` | Health check | **Admin** (COLLECT_ADMIN_PORT, default 8081, localhost only): | Method | Path | Description | |---|---|---| | `POST` | `/flush/.../claim` | Claim events | | `POST` | `/flush/.../ack` | Acknowledge flush | | `POST` | `/flush/.../nack` | Return to queue | | `GET` | `/flush/.../inflight` | List inflight claims | ## Validation `NOT NULL` columns must be present. Unknown fields are accepted but ignored during flush. `received_at` is auto-populated with server timestamp if not provided. ## Configuration ```bash COLLECT_PORT=9090 COLLECT_ADMIN_PORT=9091 ondatrasql daemon ``` | Variable | Default | Description | |---|---|---| | `COLLECT_PORT` | `8080` | Public endpoint | | `COLLECT_ADMIN_PORT` | `COLLECT_PORT + 1` | Internal flush API | ## Performance Single machine (Xeon W-2295), ~200 byte payloads: | Stage | Throughput | |---|---| | HTTP ingest | ~23,000 events/sec | | Flush to DuckLake | ~12,000 events/sec | No external systems. No cluster. 
## Full Example ```sql -- models/raw/events.sql -- @kind: events event_name VARCHAR NOT NULL, page_url VARCHAR, user_id VARCHAR, session_id VARCHAR, received_at TIMESTAMPTZ ``` ```sql -- models/staging/sessions.sql -- @kind: view SELECT session_id, user_id, MIN(received_at) AS started_at, MAX(received_at) AS ended_at, COUNT(*) AS page_views FROM raw.events WHERE event_name = 'pageview' GROUP BY session_id, user_id ``` ```sql -- models/mart/daily_traffic.sql -- @kind: merge -- @unique_key: day SELECT DATE_TRUNC('day', started_at) AS day, COUNT(DISTINCT session_id) AS sessions, COUNT(DISTINCT user_id) AS visitors, SUM(page_views) AS total_views FROM staging.sessions GROUP BY 1 ``` ```bash # Start daemon ondatrasql daemon & # Send events curl -X POST localhost:8080/collect/raw/events \ -d '{"event_name":"pageview","page_url":"/home","user_id":"u1","session_id":"s1"}' # Run pipeline ondatrasql run # Query results ondatrasql sql "SELECT * FROM mart.daily_traffic" ``` --- ## Validation > Built-in data validation — no external tools required URL: https://ondatra.sh/validation/ OndatraSQL validates data as part of execution — not as a separate step. No external tools. No test framework. No extra runs. Every model can enforce: - **Constraints** → block bad data before it's written - **Audits** → detect regressions after materialization - **Warnings** → monitor without failing ```sql -- @constraint: user_id NOT NULL -- @constraint: email EMAIL -- @audit: row_count > 0 -- @audit: row_count_change < 20% -- @warning: null_count(email) < 5% ``` This ensures no invalid rows enter the table, no empty datasets are produced, sudden data drops fail the run, and minor issues are logged without blocking. 
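All three tiers can live in a single model file — a sketch with hypothetical table and column names:

```sql
-- @kind: merge
-- @unique_key: user_id
-- @constraint: user_id PRIMARY KEY
-- @constraint: email EMAIL
-- @audit: row_count > 0
-- @audit: row_count_change < 20%
-- @warning: null_count(email) < 5%
SELECT user_id, email, created_at
FROM raw.users
```

A failing constraint writes nothing; a failing audit rolls the insert back; the warning only logs.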
## When to Use What | Type | When it runs | On failure | Use for | |---|---|---|---| | Constraint | Before insert | Nothing written | Enforce correctness | | Audit | After insert | Rolled back | Detect anomalies | | Warning | After insert | Logged only | Monitor trends | ## Mental Model Validation happens in two stages: **Before data is written** → validate individual rows. NULL checks, uniqueness, ranges, formats. If a constraint fails, nothing is committed. **After data is written** → validate the result. Row count changes, distribution shifts, historical comparisons, cross-table reconciliation. These need the committed data and DuckLake snapshots. Warnings use the same patterns as audits but only log — they never block. ## Automatic Rollback If an audit fails, the transaction is rolled back via DuckLake time-travel. No bad data is committed. Your data remains consistent — automatically. ## Built on DuckLake Audits can use time-travel to compare against previous versions: ```sql -- This happens automatically inside audit patterns SELECT COUNT(*) FROM table AT (VERSION => prev_snapshot) ``` This enables regression detection, historical comparisons, and reproducible validation — without external tooling. ## Usage Define rules as directives in any model: ```sql -- @constraint: id PRIMARY KEY -- @audit: row_count > 0 -- @warning: row_count_change < 50% ``` See [Constraints](constraints/), [Audits](audits/), and [Warnings](warnings/) for all available patterns. --- ### Constraints > Validation is part of execution — not a separate step URL: https://ondatra.sh/validation/constraints/ Constraints stop bad data before it enters your tables. If a single row violates a constraint, the entire model fails and nothing is written. No separate testing framework. No post-hoc checks. Validation is part of execution. ## Why This Matters Most data tools validate data after it's already written. By then, bad data is already in your tables. 
OndatraSQL validates before insertion — so invalid data never lands. ## Mental Model Constraints are enforced on the result of your query before it is written. If they fail: - Nothing is inserted - Downstream models do not run ## Usage Add constraints directly in your model: ```sql -- @constraint: column_name PATTERN ``` You can define multiple constraints per model. ```sql -- @constraint: email EMAIL -- @constraint: total >= 0 -- @constraint: status IN ('pending', 'shipped', 'cancelled') -- @constraint: customer_id REFERENCES customers(id) ``` If any of these fail, the model does not run. ## Common Patterns | Pattern | Example | What it checks | |---|---|---| | `PRIMARY KEY` | `id PRIMARY KEY` | NOT NULL + UNIQUE | | `NOT NULL` | `email NOT NULL` | No NULL values | | `UNIQUE` | `code UNIQUE` | No duplicates | | `IN (...)` | `status IN ('active', 'inactive')` | Allowed values | | `>= N` | `total >= 0` | Minimum value | | `EMAIL` | `email EMAIL` | Email format | | `REFERENCES` | `customer_id REFERENCES customers(id)` | Foreign key | | `NULL_PERCENT` | `email NULL_PERCENT < 10` | Max NULL percentage | ## All 26 Patterns ### Identity & Nullability | Pattern | Example | Description | |---|---|---| | `PRIMARY KEY` | `id PRIMARY KEY` | NOT NULL + UNIQUE combined | | `NOT NULL` | `email NOT NULL` | Column must not be NULL | | `UNIQUE` | `code UNIQUE` | No duplicate values | | `(col1, col2) UNIQUE` | `(year, month) UNIQUE` | Composite uniqueness | | `NOT EMPTY` | `name NOT EMPTY` | Not NULL and not empty string | ### Comparison | Pattern | Example | Description | |---|---|---| | `>= N` | `age >= 0` | Greater than or equal | | `<= N` | `score <= 100` | Less than or equal | | `> N` | `amount > 0` | Greater than | | `< N` | `discount < 1` | Less than | | `= N` | `version = 1` | Equal to | | `!= N` | `status != 0` | Not equal to | | `BETWEEN x AND y` | `rating BETWEEN 1 AND 5` | Range check | ### Membership | Pattern | Example | Description | |---|---|---| | `IN (...)` | 
`status IN ('active', 'inactive')` | Allowed values (auto-quotes unquoted strings) | | `NOT IN (...)` | `role NOT IN ('deleted')` | Forbidden values | ### Pattern Matching | Pattern | Example | Description | |---|---|---| | `LIKE` | `email LIKE '%@%'` | SQL LIKE pattern | | `NOT LIKE` | `name NOT LIKE 'test%'` | Inverse LIKE | | `MATCHES` | `phone MATCHES '^\+[0-9]+'` | Regular expression | | `EMAIL` | `email EMAIL` | Email format validation | | `UUID` | `id UUID` | UUID format validation | ### String Length | Pattern | Example | Description | |---|---|---| | `LENGTH BETWEEN a AND b` | `name LENGTH BETWEEN 1 AND 255` | String length range | | `LENGTH = N` | `code LENGTH = 3` | Exact string length | ### Referential & Custom | Pattern | Example | Description | |---|---|---| | `REFERENCES` | `customer_id REFERENCES customers(id)` | Foreign key check | | `CHECK` | `total CHECK (total >= 0)` | Custom SQL expression | ### Statistical | Pattern | Example | Description | |---|---|---| | `AT_LEAST_ONE` | `email AT_LEAST_ONE` | At least one non-NULL value | | `NOT_CONSTANT` | `status NOT_CONSTANT` | At least 2 distinct values | | `NULL_PERCENT` | `email NULL_PERCENT < 10` | Maximum NULL percentage | | `DISTINCT_COUNT OP N` | `status DISTINCT_COUNT >= 3` | Number of distinct values (cardinality) | | `DUPLICATE_PERCENT < N` | `email DUPLICATE_PERCENT < 5` | Maximum duplicate percentage | | `SEQUENTIAL` | `id SEQUENTIAL` | No gaps in integer sequence | | `SEQUENTIAL(N)` | `id SEQUENTIAL(5)` | Gaps allowed only by step N | | `NO_OVERLAP` | `(start_date, end_date) NO_OVERLAP` | Time intervals don't overlap | ## Full Example ```sql -- @kind: merge -- @unique_key: order_id -- @constraint: order_id PRIMARY KEY -- @constraint: total >= 0 -- @constraint: status IN ('pending', 'shipped', 'delivered', 'cancelled') -- @constraint: status DISTINCT_COUNT >= 2 -- @constraint: customer_id REFERENCES customers(id) -- @constraint: email EMAIL -- @constraint: email DUPLICATE_PERCENT < 10 -- 
@constraint: order_date NOT NULL SELECT order_id, customer_id, email, total, status, order_date FROM raw.orders ``` ## Compared to Traditional Validation Traditional approach: - Write tests separately (dbt tests, Great Expectations) - Run after data is written - Failures require cleanup OndatraSQL: - Define constraints in the model - Run before data is written - Fail fast — no bad data enters the table --- ### Audits > Detect regressions — rolled back automatically if they fail URL: https://ondatra.sh/validation/audits/ Audits detect regressions after data is written. If an audit fails, the transaction is rolled back and the model fails. Constraints stop bad rows. Audits catch bad results. ## Why This Matters Some data issues can't be detected row by row. - Row counts suddenly drop - Distributions drift - Freshness breaks - Aggregates don't match source systems These are not row-level problems — they are dataset-level problems. That's what audits are for. ## When Audits Run Audits run inside the same transaction as the data write. This means: - Either everything succeeds - Or nothing is committed If any audit fails: - The transaction is rolled back - No data is committed - Downstream models do not run ## Usage Add audits directly in your model: ```sql -- @audit: PATTERN ``` You can define multiple audits per model. ```sql -- @audit: row_count > 0 -- @audit: row_count_change < 20% -- @audit: freshness(updated_at, 24h) -- @audit: reconcile_sum(total, raw.orders.total) -- @audit: distribution(status) STABLE(0.1) ``` If any of these fail, the entire model is rolled back. 
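Dataset-level checks like `row_count_change` compare the new result against the previous run. As a rough illustration (plain Python, not OndatraSQL internals; the function name and the zero-baseline behavior are assumptions), the check amounts to:

```python
# Illustrative arithmetic behind a `row_count_change < 20%` audit.
# The function name and the zero-baseline behavior are assumptions,
# not OndatraSQL internals.
def row_count_change_ok(prev_count, new_count, max_change_pct):
    if prev_count == 0:
        return True  # first run: no baseline to compare against (assumed)
    change_pct = abs(new_count - prev_count) / prev_count * 100
    return change_pct < max_change_pct

print(row_count_change_ok(1000, 1100, 20))  # True  (10% change)
print(row_count_change_ok(1000, 400, 20))   # False (60% drop would trigger rollback)
```

Because the check runs on the whole result set, a single bad batch fails the entire write, not individual rows.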
## Constraints vs Audits **Constraints:** - Run before insert - Validate individual rows - Prevent bad data from entering **Audits:** - Run after insert - Validate the dataset as a whole - Catch regressions and anomalies ## Common Patterns | Pattern | Example | What it catches | |---|---|---| | `row_count > 0` | `row_count > 0` | Empty results | | `freshness(col, duration)` | `freshness(updated_at, 24h)` | Stale data | | `row_count_change < N%` | `row_count_change < 20%` | Sudden drops or spikes | | `min(col) OP N` | `min(price) >= 0` | Out-of-range values | | `max(col) OP N` | `max(price) <= 10000` | Out-of-range values | | `reconcile_sum(col, other.col)` | `reconcile_sum(total, raw.orders.total)` | Source/target mismatch | | `distribution(col) STABLE(N)` | `distribution(status) STABLE(0.1)` | Category drift | ## All 17 Patterns ### Row Count | Pattern | Example | Description | |---|---|---| | `row_count OP N` | `row_count >= 100` | Check total row count (supports `>=`, `<=`, `>`, `<`, `=`) | | `row_count_change < N%` | `row_count_change < 10%` | Row count change from previous run | ### Freshness | Pattern | Example | Description | |---|---|---| | `freshness(col, duration)` | `freshness(updated_at, 24h)` | Maximum age of most recent record (supports `h` for hours, `d` for days) | ### Statistical | Pattern | Example | Description | |---|---|---| | `mean(col) ...` | `mean(price) BETWEEN 10 AND 100` | Average value check (supports BETWEEN and comparisons) | | `stddev(col) < N` | `stddev(amount) < 25` | Standard deviation threshold | | `min(col) OP N` | `min(price) >= 0` | Minimum value check | | `max(col) OP N` | `max(price) <= 10000` | Maximum value check | | `sum(col) OP N` | `sum(amount) >= 0` | Aggregate sum check | | `zscore(col) < N` | `zscore(amount) < 3.0` | Statistical outlier detection | | `percentile(col, p) OP N` | `percentile(latency, 0.95) <= 500` | Quantile validation | ### Reconciliation | Pattern | Example | Description | |---|---|---| | 
`reconcile_count(table)` | `reconcile_count(raw.orders)` | Row counts must match | | `reconcile_sum(col, other.col)` | `reconcile_sum(total, raw.orders.total)` | Aggregate sums must match | ### Schema | Pattern | Example | Description | |---|---|---| | `column_exists(col)` | `column_exists(email)` | Verify column exists in output | | `column_type(col, type)` | `column_type(price, DECIMAL(18,2))` | Verify column type (supports precision) | ### Data Comparison | Pattern | Example | Description | |---|---|---| | `golden('path')` | `golden('tests/expected.csv')` | Compare output with golden dataset | | `distribution(col) STABLE(threshold)` | `distribution(status) STABLE(0.1)` | Categorical distribution stability | ## Full Example ```sql -- @kind: append -- @incremental: event_time -- @audit: row_count > 0 -- @audit: freshness(event_time, 24h) -- @audit: row_count_change < 50% -- @audit: min(amount) >= 0 -- @audit: zscore(amount) < 3.0 -- @audit: reconcile_sum(amount, raw.events.amount) SELECT event_id, event_time, amount, category FROM raw.events ``` ## Compared to Traditional Validation Traditional approach: - Run tests after data is written - Failures require manual cleanup - Data may already be used downstream OndatraSQL: - Audits run inside the transaction - Failures automatically roll back - Bad data never persists --- ### Warnings > Non-blocking validation — observe before you enforce URL: https://ondatra.sh/validation/warnings/ Warnings surface issues without stopping the pipeline. They let you observe data quality before enforcing it. Use warnings when you're not ready to fail — yet. ## Why Warnings Exist Not all data issues should block your pipeline. - You're introducing a new check - You're unsure about thresholds - You're monitoring a known issue - You want visibility before enforcement Warnings let you see problems without breaking production. 
## Constraints vs Audits vs Warnings **Constraints:** - Fail before insert - Block bad rows **Audits:** - Fail after insert - Roll back bad results **Warnings:** - Do not fail - Log issues for visibility ## How Warnings Run Warnings run after data is written. They evaluate the same patterns as constraints and audits, but: - They do not fail the model - They do not trigger rollback - They are logged in execution output Warnings are not ignored — they are signals. They show up in logs, CLI output, and metadata, and can be tracked over time. ## From Warning to Enforcement A common workflow: 1. Start with a warning 2. Observe behavior over time 3. Tune thresholds 4. Promote to constraint or audit Validation evolves with your data — no rewrites needed. ## Usage ```sql -- @warning: PATTERN ``` Warnings support the same patterns as constraints and audits. You can use: - Row-level checks (`NOT NULL`, `UNIQUE`, `IN`, etc.) - Dataset checks (`row_count_change`, `freshness`, `distribution`, etc.) ### Dataset checks ```sql -- @warning: row_count_change < 50% -- @warning: mean(amount) BETWEEN 0 AND 1000 -- @warning: freshness(updated_at, 48h) -- @warning: distribution(status) STABLE(0.2) -- @warning: min(price) >= 0 ``` ### Row-level checks ```sql -- @warning: email NOT NULL -- @warning: status IN ('active', 'inactive', 'pending') -- @warning: price >= 0 -- @warning: sku UNIQUE -- @warning: email MATCHES '^[^@]+@[^@]+$' ``` ## Example ```sql -- @kind: merge -- @unique_key: product_id -- @constraint: product_id PRIMARY KEY -- @audit: row_count > 0 -- @warning: row_count_change < 50% -- @warning: mean(price) BETWEEN 1 AND 10000 -- @warning: description NOT NULL SELECT product_id, name, price, description FROM raw.products ``` In this example: - **Constraints** enforce correctness — primary key must be valid - **Audits** enforce expectations — row count must exist - **Warnings** monitor anomalies — without failing the run Large row count swings, unusual price averages, and missing 
descriptions generate log warnings but do not block execution. --- ## Scripting > Built-in scripting for ingestion — no Python required URL: https://ondatra.sh/scripting/ OndatraSQL uses Starlark for data ingestion and custom logic. No Python. No dependencies. No runtime setup. Write scripts that call APIs, handle pagination and auth, and emit rows directly into your pipeline. All in the same system as your SQL models. ## Mental Model Starlark models produce rows. SQL models transform them. Both are part of the same DAG. ## How It Works ```python # @kind: append # @incremental: updated_at resp = http.get("https://api.example.com/data") for item in resp.json: save.row({ "id": item["id"], "name": item["name"], "updated_at": item["updated_at"], }) ``` What happens: 1. Script fetches data 2. `save.row()` emits rows 3. OndatraSQL materializes the result into DuckLake Same directives, same DAG, same validation as SQL models. ## Why Starlark - No Python environment to manage - No dependency conflicts - Deterministic execution - Runs the same everywhere ## Built-in Modules Everything you need is built in — no imports required. No SDKs, no client libraries, no setup. | Module | What it does | |---|---| | [http](/scripting/http/) | API requests with retry, digest auth, mTLS | | [oauth](/scripting/oauth/) | OAuth 2.0 flows with auto-refresh | | [save](/scripting/save/) | Emit rows to the pipeline | | [query](/scripting/modules/#query) | Read DuckDB tables | | [incremental](/scripting/modules/#incremental) | Cursor state for incremental loads | | [env](/scripting/modules/#env) | Access environment variables | | [xml, csv](/scripting/modules/#data-formats) | Parse and encode data formats | | [url, crypto](/scripting/modules/) | URL building, hashing, signing | | [time, math, json, re](/scripting/modules/#standard-library) | Standard library | See [Language Reference](/scripting/language/) for Starlark syntax. ## Shared Libraries Reuse logic across models. 
Write once, use everywhere. ```python load("lib/helpers.star", "paginate") for page in paginate("https://api.example.com/users"): for user in page: save.row(user) ``` ### How load() Works - Resolved relative to project root - Executed once per run, cached across all `load()` calls - Nested loads supported (A loads B loads C) - Import cycles detected - Path traversal blocked Library modules have access to all built-ins except `save` (must be passed as a parameter to functions that write data). ## YAML Models Configure ingestion without writing code. Use when the logic already exists in `lib/`. ```yaml kind: append incremental: report_date source: gam_report config: network_code: ${GAM_NETWORK_CODE} dimensions: - AD_UNIT_NAME - DATE ``` ```python # lib/gam_report.star def gam_report(save, network_code="", dimensions=None): start = incremental.initial_value if incremental.is_backfill else incremental.last_value # ... fetch and paginate API ... for row in results: save.row(row) ``` See [Models](/concepts/models/#yaml-models) for the full YAML reference. ## Example: Paginated API ```python # @kind: append page = 1 while True: resp = http.get("https://api.example.com/users?page=" + str(page)) if len(resp.json) == 0: break for user in resp.json: save.row(user) page += 1 sleep(0.1) # rate limit ``` ## Global Functions ```python abort() # clean exit — 0 rows, no error fail("something went wrong") # stop with error sleep(1.5) # rate limiting print("debug info") # stderr (secrets auto-redacted) getvariable("currency") # read DuckDB session variable ``` Useful for retries, rate limits, and conditional execution. ## Secret Redaction Secrets are automatically removed from logs. No configuration required. ``` Bearer eyJhbG... → Bearer [REDACTED] token=abc123 → token=[REDACTED] ``` Safe by default — no accidental leaks in logs. --- ### HTTP Module > Built-in HTTP client for API ingestion URL: https://ondatra.sh/scripting/http/ Fetch data from APIs directly inside your pipeline. 
No Python. No requests. No separate ingestion tool. The `http` module is built into OndatraSQL and designed for production data ingestion. It is a predeclared global — no import needed. ## Why This Exists Most data stacks require a separate ingestion layer: - Python scripts - Airbyte connectors - Custom services With OndatraSQL, ingestion is part of the pipeline. You fetch data, transform it, and materialize it — in one runtime. ## Mental Model - `http.get()` fetches data - `save.row()` emits rows - OndatraSQL handles storage, validation, and materialization You don't write scripts around APIs — you write models. ## Production-Ready by Default - Automatic retries with exponential backoff - Rate limit handling (`Retry-After`) - OAuth, Basic, and Digest authentication - mTLS support (client certificates) - Built-in timeout and error handling ## HTTP Methods All methods follow the same pattern: ```python http.get(url, ...) http.post(url, ...) http.put(url, ...) http.patch(url, ...) http.delete(url, ...) ``` ### GET ```python resp = http.get(url) resp = http.get(url, headers={"Authorization": "Bearer " + token}) resp = http.get(url, params={"page": "1", "limit": "50"}) resp = http.get(url, headers=headers, timeout=60, retry=3) ``` ### POST ```python # JSON body (sets Content-Type: application/json automatically) resp = http.post(url, json={"key": "value"}) # Form data (sets Content-Type: application/x-www-form-urlencoded automatically) resp = http.post(url, data={"key": "value"}) # With headers and options resp = http.post(url, json=payload, headers=headers, timeout=60) ``` ### PUT / PATCH / DELETE ```python resp = http.put(url, json=payload, headers=headers) resp = http.patch(url, json=patch) resp = http.delete(url, headers=headers) ``` `json=` and `data=` are mutually exclusive — use one or the other. 
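The difference matters on the wire. Here is a plain-Python sketch of the two body encodings the client produces for you (stdlib only, purely illustrative):

```python
import json
from urllib.parse import urlencode

payload = {"key": "value", "n": 1}

# json=payload sends a JSON document with Content-Type: application/json
json_body = json.dumps(payload)

# data=payload sends a form body with Content-Type: application/x-www-form-urlencoded
form_body = urlencode(payload)

print(json_body)  # {"key": "value", "n": 1}
print(form_body)  # key=value&n=1
```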
## Response Object | Field | Type | Description | |---|---|---| | `status_code` | int | HTTP status code | | `text` | string | Raw response body | | `ok` | bool | `True` if status code is 200–299 | | `json` | any | Parsed JSON, or `None` if the response is not JSON | | `headers` | dict | Response headers | ## Options | Kwarg | Default | Description | |---|---|---| | `headers` | `None` | Request headers dict | | `params` | `None` | Query parameters dict (appended to URL) | | `json` | `None` | JSON body (dict, list, or string) | | `data` | `None` | Form-encoded body (dict) | | `timeout` | `30` | Request timeout in seconds | | `retry` | `0` | Number of retry attempts | | `backoff` | `1` | Initial backoff between retries in seconds | | `auth` | `None` | Authentication tuple — see [Authentication](#authentication) | | `cert` | `None` | Path to client certificate file (PEM) — see [mTLS](#mtls) | | `key` | `None` | Path to client private key file (PEM) | | `ca` | `None` | Path to custom CA certificate file (PEM) | ## Retry and Backoff APIs fail. OndatraSQL retries automatically. - Retries on 429 and 5xx errors - Exponential backoff with jitter - Respects `Retry-After` headers When the server sends a `Retry-After` header (on 429 or 503), OndatraSQL respects it — both integer seconds and HTTP-date (RFC 1123) formats are supported. The `Retry-After` value overrides the calculated backoff for that retry. ```python # Retry up to 3 times with 2s initial backoff (2s, 4s, 8s + jitter) resp = http.get(url, retry=3, backoff=2) ``` ## Authentication {#authentication} ### Basic Auth ```python # Basic auth (default scheme) resp = http.get(url, auth=("username", "password")) resp = http.get(url, auth=("username", "password", "basic")) ``` ### Digest Auth [HTTP Digest Authentication](https://datatracker.ietf.org/doc/html/rfc7616) (RFC 7616) is a challenge-response scheme. The client first receives a 401 with a nonce from the server, then re-sends the request with a computed hash. 
OndatraSQL handles this automatically. ```python # Digest auth — the challenge-response is handled transparently resp = http.get(url, auth=("username", "password", "digest")) ``` Supports MD5 and SHA-256 algorithms with `qop=auth`. When using digest auth, at least one retry is required for the challenge-response flow. OndatraSQL automatically sets `retry=1` if it's not already >= 1. ## mTLS (Mutual TLS) {#mtls} For APIs that require client certificate authentication (PSD2/Open Banking, government APIs, enterprise service-to-service), use the `cert` and `key` kwargs. ```python # Client certificate + private key resp = http.get(url, cert="/path/to/client.crt", key="/path/to/client.key") # With custom CA (for self-signed server certificates) resp = http.get(url, cert="/path/to/client.crt", key="/path/to/client.key", ca="/path/to/ca.crt", ) # Combine with other kwargs resp = http.post(url, json=payload, cert="client.crt", key="client.key", timeout=30, ) ``` - `cert` and `key` must be provided together - `ca` is optional — use it when the server uses a self-signed or private CA certificate - All paths are PEM-encoded files ## Example: Paginated API Typical ingestion loop: ```python # @kind: append token = oauth.token( token_url="https://auth.example.com/token", client_id=env.get("CLIENT_ID"), client_secret=env.get("CLIENT_SECRET"), ) page = 1 while True: resp = http.get( "https://api.example.com/users", headers={"Authorization": "Bearer " + token.access_token}, params={"page": str(page)}, retry=3, backoff=2, ) if not resp.ok: fail("API error: " + str(resp.status_code)) if len(resp.json["data"]) == 0: break for user in resp.json["data"]: save.row({ "id": user["id"], "name": user["name"], "email": user["email"], }) page = page + 1 sleep(0.1) # Rate limiting ``` ## Example: HMAC Request Signing {#hmac-signing} Some APIs (AWS, Stripe webhooks, payment providers) require HMAC-signed requests. Use the [crypto module](/scripting/modules/#crypto) to compute signatures. 
```python # @kind: append payload = json.encode({"event": "payment", "amount": 100}) timestamp = str(int(time.now().unix)) # Sign the request signature = crypto.hmac_sha256( env.get("API_SECRET"), timestamp + "." + payload, ) resp = http.post("https://api.example.com/webhook", json=json.decode(payload), headers={ "X-Signature": signature, "X-Timestamp": timestamp, }, ) ``` ## Integrated with the Pipeline Data fetched via http is: - Buffered durably - Validated via constraints and audits - Versioned in DuckLake - Tracked in lineage No separate ingestion layer required. ## Compared to Traditional Ingestion Traditional approach: - Python scripts with requests - Airbyte connectors - Separate orchestration OndatraSQL: - HTTP client built into the runtime - Runs inside your pipeline - No extra services or languages --- ### Save Module > Emit rows that become the model output URL: https://ondatra.sh/scripting/save/ Turn script output into a table. Every row you emit becomes part of the model's result. `save` is how scripts produce data in OndatraSQL. It is a predeclared global — no import needed. ## Why This Exists In SQL models, your `SELECT` defines the output. In scripts, you generate rows manually. `save` bridges that gap — it lets you emit rows that OndatraSQL materializes. ## Mental Model - `http.get()` fetches data - You transform it in Starlark - `save.row()` emits rows - OndatraSQL materializes the result Scripts behave like `SELECT` statements — but with full control. Rows are collected incrementally during execution. You don't need to build the full dataset in memory — just emit rows as you process them. 
## Methods Emit rows one by one or in batches: ### row Emit a single row: ```python save.row({ "id": 1, "name": "Alice", "email": "alice@example.com", }) ``` ### rows Emit multiple rows: ```python save.rows([ {"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}, ]) ``` ### count Get the current number of collected rows: ```python n = save.count() ``` ## Column Types Column types are inferred automatically from values: | Starlark Type | DuckDB Type | |---|---| | `int` | `BIGINT` | | `float` | `DOUBLE` | | `string` | `VARCHAR` | | `bool` | `BOOLEAN` | ## Example ```python # @kind: merge # @unique_key: product_id resp = http.get("https://api.example.com/products") for product in resp.json: save.row({ "product_id": product["id"], "name": product["name"], "price": product["price"], "in_stock": product["stock"] > 0, "updated_at": product["updated_at"], }) print("Saved", save.count(), "products") ``` Each `save.row()` call adds one row to the output table. At the end of the script, OndatraSQL materializes the collected rows. ## Integrated with the Pipeline Rows collected via `save` are: - Validated with constraints and audits - Written using the model's `@kind` (append, merge, etc.) - Tracked in lineage and metadata - Versioned in DuckLake You don't handle storage — only data. ## Compared to Traditional Ingestion Traditional approach: - Fetch data in Python - Store in memory or files - Load into a database separately OndatraSQL: - Fetch, transform, emit rows - One step, one runtime --- ### OAuth Module > Built-in OAuth for API authentication URL: https://ondatra.sh/scripting/oauth/ Handle OAuth automatically in your pipeline. No token storage. No refresh logic. No glue code. The `oauth` module manages tokens for you — including automatic refresh. It is a predeclared global — no import needed. 
## Why This Exists Most API ingestion requires manual token handling: - Store tokens - Track expiration - Refresh before expiry - Retry failed requests With OndatraSQL, this is built in. You request a token. The runtime keeps it valid. ## Mental Model - `oauth.token()` creates a managed token - `token.access_token` always returns a valid token - Refresh happens automatically when needed You don't manage authentication — you use it. ## Automatic Token Refresh Tokens are refreshed automatically before they expire. - No manual refresh logic - No expiry tracking - No retry handling required Accessing `token.access_token` always gives you a valid token — even for long-running scripts. ## oauth.token Returns a managed token that stays valid automatically. ### Client Credentials ```python token = oauth.token( token_url="https://auth.example.com/oauth/token", client_id=env.get("CLIENT_ID"), client_secret=env.get("CLIENT_SECRET"), scope="read write", ) resp = http.get(url, headers={"Authorization": "Bearer " + token.access_token}) ``` | Argument | Required | Description | |---|---|---| | `token_url` | yes | OAuth token endpoint | | `client_id` | yes | Client ID | | `client_secret` | yes | Client secret | | `scope` | no | Requested scope | ### Google Service Account Use service accounts for Google APIs (BigQuery, Ad Manager, etc.). ```python token = oauth.token( google_service_account=env.get("GOOGLE_KEY_JSON"), scope="https://www.googleapis.com/auth/bigquery.readonly", ) # Or from file: token = oauth.token( google_key_file="service-account.json", scope="https://www.googleapis.com/auth/admanager", ) ``` | Argument | Required | Description | |---|---|---| | `google_service_account` | no | Inline JSON key string | | `google_key_file` | no | Path to JSON key file | | `scope` | no | OAuth scope | One of `google_service_account` or `google_key_file` is required for the Google flow. 
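Under the hood, the client-credentials flow is a single form-encoded POST to the token endpoint, as defined by OAuth 2.0 (RFC 6749, section 4.4). A plain-Python sketch of one common shape of the request body that `oauth.token()` builds and sends for you (credential values are placeholders):

```python
from urllib.parse import urlencode

# The token request body for the client-credentials grant (RFC 6749, section 4.4).
# oauth.token() builds, sends, and refreshes this for you; values are placeholders.
# Some providers expect the client credentials in a Basic auth header instead.
body = urlencode({
    "grant_type": "client_credentials",
    "client_id": "my-client-id",
    "client_secret": "my-client-secret",
    "scope": "read write",
})
print(body)
# grant_type=client_credentials&client_id=my-client-id&client_secret=my-client-secret&scope=read+write
```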
## Works with the HTTP Module Use the token directly in requests: ```python resp = http.get(url, headers={ "Authorization": "Bearer " + token.access_token, }) ``` Authentication, retries, and data ingestion work together seamlessly. ## Full Example ```python # @kind: append token = oauth.token( token_url="https://auth.example.com/oauth/token", client_id=env.get("CLIENT_ID"), client_secret=env.get("CLIENT_SECRET"), scope="read:users", ) page = 1 while True: resp = http.get( "https://api.example.com/users", headers={"Authorization": "Bearer " + token.access_token}, params={"page": str(page)}, ) users = resp.json["data"] if not users: break for user in users: save.row({"id": user["id"], "name": user["name"]}) page += 1 ``` ## oauth.basic_auth HTTP Basic Authentication header: ```python header = oauth.basic_auth(env.get("USERNAME"), env.get("PASSWORD")) # "Basic base64encoded..." ``` ## Compared to Traditional OAuth Traditional approach: - Use OAuth libraries - Store tokens in files or databases - Implement refresh logic - Handle expiration errors OndatraSQL: - Request a token once - Use `token.access_token` - Refresh handled automatically --- ### Runtime Modules > A built-in runtime for writing data pipelines URL: https://ondatra.sh/scripting/modules/ OndatraSQL includes a built-in runtime for writing data pipelines. No imports. No dependencies. Everything you need is available by default. All modules are predeclared globals, available in both model scripts and shared library modules loaded via `load()`. ## Mental Model Scripts in OndatraSQL are not standalone programs. They run inside a data pipeline runtime: - `query()` reads from DuckDB - `http` fetches external data - `save.row()` writes output - `incremental` provides state You're not wiring systems together — you're writing the pipeline itself. ## Core Runtime ### query() — Read from Your Pipeline {#query} Use `query()` to read from any table in your pipeline. Dependencies are detected automatically. 
```python rows = query("SELECT id, name FROM staging.customers WHERE active = true") for row in rows: print(row["id"], row["name"]) ``` Returns a `list` of `dict`, where every value is a `string`. An empty result set returns an empty list. ```python # CTE queries work too rows = query(""" WITH recent AS ( SELECT * FROM raw.orders WHERE created_at > '2026-01-01' ) SELECT customer_id, COUNT(*) AS cnt FROM recent GROUP BY 1 """) ``` This enables: - **Reverse ETL** — read from DuckDB, push to external API - **Enrichment workflows** — join API data with existing tables - **Feedback loops** — read, send, write back Without leaving the pipeline. **Reverse ETL:** ```python rows = query("SELECT * FROM mart.customers WHERE synced = false") for row in rows: http.post("https://api.hubspot.com/contacts", json=row) save.row({"id": row["id"], "synced_at": str(time.now())}) ``` **Cross-source enrichment:** ```python existing = query("SELECT id, email FROM staging.users") for user in existing: profile = http.get("https://api.example.com/profiles/" + user["id"]) save.row({"id": user["id"], "email": user["email"], "bio": profile.json.get("bio", "")}) ``` Table references in `query()` string literals are automatically extracted for DAG ordering. If your script queries `staging.customers`, OndatraSQL ensures that model runs before yours. {{< callout type="warning" >}} Only string literals are analyzed. Dynamic SQL (`query("SELECT * FROM " + table)`) won't be detected — the dependency must be declared manually or restructured as a literal string. {{< /callout >}} `query()` rejects mutating statements — `INSERT`, `UPDATE`, `DELETE`, `DROP`, `CREATE`, etc. are blocked before execution. ### incremental — Pipeline State {#incremental} Access the state of previous runs. Used for cursor-based ingestion from APIs. 
| Field | Type | Description | |---|---|---| | `is_backfill` | bool | `True` if target table doesn't exist | | `cursor` | string | Cursor column name | | `last_value` | string | MAX(cursor) from target | | `last_run` | string | Timestamp of last commit | | `initial_value` | string | Starting cursor value | The `incremental` module is read-only state — available directly from library functions without being passed as a parameter. See [Incremental Models](/concepts/incremental/) for details. ```python # @kind: append # @incremental: updated_at url_str = "https://api.example.com/orders" if not incremental.is_backfill: url_str = url_str + "?since=" + incremental.last_value resp = http.get(url_str) for order in resp.json: save.row(order) ``` ## External Access ### env Access environment variables (from `.env` or shell). ```python value = env.get("API_KEY") value = env.get("API_KEY", default="fallback") env.set("TEMP_VAR", "some_value") ``` | Method | Description | |---|---| | `get(name, default?)` | Get variable value with optional fallback | | `set(name, value)` | Set variable for current session | ### url Build and parse URLs. ```python full = url.build("https://api.com/data", params={"page": "1", "limit": "50"}) # "https://api.com/data?limit=50&page=1" encoded = url.encode("hello world") # "hello+world" params = url.encode_params({"foo": "bar", "baz": "qux"}) # "baz=qux&foo=bar" parsed = url.parse("https://api.com/data?page=1") # parsed.scheme == "https", parsed.host == "api.com", parsed.path == "/data" ``` ### crypto Cryptographic functions for hashing, encoding, and signing. 
```python # Base64 encoded = crypto.base64_encode("hello") decoded = crypto.base64_decode(encoded) # Hashing sha = crypto.sha256("data") md = crypto.md5("data") # HMAC signing sig = crypto.hmac_sha256("secret_key", "message") # UUID generation id = crypto.uuid() # Random string (alphanumeric, default length 32) s = crypto.random_string() s = crypto.random_string(length=16) ``` #### HMAC Request Signing {#hmac-signing} Use `crypto.hmac_sha256` to sign API requests. This is the same mechanism used by AWS Signature v4, Stripe webhook verification, and similar schemes. ```python # Sign a request payload secret = env.get("API_SECRET") timestamp = str(int(time.now().unix)) body = json.encode({"event": "payment", "amount": 100}) signature = crypto.hmac_sha256(secret, timestamp + "." + body) resp = http.post("https://api.example.com/events", json=json.decode(body), headers={ "X-Signature": signature, "X-Timestamp": timestamp, }, ) ``` ## Data Handling {#data-formats} ### xml Parse and encode XML data. ```python data = xml.decode('<user><name>Alice</name><age>30</age></user>') # {"user": {"name": "Alice", "age": "30"}} output = xml.encode({"user": {"name": "Alice", "age": "30"}}) ``` XML attributes are prefixed with `@`: ```python data = xml.decode('<item id="42"><name>Widget</name></item>') # {"item": {"@id": "42", "name": "Widget"}} ``` | Method | Description | |---|---| | `decode(string)` | Parse XML string to dict | | `encode(dict)` | Convert dict to XML string | ### csv Parse and encode CSV data. 
```python # With header row (default) — returns list of dicts rows = csv.decode("name,age\nAlice,30\nBob,25") # [{"name": "Alice", "age": "30"}, {"name": "Bob", "age": "25"}] # Without header — returns list of lists rows = csv.decode("Alice,30\nBob,25", header=False) # Tab-separated rows = csv.decode(data, delimiter="\t") # Encode list of dicts to CSV output = csv.encode([{"name": "Alice", "age": "30"}]) # Encode list of lists with explicit header output = csv.encode([["Alice", "30"]], header=["name", "age"]) ``` | Method | Description | |---|---| | `decode(data, delimiter?, header?)` | Parse CSV string. `delimiter` defaults to `","`, `header` defaults to `True` | | `encode(rows, header?)` | Encode rows to CSV string. `header` is optional list of column names | ### json JSON encoding and decoding. ```python s = json.encode({"name": "Alice", "age": 30}) data = json.decode('{"name": "Alice", "age": 30}') ``` ## Standard Library {#standard-library} ### time Date and time operations. See the [full API reference](https://pkg.go.dev/go.starlark.net/lib/time). ```python now = time.now() t = time.parse_time("2024-03-15T10:30:00Z") t = time.time(year=2024, month=3, day=15) t.year, t.month, t.day, t.hour, t.minute, t.second t.unix # Unix timestamp (seconds) t.format("2006-01-02") # Go layout tomorrow = t + time.parse_duration("24h") yesterday = t - time.parse_duration("24h") ``` ### math Mathematical functions. See the [full API reference](https://pkg.go.dev/go.starlark.net/lib/math). ```python math.ceil(1.5) # 2 math.floor(1.5) # 1 math.round(1.5) # 2 math.sqrt(16) # 4.0 math.abs(-5) # 5 math.pow(2, 10) # 1024.0 math.pi # 3.141592653589793 math.e # 2.718281828459045 ``` ### re Regular expressions (Python `re`-compatible). See the [full API reference](https://github.com/magnetde/starlark-re). 
```python m = re.search(r"(\d+)-(\d+)", "order-123-456") m.group(1) # "123" re.findall(r"\d+", "a1b2c3") # ["1", "2", "3"] re.split(r"\s+", "hello world") # ["hello", "world"] re.sub(r"\d+", "X", "a1b2c3") # "aXbXcX" ``` ## Putting It Together A typical ingestion model: 1. Authenticate (`oauth`) 2. Read state (`incremental`) 3. Fetch data (`http`) 4. Transform (Starlark) 5. Write output (`save`) All inside one runtime. ## What You Don't Need - No Python environment - No requests library - No OAuth libraries - No state storage - No SDKs Everything is built in. --- ### Language Reference > A simpler alternative to Python for pipelines URL: https://ondatra.sh/scripting/language/ OndatraSQL scripts use [Starlark](https://github.com/google/starlark-go) — a Python-like configuration language designed for deterministic, sandboxed execution. No packages. No virtual environments. No runtime setup. If you know Python, you can write pipelines immediately. ## Why Starlark? Starlark is a restricted, deterministic dialect of Python. - No hidden state - No external dependencies - No runtime environment This makes pipelines reproducible and easy to reason about. ## Differences from Python Starlark is intentionally simpler: | Python | Starlark | | --- | --- | | `import module` | `load("lib/module.star", "func")` | | `try / except` | Not available — errors stop the script | | `class Foo:` | Not available — use dicts and functions | | `with open(f):` | Not available — no file I/O | | `for c in "hello":` | `for c in "hello".elems():` | | `2 ** 10` | `math.pow(2, 10)` | | `yield / async` | Not available | | `global / nonlocal` | Not available | | `x is y` | Use `==` | This keeps pipelines predictable and portable. 
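The "no classes" row is the one that changes code structure most. Small stateful helpers are written as dicts plus functions instead; this sketch is valid in both Starlark and Python:

```python
# A small stateful helper without classes: a closure over a mutable dict.
# Valid in both Starlark and Python.
def new_counter():
    state = {"n": 0}

    def incr():
        state["n"] += 1
        return state["n"]

    return incr

count = new_counter()
print(count())  # 1
print(count())  # 2
```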
## In OndatraSQL Starlark is used for: - Data ingestion (HTTP APIs, files) - Custom logic between models - Reverse ETL workflows It runs inside the OndatraSQL runtime — with access to: - `http` (API calls) - `query()` (read from DuckDB) - `save` (write output) - `incremental` (pipeline state) ## Example: A Simple Pipeline Script ```python # @kind: append resp = http.get("https://api.example.com/users") for user in resp.json: save.row({ "id": user["id"], "name": user["name"], }) ``` This fetches data from an API, transforms it, and writes it into your pipeline. All in one script. ## Basics Everything works like Python. ```python # Numbers count = 42 rate = 0.95 # Strings name = "Alice" query = 'SELECT * FROM users' # Booleans active = True deleted = False # None result = None # Lists (mutable) items = [1, 2, 3] items.append(4) # Tuples (immutable) pair = (1, 2) # Dicts (mutable, ordered) user = {"name": "Alice", "age": 30} # Sets (mutable) tags = set(["a", "b", "c"]) ``` ## Control Flow ```python # If / elif / else if resp.ok: process(resp.json) elif resp.status_code == 429: sleep(1) else: fail("Error: " + str(resp.status_code)) # For loops for item in items: save.row(item) for i, item in enumerate(items): print(i, item) for i in range(10): print(i) # While loops page = 1 while True: resp = http.get(url + "?page=" + str(page)) if len(resp.json) == 0: break for item in resp.json: save.row(item) page += 1 ``` ## Functions and Lambdas ```python def fetch_page(url, page=1, limit=50): return http.get( url + "?page=" + str(page) + "&limit=" + str(limit), ) # Lambda expressions double = lambda x: x * 2 sorted(users, key=lambda u: u["name"]) ``` ## load() — Shared Libraries Import functions and values from other Starlark files using `load()`: ```python load("lib/pagination.star", "paginate", "DEFAULT_PAGE_SIZE") load("lib/auth.star", "get_token") token = get_token() for page in paginate(url, page_size=DEFAULT_PAGE_SIZE): process(page) ``` ### Syntax ```python 
load("path/to/module.star", "name1", "name2") # import by name load("path/to/module.star", alias="original_name") # import with alias ``` Paths are relative to the project root. Only files inside the project directory can be loaded — path traversal and symlinks pointing outside the project are rejected. ### Caching Each module is executed once per model run. Multiple `load()` calls — including from different nested modules — share the cached result. ### Library Scope Loaded modules have access to all built-in modules (`http`, `oauth`, `json`, etc.) except `save` and `incremental`. Functions that need `save` should accept it as a parameter: ```python # lib/fetcher.star def fetch_users(save, url): resp = http.get(url) for user in resp.json: save.row(user) ``` This is the pattern used by [YAML models](/concepts/models/#yaml-models), where `save` is passed automatically. ## String Formatting ```python # %-operator msg = "Fetched %d rows from %s" % (count, source) price = "Total: $%.2f" % amount # .format() method msg = "Page {} of {}".format(page, total) msg = "Page {page} of {total}".format(page=1, total=10) ``` Format specifiers: `%s` (string), `%d` (integer), `%f` (float), `%x` (hex), `%o` (octal). 
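The specifiers behave as they do in Python; a quick illustration:

```python
# %-operator with the specifiers listed above
msg = "Fetched %d rows from %s" % (42, "api")   # "Fetched 42 rows from api"
hex_id = "%x" % 255                             # "ff"
price = "Total: $%.2f" % 19.5                   # "Total: $19.50"

# .format() with positional placeholders
page = "Page {} of {}".format(2, 10)            # "Page 2 of 10"
```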
## Comprehensions ```python # List comprehension ids = [item["id"] for item in resp.json] active = [u for u in users if u["status"] == "active"] # Dict comprehension lookup = {row.id: row.name for row in query("SELECT id, name FROM t")} # Nested flat = [cell for row in matrix for cell in row] ``` ## Built-in Functions | Function | Description | | --- | --- | | `abs(x)` | Absolute value | | `all(x)` | `True` if all elements are truthy | | `any(x)` | `True` if any element is truthy | | `bool(x)` | Convert to boolean | | `chr(i)` | Unicode code point to string | | `dict(pairs)` | Create dict | | `dir(x)` | List attributes/methods | | `enumerate(x)` | Yields `(index, value)` pairs | | `float(x)` | Convert to float | | `getattr(x, name)` | Get attribute by name | | `hasattr(x, name)` | Check if attribute exists | | `hash(x)` | Hash a string | | `int(x)` | Convert to int | | `len(x)` | Number of elements | | `list(x)` | Create list from iterable | | `max(x)` | Largest value | | `min(x)` | Smallest value | | `ord(s)` | String to Unicode code point | | `print(*args)` | Print to stderr | | `range(n)` | Sequence of integers | | `repr(x)` | Debug representation | | `reversed(x)` | Reversed sequence | | `set(x)` | Create set | | `sorted(x, key?, reverse?)` | Sorted list | | `str(x)` | Convert to string | | `tuple(x)` | Create tuple | | `type(x)` | Type name as string | | `zip(a, b)` | Zip two sequences | ## String Methods | Method | Description | | --- | --- | | `s.capitalize()` | First letter uppercase | | `s.count(sub)` | Count occurrences | | `s.endswith(suffix)` | Check suffix | | `s.find(sub)` | Find index, -1 if not found | | `s.format(*args, **kwargs)` | Format string | | `s.index(sub)` | Find index, error if not found | | `s.isalnum()` | Alphanumeric only? | | `s.isalpha()` | Letters only? | | `s.isdigit()` | Digits only? | | `s.islower()` | Lowercase only? | | `s.isspace()` | Whitespace only? | | `s.istitle()` | Title case? | | `s.isupper()` | Uppercase only? 
| | `s.join(iterable)` | Join strings | | `s.lower()` | To lowercase | | `s.lstrip(chars?)` | Strip left | | `s.upper()` | To uppercase | | `s.strip(chars?)` | Strip both sides | | `s.rstrip(chars?)` | Strip right | | `s.replace(old, new)` | Replace substring | | `s.split(sep?)` | Split string | | `s.splitlines()` | Split on newlines | | `s.startswith(prefix)` | Check prefix | | `s.removeprefix(prefix)` | Remove prefix | | `s.removesuffix(suffix)` | Remove suffix | | `s.partition(sep)` | Split into 3 parts | | `s.rpartition(sep)` | Split into 3 parts from right | | `s.title()` | Title Case | | `s.elems()` | Iterate bytes | | `s.codepoints()` | Iterate unicode characters | **Note:** Strings are not directly iterable. Use `s.elems()` or `s.codepoints()` to iterate over characters. ## List Methods | Method | Description | | --- | --- | | `l.append(x)` | Add element | | `l.clear()` | Remove all elements | | `l.extend(iterable)` | Add all elements from iterable | | `l.index(x)` | Find index | | `l.insert(i, x)` | Insert at position | | `l.pop(i?)` | Remove and return element | | `l.remove(x)` | Remove first occurrence | Supports slicing: `l[1:3]`, `l[::-1]`, `l[::2]`. 
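The methods and slice forms behave as in Python:

```python
l = [1, 2, 3, 4, 5]
l.append(6)        # [1, 2, 3, 4, 5, 6]
l.insert(0, 0)     # [0, 1, 2, 3, 4, 5, 6]
last = l.pop()     # returns 6; list is now [0, 1, 2, 3, 4, 5]

head = l[1:3]      # [1, 2]
rev  = l[::-1]     # [5, 4, 3, 2, 1, 0]
even = l[::2]      # [0, 2, 4]
```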
## Dict Methods | Method | Description | | --- | --- | | `d.clear()` | Remove all entries | | `d.get(key, default?)` | Get with fallback | | `d.items()` | List of `(key, value)` tuples | | `d.keys()` | List of keys | | `d.pop(key, default?)` | Remove and return value | | `d.popitem()` | Remove and return first entry | | `d.setdefault(key, default?)` | Get or set default | | `d.update(pairs)` | Merge entries | | `d.values()` | List of values | ## Set Methods | Method | Description | | --- | --- | | `s.add(x)` | Add element | | `s.clear()` | Remove all elements | | `s.discard(x)` | Remove if present | | `s.remove(x)` | Remove, error if missing | | `s.pop()` | Remove and return an element | | `s.union(other)` | Union (`s \| other`) | | `s.intersection(other)` | Intersection (`s & other`) | | `s.difference(other)` | Difference | | `s.symmetric_difference(other)` | Symmetric difference | | `s.issubset(other)` | Subset check | | `s.issuperset(other)` | Superset check | You don't need more than this. Combined with the [built-in runtime modules](/scripting/modules/), this is enough to build complete data pipelines. --- ## Examples > Real pipelines. No infrastructure. URL: https://ondatra.sh/examples/ Real pipelines. No infrastructure. These examples show how to build complete data workflows with OndatraSQL — from ingestion to analytics. Each example is a working model you can run locally. ## Ingestion Fetch data from APIs or collect events — no connectors, no brokers. - [Ingest from a REST API](ingest-rest-api) — No Python. No connectors. No scheduler. - [YAML Model with Shared Library](yaml-model) — Reusable source functions for any API. - [Track Product Analytics](track-product-analytics) — No Kafka. No external event tools. ## Transformation Use SQL models to clean, join, and reshape your data. - [Build Daily Metrics](build-daily-metrics) — Three-layer pipeline: raw → staging → mart. 
- [Incremental SQL Model](incremental-model) — Append new data automatically with cursor tracking. ## Change Tracking Handle updates, history, and incremental processing automatically. - [SCD2 Dimension](scd2-dimension) — Track full history of dimension changes. ## Validation Ensure your data is correct before and after it's written. - [Validated Model](validated-model) — Constraints, audits, and warnings in one model. ## Try It Yourself Pick any example, copy it into a new project, and run: ```bash ondatrasql run ``` That's it. --- ### Ingest from a REST API > Fetch data from a real API — no Python, no pip, no setup URL: https://ondatra.sh/examples/ingest-rest-api/ Fetch European country data from the [REST Countries API](https://restcountries.com). No Python, no pip. ## 1. Write the Script ```python # models/raw/countries.star # @kind: table resp = http.get( "https://restcountries.com/v3.1/region/europe?fields=name,capital,population,area,region,subregion", retry=3, ) if not resp.ok: fail("API returned " + str(resp.status_code)) for country in resp.json: capital = "" if country.get("capital") and len(country["capital"]) > 0: capital = country["capital"][0] save.row({ "name": country["name"]["common"], "capital": capital, "population": country["population"], "area_km2": country.get("area", 0), "subregion": country.get("subregion", ""), }) print("Fetched " + str(save.count()) + " countries") ``` No API key needed. No environment setup. Just run it. ## 2. Run ```bash ondatrasql run raw.countries ``` ``` Fetched 53 countries [OK] raw.countries (table, backfill, 53 rows, 799ms) ``` ## 3. 
Transform Downstream ```sql -- models/staging/stg_countries.sql -- @kind: table -- @constraint: name NOT NULL -- @constraint: population >= 0 -- @audit: row_count > 0 SELECT name, capital, population, area_km2, subregion, CASE WHEN population > 50000000 THEN 'large' WHEN population > 10000000 THEN 'medium' ELSE 'small' END AS size_category FROM raw.countries WHERE population > 0 ``` ```sql -- models/mart/population_by_subregion.sql -- @kind: table -- @audit: row_count > 0 SELECT subregion, COUNT(*) AS countries, SUM(population) AS total_population, ROUND(AVG(population)) AS avg_population, ROUND(SUM(area_km2)) AS total_area_km2 FROM staging.stg_countries GROUP BY subregion ORDER BY total_population DESC ``` ## 4. Run the Full Pipeline ```bash ondatrasql run ``` ``` Running 3 models... Fetched 53 countries [OK] raw.countries (table, full, 53 rows, 820ms) [OK] staging.stg_countries (table, backfill, 53 rows, 440ms) [OK] mart.population_by_subregion (table, backfill, 6 rows, 243ms) ``` OndatraSQL detects that `stg_countries` depends on `countries` and runs them in the right order. ## What's Built In - `http.get`, `http.post` with retry, timeout, headers - `oauth.token` for OAuth 2.0 flows - `env.get` for secrets from `.env` - `incremental` for cursor-based loading - `save.row` and `save.rows` for output No external packages. No virtual environments. Everything in one binary. --- ### YAML Model with Shared Library > Reusable source function with YAML configuration — one connector, many models URL: https://ondatra.sh/examples/yaml-model/ ## Overview This example shows the three-layer pattern: a Starlark source function for connector logic, YAML models for configuration, and SQL for downstream transformations. Uses the [Open-Meteo API](https://open-meteo.com) — no API key required. 
## Source Function Create a reusable weather connector in `lib/`: ```python # lib/open_meteo.star def open_meteo(save, latitude="59.33", longitude="18.07", city="Stockholm", past_days="7"): """Fetch weather data from Open-Meteo API.""" resp = http.get( "https://api.open-meteo.com/v1/forecast", params={ "latitude": latitude, "longitude": longitude, "daily": "temperature_2m_max,temperature_2m_min,precipitation_sum", "timezone": "auto", "past_days": past_days, }, retry=3, ) if not resp.ok: fail("Open-Meteo API returned " + str(resp.status_code)) days = resp.json["daily"]["time"] temp_max = resp.json["daily"]["temperature_2m_max"] temp_min = resp.json["daily"]["temperature_2m_min"] precip = resp.json["daily"]["precipitation_sum"] for i in range(len(days)): save.row({ "date": days[i], "temp_max_c": temp_max[i], "temp_min_c": temp_min[i], "precip_mm": precip[i], "city": city, }) print("Fetched " + str(save.count()) + " days for " + city) ``` The function receives `save` as its first argument for collecting rows. All other built-in modules (`http`, `json`, etc.) are available as globals. ## YAML Models Reference the source function from YAML models — one connector, many cities: ```yaml # models/raw/weather_stockholm.yaml kind: table source: open_meteo config: latitude: "59.33" longitude: "18.07" city: Stockholm ``` ```yaml # models/raw/weather_london.yaml kind: table source: open_meteo config: latitude: "51.51" longitude: "-0.13" city: London ``` ```yaml # models/raw/weather_berlin.yaml kind: table source: open_meteo config: latitude: "52.52" longitude: "13.41" city: Berlin ``` No code to write — the YAML model specifies what to fetch, the source function handles how. ## Running ```bash # Run one city ondatrasql run raw.weather_london ``` ``` Fetched 14 days for London [OK] raw.weather_london (table, backfill, 14 rows, 463ms) ``` ```bash # Run all models ondatrasql run ``` One connector, many models — each with its own configuration. 
## Using load() Directly If you need more control than YAML provides, use `load()` in a Starlark script: ```python # models/raw/weather_custom.star # @kind: table load("lib/open_meteo.star", "open_meteo") open_meteo(save, latitude="48.86", longitude="2.35", city="Paris", past_days="14") ``` --- ### Track Product Analytics > Collect events from your app, transform them into metrics, and query the results URL: https://ondatra.sh/examples/track-product-analytics/ Collect pageview events from your app via HTTP. Transform them into daily metrics. Query the results. No Kafka, no external tools. ## 1. Define the Event Schema ```sql -- models/raw/events.sql -- @kind: events event_name VARCHAR NOT NULL, page_url VARCHAR, user_id VARCHAR, session_id VARCHAR, received_at TIMESTAMPTZ ``` ## 2. Configure Ports Add to `.env`: ``` COLLECT_PORT=8080 COLLECT_ADMIN_PORT=8081 ``` The public port receives events. The admin port is used by `ondatrasql run` to flush events into the pipeline. ## 3. Start the Daemon ```bash ondatrasql daemon ``` ``` Event daemon starting... Public: :8080 (POST /collect/{schema}/{table}) Admin: 127.0.0.1:8081 (flush endpoints) Store: .ondatra/events Models: raw.events ``` ## 4. Send Events From your app, tracking script, or curl: ```bash curl -X POST localhost:8080/collect/raw/events \ -H "Content-Type: application/json" \ -d '{"event_name":"pageview","page_url":"/pricing","user_id":"u42","session_id":"s1"}' ``` Events are buffered durably on disk. They survive crashes. ## 5. 
Transform with SQL Sessions: ```sql -- models/staging/sessions.sql -- @kind: view SELECT session_id, user_id, MIN(received_at) AS started_at, MAX(received_at) AS ended_at, COUNT(*) AS page_views FROM raw.events WHERE event_name = 'pageview' GROUP BY session_id, user_id ``` Daily metrics: ```sql -- models/mart/daily_traffic.sql -- @kind: merge -- @unique_key: day SELECT DATE_TRUNC('day', started_at) AS day, COUNT(DISTINCT session_id) AS sessions, COUNT(DISTINCT user_id) AS visitors, SUM(page_views) AS total_views FROM staging.sessions GROUP BY 1 ``` ## 6. Run the Pipeline In a separate terminal (same project directory): ```bash ondatrasql run ``` ``` Running 3 models... [OK] raw.events (events, flush, 3 rows, 380ms — event flush) [OK] staging.sessions (view, create, 0 rows, 158ms — first run) [OK] mart.daily_traffic (merge, backfill, 1 rows, 182ms — first run) ``` Events are flushed from the buffer into `raw.events`. The view resolves live. The mart is materialized. ## 7. Query ```bash ondatrasql sql "SELECT * FROM mart.daily_traffic ORDER BY day DESC LIMIT 7" ``` ## What You Get - Events buffered durably — no data loss on crash - Automatic flush on pipeline run - One command runs everything — flush, transform, materialize - No Kafka, no Airflow, no external database --- ### Build Daily Metrics > Create a multi-layer SQL pipeline with incremental updates and validation URL: https://ondatra.sh/examples/build-daily-metrics/ Build a three-layer pipeline: raw → staging → mart. Incremental updates. Validation. Run with one command. ## 1. Raw Layer Source data. In production this comes from an API or events. 
For this guide, inline data: ```sql -- models/raw/orders.sql -- @kind: table SELECT * FROM (VALUES (1, 'Alice', 100, 'shipped', '2026-03-28'), (2, 'Bob', 250, 'shipped', '2026-03-28'), (3, 'Charlie', 75, 'pending', '2026-03-29'), (4, 'Alice', 200, 'shipped', '2026-03-29'), (5, 'Diana', 150, 'cancelled','2026-03-30') ) AS t(order_id, customer, amount, status, order_date) ``` ## 2. Staging Layer Clean and validate: ```sql -- models/staging/stg_orders.sql -- @kind: table -- @constraint: order_id NOT NULL -- @constraint: amount > 0 SELECT order_id, LOWER(TRIM(customer)) AS customer, CAST(amount AS DOUBLE) AS amount, status, CAST(order_date AS DATE) AS order_date FROM raw.orders ``` Constraints run before materialization. Bad data is blocked. ## 3. Mart Layer Aggregate into daily metrics: ```sql -- models/mart/daily_revenue.sql -- @kind: merge -- @unique_key: order_date -- @audit: row_count > 0 SELECT order_date, COUNT(*) AS orders, SUM(amount) AS revenue, SUM(CASE WHEN status = 'shipped' THEN amount ELSE 0 END) AS shipped_revenue, ROUND(SUM(CASE WHEN status = 'shipped' THEN amount ELSE 0 END) / SUM(amount) * 100, 1) AS ship_rate FROM staging.stg_orders GROUP BY order_date ``` The audit checks that at least one row exists after materialization. If it fails, the table is rolled back. ## 4. Run ```bash ondatrasql run ``` ``` Running 3 models... [OK] raw.orders (table, backfill, 5 rows, 156ms — first run) [OK] staging.stg_orders (table, backfill, 5 rows, 233ms — first run) [OK] mart.daily_revenue (merge, backfill, 3 rows, 296ms — first run) ``` ## 5. Query ```bash ondatrasql sql "SELECT * FROM mart.daily_revenue ORDER BY order_date" ``` ## 6. Run Again Change nothing. Run again: ```bash ondatrasql run ``` ``` Running 3 models... [OK] raw.orders (table, skip, 0 rows, 0s — no dependencies) [OK] staging.stg_orders (table, skip, 0 rows, 0s — deps unchanged) [OK] mart.daily_revenue (merge, incremental, 0 rows, 296ms — unchanged) ``` Nothing changed, nothing ran. No wasted I/O. 
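The `ship_rate` math is easy to verify by hand. A plain-Python sketch (not part of the pipeline) reproduces the three mart rows from the five inline orders:

```python
# (day, amount, status) for the five VALUES rows in raw/orders.sql
orders = [
    ("2026-03-28", 100, "shipped"),
    ("2026-03-28", 250, "shipped"),
    ("2026-03-29",  75, "pending"),
    ("2026-03-29", 200, "shipped"),
    ("2026-03-30", 150, "cancelled"),
]

daily = {}
for day, amount, status in orders:
    n, rev, shipped = daily.get(day, (0, 0, 0))
    shipped += amount if status == "shipped" else 0
    daily[day] = (n + 1, rev + amount, shipped)

for day in sorted(daily):
    n, rev, shipped = daily[day]
    print(day, n, rev, shipped, round(shipped / rev * 100, 1))
# 2026-03-28 2 350 350 100.0
# 2026-03-29 2 275 200 72.7
# 2026-03-30 1 150 0 0.0
```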
## 7. Preview Changes Edit `raw/orders.sql` to add a row. Then: ```bash ondatrasql sandbox ``` See the row count diff, downstream propagation, and validation results — before committing anything. ## The DAG OndatraSQL detected the dependency chain automatically: ``` raw.orders → staging.stg_orders → mart.daily_revenue ``` You never declared it. The SQL AST did. --- ### Incremental SQL Model > Build an incremental append model with cursor tracking URL: https://ondatra.sh/examples/incremental-model/ This example shows a SQL model that incrementally appends new events using cursor tracking. ## 1. Source Data ```sql -- models/raw/events.sql -- @kind: table SELECT * FROM (VALUES (1, 'u1', 'click', '/pricing', '{}', '2026-03-28 10:00:00'), (2, 'u2', 'view', '/features', '{}', '2026-03-28 11:00:00'), (3, 'u1', 'purchase', '/checkout', '{}', '2026-03-29 09:00:00'), (4, 'u3', 'signup', '/register', '{}', '2026-03-29 14:00:00'), (5, 'u2', 'view', '/docs', '{}', '2026-03-30 08:00:00') ) AS t(event_id, user_id, event_type, page_url, metadata, event_time) ``` ## 2. Incremental Model ```sql -- models/staging/stg_events.sql -- @kind: append -- @incremental: event_time -- @constraint: event_id NOT NULL -- @constraint: event_type IN ('click', 'view', 'purchase', 'signup') -- @audit: row_count > 0 SELECT event_id, user_id, event_type, page_url, metadata, event_time FROM raw.events ORDER BY event_time ``` ## 3. Run ```bash ondatrasql run ``` ``` Running 2 models... [OK] raw.events (table, backfill, 5 rows, 187ms — first run) [OK] staging.stg_events (append, backfill, 5 rows, 352ms — first run) ``` ## 4. Run Again ```bash ondatrasql run ``` ``` Running 2 models... [OK] raw.events (table, skip, 0 rows, 0s — no dependencies) [OK] staging.stg_events (append, incremental, 0 rows, 259ms — unchanged) ``` Nothing changed, nothing ran. ## How It Works 1. **First run (backfill):** All data is loaded into the target table 2. 
**After first run:** OndatraSQL records `MAX(event_time)` from the target table 3. **Subsequent runs:** Smart CDC automatically detects changes in `raw.events` using DuckLake time-travel and only appends new rows 4. **No manual filtering:** You write a plain SELECT — OndatraSQL handles incremental logic via CDC --- ### SCD2 Dimension > Track dimension history with Slowly Changing Dimension Type 2 URL: https://ondatra.sh/examples/scd2-dimension/ This example shows how to track the complete history of a dimension table using SCD2. ## 1. Source Data ```sql -- models/raw/customers.sql -- @kind: table SELECT * FROM (VALUES (1, 'Alice', 'alice@example.com', 'gold', 'Sweden', '2026-03-28 10:00:00'), (2, 'Bob', 'bob@example.com', 'silver', 'Norway', '2026-03-28 10:00:00'), (3, 'Carol', 'carol@example.com', 'bronze', 'Denmark', '2026-03-29 12:00:00') ) AS t(customer_id, name, email, tier, country, updated_at) ``` ## 2. SCD2 Model ```sql -- models/staging/stg_customers.sql -- @kind: scd2 -- @unique_key: customer_id -- @constraint: customer_id NOT NULL -- @audit: row_count > 0 SELECT customer_id, name, email, tier, country, updated_at FROM raw.customers ``` ## 3. First Run ```bash ondatrasql run ``` ``` Running 2 models... [OK] raw.customers (table, backfill, 3 rows, 174ms — first run) [OK] staging.stg_customers (scd2, backfill, 3 rows, 269ms — first run) ``` All rows are inserted with `is_current = true`: | customer_id | name | tier | is_current | |---|---|---|---| | 1 | Alice | gold | true | | 2 | Bob | silver | true | | 3 | Carol | bronze | true | ## 4. When Data Changes Change Alice's tier from "gold" to "platinum" in `raw/customers.sql`, then run again: ```bash ondatrasql run ``` ``` Running 2 models... 
[OK] raw.customers (table, backfill, 3 rows, 158ms — sql changed) [OK] staging.stg_customers (scd2, incremental, 1 rows, 360ms — unchanged) ``` The old version is closed and a new version is inserted: | customer_id | name | tier | is_current | |---|---|---|---| | 1 | Alice | gold | **false** | | 1 | Alice | platinum | **true** | | 2 | Bob | silver | true | | 3 | Carol | bronze | true | ## Auto-Generated Columns OndatraSQL automatically adds three columns to the output: | Column | Type | Description | |---|---|---| | `valid_from_snapshot` | BIGINT | DuckLake snapshot ID when this version became active | | `valid_to_snapshot` | BIGINT | Snapshot ID when superseded (NULL if current) | | `is_current` | BOOLEAN | Whether this is the active version | ## Querying SCD2 Data ```sql -- Current state SELECT * FROM staging.stg_customers WHERE is_current = true; -- Full history for a customer SELECT * FROM staging.stg_customers WHERE customer_id = 1 ORDER BY valid_from_snapshot; ``` --- ### Validated Model > Combine constraints and audits for comprehensive data quality URL: https://ondatra.sh/examples/validated-model/ This example demonstrates a model with multiple layers of validation: constraints, audits, and warnings. ## 1. Source Data ```sql -- models/raw/orders.sql -- @kind: table SELECT * FROM (VALUES (1, 101, 'alice@example.com', 150.00, 'shipped', '2026-03-28 10:00:00'), (2, 102, 'bob@example.com', 250.00, 'pending', '2026-03-28 11:00:00'), (3, 101, 'alice@example.com', 75.50, 'delivered', '2026-03-29 09:00:00'), (4, 103, 'carol@example.com', 500.00, 'processing', '2026-03-29 14:00:00'), (5, 102, 'bob@example.com', 120.00, 'shipped', '2026-03-30 08:00:00') ) AS t(order_id, customer_id, email, total, status, updated_at) ``` ```sql -- models/staging/stg_customers.sql -- @kind: table SELECT * FROM (VALUES (101, 'Alice'), (102, 'Bob'), (103, 'Carol') ) AS t(customer_id, name) ``` ## 2. 
Validated Model ```sql -- models/staging/stg_orders.sql -- @kind: merge -- @unique_key: order_id -- @incremental: updated_at -- Constraints (checked BEFORE insert — fail prevents materialization) -- @constraint: order_id PRIMARY KEY -- @constraint: customer_id NOT NULL -- @constraint: total >= 0 -- @constraint: status IN ('pending', 'processing', 'shipped', 'delivered', 'cancelled') -- @constraint: email EMAIL -- @constraint: customer_id REFERENCES staging.stg_customers(customer_id) -- Audits (checked AFTER insert — fail triggers rollback) -- @audit: row_count > 0 -- @audit: mean(total) BETWEEN 10 AND 5000 -- @audit: min(total) >= 0 -- Warnings (logged only — don't fail the model) -- @warning: total NULL_PERCENT < 5 -- @warning: email NOT EMPTY SELECT order_id, customer_id, email, total, status, updated_at FROM raw.orders ``` ## 3. Run ```bash ondatrasql run ``` ``` Running 3 models... [OK] raw.orders (table, backfill, 5 rows, 215ms — first run) [OK] staging.stg_customers (table, backfill, 3 rows, 172ms — first run) [OK] staging.stg_orders (merge, backfill, 5 rows, 500ms — first run) ``` ## Validation Flow 1. **SQL executes** — the SELECT runs and results are stored in a temp table 2. **Constraints check** — each constraint is validated against the temp data: - `order_id` must be non-null and unique - `total` must be >= 0 - `status` must be one of the allowed values - `customer_id` must exist in `stg_customers` - If any constraint fails, **nothing is written** 3. **Materialization** — data is merged into the target table 4. **Audits check** — post-insert validations run: - At least 1 row must exist - Average total must be between 10 and 5000 - Minimum total must be >= 0 - If any audit fails, the transaction **rolls back** 5. 
**Warnings log** — soft checks run and violations are logged (but don't fail) --- ## Lineage & Metadata > Built-in lineage and metadata — no external systems required URL: https://ondatra.sh/lineage/ OndatraSQL automatically tracks how your data is built. No external lineage tool. No metadata store. No integration required. Every run records: - Column-level lineage - Execution details - Schema changes - Dependencies - Git commit All stored in DuckLake. All queryable with SQL. ## Mental Model Every pipeline run creates a snapshot. Each snapshot includes the data, how it was produced, and where it came from. ## Column Lineage Every column is traced back to its source — across CTEs, joins, and subqueries. No annotations. No manual mapping. ```bash ondatrasql lineage mart.revenue.total_amount ``` Shows source columns, transformations applied, and upstream tables. ### Transformation Types Each column is classified by how it was created: | Type | Meaning | Example | |---|---|---| | `IDENTITY` | Direct copy | `SELECT name` | | `AGGREGATION` | Aggregated value | `SUM(amount)` | | `ARITHMETIC` | Computed | `price * quantity` | | `CONDITIONAL` | Logic applied | `CASE WHEN ...` | | `CAST` | Type conversion | `CAST(id AS VARCHAR)` | | `FUNCTION` | Function call | `UPPER(name)` | ## CLI ```bash ondatrasql lineage overview # All models with dependencies ondatrasql lineage staging.orders # Column lineage for one model ondatrasql lineage staging.orders.total # Trace one column ``` Use these to explore dependencies, debug transformations, and understand the impact of changes. ## Commit Metadata Every run stores execution metadata inside DuckLake. No separate system required. This acts as a built-in metadata store. 
### Execution | Field | Description | |---|---| | `run_type` | `backfill`, `incremental`, `full`, `skip`, `flush` | | `rows_affected` | Rows written | | `duration_ms` | Execution time | | `steps` | Sub-step breakdown with timing | ### Lineage | Field | Description | |---|---| | `column_lineage` | Source columns with transformation types | | `depends` | Upstream table dependencies | ### Schema | Field | Description | |---|---| | `columns` | Output column definitions | | `schema_hash` | Detects schema evolution | ### Versioning | Field | Description | |---|---| | `sql_hash` | Triggers backfill on change | | `dag_run_id` | Run identifier | | `source_file` | Model source path | ### Git | Field | Description | |---|---| | `git_commit` | Commit SHA | | `git_branch` | Branch name | | `git_repo_url` | Repository URL | Trace data back to the code change that produced it. ## Query Metadata Everything is just SQL: ```sql SELECT commit_extra_info->>'model' AS model, commit_extra_info->>'run_type' AS run_type, commit_extra_info->>'rows_affected' AS rows, commit_extra_info->>'duration_ms' AS ms FROM lake.snapshots() ORDER BY snapshot_id DESC LIMIT 10; ``` Build dashboards, alerts, or debugging tools directly in SQL. ## Why This Matters - Debug pipelines faster - Understand data origins - Track changes over time - Build observability without extra tools --- ## Blueprint: Google Ad Manager > Fetch historical GAM reports incrementally with one YAML model and one shared source function. URL: https://ondatra.sh/blueprints/google-ad-manager/ Ingest Google Ad Manager reports without building a connector. No Python connector. No Airflow DAG. No manual cursor tracking. ## What This Blueprint Gives You - Incremental loading by date - OAuth with Google service account - Report creation + polling handled automatically - Dynamic schema from dimensions and metrics - Reusable source function for multiple GAM reports ## Quick Start ### 1. 
Set up credentials Add to `.env`: ```bash GAM_NETWORK_CODE=12345678 GAM_KEY_FILE=service-account.json ``` ### 2. Add the source function Copy [`lib/gam_report.star`](#source-function) into your project. ### 3. Create a model ```yaml # models/raw/gam_report.yaml kind: append incremental: report_date incremental_initial: "2026-02-28" source: gam_report config: network_code: ${GAM_NETWORK_CODE} key_file: ${GAM_KEY_FILE} dimensions: - AD_UNIT_NAME - DATE - ORDER_NAME - LINE_ITEM_NAME - CREATIVE_NAME metrics: - AD_SERVER_IMPRESSIONS - AD_SERVER_CLICKS - AD_SERVER_CTR - AD_SERVER_REVENUE ``` ### 4. Run ```bash ondatrasql run raw.gam_report ``` That's the whole model. Most users only change YAML — not code. This creates a versioned append table in DuckLake, keyed by `report_date` for incremental loading. ## How It Works 1. Authenticates with a Google service account 2. Computes the next date range from incremental state 3. Creates and runs a GAM report 4. Polls until the report is ready 5. Fetches rows page by page 6. Maps dimensions and metrics into typed output columns 7. Emits rows into the pipeline ## Customization Most users never need to touch the source code. Change dimensions, metrics, filters, and date behavior in YAML only. 
**Simpler report** — fewer columns: ```yaml # models/raw/gam_simple.yaml kind: append incremental: report_date incremental_initial: "2026-01-01" source: gam_report config: network_code: ${GAM_NETWORK_CODE} key_file: ${GAM_KEY_FILE} dimensions: - DATE - AD_UNIT_NAME metrics: - AD_SERVER_IMPRESSIONS ``` **With filters** — only specific orders: ```yaml # models/raw/gam_campaign.yaml kind: append incremental: report_date incremental_initial: "2026-01-01" source: gam_report config: network_code: ${GAM_NETWORK_CODE} key_file: ${GAM_KEY_FILE} dimensions: - DATE - LINE_ITEM_NAME metrics: - AD_SERVER_IMPRESSIONS - AD_SERVER_REVENUE filters: - dimension: ORDER_NAME operator: CONTAINS values: ["Q1_2026"] currency_code: "EUR" ``` **Multiple networks** — duplicate the YAML with a different `network_code`: ```yaml # models/raw/gam_report_us.yaml kind: append incremental: report_date incremental_initial: "2026-02-28" source: gam_report config: network_code: ${GAM_NETWORK_CODE_US} key_file: ${GAM_KEY_FILE} dimensions: - AD_UNIT_NAME - DATE metrics: - AD_SERVER_IMPRESSIONS - AD_SERVER_CLICKS ``` The source function in `lib/gam_report.star` is shared — no code duplication. ## What This Replaces Traditionally, this workflow requires: - A custom Python connector - OAuth/token handling - Polling logic - Pagination code - Incremental state storage - A scheduler With OndatraSQL, it's one source function plus one YAML model. 
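The incremental date logic is the only stateful part of the blueprint. A plain-Python sketch of the rule (the actual `lib/gam_report.star` uses the runtime `time` module and `incremental` state; `next_window` is a hypothetical name for illustration):

```python
from datetime import date, timedelta

def next_window(last_value, today, initial_value="2026-02-28"):
    # First run: start from incremental_initial.
    # Later runs: resume the day after the stored cursor.
    if last_value is None:
        start = date.fromisoformat(initial_value)
    else:
        start = date.fromisoformat(last_value) + timedelta(days=1)
    end = today - timedelta(days=1)  # the window always ends at yesterday
    if start > end:
        return None  # already up to date — the real function aborts here
    return start, end

next_window("2026-03-10", today=date(2026, 3, 15))
# (date(2026, 3, 11), date(2026, 3, 14))
```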
## Output Schema

Column names and types are derived from your config:

| Config | Column | Type |
|---|---|---|
| `DATE` | `report_date` | VARCHAR (YYYY-MM-DD) |
| `AD_UNIT_NAME` | `ad_unit_name` | VARCHAR |
| `ORDER_NAME` | `order_name` | VARCHAR |
| `LINE_ITEM_NAME` | `line_item_name` | VARCHAR |
| `CREATIVE_NAME` | `creative_name` | VARCHAR |
| `AD_SERVER_IMPRESSIONS` | `impressions` | BIGINT |
| `AD_SERVER_CLICKS` | `clicks` | BIGINT |
| `AD_SERVER_CTR` | `ctr` | DOUBLE |
| `AD_SERVER_REVENUE` | `revenue` | DOUBLE |

Non-DATE dimensions with `intValue` become BIGINT. Metrics with `intValue` become BIGINT; metrics with `doubleValue` become DOUBLE.

## Before You Start

You need:

- A GCP project with the [Ad Manager API](https://console.cloud.google.com/) enabled
- A service account JSON key (create under **Credentials > Service Account**)
- API access enabled in your GAM network (**Admin > Global settings**, [docs](https://support.google.com/admanager/answer/3088588))
- The service account email added as an API user in GAM
- Your GAM network code (found under **Admin > Global settings**)

## Full Config Reference

All [GAM reportDefinition fields](https://developers.google.com/ad-manager/api/beta/reference/rest/v1/networks.reports#ReportDefinition) are supported as optional config keys:

```yaml
config:
  network_code: ${GAM_NETWORK_CODE}
  key_file: ${GAM_KEY_FILE}
  dimensions: [...]
  metrics: [...]
  report_type: HISTORICAL

  # Filtering and sorting
  filters:
    - dimension: ORDER_NAME
      operator: CONTAINS
      values: ["Campaign_2026"]
  sorts:
    - field: AD_SERVER_IMPRESSIONS
      descending: true

  # Time zone and currency
  time_zone_source: PROVIDED
  time_zone: "Europe/Stockholm"
  currency_code: "SEK"

  # Comparison and time periods
  time_period_column: QUARTERS
  comparison_date_range:
    fixed:
      start_date: "2025-01-01"
      end_date: "2025-12-31"

  # Custom dimensions
  cms_metadata_dimension_key_ids: ["12345"]
  custom_dimension_key_ids: ["67890"]
```

Most users only need `dimensions`, `metrics`, and optionally `filters`.
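The column names in the table above follow a simple derivation, which can be sketched in plain Python (this mirrors the mapping loops in `lib/gam_report.star`):

```python
def dimension_column(name):
    """DATE becomes report_date; other dimensions are just lowercased."""
    col = name.lower()
    return "report_date" if col == "date" else col

def metric_column(name):
    """Lowercase the metric and strip the first matching reporting prefix."""
    col = name.lower()
    for prefix in ("ad_server_", "ad_exchange_", "total_"):
        if col.startswith(prefix):
            return col[len(prefix):]
    return col
```

So `AD_SERVER_IMPRESSIONS` becomes `impressions` and `DATE` becomes `report_date`, as in the schema table.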
## Source Function

`lib/gam_report.star`

The source function handles all API interaction. It maps dimensions and metrics dynamically — column names are derived from the config.

```python
def gam_report(save,
               network_code="",
               key_file="service-account.json",
               dimensions=None,
               metrics=None,
               report_type="HISTORICAL",
               filters=None,
               sorts=None,
               time_zone_source=None,
               time_zone=None,
               currency_code=None,
               time_period_column=None,
               comparison_date_range=None,
               cms_metadata_dimension_key_ids=None,
               custom_dimension_key_ids=None,
               ekv_dimension_key_ids=None,
               line_item_custom_field_ids=None,
               order_custom_field_ids=None,
               creative_custom_field_ids=None,
               flags=None,
               expanded_compatibility=None):
    """Fetch GAM historical report. Dimensions and metrics are mapped dynamically."""
    scope = "https://www.googleapis.com/auth/admanager"
    base_url = "https://admanager.googleapis.com/v1/networks/" + network_code

    one_day = time.parse_duration("24h")
    yesterday = (time.now() - one_day).format("2006-01-02")

    start_date = ""
    if incremental.is_backfill:
        start_date = incremental.initial_value
    else:
        last = time.parse_time(incremental.last_value + "T00:00:00Z")
        start_date = (last + one_day).format("2006-01-02")
    end_date = yesterday

    print("Range: " + start_date + " to " + end_date +
          " (backfill: " + str(incremental.is_backfill) + ")")
    if start_date > end_date:
        print("Already up to date")
        abort()

    token = oauth.token(google_key_file=key_file, scope=scope)
    headers = {
        "Authorization": "Bearer " + token.access_token,
        "Content-Type": "application/json",
    }

    # --- Build report definition ---
    start_t = time.parse_time(start_date + "T00:00:00Z")
    end_t = time.parse_time(end_date + "T00:00:00Z")
    report_def = {
        "dimensions": dimensions,
        "metrics": metrics,
        "dateRange": {
            "fixed": {
                "startDate": {"year": start_t.year, "month": start_t.month, "day": start_t.day},
                "endDate": {"year": end_t.year, "month": end_t.month, "day": end_t.day},
            },
        },
        "reportType": report_type,
    }

    # Optional fields — only include if provided
    if filters:
        report_def["filters"] = filters
    if sorts:
        report_def["sorts"] = sorts
    if time_zone_source:
        report_def["timeZoneSource"] = time_zone_source
    if time_zone:
        report_def["timeZone"] = time_zone
    if currency_code:
        report_def["currencyCode"] = currency_code
    if time_period_column:
        report_def["timePeriodColumn"] = time_period_column
    if comparison_date_range:
        report_def["comparisonDateRange"] = comparison_date_range
    if cms_metadata_dimension_key_ids:
        report_def["cmsMetadataDimensionKeyIds"] = cms_metadata_dimension_key_ids
    if custom_dimension_key_ids:
        report_def["customDimensionKeyIds"] = custom_dimension_key_ids
    if ekv_dimension_key_ids:
        report_def["ekvDimensionKeyIds"] = ekv_dimension_key_ids
    if line_item_custom_field_ids:
        report_def["lineItemCustomFieldIds"] = line_item_custom_field_ids
    if order_custom_field_ids:
        report_def["orderCustomFieldIds"] = order_custom_field_ids
    if creative_custom_field_ids:
        report_def["creativeCustomFieldIds"] = creative_custom_field_ids
    if flags:
        report_def["flags"] = flags
    if expanded_compatibility != None:
        report_def["expandedCompatibility"] = expanded_compatibility

    create_body = json.encode({
        "reportDefinition": report_def,
        "visibility": "HIDDEN",
    })

    # --- Create report ---
    create_resp = http.post(base_url + "/reports", json=create_body, headers=headers, timeout=60)
    if not create_resp.ok:
        fail("Create report failed: " + str(create_resp.status_code) + " " + create_resp.text)
    report_name = create_resp.json["name"]

    # --- Run report ---
    run_resp = http.post(
        "https://admanager.googleapis.com/v1/" + report_name + ":run",
        json={},
        headers=headers,
        timeout=60,
    )
    if not run_resp.ok:
        fail("Run report failed: " + str(run_resp.status_code) + " " + run_resp.text)
    operation_name = run_resp.json["name"]

    # --- Poll until done ---
    poll_url = "https://admanager.googleapis.com/v1/" + operation_name
    last_poll = None
    wait = 2.0
    for i in range(60):
        sleep(wait)
        last_poll = http.get(poll_url, headers=headers)
        if not last_poll.ok:
            fail("Poll failed: " + str(last_poll.status_code) + " " + last_poll.text)
        if "done" in last_poll.json and last_poll.json["done"]:
            break
        if i == 59:
            fail("Report timed out")
        if wait < 10.0:
            wait = wait * 2

    # --- Column name mapping ---
    dim_cols = []
    for d in dimensions:
        col = d.lower()
        if col == "date":
            col = "report_date"
        dim_cols.append(col)

    metric_cols = []
    for m in metrics:
        col = m.lower()
        for prefix in ["ad_server_", "ad_exchange_", "total_"]:
            if col.startswith(prefix):
                col = col[len(prefix):]
                break
        metric_cols.append(col)

    # --- Fetch result rows ---
    result_name = last_poll.json["response"]["reportResult"]
    fetch_base = "https://admanager.googleapis.com/v1/" + result_name + ":fetchRows"
    page_token = ""
    while True:
        params = {"pageSize": "10000"}
        if page_token != "":
            params["pageToken"] = page_token
        res = http.get(fetch_base, headers=headers, params=params, timeout=120)
        if not res.ok:
            fail("Fetch rows failed: " + str(res.status_code) + " " + res.text)
        rows = res.json.get("rows")
        if rows == None or len(rows) == 0:
            break
        for row in rows:
            dims = row["dimensionValues"]
            mets = row["metricValueGroups"][0]["primaryValues"]
            record = {}
            for i, col in enumerate(dim_cols):
                val = dims[i]
                if "intValue" in val:
                    raw = val["intValue"]
                    if dimensions[i] == "DATE":
                        record[col] = raw[:4] + "-" + raw[4:6] + "-" + raw[6:8]
                    else:
                        record[col] = int(raw)
                else:
                    record[col] = val.get("stringValue", "")
            for i, col in enumerate(metric_cols):
                val = mets[i]
                if "doubleValue" in val:
                    record[col] = float(val["doubleValue"])
                elif "intValue" in val:
                    record[col] = int(val["intValue"])
                else:
                    record[col] = 0
            save.row(record)
        page_token = res.json.get("nextPageToken", "")
        if page_token == "":
            break

    print("Saved " + str(save.count()) + " rows")
```

## Using as a Standalone Script

Use this only if you want full control in Starlark. For most cases, YAML is simpler and preferred.
```python
# models/raw/gam_report.star
# @kind: append
# @incremental: report_date
# @incremental_initial: 2026-02-28

load("lib/gam_report.star", "gam_report")

gam_report(
    save,
    network_code=env.get("GAM_NETWORK_CODE"),
    key_file=env.get("GAM_KEY_FILE"),
    dimensions=["AD_UNIT_NAME", "DATE", "ORDER_NAME", "LINE_ITEM_NAME", "CREATIVE_NAME"],
    metrics=["AD_SERVER_IMPRESSIONS", "AD_SERVER_CLICKS", "AD_SERVER_CTR", "AD_SERVER_REVENUE"],
)
```
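A note on the polling loop in `lib/gam_report.star`: the wait starts at 2 seconds and doubles after each poll while it is still under 10 seconds, so it settles at 16 seconds per poll. The schedule it produces can be reproduced in plain Python (an illustration of the loop above, not a runtime API):

```python
def backoff_schedule(n, start=2.0, threshold=10.0):
    """Return the first n poll waits used by the source function's loop.

    The wait doubles after each poll while still below `threshold`,
    so starting at 2s the sequence settles at 16s: 2, 4, 8, 16, 16, ...
    """
    waits = []
    wait = start
    for _ in range(n):
        waits.append(wait)
        if wait < threshold:
            wait = wait * 2
    return waits
```

For example, `backoff_schedule(5)` returns `[2.0, 4.0, 8.0, 16.0, 16.0]` — so across the 60 allowed polls the function waits a little over 15 minutes in total before failing with "Report timed out".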