Experiment Enrollment API — Design Document
Overview
The experiment team will randomly assign user IDs to the AI assistant experiment (rather than us pre-selecting 200K users). The mobile app sends each assigned user’s ID to a new backend API endpoint. The backend detects first-time users via Valkey, triggers a backfill of their purchase history, and enables real-time Kafka ingestion, all automatically on first contact.
Service Architecture
The enrollment endpoint lives on consumer-graph-worker and is routed through the KrakenD mobile gateway, the same gateway that handles all mobile app traffic. KrakenD provides token validation and injects standard auth headers, including the authenticated `fetch-auth-userid`.

          POST /consumer-graph-worker/enroll
    ┌──────────┐           ┌──────────────┐           ┌──────────────────────┐
    │  Mobile  │──────────▶│   KrakenD    │──────────▶│   consumer-graph-    │
    │   App    │◀──────────│ (mobile gw)  │◀──────────│       worker         │
    └──────────┘           └──────────────┘           └──────────┬───────────┘
                            Auth validated                       │
                            Headers injected:            ┌───────┼──────────┐
                            • fetch-auth-userid          │       │          │
                            • fetch-auth-role            ▼       ▼          ▼
                            • fetch-auth-token     ┌────────┐ ┌─────┐ ┌────────┐
                                                   │ Valkey │ │ SQS │ │Feature │
                                                   │(enroll)│ │     │ │Flipper │
                                                   └────────┘ └──┬──┘ └────────┘
                                                                 │
                                                                 ▼
                                                          ┌──────────────┐
                                                          │ Purchase Hx  │
                                                          │ API → Neo4j  │
                                                          └──────────────┘

Why KrakenD instead of routing through rover-agent:
- Auth built-in. KrakenD validates the mobile app’s access token and injects `fetch-auth-userid`, the authenticated user ID. The enrollment handler reads the user ID from this trusted header rather than a URL path parameter, ensuring the caller is authorized to enroll that user.
- Standard headers. KrakenD injects `fetch-auth-role`, `fetch-auth-scopes`, and `fetch-auth-token` automatically. No custom auth logic is needed in consumer-graph-worker.
- All mobile traffic already goes through KrakenD. Adding a route is a KrakenD config change; no code changes are needed in rover-agent at all.
- consumer-graph-worker already has Valkey, SQS, the backfill orchestrator, and the `/api/v1/users/{user_id}/load` endpoint. The enrollment handler is a thin addition that reuses all existing infrastructure.
What changes
| Component | Change | Scope |
|---|---|---|
| KrakenD mobile gateway | Add route config for /consumer-graph-worker/enroll → consumer-graph-worker backend | Config change |
| consumer-graph-worker | Add handleEnroll handler, three Valkey methods, Kafka ingestion gate, OTel metrics | ~260 lines (incl. tests) |
Step-by-step flow for a request:
1. Mobile app sends `POST /consumer-graph-worker/enroll` with its access token (`Authorization: Bearer <token>`) on every session start (or whenever the experiment is active for that user).
2. KrakenD validates the access token, extracts the user identity, and forwards the request to consumer-graph-worker with injected headers:
   - `fetch-auth-userid`: the authenticated Fetch user ID
   - `fetch-auth-role`: user role
   - `fetch-auth-token`: service-to-service auth token
3. consumer-graph-worker reads `user_id` from the `fetch-auth-userid` header and checks Valkey key `experiment:enrolled:{user_id}`:
   - Key exists → User already enrolled. Return `200 { "status": "existing", "backfill_triggered": false }` immediately. This is the fast path (~1ms).
   - Key does not exist → New user. Continue to step 4.
4. Set Valkey key `experiment:enrolled:{user_id}` with no TTL (permanent for the duration of the experiment). Use `SetNX` to handle race conditions from concurrent requests for the same user. Setting the key first ensures the Kafka ingestion gate is active before returning success.
5. Send SQS backfill message to `{env}-consumer-graph-queue` to load 365 days of purchase history:

       {
         "event_type": "user_load_request",
         "user_id": "<user_id>",
         "reason": "experiment_enrollment",
         "requested_at": "<RFC3339>",
         "lookback_days": 365,
         "request_id": "enroll-<uuid>",
         "priority": "normal"
       }

   The existing SQS worker picks this up, calls the Purchase History API, and writes the full graph to Neo4j. The `loaded_users:{user_id}` key (7-day TTL) prevents duplicate backfills.

   On SQS failure: The enrollment key is rolled back (`DeleteExperimentEnrolled`) so the client can retry cleanly. If the rollback itself fails, the user is in a “stuck” state (enrolled but no backfill); this is tracked via the `consumer_graph.enroll.total{status=error_rollback_failed}` OTel metric for alerting.
