AWS Kinesis: Real-Time Data Streaming and Analytics

AWS Kinesis is a family of fully managed services for ingesting, processing, and analyzing real-time data streams at any scale. Whether you're capturing clickstream events from millions of users, ingesting IoT sensor readings, or building a real-time fraud detection pipeline, Kinesis provides the building blocks to move data from source to destination in seconds — not hours. This guide covers the full Kinesis family, practical producer and consumer patterns, shard capacity math, and a clear decision framework for choosing between Kinesis, MSK, and SQS.

Kinesis Family Overview

The Kinesis brand covers four distinct services that share a streaming philosophy but serve very different operational roles:

| Service | Primary Role | Consumers | Typical Latency |
|---|---|---|---|
| Kinesis Data Streams (KDS) | Durable, ordered, replayable stream of records | KCL apps, Lambda, Flink, custom | ~200 ms (polling), ~70 ms (Enhanced Fan-Out) |
| Kinesis Data Firehose | Managed delivery pipeline to S3/Redshift/OpenSearch/Splunk | AWS-managed, no custom consumers | 60–900 s buffer |
| Kinesis Data Analytics | Real-time SQL or Apache Flink on streams | KDS or Firehose as source; writes to KDS/S3/etc. | Sub-second (Flink) |
| Kinesis Video Streams | Ingest, store, and process video from devices | Rekognition, custom ML, HLS playback | Seconds |

Design principle: These services compose well together. A common pattern chains them as: application → KDS → KDA (Flink) for real-time aggregation → Firehose → S3 for archival. Each layer can be replaced or bypassed depending on your latency and cost requirements.

For most architectures, you'll work primarily with KDS (the core abstraction) and Firehose (the delivery workhorse). KDA adds stream processing logic when Lambda isn't expressive enough. Video Streams is a specialized service for camera/device video ingestion.

Kinesis Data Streams: Shards, Partition Keys, Retention

A Kinesis Data Stream is made up of one or more shards. Each shard is the unit of capacity: it provides 1 MB/s write throughput and 2 MB/s read throughput, up to 1,000 records/s writes and 5 read transactions/s per shard. All records within a shard are strictly ordered.

Partition Keys and Sequence Numbers

Every record you write to KDS has a partition key — a string you choose (e.g., userId, deviceId). Kinesis hashes the partition key using MD5 and uses the result to determine which shard receives the record. All records with the same partition key go to the same shard, guaranteeing per-key order.
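To make the key-to-shard mapping concrete, here is a minimal sketch. It assumes the MD5 digest is read as a big-endian 128-bit integer and that the shards divide the hash key space evenly (typical for a freshly created stream, not necessarily after resharding). The names hash_key and shard_for_key are illustrative, not part of any AWS SDK.

```python
import hashlib

HASH_SPACE = 2 ** 128  # partition keys are mapped onto 128-bit integers


def hash_key(partition_key: str) -> int:
    """Return the 128-bit integer derived from the key's MD5 digest."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big")  # assumption: big-endian interpretation


def shard_for_key(partition_key: str, shard_count: int) -> int:
    """Index of the shard owning this key, assuming evenly split hash ranges."""
    range_size = HASH_SPACE // shard_count
    # The last shard absorbs any remainder of the integer division.
    return min(hash_key(partition_key) // range_size, shard_count - 1)


for key in ("u-123", "u-456", "u-123"):
    print(key, "-> shard index", shard_for_key(key, 4))
```

The same key always lands on the same shard index, which is what gives per-key ordering.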

Once a record is accepted, Kinesis assigns it a sequence number — a monotonically increasing, 128-bit integer unique within the shard. Consumers use sequence numbers to checkpoint their position and resume from exactly where they left off after a restart.

Retention Period

By default, records are retained for 24 hours. You can extend this to 7 days (standard) or up to 365 days with long-term data retention (additional cost). Extended retention is valuable for replaying data to re-process after a bug fix, backfilling a new consumer, or auditing.

| Retention Tier | Duration | Pricing Impact |
|---|---|---|
| Default | 24 hours | Included in shard-hour pricing |
| Extended | 7 days | +$0.020/shard-hour above 24h |
| Long-term | Up to 365 days | +$0.023/GB/month |

Note: Kinesis uses an at-least-once delivery model. Duplicate records are possible, especially during resharding or consumer failures. Design your consumers to be idempotent: use sequence numbers or a deduplication key in your downstream store.
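A minimal sketch of the idempotency idea, using an in-memory set as a stand-in for a conditional write to a real store. The IdempotentSink class and its method names are hypothetical, chosen only for illustration.

```python
class IdempotentSink:
    """Applies each (shard, sequence number) pair at most once."""

    def __init__(self):
        self._seen = set()   # stand-in for a unique key in the downstream store
        self.applied = []    # payloads that were actually applied

    def apply(self, shard_id: str, sequence_number: str, payload: dict) -> bool:
        """Return True if the record was applied, False if it was a duplicate."""
        key = (shard_id, sequence_number)
        if key in self._seen:
            return False
        self._seen.add(key)
        self.applied.append(payload)
        return True


sink = IdempotentSink()
print(sink.apply("shardId-000000000000", "1001", {"event": "purchase"}))
print(sink.apply("shardId-000000000000", "1001", {"event": "purchase"}))  # redelivery
```

Sequence numbers only catch redelivery of the same stored record. A producer retry writes a new record with a new sequence number, so catching those duplicates needs an event ID carried in the payload.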

Producer Patterns: KPL, SDK, Kinesis Agent

Three main mechanisms exist for writing to a Kinesis stream, each suited to different throughput and latency profiles.

1. AWS SDK — PutRecords API

The simplest approach: call PutRecords directly via boto3, the Java SDK, or any AWS SDK. PutRecords accepts up to 500 records per call (max 5 MB total). It's synchronous and returns a response with per-record success/failure, so you can retry failed records immediately.

import boto3, json, time

kinesis = boto3.client('kinesis', region_name='us-east-1')
STREAM_NAME = 'user-clickstream'

def put_records_batch(events: list[dict]):
    """Write a batch of up to 500 events to Kinesis."""
    records = [
        {
            'Data': json.dumps(event).encode('utf-8'),
            'PartitionKey': event['user_id']   # per-user ordering
        }
        for event in events
    ]
    # Retry only the records that failed (typically throttling), with
    # exponential backoff, for a bounded number of attempts.
    for attempt in range(4):
        response = kinesis.put_records(Records=records, StreamName=STREAM_NAME)
        if response['FailedRecordCount'] == 0:
            return 0
        # Failed entries carry an ErrorCode; results are in request order.
        records = [
            records[i] for i, r in enumerate(response['Records'])
            if 'ErrorCode' in r
        ]
        time.sleep(0.5 * 2 ** attempt)
    return len(records)   # records still undelivered after all attempts

# Example usage
batch = [
    {'user_id': 'u-123', 'event': 'page_view', 'page': '/checkout', 'ts': time.time()},
    {'user_id': 'u-456', 'event': 'add_to_cart', 'product': 'widget-42', 'ts': time.time()},
]
put_records_batch(batch)

2. Kinesis Producer Library (KPL)

KPL is a Java library, backed by a native C++ child process, that handles aggregation (packing multiple logical records into one Kinesis record), batching, retry with exponential backoff, and asynchronous flushing. It can achieve significantly higher throughput per shard than individual PutRecord calls. The trade-off: KPL adds buffering latency (controlled by RecordMaxBufferedTime, 100 ms by default and often raised toward seconds for better packing), and KPL-aggregated records must be de-aggregated on the consumer side, either by the KCL or by a standalone de-aggregation library such as the awslabs kinesis-aggregation modules.

When to use KPL: High-throughput scenarios (millions of events/min from a single producer) where sub-second latency is not required. IoT aggregation, log shipping, ad-tech event firehoses.

3. Kinesis Agent

Kinesis Agent is a Java daemon you install on EC2 or on-premises servers. It tails log files (handling rotation), parses them (Apache CLF, syslog, and CSV are supported), and writes to KDS or Firehose. Configuration is a single JSON file, with no code required.

{
  "kinesis.endpoint": "",
  "flows": [
    {
      "filePattern": "/var/log/nginx/access.log*",
      "kinesisStream": "nginx-access-logs",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
        {
          "optionName": "LOGTOJSON",
          "logEntryFormat": "COMBINEDAPACHELOG",
          "matchPattern": "^([\\d.]+) (\\S+) (\\S+) \\[([^\\]]+)\\] \"([^\"]+)\" (\\d+) (\\d+) \"([^\"]+)\" \"([^\"]+)\"$",
          "customFieldNames": ["host","ident","authuser","datetime","request","response","bytes","referrer","agent"]
        }
      ]
    }
  ]
}

Consumer Patterns: KCL, Lambda Trigger, Enhanced Fan-Out

Reading from a KDS stream falls into three main patterns, each with different throughput and latency characteristics.

1. Kinesis Client Library (KCL)

KCL is a Java library (also available via MultiLangDaemon for Python/Ruby/Node) that handles shard discovery, checkpointing to DynamoDB, consumer rebalancing across worker nodes, and lease management. You implement a single processRecords() method. KCL uses the shared throughput model: all KCL consumers on a shard share the 2 MB/s read limit.

2. Lambda Trigger (Event Source Mapping)

The simplest consumer for serverless pipelines. Lambda polls the shard using GetRecords and invokes your function with a batch of records. You control batch size (1–10,000), batch window (0–300s), and starting position (LATEST, TRIM_HORIZON, or a specific timestamp).

import base64, json

def lambda_handler(event, context):
    """
    Kinesis Lambda trigger — receives a batch of records from one shard.
    Records arrive base64-encoded inside event['Records'].
    """
    processed = 0
    errors = []

    for record in event['Records']:
        # Decode the base64 payload
        payload = json.loads(
            base64.b64decode(record['kinesis']['data']).decode('utf-8')
        )
        shard_id = record['eventID'].split(':')[0]
        seq_no   = record['kinesis']['sequenceNumber']

        try:
            # Your business logic here
            handle_event(payload)
            processed += 1
        except Exception as e:
            errors.append({'seq': seq_no, 'error': str(e)})

    print(f"Processed {processed} records, {len(errors)} errors")

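    # Report failed records so Lambda retries from the first failed sequence
    # number. This takes effect only when ReportBatchItemFailures is enabled
    # on the event source mapping (a separate setting from bisect-on-error);
    # otherwise the return value is ignored and the failures caught above
    # are silently skipped.
    return {'batchItemFailures': [{'itemIdentifier': e['seq']} for e in errors]}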

def handle_event(payload: dict):
    """Placeholder — write to DynamoDB, call downstream API, etc."""
    if payload.get('event') == 'purchase':
        # send to fraud scoring service
        pass
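The batch size, batch window, and starting position described above are set on the event source mapping rather than in the handler. The sketch below only builds and validates the arguments for the Lambda CreateEventSourceMapping call. The helper name build_mapping_params and the function name clickstream-processor are assumptions made for illustration.

```python
from datetime import datetime
from typing import Optional


def build_mapping_params(stream_arn: str, function_name: str,
                         batch_size: int = 100, window_s: int = 0,
                         starting_position: str = "LATEST",
                         timestamp: Optional[datetime] = None) -> dict:
    """Validate settings and return kwargs for create_event_source_mapping."""
    if not 1 <= batch_size <= 10_000:
        raise ValueError("batch_size must be between 1 and 10,000")
    if not 0 <= window_s <= 300:
        raise ValueError("window_s must be between 0 and 300 seconds")
    if starting_position not in ("LATEST", "TRIM_HORIZON", "AT_TIMESTAMP"):
        raise ValueError("unknown starting position")
    params = {
        "EventSourceArn": stream_arn,
        "FunctionName": function_name,
        "BatchSize": batch_size,
        "MaximumBatchingWindowInSeconds": window_s,
        "StartingPosition": starting_position,
        # Lets the handler return batchItemFailures for partial retries.
        "FunctionResponseTypes": ["ReportBatchItemFailures"],
    }
    if starting_position == "AT_TIMESTAMP":
        if timestamp is None:
            raise ValueError("AT_TIMESTAMP requires a timestamp")
        params["StartingPositionTimestamp"] = timestamp
    return params


params = build_mapping_params(
    "arn:aws:kinesis:us-east-1:123456789012:stream/user-clickstream",
    "clickstream-processor",  # hypothetical function name
    batch_size=500,
    window_s=5,
)
print(params)
# With AWS credentials configured, the mapping could then be created with:
#   boto3.client("lambda").create_event_source_mapping(**params)
```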

3. Enhanced Fan-Out (EFO)

Standard KDS GetRecords polling shares the 2 MB/s shard throughput across all consumers. Enhanced Fan-Out gives each registered consumer its own dedicated 2 MB/s per-shard pipe using HTTP/2 server-push. Latency drops from ~200ms (polling) to ~70ms. The cost is $0.015/shard-hour + $0.013/GB delivered — so EFO makes sense when you have 2+ consumers reading the same shard that each need full throughput.

import boto3

kinesis = boto3.client('kinesis', region_name='us-east-1')

# Register an Enhanced Fan-Out consumer (one-time setup)
response = kinesis.register_stream_consumer(
    StreamARN='arn:aws:kinesis:us-east-1:123456789012:stream/user-clickstream',
    ConsumerName='fraud-detection-service'
)
consumer_arn = response['Consumer']['ConsumerARN']
print(f"EFO consumer ARN: {consumer_arn}")

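# A new consumer starts in CREATING; wait until it is ACTIVE before subscribing
import time
while kinesis.describe_stream_consumer(ConsumerARN=consumer_arn)[
        'ConsumerDescription']['ConsumerStatus'] != 'ACTIVE':
    time.sleep(1)

# Subscribe to the shard using SubscribeToShard (HTTP/2 streaming).
# In production this is typically done via KCL 2.x; a subscription lasts
# at most 5 minutes and must then be renewed.
subscription = kinesis.subscribe_to_shard(
    ConsumerARN=consumer_arn,
    ShardId='shardId-000000000000',
    StartingPosition={'Type': 'LATEST'}
)
# The response is a streaming EventStream: iterate event by event
for event in subscription['EventStream']:
    if 'SubscribeToShardEvent' in event:
        for record in event['SubscribeToShardEvent']['Records']:
            print(record['SequenceNumber'], record['Data'])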
EFO vs Shared Throughput: Use EFO when you have multiple downstream consumers (e.g., one consumer writing to S3, another to DynamoDB, another to OpenSearch) all reading the same stream. Each gets the full 2 MB/s per shard instead of splitting it.
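A rough way to weigh that trade-off is to compare the per-consumer read rate under shared throughput with the added EFO charge. The sketch below uses only the prices quoted above as defaults and assumes a 730-hour month; the function names are illustrative, and actual pricing varies by region.

```python
def shared_read_mb_s(consumers: int, shard_read_mb_s: float = 2.0) -> float:
    """Per-consumer read rate on one shard when consumers share the limit."""
    return shard_read_mb_s / consumers


def efo_monthly_cost(shards: int, consumers: int, gb_per_consumer: float,
                     hours: float = 730.0,
                     shard_hour_price: float = 0.015,
                     gb_price: float = 0.013) -> float:
    """Added monthly EFO cost: consumer-shard-hours plus data delivered."""
    hourly = consumers * shards * hours * shard_hour_price
    data = consumers * gb_per_consumer * gb_price
    return hourly + data


# Three standard consumers on one shard each get a third of 2 MB/s.
print(round(shared_read_mb_s(3), 3), "MB/s per consumer (shared)")
# The same three consumers on EFO, 4 shards, 1,000 GB delivered to each.
print(round(efo_monthly_cost(4, 3, 1000.0), 2), "USD per month (EFO surcharge)")
```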

Kinesis Firehose: Delivery and Transformation

Kinesis Data Firehose (now also called Amazon Data Firehose) is the easiest way to continuously load streaming data into AWS storage and analytics services. It is fully managed: there are no shards to provision and no consumers to write. You configure a delivery stream with a source (KDS, direct PUT, or MSK) and a destination, and Firehose handles buffering, batching, compression, encryption, and retry.

Supported Destinations

| Destination | Typical Use Case | Formats Supported |
|---|---|---|
| Amazon S3 | Data lake, archival, Athena queries | JSON, CSV, Parquet, ORC (via conversion) |
| Amazon Redshift | Data warehouse analytics | COPY command via S3 staging |
| Amazon OpenSearch | Full-text search, log dashboards | JSON |
| Splunk | Security/ops log management | Raw / JSON |
| HTTP Endpoint | Third-party SaaS (Datadog, New Relic, Coralogix) | JSON |

Buffering

Firehose batches records before delivery. You configure a buffer based on size (1–128 MB) and time (60–900 seconds). Firehose flushes whichever condition is met first. For S3, a common setting is 128 MB / 300s — large files are better for Athena query performance. For OpenSearch, use 5 MB / 60s to keep dashboards near-real-time.
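The "whichever comes first" rule is easy to reason about with a small calculation. This sketch assumes a steady incoming rate; seconds_until_flush is an illustrative helper, not a Firehose API.

```python
def seconds_until_flush(throughput_mb_s: float, buffer_mb: float,
                        buffer_interval_s: float) -> float:
    """Time until a flush, given a steady incoming rate.

    The buffer flushes when it reaches buffer_mb or when buffer_interval_s
    has elapsed, whichever happens first.
    """
    if throughput_mb_s <= 0:
        return buffer_interval_s  # no data: only the timer can trigger
    return min(buffer_mb / throughput_mb_s, buffer_interval_s)


for rate in (0.1, 1.0, 5.0):
    wait = seconds_until_flush(rate, buffer_mb=128, buffer_interval_s=300)
    print(f"{rate} MB/s -> flush after {wait:.1f} s, object of ~{rate * wait:.0f} MB")
```

At low rates the timer wins and objects come out smaller than the size hint, which is worth knowing when tuning for Athena.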

Data Transformation with Lambda

You can attach a Lambda function to a Firehose delivery stream to transform, filter, or enrich records before delivery. Firehose invokes your Lambda synchronously with a batch of up to 3 MB of records. Your function returns each record with a result field: Ok, Dropped, or ProcessingFailed. Failed records are written to a separate S3 error prefix.

import base64, json
from datetime import datetime, timezone

def lambda_handler(event, context):
    """
    Firehose transformation Lambda.
    Drops bot traffic and adds processing metadata before delivery to S3.
    """
    output_records = []

    for record in event['records']:
        # Decode
        payload = json.loads(
            base64.b64decode(record['data']).decode('utf-8')
        )

        # Drop synthetic/bot traffic
        if payload.get('user_agent', '').startswith('Googlebot'):
            output_records.append({
                'recordId': record['recordId'],
                'result': 'Dropped',
                'data': record['data']
            })
            continue

        # Enrich: add processing timestamp, request ID, and environment tag
        payload['processed_at'] = datetime.now(timezone.utc).isoformat()
        payload['request_id'] = context.aws_request_id
        payload['env'] = 'prod'

        # Re-encode — must end with newline for S3 newline-delimited JSON
        encoded = base64.b64encode(
            (json.dumps(payload) + '\n').encode('utf-8')
        ).decode('utf-8')

        output_records.append({
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': encoded
        })

    return {'records': output_records}

Creating a Firehose Delivery Stream via CLI

# Create a Firehose delivery stream from KDS to S3 with buffering + compression
aws firehose create-delivery-stream \
  --delivery-stream-name clickstream-to-s3 \
  --delivery-stream-type KinesisStreamAsSource \
  --kinesis-stream-source-configuration \
    "KinesisStreamARN=arn:aws:kinesis:us-east-1:123456789012:stream/user-clickstream,RoleARN=arn:aws:iam::123456789012:role/firehose-role" \
  --extended-s3-destination-configuration '{
    "RoleARN": "arn:aws:iam::123456789012:role/firehose-role",
    "BucketARN": "arn:aws:s3:::my-datalake-bucket",
    "Prefix": "clickstream/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/",
    "ErrorOutputPrefix": "clickstream-errors/",
    "BufferingHints": {
      "SizeInMBs": 128,
      "IntervalInSeconds": 300
    },
    "CompressionFormat": "GZIP",
    "DataFormatConversionConfiguration": {
      "Enabled": false
    }
  }'
S3 Partitioning Tip: Use the !{timestamp:...} expressions in your S3 prefix to automatically create Hive-compatible partitions by year/month/day. Athena and Glue can auto-discover these partitions, making ad-hoc SQL queries on months of data fast and cheap.

Kinesis Data Analytics: Managed Flink and SQL Windows

Kinesis Data Analytics (KDA), since renamed Amazon Managed Service for Apache Flink, provides a managed Apache Flink environment. You write Flink jobs in Java, Scala, or Python (PyFlink), or use the Kinesis Data Analytics Studio (Zeppelin notebooks with Flink SQL). KDA handles cluster provisioning, auto-scaling, checkpointing, and Flink job management.

Stream Processing with Flink SQL Windows

The most common KDA use case is real-time aggregation over time windows. Flink supports three window types:

| Window Type | Behavior | Example Use Case |
|---|---|---|
| Tumbling | Fixed-size, non-overlapping windows | Revenue per 5-minute bucket |
| Sliding (Hopping) | Fixed size, advances by a smaller hop | 60-min rolling average updated every 5 min |
| Session | Gap-based: closes after N seconds of inactivity | User session grouping |

-- Kinesis Data Analytics Studio (Flink SQL)
-- Count page views per user per 5-minute tumbling window

CREATE TABLE clickstream (
  user_id     VARCHAR,
  event_type  VARCHAR,
  page        VARCHAR,
  event_time  TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector'  = 'kinesis',
  'stream'     = 'user-clickstream',
  'aws.region' = 'us-east-1',
  'format'     = 'json',
  'json.timestamp-format.standard' = 'ISO-8601'
);

-- Tumbling window: aggregate per 5-minute bucket
SELECT
  user_id,
  COUNT(*)                                          AS pageviews,
  TUMBLE_START(event_time, INTERVAL '5' MINUTE)    AS window_start,
  TUMBLE_END  (event_time, INTERVAL '5' MINUTE)    AS window_end
FROM clickstream
WHERE event_type = 'page_view'
GROUP BY
  user_id,
  TUMBLE(event_time, INTERVAL '5' MINUTE);

-- Sliding window: 60-min rolling click count, updated every 5 min
SELECT
  user_id,
  COUNT(*)                                                AS clicks_60m,
  HOP_START(event_time, INTERVAL '5' MINUTE, INTERVAL '60' MINUTE) AS window_start
FROM clickstream
GROUP BY
  user_id,
  HOP(event_time, INTERVAL '5' MINUTE, INTERVAL '60' MINUTE);
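Watermarks and late data: The WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND declaration sets the watermark to the highest event time seen so far minus 5 seconds, which is how much out-of-order arrival Flink tolerates. A window fires once the watermark passes its end, and by default records that arrive for an already-closed window are dropped. For mobile events that may buffer offline, increase the interval to 30–60 seconds.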
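The following is a simplified model of that behavior for tumbling windows, working in whole seconds and ignoring Flink's millisecond-level offsets and any allowed-lateness setting. TumblingWindowLateness is a hypothetical class for illustration, not Flink code.

```python
class TumblingWindowLateness:
    """Tracks a bounded-out-of-orderness watermark for tumbling windows."""

    def __init__(self, window_s: int, max_delay_s: int):
        self.window_s = window_s
        self.max_delay_s = max_delay_s
        self.max_event_time = None  # highest event time observed so far

    def watermark(self):
        """Highest event time seen minus the tolerated delay (None if no data)."""
        if self.max_event_time is None:
            return None
        return self.max_event_time - self.max_delay_s

    def accept(self, event_time: int) -> bool:
        """Return False if the record's window has already been closed."""
        window_end = (event_time // self.window_s + 1) * self.window_s
        wm = self.watermark()
        if wm is not None and wm >= window_end:
            return False  # window already fired: record is dropped
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time
        return True


tracker = TumblingWindowLateness(window_s=300, max_delay_s=5)
for t in (100, 304, 298, 306, 299):
    print(f"event_time={t} accepted={tracker.accept(t)} watermark={tracker.watermark()}")
```

In this run the record at 298 is still accepted because the watermark (299) has not yet reached the window end at 300, while the record at 299 arrives after the watermark has moved to 301 and is dropped.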

KDA Application Parallelism

KDA applications scale by increasing Kinesis Processing Units (KPUs). One KPU = 1 vCPU + 4 GB RAM. Set parallelism (how many Flink operators run in parallel) and parallelismPerKPU (default 1). For a Flink job reading from a 10-shard stream, set parallelism = 10 so each subtask reads one shard. KDA can auto-scale KPUs based on CPU utilization when auto-scaling is enabled.

Shard Capacity Planning and Scaling

Getting shard count right is critical — too few causes ProvisionedThroughputExceededException; too many wastes money at $0.015/shard-hour.

Capacity Formula

Calculate required shards based on the higher of write and read requirements:

Write shards needed = max(
    ceil(incoming_MB_per_sec / 1),     -- 1 MB/s write per shard
    ceil(incoming_records_per_sec / 1000)  -- 1000 records/s per shard
)

Read shards needed = ceil(total_outgoing_MB_per_sec / 2)   -- 2 MB/s read per shard, shared by all
                                                            -- standard consumers; total = per-consumer
                                                            -- read rate x consumer count (EFO consumers
                                                            -- each get their own 2 MB/s and are excluded)

Shard count = max(write_shards_needed, read_shards_needed)

Example: You ingest 3 MB/s of writes across 4,000 records/s, with 3 consumers each reading the full stream at standard throughput:

  • Write: ceil(3/1) = 3 shards; ceil(4000/1000) = 4 shards → write needs 4
  • Read: 3 consumers × full stream = 3 × 3 MB/s out = 9 MB/s → ceil(9/2) = 5 shards
  • Required: 5 shards (or switch consumers to Enhanced Fan-Out and use 4)
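The formula and worked example above can be expressed as a small calculator. This is a sketch using the per-shard limits stated in this guide; required_shards and its parameter names are illustrative.

```python
import math


def required_shards(write_mb_s: float, write_records_s: float,
                    read_mb_s_per_consumer: float, consumers: int,
                    enhanced_fan_out: bool = False) -> int:
    """Shard count needed to satisfy both write and read limits."""
    write_shards = max(
        math.ceil(write_mb_s / 1.0),         # 1 MB/s write per shard
        math.ceil(write_records_s / 1000),   # 1,000 records/s per shard
    )
    if enhanced_fan_out:
        # Each EFO consumer has a dedicated 2 MB/s per shard.
        read_shards = math.ceil(read_mb_s_per_consumer / 2.0)
    else:
        # Standard consumers share 2 MB/s per shard.
        read_shards = math.ceil(read_mb_s_per_consumer * consumers / 2.0)
    return max(write_shards, read_shards, 1)


# The example from the text: 3 MB/s, 4,000 records/s, 3 consumers.
print(required_shards(3, 4000, 3, 3))                         # shared throughput
print(required_shards(3, 4000, 3, 3, enhanced_fan_out=True))  # Enhanced Fan-Out
```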

Splitting and Merging Shards

Scale up by splitting a shard into two. The parent shard becomes read-only (existing records remain readable until retention expires) and two child shards start accepting new writes. Scale down by merging two adjacent shards (adjacent by hash key range) into one. Both operations take about 30 seconds and can be done via Console, CLI, or API:

# Split a shard at the midpoint of its hash key range
SHARD_ID="shardId-000000000002"
STREAM="user-clickstream"

# Get each shard's HashKeyRange (StartingHashKey / EndingHashKey)
aws kinesis list-shards --stream-name $STREAM

# Split: provide the starting hash key of the new child shard, which must fall
# inside the parent's range. The value below is the midpoint of the full key
# space (0 to 2^128 - 1), so it applies only if the parent covers that range.
aws kinesis split-shard \
  --stream-name $STREAM \
  --shard-to-split $SHARD_ID \
  --new-starting-hash-key 170141183460469231731687303715884105727

# Merge two adjacent shards
aws kinesis merge-shards \
  --stream-name $STREAM \
  --shard-to-merge shardId-000000000000 \
  --adjacent-shard-to-merge shardId-000000000001

# Monitor until stream status returns to ACTIVE
aws kinesis describe-stream-summary --stream-name $STREAM \
  --query 'StreamDescriptionSummary.StreamStatus'
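The value passed to --new-starting-hash-key depends on the hash key range of the shard being split. A minimal sketch for computing an even split point from the range strings a shard description reports (split_point is an illustrative helper name):

```python
def split_point(starting_hash_key: str, ending_hash_key: str) -> str:
    """Midpoint of a shard's hash key range, as a decimal string."""
    start, end = int(starting_hash_key), int(ending_hash_key)
    if end <= start:
        raise ValueError("range too small to split")
    return str((start + end) // 2)


# A single shard covering the whole key space, 0 to 2^128 - 1:
full_range_end = str(2 ** 128 - 1)
print(split_point("0", full_range_end))

# The upper half of the key space, split again:
print(split_point(str(2 ** 127), full_range_end))
```

Python integers are arbitrary precision, so the 128-bit arithmetic needs no special handling.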
Hot shard alert: If your traffic is concentrated on a few partition keys (e.g., a low-cardinality key like event_type with only 3 values), splitting won't help: each key still maps to exactly one shard, so at most three shards ever receive writes. Fix the root cause: use a high-cardinality key like user_id or a composite user_id + timestamp_bucket. You can also append a random suffix (salt) to the key to spread writes artificially, at the cost of per-key ordering.
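One way to spread a hot key is to salt it with a random bucket number. The sketch below is an assumption-laden illustration: salted_partition_key and the "#" separator are made up for this example, and consumers that need the original key must strip the suffix.

```python
import random


def salted_partition_key(base_key: str, buckets: int, rng: random.Random) -> str:
    """Append a random bucket suffix so one logical key spreads over many."""
    return f"{base_key}#{rng.randrange(buckets)}"


rng = random.Random(42)  # seeded only to make the demo repeatable
keys = {salted_partition_key("page_view", 8, rng) for _ in range(1000)}
print(sorted(keys))
```

Records that share a base key but get different salts may land on different shards, so ordering holds only within each salted key.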

On-Demand Capacity Mode

Since late 2021, KDS has supported an On-Demand mode that automatically scales capacity based on traffic (up to 200 MB/s write / 400 MB/s read by default). You pay per GB ingested ($0.08/GB) rather than per shard-hour. On-Demand is ideal for unpredictable or spiky workloads: you give up control of exact shard count but eliminate under/over-provisioning risk. You can switch between Provisioned and On-Demand without downtime, up to twice in any 24-hour period.

Kinesis vs MSK vs SQS: When to Use Which

AWS offers three primary messaging/streaming services. Choosing the wrong one at design time is expensive to fix later.

| Dimension | Kinesis Data Streams | Amazon MSK (Kafka) | Amazon SQS |
|---|---|---|---|
| Message ordering | Per-shard (partition key) | Per-partition (topic) | FIFO queues only |
| Retention | 24h–365 days | Configurable (days to forever) | Up to 14 days |
| Replay | Yes (within retention window) | Yes (within retention) | No (message deleted after consume) |
| Consumer groups | Up to 20 (EFO) or share 2 MB/s | Unlimited consumer groups | No concept of groups; each message goes to one consumer |
| Max message size | 1 MB | 1 MB default (configurable to 10 MB+) | 256 KB |
| Operational overhead | Low (serverless option) | Medium–High (broker management) | None (fully managed) |
| Ecosystem / protocols | AWS-proprietary SDK | Kafka protocol, broad ecosystem | AWS-proprietary SDK |
| Pricing model | Per shard-hour + data in/out | Per broker-hour + storage | Per million requests |

Decision Guide

  • Choose Kinesis Data Streams when you need ordered, replayable stream processing with AWS-native consumers (Lambda, Flink, KCL) and want fully serverless operation. Best for: clickstream analytics, IoT telemetry, real-time ETL pipelines entirely within AWS.
  • Choose MSK (Kafka) when you have existing Kafka producers/consumers (on-prem or multi-cloud), need Kafka's rich ecosystem (Kafka Connect, ksqlDB, Schema Registry), require large messages or long retention, or have strong multi-consumer fan-out needs. Best for: hybrid cloud pipelines, microservices event buses, data mesh architectures.
  • Choose SQS when you need simple point-to-point queuing, decoupling microservices, or task fan-out without ordering guarantees. Best for: work queues, async job dispatch, retry logic, dead-letter queuing. SQS is not a streaming platform — it does not support replay or consumer groups.
Hybrid pattern: Many production architectures combine all three. Producers write to KDS for ordered real-time processing. A Flink job in KDA aggregates and enriches, writing alerts to SQS for downstream Lambda processing, while Firehose archives raw records to S3. MSK handles the cross-service event bus layer where Kafka ecosystem tools are needed.

Frequently Asked Questions

What happens when a Kinesis shard reaches its write throughput limit?

The API returns a ProvisionedThroughputExceededException. The KPL handles this with automatic exponential backoff and retry. If you're using the raw SDK, implement your own retry loop with jitter. To prevent hot shards, use high-cardinality partition keys. Consider switching to On-Demand capacity mode for automatic scaling.
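A retry loop with jitter can be sketched generically as below. It uses the "full jitter" variant of exponential backoff; retry_with_jitter and the Throttled exception are hypothetical stand-ins. With boto3 you would instead inspect the error code on the raised client error for ProvisionedThroughputExceededException.

```python
import random
import time


def retry_with_jitter(operation, retry_on, max_attempts: int = 5,
                      base_s: float = 0.1, cap_s: float = 5.0,
                      sleep=time.sleep):
    """Call operation(), retrying on retry_on with capped, jittered backoff."""
    for attempt in range(max_attempts):
        try:
            return operation()
        except retry_on:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the error to the caller
            # Full jitter: wait a random time up to the exponential bound.
            sleep(random.uniform(0, min(cap_s, base_s * 2 ** attempt)))


class Throttled(Exception):
    """Stand-in for a throughput-exceeded error."""


attempts = {"count": 0}


def flaky_put():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise Throttled()
    return "ok"


print(retry_with_jitter(flaky_put, Throttled), "after", attempts["count"], "attempts")
```

Randomizing the wait keeps many throttled producers from retrying in lockstep against the same shard.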

Can I change the number of shards without downtime?

Yes. Split and merge operations are non-disruptive to active producers and consumers. During a split, the parent shard remains readable (for existing records) and two child shards start accepting writes. Consumers using KCL 2.x automatically discover and re-assign to the new child shards within a few seconds. Lambda triggers also handle resharding automatically.

How does Firehose handle delivery failures?

Firehose retries delivery to S3 for up to 24 hours; if the failure persists beyond that window, Firehose discards the data. For other destinations the retry duration is configurable, and records that still cannot be delivered are written to the S3 backup bucket under the error output prefix. You can re-drive these records manually or via a Lambda that reads the error prefix and calls PutRecordBatch. Never ignore error prefixes in production: anything that lands there has not reached its destination.

Is Kinesis Data Analytics (Flink) a good replacement for a custom Spark Streaming job?

For most real-time aggregation workloads — windowed counts, joins across streams, anomaly flagging — yes. KDA eliminates Flink cluster management, handles checkpointing and savepoints, and auto-scales. The trade-offs: KDA is more expensive than self-managed EMR Flink for large-scale workloads, and some advanced Flink APIs (custom state backends, specialized connectors) require Managed Flink Studio workarounds. For sub-second latency analytics at moderate scale, KDA is the right choice. For petabyte-scale batch + stream unification, consider EMR or AWS Glue Streaming ETL.

What's the maximum throughput of a single Kinesis stream?

In Provisioned mode, the default shard quota is 500 per account in the largest regions (US East N. Virginia, US West Oregon, Europe Ireland) and 200 elsewhere; request a limit increase for more. At 500 shards a stream provides 500 MB/s write and 1,000 MB/s read. In On-Demand mode, a stream can scale to 200 MB/s write / 400 MB/s read. For higher throughput, partition data across multiple streams and use a routing layer (Lambda or application logic) to distribute records.
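A routing layer across several streams can be as simple as a deterministic hash of the partition key. In this sketch the stream names are hypothetical and stream_for_key is an illustrative helper; the same key always maps to the same stream, so per-key ordering is preserved as long as the stream list does not change.

```python
import hashlib


def stream_for_key(partition_key: str, streams: list) -> str:
    """Pick a stream deterministically from the partition key."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return streams[int.from_bytes(digest, "big") % len(streams)]


# Hypothetical stream names for illustration.
STREAMS = ["user-clickstream-0", "user-clickstream-1", "user-clickstream-2"]

for key in ("u-123", "u-456", "u-789"):
    print(key, "->", stream_for_key(key, STREAMS))
```

Adding or removing a stream remaps most keys, so plan the stream count up front or drain in-flight data before changing it.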