What is Great Expectations?

Great Expectations (GX) is an open-source Python library for validating, documenting, and profiling your data. It helps you maintain data quality by defining "expectations" - assertions about your data that can be tested automatically.

Think of it as unit testing for your data pipelines.

Why Data Quality Matters

  • Catch issues early: Identify data problems before they affect downstream systems
  • Build trust: Stakeholders can rely on validated data
  • Documentation: Auto-generate data docs from expectations
  • Pipeline reliability: Stop bad data from propagating
  • Debugging: Clear reports when validation fails

Getting Started

# Install Great Expectations
pip install great_expectations

# Initialize a project
great_expectations init

# Project structure created:
# great_expectations/
# ├── great_expectations.yml   # Main config
# ├── expectations/            # Expectation suites
# ├── checkpoints/             # Validation checkpoints
# ├── plugins/                 # Custom expectations
# └── uncommitted/             # Local files

# Or programmatically
import great_expectations as gx

context = gx.get_context()  # Uses great_expectations.yml
# Or create in-memory context
context = gx.get_context(mode="ephemeral")

Core Concepts

┌─────────────────────────────────────────────────────────────────┐
│                    Great Expectations Concepts                   │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  Data Source          → Where your data lives (files, DB, etc.) │
│         ↓                                                        │
│  Data Asset           → A specific dataset (table, file)        │
│         ↓                                                        │
│  Batch                → A subset of data to validate            │
│         ↓                                                        │
│  Expectation Suite    → Collection of expectations              │
│         ↓                                                        │
│  Checkpoint           → Combines batch + suite for validation   │
│         ↓                                                        │
│  Validation Result    → Pass/fail results with details          │
│         ↓                                                        │
│  Data Docs            → Auto-generated documentation            │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

Creating Expectations

import great_expectations as gx
import pandas as pd

# Create context
context = gx.get_context()

# Connect to data
data_source = context.sources.add_pandas("my_pandas_source")
data_asset = data_source.add_dataframe_asset("customers")

# Create a batch from DataFrame
df = pd.read_csv("customers.csv")
batch_request = data_asset.build_batch_request(dataframe=df)

# Create expectation suite
suite = context.add_expectation_suite("customers_suite")

# Get a validator
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name="customers_suite"
)

# Add expectations
validator.expect_column_to_exist("customer_id")
validator.expect_column_to_exist("email")
validator.expect_column_to_exist("created_at")

validator.expect_column_values_to_not_be_null("customer_id")
validator.expect_column_values_to_be_unique("customer_id")

validator.expect_column_values_to_match_regex(
    "email",
    regex=r"^[\w\.-]+@[\w\.-]+\.\w+$"
)

validator.expect_column_values_to_be_between(
    "age",
    min_value=0,
    max_value=120
)

# Save the suite
validator.save_expectation_suite(discard_failed_expectations=False)

Common Expectations

# Table-level expectations
validator.expect_table_row_count_to_be_between(min_value=1000, max_value=1000000)
validator.expect_table_column_count_to_equal(10)
validator.expect_table_columns_to_match_ordered_list(
    ["id", "name", "email", "created_at"]
)

# Column existence and type
validator.expect_column_to_exist("user_id")
validator.expect_column_values_to_be_of_type("amount", "float64")

# Null handling
validator.expect_column_values_to_not_be_null("user_id")
validator.expect_column_values_to_be_null("deleted_at")  # For soft deletes

# Uniqueness
validator.expect_column_values_to_be_unique("email")
validator.expect_compound_columns_to_be_unique(["order_id", "product_id"])

# Value constraints
validator.expect_column_values_to_be_in_set(
    "status",
    value_set=["pending", "completed", "cancelled"]
)
validator.expect_column_values_to_be_between(
    "price",
    min_value=0,
    max_value=10000
)

# String patterns
validator.expect_column_values_to_match_regex(
    "phone",
    regex=r"^\+?[1-9]\d{9,14}$"
)
validator.expect_column_value_lengths_to_be_between(
    "zip_code",
    min_value=5,
    max_value=10
)

