Mango DBLesson 34 – Sharding | Dataplexa

Sharding

Replication solves availability and read scalability — but every replica holds a full copy of every document, so a single replica set is limited by the storage and write throughput of one server. When your dataset grows beyond what one machine can handle — terabytes of data, millions of writes per second — you need sharding. Sharding distributes data across multiple replica sets called shards, where each shard owns a subset of the data. MongoDB's query router (the mongos) sits in front of all shards and makes the distribution transparent to the application — your PyMongo code connects to mongos and writes queries exactly as before, while MongoDB handles routing, splitting, and balancing data behind the scenes. This lesson covers the sharded cluster architecture, how MongoDB decides which shard owns which data, how to choose a shard key correctly, and how to query and monitor a sharded cluster.

1. Sharded Cluster Architecture

A MongoDB sharded cluster has three components working together. The shards are replica sets that store the actual data. The config servers are a replica set that stores the cluster metadata — which shard owns which chunk of data. The mongos is a lightweight routing process that receives client queries, consults the config servers, and routes each operation to the correct shard or shards.

# Sharded cluster architecture — connecting and inspecting the cluster

from pymongo import MongoClient

# Connect to mongos — not directly to shards
# Application always talks to mongos, never to individual shards
client = MongoClient("mongodb://mongos1:27017,mongos2:27017/")
db     = client["dataplexa"]

# Check cluster status
shard_status = client.admin.command("listShards")
print("Shards in cluster:")
for shard in shard_status.get("shards", []):
    print(f"  id:    {shard['_id']}")
    print(f"  hosts: {shard['host']}")
    print(f"  state: {'active' if shard.get('state', 1) == 1 else 'draining'}")
    print()

# Check which collections are sharded
print("Sharded collections in dataplexa:")
config_db = client["config"]
for coll in config_db.collections.find({"_id": {"$regex": "^dataplexa\\."}},
                                        {"_id": 1, "key": 1, "unique": 1}):
    print(f"  collection: {coll['_id']}")
    print(f"  shard key:  {coll.get('key', {})}")
    print(f"  unique:     {coll.get('unique', False)}")
    print()

# Architecture overview
print("Sharded cluster components:\n")
components = [
    ("mongos",         "Query router — receives all client connections. Stateless."),
    ("Config servers", "Replica set storing chunk metadata and shard topology."),
    ("Shard",          "Replica set storing a subset of the collection's documents."),
    ("Chunk",          "A contiguous range of shard key values — default 128 MB."),
    ("Balancer",       "Background process that moves chunks to equalise shard sizes."),
]
for comp, desc in components:
    print(f"  {comp:16}  {desc}")
Shards in cluster:
id: shard0
hosts: shard0/shard0a:27017,shard0b:27017,shard0c:27017
state: active

id: shard1
hosts: shard1/shard1a:27017,shard1b:27017,shard1c:27017
state: active

id: shard2
hosts: shard2/shard2a:27017,shard2b:27017,shard2c:27017
state: active

Sharded collections in dataplexa:
collection: dataplexa.orders
shard key: {'user_id': 1}
unique: False

Sharded cluster components:

mongos Query router — receives all client connections. Stateless.
Config servers Replica set storing chunk metadata and shard topology.
Shard Replica set storing a subset of the collection's documents.
Chunk A contiguous range of shard key values — default 128 MB.
Balancer Background process that moves chunks to equalise shard sizes.
  • Applications always connect to mongos — never directly to individual shards. mongos is the single connection point and handles all routing transparently
  • Each shard is itself a replica set — sharding provides horizontal write scalability while replication within each shard provides high availability
  • The config server replica set is critical — without it, mongos cannot route queries. Always deploy config servers as a three-member replica set, never as a standalone

2. Shard Keys — How MongoDB Distributes Data

The shard key is the field (or compound fields) MongoDB uses to decide which shard owns each document. MongoDB divides the possible range of shard key values into contiguous chunks and assigns each chunk to a shard. Choosing the right shard key is the single most important sharding decision — a poor shard key creates hotspots that defeat the purpose of sharding entirely.

# Shard keys — enabling sharding and choosing the right key

from pymongo import MongoClient

client = MongoClient("mongodb://mongos1:27017/")
admin  = client.admin
db     = client["dataplexa"]

# Step 1: enable sharding on the database
admin.command("enableSharding", "dataplexa")
print("Sharding enabled on dataplexa database")

# Step 2: create an index on the shard key field BEFORE sharding the collection
# MongoDB requires an index on the shard key to exist first
db.orders.create_index([("user_id", 1)], name="idx_orders_user_id_shard")

