Backfill Runbook
How to backfill a set of user IDs into Neo4j and enable real-time Kafka ingestion for them.
Use case: You have a file of user IDs (e.g., experiment cohort, new segment). You want to:
- Load their 365-day purchase history into Neo4j
- Enable real-time Kafka ingestion so their future receipts/factoids flow into the graph
General Procedure
Prerequisites
- Unified worker is deployed with the FullLoad dedup fix (overwrites SHOPS_IN/SHOPS_AT aggregates on backfill)
- Unified worker is deployed with category enrichment code (backfill now enriches Product nodes with category_id and category_hierarchy via FPS + category-service)
- You have AWS credentials with access to the target environment (stage or prod)
- You have tempadmin credentials for the target AWS account (needed for graph audit and backfill CLI):
  - Request via https://aws-temp-access.fetchrewards.com/ for stage-services or prod-services
- Your user ID file is newline-delimited (one user ID per line)
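The file-format prerequisite can be checked before anything is uploaded or enqueued. The following is a minimal Python sketch, assuming user IDs are opaque strings with no commas or internal whitespace; the function name and the validation rules are illustrative and not part of the worker or the backfill CLI.

```python
def clean_user_ids(text: str) -> list[str]:
    """Return unique, non-empty user IDs in first-seen order.

    Assumes one ID per line. Raises ValueError on lines that contain commas
    or internal whitespace, which usually means the file is CSV-style or
    space-delimited rather than newline-delimited.
    """
    seen: set[str] = set()
    ids: list[str] = []
    for lineno, raw in enumerate(text.splitlines(), start=1):
        uid = raw.strip()
        if not uid:
            continue  # skip blank lines
        if "," in uid or any(ch.isspace() for ch in uid):
            raise ValueError(f"line {lineno}: expected one ID per line, got {raw!r}")
        if uid not in seen:
            seen.add(uid)
            ids.append(uid)
    return ids


if __name__ == "__main__":
    sample = "user1\nuser2\n\nuser1\n"
    print(clean_user_ids(sample))  # ['user1', 'user2']
```

Deduplicating up front also gives the unique-user count that the post-backfill audit compares against.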
Step 1: Verify Kafka consumer lag is zero
Both consumer groups must be caught up to the tip of their topics before proceeding.

```bash
# Note: `date -v-10M` is BSD/macOS syntax. On GNU/Linux, use instead:
#   date -u -d '10 minutes ago' +%Y-%m-%dT%H:%M:%S

# Factoids consumer lag
aws cloudwatch get-metric-statistics \
  --namespace AWS/Kafka \
  --metric-name SumOffsetLag \
  --dimensions "Name=Consumer Group,Value=unified-worker-factoids" \
               "Name=Cluster Name,Value=prod-factoids" \
               "Name=Topic,Value=factoid-stream" \
  --start-time "$(date -u -v-10M +%Y-%m-%dT%H:%M:%S)" \
  --end-time "$(date -u +%Y-%m-%dT%H:%M:%S)" \
  --period 60 \
  --statistics Maximum \
  --profile prod-services-admin \
  --region us-east-1

# Receipts consumer lag
aws cloudwatch get-metric-statistics \
  --namespace AWS/Kafka \
  --metric-name SumOffsetLag \
  --dimensions "Name=Consumer Group,Value=unified-worker-receipts" \
               "Name=Cluster Name,Value=prod-receipt-infra" \
               "Name=Topic,Value=pipeline-v1-processed-receipt-events" \
  --start-time "$(date -u -v-10M +%Y-%m-%dT%H:%M:%S)" \
  --end-time "$(date -u +%Y-%m-%dT%H:%M:%S)" \
  --period 60 \
  --statistics Maximum \
  --profile prod-services-admin \
  --region us-east-1
```

For stage, replace the following in both commands:
| Parameter | Prod | Stage |
|---|---|---|
| Cluster Name (factoids) | prod-factoids | stage-factoids |
| Cluster Name (receipts) | prod-receipt-infra | stage-receipt-infra |
| AWS profile | prod-services-admin | stage-services-admin |
All Maximum datapoints should be 0 (or near-zero, e.g., 1-2 from real-time messages). If datapoints are empty, the consumer group is not active.
Do not proceed if lag is above this near-zero range. If consumers are replaying a backlog, enabling the flag would cause double-counting of SHOPS_IN/SHOPS_AT aggregates.
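The go/no-go rule can be applied mechanically to the command output. The sketch below is a hedged illustration in Python: it assumes the command's default JSON output (an object with a Datapoints list whose entries carry a Maximum value), and it treats a tolerance of 2 as "near-zero". Both the function name and that threshold are assumptions, not part of the runbook tooling.

```python
import json


def lag_is_clear(response_json: str, tolerance: float = 2.0) -> bool:
    """Return True when every Maximum datapoint is at or below `tolerance`.

    An empty Datapoints list returns False: an inactive consumer group
    cannot be confirmed as caught up.
    """
    datapoints = json.loads(response_json).get("Datapoints", [])
    if not datapoints:
        return False
    return all(dp["Maximum"] <= tolerance for dp in datapoints)


if __name__ == "__main__":
    sample = json.dumps({
        "Label": "SumOffsetLag",
        "Datapoints": [
            {"Timestamp": "2024-01-01T00:00:00Z", "Maximum": 0.0, "Unit": "Count"},
            {"Timestamp": "2024-01-01T00:01:00Z", "Maximum": 1.0, "Unit": "Count"},
        ],
    })
    print(lag_is_clear(sample))  # True
```

Run the check once per consumer group; both must pass before continuing.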
Step 2: Add user IDs to the feature flag that gates Kafka ingestion
Add the user IDs as a segment in Feature Flipper on the flag that gates Kafka ingestion for your users. This ensures future receipts/factoids flow into the graph after the backfill.
There are two approaches depending on the number of users:
Option A: Inline IDs (small sets, <1K users)
In the Feature Flipper UI, add user IDs directly to a segment’s allowedIDs field (comma-separated).
Option B: S3-backed segment (large sets, >1K users)
Upload the user ID file to the Feature Flipper S3 bucket, then point a segment’s allowedS3IDs to the file.

```bash
# Upload to prod
aws s3 cp <user-file> \
  s3://prod-feature-flipper/<filename>.csv \
  --profile prod-services-admin

# Upload to stage
aws s3 cp <user-file> \
  s3://stage-feature-flipper/<filename>.csv \
  --profile stage-services-admin
```

Then in the Feature Flipper UI:
- Go to the flag
- Add a new segment (or update an existing S3-backed segment)
- Set allowedS3IDs to <filename>.csv
- Set rollout to 100
Do this BEFORE the backfill. It’s safe because:
- Kafka consumers are already at the tip of the topic (verified in Step 1)
- The 7-day retained messages were already consumed and discarded when the flag was off for these users
- No retained messages will replay for newly-enabled users
- There is no wait needed between enabling the flag and running the backfill
Note: For experiment-enrolled users (via /consumer-graph-worker/enroll), the Valkey key experiment:enrolled:{user_id} serves as an alternative Kafka gate alongside the feature flag. Those users do not need to be added to the flag. This step is only needed for users being backfilled outside the enrollment API flow.
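The two gates described in the note combine as a logical OR. The sketch below illustrates only that rule; plain Python sets stand in for the Feature Flipper segment lookup and the Valkey key check, and the function name is hypothetical rather than taken from the worker code.

```python
def kafka_gate_open(user_id: str, flag_user_ids: set[str], valkey_keys: set[str]) -> bool:
    """Return True if events for this user should be ingested.

    A user passes when they are in the feature flag segment OR when their
    experiment enrollment key exists in Valkey.
    """
    enrolled_key = f"experiment:enrolled:{user_id}"
    return user_id in flag_user_ids or enrolled_key in valkey_keys


if __name__ == "__main__":
    flag_ids = {"user1"}
    keys = {"experiment:enrolled:user2"}
    print(kafka_gate_open("user1", flag_ids, keys))  # True (flag segment)
    print(kafka_gate_open("user2", flag_ids, keys))  # True (enrollment key)
    print(kafka_gate_open("user3", flag_ids, keys))  # False (neither gate)
```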
Step 3: Run pre-backfill graph audit
Run /graph-audit <env> (Claude Code skill) before the backfill to capture a baseline. Key metrics to record:
- Total nodes and relationships
- Products with category_id / category_hierarchy counts
- IN_CATEGORY relationship count
- Any existing CRITICAL or WARN issues
This baseline is compared against the post-backfill audit to verify the backfill wrote data correctly.
Step 4: Run the backfill
Option A: Cloud endpoint (recommended; no TempAdmin needed)
The unified worker exposes POST /admin/backfill for cloud-based execution. No local AWS credentials required.

```bash
# Backfill specific users
curl -X POST http://<ecs-service>:8080/admin/backfill \
  -H "Content-Type: application/json" \
  -d '{"user_ids": ["user1", "user2"], "rate_limit": 10, "lookback_days": 365}'

# Backfill from S3 file
curl -X POST http://<ecs-service>:8080/admin/backfill \
  -H "Content-Type: application/json" \
  -d '{"s3_uri": "s3://bucket/user-ids.txt"}'

# Backfill all users currently in Neo4j
curl -X POST http://<ecs-service>:8080/admin/backfill \
  -H "Content-Type: application/json" \
  -d '{"source": "neo4j"}'

# Check progress
curl http://<ecs-service>:8080/admin/backfill/<job_id>

# Cancel a running backfill
curl -X POST http://<ecs-service>:8080/admin/backfill/<job_id>/cancel
```

Option B: CLI tool (requires TempAdmin)

```bash
# Dry run
./bin/backfill \
  --user-file <user-file> \
  --queue-url "https://sqs.us-east-1.amazonaws.com/705747465351/prod-consumer-graph-queue" \
  --rate-limit 10 \
  --lookback-days 365 \
  --dry-run

# Actual run
AWS_PROFILE=prod-services-admin ./bin/backfill \
  --user-file <user-file> \
  --queue-url "https://sqs.us-east-1.amazonaws.com/705747465351/prod-consumer-graph-queue" \
  --rate-limit 10 \
  --lookback-days 365
```

Stage queue URL: https://sqs.us-east-1.amazonaws.com/643098206224/stage-consumer-graph-queue
Timing estimates at 10 req/s:
| Users | Time |
|---|---|
| 1,000 | ~2 min |
| 10,000 | ~17 min |
| 75,000 | ~2 hours |
| 200,000 | ~5.5 hours |
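The figures in the table follow from dividing the user count by the send rate. The Python sketch below reproduces that arithmetic, assuming one request per user; it estimates only the time to enqueue, not the time the worker takes to process each message. The function name is illustrative.

```python
def estimate_enqueue_minutes(user_count: int, rate_per_second: float = 10.0) -> float:
    """Minutes needed to send one request per user at a fixed rate."""
    if rate_per_second <= 0:
        raise ValueError("rate_per_second must be positive")
    return user_count / rate_per_second / 60.0


if __name__ == "__main__":
    for users in (1_000, 10_000, 75_000, 200_000):
        minutes = estimate_enqueue_minutes(users)
        print(f"{users:>7,} users: {minutes:6.1f} min ({minutes / 60:.1f} h)")
    # 10,000 users at 10 req/s is 1,000 seconds, i.e. about 16.7 minutes.
```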
The backfill sends SQS messages. The unified worker’s SQS consumer picks them up and runs two backfill steps per user:
- Receipt backfill — calls Purchase History API for 365 days of receipt data, enriches with FPS + category-service, writes to Neo4j
- Shop order backfill — calls Button Transaction Service (BTS) for 365 days of shop order data, enriches FIDOs with FPS, writes to Neo4j
Each step has an independent idempotency key (receipt-{requestID} / shop-{requestID}), so partial failures can be retried without reprocessing the successful step. Shop order dedup uses buttonOrderId as receiptID, which matches the factoid Kafka stream’s summaryID.
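The effect of per-step idempotency keys can be sketched as follows. This is a simplified Python model, not the orchestrator's implementation: an in-memory set stands in for wherever completed keys are actually stored, and run_backfill and its arguments are hypothetical names. Only the key formats (receipt-{requestID} and shop-{requestID}) come from the description above.

```python
from typing import Callable


def step_keys(request_id: str) -> dict[str, str]:
    """Idempotency key per step, in execution order."""
    return {"receipt": f"receipt-{request_id}", "shop": f"shop-{request_id}"}


def run_backfill(
    request_id: str,
    steps: dict[str, Callable[[], None]],
    completed: set[str],
) -> list[str]:
    """Run each step whose key is not yet in `completed`.

    A key is recorded only after its step succeeds, so a failed step stays
    eligible for retry while a successful one is skipped next time.
    Returns the names of the steps executed on this attempt.
    """
    executed: list[str] = []
    for name, key in step_keys(request_id).items():
        if key in completed:
            continue  # already done on an earlier attempt
        steps[name]()  # may raise; the key is then not recorded
        completed.add(key)
        executed.append(name)
    return executed
```

If the receipt step succeeds and the shop step fails, a retry with the same request ID executes only the shop step.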
Step 5: Monitor
SQS queue depth
Watch the queue drain to confirm messages are being processed:

```bash
watch -n 5 'aws sqs get-queue-attributes \
  --queue-url "https://sqs.us-east-1.amazonaws.com/705747465351/prod-consumer-graph-queue" \
  --attribute-names ApproximateNumberOfMessages ApproximateNumberOfMessagesNotVisible \
  --profile prod-services-admin \
  --output table'
```

- ApproximateNumberOfMessages = messages waiting to be picked up
- ApproximateNumberOfMessagesNotVisible = messages currently being processed
Both should trend to 0 as the backfill completes.
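For scripted checks, the same two attributes can be read from the command's JSON output instead of the table view. The sketch below assumes the command is run with --output json, which returns an Attributes object whose values are strings; the function name is illustrative.

```python
import json


def queue_drained(attributes_json: str) -> bool:
    """Return True when no messages are waiting and none are in flight."""
    attrs = json.loads(attributes_json)["Attributes"]
    waiting = int(attrs["ApproximateNumberOfMessages"])
    in_flight = int(attrs["ApproximateNumberOfMessagesNotVisible"])
    return waiting == 0 and in_flight == 0


if __name__ == "__main__":
    sample = json.dumps({
        "Attributes": {
            "ApproximateNumberOfMessages": "0",
            "ApproximateNumberOfMessagesNotVisible": "3",
        }
    })
    print(queue_drained(sample))  # False: three messages are still in flight
```

Both counts are approximate, so it is safer to require several consecutive 0/0 readings before treating the backfill as complete.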
Loki logs
Check consumer-graph-worker logs for backfill progress:

```logql
{service_name="consumer-graph-worker", deployment_environment="prod"} |= "user_load"
```

Neo4j verification
Spot-check a few user IDs to confirm their graphs were written:

```bash
curl -s -u 'neo4j:<password>' \
  -H "Content-Type: application/json" \
  -d '{"statements":[{"statement":"MATCH (u:User {user_id: $uid})-[r]-() RETURN type(r) AS rel, count(r) AS cnt","parameters":{"uid":"<user_id>"}}]}' \
  http://<neo4j-host>:7474/db/neo4j/tx/commit
```

Step 6: Run post-backfill graph audit
Run /graph-audit <env> again after the SQS queue drains to 0/0. Compare against the pre-backfill baseline:
| Metric | What to check |
|---|---|
| User count | Should match number of unique user IDs in the backfill file |
| Product count | Should increase (new products discovered from purchase history) |
| Products with category_id | Should increase (FPS returns categoryId for ~47% of products) |
| Products with category_hierarchy | Should match category_id count (category-service resolves all valid IDs) |
| IN_CATEGORY count | Should increase proportionally to new products |
| PURCHASED count | Should increase (new purchase relationships) |
| Future timestamps | Should remain 0 |
| Health score | Should remain HEALTHY (no new CRITICAL issues) |
If category_hierarchy count is significantly lower than category_id count, investigate category-service connectivity.
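The comparison in the table can be expressed as a small checklist function. The Python sketch below is an illustration under stated assumptions: the metric key names are hypothetical (the audit's actual output format is not specified here), and "significantly lower" is modelled as a hierarchy-to-ID ratio below 0.95, a threshold chosen only for the example.

```python
def audit_findings(
    before: dict[str, int],
    after: dict[str, int],
    min_hierarchy_ratio: float = 0.95,
) -> list[str]:
    """Return human-readable findings; an empty list means nothing to review."""
    findings: list[str] = []
    # Metrics the table expects to grow after a backfill of new users.
    for metric in (
        "product_count",
        "products_with_category_id",
        "in_category_count",
        "purchased_count",
    ):
        if after[metric] <= before[metric]:
            findings.append(f"{metric} did not increase ({before[metric]} -> {after[metric]})")
    with_id = after["products_with_category_id"]
    with_hierarchy = after["products_with_category_hierarchy"]
    if with_id and with_hierarchy / with_id < min_hierarchy_ratio:
        findings.append("category_hierarchy lags category_id: check category-service connectivity")
    if after["future_timestamps"] != 0:
        findings.append("future timestamps present")
    return findings
```

A re-run over users who are already loaded may legitimately show no increase, so treat these findings as prompts for review rather than hard failures.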
Use Case: Backfill AI Assistant Beta Users (~50K from Feature Flipper)
The ai_assistant_enabled Feature Flipper flag contains ~50K users across multiple segments (internal employees, beta testers, active users via S3). To backfill these users into the consumer graph and enable Kafka ingestion:
1. Retrieve user IDs from Feature Flipper
```bash
cd ~/src/consumer-graph-worker

# Fetch all user IDs from ai_assistant_enabled segments + add extra IDs
python3 scripts/fetch-beta-users.py \
  --flag ai_assistant_enabled \
  --extra-ids <comma-separated-extra-ids> \
  -o ~/src/data/ai-assistant-beta-users.csv
```

The script pulls user IDs from all segments (inline allowedIDs and S3-backed allowedS3IDs in s3://prod-feature-flipper/). Use --env stage for staging.
Expected output: ~50K user IDs written to ~/src/data/ai-assistant-beta-users.csv.
2. Verify Kafka consumer lag is zero
Follow Step 1. All datapoints must be 0 or near-zero before proceeding.
3. Run pre-backfill graph audit
Follow Step 3. Record baseline metrics for comparison.
4. Upload to consumer_graph_ingestion flag via S3
Follow Step 2 using Option B (S3-backed segment):

```bash
aws s3 cp ~/src/data/ai-assistant-beta-users.csv \
  s3://prod-feature-flipper/ai-assistant-beta-users.csv \
  --profile prod-services-admin
```

Then in the Feature Flipper UI, add a segment to the consumer_graph_ingestion flag:
- Add a new segment (e.g., “AI Assistant Beta Users”)
- Set allowedS3IDs to ai-assistant-beta-users.csv
- Set rollout to 100

The consumer_graph_ingestion flag currently has two segments:
- Internal Users (internalUsers=true): internal employees
- Team Users: 1 inline ID
The new S3-backed segment adds the ~50K AI assistant beta users.
5. Run the backfill
Follow Step 4 with the user file:

```bash
# Dry run
./bin/backfill \
  --user-file ~/src/data/ai-assistant-beta-users.csv \
  --queue-url "https://sqs.us-east-1.amazonaws.com/705747465351/prod-consumer-graph-queue" \
  --rate-limit 10 \
  --lookback-days 365 \
  --dry-run

# Actual run
AWS_PROFILE=prod-services-admin ./bin/backfill \
  --user-file ~/src/data/ai-assistant-beta-users.csv \
  --queue-url "https://sqs.us-east-1.amazonaws.com/705747465351/prod-consumer-graph-queue" \
  --rate-limit 10 \
  --lookback-days 365
```

At 10 req/s, ~50K users take ~1.5 hours.
6. Monitor and post-backfill audit
Follow Step 5. Watch SQS queue depth, Loki logs, and spot-check Neo4j.
Once the queue drains to 0/0, follow Step 6 to compare against the baseline.
Re-running a backfill
Backfills are safe to re-run at any time as long as Kafka lag is zero:
- PURCHASED: Receipt-ID dedup guard prevents duplicate edges (receipts use receipt_id, shop orders use buttonOrderId/summaryID)
- SHOPS_IN / SHOPS_AT: Receipt backfill sets FullLoad=true (overwrites aggregates). Shop order backfill uses FullLoad=false (incremental: adds to receipt-based aggregates)
- User node: MERGE with ON MATCH SET updates last_updated_at
- Product nodes: category_id and category_hierarchy are updated via ON MATCH SET with CASE guards, which only overwrite when the new value is non-empty, so re-runs and Kafka writes coexist safely
- Request IDs: Each run generates new backfill-<uuid> request IDs with independent receipt/shop idempotency keys, so the orchestrator won’t block re-runs
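One way to see why the SHOPS_IN / SHOPS_AT aggregates survive a re-run is to model the two FullLoad modes directly. The Python sketch below is a simplified illustration, assuming the receipt step runs before the shop order step and that each step supplies its full 365-day total; the function names are hypothetical and the real logic lives in the worker's Cypher writes.

```python
def apply_aggregate(current: int, incoming: int, full_load: bool) -> int:
    """FullLoad=true overwrites the stored count; FullLoad=false adds to it."""
    return incoming if full_load else current + incoming


def backfill_user(stored: int, receipt_total: int, shop_total: int) -> int:
    """Apply the receipt step (overwrite), then the shop order step (increment)."""
    stored = apply_aggregate(stored, receipt_total, full_load=True)
    return apply_aggregate(stored, shop_total, full_load=False)


if __name__ == "__main__":
    first = backfill_user(stored=0, receipt_total=7, shop_total=3)
    print(first)  # 10
    # Re-running starts from the existing value but lands on the same total.
    print(backfill_user(stored=first, receipt_total=7, shop_total=3))  # 10
    # A value inflated by replayed Kafka messages is corrected the same way.
    print(backfill_user(stored=25, receipt_total=7, shop_total=3))  # 10
```

Because the overwrite happens first, whatever value was stored before the run does not affect the result.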
Category Enrichment in Backfill
As of the category enrichment update, the backfill path now populates category_id and category_hierarchy on Product nodes. The enrichment chain is:
- Purchase History API returns purchase data with FIDO UUIDs
- FIDO Product Service (FPS) returns product metadata including categoryId (a UUID)
- category-service resolves the categoryId into a hierarchy array (list of UUIDs from root to leaf)
- Neo4j write persists category_id and category_hierarchy on the Product node
This only runs when the orchestrator has a categoryClient configured — which is the case in the unified worker (both SQS consumer component and main). The standalone cmd/worker passes nil for the category client (no category enrichment).
Kafka path is not affected. The Kafka consumer has its own category enrichment pipeline (PR #134). The Cypher ON MATCH SET with CASE guards ensures safe coexistence:
- If backfill writes first, Kafka overwrites with its own data (always non-empty from real-time events)
- If Kafka writes first, backfill only overwrites if its data is non-empty; an empty backfill value leaves the Kafka-written value in place
- Both paths produce the same result for the same product
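The guard behaviour described in the bullets above can be modelled in a few lines. This Python sketch mirrors only the stated rule (overwrite when the incoming value is non-empty, otherwise keep what is stored); it is not the Cypher used by the worker, and guarded_set is a hypothetical name.

```python
from typing import Sequence, TypeVar

T = TypeVar("T", str, Sequence[str])


def guarded_set(existing: T, incoming: T) -> T:
    """Return `incoming` when it is non-empty, otherwise keep `existing`."""
    return incoming if incoming else existing


if __name__ == "__main__":
    # Placeholder value, not a real category UUID.
    category = "cat-uuid-123"

    # Backfill first, then Kafka: both carry the same non-empty value.
    print(guarded_set(guarded_set("", category), category))  # cat-uuid-123

    # Kafka first, then a backfill with no category data: value is preserved.
    print(guarded_set(guarded_set("", category), ""))  # cat-uuid-123
```

The same function applies to category_hierarchy, where an empty list plays the role of the empty string.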
Deployment required. The worker must be deployed with the category enrichment code before running backfill. Without it, Product nodes will have category_id and category_hierarchy only from the Kafka path (real-time events after flag enablement).
Troubleshooting
| Symptom | Cause | Fix |
|---|---|---|
| Backfill CLI exits with resume file written | SQS send failures | Use --resume-from <file> to continue from where it stopped |
| Users in Neo4j but no new Kafka data flowing | User not in feature flag segment | Add user IDs to consumer_graph_ingestion flag |
| Duplicate SHOPS_IN/SHOPS_AT counts | Kafka replayed retained messages (lag was not zero) | Re-run backfill — FullLoad=true will overwrite with correct totals |
| loaded_users cache blocking re-backfill | 7-day TTL on loaded_users:{user_id} | Wait for TTL expiry, or delete the key manually in Valkey |
| Purchase History API 429 rate limits | Too many concurrent requests | Reduce --rate-limit (default 10 is safe) |
Quick reference
```text
1. Verify Kafka lag = 0                   ← consumers must be caught up
2. Upload users to FF S3 + add segment    ← enables Kafka gate for future events
3. Run pre-backfill graph audit           ← baseline metrics
4. Run backfill (dry-run first)           ← loads 365-day history + category enrichment into Neo4j
5. Monitor SQS + Loki + Neo4j             ← confirm completion
6. Run post-backfill graph audit          ← compare against baseline, verify category data
```

See also:
- Backfill Strategy — design details, dedup strategies, Airflow path
- Experiment Enrollment API — mobile enrollment flow (auto-backfill + Valkey-based Kafka gate)
- Capacity Experiment Plan — infrastructure sizing
- scripts/fetch-beta-users.py — script to retrieve user IDs from Feature Flipper