Fetch Contract
On this page
Your fetch() function is called once per page by the runtime. It receives pagination context and returns rows. Starlark handles I/O — SQL handles transformation.
Function signature
def fetch(arg1, arg2, ..., page, columns=[], target=""):
Arguments before page come from API.fetch.args. They receive values from the SQL call:
API = {"fetch": {"args": ["resource"]}}
SELECT * FROM my_api('users')
-- ↑
-- resource
Runtime-injected kwargs
| Kwarg | Type | Description |
|---|---|---|
columns | list | SELECT columns as typed dicts: [{"name": "total", "type": "number"}, ...]. Type is JSON Schema, mapped from SQL casts. |
target | string | Model target name (e.g. raw.orders). |
Both are optional — declare them with defaults if your blueprint needs them.
columns is how SQL communicates intent to Starlark. SQL casts control the types:
| SQL | Type in columns | Use case |
|---|---|---|
amount::DECIMAL | "number" | Numeric field |
count::INTEGER | "integer" | Integer field |
items::JSON | "array" | Structured data (arrays, objects) |
name (no cast) | "string" | String field (default) |
This lets blueprints adapt their API requests based on what SQL asks for — without hardcoding field lists.
Page object
Read-only struct:
| Field | Type | Description |
|---|---|---|
page.cursor | any | None on first page. On subsequent pages, whatever you returned as next. |
page.size | int | From API.fetch.page_size. Constant across all pages. |
page.number | int | 1-based page counter. |
Return format
return {
"rows": [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}],
"next": "cursor_abc123",
}
| Key | Required | Description |
|---|---|---|
"rows" | Yes | List of dicts. Each dict is one row. |
"next" | No | Cursor for the next page. Any type. None, missing, or "" stops pagination. |
The cursor is opaque — the runtime doesn’t interpret it. For complex state (multiple values needed across pages), serialize as JSON:
next_cursor = json.encode({"url": fetch_url, "token": next_token, "series_idx": 3})
Pagination
The runtime calls fetch() in a loop until "next" is None or missing.
Pagination patterns vary by API. The blueprint owns the logic:
- Cursor-based — API returns a next token, pass it back
- Offset-based — increment an offset by page size
- Date-range — advance a date window per page
- Multi-resource — iterate through resources, then dates within each
The runtime doesn’t care which pattern you use — it just follows the cursor.
Incremental state
If the SQL model uses @incremental, the incremental module is available:
| Field | Type | Description |
|---|---|---|
incremental.is_backfill | bool | True on first run (no target table). |
incremental.last_value | string | MAX(cursor_column) from previous run. On backfill: the @incremental_initial value. |
incremental.initial_value | string | Starting value from @incremental_initial. |
incremental.last_run | string | Timestamp of the most recent successful run. |
incremental.cursor | string | Column name from @incremental directive. |
All fields are read-only.
def fetch(page):
if incremental.is_backfill:
start_date = incremental.initial_value
else:
start_date = _next_day(incremental.last_value)
if start_date > yesterday:
abort()
resp = http.get("/data", params={"from": start_date})
return {"rows": resp.json, "next": None}
Error handling
Fail — stops the pipeline with an error:
if not resp.ok:
fail("API error: " + str(resp.status_code) + " " + resp.text)
Abort — clean exit, 0 rows, no error. Use when there’s nothing to fetch:
if start_date > yesterday:
abort()
The two-model pattern
Blueprints return raw API data. SQL transforms it in a downstream model. This keeps the layers separate:
Raw model — Starlark fetches, column names match the API:
-- models/raw/data.sql
-- @kind: append
-- @incremental: date
SELECT series, date, value
FROM my_api('SERIES_A,SERIES_B')
Staging model — SQL transforms, casts types, pivots, joins:
-- models/staging/data.sql
-- @kind: table
SELECT
date::DATE AS date,
MAX(CASE WHEN series = 'SERIES_A' THEN value END)::DECIMAL AS series_a,
MAX(CASE WHEN series = 'SERIES_B' THEN value END)::DECIMAL AS series_b
FROM raw.data
GROUP BY date
Don’t alias or transform in the raw model. Don’t call APIs in the staging model. Each layer does one thing.
Column type inference
Without explicit types, the runtime infers from data:
| Data | DuckDB type |
|---|---|
| Integer | BIGINT |
| Float | DOUBLE |
| Boolean | BOOLEAN |
| String, list, map | VARCHAR |
Lists and maps are JSON-serialized. Use ::JSON in SQL to signal structured data, then expand in a downstream model with json_each() or json_transform().
Available modules
| Module | Purpose |
|---|---|
http | HTTP requests. Auth, headers, retry, base_url from API dict injected automatically. |
incremental | Incremental state. Only available with @incremental. |
env | Environment variables. |
json | JSON encoding/decoding. |
time | Date/time operations. |
oauth | OAuth2 tokens. |
query() | Read-only SQL against DuckDB. |
crypto | Hashing and encoding. |
xml | XML parsing. |
csv | CSV parsing. |
url | URL utilities. |
See Starlark Modules for details.
OndatraSQL