Scaling NoSQL Databases
Every database starts on a single server and works fine — until it doesn't. Traffic grows, the disk fills, queries slow, and the CPU sits at 95% permanently. The moment you realise one machine will never be enough again is the moment you need a scaling strategy. NoSQL databases were built for exactly this problem — but scaling them correctly still requires deliberate decisions that are very hard to reverse once made.
Vertical vs Horizontal Scaling
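There are two fundamental directions in which you can scale a database: vertically, by moving to a bigger machine, or horizontally, by spreading the load across more machines. The decision you make first shapes everything that comes after it: your operational complexity, your cost curve, and the ceiling of how far you can go.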
Scale vertically first. It is faster, simpler, and cheap in the early stages. Switch to horizontal scaling when you can no longer afford the next vertical tier, when a single machine creates an unacceptable single point of failure, or when your write throughput exceeds what one node can absorb. Most teams shard too early — before they have the data volume to justify the operational cost.
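The rule of thumb above can be written down as a small decision helper. This is only a sketch: the function name, the input fields, and the returned strings are hypothetical and not part of any database API.

```javascript
// Hypothetical decision helper: field names and return values are
// illustrative only.
function nextScalingStep({ canAffordNextTier, needsFailover, writesExceedOneNode }) {
  if (writesExceedOneNode) return "horizontal"; // one node cannot absorb the writes
  if (needsFailover) return "horizontal";       // a single machine is an unacceptable SPOF
  if (!canAffordNextTier) return "horizontal";  // the vertical cost curve has run out
  return "vertical";                            // simpler and cheaper for now
}

console.log(
  nextScalingStep({ canAffordNextTier: true, needsFailover: false, writesExceedOneNode: false })
); // vertical
```

The order of the checks does not matter here; any one of the three conditions is enough to leave the vertical path.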
Read Scaling — Replica Sets and Read Preferences
Most production applications are read-heavy. The fastest and safest way to scale reads is to add read replicas and direct read traffic to them — leaving the primary free for writes and reducing the read load on the machine that matters most.
The scenario: You are the lead engineer at a fast-growing content platform. Your MongoDB primary is at 78% CPU. Query profiling shows 90% of operations are reads — product pages, user feeds, search results. Writes are 10%. You do not want to shard yet because the data model is still evolving. You want to scale reads horizontally by adding secondaries and routing read traffic intelligently.
const { MongoClient } = require("mongodb");

const client = new MongoClient(process.env.MONGO_URI, {
  // Default read preference for the entire client
  // "secondaryPreferred" = use a secondary if available, primary as fallback
  readPreference: "secondaryPreferred"
});
const db = client.db("platform");

// Product page: staleness of a few seconds is fine
// Routes to a secondary: zero primary load
async function getProduct(productId) {
  return db.collection("products").findOne(
    { _id: productId },
    { readPreference: "secondaryPreferred" }
  );
}

// User's own order history: must be up-to-date
// Override to primary: user just placed this order
async function getMyOrders(userId) {
  return db.collection("orders").find(
    { userId },
    { readPreference: "primary" } // override: must read own writes
  ).toArray();
}

// Analytics aggregation: can tolerate seconds of lag, heavy query
// nearest = route to lowest-latency node regardless of primary/secondary
async function getDailyStats() {
  return db.collection("events").aggregate(
    [{ $group: { _id: "$date", count: { $sum: 1 } } }],
    { readPreference: "nearest" }
  ).toArray();
}
// Before: all reads hitting the primary
Primary CPU: 78% | Read ops/sec on primary: 4,200

// After: secondary preferred routing in place
Primary CPU: 21% | Read ops/sec on primary: 490 (writes only)
Secondary-1 CPU: 38% | Read ops/sec: 1,890
Secondary-2 CPU: 41% | Read ops/sec: 1,810

// getProduct() → routed to secondary (staleness: ~1.2s) ✓
// getMyOrders() → routed to primary (always fresh) ✓
// getDailyStats() → routed to nearest (latency: 2.1ms) ✓
"secondaryPreferred" vs "secondary"
secondaryPreferred uses a secondary if one is available, falling back to the primary if all secondaries are down. secondary will refuse to serve the query at all if no secondary is reachable — which can cause application errors during maintenance or failover. Always use secondaryPreferred unless you explicitly need to fail rather than fall back.
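The difference between the two modes is easiest to see in a toy model of server selection. The sketch below is a simplification written for this lesson, not the driver's actual algorithm: `selectMember` and the `members` array are hypothetical, and a real driver would wait for its server selection timeout before raising an error rather than throwing immediately.

```javascript
// Simplified model of read-preference server selection.
// `members` is a hypothetical description of replica set state.
function selectMember(mode, members) {
  const up = members.filter((m) => m.up);
  const primary = up.find((m) => m.role === "primary");
  const secondaries = up.filter((m) => m.role === "secondary");

  switch (mode) {
    case "primary":
      if (!primary) throw new Error("no primary available");
      return primary.host;
    case "secondary":
      // No fallback: the read fails if every secondary is down.
      if (secondaries.length === 0) throw new Error("no secondary available");
      return secondaries[0].host;
    case "secondaryPreferred":
      // Fallback: use the primary when no secondary is reachable.
      if (secondaries.length > 0) return secondaries[0].host;
      if (primary) return primary.host;
      throw new Error("no member available");
    default:
      throw new Error(`unsupported mode: ${mode}`);
  }
}

// Maintenance window: both secondaries are down, only the primary is up.
const maintenance = [
  { host: "rs0-a", role: "primary", up: true },
  { host: "rs0-b", role: "secondary", up: false },
  { host: "rs0-c", role: "secondary", up: false },
];

console.log(selectMember("secondaryPreferred", maintenance)); // rs0-a
try {
  selectMember("secondary", maintenance);
} catch (err) {
  console.log(err.message); // no secondary available
}
```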
readPreference: "primary" for getMyOrders()
Replication is asynchronous. A secondary may lag the primary by 0.5 to several seconds. If a user places an order and immediately hits "my orders", a secondary-routed read might not show the order yet — the user sees a blank page and panics. Any query where the user is reading data they just wrote must go to the primary. This is the read-your-own-writes consistency pattern.
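One possible variation on this pattern, sketched below, is to pin a user's reads to the primary only for a short window after that user writes, and let later reads go back to secondaries. This is not what the example above does (it always reads orders from the primary). The helper names, the in-memory `Map`, and the 5-second window are all assumptions made for illustration; a multi-process deployment would need shared state instead.

```javascript
// Hypothetical helper: remember when each user last wrote, and pin that
// user's reads to the primary for a short window afterwards.
const PIN_WINDOW_MS = 5000; // assumed upper bound on replication lag
const lastWriteAt = new Map();

function recordWrite(userId, now = Date.now()) {
  lastWriteAt.set(userId, now);
}

function readPreferenceFor(userId, now = Date.now()) {
  const wroteAt = lastWriteAt.get(userId);
  const recentlyWrote = wroteAt !== undefined && now - wroteAt < PIN_WINDOW_MS;
  return recentlyWrote ? "primary" : "secondaryPreferred";
}

recordWrite("u_4412", 1000);
console.log(readPreferenceFor("u_4412", 2000)); // primary
console.log(readPreferenceFor("u_4412", 9000)); // secondaryPreferred
console.log(readPreferenceFor("u_0001", 2000)); // secondaryPreferred
```

The returned string can be passed as the `readPreference` option on an individual query, as in the functions shown earlier.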
"nearest" for analytics
nearest routes to a node with low network round-trip time, primary or secondary; drivers choose among the members whose latency falls within a small window of the fastest one. For a heavy aggregation that takes 3 seconds and where you do not care about a few seconds of lag, this minimises the latency of the network hop. It also means the aggregation might run on a geographically closer secondary replica in a multi-region setup.
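As a rough model of how "nearest" behaves, the sketch below treats every member whose round-trip time is within a latency window of the fastest member as eligible. The 15 ms default mirrors the drivers' `localThresholdMS` setting; the function name, host names, and latency figures are made up for the example.

```javascript
// Simplified model of "nearest": every member whose round-trip time is
// within `windowMs` of the fastest member is an eligible target.
function nearestCandidates(members, windowMs = 15) {
  const fastest = Math.min(...members.map((m) => m.rttMs));
  return members
    .filter((m) => m.rttMs <= fastest + windowMs)
    .map((m) => m.host);
}

// Hypothetical multi-region replica set as seen from a us-east app server.
const replicaSet = [
  { host: "eu-west-primary", rttMs: 82 },
  { host: "us-east-secondary-1", rttMs: 2.1 },
  { host: "us-east-secondary-2", rttMs: 9.4 },
];

console.log(nearestCandidates(replicaSet));
// [ 'us-east-secondary-1', 'us-east-secondary-2' ]
```

In this model the distant primary is excluded, so the heavy aggregation stays in the local region.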
Write Scaling — MongoDB Sharding
When writes outgrow a single node, you need sharding — partitioning your data across multiple nodes so each node handles a subset of writes. MongoDB sharding is powerful, but the most consequential decision is the shard key: the field (or fields) used to determine which shard a document lives on. A good shard key distributes load evenly. A bad one creates hotspots where one shard handles all the traffic while the others sit idle.
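To make the hotspot effect concrete, the following simulation compares range sharding on an ever-increasing key with hashed sharding. It is a toy model under stated assumptions: three shards, fixed chunk boundaries, and FNV-1a with a modulo as a stand-in for the hash-to-chunk mapping. None of this is MongoDB's real implementation.

```javascript
const SHARD_COUNT = 3;

// Range sharding: chunk i holds keys below upperBounds[i]. The last chunk is
// open-ended, so any key larger than everything seen so far lands there.
const upperBounds = [1000, 2000, Infinity];
function rangeShardFor(key) {
  return upperBounds.findIndex((upper) => key < upper);
}

// Hashed sharding: FNV-1a is a stand-in hash chosen for this sketch only.
function hashedShardFor(key) {
  let h = 0x811c9dc5;
  for (const ch of String(key)) {
    h ^= ch.charCodeAt(0);
    h = Math.imul(h, 0x01000193) >>> 0;
  }
  return h % SHARD_COUNT;
}

function countWrites(shardFor, keys) {
  const counts = new Array(SHARD_COUNT).fill(0);
  for (const key of keys) counts[shardFor(key)] += 1;
  return counts;
}

// 3,000 new documents whose key keeps increasing, like a timestamp.
const newKeys = Array.from({ length: 3000 }, (_, i) => 5000 + i);

const rangeCounts = countWrites(rangeShardFor, newKeys);
const hashedCounts = countWrites(hashedShardFor, newKeys);
console.log("range :", rangeCounts);  // every write lands on the last shard
console.log("hashed:", hashedCounts); // writes spread across the shards
```

With the range key, the first two shards receive nothing while the last one takes every write, which is the hotspot the paragraph above describes.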
Avoid using _id alone as a shard key on a write-heavy collection. A default ObjectId _id increases over time, so new documents all land in the last chunk.

If a field such as status has only 3 values (active, pending, archived), you get at most 3 chunks regardless of how many shards you add. You cannot split a chunk further than its key cardinality. Low cardinality creates an upper bound on how much you can distribute data.

A hashed shard key on userId distributes documents pseudo-randomly across shards with no hotspots. Every shard receives roughly equal write load. The trade-off: range queries on userId must hit every shard (scatter-gather). Best when your dominant access pattern is single-document lookups by that field.

The scenario: Your platform's events collection is receiving 50,000 writes per second and has grown to 4TB. A single replica set can no longer keep up. You are enabling sharding on the collection. You need to choose a shard key that distributes writes evenly and supports your dominant query pattern: lookups by userId across a time range.
// Step 1: enable sharding on the database
sh.enableSharding("platform");
// Step 2: create the index that will back the shard key
// Compound: userId (range) + timestamp (range)
// userId gives isolation per user, timestamp allows range queries
db.events.createIndex({ userId: 1, timestamp: 1 });
// Step 3: shard the collection on the compound key
sh.shardCollection("platform.events", {
  userId: 1,    // range-based on userId: all events for one user co-located
  timestamp: 1  // range within a user: supports efficient time-range queries
});
// Step 4: verify shard distribution
sh.status();
--- Sharding Status ---
shards:
{ "_id":"shard0","host":"shard0/rs0:27017,rs0:27018,rs0:27019" }
{ "_id":"shard1","host":"shard1/rs1:27017,rs1:27018,rs1:27019" }
{ "_id":"shard2","host":"shard2/rs2:27017,rs2:27018,rs2:27019" }
databases:
platform.events
shard key: { userId: 1, timestamp: 1 }
chunks:
shard0: 142 (33.2% of data)
shard1: 139 (32.6% of data)
shard2: 145 (34.2% of data) ✓ evenly distributed
Query: db.events.find({userId:"u_4412", timestamp:{$gte: t}})
Plan: IXSCAN on shard0 only ← targeted single-shard query ✓
Compound key { userId: 1, timestamp: 1 }
The compound key solves two problems simultaneously. userId as the first field keeps all events for a single user in one contiguous key range, which normally lives on a single shard, so a query like "get all events for user X in the last 7 days" is a single-shard operation, not a scatter-gather across all shards. timestamp as the second field allows efficient range queries within a user's events using the index rather than scanning. It also raises the key's cardinality, so the chunk for a very active user can still be split by time.
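Shard key is expensive to change after sharding
This is the most important operational constraint in MongoDB sharding. Before MongoDB 4.4, changing the shard key meant exporting all data, dropping the collection, and sharding it again. MongoDB 4.4 added refineCollectionShardKey, which can only append suffix fields to the existing key, and MongoDB 5.0 added reshardCollection, which rewrites the whole collection under a new key. Resharding copies every document, so on a large collection it still takes hours or longer, needs spare disk and I/O capacity, and carries operational risk. Get the shard key right before you enable sharding. Test your dominant query patterns against it on a staging cluster first.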
Targeted vs scatter-gather queries
A query that includes the shard key prefix (userId) is routed to exactly one shard: a targeted query. A query without the shard key (e.g., find events by eventType only) must be sent to every shard and the results merged by the mongos router: a scatter-gather query. The shards work in parallel, so on 10 shards a scatter-gather query consumes roughly 10× the cluster resources of a targeted query, and its latency is set by the slowest shard plus the merge step on mongos. Design your shard key so your dominant queries are targeted.
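The routing rule can be approximated in a few lines. This sketch assumes the simplest case, an equality match on the first shard key field; a range on that field may still touch several shards. `routeQuery` is a hypothetical function written for this lesson, not something mongos exposes.

```javascript
// A query can be targeted only if it constrains the leading field of the
// shard key. Simplification: any condition on that field counts.
function routeQuery(shardKeyFields, filter, shardCount) {
  const targeted = shardKeyFields[0] in filter;
  return {
    type: targeted ? "targeted" : "scatter-gather",
    shardsContacted: targeted ? 1 : shardCount,
  };
}

const shardKey = ["userId", "timestamp"];

console.log(routeQuery(shardKey, { userId: "u_4412", timestamp: { $gte: 0 } }, 10));
// { type: 'targeted', shardsContacted: 1 }

console.log(routeQuery(shardKey, { eventType: "purchase" }, 10));
// { type: 'scatter-gather', shardsContacted: 10 }

console.log(routeQuery(shardKey, { timestamp: { $gte: 0 } }, 10));
// { type: 'scatter-gather', shardsContacted: 10 }
```

The last call shows that filtering on timestamp alone does not help: it is part of the shard key, but not its prefix.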
Scaling Cassandra — Adding Nodes to a Live Cluster
Cassandra was built for horizontal scaling from day one. Adding a new node to a live Cassandra cluster is a routine operational task. Unlike MongoDB sharding, it involves no separate shard key decision: placement follows the partition key already defined in each table's schema, and Cassandra reassigns token ranges across the expanded cluster automatically. The process by which a new node joins and receives its data is called bootstrapping.
The scenario: Your 4-node Cassandra cluster storing IoT sensor data is approaching capacity. Each node holds around 800GB of data and write latency is creeping up. You are adding two new nodes to expand the cluster to 6. Once added, Cassandra will stream data from existing nodes to the new ones and rebalance token ownership automatically.
# cassandra.yaml on the NEW node — key settings for joining
# seeds: at least one existing node so the new node can find the cluster
# auto_bootstrap: true means the node will stream data from existing nodes
grep -E "seeds|auto_bootstrap|listen" /etc/cassandra/cassandra.yaml
# seeds: "10.0.1.10,10.0.1.11"
# listen_address: "10.0.1.20" # new node's IP
# auto_bootstrap: true # default true — enables data streaming on join
# Start the new node — it automatically joins and begins bootstrapping
cassandra -f &
# Monitor bootstrap progress from any existing node
watch -n 5 nodetool status
# After bootstrap completes, run cleanup on existing nodes
# This removes data that no longer belongs to them after rebalancing
# Cleanup rewrites SSTables and is I/O heavy, so run it one node at a time
for node in 10.0.1.10 10.0.1.11 10.0.1.12 10.0.1.13; do
  ssh "$node" "nodetool cleanup iot_sensors"
done
echo "Cleanup complete: cluster rebalanced"
$ nodetool status (during bootstrap)
Datacenter: dc1
--  Address    Load       Tokens  Owns   State
UN  10.0.1.10  801.2 GiB  256     25.1%  Normal
UN  10.0.1.11  798.4 GiB  256     24.9%  Normal
UN  10.0.1.12  803.1 GiB  256     25.2%  Normal
UN  10.0.1.13  799.7 GiB  256     24.8%  Normal
UJ  10.0.1.20  ?          256     ?      Joining  ← new node bootstrapping

$ nodetool status (after bootstrap + cleanup)
UN  10.0.1.10  534.8 GiB  256     16.7%  Normal
UN  10.0.1.11  533.9 GiB  256     16.6%  Normal
UN  10.0.1.12  535.1 GiB  256     16.7%  Normal
UN  10.0.1.13  534.2 GiB  256     16.6%  Normal
UN  10.0.1.20  532.7 GiB  256     16.6%  Normal   ← joined ✓
UN  10.0.1.21  533.3 GiB  256     16.7%  Normal   ← joined ✓

Cluster rebalanced. Write load distributed across 6 nodes.
State: UJ (Up/Joining)
During bootstrap, the new node shows UJ — it is up and reachable but still receiving data from existing nodes via the streaming process. The cluster continues to serve all traffic during this period. Bootstrapping on an 800GB node typically takes 1–4 hours depending on network bandwidth. Do not add a second new node until the first has fully joined — two simultaneous bootstraps split the streaming bandwidth and slow both.
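A back-of-the-envelope estimate shows where a range like 1 to 4 hours can come from: divide the data the new node must receive by the sustained streaming throughput. The throughput figures below are assumptions picked for illustration, not measurements, and real bootstraps are also affected by compaction, throttling settings, and the number of source nodes.

```javascript
// Rough bootstrap duration: data to stream divided by sustained throughput.
function bootstrapHours(dataGiB, throughputMiBps) {
  const seconds = (dataGiB * 1024) / throughputMiBps;
  return seconds / 3600;
}

// ~533 GiB is roughly what each node holds once the cluster has six nodes.
// 50 and 150 MiB/s are assumed throughputs, not measured values.
console.log(bootstrapHours(533, 50).toFixed(1));  // 3.0
console.log(bootstrapHours(533, 150).toFixed(1)); // 1.0
```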
nodetool cleanup after bootstrap
After bootstrap, existing nodes still hold data that has been transferred to the new node — they just no longer own those token ranges. nodetool cleanup removes this now-orphaned data, freeing the disk space. Without cleanup, the old nodes remain over-full even though the logical data distribution has rebalanced. Always run cleanup on all existing nodes after adding capacity.
Load drops from 800GB to 535GB per node
Going from 4 to 6 nodes reduces each node's data ownership from 25% to 16.7% — a 33% reduction in per-node load with no application changes, no shard key decisions, and no downtime. This is the horizontal scaling advantage Cassandra was designed for. Each new node also adds a proportional share of CPU, RAM, and network bandwidth to the cluster.
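The arithmetic behind those numbers is short enough to check directly. The sketch assumes the total data volume stays constant during the expansion and is spread perfectly evenly; the function name is illustrative.

```javascript
// Even split of a fixed data volume across a number of nodes.
function perNodeGiB(totalGiB, nodeCount) {
  return totalGiB / nodeCount;
}

const totalGiB = 4 * 800;               // four nodes at ~800 GiB each
const before = perNodeGiB(totalGiB, 4); // 800
const after = perNodeGiB(totalGiB, 6);  // about 533.3
const reductionPct = (1 - after / before) * 100;

console.log(after.toFixed(1));        // 533.3
console.log(reductionPct.toFixed(1)); // 33.3
```

The result lines up with the roughly 533 to 535 GiB per node reported by nodetool status after cleanup.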
Scaling Comparison — MongoDB vs Cassandra vs DynamoDB
| Dimension | MongoDB | Cassandra | DynamoDB |
|---|---|---|---|
| Read scaling | Add secondaries, use read preferences | Add nodes, all nodes serve reads | Auto-scales; DAX cache for hot reads |
| Write scaling | Sharding — requires shard key decision | Add nodes — rebalances automatically | Auto-scales; provisioned or on-demand |
| Operational effort | Medium — shard key planning required | Medium — node ops, compaction tuning | Minimal — fully managed by AWS |
| Scaling ceiling | Very high — multi-shard clusters | Extremely high — proven at petabyte scale | Virtually unlimited (AWS infra) |
| Worst mistake | Wrong shard key: hotspot, costly to change | Skipping cleanup after node add | Hot partition key: one item gets all traffic |
Teacher's Note
The engineers I have seen make the most expensive scaling mistakes were not the ones who scaled too late — they were the ones who sharded too early, before they understood their dominant query patterns, and chose a shard key that looked reasonable in testing but created a hotspot under real write traffic. Resharding a 5TB MongoDB collection in production is a multi-day operation with significant risk. Do the boring work first: profile your queries, measure your actual read/write ratio, and exhaust vertical scaling and read replicas before you touch sharding. Sharding is a one-way door. Open it carefully.
Practice Questions — You're the Engineer
Scenario:
The orders collection was sharded using created_at as the shard key. Three months after launch, your monitoring shows shard2 handling 94% of all writes while shard0 and shard1 sit almost idle. The write latency on shard2 is 800ms while the other shards are at 4ms. A colleague explains that because created_at is a timestamp that always increases, all new documents land on the rightmost chunk, always on shard2, while the other shards receive no new data. What is the single word used to describe this scaling failure where one shard receives a disproportionate share of traffic?
Scenario:
nodetool status shows all 6 nodes as UN (Up/Normal) with evenly distributed token ownership. However, your disk usage monitoring shows the original 4 nodes still reporting 800GB each — the same as before the expansion — even though the new nodes are each showing 530GB. A senior engineer tells you there is one more command you must run on each of the original 4 nodes to remove the data they no longer own. What is that command?
Quiz — Scaling in Production
Scenario:
Two shard keys are proposed for the events collection: Option A is { timestamp: 1 } and Option B is { userId: "hashed" }. A senior engineer says Option A will create a write hotspot within weeks and recommends Option B. A junior engineer argues both keys have high cardinality so both should distribute evenly. Who is correct, and why?
Scenario:
The events collection is sharded on { userId: 1, timestamp: 1 } across 8 shards. A new reporting feature runs a query that finds all events of type "purchase" across all users in a date range: db.events.find({ eventType: "purchase", timestamp: { $gte: t } }). In testing this query took 12ms. In production with 8 shards it takes 380ms and your DBA is alarmed. The shard key prefix (userId) is not included in the query filter. What is the correct explanation for why this query is disproportionately slow on a sharded cluster?