# Step 3: shard the orders collection on user_id
admin.command("shardCollection", "dataplexa.orders",
              key={"user_id": 1})
print("orders collection sharded on user_id\n")

# SHARD KEY QUALITY ANALYSIS
print("Shard key evaluation criteria:\n")
criteria = [
    {
        "criterion": "Cardinality",
        "good":      "High cardinality — many distinct values (user_id, order_id)",
        "bad":       "Low cardinality — few distinct values (status, membership)",
        "reason":    "Low cardinality limits chunk splitting — max chunks = distinct values"
    },
    {
        "criterion": "Write distribution",
        "good":      "Writes spread evenly across all shards",
        "bad":       "Monotonically increasing keys (timestamps, auto-increment IDs)",
        "reason":    "Monotonic keys always insert into the same 'max chunk' — one shard hot"
    },
    {
        "criterion": "Query isolation",
        "good":      "Most queries include the shard key — routed to one shard",
        "bad":       "Queries never include shard key — broadcast to all shards",
        "reason":    "Broadcast queries (scatter-gather) hit every shard — slow at scale"
    },
]
for c in criteria:
    print(f"  {c['criterion']}")
    print(f"  ✓ Good:   {c['good']}")
    print(f"  ✗ Bad:    {c['bad']}")
    print(f"  Why:      {c['reason']}")
    print()

# Shard key options for Dataplexa collections
print("Shard key recommendations for Dataplexa:\n")
recommendations = [
    ("orders",   "user_id",          "High cardinality, queries always include user_id"),
    ("orders",   "hashed _id",       "Even distribution if user_id is not well distributed"),
    ("products", "category + _id",   "Compound key: category isolates queries, _id adds cardinality"),
    ("users",    "hashed _id",       "No natural query pattern — hashed gives even distribution"),
    ("reviews",  "product_id",       "Queries always filter by product — good isolation"),
]
print(f"  {'Collection':10}  {'Shard Key':22}  Reason")
print(f"  {'─'*10}  {'─'*22}  {'─'*45}")
for coll, key, reason in recommendations:
    print(f"  {coll:10}  {key:22}  {reason}")
Sharding enabled on dataplexa database
orders collection sharded on user_id

Shard key evaluation criteria:

Cardinality
✓ Good: High cardinality — many distinct values (user_id, order_id)
✗ Bad: Low cardinality — few distinct values (status, membership)
Why: Low cardinality limits chunk splitting — max chunks = distinct values

Write distribution
✓ Good: Writes spread evenly across all shards
✗ Bad: Monotonically increasing keys (timestamps, auto-increment IDs)
Why: Monotonic keys always insert into the same max chunk — one shard hot

Query isolation
✓ Good: Most queries include the shard key — routed to one shard
✗ Bad: Queries never include shard key — broadcast to all shards
Why: Broadcast queries hit every shard — slow at scale

Shard key recommendations for Dataplexa:

Collection Shard Key Reason
────────── ────────────────────── ─────────────────────────────────────────────
orders user_id High cardinality, queries always include user_id
orders hashed _id Even distribution if user_id is not well distributed
products category + _id Compound: category isolates queries, _id adds cardinality
users hashed _id No natural query pattern — hashed gives even distribution
reviews product_id Queries always filter by product — good isolation
  • The shard key is immutable once set — you cannot change it without resharding the entire collection, which is an expensive operation. Choose carefully before sharding
  • A hashed shard key (key={"_id": "hashed"}) distributes documents by hashing the field value — this guarantees even write distribution but eliminates range query isolation
  • Never shard on a field with low cardinality like status or membership — with only a handful of distinct values you get at most that many chunks, and the balancer cannot spread them evenly

3. Ranged vs Hashed Sharding

MongoDB supports two sharding strategies. Ranged sharding assigns contiguous value ranges to each shard — great for range queries but risky with monotonically increasing keys. Hashed sharding hashes the shard key value before assigning it to a chunk — guarantees even distribution of writes but makes range queries into broadcast operations.

# Ranged vs hashed sharding — comparison and use cases

from pymongo import MongoClient

client = MongoClient("mongodb://mongos1:27017/")
admin  = client.admin
db     = client["dataplexa"]

# RANGED SHARDING — on user_id (string range)
# Chunk 1: user_id >= "u000" and < "u300"  → shard0
# Chunk 2: user_id >= "u300" and < "u700"  → shard1
# Chunk 3: user_id >= "u700" and < MaxKey  → shard2
admin.command("shardCollection", "dataplexa.orders",
              key={"user_id": 1})          # 1 = ranged sharding
