What is Apache Airflow?

Apache Airflow is an open-source platform for programmatically authoring, scheduling, and monitoring workflows. Originally developed at Airbnb, it has become the industry standard for orchestrating data pipelines.

Airflow uses Python to define workflows as Directed Acyclic Graphs (DAGs), so pipelines can be version-controlled, tested, and generated dynamically like any other code.

Core Concepts

  • DAG: Directed Acyclic Graph - a collection of tasks with dependencies
  • Task: A unit of work defined by an Operator
  • Operator: Template for a predefined task (PythonOperator, BashOperator, etc.)
  • Sensor: Operator that waits for a condition to be met
  • Hook: Reusable interface to external systems such as databases and APIs (see the sketch after this list)
  • XCom: Cross-communication for passing data between tasks
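
For example, a Hook wraps an Airflow Connection and gives a task a ready-made client for an external system. A minimal sketch, assuming a Postgres Connection with conn_id 'postgres_default' is configured and the Postgres provider package is installed:

from airflow.providers.postgres.hooks.postgres import PostgresHook

def fetch_user_count():
    # The hook reads credentials from the 'postgres_default' Connection
    hook = PostgresHook(postgres_conn_id='postgres_default')
    # get_records runs the query and returns rows as a list of tuples
    rows = hook.get_records('SELECT COUNT(*) FROM users')
    return rows[0][0]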

Your First DAG

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

# Default arguments
default_args = {
    'owner': 'data_team',
    'depends_on_past': False,
    'email': ['alerts@company.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

# Define DAG
dag = DAG(
    'my_first_dag',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval='0 0 * * *',  # Daily at midnight
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['example'],
)

# Define tasks
def extract_data():
    print("Extracting data...")
    return {'records': 100}

def transform_data(**context):
    extracted = context['ti'].xcom_pull(task_ids='extract')
    print(f"Transforming {extracted['records']} records")
    return {'transformed': extracted['records']}

extract = PythonOperator(
    task_id='extract',
    python_callable=extract_data,
    dag=dag,
)

transform = PythonOperator(
    task_id='transform',
    python_callable=transform_data,
    dag=dag,
)

load = BashOperator(
    task_id='load',
    bash_command='echo "Loading data to warehouse"',
    dag=dag,
)

# Set dependencies
extract >> transform >> load
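
The >> operator also accepts lists, which lets a task fan out to several downstream tasks and join again. A minimal sketch (validate_a and validate_b are hypothetical extra tasks, not part of the DAG above):

validate_a = BashOperator(task_id='validate_a', bash_command='echo "check A"', dag=dag)
validate_b = BashOperator(task_id='validate_b', bash_command='echo "check B"', dag=dag)

# extract fans out to both validation tasks; load waits for both to finish
extract >> [validate_a, validate_b] >> load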

TaskFlow API (Airflow 2.0+)

from airflow.decorators import dag, task
from datetime import datetime

@dag(
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl']
)
def etl_pipeline():

    @task()
    def extract():
        """Extract data from source"""
        return {'data': [1, 2, 3, 4, 5]}

    @task()
    def transform(raw_data: dict):
        """Transform the extracted data"""
        return {'data': [x * 2 for x in raw_data['data']]}

    @task()
    def load(transformed_data: dict):
        """Load data to destination"""
        print(f"Loading: {transformed_data}")

    # Define dependencies via function calls
    raw = extract()
    transformed = transform(raw)
    load(transformed)

# Instantiate DAG
etl_dag = etl_pipeline()
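
TaskFlow moves data between tasks through XCom behind the scenes; calling transform(raw) is what creates the extract-to-transform dependency. When a task returns a dictionary, multiple_outputs=True splits it into one XCom entry per key so downstream tasks can consume fields individually. A minimal sketch (the field names are illustrative):

@task(multiple_outputs=True)
def extract_counts():
    # Each key becomes its own XCom entry
    return {'order_count': 100, 'customer_count': 25}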

Common Operators

from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.sensors.filesystem import FileSensor

# Python Operator
def my_function(param):
    """Callable invoked by the PythonOperator below."""
    print(f"Running with param={param}")

python_task = PythonOperator(
    task_id='python_task',
    python_callable=my_function,
    op_kwargs={'param': 'value'},
)

# Bash Operator
bash_task = BashOperator(
    task_id='bash_task',
    bash_command='python /scripts/process.py {{ ds }}',
)

# PostgreSQL Operator
sql_task = PostgresOperator(
    task_id='run_sql',
    postgres_conn_id='postgres_default',
    # {{ ds }} renders to a bare date string, so it must be quoted in the SQL
    sql="SELECT * FROM users WHERE date = '{{ ds }}'",
)

# HTTP Operator
api_task = SimpleHttpOperator(
    task_id='call_api',
    http_conn_id='api_connection',
    endpoint='/data/{{ ds }}',
    method='GET',
)

# File Sensor - waits for file
wait_for_file = FileSensor(
    task_id='wait_for_file',
    filepath='/data/input/{{ ds }}.csv',
    poke_interval=60,
    timeout=3600,
)
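
Note that PostgresOperator and SimpleHttpOperator ship in separate provider distributions rather than in core Airflow. If the imports above fail, install the providers alongside Airflow, for example:

pip install apache-airflow-providers-postgres apache-airflow-providers-http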

Dynamic DAGs

# Generate tasks dynamically
from airflow.operators.empty import EmptyOperator

def process_table(table):
    """Placeholder processing logic for a single table."""
    print(f"Processing table: {table}")

tables = ['users', 'orders', 'products']

# Shared boundary tasks so every table task has a common start and end
start = EmptyOperator(task_id='start', dag=dag)
end = EmptyOperator(task_id='end', dag=dag)

for table in tables:
    task = PythonOperator(
        task_id=f'process_{table}',
        python_callable=process_table,
        op_kwargs={'table': table},
        dag=dag,
    )

    start >> task >> end
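
On Airflow 2.3 and later, dynamic task mapping is an alternative to generating tasks in a loop: one task definition is expanded into parallel task instances at runtime. A minimal sketch using the TaskFlow API, used inside a DAG definition (this is a separate technique, not part of the loop above):

from airflow.decorators import task

@task
def process_one_table(table: str):
    print(f"Processing {table}")

# One mapped task instance is created per list element at runtime
process_one_table.expand(table=['users', 'orders', 'products'])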

# Branching - BranchPythonOperator returns the task_id(s) to run next
from airflow.operators.python import BranchPythonOperator

def choose_branch(**context):
    # Decide based on the execution date, upstream XComs, or any other logic
    if context['ds'] == '2024-01-01':
        return 'branch_a'
    return 'branch_b'

branch = BranchPythonOperator(
    task_id='branch',
    python_callable=choose_branch,
    dag=dag,
)
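
The returned task_id decides which downstream path runs; tasks on the other path are skipped. A minimal sketch of wiring the two targets (EmptyOperator stands in for real work, and the task_ids match the strings returned by choose_branch):

from airflow.operators.empty import EmptyOperator

branch_a = EmptyOperator(task_id='branch_a', dag=dag)
branch_b = EmptyOperator(task_id='branch_b', dag=dag)

branch >> [branch_a, branch_b]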

Best Practices

  • Idempotent tasks: Tasks should produce the same result when re-run
  • Atomic tasks: Each task does one thing well
  • No heavy processing in DAG file: DAG parsing should be fast
  • Use Connections: Store credentials securely in Airflow
  • Test DAGs: Write unit tests for your Python functions and for DAG integrity (see the sketch after this list)
  • Monitor: Set up alerts and monitor DAG runs
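
For the DAG-testing point, a common lightweight check is to load the DAG folder with DagBag and assert there are no import errors. A minimal sketch, using the DAG id from the first example above:

from airflow.models import DagBag

def test_dag_integrity():
    dag_bag = DagBag(include_examples=False)
    # Any syntax or import problem in a DAG file shows up here
    assert dag_bag.import_errors == {}
    assert 'my_first_dag' in dag_bag.dags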

Master Apache Airflow with Expert Mentorship

Our Data Engineering program covers Airflow from basics to production deployment. Build robust pipeline orchestration with guidance from industry experts.
