Mango DBLesson 27 – Aggregation Basics | Dataplexa

Aggregation Basics

The find() method retrieves documents — but it cannot compute totals, group records, calculate averages, or reshape data. For anything beyond simple retrieval you need the aggregation framework. MongoDB's aggregation framework processes documents through a sequence of stages called a pipeline. Each stage receives the output of the previous stage, transforms it in some way, and passes the result downstream. The pipeline approach is flexible, composable, and powerful enough to replace entire layers of data-processing application code with a single database query. This lesson introduces the four most essential stages — $match, $group, $project, and $sort — and shows how they work individually and together using the Dataplexa Store dataset.

The Pipeline Model

An aggregation pipeline is a Python list of stage dictionaries passed to collection.aggregate(). Documents flow through each stage in order — the output of one stage becomes the input of the next. Stages do not modify the original collection; they transform a working copy of the documents as they pass through.

# The pipeline model — documents flow through stages sequentially

from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017/")
db     = client["dataplexa"]

# A minimal pipeline — one stage
single_stage = [
    {"$match": {"status": "delivered"}}   # Stage 1: filter documents
]

results = list(db.orders.aggregate(single_stage))
print("Single-stage pipeline ($match only):")
for o in results:
    print(f"  {o['_id']}  {o['status']:12}  ${o['total']:.2f}")

# A two-stage pipeline — filter then sort
two_stage = [
    {"$match": {"status": "delivered"}},          # Stage 1: filter
    {"$sort":  {"total": -1}}                      # Stage 2: sort descending
]

results = list(db.orders.aggregate(two_stage))
print("\nTwo-stage pipeline ($match → $sort):")
for o in results:
    print(f"  {o['_id']}  ${o['total']:.2f}")

# Pipeline principles
print("\nPipeline rules:")
rules = [
    "Stages execute left to right — output of each feeds the next",
    "Stages do not modify the original collection",
    "Any number of stages can be chained",
    "$match early — reduces document count before expensive stages",
    "aggregate() returns a cursor — iterate it like find()",
]
for r in rules:
    print(f"  • {r}")
Single-stage pipeline ($match only):
o001 delivered $44.96
o003 delivered $99.98
o005 delivered $329.98
o007 delivered $11.97

Two-stage pipeline ($match → $sort):
o005 $329.98
o003 $99.98
o001 $44.96
o007 $11.97

Pipeline rules:
• Stages execute left to right — output of each feeds the next
• Stages do not modify the original collection
• Any number of stages can be chained
• $match early — reduces document count before expensive stages
• aggregate() returns a cursor — iterate it like find()
  • aggregate() always receives a list — even a single-stage pipeline must be wrapped in a list
  • Placing $match as the first stage is the most important pipeline optimisation — it reduces the document count before any expensive grouping or reshaping work begins
  • The aggregation framework runs entirely on the MongoDB server — no data is transferred to the application until the final stage produces results

$match — Filtering Documents

$match is the aggregation equivalent of find()'s filter argument. It accepts the same query operators — comparison, logical, array, regex — and passes only the matching documents to the next stage. When $match is the first stage and its filter can use an index, the entire pipeline benefits from that index.

# $match — filtering at the start and middle of a pipeline

from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017/")
db     = client["dataplexa"]

# $match at the start — index-eligible, reduces input to later stages
print("Products rated 4.5 or higher ($match only):")
results = db.products.aggregate([
    {"$match": {"rating": {"$gte": 4.5}}}
])
for p in results:
    print(f"  {p['name']:25}  rating: {p['rating']}  ${p['price']:.2f}")

# $match with logical operators — same syntax as find()
print("\nPremium users from UK or Germany ($match with $or):")
results = db.users.aggregate([
    {"$match": {
        "membership": "premium",
        "$or": [
            {"country": "UK"},
            {"country": "Germany"}
        ]
    }}
])
for u in results:
    print(f"  {u['name']:15}  {u['country']}  {u['membership']}")

# $match mid-pipeline — after a $group to filter aggregated results
# Count orders per user, then keep only users with more than 1 order
results = list(db.orders.aggregate([
    {"$group": {"_id": "$user_id", "order_count": {"$sum": 1}}},
    {"$match": {"order_count": {"$gt": 1}}}          # filter on computed field
]))
print("\nUsers with more than 1 order ($group → $match):")
for r in results:
    print(f"  user: {r['_id']}  orders: {r['order_count']}")
Products rated 4.5 or higher ($match only):
Mechanical Keyboard rating: 4.7 $89.99
Standing Desk rating: 4.8 $349.99
Monitor 27-inch rating: 4.6 $299.99
Wireless Mouse rating: 4.5 $29.99