print("Ranged sharding on orders.user_id")
print("  Query {user_id: 'u001'} → routed to shard0 only  ✓ targeted")
print("  Query {user_id: {$gte: 'u100', $lte: 'u500'}} → shard0 + shard1 ✓ range")
print("  Query {status: 'delivered'} → broadcast to ALL shards  ✗ scatter-gather")

# HASHED SHARDING — on _id
# MongoDB hashes each _id value — distributes writes evenly
# Chunk 1: hash(id) in range [-inf, -3074...]  → shard0
# Chunk 2: hash(id) in range [-3074..., +3074...] → shard1
# Chunk 3: hash(id) in range [+3074..., +inf]  → shard2
db.users.create_index([("_id", "hashed")], name="idx_users_id_hashed")
admin.command("shardCollection", "dataplexa.users",
              key={"_id": "hashed"})       # "hashed" = hashed sharding
print("\nHashed sharding on users._id")
print("  Insert any user → evenly spread across all shards  ✓ even writes")
print("  Query {_id: 'u001'} → hash('u001') → routed to one shard ✓ targeted")
print("  Query {country: 'UK'} → broadcast to ALL shards  ✗ scatter-gather")
print("  Query {_id: {$gte: 'u001', $lte: 'u005'}} → broadcast  ✗ no range support")

# Decision guide
print("\nRanged vs Hashed — when to use each:\n")
comparison = [
    ("Write pattern",    "Sequential/monotonic keys",    "Random or high-cardinality keys"),
    ("Read pattern",     "Range queries on shard key",   "Point lookups only"),
    ("Distribution",     "Uneven if key is monotonic",   "Always even"),
    ("Hotspot risk",     "High with timestamps/auto-inc", "None — hash randomises"),
    ("Range query",      "Targeted to subset of shards", "Broadcast — hits all shards"),
    ("Use case",         "user_id ranges, product ranges","ObjectIds, UUIDs, random IDs"),
]
print(f"  {'Aspect':20}  {'Ranged':35}  {'Hashed'}")
print(f"  {'─'*20}  {'─'*35}  {'─'*30}")
for aspect, ranged, hashed in comparison:
    print(f"  {aspect:20}  {ranged:35}  {hashed}")
Ranged sharding on orders.user_id
Query {user_id: 'u001'} → routed to shard0 only ✓ targeted
Query {user_id: {$gte: 'u100', $lte: 'u500'}} → shard0 + shard1 ✓ range
Query {status: 'delivered'} → broadcast to ALL shards ✗ scatter-gather

Hashed sharding on users._id
Insert any user → evenly spread across all shards ✓ even writes
Query {_id: 'u001'} → hash('u001') → routed to one shard ✓ targeted
Query {country: 'UK'} → broadcast to ALL shards ✗ scatter-gather
Query {_id: {$gte: 'u001', $lte: 'u005'}} → broadcast ✗ no range support

Ranged vs Hashed — when to use each:

Aspect Ranged Hashed
──────────────────── ─────────────────────────────────── ──────────────────────────────
Write pattern Sequential/monotonic keys Random or high-cardinality keys
Read pattern Range queries on shard key Point lookups only
Distribution Uneven if key is monotonic Always even
Hotspot risk High with timestamps/auto-inc None — hash randomises
Range query Targeted to subset of shards Broadcast — hits all shards
Use case user_id ranges, product ranges ObjectIds, UUIDs, random IDs
  • A hotspot occurs when all writes land on the same shard — typically because the shard key is monotonically increasing (timestamps, sequential IDs) and all new documents get the highest key value, which always maps to the same chunk
  • Hashed sharding eliminates hotspots completely but sacrifices range query efficiency — a range query on a hashed field must broadcast to all shards because adjacent hash values land on different shards
  • A compound shard key like {category: 1, _id: 1} can give you both query isolation (category routes most queries to a subset of shards) and sufficient cardinality (_id ensures enough unique values for chunk splitting)

4. Chunks, Splitting, and Balancing

MongoDB divides each sharded collection into chunks — contiguous ranges of shard key values, each stored entirely on one shard. As data grows, chunks that exceed the 128 MB threshold are automatically split into two smaller chunks. The balancer runs in the background and migrates chunks between shards to keep the distribution even. Understanding chunks helps you interpret sharding health and diagnose distribution problems.

# Chunks, splitting, and balancing — monitoring distribution health

