Neo4j Knowledge Graph Service - Implementation Plan
Overview
This document outlines the complete implementation plan for integrating a Neo4j knowledge graph into the consumer-graph service. The graph will support real-time AI agent flows for personalized offer recommendations.
Architecture Overview

    ┌─────────────────────────────────────────────────┐
    │              HTTP API Layer                     │
    │   (handlers: recommendations, graph ops)        │
    └──────────────────┬──────────────────────────────┘
                       │
    ┌──────────────────▼──────────────────────────────┐
    │              Service Layer                      │
    │   • RecommendationService                       │
    │   • GraphManagementService                      │
    └──────────────────┬──────────────────────────────┘
                       │
    ┌──────────────────▼──────────────────────────────┐
    │              Repository Layer                   │
    │   • UserRepository                              │
    │   • ProductRepository                           │
    │   • OfferRepository                             │
    │   • RecommendationRepository                    │
    └──────────────────┬──────────────────────────────┘
                       │
    ┌──────────────────▼──────────────────────────────┐
    │         Neo4j Driver (Connection Pool)          │
    └─────────────────────────────────────────────────┘

Neo4j Schema Design
Core Entities (Nodes)

    User {
      user_id: String!,
      zip: String,
      created_at: DateTime
    }

Product

    Product {
      product_id: String!,
      upc: String,
      name: String,
      brand: String,
      category: String,
      norm_name: String
    }

    Offer {
      offer_id: String!,
      title: String,
      points: Int,
      start: Date,
      end: Date,
      priority: Int
    }

Relationships
Purchases (User → Product)

    (:User)-[:PURCHASED {
      times: Int,
      qty: Int,
      first: DateTime,
      last: DateTime
    }]->(:Product)

Product Similarity

    (:Product)-[:SIMILAR_TO {
      score: Float,
      v: Int
    }]->(:Product)

Offer Applicability

    (:Offer)-[:APPLIES_TO]->(:Product)
    (:Product)<-[:APPLIES_TO]-(:Offer)

User Eligibility

    (:User)-[:ELIGIBLE {
      start: Date
    }]->(:Offer)

Indexes and Constraints

    CREATE CONSTRAINT user_id IF NOT EXISTS FOR (u:User) REQUIRE u.user_id IS UNIQUE;
    CREATE CONSTRAINT product_id IF NOT EXISTS FOR (p:Product) REQUIRE p.product_id IS UNIQUE;
    CREATE CONSTRAINT offer_id IF NOT EXISTS FOR (o:Offer) REQUIRE o.offer_id IS UNIQUE;
    CREATE INDEX offer_dates IF NOT EXISTS FOR (o:Offer) ON (o.start, o.end);
    CREATE INDEX product_category IF NOT EXISTS FOR (p:Product) ON (p.category);
    CREATE INDEX user_zip IF NOT EXISTS FOR (u:User) ON (u.zip);

Configuration Structure
Config Extensions
Add to internal/config/config.go:

    type Config struct {
        Environment string        `json:"environment"`
        Server      ServerConfig  `json:"server"`
        Telemetry   Telemetry     `json:"telemetry"`
        Logging     LoggingConfig `json:"logging"`
        Neo4j       Neo4jConfig   `json:"neo4j"` // NEW
    }

    type Neo4jConfig struct {
        URI             string `json:"uri"`
        Username        string `json:"username"`
        Password        string `json:"password"`
        Database        string `json:"database"`
        MaxConnPoolSize int    `json:"max_conn_pool_size"`
        ConnTimeout     string `json:"conn_timeout"`
        MaxConnLifetime string `json:"max_conn_lifetime"`
        Encrypted       bool   `json:"encrypted"`
    }

Environment Configurations
Local (config/local/config.json):

    {
      "neo4j": {
        "uri": "bolt://localhost:7687",
        "username": "neo4j",
        "password": "local-password",
        "database": "neo4j",
        "max_conn_pool_size": 50,
        "conn_timeout": "5s",
        "max_conn_lifetime": "1h",
        "encrypted": false
      }
    }

