Fetch Data from APIs

Pull data from external APIs into your DuckLake pipeline. The runtime handles pagination, retries, and incremental state — your lib function handles the API-specific I/O. SQL handles the transformation.

The pattern

Every API integration follows the same structure:

  1. API dict — declares auth, endpoints, rate limits, pagination
  2. Starlark fetch() — calls the API, parses responses, returns rows
  3. Raw SQL model — fetches with API column names
  4. Staging SQL model — transforms: casts types, renames, pivots, expands JSON

lib/my_api.star         → I/O logic
models/raw/data.sql     → FROM my_api('resource')
models/staging/data.sql → SELECT ... FROM raw.data (transformation)

Quick start

1. Create a lib function

# lib/my_api.star
API = {
    "base_url": "https://api.example.com",
    "auth": {"env": "MY_API_KEY"},
    "fetch": {
        "args": ["resource"],
        "page_size": 100,
        "dynamic_columns": True,
    },
}

def fetch(resource, page):
    resp = http.get("/v1/" + resource, params={
        "limit": page.size,
        "cursor": page.cursor,
    })
    if not resp.ok:
        fail("API error: " + str(resp.status_code))

    data = resp.json
    return {"rows": data["items"], "next": data.get("next_cursor")}

2. Write SQL models

Raw — fetch with API column names:

-- models/raw/users.sql
-- @kind: table

SELECT id, name, email, metadata::JSON AS metadata
FROM my_api('users')

Staging — transform in SQL:

-- models/staging/users.sql
-- @kind: table

SELECT
    id::BIGINT AS id,
    name,
    LOWER(email) AS email,
    metadata->>'$.city' AS city
FROM raw.users

3. Run

ondatrasql run

Both models participate in the DAG. The staging model runs automatically after the raw model.

SQL casts control the API

With dynamic_columns: True, the runtime extracts column names and types from your SELECT casts by parsing the query with DuckDB's AST. These are passed to fetch() as the columns kwarg:

SQL                Type passed to blueprint   Use case
amount::DECIMAL    "number"                   Numeric field
count::INTEGER     "integer"                  Integer field
items::JSON        "array"                    Structured data (arrays, objects)
name (no cast)     "string"                   String (default)

Blueprints can use this to adapt their API requests. For example, the GAM blueprint splits columns into dimensions (string) and metrics (numeric) based on cast type.
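A blueprint can implement that split in a few lines. This is an illustrative Python sketch, not the GAM blueprint's actual code, and the shape of the columns kwarg (here, a list of {"name", "type"} dicts) is an assumption:

```python
# Sketch: split SELECT-cast columns into dimensions and metrics.
# Assumes each column arrives as {"name": ..., "type": ...}; the real
# shape of the columns kwarg is runtime-specific.
def split_columns(columns):
    dimensions = []
    metrics = []
    for col in columns:
        if col["type"] in ("number", "integer"):
            metrics.append(col["name"])
        else:
            dimensions.append(col["name"])
    return dimensions, metrics
```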

Pagination patterns

The runtime calls fetch() in a loop until "next" is None. Your blueprint owns the pagination logic:

Cursor-based

def fetch(resource, page):
    params = {"limit": page.size}
    if page.cursor:
        params["starting_after"] = page.cursor

    resp = http.get("/v1/" + resource, params=params)
    data = resp.json

    next_cursor = None
    if data.get("has_more") and len(data["data"]) > 0:
        next_cursor = data["data"][-1]["id"]

    return {"rows": data["data"], "next": next_cursor}

Offset-based

def fetch(resource, page):
    offset = page.cursor or 0
    resp = http.get("/v1/" + resource, params={"limit": page.size, "offset": offset})
    rows = resp.json["items"]
    next_offset = offset + page.size if len(rows) == page.size else None
    return {"rows": rows, "next": next_offset}

Date-range

Fetch one time window per page:

def fetch(series, page):
    cursor_date = start_date if page.cursor == None else page.cursor
    end_date = min(_add_days(cursor_date, 365), yesterday)

    resp = http.get("/data/" + series + "/" + cursor_date + "/" + end_date)
    rows = [{"date": obs["date"], "value": obs["value"]} for obs in resp.json]

    next_cursor = _next_day(end_date) if end_date < yesterday else None
    return {"rows": rows, "next": next_cursor}

Complex state (JSON cursor)

When you need to carry multiple values between pages:

next_cursor = json.encode({"url": fetch_url, "token": next_token, "series_idx": 3})
# On next page:
state = json.decode(page.cursor)
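Whatever the pattern, the contract is the same: the runtime drives the loop. A simplified Python sketch of that driver (illustrative only; the real runtime also applies retries, rate limits, and incremental state):

```python
# Simplified sketch of the runtime's pagination driver: call fetch()
# with a fresh page object until it reports no next cursor.
class Page:
    def __init__(self, size, cursor):
        self.size = size
        self.cursor = cursor

def fetch_all(fetch, resource, page_size=100):
    rows, cursor = [], None
    while True:
        result = fetch(resource, Page(page_size, cursor))
        rows.extend(result["rows"])
        cursor = result.get("next")
        if cursor is None:
            return rows
```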

Nested and structured data

APIs often return nested JSON. Blueprints return it as-is — SQL handles expansion in a downstream model.

json_each — simplest, no schema needed

-- Raw: fetch with arrays as JSON
SELECT id, name, tags::JSON AS tags FROM my_api('users')

-- Staging: expand
SELECT id, name, j.value->>'name' AS tag
FROM raw.users, LATERAL json_each(tags) AS j

json_transform — typed expansion

SELECT id, x.*
FROM raw.users,
LATERAL (SELECT unnest(json_transform(tags,
    '[{"name":"VARCHAR","color":"VARCHAR"}]')) AS x)

Arrow operators — direct access

SELECT id, metadata->>'$.address.city' AS city
FROM raw.users

Pivot — normalize then pivot in SQL

When the API returns one row per measurement:

-- Raw: normalized (series, date, value)
SELECT series, date, value FROM riksbank('SEKEURPMI,SEKUSDPMI')

-- Staging: pivoted
SELECT date::DATE,
    MAX(CASE WHEN series = 'SEKEURPMI' THEN value END)::DECIMAL AS eur,
    MAX(CASE WHEN series = 'SEKUSDPMI' THEN value END)::DECIMAL AS usd
FROM raw.exchange_rates
GROUP BY date

Or with DuckDB’s native PIVOT:

PIVOT raw.exchange_rates ON series USING MAX(value) GROUP BY date::DATE

Incremental loads

Use @incremental to fetch only new data on subsequent runs:

-- models/raw/events.sql
-- @kind: append
-- @incremental: created_at

SELECT * FROM my_api('events')

In the lib function, the incremental state is available to narrow the request:

def fetch(resource, page):
    params = {"limit": page.size, "cursor": page.cursor}
    if not incremental.is_backfill:
        params["created_after"] = incremental.last_value

    resp = http.get("/v1/" + resource, params=params)
    return {"rows": resp.json["items"], "next": resp.json.get("next")}

Field                      Description
incremental.is_backfill    True on first run — fetch everything
incremental.last_value     MAX(cursor_column) from previous run
incremental.initial_value  Starting value from @incremental_initial

Authentication

Authentication is configured in the API dict and injected into every http.* call automatically.

# API key
"auth": {"env": "API_KEY"}

# OAuth provider (browser-based)
"auth": {"provider": "hubspot"}

# Basic auth
"auth": {"env_user": "USER", "env_pass": "PASS"}

# Google service account (key file path from .env)
"auth": {"google_key_file_env": "KEY_FILE", "scope": "https://..."}

# Google service account (literal path)
"auth": {"google_key_file": "service-account.json", "scope": "https://..."}

Error handling

Fail — stops the pipeline:

if not resp.ok:
    fail("API error: " + str(resp.status_code) + " " + resp.text)

Abort — clean exit, 0 rows, no error:

if start_date > yesterday:
    abort()

Retry — automatic. Configure in API dict:

API = {"retry": 3, "backoff": 2, "timeout": 30}

Rate limiting — declare in API dict, runtime handles it:

API = {"rate_limit": {"requests": 2, "per": "1s"}}
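The runtime enforces this for you; blueprints never sleep. Conceptually, "2 requests per 1s" behaves like a sliding-window limiter. An illustrative Python sketch, not the runtime's implementation:

```python
import time

# Conceptual equivalent of rate_limit: {"requests": 2, "per": "1s"} —
# allow at most `requests` calls per window, sleeping when exhausted.
class RateLimiter:
    def __init__(self, requests, per_seconds):
        self.requests = requests
        self.per = per_seconds
        self.calls = []

    def wait(self):
        now = time.monotonic()
        # Drop timestamps that have aged out of the window.
        self.calls = [t for t in self.calls if now - t < self.per]
        if len(self.calls) >= self.requests:
            # Sleep until the oldest call in the window expires.
            time.sleep(self.per - (now - self.calls[0]))
        self.calls.append(time.monotonic())
```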

Options via JSON arg

For API-specific configuration beyond columns:

FROM gam_report('{"custom_dimensions": [11678108], "currency": "USD"}')

The blueprint declares the options arg and decodes it in fetch():

API = {"fetch": {"args": ["options"]}}

def fetch(options, page, columns=[]):
    opts = json.decode(options) if options else {}

Reference