Google Ad Manager
Fetch Google Ad Manager reports incrementally with OndatraSQL. One SQL model, one blueprint, OAuth with a Google service account.
Incremental ingestion of Google Ad Manager reports via the REST API. OAuth with a Google service account, report creation, polling, and pagination are handled by the blueprint. You write SQL.
What This Blueprint Gives You
- SQL controls the report — your SELECT determines which dimensions and metrics are fetched
- Normalized types determine the split: `string` = dimension, anything else = metric
- JSON options for report type, filters, custom dimensions, currency, timezone, and more
- Incremental loading by date
- OAuth with Google service account (configured in API dict)
- Report creation, async polling, and pagination handled by the runtime
Quick Start
1. Set up credentials
You need a GCP service account with the Ad Manager API enabled, added as an API user in your GAM network (setup guide).
Add to .env:
GAM_NETWORK_CODE=12345678
GAM_KEY_FILE=service-account.json
2. Add the blueprint
Copy lib/gam_report.star into your project.
3. Create models
Raw model — fetches data with GAM column names. Casts control the report:
-- models/raw/gam_report.sql
-- @kind: append
-- @incremental: DATE
-- @incremental_initial: 20260401
SELECT
AD_UNIT_NAME,
DATE,
ORDER_NAME,
LINE_ITEM_NAME,
AD_SERVER_IMPRESSIONS::BIGINT AS AD_SERVER_IMPRESSIONS,
AD_SERVER_CLICKS::BIGINT AS AD_SERVER_CLICKS,
AD_SERVER_CTR::DOUBLE AS AD_SERVER_CTR,
AD_SERVER_REVENUE::DOUBLE AS AD_SERVER_REVENUE
FROM gam_report()
No cast → dimension (AD_UNIT_NAME, DATE, ORDER_NAME, LINE_ITEM_NAME). Cast to ::BIGINT or ::DOUBLE → metric. The blueprint reads the normalized types and builds the report request automatically.
Staging model — transforms raw data:
-- models/staging/gam_report.sql
-- @kind: table
-- @constraint: not_null(report_date)
SELECT
AD_UNIT_NAME AS ad_unit,
strptime(DATE, '%Y%m%d')::DATE AS report_date,
ORDER_NAME AS order_name,
LINE_ITEM_NAME AS line_item,
AD_SERVER_IMPRESSIONS::BIGINT AS impressions,
AD_SERVER_CLICKS::BIGINT AS clicks,
AD_SERVER_CTR::DOUBLE AS ctr,
AD_SERVER_REVENUE::DOUBLE AS revenue
FROM raw.gam_report
4. Run
ondatrasql run
[OK] raw.gam_report (append, backfill, 27798 rows, 14.9s — first run)
[OK] staging.gam_report (table, backfill, 27798 rows, 308ms — first run)
| report_date | ad_unit | order_name | line_item | impressions | clicks | revenue |
|---|---|---|---|---|---|---|
| 2026-04-01 | Mobile_1_320x320 | Prebid-banner-03/02/2026 | Prebid-banner @ 3.15 | 331 | 0 | 9.82 |
| 2026-04-01 | Mobile_4_320x320 | Prebid-banner-03/02/2026 | Prebid-banner @ 2.97 | 248 | 0 | 6.97 |
| 2026-04-01 | Mobile_2_320x320 | Prebid-banner-03/02/2026 | Prebid-banner @ 2.97 | 222 | 0 | 6.24 |
5. Run again — only new dates
ondatrasql run raw.gam_report
[OK] raw.gam_report (append, incremental, 0 rows, 1.0s — incremental run)
No report created when data is up to date.
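The date window behind this behaviour can be sketched in plain Python. This is a minimal illustration that mirrors the logic in the blueprint's `submit()`, assuming stored values arrive as `YYYYMMDD` or `YYYY-MM-DD` strings. The function names, the `today` parameter, and the example dates are illustrative and not part of OndatraSQL.

```python
from datetime import date, timedelta


def to_iso_date(value):
    """Normalize YYYYMMDD or YYYY-MM-DD to YYYY-MM-DD."""
    if len(value) >= 8 and value[4] != "-":
        return value[:4] + "-" + value[4:6] + "-" + value[6:8]
    return value[:10]


def report_window(is_backfill, last_value, initial_value, today):
    """Return (start, end) as ISO dates, or None when nothing is left to fetch."""
    yesterday = today - timedelta(days=1)
    if is_backfill:
        # First run: start from the configured initial value.
        start = date.fromisoformat(to_iso_date(initial_value))
    else:
        # Later runs: start the day after the last stored date.
        start = date.fromisoformat(to_iso_date(last_value)) + timedelta(days=1)
    if start > yesterday:
        return None  # up to date, so no report needs to be created
    return start.isoformat(), yesterday.isoformat()


today = date(2026, 4, 12)  # hypothetical "now" for the example
print(report_window(True, "", "20260401", today))
print(report_window(False, "20260410", "20260401", today))
print(report_window(False, "20260411", "20260401", today))
```

The last call returns `None` because the stored data already covers yesterday, which corresponds to the 0-row incremental run shown above.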
How Types Flow
The runtime sends normalized types to the blueprint. The blueprint uses them to split columns into dimensions and metrics:
| SQL | Normalized type | GAM report field |
|---|---|---|
| `AD_UNIT_NAME` (no cast) | string | dimensions |
| `DATE` (no cast) | string | dimensions |
| `AD_SERVER_IMPRESSIONS::BIGINT` | integer | metrics |
| `AD_SERVER_REVENUE::DOUBLE` | float | metrics |
Any GAM dimension or metric works — just add it to your SELECT with the right cast. Full list in the GAM API reference.
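The split in the table above can be reproduced in a few lines of Python. This sketch assumes the column list has the shape the blueprint reads (a list of dicts with `name` and `type` keys). The `split_columns` helper name is illustrative.

```python
def split_columns(columns):
    """Split columns into GAM dimensions and metrics by normalized type."""
    # A missing type is treated as "string", i.e. a dimension.
    dims = [c["name"] for c in columns if c.get("type", "string") == "string"]
    mets = [c["name"] for c in columns if c.get("type", "string") != "string"]
    return dims, mets


columns = [
    {"name": "AD_UNIT_NAME", "type": "string"},
    {"name": "DATE", "type": "string"},
    {"name": "AD_SERVER_IMPRESSIONS", "type": "integer"},
    {"name": "AD_SERVER_REVENUE", "type": "float"},
]

dims, mets = split_columns(columns)
print(dims)
print(mets)
```

Because the split depends only on the normalized type, adding a new dimension or metric needs no change to the helper.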
Options
Pass report settings as a JSON string. All options are optional — FROM gam_report() works for basic reports.
FROM gam_report('{"custom_dimensions": [11678108], "currency": "USD"}')
Report settings
| Key | Type | Default | Description |
|---|---|---|---|
| `report_type` | string | "HISTORICAL" | GAM report type (HISTORICAL, REACH, etc.) |
| `currency` | string | network default | ISO 4217 currency code (e.g. "SEK", "USD") |
| `timezone` | string | network default | IANA timezone (e.g. "Europe/Stockholm"). Automatically sets timeZoneSource to PROVIDED |
Dimension key IDs
| Key | Type | Description |
|---|---|---|
| `custom_dimensions` | [int] | Custom dimension key IDs. Maps to CUSTOM_DIMENSION_0_VALUE, CUSTOM_DIMENSION_1_VALUE, etc. by index |
| `cms_metadata` | [int] | CMS metadata dimension key IDs |
| `ekv_dimensions` | [int] | EKV (key-value targeting) dimension key IDs |
Custom field IDs
| Key | Type | Description |
|---|---|---|
| `line_item_custom_fields` | [int] | Line item custom field IDs |
| `order_custom_fields` | [int] | Order custom field IDs |
| `creative_custom_fields` | [int] | Creative custom field IDs |
Advanced
| Key | Type | Description |
|---|---|---|
| `filters` | [object] | Server-side row filters. Each filter has field, operation, and values. See GAM Filter reference |
| `flags` | [object] | Report flags (e.g. INCLUDE_ZERO_IMPRESSIONS). See GAM Flag reference |
| `expanded_compatibility` | bool | Enable expanded metric compatibility |
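One way to read the option tables above is as a mapping from option keys to report definition fields. The Python sketch below mirrors the translation the blueprint's `submit()` performs. The `apply_options` helper and the two lookup dicts are illustrative names, and the sketch makes no HTTP calls.

```python
import json

# Option key -> report definition field; ID lists are sent as strings.
ID_LIST_FIELDS = {
    "custom_dimensions": "customDimensionKeyIds",
    "cms_metadata": "cmsMetadataDimensionKeyIds",
    "ekv_dimensions": "ekvDimensionKeyIds",
    "line_item_custom_fields": "lineItemCustomFieldIds",
    "order_custom_fields": "orderCustomFieldIds",
    "creative_custom_fields": "creativeCustomFieldIds",
}

# Option key -> report definition field; values are passed through unchanged.
PASSTHROUGH_FIELDS = {
    "currency": "currencyCode",
    "filters": "filters",
    "flags": "flags",
    "expanded_compatibility": "expandedCompatibility",
}


def apply_options(report_def, options):
    """Return a copy of report_def with the JSON options applied."""
    opts = json.loads(options) if options else {}
    out = dict(report_def)
    out["reportType"] = opts.get("report_type", "HISTORICAL")
    for key, field in ID_LIST_FIELDS.items():
        if key in opts:
            out[field] = [str(i) for i in opts[key]]
    for key, field in PASSTHROUGH_FIELDS.items():
        if key in opts:
            out[field] = opts[key]
    if "timezone" in opts:
        # A provided timezone also switches the timezone source.
        out["timeZone"] = opts["timezone"]
        out["timeZoneSource"] = "PROVIDED"
    return out


print(apply_options({}, '{"custom_dimensions": [11678108], "currency": "USD"}'))
```

Building the options string with `json.dumps` from a dict, rather than typing it by hand, avoids quoting mistakes inside the SQL literal.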
Examples
With currency and timezone:
FROM gam_report('{"currency": "SEK", "timezone": "Europe/Stockholm"}')
With server-side filter:
FROM gam_report('{"filters": [{"field": "AD_UNIT_NAME", "operation": "EQUALS", "values": ["Mobile_1_320x320"]}]}')
With custom dimensions:
FROM gam_report('{"custom_dimensions": [11678108]}')
Custom dimension IDs map to CUSTOM_DIMENSION_0_VALUE, CUSTOM_DIMENSION_1_VALUE, etc. by index:
-- models/raw/gam_with_custom.sql
-- @kind: append
-- @incremental: DATE
-- @incremental_initial: 20260420
SELECT
AD_UNIT_NAME,
DATE,
CUSTOM_DIMENSION_0_VALUE_ID,
CUSTOM_DIMENSION_0_VALUE,
AD_SERVER_IMPRESSIONS::BIGINT AS AD_SERVER_IMPRESSIONS,
AD_SERVER_REVENUE::DOUBLE AS AD_SERVER_REVENUE
FROM gam_report('{"custom_dimensions": [11678108]}')
| AD_UNIT_NAME | DATE | CUSTOM_DIMENSION_0_VALUE | AD_SERVER_IMPRESSIONS | AD_SERVER_REVENUE |
|---|---|---|---|---|
| Mobile_1_320x320 | 20260411 | concept_adform | 142 | 3.82 |
| Mobile_1_320x320 | 20260411 | concept_pubmatic | 98 | 2.61 |
| Mobile_1_320x320 | 20260411 | keymobile | 76 | 1.94 |
Multiple custom dimensions: [11678108, 22345] maps to CUSTOM_DIMENSION_0_VALUE and CUSTOM_DIMENSION_1_VALUE.
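The index-based naming can be checked with a short Python sketch. It only derives the column names described above from the position of each key ID. The helper name is illustrative, and the second ID is the placeholder used in the sentence above.

```python
def custom_dimension_columns(key_ids):
    """Map each custom dimension key ID to its positional column name."""
    return {
        key_id: "CUSTOM_DIMENSION_%d_VALUE" % index
        for index, key_id in enumerate(key_ids)
    }


print(custom_dimension_columns([11678108, 22345]))
```

Reordering the ID list changes which column each ID maps to, so it is worth keeping the order stable once a raw table has been populated.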
Customization
Fewer columns
Only need impressions and revenue by date?
SELECT DATE, AD_SERVER_IMPRESSIONS::BIGINT, AD_SERVER_REVENUE::DOUBLE
FROM gam_report()
Fewer dimensions = smaller report, faster execution.
Add country breakdown
SELECT
AD_UNIT_NAME, DATE, COUNTRY_NAME,
AD_SERVER_IMPRESSIONS::BIGINT, AD_SERVER_REVENUE::DOUBLE
FROM gam_report()
Downstream model — daily summary
-- models/mart/daily_revenue.sql
-- @kind: table
SELECT
report_date,
SUM(impressions) AS total_impressions,
SUM(clicks) AS total_clicks,
SUM(revenue) AS total_revenue,
CASE WHEN SUM(impressions) > 0
THEN SUM(revenue) / SUM(impressions) * 1000
ELSE 0
END AS rpm
FROM staging.gam_report
GROUP BY report_date
ORDER BY report_date
All three models participate in the DAG. ondatrasql run executes them in order.
How It Works
- Runtime extracts column names and normalized types from your SELECT via DuckDB AST
- Columns with type `string` → dimensions, columns with any other type → metrics
- `submit()` parses JSON options, builds a GAM report definition, creates and runs the report
- Runtime polls `check()` with configurable interval and backoff until the report completes
- `fetch_result()` fetches result rows in pages. GAM returns positional arrays (guaranteed by API), and the blueprint maps indices back to column names
- On incremental runs, only dates after `MAX(DATE)` are requested. If already up to date, `abort()` materializes with 0 rows
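The steps above can be sketched as a small driver loop in Python. This is not the OndatraSQL runtime: `run_fetch`, the stub functions, and the sample data are all hypothetical, and sleeping between polls is left out so the example runs instantly. It only illustrates the call order and the contract that `check()` returns `None` while the report is still running.

```python
from types import SimpleNamespace


def run_fetch(submit, check, fetch_result, page_size, max_polls=10):
    """Drive submit -> check -> fetch_result and collect all rows."""
    job_ref = submit()
    result_ref = None
    for _ in range(max_polls):
        result_ref = check(job_ref)
        if result_ref is not None:  # None means "still running"
            break
        # A real runtime would wait here (poll interval with backoff).
    if result_ref is None:
        raise TimeoutError("report did not complete")

    rows, cursor = [], None
    while True:
        page = SimpleNamespace(size=page_size, cursor=cursor)
        result = fetch_result(result_ref, page)
        rows.extend(result["rows"])
        cursor = result.get("next")
        if not cursor:
            return rows


# Stubs standing in for a blueprint; the data is made up for the example.
DATA = [{"DATE": "20260401"}, {"DATE": "20260402"}, {"DATE": "20260403"}]
state = {"polls": 0}


def fake_submit():
    return {"operation": "operations/demo"}


def fake_check(job_ref):
    state["polls"] += 1
    return {"fetch_url": "demo"} if state["polls"] >= 3 else None


def fake_fetch_result(result_ref, page):
    start = int(page.cursor or 0)
    end = start + page.size
    next_cursor = str(end) if end < len(DATA) else None
    return {"rows": DATA[start:end], "next": next_cursor}


rows = run_fetch(fake_submit, fake_check, fake_fetch_result, page_size=2)
print(len(rows), state["polls"])
```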
Blueprint
lib/gam_report.star
RESERVED_COLUMNS = ()

API = {
    "base_url": "https://admanager.googleapis.com",
    "auth": {
        "service_account": {"env": "GAM_KEY_FILE"},
        "scope": "https://www.googleapis.com/auth/admanager",
    },
    "timeout": 120,
    "retry": 3,
    "backoff": 2,
    "fetch": {
        "args": ["options"],
        "page_size": 10000,
        "supported_kinds": ["append"],
        "async": True,
        "poll_interval": "5s",
        "poll_timeout": "5m",
        "poll_backoff": 2,
    },
}
def submit(options="", columns=[], is_backfill=True, last_value="", initial_value=""):
    base = "/v1/networks/" + env.get("GAM_NETWORK_CODE")
    one_day = time.parse_duration("24h")
    yesterday = (time.now() - one_day).format("2006-01-02")
    if is_backfill:
        start_date = _to_iso_date(initial_value)
    else:
        start_date = _next_day(_to_iso_date(last_value))
    if start_date > yesterday:
        abort()
    # Normalized types: string = dimension, anything else = metric
    dims = [c["name"] for c in columns if c.get("type", "string") == "string"]
    mets = [c["name"] for c in columns if c.get("type", "string") != "string"]
    s = time.parse_time(start_date + "T00:00:00Z")
    e = time.parse_time(yesterday + "T00:00:00Z")
    opts = json.decode(options) if options else {}
    report_def = {
        "dimensions": dims,
        "metrics": mets,
        "reportType": opts.get("report_type", "HISTORICAL"),
        "dateRange": {"fixed": {
            "startDate": {"year": s.year, "month": s.month, "day": s.day},
            "endDate": {"year": e.year, "month": e.month, "day": e.day},
        }},
    }
    if "custom_dimensions" in opts:
        report_def["customDimensionKeyIds"] = [str(id) for id in opts["custom_dimensions"]]
    if "cms_metadata" in opts:
        report_def["cmsMetadataDimensionKeyIds"] = [str(id) for id in opts["cms_metadata"]]
    if "ekv_dimensions" in opts:
        report_def["ekvDimensionKeyIds"] = [str(id) for id in opts["ekv_dimensions"]]
    if "line_item_custom_fields" in opts:
        report_def["lineItemCustomFieldIds"] = [str(id) for id in opts["line_item_custom_fields"]]
    if "order_custom_fields" in opts:
        report_def["orderCustomFieldIds"] = [str(id) for id in opts["order_custom_fields"]]
    if "creative_custom_fields" in opts:
        report_def["creativeCustomFieldIds"] = [str(id) for id in opts["creative_custom_fields"]]
    if "currency" in opts:
        report_def["currencyCode"] = opts["currency"]
    if "timezone" in opts:
        report_def["timeZone"] = opts["timezone"]
        report_def["timeZoneSource"] = "PROVIDED"
    if "filters" in opts:
        report_def["filters"] = opts["filters"]
    if "flags" in opts:
        report_def["flags"] = opts["flags"]
    if "expanded_compatibility" in opts:
        report_def["expandedCompatibility"] = opts["expanded_compatibility"]
    # Create report
    resp = http.post(base + "/reports", json={
        "reportDefinition": report_def,
        "visibility": "HIDDEN",
    })
    if not resp.ok:
        fail("create report: " + resp.text)
    # Run report
    resp = http.post("/v1/" + resp.json["name"] + ":run", json={})
    if not resp.ok:
        fail("run report: " + resp.text)
    return {"operation": resp.json["name"], "dims": dims, "mets": mets}
def check(job_ref):
    resp = http.get("/v1/" + job_ref["operation"])
    if not resp.ok:
        fail("poll report: " + resp.text)
    if not resp.json.get("done"):
        return None
    if "error" in resp.json:
        fail("report failed: " + json.encode(resp.json["error"]))
    result = resp.json["response"]["reportResult"]
    return {
        "fetch_url": "/v1/" + result + ":fetchRows",
        "dims": job_ref["dims"],
        "mets": job_ref["mets"],
    }
def fetch_result(result_ref, page):
    url = result_ref["fetch_url"] + "?pageSize=" + str(page.size)
    if page.cursor != None:
        url += "&pageToken=" + str(page.cursor)
    resp = http.get(url)
    if not resp.ok:
        fail("fetch rows: " + resp.text)
    dims = result_ref["dims"]
    mets = result_ref["mets"]
    rows = []
    for row in (resp.json.get("rows") or []):
        dv = row.get("dimensionValues") or []
        mv = (row.get("metricValueGroups") or [{}])[0].get("primaryValues") or []
        record = {}
        for i, name in enumerate(dims):
            if i < len(dv):
                record[name] = _extract_value(dv[i])
        for i, name in enumerate(mets):
            if i < len(mv):
                record[name] = _extract_value(mv[i])
        rows.append(record)
    return {"rows": rows, "next": resp.json.get("nextPageToken")}
def _extract_value(v):
    # GAM ReportValue: positional mapping guaranteed by API docs.
    # "The order of the dimension values is the same as the order
    # of the dimensions specified in the request."
    if "stringValue" in v:
        return v["stringValue"]
    if "intValue" in v:
        return v["intValue"]
    if "doubleValue" in v:
        return v["doubleValue"]
    if "boolValue" in v:
        return v["boolValue"]
    if "dateValue" in v:
        d = v["dateValue"]
        return "%04d-%02d-%02d" % (d["year"], d["month"], d["day"])
    fail("unknown GAM ReportValue type: " + json.encode(v))
def _to_iso_date(d):
    """Normalize to YYYY-MM-DD. Handles YYYYMMDD (GAM format) and YYYY-MM-DD."""
    if len(d) >= 8 and d[4] != "-":
        return d[:4] + "-" + d[4:6] + "-" + d[6:8]
    return d[:10]

def _next_day(iso_date):
    """Next calendar day. Uses UTC — no DST edge cases."""
    d = time.parse_time(iso_date + "T00:00:00Z")
    return (d + time.parse_duration("24h")).format("2006-01-02")
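The row mapping in `fetch_result()` can be tried offline with a Python port of the same logic. The sample row below is hand-written for illustration and is not captured API output. In particular, representing `intValue` as a JSON string is an assumption. `map_row` and `extract_value` are illustrative names.

```python
def extract_value(v):
    """Unwrap a single report value dict into a plain Python value."""
    for key in ("stringValue", "intValue", "doubleValue", "boolValue"):
        if key in v:
            return v[key]
    if "dateValue" in v:
        d = v["dateValue"]
        return "%04d-%02d-%02d" % (d["year"], d["month"], d["day"])
    raise ValueError("unknown report value: %r" % (v,))


def map_row(row, dims, mets):
    """Map positional dimension and metric arrays back to column names."""
    dv = row.get("dimensionValues") or []
    mv = (row.get("metricValueGroups") or [{}])[0].get("primaryValues") or []
    record = {}
    # zip() stops at the shorter sequence, so missing values are skipped.
    for name, value in zip(dims, dv):
        record[name] = extract_value(value)
    for name, value in zip(mets, mv):
        record[name] = extract_value(value)
    return record


sample_row = {  # hand-written sample, assumed shape
    "dimensionValues": [
        {"stringValue": "Mobile_1_320x320"},
        {"stringValue": "20260401"},
    ],
    "metricValueGroups": [
        {"primaryValues": [{"intValue": "331"}, {"doubleValue": 9.82}]}
    ],
}

record = map_row(
    sample_row,
    dims=["AD_UNIT_NAME", "DATE"],
    mets=["AD_SERVER_IMPRESSIONS", "AD_SERVER_REVENUE"],
)
print(record)
```

The mapping relies entirely on the order of `dims` and `mets` matching the order sent in the report definition, which is why `submit()` passes both lists along in its return value.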
Notes
- Authentication: `service_account` in the API dict resolves the key file path from `.env` via `{"env": "GAM_KEY_FILE"}`. The runtime handles JWT signing and token refresh automatically.
- Async polling: The runtime handles the poll loop. `poll_interval: "5s"` with `poll_backoff: 2` doubles the wait each cycle (5s, 10s, 20s, capped at 30s). `poll_timeout: "5m"` aborts if the report doesn’t complete.
- Pagination: 10,000 rows per page. `fetch_result()` uses `nextPageToken` from the GAM response as cursor.
- Positional mapping: GAM returns dimension and metric values as positional arrays. The API guarantees that the order matches the report definition.
- Date format: GAM returns dates as `YYYYMMDD`. Use `strptime(DATE, '%Y%m%d')::DATE` in the staging model to convert.
- Revenue: Values are in micro amounts of your network’s currency.
- Incremental: `@incremental: DATE` tracks `MAX(DATE)` in the raw table. Only dates after that are fetched on subsequent runs.
- Type-based split: No hardcoded dimension list. Normalized types determine the split: `string` columns are dimensions, everything else is metrics. Any GAM dimension or metric works.
- Runtime kwargs filtering: The blueprint only declares the parameters it uses. The runtime automatically filters kwargs to match, so no `**kwargs` is needed.
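To make the polling numbers concrete, the Python sketch below lists the waits implied by a 5s interval, a backoff factor of 2, and a 30s cap within a 5 minute timeout. How the actual runtime counts elapsed time against `poll_timeout` is an assumption here, and `poll_schedule` is an illustrative helper, not a runtime function.

```python
def poll_schedule(interval, backoff, cap, timeout):
    """List the waits (in seconds) that fit inside the timeout."""
    waits = []
    elapsed = 0
    wait = interval
    while elapsed + wait <= timeout:
        waits.append(wait)
        elapsed += wait
        # Multiply the wait each cycle, but never exceed the cap.
        wait = min(wait * backoff, cap)
    return waits


schedule = poll_schedule(interval=5, backoff=2, cap=30, timeout=300)
print(schedule)
print(sum(schedule))
```

Under these assumptions the report is polled 11 times before the timeout, with most of the waiting spent at the 30s cap.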
See Fetch Contract for the complete function spec and API Dict Reference for all configuration options.