← All blueprints

Mistral OCR

Extract structured data from PDF invoices using Mistral OCR in OndatraSQL. One SQL model, one blueprint, SELECT controls the extraction schema.

aiocrpdfmistral
Model path models/raw/invoices.sql
Kind tracked
Vendor mistral
Auth api-key
Source type file

Scan a folder of PDFs, extract structured data using Mistral OCR, and materialize the results into DuckLake. Your SELECT controls what gets extracted, what types it gets, and how arrays are expanded.

What This Blueprint Gives You

  • Scan PDFs with a glob pattern — FROM mistral_ocr('data/invoices/*.pdf')
  • Incremental: only new or changed files hit the API (content hash via md5_file())
  • Types from SQL: total::DECIMAL"number", line_items::JSON"array" in Mistral’s JSON Schema
  • Array expansion in SQL: use DuckDB’s json_each() or json_transform() in a downstream model
  • Paginated: processes files in batches (default 10) to limit memory
  • Cleanup: uploaded files deleted from Mistral after each extraction
  • Zero blueprint config — everything is controlled from SQL models

Quick Start

1. Set up credentials

Get an API key at console.mistral.ai. Add to .env:

MISTRAL_API_KEY=your-key-here

2. Add the blueprint

Copy lib/mistral_ocr.star into your project’s lib/ directory.

3. Create a model

For simple flat extraction (one row per PDF):

-- models/raw/invoices.sql
-- @kind: tracked
-- @group_key: source_file

SELECT
    source_file,
    source_hash,
    invoice_number,
    vendor,
    total::DECIMAL AS total,
    date
FROM mistral_ocr('data/invoices/*.pdf')

4. Run

ondatrasql run raw.invoices
ocr: data/invoices/acme-001.pdf
ocr: data/invoices/globex-042.pdf
[OK] raw.invoices (tracked, backfill, 2 rows, 3.1s — first run)
source_fileinvoice_numbervendortotaldate
data/invoices/acme-001.pdfINV-2024-001Acme Corp1499.702024-03-15
data/invoices/globex-042.pdfINV-2024-042Globex Inc825.002024-04-01

Second run — files unchanged, no API calls:

skip (unchanged): data/invoices/acme-001.pdf
skip (unchanged): data/invoices/globex-042.pdf
[OK] raw.invoices (tracked, incremental, 0 rows, 0.4s — incremental run)

Extracting Line Items

Most invoices have multiple line items per document. Use ::JSON to extract them as arrays, then expand in a downstream model.

Step 1: Raw model — one row per PDF

-- models/raw/invoices.sql
-- @kind: tracked
-- @group_key: source_file

SELECT
    source_file,
    source_hash,
    invoice_number,
    vendor,
    total::DECIMAL AS total,
    date,
    line_items::JSON AS line_items
FROM mistral_ocr('data/invoices/*.pdf')
source_fileinvoice_numbervendortotaldateline_items
data/invoices/acme-001.pdfINV-2024-001Acme Corp1499.702024-03-15[{“item”:“Widget”,“quantity”:10,“price”:49.99},{“item”:“Gadget”,“quantity”:5,“price”:199.96}]
data/invoices/globex-042.pdfINV-2024-042Globex Inc825.002024-04-01[{“item”:“Consulting”,“quantity”:5,“price”:165.00}]

One row per PDF. Arrays stored as JSON. Mistral is called once per file.

Step 2: Staging model — one row per line item

-- models/staging/invoice_lines.sql
-- @kind: table

SELECT
    invoice_number,
    vendor,
    date,
    j.value->>'item' AS item,
    (j.value->>'quantity')::INTEGER AS qty,
    (j.value->>'price')::DECIMAL AS price
FROM raw.invoices,
LATERAL json_each(line_items) AS j
invoice_numbervendordateitemqtyprice
INV-2024-001Acme Corp2024-03-15Widget1049.99
INV-2024-001Acme Corp2024-03-15Gadget5199.96
INV-2024-042Globex Inc2024-04-01Consulting5165.00

