Push Contract

Your push() function receives rows from the runtime and sends them to an external API. SQL controls what gets pushed — Starlark controls how.

How push fits the architecture

SQL transforms data and declares what to sync:

-- models/sync/contacts.sql
-- @kind: merge
-- @unique_key: id
-- @sink: crm

SELECT id, name, email FROM staging.contacts

The runtime materializes the SQL, detects changes via DuckLake snapshots, and calls push() with the delta rows. Starlark sends them to the API — nothing else.

Function signature

def push(rows):

rows is a list of dicts. Each dict is one row with column values plus two internal fields.

Internal fields

Every row includes two internal fields from DuckLake. Strip fields starting with _ before sending to external APIs.

| Field | Type | Description |
| --- | --- | --- |
| __ondatra_rowid | int | Stable DuckLake row identifier. Persists across snapshots. |
| __ondatra_change_type | string | Raw change type from table_changes(). |

Change types

| Change type | Meaning | Data source |
| --- | --- | --- |
| insert | New row | Current table |
| update_postimage | Row after update | Current table |
| update_preimage | Row before update | Previous snapshot |
| delete | Row removed | Previous snapshot |

An UPDATE produces two rows with the same __ondatra_rowid: update_preimage (old data) and update_postimage (new data).
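Because the pair shares a rowid, the old and new values can be joined to compute a field-level diff. A minimal sketch in plain Python (the helper name and row shapes are illustrative, not part of the contract):

```python
def pair_updates(rows):
    # Group an UPDATE's preimage/postimage rows by their shared rowid
    # and return {rowid: (old_row, new_row)}.
    pairs = {}
    for r in rows:
        rid = r["__ondatra_rowid"]
        ct = r["__ondatra_change_type"]
        if ct == "update_preimage":
            pairs.setdefault(rid, [None, None])[0] = r
        elif ct == "update_postimage":
            pairs.setdefault(rid, [None, None])[1] = r
    return {rid: (old, new) for rid, (old, new) in pairs.items()}

rows = [
    {"__ondatra_rowid": 5, "__ondatra_change_type": "update_preimage", "email": "old@x.com"},
    {"__ondatra_rowid": 5, "__ondatra_change_type": "update_postimage", "email": "new@x.com"},
]
old, new = pair_updates(rows)[5]
# old["email"] == "old@x.com"; new["email"] == "new@x.com"
```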

Change types per kind

| Kind | SQL operation | Change types produced |
| --- | --- | --- |
| table | TRUNCATE + INSERT | delete + insert |
| append | INSERT | insert |
| merge | MERGE INTO | insert, update_postimage (+ update_preimage) |
| tracked | DELETE + INSERT per group | delete + insert |
| scd2 | UPDATE + INSERT | update_postimage + insert |
| partition | DELETE partition + INSERT | delete + insert (scoped to affected partitions) |

Your push function decides what to do with each change type. The runtime does no filtering.

Return value

Return a dict with a status per row. Keys are "rowid:change_type":

key = str(r["__ondatra_rowid"]) + ":" + r["__ondatra_change_type"]

| Status | Meaning | Runtime behavior |
| --- | --- | --- |
| "ok" | Delivered | Acked — removed from queue |
| "warn: message" | Delivered with warning | Acked + warning logged |
| "error: message" | Temporary failure | Nacked — requeued for retry |
| "reject: message" | Permanent failure | Dead-lettered — never retried |

Every row in the batch must have a status. Missing keys cause an error.

def push(rows):
    payload = [{k: v for k, v in r.items() if not k.startswith("_")} for r in rows]
    resp = http.post("/api/batch", json=payload)
    if not resp.ok:
        return {str(r["__ondatra_rowid"]) + ":" + r["__ondatra_change_type"]: "error: " + resp.text for r in rows}
    return {str(r["__ondatra_rowid"]) + ":" + r["__ondatra_change_type"]: "ok" for r in rows}
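When an endpoint reports per-row HTTP status codes, a common pattern is to treat rate limits and server errors as retryable and other client errors as permanent. A sketch in plain Python; the exact mapping is an assumption, not part of the contract:

```python
def status_for(http_status, body=""):
    # Map an HTTP response code to a push-contract status string.
    if 200 <= http_status < 300:
        return "ok"
    if http_status == 429 or http_status >= 500:
        # Rate limits and server errors are temporary: requeue for retry.
        return "error: HTTP %d %s" % (http_status, body)
    # Other 4xx responses are permanent: dead-letter the row.
    return "reject: HTTP %d %s" % (http_status, body)
```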

Batch modes

Configured in the API dict under push:

sync (default)

Per-row status. Rows succeed and fail independently.

API = {"push": {"batch_size": 50, "batch_mode": "sync"}}

atomic

All-or-nothing. Return None for success, call fail() to abort.

API = {"push": {"batch_size": 100, "batch_mode": "atomic"}}

def push(rows):
    resp = http.post("/api/batch", json=rows)
    if not resp.ok:
        fail("batch failed: " + resp.text)

async

Job-based with polling. Return a job reference, implement poll():

API = {"push": {"batch_mode": "async", "poll_interval": "30s", "poll_timeout": "1h"}}

def push(rows):
    resp = http.post("/api/bulk-job", json=rows)
    return {"job_id": resp.json["id"]}

def poll(job_ref):
    resp = http.get("/api/bulk-job/" + job_ref["job_id"])
    if resp.json["status"] == "complete":
        return {rid + ":" + ct: "ok" for rid, ct in resp.json["results"]}
    if resp.json["status"] == "failed":
        fail("job failed: " + resp.json["error"])
    # return None to keep polling

Handling change types

Route change types to appropriate API operations:

def push(rows):
    results = {}
    for r in rows:
        ct = r["__ondatra_change_type"]
        key = str(r["__ondatra_rowid"]) + ":" + ct
        payload = {k: v for k, v in r.items() if not k.startswith("_")}

        if ct == "insert":
            resp = http.post("/api/records", json=payload)
        elif ct == "update_postimage":
            resp = http.patch("/api/records/" + str(r["id"]), json=payload)
        elif ct == "delete":
            resp = http.delete("/api/records/" + str(r["id"]))
        elif ct == "update_preimage":
            results[key] = "ok"  # skip — or use for audit logging
            continue
        else:
            results[key] = "reject: unknown change type " + ct
            continue

        results[key] = "ok" if resp.ok else "error: " + resp.text
    return results

Finalize

Optional. Called once after all batches succeed:

def finalize(succeeded, failed):
    if failed == 0:
        http.post("/webhooks", json={"event": "sync_complete", "rows": succeeded})

Not called if any batch failed.

Building outbound JSON in SQL

SQL controls the transformation — including building nested JSON for APIs that expect structured payloads:

-- models/sync/api_contacts.sql
-- @kind: merge
-- @unique_key: id
-- @sink: crm

SELECT
    id,
    json_object(
        'name', first_name || ' ' || last_name,
        'email', email,
        'tags', json_group_array(tag)
    ) AS properties
FROM staging.contacts
JOIN staging.contact_tags USING (id)
GROUP BY id, first_name, last_name, email

Push receives properties as a JSON string. DuckDB handles the nesting — Starlark just sends it.
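If the API expects a real nested object rather than a string field, decode properties before sending. A sketch in plain Python using the standard json module (the Starlark runtime may expose an equivalent decoder; that is an assumption):

```python
import json

def expand_properties(row):
    # DuckDB's json_object() arrives as a string; decode it so the
    # outbound payload nests the object instead of embedding text.
    out = {k: v for k, v in row.items() if not k.startswith("_")}
    out["properties"] = json.loads(out["properties"])
    return out

row = {"id": 1, "properties": '{"name": "Ada Lovelace", "tags": ["vip"]}',
       "__ondatra_rowid": 3, "__ondatra_change_type": "insert"}
payload = expand_properties(row)
# payload["properties"]["tags"] == ["vip"]
```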

Crash recovery

The runtime uses a durable Badger queue. If a run crashes:

  • Pending events survive and retry on the next run
  • Inflight batches older than 10 minutes are reclaimed
  • Cross-check against DuckLake prevents double-delivery

Your push function doesn’t handle crash recovery. The runtime guarantees at-least-once delivery.
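At-least-once delivery does mean the same row can reach push() twice within a run, so deliveries to APIs that support it can be made idempotent. One approach is to derive a deterministic key from the internal fields and let the API deduplicate retries; cross-run replays are already guarded by the DuckLake cross-check. A sketch in plain Python (the key format and the idea of sending it as an Idempotency-Key header are assumptions, not part of the contract):

```python
def idempotency_key(row, run_id):
    # Same run + same rowid + same change type => same key, so a
    # redelivered row after a crash is deduplicated server-side.
    return "%s:%s:%s" % (run_id, row["__ondatra_rowid"], row["__ondatra_change_type"])

row = {"__ondatra_rowid": 42, "__ondatra_change_type": "insert"}
key = idempotency_key(row, "run-2024-06-01")
# Send as e.g. an Idempotency-Key request header on the POST.
```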