# Fetch Data from APIs
Pull data from external APIs into your DuckLake pipeline. The runtime handles pagination, retries, and incremental state — your lib function handles the API-specific I/O. SQL handles the transformation.
## The pattern

Every API integration follows the same structure:

- API dict — declares auth, endpoints, rate limits, pagination
- Starlark `fetch()` — calls the API, parses responses, returns rows
- Raw SQL model — fetches with API column names
- Staging SQL model — transforms: casts types, renames, pivots, expands JSON

```
lib/my_api.star          → I/O logic
models/raw/data.sql      → FROM my_api('resource')
models/staging/data.sql  → SELECT ... FROM raw.data  (transformation)
```
## Quick start

### 1. Create a lib function

```python
# lib/my_api.star
API = {
    "base_url": "https://api.example.com",
    "auth": {"env": "MY_API_KEY"},
    "fetch": {
        "args": ["resource"],
        "page_size": 100,
        "dynamic_columns": True,
    },
}

def fetch(resource, page):
    resp = http.get("/v1/" + resource, params={
        "limit": page.size,
        "cursor": page.cursor,
    })
    if not resp.ok:
        fail("API error: " + str(resp.status_code))
    data = resp.json
    return {"rows": data["items"], "next": data.get("next_cursor")}
```
### 2. Write SQL models

Raw — fetch with API column names:

```sql
-- models/raw/users.sql
-- @kind: table
SELECT id, name, email, metadata::JSON AS metadata
FROM my_api('users')
```

Staging — transform in SQL:

```sql
-- models/staging/users.sql
-- @kind: table
SELECT
    id::BIGINT AS id,
    name,
    LOWER(email) AS email,
    metadata->>'$.city' AS city
FROM raw.users
```
### 3. Run

```shell
ondatrasql run
```

Both models participate in the DAG. The staging model runs automatically after the raw model.
## SQL casts control the API

With `dynamic_columns: True`, the runtime extracts column names and types from your SELECT casts via the DuckDB AST and passes them to `fetch()` as the `columns` kwarg:

| SQL | Type passed to blueprint | Use case |
|---|---|---|
| `amount::DECIMAL` | `"number"` | Numeric field |
| `count::INTEGER` | `"integer"` | Integer field |
| `items::JSON` | `"array"` | Structured data (arrays, objects) |
| `name` (no cast) | `"string"` | String (default) |
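The cast-to-type mapping above can be sketched as a small lookup. This is a hypothetical Python helper for illustration, not the runtime's actual code, and it covers only the casts listed in the table:

```python
# Hypothetical sketch: map a SQL cast name to the type string a blueprint
# receives in the `columns` kwarg. Unknown or missing casts default to string.
CAST_TO_TYPE = {
    "DECIMAL": "number",
    "INTEGER": "integer",
    "JSON": "array",
}

def column_type(cast):
    """Return the type string for a column's SQL cast (None = no cast)."""
    if cast is None:
        return "string"  # no cast: treated as a plain string column
    return CAST_TO_TYPE.get(cast.upper(), "string")
```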
Blueprints can use this to adapt their API requests. For example, the GAM blueprint splits columns into dimensions (string) and metrics (numeric) based on cast type.
## Pagination patterns

The runtime calls `fetch()` in a loop until `"next"` is `None`. Your blueprint owns the pagination logic:

### Cursor-based

```python
def fetch(resource, page):
    params = {"limit": page.size}
    if page.cursor:
        params["starting_after"] = page.cursor
    resp = http.get("/v1/" + resource, params=params)
    data = resp.json
    next_cursor = None
    if data.get("has_more") and len(data["data"]) > 0:
        next_cursor = data["data"][-1]["id"]
    return {"rows": data["data"], "next": next_cursor}
```
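The driving loop on the runtime's side can be sketched in Python. This is a minimal sketch of the assumed contract (a page object carrying `size` and `cursor`, looping until `"next"` is `None`); `toy_fetch` stands in for a real blueprint:

```python
from types import SimpleNamespace

def run_fetch(fetch, page_size=100):
    """Sketch of the runtime loop: call fetch() until 'next' is None."""
    rows, cursor = [], None
    while True:
        page = SimpleNamespace(size=page_size, cursor=cursor)
        result = fetch(page)
        rows.extend(result["rows"])
        cursor = result.get("next")
        if cursor is None:
            return rows

# Toy blueprint: paginate over a fixed dataset using an offset cursor.
DATA = list(range(7))

def toy_fetch(page):
    start = page.cursor or 0
    chunk = DATA[start:start + page.size]
    nxt = start + page.size if start + page.size < len(DATA) else None
    return {"rows": chunk, "next": nxt}
```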
### Offset-based

```python
def fetch(resource, page):
    offset = page.cursor or 0
    resp = http.get("/v1/" + resource, params={"limit": page.size, "offset": offset})
    rows = resp.json["items"]
    next_offset = offset + page.size if len(rows) == page.size else None
    return {"rows": rows, "next": next_offset}
```
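The offset heuristic above requests another page only when the current page came back full. One consequence worth knowing: when the total row count is an exact multiple of `page_size`, the heuristic costs one extra, empty request. A small sketch of just that arithmetic:

```python
def next_offset(offset, page_size, rows_returned):
    """Return the next offset to request, or None when pagination stops."""
    # A full page suggests more data may follow; a short page means we're done.
    return offset + page_size if rows_returned == page_size else None
```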
### Date-range

Fetch one time window per page:

```python
def fetch(series, page):
    cursor_date = start_date if page.cursor == None else page.cursor
    end_date = min(_add_days(cursor_date, 365), yesterday)
    resp = http.get("/data/" + series + "/" + cursor_date + "/" + end_date)
    rows = [{"date": obs["date"], "value": obs["value"]} for obs in resp.json]
    next_cursor = _next_day(end_date) if end_date < yesterday else None
    return {"rows": rows, "next": next_cursor}
```
### Complex state (JSON cursor)

When you need to carry multiple values between pages:

```python
next_cursor = json.encode({"url": fetch_url, "token": next_token, "series_idx": 3})

# On the next page:
state = json.decode(page.cursor)
```
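Because the cursor is just a string, any JSON-serializable state survives the round trip between pages. A Python sketch of the same pattern (Starlark's `json.encode`/`json.decode` correspond to `json.dumps`/`json.loads`):

```python
import json

def encode_cursor(url, token, series_idx):
    """Pack multiple values into a single opaque cursor string."""
    return json.dumps({"url": url, "token": token, "series_idx": series_idx})

def decode_cursor(cursor):
    """Unpack the cursor on the next page."""
    return json.loads(cursor)
```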
## Nested and structured data

APIs return nested JSON. Blueprints return it as-is — SQL handles expansion in a downstream model.

### json_each — simplest, no schema needed

```sql
-- Raw: fetch with arrays as JSON
SELECT id, name, tags::JSON AS tags FROM my_api('users')

-- Staging: expand
SELECT id, name, j.value->>'name' AS tag
FROM raw.users, LATERAL json_each(tags) AS j
```
### json_transform — typed expansion

```sql
SELECT id, x.*
FROM raw.users,
     LATERAL (SELECT unnest(json_transform(tags,
         '[{"name":"VARCHAR","color":"VARCHAR"}]')) AS x)
```

### Arrow operators — direct access

```sql
SELECT id, metadata->>'$.address.city' AS city
FROM raw.users
```
### Pivot — normalize then pivot in SQL

When the API returns one row per measurement:

```sql
-- Raw: normalized (series, date, value)
SELECT series, date, value FROM riksbank('SEKEURPMI,SEKUSDPMI')

-- Staging: pivoted
SELECT date::DATE,
       MAX(CASE WHEN series = 'SEKEURPMI' THEN value END)::DECIMAL AS eur,
       MAX(CASE WHEN series = 'SEKUSDPMI' THEN value END)::DECIMAL AS usd
FROM raw.exchange_rates
GROUP BY date
```

Or with DuckDB’s native PIVOT:

```sql
PIVOT raw.exchange_rates ON series USING MAX(value) GROUP BY date::DATE
```
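To make the `MAX(CASE ...)` pivot concrete, here is a pure-Python sketch of what that SQL computes over normalized `(series, date, value)` rows. The sample rows are invented for illustration:

```python
def pivot(rows, series_names):
    """Group rows by date and spread each series into its own column."""
    out = {}
    for r in rows:
        wide = out.setdefault(r["date"], {s: None for s in series_names})
        if r["series"] in series_names:
            prev = wide[r["series"]]
            # MAX over the group; with one observation per (date, series)
            # this simply selects that observation's value.
            wide[r["series"]] = r["value"] if prev is None else max(prev, r["value"])
    return out

rows = [
    {"series": "SEKEURPMI", "date": "2024-01-02", "value": 11.2},
    {"series": "SEKUSDPMI", "date": "2024-01-02", "value": 10.1},
]
```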
## Incremental loads

Use `@incremental` to fetch only new data on subsequent runs:

```sql
-- models/raw/events.sql
-- @kind: append
-- @incremental: created_at
SELECT * FROM my_api('events')
```

```python
def fetch(resource, page):
    params = {"limit": page.size, "cursor": page.cursor}
    if not incremental.is_backfill:
        params["created_after"] = incremental.last_value
    resp = http.get("/v1/" + resource, params=params)
    return {"rows": resp.json["items"], "next": resp.json.get("next")}
```
| Field | Description |
|---|---|
| `incremental.is_backfill` | `True` on first run — fetch everything |
| `incremental.last_value` | `MAX(cursor_column)` from the previous run |
| `incremental.initial_value` | Starting value from `@incremental_initial` |
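The gating logic from the `fetch()` example above can be isolated as a small function. A Python sketch, with the `incremental` object simulated as a plain dict:

```python
def build_params(page_size, cursor, incremental):
    """Add the incremental high-water mark unless this is the first (backfill) run."""
    params = {"limit": page_size, "cursor": cursor}
    if not incremental["is_backfill"]:
        # Subsequent runs: only request rows newer than MAX(cursor_column)
        # from the previous run.
        params["created_after"] = incremental["last_value"]
    return params
```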
## Authentication

Auth is configured in the API dict and injected into every `http.*` call automatically.

```python
# API key
"auth": {"env": "API_KEY"}

# OAuth provider (browser-based)
"auth": {"provider": "hubspot"}

# Basic auth
"auth": {"env_user": "USER", "env_pass": "PASS"}

# Google service account (key file path from .env)
"auth": {"google_key_file_env": "KEY_FILE", "scope": "https://..."}

# Google service account (literal path)
"auth": {"google_key_file": "service-account.json", "scope": "https://..."}
```
## Error handling

Fail — stops the pipeline:

```python
if not resp.ok:
    fail("API error: " + str(resp.status_code) + " " + resp.text)
```

Abort — clean exit, 0 rows, no error:

```python
if start_date > yesterday:
    abort()
```

Retry — automatic; configure it in the API dict:

```python
API = {"retry": 3, "backoff": 2, "timeout": 30}
```

Rate limiting — declare it in the API dict and the runtime handles it:

```python
API = {"rate_limit": {"requests": 2, "per": "1s"}}
```
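One way a "2 requests per 1s" limit like the one above can be enforced is sliding-window pacing. A minimal, deterministic Python sketch (an assumption about the mechanism, not the runtime's actual implementation; timestamps are passed in so no real clock is needed):

```python
def wait_time(request_times, now, requests=2, per=1.0):
    """Seconds to wait before the next request is allowed under the limit."""
    # Only requests inside the current window count against the limit.
    recent = [t for t in request_times if now - t < per]
    if len(recent) < requests:
        return 0.0
    # Wait until the oldest in-window request ages out of the window.
    return per - (now - min(recent))
```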
## Options via JSON arg

For API-specific configuration beyond columns:

```sql
FROM gam_report('{"custom_dimensions": [11678108], "currency": "USD"}')
```

```python
API = {"fetch": {"args": ["options"]}}

def fetch(options, page, columns=[]):
    opts = json.decode(options) if options else {}
```
## Reference

- Fetch Contract — complete function spec
- API Dict — all configuration options
- Starlark Modules — `http`, `env`, `crypto`, `query`, etc.
- Set Up OAuth — browser-based auth setup
- Blueprints — working examples (Mistral OCR, Riksbank, Google Ad Manager)