AWS DynamoDB Streams and Lambda: Event-Driven Data Patterns (2026)
Every write to a DynamoDB table (every INSERT, MODIFY, and REMOVE) can trigger a Lambda function in near real time. That's the power of DynamoDB Streams: a time-ordered log in which each change to your table appears exactly once, ready to be delivered to any downstream system you choose. Delivery to Lambda is at-least-once, so handlers must be idempotent. Want to sync changes to Elasticsearch? Replicate cross-region? Build an audit trail? Invalidate a cache? DynamoDB Streams + Lambda makes all of this possible without polling, without cronjobs, and without touching your application code.
This guide goes deep. We'll cover stream view types and shard internals, every knob on the Lambda event source mapping, real production patterns with working Python and Java code, error handling that actually holds up under failure, and a frank cost analysis so you know what you're signing up for.
Table of Contents
- DynamoDB Streams Internals: View Types and Shard Architecture
- Enabling Streams: CLI, Console, and Terraform
- Lambda Event Source Mapping: Every Parameter Explained
- Writing the Lambda Handler: Python and Java
- Real-World Patterns: CDC, Replication, Audit, Cache
- Error Handling: DLQ, Partial Batch Response, Retry Strategy
- DynamoDB Kinesis Streams: When to Use It Instead
- Fan-Out Pattern: Streams → Lambda → SNS/SQS
- Performance Tuning: Shards, Parallelization, Concurrency
- Cost Analysis: Streams Pricing vs Kinesis
DynamoDB Streams Internals: View Types and Shard Architecture
DynamoDB Streams is a change data capture (CDC) mechanism that captures a time-ordered sequence of item-level modifications in a DynamoDB table. Each change record is called a stream record, and the stream is organized into shards — analogous to Kinesis shards. Understanding the internals is critical because your choice of stream view type and shard architecture directly determines how much data Lambda receives, what your Lambda can do with it, and how much you pay.
Stream View Types
When you enable a stream, you choose one of four view types. This is an immutable choice — you must disable and re-enable the stream to change it:
| View Type | What's in the Record | Size | Use When |
|---|---|---|---|
| KEYS_ONLY | Only the PK and SK of the changed item | Smallest | You only need to know which item changed (e.g., cache invalidation) |
| NEW_IMAGE | The entire item as it looks after the change | Medium | Downstream sync where you always want the latest state |
| OLD_IMAGE | The entire item as it looked before the change | Medium | Audit trails where you want to know what was overwritten |
| NEW_AND_OLD_IMAGES | Both the before and after images of the item | Largest | Diff-based sync, conflict detection, and full audit logs |
Tip: Use NEW_AND_OLD_IMAGES during development: it gives you the most flexibility and makes debugging easy. Switch to a smaller view type in production once your access patterns are clear. A REMOVE event on a NEW_IMAGE stream carries no NewImage at all (the item no longer exists), and an INSERT carries no OldImage, so always check for a missing image before using it.
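The table and the missing-image caveat can be condensed into a small lookup. This is an illustrative sketch only: `images_present` is a hypothetical helper name, not part of any AWS SDK, and it simply encodes which image keys a handler should expect for a given view type and event.

```python
from typing import Dict, List

# Images a stream record can carry for each view type (before event-specific gaps).
_VIEW_IMAGES: Dict[str, List[str]] = {
    "KEYS_ONLY": [],
    "NEW_IMAGE": ["NewImage"],
    "OLD_IMAGE": ["OldImage"],
    "NEW_AND_OLD_IMAGES": ["OldImage", "NewImage"],
}


def images_present(view_type: str, event_name: str) -> List[str]:
    """Return the image keys expected in record["dynamodb"] (hypothetical helper)."""
    images = list(_VIEW_IMAGES[view_type])
    if event_name == "INSERT" and "OldImage" in images:
        images.remove("OldImage")  # nothing existed before an insert
    if event_name == "REMOVE" and "NewImage" in images:
        images.remove("NewImage")  # nothing exists after a delete
    return images
```

A handler can use the same reasoning to decide whether a missing image is expected or a sign of a misconfigured view type.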
Shard Architecture and 24-Hour Retention
A DynamoDB Stream is automatically partitioned into shards, each of which contains a sequence of stream records. DynamoDB manages shard creation and splitting transparently — when your table's partition count grows (due to scaling), the stream gains corresponding shards. Each shard maps to one or more table partitions.
Key shard constraints you must design around:
- Within a single event source mapping, each shard is processed by one Lambda invocation at a time by default. The parallelization factor can change this (see the performance section).
- Stream records are retained for exactly 24 hours. If your Lambda is down or throttled for more than 24 hours, records are lost. Build your error handling strategy with this in mind.
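- Records within a shard are strictly ordered, and all changes to a given item appear in the order they were made. Ordering across shards (and therefore across unrelated items) is not guaranteed. Kinesis Data Streams does not add global ordering either: its records can arrive out of order or duplicated, so consumers there typically order by ApproximateCreationDateTime.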
- A table can have at most 2 simultaneous stream consumers (e.g., two Lambda event source mappings on the same stream ARN). If you need more fan-out, use the SNS/SQS pattern described later.
Stream records include metadata fields: eventName (INSERT/MODIFY/REMOVE), eventSource (aws:dynamodb), awsRegion, dynamodb.SequenceNumber, dynamodb.SizeBytes, and the image(s) depending on view type.
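To make the metadata fields concrete, here is a hand-written sample record and a small summarizer. The values (event ID, timestamps, sizes, item attributes) are invented for illustration, and `summarize` is a hypothetical helper; only the field names follow the record layout described above.

```python
# A hand-written MODIFY record; all values are made up for illustration.
SAMPLE_RECORD = {
    "eventID": "example-event-id",
    "eventName": "MODIFY",
    "eventVersion": "1.1",
    "eventSource": "aws:dynamodb",
    "awsRegion": "us-east-1",
    "dynamodb": {
        "ApproximateCreationDateTime": 1780876800,
        "Keys": {"customerId": {"S": "c-1"}, "orderId": {"S": "o-1"}},
        "OldImage": {"customerId": {"S": "c-1"}, "orderId": {"S": "o-1"},
                     "status": {"S": "PENDING"}},
        "NewImage": {"customerId": {"S": "c-1"}, "orderId": {"S": "o-1"},
                     "status": {"S": "SHIPPED"}},
        "SequenceNumber": "111",
        "SizeBytes": 59,
        "StreamViewType": "NEW_AND_OLD_IMAGES",
    },
    "eventSourceARN": "arn:aws:dynamodb:us-east-1:123456789012:table/Orders/stream/2026-06-08T00:00:00.000",
}


def summarize(record: dict) -> dict:
    """Pull out the fields a handler usually branches on (hypothetical helper)."""
    ddb = record["dynamodb"]
    return {
        "event": record["eventName"],
        "sequence": ddb["SequenceNumber"],
        "size_bytes": ddb["SizeBytes"],
        "has_old": "OldImage" in ddb,
        "has_new": "NewImage" in ddb,
        "key_names": sorted(ddb["Keys"]),
    }
```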
Enabling Streams: CLI, Console, and Terraform
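Enabling DynamoDB Streams on an existing table is a non-breaking, zero-downtime operation. The stream begins capturing changes from the moment it's enabled and does not backfill historical data. Plan for this: if you need to bootstrap downstream systems with existing data, enable the stream first, then run the full table scan, and make downstream writes idempotent so an item seen by both the scan and the stream converges to the same state. Scanning first and enabling the stream afterwards silently drops every write made in between. Start consuming the stream within the 24-hour retention window so no captured change expires.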
AWS CLI
# Enable streams on an existing table
aws dynamodb update-table \
--table-name Orders \
--stream-specification StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES \
--region us-east-1
# Get the stream ARN (you'll need this for the Lambda trigger)
aws dynamodb describe-table \
--table-name Orders \
--query 'Table.LatestStreamArn' \
--output text
# → arn:aws:dynamodb:us-east-1:123456789012:table/Orders/stream/2026-06-08T00:00:00.000
# List all stream shards
aws dynamodbstreams describe-stream \
--stream-arn arn:aws:dynamodb:us-east-1:123456789012:table/Orders/stream/2026-06-08T00:00:00.000
Terraform (complete setup)
resource "aws_dynamodb_table" "orders" {
name = "Orders"
billing_mode = "PAY_PER_REQUEST"
hash_key = "customerId"
range_key = "orderId"
attribute {
name = "customerId"
type = "S"
}
attribute {
name = "orderId"
type = "S"
}
# Enable DynamoDB Streams
stream_enabled = true
stream_view_type = "NEW_AND_OLD_IMAGES"
tags = {
Environment = "production"
Team = "platform"
}
}
# IAM role for Lambda to consume the stream
resource "aws_iam_role" "stream_processor" {
name = "dynamodb-stream-processor"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [{
Action = "sts:AssumeRole"
Effect = "Allow"
Principal = { Service = "lambda.amazonaws.com" }
}]
})
}
resource "aws_iam_role_policy" "stream_policy" {
name = "dynamodb-stream-policy"
role = aws_iam_role.stream_processor.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow"
Action = [
"dynamodb:GetRecords",
"dynamodb:GetShardIterator",
"dynamodb:DescribeStream",
"dynamodb:ListStreams"
]
Resource = aws_dynamodb_table.orders.stream_arn
},
{
Effect = "Allow"
Action = ["logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents"]
Resource = "arn:aws:logs:*:*:*"
},
{
# For DLQ on failure
Effect = "Allow"
Action = ["sqs:SendMessage"]
Resource = aws_sqs_queue.stream_dlq.arn
}
]
})
}
# Lambda function
resource "aws_lambda_function" "stream_processor" {
function_name = "orders-stream-processor"
runtime = "python3.12"
handler = "handler.lambda_handler"
role = aws_iam_role.stream_processor.arn
filename = "lambda_stream.zip"
timeout = 300
memory_size = 256
environment {
variables = {
OPENSEARCH_ENDPOINT = var.opensearch_endpoint
}
}
}
# DLQ for failed batches
resource "aws_sqs_queue" "stream_dlq" {
name = "orders-stream-dlq"
message_retention_seconds = 1209600 # 14 days
}
# Event Source Mapping — the glue between stream and Lambda
resource "aws_lambda_event_source_mapping" "orders_stream" {
event_source_arn = aws_dynamodb_table.orders.stream_arn
function_name = aws_lambda_function.stream_processor.arn
starting_position = "TRIM_HORIZON"
batch_size = 100
maximum_batching_window_in_seconds = 5
parallelization_factor = 2
bisect_batch_on_function_error = true
maximum_retry_attempts = 3
maximum_record_age_in_seconds = 3600 # drop records older than 1 hour
destination_config {
on_failure {
destination_arn = aws_sqs_queue.stream_dlq.arn
}
}
# Enable partial batch response (Lambda reports individual item failures)
function_response_types = ["ReportBatchItemFailures"]
}
Note: Terraform can resolve aws_dynamodb_table.orders.stream_arn only after the table is created with stream_enabled = true. If you're adding streams to an existing Terraform-managed table, add both stream_enabled and stream_view_type and run terraform apply; it's an in-place update with no downtime.
Lambda Event Source Mapping: Every Parameter Explained
The Event Source Mapping (ESM) is the AWS-managed poller that reads records from your stream shards and invokes your Lambda. It's not a trigger in the traditional sense — it's a long-running poller managed by the Lambda service. Understanding every ESM parameter is the difference between a flaky processor and a production-grade one.
| Parameter | Default | Recommended | Notes |
|---|---|---|---|
| batch_size | 100 | 25–500 | Records per Lambda invocation. Higher = fewer invocations, but larger payloads and more work repeated on retry. Max 10,000 for DynamoDB Streams, subject to the 6 MB invocation payload limit. |
| maximum_batching_window_in_seconds | 0 | 1–5 | Wait up to N seconds to fill a batch. Reduces Lambda invocations and cost during low-traffic periods. |
| starting_position | (required) | LATEST or TRIM_HORIZON | TRIM_HORIZON = process from oldest available record. LATEST = only new records. AT_TIMESTAMP is not available for DynamoDB Streams (it applies to Kinesis and Kafka sources). |
| parallelization_factor | 1 | 2–10 | Concurrent Lambda invocations per shard. Massively increases throughput. Max 10. See performance section. |
| bisect_batch_on_function_error | false | true | On error, splits batch in half and retries each half. Helps isolate poison-pill records. |
| maximum_retry_attempts | -1 (infinite) | 3–5 | Infinite retries with a 24-hour stream window can block your shard for a full day. Always set a limit. |
| maximum_record_age_in_seconds | -1 (infinite) | 3600 | Discard records older than N seconds. Prevents processing stale events after an outage. |
| function_response_types | [] | ["ReportBatchItemFailures"] | Enables partial batch response: Lambda checkpoints at the lowest failed sequence number and retries from there instead of from the start of the batch. |
| destination_config.on_failure | (none) | SQS DLQ | After retries are exhausted or records expire, Lambda sends metadata about the failed batch (shard ID and sequence number range, not the records themselves) to an SQS queue or SNS topic. |
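Tip: Use TRIM_HORIZON when first connecting a new Lambda to an existing stream; it processes everything in the 24-hour window. Use LATEST for new streams or when historical records are irrelevant. AT_TIMESTAMP is not supported for DynamoDB Streams event source mappings, so for incident recovery ("reprocess everything after the deploy at 14:30") create a new mapping at TRIM_HORIZON and have the handler skip records whose ApproximateCreationDateTime is before the cutoff.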
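If you create the mapping from Python rather than the CLI, the same parameters can be assembled as keyword arguments for boto3's `create_event_source_mapping`. The sketch below only builds and range-checks the dictionary; `build_esm_params` is a hypothetical helper, and the defaults are the recommendations from the table above, not AWS defaults.

```python
from typing import Any, Dict, Optional


def build_esm_params(
    stream_arn: str,
    function_name: str,
    *,
    batch_size: int = 100,
    batching_window_seconds: int = 5,
    parallelization_factor: int = 2,
    max_retry_attempts: int = 3,
    max_record_age_seconds: int = 3600,
    failure_destination_arn: Optional[str] = None,
) -> Dict[str, Any]:
    """Build kwargs for lambda_client.create_event_source_mapping (hypothetical helper)."""
    if not 1 <= batch_size <= 10_000:
        raise ValueError("batch_size must be between 1 and 10,000 for DynamoDB Streams")
    if not 1 <= parallelization_factor <= 10:
        raise ValueError("parallelization_factor must be between 1 and 10")
    params: Dict[str, Any] = {
        "EventSourceArn": stream_arn,
        "FunctionName": function_name,
        "StartingPosition": "TRIM_HORIZON",
        "BatchSize": batch_size,
        "MaximumBatchingWindowInSeconds": batching_window_seconds,
        "ParallelizationFactor": parallelization_factor,
        "BisectBatchOnFunctionError": True,
        "MaximumRetryAttempts": max_retry_attempts,
        "MaximumRecordAgeInSeconds": max_record_age_seconds,
        "FunctionResponseTypes": ["ReportBatchItemFailures"],
    }
    if failure_destination_arn:
        params["DestinationConfig"] = {"OnFailure": {"Destination": failure_destination_arn}}
    return params
```

With boto3 installed, the result would be passed as `boto3.client("lambda").create_event_source_mapping(**params)`.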
CLI: Create the Event Source Mapping
STREAM_ARN=$(aws dynamodb describe-table \
--table-name Orders \
--query 'Table.LatestStreamArn' \
--output text)
aws lambda create-event-source-mapping \
--function-name orders-stream-processor \
--event-source-arn "$STREAM_ARN" \
--batch-size 100 \
--maximum-batching-window-in-seconds 5 \
--starting-position TRIM_HORIZON \
--parallelization-factor 2 \
--bisect-batch-on-function-error \
--maximum-retry-attempts 3 \
--maximum-record-age-in-seconds 3600 \
--function-response-types ReportBatchItemFailures \
--destination-config '{"OnFailure":{"Destination":"arn:aws:sqs:us-east-1:123456789012:orders-stream-dlq"}}'
Writing the Lambda Handler: Python and Java
The Lambda receives a batch of stream records from a single shard. Your handler must parse the eventName to decide what to do, unmarshall DynamoDB's wire format into native types, and, critically, report partial failures correctly so the ESM can resume from the first failed record instead of reprocessing the whole batch.
Python Handler (with partial batch response)
import json
import logging
from typing import Any

from boto3.dynamodb.types import TypeDeserializer

logger = logging.getLogger()
logger.setLevel(logging.INFO)
deserializer = TypeDeserializer()


def unmarshall(dynamo_item: dict) -> dict:
    """Convert DynamoDB wire format to Python native types."""
    return {k: deserializer.deserialize(v) for k, v in dynamo_item.items()}


def process_insert(new_image: dict, sequence_number: str) -> None:
    """Handle a new item being inserted into DynamoDB."""
    logger.info(f"INSERT: seq={sequence_number}, item={new_image}")
    # Example: sync to OpenSearch
    # opensearch_client.index(index="orders", id=new_image["orderId"], body=new_image)


def process_modify(old_image: dict, new_image: dict, sequence_number: str) -> None:
    """Handle an existing item being modified."""
    changed_fields = {
        k: {"old": old_image.get(k), "new": new_image.get(k)}
        for k in set(old_image) | set(new_image)
        if old_image.get(k) != new_image.get(k)
    }
    logger.info(f"MODIFY: seq={sequence_number}, changes={changed_fields}")
    # Example: partial update in OpenSearch
    # opensearch_client.update(index="orders", id=new_image["orderId"],
    #                          body={"doc": changed_fields})


def process_remove(old_image: dict, keys: dict, sequence_number: str) -> None:
    """Handle an item being deleted from DynamoDB."""
    logger.info(f"REMOVE: seq={sequence_number}, keys={keys}")
    # Example: delete from OpenSearch
    # opensearch_client.delete(index="orders", id=keys["orderId"])


def lambda_handler(event: dict, context: Any) -> dict:
    """
    Main Lambda handler. Returns a ReportBatchItemFailures response
    so the ESM does not have to retry the entire batch.
    """
    batch_item_failures = []
    for record in event["Records"]:
        sequence_number = record["dynamodb"]["SequenceNumber"]
        event_name = record["eventName"]  # INSERT | MODIFY | REMOVE
        try:
            dynamo_record = record["dynamodb"]
            new_image = (
                unmarshall(dynamo_record["NewImage"])
                if "NewImage" in dynamo_record
                else None
            )
            old_image = (
                unmarshall(dynamo_record["OldImage"])
                if "OldImage" in dynamo_record
                else None
            )
            keys = unmarshall(dynamo_record["Keys"])
            if event_name == "INSERT":
                process_insert(new_image, sequence_number)
            elif event_name == "MODIFY":
                process_modify(old_image, new_image, sequence_number)
            elif event_name == "REMOVE":
                process_remove(old_image, keys, sequence_number)
            else:
                logger.warning(f"Unknown eventName: {event_name}")
        except Exception as e:
            logger.error(
                f"Failed to process record seq={sequence_number}: {e}",
                exc_info=True
            )
            # Report this specific record as failed so the ESM retries it
            batch_item_failures.append({"itemIdentifier": sequence_number})
    return {"batchItemFailures": batch_item_failures}
Java Handler (with aws-lambda-java-events)
package com.techoral.streams;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
// AttributeValue type bundled with aws-lambda-java-events 3.x
import com.amazonaws.services.lambda.runtime.events.models.dynamodb.AttributeValue;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamProcessor
        implements RequestHandler<DynamodbEvent, StreamsEventResponse> {

    private static final Logger log = LoggerFactory.getLogger(StreamProcessor.class);

    @Override
    public StreamsEventResponse handleRequest(DynamodbEvent event, Context context) {
        List<StreamsEventResponse.BatchItemFailure> failures = new ArrayList<>();
        for (DynamodbEvent.DynamodbStreamRecord record : event.getRecords()) {
            String sequenceNumber = record.getDynamodb().getSequenceNumber();
            String eventName = record.getEventName();
            try {
                switch (eventName) {
                    case "INSERT" -> handleInsert(record, sequenceNumber);
                    case "MODIFY" -> handleModify(record, sequenceNumber);
                    case "REMOVE" -> handleRemove(record, sequenceNumber);
                    default -> log.warn("Unknown event: {}", eventName);
                }
            } catch (Exception e) {
                log.error("Failed processing seq={}: {}", sequenceNumber, e.getMessage(), e);
                // Report the failed record so the ESM can checkpoint at it
                failures.add(StreamsEventResponse.BatchItemFailure.builder()
                        .withItemIdentifier(sequenceNumber)
                        .build());
            }
        }
        return StreamsEventResponse.builder()
                .withBatchItemFailures(failures)
                .build();
    }

    private void handleInsert(DynamodbEvent.DynamodbStreamRecord record, String seq) {
        Map<String, AttributeValue> newImage = record.getDynamodb().getNewImage();
        if (newImage == null) return;
        String orderId = newImage.get("orderId").getS();
        String customerId = newImage.get("customerId").getS();
        String status = newImage.containsKey("status") ? newImage.get("status").getS() : "UNKNOWN";
        log.info("INSERT seq={} orderId={} customerId={} status={}", seq, orderId, customerId, status);
        // syncToOpenSearch(orderId, newImage);
    }

    private void handleModify(DynamodbEvent.DynamodbStreamRecord record, String seq) {
        Map<String, AttributeValue> oldImage = record.getDynamodb().getOldImage();
        Map<String, AttributeValue> newImage = record.getDynamodb().getNewImage();
        if (oldImage != null && newImage != null) {
            String oldStatus = oldImage.containsKey("status") ? oldImage.get("status").getS() : null;
            String newStatus = newImage.containsKey("status") ? newImage.get("status").getS() : null;
            if (!Objects.equals(oldStatus, newStatus)) {
                log.info("MODIFY seq={} status changed: {} → {}", seq, oldStatus, newStatus);
                // publishStatusChangeEvent(newImage);
            }
        }
    }

    private void handleRemove(DynamodbEvent.DynamodbStreamRecord record, String seq) {
        Map<String, AttributeValue> keys = record.getDynamodb().getKeys();
        String orderId = keys.get("orderId").getS();
        log.info("REMOVE seq={} orderId={}", seq, orderId);
        // removeFromOpenSearch(orderId);
    }
}
Note: DynamoDB's wire format wraps every value in a type descriptor: {"S": "hello"}, {"N": "42"}, {"BOOL": true}, {"L": [...]}, {"M": {...}}. Python's boto3.dynamodb.types.TypeDeserializer handles this cleanly. In Java, use the AWS Lambda Events library's built-in AttributeValue map or the DynamoDB Enhanced Client's EnhancedDocument. Never parse the wire format manually: edge cases like null sets and binary types will bite you.
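One practical consequence of using TypeDeserializer: numbers come back as `decimal.Decimal` and set types as Python `set`, neither of which `json.dumps` accepts by default. The sketch below is one way to bridge that gap when forwarding images as JSON. `to_jsonable` is a hypothetical helper, and converting to `float` is an assumption that loses precision for very large or very precise numbers.

```python
import json
from decimal import Decimal


def to_jsonable(value):
    """`default` hook for json.dumps covering Decimal and set values."""
    if isinstance(value, Decimal):
        # Whole numbers stay integers; everything else becomes a float.
        return int(value) if value == value.to_integral_value() else float(value)
    if isinstance(value, (set, frozenset)):
        # JSON has no set type; emit a list in a stable order.
        return sorted(value, key=str)
    raise TypeError(f"Cannot serialize {type(value).__name__}")


item = {"total": Decimal("19.99"), "qty": Decimal("3"), "tags": {"b", "a"}}
encoded = json.dumps(item, default=to_jsonable, sort_keys=True)
```

Binary attributes are not handled here and would still raise `TypeError`, which is usually preferable to silently mangling them.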
Real-World Patterns: CDC, Replication, Audit, Cache Invalidation
DynamoDB Streams unlocks a class of event-driven architectures that would otherwise require complex polling infrastructure. Here are four production patterns with concrete implementation details.
Pattern 1: Change Data Capture to OpenSearch
Keep a search index in sync with your DynamoDB table without dual-writes from your application. Every change reaches the stream, and changes to a given item arrive in order. Lambda delivers them at least once, so indexing by document ID (as below) keeps retries harmless. Use NEW_IMAGE for full document sync or NEW_AND_OLD_IMAGES for delta-only updates:
import os

import boto3
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth


def get_opensearch_client() -> OpenSearch:
    credentials = boto3.Session().get_credentials()
    region = "us-east-1"
    awsauth = AWS4Auth(credentials.access_key, credentials.secret_key,
                       region, "es", session_token=credentials.token)
    return OpenSearch(
        hosts=[{"host": os.environ["OPENSEARCH_ENDPOINT"], "port": 443}],
        http_auth=awsauth,
        use_ssl=True,
        connection_class=RequestsHttpConnection
    )


# Created once per execution environment and reused across invocations
es = get_opensearch_client()


def sync_to_opensearch(event_name: str, new_image: dict, keys: dict) -> None:
    doc_id = keys["orderId"]
    if event_name == "REMOVE":
        # Ignore 404 so a retried delete stays idempotent
        es.delete(index="orders", id=doc_id, ignore=[404])
    else:
        es.index(index="orders", id=doc_id, body=new_image)
Pattern 2: Cross-Region Replication (Custom)
DynamoDB Global Tables handles cross-region replication natively, but if you need custom logic (filtering, transformation, selective replication), build it with Streams + Lambda:
import boto3
import os

dest_dynamodb = boto3.resource("dynamodb", region_name=os.environ["DEST_REGION"])
dest_table = dest_dynamodb.Table(os.environ["DEST_TABLE"])


def replicate_item(event_name: str, new_image: dict, keys: dict) -> None:
    # keys and new_image are expected in native (unmarshalled) form
    if event_name == "REMOVE":
        dest_table.delete_item(Key=keys)
    elif event_name in ("INSERT", "MODIFY"):
        # Optional: transform or filter before replicating
        if new_image.get("replicateToRegion") == os.environ["DEST_REGION"]:
            dest_table.put_item(Item=new_image)
Pattern 3: Audit Log to S3 via Kinesis Firehose
Write every change to S3 as an immutable audit trail, partitioned by date for Athena queries:
import boto3
import json
import time
from typing import Optional

firehose = boto3.client("firehose")


def write_audit_log(event_name: str, old_image: Optional[dict], new_image: Optional[dict],
                    keys: dict, sequence_number: str) -> None:
    record = {
        "eventName": event_name,
        "sequenceNumber": sequence_number,
        "timestamp": int(time.time()),
        "keys": keys,
        "oldImage": old_image,
        "newImage": new_image,
    }
    # Unmarshalled images contain Decimal (and possibly set) values that
    # json.dumps rejects by default; default=str stores them as strings.
    payload = json.dumps(record, default=str) + "\n"  # newline-delimited JSON for Athena
    firehose.put_record(
        DeliveryStreamName="orders-audit-trail",
        Record={"Data": payload.encode("utf-8")}
    )
Pattern 4: Cache Invalidation
Use KEYS_ONLY stream view for minimal data transfer — you only need to know which cache key to evict:
import logging
import os

import redis

logger = logging.getLogger()
cache = redis.Redis.from_url(os.environ["REDIS_URL"])


def invalidate_cache(keys: dict) -> None:
    # Build the cache key matching your application's cache key scheme
    cache_key = f"order:{keys['customerId']}:{keys['orderId']}"
    count = cache.delete(cache_key)
    # Also invalidate list caches for this customer
    list_key = f"orders:customer:{keys['customerId']}"
    cache.delete(list_key)
    logger.info(f"Invalidated {count} cache key(s) for {cache_key}")
Error Handling: DLQ, Partial Batch Response, Retry Strategy
Error handling in stream processing is where most production systems fall apart. The naive approach — let Lambda throw an exception and retry the whole batch — leads to stuck shards, duplicate processing, and eventually data loss as records age past the 24-hour window. Here's the complete strategy.
The Problem with Default Behavior
Without configuration, a single failing record blocks the entire shard. Lambda retries the full batch indefinitely until the record expires (24 hours) or you disable the ESM. During this time, no new records from that shard are processed. In a high-write table, this means your downstream system falls further and further behind.
Three-Layer Defense
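Layer 1: Partial Batch Response. Enable ReportBatchItemFailures. Your handler returns a list of failed sequence numbers. For stream sources, Lambda treats the lowest returned sequence number as the checkpoint and retries from that record onward, so records before the first failure are not reprocessed, while records after it may be delivered again (another reason handlers must be idempotent). This is the single most impactful change you can make.
Layer 2: Bisect on Error. Enable bisect_batch_on_function_error. If your Lambda throws an unhandled exception (the handler itself crashes, not individual record processing), the ESM halves the batch and retries each half separately. This isolates poison-pill records that crash the handler.
Layer 3: DLQ on Failure. After maximum_retry_attempts (or once records exceed maximum_record_age_in_seconds), the ESM sends a message describing the failed batch to your SQS DLQ. That message holds only metadata (stream ARN, shard ID, and the start and end sequence numbers), not the records, so you must read the records back from the stream before the 24-hour retention expires. To preserve the full record with error context, send it to the queue yourself from the handler: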
import json
import boto3
import os

sqs = boto3.client("sqs")
DLQ_URL = os.environ["DLQ_URL"]


def send_to_dlq(record: dict, error: Exception) -> None:
    """Manually send a specific record to DLQ with error context."""
    sqs.send_message(
        QueueUrl=DLQ_URL,
        MessageBody=json.dumps({
            "sequenceNumber": record["dynamodb"]["SequenceNumber"],
            "eventName": record["eventName"],
            "error": str(error),
            "record": record,
        }),
        MessageAttributes={
            "ErrorType": {
                "DataType": "String",
                # Pass the exception object, not its message, so this is
                # the exception class name rather than "str"
                "StringValue": type(error).__name__
            }
        }
    )
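Because Lambda resumes a stream batch from the lowest reported sequence number, a handler gains little by continuing past the first failure, and continuing can apply later changes to an item before an earlier one has succeeded. A minimal sketch of the stop-at-first-failure variant follows. `process_batch` and `process_record` are hypothetical names, and this is an alternative to the collect-all-failures handler shown earlier, not a replacement mandated by AWS.

```python
from typing import Callable, Dict, List


def process_batch(records: List[dict],
                  process_record: Callable[[dict], None]) -> Dict[str, list]:
    """Process records in order and stop at the first failure."""
    for record in records:
        try:
            process_record(record)
        except Exception:
            # Everything from this record onward will be retried, so
            # report it as the checkpoint and stop processing.
            seq = record["dynamodb"]["SequenceNumber"]
            return {"batchItemFailures": [{"itemIdentifier": seq}]}
    # An empty list tells Lambda the whole batch succeeded.
    return {"batchItemFailures": []}
```

A Lambda entry point would call it as `return process_batch(event["Records"], my_processor)`.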
DLQ Replay Script
import boto3
import json


def replay_from_dlq(dlq_url: str, processor_fn) -> None:
    """
    Pull messages from DLQ and replay them through the processor.
    Run this as a one-off script during incident recovery.

    Expects messages written by send_to_dlq above (they embed the full
    record). Messages produced by the ESM failure destination contain
    only batch metadata and have no "record" field.
    """
    sqs = boto3.client("sqs")
    while True:
        response = sqs.receive_message(
            QueueUrl=dlq_url,
            MaxNumberOfMessages=10,
            WaitTimeSeconds=20
        )
        messages = response.get("Messages", [])
        if not messages:
            break
        for msg in messages:
            body = json.loads(msg["Body"])
            original_record = body["record"]
            try:
                processor_fn(original_record)
                sqs.delete_message(
                    QueueUrl=dlq_url,
                    ReceiptHandle=msg["ReceiptHandle"]
                )
                print(f"Replayed and deleted: {body['sequenceNumber']}")
            except Exception as e:
                print(f"Failed replay: {body['sequenceNumber']}: {e}")
                # Leave in DLQ for next attempt
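Messages that the event source mapping itself writes to the failure destination describe the failed batch (in a `DDBStreamBatchInfo` object) rather than embedding the records. The sketch below shows one way to read those records back from the stream while they are still within the 24-hour retention. `fetch_failed_records` is a hypothetical helper; it takes a `dynamodbstreams` client (for example `boto3.client("dynamodbstreams")`) as a parameter and assumes the message body follows the documented failure-record layout.

```python
import json
from typing import Any, List


def fetch_failed_records(streams_client: Any, dlq_message_body: str,
                         max_calls: int = 50) -> List[dict]:
    """Re-read the records of a failed batch using its DLQ metadata."""
    info = json.loads(dlq_message_body)["DDBStreamBatchInfo"]
    start = int(info["startSequenceNumber"])
    end = int(info["endSequenceNumber"])
    iterator = streams_client.get_shard_iterator(
        StreamArn=info["streamArn"],
        ShardId=info["shardId"],
        ShardIteratorType="AT_SEQUENCE_NUMBER",
        SequenceNumber=info["startSequenceNumber"],
    )["ShardIterator"]
    found: List[dict] = []
    for _ in range(max_calls):  # guard against paging forever on an open shard
        if iterator is None:
            break
        page = streams_client.get_records(ShardIterator=iterator, Limit=1000)
        for record in page["Records"]:
            seq = int(record["dynamodb"]["SequenceNumber"])
            if seq > end:
                return found
            if seq >= start:
                found.append(record)
            if seq == end:
                return found
        iterator = page.get("NextShardIterator")
    return found
```

If the records have already expired from the stream, the iterator request fails or returns nothing, and the fallback is the out-of-band table scan.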
DynamoDB Kinesis Streams: When to Use It Instead
Since November 2020, DynamoDB offers a second streaming mechanism: Kinesis Data Streams integration. Instead of writing change records to DynamoDB's proprietary stream, DynamoDB writes them directly to a Kinesis Data Stream you provision. This is fundamentally different from DynamoDB Streams and serves different use cases.
DynamoDB Streams vs Kinesis Data Streams
| Feature | DynamoDB Streams | Kinesis Data Streams |
|---|---|---|
| Retention | 24 hours (fixed) | 1–365 days (configurable) |
| Max consumers | 2 | Up to 20 registered consumers per stream with Enhanced Fan-Out (default quota) |
| Replay | Within 24 hours only | Replay from any point in retention window |
| Shard management | Automatic (mirrors DDB partitions) | Manual or on-demand auto scaling |
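| Cost | No charge for the stream itself; GetRecords calls made by Lambda triggers are free, other readers pay per read request | Per-shard-hour + PUT payload, plus DynamoDB change data capture units |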
| Firehose integration | Via Lambda | Native, no-code |
| Cross-account | No | Yes (Kinesis cross-account) |
| Setup | One API call | Provision KDS first, then link to DDB |
Enable Kinesis Data Streams integration
# Create the Kinesis stream first
aws kinesis create-stream \
--stream-name orders-changes \
--shard-count 4
# Link DynamoDB to the Kinesis stream
aws dynamodb enable-kinesis-streaming-destination \
--table-name Orders \
--stream-arn arn:aws:kinesis:us-east-1:123456789012:stream/orders-changes
# Verify status (takes ~1 minute to activate)
aws dynamodb describe-kinesis-streaming-destination \
--table-name Orders
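A Lambda consuming the Kinesis stream receives each change as a base64-encoded JSON document inside the Kinesis record, not in the DynamoDB Streams event shape. The sketch below decodes one such record. `decode_kinesis_change` is a hypothetical helper, and the field names it reads (`eventName`, `eventID`, `tableName`, `dynamodb.Keys`, `dynamodb.ApproximateCreationDateTime`) are assumed to match the payload DynamoDB writes; verify them against a real record before relying on this.

```python
import base64
import json


def decode_kinesis_change(kinesis_event_record: dict) -> dict:
    """Decode one record from a Lambda Kinesis event into a flat change dict."""
    raw = base64.b64decode(kinesis_event_record["kinesis"]["data"])
    change = json.loads(raw)
    ddb = change["dynamodb"]
    return {
        "eventName": change["eventName"],
        "eventID": change.get("eventID"),
        "tableName": change.get("tableName"),
        "keys": ddb["Keys"],
        "newImage": ddb.get("NewImage"),
        "oldImage": ddb.get("OldImage"),
        # Useful for ordering and de-duplication on the consumer side
        "createdAt": ddb.get("ApproximateCreationDateTime"),
    }
```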
Terraform: Kinesis Direct Integration
resource "aws_kinesis_stream" "orders_changes" {
name = "orders-changes"
shard_count = 4
retention_period = 168 # 7 days
stream_mode_details {
stream_mode = "PROVISIONED"
}
}
resource "aws_dynamodb_kinesis_streaming_destination" "orders" {
table_name = aws_dynamodb_table.orders.name
stream_arn = aws_kinesis_stream.orders_changes.arn
}
# Send to S3 via Firehose — no Lambda needed for raw archival
resource "aws_kinesis_firehose_delivery_stream" "orders_archive" {
name = "orders-change-archive"
destination = "extended_s3"
kinesis_source_configuration {
kinesis_stream_arn = aws_kinesis_stream.orders_changes.arn
role_arn = aws_iam_role.firehose.arn
}
extended_s3_configuration {
role_arn = aws_iam_role.firehose.arn
bucket_arn = aws_s3_bucket.audit_archive.arn
prefix = "dynamodb-changes/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/"
error_output_prefix = "errors/"
buffering_size = 128
buffering_interval = 300
compression_format = "GZIP"
}
}
Fan-Out Pattern: Streams → Lambda → SNS/SQS for Multiple Consumers
DynamoDB Streams supports only 2 simultaneous stream consumers (Lambda event source mappings). If you need 3, 5, or 10 downstream systems consuming the same changes, you need the fan-out pattern: one Lambda reads the stream and publishes to SNS, which fans out to multiple SQS queues, each consumed by a dedicated Lambda.
import boto3
import json
import logging
import os

logger = logging.getLogger()
sns = boto3.client("sns")
TOPIC_ARN = os.environ["ORDER_CHANGE_TOPIC_ARN"]


def lambda_handler(event: dict, context) -> dict:
    """Fan-out Lambda: reads stream, publishes to SNS."""
    batch_item_failures = []
    for record in event["Records"]:
        sequence_number = record["dynamodb"]["SequenceNumber"]
        try:
            event_name = record["eventName"]
            dynamo = record["dynamodb"]
            message = {
                "eventName": event_name,
                "sequenceNumber": sequence_number,
                # ARN looks like ...:table/Orders/stream/<timestamp>
                "tableName": record["eventSourceARN"].split("/")[1],
                "keys": dynamo.get("Keys", {}),
                "newImage": dynamo.get("NewImage"),
                "oldImage": dynamo.get("OldImage"),
            }
            sns.publish(
                TopicArn=TOPIC_ARN,
                Message=json.dumps(message),
                MessageAttributes={
                    # Allows SQS subscribers to filter by event type
                    "eventName": {
                        "DataType": "String",
                        "StringValue": event_name
                    }
                }
            )
        except Exception as e:
            logger.error(f"Fan-out failed for seq={sequence_number}: {e}")
            batch_item_failures.append({"itemIdentifier": sequence_number})
    return {"batchItemFailures": batch_item_failures}
SNS → SQS Fan-Out Terraform
resource "aws_sns_topic" "order_changes" {
name = "order-change-events"
}
# Consumer 1: OpenSearch sync queue
resource "aws_sqs_queue" "opensearch_sync" {
name = "order-opensearch-sync"
}
# Consumer 2: Analytics pipeline queue
resource "aws_sqs_queue" "analytics" {
name = "order-analytics-events"
}
# Consumer 3: Cache invalidation queue (only INSERT + MODIFY)
resource "aws_sqs_queue" "cache_invalidation" {
name = "order-cache-invalidation"
}
# Subscribe each SQS to SNS with optional filtering
resource "aws_sns_topic_subscription" "opensearch" {
topic_arn = aws_sns_topic.order_changes.arn
protocol = "sqs"
endpoint = aws_sqs_queue.opensearch_sync.arn
# No filter — receive all events
}
resource "aws_sns_topic_subscription" "cache" {
topic_arn = aws_sns_topic.order_changes.arn
protocol = "sqs"
endpoint = aws_sqs_queue.cache_invalidation.arn
filter_policy = jsonencode({
eventName = ["INSERT", "MODIFY"] # Cache only needs changes, not deletes
})
}
Note: Set maximum_retry_attempts conservatively on the fan-out Lambda's event source mapping, and alarm on DLQ depth. The individual consumer Lambdas should also have their own DLQs. Each SQS queue provides backpressure buffering if a downstream system is temporarily unavailable.
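On the consuming side, each SQS message body is the SNS notification envelope, with the fan-out Lambda's JSON carried as a string in its `Message` field (assuming raw message delivery is left disabled on the subscription). A minimal consumer sketch, with `handle_sqs_batch` and `process_change` as hypothetical names, could look like this:

```python
import json
from typing import Callable, Dict


def handle_sqs_batch(event: dict,
                     process_change: Callable[[dict], None]) -> Dict[str, list]:
    """Unwrap SNS envelopes from an SQS batch and report per-message failures."""
    failures = []
    for message in event["Records"]:
        try:
            envelope = json.loads(message["body"])   # SNS notification envelope
            change = json.loads(envelope["Message"])  # payload from the fan-out Lambda
            process_change(change)
        except Exception:
            # For SQS sources the identifier is the message ID, and only
            # the listed messages return to the queue.
            failures.append({"itemIdentifier": message["messageId"]})
    return {"batchItemFailures": failures}
```

This relies on ReportBatchItemFailures being enabled on the SQS event source mapping as well.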
Performance Tuning: Shards, Parallelization, and Concurrency
By default, the Lambda ESM reads each shard sequentially: one batch at a time, waiting for the Lambda invocation to complete before reading the next batch. On a high-write DynamoDB table, this creates lag. Here's how to tune it.
Parallelization Factor
The parallelization factor (1–10) controls how many concurrent Lambda invocations run per shard. Lambda still routes all records for the same item to the same concurrent batch, so per-item ordering is preserved. A factor of 4 on a stream with 10 shards means up to 40 concurrent Lambda invocations. This is the primary lever for reducing lag:
# Update an existing ESM to increase parallelization
aws lambda update-event-source-mapping \
--uuid "your-esm-uuid" \
--parallelization-factor 4 \
--batch-size 200 \
--maximum-batching-window-in-seconds 3
Lambda Concurrency Limits
Calculate your peak concurrency requirement:
Peak Lambda concurrency = shard_count × parallelization_factor
Example:
- Table scales to 20 shards during peak
- Parallelization factor = 5
- Peak concurrency = 20 × 5 = 100 concurrent Lambdas
Set reserved concurrency on the function:
aws lambda put-function-concurrency \
--function-name orders-stream-processor \
--reserved-concurrent-executions 120 # 20% headroom
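The sizing arithmetic above is easy to script when shard counts change. This is a small sketch of the same formula; `reserved_concurrency` is a hypothetical helper and the 20% headroom default mirrors the example rather than any AWS recommendation.

```python
from typing import Tuple


def reserved_concurrency(shard_count: int, parallelization_factor: int,
                         headroom_percent: int = 20) -> Tuple[int, int]:
    """Return (peak concurrency, reserved concurrency including headroom)."""
    if not 1 <= parallelization_factor <= 10:
        raise ValueError("parallelization_factor must be between 1 and 10")
    peak = shard_count * parallelization_factor
    # Integer ceiling division avoids floating-point rounding surprises.
    reserved = (peak * (100 + headroom_percent) + 99) // 100
    return peak, reserved


peak, reserved = reserved_concurrency(shard_count=20, parallelization_factor=5)
```

For the example in the text this yields a peak of 100 and a reservation of 120.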
Batch Size and Batching Window Tradeoffs
Low volume (< 100 writes/sec):
  batch_size = 25
  batching_window = 5s
  → Fewer, larger batches; lower cost; slightly more latency
High volume (> 10,000 writes/sec):
  batch_size = 500–1000
  batching_window = 1s
  parallelization_factor = 5–10
  → More concurrent Lambdas; process lag kept under 1 second
Ultra-high volume (real-time requirement):
  Consider switching to Kinesis + Enhanced Fan-Out
  → 2 MB/s per shard per consumer, roughly 70 ms typical propagation delay
Monitoring Lag: The Key Metric
# Check current iterator age (lag) in CloudWatch
aws cloudwatch get-metric-statistics \
--namespace AWS/Lambda \
--metric-name IteratorAge \
--dimensions Name=FunctionName,Value=orders-stream-processor \
--start-time 2026-06-08T00:00:00Z \
--end-time 2026-06-08T01:00:00Z \
--period 60 \
--statistics Maximum \
--query 'Datapoints[*].[Timestamp,Maximum]' \
--output table
Set a CloudWatch alarm on IteratorAge > 60,000 ms (1 minute). This is the primary SLA indicator for stream processing health. If IteratorAge grows continuously, you have insufficient Lambda concurrency or your processing function is too slow.
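An alarm on this metric can be defined with CloudWatch's `put_metric_alarm`. The sketch below only builds the parameter dictionary so it can be reviewed or tested without AWS access. `iterator_age_alarm` is a hypothetical helper, and the five-period evaluation window and missing-data handling are assumptions to tune for your workload.

```python
from typing import Any, Dict, Optional


def iterator_age_alarm(function_name: str, threshold_ms: int = 60_000,
                       sns_topic_arn: Optional[str] = None) -> Dict[str, Any]:
    """Build kwargs for cloudwatch_client.put_metric_alarm."""
    params: Dict[str, Any] = {
        "AlarmName": f"{function_name}-iterator-age",
        "Namespace": "AWS/Lambda",
        "MetricName": "IteratorAge",
        "Dimensions": [{"Name": "FunctionName", "Value": function_name}],
        "Statistic": "Maximum",
        "Period": 60,               # seconds per datapoint
        "EvaluationPeriods": 5,     # assumption: 5 consecutive minutes over threshold
        "Threshold": float(threshold_ms),
        "ComparisonOperator": "GreaterThanThreshold",
        "TreatMissingData": "notBreaching",  # no records processed means no lag
    }
    if sns_topic_arn:
        params["AlarmActions"] = [sns_topic_arn]
    return params
```

With boto3 available, the alarm would be created via `boto3.client("cloudwatch").put_metric_alarm(**iterator_age_alarm("orders-stream-processor"))`.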
Cost Analysis: Streams Pricing vs Kinesis
DynamoDB Streams pricing is simple and often surprising — it's significantly cheaper than most engineers expect. Understanding the cost model helps you choose the right architecture for your budget.
DynamoDB Streams Pricing
| Component | Cost | Notes |
|---|---|---|
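| Stream reads (GetRecords) | $0.02 per 100,000 read request units | One unit per GetRecords call (up to 1 MB). Calls made by Lambda triggers are not billed; the charge applies to other readers such as custom or KCL consumers |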
| Stream storage | Free | Included in DynamoDB pricing |
| Lambda invocations | $0.20 per 1M requests | Standard Lambda pricing |
| Lambda duration | $0.0000166667 per GB-second | Standard Lambda pricing |
Kinesis Data Streams Pricing (for comparison)
| Component | Provisioned Cost | On-Demand Cost |
|---|---|---|
| Shard hours | $0.015/shard/hour ($10.80/shard/month) | No shard charge; billed per stream-hour instead |
| PUT payload units | $0.014 per 1M 25KB units | Billed per GB ingested and per GB retrieved, not per PUT unit |
| Extended retention (7–365 days) | $0.023 per GB-month | Same |
| Enhanced Fan-Out | $0.015 per shard/hour + $0.013 per GB | Included |
Cost Example: 1 Million Writes/Day
Table: Orders, 1M writes/day, avg item 1KB, NEW_AND_OLD_IMAGES
DynamoDB Streams:
Lambda invocations: 1M events / 100 batch_size = 10,000/day
Stream read cost: $0 (GetRecords calls made by Lambda triggers are not billed)
Lambda requests: 10,000 × $0.20 / 1M = $0.002/day
Lambda duration (128ms avg, 256MB): 10,000 × 0.128s × 0.25GB × $0.0000166667 = $0.0053/day
Total: ~$0.0073/day ≈ $0.22/month for streams + Lambda
Kinesis Data Streams (4 provisioned shards):
Shard cost: 4 × 720h × $0.015 = $43.20/month
PUT cost: each record up to 25KB counts as one unit → 1M units/day = 30M units/month × $0.014 / 1M = $0.42/month
Total: ~$43.62/month, before consumer compute and DynamoDB's change data capture charge
Winner: DynamoDB Streams is roughly 200× cheaper for this workload.
Use Kinesis when you need retention > 24h or > 2 consumers.
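The two dominant terms in this comparison, Lambda duration on the Streams side and shard-hours on the Kinesis side, can be recomputed for your own workload with a few lines. The function names below are hypothetical, and the default prices are the list prices quoted in the tables above, which vary by region and change over time.

```python
def lambda_duration_cost_per_day(writes_per_day: int, batch_size: int,
                                 avg_seconds: float, memory_gb: float,
                                 price_per_gb_second: float = 0.0000166667) -> float:
    """Daily Lambda duration cost, assuming every batch is full."""
    invocations = writes_per_day / batch_size
    return invocations * avg_seconds * memory_gb * price_per_gb_second


def kinesis_shard_cost_per_month(shards: int, price_per_shard_hour: float = 0.015,
                                 hours_per_month: int = 720) -> float:
    """Monthly provisioned shard-hour cost."""
    return shards * hours_per_month * price_per_shard_hour


streams_daily = lambda_duration_cost_per_day(1_000_000, 100, 0.128, 0.25)
kinesis_monthly = kinesis_shard_cost_per_month(4)
```

The full-batch assumption understates invocation counts at low traffic, where the batching window rather than the batch size ends each batch.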
Frequently Asked Questions
Does DynamoDB Streams with Lambda guarantee exactly-once processing?
No. DynamoDB Streams with Lambda guarantees at-least-once delivery with strict ordering within a shard. A record can be delivered more than once in failure scenarios (Lambda crashes mid-batch, ESM internal errors). Your handler must be idempotent. Use the sequence number as an idempotency key stored in DynamoDB with a TTL longer than your maximum retry window.
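The idempotency-key idea can be sketched without any AWS dependency. Here an in-memory `SeenStore` stands in for the DynamoDB table with TTL mentioned above; in production the check-and-mark would be a conditional write against that table. Both `SeenStore` and `process_once` are hypothetical names used for illustration.

```python
import time
from typing import Callable, Dict


class SeenStore:
    """In-memory stand-in for a table of processed keys with a TTL."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.time) -> None:
        self._ttl = ttl_seconds
        self._clock = clock
        self._expires_at: Dict[str, float] = {}

    def seen(self, key: str) -> bool:
        expiry = self._expires_at.get(key)
        return expiry is not None and expiry > self._clock()

    def mark(self, key: str) -> None:
        self._expires_at[key] = self._clock() + self._ttl


def process_once(record: dict, store: SeenStore,
                 side_effect: Callable[[dict], None]) -> bool:
    """Run side_effect unless this record was already processed. Returns True if it ran."""
    key = f'{record["eventSourceARN"]}#{record["dynamodb"]["SequenceNumber"]}'
    if store.seen(key):
        return False
    side_effect(record)
    store.mark(key)  # mark only after success so a failed attempt is retried
    return True
```

Marking after the side effect means a crash between the two steps repeats the side effect once, so the side effect itself should still tolerate a duplicate.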
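Can I use DynamoDB Streams on a Global Table?
Yes, but with a caveat: Global Tables use Streams internally for replication. You can add a second consumer (Lambda) to the stream, but you'll see both local writes and remote replicated writes in your stream. Every record carries eventSource "aws:dynamodb", so that field cannot tell them apart. If you only want to process writes originating in a specific region, a common approach is to have writers stamp an origin-region attribute on each item and filter on it in the handler.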
What happens if my Lambda falls more than 24 hours behind?
Records older than 24 hours are expired from the stream and lost. The ESM will skip to the oldest available record. To mitigate: set a CloudWatch alarm on IteratorAge to alert at > 12 hours, enable DLQ on the ESM, and implement an out-of-band recovery process (full table scan to re-bootstrap the downstream system) for this worst-case scenario.
How do I test a stream-triggered Lambda?
Construct synthetic DynamoDB Stream events matching the AWS event schema, or use aws lambda invoke with a JSON payload. The AWS documentation provides sample event payloads for INSERT, MODIFY, and REMOVE. For integration testing, write to a real DynamoDB table with streams enabled and observe the invocation in CloudWatch Logs.
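A synthetic event for unit tests can be built with a small factory. The sketch below produces a single-record event in the shape the handlers in this guide read. `make_stream_event` is a hypothetical helper, images are passed in DynamoDB wire format, and fields such as `eventID`, `SizeBytes`, and the stream ARN are placeholder values.

```python
from typing import Optional


def make_stream_event(event_name: str, keys: dict,
                      new_image: Optional[dict] = None,
                      old_image: Optional[dict] = None,
                      sequence_number: str = "100000000000000000001") -> dict:
    """Build a one-record DynamoDB Streams event for local tests."""
    ddb = {
        "Keys": keys,
        "SequenceNumber": sequence_number,
        "SizeBytes": 0,  # placeholder
        "StreamViewType": "NEW_AND_OLD_IMAGES",
    }
    if new_image is not None:
        ddb["NewImage"] = new_image
    if old_image is not None:
        ddb["OldImage"] = old_image
    return {
        "Records": [{
            "eventID": f"test-{sequence_number}",
            "eventName": event_name,
            "eventVersion": "1.1",
            "eventSource": "aws:dynamodb",
            "awsRegion": "us-east-1",
            "dynamodb": ddb,
            "eventSourceARN": "arn:aws:dynamodb:us-east-1:123456789012:table/Orders/stream/test",
        }]
    }


event = make_stream_event(
    "REMOVE",
    keys={"customerId": {"S": "c-1"}, "orderId": {"S": "o-1"}},
    old_image={"customerId": {"S": "c-1"}, "orderId": {"S": "o-1"}},
)
```

The same dictionary, dumped with `json.dumps`, can be saved to a file and used as the payload for aws lambda invoke.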
Quick Reference
Stream View Types
- KEYS_ONLY — smallest, cache invalidation
- NEW_IMAGE — downstream sync
- OLD_IMAGE — audit / before state
- NEW_AND_OLD_IMAGES — full CDC
ESM Key Parameters
- bisect_batch_on_function_error: true
- maximum_retry_attempts: 3–5
- parallelization_factor: 2–10
- ReportBatchItemFailures: always