← All blueprints

Google Ad Manager

Fetch Google Ad Manager reports incrementally with OndatraSQL. One SQL model, one blueprint, OAuth with Google service account.

advertisinggoogleoauthincremental
Model path models/staging/gam_report.sql
Kind append
Vendor google
Auth oauth
Source type rest-api

Incremental ingestion of Google Ad Manager reports via the REST API. OAuth with Google service account, report creation, polling, and pagination handled by the blueprint. You write SQL.

What This Blueprint Gives You

  • SQL controls the report — your SELECT determines which dimensions and metrics are fetched
  • Normalized types determine the split: string = dimension, anything else = metric
  • JSON options for report type, filters, custom dimensions, currency, timezone, and more
  • Incremental loading by date
  • OAuth with Google service account (configured in API dict)
  • Report creation, async polling, and pagination handled by the runtime

Quick Start

1. Set up credentials

You need a GCP service account with the Ad Manager API enabled, added as an API user in your GAM network (setup guide).

Add to .env:

GAM_NETWORK_CODE=12345678
GAM_KEY_FILE=service-account.json

2. Add the blueprint

Copy lib/gam_report.star into your project.

3. Create models

Raw model — fetches data with GAM column names. Casts control the report:

-- models/raw/gam_report.sql
-- @kind: append
-- @incremental: DATE
-- @incremental_initial: 20260401

SELECT
    AD_UNIT_NAME,
    DATE,
    ORDER_NAME,
    LINE_ITEM_NAME,
    AD_SERVER_IMPRESSIONS::BIGINT AS AD_SERVER_IMPRESSIONS,
    AD_SERVER_CLICKS::BIGINT AS AD_SERVER_CLICKS,
    AD_SERVER_CTR::DOUBLE AS AD_SERVER_CTR,
    AD_SERVER_REVENUE::DOUBLE AS AD_SERVER_REVENUE
FROM gam_report()

No cast → dimension (AD_UNIT_NAME, DATE, ORDER_NAME, LINE_ITEM_NAME). Cast to ::BIGINT or ::DOUBLE → metric. The blueprint reads the normalized types and builds the report request automatically.

Staging model — transforms raw data:

-- models/staging/gam_report.sql
-- @kind: table
-- @constraint: not_null(report_date)

SELECT
    AD_UNIT_NAME AS ad_unit,
    strptime(DATE, '%Y%m%d')::DATE AS report_date,
    ORDER_NAME AS order_name,
    LINE_ITEM_NAME AS line_item,
    AD_SERVER_IMPRESSIONS::BIGINT AS impressions,
    AD_SERVER_CLICKS::BIGINT AS clicks,
    AD_SERVER_CTR::DOUBLE AS ctr,
    AD_SERVER_REVENUE::DOUBLE AS revenue
FROM raw.gam_report

4. Run

ondatrasql run
[OK] raw.gam_report (append, backfill, 27798 rows, 14.9s — first run)
[OK] staging.gam_report (table, backfill, 27798 rows, 308ms — first run)
report_datead_unitorder_nameline_itemimpressionsclicksrevenue
2026-04-01Mobile_1_320x320Prebid-banner-03/02/2026Prebid-banner @ 3.1533109.82
2026-04-01Mobile_4_320x320Prebid-banner-03/02/2026Prebid-banner @ 2.9724806.97
2026-04-01Mobile_2_320x320Prebid-banner-03/02/2026Prebid-banner @ 2.9722206.24

5. Run again — only new dates

ondatrasql run raw.gam_report
[OK] raw.gam_report (append, incremental, 0 rows, 1.0s — incremental run)

No report created when data is up to date.

How Types Flow

The runtime sends normalized types to the blueprint. The blueprint uses them to split columns into dimensions and metrics:

SQLNormalized typeGAM report field
AD_UNIT_NAME (no cast)stringdimensions
DATE (no cast)stringdimensions
AD_SERVER_IMPRESSIONS::BIGINTintegermetrics
AD_SERVER_REVENUE::DOUBLEfloatmetrics

