Skip to main content

Data Quality

Great Expectations (expectations, suites, checkpoints, data docs), schema validation, anomaly detection, drift monitoring, and data contracts

~40 min
Listen to this lesson

Data Quality: Trust But Verify

Models are only as good as their data. A single upstream schema change, a broken ETL pipeline, or a subtle shift in data distribution can silently degrade model performance. Data quality checks act as automated guardrails that catch problems before they reach your model.

This lesson covers Great Expectations (the industry standard for data validation), schema enforcement, anomaly detection, drift monitoring, and data contracts.

The Data Quality Iceberg

The most dangerous data quality issues are the ones you do not notice. A column that changes from integers to floats, a categorical feature that gains a new value, or a subtle shift in data distribution will not cause errors -- the pipeline keeps running, but model predictions silently degrade. Automated data quality checks catch these invisible problems.

Great Expectations

Great Expectations (GX) is the leading open-source framework for data validation. It lets you define expectations (assertions about your data) and automatically validate datasets against them.

Core Concepts

| Concept | Description |
| --- | --- |
| Expectation | A single assertion about the data (e.g., "column X has no nulls") |
| Expectation Suite | A collection of expectations for a dataset |
| Validator | Applies expectations to actual data and reports results |
| Checkpoint | Automates validation as part of a pipeline |
| Data Docs | Auto-generated HTML documentation of your expectations and results |

Common Expectations

# Column-level expectations
expect_column_to_exist("user_id")
expect_column_values_to_not_be_null("user_id")
expect_column_values_to_be_unique("user_id")
expect_column_values_to_be_between("age", min_value=0, max_value=150)
expect_column_values_to_be_in_set("status", ["active", "inactive"])
expect_column_mean_to_be_between("price", min_value=10, max_value=100)

Table-level expectations

expect_table_row_count_to_be_between(min_value=1000, max_value=1000000)
expect_table_columns_to_match_set(["id", "name", "value"])

python
1import numpy as np
2import pandas as pd
3
4# === Great Expectations Simulation ===
5# (Full GX requires installation; this simulates the concepts)
6
class Expectation:
    """One named assertion about a DataFrame.

    Wraps a check function plus the keyword arguments it needs;
    ``validate`` runs the check and reports the outcome as a plain dict.
    """

    def __init__(self, name, check_fn, params=None):
        self.name = name
        self.check_fn = check_fn
        self.params = params if params else {}

    def validate(self, df):
        """Run the check against *df*; never raises -- errors become failures."""
        try:
            ok, detail = self.check_fn(df, **self.params)
        except Exception as exc:  # a crashing check counts as a failed expectation
            ok, detail = False, f"Error: {str(exc)}"
        return {
            "expectation": self.name,
            "success": ok,
            "details": detail,
        }
28
class ExpectationSuite:
    """A named bundle of Expectation objects that are validated together."""

    def __init__(self, name):
        self.name = name
        self.expectations = []

    def add(self, expectation):
        """Register one more expectation in the suite."""
        self.expectations.append(expectation)

    def validate(self, df):
        """Validate *df* against every registered expectation and summarize."""
        outcomes = [exp.validate(df) for exp in self.expectations]
        n_failed = sum(1 for o in outcomes if not o["success"])
        return {
            "suite": self.name,
            "passed": len(outcomes) - n_failed,
            "failed": n_failed,
            "total": len(outcomes),
            "success": n_failed == 0,
            "results": outcomes,
        }
52
53# --- Define expectation factory functions ---
def expect_column_exists(df, column):
    """Check that *column* is present in *df*.

    Returns (success, detail message).  The membership test is evaluated
    once instead of twice as in the original.
    """
    present = column in df.columns
    return present, f"Column '{column}' {'exists' if present else 'missing'}"
56
def expect_no_nulls(df, column):
    """Pass when *column* contains no missing values."""
    missing = df[column].isna().sum()
    return missing == 0, f"{missing} nulls found"
60
def expect_unique(df, column):
    """Pass when every value in *column* occurs exactly once."""
    repeats = df[column].duplicated().sum()
    return repeats == 0, f"{repeats} duplicates found"
64
def expect_between(df, column, min_val, max_val):
    """Pass when all values of *column* lie inside the closed interval
    [min_val, max_val]; values outside it are counted as violations."""
    series = df[column]
    out_of_range = (series.lt(min_val) | series.gt(max_val)).sum()
    return out_of_range == 0, f"{out_of_range} values out of [{min_val}, {max_val}]"
68
def expect_in_set(df, column, values):
    """Pass when every value of *column* is one of *values*.

    The count of offending rows is computed once (the original re-evaluated
    ``invalid.sum()`` a second time for the message).
    """
    n_invalid = (~df[column].isin(values)).sum()
    return n_invalid == 0, f"{n_invalid} values not in {values}"
