Consumer Graph — Neo4j Schema Reference

Complete schema of the Neo4j knowledge graph, with every property mapped to its upstream data source.


User node (:User)

| Property | Type | Set On | Data Source |
|---|---|---|---|
| user_id | string | CREATE | Factoid: factoid.GetUserId() / Receipt: receipt.GetUserId() |
| timezone | string | CREATE; COALESCE on MATCH | Hardcoded "UTC" by both transformers. Falls back to "America/Chicago" at write time if empty. |
| created_at | datetime | CREATE only | Neo4j datetime(): server clock at first write |
| last_updated_at | datetime | MATCH only | Neo4j datetime(): server clock on subsequent writes |

Product node (:Product)

| Property | Type | Set On | Data Source |
|---|---|---|---|
| product_id | string | CREATE | Kafka message: FIDO ID from factoid data.fidos[].id or receipt item.GetFido() |
| name | string | CREATE; COALESCE on MATCH | FIDO Product Service name field; fallback description field; then Receipt item.GetDescription(); final fallback "Product {fidoID}" |
| brand | string | CREATE; COALESCE on MATCH | FIDO Product Service brand field; fallback "ENRICHMENT_UNAVAILABLE" |
| category | string | CREATE; COALESCE on MATCH | FIDO Product Service category field; fallback "UNCATEGORIZED" |
| created_at | datetime | CREATE only | Neo4j datetime() |
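
The product fallback chains can be sketched in Go as below. This is a minimal sketch, not the transformer's code: fidoProduct, firstNonEmpty and resolveProduct are hypothetical names, an empty string is assumed to count as "missing", and the COALESCE applied on MATCH is not modeled.

```go
package main

import "fmt"

// fidoProduct is a hypothetical stand-in for one FIDO Product Service record;
// only the fields read by the fallback chains are modeled.
type fidoProduct struct {
	Name, Description, Brand, Category string
}

// firstNonEmpty returns the first value that is not the empty string.
func firstNonEmpty(vals ...string) string {
	for _, v := range vals {
		if v != "" {
			return v
		}
	}
	return ""
}

// resolveProduct applies the documented fallback order for name, brand and
// category. p is nil when enrichment returned nothing for this FIDO ID, and
// receiptDesc is empty on the factoid path.
func resolveProduct(fidoID string, p *fidoProduct, receiptDesc string) (name, brand, category string) {
	var f fidoProduct // zero value: every field empty
	if p != nil {
		f = *p
	}
	name = firstNonEmpty(f.Name, f.Description, receiptDesc, fmt.Sprintf("Product %s", fidoID))
	brand = firstNonEmpty(f.Brand, "ENRICHMENT_UNAVAILABLE")
	category = firstNonEmpty(f.Category, "UNCATEGORIZED")
	return name, brand, category
}

func main() {
	// Enrichment failed and no receipt description: every final fallback applies.
	fmt.Println(resolveProduct("12345", nil, "")) // Product 12345 ENRICHMENT_UNAVAILABLE UNCATEGORIZED
	// Enrichment has a description and category but no name or brand.
	fmt.Println(resolveProduct("67890", &fidoProduct{Description: "Whole Milk", Category: "Dairy"}, "MILK 1GAL"))
}
```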
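
Category node (:Category)

| Property | Type | Set On | Data Source |
|---|---|---|---|
| category_id | string | CREATE | Derived: "CAT_" + normalized Product.category (lowercased, runs of non-alphanumeric characters replaced with _, leading/trailing underscores trimmed); see the categoryIDFromName issue below |
| name | string | CREATE only | FIDO Product Service category field (via product enrichment); fallback "UNCATEGORIZED" |
| product_count | int | CREATE only | Hardcoded 0; set on CREATE and never incremented |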

Retailer node (:Retailer)

| Property | Type | Set On | Data Source |
|---|---|---|---|
| retailer_id | string | CREATE | Derived: "RET_" + storeName |
| name | string | CREATE only | Factoid: factoid.data.retailer_info.name / Receipt: receipt.GetStoreName() |
| venue_type | string | CREATE only | Factoid: hardcoded "Grocery". Receipt: Retailer Service GetRetailerByStoreName(), normalized to one of Grocery, Drugstore, Warehouse, Convenience, Big Box; defaults to "Grocery" |
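
A hedged sketch of the Retailer derivations follows. retailerID matches the documented "RET_" + storeName rule. The venue-type mapping is an assumption: the actual RewardsStoreType values are not listed here, so the hypothetical normalizeVenueType simply matches the five allowed names while ignoring case, spaces, underscores and hyphens, and falls back to "Grocery".

```go
package main

import (
	"fmt"
	"strings"
)

// retailerID mirrors the documented derivation: "RET_" + storeName, with no
// normalization of the store name itself.
func retailerID(storeName string) string {
	return "RET_" + storeName
}

// venueTypes maps a simplified key to each allowed venue_type.
// ASSUMPTION: RewardsStoreType strings differ from these names only in case,
// spaces, underscores or hyphens; the real mapping is not documented here.
var venueTypes = map[string]string{
	"grocery":     "Grocery",
	"drugstore":   "Drugstore",
	"warehouse":   "Warehouse",
	"convenience": "Convenience",
	"bigbox":      "Big Box",
}

// normalizeVenueType maps a store type onto an allowed venue_type and
// defaults to "Grocery" (also the value the factoid path hardcodes).
func normalizeVenueType(storeType string) string {
	key := strings.NewReplacer(" ", "", "_", "", "-", "").Replace(strings.ToLower(storeType))
	if v, ok := venueTypes[key]; ok {
		return v
	}
	return "Grocery"
}

func main() {
	fmt.Println(retailerID("Acme Market"))      // RET_Acme Market
	fmt.Println(normalizeVenueType("BIG_BOX"))  // Big Box
	fmt.Println(normalizeVenueType("unknown"))  // Grocery
}
```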

Community node (:Community)

| Property | Type | Set On | Data Source |
|---|---|---|---|
| community_id | string | CREATE | Derived: "COMM_{categoryID}_{zipCode}", built from the user's top categories and zip code |
| name | string | CREATE only | Derived: "{categoryName} Shoppers - {zipCode}" |
| member_count | int | CREATE=1; +1 on MATCH | Computed: incremented each time any user is assigned |
| primary_category | string | CREATE only | FIDO Product Service category name (the category the community is built around) |
| zip_code | string | CREATE only | Receipt: receipt.GetZip(). The factoid path provides an empty string (factoids don't carry zip codes). |
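
The Community ID and name templates, sketched in Go with hypothetical helper names, make one consequence of the factoid path visible: with an empty zip code the ID ends in a bare underscore. The category ID and zip code in the example are illustrative values only.

```go
package main

import "fmt"

// communityID follows the documented "COMM_{categoryID}_{zipCode}" template.
// categoryID is taken as given (e.g. "CAT_dairy"); which categoryIDFromName
// variant produced it is outside this sketch.
func communityID(categoryID, zipCode string) string {
	return fmt.Sprintf("COMM_%s_%s", categoryID, zipCode)
}

// communityName follows the documented "{categoryName} Shoppers - {zipCode}" template.
func communityName(categoryName, zipCode string) string {
	return fmt.Sprintf("%s Shoppers - %s", categoryName, zipCode)
}

func main() {
	// Receipt path: a zip code is available.
	fmt.Println(communityID("CAT_dairy", "60601")) // COMM_CAT_dairy_60601
	fmt.Println(communityName("Dairy", "60601"))   // Dairy Shoppers - 60601
	// Factoid path: the zip is the empty string, so the ID ends in "_".
	fmt.Println(communityID("CAT_dairy", "")) // COMM_CAT_dairy_
}
```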

PURCHASED relationship (:User)-[:PURCHASED]->(:Product)

| Property | Type | Set On | Data Source |
|---|---|---|---|
| timestamps | datetime[] | CREATE=[ts]; append on MATCH | Factoid: factoid.GetOccurredAt() / Receipt: receipt.GetPurchaseDate(), fallback receipt.GetScanDate() |
| receipt_ids | string[] | CREATE=[id]; append on MATCH | Factoid: factoid.GetSummaryId() / Receipt: receipt.GetId() |
| times | int | CREATE=1; +1 on MATCH | Computed: running count of writes |
| last | datetime | CREATE=ts; max(current, new) on MATCH | Factoid: occurred_at / Receipt: purchase_date, fallback scan_date |
| avg_interval_days | float | CREATE=0; recalculated on MATCH | Computed in Cypher from the timestamps array: (last_timestamp - first_timestamp) / (count - 1), in days |
| repurchase_likelihood | float | CREATE=0.0; recalculated on MATCH | Computed in Cypher: 1.0 / (1.0 + days_since_last / (avg_interval + 1)) when times >= 2, else 0.5 |

IN_CATEGORY relationship (Product to Category)

| Property | Type | Data Source |
|---|---|---|
| (none) | | Derived: links a product to its category. The category ID is derived from Product.category at write time. |

User to Category relationship

| Property | Type | Set On | Data Source |
|---|---|---|---|
| purchase_count | int | CREATE=n; +=n on MATCH | Computed in transformer: count of items in this category within a single Kafka message |
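
A minimal sketch of how a per-message purchase_count and its running total could be computed, assuming each item counts once (quantities are ignored) and that categories are keyed by name; messageItem, categoryCounts and accumulate are hypothetical names.

```go
package main

import (
	"fmt"
	"sort"
)

// messageItem is a hypothetical, already-enriched item from one Kafka message.
type messageItem struct {
	FidoID   string
	Category string
}

// categoryCounts returns, for one message, the n written as CREATE=n / +=n.
// ASSUMPTION: each item counts once, regardless of its quantity.
func categoryCounts(items []messageItem) map[string]int {
	counts := make(map[string]int)
	for _, it := range items {
		counts[it.Category]++
	}
	return counts
}

// accumulate applies CREATE=n / +=n to a per-user running total; a missing key
// reads as 0, so the first write for a category behaves like CREATE=n.
func accumulate(stored, batch map[string]int) {
	for cat, n := range batch {
		stored[cat] += n
	}
}

func main() {
	stored := map[string]int{}
	accumulate(stored, categoryCounts([]messageItem{{"1", "Dairy"}, {"2", "Produce"}, {"3", "Dairy"}}))
	accumulate(stored, categoryCounts([]messageItem{{"4", "Dairy"}}))
	cats := make([]string, 0, len(stored))
	for c := range stored {
		cats = append(cats, c)
	}
	sort.Strings(cats)
	for _, c := range cats {
		fmt.Println(c, stored[c]) // Dairy 3, then Produce 1
	}
}
```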

User to Retailer relationship

| Property | Type | Set On | Data Source |
|---|---|---|---|
| frequency | int | CREATE=visits; +=visits on MATCH | Hardcoded 1 per event (both transformers set Visits: 1) |
| last_visit | datetime | CREATE=ts; max(current, new) on MATCH | Factoid: factoid.GetOccurredAt() / Receipt: purchase_date, fallback scan_date |

User to Community relationship

| Property | Type | Data Source |
|---|---|---|
| (none) | | Derived: assigned by CommunityAssigner from the user's top 1-3 categories by purchase_count, plus zip code |
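
CommunityAssigner's selection step can be approximated as a top-k over purchase_count. The alphabetical tie-breaking and the exclusion of zero counts are assumptions made for determinism in this sketch, and topCategories is a hypothetical name.

```go
package main

import (
	"fmt"
	"sort"
)

type categoryCount struct {
	Name  string
	Count int
}

// topCategories returns up to k category names ordered by purchase_count,
// highest first. ASSUMPTIONS: ties are broken alphabetically and zero counts
// are skipped; the real tie-breaking and cutoff are not documented here.
func topCategories(counts map[string]int, k int) []string {
	list := make([]categoryCount, 0, len(counts))
	for name, c := range counts {
		if c > 0 {
			list = append(list, categoryCount{name, c})
		}
	}
	sort.Slice(list, func(i, j int) bool {
		if list[i].Count != list[j].Count {
			return list[i].Count > list[j].Count
		}
		return list[i].Name < list[j].Name
	})
	if len(list) > k {
		list = list[:k]
	}
	out := make([]string, len(list))
	for i, c := range list {
		out[i] = c.Name
	}
	return out
}

func main() {
	counts := map[string]int{"Dairy": 7, "Produce": 12, "Snacks": 3, "Bakery": 7}
	fmt.Println(topCategories(counts, 3)) // [Produce Bakery Dairy]
}
```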

Data sources

| Source | Transport | What It Provides |
|---|---|---|
| Kafka factoid-stream | Protobuf via Buf CSR, {env}-factoids cluster | user_id, summary_id, occurred_at, FIDO IDs (with CENTS/COUNT), retailer_info.name |
| Kafka receipt-events | Plain protobuf, {env}-receipt-infra cluster | user_id, receipt_id, purchase_date/scan_date, store_name, zip, item FIDOs + descriptions |
| FIDO Product Service | HTTP batch API (up to 1000 FIDOs per batch, 30s timeout, 3 retries) | name, description, brand, category, barcode per FIDO ID. Note: barcode is fetched but never persisted to the graph. |
| Retailer Service | HTTP with 30-min cache | RewardsStoreType per store name, normalized to venue_type. Used only by the receipt path; the factoid path hardcodes "Grocery". |
| Neo4j server clock | Internal | created_at, last_updated_at on nodes. avg_interval_days and repurchase_likelihood are computed in Cypher at write time. |
| Derived / hardcoded | Internal | Category IDs (CAT_*), Retailer IDs (RET_*), Community IDs (COMM_*), community names, product_count=0, default timezone "America/Chicago" |
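
Because the FIDO Product Service accepts at most 1000 FIDOs per batch, larger ID lists have to be split before calling it. A minimal chunking sketch follows; chunkIDs and maxFidoBatch are hypothetical names, and the 30s timeout and 3 retries are outside this sketch.

```go
package main

import "fmt"

// maxFidoBatch is the documented per-batch limit of the FIDO Product Service.
const maxFidoBatch = 1000

// chunkIDs splits ids into consecutive batches of at most size elements.
// The batches share the backing array of ids; no copying is done.
func chunkIDs(ids []string, size int) [][]string {
	if size <= 0 {
		return nil // guard against an infinite loop on a bad size
	}
	var batches [][]string
	for start := 0; start < len(ids); start += size {
		end := start + size
		if end > len(ids) {
			end = len(ids)
		}
		batches = append(batches, ids[start:end])
	}
	return batches
}

func main() {
	ids := make([]string, 2500)
	for i := range ids {
		ids[i] = fmt.Sprintf("fido-%d", i)
	}
	for _, b := range chunkIDs(ids, maxFidoBatch) {
		fmt.Println(len(b)) // 1000, 1000, 500
	}
}
```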

The notification scheduler reads from the graph to find users likely to repurchase:

```cypher
MATCH (u:User)-[r:PURCHASED]->(p:Product)
WHERE r.times >= 2
  AND p.category <> 'Alcohol'
  AND r.last >= datetime() - duration({days: $lookbackDays})
  AND r.avg_interval_days > 0
RETURN
  u.user_id, u.timezone,
  p.product_id, p.name, p.brand, p.category,
  r.times, r.avg_interval_days, r.last,
  r.repurchase_likelihood, r.timestamps
ORDER BY r.repurchase_likelihood DESC
LIMIT 10000
```

Filters: at least two purchases (r.times >= 2), category not Alcohol, last purchase within the lookback window ($lookbackDays, default 90 days), and a positive computed average interval (r.avg_interval_days > 0).

Returned fields consumed by the scheduler: the timestamps array is used for time-of-day analysis to determine the optimal notification delivery time, and timezone is used to convert those timestamps to the user's local time.


There are two different implementations of categoryIDFromName:

  • internal/kafka/transformer.go:238 — normalizes: lowercase, replaces [^a-z0-9]+ with _, trims underscores. Example: "Meat & Seafood" becomes CAT_meat_seafood.
  • internal/graph/writer.go:383 — no normalization: fmt.Sprintf("CAT_%s", name). Example: "Meat & Seafood" becomes CAT_Meat & Seafood.

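The transformer creates CategoryData with the normalized ID, but linkProductsToCategories in the writer re-derives the category ID from the raw Product.Category name using the writer's non-normalizing version. As a result, whenever a category name is not already in normalized form (for example, any name with uppercase letters, spaces, or punctuation, such as "Meat & Seafood"), the IN_CATEGORY MATCH looks for a category ID that differs from the one the node was created with, and fails to find it.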
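
The mismatch is easy to reproduce with both documented behaviors side by side. The two function names below are hypothetical stand-ins for the transformer.go and writer.go versions of categoryIDFromName, reimplemented from the descriptions above rather than copied from the codebase.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// nonAlnum matches runs of characters outside [a-z0-9].
var nonAlnum = regexp.MustCompile(`[^a-z0-9]+`)

// categoryIDNormalized follows the transformer.go description: lowercase,
// replace each run of non-alphanumerics with "_", trim underscores.
func categoryIDNormalized(name string) string {
	s := nonAlnum.ReplaceAllString(strings.ToLower(name), "_")
	return "CAT_" + strings.Trim(s, "_")
}

// categoryIDRaw follows the writer.go description: no normalization.
func categoryIDRaw(name string) string {
	return fmt.Sprintf("CAT_%s", name)
}

func main() {
	for _, name := range []string{"Meat & Seafood", "dairy"} {
		a, b := categoryIDNormalized(name), categoryIDRaw(name)
		// "Meat & Seafood" gives CAT_meat_seafood vs CAT_Meat & Seafood; "dairy" agrees.
		fmt.Printf("%q: transformer=%s writer=%s match=%v\n", name, a, b, a == b)
	}
}
```

Names already in normalized form, such as "dairy", yield the same ID from both versions, which is why the mismatch affects only some categories.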

The timestamps and receipt_ids arrays on PURCHASED relationships grow without bound: every new purchase appends one element to each. For frequently purchased products these arrays can become large over time.

Community.member_count increments on every user assignment but never decrements if a user’s category preferences shift. The count drifts upward over time.

Indexes are created at worker boot via Repository.EnsureIndexes in internal/graph/writer.go (idempotent via IF NOT EXISTS):

| Index | Type | On |
|---|---|---|
| user_id_idx | RANGE | (:User).user_id |
| product_id_idx | RANGE | (:Product).product_id |
| category_id_idx | RANGE | (:Category).category_id |
| retailer_id_idx | RANGE | (:Retailer).retailer_id |
| community_id_idx | RANGE | (:Community).community_id |
| purchased_last_idx | RANGE | ()-[:PURCHASED]-().last |

purchased_last_idx was added so the scheduler's repurchase-candidate query can range-seek on the time window instead of scanning and post-filtering. See GetRepurchaseCandidatesBatch in internal/notification/repurchase/neo4j.go.
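
For reference, the index table corresponds to statements of the following shape in Neo4j 5 Cypher syntax. This sketch only builds the strings; it assumes, rather than reproduces, what EnsureIndexes sends, and indexSpec and createIndexStatement are hypothetical names.

```go
package main

import "fmt"

// indexSpec describes one row of the index table.
type indexSpec struct {
	Name, Pattern, Property string
}

// indexes lists the documented indexes; n binds a node, r a relationship.
var indexes = []indexSpec{
	{"user_id_idx", "(n:User)", "n.user_id"},
	{"product_id_idx", "(n:Product)", "n.product_id"},
	{"category_id_idx", "(n:Category)", "n.category_id"},
	{"retailer_id_idx", "(n:Retailer)", "n.retailer_id"},
	{"community_id_idx", "(n:Community)", "n.community_id"},
	{"purchased_last_idx", "()-[r:PURCHASED]-()", "r.last"},
}

// createIndexStatement renders an idempotent CREATE RANGE INDEX statement.
func createIndexStatement(ix indexSpec) string {
	return fmt.Sprintf("CREATE RANGE INDEX %s IF NOT EXISTS FOR %s ON (%s)", ix.Name, ix.Pattern, ix.Property)
}

func main() {
	for _, ix := range indexes {
		fmt.Println(createIndexStatement(ix))
	}
}
```

Because every statement carries IF NOT EXISTS, running the full list at each worker boot is safe: existing indexes are left untouched.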

No CREATE CONSTRAINT statements exist in the codebase.