Any GAM dimension or metric works — just add it to your SELECT with the right cast. Full list in the GAM API reference.

Options

Pass report settings as a JSON string. All options are optional — FROM gam_report() works for basic reports.

FROM gam_report('{"custom_dimensions": [11678108], "currency": "USD"}')

Report settings

KeyTypeDefaultDescription
report_typestring"HISTORICAL"GAM report type (HISTORICAL, REACH, etc.)
currencystringnetwork defaultISO 4217 currency code (e.g. "SEK", "USD")
timezonestringnetwork defaultIANA timezone (e.g. "Europe/Stockholm"). Automatically sets timeZoneSource to PROVIDED

Dimension key IDs

KeyTypeDescription
custom_dimensions[int]Custom dimension key IDs. Maps to CUSTOM_DIMENSION_0_VALUE, CUSTOM_DIMENSION_1_VALUE, etc. by index
cms_metadata[int]CMS metadata dimension key IDs
ekv_dimensions[int]EKV (key-value targeting) dimension key IDs

Custom field IDs

KeyTypeDescription
line_item_custom_fields[int]Line item custom field IDs
order_custom_fields[int]Order custom field IDs
creative_custom_fields[int]Creative custom field IDs

Advanced

KeyTypeDescription
filters[object]Server-side row filters. Each filter has field, operation, and values. See GAM Filter reference
flags[object]Report flags (e.g. INCLUDE_ZERO_IMPRESSIONS). See GAM Flag reference
expanded_compatibilityboolEnable expanded metric compatibility

Examples

With currency and timezone:

FROM gam_report('{"currency": "SEK", "timezone": "Europe/Stockholm"}')

With server-side filter:

FROM gam_report('{"filters": [{"field": "AD_UNIT_NAME", "operation": "EQUALS", "values": ["Mobile_1_320x320"]}]}')

With custom dimensions:

FROM gam_report('{"custom_dimensions": [11678108]}')

Custom dimension IDs map to CUSTOM_DIMENSION_0_VALUE, CUSTOM_DIMENSION_1_VALUE, etc. by index:

-- models/raw/gam_with_custom.sql
-- @kind: append
-- @incremental: DATE
-- @incremental_initial: 20260420

SELECT
    AD_UNIT_NAME,
    DATE,
    CUSTOM_DIMENSION_0_VALUE_ID,
    CUSTOM_DIMENSION_0_VALUE,
    AD_SERVER_IMPRESSIONS::BIGINT AS AD_SERVER_IMPRESSIONS,
    AD_SERVER_REVENUE::DOUBLE AS AD_SERVER_REVENUE
FROM gam_report('{"custom_dimensions": [11678108]}')
AD_UNIT_NAMEDATECUSTOM_DIMENSION_0_VALUEAD_SERVER_IMPRESSIONSAD_SERVER_REVENUE
Mobile_1_320x32020260411concept_adform1423.82
Mobile_1_320x32020260411concept_pubmatic982.61
Mobile_1_320x32020260411keymobile761.94

Multiple custom dimensions: [11678108, 22345] maps to CUSTOM_DIMENSION_0_VALUE and CUSTOM_DIMENSION_1_VALUE.

Customization

Fewer columns

Only need impressions and revenue by date?

SELECT DATE, AD_SERVER_IMPRESSIONS::BIGINT, AD_SERVER_REVENUE::DOUBLE
FROM gam_report()

Fewer dimensions = smaller report, faster execution.

Add country breakdown

SELECT
    AD_UNIT_NAME, DATE, COUNTRY_NAME,
    AD_SERVER_IMPRESSIONS::BIGINT, AD_SERVER_REVENUE::DOUBLE
FROM gam_report()

Downstream model — daily summary

-- models/mart/daily_revenue.sql
-- @kind: table

SELECT
    report_date,
    SUM(impressions) AS total_impressions,
    SUM(clicks) AS total_clicks,
    SUM(revenue) AS total_revenue,
    CASE WHEN SUM(impressions) > 0
        THEN SUM(revenue) / SUM(impressions) * 1000
        ELSE 0
    END AS rpm
