NoSQL
Mini Project — Build a Cloud-Native Data Platform
Every concept in this course — schema design, consistency models, sharding, replication, security, monitoring, backups, scaling, and cloud-native patterns — converges here. You are going to build a complete, production-grade data platform for a real product from scratch. Not a toy. Not a hello-world. A system that a real engineering team could ship, operate, and evolve.
The Project — ShopStream
ShopStream is a live-commerce platform — think QVC meets Twitch. Hosts go live, showcase products, viewers chat and purchase in real time. The data platform must handle:
ShopStream — Data Requirements
User Profiles
Flexible attributes per user — preferences, payment methods, address book. Schema evolves constantly as new features ship.
Live Sessions
Real-time viewer counts, chat messages, product showcases. Millions of events per minute during popular streams.
Orders
Payment processing, inventory deduction, fulfilment status. ACID required — cannot double-charge or oversell.
Recommendations
"Viewers who bought this also bought..." — relationship traversal across purchase history and viewer behaviour.
Session Cache
Auth tokens, cart state, viewer position in live streams. Sub-millisecond reads with TTL expiry. Never needs persistence.
Step 1 — Polyglot Persistence Architecture
The first decision: which database for which service. No single database can be optimal for all five data types above. The architecture uses five databases — each chosen for a specific access pattern — connected through events and APIs, never through shared connections.
| Service | Database | Why this one | AWS managed |
|---|---|---|---|
| User Service | MongoDB | Flexible schema, rich queries, frequent attribute additions | DocumentDB |
| Events Service | Cassandra | Millions of writes per minute at peak, time-series, append-only, TTL | Keyspaces |
| Order Service | DynamoDB | Serverless scale, conditional writes, transactional API | DynamoDB |
| Recommendation Service | Neo4j | Multi-hop graph traversal, purchase relationship queries | Neptune |
| Session Service | Redis | Sub-ms reads, TTL expiry, no durability needed | ElastiCache |
Step 2 — MongoDB User Profiles (Flexible Schema)
The User Service stores profiles with schema validation that enforces required fields while allowing any additional attributes. New product features add new fields to user documents — the schema evolves without migrations.
The scenario: The engineering team is implementing the User Service. Profiles need required fields — userId, email, createdAt — plus optional, product-specific attributes that different teams add over time: streamingQuality, paymentMethods, addressBook. You are setting up the collection with JSON Schema validation and a partial index so email uniqueness is enforced only on verified accounts.
db.createCollection("users", {
  validator: {
    $jsonSchema: {
      bsonType: "object",
      required: ["userId", "email", "createdAt"],
      properties: {
        userId: { bsonType: "string" },
        email: { bsonType: "string", pattern: "^.+@.+\\..+$" },
        createdAt: { bsonType: "date" }
        // Additional fields are allowed without declaration (flexible schema)
      }
    }
  },
  validationAction: "error" // reject invalid docs, don't just warn
});
// Partial index: enforce email uniqueness only on verified accounts
// Unverified accounts (email pending confirmation) can share email temporarily
db.users.createIndex(
  { email: 1 },
  {
    unique: true,
    partialFilterExpression: { emailVerified: true },
    name: "unique_verified_email"
  }
);
// Insert a full user profile: required + optional fields together
db.users.insertOne({
  userId: "usr_8821",
  email: "alex@example.com",
  createdAt: new Date(),
  emailVerified: true,
  // Optional fields added freely, no schema change required
  streamingQuality: "1080p",
  paymentMethods: [{ type: "card", last4: "4242", default: true }],
  addressBook: [{ label: "home", city: "London", postcode: "EC1A" }],
  preferences: { notifications: true, darkMode: false }
});
Collection created with schema validation ✓
Index "unique_verified_email" created ✓
{ acknowledged: true, insertedId: ObjectId("...") } ✓
// Validation working — try inserting without required field:
db.users.insertOne({ email: "test@example.com" })
MongoServerError: Document failed validation
details: { operatorName: '$jsonSchema',
schemaRulesNotSatisfied: [{ missingProperties: ['userId','createdAt'] }] }
// Partial index working — two unverified users CAN share email:
db.users.insertOne({ userId:"u1", email:"dupe@x.com", createdAt:new Date(),
emailVerified: false }) // succeeds
db.users.insertOne({ userId:"u2", email:"dupe@x.com", createdAt:new Date(),
emailVerified: false }) // also succeeds, not covered by partial index ✓
validationAction: "error" vs "warn"
"warn" logs a warning but inserts the invalid document anyway — useless for data integrity. "error" rejects the insert and returns an error to the caller, forcing the application to fix the document before it can be stored. In production, always use "error". Use "warn" only during a migration period when you are adding validation to an existing collection that already contains non-conforming documents.
partialFilterExpression: { emailVerified: true }
A partial index only indexes documents matching the filter expression — documents where emailVerified is not true are not included. This means the uniqueness constraint applies only to verified accounts. An unverified account created via OAuth with the same email as a pending registration does not conflict — which is the intended behaviour during email confirmation flows. The index is also smaller and more efficient because it covers fewer documents.
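The uniqueness rule described above can be modelled in a few lines of plain Python. This is only a toy sketch of the semantics, not how MongoDB implements indexes; the class name `PartialUniqueEmailIndex` and its `insert` method are hypothetical names introduced for illustration.

```python
class PartialUniqueEmailIndex:
    """Toy model of a unique index with partialFilterExpression {emailVerified: true}."""

    def __init__(self):
        self._verified_emails = set()

    def insert(self, doc: dict) -> bool:
        # Documents that do not match the filter are never indexed,
        # so they can never cause a uniqueness violation.
        if doc.get("emailVerified") is not True:
            return True
        if doc["email"] in self._verified_emails:
            return False  # a real server would raise a duplicate-key error here
        self._verified_emails.add(doc["email"])
        return True


index = PartialUniqueEmailIndex()
results = [
    index.insert({"userId": "u1", "email": "dupe@x.com", "emailVerified": False}),
    index.insert({"userId": "u2", "email": "dupe@x.com", "emailVerified": False}),
    index.insert({"userId": "u3", "email": "dupe@x.com", "emailVerified": True}),
    index.insert({"userId": "u4", "email": "dupe@x.com", "emailVerified": True}),
]
print(results)  # [True, True, True, False]
```

Only the second verified document is rejected: the two unverified documents never enter the index, so they cannot collide with each other or with the verified one.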
Step 3 — Cassandra Events (High-Write Time Series)
The Events Service records every viewer action — joins, chat messages, product views, purchases — during live streams. At peak, a single viral stream generates 2 million events per minute. Cassandra's append-heavy LSM-tree write model and TTL support make it the right tool.
The scenario: You are designing the Cassandra schema for the Events Service. The dominant queries are: (1) all events for a session ordered by time, (2) all events for a user across sessions in the past 30 days. You are applying query-driven design — primary key chosen to make both queries targeted, with a 90-day TTL so old events expire automatically without manual cleanup jobs.
-- Primary table: query by session (access pattern 1)
CREATE TABLE shopstream.session_events (
  session_id  TEXT,
  occurred_at TIMESTAMP,
  event_id    UUID,
  user_id     TEXT,
  event_type  TEXT,  -- join | leave | chat | product_view | purchase
  payload     TEXT,  -- JSON blob, flexible per event type
  PRIMARY KEY (session_id, occurred_at, event_id)
) WITH CLUSTERING ORDER BY (occurred_at ASC)
  AND default_time_to_live = 7776000;  -- 90 days in seconds
-- Materialized view: query by user (access pattern 2)
-- Same data, different partition key; Cassandra keeps the view in sync automatically
CREATE MATERIALIZED VIEW shopstream.user_events AS
  SELECT * FROM shopstream.session_events
  WHERE user_id IS NOT NULL
    AND session_id IS NOT NULL
    AND occurred_at IS NOT NULL
    AND event_id IS NOT NULL
  PRIMARY KEY (user_id, occurred_at, session_id, event_id)
  WITH CLUSTERING ORDER BY (occurred_at DESC);  -- newest first for user feed
-- Insert a high-volume event batch (no TTL override, uses table default)
INSERT INTO shopstream.session_events
  (session_id, occurred_at, event_id, user_id, event_type, payload)
VALUES ('sess_live_882', toTimestamp(now()), uuid(),
        'usr_8821', 'product_view',
        '{"productId":"prod_001","dwellMs":4200}');
Table session_events created ✓
Materialized view user_events created ✓
-- Query access pattern 1: all events in a session
SELECT * FROM shopstream.session_events
WHERE session_id = 'sess_live_882'
AND occurred_at > '2025-03-10 14:00:00'
LIMIT 500;
→ 500 rows (2.1ms) single-partition read on primary key ✓
-- Query access pattern 2: user's recent activity (via materialized view)
SELECT * FROM shopstream.user_events
WHERE user_id = 'usr_8821'
AND occurred_at > '2025-02-10 00:00:00'
LIMIT 100;
→ 83 rows (1.8ms) single-partition read on view partition key ✓
-- Peak load: 2M inserts/minute across 6 Cassandra nodes
Write throughput: 33,333/sec | p99 write latency: 3.4ms ✓
TTL: rows expire automatically after 90 days, no cleanup job needed ✓
PRIMARY KEY (session_id, occurred_at, event_id)
session_id is the partition key — all events for one session live on the same node, making per-session queries single-node operations. occurred_at is the first clustering column — events within a session are physically stored in timestamp order on disk, making time-range queries a sequential read rather than a random one. event_id is added to guarantee uniqueness — two events in the same session at the exact same millisecond do not collide.
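To make the roles of the partition key and clustering columns concrete, here is a minimal in-memory sketch. It assumes integer epoch-millisecond timestamps and string event IDs for simplicity; `SessionEventsTable` and its methods are hypothetical names, and real Cassandra storage is far more involved.

```python
import bisect
from collections import defaultdict


class SessionEventsTable:
    """Toy model: one sorted list of rows per partition (session_id)."""

    def __init__(self):
        # session_id -> rows kept sorted by (occurred_at_ms, event_id)
        self._partitions = defaultdict(list)

    def insert(self, session_id, occurred_at_ms, event_id, payload):
        # insort keeps the partition in clustering order on every write.
        bisect.insort(self._partitions[session_id], (occurred_at_ms, event_id, payload))

    def events_after(self, session_id, since_ms, limit):
        rows = self._partitions.get(session_id, [])
        # (since_ms + 1,) sorts before every row whose timestamp is > since_ms,
        # so bisect_left finds the first matching row without scanning.
        start = bisect.bisect_left(rows, (since_ms + 1,))
        return rows[start:start + limit]


table = SessionEventsTable()
table.insert("sess_live_882", 1000, "e2", "chat")
table.insert("sess_live_882", 3000, "e4", "purchase")
table.insert("sess_live_882", 1000, "e1", "join")  # same millisecond, different event_id
table.insert("sess_live_882", 2000, "e3", "product_view")
table.insert("sess_other", 1500, "e9", "join")

recent = table.events_after("sess_live_882", since_ms=1000, limit=10)
print([row[1] for row in recent])  # ['e3', 'e4']
```

The two events sharing timestamp 1000 both survive because event_id is part of the key, and the range query touches only one partition and reads a contiguous slice of it.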
Materialized view — same data, different partition key
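Without the materialized view, a query for all of a user's events across all sessions would be a scatter-gather across every partition: Cassandra would have to check every session to find the ones that user participated in. The materialized view re-partitions the same data by user_id, turning it into a targeted single-partition query. Cassandra maintains the view automatically on every write to the base table, so each insert costs an additional read-before-write and a second write in exchange for the fast read path. Materialized views are flagged as experimental in recent Cassandra releases and are disabled by default, so check that they are enabled and supported on your cluster; the common alternative is a second table keyed by user_id that the application writes alongside session_events.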
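The trade-off between scanning every partition and keeping a second copy keyed by user can be sketched as follows. This is an illustrative model only: the dictionaries and function names are hypothetical, and the "partitions scanned" counter stands in for the cluster-wide work a real scatter-gather query would cause.

```python
from collections import defaultdict

base_by_session = defaultdict(list)  # partition key: session_id
view_by_user = defaultdict(list)     # partition key: user_id (the "view")


def write_event(session_id, user_id, occurred_at_ms, event_id):
    row = (occurred_at_ms, session_id, user_id, event_id)
    base_by_session[session_id].append(row)
    view_by_user[user_id].append(row)  # the extra write that keeps the view current


def user_events_without_view(user_id):
    scanned, found = 0, []
    for rows in base_by_session.values():  # every partition must be checked
        scanned += 1
        found.extend(r for r in rows if r[2] == user_id)
    return sorted(found, reverse=True), scanned


def user_events_with_view(user_id):
    # One partition lookup, newest first.
    return sorted(view_by_user.get(user_id, []), reverse=True), 1


write_event("sess_a", "usr_8821", 1000, "e1")
write_event("sess_b", "usr_9934", 1500, "e2")
write_event("sess_c", "usr_8821", 2000, "e3")

slow_rows, slow_scanned = user_events_without_view("usr_8821")
fast_rows, fast_scanned = user_events_with_view("usr_8821")
print(slow_rows == fast_rows, slow_scanned, fast_scanned)  # True 3 1
```

Both paths return the same rows; the difference is that the scan cost grows with the number of sessions, while the view lookup stays at one partition.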
default_time_to_live = 7776000
Every row in this table automatically expires 90 days after insertion without any application-level cleanup job, cron task, or background worker. Once a row's TTL elapses, Cassandra treats it as a tombstone and stops returning it; the disk space is reclaimed by compaction after gc_grace_seconds has passed. At a sustained 2 million events per minute, the table would grow by roughly 86 billion rows per month indefinitely without TTL. At this scale TTL is not optional; it is the only viable retention strategy.
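The figures in this step can be checked with a few lines of arithmetic. The sketch assumes the peak rate of 2 million events per minute is sustained around the clock, which is an upper bound rather than a measured average.

```python
SECONDS_PER_DAY = 24 * 60 * 60

# default_time_to_live for a 90-day retention period
ttl_seconds = 90 * SECONDS_PER_DAY

# Assumed sustained peak rate (upper bound, taken from the scenario)
events_per_minute = 2_000_000
writes_per_second = events_per_minute / 60

# Growth without TTL, using a 30-day month
rows_per_30_days = events_per_minute * 60 * 24 * 30

# With a 90-day TTL the table levels off at about 90 days of data
steady_state_rows = events_per_minute * 60 * 24 * 90

print(ttl_seconds)               # 7776000
print(round(writes_per_second))  # 33333
print(rows_per_30_days)          # 86400000000
print(steady_state_rows)         # 259200000000
```

With TTL in place the row count plateaus instead of growing without bound, which is what makes disk capacity plannable.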
Step 4 — DynamoDB Orders (Transactional Writes)
The Order Service handles the most financially sensitive operations on the platform. When a viewer clicks "Buy Now" during a live stream, three things must happen atomically: the inventory must decrement, the order must be created, and the payment intent must be recorded. If any one fails, none should persist.
The scenario: Your team is implementing the purchase flow. During a flash sale, 500 viewers attempt to buy the last unit of a limited-edition product simultaneously. You need a transactional write that decrements inventory by exactly 1 (with a check that stock is not already zero), creates the order record, and records the payment intent — all in a single atomic DynamoDB transaction.
import boto3
from botocore.exceptions import ClientError
import uuid, datetime

dynamodb = boto3.client("dynamodb", region_name="us-east-1")

def place_order(user_id: str, product_id: str, amount_cents: int) -> dict:
    order_id = str(uuid.uuid4())
    # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated in recent Python)
    created_at = datetime.datetime.now(datetime.timezone.utc).isoformat()
    try:
        dynamodb.transact_write_items(
            TransactItems=[
                # Operation 1: decrement inventory (fails if stock = 0)
                {
                    "Update": {
                        "TableName": "Inventory",
                        "Key": {"productId": {"S": product_id}},
                        "UpdateExpression": "SET stock = stock - :one",
                        "ConditionExpression": "stock > :zero",  # prevent oversell
                        "ExpressionAttributeValues": {
                            ":one": {"N": "1"},
                            ":zero": {"N": "0"}
                        }
                    }
                },
                # Operation 2: create order record
                {
                    "Put": {
                        "TableName": "Orders",
                        "Item": {
                            "userId": {"S": user_id},
                            "SK": {"S": f"ORDER#{order_id}"},
                            "productId": {"S": product_id},
                            "amountCents": {"N": str(amount_cents)},
                            "status": {"S": "PENDING"},
                            "createdAt": {"S": created_at}
                        },
                        "ConditionExpression": "attribute_not_exists(SK)"  # idempotent
                    }
                },
                # Operation 3: write payment intent
                {
                    "Put": {
                        "TableName": "Payments",
                        "Item": {
                            "orderId": {"S": order_id},
                            "SK": {"S": "INTENT"},
                            "amountCents": {"N": str(amount_cents)},
                            "status": {"S": "INITIATED"}
                        }
                    }
                }
            ]
        )
        return {"ok": True, "orderId": order_id}
    except ClientError as e:
        # A failed condition on any operation cancels the whole transaction
        if e.response["Error"]["Code"] == "TransactionCanceledException":
            return {"ok": False, "reason": "Out of stock or duplicate order"}
        raise
// First buyer — stock was 1
place_order("usr_8821", "prod_limited_001", 9999)
Inventory: stock 1 → 0 ✓
Order created: ORDER#abc-123 ✓
Payment intent: INITIATED ✓
→ { "ok": True, "orderId": "abc-123" }
// Second buyer — stock already 0
place_order("usr_9934", "prod_limited_001", 9999)
ConditionExpression "stock > 0" FAILED — transaction cancelled
Inventory: unchanged ✓
Order: not created ✓
Payment: not created ✓
→ { "ok": False, "reason": "Out of stock or duplicate order" }
// 500 concurrent buyers — exactly 1 succeeds ✓
// Zero oversells in production ✓
TransactWriteItems: all or nothing across 3 tables
DynamoDB's TransactWriteItems provides ACID semantics across up to 100 operations on up to 100 distinct items — even across different tables. If any single operation fails its condition check, the entire transaction is cancelled and no changes are persisted. This is the only way to guarantee you never have an order without a payment intent, or a payment intent without a corresponding inventory decrement.
ConditionExpression: "stock > :zero"
This is the oversell prevention. Without the condition, 500 concurrent buyers could all read stock: 1, all pass the "is there stock?" check in application code, and all proceed to decrement — leaving stock at -499. The condition is evaluated atomically on the DynamoDB storage node, serialising all concurrent updates. Exactly one buyer wins; the other 499 get TransactionCanceledException and a clean "out of stock" response.
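The effect of evaluating the condition atomically can be simulated locally. In this sketch a `threading.Lock` stands in for the serialisation DynamoDB performs on the item; `InventoryItem` and `run_flash_sale` are hypothetical names, and nothing here calls AWS.

```python
import threading


class InventoryItem:
    """Toy stand-in for one Inventory item with an atomic conditional update."""

    def __init__(self, stock: int):
        self.stock = stock
        self._lock = threading.Lock()  # models per-item serialisation of writes

    def conditional_decrement(self) -> bool:
        # Check and update happen as one step, like ConditionExpression + UpdateExpression.
        with self._lock:
            if self.stock > 0:
                self.stock -= 1
                return True
            return False  # corresponds to a failed condition check


def run_flash_sale(buyers: int, initial_stock: int):
    item = InventoryItem(initial_stock)
    outcomes = []
    outcomes_lock = threading.Lock()

    def buy():
        ok = item.conditional_decrement()
        with outcomes_lock:
            outcomes.append(ok)

    threads = [threading.Thread(target=buy) for _ in range(buyers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return outcomes.count(True), item.stock


winners, remaining = run_flash_sale(buyers=500, initial_stock=1)
print(winners, remaining)  # 1 0
```

Because the check and the decrement cannot be interleaved, the number of winners always equals the initial stock and the counter never goes negative.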
TransactionCanceledException
DynamoDB raises this specific exception when any condition check in a transaction fails. The response body includes a CancellationReasons list — one entry per operation — that tells you exactly which operation failed and why. Parsing this lets you return a specific error message: "out of stock" when the inventory condition fails vs "duplicate order" when the attribute_not_exists condition fails.
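One way to turn the cancellation reasons into a specific message is sketched below. It assumes the error response carries a `CancellationReasons` list in the same order as `TransactItems`, with `Code` set to `"ConditionalCheckFailed"` for the failing operation; verify that shape against your SDK version. The function name and the sample dictionary are hypothetical.

```python
# Labels in the same order as the TransactItems list in place_order:
# index 0 = inventory update, index 1 = order put.
REASON_LABELS = ["Out of stock", "Duplicate order"]


def explain_cancellation(error_response: dict) -> str:
    """Map the first failed condition check to a user-facing reason."""
    reasons = error_response.get("CancellationReasons", [])
    for label, reason in zip(REASON_LABELS, reasons):
        if reason.get("Code") == "ConditionalCheckFailed":
            return label
    return "Transaction cancelled"


# Hand-written sample in the assumed shape of e.response
sample_response = {
    "Error": {"Code": "TransactionCanceledException"},
    "CancellationReasons": [
        {"Code": "ConditionalCheckFailed", "Message": "The conditional request failed"},
        {"Code": "None"},
        {"Code": "None"},
    ],
}

print(explain_cancellation(sample_response))  # Out of stock
```

Inside the `except ClientError as e` branch, this could be called as `explain_cancellation(e.response)` to replace the combined "Out of stock or duplicate order" message.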
Step 5 — Wiring it Together with DynamoDB Streams
The five services do not operate in isolation. When an order is created in DynamoDB, three downstream things need to happen: the recommendation graph in Neo4j/Neptune must record a new purchase edge, the user's MongoDB profile must be updated with order history, and a fulfilment event must be published to SQS. DynamoDB Streams drives all three — without coupling the Order Service to any downstream system.
The scenario: You are implementing the order fanout Lambda that consumes the DynamoDB Streams event from the Orders table and propagates the new order to all downstream systems. The Lambda must be idempotent — SQS and DynamoDB Streams both have at-least-once delivery, so the same order event may arrive more than once.
import boto3, json, os
from pymongo import MongoClient

sqs = boto3.client("sqs")
mongo = MongoClient(os.environ["MONGO_URI"])
users = mongo["shopstream"]["users"]
# Neptune/Neo4j client omitted for brevity; the same pattern applies

def handler(event, context):
    for record in event["Records"]:
        # Only process new order inserts; ignore updates and deletes
        if record["eventName"] != "INSERT":
            continue
        new = record["dynamodb"]["NewImage"]
        order_id = new["SK"]["S"].replace("ORDER#", "")
        user_id = new["userId"]["S"]
        product_id = new["productId"]["S"]
        amount = int(new["amountCents"]["N"])
        # 1. Update MongoDB user profile; $addToSet keeps retries idempotent
        users.update_one(
            {"userId": user_id},
            {"$addToSet": {"orderHistory": order_id}}  # addToSet = no duplicates
        )
        # 2. Publish fulfilment event to SQS
        # MessageDeduplicationId makes SQS FIFO queue idempotent on retry
        sqs.send_message(
            QueueUrl=os.environ["FULFILMENT_QUEUE"],
            MessageBody=json.dumps({
                "orderId": order_id,
                "userId": user_id,
                "productId": product_id,
                "amount": amount
            }),
            MessageGroupId="fulfilment",
            MessageDeduplicationId=order_id  # SQS FIFO deduplication window: 5min
        )
        # 3. Neptune graph edge, written as an upsert so retries create no duplicate
        # (Neptune Gremlin query simplified)
        # g.V(user_id).addE('PURCHASED').to(g.V(product_id))
        # with .property(id, order_id); MERGE-style semantics keep it idempotent
Stream record received: INSERT on Orders table
orderId: "abc-123" userId: "usr_8821" productId: "prod_limited_001" amount: 9999 cents
→ MongoDB: $addToSet orderHistory "abc-123" ✓
→ SQS FIFO: MessageDeduplicationId="abc-123", sent ✓
→ Neptune: PURCHASED edge usr_8821 → prod_limited_001 ✓
// Retry scenario: same stream record delivered twice
→ MongoDB: $addToSet is idempotent; "abc-123" already in set, skipped ✓
→ SQS FIFO: MessageDeduplicationId="abc-123" already seen, deduplicated ✓
→ Neptune: MERGE, edge already exists, no duplicate created ✓
All three downstream systems consistent ✓
Zero coupling between Order Service and downstream consumers ✓
$addToSet — idempotent array update
MongoDB's $addToSet adds a value to an array only if it is not already present — it is a set operation, not a list append. If the stream delivers the same order event twice, the second $addToSet finds order_id already in the array and does nothing. Compare with $push, which would add a duplicate on every delivery, leaving the user with two identical entries in their order history.
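The difference between the two operators under repeated delivery can be shown with plain Python lists. The helper functions `push` and `add_to_set` are hypothetical stand-ins that mimic the operators' semantics; they do not talk to MongoDB.

```python
def push(array: list, value) -> None:
    # Mimics $push: always appends, even if the value is already present.
    array.append(value)


def add_to_set(array: list, value) -> None:
    # Mimics $addToSet: appends only when the value is not already in the array.
    if value not in array:
        array.append(value)


pushed, deduped = [], []
# The stream delivers order "abc-123" twice, then a different order once.
for delivered_order_id in ["abc-123", "abc-123", "def-456"]:
    push(pushed, delivered_order_id)
    add_to_set(deduped, delivered_order_id)

print(pushed)   # ['abc-123', 'abc-123', 'def-456']
print(deduped)  # ['abc-123', 'def-456']
```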
MessageDeduplicationId = order_id
SQS FIFO queues deduplicate messages with the same MessageDeduplicationId within a 5-minute window. Using order_id as the deduplication ID means that if the Lambda retries and calls send_message again with the same order ID inside that window, SQS accepts the call but does not enqueue a second copy, so the fulfilment service is not sent the order twice. A retry that arrives after the window has closed is treated as a new message, so the fulfilment consumer should still be idempotent on orderId. Standard SQS queues offer no deduplication at all, which is why a FIFO queue is used for financial operations here.
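The deduplication window can be modelled with a small class that remembers when each ID was first accepted. This is a simplified sketch, assuming a fixed 300-second window measured from the first accepted send; `FifoDedupWindow` is a hypothetical name and the class does not call SQS.

```python
class FifoDedupWindow:
    """Toy model of FIFO deduplication keyed by MessageDeduplicationId."""

    WINDOW_SECONDS = 300  # 5 minutes

    def __init__(self):
        self._first_accepted = {}  # dedup_id -> time the message was enqueued

    def send(self, dedup_id: str, now: float) -> bool:
        """Return True if the message is enqueued, False if it is dropped as a duplicate."""
        first = self._first_accepted.get(dedup_id)
        if first is not None and now - first < self.WINDOW_SECONDS:
            return False
        self._first_accepted[dedup_id] = now
        return True


queue = FifoDedupWindow()
deliveries = [
    queue.send("abc-123", now=0.0),    # first send: enqueued
    queue.send("abc-123", now=30.0),   # Lambda retry inside the window: dropped
    queue.send("abc-123", now=301.0),  # very late retry: window expired, enqueued again
]
print(deliveries)  # [True, False, True]
```

The third call shows why deduplication at the queue is a safety net rather than a full guarantee: a consumer that checks whether it has already fulfilled a given order ID covers the late-retry case.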
Streams decouple the Order Service from all consumers
The Order Service writes to its DynamoDB table and returns. It has no knowledge of MongoDB, Neptune, or SQS. If the recommendation service goes down for 4 hours, orders still succeed — the stream records the changes and the fanout Lambda processes them when Neptune recovers. Adding a new downstream consumer (an analytics pipeline, a customer email trigger) requires no changes to the Order Service — just a new Lambda event source mapping on the same stream.
Step 6 — Production Readiness Checklist
A system that works in development becomes production-ready when it can handle failure gracefully, recover from data loss, and give operators visibility into what is happening. Here is the minimum bar for ShopStream before it goes live.
MongoDB: bindIp locked to private subnet, authorization: enabled, requireTLS, least-privilege RBAC per service. DynamoDB: IAM roles per Lambda with resource-scoped policies. Redis: AUTH token + VPC-only access. All secrets in AWS Secrets Manager with automatic rotation.
MongoDB: mongodump to S3 with --oplog, 7-day retention. DynamoDB: PITR enabled on all tables (35-day window). Cassandra: weekly nodetool snapshot to S3. Monthly restore drill: timed and documented, not theoretical.
[...] for: duration gates. Dashboard in Grafana with one-click drill-down to slow query profiler.
DynamoDB: attribute_not_exists conditions on all put_item calls. MongoDB: $addToSet for array updates. SQS: FIFO queues with MessageDeduplicationId. Stream processors: all downstream operations designed as upserts or set operations, never plain appends.
Teacher's Note
You have just built a system that most engineering teams take months to design and several painful incidents to get right. The pattern that runs through every step — user profiles in MongoDB, events in Cassandra, orders in DynamoDB, recommendations in Neptune, sessions in Redis — is the same pattern: match the database to the access pattern, not the access pattern to the database you already know. Every decision in this project can be justified by a specific technical requirement. That is what senior engineering looks like. Not knowing every database API — knowing why you choose one over another, what you are trading off, and what you will do when the inevitable failure happens. Congratulations on completing the course. Now go build something.
Practice Questions — You're the Engineer
Scenario:
TransactWriteItems call includes a ConditionExpression: "stock > :zero" on the Inventory table update. The first buyer's transaction succeeds and sets stock to 0. The remaining 499 buyers' transactions attempt to execute — but the inventory condition check fails because stock is now 0. What exception does DynamoDB raise for the 499 failed transactions, and what does your Python except block catch to return a clean "Out of Stock" message to those users?
Scenario:
orderHistory array in MongoDB a second time. You need the MongoDB array update operator that adds a value to an array only if it does not already exist — making the operation safe to run multiple times with the same input. Which MongoDB update operator should you use?
Scenario:
session_events table has grown to 520 billion rows and the cluster is running out of disk space. A data retention policy states that event data older than 90 days is no longer needed for analytics or compliance. Your engineering manager asks you to implement automatic row expiry so that rows are deleted 90 days after insertion without any external cleanup job, cron task, or application-level deletion logic. What Cassandra table property did you include in the original CREATE TABLE statement to implement this automatic row expiry?
Course Complete
You've finished the NoSQL Course
From CAP Theorem to cloud-native data platforms — 40 lessons, 160 scenario-based questions, and the full engineering intuition for choosing, designing, operating, and scaling NoSQL systems in production.