Fetch Contract

Your fetch() function is called once per page by the runtime. It receives pagination context and returns rows. Starlark handles I/O — SQL handles transformation.

Function signature

def fetch(arg1, arg2, ..., page, columns=[], target=""):

Arguments before page come from API.fetch.args. They receive values from the SQL call:

API = {"fetch": {"args": ["resource"]}}
SELECT * FROM my_api('users')
--                    ↑
--                 resource
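
A minimal sketch of how that argument arrives, using the hypothetical my_api blueprint from above (the row content is illustrative only):

```python
# With API = {"fetch": {"args": ["resource"]}}, the 'users' argument in
# SELECT * FROM my_api('users') arrives as the first positional parameter.
def fetch(resource, page):
    # resource == "users" here; a real blueprint would request that resource
    return {"rows": [{"resource": resource, "id": 1}], "next": None}
```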

Runtime-injected kwargs

Kwarg    Type    Description
columns  list    SELECT columns as typed dicts: [{"name": "total", "type": "number"}, ...]. Type is JSON Schema, mapped from SQL casts.
target   string  Model target name (e.g. raw.orders).

Both are optional — declare them with defaults if your blueprint needs them.

columns is how SQL communicates intent to Starlark. SQL casts control the types:

SQL              Type in columns  Use case
amount::DECIMAL  "number"         Numeric field
count::INTEGER   "integer"        Integer field
items::JSON      "array"          Structured data (arrays, objects)
name (no cast)   "string"         String field (default)

This lets blueprints adapt their API requests based on what SQL asks for — without hardcoding field lists.
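
One way to use this, sketched as a plain helper (requested_fields is a hypothetical name, not part of the runtime):

```python
def requested_fields(columns, default="*"):
    # columns is the runtime-injected list of SELECT columns,
    # e.g. [{"name": "total", "type": "number"}].
    names = [col["name"] for col in columns]
    return ",".join(names) if names else default
```

A blueprint could pass the result as a fields parameter on its API request, so SELECT total, date only asks the API for those two fields.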

Page object

Read-only struct:

Field        Type  Description
page.cursor  any   None on first page. On subsequent pages, whatever you returned as next.
page.size    int   From API.fetch.page_size. Constant across all pages.
page.number  int   1-based page counter.

Return format

return {
    "rows": [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}],
    "next": "cursor_abc123",
}

Key     Required  Description
"rows"  Yes       List of dicts. Each dict is one row.
"next"  No        Cursor for the next page. Any type. None, missing, or "" stops pagination.

The cursor is opaque — the runtime doesn’t interpret it. For complex state (multiple values needed across pages), serialize as JSON:

next_cursor = json.encode({"url": fetch_url, "token": next_token, "series_idx": 3})
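
A round-trip sketch of that pattern. It is written with Python's stdlib json (dumps/loads) so it runs standalone; in Starlark the equivalent calls are json.encode and json.decode. pack_cursor and unpack_cursor are hypothetical helper names:

```python
import json

def pack_cursor(url, token, series_idx):
    # Bundle multi-value pagination state into one opaque string cursor.
    return json.dumps({"url": url, "token": token, "series_idx": series_idx})

def unpack_cursor(cursor):
    # page.cursor is None on the first page; return None so callers
    # can fall back to their initial state.
    return json.loads(cursor) if cursor else None
```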

Pagination

The runtime calls fetch() in a loop until "next" is None, missing, or empty.

Pagination patterns vary by API. The blueprint owns the logic:

  • Cursor-based — API returns a next token, pass it back
  • Offset-based — increment an offset by page size
  • Date-range — advance a date window per page
  • Multi-resource — iterate through resources, then dates within each

The runtime doesn’t care which pattern you use — it just follows the cursor.
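
The offset-based pattern, for example, reduces to a small cursor computation. A sketch under that assumption (next_offset is a hypothetical helper; the runtime only sees the value you return as next):

```python
def next_offset(cursor, page_size, rows):
    # cursor is None on the first page, otherwise the previous offset.
    offset = cursor or 0
    # A short page means the API is exhausted; returning None stops the loop.
    if len(rows) < page_size:
        return None
    return offset + page_size
```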

Incremental state

If the SQL model uses @incremental, the incremental module is available:

Field                      Type    Description
incremental.is_backfill    bool    True on first run (no target table).
incremental.last_value     string  MAX(cursor_column) from previous run. On backfill: the @incremental_initial value.
incremental.initial_value  string  Starting value from @incremental_initial.
incremental.last_run       string  Timestamp of the most recent successful run.
incremental.cursor         string  Column name from @incremental directive.

All fields are read-only.

def fetch(page):
    if incremental.is_backfill:
        start_date = incremental.initial_value
    else:
        start_date = _next_day(incremental.last_value)

    # _next_day and yesterday are helpers defined elsewhere in the blueprint
    if start_date > yesterday:
        abort()

    resp = http.get("/data", params={"from": start_date})
    return {"rows": resp.json, "next": None}

Error handling

Fail — stops the pipeline with an error:

if not resp.ok:
    fail("API error: " + str(resp.status_code) + " " + resp.text)

Abort — clean exit, 0 rows, no error. Use when there’s nothing to fetch:

if start_date > yesterday:
    abort()

The two-model pattern

Blueprints return raw API data. SQL transforms it in a downstream model. This keeps the layers separate:

Raw model — Starlark fetches, column names match the API:

-- models/raw/data.sql
-- @kind: append
-- @incremental: date

SELECT series, date, value
FROM my_api('SERIES_A,SERIES_B')

Staging model — SQL transforms, casts types, pivots, joins:

-- models/staging/data.sql
-- @kind: table

SELECT
    date::DATE AS date,
    MAX(CASE WHEN series = 'SERIES_A' THEN value END)::DECIMAL AS series_a,
    MAX(CASE WHEN series = 'SERIES_B' THEN value END)::DECIMAL AS series_b
FROM raw.data
GROUP BY date

Don’t alias or transform in the raw model. Don’t call APIs in the staging model. Each layer does one thing.

Column type inference

Without explicit types, the runtime infers from data:

Data               DuckDB type
Integer            BIGINT
Float              DOUBLE
Boolean            BOOLEAN
String, list, map  VARCHAR

Lists and maps are JSON-serialized. Use ::JSON in SQL to signal structured data, then expand in a downstream model with json_each() or json_transform().

Available modules

Module       Purpose
http         HTTP requests. Auth, headers, retry, base_url from API dict injected automatically.
incremental  Incremental state. Only available with @incremental.
env          Environment variables.
json         JSON encoding/decoding.
time         Date/time operations.
oauth        OAuth2 tokens.
query()      Read-only SQL against DuckDB.
crypto       Hashing and encoding.
xml          XML parsing.
csv          CSV parsing.
url          URL utilities.

See Starlark Modules for details.