AWS MSK: Managed Apache Kafka for Real-Time Data Streaming
Apache Kafka is the backbone of modern real-time data architectures — event-driven microservices, change data capture pipelines, fraud detection systems, and multi-region data replication all run on Kafka. But operating a Kafka cluster yourself means wrestling with ZooKeeper quorums, broker replication, disk provisioning, rolling upgrades, and 3am pager alerts. Amazon MSK (Managed Streaming for Apache Kafka) eliminates that operational burden while preserving 100% Kafka protocol compatibility. Your existing producers, consumers, Kafka Connect pipelines, and ksqlDB topologies work without modification.
This guide is a hands-on MSK reference. It covers cluster architecture decisions (Standard vs Serverless), Terraform and CLI provisioning, Spring Kafka and kafka-python producer/consumer code, MSK Connect for managed pipelines, the main security options, CloudWatch and Prometheus monitoring, and a cost optimization framework you can apply to production clusters.
Table of Contents
- MSK vs Self-Managed Kafka vs Kinesis — Decision Matrix
- Cluster Setup: Standard vs Serverless — CLI + Terraform
- Topics, Partitioning Strategy, and Replication Factor
- Producer and Consumer Code: Java Spring Kafka + Python
- Consumer Groups, Offset Management, and Lag Monitoring
- MSK Connect: Managed Kafka Connect for S3 and RDS Sinks
- MSK Serverless: Auto-Scaling and When to Use It
- Security: TLS, SASL/SCRAM, IAM Auth, and VPC Config
- Monitoring: CloudWatch Metrics and Prometheus/Grafana
- Cost Optimization: Broker Sizing and Storage Tiering
- Frequently Asked Questions
MSK vs Self-Managed Kafka vs Kinesis — Decision Matrix
Before committing to MSK, you should understand where it sits in the streaming landscape. Three realistic options exist for teams on AWS: Amazon MSK (managed Kafka), self-managed Kafka on EC2/EKS, and Amazon Kinesis Data Streams. Each has genuine advantages, and choosing incorrectly at design time is expensive to reverse.
| Dimension | Amazon MSK | Self-Managed Kafka (EC2/EKS) | Kinesis Data Streams |
|---|---|---|---|
| Kafka protocol compatibility | 100% — any Kafka client works | 100% | None — proprietary SDK only |
| Operational overhead | Low — AWS manages brokers, ZooKeeper/KRaft, upgrades | High — full ops responsibility | Very low — fully serverless |
| Ecosystem | Full — Kafka Connect, ksqlDB, Schema Registry, MirrorMaker 2 | Full | AWS-only — KCL, Lambda, Flink |
| Multi-consumer fan-out | Unlimited consumer groups | Unlimited consumer groups | Up to 20 (Enhanced Fan-Out) or shared 2 MB/s per shard |
| Max message size | 1 MB default, up to 10 MB+ (configurable) | Configurable, practically unlimited | 1 MB hard limit |
| Retention | Hours to forever (with tiered storage) | Configurable | 24h – 365 days |
| Pricing model | Per broker-hour + EBS storage + data transfer | EC2 on-demand/reserved + EBS + ops labor | Per shard-hour OR per GB (On-Demand) |
| Hybrid / multi-cloud | Yes — Kafka protocol runs everywhere | Yes | No — AWS-only consumers |
| Serverless option | Yes — MSK Serverless | No | Yes — On-Demand mode |
| Best for | Existing Kafka workloads, rich ecosystem needs, microservices event buses | Maximum control, exotic configs, self-hosted regions | AWS-native pipelines, IoT telemetry, Flink processing |
MSK supports a range of Apache Kafka versions. AWS adds and retires versions over time, so check the MSK documentation for the current list. AWS patches the broker fleet, but you initiate Kafka version upgrades yourself. MSK applies them as a rolling update that keeps the cluster online; versions can be upgraded but not downgraded. MSK handles broker replacement on instance failure, ZooKeeper (or KRaft on newer versions) quorum management, and placement of brokers across AZs, none of which you configure or monitor directly.
Cluster Setup: Standard vs Serverless — CLI + Terraform
MSK offers two cluster types: Standard (Provisioned) and MSK Serverless. Standard gives you dedicated brokers with predictable throughput. Serverless scales capacity automatically with no broker management. We cover both here, with CLI commands and a Terraform module for Standard clusters.
Standard Cluster via AWS CLI
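# 1 - Create the client config file (client.properties) that the Kafka CLI
#     commands later in this guide pass via --command-config (SASL/SCRAM over TLS)
cat > client.properties <<EOF
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="${MSK_USERNAME}" password="${MSK_PASSWORD}";
EOF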
# 2 — Create the cluster (3 brokers, one per AZ)
aws kafka create-cluster \
--cluster-name "prod-events" \
--kafka-version "3.5.1" \
--number-of-broker-nodes 3 \
--broker-node-group-info '{
"InstanceType": "kafka.m5.xlarge",
"BrokerAZDistribution": "DEFAULT",
"ClientSubnets": [
"subnet-0a1b2c3d4e5f00001",
"subnet-0a1b2c3d4e5f00002",
"subnet-0a1b2c3d4e5f00003"
],
"SecurityGroups": ["sg-0123456789abcdef0"],
"StorageInfo": {
"EbsStorageInfo": {
"VolumeSize": 1000
}
}
}' \
--client-authentication '{
"Sasl": {"Scram": {"Enabled": true}, "Iam": {"Enabled": true}}
}' \
--encryption-info '{
"EncryptionInTransit": {"ClientBroker": "TLS", "InCluster": true}
}' \
--enhanced-monitoring "PER_TOPIC_PER_PARTITION" \
--open-monitoring '{
"Prometheus": {
"JmxExporter": {"EnabledInBroker": true},
"NodeExporter": {"EnabledInBroker": true}
}
}'
# 3 — Wait for the cluster to become ACTIVE (5–15 min)
aws kafka describe-cluster --cluster-arn "arn:aws:kafka:..." \
--query 'ClusterInfo.State'
# 4 — Retrieve bootstrap broker endpoints
aws kafka get-bootstrap-brokers \
--cluster-arn "arn:aws:kafka:us-east-1:123456789012:cluster/prod-events/..."
Terraform Module for a Production MSK Cluster
variable "cluster_name" { default = "prod-events" }
variable "kafka_version" { default = "3.5.1" }
variable "instance_type" { default = "kafka.m5.xlarge" }
variable "subnet_ids" { type = list(string) }
variable "security_group_ids" { type = list(string) }
resource "aws_msk_cluster" "main" {
cluster_name = var.cluster_name
kafka_version = var.kafka_version
number_of_broker_nodes = 3
broker_node_group_info {
instance_type = var.instance_type
client_subnets = var.subnet_ids
security_groups = var.security_group_ids
storage_info {
ebs_storage_info {
volume_size = 1000
# A provisioned_throughput block (e.g. volume_throughput = 250) is only
# accepted on larger brokers (kafka.m5.4xlarge and up); add it when you scale up.
}
}
}
encryption_info {
encryption_in_transit {
client_broker = "TLS"
in_cluster = true
}
}
client_authentication {
sasl {
scram = true
iam = true
}
# For mutual TLS, add a tls block whose certificate_authority_arns lists your
# AWS Private CA ARNs; an empty list configures no CA.
}
configuration_info {
arn = aws_msk_configuration.main.arn
revision = aws_msk_configuration.main.latest_revision
}
enhanced_monitoring = "PER_TOPIC_PER_PARTITION"
open_monitoring {
prometheus {
jmx_exporter { enabled_in_broker = true }
node_exporter { enabled_in_broker = true }
}
}
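logging_info {
# The CloudWatch log group and aws_s3_bucket.msk_logs referenced below are
# assumed to be defined elsewhere in this configuration.
broker_logs {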
cloudwatch_logs {
enabled = true
log_group = "/msk/prod-events/broker-logs"
}
s3 {
enabled = true
bucket = aws_s3_bucket.msk_logs.id
prefix = "broker-logs/"
}
}
}
tags = {
Environment = "production"
Team = "platform"
}
}
resource "aws_msk_configuration" "main" {
name = "${var.cluster_name}-config"
kafka_versions = [var.kafka_version]
server_properties = <<-PROPS
auto.create.topics.enable=false
default.replication.factor=3
min.insync.replicas=2
num.io.threads=8
num.network.threads=5
num.partitions=6
num.replica.fetchers=2
replica.lag.time.max.ms=30000
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
unclean.leader.election.enable=false
zookeeper.session.timeout.ms=18000
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
compression.type=lz4
PROPS
}
output "bootstrap_brokers_tls" {
value = aws_msk_cluster.main.bootstrap_brokers_tls
}
output "bootstrap_brokers_sasl_iam" {
value = aws_msk_cluster.main.bootstrap_brokers_sasl_iam
}
output "zookeeper_connect" {
value = aws_msk_cluster.main.zookeeper_connect_string
}
kafka.m5.large (2 vCPU, 8 GB) handles roughly 50 MB/s per broker and suits development. kafka.m5.xlarge (4 vCPU, 16 GB) is a sensible minimum for production. kafka.m5.4xlarge or kafka.m5.12xlarge fit high-throughput workloads (>200 MB/s per broker). For HA, spread brokers evenly across 3 AZs, using a multiple of 3 brokers. A 2-broker cluster cannot host topics with replication factor 3.
Topics, Partitioning Strategy, and Replication Factor
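Topics are the fundamental unit of organization in Kafka. Getting partitioning right at creation time prevents painful repartitioning later. The default partitioner maps a record key to hash(key) mod partition_count. Adding partitions therefore sends new records for an existing key to a different partition than its older records. That breaks the per-key ordering downstream consumers rely on.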
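The minimal sketch below makes the remapping visible. It uses zlib.crc32 as a stand-in for the murmur2 hash that Kafka's default partitioner actually applies to keys. The concrete partition numbers therefore will not match what Kafka computes, but the modulo behavior, and the fact that keys move, is the same.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    """hash(key) mod partition count, with crc32 standing in for Kafka's murmur2."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def moved_keys(keys, before: int, after: int) -> list:
    """Keys whose partition changes when the partition count goes from before to after."""
    return [k for k in keys if partition_for(k, before) != partition_for(k, after)]


if __name__ == "__main__":
    users = [f"user-{i}" for i in range(1000)]
    moved = moved_keys(users, before=12, after=24)
    print(f"{len(moved)} of {len(users)} keys change partition going from 12 to 24")
```

Every key that moves ends up with records on two partitions. A consumer reading the new partition can process the newer records before the older ones have been consumed.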
Creating Topics with the Kafka CLI
export BOOTSTRAP="b-1.prod-events.xyz.kafka.us-east-1.amazonaws.com:9096,b-2.prod-events.xyz.kafka.us-east-1.amazonaws.com:9096"
# Create a topic: 12 partitions, replication factor 3, 7-day retention
kafka-topics.sh \
--bootstrap-server "$BOOTSTRAP" \
--command-config client.properties \
--create \
--topic user-events \
--partitions 12 \
--replication-factor 3 \
--config retention.ms=604800000 \
--config min.insync.replicas=2 \
--config compression.type=lz4 \
--config cleanup.policy=delete
# List topics
kafka-topics.sh --bootstrap-server "$BOOTSTRAP" \
--command-config client.properties --list
# Describe a topic (partition leaders, ISR)
kafka-topics.sh --bootstrap-server "$BOOTSTRAP" \
--command-config client.properties \
--describe --topic user-events
Partitioning Strategy: How Many Partitions?
Partitions are the unit of parallelism in Kafka. Within a consumer group, each partition is read by at most one consumer at a time. If you have 6 partitions and 10 consumer instances, 4 instances sit idle. The formula for choosing partition count is:
Partitions = max(
ceil(target_throughput_MB_per_sec / throughput_per_partition_MB_per_sec),
max_consumer_parallelism_you_will_ever_need
)
# Rule of thumb for MSK kafka.m5.xlarge brokers:
#   Write throughput per partition:    ~10–15 MB/s
#   Consumer throughput per partition: ~50 MB/s (if not doing heavy processing)
# Example: 120 MB/s ingest, max 24 consumers needed
#   ceil(120 / 12) = 10 write partitions minimum
#   max parallelism = 24
#   Choose: 24 partitions
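The rule of thumb translates into a small helper. The per-partition throughput is whatever you measured or assumed; the 12 MB/s used below simply mirrors the worked example.

```python
import math


def partitions_needed(target_mb_s: float, per_partition_mb_s: float, max_consumers: int) -> int:
    """max(ceil(target / per-partition write throughput), max consumer parallelism)."""
    if per_partition_mb_s <= 0:
        raise ValueError("per-partition throughput must be positive")
    for_throughput = math.ceil(target_mb_s / per_partition_mb_s)
    return max(for_throughput, max_consumers)


# Worked example from above: 120 MB/s at ~12 MB/s per partition, up to 24 consumers
print(partitions_needed(120, 12, 24))  # 24
```

Rounding up matters when throughput dominates. For example, 130 MB/s at 12 MB/s per partition needs 11 partitions, not 10.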
Replication Factor and min.insync.replicas
Always set replication.factor=3 for production topics on a 3-broker MSK cluster, and set min.insync.replicas=2. With producer acks=all, the partition leader acknowledges a write only after every in-sync replica has it. If fewer than min.insync.replicas replicas are in sync, the leader rejects the write (NotEnoughReplicas). Together these settings mean an acknowledged write survives the loss of any one broker, and writes keep flowing while one broker is down. Setting min.insync.replicas=3 means a single broker failure blocks all acks=all writes, so avoid it unless your workload demands it.
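The trade-off can be expressed as simple arithmetic. The sketch below is a simplified model: it assumes each replica sits on a different broker and ignores unclean leader election, which is disabled in the configuration above.

```python
def write_fault_tolerance(replication_factor: int, min_insync: int) -> int:
    """Replicas that can be lost while acks=all writes are still accepted."""
    if not 1 <= min_insync <= replication_factor:
        raise ValueError("need 1 <= min.insync.replicas <= replication.factor")
    return replication_factor - min_insync


def accepts_acks_all(in_sync_replicas: int, min_insync: int) -> bool:
    """The leader accepts an acks=all produce request only if the ISR is large enough."""
    return in_sync_replicas >= min_insync


for rf, isr_min in [(3, 2), (3, 3)]:
    print(f"RF={rf}, min.insync.replicas={isr_min}: writes survive "
          f"{write_fault_tolerance(rf, isr_min)} broker loss(es)")
```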
Producer and Consumer Code: Java Spring Kafka + Python
Because MSK runs standard Apache Kafka, standard Kafka client libraries work unchanged. Below are complete examples using Spring Kafka (Java) and kafka-python, including TLS and SASL/SCRAM configuration for connecting to an MSK cluster.
Spring Kafka Producer (Java)
<!-- pom.xml dependencies -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
# application.yml: MSK with SASL/SCRAM over TLS
spring:
  kafka:
    bootstrap-servers: "${MSK_BOOTSTRAP_BROKERS}"
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all
      retries: 3
      properties:
        enable.idempotence: true
        max.in.flight.requests.per.connection: 5
        linger.ms: 5
        batch.size: 65536
        compression.type: lz4
    consumer:
      group-id: order-processing-service
      auto-offset-reset: earliest
      enable-auto-commit: false   # offsets are committed through Acknowledgment
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "com.techoral.events"
        max.poll.records: 500
        fetch.min.bytes: 50000
        fetch.max.wait.ms: 500
    listener:                     # applies to Boot's auto-configured container factory
      type: batch                 # deliver List<OrderEvent> to batch listeners
      ack-mode: manual            # required for the Acknowledgment parameter
    security:
      protocol: SASL_SSL
    properties:
      sasl.mechanism: SCRAM-SHA-512
      sasl.jaas.config: >
        org.apache.kafka.common.security.scram.ScramLoginModule required
        username="${MSK_USERNAME}"
        password="${MSK_PASSWORD}";
      ssl.truststore.location: /etc/kafka/truststore.jks
      ssl.truststore.password: "${TRUSTSTORE_PASSWORD}"
// OrderEvent.java — event POJO
package com.techoral.events;
import java.time.Instant;
public record OrderEvent(
String orderId,
String userId,
String status,
double totalAmount,
String currency,
Instant timestamp
) {}
// OrderProducerService.java
package com.techoral.kafka;
import com.techoral.events.OrderEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import java.util.concurrent.CompletableFuture;
@Slf4j
@Service
@RequiredArgsConstructor
public class OrderProducerService {
private static final String TOPIC = "order-events";
private final KafkaTemplate<String, OrderEvent> kafkaTemplate;
public void publishOrder(OrderEvent event) {
// Partition key = userId ensures all events for one user go to same partition
CompletableFuture<SendResult<String, OrderEvent>> future =
kafkaTemplate.send(TOPIC, event.userId(), event);
future.whenComplete((result, ex) -> {
if (ex != null) {
log.error("Failed to publish order {} to MSK: {}", event.orderId(), ex.getMessage());
// publish to DLQ or retry queue here
} else {
log.debug("Published order {} to partition {} offset {}",
event.orderId(),
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset());
}
});
}
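// Transactional producer: sends inside the callback commit or abort atomically.
// Requires spring.kafka.producer.transaction-id-prefix to be set; consumers need
// isolation.level=read_committed to skip records from aborted transactions.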
public void publishOrderTransactional(OrderEvent event) {
kafkaTemplate.executeInTransaction(ops -> {
ops.send(TOPIC, event.userId(), event);
return true;
});
}
}
// OrderConsumerService.java
package com.techoral.kafka;
import com.techoral.events.OrderEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import java.util.List;
@Slf4j
@Service
public class OrderConsumerService {
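// Batch listener: receives up to max.poll.records (500) records per poll.
// batchKafkaListenerContainerFactory is a custom bean (not shown) that must enable
// batch listening and MANUAL ack mode. Alternatively, drop containerFactory and rely
// on Boot's default factory with spring.kafka.listener.type=batch and ack-mode=manual.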
@KafkaListener(
topics = "order-events",
containerFactory = "batchKafkaListenerContainerFactory"
)
public void consumeOrders(
@Payload List<OrderEvent> events,
@Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
@Header(KafkaHeaders.OFFSET) List<Long> offsets,
Acknowledgment ack
) {
log.info("Processing batch of {} order events", events.size());
try {
for (OrderEvent event : events) {
processOrder(event);
}
// Manual commit after successful batch processing
ack.acknowledge();
} catch (Exception e) {
log.error("Batch processing failed, not committing offsets", e);
// nack(0, ...) commits nothing from this batch and re-seeks the partitions,
// so the whole batch is redelivered after a 5-second pause
ack.nack(0, java.time.Duration.ofSeconds(5));
}
}
private void processOrder(OrderEvent event) {
// business logic here
log.debug("Processing order: {} status: {}", event.orderId(), event.status());
}
}
Python Producer and Consumer (kafka-python)
"""
msk_producer.py: Python producer for AWS MSK with SASL/SCRAM over TLS
pip install kafka-python boto3 lz4   # lz4 is needed for compression_type='lz4'
"""
import json
import ssl
import uuid
from datetime import datetime, timezone

from kafka import KafkaProducer


def create_producer(bootstrap_servers: str, username: str, password: str) -> KafkaProducer:
    """Create a KafkaProducer configured for MSK SASL/SCRAM over TLS."""
    # The default context verifies the certificate chain and the hostname;
    # MSK's wildcard broker certificates pass standard hostname checks.
    ssl_ctx = ssl.create_default_context()
    return KafkaProducer(
        bootstrap_servers=bootstrap_servers.split(','),
        security_protocol='SASL_SSL',
        sasl_mechanism='SCRAM-SHA-512',
        sasl_plain_username=username,
        sasl_plain_password=password,
        ssl_context=ssl_ctx,
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
        key_serializer=lambda k: k.encode('utf-8') if k else None,
        acks='all',
        retries=3,
        max_in_flight_requests_per_connection=5,
        enable_idempotence=True,  # recent kafka-python releases only; 2.0.x rejects it
        compression_type='lz4',
        batch_size=65536,
        linger_ms=5,
    )


def on_send_success(record_metadata):
    print(f"Sent to {record_metadata.topic}[{record_metadata.partition}] "
          f"@ offset {record_metadata.offset}")


def on_send_error(excp):
    print(f"Producer error: {excp}")


def publish_user_events(producer: KafkaProducer, user_id: str, n: int = 100):
    """Publish n synthetic user events, partitioned by user_id."""
    for i in range(n):
        event = {
            'event_id': str(uuid.uuid4()),
            'user_id': user_id,
            'event_type': 'page_view' if i % 3 != 0 else 'click',
            'page': f'/products/{i % 20}',
            'timestamp': datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z'),
            'session_id': str(uuid.uuid4()),
        }
        producer.send(
            topic='user-events',
            key=user_id,  # same key → same partition → ordered per user
            value=event,
        ).add_callback(on_send_success).add_errback(on_send_error)
        if (i + 1) % 50 == 0:
            producer.flush()  # block until every buffered record is acknowledged
    producer.flush()
    print(f"Published {n} events for user {user_id}")
"""
msk_consumer.py: Python consumer with manual offset commit (at-least-once)
"""
import json
import ssl

from kafka import KafkaConsumer


def create_consumer(bootstrap_servers, username, password, group_id):
    ssl_ctx = ssl.create_default_context()  # keep certificate and hostname checks on
    return KafkaConsumer(
        'user-events',
        bootstrap_servers=bootstrap_servers.split(','),
        security_protocol='SASL_SSL',
        sasl_mechanism='SCRAM-SHA-512',
        sasl_plain_username=username,
        sasl_plain_password=password,
        ssl_context=ssl_ctx,
        group_id=group_id,
        auto_offset_reset='earliest',
        enable_auto_commit=False,  # commit only after processing: at-least-once delivery
        value_deserializer=lambda m: json.loads(m.decode('utf-8')),
        key_deserializer=lambda m: m.decode('utf-8') if m else None,
        max_poll_records=500,
        fetch_min_bytes=50000,
        fetch_max_wait_ms=500,
        session_timeout_ms=30000,
        heartbeat_interval_ms=10000,
    )


def consume_events(consumer: KafkaConsumer):
    """Poll loop with manual offset commit and an in-memory dead-letter list."""
    dlq: list = []
    processed = 0
    try:
        for message in consumer:
            event = message.value
            try:
                process_event(event)
                processed += 1
                if processed % 1000 == 0:
                    print(f"Processed {processed} events; last record at "
                          f"partition={message.partition} offset={message.offset}")
            except Exception as e:
                print(f"Failed to process event {event.get('event_id')}: {e}")
                dlq.append({'event': event, 'error': str(e)})
            # Commit current positions (offset + 1 of every record returned so far);
            # each of those records was either processed or parked in the DLQ.
            consumer.commit()
    except KeyboardInterrupt:
        print(f"Shutting down. DLQ size: {len(dlq)}")
    finally:
        consumer.close()


def process_event(event: dict):
    # Example: write to DynamoDB, update a real-time dashboard, etc.
    pass
Set enable.idempotence=true and acks=all together. This removes duplicates caused by producer retries: the broker tracks a producer ID and per-partition sequence numbers and discards retried batches it has already written. For end-to-end exactly-once (producer → consumer → database), use Kafka transactions with isolation.level=read_committed on consumers.
Consumer Groups, Offset Management, and Lag Monitoring
Consumer groups are the scalability unit on the consumer side. Every consumer in a group is assigned a disjoint set of partitions — each partition is consumed by exactly one group member at a time. Multiple independent groups (e.g., analytics service, audit logger, recommendation engine) can each consume the same topic independently at their own pace.
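Kafka's real assignors (range, round-robin, cooperative-sticky) differ in detail, but all of them keep assignments disjoint. The sketch below is a simplified round-robin assignment, not Kafka's exact algorithm. It shows why the 6-partition, 10-consumer group from the partitioning section leaves 4 consumers idle.

```python
def round_robin_assign(partitions, consumers):
    """Give each partition to exactly one consumer, cycling through the consumers."""
    consumers = list(consumers)
    if not consumers:
        raise ValueError("need at least one consumer")
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(sorted(partitions)):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment


members = [f"consumer-{i}" for i in range(10)]
plan = round_robin_assign(range(6), members)
idle = [c for c, parts in plan.items() if not parts]
print(f"{len(idle)} idle consumers")  # 4 idle consumers
```

A second group subscribing to the same topic would get its own independent assignment and its own committed offsets.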
Offset Management
# List all consumer groups
kafka-consumer-groups.sh \
--bootstrap-server "$BOOTSTRAP" \
--command-config client.properties \
--list
# Describe a group — shows current offset, end offset, and lag per partition
kafka-consumer-groups.sh \
--bootstrap-server "$BOOTSTRAP" \
--command-config client.properties \
--describe --group order-processing-service
# Example output:
# GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
# order-processing-service order-events 0 100241 100241 0
# order-processing-service order-events 1 99873 100100 227
# order-processing-service order-events 2 100102 100102 0
# Reset offsets to beginning (useful for reprocessing after a bug fix).
# The group must have no active members; use --dry-run instead of --execute to preview.
kafka-consumer-groups.sh \
--bootstrap-server "$BOOTSTRAP" \
--command-config client.properties \
--group order-processing-service \
--topic order-events \
--reset-offsets --to-earliest \
--execute
# Reset to a specific timestamp (replay last 2 hours)
kafka-consumer-groups.sh \
--bootstrap-server "$BOOTSTRAP" \
--command-config client.properties \
--group order-processing-service \
--topic order-events \
--reset-offsets --to-datetime 2026-06-08T06:00:00.000 \
--execute
Lag Monitoring via Python Script
"""
lag_monitor.py: emit consumer group lag to CloudWatch custom metrics.
Run on a cron or as a Lambda function every minute.
"""
import boto3
from kafka import KafkaAdminClient, KafkaConsumer

cloudwatch = boto3.client('cloudwatch', region_name='us-east-1')


def _sasl_kwargs(bootstrap_servers, username, password):
    """Shared SASL/SCRAM-over-TLS connection settings."""
    return dict(
        bootstrap_servers=bootstrap_servers.split(','),
        security_protocol='SASL_SSL',
        sasl_mechanism='SCRAM-SHA-512',
        sasl_plain_username=username,
        sasl_plain_password=password,
    )


def get_consumer_lag(bootstrap_servers, username, password, group_id, topic):
    # Committed offsets for the group: {TopicPartition: OffsetAndMetadata}
    admin = KafkaAdminClient(**_sasl_kwargs(bootstrap_servers, username, password))
    offsets = admin.list_consumer_group_offsets(group_id)
    admin.close()

    # Log-end offsets for the same partitions
    consumer = KafkaConsumer(**_sasl_kwargs(bootstrap_servers, username, password))
    end_offsets = consumer.end_offsets(list(offsets.keys()))
    consumer.close()

    total_lag = 0
    for tp, om in offsets.items():
        if tp.topic == topic:
            lag = end_offsets[tp] - om.offset
            total_lag += max(0, lag)
    return total_lag


def publish_lag_metric(group_id, topic, lag):
    cloudwatch.put_metric_data(
        Namespace='MSK/ConsumerLag',
        MetricData=[{
            'MetricName': 'TotalLag',
            'Dimensions': [
                {'Name': 'ConsumerGroup', 'Value': group_id},
                {'Name': 'Topic', 'Value': topic},
            ],
            'Value': lag,
            'Unit': 'Count',
        }]
    )
    print(f"Published lag={lag} for group={group_id} topic={topic}")
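An alarm on the published metric can be scripted as well. The minimal sketch below builds keyword arguments for boto3's cloudwatch.put_metric_alarm. It assumes the threshold is a number of seconds of backlog at the consumer's processing rate. The alarm name, the 5-minute evaluation window, and treating missing data as breaching are illustrative choices, not MSK defaults.

```python
def lag_alarm_threshold(records_per_sec: float, seconds_of_backlog: float = 10.0) -> int:
    """Lag (in records) equal to N seconds of work at the consumer's processing rate."""
    return int(records_per_sec * seconds_of_backlog)


def lag_alarm_kwargs(group_id: str, topic: str, threshold: int) -> dict:
    """Arguments for cloudwatch.put_metric_alarm on the TotalLag custom metric."""
    return {
        "AlarmName": f"msk-lag-{group_id}-{topic}",  # hypothetical naming scheme
        "Namespace": "MSK/ConsumerLag",
        "MetricName": "TotalLag",
        "Dimensions": [
            {"Name": "ConsumerGroup", "Value": group_id},
            {"Name": "Topic", "Value": topic},
        ],
        "Statistic": "Maximum",
        "Period": 60,            # matches a once-a-minute publisher
        "EvaluationPeriods": 5,  # assumption: alarm after 5 breaching minutes
        "Threshold": float(threshold),
        "ComparisonOperator": "GreaterThanThreshold",
        "TreatMissingData": "breaching",  # a silent publisher is treated as a problem
    }


# Usage (requires AWS credentials):
#   import boto3
#   boto3.client("cloudwatch").put_metric_alarm(
#       **lag_alarm_kwargs("order-processing-service", "order-events",
#                          lag_alarm_threshold(1000)))
```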
Alarm on the TotalLag metric. A good starting threshold is 10× the records-per-second your consumer processes. A lag of that size is roughly 10 seconds of backlog, which leaves a buffer before the consumer falls noticeably behind real time. For a consumer processing 1,000 records/sec, alarm at lag > 10,000.
MSK Connect: Managed Kafka Connect for S3 and RDS Sinks
Kafka Connect is the integration layer of the Kafka ecosystem — a framework for streaming data between Kafka and external systems without writing producer/consumer code. MSK Connect is a fully managed service that runs Kafka Connect workers as a scalable, serverless fleet. AWS handles worker provisioning, failure recovery, scaling, and Kafka connectivity. You supply connector plugins and configuration.
Deploying an S3 Sink Connector
# 1 — Upload the Confluent S3 Sink connector plugin to S3
aws s3 cp confluentinc-kafka-connect-s3-10.5.4.zip \
s3://my-msk-plugins/connectors/
# 2 — Create the custom plugin in MSK Connect
aws kafkaconnect create-custom-plugin \
--name "s3-sink-v10" \
--content-type ZIP \
--location '{
"s3Location": {
"bucketArn": "arn:aws:s3:::my-msk-plugins",
"fileKey": "connectors/confluentinc-kafka-connect-s3-10.5.4.zip"
}
}'
# 3 — Create the connector (wait for plugin ARN from step 2)
aws kafkaconnect create-connector \
--connector-name "order-events-to-s3" \
--kafka-cluster '{
"apacheKafkaCluster": {
"bootstrapServers": "b-1.prod-events.xyz:9098,b-2.prod-events.xyz:9098",
"vpc": {
"subnets": ["subnet-001", "subnet-002", "subnet-003"],
"securityGroups": ["sg-0123456789abcdef0"]
}
}
}' \
--kafka-cluster-client-authentication '{"authenticationType": "IAM"}' \
--kafka-connect-version "2.7.1" \
--kafka-cluster-encryption-in-transit '{"encryptionType": "TLS"}' \
--plugins '[{"customPlugin": {"customPluginArn": "arn:aws:kafkaconnect:...", "revision": 1}}]' \
--connector-configuration '{
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": "6",
"topics": "order-events",
"s3.region": "us-east-1",
"s3.bucket.name": "my-event-lake",
"s3.part.size": "67108864",
"flush.size": "10000",
"rotate.interval.ms": "300000",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"parquet.codec": "snappy",
"schema.compatibility": "FULL",
"locale": "en_US",
"timezone": "UTC",
"timestamp.extractor": "RecordField",
"timestamp.field": "timestamp",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"partition.duration.ms": "3600000",
"path.format": "'"'"'year'"'"'=YYYY/'"'"'month'"'"'=MM/'"'"'day'"'"'=dd/'"'"'hour'"'"'=HH"
}' \
--capacity '{
"autoScaling": {
"mcuCount": 1,
"minWorkerCount": 1,
"maxWorkerCount": 6,
"scaleInPolicy": {"cpuUtilizationPercentage": 20},
"scaleOutPolicy": {"cpuUtilizationPercentage": 80}
}
}' \
--service-execution-role-arn "arn:aws:iam::123456789012:role/MSKConnectRole"
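The autoScaling block above reads as a pair of CPU thresholds bounded by a worker range. The sketch below is a simplified model of that policy for reasoning about the numbers. It assumes one worker is added or removed per evaluation. MSK Connect's actual step sizes, evaluation windows, and cooldowns are not modeled.

```python
def next_worker_count(current: int, cpu_pct: float, min_workers: int = 1, max_workers: int = 6,
                      scale_in_below: float = 20.0, scale_out_above: float = 80.0) -> int:
    """One evaluation of a CPU-threshold policy, clamped to [min_workers, max_workers]."""
    if cpu_pct > scale_out_above:
        return min(current + 1, max_workers)
    if cpu_pct < scale_in_below:
        return max(current - 1, min_workers)
    return current


# Simulated CPU readings: a load spike followed by a quiet period
workers = 1
for cpu in [85, 90, 88, 50, 15, 10]:
    workers = next_worker_count(workers, cpu)
    print(f"cpu={cpu}% -> workers={workers}")
```

The gap between 20% and 80% keeps the connector from oscillating when CPU hovers around a single threshold.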
Debezium CDC Connector for RDS → Kafka
{
"name": "rds-postgres-cdc",
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "prod-db.cluster-xyz.us-east-1.rds.amazonaws.com",
"database.port": "5432",
"database.user": "${file:/opt/kafka/secrets/db-creds.properties:db.user}",
"database.password": "${file:/opt/kafka/secrets/db-creds.properties:db.password}",
"database.dbname": "orders",
"database.server.name": "prod-rds",
"table.include.list": "public.orders,public.order_items",
"plugin.name": "pgoutput",
"publication.autocreate.mode": "filtered",
"slot.name": "debezium_orders",
"heartbeat.interval.ms": "5000",
"snapshot.mode": "initial",
"topic.prefix": "cdc",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "rewrite"
}
MSK Serverless: Auto-Scaling and When to Use It
MSK Serverless is a cluster mode where you provision no brokers, choose no instance type, and manage no storage. Capacity scales automatically with actual throughput. You pay an hourly charge per cluster and per partition, plus per-GB charges for data in, data out, and storage. It is designed for workloads with unpredictable or bursty traffic patterns.
Creating an MSK Serverless Cluster
# MSK Serverless — no broker count or instance type needed
aws kafka create-cluster-v2 \
--cluster-name "serverless-events" \
--serverless '{
"vpcConfigs": [
{
"subnetIds": ["subnet-001", "subnet-002", "subnet-003"],
"securityGroupIds": ["sg-0123456789abcdef0"]
}
],
"clientAuthentication": {
"sasl": {"iam": {"enabled": true}}
}
}'
# Terraform — MSK Serverless
resource "aws_msk_serverless_cluster" "events" {
cluster_name = "serverless-events"
vpc_config {
subnet_ids = var.subnet_ids
security_group_ids = var.security_group_ids
}
client_authentication {
sasl {
iam { enabled = true }
}
}
tags = { Environment = "staging" }
}
MSK Serverless Limits and Pricing (2026)
| Dimension | MSK Serverless | MSK Standard (m5.xlarge, 3 brokers) |
|---|---|---|
| Max throughput | 200 MiB/s ingress, 400 MiB/s egress per cluster | ~300 MB/s write (3 × ~100 MB/s per broker) |
| Max partitions | 2,400 leader partitions per cluster (120 for compacted topics) | No hard limit (thousands) |
| Retention | Up to 24 hours | Configurable (days to forever with tiered storage) |
| Authentication | IAM only (SASL/SCRAM and mTLS not supported) | mTLS, SASL/SCRAM, IAM |
| Pricing | Cluster-hours + partition-hours + per-GB data in/out + per-GB-month storage | Broker-hours (~$0.42/hr per kafka.m5.xlarge in us-east-1) + EBS storage |
| Break-even point | Favors low or spiky traffic with few partitions | Favors steady, sustained throughput; compare both at your expected volumes |
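The break-even depends on your own volumes, so it is worth modeling rather than guessing. The sketch below takes every rate as an input; fill them in from the current MSK pricing page. It covers only the components named in the table and excludes cross-AZ data transfer and CloudWatch charges. The 730 hours-per-month figure is the convention AWS pricing pages use.

```python
HOURS_PER_MONTH = 730  # hours-per-month convention used on AWS pricing pages


def serverless_monthly_cost(cluster_hourly, partitions, partition_hourly,
                            gb_in, price_per_gb_in, gb_out, price_per_gb_out,
                            gb_stored, price_per_gb_month):
    """Cluster-hours + partition-hours + data in/out + storage for one month."""
    hourly = cluster_hourly + partitions * partition_hourly
    return (HOURS_PER_MONTH * hourly
            + gb_in * price_per_gb_in
            + gb_out * price_per_gb_out
            + gb_stored * price_per_gb_month)


def provisioned_monthly_cost(brokers, broker_hourly, ebs_gb_per_broker, price_per_ebs_gb_month):
    """Broker-hours + EBS storage for one month (data transfer excluded)."""
    return brokers * (HOURS_PER_MONTH * broker_hourly
                      + ebs_gb_per_broker * price_per_ebs_gb_month)
```

Evaluate both functions at your expected monthly ingest, egress, partition count, and retained volume. Repeat at two or three growth scenarios, because Serverless cost grows with traffic while Standard cost grows in broker-sized steps.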
Security: TLS, SASL/SCRAM, IAM Auth, and VPC Config
MSK supports three authentication mechanisms, and you can enable several at once. TLS (mTLS) uses X.509 client certificates issued by a private CA in AWS Private Certificate Authority. SASL/SCRAM uses usernames and passwords stored in AWS Secrets Manager. IAM auth uses AWS IAM policies and is the preferred option for applications running on EC2, ECS, Lambda, or EKS.
IAM Authentication (Recommended for AWS Workloads)
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kafka-cluster:Connect",
"kafka-cluster:AlterCluster",
"kafka-cluster:DescribeCluster",
"kafka-cluster:WriteDataIdempotently"
],
"Resource": "arn:aws:kafka:us-east-1:123456789012:cluster/prod-events/*"
},
{
"Effect": "Allow",
"Action": [
"kafka-cluster:*Topic*",
"kafka-cluster:WriteData",
"kafka-cluster:ReadData"
],
"Resource": "arn:aws:kafka:us-east-1:123456789012:topic/prod-events/*/order-events"
},
{
"Effect": "Allow",
"Action": ["kafka-cluster:AlterGroup", "kafka-cluster:DescribeGroup"],
"Resource": "arn:aws:kafka:us-east-1:123456789012:group/prod-events/*/order-processing-service"
}
]
}
// Java: IAM auth via the MSK IAM library
// pom.xml: add software.amazon.msk:aws-msk-iam-auth:2.0.3
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;
// MskIamProducerFactory is an illustrative class name
public class MskIamProducerFactory {
// bootstrapBrokersSaslIam: the port-9098 bootstrap string (e.g. the
// Terraform output bootstrap_brokers_sasl_iam)
public static KafkaProducer<String, String> create(String bootstrapBrokersSaslIam) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapBrokersSaslIam);
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "AWS_MSK_IAM");
props.put("sasl.jaas.config",
"software.amazon.msk.auth.iam.IAMLoginModule required;");
props.put("sasl.client.callback.handler.class",
"software.amazon.msk.auth.iam.IAMClientCallbackHandler");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
return new KafkaProducer<>(props);
}
}
SASL/SCRAM Setup (Secrets Manager)
# 1 — Create a SCRAM secret in Secrets Manager
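# Secret name MUST begin with "AmazonMSK_", and the secret must be encrypted with a
# customer managed KMS key (MSK cannot use secrets encrypted with the default
# aws/secretsmanager key)
aws secretsmanager create-secret \
--name "AmazonMSK_prod-events_app-user" \
--kms-key-id "arn:aws:kms:us-east-1:123456789012:key/..." \
--secret-string '{"username":"app-user","password":"S3cur3P@ssw0rd!"}'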
# 2 — Associate the secret with the MSK cluster
aws kafka batch-associate-scram-secret \
--cluster-arn "arn:aws:kafka:us-east-1:123456789012:cluster/prod-events/..." \
--secret-arn-list "arn:aws:secretsmanager:us-east-1:123456789012:secret:AmazonMSK_prod-events_app-user-..."
# 3 — Verify association
aws kafka list-scram-secrets \
--cluster-arn "arn:aws:kafka:us-east-1:123456789012:cluster/prod-events/..."
VPC and Security Group Best Practices
MSK clusters must be deployed in a VPC. Brokers are placed in private subnets with no public IPs by default. Public access can be turned on for an existing cluster by setting the connectivity option PublicAccess to type SERVICE_PROVIDED_EIPS; reserve it for development. Security group rules should allow inbound traffic on broker ports only from client security groups, never from 0.0.0.0/0:
resource "aws_security_group_rule" "msk_from_app" {
type = "ingress"
from_port = 9096 # SASL/SCRAM over TLS (SASL_SSL)
to_port = 9096
protocol = "tcp"
source_security_group_id = aws_security_group.app_service.id
security_group_id = aws_security_group.msk_broker.id
description = "App service → MSK SASL/SSL"
}
# Private-network ports: 9094 = TLS (mTLS), 9092 = Plaintext (dev only), 9096 = SASL/SCRAM, 9098 = IAM
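The port list maps directly onto client settings. The sketch below pairs each mechanism with its port and the matching kafka-python security arguments. In practice, `aws kafka get-bootstrap-brokers` returns bootstrap strings with ports already attached, so the helper is illustrative only. IAM is deliberately left out: kafka-python needs an extra token-signing library for it (AWS publishes aws-msk-iam-sasl-signer-python), which this sketch does not assume. For mTLS you would also pass kafka-python's ssl_certfile and ssl_keyfile.

```python
# Private-network broker ports, as listed in the comment above
MSK_PORTS = {"plaintext": 9092, "tls": 9094, "scram": 9096, "iam": 9098}


def bootstrap_for(broker_hosts, mechanism: str) -> str:
    """Comma-separated host:port list for the chosen auth mechanism."""
    port = MSK_PORTS[mechanism]
    return ",".join(f"{host}:{port}" for host in broker_hosts)


def client_security_config(mechanism: str, username: str = "", password: str = "") -> dict:
    """kafka-python keyword arguments for the mechanisms it supports natively."""
    if mechanism == "plaintext":
        return {"security_protocol": "PLAINTEXT"}
    if mechanism == "tls":
        return {"security_protocol": "SSL"}  # add ssl_certfile/ssl_keyfile for mTLS
    if mechanism == "scram":
        return {
            "security_protocol": "SASL_SSL",
            "sasl_mechanism": "SCRAM-SHA-512",
            "sasl_plain_username": username,
            "sasl_plain_password": password,
        }
    raise ValueError(f"{mechanism!r} is not covered by this sketch")
```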
Monitoring: CloudWatch Metrics and Prometheus/Grafana
MSK emits over 150 CloudWatch metrics across four levels of detail. DEFAULT (cluster- and broker-level metrics) is free. PER_BROKER, PER_TOPIC_PER_BROKER, and PER_TOPIC_PER_PARTITION each add more granular metrics and are billed at CloudWatch rates. PER_TOPIC_PER_PARTITION is the most granular, so enable the higher levels selectively.
Key CloudWatch Metrics to Alert On
| Metric | Namespace | Alert Threshold | Meaning |
|---|---|---|---|
| KafkaDataLogsDiskUsed | AWS/Kafka | > 80% | Disk filling up; a full disk takes the broker down |
| UnderReplicatedPartitions | AWS/Kafka | > 0 for 5+ min | Replicas falling behind; durability risk |
| ActiveControllerCount | AWS/Kafka | Sum across brokers != 1 | No active controller (or more than one) |
| OfflinePartitionsCount | AWS/Kafka | > 0 | Partitions have no leader; reads and writes to them fail |
| BytesInPerSec | AWS/Kafka | Trending toward broker limit | Capacity planning; add brokers or scale up |
| ProduceTotalTimeMsMean | AWS/Kafka | > 500 ms | Broker produce latency spike |
| EstimatedMaxTimeLag | AWS/Kafka | Workload-specific | Consumer group falling behind (in seconds) |
Prometheus + Grafana Integration
# prometheus.yml: scrape MSK broker JMX and Node Exporter metrics
# MSK exposes these when open_monitoring.prometheus is enabled
scrape_configs:
  - job_name: 'msk-jmx'
    static_configs:
      - targets:
          - 'b-1.prod-events.xyz.kafka.us-east-1.amazonaws.com:11001'
          - 'b-2.prod-events.xyz.kafka.us-east-1.amazonaws.com:11001'
          - 'b-3.prod-events.xyz.kafka.us-east-1.amazonaws.com:11001'
    scrape_interval: 60s
  - job_name: 'msk-node'
    static_configs:
      - targets:
          - 'b-1.prod-events.xyz.kafka.us-east-1.amazonaws.com:11002'
          - 'b-2.prod-events.xyz.kafka.us-east-1.amazonaws.com:11002'
          - 'b-3.prod-events.xyz.kafka.us-east-1.amazonaws.com:11002'
    scrape_interval: 60s
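Hard-coded broker hostnames go stale when brokers are added. One option, sketched here under the assumption that you use Prometheus file-based service discovery (file_sd_configs) instead of static_configs, is to derive target files from the cluster's bootstrap string. The file names (msk_jmx.json, msk_node.json) and the "cluster" label are hypothetical choices.

```python
"""Generate Prometheus file_sd target files from an MSK bootstrap broker string."""
import json
from typing import Dict, List

JMX_EXPORTER_PORT = 11001  # ports used in the scrape config above
NODE_EXPORTER_PORT = 11002


def file_sd_targets(bootstrap: str, port: int, cluster: str) -> List[Dict[str, object]]:
    """One file_sd target group: every broker host in `bootstrap`, re-pointed at `port`."""
    hosts = sorted({ep.strip().rpartition(":")[0] for ep in bootstrap.split(",") if ep.strip()})
    return [{"targets": [f"{host}:{port}" for host in hosts], "labels": {"cluster": cluster}}]


def write_target_files(bootstrap: str, cluster: str, prefix: str = "msk") -> None:
    """Write <prefix>_jmx.json and <prefix>_node.json for file_sd_configs to watch."""
    for name, port in (("jmx", JMX_EXPORTER_PORT), ("node", NODE_EXPORTER_PORT)):
        with open(f"{prefix}_{name}.json", "w", encoding="utf-8") as fh:
            json.dump(file_sd_targets(bootstrap, port, cluster), fh, indent=2)
```

Each job in prometheus.yml would then point at its file, for example file_sd_configs with files: ['msk_jmx.json'], and Prometheus reloads the targets when the file changes.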
# Grafana: useful PromQL queries for MSK dashboards
# Metric names depend on the exporter's rename rules; confirm them against the /metrics output on port 11001
# Total bytes in per second (cluster-wide)
sum(rate(kafka_server_brokertopicmetrics_bytesin_total[5m]))
# Under-replicated partitions (should be 0)
sum(kafka_server_replicamanager_underreplicatedpartitions)
# Consumer lag per group + topic
# (not exposed by the broker JMX/Node exporters; this series comes from a separate lag exporter such as kafka_exporter)
sum by (consumergroup, topic) (
  kafka_consumergroup_lag
)
# Network processor idle ratio (should stay above 30%)
avg(kafka_network_processor_idle_ratio) by (listener)
# Request queue size (should stay near 0)
avg(kafka_network_requestchannel_request_queue_size)
Cost Optimization: Broker Sizing and Storage Tiering
MSK costs have three components: broker-hour pricing, EBS storage, and data transfer. For a 3-broker kafka.m5.xlarge cluster in us-east-1, broker cost alone is ~$0.51/hr = ~$370/month. Here is a systematic framework for keeping those costs under control.
Right-Sizing Brokers
Start by measuring actual CPU and network utilization over a 2-week period. kafka.m5.large (2 vCPU, 8 GB, ~100 MB/s network) is sufficient for <50 MB/s total ingest. Move to kafka.m5.xlarge when you hit 60% CPU or 70% network utilization. You can scale up in place: MSK changes broker instance types with a rolling update, one broker at a time, so the cluster stays available while partition leadership moves between brokers.
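# Check CPU utilization over one week (widen the window to cover the two weeks suggested above)
# Dimension names contain spaces, so each Name=...,Value=... pair must be quoted
aws cloudwatch get-metric-statistics \
  --namespace AWS/Kafka \
  --metric-name CpuUser \
  --dimensions "Name=Cluster Name,Value=prod-events" "Name=Broker ID,Value=1" \
  --start-time 2026-06-01T00:00:00Z \
  --end-time 2026-06-08T00:00:00Z \
  --period 3600 \
  --statistics Average Maximum \
  --output table
# Update broker instance type (rolling, one broker at a time)
# --current-version changes after every cluster update; read the latest value with:
#   aws kafka describe-cluster --cluster-arn "arn:aws:kafka:..." --query 'ClusterInfo.CurrentVersion' --output text
aws kafka update-broker-type \
  --cluster-arn "arn:aws:kafka:..." \
  --current-version "K3MAGZA5ZBCQPD" \
  --target-instance-type "kafka.m5.2xlarge"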
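The 60% rule can be applied to the datapoints that get-metric-statistics returns. This sketch follows AWS's best-practice guidance of counting CpuUser plus CpuSystem, which assumes you also fetch CpuSystem with the same dimensions and window. It pairs hourly averages by Timestamp and flags the broker if any hour crosses the threshold. The function names are hypothetical.

```python
"""Flag a broker for scale-up from hourly CloudWatch CPU datapoints."""
from typing import Dict, List

CPU_SCALE_UP_PCT = 60.0  # threshold from the right-sizing guidance above


def peak_total_cpu(cpu_user: List[Dict], cpu_system: List[Dict]) -> float:
    """Highest hourly CpuUser + CpuSystem average, pairing datapoints by Timestamp."""
    system_by_ts = {p["Timestamp"]: p["Average"] for p in cpu_system}
    totals = [p["Average"] + system_by_ts.get(p["Timestamp"], 0.0) for p in cpu_user]
    return max(totals, default=0.0)


def should_scale_up(cpu_user: List[Dict], cpu_system: List[Dict],
                    threshold: float = CPU_SCALE_UP_PCT) -> bool:
    return peak_total_cpu(cpu_user, cpu_system) >= threshold


# Usage: feed the "Datapoints" lists from two get_metric_statistics calls
# (MetricName CpuUser and CpuSystem, same dimensions and window as the CLI above):
#   user = cw.get_metric_statistics(..., MetricName="CpuUser", Statistics=["Average"])["Datapoints"]
```

Using the hourly peak rather than the weekly mean is deliberately conservative: a broker that averages 35% but spikes past 60% every evening still gets flagged.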
Tiered Storage
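MSK Tiered Storage transparently offloads older log segments from broker EBS volumes to a low-cost storage tier that MSK manages. EBS costs ~$0.10/GB-month. Tiered storage is billed at its own lower per-GB-month rate listed on the MSK pricing page, not at S3 Standard prices, so compare the two before estimating savings. Consumers still read tiered data through the normal Kafka APIs. The cluster needs Apache Kafka 3.6.0 or later (or 2.8.2.tiered) with tiered storage enabled; after that, you turn it on per topic: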
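# 1. Tiered storage needs Apache Kafka 3.6.0+ (or 2.8.2.tiered) on the cluster
aws kafka update-cluster-kafka-version \
  --cluster-arn "arn:aws:kafka:..." \
  --current-version "K3MAGZA5ZBCQPD" \
  --target-kafka-version "3.6.0"
# 2. After the upgrade completes, switch the cluster's storage mode to tiered
#    (--current-version changes after every update; read it from describe-cluster)
aws kafka update-storage \
  --cluster-arn "arn:aws:kafka:..." \
  --current-version "<new-current-version>" \
  --storage-mode TIERED
# 3. Per topic: keep 1 day locally on EBS (hot tier), 30 days in total
kafka-configs.sh \
  --bootstrap-server "$BOOTSTRAP" \
  --command-config client.properties \
  --alter \
  --entity-type topics \
  --entity-name order-events \
  --add-config remote.storage.enable=true,local.retention.ms=86400000,retention.ms=2592000000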
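To see what those two retention settings mean for capacity, here is a back-of-envelope sketch. It assumes every replica keeps its own local segments on EBS while the remote tier holds a single copy per segment, and it ignores compression, the still-open active segment, and the overlap between the local and remote tiers. Treat the result as a rough sizing aid, not a bill. Multiply the EBS figures by the ~$0.10/GB-month above and the tiered figure by the rate on the MSK pricing page.

```python
"""Back-of-envelope split of EBS (hot) vs tiered (cold) storage for the topic settings above."""
from typing import Tuple

MS_PER_HOUR = 3_600_000


def storage_split_gb(ingest_gb_per_hour: float, local_retention_ms: int, retention_ms: int,
                     replication_factor: int = 3) -> Tuple[float, float, float]:
    """Return (EBS GB with tiering, tiered GB, EBS GB without tiering)."""
    hot_hours = local_retention_ms / MS_PER_HOUR
    total_hours = retention_ms / MS_PER_HOUR
    # Every replica keeps its own local segments on EBS.
    ebs_with_tiering = ingest_gb_per_hour * hot_hours * replication_factor
    # Assumption: one remote copy per segment, counted for the full retention window.
    tiered = ingest_gb_per_hour * total_hours
    # Baseline: the whole retention window kept on EBS on every replica.
    ebs_only = ingest_gb_per_hour * total_hours * replication_factor
    return ebs_with_tiering, tiered, ebs_only


# 10 GB/hr ingest, local.retention.ms=86400000 (1 day), retention.ms=2592000000 (30 days), RF=3:
print(storage_split_gb(10, 86_400_000, 2_592_000_000))  # (720.0, 7200.0, 21600.0)
```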
Compression and Storage Best Practices
- Set compression.type=lz4 at the producer level. LZ4 is fast and achieves 2–4× compression on JSON events, directly reducing both network transfer and disk cost.
- Use the compact cleanup policy for changelog topics (e.g., user profiles, product catalog). Kafka retains only the latest value per key, dramatically reducing storage versus time-based retention. Compacted topics cannot use tiered storage, so size their EBS footprint separately.
- Delete unused consumer groups promptly. Their committed offsets otherwise stay in __consumer_offsets until offsets.retention.minutes expires (7 days by default), and stale groups clutter lag dashboards and alerts.
- Set segment.bytes=536870912 (512 MiB; the topic-level counterpart of the broker setting log.segment.bytes) for topics with tiered storage. Only closed segments move to the remote tier, so smaller segments tier sooner, reducing EBS footprint.
- Use Reserved Instances for long-running MSK clusters: AWS offers 1-year and 3-year reserved pricing for MSK brokers, saving up to 40% vs on-demand.
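For the first bullet, here is a kafka-python sketch of producer settings that combine LZ4 with the SASL/SCRAM listener on port 9096. MSK's SCRAM support uses SCRAM-SHA-512. The linger_ms value is a hypothetical tuning choice that gives LZ4 fuller batches to compress. The config is built as a dict so it can be checked without a broker; kafka-python needs the lz4 package installed to actually compress.

```python
"""kafka-python producer settings for MSK SASL/SCRAM with LZ4 compression."""
from typing import Any, Dict


def scram_lz4_producer_config(bootstrap: str, username: str, password: str) -> Dict[str, Any]:
    """Keyword arguments for kafka.KafkaProducer."""
    return {
        "bootstrap_servers": [ep.strip() for ep in bootstrap.split(",") if ep.strip()],
        "security_protocol": "SASL_SSL",      # the port 9096 listener
        "sasl_mechanism": "SCRAM-SHA-512",    # the SCRAM variant MSK supports
        "sasl_plain_username": username,
        "sasl_plain_password": password,
        "compression_type": "lz4",            # requires the `lz4` package at runtime
        "acks": "all",
        "linger_ms": 20,                      # hypothetical: wait briefly so batches compress better
    }


# Usage (needs a reachable cluster, kafka-python and lz4 installed):
#   from kafka import KafkaProducer
#   producer = KafkaProducer(**scram_lz4_producer_config(BOOTSTRAP, USER, PASSWORD))
```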
Cost example: kafka.m5.xlarge brokers with 500 GB EBS each and tiered storage for 30-day retention cost approximately $550–650/month. MSK Serverless for the same 30-day workload at 10 GB/hr ingest costs roughly $1,700/month. Standard clusters win on cost above ~30 GB/day sustained ingest.
Frequently Asked Questions
Can I use MSK with on-premises Kafka producers?
Yes. MSK is 100% Kafka protocol compatible. On-premises producers using standard Kafka clients connect to MSK broker endpoints over TLS. Ensure your on-premises network has connectivity to the MSK VPC (via AWS Direct Connect or VPN), configure the client with SASL/SCRAM or mTLS credentials, and update the bootstrap.servers property. No code changes are required. For large-scale on-premises → MSK migrations, use MirrorMaker 2 to replicate topics during the cutover period.
What is the difference between MSK and MSK Serverless?
MSK Standard gives you dedicated brokers with fixed capacity (instance type × broker count), predictable costs, high partition counts (bounded by MSK's per-broker partition limits, which grow with instance size), configurable retention, and all authentication options (TLS, SASL/SCRAM, IAM). MSK Serverless auto-scales capacity invisibly, requires no broker provisioning, supports only IAM authentication, caps retention at 24 hours, and limits you to 120 partitions. Serverless is ideal for dev/staging and variable workloads; Standard is required for production at scale.
How does MSK handle broker failures?
When a broker fails, MSK automatically replaces it within 15–30 minutes. During replacement, partitions whose leader was on the failed broker elect a new leader from the in-sync replica (ISR) set, typically within seconds. If min.insync.replicas=2 is set and the ISR drops below that threshold, produce requests with acks=all fail with NotEnoughReplicasException until the replacement broker catches up. The Java producer treats that exception as retriable and retries internally until delivery.timeout.ms (120 seconds by default) expires, so size that timeout, plus any application-level retry with exponential backoff, to cover this window.
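Here is a minimal sketch of application-level retry with capped exponential backoff and full jitter, for failures that outlast the client's own retries. The helper is generic: the caller passes the send operation and the exception types to treat as retriable. The kafka-python error names in the usage comment are assumptions to verify against your client version.

```python
"""Application-level retry with capped exponential backoff and full jitter."""
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_backoff(send: Callable[[], T],
                       retriable: Tuple[Type[BaseException], ...],
                       max_attempts: int = 6,
                       base_delay_s: float = 0.5,
                       max_delay_s: float = 30.0,
                       sleep: Callable[[float], None] = time.sleep) -> T:
    """Call `send` until it succeeds, retrying only exceptions listed in `retriable`."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")
    for attempt in range(max_attempts):
        try:
            return send()
        except retriable:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error to the caller
            # Full jitter: sleep a random amount up to the capped exponential delay.
            sleep(random.uniform(0.0, min(max_delay_s, base_delay_s * 2 ** attempt)))
    raise RuntimeError("unreachable")


# Usage with kafka-python (error class names may differ between client versions):
#   from kafka.errors import KafkaTimeoutError, NotEnoughReplicasError
#   retry_with_backoff(lambda: producer.send("order-events", payload).get(timeout=30),
#                      retriable=(NotEnoughReplicasError, KafkaTimeoutError))
```

Jitter spreads retries from many producers across the backoff window, so they do not all hit the newly elected leaders at the same instant.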
Can I run ksqlDB or Schema Registry on MSK?
MSK does not include ksqlDB or Schema Registry — those are Confluent Platform components. However, you can run them yourself as containers on ECS or EKS, pointed at your MSK cluster. For Schema Registry specifically, AWS Glue Schema Registry is a fully managed alternative that integrates natively with MSK, Kinesis, and other AWS services and supports Avro, JSON Schema, and Protobuf.
How do I upgrade the Kafka version on a running MSK cluster?
Use the MSK console or CLI: aws kafka update-cluster-kafka-version --cluster-arn <cluster-arn> --current-version <cluster-version> --target-kafka-version X.Y.Z. MSK performs a rolling upgrade, one broker at a time, with no downtime for replicated topics and clients that retry on leader changes. The upgrade takes 20–45 minutes depending on cluster size and data volume. MSK accepts only the target versions it supports from your current version. List them with aws kafka get-compatible-kafka-versions --cluster-arn <cluster-arn>, and plan an intermediate upgrade if your goal version is not on that list. Always test on a staging cluster first and review the Kafka release notes for incompatible config changes.
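A planned upgrade can be checked before it is started. This sketch reads the cluster's Kafka version from describe_cluster (ClusterInfo.CurrentBrokerSoftwareInfo.KafkaVersion) and looks it up in the GetCompatibleKafkaVersions response, whose entries pair a SourceVersion with its TargetVersions. The client is injected, and the helper names are hypothetical.

```python
"""Check a planned Kafka version upgrade against MSK's GetCompatibleKafkaVersions API."""
from typing import Any, List


def compatible_targets(response: dict, source_version: str) -> List[str]:
    """Target versions MSK lists for `source_version` in a GetCompatibleKafkaVersions response."""
    for entry in response.get("CompatibleKafkaVersions", []):
        if entry.get("SourceVersion") == source_version:
            return list(entry.get("TargetVersions", []))
    return []


def can_upgrade(kafka_client: Any, cluster_arn: str, target_version: str) -> bool:
    """True if MSK accepts `target_version` as a direct upgrade for this cluster."""
    info = kafka_client.describe_cluster(ClusterArn=cluster_arn)["ClusterInfo"]
    source = info["CurrentBrokerSoftwareInfo"]["KafkaVersion"]
    response = kafka_client.get_compatible_kafka_versions(ClusterArn=cluster_arn)
    return target_version in compatible_targets(response, source)


# Usage (needs AWS credentials; CLUSTER_ARN is a placeholder):
#   import boto3
#   print(can_upgrade(boto3.client("kafka"), CLUSTER_ARN, "3.6.0"))
```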