Create a Lib Function
How to create a lib function that fetches data from an API or pushes data to an external system. Follow the three-layer pattern: API dict declares the contract, Starlark handles I/O, SQL handles transformation.
1. Create the file
mkdir -p lib
touch lib/my_api.star
The file name becomes the function name in SQL: FROM my_api(...) or @sink: my_api.
2. Define the API dict
Start with base_url and auth. All values must be literals — the runtime parses the dict as AST without executing code.
API = {
"base_url": "https://api.example.com",
"auth": {"env": "MY_API_KEY"},
"retry": 3,
"backoff": 2,
"rate_limit": {"requests": 100, "per": "10s"},
}
Auth patterns:
"auth": {"env": "API_KEY"} # API key → Bearer header
"auth": {"env": "API_KEY", "header": "X-Api-Key"} # API key → custom header
"auth": {"provider": "hubspot"} # OAuth2 (run ondatrasql auth hubspot first)
"auth": {"env_user": "USER", "env_pass": "PASS"} # Basic auth
"auth": {"google_key_file_env": "KEY_FILE", "scope": "..."} # Google service account from .env
See API Dict Reference for all auth patterns.
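The `{"env": ...}` patterns read credentials from environment variables, for example from a `.env` file in the project. A minimal sketch (the variable name is illustrative and must match the one named in the auth dict):

```shell
# .env — referenced by "auth": {"env": "MY_API_KEY"}
MY_API_KEY=your-key-here
```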
3. Add a fetch section (inbound)
API = {
"base_url": "https://api.example.com",
"auth": {"env": "MY_API_KEY"},
"fetch": {
"args": ["resource"],
"page_size": 100,
"dynamic_columns": True,
},
}
- args — parameter names passed from SQL: FROM my_api('users') → resource = "users"
- page_size — enables pagination (the runtime manages the loop)
- dynamic_columns: True — SQL controls the schema. Column names and types are extracted from your SELECT casts and passed to fetch() as the columns kwarg
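With dynamic_columns enabled, a fetch function can use the columns kwarg to request only what the SQL model selects. A sketch, assuming columns arrives as a dict keyed by column name (see the Fetch Contract for the exact shape; the `fields` request parameter is illustrative):

```python
def fetch(resource, page, columns=None):
    # e.g. SELECT id, metadata::JSON FROM my_api('users') → columns covers id and metadata
    params = {"limit": page.size, "cursor": page.cursor}
    if columns:
        params["fields"] = ",".join(columns.keys())
    resp = http.get("/v1/" + resource, params=params)
    return {"rows": resp.json["items"], "next": resp.json.get("next_cursor")}
```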
4. Write the fetch function
Starlark does I/O only — call the API, parse the response, return rows:
def fetch(resource, page):
resp = http.get("/v1/" + resource, params={
"limit": page.size,
"cursor": page.cursor,
})
if not resp.ok:
fail("API error: " + str(resp.status_code) + " " + resp.text)
data = resp.json
return {
"rows": data["items"],
"next": data.get("next_cursor"),
}
- page.cursor is None on the first call, then whatever you returned as next
- page.size comes from page_size in the API dict
- Return None for next to stop pagination
- http.get uses base_url and auth from the API dict automatically
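Conceptually, the runtime drives fetch() in a loop until next is None. A plain-Python simulation of that contract (a sketch, not the actual runtime; Page and run_fetch are stand-ins for what the runtime provides):

```python
from collections import namedtuple

Page = namedtuple("Page", ["size", "cursor"])

def run_fetch(fetch, resource, page_size):
    """Call fetch() until it returns next=None, collecting all rows."""
    rows, cursor = [], None
    while True:
        result = fetch(resource, Page(size=page_size, cursor=cursor))
        rows.extend(result["rows"])
        cursor = result.get("next")
        if cursor is None:
            break
    return rows

# A toy fetch that pages through 5 items, 2 at a time
def toy_fetch(resource, page):
    items = list(range(5))
    start = page.cursor or 0
    chunk = items[start:start + page.size]
    nxt = start + page.size if start + page.size < len(items) else None
    return {"rows": chunk, "next": nxt}

print(run_fetch(toy_fetch, "users", 2))  # → [0, 1, 2, 3, 4]
```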
5. Create SQL models
Use the two-model pattern — raw fetches, staging transforms:
Raw model — column names match the API:
-- models/raw/users.sql
-- @kind: append
-- @incremental: updated_at
SELECT id, name, email, updated_at, metadata::JSON AS metadata
FROM my_api('users')
SQL casts flow to the blueprint: ::JSON tells the API to return structured data, ::DECIMAL tells it to return numbers. Columns without casts default to strings.
Staging model — SQL transforms, casts types, expands JSON:
-- models/staging/users.sql
-- @kind: table
SELECT
id::BIGINT AS id,
name,
email,
updated_at::TIMESTAMP AS updated_at,
metadata->>'$.city' AS city,
(metadata->>'$.score')::INTEGER AS score
FROM raw.users
Don’t transform in the raw model. Don’t call APIs in the staging model. Each layer does one thing.
6. Run and verify
# First run — backfill
ondatrasql run
# Check data
ondatrasql sql "SELECT * FROM staging.users LIMIT 5"
# Second run — only new data
ondatrasql run
7. Add a push section (outbound)
SQL controls what gets pushed. Starlark controls how:
API = {
"base_url": "https://api.example.com",
"auth": {"env": "MY_API_KEY"},
"push": {
"batch_size": 100,
"batch_mode": "sync",
"rate_limit": {"requests": 50, "per": "10s"},
},
}
def push(rows):
    results = {}
    for r in rows:
        ct = r["__ondatra_change_type"]
        key = str(r["__ondatra_rowid"]) + ":" + ct
        # Strip runtime metadata columns before sending
        payload = {k: v for k, v in r.items() if not k.startswith("_")}
        if ct == "insert":
            resp = http.post("/v1/contacts", json=payload)
        elif ct == "update_postimage":
            resp = http.patch("/v1/contacts/" + str(r["id"]), json=payload)
        elif ct == "delete":
            resp = http.delete("/v1/contacts/" + str(r["id"]))
        elif ct == "update_preimage":
            # Pre-images carry the old values; nothing to send
            results[key] = "ok"
            continue
        results[key] = "ok" if resp.ok else "error: " + resp.text
    return results
SQL model with @sink:
-- models/sync/contacts.sql
-- @kind: merge
-- @unique_key: id
-- @sink: my_api
SELECT id, name, email FROM staging.contacts
SQL can build nested JSON for APIs that expect structured payloads:
SELECT id, json_object('name', name, 'email', email, 'tags', json_group_array(tag)) AS properties
FROM staging.contacts JOIN staging.tags USING (id)
GROUP BY id, name, email
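For the query above, each pushed row's properties column would carry a JSON value along these lines (names and values illustrative):

```json
{"name": "Ada Lovelace", "email": "ada@example.com", "tags": ["vip", "beta"]}
```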
Common patterns
Incremental fetch
-- @kind: append
-- @incremental: updated_at
SELECT * FROM my_api('events')
def fetch(resource, page):
params = {"limit": page.size, "cursor": page.cursor}
if not incremental.is_backfill:
params["since"] = incremental.last_value
resp = http.get("/v1/" + resource, params=params)
return {"rows": resp.json["items"], "next": resp.json.get("next")}
JSON cursor for complex pagination state
When you need to carry multiple values between pages:
next_cursor = json.encode({"url": fetch_url, "token": next_token, "series_idx": 3})
# ...
state = json.decode(page.cursor)
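Put together, a fetch() that round-trips multi-field state through the cursor might look like this (a sketch; the endpoint, the next_token response field, and series_idx are illustrative):

```python
def fetch(resource, page):
    if page.cursor:
        state = json.decode(page.cursor)
    else:
        # First page: seed the state
        state = {"url": "/v1/" + resource, "token": None, "series_idx": 0}
    resp = http.get(state["url"], params={"token": state["token"]})
    data = resp.json
    next_cursor = None
    if data.get("next_token"):
        next_cursor = json.encode({
            "url": state["url"],
            "token": data["next_token"],
            "series_idx": state["series_idx"],
        })
    return {"rows": data["items"], "next": next_cursor}
```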
Options via JSON arg
For API-specific configuration beyond columns:
FROM my_api('users', '{"filter": "active", "fields": ["name", "email"]}')
API = {"fetch": {"args": ["resource", "options"]}}
def fetch(resource, options, page):
    opts = json.decode(options) if options else {}
    resp = http.get("/v1/" + resource, params=dict(opts, limit=page.size, cursor=page.cursor))
    return {"rows": resp.json["items"], "next": resp.json.get("next")}
Finalize callback
def finalize(succeeded, failed):
if failed == 0:
http.post("/v1/webhooks", json={"event": "sync_complete", "rows": succeeded})
Reference
- API Dict — all fields and options
- Fetch Contract — complete function spec
- Push Contract — change types, batch modes, return values
- Starlark Modules — http, env, crypto, csv, xml, time
- Blueprints — working examples for Mistral OCR, Riksbank, Google Ad Manager