Dev/Stage/Prod:
- Use environment variables for sensitive data
- Higher connection pool sizes for prod (100+)
- TLS encryption enabled
- AWS Secrets Manager or Parameter Store references
Package Structure

    pkg/
    ├── neo4j/
    │   ├── client.go          # Connection management & driver initialization
    │   ├── health.go          # Neo4j health checks
    │   └── client_test.go

    internal/
    ├── graph/
    │   ├── repository/
    │   │   ├── user.go            # User node CRUD
    │   │   ├── product.go         # Product node CRUD
    │   │   ├── offer.go           # Offer node CRUD
    │   │   ├── recommendation.go  # Recommendation query
    │   │   └── repository.go      # Common interfaces
    │   ├── service/
    │   │   ├── recommendation.go  # Business logic for recommendations
    │   │   ├── graph_mgmt.go      # Graph management operations
    │   │   └── service.go         # Service interfaces
    │   ├── handler/
    │   │   ├── recommendation.go  # HTTP handlers for recommendations
    │   │   ├── graph.go           # HTTP handlers for graph ops
    │   │   └── handler.go         # Common handler utilities
    │   └── schema/
    │       └── schema.go          # Schema initialization
    ├── models/
    │   ├── user.go
    │   ├── product.go
    │   ├── offer.go
    │   └── recommendation.go

API Endpoints
Recommendation Endpoints
- GET /api/v1/recommendations?user_id={id} - Get personalized offer recommendations
Graph Management Endpoints
- POST /api/v1/graph/users - Create/update user nodes
- POST /api/v1/graph/products - Create/update product nodes
- POST /api/v1/graph/offers - Create/update offer nodes
- POST /api/v1/graph/relationships/purchased - Create PURCHASED relationships
- POST /api/v1/graph/relationships/similar - Create SIMILAR_TO relationships
- POST /api/v1/graph/relationships/eligible - Create ELIGIBLE relationships
- POST /api/v1/graph/relationships/applies-to - Create APPLIES_TO relationships
Health & Admin Endpoints
- GET /api/v1/graph/health - Neo4j connection health
- POST /api/v1/graph/schema/init - Initialize schema (admin)
- POST /api/v1/graph/similarity/rebuild - Rebuild similarity graph (admin)
Deployment Strategy with FSD
Neo4j Deployment Options
Option A: Neo4j Aura (Managed Service) - RECOMMENDED
- Fully managed Neo4j in AWS
- Auto-scaling, backups, monitoring included
- Connection via the neo4j+s:// URI scheme (the older bolt+routing:// scheme is not accepted by the 4.x and 5.x drivers)
- Configure via environment variables in FSD
Option B: Self-hosted Neo4j on ECS
- Deploy Neo4j as separate ECS service
- Use ECS Service Discovery for DNS
- More control but more operational overhead
FSD Configuration Updates
Update consumer-graph-mcp.yml:

    variables:
      default:
        neo4j_uri: "bolt://neo4j.internal:7687"
        neo4j_database: "neo4j"
      # neo4j+s:// is the routed, TLS-enabled scheme; bolt+routing:// is
      # rejected by the 4.x and 5.x drivers.
      dev:
        neo4j_uri: "neo4j+s://dev-neo4j.neo4j.io:7687"
      stage:
        neo4j_uri: "neo4j+s://stage-neo4j.neo4j.io:7687"
      prod:
        neo4j_uri: "neo4j+s://prod-neo4j.neo4j.io:7687"

    process:
      environment_variables:
        NEO4J_URI: "{{neo4j_uri}}"
        NEO4J_DATABASE: "{{neo4j_database}}"
        NEO4J_USERNAME: "{{ssm:/consumer-graph/neo4j/username}}"
        NEO4J_PASSWORD: "{{ssm:/consumer-graph/neo4j/password}}"

    aws_iam_role:
      policy_statements:
        - Effect: "Allow"
          Action:
            - "ssm:GetParameter"
            - "ssm:GetParameters"
          Resource:
            - "arn:aws:ssm:{{region}}:{{account}}:parameter/consumer-graph/neo4j/*"

