AWS IoT Core: Connecting Millions of Devices to the Cloud (2026)

AWS IoT Core — Device Connectivity

AWS IoT Core is a fully managed cloud service that lets you connect billions of IoT devices — sensors, controllers, edge gateways, industrial machines — to the cloud securely and reliably, without managing infrastructure. Devices publish telemetry over MQTT, HTTP, or WebSocket; the Rules Engine routes that data to over 15 AWS services in real time; and Device Shadow keeps a persistent virtual representation of every device's state even when it's offline. In this guide, we walk through every major IoT Core component with production-grade code, CLI examples, and a complete smart-building reference architecture.

IoT Core Architecture Overview

AWS IoT Core is not a single service — it is a layered platform composed of five distinct subsystems that work together to handle device connectivity at planet scale.

Device Gateway is the connection broker. Devices open a persistent connection over MQTT (port 8883, TLS 1.2+) or MQTT over WebSocket (port 443), or send individual publish requests over HTTPS. The gateway handles connection state and heartbeats (MQTT keep-alive) and scales horizontally without any configuration from you. AWS designs the endpoint for very large fleets, but the number of simultaneous connections you can actually hold is governed by per-account service quotas, so check them before a large rollout.

Message Broker is the publish/subscribe backbone. Devices and cloud applications publish messages to hierarchical topic strings like building/floor3/room12/temperature. Subscribers — including other devices, Lambda functions, or IoT Rules — receive messages matching topic filter patterns. The broker supports wildcard subscriptions: + matches a single level, # matches all remaining levels.
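The wildcard rules can be made concrete with a small matcher. This is a minimal sketch of standard MQTT filter semantics, not IoT Core's implementation; the function name topic_matches is hypothetical, and the special handling of topics that begin with $ is deliberately left out.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic filter matches a concrete topic name."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")

    for i, level in enumerate(filter_levels):
        if level == "#":
            # '#' is only valid as the last level and matches all remaining levels.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            # The filter has more levels than the topic.
            return False
        if level != "+" and level != topic_levels[i]:
            # '+' matches exactly one level; anything else must match literally.
            return False

    # Without a trailing '#', both sides must have the same depth.
    return len(filter_levels) == len(topic_levels)
```

For example, building/+/room12/temperature matches building/floor3/room12/temperature, while building/+ does not match building/floor3/room12 because + covers only one level.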

Rules Engine evaluates an SQL-like query against every inbound message and triggers actions — writing to DynamoDB, invoking Lambda, publishing to Kinesis, sending an SNS alert, storing in S3. Rules are evaluated in parallel; a single message can match multiple rules simultaneously.

Device Shadow Service maintains a JSON document for each Thing representing its last-known state (reported) and the desired future state (desired). Applications update the desired state; the device reconciles the delta when it reconnects. This decouples cloud application logic from device availability.

Thing Registry is the device database. Each physical device is registered as a Thing with a name, type, attributes, and group memberships. The registry integrates with Fleet Indexing, which lets you search your entire device fleet with a simple field:value query syntax (for example, attributes.floor:3).

Key Limits to Know: IoT Core supports messages up to 128 KB. MQTT keep-alive is configurable from 30 to 1,200 seconds. The default throttle for Connect operations is 500 TPS per account per region; request a limit increase before large-scale fleet rollouts. At the time of writing, connectivity is billed at roughly $0.08 per million minutes of connection and messaging at $1 per million messages, metered in 5 KB increments (the free tier includes 250,000 messages per month). Prices vary by region, so confirm them on the AWS pricing page.
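To see how the 5 KB metering affects a budget, the arithmetic can be sketched as below. The function names are hypothetical, the price constant is the $1 per million figure quoted above, a 5 KB increment is assumed to mean 5 × 1024 bytes, and the free tier is ignored.

```python
import math

METER_BYTES = 5 * 1024        # assumed size of one metering increment
PRICE_PER_MILLION_USD = 1.00  # figure quoted above; verify for your region


def billable_messages(payload_bytes: int) -> int:
    """Number of metered messages a single publish of this size counts as."""
    return max(1, math.ceil(payload_bytes / METER_BYTES))


def monthly_messaging_cost(devices: int, messages_per_device_per_day: int,
                           payload_bytes: int, days: int = 30) -> float:
    """Rough monthly messaging cost in USD, before any free-tier allowance."""
    metered = (devices * messages_per_device_per_day * days
               * billable_messages(payload_bytes))
    return metered / 1_000_000 * PRICE_PER_MILLION_USD
```

With these assumptions, 1,000 sensors each publishing a 200-byte reading every 30 seconds (2,880 messages per day) produce 86.4 million metered messages in a 30-day month.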
# Create a Thing Type first (a Thing can only reference a type that already exists)
# A thing type accepts at most three searchable attributes
aws iot create-thing-type \
  --thing-type-name "TemperatureSensor" \
  --thing-type-properties '{
    "thingTypeDescription": "Indoor temperature and humidity sensors",
    "searchableAttributes": ["building", "floor", "firmware"]
  }'

# Create a Thing in the registry
aws iot create-thing \
  --thing-name "building-sensor-floor3-room12" \
  --thing-type-name "TemperatureSensor" \
  --attribute-payload '{"attributes":{"building":"HQ","floor":"3","room":"12","firmware":"2.1.4"}}'

# Add Thing to a group for policy management
aws iot create-thing-group \
  --thing-group-name "Building-HQ-Floor3"

aws iot add-thing-to-thing-group \
  --thing-group-name "Building-HQ-Floor3" \
  --thing-name "building-sensor-floor3-room12"

Device Provisioning and X.509 Certificates

Every device connecting to IoT Core must authenticate. The main options are X.509 client certificates (most common), custom authorizers (for token-based auth), Amazon Cognito identities (for mobile and web apps), and IAM credentials with SigV4 (typically for cloud-side or WebSocket clients). For embedded and industrial devices, X.509 certificates are the standard.

Each certificate is associated with one or more IoT Policies (JSON documents similar to IAM policies) that define which MQTT topics the device can publish to, subscribe to, and receive messages from. A certificate can be attached to multiple Things, and a Thing can have multiple certificates, which is useful for certificate rotation without downtime.

# Create a certificate and private key (AWS-managed CA)
aws iot create-keys-and-certificate \
  --set-as-active \
  --certificate-pem-outfile device-cert.pem \
  --public-key-outfile device-public.pem \
  --private-key-outfile device-private.pem

# Create an IoT Policy
aws iot create-policy \
  --policy-name "BuildingSensorPolicy" \
  --policy-document '{
    "Version": "2012-10-17",
    "Statement": [
      {
        "Effect": "Allow",
        "Action": "iot:Connect",
        "Resource": "arn:aws:iot:us-east-1:123456789:client/${iot:Connection.Thing.ThingName}"
      },
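      {
        "Effect": "Allow",
        "Action": "iot:Publish",
        "Resource": [
          "arn:aws:iot:us-east-1:123456789:topic/building/*/temperature",
          "arn:aws:iot:us-east-1:123456789:topic/$aws/things/${iot:Connection.Thing.ThingName}/shadow/*"
        ]
      },
      {
        "Effect": "Allow",
        "Action": "iot:Subscribe",
        "Resource": "arn:aws:iot:us-east-1:123456789:topicfilter/$aws/things/${iot:Connection.Thing.ThingName}/shadow/*"
      },
      {
        "Effect": "Allow",
        "Action": "iot:Receive",
        "Resource": "arn:aws:iot:us-east-1:123456789:topic/$aws/things/${iot:Connection.Thing.ThingName}/shadow/*"
      }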
    ]
  }'

# Attach policy to certificate
CERT_ARN=$(aws iot describe-certificate --certificate-id YOUR_CERT_ID --query certificateDescription.certificateArn --output text)
aws iot attach-policy --policy-name "BuildingSensorPolicy" --target "$CERT_ARN"

# Attach certificate to Thing
aws iot attach-thing-principal \
  --thing-name "building-sensor-floor3-room12" \
  --principal "$CERT_ARN"

Fleet Provisioning with Templates

For large deployments (thousands of devices or more), manual certificate creation is impractical. Fleet Provisioning lets devices claim their own certificates at first boot using a bootstrap (claim) certificate with limited permissions. The device calls CreateKeysAndCertificate or CreateCertificateFromCsr via MQTT, then calls RegisterThing with the returned certificate ownership token; the provisioning template creates the Thing, activates the certificate, and attaches policies automatically.

{
  "Parameters": {
    "SerialNumber": {"Type": "String"},
    "Building": {"Type": "String"},
    "Floor": {"Type": "String"}
  },
  "Resources": {
    "thing": {
      "Type": "AWS::IoT::Thing",
      "Properties": {
        "ThingName": {"Fn::Join": ["-", ["sensor", {"Ref": "SerialNumber"}]]},
        "ThingTypeName": "TemperatureSensor",
        "AttributePayload": {
          "building": {"Ref": "Building"},
          "floor": {"Ref": "Floor"}
        }
      }
    },
    "certificate": {
      "Type": "AWS::IoT::Certificate",
      "Properties": {"CertificateId": {"Ref": "AWS::IoT::Certificate::Id"}, "Status": "Active"}
    },
    "policy": {
      "Type": "AWS::IoT::Policy",
      "Properties": {"PolicyName": "BuildingSensorPolicy"}
    }
  }
}
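On the device side, the parameters declared in the template are supplied in the RegisterThing request. The sketch below only builds the MQTT topics and payload, so it can be exercised without a connection. The template name BuildingSensorTemplate and the helper name are hypothetical; the topic layout and the certificateOwnershipToken field follow the Fleet Provisioning MQTT API as I understand it, so verify them against the current documentation.

```python
import json

TEMPLATE_NAME = "BuildingSensorTemplate"  # hypothetical template name

# Topic used to request a new key pair and certificate with the claim certificate.
CREATE_KEYS_TOPIC = "$aws/certificates/create/json"
# Topic used to register the device against the provisioning template.
REGISTER_TOPIC = f"$aws/provisioning-templates/{TEMPLATE_NAME}/provision/json"


def build_register_thing_request(ownership_token: str, serial_number: str,
                                 building: str, floor: str) -> str:
    """Build the JSON payload for RegisterThing.

    ownership_token comes from the CreateKeysAndCertificate response;
    the parameter names must match the template's "Parameters" section.
    """
    return json.dumps({
        "certificateOwnershipToken": ownership_token,
        "parameters": {
            "SerialNumber": serial_number,
            "Building": building,
            "Floor": floor,
        },
    })
```

A device would publish an empty JSON object to CREATE_KEYS_TOPIC, read the token from the accepted response, and then publish the payload built here to REGISTER_TOPIC.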
Just-in-Time Registration (JITR): Alternatively, sign device certificates with your own CA and register that CA with IoT Core with automatic registration enabled. When a device presents a certificate that IoT Core has not seen before, IoT Core registers it in the PENDING_ACTIVATION state and publishes an event to the $aws/events/certificates/registered/<caCertificateId> topic. An IoT rule on that topic invokes a Lambda function, which validates the device, activates the certificate, and attaches a policy.
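A registration Lambda for this flow might look like the following sketch. It assumes the event carries certificateId, awsAccountId, and certificateStatus fields (the shape of the certificate registration event as I understand it), reuses the BuildingSensorPolicy name from earlier, and omits any device allow-list check, which is specific to your fleet. The helper takes the IoT client as an argument so it can be tested with a stand-in.

```python
POLICY_NAME = "BuildingSensorPolicy"  # policy created earlier in this guide
REGION = "us-east-1"                  # assumed region


def activate_certificate(event: dict, iot_client) -> dict:
    """Attach a policy to a newly registered certificate and activate it."""
    cert_id = event["certificateId"]
    account_id = event["awsAccountId"]
    cert_arn = f"arn:aws:iot:{REGION}:{account_id}:cert/{cert_id}"

    # Only act on certificates that are waiting for activation.
    if event.get("certificateStatus") != "PENDING_ACTIVATION":
        return {"activated": False, "certificateArn": cert_arn}

    # A real deployment would validate the device here (for example,
    # against an allow-list of serial numbers) before activating.
    iot_client.attach_policy(policyName=POLICY_NAME, target=cert_arn)
    iot_client.update_certificate(certificateId=cert_id, newStatus="ACTIVE")
    return {"activated": True, "certificateArn": cert_arn}


def lambda_handler(event, context):
    # boto3 is imported lazily so the helper above can run without AWS access.
    import boto3
    return activate_certificate(event, boto3.client("iot"))
```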

MQTT Protocol: Topics, QoS, and Python Client

MQTT (Message Queuing Telemetry Transport) is the primary protocol for IoT Core. It is a lightweight publish/subscribe protocol designed for constrained devices and unreliable networks. Understanding its semantics is critical for reliable IoT applications.

QoS Levels: IoT Core supports QoS 0 (at-most-once, fire and forget, no acknowledgment) and QoS 1 (at-least-once: the sender stores the message and retries until it receives a PUBACK, so receivers may occasionally see duplicates). QoS 2 is not supported. For sensor telemetry where occasional loss is acceptable, QoS 0 reduces overhead. For commands or state updates, use QoS 1.
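Because QoS 1 is at-least-once, a consumer may want to drop redelivered messages. One possible approach, sketched here, is a bounded cache keyed by device ID and sequence number; it assumes each payload carries such a pair, and the class name Deduplicator is hypothetical.

```python
from collections import OrderedDict


class Deduplicator:
    """Remembers the most recent (device_id, sequence) pairs it has seen."""

    def __init__(self, capacity: int = 1024):
        self.capacity = capacity
        self._seen = OrderedDict()

    def is_new(self, device_id: str, sequence: int) -> bool:
        """Return True the first time a pair is seen, False for a duplicate."""
        key = (device_id, sequence)
        if key in self._seen:
            return False
        self._seen[key] = True
        # Evict the oldest entry once the cache exceeds its capacity.
        if len(self._seen) > self.capacity:
            self._seen.popitem(last=False)
        return True
```

The bounded cache trades memory for accuracy: a duplicate that arrives after its entry has been evicted will be treated as new, so size the capacity to the redelivery window you expect.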

Topic design is hierarchical by convention. A good pattern: {org}/{site}/{device-type}/{device-id}/{data-type}. Avoid putting personally identifiable information in topic names, because topics can appear in logs and reports. IoT Core also limits a topic to at most 7 forward slashes and 256 bytes, so keep hierarchies shallow.
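A small helper can enforce the naming pattern before a device ever publishes. This is a sketch with a hypothetical build_topic function; the limits of 7 forward slashes and 256 UTF-8 bytes reflect the IoT Core quotas as I understand them and should be checked against the current quota tables.

```python
MAX_TOPIC_BYTES = 256  # assumed IoT Core topic size limit
MAX_SLASHES = 7        # assumed IoT Core limit on forward slashes


def build_topic(*levels: str) -> str:
    """Join topic levels into a publishable topic, rejecting invalid input."""
    for level in levels:
        # Wildcards and separators are not allowed inside a published level.
        if not level or any(ch in level for ch in "/+#"):
            raise ValueError(f"invalid topic level: {level!r}")

    topic = "/".join(levels)
    if topic.count("/") > MAX_SLASHES:
        raise ValueError(f"topic has more than {MAX_SLASHES} slashes: {topic}")
    if len(topic.encode("utf-8")) > MAX_TOPIC_BYTES:
        raise ValueError(f"topic exceeds {MAX_TOPIC_BYTES} bytes: {topic}")
    return topic
```

Calling build_topic("acme", "hq", "temperature-sensor", "sensor-0012", "telemetry") follows the {org}/{site}/{device-type}/{device-id}/{data-type} pattern from the text.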

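Retained messages are supported in IoT Core: publish with the MQTT RETAIN flag and the broker stores the latest retained message for that topic and delivers it to new subscribers. The publishing client's policy must allow iot:RetainPublish. For structured device state that cloud applications also need to change, Device Shadow is usually the better fit.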

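"""
AWS IoT Core MQTT client using paho-mqtt with mutual TLS (X.509) on port 8883.
Publishes temperature readings every 30 seconds by default.
Written against the paho-mqtt 1.x callback signatures; on paho-mqtt 2.x,
pass mqtt.CallbackAPIVersion.VERSION1 as the first argument to mqtt.Client.
"""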
import ssl
import time
import json
import uuid
import random
import paho.mqtt.client as mqtt

# Configuration
IOT_ENDPOINT = "your-endpoint.iot.us-east-1.amazonaws.com"
IOT_PORT = 8883
THING_NAME = "building-sensor-floor3-room12"
TOPIC_TELEMETRY = f"building/hq/floor3/{THING_NAME}/temperature"
TOPIC_SHADOW_UPDATE = f"$aws/things/{THING_NAME}/shadow/update"

CA_CERT = "/certs/AmazonRootCA1.pem"
DEVICE_CERT = "/certs/device-cert.pem"
PRIVATE_KEY = "/certs/device-private.pem"


def on_connect(client, userdata, flags, rc):
    rc_codes = {
        0: "Connected successfully",
        1: "Connection refused — incorrect protocol version",
        2: "Connection refused — invalid client identifier",
        3: "Connection refused — server unavailable",
        4: "Connection refused — bad username or password",
        5: "Connection refused — not authorised",
    }
    print(f"[MQTT] {rc_codes.get(rc, f'Unknown error {rc}')}")
    if rc == 0:
        # Subscribe to shadow delta (desired state changes from cloud)
        delta_topic = f"$aws/things/{THING_NAME}/shadow/update/delta"
        client.subscribe(delta_topic, qos=1)
        print(f"[MQTT] Subscribed to {delta_topic}")


def on_message(client, userdata, msg):
    payload = json.loads(msg.payload.decode())
    print(f"[SHADOW DELTA] {json.dumps(payload, indent=2)}")
    # Apply configuration changes from cloud
    if "state" in payload and "reporting_interval" in payload["state"]:
        userdata["reporting_interval"] = payload["state"]["reporting_interval"]
        print(f"[CONFIG] Reporting interval updated: {userdata['reporting_interval']}s")


def on_disconnect(client, userdata, rc):
    print(f"[MQTT] Disconnected with code {rc}. Will auto-reconnect.")


def build_client(userdata):
    client = mqtt.Client(client_id=THING_NAME, userdata=userdata)
    client.on_connect = on_connect
    client.on_message = on_message
    client.on_disconnect = on_disconnect

    # Mutual TLS authentication with X.509 certificate
    ssl_ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=CA_CERT)
    ssl_ctx.load_cert_chain(certfile=DEVICE_CERT, keyfile=PRIVATE_KEY)
    ssl_ctx.verify_mode = ssl.CERT_REQUIRED
    client.tls_set_context(ssl_ctx)

    return client


def publish_telemetry(client, seq):
    # Simulate temperature sensor reading
    temperature = round(21.5 + random.uniform(-2.0, 2.0), 2)
    humidity = round(45.0 + random.uniform(-5.0, 5.0), 1)

    payload = {
        "device_id": THING_NAME,
        "sequence": seq,
        "timestamp": int(time.time()),
        "temperature_c": temperature,
        "humidity_pct": humidity,
        "battery_v": round(3.7 - (seq * 0.0001), 3),
    }

    # Publish telemetry at QoS 0 (high frequency, loss acceptable)
    result = client.publish(TOPIC_TELEMETRY, json.dumps(payload), qos=0)
    if result.rc == mqtt.MQTT_ERR_SUCCESS:
        print(f"[PUB] seq={seq} temp={temperature}°C humidity={humidity}%")
    else:
        print(f"[ERROR] Publish failed: rc={result.rc}")

    # Update shadow with reported state (QoS 1 — must not lose)
    shadow_payload = {
        "state": {
            "reported": {
                "temperature_c": temperature,
                "humidity_pct": humidity,
                "firmware": "2.1.4",
                "connected": True,
            }
        }
    }
    client.publish(TOPIC_SHADOW_UPDATE, json.dumps(shadow_payload), qos=1)


def main():
    state = {"reporting_interval": 30}
    client = build_client(state)

    client.connect(IOT_ENDPOINT, IOT_PORT, keepalive=60)
    client.loop_start()

    seq = 0
    while True:
        publish_telemetry(client, seq)
        seq += 1
        time.sleep(state["reporting_interval"])


if __name__ == "__main__":
    main()

IoT Rules Engine: Routing Data to AWS Services

The IoT Rules Engine is the data routing layer of IoT Core. Each rule specifies an SQL statement evaluated against inbound MQTT messages, and one or more actions to take when the statement matches. Rules are evaluated against every message that matches the topic filter in the FROM clause — there is no sampling; every matching message triggers the rule.

The SQL syntax supports standard projections, WHERE clauses, functions (mathematical, string, date/time, crypto), and nested JSON access via dot notation and array indexing. You can filter, transform, and enrich messages in the rule itself before routing them downstream.

-- Route high-temperature alerts to SNS
SELECT
  device_id,
  temperature_c,
  humidity_pct,
  timestamp,
  topic(3) AS floor_number
FROM 'building/+/+/+/temperature'
WHERE temperature_c > 28.0

-- Aggregate and route to Kinesis (pass full payload)
SELECT * FROM 'building/#'

-- Transform units and add derived fields
-- (IoT SQL round() takes a single argument, so rounding to one decimal is left to the consumer)
SELECT
  device_id,
  temperature_c,
  (temperature_c * 1.8) + 32 AS temperature_f,
  humidity_pct,
  timestamp,
  parse_time("yyyy-MM-dd'T'HH:mm:ss", timestamp()) AS iso_time
FROM 'building/hq/floor3/+/temperature'

-- Route only offline events using clientDisconnected lifecycle topic
SELECT clientId, timestamp, eventType
FROM '$aws/events/presence/disconnected/+'
# Create a complete IoT Rule with multiple actions
aws iot create-topic-rule \
  --rule-name "BuildingTelemetryRouter" \
  --topic-rule-payload '{
    "sql": "SELECT * FROM '"'"'building/#'"'"'",
    "awsIotSqlVersion": "2016-03-23",
    "ruleDisabled": false,
    "actions": [
      {
        "kinesis": {
          "roleArn": "arn:aws:iam::123456789:role/IoTKinesisRole",
          "streamName": "building-telemetry-stream",
          "partitionKey": "${device_id}"
        }
      },
      {
        "dynamoDBv2": {
          "roleArn": "arn:aws:iam::123456789:role/IoTDynamoRole",
          "putItem": {
            "tableName": "SensorReadings"
          }
        }
      }
    ],
    "errorAction": {
      "sqs": {
        "roleArn": "arn:aws:iam::123456789:role/IoTSQSRole",
        "queueUrl": "https://sqs.us-east-1.amazonaws.com/123456789/iot-error-queue",
        "useBase64": false
      }
    }
  }'

# Create high-temperature alert rule
aws iot create-topic-rule \
  --rule-name "HighTemperatureAlert" \
  --topic-rule-payload '{
    "sql": "SELECT device_id, temperature_c, timestamp FROM '"'"'building/+/+/+/temperature'"'"' WHERE temperature_c > 28.0",
    "awsIotSqlVersion": "2016-03-23",
    "ruleDisabled": false,
    "actions": [
      {
        "sns": {
          "roleArn": "arn:aws:iam::123456789:role/IoTSNSRole",
          "targetArn": "arn:aws:sns:us-east-1:123456789:BuildingAlerts",
          "messageFormat": "JSON"
        }
      },
      {
        "lambda": {
          "functionArn": "arn:aws:lambda:us-east-1:123456789:function:ProcessTempAlert"
        }
      }
    ]
  }'
Error Actions: Always configure an errorAction on every rule. If an action fails (e.g., Kinesis is throttling), IoT Core invokes the error action with the original message plus error metadata. Use an SQS queue as the error action so no data is lost. Without an error action, failed deliveries are silently dropped.

Device Shadow: Reported vs Desired State

Device Shadow solves one of the hardest IoT problems: what do you do when a device is offline? The shadow is a JSON document stored in the cloud. It has two state keys — reported (what the device last told us its state is) and desired (what the cloud application wants the state to be). When a device reconnects, it retrieves the shadow, calculates the delta (desired minus reported), applies changes, and publishes an updated reported state.

IoT Core automatically computes and publishes the delta document to $aws/things/{thingName}/shadow/update/delta whenever an update leaves desired and reported different. The device subscribes to this topic and acts on changes. Named shadows let different subsystems of one Thing keep isolated state (for example, one shadow for display settings and another for network configuration); check the current service quotas for how many you can create per Thing.
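The delta computation itself is easy to reason about with a local model. The sketch below is an illustration of the "desired minus reported" idea, not the service's implementation; shadow_delta is a hypothetical name.

```python
def shadow_delta(desired: dict, reported: dict) -> dict:
    """Return the keys in `desired` whose values differ from `reported`."""
    delta = {}
    for key, want in desired.items():
        have = reported.get(key)
        if isinstance(want, dict) and isinstance(have, dict):
            # Recurse into nested objects and keep only non-empty differences.
            nested = shadow_delta(want, have)
            if nested:
                delta[key] = nested
        elif key not in reported or want != have:
            delta[key] = want
    return delta
```

Keys that appear only in reported (such as a temperature reading) never show up in the delta, which is why a device can report extra state without triggering changes.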

"""
Device Shadow management using AWS IoT Device SDK v2 for Python.
Demonstrates updating shadow, reading delta, and reconciling state.
"""
import json
import time
import threading
from awsiot import iotshadow, mqtt_connection_builder
from awscrt import io, mqtt

ENDPOINT = "your-endpoint.iot.us-east-1.amazonaws.com"
THING_NAME = "building-sensor-floor3-room12"
CA_PATH = "/certs/AmazonRootCA1.pem"
CERT_PATH = "/certs/device-cert.pem"
KEY_PATH = "/certs/device-private.pem"

# Current device state
device_state = {
    "temperature_c": 22.1,
    "reporting_interval": 30,
    "display_enabled": True,
}

got_delta_event = threading.Event()


def on_shadow_delta_updated(delta):
    """Called when the cloud has a different desired state than reported."""
    print(f"[DELTA] Received delta: {json.dumps(delta.state, indent=2)}")

    if delta.state is None:
        return

    # Apply each desired change
    for key, value in delta.state.items():
        if key in device_state:
            old_val = device_state[key]
            device_state[key] = value
            print(f"[APPLY] {key}: {old_val} → {value}")

    # Report back the new state to clear the delta
    got_delta_event.set()


def on_update_shadow_accepted(response):
    print(f"[SHADOW] Update accepted. Version: {response.version}")


def on_update_shadow_rejected(error):
    print(f"[SHADOW] Update rejected: {error.code} — {error.message}")


def main():
    event_loop_group = io.EventLoopGroup(1)
    host_resolver = io.DefaultHostResolver(event_loop_group)
    client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)

    mqtt_connection = mqtt_connection_builder.mtls_from_path(
        endpoint=ENDPOINT,
        cert_filepath=CERT_PATH,
        pri_key_filepath=KEY_PATH,
        client_bootstrap=client_bootstrap,
        ca_filepath=CA_PATH,
        client_id=THING_NAME,
        clean_session=False,
        keep_alive_secs=30,
    )

    print(f"[SHADOW] Connecting to {ENDPOINT}...")
    connect_future = mqtt_connection.connect()
    connect_future.result()
    print("[SHADOW] Connected.")

    shadow_client = iotshadow.IotShadowClient(mqtt_connection)

    # Subscribe to delta updates
    delta_future, _ = shadow_client.subscribe_to_shadow_delta_updated_events(
        request=iotshadow.ShadowDeltaUpdatedSubscriptionRequest(thing_name=THING_NAME),
        qos=mqtt.QoS.AT_LEAST_ONCE,
        callback=on_shadow_delta_updated,
    )
    delta_future.result()

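    # Subscribe to update results (both accepted and rejected)
    update_accepted_future, _ = shadow_client.subscribe_to_update_shadow_accepted(
        request=iotshadow.UpdateShadowSubscriptionRequest(thing_name=THING_NAME),
        qos=mqtt.QoS.AT_LEAST_ONCE,
        callback=on_update_shadow_accepted,
    )
    update_accepted_future.result()

    update_rejected_future, _ = shadow_client.subscribe_to_update_shadow_rejected(
        request=iotshadow.UpdateShadowSubscriptionRequest(thing_name=THING_NAME),
        qos=mqtt.QoS.AT_LEAST_ONCE,
        callback=on_update_shadow_rejected,
    )
    update_rejected_future.result()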

    # Publish initial reported state
    update_request = iotshadow.UpdateShadowRequest(
        thing_name=THING_NAME,
        state=iotshadow.ShadowState(
            reported={"temperature_c": device_state["temperature_c"],
                      "reporting_interval": device_state["reporting_interval"],
                      "firmware": "2.1.4",
                      "connected": True}
        ),
    )
    publish_future = shadow_client.publish_update_shadow(update_request, mqtt.QoS.AT_LEAST_ONCE)
    publish_future.result()
    print("[SHADOW] Initial state reported.")

    # Main loop: wait for delta events and report updated state
    while True:
        if got_delta_event.wait(timeout=30):
            got_delta_event.clear()
            # Report the updated state to clear the delta
            publish_future = shadow_client.publish_update_shadow(
                iotshadow.UpdateShadowRequest(
                    thing_name=THING_NAME,
                    state=iotshadow.ShadowState(reported=dict(device_state)),
                ),
                mqtt.QoS.AT_LEAST_ONCE,
            )
            publish_future.result()
            print("[SHADOW] Updated reported state after applying delta.")


if __name__ == "__main__":
    main()
# From the cloud side: update desired state for a device
# (AWS CLI v2 needs --cli-binary-format raw-in-base64-out to accept a raw JSON payload)
aws iot-data update-thing-shadow \
  --thing-name "building-sensor-floor3-room12" \
  --cli-binary-format raw-in-base64-out \
  --payload '{"state":{"desired":{"reporting_interval":60,"display_enabled":false}}}' \
  /dev/stdout

# Read the current shadow document
aws iot-data get-thing-shadow \
  --thing-name "building-sensor-floor3-room12" \
  /dev/stdout | python3 -m json.tool

# Delete the shadow (resets state); the command requires an output file argument
aws iot-data delete-thing-shadow \
  --thing-name "building-sensor-floor3-room12" \
  /dev/stdout

IoT Jobs: OTA Firmware Updates

IoT Jobs is the managed job execution system for device fleets. Its primary use case is Over-The-Air (OTA) firmware updates, but it supports any device-side task: configuration rollouts, diagnostics, certificate rotation. Jobs are durable — a job persists until every targeted device completes or fails it, regardless of device connectivity.

A Job targets either individual Things, Thing Groups, or a snapshot of Things matching a fleet index query. The job document is any JSON object you define — typically containing an S3 URL for the firmware binary and expected SHA256 hash. Devices download this document, fetch the binary, validate the hash, flash memory, and report completion.

{
  "operation": "firmware-update",
  "version": "2.2.0",
  "firmware_url": "https://your-bucket.s3.amazonaws.com/firmware/sensor-v2.2.0.bin",
  "sha256": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
  "rollback_on_failure": true,
  "max_execution_timeout_seconds": 300
}
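Hash validation is the step that protects a device from flashing a corrupted or tampered image, so it is worth isolating. The sketch below reads the file in chunks so a large binary does not have to fit in memory on a constrained device; the function names and chunk size are assumptions.

```python
import hashlib
import hmac


def sha256_of_file(path: str, chunk_size: int = 64 * 1024) -> str:
    """Compute the SHA-256 hex digest of a file without loading it whole."""
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        # iter() with a sentinel keeps reading until read() returns b"".
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def firmware_matches(path: str, expected_hex: str) -> bool:
    """Compare the file's digest with the value from the job document."""
    return hmac.compare_digest(sha256_of_file(path), expected_hex.lower())
```

Note that the sha256 value in the sample job document is the digest of an empty input, so treat it as a placeholder and replace it with the digest of your real firmware image.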
# Create the firmware update job targeting all floor-3 sensors
aws iot create-job \
  --job-id "firmware-update-v2.2.0-$(date +%Y%m%d)" \
  --targets "arn:aws:iot:us-east-1:123456789:thinggroup/Building-HQ-Floor3" \
  --document-source "https://your-bucket.s3.amazonaws.com/job-docs/firmware-update-v2.2.0.json" \
  --target-selection SNAPSHOT \
  --job-executions-rollout-config '{
    "maximumPerMinute": 10,
    "exponentialRate": {
      "baseRatePerMinute": 2,
      "incrementFactor": 1.5,
      "rateIncreaseCriteria": {"numberOfSucceededThings": 5}
    }
  }' \
  --abort-config '{
    "criteriaList": [{
      "failureType": "FAILED",
      "action": "CANCEL",
      "thresholdPercentage": 10.0,
      "minNumberOfExecutedThings": 10
    }]
  }' \
  --timeout-config '{"inProgressTimeoutInMinutes": 10}'
"""
Device-side IoT Jobs execution: polls for pending jobs, downloads firmware,
validates hash, simulates flash, reports completion.
"""
import hashlib
import json
import os
import time
import urllib.request
import paho.mqtt.client as mqtt
import ssl

THING_NAME = "building-sensor-floor3-room12"
IOT_ENDPOINT = "your-endpoint.iot.us-east-1.amazonaws.com"

JOBS_NOTIFY_TOPIC = f"$aws/things/{THING_NAME}/jobs/notify-next"
JOBS_GET_TOPIC = f"$aws/things/{THING_NAME}/jobs/start-next"
JOBS_UPDATE_PREFIX = f"$aws/things/{THING_NAME}/jobs"

active_job = {}


def on_message(client, userdata, msg):
    global active_job
    payload = json.loads(msg.payload)
    topic = msg.topic

    if "notify-next" in topic or "start-next/accepted" in topic:
        execution = payload.get("execution", {})
        if not execution:
            print("[JOBS] No pending jobs.")
            return

        active_job = execution
        job_id = execution["jobId"]
        job_doc = execution["jobDocument"]
        print(f"[JOBS] Starting job {job_id}: {job_doc.get('operation')}")

        # Mark as IN_PROGRESS
        update_topic = f"{JOBS_UPDATE_PREFIX}/{job_id}/update"
        client.publish(update_topic, json.dumps({
            "status": "IN_PROGRESS",
            "statusDetails": {"step": "downloading"}
        }), qos=1)

        # Download and validate firmware
        fw_url = job_doc.get("firmware_url")
        expected_hash = job_doc.get("sha256")

        try:
            fw_path = f"/tmp/firmware-{job_id}.bin"
            urllib.request.urlretrieve(fw_url, fw_path)

            # Validate SHA256
            sha256 = hashlib.sha256()
            with open(fw_path, "rb") as f:
                sha256.update(f.read())
            actual_hash = sha256.hexdigest()

            if actual_hash != expected_hash:
                raise ValueError(f"Hash mismatch: expected {expected_hash}, got {actual_hash}")

            # Simulate flash operation
            print(f"[JOBS] Flashing firmware {job_doc.get('version')}...")
            time.sleep(5)

            # Report success
            client.publish(update_topic, json.dumps({
                "status": "SUCCEEDED",
                "statusDetails": {
                    "version": job_doc.get("version"),
                    "flashedAt": str(int(time.time()))
                }
            }), qos=1)
            print(f"[JOBS] Job {job_id} SUCCEEDED.")

        except Exception as e:
            print(f"[JOBS] Job {job_id} FAILED: {e}")
            client.publish(update_topic, json.dumps({
                "status": "FAILED",
                "statusDetails": {"error": str(e)}
            }), qos=1)


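def on_connect(client, userdata, flags, rc):
    # Subscribe before requesting a job so the start-next response is not missed
    client.subscribe(JOBS_NOTIFY_TOPIC, qos=1)
    client.subscribe(f"{JOBS_UPDATE_PREFIX}/start-next/accepted", qos=1)
    # Request the next pending job immediately on connect
    client.publish(JOBS_GET_TOPIC, "{}", qos=1)


def main():
    # Certificate paths follow the earlier examples; adjust for your device
    client = mqtt.Client(client_id=THING_NAME)
    client.on_connect = on_connect
    client.on_message = on_message
    ssl_ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile="/certs/AmazonRootCA1.pem")
    ssl_ctx.load_cert_chain(certfile="/certs/device-cert.pem", keyfile="/certs/device-private.pem")
    client.tls_set_context(ssl_ctx)
    client.connect(IOT_ENDPOINT, 8883, keepalive=60)
    # on_message blocks while downloading and flashing; production code should
    # hand that work to a separate thread so the network loop keeps running
    client.loop_forever()


if __name__ == "__main__":
    main()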

Greengrass v2: Edge Computing and Local Inference

AWS IoT Greengrass v2 extends the cloud to the device itself. Greengrass runs as a daemon on a local gateway device (a Raspberry Pi, industrial PC, or server) and orchestrates components, which are containerized or native software modules. Components can run local MQTT brokers, stream processing, ML inference, custom business logic, and local Lambda functions. Devices on the local network connect to Greengrass instead of the cloud, so control loops avoid the internet round trip; local latency is typically far lower than a cloud round trip, though the exact figures depend on your network and hardware.

Key Greengrass v2 capabilities:

  • Local MQTT Broker — Devices publish to the Greengrass core; Greengrass batches and forwards to IoT Core, handling reconnection logic.
  • Stream Manager — Buffers high-volume telemetry locally, exports to Kinesis/S3 in optimized batches.
  • ML Inference at the Edge — Run SageMaker Neo-compiled models locally. No cloud round-trip for inference.
  • Local Secrets — Greengrass syncs Secrets Manager secrets locally for offline use.
  • OTA Component Updates — Greengrass handles component versioning and rollback.
# Install Greengrass v2 on a Linux edge device
curl -s https://d2s8p88vqu9w66.cloudfront.net/releases/greengrass-nucleus-latest.zip \
  -o greengrass-nucleus-latest.zip
unzip greengrass-nucleus-latest.zip -d GreengrassInstaller

sudo -E java -Droot="/greengrass/v2" \
  -Dlog.store=FILE \
  -jar ./GreengrassInstaller/lib/Greengrass.jar \
  --aws-region us-east-1 \
  --thing-name BuildingGateway-HQ \
  --thing-group-name Building-HQ-Gateways \
  --component-default-user ggc_user:ggc_group \
  --provision true \
  --setup-system-service true \
  --deploy-dev-tools true
# Greengrass component recipe: local temperature anomaly detector
---
RecipeFormatVersion: "2020-01-25"
ComponentName: com.techoral.TempAnomalyDetector
ComponentVersion: "1.0.0"
ComponentDescription: "Detects temperature anomalies using local ML model"
ComponentPublisher: Techoral

ComponentDependencies:
  aws.greengrass.StreamManager:
    VersionRequirement: ">=2.0.0"

Manifests:
  - Platform:
      os: linux
    Lifecycle:
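      Install:
        RequiresPrivilege: false
        # A ZIP artifact unpacks into a folder named after the archive
        Script: pip3 install -r {artifacts:decompressedPath}/anomaly_detector/requirements.txt
      Run:
        Script: python3 {artifacts:decompressedPath}/anomaly_detector/anomaly_detector.py
    Artifacts:
      - URI: s3://your-bucket/greengrass/anomaly-detector/v1.0.0/anomaly_detector.zip
        Unarchive: ZIP
      # Unarchive accepts only NONE or ZIP, so the component must extract the tarball itself
      - URI: s3://your-bucket/greengrass/models/temp-anomaly-model.tar.gz
        Unarchive: NONE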
"""
Greengrass v2 component: local anomaly detection + stream manager export.
"""
import json
import time
import numpy as np
from stream_manager import StreamManagerClient, ExportDefinition
from stream_manager.data import KinesisConfig, MessageStreamDefinition, StrategyOnFull

STREAM_NAME = "building-telemetry-stream"
KINESIS_STREAM = "building-telemetry-kinesis"

# Simple statistical anomaly detector (Z-score)
class AnomalyDetector:
    def __init__(self, window=50, threshold=3.0):
        self.window = window
        self.threshold = threshold
        self.readings = []

    def is_anomaly(self, value):
        self.readings.append(value)
        if len(self.readings) > self.window:
            self.readings.pop(0)
        if len(self.readings) < 10:
            return False
        mean = np.mean(self.readings)
        std = np.std(self.readings)
        if std == 0:
            return False
        z_score = abs(value - mean) / std
        return z_score > self.threshold


def main():
    detector = AnomalyDetector()

    # Stream Manager client — exports to Kinesis in batches
    sm_client = StreamManagerClient()

    try:
        sm_client.create_message_stream(
            MessageStreamDefinition(
                name=STREAM_NAME,
                strategy_on_full=StrategyOnFull.OverwriteOldestData,
                max_size=268435456,  # 256 MB local buffer
                export_definition=ExportDefinition(
                    kinesis=[KinesisConfig(
                        identifier=f"KinesisExport-{STREAM_NAME}",
                        kinesis_stream_name=KINESIS_STREAM,
                        batch_size=100,
                        batch_interval_millis=60000,  # Stream Manager's minimum interval is 60 s
                    )]
                )
            )
        )
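    except Exception as exc:
        # Most likely the stream already exists; log instead of hiding other failures
        print(f"[GREENGRASS] create_message_stream skipped: {exc}")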

    print("[GREENGRASS] Anomaly detector running.")

    # In production: read from local MQTT broker
    while True:
        reading = {
            "device_id": "local-sensor-001",
            "temperature_c": 22.0 + np.random.normal(0, 0.5),
            "timestamp": int(time.time()),
        }
        if detector.is_anomaly(reading["temperature_c"]):
            reading["anomaly"] = True
            print(f"[ANOMALY] Detected: {reading['temperature_c']}°C")

        sm_client.append_message(STREAM_NAME, json.dumps(reading).encode())
        time.sleep(1)


if __name__ == "__main__":
    main()

Security: Custom Authorizers, Policies, and Audit

IoT Core security is multi-layered. Beyond X.509 certificates, IoT Core supports custom authorizers — Lambda functions that validate any token-based scheme (JWT, API key, OAuth) and return an IAM-style policy dynamically. This lets legacy devices or web clients authenticate without X.509 certificates.
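A custom authorizer Lambda could be sketched as follows. The response fields (isAuthenticated, principalId, disconnectAfterInSeconds, refreshAfterInSeconds, policyDocuments) follow the custom authentication response format as I understand it; the shared-secret check, the EXPECTED_TOKEN value, the principal ID, and the topic layout are all hypothetical stand-ins for real JWT or OAuth validation.

```python
import hmac

EXPECTED_TOKEN = "replace-with-real-validation"  # hypothetical shared secret
ACCOUNT_ARN_PREFIX = "arn:aws:iot:us-east-1:123456789"  # placeholder account


def lambda_handler(event, context):
    """Authenticate a connection and return a policy for it."""
    token = event.get("token", "")

    # Constant-time comparison; a real authorizer would verify a signed token.
    if not hmac.compare_digest(token.encode(), EXPECTED_TOKEN.encode()):
        return {"isAuthenticated": False}

    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "iot:Connect",
                "Resource": f"{ACCOUNT_ARN_PREFIX}:client/webclient-*",
            },
            {
                "Effect": "Allow",
                "Action": "iot:Subscribe",
                "Resource": f"{ACCOUNT_ARN_PREFIX}:topicfilter/building/hq/*",
            },
        ],
    }
    return {
        "isAuthenticated": True,
        "principalId": "webclient01",        # alphanumeric identifier
        "disconnectAfterInSeconds": 3600,    # force re-authentication hourly
        "refreshAfterInSeconds": 300,        # re-evaluate the policy every 5 min
        "policyDocuments": [policy],
    }
```

Returning a narrowly scoped policy per connection keeps the same least-privilege discipline that X.509 devices get from IoT Policies.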

IoT Policy Variables let you write a single policy that is parameterized per connection. Instead of creating one policy per device, use variables like ${iot:Connection.Thing.ThingName} and ${iot:ClientId} to restrict each device to its own topics automatically.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "iot:Connect",
      "Resource": "arn:aws:iot:us-east-1:123456789:client/${iot:Connection.Thing.ThingName}",
      "Condition": {
        "Bool": {"iot:Connection.Thing.IsAttached": "true"}
      }
    },
    {
      "Effect": "Allow",
      "Action": "iot:Publish",
      "Resource": [
        "arn:aws:iot:us-east-1:123456789:topic/building/*/${iot:Connection.Thing.ThingName}/*",
        "arn:aws:iot:us-east-1:123456789:topic/$aws/things/${iot:Connection.Thing.ThingName}/shadow/*"
      ]
    },
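    {
      "Effect": "Allow",
      "Action": "iot:Subscribe",
      "Resource": [
        "arn:aws:iot:us-east-1:123456789:topicfilter/$aws/things/${iot:Connection.Thing.ThingName}/shadow/*",
        "arn:aws:iot:us-east-1:123456789:topicfilter/$aws/things/${iot:Connection.Thing.ThingName}/jobs/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": "iot:Receive",
      "Resource": [
        "arn:aws:iot:us-east-1:123456789:topic/$aws/things/${iot:Connection.Thing.ThingName}/shadow/*",
        "arn:aws:iot:us-east-1:123456789:topic/$aws/things/${iot:Connection.Thing.ThingName}/jobs/*"
      ]
    }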
  ]
}
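
The effect of the policy variable can be sketched in a few lines of Python. This is only an approximation for reasoning about the policy, not how IoT Core evaluates it: it substitutes the Thing name into the two publish resources from the policy above and uses `fnmatchcase` to stand in for the `*` wildcard. The function name `can_publish` and the sample Thing names are hypothetical.

```python
from fnmatch import fnmatchcase

THING_VAR = "${iot:Connection.Thing.ThingName}"

# Resource suffixes copied from the iot:Publish statement above.
PUBLISH_RESOURCES = [
    "topic/building/*/${iot:Connection.Thing.ThingName}/*",
    "topic/$aws/things/${iot:Connection.Thing.ThingName}/shadow/*",
]


def can_publish(thing_name: str, topic: str) -> bool:
    """Approximate whether a Thing may publish to a topic under the policy."""
    for pattern in PUBLISH_RESOURCES:
        # Substitute the connection's Thing name for the policy variable.
        resolved = pattern.replace(THING_VAR, thing_name)
        if fnmatchcase("topic/" + topic, resolved):
            return True
    return False


print(can_publish("sensor-42", "building/floor3/sensor-42/temperature"))
print(can_publish("sensor-42", "building/floor3/sensor-43/temperature"))
```

The first call is allowed and the second is denied, because the resolved resource only contains the connecting device's own Thing name.
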

# Create a custom authorizer backed by a Lambda function
# (--signing-disabled skips token signature verification; enable signing for production)
aws iot create-authorizer \
  --authorizer-name "JWTAuthorizer" \
  --authorizer-function-arn "arn:aws:lambda:us-east-1:123456789:function:IoTJWTAuthorizer" \
  --signing-disabled \
  --token-key-name "x-auth-token" \
  --status ACTIVE

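# Enable IoT Device Defender Audit
# (on the first call for an account, also pass --role-arn with a role that lets AWS IoT read your device configuration)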
aws iot update-account-audit-configuration \
  --audit-notification-target-configurations '{
    "SNS": {
      "targetArn": "arn:aws:sns:us-east-1:123456789:IoTAuditAlerts",
      "roleArn": "arn:aws:iam::123456789:role/IoTAuditRole",
      "enabled": true
    }
  }' \
  --audit-check-configurations '{
    "UNAUTHENTICATED_COGNITO_ROLE_OVERLY_PERMISSIVE_CHECK": {"enabled": true},
    "IOT_POLICY_OVERLY_PERMISSIVE_CHECK": {"enabled": true},
    "CA_CERTIFICATE_EXPIRING_CHECK": {"enabled": true},
    "DEVICE_CERTIFICATE_EXPIRING_CHECK": {"enabled": true},
    "REVOKED_DEVICE_CERTIFICATE_STILL_ACTIVE_CHECK": {"enabled": true},
    "LOGGING_DISABLED_CHECK": {"enabled": true}
  }'

# Schedule a daily audit
aws iot create-scheduled-audit \
  --scheduled-audit-name "DailySecurityAudit" \
  --frequency DAILY \
  --target-check-names IOT_POLICY_OVERLY_PERMISSIVE_CHECK DEVICE_CERTIFICATE_EXPIRING_CHECK

Critical Security Rule: Never grant wildcard resources (for example, iot:Publish on *) in IoT policies. A single compromised device could then publish to every topic, including other devices' shadow topics. Always scope policies to the specific Thing name using policy variables, and add the iot:Connection.Thing.IsAttached condition so that a client can only connect under a Thing its certificate is attached to, which prevents client ID spoofing.
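
A rule like this is easy to check mechanically before a policy is deployed. The following is a minimal sketch of such a check, assuming policies are available as parsed JSON; the function names are hypothetical, and it is far less thorough than the Device Defender IOT_POLICY_OVERLY_PERMISSIVE_CHECK audit. It only flags Allow statements whose resource is a bare wildcard for its type.

```python
def is_wildcard_resource(resource: str) -> bool:
    """Return True when an ARN (or bare '*') covers every resource of its type."""
    # ARN layout: arn:partition:service:region:account:resource
    # maxsplit=5 keeps colons inside policy variables such as ${iot:ClientId}.
    parts = resource.split(":", 5)
    if len(parts) < 6:
        return "*" in parts
    tail = parts[5]  # e.g. "topic/building/*" or "*"
    return tail == "*" or tail.split("/", 1)[-1] == "*"


def find_overly_permissive(policy: dict) -> list:
    """List (statement index, resource) pairs that allow a wildcard resource."""
    findings = []
    statements = policy.get("Statement", [])
    if isinstance(statements, dict):  # a single statement may be a bare object
        statements = [statements]
    for index, statement in enumerate(statements):
        if statement.get("Effect") != "Allow":
            continue
        resources = statement.get("Resource", [])
        if isinstance(resources, str):
            resources = [resources]
        findings.extend((index, r) for r in resources if is_wildcard_resource(r))
    return findings
```

Running this in CI against every policy document catches the `topic/*` and `*` cases while leaving Thing-scoped resources untouched.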

Fleet Indexing and Thing Queries

Fleet Indexing enables search queries, written in a Lucene-style query syntax, across your entire Thing registry, including Thing attributes, shadow state, and connectivity status. Without fleet indexing, finding "all sensors on floor 3 with firmware older than 2.1.0 that are currently offline" would require iterating the full registry programmatically. With fleet indexing, it's a single query.

Fleet Indexing indexes three data sources: registry data (Thing attributes and groups), shadow data (reported and desired state values), and connectivity data (connected/disconnected, last connected time, disconnect reason). Fleet indexing is billed through AWS IoT Device Management and is metered by index updates and search queries rather than as a flat per-device fee, so cost scales with how often indexed fields change; check the current pricing page for rates.

# Enable fleet indexing for registry, shadow, and connectivity data
# (connectivity.* fields are managed by the service; only custom fields are declared here)
aws iot update-indexing-configuration \
  --thing-indexing-configuration '{
    "thingIndexingMode": "REGISTRY_AND_SHADOW",
    "thingConnectivityIndexingMode": "STATUS",
    "namedShadowIndexingMode": "OFF",
    "customFields": [
      {"name": "shadow.reported.firmware", "type": "String"},
      {"name": "shadow.reported.temperature_c", "type": "Number"},
      {"name": "attributes.building", "type": "String"},
      {"name": "attributes.floor", "type": "String"}
    ]
  }'

# Query: all devices currently offline on floor 3
aws iot search-index \
  --index-name "AWS_Things" \
  --query-string "attributes.floor:3 AND connectivity.connected:false"

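# Query: all devices with old firmware and high temperature
# (firmware is a String field, so any range match is lexical, not version-aware:
#  "2.10.0" sorts before "2.2.0"; verify results or filter client-side)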
aws iot search-index \
  --index-name "AWS_Things" \
  --query-string 'shadow.reported.firmware:[* TO "2.1.0"] AND shadow.reported.temperature_c:[28 TO *]'

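# Count devices by floor (bucket aggregation has its own command; search-index has no aggregation flag)
aws iot get-buckets-aggregation \
  --index-name "AWS_Things" \
  --query-string "attributes.building:HQ" \
  --aggregation-field "attributes.floor" \
  --buckets-aggregation-type 'termsAggregation={maxBuckets=10}' \
  --query-version "2017-09-30"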

Fleet Indexing Latency: Fleet indexing is eventually consistent: shadow state changes appear in the index within seconds (typically 1–5s), and connectivity status is updated in near real time on connect/disconnect. Do not use fleet indexing for strict real-time decisions; subscribe an IoT rule to the lifecycle event topics instead.
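
The same queries can be run from Python with boto3's `search_index`, which returns results in pages linked by `nextToken`. The sketch below follows those pages and then filters firmware versions client-side by comparing them as integer tuples, since the indexed firmware field is a string. The helper names are hypothetical, the client is passed in rather than created here, and the code assumes each result's `shadow` field is a JSON string with a top-level `reported` object and that versions are plain dotted integers such as `2.1.0`.

```python
import json


def parse_version(text: str) -> tuple[int, ...]:
    """Turn '2.10.0' into (2, 10, 0) so versions compare numerically."""
    return tuple(int(part) for part in text.split("."))


def search_things(iot_client, query: str):
    """Yield every thing matching a fleet indexing query, following nextToken."""
    kwargs = {"indexName": "AWS_Things", "queryString": query, "maxResults": 100}
    while True:
        page = iot_client.search_index(**kwargs)
        yield from page.get("things", [])
        token = page.get("nextToken")
        if not token:
            break
        kwargs["nextToken"] = token


def things_below_firmware(iot_client, query: str, max_version: str) -> list:
    """Return names of matching things whose reported firmware is older than max_version."""
    limit = parse_version(max_version)
    names = []
    for thing in search_things(iot_client, query):
        shadow = json.loads(thing.get("shadow") or "{}")
        firmware = shadow.get("reported", {}).get("firmware")
        if firmware and parse_version(firmware) < limit:
            names.append(thing["thingName"])
    return names
```

With a real client this would be called as `things_below_firmware(boto3.client("iot"), "attributes.floor:3 AND connectivity.connected:false", "2.1.0")`.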

Real-World Architecture: Smart Building Sensor Pipeline

Let's put everything together in a production-grade smart building IoT pipeline. The scenario: 5,000 temperature/humidity sensors across 10 floors of a commercial office building, publishing readings every 30 seconds. Requirements: real-time anomaly alerts, 2-year historical data retention, live dashboard for facilities management, OTA firmware update capability.

Architecture Flow:

  1. Devices → MQTT over TLS to IoT Core Device Gateway (X.509 mutual auth)
  2. IoT Core Rules Engine → routes all telemetry to Amazon Kinesis Data Streams (2 shards; each shard accepts up to 1,000 records/sec or 1 MB/sec, so 2,000 records/sec total against a load of about 167 records/sec)
  3. High-temp rule → triggers Lambda directly for immediate SNS alert (SMS/email to facilities team)
  4. Amazon Data Firehose (formerly Kinesis Data Firehose) → buffers 60-second windows → writes to S3 in Parquet format (Snappy compressed)
  5. Lambda (Kinesis trigger) → validates, enriches (adds building metadata from DynamoDB lookup), writes to DynamoDB time-series table
  6. Amazon Timestream → connected to Kinesis via Lambda for time-series analytics
  7. Amazon QuickSight → live dashboard queries Timestream for current readings; Athena on S3 for historical reports
  8. IoT Jobs → fleet-wide firmware rollout (exponential rollout, auto-abort on 10% failure rate)
  9. Device Shadow → cloud-side reporting interval control (facilities team can slow down sensors during off-hours)
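
The stream sizing in step 2 can be checked with a short calculation. This sketch uses the per-shard write limits of 1,000 records/sec and 1 MB/sec; the 256-byte payload size and the 2x headroom factor are assumptions chosen for illustration, not figures from the scenario.

```python
import math

SHARD_RECORDS_PER_SEC = 1_000    # per-shard write limit (records)
SHARD_BYTES_PER_SEC = 1_000_000  # per-shard write limit (bytes, 1 MB/sec)


def shards_needed(devices: int, interval_s: float, payload_bytes: int,
                  headroom: float = 2.0) -> int:
    """Estimate Kinesis shards for a fleet publishing at a fixed interval."""
    records_per_sec = devices / interval_s
    bytes_per_sec = records_per_sec * payload_bytes
    by_records = records_per_sec * headroom / SHARD_RECORDS_PER_SEC
    by_bytes = bytes_per_sec * headroom / SHARD_BYTES_PER_SEC
    # Whichever limit is hit first determines the shard count.
    return max(1, math.ceil(max(by_records, by_bytes)))


print(shards_needed(5_000, 30, 256))    # the smart-building fleet
print(shards_needed(500_000, 30, 256))  # the same design at 100x the devices
```

For the 5,000-sensor fleet a single shard already covers the load with headroom, so the second shard in this design mainly buys burst capacity and room to grow.
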
"""
Lambda function: Kinesis trigger → validate → enrich → write to DynamoDB + Timestream
"""
import base64
import boto3
import json
import time
from datetime import datetime, timezone

dynamodb = boto3.resource('dynamodb')
timestream = boto3.client('timestream-write')
sensor_table = dynamodb.Table('SensorReadings')
metadata_table = dynamodb.Table('DeviceMetadata')

# Cache device metadata to reduce DynamoDB lookups
_metadata_cache = {}

def get_device_metadata(device_id):
    if device_id not in _metadata_cache:
        resp = metadata_table.get_item(Key={'device_id': device_id})
        _metadata_cache[device_id] = resp.get('Item', {})
    return _metadata_cache[device_id]


def handler(event, kinesis_context):
    records_to_write = []
    timestream_records = []

    for record in event['Records']:
        try:
            payload = json.loads(base64.b64decode(record['kinesis']['data']))
        except Exception as e:
            print(f"[ERROR] Failed to decode record: {e}")
            continue

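        device_id = payload.get('device_id')
        if not device_id:
            continue

        # Normalize the timestamp to integer epoch seconds. boto3 rejects Python
        # floats for DynamoDB numbers, and Timestream expects an integer time string.
        try:
            payload['timestamp'] = int(payload.get('timestamp') or time.time())
        except (TypeError, ValueError):
            print(f"[ERROR] Bad timestamp from {device_id}; skipping record")
            continue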

        meta = get_device_metadata(device_id)

        # Enrich with building metadata
        enriched = {
            **payload,
            'building': meta.get('building', 'unknown'),
            'floor': meta.get('floor', 'unknown'),
            'room': meta.get('room', 'unknown'),
            'ingested_at': int(time.time()),
            'partition_date': datetime.fromtimestamp(
                payload.get('timestamp', time.time()), tz=timezone.utc
            ).strftime('%Y-%m-%d'),
        }

        records_to_write.append(enriched)

        # Timestream record
        timestream_records.append({
            'Dimensions': [
                {'Name': 'device_id', 'Value': device_id},
                {'Name': 'building', 'Value': enriched['building']},
                {'Name': 'floor', 'Value': str(enriched['floor'])},
            ],
            'MeasureName': 'temperature_c',
            'MeasureValue': str(payload.get('temperature_c', 0)),
            'MeasureValueType': 'DOUBLE',
            'Time': str(payload.get('timestamp', int(time.time())) * 1000),
            'TimeUnit': 'MILLISECONDS',
        })

    # Batch write to DynamoDB
    if records_to_write:
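        # overwrite_by_pkeys de-duplicates items that share a key within one batch,
        # which BatchWriteItem would otherwise reject.
        with sensor_table.batch_writer(overwrite_by_pkeys=['device_id', 'timestamp']) as batch: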
            for item in records_to_write:
                batch.put_item(Item={
                    'device_id': item['device_id'],
                    'timestamp': item['timestamp'],
                    'temperature_c': str(item['temperature_c']),
                    'humidity_pct': str(item.get('humidity_pct', 0)),
                    'floor': item['floor'],
                    'building': item['building'],
                    'ttl': int(time.time()) + (2 * 365 * 24 * 3600),  # 2-year TTL
                })

    # Write to Timestream in batches of 100
    for i in range(0, len(timestream_records), 100):
        batch = timestream_records[i:i+100]
        try:
            timestream.write_records(
                DatabaseName='building-iot',
                TableName='sensor-readings',
                Records=batch,
            )
        except timestream.exceptions.RejectedRecordsException as e:
            print(f"[TIMESTREAM] Rejected {len(e.response['RejectedRecords'])} records")

    print(f"[PIPELINE] Processed {len(records_to_write)} records.")
    return {'statusCode': 200}

# AWS CLI: DynamoDB table for sensor readings
# (deploy with CDK or SAM in production)
aws dynamodb create-table \
  --table-name SensorReadings \
  --attribute-definitions \
    AttributeName=device_id,AttributeType=S \
    AttributeName=timestamp,AttributeType=N \
  --key-schema \
    AttributeName=device_id,KeyType=HASH \
    AttributeName=timestamp,KeyType=RANGE \
  --billing-mode PAY_PER_REQUEST \
  --stream-specification StreamEnabled=false

# create-table has no TTL option; enable TTL once the table is active
aws dynamodb wait table-exists --table-name SensorReadings
aws dynamodb update-time-to-live \
  --table-name SensorReadings \
  --time-to-live-specification "Enabled=true, AttributeName=ttl"

# Create Timestream database and table
aws timestream-write create-database \
  --database-name building-iot

aws timestream-write create-table \
  --database-name building-iot \
  --table-name sensor-readings \
  --retention-properties '{
    "MemoryStoreRetentionPeriodInHours": 24,
    "MagneticStoreRetentionPeriodInDays": 730
  }'
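
Cost Estimate for 5,000 Sensors at 30s Intervals: Messages per month = 5,000 × 2 × 60 × 24 × 30 = 432 million messages (2 per minute per device). At $1 per million messages, metered in 5 KB increments: ~$432/month for messaging. Kinesis (2 shards): ~$29/month. DynamoDB (on-demand): the workload is about 432 million writes per month rather than reads, estimated here at $50-100/month; this line is the most sensitive to current per-write pricing, so verify it before budgeting. Timestream: ~$30/month. The itemized lines sum to roughly $540-590; allowing for charges not itemized (connection minutes, shadow operations, Firehose, and S3), budget approximately $600-700/month, or about $0.12-0.14 per device per month.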
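
The arithmetic behind this estimate can be reproduced in a few lines. The sketch below uses only the figures quoted above (the $1 per million messaging rate and the per-service monthly estimates); those are the article's numbers, not live prices, and the function name is hypothetical.

```python
def monthly_messages(devices: int, interval_s: int, days: int = 30) -> int:
    """Messages published per month by a fleet reporting at a fixed interval."""
    per_device_per_day = (24 * 3600) // interval_s
    return devices * per_device_per_day * days


messages = monthly_messages(5_000, 30)
messaging_usd = messages / 1_000_000 * 1.00  # $1 per million messages, as quoted above

# Line items as quoted in the estimate: (low, high) in USD per month.
line_items = {
    "messaging": (messaging_usd, messaging_usd),
    "kinesis": (29, 29),
    "dynamodb": (50, 100),
    "timestream": (30, 30),
}
low = sum(lo for lo, _ in line_items.values())
high = sum(hi for _, hi in line_items.values())

print(f"{messages:,} messages/month")
print(f"itemized total: ${low:.0f}-${high:.0f}/month")
print(f"per device at $600-700 total: ${600 / 5_000:.2f}-${700 / 5_000:.2f}")
```

Messaging dominates the bill, so changing the reporting interval (for example through the Device Shadow control in step 9) is the most direct cost lever.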

Frequently Asked Questions

What is the difference between IoT Core and IoT Greengrass?

IoT Core is the cloud-side managed service: it runs entirely in AWS and handles device connections, message routing, device shadows, and job management. Greengrass v2 is software that runs on your local hardware (a gateway device). Devices attached to a Greengrass core communicate over the local network, so messages avoid the round trip to the cloud; the core also performs local processing and syncs with IoT Core. Use IoT Core for cloud connectivity; add Greengrass when you need offline capability, local inference, or low-latency local control loops.

Can IoT Core handle devices that connect over cellular (LTE-M, NB-IoT)?

Yes. Any device that can open a TCP connection and complete a TLS handshake can use IoT Core. Cellular-connected devices typically use MQTT over TLS on port 8883 or MQTT over WebSocket on port 443 (to traverse carrier firewalls). For extremely constrained cellular devices, use MQTT QoS 0 to minimize retransmission overhead, set MQTT keep-alive to 1,200 seconds (the maximum IoT Core accepts) to reduce control packet frequency, and use batch messages (multiple readings in one publish) to amortize connection overhead.
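
Batching can be sketched as follows. The function groups readings into compact JSON arrays that each stay within a size budget; the 5 KB default matches the metering increment mentioned in the cost estimate, while the function name and the reading fields (`t`, `ts`) are hypothetical. A single reading larger than the budget is still emitted on its own.

```python
import json

MAX_PAYLOAD_BYTES = 5 * 1024  # one 5 KB metering increment


def encode(readings: list) -> bytes:
    """Serialize readings as compact JSON to save bytes on the air."""
    return json.dumps(readings, separators=(",", ":")).encode()


def batch_readings(readings: list, max_bytes: int = MAX_PAYLOAD_BYTES) -> list:
    """Group readings into JSON array payloads that each fit within max_bytes."""
    payloads, current = [], []
    for reading in readings:
        candidate = current + [reading]
        if len(encode(candidate)) > max_bytes and current:
            # Adding this reading would overflow: flush and start a new batch.
            payloads.append(encode(current))
            current = [reading]
        else:
            current = candidate
    if current:
        payloads.append(encode(current))
    return payloads


readings = [{"t": 22.5, "ts": 1_700_000_000 + i * 30} for i in range(1_000)]
payloads = batch_readings(readings)
print(len(payloads), "publishes instead of", len(readings))
```

Each payload would then be sent with one MQTT publish, so the device pays the connection and header overhead once per batch rather than once per reading.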

How does IoT Core handle device disconnections and reconnections?

IoT Core tracks connection state via the Device Gateway. When a device disconnects, IoT Core publishes a lifecycle event to $aws/events/presence/disconnected/{clientId}. The Device Shadow's reported state remains in place from the last update; it does not clear on disconnect. When the device reconnects, it should publish to its shadow's get topic ($aws/things/{thingName}/shadow/get) to retrieve any delta that accumulated while offline and apply pending desired state changes. Use MQTT cleanSession=false (persistent sessions) to receive QoS 1 messages that arrived while the device was offline. Persistent sessions expire after 1 hour of disconnection by default; the expiry can be raised to as much as 7 days through a limit increase.
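
The reconciliation step on reconnect amounts to computing which desired values differ from what the device last reported. The Shadow service computes this delta itself and includes it in its responses; the sketch below only mirrors the idea on the device side, for a client that has fetched the full shadow document. The function name and the sample state keys are hypothetical.

```python
def shadow_delta(desired: dict, reported: dict) -> dict:
    """Return the desired values that differ from reported, recursing into objects."""
    delta = {}
    for key, want in desired.items():
        have = reported.get(key)
        if isinstance(want, dict) and isinstance(have, dict):
            nested = shadow_delta(want, have)
            if nested:
                delta[key] = nested
        elif want != have:
            delta[key] = want
    return delta


desired = {"interval_s": 60, "led": {"on": True, "color": "red"}}
reported = {"interval_s": 30, "led": {"on": True, "color": "blue"}, "fw": "2.1.0"}
print(shadow_delta(desired, reported))
```

After applying each changed setting, the device publishes its new reported state so the delta clears.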

What's the maximum throughput per IoT Core account?

Default limits (they vary by Region): 20,000 publish requests/second per account, 2,000 shadow operations/second, 500 MQTT connections/second. Most are adjustable quotas: request an increase through Service Quotas or an AWS support case. For very large deployments (10M+ devices), a common pattern is to partition the fleet yourself: provision multiple accounts (or Regions) and use a fleet management layer to distribute devices across them. Multi-account registration lets the same device certificate be registered in more than one account, which simplifies moving devices between partitions.
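
One simple way a fleet management layer could distribute devices is a stable hash of the device ID. This is a sketch of that idea under stated assumptions, not an AWS feature: the partition labels are hypothetical, and plain modulo hashing reassigns most devices whenever the number of partitions changes, so a production system would more likely use consistent hashing or an explicit assignment table.

```python
import hashlib


def assign_partition(device_id: str, partitions: list[str]) -> str:
    """Map a device ID to one partition deterministically."""
    digest = hashlib.sha256(device_id.encode()).digest()
    index = int.from_bytes(digest[:8], "big") % len(partitions)
    return partitions[index]


# Hypothetical partition labels, e.g. one per AWS account.
PARTITIONS = ["iot-account-a", "iot-account-b", "iot-account-c"]

counts = {name: 0 for name in PARTITIONS}
for n in range(3_000):
    counts[assign_partition(f"sensor-{n:05d}", PARTITIONS)] += 1
print(counts)
```

Because the mapping depends only on the device ID, provisioning tools and cloud applications can both compute a device's partition without a shared lookup service.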

How do I migrate from a third-party MQTT broker to IoT Core?

The MQTT protocol is standard, so the device firmware changes are minimal — update the broker endpoint, port, and authentication method (add X.509 certificate loading). For the infrastructure side: (1) create Things and certificates for all devices (use Fleet Provisioning if devices support it), (2) create IoT Policies matching your existing topic structure, (3) update the endpoint in your device configuration (often a firmware update), (4) set up IoT Rules to replicate any existing broker-side routing. Test with a small cohort using a parallel topic namespace before cutting over production devices.