Google Ad Manager

Fetch historical GAM reports incrementally with one YAML model and one shared source function.

Tags: advertising, google, oauth, incremental
Model path: models/raw/gam_report.yaml
Kind: append
Strategy: Incremental by date

Ingest Google Ad Manager reports without building a connector.

No Python connector. No Airflow DAG. No manual cursor tracking.

What This Blueprint Gives You

  • Incremental loading by date
  • OAuth with a Google service account
  • Report creation + polling handled automatically
  • Dynamic schema from dimensions and metrics
  • Reusable source function for multiple GAM reports

Quick Start

1. Set up credentials

Add to .env:

GAM_NETWORK_CODE=12345678
GAM_KEY_FILE=service-account.json

2. Add the source function

Copy lib/gam_report.star into your project.

3. Create a model

# models/raw/gam_report.yaml
kind: append
incremental: report_date
incremental_initial: "2026-02-28"
source: gam_report
config:
  network_code: ${GAM_NETWORK_CODE}
  key_file: ${GAM_KEY_FILE}
  dimensions:
    - AD_UNIT_NAME
    - DATE
    - ORDER_NAME
    - LINE_ITEM_NAME
    - CREATIVE_NAME
  metrics:
    - AD_SERVER_IMPRESSIONS
    - AD_SERVER_CLICKS
    - AD_SERVER_CTR
    - AD_SERVER_REVENUE

4. Run

ondatrasql run raw.gam_report

That’s the whole model. Most users only change YAML — not code.

This creates a versioned append table in DuckLake, keyed by report_date for incremental loading.

How It Works

  1. Authenticates with a Google service account
  2. Computes the next date range from incremental state
  3. Creates and runs a GAM report
  4. Polls until the report is ready
  5. Fetches rows page by page
  6. Maps dimensions and metrics into typed output columns
  7. Emits rows into the pipeline
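
Step 2 is the part that replaces manual cursor tracking. The following is a minimal sketch of that date arithmetic in plain Python rather than in the Starlark runtime. The function name next_date_range and its parameters are hypothetical; last_value stands in for the stored incremental state and is None on the first (backfill) run.

```python
from datetime import date, timedelta


def next_date_range(last_value, initial_value, today):
    """Return (start, end) as YYYY-MM-DD strings, or None when up to date.

    last_value:    last loaded report_date, or None on the first run
    initial_value: the incremental_initial date from the YAML model
    today:         the current date, passed in to keep the sketch testable
    """
    end = today - timedelta(days=1)  # reports stop at yesterday
    if last_value is None:
        start = date.fromisoformat(initial_value)  # backfill from the initial date
    else:
        start = date.fromisoformat(last_value) + timedelta(days=1)
    if start > end:
        return None  # nothing new to fetch
    return start.isoformat(), end.isoformat()


today = date(2026, 3, 10)
print(next_date_range(None, "2026-02-28", today))          # first run
print(next_date_range("2026-03-05", "2026-02-28", today))  # later run
print(next_date_range("2026-03-09", "2026-02-28", today))  # already up to date
```

The first call covers 2026-02-28 through 2026-03-09, the second starts the day after the last loaded date, and the third returns None because yesterday has already been loaded.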

Customization

Most users never need to touch the source code. Change dimensions, metrics, filters, and date behavior in YAML only.

Simpler report — fewer columns:

# models/raw/gam_simple.yaml
kind: append
incremental: report_date
incremental_initial: "2026-01-01"
source: gam_report
config:
  network_code: ${GAM_NETWORK_CODE}
  key_file: ${GAM_KEY_FILE}
  dimensions:
    - DATE
    - AD_UNIT_NAME
  metrics:
    - AD_SERVER_IMPRESSIONS

With filters — only specific orders:

# models/raw/gam_campaign.yaml
kind: append
incremental: report_date
incremental_initial: "2026-01-01"
source: gam_report
config:
  network_code: ${GAM_NETWORK_CODE}
  key_file: ${GAM_KEY_FILE}
  dimensions:
    - DATE
    - LINE_ITEM_NAME
  metrics:
    - AD_SERVER_IMPRESSIONS
    - AD_SERVER_REVENUE
  filters:
    - dimension: ORDER_NAME
      operator: CONTAINS
      values: ["Q1_2026"]
  currency_code: "EUR"

Multiple networks — duplicate the YAML with a different network_code:

# models/raw/gam_report_us.yaml
kind: append
incremental: report_date
incremental_initial: "2026-02-28"
source: gam_report
config:
  network_code: ${GAM_NETWORK_CODE_US}
  key_file: ${GAM_KEY_FILE}
  dimensions:
    - AD_UNIT_NAME
    - DATE
  metrics:
    - AD_SERVER_IMPRESSIONS
    - AD_SERVER_CLICKS

The source function in lib/gam_report.star is shared — no code duplication.

What This Replaces

Traditionally, this workflow requires:

  • A custom Python connector
  • OAuth/token handling
  • Polling logic
  • Pagination code
  • Incremental state storage
  • A scheduler

With OndatraSQL, it’s one source function plus one YAML model.

Output Schema

Column names and types are derived from your config:

Config                  Column          Type
DATE                    report_date     VARCHAR (YYYY-MM-DD)
AD_UNIT_NAME            ad_unit_name    VARCHAR
ORDER_NAME              order_name      VARCHAR
LINE_ITEM_NAME          line_item_name  VARCHAR
CREATIVE_NAME           creative_name   VARCHAR
AD_SERVER_IMPRESSIONS   impressions     BIGINT
AD_SERVER_CLICKS        clicks          BIGINT
AD_SERVER_CTR           ctr             DOUBLE
AD_SERVER_REVENUE       revenue         DOUBLE

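For fields not listed here, the type follows the API response. A non-DATE dimension returned as intValue becomes BIGINT; any other dimension becomes VARCHAR. A metric returned as intValue becomes BIGINT, and one returned as doubleValue becomes DOUBLE.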
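
The naming and typing rules can be tried in isolation. The sketch below restates them in plain Python, following the mapping in the source function further down; the helper names (dimension_column, metric_column, dimension_value, metric_value) are hypothetical and exist only for this illustration.

```python
METRIC_PREFIXES = ("ad_server_", "ad_exchange_", "total_")


def dimension_column(name):
    """DATE becomes report_date; every other dimension is lower-cased."""
    col = name.lower()
    return "report_date" if col == "date" else col


def metric_column(name):
    """Lower-case the metric and strip the first matching known prefix."""
    col = name.lower()
    for prefix in METRIC_PREFIXES:
        if col.startswith(prefix):
            return col[len(prefix):]
    return col


def dimension_value(name, val):
    """Convert one dimension value object from a result row."""
    if "intValue" in val:
        raw = val["intValue"]  # arrives as a string, e.g. "20260228"
        if name == "DATE":
            return raw[:4] + "-" + raw[4:6] + "-" + raw[6:8]
        return int(raw)
    return val.get("stringValue", "")


def metric_value(val):
    """Convert one metric value object; a missing value becomes 0."""
    if "doubleValue" in val:
        return float(val["doubleValue"])
    if "intValue" in val:
        return int(val["intValue"])
    return 0


print(dimension_column("DATE"), metric_column("AD_SERVER_REVENUE"))
print(dimension_value("DATE", {"intValue": "20260228"}))
print(metric_value({"intValue": "1500"}), metric_value({"doubleValue": 0.25}))
```

One consequence of prefix stripping is that two metrics differing only in prefix, for example AD_SERVER_IMPRESSIONS and AD_EXCHANGE_IMPRESSIONS, would map to the same column name. With the mapping as written, the later one overwrites the earlier one in each row, so it is probably safest not to combine such metrics in one model.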

Before You Start

You need:

  • A GCP project with the Ad Manager API enabled
  • A service account JSON key (create under Credentials > Service Account)
  • API access enabled in your GAM network (Admin > Global settings)
  • The service account email added as an API user in GAM
  • Your GAM network code (found under Admin > Global settings)
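
Before the first run it can help to confirm that the key file really is a service account key and to read off the email address that has to be added as an API user. This is a small sketch in plain Python, independent of OndatraSQL; the function names are hypothetical, and it checks only the standard fields of a Google service account JSON key.

```python
import json

REQUIRED_FIELDS = ("type", "client_email", "private_key")


def validate_key(key):
    """Check a parsed key dict and return the service account email."""
    missing = [field for field in REQUIRED_FIELDS if field not in key]
    if missing:
        raise ValueError("key file is missing fields: " + ", ".join(missing))
    if key["type"] != "service_account":
        raise ValueError("not a service account key: type=" + str(key["type"]))
    return key["client_email"]


def load_key(path):
    """Read the JSON key file referenced by GAM_KEY_FILE."""
    with open(path) as fh:
        return json.load(fh)


# Usage, assuming the key sits next to your project files:
# print(validate_key(load_key("service-account.json")))
```

The printed address is the one to add as an API user in GAM.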

Full Config Reference

All GAM reportDefinition fields are supported as optional config keys:

config:
  network_code: ${GAM_NETWORK_CODE}
  key_file: ${GAM_KEY_FILE}
  dimensions: [...]
  metrics: [...]
  report_type: HISTORICAL

  # Filtering and sorting
  filters:
    - dimension: ORDER_NAME
      operator: CONTAINS
      values: ["Campaign_2026"]
  sorts:
    - field: AD_SERVER_IMPRESSIONS
      descending: true

  # Time zone and currency
  time_zone_source: PROVIDED
  time_zone: "Europe/Stockholm"
  currency_code: "SEK"

  # Comparison and time periods
  time_period_column: QUARTERS
  comparison_date_range:
    fixed:
      start_date: "2025-01-01"
      end_date: "2025-12-31"

  # Custom dimensions
  cms_metadata_dimension_key_ids: ["12345"]
  custom_dimension_key_ids: ["67890"]

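Most users only need dimensions, metrics, and optionally filters. The source function also accepts ekv_dimension_key_ids, line_item_custom_field_ids, order_custom_field_ids, creative_custom_field_ids, flags, and expanded_compatibility, which are not shown above.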
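
To make the relationship between config keys and the API request concrete, here is a sketch in plain Python of how snake_case keys could be folded into a camelCase reportDefinition, with optional fields included only when they are set. It mirrors the shape of the source function below but covers only a subset of the keys; build_report_def, api_date, and OPTIONAL_FIELDS are hypothetical names.

```python
import json
from datetime import date

# snake_case config key -> camelCase reportDefinition field (subset)
OPTIONAL_FIELDS = {
    "filters": "filters",
    "sorts": "sorts",
    "time_zone_source": "timeZoneSource",
    "time_zone": "timeZone",
    "currency_code": "currencyCode",
    "time_period_column": "timePeriodColumn",
    "comparison_date_range": "comparisonDateRange",
    "cms_metadata_dimension_key_ids": "cmsMetadataDimensionKeyIds",
    "custom_dimension_key_ids": "customDimensionKeyIds",
}


def api_date(d):
    """Dates are sent as separate year, month, and day fields."""
    return {"year": d.year, "month": d.month, "day": d.day}


def build_report_def(dimensions, metrics, start, end,
                     report_type="HISTORICAL", **config):
    report_def = {
        "dimensions": dimensions,
        "metrics": metrics,
        "dateRange": {"fixed": {"startDate": api_date(start),
                                "endDate": api_date(end)}},
        "reportType": report_type,
    }
    # Unset or empty options are left out of the request entirely.
    for key, field in OPTIONAL_FIELDS.items():
        if config.get(key):
            report_def[field] = config[key]
    return report_def


example = build_report_def(
    ["DATE", "LINE_ITEM_NAME"], ["AD_SERVER_IMPRESSIONS"],
    date(2026, 1, 1), date(2026, 1, 31),
    currency_code="EUR", time_zone=None,
)
print(json.dumps(example, indent=2))
```

In this sketch, as in the source function, nested values such as filters or comparison_date_range are passed through exactly as written in YAML, so their inner structure needs to match what the API expects.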

Source Function

lib/gam_report.star

The source function handles all API interaction. It maps dimensions and metrics dynamically — column names are derived from the config.

def gam_report(save, network_code="", key_file="service-account.json",
               dimensions=None, metrics=None, report_type="HISTORICAL",
               filters=None, sorts=None,
               time_zone_source=None, time_zone=None, currency_code=None,
               time_period_column=None, comparison_date_range=None,
               cms_metadata_dimension_key_ids=None, custom_dimension_key_ids=None,
               ekv_dimension_key_ids=None, line_item_custom_field_ids=None,
               order_custom_field_ids=None, creative_custom_field_ids=None,
               flags=None, expanded_compatibility=None):
    """Fetch GAM historical report. Dimensions and metrics are mapped dynamically."""
    scope = "https://www.googleapis.com/auth/admanager"
    base_url = "https://admanager.googleapis.com/v1/networks/" + network_code

    one_day = time.parse_duration("24h")
    # "2006-01-02" is a Go-style layout string meaning YYYY-MM-DD.
    yesterday = (time.now() - one_day).format("2006-01-02")

    start_date = ""
    if incremental.is_backfill:
        start_date = incremental.initial_value
    else:
        last = time.parse_time(incremental.last_value + "T00:00:00Z")
        start_date = (last + one_day).format("2006-01-02")
    end_date = yesterday

    print("Range: " + start_date + " to " + end_date +
          " (backfill: " + str(incremental.is_backfill) + ")")

    if start_date > end_date:
        print("Already up to date")
        abort()

    token = oauth.token(google_key_file=key_file, scope=scope)
    headers = {
        "Authorization": "Bearer " + token.access_token,
        "Content-Type": "application/json",
    }

    # --- Build report definition ---
    start_t = time.parse_time(start_date + "T00:00:00Z")
    end_t = time.parse_time(end_date + "T00:00:00Z")

    report_def = {
        "dimensions": dimensions,
        "metrics": metrics,
        "dateRange": {
            "fixed": {
                "startDate": {"year": start_t.year, "month": start_t.month,
                              "day": start_t.day},
                "endDate": {"year": end_t.year, "month": end_t.month,
                            "day": end_t.day},
            },
        },
        "reportType": report_type,
    }

    # Optional fields — only include if provided
    if filters:
        report_def["filters"] = filters
    if sorts:
        report_def["sorts"] = sorts
    if time_zone_source:
        report_def["timeZoneSource"] = time_zone_source
    if time_zone:
        report_def["timeZone"] = time_zone
    if currency_code:
        report_def["currencyCode"] = currency_code
    if time_period_column:
        report_def["timePeriodColumn"] = time_period_column
    if comparison_date_range:
        report_def["comparisonDateRange"] = comparison_date_range
    if cms_metadata_dimension_key_ids:
        report_def["cmsMetadataDimensionKeyIds"] = cms_metadata_dimension_key_ids
    if custom_dimension_key_ids:
        report_def["customDimensionKeyIds"] = custom_dimension_key_ids
    if ekv_dimension_key_ids:
        report_def["ekvDimensionKeyIds"] = ekv_dimension_key_ids
    if line_item_custom_field_ids:
        report_def["lineItemCustomFieldIds"] = line_item_custom_field_ids
    if order_custom_field_ids:
        report_def["orderCustomFieldIds"] = order_custom_field_ids
    if creative_custom_field_ids:
        report_def["creativeCustomFieldIds"] = creative_custom_field_ids
    if flags:
        report_def["flags"] = flags
    if expanded_compatibility != None:
        report_def["expandedCompatibility"] = expanded_compatibility

    # json= is given a dict here, as in the run call below, so the body
    # is not encoded a second time.
    create_body = {
        "reportDefinition": report_def,
        "visibility": "HIDDEN",
    }

    # --- Create report ---
    create_resp = http.post(base_url + "/reports", json=create_body,
                            headers=headers, timeout=60)
    if not create_resp.ok:
        fail("Create report failed: " + str(create_resp.status_code) +
             " " + create_resp.text)
    report_name = create_resp.json["name"]

    # --- Run report ---
    run_resp = http.post(
        "https://admanager.googleapis.com/v1/" + report_name + ":run",
        json={}, headers=headers, timeout=60,
    )
    if not run_resp.ok:
        fail("Run report failed: " + str(run_resp.status_code) +
             " " + run_resp.text)
    operation_name = run_resp.json["name"]

    # --- Poll until done ---
    poll_url = "https://admanager.googleapis.com/v1/" + operation_name
    last_poll = None
    wait = 2.0
    for i in range(60):
        sleep(wait)
        last_poll = http.get(poll_url, headers=headers)
        if not last_poll.ok:
            fail("Poll failed: " + str(last_poll.status_code) +
                 " " + last_poll.text)
        if "done" in last_poll.json and last_poll.json["done"]:
            break
        if i == 59:
            fail("Report timed out")
        if wait < 10.0:
            wait = wait * 2

    # --- Column name mapping ---
    dim_cols = []
    for d in dimensions:
        col = d.lower()
        if col == "date":
            col = "report_date"
        dim_cols.append(col)

    metric_cols = []
    for m in metrics:
        col = m.lower()
        for prefix in ["ad_server_", "ad_exchange_", "total_"]:
            if col.startswith(prefix):
                col = col[len(prefix):]
                break
        metric_cols.append(col)

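    # --- Fetch result rows ---
    # A finished operation carries either "response" or "error".
    if "error" in last_poll.json:
        fail("Report failed: " + str(last_poll.json["error"]))
    result_name = last_poll.json["response"]["reportResult"]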
    fetch_base = ("https://admanager.googleapis.com/v1/" +
                  result_name + ":fetchRows")
    page_token = ""

    while True:
        params = {"pageSize": "10000"}
        if page_token != "":
            params["pageToken"] = page_token

        res = http.get(fetch_base, headers=headers, params=params, timeout=120)
        if not res.ok:
            fail("Fetch rows failed: " + str(res.status_code) +
                 " " + res.text)

        rows = res.json.get("rows")
        if rows == None or len(rows) == 0:
            break

        for row in rows:
            dims = row["dimensionValues"]
            mets = row["metricValueGroups"][0]["primaryValues"]

            record = {}

            for i, col in enumerate(dim_cols):
                val = dims[i]
                if "intValue" in val:
                    raw = val["intValue"]
                    if dimensions[i] == "DATE":
                        record[col] = raw[:4] + "-" + raw[4:6] + "-" + raw[6:8]
                    else:
                        record[col] = int(raw)
                else:
                    record[col] = val.get("stringValue", "")

            for i, col in enumerate(metric_cols):
                val = mets[i]
                if "doubleValue" in val:
                    record[col] = float(val["doubleValue"])
                elif "intValue" in val:
                    record[col] = int(val["intValue"])
                else:
                    record[col] = 0

            save.row(record)

        page_token = res.json.get("nextPageToken", "")
        if page_token == "":
            break

    print("Saved " + str(save.count()) + " rows")
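
The polling loop in the function above determines how long a run waits before giving up. The sketch below replays only its wait schedule in plain Python, without sleeping or calling the API; poll_waits and its parameters are hypothetical names for the constants used in the loop.

```python
def poll_waits(attempts=60, first_wait=2.0, threshold=10.0):
    """Return the sleep before each poll, doubling while below the threshold."""
    waits = []
    wait = first_wait
    for _ in range(attempts):
        waits.append(wait)
        if wait < threshold:
            wait = wait * 2
    return waits


waits = poll_waits()
print(waits[:5])   # [2.0, 4.0, 8.0, 16.0, 16.0]
print(sum(waits))  # 926.0
```

With these constants the wait settles at 16 seconds after three doublings, and a report that never finishes fails after about 15 minutes of sleeping, not counting request time. If your reports regularly take longer, the attempt count is the value to raise.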

Using as a Standalone Script

Use this only if you want full control in Starlark. For most cases, YAML is simpler and preferred.

# models/raw/gam_report.star
# @kind: append
# @incremental: report_date
# @incremental_initial: 2026-02-28

load("lib/gam_report.star", "gam_report")
gam_report(
    save,
    network_code=env.get("GAM_NETWORK_CODE"),
    key_file=env.get("GAM_KEY_FILE"),
    dimensions=["AD_UNIT_NAME", "DATE", "ORDER_NAME", "LINE_ITEM_NAME", "CREATIVE_NAME"],
    metrics=["AD_SERVER_IMPRESSIONS", "AD_SERVER_CLICKS", "AD_SERVER_CTR", "AD_SERVER_REVENUE"],
)