Database sharding (horizontal partitioning) is the technique of splitting a single large database into multiple smaller databases called shards. Each shard holds a subset of the data and runs on its own server — together they hold the complete dataset.
Sharding is the go-to solution when your data outgrows a single machine, but it introduces significant complexity: cross-shard queries, rebalancing, and the critical shard key decision. This guide covers when to shard, the main sharding strategies, how to choose a shard key, and the operational problems that come up in system design interviews and real-world implementations.
Don't shard prematurely — it adds enormous complexity. Try simpler options first:
# Scaling ladder (try in order before sharding):
1. Vertical scaling: bigger machine (more CPU, RAM, NVMe SSDs)
   → PostgreSQL on 192-core, 12TB RAM can handle billions of rows
2. Read replicas: 3–5 read-only replicas for read-heavy workloads
   → offloads 80%+ of reads from primary
3. Caching: Redis/Memcached in front of DB
   → 99% cache hit rate means DB rarely touched
4. Table partitioning: Partition by date within a SINGLE server (PostgreSQL, MySQL)
   → fast range queries, automatic old data archival
5. Vertical sharding: Move specific tables to their own dedicated DB server
   (user table on Server A, orders table on Server B)
6. Horizontal sharding: Split rows of ONE large table across multiple servers
   → only when everything else is insufficient
# When sharding is necessary:
✓ Single table > 100–500 GB and growing fast
✓ Write TPS consistently > 10K on a single primary (even with SSD)
✓ Vertical scaling cost exceeds horizontal sharding cost
✓ Data residency / compliance requires geographic separation
Hash-based sharding: shard = hash(shard_key) % N
Even key distribution, few hot spots. Bad for range queries. Adding shards remaps data (use consistent hashing to mitigate).
Range-based sharding: Shard A holds IDs 1–1,000,000, Shard B holds 1,000,001–2,000,000, etc.
Good for range queries. Sequential writes create a hot shard at the tail. Easy to reason about.
Directory-based sharding: lookup table maps user_id → shard_id
Maximum flexibility. The lookup table is a single point of failure (SPOF). Extra network hop per request.
Geographic sharding: US users → US shard, EU users → EU shard
Data residency compliance (GDPR). Low latency for regional users. Unbalanced if regions grow unevenly.
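The directory-based strategy can be sketched as a small lookup service. This is a minimal in-memory illustration, not a production design: the `ShardDirectory` class, its method names, and the least-loaded placement policy are all assumptions made for the example. A real directory would live in a replicated store so that it is not a single point of failure.

```python
from collections import Counter


class ShardDirectory:
    """Hypothetical in-memory directory mapping user_id -> shard_id."""

    def __init__(self, shard_ids: list[str]):
        self.shard_ids = list(shard_ids)
        self.assignments: dict[int, str] = {}

    def assign(self, user_id: int) -> str:
        """Place a new user on the least-loaded shard (assumed policy)."""
        if user_id in self.assignments:
            return self.assignments[user_id]
        load = Counter(self.assignments.values())
        shard = min(self.shard_ids, key=lambda s: load[s])
        self.assignments[user_id] = shard
        return shard

    def lookup(self, user_id: int) -> str:
        """The extra hop: every request consults the directory first."""
        return self.assignments[user_id]

    def move(self, user_id: int, new_shard: str) -> None:
        """The flexibility: relocate one user without touching anyone else."""
        if new_shard not in self.shard_ids:
            raise ValueError(f"unknown shard: {new_shard}")
        self.assignments[user_id] = new_shard


directory = ShardDirectory(["shard_0", "shard_1"])
for uid in (101, 102, 103, 104):
    directory.assign(uid)
directory.move(101, "shard_1")  # e.g. moving a heavy user off a busy shard
```

Because placement is explicit data rather than a function of the key, any single user can be moved at will, which is the flexibility the other strategies lack.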
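# Simple hash sharding:
import hashlib

def get_shard(user_id: int, n_shards: int) -> int:
    # Python's built-in hash() of a str is salted per process, so use a stable digest instead
    digest = hashlib.md5(str(user_id).encode()).digest()
    return int.from_bytes(digest[:8], "big") % n_shards
# Problem: going from N to N+1 shards remaps about N/(N+1) of all keys → mass data migration
# Solution: use consistent hashing (see consistent-hashing-explained.html)
def get_shard_consistent(user_id: int, ring: ConsistentHashRing) -> str:
    return ring.get_server(str(user_id))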
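The `ConsistentHashRing` referenced above is not defined in this guide. The following is a minimal sketch of one possible implementation, assuming MD5 as a stable hash and 100 virtual nodes per server. The constructor signature, the `add_server` method, and the `vnodes` parameter are illustrative choices; only `get_server` is taken from the code above. The tail of the block measures how many keys move when a fifth shard is added, compared with plain modulo hashing.

```python
import bisect
import hashlib


def _stable_hash(key: str) -> int:
    """Process-independent 64-bit hash (MD5 is used for spread, not security)."""
    return int.from_bytes(hashlib.md5(key.encode()).digest()[:8], "big")


class ConsistentHashRing:
    """Illustrative ring with virtual nodes; not a production implementation."""

    def __init__(self, servers: list[str], vnodes: int = 100):
        self.vnodes = vnodes
        self._points: list[int] = []      # sorted hash positions on the ring
        self._owner: dict[int, str] = {}  # position -> server name
        for server in servers:
            self.add_server(server)

    def add_server(self, server: str) -> None:
        for i in range(self.vnodes):
            point = _stable_hash(f"{server}#{i}")
            self._owner[point] = server
            bisect.insort(self._points, point)

    def get_server(self, key: str) -> str:
        # First virtual node clockwise from the key's position, wrapping around.
        idx = bisect.bisect_right(self._points, _stable_hash(key))
        return self._owner[self._points[idx % len(self._points)]]


keys = [str(i) for i in range(10_000)]
ring = ConsistentHashRing(["shard_0", "shard_1", "shard_2", "shard_3"])
before = {k: ring.get_server(k) for k in keys}
ring.add_server("shard_4")
after = {k: ring.get_server(k) for k in keys}

moved = [k for k in keys if before[k] != after[k]]
moved_fraction = len(moved) / len(keys)  # expected to be near 1/5
modulo_moved = sum(_stable_hash(k) % 4 != _stable_hash(k) % 5 for k in keys) / len(keys)
```

Every key that moves lands on the new shard, and nothing is shuffled between the existing four. With modulo hashing the same change relocates roughly four keys out of five.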
# Shard routing at application layer:
import psycopg2
import psycopg2.extras

class ShardRouter:
    def __init__(self, shard_connections: dict):
        self.ring = ConsistentHashRing(list(shard_connections.keys()))
        self.connections = shard_connections  # {shard_name: DB_connection}

    def get_connection(self, user_id: int):
        shard_name = self.ring.get_server(str(user_id))
        return self.connections[shard_name]

router = ShardRouter({
    "shard_0": psycopg2.connect("host=shard0.db ..."),
    "shard_1": psycopg2.connect("host=shard1.db ..."),
    "shard_2": psycopg2.connect("host=shard2.db ..."),
})

# Query: always include shard key in WHERE clause
def get_user(user_id: int) -> dict:
    conn = router.get_connection(user_id)
    # psycopg2 runs queries through a cursor; RealDictCursor returns rows as dicts
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        cur.execute("SELECT * FROM users WHERE id = %s", (user_id,))
        return cur.fetchone()
# Range sharding config (stored in ZooKeeper or etcd):
SHARD_RANGES = [
    {"shard": "shard_0", "min": 0, "max": 25_000_000},
    {"shard": "shard_1", "min": 25_000_001, "max": 50_000_000},
    {"shard": "shard_2", "min": 50_000_001, "max": 75_000_000},
    {"shard": "shard_3", "min": 75_000_001, "max": float("inf")},
]
# Hot spot problem: all new users get ID > 75M → all writes go to shard_3
# Fix: use UUIDs instead of sequential IDs for user IDs
# Or: pre-split ranges based on expected growth (as HBase does with pre-split regions)
# Good use case: IoT time-series data
# Shard by date range: shard_2026_06 contains all readings from June 2026
# Range queries "give me readings from 2026-06-01 to 2026-06-30" = single shard ✓
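Routing against the range configuration above can be sketched with a binary search over the lower bounds. The function names `get_range_shard` and `shards_for_range` are made up for this example, and the boundaries are copied from `SHARD_RANGES` into two parallel lists for brevity.

```python
import bisect

# Same layout as SHARD_RANGES, reduced to sorted inclusive lower bounds.
RANGE_STARTS = [0, 25_000_001, 50_000_001, 75_000_001]
RANGE_SHARDS = ["shard_0", "shard_1", "shard_2", "shard_3"]


def get_range_shard(user_id: int) -> str:
    """Return the shard whose [min, max] range contains user_id."""
    if user_id < RANGE_STARTS[0]:
        raise ValueError(f"user_id {user_id} is below the first range")
    idx = bisect.bisect_right(RANGE_STARTS, user_id) - 1
    return RANGE_SHARDS[idx]


def shards_for_range(lo: int, hi: int) -> list[str]:
    """Shards that a range query lo..hi (inclusive) has to touch."""
    if lo < RANGE_STARTS[0] or hi < lo:
        raise ValueError("invalid range")
    first = bisect.bisect_right(RANGE_STARTS, lo) - 1
    last = bisect.bisect_right(RANGE_STARTS, hi) - 1
    return RANGE_SHARDS[first:last + 1]
```

A narrow range resolves to one shard, which is the main advantage of range sharding. A range that spans boundaries touches only the shards in between, not all of them.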
The shard key decision is permanent (or extremely expensive to change). Choose carefully.
# 1. HIGH CARDINALITY: many unique values
# Bad: status field (3 values: active/inactive/deleted) → 3 shards max, all data in 1-2
# Good: user_id (millions of unique values)
# 2. EVEN DISTRIBUTION: no hot shards
# Bad: country_code if 90% of users are from one country → one shard gets 90% of data
# Good: user_id (random/UUID) → uniform distribution
# 3. MINIMISE CROSS-SHARD QUERIES: related data in same shard
# Bad: shard by product_id for an e-commerce app
# → order "user A bought product B" → user on shard 2, product on shard 5 → JOIN = cross-shard
# Good: shard by user_id → all of user A's orders, cart, preferences on same shard
# 4. MONOTONICALLY INCREASING KEYS ARE DANGEROUS for range sharding
# Sequential IDs (1, 2, 3, ...) hash well, so hash sharding spreads them evenly
# But auto-increment creates a write hotspot in range sharding → use random UUIDs
# (time-ordered IDs such as Snowflake IDs have the same hotspot unless they are hashed)
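The cardinality and distribution criteria can be checked empirically before committing to a key. The sketch below hashes a candidate key's values onto shards and reports the share of rows on the fullest shard. The helper name `largest_shard_share` and the synthetic rows are assumptions for illustration; in practice the values would come from a sample of real data.

```python
import hashlib
from collections import Counter


def stable_hash(value: str) -> int:
    """Process-independent hash so results are repeatable across runs."""
    return int.from_bytes(hashlib.md5(value.encode()).digest()[:8], "big")


def largest_shard_share(key_values: list[str], n_shards: int) -> float:
    """Fraction of rows that land on the fullest shard under hash sharding."""
    counts = Counter(stable_hash(v) % n_shards for v in key_values)
    return max(counts.values()) / len(key_values)


# Synthetic rows: 10,000 users, each with a unique id and one of 3 statuses.
statuses = ("active", "inactive", "deleted")
rows = [{"user_id": str(i), "status": statuses[i % 3]} for i in range(10_000)]

share_by_user_id = largest_shard_share([r["user_id"] for r in rows], 8)
share_by_status = largest_shard_share([r["status"] for r in rows], 8)
```

With 8 shards a perfectly even key gives each shard 12.5% of the rows. The three-valued `status` key cannot do better than one third on its fullest shard, however many shards are added.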
| System | Shard Key | Why |
|---|---|---|
| Social network (Twitter) | user_id | User's tweets, follows, and feed all co-located |
| E-commerce (Amazon) | user_id or order_id | Order history per user in same shard; checkout queries by order_id |
| Chat (WhatsApp) | conversation_id | All messages in a conversation in same shard |
| Multi-tenant SaaS | tenant_id (org_id) | One tenant's data stays together; easy tenant-level backup/delete |
| URL shortener | short_code | Redirect lookup is always by short_code |
| IoT time-series | device_id + time bucket | All readings for a device together; time range queries efficient |
# Problem: "Get top 10 users by follower_count"
# → follower counts are on different shards → must query all shards → expensive
# Solution A: Scatter-gather (fan out to all shards, merge results)
import concurrent.futures

def get_top_users_globally(n=10):
    # all_shards: shard clients exposing query(sql, param) (assumed helper, defined elsewhere)
    results = []
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(shard.query,
                                   "SELECT id, follower_count FROM users ORDER BY follower_count DESC LIMIT %s", n)
                   for shard in all_shards]
        for future in concurrent.futures.as_completed(futures):
            results.extend(future.result())
    # Each shard returned its local top n; the global top n is among those rows
    results.sort(key=lambda x: x.follower_count, reverse=True)
    return results[:n]
# Cost: O(N_shards) queries → acceptable if N_shards is small (8–16)
# Solution B: Denormalize: maintain a global analytics table in a separate non-sharded DB
# Write: when follower_count changes → async update to analytics.user_stats
# Read: query analytics.user_stats directly (not sharded → single query)
# Solution C: Avoid the query: pre-compute and cache the leaderboard
# Cron job every 5 minutes → scatter-gather → store result in Redis → serve from cache
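To make the scatter-gather merge concrete without a database, here is a self-contained sketch in which each "shard" is just an in-memory list of `(user_id, follower_count)` tuples. The data and the names `SHARDS`, `local_top_n`, and `global_top_n` are invented for the example; `local_top_n` stands in for the per-shard `ORDER BY ... LIMIT n` query.

```python
import heapq
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for real shards: lists of (user_id, follower_count).
SHARDS = [
    [(1, 50), (2, 900), (3, 10)],
    [(4, 700), (5, 20)],
    [(6, 950), (7, 300), (8, 5)],
]


def local_top_n(shard: list[tuple[int, int]], n: int) -> list[tuple[int, int]]:
    """What each shard would compute with ORDER BY follower_count DESC LIMIT n."""
    return heapq.nlargest(n, shard, key=lambda row: row[1])


def global_top_n(n: int) -> list[tuple[int, int]]:
    """Fan out to every shard in parallel, then merge the partial results."""
    with ThreadPoolExecutor(max_workers=len(SHARDS)) as executor:
        partials = list(executor.map(lambda s: local_top_n(s, n), SHARDS))
    merged = [row for partial in partials for row in partial]
    return heapq.nlargest(n, merged, key=lambda row: row[1])


top3 = global_top_n(3)  # [(6, 950), (2, 900), (4, 700)]
```

Asking each shard for only its local top `n` is sufficient, because any row in the global top `n` must also be in the top `n` of its own shard. The merge step therefore handles at most `n × N_shards` rows.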
# Problem: Transfer $100 from User A (Shard 1) to User B (Shard 3)
# Need atomicity: either BOTH accounts update or NEITHER
# Solution A: Two-Phase Commit (2PC), strong consistency
# Phase 1 (Prepare): coordinator asks both shards "can you commit?"
# Phase 2 (Commit): if both say yes → both commit; otherwise both rollback
# Problem: coordinator failure during Phase 2 → prepared shards hold locks and wait, stuck in limbo (slow, fragile)
# Solution B: Saga Pattern, eventual consistency
# Step 1: Debit User A → success
# Step 2: Credit User B → if fails → compensate: Credit User A back
# Implemented as a sequence of local transactions with compensating actions
# Widely used at scale because no shard blocks on a coordinator; the trade-off is that intermediate states are visible
# Solution C: Redesign to avoid cross-shard transactions
# For payments: shard the payment records by payment_id, not user_id
# Writing the payment record is then a single-shard operation; update user balances asynchronously
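The saga steps for the transfer can be sketched as two local transactions plus one compensating action. Everything here is a simplified assumption: `AccountShard` is an in-memory stand-in for a database shard, the `available` flag simulates an outage, and a real saga would persist its progress so that compensation still runs after a crash.

```python
class InsufficientFunds(Exception):
    pass


class AccountShard:
    """Hypothetical shard holding balances; each method is one local transaction."""

    def __init__(self, balances: dict[str, int], available: bool = True):
        self.balances = dict(balances)
        self.available = available

    def debit(self, user: str, amount: int) -> None:
        if self.balances[user] < amount:
            raise InsufficientFunds(user)
        self.balances[user] -= amount

    def credit(self, user: str, amount: int) -> None:
        if not self.available:
            raise ConnectionError("shard unavailable")
        self.balances[user] += amount


def transfer_saga(src: AccountShard, src_user: str,
                  dst: AccountShard, dst_user: str, amount: int) -> bool:
    """Debit, then credit; if the credit fails, refund the debit."""
    src.debit(src_user, amount)        # step 1: local transaction on the source shard
    try:
        dst.credit(dst_user, amount)   # step 2: local transaction on the destination shard
    except ConnectionError:
        src.credit(src_user, amount)   # compensating action that undoes step 1
        return False
    return True


shard_1 = AccountShard({"A": 500})
shard_3 = AccountShard({"B": 0})
ok = transfer_saga(shard_1, "A", shard_3, "B", 100)

shard_3_down = AccountShard({"B": 0}, available=False)
failed = transfer_saga(shard_1, "A", shard_3_down, "B", 100)
```

Between the debit and the compensation, User A's balance is temporarily lower than it ends up. That visible intermediate state is what "eventual consistency" means here, and it is the price paid for not holding locks across shards.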
# Problem: 10% of users are power users → shard_2 (which has power users) gets 80% of traffic
# Detection: monitor query volume per shard
# Alert if any shard handles > 2× the average traffic
# Solution A: Shard splitting: split the hot shard into 2
# Shard 2 (users A-Z) → Shard 2a (users A-M) + Shard 2b (users N-Z)
# Data migration: copy half the data, update routing, switch traffic
# Solution B: Add read replicas to the hot shard only
# Hot shard: 1 primary + 5 read replicas (others: 1 primary + 1 replica)
# Read requests load-balanced across replicas
# Solution C: Tiered caching: aggressive cache in front of hot shard
# If power users' data is heavily read, cache it → DB sees fewer requests
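The detection rule (alert when a shard exceeds 2× the average traffic) is simple enough to sketch directly. The function name `find_hot_shards` and the traffic numbers are made up for illustration; real counts would come from your metrics system.

```python
def find_hot_shards(queries_per_shard: dict[str, int], threshold: float = 2.0) -> list[str]:
    """Return shards whose query volume exceeds threshold x the mean volume."""
    if not queries_per_shard:
        return []
    mean = sum(queries_per_shard.values()) / len(queries_per_shard)
    return sorted(s for s, q in queries_per_shard.items() if q > threshold * mean)


# Made-up per-minute query counts: shard_2 carries 80% of the total.
traffic = {"shard_0": 1_000, "shard_1": 1_200, "shard_2": 16_000, "shard_3": 1_800}
hot = find_hot_shards(traffic)  # ["shard_2"]
```

One caveat of comparing against the mean: the hot shard inflates the mean itself, so with very few shards the 2× threshold may never trigger (with two shards it cannot). Comparing against the median is a common alternative in that case.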
# Problem: going from 4 shards to 8 shards
# With naive modulo hashing: changing the shard count remaps most keys
# (4 → 5 moves about 80% of keys; doubling 4 → 8 still moves 50%)
# With consistent hashing: only the keys claimed by the new shards move
# (about 1/5 of the data for 4 → 5, about half in total for 4 → 8), and nothing moves between old shards
# Resharding process (zero-downtime):
# 1. Add new shards to the ring
# 2. Start dual-writing: write to old shard AND new shard for each new write
# 3. Background job: migrate existing data from old shard to new shard
# 4. Once migration complete: switch reads to new shard, stop dual-writing, decommission old
# Resharding is expensive: plan shard count conservatively
# Over-shard initially: 64 or 128 logical shards even if starting small
# Shards can run on fewer physical servers (multiple shards per server)
# Adding servers later = move shards (not resharding), which is much simpler
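The over-sharding idea can be sketched as two layers: a fixed key → logical shard hash that never changes, and a small placement map from logical shard → physical server that does. The server names (`db-a` to `db-d`), the round-robin initial placement, and the greedy `rebalance` policy are all assumptions for the example.

```python
import hashlib

N_LOGICAL = 64  # chosen up front and never changed


def logical_shard(user_id: int) -> int:
    """Key -> logical shard. Independent of how many servers exist."""
    digest = hashlib.md5(str(user_id).encode()).digest()
    return int.from_bytes(digest[:8], "big") % N_LOGICAL


def build_placement(servers: list[str]) -> dict[int, str]:
    """Spread logical shards round-robin across the physical servers."""
    return {shard: servers[shard % len(servers)] for shard in range(N_LOGICAL)}


def rebalance(placement: dict[int, str], servers: list[str]) -> dict[int, str]:
    """Move just enough whole logical shards to even out the new server list."""
    target = N_LOGICAL // len(servers)
    new_placement = dict(placement)
    load = {s: 0 for s in servers}
    for server in placement.values():
        load[server] += 1
    for shard, server in placement.items():
        if load[server] > target:
            dest = min(servers, key=lambda s: load[s])
            if load[dest] < target:
                new_placement[shard] = dest
                load[server] -= 1
                load[dest] += 1
    return new_placement


two = build_placement(["db-a", "db-b"])                   # 32 logical shards per server
four = rebalance(two, ["db-a", "db-b", "db-c", "db-d"])   # 16 logical shards per server
moved = [s for s in two if two[s] != four[s]]
```

Growing from two servers to four relocates 32 whole logical shards, and no row changes which logical shard it belongs to. Each move is a bulk copy of one shard plus a one-entry update to the placement map, rather than a row-by-row rehash.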
| Type | What It Does | When to Use | Limitation |
|---|---|---|---|
| Vertical Scaling | Bigger machine (more RAM, CPU) | First response to bottleneck. Quick and simple. | Hardware limit exists. Expensive at high end. |
| Vertical Sharding | Split different tables to different servers (users table on DB1, orders table on DB2) | One table is causing all the bottleneck. | JOIN between tables on different servers = expensive. |
| Horizontal Sharding | Split rows of same table across multiple servers | A single table has outgrown one server; capacity grows by adding shards. | Cross-shard JOINs are expensive; cross-shard transactions need 2PC or sagas. |
| Read Replicas | Copy of primary DB for read-only queries | Read-heavy workloads (analytics, reporting). | Replication lag; doesn't help write bottleneck. |
Building your own sharding layer is complex. In practice, use one of these:
| Solution | Type | Sharding Approach | Best For |
|---|---|---|---|
| Amazon Aurora | Managed MySQL/PostgreSQL | Single writer plus read replicas on shared storage; horizontal sharding via Aurora Limitless Database (PostgreSQL) | AWS-native apps with <128 TB |
| Vitess (YouTube/PlanetScale) | MySQL proxy layer | Hash + range sharding over MySQL clusters | Large MySQL deployments, open source |
| Citus (PostgreSQL extension) | PostgreSQL extension | Hash/range sharding, distributed queries | PostgreSQL users needing horizontal scale |
| Amazon DynamoDB | Managed NoSQL | Automatic hash sharding via partition keys | Key-value/document access patterns, serverless |
| Cassandra | Distributed NoSQL | Consistent hashing with virtual nodes | Write-heavy, time-series, global distribution |
| MongoDB Atlas | Managed document DB | Hash/range sharding on shard key | Document data with flexible schema |