NO SQL Lesson 37 – Scaling NoSQL Databases | Dataplexa
Enterprise & Cloud · Lesson 37

Scaling NoSQL Databases

Every database starts on a single server and works fine — until it doesn't. Traffic grows, the disk fills, queries slow, and the CPU sits at 95% permanently. The moment you realise one machine will never be enough again is the moment you need a scaling strategy. NoSQL databases were built for exactly this problem — but scaling them correctly still requires deliberate decisions that are very hard to reverse once made.

Vertical vs Horizontal Scaling

There are two fundamental directions you can scale a database. The decision you make first shapes everything that comes after it — your operational complexity, your cost curve, and the ceiling of how far you can go.

Vertical Scaling (Scale Up)
🖥️ → 🖥️💪
Bigger machine, same architecture
Add more CPU cores, more RAM and faster NVMe SSDs to the existing server. Zero code changes. No distribution complexity, though the resize itself usually means a restart or a failover to the upgraded machine. Works extremely well, until you hit the hardware ceiling or the cost becomes prohibitive.
✓ No code changes required
✓ Simple: one machine to operate
✓ No distribution complexity
✗ Hard ceiling: the biggest instance type available
✗ Cost rises steeply at the top instance tiers
✗ Single point of failure remains
Horizontal Scaling (Scale Out)
🖥️ → 🖥️🖥️🖥️
More machines, distributed data
Add more nodes to the cluster. Data is partitioned (sharded) across them. No single machine holds all the data or handles all the traffic. Theoretically unlimited scale — practically limited by your ability to manage the complexity.
✓ Theoretically unlimited scale
✓ No hardware ceiling
✓ Commodity hardware: lower cost per unit
✗ Shard key choice is hard to reverse
✗ Cross-shard queries are expensive
✗ Operational complexity multiplies
The Rule of Thumb

Scale vertically first. It is faster, simpler, and cheaper in the early stages. Switch to horizontal scaling when you can no longer afford the next vertical tier, when a single machine creates an unacceptable single point of failure, or when your write throughput exceeds what one node can absorb. Most teams shard too early, before they have the data volume to justify the operational cost.

Read Scaling — Replica Sets and Read Preferences

Most production applications are read-heavy. The fastest and safest way to scale reads is to add read replicas and route reads that tolerate slight staleness to them, leaving the primary's capacity for writes and for the reads that must see the latest data.

The scenario: You are the lead engineer at a fast-growing content platform. Your MongoDB primary is at 78% CPU. Query profiling shows 90% of operations are reads — product pages, user feeds, search results. Writes are 10%. You do not want to shard yet because the data model is still evolving. You want to scale reads horizontally by adding secondaries and routing read traffic intelligently.

Node.js — configuring read preferences per operation
const { MongoClient } = require("mongodb");

const client = new MongoClient(process.env.MONGO_URI, {
  // Default read preference for the entire client
  // "secondaryPreferred" = use a secondary if available, primary as fallback
  readPreference: "secondaryPreferred"
});

const db = client.db("platform");

// Product page — staleness of a few seconds is fine
// Routes to a secondary when one is up; falls back to the primary otherwise
async function getProduct(productId) {
  return db.collection("products").findOne(
    { _id: productId },
    { readPreference: "secondaryPreferred" }
  );
}

// User's own order history — must be up-to-date
// Override to primary — user just placed this order
async function getMyOrders(userId) {
  return db.collection("orders").find(
    { userId },
    { readPreference: "primary" }   // override — must read own writes
  ).toArray();
}

// Analytics aggregation — can tolerate seconds of lag, heavy query
// nearest = any member within the lowest-latency window, primary or secondary
async function getDailyStats() {
  return db.collection("events").aggregate(
    [{ $group: { _id: "$date", count: { $sum: 1 } } }],
    { readPreference: "nearest" }
  ).toArray();
}
// Before — all reads hitting the primary
Primary CPU: 78%  |  Read ops/sec on primary: 4,200

// After — secondary preferred routing in place
Primary CPU: 21%  |  Read ops/sec on primary: 490   (primary-pinned reads, e.g. getMyOrders())
Secondary-1 CPU: 38%  |  Read ops/sec: 1,890
Secondary-2 CPU: 41%  |  Read ops/sec: 1,810

// getProduct()  → routed to secondary  (staleness: ~1.2s)  ✓
// getMyOrders() → routed to primary    (always fresh)       ✓
// getDailyStats() → routed to nearest  (latency: 2.1ms)    ✓
"secondaryPreferred" vs "secondary"

secondaryPreferred reads from a secondary when one is available and falls back to the primary only if no secondary is reachable. secondary refuses to serve the query when no secondary is reachable, so the operation fails with a server-selection error, which can surface as application errors during maintenance or failover. Prefer secondaryPreferred unless you explicitly want the read to fail rather than land on the primary.
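A minimal sketch of the fallback rule, assuming a simplified replica-set model: `selectMember` and the `{ host, role, up }` member objects are hypothetical and not part of the MongoDB driver API. It ignores tag sets, `maxStalenessSeconds` and latency windows, and it takes the first available secondary where a real driver would choose by latency.

```javascript
// Hypothetical model of read-preference fallback; not the MongoDB driver API.
// Each member: { host, role: "primary" | "secondary", up: boolean }
function selectMember(mode, members) {
  const up = members.filter((m) => m.up);
  const primary = up.find((m) => m.role === "primary");
  const secondaries = up.filter((m) => m.role === "secondary");

  switch (mode) {
    case "primary":
      if (!primary) throw new Error("No primary available");
      return primary;
    case "secondary":
      // Strict: fail rather than fall back to the primary
      if (secondaries.length === 0) throw new Error("No secondary available");
      return secondaries[0]; // a real driver chooses by latency
    case "secondaryPreferred":
      // Prefer any secondary; use the primary only when none is up
      if (secondaries.length > 0) return secondaries[0];
      if (!primary) throw new Error("No member available");
      return primary;
    default:
      throw new Error(`Unsupported mode in this sketch: ${mode}`);
  }
}

// Maintenance window: both secondaries are down
const members = [
  { host: "rs0:27017", role: "primary", up: true },
  { host: "rs0:27018", role: "secondary", up: false },
  { host: "rs0:27019", role: "secondary", up: false },
];

console.log(selectMember("secondaryPreferred", members).host); // rs0:27017
try {
  selectMember("secondary", members);
} catch (err) {
  console.log(err.message); // No secondary available
}
```

The same outage that keeps the API serving under secondaryPreferred turns every read into an error under secondary.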

readPreference: "primary" for getMyOrders()

Replication is asynchronous. A secondary usually trails the primary by milliseconds, but under heavy write load, network trouble or a slow secondary the lag can grow to seconds or more. If a user places an order and immediately opens "my orders", a secondary-routed read might not show the order yet: the user sees a blank page and panics. Any query where the user is reading data they just wrote should go to the primary. This is the read-your-own-writes consistency pattern.
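To make the stale-read risk concrete, here is a small in-memory model of asynchronous replication. `ReplicaSetModel` is hypothetical, the 1.2-second lag is an assumed constant, and a simulated clock in milliseconds replaces real timers; real replication lag varies continuously.

```javascript
// Hypothetical model of asynchronous replication with a simulated clock (ms).
// Not MongoDB's replication protocol: lag is assumed constant.
class ReplicaSetModel {
  constructor(lagMs) {
    this.lagMs = lagMs; // assumed replication lag
    this.oplog = [];    // { ts, doc } entries accepted by the primary
  }

  // A write is visible on the primary immediately
  insert(now, doc) {
    this.oplog.push({ ts: now, doc });
  }

  // The primary sees every acknowledged write
  readPrimary(filter) {
    return this.oplog.map((e) => e.doc).filter(filter);
  }

  // A secondary has only applied writes at least lagMs old
  readSecondary(now, filter) {
    return this.oplog
      .filter((e) => e.ts + this.lagMs <= now)
      .map((e) => e.doc)
      .filter(filter);
  }
}

const rs = new ReplicaSetModel(1200);
rs.insert(0, { orderId: "o_1", userId: "u_42" });
const mine = (o) => o.userId === "u_42";

console.log(rs.readSecondary(50, mine).length);   // 0: order not replicated yet
console.log(rs.readPrimary(mine).length);         // 1: read-your-own-writes
console.log(rs.readSecondary(1500, mine).length); // 1: secondary has caught up
```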

"nearest" for analytics

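nearest is latency-based: the driver finds the eligible member with the lowest round-trip time and then chooses among every member whose latency is within localThresholdMS (15 ms by default) of it, primary or secondary. For a heavy aggregation where a few seconds of lag does not matter, this keeps the network hop short and, in a multi-region setup, lets the aggregation run on a geographically closer secondary. The trade-off is that nearest may choose the primary; if the goal is to keep heavy aggregations off the primary, secondary or secondaryPreferred is the safer mode.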
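The latency-window rule can be sketched as follows. `selectNearest`, the member list and the RTT values are hypothetical; the 15 ms default mirrors the drivers' `localThresholdMS` setting. Real drivers use an average round-trip time and also weigh in-flight operations when choosing inside the window, whereas this sketch picks uniformly at random (the `random` parameter is injectable so the choice can be tested).

```javascript
// Sketch of "nearest" selection with a latency window. Member data is hypothetical.
function selectNearest(members, localThresholdMS = 15, random = Math.random) {
  const up = members.filter((m) => m.up);
  if (up.length === 0) throw new Error("No member available");

  // Lowest round-trip time among reachable members, primary or secondary
  const fastest = Math.min(...up.map((m) => m.rttMs));

  // Every member within the window is eligible
  const eligible = up.filter((m) => m.rttMs <= fastest + localThresholdMS);
  return eligible[Math.floor(random() * eligible.length)];
}

const members = [
  { host: "primary.us-east", role: "primary", rttMs: 2.1, up: true },
  { host: "secondary.us-east", role: "secondary", rttMs: 9.0, up: true },
  { host: "secondary.eu-west", role: "secondary", rttMs: 78.4, up: true },
];

const picked = selectNearest(members);
console.log(picked.host, picked.role); // one of the two us-east members
```

With these numbers the primary falls inside the window, so `nearest` gives no guarantee that a heavy aggregation stays off the primary.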

Write Scaling — MongoDB Sharding

When writes outgrow a single node, you need sharding — partitioning your data across multiple nodes so each node handles a subset of writes. MongoDB sharding is powerful, but the most consequential decision is the shard key: the field (or fields) used to determine which shard a document lives on. A good shard key distributes load evenly. A bad one creates hotspots where one shard handles all the traffic while the others sit idle.

Shard Key Patterns — Choose Carefully
Monotonically increasing key (e.g. ObjectId, timestamp)
Each new document has a higher key value than every existing one, so all inserts land in the chunk that covers the top of the key range, and that chunk lives on a single shard. That shard receives 100% of writes while all others sit idle. This is the classic hotspot. Never use a ranged ObjectId _id alone as a shard key on a write-heavy collection; a hashed _id avoids the hotspot.
⚠️ Low cardinality key (e.g. status, country)
If status only has 3 values (active, pending, archived), the collection can never be split into more than 3 chunks, because a chunk cannot be split between documents that share the same shard key value. Adding a fourth or fifth shard gains nothing: those shards stay empty. Low cardinality puts a hard upper bound on how widely you can distribute data.
High cardinality + even distribution (e.g. hashed userId)
Hashing a high-cardinality field like userId distributes documents pseudo-randomly across shards with no hotspots. Every shard receives roughly equal write load. The trade-off: range queries on userId must hit every shard (scatter-gather). Best when your dominant access pattern is single-document lookups by that field.
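A small simulation contrasts the three patterns by counting where 30,000 writes land. Everything here is a simplified, hypothetical model: chunk boundaries and chunk-to-shard placement are invented, and MD5 stands in for MongoDB's own 64-bit hash of the BSON value, with the hash space split into one contiguous range per shard.

```javascript
const { createHash } = require("crypto");

// Count how many writes land on each shard for a given routing function
function distribute(keys, shardOf, shardCount) {
  const counts = new Array(shardCount).fill(0);
  for (const k of keys) counts[shardOf(k, shardCount)]++;
  return counts;
}

// Ranged key: chunk i covers [bounds[i], bounds[i + 1]); the last chunk is
// open-ended. Chunk i is assumed to live on shard i % shardCount.
function rangedShard(bounds) {
  return (value, shardCount) => {
    let chunk = 0;
    for (let i = 1; i < bounds.length; i++) if (value >= bounds[i]) chunk = i;
    return chunk % shardCount;
  };
}

// Low cardinality: at most one chunk per distinct value (hypothetical placement)
function lowCardShard(values) {
  return (value, shardCount) => values.indexOf(value) % shardCount;
}

// Hashed key: MD5 as a stand-in hash, hash space split into equal ranges
function hashedShard(value, shardCount) {
  const h = createHash("md5").update(String(value)).digest().readUInt32BE(0);
  return Math.floor((h / 2 ** 32) * shardCount);
}

const N = 30000;

// 1. Monotonic timestamps: every new write is past the last chunk boundary
const timestamps = Array.from({ length: N }, (_, i) => 3000 + i);
const monotonic = distribute(timestamps, rangedShard([0, 1000, 2000]), 3);

// 2. Low cardinality: 3 status values spread over 5 shards
const statuses = ["active", "pending", "archived"];
const writes = Array.from({ length: N }, (_, i) => statuses[i % 3]);
const lowCard = distribute(writes, lowCardShard(statuses), 5);

// 3. Hashed userId: high cardinality, pseudo-random placement
const userIds = Array.from({ length: N }, (_, i) => `u_${i}`);
const hashed = distribute(userIds, hashedShard, 3);

console.log("monotonic:", monotonic); // [ 0, 0, 30000 ]
console.log("low card: ", lowCard);   // [ 10000, 10000, 10000, 0, 0 ]
console.log("hashed:   ", hashed);    // roughly 10000 per shard
```

Adding shards to the low-cardinality case only adds idle shards, and in the monotonic case new inserts keep targeting the top chunk; only the hashed key spreads the write load.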

The scenario: Your platform's events collection is receiving 50,000 writes per second and has grown to 4TB. A single shard can no longer keep up. You are enabling sharding on the collection. You need to choose a shard key that distributes writes evenly and supports your dominant query pattern — lookups by userId across a time range.

MongoDB shell — enabling sharding with a compound shard key
// Step 1: enable sharding on the database (optional from MongoDB 6.0; harmless to run)
sh.enableSharding("platform");

// Step 2: create the index that will back the shard key (required if the collection already holds data)
// Compound: userId (range) + timestamp (range)
// userId gives isolation per user, timestamp allows range queries
db.events.createIndex({ userId: 1, timestamp: 1 });

// Step 3: shard the collection on the compound key
sh.shardCollection("platform.events", {
  userId:    1,   // range-based on userId — all events for one user co-located
  timestamp: 1    // range within a user — supports efficient time-range queries
});

// Step 4: verify shard distribution
sh.status();
--- Sharding Status ---
  shards:
    { "_id":"shard0","host":"shard0/rs0:27017,rs0:27018,rs0:27019" }
    { "_id":"shard1","host":"shard1/rs1:27017,rs1:27018,rs1:27019" }
    { "_id":"shard2","host":"shard2/rs2:27017,rs2:27018,rs2:27019" }

  databases:
    platform.events
      shard key: { userId: 1, timestamp: 1 }
      chunks:
        shard0: 142  (33.2% of data)
        shard1: 139  (32.6% of data)
        shard2: 145  (34.2% of data)    ✓ evenly distributed

  Query:  db.events.find({userId:"u_4412", timestamp:{$gte: t}})
  Plan:   IXSCAN on shard0 only  ← targeted single-shard query  ✓
Compound key { userId: 1, timestamp: 1 }

The compound key solves two problems simultaneously. userId as the first field ensures all events for a single user land on the same shard — so a query like "get all events for user X in the last 7 days" is a single-shard operation, not a scatter-gather across all shards. timestamp as the second field allows efficient range queries within a user's events using the index rather than scanning.
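The co-location effect follows from how a ranged compound key orders documents: by userId first, then by timestamp. The sketch below uses a hypothetical three-chunk table (with `null` standing in for MongoDB's MinKey/MaxKey) and a `chunkFor` lookup that is illustrative, not MongoDB's internal routing code.

```javascript
// Order compound keys field by field: userId first, then timestamp
function compareKeys(a, b) {
  if (a.userId !== b.userId) return a.userId < b.userId ? -1 : 1;
  return a.timestamp - b.timestamp;
}

// Hypothetical chunk table: each chunk covers [min, max) on { userId, timestamp }.
// null stands in for MinKey / MaxKey.
const chunks = [
  { min: null, max: { userId: "u_5000", timestamp: 0 }, shard: "shard0" },
  { min: { userId: "u_5000", timestamp: 0 }, max: { userId: "u_8000", timestamp: 0 }, shard: "shard1" },
  { min: { userId: "u_8000", timestamp: 0 }, max: null, shard: "shard2" },
];

function chunkFor(key) {
  return chunks.find(
    (c) =>
      (c.min === null || compareKeys(key, c.min) >= 0) &&
      (c.max === null || compareKeys(key, c.max) < 0)
  );
}

// A week of events for one user: every timestamp maps to the same chunk
const DAY = 86400000;
const week = Array.from({ length: 7 }, (_, d) => ({
  userId: "u_4412",
  timestamp: 1700000000000 + d * DAY,
}));
const shards = new Set(week.map((k) => chunkFor(k).shard));
console.log([...shards]); // [ 'shard0' ]
```

Because timestamp is part of the key, a chunk boundary can also fall inside a single busy user's range, for example at { userId: "u_4412", timestamp: t }; that user's time-range query then touches two chunks, which may live on different shards.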

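Shard key changes are expensive after sharding

This is the most important operational constraint in MongoDB sharding. Before MongoDB 5.0, changing the shard key meant exporting all data, dropping the collection and sharding it again, a multi-hour operation on a large collection with real downtime risk. MongoDB 4.4 added refineCollectionShardKey, which can only append suffix fields to the existing key, and MongoDB 5.0 added reshardCollection, which changes the key online but rewrites the entire collection, needs extra free storage and can run for hours or days on a large collection. Get the shard key right before you enable sharding. Test your dominant query patterns against it on a staging cluster first.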

Targeted vs scatter-gather queries

A query that includes the shard key prefix (userId) is routed only to the shard that owns that key range: a targeted query. A query without the shard key (e.g., find events by eventType only) must be sent to every shard, with the results merged by the mongos router: a scatter-gather query. On 10 shards, a scatter-gather query does roughly 10× the total work of a targeted query, and because mongos must wait for every shard to answer, its latency is set by the slowest shard plus the merge step. Design your shard key so your dominant queries are targeted.
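A rough model of the two query shapes, assuming a hypothetical `userRanges` table and made-up per-shard timings: `targetShards` decides which shards a filter must visit, and `estimateLatencyMs` assumes shards run in parallel, so the router waits for the slowest one and then spends an assumed `mergeMs` combining results.

```javascript
// Hypothetical mapping of userId ranges to shards ([lo, hi), string order)
const userRanges = [
  { lo: "", hi: "u_5000", shard: "shard0" },
  { lo: "u_5000", hi: "u_8000", shard: "shard1" },
  { lo: "u_8000", hi: null, shard: "shard2" },
];

function targetShards(filter) {
  // Equality on the shard key prefix: only the owning shard
  if (typeof filter.userId === "string") {
    return userRanges
      .filter((r) => filter.userId >= r.lo && (r.hi === null || filter.userId < r.hi))
      .map((r) => r.shard);
  }
  // No shard key prefix: broadcast to every shard (scatter-gather)
  return userRanges.map((r) => r.shard);
}

// Parallel fan-out: wait for the slowest shard, then merge (assumed cost)
function estimateLatencyMs(shards, perShardMs, mergeMs = 0) {
  const slowest = Math.max(...shards.map((s) => perShardMs[s]));
  return shards.length > 1 ? slowest + mergeMs : slowest;
}

const perShardMs = { shard0: 12, shard1: 15, shard2: 41 }; // hypothetical timings

const targeted = targetShards({ userId: "u_4412", timestamp: { $gte: 0 } });
const scatter = targetShards({ eventType: "purchase", timestamp: { $gte: 0 } });

console.log(targeted, estimateLatencyMs(targeted, perShardMs));  // [ 'shard0' ] 12
console.log(scatter, estimateLatencyMs(scatter, perShardMs, 5)); // all three shards, 46
```

Total work grows with every shard contacted, while latency follows the slowest shard, which is why scatter-gather queries degrade as a cluster grows and individual shards get busier.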

Scaling Cassandra — Adding Nodes to a Live Cluster

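Cassandra was built for horizontal scaling from day one. Adding a new node to a live Cassandra cluster is a routine operational task. Unlike MongoDB sharding, scaling out does not require choosing or changing a shard key: the partition key you already defined in the data model is hashed to a token, and the new node takes over a share of the token ranges. Cassandra rebalances token ownership across the expanded cluster and streams the affected data automatically. The process is called bootstrapping.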
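Token ownership with virtual nodes can be sketched in a few lines. This is a simplified model under stated assumptions: the ring is normalised to [0, 1) instead of Cassandra's Murmur3 token range, tokens come from a seeded pseudo-random generator (`mulberry32`, a common public-domain PRNG) rather than Cassandra's token allocator, and replication is ignored, so each value is the fraction of the ring a node owns as primary replica. The 256 tokens per node match the Tokens column in the sample output.

```javascript
// Small seeded PRNG so runs are reproducible (values in [0, 1))
function mulberry32(seed) {
  let a = seed >>> 0;
  return function () {
    a = (a + 0x6d2b79f5) | 0;
    let t = a;
    t = Math.imul(t ^ (t >>> 15), t | 1);
    t ^= t + Math.imul(t ^ (t >>> 7), t | 61);
    return ((t ^ (t >>> 14)) >>> 0) / 4294967296;
  };
}

// Each node places numTokens random tokens on the ring (simplified vnodes)
function buildRing(nodes, numTokens, rand) {
  const ring = [];
  for (const node of nodes) {
    for (let i = 0; i < numTokens; i++) ring.push({ token: rand(), node });
  }
  return ring.sort((a, b) => a.token - b.token);
}

// A token owns the arc from the previous token (exclusive) up to itself
function ownership(ring) {
  const owns = {};
  for (let i = 0; i < ring.length; i++) {
    // The first token's arc wraps around from the last token
    const prev = i === 0 ? ring[ring.length - 1].token - 1 : ring[i - 1].token;
    owns[ring[i].node] = (owns[ring[i].node] || 0) + (ring[i].token - prev);
  }
  return owns;
}

const rand = mulberry32(42);
const original = ["10.0.1.10", "10.0.1.11", "10.0.1.12", "10.0.1.13"];
const ring4 = buildRing(original, 256, rand);

// Bootstrapping two nodes adds their tokens; existing tokens stay put
const ring6 = [...ring4, ...buildRing(["10.0.1.20", "10.0.1.21"], 256, rand)]
  .sort((a, b) => a.token - b.token);

const pct = (owns) =>
  Object.fromEntries(Object.entries(owns).map(([n, f]) => [n, (100 * f).toFixed(1) + "%"]));

console.log(pct(ownership(ring4))); // each node close to 25%
console.log(pct(ownership(ring6))); // each node close to 16.7%
```

Existing nodes keep their tokens; the new nodes' tokens subdivide existing ranges, which is why only a share of the data has to stream to them.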

The scenario: Your 4-node Cassandra cluster storing IoT sensor data is approaching capacity. Each node holds around 800GB of data and write latency is creeping up. You are adding two new nodes to expand the cluster to 6. Once added, Cassandra will stream data from existing nodes to the new ones and rebalance token ownership automatically.

bash — adding a node to a live Cassandra cluster
# cassandra.yaml on the NEW node: key settings for joining
# seeds: one or more EXISTING nodes so the new node can find the cluster
#        (do not list the new node itself; seed nodes skip bootstrapping)
# auto_bootstrap: true means the node will stream data from existing nodes

grep -E "seeds|auto_bootstrap|listen_address" /etc/cassandra/cassandra.yaml
# seeds: "10.0.1.10,10.0.1.11"
# listen_address: "10.0.1.20"   # new node's IP
# auto_bootstrap: true           # usually absent from the file; defaults to true

# Start the new node; it joins the ring and begins bootstrapping
sudo systemctl start cassandra   # package install; a tarball install runs bin/cassandra

# Monitor bootstrap progress from any existing node
watch -n 5 nodetool status

# After ALL new nodes have finished joining, run cleanup on the original nodes.
# Cleanup removes data they no longer own after rebalancing. It rewrites
# SSTables and is I/O-heavy, so run it one node at a time.
for node in 10.0.1.10 10.0.1.11 10.0.1.12 10.0.1.13; do
  ssh "$node" "nodetool cleanup iot_sensors"
done
echo "Cleanup complete: cluster rebalanced"
$ nodetool status  (during bootstrap)
Datacenter: dc1
--  Address       Load        Tokens  Owns    State
UN  10.0.1.10    801.2 GiB   256     25.1%   Normal
UN  10.0.1.11    798.4 GiB   256     24.9%   Normal
UN  10.0.1.12    803.1 GiB   256     25.2%   Normal
UN  10.0.1.13    799.7 GiB   256     24.8%   Normal
UJ  10.0.1.20    ?           256     ?       Joining  ← new node bootstrapping

$ nodetool status  (after bootstrap + cleanup)
UN  10.0.1.10    534.8 GiB   256     16.7%   Normal
UN  10.0.1.11    533.9 GiB   256     16.6%   Normal
UN  10.0.1.12    535.1 GiB   256     16.7%   Normal
UN  10.0.1.13    534.2 GiB   256     16.6%   Normal
UN  10.0.1.20    532.7 GiB   256     16.6%   Normal   ← joined  ✓
UN  10.0.1.21    533.3 GiB   256     16.7%   Normal   ← joined  ✓
Cluster rebalanced. Write load distributed across 6 nodes.
State: UJ (Up/Joining)

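During bootstrap, the new node shows UJ: it is up and reachable but still receiving data from existing nodes via streaming. The cluster continues to serve all traffic during this period. Bootstrapping into a cluster with about 800GB per node typically takes 1–4 hours, depending on network bandwidth and streaming throughput settings. Do not start a second new node until the first has fully joined: by default Cassandra's consistent range movement check refuses to bootstrap a node while another is joining, because overlapping range movements can violate consistency guarantees, and parallel bootstraps would also split the streaming bandwidth.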
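If you script node additions, a hypothetical helper like the one below can parse the node lines of `nodetool status` and report anything that is not yet `UN`. It assumes each node line starts with the two-letter status/state code (U/D for Up/Down, N/L/J/M for Normal/Leaving/Joining/Moving) followed by the address; the sample text is abridged from the bootstrap output shown earlier in this lesson.

```javascript
// Hypothetical parser for the node lines of `nodetool status` output
function parseNodetoolStatus(text) {
  const nodes = [];
  for (const line of text.split("\n")) {
    // Status (U/D) + state (N/L/J/M), whitespace, then the address
    const m = line.match(/^([UD])([NLJM])\s+(\S+)/);
    if (m) nodes.push({ status: m[1], state: m[2], address: m[3] });
  }
  return nodes;
}

// Anything that is not Up/Normal still needs attention
function pendingNodes(nodes) {
  return nodes.filter((n) => n.status !== "U" || n.state !== "N");
}

const sample = `Datacenter: dc1
--  Address       Load        Tokens  Owns
UN  10.0.1.10    801.2 GiB   256     25.1%
UN  10.0.1.11    798.4 GiB   256     24.9%
UN  10.0.1.12    803.1 GiB   256     25.2%
UN  10.0.1.13    799.7 GiB   256     24.8%
UJ  10.0.1.20    ?           256     ?`;

const pending = pendingNodes(parseNodetoolStatus(sample));
console.log(pending.map((n) => `${n.address} (${n.status}${n.state})`)); // [ '10.0.1.20 (UJ)' ]
```

A rollout script could poll this until `pending` is empty before starting the next node.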

nodetool cleanup after bootstrap

After bootstrap, the original nodes still hold copies of data that was streamed to the new nodes, even though they no longer own those token ranges. nodetool cleanup removes this orphaned data and frees the disk space. Without cleanup, the original nodes stay over-full even though the logical data distribution has rebalanced. Always run cleanup on every original node after adding capacity, once all new nodes have joined, one node at a time.
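Conceptually, cleanup is a filter over a node's local data. The sketch below is a hypothetical model, not Cassandra's implementation: rows carry a token on a ring normalised to [0, 1), owned ranges are (start, end] pairs that may wrap around the ring, and the row sizes are made up.

```javascript
// Does any owned (start, end] range contain this token? Handles wrap-around.
function ownsToken(ranges, token) {
  return ranges.some(([start, end]) =>
    start < end ? token > start && token <= end : token > start || token <= end
  );
}

// Keep rows this node still owns; report bytes freed by dropping the rest
function cleanup(rows, ownedRanges) {
  const kept = rows.filter((r) => ownsToken(ownedRanges, r.token));
  const freedBytes = rows
    .filter((r) => !ownsToken(ownedRanges, r.token))
    .reduce((sum, r) => sum + r.bytes, 0);
  return { kept, freedBytes };
}

// Before expansion this node owned (0.0, 0.5]; after bootstrap it owns (0.0, 0.3]
const rows = [
  { key: "sensor-a", token: 0.1, bytes: 400 },
  { key: "sensor-b", token: 0.25, bytes: 300 },
  { key: "sensor-c", token: 0.4, bytes: 500 }, // now owned by a new node
];

const { kept, freedBytes } = cleanup(rows, [[0.0, 0.3]]);
console.log(kept.map((r) => r.key), freedBytes); // [ 'sensor-a', 'sensor-b' ] 500
```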

Load drops from 800GB to 535GB per node

Going from 4 to 6 nodes reduces each node's data ownership from 25% to 16.7%, a 33% reduction in per-node load with no application changes, no new partitioning decisions and no downtime. This is the horizontal scaling advantage Cassandra was designed for. Each new node also adds a proportional share of CPU, RAM, and network bandwidth to the cluster.
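The arithmetic behind these numbers is simple once you assume perfectly even ownership and no data growth during the expansion. `perNodeAfterExpansion` is a hypothetical planning helper.

```javascript
// Even rebalance: total stored data is unchanged, so per-node load scales
// with 1 / nodeCount. Assumes even ownership and no growth meanwhile.
function perNodeAfterExpansion(perNodeGiB, nodesBefore, nodesAfter) {
  const totalGiB = perNodeGiB * nodesBefore;
  return {
    totalGiB,
    perNodeGiB: totalGiB / nodesAfter,
    ownershipBefore: 1 / nodesBefore,
    ownershipAfter: 1 / nodesAfter,
    reduction: 1 - nodesBefore / nodesAfter, // fractional drop in per-node load
  };
}

const plan = perNodeAfterExpansion(800, 4, 6);
console.log(plan.perNodeGiB.toFixed(1));                   // 533.3
console.log((100 * plan.ownershipAfter).toFixed(1) + "%"); // 16.7%
console.log((100 * plan.reduction).toFixed(0) + "%");      // 33%
```

Inverting the same formula gives a quick capacity plan: to bring per-node load under a target, you need at least totalGiB / targetGiB nodes.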

Scaling Comparison — MongoDB vs Cassandra vs DynamoDB

Dimension | MongoDB | Cassandra | DynamoDB
Read scaling | Add secondaries, use read preferences | Add nodes; all nodes serve reads | Auto-scales; DAX cache for hot reads
Write scaling | Sharding: requires a shard key decision | Add nodes; token ranges rebalance automatically | Auto-scales; provisioned or on-demand capacity
Operational effort | Medium: shard key planning required | Medium: node operations, compaction tuning | Minimal: fully managed by AWS
Scaling ceiling | Very high: multi-shard clusters | Extremely high: proven at petabyte scale | Virtually unlimited (AWS infrastructure)
Worst mistake | Wrong shard key: hotspot, and costly to change | Skipping cleanup after adding nodes | Hot partition key: one key value receives most of the traffic

Teacher's Note

The engineers I have seen make the most expensive scaling mistakes were not the ones who scaled too late; they were the ones who sharded too early, before they understood their dominant query patterns, and chose a shard key that looked reasonable in testing but created a hotspot under real write traffic. Resharding a 5TB MongoDB collection in production is a multi-day operation with significant risk. Do the boring work first: profile your queries, measure your actual read/write ratio, and exhaust vertical scaling and read replicas before you touch sharding. Treat sharding as a one-way door, and open it carefully.

Practice Questions — You're the Engineer

Scenario:

You are configuring MongoDB read preferences for a product catalogue API. Product pages can tolerate data being 1–2 seconds stale — the SEO team updates descriptions once a day. You want reads to go to a secondary replica to offload the primary, but if both secondaries go down during a maintenance window, you do not want the entire API to return errors. You need the read preference that uses a secondary when available but falls back to the primary automatically. Which read preference mode should you use?


Scenario:

Your team shards a MongoDB orders collection using created_at as the shard key. Three months after launch, your monitoring shows shard2 handling 94% of all writes while shard0 and shard1 sit almost idle. The write latency on shard2 is 800ms while the other shards are at 4ms. A colleague explains that because created_at is a timestamp that always increases, all new documents land on the rightmost chunk (always on shard2), while the other shards receive no new data. What is the single word used to describe this scaling failure, where one shard receives a disproportionate share of traffic?


Scenario:

You add two new nodes to a 4-node Cassandra cluster. The bootstrap completes successfully and nodetool status shows all 6 nodes as UN (Up/Normal) with evenly distributed token ownership. However, your disk usage monitoring shows the original 4 nodes still reporting 800GB each — the same as before the expansion — even though the new nodes are each showing 530GB. A senior engineer tells you there is one more command you must run on each of the original 4 nodes to remove the data they no longer own. What is that command?


Quiz — Scaling in Production

Scenario:

Your MongoDB primary node is at 82% CPU. Query profiling shows your application sends 4,800 reads per second and 200 writes per second — a 96% read workload. Your data model is still changing week-to-week and you cannot afford to lock in a shard key yet. Your VP of Engineering wants the CPU below 40% by end of week with minimal operational risk. What is the correct scaling approach?

Scenario:

Your team is debating two shard key options for a high-write MongoDB events collection: Option A is { timestamp: 1 } and Option B is { userId: "hashed" }. A senior engineer says Option A will create a write hotspot within weeks and recommends Option B. A junior engineer argues both keys have high cardinality so both should distribute evenly. Who is correct, and why?

Scenario:

Your events collection is sharded on { userId: 1, timestamp: 1 } across 8 shards. A new reporting feature runs a query that finds all events of type "purchase" across all users in a date range: db.events.find({ eventType: "purchase", timestamp: { $gte: t } }). In testing this query took 12ms. In production with 8 shards it takes 380ms and your DBA is alarmed. The shard key is not included in the query filter. What is the correct explanation for why this query is disproportionately slow on a sharded cluster?

Up Next · Lesson 38

NoSQL in AWS

DynamoDB, DocumentDB, ElastiCache, and Keyspaces — the managed NoSQL services that let you scale without operating a single node.