Boto3: AWS SDK for Python — Complete Guide (2026)
Published June 6, 2026 • 15 min read
Boto3 is the official AWS SDK for Python. It lets you interact with virtually every AWS service from Python code — S3 buckets, EC2 instances, Lambda functions, DynamoDB tables, SQS queues, and hundreds more. This guide covers the practical patterns you'll use every day: the client vs. resource distinction, credential management, the most-used services, paginators for large datasets, and proper error handling.
Installation and Credential Chain
pip install boto3 botocore
Boto3 checks the following credential sources in order and uses the first one that supplies credentials:
- Explicit parameters (aws_access_key_id=): only for local testing, never in source code
- Environment variables: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN
- ~/.aws/credentials file (aws configure)
- AWS config file (~/.aws/config)
- EC2 instance profile / ECS task role / Lambda execution role
import boto3
# Default session — uses credential chain
s3 = boto3.client('s3', region_name='us-east-1')
# Named profile (local dev with multiple accounts)
session = boto3.Session(profile_name='dev-account', region_name='us-west-2')
s3 = session.client('s3')
# Assume a role (cross-account access)
sts = boto3.client('sts')
creds = sts.assume_role(
    RoleArn='arn:aws:iam::123456789012:role/CrossAccountRole',
    RoleSessionName='my-session',
)['Credentials']
cross_account_s3 = boto3.client(
    's3',
    aws_access_key_id=creds['AccessKeyId'],
    aws_secret_access_key=creds['SecretAccessKey'],
    aws_session_token=creds['SessionToken'],
)
Note: On EC2, ECS, and Lambda, always use IAM roles and never hardcode credentials. The temporary credentials behind the instance or task role are rotated automatically, and Boto3 picks up the new ones without code changes. Hardcoded keys that leak through source control are a common cause of AWS account compromises.
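To see which of the sources listed above are present on a machine, a rough diagnostic along these lines can help. This is a sketch only: `visible_credential_sources` is a hypothetical helper, it inspects environment variables and files without calling Boto3, and it cannot detect an EC2 instance profile (that requires a metadata request).

```python
from __future__ import annotations

import os
from pathlib import Path
from typing import Mapping


def visible_credential_sources(
    env: Mapping[str, str] | None = None,
    home: Path | None = None,
) -> list[str]:
    """List credential sources that appear to be present, in lookup order.

    Hypothetical helper: presence of a file or variable does not prove
    that it holds valid credentials.
    """
    env = os.environ if env is None else env
    home = Path.home() if home is None else home
    found = []

    if env.get("AWS_ACCESS_KEY_ID") and env.get("AWS_SECRET_ACCESS_KEY"):
        found.append("environment variables")

    # AWS_SHARED_CREDENTIALS_FILE and AWS_CONFIG_FILE override the default paths.
    credentials_file = Path(
        env.get("AWS_SHARED_CREDENTIALS_FILE", home / ".aws" / "credentials")
    )
    if credentials_file.is_file():
        found.append("shared credentials file")

    config_file = Path(env.get("AWS_CONFIG_FILE", home / ".aws" / "config"))
    if config_file.is_file():
        found.append("config file")

    # ECS sets one of these variables to point at the task role endpoint.
    if env.get("AWS_CONTAINER_CREDENTIALS_RELATIVE_URI") or env.get(
        "AWS_CONTAINER_CREDENTIALS_FULL_URI"
    ):
        found.append("container credentials endpoint")

    return found
```

Calling `visible_credential_sources()` with no arguments checks the current process environment and home directory. Once a session exists, `boto3.Session().get_credentials()` returns the credentials Boto3 actually resolved (or `None`), which is the authoritative answer.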
Client vs Resource
| Aspect | Client | Resource |
|---|---|---|
| Interface | Low-level, maps 1:1 to AWS API calls | High-level, object-oriented wrapper |
| Return type | Raw dicts (response['Body']) | Python objects (bucket.objects) |
| Coverage | All services | A fixed subset, including S3, EC2, DynamoDB, IAM, SQS, SNS, Glacier, CloudFormation, and CloudWatch |
| Pagination | Manual or via paginator | Often built-in (e.g., .all()) |
| When to use | Precise control, services without Resource | Simpler code for supported services |
import boto3
# Client approach: explicit, verbose (a single call returns at most 1,000 keys)
client = boto3.client('s3')
response = client.list_objects_v2(Bucket='my-bucket', Prefix='images/')
for obj in response.get('Contents', []):
    print(obj['Key'], obj['Size'])

# Resource approach: more Pythonic, pages through results automatically
s3 = boto3.resource('s3')
bucket = s3.Bucket('my-bucket')
for obj in bucket.objects.filter(Prefix='images/'):
    print(obj.key, obj.size)
Pro Tip: The Resource API is being maintained but not extended to new services. For new code, use the Client API with paginators — it gives you the same level of control and works with every AWS service.
S3 — Upload, Download, Presigned URLs
import boto3

s3 = boto3.client('s3', region_name='us-east-1')
BUCKET = 'my-app-bucket'

# Upload a file
def upload_file(local_path: str, s3_key: str, content_type: str = 'application/octet-stream') -> str:
    s3.upload_file(
        Filename=local_path,
        Bucket=BUCKET,
        Key=s3_key,
        ExtraArgs={
            'ContentType': content_type,
            'ServerSideEncryption': 'AES256',
        },
    )
    return f's3://{BUCKET}/{s3_key}'

# Upload from an in-memory buffer
def upload_bytes(data: bytes, s3_key: str, content_type: str) -> None:
    s3.put_object(Bucket=BUCKET, Key=s3_key, Body=data,
                  ContentType=content_type, ServerSideEncryption='AES256')

# Download a file
def download_file(s3_key: str, local_path: str) -> None:
    s3.download_file(Bucket=BUCKET, Key=s3_key, Filename=local_path)

# Download to memory (reads the whole object, so use only for small objects)
def read_s3_file(s3_key: str) -> bytes:
    response = s3.get_object(Bucket=BUCKET, Key=s3_key)
    return response['Body'].read()

# Generate a presigned URL (time-limited, no auth required by the caller)
def create_presigned_url(s3_key: str, expiry_seconds: int = 3600) -> str:
    return s3.generate_presigned_url(
        'get_object',
        Params={'Bucket': BUCKET, 'Key': s3_key},
        ExpiresIn=expiry_seconds,
    )

# Presigned POST for direct browser-to-S3 uploads
def create_presigned_post(s3_key: str, max_size_mb: int = 10) -> dict:
    return s3.generate_presigned_post(
        Bucket=BUCKET,
        Key=s3_key,
        Conditions=[
            # Reject empty uploads and anything above max_size_mb
            ['content-length-range', 1, max_size_mb * 1024 * 1024],
            # The upload form must send a Content-Type starting with image/
            ['starts-with', '$Content-Type', 'image/'],
        ],
        ExpiresIn=600,
    )
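The `upload_file` helper above defaults to `application/octet-stream` when no content type is passed. One way to pick a better value is the standard library's `mimetypes` module; the sketch below assumes a hypothetical helper named `guess_content_type` and guesses from the file extension only, not from the file contents.

```python
import mimetypes


def guess_content_type(path: str, default: str = "application/octet-stream") -> str:
    """Guess a Content-Type from the file extension, falling back to `default`."""
    content_type, _encoding = mimetypes.guess_type(path)
    return content_type or default
```

It could then be passed through as the third argument, for example `upload_file('photo.jpg', 'images/photo.jpg', guess_content_type('photo.jpg'))`.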
EC2 — Start, Stop, Describe
import boto3

ec2 = boto3.client('ec2', region_name='us-east-1')

def describe_instances(tag_name: str | None = None) -> list[dict]:
    filters = []
    if tag_name:
        filters.append({'Name': 'tag:Name', 'Values': [tag_name]})
    filters.append({'Name': 'instance-state-name', 'Values': ['running', 'stopped']})
    # Single call: returns only the first page of results for large fleets
    response = ec2.describe_instances(Filters=filters)
    instances = []
    for reservation in response['Reservations']:
        for inst in reservation['Instances']:
            # The Name tag is optional, so fall back to 'N/A'
            name = next((t['Value'] for t in inst.get('Tags', []) if t['Key'] == 'Name'), 'N/A')
            instances.append({
                'id': inst['InstanceId'],
                'type': inst['InstanceType'],
                'state': inst['State']['Name'],
                'name': name,
                'az': inst['Placement']['AvailabilityZone'],
            })
    return instances

def start_instances(instance_ids: list[str]) -> dict:
    response = ec2.start_instances(InstanceIds=instance_ids)
    return {i['InstanceId']: i['CurrentState']['Name']
            for i in response['StartingInstances']}

def stop_instances(instance_ids: list[str]) -> dict:
    response = ec2.stop_instances(InstanceIds=instance_ids)
    return {i['InstanceId']: i['CurrentState']['Name']
            for i in response['StoppingInstances']}

def wait_until_running(instance_id: str) -> None:
    # Polls every 5 seconds, up to 40 times (about 200 seconds in total)
    waiter = ec2.get_waiter('instance_running')
    waiter.wait(InstanceIds=[instance_id],
                WaiterConfig={'Delay': 5, 'MaxAttempts': 40})
Lambda — Synchronous and Async Invocation
import boto3
import json

lambda_client = boto3.client('lambda', region_name='us-east-1')

def invoke_sync(function_name: str, payload: dict) -> dict:
    """Synchronous invocation: waits for the function to complete."""
    response = lambda_client.invoke(
        FunctionName=function_name,
        InvocationType='RequestResponse',  # synchronous
        Payload=json.dumps(payload).encode(),
    )
    body = json.loads(response['Payload'].read())
    # FunctionError is set when the function itself raised an exception
    if response.get('FunctionError'):
        raise RuntimeError(f"Lambda error: {body.get('errorMessage')}")
    return body

def invoke_async(function_name: str, payload: dict) -> int:
    """Async invocation: fire and forget, returns HTTP status code 202."""
    response = lambda_client.invoke(
        FunctionName=function_name,
        InvocationType='Event',  # asynchronous
        Payload=json.dumps(payload).encode(),
    )
    return response['StatusCode']  # 202 = accepted

# Example usage
result = invoke_sync('process-image', {'bucket': 'uploads', 'key': 'photo.jpg'})
print(result)  # e.g. {'statusCode': 200, 'body': '{"processed": true}'}
DynamoDB — CRUD Operations
import boto3
from boto3.dynamodb.conditions import Key, Attr
from decimal import Decimal

dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
table = dynamodb.Table('Users')

# Create (numbers must be Decimal, not float)
table.put_item(Item={
    'pk': 'USER#alice',
    'sk': 'PROFILE',
    'username': 'alice',
    'email': 'alice@example.com',
    'score': Decimal('42.5'),
    'active': True,
})

# Read
response = table.get_item(Key={'pk': 'USER#alice', 'sk': 'PROFILE'})
user = response.get('Item')  # None if the item does not exist

# Query (uses partition key, optionally narrowed by sort key)
response = table.query(
    KeyConditionExpression=Key('pk').eq('USER#alice') & Key('sk').begins_with('POST#')
)
posts = response['Items']

# Update
table.update_item(
    Key={'pk': 'USER#alice', 'sk': 'PROFILE'},
    UpdateExpression='SET score = score + :inc, #st = :status',
    ExpressionAttributeValues={':inc': Decimal('1'), ':status': 'premium'},
    ExpressionAttributeNames={'#st': 'status'},  # 'status' is a reserved word
    ReturnValues='UPDATED_NEW',
)

# Delete
table.delete_item(Key={'pk': 'USER#alice', 'sk': 'PROFILE'})

# Conditional write (create only if the item does not already exist)
try:
    table.put_item(
        Item={'pk': 'USER#bob', 'sk': 'PROFILE', 'email': 'bob@example.com'},
        ConditionExpression=Attr('pk').not_exists(),
    )
except dynamodb.meta.client.exceptions.ConditionalCheckFailedException:
    print("User already exists")
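The `table.query` call above reads a single page. A Query response is capped at 1 MB of data, and when more results remain it includes a `LastEvaluatedKey` that has to be passed back as `ExclusiveStartKey`. The loop below is a minimal sketch of that pattern; `query_all` is a hypothetical helper name, and `table` is assumed to be a Boto3 `Table` resource like the one above.

```python
from typing import Any, Iterator


def query_all(table: Any, **query_kwargs: Any) -> Iterator[dict]:
    """Yield every item matching a Query, following LastEvaluatedKey across pages."""
    kwargs = dict(query_kwargs)
    while True:
        response = table.query(**kwargs)
        yield from response.get("Items", [])
        last_key = response.get("LastEvaluatedKey")
        if not last_key:
            break  # no more pages
        # Resume the next request where this page stopped.
        kwargs["ExclusiveStartKey"] = last_key
```

Usage would look like `posts = list(query_all(table, KeyConditionExpression=Key('pk').eq('USER#alice')))`. Because it is a generator, callers can also stop early without fetching the remaining pages.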
SQS — Send, Receive, Delete
import boto3
import json

sqs = boto3.client('sqs', region_name='us-east-1')
QUEUE_URL = 'https://sqs.us-east-1.amazonaws.com/123456789012/my-queue'

# Send a message
def send_message(body: dict, delay_seconds: int = 0) -> str:
    response = sqs.send_message(
        QueueUrl=QUEUE_URL,
        MessageBody=json.dumps(body),
        DelaySeconds=delay_seconds,
        MessageAttributes={
            'source': {'DataType': 'String', 'StringValue': 'web-api'},
        },
    )
    return response['MessageId']

# Send a batch (SQS accepts at most 10 messages per call, so send in chunks)
def send_batch(messages: list[dict]) -> None:
    for start in range(0, len(messages), 10):
        entries = [
            {'Id': str(start + i), 'MessageBody': json.dumps(msg)}
            for i, msg in enumerate(messages[start:start + 10])
        ]
        response = sqs.send_message_batch(QueueUrl=QUEUE_URL, Entries=entries)
        # A successful call can still report per-message failures
        for failure in response.get('Failed', []):
            print(f"Failed to send entry {failure['Id']}: {failure['Code']}")

# Receive and process messages
def process_messages(max_count: int = 10) -> int:
    response = sqs.receive_message(
        QueueUrl=QUEUE_URL,
        MaxNumberOfMessages=max_count,  # 10 is the maximum SQS allows
        WaitTimeSeconds=20,             # long polling: reduces empty receives
        VisibilityTimeout=60,           # hide message for 60s while processing
    )
    processed = 0
    for msg in response.get('Messages', []):
        body = json.loads(msg['Body'])
        try:
            # handle_event is your application's handler (not defined here)
            handle_event(body)
            # Delete only after successful processing
            sqs.delete_message(
                QueueUrl=QUEUE_URL,
                ReceiptHandle=msg['ReceiptHandle'],
            )
            processed += 1
        except Exception as e:
            # Let message become visible again after VisibilityTimeout
            print(f"Failed to process {msg['MessageId']}: {e}")
    return processed
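If processing can outlast the 60-second `VisibilityTimeout` used above, the message reappears on the queue and may be handled twice. One option is to keep extending the timeout with `change_message_visibility` while work is in progress. The context manager below is a sketch under stated assumptions: `visibility_heartbeat` is a hypothetical name, the client is passed in explicitly, and errors raised inside the background thread are not handled.

```python
import threading
from contextlib import contextmanager
from typing import Any, Iterator


@contextmanager
def visibility_heartbeat(
    sqs_client: Any,
    queue_url: str,
    receipt_handle: str,
    timeout: int = 60,
    interval: float = 30.0,
) -> Iterator[None]:
    """Keep a message hidden while the body of the `with` block runs."""
    stop = threading.Event()

    def beat() -> None:
        # Event.wait returns False when `interval` elapses without a stop signal,
        # which is the cue to extend the timeout again.
        while not stop.wait(interval):
            sqs_client.change_message_visibility(
                QueueUrl=queue_url,
                ReceiptHandle=receipt_handle,
                VisibilityTimeout=timeout,
            )

    worker = threading.Thread(target=beat, daemon=True)
    worker.start()
    try:
        yield
    finally:
        stop.set()
        worker.join()
```

Inside `process_messages`, the call to `handle_event(body)` could be wrapped as `with visibility_heartbeat(sqs, QUEUE_URL, msg['ReceiptHandle']):`. The interval should stay well below the timeout so that each extension lands before the previous one lapses.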
Secrets Manager
import boto3
import json
from functools import lru_cache

secrets_client = boto3.client('secretsmanager', region_name='us-east-1')

@lru_cache(maxsize=None)
def get_secret(secret_name: str) -> dict:
    """Fetch and cache a secret. Cache is per-process lifetime."""
    response = secrets_client.get_secret_value(SecretId=secret_name)
    secret = response.get('SecretString') or response.get('SecretBinary')
    try:
        return json.loads(secret)
    except (json.JSONDecodeError, TypeError):
        # Not JSON: wrap the raw value so callers always get a dict
        return {'value': secret}

# Usage
db_creds = get_secret('prod/database/postgres')
conn_str = f"postgresql://{db_creds['username']}:{db_creds['password']}@{db_creds['host']}/mydb"
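Because `lru_cache` never expires entries, a long-running process keeps using the old value after a secret is rotated. A time-limited cache is one way around that. The class below is a sketch, not part of Boto3: `TTLCache` and its parameters are hypothetical, and `fetch` stands for any function that actually calls `get_secret_value`, such as an undecorated version of `get_secret`.

```python
import time
from typing import Any, Callable


class TTLCache:
    """Cache values from `fetch` for `ttl_seconds`, then fetch again on next access."""

    def __init__(
        self,
        fetch: Callable[[str], Any],
        ttl_seconds: float = 300.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fetch = fetch
        self._ttl = ttl_seconds
        self._clock = clock  # injectable so tests can control time
        self._entries = {}   # name -> (time fetched, value)

    def get(self, name: str) -> Any:
        now = self._clock()
        entry = self._entries.get(name)
        if entry is not None and now - entry[0] < self._ttl:
            return entry[1]  # still fresh
        value = self._fetch(name)
        self._entries[name] = (now, value)
        return value
```

A module-level instance such as `secrets = TTLCache(fetch_secret, ttl_seconds=300)` (with `fetch_secret` being your own uncached fetch function) keeps API calls to roughly one per secret every five minutes while still picking up rotations.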
Pro Tip: Cache secrets at startup (or with @lru_cache) rather than fetching them on every request. Secrets Manager has rate limits and charges per API call. For Lambda, cache in a module-level variable outside the handler function.
Paginators for Large Datasets
import boto3

s3 = boto3.client('s3')

def list_all_objects(bucket: str, prefix: str = '') -> list[dict]:
    """list_objects_v2 returns max 1,000 objects. Paginator handles multi-page."""
    paginator = s3.get_paginator('list_objects_v2')
    pages = paginator.paginate(Bucket=bucket, Prefix=prefix)
    objects = []
    for page in pages:
        # 'Contents' is absent when a page has no matching keys
        objects.extend(page.get('Contents', []))
    return objects

# EC2 describe_instances paginator
ec2 = boto3.client('ec2')

def list_all_instances() -> list[dict]:
    paginator = ec2.get_paginator('describe_instances')
    instances = []
    for page in paginator.paginate():
        for reservation in page['Reservations']:
            instances.extend(reservation['Instances'])
    return instances

# Filter paginators on the fly
def list_large_objects(bucket: str, min_size_mb: int = 100) -> list[dict]:
    paginator = s3.get_paginator('list_objects_v2')
    pages = paginator.paginate(Bucket=bucket, PaginationConfig={'PageSize': 1000})
    return [
        obj for page in pages
        for obj in page.get('Contents', [])
        if obj['Size'] > min_size_mb * 1024 * 1024
    ]
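The functions above collect every result into a list, which can use a lot of memory for buckets with millions of keys. A generator variant yields objects as each page arrives. This is a sketch with hypothetical helper names (`iter_objects`, `total_size`); the client is passed in as an argument rather than created at module level.

```python
from typing import Any, Iterable, Iterator


def iter_objects(s3_client: Any, bucket: str, prefix: str = "") -> Iterator[dict]:
    """Yield objects one at a time, fetching the next page only when needed."""
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        yield from page.get("Contents", [])


def total_size(objects: Iterable[dict]) -> int:
    """Sum the Size field without keeping the objects in memory."""
    return sum(obj["Size"] for obj in objects)
```

For example, `total_size(iter_objects(s3, 'my-bucket', 'images/'))` computes the total bytes under a prefix while holding only one page of results at a time.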
Error Handling
import boto3
from botocore.exceptions import ClientError, NoCredentialsError

s3 = boto3.client('s3')

def safe_get_object(bucket: str, key: str) -> bytes | None:
    try:
        response = s3.get_object(Bucket=bucket, Key=key)
        return response['Body'].read()
    except ClientError as e:
        # The service error code is the reliable way to tell failures apart
        code = e.response['Error']['Code']
        if code == 'NoSuchKey':
            return None
        elif code == 'NoSuchBucket':
            raise ValueError(f"Bucket {bucket!r} does not exist") from e
        elif code == 'AccessDenied':
            raise PermissionError(f"Access denied to s3://{bucket}/{key}") from e
        elif code == 'SlowDown':
            # S3 throttling: the caller should retry with exponential backoff
            raise
        else:
            raise  # re-raise unknown errors
    except NoCredentialsError:
        raise RuntimeError("AWS credentials not configured") from None

# Common S3 error codes
S3_ERROR_CODES = {
    'NoSuchKey': 'Object does not exist',
    'NoSuchBucket': 'Bucket does not exist',
    'AccessDenied': 'Insufficient IAM permissions',
    'BucketAlreadyExists': 'Bucket name already taken globally',
    'InvalidObjectState': 'Object in Glacier, needs restore',
    'SlowDown': 'Rate limit: back off and retry',
}
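The `SlowDown` entry above calls for backing off and retrying. A minimal sketch of exponential backoff with jitter follows. The names `call_with_backoff`, `error_code`, and `THROTTLE_CODES` are hypothetical, and the code reads the error code from a `response` attribute the way `ClientError` exposes it, so it does not import botocore itself.

```python
from __future__ import annotations

import random
import time
from typing import Any, Callable, Collection

# Assumed set of throttling codes to retry; extend it for the services you use.
THROTTLE_CODES = frozenset(
    {"SlowDown", "Throttling", "ThrottlingException", "RequestLimitExceeded"}
)


def error_code(exc: BaseException) -> str | None:
    """Return the AWS error code if `exc` carries a ClientError-style response."""
    response = getattr(exc, "response", None)
    if isinstance(response, dict):
        return response.get("Error", {}).get("Code")
    return None


def call_with_backoff(
    func: Callable[[], Any],
    *,
    max_attempts: int = 5,
    base_delay: float = 0.5,
    max_delay: float = 20.0,
    retry_codes: Collection[str] = THROTTLE_CODES,
    sleep: Callable[[float], Any] = time.sleep,
) -> Any:
    """Call `func`, retrying throttling errors with capped exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except Exception as exc:
            # Give up on non-throttling errors and on the final attempt.
            if error_code(exc) not in retry_codes or attempt == max_attempts:
                raise
            # Full jitter: wait a random time up to the capped exponential delay.
            cap = min(max_delay, base_delay * 2 ** (attempt - 1))
            sleep(random.uniform(0, cap))
```

A call such as `call_with_backoff(lambda: safe_get_object('my-bucket', 'report.csv'))` would retry only throttling errors and let everything else propagate. Note that botocore already retries many throttling errors on its own, configurable through `botocore.config.Config(retries={'max_attempts': 10, 'mode': 'standard'})`, so an outer loop like this is mainly useful when those built-in retries are exhausted.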
Frequently Asked Questions
- How do I use Boto3 with IAM roles in Docker / Kubernetes?
- On ECS, attach a task role via the task definition's taskRoleArn; Boto3 picks up the credentials automatically from the container credentials endpoint. On Kubernetes (EKS), use IRSA (IAM Roles for Service Accounts): annotate the pod's service account with the role ARN, and Boto3 exchanges the projected service account token for temporary credentials. Never mount AWS credentials files into containers.
- How do I mock Boto3 calls in tests?
- Use the moto library, which intercepts Boto3 calls at the HTTP level and simulates AWS services locally. In moto 5 and later, decorate your tests with @mock_aws; older releases used per-service decorators such as @mock_s3 and @mock_dynamodb. It requires no real AWS account and runs fast. Alternatively, use unittest.mock.patch to mock specific client methods.
- What is the difference between S3 Transfer Manager and raw put_object?
- upload_file() and download_file() use the S3 Transfer Manager, which by default switches to multipart upload for files larger than 8 MB, handles retries, and uploads parts concurrently. For small files or in-memory data, put_object() is simpler. As a rule of thumb, use put_object() for data under 5 MB and upload_file() / upload_fileobj() for everything else.
- How do I reduce DynamoDB costs?
- Use on-demand capacity for unpredictable workloads and provisioned capacity for steady, predictable traffic. Use DynamoDB's TTL feature to auto-expire old items instead of running delete operations. ProjectionExpression trims the response payload, but read capacity is still charged on the full size of each item read. Likewise, batch operations (batch_write_item) cut round trips for bulk loads but consume the same write capacity per item as individual writes.
- How do I handle SQS message visibility timeouts?
- Set the visibility timeout comfortably above your expected processing time to avoid duplicate processing; for queues consumed by Lambda, AWS recommends at least 6× the function timeout. If a long-running task is still in progress as the timeout approaches, call change_message_visibility to extend it before it expires. Failed messages (not deleted within the visibility timeout) become visible again, so configure a Dead Letter Queue (DLQ) to capture messages that fail repeatedly.
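For the mocking question above, the lightest option needs only the standard library: pass the client into the function under test and substitute a `MagicMock` in the test. The sketch below uses dependency injection rather than `patch`, and `read_text` is a hypothetical function invented for the example.

```python
from typing import Any
from unittest.mock import MagicMock


def read_text(s3_client: Any, bucket: str, key: str) -> str:
    """Hypothetical function under test: fetch an object and decode it as UTF-8."""
    response = s3_client.get_object(Bucket=bucket, Key=key)
    return response["Body"].read().decode("utf-8")


def test_read_text_decodes_body() -> None:
    # Stand-in for the streaming body that get_object returns.
    body = MagicMock()
    body.read.return_value = b"hello"

    s3_client = MagicMock()
    s3_client.get_object.return_value = {"Body": body}

    assert read_text(s3_client, "my-bucket", "greeting.txt") == "hello"
    # Verify the function asked S3 for the right object.
    s3_client.get_object.assert_called_once_with(
        Bucket="my-bucket", Key="greeting.txt"
    )
```

This style checks your own logic and the arguments you send, but it does not validate request shapes against AWS; moto is the better fit when that matters.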