Sync Data to External APIs

Push data to external systems using @sink. The runtime handles batching, rate limiting, per-row tracking, and crash recovery.

1. Create a lib function

Create a .star file in lib/ with an API dict and a push() function:

# lib/hubspot_push.star
API = {
    "base_url": "https://api.hubapi.com",
    "auth": {"env": "HUBSPOT_KEY"},
    "retry": 3,
    "rate_limit": {"requests": 100, "per": "10s"},
    "push": {
        "batch_size": 100,
        "batch_mode": "sync",
    },
}

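def push(rows=[], batch_number=1):
    # Maps "rowid:change_type" -> "ok" or "error: ..." for every row in the batch.
    results = {}
    for r in rows:
        ct = r["__ondatra_change_type"]
        key = str(r["__ondatra_rowid"]) + ":" + ct
        # Drop internal fields (names starting with "_") from the API payload.
        payload = {k: v for k, v in r.items() if not k.startswith("_")}

        if ct == "insert":
            resp = http.post("/crm/v3/objects/contacts", json=payload)
        elif ct == "update_postimage":
            resp = http.patch("/crm/v3/objects/contacts/" + str(r["customer_id"]), json=payload)
        elif ct == "delete":
            resp = http.delete("/crm/v3/objects/contacts/" + str(r["customer_id"]))
        elif ct == "update_preimage":
            results[key] = "ok"  # pre-update snapshot, nothing to send
            continue
        else:
            # Unexpected change type: report it rather than reading an unset resp.
            results[key] = "error: unexpected change type " + ct
            continue

        results[key] = "ok" if resp.ok else "error: " + resp.text
    return results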

Each row includes two internal fields:

  • __ondatra_rowid — stable DuckLake row identifier
  • __ondatra_change_type — raw DuckLake change type: insert, update_preimage, update_postimage, or delete

Return keys are composite: "rowid:change_type". This ensures that update_preimage and update_postimage for the same row can have independent statuses. See the Push Contract for the complete reference.
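As a small illustration of the composite keys, the sketch below builds the result key for the pre-image and post-image of one updated row. The helper name result_key and the sample rows are hypothetical; only the two __ondatra_ field names and the "rowid:change_type" shape come from the text above. It is written in the Python-compatible subset of Starlark.

```python
def result_key(row):
    # Composite key in the form "<rowid>:<change_type>".
    return str(row["__ondatra_rowid"]) + ":" + row["__ondatra_change_type"]

# Hypothetical sample: one updated row arrives as two change records
# that share the same rowid.
sample_rows = [
    {"__ondatra_rowid": 7, "__ondatra_change_type": "update_preimage",
     "customer_id": 42, "plan": "free"},
    {"__ondatra_rowid": 7, "__ondatra_change_type": "update_postimage",
     "customer_id": 42, "plan": "pro"},
]

# Each record gets its own status entry even though the rowid is shared.
sample_results = {result_key(r): "ok" for r in sample_rows}
```

Keying on the rowid alone would collapse these two records into a single entry, which is the collision the composite key avoids.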

2. Create a sync model

-- models/sync/hubspot.sql
-- @kind: merge
-- @unique_key: customer_id
-- @sink: hubspot_push

SELECT customer_id, email, name, plan
FROM mart.customers
WHERE active = true

@sink works with table, append, merge, and tracked kinds.

3. Run

ondatrasql run

The first run syncs every row with change type insert. Subsequent runs sync only changed rows: for this merge model, new rows arrive as insert and modified rows as update_postimage.

Change types per kind

Each kind produces different change types from DuckLake’s table_changes():

Kind    | Materialization   | Change types produced
table   | TRUNCATE + INSERT | delete (truncate) + insert (new rows)
append  | INSERT            | insert only
merge   | MERGE INTO        | insert, update_postimage (+ update_preimage)
tracked | DELETE + INSERT   | delete + insert (your push groups by key)

scd2 is not supported with @sink — use @kind: table with WHERE is_current = true to push current state.

Your Starlark push function decides what each change type means for the target API. For example, because tracked materializes as DELETE + INSERT, push has to group rows by key to distinguish updates from real deletes.

Sync patterns

Upsert (merge)

-- @kind: merge
-- @unique_key: customer_id
-- @sink: crm_push
SELECT customer_id, email, plan FROM mart.customers

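Push receives insert for new rows and update_postimage for changed rows; map them to POST and PATCH respectively. Any update_preimage rows can be acknowledged with "ok" and skipped, as in the push() example in step 1.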
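One way to keep the POST/PATCH mapping in one place is a small lookup table. The sketch below is an assumption-laden illustration, not part of the runtime: METHOD_FOR_CHANGE, plan_request, and its parameters are hypothetical names, and it only plans the request instead of calling http.

```python
# Hypothetical mapping from merge change types to HTTP methods.
METHOD_FOR_CHANGE = {
    "insert": "POST",
    "update_postimage": "PATCH",
}

def plan_request(row, base_path, id_field):
    """Return (method, path) for a row, or None when there is nothing to send."""
    method = METHOD_FOR_CHANGE.get(row["__ondatra_change_type"])
    # Starlark has no `is` operator, so compare against None with ==.
    if method == None:
        return None  # e.g. update_preimage: acknowledge and skip
    if method == "POST":
        return (method, base_path)
    # PATCH addresses an existing object, so append its identifier.
    return (method, base_path + "/" + str(row[id_field]))
```

For a changed row with customer_id 42, plan_request(row, "/crm/v3/objects/contacts", "customer_id") yields ("PATCH", "/crm/v3/objects/contacts/42"); the push function would then issue the matching http call and record the status under the row's composite key.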

Group sync (tracked)

-- @kind: tracked
-- @group_key: invoice_id
-- @sink: erp_push
SELECT invoice_id, line_item, quantity, price FROM mart.invoice_lines

Push receives delete + insert events. Group by invoice_id: if a key has both deletes and inserts, it's an update; if it has only deletes, the entity was removed; if it has only inserts, the entity is new.
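The grouping rule can be sketched as a pure helper that runs before any API call. The function name classify_groups and the action labels "create", "update", and "remove" are hypothetical; the code uses only the Python-compatible subset of Starlark.

```python
def classify_groups(rows, group_key):
    """Map each group key to "create", "update", or "remove"."""
    # Bucket rows per key by change type.
    groups = {}
    for r in rows:
        k = r[group_key]
        if k not in groups:
            groups[k] = {"insert": [], "delete": []}
        ct = r["__ondatra_change_type"]
        if ct in groups[k]:
            groups[k][ct].append(r)

    actions = {}
    for k, g in groups.items():
        if g["insert"] and g["delete"]:
            actions[k] = "update"   # old lines deleted, new lines inserted
        elif g["insert"]:
            actions[k] = "create"   # key seen for the first time
        elif g["delete"]:
            actions[k] = "remove"   # key disappeared from the model
    return actions
```

With the invoice model above, a call such as classify_groups(rows, "invoice_id") gives push one decision per invoice, after which it can send the full set of inserted lines for each "create" or "update" key.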

Append (new rows only)

-- @kind: append
-- @sink: slack_post
SELECT message, channel FROM mart.notifications

Push always receives insert — append-only data.

Full replace (table)

-- @kind: table
-- @sink: sheets_push
SELECT region, revenue FROM mart.regional_summary

Push receives delete events (from truncate) and insert events (new data). For a simple full replace, filter to inserts only.
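A minimal sketch of the inserts-only filter follows. It assumes that delete rows should still be acknowledged with "ok", mirroring how the step 1 example handles skipped update_preimage rows; check the Push Contract for what the runtime actually expects. The name split_full_replace is hypothetical.

```python
def split_full_replace(rows):
    """Separate rows to send from rows that only need an acknowledgement."""
    results = {}   # statuses decided without an API call
    to_send = []   # (result key, row) pairs that still need pushing
    for r in rows:
        ct = r["__ondatra_change_type"]
        key = str(r["__ondatra_rowid"]) + ":" + ct
        if ct == "insert":
            to_send.append((key, r))
        elif ct == "delete":
            results[key] = "ok"  # truncated row: nothing to send
        else:
            results[key] = "error: unexpected change type " + ct
    return to_send, results
```

The push function would then write the rows in to_send to the target (for example, clearing and refilling a sheet) and add one status per key to results before returning it.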
