Event Collection

Built-in event ingestion — no Kafka, no message broker

Ingest events directly over HTTP — no Kafka, no message broker, no infrastructure.

OndatraSQL provides a built-in ingestion endpoint with durable buffering and exactly-once delivery into DuckLake. Just POST events. They are stored, recovered on crash, and safely materialized during pipeline runs.

Mental Model

Your app sends events via HTTP. OndatraSQL buffers them on disk. The pipeline flushes them into DuckLake.

No external systems. No moving parts.

Why This Exists

Most event pipelines require Kafka (or similar), a consumer service, and a data warehouse loader. OndatraSQL replaces all of that with a single binary.

Quick Start

1. Define the Schema

-- models/raw/events.sql
-- @kind: events

event_name VARCHAR NOT NULL,
page_url VARCHAR,
user_id VARCHAR,
received_at TIMESTAMPTZ

2. Start the Daemon

ondatrasql daemon

3. Send Events

curl -X POST localhost:8080/collect/raw/events \
  -d '{"event_name":"pageview","page_url":"/home","user_id":"u42"}'

That’s it. Events are buffered durably on disk.
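For applications that send events from code rather than curl, the same request can be sketched in Python with only the standard library. This is a minimal sketch, not an official client: the `send_event` helper and the `Content-Type` header are assumptions for illustration, and the URL assumes the daemon is running locally on the default port.

```python
import json
import urllib.request

# Assumption: the daemon from step 2 is listening on the default public port.
COLLECT_URL = "http://localhost:8080/collect/raw/events"


def send_event(event: dict, url: str = COLLECT_URL, timeout: float = 5.0) -> int:
    """POST one event as JSON and return the HTTP status code.

    Hypothetical helper: it mirrors the curl call above, nothing more.
    """
    body = json.dumps(event).encode("utf-8")
    request = urllib.request.Request(
        url,
        data=body,
        method="POST",
        # Assumption: declaring JSON is a conventional choice here,
        # not a documented requirement of the endpoint.
        headers={"Content-Type": "application/json"},
    )
    # urlopen raises urllib.error.HTTPError for 4xx/5xx responses.
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status
```

Calling `send_event({"event_name": "pageview", "page_url": "/home", "user_id": "u42"})` sends the same payload as the curl example and returns the status code of the daemon's response.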

4. Flush to DuckLake

ondatrasql run

Events are flushed, transformed, and queryable.

Exactly-Once Delivery

Each batch of events is inserted into DuckLake in a single transaction. If a crash happens at any point, no data is lost and no duplicates are created.

Crash Recovery

If the system crashes:

  • Events on disk → still there
  • Committed batches → detected and cleaned up
  • Uncommitted batches → retried automatically

No manual intervention required.
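The recovery rules above can be sketched as a small decision function. This is an illustration of the logic only, not OndatraSQL's code: the claim IDs, the `plan_recovery` name, and the idea of comparing inflight claims against a set of committed ack records are assumptions made for the example.

```python
def plan_recovery(inflight_claims: list[str], ack_records: set[str]) -> dict[str, str]:
    """Decide what to do with each claim left inflight after a crash.

    inflight_claims: hypothetical IDs of batches claimed but never cleaned up.
    ack_records:     hypothetical IDs whose ack record was committed with the data.
    """
    plan = {}
    for claim_id in inflight_claims:
        if claim_id in ack_records:
            # The transaction committed before the crash: the data is already
            # in the target table, so only buffer cleanup remains.
            plan[claim_id] = "ack"
        else:
            # The transaction never committed: nothing was written,
            # so the batch can safely go back to the queue and be retried.
            plan[claim_id] = "nack"
    return plan
```

For example, `plan_recovery(["c1", "c2"], {"c1"})` marks `c1` for cleanup and `c2` for retry.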

When to Use This

Use event collection when:

  • You don’t want Kafka
  • You run on a single machine or small cluster
  • You want simple ingestion without infrastructure

Not Designed For

  • High-scale distributed streaming
  • Real-time sub-second processing
  • Multi-region ingestion

Architecture

Two processes work together:

Daemon — receives events via HTTP, stores them durably on disk. Runs continuously.

Runner — flushes events into DuckLake. Runs as part of the pipeline (cron or manual).

sequenceDiagram
    participant App as Browser / App
    participant Daemon as ondatrasql daemon
    participant Runner as ondatrasql run
    participant DL as DuckLake
    App->>Daemon: POST /collect/raw/events
    Daemon-->>App: 202 Accepted
    Note over Runner: ondatrasql run
    Runner->>Daemon: claim events
    Runner->>DL: BEGIN: INSERT + ack → COMMIT
    Runner->>Daemon: ack (cleanup)

Flush Protocol

flowchart TD
    CLAIM["1. Claim batch from buffer"] --> TEMP["2. Load into temp table"]
    TEMP --> BEGIN["3. BEGIN transaction"]
    BEGIN --> INSERT["4. INSERT INTO target"]
    INSERT --> ACK_REC["5. Write ack record"]
    ACK_REC --> COMMIT["6. COMMIT"]
    COMMIT --> CRASH_NOTE["crash window: safe, recovery detects ack record"]
    CRASH_NOTE --> BADGER_ACK["7. Ack buffer (cleanup)"]
    BADGER_ACK --> DELETE_ACK["8. Delete ack record"]
    style CLAIM fill:#1a2332,stroke:#4a6fa5,color:#e0e0e0
    style TEMP fill:#1a2332,stroke:#4a6fa5,color:#e0e0e0
    style BEGIN fill:#1e2d3d,stroke:#3ecfcf,color:#e0e0e0
    style INSERT fill:#1e2d3d,stroke:#3ecfcf,color:#e0e0e0
    style ACK_REC fill:#1e2d3d,stroke:#3ecfcf,color:#e0e0e0
    style COMMIT fill:#1a2332,stroke:#3ecfcf,color:#e0e0e0
    style CRASH_NOTE fill:#1e2d3d,stroke:#e8854a,color:#e0e0e0
    style BADGER_ACK fill:#1a2332,stroke:#555,color:#e0e0e0
    style DELETE_ACK fill:#1a2332,stroke:#555,color:#e0e0e0

Steps 3–6 are one transaction. Steps 7–8 are idempotent cleanup.
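To see why writing the ack record inside the same transaction matters, here is a minimal sketch that uses SQLite as a stand-in for DuckLake. Everything in it is an assumption for illustration: the `events` and `flush_acks` tables, the function names, and the claim IDs are hypothetical, and the real runner talks to DuckLake rather than SQLite.

```python
import sqlite3


def make_db() -> sqlite3.Connection:
    """Create an in-memory stand-in with a target table and an ack table."""
    db = sqlite3.connect(":memory:")
    db.execute("CREATE TABLE events (event_name TEXT NOT NULL, user_id TEXT)")
    db.execute("CREATE TABLE flush_acks (claim_id TEXT PRIMARY KEY)")
    return db


def flush_batch(db: sqlite3.Connection, claim_id: str, events: list[tuple]) -> None:
    """Steps 3 to 6: insert the batch and its ack record atomically."""
    with db:  # commits on success, rolls back if an exception escapes
        db.executemany(
            "INSERT INTO events (event_name, user_id) VALUES (?, ?)", events
        )
        db.execute("INSERT INTO flush_acks (claim_id) VALUES (?)", (claim_id,))


def is_committed(db: sqlite3.Connection, claim_id: str) -> bool:
    """After a crash, the ack record tells recovery whether the batch landed."""
    row = db.execute(
        "SELECT 1 FROM flush_acks WHERE claim_id = ?", (claim_id,)
    ).fetchone()
    return row is not None


def delete_ack_record(db: sqlite3.Connection, claim_id: str) -> None:
    """Step 8: idempotent cleanup; deleting a missing record is a no-op."""
    with db:
        db.execute("DELETE FROM flush_acks WHERE claim_id = ?", (claim_id,))
```

Because the data and the ack record commit together, there is no state in which one exists without the other: either `is_committed` returns true and the batch only needs cleanup, or nothing was written and the batch can be retried without creating duplicates.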

Endpoints

Public (COLLECT_PORT, default 8080):

| Method | Path | Description |
|--------|------|-------------|
| POST | /collect/{schema}/{table} | Single event |
| POST | /collect/{schema}/{table}/batch | Batch (atomic) |
| GET | /health | Health check |
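A client or deploy script can poll the health endpoint before it starts sending events. The sketch below is one way to do that with the standard library. It assumes that a healthy daemon answers `GET /health` with a 2xx status, which this page does not spell out, and the `wait_until_healthy` helper is hypothetical.

```python
import time
import urllib.error
import urllib.request


def wait_until_healthy(
    base_url: str = "http://localhost:8080",
    attempts: int = 10,
    delay: float = 0.5,
) -> bool:
    """Poll GET /health until it answers with a 2xx status or attempts run out.

    Assumption: any 2xx response means the daemon is ready to accept events.
    """
    for _ in range(attempts):
        try:
            with urllib.request.urlopen(f"{base_url}/health", timeout=2.0) as response:
                if 200 <= response.status < 300:
                    return True
        except (urllib.error.URLError, OSError):
            # Daemon not up yet, connection refused, or a non-2xx answer.
            pass
        time.sleep(delay)
    return False
```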

Admin (COLLECT_ADMIN_PORT, default 8081, localhost only):

| Method | Path | Description |
|--------|------|-------------|
| POST | /flush/.../claim | Claim events |
| POST | /flush/.../ack | Acknowledge flush |
| POST | /flush/.../nack | Return to queue |
| GET | /flush/.../inflight | List inflight claims |

Validation

Every NOT NULL column must be present in the event. Unknown fields are accepted but ignored during flush. If received_at is not provided, it is populated with the server timestamp.
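The three rules can be sketched as one function over the Quick Start schema. This is an illustration, not the daemon's implementation: it collapses ingestion and flush into a single step, treats an explicit null the same as a missing field, and signals rejection with a `ValueError`, all of which are assumptions. The `SCHEMA` mapping and `validate_event` name are hypothetical.

```python
from datetime import datetime, timezone
from typing import Optional

# Mirrors the Quick Start schema: column name -> True if declared NOT NULL.
SCHEMA = {
    "event_name": True,
    "page_url": False,
    "user_id": False,
    "received_at": False,
}


def validate_event(
    event: dict, schema: dict = SCHEMA, now: Optional[datetime] = None
) -> dict:
    """Apply the validation rules and return the row that would be flushed."""
    # Rule 1: NOT NULL columns must be present (assumption: null counts as missing).
    missing = [
        col for col, required in schema.items() if required and event.get(col) is None
    ]
    if missing:
        raise ValueError(f"missing NOT NULL columns: {missing}")

    # Rule 2: unknown fields are accepted but do not reach the target table.
    row = {col: event.get(col) for col in schema}

    # Rule 3: received_at defaults to the server timestamp.
    if row.get("received_at") is None:
        row["received_at"] = (now or datetime.now(timezone.utc)).isoformat()
    return row
```

An event such as `{"event_name": "pageview", "extra": 1}` passes validation: `extra` is dropped, `page_url` and `user_id` become null, and `received_at` is filled in.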

Configuration

COLLECT_PORT=9090 COLLECT_ADMIN_PORT=9091 ondatrasql daemon

| Variable | Default | Description |
|----------|---------|-------------|
| COLLECT_PORT | 8080 | Public endpoint |
| COLLECT_ADMIN_PORT | COLLECT_PORT + 1 | Internal flush API |
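The defaults interact: the admin port follows the public port unless it is set explicitly. A short sketch of that resolution logic, useful when a script needs to know which ports a daemon will bind. The `resolve_ports` helper is hypothetical and only restates the table above.

```python
import os
from typing import Mapping, Optional, Tuple


def resolve_ports(env: Optional[Mapping[str, str]] = None) -> Tuple[int, int]:
    """Return (public_port, admin_port) using the documented defaults."""
    env = os.environ if env is None else env
    public = int(env.get("COLLECT_PORT", "8080"))
    # The admin port defaults to COLLECT_PORT + 1, not to a fixed 8081.
    admin = int(env.get("COLLECT_ADMIN_PORT", str(public + 1)))
    return public, admin
```

With no variables set this yields `(8080, 8081)`. Setting only `COLLECT_PORT=9090` yields `(9090, 9091)`.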

Performance

Single machine (Xeon W-2295), ~200 byte payloads:

| Stage | Throughput |
|-------|------------|
| HTTP ingest | ~23,000 events/sec |
| Flush to DuckLake | ~12,000 events/sec |
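These two numbers are enough for rough capacity planning. The sketch below is back-of-the-envelope arithmetic only: it assumes throughput scales linearly and that your hardware and payloads resemble the benchmark. The function names and the example workload (1,000 events/sec with an hourly run) are hypothetical.

```python
# Benchmark figures from the table above (approximate).
INGEST_EVENTS_PER_SEC = 23_000
FLUSH_EVENTS_PER_SEC = 12_000
PAYLOAD_BYTES = 200


def buffered_events(arrival_rate: float, interval_seconds: float) -> float:
    """Events that accumulate on disk between two pipeline runs."""
    return arrival_rate * interval_seconds


def flush_seconds(event_count: float) -> float:
    """Estimated time to flush a backlog at the benchmarked flush rate."""
    return event_count / FLUSH_EVENTS_PER_SEC


def peak_ingest_mb_per_sec() -> float:
    """Payload volume at the benchmarked ingest rate, in MB/sec."""
    return INGEST_EVENTS_PER_SEC * PAYLOAD_BYTES / 1_000_000


# Example: 1,000 events/sec with an hourly run.
backlog = buffered_events(1_000, 3_600)   # 3,600,000 events
estimate = flush_seconds(backlog)         # 300 seconds
```

Under these assumptions an hourly run at 1,000 events/sec flushes in about five minutes. Since flush is slower than ingest, a sustained arrival rate above roughly 12,000 events/sec would let the buffer grow faster than runs can drain it.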

No external systems. No cluster.

Full Example

-- models/raw/events.sql
-- @kind: events

event_name VARCHAR NOT NULL,
page_url VARCHAR,
user_id VARCHAR,
session_id VARCHAR,
received_at TIMESTAMPTZ

-- models/staging/sessions.sql
-- @kind: view

SELECT
    session_id, user_id,
    MIN(received_at) AS started_at,
    MAX(received_at) AS ended_at,
    COUNT(*) AS page_views
FROM raw.events
WHERE event_name = 'pageview'
GROUP BY session_id, user_id

-- models/mart/daily_traffic.sql
-- @kind: merge
-- @unique_key: day

SELECT
    DATE_TRUNC('day', started_at) AS day,
    COUNT(DISTINCT session_id) AS sessions,
    COUNT(DISTINCT user_id) AS visitors,
    SUM(page_views) AS total_views
FROM staging.sessions
GROUP BY 1

# Start daemon
ondatrasql daemon &

# Send events
curl -X POST localhost:8080/collect/raw/events \
  -d '{"event_name":"pageview","page_url":"/home","user_id":"u1","session_id":"s1"}'

# Run pipeline
ondatrasql run

# Query results
ondatrasql sql "SELECT * FROM mart.daily_traffic"