Create a Lib Function

How to create a lib function that fetches data from an API or pushes data to an external system. Follow the three-layer pattern: API dict declares the contract, Starlark handles I/O, SQL handles transformation.

1. Create the file

mkdir -p lib
touch lib/my_api.star

The file name becomes the function name in SQL: FROM my_api(...) or @sink: my_api.

2. Define the API dict

Start with base_url and auth. All values must be literals — the runtime parses the dict as AST without executing code.

API = {
    "base_url": "https://api.example.com",
    "auth": {"env": "MY_API_KEY"},
    "retry": 3,
    "backoff": 2,
    "rate_limit": {"requests": 100, "per": "10s"},
}
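As a rough sketch of what the rate_limit setting implies, assuming the runtime spaces requests evenly (the actual scheduling strategy is not specified here):

```python
# Sketch only: what {"requests": 100, "per": "10s"} implies if calls are
# spaced evenly across the window. Illustrative, not the runtime's code.
requests, per_seconds = 100, 10
min_interval = per_seconds / requests  # seconds between consecutive calls
```

With retry: 3 and backoff: 2, failed requests are retried; the exact backoff schedule is up to the runtime.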

Auth patterns:

"auth": {"env": "API_KEY"}                                # API key → Bearer header
"auth": {"env": "API_KEY", "header": "X-Api-Key"}         # API key → custom header
"auth": {"provider": "hubspot"}                            # OAuth2 (run ondatrasql auth hubspot first)
"auth": {"env_user": "USER", "env_pass": "PASS"}          # Basic auth
"auth": {"google_key_file_env": "KEY_FILE", "scope": "..."} # Google service account from .env

See API Dict Reference for all auth patterns.
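The env patterns read credentials from environment variables, typically supplied via a .env file. A hypothetical .env matching the patterns above (variable names must match the "env" keys in your API dict; values here are placeholders):

```shell
# Hypothetical .env entries for the auth patterns shown above
MY_API_KEY=sk-live-example
USER=alice
PASS=secret
```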

3. Add a fetch section (inbound)

API = {
    "base_url": "https://api.example.com",
    "auth": {"env": "MY_API_KEY"},
    "fetch": {
        "args": ["resource"],
        "page_size": 100,
        "dynamic_columns": True,
    },
}
  • args — parameter names passed from SQL: FROM my_api('users') → resource = "users"
  • page_size — enables pagination (runtime manages the loop)
  • dynamic_columns: True — SQL controls the schema. Column names and types are extracted from your SELECT casts and passed to fetch() as the columns kwarg
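With dynamic_columns enabled, the casts in your SELECT arrive in fetch() as the columns kwarg. A sketch of what that can look like, shown as Python (which Starlark closely mirrors); the exact shape of the columns mapping is illustrative:

```python
# Illustrative: SELECT id, metadata::JSON FROM my_api('users') might surface
# in fetch() roughly as columns={"id": "VARCHAR", "metadata": "JSON"}.
def fetch(resource, page, columns=None):
    wanted = list(columns) if columns else []
    # Return one placeholder row shaped by the requested columns
    return {"rows": [{c: None for c in wanted}], "next": None}

result = fetch("users", None, columns={"id": "VARCHAR", "metadata": "JSON"})
```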

4. Write the fetch function

Starlark does I/O only — call the API, parse the response, return rows:

def fetch(resource, page):
    resp = http.get("/v1/" + resource, params={
        "limit": page.size,
        "cursor": page.cursor,
    })
    if not resp.ok:
        fail("API error: " + str(resp.status_code) + " " + resp.text)

    data = resp.json
    return {
        "rows": data["items"],
        "next": data.get("next_cursor"),
    }
  • page.cursor is None on first call, then whatever you returned as next
  • page.size comes from page_size in the API dict
  • Return None for next to stop pagination
  • http.get uses base_url and auth from the API dict automatically
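The runtime drives fetch() in a loop roughly like the following. This is a Python simulation of the contract described above, not the actual runtime code:

```python
# Simulated pagination contract: the runtime calls fetch() with a page object,
# feeds the returned "next" value back as the cursor, and stops on None.
class Page:
    def __init__(self, size, cursor):
        self.size = size
        self.cursor = cursor

def run_fetch(fetch, size):
    rows, cursor = [], None   # cursor is None on the first call
    while True:
        result = fetch(Page(size, cursor))
        rows.extend(result["rows"])
        cursor = result.get("next")
        if cursor is None:    # returning None for "next" stops pagination
            break
    return rows

# Fake two-page API: first call returns a cursor, second call ends the loop.
def fake_fetch(page):
    if page.cursor is None:
        return {"rows": [1, 2], "next": "c2"}
    return {"rows": [3], "next": None}
```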

5. Create SQL models

Use the two-model pattern — raw fetches, staging transforms:

Raw model — column names match the API:

-- models/raw/users.sql
-- @kind: append
-- @incremental: updated_at

SELECT id, name, email, updated_at, metadata::JSON AS metadata
FROM my_api('users')

SQL casts flow to the blueprint: ::JSON tells the API to return structured data, ::DECIMAL tells it to return numbers. Columns without casts default to strings.

Staging model — SQL transforms, casts types, expands JSON:

-- models/staging/users.sql
-- @kind: table

SELECT
    id::BIGINT AS id,
    name,
    email,
    updated_at::TIMESTAMP AS updated_at,
    metadata->>'$.city' AS city,
    (metadata->>'$.score')::INTEGER AS score
FROM raw.users

Don’t transform in the raw model. Don’t call APIs in the staging model. Each layer does one thing.

6. Run and verify

# First run — backfill
ondatrasql run

# Check data
ondatrasql sql "SELECT * FROM staging.users LIMIT 5"

# Second run — only new data
ondatrasql run

7. Add a push section (outbound)

SQL controls what gets pushed. Starlark controls how:

API = {
    "base_url": "https://api.example.com",
    "auth": {"env": "MY_API_KEY"},
    "push": {
        "batch_size": 100,
        "batch_mode": "sync",
        "rate_limit": {"requests": 50, "per": "10s"},
    },
}

def push(rows):
    results = {}
    for r in rows:
        ct = r["__ondatra_change_type"]
        key = str(r["__ondatra_rowid"]) + ":" + ct
        # Drop the runtime's metadata columns before building the payload
        payload = {k: v for k, v in r.items() if not k.startswith("__ondatra_")}

        if ct == "insert":
            resp = http.post("/v1/contacts", json=payload)
        elif ct == "update_postimage":
            resp = http.patch("/v1/contacts/" + str(r["id"]), json=payload)
        elif ct == "delete":
            resp = http.delete("/v1/contacts/" + str(r["id"]))
        elif ct == "update_preimage":
            results[key] = "ok"  # preimage carries the old values; nothing to send
            continue
        else:
            results[key] = "error: unknown change type " + ct
            continue

        results[key] = "ok" if resp.ok else "error: " + resp.text
    return results
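The metadata filtering can be exercised in isolation. A Python sketch; Starlark dict comprehensions behave the same way:

```python
# Strip the runtime's __ondatra_* metadata columns before building the payload.
def to_payload(row):
    return {k: v for k, v in row.items() if not k.startswith("__ondatra_")}

row = {"__ondatra_change_type": "insert", "__ondatra_rowid": 7,
       "id": 1, "name": "Ada"}
```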

SQL model with @sink:

-- models/sync/contacts.sql
-- @kind: merge
-- @unique_key: id
-- @sink: my_api

SELECT id, name, email FROM staging.contacts

SQL can build nested JSON for APIs that expect structured payloads:

SELECT id, json_object('name', name, 'email', email, 'tags', json_group_array(tag)) AS properties
FROM staging.contacts JOIN staging.tags USING (id)
GROUP BY id, name, email
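Assuming the sink serializes such a column as a JSON string (an assumption; the runtime may decode it for you), push() can recover the structure with json.decode. Python's json module mirrors that here:

```python
import json

# Assumed: the SQL json_object(...) column reaches push() as a JSON string.
row = {"id": 1,
       "properties": '{"name": "Ada", "email": "ada@example.com", "tags": ["vip"]}'}
payload = json.loads(row["properties"])  # Starlark equivalent: json.decode(...)
```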

Common patterns

Incremental fetch

-- @kind: append
-- @incremental: updated_at
SELECT * FROM my_api('events')

def fetch(resource, page):
    params = {"limit": page.size, "cursor": page.cursor}
    if not incremental.is_backfill:
        params["since"] = incremental.last_value
    resp = http.get("/v1/" + resource, params=params)
    return {"rows": resp.json["items"], "next": resp.json.get("next")}
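The branching on incremental state can be simulated as follows. A Python sketch with a stand-in for the runtime's incremental object; only the attribute names used above are assumed:

```python
# Stand-in for the runtime's `incremental` object used above.
class Incremental:
    def __init__(self, is_backfill, last_value=None):
        self.is_backfill = is_backfill
        self.last_value = last_value

def build_params(incremental, size, cursor):
    params = {"limit": size, "cursor": cursor}
    if not incremental.is_backfill:
        params["since"] = incremental.last_value  # only on incremental runs
    return params
```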

JSON cursor for complex pagination state

When you need to carry multiple values between pages:

next_cursor = json.encode({"url": fetch_url, "token": next_token, "series_idx": 3})
# ...
state = json.decode(page.cursor)
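A round trip of such a cursor, sketched in Python (json.dumps/json.loads mirror Starlark's json.encode/json.decode):

```python
import json

# Pack multiple pagination values into one opaque cursor string.
state = {"url": "/v1/events?page=2", "token": "abc", "series_idx": 3}
cursor = json.dumps(state)     # returned as "next" from fetch()
restored = json.loads(cursor)  # decoded from page.cursor on the next call
```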

Options via JSON arg

For API-specific configuration beyond columns:

FROM my_api('users', '{"filter": "active", "fields": ["name", "email"]}')

API = {"fetch": {"args": ["resource", "options"]}}

def fetch(resource, options, page):
    opts = json.decode(options) if options else {}
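The defaulting matters when the SQL call omits the options argument. The same pattern in Python:

```python
import json

def parse_options(options):
    # A missing or empty options argument falls back to an empty dict.
    return json.loads(options) if options else {}
```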

Finalize callback

def finalize(succeeded, failed):
    if failed == 0:
        http.post("/v1/webhooks", json={"event": "sync_complete", "rows": succeeded})

Reference