# Sync Data to External APIs
Push data to external systems with the `@sink` directive. The runtime handles batching, rate limiting, per-row tracking, and crash recovery.
## 1. Create a lib function
Create a `.star` file in `lib/` with an `API` dict and a `push()` function:

```starlark
# lib/hubspot_push.star
API = {
    "base_url": "https://api.hubapi.com",
    "auth": {"env": "HUBSPOT_KEY"},
    "retry": 3,
    "rate_limit": {"requests": 100, "per": "10s"},
    "push": {
        "batch_size": 100,
        "batch_mode": "sync",
    },
}

def push(rows):
    results = {}
    for r in rows:
        ct = r["__ondatra_change_type"]
        key = str(r["__ondatra_rowid"]) + ":" + ct
        payload = {k: v for k, v in r.items() if not k.startswith("_")}
        if ct == "insert":
            resp = http.post("/crm/v3/objects/contacts", json=payload)
        elif ct == "update_postimage":
            resp = http.patch("/crm/v3/objects/contacts/" + str(r["customer_id"]), json=payload)
        elif ct == "delete":
            resp = http.delete("/crm/v3/objects/contacts/" + str(r["customer_id"]))
        elif ct == "update_preimage":
            results[key] = "ok"  # pre-images carry old values; nothing to push
            continue
        results[key] = "ok" if resp.ok else "error: " + resp.text
    return results
```
Each row includes two internal fields:

- `__ondatra_rowid`: stable DuckLake row identifier
- `__ondatra_change_type`: raw DuckLake change type: `insert`, `update_preimage`, `update_postimage`, or `delete`
Return keys are composite: `"rowid:change_type"`. This ensures that `update_preimage` and `update_postimage` for the same row can have independent statuses. See the Push Contract for the complete reference.
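The composite-key scheme can be sketched in plain Python. The field names (`__ondatra_rowid`, `__ondatra_change_type`) come from this page; `result_key()` itself is a hypothetical helper for illustration, not part of the OndatraSQL runtime.

```python
def result_key(row):
    """Build the composite status key 'rowid:change_type' for one row."""
    return str(row["__ondatra_rowid"]) + ":" + row["__ondatra_change_type"]

pre = {"__ondatra_rowid": 7, "__ondatra_change_type": "update_preimage"}
post = {"__ondatra_rowid": 7, "__ondatra_change_type": "update_postimage"}

# Same row id, two distinct keys: each image gets an independent status.
results = {result_key(pre): "ok", result_key(post): "error: rate limited"}
```

Because the change type is part of the key, a failed `update_postimage` can be retried without re-reporting the already-acknowledged `update_preimage` for the same row.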
## 2. Create a sync model
```sql
-- models/sync/hubspot.sql
-- @kind: merge
-- @unique_key: customer_id
-- @sink: hubspot_push
SELECT customer_id, email, name, plan
FROM mart.customers
WHERE active = true
```
`@sink` works with all model kinds except `events`.
## 3. Run
```shell
ondatrasql run
```
The first run syncs all rows (change type `insert`). Subsequent runs sync only changed rows: `merge` produces `insert` for new rows and `update_postimage` for modified rows.
## Change types per kind
Each kind produces different change types from DuckLake's `table_changes()`:
| Kind | Materialization | Change types produced |
|---|---|---|
| table | TRUNCATE + INSERT | delete (truncate) + insert (new rows) |
| append | INSERT | insert only |
| merge | MERGE INTO | insert, update_postimage (+ update_preimage) |
| tracked | DELETE + INSERT | delete + insert (your push groups by key) |
| scd2 | UPDATE (close) + INSERT (new version) | update_postimage + insert |
| partition | TRUNCATE partition + INSERT | delete + insert per partition |
Your Starlark `push()` function handles the mapping logic. For example, `tracked`'s DELETE+INSERT pattern means you group rows by the unique key to distinguish updates from real deletes.
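The grouping logic for `tracked` can be sketched as follows. This is an illustrative Python sketch, not runtime code; it assumes only the `__ondatra_change_type` field documented above, and the action labels (`update`, `remove`, `create`) are hypothetical names.

```python
def classify(rows, key_field):
    """Group tracked-model change rows by unique key and classify each key."""
    groups = {}
    for r in rows:
        groups.setdefault(r[key_field], set()).add(r["__ondatra_change_type"])
    actions = {}
    for k, types in groups.items():
        if "insert" in types and "delete" in types:
            actions[k] = "update"   # old rows deleted, new rows inserted
        elif "delete" in types:
            actions[k] = "remove"   # only deletes: the entity is gone
        else:
            actions[k] = "create"   # only inserts: a brand-new entity
    return actions

rows = [
    {"invoice_id": 1, "__ondatra_change_type": "delete"},
    {"invoice_id": 1, "__ondatra_change_type": "insert"},
    {"invoice_id": 2, "__ondatra_change_type": "delete"},
    {"invoice_id": 3, "__ondatra_change_type": "insert"},
]
# classify(rows, "invoice_id") → {1: "update", 2: "remove", 3: "create"}
```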
## Sync patterns
### Upsert (merge)

```sql
-- @kind: merge
-- @unique_key: customer_id
-- @sink: crm_push
SELECT customer_id, email, plan FROM mart.customers
```

Push receives `insert` for new rows and `update_postimage` for changed rows. Map them to POST and PATCH respectively.
### Group sync (tracked)

```sql
-- @kind: tracked
-- @unique_key: invoice_id
-- @sink: erp_push
SELECT invoice_id, line_item, quantity, price FROM mart.invoice_lines
```

Push receives `delete` + `insert` events. Group by `invoice_id`: if a key has both deletes and inserts, it's an update; if only deletes, the entity was removed.
### Append (new rows only)

```sql
-- @kind: append
-- @sink: slack_post
SELECT message, channel FROM mart.notifications
```

Push always receives `insert`: the data is append-only.
### Full replace (table)

```sql
-- @kind: table
-- @sink: sheets_push
SELECT region, revenue FROM mart.regional_summary
```

Push receives `delete` events (from the truncate) and `insert` events (the new data). For a simple full replace, filter to inserts only.
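For the full-replace case, the filter is a one-liner. A Python sketch, assuming only the `__ondatra_change_type` field from this page (the sample rows are illustrative):

```python
def inserts_only(rows):
    """Drop truncate-generated deletes; keep only the fresh insert rows."""
    return [r for r in rows if r["__ondatra_change_type"] == "insert"]

rows = [
    {"__ondatra_change_type": "delete", "region": "EU"},  # from the truncate
    {"__ondatra_change_type": "insert", "region": "EU"},  # fresh data
    {"__ondatra_change_type": "insert", "region": "US"},
]
# inserts_only(rows) keeps the two insert rows, which replace the
# destination's contents wholesale.
```

Remember to still return an `"ok"` status for the filtered `delete` keys so the runtime does not retry them.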
### Version history (scd2)

```sql
-- @kind: scd2
-- @unique_key: product_id
-- @sink: archive_push
SELECT product_id, name, price FROM mart.products
```

Push receives `update_postimage` (the closed version, with `valid_to_snapshot` now set) and `insert` (the new current version).
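An scd2 sink therefore routes on the change type. A hypothetical Python sketch: the field name comes from this page, while the action labels are illustrative placeholders for whatever your archive API does.

```python
def route(row):
    """Decide what an scd2 archive sink does with one change row."""
    ct = row["__ondatra_change_type"]
    if ct == "update_postimage":
        return "close-old-version"   # the old version's valid_to_snapshot is now set
    if ct == "insert":
        return "write-new-version"   # the new current version of the entity
    return "skip"                    # other change types do not occur for scd2
```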
## Next steps

- Push Contract for internal fields, return values, batch modes, and crash recovery
- API Dict for all push configuration options
- Create a Lib Function for the full blueprint walkthrough
- Directives for all `@sink` directives