6. consumer-graph-worker returns the response through KrakenD to mobile: `200 { "status": "enrolled", "backfill_triggered": true }`.
7. Kafka ingestion begins automatically. The Kafka handlers (`handler.go`, `receipt_handler.go`) are modified to check the `experiment:enrolled:{user_id}` Valkey key as an alternative gate alongside the existing Feature Flipper check. Once the key is set in step 4, all future receipt/factoid events for this user flow through.
API Specification
POST /consumer-graph-worker/enroll
Request: No body required. User ID is extracted from the `fetch-auth-userid` header injected by KrakenD (not from the URL path).
Response:
| Scenario | Status | Body |
|---|---|---|
| New user, backfill triggered | 200 | { "status": "enrolled", "backfill_triggered": true } |
| Already enrolled | 200 | { "status": "existing", "backfill_triggered": false } |
| Missing fetch-auth-userid header | 400 | { "error": "Bad Request", "message": "user_id is required" } |
| Unauthenticated (no/invalid token) | 401 | Rejected by KrakenD before reaching consumer-graph-worker |
| Internal error | 500 | { "error": "Internal Server Error", "message": "..." } |
Idempotency: The endpoint is fully idempotent. Calling it multiple times for the same user has no side effects beyond the first call. The SetNX on the Valkey key and the existing loaded_users:{user_id} check in the SQS worker both guard against duplicates.
Latency:
- Existing user (cache hit): ~1-2ms
- New user (cache miss + SQS send): ~10-50ms (backfill runs asynchronously)
Valkey Key Design
| Key | Value | TTL | Purpose |
|---|---|---|---|
| `experiment:enrolled:{user_id}` | `<RFC3339 timestamp>` | None (permanent) | Track enrolled users; fast dedup on every mobile request |
| `loaded_users:{user_id}` | `<RFC3339 timestamp>` | 7 days | (Existing) Prevent duplicate backfills within 7-day window |
| `idempotency:enroll-{uuid}` | `"1"` | 24 hours | (Existing) SQS message dedup |
Why a separate experiment:enrolled key instead of reusing loaded_users?
- `loaded_users` has a 7-day TTL. After 7 days, a returning user would trigger another backfill unnecessarily.
- `experiment:enrolled` is permanent: once enrolled, always enrolled. The mobile app can call the endpoint on every session without triggering redundant work.
Kafka Ingestion Gate
The `consumer_graph_ingestion` Feature Flipper flag currently gates which users’ Kafka messages get processed. Since the experiment team assigns users dynamically (we don’t have the full list upfront), we can’t pre-load them into the flag. Instead, we use the Valkey enrollment key as an alternative gate.
Current gating logic (Kafka handlers)

    // internal/kafka/handler.go, line ~66
    if !featureflag.IsIngestionEnabled(userID) {
        // skip processing
    }

New gating logic
Both `FactoidHandler` and `ReceiptHandler` use an `isIngestionAllowed` method:

    func (h *FactoidHandler) isIngestionAllowed(ctx context.Context, userID string) bool {
        if featureflag.IsIngestionEnabled(userID) {
            return true
        }
        // Fallback: check experiment enrollment in Valkey
        if h.cache != nil {
            enrolled, err := h.cache.IsExperimentEnrolled(ctx, userID)
            if err != nil {
                // Fail closed: skip event rather than processing all users during outage
                return false
            }
            return enrolled
        }
        return false
    }