2 PDFs → 2 rows in raw → 3 rows in staging. Invoice-level fields like total stay in the raw model. Line item fields expand into individual rows. No additional API calls — the staging model reads from DuckLake.

Both models participate in the DAG. Run ondatrasql run and the staging model executes automatically after the raw model.

Joining invoice totals with line items

Need both? Join the two models downstream:

-- models/mart/invoice_report.sql
-- @kind: table

SELECT
    r.invoice_number,
    r.vendor,
    r.total AS invoice_total,
    r.date,
    l.item,
    l.qty,
    l.price,
    l.qty * l.price AS line_total
FROM raw.invoices r
JOIN staging.invoice_lines l ON r.invoice_number = l.invoice_number
invoice_numbervendorinvoice_totaldateitemqtypriceline_total
INV-2024-001Acme Corp1499.702024-03-15Widget1049.99499.90
INV-2024-001Acme Corp1499.702024-03-15Gadget5199.96999.80
INV-2024-042Globex Inc825.002024-04-01Consulting5165.00825.00

Three models, zero API calls in the downstream ones. The DAG handles execution order.

How Types Flow

SQL casts control the JSON Schema sent to Mistral. The runtime sends normalized types with a json_schema_type field that the blueprint reads directly:

SQLNormalized typejson_schema_typeMistral returns
total::DECIMALdecimal"number"499.9 (float)
qty::INTEGERinteger"integer"10 (int)
line_items::JSONjson"array"[{"item": "Widget", ...}] (list)
date (no cast)string"string""2024-03-15" (string)

Mistral returns values matching the JSON Schema types — floats for "number", ints for "integer", lists for "array". No type coercion needed in the blueprint. The runtime casts to DuckDB types at insert.

Alternative Array Syntax

Besides json_each, DuckDB offers other ways to expand JSON:

json_transform + unnest (typed — validates and casts in one step):

SELECT invoice_number, vendor, date, x.*
FROM raw.invoices,
LATERAL (SELECT unnest(json_transform(line_items,
    '[{"item":"VARCHAR","quantity":"INTEGER","price":"DOUBLE"}]')) AS x)

Direct arrow extraction (access specific elements without expanding):

SELECT
    invoice_number,
    line_items->>0->>'item' AS first_item,
    (line_items->>0->>'price')::DECIMAL AS first_price
FROM raw.invoices
invoice_numberfirst_itemfirst_price
INV-2024-001Widget49.99
INV-2024-042Consulting165.00

More Examples

Receipts (flat — no arrays):

-- source_hash is required for incremental skip logic
SELECT source_file, source_hash,
    store, date, total::DECIMAL AS total, payment_method
FROM mistral_ocr('data/receipts/*.pdf')
source_filestoredatetotalpayment_method
data/receipts/grocery.pdfICA Maxi2024-03-20423.50Card

Contracts (flat):

-- source_hash is required for incremental skip logic
SELECT source_file, source_hash,
    parties, effective_date, termination_date,
    value::DECIMAL AS value
FROM mistral_ocr('data/contracts/*.pdf')
source_filepartieseffective_datetermination_datevalue
data/contracts/lease.pdfAcme Corp, Tenant LLC2024-01-012025-12-3124000.00

How It Works

  1. The blueprint lists files via glob() — a built-in that returns file paths without reading content
  2. Files are processed in batches of page_size (default 10) to limit memory
  3. For each file in the batch, the content hash is computed via md5_file() and compared against existing hashes using lookup() — unchanged files are skipped
  4. For new or changed files:
    • Uploads the PDF to Mistral’s /v1/files endpoint
    • Calls /v1/ocr with a JSON Schema built from json_schema_type in each column
    • Deletes the uploaded file from Mistral immediately (best-effort cleanup)
    • Parses the structured annotation — values are returned as native types matching the JSON Schema
  5. Blueprint fields (source_file, source_hash) are set last — they always override any API response
  6. Returns one row per file per batch — the runtime calls fetch again for the next batch

Important Columns

ColumnPurpose
source_fileFile path — used as @group_key for tracked deduplication
source_hashmd5 of file contents — used by the blueprint to skip unchanged files