FROM staging.gam_report
GROUP BY report_date
ORDER BY report_date

All three models participate in the DAG. ondatrasql run executes them in order.

How It Works

  1. Runtime extracts column names and normalized types from your SELECT via DuckDB AST
  2. Columns with type stringdimensions, columns with any other type → metrics
  3. submit() parses JSON options, builds a GAM report definition, creates and runs the report
  4. Runtime polls check() with configurable interval and backoff until the report completes
  5. fetch_result() fetches result rows in pages — GAM returns positional arrays (guaranteed by API), blueprint maps indices back to column names
  6. On incremental runs, only dates after MAX(DATE) are requested. If already up to date, abort() materializes with 0 rows

Blueprint

lib/gam_report.star

RESERVED_COLUMNS = ()

API = {
    "base_url": "https://admanager.googleapis.com",
    "auth": {
        "service_account": {"env": "GAM_KEY_FILE"},
        "scope": "https://www.googleapis.com/auth/admanager",
    },
    "timeout": 120,
    "retry": 3,
    "backoff": 2,
    "fetch": {
        "args": ["options"],
        "page_size": 10000,
        "supported_kinds": ["append"],
        "async": True,
        "poll_interval": "5s",
        "poll_timeout": "5m",
        "poll_backoff": 2,
    },
}

def submit(options="", columns=[], is_backfill=True, last_value="", initial_value=""):
    base = "/v1/networks/" + env.get("GAM_NETWORK_CODE")
    one_day = time.parse_duration("24h")
    yesterday = (time.now() - one_day).format("2006-01-02")

    if is_backfill:
        start_date = _to_iso_date(initial_value)
    else:
        start_date = _next_day(_to_iso_date(last_value))

    if start_date > yesterday:
        abort()

    # Normalized types: string = dimension, anything else = metric
    dims = [c["name"] for c in columns if c.get("type", "string") == "string"]
    mets = [c["name"] for c in columns if c.get("type", "string") != "string"]

    s = time.parse_time(start_date + "T00:00:00Z")
    e = time.parse_time(yesterday + "T00:00:00Z")

    opts = json.decode(options) if options else {}

    report_def = {
        "dimensions": dims,
        "metrics": mets,
        "reportType": opts.get("report_type", "HISTORICAL"),
        "dateRange": {"fixed": {
            "startDate": {"year": s.year, "month": s.month, "day": s.day},
            "endDate":   {"year": e.year, "month": e.month, "day": e.day},
        }},
    }

    if "custom_dimensions" in opts:
        report_def["customDimensionKeyIds"] = [str(id) for id in opts["custom_dimensions"]]
    if "cms_metadata" in opts:
        report_def["cmsMetadataDimensionKeyIds"] = [str(id) for id in opts["cms_metadata"]]
    if "ekv_dimensions" in opts:
        report_def["ekvDimensionKeyIds"] = [str(id) for id in opts["ekv_dimensions"]]
    if "line_item_custom_fields" in opts:
        report_def["lineItemCustomFieldIds"] = [str(id) for id in opts["line_item_custom_fields"]]
    if "order_custom_fields" in opts:
        report_def["orderCustomFieldIds"] = [str(id) for id in opts["order_custom_fields"]]
    if "creative_custom_fields" in opts:
        report_def["creativeCustomFieldIds"] = [str(id) for id in opts["creative_custom_fields"]]
    if "currency" in opts:
        report_def["currencyCode"] = opts["currency"]
    if "timezone" in opts:
        report_def["timeZone"] = opts["timezone"]
        report_def["timeZoneSource"] = "PROVIDED"
    if "filters" in opts:
        report_def["filters"] = opts["filters"]
    if "flags" in opts:
        report_def["flags"] = opts["flags"]
    if "expanded_compatibility" in opts:
        report_def["expandedCompatibility"] = opts["expanded_compatibility"]

    # Create report
    resp = http.post(base + "/reports", json={
        "reportDefinition": report_def,
        "visibility": "HIDDEN",
    })
    if not resp.ok:
        fail("create report: " + resp.text)

    # Run report
    resp = http.post("/v1/" + resp.json["name"] + ":run", json={})
    if not resp.ok:
        fail("run report: " + resp.text)

    return {"operation": resp.json["name"], "dims": dims, "mets": mets}


