Starlark Modules

Modules available inside your lib functions. Starlark code handles all I/O in a pipeline; these modules provide the tools.

http

HTTP client. When used inside a lib function with an API dict, base_url, auth, headers, timeout, retry, and backoff are injected automatically into all methods, including upload. Per-call kwargs override the injected values.
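
A minimal sketch of how injection behaves; the host, credentials, and paths are illustrative:

```
API = {
    "base_url": "https://api.example.com",
    "auth": ("user", "pass"),
    "headers": {"Accept": "application/json"},
    "timeout": 30,
}

# Inside a lib function, a relative path resolves against base_url,
# and auth, headers, and timeout are applied automatically:
resp = http.get("/v1/items")

# Per-call kwargs override the injected values:
resp = http.get("/v1/items", timeout=60)
```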

Methods

resp = http.get(url, headers=?, params=?, timeout=?, retry=?, backoff=?, auth=?, cert=?, key=?, ca=?)
resp = http.post(url, json=?, data=?, body=?, headers=?, ...)
resp = http.put(url, ...)
resp = http.patch(url, ...)
resp = http.delete(url, ...)
resp = http.upload(url, file=, field=?, filename=?, headers=?, fields=?, ...)

Body kwargs are mutually exclusive: json= (dict/list), data= (form), body= (raw string).
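
For example (URL and payloads illustrative):

```
resp = http.post(url, json={"name": "Alice"})   # JSON-encoded body
resp = http.post(url, data={"name": "Alice"})   # form-encoded body
resp = http.post(url, body='{"raw": "json"}')   # raw string, sent as-is
```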

Response object

Field        Type    Description
status_code  int     HTTP status code
text         string  Raw response body
ok           bool    True if 200–299
json         any     Parsed JSON or None
headers      dict    Response headers
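
A typical pattern checks ok before reading json; the endpoint and response shape are illustrative:

```
resp = http.get("https://api.example.com/v1/items")
if not resp.ok:
    fail("request failed: " + str(resp.status_code) + " " + resp.text)
for item in resp.json["items"]:
    print(item["id"])
```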

Per-call auth (overrides API dict)

resp = http.get(url, auth=("user", "pass"))           # Basic
resp = http.get(url, auth=("user", "pass", "digest"))  # Digest (RFC 7616)

mTLS (client certificates)

resp = http.get(url, cert="client.crt", key="client.key", ca="ca.crt")

File upload

resp = http.upload(url, file="invoice.pdf", field="document", fields={"purpose": "ocr"})

http.upload inherits base_url and auth from the API dict:

resp = http.upload("/v1/files", file="doc.pdf", fields={"purpose": "ocr"})

Retry behavior

Retries on 429 and 5xx. Exponential backoff with jitter. Respects Retry-After headers. Configure via API dict:

API = {"retry": 3, "backoff": 2}  # 3 retries, 2s/4s/8s backoff
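
The same keys work as per-call kwargs and override the API dict for a single request; the path is illustrative:

```
resp = http.get("/v1/flaky-endpoint", retry=5, backoff=1)
```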

incremental

Pipeline state from previous runs. Available when the SQL model uses @incremental.

Field          Type    Description
is_backfill    bool    True if target table doesn’t exist (first run)
last_value     string  MAX(cursor_column) from previous run
initial_value  string  Starting value from @incremental_initial
last_run       string  Timestamp of last successful run
cursor         string  Column name from @incremental directive

All fields are read-only.

if incremental.is_backfill:
    start = incremental.initial_value
else:
    start = _next_day(incremental.last_value)
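
A fuller sketch, assuming a lib function that returns a list of row dicts; the endpoint, query parameter, and response shape are illustrative:

```
def fetch():
    if incremental.is_backfill:
        start = incremental.initial_value
    else:
        start = incremental.last_value
    resp = http.get("/v1/records", params={"updated_after": start})
    return resp.json["records"]
```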

sink

Batch context inside push() functions.

Field         Type  Description
batch_number  int   1-indexed batch number in current run

def push(rows):
    if sink.batch_number == 1:
        http.post(url + "/clear", json={})
    http.post(url + "/append", json=rows)

env

Access environment variables from .env or shell.

value = env.get("API_KEY")
value = env.get("API_KEY", default="fallback")
env.set("TEMP_VAR", "some_value")
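
A common pattern for required secrets combines env.get with fail:

```
key = env.get("API_KEY")
if not key:
    fail("API_KEY is not set; add it to .env")
```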

oauth

OAuth2 token management.

Service account (Google APIs)

Configured in the API dict — no Starlark code needed:

API = {
    "auth": {
        "google_key_file_env": "GAM_KEY_FILE",
        "scope": "https://www.googleapis.com/auth/admanager",
    },
}

google_key_file_env resolves the key file path from .env. The runtime handles JWT signing and token refresh.

For direct use in Starlark (when API dict auth isn’t sufficient):

token = oauth.token(
    google_key_file=env.get("KEY_FILE"),
    scope="https://www.googleapis.com/auth/analytics",
)
headers = {"Authorization": "Bearer " + token.access_token}

OAuth2 provider (browser-based)

ondatrasql auth hubspot  # one-time login
API = {"auth": {"provider": "hubspot"}}
# Tokens refresh automatically

Client credentials

token = oauth.token(
    token_url="https://auth.example.com/token",
    client_id=env.get("CLIENT_ID"),
    client_secret=env.get("CLIENT_SECRET"),
    scope="read write",
)

json

JSON encoding and decoding (Starlark standard library).

s = json.encode({"name": "Alice", "age": 30})
data = json.decode('{"name": "Alice", "age": 30}')

time

Date and time operations (Starlark standard library).

now = time.now()
t = time.parse_time("2024-03-15T10:30:00Z")
t.year, t.month, t.day
t.unix       # Unix timestamp (seconds)
t.format("2006-01-02")  # Go layout format

tomorrow = t + time.parse_duration("24h")
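
For example, formatting the previous day for a date-partitioned request (a sketch; the layout string is Go-style, as above):

```
t = time.now()
yesterday = t - time.parse_duration("24h")
date_str = yesterday.format("2006-01-02")   # e.g. "2024-03-14"
```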

xml

Parse and encode XML.

data = xml.decode('<user><name>Alice</name></user>')
# {"user": {"name": "Alice"}}

output = xml.encode({"user": {"name": "Alice"}})

XML attributes are prefixed with @:

data = xml.decode('<item id="42"><name>Widget</name></item>')
# {"item": {"@id": "42", "name": "Widget"}}

csv

Parse and encode CSV.

rows = csv.decode("name,age\nAlice,30\nBob,25")
# [{"name": "Alice", "age": "30"}, {"name": "Bob", "age": "25"}]

rows = csv.decode(data, delimiter="\t", header=False)  # returns list of lists

output = csv.encode([{"name": "Alice", "age": "30"}])

crypto

Hashing, encoding, and signing.

encoded = crypto.base64_encode("hello")
decoded = crypto.base64_decode(encoded)

sha = crypto.sha256("data")
md = crypto.md5("data")
sig = crypto.hmac_sha256("secret_key", "message")
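
These compose with http for request signing; a hedged sketch where the header name and signing scheme are illustrative, not any particular provider's:

```
payload = json.encode({"event": "ping"})
sig = crypto.hmac_sha256(env.get("WEBHOOK_SECRET"), payload)
resp = http.post(
    "https://api.example.com/v1/events",
    body=payload,
    headers={"X-Signature": sig},
)
```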

url

URL building and parsing.

full = url.build("https://api.com/data", params={"page": "1"})
encoded = url.encode("hello world")
params = url.encode_params({"foo": "bar"})
parsed = url.parse("https://api.com/data?page=1")
# parsed.scheme, parsed.host, parsed.path
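
For example, combining url.build with http (host and params illustrative):

```
page_url = url.build("https://api.example.com/data", params={"page": "2", "limit": "100"})
resp = http.get(page_url)
```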

Global functions

abort()                        # clean exit, 0 rows, no error
fail("something went wrong")  # stop with error
sleep(1.5)                     # pause (prefer rate_limit in API dict)
print("debug info")           # stderr (secrets auto-redacted)
getvariable("currency")       # read DuckDB session variable
query("SELECT * FROM t")      # read-only SQL against DuckDB

query

Read-only SQL against the DuckDB session. Returns a list of dicts. Only SELECT and WITH statements.

rows = query("SELECT file FROM glob('data/*.pdf') ORDER BY file")
rates = query("SELECT currency, rate FROM mart.exchange_rates WHERE date = current_date")
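
query composes with the other modules; a hedged sketch that uploads each matching file (endpoint and field values illustrative):

```
for row in query("SELECT file FROM glob('data/*.pdf') ORDER BY file"):
    resp = http.upload("/v1/files", file=row["file"], fields={"purpose": "ocr"})
```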

Secret redaction

Secrets from .env are automatically removed from print() output and error messages:

Bearer eyJhbG...  →  Bearer [REDACTED]
token=abc123      →  token=[REDACTED]