72
def expect_row_count(df, min_rows, max_rows):
    """Pass when the number of rows falls within [min_rows, max_rows]."""
    n_rows = len(df)
    within = min_rows <= n_rows <= max_rows
    return within, f"Row count: {n_rows} (expected [{min_rows}, {max_rows}])"
76
def expect_mean_between(df, column, min_val, max_val):
    """Pass when the mean of *column* lies within [min_val, max_val]."""
    observed = df[column].mean()
    ok = min_val <= observed <= max_val
    return ok, f"Mean={observed:.2f} (expected [{min_val}, {max_val}])"
80
# === Build a validation suite ===
suite = ExpectationSuite("customer_data_quality")

# (label, check function, keyword arguments) triples, registered in order.
_checks = [
    ("column_exists: customer_id", expect_column_exists,
     {"column": "customer_id"}),
    ("no_nulls: customer_id", expect_no_nulls,
     {"column": "customer_id"}),
    ("unique: customer_id", expect_unique,
     {"column": "customer_id"}),
    ("between: age [0, 120]", expect_between,
     {"column": "age", "min_val": 0, "max_val": 120}),
    ("in_set: status", expect_in_set,
     {"column": "status", "values": ["active", "inactive", "churned"]}),
    ("row_count [100, 10000]", expect_row_count,
     {"min_rows": 100, "max_rows": 10000}),
    ("mean: purchase_amount [10, 200]", expect_mean_between,
     {"column": "purchase_amount", "min_val": 10, "max_val": 200}),
]
for _label, _fn, _kwargs in _checks:
    suite.add(Expectation(_label, _fn, _kwargs))
99
# --- Test with GOOD data ---
np.random.seed(42)
n_customers = 500
# Column construction order is significant: it fixes the RNG draw order.
good_data = pd.DataFrame({
    "customer_id": range(n_customers),
    "age": np.random.randint(18, 80, n_customers),
    "status": np.random.choice(["active", "inactive", "churned"], n_customers),
    "purchase_amount": np.random.lognormal(3.5, 0.8, n_customers),
})

print("=== Validating GOOD data ===")
result = suite.validate(good_data)
print(f"Suite: {result['suite']}")
verdict = "PASS" if result["success"] else "FAIL"
print(f"Result: {verdict}")
print(f"Passed: {result['passed']}/{result['total']}")
for outcome in result["results"]:
    flag = "PASS" if outcome["success"] else "FAIL"
    print(f"  [{flag}] {outcome['expectation']}: {outcome['details']}")
117
# --- Test with BAD data ---
bad_data = good_data.copy()
# Deliberate corruptions: (row, column, bad value).
_corruptions = [
    (10, "age", -5),             # invalid age (negative)
    (20, "age", 999),            # invalid age (too large)
    (30, "status", "unknown"),   # value outside the allowed set
    (40, "customer_id", 0),      # collides with an existing ID
]
for _row, _col, _value in _corruptions:
    bad_data.loc[_row, _col] = _value

print("\n=== Validating BAD data ===")
result = suite.validate(bad_data)
print(f"Result: {'PASS' if result['success'] else 'FAIL'}")
print(f"Passed: {result['passed']}/{result['total']}")
for entry in result["results"]:
    label = "PASS" if entry["success"] else "FAIL"
    print(f"  [{label}] {entry['expectation']}: {entry['details']}")

Data Drift Monitoring

Data drift occurs when the statistical properties of input data change over time, causing model performance to degrade.

Types of Drift

| Type | What changes | Detection |
| --- | --- | --- |
| Covariate drift | Input feature distributions | Statistical tests on features |
| Label drift | Target variable distribution | Monitor prediction distribution |
| Concept drift | Relationship between features and target | Monitor model performance metrics |

Detection Methods

  • Kolmogorov-Smirnov (KS) test: Compares two distributions. p < 0.05 indicates significant drift.
  • Population Stability Index (PSI): Measures distribution shift. PSI > 0.2 indicates significant drift.
  • Jensen-Shannon Divergence: Symmetric divergence between distributions. Higher values indicate more drift.
  • Evidently AI: Open-source library for drift monitoring dashboards.
Data Contracts

Data contracts are formal agreements between data producers and consumers that define:

  • Schema (column names, types, constraints)
  • Quality guarantees (freshness, completeness, accuracy)
  • SLAs (update frequency, availability)
  • Ownership and escalation procedures

Data contracts prevent the "my upstream broke your downstream" problem by making data quality a shared responsibility with clear expectations.

    python
    1import numpy as np
    2import pandas as pd
    3from scipy import stats
    4
