Boto3: AWS SDK for Python — Complete Guide (2026)

Published June 6, 2026 • 15 min read

Boto3 is the official AWS SDK for Python. It lets you interact with virtually every AWS service from Python code — S3 buckets, EC2 instances, Lambda functions, DynamoDB tables, SQS queues, and hundreds more. This guide covers the practical patterns you'll use every day: the client vs. resource distinction, credential management, the most-used services, paginators for large datasets, and proper error handling.

Installation and Credential Chain

pip install boto3 botocore

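Boto3 checks the following sources in order and uses the first one that supplies credentials (a simplified view of the full chain):

  1. Explicit parameters (aws_access_key_id= and related arguments) passed to a client or Session: suited to temporary credentials such as an assumed role; never hardcode long-lived keys in source code
  2. Environment variables: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN (this is also how a Lambda execution role reaches your code)
  3. Shared credentials file (~/.aws/credentials, written by aws configure)
  4. AWS config file (~/.aws/config)
  5. Container and instance credentials: ECS task role, then EC2 instance profile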
import boto3

# Default session — uses credential chain
s3 = boto3.client('s3', region_name='us-east-1')

# Named profile (local dev with multiple accounts)
session = boto3.Session(profile_name='dev-account', region_name='us-west-2')
s3 = session.client('s3')

# Assume a role (cross-account access)
sts = boto3.client('sts')
creds = sts.assume_role(
    RoleArn='arn:aws:iam::123456789012:role/CrossAccountRole',
    RoleSessionName='my-session',
)['Credentials']

cross_account_s3 = boto3.client(
    's3',
    aws_access_key_id     = creds['AccessKeyId'],
    aws_secret_access_key = creds['SecretAccessKey'],
    aws_session_token     = creds['SessionToken'],
)
Note: On EC2, ECS, and Lambda, always use IAM roles and never hardcode credentials. Boto3 fetches role credentials from the platform and refreshes them automatically before they expire. Keys committed to source code are a frequent cause of AWS account compromise.
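The cross-account pattern above can be wrapped in a small helper. The following is a minimal sketch rather than anything Boto3 ships: `assumed_role_client_kwargs` is a hypothetical name, and the STS client is passed in as an argument (for example `boto3.client('sts')`) so the function can be tried without network access.

```python
from typing import Any


def assumed_role_client_kwargs(sts_client: Any, role_arn: str, session_name: str) -> dict[str, str]:
    """Call sts.assume_role and map the result to boto3.client() keyword arguments."""
    creds = sts_client.assume_role(
        RoleArn=role_arn,
        RoleSessionName=session_name,
    )['Credentials']
    return {
        'aws_access_key_id': creds['AccessKeyId'],
        'aws_secret_access_key': creds['SecretAccessKey'],
        'aws_session_token': creds['SessionToken'],
    }


# Usage (assumes boto3 is installed and the caller is allowed to assume the role):
#   import boto3
#   kwargs = assumed_role_client_kwargs(boto3.client('sts'), role_arn, 'my-session')
#   cross_account_s3 = boto3.client('s3', **kwargs)
```

Credentials returned by assume_role are temporary (one hour by default), so a long-running process built this way has to call the helper again before they expire.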

Client vs Resource

| Aspect      | Client                                     | Resource                                  |
| ----------- | ------------------------------------------ | ----------------------------------------- |
| Interface   | Low-level, maps 1:1 to AWS API calls       | High-level, object-oriented wrapper       |
| Return type | Raw dicts (response['Body'])               | Python objects (bucket.objects)           |
| Coverage    | All services                               | S3, EC2, DynamoDB, IAM, SQS, SNS, Glacier |
| Pagination  | Manual or via paginator                    | Often built-in (e.g., .all())             |
| When to use | Precise control, services without Resource | Simpler code for supported services       |

import boto3

# Client approach — explicit, verbose
client = boto3.client('s3')
response = client.list_objects_v2(Bucket='my-bucket', Prefix='images/')
for obj in response.get('Contents', []):
    print(obj['Key'], obj['Size'])

# Resource approach — more Pythonic
s3 = boto3.resource('s3')
bucket = s3.Bucket('my-bucket')
for obj in bucket.objects.filter(Prefix='images/'):
    print(obj.key, obj.size)
Pro Tip: The Resource API is being maintained but not extended to new services. For new code, use the Client API with paginators — it gives you the same level of control and works with every AWS service.

S3 — Upload, Download, Presigned URLs

import boto3

s3 = boto3.client('s3', region_name='us-east-1')
BUCKET = 'my-app-bucket'

# Upload a file
def upload_file(local_path: str, s3_key: str, content_type: str = 'application/octet-stream') -> str:
    s3.upload_file(
        Filename     = local_path,
        Bucket       = BUCKET,
        Key          = s3_key,
        ExtraArgs    = {
            'ContentType': content_type,
            'ServerSideEncryption': 'AES256',
        },
    )
    return f's3://{BUCKET}/{s3_key}'

# Upload from an in-memory buffer
def upload_bytes(data: bytes, s3_key: str, content_type: str) -> None:
    s3.put_object(Bucket=BUCKET, Key=s3_key, Body=data,
                  ContentType=content_type, ServerSideEncryption='AES256')

# Download a file
def download_file(s3_key: str, local_path: str) -> None:
    s3.download_file(Bucket=BUCKET, Key=s3_key, Filename=local_path)

# Download to memory
def read_s3_file(s3_key: str) -> bytes:
    response = s3.get_object(Bucket=BUCKET, Key=s3_key)
    return response['Body'].read()

# Generate a presigned URL (time-limited, no auth required)
def create_presigned_url(s3_key: str, expiry_seconds: int = 3600) -> str:
    return s3.generate_presigned_url(
        'get_object',
        Params={'Bucket': BUCKET, 'Key': s3_key},
        ExpiresIn=expiry_seconds,
    )

# Presigned POST for direct browser-to-S3 uploads
def create_presigned_post(s3_key: str, max_size_mb: int = 10) -> dict:
    return s3.generate_presigned_post(
        Bucket     = BUCKET,
        Key        = s3_key,
        Conditions = [
            ['content-length-range', 1, max_size_mb * 1024 * 1024],
            ['starts-with', '$Content-Type', 'image/'],
        ],
        ExpiresIn  = 600,
    )
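
The upload helper above falls back to application/octet-stream when no content type is given, which makes browsers download a file instead of displaying it. One possible way to choose a better value is the standard library's mimetypes module. This is a sketch, and `guess_content_type` is a hypothetical helper name, not a Boto3 function.

```python
import mimetypes


def guess_content_type(path: str, default: str = 'application/octet-stream') -> str:
    """Return a MIME type inferred from the file extension, or the default."""
    content_type, _encoding = mimetypes.guess_type(path)
    return content_type or default
```

The result can be passed as the content_type argument of the upload helper. The guess is based on the file name only, so it should not be trusted for validating user uploads.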

EC2 — Start, Stop, Describe

import boto3

ec2 = boto3.client('ec2', region_name='us-east-1')

def describe_instances(tag_name: str | None = None) -> list[dict]:
    filters = []
    if tag_name:
        filters.append({'Name': 'tag:Name', 'Values': [tag_name]})
    filters.append({'Name': 'instance-state-name', 'Values': ['running', 'stopped']})
    response = ec2.describe_instances(Filters=filters)
    instances = []
    for reservation in response['Reservations']:
        for inst in reservation['Instances']:
            name = next((t['Value'] for t in inst.get('Tags', []) if t['Key'] == 'Name'), 'N/A')
            instances.append({
                'id'    : inst['InstanceId'],
                'type'  : inst['InstanceType'],
                'state' : inst['State']['Name'],
                'name'  : name,
                'az'    : inst['Placement']['AvailabilityZone'],
            })
    return instances

def start_instances(instance_ids: list[str]) -> dict:
    response = ec2.start_instances(InstanceIds=instance_ids)
    return {i['InstanceId']: i['CurrentState']['Name']
            for i in response['StartingInstances']}

def stop_instances(instance_ids: list[str]) -> dict:
    response = ec2.stop_instances(InstanceIds=instance_ids)
    return {i['InstanceId']: i['CurrentState']['Name']
            for i in response['StoppingInstances']}

def wait_until_running(instance_id: str) -> None:
    waiter = ec2.get_waiter('instance_running')
    waiter.wait(InstanceIds=[instance_id],
                WaiterConfig={'Delay': 5, 'MaxAttempts': 40})

Lambda — Synchronous and Async Invocation

import boto3
import json

lambda_client = boto3.client('lambda', region_name='us-east-1')

def invoke_sync(function_name: str, payload: dict) -> dict:
    """Synchronous invocation — waits for the function to complete."""
    response = lambda_client.invoke(
        FunctionName   = function_name,
        InvocationType = 'RequestResponse',   # synchronous
        Payload        = json.dumps(payload).encode(),
    )
    body = json.loads(response['Payload'].read())
    if response.get('FunctionError'):
        raise RuntimeError(f"Lambda error: {body.get('errorMessage')}")
    return body

def invoke_async(function_name: str, payload: dict) -> int:
    """Async invocation — fire and forget, returns status code 202."""
    response = lambda_client.invoke(
        FunctionName   = function_name,
        InvocationType = 'Event',    # asynchronous
        Payload        = json.dumps(payload).encode(),
    )
    return response['StatusCode']   # 202 = accepted

# Example usage
result = invoke_sync('process-image', {'bucket': 'uploads', 'key': 'photo.jpg'})
print(result)  # {'statusCode': 200, 'body': '{"processed": true}'}
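
The example output above has an HTTP-style shape: a statusCode plus a body that is itself a JSON string. Assuming the invoked function follows that convention (not every Lambda function does), a small decoder saves callers from parsing the body by hand. `decode_proxy_body` is a hypothetical helper name.

```python
import json
from typing import Any


def decode_proxy_body(result: dict[str, Any]) -> Any:
    """Parse the JSON string in result['body']; raise if statusCode signals failure."""
    status = result.get('statusCode', 200)
    if status >= 400:
        raise RuntimeError(f"Function returned HTTP {status}: {result.get('body')}")
    body = result.get('body')
    # Some functions return the body already decoded; pass those through unchanged.
    return json.loads(body) if isinstance(body, str) else body
```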

DynamoDB — CRUD Operations

import boto3
from boto3.dynamodb.conditions import Key, Attr
from decimal import Decimal

dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
table    = dynamodb.Table('Users')

# Create
table.put_item(Item={
    'pk'        : 'USER#alice',
    'sk'        : 'PROFILE',
    'username'  : 'alice',
    'email'     : 'alice@example.com',
    'score'     : Decimal('42.5'),
    'active'    : True,
})

# Read
response = table.get_item(Key={'pk': 'USER#alice', 'sk': 'PROFILE'})
user = response.get('Item')

# Query (uses partition key)
response = table.query(
    KeyConditionExpression=Key('pk').eq('USER#alice') & Key('sk').begins_with('POST#')
)
posts = response['Items']

# Update
table.update_item(
    Key={'pk': 'USER#alice', 'sk': 'PROFILE'},
    UpdateExpression='SET score = score + :inc, #st = :status',
    ExpressionAttributeValues={':inc': Decimal('1'), ':status': 'premium'},
    ExpressionAttributeNames={'#st': 'status'},  # 'status' is a reserved word
    ReturnValues='UPDATED_NEW',
)

# Delete
table.delete_item(Key={'pk': 'USER#alice', 'sk': 'PROFILE'})

# Conditional write (create only if the item does not already exist)
try:
    table.put_item(
        Item={'pk': 'USER#bob', 'sk': 'PROFILE', 'email': 'bob@example.com'},
        ConditionExpression=Attr('pk').not_exists(),  # only if doesn't exist
    )
except dynamodb.meta.client.exceptions.ConditionalCheckFailedException:
    print("User already exists")
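
The put_item example stores the score as Decimal('42.5') because the DynamoDB resource layer rejects Python floats. When items come from JSON or other code that produces floats, they need converting first. Below is one way to do that, as a sketch: `to_dynamodb` is a hypothetical helper name, and it only handles dicts, lists, and floats.

```python
from decimal import Decimal
from typing import Any


def to_dynamodb(value: Any) -> Any:
    """Recursively replace floats with Decimal so the item can be passed to put_item."""
    if isinstance(value, float):
        # Going through str() avoids binary floating-point noise in the stored number.
        return Decimal(str(value))
    if isinstance(value, dict):
        return {k: to_dynamodb(v) for k, v in value.items()}
    if isinstance(value, list):
        return [to_dynamodb(v) for v in value]
    return value
```

Usage would look like `table.put_item(Item=to_dynamodb(item))`. Integers, booleans, and strings pass through unchanged.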

SQS — Send, Receive, Delete

import boto3
import json

sqs = boto3.client('sqs', region_name='us-east-1')
QUEUE_URL = 'https://sqs.us-east-1.amazonaws.com/123456789012/my-queue'

# Send a message
def send_message(body: dict, delay_seconds: int = 0) -> str:
    response = sqs.send_message(
        QueueUrl     = QUEUE_URL,
        MessageBody  = json.dumps(body),
        DelaySeconds = delay_seconds,
        MessageAttributes={
            'source': {'DataType': 'String', 'StringValue': 'web-api'},
        },
    )
    return response['MessageId']

# Send a batch (SQS accepts at most 10 messages per call; callers must split larger lists)
def send_batch(messages: list[dict]) -> None:
    entries = [
        {'Id': str(i), 'MessageBody': json.dumps(msg)}
        for i, msg in enumerate(messages)
    ]
    sqs.send_message_batch(QueueUrl=QUEUE_URL, Entries=entries)

# Receive and process messages
def process_messages(max_count: int = 10) -> int:
    response = sqs.receive_message(
        QueueUrl            = QUEUE_URL,
        MaxNumberOfMessages = max_count,
        WaitTimeSeconds     = 20,   # long polling — reduces empty receives
        VisibilityTimeout   = 60,   # hide message for 60s while processing
    )
    processed = 0
    for msg in response.get('Messages', []):
        body = json.loads(msg['Body'])
        try:
            # handle_event is a placeholder for your application's own handler
            handle_event(body)
            # Delete only after successful processing
            sqs.delete_message(
                QueueUrl      = QUEUE_URL,
                ReceiptHandle = msg['ReceiptHandle'],
            )
            processed += 1
        except Exception as e:
            # Let message become visible again after VisibilityTimeout
            print(f"Failed to process {msg['MessageId']}: {e}")
    return processed
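
Because send_message_batch accepts at most 10 entries and can partially fail, bulk senders usually need to split the input and inspect the Failed list in each response. The sketch below shows one way to do both. `chunked` and `send_in_batches` are hypothetical names, and the client is passed in as a parameter (for example `boto3.client('sqs')`).

```python
import json
from typing import Any, Iterator


def chunked(items: list, size: int = 10) -> Iterator[list]:
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def send_in_batches(sqs_client: Any, queue_url: str, messages: list[dict]) -> list[str]:
    """Send messages in groups of up to 10 and return the entry Ids SQS reported as failed."""
    failed_ids: list[str] = []
    for batch_no, batch in enumerate(chunked(messages, 10)):
        entries = [
            # Ids only need to be unique within one request; the batch number aids debugging.
            {'Id': f'{batch_no}-{i}', 'MessageBody': json.dumps(msg)}
            for i, msg in enumerate(batch)
        ]
        response = sqs_client.send_message_batch(QueueUrl=queue_url, Entries=entries)
        failed_ids.extend(failure['Id'] for failure in response.get('Failed', []))
    return failed_ids
```

Returning the failed Ids rather than raising leaves the retry policy to the caller.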

Secrets Manager

import boto3
import json
from functools import lru_cache

secrets_client = boto3.client('secretsmanager', region_name='us-east-1')

@lru_cache(maxsize=None)
def get_secret(secret_name: str) -> dict:
    """Fetch and cache a secret. Cache is per-process lifetime."""
    response = secrets_client.get_secret_value(SecretId=secret_name)
    secret = response.get('SecretString') or response.get('SecretBinary')
    try:
        return json.loads(secret)
    except (json.JSONDecodeError, TypeError):
        return {'value': secret}

# Usage
db_creds = get_secret('prod/database/postgres')
conn_str = f"postgresql://{db_creds['username']}:{db_creds['password']}@{db_creds['host']}/mydb"
Pro Tip: Cache secrets at startup (or with @lru_cache) rather than fetching them on every request — Secrets Manager has rate limits and charges per API call. For Lambda, cache in a module-level variable outside the handler function.
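
One caveat with @lru_cache is that entries never expire, so a rotated secret is not picked up until the process restarts. A time-based cache is a common alternative. The following is a minimal sketch, not a Boto3 feature: `TTLCache` is a hypothetical class, the fetch function is supplied by the caller, and the class is not thread-safe.

```python
import time
from typing import Any, Callable


class TTLCache:
    """Cache values from a fetch function and refetch once they are older than ttl_seconds."""

    def __init__(
        self,
        fetch: Callable[[str], Any],
        ttl_seconds: float = 300.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fetch = fetch
        self._ttl = ttl_seconds
        self._clock = clock  # injectable so tests can control time
        self._entries: dict[str, tuple[float, Any]] = {}

    def get(self, name: str) -> Any:
        now = self._clock()
        entry = self._entries.get(name)
        if entry is not None and now - entry[0] < self._ttl:
            return entry[1]  # still fresh
        value = self._fetch(name)
        self._entries[name] = (now, value)
        return value


# Usage (fetch_secret is a hypothetical uncached function that calls get_secret_value):
#   secrets = TTLCache(fetch_secret, ttl_seconds=300)
#   db_creds = secrets.get('prod/database/postgres')
```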

Paginators for Large Datasets

import boto3

s3 = boto3.client('s3')

def list_all_objects(bucket: str, prefix: str = '') -> list[dict]:
    """list_objects_v2 returns max 1,000 objects. Paginator handles multi-page."""
    paginator = s3.get_paginator('list_objects_v2')
    pages = paginator.paginate(Bucket=bucket, Prefix=prefix)
    objects = []
    for page in pages:
        objects.extend(page.get('Contents', []))
    return objects

# EC2 describe_instances paginator
ec2 = boto3.client('ec2')
def list_all_instances() -> list[dict]:
    paginator = ec2.get_paginator('describe_instances')
    instances = []
    for page in paginator.paginate():
        for reservation in page['Reservations']:
            instances.extend(reservation['Instances'])
    return instances

# Filter paginators on the fly
def list_large_objects(bucket: str, min_size_mb: int = 100) -> list[dict]:
    paginator = s3.get_paginator('list_objects_v2')
    pages = paginator.paginate(Bucket=bucket, PaginationConfig={'PageSize': 1000})
    return [
        obj for page in pages
        for obj in page.get('Contents', [])
        if obj['Size'] > min_size_mb * 1024 * 1024
    ]
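
The functions above collect every result into a list, which can use a lot of memory for buckets with millions of objects. A generator keeps only one page in memory at a time. This is a sketch under the same assumptions as the code above: `iter_object_keys` is a hypothetical name, and the S3 client is passed in as a parameter.

```python
from typing import Any, Iterator


def iter_object_keys(s3_client: Any, bucket: str, prefix: str = '') -> Iterator[str]:
    """Yield object keys page by page instead of building one large list."""
    paginator = s3_client.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # An empty listing has no 'Contents' key at all.
        for obj in page.get('Contents', []):
            yield obj['Key']


# Usage (assumes boto3 is installed and credentials are configured):
#   import boto3
#   for key in iter_object_keys(boto3.client('s3'), 'my-bucket', 'images/'):
#       print(key)
```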

Error Handling

import boto3
from botocore.exceptions import ClientError, NoCredentialsError, EndpointResolutionError

s3 = boto3.client('s3')

def safe_get_object(bucket: str, key: str) -> bytes | None:
    try:
        response = s3.get_object(Bucket=bucket, Key=key)
        return response['Body'].read()
    except ClientError as e:
        code = e.response['Error']['Code']
        if code == 'NoSuchKey':
            return None
        elif code == 'NoSuchBucket':
            raise ValueError(f"Bucket {bucket!r} does not exist") from e
        elif code == 'AccessDenied':
            raise PermissionError(f"Access denied to s3://{bucket}/{key}") from e
        elif code == 'SlowDown':
            # S3's throttling error code; implement exponential backoff here
            raise
        else:
            raise   # re-raise unknown errors
    except NoCredentialsError:
        raise RuntimeError("AWS credentials not configured") from None

# Common S3 error codes
S3_ERROR_CODES = {
    'NoSuchKey'         : 'Object does not exist',
    'NoSuchBucket'      : 'Bucket does not exist',
    'AccessDenied'      : 'Insufficient IAM permissions',
    'BucketAlreadyExists': 'Bucket name already taken globally',
    'InvalidObjectState': 'Object in Glacier, needs restore',
    'SlowDown'          : 'Rate limit — back off and retry',
}
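
For throttling errors, the usual remedy is to retry with exponential backoff and jitter. Botocore already retries some failures on its own, so an application-level wrapper like the one sketched below is only needed when those built-in retries are exhausted. `retry_with_backoff` is a hypothetical helper, and the caller decides which exceptions count as retryable.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar('T')


def retry_with_backoff(
    operation: Callable[[], T],
    is_retryable: Callable[[Exception], bool],
    max_attempts: int = 5,
    base_delay: float = 0.5,
    max_delay: float = 20.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Run operation, retrying retryable failures with exponential backoff and full jitter."""
    for attempt in range(max_attempts):
        try:
            return operation()
        except Exception as exc:
            last_attempt = attempt == max_attempts - 1
            if last_attempt or not is_retryable(exc):
                raise
            # Full jitter: wait a random time up to an exponentially growing cap.
            cap = min(max_delay, base_delay * (2 ** attempt))
            sleep(random.uniform(0, cap))
    raise ValueError('max_attempts must be at least 1')


# Usage with a ClientError check (assumes botocore is installed):
#   def throttled(exc):
#       return getattr(exc, 'response', {}).get('Error', {}).get('Code') == 'SlowDown'
#   data = retry_with_backoff(lambda: s3.get_object(Bucket=b, Key=k)['Body'].read(), throttled)
```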

Frequently Asked Questions

How do I use Boto3 with IAM roles in Docker / Kubernetes?
On ECS, attach a task role via the task definition's taskRoleArn. On Kubernetes (EKS), use IRSA (IAM Roles for Service Accounts) and annotate the pod's service account with the role ARN. Boto3 then finds the credentials automatically: on ECS from the container credentials endpoint, and on EKS from the web identity token that IRSA injects into the pod. Never mount AWS credentials files into containers.
How do I mock Boto3 calls in tests?
Use the moto library, which intercepts Boto3's HTTP requests and simulates AWS services locally. Decorate your tests with @mock_aws (moto 5 and later; earlier versions used per-service decorators such as @mock_s3 and @mock_dynamodb). It requires no real AWS account and runs fast. Alternatively, use unittest.mock.patch to mock specific client methods.
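For the standard-library route, one lightweight variant is to pass the client into the function under test and hand it a MagicMock, which avoids patching module globals. The sketch below uses that dependency-injection style rather than mock.patch. `object_size` is a hypothetical function written for the example.

```python
from typing import Any
from unittest.mock import MagicMock


def object_size(s3_client: Any, bucket: str, key: str) -> int:
    """Return an object's size in bytes using head_object."""
    response = s3_client.head_object(Bucket=bucket, Key=key)
    return response['ContentLength']


def test_object_size() -> None:
    fake_s3 = MagicMock()
    fake_s3.head_object.return_value = {'ContentLength': 2048}

    assert object_size(fake_s3, 'my-bucket', 'images/a.png') == 2048
    fake_s3.head_object.assert_called_once_with(Bucket='my-bucket', Key='images/a.png')
```

A plain mock accepts any method name and any arguments, so it will not catch a misspelled parameter; moto or an integration test covers that gap.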
What is the difference between S3 Transfer Manager and raw put_object?
upload_file() and download_file() use the S3 Transfer Manager, which by default switches to multipart upload for files larger than 8 MB, uploads the parts concurrently, and retries failed parts. put_object() sends everything in a single request (S3 caps a single PUT at 5 GB) and is simpler for small files or in-memory data. As a rule of thumb, use put_object() for payloads of a few megabytes and upload_file()/upload_fileobj() for everything else.
How do I reduce DynamoDB costs?
Use on-demand capacity for unpredictable workloads and provisioned capacity for steady, predictable traffic. Use DynamoDB's TTL feature to auto-expire old items instead of running delete operations. Project only the attributes you need with ProjectionExpression to shrink responses, but note that read capacity is still charged on the full item size. Likewise, batch_write_item cuts network round trips for bulk loads, yet each item consumes the same write capacity as an individual write.
How do I handle SQS message visibility timeouts?
Set the visibility timeout comfortably above your expected processing time to avoid duplicate processing; when Lambda consumes the queue, AWS recommends at least 6× the function timeout. If a long-running task is still in progress when the timeout is about to expire, call change_message_visibility to extend it. Messages that are not deleted within the visibility timeout become visible again, so configure a Dead Letter Queue (DLQ) to capture messages that fail repeatedly.
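Extending the timeout is a single API call. The sketch below wraps it with a range check. `extend_visibility` is a hypothetical helper, and the SQS client is passed in as a parameter (for example `boto3.client('sqs')`).

```python
from typing import Any

MAX_VISIBILITY_SECONDS = 43200  # SQS allows at most 12 hours


def extend_visibility(sqs_client: Any, queue_url: str, receipt_handle: str, seconds: int) -> None:
    """Set the message's visibility timeout to `seconds`, counted from the time of this call."""
    if not 0 <= seconds <= MAX_VISIBILITY_SECONDS:
        raise ValueError(f'seconds must be between 0 and {MAX_VISIBILITY_SECONDS}')
    sqs_client.change_message_visibility(
        QueueUrl=queue_url,
        ReceiptHandle=receipt_handle,
        VisibilityTimeout=seconds,
    )
```

A worker would typically call this periodically while a long task runs, using the ReceiptHandle from the receive_message response.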