Kafka Data Ingestion Status
Date: 2026-02-06
Environment: Production (prod-fargate, 3 ECS tasks)
Image: db19cec (branch fix/PLT-319-prod-deploy)
Current State
Both Kafka consumers are running in production and writing data to Neo4j.
| Consumer | Killswitch Flag | Status | Writes (per 10 min) |
|---|---|---|---|
| Receipt | consumer_graph_receipt_killswitch = OFF | Running | ~247 |
| Factoid | consumer_graph_factoid_killswitch = OFF | Running | ~184 |
Configuration
| Setting | Value | Notes |
|---|---|---|
| log_only | false | Full processing enabled |
| dry_run | true | Notification dispatcher disabled (deprecated) |
| consumer_graph_ingestion | Enabled for all users | Per-user percentage rollout flag |
| Neo4j | neo4j://10.4.19.205:7687 | Single EC2 instance |
Receipt Consumer
Topic: ProcessedReceiptEvent (protobuf, no schema registry)
Consumer Group: configured in unified-worker-prod.yaml
What’s Working
- Physical receipts with FINISHED status and FIDO-enriched items are processed and persisted to Neo4j
- Each receipt is transformed into graph data: User, Product, Purchase, Retailer, and Category nodes and their relationships
- FIDO Product Service enrichment adds product name, brand, and category metadata
- Idempotency via Valkey cache prevents duplicate processing
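A minimal sketch of the idempotency check, assuming it behaves like an atomic "set if not exists" keyed per receipt. The `IdempotencyStore` interface, the `receipt:<id>` key format, and the 24h TTL are hypothetical, and an in-memory map stands in for Valkey, where the equivalent primitive would be `SET key value NX EX <seconds>`.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// IdempotencyStore is a hypothetical abstraction over the Valkey cache.
// In Valkey this maps onto `SET key value NX EX <ttl>`, which succeeds
// only when the key does not already exist.
type IdempotencyStore interface {
	// MarkIfNew records key and returns true if it was not already present.
	MarkIfNew(key string, ttl time.Duration) bool
}

// memoryStore is an in-memory stand-in used for illustration only.
type memoryStore struct {
	mu      sync.Mutex
	expires map[string]time.Time
	now     func() time.Time
}

func newMemoryStore() *memoryStore {
	return &memoryStore{expires: map[string]time.Time{}, now: time.Now}
}

func (s *memoryStore) MarkIfNew(key string, ttl time.Duration) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	now := s.now()
	if exp, ok := s.expires[key]; ok && now.Before(exp) {
		return false // seen before and not yet expired
	}
	s.expires[key] = now.Add(ttl)
	return true
}

// processOnce runs handle only for the first delivery of receiptID.
// The "receipt:" prefix and 24h TTL are assumptions, not the worker's values.
func processOnce(store IdempotencyStore, receiptID string, handle func()) bool {
	if !store.MarkIfNew("receipt:"+receiptID, 24*time.Hour) {
		return false
	}
	handle()
	return true
}

func main() {
	store := newMemoryStore()
	writes := 0
	// r-1 is delivered twice; only the first delivery triggers a write.
	for _, id := range []string{"r-1", "r-2", "r-1"} {
		processOnce(store, id, func() { writes++ })
	}
	fmt.Println(writes) // 2
}
```

Marking the key before the Neo4j write means a crash between the two steps skips that receipt on redelivery; marking after a successful write avoids that, but lets a concurrent duplicate through. Which trade-off the worker makes is not stated here.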
What’s Filtered (Expected)
| Filter | Rate | Reason |
|---|---|---|
| Digital receipts | ~23% | DigitalReceiptData present — intentionally skipped |
| No FIDOs in items | ~19% | Some stores/items don’t have FIDO product identifiers in the receipt pipeline |
| Non-FINISHED status | ~9% | REJECTED, PENDING, etc. — only FINISHED receipts are processed |
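The receipt filters in the table above can be read as a single predicate. This is a sketch only: the `Receipt` and `Item` types are a hypothetical flattening of the protobuf message, the order of the checks is assumed, and "no FIDOs" is taken to mean that no item carries a FIDO.

```go
package main

import "fmt"

// Item is a hypothetical, simplified receipt line item.
type Item struct {
	FIDO string // empty when the item has no FIDO product identifier
}

// Receipt is a hypothetical, simplified view of a decoded ProcessedReceiptEvent.
type Receipt struct {
	Status  string // e.g. "FINISHED", "REJECTED", "PENDING"
	Digital bool   // true when DigitalReceiptData is present
	Items   []Item
}

// skipReason returns "" if the receipt should be processed, otherwise a short
// reason of the kind a "[WARN] Skipping receipt" log line might carry.
// The check order is an assumption.
func skipReason(r Receipt) string {
	if r.Digital {
		return "digital receipt"
	}
	if r.Status != "FINISHED" {
		return "status " + r.Status
	}
	for _, it := range r.Items {
		if it.FIDO != "" {
			return "" // at least one item has a FIDO: process it
		}
	}
	return "no FIDOs in items"
}

func main() {
	receipts := []Receipt{
		{Status: "FINISHED", Items: []Item{{FIDO: "123"}}},
		{Status: "FINISHED", Digital: true},
		{Status: "REJECTED", Items: []Item{{FIDO: "456"}}},
		{Status: "FINISHED", Items: []Item{{}}},
	}
	for _, r := range receipts {
		if reason := skipReason(r); reason != "" {
			fmt.Println("skip:", reason)
		} else {
			fmt.Println("process")
		}
	}
}
```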
Factoid Consumer
Topic: factoid-stream (protobuf with Buf CSR schema registry)
Consumer Group: configured in unified-worker-prod.yaml
What’s Working
- SHOP ORDER_FINALIZED factoids with `fido_count > 0` are processed and persisted to Neo4j
- The consumer is currently processing backlog from 2025-11-13 (see Offset Behavior below)
- Factoids with FIDOs (fido_count from 2 to 16) are enriched via the FIDO service and written as graph data
What’s Filtered (Expected)
| Filter | Rate | Reason |
|---|---|---|
| Non-SHOP source | ~90% | PLAY/MILESTONE_COMPLETED, QUEST_COMPLETED, etc. — only SHOP factoids are processed |
| Non-ORDER_FINALIZED | varies | ORDER_PENDING, etc. — only ORDER_FINALIZED is processed |
| fido_count=0 | ~67% of ORDER_FINALIZED | Some orders don’t have FIDO product data in the factoid payload |
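The factoid filters can be sketched the same way. The `Factoid` struct is a hypothetical flattening of the decoded protobuf; its field names, the check order, and the sample messages in `main` are assumptions.

```go
package main

import "fmt"

// Factoid is a hypothetical, flattened view of a decoded factoid-stream message.
type Factoid struct {
	Source    string // e.g. "SHOP", "PLAY"
	EventType string // e.g. "ORDER_FINALIZED", "ORDER_PENDING"
	FidoCount int
}

// shouldProcess applies the three filters from the table: SHOP source,
// ORDER_FINALIZED event type, and at least one FIDO.
func shouldProcess(f Factoid) (bool, string) {
	switch {
	case f.Source != "SHOP":
		return false, "non-SHOP source"
	case f.EventType != "ORDER_FINALIZED":
		return false, "non-ORDER_FINALIZED"
	case f.FidoCount <= 0:
		return false, "fido_count=0"
	default:
		return true, ""
	}
}

func main() {
	samples := []Factoid{
		{Source: "SHOP", EventType: "ORDER_FINALIZED", FidoCount: 3},
		{Source: "PLAY", EventType: "QUEST_COMPLETED"},
		{Source: "SHOP", EventType: "ORDER_PENDING", FidoCount: 2},
		{Source: "SHOP", EventType: "ORDER_FINALIZED", FidoCount: 0},
	}
	for _, f := range samples {
		ok, reason := shouldProcess(f)
		fmt.Println(ok, reason)
	}
}
```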
Offset Behavior
The consumer-graph-worker does not use yakl’s AWSRecommendedMSKOpts() (which sets auto.offset.reset = latest). No explicit kgo.ConsumeResetOffset() is configured, so the franz-go library default applies: auto.offset.reset = earliest (AtStart).
This means:
- New consumer groups with no committed offset start from the earliest offset still available within topic retention (the log start offset, which is not necessarily 0)
- The factoid consumer is currently processing backlog from 2025-11-13, working forward toward real-time
- After offsets are committed, restarts resume from the last committed offset: no data is lost, and only messages handled after the most recent commit can be redelivered
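To make these rules concrete, here is a small model of how a partition's starting offset is chosen. It is not franz-go code: `startOffset` and `ResetPolicy` are hypothetical names and offsets are plain integers. It also reflects standard Kafka behavior that a committed offset which has aged out of retention falls back to the reset policy. For reference, opting into latest in franz-go would mean passing `kgo.ConsumeResetOffset(kgo.NewOffset().AtEnd())` when creating the client.

```go
package main

import "fmt"

// ResetPolicy models auto.offset.reset.
type ResetPolicy int

const (
	Earliest ResetPolicy = iota // franz-go default (AtStart)
	Latest                      // what yakl's AWSRecommendedMSKOpts() sets
)

// startOffset returns the offset a partition consumer begins reading from.
// committed is nil when the group has no committed offset for the partition.
// logStart is the earliest retained offset (not necessarily 0); logEnd is the
// offset the next produced message will get.
func startOffset(committed *int64, logStart, logEnd int64, policy ResetPolicy) int64 {
	// A committed offset still inside the retained range always wins.
	if committed != nil && *committed >= logStart && *committed <= logEnd {
		return *committed
	}
	// No usable commit (new group, or commit aged out): apply the reset policy.
	if policy == Latest {
		return logEnd
	}
	return logStart
}

func main() {
	c := int64(1500)
	fmt.Println(startOffset(nil, 1200, 9000, Earliest)) // 1200: new group replays retained backlog
	fmt.Println(startOffset(nil, 1200, 9000, Latest))   // 9000: new group skips to real-time
	fmt.Println(startOffset(&c, 1200, 9000, Earliest))  // 1500: restart resumes from commit
}
```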
Offset Management
- Normal restart/redeployment: resumes from the last committed offset
- Reset offsets (stop the consumer via its killswitch first, because kafka-consumer-groups.sh only resets offsets for a group with no active members):

```sh
kafka-consumer-groups.sh --bootstrap-server <brokers> \
  --group <consumer-group> --topic <topic> \
  --reset-offsets --to-datetime <timestamp> --execute
```

- scripts/kill-kafka-consumer.sh (nuclear option): deletes the consumer group entirely from MSK. The next connection starts from earliest again.
Feature Flag Architecture
Three layers of control, from coarsest to finest:

1. Killswitch flags (per-consumer, runtime): consumer_graph_factoid_killswitch, consumer_graph_receipt_killswitch. Polled every 30s. Starts/stops the entire consumer (Kafka connection created/destroyed). No redeployment needed.
2. Config flags (per-deployment): log_only: true/false. Set in YAML config. When true, messages are logged but not processed. Requires redeployment.
3. Ingestion flag (per-user, percentage rollout): consumer_graph_ingestion. Checked on every message. Enables percentage-based rollout. Managed in Feature Flipper UI.
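The three layers can be sketched as two small decisions: what a 30s poll tick does with a killswitch value, and a per-message gate for log_only and the per-user flag. All names here are hypothetical, the order of the per-message checks is assumed from "coarsest to finest", and the FNV-based bucketing is only a stand-in for however Feature Flipper actually assigns users to a rollout percentage.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// reconcile is what a poll tick might do with a killswitch value:
// stop a running consumer when the switch is ON, start it when OFF.
func reconcile(killswitchOn, running bool) string {
	switch {
	case killswitchOn && running:
		return "stop"
	case !killswitchOn && !running:
		return "start"
	default:
		return "noop"
	}
}

// Action is what the worker does with one message.
type Action string

const (
	ActionLogOnly Action = "log_only"
	ActionSkip    Action = "skip_user"
	ActionProcess Action = "process"
)

// inRollout hashes the user ID into one of 100 buckets (hypothetical scheme)
// and admits the user if the bucket is below percent.
func inRollout(userID string, percent uint32) bool {
	h := fnv.New32a()
	h.Write([]byte(userID))
	return h.Sum32()%100 < percent
}

// decide applies the per-message layers. The killswitch is not checked here:
// while it is ON the consumer is stopped, so no message arrives at all.
func decide(logOnly bool, rolloutPercent uint32, userID string) Action {
	if logOnly {
		return ActionLogOnly
	}
	if !inRollout(userID, rolloutPercent) {
		return ActionSkip
	}
	return ActionProcess
}

func main() {
	fmt.Println(reconcile(true, true))        // stop
	fmt.Println(reconcile(false, false))      // start
	fmt.Println(decide(true, 100, "user-1"))  // log_only
	fmt.Println(decide(false, 100, "user-1")) // process
	fmt.Println(decide(false, 0, "user-1"))   // skip_user
}
```

Hashing the user ID (rather than rolling a random number per message) keeps each user consistently in or out of the rollout as the percentage changes.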
Infrastructure
| Component | Details |
|---|---|
| ECS Cluster | prod-fargate, 3 tasks |
| Neo4j | EC2 i-0f6e375c5dc124526 at 10.4.19.205, ports 7474 (Browser) / 7687 (Bolt) |
| Neo4j SG | sg-09f3329675083d242 (prod-consumer-graph-neo4j-ec2) |
| Neo4j Creds | Username neo4j; password stored in config |
| MSK | Brokers fetched via msk.GetIamBootstrapBrokers() |
| Valkey | Idempotency cache |
| FIDO Service | Product metadata enrichment |
| Retailer Service | Retailer/store type enrichment |
Monitoring
Logs are sent to Grafana Loki via FireLens/OpenTelemetry (NOT CloudWatch).
Loki label: service_name="consumer-graph-worker"
Key log patterns to monitor:
- [SUCCESS] Persisted graph data: successful Neo4j writes
- [WARN] Skipping receipt: filtered receipts (with reason)
- [INFO] Shop order received ... fido_count=N: factoid FIDO availability
- [ERROR]: processing errors (currently none observed)
- FlagPoller: flag=... changed: killswitch state changes
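When reviewing a batch of logs exported from Loki (for example with Grafana's logcli), the key patterns can be tallied with a few lines of Go. The category names and the sample lines in `main` are made up for illustration; only the matched substrings come from the pattern list.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// patterns maps key log substrings to illustrative category names.
// The first matching pattern wins for each line.
var patterns = []struct{ substr, category string }{
	{"[SUCCESS] Persisted graph data", "write"},
	{"[WARN] Skipping receipt", "receipt_skip"},
	{"[INFO] Shop order received", "shop_order"},
	{"[ERROR]", "error"},
	{"FlagPoller: flag=", "flag_change"},
}

// tally counts how many log lines fall into each category.
func tally(logs string) map[string]int {
	counts := map[string]int{}
	sc := bufio.NewScanner(strings.NewReader(logs))
	for sc.Scan() {
		line := sc.Text()
		for _, p := range patterns {
			if strings.Contains(line, p.substr) {
				counts[p.category]++
				break
			}
		}
	}
	return counts
}

func main() {
	// Hypothetical sample lines, not real worker output.
	sample := `[SUCCESS] Persisted graph data receipt=abc
[WARN] Skipping receipt reason=digital
[SUCCESS] Persisted graph data receipt=def
FlagPoller: flag=consumer_graph_receipt_killswitch changed`
	fmt.Println(tally(sample)) // map[flag_change:1 receipt_skip:1 write:2]
}
```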