Premium users from UK or Germany ($match with $or):
Alice Johnson UK premium
Eva Müller Germany premium

Users with more than 1 order ($group → $match):
user: u001 orders: 2
user: u002 orders: 2
  • When $match is the first pipeline stage and uses an indexed field, MongoDB's query planner applies the index — making the pipeline as fast as a standard find()
  • $match mid-pipeline filters the documents produced by previous stages — the field names it references must exist in the current document shape at that point in the pipeline
  • Using $match after $group to filter on computed fields is the aggregation equivalent of SQL's HAVING clause

$group — Grouping and Accumulating

$group is the aggregation framework's most powerful stage. It groups input documents by a key expression and computes accumulated values across each group using accumulator operators like $sum, $avg, $min, $max, $count, and $push. Every $group stage requires an _id field that defines the grouping key — set it to null to accumulate across all documents.

# $group — grouping documents and computing aggregations

from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017/")
db     = client["dataplexa"]

# Group by category — count products and average price per category
print("Products per category with average price:")
results = db.products.aggregate([
    {"$group": {
        "_id":         "$category",          # group by this field
        "product_count": {"$sum": 1},        # count documents in each group
        "avg_price":     {"$avg": "$price"}, # average price per group
        "min_price":     {"$min": "$price"},
        "max_price":     {"$max": "$price"},
    }},
    {"$sort": {"product_count": -1}}
])
for r in results:
    print(f"  {r['_id']:12}  count: {r['product_count']}  "
          f"avg: ${r['avg_price']:>7.2f}  "
          f"range: ${r['min_price']:.2f}–${r['max_price']:.2f}")

# Group by status — total revenue and order count per status
print("\nOrder revenue by status:")
results = db.orders.aggregate([
    {"$group": {
        "_id":         "$status",
        "order_count": {"$sum": 1},
        "total_revenue": {"$sum": "$total"},
        "avg_order":   {"$avg": "$total"},
    }},
    {"$sort": {"total_revenue": -1}}
])
for r in results:
    print(f"  {r['_id']:12}  orders: {r['order_count']}  "
          f"revenue: ${r['total_revenue']:>7.2f}  "
          f"avg: ${r['avg_order']:.2f}")

# Group with _id: null — aggregate across the ENTIRE collection
print("\nOverall product stats (null group — all documents):")
results = list(db.products.aggregate([
    {"$group": {
        "_id":          None,
        "total_products": {"$sum": 1},
        "avg_price":      {"$avg": "$price"},
        "avg_rating":     {"$avg": "$rating"},
        "total_stock":    {"$sum": "$stock"},
    }}
]))
r = results[0]
print(f"  total products: {r['total_products']}")
print(f"  avg price:      ${r['avg_price']:.2f}")
print(f"  avg rating:     {r['avg_rating']:.2f}")
Products per category with average price:
Electronics count: 4 avg: $ 117.49 range: $29.99–$299.99
Stationery count: 2 avg: $ 4.24 range: $3.49–$4.99
Furniture count: 1 avg: $ 349.99 range: $349.99–$349.99

Order revenue by status:
delivered orders: 4 revenue: $ 486.89 avg: $121.72
processing orders: 1 revenue: $ 349.99 avg: $349.99
shipped orders: 1 revenue: $ 89.99 avg: $89.99
cancelled orders: 1 revenue: $ 0.00 avg: $0.00

Overall product stats (null group — all documents):
total products: 7
avg price: $118.34
avg rating: 4.54
  • The _id field in $group is the grouping key — it can be a field reference like "$category", a compound expression, or null to aggregate everything
  • Field references inside a $group expression use the "$fieldName" dollar-prefix string syntax — this tells MongoDB to use the value of that field
  • Accumulator operators: $sum (total or count), $avg, $min, $max, $push (collect into array), $addToSet (collect unique values), $first, $last

$project — Reshaping Documents

$project controls which fields are included in the output documents and allows you to compute new fields from existing ones. It is the aggregation equivalent of a projection in find() — but far more powerful, because it can add computed fields, rename fields, and apply expression operators inline.

# $project — inclusion, exclusion, and computed fields

from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017/")
db     = client["dataplexa"]

# Basic inclusion — keep only selected fields
print("Products — name and price only ($project inclusion):")
results = db.products.aggregate([
    {"$project": {"name": 1, "price": 1, "_id": 0}}
])
for p in results:
    print(f"  {p['name']:25}  ${p['price']:.2f}")