Observability
Metrics to Add

    // Neo4j-specific metrics
    func RecordGraphQueryDuration(ctx context.Context, queryType string, durationMs float64)
    func RecordGraphQueryError(ctx context.Context, queryType string, err error)
    func RecordGraphConnectionPoolSize(ctx context.Context, size int)
    func RecordGraphNodeCount(ctx context.Context, label string, count int)

Logging Standards

    const (
        msgNeo4jConnecting    = "Connecting to Neo4j"
        msgNeo4jConnected     = "Neo4j connection established"
        msgNeo4jQueryExec     = "Executing Neo4j query"
        msgNeo4jQueryComplete = "Neo4j query completed"
        msgNeo4jQueryFailed   = "Neo4j query failed"

        propQueryType = "query_type"
        propDuration  = "duration_ms"
        propNodeCount = "node_count"
        propDatabase  = "database"
    )

Health Check Response

    {
      "status": "healthy",
      "service": "consumer-graph",
      "neo4j": {
        "connected": true,
        "database": "neo4j",
        "pool_size": 45,
        "last_check": "2025-11-07T12:00:00Z"
      }
    }

Testing Strategy
Unit Tests
- Mock Neo4j driver using interfaces
- Test repository logic independently
- Test service business logic
- Use table-driven tests for query building
Integration Tests
- Use Neo4j test containers
- Load fixture data
- Test full query flows
- Test concurrent access patterns
Performance Tests
- Benchmark recommendation queries
- Test connection pool under load
- Validate query execution times (<100ms target)
Phased Implementation
Phase 1: Foundation (Week 1)
- Add Neo4j Go driver dependency
- Implement Neo4j client with connection pooling
- Add Neo4j configuration to config system
- Update environment configs
- Add Neo4j health check
- Write unit tests for client
Phase 2: Core Repositories (Week 1-2)
- Implement User repository
- Implement Product repository
- Implement Offer repository
- Create schema initialization
- Write repository unit tests with mocks
- Add repository metrics and logging
Phase 3: Recommendation Engine (Week 2)
- Implement Recommendation repository
- Implement Recommendation service
- Add recommendation metrics and logging
- Write integration tests
- Optimize query performance
Phase 4: HTTP API (Week 2-3)
- Implement Recommendation handlers
- Register endpoints in server
- Add request validation and error handling
- Add API documentation
- Write handler tests
Phase 5: Graph Management API (Week 3)
- Implement CRUD handlers for Users, Products, Offers
- Implement relationship creation endpoints
- Add batch import endpoints
- Add admin endpoints (schema, similarity rebuild)
- Write comprehensive tests
Phase 6: Deployment (Week 3-4)
- Set up Neo4j Aura instance (or alternative)
- Configure AWS Parameter Store for credentials
- Update FSD configuration
- Deploy to dev environment
- Load initial test data
- Run smoke tests
Phase 7: Production Readiness (Week 4)
- Performance testing and tuning
- Connection pool optimization
- Query optimization with EXPLAIN/PROFILE
- Set up monitoring dashboards
- Document runbooks
- Stage deployment
- Production deployment
Dependencies
Go Modules to Add

    github.com/neo4j/neo4j-go-driver/v5 v5.15.0

Testing Dependencies

    github.com/testcontainers/testcontainers-go v0.26.0
    github.com/testcontainers/testcontainers-go/modules/neo4j v0.26.0

Security Considerations
Credential Management
- Store credentials in AWS Systems Manager Parameter Store
- Never commit passwords to git
- Use encrypted parameters for stage/prod
- Rotate credentials regularly
Parameter Store Structure

    /consumer-graph/dev/neo4j/username
    /consumer-graph/dev/neo4j/password
    /consumer-graph/dev/neo4j/uri
    /consumer-graph/stage/neo4j/username
    /consumer-graph/stage/neo4j/password
    /consumer-graph/stage/neo4j/uri
    /consumer-graph/prod/neo4j/username
    /consumer-graph/prod/neo4j/password
    /consumer-graph/prod/neo4j/uri

