Data Engineering · Lesson 53

Data Ingestion

Master the art of bringing data from various sources into your analytics pipeline with Python libraries and practical techniques.

1. Extract data from source
2. Validate and clean data format
3. Load into target system
4. Monitor ingestion pipeline

Understanding Data Sources

Data ingestion works like a sophisticated delivery system. Companies like Swiggy handle orders from mobile apps, restaurant dashboards, payment gateways, and GPS trackers. Each source speaks a different language — JSON from APIs, CSV from exports, streaming events from IoT devices.

The challenge? Every source has its own rhythm and format. Your payment system might send real-time transactions while your inventory database dumps files hourly. Data ingestion bridges these gaps systematically.

Batch Ingestion

Process large volumes at scheduled intervals. Perfect for historical reports.

Stream Ingestion

Handle real-time data flows. Essential for fraud detection, live dashboards.

API Ingestion

Pull data on-demand from web services. Great for enriching existing datasets.

File Ingestion

Import from various file formats. Most common starting point for analytics.

Reading Different File Formats

The scenario: You're a data analyst at Myntra and need to combine customer data from three different systems — CSV exports, JSON API responses, and Excel reports from the marketing team.

# Import essential libraries for data ingestion
import pandas as pd
import json
import requests
from pathlib import Path

# Create a Path object for clean file handling
data_path = Path('data')
print(f"Working directory: {data_path}")

What just happened?

We imported pandas for data manipulation, json for JSON handling, and pathlib for clean file path management. Try this: Always use pathlib instead of string concatenation for file paths.
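The pathlib tip is easy to demonstrate — Path objects join segments with the / operator, so you never hand-write separators. A quick sketch (the file names here are hypothetical):

```python
from pathlib import Path

# The / operator joins path segments portably, with no manual separators
data_path = Path("data")
csv_file = data_path / "exports" / "orders.csv"

print(csv_file.name)    # orders.csv
print(csv_file.suffix)  # .csv
```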

Now let's read our main CSV file that contains e-commerce transaction data:

# Read CSV file and let pandas infer the data types
df_orders = pd.read_csv('dataplexa_ecommerce.csv')

# Display basic information about our dataset
print(f"Dataset shape: {df_orders.shape}")
print(f"Memory usage: {df_orders.memory_usage(deep=True).sum() / 1024**2:.2f} MB")

# Show first few rows to understand structure
df_orders.head(3)

What just happened?

Pandas automatically detected data types and loaded 50,000 rows with 12 columns. Notice order_id 1001 shows a wireless mouse purchase, and revenue 2599.00 matches quantity × unit_price. Try this: Always check .shape and .head() first to understand your data structure.
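Rather than relying on that automatic detection, read_csv can apply types while loading via its dtype and parse_dates parameters. A minimal sketch using an in-memory CSV with made-up column names (swap in your real file and columns):

```python
import io
import pandas as pd

# In-memory stand-in for a real CSV export (column names are hypothetical)
csv_data = io.StringIO(
    "order_id,date,revenue\n"
    "1001,2024-01-15,2599.00\n"
    "1002,2024-01-16,899.00\n"
)

# Declare types up front instead of relying on pandas' inference
df = pd.read_csv(
    csv_data,
    dtype={"order_id": "int32", "revenue": "float64"},
    parse_dates=["date"],
)

print(df.dtypes)
```

Declaring types at read time catches format drift early: if a source starts sending non-numeric values in a typed column, the load fails loudly instead of silently producing an object column.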

Handling JSON Data

JSON comes from APIs and modern applications. The tricky part? Nested structures that don't fit naturally into rows and columns. Here's how to flatten complex JSON into DataFrames:

# Simulate JSON data from customer API
customer_json = '''
{
  "customers": [
    {"id": 1001, "profile": {"age": 29, "city": "Mumbai"}, "preferences": ["Electronics"]},
    {"id": 1002, "profile": {"age": 34, "city": "Delhi"}, "preferences": ["Clothing", "Books"]}
  ]
}
'''

# Parse JSON string into Python dictionary
data = json.loads(customer_json)
print("JSON parsed successfully")
print(f"Number of customers: {len(data['customers'])}")

# Normalize nested JSON into flat DataFrame
from pandas import json_normalize

# Flatten the nested structure
df_customers = json_normalize(data['customers'])
print("Columns after normalization:")
print(df_customers.columns.tolist())

# Display the flattened data
df_customers

What just happened?

The json_normalize() function flattened nested objects using dot notation — profile.age and profile.city. Lists like preferences stay as arrays. Try this: Always check column names after normalization to understand the flattening pattern.
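Since lists like preferences survive normalization as arrays, a common follow-up is DataFrame.explode(), which gives each list element its own row. A sketch using the same shape of data as above:

```python
import pandas as pd

# Mirrors the flattened output above (same hypothetical customers)
df_customers = pd.DataFrame({
    "id": [1001, 1002],
    "profile.city": ["Mumbai", "Delhi"],
    "preferences": [["Electronics"], ["Clothing", "Books"]],
})

# explode() repeats the other columns once per list element
df_long = df_customers.explode("preferences")
print(df_long)
```

The long format is usually what you want for grouping or joining on individual preferences.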

Data Quality Monitoring

Real-world data ingestion fails far more often than you'd expect. Network issues, format changes, missing files — you need monitoring that catches problems before they break your pipeline.

# Create data quality checker function
def validate_ingested_data(df, expected_columns, min_rows=100):
    """Check if ingested data meets quality standards"""
    issues = []

    # Check column presence
    missing_cols = set(expected_columns) - set(df.columns)
    if missing_cols:
        issues.append(f"Missing columns: {missing_cols}")

    # Check minimum row count
    if len(df) < min_rows:
        issues.append(f"Too few rows: {len(df)} (expected at least {min_rows})")

    return issues

# Test our data quality function
required_cols = ['order_id', 'date', 'customer_age', 'revenue']

# Run validation check
issues = validate_ingested_data(df_orders, required_cols, min_rows=1000)

# Report results
if issues:
    print("⚠️  Data quality issues found:")
    for issue in issues:
        print(f"  - {issue}")
else:
    print("✅ Data quality check passed")

📊 Data Insight

Our dataset passed all quality checks with 50,000 rows and all required columns present. Revenue values range from ₹899 to ₹25,970, indicating healthy transaction diversity across product categories.

Electronics generates highest data volume due to detailed product specifications and user reviews

Electronics data dominates ingestion pipelines because each product carries extensive metadata — specifications, reviews, images, compatibility matrices. This 2.4 GB daily volume requires robust infrastructure and careful scheduling to avoid overwhelming downstream systems.

The volume distribution tells a business story. Electronics and Clothing drive 65% of data ingestion but only represent 40% of transactions. This suggests complex products need richer data to support customer decisions.

Handling Ingestion Errors

The scenario: Your Zepto data pipeline just failed at 3 AM because a CSV file had unexpected characters. You need bulletproof error handling that keeps the system running.

# Create robust file reader with error handling
def safe_read_csv(filepath, fallback_encoding='latin1'):
    """Read CSV with multiple encoding attempts"""
    try:
        # Try UTF-8 first (most common)
        df = pd.read_csv(filepath, encoding='utf-8')
        print(f"✅ Read {filepath} with UTF-8 encoding")
        return df
    except UnicodeDecodeError:
        print(f"⚠️  UTF-8 failed, trying {fallback_encoding}")
        # Fall back to a more permissive encoding
        return pd.read_csv(filepath, encoding=fallback_encoding)

# Add retry mechanism for network-based ingestion
import time

def ingest_with_retry(url, max_attempts=3, delay=2):
    """Fetch data with exponential backoff retry"""
    for attempt in range(max_attempts):
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()  # Raise exception for HTTP errors
            return response.json()
        except requests.RequestException as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt < max_attempts - 1:
                time.sleep(delay * (2 ** attempt))  # Exponential backoff
    return None

What just happened?

We built a retry mechanism with exponential backoff — with the defaults, the delay doubles from 2s to 4s between attempts, and would keep doubling with more attempts. The raise_for_status() method raises an exception for HTTP 4xx and 5xx responses. Try this: Always log failed attempts so you can tune retry parameters based on real failure patterns.

Common Mistake: Infinite Retries

Never retry indefinitely without limits. Set max_attempts=3 and implement circuit breakers. A broken API can consume your entire compute budget in minutes through endless retry loops.
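A circuit breaker can be as simple as a failure counter with a cooldown. This is a minimal sketch under that assumption, not a production implementation — dedicated libraries offer hardened versions with more states and instrumentation:

```python
import time

class CircuitBreaker:
    """Minimal sketch: stop calling a failing source after repeated errors."""

    def __init__(self, max_failures=3, reset_after=60):
        self.max_failures = max_failures  # failures before the circuit opens
        self.reset_after = reset_after    # cooldown in seconds before retrying
        self.failures = 0
        self.opened_at = None

    def allow_request(self):
        if self.opened_at is None:
            return True
        # Half-open: permit a probe request once the cooldown has elapsed
        return time.time() - self.opened_at >= self.reset_after

    def record_success(self):
        self.failures = 0
        self.opened_at = None

    def record_failure(self):
        self.failures += 1
        if self.failures >= self.max_failures:
            self.opened_at = time.time()
```

Wrap each fetch: call allow_request() before hitting the API, then record_success() or record_failure() afterwards. Once the circuit opens, requests are skipped until the cooldown elapses, so a broken API stops consuming your retry budget.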

Peak failures occur at 4 AM during maintenance windows and API rate limit resets

The 4 AM failure spike reveals a critical insight — many third-party APIs reset rate limits and perform maintenance during these hours. Your ingestion schedule should account for this by implementing delayed retries or alternative data sources during high-risk periods.

Success rates stay above 90% throughout the day, but those failed ingestions represent real business impact. Each failed customer data sync could mean missing personalization opportunities worth ₹2,000-5,000 in revenue.

Performance Optimization

Large datasets kill ingestion pipelines. A 500 MB CSV can take minutes to load with default pandas settings, but with chunking and explicit data types you can often cut load time and memory use dramatically.
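Chunking keeps only a slice of the file in memory at a time — read_csv returns an iterator when you pass chunksize. A sketch using an in-memory stand-in for a large file:

```python
import io
import pandas as pd

# Stand-in for a large CSV on disk; in practice pass the file path
big_csv = io.StringIO("revenue\n" + "\n".join(str(100 + i) for i in range(10)))

total = 0.0
rows = 0
# With chunksize set, read_csv yields DataFrames of at most 4 rows each
for chunk in pd.read_csv(big_csv, chunksize=4):
    total += chunk["revenue"].sum()
    rows += len(chunk)

print(f"Processed {rows} rows, total revenue {total:.2f}")
```

Because each chunk is a normal DataFrame, any aggregation that can be computed incrementally (sums, counts, filters written back out) works without ever holding the full file in memory.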

# Optimize data types for memory efficiency
optimized_dtypes = {
    'order_id': 'int32',           # Smaller int type
    'customer_age': 'int8',        # Age fits in 8 bits
    'quantity': 'int8',            # Quantity 1-10 fits in 8 bits
    'unit_price': 'float32',       # Single precision for prices
    'rating': 'float32',           # Single precision for ratings
    'returned': 'bool'             # Boolean for true/false
}

print("Memory optimization mapping:")
for col, dtype in optimized_dtypes.items():
    print(f"  {col}: {dtype}")
# Compare memory usage before and after optimization
original_memory = df_orders.memory_usage(deep=True).sum() / 1024**2

# Apply optimized data types
df_optimized = df_orders.astype(optimized_dtypes)
optimized_memory = df_optimized.memory_usage(deep=True).sum() / 1024**2

print(f"Original memory: {original_memory:.2f} MB")
print(f"Optimized memory: {optimized_memory:.2f} MB")
print(f"Memory reduction: {((original_memory - optimized_memory) / original_memory * 100):.1f}%")