Why this approach over Feature Flipper mutation:
- No FF API dependency at enrollment time. Feature Flipper’s REST API would require auth, and flag changes propagate via a Kafka topic (`feature-flipper-flags`) with eventual consistency, so there is a delay before Kafka handlers see the update.
- Valkey is instant. The enrollment handler sets the key; the very next Kafka message for that user will pass the gate.
- No external side effects. We don’t mutate a shared Feature Flipper flag that other services may depend on.
- Clean rollback. Delete the Valkey keys and Kafka ingestion for experiment users stops immediately, without touching the FF flag.
Performance: The Valkey check runs only for users who fail the Feature Flipper check (i.e., everyone except internal employees and other users already in the flag). That covers events from non-experiment users, which incur the lookup before being skipped, as well as the ~200K experiment users. Each lookup adds ~1ms per Kafka message. The Kafka handlers already do a Valkey idempotency check, so this is not a new dependency, just one additional key lookup on the same connection.
What Changes in Code
KrakenD mobile gateway (config only)
Add a route entry for `POST /consumer-graph-worker/enroll` that forwards to the consumer-graph-worker backend. This is a JSON config change in the KrakenD gateway configuration; no code changes are needed in rover-agent.
Config file: krakend/krakend_template.json in the bitbucket/krakend repo (branch: feature/PLT-458-experiment-enrollment).
consumer-graph-worker (enrollment logic)
New/modified code:
| File | Description |
|---|---|
| `cmd/unified-worker/server.go` | Add handleEnroll handler registered at /consumer-graph-worker/enroll, with SetNX→SQS→rollback flow |
| `cmd/unified-worker/main.go` | Create SQS client, pass to newHTTPServer along with MetricManager |
| `cmd/unified-worker/kafka_consumer.go` | Pass ValkeyClient to Kafka handler constructors |
| `cmd/kafka-consumer/main.go` | Pass valkeyClient to both Kafka handler constructors (standalone consumer) |
| `internal/cache/valkey.go` | Add IsExperimentEnrolled, SetExperimentEnrolled, DeleteExperimentEnrolled methods + NewClientFromRedis (test helper) |
| `internal/kafka/handler.go` | Add cache field, isIngestionAllowed method with FF + Valkey fallback (fail-closed) |
| `internal/kafka/receipt_handler.go` | Same pattern as handler.go: add isIngestionAllowed with enrollment check |
| `internal/metrics/metrics.go` | Add RecordEnrollment counter (consumer_graph.enroll.total) with status attribute |
Test files:
| File | Description |
|---|---|
| `cmd/unified-worker/enroll_test.go` | 7 test cases: success, already enrolled, concurrent SetNX, SQS failure + rollback, double failure, missing user ID, wrong method |
| `internal/cache/valkey_experiment_test.go` | 7 test cases: IsExperimentEnrolled, SetExperimentEnrolled, DeleteExperimentEnrolled, set-delete-set cycle |
| `internal/kafka/handler_test.go` | Updated constructor calls to pass nil cache |
| `internal/kafka/receipt_handler_test.go` | Updated constructor calls to pass nil cache |
Existing code (no changes needed):
| Component | Why it just works |
|---|---|
| SQS consumer + Orchestrator | Already handles user_load_request events. The reason: "experiment_enrollment" is informational only. |
| Purchase History API client | Already rate-limited (10 req/s) with retry + exponential backoff for 429s. |
| Neo4j writer | Dedup-aware Cypher (receipt_id IN check) handles backfill + Kafka overlap safely. |
| `loaded_users` cache | 7-day TTL prevents the Kafka path from re-processing a user who was just backfilled. |
Total scope
~260 lines of new Go code (including tests) across 12 files in consumer-graph-worker + a KrakenD config change. One new test dependency: `github.com/alicebob/miniredis/v2` for in-memory Redis in tests.
Rate Considerations
The mobile app will call the endpoint on session start. Expected traffic pattern:
| Phase | Enrollments/sec | Notes |
|---|---|---|
| Ramp-up (first few days) | 10-50 new users/sec | Burst as experiment rolls out |
| Steady state | 0-5 new users/sec | Most requests are cache hits (existing users) |
| Total request volume | Up to hundreds/sec | Includes repeat visits from already-enrolled users |
Backfill throughput: The SQS queue + Purchase History API rate limit (10 req/s) is the bottleneck. At 10 req/s, 200K users takes ~5.5 hours to fully backfill. This is fine — the backfill runs asynchronously and the user doesn’t wait for it.
If enrollment bursts exceed SQS processing capacity: Messages queue up in SQS (4-day retention). The backfill is eventually consistent — users will have their graph populated within hours, not seconds.
Operational Sequence
1. Deploy updated consumer-graph-worker with `/consumer-graph-worker/enroll` handler + Kafka gate changes
2. Deploy KrakenD config with `/consumer-graph-worker/enroll` route
3. Verify Kafka consumer lag is zero
4. Mobile team enables experiment; app starts calling `/consumer-graph-worker/enroll` through KrakenD
5. For each new user: Valkey key set → backfill triggered via SQS → Kafka ingestion gated via Valkey

No pre-loading of user IDs into Feature Flipper is needed. The Valkey enrollment key serves as the dynamic gate for both backfill dedup and Kafka ingestion.
Monitoring
OTel metric: `consumer_graph.enroll.total` with `status` attribute:
| Status | Meaning |
|---|---|
| `enrolled` | New user successfully enrolled + backfill triggered |
| `existing` | Already enrolled (cache hit or concurrent SetNX) |
| `error_check` | Valkey GET failed |
| `error_set` | Valkey SetNX failed |
| `error_sqs` | SQS send failed, rollback succeeded (retryable) |
| `error_rollback_failed` | SQS failed AND rollback failed (stuck user; alert on this) |
Other metrics:
| Metric | How to check |
|---|---|
| Enrollment rate | consumer_graph.enroll.total{status=enrolled} in CloudWatch |
| Stuck users | consumer_graph.enroll.total{status=error_rollback_failed} > 0 — trigger alarm |
| Enrolled user count | Count of experiment:enrolled:* keys in Valkey |
| Backfill queue depth | SQS ApproximateNumberOfMessages on {env}-consumer-graph-queue |
| Backfill failures | SQS dead-letter queue / application error logs |
| Neo4j write throughput | Neo4j Bolt connection metrics / query latency |
| Kafka consumer lag | CloudWatch SumOffsetLag for both consumer groups |
Rollback
To disable the experiment:
- Immediate: Delete `experiment:enrolled:*` keys from Valkey. This instantly stops Kafka ingestion for experiment users (the Valkey gate check fails). It does not by itself stop enrollments: with the key gone, the next enroll call from the app finds no key, `SetNX` succeeds, and the user is enrolled again. Turn the experiment off in the mobile app or remove the KrakenD route before deleting the keys.
- API: The `/consumer-graph-worker/enroll` endpoint can be left in place once the app no longer calls it, or removed in a subsequent deploy. While the app still calls it, it re-creates the Valkey keys.
- Backfill: Any in-flight SQS messages will still be processed, but without the Kafka gate those users won’t receive future real-time updates.
Neo4j data does not need to be cleaned up — it’s additive and doesn’t affect non-experiment users.