AWS DynamoDB Streams and Lambda: Event-Driven Data Patterns (2026)

Every write to a DynamoDB table (every INSERT, MODIFY, and REMOVE) can trigger a Lambda function in near real time. That's the power of DynamoDB Streams: a time-ordered log in which each item-level change appears exactly once. That log is delivered to Lambda at least once and can be fed into any downstream system you choose. Want to sync changes to Elasticsearch or OpenSearch? Replicate cross-region? Build an audit trail? Invalidate a cache? DynamoDB Streams + Lambda makes all of this possible without polling, without cron jobs, and without touching your application code.

This guide goes deep. We'll cover stream view types and shard internals, every knob on the Lambda event source mapping, real production patterns with working Python and Java code, error handling that actually holds up under failure, and a frank cost analysis so you know what you're signing up for.

DynamoDB Streams Internals: View Types and Shard Architecture

DynamoDB Streams is a change data capture (CDC) mechanism that captures a time-ordered sequence of item-level modifications in a DynamoDB table. Each change record is called a stream record, and the stream is organized into shards — analogous to Kinesis shards. Understanding the internals is critical because your choice of stream view type and shard architecture directly determines how much data Lambda receives, what your Lambda can do with it, and how much you pay.

Stream View Types

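When you enable a stream, you choose one of four view types. You can't change the view type of an existing stream. To switch, disable the stream and re-enable it with the new type. This creates a new stream with a new ARN, so existing event source mappings must be recreated against it: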

View Type | What's in the Record | Size | Use When
--- | --- | --- | ---
KEYS_ONLY | Only the key attributes of the changed item (partition key, plus sort key if the table has one) | Smallest | You only need to know which item changed (e.g., cache invalidation)
NEW_IMAGE | The entire item as it looks after the change | Medium | Downstream sync where you always want the latest state
OLD_IMAGE | The entire item as it looked before the change | Medium | Audit trails where you want to know what was overwritten
NEW_AND_OLD_IMAGES | Both the before and after images of the item | Largest | Diff-based sync, conflict detection, and full audit logs
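Pro tip: NEW_AND_OLD_IMAGES gives you the most flexibility and makes debugging easy, so it's a sensible default during development. Changing the view type later means disabling and re-enabling the stream, so settle on the production view type before you go live. Images that don't apply are simply missing from the record. A REMOVE record has no NewImage, an INSERT record has no OldImage, and KEYS_ONLY records have neither. Always check for the image before reading it.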
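Here is a minimal sketch of that check in Python, reading images defensively from a raw Lambda stream record. The helper name get_images and the trimmed sample record (IDs and status value) are illustrative assumptions, not part of any AWS API.

```python
from typing import Optional, Tuple


def get_images(record: dict) -> Tuple[Optional[dict], Optional[dict]]:
    """Return (old_image, new_image) in DynamoDB wire format.

    Either value may be None: INSERT records carry no OldImage, REMOVE
    records carry no NewImage, and KEYS_ONLY streams carry neither.
    """
    dynamodb = record.get("dynamodb", {})
    return dynamodb.get("OldImage"), dynamodb.get("NewImage")


# Hypothetical REMOVE record from a NEW_AND_OLD_IMAGES stream (trimmed)
remove_record = {
    "eventName": "REMOVE",
    "dynamodb": {
        "Keys": {"customerId": {"S": "c-1"}, "orderId": {"S": "o-1"}},
        "OldImage": {
            "customerId": {"S": "c-1"},
            "orderId": {"S": "o-1"},
            "status": {"S": "SHIPPED"},
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES",
    },
}

old_image, new_image = get_images(remove_record)
print(new_image is None)         # True: the item no longer exists
print(old_image["status"]["S"])  # SHIPPED
```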

Shard Architecture and 24-Hour Retention

A DynamoDB Stream is automatically partitioned into shards, each of which contains a sequence of stream records. DynamoDB manages shards transparently. It creates, splits, and closes them as needed, for example when your table's partition count grows. A shard can have a parent shard (for example, after a split), and the parent must be fully processed before its children to keep records in order. The Lambda event source mapping handles this for you.

Key shard constraints you must design around:

  • By default, an event source mapping processes one batch per shard at a time, so each shard has at most one in-flight Lambda invocation. The parallelization factor changes this; see the performance section.
  • Stream records are retained for exactly 24 hours. If your Lambda is down or throttled for more than 24 hours, records are lost. Build your error handling strategy with this in mind.
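  • Records within a shard are strictly ordered, and changes to the same item appear in the order they were made. Ordering across items and across shards is not guaranteed. If you need a global order, you must impose it yourself, for example with a version or timestamp attribute. Switching to Kinesis Data Streams doesn't help here, because that integration doesn't guarantee record order either.
  • AWS recommends having no more than two processes read from the same stream shard at the same time (e.g., two Lambda event source mappings on the same stream ARN), because additional readers can be throttled. If you need more fan-out, use the SNS/SQS pattern described later.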
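The event source mapping follows parent shards for you. If you ever read a stream directly, for example from a script using the DynamoDB Streams API, you have to apply the same rule yourself: a parent shard must be fully read before its children, or changes to an item can be seen out of order. The following is a minimal sketch, assuming the shard list comes from DescribeStream (each entry has ShardId and, for child shards, ParentShardId). shard_processing_order is a hypothetical helper name.

```python
from collections import deque


def shard_processing_order(shards: list) -> list:
    """Order shard IDs so every parent comes before its children.

    `shards` has the shape of StreamDescription["Shards"] from DescribeStream.
    A parent that has already aged out of the stream is not listed, so a
    child whose parent is missing is treated as a root.
    """
    ids = {s["ShardId"] for s in shards}
    children = {}
    roots = []
    for s in shards:
        parent = s.get("ParentShardId")
        if parent in ids:
            children.setdefault(parent, []).append(s["ShardId"])
        else:
            roots.append(s["ShardId"])

    order, queue = [], deque(roots)
    while queue:  # breadth-first: a shard is emitted only after its parent
        shard_id = queue.popleft()
        order.append(shard_id)
        queue.extend(children.get(shard_id, []))
    return order


shards = [
    {"ShardId": "shard-B", "ParentShardId": "shard-A"},
    {"ShardId": "shard-C", "ParentShardId": "shard-A"},
    {"ShardId": "shard-A"},
]
print(shard_processing_order(shards))  # ['shard-A', 'shard-B', 'shard-C']
```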

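Stream records include metadata fields: eventID (unique per stream record), eventName (INSERT/MODIFY/REMOVE), eventSource (aws:dynamodb), eventSourceARN (the stream ARN), awsRegion, dynamodb.Keys, dynamodb.SequenceNumber, dynamodb.SizeBytes, dynamodb.ApproximateCreationDateTime, dynamodb.StreamViewType, and the image(s) depending on view type.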
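Handlers usually log or branch on a handful of these fields. The following sketch pulls them out of a Lambda stream record. It assumes ApproximateCreationDateTime is epoch seconds, as in AWS's Lambda event samples. The helper name describe_record and the sample values are hypothetical.

```python
from datetime import datetime, timezone


def describe_record(record: dict) -> dict:
    """Collect the metadata fields most handlers log or branch on."""
    ddb = record["dynamodb"]
    # In Lambda event samples this is a number of seconds since the epoch
    created = ddb.get("ApproximateCreationDateTime")
    return {
        "eventID": record.get("eventID"),
        "eventName": record["eventName"],
        "sequenceNumber": ddb["SequenceNumber"],
        "sizeBytes": ddb.get("SizeBytes"),
        "viewType": ddb.get("StreamViewType"),
        "createdAt": (
            datetime.fromtimestamp(created, tz=timezone.utc)
            if created is not None
            else None
        ),
    }


# Hypothetical MODIFY record, trimmed to the metadata fields
sample = {
    "eventID": "c4ca4238a0b923820dcc509a6f75849b",
    "eventName": "MODIFY",
    "eventSource": "aws:dynamodb",
    "awsRegion": "us-east-1",
    "dynamodb": {
        "ApproximateCreationDateTime": 1700000000,
        "Keys": {"customerId": {"S": "c-1"}, "orderId": {"S": "o-1"}},
        "SequenceNumber": "111",
        "SizeBytes": 59,
        "StreamViewType": "NEW_AND_OLD_IMAGES",
    },
}

print(describe_record(sample)["createdAt"].isoformat())  # 2023-11-14T22:13:20+00:00
```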

Enabling Streams: CLI, Console, and Terraform

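Enabling DynamoDB Streams on an existing table is a non-breaking, zero-downtime operation. The stream begins capturing changes from the moment it's enabled; it does not backfill historical data. If you need to bootstrap downstream systems with existing data, first enable the stream. Then run a full table scan (or export) to load the existing items. Finally, attach the Lambda with starting position TRIM_HORIZON so it replays every change captured since the stream was enabled. If you scan first and enable afterwards, writes made between the two steps are lost. Some changes will be applied twice (once from the scan and once from the stream), which is harmless if your consumer is idempotent. The backfill must finish within the 24-hour retention window.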
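A gap-free bootstrap runs in three steps. First, enable the stream. Second, backfill. Third, attach the Lambda mapping with TRIM_HORIZON, all within the 24-hour window. The backfill step could look like this paginated Scan, which hands every existing item to a sink. This is a sketch under several assumptions: table is a boto3 Table resource, and sink is a hypothetical callable (for example, one that indexes into OpenSearch). For large tables, a parallel Scan or a table export is usually faster.

```python
from typing import Callable


def backfill(table, sink: Callable[[dict], None], page_size: int = 500) -> int:
    """Copy every existing item to `sink` with a paginated Scan.

    `table` is assumed to be a boto3 Table resource, e.g.
    boto3.resource("dynamodb").Table("Orders"). Returns the item count.
    """
    copied = 0
    scan_kwargs = {"Limit": page_size}
    while True:
        page = table.scan(**scan_kwargs)
        for item in page.get("Items", []):
            sink(item)
            copied += 1
        last_key = page.get("LastEvaluatedKey")
        if not last_key:
            return copied
        # Resume the next page where this one stopped
        scan_kwargs["ExclusiveStartKey"] = last_key
```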

AWS CLI

# Enable streams on an existing table
aws dynamodb update-table \
  --table-name Orders \
  --stream-specification StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES \
  --region us-east-1

# Get the stream ARN (you'll need this for the Lambda trigger)
aws dynamodb describe-table \
  --table-name Orders \
  --query 'Table.LatestStreamArn' \
  --output text
# → arn:aws:dynamodb:us-east-1:123456789012:table/Orders/stream/2026-06-08T00:00:00.000

# List all stream shards
aws dynamodbstreams describe-stream \
  --stream-arn arn:aws:dynamodb:us-east-1:123456789012:table/Orders/stream/2026-06-08T00:00:00.000
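A script may need the individual parts of that ARN (region, account, table name). The stream label at the end is a timestamp that contains colons, so splitting the whole ARN on ':' gives the wrong answer. Here is a minimal sketch. parse_stream_arn is a hypothetical helper, and it assumes the table/<name>/stream/<label> layout shown above.

```python
def parse_stream_arn(stream_arn: str) -> dict:
    """Split arn:<partition>:dynamodb:<region>:<account>:table/<table>/stream/<label>."""
    prefix, resource = stream_arn.split(":table/", 1)
    _arn, partition, service, region, account = prefix.split(":")
    # Table names can't contain "/", so the first two slashes are safe separators
    table, _stream, label = resource.split("/", 2)
    return {
        "partition": partition,
        "service": service,
        "region": region,
        "account": account,
        "table": table,
        "label": label,
    }


arn = ("arn:aws:dynamodb:us-east-1:123456789012:"
       "table/Orders/stream/2026-06-08T00:00:00.000")
print(parse_stream_arn(arn)["table"])  # Orders
print(parse_stream_arn(arn)["label"])  # 2026-06-08T00:00:00.000
```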

Terraform (complete setup)

resource "aws_dynamodb_table" "orders" {
  name           = "Orders"
  billing_mode   = "PAY_PER_REQUEST"
  hash_key       = "customerId"
  range_key      = "orderId"

  attribute {
    name = "customerId"
    type = "S"
  }
  attribute {
    name = "orderId"
    type = "S"
  }

  # Enable DynamoDB Streams
  stream_enabled   = true
  stream_view_type = "NEW_AND_OLD_IMAGES"

  tags = {
    Environment = "production"
    Team        = "platform"
  }
}

# IAM role for Lambda to consume the stream
resource "aws_iam_role" "stream_processor" {
  name = "dynamodb-stream-processor"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Action    = "sts:AssumeRole"
      Effect    = "Allow"
      Principal = { Service = "lambda.amazonaws.com" }
    }]
  })
}

resource "aws_iam_role_policy" "stream_policy" {
  name = "dynamodb-stream-policy"
  role = aws_iam_role.stream_processor.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "dynamodb:GetRecords",
          "dynamodb:GetShardIterator",
          "dynamodb:DescribeStream",
          "dynamodb:ListStreams"
        ]
        Resource = "${aws_dynamodb_table.orders.stream_arn}"
      },
      {
        Effect   = "Allow"
        Action   = ["logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents"]
        Resource = "arn:aws:logs:*:*:*"
      },
      {
        # For DLQ on failure
        Effect   = "Allow"
        Action   = ["sqs:SendMessage"]
        Resource = aws_sqs_queue.stream_dlq.arn
      }
    ]
  })
}

# Lambda function
resource "aws_lambda_function" "stream_processor" {
  function_name = "orders-stream-processor"
  runtime       = "python3.12"
  handler       = "handler.lambda_handler"
  role          = aws_iam_role.stream_processor.arn
  filename      = "lambda_stream.zip"
  timeout       = 300
  memory_size   = 256

  environment {
    variables = {
      OPENSEARCH_ENDPOINT = var.opensearch_endpoint
    }
  }
}

# DLQ for failed batches
resource "aws_sqs_queue" "stream_dlq" {
  name                      = "orders-stream-dlq"
  message_retention_seconds = 1209600  # 14 days
}

# Event Source Mapping — the glue between stream and Lambda
resource "aws_lambda_event_source_mapping" "orders_stream" {
  event_source_arn  = aws_dynamodb_table.orders.stream_arn
  function_name     = aws_lambda_function.stream_processor.arn
  starting_position = "TRIM_HORIZON"

  batch_size                         = 100
  maximum_batching_window_in_seconds = 5
  parallelization_factor             = 2
  bisect_batch_on_function_error     = true
  maximum_retry_attempts             = 3
  maximum_record_age_in_seconds      = 3600  # drop records older than 1 hour

  destination_config {
    on_failure {
      destination_arn = aws_sqs_queue.stream_dlq.arn
    }
  }

  # Enable partial batch response (Lambda reports individual item failures)
  function_response_types = ["ReportBatchItemFailures"]
}
Terraform note: The stream ARN is available as aws_dynamodb_table.orders.stream_arn only after the table is created with stream_enabled = true. If you're adding streams to an existing Terraform-managed table, add both stream_enabled and stream_view_type and run terraform apply — it's an in-place update with no downtime.

Lambda Event Source Mapping: Every Parameter Explained

The Event Source Mapping (ESM) is the AWS-managed poller that reads records from your stream shards and invokes your Lambda. It's not a trigger in the traditional sense — it's a long-running poller managed by the Lambda service. Understanding every ESM parameter is the difference between a flaky processor and a production-grade one.

Parameter | Default | Recommended | Notes
--- | --- | --- | ---
batch_size | 100 | 25–500 | Records per Lambda invocation. Higher = fewer invocations, but larger payloads (the 6 MB invocation payload limit still applies). Max 10,000 for DynamoDB Streams.
maximum_batching_window_in_seconds | 0 | 1–5 | Wait up to N seconds (max 300) to fill a batch. Reduces Lambda invocations and cost during low-traffic periods.
starting_position | none (required) | LATEST or TRIM_HORIZON | TRIM_HORIZON = process from the oldest available record. LATEST = only new records. AT_TIMESTAMP is not supported for DynamoDB Streams (it's available for Kinesis sources).
parallelization_factor | 1 | 2–10 | Concurrent Lambda invocations per shard. Max 10. Changes to the same item are still processed in order. See performance section.
bisect_batch_on_function_error | false | true | On error, splits the batch in half and retries each half. Helps isolate poison-pill records.
maximum_retry_attempts | -1 (infinite) | 3–5 | Infinite retries with a 24-hour stream window can block your shard for a full day. Always set a limit.
maximum_record_age_in_seconds | -1 (infinite) | 3600 | Discard records older than N seconds. Prevents processing stale events after an outage.
function_response_types | [] | ["ReportBatchItemFailures"] | Enables partial batch response: Lambda checkpoints at the lowest reported failed sequence number and retries from there, instead of retrying the entire batch.
destination_config.on_failure | none | SQS DLQ | After all retries are exhausted, sends metadata about the failed batch (shard ID and sequence-number range, not the records themselves) to an SQS queue or SNS topic.
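Starting position strategy: Use TRIM_HORIZON when first connecting a new Lambda to an existing stream; it processes everything still in the 24-hour window. Use LATEST for new streams or when historical records are irrelevant. DynamoDB Streams does not support AT_TIMESTAMP. For incident recovery ("reprocess everything after the deploy at 14:30"), create a mapping with TRIM_HORIZON and skip records whose ApproximateCreationDateTime is before the cutoff. Alternatively, use the Kinesis Data Streams integration described below, whose Lambda mappings can start from a timestamp.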
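The same settings can be produced from Python for boto3's lambda create_event_source_mapping call. This sketch only builds and sanity-checks the keyword arguments. The numeric ranges reflect the Lambda API documentation at the time of writing, so re-check them. build_esm_kwargs and its defaults are illustrative choices that mirror the Terraform above.

```python
# Lambda accepts only these two for DynamoDB Streams (AT_TIMESTAMP is Kinesis-only)
DDB_STARTING_POSITIONS = {"TRIM_HORIZON", "LATEST"}


def build_esm_kwargs(function_name: str, stream_arn: str, dlq_arn: str, *,
                     starting_position: str = "TRIM_HORIZON",
                     batch_size: int = 100,
                     batching_window: int = 5,
                     parallelization: int = 2,
                     max_retries: int = 3,
                     max_record_age: int = 3600) -> dict:
    """Keyword arguments for boto3's lambda create_event_source_mapping()."""
    if starting_position not in DDB_STARTING_POSITIONS:
        raise ValueError(f"unsupported for DynamoDB Streams: {starting_position}")
    if not 1 <= batch_size <= 10_000:
        raise ValueError("batch_size must be 1..10000")
    if not 0 <= batching_window <= 300:
        raise ValueError("batching_window must be 0..300 seconds")
    if not 1 <= parallelization <= 10:
        raise ValueError("parallelization must be 1..10")
    if max_retries != -1 and not 0 <= max_retries <= 10_000:
        raise ValueError("max_retries must be -1 (infinite) or 0..10000")
    if max_record_age != -1 and not 60 <= max_record_age <= 604_800:
        raise ValueError("max_record_age must be -1 (infinite) or 60..604800")
    return {
        "FunctionName": function_name,
        "EventSourceArn": stream_arn,
        "StartingPosition": starting_position,
        "BatchSize": batch_size,
        "MaximumBatchingWindowInSeconds": batching_window,
        "ParallelizationFactor": parallelization,
        "BisectBatchOnFunctionError": True,
        "MaximumRetryAttempts": max_retries,
        "MaximumRecordAgeInSeconds": max_record_age,
        "FunctionResponseTypes": ["ReportBatchItemFailures"],
        "DestinationConfig": {"OnFailure": {"Destination": dlq_arn}},
    }
```

Usage would be boto3.client("lambda").create_event_source_mapping(**build_esm_kwargs(...)). Validating locally turns a typo such as AT_TIMESTAMP into an immediate error instead of a failed deploy.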

CLI: Create the Event Source Mapping

STREAM_ARN=$(aws dynamodb describe-table \
  --table-name Orders \
  --query 'Table.LatestStreamArn' \
  --output text)

aws lambda create-event-source-mapping \
  --function-name orders-stream-processor \
  --event-source-arn "$STREAM_ARN" \
  --batch-size 100 \
  --maximum-batching-window-in-seconds 5 \
  --starting-position TRIM_HORIZON \
  --parallelization-factor 2 \
  --bisect-batch-on-function-error \
  --maximum-retry-attempts 3 \
  --maximum-record-age-in-seconds 3600 \
  --function-response-types ReportBatchItemFailures \
  --destination-config '{"OnFailure":{"Destination":"arn:aws:sqs:us-east-1:123456789012:orders-stream-dlq"}}'

Writing the Lambda Handler: Python and Java

The Lambda receives a batch of stream records, all from the same shard. Your handler must parse the eventName to decide what to do. It must also unmarshall DynamoDB's wire format into native types. Critically, it must report failures correctly so the ESM resumes from the right record instead of retrying the whole batch.

Python Handler (with partial batch response)

import json
import logging
from typing import Any
from boto3.dynamodb.types import TypeDeserializer

logger = logging.getLogger()
logger.setLevel(logging.INFO)

deserializer = TypeDeserializer()

def unmarshall(dynamo_item: dict) -> dict:
    """Convert DynamoDB wire format to Python native types."""
    return {k: deserializer.deserialize(v) for k, v in dynamo_item.items()}

def process_insert(new_image: dict, sequence_number: str) -> None:
    """Handle a new item being inserted into DynamoDB."""
    logger.info(f"INSERT: seq={sequence_number}, item={new_image}")
    # Example: sync to OpenSearch
    # opensearch_client.index(index="orders", id=new_image["orderId"], body=new_image)

def process_modify(old_image: dict, new_image: dict, sequence_number: str) -> None:
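    """Handle an existing item being modified."""
    # Images are absent unless the stream's view type includes them
    old_image = old_image or {}
    new_image = new_image or {}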
    changed_fields = {
        k: {"old": old_image.get(k), "new": new_image.get(k)}
        for k in set(old_image) | set(new_image)
        if old_image.get(k) != new_image.get(k)
    }
    logger.info(f"MODIFY: seq={sequence_number}, changes={changed_fields}")
    # Example: partial update in OpenSearch
    # opensearch_client.update(index="orders", id=new_image["orderId"],
    #                          body={"doc": changed_fields})

def process_remove(old_image: dict, keys: dict, sequence_number: str) -> None:
    """Handle an item being deleted from DynamoDB."""
    logger.info(f"REMOVE: seq={sequence_number}, keys={keys}")
    # Example: delete from OpenSearch
    # opensearch_client.delete(index="orders", id=keys["orderId"])

def lambda_handler(event: dict, context: Any) -> dict:
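    """
    Main Lambda handler. Returns a ReportBatchItemFailures response.
    For stream sources the ESM checkpoints at the lowest reported sequence
    number and retries from that record onward, so the loop stops at the
    first failure to preserve per-item ordering.
    """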
    batch_item_failures = []

    for record in event["Records"]:
        sequence_number = record["dynamodb"]["SequenceNumber"]
        event_name = record["eventName"]  # INSERT | MODIFY | REMOVE

        try:
            dynamo_record = record["dynamodb"]

            new_image = (
                unmarshall(dynamo_record["NewImage"])
                if "NewImage" in dynamo_record
                else None
            )
            old_image = (
                unmarshall(dynamo_record["OldImage"])
                if "OldImage" in dynamo_record
                else None
            )
            keys = unmarshall(dynamo_record["Keys"])

            if event_name == "INSERT":
                process_insert(new_image, sequence_number)
            elif event_name == "MODIFY":
                process_modify(old_image, new_image, sequence_number)
            elif event_name == "REMOVE":
                process_remove(old_image, keys, sequence_number)
            else:
                logger.warning(f"Unknown eventName: {event_name}")

        except Exception as e:
            logger.error(
                f"Failed to process record seq={sequence_number}: {e}",
                exc_info=True
            )
            # Report this record and stop: the ESM checkpoints here and retries
            # this record plus everything after it in the batch
            batch_item_failures.append({"itemIdentifier": sequence_number})
            break

    return {"batchItemFailures": batch_item_failures}
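For stream sources, stopping at the first failure is the right move. A small local model of what the ESM redelivers after a partial batch response shows why. This is a simplification for reasoning and unit tests, not the ESM's actual code. records_to_retry is a hypothetical helper, and it assumes the documented rule: Lambda checkpoints at the lowest reported sequence number and retries every record from there.

```python
def records_to_retry(records: list, response: dict) -> list:
    """Records the ESM would redeliver after a partial batch response."""
    failed = [int(f["itemIdentifier"])
              for f in response.get("batchItemFailures", [])]
    if not failed:
        return []  # empty list: the whole batch is checkpointed as done
    # Sequence numbers are numeric strings; compare them as integers
    checkpoint = min(failed)
    return [r for r in records
            if int(r["dynamodb"]["SequenceNumber"]) >= checkpoint]


batch = [{"dynamodb": {"SequenceNumber": s}} for s in ("100", "200", "300")]
retry = records_to_retry(batch, {"batchItemFailures": [{"itemIdentifier": "200"}]})
print([r["dynamodb"]["SequenceNumber"] for r in retry])  # ['200', '300']
```

Record 300 is redelivered even if it succeeded. Processing past a failure therefore only adds duplicate work. It can also apply a later change to an item before an earlier one has gone through.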

Java Handler (aws-lambda-java-events, with partial batch response)

package com.techoral.streams;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamProcessor
    implements RequestHandler<DynamodbEvent, StreamsEventResponse> {

    private static final Logger log = LoggerFactory.getLogger(StreamProcessor.class);

    @Override
    public StreamsEventResponse handleRequest(DynamodbEvent event, Context context) {
        List<StreamsEventResponse.BatchItemFailure> failures = new ArrayList<>();

        for (DynamodbEvent.DynamodbStreamRecord record : event.getRecords()) {
            String sequenceNumber = record.getDynamodb().getSequenceNumber();
            String eventName = record.getEventName();

            try {
                switch (eventName) {
                    case "INSERT" -> handleInsert(record, sequenceNumber);
                    case "MODIFY" -> handleModify(record, sequenceNumber);
                    case "REMOVE" -> handleRemove(record, sequenceNumber);
                    default -> log.warn("Unknown event: {}", eventName);
                }
            } catch (Exception e) {
                log.error("Failed processing seq={}: {}", sequenceNumber, e.getMessage(), e);
                failures.add(StreamsEventResponse.BatchItemFailure.builder()
                    .withItemIdentifier(sequenceNumber)
                    .build());
                // Stop here: the ESM retries from this record onward
                break;
            }
        }

        return StreamsEventResponse.builder()
            .withBatchItemFailures(failures)
            .build();
    }

    private void handleInsert(DynamodbEvent.DynamodbStreamRecord record, String seq) {
        // Map<String, AttributeValue> using the aws-lambda-java-events model types
        var newImage = record.getDynamodb().getNewImage();
        if (newImage == null) return;

        String orderId = newImage.get("orderId").getS();
        String customerId = newImage.get("customerId").getS();
        String status = newImage.containsKey("status") ? newImage.get("status").getS() : "UNKNOWN";

        log.info("INSERT seq={} orderId={} customerId={} status={}", seq, orderId, customerId, status);
        // syncToOpenSearch(orderId, newImage);
    }

    private void handleModify(DynamodbEvent.DynamodbStreamRecord record, String seq) {
        // Maps of aws-lambda-java-events AttributeValue (models.dynamodb package)
        var oldImage = record.getDynamodb().getOldImage();
        var newImage = record.getDynamodb().getNewImage();

        if (oldImage != null && newImage != null) {
            String oldStatus = oldImage.containsKey("status") ? oldImage.get("status").getS() : null;
            String newStatus = newImage.containsKey("status") ? newImage.get("status").getS() : null;
            if (!java.util.Objects.equals(oldStatus, newStatus)) {
                log.info("MODIFY seq={} status changed: {} → {}", seq, oldStatus, newStatus);
                // publishStatusChangeEvent(newImage);
            }
        }
    }

    private void handleRemove(DynamodbEvent.DynamodbStreamRecord record, String seq) {
        var keys = record.getDynamodb().getKeys();
        String orderId = keys.get("orderId").getS();
        log.info("REMOVE seq={} orderId={}", seq, orderId);
        // removeFromOpenSearch(orderId);
    }
}
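DynamoDB type unmarshalling: Stream records use DynamoDB's wire format: {"S": "hello"}, {"N": "42"}, {"BOOL": true}, {"L": [...]}, {"M": {...}}. Python's boto3.dynamodb.types.TypeDeserializer handles this cleanly; numbers come back as Decimal and sets as Python sets. In Java, the AWS Lambda Events library already gives you a Map<String, AttributeValue> in its own model types. To use AWS SDK v2 tooling such as the Enhanced Client's EnhancedDocument, convert those to SDK v2 AttributeValue types first; AWS publishes the aws-lambda-java-events-sdk-transformer library for this. Never parse the wire format manually. Edge cases like NULL values, sets, and binary types will bite you.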
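TypeDeserializer returns numbers as decimal.Decimal and sets as Python sets, and json.dumps rejects both. If a downstream system needs plain JSON, a recursive converter like the following sketch works. to_json_safe is a hypothetical helper. Converting non-integral Decimals to float can lose precision, so keep Decimal (or strings) where exact values matter. Binary values still need their own handling.

```python
import json
from decimal import Decimal


def to_json_safe(value):
    """Recursively convert TypeDeserializer output into JSON-friendly types."""
    if isinstance(value, Decimal):
        # Whole numbers become int; others become float (may lose precision)
        return int(value) if value == value.to_integral_value() else float(value)
    if isinstance(value, (set, frozenset)):
        # DynamoDB sets are unordered; sort for stable output
        return sorted(to_json_safe(v) for v in value)
    if isinstance(value, dict):
        return {k: to_json_safe(v) for k, v in value.items()}
    if isinstance(value, list):
        return [to_json_safe(v) for v in value]
    return value  # str, bool, None and other types pass through unchanged


# Hypothetical unmarshalled order item
item = {"orderId": "o-1", "total": Decimal("19.99"), "qty": Decimal("2"),
        "tags": {"gift", "express"}}
print(json.dumps(to_json_safe(item), sort_keys=True))
# {"orderId": "o-1", "qty": 2, "tags": ["express", "gift"], "total": 19.99}
```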

Real-World Patterns: CDC, Replication, Audit, Cache Invalidation

DynamoDB Streams unlocks a class of event-driven architectures that would otherwise require complex polling infrastructure. Here are four production patterns with concrete implementation details.

Pattern 1: Change Data Capture to OpenSearch

Keep a search index in sync with your DynamoDB table without dual-writes from your application. Every change reaches the stream, and changes to each item arrive in order. Delivery to Lambda is at-least-once, so writes to the index must be idempotent; indexing the full document by its key is. Use NEW_IMAGE for full document sync or NEW_AND_OLD_IMAGES for delta-only updates:

import os

import boto3
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth

def get_opensearch_client() -> OpenSearch:
    credentials = boto3.Session().get_credentials()
    region = os.environ["AWS_REGION"]  # set automatically by the Lambda runtime
    awsauth = AWS4Auth(credentials.access_key, credentials.secret_key,
                       region, "es", session_token=credentials.token)
    return OpenSearch(
        # Domain endpoint host name only, without the https:// scheme
        hosts=[{"host": os.environ["OPENSEARCH_ENDPOINT"], "port": 443}],
        http_auth=awsauth,
        use_ssl=True,
        connection_class=RequestsHttpConnection
    )

es = get_opensearch_client()

def sync_to_opensearch(event_name: str, new_image: dict, keys: dict) -> None:
    doc_id = keys["orderId"]
    if event_name == "REMOVE":
        es.delete(index="orders", id=doc_id, ignore=[404])
    else:
        es.index(index="orders", id=doc_id, body=new_image)

Pattern 2: Cross-Region Replication (Custom)

DynamoDB Global Tables handles cross-region replication natively, but if you need custom logic (filtering, transformation, selective replication), build it with Streams + Lambda:

import boto3
import os

dest_dynamodb = boto3.resource("dynamodb", region_name=os.environ["DEST_REGION"])
dest_table = dest_dynamodb.Table(os.environ["DEST_TABLE"])

def replicate_item(event_name: str, new_image: dict, keys: dict) -> None:
    if event_name == "REMOVE":
        dest_table.delete_item(Key=keys)
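    elif event_name in ("INSERT", "MODIFY"):
        # Optional: transform or filter before replicating
        if new_image.get("replicateToRegion") == os.environ["DEST_REGION"]:
            dest_table.put_item(Item=new_image)
        else:
            # The item no longer qualifies: remove any copy replicated earlier
            dest_table.delete_item(Key=keys)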

Pattern 3: Audit Log to S3 via Amazon Data Firehose (formerly Kinesis Data Firehose)

Write every change to S3 as an immutable audit trail, partitioned by date for Athena queries:

import boto3
import json
import time

firehose = boto3.client("firehose")

def write_audit_log(event_name: str, old_image: dict, new_image: dict,
                     keys: dict, sequence_number: str) -> None:
    record = {
        "eventName": event_name,
        "sequenceNumber": sequence_number,
        "timestamp": int(time.time()),
        "keys": keys,
        "oldImage": old_image,
        "newImage": new_image,
    }
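    firehose.put_record(
        DeliveryStreamName="orders-audit-trail",
        # default=str: unmarshalled images contain Decimal and set values,
        # which json.dumps can't serialize on its own
        Record={"Data": json.dumps(record, default=str) + "\n"}  # newline-delimited JSON for Athena
    )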
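Calling put_record once per change costs one API call per stream record. PutRecordBatch accepts up to 500 records per call. It can also partially fail: the call succeeds, but FailedPutCount is non-zero and the failed entries carry an ErrorCode. The following sketch batches records and retries only the failed entries, assuming firehose is boto3.client("firehose"). put_audit_records is a hypothetical helper. It doesn't enforce the per-call size limit or back off between attempts, and a production version should do both.

```python
import json

MAX_RECORDS_PER_CALL = 500  # PutRecordBatch limit on records per request


def put_audit_records(firehose, stream_name: str, records: list,
                      max_attempts: int = 3) -> list:
    """Send audit records with PutRecordBatch; return those that never succeeded."""
    still_failed = []
    for start in range(0, len(records), MAX_RECORDS_PER_CALL):
        pending = records[start:start + MAX_RECORDS_PER_CALL]
        for _ in range(max_attempts):
            resp = firehose.put_record_batch(
                DeliveryStreamName=stream_name,
                Records=[{"Data": (json.dumps(r, default=str) + "\n").encode()}
                         for r in pending],
            )
            if resp["FailedPutCount"] == 0:
                pending = []
                break
            # RequestResponses lines up with the request order; keep only
            # the entries that came back with an ErrorCode
            pending = [r for r, result in zip(pending, resp["RequestResponses"])
                       if "ErrorCode" in result]
        still_failed.extend(pending)
    return still_failed
```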

Pattern 4: Cache Invalidation

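Stream records always include the item's keys, whatever the view type, and keys are all cache invalidation needs. If invalidation is the stream's only consumer, use KEYS_ONLY to minimize data transfer: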

import logging
import os

import redis

logger = logging.getLogger()
cache = redis.Redis.from_url(os.environ["REDIS_URL"])

def invalidate_cache(keys: dict) -> None:
    # Build the cache keys matching your application's cache key scheme
    cache_key = f"order:{keys['customerId']}:{keys['orderId']}"
    # Also invalidate list caches for this customer
    list_key = f"orders:customer:{keys['customerId']}"
    # DEL accepts several keys and returns how many of them existed
    count = cache.delete(cache_key, list_key)
    logger.info(f"Invalidated {count} cache key(s) for {cache_key}")

Error Handling: DLQ, Partial Batch Response, Retry Strategy

Error handling in stream processing is where most production systems fall apart. The naive approach — let Lambda throw an exception and retry the whole batch — leads to stuck shards, duplicate processing, and eventually data loss as records age past the 24-hour window. Here's the complete strategy.

The Problem with Default Behavior

Without configuration, a single failing record blocks the entire shard. Lambda retries the full batch indefinitely until the record expires (24 hours) or you disable the ESM. During this time, no new records from that shard are processed. In a high-write table, this means your downstream system falls further and further behind.

Three-Layer Defense

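Layer 1: Partial Batch Response. Enable ReportBatchItemFailures. Your handler returns the sequence number of the record that failed. The ESM checkpoints there and retries from that record onward, so records that already succeeded earlier in the batch aren't reprocessed. This is the single most impactful change you can make.

Layer 2: Bisect on Error. Enable bisect_batch_on_function_error. If the whole invocation fails (an unhandled exception or a timeout, as opposed to a failure your handler reports), the ESM splits the batch in half and retries each half separately. This isolates poison-pill records that crash the handler.

Layer 3: On-Failure Destination. Once maximum_retry_attempts or maximum_record_age_in_seconds is exhausted, the ESM sends a message describing the failed batch to your SQS DLQ and moves on. That message contains the shard ID and sequence-number range, not the records themselves. The records stay readable in the stream only until they pass the 24-hour retention window, so act on DLQ messages promptly. To keep a copy of the record contents for investigation and replay, send them from your handler yourself: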

import json
import boto3
import os

sqs = boto3.client("sqs")
DLQ_URL = os.environ["DLQ_URL"]

def send_to_dlq(record: dict, error: Exception) -> None:
    """Manually send a specific record to the DLQ with error context.

    Unlike the ESM's on-failure message, this keeps the full record.
    SQS enforces a maximum message size, so very large images may not fit.
    """
    sqs.send_message(
        QueueUrl=DLQ_URL,
        MessageBody=json.dumps({
            "sequenceNumber": record["dynamodb"]["SequenceNumber"],
            "eventName": record["eventName"],
            "error": str(error),
            "record": record,
        }),
        MessageAttributes={
            "ErrorType": {
                "DataType": "String",
                # Exception class name, e.g. "TimeoutError"
                "StringValue": type(error).__name__
            }
        }
    )
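The ESM's own on-failure messages look different from what send_to_dlq writes. They describe where the failed batch sits in the stream, through a DDBStreamBatchInfo object with the stream ARN, shard ID, and start/end sequence numbers, rather than carrying the records. The field names below follow the example message in the Lambda documentation at the time of writing; verify them against a real message. This sketch parses that message and re-reads the records with the DynamoDB Streams API, which only works while they're still inside the 24-hour window. Both helper names are hypothetical. Records returned by GetRecords have nearly, but not exactly, the same shape as Lambda event records; for example, they have no eventSourceARN.

```python
import json


def parse_on_failure_message(body: str) -> dict:
    """Extract the failed batch's location from an ESM on-failure message body."""
    info = json.loads(body)["DDBStreamBatchInfo"]
    return {
        "streamArn": info["streamArn"],
        "shardId": info["shardId"],
        "start": info["startSequenceNumber"],
        "end": info["endSequenceNumber"],
    }


def fetch_failed_records(streams_client, batch: dict, max_calls: int = 100) -> list:
    """Re-read the failed range; `streams_client` is boto3.client("dynamodbstreams")."""
    iterator = streams_client.get_shard_iterator(
        StreamArn=batch["streamArn"],
        ShardId=batch["shardId"],
        ShardIteratorType="AT_SEQUENCE_NUMBER",
        SequenceNumber=batch["start"],
    )["ShardIterator"]
    end = int(batch["end"])
    records = []
    for _ in range(max_calls):  # bounded: an open shard can return empty pages
        if not iterator:
            break  # shard closed and fully read
        resp = streams_client.get_records(ShardIterator=iterator)
        for rec in resp["Records"]:
            if int(rec["dynamodb"]["SequenceNumber"]) > end:
                return records  # past the failed range
            records.append(rec)
        if records and int(records[-1]["dynamodb"]["SequenceNumber"]) == end:
            return records
        iterator = resp.get("NextShardIterator")
    return records
```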

DLQ Replay Script

import boto3
import json

def replay_from_dlq(dlq_url: str, processor_fn, max_rounds: int = 100) -> None:
    """
    Pull messages written by send_to_dlq and replay them through the processor.
    Run this as a one-off script during incident recovery.

    ESM on-failure messages carry only batch metadata (no "record" key) and are
    left in the queue. max_rounds bounds the loop, because messages left behind
    become visible again after the queue's visibility timeout.
    """
    sqs = boto3.client("sqs")

    for _ in range(max_rounds):
        response = sqs.receive_message(
            QueueUrl=dlq_url,
            MaxNumberOfMessages=10,
            WaitTimeSeconds=20
        )
        messages = response.get("Messages", [])
        if not messages:
            break

        for msg in messages:
            body = json.loads(msg["Body"])
            if "record" not in body:
                print(f"Skipping non-replayable message: {msg['MessageId']}")
                continue

            try:
                processor_fn(body["record"])
                sqs.delete_message(
                    QueueUrl=dlq_url,
                    ReceiptHandle=msg["ReceiptHandle"]
                )
                print(f"Replayed and deleted: {body['sequenceNumber']}")
            except Exception as e:
                print(f"Failed replay: {body['sequenceNumber']}: {e}")
                # Leave in DLQ for next attempt
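Idempotency is mandatory: Each change appears in the stream exactly once, but delivery to Lambda is at-least-once, so your handler must be idempotent. A record can be delivered more than once when an invocation fails or times out partway through a batch, or when the ESM retries from a checkpoint. Use a value that is unique per stream record, such as eventID or the sequence number, as the idempotency key. A composite of event name + item keys is not unique: two MODIFYs of the same item would collide. Write processed keys to a DynamoDB table with a 48-hour TTL, comfortably longer than the 24-hour stream retention.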
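Here is a minimal sketch of such an idempotency guard using the low-level boto3 DynamoDB client. It assumes a hypothetical table with a string partition key pk and TTL enabled on an expiresAt attribute, and it uses each record's eventID as the key. The helper names are hypothetical too. The conditional PutItem means two concurrent deliveries can't both claim the same record. There is one trade-off: if the function times out after claiming but before finishing, the claim isn't released and the retry will skip that record. Pair this guard with the DLQ and alarms described above.

```python
import time


def claim_record(ddb_client, table_name: str, event_id: str,
                 ttl_hours: int = 48) -> bool:
    """Return True if this delivery claimed the record, False if seen before."""
    expires_at = int(time.time()) + ttl_hours * 3600
    try:
        ddb_client.put_item(
            TableName=table_name,
            Item={"pk": {"S": event_id}, "expiresAt": {"N": str(expires_at)}},
            # Fails if another delivery already wrote this key
            ConditionExpression="attribute_not_exists(pk)",
        )
        return True
    except ddb_client.exceptions.ConditionalCheckFailedException:
        return False


def process_once(ddb_client, table_name: str, record: dict, processor_fn) -> None:
    """Run processor_fn once per eventID (best effort: a timeout keeps the claim)."""
    event_id = record["eventID"]
    if not claim_record(ddb_client, table_name, event_id):
        return  # duplicate delivery
    try:
        processor_fn(record)
    except Exception:
        # Release the claim so the ESM's retry can process the record
        ddb_client.delete_item(TableName=table_name, Key={"pk": {"S": event_id}})
        raise
```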

DynamoDB Kinesis Streams: When to Use It Instead

Since November 2020, DynamoDB offers a second streaming mechanism: Kinesis Data Streams integration. DynamoDB writes change records to a Kinesis data stream you provision, instead of or in addition to its own stream. This is fundamentally different from DynamoDB Streams and serves different use cases. One caveat: with this integration, records can arrive out of order and occasionally be duplicated. Consumers should compare each record's ApproximateCreationDateTime and deduplicate.

DynamoDB Streams vs Kinesis Data Streams

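Feature | DynamoDB Streams | Kinesis Data Streams
--- | --- | ---
Retention | 24 hours (fixed) | 24 hours to 365 days (configurable)
Consumers | Up to 2 simultaneous readers per shard (AWS recommendation) | Multiple; up to 20 enhanced fan-out consumers per stream, each with dedicated read throughput
Replay | Within 24 hours only | Replay from any point in the retention window
Shard management | Automatic (mirrors DDB partitions) | Manual, or on-demand capacity mode
Cost | No charge for reads by Lambda triggers; other readers pay per streams read request | Kinesis shard-hours and PUT payload units (provisioned mode), plus DynamoDB change data capture units
Firehose integration | Via Lambda | Native, no-code
Cross-account | No | Yes (Kinesis cross-account)
Setup | One API call | Provision KDS first, then link to DDB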

Enable Kinesis Data Streams integration

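# Create the Kinesis stream first
aws kinesis create-stream \
  --stream-name orders-changes \
  --shard-count 4

# Wait until the stream is ACTIVE before linking it
aws kinesis wait stream-exists --stream-name orders-changes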

# Link DynamoDB to the Kinesis stream
aws dynamodb enable-kinesis-streaming-destination \
  --table-name Orders \
  --stream-arn arn:aws:kinesis:us-east-1:123456789012:stream/orders-changes

# Verify status (takes ~1 minute to activate)
aws dynamodb describe-kinesis-streaming-destination \
  --table-name Orders

Terraform: Kinesis Direct Integration

resource "aws_kinesis_stream" "orders_changes" {
  name             = "orders-changes"
  shard_count      = 4
  retention_period = 168  # 7 days

  stream_mode_details {
    stream_mode = "PROVISIONED"
  }
}

resource "aws_dynamodb_kinesis_streaming_destination" "orders" {
  table_name = aws_dynamodb_table.orders.name
  stream_arn = aws_kinesis_stream.orders_changes.arn
}

# Send to S3 via Firehose: no Lambda needed for raw archival.
# aws_iam_role.firehose and aws_s3_bucket.audit_archive are assumed to be defined elsewhere.
resource "aws_kinesis_firehose_delivery_stream" "orders_archive" {
  name        = "orders-change-archive"
  destination = "extended_s3"

  kinesis_source_configuration {
    kinesis_stream_arn = aws_kinesis_stream.orders_changes.arn
    role_arn           = aws_iam_role.firehose.arn
  }

  extended_s3_configuration {
    role_arn           = aws_iam_role.firehose.arn
    bucket_arn         = aws_s3_bucket.audit_archive.arn
    prefix             = "dynamodb-changes/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/"
    error_output_prefix = "errors/"
    buffering_size     = 128
    buffering_interval = 300
    compression_format = "GZIP"
  }
}
When to choose Kinesis over DynamoDB Streams: Choose Kinesis when you need any of the following:
  • more than two consumers
  • retention beyond 24 hours
  • replay for disaster recovery
  • fan-out to Firehose without Lambda
  • cross-account streaming
Your consumers must also tolerate out-of-order and duplicate records. Choose DynamoDB Streams when you have a simple Lambda integration, need per-item ordering, and want the lowest possible cost and complexity.
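If you pick the Kinesis integration, your Lambda consumer receives Kinesis records rather than DynamoDB stream records, with the change document base64-encoded in kinesis.data. Ordering and uniqueness aren't guaranteed, so a consumer can compare ApproximateCreationDateTime per item and skip strictly older changes. Here is a minimal sketch, assuming the JSON record format. Both helper names are hypothetical. last_applied is an in-memory dict, whereas a real consumer would use durable storage. Two distinct changes with identical timestamps can't be ordered this way.

```python
import base64
import json


def decode_kinesis_change(kinesis_record: dict) -> dict:
    """Decode one record of a Kinesis-triggered Lambda event into the change document."""
    return json.loads(base64.b64decode(kinesis_record["kinesis"]["data"]))


def should_apply(change: dict, last_applied: dict) -> bool:
    """Skip changes strictly older than the newest one applied for the same item.

    Exact duplicates (same timestamp) are applied again, which is harmless
    when the downstream write is idempotent.
    """
    item_key = json.dumps(change["dynamodb"]["Keys"], sort_keys=True)
    ts = change["dynamodb"]["ApproximateCreationDateTime"]
    if item_key in last_applied and ts < last_applied[item_key]:
        return False
    last_applied[item_key] = ts
    return True
```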

Fan-Out Pattern: Streams → Lambda → SNS/SQS for Multiple Consumers

AWS recommends no more than two simultaneous readers per DynamoDB stream shard, for example two Lambda event source mappings. If you need 3, 5, or 10 downstream systems consuming the same changes, use the fan-out pattern. One Lambda reads the stream and publishes to SNS, which fans out to multiple SQS queues, each consumed by a dedicated Lambda. Standard SNS topics and SQS queues don't preserve order. If consumers need per-item ordering, use FIFO topics and queues instead.

import json
import logging
import os

import boto3

logger = logging.getLogger()
sns = boto3.client("sns")
TOPIC_ARN = os.environ["ORDER_CHANGE_TOPIC_ARN"]

def lambda_handler(event: dict, context) -> dict:
    """Fan-out Lambda: reads stream, publishes to SNS."""
    batch_item_failures = []

    for record in event["Records"]:
        sequence_number = record["dynamodb"]["SequenceNumber"]
        try:
            event_name = record["eventName"]
            dynamo = record["dynamodb"]

            message = {
                "eventName": event_name,
                "sequenceNumber": sequence_number,
                "tableName": record["eventSourceARN"].split("/")[1],
                "keys": dynamo.get("Keys", {}),
                "newImage": dynamo.get("NewImage"),
                "oldImage": dynamo.get("OldImage"),
            }

            sns.publish(
                TopicArn=TOPIC_ARN,
                Message=json.dumps(message),
                MessageAttributes={
                    # Allows SQS subscribers to filter by event type
                    "eventName": {
                        "DataType": "String",
                        "StringValue": event_name
                    }
                }
            )
        except Exception as e:
            logger.error(f"Fan-out failed for seq={sequence_number}: {e}")
            batch_item_failures.append({"itemIdentifier": sequence_number})
            break  # the ESM retries from this record onward

    return {"batchItemFailures": batch_item_failures}

SNS → SQS Fan-Out Terraform

resource "aws_sns_topic" "order_changes" {
  name = "order-change-events"
}

# Consumer 1: OpenSearch sync queue
resource "aws_sqs_queue" "opensearch_sync" {
  name = "order-opensearch-sync"
}

# Consumer 2: Analytics pipeline queue
resource "aws_sqs_queue" "analytics" {
  name = "order-analytics-events"
}

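# Consumer 3: Cache invalidation queue (MODIFY + REMOVE only)
resource "aws_sqs_queue" "cache_invalidation" {
  name = "order-cache-invalidation"
}

# Subscribe each SQS to SNS with optional filtering
resource "aws_sns_topic_subscription" "opensearch" {
  topic_arn = aws_sns_topic.order_changes.arn
  protocol  = "sqs"
  endpoint  = aws_sqs_queue.opensearch_sync.arn
  # No filter: receive all events
}

resource "aws_sns_topic_subscription" "analytics" {
  topic_arn = aws_sns_topic.order_changes.arn
  protocol  = "sqs"
  endpoint  = aws_sqs_queue.analytics.arn
}

resource "aws_sns_topic_subscription" "cache" {
  topic_arn = aws_sns_topic.order_changes.arn
  protocol  = "sqs"
  endpoint  = aws_sqs_queue.cache_invalidation.arn
  filter_policy = jsonencode({
    # Updates and deletes make a cached item stale; add INSERT if you also
    # cache per-customer lists (as in Pattern 4)
    eventName = ["MODIFY", "REMOVE"]
  })
}

# SNS can only deliver to a queue whose policy allows it
resource "aws_sqs_queue_policy" "from_sns" {
  for_each = {
    opensearch = aws_sqs_queue.opensearch_sync
    analytics  = aws_sqs_queue.analytics
    cache      = aws_sqs_queue.cache_invalidation
  }

  queue_url = each.value.id
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Effect    = "Allow"
      Principal = { Service = "sns.amazonaws.com" }
      Action    = "sqs:SendMessage"
      Resource  = each.value.arn
      Condition = { ArnEquals = { "aws:SourceArn" = aws_sns_topic.order_changes.arn } }
    }]
  })
}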
Architecture note: With this pattern, the fan-out Lambda becomes a critical single point of failure. Give it a DLQ, set maximum_retry_attempts conservatively, and alarm on DLQ depth. The individual consumer Lambdas should also have their own DLQs. Each SQS queue provides backpressure buffering if a downstream system is temporarily unavailable.
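On the consuming side, each queue's Lambda has to unwrap the SNS envelope before it sees the change. This sketch makes several assumptions. Raw message delivery is disabled on the subscription, which is the SNS default. ReportBatchItemFailures is enabled on the queue's event source mapping. process_change is a hypothetical per-consumer function. Unlike the stream handler, this one continues past a failure: standard SQS queues don't preserve order, and each failed message is retried on its own.

```python
import json


def handle_sqs_batch(event: dict, process_change) -> dict:
    """Process one fan-out queue's batch; report failed messages individually.

    Each SQS body is the SNS envelope, whose "Message" field holds the JSON
    string the fan-out Lambda published.
    """
    failures = []
    for msg in event["Records"]:
        try:
            envelope = json.loads(msg["body"])
            change = json.loads(envelope["Message"])
            process_change(change)
        except Exception:
            # Standard queues are unordered, so keep going after a failure
            failures.append({"itemIdentifier": msg["messageId"]})
    return {"batchItemFailures": failures}
```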

Performance Tuning: Shards, Parallelization, and Concurrency

By default, the Lambda ESM processes each shard sequentially: it reads one batch, waits for that Lambda invocation to complete, and only then reads the next batch. On a high-write DynamoDB table, this creates lag. Here's how to tune it.

Parallelization Factor

The parallelization factor (1–10) controls how many Lambda invocations can process batches from the same shard at once. A factor of 4 on a table with 10 shards means up to 40 concurrent Lambda invocations. This is the primary lever for reducing lag, and Lambda still processes records for the same item (partition and sort key) in order:

# Update an existing ESM to increase parallelization
aws lambda update-event-source-mapping \
  --uuid "your-esm-uuid" \
  --parallelization-factor 4 \
  --batch-size 200 \
  --maximum-batching-window-in-seconds 3
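If you manage the mapping from Python instead of the CLI, the same update can be sketched with boto3's Lambda client. The function name tune_stream_esm and its range checks are illustrative assumptions; the client is passed in so the sketch can be exercised without AWS credentials.

```python
def tune_stream_esm(client, uuid: str, parallelization_factor: int = 4,
                    batch_size: int = 200, batching_window_s: int = 3) -> dict:
    """Mirror of the CLI call above; `client` is e.g. boto3.client("lambda")."""
    # Reject values the API would refuse anyway, before making the call
    if not 1 <= parallelization_factor <= 10:
        raise ValueError("ParallelizationFactor must be between 1 and 10")
    if not 0 <= batching_window_s <= 300:
        raise ValueError("MaximumBatchingWindowInSeconds must be between 0 and 300")
    return client.update_event_source_mapping(
        UUID=uuid,
        ParallelizationFactor=parallelization_factor,
        BatchSize=batch_size,
        MaximumBatchingWindowInSeconds=batching_window_s,
    )


# Usage (requires AWS credentials):
#   import boto3
#   tune_stream_esm(boto3.client("lambda"), "your-esm-uuid")
```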

Lambda Concurrency Limits

Calculate your peak concurrency requirement:

Peak Lambda concurrency = shard_count × parallelization_factor

Example:
- Table scales to 20 shards during peak
- Parallelization factor = 5
- Peak concurrency = 20 × 5 = 100 concurrent Lambdas

Set reserved concurrency on the function:
  aws lambda put-function-concurrency \
    --function-name orders-stream-processor \
    --reserved-concurrent-executions 120  # 20% headroom
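The sizing arithmetic above is small enough to script. This sketch reproduces the worked example (20 shards, factor 5, 20% headroom) using integer math so the rounding up is exact; the function names are illustrative.

```python
def peak_concurrency(shard_count: int, parallelization_factor: int) -> int:
    """Upper bound on concurrent invocations for one stream ESM."""
    if not 1 <= parallelization_factor <= 10:
        raise ValueError("parallelization factor must be between 1 and 10")
    return shard_count * parallelization_factor


def reserved_concurrency(peak: int, headroom_pct: int = 20) -> int:
    """Peak plus headroom, rounded up, computed without floating point."""
    return (peak * (100 + headroom_pct) + 99) // 100


peak = peak_concurrency(20, 5)
print(peak, reserved_concurrency(peak))  # 100 120
```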

Batch Size and Batching Window Tradeoffs

Low volume (< 100 writes/sec):
  batch_size = 25
  batching_window = 5s
  → Fewer, larger batches; lower cost; slightly more latency

High volume (> 10,000 writes/sec):
  batch_size = 500–1000
  batching_window = 1s
  parallelization_factor = 5–10
  → More concurrent Lambdas; aims to keep processing lag around 1 second

Ultra-high volume (real-time requirement):
  Consider switching to Kinesis + Enhanced Fan-Out
  → 2 MB/s per shard per consumer, ~70ms typical propagation delay

Monitoring Lag: The Key Metric

# Check current iterator age (lag) in CloudWatch
aws cloudwatch get-metric-statistics \
  --namespace AWS/Lambda \
  --metric-name IteratorAge \
  --dimensions Name=FunctionName,Value=orders-stream-processor \
  --start-time 2026-06-08T00:00:00Z \
  --end-time 2026-06-08T01:00:00Z \
  --period 60 \
  --statistics Maximum \
  --query 'Datapoints[*].[Timestamp,Maximum]' \
  --output table
IteratorAge alert: Set a CloudWatch alarm on IteratorAge > 60,000ms (1 minute). This is the primary SLA indicator for stream processing health. If IteratorAge grows continuously, you have insufficient Lambda concurrency, your processing function is too slow, or a failing batch is being retried and blocking its shard.
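The alarm can be created with boto3's CloudWatch client. This is a sketch: the alarm name format, the five one-minute evaluation periods, and the SNS topic ARN for notifications are assumptions to adapt, and the client is injected rather than created inside the function.

```python
def put_iterator_age_alarm(cloudwatch, function_name: str, sns_topic_arn: str,
                           threshold_ms: int = 60_000) -> str:
    """Alarm when max IteratorAge stays above threshold_ms (assumed: 5 x 1-minute periods)."""
    alarm_name = f"{function_name}-iterator-age"
    cloudwatch.put_metric_alarm(
        AlarmName=alarm_name,
        Namespace="AWS/Lambda",
        MetricName="IteratorAge",
        Dimensions=[{"Name": "FunctionName", "Value": function_name}],
        Statistic="Maximum",
        Period=60,
        EvaluationPeriods=5,
        Threshold=threshold_ms,
        ComparisonOperator="GreaterThanThreshold",
        TreatMissingData="notBreaching",  # no datapoints usually means no traffic
        AlarmActions=[sns_topic_arn],
    )
    return alarm_name


# Usage (requires AWS credentials; the topic ARN is a placeholder):
#   import boto3
#   put_iterator_age_alarm(boto3.client("cloudwatch"), "orders-stream-processor",
#                          "arn:aws:sns:us-east-1:123456789012:ops-alerts")
```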

Cost Analysis: Streams Pricing vs Kinesis

DynamoDB Streams pricing is simple and often surprising — it's significantly cheaper than most engineers expect. Understanding the cost model helps you choose the right architecture for your budget.

DynamoDB Streams Pricing

Component | Cost | Notes
--- | --- | ---
Stream reads (GetRecords) | $0.02 per 100,000 streams read request units | Free when the reader is a Lambda trigger; each request unit is one GetRecords call returning up to 1 MB
Stream storage | Free | No separate charge for the 24-hour retention window
Lambda invocations | $0.20 per 1M requests | Standard Lambda pricing
Lambda duration | $0.0000166667 per GB-second | Standard Lambda pricing (x86)

Kinesis Data Streams Pricing (for comparison)

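Component | Provisioned cost | On-demand cost
--- | --- | ---
Shard hours | $0.015 per shard-hour ($10.80/shard/month) | N/A (billed per stream-hour instead)
PUT payload units | $0.014 per 1M units (each record rounds up to whole 25 KB units) | N/A (billed per GB ingested, $0.08/GB)
Extended retention (7–365 days) | $0.023 per GB-month | Same
Enhanced Fan-Out | $0.015 per consumer-shard hour + $0.013 per GB retrieved | Included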

Cost Example: 1 Million Writes/Day

Table: Orders, 1M writes/day, avg item 1KB, NEW_AND_OLD_IMAGES

DynamoDB Streams:
  Lambda invocations: 1M events / 100 batch_size = 10,000 invocations/day
  Stream reads: GetRecords calls made by a Lambda trigger are free → $0
    (a non-Lambda reader making 10,000 GetRecords calls/day would pay
     10,000 / 100,000 × $0.02 = $0.002/day)
  Lambda requests: 10,000/day × 30 days = 300,000 × $0.20/1M = $0.06/month
  Lambda duration (128ms avg, 256MB): 10,000 × 0.128s × 0.25GB × $0.0000166667 ≈ $0.0053/day ≈ $0.16/month
  Total: ~$0.22/month for streams + Lambda

Kinesis Data Streams (4 provisioned shards):
  Shard cost: 4 × 720h × $0.015 = $43.20/month
  PUT cost: each ~2KB record rounds up to one 25KB payload unit
    → 1M units/day × 30 days = 30M units × $0.014/1M = $0.42/month
  Total: ~$43.62/month, before paying for the consuming Lambda

Winner: DynamoDB Streams is roughly 200× cheaper for this workload.
Use Kinesis when you need retention > 24h or more than two concurrent readers per shard.
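To rerun this comparison with your own numbers, the arithmetic can be captured in two functions. Assumptions baked in: a 30-day (720-hour) month, the list prices in the tables above, GetRecords calls from a Lambda trigger being free, and every record rounding up to whole 25KB PUT payload units. Free tiers and the Kinesis side's own consumer Lambda are ignored.

```python
import math

LAMBDA_REQUEST_PRICE = 0.20 / 1_000_000      # $ per invocation
LAMBDA_GB_SECOND_PRICE = 0.0000166667        # $ per GB-second (x86)
KINESIS_SHARD_HOUR_PRICE = 0.015             # $ per shard-hour, provisioned
KINESIS_PUT_UNIT_PRICE = 0.014 / 1_000_000   # $ per 25KB PUT payload unit
DAYS_PER_MONTH = 30


def streams_lambda_monthly(writes_per_day: int, batch_size: int,
                           avg_ms: float, memory_gb: float) -> float:
    """DynamoDB Streams + Lambda; GetRecords from a Lambda trigger is not billed."""
    invocations = writes_per_day / batch_size * DAYS_PER_MONTH
    requests = invocations * LAMBDA_REQUEST_PRICE
    duration = invocations * (avg_ms / 1000) * memory_gb * LAMBDA_GB_SECOND_PRICE
    return requests + duration


def kinesis_monthly(writes_per_day: int, record_kb: float, shards: int) -> float:
    """Provisioned Kinesis: shard-hours plus PUT payload units."""
    shard_cost = shards * DAYS_PER_MONTH * 24 * KINESIS_SHARD_HOUR_PRICE
    units_per_record = math.ceil(record_kb / 25)
    put_cost = writes_per_day * DAYS_PER_MONTH * units_per_record * KINESIS_PUT_UNIT_PRICE
    return shard_cost + put_cost


streams = streams_lambda_monthly(1_000_000, 100, 128, 0.25)
kinesis = kinesis_monthly(1_000_000, 2, 4)
print(f"streams ${streams:.2f}, kinesis ${kinesis:.2f}")  # streams $0.22, kinesis $43.62
```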
Hidden cost: If your Lambda calls other AWS services (OpenSearch, SQS, S3, RDS), those API calls dominate your cost, not the stream reads. Profile your Lambda's AWS API calls and optimize the highest-cost operations first (batch writes, connection pooling, payload compression).

Frequently Asked Questions

Q: Does DynamoDB Streams guarantee exactly-once delivery?

Not end to end. DynamoDB Streams writes each change to the stream exactly once, and records are ordered within a shard (changes to the same item appear in the order they happened). Lambda's consumption of the stream, however, is at-least-once: a record can be delivered more than once in failure scenarios (Lambda crashes mid-batch, retried batches, ESM internal errors). Your handler must be idempotent. Use the sequence number as an idempotency key stored in DynamoDB with a TTL longer than your maximum retry window.
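One way to apply that advice is a conditional write that "claims" each sequence number before processing. This is a sketch under assumptions: a hypothetical idempotency table with a string partition key pk and a numeric expires_at attribute configured as its TTL attribute, a 48-hour TTL, and an in-memory store for tests. If the handler raises, the claim is released so the retry is not skipped.

```python
import time
from typing import Callable

TTL_SECONDS = 48 * 3600  # longer than the 24-hour stream retention window


class InMemoryStore:
    """Test double: maps key -> expiry (epoch seconds)."""
    def __init__(self) -> None:
        self._items: dict = {}

    def claim(self, key: str, ttl_seconds: int) -> bool:
        now = time.time()
        expiry = self._items.get(key)
        if expiry is not None and expiry > now:
            return False  # already processed (or in progress)
        self._items[key] = now + ttl_seconds
        return True

    def release(self, key: str) -> None:
        self._items.pop(key, None)


class DynamoDBStore:
    """Conditional-put store; table and attribute names are hypothetical."""
    def __init__(self, client, table_name: str) -> None:
        self._client = client  # e.g. boto3.client("dynamodb")
        self._table = table_name

    def claim(self, key: str, ttl_seconds: int) -> bool:
        try:
            self._client.put_item(
                TableName=self._table,
                Item={"pk": {"S": key},
                      "expires_at": {"N": str(int(time.time()) + ttl_seconds)}},
                ConditionExpression="attribute_not_exists(pk)",
            )
            return True
        except self._client.exceptions.ConditionalCheckFailedException:
            return False

    def release(self, key: str) -> None:
        self._client.delete_item(TableName=self._table, Key={"pk": {"S": key}})


def process_once(record: dict, store, handler: Callable[[dict], None]) -> bool:
    """Run handler at most once per stream record; returns False for duplicates."""
    key = record["dynamodb"]["SequenceNumber"]
    if not store.claim(key, TTL_SECONDS):
        return False
    try:
        handler(record)
    except Exception:
        store.release(key)  # let the ESM retry reprocess this record
        raise
    return True
```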

Q: Can I use DynamoDB Streams with DynamoDB Global Tables?

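Yes, but with a caveat: Global Tables replicate through the table's stream. You can add a second consumer (Lambda) to the stream, but you'll see both local writes and writes replicated in from other Regions. Every record's eventSource is "aws:dynamodb", and awsRegion reports the Region of the stream you are reading, so neither field tells you where a write originated. If you only want to process writes originating in a specific Region, have your application stamp each item with its origin Region and filter on that attribute.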
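A sketch of that filter, assuming your writers set a hypothetical string attribute named origin_region on every item. It shows both an in-handler predicate and an equivalent event filtering pattern in the FilterCriteria shape the ESM accepts. Deletes carry no NewImage, so neither version can attribute a REMOVE to a Region this way.

```python
import json

ORIGIN_ATTR = "origin_region"  # hypothetical attribute your writers must set


def is_local_write(record: dict, home_region: str) -> bool:
    """True when the new image says the write originated in home_region."""
    new_image = record.get("dynamodb", {}).get("NewImage", {})
    return new_image.get(ORIGIN_ATTR, {}).get("S") == home_region


def esm_filter_criteria(home_region: str) -> dict:
    """FilterCriteria for create/update_event_source_mapping (drops REMOVEs too)."""
    pattern = {"dynamodb": {"NewImage": {ORIGIN_ATTR: {"S": [home_region]}}}}
    return {"Filters": [{"Pattern": json.dumps(pattern)}]}
```

Filtering in the ESM means non-matching records never invoke the function at all, which is cheaper than discarding them inside the handler.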

Q: What happens if my Lambda is down for more than 24 hours?

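Records older than 24 hours expire from the stream and are lost. The ESM will skip to the oldest available record. To mitigate: set a CloudWatch alarm on IteratorAge to alert at > 12 hours, configure an on-failure destination (the ESM's DLQ) so discarded batches are at least recorded (it receives batch metadata such as the shard ID and sequence-number range, not the records themselves), and implement an out-of-band recovery process (full table scan to re-bootstrap the downstream system) for this worst-case scenario.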
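The re-bootstrap scan can be sketched with boto3's DynamoDB scan paginator, using parallel-scan segments so several workers can split the table. The sink callable is a hypothetical stand-in for your downstream writer. Because the stream keeps flowing during the scan, the sink should only overwrite downstream data when the scanned item is not older than what is already there (for example, by comparing a version attribute).

```python
from typing import Callable


def rebootstrap(dynamodb_client, table_name: str, sink: Callable[[dict], None],
                segment: int = 0, total_segments: int = 1) -> int:
    """Push every item (DynamoDB JSON) in one scan segment to `sink`."""
    paginator = dynamodb_client.get_paginator("scan")
    pages = paginator.paginate(
        TableName=table_name,
        Segment=segment,              # run total_segments workers in parallel
        TotalSegments=total_segments,
        ConsistentRead=True,
    )
    count = 0
    for page in pages:
        for item in page.get("Items", []):
            sink(item)
            count += 1
    return count


# Usage (requires AWS credentials):
#   import boto3
#   rebootstrap(boto3.client("dynamodb"), "Orders", print)
```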

Q: How do I test my stream Lambda locally?

Construct synthetic DynamoDB Stream events matching the AWS event schema and call your handler directly, or send a JSON payload to the deployed function with aws lambda invoke (AWS CLI v2 needs --cli-binary-format raw-in-base64-out to accept a raw JSON payload). The AWS documentation provides sample event payloads for INSERT, MODIFY, and REMOVE. For integration testing, write to a real DynamoDB table with streams enabled and observe the invocation in CloudWatch Logs.
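A minimal builder for synthetic stream records is sketched below. Its to_attr encoder is a deliberate simplification that only handles strings, numbers, and booleans; the ARN, account ID, and SizeBytes are placeholders; and handler is a hypothetical function under test that reports a failure when a non-REMOVE record lacks a status attribute.

```python
def to_attr(value):
    """Minimal DynamoDB JSON encoder: strings, numbers, and booleans only."""
    if isinstance(value, bool):  # check before int: bool is a subclass of int
        return {"BOOL": value}
    if isinstance(value, (int, float)):
        return {"N": str(value)}
    return {"S": str(value)}


def make_stream_record(event_name, keys, new=None, old=None, seq="1"):
    """Build one record shaped like a DynamoDB Streams Lambda event record."""
    ddb = {
        "Keys": {k: to_attr(v) for k, v in keys.items()},
        "SequenceNumber": seq,
        "SizeBytes": 100,  # placeholder
        "StreamViewType": "NEW_AND_OLD_IMAGES",
    }
    if new is not None:
        ddb["NewImage"] = {k: to_attr(v) for k, v in new.items()}
    if old is not None:
        ddb["OldImage"] = {k: to_attr(v) for k, v in old.items()}
    return {
        "eventID": f"evt-{seq}",
        "eventName": event_name,
        "eventVersion": "1.1",
        "eventSource": "aws:dynamodb",
        "awsRegion": "us-east-1",
        "dynamodb": ddb,
        "eventSourceARN": "arn:aws:dynamodb:us-east-1:123456789012:table/Orders/stream/placeholder",
    }


def handler(event, context):
    """Hypothetical handler under test: every non-REMOVE needs a status."""
    failures = []
    for r in event["Records"]:
        if r["eventName"] != "REMOVE" and "status" not in r["dynamodb"].get("NewImage", {}):
            failures.append({"itemIdentifier": r["dynamodb"]["SequenceNumber"]})
    return {"batchItemFailures": failures}


event = {"Records": [
    make_stream_record("INSERT", {"pk": "ORDER#1"}, new={"pk": "ORDER#1", "status": "NEW"}, seq="1"),
    make_stream_record("MODIFY", {"pk": "ORDER#2"}, new={"pk": "ORDER#2"},
                       old={"pk": "ORDER#2", "status": "NEW"}, seq="2"),
    make_stream_record("REMOVE", {"pk": "ORDER#3"}, old={"pk": "ORDER#3"}, seq="3"),
]}
print(handler(event, None))  # {'batchItemFailures': [{'itemIdentifier': '2'}]}
```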

Quick Reference

Stream View Types

  • KEYS_ONLY — smallest, cache invalidation
  • NEW_IMAGE — downstream sync
  • OLD_IMAGE — audit / before state
  • NEW_AND_OLD_IMAGES — full CDC

ESM Key Parameters

  • bisect_batch_on_function_error: true
  • maximum_retry_attempts: 3–5
  • parallelization_factor: 2–10
  • function_response_types: ["ReportBatchItemFailures"] (always)
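For teams creating the mapping from Python rather than Terraform, the same settings map onto boto3's create_event_source_mapping. This is a sketch: the starting position, batch size, and the specific retry and parallelization values chosen from the ranges above are assumptions, and the ARNs are placeholders you supply.

```python
def create_stream_trigger(lambda_client, function_name: str, stream_arn: str,
                          on_failure_arn: str) -> str:
    """Create a stream ESM with the quick-reference settings; returns its UUID."""
    response = lambda_client.create_event_source_mapping(
        EventSourceArn=stream_arn,
        FunctionName=function_name,
        StartingPosition="LATEST",
        BatchSize=100,
        ParallelizationFactor=2,
        MaximumRetryAttempts=3,
        BisectBatchOnFunctionError=True,
        FunctionResponseTypes=["ReportBatchItemFailures"],
        # Metadata about batches that exhaust their retries lands here
        DestinationConfig={"OnFailure": {"Destination": on_failure_arn}},
    )
    return response["UUID"]
```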