# Computed fields — create new fields from expressions
print("\nProducts with discounted price and label ($project computed):")
results = db.products.aggregate([
    {"$match":   {"category": "Electronics"}},
    {"$project": {
        "name":             1,
        "original_price":   "$price",
        # Computed: 10% discount rounded to 2 dp
        "discounted_price": {"$round": [{"$multiply": ["$price", 0.9]}, 2]},
        # Computed: label string using $concat
        "label": {"$concat": ["$name", " (", "$category", ")"]},
        "_id": 0
    }}
])
for p in results:
    print(f"  {p['label']:40}  "
          f"was ${p['original_price']:.2f}  now ${p['discounted_price']:.2f}")

# Renaming fields
print("\nOrders with renamed fields ($project rename):")
results = db.orders.aggregate([
    {"$match":   {"status": "delivered"}},
    {"$project": {
        "order_id":   "$_id",
        "customer":   "$user_id",
        "amount":     "$total",
        "fulfilled":  "$date",
        "_id":        0
    }}
])
for o in results:
    print(f"  order: {o['order_id']}  customer: {o['customer']}  amount: ${o['amount']:.2f}")
Products — name and price only ($project inclusion):
Wireless Mouse $29.99
Mechanical Keyboard $89.99
Notebook A5 $4.99
Standing Desk $349.99
USB-C Hub $49.99
Ballpoint Pens 10-pack $3.49
Monitor 27-inch $299.99

Products with discounted price and label ($project computed):
Wireless Mouse (Electronics) was $29.99 now $26.99
Mechanical Keyboard (Electronics) was $89.99 now $80.99
USB-C Hub (Electronics) was $49.99 now $44.99
Monitor 27-inch (Electronics) was $299.99 now $269.99

Orders with renamed fields ($project rename):
order: o001 customer: u001 amount: $44.96
order: o003 customer: u001 amount: $99.98
order: o005 customer: u004 amount: $329.98
order: o007 customer: u002 amount: $11.97
  • Setting a field to 1 includes it — setting it to 0 excludes it. You cannot mix inclusion and exclusion in the same $project except for _id
  • Computed fields use expression operators: $multiply, $add, $subtract, $divide, $round, $concat, $toUpper, $toLower, $dateToString, and many more
  • Renaming a field is done by projecting the old field reference ("$old_name") as the value of a new field name — the original field is excluded by omitting it from the projection

$sort — Ordering the Output

$sort in an aggregation pipeline works identically to the cursor .sort() method — it reorders documents by one or more fields in ascending (1) or descending (-1) order. Its position in the pipeline matters: placing it before a $limit is far more efficient than sorting after, because only the top N documents need to be retained.

# $sort — ordering pipeline output

from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017/")
db     = client["dataplexa"]

# Simple sort on a single field
print("All products sorted by price ascending:")
results = db.products.aggregate([
    {"$project": {"name": 1, "price": 1, "_id": 0}},
    {"$sort":    {"price": 1}}
])
for p in results:
    print(f"  ${p['price']:>7.2f}  {p['name']}")

# Multi-key sort — primary key then secondary
print("\nUsers sorted by membership desc then name asc:")
results = db.users.aggregate([
    {"$project": {"name": 1, "membership": 1, "country": 1, "_id": 0}},
    {"$sort":    {"membership": -1, "name": 1}}   # premium first, then alphabetical
])
for u in results:
    print(f"  {u['membership']:8}  {u['name']:15}  {u['country']}")

# $sort + $limit — top-N pattern (efficient — sort only needs to track N best)
print("\nTop 3 most expensive products ($sort → $limit):")
results = db.products.aggregate([
    {"$sort":  {"price": -1}},
    {"$limit": 3},
    {"$project": {"name": 1, "price": 1, "category": 1, "_id": 0}}
])
for p in results:
    print(f"  ${p['price']:>7.2f}  {p['name']} ({p['category']})")

# $sort after $group — sort computed aggregation results
print("\nCategories sorted by average price desc:")
results = db.products.aggregate([
    {"$group": {"_id": "$category", "avg_price": {"$avg": "$price"}}},
    {"$sort":  {"avg_price": -1}},
    {"$project": {"category": "$_id", "avg_price": {"$round": ["$avg_price", 2]}, "_id": 0}}
])
for r in results:
    print(f"  {r['category']:12}  avg: ${r['avg_price']:.2f}")
All products sorted by price ascending:
$ 3.49 Ballpoint Pens 10-pack
$ 4.99 Notebook A5
$ 29.99 Wireless Mouse
$ 49.99 USB-C Hub
$ 89.99 Mechanical Keyboard
$299.99 Monitor 27-inch
$349.99 Standing Desk

Users sorted by membership desc then name asc:
premium Alice Johnson UK
premium Clara Diaz Spain
premium Eva Müller Germany
basic Bob Smith UK
basic David Lee USA

