Backfill & Dedup Strategy
Context
Consumer-graph-worker ingests purchase data via two Kafka consumers (receipts + factoids) gated by the consumer_graph_ingestion feature flag. An SQS-based user load path exists (calls Purchase History API / Neli for 365-day history), but there is no batch tooling to trigger it at scale.
Additionally, writing the same purchase through both Kafka and SQS/Airflow causes duplicate data in Neo4j because the PURCHASED relationship MERGE keys only on (user_id, product_id) and blindly appends receipt_ids/timestamps on match.
This document covers six areas:
- Near-term: SQS Backfill Job
- Mid-term: Airflow Snowflake Backfill
- Kafka + Backfill Dedup Strategies
- Feature Flipper New User Detection
- BTS Shop Order Backfill (PLT-591)
- Cloud Backfill Endpoint (PLT-591)
1. Near-Term: SQS Backfill Job
Create a CLI tool that sends SQS user_load_request messages for a list of user IDs. The existing SQS worker picks them up, calls Neli for 365 days of purchase history, and writes to Neo4j.
- Target: ~75K test users initially
- Rate: 10 req/s (configurable) -> ~2 hours for 75K users (75,000 / 10 per second = 7,500 s)
- Triggerable manually or via cron
Design
New CLI at cmd/backfill/main.go with a rate-limited sender at internal/backfill/sender.go.
CLI flags:
| Flag | Default | Description |
|---|---|---|
| `--user-file` | (required) | Path to newline-delimited user IDs |
| `--rate-limit` | 10 | Messages per second |
| `--lookback-days` | 365 | Days of history to request |
| `--dry-run` | false | Log messages without sending |
| `--queue-url` | `$SQS_QUEUE_URL` | SQS queue URL override |
SQS message format (must match UserLoadEvent in internal/worker/orchestrator.go):
```json
{
  "event_type": "user_load_request",
  "user_id": "<id>",
  "reason": "batch_backfill",
  "requested_at": "<RFC3339>",
  "lookback_days": 365,
  "request_id": "<uuid>",
  "priority": "low"
}
```

Progress tracking:
- Log every 1000 users, e.g. `Sent 5000/75000 (6.7%), elapsed: 8m20s`
- On failure, write remaining user IDs to a resume file
- Support a `--resume-from` flag to skip already-sent IDs
Idempotency: The orchestrator already checks loaded_users:{userID} (7d TTL). Users loaded within the past week will be skipped automatically.
Infrastructure
Uses the existing SQS queue declared in consumer-graph-worker.yml:

```yaml
- type: aws_sqs_queue
  name: "{{env}}-consumer-graph-queue"
  attributes:
    visibility_timeout: 600          # 10 min: BTS power users can have 200+ transactions
    message_retention_period: 345600 # 4 days
```

No new infrastructure needed.
```bash
# Manual run
go run cmd/backfill/main.go \
  --user-file ./data/test-users-75k.txt \
  --rate-limit 10 \
  --lookback-days 365

# Dry run
go run cmd/backfill/main.go \
  --user-file ./data/test-users-75k.txt \
  --dry-run

# Cron (ECS Scheduled Task or k8s CronJob)
# Schedule as needed for incremental user additions
```

QPS Risk
At 10 req/s, the Purchase History API (Neli) should be within its standard rate limits. The existing orchestrator has retry + exponential backoff for 429 responses (internal/api/purchasehistory/client.go). If Neli rate-limits us, backfill will slow down but not fail.
Files to create/modify
- `cmd/backfill/main.go`: CLI entry point
- `internal/backfill/sender.go`: rate-limited SQS message sender
- `internal/backfill/sender_test.go`: unit tests
- `Makefile`: add `make backfill` target
2. Mid-Term: Airflow Snowflake Backfill
New Airflow DAG that reads purchase history directly from Snowflake and writes to Neo4j. Bypasses the Purchase History API for bulk operations (hundreds of thousands or millions of users).
Design
Create internal/airflow/dags/neo4j_purchase_history_backfill.py following patterns from the existing neo4j_feature_sync.py (same Neo4jHook, SnowflakeHook, batch writes, sample_users_only parameter, alert_failure_callback).
DAG task flow:
```
validate_schema -> query_user_list -> chunk_users -> [query_purchases -> write_to_neo4j] per chunk -> verify_data
```

Snowflake source tables:
- `FETCH_SERVICES_PROD.RECEIPT_SERVICE.RECEIPTS`: receipt-level data
- `FETCH_SERVICES_PROD.RECEIPT_SERVICE.RECEIPT_ITEMS`: item-level data with FIDO, brand, category
Query sketch:
```sql
SELECT
    r.USER_ID_RAW      AS user_id,
    r.RECEIPT_ID_RAW   AS receipt_id,
    r.PURCHASE_DATE    AS purchase_date,
    r.STORE_NAME       AS store_name,
    r.RETAILER_CHANNEL AS store_type,
    i.FIDO             AS product_id,
    i.BRAND            AS brand,
    i.CATEGORY_1       AS category,
    i.DESCRIPTION      AS product_name,
    i.FINAL_QUANTITY   AS quantity
FROM FETCH_SERVICES_PROD.RECEIPT_SERVICE.RECEIPTS r
JOIN FETCH_SERVICES_PROD.RECEIPT_SERVICE.RECEIPT_ITEMS i
    ON r.RECEIPT_ID = i.RECEIPT_ID
WHERE r.USER_ID_RAW IN ({{user_ids}})
  AND r.PURCHASE_DATE >= DATEADD(day, -365, CURRENT_DATE())
ORDER BY r.USER_ID_RAW, r.PURCHASE_DATE
```

Schema Drift Mitigation
Risk: Snowflake table schemas may change over time (column renames, type changes, table moves).
Strategy A: Schema Validation Task (Recommended)
Add a leading DAG task that checks expected columns exist before any data queries:
```python
from airflow.decorators import task
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

# Every column the purchase query reads, per source table.
EXPECTED_COLUMNS = {
    "FETCH_SERVICES_PROD.RECEIPT_SERVICE.RECEIPTS": {
        "USER_ID_RAW", "RECEIPT_ID", "RECEIPT_ID_RAW", "PURCHASE_DATE",
        "STORE_NAME", "RETAILER_CHANNEL",
    },
    "FETCH_SERVICES_PROD.RECEIPT_SERVICE.RECEIPT_ITEMS": {
        "RECEIPT_ID", "FIDO", "BRAND", "CATEGORY_1", "DESCRIPTION", "FINAL_QUANTITY",
    },
}


@task()
def validate_schema():
    hook = SnowflakeHook(snowflake_conn_id="snowflake_default")
    for table, expected in EXPECTED_COLUMNS.items():
        # SHOW COLUMNS returns column_name as its third field (index 2).
        actual = {row[2] for row in hook.get_records(f"SHOW COLUMNS IN {table}")}
        missing = expected - actual
        if missing:
            raise ValueError(f"Schema drift detected! Missing columns in {table}: {missing}")
```

If validation fails, the DAG short-circuits before wasting compute, and the existing alert_failure_callback fires a notification.
Strategy B: Column Aliasing with Fallbacks
Use COALESCE in Snowflake queries for columns known to have rename risk:
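```sql
COALESCE(i.CATEGORY_1, i.CATEGORY, 'UNCATEGORIZED') AS category
```

COALESCE only falls back on NULL values: every column it names must still exist, so it covers a migration window in which old and new columns coexist. A column that has been fully renamed or dropped still fails the query with a SQL compilation error, which Strategy A catches first.

Recommendation: Use both: Strategy A as a hard gate, Strategy B for graceful degradation on known-risk columns.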
Neo4j Write Strategy
The Airflow DAG must use dedup-aware Cypher (see Area 3 below). It should NOT blindly append receipt_ids.
Files to create
- `internal/airflow/dags/neo4j_purchase_history_backfill.py`: new DAG
- `internal/airflow/shared_code/schema_validator.py`: reusable validation utility
3. Kafka + Backfill Dedup Strategies
The Problem
internal/graph/writer.go createPurchaseBatch (lines 261-289) uses:
```cypher
MERGE (u)-[r:PURCHASED]->(p)
ON MATCH SET
  r.receipt_ids = r.receipt_ids + purch.receiptID,
  r.times = r.times + 1,
  r.timestamps = r.timestamps + datetime(purch.timestamp)
```

If the same purchase arrives through both Kafka and SQS backfill:
- `receipt_ids` array gets `["R1", "R1"]` (duplicate)
- `times` becomes `2` (should be `1`)
- `timestamps` gets `[T1, T1]` (duplicate)
The Kafka and SQS paths use different idempotency key namespaces in Valkey, so the cache doesn’t prevent cross-pipeline duplicates:
- Kafka: `idempotency:kafka:factoid:{summaryID}` / `idempotency:kafka:receipt:{receiptID}`
- SQS: `idempotency:{requestID}`
Strategy 1: Dedup-Aware Cypher (Recommended)
Fix the ON MATCH SET to check before appending:

```cypher
MERGE (u)-[r:PURCHASED]->(p)
ON CREATE SET
  r.timestamps = [datetime(purch.timestamp)],
  r.receipt_ids = [purch.receiptID],
  r.times = 1,
  r.last = datetime(purch.timestamp),
  r.avg_interval_days = 0,
  r.repurchase_likelihood = 0.0
ON MATCH SET
  r.timestamps = CASE WHEN purch.receiptID IN r.receipt_ids
                      THEN r.timestamps ELSE r.timestamps + datetime(purch.timestamp) END,
  r.times = CASE WHEN purch.receiptID IN r.receipt_ids
                 THEN r.times ELSE r.times + 1 END,
  r.last = CASE WHEN datetime(purch.timestamp) > r.last
                THEN datetime(purch.timestamp) ELSE r.last END,
  // Assigned last so the IN checks above are evaluated against the pre-update list
  r.receipt_ids = CASE WHEN purch.receiptID IN r.receipt_ids
                       THEN r.receipt_ids ELSE r.receipt_ids + purch.receiptID END
```

Why this is the right fix:
- Single change point: all pipelines (Kafka, SQS, Airflow) benefit automatically
- The `IN` check on a small array (typically fewer than 50 receipt_ids per user+product pair) is cheap in Neo4j
- No changes needed to Kafka or SQS processing logic
- This is a prerequisite for Area 1 and Area 2 and must ship before running any backfill.
Files to modify:
- `internal/graph/writer.go`: fix `createPurchaseBatch` Cypher
- `internal/graph/writer_test.go`: add duplicate receipt test
Strategy 2: Cross-Pipeline Valkey Keys (Optional)
Add receipt-level idempotency across pipelines:

```
receipt_processed:{receipt_id}:{product_id} -> "1" (TTL: 30 days)
```

Check this key in both `internal/kafka/graph_processor.go` and `internal/worker/orchestrator.go` before calling `PersistUserGraph()`.
| Pros | Cons |
|---|---|
| Prevents reaching Neo4j for duplicates | Adds millions of Valkey keys |
| Reduces Neo4j write load | 30-day TTL means older duplicates slip through |
Verdict: Nice-to-have optimization. The Cypher fix (Strategy 1) alone is sufficient.
Strategy 3: One-Time Data Cleanup
For already-corrupted data in Neo4j:
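```cypher
MATCH (u:User)-[r:PURCHASED]->(p:Product)
WHERE size(r.receipt_ids) <> size(apoc.coll.toSet(r.receipt_ids))
// Keep the first occurrence of each receipt ID; timestamps are index-aligned with receipt_ids
WITH r, [i IN range(0, size(r.receipt_ids) - 1)
         WHERE NOT r.receipt_ids[i] IN r.receipt_ids[0..i]] AS keep
SET r.timestamps = [i IN keep | r.timestamps[i]],
    r.receipt_ids = [i IN keep | r.receipt_ids[i]],
    r.times = size(keep)
```

Timestamps are filtered by the same indices as receipt_ids rather than deduplicated on their own, because two distinct receipts can share a timestamp. Requires APOC library for the WHERE filter. If APOC is unavailable, use REDUCE + list comprehension equivalent.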
Run as a one-time migration after deploying the Cypher fix.
4. Feature Flipper New User Detection
Section titled “4. Feature Flipper New User Detection”The Constraint
The Feature Flipper library (feature-flipper-service/v2) only exposes Check(flagName, WithUserId(userID)). There is no ListUsers() or OnChange() API available. The library internally consumes the feature-flipper-flags Kafka topic and reads from S3 bucket {{env}}-feature-flipper.
Strategy A: Polling-Based Detection (Recommended for near-term)
Add a periodic job that:
- Gets candidate user IDs from an external source (Snowflake query of recently active users, or a maintained file)
- Checks each candidate against `featureflag.IsIngestionEnabled(userID)`
- Compares against a Valkey set `backfill:known_enabled_users`
- For newly enabled users not already in `loaded_users:{userID}`, sends SQS backfill messages
Implementation: New component in unified-worker following the SchedulerComponent pattern at cmd/unified-worker/scheduler_component.go.
| Pros | Cons |
|---|---|
| Works today with existing APIs | Requires an external candidate user list |
| Simple, self-healing | Polling delay (configurable, e.g., hourly) |
| No dependency on FF internals | |
Files to create/modify:
- `internal/backfill/detector.go`: polling detection logic
- `internal/backfill/detector_test.go`: unit tests
- `cmd/unified-worker/backfill_detector_component.go`: unified worker component
- `cmd/unified-worker/main.go`: wire up detector component
Strategy B: Feature Flipper Kafka Topic Consumer (Investigate for mid-term)
Consume feature-flipper-flags topic directly to detect flag changes in real-time. The dependency is already configured in consumer-graph-worker.yml with read permissions and a consumer group.
| Pros | Cons |
|---|---|
| Real-time detection | Undocumented message format |
| No external user list needed | Format may change without notice |
| Infrastructure already provisioned | Flag changes may be percentage-based, not per-user |
Next step: Sample production messages from the topic to determine feasibility. If the message format includes per-user flag changes with user IDs, this becomes viable.
Strategy C: S3 Bucket Monitoring (Alternative)
Read {{env}}-feature-flipper S3 bucket (read permissions already granted in FSD YAML) to find the user list for the consumer_graph_ingestion flag. Diff against a known set periodically.
| Pros | Cons |
|---|---|
| S3 likely has ground truth | Unknown object structure |
| Read permissions already granted | Needs investigation of FF S3 format |
Next step: List objects in the bucket to determine if per-flag user lists are stored as readable files.
Recommendation
- Now: Implement Strategy A (polling-based). Works immediately with existing APIs.
- In parallel: Investigate Strategy B (Kafka topic) and Strategy C (S3 bucket) to understand the data formats. If viable, upgrade to real-time detection.
5. BTS Shop Order Backfill (PLT-591)
Problem
The Purchase History API only returns receipt-based purchases. Shop orders (Fetch Shop / Button transactions) flow exclusively through the factoid Kafka stream. When the IsUserLoaded skip logic was active (see PR #142 for removal), factoid events were silently dropped for recently-backfilled users, leaving shop purchase data permanently missing from the graph. This caused false-positive repurchase DMs (e.g., a user received a DM for a product they already bought via Fetch Shop).
Solution
Query the Button Transaction Service (BTS) for historical shop orders alongside the existing Purchase History API backfill.
API endpoints used:
- `GET /user/{userId}/transaction?startTimeMs={365_days_ago}`: list of transactions (summary)
- `GET /internal/transaction/{buttonOrderId}/button`: full transaction with line items and FIDOs
Data retention: BTS MongoDB retains 2+ years of transaction data (verified in prod). A 365-day lookback is well within range.
Dedupe Key: Why Not FIDO + Timestamp?
PLT-591 originally proposed FIDO + timestamp as a new dedupe key. After investigation, this is unnecessary:
- `buttonOrderId` = factoid `summaryID`. The BTS factoid builder (`button-transaction-service/internal/services/factoid_service/builder.go:175`) sets `SummaryId: tfd.ButtonOrderId`. Both the real-time factoid Kafka path and the BTS backfill share the same key.
- Existing Cypher dedup works automatically. The graph writer checks `purch.receiptID IN r.receipt_ids`. Since factoid Kafka uses `summaryID` as `receiptID` and BTS backfill uses `buttonOrderId` as `receiptID`, and these are the same value, no changes to the graph writer are needed.
- FIDO + timestamp would lose per-order granularity. Two shop orders for the same product on the same day would be collapsed into one purchase.
- Receipt and shop data don’t overlap. The Purchase History API and BTS represent different purchase channels. They never produce the same physical purchase, so cross-source dedup is not needed.
Architecture
The BTS backfill runs as part of ProcessUserLoad in the orchestrator, after the receipt backfill:

```
SQS message (user_load_request)
  → ProcessUserLoad()
      → processReceiptBackfill()    ← Purchase History API (existing)
      → ProcessShopOrderBackfill()  ← BTS (new, PLT-591)
```

Each has an independent idempotency key (`receipt-{requestID}` / `shop-{requestID}`) so partial failures can be retried independently.
Files:
- `internal/api/bts/client.go`: HTTP client with rate limiting, retry, URL escaping
- `internal/api/bts/types.go`: BTS response types
- `internal/api/bts/transformer.go`: BTS transaction → GraphData (reuses FIDO enrichment)
- `internal/worker/orchestrator.go`: `mergeGraphData()`, refactored `ProcessUserLoad()`
Atomic Merged Write
Receipt and BTS data are fetched in parallel with fail-fast cancellation (if either fails, the other is cancelled immediately). Results are merged into a single GraphData and written atomically with FullLoad=true. This ensures SHOPS_IN/SHOPS_AT aggregates always reflect combined totals from both sources. On retry, the same merge+overwrite produces the same result, so the write is idempotent.
Any failure (receipt fetch, BTS fetch, any individual BTS detail call, Neo4j write) fails the entire job. SQS retries from scratch, re-fetching both sources. This prevents incomplete FullLoad=true writes from corrupting aggregates.
Adaptive Concurrency
Production data: ~29% of users have BTS transactions, median 3, p90=32, p99=171, max=288.
- ≤50 validated transactions (93% of active users): sequential detail calls
- >50 transactions: 5 concurrent goroutines with a semaphore, `context.WithCancel` on first error
Constants btsConcurrencyThreshold (50) and btsConcurrencyWorkers (5) are at the top of orchestrator.go.
SQS Visibility Timeout
BTS power users (200+ transactions) can take several minutes to process. The SQS visibility timeout is configurable:
- YAML: `sqs_consumer.visibility_timeout` (seconds)
- Env: `SQS_VISIBILITY_TIMEOUT`
- Default: 600s (10 min)
- FSD queue default (`consumer-graph-worker.yml`): 600s
6. Cloud Backfill Endpoint (PLT-591)
Problem
The existing backfill CLI (cmd/backfill/main.go) requires local AWS credentials via TempAdmin. This is operationally fragile and doesn’t scale for ongoing reingestion.
Solution
POST /admin/backfill HTTP endpoint on the unified worker. Runs in ECS using the service’s IAM role, so no TempAdmin is needed.
Three source modes:
```bash
# Backfill specific users
curl -X POST http://<ecs>:8080/admin/backfill \
  -d '{"user_ids": ["id1", "id2"]}'

# Backfill from S3 file
curl -X POST http://<ecs>:8080/admin/backfill \
  -d '{"s3_uri": "s3://bucket/user-ids.txt"}'

# Backfill all users in Neo4j
curl -X POST http://<ecs>:8080/admin/backfill \
  -d '{"source": "neo4j"}'
```

Status polling: GET /admin/backfill/status
Cancellation: POST /admin/backfill/cancel
Queue Architecture
Enrollment and bulk backfill use separate SQS queues so new user enrollments aren’t blocked behind bulk backfill:

```
Enrollment API → {{env}}-consumer-graph-enrollment-queue  (2 dedicated workers)
Bulk backfill  → {{env}}-consumer-graph-queue             (5 workers)
                        ↓
        Both call ProcessUserLoad (receipt + BTS)
```

The enrollment endpoint (POST /consumer-graph-worker/enroll) sends to the enrollment queue. The cloud backfill endpoint and CLI tool send to the main queue. Both queues are processed by separate SQSConsumerComponent instances running the same ProcessUserLoad code.
Config:
- `enrollment_consumer.queue_url` / `ENROLLMENT_QUEUE_URL`: enrollment queue URL
- `enrollment_consumer.worker_count`: default 2
- `enrollment_consumer.visibility_timeout`: default 600s
Safeguards
- One backfill job at a time (concurrency guard)
- 1M user ID cap (inline, S3, and Neo4j sources)
- Request body limited to 50MB
- S3 URI validated synchronously before accepting
- Cancellable context — stops on server shutdown
- User IDs deduplicated before sending
Files:
- `cmd/unified-worker/server.go`: endpoint handlers
Implementation Sequence
| Step | Area | Description | Prerequisite |
|---|---|---|---|
| 1 | 3 | Fix dedup-aware Cypher in internal/graph/writer.go | None |
| 2 | 1 | Build SQS backfill CLI (cmd/backfill/) | Step 1 |
| 3 | 3 | Run one-time data cleanup in Neo4j | Step 1 |
| 4 | 1 | Execute 75K user backfill, monitor | Steps 1-2 |
| 5 | 2 | Build Airflow Snowflake backfill DAG | Step 1 |
| 6 | 4 | Build polling-based FF new user detection | Step 2 |
| 7 | 4 | Investigate FF Kafka topic / S3 bucket formats | None (can run in parallel) |
Risk Assessment
| Risk | Likelihood | Impact | Mitigation |
|---|---|---|---|
| Neo4j overload during backfill | Medium | High | Rate limit at 10 req/s; monitor Neo4j metrics |
| Snowflake schema drift breaks Airflow DAG | Medium | Medium | Schema validation task as DAG prerequisite |
| Dedup Cypher change causes regression | Low | High | Thorough testing; staged rollout |
| Purchase History API rate limits during backfill | Low | Medium | Existing retry + exponential backoff in client |
| Feature Flipper Kafka topic format changes | Medium | Low | Only impacts Strategy B; Strategy A (polling) is unaffected |
7. Operational Runbook: Running a Backfill
Prerequisites
- The `FullLoad` dedup fix must be deployed. The unified worker must include the conditional SHOPS_IN/SHOPS_AT logic (see “FullLoad Conditional Logic” below).
- Kafka consumers must be running and caught up (lag = 0) before the feature flag is turned on. The Kafka topic retains ~7 days of messages. Consumer offsets are per-partition, not per-user. If the consumers are already at the tip of the topic when the flag is enabled for new users, the retained messages for those users have already been read and discarded (because the flag was off). Turning on the flag only affects new messages going forward; there is no replay of old messages and therefore no double-counting risk.
Important: If you turn on the flag before consumers have caught up (e.g., consumers were stopped or restarted with a reset offset), the consumers would reprocess retained messages for the newly-enabled users, and a subsequent backfill would double-count SHOPS_IN/SHOPS_AT aggregates for those users.
Correct Ordering
1. Deploy unified worker with FullLoad dedup fix
2. Verify Kafka consumer lag is zero (consumers are caught up)
3. Update consumer_graph_ingestion feature flag segment to include target users (e.g., internal employees segment)
4. Run backfill: FullLoad=true overwrites aggregates with correct totals

The consumer_graph_ingestion feature flag gates which users’ Kafka messages are processed. Before running a backfill for a set of users, their user IDs must be included in the flag’s segment so that Kafka ingests their data. If a user is backfilled but not in the feature flag segment, Kafka won’t process their future receipts, and the graph will go stale.
Why no wait is needed between steps 3 and 4: When the flag is turned on while consumers are already at the tip of the topic, the 7-day retained messages have already been consumed and discarded (flag was off at the time). No retained messages will be replayed for the newly-enabled users. The backfill can run immediately.
Once Kafka is consuming in real-time, the backfill can be run at any time — including re-runs. The backfill overwrites aggregates, so it self-corrects. Any new Kafka messages arriving after the backfill will add incrementally to the correct baseline.
FullLoad Conditional Logic
The FullLoad field on GraphData (internal/api/purchasehistory/types.go) controls how aggregate relationships are written:
| Relationship | FullLoad=true (backfill) | FullLoad=false (Kafka incremental) |
|---|---|---|
| PURCHASED | Receipt-ID dedup (always safe) | Receipt-ID dedup (always safe) |
| SHOPS_IN (User→Category) | purchase_count = cat.purchaseCount (overwrite) | purchase_count += cat.purchaseCount (add) |
| SHOPS_AT (User→Retailer) | frequency = ret.visits (overwrite) | frequency += ret.visits (add) |
- Backfill path (`internal/worker/orchestrator.go`): Sets `graphData.FullLoad = true` after calling `TransformToGraphData`. The Purchase History API returns the complete history, so aggregates should be overwritten.
- Kafka path (`internal/kafka/graph_processor.go`): `FullLoad` defaults to `false`. Each Kafka event contains data from a single receipt, so aggregates should be added to.
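The overwrite-versus-add rule in the table can be modeled in Go to check the property the runbook relies on: repeated backfills converge to the same totals, and Kafka increments add on top of that baseline. `Aggregates` and `ApplyGraphData` are hypothetical; in this model, categories or retailers absent from a payload keep their previous values, which may differ from the real Cypher.

```go
package main

// Aggregates is a hypothetical in-memory view of one user's aggregate edges.
type Aggregates struct {
	PurchaseCount map[string]int // SHOPS_IN.purchase_count, keyed by category
	Frequency     map[string]int // SHOPS_AT.frequency, keyed by retailer
}

// apply overwrites (fullLoad) or adds to each value present in delta.
// Keys absent from delta are left untouched in this model.
func apply(dst, delta map[string]int, fullLoad bool) {
	for k, v := range delta {
		if fullLoad {
			dst[k] = v
		} else {
			dst[k] += v
		}
	}
}

// ApplyGraphData applies one payload's category counts and retailer visits.
func ApplyGraphData(agg *Aggregates, categoryCounts, retailerVisits map[string]int, fullLoad bool) {
	if agg.PurchaseCount == nil {
		agg.PurchaseCount = make(map[string]int)
	}
	if agg.Frequency == nil {
		agg.Frequency = make(map[string]int)
	}
	apply(agg.PurchaseCount, categoryCounts, fullLoad)
	apply(agg.Frequency, retailerVisits, fullLoad)
}
```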
Backfill Command
```bash
# Dry run (logs only, no SQS messages sent)
./bin/backfill \
  --user-file ./data/internal_employees.csv \
  --queue-url "https://sqs.us-east-1.amazonaws.com/705747465351/prod-consumer-graph-queue" \
  --rate-limit 10 \
  --lookback-days 365 \
  --dry-run

# Actual run
AWS_PROFILE=prod-services-admin ./bin/backfill \
  --user-file ./data/internal_employees.csv \
  --queue-url "https://sqs.us-east-1.amazonaws.com/705747465351/prod-consumer-graph-queue" \
  --rate-limit 10 \
  --lookback-days 365
```

Checking Kafka Consumer Lag
Before running a backfill, verify that consumer lag is zero for both consumer groups. The CloudWatch query requires three dimensions: Consumer Group, Cluster Name, and Topic.
```bash
# date -v-10M is BSD/macOS syntax; with GNU date use: date -u -d '10 minutes ago' +%Y-%m-%dT%H:%M:%S

# Factoids consumer lag
aws cloudwatch get-metric-statistics \
  --namespace AWS/Kafka \
  --metric-name SumOffsetLag \
  --dimensions "Name=Consumer Group,Value=unified-worker-factoids" \
               "Name=Cluster Name,Value=prod-factoids" \
               "Name=Topic,Value=factoid-stream" \
  --start-time $(date -u -v-10M +%Y-%m-%dT%H:%M:%S) \
  --end-time $(date -u +%Y-%m-%dT%H:%M:%S) \
  --period 60 \
  --statistics Maximum \
  --profile prod-services-admin \
  --region us-east-1

# Receipts consumer lag
aws cloudwatch get-metric-statistics \
  --namespace AWS/Kafka \
  --metric-name SumOffsetLag \
  --dimensions "Name=Consumer Group,Value=unified-worker-receipts" \
               "Name=Cluster Name,Value=prod-receipt-infra" \
               "Name=Topic,Value=pipeline-v1-processed-receipt-events" \
  --start-time $(date -u -v-10M +%Y-%m-%dT%H:%M:%S) \
  --end-time $(date -u +%Y-%m-%dT%H:%M:%S) \
  --period 60 \
  --statistics Maximum \
  --profile prod-services-admin \
  --region us-east-1
```

All Maximum datapoints should be 0 (or near zero, e.g., 1-2 from messages arriving in real time). If no datapoints are returned, the consumer group is not active: the consumers haven’t been started yet.
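The pass/fail rule above can be made explicit for scripting, for example by feeding it the values printed when `--query 'Datapoints[].Maximum' --output text` is added to the commands above. `LagCaughtUp` and `ErrNoDatapoints` are hypothetical, and a tolerance of 2 mirrors the "1-2" guidance rather than any documented threshold.

```go
package main

import "errors"

// ErrNoDatapoints signals an inactive consumer group (no SumOffsetLag datapoints).
var ErrNoDatapoints = errors.New("no datapoints: consumer group is not active")

// LagCaughtUp reports whether every Maximum datapoint is within tolerance,
// allowing for a small lag from messages arriving in real time.
func LagCaughtUp(maxima []float64, tolerance float64) (bool, error) {
	if len(maxima) == 0 {
		return false, ErrNoDatapoints
	}
	for _, m := range maxima {
		if m > tolerance {
			return false, nil
		}
	}
	return true, nil
}
```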
Monitoring
Monitor the SQS queue depth during backfill:

```bash
watch -n 5 'aws sqs get-queue-attributes \
  --queue-url "https://sqs.us-east-1.amazonaws.com/705747465351/prod-consumer-graph-queue" \
  --attribute-names ApproximateNumberOfMessages ApproximateNumberOfMessagesNotVisible \
  --profile prod-services-admin \
  --output table'
```

Re-running a Backfill
A backfill is safe to re-run at any time as long as Kafka consumer lag is zero (i.e., Kafka is consuming in real time, not replaying a backlog):
- PURCHASED: The receipt-ID deduplication guard prevents the same receipt from being counted twice on a user→product relationship.
- SHOPS_IN / SHOPS_AT: `FullLoad=true` overwrites the user’s aggregates with correct totals.
- User node: `MERGE` with `ON MATCH SET` updates `last_updated_at`.
- Request IDs: Each backfill invocation generates new `backfill-<uuid>` request IDs, so the orchestrator’s idempotency check won’t block new runs for the same user.
Last Updated: 2026-04-02