
Backfill & Dedup Strategy

Consumer-graph-worker ingests purchase data via two Kafka consumers (receipts + factoids) gated by the consumer_graph_ingestion feature flag. An SQS-based user load path exists (calls Purchase History API / Neli for 365-day history), but there is no batch tooling to trigger it at scale.

Additionally, writing the same purchase through both Kafka and SQS/Airflow causes duplicate data in Neo4j because the PURCHASED relationship MERGE keys only on (user_id, product_id) and blindly appends receipt_ids/timestamps on match.

This document covers six areas:

  1. Near-term: SQS Backfill Job
  2. Mid-term: Airflow Snowflake Backfill
  3. Kafka + Backfill Dedup Strategies
  4. Feature Flipper New User Detection
  5. BTS Shop Order Backfill (PLT-591)
  6. Cloud Backfill Endpoint (PLT-591)

1. Near-term: SQS Backfill Job

Create a CLI tool that sends SQS user_load_request messages for a list of user IDs. The existing SQS worker picks them up, calls Neli for 365 days of purchase history, and writes to Neo4j.

  • Target: ~75K test users initially
  • Rate: 10 req/s (configurable) -> ~2 hours for 75K users
  • Triggerable manually or via cron

New CLI at cmd/backfill/main.go with a rate-limited sender at internal/backfill/sender.go.

CLI flags:

| Flag | Default | Description |
|---|---|---|
| --user-file | (required) | Path to newline-delimited user IDs |
| --rate-limit | 10 | Messages per second |
| --lookback-days | 365 | Days of history to request |
| --dry-run | false | Log messages without sending |
| --queue-url | $SQS_QUEUE_URL | SQS queue URL override |
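A minimal sketch of how cmd/backfill/main.go might declare these flags with Go's standard flag package. The Config struct and parseFlags helper are hypothetical names, and taking the --queue-url default from the SQS_QUEUE_URL environment variable is an assumption about how the override is wired.

```go
package main

import (
	"errors"
	"flag"
	"fmt"
	"os"
)

// Config mirrors the CLI flags in the table (hypothetical struct).
type Config struct {
	UserFile     string
	RateLimit    int
	LookbackDays int
	DryRun       bool
	QueueURL     string
}

// parseFlags parses args (without the program name) into a Config.
// Assumption: the queue URL defaults to $SQS_QUEUE_URL when --queue-url is absent.
func parseFlags(args []string) (Config, error) {
	var cfg Config
	fs := flag.NewFlagSet("backfill", flag.ContinueOnError)
	fs.StringVar(&cfg.UserFile, "user-file", "", "path to newline-delimited user IDs (required)")
	fs.IntVar(&cfg.RateLimit, "rate-limit", 10, "messages per second")
	fs.IntVar(&cfg.LookbackDays, "lookback-days", 365, "days of history to request")
	fs.BoolVar(&cfg.DryRun, "dry-run", false, "log messages without sending")
	fs.StringVar(&cfg.QueueURL, "queue-url", os.Getenv("SQS_QUEUE_URL"), "SQS queue URL override")
	if err := fs.Parse(args); err != nil {
		return Config{}, err
	}
	if cfg.UserFile == "" {
		return Config{}, errors.New("--user-file is required")
	}
	if cfg.RateLimit <= 0 {
		return Config{}, fmt.Errorf("--rate-limit must be positive, got %d", cfg.RateLimit)
	}
	return cfg, nil
}

func main() {
	// Demo with a sample command line; a real entry point would pass os.Args[1:].
	cfg, err := parseFlags([]string{"--user-file", "./data/test-users-75k.txt", "--dry-run"})
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(2)
	}
	fmt.Printf("%+v\n", cfg)
}
```

Go's flag package accepts both `-flag` and `--flag`, so the double-dash spelling used in this document works unchanged.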

SQS message format (must match UserLoadEvent in internal/worker/orchestrator.go):

```json
{
  "event_type": "user_load_request",
  "user_id": "<id>",
  "reason": "batch_backfill",
  "requested_at": "<RFC3339>",
  "lookback_days": 365,
  "request_id": "<uuid>",
  "priority": "low"
}
```

Progress tracking:

  • Log every 1000 users: Sent 5000/75000 (6.7%), elapsed: 8m20s
  • On failure, write remaining user IDs to a resume file
  • Support --resume-from flag to skip already-sent IDs

Idempotency: The orchestrator already checks loaded_users:{userID} (7d TTL). Users loaded within the past week will be skipped automatically.

Uses the existing SQS queue declared in consumer-graph-worker.yml:

```yaml
- type: aws_sqs_queue
  name: "{{env}}-consumer-graph-queue"
  attributes:
    visibility_timeout: 600          # 10 min; BTS power users can have 200+ transactions
    message_retention_period: 345600 # 4 days
```

No new infrastructure needed.

```sh
# Manual run
go run cmd/backfill/main.go \
  --user-file ./data/test-users-75k.txt \
  --rate-limit 10 \
  --lookback-days 365

# Dry run
go run cmd/backfill/main.go \
  --user-file ./data/test-users-75k.txt \
  --dry-run

# Cron (ECS Scheduled Task or k8s CronJob)
# Schedule as needed for incremental user additions
```

At 10 req/s, the Purchase History API (Neli) should be within its standard rate limits. The existing orchestrator has retry + exponential backoff for 429 responses (internal/api/purchasehistory/client.go). If Neli rate-limits us, backfill will slow down but not fail.

  • cmd/backfill/main.go — CLI entry point
  • internal/backfill/sender.go — rate-limited SQS message sender
  • internal/backfill/sender_test.go — unit tests
  • Makefile — add make backfill target

2. Mid-term: Airflow Snowflake Backfill

New Airflow DAG that reads purchase history directly from Snowflake and writes to Neo4j. It bypasses the Purchase History API for bulk operations (hundreds of thousands or millions of users).

Create internal/airflow/dags/neo4j_purchase_history_backfill.py following patterns from the existing neo4j_feature_sync.py (same Neo4jHook, SnowflakeHook, batch writes, sample_users_only parameter, alert_failure_callback).

DAG task flow:

validate_schema -> query_user_list -> chunk_users -> [query_purchases -> write_to_neo4j] per chunk -> verify_data

Snowflake source tables:

  • FETCH_SERVICES_PROD.RECEIPT_SERVICE.RECEIPTS — receipt-level data
  • FETCH_SERVICES_PROD.RECEIPT_SERVICE.RECEIPT_ITEMS — item-level with FIDO, brand, category

Query sketch:

```sql
SELECT
    r.USER_ID_RAW      AS user_id,
    r.RECEIPT_ID_RAW   AS receipt_id,
    r.PURCHASE_DATE    AS purchase_date,
    r.STORE_NAME       AS store_name,
    r.RETAILER_CHANNEL AS store_type,
    i.FIDO             AS product_id,
    i.BRAND            AS brand,
    i.CATEGORY_1       AS category,
    i.DESCRIPTION      AS product_name,
    i.FINAL_QUANTITY   AS quantity
FROM FETCH_SERVICES_PROD.RECEIPT_SERVICE.RECEIPTS r
JOIN FETCH_SERVICES_PROD.RECEIPT_SERVICE.RECEIPT_ITEMS i
    ON r.RECEIPT_ID = i.RECEIPT_ID
WHERE r.USER_ID_RAW IN ({{user_ids}})
  AND r.PURCHASE_DATE >= DATEADD(day, -365, CURRENT_DATE())
ORDER BY r.USER_ID_RAW, r.PURCHASE_DATE
```

Risk: Snowflake table schemas may change over time (column renames, type changes, table moves).

Strategy A: Schema Validation Task (Recommended)

Add a leading DAG task that checks expected columns exist before any data queries:

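```python
from airflow.decorators import task
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

# Columns the backfill query reads from each table (including the join key).
EXPECTED_COLUMNS = {
    "FETCH_SERVICES_PROD.RECEIPT_SERVICE.RECEIPTS": {
        "USER_ID_RAW", "RECEIPT_ID", "RECEIPT_ID_RAW", "PURCHASE_DATE", "STORE_NAME", "RETAILER_CHANNEL",
    },
    "FETCH_SERVICES_PROD.RECEIPT_SERVICE.RECEIPT_ITEMS": {
        "RECEIPT_ID", "FIDO", "BRAND", "CATEGORY_1", "DESCRIPTION", "FINAL_QUANTITY",
    },
}


@task()
def validate_schema():
    hook = SnowflakeHook(snowflake_conn_id="snowflake_default")
    for table, expected in EXPECTED_COLUMNS.items():
        # SHOW COLUMNS returns one row per column; column_name is the third field (index 2).
        actual = {row[2] for row in hook.get_records(f"SHOW COLUMNS IN {table}")}
        missing = expected - actual
        if missing:
            raise ValueError(f"Schema drift detected! Missing columns in {table}: {sorted(missing)}")
```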

If validation fails, the DAG short-circuits before wasting compute. The existing alert_failure_callback fires a notification.

Strategy B: Column Aliasing with Fallbacks

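Use COALESCE in Snowflake queries for columns known to have rename risk. COALESCE only falls back on NULL values: a reference to a column that no longer exists fails at query compilation, so this helps only while the old and new columns coexist (for example, during a migration window):

```sql
COALESCE(i.CATEGORY_1, i.CATEGORY, 'UNCATEGORIZED') AS category
```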

Recommendation: Use both — Strategy A as a hard gate, Strategy B for graceful degradation on known-risk columns.

The Airflow DAG must use dedup-aware Cypher (see Area 3 below). It should NOT blindly append receipt_ids.

  • internal/airflow/dags/neo4j_purchase_history_backfill.py — new DAG
  • internal/airflow/shared_code/schema_validator.py — reusable validation utility

3. Kafka + Backfill Dedup Strategies

internal/graph/writer.go createPurchaseBatch (lines 261-289) uses:

```cypher
MERGE (u)-[r:PURCHASED]->(p)
ON MATCH SET
  r.receipt_ids = r.receipt_ids + purch.receiptID,
  r.times = r.times + 1,
  r.timestamps = r.timestamps + datetime(purch.timestamp)
```

If the same purchase arrives through both Kafka and SQS backfill:

  • receipt_ids array gets ["R1", "R1"] (duplicate)
  • times becomes 2 (should be 1)
  • timestamps gets [T1, T1] (duplicate)

The Kafka and SQS paths use different idempotency key namespaces in Valkey, so the cache doesn’t prevent cross-pipeline duplicates:

  • Kafka: idempotency:kafka:factoid:{summaryID} / idempotency:kafka:receipt:{receiptID}
  • SQS: idempotency:{requestID}

Strategy 1: Dedup-Aware Cypher (Recommended)

Fix the ON MATCH SET to check before appending:

```cypher
MERGE (u)-[r:PURCHASED]->(p)
ON CREATE SET
  r.timestamps = [datetime(purch.timestamp)],
  r.receipt_ids = [purch.receiptID],
  r.times = 1,
  r.last = datetime(purch.timestamp),
  r.avg_interval_days = 0,
  r.repurchase_likelihood = 0.0
ON MATCH SET
  r.timestamps = CASE
    WHEN purch.receiptID IN r.receipt_ids THEN r.timestamps
    ELSE r.timestamps + datetime(purch.timestamp)
  END,
  r.times = CASE
    WHEN purch.receiptID IN r.receipt_ids THEN r.times
    ELSE r.times + 1
  END,
  r.last = CASE
    WHEN datetime(purch.timestamp) > r.last THEN datetime(purch.timestamp)
    ELSE r.last
  END,
  // receipt_ids is assigned last so the IN checks above test the list
  // as it was before this purchase was merged.
  r.receipt_ids = CASE
    WHEN purch.receiptID IN r.receipt_ids THEN r.receipt_ids
    ELSE r.receipt_ids + purch.receiptID
  END
```

Why this is the right fix:

  • Single change point — all pipelines (Kafka, SQS, Airflow) benefit automatically
  • IN check on a small array (typical <50 receipt_ids per user+product pair) is cheap in Neo4j
  • No changes needed to Kafka or SQS processing logic
  • This is a prerequisite for Area 1 and Area 2. Must ship before running any backfill.

Files to modify:

  • internal/graph/writer.go — fix createPurchaseBatch Cypher
  • internal/graph/writer_test.go — add duplicate receipt test

Strategy 2: Cross-Pipeline Valkey Keys (Optional)


Add receipt-level idempotency across pipelines:

receipt_processed:{receipt_id}:{product_id} -> "1" (TTL: 30 days)

Check this key in both internal/kafka/graph_processor.go and internal/worker/orchestrator.go before calling PersistUserGraph().

| Pros | Cons |
|---|---|
| Prevents reaching Neo4j for duplicates | Adds millions of Valkey keys |
| Reduces Neo4j write load | 30-day TTL means older duplicates slip through |

Verdict: Nice-to-have optimization. The Cypher fix (Strategy 1) alone is sufficient.

For already-corrupted data in Neo4j:

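```cypher
// Keep the first occurrence of each receipt ID together with the timestamp at the
// same index (receipt_ids and timestamps are appended in lockstep by the writer).
MATCH (u:User)-[r:PURCHASED]->(p:Product)
WHERE r.receipt_ids IS NOT NULL
WITH r, [i IN range(0, size(r.receipt_ids) - 1)
         WHERE NOT r.receipt_ids[i] IN r.receipt_ids[0..i]] AS keep
WHERE size(keep) < size(r.receipt_ids)
SET r.receipt_ids = [i IN keep | r.receipt_ids[i]],
    r.timestamps  = [i IN keep | r.timestamps[i]],
    r.times       = size(keep)
```

Deduplicating the two lists independently (for example, with apoc.coll.toSet on each) can misalign them when two different receipts share a timestamp, so this version selects both lists by receipt index. It uses only built-in list comprehensions and does not require APOC.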

Run as a one-time migration after deploying the Cypher fix.


4. Feature Flipper New User Detection

The Feature Flipper library (feature-flipper-service/v2) only exposes Check(flagName, WithUserId(userID)). There is no ListUsers() or OnChange() API available. The library internally consumes the feature-flipper-flags Kafka topic and reads from S3 bucket {{env}}-feature-flipper.

Strategy A: Polling-Based Detection (Recommended for near-term)

Add a periodic job that:

  1. Gets candidate user IDs from an external source (Snowflake query of recently active users, or a maintained file)
  2. Checks each candidate against featureflag.IsIngestionEnabled(userID)
  3. Compares against a Valkey set backfill:known_enabled_users
  4. For newly-enabled users not already in loaded_users:{userID}, sends SQS backfill messages

Implementation: New component in unified-worker following the SchedulerComponent pattern at cmd/unified-worker/scheduler_component.go.

| Pros | Cons |
|---|---|
| Works today with existing APIs | Requires an external candidate user list |
| Simple, self-healing | Polling delay (configurable, e.g., hourly) |
| No dependency on FF internals | |

Files to create/modify:

  • internal/backfill/detector.go — polling detection logic
  • internal/backfill/detector_test.go — unit tests
  • cmd/unified-worker/backfill_detector_component.go — unified worker component
  • cmd/unified-worker/main.go — wire up detector component

Strategy B: Feature Flipper Kafka Topic Consumer (Investigate for mid-term)


Consume feature-flipper-flags topic directly to detect flag changes in real-time. The dependency is already configured in consumer-graph-worker.yml with read permissions and a consumer group.

| Pros | Cons |
|---|---|
| Real-time detection | Undocumented message format |
| No external user list needed | Format may change without notice |
| Infrastructure already provisioned | Flag changes may be percentage-based, not per-user |

Next step: Sample production messages from the topic to determine feasibility. If the message format includes per-user flag changes with user IDs, this becomes viable.

Strategy C: S3 Bucket Monitoring (Alternative)


Read {{env}}-feature-flipper S3 bucket (read permissions already granted in FSD YAML) to find the user list for the consumer_graph_ingestion flag. Diff against a known set periodically.

| Pros | Cons |
|---|---|
| S3 likely has ground truth | Unknown object structure |
| Read permissions already granted | Needs investigation of FF S3 format |

Next step: List objects in the bucket to determine if per-flag user lists are stored as readable files.

Recommended path:

  1. Now: Implement Strategy A (polling-based). Works immediately with existing APIs.
  2. In parallel: Investigate Strategy B (Kafka topic) and Strategy C (S3 bucket) to understand the data formats. If viable, upgrade to real-time detection.

5. BTS Shop Order Backfill (PLT-591)

The Purchase History API only returns receipt-based purchases. Shop orders (Fetch Shop / Button transactions) flow exclusively through the factoid Kafka stream. When the IsUserLoaded skip logic was active (see PR #142 for removal), factoid events were silently dropped for recently-backfilled users, leaving shop purchase data permanently missing from the graph. This caused false-positive repurchase DMs (e.g., a user received a DM for a product they had already bought via Fetch Shop).

Query the Button Transaction Service (BTS) for historical shop orders alongside the existing Purchase History API backfill.

API endpoints used:

  • GET /user/{userId}/transaction?startTimeMs={365_days_ago} — list of transactions (summary)
  • GET /internal/transaction/{buttonOrderId}/button — full transaction with line items and FIDOs

Data retention: BTS MongoDB retains 2+ years of transaction data (verified in prod). A 365-day lookback is well within range.

PLT-591 originally proposed FIDO + timestamp as a new dedupe key. After investigation, this is unnecessary:

  1. buttonOrderId = factoid summaryID. The BTS factoid builder (button-transaction-service/internal/services/factoid_service/builder.go:175) sets SummaryId: tfd.ButtonOrderId. Both the real-time factoid Kafka path and the BTS backfill share the same key.

  2. Existing Cypher dedup works automatically. The graph writer checks purch.receiptID IN r.receipt_ids. Since factoid Kafka uses summaryID as receiptID and BTS backfill uses buttonOrderId as receiptID — and these are the same value — no changes to the graph writer are needed.

  3. FIDO + timestamp would lose per-order granularity. Two shop orders for the same product on the same day would be collapsed into one purchase.

  4. Receipt and shop data don’t overlap. The Purchase History API and BTS represent different purchase channels. They never produce the same physical purchase, so cross-source dedup is not needed.

The BTS backfill runs as part of ProcessUserLoad in the orchestrator, after the receipt backfill:

```
SQS message (user_load_request)
  → ProcessUserLoad()
      → processReceiptBackfill()     ← Purchase History API (existing)
      → ProcessShopOrderBackfill()   ← BTS (new, PLT-591)
```

Each has an independent idempotency key (receipt-{requestID} / shop-{requestID}) so partial failures can be retried independently.

Files:

  • internal/api/bts/client.go — HTTP client with rate limiting, retry, URL escaping
  • internal/api/bts/types.go — BTS response types
  • internal/api/bts/transformer.go — BTS transaction → GraphData (reuses FIDO enrichment)
  • internal/worker/orchestrator.go: mergeGraphData(), refactored ProcessUserLoad()

Receipt and BTS data are fetched in parallel with fail-fast cancellation (if either fails, the other is cancelled immediately). Results are merged into a single GraphData and written atomically with FullLoad=true. This ensures SHOPS_IN/SHOPS_AT aggregates always reflect combined totals from both sources. On retry, the same merge+overwrite produces the same result — idempotent.

Any failure (receipt fetch, BTS fetch, any individual BTS detail call, Neo4j write) fails the entire job. SQS retries from scratch, re-fetching both sources. This prevents incomplete FullLoad=true writes from corrupting aggregates.

Production data: ~29% of users have BTS transactions, median 3, p90=32, p99=171, max=288.

  • ≤50 validated transactions (93% of active users): sequential detail calls
  • >50 transactions: 5 concurrent goroutines with semaphore, context.WithCancel on first error

Constants btsConcurrencyThreshold (50) and btsConcurrencyWorkers (5) are at the top of orchestrator.go.

BTS power users (200+ transactions) can take several minutes to process. The SQS visibility timeout is configurable:

  • YAML: sqs_consumer.visibility_timeout (seconds)
  • Env: SQS_VISIBILITY_TIMEOUT
  • Default: 600s (10 min)
  • FSD queue default (consumer-graph-worker.yml): 600s

6. Cloud Backfill Endpoint (PLT-591)

The existing backfill CLI (cmd/backfill/main.go) requires local AWS credentials via TempAdmin. This is operationally fragile and doesn’t scale for ongoing reingestion.

Add a POST /admin/backfill HTTP endpoint on the unified worker. It runs in ECS using the service’s IAM role, so no TempAdmin is needed.

Three source modes:

```sh
# Backfill specific users
curl -X POST http://<ecs>:8080/admin/backfill \
  -d '{"user_ids": ["id1", "id2"]}'

# Backfill from S3 file
curl -X POST http://<ecs>:8080/admin/backfill \
  -d '{"s3_uri": "s3://bucket/user-ids.txt"}'

# Backfill all users in Neo4j
curl -X POST http://<ecs>:8080/admin/backfill \
  -d '{"source": "neo4j"}'
```

  • Status polling: GET /admin/backfill/status
  • Cancellation: POST /admin/backfill/cancel

Enrollment and bulk backfill use separate SQS queues so new user enrollments aren’t blocked behind bulk backfill:

```
Enrollment API → {{env}}-consumer-graph-enrollment-queue (2 dedicated workers)
Bulk backfill  → {{env}}-consumer-graph-queue            (5 workers)
Both call ProcessUserLoad (receipt + BTS)
```

The enrollment endpoint (POST /consumer-graph-worker/enroll) sends to the enrollment queue. The cloud backfill endpoint and CLI tool send to the main queue. Both queues are processed by separate SQSConsumerComponent instances running the same ProcessUserLoad code.

Config:

  • enrollment_consumer.queue_url / ENROLLMENT_QUEUE_URL — enrollment queue URL
  • enrollment_consumer.worker_count — default 2
  • enrollment_consumer.visibility_timeout — default 600s

Safeguards:

  • One backfill job at a time (concurrency guard)
  • 1M user ID cap (inline, S3, and Neo4j sources)
  • Request body limited to 50MB
  • S3 URI validated synchronously before accepting
  • Cancellable context — stops on server shutdown
  • User IDs deduplicated before sending

Files:

  • cmd/unified-worker/server.go — endpoint handlers

Implementation Order

| Step | Area | Description | Prerequisite |
|---|---|---|---|
| 1 | 3 | Fix dedup-aware Cypher in internal/graph/writer.go | None |
| 2 | 1 | Build SQS backfill CLI (cmd/backfill/) | Step 1 |
| 3 | 3 | Run one-time data cleanup in Neo4j | Step 1 |
| 4 | 1 | Execute 75K user backfill, monitor | Steps 1-2 |
| 5 | 2 | Build Airflow Snowflake backfill DAG | Step 1 |
| 6 | 4 | Build polling-based FF new user detection | Step 2 |
| 7 | 4 | Investigate FF Kafka topic / S3 bucket formats | None (can run in parallel) |

Risks

| Risk | Likelihood | Impact | Mitigation |
|---|---|---|---|
| Neo4j overload during backfill | Medium | High | Rate limit at 10 req/s; monitor Neo4j metrics |
| Snowflake schema drift breaks Airflow DAG | Medium | Medium | Schema validation task as DAG prerequisite |
| Dedup Cypher change causes regression | Low | High | Thorough testing; staged rollout |
| Purchase History API rate limits during backfill | Low | Medium | Existing retry + exponential backoff in client |
| Feature Flipper Kafka topic format changes | Medium | Low | Only impacts Strategy B; Strategy A (polling) is unaffected |

7. Operational Runbook: Running a Backfill

Prerequisites:

  1. The FullLoad dedup fix must be deployed: the unified worker must include the conditional SHOPS_IN/SHOPS_AT logic (see “FullLoad Conditional Logic” below).

  2. Kafka consumers must be running and caught up (lag = 0) before the feature flag is turned on. The Kafka topic retains ~7 days of messages. Consumer offsets are per-partition, not per-user. If the consumers are already at the tip of the topic when the flag is enabled for new users, the retained messages for those users have already been read and discarded (because the flag was off). Turning on the flag only affects new messages going forward — there is no replay of old messages and therefore no double-counting risk.

    Important: If you turn on the flag before consumers have caught up (e.g., consumers were stopped or restarted with a reset offset), the consumers would reprocess retained messages for the newly-enabled users, and a subsequent backfill would double-count SHOPS_IN/SHOPS_AT aggregates for those users.

Order of operations:

  1. Deploy unified worker with FullLoad dedup fix
  2. Verify Kafka consumer lag is zero (consumers are caught up)
  3. Update consumer_graph_ingestion feature flag segment to include target users (e.g., internal employees segment)
  4. Run backfill: FullLoad=true overwrites aggregates with correct totals

The consumer_graph_ingestion feature flag gates which users’ Kafka messages are processed. Before running a backfill for a set of users, their user IDs must be included in the flag’s segment so that Kafka ingests their data. If a user is backfilled but not in the feature flag segment, Kafka won’t process their future receipts, and the graph will go stale.

Why no wait is needed between steps 3 and 4: When the flag is turned on while consumers are already at the tip of the topic, the 7-day retained messages have already been consumed and discarded (flag was off at the time). No retained messages will be replayed for the newly-enabled users. The backfill can run immediately.

Once Kafka is consuming in real-time, the backfill can be run at any time — including re-runs. The backfill overwrites aggregates, so it self-corrects. Any new Kafka messages arriving after the backfill will add incrementally to the correct baseline.

The FullLoad field on GraphData (internal/api/purchasehistory/types.go) controls how aggregate relationships are written:

| Relationship | FullLoad=true (backfill) | FullLoad=false (Kafka incremental) |
|---|---|---|
| PURCHASED | Receipt-ID dedup (always safe) | Receipt-ID dedup (always safe) |
| SHOPS_IN (User→Category) | purchase_count = cat.purchaseCount (overwrite) | purchase_count += cat.purchaseCount (add) |
| SHOPS_AT (User→Retailer) | frequency = ret.visits (overwrite) | frequency += ret.visits (add) |

  • Backfill path (internal/worker/orchestrator.go): Sets graphData.FullLoad = true after calling TransformToGraphData. The Purchase History API returns the complete history, so aggregates should be overwritten.
  • Kafka path (internal/kafka/graph_processor.go): FullLoad defaults to false. Each Kafka event contains data from a single receipt, so aggregates should be added to.
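To make the table concrete, a small Go helper could choose the SET fragment for each aggregate. The function names are hypothetical and the writer's actual Cypher may differ. Cypher has no numeric `+=` (on entities it merges maps), so the additive form is spelled out with coalesce(), which also covers a missing property.

```go
package main

import "fmt"

// shopsInSet returns the SET fragment for SHOPS_IN (User->Category).
func shopsInSet(fullLoad bool) string {
	if fullLoad {
		// Backfill: the API returned the complete history, so overwrite.
		return "SET r.purchase_count = cat.purchaseCount"
	}
	// Kafka: one receipt's worth of data, so add to the existing total.
	return "SET r.purchase_count = coalesce(r.purchase_count, 0) + cat.purchaseCount"
}

// shopsAtSet returns the SET fragment for SHOPS_AT (User->Retailer).
func shopsAtSet(fullLoad bool) string {
	if fullLoad {
		return "SET r.frequency = ret.visits"
	}
	return "SET r.frequency = coalesce(r.frequency, 0) + ret.visits"
}

func main() {
	for _, fullLoad := range []bool{true, false} {
		fmt.Printf("FullLoad=%v:\n  %s\n  %s\n", fullLoad, shopsInSet(fullLoad), shopsAtSet(fullLoad))
	}
}
```

An alternative is a single query that takes $fullLoad as a parameter and branches with CASE, which avoids maintaining two query strings.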

Running the backfill CLI:

```sh
# Dry run (logs only, no SQS messages sent)
./bin/backfill \
  --user-file ./data/internal_employees.csv \
  --queue-url "https://sqs.us-east-1.amazonaws.com/705747465351/prod-consumer-graph-queue" \
  --rate-limit 10 \
  --lookback-days 365 \
  --dry-run

# Actual run
AWS_PROFILE=prod-services-admin ./bin/backfill \
  --user-file ./data/internal_employees.csv \
  --queue-url "https://sqs.us-east-1.amazonaws.com/705747465351/prod-consumer-graph-queue" \
  --rate-limit 10 \
  --lookback-days 365
```

Before running a backfill, verify that consumer lag is zero for both consumer groups. The CloudWatch query requires three dimensions: Consumer Group, Cluster Name, and Topic.

```sh
# Note: "date -v-10M" is BSD/macOS syntax. On GNU/Linux use:
#   date -u -d '10 minutes ago' +%Y-%m-%dT%H:%M:%S

# Factoids consumer lag
aws cloudwatch get-metric-statistics \
  --namespace AWS/Kafka \
  --metric-name SumOffsetLag \
  --dimensions "Name=Consumer Group,Value=unified-worker-factoids" \
               "Name=Cluster Name,Value=prod-factoids" \
               "Name=Topic,Value=factoid-stream" \
  --start-time $(date -u -v-10M +%Y-%m-%dT%H:%M:%S) \
  --end-time $(date -u +%Y-%m-%dT%H:%M:%S) \
  --period 60 \
  --statistics Maximum \
  --profile prod-services-admin \
  --region us-east-1

# Receipts consumer lag
aws cloudwatch get-metric-statistics \
  --namespace AWS/Kafka \
  --metric-name SumOffsetLag \
  --dimensions "Name=Consumer Group,Value=unified-worker-receipts" \
               "Name=Cluster Name,Value=prod-receipt-infra" \
               "Name=Topic,Value=pipeline-v1-processed-receipt-events" \
  --start-time $(date -u -v-10M +%Y-%m-%dT%H:%M:%S) \
  --end-time $(date -u +%Y-%m-%dT%H:%M:%S) \
  --period 60 \
  --statistics Maximum \
  --profile prod-services-admin \
  --region us-east-1
```

All Maximum datapoints should be 0 (or near-zero, e.g., 1-2 from messages arriving in real-time). If datapoints are empty, the consumer group is not active — consumers haven’t been started yet.

Monitor the SQS queue depth during backfill:

```sh
watch -n 5 'aws sqs get-queue-attributes \
  --queue-url "https://sqs.us-east-1.amazonaws.com/705747465351/prod-consumer-graph-queue" \
  --attribute-names ApproximateNumberOfMessages ApproximateNumberOfMessagesNotVisible \
  --profile prod-services-admin \
  --output table'
```

A backfill is safe to re-run at any time as long as Kafka consumer lag is zero (i.e., Kafka is consuming in real-time, not replaying a backlog):

  • PURCHASED: Receipt-ID deduplication guard prevents duplicates per user.
  • SHOPS_IN / SHOPS_AT: FullLoad=true overwrites the user’s aggregates with correct totals.
  • User node: MERGE with ON MATCH SET updates last_updated_at.
  • Request IDs: Each backfill invocation generates new backfill-<uuid> request IDs, so the orchestrator’s idempotency check won’t block new runs for the same user.

Last Updated: 2026-04-02