Runtime Modules

A built-in runtime for writing data pipelines

No imports. No dependencies. Everything you need is available by default.

All modules are predeclared globals, available in both model scripts and shared library modules loaded via load().

Mental Model

Scripts in OndatraSQL are not standalone programs.

They run inside a data pipeline runtime:

  • query() reads from DuckDB
  • http fetches external data
  • save.row() writes output
  • incremental provides state

You’re not wiring systems together — you’re writing the pipeline itself.

Core Runtime

query() — Read from Your Pipeline

Use query() to read from any table in your pipeline. Dependencies are detected automatically.

rows = query("SELECT id, name FROM staging.customers WHERE active = true")
for row in rows:
    print(row["id"], row["name"])

Returns a list of dicts, where every value is a string. An empty result set returns an empty list.
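
Since every value comes back as a string, convert before doing arithmetic. A minimal sketch (table and column names are hypothetical):

rows = query("SELECT quantity FROM mart.order_items")
total = 0
for row in rows:
    total += int(row["quantity"])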

# CTE queries work too
rows = query("""
    WITH recent AS (
        SELECT * FROM raw.orders
        WHERE created_at > '2026-01-01'
    )
    SELECT customer_id, COUNT(*) AS cnt
    FROM recent
    GROUP BY 1
""")

This enables:

  • Reverse ETL — read from DuckDB, push to external API
  • Enrichment workflows — join API data with existing tables
  • Feedback loops — read, send, write back

Without leaving the pipeline.

Reverse ETL:

rows = query("SELECT * FROM mart.customers WHERE synced = false")
for row in rows:
    http.post("https://api.hubspot.com/contacts", json=row)
    save.row({"id": row["id"], "synced_at": str(time.now())})

Cross-source enrichment:

existing = query("SELECT id, email FROM staging.users")
for user in existing:
    profile = http.get("https://api.example.com/profiles/" + user["id"])
    save.row({"id": user["id"], "email": user["email"], "bio": profile.json.get("bio", "")})

Table references in query() string literals are automatically extracted for DAG ordering. If your script queries staging.customers, OndatraSQL ensures that model runs before yours.

Only string literals are analyzed. Dynamic SQL (query("SELECT * FROM " + table)) won’t be detected — the dependency must be declared manually or restructured as a literal string.

query() rejects mutating statements — INSERT, UPDATE, DELETE, DROP, CREATE, etc. are blocked before execution.

incremental — Pipeline State

Access the state of previous runs. Used for cursor-based ingestion from APIs.

Field           Type      Description
is_backfill     bool      True if target table doesn't exist
cursor          string    Cursor column name
last_value      string    MAX(cursor) from target
last_run        string    Timestamp of last commit
initial_value   string    Starting cursor value

The incremental module is read-only state — available directly from library functions without being passed as a parameter. See Incremental Models for details.

# @kind: append
# @incremental: updated_at

url_str = "https://api.example.com/orders"
if not incremental.is_backfill:
    url_str = url_str + "?since=" + incremental.last_value

resp = http.get(url_str)
for order in resp.json:
    save.row(order)
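
The backfill branch above can also seed from initial_value, so a first run starts at a configured cursor instead of fetching all history. A sketch against the same hypothetical endpoint:

since = incremental.initial_value if incremental.is_backfill else incremental.last_value
resp = http.get(url.build("https://api.example.com/orders", params={"since": since}))
for order in resp.json:
    save.row(order)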

External Access

env

Access environment variables (from .env or shell).

value = env.get("API_KEY")
value = env.get("API_KEY", default="fallback")
env.set("TEMP_VAR", "some_value")

Method                 Description
get(name, default?)    Get variable value with optional fallback
set(name, value)       Set variable for current session
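
A common pattern is failing fast when a required variable is missing. A sketch, assuming Starlark's built-in fail() is available in the runtime:

api_key = env.get("API_KEY")
if not api_key:
    fail("API_KEY is not set; add it to .env")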

url

Build and parse URLs.

full = url.build("https://api.com/data", params={"page": "1", "limit": "50"})
# "https://api.com/data?limit=50&page=1"

encoded = url.encode("hello world")
# "hello+world"

params = url.encode_params({"foo": "bar", "baz": "qux"})
# "baz=qux&foo=bar"

parsed = url.parse("https://api.com/data?page=1")
# parsed.scheme == "https", parsed.host == "api.com", parsed.path == "/data"

crypto

Cryptographic functions for hashing, encoding, and signing.

# Base64
encoded = crypto.base64_encode("hello")
decoded = crypto.base64_decode(encoded)

# Hashing
sha = crypto.sha256("data")
md = crypto.md5("data")

# HMAC signing
sig = crypto.hmac_sha256("secret_key", "message")

# UUID generation
id = crypto.uuid()

# Random string (alphanumeric, default length 32)
s = crypto.random_string()
s = crypto.random_string(length=16)

HMAC Request Signing

Use crypto.hmac_sha256 to sign API requests. This is the same primitive that underlies AWS Signature v4, Stripe webhook verification, and similar schemes.

# Sign a request payload
secret = env.get("API_SECRET")
timestamp = str(int(time.now().unix))
body = json.encode({"event": "payment", "amount": 100})

signature = crypto.hmac_sha256(secret, timestamp + "." + body)

resp = http.post("https://api.example.com/events",
    json=json.decode(body),
    headers={
        "X-Signature": signature,
        "X-Timestamp": timestamp,
    },
)
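
The same primitive works on the receiving side: recompute the signature over the raw payload and compare. A sketch, where payload and received_signature are hypothetical inputs and fail() is assumed available:

secret = env.get("API_SECRET")
expected = crypto.hmac_sha256(secret, timestamp + "." + payload)
if expected != received_signature:
    fail("signature mismatch")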

Data Handling

xml

Parse and encode XML data.

data = xml.decode('<user><name>Alice</name><age>30</age></user>')
# {"user": {"name": "Alice", "age": "30"}}

output = xml.encode({"user": {"name": "Alice", "age": "30"}})

XML attributes are prefixed with @:

data = xml.decode('<item id="42"><name>Widget</name></item>')
# {"item": {"@id": "42", "name": "Widget"}}

Method           Description
decode(string)   Parse XML string to dict
encode(dict)     Convert dict to XML string
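
Going the other direction, a dict with @-prefixed keys should round-trip back to attributes, assuming encode mirrors decode's convention:

output = xml.encode({"item": {"@id": "42", "name": "Widget"}})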

csv

Parse and encode CSV data.

# With header row (default) — returns list of dicts
rows = csv.decode("name,age\nAlice,30\nBob,25")
# [{"name": "Alice", "age": "30"}, {"name": "Bob", "age": "25"}]

# Without header — returns list of lists
rows = csv.decode("Alice,30\nBob,25", header=False)

# Tab-separated
rows = csv.decode(data, delimiter="\t")

# Encode list of dicts to CSV
output = csv.encode([{"name": "Alice", "age": "30"}])

# Encode list of lists with explicit header
output = csv.encode([["Alice", "30"]], header=["name", "age"])

Method                              Description
decode(data, delimiter?, header?)   Parse CSV string. delimiter defaults to ",", header defaults to True
encode(rows, header?)               Encode rows to CSV string. header is an optional list of column names
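
Combined with http, this covers CSV ingestion end to end. A sketch assuming the raw response body is exposed as resp.body (the endpoint is hypothetical):

resp = http.get("https://example.com/exports/users.csv")
for row in csv.decode(resp.body):
    save.row(row)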

json

JSON encoding and decoding.

s = json.encode({"name": "Alice", "age": 30})
data = json.decode('{"name": "Alice", "age": 30}')

Standard Library

time

Date and time operations. See the full API reference.

now = time.now()
t = time.parse_time("2024-03-15T10:30:00Z")
t = time.time(year=2024, month=3, day=15)

t.year, t.month, t.day, t.hour, t.minute, t.second
t.unix       # Unix timestamp (seconds)
t.format("2006-01-02")  # Go layout

tomorrow = t + time.parse_duration("24h")
yesterday = t - time.parse_duration("24h")
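
Durations compose with format() for lookback windows; for example, a seven-day cutoff (Go durations have no day unit, so use hours):

cutoff = time.now() - time.parse_duration("168h")
since = cutoff.format("2006-01-02")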

math

Mathematical functions. See the full API reference.

math.ceil(1.5)    # 2
math.floor(1.5)   # 1
math.round(1.5)   # 2
math.sqrt(16)     # 4.0
math.abs(-5)      # 5
math.pow(2, 10)   # 1024.0

math.pi    # 3.141592653589793
math.e     # 2.718281828459045

re

Regular expressions (Python re-compatible). See the full API reference.

m = re.search(r"(\d+)-(\d+)", "order-123-456")
m.group(1)  # "123"

re.findall(r"\d+", "a1b2c3")  # ["1", "2", "3"]
re.split(r"\s+", "hello  world")  # ["hello", "world"]
re.sub(r"\d+", "X", "a1b2c3")  # "aXbXcX"
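
These compose naturally with HTTP pagination; for instance, pulling the page number out of a next-page URL (the URL shape is hypothetical):

m = re.search(r"[?&]page=(\d+)", "https://api.com/items?page=3")
next_page = m.group(1)  # "3"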

Putting It Together

A typical ingestion model:

  1. Authenticate (oauth)
  2. Read state (incremental)
  3. Fetch data (http)
  4. Transform (Starlark)
  5. Write output (save)

All inside one runtime.
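
As one compact sketch (endpoint, variable names, and fields are hypothetical, and a bearer token from env stands in for the oauth step):

# @kind: append
# @incremental: updated_at

token = env.get("API_TOKEN")                                      # authenticate
u = "https://api.example.com/orders"
if not incremental.is_backfill:                                   # read state
    u = url.build(u, params={"since": incremental.last_value})

resp = http.get(u, headers={"Authorization": "Bearer " + token})  # fetch
for order in resp.json:                                           # transform + write
    save.row({
        "id": order["id"],
        "total": order["total"],
        "updated_at": order["updated_at"],
    })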

What You Don’t Need

  • No Python environment
  • No requests library
  • No OAuth libraries
  • No state storage
  • No SDKs

Everything is built in.