AWS Kinesis: Real-Time Data Streaming and Analytics
AWS Kinesis is a family of fully managed services for ingesting, processing, and analyzing real-time data streams at any scale. Whether you're capturing clickstream events from millions of users, ingesting IoT sensor readings, or building a real-time fraud detection pipeline, Kinesis provides the building blocks to move data from source to destination in seconds — not hours. This guide covers the full Kinesis family, practical producer and consumer patterns, shard capacity math, and a clear decision framework for choosing between Kinesis, MSK, and SQS.
Table of Contents
- Kinesis Family Overview
- Kinesis Data Streams: Shards, Partition Keys, Retention
- Producer Patterns: KPL, SDK, Kinesis Agent
- Consumer Patterns: KCL, Lambda, Enhanced Fan-Out
- Kinesis Firehose: Delivery and Transformation
- Kinesis Data Analytics: Managed Flink and SQL Windows
- Shard Capacity Planning and Scaling
- Kinesis vs MSK vs SQS: When to Use Which
- Frequently Asked Questions
Kinesis Family Overview
The Kinesis brand covers four distinct services that share a streaming philosophy but serve very different operational roles:
| Service | Primary Role | Consumers | Typical Latency |
|---|---|---|---|
| Kinesis Data Streams (KDS) | Durable, ordered, replayable stream of records | KCL apps, Lambda, Flink, custom | ~70 ms |
| Kinesis Data Firehose | Managed delivery pipeline to S3/Redshift/OpenSearch/Splunk | AWS-managed (no custom consumers) | Buffered: 0–900 s interval (60 s or more is typical) |
| Kinesis Data Analytics | Real-time SQL or Apache Flink on streams | KDS or Firehose as source; writes to KDS/S3/etc. | Sub-second (Flink) |
| Kinesis Video Streams | Ingest, store, and process video from devices | Rekognition, custom ML, HLS playback | Seconds |
For most architectures, you'll work primarily with KDS (the core abstraction) and Firehose (the delivery workhorse). KDA adds stream processing logic when Lambda isn't expressive enough. Video Streams is a specialized service for camera/device video ingestion.
Kinesis Data Streams: Shards, Partition Keys, Retention
A Kinesis Data Stream is made up of one or more shards. Each shard is the unit of capacity: it provides 1 MB/s write throughput and 2 MB/s read throughput, up to 1,000 records/s writes and 5 read transactions/s per shard. All records within a shard are strictly ordered.
Partition Keys and Sequence Numbers
Every record you write to KDS has a partition key — a string you choose (e.g., userId, deviceId). Kinesis hashes the partition key using MD5 and uses the result to determine which shard receives the record. All records with the same partition key go to the same shard, guaranteeing per-key order.
Once a record is accepted, Kinesis assigns it a sequence number: a large integer, unique within the shard, that generally increases over time. Consumers use sequence numbers to checkpoint their position and resume from exactly where they left off after a restart.
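To make the key-to-shard mapping concrete, here is a minimal sketch of the MD5 hashing step in plain Python. The function names and the evenly divided shard layout are assumptions for illustration; a real stream reports each shard's actual hash key range through the API.

```python
import hashlib

HASH_SPACE = 2 ** 128  # MD5 produces a 128-bit hash key


def hash_key(partition_key: str) -> int:
    """Return the 128-bit integer hash key for a partition key."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, byteorder="big")


def even_shard_ranges(shard_count: int) -> list[tuple[int, int]]:
    """Divide the hash space into contiguous inclusive ranges (hypothetical layout)."""
    size = HASH_SPACE // shard_count
    ranges = []
    for i in range(shard_count):
        start = i * size
        # The last range absorbs any remainder so the whole space is covered.
        end = HASH_SPACE - 1 if i == shard_count - 1 else (i + 1) * size - 1
        ranges.append((start, end))
    return ranges


def shard_index_for(partition_key: str, ranges: list[tuple[int, int]]) -> int:
    """Find the index of the range that contains the key's hash."""
    key = hash_key(partition_key)
    for index, (start, end) in enumerate(ranges):
        if start <= key <= end:
            return index
    raise ValueError("hash key outside all ranges")


ranges = even_shard_ranges(4)
print(shard_index_for("u-123", ranges))  # same key always yields the same index
```

Because the mapping depends only on the key, every record for `u-123` lands in the same range, which is what gives per-key ordering.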
Retention Period
By default, records are retained for 24 hours. You can extend this to 7 days (standard) or up to 365 days with long-term data retention (additional cost). Extended retention is valuable for replaying data to re-process after a bug fix, backfilling a new consumer, or auditing.
| Retention Tier | Duration | Pricing Impact |
|---|---|---|
| Default | 24 hours | Included in shard-hour pricing |
| Extended | 7 days | +$0.020/shard-hour above 24h |
| Long-term | Up to 365 days | +$0.023/GB/month |
Producer Patterns: KPL, SDK, Kinesis Agent
Three main mechanisms exist for writing to a Kinesis stream, each suited to different throughput and latency profiles.
1. AWS SDK — PutRecords API
The simplest approach: call PutRecords directly via boto3, the Java SDK, or any AWS SDK. PutRecords accepts up to 500 records per call (max 5 MB total). It's synchronous and returns a response with per-record success/failure, so you can retry failed records immediately.
import boto3, json, time

kinesis = boto3.client('kinesis', region_name='us-east-1')
STREAM_NAME = 'user-clickstream'

def put_records_batch(events: list[dict], max_attempts: int = 5) -> int:
    """Write a batch of up to 500 events to Kinesis; return how many still failed."""
    records = [
        {
            'Data': json.dumps(event).encode('utf-8'),
            'PartitionKey': event['user_id']  # per-user ordering
        }
        for event in events
    ]
    for attempt in range(max_attempts):
        response = kinesis.put_records(Records=records, StreamName=STREAM_NAME)
        if response['FailedRecordCount'] == 0:
            return 0
        # Keep only the records whose result entry carries an ErrorCode
        # (e.g. ProvisionedThroughputExceededException) and retry those
        records = [
            records[i] for i, r in enumerate(response['Records'])
            if 'ErrorCode' in r
        ]
        if attempt < max_attempts - 1:
            time.sleep(0.5 * 2 ** attempt)  # exponential backoff between attempts
    return len(records)

# Example usage
batch = [
    {'user_id': 'u-123', 'event': 'page_view', 'page': '/checkout', 'ts': time.time()},
    {'user_id': 'u-456', 'event': 'add_to_cart', 'product': 'widget-42', 'ts': time.time()},
]
put_records_batch(batch)
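When the input is larger than one call allows, it has to be cut into batches first. The following is a minimal sketch of that chunking step, assuming records are already in the `Data` / `PartitionKey` shape used above. The helper name `chunk_records` is hypothetical, and treating 5 MB as 5 × 1024 × 1024 bytes (counting both data and partition key) is an assumption.

```python
from typing import Iterator

MAX_RECORDS_PER_CALL = 500
MAX_BYTES_PER_CALL = 5 * 1024 * 1024  # assumption: 5 MB read as 5 MiB


def chunk_records(records: list[dict]) -> Iterator[list[dict]]:
    """Yield batches that respect both the record-count and byte limits."""
    batch: list[dict] = []
    batch_bytes = 0
    for record in records:
        # Count the data blob plus the UTF-8 encoded partition key.
        size = len(record["Data"]) + len(record["PartitionKey"].encode("utf-8"))
        would_overflow = (
            len(batch) == MAX_RECORDS_PER_CALL
            or batch_bytes + size > MAX_BYTES_PER_CALL
        )
        if batch and would_overflow:
            yield batch
            batch, batch_bytes = [], 0
        batch.append(record)
        batch_bytes += size
    if batch:
        yield batch


records = [{"Data": b"{}", "PartitionKey": f"u-{i}"} for i in range(1200)]
for batch in chunk_records(records):
    print(len(batch))  # each batch can be passed to put_records
```

Each yielded batch can then be handed to a single PutRecords call and retried independently.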
2. Kinesis Producer Library (KPL)
KPL is a Java library backed by a native C++ child process. It handles aggregation (packing multiple logical records into one Kinesis record), batching, retry with exponential backoff, and async flushing, and it can achieve significantly higher throughput per shard than raw PutRecord calls. The trade-off: buffering adds latency (controlled by RecordMaxBufferedTime, which defaults to 100 ms and is often raised to improve packing), and KPL-aggregated records must be de-aggregated on the consumer side using the KCL or a standalone de-aggregation library.
3. Kinesis Agent
Kinesis Agent is a Java daemon you install on EC2 or on-premise servers. It tails log files (including rotation), parses them (supports Apache CLF, syslog, CSV), and writes to KDS or Firehose. Configuration is a JSON file — no code required.
{
  "kinesis.endpoint": "",
  "flows": [
    {
      "filePattern": "/var/log/nginx/access.log*",
      "kinesisStream": "nginx-access-logs",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
        {
          "optionName": "LOGTOJSON",
          "logEntryFormat": "COMBINEDAPACHELOG",
          "matchPattern": "^([\\d.]+) (\\S+) (\\S+) \\[([^\\]]+)\\] \"([^\"]+)\" (\\d+) (\\d+) \"([^\"]+)\" \"([^\"]+)\"$",
          "customFieldNames": ["host","ident","authuser","datetime","request","response","bytes","referrer","agent"]
        }
      ]
    }
  ]
}
Consumer Patterns: KCL, Lambda Trigger, Enhanced Fan-Out
Reading from a KDS stream falls into three main patterns, each with different throughput and latency characteristics.
1. Kinesis Client Library (KCL)
KCL is a Java library (also available via MultiLangDaemon for Python/Ruby/Node) that handles shard discovery, checkpointing to DynamoDB, consumer rebalancing across worker nodes, and lease management. You implement a single processRecords() method. KCL uses the shared throughput model: all KCL consumers on a shard share the 2 MB/s read limit.
2. Lambda Trigger (Event Source Mapping)
The simplest consumer for serverless pipelines. Lambda polls the shard using GetRecords and invokes your function with a batch of records. You control batch size (1–10,000), batch window (0–300s), and starting position (LATEST, TRIM_HORIZON, or a specific timestamp).
import base64, json

def lambda_handler(event, context):
    """
    Kinesis Lambda trigger: receives a batch of records from one shard.
    Records arrive base64-encoded inside event['Records'].
    """
    processed = 0
    errors = []
    for record in event['Records']:
        shard_id = record['eventID'].split(':')[0]
        seq_no = record['kinesis']['sequenceNumber']
        try:
            # Decode inside the try so one malformed record is recorded
            # as an error instead of failing the whole invocation
            payload = json.loads(
                base64.b64decode(record['kinesis']['data']).decode('utf-8')
            )
            # Your business logic here
            handle_event(payload)
            processed += 1
        except Exception as e:
            errors.append({'seq': seq_no, 'shard': shard_id, 'error': str(e)})
    print(f"Processed {processed} records, {len(errors)} errors")
    # To report partial failures, enable ReportBatchItemFailures in the event
    # source mapping's FunctionResponseTypes, then return:
    # return {'batchItemFailures': [{'itemIdentifier': e['seq']} for e in errors]}
    return {'statusCode': 200}

def handle_event(payload: dict):
    """Placeholder: write to DynamoDB, call downstream API, etc."""
    if payload.get('event') == 'purchase':
        # send to fraud scoring service
        pass
3. Enhanced Fan-Out (EFO)
Standard KDS GetRecords polling shares the 2 MB/s shard throughput across all consumers. Enhanced Fan-Out gives each registered consumer its own dedicated 2 MB/s per-shard pipe using HTTP/2 server-push. Latency drops from ~200ms (polling) to ~70ms. The cost is $0.015/shard-hour + $0.013/GB delivered — so EFO makes sense when you have 2+ consumers reading the same shard that each need full throughput.
import boto3, time

kinesis = boto3.client('kinesis', region_name='us-east-1')

# Register an Enhanced Fan-Out consumer (one-time setup)
response = kinesis.register_stream_consumer(
    StreamARN='arn:aws:kinesis:us-east-1:123456789012:stream/user-clickstream',
    ConsumerName='fraud-detection-service'
)
consumer_arn = response['Consumer']['ConsumerARN']
print(f"EFO consumer ARN: {consumer_arn}")

# A new consumer starts in CREATING; wait until it is ACTIVE before subscribing
while True:
    desc = kinesis.describe_stream_consumer(ConsumerARN=consumer_arn)
    if desc['ConsumerDescription']['ConsumerStatus'] == 'ACTIVE':
        break
    time.sleep(1)

# Subscribe to the shard using SubscribeToShard (HTTP/2 streaming)
# Typically done via KCL 2.x; a subscription lasts at most 5 minutes,
# so long-running consumers must re-subscribe
subscription = kinesis.subscribe_to_shard(
    ConsumerARN=consumer_arn,
    ShardId='shardId-000000000000',
    StartingPosition={'Type': 'LATEST'}
)

# The response is a streaming EventStream: iterate event by event
for event in subscription['EventStream']:
    if 'SubscribeToShardEvent' in event:
        for record in event['SubscribeToShardEvent']['Records']:
            print(record['SequenceNumber'], record['Data'])
Kinesis Firehose: Delivery and Transformation
Kinesis Data Firehose (now also called Amazon Data Firehose) is the easiest way to continuously load streaming data into AWS storage and analytics services. It is fully managed: no shards to provision, no consumers to write. You configure a delivery stream with a source (KDS, direct PUT, or MSK) and a destination, and Firehose handles buffering, batching, compression, encryption, and retry.
Supported Destinations
| Destination | Typical Use Case | Formats Supported |
|---|---|---|
| Amazon S3 | Data lake, archival, Athena queries | JSON, CSV, Parquet, ORC (via conversion) |
| Amazon Redshift | Data warehouse analytics | COPY command via S3 staging |
| Amazon OpenSearch | Full-text search, log dashboards | JSON |
| Splunk | Security/ops log management | Raw / JSON |
| HTTP Endpoint | Third-party SaaS (Datadog, New Relic, Coralogix) | JSON |
Buffering
Firehose batches records before delivery. You configure a buffer based on size (1–128 MB) and time (0–900 seconds; 60 seconds or more is typical). Firehose flushes as soon as either condition is met. For S3, a common setting is 128 MB / 300s, since large files are better for Athena query performance. For OpenSearch, use 5 MB / 60s to keep dashboards near-real-time.
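The "whichever comes first" rule can be illustrated with a small simulation. This is a sketch only: the `BufferSim` class is hypothetical and checks the time condition when a record arrives, whereas the real service flushes on its own timer.

```python
from typing import Optional


class BufferSim:
    """Toy model of a size-or-time buffer (not the Firehose implementation)."""

    def __init__(self, size_limit_bytes: int, interval_seconds: float):
        self.size_limit_bytes = size_limit_bytes
        self.interval_seconds = interval_seconds
        self.buffered_bytes = 0
        self.opened_at: Optional[float] = None

    def add(self, record_bytes: int, now: float) -> Optional[str]:
        """Buffer one record; return 'size' or 'time' if it triggers a flush."""
        if self.opened_at is None:
            self.opened_at = now  # buffer opens with its first record
        self.buffered_bytes += record_bytes
        if self.buffered_bytes >= self.size_limit_bytes:
            reason = "size"
        elif now - self.opened_at >= self.interval_seconds:
            reason = "time"
        else:
            return None
        # Flush: reset the buffer for the next batch.
        self.buffered_bytes = 0
        self.opened_at = None
        return reason


buf = BufferSim(size_limit_bytes=5 * 1024 * 1024, interval_seconds=60)
print(buf.add(1024 * 1024, now=0))       # below both limits, no flush
print(buf.add(4 * 1024 * 1024, now=10))  # size limit reached first
```

With a high-volume stream the size condition usually wins; with a trickle of events the interval does, which is why low-traffic streams produce many small files.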
Data Transformation with Lambda
You can attach a Lambda function to a Firehose delivery stream to transform, filter, or enrich records before delivery. Firehose invokes your Lambda synchronously with a batch of up to 3 MB of records. Your function returns each record with a result field: Ok, Dropped, or ProcessingFailed. Failed records are written to a separate S3 error prefix.
import base64, json
from datetime import datetime, timezone

def lambda_handler(event, context):
    """
    Firehose transformation Lambda.
    Adds a processing timestamp and drops bot traffic before delivery to S3.
    """
    output_records = []
    for record in event['records']:
        # Decode; unparseable input is marked failed and lands in the error prefix
        try:
            payload = json.loads(
                base64.b64decode(record['data']).decode('utf-8')
            )
        except ValueError:
            output_records.append({
                'recordId': record['recordId'],
                'result': 'ProcessingFailed',
                'data': record['data']
            })
            continue
        # Drop synthetic/bot traffic (the token appears mid-string in real user agents)
        if 'Googlebot' in payload.get('user_agent', ''):
            output_records.append({
                'recordId': record['recordId'],
                'result': 'Dropped',
                'data': record['data']
            })
            continue
        # Enrich: add processing timestamp and the invocation's request ID
        payload['processed_at'] = datetime.now(timezone.utc).isoformat()
        payload['request_id'] = context.aws_request_id
        payload['env'] = 'prod'
        # Re-encode; must end with a newline for S3 newline-delimited JSON
        encoded = base64.b64encode(
            (json.dumps(payload) + '\n').encode('utf-8')
        ).decode('utf-8')
        output_records.append({
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': encoded
        })
    return {'records': output_records}
Creating a Firehose Delivery Stream via CLI
# Create a Firehose delivery stream from KDS to S3 with buffering + compression
aws firehose create-delivery-stream \
--delivery-stream-name clickstream-to-s3 \
--delivery-stream-type KinesisStreamAsSource \
--kinesis-stream-source-configuration \
KinesisStreamARN=arn:aws:kinesis:us-east-1:123456789012:stream/user-clickstream,\
RoleARN=arn:aws:iam::123456789012:role/firehose-role \
--extended-s3-destination-configuration '{
"RoleARN": "arn:aws:iam::123456789012:role/firehose-role",
"BucketARN": "arn:aws:s3:::my-datalake-bucket",
"Prefix": "clickstream/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/",
"ErrorOutputPrefix": "clickstream-errors/",
"BufferingHints": {
"SizeInMBs": 128,
"IntervalInSeconds": 300
},
"CompressionFormat": "GZIP",
"DataFormatConversionConfiguration": {
"Enabled": false
}
}'
Tip: Use !{timestamp:...} expressions in your S3 prefix to automatically create Hive-compatible partitions by year/month/day. Athena and Glue can auto-discover these partitions, making ad-hoc SQL queries on months of data fast and cheap.
Kinesis Data Analytics: Managed Flink and SQL Windows
Kinesis Data Analytics (KDA, since renamed Amazon Managed Service for Apache Flink) provides a managed Apache Flink environment. You write Flink jobs in Java, Scala, or Python (PyFlink), or use the Kinesis Data Analytics Studio (Zeppelin notebooks with Flink SQL). KDA handles cluster provisioning, auto-scaling, checkpointing, and Flink job management.
Stream Processing with Flink SQL Windows
The most common KDA use case is real-time aggregation over time windows. Flink supports three window types:
| Window Type | Behavior | Example Use Case |
|---|---|---|
| Tumbling | Fixed-size, non-overlapping windows | Revenue per 5-minute bucket |
| Sliding (Hopping) | Fixed size, advances by a smaller hop | 60-min rolling average updated every 5 min |
| Session | Gaps-based: closes after N seconds of inactivity | User session grouping |
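To show how each window type assigns an event to windows, here is a minimal sketch in plain Python using integer timestamps in seconds. The function names are hypothetical and this is not how Flink implements windowing internally; it only reproduces the assignment rules from the table.

```python
def tumbling_window(ts: int, size: int) -> tuple[int, int]:
    """Return the single [start, end) window that contains ts."""
    start = ts - (ts % size)
    return (start, start + size)


def hopping_windows(ts: int, size: int, hop: int) -> list[tuple[int, int]]:
    """Return every [start, end) window of length size, stepping by hop, containing ts."""
    windows = []
    start = ts - (ts % hop)  # latest window start at or before ts
    while start > ts - size:
        windows.append((start, start + size))
        start -= hop
    return windows


def session_windows(timestamps: list[int], gap: int) -> list[tuple[int, int]]:
    """Group events into sessions; a session ends after gap seconds of inactivity."""
    sessions: list[tuple[int, int]] = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][1] < gap:
            sessions[-1] = (sessions[-1][0], ts)  # extend the current session
        else:
            sessions.append((ts, ts))             # start a new session
    return sessions


print(tumbling_window(725, 300))                # one 5-minute bucket
print(len(hopping_windows(10_000, 3600, 300)))  # event belongs to size / hop windows
print(session_windows([0, 10, 50, 55], gap=30))
```

The key difference is fan-out: a tumbling window places each event in exactly one bucket, while a hopping window places it in size / hop overlapping buckets, which multiplies state and output volume.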
-- Kinesis Data Analytics Studio (Flink SQL)
-- Count page views per user per 5-minute tumbling window
CREATE TABLE clickstream (
user_id VARCHAR,
event_type VARCHAR,
page VARCHAR,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kinesis',
'stream' = 'user-clickstream',
'aws.region' = 'us-east-1',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601'
);
-- Tumbling window: aggregate per 5-minute bucket
SELECT
user_id,
COUNT(*) AS pageviews,
TUMBLE_START(event_time, INTERVAL '5' MINUTE) AS window_start,
TUMBLE_END (event_time, INTERVAL '5' MINUTE) AS window_end
FROM clickstream
WHERE event_type = 'page_view'
GROUP BY
user_id,
TUMBLE(event_time, INTERVAL '5' MINUTE);
-- Sliding window: 60-min rolling click count, updated every 5 min
SELECT
user_id,
COUNT(*) AS clicks_60m,
HOP_START(event_time, INTERVAL '5' MINUTE, INTERVAL '60' MINUTE) AS window_start
FROM clickstream
GROUP BY
user_id,
HOP(event_time, INTERVAL '5' MINUTE, INTERVAL '60' MINUTE);
The WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND declaration tells Flink how much late arrival to tolerate. The watermark trails the highest event time seen so far by 5 seconds, and records whose event time is already behind the watermark are dropped from window results. For mobile events that may buffer offline, increase this to 30–60 seconds.
KDA Application Parallelism
KDA applications scale by increasing Kinesis Processing Units (KPUs). One KPU = 1 vCPU + 4 GB RAM. Set parallelism (how many Flink operators run in parallel) and parallelismPerKPU (default 1). For a Flink job reading from a 10-shard stream, set parallelism = 10 so each subtask reads one shard. KDA can auto-scale KPUs based on CPU utilization when auto-scaling is enabled.
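The relationship between these settings can be sketched with a few lines of arithmetic. The function names are hypothetical, and the even shard split is an idealization: the actual assignment of shards to subtasks is decided by the Flink connector.

```python
import math


def kpus_for(parallelism: int, parallelism_per_kpu: int = 1) -> int:
    """KPUs needed to host the requested parallelism."""
    return math.ceil(parallelism / parallelism_per_kpu)


def shards_per_subtask(shard_count: int, parallelism: int) -> list[int]:
    """Idealized even split of shards across source subtasks."""
    base, extra = divmod(shard_count, parallelism)
    return [base + 1 if i < extra else base for i in range(parallelism)]


print(kpus_for(10))                # one subtask per KPU
print(kpus_for(10, 2))             # two subtasks share each KPU
print(shards_per_subtask(10, 4))   # some subtasks read more shards than others
```

When parallelism is lower than the shard count, some subtasks read several shards and become the throughput ceiling; when it is higher, the extra source subtasks sit idle.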
Shard Capacity Planning and Scaling
Getting shard count right is critical — too few causes ProvisionedThroughputExceededException; too many wastes money at $0.015/shard-hour.
Capacity Formula
Calculate required shards based on the higher of write and read requirements:
Write shards needed = max(
    ceil(incoming_MB_per_sec / 1),          -- 1 MB/s write per shard
    ceil(incoming_records_per_sec / 1000)   -- 1000 records/s per shard
)
Read shards needed = ceil(outgoing_MB_per_sec / 2)   -- 2 MB/s read per shard;
                                                      -- outgoing = incoming × consumer count
                                                      -- if NOT using Enhanced Fan-Out
Shard count = max(write_shards_needed, read_shards_needed)
Example: You ingest 3 MB/s of writes across 4,000 records/s, with 3 consumers each reading the full stream at standard throughput:
- Write: ceil(3/1) = 3 shards; ceil(4000/1000) = 4 shards → write needs 4
- Read: 3 consumers × full stream = 3 × 3 MB/s out = 9 MB/s → ceil(9/2) = 5 shards
- Required: 5 shards (or switch consumers to Enhanced Fan-Out and use 4)
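The worked example can be turned into a small calculator. This is a sketch under the per-shard limits stated in this guide (1 MB/s and 1,000 records/s write, 2 MB/s read); the function name and parameters are hypothetical.

```python
import math


def required_shards(
    write_mb_s: float,
    write_records_s: float,
    consumers: int,
    enhanced_fan_out: bool = False,
) -> int:
    """Shard count needed to satisfy both write and read throughput."""
    write_shards = max(
        math.ceil(write_mb_s / 1),          # 1 MB/s write per shard
        math.ceil(write_records_s / 1000),  # 1,000 records/s per shard
    )
    if enhanced_fan_out:
        # Each consumer has its own 2 MB/s per shard, so consumers do not add up.
        read_mb_s = write_mb_s
    else:
        # All consumers share the 2 MB/s per shard.
        read_mb_s = write_mb_s * consumers
    read_shards = math.ceil(read_mb_s / 2)
    return max(write_shards, read_shards, 1)


print(required_shards(3, 4000, consumers=3))                         # 5
print(required_shards(3, 4000, consumers=3, enhanced_fan_out=True))  # 4
```

The two calls reproduce the example: shared-throughput consumers push the requirement to 5 shards, while Enhanced Fan-Out leaves the write side as the binding constraint at 4.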
Splitting and Merging Shards
Scale up by splitting a shard into two. The parent shard becomes read-only (existing records remain readable until retention expires) and two child shards start accepting new writes. Scale down by merging two adjacent shards (adjacent by hash key range) into one. Both operations take about 30 seconds and can be done via Console, CLI, or API:
# Split a shard at the midpoint of its hash key range
SHARD_ID="shardId-000000000002"
STREAM="user-clickstream"
# List shards with their HashKeyRange (StartingHashKey / EndingHashKey)
aws kinesis list-shards --stream-name "$STREAM"
# Split: provide the starting hash key of the second child shard.
# It must fall inside the parent's HashKeyRange; the value below is the
# midpoint of the full 0 to 2^128-1 range and only suits a shard spanning it.
aws kinesis split-shard \
  --stream-name "$STREAM" \
  --shard-to-split "$SHARD_ID" \
  --new-starting-hash-key 170141183460469231731687303715884105727
# Merge two adjacent shards
aws kinesis merge-shards \
  --stream-name "$STREAM" \
  --shard-to-merge shardId-000000000000 \
  --adjacent-shard-to-merge shardId-000000000001
# Monitor until stream status returns to ACTIVE
aws kinesis describe-stream-summary --stream-name "$STREAM" \
  --query 'StreamDescriptionSummary.StreamStatus'
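The split point for an arbitrary shard can be computed from its hash key range. Below is a minimal sketch, assuming the shard is given as a dict shaped like one entry of the shard listing (with `StartingHashKey` and `EndingHashKey` as decimal strings); the function name is hypothetical.

```python
def split_point(shard: dict) -> str:
    """Return a NewStartingHashKey that splits the shard's range roughly in half."""
    key_range = shard["HashKeyRange"]
    start = int(key_range["StartingHashKey"])
    end = int(key_range["EndingHashKey"])
    midpoint = (start + end) // 2
    # The new key must lie strictly above the parent's starting key.
    if midpoint <= start:
        raise ValueError("hash key range too narrow to split")
    return str(midpoint)


full_range_shard = {
    "ShardId": "shardId-000000000000",
    "HashKeyRange": {
        "StartingHashKey": "0",
        "EndingHashKey": str(2 ** 128 - 1),
    },
}
print(split_point(full_range_shard))  # 170141183460469231731687303715884105727
```

Python integers are arbitrary precision, so the 128-bit arithmetic needs no special handling. For a hot shard with skewed keys, a midpoint split may not balance load, and a split point chosen from observed key hashes can work better.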
Hot shard warning: if your partition key has low cardinality (e.g., event_type with only 3 values), splitting won't help, because all records for a given key still land on a single shard. Fix the root cause: use a high-cardinality key like user_id or a composite user_id + timestamp_bucket. You can also append a random salt to the partition key to spread writes artificially, at the cost of per-key ordering.
On-Demand Capacity Mode
As of 2022, KDS supports an On-Demand mode that automatically scales capacity based on traffic (up to 200 MB/s write / 400 MB/s read by default). You pay per GB ingested ($0.08/GB) rather than per shard-hour. On-Demand is ideal for unpredictable or spiky workloads: you give up control of exact shard count but eliminate under/over-provisioning risk. You can switch between Provisioned and On-Demand without downtime, though AWS limits how often a stream may change modes (twice per 24 hours at the time of writing).
Kinesis vs MSK vs SQS: When to Use Which
AWS offers three primary messaging/streaming services. Choosing the wrong one at design time is expensive to fix later.
| Dimension | Kinesis Data Streams | Amazon MSK (Kafka) | Amazon SQS |
|---|---|---|---|
| Message ordering | Per-shard (partition key) | Per-partition (topic) | FIFO queues only |
| Retention | 24h–365 days | Configurable (days to forever) | Up to 14 days |
| Replay | Yes (within retention window) | Yes (within retention) | No (message deleted after consume) |
| Consumer groups | Up to 20 (EFO) or share 2MB/s | Unlimited consumer groups | No concept of groups — each message to one consumer |
| Max message size | 1 MB | 1 MB default (configurable to 10 MB+) | 256 KB |
| Operational overhead | Low (serverless option) | Medium–High (broker management) | None (fully managed) |
| Ecosystem / protocols | AWS-proprietary SDK | Kafka protocol — broad ecosystem | AWS-proprietary SDK |
| Pricing model | Per shard-hour + data in/out | Per broker-hour + storage | Per million requests |
Decision Guide
- Choose Kinesis Data Streams when you need ordered, replayable stream processing with AWS-native consumers (Lambda, Flink, KCL) and want fully serverless operation. Best for: clickstream analytics, IoT telemetry, real-time ETL pipelines entirely within AWS.
- Choose MSK (Kafka) when you have existing Kafka producers/consumers (on-prem or multi-cloud), need Kafka's rich ecosystem (Kafka Connect, ksqlDB, Schema Registry), require large messages or long retention, or have strong multi-consumer fan-out needs. Best for: hybrid cloud pipelines, microservices event buses, data mesh architectures.
- Choose SQS when you need simple point-to-point queuing, decoupling microservices, or task fan-out without ordering guarantees. Best for: work queues, async job dispatch, retry logic, dead-letter queuing. SQS is not a streaming platform — it does not support replay or consumer groups.
Frequently Asked Questions
What happens when a Kinesis shard reaches its write throughput limit?
The API returns a ProvisionedThroughputExceededException. The KPL handles this with automatic exponential backoff and retry. If you're using the raw SDK, implement your own retry loop with jitter. To prevent hot shards, use high-cardinality partition keys. Consider switching to On-Demand capacity mode for automatic scaling.
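A retry loop with jitter for the raw SDK case might look like the sketch below. `ThrottledError` is a hypothetical stand-in for the throttling error your client raises (with boto3 you would inspect the error code on the raised client error instead), and the base and cap values are illustrative.

```python
import random
import time


class ThrottledError(Exception):
    """Hypothetical stand-in for a throughput-exceeded error."""


def backoff_delay(attempt: int, base: float = 0.1, cap: float = 5.0,
                  rng=random.random) -> float:
    """Full jitter: a random delay between 0 and min(cap, base * 2**attempt)."""
    return rng() * min(cap, base * (2 ** attempt))


def call_with_retry(operation, max_attempts: int = 5, sleep=time.sleep):
    """Call operation() until it succeeds or the attempts are used up."""
    for attempt in range(max_attempts):
        try:
            return operation()
        except ThrottledError:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the error to the caller
            sleep(backoff_delay(attempt))


# Usage: an operation that is throttled twice, then succeeds.
state = {"calls": 0}

def flaky_put():
    state["calls"] += 1
    if state["calls"] < 3:
        raise ThrottledError()
    return "ok"

print(call_with_retry(flaky_put, sleep=lambda seconds: None))
```

Randomizing the delay keeps many producers that were throttled at the same moment from retrying in lockstep and hitting the shard limit again together.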
Can I change the number of shards without downtime?
Yes. Split and merge operations are non-disruptive to active producers and consumers. During a split, the parent shard remains readable (for existing records) and two child shards start accepting writes. Consumers using KCL 2.x automatically discover and re-assign to the new child shards within a few seconds. Lambda triggers also handle resharding automatically.
How does Firehose handle delivery failures?
For S3 destinations, Firehose retries delivery for up to 24 hours. For other destinations (Redshift, OpenSearch, Splunk, HTTP endpoints) the retry duration is configurable, and records that still cannot be delivered after the retry window are written to a backup S3 bucket under an error prefix with a timestamp. You can re-drive these records manually or via a Lambda that reads the error prefix and calls PutRecordBatch. Never ignore error prefixes in production: they mark records that did not reach their destination.
Is Kinesis Data Analytics (Flink) a good replacement for a custom Spark Streaming job?
For most real-time aggregation workloads — windowed counts, joins across streams, anomaly flagging — yes. KDA eliminates Flink cluster management, handles checkpointing and savepoints, and auto-scales. The trade-offs: KDA is more expensive than self-managed EMR Flink for large-scale workloads, and some advanced Flink APIs (custom state backends, specialized connectors) require Managed Flink Studio workarounds. For sub-second latency analytics at moderate scale, KDA is the right choice. For petabyte-scale batch + stream unification, consider EMR or AWS Glue Streaming ETL.
What's the maximum throughput of a single Kinesis stream?
In Provisioned mode, throughput is bounded by your account's shard quota for the region, which can be raised through a quota increase request. With 500 shards, for example, a stream provides 500 MB/s write and 1,000 MB/s read. In On-Demand mode, a stream can scale to 200 MB/s write / 400 MB/s read by default. For higher throughput, partition data across multiple streams and use a routing layer (Lambda or application logic) to distribute records.
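The routing layer can be as simple as a stable hash over the partition key. Here is a minimal sketch; the stream names and the `stream_for` helper are hypothetical, and the modulo scheme is one option among several.

```python
import hashlib


def stream_for(partition_key: str, streams: list[str]) -> str:
    """Pick a stream deterministically so a given key always uses the same one."""
    digest = hashlib.sha256(partition_key.encode("utf-8")).digest()
    index = int.from_bytes(digest[:8], byteorder="big") % len(streams)
    return streams[index]


STREAMS = ["clickstream-0", "clickstream-1", "clickstream-2"]
print(stream_for("u-123", STREAMS))
```

Keeping a key on one stream preserves its ordering. The caveat with plain modulo routing is that adding a stream remaps most keys, so consumers need to tolerate a brief period where one key's records are spread across the old and new streams.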