Runtime Modules
A built-in runtime for writing data pipelines
OndatraSQL includes a built-in runtime for writing data pipelines.
No imports. No dependencies. Everything you need is available by default.
All modules are predeclared globals, available in both model scripts and shared library modules loaded via load().
Mental Model
Scripts in OndatraSQL are not standalone programs.
They run inside a data pipeline runtime:
- query() reads from DuckDB
- http fetches external data
- save.row() writes output
- incremental provides state
You’re not wiring systems together — you’re writing the pipeline itself.
Core Runtime
query() — Read from Your Pipeline
Use query() to read from any table in your pipeline. Dependencies are detected automatically.
rows = query("SELECT id, name FROM staging.customers WHERE active = true")
for row in rows:
    print(row["id"], row["name"])
Returns a list of dicts, where every value is a string. An empty result set returns an empty list.
# CTE queries work too
rows = query("""
    WITH recent AS (
        SELECT * FROM raw.orders
        WHERE created_at > '2026-01-01'
    )
    SELECT customer_id, COUNT(*) AS cnt
    FROM recent
    GROUP BY 1
""")
This enables:
- Reverse ETL — read from DuckDB, push to external API
- Enrichment workflows — join API data with existing tables
- Feedback loops — read, send, write back
Without leaving the pipeline.
Reverse ETL:
rows = query("SELECT * FROM mart.customers WHERE synced = false")
for row in rows:
    http.post("https://api.hubspot.com/contacts", json=row)
    save.row({"id": row["id"], "synced_at": str(time.now())})
Cross-source enrichment:
existing = query("SELECT id, email FROM staging.users")
for user in existing:
    profile = http.get("https://api.example.com/profiles/" + user["id"])
    save.row({"id": user["id"], "email": user["email"], "bio": profile.json.get("bio", "")})
Table references in query() string literals are automatically extracted for DAG ordering. If your script queries staging.customers, OndatraSQL ensures that model runs before yours.
Dynamic SQL such as query("SELECT * FROM " + table) won’t be detected — the dependency must be declared manually or the query restructured as a literal string. query() also rejects mutating statements: INSERT, UPDATE, DELETE, DROP, CREATE, etc. are blocked before execution.
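The literal-extraction rule can be sketched with a small regex in standard Python. This is an illustrative approximation, not OndatraSQL's actual parser; the pattern and the extract_tables helper are hypothetical:

```python
import re

# Hypothetical sketch: find schema-qualified table references
# (e.g. staging.customers) after FROM or JOIN in a SQL string literal.
TABLE_REF = re.compile(r"\b(?:FROM|JOIN)\s+([a-z_]+\.[a-z_]+)", re.IGNORECASE)

def extract_tables(sql):
    """Return sorted schema.table references found in a literal SQL string."""
    return sorted(set(TABLE_REF.findall(sql)))

print(extract_tables(
    "SELECT id FROM staging.customers c JOIN raw.orders o ON o.customer_id = c.id"
))
```

Because the regex only sees the literal text, a query assembled at runtime from variables exposes no table name to extract, which is why such dependencies go undetected.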
incremental — Pipeline State
Access the state of previous runs. Used for cursor-based ingestion from APIs.
| Field | Type | Description |
|---|---|---|
| is_backfill | bool | True if target table doesn’t exist |
| cursor | string | Cursor column name |
| last_value | string | MAX(cursor) from target |
| last_run | string | Timestamp of last commit |
| initial_value | string | Starting cursor value |
The incremental module is read-only state — available directly from library functions without being passed as a parameter. See Incremental Models for details.
# @kind: append
# @incremental: updated_at
url_str = "https://api.example.com/orders"
if not incremental.is_backfill:
    url_str = url_str + "?since=" + incremental.last_value
resp = http.get(url_str)
for order in resp.json:
    save.row(order)
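The cursor handling above is plain branching. As a standalone illustration, the same decision can be written as a pure function in standard Python (build_url is a hypothetical helper, not part of the runtime):

```python
def build_url(base, is_backfill, last_value):
    # Backfill: no cursor state exists yet, so fetch everything.
    if is_backfill:
        return base
    # Incremental run: only fetch records after the stored cursor.
    return base + "?since=" + last_value

print(build_url("https://api.example.com/orders", False, "2026-01-01T00:00:00Z"))
```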
External Access
env
Access environment variables (from .env or shell).
value = env.get("API_KEY")
value = env.get("API_KEY", default="fallback")
env.set("TEMP_VAR", "some_value")
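env behaves like a read/write view of the process environment. For comparison, the same operations in standard Python (outside the runtime) would look like:

```python
import os

# set: visible for the rest of the session
os.environ["TEMP_VAR"] = "some_value"

# get with an optional fallback for unset variables
value = os.environ.get("API_KEY", "fallback")
print(os.environ["TEMP_VAR"], value)
```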
| Method | Description |
|---|---|
| get(name, default?) | Get variable value with optional fallback |
| set(name, value) | Set variable for current session |
url
Build and parse URLs.
full = url.build("https://api.com/data", params={"page": "1", "limit": "50"})
# "https://api.com/data?limit=50&page=1"
encoded = url.encode("hello world")
# "hello+world"
params = url.encode_params({"foo": "bar", "baz": "qux"})
# "baz=qux&foo=bar"
parsed = url.parse("https://api.com/data?page=1")
# parsed.scheme == "https", parsed.host == "api.com", parsed.path == "/data"
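The outputs shown above (alphabetically sorted params, + for spaces) can be reproduced with Python's urllib.parse, which is a convenient cross-check outside the runtime. The explicit sorting step here is an assumption based on the example output:

```python
from urllib.parse import urlencode, quote_plus, urlparse

# url.build analogue: append alphabetically sorted query params
params = {"page": "1", "limit": "50"}
full = "https://api.com/data?" + urlencode(sorted(params.items()))
print(full)  # https://api.com/data?limit=50&page=1

# url.encode analogue: percent-encoding with '+' for spaces
print(quote_plus("hello world"))  # hello+world

# url.parse analogue: scheme / host / path
parsed = urlparse("https://api.com/data?page=1")
print(parsed.scheme, parsed.netloc, parsed.path)
```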
crypto
Cryptographic functions for hashing, encoding, and signing.
# Base64
encoded = crypto.base64_encode("hello")
decoded = crypto.base64_decode(encoded)
# Hashing
sha = crypto.sha256("data")
md = crypto.md5("data")
# HMAC signing
sig = crypto.hmac_sha256("secret_key", "message")
# UUID generation
id = crypto.uuid()
# Random string (alphanumeric, default length 32)
s = crypto.random_string()
s = crypto.random_string(length=16)
HMAC Request Signing
Use crypto.hmac_sha256 to sign API requests. This is the same mechanism used by AWS Signature v4, Stripe webhook verification, and similar schemes.
# Sign a request payload
secret = env.get("API_SECRET")
timestamp = str(int(time.now().unix))
body = json.encode({"event": "payment", "amount": 100})
signature = crypto.hmac_sha256(secret, timestamp + "." + body)
resp = http.post("https://api.example.com/events",
    json=json.decode(body),
    headers={
        "X-Signature": signature,
        "X-Timestamp": timestamp,
    },
)
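The receiving side of this scheme can be sketched in standard Python with the hmac and hashlib modules; the key and timestamp below are illustrative values, not real credentials:

```python
import hmac, hashlib, json

secret = "test_secret"  # illustrative key only
timestamp = "1700000000"
body = json.dumps({"event": "payment", "amount": 100})

# Sender: hex HMAC-SHA256 over "timestamp.body"
signature = hmac.new(secret.encode(), (timestamp + "." + body).encode(),
                     hashlib.sha256).hexdigest()

# Receiver: recompute from the same inputs and compare in constant time
expected = hmac.new(secret.encode(), (timestamp + "." + body).encode(),
                    hashlib.sha256).hexdigest()
print(hmac.compare_digest(signature, expected))
```

Including the timestamp in the signed payload is what lets the receiver reject replayed requests with stale timestamps.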
Data Handling
xml
Parse and encode XML data.
data = xml.decode('<user><name>Alice</name><age>30</age></user>')
# {"user": {"name": "Alice", "age": "30"}}
output = xml.encode({"user": {"name": "Alice", "age": "30"}})
XML attributes are prefixed with @:
data = xml.decode('<item id="42"><name>Widget</name></item>')
# {"item": {"@id": "42", "name": "Widget"}}
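Assuming the @-prefix convention above, a minimal element-to-dict converter can be sketched with Python's xml.etree. The to_dict helper is illustrative only and ignores repeated sibling tags:

```python
import xml.etree.ElementTree as ET

def to_dict(elem):
    """Convert an element to a dict: attributes get an '@' prefix,
    leaf elements collapse to their text."""
    if len(elem) == 0 and not elem.attrib:
        return elem.text or ""
    out = {"@" + k: v for k, v in elem.attrib.items()}
    for child in elem:
        out[child.tag] = to_dict(child)
    return out

root = ET.fromstring('<item id="42"><name>Widget</name></item>')
print({root.tag: to_dict(root)})
```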
| Method | Description |
|---|---|
| decode(string) | Parse XML string to dict |
| encode(dict) | Convert dict to XML string |
csv
Parse and encode CSV data.
# With header row (default) — returns list of dicts
rows = csv.decode("name,age\nAlice,30\nBob,25")
# [{"name": "Alice", "age": "30"}, {"name": "Bob", "age": "25"}]
# Without header — returns list of lists
rows = csv.decode("Alice,30\nBob,25", header=False)
# Tab-separated
rows = csv.decode(data, delimiter="\t")
# Encode list of dicts to CSV
output = csv.encode([{"name": "Alice", "age": "30"}])
# Encode list of lists with explicit header
output = csv.encode([["Alice", "30"]], header=["name", "age"])
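The decode behavior above (header row gives a list of dicts, all values strings; no header gives a list of lists) matches what Python's own csv module produces for the same inputs, which makes it easy to cross-check:

```python
import csv, io

data = "name,age\nAlice,30\nBob,25"

# Header row: list of dicts, every value a string
rows = [dict(r) for r in csv.DictReader(io.StringIO(data))]
print(rows)

# No header: plain reader returns lists
rows2 = list(csv.reader(io.StringIO("Alice,30\nBob,25")))
print(rows2)
```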
| Method | Description |
|---|---|
| decode(data, delimiter?, header?) | Parse CSV string. delimiter defaults to ",", header defaults to True |
| encode(rows, header?) | Encode rows to CSV string. header is optional list of column names |
json
JSON encoding and decoding.
s = json.encode({"name": "Alice", "age": 30})
data = json.decode('{"name": "Alice", "age": 30}')
Standard Library
time
Date and time operations. See the full API reference.
now = time.now()
t = time.parse_time("2024-03-15T10:30:00Z")
t = time.time(year=2024, month=3, day=15)
t.year, t.month, t.day, t.hour, t.minute, t.second
t.unix # Unix timestamp (seconds)
t.format("2006-01-02") # Go layout
tomorrow = t + time.parse_duration("24h")
yesterday = t - time.parse_duration("24h")
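The layout string "2006-01-02" is a Go reference layout: that specific date spells out year-month-day. For readers more familiar with strftime, the equivalent in standard Python is:

```python
from datetime import datetime, timedelta

t = datetime(2024, 3, 15, 10, 30)

# Go layout "2006-01-02" corresponds to strftime "%Y-%m-%d"
print(t.strftime("%Y-%m-%d"))

# parse_duration("24h") analogue
tomorrow = t + timedelta(hours=24)
print(tomorrow.strftime("%Y-%m-%d"))
```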
math
Mathematical functions. See the full API reference.
math.ceil(1.5) # 2
math.floor(1.5) # 1
math.round(1.5) # 2
math.sqrt(16) # 4.0
math.abs(-5) # 5
math.pow(2, 10) # 1024.0
math.pi # 3.141592653589793
math.e # 2.718281828459045
re
Regular expressions (Python re-compatible). See the full API reference.
m = re.search(r"(\d+)-(\d+)", "order-123-456")
m.group(1) # "123"
re.findall(r"\d+", "a1b2c3") # ["1", "2", "3"]
re.split(r"\s+", "hello world") # ["hello", "world"]
re.sub(r"\d+", "X", "a1b2c3") # "aXbXcX"
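Because the module is Python re-compatible, the examples above run unchanged in standard Python:

```python
import re

# Grouped match, findall, split, and substitution, as shown above
m = re.search(r"(\d+)-(\d+)", "order-123-456")
print(m.group(1), m.group(2))           # 123 456
print(re.findall(r"\d+", "a1b2c3"))     # ['1', '2', '3']
print(re.split(r"\s+", "hello world"))  # ['hello', 'world']
print(re.sub(r"\d+", "X", "a1b2c3"))    # aXbXcX
```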
Putting It Together
A typical ingestion model:
- Authenticate (oauth)
- Read state (incremental)
- Fetch data (http)
- Transform (Starlark)
- Write output (save)
All inside one runtime.
What You Don’t Need
- No Python environment
- No requests library
- No OAuth libraries
- No state storage
- No SDKs
Everything is built in.
Ondatra Labs