Build Daily Metrics
Create a multi-layer SQL pipeline with incremental updates and validation
Build a three-layer pipeline: raw → staging → mart. Incremental updates. Validation. Run with one command.
1. Raw Layer
Source data. In production this comes from an API or events. For this guide, inline data:
-- models/raw/orders.sql
-- @kind: table
SELECT * FROM (VALUES
(1, 'Alice', 100, 'shipped', '2026-03-28'),
(2, 'Bob', 250, 'shipped', '2026-03-28'),
(3, 'Charlie', 75, 'pending', '2026-03-29'),
(4, 'Alice', 200, 'shipped', '2026-03-29'),
(5, 'Diana', 150, 'cancelled', '2026-03-30')
) AS t(order_id, customer, amount, status, order_date)
2. Staging Layer
Clean and validate:
-- models/staging/stg_orders.sql
-- @kind: table
-- @constraint: order_id NOT NULL
-- @constraint: amount > 0
SELECT
order_id,
LOWER(TRIM(customer)) AS customer,
CAST(amount AS DOUBLE) AS amount,
status,
CAST(order_date AS DATE) AS order_date
FROM raw.orders
Constraints run before materialization. Bad data is blocked.
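The check-then-write idea can be sketched outside OndatraSQL with Python's built-in sqlite3 module. This is an illustration only, not the tool's implementation: the helper `materialize_if_valid`, the table name `raw_orders`, and the negative amount are hypothetical, the constraints are written as plain SQL boolean expressions, and the sketch assumes a single violating row blocks the whole table.

```python
import sqlite3

# Hypothetical helper, not part of OndatraSQL: validate first, write second.
def materialize_if_valid(conn, target, select_sql, constraints):
    """Create `target` from `select_sql` only if no row violates a constraint."""
    for rule in constraints:
        # A rule that evaluates to false or NULL counts as a violation.
        bad = conn.execute(
            f"SELECT COUNT(*) FROM ({select_sql}) WHERE NOT COALESCE(({rule}), 0)"
        ).fetchone()[0]
        if bad:
            raise ValueError(f"constraint failed: {rule} ({bad} rows)")
    conn.execute(f"CREATE TABLE {target} AS {select_sql}")

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE raw_orders (order_id, customer, amount, status, order_date)")
conn.executemany(
    "INSERT INTO raw_orders VALUES (?, ?, ?, ?, ?)",
    [
        (1, "Alice", 100, "shipped", "2026-03-28"),
        (2, " Bob ", -5, "shipped", "2026-03-28"),  # made-up bad row: amount <= 0
    ],
)

stg_sql = (
    "SELECT order_id, LOWER(TRIM(customer)) AS customer, "
    "CAST(amount AS REAL) AS amount, status, order_date FROM raw_orders"
)
rules = ["order_id IS NOT NULL", "amount > 0"]

try:
    materialize_if_valid(conn, "stg_orders", stg_sql, rules)
    blocked = False
except ValueError as err:
    blocked = True
    print(err)
```

Because the check runs against the SELECT rather than against a finished table, a failure leaves no `stg_orders` table behind.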
3. Mart Layer
Aggregate into daily metrics:
-- models/mart/daily_revenue.sql
-- @kind: merge
-- @unique_key: order_date
-- @audit: row_count > 0
SELECT
order_date,
COUNT(*) AS orders,
SUM(amount) AS revenue,
SUM(CASE WHEN status = 'shipped' THEN amount ELSE 0 END) AS shipped_revenue,
ROUND(SUM(CASE WHEN status = 'shipped' THEN amount ELSE 0 END) / SUM(amount) * 100, 1) AS ship_rate
FROM staging.stg_orders
GROUP BY order_date
The audit checks that at least one row exists after materialization. If it fails, the table is rolled back.
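A merge keyed on `order_date` followed by an audit can be approximated with a transaction in sqlite3. This is a minimal sketch under stated assumptions, not OndatraSQL's mechanism: `merge_with_audit` is a hypothetical helper, `INSERT OR REPLACE` stands in for the merge, and the second audit (`MIN(revenue) >= 0`) and the later input rows are invented to show a failure.

```python
import sqlite3

# isolation_level=None lets the sketch issue BEGIN/COMMIT/ROLLBACK itself.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute(
    "CREATE TABLE daily_revenue ("
    "order_date TEXT PRIMARY KEY, orders INTEGER, revenue REAL)"
)

def merge_with_audit(conn, rows, audit_sql):
    """Upsert rows by order_date, run the audit, and undo the write if it fails."""
    conn.execute("BEGIN")
    # INSERT OR REPLACE overwrites the existing row that shares the primary key.
    conn.executemany("INSERT OR REPLACE INTO daily_revenue VALUES (?, ?, ?)", rows)
    passed = bool(conn.execute(audit_sql).fetchone()[0])
    conn.execute("COMMIT" if passed else "ROLLBACK")
    return passed

ROW_COUNT_AUDIT = "SELECT COUNT(*) > 0 FROM daily_revenue"

# First load: two days from the guide's sample data.
first = merge_with_audit(
    conn, [("2026-03-28", 2, 350.0), ("2026-03-29", 2, 275.0)], ROW_COUNT_AUDIT
)
# Made-up later load: the 2026-03-29 row is updated in place, not duplicated.
second = merge_with_audit(conn, [("2026-03-29", 3, 300.0)], ROW_COUNT_AUDIT)
# Made-up bad load with a hypothetical stricter audit: the write is rolled back.
third = merge_with_audit(
    conn, [("2026-03-30", 1, -1.0)], "SELECT MIN(revenue) >= 0 FROM daily_revenue"
)

rows = conn.execute("SELECT * FROM daily_revenue ORDER BY order_date").fetchall()
print(first, second, third, rows)
```

Running the audit inside the same transaction as the write is what makes the rollback clean: readers never see the rejected rows.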
4. Run
ondatrasql run
Running 3 models...
[OK] raw.orders (table, backfill, 5 rows, 156ms — first run)
[OK] staging.stg_orders (table, backfill, 5 rows, 233ms — first run)
[OK] mart.daily_revenue (merge, backfill, 3 rows, 296ms — first run)
5. Query
ondatrasql sql "SELECT * FROM mart.daily_revenue ORDER BY order_date"
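Working the sample data through by hand gives three rows: 2026-03-28 has 2 orders, revenue 350, shipped revenue 350, ship_rate 100.0; 2026-03-29 has 2 orders, revenue 275, shipped revenue 200, ship_rate 72.7 (200 / 275 × 100); 2026-03-30 has 1 order, revenue 150, shipped revenue 0, ship_rate 0.0. The sketch below re-runs the mart query in sqlite3 as a cross-check. It is not OndatraSQL output; the table is created by hand, and `order_date` is kept as text because SQLite has no DATE type.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE stg_orders ("
    "order_id INTEGER, customer TEXT, amount REAL, status TEXT, order_date TEXT)"
)
# The guide's five orders, with customer names already lowercased as in staging.
conn.executemany(
    "INSERT INTO stg_orders VALUES (?, ?, ?, ?, ?)",
    [
        (1, "alice", 100, "shipped", "2026-03-28"),
        (2, "bob", 250, "shipped", "2026-03-28"),
        (3, "charlie", 75, "pending", "2026-03-29"),
        (4, "alice", 200, "shipped", "2026-03-29"),
        (5, "diana", 150, "cancelled", "2026-03-30"),
    ],
)

# Same aggregation as models/mart/daily_revenue.sql, minus the directives.
daily = conn.execute(
    """
    SELECT
        order_date,
        COUNT(*) AS orders,
        SUM(amount) AS revenue,
        SUM(CASE WHEN status = 'shipped' THEN amount ELSE 0 END) AS shipped_revenue,
        ROUND(SUM(CASE WHEN status = 'shipped' THEN amount ELSE 0 END)
              / SUM(amount) * 100, 1) AS ship_rate
    FROM stg_orders
    GROUP BY order_date
    ORDER BY order_date
    """
).fetchall()

for row in daily:
    print(row)
```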
6. Run Again
Change nothing. Run again:
ondatrasql run
Running 3 models...
[OK] raw.orders (table, skip, 0 rows, 0s — no dependencies)
[OK] staging.stg_orders (table, skip, 0 rows, 0s — deps unchanged)
[OK] mart.daily_revenue (merge, incremental, 0 rows, 296ms — unchanged)
Nothing changed, so nothing was rewritten: the two table models are skipped and the merge model processes 0 rows. No wasted I/O.
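The guide does not say how OndatraSQL decides that a model is unchanged. One common approach, shown here purely as an assumption, is to fingerprint each model's SQL together with the fingerprints of its upstream models and skip when the fingerprint matches the previous run. The functions `fingerprint` and `plan` are hypothetical, and the sketch treats every model kind the same way.

```python
import hashlib

def fingerprint(sql, upstream_fingerprints):
    """Hash a model's SQL plus the fingerprints of everything it depends on."""
    digest = hashlib.sha256(sql.encode("utf-8"))
    for fp in upstream_fingerprints:
        digest.update(fp.encode("utf-8"))
    return digest.hexdigest()

def plan(models, previous):
    """Decide run/skip per model.

    models:   list of (name, sql, deps), ordered upstream first.
    previous: dict of name -> fingerprint saved by the last run.
    Returns (decisions, current_fingerprints).
    """
    current, decisions = {}, {}
    for name, sql, deps in models:
        current[name] = fingerprint(sql, [current[d] for d in deps])
        decisions[name] = "skip" if previous.get(name) == current[name] else "run"
    return decisions, current

models = [
    ("raw.orders", "SELECT 1 AS order_id", []),
    ("staging.stg_orders", "SELECT order_id FROM raw.orders", ["raw.orders"]),
    ("mart.daily_revenue", "SELECT COUNT(*) FROM staging.stg_orders", ["staging.stg_orders"]),
]

first_run, saved = plan(models, previous={})      # nothing saved yet: run all
second_run, saved = plan(models, previous=saved)  # identical inputs: skip all

# Editing the raw model changes its fingerprint and every one downstream.
edited = [("raw.orders", "SELECT 2 AS order_id", [])] + models[1:]
third_run, _ = plan(edited, previous=saved)
print(first_run, second_run, third_run, sep="\n")
```

Folding upstream fingerprints into each hash is what lets a change in the raw layer propagate to staging and mart without comparing any data.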
7. Preview Changes
Edit models/raw/orders.sql to add a row. Then:
ondatrasql sandbox
See the row count diff, downstream propagation, and validation results — before committing anything.
The DAG
OndatraSQL detected the dependency chain automatically:
raw.orders → staging.stg_orders → mart.daily_revenue
You never declared it. OndatraSQL derived it from the table references in each model's SQL AST.
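To make the idea concrete, here is a deliberately naive sketch that recovers the same chain from the model SQL. It is not how OndatraSQL works internally: a regular expression stands in for real AST parsing (so it would miss CTEs, quoted identifiers, and comments), the model bodies are shortened, and `dependencies` is a hypothetical helper. The ordering uses `graphlib.TopologicalSorter` from the Python standard library (3.9+).

```python
import re
from graphlib import TopologicalSorter

# Matches schema-qualified names after FROM or JOIN, e.g. "FROM raw.orders".
REF = re.compile(r"\b(?:FROM|JOIN)\s+([A-Za-z_]\w*\.[A-Za-z_]\w*)", re.IGNORECASE)

def dependencies(sql):
    """Return the set of schema.table names a query reads from."""
    return set(REF.findall(sql))

# Shortened versions of the three models in this guide.
models = {
    "mart.daily_revenue": (
        "SELECT order_date, SUM(amount) AS revenue "
        "FROM staging.stg_orders GROUP BY order_date"
    ),
    "staging.stg_orders": "SELECT order_id, amount, order_date FROM raw.orders",
    "raw.orders": (
        "SELECT * FROM (VALUES (1, 'Alice', 100)) AS t(order_id, customer, amount)"
    ),
}

# Map each model to the known models it depends on.
graph = {
    name: {dep for dep in dependencies(sql) if dep in models}
    for name, sql in models.items()
}

# static_order() yields every model after all of its dependencies.
order = list(TopologicalSorter(graph).static_order())
print(" → ".join(order))
```

The models are listed in reverse on purpose: the run order comes from the references inside the SQL, not from the order of the files.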
Ondatra Labs