# Date/time
validator.expect_column_values_to_be_dateutil_parseable("created_at")

# Statistical expectations
validator.expect_column_mean_to_be_between("amount", min_value=50, max_value=200)
validator.expect_column_median_to_be_between("age", min_value=25, max_value=45)
validator.expect_column_stdev_to_be_between("score", min_value=5, max_value=15)

# Distribution
validator.expect_column_distinct_values_to_be_in_set(
    "country",
    value_set=["US", "UK", "CA", "AU"]
)
validator.expect_column_proportion_of_unique_values_to_be_between(
    "user_id",
    min_value=0.9,
    max_value=1.0
)

Running Validations

# Create a checkpoint
checkpoint = context.add_or_update_checkpoint(
    name="customers_checkpoint",
    validations=[
        {
            "batch_request": batch_request,
            "expectation_suite_name": "customers_suite"
        }
    ]
)

# Run the checkpoint
result = checkpoint.run()

# Check if validation passed
if result.success:
    print("All expectations passed!")
else:
    print("Validation failed!")
    # Get details
    for validation_result in result.run_results.values():
        for expectation_result in validation_result["validation_result"]["results"]:
            if not expectation_result["success"]:
                print(f"Failed: {expectation_result['expectation_config']}")

# Build and view data docs
context.build_data_docs()
context.open_data_docs()

Integration with Pipelines

# Airflow integration
from airflow import DAG
from airflow.operators.python import PythonOperator
from great_expectations_provider.operators.great_expectations import (
    GreatExpectationsOperator
)

with DAG("data_pipeline", schedule_interval="@daily") as dag:

    validate_source = GreatExpectationsOperator(
        task_id="validate_source_data",
        data_context_root_dir="/path/to/great_expectations",
        checkpoint_name="source_checkpoint",
        fail_task_on_validation_failure=True
    )

    transform = PythonOperator(
        task_id="transform_data",
        python_callable=transform_data
    )

    validate_output = GreatExpectationsOperator(
        task_id="validate_output_data",
        data_context_root_dir="/path/to/great_expectations",
        checkpoint_name="output_checkpoint"
    )

    validate_source >> transform >> validate_output


# Direct Python integration
def validate_and_process(df):
    import great_expectations as gx

    context = gx.get_context()
    result = context.run_checkpoint(
        checkpoint_name="my_checkpoint",
        batch_request={
            "runtime_parameters": {"batch_data": df},
            "batch_identifiers": {"batch_id": "runtime_batch"}
        }
    )

    if not result.success:
        raise ValueError("Data validation failed!")

    return process_data(df)

Custom Expectations

# Create custom expectation
from great_expectations.expectations.expectation import ColumnMapExpectation
from great_expectations.execution_engine import PandasExecutionEngine

class ExpectColumnValuesToBeValidEmail(ColumnMapExpectation):
    """Expect column values to be valid email addresses."""

    expectation_type = "expect_column_values_to_be_valid_email"

    @classmethod
    def _validate(
        cls,
        configuration,
        metrics,
        runtime_configuration=None,
        execution_engine=None,
    ):
        import re
        email_pattern = r'^[\w\.-]+@[\w\.-]+\.\w+$'

        column_values = metrics["column_values"]
        valid_count = sum(
            1 for val in column_values
            if val and re.match(email_pattern, str(val))
        )
        total_count = len(column_values)

        success = valid_count == total_count

        return {
            "success": success,
            "result": {
                "element_count": total_count,
                "unexpected_count": total_count - valid_count,
            }
        }

# Use custom expectation
validator.expect_column_values_to_be_valid_email("email")

Best Practices

  • Start simple: Begin with basic expectations, add complexity over time
  • Profile first: Use auto-profiling to discover data patterns
  • Version suites: Store expectation suites in version control
  • Fail fast: Validate at ingestion to catch issues early
  • Document context: Add notes explaining why expectations exist
  • Monitor trends: Track validation results over time

Master Data Quality

Our Data Engineering program covers data quality, testing, and production-grade data pipelines.

Explore Data Engineering Program

Related Articles