Sync Data to External APIs


Push data to external systems using @sink. The runtime handles batching, rate limiting, per-row tracking, and crash recovery.

1. Create a lib function

Create a .star file in lib/ with an API dict and a push() function:

# lib/hubspot_push.star
API = {
    "base_url": "https://api.hubapi.com",
    "auth": {"env": "HUBSPOT_KEY"},
    "retry": 3,
    "rate_limit": {"requests": 100, "per": "10s"},
    "push": {
        "batch_size": 100,
        "batch_mode": "sync",
    },
}

def push(rows):
    # Return one status per row, keyed "rowid:change_type".
    results = {}
    for r in rows:
        ct = r["__ondatra_change_type"]
        key = str(r["__ondatra_rowid"]) + ":" + ct

        if ct == "update_preimage":
            # Old values of an updated row; the postimage carries the new state.
            results[key] = "ok"
            continue

        # Drop internal fields (they start with "_") before sending.
        payload = {k: v for k, v in r.items() if not k.startswith("_")}
        path = "/crm/v3/objects/contacts"

        # HubSpot's CRM v3 object endpoints expect fields under "properties".
        if ct == "insert":
            resp = http.post(path, json={"properties": payload})
        elif ct == "update_postimage":
            resp = http.patch(path + "/" + str(r["customer_id"]), json={"properties": payload})
        elif ct == "delete":
            resp = http.delete(path + "/" + str(r["customer_id"]))
        else:
            # Without this branch, resp would be undefined for an unknown type.
            results[key] = "error: unknown change type " + ct
            continue

        results[key] = "ok" if resp.ok else "error: " + resp.text
    return results

Each row includes two internal fields:

  • __ondatra_rowid — stable DuckLake row identifier
  • __ondatra_change_type — raw DuckLake change type: insert, update_preimage, update_postimage, or delete

push() returns one status per row it receives ("ok" or an "error: ..." string in this example), keyed by the composite "rowid:change_type". The composite key lets update_preimage and update_postimage for the same row carry independent statuses. See the Push Contract for the complete reference.
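Because `http` is supplied by the runtime, push() cannot be called as-is outside OndatraSQL. One way to check the status-per-row contract locally is the plain-Python sketch below. The push() body runs unchanged as Python; `FakeHttp` and `FakeResponse` are hypothetical stand-ins that assume only the `post`/`patch`/`delete` calls and the `.ok`/`.text` attributes used in the example, and the rows are made-up samples. Note that rowid 7 yields two distinct keys, and that a failing request affects only its own row.

```python
class FakeResponse:
    """Hypothetical stand-in exposing the .ok and .text attributes push() reads."""

    def __init__(self, ok, text=""):
        self.ok = ok
        self.text = text


class FakeHttp:
    """Hypothetical stand-in for the runtime's http module.

    Records every call and answers ok, unless the path is in fail_paths.
    """

    def __init__(self, fail_paths=()):
        self.calls = []
        self.fail_paths = set(fail_paths)

    def _send(self, method, path, json=None):
        self.calls.append((method, path, json))
        if path in self.fail_paths:
            return FakeResponse(False, "404 Not Found")
        return FakeResponse(True)

    def post(self, path, json=None):
        return self._send("POST", path, json)

    def patch(self, path, json=None):
        return self._send("PATCH", path, json)

    def delete(self, path):
        return self._send("DELETE", path)


http = FakeHttp(fail_paths=["/crm/v3/objects/contacts/99"])


def push(rows):
    # Same logic as lib/hubspot_push.star.
    results = {}
    for r in rows:
        ct = r["__ondatra_change_type"]
        key = str(r["__ondatra_rowid"]) + ":" + ct
        if ct == "update_preimage":
            results[key] = "ok"
            continue
        payload = {k: v for k, v in r.items() if not k.startswith("_")}
        path = "/crm/v3/objects/contacts"
        if ct == "insert":
            resp = http.post(path, json={"properties": payload})
        elif ct == "update_postimage":
            resp = http.patch(path + "/" + str(r["customer_id"]), json={"properties": payload})
        elif ct == "delete":
            resp = http.delete(path + "/" + str(r["customer_id"]))
        else:
            results[key] = "error: unknown change type " + ct
            continue
        results[key] = "ok" if resp.ok else "error: " + resp.text
    return results


# Made-up sample batch: one update (pre + post image), one insert, one failing update.
rows = [
    {"__ondatra_rowid": 7, "__ondatra_change_type": "update_preimage",
     "customer_id": 42, "email": "old@example.com"},
    {"__ondatra_rowid": 7, "__ondatra_change_type": "update_postimage",
     "customer_id": 42, "email": "new@example.com"},
    {"__ondatra_rowid": 8, "__ondatra_change_type": "insert",
     "customer_id": 43, "email": "a@example.com"},
    {"__ondatra_rowid": 9, "__ondatra_change_type": "update_postimage",
     "customer_id": 99, "email": "b@example.com"},
]
results = push(rows)
print(results)
# {'7:update_preimage': 'ok', '7:update_postimage': 'ok', '8:insert': 'ok', '9:update_postimage': 'error: 404 Not Found'}
```

The preimage row produces no request, so only three calls are recorded.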

2. Create a sync model

-- models/sync/hubspot.sql
-- @kind: merge
-- @unique_key: customer_id
-- @sink: hubspot_push

SELECT customer_id, email, name, plan
FROM mart.customers
WHERE active = true

@sink works with all model kinds except events.

3. Run

ondatrasql run

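The first run syncs every row with change type insert. Subsequent runs sync only changed rows: merge produces insert for new rows and update_postimage for modified rows. Each modified row also arrives as an update_preimage, which the push function above acknowledges without an API call.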

Change types per kind

Each kind produces different change types from DuckLake’s table_changes():

| Kind | Materialization | Change types produced |
|---|---|---|
| table | TRUNCATE + INSERT | delete (truncate) + insert (new rows) |
| append | INSERT | insert only |
| merge | MERGE INTO | insert, update_postimage (+ update_preimage) |
| tracked | DELETE + INSERT | delete + insert (your push groups by key) |
| scd2 | UPDATE (close) + INSERT (new version) | update_postimage + insert |
| partition | TRUNCATE partition + INSERT | delete + insert per partition |

Interpreting these change types is up to your Starlark push function. For example, because tracked rewrites a changed key as DELETE + INSERT, your push must group rows by unique key to tell updates apart from genuine deletes.

Sync patterns

Upsert (merge)

-- @kind: merge
-- @unique_key: customer_id
-- @sink: crm_push
SELECT customer_id, email, plan FROM mart.customers

Push receives insert for new rows, update_postimage for changed rows. Map to POST and PATCH respectively.

Group sync (tracked)

-- @kind: tracked
-- @unique_key: invoice_id
-- @sink: erp_push
SELECT invoice_id, line_item, quantity, price FROM mart.invoice_lines

Push receives delete and insert rows. Group them by invoice_id: if a key has both deletes and inserts, it is an update; if it has only deletes, the entity was removed.
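The grouping rule above can be sketched in plain Python as follows. `group_by_key`, `classify`, `push_tracked` and the `send` callback are hypothetical names. Two assumptions are this sketch's own: a key with only inserts is treated as a newly created entity, and all rows for one key are assumed to arrive in the same push call (if batching split a group, a key could be misclassified). Each key's outcome is copied to every row in its group, so every row still gets a "rowid:change_type" status.

```python
def group_by_key(rows, key_field):
    """Bucket tracked change rows by unique key into deletes and inserts."""
    groups = {}
    for r in rows:
        ct = r["__ondatra_change_type"]
        if ct not in ("delete", "insert"):
            raise ValueError("unexpected change type for tracked: " + ct)
        g = groups.setdefault(r[key_field], {"delete": [], "insert": []})
        g[ct].append(r)
    return groups


def classify(group):
    """Deletes + inserts = update; deletes only = removed; inserts only = new."""
    if group["delete"] and group["insert"]:
        return "update"
    if group["delete"]:
        return "delete"
    return "create"


def push_tracked(rows, send, key_field="invoice_id"):
    """Call send(action, key, new_rows) once per key; fan its status out to rows."""
    results = {}
    for key, group in group_by_key(rows, key_field).items():
        status = send(classify(group), key, group["insert"])
        for r in group["delete"] + group["insert"]:
            results[str(r["__ondatra_rowid"]) + ":" + r["__ondatra_change_type"]] = status
    return results


# Made-up batch: invoice A changed, B removed, C new.
rows = [
    {"__ondatra_rowid": 1, "__ondatra_change_type": "delete", "invoice_id": "A", "line_item": "x"},
    {"__ondatra_rowid": 5, "__ondatra_change_type": "insert", "invoice_id": "A", "line_item": "x"},
    {"__ondatra_rowid": 2, "__ondatra_change_type": "delete", "invoice_id": "B", "line_item": "y"},
    {"__ondatra_rowid": 6, "__ondatra_change_type": "insert", "invoice_id": "C", "line_item": "z"},
]

sent = []

def fake_send(action, key, new_rows):
    # Hypothetical sender; a real one would call the ERP API here.
    sent.append((action, key, len(new_rows)))
    return "ok"

results = push_tracked(rows, fake_send)
print(sent)  # [('update', 'A', 1), ('delete', 'B', 0), ('create', 'C', 1)]
```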

Append (new rows only)

-- @kind: append
-- @sink: slack_post
SELECT message, channel FROM mart.notifications

Push only ever receives insert rows, because append never modifies or removes existing data.
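As a safeguard, a push for an append model can flag any non-insert row as an error instead of dropping it silently. The plain-Python sketch below builds request bodies for Slack's `chat.postMessage` method, mapping the model's `channel` and `message` columns onto its `channel` and `text` arguments. The function name is hypothetical, reporting unexpected change types as errors is this sketch's choice, and the HTTP call itself (the runtime's `http` module) is left out.

```python
def build_slack_payloads(rows):
    """Return (payloads, results) for an append model's push batch.

    payloads is a list of (status_key, chat.postMessage body) pairs still to
    send; results already holds statuses for rows that will not be sent.
    """
    payloads = []
    results = {}
    for r in rows:
        ct = r["__ondatra_change_type"]
        key = str(r["__ondatra_rowid"]) + ":" + ct
        if ct != "insert":
            # Append should never produce this; surface it rather than hide it.
            results[key] = "error: append model produced " + ct
            continue
        payloads.append((key, {"channel": r["channel"], "text": r["message"]}))
    return payloads, results


rows = [
    {"__ondatra_rowid": 1, "__ondatra_change_type": "insert",
     "message": "Deploy finished", "channel": "#ops"},
]
payloads, results = build_slack_payloads(rows)
print(payloads)  # [('1:insert', {'channel': '#ops', 'text': 'Deploy finished'})]
```

After posting each body, the real push would record "ok" or an error under its status key.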

Full replace (table)

-- @kind: table
-- @sink: sheets_push
SELECT region, revenue FROM mart.regional_summary

Push receives delete rows (from the truncate) and insert rows (the new data). For a simple full replace, send only the inserts and acknowledge the deletes without an API call.
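A plain-Python sketch of the inserts-only approach, shaping the new rows into a header-plus-rows grid of the kind a spreadsheet overwrite expects. `split_full_replace` is a hypothetical name. The sketch assumes the whole table arrives in a single push call; if batch_size splits it across calls, overwriting the destination on every call would keep only the last batch.

```python
def split_full_replace(rows, columns):
    """Keep only insert rows for a full overwrite of the destination.

    Returns (values, statuses): values is a header row followed by one list
    per inserted row; statuses acknowledges each truncate delete with "ok"
    and flags any other unexpected change type.
    """
    values = [list(columns)]
    statuses = {}
    for r in rows:
        ct = r["__ondatra_change_type"]
        key = str(r["__ondatra_rowid"]) + ":" + ct
        if ct == "insert":
            values.append([r[c] for c in columns])
        elif ct == "delete":
            statuses[key] = "ok"  # old contents are overwritten anyway
        else:
            statuses[key] = "error: unexpected change type " + ct
    return values, statuses


rows = [
    {"__ondatra_rowid": 1, "__ondatra_change_type": "delete", "region": "EU", "revenue": 100},
    {"__ondatra_rowid": 3, "__ondatra_change_type": "insert", "region": "EU", "revenue": 120},
    {"__ondatra_rowid": 4, "__ondatra_change_type": "insert", "region": "US", "revenue": 90},
]
values, statuses = split_full_replace(rows, ["region", "revenue"])
print(values)    # [['region', 'revenue'], ['EU', 120], ['US', 90]]
print(statuses)  # {'1:delete': 'ok'}
```

The insert rows still need statuses; a real push would set them once the write to the destination succeeds.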

Version history (scd2)

-- @kind: scd2
-- @unique_key: product_id
-- @sink: archive_push
SELECT product_id, name, price FROM mart.products

Push receives update_postimage (closed version with valid_to_snapshot set) and insert (new current version).
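A plain-Python sketch that separates the two kinds of rows before archiving them. `split_scd2` is a hypothetical name. Acknowledging an update_preimage with "ok" (in case one is delivered for the closing UPDATE) and flagging any other row as an error are this sketch's own choices, not documented behavior.

```python
def split_scd2(rows):
    """Split scd2 change rows into closed versions and new current versions.

    Returns (closed, current, statuses). statuses covers only rows that are
    not archived; archived rows get their status after the API call.
    """
    closed = []
    current = []
    statuses = {}
    for r in rows:
        ct = r["__ondatra_change_type"]
        key = str(r["__ondatra_rowid"]) + ":" + ct
        if ct == "update_postimage" and r.get("valid_to_snapshot") is not None:
            closed.append(r)   # version that was just closed
        elif ct == "insert":
            current.append(r)  # new current version
        elif ct == "update_preimage":
            statuses[key] = "ok"
        else:
            statuses[key] = "error: unexpected scd2 row " + ct
    return closed, current, statuses


rows = [
    {"__ondatra_rowid": 10, "__ondatra_change_type": "update_postimage",
     "product_id": 1, "price": 9.99, "valid_to_snapshot": 5},
    {"__ondatra_rowid": 11, "__ondatra_change_type": "insert",
     "product_id": 1, "price": 12.5, "valid_to_snapshot": None},
]
closed, current, statuses = split_scd2(rows)
print([r["__ondatra_rowid"] for r in closed], [r["__ondatra_rowid"] for r in current])  # [10] [11]
```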

Next steps