Top 3 most expensive products ($sort → $limit):
$349.99 Standing Desk (Furniture)
$299.99 Monitor 27-inch (Electronics)
$ 89.99 Mechanical Keyboard (Electronics)

Categories sorted by average price desc:
Furniture avg: $349.99
Electronics avg: $117.49
Stationery avg: $4.24
  • Always place $sort immediately before $limit for top-N queries — MongoDB optimises this pattern internally, avoiding a full sort of all documents
  • Sorting on an indexed field in the first $sort stage (with no preceding $group) allows MongoDB to use the index for ordering without a blocking in-memory sort
  • Multi-key sorts in a pipeline use the same dict syntax as cursor sort — fields are applied left to right as primary, secondary, tertiary sort keys

Putting It All Together — A Four-Stage Pipeline

The real power of the aggregation framework emerges when stages are combined. A pipeline that matches, groups, projects, and sorts can answer complex business questions in a single database round trip that would otherwise require multiple queries and application-layer processing.

# Combined pipeline — $match → $group → $project → $sort

from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017/")
db     = client["dataplexa"]

# Business question:
# "For delivered orders only, what is the total spend and order count
#  per user, showing only users who spent more than $50,
#  sorted by total spend descending?"

pipeline = [
    # Stage 1: filter to delivered orders only (index-eligible — runs first)
    {"$match": {"status": "delivered"}},

    # Stage 2: group by user — sum total spend and count orders
    {"$group": {
        "_id":         "$user_id",
        "total_spent": {"$sum": "$total"},
        "order_count": {"$sum": 1},
        "avg_order":   {"$avg": "$total"},
    }},

    # Stage 3: filter out users who spent $50 or less (HAVING equivalent)
    {"$match": {"total_spent": {"$gt": 50}}},

    # Stage 4: rename and shape the output fields
    {"$project": {
        "user_id":     "$_id",
        "total_spent": {"$round": ["$total_spent", 2]},
        "order_count": 1,
        "avg_order":   {"$round": ["$avg_order", 2]},
        "_id":         0
    }},

    # Stage 5: sort by highest spender first
    {"$sort": {"total_spent": -1}},
]

results = list(db.orders.aggregate(pipeline))
print("High-value customers — delivered orders > $50 total spend:\n")
print(f"  {'User':8}  {'Orders':8}  {'Total Spent':12}  {'Avg Order'}")
print(f"  {'─'*8}  {'─'*8}  {'─'*12}  {'─'*9}")
for r in results:
    print(f"  {r['user_id']:8}  {r['order_count']:8}  "
          f"${r['total_spent']:>11.2f}  ${r['avg_order']:.2f}")
High-value customers — delivered orders > $50 total spend:

User Orders Total Spent Avg Order
──────── ──────── ──────────── ─────────
u004 1 $ 329.98 $329.98
u001 2 $ 144.94 $72.47
  • The two $match stages serve different purposes — the first filters raw documents using an index, the second filters computed group results (the HAVING pattern)
  • This entire analysis — filtering, grouping, aggregating, reshaping, and sorting — runs on the MongoDB server in one round trip with zero application-layer loops
  • Pipeline stages are reusable building blocks — swap out the initial $match filter and the same pipeline answers a different business question without changing the structure

Summary Table

Stage What It Does Key Options SQL Equivalent
$match Filters documents All query operators WHERE / HAVING
$group Groups and accumulates $sum, $avg, $min, $max, $push GROUP BY
$project Shapes and computes fields $multiply, $concat, $round, rename SELECT
$sort Orders documents 1 asc, -1 desc, multi-key ORDER BY
$limit Caps result count Integer N LIMIT
$group _id: null Aggregates all documents Same accumulators SELECT COUNT(*), AVG() with no GROUP BY

Practice Questions

Practice 1. Why should $match always be placed as early as possible in a pipeline — ideally as the first stage?



Practice 2. What does setting _id to null in a $group stage do?



Practice 3. Write a pipeline to count how many orders each user has, returning only users with more than one order.



Practice 4. What is the difference between $project and a find() projection?



Practice 5. What SQL clause does a $match placed after a $group stage correspond to, and why?



Quiz

Quiz 1. What must you always wrap an aggregation pipeline in when passing it to aggregate()?






Quiz 2. Which accumulator operator in $group collects the values from each document in the group into an array?






Quiz 3. Which stage ordering produces the most efficient top-3 query?






Quiz 4. In $project, what does assigning a field the value "$originalFieldName" do?






Quiz 5. What is the key difference between $match before $group and $match after $group?






Next up — Aggregation Pipelines: Going deeper with $lookup, $unwind, $addFields, $facet, and multi-collection pipelines that power real reporting workflows.