Both should be in your SELECT. Without source_hash, the blueprint processes all files on every run.

Blueprint

lib/mistral_ocr.star

RESERVED_COLUMNS = ("source_file", "source_hash")

API = {
    "base_url": "https://api.mistral.ai",
    "auth": {"env": "MISTRAL_API_KEY"},
    "timeout": 120,
    "retry": 2,
    "fetch": {
        "args": ["pattern"],
        "page_size": 10,
        "supported_kinds": ["tracked"],
    },
}

def fetch(pattern, page, columns=[], target="", is_backfill=True):
    files = glob(pattern)
    schema = _build_schema(columns)

    # Index-based pagination. Assumes the file set is stable for the
    # duration of the pipeline run.
    start = int(page.cursor) if page.cursor != None else 0
    batch = files[start:start + page.size]
    known = _load_known_hashes(target, batch, is_backfill)
    rows = []

    for filepath in batch:
        content_hash = md5_file(filepath)
        if known.get(filepath, None) == content_hash:
            print("skip (unchanged): " + filepath)
            continue

        print("ocr: " + filepath)
        file_id = _upload(filepath)
        resp = http.post("/v1/ocr", json={
            "model": "mistral-ocr-latest",
            "document": {"type": "file", "file_id": file_id},
            "document_annotation_format": {
                "type": "json_schema",
                "json_schema": {"name": "extraction", "schema": schema},
            },
        })
        _cleanup(file_id)

        if not resp.ok:
            fail("ocr " + filepath + ": " + resp.text)

        # Mistral returns annotation as JSON-encoded string inside JSON.
        # Sometimes wraps response in a JSON Schema envelope — unwrap if detected.
        annotation = json.decode(resp.json["document_annotation"])
        if "properties" in annotation and "type" in annotation:
            annotation = annotation["properties"]

        row = {}
        for col in columns:
            row[col["name"]] = annotation.get(col["name"], None)
        row["source_file"] = filepath
        row["source_hash"] = content_hash
        rows.append(row)

    next_start = start + page.size
    next_cursor = next_start if next_start < len(files) else None

    return {"rows": rows, "next": next_cursor}


def _build_schema(columns):
    """Build JSON Schema from typed columns using json_schema_type from runtime."""
    properties = {}
    for col in columns:
        name = col["name"]
        if name in RESERVED_COLUMNS:
            continue
        properties[name] = {"type": col.get("json_schema_type", "string")}
    return {"type": "object", "properties": properties}


def _load_known_hashes(target, batch, is_backfill):
    if is_backfill or not target or not batch:
        return {}
    return lookup(table=target, key="source_file", value="source_hash", where=batch)


def _upload(filepath):
    resp = http.upload("/v1/files", file=filepath, fields={"purpose": "ocr"})
    if not resp.ok:
        fail("upload " + filepath + ": " + resp.text)
    return resp.json["id"]


def _cleanup(file_id):
    """Best-effort cleanup — silent on failure."""
    http.delete("/v1/files/" + file_id)

Notes

  • Pricing: Mistral OCR costs ~$0.001 per page (1000 pages/$1). Check mistral.ai/pricing for current rates.
  • File size: Mistral accepts PDFs up to 50 MB per file.
  • Batch size: The blueprint processes 10 files per page by default. Change page_size in the API dict to adjust.
  • Cleanup: Uploaded files are deleted from Mistral immediately after the OCR response is received (best-effort — silent on failure).
  • Progress: Each file logs ocr: <filepath> or skip (unchanged): <filepath> to stderr.
  • Content hashing: Uses md5_file() per file — a built-in that computes md5(read_blob()) via DuckDB. Only files in the current batch are hashed.
  • ::JSON cast: Signals “return this field as structured data”. Mistral returns arrays/objects instead of flattened strings. Use json_each(), json_transform(), or arrow operators in a downstream model to expand.
  • Runtime kwargs filtering: The blueprint only declares the parameters it uses. The runtime automatically filters kwargs to match.

See Fetch Contract for the complete function spec and API Dict Reference for all configuration options.