
Kafka Data Ingestion Status

Date: 2026-02-06
Environment: Production (prod-fargate, 3 ECS tasks)
Image: db19cec (branch fix/PLT-319-prod-deploy)

Both Kafka consumers are running in production and writing data to Neo4j.

| Consumer | Killswitch flag | Status | Writes (per 10 min) |
|---|---|---|---|
| Receipt | consumer_graph_receipt_killswitch = OFF | Running | ~247 |
| Factoid | consumer_graph_factoid_killswitch = OFF | Running | ~184 |

| Setting | Value | Notes |
|---|---|---|
| log_only | false | Full processing enabled |
| dry_run | true | Notification dispatcher disabled (deprecated) |
| consumer_graph_ingestion | Enabled for all users | Per-user percentage rollout flag |
| Neo4j | neo4j://10.4.19.205:7687 | Single EC2 instance |

Receipt Consumer

Topic: ProcessedReceiptEvent (protobuf, no schema registry)
Consumer group: configured in unified-worker-prod.yaml

  • Physical receipts with FINISHED status whose items carry FIDO product identifiers are processed and persisted to Neo4j
  • Each receipt is transformed to graph data: User, Product, Purchase, Retailer, Category nodes and relationships
  • FIDO Product Service enrichment adds product name, brand, and category metadata
  • Idempotency via Valkey cache prevents duplicate processing
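The transformation step can be sketched as follows. This is a minimal sketch and makes several assumptions. The `Receipt` and `Item` shapes are hypothetical simplifications of ProcessedReceiptEvent after FIDO enrichment. The relationship type names (`MADE`, `AT`, `CONTAINS`, `IN_CATEGORY`) are illustrative only and are not the worker's actual schema. Nodes and relationships are de-duplicated, which is how MERGE-style upserts into Neo4j behave.

```go
package main

import "fmt"

// Item and Receipt are hypothetical, simplified shapes; the real
// ProcessedReceiptEvent protobuf and FIDO enrichment output differ.
type Item struct {
	FIDO     string // FIDO product identifier carried on the receipt item
	Category string // category name added by FIDO Product Service enrichment
}

type Receipt struct {
	ID         string
	UserID     string
	RetailerID string
	Items      []Item
}

// Node is a graph node identified by label and key.
type Node struct{ Label, Key string }

// Rel is a directed relationship; the type names used below are illustrative.
type Rel struct{ From, Type, To string }

// toGraph maps one receipt to de-duplicated nodes and relationships,
// mirroring the MERGE-style upserts a Neo4j writer would typically issue.
func toGraph(r Receipt) ([]Node, []Rel) {
	var nodes []Node
	var rels []Rel
	seenNode := map[Node]bool{}
	seenRel := map[Rel]bool{}

	// node records a node once and returns its "Label:key" reference.
	node := func(label, key string) string {
		n := Node{label, key}
		if !seenNode[n] {
			seenNode[n] = true
			nodes = append(nodes, n)
		}
		return label + ":" + key
	}
	// rel records a relationship once.
	rel := func(from, typ, to string) {
		rl := Rel{from, typ, to}
		if !seenRel[rl] {
			seenRel[rl] = true
			rels = append(rels, rl)
		}
	}

	user := node("User", r.UserID)
	purchase := node("Purchase", r.ID)
	rel(user, "MADE", purchase)
	rel(purchase, "AT", node("Retailer", r.RetailerID))
	for _, it := range r.Items {
		product := node("Product", it.FIDO)
		rel(purchase, "CONTAINS", product)
		if it.Category != "" {
			rel(product, "IN_CATEGORY", node("Category", it.Category))
		}
	}
	return nodes, rels
}

func main() {
	r := Receipt{ID: "r1", UserID: "u1", RetailerID: "store9", Items: []Item{
		{FIDO: "f1", Category: "Snacks"},
		{FIDO: "f2", Category: "Snacks"},
	}}
	nodes, rels := toGraph(r)
	fmt.Println(len(nodes), "nodes,", len(rels), "relationships")
	for _, rl := range rels {
		fmt.Printf("(%s)-[:%s]->(%s)\n", rl.From, rl.Type, rl.To)
	}
}
```

Two items that share a category produce a single Category node. Repeating a FIDO on the same receipt does not create a second Product node or a second CONTAINS edge.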

| Filter | Rate | Reason |
|---|---|---|
| Digital receipts | ~23% | DigitalReceiptData present; intentionally skipped |
| No FIDOs in items | ~19% | Some stores/items don't have FIDO product identifiers in the receipt pipeline |
| Non-FINISHED status | ~9% | REJECTED, PENDING, etc.; only FINISHED receipts are processed |
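Below is a sketch of the receipt filters followed by the idempotency check. It rests on three assumptions. The `ReceiptEvent` shape is hypothetical. The order of the checks is a guess. "No FIDOs in items" is read as "no item on the receipt has a FIDO". The in-memory `claimCache` stands in for Valkey. Against Valkey, the claim would be an atomic `SET <key> 1 NX EX <ttl>`, so only one of the 3 ECS tasks wins for a given receipt ID.

```go
package main

import (
	"fmt"
	"sync"
)

// ReceiptEvent is a hypothetical flattened view of ProcessedReceiptEvent.
type ReceiptEvent struct {
	ID         string
	Status     string   // e.g. "FINISHED", "REJECTED", "PENDING"
	HasDigital bool     // true when DigitalReceiptData is present
	ItemFIDOs  []string // FIDO per item; "" when the item has none
}

// skipReason returns "" if the receipt should be processed, otherwise the
// filter that rejected it. The order of checks is an assumption.
func skipReason(e ReceiptEvent) string {
	if e.HasDigital {
		return "digital receipt"
	}
	if e.Status != "FINISHED" {
		return "non-FINISHED status"
	}
	for _, f := range e.ItemFIDOs {
		if f != "" {
			return ""
		}
	}
	return "no FIDOs in items"
}

// claimCache is an in-memory stand-in for the Valkey idempotency cache.
type claimCache struct {
	mu   sync.Mutex
	seen map[string]bool
}

// claim returns true only the first time an ID is seen (like SET ... NX).
func (c *claimCache) claim(id string) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.seen[id] {
		return false
	}
	c.seen[id] = true
	return true
}

// handle applies the filters first, then the idempotency check.
func handle(c *claimCache, e ReceiptEvent) string {
	if reason := skipReason(e); reason != "" {
		return "skip: " + reason
	}
	if !c.claim(e.ID) {
		return "skip: duplicate"
	}
	return "process"
}

func main() {
	c := &claimCache{seen: map[string]bool{}}
	events := []ReceiptEvent{
		{ID: "a", Status: "FINISHED", ItemFIDOs: []string{"", "f1"}},
		{ID: "a", Status: "FINISHED", ItemFIDOs: []string{"", "f1"}}, // redelivered copy
		{ID: "b", Status: "FINISHED", HasDigital: true},
		{ID: "c", Status: "PENDING", ItemFIDOs: []string{"f2"}},
		{ID: "d", Status: "FINISHED", ItemFIDOs: []string{""}},
	}
	for _, e := range events {
		fmt.Println(e.ID, handle(c, e))
	}
}
```

Claiming before the Neo4j write has a cost: a failed write still leaves the key set until its TTL expires. The alternative is to mark the key only after a successful write. That avoids the lost-write risk but allows occasional duplicate writes instead.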

Factoid Consumer

Topic: factoid-stream (protobuf with Buf CSR schema registry)
Consumer group: configured in unified-worker-prod.yaml

  • SHOP ORDER_FINALIZED factoids with fido_count > 0 are processed and persisted to Neo4j
  • The consumer is currently processing backlog from 2025-11-13 (see Offset Issue below)
  • Factoids with FIDOs (observed fido_count from 2 to 16) are enriched via the FIDO service and written as graph data

| Filter | Rate | Reason |
|---|---|---|
| Non-SHOP source | ~90% | PLAY/MILESTONE_COMPLETED, QUEST_COMPLETED, etc.; only SHOP factoids are processed |
| Non-ORDER_FINALIZED | varies | ORDER_PENDING, etc.; only ORDER_FINALIZED is processed |
| fido_count=0 | ~67% of ORDER_FINALIZED | Some orders don't have FIDO product data in the factoid payload |
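The factoid filter chain from the table above can be sketched as follows. The `Factoid` struct is hypothetical. In particular, it assumes that source, event type, and fido_count are separate fields. The literal values (SHOP, ORDER_FINALIZED, fido_count > 0) come from this page.

```go
package main

import "fmt"

// Factoid is a hypothetical flattened view of a factoid-stream message.
type Factoid struct {
	Source    string // e.g. "SHOP", "PLAY"
	Type      string // e.g. "ORDER_FINALIZED", "ORDER_PENDING", "MILESTONE_COMPLETED"
	FIDOCount int    // fido_count
}

// filterFactoid reports whether f should be processed and, if not,
// which filter rejected it (checked in table order).
func filterFactoid(f Factoid) (bool, string) {
	switch {
	case f.Source != "SHOP":
		return false, "non-SHOP source"
	case f.Type != "ORDER_FINALIZED":
		return false, "non-ORDER_FINALIZED"
	case f.FIDOCount <= 0:
		return false, "fido_count=0"
	}
	return true, ""
}

func main() {
	counts := map[string]int{}
	for _, f := range []Factoid{
		{Source: "PLAY", Type: "MILESTONE_COMPLETED"},
		{Source: "SHOP", Type: "ORDER_PENDING", FIDOCount: 4},
		{Source: "SHOP", Type: "ORDER_FINALIZED"},
		{Source: "SHOP", Type: "ORDER_FINALIZED", FIDOCount: 2},
	} {
		if ok, reason := filterFactoid(f); ok {
			counts["processed"]++
		} else {
			counts[reason]++
		}
	}
	fmt.Println(counts)
}
```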

Offset Issue

The consumer-graph-worker does not use yakl's AWSRecommendedMSKOpts() (which sets auto.offset.reset = latest). No explicit kgo.ConsumeResetOffset() is configured, so the franz-go default applies: start from the earliest available offset (kgo.NewOffset().AtStart(), equivalent to auto.offset.reset = earliest).

This means:

  • New consumer groups with no committed offset start from the earliest offset still retained on the topic (this is offset 0 only if nothing has aged out of retention yet)
  • The factoid consumer is currently processing backlog from 2025-11-13, working forward toward real-time
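  • After offsets are committed, restarts resume from the last committed offset. No data is lost, but delivery is at-least-once: messages processed after the last commit may be redelivered (for receipts, the Valkey idempotency cache prevents duplicate processing)

Operational options:

  • Normal restart/redeployment: no action needed; the consumer resumes from its last committed offset
  • Reset offsets (stop the consumer via its killswitch first; kafka-consumer-groups.sh only resets offsets for a group with no active members):
    kafka-consumer-groups.sh --bootstrap-server <brokers> \
      --group <consumer-group> --topic <topic> \
      --reset-offsets --to-datetime <timestamp> --execute
  • scripts/kill-kafka-consumer.sh (nuclear option): deletes the consumer group, and with it all committed offsets, from MSK. The next connection starts from the earliest retained offset again.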
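The start-position rules above can be modeled per partition as in the sketch below. This is a simplified model, not franz-go internals. A real client fetches the group's committed offset from the broker and falls back to the reset policy when no committed offset exists or when a fetch returns OffsetOutOfRange. The offsets used here are made up. In franz-go, the reset policy would be set explicitly with something like `kgo.ConsumeResetOffset(kgo.NewOffset().AtEnd())` for "latest".

```go
package main

import "fmt"

// ResetPolicy models what happens when a group has no usable committed offset.
type ResetPolicy int

const (
	// Earliest matches the franz-go default, kgo.NewOffset().AtStart().
	Earliest ResetPolicy = iota
	// Latest corresponds to auto.offset.reset = latest (e.g. via AWSRecommendedMSKOpts()).
	Latest
)

// startOffset returns where consumption of one partition begins.
// committed < 0 means the group never committed an offset for this partition.
// logStart is the earliest offset still retained; logEnd is the next offset to be written.
// A committed offset outside [logStart, logEnd] is treated like a missing one.
func startOffset(committed, logStart, logEnd int64, p ResetPolicy) int64 {
	if committed >= logStart && committed <= logEnd {
		return committed
	}
	if p == Latest {
		return logEnd
	}
	return logStart
}

func main() {
	// Retention has already removed offsets below 1200, so "earliest" means 1200, not 0.
	fmt.Println("new group, default policy:", startOffset(-1, 1200, 5000, Earliest))
	fmt.Println("new group, latest policy: ", startOffset(-1, 1200, 5000, Latest))
	fmt.Println("restart after commit:     ", startOffset(3000, 1200, 5000, Earliest))
}
```

This model also explains the 2025-11-13 backlog. A brand-new group under the default policy starts at the oldest retained message, not at the head of the topic.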

Three layers of control, from coarsest to finest:

    Killswitch flags (per-consumer, runtime)
        consumer_graph_factoid_killswitch
        consumer_graph_receipt_killswitch
                    |
                    v
    Config flags (per-deployment)
        log_only: true/false
                    |
                    v
    Ingestion flag (per-user, percentage rollout)
        consumer_graph_ingestion
  • Killswitch flags: Polled every 30s. Starts/stops the entire consumer (Kafka connection created/destroyed). No redeployment needed.
  • log_only config: Set in YAML config. When true, messages are logged but not processed. Requires redeployment.
  • consumer_graph_ingestion: Per-user flag checked on every message. Enables percentage-based rollout. Managed in Feature Flipper UI.
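The three layers can be sketched as two pieces. `reconcile` is what one 30s killswitch poll tick would decide for a consumer. `gate` applies log_only and then the per-user flag to each message. The assumptions here are significant. The function names are hypothetical. The FNV-1a hash used to bucket users into 0–99 is an illustrative choice; Feature Flipper's actual bucketing algorithm is not documented on this page.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// reconcile decides what a killswitch poll tick does for one consumer.
// killswitch=true means the consumer should be stopped.
func reconcile(running, killswitch bool) string {
	switch {
	case killswitch && running:
		return "stop: close Kafka client"
	case !killswitch && !running:
		return "start: create Kafka client"
	default:
		return "no change"
	}
}

// userBucket maps a user ID to a stable bucket in [0, 100).
// FNV-1a is an illustrative choice, not Feature Flipper's documented algorithm.
func userBucket(userID string) int {
	h := fnv.New32a()
	h.Write([]byte(userID))
	return int(h.Sum32() % 100)
}

// gate applies the per-message layers: log_only first, then consumer_graph_ingestion.
func gate(logOnly bool, rolloutPct int, userID string) string {
	if logOnly {
		return "log only"
	}
	if userBucket(userID) >= rolloutPct {
		return "skip: user not in rollout"
	}
	return "process"
}

func main() {
	fmt.Println(reconcile(true, true))        // killswitch turned ON while running
	fmt.Println(reconcile(false, false))      // killswitch turned OFF while stopped
	fmt.Println(gate(false, 100, "user-123")) // "enabled for all users"
	fmt.Println(gate(false, 0, "user-123"))   // rollout at 0%
}
```

Deterministic bucketing keeps each user consistently in or out of the rollout as the percentage is raised. Users admitted at 10% remain admitted at 50%.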

| Component | Details |
|---|---|
| ECS cluster | prod-fargate, 3 tasks |
| Neo4j | EC2 i-0f6e375c5dc124526 at 10.4.19.205; ports 7474 (Browser) / 7687 (Bolt) |
| Neo4j security group | sg-09f3329675083d242 (prod-consumer-graph-neo4j-ec2) |
| Neo4j credentials | Username neo4j; password stored in config |
| MSK | Brokers fetched via msk.GetIamBootstrapBrokers() |
| Valkey | Idempotency cache |
| FIDO Service | Product metadata enrichment |
| Retailer Service | Retailer/store type enrichment |

Logs are sent to Grafana Loki via FireLens/OpenTelemetry (NOT CloudWatch).

Loki label: service_name="consumer-graph-worker"

Key log patterns to monitor:

  • [SUCCESS] Persisted graph data — successful Neo4j writes
  • [WARN] Skipping receipt — filtered receipts (with reason)
  • [INFO] Shop order received ... fido_count=N — factoid FIDO availability
  • [ERROR] — processing errors (currently none observed)
  • FlagPoller: flag=... changed — killswitch state changes
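In Loki these patterns would normally be queried with LogQL on the service_name label. For a quick offline check against exported log lines, a small tally like the one below is enough. One example use is comparing [SUCCESS] counts in a 10-minute window against the write rates at the top of this page. Only the substrings are taken from the list above. The sample lines are illustrative, and the rest of the line format is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// logPatterns lists the substrings a line must contain to match each category.
var logPatterns = []struct {
	name string
	all  []string // every substring must appear in the line
}{
	{"success", []string{"[SUCCESS] Persisted graph data"}},
	{"skipped_receipt", []string{"[WARN] Skipping receipt"}},
	{"shop_order", []string{"[INFO] Shop order received"}},
	{"error", []string{"[ERROR]"}},
	{"flag_change", []string{"FlagPoller: flag=", "changed"}},
}

// containsAll reports whether line contains every substring in subs.
func containsAll(line string, subs []string) bool {
	for _, s := range subs {
		if !strings.Contains(line, s) {
			return false
		}
	}
	return true
}

// tally counts lines by the first pattern they match; unmatched lines are ignored.
func tally(lines []string) map[string]int {
	counts := map[string]int{}
	for _, line := range lines {
		for _, p := range logPatterns {
			if containsAll(line, p.all) {
				counts[p.name]++
				break
			}
		}
	}
	return counts
}

func main() {
	lines := []string{ // illustrative lines, not real output
		"[SUCCESS] Persisted graph data ...",
		"[WARN] Skipping receipt ...",
		"[INFO] Shop order received ... fido_count=3",
		"FlagPoller: flag=consumer_graph_receipt_killswitch changed ...",
	}
	fmt.Println(tally(lines))
}
```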