Starlark Modules
These modules are available inside your lib functions. Starlark handles all I/O; the modules below provide the tools.
http
HTTP client. When used inside a lib function with an API dict, base_url, auth, headers, timeout, retry, and backoff are injected automatically into all methods — including upload. Per-call kwargs override.
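The injection is easiest to see end to end. A minimal sketch with hypothetical names (`API`, `list_users`, and the endpoint are illustrative, not part of the runtime):

```python
API = {
    "base_url": "https://api.example.com",
    "headers": {"Accept": "application/json"},
    "timeout": 30,
}

def list_users():
    # base_url, headers, and timeout come from the API dict;
    # a per-call kwarg such as timeout=60 would override the injected value.
    resp = http.get("/v1/users")
    return resp.json
```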
Methods
resp = http.get(url, headers=?, params=?, timeout=?, retry=?, backoff=?, auth=?, cert=?, key=?, ca=?)
resp = http.post(url, json=?, data=?, body=?, headers=?, ...)
resp = http.put(url, ...)
resp = http.patch(url, ...)
resp = http.delete(url, ...)
resp = http.upload(url, file=, field=?, filename=?, headers=?, fields=?, ...)
Body kwargs are mutually exclusive: json= (dict/list), data= (form), body= (raw string).
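For illustration, the same hypothetical endpoint posted three ways (each call uses exactly one body kwarg):

```python
resp = http.post("/v1/items", json={"name": "Widget"})  # JSON-encoded body
resp = http.post("/v1/items", data={"name": "Widget"})  # form-encoded body
resp = http.post("/v1/items", body='{"name": "Widget"}',
                 headers={"Content-Type": "application/json"})  # raw string; set headers yourself
```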
Response object
| Field | Type | Description |
|---|---|---|
| status_code | int | HTTP status code |
| text | string | Raw response body |
| ok | bool | True if status is 200–299 |
| json | any | Parsed JSON or None |
| headers | dict | Response headers |
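A common guard pattern using these fields (the endpoint is hypothetical):

```python
resp = http.get("/v1/users")
if not resp.ok:
    fail("request failed: " + str(resp.status_code) + " " + resp.text)
rows = resp.json or []
```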
Per-call auth (overrides API dict)
resp = http.get(url, auth=("user", "pass")) # Basic
resp = http.get(url, auth=("user", "pass", "digest")) # Digest (RFC 7616)
mTLS (client certificates)
resp = http.get(url, cert="client.crt", key="client.key", ca="ca.crt")
File upload
resp = http.upload(url, file="invoice.pdf", field="document", fields={"purpose": "ocr"})
http.upload inherits base_url and auth from the API dict:
resp = http.upload("/v1/files", file="doc.pdf", fields={"purpose": "ocr"})
Retry behavior
Retries on 429 and 5xx. Exponential backoff with jitter. Respects Retry-After headers. Configure via API dict:
API = {"retry": 3, "backoff": 2} # 3 retries, 2s/4s/8s backoff
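The delays in that comment follow backoff * 2**attempt. A Python sketch of the schedule, assuming full jitter in [0, delay) — the runtime's exact jitter strategy may differ:

```python
import random

def backoff_schedule(retries, backoff, jitter=False):
    """Exponential delays: backoff * 2**attempt for each retry attempt."""
    delays = []
    for attempt in range(retries):
        delay = backoff * (2 ** attempt)
        if jitter:
            delay = random.uniform(0, delay)  # full jitter: anywhere in [0, delay)
        delays.append(delay)
    return delays

print(backoff_schedule(3, 2))  # [2, 4, 8] -> matches the 2s/4s/8s comment above
```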
incremental
Pipeline state from previous runs. Available when the SQL model uses @incremental.
| Field | Type | Description |
|---|---|---|
| is_backfill | bool | True if target table doesn’t exist (first run) |
| last_value | string | MAX(cursor_column) from previous run |
| initial_value | string | Starting value from @incremental_initial |
| last_run | string | Timestamp of last successful run |
| cursor | string | Column name from @incremental directive |
All fields are read-only.
if incremental.is_backfill:
    start = incremental.initial_value
else:
    start = _next_day(incremental.last_value)
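`_next_day` above is user code, not a built-in. One possible sketch using the `time` module documented below, assuming `last_value` carries an ISO date or timestamp:

```python
def _next_day(value):
    # Take the date portion, then advance by 24 hours.
    t = time.parse_time(value[:10] + "T00:00:00Z")
    return (t + time.parse_duration("24h")).format("2006-01-02")
```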
sink
Batch context inside push() functions.
| Field | Type | Description |
|---|---|---|
| batch_number | int | 1-indexed batch number in current run |
def push(rows):
    if sink.batch_number == 1:
        http.post(url + "/clear", json={})
    http.post(url + "/append", json=rows)
env
Access environment variables from .env or shell.
value = env.get("API_KEY")
value = env.get("API_KEY", default="fallback")
env.set("TEMP_VAR", "some_value")
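A fail-fast sketch combining `env.get` with the global `fail()` (the variable name is illustrative):

```python
key = env.get("API_KEY")
if not key:
    fail("API_KEY is not set; add it to .env")
```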
oauth
OAuth2 token management.
Service account (Google APIs)
Configured in the API dict — no Starlark code needed:
API = {
    "auth": {
        "google_key_file_env": "GAM_KEY_FILE",
        "scope": "https://www.googleapis.com/auth/admanager",
    },
}
google_key_file_env resolves the key file path from .env. The runtime handles JWT signing and token refresh.
For direct use in Starlark (when API dict auth isn’t sufficient):
token = oauth.token(
    google_key_file=env.get("KEY_FILE"),
    scope="https://www.googleapis.com/auth/analytics",
)
headers = {"Authorization": "Bearer " + token.access_token}
OAuth2 provider (browser-based)
ondatrasql auth hubspot # one-time login
API = {"auth": {"provider": "hubspot"}}
# Tokens refresh automatically
Client credentials
token = oauth.token(
    token_url="https://auth.example.com/token",
    client_id=env.get("CLIENT_ID"),
    client_secret=env.get("CLIENT_SECRET"),
    scope="read write",
)
json
JSON encoding and decoding (Starlark standard library).
s = json.encode({"name": "Alice", "age": 30})
data = json.decode('{"name": "Alice", "age": 30}')
time
Date and time operations (Starlark standard library).
now = time.now()
t = time.parse_time("2024-03-15T10:30:00Z")
t.year, t.month, t.day
t.unix # Unix timestamp (seconds)
t.format("2006-01-02") # Go layout format
tomorrow = t + time.parse_duration("24h")
xml
Parse and encode XML.
data = xml.decode('<user><name>Alice</name></user>')
# {"user": {"name": "Alice"}}
output = xml.encode({"user": {"name": "Alice"}})
XML attributes are prefixed with @:
data = xml.decode('<item id="42"><name>Widget</name></item>')
# {"item": {"@id": "42", "name": "Widget"}}
csv
Parse and encode CSV.
rows = csv.decode("name,age\nAlice,30\nBob,25")
# [{"name": "Alice", "age": "30"}, {"name": "Bob", "age": "25"}]
rows = csv.decode(data, delimiter="\t", header=False) # returns list of lists
output = csv.encode([{"name": "Alice", "age": "30"}])
crypto
Hashing, encoding, and signing.
encoded = crypto.base64_encode("hello")
decoded = crypto.base64_decode(encoded)
sha = crypto.sha256("data")
md = crypto.md5("data")
sig = crypto.hmac_sha256("secret_key", "message")
url
URL building and parsing.
full = url.build("https://api.com/data", params={"page": "1"})
encoded = url.encode("hello world")
params = url.encode_params({"foo": "bar"})
parsed = url.parse("https://api.com/data?page=1")
# parsed.scheme, parsed.host, parsed.path
Global functions
abort() # clean exit, 0 rows, no error
fail("something went wrong") # stop with error
sleep(1.5) # pause (prefer rate_limit in API dict)
print("debug info") # stderr (secrets auto-redacted)
getvariable("currency") # read DuckDB session variable
query("SELECT * FROM t") # read-only SQL against DuckDB
query
Read-only SQL against the DuckDB session. Returns a list of dicts. Only SELECT and WITH statements.
rows = query("SELECT file FROM glob('data/*.pdf') ORDER BY file")
rates = query("SELECT currency, rate FROM mart.exchange_rates WHERE date = current_date")
Secret redaction
Secrets from .env are automatically removed from print() output and error messages:
Bearer eyJhbG... → Bearer [REDACTED]
token=abc123 → token=[REDACTED]