MongoDB
Aggregation Pipelines
Lesson 27 introduced the four core stages — $match, $group, $project, and $sort. This lesson goes deeper into the full power of the aggregation framework. Real reporting workflows require joining collections with $lookup, flattening arrays with $unwind, adding computed fields mid-pipeline with $addFields, running multiple parallel aggregations with $facet, and paginating results with $skip and $limit. By the end of this lesson you will be able to write production-grade pipelines that replace entire layers of application-side data processing with a single server-side query.
$lookup — Joining Collections
$lookup performs a left outer join between the current collection and a foreign collection. For every document in the pipeline, it finds all matching documents in the foreign collection and attaches them as an array field. It is the aggregation equivalent of a SQL LEFT JOIN and the correct way to combine referenced data in one server round trip.
# $lookup — joining orders with their user details
from pymongo import MongoClient
client = MongoClient("mongodb://localhost:27017/")
db = client["dataplexa"]
# Join orders → users to get the user's name alongside each order
pipeline = [
# Stage 1: only look at delivered orders
{"$match": {"status": "delivered"}},
# Stage 2: join each order with its user document
{"$lookup": {
"from": "users", # foreign collection name
"localField": "user_id", # field in orders
"foreignField": "_id", # field in users
"as": "user_info" # name for the joined array
}},
# Stage 3: flatten the single-element user_info array into a sub-document
{"$unwind": "$user_info"},
# Stage 4: project only the fields we need in the final output
{"$project": {
"order_id": "$_id",
"customer": "$user_info.name",
"membership": "$user_info.membership",
"total": 1,
"date": 1,
"_id": 0
}},
# Stage 5: sort by total descending
{"$sort": {"total": -1}}
]
results = list(db.orders.aggregate(pipeline))
print("Delivered orders with customer name ($lookup):\n")
print(f" {'Order':6} {'Customer':15} {'Tier':8} {'Total':>8} {'Date'}")
print(f" {'─'*6} {'─'*15} {'─'*8} {'─'*8} {'─'*10}")
for r in results:
print(f" {r['order_id']:6} {r['customer']:15} "
f"{r['membership']:8} ${r['total']:>7.2f} {r['date']}")
Order Customer Tier Total Date
────── ─────────────── ──────── ──────── ──────────
o005 Eva Müller premium $ 329.98 2024-03-10
o003 Alice Johnson premium $ 99.98 2024-02-20
o001 Alice Johnson premium $ 44.96 2024-01-10
o007 Bob Smith basic $ 11.97 2024-04-01
- $lookup always produces an array — even when there is exactly one match. Use $unwind immediately after to flatten it into a sub-document
- Always index the foreign collection's join field — without an index on users._id, each lookup performs a full collection scan of users
- $lookup is a left outer join — if no matching document exists in the foreign collection, the output array is empty, and $unwind (by default) drops that document from the pipeline
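The left-outer-join behaviour described above can be sketched in plain Python. This is an illustration of the semantics only, using hypothetical sample documents; it is not how the server implements the join:

```python
# Plain-Python sketch of $lookup's left outer join semantics.
# The sample documents here are hypothetical, not the lesson's dataset.
orders = [
    {"_id": "o001", "user_id": "u1", "total": 44.96},
    {"_id": "o099", "user_id": "u9", "total": 10.00},  # no matching user
]
users = [{"_id": "u1", "name": "Alice Johnson"}]

def lookup(docs, foreign, local_field, foreign_field, as_field):
    """Attach all matching foreign docs as an array field (left outer join)."""
    out = []
    for doc in docs:
        matches = [f for f in foreign if f[foreign_field] == doc[local_field]]
        out.append({**doc, as_field: matches})  # empty list when nothing matches
    return out

joined = lookup(orders, users, "user_id", "_id", "user_info")
print(joined[0]["user_info"])  # [{'_id': 'u1', 'name': 'Alice Johnson'}]
print(joined[1]["user_info"])  # [] (the order survives; its array is empty)
```

The second order shows why the empty-array case matters: the document is kept, and it is a following $unwind, not the join itself, that decides whether it stays in the pipeline.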
$unwind — Flattening Arrays
$unwind deconstructs an array field, outputting one document per array element. The rest of the document is duplicated for each element. It is essential after $lookup (to flatten the joined array) and also for analysing embedded array fields like items in the orders collection.
# $unwind — flattening embedded arrays for per-element analysis
from pymongo import MongoClient
client = MongoClient("mongodb://localhost:27017/")
db = client["dataplexa"]
# Unwind the items array inside orders — one document per line item
pipeline = [
{"$match": {"_id": {"$in": ["o001", "o003"]}}}, # two orders
# Before $unwind: one document per order with an items array
# After $unwind: one document per item within each order
{"$unwind": "$items"},
{"$project": {
"order_id": "$_id",
"product": "$items.product_id",
"qty": "$items.qty",
"line_price": {"$round": [
{"$multiply": ["$items.qty", "$items.price"]}, 2
]},
"_id": 0
}},
{"$sort": {"order_id": 1, "product": 1}}
]
results = list(db.orders.aggregate(pipeline))
print("Order line items after $unwind:\n")
print(f" {'Order':6} {'Product':8} {'Qty':4} {'Line Total'}")
print(f" {'─'*6} {'─'*8} {'─'*4} {'─'*10}")
for r in results:
print(f" {r['order_id']:6} {r['product']:8} {r['qty']:4} ${r['line_price']:.2f}")
# $unwind with preserveNullAndEmptyArrays — keep documents with empty arrays
print("\n$unwind options:")
print(" preserveNullAndEmptyArrays: True → keeps docs with empty/null arrays")
print(" includeArrayIndex: 'itemIndex' → adds the array position as a field")
# Demonstrate includeArrayIndex
pipeline2 = [
{"$match": {"_id": "o005"}},
{"$unwind": {"path": "$items", "includeArrayIndex": "itemPos"}},
{"$project": {"product": "$items.product_id", "itemPos": 1, "_id": 0}}
]
for r in db.orders.aggregate(pipeline2):
print(f" position {r['itemPos']}: {r['product']}")
Order Product Qty Line Total
────── ──────── ──── ──────────
o001 p001 1 $29.99
o001 p003 3 $14.97
o003 p001 2 $59.98
o003 p005 1 $49.99
$unwind options:
preserveNullAndEmptyArrays: True → keeps docs with empty/null arrays
includeArrayIndex: 'itemIndex' → adds the array position as a field
position 0: p007
position 1: p001
- After $unwind, the unwound field holds a single element instead of an array — the document is effectively "exploded" into N copies, one per element
- By default, $unwind silently drops documents with missing, null, or empty array fields — use preserveNullAndEmptyArrays: True to retain them
- Combining $unwind with $group is the standard pattern for aggregating across embedded array elements — unwind first, then group and accumulate
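Those drop-versus-preserve rules can be mirrored in a small plain-Python sketch. The documents are hypothetical and the helper captures only the semantics discussed above:

```python
# Plain-Python sketch of $unwind semantics (illustrative, hypothetical docs).
def unwind(docs, path, preserve_null_and_empty=False, include_array_index=None):
    """One output document per array element; the rest of the doc is duplicated."""
    out = []
    for doc in docs:
        values = doc.get(path)
        if not values:  # missing, None, or empty array
            if preserve_null_and_empty:
                out.append({**doc, path: None})
            continue  # default behaviour: drop the document
        for i, value in enumerate(values):
            new_doc = {**doc, path: value}
            if include_array_index is not None:
                new_doc[include_array_index] = i  # array position as a field
            out.append(new_doc)
    return out

docs = [{"_id": "a", "items": ["x", "y"]},
        {"_id": "b", "items": []}]
print(unwind(docs, "items"))                                # doc "b" is dropped
print(unwind(docs, "items", preserve_null_and_empty=True))  # doc "b" kept, items = None
```

Running the same input with and without the preserve flag makes the default drop behaviour visible, which is exactly the trap the bullet above warns about.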
$addFields — Adding Computed Fields Without Reshaping
$addFields adds new computed fields to documents while keeping all existing fields intact. It is the right choice when you want to enrich a document mid-pipeline without writing a full $project that explicitly lists every field you want to retain. Think of it as "add these fields, touch nothing else".
# $addFields — adding computed fields while keeping everything else
from pymongo import MongoClient
client = MongoClient("mongodb://localhost:27017/")
db = client["dataplexa"]
# Add a discounted_price and a stock_status field to products
pipeline = [
{"$addFields": {
# 15% discount, rounded to 2 decimal places
"discounted_price": {
"$round": [{"$multiply": ["$price", 0.85]}, 2]
},
# Categorical stock status derived from the stock field
"stock_status": {
"$switch": {
"branches": [
{"case": {"$lte": ["$stock", 5]}, "then": "Low"},
{"case": {"$lte": ["$stock", 20]}, "then": "Medium"},
],
"default": "High"
}
},
# savings amount
"saving": {"$round": [{"$multiply": ["$price", 0.15]}, 2]}
}},
{"$project": {
"name": 1, "price": 1, "discounted_price": 1,
"saving": 1, "stock": 1, "stock_status": 1, "_id": 0
}},
{"$sort": {"price": -1}}
]
results = list(db.products.aggregate(pipeline))
print("Products with computed discount and stock status ($addFields):\n")
print(f" {'Name':25} {'Price':>7} {'Sale':>7} {'Save':>6} {'Stock':5} Status")
print(f" {'─'*25} {'─'*7} {'─'*7} {'─'*6} {'─'*5} {'─'*6}")
for p in results:
print(f" {p['name']:25} ${p['price']:>6.2f} "
f"${p['discounted_price']:>6.2f} "
f"${p['saving']:>5.2f} "
f"{p['stock']:5} {p['stock_status']}")
Name Price Sale Save Stock Status
───────────────────────── ─────── ─────── ────── ───── ──────
Standing Desk $349.99 $297.49 $52.50 12 Medium
Monitor 27-inch $299.99 $254.99 $45.00 8 Medium
Mechanical Keyboard $ 89.99 $ 76.49 $13.50 35 High
USB-C Hub $ 49.99 $ 42.49 $ 7.50 50 High
Wireless Mouse $ 29.99 $ 25.49 $ 4.50 42 High
Notebook A5 $ 4.99 $ 4.24 $ 0.75 5 Low
Ballpoint Pens 10-pack $ 3.49 $ 2.97 $ 0.52 120 High
- $addFields is equivalent to a $project that includes all existing fields plus the new ones — use it when you want enrichment without listing every retained field
- $switch is the aggregation equivalent of a CASE expression — it evaluates branches in order and returns the first matching value, or the default if none match
- Computed fields added by $addFields can be referenced by name in all subsequent pipeline stages
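The first-match-wins ordering of $switch is the same as a chain of if/elif checks. A plain-Python analogue of the stock_status expression used above (a hypothetical helper, for illustration only):

```python
# Plain-Python analogue of the $switch expression above: branches are
# evaluated in order and the first matching case wins, like a SQL CASE.
def stock_status(stock: int) -> str:
    branches = [
        (lambda s: s <= 5, "Low"),      # checked first
        (lambda s: s <= 20, "Medium"),  # only reached when stock > 5
    ]
    for case, then in branches:
        if case(stock):
            return then
    return "High"  # the "default" branch

for s in (3, 12, 50):
    print(s, "->", stock_status(s))  # 3 -> Low, 12 -> Medium, 50 -> High
```

Note that a stock of 3 also satisfies the second branch; it returns "Low" only because the branches are ordered, which is why branch order matters inside $switch too.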
$skip and $limit — Pagination in a Pipeline
$skip discards the first N documents in the pipeline and $limit caps the output to N documents. Together they implement offset-based pagination inside an aggregation pipeline — the same pattern as cursor skip() and limit() but composable within a larger multi-stage pipeline.
# $skip and $limit — paginating aggregation results
from pymongo import MongoClient
client = MongoClient("mongodb://localhost:27017/")
db = client["dataplexa"]
PAGE_SIZE = 3
def get_products_page(page: int):
"""Fetch one page of products sorted by price ascending."""
pipeline = [
{"$sort": {"price": 1}},
{"$skip": (page - 1) * PAGE_SIZE}, # skip pages before this one
{"$limit": PAGE_SIZE}, # return only this page
{"$project": {"name": 1, "price": 1, "category": 1, "_id": 0}}
]
return list(db.products.aggregate(pipeline))
# Page 1
print("Page 1 (products 1–3 by price):")
for p in get_products_page(1):
print(f" ${p['price']:>7.2f} {p['name']} ({p['category']})")
# Page 2
print("\nPage 2 (products 4–6 by price):")
for p in get_products_page(2):
print(f" ${p['price']:>7.2f} {p['name']} ({p['category']})")
# Page 3
print("\nPage 3 (product 7 by price):")
for p in get_products_page(3):
print(f" ${p['price']:>7.2f} {p['name']} ({p['category']})")
# Total count alongside paginated results — common API pattern
count_pipeline = [{"$count": "total"}]
total = list(db.products.aggregate(count_pipeline))[0]["total"]
import math
print(f"\nTotal products: {total} | Page size: {PAGE_SIZE} "
f"| Total pages: {math.ceil(total / PAGE_SIZE)}")
Page 1 (products 1–3 by price):
$ 3.49 Ballpoint Pens 10-pack (Stationery)
$ 4.99 Notebook A5 (Stationery)
$ 29.99 Wireless Mouse (Electronics)
Page 2 (products 4–6 by price):
$ 49.99 USB-C Hub (Electronics)
$ 89.99 Mechanical Keyboard (Electronics)
$299.99 Monitor 27-inch (Electronics)
Page 3 (product 7 by price):
$349.99 Standing Desk (Furniture)
Total products: 7 | Page size: 3 | Total pages: 3
- The $sort → $skip → $limit pattern must always sort first — without a stable sort, page boundaries are non-deterministic and results overlap between pages
- $count is a single-stage pipeline that returns one document with the total number of documents reaching that stage — use it in a separate pipeline call for the total page count
- The same deep-page performance problem that affects cursor skip() applies here — for large offsets, prefer keyset pagination using a range filter on the sort field
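As a sketch of that keyset alternative, the next page is selected by filtering past the last sort-field value returned on the previous page instead of skipping documents. This assumes price values are unique; real code would break ties with a compound (price, _id) boundary:

```python
# Keyset (range-based) pagination sketch: instead of $skip, filter past the
# last sort-field value seen. Assumes unique `price` values; production code
# would use a compound (price, _id) boundary to handle ties.
def keyset_page_pipeline(last_price=None, page_size=3):
    """Build the aggregation stages for one keyset-paginated page."""
    stages = []
    if last_price is not None:
        # The range filter replaces $skip, so the server can seek straight
        # to the boundary via an index on price instead of scanning N docs.
        stages.append({"$match": {"price": {"$gt": last_price}}})
    stages += [
        {"$sort": {"price": 1}},
        {"$limit": page_size},
        {"$project": {"name": 1, "price": 1, "_id": 0}},
    ]
    return stages

first_page = keyset_page_pipeline()                 # no boundary yet
next_page = keyset_page_pipeline(last_price=29.99)  # continue after $29.99
print(next_page[0])  # {'$match': {'price': {'$gt': 29.99}}}
```

The pipeline is built as plain data here so the shape is easy to inspect; each page's cost stays constant no matter how deep into the result set you are.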
$facet — Multiple Aggregations in One Pass
$facet runs multiple independent sub-pipelines on the same input documents simultaneously, returning all results in a single document. It is the aggregation framework's answer to the common need for "give me the filtered results AND the counts AND the price ranges in one query" — the kind of data that powers search results pages with sidebar filters.
# $facet — multiple parallel sub-pipelines in one query
from pymongo import MongoClient
client = MongoClient("mongodb://localhost:27017/")
db = client["dataplexa"]
# A product search page needs simultaneously:
# 1. The paginated product list
# 2. A count breakdown by category (for sidebar filters)
# 3. Price range stats (for the price slider)
pipeline = [
# Filter to Electronics first (simulating a search/filter)
{"$match": {"category": "Electronics"}},
# Run three independent sub-pipelines on the matched documents
{"$facet": {
# Sub-pipeline 1: paginated product list (page 1, 3 per page)
"products": [
{"$sort": {"rating": -1}},
{"$limit": 3},
{"$project": {"name": 1, "price": 1, "rating": 1, "_id": 0}}
],
# Sub-pipeline 2: total count of matched products
"total_count": [
{"$count": "count"}
],
# Sub-pipeline 3: price distribution stats
"price_stats": [
{"$group": {
"_id": None,
"min_price": {"$min": "$price"},
"max_price": {"$max": "$price"},
"avg_price": {"$avg": "$price"},
}},
{"$project": {"_id": 0,
"min_price": 1,
"max_price": 1,
"avg_price": {"$round": ["$avg_price", 2]}}}
],
}}
]
result = list(db.products.aggregate(pipeline))[0]
print("$facet result — one query, three outputs:\n")
print("Products (page 1, sorted by rating):")
for p in result["products"]:
print(f" {p['name']:25} rating: {p['rating']} ${p['price']:.2f}")
total = result["total_count"][0]["count"] if result["total_count"] else 0
print(f"\nTotal Electronics products: {total}")
stats = result["price_stats"][0] if result["price_stats"] else {}
print(f"\nPrice stats:")
print(f" min: ${stats.get('min_price', 0):.2f}")
print(f" max: ${stats.get('max_price', 0):.2f}")
print(f" avg: ${stats.get('avg_price', 0):.2f}")
$facet result — one query, three outputs:

Products (page 1, sorted by rating):
Mechanical Keyboard rating: 4.7 $89.99
Monitor 27-inch rating: 4.6 $299.99
Wireless Mouse rating: 4.5 $29.99
Total Electronics products: 4
Price stats:
min: $29.99
max: $299.99
avg: $117.49
- $facet always returns exactly one document containing the results of each sub-pipeline as a named array — even if a sub-pipeline returns no documents, its array is empty rather than absent
- Sub-pipelines inside $facet cannot include $facet, $out, $merge, or $geoNear — most other stages are permitted
- The input documents are shared across all sub-pipelines — $facet does not re-query the collection, it fans out a single input set
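One common application is combining the pagination stages from the previous section with a $count inside a single $facet, returning a page of data and the total in one round trip. The pipeline below is built as plain data for illustration; the facet names data and metadata are arbitrary:

```python
# Sketch of the "data plus metadata" $facet pattern: one round trip returns
# both a page of results and the total count. Facet names are arbitrary.
PAGE_SIZE = 3

def paginated_with_total(page: int):
    return [
        {"$sort": {"price": 1}},            # sort once, shared by both facets
        {"$facet": {
            "data": [
                {"$skip": (page - 1) * PAGE_SIZE},
                {"$limit": PAGE_SIZE},
            ],
            "metadata": [
                {"$count": "total"},        # total docs that reached the $facet
            ],
        }},
    ]

facet_pipeline = paginated_with_total(2)
print(facet_pipeline[1]["$facet"]["data"][0])  # {'$skip': 3}
```

Because the sorted input fans out to both sub-pipelines, the count reflects exactly the document set being paginated, with no second query needed.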
A Full Reporting Pipeline
Combining every stage learned so far, this pipeline produces a complete order summary report — joining collections, unwinding arrays, grouping, adding computed fields, and sorting — in one server-side query.
# Full reporting pipeline — product revenue report across all orders
from pymongo import MongoClient
client = MongoClient("mongodb://localhost:27017/")
db = client["dataplexa"]
pipeline = [
# Stage 1: only delivered orders
{"$match": {"status": "delivered"}},
# Stage 2: explode the items array — one document per line item
{"$unwind": "$items"},
# Stage 3: join each line item's product_id to the products collection
{"$lookup": {
"from": "products",
"localField": "items.product_id",
"foreignField": "_id",
"as": "product_info"
}},
{"$unwind": "$product_info"},
# Stage 4: group by product to compute sales metrics
{"$group": {
"_id": "$items.product_id",
"product_name": {"$first": "$product_info.name"},
"category": {"$first": "$product_info.category"},
"units_sold": {"$sum": "$items.qty"},
"total_revenue": {"$sum": {
"$multiply": ["$items.qty", "$items.price"]
}},
"order_count": {"$sum": 1},
}},
# Stage 5: add average revenue per order
{"$addFields": {
"avg_per_order": {
"$round": [{"$divide": ["$total_revenue", "$order_count"]}, 2]
},
"total_revenue": {"$round": ["$total_revenue", 2]}
}},
# Stage 6: sort by revenue descending
{"$sort": {"total_revenue": -1}},
# Stage 7: shape the final output
{"$project": {
"_id": 0,
"product": "$product_name",
"category": 1,
"units_sold": 1,
"total_revenue": 1,
"avg_per_order": 1,
}}
]
results = list(db.orders.aggregate(pipeline))
print("Product revenue report — delivered orders:\n")
print(f" {'Product':25} {'Cat':12} {'Units':5} {'Revenue':>10} {'Avg/Order':>9}")
print(f" {'─'*25} {'─'*12} {'─'*5} {'─'*10} {'─'*9}")
for r in results:
print(f" {r['product']:25} {r['category']:12} "
f"{r['units_sold']:5} ${r['total_revenue']:>9.2f} "
f"${r['avg_per_order']:>8.2f}")
Product Cat Units Revenue Avg/Order
───────────────────────── ──────────── ───── ────────── ─────────
Monitor 27-inch Electronics 1 $ 269.99 $ 269.99
Standing Desk Furniture 1 $ 249.99 $ 249.99
Wireless Mouse Electronics 3 $ 89.97 $ 44.99
Mechanical Keyboard Electronics 1 $ 80.99 $ 80.99
USB-C Hub Electronics 1 $ 49.99 $ 49.99
Notebook A5 Stationery 5 $ 24.95 $ 4.99
Ballpoint Pens 10-pack Stationery 5 $ 17.45 $ 3.49
- This eight-stage pipeline replaces what would otherwise be multiple queries, multiple Python loops, and manual joins — all running on the server with no intermediate data transferred
- The $unwind + $lookup + $unwind pattern (unwind items, join on product_id, then flatten the joined array) is the standard approach for aggregating across embedded arrays that reference another collection
- Always test complex pipelines one stage at a time — add stages incrementally and inspect intermediate output to catch shape or field-name issues early
Summary Table
| Stage | What It Does | Key Option | SQL Equivalent |
|---|---|---|---|
| $lookup | Joins a foreign collection | from, localField, foreignField, as | LEFT JOIN |
| $unwind | Explodes array into one doc per element | preserveNullAndEmptyArrays, includeArrayIndex | CROSS APPLY / UNNEST |
| $addFields | Adds computed fields, keeps all existing | $switch, $multiply, $round | SELECT *, expr AS newcol |
| $skip | Discards first N documents | Integer N | OFFSET |
| $limit | Caps output to N documents | Integer N | FETCH NEXT N ROWS |
| $facet | Runs multiple sub-pipelines in parallel | Named sub-pipeline arrays | Multiple CTEs in one query |
| $count | Returns total document count at that stage | Output field name string | COUNT(*) |
Practice Questions
Practice 1. Why must $unwind almost always follow immediately after $lookup?
Practice 2. What is the difference between $addFields and $project when adding a computed field?
Practice 3. Write the $skip and $limit stages to return page 3 with a page size of 10.
Practice 4. What does $facet return and how many documents does it produce regardless of the number of sub-pipelines?
Practice 5. In the full reporting pipeline, why is $unwind applied before $lookup on items.product_id?
Quiz
Quiz 1. What type of join does $lookup perform by default?
Quiz 2. What happens to documents with a null or empty array field when $unwind is applied without any options?
Quiz 3. Which aggregation stage is best suited for powering a search results page that needs product listings, category counts, and price range stats all in one query?
Quiz 4. Which stages are NOT permitted inside a $facet sub-pipeline?
Quiz 5. In a $sort → $skip → $limit pagination pipeline, why must $sort always come first?
Next up — Group, Match & Project: A focused deep-dive into the three most-used aggregation stages with advanced expressions, conditional logic, and real reporting patterns.