Mistral OCR
Extract structured data from PDF invoices using Mistral OCR in OndatraSQL. One SQL model, one blueprint, SELECT controls the extraction schema.
Scan a folder of PDFs, extract structured data using Mistral OCR, and materialize the results into DuckLake. Your SELECT controls what gets extracted, what types it gets, and how arrays are expanded.
What This Blueprint Gives You
- Scan PDFs with a glob pattern: FROM mistral_ocr('data/invoices/*.pdf')
- Incremental: only new or changed files hit the API (content hash via md5_file())
- Types from SQL: total::DECIMAL → "number", line_items::JSON → "array" in Mistral's JSON Schema
- Array expansion in SQL: use DuckDB's json_each() or json_transform() in a downstream model
- Paginated: processes files in batches (default 10) to limit memory
- Cleanup: uploaded files are deleted from Mistral after each extraction
- Zero blueprint config: everything is controlled from SQL models
Quick Start
1. Set up credentials
Get an API key at console.mistral.ai and add it to .env:
MISTRAL_API_KEY=your-key-here
2. Add the blueprint
Copy lib/mistral_ocr.star into your project’s lib/ directory.
3. Create a model
For simple flat extraction (one row per PDF):
-- models/raw/invoices.sql
-- @kind: tracked
-- @group_key: source_file
SELECT
source_file,
source_hash,
invoice_number,
vendor,
total::DECIMAL AS total,
date
FROM mistral_ocr('data/invoices/*.pdf')
4. Run
ondatrasql run raw.invoices
ocr: data/invoices/acme-001.pdf
ocr: data/invoices/globex-042.pdf
[OK] raw.invoices (tracked, backfill, 2 rows, 3.1s — first run)
| source_file | invoice_number | vendor | total | date |
|---|---|---|---|---|
| data/invoices/acme-001.pdf | INV-2024-001 | Acme Corp | 1499.70 | 2024-03-15 |
| data/invoices/globex-042.pdf | INV-2024-042 | Globex Inc | 825.00 | 2024-04-01 |
Second run — files unchanged, no API calls:
skip (unchanged): data/invoices/acme-001.pdf
skip (unchanged): data/invoices/globex-042.pdf
[OK] raw.invoices (tracked, incremental, 0 rows, 0.4s — incremental run)
Extracting Line Items
Most invoices have multiple line items per document. Use ::JSON to extract them as arrays, then expand in a downstream model.
Step 1: Raw model — one row per PDF
-- models/raw/invoices.sql
-- @kind: tracked
-- @group_key: source_file
SELECT
source_file,
source_hash,
invoice_number,
vendor,
total::DECIMAL AS total,
date,
line_items::JSON AS line_items
FROM mistral_ocr('data/invoices/*.pdf')
| source_file | invoice_number | vendor | total | date | line_items |
|---|---|---|---|---|---|
| data/invoices/acme-001.pdf | INV-2024-001 | Acme Corp | 1499.70 | 2024-03-15 | [{"item":"Widget","quantity":10,"price":49.99},{"item":"Gadget","quantity":5,"price":199.96}] |
| data/invoices/globex-042.pdf | INV-2024-042 | Globex Inc | 825.00 | 2024-04-01 | [{"item":"Consulting","quantity":5,"price":165.00}] |
One row per PDF. Arrays stored as JSON. Mistral is called once per file.
Step 2: Staging model — one row per line item
-- models/staging/invoice_lines.sql
-- @kind: table
SELECT
invoice_number,
vendor,
date,
j.value->>'item' AS item,
(j.value->>'quantity')::INTEGER AS qty,
(j.value->>'price')::DECIMAL AS price
FROM raw.invoices,
LATERAL json_each(line_items) AS j
| invoice_number | vendor | date | item | qty | price |
|---|---|---|---|---|---|
| INV-2024-001 | Acme Corp | 2024-03-15 | Widget | 10 | 49.99 |
| INV-2024-001 | Acme Corp | 2024-03-15 | Gadget | 5 | 199.96 |
| INV-2024-042 | Globex Inc | 2024-04-01 | Consulting | 5 | 165.00 |
2 PDFs → 2 rows in raw → 3 rows in staging. Invoice-level fields like total stay in the raw model. Line item fields expand into individual rows. No additional API calls — the staging model reads from DuckLake.
Both models participate in the DAG. Run ondatrasql run and the staging model executes automatically after the raw model.
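To make the row counts concrete, here is a minimal Python sketch of the fan-out that json_each performs, using the two sample rows above as plain dictionaries. It is only an illustration: the function name expand_line_items is hypothetical, and in the pipeline the expansion runs in DuckDB, not in Python.

```python
import json
from decimal import Decimal

# Sample rows copied from the raw.invoices table above.
raw_invoices = [
    {
        "invoice_number": "INV-2024-001",
        "vendor": "Acme Corp",
        "date": "2024-03-15",
        "line_items": '[{"item":"Widget","quantity":10,"price":49.99},'
                      '{"item":"Gadget","quantity":5,"price":199.96}]',
    },
    {
        "invoice_number": "INV-2024-042",
        "vendor": "Globex Inc",
        "date": "2024-04-01",
        "line_items": '[{"item":"Consulting","quantity":5,"price":165.00}]',
    },
]


def expand_line_items(rows):
    """Emit one output row per element of each row's line_items array."""
    out = []
    for row in rows:
        # parse_float=Decimal keeps prices exact instead of binary floats.
        for entry in json.loads(row["line_items"], parse_float=Decimal):
            out.append({
                "invoice_number": row["invoice_number"],
                "vendor": row["vendor"],
                "date": row["date"],
                "item": entry["item"],
                "qty": int(entry["quantity"]),
                "price": entry["price"],
            })
    return out


invoice_lines = expand_line_items(raw_invoices)
for line in invoice_lines:
    print(line["invoice_number"], line["item"], line["qty"], line["price"])
```

Invoice-level fields are repeated on every expanded row, which is why total is better left in the raw model.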
Joining invoice totals with line items
Need both? Join the two models downstream:
-- models/mart/invoice_report.sql
-- @kind: table
SELECT
r.invoice_number,
r.vendor,
r.total AS invoice_total,
r.date,
l.item,
l.qty,
l.price,
l.qty * l.price AS line_total
FROM raw.invoices r
JOIN staging.invoice_lines l ON r.invoice_number = l.invoice_number
| invoice_number | vendor | invoice_total | date | item | qty | price | line_total |
|---|---|---|---|---|---|---|---|
| INV-2024-001 | Acme Corp | 1499.70 | 2024-03-15 | Widget | 10 | 49.99 | 499.90 |
| INV-2024-001 | Acme Corp | 1499.70 | 2024-03-15 | Gadget | 5 | 199.96 | 999.80 |
| INV-2024-042 | Globex Inc | 825.00 | 2024-04-01 | Consulting | 5 | 165.00 | 825.00 |
Three models, zero API calls in the downstream ones. The DAG handles execution order.
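Because the report carries both invoice_total and the per-line totals, it can double as a sanity check on the extraction. The sketch below is a hedged illustration in Python, with the hypothetical helper find_mismatches; it sums qty * price per invoice for the sample rows above and reports any invoice whose lines do not add up to its total.

```python
from collections import defaultdict
from decimal import Decimal

# (invoice_number, invoice_total, qty, price), copied from the report above.
report_rows = [
    ("INV-2024-001", Decimal("1499.70"), 10, Decimal("49.99")),
    ("INV-2024-001", Decimal("1499.70"), 5, Decimal("199.96")),
    ("INV-2024-042", Decimal("825.00"), 5, Decimal("165.00")),
]


def find_mismatches(rows):
    """Return {invoice_number: (invoice_total, summed_lines)} where they differ."""
    summed = defaultdict(Decimal)  # Decimal() starts each sum at 0
    totals = {}
    for invoice_number, invoice_total, qty, price in rows:
        summed[invoice_number] += qty * price
        totals[invoice_number] = invoice_total
    return {
        inv: (totals[inv], summed[inv])
        for inv in totals
        if totals[inv] != summed[inv]
    }


mismatches = find_mismatches(report_rows)
print(mismatches if mismatches else "all invoices reconcile")
```

Real invoices often include tax, shipping, or discounts, so a mismatch does not necessarily mean the OCR result is wrong.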
How Types Flow
SQL casts control the JSON Schema sent to Mistral. The runtime sends normalized types with a json_schema_type field that the blueprint reads directly:
| SQL | Normalized type | json_schema_type | Mistral returns |
|---|---|---|---|
| total::DECIMAL | decimal | "number" | 499.9 (float) |
| qty::INTEGER | integer | "integer" | 10 (int) |
| line_items::JSON | json | "array" | [{"item": "Widget", ...}] (list) |
| date (no cast) | string | "string" | "2024-03-15" (string) |
Mistral returns values matching the JSON Schema types — floats for "number", ints for "integer", lists for "array". No type coercion needed in the blueprint. The runtime casts to DuckDB types at insert.
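As a rough illustration of that mapping, the Python sketch below mirrors the blueprint's _build_schema helper. The shape of the column descriptors is an assumption inferred from how the blueprint reads col["name"] and col.get("json_schema_type"); the actual runtime may pass additional fields.

```python
import json

RESERVED_COLUMNS = ("source_file", "source_hash")

# Assumed descriptor shape: one dict per SELECT column, with the
# json_schema_type already derived from the SQL cast by the runtime.
columns = [
    {"name": "source_file", "json_schema_type": "string"},
    {"name": "source_hash", "json_schema_type": "string"},
    {"name": "invoice_number", "json_schema_type": "string"},
    {"name": "total", "json_schema_type": "number"},
    {"name": "line_items", "json_schema_type": "array"},
]


def build_schema(cols):
    """Build the JSON Schema for extraction, skipping blueprint-owned columns."""
    properties = {}
    for col in cols:
        if col["name"] in RESERVED_COLUMNS:
            continue  # filled in locally, never requested from the API
        properties[col["name"]] = {"type": col.get("json_schema_type", "string")}
    return {"type": "object", "properties": properties}


schema = build_schema(columns)
print(json.dumps(schema, indent=2))
```

Columns without a cast fall back to "string", which matches the date row in the table above.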
Alternative Array Syntax
Besides json_each, DuckDB offers other ways to expand JSON:
json_transform + unnest (typed — validates and casts in one step):
SELECT invoice_number, vendor, date, x.*
FROM raw.invoices,
LATERAL (SELECT unnest(json_transform(line_items,
'[{"item":"VARCHAR","quantity":"INTEGER","price":"DOUBLE"}]')) AS x)
Direct arrow extraction (access specific elements without expanding):
SELECT
invoice_number,
line_items->0->>'item' AS first_item,
(line_items->0->>'price')::DECIMAL AS first_price
FROM raw.invoices
| invoice_number | first_item | first_price |
|---|---|---|
| INV-2024-001 | Widget | 49.99 |
| INV-2024-042 | Consulting | 165.00 |
More Examples
Receipts (flat — no arrays):
-- source_hash is required for incremental skip logic
SELECT source_file, source_hash,
store, date, total::DECIMAL AS total, payment_method
FROM mistral_ocr('data/receipts/*.pdf')
| source_file | store | date | total | payment_method |
|---|---|---|---|---|
| data/receipts/grocery.pdf | ICA Maxi | 2024-03-20 | 423.50 | Card |
Contracts (flat):
-- source_hash is required for incremental skip logic
SELECT source_file, source_hash,
parties, effective_date, termination_date,
value::DECIMAL AS value
FROM mistral_ocr('data/contracts/*.pdf')
| source_file | parties | effective_date | termination_date | value |
|---|---|---|---|---|
| data/contracts/lease.pdf | Acme Corp, Tenant LLC | 2024-01-01 | 2025-12-31 | 24000.00 |
How It Works
- The blueprint lists files via glob(), a built-in that returns file paths without reading content
- Files are processed in batches of page_size (default 10) to limit memory
- For each file in the batch, the content hash is computed via md5_file() and compared against existing hashes using lookup(); unchanged files are skipped
- For new or changed files:
  - Uploads the PDF to Mistral's /v1/files endpoint
  - Calls /v1/ocr with a JSON Schema built from json_schema_type in each column
  - Deletes the uploaded file from Mistral immediately (best-effort cleanup)
  - Parses the structured annotation; values are returned as native types matching the JSON Schema
- Blueprint fields (source_file, source_hash) are set last, so they always override any API response
- Returns one row per processed file in the batch; the runtime calls fetch again for the next batch
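The batching in the steps above can be sketched in plain Python. The cursor arithmetic follows the blueprint's fetch function; the driver loop iterate_batches is a hypothetical stand-in for the runtime, which is assumed to call fetch repeatedly until the returned cursor is None.

```python
def fetch_page(files, cursor, page_size):
    """Return (batch, next_cursor) using index-based pagination."""
    start = int(cursor) if cursor is not None else 0
    batch = files[start:start + page_size]
    next_start = start + page_size
    # No further page once the next start index runs past the file list.
    next_cursor = next_start if next_start < len(files) else None
    return batch, next_cursor


def iterate_batches(files, page_size=10):
    """Hypothetical driver: keep fetching until the cursor is exhausted."""
    batches = []
    cursor = None
    while True:
        batch, cursor = fetch_page(files, cursor, page_size)
        batches.append(batch)
        if cursor is None:
            break
    return batches


files = ["data/invoices/inv-%03d.pdf" % i for i in range(23)]
batches = iterate_batches(files)
print([len(b) for b in batches])
```

Since the cursor is just an index into the glob result, adding or removing files while a run is in progress can shift batch boundaries, which is why the blueprint assumes a stable file set.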
Important Columns
| Column | Purpose |
|---|---|
| source_file | File path, used as @group_key for tracked deduplication |
| source_hash | md5 of file contents, used by the blueprint to skip unchanged files |
Both should be in your SELECT. Without source_hash, the blueprint processes all files on every run.
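To show why the hash matters, here is a small standalone Python sketch of the skip decision. It uses hashlib in place of the runtime's md5_file() built-in, and the helper names md5_of_file and needs_ocr are hypothetical; the known dictionary plays the role of the source_file to source_hash mapping that the blueprint loads from the target table.

```python
import hashlib
import os
import tempfile


def md5_of_file(path, chunk_size=1 << 20):
    """Hex md5 of a file's contents, read in chunks to bound memory."""
    digest = hashlib.md5()
    with open(path, "rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def needs_ocr(path, known_hashes):
    """True when the file is new or its content hash has changed."""
    return known_hashes.get(path) != md5_of_file(path)


with tempfile.TemporaryDirectory() as tmp:
    pdf = os.path.join(tmp, "acme-001.pdf")
    with open(pdf, "wb") as fh:
        fh.write(b"%PDF-1.4 first version")

    known = {}                          # nothing materialized yet
    first_run = needs_ocr(pdf, known)   # new file
    known[pdf] = md5_of_file(pdf)       # hash stored alongside the row
    second_run = needs_ocr(pdf, known)  # unchanged file

    with open(pdf, "wb") as fh:
        fh.write(b"%PDF-1.4 edited")
    after_edit = needs_ocr(pdf, known)  # content changed

print(first_run, second_run, after_edit)
```

If source_hash is left out of the SELECT, there is no stored hash to compare against, so every file looks new.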
Blueprint
lib/mistral_ocr.star
RESERVED_COLUMNS = ("source_file", "source_hash")

API = {
    "base_url": "https://api.mistral.ai",
    "auth": {"env": "MISTRAL_API_KEY"},
    "timeout": 120,
    "retry": 2,
    "fetch": {
        "args": ["pattern"],
        "page_size": 10,
        "supported_kinds": ["tracked"],
    },
}

def fetch(pattern, page, columns=[], target="", is_backfill=True):
    files = glob(pattern)
    schema = _build_schema(columns)

    # Index-based pagination. Assumes the file set is stable for the
    # duration of the pipeline run.
    start = int(page.cursor) if page.cursor != None else 0
    batch = files[start:start + page.size]
    known = _load_known_hashes(target, batch, is_backfill)

    rows = []
    for filepath in batch:
        content_hash = md5_file(filepath)
        if known.get(filepath, None) == content_hash:
            print("skip (unchanged): " + filepath)
            continue

        print("ocr: " + filepath)
        file_id = _upload(filepath)
        resp = http.post("/v1/ocr", json={
            "model": "mistral-ocr-latest",
            "document": {"type": "file", "file_id": file_id},
            "document_annotation_format": {
                "type": "json_schema",
                "json_schema": {"name": "extraction", "schema": schema},
            },
        })
        _cleanup(file_id)
        if not resp.ok:
            fail("ocr " + filepath + ": " + resp.text)

        # Mistral returns annotation as JSON-encoded string inside JSON.
        # Sometimes wraps response in a JSON Schema envelope; unwrap if detected.
        annotation = json.decode(resp.json["document_annotation"])
        if "properties" in annotation and "type" in annotation:
            annotation = annotation["properties"]

        row = {}
        for col in columns:
            row[col["name"]] = annotation.get(col["name"], None)
        row["source_file"] = filepath
        row["source_hash"] = content_hash
        rows.append(row)

    next_start = start + page.size
    next_cursor = next_start if next_start < len(files) else None
    return {"rows": rows, "next": next_cursor}

def _build_schema(columns):
    """Build JSON Schema from typed columns using json_schema_type from runtime."""
    properties = {}
    for col in columns:
        name = col["name"]
        if name in RESERVED_COLUMNS:
            continue
        properties[name] = {"type": col.get("json_schema_type", "string")}
    return {"type": "object", "properties": properties}

def _load_known_hashes(target, batch, is_backfill):
    if is_backfill or not target or not batch:
        return {}
    return lookup(table=target, key="source_file", value="source_hash", where=batch)

def _upload(filepath):
    resp = http.upload("/v1/files", file=filepath, fields={"purpose": "ocr"})
    if not resp.ok:
        fail("upload " + filepath + ": " + resp.text)
    return resp.json["id"]

def _cleanup(file_id):
    """Best-effort cleanup, silent on failure."""
    http.delete("/v1/files/" + file_id)
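The annotation handling in fetch is the least obvious part of the blueprint, so here it is isolated as a Python sketch. The two response bodies are made-up examples of the plain and enveloped shapes the code comments describe, not captured API output, and extract_row is a hypothetical name.

```python
import json


def extract_row(response_json, column_names):
    """Decode the annotation string and pick out the requested columns."""
    # The annotation arrives as a JSON-encoded string inside the JSON body.
    annotation = json.loads(response_json["document_annotation"])
    # Unwrap a JSON Schema style envelope if the values are nested in it.
    if "properties" in annotation and "type" in annotation:
        annotation = annotation["properties"]
    # Columns missing from the annotation become None (NULL downstream).
    return {name: annotation.get(name) for name in column_names}


values = {"vendor": "Acme Corp", "total": 1499.7}

# Hypothetical response bodies, for illustration only.
plain = {"document_annotation": json.dumps(values)}
wrapped = {
    "document_annotation": json.dumps({"type": "object", "properties": values})
}

column_names = ["vendor", "total", "date"]
plain_row = extract_row(plain, column_names)
wrapped_row = extract_row(wrapped, column_names)
print(plain_row)
print(wrapped_row)
```

One caveat of this heuristic: a document whose extracted fields are literally named type and properties would be misread as an envelope, so those names are best avoided as column names.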
Notes
- Pricing: Mistral OCR costs ~$0.001 per page (1000 pages/$1). Check mistral.ai/pricing for current rates.
- File size: Mistral accepts PDFs up to 50 MB per file.
- Batch size: The blueprint processes 10 files per page by default. Change page_size in the API dict to adjust.
- Cleanup: Uploaded files are deleted from Mistral immediately after the OCR response is received (best-effort, silent on failure).
- Progress: Each file logs ocr: <filepath> or skip (unchanged): <filepath> to stderr.
- Content hashing: Uses md5_file() per file, a built-in that computes md5(read_blob()) via DuckDB. Only files in the current batch are hashed.
- ::JSON cast: Signals "return this field as structured data". Mistral returns arrays/objects instead of flattened strings. Use json_each(), json_transform(), or arrow operators in a downstream model to expand.
- Runtime kwargs filtering: The blueprint only declares the parameters it uses. The runtime automatically filters kwargs to match.
See Fetch Contract for the complete function spec and API Dict Reference for all configuration options.