from pymongo import MongoClient

client = MongoClient("mongodb://mongos1:27017/")
config = client["config"]

# List chunks for the orders collection
print("Chunk distribution for dataplexa.orders:\n")
chunks = list(config.chunks.find(
    {"ns": "dataplexa.orders"},
    {"min": 1, "max": 1, "shard": 1, "jumbo": 1}
).sort("min", 1))

shard_counts = {}
for chunk in chunks:
    shard = chunk["shard"]
    shard_counts[shard] = shard_counts.get(shard, 0) + 1
    jumbo = " [JUMBO]" if chunk.get("jumbo") else ""
    print(f"  shard: {shard:8}  "
          f"min: {str(chunk['min']):20}  "
          f"max: {str(chunk['max']):20}{jumbo}")

print(f"\nChunk count per shard:")
for shard, count in sorted(shard_counts.items()):
    bar = "█" * count
    print(f"  {shard:10}  {bar:30}  {count} chunks")

# Check balancer status
balancer_status = client.admin.command("balancerStatus")
print(f"\nBalancer status:")
print(f"  mode:    {balancer_status.get('mode', 'unknown')}")
print(f"  running: {balancer_status.get('balancerCompliant', False)}")

# Jumbo chunks — chunks too large to split (bad)
jumbo_chunks = list(config.chunks.find(
    {"ns": "dataplexa.orders", "jumbo": True}
))
if jumbo_chunks:
    print(f"\n⚠ Jumbo chunks detected: {len(jumbo_chunks)}")
    print("  Jumbo chunks cannot be migrated — shard imbalance will persist")
    print("  Fix: clearJumboFlag or improve shard key cardinality")
else:
    print("\n✓ No jumbo chunks — distribution is healthy")

# Collection sharding stats
stats = client["dataplexa"].command("collStats", "orders")
print(f"\nOrders collection sharding stats:")
print(f"  sharded:    {stats.get('sharded', False)}")
print(f"  nchunks:    {stats.get('nchunks', 'N/A')}")
Chunk distribution for dataplexa.orders:

shard: shard0 min: {'user_id': MinKey} max: {'user_id': 'u200'}
shard: shard0 min: {'user_id': 'u200'} max: {'user_id': 'u500'}
shard: shard1 min: {'user_id': 'u500'} max: {'user_id': 'u750'}
shard: shard1 min: {'user_id': 'u750'} max: {'user_id': 'u900'}
shard: shard2 min: {'user_id': 'u900'} max: {'user_id': MaxKey}

Chunk count per shard:
shard0 ██████████████████████████████ 2 chunks
shard1 ██████████████████████████████ 2 chunks
shard2 ███████████████ 1 chunks

Balancer status:
mode: full
running: True

✓ No jumbo chunks — distribution is healthy

Orders collection sharding stats:
sharded: True
nchunks: 5
  • A jumbo chunk is a chunk that exceeds the maximum size but cannot be split because all documents within it share the same shard key value — the definitive sign of a low-cardinality shard key problem
  • The balancer runs during a configurable maintenance window by default — if you need to prevent migrations during peak hours, set a balancer window in the config server
  • Uneven chunk counts across shards are normal during rapid growth — the balancer continuously migrates chunks in the background to restore balance, but this takes time

5. Querying a Sharded Cluster — Targeted vs Broadcast

When mongos receives a query, it inspects the filter to determine if it includes the shard key. If yes, it routes the query to only the relevant shard — a targeted query. If no, it broadcasts the query to every shard and merges the results — a scatter-gather query. Targeted queries are fast and scale linearly with more shards; scatter-gather queries get slower as you add shards.

# Targeted vs scatter-gather queries — explain on a sharded cluster

from pymongo import MongoClient

client = MongoClient("mongodb://mongos1:27017/")
db     = client["dataplexa"]

def explain_sharded(cursor, label: str):
    """Show shard routing for a query on a sharded collection."""
    plan  = cursor.explain("executionStats")
    shards_used = plan.get("queryPlanner", {}).get("winningPlan", {})

    # In sharded explain, shards list shows which shards were contacted
    shards_section = plan.get("queryPlanner", {}).get(
        "winningPlan", {}
    ).get("shards", [])

    shard_names = [s.get("shardName", "?") for s in shards_section]
    stats       = plan.get("executionStats", {})

    print(f"\n  {label}")
    print(f"  Shards contacted: {shard_names if shard_names else ['all — scatter-gather']}")
    print(f"  Docs examined:    {stats.get('totalDocsExamined', '?')}")
    print(f"  Docs returned:    {stats.get('nReturned', '?')}")
    query_type = "✓ TARGETED" if len(shard_names) == 1 else "✗ SCATTER-GATHER"
    print(f"  Query type:       {query_type}")

