What is ETL?
ETL (Extract, Transform, Load) is the process of moving data from source systems into a data warehouse or data lake. It's the backbone of any data platform, enabling analytics and business intelligence.
Modern pipelines often use ELT (Extract, Load, Transform), where raw data is loaded first and transformations happen in the target system using tools like dbt.
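To make the ELT flavor concrete, here is a minimal sketch of loading raw data first and then transforming it inside the warehouse with SQL, which is essentially the step a dbt model automates. The connection string, schemas, and table names are illustrative assumptions.

import pandas as pd
from sqlalchemy import create_engine, text

engine = create_engine("postgresql://user:pass@warehouse:5432/analytics")  # hypothetical DSN

# Extract + Load: land the raw file untouched in a staging schema
raw = pd.read_csv("orders.csv")
raw.to_sql("raw_orders", engine, schema="staging", if_exists="replace", index=False)

# Transform: build the analytics table inside the warehouse using SQL
with engine.begin() as conn:
    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS analytics.orders AS
        SELECT order_id, customer_id, CAST(amount AS NUMERIC) AS amount
        FROM staging.raw_orders
        WHERE amount IS NOT NULL
    """))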
ETL Architecture
- Extract: Pull data from source systems (databases, APIs, files)
- Transform: Clean, validate, and reshape data
- Load: Write data to destination (warehouse, lake, database)
- Orchestrate: Schedule and monitor pipeline execution (a minimal scheduling sketch follows this list)
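Orchestration is usually handled by a dedicated scheduler rather than hand-rolled cron jobs. Below is a minimal sketch assuming Apache Airflow 2.x; the DAG name, schedule, and task callables are placeholders, not part of any specific project.

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

# Placeholder task callables; in a real DAG these would call your ETL code
def extract(): ...
def transform(): ...
def load(): ...

with DAG(
    dag_id="daily_etl",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    t_extract = PythonOperator(task_id="extract", python_callable=extract)
    t_transform = PythonOperator(task_id="transform", python_callable=transform)
    t_load = PythonOperator(task_id="load", python_callable=load)

    # Run the three stages in order each day
    t_extract >> t_transform >> t_load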
Extraction Patterns
import pandas as pd
import requests
from sqlalchemy import create_engine

# Extract from a database
def extract_from_database(connection_string, query):
    engine = create_engine(connection_string)
    return pd.read_sql(query, engine)

# Incremental extraction: only pull rows changed since the last run
def extract_incremental(connection_string, table, last_modified):
    # Note: for untrusted input, prefer bound query parameters over f-strings
    query = f"""
        SELECT * FROM {table}
        WHERE updated_at > '{last_modified}'
    """
    return extract_from_database(connection_string, query)

# Extract from an API
def extract_from_api(url, headers=None, params=None):
    response = requests.get(url, headers=headers, params=params)
    response.raise_for_status()
    return response.json()

# Paginated API extraction
def extract_paginated_api(base_url, headers):
    all_data = []
    page = 1
    while True:
        response = requests.get(
            f"{base_url}?page={page}",
            headers=headers
        )
        response.raise_for_status()
        data = response.json()
        if not data['results']:
            break
        all_data.extend(data['results'])
        page += 1
    return all_data

# Extract from files
def extract_from_files(path, file_format='csv'):
    if file_format == 'csv':
        return pd.read_csv(path)
    elif file_format == 'parquet':
        return pd.read_parquet(path)
    elif file_format == 'json':
        return pd.read_json(path)
    else:
        raise ValueError(f"Unsupported file format: {file_format}")
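The incremental pattern above assumes the caller already knows the last-processed timestamp. A common companion is a small high-water-mark helper that persists that value between runs; here is a minimal sketch assuming a local JSON state file (production pipelines typically keep the watermark in a metadata table instead).

import json
from pathlib import Path

STATE_FILE = Path("last_run_state.json")  # hypothetical location

def get_last_modified(default="1970-01-01 00:00:00"):
    """Return the watermark from the previous run, or a safe default."""
    if STATE_FILE.exists():
        return json.loads(STATE_FILE.read_text())["last_modified"]
    return default

def save_last_modified(df, column="updated_at"):
    """Persist the newest timestamp seen so the next run only pulls newer rows."""
    if not df.empty:
        STATE_FILE.write_text(json.dumps({"last_modified": str(df[column].max())}))

# Usage with extract_incremental above:
# df = extract_incremental(conn_str, "orders", get_last_modified())
# save_last_modified(df)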
Transformation Patterns
import pandas as pd
import numpy as np

def transform_data(df, lookup_table=None):
    """Apply common transformations"""
    # 1. Data cleaning
    df = df.drop_duplicates()
    df = df.dropna(subset=['required_column'])

    # 2. Type casting
    df['date_column'] = pd.to_datetime(df['date_column'])
    df['amount'] = df['amount'].astype(float)

    # 3. String normalization
    df['email'] = df['email'].str.lower().str.strip()
    df['name'] = df['name'].str.title()

    # 4. Derived columns
    df['year'] = df['date_column'].dt.year
    df['month'] = df['date_column'].dt.month
    df['full_name'] = df['first_name'] + ' ' + df['last_name']

    # 5. Conditional logic
    df['segment'] = np.where(
        df['total_spend'] > 1000, 'Premium', 'Standard'
    )

    # 6. Aggregations (per-customer summary, e.g. for a separate summary table)
    summary = df.groupby('customer_id').agg({
        'order_id': 'count',
        'amount': 'sum'
    }).reset_index()

    # 7. Joins: enrich with a lookup table if one is provided
    if lookup_table is not None:
        df = df.merge(
            lookup_table,
            on='key_column',
            how='left'
        )
    return df

# Data validation
def validate_data(df):
    """Validate data quality"""
    errors = []

    # Check for nulls in required columns
    null_counts = df[['id', 'email']].isnull().sum()
    if null_counts.any():
        errors.append(f"Null values found: {null_counts.to_dict()}")

    # Check for duplicates
    dup_count = df.duplicated(subset=['id']).sum()
    if dup_count > 0:
        errors.append(f"Found {dup_count} duplicate records")

    # Check value ranges
    invalid_amounts = df[df['amount'] < 0]
    if len(invalid_amounts) > 0:
        errors.append(f"Found {len(invalid_amounts)} negative amounts")

    if errors:
        raise ValueError(f"Validation failed: {errors}")
    return df
Loading Patterns
from sqlalchemy import create_engine, text
import pandas as pd

def load_to_database(df, table_name, connection_string, mode='append'):
    """
    Load data to a database table.
    mode: 'append', 'replace', or 'fail'
    """
    engine = create_engine(connection_string)
    df.to_sql(
        name=table_name,
        con=engine,
        if_exists=mode,
        index=False,
        chunksize=10000  # Batch inserts
    )

# Upsert pattern (merge)
def upsert_data(df, table_name, key_columns, connection_string):
    """Insert new records, update existing ones"""
    engine = create_engine(connection_string)

    # Load existing keys
    existing = pd.read_sql(
        f"SELECT {', '.join(key_columns)} FROM {table_name}",
        engine
    )

    # Split into inserts and updates
    merged = df.merge(existing, on=key_columns, how='left', indicator=True)
    to_insert = merged[merged['_merge'] == 'left_only'].drop('_merge', axis=1)
    to_update = merged[merged['_merge'] == 'both'].drop('_merge', axis=1)

    # Insert new records
    if len(to_insert) > 0:
        to_insert.to_sql(table_name, engine, if_exists='append', index=False)

    # Update existing records (simplified: build_update_query is a helper,
    # not shown here, that renders an UPDATE statement for one row)
    with engine.begin() as conn:
        for _, row in to_update.iterrows():
            conn.execute(text(build_update_query(table_name, row, key_columns)))

# Load to a data lake (S3/GCS)
def load_to_data_lake(df, path, partition_cols=None):
    """Write partitioned Parquet files to a data lake"""
    df.to_parquet(
        path,  # local path or an s3:// / gs:// URI
        partition_cols=partition_cols,
        compression='snappy',
        index=False
    )
Complete Pipeline Example
from datetime import datetime
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ETLPipeline:
    def __init__(self, config):
        self.config = config
        self.metrics = {}

    def run(self):
        """Execute the full ETL pipeline"""
        try:
            logger.info("Starting ETL pipeline")
            start_time = datetime.now()

            # Extract
            logger.info("Extracting data...")
            raw_data = self.extract()
            self.metrics['extracted_rows'] = len(raw_data)

            # Transform
            logger.info("Transforming data...")
            transformed_data = self.transform(raw_data)
            self.metrics['transformed_rows'] = len(transformed_data)

            # Validate
            logger.info("Validating data...")
            self.validate(transformed_data)

            # Load
            logger.info("Loading data...")
            self.load(transformed_data)
            self.metrics['loaded_rows'] = len(transformed_data)

            # Record metrics
            self.metrics['duration'] = (datetime.now() - start_time).total_seconds()
            self.metrics['status'] = 'success'
            logger.info(f"Pipeline completed: {self.metrics}")
            return self.metrics
        except Exception as e:
            self.metrics['status'] = 'failed'
            self.metrics['error'] = str(e)
            logger.error(f"Pipeline failed: {e}")
            raise

    def extract(self):
        # Implemented by a concrete subclass
        pass

    def transform(self, data):
        # Implemented by a concrete subclass
        pass

    def validate(self, data):
        # Implemented by a concrete subclass
        pass

    def load(self, data):
        # Implemented by a concrete subclass
        pass

# Usage (in practice you run a concrete subclass; see the sketch below)
if __name__ == '__main__':
    config = {'source': 'postgres', 'destination': 'warehouse'}
    pipeline = ETLPipeline(config)
    pipeline.run()
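To show how the template gets filled in, here is a hypothetical subclass that wires in the helper functions from the earlier sections; the query, table names, connection strings, and config keys are all illustrative assumptions.

class OrdersPipeline(ETLPipeline):
    """Example pipeline built from the helpers defined earlier."""

    def extract(self):
        # Pull the last day of changes from the source database
        return extract_from_database(
            self.config['source_dsn'],
            "SELECT * FROM orders WHERE updated_at > NOW() - INTERVAL '1 day'"
        )

    def transform(self, data):
        return transform_data(data)

    def validate(self, data):
        return validate_data(data)

    def load(self, data):
        load_to_database(data, 'orders_clean', self.config['target_dsn'], mode='append')

pipeline = OrdersPipeline({
    'source_dsn': 'postgresql://user:pass@source-db:5432/app',        # hypothetical
    'target_dsn': 'postgresql://user:pass@warehouse:5432/analytics',  # hypothetical
})
pipeline.run()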
Error Handling
import time
import logging
from datetime import datetime
from functools import wraps

import pandas as pd

logger = logging.getLogger(__name__)

def retry(max_attempts=3, delay=1, backoff=2):
    """Retry decorator with exponential backoff"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            attempts = 0
            current_delay = delay
            while attempts < max_attempts:
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    attempts += 1
                    if attempts == max_attempts:
                        raise
                    logger.warning(
                        f"Attempt {attempts} failed: {e}. "
                        f"Retrying in {current_delay}s..."
                    )
                    time.sleep(current_delay)
                    current_delay *= backoff
        return wrapper
    return decorator

@retry(max_attempts=3, delay=5)
def extract_with_retry(url):
    """Extraction with automatic retry"""
    return extract_from_api(url)

# Dead letter queue pattern
def process_with_dlq(records, process_func, dlq_path):
    """Process records, send failures to a dead letter queue"""
    successful = []
    failed = []
    for record in records:
        try:
            result = process_func(record)
            successful.append(result)
        except Exception as e:
            failed.append({
                'record': record,
                'error': str(e),
                'timestamp': datetime.now().isoformat()
            })

    # Save failed records for later reprocessing
    if failed:
        pd.DataFrame(failed).to_json(dlq_path)
        logger.warning(f"Sent {len(failed)} records to DLQ")
    return successful
Best Practices
- Idempotency: The pipeline should produce the same result when re-run (the staging-table swap in Loading Patterns is one way to achieve this)
- Incremental processing: Only process new or changed data
- Data validation: Validate at extraction and again after transformation
- Logging and monitoring: Track metrics, errors, and data lineage
- Atomic operations: All-or-nothing loads prevent partially loaded data
- Testing: Unit test transformations and integration test the full pipeline (see the sketch after this list)
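For the testing practice above, here is a minimal pytest sketch against the validate_data function from the Transformation Patterns section; the test data is made up, and in practice each transformation rule would get the same treatment.

import pandas as pd
import pytest

def test_validate_data_passes_clean_frame():
    df = pd.DataFrame({
        'id': [1, 2],
        'email': ['a@example.com', 'b@example.com'],
        'amount': [10.0, 20.0],
    })
    # Clean data should come back unchanged
    assert validate_data(df).equals(df)

def test_validate_data_rejects_negative_amounts():
    df = pd.DataFrame({
        'id': [1],
        'email': ['a@example.com'],
        'amount': [-5.0],
    })
    with pytest.raises(ValueError):
        validate_data(df)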
Master ETL Pipelines with Expert Mentorship
Our Data Engineering program covers ETL design patterns and implementation. Build production-ready pipelines with guidance from industry experts.
Explore Data Engineering Program