def check(job_ref):
    resp = http.get("/v1/" + job_ref["operation"])
    if not resp.ok:
        fail("poll report: " + resp.text)

    if not resp.json.get("done"):
        return None

    if "error" in resp.json:
        fail("report failed: " + json.encode(resp.json["error"]))

    result = resp.json["response"]["reportResult"]
    return {
        "fetch_url": "/v1/" + result + ":fetchRows",
        "dims": job_ref["dims"],
        "mets": job_ref["mets"],
    }


def fetch_result(result_ref, page):
    url = result_ref["fetch_url"] + "?pageSize=" + str(page.size)
    if page.cursor != None:
        url += "&pageToken=" + str(page.cursor)

    resp = http.get(url)
    if not resp.ok:
        fail("fetch rows: " + resp.text)

    dims = result_ref["dims"]
    mets = result_ref["mets"]
    rows = []

    for row in (resp.json.get("rows") or []):
        dv = row.get("dimensionValues") or []
        mv = (row.get("metricValueGroups") or [{}])[0].get("primaryValues") or []
        record = {}
        for i, name in enumerate(dims):
            if i < len(dv):
                record[name] = _extract_value(dv[i])
        for i, name in enumerate(mets):
            if i < len(mv):
                record[name] = _extract_value(mv[i])
        rows.append(record)

    return {"rows": rows, "next": resp.json.get("nextPageToken")}


def _extract_value(v):
    # GAM ReportValue: positional mapping guaranteed by API docs.
    # "The order of the dimension values is the same as the order
    # of the dimensions specified in the request."
    if "stringValue" in v:
        return v["stringValue"]
    if "intValue" in v:
        return v["intValue"]
    if "doubleValue" in v:
        return v["doubleValue"]
    if "boolValue" in v:
        return v["boolValue"]
    if "dateValue" in v:
        d = v["dateValue"]
        return "%04d-%02d-%02d" % (d["year"], d["month"], d["day"])
    fail("unknown GAM ReportValue type: " + json.encode(v))


def _to_iso_date(d):
    """Normalize to YYYY-MM-DD. Handles YYYYMMDD (GAM format) and YYYY-MM-DD."""
    if len(d) >= 8 and d[4] != "-":
        return d[:4] + "-" + d[4:6] + "-" + d[6:8]
    return d[:10]


def _next_day(iso_date):
    """Next calendar day. Uses UTC — no DST edge cases."""
    d = time.parse_time(iso_date + "T00:00:00Z")
    return (d + time.parse_duration("24h")).format("2006-01-02")

Notes

  • Authenticationservice_account in the API dict resolves the key file path from .env via {"env": "GAM_KEY_FILE"}. The runtime handles JWT signing and token refresh automatically.
  • Async polling — The runtime handles the poll loop. poll_interval: "5s" with poll_backoff: 2 doubles the wait each cycle (5s, 10s, 20s, capped at 30s). poll_timeout: "5m" aborts if the report doesn’t complete.
  • Pagination — 10,000 rows per page. fetch_result() uses nextPageToken from the GAM response as cursor.
  • Positional mapping — GAM returns dimension and metric values as positional arrays. The API guarantees that the order matches the report definition.
  • Date format — GAM returns dates as YYYYMMDD. Use strptime(DATE, '%Y%m%d')::DATE in the staging model to convert.
  • Revenue — Values are in micro amounts of your network’s currency.
  • Incremental@incremental: DATE tracks MAX(DATE) in the raw table. Only dates after that are fetched on subsequent runs.
  • Type-based split — No hardcoded dimension list. Normalized types determine the split: string columns are dimensions, everything else is metrics. Any GAM dimension or metric works.
  • Runtime kwargs filtering — The blueprint only declares the parameters it uses. The runtime automatically filters kwargs to match — no **kwargs needed.

See Fetch Contract for the complete function spec and API Dict Reference for all configuration options.