Neo4j Knowledge Graph Service - Implementation Plan

This document outlines the complete implementation plan for integrating a Neo4j knowledge graph into the consumer-graph service. The graph will support real-time AI agent flows for personalized offer recommendations.

Architecture overview:

┌─────────────────────────────────────────────────┐
│ HTTP API Layer │
│ (handlers: recommendations, graph ops) │
└──────────────────┬──────────────────────────────┘
┌──────────────────▼──────────────────────────────┐
│ Service Layer │
│ • RecommendationService │
│ • GraphManagementService │
└──────────────────┬──────────────────────────────┘
┌──────────────────▼──────────────────────────────┐
│ Repository Layer │
│ • UserRepository │
│ • ProductRepository │
│ • OfferRepository │
│ • RecommendationRepository │
└──────────────────┬──────────────────────────────┘
┌──────────────────▼──────────────────────────────┐
│ Neo4j Driver (Connection Pool) │
└─────────────────────────────────────────────────┘

Graph schema (node labels):

User {
user_id: String!,
zip: String,
created_at: DateTime
}
Product {
product_id: String!,
upc: String,
name: String,
brand: String,
category: String,
norm_name: String
}
Offer {
offer_id: String!,
title: String,
points: Int,
start: Date,
end: Date,
priority: Int
}

Relationship types:

(:User)-[:PURCHASED {
times: Int,
qty: Int,
first: DateTime,
last: DateTime
}]->(:Product)
(:Product)-[:SIMILAR_TO {
score: Float,
v: Int
}]->(:Product)
(:Offer)-[:APPLIES_TO]->(:Product)
(:User)-[:ELIGIBLE {
start: Date
}]->(:Offer)

Constraints and indexes:

CREATE CONSTRAINT user_id IF NOT EXISTS FOR (u:User) REQUIRE u.user_id IS UNIQUE;
CREATE CONSTRAINT product_id IF NOT EXISTS FOR (p:Product) REQUIRE p.product_id IS UNIQUE;
CREATE CONSTRAINT offer_id IF NOT EXISTS FOR (o:Offer) REQUIRE o.offer_id IS UNIQUE;
CREATE INDEX offer_dates IF NOT EXISTS FOR (o:Offer) ON (o.start, o.end);
CREATE INDEX product_category IF NOT EXISTS FOR (p:Product) ON (p.category);
CREATE INDEX user_zip IF NOT EXISTS FOR (u:User) ON (u.zip);

Add to internal/config/config.go:

type Config struct {
	Environment string        `json:"environment"`
	Server      ServerConfig  `json:"server"`
	Telemetry   Telemetry     `json:"telemetry"`
	Logging     LoggingConfig `json:"logging"`
	Neo4j       Neo4jConfig   `json:"neo4j"` // NEW
}

type Neo4jConfig struct {
	URI             string `json:"uri"`
	Username        string `json:"username"`
	Password        string `json:"password"`
	Database        string `json:"database"`
	MaxConnPoolSize int    `json:"max_conn_pool_size"`
	ConnTimeout     string `json:"conn_timeout"`      // Go duration string, e.g. "5s"
	MaxConnLifetime string `json:"max_conn_lifetime"` // Go duration string, e.g. "1h"
	// neo4j-go-driver v5 selects TLS from the URI scheme (neo4j+s://, bolt+s://),
	// so this flag must agree with URI.
	Encrypted       bool   `json:"encrypted"`
}

Local (config/local/config.json):

{
  "neo4j": {
    "uri": "bolt://localhost:7687",
    "username": "neo4j",
    "password": "local-password",
    "database": "neo4j",
    "max_conn_pool_size": 50,
    "conn_timeout": "5s",
    "max_conn_lifetime": "1h",
    "encrypted": false
  }
}

Dev/Stage/Prod:

  • Use environment variables for sensitive data
  • Higher connection pool sizes for prod (100+)
  • TLS encryption enabled
  • AWS Secrets Manager or Parameter Store references

Package layout:

pkg/
└── neo4j/
    ├── client.go          # Connection management & driver initialization
    ├── health.go          # Neo4j health checks
    └── client_test.go
internal/
├── graph/
│   ├── repository/
│   │   ├── user.go            # User node CRUD
│   │   ├── product.go         # Product node CRUD
│   │   ├── offer.go           # Offer node CRUD
│   │   ├── recommendation.go  # Recommendation query
│   │   └── repository.go      # Common interfaces
│   ├── service/
│   │   ├── recommendation.go  # Business logic for recommendations
│   │   ├── graph_mgmt.go      # Graph management operations
│   │   └── service.go         # Service interfaces
│   ├── handler/
│   │   ├── recommendation.go  # HTTP handlers for recommendations
│   │   ├── graph.go           # HTTP handlers for graph ops
│   │   └── handler.go         # Common handler utilities
│   └── schema/
│       └── schema.go          # Schema initialization
└── models/
    ├── user.go
    ├── product.go
    ├── offer.go
    └── recommendation.go

API endpoints:

  • GET /api/v1/recommendations?user_id={id} - Get personalized offer recommendations
  • POST /api/v1/graph/users - Create/update user nodes
  • POST /api/v1/graph/products - Create/update product nodes
  • POST /api/v1/graph/offers - Create/update offer nodes
  • POST /api/v1/graph/relationships/purchased - Create PURCHASED relationships
  • POST /api/v1/graph/relationships/similar - Create SIMILAR_TO relationships
  • POST /api/v1/graph/relationships/eligible - Create ELIGIBLE relationships
  • POST /api/v1/graph/relationships/applies-to - Create APPLIES_TO relationships
  • GET /api/v1/graph/health - Neo4j connection health
  • POST /api/v1/graph/schema/init - Initialize schema (admin)
  • POST /api/v1/graph/similarity/rebuild - Rebuild similarity graph (admin)

Option A: Neo4j Aura (Managed Service) - RECOMMENDED

  • Fully managed Neo4j in AWS
  • Auto-scaling, backups, monitoring included
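  • Connection via the neo4j+s:// URI scheme (cluster routing over TLS); bolt+routing:// is the legacy Neo4j 3.x scheme and is not accepted by the v5 Go driver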
  • Configure via environment variables in FSD

Option B: Self-hosted Neo4j on ECS

  • Deploy Neo4j as separate ECS service
  • Use ECS Service Discovery for DNS
  • More control but more operational overhead
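Whichever option is chosen, a startup check on the URI scheme catches misconfiguration early, such as a leftover `bolt+routing://` value. This sketch assumes the scheme set documented for neo4j-go-driver v5, where `+s` means TLS with certificate verification and `+ssc` accepts self-signed certificates. The `requireTLS` policy flag is an assumption that models the "TLS for stage/prod" rule.

```go
package main

import (
	"fmt"
	"net/url"
)

// supportedSchemes lists the URI schemes accepted by neo4j-go-driver v5.
var supportedSchemes = map[string]bool{
	"neo4j": true, "neo4j+s": true, "neo4j+ssc": true,
	"bolt": true, "bolt+s": true, "bolt+ssc": true,
}

// validateNeo4jURI rejects unsupported schemes and, when requireTLS is set,
// any scheme that does not use verified TLS.
func validateNeo4jURI(raw string, requireTLS bool) error {
	u, err := url.Parse(raw)
	if err != nil {
		return fmt.Errorf("parse neo4j uri: %w", err)
	}
	if !supportedSchemes[u.Scheme] {
		return fmt.Errorf("unsupported neo4j uri scheme %q", u.Scheme)
	}
	if requireTLS && u.Scheme != "neo4j+s" && u.Scheme != "bolt+s" {
		return fmt.Errorf("scheme %q does not use verified TLS", u.Scheme)
	}
	return nil
}
```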

Update consumer-graph-mcp.yml:

variables:
  default:
    neo4j_uri: "bolt://neo4j.internal:7687"
    neo4j_database: "neo4j"
  dev:
    neo4j_uri: "neo4j+s://dev-neo4j.neo4j.io:7687"
  stage:
    neo4j_uri: "neo4j+s://stage-neo4j.neo4j.io:7687"
  prod:
    neo4j_uri: "neo4j+s://prod-neo4j.neo4j.io:7687"

process:
  environment_variables:
    NEO4J_URI: "{{neo4j_uri}}"
    NEO4J_DATABASE: "{{neo4j_database}}"
    # These SSM paths must match the per-environment layout
    # (/consumer-graph/<env>/neo4j/...) used in Parameter Store.
    NEO4J_USERNAME: "{{ssm:/consumer-graph/neo4j/username}}"
    NEO4J_PASSWORD: "{{ssm:/consumer-graph/neo4j/password}}"

aws_iam_role:
  policy_statements:
    - Effect: "Allow"
      Action:
        - "ssm:GetParameter"
        - "ssm:GetParameters"
      Resource:
        - "arn:aws:ssm:{{region}}:{{account}}:parameter/consumer-graph/neo4j/*"
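On the service side, the `NEO4J_*` variables set above need to take precedence over file-based values. This sketch assumes a simple rule: a variable that is set and non-empty overrides the file, and anything else keeps the file value. `applyEnvOverrides` is a hypothetical helper. It takes a lookup function so tests can avoid touching the real environment.

```go
package main

import "os"

// Neo4jConfig is trimmed to the fields that FSD injects.
type Neo4jConfig struct {
	URI      string
	Username string
	Password string
	Database string
}

// applyEnvOverrides replaces file-based values with NEO4J_* variables
// that are set and non-empty.
func applyEnvOverrides(cfg *Neo4jConfig, lookup func(string) (string, bool)) {
	for name, field := range map[string]*string{
		"NEO4J_URI":      &cfg.URI,
		"NEO4J_USERNAME": &cfg.Username,
		"NEO4J_PASSWORD": &cfg.Password,
		"NEO4J_DATABASE": &cfg.Database,
	} {
		if v, ok := lookup(name); ok && v != "" {
			*field = v
		}
	}
}

// loadFromEnv applies overrides from the process environment.
func loadFromEnv(cfg *Neo4jConfig) {
	applyEnvOverrides(cfg, os.LookupEnv)
}
```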

Telemetry additions:

// Neo4j-specific metrics
func RecordGraphQueryDuration(ctx context.Context, queryType string, durationMs float64)
func RecordGraphQueryError(ctx context.Context, queryType string, err error)
func RecordGraphConnectionPoolSize(ctx context.Context, size int)
func RecordGraphNodeCount(ctx context.Context, label string, count int)
// Neo4j log messages and property keys
const (
	msgNeo4jConnecting    = "Connecting to Neo4j"
	msgNeo4jConnected     = "Neo4j connection established"
	msgNeo4jQueryExec     = "Executing Neo4j query"
	msgNeo4jQueryComplete = "Neo4j query completed"
	msgNeo4jQueryFailed   = "Neo4j query failed"

	propQueryType = "query_type"
	propDuration  = "duration_ms"
	propNodeCount = "node_count"
	propDatabase  = "database"
)

Example response from GET /api/v1/graph/health:

{
  "status": "healthy",
  "service": "consumer-graph",
  "neo4j": {
    "connected": true,
    "database": "neo4j",
    "pool_size": 45,
    "last_check": "2025-11-07T12:00:00Z"
  }
}

Testing strategy:

Unit tests:

  • Mock Neo4j driver using interfaces
  • Test repository logic independently
  • Test service business logic
  • Use table-driven tests for query building

Integration tests:

  • Use Neo4j test containers
  • Load fixture data
  • Test full query flows
  • Test concurrent access patterns

Performance tests:

  • Benchmark recommendation queries
  • Test connection pool under load
  • Validate query execution times (<100ms target)

Implementation phases:

Phase 1: Neo4j client and configuration

  1. Add Neo4j Go driver dependency
  2. Implement Neo4j client with connection pooling
  3. Add Neo4j configuration to config system
  4. Update environment configs
  5. Add Neo4j health check
  6. Write unit tests for client

Phase 2: Core repositories and schema

  1. Implement User repository
  2. Implement Product repository
  3. Implement Offer repository
  4. Create schema initialization
  5. Write repository unit tests with mocks
  6. Add repository metrics and logging

Phase 3: Recommendation engine

  1. Implement Recommendation repository
  2. Implement Recommendation service
  3. Add recommendation metrics and logging
  4. Write integration tests
  5. Optimize query performance

Phase 4: Recommendation API

  1. Implement Recommendation handlers
  2. Register endpoints in server
  3. Add request validation and error handling
  4. Add API documentation
  5. Write handler tests

Phase 5: Graph management API

  1. Implement CRUD handlers for Users, Products, Offers
  2. Implement relationship creation endpoints
  3. Add batch import endpoints
  4. Add admin endpoints (schema, similarity rebuild)
  5. Write comprehensive tests

Phase 6: Infrastructure and deployment

  1. Set up Neo4j Aura instance (or alternative)
  2. Configure AWS Parameter Store for credentials
  3. Update FSD configuration
  4. Deploy to dev environment
  5. Load initial test data
  6. Run smoke tests

Phase 7: Optimization and production rollout

  1. Performance testing and tuning
  2. Connection pool optimization
  3. Query optimization with EXPLAIN/PROFILE
  4. Set up monitoring dashboards
  5. Document runbooks
  6. Stage deployment
  7. Production deployment

Dependencies (go.mod):

require (
	github.com/neo4j/neo4j-go-driver/v5 v5.15.0
	github.com/testcontainers/testcontainers-go v0.26.0
	github.com/testcontainers/testcontainers-go/modules/neo4j v0.26.0
)

Credential management:

  • Store credentials in AWS Systems Manager Parameter Store
  • Never commit passwords to git
  • Use SecureString (KMS-encrypted) parameters for stage/prod
  • Rotate credentials regularly

Parameter Store paths:

/consumer-graph/dev/neo4j/username
/consumer-graph/dev/neo4j/password
/consumer-graph/dev/neo4j/uri
/consumer-graph/stage/neo4j/username
/consumer-graph/stage/neo4j/password
/consumer-graph/stage/neo4j/uri
/consumer-graph/prod/neo4j/username
/consumer-graph/prod/neo4j/password
/consumer-graph/prod/neo4j/uri
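Building these paths from a single helper keeps the service, the FSD references, and the IAM resource pattern consistent. `ssmParamPath` is a hypothetical helper that encodes the `/consumer-graph/<env>/neo4j/<key>` layout listed above. With the AWS SDK for Go v2, the resulting path would go into an SSM `GetParameter` request with `WithDecryption` enabled for SecureString values.

```go
package main

import "fmt"

// validEnvs are the environments that have Neo4j parameters.
var validEnvs = map[string]bool{"dev": true, "stage": true, "prod": true}

// ssmParamPath returns /consumer-graph/<env>/neo4j/<key> for known envs and keys.
func ssmParamPath(env, key string) (string, error) {
	if !validEnvs[env] {
		return "", fmt.Errorf("unknown environment %q", env)
	}
	switch key {
	case "username", "password", "uri":
	default:
		return "", fmt.Errorf("unknown neo4j parameter %q", key)
	}
	return fmt.Sprintf("/consumer-graph/%s/neo4j/%s", env, key), nil
}
```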

Network security:

  • Use TLS encryption for prod/stage
  • Reach Neo4j Aura through private endpoints (AWS PrivateLink) on Aura tiers that offer them
  • Restrict access via security groups
  • Use IAM roles for AWS service access

Performance targets:

  • Recommendation query: < 100ms p95
  • Connection acquisition: < 10ms p95
  • Graph write operations: < 50ms p95
  • Health check: < 5ms p95

Monitoring metrics:

  • Neo4j connection pool utilization
  • Query execution times by type
  • Error rates by operation
  • Database size and growth rate
  • Memory usage on Neo4j instances

Alerts:

  • Connection pool exhaustion
  • Queries running longer than 1s
  • Error rate > 1%
  • Database disk usage > 80%
  • Unusual query patterns

The core recommendation query against this schema:

MATCH (u:User {user_id:$user_id})
MATCH (u)-[bp:PURCHASED]->(p:Product)
WHERE bp.last >= datetime() - duration({days:$lookback_days})
MATCH (p)-[s:SIMILAR_TO]->(cand:Product)
MATCH (u)-[e:ELIGIBLE]->(o:Offer)-[:APPLIES_TO]->(cand)
WHERE e.start <= $as_of
AND o.start <= $as_of AND $as_of <= o.end
OPTIONAL MATCH (u)-[rp:PURCHASED]->(cand)
WITH p, cand, s, o, rp
WHERE rp IS NULL OR rp.last < datetime() - duration({days:$exclude_days})
WITH p, cand, s, o
ORDER BY p.product_id, s.score DESC
WITH p, collect({cand:cand, score:s.score, o:o})[..$per_seed_cap] AS perSeed
UNWIND perSeed AS row
WITH row.cand AS cand, row.score AS sim_score, row.o AS o
WITH cand, o, sum(sim_score) AS agg_sim
WITH cand, o, toInteger(agg_sim * (log(1 + o.points)/log(2)) * 100) AS rank
ORDER BY rank DESC, o.priority DESC, o.end ASC
RETURN cand.product_id AS product_id,
cand.name AS name,
o.offer_id AS offer_id,
o.points AS points,
o.end AS offer_end,
rank
LIMIT $limitN

Query Parameters:

  • user_id: Target user ID
  • as_of: Evaluation date as a Cypher Date, not a DateTime (defaults to today)
  • lookback_days: 120 (default)
  • exclude_days: 14 (default)
  • per_seed_cap: 80 (default)
  • limitN: 20 (default)

Offer eligibility (ELIGIBLE):

  • Hydrate ELIGIBLE relationships hourly from the source system
  • Only store active or upcoming offers
  • Delete expired edges where offer.end < today

Similarity graph (SIMILAR_TO):

  • Rebuild nightly using Neo4j GDS
  • Use topK to bound fan-out per product and similarityCutoff to drop weak edges; per_seed_cap in the query adds a second bound at read time
  • Consider separate relationship types for strong/weak edges

Data retention:

  • Periodically delete expired ELIGIBLE edges
  • Archive old PURCHASED relationships
  • Monitor and manage database size

Scaling:

  • Start with vertical scaling (increase instance size)
  • Use read replicas for query load distribution
  • Consider sharding by user cohorts if needed
  • Monitor query patterns and optimize hot paths

Success criteria:

Client and configuration:

  • Neo4j client connects successfully
  • Health check endpoint returns connection status
  • Configuration loads from environment-specific files
  • Unit tests pass with >80% coverage

Repositories and schema:

  • All repository CRUD operations work
  • Schema initializes correctly
  • Indexes and constraints are created
  • Repository tests pass

Recommendation engine:

  • Recommendation query executes successfully
  • Query performance meets <100ms target
  • Integration tests pass with real Neo4j
  • Metrics are recorded correctly

API:

  • All API endpoints are functional
  • Request validation works
  • Error handling is comprehensive
  • API tests pass

Production readiness:

  • All tests pass (unit, integration, performance)
  • Monitoring and alerts configured
  • Documentation complete
  • Deployed to stage and tested
  • Runbooks created
  • Security review complete