# Seed the global RNG so the simulated drift scenario below is reproducible.
np.random.seed(42)

# === Data Drift Detection ===
    8
    9def compute_psi(reference, current, n_bins=10):
    10    """Population Stability Index between two distributions."""
    11    # Create bins from reference distribution
    12    bins = np.percentile(reference, np.linspace(0, 100, n_bins + 1))
    13    bins[0] = -np.inf
    14    bins[-1] = np.inf
    15
    16    ref_counts = np.histogram(reference, bins=bins)[0]
    17    cur_counts = np.histogram(current, bins=bins)[0]
    18
    19    # Add small constant to avoid division by zero
    20    ref_pct = (ref_counts + 1) / (len(reference) + n_bins)
    21    cur_pct = (cur_counts + 1) / (len(current) + n_bins)
    22
    23    psi = np.sum((cur_pct - ref_pct) * np.log(cur_pct / ref_pct))
    24    return psi
    25
    26def detect_drift(reference_df, current_df, feature_names,
    27                  psi_threshold=0.2, ks_alpha=0.05):
    28    """Detect drift across multiple features."""
    29    results = []
    30    for feat in feature_names:
    31        ref = reference_df[feat].dropna().values
    32        cur = current_df[feat].dropna().values
    33
    34        # KS test
    35        ks_stat, ks_pval = stats.ks_2samp(ref, cur)
    36
    37        # PSI
    38        psi = compute_psi(ref, cur)
    39
    40        # Summary stats comparison
    41        ref_mean, ref_std = ref.mean(), ref.std()
    42        cur_mean, cur_std = cur.mean(), cur.std()
    43
    44        drift_detected = (psi > psi_threshold) or (ks_pval < ks_alpha)
    45
    46        results.append({
    47            "feature": feat,
    48            "ks_stat": ks_stat,
    49            "ks_pval": ks_pval,
    50            "psi": psi,
    51            "ref_mean": ref_mean,
    52            "cur_mean": cur_mean,
    53            "mean_shift": cur_mean - ref_mean,
    54            "drift": drift_detected,
    55        })
    56
    57    return pd.DataFrame(results)
    58
    59# --- Simulate drift scenarios ---
    60n_reference = 5000
    61n_current = 5000
    62
    63# Reference data (training distribution)
    64reference = pd.DataFrame({
    65    "age": np.random.normal(35, 10, n_reference),
    66    "income": np.random.lognormal(10.5, 0.5, n_reference),
    67    "score": np.random.beta(2, 5, n_reference) * 100,
    68    "days_active": np.random.exponential(30, n_reference),
    69})
    70
    71# Current data WITH drift
    72current = pd.DataFrame({
    73    "age": np.random.normal(40, 12, n_current),        # Mean shifted
    74    "income": np.random.lognormal(10.5, 0.5, n_current), # No drift
    75    "score": np.random.beta(5, 2, n_current) * 100,     # Distribution flipped
    76    "days_active": np.random.exponential(30, n_current),  # No drift
    77})
    78
    79# Run drift detection
    80features = ["age", "income", "score", "days_active"]
    81drift_results = detect_drift(reference, current, features)
    82
    83print("=== Data Drift Report ===")
    84print(f"{'Feature':<15} {'PSI':>8} {'KS stat':>8} {'KS p-val':>10} "
    85      f"{'Mean Shift':>12} {'Drift?':>8}")
    86print("-" * 65)
    87for _, row in drift_results.iterrows():
    88    flag = "YES ***" if row["drift"] else "no"
    89    print(f"{row['feature']:<15} {row['psi']:>8.4f} {row['ks_stat']:>8.4f} "
    90          f"{row['ks_pval']:>10.4f} {row['mean_shift']:>+12.2f} "
    91          f"{flag:>8}")
    92
    93drifted = drift_results[drift_results["drift"]]
    94print(f"\nDrifted features: {list(drifted['feature'])}")
    95
    96# PSI interpretation
    97print("\nPSI Interpretation:")
    98for _, row in drift_results.iterrows():
    99    if row["psi"] < 0.1:
    100        level = "No significant drift"
    101    elif row["psi"] < 0.2:
    102        level = "Moderate drift - monitor closely"
    103    else:
    104        level = "Significant drift - investigate immediately"
    105    print(f"  {row['feature']}: PSI={row['psi']:.4f} -> {level}")

    Drift Does Not Always Mean Retraining

    Not all drift requires action. Covariate drift (input distribution changes) only matters if it affects model performance. Always pair drift detection with performance monitoring. If drift is detected but model metrics remain stable, the model may be robust to this shift. If performance degrades, investigate the drifted features and retrain.