Push Contract
Your push() function receives rows from the runtime and sends them to an external API. SQL controls what gets pushed — Starlark controls how.
How push fits the architecture
SQL transforms data and declares what to sync:
-- models/sync/contacts.sql
-- @kind: merge
-- @unique_key: id
-- @sink: crm
SELECT id, name, email FROM staging.contacts
The runtime materializes the SQL, detects changes via DuckLake snapshots, and calls push() with the delta rows. Starlark sends them to the API — nothing else.
Function signature
def push(rows):
rows is a list of dicts. Each dict is one row with column values plus two internal fields.
Internal fields
Every row includes two internal fields from DuckLake. Strip fields starting with _ before sending to external APIs.
| Field | Type | Description |
|---|---|---|
| __ondatra_rowid | int | Stable DuckLake row identifier. Persists across snapshots. |
| __ondatra_change_type | string | Raw change type from table_changes(). |
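In practice a row splits cleanly into the outbound payload and the delivery metadata by filtering on the underscore prefix. A minimal sketch in plain Python (standing in for Starlark; split_row is a hypothetical helper name):

```python
def split_row(row):
    # Outbound payload: every column value, minus the internal fields.
    payload = {k: v for k, v in row.items() if not k.startswith("_")}
    # Delivery metadata: the two DuckLake fields the runtime attaches.
    meta = {k: v for k, v in row.items() if k.startswith("_")}
    return payload, meta

row = {"id": 7, "email": "a@b.co",
       "__ondatra_rowid": 42, "__ondatra_change_type": "insert"}
payload, meta = split_row(row)
# payload == {"id": 7, "email": "a@b.co"}
```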
Change types
| Change type | Meaning | Data source |
|---|---|---|
| insert | New row | Current table |
| update_postimage | Row after update | Current table |
| update_preimage | Row before update | Previous snapshot |
| delete | Row removed | Previous snapshot |
An UPDATE produces two rows with the same __ondatra_rowid: update_preimage (old data) and update_postimage (new data).
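The pairing makes field-level diffs possible before any API call. A sketch in plain Python mirroring Starlark (diff_updates is a hypothetical helper) that joins the two halves on the shared rowid and keeps only columns that changed:

```python
def diff_updates(rows):
    # Index the two halves of each UPDATE by the shared rowid.
    pre = {r["__ondatra_rowid"]: r for r in rows
           if r["__ondatra_change_type"] == "update_preimage"}
    post = {r["__ondatra_rowid"]: r for r in rows
            if r["__ondatra_change_type"] == "update_postimage"}
    diffs = {}
    for rowid, new in post.items():
        old = pre.get(rowid, {})
        # Keep non-internal columns whose value actually changed.
        diffs[rowid] = {k: v for k, v in new.items()
                        if not k.startswith("_") and old.get(k) != v}
    return diffs

rows = [
    {"__ondatra_rowid": 1, "__ondatra_change_type": "update_preimage",
     "id": 1, "email": "old@example.com"},
    {"__ondatra_rowid": 1, "__ondatra_change_type": "update_postimage",
     "id": 1, "email": "new@example.com"},
]
diffs = diff_updates(rows)
# diffs == {1: {"email": "new@example.com"}}
```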
Change types per kind
| Kind | SQL operation | Change types produced |
|---|---|---|
| table | TRUNCATE + INSERT | delete + insert |
| append | INSERT | insert |
| merge | MERGE INTO | insert, update_postimage (+ update_preimage) |
| tracked | DELETE + INSERT per group | delete + insert |
| scd2 | UPDATE + INSERT | update_postimage + insert |
| partition | DELETE partition + INSERT | delete + insert (scoped to affected partitions) |
Your push function decides what to do with each change type. The runtime does no filtering.
Return value
Return a dict with a status per row. Keys are "rowid:change_type":
key = str(r["__ondatra_rowid"]) + ":" + r["__ondatra_change_type"]
| Status | Meaning | Runtime behavior |
|---|---|---|
"ok" | Delivered | Acked — removed from queue |
"warn: message" | Delivered with warning | Acked + warning logged |
"error: message" | Temporary failure | Nacked — requeued for retry |
"reject: message" | Permanent failure | Dead-lettered — never retried |
Every row in the batch must have a status. Missing keys cause an error.
def push(rows):
    payload = [{k: v for k, v in r.items() if not k.startswith("_")} for r in rows]
    resp = http.post("/api/batch", json=payload)
    if not resp.ok:
        return {str(r["__ondatra_rowid"]) + ":" + r["__ondatra_change_type"]: "error: " + resp.text for r in rows}
    return {str(r["__ondatra_rowid"]) + ":" + r["__ondatra_change_type"]: "ok" for r in rows}
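Because a missing key is treated as an error, a defensive variant builds the result dict from the row list itself, so every row is guaranteed a status even when the API response skips some rows (row_key and complete_statuses are hypothetical helper names):

```python
def row_key(r):
    # Same key format the runtime expects: "rowid:change_type".
    return str(r["__ondatra_rowid"]) + ":" + r["__ondatra_change_type"]

def complete_statuses(rows, reported):
    # Any row the API response did not mention gets a retryable error.
    return {row_key(r): reported.get(row_key(r), "error: no status reported")
            for r in rows}

rows = [{"__ondatra_rowid": 1, "__ondatra_change_type": "insert"},
        {"__ondatra_rowid": 2, "__ondatra_change_type": "delete"}]
statuses = complete_statuses(rows, {"1:insert": "ok"})
# statuses == {"1:insert": "ok", "2:delete": "error: no status reported"}
```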
Batch modes
Configured in the API dict under push:
sync (default)
Per-row status. Rows succeed and fail independently.
API = {"push": {"batch_size": 50, "batch_mode": "sync"}}
atomic
All-or-nothing. Return None for success, call fail() to abort.
API = {"push": {"batch_size": 100, "batch_mode": "atomic"}}
def push(rows):
    resp = http.post("/api/batch", json=rows)
    if not resp.ok:
        fail("batch failed: " + resp.text)
async
Job-based with polling. Return a job reference and implement poll():
API = {"push": {"batch_mode": "async", "poll_interval": "30s", "poll_timeout": "1h"}}
def push(rows):
    resp = http.post("/api/bulk-job", json=rows)
    return {"job_id": resp.json["id"]}

def poll(job_ref):
    resp = http.get("/api/bulk-job/" + job_ref["job_id"])
    if resp.json["status"] == "complete":
        return {rid + ":" + ct: "ok" for rid, ct in resp.json["results"]}
    if resp.json["status"] == "failed":
        fail("job failed: " + resp.json["error"])
    # return None to keep polling
Handling change types
Route change types to appropriate API operations:
def push(rows):
    results = {}
    for r in rows:
        ct = r["__ondatra_change_type"]
        key = str(r["__ondatra_rowid"]) + ":" + ct
        payload = {k: v for k, v in r.items() if not k.startswith("_")}
        if ct == "insert":
            resp = http.post("/api/records", json=payload)
        elif ct == "update_postimage":
            resp = http.patch("/api/records/" + str(r["id"]), json=payload)
        elif ct == "delete":
            resp = http.delete("/api/records/" + str(r["id"]))
        elif ct == "update_preimage":
            results[key] = "ok"  # skip — or use for audit logging
            continue
        results[key] = "ok" if resp.ok else "error: " + resp.text
    return results
Finalize
Optional. Called once after all batches succeed:
def finalize(succeeded, failed):
    if failed == 0:
        http.post("/webhooks", json={"event": "sync_complete", "rows": succeeded})
Not called if any batch failed.
Building outbound JSON in SQL
SQL controls the transformation — including building nested JSON for APIs that expect structured payloads:
-- models/sync/api_contacts.sql
-- @kind: merge
-- @unique_key: id
-- @sink: crm
SELECT
    id,
    json_object(
        'name', first_name || ' ' || last_name,
        'email', email,
        'tags', json_group_array(tag)
    ) AS properties
FROM staging.contacts
JOIN staging.contact_tags USING (id)
GROUP BY id, first_name, last_name, email
Push receives properties as a JSON string. DuckDB handles the nesting — Starlark just sends it.
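When the sink expects a nested object rather than a string, decode properties before sending. Starlark implementations commonly expose json.decode for this; the Python equivalent:

```python
import json

def to_api_record(row):
    # properties was built by DuckDB's json_object() and arrives as a
    # JSON string; decode it so the outbound body nests it as an object.
    return {"id": row["id"], "properties": json.loads(row["properties"])}

row = {"id": 1,
       "properties": '{"name": "Ada Lovelace", "email": "ada@example.com", "tags": ["vip"]}'}
record = to_api_record(row)
# record["properties"]["tags"] == ["vip"]
```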
Crash recovery
The runtime uses a durable Badger queue. If a run crashes:
- Pending events survive and retry on the next run
- Inflight batches older than 10 minutes are reclaimed
- Cross-check against DuckLake prevents double-delivery
Your push function doesn’t handle crash recovery. The runtime guarantees at-least-once delivery.
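At-least-once means the sink can still see redeliveries within a run (for example, after a reclaimed in-flight batch). If the API has no native idempotency, one option is to drop repeats of a rowid:change_type key on the receiving side. A sketch, with the caveat that the same key can legitimately recur across runs when a row changes again:

```python
def dedup(deliveries, seen):
    # Keep the first occurrence of each "rowid:change_type" key; later
    # occurrences within the same run are redeliveries and are dropped.
    fresh = []
    for row in deliveries:
        key = str(row["__ondatra_rowid"]) + ":" + row["__ondatra_change_type"]
        if key not in seen:
            seen.add(key)
            fresh.append(row)
    return fresh

seen = set()
batch = [{"__ondatra_rowid": 42, "__ondatra_change_type": "insert", "id": 7},
         {"__ondatra_rowid": 42, "__ondatra_change_type": "insert", "id": 7}]
fresh = dedup(batch, seen)
# len(fresh) == 1
```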