MongoDB Lesson 28 – Aggregation Pipelines | Dataplexa

Aggregation Pipelines

Lesson 27 introduced the four core stages — $match, $group, $project, and $sort. This lesson goes deeper into the full power of the aggregation framework. Real reporting workflows require joining collections with $lookup, flattening arrays with $unwind, adding computed fields mid-pipeline with $addFields, running multiple parallel aggregations with $facet, and paginating results with $skip and $limit. By the end of this lesson you will be able to write production-grade pipelines that replace entire layers of application-side data processing with a single server-side query.

$lookup — Joining Collections

$lookup performs a left outer join between the current collection and a foreign collection. For every document in the pipeline, it finds all matching documents in the foreign collection and attaches them as an array field. It is the aggregation equivalent of a SQL LEFT JOIN and the correct way to combine referenced data in one server round trip.

# $lookup — joining orders with their user details

from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017/")
db     = client["dataplexa"]

# Join orders → users to get the user's name alongside each order
pipeline = [
    # Stage 1: only look at delivered orders
    {"$match": {"status": "delivered"}},

    # Stage 2: join each order with its user document
    {"$lookup": {
        "from":         "users",     # foreign collection name
        "localField":   "user_id",   # field in orders
        "foreignField": "_id",       # field in users
        "as":           "user_info"  # name for the joined array
    }},

    # Stage 3: flatten the single-element user_info array into a sub-document
    {"$unwind": "$user_info"},

    # Stage 4: project only the fields we need in the final output
    {"$project": {
        "order_id":    "$_id",
        "customer":    "$user_info.name",
        "membership":  "$user_info.membership",
        "total":       1,
        "date":        1,
        "_id":         0
    }},

    # Stage 5: sort by total descending
    {"$sort": {"total": -1}}
]

results = list(db.orders.aggregate(pipeline))
print("Delivered orders with customer name ($lookup):\n")
print(f"  {'Order':6}  {'Customer':15}  {'Tier':8}  {'Total':>8}  {'Date'}")
print(f"  {'─'*6}  {'─'*15}  {'─'*8}  {'─'*8}  {'─'*10}")
for r in results:
    print(f"  {r['order_id']:6}  {r['customer']:15}  "
          f"{r['membership']:8}  ${r['total']:>7.2f}  {r['date']}")
Delivered orders with customer name ($lookup):

  Order   Customer         Tier         Total  Date
  ──────  ───────────────  ────────  ────────  ──────────
  o005    Eva Müller       premium   $ 329.98  2024-03-10
  o003    Alice Johnson    premium   $  99.98  2024-02-20
  o001    Alice Johnson    premium   $  44.96  2024-01-10
  o007    Bob Smith        basic     $  11.97  2024-04-01
  • $lookup always produces an array — even when there is exactly one match. Use $unwind immediately after to flatten it into a sub-document
  • Index the foreign collection's join field — _id is indexed automatically, but a $lookup on any other foreign field without an index performs a full collection scan for every input document
  • $lookup is a left outer join — if no matching document exists in the foreign collection, the output array is empty and $unwind (by default) drops that document from the pipeline
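Since MongoDB 3.6, $lookup also accepts a let/pipeline form that joins on an expression and can filter the foreign collection inside the join. The sketch below targets the lesson's orders and users collections; the premium-only filter and the premium_user_lookup name are invented for illustration.

```python
# Pipeline-form $lookup (MongoDB 3.6+): join on an expression and filter the
# foreign collection inside the join, instead of a plain localField/foreignField.

def premium_user_lookup():
    """Build a $lookup stage that only attaches the user document when
    that user has a premium membership (invented filter, for illustration)."""
    return {"$lookup": {
        "from": "users",
        "let": {"uid": "$user_id"},                    # bind order fields to variables
        "pipeline": [
            {"$match": {"$expr": {"$and": [
                {"$eq": ["$_id", "$$uid"]},            # the join condition
                {"$eq": ["$membership", "premium"]},   # extra foreign-side filter
            ]}}},
            {"$project": {"name": 1, "membership": 1}},
        ],
        "as": "premium_user",                          # empty array for non-premium users
    }}

stage = premium_user_lookup()
# db.orders.aggregate([stage, ...])   # run on a live connection
```

Because the foreign-side filter runs inside the join, non-premium users produce an empty premium_user array, which $unwind would then drop by default.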

$unwind — Flattening Arrays

$unwind deconstructs an array field, outputting one document per array element. The rest of the document is duplicated for each element. It is essential after $lookup (to flatten the joined array) and also for analysing embedded array fields like items in the orders collection.

# $unwind — flattening embedded arrays for per-element analysis

from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017/")
db     = client["dataplexa"]

# Unwind the items array inside orders — one document per line item
pipeline = [
    {"$match": {"_id": {"$in": ["o001", "o003"]}}},   # two orders

    # Before $unwind: one document per order with an items array
    # After $unwind:  one document per item within each order
    {"$unwind": "$items"},

    {"$project": {
        "order_id":  "$_id",
        "product":   "$items.product_id",
        "qty":       "$items.qty",
        "line_price": {"$round": [
            {"$multiply": ["$items.qty", "$items.price"]}, 2
        ]},
        "_id": 0
    }},
    {"$sort": {"order_id": 1, "product": 1}}
]

results = list(db.orders.aggregate(pipeline))
print("Order line items after $unwind:\n")
print(f"  {'Order':6}  {'Product':8}  {'Qty':4}  {'Line Total'}")
print(f"  {'─'*6}  {'─'*8}  {'─'*4}  {'─'*10}")
for r in results:
    print(f"  {r['order_id']:6}  {r['product']:8}  {r['qty']:4}  ${r['line_price']:.2f}")

# $unwind with preserveNullAndEmptyArrays — keep documents with empty arrays
print("\n$unwind options:")
print("  preserveNullAndEmptyArrays: True  → keeps docs with empty/null arrays")
print("  includeArrayIndex: 'itemIndex'    → adds the array position as a field")

# Demonstrate includeArrayIndex
pipeline2 = [
    {"$match": {"_id": "o005"}},
    {"$unwind": {"path": "$items", "includeArrayIndex": "itemPos"}},
    {"$project": {"product": "$items.product_id", "itemPos": 1, "_id": 0}}
]
for r in db.orders.aggregate(pipeline2):
    print(f"  position {r['itemPos']}: {r['product']}")
Order line items after $unwind:

  Order   Product   Qty   Line Total
  ──────  ────────  ────  ──────────
  o001    p001         1  $29.99
  o001    p003         3  $14.97
  o003    p001         2  $59.98
  o003    p005         1  $49.99

$unwind options:
  preserveNullAndEmptyArrays: True  → keeps docs with empty/null arrays
  includeArrayIndex: 'itemIndex'    → adds the array position as a field

  position 0: p007
  position 1: p001
  • After $unwind, every previously-array field becomes a scalar — the document is effectively "exploded" into N copies, one per element
  • By default $unwind silently drops documents with missing, null, or empty array fields — use preserveNullAndEmptyArrays: True to retain them
  • Combining $unwind with $group is the standard pattern for aggregating across embedded array elements — unwind first, then group and accumulate
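The unwind-then-group pattern from the last bullet can be mirrored in plain Python to see exactly what each stage computes. The sample documents below copy the o001/o003 line items shown in the output above; the pipeline itself is what you would pass to db.orders.aggregate(...).

```python
from collections import defaultdict

# The aggregation pipeline: explode items, then sum qty per product
pipeline = [
    {"$unwind": "$items"},
    {"$group": {"_id": "$items.product_id", "units": {"$sum": "$items.qty"}}},
    {"$sort": {"units": -1}},
]
# results = db.orders.aggregate(pipeline)   # on a live connection

# The same computation mirrored in plain Python on the o001/o003 line items:
orders = [
    {"_id": "o001", "items": [{"product_id": "p001", "qty": 1},
                              {"product_id": "p003", "qty": 3}]},
    {"_id": "o003", "items": [{"product_id": "p001", "qty": 2},
                              {"product_id": "p005", "qty": 1}]},
]

# $unwind: one record per array element, parent fields duplicated
exploded = [{"order": o["_id"], **item} for o in orders for item in o["items"]]

# $group with $sum: accumulate qty per product_id
units = defaultdict(int)
for row in exploded:
    units[row["product_id"]] += row["qty"]

print(dict(units))   # p001 appears in both orders: 1 + 2 = 3
```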

$addFields — Adding Computed Fields Without Reshaping

$addFields adds new computed fields to documents while keeping all existing fields intact. It is the right choice when you want to enrich a document mid-pipeline without writing a full $project that explicitly lists every field you want to retain. Think of it as "add these fields, touch nothing else".

# $addFields — adding computed fields while keeping everything else

from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017/")
db     = client["dataplexa"]

# Add a discounted_price and a stock_status field to products
pipeline = [
    {"$addFields": {
        # 15% discount, rounded to 2 decimal places
        "discounted_price": {
            "$round": [{"$multiply": ["$price", 0.85]}, 2]
        },
        # Categorical stock status derived from the stock field
        "stock_status": {
            "$switch": {
                "branches": [
                    {"case": {"$lte": ["$stock", 5]},  "then": "Low"},
                    {"case": {"$lte": ["$stock", 20]}, "then": "Medium"},
                ],
                "default": "High"
            }
        },
        # savings amount
        "saving": {"$round": [{"$multiply": ["$price", 0.15]}, 2]}
    }},
    {"$project": {
        "name": 1, "price": 1, "discounted_price": 1,
        "saving": 1, "stock": 1, "stock_status": 1, "_id": 0
    }},
    {"$sort": {"price": -1}}
]

results = list(db.products.aggregate(pipeline))
print("Products with computed discount and stock status ($addFields):\n")
print(f"  {'Name':25}  {'Price':>7}  {'Sale':>7}  {'Save':>6}  {'Stock':5}  Status")
print(f"  {'─'*25}  {'─'*7}  {'─'*7}  {'─'*6}  {'─'*5}  {'─'*6}")
for p in results:
    print(f"  {p['name']:25}  ${p['price']:>6.2f}  "
          f"${p['discounted_price']:>6.2f}  "
          f"${p['saving']:>5.2f}  "
          f"{p['stock']:5}  {p['stock_status']}")
Products with computed discount and stock status ($addFields):

  Name                         Price     Sale    Save  Stock  Status
  ─────────────────────────  ───────  ───────  ──────  ─────  ──────
  Standing Desk              $349.99  $297.49  $52.50     12  Medium
  Monitor 27-inch            $299.99  $254.99  $45.00      8  Medium
  Mechanical Keyboard        $ 89.99  $ 76.49  $13.50     35  High
  USB-C Hub                  $ 49.99  $ 42.49  $ 7.50     50  High
  Wireless Mouse             $ 29.99  $ 25.49  $ 4.50     42  High
  Notebook A5                $  4.99  $  4.24  $ 0.75      5  Low
  Ballpoint Pens 10-pack     $  3.49  $  2.97  $ 0.52    120  High
  • $addFields is equivalent to a $project that includes all existing fields plus the new ones — use it when you want enrichment without listing every retained field
  • $switch is the aggregation equivalent of a CASE expression — it evaluates branches in order and returns the first matching value, or the default if none match
  • Computed fields added by $addFields can be referenced by name in all subsequent pipeline stages
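One consequence of the last bullet: a later $match can filter on a field that $addFields just created. A sketch reusing the stock_status logic above; note that $set (MongoDB 4.2+) is an alias for $addFields.

```python
# A computed field added by $addFields (or its alias $set, MongoDB 4.2+) is a
# first-class field for every later stage — here a $match filters on it.

low_stock_pipeline = [
    {"$addFields": {
        "stock_status": {"$switch": {
            "branches": [
                {"case": {"$lte": ["$stock", 5]},  "then": "Low"},
                {"case": {"$lte": ["$stock", 20]}, "then": "Medium"},
            ],
            "default": "High",
        }},
    }},
    {"$match": {"stock_status": "Low"}},   # filter on the field added above
    {"$project": {"name": 1, "stock": 1, "stock_status": 1, "_id": 0}},
]
# db.products.aggregate(low_stock_pipeline)   # on a live connection
```

Filtering on a computed field cannot use an index, so keep any index-friendly $match on stored fields as early in the pipeline as possible.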

$skip and $limit — Pagination in a Pipeline

$skip discards the first N documents in the pipeline and $limit caps the output to N documents. Together they implement offset-based pagination inside an aggregation pipeline — the same pattern as cursor skip() and limit() but composable within a larger multi-stage pipeline.

# $skip and $limit — paginating aggregation results

from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017/")
db     = client["dataplexa"]

PAGE_SIZE = 3

def get_products_page(page: int):
    """Fetch one page of products sorted by price ascending."""
    pipeline = [
        {"$sort":  {"price": 1}},
        {"$skip":  (page - 1) * PAGE_SIZE},   # skip pages before this one
        {"$limit": PAGE_SIZE},                  # return only this page
        {"$project": {"name": 1, "price": 1, "category": 1, "_id": 0}}
    ]
    return list(db.products.aggregate(pipeline))

# Page 1
print("Page 1 (products 1–3 by price):")
for p in get_products_page(1):
    print(f"  ${p['price']:>7.2f}  {p['name']} ({p['category']})")

# Page 2
print("\nPage 2 (products 4–6 by price):")
for p in get_products_page(2):
    print(f"  ${p['price']:>7.2f}  {p['name']} ({p['category']})")

# Page 3
print("\nPage 3 (product 7 by price):")
for p in get_products_page(3):
    print(f"  ${p['price']:>7.2f}  {p['name']} ({p['category']})")

# Total count alongside paginated results — common API pattern
import math

count_pipeline = [{"$count": "total"}]
total = list(db.products.aggregate(count_pipeline))[0]["total"]
print(f"\nTotal products: {total}  |  Page size: {PAGE_SIZE}  "
      f"|  Total pages: {math.ceil(total / PAGE_SIZE)}")
Page 1 (products 1–3 by price):
  $   3.49  Ballpoint Pens 10-pack (Stationery)
  $   4.99  Notebook A5 (Stationery)
  $  29.99  Wireless Mouse (Electronics)

Page 2 (products 4–6 by price):
  $  49.99  USB-C Hub (Electronics)
  $  89.99  Mechanical Keyboard (Electronics)
  $ 299.99  Monitor 27-inch (Electronics)

Page 3 (product 7 by price):
  $ 349.99  Standing Desk (Furniture)

Total products: 7  |  Page size: 3  |  Total pages: 3
  • The $sort → $skip → $limit pattern must always sort first — without a stable sort, page boundaries are non-deterministic and results overlap between pages
  • $count is a single-stage pipeline that returns one document with the total number of documents reaching that stage — use it in a separate pipeline call for the total page count
  • The same deep-page performance problem that affects cursor skip() applies here — for large offsets, prefer keyset pagination using a range filter on the sort field
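The keyset alternative mentioned in the last bullet replaces $skip with a range filter on the sort key. A sketch against the same products collection — it assumes price values are unique; with ties you would sort and filter on a compound (price, _id) key, and keyset_page_pipeline is a name invented here.

```python
# Keyset pagination: resume after the last-seen sort key instead of skipping.
# Cost stays proportional to the page size even for very deep pages.

def keyset_page_pipeline(last_price=None, page_size=3):
    """Pipeline for one page of products by ascending price.
    last_price is the price of the final document on the previous page."""
    stages = []
    if last_price is not None:
        # Range filter replaces $skip — can use an index on price
        stages.append({"$match": {"price": {"$gt": last_price}}})
    stages += [
        {"$sort":  {"price": 1}},
        {"$limit": page_size},
        {"$project": {"name": 1, "price": 1, "_id": 0}},
    ]
    return stages

first_page  = keyset_page_pipeline()                  # page 1: no resume point
second_page = keyset_page_pipeline(last_price=29.99)  # resume after page 1
```

The trade-off versus $skip/$limit: keyset pagination cannot jump directly to an arbitrary page number, only to the next page after a known position.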

$facet — Multiple Aggregations in One Pass

$facet runs multiple independent sub-pipelines on the same input documents simultaneously, returning all results in a single document. It is the aggregation framework's answer to the common need for "give me the filtered results AND the counts AND the price ranges in one query" — the kind of data that powers search results pages with sidebar filters.

# $facet — multiple parallel sub-pipelines in one query

from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017/")
db     = client["dataplexa"]

# A product search page needs simultaneously:
# 1. The paginated product list
# 2. A count breakdown by category (for sidebar filters)
# 3. Price range stats (for the price slider)

pipeline = [
    # Filter to Electronics first (simulating a search/filter)
    {"$match": {"category": "Electronics"}},

    # Run three independent sub-pipelines on the matched documents
    {"$facet": {

        # Sub-pipeline 1: paginated product list (page 1, 3 per page)
        "products": [
            {"$sort":  {"rating": -1}},
            {"$limit": 3},
            {"$project": {"name": 1, "price": 1, "rating": 1, "_id": 0}}
        ],

        # Sub-pipeline 2: total count of matched products
        "total_count": [
            {"$count": "count"}
        ],

        # Sub-pipeline 3: price distribution stats
        "price_stats": [
            {"$group": {
                "_id":       None,
                "min_price": {"$min": "$price"},
                "max_price": {"$max": "$price"},
                "avg_price": {"$avg": "$price"},
            }},
            {"$project": {"_id": 0,
                          "min_price": 1,
                          "max_price": 1,
                          "avg_price": {"$round": ["$avg_price", 2]}}}
        ],
    }}
]

result = list(db.products.aggregate(pipeline))[0]

print("$facet result — one query, three outputs:\n")

print("Products (page 1, sorted by rating):")
for p in result["products"]:
    print(f"  {p['name']:25}  rating: {p['rating']}  ${p['price']:.2f}")

total = result["total_count"][0]["count"] if result["total_count"] else 0
print(f"\nTotal Electronics products: {total}")

stats = result["price_stats"][0] if result["price_stats"] else {}
print(f"\nPrice stats:")
print(f"  min: ${stats.get('min_price', 0):.2f}")
print(f"  max: ${stats.get('max_price', 0):.2f}")
print(f"  avg: ${stats.get('avg_price', 0):.2f}")
$facet result — one query, three outputs:

Products (page 1, sorted by rating):
  Mechanical Keyboard        rating: 4.7  $89.99
  Monitor 27-inch            rating: 4.6  $299.99
  Wireless Mouse             rating: 4.5  $29.99

Total Electronics products: 4

Price stats:
  min: $29.99
  max: $299.99
  avg: $117.49
  • $facet always returns exactly one document containing the results of each sub-pipeline as a named array — even if a sub-pipeline returns no documents, its array is empty rather than absent
  • Sub-pipelines inside $facet cannot contain $facet, $out, $merge, or $geoNear (nor index/collection-statistics stages such as $indexStats and $collStats) — most other stages are permitted
  • The input documents are shared across all sub-pipelines — $facet does not re-query the collection, it fans out a single input set
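A very common $facet application combines the two previous sections: one page of results plus the total match count from a single query instead of two round trips. A sketch against the same products collection:

```python
# Page 2 of Electronics products plus the total match count, in one query.
PAGE, PAGE_SIZE = 2, 3

facet_pipeline = [
    {"$match": {"category": "Electronics"}},
    {"$facet": {
        "page": [                                   # sub-pipeline 1: the page
            {"$sort":  {"price": 1}},
            {"$skip":  (PAGE - 1) * PAGE_SIZE},
            {"$limit": PAGE_SIZE},
            {"$project": {"name": 1, "price": 1, "_id": 0}},
        ],
        "meta": [{"$count": "total"}],              # sub-pipeline 2: the count
    }},
]
# doc = list(db.products.aggregate(facet_pipeline))[0]
# total = doc["meta"][0]["total"] if doc["meta"] else 0
```

Remember that when nothing matches, "meta" comes back as an empty array rather than a count of zero, so guard the lookup as in the commented lines.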

A Full Reporting Pipeline

Combining every stage learned so far, this pipeline produces a complete order summary report — joining collections, unwinding arrays, grouping, adding computed fields, and sorting — in one server-side query.

# Full reporting pipeline — product revenue report across all orders

from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017/")
db     = client["dataplexa"]

pipeline = [
    # Stage 1: only delivered orders
    {"$match": {"status": "delivered"}},

    # Stage 2: explode the items array — one document per line item
    {"$unwind": "$items"},

    # Stage 3: join each line item's product_id to the products collection
    {"$lookup": {
        "from":         "products",
        "localField":   "items.product_id",
        "foreignField": "_id",
        "as":           "product_info"
    }},
    {"$unwind": "$product_info"},

    # Stage 4: group by product to compute sales metrics
    {"$group": {
        "_id":           "$items.product_id",
        "product_name":  {"$first": "$product_info.name"},
        "category":      {"$first": "$product_info.category"},
        "units_sold":    {"$sum": "$items.qty"},
        "total_revenue": {"$sum": {
            "$multiply": ["$items.qty", "$items.price"]
        }},
        "order_count":   {"$sum": 1},
    }},

    # Stage 5: add average revenue per order
    {"$addFields": {
        "avg_per_order": {
            "$round": [{"$divide": ["$total_revenue", "$order_count"]}, 2]
        },
        "total_revenue": {"$round": ["$total_revenue", 2]}
    }},

    # Stage 6: sort by revenue descending
    {"$sort": {"total_revenue": -1}},

    # Stage 7: shape the final output
    {"$project": {
        "_id": 0,
        "product":       "$product_name",
        "category":      1,
        "units_sold":    1,
        "total_revenue": 1,
        "avg_per_order": 1,
    }}
]

results = list(db.orders.aggregate(pipeline))
print("Product revenue report — delivered orders:\n")
print(f"  {'Product':25}  {'Cat':12}  {'Units':5}  {'Revenue':>10}  {'Avg/Order':>9}")
print(f"  {'─'*25}  {'─'*12}  {'─'*5}  {'─'*10}  {'─'*9}")
for r in results:
    print(f"  {r['product']:25}  {r['category']:12}  "
          f"{r['units_sold']:5}  ${r['total_revenue']:>9.2f}  "
          f"${r['avg_per_order']:>8.2f}")
Product revenue report — delivered orders:

  Product                    Cat           Units     Revenue  Avg/Order
  ─────────────────────────  ────────────  ─────  ──────────  ─────────
  Monitor 27-inch            Electronics       1  $   269.99  $  269.99
  Standing Desk              Furniture         1  $   249.99  $  249.99
  Wireless Mouse             Electronics       3  $    89.97  $   44.99
  Mechanical Keyboard        Electronics       1  $    80.99  $   80.99
  USB-C Hub                  Electronics       1  $    49.99  $   49.99
  Notebook A5                Stationery        5  $    24.95  $     4.99
  Ballpoint Pens 10-pack     Stationery        5  $    17.45  $     3.49
  • This seven-stage pipeline replaces what would otherwise be multiple queries, multiple Python loops, and manual joins — all running on the server with no intermediate data transferred
  • The $unwind → $lookup → $unwind pattern (explode items, join each line item on product_id, then flatten the joined array) is the standard approach for aggregating across embedded arrays that reference another collection
  • Always test complex pipelines one stage at a time — add stages incrementally and inspect intermediate output to catch shape or field-name issues early
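The incremental-testing advice in the last bullet is easy to automate: run the first n stages with a trailing $count appended and watch how many documents survive each stage. The helper names stage_checkpoints and explain_steps are invented for this sketch, and it assumes no stage in the pipeline (such as $out) forbids a following $count.

```python
# Incremental pipeline debugging: count the documents surviving each stage.

def stage_checkpoints(pipeline):
    """Return one truncated pipeline per stage, each capped with a $count."""
    return [pipeline[:n] + [{"$count": "docs"}]
            for n in range(1, len(pipeline) + 1)]

def explain_steps(collection, pipeline):
    """Print the surviving document count after each pipeline stage."""
    for n, partial in enumerate(stage_checkpoints(pipeline), start=1):
        out = list(collection.aggregate(partial))
        count = out[0]["docs"] if out else 0          # $count emits nothing on 0 docs
        stage_name = next(iter(pipeline[n - 1]))      # e.g. "$match", "$unwind"
        print(f"after stage {n} ({stage_name}): {count} docs")

# explain_steps(db.orders, pipeline)   # on a live connection
```

A sudden drop to zero between two stages usually points at a field-name typo or a shape mismatch introduced by the earlier stage.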

Summary Table

Stage       What It Does                                 Key Options                                    SQL Equivalent
──────────  ───────────────────────────────────────────  ─────────────────────────────────────────────  ──────────────────────────
$lookup     Joins a foreign collection                   from, localField, foreignField, as             LEFT JOIN
$unwind     Explodes an array into one doc per element   preserveNullAndEmptyArrays, includeArrayIndex  CROSS APPLY / UNNEST
$addFields  Adds computed fields, keeps all existing     $switch, $multiply, $round                     SELECT *, expr AS newcol
$skip       Discards the first N documents               integer N                                      OFFSET
$limit      Caps output to N documents                   integer N                                      FETCH NEXT N ROWS
$facet      Runs multiple sub-pipelines in parallel      named sub-pipeline arrays                      Multiple CTEs in one query
$count      Counts documents reaching that stage         output field name (string)                     COUNT(*)

Practice Questions

Practice 1. Why must $unwind almost always follow immediately after $lookup?



Practice 2. What is the difference between $addFields and $project when adding a computed field?



Practice 3. Write the $skip and $limit stages to return page 3 with a page size of 10.



Practice 4. What does $facet return and how many documents does it produce regardless of the number of sub-pipelines?



Practice 5. In the full reporting pipeline, why is $unwind applied before $lookup on items.product_id?



Quiz

Quiz 1. What type of join does $lookup perform by default?






Quiz 2. What happens to documents with a null or empty array field when $unwind is applied without any options?






Quiz 3. Which aggregation stage is best suited for powering a search results page that needs product listings, category counts, and price range stats all in one query?






Quiz 4. Which stages are NOT permitted inside a $facet sub-pipeline?






Quiz 5. In a $sort → $skip → $limit pagination pipeline, why must $sort always come first?






Next up — Group, Match & Project: A focused deep-dive into the three most-used aggregation stages with advanced expressions, conditional logic, and real reporting patterns.