Python S3 File Handling: Streaming Uploads and Downloads

AWS S3 is the standard object store for files, backups, media, and data lakes. Python's boto3 SDK provides high-level transfer utilities that handle multipart uploads, parallel chunking, retry logic, and progress callbacks automatically. This guide covers streaming large files without loading them into memory, generating presigned URLs for client-side access, using aioboto3 for async FastAPI code, and querying CSV/JSON files in-place with S3 Select.

Installation and Client Setup

pip install boto3 aioboto3
import boto3
from botocore.config import Config
import os

# No credentials are passed here: boto3 reads them from env vars,
# ~/.aws/credentials, or an IAM role automatically.
aws_region = os.environ.get("AWS_REGION", "ap-south-1")
client_config = Config(
    max_pool_connections=50,
    retries={"mode": "adaptive", "max_attempts": 3},
)
s3 = boto3.client("s3", region_name=aws_region, config=client_config)

# Resource API (higher-level, object-oriented handle on a bucket)
s3_resource = boto3.resource("s3")
bucket = s3_resource.Bucket("my-bucket")

# Smoke test: list the buckets visible to these credentials
response = s3.list_buckets()
bucket_names = [entry["Name"] for entry in response["Buckets"]]
print(bucket_names)
Credentials: Never hard-code AWS credentials. boto3 automatically picks them up from environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY), ~/.aws/credentials, or the IAM role attached to the EC2 instance, ECS task, or Lambda function. In production, use IAM roles — no long-lived keys needed.

Uploading Files and Streams

import boto3
from boto3.s3.transfer import TransferConfig
import io

s3 = boto3.client("s3")
BUCKET = "my-app-bucket"

# Disk to S3: the per-object settings are named first so the call stays short
report_extra_args = {
    "ContentType": "application/pdf",
    "ServerSideEncryption": "AES256",
    "Metadata": {"generated-by": "report-service", "version": "2.1"},
}
s3.upload_file(
    "/tmp/report.pdf",
    BUCKET,
    "reports/2026/june/report.pdf",
    ExtraArgs=report_extra_args,
)

# Memory to S3: the payload is already bytes, so no temp file is involved
csv_data = "id,name,amount\n1,Alice,100.00\n2,Bob,200.00\n".encode()
s3.put_object(
    Body=csv_data,
    ContentType="text/csv",
    Bucket=BUCKET,
    Key="exports/users.csv",
)

# Upload with progress callback
import threading


class UploadProgress:
    """Callable that prints a running byte total for one transfer.

    boto3 invokes ``Callback`` with the number of bytes sent since the
    previous invocation (not a cumulative figure), and may do so from
    several worker threads, so the total is accumulated under a lock.
    Create a fresh instance for each transfer that needs its own total.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self._total = 0

    def __call__(self, bytes_transferred):
        """Add ``bytes_transferred`` to the total and reprint the progress line."""
        with self._lock:
            self._total += bytes_transferred
            # "\r" rewinds to the start of the line so each update overwrites the last.
            print(f"\rUploaded: {self._total:,} bytes", end="", flush=True)


# Still usable as ``Callback=progress_callback`` and callable with one argument.
progress_callback = UploadProgress()

# Every argument is named so the role of each value is obvious at the call site
s3.upload_file(
    Filename="/tmp/large-video.mp4",
    Bucket=BUCKET,
    Key="media/video.mp4",
    Callback=progress_callback,
)
print()  # the progress line is printed without a trailing newline; add one

Downloading and Streaming

import boto3
import io

s3 = boto3.client("s3")
BUCKET = "my-app-bucket"

# S3 to disk
s3.download_file(
    Bucket=BUCKET,
    Key="reports/2026/june/report.pdf",
    Filename="/tmp/report.pdf",
)

# S3 to an in-memory buffer, then decode the bytes as text
buffer = io.BytesIO()
s3.download_fileobj(Bucket=BUCKET, Key="exports/users.csv", Fileobj=buffer)
buffer.seek(0)  # rewind: the download left the cursor past the written data
raw_bytes = buffer.read()
content = raw_bytes.decode("utf-8")
print(content)

# Stream large file chunk by chunk (never loads full file into memory)
def stream_s3_file(bucket: str, key: str, chunk_size: int = 65536):
    """Yield the contents of an S3 object as successive byte chunks.

    Args:
        bucket: Name of the bucket holding the object.
        key: Key of the object to read.
        chunk_size: Maximum number of bytes requested per read.

    Yields:
        Non-empty ``bytes`` chunks, in order, until the body is exhausted.
    """
    response = s3.get_object(Bucket=bucket, Key=key)
    body = response["Body"]
    try:
        while chunk := body.read(chunk_size):
            yield chunk
    finally:
        # Runs on exhaustion, on error, and when the consumer stops early
        # (generator close), so the response body is always released.
        body.close()

# Process a large CSV without downloading it fully
import csv
import codecs

def process_large_csv(bucket: str, key: str):
    """Yield each row of a UTF-8 CSV object in S3 as a dict keyed by header.

    The response body is decoded incrementally, so rows are produced
    without first reading the whole object.

    Args:
        bucket: Name of the bucket holding the CSV object.
        key: Key of the CSV object.

    Yields:
        One ``dict`` per data row, as produced by ``csv.DictReader``.
    """
    response = s3.get_object(Bucket=bucket, Key=key)
    # newline="" hands line endings to the csv module untouched, as the csv
    # documentation requires. A codecs stream reader would instead split on
    # every Unicode line boundary (e.g. U+2028) and could cut a row in two.
    # Leaving the "with" block (including early generator close) closes the
    # wrapper and, with it, the underlying response body.
    with io.TextIOWrapper(response["Body"], encoding="utf-8", newline="") as text:
        yield from csv.DictReader(text)

for record in process_large_csv(BUCKET, "data/large-export.csv"):
    print(record["id"], record["name"])

Multipart Upload for Large Files

boto3's upload_file and upload_fileobj automatically use multipart upload for files larger than the multipart_threshold (default 8 MB). You can tune chunk size, concurrency, and threshold via TransferConfig.

from boto3.s3.transfer import TransferConfig
import boto3

s3 = boto3.client("s3")

MB = 1024 * 1024  # bytes per megabyte; the previous 1024 * 25 was only 25 KB

config = TransferConfig(
    multipart_threshold=25 * MB,      # 25 MB — use multipart above this
    max_concurrency=10,               # parallel threads for upload chunks
    multipart_chunksize=25 * MB,      # 25 MB per chunk
    use_threads=True,
)

# Automatic multipart for large files
s3.upload_file(
    "/data/10gb-dataset.parquet",
    "my-bucket",
    "datasets/2026/data.parquet",
    Config=config,
)

# Manual multipart control (for streams without a known size)
def multipart_upload_stream(stream, bucket: str, key: str, chunk_size: int = 25 * 1024 * 1024):
    """Upload a binary stream of unknown length to S3 as a multipart upload.

    Args:
        stream: Object whose ``read(n)`` returns up to ``n`` bytes and an
            empty value at end of stream.
        bucket: Destination bucket name.
        key: Destination object key.
        chunk_size: Number of bytes per part. S3 requires every part except
            the last to be at least 5 MiB and accepts at most 10,000 parts,
            so choose a size that fits the expected stream length.

    Raises:
        ValueError: If ``chunk_size`` is not positive.
        Exception: Any error from reading or uploading is re-raised after
            the multipart upload has been aborted.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive number of bytes")

    def read_part():
        # Pipes and sockets may return fewer bytes than requested. Keep
        # reading until the part is full or the stream ends, so only the
        # final part can be short.
        pieces = []
        remaining = chunk_size
        while remaining > 0:
            piece = stream.read(remaining)
            if not piece:
                break
            pieces.append(piece)
            remaining -= len(piece)
        return b"".join(pieces)

    chunk = read_part()
    if not chunk:
        # A multipart upload cannot be completed with zero parts, so an
        # empty stream is stored as an empty object with a plain PUT.
        s3.put_object(Bucket=bucket, Key=key, Body=b"")
        print(f"Uploaded 0 parts to s3://{bucket}/{key}")
        return

    mpu = s3.create_multipart_upload(Bucket=bucket, Key=key)
    upload_id = mpu["UploadId"]
    parts = []

    try:
        while chunk:
            part_number = len(parts) + 1
            part = s3.upload_part(
                Bucket=bucket, Key=key, UploadId=upload_id,
                PartNumber=part_number, Body=chunk,
            )
            parts.append({"ETag": part["ETag"], "PartNumber": part_number})
            chunk = read_part()

        s3.complete_multipart_upload(
            Bucket=bucket, Key=key, UploadId=upload_id,
            MultipartUpload={"Parts": parts},
        )
    except BaseException:
        # BaseException also covers KeyboardInterrupt, so an interrupted run
        # does not leave an incomplete upload behind.
        s3.abort_multipart_upload(Bucket=bucket, Key=key, UploadId=upload_id)
        raise

    print(f"Uploaded {len(parts)} parts to s3://{bucket}/{key}")

Presigned URLs

Presigned URLs grant temporary, scoped access to S3 objects without requiring the caller to have AWS credentials. Use them to let browser clients upload directly to S3 (bypassing your server) or to share private files for a limited time.

import boto3
from datetime import timedelta

s3 = boto3.client("s3", region_name="ap-south-1")

# Presigned GET: a time-limited download link for a private object
download_params = {"Bucket": "my-bucket", "Key": "reports/confidential.pdf"}
download_url = s3.generate_presigned_url(
    "get_object",
    ExpiresIn=3600,  # 1 hour
    Params=download_params,
)
print(f"Download link (valid 1h): {download_url}")

# Presigned PUT: the browser uploads straight to S3
upload_params = {
    "Bucket": "my-bucket",
    "Key": "uploads/user-42/avatar.jpg",
    "ContentType": "image/jpeg",
}
upload_url = s3.generate_presigned_url(
    "put_object",
    ExpiresIn=300,   # 5 minutes
    Params=upload_params,
)
print(f"Upload URL: {upload_url}")

# Presigned POST: form-based upload with a content type and a size limit
post_fields = {"Content-Type": "image/jpeg"}
post_conditions = [
    {"Content-Type": "image/jpeg"},
    ["content-length-range", 1, 5 * 1024 * 1024],  # max 5 MB
]
presigned_post = s3.generate_presigned_post(
    "my-bucket",
    "uploads/${filename}",
    Fields=post_fields,
    Conditions=post_conditions,
    ExpiresIn=300,
)
# Hand presigned_post["url"] and presigned_post["fields"] back to the browser

# FastAPI endpoint to issue presigned upload URL
from fastapi import FastAPI, HTTPException
app = FastAPI()

@app.post("/upload-url")
async def get_upload_url(filename: str, content_type: str):
    """Return a 5-minute presigned PUT URL for ``uploads/<filename>``.

    Args:
        filename: Client-supplied file name. Only its final path component
            is used, so the key always sits directly under ``uploads/``.
        content_type: Content type included in the presigned parameters.

    Returns:
        A dict with the presigned ``upload_url`` and the object ``key``.

    Raises:
        HTTPException: 400 when no usable file name remains.
    """
    # The name comes from the request, so strip directory components
    # (both slash styles) before building the key.
    safe_name = filename.replace("\\", "/").rsplit("/", 1)[-1]
    if safe_name in ("", ".", ".."):
        raise HTTPException(status_code=400, detail="Invalid filename")

    key = f"uploads/{safe_name}"
    url = s3.generate_presigned_url(
        "put_object",
        Params={"Bucket": "my-bucket", "Key": key, "ContentType": content_type},
        ExpiresIn=300,
    )
    return {"upload_url": url, "key": key}

S3 Select: Query Without Downloading

S3 Select runs a SQL query server-side on CSV, JSON, or Parquet files stored in S3. It returns only the matching rows, reducing data transfer by up to 98% for selective queries on large files.

import boto3
import json

s3 = boto3.client("s3", region_name="ap-south-1")

# Query CSV file in S3 — only download matching rows
def s3_select_csv(bucket: str, key: str, sql: str) -> list[dict]:
    """Run an S3 Select SQL query against a CSV object and return the rows.

    Args:
        bucket: Name of the bucket holding the CSV object.
        key: Key of the CSV object (uncompressed, with a header row).
        sql: S3 Select SQL expression to evaluate.

    Returns:
        One dict per matching record, parsed from the JSON-lines output.
    """
    response = s3.select_object_content(
        Bucket=bucket,
        Key=key,
        ExpressionType="SQL",
        Expression=sql,
        InputSerialization={
            "CSV": {"FileHeaderInfo": "USE", "RecordDelimiter": "\n", "FieldDelimiter": ","},
            "CompressionType": "NONE",
        },
        OutputSerialization={"JSON": {"RecordDelimiter": "\n"}},
    )

    records = []
    pending = b""  # bytes of a record whose end has not arrived yet
    for event in response["Payload"]:
        if "Records" not in event:
            continue
        # A record, or even a multi-byte character, can be split across two
        # events. Buffer raw bytes and parse only newline-terminated lines.
        pending += event["Records"]["Payload"]
        *complete_lines, pending = pending.split(b"\n")
        records.extend(json.loads(line) for line in complete_lines if line.strip())

    if pending.strip():
        # Final record arrived without a trailing delimiter.
        records.append(json.loads(pending))
    return records


# Example: query orders CSV for high-value orders without downloading 10GB
results = s3_select_csv(
    "my-data-bucket",
    "orders/2026/orders.csv",
    "SELECT order_id, customer_id, amount FROM S3Object WHERE CAST(amount AS FLOAT) > 1000.00",
)
print(f"Found {len(results)} high-value orders")
for r in results[:5]:
    print(r)

Async S3 with aioboto3

import aioboto3
import asyncio
import io
from contextlib import asynccontextmanager
from fastapi import FastAPI, UploadFile

# Module-level aioboto3 session; the handlers below create S3 clients from it.
session = aioboto3.Session()

@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan hook; currently does no startup or shutdown work."""
    yield

# Register the lifespan hook with the application.
app = FastAPI(lifespan=lifespan)

@app.post("/upload")
async def upload_file(file: UploadFile, folder: str = "uploads"):
    """Stream an uploaded file to ``my-bucket`` under ``<folder>/<filename>``.

    Args:
        file: The multipart upload received by FastAPI.
        folder: Key prefix to store the object under.

    Returns:
        A dict with the object ``key`` and the ``bucket`` it was written to.

    Raises:
        HTTPException: 400 when the upload carries no file name.
    """
    # Imported locally so this handler does not depend on the import list above.
    from fastapi import HTTPException

    if not file.filename:
        # Without this check the key would end in the literal text "None".
        raise HTTPException(status_code=400, detail="Uploaded file has no filename")

    key = f"{folder}/{file.filename}"
    # content_type may be missing; only pass ContentType when there is a value.
    extra_args = {"ContentType": file.content_type} if file.content_type else None
    async with session.client("s3", region_name="ap-south-1") as s3:
        await s3.upload_fileobj(
            file,
            "my-bucket",
            key,
            ExtraArgs=extra_args,
        )
    return {"key": key, "bucket": "my-bucket"}


async def download_to_memory(bucket: str, key: str) -> bytes:
    """Fetch an S3 object and return its full contents as bytes."""
    sink = io.BytesIO()
    async with session.client("s3") as client:
        await client.download_fileobj(bucket, key, sink)
    return sink.getvalue()


async def list_files(bucket: str, prefix: str) -> list[str]:
    """Return the keys of all objects in ``bucket`` whose key starts with ``prefix``."""
    found: list[str] = []
    async with session.client("s3") as client:
        pages = client.get_paginator("list_objects_v2").paginate(Bucket=bucket, Prefix=prefix)
        async for page in pages:
            # A page carries no "Contents" entry when it holds no objects.
            found.extend(entry["Key"] for entry in page.get("Contents", []))
    return found

Frequently Asked Questions

What is the maximum file size for S3 upload?
S3 supports objects up to 5 TB. Single PUT operations are limited to 5 GB. For files larger than 5 GB you must use multipart upload. boto3's upload_file automatically switches to multipart for files over the multipart_threshold (default 8 MB).
How do I handle S3 upload errors and retries?
boto3 retries transient errors (throttling, connection resets) automatically via the retry config. For multipart uploads, always wrap in try/except and call abort_multipart_upload() on failure — incomplete multipart uploads incur storage charges. Enable S3 lifecycle rules to auto-abort incomplete uploads after N days.
How do I make S3 files publicly accessible?
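Never set bucket-level public access. Instead, use presigned URLs for temporary access, or CloudFront as a CDN in front of a private bucket. If you need truly public static assets, keep them in a dedicated bucket, grant public read on a specific path via that bucket's policy, and keep Block Public Access enabled on every other bucket. Note that an account-level Block Public Access setting overrides bucket policies, so it cannot be combined with a public path in the same account.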