Enrichment Persistence Pipeline
BrightData web search enrichment results flow from CCS through a CDC pipeline to Snowflake for downstream analytics. This document describes where the data lives at each stage and how to inspect it.
Pipeline Overview
```
CCS (SearchEnricher)
  │
  ├─ Valkey (read/write cache, 30d TTL)
  │
  └─ DynamoDB (write-only, 90d TTL)
       │
       └─ DynamoDB Streams (NEW_AND_OLD_IMAGES)
            │
            └─ MSK Kafka (CDC cluster)
                 │
                 └─ S3 (intermediate storage)
                      │
                      └─ Snowpipe → Snowflake
```

What Gets Stored Where
Not every enrichment result reaches DynamoDB/Snowflake. The persistence layer is intentionally narrow: only BrightData results that matched a Button partner are written to DynamoDB for CDC. The Valkey cache is broader and is the layer responsible for avoiding repeat BrightData API spend.
| Scenario | Valkey | DynamoDB | Snowflake |
|---|---|---|---|
| BD search → product found → Button partner matched | Yes (30d) | Yes (90d) | Yes |
| BD search → product found → no Button partner | Yes, negative (7d) | No | No |
| BD search → no product match | Yes, negative (7d) | No | No |
| FIDORA resolved (no BD search needed) | No | No | No |
Why only Button-matched results in DynamoDB? These are the products CCS can monetize via points-per-dollar (PPD). The no-partner results don’t carry a revenue signal, so they add volume without analytical value. FIDORA-resolved products already exist in upstream systems (FIDORA, FPS) and don’t need a separate persistence path.
Why cache negative results in Valkey? This is the cost control mechanism. At steady state ~97% of lookups are cache hits because the same products recur across users. Both positive and negative results are cached, so a product that had no Button partner won’t trigger another BrightData API call for 7 days. This caching is what brings BrightData (BD) spend down from ~$3K/mo to ~$200/mo at current scale.
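The scenario table above can be read as a small decision function. The following is a minimal Python sketch of that mapping, not the CCS implementation: the names `Outcome`, `StoragePlan`, and `plan_storage` are hypothetical, and only the TTLs and yes/no values come from the table.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional

DAY_SECONDS = 24 * 60 * 60


class Outcome(Enum):
    """The four scenarios from the table (names are illustrative)."""
    PARTNER_MATCHED = "bd_product_found_partner_matched"
    NO_PARTNER = "bd_product_found_no_partner"
    NO_PRODUCT = "bd_no_product_match"
    FIDORA_RESOLVED = "fidora_resolved"


@dataclass(frozen=True)
class StoragePlan:
    valkey_ttl_seconds: Optional[int]  # None means the result is not cached in Valkey
    valkey_negative: bool              # True when the Valkey entry is a negative result
    write_dynamodb: bool               # DynamoDB rows are what reach Snowflake via CDC


def plan_storage(outcome: Outcome) -> StoragePlan:
    """Return where a result is stored, following the scenario table."""
    if outcome is Outcome.PARTNER_MATCHED:
        return StoragePlan(30 * DAY_SECONDS, False, True)
    if outcome in (Outcome.NO_PARTNER, Outcome.NO_PRODUCT):
        return StoragePlan(7 * DAY_SECONDS, True, False)
    # FIDORA-resolved products skip BrightData and are stored nowhere here.
    return StoragePlan(None, False, False)
```

Because Snowflake is fed only by DynamoDB, `write_dynamodb` also answers whether a result is expected to appear downstream.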
1. DynamoDB
Table: {env}-product-enrichment-cache
| Field | Type | Description |
|---|---|---|
| fido_id | String (PK) | FIDO product UUID |
| product_name | String | Product name (empty; stored in FPS) |
| brand | String | Brand (empty; stored in FPS) |
| merchant_name | String | Matched Button partner name (e.g. “Walmart”, “ULTA”) |
| merchant_id | String | Button merchant org ID |
| is_on_fetch_shop | Boolean | Always false (set at request time, not persist time) |
| ppd_points | Number | Points per dollar (computed at request time) |
| ppd_type | String | PPD type (e.g. “PPD”) |
| price_cents | Number | Product price in cents from web search |
| image_url | String | Product image URL from search result |
| product_url | String | Affiliate link to merchant |
| merchant_logo | String | Merchant logo URL |
| source | String | Always "brightdata" |
| createdAtMs | Number | First write timestamp (epoch ms) |
| updatedAtMs | Number | Latest write timestamp (epoch ms) |
| expiresAt | Number | DynamoDB TTL (epoch seconds, 90 days from write) |
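The three time fields use two different units, which is easy to get wrong: `createdAtMs` and `updatedAtMs` are epoch milliseconds, while `expiresAt` is epoch seconds, the unit DynamoDB TTL expects. The sketch below shows one way those fields could be derived. The helper `build_item` and its parameters are hypothetical, it fills only a subset of the fields, and how CCS actually preserves `createdAtMs` across rewrites is not described here, so the caller passes it in.

```python
import time
from typing import Optional

TTL_DAYS = 90
SECONDS_PER_DAY = 86_400


def build_item(
    fido_id: str,
    merchant_name: str,
    merchant_id: str,
    price_cents: int,
    now_ms: Optional[int] = None,
    created_at_ms: Optional[int] = None,
) -> dict:
    """Build a partial item with consistent timestamp and TTL fields."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    return {
        "fido_id": fido_id,
        "merchant_name": merchant_name,
        "merchant_id": merchant_id,
        "price_cents": price_cents,
        "source": "brightdata",
        # Assumption: keep the original creation time if the caller knows it.
        "createdAtMs": created_at_ms if created_at_ms is not None else now_ms,
        "updatedAtMs": now_ms,
        # TTL is in epoch seconds, 90 days after this write.
        "expiresAt": now_ms // 1000 + TTL_DAYS * SECONDS_PER_DAY,
    }
```

When inspecting items, an `expiresAt` value with 13 digits instead of 10 would suggest milliseconds were written by mistake, in which case the TTL would not fire as intended.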
Inspect via CLI:
```shell
# Count items
aws dynamodb scan \
  --table-name prod-product-enrichment-cache \
  --region us-east-1 --profile prod-services \
  --select COUNT

# Sample items
aws dynamodb scan \
  --table-name prod-product-enrichment-cache \
  --region us-east-1 --profile prod-services \
  --max-items 10 \
  --query 'Items[*].{fido_id: fido_id.S, merchant: merchant_name.S, price: price_cents.N, source: source.S}'

# Lookup specific FIDO
aws dynamodb get-item \
  --table-name prod-product-enrichment-cache \
  --region us-east-1 --profile prod-services \
  --key '{"fido_id": {"S": "<FIDO_UUID>"}}'
```

2. Kafka (CDC)
Cluster: {env}-cdc (Fetch shared MSK cluster)
Topic: Auto-created by FSD’s data pipeline connector. Named based on the service and table:
{env}-cdc-consumer-context-service-product-enrichment-cache-cdc
The CDC connector runs as two ECS tasks managed by FSD:
- Source (cdc-source-dynamodb-mkc): Reads DynamoDB Streams, publishes to Kafka
- Sink (cdc-sink-s3-mkc): Reads Kafka, writes to S3
Both use JSON key/value format. Tombstone messages are enabled for deletes.
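To illustrate what tombstones mean for a consumer of this topic, here is a small Python sketch that replays key/value messages into a current-state map. It relies on the usual Kafka convention that a tombstone is a message with a key and a null value. The exact JSON shape produced by the connector is not documented here, so the key layout `{"fido_id": ...}` and the flat value object are assumptions, and `apply_cdc_messages` is a hypothetical helper.

```python
import json
from typing import Dict, Iterable, Optional, Tuple


def apply_cdc_messages(
    messages: Iterable[Tuple[str, Optional[str]]],
) -> Dict[str, dict]:
    """Replay (key, value) JSON messages in order and return the resulting state."""
    state: Dict[str, dict] = {}
    for raw_key, raw_value in messages:
        fido_id = json.loads(raw_key)["fido_id"]  # assumed key shape
        if raw_value is None:
            # Tombstone: the item was deleted (for example, by TTL expiry).
            state.pop(fido_id, None)
        else:
            state[fido_id] = json.loads(raw_value)
    return state
```

For example, an upsert for a FIDO followed by a tombstone for the same key leaves no entry for that FIDO in the returned map.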
3. Snowflake
Schema: BRIGHTDATA_PRODUCT_ENRICHMENT
Table: PRODUCT_ENRICHMENT_CACHE_CURRENT_STATE_FSD (auto-created by Snowpipe)
Merge cadence: 60 minutes
Deduplication key: fido_id
Timestamps: createdAtMs (create), updatedAtMs (update)
Historical data load is enabled, so existing DynamoDB items are synced on first pipeline setup.
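The combination of a deduplication key and an update timestamp implies a "latest row per key" view. The Python sketch below shows that idea on plain dictionaries. It is only a conceptual model of the settings above, not how FSD or Snowflake performs the merge, and `latest_per_key` is a hypothetical helper.

```python
from typing import Dict, Iterable, List


def latest_per_key(
    rows: Iterable[dict],
    key: str = "fido_id",
    updated: str = "updatedAtMs",
) -> List[dict]:
    """Keep one row per key, preferring the highest update timestamp."""
    latest: Dict[str, dict] = {}
    for row in rows:
        current = latest.get(row[key])
        # On equal timestamps the later row in the input wins.
        if current is None or row[updated] >= current[updated]:
            latest[row[key]] = row
    return list(latest.values())
```

With a 60-minute merge cadence, several writes to the same `fido_id` inside one window would be expected to collapse to a single current-state row.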
Inspect via Snowflake:
```sql
-- Row count
SELECT COUNT(*)
FROM BRIGHTDATA_PRODUCT_ENRICHMENT.PRODUCT_ENRICHMENT_CACHE_CURRENT_STATE_FSD;

-- Recent writes
SELECT fido_id, merchant_name, price_cents, source, updatedAtMs
FROM BRIGHTDATA_PRODUCT_ENRICHMENT.PRODUCT_ENRICHMENT_CACHE_CURRENT_STATE_FSD
ORDER BY updatedAtMs DESC
LIMIT 20;

-- Merchant distribution
SELECT merchant_name, COUNT(*) as cnt
FROM BRIGHTDATA_PRODUCT_ENRICHMENT.PRODUCT_ENRICHMENT_CACHE_CURRENT_STATE_FSD
GROUP BY merchant_name
ORDER BY cnt DESC;
```

FSD Configuration
The pipeline is defined in consumer-context-service.yml under the DynamoDB table dependency:
```yaml
- type: aws_dynamodb_table
  table_name: "{{env}}-product-enrichment-cache"
  stream_specification:
    stream_enabled: true
    stream_view_type: NEW_AND_OLD_IMAGES
  data_pipeline:
    enabled: true
    migrate: true
    micro: true
    size: medium
    handle_tombstones: true
    key_format: json
    value_format: json
    snowflake:
      enabled: true
      load_historical_data: true
      schema_name: "BRIGHTDATA_PRODUCT_ENRICHMENT"
      dedupe_key: ["fido_id"]
      timestamps:
        create: "createdAtMs"
        update: "updatedAtMs"
      merge_cadence: "60 MINUTE"
```

Requires a fetch_msk_cluster dependency on the cdc cluster declared before the table. The deploy workflow must use a VPC-connected runner (k8s-linux-amd64-use1) since the Terraform Kafka provider needs to reach MSK brokers on private IPs.
Write Path in CCS
The SearchEnricher writes to DynamoDB via a bounded worker pool (20 workers, 1024-item buffer). Writes are async and best-effort: failures are logged but never block the enrichment response. Only positive BrightData matches are written; negative “no partner found” results are cached in Valkey only.
```
EnrichFromSearch()
  → doSearch() (BrightData + Button partner matching)
  → writeToValkeyAndDynamo()
      ├─ Valkey: put/putNegative (sync, both positive and negative)
      └─ DynamoDB: send to worker pool channel (async, positive only)
           → dynamoWriteWorker()
                → EnrichmentStore.PutBatch()
```
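The bounded, best-effort write pattern described above can be sketched in Python with a fixed-size queue and a set of worker threads. This is an illustration of the pattern only, not the CCS code: the class `BestEffortWriter` is hypothetical, it writes items one at a time rather than batching as `PutBatch` suggests, and dropping writes when the buffer is full is an assumption, since the behavior on a full buffer is not stated here.

```python
import logging
import queue
import threading
from typing import Callable

logger = logging.getLogger("enrichment")


class BestEffortWriter:
    """Bounded worker pool whose submit() never blocks the caller."""

    def __init__(self, write: Callable[[dict], None],
                 workers: int = 20, buffer_size: int = 1024) -> None:
        self._write = write
        self._queue: queue.Queue = queue.Queue(maxsize=buffer_size)
        self._threads = [
            threading.Thread(target=self._run, daemon=True)
            for _ in range(workers)
        ]
        for thread in self._threads:
            thread.start()

    def submit(self, item: dict) -> bool:
        """Enqueue a write; return False (and log) if the buffer is full."""
        try:
            self._queue.put_nowait(item)
            return True
        except queue.Full:
            # Assumption: a full buffer drops the write instead of waiting.
            logger.warning("buffer full, dropping write for %s", item.get("fido_id"))
            return False

    def _run(self) -> None:
        while True:
            item = self._queue.get()
            try:
                if item is None:  # sentinel: shut this worker down
                    return
                self._write(item)
            except Exception:
                # Failures are logged and never propagate to the request path.
                logger.exception("enrichment write failed")
            finally:
                self._queue.task_done()

    def close(self) -> None:
        """Drain queued writes, then stop all workers."""
        for _ in self._threads:
            self._queue.put(None)
        for thread in self._threads:
            thread.join()
```

The request path only ever calls `submit()`, so a slow or failing store affects the completeness of persisted data but not the latency of the enrichment response.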