TechSetupGuides
Intermediate · python · workflow · orchestration · etl · data-engineering · scheduler · dag · data-pipelines · mlops · automation

Apache Airflow: Workflow orchestration platform

Platform to programmatically author, schedule, and monitor workflows. Build complex data pipelines, ETL/ELT processes, and machine learning workflows with Python code.

  1. Step 1

    What is Apache Airflow?

    Apache Airflow is an open-source platform for programmatically authoring, scheduling, and monitoring workflows. It was originally developed at Airbnb in 2014, open-sourced in 2015, and became an Apache Software Foundation top-level project in 2019. With tens of thousands of GitHub stars and a large contributor base, Airflow is one of the most widely used workflow orchestration tools.

    Airflow allows you to define workflows as Directed Acyclic Graphs (DAGs) using Python code. Each workflow is a collection of tasks with dependencies, schedules, and retry logic. The rich UI provides visibility into pipeline execution, logs, and debugging tools. Airflow is designed for batch-oriented workflows and is particularly strong in data engineering, ETL/ELT processes, machine learning pipelines, and DevOps automation.

  2. Step 2

    Technology stack

    Apache Airflow is built on a modular Python architecture with pluggable components:

    Core platform:

    • Python 3.8-3.12 (Airflow 2.10.x)
    • SQLAlchemy for database abstraction
    • Flask for web server and API
    • Gunicorn as the production web server
    • Celery or Kubernetes for distributed task execution
    • Redis or RabbitMQ for message broker (Celery executor)

    Database support:

    • PostgreSQL (recommended for production)
    • MySQL 8.0+ (MariaDB is not officially supported)
    • SQLite (development only)

    Web UI:

    • Flask-AppBuilder for the admin interface
    • React components for interactive visualizations
    • D3.js for DAG graph rendering

    Task execution:

    • LocalExecutor (single machine)
    • CeleryExecutor (distributed workers)
    • KubernetesExecutor (dynamic pod creation)
    • Sequential/Debug executors (testing)

    Integrations:

    • Dozens of official provider packages for AWS, GCP, Azure, Databricks, Snowflake, dbt Cloud, Spark, Kubernetes, Docker, and more
    • Extensive operator library for common tasks
    • Custom operators and hooks for any API

    Airflow follows a modular provider pattern where integrations are installed separately from core, allowing minimal footprint deployments.

    Core:
    ├── Python 3.8-3.12
    ├── SQLAlchemy (ORM)
    ├── Flask (web + API)
    └── Gunicorn (web server)
    
    Executors:
    ├── LocalExecutor
    ├── CeleryExecutor (+ Redis/RabbitMQ)
    ├── KubernetesExecutor
    └── Sequential/Debug
    
    Database:
    ├── PostgreSQL (recommended)
    ├── MySQL 8.0+
    └── SQLite (dev only)
    
    UI:
    ├── Flask-AppBuilder
    ├── React
    └── D3.js (DAG graphs)
    
    Providers:
    ├── AWS, GCP, Azure (cloud)
    ├── Databricks, Snowflake (data)
    ├── Spark, dbt (processing)
    └── Docker, Kubernetes (containers)
  3. Step 3

    Core concepts

    Understanding Airflow's architecture is key to building robust workflows:

    DAG (Directed Acyclic Graph): A collection of tasks with dependencies. DAGs are defined in Python files placed in the dags/ folder. Airflow scans this folder periodically to discover new or modified DAGs.

    Task: A single unit of work. Tasks are instances of operators (e.g., PythonOperator, BashOperator, SQLExecuteQueryOperator). Tasks can depend on other tasks, creating a workflow graph.

    Operator: A template for a task. Operators define what work to perform. Common operators include PythonOperator (run Python functions), BashOperator (run shell commands), EmailOperator (send emails), and provider-specific operators (S3ToRedshiftOperator, BigQueryInsertJobOperator, etc.).

    Sensor: A special operator that waits for a condition to be met (e.g., file exists, API returns success). Sensors run periodically until success or timeout.

    Hook: A high-level interface to external systems. Hooks handle authentication, connection pooling, and API calls. Operators use hooks internally.

    Executor: Determines how and where tasks run. LocalExecutor runs tasks in parallel on a single machine. CeleryExecutor distributes tasks across multiple worker nodes. KubernetesExecutor creates a new Kubernetes pod for each task.

    Scheduler: Core component that reads DAGs, determines task dependencies, and queues tasks for execution based on schedules and dependencies.

    Webserver: Flask application that serves the UI for monitoring, triggering, and debugging workflows.

    # Example DAG structure
    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from airflow.operators.bash import BashOperator
    from datetime import datetime
    
    with DAG(
        dag_id='example_dag',
        start_date=datetime(2024, 1, 1),
        schedule='@daily',  # Run once per day
        catchup=False,
    ) as dag:
        
        task_1 = BashOperator(
            task_id='print_date',
            bash_command='date',
        )
        
        task_2 = PythonOperator(
            task_id='process_data',
            python_callable=lambda: print('Processing data'),
        )
        
        task_3 = PythonOperator(
            task_id='send_report',
            python_callable=lambda: print('Sending report'),
        )
        
        # Define dependencies: task_1 -> task_2 -> task_3
        task_1 >> task_2 >> task_3
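    The scheduler uses dependency edges like these to decide when each task may run. The sketch below is only an illustration and does not reflect how the scheduler is implemented. It uses Python's standard-library graphlib (Python 3.9+) to compute a run order that respects the same edges and to reject cycles, which a DAG by definition cannot contain:

```python
from graphlib import CycleError, TopologicalSorter

# Map each task_id to the set of task_ids it depends on (its upstream tasks).
# These edges mirror `task_1 >> task_2 >> task_3` in the example DAG.
dependencies = {
    'print_date': set(),
    'process_data': {'print_date'},
    'send_report': {'process_data'},
}

def execution_order(deps):
    """Return one valid run order, or raise CycleError if the graph has a cycle."""
    return list(TopologicalSorter(deps).static_order())

print(execution_order(dependencies))  # ['print_date', 'process_data', 'send_report']

# A back-edge from send_report to print_date creates a cycle.
cyclic = dict(dependencies, print_date={'send_report'})
try:
    execution_order(cyclic)
except CycleError as err:
    print('cycle detected:', err.args[1])
```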
  4. Step 4

    Prerequisites

    Before installing Apache Airflow, ensure your environment meets these requirements:

    • Python 3.8, 3.9, 3.10, 3.11, or 3.12 installed
    • Sufficient memory (minimum 4GB RAM for small workloads; 8GB+ recommended)
    • Linux, macOS, or Windows (via WSL2)
    • PostgreSQL or MySQL for production deployments (SQLite works for development)
    • Docker (optional, for containerized deployment)

    Airflow is memory-intensive, especially the scheduler and webserver. Plan resources based on the number of DAGs and task concurrency.

    Note for Windows users: Airflow is not officially supported on native Windows. Use WSL2 (Windows Subsystem for Linux 2) or Docker.

    # Check Python version (3.8-3.12 required)
    python --version
    
    # Verify pip is installed
    pip --version
    
    # (Optional) Check Docker for containerized deployment
    docker --version
    
    # Recommended: Create a virtual environment
    python -m venv airflow-venv
    source airflow-venv/bin/activate  # On Windows WSL: same command
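    To check the interpreter from Python itself, you can compare sys.version_info against the 3.8 to 3.12 range listed above for Airflow 2.10.x, as in the following minimal sketch. The bounds are hard-coded assumptions, so re-check them against the Airflow release you install.

```python
import sys

# Supported Python minor versions for Airflow 2.10.x, per the prerequisites above.
# Hard-coded assumption: update these bounds for other Airflow releases.
SUPPORTED_MIN = (3, 8)
SUPPORTED_MAX = (3, 12)

def is_supported(version_info=sys.version_info):
    """Return True if the (major, minor) version is inside the supported range."""
    major_minor = (version_info[0], version_info[1])
    return SUPPORTED_MIN <= major_minor <= SUPPORTED_MAX

if __name__ == '__main__':
    current = f'{sys.version_info.major}.{sys.version_info.minor}'
    status = 'supported' if is_supported() else 'NOT supported'
    print(f'Python {current} is {status} by Airflow 2.10.x')
```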
  5. Step 5

    Quick start installation

    The fastest way to get started with Airflow is using pip with constraints. Airflow has many dependencies, and the constraints file ensures compatible versions.

    Important: Set AIRFLOW_HOME before installing and running any airflow command. This directory stores DAGs, logs, and configuration (airflow.cfg). The default is ~/airflow.

    # Set Airflow home directory
    export AIRFLOW_HOME=~/airflow
    
    # Set Python and Airflow versions
    export AIRFLOW_VERSION=2.10.4
    export PYTHON_VERSION="$(python -c 'import sys; print(f"{sys.version_info.major}.{sys.version_info.minor}")')"
    
    # Build constraints URL
    export CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
    
    # Install Airflow with constraints
    pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
    
    # Initialize the database (creates SQLite DB by default)
    airflow db migrate
    
    # Create an admin user (you will be prompted for a password)
    airflow users create \
        --username admin \
        --firstname Admin \
        --lastname User \
        --role Admin \
        --email admin@example.com
    
    # Start the webserver (default port 8080)
    airflow webserver --port 8080
    
    # In a separate terminal, start the scheduler
    airflow scheduler
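    The constraints URL follows a fixed pattern built from the Airflow version and the Python major.minor version. The sketch below reproduces that pattern in Python, which can be useful in a provisioning script. The helper name constraints_url is hypothetical, and the template is copied from the shell commands above.

```python
import sys

# URL template copied from the CONSTRAINT_URL shell variable above.
CONSTRAINTS_TEMPLATE = (
    'https://raw.githubusercontent.com/apache/airflow/'
    'constraints-{airflow}/constraints-{python}.txt'
)

def constraints_url(airflow_version, python_version=None):
    """Build the constraints URL; defaults to the running interpreter's major.minor."""
    if python_version is None:
        python_version = f'{sys.version_info.major}.{sys.version_info.minor}'
    return CONSTRAINTS_TEMPLATE.format(airflow=airflow_version, python=python_version)

print(constraints_url('2.10.4', '3.11'))
# https://raw.githubusercontent.com/apache/airflow/constraints-2.10.4/constraints-3.11.txt
```

    Pass the result to pip's --constraint flag, the same way the shell example does.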
  6. Step 6

    Docker Compose deployment

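    For local experimentation with a multi-service setup, use the official Docker Compose file. It runs the webserver, scheduler, triggerer, a Celery worker, PostgreSQL, and Redis, and uses CeleryExecutor for distributed task execution. The Airflow documentation states that this file is intended for learning and development, not for production deployments.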

    # Download the official docker-compose.yaml
    curl -LfO 'https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml'
    
    # Create required directories
    mkdir -p ./dags ./logs ./plugins ./config
    
    # Set the Airflow user (prevents permission issues)
    echo -e "AIRFLOW_UID=$(id -u)" > .env
    
    # Initialize the database
    docker compose up airflow-init
    
    # Start all services
    docker compose up
    
    # Access the UI at http://localhost:8080
    # Default credentials: airflow / airflow
    
    # To stop all services
    docker compose down
    
    # To remove volumes (clean slate)
    docker compose down --volumes --remove-orphans
  7. Step 7

    Configuration basics

    Airflow configuration is managed through airflow.cfg (located in $AIRFLOW_HOME) and environment variables. Environment variables take precedence and follow the pattern AIRFLOW__SECTION__KEY.

    Key configuration areas:

    • Executor: Set executor to LocalExecutor, CeleryExecutor, or KubernetesExecutor based on your deployment
    • Database: Configure sql_alchemy_conn for PostgreSQL or MySQL in production
    • Webserver: Set base_url, web_server_port, and authentication settings
    • Scheduler: Configure dag_dir_list_interval (how often to scan for new DAGs)
    • Logging: Set base_log_folder and remote logging backends (S3, GCS, etc.)
    • Security: Configure secret_key, enable authentication, set up RBAC

    For production, always use a robust database (PostgreSQL recommended) and never use SQLite.

    # Example: Configure PostgreSQL database via environment variable
    export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN='postgresql+psycopg2://airflow:password@localhost:5432/airflow'
    
    # Set executor to LocalExecutor for single-machine deployments
    export AIRFLOW__CORE__EXECUTOR='LocalExecutor'
    
    # Increase parallelism for more concurrent tasks
    export AIRFLOW__CORE__PARALLELISM=32
    export AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG=16  # Formerly dag_concurrency
    
    # Enable example DAGs (useful for learning)
    export AIRFLOW__CORE__LOAD_EXAMPLES='True'
    
    # Or edit airflow.cfg directly
    # [core]
    # executor = LocalExecutor
    # [database]
    # sql_alchemy_conn = postgresql+psycopg2://airflow:password@localhost:5432/airflow
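    The AIRFLOW__SECTION__KEY naming rule, and the rule that an environment variable overrides airflow.cfg, can be sketched with the standard library. This is a simplified model, not Airflow's own configuration loader. The real lookup also checks _CMD and _SECRET variants and falls back to built-in defaults.

```python
import configparser
import os

def env_var_name(section, key):
    """Map an airflow.cfg (section, key) pair to its environment variable name."""
    return f'AIRFLOW__{section.upper()}__{key.upper()}'

def get_option(cfg, section, key, environ=os.environ):
    """Simplified lookup: environment variable first, then the parsed airflow.cfg."""
    name = env_var_name(section, key)
    if name in environ:
        return environ[name]
    return cfg.get(section, key, fallback=None)

# Parse a small airflow.cfg fragment (same content as the commented example above).
cfg = configparser.ConfigParser()
cfg.read_string("""
[core]
executor = LocalExecutor
""")

print(env_var_name('database', 'sql_alchemy_conn'))    # AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
print(get_option(cfg, 'core', 'executor', environ={}))  # LocalExecutor
print(get_option(cfg, 'core', 'executor',
                 environ={'AIRFLOW__CORE__EXECUTOR': 'CeleryExecutor'}))  # CeleryExecutor
```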
  8. Step 8

    Creating your first DAG

    DAGs are Python files placed in the dags/ folder. Airflow scans this folder and automatically loads new DAGs. Here's a complete example that demonstrates task dependencies, scheduling, and retries.

    # Save as $AIRFLOW_HOME/dags/my_first_dag.py
    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from airflow.operators.bash import BashOperator
    from datetime import datetime, timedelta
    
    # Default arguments applied to all tasks
    default_args = {
        'owner': 'data-team',
        'depends_on_past': False,
        'email': ['alerts@example.com'],
        'email_on_failure': True,  # Requires the [smtp] section to be configured
        'email_on_retry': False,
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
    }
    
    # Define the DAG
    with DAG(
        dag_id='my_first_dag',
        default_args=default_args,
        description='A simple data pipeline',
        schedule='0 2 * * *',  # Run at 2 AM daily (cron syntax)
        start_date=datetime(2024, 1, 1),
        catchup=False,  # Don't backfill missed runs
        tags=['example', 'data'],
    ) as dag:
        
        def extract_data():
            print('Extracting data from source')
            return {'records': 100}
        
        def transform_data(**context):
            # Access output from previous task
            ti = context['ti']
            data = ti.xcom_pull(task_ids='extract')
            print(f'Transforming {data["records"]} records')
            return {'processed': data['records']}
        
        def load_data(**context):
            ti = context['ti']
            data = ti.xcom_pull(task_ids='transform')
            print(f'Loading {data["processed"]} records to destination')
        
        # Define tasks
        extract = PythonOperator(
            task_id='extract',
            python_callable=extract_data,
        )
        
        transform = PythonOperator(
            task_id='transform',
            python_callable=transform_data,
        )
        
        load = PythonOperator(
            task_id='load',
            python_callable=load_data,
        )
        
        cleanup = BashOperator(
            task_id='cleanup',
            bash_command='echo "Cleaning up temporary files"',
        )
        
        # Set dependencies
        extract >> transform >> load >> cleanup
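    With retries=3, a task gets up to four attempts: the first run plus three retries, with retry_delay between them. The pure-Python sketch below models only that counting. Airflow itself marks the task up_for_retry and lets the scheduler run it again, rather than looping in one process. run_with_retries and flaky_extract are hypothetical names used for illustration.

```python
import time
from datetime import timedelta

def run_with_retries(func, retries=3, retry_delay=timedelta(minutes=5), sleep=time.sleep):
    """Call func up to retries + 1 times, sleeping retry_delay between failed attempts."""
    attempts = retries + 1
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except Exception:
            if attempt == attempts:
                raise  # Out of retries: the task instance would end up failed
            sleep(retry_delay.total_seconds())

# Hypothetical task that fails twice before succeeding
calls = {'count': 0}

def flaky_extract():
    calls['count'] += 1
    if calls['count'] < 3:
        raise ConnectionError('source unavailable')
    return {'records': 100}

# A no-op sleep keeps the demo instant instead of waiting 5 minutes per retry
result = run_with_retries(flaky_extract, sleep=lambda seconds: None)
print(result, 'after', calls['count'], 'attempts')  # {'records': 100} after 3 attempts
```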
  9. Step 9

    Working with providers

    Airflow providers are separate packages that add integrations with external systems. The core Airflow installation is minimal; install only the providers you need.

    Popular providers:

    • apache-airflow-providers-amazon (AWS S3, Redshift, EMR, Glue, etc.)
    • apache-airflow-providers-google (GCP BigQuery, GCS, Dataproc, etc.)
    • apache-airflow-providers-snowflake (Snowflake data warehouse)
    • apache-airflow-providers-databricks (Databricks jobs)
    • apache-airflow-providers-postgres (PostgreSQL operators)
    • apache-airflow-providers-http (HTTP requests)
    • apache-airflow-providers-docker (Docker containers)
    • apache-airflow-providers-cncf-kubernetes (Kubernetes pods)

    Providers include operators, hooks, sensors, and transfers for seamless integration.

    # Install AWS provider
    pip install apache-airflow-providers-amazon
    
    # Install multiple providers
    pip install apache-airflow-providers-google apache-airflow-providers-postgres
    
    # List installed providers
    airflow providers list
    
    # Show installed providers as JSON (e.g., for scripting)
    airflow providers list --output json
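    You can get similar information from Python with the standard library's importlib.metadata, for example in a CI check that confirms required providers are installed. The sketch below only matches installed distributions by name. It does not ask Airflow which providers it has registered, and the helper names are hypothetical.

```python
from importlib.metadata import distributions

PROVIDER_PREFIX = 'apache-airflow-providers-'

def installed_providers():
    """Return {distribution_name: version} for installed Airflow provider packages."""
    found = {}
    for dist in distributions():
        meta = dist.metadata
        name = ((meta.get('Name') if meta is not None else None) or '').lower().replace('_', '-')
        if name.startswith(PROVIDER_PREFIX):
            found[name] = dist.version
    return dict(sorted(found.items()))

def missing_providers(required):
    """Return the required provider names that are not installed."""
    installed = installed_providers()
    return [name for name in required if name not in installed]

print(missing_providers(['apache-airflow-providers-amazon']))
```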
  10. Step 10

    Example: AWS S3 to Redshift ETL

    A common pattern is extracting data from S3, transforming it, and loading into Redshift. This example demonstrates provider operators and task dependencies.

    from airflow import DAG
    from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
    from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
    from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
    from datetime import datetime
    
    with DAG(
        dag_id='s3_to_redshift_etl',
        start_date=datetime(2024, 1, 1),
        schedule='@daily',  # One run per day, matching the {{ ds }}-keyed file below
        catchup=False,
    ) as dag:
        
        # Wait for the day's data file in S3
        wait_for_file = S3KeySensor(
            task_id='wait_for_s3_file',
            bucket_name='my-data-bucket',
            bucket_key='incoming/data_{{ ds }}.csv',  # {{ ds }} = logical date (YYYY-MM-DD)
            aws_conn_id='aws_default',
            timeout=600,        # Fail if the file has not appeared within 10 minutes
            poke_interval=60,   # Check once per minute
            mode='reschedule',  # Release the worker slot between checks
        )
        
        # Copy data from S3 to Redshift staging table
        copy_to_staging = S3ToRedshiftOperator(
            task_id='copy_to_redshift',
            s3_bucket='my-data-bucket',
            s3_key='incoming/data_{{ ds }}.csv',
            schema='staging',
            table='raw_data',
            redshift_conn_id='redshift_default',
            aws_conn_id='aws_default',
            copy_options=['CSV', 'IGNOREHEADER 1'],
        )
        
        # Transform and load into production table.
        # SQLExecuteQueryOperator replaces the deprecated PostgresOperator;
        # deleting the batch first makes reruns idempotent.
        transform_and_load = SQLExecuteQueryOperator(
            task_id='transform_data',
            conn_id='redshift_default',
            sql=[
                "DELETE FROM production.clean_data WHERE batch_date = '{{ ds }}'",
                """
                INSERT INTO production.clean_data
                SELECT 
                    id,
                    UPPER(name) as name,
                    processed_at,
                    '{{ ds }}' as batch_date
                FROM staging.raw_data
                WHERE processed_at >= '{{ ds }}'
                """,
            ],
        )
        
        # Cleanup staging table
        cleanup_staging = SQLExecuteQueryOperator(
            task_id='cleanup_staging',
            conn_id='redshift_default',
            sql='TRUNCATE TABLE staging.raw_data;',
        )
        
        wait_for_file >> copy_to_staging >> transform_and_load >> cleanup_staging
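    Conceptually, a sensor such as S3KeySensor checks its condition every poke_interval seconds and fails once timeout seconds have passed. The standard-library sketch below models that loop with an injectable clock, so you can exercise it without actually waiting. wait_for and SensorTimeout are hypothetical names. Real sensors also support options such as mode='reschedule' and soft_fail.

```python
import time

class SensorTimeout(Exception):
    """Hypothetical error raised when the condition is not met in time."""

def wait_for(condition, timeout=600, poke_interval=60,
             clock=time.monotonic, sleep=time.sleep):
    """Poke condition() every poke_interval seconds until it returns True or timeout passes.

    Returns the number of pokes it took. clock and sleep are injectable for testing.
    """
    started = clock()
    pokes = 0
    while True:
        pokes += 1
        if condition():
            return pokes
        if clock() - started >= timeout:
            raise SensorTimeout(f'condition not met after {timeout}s ({pokes} pokes)')
        sleep(poke_interval)

# Simulated clock: fake_sleep advances time instead of blocking.
now = [0.0]

def fake_clock():
    return now[0]

def fake_sleep(seconds):
    now[0] += seconds

# Hypothetical file that "lands" 180 seconds after the sensor starts
print(wait_for(lambda: now[0] >= 180, clock=fake_clock, sleep=fake_sleep))  # 4
```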
  11. Step 11

    Production best practices

    Running Airflow in production requires careful planning and configuration:

    Database: Use PostgreSQL (recommended) or MySQL. Never use SQLite in production. Configure connection pooling and tune based on workload.

    Executor: Choose based on scale. LocalExecutor works for small deployments on a single machine. CeleryExecutor scales horizontally with multiple workers. KubernetesExecutor provides dynamic scaling and task isolation.

    High availability: Run multiple scheduler instances (Airflow 2.0+) for redundancy. Use a load balancer for the webserver.

    Monitoring: Integrate with StatsD, OpenTelemetry, Prometheus (typically via a StatsD exporter), or Datadog. Monitor scheduler lag, task duration, and failure rates. Set up alerting for task failures.

    Resource limits: Set parallelism, max_active_tasks_per_dag, and max_active_runs_per_dag (or max_active_tis_per_dag on individual tasks) to prevent resource exhaustion. Use pools to limit concurrent execution of specific task types.
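    An Airflow pool behaves like a counting semaphore: a task assigned to a pool needs a free slot before it can run, so at most N such tasks run at once. The threading sketch below models a hypothetical 2-slot pool. In Airflow itself, you would instead create the pool in the UI or with airflow pools set, then pass pool='...' to the operator.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

POOL_SLOTS = 2  # Hypothetical pool size, e.g. a database that tolerates 2 connections
pool_slots = threading.BoundedSemaphore(POOL_SLOTS)

stats_lock = threading.Lock()
stats = {'running': 0, 'peak': 0}

def pooled_task(task_number):
    """Hold one pool slot while doing simulated work, tracking peak concurrency."""
    with pool_slots:  # Blocks until a slot is free
        with stats_lock:
            stats['running'] += 1
            stats['peak'] = max(stats['peak'], stats['running'])
        time.sleep(0.05)  # Simulated work
        with stats_lock:
            stats['running'] -= 1
    return task_number

# Eight tasks are ready at once, but the pool lets at most two run concurrently.
with ThreadPoolExecutor(max_workers=8) as executor:
    results = list(executor.map(pooled_task, range(8)))

print('completed:', results, 'peak concurrency:', stats['peak'])
```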

    DAG best practices:

    • Keep DAG files lightweight (no heavy imports at the top level)
    • Use dynamic task generation sparingly
    • Use a static start_date; avoid dynamic values such as datetime.now()
    • Disable catchup unless backfilling is needed
    • Use XComs sparingly; don't pass large data between tasks
    • Implement idempotent tasks that can safely retry
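    An idempotent task usually overwrites its own slice of output instead of appending to it, so a retry or rerun leaves the same result. One common pattern is to delete and then insert the run's batch date inside a single transaction. The sketch below illustrates it with the standard-library sqlite3 module standing in for a warehouse. The clean_data table and load_batch function are illustrative only.

```python
import sqlite3

def load_batch(conn, batch_date, rows):
    """Replace all rows for batch_date in one transaction, so reruns don't duplicate data."""
    with conn:  # Commits on success, rolls back on error
        conn.execute('DELETE FROM clean_data WHERE batch_date = ?', (batch_date,))
        conn.executemany(
            'INSERT INTO clean_data (id, name, batch_date) VALUES (?, ?, ?)',
            [(row_id, name, batch_date) for row_id, name in rows],
        )

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE clean_data (id INTEGER, name TEXT, batch_date TEXT)')

rows = [(1, 'ALICE'), (2, 'BOB')]
load_batch(conn, '2024-01-01', rows)
load_batch(conn, '2024-01-01', rows)  # Simulated retry of the same run

count = conn.execute('SELECT COUNT(*) FROM clean_data').fetchone()[0]
print(count)  # 2, not 4
```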

    Security: Enable authentication (LDAP, OAuth, or database auth). Configure RBAC for fine-grained permissions. Use Fernet encryption for sensitive data in the database. Store credentials in Airflow Connections or external secret backends (AWS Secrets Manager, HashiCorp Vault, etc.).

    # Generate Fernet key for encrypting credentials
    python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"
    
    # Set in airflow.cfg or environment variable
    export AIRFLOW__CORE__FERNET_KEY='your-generated-fernet-key'
    
    # Configure PostgreSQL for production
    export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN='postgresql+psycopg2://airflow:password@postgres:5432/airflow'
    export AIRFLOW__DATABASE__SQL_ALCHEMY_POOL_SIZE=10
    export AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_OVERFLOW=20
    
    # Use CeleryExecutor with Redis
    export AIRFLOW__CORE__EXECUTOR='CeleryExecutor'
    export AIRFLOW__CELERY__BROKER_URL='redis://redis:6379/0'
    export AIRFLOW__CELERY__RESULT_BACKEND='db+postgresql://airflow:password@postgres:5432/airflow'
    
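    # Airflow 2 always requires login for the web UI (Flask-AppBuilder);
    # the Airflow 1.x AUTHENTICATE / password_auth settings no longer exist.
    # Set one shared secret key for all webserver replicas (generate it once):
    #   python -c "import secrets; print(secrets.token_hex(16))"
    export AIRFLOW__WEBSERVER__SECRET_KEY='your-generated-secret-key'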
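    A Fernet key is 32 random bytes encoded as URL-safe base64 (44 characters), which is the format Fernet.generate_key() returns. The standard-library sketch below generates a key in that format and checks whether a configured value decodes to 32 bytes. This is a quick sanity check before setting AIRFLOW__CORE__FERNET_KEY. Both helpers are hypothetical and validate the format only.

```python
import base64
import binascii
import os

def generate_fernet_style_key():
    """32 random bytes, URL-safe base64-encoded: the same format as Fernet.generate_key()."""
    return base64.urlsafe_b64encode(os.urandom(32)).decode()

def looks_like_fernet_key(key):
    """Return True if key decodes (URL-safe base64) to exactly 32 bytes."""
    try:
        raw = base64.urlsafe_b64decode(key.encode())
    except (binascii.Error, ValueError):
        return False
    return len(raw) == 32

key = generate_fernet_style_key()
print(len(key), looks_like_fernet_key(key))                # 44 True
print(looks_like_fernet_key('your-generated-fernet-key'))  # False
```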
  12. Step 12

    Testing DAGs

    Testing is critical for reliable workflows. Airflow provides several testing approaches:

    Unit testing: Test task logic independently using pytest. Mock external dependencies.

    DAG validation: Use airflow dags test to execute one complete DAG run in a single local process, without a running scheduler.

    Task testing: Use airflow tasks test to run a single task instance.

    Always validate DAG syntax before deploying to production. The airflow dags list-import-errors command shows DAGs that failed to load.

    # Check DAG syntax and import errors
    airflow dags list-import-errors
    
    # Run one complete DAG run locally, without the scheduler
    airflow dags test my_first_dag 2024-01-01
    
    # Test a single task (ignores dependencies; state is not recorded in the database)
    airflow tasks test my_first_dag extract 2024-01-01
    
    # Unit test example (pytest): save as test_dags.py and run `pytest`
    from airflow.models import DagBag
    
    def test_dag_loaded():
        dagbag = DagBag(include_examples=False)
        assert dagbag.import_errors == {}
        assert 'my_first_dag' in dagbag.dags
    
    def test_dag_structure():
        dagbag = DagBag(include_examples=False)
        dag = dagbag.get_dag('my_first_dag')
        assert dag is not None
        assert len(dag.tasks) == 4
        assert 'extract' in dag.task_ids
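    Task logic is easiest to unit-test when the callable is an ordinary function. The sketch below tests a copy of transform_data from Step 8, passing a unittest.mock.MagicMock in place of the task instance. No Airflow installation or metadata database is needed, because the mock only imitates the xcom_pull call that the function makes. In a real project, you would import the function from your own module instead of copying it.

```python
from unittest.mock import MagicMock

# Copy of transform_data from my_first_dag.py (Step 8)
def transform_data(**context):
    ti = context['ti']
    data = ti.xcom_pull(task_ids='extract')
    print(f'Transforming {data["records"]} records')
    return {'processed': data['records']}

def test_transform_data_uses_extract_output():
    fake_ti = MagicMock()
    fake_ti.xcom_pull.return_value = {'records': 100}  # Pretend 'extract' returned this

    result = transform_data(ti=fake_ti)

    assert result == {'processed': 100}
    fake_ti.xcom_pull.assert_called_once_with(task_ids='extract')

test_transform_data_uses_extract_output()  # pytest would discover and run this automatically
```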
  13. Step 13

    Monitoring and troubleshooting

    Airflow provides multiple tools for monitoring and debugging:

    Web UI: The primary interface for monitoring. View DAG runs, task instances, logs, and graphs. The Grid View (which replaced the Tree View in Airflow 2.3) shows historical runs; the Graph View shows task dependencies.

    Logs: Each task execution writes logs to $AIRFLOW_HOME/logs/. For distributed systems, configure remote logging (S3, GCS, Azure Blob) so all workers' logs are centralized.

    CLI commands: Use airflow dags list, airflow tasks list, and airflow dags state to inspect DAG and task states from the terminal.

    Metrics: Airflow emits metrics via StatsD or OpenTelemetry for external monitoring systems; Prometheus is typically fed through a StatsD exporter.

    Common issues:

    • Task stuck in queue: Check executor capacity, database connections, or worker availability
    • Import errors: Run airflow dags list-import-errors to see DAG parsing failures
    • Scheduler lag: Increase scheduler resources or reduce DAG parsing frequency
    • Task failures: Check task logs in the UI or logs/ folder

    # List all DAGs
    airflow dags list
    
    # Show DAG structure
    airflow dags show my_first_dag
    
    # List tasks in a DAG
    airflow tasks list my_first_dag
    
    # Check DAG state for a specific date
    airflow dags state my_first_dag 2024-01-01
    
    # View task logs on disk (default log layout in Airflow 2.3+)
    ls "$AIRFLOW_HOME/logs/dag_id=my_first_dag/"
    
    # Trigger a DAG manually
    airflow dags trigger my_first_dag
    
    # Backfill a DAG for a date range
    airflow dags backfill my_first_dag \
        --start-date 2024-01-01 \
        --end-date 2024-01-31
  14. Step 14

    Helm deployment on Kubernetes

    For cloud-native deployments, use the official Airflow Helm chart. This provides production-ready configurations for running Airflow on Kubernetes with auto-scaling workers.

    # Add the Airflow Helm repository
    helm repo add apache-airflow https://airflow.apache.org
    helm repo update
    
    # Create a namespace for Airflow
    kubectl create namespace airflow
    
    # Install Airflow with default settings
    helm install airflow apache-airflow/airflow --namespace airflow
    
    # Or customize with values.yaml
    helm install airflow apache-airflow/airflow \
        --namespace airflow \
        --values custom-values.yaml
    
    # Example custom-values.yaml snippet:
    # executor: KubernetesExecutor
    # postgresql:
    #   enabled: true
    # webserver:
    #   replicas: 2
    # scheduler:
    #   replicas: 2
    
    # Access the webserver
    kubectl port-forward svc/airflow-webserver 8080:8080 --namespace airflow
    
    # Upgrade Airflow
    helm upgrade airflow apache-airflow/airflow --namespace airflow
  15. Step 15

    Resources

    Official website: https://airflow.apache.org

    Documentation: https://airflow.apache.org/docs/

    GitHub repository: https://github.com/apache/airflow

    Slack community: https://s.apache.org/airflow-slack → Join #newbie-questions and #troubleshooting

    Mailing lists: dev@airflow.apache.org, users@airflow.apache.org

    Stack Overflow: Use the apache-airflow tag

    Provider packages: https://airflow.apache.org/docs/apache-airflow-providers/

    Astronomer: Commercial Airflow platform with managed hosting, blog, and learning resources at https://www.astronomer.io

    Awesome Airflow: Curated list of resources at https://github.com/jghoman/awesome-apache-airflow

