Repurchase Nudge — Direct Message Pipeline

The repurchase nudge sends personalized push notifications reminding users to rebuy products they purchase regularly, timed to arrive just before they’re likely to run out. Each notification links to an AI assistant episode in the Fetch app with a product card showing the item, price, and points incentive.

Daily at 3 AM UTC
→ Neo4j: find users with repeat purchases (≥2x, 90+ day history)
→ Filter: category hierarchy (block non-products), due today, already notified
→ Enrich: CCS batch call for points/offers/merchant data
→ Eligibility: points ≥50, not purchased too recently, not snoozed/opted-out
→ Select: top 1 candidate per user by expected value
→ Feature flag gate: consumer_dm_enabled_backend (per-user)
→ Create episode (consumer-agent) + send push (notification-service)

Where: internal/notification/repurchase/neo4j.go

A Cypher query finds users with repeat purchase patterns:

MATCH (u:User)-[r:PURCHASED]->(p:Product)
WHERE r.times >= 2
AND p.category_hierarchy IS NOT NULL
AND size(p.category_hierarchy) > 0
AND NOT p.category_hierarchy[0] IN $blockedTier1CategoryIDs
AND r.last >= datetime() - duration({days: 90})
AND r.avg_interval_days > 0

Results are paginated in batches of 1000 for memory efficiency.

Category filtering: Products must have a resolved Fetch taxonomy hierarchy. The Tier 1 (root) UUID is checked against a blocklist:

| Blocked Tier 1 | UUID | Why |
| --- | --- | --- |
| Alcohol | 6da2ba91-... | Age-gated |
| Fees | 33df9308-... | Taxes, bag fees, bottle deposits, shipping |
| Savings & Coupons | e67f53ec-... | Coupon line items |
| Membership and Perks | 98c2d3b0-... | Loyalty fees |

Products with no category_hierarchy (not yet enriched with taxonomy data) are also excluded.
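
The category rule above can also be expressed in Go, which may be handy for unit-testing the blocklist outside Neo4j. This is a minimal sketch, not the code in neo4j.go: the function name is hypothetical, and the blocklist IDs are placeholders because the real UUIDs are truncated in the table.

```go
package main

import "fmt"

// blockedTier1 holds Tier 1 category IDs to exclude. These IDs are
// placeholders for illustration, not the real Fetch taxonomy UUIDs.
var blockedTier1 = map[string]struct{}{
	"tier1-alcohol": {},
	"tier1-fees":    {},
}

// isEligibleCategory mirrors the Cypher predicate: the hierarchy must be
// non-empty and its first (root) element must not be on the blocklist.
func isEligibleCategory(hierarchy []string, blocked map[string]struct{}) bool {
	if len(hierarchy) == 0 {
		return false // product not yet enriched with taxonomy data
	}
	_, isBlocked := blocked[hierarchy[0]]
	return !isBlocked
}

func main() {
	fmt.Println(isEligibleCategory([]string{"tier1-grocery", "tier2-snacks"}, blockedTier1))
	fmt.Println(isEligibleCategory([]string{"tier1-alcohol", "tier2-beer"}, blockedTier1))
	fmt.Println(isEligibleCategory(nil, blockedTier1))
}
```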

Two cheap filters run before any external HTTP calls to minimize cost:

| Filter | What it checks | On failure |
| --- | --- | --- |
| Already processed | User got a notification today (Valkey) | Skip |
| Due today | Predicted send time falls within today | Skip (unless force=true) |

Where: internal/scheduler/orchestrator.go

A single batch HTTP call to Consumer Context Service enriches all remaining candidates with:

  • Offer points (active offers for this user + product)
  • PPD points (Button merchant pay-per-discovery)
  • Whether the product is on Fetch Shop
  • Merchant name, price, image URL

Where: internal/scheduler/eligibility.go

Eligibility checks run concurrently (50 workers) after enrichment:

| Check | Threshold | Default |
| --- | --- | --- |
| Min purchases | r.times >= 2 | 2 |
| Min history | First purchase ≥90 days ago | 90 days |
| Not too recent | Days since last buy ≥ 80% of avg interval | 0.8 |
| Min points | Total points (offer + PPD) ≥ 50 | 50 |
| Snooze/Opt-out | User hasn’t snoozed or opted out | Valkey state |
| Platform | User has an iOS device | ["ios"] |
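
The checks in the table can be read as one short-circuiting predicate. The sketch below assumes the inputs have already been reduced to plain numbers; the struct, field names, and reason strings are hypothetical and are not taken from eligibility.go. The order of the checks is also an assumption.

```go
package main

import "fmt"

// Candidate holds the inputs the checks need (hypothetical shape).
type Candidate struct {
	Times             int     // purchase count (r.times)
	DaysSinceFirstBuy float64 // length of purchase history
	DaysSinceLastBuy  float64
	AvgIntervalDays   float64
	OfferPoints       int
	PPDPoints         int
	SnoozedOrOptedOut bool // read from Valkey state in the real pipeline
	Platforms         []string
}

// Thresholds carries the defaults listed in the table.
type Thresholds struct {
	MinPurchases     int
	MinHistoryDays   float64
	SkipRecentFactor float64
	MinPoints        int
	AllowedPlatforms []string
}

var defaults = Thresholds{
	MinPurchases:     2,
	MinHistoryDays:   90,
	SkipRecentFactor: 0.8,
	MinPoints:        50,
	AllowedPlatforms: []string{"ios"},
}

// hasAllowedPlatform reports whether the user has at least one allowed platform.
func hasAllowedPlatform(have, allowed []string) bool {
	for _, h := range have {
		for _, a := range allowed {
			if h == a {
				return true
			}
		}
	}
	return false
}

// checkEligibility returns false plus the first failing check, or true.
func checkEligibility(c Candidate, t Thresholds) (bool, string) {
	switch {
	case c.Times < t.MinPurchases:
		return false, "min_purchases"
	case c.DaysSinceFirstBuy < t.MinHistoryDays:
		return false, "min_history"
	case c.DaysSinceLastBuy < t.SkipRecentFactor*c.AvgIntervalDays:
		return false, "too_recent"
	case c.OfferPoints+c.PPDPoints < t.MinPoints:
		return false, "min_points"
	case c.SnoozedOrOptedOut:
		return false, "snoozed_or_opted_out"
	case !hasAllowedPlatform(c.Platforms, t.AllowedPlatforms):
		return false, "platform"
	}
	return true, ""
}

func main() {
	c := Candidate{Times: 3, DaysSinceFirstBuy: 120, DaysSinceLastBuy: 30,
		AvgIntervalDays: 30, OfferPoints: 40, PPDPoints: 20, Platforms: []string{"ios"}}
	fmt.Println(checkEligibility(c, defaults))
}
```

Returning the failing check name makes it easy to count skip reasons per run.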

Where: internal/scheduler/calculator.go, internal/notification/repurchase/send_time_analyzer.go

Day-level: predicted_next_purchase = last_purchase + avg_interval_days

Time-of-day: Analyzes purchase timestamps with exponential recency weighting (decay factor: 0.5). Confidence levels:

  • High (≥3 purchases, stdDev ≤ 3h): Use predicted hour
  • Medium (≥2, stdDev ≤ 5h): Use predicted hour
  • Low: Fall back to 9 AM
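
One way to read the confidence rules is sketched below. The source does not spell out the weighting formula, so this assumes the k-th most recent purchase gets weight 0.5^k and that hours are treated linearly (no wraparound at midnight). The function name and these choices are illustrative, not the logic in send_time_analyzer.go.

```go
package main

import (
	"fmt"
	"math"
)

const (
	decay        = 0.5 // recency decay factor from the text
	fallbackHour = 9   // used when confidence is low
)

// predictHour takes purchase hours (0-23) ordered most recent first and
// returns the hour to use plus a confidence label.
func predictHour(hours []float64) (int, string) {
	n := len(hours)
	if n == 0 {
		return fallbackHour, "low"
	}
	// Weighted mean: the most recent purchase has weight 1, then 0.5, 0.25, ...
	var weightSum, mean float64
	w := 1.0
	for _, h := range hours {
		mean += w * h
		weightSum += w
		w *= decay
	}
	mean /= weightSum
	// Weighted standard deviation around that mean.
	var variance float64
	w = 1.0
	for _, h := range hours {
		variance += w * (h - mean) * (h - mean)
		w *= decay
	}
	stdDev := math.Sqrt(variance / weightSum)

	switch {
	case n >= 3 && stdDev <= 3:
		return int(math.Round(mean)), "high"
	case n >= 2 && stdDev <= 5:
		return int(math.Round(mean)), "medium"
	default:
		return fallbackHour, "low"
	}
}

func main() {
	fmt.Println(predictHour([]float64{18, 18, 18}))
	fmt.Println(predictHour([]float64{2, 22}))
}
```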

Send time = predicted purchase time − 24h lead, clamped to delivery window (prod: 8–10 AM local).
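
The day-level prediction, the 24h lead, and the window clamp combine as in the sketch below. It assumes a fixed-offset zone in place of the user's real timezone and clamps to the window on the send date; the function and helper names are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// local is a fixed-offset zone standing in for the user's timezone.
var local = time.FixedZone("UTC-5", -5*60*60)

// at builds a local timestamp on the hour; a small helper for the example.
func at(year int, month time.Month, day, hour int) time.Time {
	return time.Date(year, month, day, hour, 0, 0, 0, local)
}

// sendTime returns predicted purchase time minus a 24h lead, clamped to
// [windowStart, windowEnd] (hours, local time) on the resulting day.
func sendTime(lastPurchase time.Time, avgIntervalDays float64, predictedHour, windowStart, windowEnd int) time.Time {
	loc := lastPurchase.Location()

	// Day-level: predicted_next_purchase = last_purchase + avg_interval_days.
	interval := time.Duration(avgIntervalDays * 24 * float64(time.Hour))
	next := lastPurchase.Add(interval)

	// Time-of-day: swap in the predicted hour on the predicted day.
	predicted := time.Date(next.Year(), next.Month(), next.Day(), predictedHour, 0, 0, 0, loc)

	// Apply the 24h notification lead.
	send := predicted.Add(-24 * time.Hour)

	// Clamp to the delivery window on the send date.
	start := time.Date(send.Year(), send.Month(), send.Day(), windowStart, 0, 0, 0, loc)
	end := time.Date(send.Year(), send.Month(), send.Day(), windowEnd, 0, 0, 0, loc)
	if send.Before(start) {
		return start
	}
	if send.After(end) {
		return end
	}
	return send
}

func main() {
	// Last bought on March 1 at 14:00, every 30 days, predicted hour 18.
	t := sendTime(at(2025, time.March, 1, 14), 30, 18, 8, 10)
	fmt.Println(t.Format(time.RFC3339))
}
```

With a real IANA timezone, a 24h subtraction can shift the wall-clock hour across a daylight-saving change; the clamp hides that inside the window.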

Where: internal/scheduler/selector.go

When a user has multiple eligible products, pick the best one:

  1. Expected Value (descending): repurchase_likelihood × (total_points − 25)
  2. Tie-break: repurchase likelihood → expected points → purchase count
  3. Max 1 notification per user per day
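
The ranking can be sketched as a single sort with chained tie-breaks. The struct and function names are hypothetical, and "expected points" is interpreted here as total points, which is an assumption.

```go
package main

import (
	"fmt"
	"sort"
)

// Scored is one eligible product for a user (hypothetical shape).
type Scored struct {
	ProductID     string
	Likelihood    float64 // repurchase likelihood
	TotalPoints   float64
	PurchaseCount int
}

// pointsOffset is the constant subtracted in the expected-value formula.
const pointsOffset = 25

// expectedValue implements repurchase_likelihood × (total_points − 25).
func expectedValue(s Scored) float64 {
	return s.Likelihood * (s.TotalPoints - pointsOffset)
}

// selectBest returns the top candidate, or false if there are none.
func selectBest(cands []Scored) (Scored, bool) {
	if len(cands) == 0 {
		return Scored{}, false
	}
	sorted := append([]Scored(nil), cands...) // do not reorder the caller's slice
	sort.SliceStable(sorted, func(i, j int) bool {
		a, b := sorted[i], sorted[j]
		if ea, eb := expectedValue(a), expectedValue(b); ea != eb {
			return ea > eb
		}
		if a.Likelihood != b.Likelihood {
			return a.Likelihood > b.Likelihood
		}
		if a.TotalPoints != b.TotalPoints {
			return a.TotalPoints > b.TotalPoints
		}
		return a.PurchaseCount > b.PurchaseCount
	})
	return sorted[0], true
}

func main() {
	best, ok := selectBest([]Scored{
		{ProductID: "a", Likelihood: 0.5, TotalPoints: 105, PurchaseCount: 4},
		{ProductID: "b", Likelihood: 0.5, TotalPoints: 125, PurchaseCount: 2},
	})
	fmt.Println(best.ProductID, ok)
}
```

Returning only the first element enforces the one-notification-per-user-per-day cap at selection time.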

Flag: consumer_dm_enabled_backend (Feature Flipper, per-user rollout)

The flag is checked right before dispatch. Gated users are logged as “would-have-sent” but are NOT marked as processed, so they are picked up again on a later run once the flag is enabled for them.

Where: internal/notification/repurchase/adapter.go

Two API calls per notification:

Step 1 — Create episode (consumer-agent):

POST /v1/users/{userID}/episodes

Contains a templated message + a product card component with the FIDO ID. The product card includes inline title, price, and image from CCS enrichment. Rover-agent re-enriches product cards at read time when the user opens the episode.

Step 2 — Send notification (notification-service):

POST /v1/gateway/notify

Includes the scheduled send time, a deeplink to the episode (fetchrewards://ai_assistant?subAction=episode&subActionValue={episodeID}), priority score (sigmoid-normalized expected value), and metadata for analytics.
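
The deeplink and the priority score might be built as sketched below. The deeplink format comes from the text above; the sigmoid scale parameter is a hypothetical value, since the source only says the expected value is sigmoid-normalized.

```go
package main

import (
	"fmt"
	"math"
	"net/url"
)

// episodeDeeplink fills the episode ID into the deeplink format.
// Escaping the ID is a defensive assumption, not confirmed by the source.
func episodeDeeplink(episodeID string) string {
	return "fetchrewards://ai_assistant?subAction=episode&subActionValue=" + url.QueryEscape(episodeID)
}

// priorityScore squashes an expected value into (0, 1) with a logistic
// function. scale controls how fast the score saturates (hypothetical).
func priorityScore(expectedValue, scale float64) float64 {
	return 1 / (1 + math.Exp(-expectedValue/scale))
}

func main() {
	fmt.Println(episodeDeeplink("ep-123"))
	fmt.Printf("%.3f\n", priorityScore(40, 50))
}
```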

Dedup: Request ID is a UUID v5 hash of repurchase_nudge:{userID}:{productID}:{date}, so the same user+product on the same day always produces the same ID.
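
A deterministic request ID of this kind can be derived with the standard library alone. UUID v5 is the SHA-1 hash of a namespace UUID followed by the name, with the version and variant bits set. The namespace the pipeline actually uses is not stated, so the sketch borrows the RFC 4122 DNS namespace as a stand-in; the date format is also an assumption.

```go
package main

import (
	"crypto/sha1"
	"encoding/hex"
	"fmt"
)

// exampleNamespace is the RFC 4122 DNS namespace UUID, used here only as a
// stand-in; the real pipeline's namespace is not given in this document.
var exampleNamespace = [16]byte{
	0x6b, 0xa7, 0xb8, 0x10, 0x9d, 0xad, 0x11, 0xd1,
	0x80, 0xb4, 0x00, 0xc0, 0x4f, 0xd4, 0x30, 0xc8,
}

// uuidV5 computes a name-based (SHA-1) UUID as described in RFC 4122.
func uuidV5(ns [16]byte, name string) string {
	h := sha1.New()
	h.Write(ns[:])
	h.Write([]byte(name))
	sum := h.Sum(nil)

	var u [16]byte
	copy(u[:], sum[:16])
	u[6] = (u[6] & 0x0f) | 0x50 // version 5
	u[8] = (u[8] & 0x3f) | 0x80 // RFC 4122 variant

	s := hex.EncodeToString(u[:])
	return s[0:8] + "-" + s[8:12] + "-" + s[12:16] + "-" + s[16:20] + "-" + s[20:32]
}

// requestID hashes repurchase_nudge:{userID}:{productID}:{date}.
func requestID(userID, productID, date string) string {
	name := fmt.Sprintf("repurchase_nudge:%s:%s:%s", userID, productID, date)
	return uuidV5(exampleNamespace, name)
}

func main() {
	a := requestID("user-1", "fido-9", "2025-01-15")
	b := requestID("user-1", "fido-9", "2025-01-15")
	fmt.Println(a == b, a)
}
```

Because the ID depends only on user, product, and date, a repeated run on the same day reproduces it.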

Three variants based on available incentives:

| Variant | Condition | Title | Push Body |
| --- | --- | --- | --- |
| ShopOnly | On Fetch Shop, no offer | “Time to restock {Product}?” | “Low on {Product}? Get {Points} when you buy it at {Merchant} on Fetch Shop” |
| DoubleDip | On Fetch Shop + offer | “Double dip on points when you rebuy {Product}” | “Double-dip and get {Points} by repurchasing {Product} at {Merchant}…” |
| OfferOnly | Offer only, not on shop | “Time to restock {Product}!” | “Earn {Points} on your next {Brand} {Product} purchase” |

Points calculation:

  • ShopOnly: PPD points only
  • DoubleDip: PPD + offer points
  • OfferOnly: Offer points only
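
Variant choice and the points shown follow directly from two inputs: whether the product is on Fetch Shop and whether an offer exists. A minimal sketch, with hypothetical function names (the real templates live in templates.go):

```go
package main

import "fmt"

// Variant names match the table above.
type Variant string

const (
	ShopOnly  Variant = "ShopOnly"
	DoubleDip Variant = "DoubleDip"
	OfferOnly Variant = "OfferOnly"
)

// chooseVariant maps available incentives to a variant. It returns false
// when the product has neither a Fetch Shop listing nor an offer.
func chooseVariant(onFetchShop, hasOffer bool) (Variant, bool) {
	switch {
	case onFetchShop && hasOffer:
		return DoubleDip, true
	case onFetchShop:
		return ShopOnly, true
	case hasOffer:
		return OfferOnly, true
	}
	return "", false
}

// variantPoints returns the points advertised for a variant.
func variantPoints(v Variant, ppdPoints, offerPoints int) int {
	switch v {
	case ShopOnly:
		return ppdPoints
	case DoubleDip:
		return ppdPoints + offerPoints
	case OfferOnly:
		return offerPoints
	}
	return 0
}

func main() {
	v, ok := chooseVariant(true, true)
	fmt.Println(v, ok, variantPoints(v, 30, 45))
}
```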

The episode body uses a {{total_points}} placeholder, which rover-agent replaces at delivery time after re-enrichment.

Products reach Neo4j through two Kafka ingestion paths:

| Source | Topic | Cluster | Format |
| --- | --- | --- | --- |
| Factoid (Fetch Shop orders) | factoid-stream | {env}-factoids | Protobuf (Buf CSR) |
| Receipt (scanned receipts) | pipeline-v1-processed-receipt-events | {env}-receipt-infra | Protobuf (plain) |

During ingestion, each FIDO is:

  1. Enriched via FPS (name, brand, category, categoryId, fidoType)
  2. Filtered: skip non-PRODUCT fidoTypes (BRAND, GENERIC-CATEGORY, etc.)
  3. Resolved: categoryId → full taxonomy hierarchy via category-service
  4. Written to Neo4j as a Product node with category_id and category_hierarchy

Production settings:

| Setting | Value | Notes |
| --- | --- | --- |
| Run time | 3 AM UTC | ~10 PM ET |
| Delivery window | 8–10 AM local | Morning engagement window |
| Min purchases | 2 | |
| Min history days | 90 | |
| Min expected points | 50 | Reduced from 100 (PLT-516, +24% daily sends) |
| Skip-recent threshold | 0.8× interval | |
| Max notifications/day | 1 per user | |
| Notification lead | 24 hours | |
| Eligibility workers | 50 | Concurrent CCS+state checks |
| Allowed platforms | iOS only | |
| dry_run | false | Real sends enabled |
| test_mode | false | |

Non-production overrides:

| Setting | Value |
| --- | --- |
| Delivery window | 0–24 (all day) |
| dry_run | false |


Failure handling:

| Scenario | Marked Processed? | Retried Next Run? |
| --- | --- | --- |
| CCS enrichment fails | No | Yes |
| Episode creation fails | No | Yes |
| Notification send fails | Yes | No (prevents retry storms) |
| Feature flag gated | No | Yes (when flag enables) |

Episode creation: 3 retries with exponential backoff (500ms → 1s → 2s)
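
The retry schedule can be sketched as a doubling delay between attempts. This assumes the first attempt is not counted among the 3 retries, so a persistent failure makes 4 calls in total; the helper and error names are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTransient stands in for a retryable episode-creation failure.
var errTransient = errors.New("transient failure")

// retryWithBackoff runs op once, then retries up to `retries` times,
// sleeping base, 2*base, 4*base, ... between attempts. It returns the
// last error, or nil as soon as op succeeds.
func retryWithBackoff(retries int, base time.Duration, op func() error) error {
	err := op()
	delay := base
	for i := 0; i < retries && err != nil; i++ {
		time.Sleep(delay)
		err = op()
		delay *= 2
	}
	return err
}

func main() {
	calls := 0
	// With base = 500ms the waits are 500ms, 1s, 2s, matching the text.
	err := retryWithBackoff(3, 500*time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errTransient // fail the first two attempts
		}
		return nil
	})
	fmt.Println(calls, err)
}
```

A production version would likely also honor context cancellation so a shutdown does not wait out the full backoff.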

Triggers:

  • Automatic: Daily at 3 AM UTC
  • Manual: POST /scheduler/trigger?force=true (bypasses the due-today filter)
  • Distributed lock: Valkey lock (10 min TTL) prevents concurrent runs

Service dependencies:

| Service | Endpoint | Purpose |
| --- | --- | --- |
| Neo4j | Cypher queries | Purchase graph: candidates, history |
| Category Service | GET /category/:id | Taxonomy hierarchy resolution |
| Profile Service | GET /v1/users/{id}/profile | User timezone |
| Consumer Context Service | GET /v1/products/{fido}/enrich | Offers, PPD, merchant, price, image |
| Consumer Agent | POST /v1/users/{id}/episodes | Create AI assistant episode |
| Notification Service | POST /v1/gateway/notify | Schedule push notification |
| Feature Flipper | SDK | Per-user flag gating (consumer_dm_enabled_backend) |
| Valkey | Keys | Dedup, snooze, opt-out, processed state, distributed lock |
| FPS (FIDO Product Service) | Batch API | Product enrichment during Kafka ingestion |

Key files:

| File | Role |
| --- | --- |
| internal/scheduler/orchestrator.go | Main run loop: filters, enrichment, dispatch |
| internal/notification/repurchase/neo4j.go | Neo4j candidate queries + category filtering |
| internal/notification/repurchase/handler.go | Enrichment + filtering orchestration |
| internal/notification/repurchase/adapter.go | Episode creation + notification dispatch |
| internal/notification/repurchase/send_time_analyzer.go | Time-of-day prediction |
| internal/scheduler/calculator.go | Day-level scheduling math |
| internal/scheduler/eligibility.go | Points/history eligibility checks |
| internal/scheduler/selector.go | Per-user candidate ranking |
| internal/scheduler/state.go | Valkey state (processed/snooze/opt-out) |
| internal/api/consumeragent/templates.go | Message template variants |
| internal/api/notificationservice/templates.go | Notification request builder |
| internal/api/category/client.go | Category-service client for taxonomy |
| internal/kafka/receipt_transformer.go | Receipt ingestion → graph data |
| internal/kafka/transformer.go | Factoid ingestion → graph data |
| internal/graph/writer.go | Neo4j persistence (MERGE queries) |
| cmd/unified-worker/scheduler_component.go | Scheduler lifecycle + cron |
| config/unified-worker-prod.yaml | Production config |