AWS MSK: Managed Apache Kafka for Real-Time Data Streaming

Apache Kafka is the backbone of modern real-time data architectures — event-driven microservices, change data capture pipelines, fraud detection systems, and multi-region data replication all run on Kafka. But operating a Kafka cluster yourself means wrestling with ZooKeeper quorums, broker replication, disk provisioning, rolling upgrades, and 3am pager alerts. Amazon MSK (Managed Streaming for Apache Kafka) eliminates that operational burden while preserving 100% Kafka protocol compatibility. Your existing producers, consumers, Kafka Connect pipelines, and ksqlDB topologies work without modification.

This guide covers cluster architecture decisions (Standard vs Serverless), Terraform and CLI provisioning, Spring Kafka and kafka-python producer/consumer code, MSK Connect for managed pipelines, the available security options, CloudWatch and Prometheus monitoring, and a cost optimization framework you can apply to production clusters.

MSK vs Self-Managed Kafka vs Kinesis — Decision Matrix

Before committing to MSK, you should understand where it sits in the streaming landscape. Three realistic options exist for teams on AWS: Amazon MSK (managed Kafka), self-managed Kafka on EC2/EKS, and Amazon Kinesis Data Streams. Each has genuine advantages, and choosing incorrectly at design time is expensive to reverse.

| Dimension | Amazon MSK | Self-Managed Kafka (EC2/EKS) | Kinesis Data Streams |
|---|---|---|---|
| Kafka protocol compatibility | 100%: any Kafka client works | 100% | None: proprietary SDK only |
| Operational overhead | Low: AWS manages brokers, ZooKeeper/KRaft, upgrades | High: full ops responsibility | Very low: fully serverless |
| Ecosystem | Full: Kafka Connect, ksqlDB, Schema Registry, MirrorMaker 2 | Full | AWS-only: KCL, Lambda, Flink |
| Multi-consumer fan-out | Unlimited consumer groups | Unlimited consumer groups | Up to 20 (Enhanced Fan-Out) or shared 2 MB/s per shard |
| Max message size | 1 MB default, up to 10 MB+ (configurable) | Configurable, practically unlimited | 1 MB hard limit |
| Retention | Hours to forever (with tiered storage) | Configurable | 24 hours to 365 days |
| Pricing model | Per broker-hour + EBS storage + data transfer | EC2 on-demand/reserved + EBS + ops labor | Per shard-hour OR per GB (On-Demand) |
| Hybrid / multi-cloud | Yes: Kafka protocol runs everywhere | Yes | No: AWS-only consumers |
| Serverless option | Yes: MSK Serverless | No | Yes: On-Demand mode |
| Best for | Existing Kafka workloads, rich ecosystem needs, microservices event buses | Maximum control, exotic configs, self-hosted regions | AWS-native pipelines, IoT telemetry, Flink processing |

The rule of thumb: If you already have Kafka producers/consumers (on-prem, other clouds, or legacy services), MSK is the lowest-friction migration path. If you are greenfield on AWS with no Kafka dependency, evaluate Kinesis first, because it has zero operational footprint and native Lambda/Flink integrations. Use self-managed Kafka only when you need configs that MSK doesn't expose or when you need to run Kafka in a region or environment AWS doesn't support.

MSK runs Apache Kafka versions 2.6 through 3.6 (2026). You get automatic minor-version patch upgrades and can perform major version upgrades via a rolling update that keeps the cluster online. MSK handles broker replacement on instance failure, ZooKeeper (or KRaft in newer versions) quorum management, and cross-AZ replication — none of which you configure or monitor directly.

Cluster Setup: Standard vs Serverless — CLI + Terraform

MSK offers two cluster types: Standard (Provisioned) and MSK Serverless. Standard gives you dedicated brokers with predictable throughput; Serverless auto-scales capacity invisibly. We cover both here, with CLI commands and a production-ready Terraform module for Standard clusters.

Standard Cluster via AWS CLI

# 1 — Check which Kafka versions MSK currently offers
aws kafka list-kafka-versions \
  --query 'KafkaVersions[].Version'

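# 2 — Create the cluster (3 brokers, one per AZ)
# Note: MSK accepts ProvisionedThroughput only on kafka.m5.4xlarge or larger brokers;
# remove that block (or pick a larger instance type) if the call is rejected.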
aws kafka create-cluster \
  --cluster-name "prod-events" \
  --kafka-version "3.5.1" \
  --number-of-broker-nodes 3 \
  --broker-node-group-info '{
    "InstanceType": "kafka.m5.xlarge",
    "BrokerAZDistribution": "DEFAULT",
    "ClientSubnets": [
      "subnet-0a1b2c3d4e5f00001",
      "subnet-0a1b2c3d4e5f00002",
      "subnet-0a1b2c3d4e5f00003"
    ],
    "SecurityGroups": ["sg-0123456789abcdef0"],
    "StorageInfo": {
      "EbsStorageInfo": {
        "VolumeSize": 1000,
        "ProvisionedThroughput": {
          "Enabled": true,
          "VolumeThroughput": 250
        }
      }
    }
  }' \
  --encryption-info '{
    "EncryptionInTransit": {"ClientBroker": "TLS", "InCluster": true}
  }' \
  --enhanced-monitoring "PER_TOPIC_PER_PARTITION" \
  --open-monitoring '{
    "Prometheus": {
      "JmxExporter":  {"EnabledInBroker": true},
      "NodeExporter": {"EnabledInBroker": true}
    }
  }'

# 3 — Wait for the cluster to become ACTIVE (5–15 min)
aws kafka describe-cluster --cluster-arn "arn:aws:kafka:..." \
  --query 'ClusterInfo.State'

# 4 — Retrieve bootstrap broker endpoints
aws kafka get-bootstrap-brokers \
  --cluster-arn "arn:aws:kafka:us-east-1:123456789012:cluster/prod-events/..."
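
Step 3 above checks the state once, while cluster creation takes several minutes. A minimal polling sketch in Python follows; the helper name `wait_for_state`, the 30-second interval, and the 30-minute timeout are illustrative assumptions, and the state lookup is injected as a callable so the loop itself has no AWS dependency.

```python
import time
from typing import Callable


def wait_for_state(get_state: Callable[[], str],
                   target: str = "ACTIVE",
                   failed: tuple = ("FAILED",),
                   timeout_s: float = 1800.0,
                   interval_s: float = 30.0,
                   sleep: Callable[[float], None] = time.sleep) -> str:
    """Poll get_state() until it returns target; raise on failure or timeout."""
    waited = 0.0
    while True:
        state = get_state()
        if state == target:
            return state
        if state in failed:
            raise RuntimeError(f"cluster entered state {state}")
        if waited >= timeout_s:
            raise TimeoutError(f"still {state} after {waited:.0f}s")
        sleep(interval_s)          # injected so tests can skip real waiting
        waited += interval_s
```

With boto3, `get_state` could be something like `lambda: kafka.describe_cluster(ClusterArn=arn)["ClusterInfo"]["State"]`, where `kafka = boto3.client("kafka")` and `arn` is your cluster ARN.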

Terraform Module for a Production MSK Cluster

variable "cluster_name"   { default = "prod-events" }
variable "kafka_version"  { default = "3.5.1" }
variable "instance_type"  { default = "kafka.m5.xlarge" }
variable "subnet_ids"     { type = list(string) }
variable "security_group_ids" { type = list(string) }

resource "aws_msk_cluster" "main" {
  cluster_name           = var.cluster_name
  kafka_version          = var.kafka_version
  number_of_broker_nodes = 3

  broker_node_group_info {
    instance_type   = var.instance_type
    client_subnets  = var.subnet_ids
    security_groups = var.security_group_ids

    storage_info {
      ebs_storage_info {
        volume_size = 1000
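        # MSK accepts provisioned throughput only on kafka.m5.4xlarge or larger;
        # drop this block when var.instance_type is smaller.
        provisioned_throughput {
          enabled           = true
          volume_throughput = 250
        }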
      }
    }
  }

  encryption_info {
    encryption_in_transit {
      client_broker = "TLS"
      in_cluster    = true
    }
  }

  client_authentication {
    sasl {
      scram = true
      iam   = true
    }
    tls {
      certificate_authority_arns = []
    }
  }

  configuration_info {
    arn      = aws_msk_configuration.main.arn
    revision = aws_msk_configuration.main.latest_revision
  }

  enhanced_monitoring = "PER_TOPIC_PER_PARTITION"

  open_monitoring {
    prometheus {
      jmx_exporter  { enabled_in_broker = true }
      node_exporter { enabled_in_broker = true }
    }
  }

  logging_info {
    broker_logs {
      cloudwatch_logs {
        enabled   = true
        log_group = "/msk/prod-events/broker-logs"
      }
      s3 {
        enabled = true
        bucket  = aws_s3_bucket.msk_logs.id  # bucket resource is defined elsewhere in the module
        prefix  = "broker-logs/"
      }
    }
  }

  tags = {
    Environment = "production"
    Team        = "platform"
  }
}

resource "aws_msk_configuration" "main" {
  name              = "${var.cluster_name}-config"
  kafka_versions    = [var.kafka_version]
  server_properties = <<-PROPS
    auto.create.topics.enable=false
    default.replication.factor=3
    min.insync.replicas=2
    num.io.threads=8
    num.network.threads=5
    num.partitions=6
    num.replica.fetchers=2
    replica.lag.time.max.ms=30000
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    socket.send.buffer.bytes=102400
    unclean.leader.election.enable=false
    zookeeper.session.timeout.ms=18000
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    compression.type=lz4
  PROPS
}

output "bootstrap_brokers_tls" {
  value = aws_msk_cluster.main.bootstrap_brokers_tls
}
output "bootstrap_brokers_sasl_iam" {
  value = aws_msk_cluster.main.bootstrap_brokers_sasl_iam
}
output "zookeeper_connect" {
  value = aws_msk_cluster.main.zookeeper_connect_string
}

Broker sizing guidance: kafka.m5.large (2 vCPU, 8 GB) handles roughly 50 MB/s per broker and suits development. kafka.m5.xlarge (4 vCPU, 16 GB) is the minimum recommended for production. Choose kafka.m5.4xlarge or kafka.m5.12xlarge for high-throughput workloads (>200 MB/s per broker). Always provision 3 brokers across 3 AZs for HA, never 2.

Topics, Partitioning Strategy, and Replication Factor

Topics are the fundamental unit of organization in Kafka. Getting partitioning right at creation time prevents painful repartitioning later — downstream consumers relying on key-based ordering break when you add partitions, because the hash assignment changes.
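
To see why adding partitions disturbs key-based ordering, the sketch below hashes a set of keys against two partition counts and reports how many keys land somewhere new. It is an illustration under one loud assumption: Kafka's default partitioner uses murmur2, and `zlib.crc32` is used here only as a dependency-free stand-in, so the exact partition numbers will differ from a real cluster. The key names are made up.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    """Map a key to a partition by hashing it modulo the partition count.

    Stand-in hash: Kafka's default partitioner uses murmur2, not crc32.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def moved_keys(keys, old_count: int, new_count: int):
    """Return the keys whose partition changes when the count changes."""
    return [k for k in keys
            if partition_for(k, old_count) != partition_for(k, new_count)]


keys = [f"user-{i}" for i in range(1000)]   # hypothetical keys
moved = moved_keys(keys, 12, 24)
print(f"{len(moved)} of {len(keys)} keys map to a different partition")
```

Every moved key now has older records in one partition and newer records in another, so a consumer can no longer rely on seeing that key's events in order.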

Creating Topics with the Kafka CLI

export BOOTSTRAP="b-1.prod-events.xyz.kafka.us-east-1.amazonaws.com:9096,b-2.prod-events.xyz.kafka.us-east-1.amazonaws.com:9096"

# Create a topic: 12 partitions, replication factor 3, 7-day retention
kafka-topics.sh \
  --bootstrap-server "$BOOTSTRAP" \
  --command-config client.properties \
  --create \
  --topic user-events \
  --partitions 12 \
  --replication-factor 3 \
  --config retention.ms=604800000 \
  --config min.insync.replicas=2 \
  --config compression.type=lz4 \
  --config cleanup.policy=delete

# List topics
kafka-topics.sh --bootstrap-server "$BOOTSTRAP" \
  --command-config client.properties --list

# Describe a topic (partition leaders, ISR)
kafka-topics.sh --bootstrap-server "$BOOTSTRAP" \
  --command-config client.properties \
  --describe --topic user-events

Partitioning Strategy: How Many Partitions?

Partitions are the unit of parallelism in Kafka. A consumer group can have at most one active consumer per partition — so if you have 6 partitions and 10 consumer instances, 4 instances sit idle. The formula for choosing partition count is:

Partitions = max(
    target_throughput_MB_per_sec / throughput_per_partition_MB_per_sec,
    max_consumer_parallelism_you_will_ever_need
)

-- Rule of thumb for MSK kafka.m5.xlarge brokers:
--   Write throughput per partition: ~10–15 MB/s
--   Consumer throughput per partition: ~50 MB/s (if not doing heavy processing)

-- Example: 120 MB/s ingest, max 24 consumers needed
--   120 / 12 = 10 write partitions minimum
--   max parallelism = 24
--   Choose: 24 partitions
Partition count is permanent (practically): You can increase partitions but not decrease them. Increasing partitions breaks key ordering for existing keys — if order matters for your consumers, size generously upfront (use a power of 2 or a multiple of your target consumer count). For event streaming where order is per-user or per-device, 12–48 partitions is typical for medium workloads.
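
The sizing formula above can be sketched as a small helper. The function name is an assumption, and the per-partition throughput is an input you should measure on your own cluster rather than a constant.

```python
import math


def required_partitions(target_mb_s: float,
                        per_partition_mb_s: float,
                        max_consumers: int) -> int:
    """Partitions = max(throughput-driven count, consumer parallelism)."""
    by_throughput = math.ceil(target_mb_s / per_partition_mb_s)
    return max(by_throughput, max_consumers)


# Worked example from the text: 120 MB/s ingest, ~12 MB/s per partition,
# at most 24 consumers in the group.
print(required_partitions(120, 12, 24))   # 24
```

Because the count cannot be reduced later, it is reasonable to pass in the largest consumer parallelism you expect over the topic's lifetime, not today's.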

Replication Factor and min.insync.replicas

Always set replication.factor=3 for production topics on a 3-broker MSK cluster. Set min.insync.replicas=2 so that writes are only acknowledged when at least 2 of the 3 replicas have written the message. Combined with producer setting acks=all, this ensures no data loss even if one broker fails at the moment of writing. Setting min.insync.replicas=3 means a single broker failure blocks all writes — avoid this unless your workload demands it.
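
The interaction between replication factor, min.insync.replicas, and broker failures can be checked with a few lines of arithmetic. This is a simplified model, assuming every surviving replica is in sync and producers use acks=all; the function name is illustrative.

```python
def writes_accepted(replication_factor: int,
                    min_insync_replicas: int,
                    failed_brokers: int) -> bool:
    """True if an acks=all produce request can still succeed.

    Simplification: every replica on a surviving broker counts as in sync.
    """
    in_sync = max(0, replication_factor - failed_brokers)
    return in_sync >= min_insync_replicas


for min_isr in (2, 3):
    for failed in (0, 1, 2):
        verdict = "accepted" if writes_accepted(3, min_isr, failed) else "rejected"
        print(f"RF=3 min.insync.replicas={min_isr} failed={failed}: {verdict}")
```

With min.insync.replicas=2 the topic tolerates one broker failure; with min.insync.replicas=3 the first failure already blocks writes.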

Producer and Consumer Code: Java Spring Kafka + Python

Because MSK is 100% Kafka-protocol compatible, standard Kafka client libraries work unchanged. Below are complete, production-ready examples using Spring Kafka (Java) and kafka-python, including TLS and SASL/SCRAM configuration for connecting to an MSK cluster.

Spring Kafka Producer (Java)

<!-- pom.xml dependencies -->
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-databind</artifactId>
</dependency>

# application.yml: MSK with SASL/SCRAM over TLS
spring:
  kafka:
    bootstrap-servers: "${MSK_BOOTSTRAP_BROKERS}"
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all
      retries: 3
      properties:
        enable.idempotence: true
        max.in.flight.requests.per.connection: 5
        linger.ms: 5
        batch.size: 65536
        compression.type: lz4
    consumer:
      group-id: order-processing-service
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "com.techoral.events"
        max.poll.records: 500
        fetch.min.bytes: 50000
        fetch.max.wait.ms: 500
    security:
      protocol: SASL_SSL
    properties:
      sasl.mechanism: SCRAM-SHA-512
      sasl.jaas.config: >
        org.apache.kafka.common.security.scram.ScramLoginModule required
        username="${MSK_USERNAME}"
        password="${MSK_PASSWORD}";
      ssl.truststore.location: /etc/kafka/truststore.jks
      ssl.truststore.password: "${TRUSTSTORE_PASSWORD}"

// OrderEvent.java: event record
package com.techoral.events;

import java.time.Instant;

public record OrderEvent(
    String orderId,
    String userId,
    String status,
    double totalAmount,
    String currency,
    Instant timestamp
) {}

// OrderProducerService.java
package com.techoral.kafka;

import com.techoral.events.OrderEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

import java.util.concurrent.CompletableFuture;

@Slf4j
@Service
@RequiredArgsConstructor
public class OrderProducerService {

    private static final String TOPIC = "order-events";
    private final KafkaTemplate<String, OrderEvent> kafkaTemplate;

    public void publishOrder(OrderEvent event) {
        // Partition key = userId ensures all events for one user go to same partition
        CompletableFuture<SendResult<String, OrderEvent>> future =
            kafkaTemplate.send(TOPIC, event.userId(), event);

        future.whenComplete((result, ex) -> {
            if (ex != null) {
                log.error("Failed to publish order {} to MSK: {}", event.orderId(), ex.getMessage());
                // publish to DLQ or retry queue here
            } else {
                log.debug("Published order {} to partition {} offset {}",
                    event.orderId(),
                    result.getRecordMetadata().partition(),
                    result.getRecordMetadata().offset());
            }
        });
    }

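    // Transactional producer for exactly-once semantics.
    // Requires spring.kafka.producer.transaction-id-prefix to be configured;
    // without it the producer factory is not transactional and this call fails.
    public void publishOrderTransactional(OrderEvent event) {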
        kafkaTemplate.executeInTransaction(ops -> {
            ops.send(TOPIC, event.userId(), event);
            return true;
        });
    }
}

// OrderConsumerService.java
package com.techoral.kafka;

import com.techoral.events.OrderEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

import java.util.List;

@Slf4j
@Service
public class OrderConsumerService {

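    // Batch listener: processes up to 500 records per poll (max.poll.records).
    // Assumes a batchKafkaListenerContainerFactory bean with batch listening
    // enabled and AckMode.MANUAL, which the Acknowledgment parameter requires.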
    @KafkaListener(
        topics = "order-events",
        containerFactory = "batchKafkaListenerContainerFactory"
    )
    public void consumeOrders(
        @Payload List<OrderEvent> events,
        @Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
        @Header(KafkaHeaders.OFFSET) List<Long> offsets,
        Acknowledgment ack
    ) {
        log.info("Processing batch of {} order events", events.size());
        try {
            for (OrderEvent event : events) {
                processOrder(event);
            }
            // Manual commit after successful batch processing
            ack.acknowledge();
        } catch (Exception e) {
            log.error("Batch processing failed, not committing offsets", e);
            // nack(0, ...) rejects the batch from its first record, so the whole
            // batch is redelivered after a 5-second pause
            ack.nack(0, java.time.Duration.ofSeconds(5));
        }
    }

    private void processOrder(OrderEvent event) {
        // business logic here
        log.debug("Processing order: {} status: {}", event.orderId(), event.status());
    }
}

Python Producer and Consumer (kafka-python)

"""
msk_producer.py — Python producer for AWS MSK with SASL/SCRAM + TLS
pip install kafka-python lz4   (lz4 is required for compression_type='lz4')
"""
import json
import ssl
import time
import uuid
from datetime import datetime
from kafka import KafkaProducer
from kafka.errors import KafkaError

def create_producer(bootstrap_servers: str, username: str, password: str) -> KafkaProducer:
    """Create a KafkaProducer configured for MSK SASL/SCRAM over TLS."""
    # The default context verifies both the certificate chain and the broker
    # hostname; keep hostname checking on so brokers cannot be impersonated.
    ssl_ctx = ssl.create_default_context()

    return KafkaProducer(
        bootstrap_servers=bootstrap_servers.split(','),
        security_protocol='SASL_SSL',
        sasl_mechanism='SCRAM-SHA-512',
        sasl_plain_username=username,
        sasl_plain_password=password,
        ssl_context=ssl_ctx,
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
        key_serializer=lambda k: k.encode('utf-8') if k else None,
        acks='all',
        retries=3,
        max_in_flight_requests_per_connection=5,
        enable_idempotence=True,
        compression_type='lz4',
        batch_size=65536,
        linger_ms=5,
    )

def on_send_success(record_metadata):
    print(f"Sent to {record_metadata.topic}[{record_metadata.partition}] "
          f"@ offset {record_metadata.offset}")

def on_send_error(excp):
    print(f"Producer error: {excp}")

def publish_user_events(producer: KafkaProducer, user_id: str, n: int = 100):
    """Publish n synthetic user events, partitioned by user_id."""
    for i in range(n):
        event = {
            'event_id':   str(uuid.uuid4()),
            'user_id':    user_id,
            'event_type': 'page_view' if i % 3 != 0 else 'click',
            'page':       f'/products/{i % 20}',
            # datetime.utcnow() is deprecated since Python 3.12; format UTC time directly
            'timestamp':  time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()),
            'session_id': str(uuid.uuid4()),
        }
        producer.send(
            topic='user-events',
            key=user_id,        # same key → same partition → ordered per user
            value=event,
        ).add_callback(on_send_success).add_errback(on_send_error)

        if (i + 1) % 50 == 0:
            producer.flush()    # flush after every 50 records

    producer.flush()
    print(f"Published {n} events for user {user_id}")


"""
msk_consumer.py — Python consumer with manual offset commit and lag tracking
"""
import json
import ssl

from kafka import KafkaConsumer, TopicPartition
from kafka.structs import OffsetAndMetadata

def create_consumer(bootstrap_servers, username, password, group_id):
    # Default context: certificate chain and hostname are both verified
    ssl_ctx = ssl.create_default_context()

    return KafkaConsumer(
        'user-events',
        bootstrap_servers=bootstrap_servers.split(','),
        security_protocol='SASL_SSL',
        sasl_mechanism='SCRAM-SHA-512',
        sasl_plain_username=username,
        sasl_plain_password=password,
        ssl_context=ssl_ctx,
        group_id=group_id,
        auto_offset_reset='earliest',
        enable_auto_commit=False,       # commit manually after processing (at-least-once)
        value_deserializer=lambda m: json.loads(m.decode('utf-8')),
        key_deserializer=lambda m: m.decode('utf-8') if m else None,
        max_poll_records=500,
        fetch_min_bytes=50000,
        fetch_max_wait_ms=500,
        session_timeout_ms=30000,
        heartbeat_interval_ms=10000,
    )

def consume_events(consumer: KafkaConsumer):
    """Poll loop with manual offset commit and dead-letter handling."""
    dlq: list = []
    batch_count = 0

    try:
        for message in consumer:
            event = message.value
            try:
                process_event(event)
                # Commit after successful processing. commit() with no arguments
                # commits the consumer's current position, i.e. the offset just
                # past the record that was yielded.
                consumer.commit()
                batch_count += 1
                if batch_count % 1000 == 0:
                    print(f"Processed {batch_count} events, last committed: "
                          f"partition={message.partition} offset={message.offset + 1}")
            except Exception as e:
                print(f"Failed to process event {event.get('event_id')}: {e}")
                dlq.append({'event': event, 'error': str(e)})
    except KeyboardInterrupt:
        print(f"Shutting down. DLQ size: {len(dlq)}")
    finally:
        consumer.close()

def process_event(event: dict):
    # Example: write to DynamoDB, update real-time dashboard, etc.
    pass

Idempotent producers: Set enable.idempotence=true and acks=all together. The broker then deduplicates retried records using per-producer sequence numbers, so retries cannot introduce duplicates or reorder records within a partition. For end-to-end exactly-once (producer → consumer → database), use Kafka transactions with isolation.level=read_committed on consumers. Kafka transactions cover only Kafka reads and writes, so the database write still needs to be idempotent.
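
The deduplication idea can be illustrated with a toy model of one partition. This is deliberately not Kafka's implementation: real brokers track sequence numbers per batch and also reject gaps, whereas the hypothetical `ToyPartitionLog` below only remembers the highest sequence seen per producer id.

```python
class ToyPartitionLog:
    """Toy model of broker-side deduplication for a single partition."""

    def __init__(self):
        self.records = []
        self.last_seq = {}   # producer_id -> highest sequence appended

    def append(self, producer_id: int, seq: int, value: str) -> bool:
        """Append value unless this (producer_id, seq) was already written."""
        if seq <= self.last_seq.get(producer_id, -1):
            return False     # retry of an already-appended record: drop it
        self.records.append(value)
        self.last_seq[producer_id] = seq
        return True


log = ToyPartitionLog()
log.append(7, 0, "order-1")
log.append(7, 1, "order-2")
log.append(7, 1, "order-2")   # producer retries after a lost acknowledgement
print(log.records)            # ['order-1', 'order-2']
```

The retry is acknowledged to the producer but not written twice, which is the guarantee the idempotent producer setting provides.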

Consumer Groups, Offset Management, and Lag Monitoring

Consumer groups are the scalability unit on the consumer side. Every consumer in a group is assigned a disjoint set of partitions — each partition is consumed by exactly one group member at a time. Multiple independent groups (e.g., analytics service, audit logger, recommendation engine) can each consume the same topic independently at their own pace.
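
A short sketch makes the "disjoint set of partitions" rule concrete. It spreads partitions over group members in round-robin order, which is a simplification and not Kafka's actual assignor logic; the consumer names are placeholders.

```python
def assign_round_robin(partitions: int, consumers: list) -> dict:
    """Give each partition to exactly one consumer (sketch, not Kafka's assignor)."""
    assignment = {c: [] for c in consumers}
    for p in range(partitions):
        assignment[consumers[p % len(consumers)]].append(p)
    return assignment


group = [f"consumer-{i}" for i in range(10)]
assignment = assign_round_robin(6, group)
idle = [c for c, parts in assignment.items() if not parts]
print(f"{len(idle)} idle consumers: {idle}")
```

With 6 partitions and 10 group members, 4 members receive nothing, which matches the earlier sizing discussion: partition count caps useful consumer parallelism.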

Offset Management

# List all consumer groups
kafka-consumer-groups.sh \
  --bootstrap-server "$BOOTSTRAP" \
  --command-config client.properties \
  --list

# Describe a group — shows current offset, end offset, and lag per partition
kafka-consumer-groups.sh \
  --bootstrap-server "$BOOTSTRAP" \
  --command-config client.properties \
  --describe --group order-processing-service

# Example output:
# GROUP                    TOPIC        PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
# order-processing-service order-events 0          100241          100241          0
# order-processing-service order-events 1          99873           100100          227
# order-processing-service order-events 2          100102          100102          0

# Reset offsets to beginning (useful for reprocessing after a bug fix).
# The group must have no active members, so stop its consumers first.
kafka-consumer-groups.sh \
  --bootstrap-server "$BOOTSTRAP" \
  --command-config client.properties \
  --group order-processing-service \
  --topic order-events \
  --reset-offsets --to-earliest \
  --execute

# Reset to a specific timestamp (replay last 2 hours)
kafka-consumer-groups.sh \
  --bootstrap-server "$BOOTSTRAP" \
  --command-config client.properties \
  --group order-processing-service \
  --topic order-events \
  --reset-offsets --to-datetime 2026-06-08T06:00:00.000 \
  --execute

Lag Monitoring via Python Script

"""
lag_monitor.py — Emit consumer group lag to CloudWatch custom metrics.
Run on a cron or as a Lambda function every minute.
"""
import boto3
from kafka import KafkaAdminClient, KafkaConsumer

cloudwatch = boto3.client('cloudwatch', region_name='us-east-1')

def get_consumer_lag(bootstrap_servers, username, password, group_id, topic):
    """Return the summed lag of group_id across all partitions of topic."""
    conn = dict(
        bootstrap_servers=bootstrap_servers.split(','),
        security_protocol='SASL_SSL',
        sasl_mechanism='SCRAM-SHA-512',
        sasl_plain_username=username,
        sasl_plain_password=password,
    )

    # Committed offsets for the group, restricted to the topic of interest
    admin = KafkaAdminClient(**conn)
    try:
        committed = {tp: om.offset
                     for tp, om in admin.list_consumer_group_offsets(group_id).items()
                     if tp.topic == topic}
    finally:
        admin.close()

    # Log-end offsets for the same partitions
    consumer = KafkaConsumer(**conn)
    try:
        end_offsets = consumer.end_offsets(list(committed))
    finally:
        consumer.close()

    # Lag per partition = log-end offset minus committed offset, floored at 0
    return sum(max(0, end_offsets[tp] - offset)
               for tp, offset in committed.items())

def publish_lag_metric(group_id, topic, lag):
    cloudwatch.put_metric_data(
        Namespace='MSK/ConsumerLag',
        MetricData=[{
            'MetricName': 'TotalLag',
            'Dimensions': [
                {'Name': 'ConsumerGroup', 'Value': group_id},
                {'Name': 'Topic',         'Value': topic},
            ],
            'Value': lag,
            'Unit': 'Count',
        }]
    )
    print(f"Published lag={lag} for group={group_id} topic={topic}")

Lag alert threshold: Set a CloudWatch alarm on your custom TotalLag metric. A good starting threshold is 10× the number of records per second your consumer processes, which corresponds to roughly 10 seconds of backlog at the consumer's normal processing rate. For a consumer processing 1,000 records/sec, alarm at lag > 10,000.
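
The threshold rule can be turned into alarm parameters as sketched below. The function only builds a dictionary of arguments for CloudWatch's `put_metric_alarm`; the alarm name pattern, the `Maximum` statistic, and the three one-minute evaluation periods are assumptions to adjust, while the namespace, metric name, and dimensions mirror the lag script above.

```python
def lag_alarm_kwargs(group_id: str, topic: str,
                     records_per_sec: float,
                     buffer_seconds: float = 10.0) -> dict:
    """Build put_metric_alarm arguments for the custom TotalLag metric."""
    return {
        "AlarmName": f"msk-lag-{group_id}-{topic}",   # hypothetical naming scheme
        "Namespace": "MSK/ConsumerLag",
        "MetricName": "TotalLag",
        "Dimensions": [
            {"Name": "ConsumerGroup", "Value": group_id},
            {"Name": "Topic", "Value": topic},
        ],
        "Statistic": "Maximum",
        "Period": 60,                  # the lag script publishes once a minute
        "EvaluationPeriods": 3,        # assumption: alarm after 3 bad minutes
        "Threshold": records_per_sec * buffer_seconds,
        "ComparisonOperator": "GreaterThanThreshold",
    }


kwargs = lag_alarm_kwargs("order-processing-service", "order-events", 1000)
print(kwargs["Threshold"])   # 10000.0
```

To create the alarm, pass the result to `boto3.client("cloudwatch").put_metric_alarm(**kwargs)`, adding `AlarmActions` if you want notifications.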

MSK Connect: Managed Kafka Connect for S3 and RDS Sinks

Kafka Connect is the integration layer of the Kafka ecosystem — a framework for streaming data between Kafka and external systems without writing producer/consumer code. MSK Connect is a fully managed service that runs Kafka Connect workers as a scalable, serverless fleet. AWS handles worker provisioning, failure recovery, scaling, and Kafka connectivity. You supply connector plugins and configuration.

Deploying an S3 Sink Connector

# 1 — Upload the Confluent S3 Sink connector plugin to S3
aws s3 cp confluentinc-kafka-connect-s3-10.5.4.zip \
  s3://my-msk-plugins/connectors/

# 2 — Create the custom plugin in MSK Connect
aws kafkaconnect create-custom-plugin \
  --name "s3-sink-v10" \
  --content-type ZIP \
  --location '{
    "s3Location": {
      "bucketArn": "arn:aws:s3:::my-msk-plugins",
      "fileKey": "connectors/confluentinc-kafka-connect-s3-10.5.4.zip"
    }
  }'

# 3 — Create the connector (wait for plugin ARN from step 2)
aws kafkaconnect create-connector \
  --connector-name "order-events-to-s3" \
  --kafka-cluster '{
    "apacheKafkaCluster": {
      "bootstrapServers": "b-1.prod-events.xyz:9098,b-2.prod-events.xyz:9098",
      "vpc": {
        "subnets": ["subnet-001", "subnet-002", "subnet-003"],
        "securityGroups": ["sg-0123456789abcdef0"]
      }
    }
  }' \
  --kafka-connect-version "2.7.1" \
  --kafka-cluster-client-authentication '{"authenticationType": "IAM"}' \
  --kafka-cluster-encryption-in-transit '{"encryptionType": "TLS"}' \
  --plugins '[{"customPlugin": {"customPluginArn": "arn:aws:kafkaconnect:...", "revision": 1}}]' \
  --connector-configuration '{
    "connector.class":                      "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max":                            "6",
    "topics":                               "order-events",
    "s3.region":                            "us-east-1",
    "s3.bucket.name":                       "my-event-lake",
    "s3.part.size":                         "67108864",
    "flush.size":                           "10000",
    "rotate.interval.ms":                   "300000",
    "storage.class":                        "io.confluent.connect.s3.storage.S3Storage",
    "format.class":                         "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "parquet.codec":                        "snappy",
    "schema.compatibility":                 "FULL",
    "locale":                               "en_US",
    "timezone":                             "UTC",
    "timestamp.extractor":                  "RecordField",
    "timestamp.field":                      "timestamp",
    "partitioner.class":                    "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "partition.duration.ms":                "3600000",
    "path.format":                          "'"'"'year'"'"'=YYYY/'"'"'month'"'"'=MM/'"'"'day'"'"'=dd/'"'"'hour'"'"'=HH"
  }' \
  --capacity '{
    "autoScaling": {
      "minWorkerCount": 1,
      "maxWorkerCount": 6,
      "scaleInPolicy": {"cpuUtilizationPercentage": 20},
      "scaleOutPolicy": {"cpuUtilizationPercentage": 80}
    }
  }' \
  --service-execution-role-arn "arn:aws:iam::123456789012:role/MSKConnectRole"
Output format choice: Use Parquet + Snappy compression for S3 sinks that will be queried by Athena or Redshift Spectrum — you get columnar storage, predicate pushdown, and 3–5× better query performance compared to JSON. Use JSON only if downstream consumers need human-readable files or can't handle Parquet.
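
The path.format value in the connector configuration is hard to read through the shell quoting. Once unescaped it is 'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH, which produces Hive-style partition directories that Athena can prune on. The sketch below shows the prefix a record with a given timestamp would land under. It assumes the connector's default top-level directory `topics`, and the helper name is illustrative.

```python
from datetime import datetime, timezone


def hourly_prefix(topic: str, ts: datetime, topics_dir: str = "topics") -> str:
    """Return the S3 key prefix for a record timestamp, partitioned by hour (UTC)."""
    ts = ts.astimezone(timezone.utc)
    return (f"{topics_dir}/{topic}/"
            f"year={ts:%Y}/month={ts:%m}/day={ts:%d}/hour={ts:%H}")


event_time = datetime(2026, 6, 8, 6, 30, tzinfo=timezone.utc)
print(hourly_prefix("order-events", event_time))
# topics/order-events/year=2026/month=06/day=08/hour=06
```

A query filtered on year, month, day, and hour then reads only the matching prefixes instead of scanning the whole bucket.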

Debezium CDC Connector for RDS → Kafka

{
  "name": "rds-postgres-cdc",
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "tasks.max": "1",
  "database.hostname": "prod-db.cluster-xyz.us-east-1.rds.amazonaws.com",
  "database.port": "5432",
  "database.user": "${file:/opt/kafka/secrets/db-creds.properties:db.user}",
  "database.password": "${file:/opt/kafka/secrets/db-creds.properties:db.password}",
  "database.dbname": "orders",
  "database.server.name": "prod-rds",
  "table.include.list": "public.orders,public.order_items",
  "plugin.name": "pgoutput",
  "publication.autocreate.mode": "filtered",
  "slot.name": "debezium_orders",
  "heartbeat.interval.ms": "5000",
  "snapshot.mode": "initial",
  "topic.prefix": "cdc",
  "transforms": "unwrap",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.drop.tombstones": "false",
  "transforms.unwrap.delete.handling.mode": "rewrite"
}
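
Debezium's PostgreSQL connector writes each captured table to its own topic named prefix.schema.table. The helper below derives the topic names from the configuration above so that you can pre-create them (the cluster configuration earlier disables auto-creation). It is a sketch that reads only the two keys shown; the function name is an assumption.

```python
def cdc_topics(config: dict) -> list:
    """List the change-event topics implied by topic.prefix and table.include.list."""
    prefix = config["topic.prefix"]
    tables = [t.strip() for t in config["table.include.list"].split(",")]
    return [f"{prefix}.{table}" for table in tables if table]


connector_config = {
    "topic.prefix": "cdc",
    "table.include.list": "public.orders,public.order_items",
}
print(cdc_topics(connector_config))
# ['cdc.public.orders', 'cdc.public.order_items']
```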

MSK Serverless: Auto-Scaling and When to Use It

MSK Serverless is a cluster mode where you provision no brokers, choose no instance type, and manage no storage. Capacity scales automatically based on actual throughput, and you pay per GB ingested and per GB stored. It was designed for workloads with unpredictable or bursty traffic patterns.

Creating an MSK Serverless Cluster

# MSK Serverless — no broker count or instance type needed
aws kafka create-cluster-v2 \
  --cluster-name "serverless-events" \
  --serverless '{
    "VpcConfigs": [
      {
        "SubnetIds": ["subnet-001", "subnet-002", "subnet-003"],
        "SecurityGroupIds": ["sg-0123456789abcdef0"]
      }
    ],
    "ClientAuthentication": {
      "Sasl": {"Iam": {"Enabled": true}}
    }
  }'

# Terraform: MSK Serverless
resource "aws_msk_serverless_cluster" "events" {
  cluster_name = "serverless-events"

  vpc_config {
    subnet_ids         = var.subnet_ids
    security_group_ids = var.security_group_ids
  }

  client_authentication {
    sasl {
      iam { enabled = true }
    }
  }

  tags = { Environment = "staging" }
}

MSK Serverless Limits and Pricing (2026)

| Dimension | MSK Serverless | MSK Standard (m5.xlarge, 3 brokers) |
|---|---|---|
| Max throughput (write) | 200 MB/s per cluster | ~300 MB/s (3 × 100 MB/s per broker) |
| Max partitions | 120 per cluster | No hard limit (thousands) |
| Retention | Up to 24 hours | Configurable (days to forever with tiered storage) |
| IAM auth only | Yes: SASL/SCRAM not supported | No: supports TLS, SASL/SCRAM, IAM |
| Pricing | ~$0.24/GB ingested + $0.024/GB stored | ~$0.51/hr for 3 × m5.xlarge + EBS |
| Break-even point | Below ~60 GB/hr ingested | Above ~60 GB/hr ingested |

When to choose Serverless: Use MSK Serverless for development and staging environments (no idle broker cost), for event-driven systems with highly variable traffic (batch pipelines that run nightly), or when you need to launch quickly without capacity planning. Move to Standard clusters when you exceed 120 partitions, need SASL/SCRAM auth, require retention beyond 24 hours, or reach the cost break-even point.
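
A break-even estimate is simple arithmetic once the prices are fixed, so it is worth computing it for your own numbers. The sketch below takes the per-GB ingest price and the provisioned hourly cost as parameters, with the table's figures as defaults. It ignores storage, EBS, and data transfer, which is an assumption that shifts the result considerably.

```python
def serverless_hourly_cost(gb_ingested_per_hr: float,
                           price_per_gb_in: float = 0.24) -> float:
    """Ingest-only hourly cost of a Serverless cluster."""
    return gb_ingested_per_hr * price_per_gb_in


def break_even_gb_per_hr(provisioned_hourly: float = 0.51,
                         price_per_gb_in: float = 0.24) -> float:
    """Ingest rate at which Serverless ingest cost equals the provisioned cost."""
    return provisioned_hourly / price_per_gb_in


print(round(break_even_gb_per_hr(), 3))   # 2.125
```

With only the two figures quoted in the table, the ingest-only crossover lands near 2 GB/hr, so the ~60 GB/hr row cannot be reproduced from those figures alone. Substitute current pricing for your region, plus storage and transfer, before relying on either number.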

Security: TLS, SASL/SCRAM, IAM Auth, and VPC Config

MSK supports three authentication mechanisms, and you can enable more than one at a time. Mutual TLS (mTLS) uses X.509 client certificates issued by AWS Private CA; SASL/SCRAM uses usernames and passwords stored in AWS Secrets Manager; IAM auth uses AWS IAM policies and is the preferred option for applications running on EC2, ECS, Lambda, or EKS.

IAM Authentication (Recommended for AWS Workloads)

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kafka-cluster:Connect",
        "kafka-cluster:AlterCluster",
        "kafka-cluster:DescribeCluster"
      ],
      "Resource": "arn:aws:kafka:us-east-1:123456789012:cluster/prod-events/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "kafka-cluster:*Topic*",
        "kafka-cluster:WriteData",
        "kafka-cluster:ReadData"
      ],
      "Resource": "arn:aws:kafka:us-east-1:123456789012:topic/prod-events/*/order-events"
    },
    {
      "Effect": "Allow",
      "Action": [
        "kafka-cluster:AlterGroup",
        "kafka-cluster:DescribeGroup"
      ],
      "Resource": "arn:aws:kafka:us-east-1:123456789012:group/prod-events/*/order-processing-service"
    }
  ]
}
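Policies like this are usually written once per service, so it can help to generate them. The following is a minimal sketch, not an AWS-provided API: `build_msk_client_policy` and its parameters are hypothetical names, and the action set assumes a service that only produces to and consumes from a single topic (it is deliberately narrower than the example above, which also grants AlterCluster and the `*Topic*` wildcard).

```python
import json


def build_msk_client_policy(region, account_id, cluster, topic, group):
    """Return an IAM policy dict scoped to one cluster, one topic, and one consumer group."""
    base = f"arn:aws:kafka:{region}:{account_id}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Cluster-level access; "*" stands in for the cluster UUID
                "Effect": "Allow",
                "Action": ["kafka-cluster:Connect", "kafka-cluster:DescribeCluster"],
                "Resource": f"{base}:cluster/{cluster}/*",
            },
            {
                # Produce to and consume from a single topic
                "Effect": "Allow",
                "Action": [
                    "kafka-cluster:DescribeTopic",
                    "kafka-cluster:WriteData",
                    "kafka-cluster:ReadData",
                ],
                "Resource": f"{base}:topic/{cluster}/*/{topic}",
            },
            {
                # Join and inspect a single consumer group
                "Effect": "Allow",
                "Action": ["kafka-cluster:AlterGroup", "kafka-cluster:DescribeGroup"],
                "Resource": f"{base}:group/{cluster}/*/{group}",
            },
        ],
    }


policy = build_msk_client_policy(
    "us-east-1", "123456789012", "prod-events",
    "order-events", "order-processing-service",
)
print(json.dumps(policy, indent=2))
```

Generating the document from the cluster, topic, and group names keeps the three ARNs consistent with one another, which is where hand-edited policies most often drift.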

// Java: IAM auth via MSK IAM library
// pom.xml: add software.amazon.msk:aws-msk-iam-auth:2.0.3

import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapBrokersSaslIam);
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "AWS_MSK_IAM");
props.put("sasl.jaas.config",
    "software.amazon.msk.auth.iam.IAMLoginModule required;");
props.put("sasl.client.callback.handler.class",
    "software.amazon.msk.auth.iam.IAMClientCallbackHandler");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

SASL/SCRAM Setup (Secrets Manager)

# 1 — Create a SCRAM secret in Secrets Manager
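# Secret name MUST begin with "AmazonMSK_", and the secret must be encrypted with a
# customer managed KMS key (MSK does not accept the default Secrets Manager key).
# "alias/msk-scram" is a placeholder for your own key alias.
aws secretsmanager create-secret \
  --name "AmazonMSK_prod-events_app-user" \
  --kms-key-id "alias/msk-scram" \
  --secret-string '{"username":"app-user","password":"S3cur3P@ssw0rd!"}'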

# 2 — Associate the secret with the MSK cluster
aws kafka batch-associate-scram-secret \
  --cluster-arn "arn:aws:kafka:us-east-1:123456789012:cluster/prod-events/..." \
  --secret-arn-list "arn:aws:secretsmanager:us-east-1:123456789012:secret:AmazonMSK_prod-events_app-user-..."

# 3 — Verify association
aws kafka list-scram-secrets \
  --cluster-arn "arn:aws:kafka:us-east-1:123456789012:cluster/prod-events/..."

VPC and Security Group Best Practices

MSK clusters must be deployed in a VPC. Brokers sit in private subnets with no public IP by default. Public access is a separate opt-in connectivity setting (public access type SERVICE_PROVIDED_EIPS) that also requires authentication and TLS encryption to be enabled, so reserve it for cases that genuinely need it. Security group rules should allow inbound traffic on broker ports only from client security groups, never from 0.0.0.0/0:

resource "aws_security_group_rule" "msk_from_app" {
  type                     = "ingress"
  from_port                = 9096   # SASL/SCRAM listener
  to_port                  = 9096
  protocol                 = "tcp"
  source_security_group_id = aws_security_group.app_service.id
  security_group_id        = aws_security_group.msk_broker.id
  # Security group descriptions only accept a restricted ASCII character set
  description              = "App service to MSK SASL/SCRAM"
}
# Port 9092 = plaintext (dev only), 9094 = TLS (mTLS), 9096 = SASL/SCRAM, 9098 = IAM
# (9096 and 9098 both use the SASL_SSL security protocol)
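Because the listener port depends on the authentication mode, a mismatched port is a common cause of connection timeouts. As an illustrative sketch (the `MSK_PORTS` mapping and `bootstrap_servers` helper are hypothetical names, and the hostnames in the usage line are placeholders), the port table above can be encoded so clients derive the bootstrap string from the auth mode:

```python
# In-VPC listener ports, as listed in the port comment above
MSK_PORTS = {"plaintext": 9092, "tls": 9094, "sasl_scram": 9096, "iam": 9098}


def bootstrap_servers(broker_hosts, auth):
    """Join broker hostnames with the port that matches the chosen auth mode."""
    if auth not in MSK_PORTS:
        raise ValueError(f"unknown auth mode {auth!r}; expected one of {sorted(MSK_PORTS)}")
    port = MSK_PORTS[auth]
    return ",".join(f"{host}:{port}" for host in broker_hosts)


print(bootstrap_servers(["b-1.example.internal", "b-2.example.internal"], "iam"))
# b-1.example.internal:9098,b-2.example.internal:9098
```

In practice `aws kafka get-bootstrap-brokers` returns ready-made strings per auth mode; a helper like this is mainly useful for validating configuration in tests.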

Monitoring: CloudWatch Metrics and Prometheus/Grafana

MSK emits over 150 CloudWatch metrics across four monitoring levels. DEFAULT includes cluster-level and broker-level metrics at no extra charge. PER_BROKER adds more detailed per-broker metrics, while PER_TOPIC_PER_BROKER and PER_TOPIC_PER_PARTITION are the most granular. The three higher levels are billed at CloudWatch rates, and the level is set for the whole cluster rather than per topic, so raise it only when you need the extra detail.

Key CloudWatch Metrics to Alert On

Metric | Namespace | Alert Threshold | Meaning
KafkaDataLogsDiskUsed | AWS/Kafka | > 80% | Data volume filling up; a full disk takes the broker offline
UnderReplicatedPartitions | AWS/Kafka | > 0 for 5+ min | Followers lagging behind leaders; data durability at risk
ActiveControllerCount | AWS/Kafka | != 1 | No active controller (0) or more than one
OfflinePartitionsCount | AWS/Kafka | > 0 | Partitions without a leader; reads and writes to them fail
BytesInPerSec | AWS/Kafka | Trending to broker limit | Capacity planning signal; add or upsize brokers
ProduceTotalTimeMsMean | AWS/Kafka | > 500 ms | Produce latency spike on the broker
EstimatedMaxTimeLag | AWS/Kafka | Workload-specific | Consumer group falling behind
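To make one row of the table concrete, here is a hedged sketch of the "UnderReplicatedPartitions > 0 for 5+ min" alert expressed as arguments for the CloudWatch `put_metric_alarm` call. The function name, alarm naming scheme, and SNS topic ARN are illustrative assumptions; the function only builds the argument dictionary, so it runs without AWS credentials.

```python
def under_replicated_alarm(cluster_name, broker_id, sns_topic_arn):
    """Build keyword arguments for CloudWatch put_metric_alarm (one alarm per broker)."""
    return {
        "AlarmName": f"{cluster_name}-broker-{broker_id}-under-replicated",
        "Namespace": "AWS/Kafka",
        "MetricName": "UnderReplicatedPartitions",
        "Dimensions": [
            {"Name": "Cluster Name", "Value": cluster_name},
            {"Name": "Broker ID", "Value": str(broker_id)},
        ],
        "Statistic": "Maximum",
        "Period": 60,            # one-minute datapoints
        "EvaluationPeriods": 5,  # five in a row, i.e. "> 0 for 5+ min"
        "Threshold": 0,
        "ComparisonOperator": "GreaterThanThreshold",
        "AlarmActions": [sns_topic_arn],
    }


alarm = under_replicated_alarm(
    "prod-events", 1, "arn:aws:sns:us-east-1:123456789012:kafka-alerts"
)
print(alarm["AlarmName"])  # prod-events-broker-1-under-replicated

# To create the alarm (requires boto3 and AWS credentials):
#   import boto3
#   boto3.client("cloudwatch").put_metric_alarm(**alarm)
```

Keeping the alarm definition as plain data makes it easy to loop over broker IDs and to unit-test thresholds before anything is created in the account.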

Prometheus + Grafana Integration

# prometheus.yml — scrape MSK broker JMX and Node Exporter metrics
# MSK exposes these when open_monitoring.prometheus is enabled
scrape_configs:
  - job_name: 'msk-jmx'
    static_configs:
      - targets:
          - 'b-1.prod-events.xyz.kafka.us-east-1.amazonaws.com:11001'
          - 'b-2.prod-events.xyz.kafka.us-east-1.amazonaws.com:11001'
          - 'b-3.prod-events.xyz.kafka.us-east-1.amazonaws.com:11001'
    scrape_interval: 60s

  - job_name: 'msk-node'
    static_configs:
      - targets:
          - 'b-1.prod-events.xyz.kafka.us-east-1.amazonaws.com:11002'
          - 'b-2.prod-events.xyz.kafka.us-east-1.amazonaws.com:11002'
          - 'b-3.prod-events.xyz.kafka.us-east-1.amazonaws.com:11002'
    scrape_interval: 60s
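
# Grafana: useful PromQL queries for MSK dashboards
# Metric names depend on the JMX exporter rule set in use; confirm the exact
# names against your brokers' /metrics output before building dashboards.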

# Total bytes in per second (cluster-wide)
sum(rate(kafka_server_brokertopicmetrics_bytesin_total[5m]))

# Under-replicated partitions (should be 0)
sum(kafka_server_replicamanager_underreplicatedpartitions)

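# Consumer lag per group + topic
# (kafka_consumergroup_lag is exposed by a separate Kafka exporter such as
# kafka_exporter, not by the broker JMX endpoint scraped above)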
sum by (consumergroup, topic) (
  kafka_consumergroup_lag
)

# Network processor idle ratio (should stay above 30%)
avg(kafka_network_processor_idle_ratio) by (listener)

# Request queue size (should stay near 0)
avg(kafka_network_requestchannel_request_queue_size)

Cost Optimization: Broker Sizing and Storage Tiering

MSK costs have three components: broker-hour pricing, EBS storage, and data transfer. For a 3-broker kafka.m5.xlarge cluster in us-east-1, broker cost alone is ~$0.51/hr = ~$370/month. Here is a systematic framework for keeping those costs under control.

Right-Sizing Brokers

Start by measuring actual CPU and network utilization over a 2-week period. kafka.m5.large (2 vCPU, 8 GB, ~100 MB/s network) is sufficient for <50 MB/s total ingest. Move to kafka.m5.xlarge when you hit 60% CPU or 70% network utilization. Use the broker upgrade path — MSK supports live rolling broker instance type changes with no downtime.

# Check CPU utilization over last 7 days (pick the right-size broker)
aws cloudwatch get-metric-statistics \
  --namespace AWS/Kafka \
  --metric-name CpuUser \
  --dimensions "Name=Cluster Name,Value=prod-events" "Name=Broker ID,Value=1" \
  --start-time 2026-06-01T00:00:00Z \
  --end-time   2026-06-08T00:00:00Z \
  --period 3600 \
  --statistics Average Maximum \
  --output table

# Update broker instance type (rolling — no downtime)
aws kafka update-broker-type \
  --cluster-arn "arn:aws:kafka:..." \
  --current-version "K3MAGZA5ZBCQPD" \
  --target-instance-type "kafka.m5.2xlarge"
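The sizing rule described above (step up at 60% CPU or 70% network utilization) can be sketched as a small check over the samples you pull from CloudWatch. This is an illustration under assumptions: `needs_larger_broker` is a hypothetical helper, and it compares the average over the window, whereas you may prefer a high percentile if your traffic is spiky.

```python
from statistics import mean

CPU_LIMIT_PCT = 60.0      # threshold quoted in the text
NETWORK_LIMIT_PCT = 70.0  # threshold quoted in the text


def needs_larger_broker(cpu_samples_pct, network_samples_pct):
    """Return True when average CPU or network utilization reaches its limit.

    Both arguments are non-empty sequences of utilization percentages (0-100)
    collected over the measurement window.
    """
    return (
        mean(cpu_samples_pct) >= CPU_LIMIT_PCT
        or mean(network_samples_pct) >= NETWORK_LIMIT_PCT
    )


print(needs_larger_broker([40, 50, 45], [30, 35, 32]))  # False
print(needs_larger_broker([65, 70, 62], [20, 25, 22]))  # True
```

Running the check per broker rather than on cluster-wide averages helps catch a single hot broker caused by uneven partition leadership.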

Tiered Storage

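MSK Tiered Storage transparently moves older log segments off broker EBS volumes into a lower-cost storage tier that MSK manages. EBS costs ~$0.10/GB-month, and the tiered rate is lower. Note that it is billed at MSK's own per-GB rate plus a small retrieval fee, not at the ~$0.023/GB-month S3 list price, so check the pricing page before assuming a 4× saving. Consumers read tiered data through the same Kafka API. Tiering is enabled per topic: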

# Tiered storage needs a supported Kafka version (3.6.0 or later, or 2.8.2.tiered),
# so upgrade the cluster first if it runs an older release
aws kafka update-cluster-kafka-version \
  --cluster-arn "arn:aws:kafka:..." \
  --current-version "K3MAGZA5ZBCQPD" \
  --target-kafka-version "3.6.0"

# Configure local retention (hot tier on EBS) vs total retention
kafka-configs.sh \
  --bootstrap-server "$BOOTSTRAP" \
  --command-config client.properties \
  --alter \
  --entity-type topics \
  --entity-name order-events \
  --add-config "remote.storage.enable=true,local.retention.ms=86400000,retention.ms=2592000000"
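To see what the two retention settings mean for broker disks, the sketch below converts them into steady-state data volumes. It is a rough estimate under stated assumptions: `retained_gb` is a hypothetical helper, the 10 GB/hr ingest rate is an example value, and replication factor, compression, and the still-open active segment are ignored.

```python
MS_PER_HOUR = 3_600_000


def retained_gb(ingest_gb_per_hour, retention_ms):
    """Logical data volume held for a given retention window at a steady ingest rate."""
    return ingest_gb_per_hour * retention_ms / MS_PER_HOUR


local_gb = retained_gb(10, 86_400_000)     # local.retention.ms = 1 day
total_gb = retained_gb(10, 2_592_000_000)  # retention.ms = 30 days

print(local_gb)  # 240.0  (stays on EBS)
print(total_gb)  # 7200.0 (what EBS would have to hold without tiering)
print(f"EBS footprint reduced by {1 - local_gb / total_gb:.1%}")
```

Multiply the local figure by the topic's replication factor to approximate the EBS capacity the cluster actually needs for that topic.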

Compression and Storage Best Practices

  • Set compression.type=lz4 at the producer level — LZ4 is fast and achieves 2–4× compression on JSON events, directly reducing both network transfer and disk cost.
  • Use compact cleanup policy for changelog topics (e.g., user profiles, product catalog) — Kafka retains only the latest value per key, dramatically reducing storage versus time-based retention.
  • Delete unused consumer groups promptly — stale offsets stored in __consumer_offsets accumulate over time.
  • Set the topic-level segment.bytes=536870912 (512 MB, half the 1 GB default) for topics with tiered storage. Only closed segments are eligible for tiering, so smaller segments leave EBS sooner.
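  • Verify committed-use discounts before budgeting on them. Unlike EC2, MSK brokers have not historically been covered by Reserved Instances or Savings Plans, so for long-running clusters the dependable levers are right-sizing and, where available, Graviton-based kafka.m7g brokers.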

Cost estimate: A typical production setup (3 × kafka.m5.xlarge brokers with 500 GB EBS each and tiered storage for 30-day retention) costs approximately $550–650/month. MSK Serverless for the same 30-day workload at 10 GB/hr ingest costs roughly $1,700/month (7,200 GB × ~$0.24/GB). At those rates the break-even sits near 2,500 GB/month, so Standard clusters win on cost above roughly 80 GB/day (about 3.5 GB/hr) of sustained ingest.
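The arithmetic behind this comparison is easy to rerun with your own numbers. The sketch below uses only the approximate rates quoted in this guide (about $0.24 per GB ingested for Serverless, and $600/month as the midpoint of the Standard estimate); the function names are hypothetical, a 30-day month is assumed, and Serverless storage charges are ignored.

```python
HOURS_PER_MONTH = 720  # 30-day month, matching the estimate above


def serverless_monthly_usd(ingest_gb_per_hour, usd_per_gb=0.24):
    """Approximate monthly Serverless cost from sustained ingest alone."""
    return ingest_gb_per_hour * HOURS_PER_MONTH * usd_per_gb


def break_even_gb_per_hour(standard_monthly_usd, usd_per_gb=0.24):
    """Sustained ingest rate at which Serverless costs the same as a Standard cluster."""
    return standard_monthly_usd / (usd_per_gb * HOURS_PER_MONTH)


print(round(serverless_monthly_usd(10)))           # 1728
print(round(break_even_gb_per_hour(600), 2))       # 3.47 GB/hr
print(round(break_even_gb_per_hour(600) * 24, 1))  # 83.3 GB/day
```

Substituting current prices for your Region and your real Standard cluster cost gives a break-even you can defend in a design review.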

Frequently Asked Questions

Can I use MSK with on-premises Kafka producers?

Yes. MSK is 100% Kafka protocol compatible. On-premises producers using standard Kafka clients connect to MSK broker endpoints over TLS. Ensure your on-premises network has connectivity to the MSK VPC (via AWS Direct Connect or VPN), configure the client with SASL/SCRAM or mTLS credentials, and update the bootstrap.servers property. No code changes are required. For large-scale on-premises → MSK migrations, use MirrorMaker 2 to replicate topics during the cutover period.

What is the difference between MSK and MSK Serverless?

MSK Standard gives you dedicated brokers with fixed capacity (instance type × broker count), predictable costs, configurable retention, and all authentication options (mTLS, SASL/SCRAM, IAM). MSK Serverless scales capacity automatically, requires no broker provisioning, supports only IAM authentication, and enforces per-cluster quotas on throughput and partition count (see the limits table above). Serverless suits dev/staging and variable workloads; Standard is the better fit for sustained high-throughput production traffic or when you need the other authentication options.

How does MSK handle broker failures?

When a broker fails, MSK automatically replaces it within 15–30 minutes. During replacement, partitions whose leader was on the failed broker elect a new leader from the in-sync replica (ISR) set — typically within seconds. If min.insync.replicas=2 is set and the ISR drops below that threshold, produce requests with acks=all will fail with NotEnoughReplicasException until the replacement broker catches up. Design producers with retry logic and exponential backoff to handle this brief window.
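As a minimal sketch of the retry advice above, the wrapper below retries a send callable with exponential backoff and jitter. It is an application-level illustration, not part of any Kafka client: `send_with_backoff` and its parameters are hypothetical, and real code should catch only the client's retriable errors (Kafka clients also retry internally, so this mainly covers failures that surface after those retries are exhausted).

```python
import random
import time


def send_with_backoff(send, max_attempts=5, base_delay=0.2, max_delay=10.0, sleep=time.sleep):
    """Call send() until it succeeds, backing off exponentially between attempts."""
    for attempt in range(1, max_attempts + 1):
        try:
            return send()
        except Exception:  # narrow this to retriable errors in real code
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            sleep(delay * random.uniform(0.5, 1.0))  # jitter avoids synchronized retries


# Usage with a stand-in for a producer call that fails twice, then succeeds
attempts = {"count": 0}


def flaky_send():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise RuntimeError("NotEnoughReplicas (simulated)")
    return "delivered"


print(send_with_backoff(flaky_send, sleep=lambda seconds: None))  # delivered
```

Passing `sleep` as a parameter keeps the function testable; in production the default `time.sleep` applies.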

Can I run ksqlDB or Schema Registry on MSK?

MSK does not include ksqlDB or Schema Registry — those are Confluent Platform components. However, you can run them yourself as containers on ECS or EKS, pointed at your MSK cluster. For Schema Registry specifically, AWS Glue Schema Registry is a fully managed alternative that integrates natively with MSK, Kinesis, and other AWS services and supports Avro, JSON Schema, and Protobuf.

How do I upgrade the Kafka version on a running MSK cluster?

Use the MSK console or CLI: aws kafka update-cluster-kafka-version --cluster-arn <arn> --current-version <cluster-version> --target-kafka-version X.Y.Z. MSK performs a rolling upgrade, one broker at a time, so clients stay connected as long as topics are replicated across brokers. Expect the upgrade to take roughly 20–45 minutes depending on cluster size and data volume. Not every target version is reachable from every source version, so run aws kafka get-compatible-kafka-versions --cluster-arn <arn> first to list valid targets, and note that MSK does not support downgrading. Always test on a staging cluster first and review the Kafka release notes for incompatible config changes.