Repurchase Nudge — Direct Message Pipeline
What It Is
The repurchase nudge sends personalized push notifications reminding users to rebuy products they purchase regularly, timed to arrive just before they’re likely to run out. Each notification links to an AI assistant episode in the Fetch app with a product card showing the item, price, and points incentive.
How It Works
Daily at 3 AM UTC
→ Neo4j: find users with repeat purchases (≥2x, 90+ day history)
→ Filter: category hierarchy (block non-products), due today, already notified
→ Enrich: CCS batch call for points/offers/merchant data
→ Eligibility: points ≥50, not purchased too recently, not snoozed/opted-out
→ Select: top 1 candidate per user by expected value
→ Feature flag gate: consumer_dm_enabled_backend (per-user)
→ Create episode (consumer-agent) + send push (notification-service)

Pipeline Stages
1. Candidate Discovery
Where: internal/notification/repurchase/neo4j.go
A Cypher query finds users with repeat purchase patterns:
MATCH (u:User)-[r:PURCHASED]->(p:Product)
WHERE r.times >= 2
  AND p.category_hierarchy IS NOT NULL
  AND size(p.category_hierarchy) > 0
  AND NOT p.category_hierarchy[0] IN $blockedTier1CategoryIDs
  AND r.last >= datetime() - duration({days: 90})
  AND r.avg_interval_days > 0

Results are paginated in batches of 1000 for memory efficiency.
Category filtering: Products must have a resolved Fetch taxonomy hierarchy. The Tier 1 (root) UUID is checked against a blocklist:
| Blocked Tier 1 | UUID | Why |
|---|---|---|
| Alcohol | 6da2ba91-... | Age-gated |
| Fees | 33df9308-... | Taxes, bag fees, bottle deposits, shipping |
| Savings & Coupons | e67f53ec-... | Coupon line items |
| Membership and Perks | 98c2d3b0-... | Loyalty fees |
Products with no category_hierarchy (not yet enriched with taxonomy data) are also excluded.
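The category rule above can be restated in Go for illustration. This is a minimal sketch, not the code in neo4j.go: the real filter runs inside the Cypher query, and the function name, the map shape, and the placeholder IDs (the table only shows truncated UUIDs) are all assumptions.

```go
package main

import "fmt"

// blockedTier1 holds Tier 1 category IDs to exclude. These IDs are
// hypothetical placeholders, not the truncated UUIDs from the table.
var blockedTier1 = map[string]bool{
	"tier1-alcohol": true,
	"tier1-fees":    true,
}

// isNudgeable reports whether a product passes the category filter: the
// hierarchy must be non-empty and its root (index 0) must not be blocked.
func isNudgeable(hierarchy []string, blocked map[string]bool) bool {
	if len(hierarchy) == 0 {
		return false // not yet enriched with taxonomy data
	}
	return !blocked[hierarchy[0]]
}

func main() {
	fmt.Println(isNudgeable([]string{"tier1-grocery", "tier2-snacks"}, blockedTier1))
	fmt.Println(isNudgeable([]string{"tier1-alcohol", "tier2-beer"}, blockedTier1))
	fmt.Println(isNudgeable(nil, blockedTier1))
}
```

Only the root of the hierarchy is inspected, which matches the `category_hierarchy[0]` check in the query.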
2. Early Filters (Before Enrichment)
These run before any external HTTP calls to minimize cost:
| Filter | What it checks | On failure |
|---|---|---|
| Already processed | User got a notification today (Valkey) | Skip |
| Due today | Predicted send time falls within today | Skip (unless force=true) |
3. CCS Batch Enrichment
Where: internal/scheduler/orchestrator.go
A single batch HTTP call to Consumer Context Service enriches all remaining candidates with:
- Offer points (active offers for this user + product)
- PPD points (Button merchant pay-per-discovery)
- Whether the product is on Fetch Shop
- Merchant name, price, image URL
4. Eligibility Checks
Where: internal/scheduler/eligibility.go
Run concurrently (50 workers) after enrichment:
| Check | Threshold | Default |
|---|---|---|
| Min purchases | r.times >= 2 | 2 |
| Min history | First purchase ≥90 days ago | 90 days |
| Not too recent | Days since last buy ≥ 80% of avg interval | 0.8 |
| Min points | Total points (offer + PPD) ≥ 50 | 50 |
| Snooze/Opt-out | User hasn’t snoozed or opted out | Valkey state |
| Platform | User has an iOS device | ["ios"] |
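
The checks in the table can be sketched as a single function that returns the first failing check. This is an illustrative sketch only: the struct fields, reason strings, and check order are assumptions and are not taken from eligibility.go.

```go
package main

import "fmt"

// Candidate holds the per-product inputs to the checks.
// All field names are hypothetical.
type Candidate struct {
	PurchaseCount   int
	DaysSinceFirst  float64
	DaysSinceLast   float64
	AvgIntervalDays float64
	OfferPoints     int
	PPDPoints       int
	Snoozed         bool
	OptedOut        bool
	Platforms       []string
}

// Thresholds mirrors the Default column of the table.
type Thresholds struct {
	MinPurchases     int
	MinHistoryDays   float64
	SkipRecentRatio  float64
	MinPoints        int
	AllowedPlatforms map[string]bool
}

func defaultThresholds() Thresholds {
	return Thresholds{
		MinPurchases:     2,
		MinHistoryDays:   90,
		SkipRecentRatio:  0.8,
		MinPoints:        50,
		AllowedPlatforms: map[string]bool{"ios": true},
	}
}

// ineligibleReason returns "" for an eligible candidate, otherwise the
// name of the first failed check.
func ineligibleReason(c Candidate, t Thresholds) string {
	switch {
	case c.PurchaseCount < t.MinPurchases:
		return "min_purchases"
	case c.DaysSinceFirst < t.MinHistoryDays:
		return "min_history"
	case c.DaysSinceLast < t.SkipRecentRatio*c.AvgIntervalDays:
		return "too_recent"
	case c.OfferPoints+c.PPDPoints < t.MinPoints:
		return "min_points"
	case c.Snoozed || c.OptedOut:
		return "snoozed_or_opted_out"
	}
	// The user needs at least one device on an allowed platform.
	for _, p := range c.Platforms {
		if t.AllowedPlatforms[p] {
			return ""
		}
	}
	return "platform"
}

func main() {
	c := Candidate{
		PurchaseCount: 4, DaysSinceFirst: 120, DaysSinceLast: 27,
		AvgIntervalDays: 30, OfferPoints: 40, PPDPoints: 20,
		Platforms: []string{"ios"},
	}
	fmt.Printf("reason=%q\n", ineligibleReason(c, defaultThresholds()))
}
```

Returning a reason string rather than a bool makes it easy to count skips per check, which is useful when tuning a threshold such as the minimum points.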
5. Scheduling Calculations
Where: internal/scheduler/calculator.go, internal/notification/repurchase/send_time_analyzer.go
Day-level: predicted_next_purchase = last_purchase + avg_interval_days
Time-of-day: Analyzes purchase timestamps with exponential recency weighting (decay factor: 0.5). Confidence levels:
- High (≥3 purchases, stdDev ≤ 3h): Use predicted hour
- Medium (≥2, stdDev ≤ 5h): Use predicted hour
- Low: Fall back to 9 AM
Send time = predicted purchase time − 24h lead, clamped to delivery window (prod: 8–10 AM local).
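The day-level prediction and the window clamp can be sketched as follows. This is a minimal sketch under stated assumptions: the function names are hypothetical, the clamp is assumed to snap to the nearest window edge on the same calendar day, and the caller is assumed to have already converted the timestamp to the user's local zone. Time-of-day prediction with recency weighting is not shown.

```go
package main

import (
	"fmt"
	"time"
)

// utc is a small helper that keeps the examples short.
func utc(y int, m time.Month, d, hh, mm int) time.Time {
	return time.Date(y, m, d, hh, mm, 0, 0, time.UTC)
}

// predictNextPurchase implements last_purchase + avg_interval_days,
// treating a day as exactly 24 hours.
func predictNextPurchase(last time.Time, avgIntervalDays float64) time.Time {
	return last.Add(time.Duration(avgIntervalDays * 24 * float64(time.Hour)))
}

// clampToWindow snaps t to [startHour, endHour] on the same calendar day,
// in t's own location.
func clampToWindow(t time.Time, startHour, endHour int) time.Time {
	y, m, d := t.Date()
	start := time.Date(y, m, d, startHour, 0, 0, 0, t.Location())
	end := time.Date(y, m, d, endHour, 0, 0, 0, t.Location())
	if t.Before(start) {
		return start
	}
	if t.After(end) {
		return end
	}
	return t
}

// sendTime subtracts the lead from the predicted purchase time, then clamps
// the result to the delivery window.
func sendTime(last time.Time, avgIntervalDays float64, leadHours, startHour, endHour int) time.Time {
	predicted := predictNextPurchase(last, avgIntervalDays)
	lead := time.Duration(leadHours) * time.Hour
	return clampToWindow(predicted.Add(-lead), startHour, endHour)
}

func main() {
	last := utc(2024, time.March, 1, 18, 30)
	// 30-day interval, 24h lead, production window of 8 to 10 AM.
	fmt.Println(sendTime(last, 30, 24, 8, 10))
}
```

With the production window of 8 to 10 AM, an evening shopper's predicted hour always clamps to the end of the window, so the predicted hour only changes the send time for users who tend to buy in the morning.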
6. Selection & Ranking
Where: internal/scheduler/selector.go
When a user has multiple eligible products, pick the best one:
- Expected Value (descending): repurchase_likelihood × (total_points − 25)
- Tie-break: repurchase likelihood → expected points → purchase count
- Max 1 notification per user per day
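The ranking rule above can be sketched as a sort with a multi-key comparator. The type and field names are hypothetical, and "expected points" in the tie-break is assumed to mean the candidate's total points; selector.go may define it differently.

```go
package main

import (
	"fmt"
	"sort"
)

// Eligible is one product that passed eligibility for a user.
// Field names are illustrative.
type Eligible struct {
	ProductID            string
	RepurchaseLikelihood float64
	TotalPoints          float64
	PurchaseCount        int
}

// pointsBaseline is the 25-point offset from the expected value formula.
const pointsBaseline = 25

func expectedValue(e Eligible) float64 {
	return e.RepurchaseLikelihood * (e.TotalPoints - pointsBaseline)
}

// selectBest returns the top candidate by expected value, breaking ties by
// likelihood, then points, then purchase count. ok is false for no input.
func selectBest(cands []Eligible) (best Eligible, ok bool) {
	if len(cands) == 0 {
		return Eligible{}, false
	}
	sorted := append([]Eligible(nil), cands...) // leave the input untouched
	sort.SliceStable(sorted, func(i, j int) bool {
		a, b := sorted[i], sorted[j]
		if ea, eb := expectedValue(a), expectedValue(b); ea != eb {
			return ea > eb
		}
		if a.RepurchaseLikelihood != b.RepurchaseLikelihood {
			return a.RepurchaseLikelihood > b.RepurchaseLikelihood
		}
		if a.TotalPoints != b.TotalPoints {
			return a.TotalPoints > b.TotalPoints
		}
		return a.PurchaseCount > b.PurchaseCount
	})
	return sorted[0], true
}

func main() {
	best, _ := selectBest([]Eligible{
		{ProductID: "coffee", RepurchaseLikelihood: 0.5, TotalPoints: 125, PurchaseCount: 3},
		{ProductID: "soap", RepurchaseLikelihood: 0.8, TotalPoints: 75, PurchaseCount: 9},
	})
	fmt.Println(best.ProductID)
}
```

Returning a single candidate is what enforces the limit of one notification per user per day in this sketch.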
7. Feature Flag Gate
Flag: consumer_dm_enabled_backend (Feature Flipper, per-user rollout)
Checked right before dispatch. Gated users are logged as “would-have-sent” but are not marked as processed, so they are retried on a later run once the flag is enabled for them.
8. Notification Dispatch
Where: internal/notification/repurchase/adapter.go
Two API calls per notification:
Step 1 — Create episode (consumer-agent):
POST /v1/users/{userID}/episodes

Contains a templated message + a product card component with the FIDO ID. The product card includes inline title, price, and image from CCS enrichment. Rover-agent re-enriches product cards at read time when the user opens the episode.
Step 2 — Send notification (notification-service):
POST /v1/gateway/notify

Includes the scheduled send time, a deeplink to the episode (fetchrewards://ai_assistant?subAction=episode&subActionValue={episodeID}), priority score (sigmoid-normalized expected value), and metadata for analytics.
Dedup: Request ID is a UUID v5 hash of repurchase_nudge:{userID}:{productID}:{date}, so the same user+product on the same day always produces the same ID.
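The deterministic request ID can be illustrated with a hand-rolled UUID v5 (SHA-1, name-based) using only the standard library. This is a sketch, not the code in adapter.go: the namespace UUID used by the pipeline is not documented here, so the RFC 4122 DNS namespace stands in as an assumption, and the function names are hypothetical.

```go
package main

import (
	"crypto/sha1"
	"fmt"
)

// namespace is an assumed namespace (the RFC 4122 DNS namespace,
// 6ba7b810-9dad-11d1-80b4-00c04fd430c8). The real one may differ.
var namespace = [16]byte{
	0x6b, 0xa7, 0xb8, 0x10, 0x9d, 0xad, 0x11, 0xd1,
	0x80, 0xb4, 0x00, 0xc0, 0x4f, 0xd4, 0x30, 0xc8,
}

// uuidV5 hashes namespace+name with SHA-1 and sets the version and variant
// bits as described in RFC 4122.
func uuidV5(ns [16]byte, name string) string {
	h := sha1.New()
	h.Write(ns[:])
	h.Write([]byte(name))
	sum := h.Sum(nil)

	var u [16]byte
	copy(u[:], sum[:16])
	u[6] = (u[6] & 0x0f) | 0x50 // version 5
	u[8] = (u[8] & 0x3f) | 0x80 // RFC 4122 variant
	return fmt.Sprintf("%x-%x-%x-%x-%x", u[0:4], u[4:6], u[6:8], u[8:10], u[10:16])
}

// requestID builds the dedup key repurchase_nudge:{userID}:{productID}:{date}
// and hashes it, so the same inputs always map to the same ID.
func requestID(userID, productID, date string) string {
	name := fmt.Sprintf("repurchase_nudge:%s:%s:%s", userID, productID, date)
	return uuidV5(namespace, name)
}

func main() {
	fmt.Println(requestID("user-1", "fido-1", "2024-03-30"))
	fmt.Println(requestID("user-1", "fido-1", "2024-03-30")) // same ID
	fmt.Println(requestID("user-1", "fido-1", "2024-03-31")) // different ID
}
```

Because the date is part of the hashed name, a resend on the same day is a duplicate from the receiver's point of view, while the next day's nudge gets a fresh ID.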
Message Templates
Three variants based on available incentives:
| Variant | Condition | Title | Push Body |
|---|---|---|---|
| ShopOnly | On Fetch Shop, no offer | "Time to restock {Product}?" | "Low on {Product}? Get {Points} when you buy it at {Merchant} on Fetch Shop" |
| DoubleDip | On Fetch Shop + offer | "Double dip on points when you rebuy {Product}" | "Double-dip and get {Points} by repurchasing {Product} at {Merchant}…" |
| OfferOnly | Offer only, not on shop | "Time to restock {Product}!" | "Earn {Points} on your next {Brand} {Product} purchase" |
Points calculation:
- ShopOnly: PPD points only
- DoubleDip: PPD + offer points
- OfferOnly: Offer points only
The episode body uses a {{total_points}} placeholder that rover-agent replaces at delivery time after re-enrichment.
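The variant conditions and the points shown for each can be sketched as two small functions. The names are hypothetical, and treating "has an offer" as offer points greater than zero is an assumption; templates.go may decide this differently.

```go
package main

import "fmt"

// Variant names follow the table above.
type Variant string

const (
	ShopOnly  Variant = "ShopOnly"
	DoubleDip Variant = "DoubleDip"
	OfferOnly Variant = "OfferOnly"
)

// chooseVariant maps the available incentives to a template variant.
// ok is false when the product is neither on Fetch Shop nor has an offer.
func chooseVariant(onFetchShop bool, offerPoints int) (v Variant, ok bool) {
	hasOffer := offerPoints > 0 // assumption: an offer means positive offer points
	switch {
	case onFetchShop && hasOffer:
		return DoubleDip, true
	case onFetchShop:
		return ShopOnly, true
	case hasOffer:
		return OfferOnly, true
	}
	return "", false
}

// displayPoints returns the {Points} value for a variant.
func displayPoints(v Variant, ppdPoints, offerPoints int) int {
	switch v {
	case ShopOnly:
		return ppdPoints
	case DoubleDip:
		return ppdPoints + offerPoints
	case OfferOnly:
		return offerPoints
	}
	return 0
}

func main() {
	v, ok := chooseVariant(true, 100)
	fmt.Println(v, ok, displayPoints(v, 60, 100))
}
```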
Data Flow: How Products Enter the Graph
Products reach Neo4j through two Kafka ingestion paths:
| Source | Topic | Cluster | Format |
|---|---|---|---|
| Factoid (Fetch Shop orders) | factoid-stream | {env}-factoids | Protobuf (Buf CSR) |
| Receipt (scanned receipts) | pipeline-v1-processed-receipt-events | {env}-receipt-infra | Protobuf (plain) |
During ingestion, each FIDO is:
- Enriched via FPS (name, brand, category, categoryId, fidoType)
- Filtered: skip non-PRODUCT fidoTypes (BRAND, GENERIC-CATEGORY, etc.)
- Resolved: categoryId → full taxonomy hierarchy via category-service
- Written to Neo4j as a Product node with category_id and category_hierarchy
Configuration
Production Defaults
| Setting | Value | Notes |
|---|---|---|
| Run time | 3 AM UTC | 10 PM EST / 11 PM EDT |
| Delivery window | 8–10 AM local | Morning engagement window |
| Min purchases | 2 | |
| Min history days | 90 | |
| Min expected points | 50 | Reduced from 100 (PLT-516, +24% daily sends) |
| Skip-recent threshold | 0.8× interval | |
| Max notifications/day | 1 per user | |
| Notification lead | 24 hours | |
| Eligibility workers | 50 | Concurrent CCS+state checks |
| Allowed platforms | iOS only | |
| dry_run | false | Real sends enabled |
| test_mode | false | |
Stage Overrides
| Setting | Value |
|---|---|
| Delivery window | 0–24 (all day) |
| dry_run | false |
Error Handling
| Scenario | Marked Processed? | Retried Next Run? |
|---|---|---|
| CCS enrichment fails | No | Yes |
| Episode creation fails | No | Yes |
| Notification send fails | Yes | No (prevents retry storms) |
| Feature flag gated | No | Yes (when flag enables) |

Episode creation: 3 retries with exponential backoff (500ms → 1s → 2s).
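
The episode-creation retry policy can be sketched as a generic helper. This is an illustrative sketch, not the adapter's code: it assumes "3 retries" means one initial attempt followed by up to three retries with delays of 500ms, 1s, and 2s, and the function names are hypothetical. The sleep function is injected so the demo records delays instead of waiting.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// retryWithBackoff runs op once, then retries up to `retries` times while
// it keeps failing, doubling the delay before each subsequent retry.
func retryWithBackoff(retries int, initial time.Duration, sleep func(time.Duration), op func() error) error {
	err := op()
	delay := initial
	for i := 0; i < retries && err != nil; i++ {
		sleep(delay)
		delay *= 2
		err = op()
	}
	return err
}

// simulate runs the loop against an operation that fails its first
// `failures` calls, recording the delays instead of sleeping.
func simulate(failures int) (calls int, delays []time.Duration, err error) {
	op := func() error {
		calls++
		if calls <= failures {
			return errors.New("episode creation failed")
		}
		return nil
	}
	record := func(d time.Duration) { delays = append(delays, d) }
	err = retryWithBackoff(3, 500*time.Millisecond, record, op)
	return calls, delays, err
}

func main() {
	calls, delays, err := simulate(2)
	fmt.Println(calls, delays, err)
}
```

In real use the caller would pass `time.Sleep` as the sleep function. If all retries fail, the error is returned and, per the table above, the candidate is not marked processed and is picked up again on the next run.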
Triggering a Run
- Automatic: Daily at 3 AM UTC
- Manual: POST /scheduler/trigger?force=true (bypasses the due-today filter)
- Distributed lock: Valkey lock (10 min TTL) prevents concurrent runs
External Dependencies
| Service | Endpoint | Purpose |
|---|---|---|
| Neo4j | Cypher queries | Purchase graph — candidates, history |
| Category Service | GET /category/:id | Taxonomy hierarchy resolution |
| Profile Service | GET /v1/users/{id}/profile | User timezone |
| Consumer Context Service | GET /v1/products/{fido}/enrich | Offers, PPD, merchant, price, image |
| Consumer Agent | POST /v1/users/{id}/episodes | Create AI assistant episode |
| Notification Service | POST /v1/gateway/notify | Schedule push notification |
| Feature Flipper | SDK | Per-user flag gating (consumer_dm_enabled_backend) |
| Valkey | Keys | Dedup, snooze, opt-out, processed state, distributed lock |
| FPS (FIDO Product Service) | Batch API | Product enrichment during Kafka ingestion |
Key Files
| File | Role |
|---|---|
| internal/scheduler/orchestrator.go | Main run loop: filters, enrichment, dispatch |
| internal/notification/repurchase/neo4j.go | Neo4j candidate queries + category filtering |
| internal/notification/repurchase/handler.go | Enrichment + filtering orchestration |
| internal/notification/repurchase/adapter.go | Episode creation + notification dispatch |
| internal/notification/repurchase/send_time_analyzer.go | Time-of-day prediction |
| internal/scheduler/calculator.go | Day-level scheduling math |
| internal/scheduler/eligibility.go | Points/history eligibility checks |
| internal/scheduler/selector.go | Per-user candidate ranking |
| internal/scheduler/state.go | Valkey state (processed/snooze/opt-out) |
| internal/api/consumeragent/templates.go | Message template variants |
| internal/api/notificationservice/templates.go | Notification request builder |
| internal/api/category/client.go | Category-service client for taxonomy |
| internal/kafka/receipt_transformer.go | Receipt ingestion → graph data |
| internal/kafka/transformer.go | Factoid ingestion → graph data |
| internal/graph/writer.go | Neo4j persistence (MERGE queries) |
| cmd/unified-worker/scheduler_component.go | Scheduler lifecycle + cron |
| config/unified-worker-prod.yaml | Production config |