Network Security
- Use TLS encryption for prod/stage
- Enable VPC peering for Neo4j Aura
- Restrict access via security groups
- Use IAM roles for AWS service access
Performance Targets
- Recommendation query: < 100ms p95
- Connection acquisition: < 10ms p95
- Graph write operations: < 50ms p95
- Health check: < 5ms p95
Monitoring & Alerts
Key Metrics to Monitor
- Neo4j connection pool utilization
- Query execution times by type
- Error rates by operation
- Database size and growth rate
- Memory usage on Neo4j instances
Alerts to Configure
- Connection pool exhaustion
- Query timeouts > 1s
- Error rate > 1%
- Database disk usage > 80%
- Unusual query patterns
Recommendation Query Implementation
The core recommendation query from the schema:

    // 1. Seed products: what the user bought within the lookback window
    MATCH (u:User {user_id:$user_id})
    MATCH (u)-[bp:PURCHASED]->(p:Product)
    WHERE bp.last >= datetime() - duration({days:$lookback_days})

    // 2. Candidates: similar products with an active offer the user is eligible for
    MATCH (p)-[s:SIMILAR_TO]->(cand:Product)
    MATCH (u)-[e:ELIGIBLE]->(o:Offer)-[:APPLIES_TO]->(cand)
    WHERE e.start <= $as_of AND o.start <= $as_of AND $as_of <= o.end

    // 3. Drop candidates the user bought within the exclusion window
    OPTIONAL MATCH (u)-[rp:PURCHASED]->(cand)
    WITH p, cand, s, o, rp
    WHERE rp IS NULL OR rp.last < datetime() - duration({days:$exclude_days})

    // 4. Cap candidates per seed product, keeping the highest similarity scores
    WITH p, cand, s, o
    ORDER BY p.product_id, s.score DESC
    WITH p, collect({cand:cand, score:s.score, o:o})[..$per_seed_cap] AS perSeed
    UNWIND perSeed AS row
    WITH row.cand AS cand, row.score AS sim_score, row.o AS o

    // 5. Aggregate similarity per (candidate, offer) and rank by points-weighted score
    WITH cand, o, sum(sim_score) AS agg_sim
    WITH cand, o, toInteger(agg_sim * (log(1 + o.points)/log(2)) * 100) AS rank
    ORDER BY rank DESC, o.priority DESC, o.end ASC
    RETURN cand.product_id AS product_id, cand.name AS name, o.offer_id AS offer_id,
           o.points AS points, o.end AS offer_end, rank
    LIMIT $limitN

Query Parameters:
- user_id: Target user ID
- as_of: Current date (defaults to today)
- lookback_days: 120 (default)
- exclude_days: 14 (default)
- per_seed_cap: 80 (default)
- limitN: 20 (default)
Operational Considerations
Eligibility Ingestion
- Hydrate ELIGIBLE relationships hourly from source system
- Only store active or upcoming offers
- Delete expired edges where offer.end < today
Similarity Graph
- Rebuild nightly using Neo4j GDS
- Use similarityCutoff instead of topK for bounded fan-out
- Consider separate relationship types for strong/weak edges
Cleanup Jobs
- Periodically delete expired ELIGIBLE edges
- Archive old PURCHASED relationships
- Monitor and manage database size
Scaling Considerations
- Start with vertical scaling (increase instance size)
- Use read replicas for query load distribution
- Consider sharding by user cohorts if needed
- Monitor query patterns and optimize hot paths
Success Criteria
Phase 1 Complete When:
- Neo4j client connects successfully
- Health check endpoint returns connection status
- Configuration loads from environment-specific files
- Unit tests pass with >80% coverage
Phase 2 Complete When:
- All repository CRUD operations work
- Schema initializes correctly
- Indexes and constraints are created
- Repository tests pass
Phase 3 Complete When:
- Recommendation query executes successfully
- Query performance meets <100ms target
- Integration tests pass with real Neo4j
- Metrics are recorded correctly
Phase 4 Complete When:
- All API endpoints are functional
- Request validation works
- Error handling is comprehensive
- API tests pass
Production Ready When:
- All tests pass (unit, integration, performance)
- Monitoring and alerts configured
- Documentation complete
- Deployed to stage and tested
- Runbooks created
- Security review complete
References
- Neo4j Go Driver Documentation
- Neo4j Aura
- OpenAI Cookbook: Temporal Agents with Knowledge Graphs
- FSD Documentation (internal)