
Enrichment Persistence Pipeline

BrightData web search enrichment results flow from CCS through a CDC pipeline to Snowflake for downstream analytics. This document describes where the data lives at each stage and how to inspect it.

CCS (SearchEnricher)
├─ Valkey (read/write cache, 30d TTL)
└─ DynamoDB (write-only, 90d TTL)
   └─ DynamoDB Streams (NEW_AND_OLD_IMAGES)
      └─ MSK Kafka (CDC cluster)
         └─ S3 (intermediate storage)
            └─ Snowpipe → Snowflake

Not every enrichment result reaches DynamoDB/Snowflake. The persistence layer is intentionally narrow — only BrightData results that matched a Button partner are written to DynamoDB for CDC. The Valkey cache is broader and is the layer responsible for avoiding repeat BrightData API spend.

| Scenario | Valkey | DynamoDB | Snowflake |
|---|---|---|---|
| BD search → product found → Button partner matched | Yes (30d) | Yes (90d) | Yes |
| BD search → product found → no Button partner | Yes, negative (7d) | No | No |
| BD search → no product match | Yes, negative (7d) | No | No |
| FIDORA resolved (no BD search needed) | No | No | No |

Why only Button-matched results in DynamoDB? These are the products CCS can monetize via points-per-dollar (PPD). The no-partner results don’t carry a revenue signal, so they add volume without analytical value. FIDORA-resolved products already exist in upstream systems (FIDORA, FPS) and don’t need a separate persistence path.

Why cache negative results in Valkey? This is the cost control mechanism. At steady state, ~97% of lookups are cache hits because the same products recur across users. Both positive and negative results are cached, so a product with no Button partner (or no product match) won't trigger another BrightData API call for 7 days. This is what brings BD spend down from ~$3K/mo to ~$200/mo at current scale.

DynamoDB

Table: {env}-product-enrichment-cache

| Field | Type | Description |
|---|---|---|
| fido_id | String (PK) | FIDO product UUID |
| product_name | String | Product name (empty; stored in FPS) |
| brand | String | Brand (empty; stored in FPS) |
| merchant_name | String | Matched Button partner name (e.g. “Walmart”, “ULTA”) |
| merchant_id | String | Button merchant org ID |
| is_on_fetch_shop | Boolean | Always false (set at request time, not persist time) |
| ppd_points | Number | Points per dollar (computed at request time) |
| ppd_type | String | PPD type (e.g. “PPD”) |
| price_cents | Number | Product price in cents from web search |
| image_url | String | Product image URL from search result |
| product_url | String | Affiliate link to merchant |
| merchant_logo | String | Merchant logo URL |
| source | String | Always "brightdata" |
| createdAtMs | Number | First write timestamp (epoch ms) |
| updatedAtMs | Number | Latest write timestamp (epoch ms) |
| expiresAt | Number | DynamoDB TTL (epoch seconds, 90 days from write) |

Inspect via CLI:

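# Approximate item count (DynamoDB refreshes ItemCount roughly every six hours)
aws dynamodb describe-table \
  --table-name prod-product-enrichment-cache \
  --region us-east-1 --profile prod-services \
  --query 'Table.ItemCount'

# Exact item count (full table scan; consumes read capacity for every item)
aws dynamodb scan \
  --table-name prod-product-enrichment-cache \
  --region us-east-1 --profile prod-services \
  --select COUNT

# Sample items
aws dynamodb scan \
  --table-name prod-product-enrichment-cache \
  --region us-east-1 --profile prod-services \
  --max-items 10 \
  --query 'Items[*].{fido_id: fido_id.S, merchant: merchant_name.S, price: price_cents.N, source: source.S}'

# Look up a specific FIDO
aws dynamodb get-item \
  --table-name prod-product-enrichment-cache \
  --region us-east-1 --profile prod-services \
  --key '{"fido_id": {"S": "<FIDO_UUID>"}}'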

Kafka (MSK)

Cluster: {env}-cdc (Fetch shared MSK cluster)

Topic: Auto-created by FSD’s data pipeline connector. Named based on the service and table: {env}-cdc-consumer-context-service-product-enrichment-cache-cdc

The CDC connector runs as two ECS tasks managed by FSD:

  • Source (cdc-source-dynamodb-mkc): Reads DynamoDB Streams, publishes to Kafka
  • Sink (cdc-sink-s3-mkc): Reads Kafka, writes to S3

Both use JSON key/value format. Tombstone messages are enabled for deletes.
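Consumers reading the topic directly need to distinguish upserts from tombstones. The sketch below makes two assumptions: the JSON key carries `fido_id`, and the JSON value is a flat map of item attributes. The connector's actual envelope may differ, so check a few real messages before relying on it. `Change` and `decodeCDC` are hypothetical.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Change is a decoded CDC message (assumed layout).
type Change struct {
	FidoID string
	Delete bool           // true for a tombstone (null value)
	Image  map[string]any // item attributes for upserts; nil for deletes
}

// decodeCDC assumes the key is {"fido_id": "..."} and the value is the
// item's attributes as a JSON object.
func decodeCDC(key, value []byte) (Change, error) {
	var k struct {
		FidoID string `json:"fido_id"`
	}
	if err := json.Unmarshal(key, &k); err != nil {
		return Change{}, fmt.Errorf("decode key: %w", err)
	}
	if k.FidoID == "" {
		return Change{}, errors.New("key has no fido_id")
	}
	// A Kafka tombstone has a null value (a nil slice here): treat it as a delete.
	if value == nil {
		return Change{FidoID: k.FidoID, Delete: true}, nil
	}
	var img map[string]any
	if err := json.Unmarshal(value, &img); err != nil {
		return Change{}, fmt.Errorf("decode value: %w", err)
	}
	return Change{FidoID: k.FidoID, Image: img}, nil
}

func main() {
	key := []byte(`{"fido_id":"example-fido-id"}`)
	up, err := decodeCDC(key, []byte(`{"merchant_name":"ULTA","price_cents":1299}`))
	if err != nil {
		panic(err)
	}
	del, err := decodeCDC(key, nil)
	if err != nil {
		panic(err)
	}
	fmt.Println(up.Delete, up.Image["merchant_name"], del.Delete) // prints "false ULTA true"
}
```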

Snowflake

Schema: BRIGHTDATA_PRODUCT_ENRICHMENT

Table: PRODUCT_ENRICHMENT_CACHE_CURRENT_STATE_FSD (auto-created by Snowpipe)

Merge cadence: 60 minutes

Deduplication key: fido_id

Timestamps: createdAtMs (create), updatedAtMs (update)

Historical data load is enabled, so existing DynamoDB items are synced on first pipeline setup.
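Conceptually, each merge folds the changes that arrived since the last cadence into one row per `fido_id` and keeps the row with the newest `updatedAtMs`. The Go sketch below models that rule in memory. It is not FSD's Snowflake merge. `Row` and `mergeBatch` are hypothetical, and the tie-break (keep the existing row) is an assumption.

```go
package main

import (
	"fmt"
	"sort"
)

// Row is a trimmed change row with only the fields the merge rule needs.
type Row struct {
	FidoID       string
	MerchantName string
	UpdatedAtMs  int64
}

// mergeBatch folds a batch into the current state: one row per fido_id
// (the dedupe key), and the newest updatedAtMs wins.
func mergeBatch(current map[string]Row, batch []Row) {
	for _, r := range batch {
		if cur, ok := current[r.FidoID]; ok && cur.UpdatedAtMs >= r.UpdatedAtMs {
			continue // older or equal change; keep what is already there
		}
		current[r.FidoID] = r
	}
}

func main() {
	state := map[string]Row{}
	mergeBatch(state, []Row{
		{"fido-a", "Walmart", 1000},
		{"fido-a", "ULTA", 2000}, // newer update for the same fido_id
		{"fido-b", "Walmart", 1500},
		{"fido-a", "Walmart", 1200}, // late-arriving older change is ignored
	})
	ids := make([]string, 0, len(state))
	for id := range state {
		ids = append(ids, id)
	}
	sort.Strings(ids)
	for _, id := range ids {
		fmt.Println(id, state[id].MerchantName) // fido-a ULTA, then fido-b Walmart
	}
}
```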

Inspect via Snowflake:

-- Row count
SELECT COUNT(*) FROM BRIGHTDATA_PRODUCT_ENRICHMENT.PRODUCT_ENRICHMENT_CACHE_CURRENT_STATE_FSD;
-- Recent writes
SELECT fido_id, merchant_name, price_cents, source, updatedAtMs
FROM BRIGHTDATA_PRODUCT_ENRICHMENT.PRODUCT_ENRICHMENT_CACHE_CURRENT_STATE_FSD
ORDER BY updatedAtMs DESC
LIMIT 20;
-- Merchant distribution
SELECT merchant_name, COUNT(*) as cnt
FROM BRIGHTDATA_PRODUCT_ENRICHMENT.PRODUCT_ENRICHMENT_CACHE_CURRENT_STATE_FSD
GROUP BY merchant_name
ORDER BY cnt DESC;

The pipeline is defined in consumer-context-service.yml under the DynamoDB table dependency:

- type: aws_dynamodb_table
  table_name: "{{env}}-product-enrichment-cache"
  stream_specification:
    stream_enabled: true
    stream_view_type: NEW_AND_OLD_IMAGES
  data_pipeline:
    enabled: true
    migrate: true
    micro: true
    size: medium
    handle_tombstones: true
    key_format: json
    value_format: json
    snowflake:
      enabled: true
      load_historical_data: true
      schema_name: "BRIGHTDATA_PRODUCT_ENRICHMENT"
      dedupe_key: ["fido_id"]
      timestamps:
        create: "createdAtMs"
        update: "updatedAtMs"
      merge_cadence: "60 MINUTE"

The table entry requires a fetch_msk_cluster dependency on the cdc cluster, declared before the table. The deploy workflow must use a VPC-connected runner (k8s-linux-amd64-use1) because the Terraform Kafka provider needs to reach the MSK brokers on their private IPs.

The SearchEnricher writes to DynamoDB through a bounded worker pool (20 workers, 1024-item buffer). Writes are async and best-effort: failures are logged but never block the enrichment response. Only positive BrightData matches (products matched to a Button partner) are written. Negative results (no product match, or no Button partner found) are cached in Valkey only.

EnrichFromSearch()
  → doSearch()  (BrightData + Button partner matching)
  → writeToValkeyAndDynamo()
      ├─ Valkey: put/putNegative (sync, both positive and negative)
      └─ DynamoDB: send to worker pool channel (async, positive only)
            → dynamoWriteWorker() → EnrichmentStore.PutBatch()