Python S3 File Handling: Streaming Uploads and Downloads
AWS S3 is the standard object store for files, backups, media, and data lakes. Python's boto3 SDK provides high-level transfer utilities that handle multipart uploads, parallel chunking, retry logic, and progress callbacks automatically. This guide covers streaming large files without loading them into memory, generating presigned URLs for client-side access, using aioboto3 for async FastAPI code, and querying CSV/JSON files in-place with S3 Select.
Table of Contents
Installation and Client Setup
pip install boto3 aioboto3
import boto3
from botocore.config import Config
import os
# Credentials are resolved by boto3 itself: env vars, ~/.aws/credentials, or an IAM role
aws_region = os.environ.get("AWS_REGION", "ap-south-1")
client_config = Config(
    retries={"max_attempts": 3, "mode": "adaptive"},
    max_pool_connections=50,
)
s3 = boto3.client("s3", region_name=aws_region, config=client_config)

# Resource API (higher-level, object-oriented wrapper)
s3_resource = boto3.resource("s3")
bucket = s3_resource.Bucket("my-bucket")

# Smoke-test the connection by printing the names of all visible buckets
response = s3.list_buckets()
print([b["Name"] for b in response["Buckets"]])
boto3 reads credentials from environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY), ~/.aws/credentials, or the EC2/ECS/Lambda instance IAM role. In production, use IAM roles — no keys needed.
Uploading Files and Streams
import boto3
from boto3.s3.transfer import TransferConfig
import io
s3 = boto3.client("s3")
BUCKET = "my-app-bucket"

# Upload a file from disk, encrypted at rest and tagged with custom metadata
report_extra_args = {
    "ContentType": "application/pdf",
    "ServerSideEncryption": "AES256",
    "Metadata": {"generated-by": "report-service", "version": "2.1"},
}
s3.upload_file(
    Filename="/tmp/report.pdf",
    Bucket=BUCKET,
    Key="reports/2026/june/report.pdf",
    ExtraArgs=report_extra_args,
)

# Upload bytes that already live in memory (no temp file needed)
csv_data = "id,name,amount\n1,Alice,100.00\n2,Bob,200.00\n".encode("utf-8")
s3.put_object(Bucket=BUCKET, Key="exports/users.csv", Body=csv_data, ContentType="text/csv")
# Upload with progress callback
import threading


class _UploadProgress:
    """Thread-safe running total of uploaded bytes, printed on every update.

    boto3 invokes a transfer ``Callback`` with the number of bytes sent since
    the previous invocation (not a cumulative figure) and may do so from
    several worker threads, so the total is accumulated here under a lock.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self._total = 0

    def __call__(self, bytes_transferred):
        """Add ``bytes_transferred`` to the running total and print the total.

        Args:
            bytes_transferred: Bytes sent since the previous callback.
        """
        with self._lock:
            self._total += bytes_transferred
            # "\r" rewrites the same terminal line instead of scrolling.
            print(f"\rUploaded: {self._total:,} bytes", end="", flush=True)


# The total never resets: create a fresh _UploadProgress() before reusing the
# callback for another transfer.
progress_callback = _UploadProgress()
# Upload the video, reporting progress through the callback
s3.upload_file(
    Filename="/tmp/large-video.mp4",
    Bucket=BUCKET,
    Key="media/video.mp4",
    Callback=progress_callback,
)
print()  # terminate the carriage-return progress line
Downloading and Streaming
import boto3
import io
s3 = boto3.client("s3")
BUCKET = "my-app-bucket"

# Download straight to a path on disk
s3.download_file(
    Bucket=BUCKET,
    Key="reports/2026/june/report.pdf",
    Filename="/tmp/report.pdf",
)

# Download into an in-memory buffer and decode it as text
buffer = io.BytesIO()
s3.download_fileobj(Bucket=BUCKET, Key="exports/users.csv", Fileobj=buffer)
content = buffer.getvalue().decode("utf-8")
print(content)
# Stream large file chunk by chunk (never loads full file into memory)
def stream_s3_file(bucket: str, key: str, chunk_size: int = 65536):
    """Yield an S3 object's content in chunks without buffering it whole.

    Args:
        bucket: Name of the bucket holding the object.
        key: Key of the object to read.
        chunk_size: Maximum number of bytes requested per read.

    Yields:
        Non-empty chunks, in order, until the body is exhausted.
    """
    response = s3.get_object(Bucket=bucket, Key=key)
    body = response["Body"]
    try:
        while chunk := body.read(chunk_size):
            yield chunk
    finally:
        # Runs on exhaustion, on a failed read, and when the consumer drops
        # the generator early, so the body is always closed.
        body.close()
# Process a large CSV without downloading it fully
import csv
import codecs
def process_large_csv(bucket: str, key: str):
    """Stream a UTF-8 CSV object from S3 and yield its rows one at a time.

    Args:
        bucket: Name of the bucket holding the CSV.
        key: Key of the CSV object.

    Yields:
        One dict per data row, as produced by ``csv.DictReader`` (keyed by
        the header row).
    """
    response = s3.get_object(Bucket=bucket, Key=key)
    body = response["Body"]
    try:
        # Decode incrementally so the object is never fully held in memory.
        lines = codecs.getreader("utf-8")(body)
        yield from csv.DictReader(lines)
    finally:
        # Close the body even if the caller stops iterating early or a row
        # fails to decode.
        body.close()
# Rows arrive lazily; print two columns of each as it is parsed
for record in process_large_csv(BUCKET, "data/large-export.csv"):
    record_id, record_name = record["id"], record["name"]
    print(record_id, record_name)
Multipart Upload for Large Files
boto3's upload_file and upload_fileobj automatically use multipart upload for files larger than the multipart_threshold (default 8 MB). You can tune chunk size, concurrency, and threshold via TransferConfig.
from boto3.s3.transfer import TransferConfig
import boto3
s3 = boto3.client("s3")

MB = 1024 * 1024  # TransferConfig sizes are given in bytes

config = TransferConfig(
    multipart_threshold=25 * MB,  # 25 MB — use multipart above this
    max_concurrency=10,  # parallel threads for upload chunks
    multipart_chunksize=25 * MB,  # 25 MB per chunk
    use_threads=True,
)

# Automatic multipart for large files
s3.upload_file(
    "/data/10gb-dataset.parquet",
    "my-bucket",
    "datasets/2026/data.parquet",
    Config=config,
)
# Manual multipart control (for streams without a known size)
def _read_full_chunk(stream, size: int) -> bytes:
    """Read exactly ``size`` bytes from ``stream`` unless it ends first."""
    pieces = []
    remaining = size
    while remaining > 0:
        piece = stream.read(remaining)
        if not piece:
            break
        pieces.append(piece)
        remaining -= len(piece)
    return b"".join(pieces)


def multipart_upload_stream(stream, bucket: str, key: str, chunk_size: int = 25 * 1024 * 1024):
    """Upload a stream of unknown length to S3 as a multipart upload.

    Args:
        stream: Binary file-like object; only ``read(n)`` is used.
        bucket: Destination bucket name.
        key: Destination object key.
        chunk_size: Size in bytes of every part except the last. S3 requires
            non-final parts to be at least 5 MiB.

    Raises:
        Whatever the S3 client or ``stream.read`` raises. If a multipart
        upload has been started it is aborted before the error propagates.
    """
    chunk = _read_full_chunk(stream, chunk_size)
    if not chunk:
        # Completing a multipart upload needs at least one part, so an empty
        # stream is stored as an empty object with a plain PUT.
        s3.put_object(Bucket=bucket, Key=key, Body=b"")
        print(f"Uploaded 0 parts to s3://{bucket}/{key}")
        return

    mpu = s3.create_multipart_upload(Bucket=bucket, Key=key)
    upload_id = mpu["UploadId"]
    parts = []
    try:
        while chunk:
            part_number = len(parts) + 1
            part = s3.upload_part(
                Bucket=bucket, Key=key, UploadId=upload_id,
                PartNumber=part_number, Body=chunk,
            )
            parts.append({"ETag": part["ETag"], "PartNumber": part_number})
            # Pipes, sockets and HTTP bodies may return short reads; topping
            # each chunk up keeps non-final parts at the full chunk_size.
            chunk = _read_full_chunk(stream, chunk_size)
        s3.complete_multipart_upload(
            Bucket=bucket, Key=key, UploadId=upload_id,
            MultipartUpload={"Parts": parts},
        )
    except BaseException:
        # BaseException so Ctrl-C / cancellation also aborts: parts of an
        # unfinished upload keep accruing storage charges until aborted.
        s3.abort_multipart_upload(Bucket=bucket, Key=key, UploadId=upload_id)
        raise
    print(f"Uploaded {len(parts)} parts to s3://{bucket}/{key}")
Presigned URLs
Presigned URLs grant temporary, scoped access to S3 objects without requiring the caller to have AWS credentials. Use them to let browser clients upload directly to S3 (bypassing your server) or to share private files for a limited time.
import boto3
from datetime import timedelta
s3 = boto3.client("s3", region_name="ap-south-1")

# Presigned GET — time-limited download link for a private object
download_params = {"Bucket": "my-bucket", "Key": "reports/confidential.pdf"}
download_url = s3.generate_presigned_url(
    "get_object",
    Params=download_params,
    ExpiresIn=3600,  # seconds (1 hour)
)
print(f"Download link (valid 1h): {download_url}")

# Presigned PUT — the browser uploads directly to S3
upload_params = {
    "Bucket": "my-bucket",
    "Key": "uploads/user-42/avatar.jpg",
    "ContentType": "image/jpeg",
}
upload_url = s3.generate_presigned_url(
    "put_object",
    Params=upload_params,
    ExpiresIn=300,  # seconds (5 minutes)
)
print(f"Upload URL: {upload_url}")

# Presigned POST — more flexible, supports size limits and form fields
post_fields = {"Content-Type": "image/jpeg"}
post_conditions = [
    {"Content-Type": "image/jpeg"},
    ["content-length-range", 1, 5 * 1024 * 1024],  # 1 byte to 5 MB
]
presigned_post = s3.generate_presigned_post(
    Bucket="my-bucket",
    Key="uploads/${filename}",
    Fields=post_fields,
    Conditions=post_conditions,
    ExpiresIn=300,
)
# Hand presigned_post["url"] and presigned_post["fields"] back to the browser
# FastAPI endpoint to issue presigned upload URL
from fastapi import FastAPI
app = FastAPI()
@app.post("/upload-url")
async def get_upload_url(filename: str, content_type: str):
    """Return a presigned PUT URL (valid 300 seconds) for ``uploads/<filename>``.

    The response carries the URL under ``upload_url`` and the object key
    under ``key``.
    """
    object_key = f"uploads/{filename}"
    put_params = {
        "Bucket": "my-bucket",
        "Key": object_key,
        "ContentType": content_type,
    }
    presigned = s3.generate_presigned_url("put_object", Params=put_params, ExpiresIn=300)
    return {"upload_url": presigned, "key": object_key}
S3 Select: Query Without Downloading
S3 Select runs a SQL query server-side on CSV, JSON, or Parquet files stored in S3. It returns only the matching rows, reducing data transfer by up to 98% for selective queries on large files.
import boto3
import json
s3 = boto3.client("s3", region_name="ap-south-1")
# Query CSV file in S3 — only download matching rows
def s3_select_csv(bucket: str, key: str, sql: str) -> list[dict]:
    """Run an S3 Select SQL query against a CSV object and return the rows.

    The CSV is read with its first line as the header, uncompressed, with
    ``\\n`` record and ``,`` field delimiters. Results are requested as
    newline-delimited JSON.

    Args:
        bucket: Bucket holding the CSV object.
        key: Key of the CSV object.
        sql: S3 Select SQL expression.

    Returns:
        One dict per result record, in the order S3 returned them.
    """
    response = s3.select_object_content(
        Bucket=bucket,
        Key=key,
        ExpressionType="SQL",
        Expression=sql,
        InputSerialization={
            "CSV": {"FileHeaderInfo": "USE", "RecordDelimiter": "\n", "FieldDelimiter": ","},
            "CompressionType": "NONE",
        },
        OutputSerialization={"JSON": {"RecordDelimiter": "\n"}},
    )
    # A Records event carries an arbitrary slice of the result stream, so a
    # JSON line (or a multi-byte UTF-8 character) can straddle two events.
    # Join the raw bytes first and decode/split only once.
    raw = b"".join(
        event["Records"]["Payload"]
        for event in response["Payload"]
        if "Records" in event
    )
    return [
        json.loads(line)
        for line in raw.decode("utf-8").split("\n")
        if line.strip()
    ]
# Example: query orders CSV for high-value orders without downloading 10GB
high_value_sql = "SELECT order_id, customer_id, amount FROM S3Object WHERE CAST(amount AS FLOAT) > 1000.00"
results = s3_select_csv("my-data-bucket", "orders/2026/orders.csv", high_value_sql)
print(f"Found {len(results)} high-value orders")

# Show at most the first five matches
for r in results[:5]:
    print(r)
Async S3 with aioboto3
import aioboto3
import asyncio
import io
from contextlib import asynccontextmanager
from fastapi import FastAPI, UploadFile
session = aioboto3.Session()
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan hook; performs no startup or shutdown work."""
    yield
app = FastAPI(lifespan=lifespan)
@app.post("/upload")
async def upload_file(file: UploadFile, folder: str = "uploads"):
    """Stream an uploaded file to ``my-bucket`` under ``<folder>/<filename>``.

    Args:
        file: The multipart upload received by FastAPI.
        folder: Key prefix for the stored object.

    Returns:
        A dict with the object ``key`` and the ``bucket`` it was written to.
    """
    key = f"{folder}/{file.filename}"
    # content_type is None when the client sends the part without a
    # Content-Type header; a None ContentType fails parameter validation, so
    # fall back to the generic binary type.
    content_type = file.content_type or "application/octet-stream"
    async with session.client("s3", region_name="ap-south-1") as s3:
        await s3.upload_fileobj(
            file,
            "my-bucket",
            key,
            ExtraArgs={"ContentType": content_type},
        )
    return {"key": key, "bucket": "my-bucket"}
async def download_to_memory(bucket: str, key: str) -> bytes:
    """Download an S3 object into memory and return its full content."""
    sink = io.BytesIO()
    async with session.client("s3") as client:
        await client.download_fileobj(bucket, key, sink)
    return sink.getvalue()
async def list_files(bucket: str, prefix: str) -> list[str]:
    """Return the keys of all objects in ``bucket`` under ``prefix``.

    Walks every page of ``list_objects_v2``; pages without a ``Contents``
    entry contribute nothing.
    """
    async with session.client("s3") as client:
        pages = client.get_paginator("list_objects_v2").paginate(Bucket=bucket, Prefix=prefix)
        return [
            entry["Key"]
            async for page in pages
            for entry in page.get("Contents", [])
        ]
Frequently Asked Questions
- What is the maximum file size for S3 upload?
- S3 supports objects up to 5 TB. Single PUT operations are limited to 5 GB. For files larger than 5 GB you must use multipart upload. boto3's upload_file automatically switches to multipart for files over the multipart_threshold (default 8 MB).
- How do I handle S3 upload errors and retries?
- boto3 retries transient errors (throttling, connection resets) automatically via the retry config. For multipart uploads, always wrap in try/except and call abort_multipart_upload() on failure — incomplete multipart uploads incur storage charges. Enable S3 lifecycle rules to auto-abort incomplete uploads after N days.
- How do I make S3 files publicly accessible?
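- Never set bucket-level public access. Instead, use presigned URLs for temporary access, or CloudFront as a CDN in front of a private bucket. If you need truly public static assets, put them in a dedicated bucket and grant public read on a specific path via bucket policy, and keep Block Public Access enabled on every other bucket. Note that an account-level Block Public Access setting applies to all buckets in the account, so it would also block that policy.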