print("Sharded query routing analysis:")

# TARGETED — includes shard key (user_id)
explain_sharded(
    db.orders.find({"user_id": "u001"}),
    "Orders for user u001 (shard key in filter)"
)

# TARGETED — shard key range
explain_sharded(
    db.orders.find({"user_id": {"$in": ["u001", "u002", "u003"]}}),
    "Orders for u001/u002/u003 ($in on shard key)"
)

# SCATTER-GATHER — no shard key
explain_sharded(
    db.orders.find({"status": "delivered"}),
    "Delivered orders (no shard key — status only)"
)

# SCATTER-GATHER — aggregation without shard key in $match
explain_sharded(
    db.orders.aggregate([
        {"$match":  {"status": "delivered"}},
        {"$group":  {"_id": "$user_id", "total": {"$sum": "$total"}}}
    ]),
    "Revenue by user — $match on status (no shard key)"
)

# TARGETED aggregation — include shard key in $match
explain_sharded(
    db.orders.aggregate([
        {"$match":  {"user_id": "u001", "status": "delivered"}},
        {"$group":  {"_id": "$status", "total": {"$sum": "$total"}}}
    ]),
    "u001 delivered revenue — $match includes shard key"
)
Sharded query routing analysis:

Orders for user u001 (shard key in filter)
Shards contacted: ['shard0']
Docs examined: 2
Docs returned: 2
Query type: ✓ TARGETED

Orders for u001/u002/u003 ($in on shard key)
Shards contacted: ['shard0']
Docs examined: 4
Docs returned: 4
Query type: ✓ TARGETED

Delivered orders (no shard key — status only)
Shards contacted: ['all — scatter-gather']
Docs examined: 7
Docs returned: 4
Query type: ✗ SCATTER-GATHER

Revenue by user — $match on status (no shard key)
Shards contacted: ['all — scatter-gather']
Docs examined: 7
Docs returned: 5
Query type: ✗ SCATTER-GATHER

u001 delivered revenue — $match includes shard key
Shards contacted: ['shard0']
Docs examined: 2
Docs returned: 1
Query type: ✓ TARGETED
  • Always include the shard key in the $match stage of aggregation pipelines on sharded collections — without it, mongos broadcasts the entire pipeline to every shard
  • $in on the shard key is a targeted query — mongos hashes or ranges each value and routes to only the relevant shards, even if multiple shards are involved
  • Scatter-gather queries are not always avoidable — administrative queries, cross-shard reports, and analytics often require all shards. They become a problem only when they are the majority of your traffic

Summary Table

Concept What It Does Key Rule
mongos Query router — single client connection point Always connect to mongos, never directly to shards
Shard key Field used to assign documents to shards Immutable — choose carefully before sharding
Ranged sharding Assigns value ranges to shards Good for range queries — avoid monotonic keys
Hashed sharding Hashes key value for even distribution Eliminates hotspots — range queries become broadcast
Chunk 128 MB range of shard key values on one shard Jumbo chunks signal low shard key cardinality
Balancer Migrates chunks to equalise shard sizes Set a maintenance window to avoid peak-hour migrations
Targeted query Routed to one shard — shard key in filter Always include shard key in high-frequency queries
Scatter-gather Broadcast to all shards — no shard key in filter Gets slower as shards are added — minimise for hot paths

Practice Questions

Practice 1. Why is choosing a monotonically increasing field like a timestamp a poor shard key choice?



Practice 2. What is a jumbo chunk and why is it a problem?



Practice 3. What is the difference between a targeted query and a scatter-gather query in a sharded cluster?



Practice 4. What must you do before running the shardCollection command on a collection?



Practice 5. When would you choose hashed sharding over ranged sharding for the orders collection?



Quiz

Quiz 1. Which component of a sharded cluster do application clients always connect to?






Quiz 2. What happens to a shard key after a collection has been sharded?






Quiz 3. Which sharding strategy guarantees even write distribution but makes range queries broadcast operations?






Quiz 4. What is the default chunk size in MongoDB sharding?






Quiz 5. Why does sharding status (sharded: false) on a collection not prevent it from being in a sharded cluster?






Next up — Backup & Restore: Protecting your data with mongodump, mongorestore, Atlas snapshots, and point-in-time recovery strategies.