Create a Lib Function

On this page

How to create a lib function that fetches data from an API or pushes data to an external system. Follow the three-layer pattern: API dict declares the contract, Starlark handles I/O, SQL handles transformation.

1. Create the file

mkdir -p lib
touch lib/my_api.star

The file name becomes the function name in SQL: FROM my_api(...) or @sink: my_api.

2. Define the API dict

Start with base_url and auth. All values must be literals — the runtime parses the dict as AST without executing code.

API = {
    "base_url": "https://api.example.com",
    "auth": {"env": "MY_API_KEY"},
    "retry": 3,
    "backoff": 2,
    "rate_limit": {"requests": 100, "per": "10s"},
}

Auth patterns:

"auth": {"env": "API_KEY"}                                           # API key → Bearer header
"auth": {"env": "API_KEY", "header": "X-Api-Key"}                    # API key → custom header
"auth": {"provider": "hubspot"}                                      # OAuth2 (run ondatrasql auth hubspot first)
"auth": {"user": {"env": "USER"}, "pass": {"env": "PASS"}}           # Basic auth
"auth": {"service_account": {"env": "KEY_FILE"}, "scope": "..."}     # Google service account

See API Dict Reference for all auth patterns.

3. Add a fetch section (inbound)

API = {
    "base_url": "https://api.example.com",
    "auth": {"env": "MY_API_KEY"},
    "fetch": {
        "args": ["resource"],
        "page_size": 100,
    },
}
  • args — parameter names passed from SQL: FROM my_api('users')resource = "users"
  • page_size — enables pagination (runtime manages the loop)

SQL controls the schema. Column names and normalized types are extracted from your SELECT casts and passed to fetch() as the columns kwarg.

4. Write the fetch function

Starlark does I/O only — call the API, parse the response, return rows. Declare only the parameters you need — the runtime filters kwargs automatically.

def fetch(resource, page):
    resp = http.get("/v1/" + resource, params={
        "limit": page.size,
        "cursor": page.cursor,
    })
    if not resp.ok:
        fail("API error: " + str(resp.status_code) + " " + resp.text)

    data = resp.json
    return {
        "rows": data["items"],
        "next": data.get("next_cursor"),
    }
  • page.cursor is None on first call, then whatever you returned as next
  • page.size comes from page_size in the API dict
  • Return None for next to stop pagination
  • http.get uses base_url and auth from the API dict automatically

5. Create SQL models

Use the two-model pattern — raw fetches, staging transforms:

Raw model — column names match the API:

-- models/raw/users.sql
-- @kind: append
-- @incremental: updated_at

SELECT id, name, email, updated_at, metadata::JSON AS metadata
FROM my_api('users')

SQL casts flow to the blueprint as normalized types: ::JSON becomes "json", ::DECIMAL becomes "decimal". Columns without casts default to "string".

Staging model — SQL transforms, casts types, expands JSON:

-- models/staging/users.sql
-- @kind: table

SELECT
    id::BIGINT AS id,
    name,
    email,
    updated_at::TIMESTAMP AS updated_at,
    metadata->>'$.city' AS city,
    (metadata->>'$.score')::INTEGER AS score
FROM raw.users

Don’t transform in the raw model. Don’t call APIs in the staging model. Each layer does one thing.

6. Run and verify

# First run — backfill
ondatrasql run

# Check data
ondatrasql sql "SELECT * FROM staging.users LIMIT 5"

# Second run — only new data
ondatrasql run

7. Add a push section (outbound)

SQL controls what gets pushed. Starlark controls how:

API = {
    "base_url": "https://api.example.com",
    "auth": {"env": "MY_API_KEY"},
    "push": {
        "batch_size": 100,
        "batch_mode": "sync",
        "rate_limit": {"requests": 50, "per": "10s"},
    },
}

def push(rows=[], batch_number=1):
    results = {}
    for r in rows:
        ct = r["__ondatra_change_type"]
        key = str(r["__ondatra_rowid"]) + ":" + ct
        payload = {k: v for k, v in r.items() if not k.startswith("_")}

        if ct == "insert":
            resp = http.post("/v1/contacts", json=payload)
        elif ct == "update_postimage":
            resp = http.patch("/v1/contacts/" + str(r["id"]), json=payload)
        elif ct == "delete":
            resp = http.delete("/v1/contacts/" + str(r["id"]))
        elif ct == "update_preimage":
            results[key] = "ok"
            continue

        results[key] = "ok" if resp.ok else "error: " + resp.text
    return results

SQL model with @sink:

-- models/sync/contacts.sql
-- @kind: merge
-- @unique_key: id
-- @sink: my_api

SELECT id, name, email FROM staging.contacts

SQL can build nested JSON for APIs that expect structured payloads:

SELECT id, json_object('name', name, 'email', email, 'tags', json_group_array(tag)) AS properties
FROM staging.contacts JOIN staging.tags USING (id)
GROUP BY id, name, email

Common patterns

Incremental fetch

-- @kind: append
-- @incremental: updated_at
SELECT * FROM my_api('events')
def fetch(resource, page, is_backfill=True, last_value=""):
    params = {"limit": page.size, "cursor": page.cursor}
    if not is_backfill:
        params["since"] = last_value
    resp = http.get("/v1/" + resource, params=params)
    return {"rows": resp.json["items"], "next": resp.json.get("next")}

Dict cursor for complex pagination state

When you need to carry multiple values between pages, return a dict — it passes through directly:

next_cursor = {"url": fetch_url, "token": next_token, "series_idx": 3}
# ...
series_idx = page.cursor["series_idx"]

Options via JSON arg

For API-specific configuration beyond columns:

FROM my_api('users', '{"filter": "active", "fields": ["name", "email"]}')
API = {"fetch": {"args": ["resource", "options"]}}

def fetch(resource, options="", page=None):
    opts = json.decode(options) if options else {}

Async fetch (report-style APIs)

For APIs that create reports asynchronously:

API = {
    "fetch": {
        "async": True,
        "poll_interval": "5s",
        "poll_timeout": "5m",
        "poll_backoff": 2,
    },
}

def submit(columns=[], is_backfill=True, last_value=""):
    resp = http.post("/reports", json={"columns": [c["name"] for c in columns]})
    return {"job_id": resp.json["id"]}

def check(job_ref):
    resp = http.get("/reports/" + job_ref["job_id"])
    if resp.json["status"] == "complete":
        return {"url": resp.json["result_url"]}
    return None  # keep polling

def fetch_result(result_ref, page):
    resp = http.get(result_ref["url"] + "?page=" + str(page.cursor or 0))
    return {"rows": resp.json["data"], "next": resp.json.get("next_page")}

Finalize callback

def finalize(succeeded, failed):
    if failed == 0:
        http.post("/v1/webhooks", json={"event": "sync_complete", "rows": succeeded})

Reference