Data Science Lesson 54 – Apache Airflow | Dataplexa
Workflow Orchestration · Lesson 54

Apache Airflow

Build automated data pipelines that run on schedule, handle failures gracefully, and monitor complex workflows across your entire data stack.

Why Airflow Matters

Picture this: Flipkart needs to update product recommendations every morning at 6 AM. Extract sales data from 15 databases. Clean it. Run machine learning models. Push results to Redis. Send alerts if anything fails. Do this every single day, forever.

Manual execution? Forget it. Cron jobs? They break silently and you find out three weeks later when revenue drops. That's where Apache Airflow comes in — it orchestrates complex data workflows with monitoring, retry logic, and dependency management built right in.

1. Define Dependencies
2. Schedule Execution
3. Monitor & Retry
4. Alert on Failures

Core Concepts

Airflow thinks in terms of DAGs (Directed Acyclic Graphs). Think of it as a flowchart where each box is a task and arrows show dependencies. Task A must finish before Task B starts. No circular dependencies allowed.

DAG: The workflow definition. It contains all tasks and their relationships.

Task: A single unit of work, such as a SQL query, a Python function, or an API call.

Operator: A template for creating tasks (BashOperator, PythonOperator, etc.).

Scheduler: The engine that decides when to run tasks, based on dependencies and the schedule.

Building Your First DAG

The scenario: Zomato's data team needs to process restaurant reviews daily. Extract from database, clean text, run sentiment analysis, update restaurant scores. Three tasks, must run in order.

# Import required Airflow modules
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# Set default arguments for all tasks in this DAG
default_args = {
    'owner': 'zomato-data-team',  # Who owns this workflow
    'depends_on_past': False,      # Don't wait for previous runs
    'retries': 2,                  # Retry failed tasks twice
    'retry_delay': timedelta(minutes=5)  # Wait 5 min between retries
}

What just happened?

We imported core Airflow classes and set up default_args that apply to every task. The 'retries': 2 setting means that if a task fails, Airflow automatically retries it up to two more times before marking it as failed. Try this: Change retry_delay to see how it affects scheduling.

# Create the DAG object - this defines our workflow
dag = DAG(
    'restaurant_review_pipeline',    # Unique name for this workflow
    default_args=default_args,       # Use the retry settings we defined
    description='Daily review processing',  # Human-readable description
    schedule_interval='0 6 * * *',   # Run at 6 AM every day (cron format)
    start_date=datetime(2024, 1, 1), # When this DAG becomes active
    catchup=False                    # Don't run historical dates
)

What just happened?

The DAG is now registered with Airflow's scheduler. schedule_interval='0 6 * * *' uses cron syntax - minute hour day month weekday. catchup=False prevents Airflow from running missed historical dates when you first deploy. Try this: Change to '@hourly' for simpler scheduling.
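If raw cron strings feel cryptic, Airflow also accepts named presets. A quick reference is below - the hourly DAG is a hypothetical variant, just to show the syntax:

# Common schedule presets and their cron equivalents
# '@hourly'  -> '0 * * * *'   (top of every hour)
# '@daily'   -> '0 0 * * *'   (midnight every day)
# '@weekly'  -> '0 0 * * 0'   (midnight every Sunday)

hourly_dag = DAG(
    'restaurant_review_pipeline_hourly',  # Hypothetical hourly variant
    default_args=default_args,
    schedule_interval='@hourly',          # Preset instead of raw cron
    start_date=datetime(2024, 1, 1),
    catchup=False
)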

# Define the actual work each task will do
def extract_reviews():
    """Extract yesterday's reviews from database"""
    import pandas as pd
    # Simulate database connection and data extraction
    reviews_df = pd.read_csv('restaurant_reviews.csv')
    print(f"Extracted {len(reviews_df)} reviews")
    return len(reviews_df)  # Return count for next task

What just happened?

We defined a Python function that will become an Airflow task. The return len(reviews_df) passes data to downstream tasks through Airflow's XCom system. Functions should be idempotent - running twice with same inputs produces same results. Try this: Add error handling with try/except blocks.
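Taking that "Try this" seriously, here's one sketch of what the error handling could look like - the failure messages and re-raise strategy are one reasonable choice, not the only one:

# A defensive version of the extract function - same shape, explicit failures
def extract_reviews_safe():
    """Extract reviews, failing loudly so Airflow's retry logic can kick in"""
    import pandas as pd
    try:
        reviews_df = pd.read_csv('restaurant_reviews.csv')
    except FileNotFoundError:
        # Clear message beats a raw traceback in the Airflow logs
        raise RuntimeError("restaurant_reviews.csv missing - did the export run?")
    if reviews_df.empty:
        raise ValueError("Review file exists but contains no rows")
    print(f"Extracted {len(reviews_df)} reviews")
    return len(reviews_df)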

# Create tasks using operators - these define what runs when
extract_task = PythonOperator(
    task_id='extract_reviews',       # Unique identifier within DAG
    python_callable=extract_reviews, # Function to execute
    dag=dag                          # Which DAG this belongs to
)

clean_task = PythonOperator(
    task_id='clean_text_data',      # Second task in our pipeline
    python_callable=lambda: print("Cleaned 1,247 reviews"),
    dag=dag
)

What just happened?

We converted Python functions into Airflow tasks using PythonOperator. The task_id appears in the web UI and logs. Lambda functions work for simple tasks, but separate functions are better for complex logic. Try this: Use BashOperator for shell commands instead.
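For reference, a shell-based version of the cleaning step might look like this - the script path is made up for illustration:

from airflow.operators.bash import BashOperator

# Hypothetical shell-based alternative to clean_task
clean_task_bash = BashOperator(
    task_id='clean_text_data_bash',
    bash_command='python /opt/scripts/clean_reviews.py',  # Any shell command works
    dag=dag
)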

# Define task dependencies - the order tasks must run
extract_task >> clean_task  # Extract must finish before cleaning starts

# Alternative syntax for complex dependencies
# extract_task.set_downstream(clean_task)

# View the dependency chain
print("Pipeline: extract_reviews -> clean_text_data")
print("Dependencies configured successfully")

What just happened?

The >> operator creates dependencies between tasks. Think of it as "then" - extract reviews THEN clean them. Airflow validates that no circular dependencies exist. For parallel tasks, skip the dependency: both would run simultaneously. Try this: Add a third task and use [task1, task2] >> task3 for fan-in patterns.
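A sketch of that fan-in pattern, using hypothetical language-detection and scoring tasks alongside the existing ones:

# Hypothetical parallel task - language detection runs alongside cleaning
language_task = PythonOperator(
    task_id='detect_language',
    python_callable=lambda: print("Detected review languages"),
    dag=dag
)

# Hypothetical final task that needs both upstream results
score_task = PythonOperator(
    task_id='update_restaurant_scores',
    python_callable=lambda: print("Updated restaurant scores"),
    dag=dag
)

extract_task >> language_task              # Runs in parallel with cleaning
[clean_task, language_task] >> score_task  # Fan-in: both must finish first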

Monitoring and Execution

Real production metrics from Swiggy's recommendation pipeline

The green line shows task success rates hovering around 98% - excellent for production systems. Notice Thursday's dip to 96.5%? That's when the payment service API had issues and upstream data was delayed. Good monitoring catches these patterns before they become business problems.

Red line tracks execution time. The spike on Thursday correlates with the success rate drop - tasks were retrying due to timeouts. Weekend performance improves because there's less concurrent load on shared databases. This data helps you optimize DAG scheduling and resource allocation.

Task execution breakdown across all active DAGs

Green dominates with 1,247 successful tasks - that's a healthy 95.7% success rate. The 28 failed tasks (red) need investigation. But here's the key insight: 15 tasks failed due to upstream dependencies, not their own code. One broken data source cascaded failures downstream.

Orange shows 12 tasks actively retrying - Airflow's retry logic in action. Most will recover automatically. Purple "upstream failed" tasks are waiting - they'll run once their dependencies get fixed. This visualization helps prioritize which failures to investigate first.

📊 Data Insight

Paytm's data team reduced pipeline failures by 73% after implementing proper task dependencies and SLA monitoring. The key was breaking monolithic jobs into smaller, retryable tasks with clear success criteria.

Common Pitfalls

The "Everything in One Task" Mistake

New teams create a single massive task that takes 2 hours to run. When it fails 90 minutes in, you lose all progress. Fix: Break into 5-10 minute tasks with clear success criteria. Use Airflow's XCom to pass small data between tasks, and external storage (S3, database) for large datasets.
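As a concrete sketch of that XCom hand-off, a downstream function can pull the count returned by extract_reviews - in Airflow 2.x, the ti (task instance) argument is injected automatically when the task runs:

# Downstream task pulls the small value extract_reviews returned via XCom;
# large datasets should go to S3 or a database instead
def clean_text_data(ti):
    """Report progress using the row count from the upstream task"""
    review_count = ti.xcom_pull(task_ids='extract_reviews')
    print(f"Cleaning {review_count} reviews")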

Here's what happens in production: BigBasket tried to process their entire product catalog in one go. Extract 2 million products, clean descriptions, generate embeddings, update search index. Single task, single point of failure.

The embedding generation step failed after 87 minutes due to a memory leak in their ML library. Total progress lost. Start over from scratch every single time. Their data team spent three weeks debugging what should have been a 10-minute fix.

Red bars show wasted time when monolithic tasks fail mid-execution

Red bars tell the painful story - when the monolithic approach fails at minute 45, you lose everything and restart from zero. Green bars show the same work broken into logical tasks. If "Generate Embeddings" fails, you've already saved progress from extraction and cleaning steps. Resume from exactly where you left off.

The modular approach can also run faster end-to-end because independent tasks can be parallelized - extract and clean can run simultaneously if they use different data sources. In this case total pipeline time only dropped from 45+ minutes to about 44, but the real win is reliability and debuggability.

Pro tip: Each task should complete a single, testable unit of work. If you can't easily verify success with a simple query or file check, the task is probably too complex. Aim for tasks that finish in under 15 minutes - anything longer becomes hard to debug and retry.
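Applied to the BigBasket example, the decomposition might look like this - the task bodies are placeholders, not their actual code, and a DAG object and PythonOperator import are assumed as in the earlier examples:

# Illustrative decomposition of a monolithic catalog job into small tasks
extract_products = PythonOperator(
    task_id='extract_products',      # Dump catalog rows to external storage
    python_callable=lambda: print("Extracted 2M products"),
    dag=dag
)

clean_descriptions = PythonOperator(
    task_id='clean_descriptions',    # Normalize and deduplicate text
    python_callable=lambda: print("Cleaned descriptions"),
    dag=dag
)

generate_embeddings = PythonOperator(
    task_id='generate_embeddings',   # The step that previously failed at minute 87
    python_callable=lambda: print("Generated embeddings"),
    dag=dag
)

update_search_index = PythonOperator(
    task_id='update_search_index',   # Push results to the search cluster
    python_callable=lambda: print("Updated search index"),
    dag=dag
)

# Each arrow is a checkpoint - a failure resumes from the failed task only
extract_products >> clean_descriptions >> generate_embeddings >> update_search_index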

Production Deployment

The scenario: OYO's revenue team needs their daily booking analytics running in production. Zero downtime allowed - bookings worth ₹50 crores flow through the system every day. Here's how professional deployments handle this challenge.

# Production DAG configuration - bulletproof settings
from airflow.models import Variable
from airflow.hooks.base import BaseHook
from datetime import timedelta  # Needed for the SLA setting below

# Get database connection from Airflow's secure connection store
db_conn = BaseHook.get_connection('oyo_production_db')
slack_webhook = Variable.get("slack_alerts_webhook")  # Secure variable storage

# Production-grade error handling
default_args = {
    'owner': 'oyo-revenue-team',
    'email_on_failure': True,           # Email when tasks fail
    'email_on_retry': False,            # Don't spam on retries
    'sla': timedelta(hours=2)          # Alert if taking longer than 2 hours
}

What just happened?

We configured production-grade monitoring with BaseHook.get_connection() for secure database credentials and Variable.get() for sensitive config. The sla parameter creates automatic alerts if tasks run longer than expected. Try this: Add an on_success_callback to notify stakeholders when critical pipelines finish.
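Notice that slack_webhook was fetched above but not yet used. One common pattern - a sketch, assuming the requests library is installed in the Airflow environment - is an on_failure_callback added to default_args:

import requests  # Assumed available in the Airflow environment

def notify_slack_on_failure(context):
    """Post a short failure summary to the Slack webhook fetched above"""
    task_id = context['task_instance'].task_id
    dag_id = context['dag'].dag_id
    requests.post(slack_webhook, json={
        'text': f"Airflow task {task_id} in DAG {dag_id} failed"
    })

# Every task inheriting default_args now alerts Slack on failure
default_args['on_failure_callback'] = notify_slack_on_failure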

# Health checks and data quality validation
def validate_booking_data(**context):
    """Ensure data quality before processing downstream tasks"""
    import pandas as pd
    
    # Load today's bookings
    df = pd.read_csv('/data/daily_bookings.csv')
    
    # Critical validations - fail fast if data is corrupt
    assert len(df) > 1000, f"Too few bookings: {len(df)}"  
    assert df['revenue'].sum() > 100000, "Revenue suspiciously low"
    
    return {"row_count": len(df), "total_revenue": df['revenue'].sum()}

What just happened?

Data validation prevents garbage-in-garbage-out scenarios. The assert statements fail the entire DAG if data quality is poor - better to stop early than produce wrong analytics. **context provides access to execution date, DAG run info, and other Airflow metadata. Try this: Add schema validation with pandas dtypes.
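To wire the validator into a DAG, and to sketch the dtype idea from the "Try this" - the revenue column comes from the example above; the DAG object is assumed to be built like the earlier ones:

# Attach the validator as the first task of a production DAG
validate_task = PythonOperator(
    task_id='validate_booking_data',
    python_callable=validate_booking_data,  # The function defined above
    dag=dag
)

# Sketch of a dtype-based schema check to add inside validate_booking_data:
# assert df['revenue'].dtype == 'float64', "revenue column has wrong dtype"
# assert df['revenue'].notna().all(), "revenue column contains nulls"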

Quiz

1. Your e-commerce analytics pipeline takes 3 hours to run and fails frequently. When it fails after 2.5 hours, you lose all progress. What's the best Airflow solution?


2. Which schedule_interval setting will run a DAG every day at 6:00 AM?


3. Your production DAG processes financial data, but sometimes receives corrupted files that produce incorrect reports. How should you handle this with Airflow best practices?


Up Next

Batch vs Streaming

Now that you can orchestrate complex workflows with Airflow, learn when to process data in scheduled batches versus real-time streams for optimal performance and cost.