Scripting
Built-in scripting for ingestion — no Python required
OndatraSQL uses Starlark for data ingestion and custom logic. No Python. No dependencies. No runtime setup.
Write scripts that call APIs, handle pagination and auth, and emit rows directly into your pipeline. All in the same system as your SQL models.
Mental Model
Starlark models produce rows. SQL models transform them. Both are part of the same DAG.
How It Works
# @kind: append
# @incremental: updated_at
resp = http.get("https://api.example.com/data")
for item in resp.json:
    save.row({
        "id": item["id"],
        "name": item["name"],
        "updated_at": item["updated_at"],
    })
What happens:
- Script fetches data
- save.row() emits rows
- OndatraSQL materializes the result into DuckLake
Same directives, same DAG, same validation as SQL models.
Why Starlark
- No Python environment to manage
- No dependency conflicts
- Deterministic execution
- Runs the same everywhere
Built-in Modules
Everything you need is built in — no imports required. No SDKs, no client libraries, no setup.
| Module | What it does |
|---|---|
| http | API requests with retry, digest auth, mTLS |
| oauth | OAuth 2.0 flows with auto-refresh |
| save | Emit rows to the pipeline |
| query | Read DuckDB tables |
| incremental | Cursor state for incremental loads |
| env | Access environment variables |
| xml, csv | Parse and encode data formats |
| url, crypto | URL building, hashing, signing |
| time, math, json, re | Standard library |
See Language Reference for Starlark syntax.
Shared Libraries
Reuse logic across models. Write once, use everywhere.
load("lib/helpers.star", "paginate")
for page in paginate("https://api.example.com/users"):
    for user in page:
        save.row(user)
How load() Works
- Resolved relative to project root
- Executed once per run, cached across all load() calls
- Nested loads supported (A loads B loads C)
- Import cycles detected
- Path traversal blocked
Library modules have access to all built-ins except save, which must be passed as a parameter to any function that writes data.
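The load() example above imports paginate from lib/helpers.star, but this page does not show that file. The following is a minimal sketch of what such a library might contain, not the actual helper. It assumes the API accepts a `?page=N` query parameter and returns an empty list once the pages run out; `max_pages` and `save_pages` are hypothetical names introduced for illustration. The code sticks to syntax that is valid in both Starlark and Python.

```python
# lib/helpers.star (hypothetical contents, for illustration only)

def paginate(base_url, max_pages=1000):
    """Fetch numbered pages until an empty one comes back; return a list of pages."""
    pages = []
    # A bounded for loop keeps the helper from spinning forever on a misbehaving API.
    for page in range(1, max_pages + 1):
        # Assumption: the API takes a ?page=N query parameter.
        resp = http.get(base_url + "?page=" + str(page))
        if len(resp.json) == 0:
            break
        pages.append(resp.json)
    return pages

def save_pages(save, pages):
    """Write every row of every page and return the row count.

    save is not available inside library modules, so the calling model passes it in.
    """
    count = 0
    for page in pages:
        for row in page:
            save.row(row)
            count += 1
    return count
```

A model could then call `save_pages(save, paginate("https://api.example.com/users"))`. Passing save explicitly keeps the library free of side effects unless the caller opts in.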
YAML Models
Configure ingestion without writing code. Use when the logic already exists in lib/.
kind: append
incremental: report_date
source: gam_report
config:
  network_code: ${GAM_NETWORK_CODE}
  dimensions:
    - AD_UNIT_NAME
    - DATE
# lib/gam_report.star
def gam_report(save, network_code="", dimensions=None):
    start = incremental.initial_value if incremental.is_backfill else incremental.last_value
    # ... fetch and paginate API ...
    for row in results:
        save.row(row)
See Models for the full YAML reference.
Example: Paginated API
# @kind: append
page = 1
while True:
    resp = http.get("https://api.example.com/users?page=" + str(page))
    if len(resp.json) == 0:
        break
    for user in resp.json:
        save.row(user)
    page += 1
    sleep(0.1) # rate limit
Global Functions
abort() # clean exit — 0 rows, no error
fail("something went wrong") # stop with error
sleep(1.5) # rate limiting
print("debug info") # stderr (secrets auto-redacted)
getvariable("currency") # read DuckDB session variable
Useful for retries, rate limits, and conditional execution.
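As one way to combine these functions, the sketch below polls a caller-supplied check until it succeeds, pausing with sleep() between attempts and stopping with fail() when the attempts run out. `wait_until`, `check`, `attempts`, and `delay` are hypothetical names introduced here, not part of OndatraSQL. The code uses only syntax that is valid in both Starlark and Python.

```python
def wait_until(check, attempts=5, delay=1.5):
    """Call check() until it returns a truthy value, sleeping between attempts."""
    for attempt in range(attempts):
        result = check()
        if result:
            return result
        # No point sleeping after the final attempt.
        if attempt < attempts - 1:
            sleep(delay)
    # Every attempt came back empty: stop the run with an error.
    fail("gave up after " + str(attempts) + " attempts")
```

When an empty result is acceptable rather than an error, abort() could take the place of fail() so the run exits cleanly with 0 rows.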
Secret Redaction
Secrets are automatically removed from logs. No configuration required.
Bearer eyJhbG... → Bearer [REDACTED]
token=abc123 → token=[REDACTED]
Safe by default — no accidental leaks in logs.