Ingest Google Ad Manager reports without building a connector.
No Python connector. No Airflow DAG. No manual cursor tracking.
What This Blueprint Gives You
- Incremental loading by date
- OAuth with Google service account
- Report creation + polling handled automatically
- Dynamic schema from dimensions and metrics
- Reusable source function for multiple GAM reports
Quick Start
1. Set up credentials
Add to .env:
GAM_NETWORK_CODE=12345678
GAM_KEY_FILE=service-account.json
2. Add the source function
Copy lib/gam_report.star into your project.
3. Create a model
# models/raw/gam_report.yaml
kind: append
incremental: report_date
incremental_initial: "2026-02-28"
source: gam_report
config:
network_code: ${GAM_NETWORK_CODE}
key_file: ${GAM_KEY_FILE}
dimensions:
- AD_UNIT_NAME
- DATE
- ORDER_NAME
- LINE_ITEM_NAME
- CREATIVE_NAME
metrics:
- AD_SERVER_IMPRESSIONS
- AD_SERVER_CLICKS
- AD_SERVER_CTR
- AD_SERVER_REVENUE
4. Run
ondatrasql run raw.gam_report
That’s the whole model. Most users only change YAML — not code.
This creates a versioned append table in DuckLake, keyed by report_date for incremental loading.
How It Works
- Authenticates with a Google service account
- Computes the next date range from incremental state
- Creates and runs a GAM report
- Polls until the report is ready
- Fetches rows page by page
- Maps dimensions and metrics into typed output columns
- Emits rows into the pipeline
Customization
Most users never need to touch the source code. Change dimensions, metrics, filters, and date behavior in YAML only.
Simpler report — fewer columns:
# models/raw/gam_simple.yaml
kind: append
incremental: report_date
incremental_initial: "2026-01-01"
source: gam_report
config:
network_code: ${GAM_NETWORK_CODE}
key_file: ${GAM_KEY_FILE}
dimensions:
- DATE
- AD_UNIT_NAME
metrics:
- AD_SERVER_IMPRESSIONS
With filters — only specific orders:
# models/raw/gam_campaign.yaml
kind: append
incremental: report_date
incremental_initial: "2026-01-01"
source: gam_report
config:
network_code: ${GAM_NETWORK_CODE}
key_file: ${GAM_KEY_FILE}
dimensions:
- DATE
- LINE_ITEM_NAME
metrics:
- AD_SERVER_IMPRESSIONS
- AD_SERVER_REVENUE
filters:
- dimension: ORDER_NAME
operator: CONTAINS
values: ["Q1_2026"]
currency_code: "EUR"
Multiple networks — duplicate the YAML with a different network_code:
# models/raw/gam_report_us.yaml
kind: append
incremental: report_date
incremental_initial: "2026-02-28"
source: gam_report
config:
network_code: ${GAM_NETWORK_CODE_US}
key_file: ${GAM_KEY_FILE}
dimensions:
- AD_UNIT_NAME
- DATE
metrics:
- AD_SERVER_IMPRESSIONS
- AD_SERVER_CLICKS
The source function in lib/gam_report.star is shared — no code duplication.
What This Replaces
Traditionally, this workflow requires:
- A custom Python connector
- OAuth/token handling
- Polling logic
- Pagination code
- Incremental state storage
- A scheduler
With OndatraSQL, it’s one source function plus one YAML model.
Output Schema
Column names and types are derived from your config:
| Config | Column | Type |
|---|---|---|
DATE | report_date | VARCHAR (YYYY-MM-DD) |
AD_UNIT_NAME | ad_unit_name | VARCHAR |
ORDER_NAME | order_name | VARCHAR |
LINE_ITEM_NAME | line_item_name | VARCHAR |
CREATIVE_NAME | creative_name | VARCHAR |
AD_SERVER_IMPRESSIONS | impressions | BIGINT |
AD_SERVER_CLICKS | clicks | BIGINT |
AD_SERVER_CTR | ctr | DOUBLE |
AD_SERVER_REVENUE | revenue | DOUBLE |
Dimensions with intValue (non-DATE) become BIGINT. Metrics with intValue become BIGINT, doubleValue become DOUBLE.
Before You Start
You need:
- A GCP project with the Ad Manager API enabled
- A service account JSON key (create under Credentials > Service Account)
- API access enabled in your GAM network (Admin > Global settings, docs)
- The service account email added as an API user in GAM
- Your GAM network code (found under Admin > Global settings)
Full Config Reference
All GAM reportDefinition fields are supported as optional config keys:
config:
network_code: ${GAM_NETWORK_CODE}
key_file: ${GAM_KEY_FILE}
dimensions: [...]
metrics: [...]
report_type: HISTORICAL
# Filtering and sorting
filters:
- dimension: ORDER_NAME
operator: CONTAINS
values: ["Campaign_2026"]
sorts:
- field: AD_SERVER_IMPRESSIONS
descending: true
# Time zone and currency
time_zone_source: PROVIDED
time_zone: "Europe/Stockholm"
currency_code: "SEK"
# Comparison and time periods
time_period_column: QUARTERS
comparison_date_range:
fixed:
start_date: "2025-01-01"
end_date: "2025-12-31"
# Custom dimensions
cms_metadata_dimension_key_ids: ["12345"]
custom_dimension_key_ids: ["67890"]
Most users only need dimensions, metrics, and optionally filters.
Source Function
lib/gam_report.star
The source function handles all API interaction. It maps dimensions and metrics dynamically — column names are derived from the config.
def gam_report(save, network_code="", key_file="service-account.json",
dimensions=None, metrics=None, report_type="HISTORICAL",
filters=None, sorts=None,
time_zone_source=None, time_zone=None, currency_code=None,
time_period_column=None, comparison_date_range=None,
cms_metadata_dimension_key_ids=None, custom_dimension_key_ids=None,
ekv_dimension_key_ids=None, line_item_custom_field_ids=None,
order_custom_field_ids=None, creative_custom_field_ids=None,
flags=None, expanded_compatibility=None):
"""Fetch GAM historical report. Dimensions and metrics are mapped dynamically."""
scope = "https://www.googleapis.com/auth/admanager"
base_url = "https://admanager.googleapis.com/v1/networks/" + network_code
one_day = time.parse_duration("24h")
yesterday = (time.now() - one_day).format("2006-01-02")
start_date = ""
if incremental.is_backfill:
start_date = incremental.initial_value
else:
last = time.parse_time(incremental.last_value + "T00:00:00Z")
start_date = (last + one_day).format("2006-01-02")
end_date = yesterday
print("Range: " + start_date + " to " + end_date +
" (backfill: " + str(incremental.is_backfill) + ")")
if start_date > end_date:
print("Already up to date")
abort()
token = oauth.token(google_key_file=key_file, scope=scope)
headers = {
"Authorization": "Bearer " + token.access_token,
"Content-Type": "application/json",
}
# --- Build report definition ---
start_t = time.parse_time(start_date + "T00:00:00Z")
end_t = time.parse_time(end_date + "T00:00:00Z")
report_def = {
"dimensions": dimensions,
"metrics": metrics,
"dateRange": {
"fixed": {
"startDate": {"year": start_t.year, "month": start_t.month,
"day": start_t.day},
"endDate": {"year": end_t.year, "month": end_t.month,
"day": end_t.day},
},
},
"reportType": report_type,
}
# Optional fields — only include if provided
if filters:
report_def["filters"] = filters
if sorts:
report_def["sorts"] = sorts
if time_zone_source:
report_def["timeZoneSource"] = time_zone_source
if time_zone:
report_def["timeZone"] = time_zone
if currency_code:
report_def["currencyCode"] = currency_code
if time_period_column:
report_def["timePeriodColumn"] = time_period_column
if comparison_date_range:
report_def["comparisonDateRange"] = comparison_date_range
if cms_metadata_dimension_key_ids:
report_def["cmsMetadataDimensionKeyIds"] = cms_metadata_dimension_key_ids
if custom_dimension_key_ids:
report_def["customDimensionKeyIds"] = custom_dimension_key_ids
if ekv_dimension_key_ids:
report_def["ekvDimensionKeyIds"] = ekv_dimension_key_ids
if line_item_custom_field_ids:
report_def["lineItemCustomFieldIds"] = line_item_custom_field_ids
if order_custom_field_ids:
report_def["orderCustomFieldIds"] = order_custom_field_ids
if creative_custom_field_ids:
report_def["creativeCustomFieldIds"] = creative_custom_field_ids
if flags:
report_def["flags"] = flags
if expanded_compatibility != None:
report_def["expandedCompatibility"] = expanded_compatibility
create_body = json.encode({
"reportDefinition": report_def,
"visibility": "HIDDEN",
})
# --- Create report ---
create_resp = http.post(base_url + "/reports", json=create_body,
headers=headers, timeout=60)
if not create_resp.ok:
fail("Create report failed: " + str(create_resp.status_code) +
" " + create_resp.text)
report_name = create_resp.json["name"]
# --- Run report ---
run_resp = http.post(
"https://admanager.googleapis.com/v1/" + report_name + ":run",
json={}, headers=headers, timeout=60,
)
if not run_resp.ok:
fail("Run report failed: " + str(run_resp.status_code) +
" " + run_resp.text)
operation_name = run_resp.json["name"]
# --- Poll until done ---
poll_url = "https://admanager.googleapis.com/v1/" + operation_name
last_poll = None
wait = 2.0
for i in range(60):
sleep(wait)
last_poll = http.get(poll_url, headers=headers)
if not last_poll.ok:
fail("Poll failed: " + str(last_poll.status_code) +
" " + last_poll.text)
if "done" in last_poll.json and last_poll.json["done"]:
break
if i == 59:
fail("Report timed out")
if wait < 10.0:
wait = wait * 2
# --- Column name mapping ---
dim_cols = []
for d in dimensions:
col = d.lower()
if col == "date":
col = "report_date"
dim_cols.append(col)
metric_cols = []
for m in metrics:
col = m.lower()
for prefix in ["ad_server_", "ad_exchange_", "total_"]:
if col.startswith(prefix):
col = col[len(prefix):]
break
metric_cols.append(col)
# --- Fetch result rows ---
result_name = last_poll.json["response"]["reportResult"]
fetch_base = ("https://admanager.googleapis.com/v1/" +
result_name + ":fetchRows")
page_token = ""
while True:
params = {"pageSize": "10000"}
if page_token != "":
params["pageToken"] = page_token
res = http.get(fetch_base, headers=headers, params=params, timeout=120)
if not res.ok:
fail("Fetch rows failed: " + str(res.status_code) +
" " + res.text)
rows = res.json.get("rows")
if rows == None or len(rows) == 0:
break
for row in rows:
dims = row["dimensionValues"]
mets = row["metricValueGroups"][0]["primaryValues"]
record = {}
for i, col in enumerate(dim_cols):
val = dims[i]
if "intValue" in val:
raw = val["intValue"]
if dimensions[i] == "DATE":
record[col] = raw[:4] + "-" + raw[4:6] + "-" + raw[6:8]
else:
record[col] = int(raw)
else:
record[col] = val.get("stringValue", "")
for i, col in enumerate(metric_cols):
val = mets[i]
if "doubleValue" in val:
record[col] = float(val["doubleValue"])
elif "intValue" in val:
record[col] = int(val["intValue"])
else:
record[col] = 0
save.row(record)
page_token = res.json.get("nextPageToken", "")
if page_token == "":
break
print("Saved " + str(save.count()) + " rows")
Using as a Standalone Script
Use this only if you want full control in Starlark. For most cases, YAML is simpler and preferred.
# models/raw/gam_report.star
# @kind: append
# @incremental: report_date
# @incremental_initial: 2026-02-28
load("lib/gam_report.star", "gam_report")
gam_report(
save,
network_code=env.get("GAM_NETWORK_CODE"),
key_file=env.get("GAM_KEY_FILE"),
dimensions=["AD_UNIT_NAME", "DATE", "ORDER_NAME", "LINE_ITEM_NAME", "CREATIVE_NAME"],
metrics=["AD_SERVER_IMPRESSIONS", "AD_SERVER_CLICKS", "AD_SERVER_CTR", "AD_SERVER_REVENUE"],
)
Ondatra Labs