Create a Lib Function
This guide shows how to create a lib function that fetches data from an API or pushes data to an external system. It follows a three-layer pattern: the API dict declares the contract, Starlark handles I/O, and SQL handles transformation.
1. Create the file
mkdir -p lib
touch lib/my_api.star
The file name becomes the function name in SQL: FROM my_api(...) or @sink: my_api.
2. Define the API dict
Start with base_url and auth. All values must be literals, because the runtime parses the dict as an AST without executing code.
API = {
    "base_url": "https://api.example.com",
    "auth": {"env": "MY_API_KEY"},
    "retry": 3,
    "backoff": 2,
    "rate_limit": {"requests": 100, "per": "10s"},
}
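To illustrate what "parsed as an AST without executing code" implies, here is a minimal Python sketch. It is not the OndatraSQL implementation; it only shows the general technique with Python's standard ast module. The helper name extract_api_dict and the make_url call in the rejected example are hypothetical.

```python
import ast

def extract_api_dict(source):
    """Return the literal dict assigned to API, without executing the source."""
    tree = ast.parse(source)
    for node in tree.body:
        if isinstance(node, ast.Assign):
            for target in node.targets:
                if isinstance(target, ast.Name) and target.id == "API":
                    # literal_eval raises ValueError on anything that is not a literal.
                    return ast.literal_eval(node.value)
    raise KeyError("no top-level API assignment found")

GOOD = 'API = {"base_url": "https://api.example.com", "retry": 3}'
BAD = 'API = {"base_url": make_url("example")}'  # function call, not a literal

api = extract_api_dict(GOOD)

try:
    extract_api_dict(BAD)
    bad_rejected = False
except ValueError:
    bad_rejected = True
```

Under this model a value computed by a function call cannot be read at all, which is one way to understand why the dict is restricted to literals.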
Auth patterns:
"auth": {"env": "API_KEY"} # API key → Bearer header
"auth": {"env": "API_KEY", "header": "X-Api-Key"} # API key → custom header
"auth": {"provider": "hubspot"} # OAuth2 (run ondatrasql auth hubspot first)
"auth": {"user": {"env": "USER"}, "pass": {"env": "PASS"}} # Basic auth
"auth": {"service_account": {"env": "KEY_FILE"}, "scope": "..."} # Google service account
See API Dict Reference for all auth patterns.
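As a rough mental model of the first, second, and fourth patterns above, the following Python sketch maps an auth dict to request headers. It is an illustration only, based on the comments above (Bearer header, custom header, Basic auth); the function name resolve_auth_headers is hypothetical, and the OAuth2 and service-account patterns are left out.

```python
import base64

def resolve_auth_headers(auth, environ):
    """Map an auth dict to HTTP headers, reading secrets from environ."""
    if "user" in auth and "pass" in auth:
        user = environ[auth["user"]["env"]]
        password = environ[auth["pass"]["env"]]
        token = base64.b64encode((user + ":" + password).encode()).decode()
        return {"Authorization": "Basic " + token}
    if "env" in auth:
        secret = environ[auth["env"]]
        if "header" in auth:
            return {auth["header"]: secret}  # API key in a custom header
        return {"Authorization": "Bearer " + secret}
    raise ValueError("auth pattern not covered by this sketch")

env = {"API_KEY": "s3cret", "USER": "ada", "PASS": "pw"}
bearer = resolve_auth_headers({"env": "API_KEY"}, env)
custom = resolve_auth_headers({"env": "API_KEY", "header": "X-Api-Key"}, env)
basic = resolve_auth_headers({"user": {"env": "USER"}, "pass": {"env": "PASS"}}, env)
```

Note that the dict only names environment variables; the secrets themselves never appear in the lib file.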
3. Add a fetch section (inbound)
API = {
    "base_url": "https://api.example.com",
    "auth": {"env": "MY_API_KEY"},
    "fetch": {
        "args": ["resource"],
        "page_size": 100,
    },
}
- args: parameter names passed from SQL, so FROM my_api('users') → resource = "users"
- page_size: enables pagination (the runtime manages the loop)
SQL controls the schema. Column names and normalized types are extracted from your SELECT casts and passed to fetch() as the columns kwarg.
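The args binding described above can be pictured as pairing the declared names with the positional values from the SQL call. This is a small Python sketch under that assumption; bind_args is a hypothetical helper, not a runtime function.

```python
def bind_args(arg_names, sql_values):
    """Pair declared arg names with the positional values from the SQL call."""
    if len(sql_values) > len(arg_names):
        raise ValueError("more SQL arguments than declared args")
    return dict(zip(arg_names, sql_values))

# FROM my_api('users') with "args": ["resource"]
kwargs = bind_args(["resource"], ["users"])
```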
4. Write the fetch function
Starlark does I/O only — call the API, parse the response, return rows. Declare only the parameters you need — the runtime filters kwargs automatically.
def fetch(resource, page):
    resp = http.get("/v1/" + resource, params={
        "limit": page.size,
        "cursor": page.cursor,
    })
    if not resp.ok:
        fail("API error: " + str(resp.status_code) + " " + resp.text)
    data = resp.json
    return {
        "rows": data["items"],
        "next": data.get("next_cursor"),
    }
- page.cursor is None on first call, then whatever you returned as next
- page.size comes from page_size in the API dict
- Return None for next to stop pagination
- http.get uses base_url and auth from the API dict automatically
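The cursor rules above amount to a simple loop on the runtime side. The following Python sketch simulates that loop against a fake in-memory API; the Page class, run_pagination, and fake_fetch are stand-ins invented for this illustration, not runtime names.

```python
class Page:
    """Stand-in for the page object: carries the cursor and the page size."""
    def __init__(self, cursor, size):
        self.cursor = cursor
        self.size = size

def run_pagination(fetch, resource, page_size):
    """Call fetch until it returns next=None, collecting all rows."""
    rows, cursor = [], None  # cursor is None on the first call
    while True:
        result = fetch(resource, Page(cursor, page_size))
        rows.extend(result["rows"])
        cursor = result.get("next")
        if cursor is None:
            return rows

# Fake API: five items served in slices, with an integer offset as the cursor.
ITEMS = [{"id": i} for i in range(5)]

def fake_fetch(resource, page):
    start = page.cursor or 0
    end = start + page.size
    return {"rows": ITEMS[start:end], "next": end if end < len(ITEMS) else None}

all_rows = run_pagination(fake_fetch, "users", page_size=2)
```

Because the loop lives outside fetch, the function itself only ever handles one page at a time.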
5. Create SQL models
Use the two-model pattern — raw fetches, staging transforms:
Raw model — column names match the API:
-- models/raw/users.sql
-- @kind: append
-- @incremental: updated_at
SELECT id, name, email, updated_at, metadata::JSON AS metadata
FROM my_api('users')
SQL casts flow to the blueprint as normalized types: ::JSON becomes "json", ::DECIMAL becomes "decimal". Columns without casts default to "string".
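To make the cast-to-type mapping concrete, here is a toy Python sketch that derives column names and normalized types from a simple SELECT list. The "name" key matches how columns is used elsewhere on this page; the "type" key and the function columns_from_select are assumptions for illustration. The comma split is deliberately naive and would break on types such as DECIMAL(10,2).

```python
import re

def columns_from_select(select_list):
    """Derive column names and normalized types from a simple SELECT list."""
    columns = []
    for item in select_list.split(","):
        item = item.strip()
        # An optional "AS alias" at the end names the column.
        alias = re.search(r"\s+AS\s+(\w+)$", item, re.IGNORECASE)
        expr = item[:alias.start()] if alias else item
        # An optional "::TYPE" cast supplies the type; the default is "string".
        cast = re.search(r"::(\w+)$", expr)
        base = expr[:cast.start()] if cast else expr
        columns.append({
            "name": alias.group(1) if alias else base,
            "type": cast.group(1).lower() if cast else "string",
        })
    return columns

cols = columns_from_select("id, name, email, updated_at, metadata::JSON AS metadata")
```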
Staging model — SQL transforms, casts types, expands JSON:
-- models/staging/users.sql
-- @kind: table
SELECT
    id::BIGINT AS id,
    name,
    email,
    updated_at::TIMESTAMP AS updated_at,
    metadata->>'$.city' AS city,
    (metadata->>'$.score')::INTEGER AS score
FROM raw.users
Don’t transform in the raw model. Don’t call APIs in the staging model. Each layer does one thing.
6. Run and verify
# First run — backfill
ondatrasql run
# Check data
ondatrasql sql "SELECT * FROM staging.users LIMIT 5"
# Second run — only new data
ondatrasql run
7. Add a push section (outbound)
SQL controls what gets pushed. Starlark controls how:
API = {
    "base_url": "https://api.example.com",
    "auth": {"env": "MY_API_KEY"},
    "push": {
        "batch_size": 100,
        "batch_mode": "sync",
        "rate_limit": {"requests": 50, "per": "10s"},
    },
}
def push(rows=[], batch_number=1):
    results = {}
    for r in rows:
        ct = r["__ondatra_change_type"]
        key = str(r["__ondatra_rowid"]) + ":" + ct
        # Drop "_"-prefixed internal columns before sending
        payload = {k: v for k, v in r.items() if not k.startswith("_")}
        if ct == "insert":
            resp = http.post("/v1/contacts", json=payload)
        elif ct == "update_postimage":
            resp = http.patch("/v1/contacts/" + str(r["id"]), json=payload)
        elif ct == "delete":
            resp = http.delete("/v1/contacts/" + str(r["id"]))
        elif ct == "update_preimage":
            # Nothing to send for the pre-update image; acknowledge and skip
            results[key] = "ok"
            continue
        else:
            # Guard so resp is never read unassigned
            results[key] = "error: unknown change type " + ct
            continue
        results[key] = "ok" if resp.ok else "error: " + resp.text
    return results
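One way to picture how batch_size relates to the push function above is a driver that slices the rows into batches and merges the per-row results. This Python sketch is an assumption about the general shape, not the runtime's code: batches, run_push, and fake_push are hypothetical, and numbering batches from 1 is inferred from the batch_number=1 default.

```python
def batches(rows, batch_size):
    """Yield (batch_number, chunk) pairs, numbering batches from 1."""
    for start in range(0, len(rows), batch_size):
        yield start // batch_size + 1, rows[start:start + batch_size]

def run_push(push, rows, batch_size):
    """Call push once per batch and merge the per-row results."""
    merged = {}
    for number, chunk in batches(rows, batch_size):
        merged.update(push(rows=chunk, batch_number=number))
    return merged

def fake_push(rows, batch_number):
    # Same key format as the push function above: "<rowid>:<change_type>".
    return {
        str(r["__ondatra_rowid"]) + ":" + r["__ondatra_change_type"]: "ok"
        for r in rows
    }

rows = [
    {"__ondatra_rowid": i, "__ondatra_change_type": "insert", "id": i}
    for i in range(5)
]
batch_sizes = [len(chunk) for _, chunk in batches(rows, 2)]
results = run_push(fake_push, rows, batch_size=2)
```

Keying each result by row id and change type is what lets a status be traced back to an individual row after the batches are merged.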
SQL model with @sink:
-- models/sync/contacts.sql
-- @kind: merge
-- @unique_key: id
-- @sink: my_api
SELECT id, name, email FROM staging.contacts
SQL can build nested JSON for APIs that expect structured payloads:
SELECT id, json_object('name', name, 'email', email, 'tags', json_group_array(tag)) AS properties
FROM staging.contacts JOIN staging.tags USING (id)
GROUP BY id, name, email
Common patterns
Incremental fetch
-- @kind: append
-- @incremental: updated_at
SELECT * FROM my_api('events')
def fetch(resource, page, is_backfill=True, last_value=""):
    params = {"limit": page.size, "cursor": page.cursor}
    if not is_backfill:
        params["since"] = last_value
    resp = http.get("/v1/" + resource, params=params)
    return {"rows": resp.json["items"], "next": resp.json.get("next")}
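This fetch declares is_backfill and last_value, while the earlier one declares only resource and page. The rule that the runtime filters kwargs automatically can be sketched in Python with signature inspection. This is an illustration of the idea, not the runtime's mechanism; filter_kwargs and the contents of the available dict are assumptions.

```python
import inspect

def filter_kwargs(func, available):
    """Keep only the kwargs that func names in its signature."""
    accepted = inspect.signature(func).parameters
    return {name: value for name, value in available.items() if name in accepted}

def fetch_simple(resource, page):
    return {"rows": [], "next": None}

def fetch_incremental(resource, page, is_backfill=True, last_value=""):
    return {"since": None if is_backfill else last_value}

# Everything the caller could offer on an incremental (non-backfill) run.
available = {
    "resource": "events",
    "page": None,
    "columns": [],
    "is_backfill": False,
    "last_value": "2024-01-01T00:00:00Z",
}

simple_kwargs = filter_kwargs(fetch_simple, available)
simple_result = fetch_simple(**simple_kwargs)  # no TypeError from extra kwargs
incremental_result = fetch_incremental(**filter_kwargs(fetch_incremental, available))
```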
Dict cursor for complex pagination state
When you need to carry multiple values between pages, return a dict — it passes through directly:
next_cursor = {"url": fetch_url, "token": next_token, "series_idx": 3}
# ...
series_idx = page.cursor["series_idx"]
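A fuller sketch of the dict-cursor idea, written in Python against fake in-memory data: the cursor carries both which series is being read and the offset within it. The SERIES data, the offset field, and the driver loop are hypothetical and exist only to make the example runnable.

```python
class Page:
    """Stand-in for the page object passed to fetch."""
    def __init__(self, cursor, size):
        self.cursor = cursor
        self.size = size

# Hypothetical data: two series, each paged separately.
SERIES = {"a": [1, 2, 3], "b": [4]}
ORDER = ["a", "b"]

def fetch(page):
    # The first call has no cursor, so start at series 0, offset 0.
    state = page.cursor or {"series_idx": 0, "offset": 0}
    name = ORDER[state["series_idx"]]
    values = SERIES[name]
    chunk = values[state["offset"]:state["offset"] + page.size]
    rows = [{"series": name, "value": v} for v in chunk]

    next_offset = state["offset"] + page.size
    if next_offset < len(values):
        nxt = {"series_idx": state["series_idx"], "offset": next_offset}
    elif state["series_idx"] + 1 < len(ORDER):
        nxt = {"series_idx": state["series_idx"] + 1, "offset": 0}
    else:
        nxt = None  # nothing left: stop pagination
    return {"rows": rows, "next": nxt}

# Minimal driver that feeds each returned cursor back in.
collected, cursor = [], None
while True:
    result = fetch(Page(cursor, 2))
    collected.extend(result["rows"])
    cursor = result["next"]
    if cursor is None:
        break
```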
Options via JSON arg
For API-specific configuration beyond columns:
FROM my_api('users', '{"filter": "active", "fields": ["name", "email"]}')
API = {"fetch": {"args": ["resource", "options"]}}
def fetch(resource, options="", page=None):
    opts = json.decode(options) if options else {}
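What happens after decoding is up to the lib function. The Python sketch below shows one possible approach: layer the decoded options over defaults, reject unknown keys, and turn the result into query parameters. Python's json.loads stands in for Starlark's json.decode; the DEFAULTS values and both helper names are hypothetical.

```python
import json

DEFAULTS = {"filter": "all", "fields": []}  # hypothetical defaults

def parse_options(options=""):
    """Decode the JSON options string and layer it over the defaults."""
    opts = json.loads(options) if options else {}
    unknown = set(opts) - set(DEFAULTS)
    if unknown:
        raise ValueError("unknown options: " + ", ".join(sorted(unknown)))
    return {**DEFAULTS, **opts}

def build_params(opts):
    """Turn decoded options into query parameters for the request."""
    params = {"filter": opts["filter"]}
    if opts["fields"]:
        params["fields"] = ",".join(opts["fields"])
    return params

opts = parse_options('{"filter": "active", "fields": ["name", "email"]}')
params = build_params(opts)
empty = parse_options()
```

Rejecting unknown keys early turns a typo in the SQL call into a clear error instead of a silently ignored option.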
Async fetch (report-style APIs)
For APIs that create reports asynchronously:
API = {
    "fetch": {
        "async": True,
        "poll_interval": "5s",
        "poll_timeout": "5m",
        "poll_backoff": 2,
    },
}
def submit(columns=[], is_backfill=True, last_value=""):
    resp = http.post("/reports", json={"columns": [c["name"] for c in columns]})
    return {"job_id": resp.json["id"]}
def check(job_ref):
    resp = http.get("/reports/" + job_ref["job_id"])
    if resp.json["status"] == "complete":
        return {"url": resp.json["result_url"]}
    return None  # keep polling
def fetch_result(result_ref, page):
    resp = http.get(result_ref["url"] + "?page=" + str(page.cursor or 0))
    return {"rows": resp.json["data"], "next": resp.json.get("next_page")}
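The polling settings above suggest a loop like the following Python sketch, which calls check until it returns a result or the time budget runs out. Treating poll_backoff as a multiplier applied to the interval after each wait is an assumption, as are the function name poll_until_ready and the fake job; the sleep function is injected so the example runs instantly.

```python
def poll_until_ready(check, job_ref, interval, timeout, backoff, sleep):
    """Call check until it returns a result or the time budget is spent."""
    waited = 0.0
    while True:
        result = check(job_ref)
        if result is not None:
            return result
        if waited + interval > timeout:
            raise TimeoutError("job not ready after %.0fs" % waited)
        sleep(interval)
        waited += interval
        interval *= backoff  # assumed meaning of poll_backoff

# Fake job that completes on the fourth check.
calls = {"n": 0}

def fake_check(job_ref):
    calls["n"] += 1
    if calls["n"] < 4:
        return None  # keep polling
    return {"url": "/reports/" + job_ref["job_id"] + "/result"}

sleeps = []
result = poll_until_ready(
    fake_check, {"job_id": "42"},
    interval=5, timeout=300, backoff=2,
    sleep=sleeps.append,  # record the waits instead of really sleeping
)
```

With these settings the recorded waits grow from 5 to 10 to 20 seconds before the fourth check succeeds.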
Finalize callback
def finalize(succeeded, failed):
    if failed == 0:
        http.post("/v1/webhooks", json={"event": "sync_complete", "rows": succeeded})
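The callback above compares failed to 0 and sends succeeded as a row count, so both look like counts. Assuming they are derived from the per-row statuses that push returns, the relationship might be sketched in Python as follows. The summarize helper is hypothetical, and the events list replaces the real http.post call so the example runs offline.

```python
def summarize(results):
    """Count rows whose push status is "ok" versus anything else."""
    succeeded = sum(1 for status in results.values() if status == "ok")
    return succeeded, len(results) - succeeded

events = []  # stands in for the webhook endpoint

def finalize(succeeded, failed):
    if failed == 0:
        events.append({"event": "sync_complete", "rows": succeeded})

results = {"1:insert": "ok", "2:update_postimage": "ok", "3:delete": "error: 404"}
succeeded, failed = summarize(results)
finalize(succeeded, failed)               # one failure: no event recorded
finalize(*summarize({"1:insert": "ok"}))  # clean run: event recorded
```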
Reference
- API Dict — all fields and options
- Fetch Contract — complete function spec
- Push Contract — change types, batch modes, return values
- Starlark Modules — http, env, builtins, csv, xml, time
- Blueprints — working examples for Mistral OCR, Riksbank, Google Ad Manager