Data-Centric AI Complete Guide: Maximizing AI Performance with High-Quality Data

In 2021, Andrew Ng posed a provocative question to the AI community: "Instead of focusing on improving model architectures, what if we focused on improving data quality?" This became the starting point of the Data-Centric AI movement.

While the traditional model-centric approach chases "better algorithms," the data-centric approach chases "better data." This guide covers every aspect of Data-Centric AI with hands-on code, from theory to production practice.

1. Data-Centric AI vs Model-Centric AI

1.1 The Paradigm Shift

Model-Centric AI

  • Data is fixed; improve the code
  • Search for better architectures
  • Focus on hyperparameter tuning
  • Classic benchmarks: dataset is fixed, only models change

Data-Centric AI

  • Model is fixed; improve the data
  • Fix label errors
  • Improve labeling guidelines for consistency
  • Add data augmentation and synthetic data

1.2 Andrew Ng's Core Argument

Andrew Ng states:

"AI system = Code (model/algorithm) + Data"

In many practical AI projects, the code is already good enough. The bottleneck is data quality.

Experiment Results (Andrew Ng, DeepLearning.AI):

On a manufacturing inspection dataset with label noise:

  • Baseline: 76.2%
  • Better model only: +0.02% improvement (76.22%)
  • Better data only: +16.9% improvement (93.1%)

This result demonstrates that in many real-world scenarios, improving data quality is far more effective than improving model architecture.

1.3 When Is Data-Centric Most Effective?

Data-centric approaches are especially impactful when:

  1. Small datasets: Quality matters more when you have fewer than a few thousand examples
  2. High label noise: When the label error rate exceeds 5%
  3. Domain-specific tasks: Specialized domains without strong pre-trained models
  4. Imbalanced classes: Rare class quality determines overall performance
  5. Strict accuracy requirements: Medical, financial, or safety-critical applications

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

def compare_model_vs_data_centric(X, y, model_class, noise_level=0.1):
    """
    Compare model-centric vs data-centric approaches

    Args:
        X: feature matrix
        y: labels
        model_class: base model class
        noise_level: fraction of noisy labels
    """
    # Add label noise
    noisy_y = y.copy()
    noise_idx = np.random.choice(len(y), int(len(y) * noise_level), replace=False)
    n_classes = len(np.unique(y))
    for idx in noise_idx:
        wrong_labels = [l for l in range(n_classes) if l != y[idx]]
        noisy_y[idx] = np.random.choice(wrong_labels)

    X_train, X_test, y_train_clean, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    _, _, y_train_noisy, _ = train_test_split(
        X, noisy_y, test_size=0.2, random_state=42
    )

    # --- Model-centric approach ---
    # Noisy data + base model
    base_model = model_class()
    base_model.fit(X_train, y_train_noisy)
    base_acc = accuracy_score(y_test, base_model.predict(X_test))

    # Noisy data + more complex model
    from sklearn.ensemble import GradientBoostingClassifier
    complex_model = GradientBoostingClassifier(n_estimators=200)
    complex_model.fit(X_train, y_train_noisy)
    complex_acc = accuracy_score(y_test, complex_model.predict(X_test))

    # --- Data-centric approach ---
    # Clean data + base model
    clean_model = model_class()
    clean_model.fit(X_train, y_train_clean)
    clean_acc = accuracy_score(y_test, clean_model.predict(X_test))

    print("=" * 50)
    print("Model-Centric vs Data-Centric Comparison")
    print("=" * 50)
    print(f"Base model + noisy data: {base_acc:.3f}")
    print(f"Complex model + noisy data: {complex_acc:.3f}")
    print(f"Base model + clean data: {clean_acc:.3f}")
    print(f"\nModel improvement effect: +{(complex_acc - base_acc):.3f}")
    print(f"Data improvement effect: +{(clean_acc - base_acc):.3f}")

    return {
        'base_model_noisy_data': base_acc,
        'complex_model_noisy_data': complex_acc,
        'base_model_clean_data': clean_acc
    }
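
A minimal usage sketch, assuming scikit-learn and a synthetic dataset from make_classification (the dataset and the LogisticRegression base model are illustrative choices, not part of the function above):

from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression

X, y = make_classification(
    n_samples=2000, n_features=20, n_informative=10,
    n_classes=3, random_state=42
)

# With 20% injected label noise, cleaning the labels typically helps
# far more than switching to the larger model
results = compare_model_vs_data_centric(
    X, y, model_class=LogisticRegression, noise_level=0.2
)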

2. Data Quality Measurement

2.1 Confident Learning and Label Error Detection

Confident Learning, proposed by Northcutt et al., uses cross-validated prediction probabilities to systematically detect label errors.

Core idea: "If the model predicts class A with high confidence but the label says class B, the label is likely wrong."

import cleanlab
from cleanlab.filter import find_label_issues
from cleanlab.classification import CleanLearning
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_predict

def detect_label_errors_cleanlab(X, y, model=None):
    """
    Detect label errors using Cleanlab

    Args:
        X: feature matrix
        y: label array
        model: classifier (default: LogisticRegression)

    Returns:
        label_issues: indices and info about label issues
    """
    if model is None:
        model = LogisticRegression(max_iter=1000)

    # Predict class probabilities via cross-validation
    pred_probs = cross_val_predict(
        model, X, y,
        cv=5,
        method='predict_proba'
    )

    # Find label issues
    label_issues = find_label_issues(
        labels=y,
        pred_probs=pred_probs,
        return_indices_ranked_by='self_confidence'
    )

    print(f"Total samples: {len(y)}")
    print(f"Label issues found: {len(label_issues)}")
    print(f"Error rate: {len(label_issues)/len(y):.2%}")

    return label_issues


def cleanlab_full_pipeline(X_train, y_train_noisy, X_test, y_test):
    """
    Complete Cleanlab pipeline:
    1. Detect label errors
    2. Remove or correct errors
    3. Retrain on cleaned data
    """
    from cleanlab.classification import CleanLearning

    base_model = LogisticRegression(max_iter=1000)

    # CleanLearning automatically handles label errors during training
    cl = CleanLearning(base_model, seed=42)
    cl.fit(X_train, y_train_noisy)

    y_pred = cl.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    print(f"CleanLearning accuracy: {accuracy:.3f}")

    label_issues_df = cl.get_label_issues()
    print(f"\nLabel issues info:")
    print(label_issues_df.head(10))

    return cl, label_issues_df


def confident_learning_manual(pred_probs, labels):
    """
    Manual implementation of Confident Learning
    - Compute per-class thresholds
    - Build the Confident Joint matrix
    """
    n_classes = pred_probs.shape[1]
    n_samples = len(labels)

    # Per-class threshold: mean predicted probability for that class's samples
    thresholds = np.zeros(n_classes)
    for c in range(n_classes):
        class_mask = labels == c
        if class_mask.sum() > 0:
            thresholds[c] = pred_probs[class_mask, c].mean()

    # Confident Joint matrix C[s][y]
    # s: estimated true class, y: given label
    C = np.zeros((n_classes, n_classes), dtype=int)

    for i in range(n_samples):
        y_given = labels[i]
        over_threshold = pred_probs[i] >= thresholds

        if over_threshold.sum() == 0:
            y_hat = pred_probs[i].argmax()
        else:
            y_hat = (pred_probs[i] * over_threshold).argmax()

        C[y_hat, y_given] += 1

    off_diagonal = C.copy()
    np.fill_diagonal(off_diagonal, 0)

    print("Confident Joint Matrix (rows: estimated true class, cols: given label):")
    print(C)
    print(f"\nEstimated mislabeled samples: {off_diagonal.sum()}")

    return C
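
To see the detector in action, here is a minimal sketch on a synthetic binary dataset with 5% of labels flipped (the dataset and the noise injection are illustrative assumptions):

from sklearn.datasets import make_classification

X, y = make_classification(n_samples=1000, n_features=10, random_state=0)

# Flip 5% of the binary labels to simulate annotation noise
rng = np.random.default_rng(0)
flip_idx = rng.choice(len(y), size=50, replace=False)
y_noisy = y.copy()
y_noisy[flip_idx] = 1 - y_noisy[flip_idx]

issues = detect_label_errors_cleanlab(X, y_noisy)
print("Most suspicious sample indices:", issues[:10])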

2.2 Data Outlier Detection

from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
import numpy as np

class DataQualityChecker:
    """Comprehensive data quality inspection toolkit"""

    def __init__(self):
        self.quality_report = {}

    def check_class_distribution(self, labels):
        """Check class imbalance"""
        from collections import Counter
        import pandas as pd

        counts = Counter(labels)
        total = len(labels)

        df = pd.DataFrame([
            {'class': c, 'count': n, 'percentage': 100 * n / total}
            for c, n in sorted(counts.items())
        ])

        imbalance_ratio = max(counts.values()) / min(counts.values())

        print("Class distribution:")
        print(df.to_string(index=False))
        print(f"\nImbalance ratio: {imbalance_ratio:.2f}x")

        if imbalance_ratio > 10:
            print("WARNING: Severe class imbalance!")
        elif imbalance_ratio > 3:
            print("NOTICE: Class imbalance detected")

        self.quality_report['class_imbalance_ratio'] = imbalance_ratio
        return df

    def detect_outliers(self, X, method='isolation_forest', contamination=0.1):
        """
        Outlier detection

        Args:
            method: 'isolation_forest' or 'lof'
            contamination: expected fraction of outliers
        """
        if method == 'isolation_forest':
            detector = IsolationForest(
                contamination=contamination,
                random_state=42
            )
        elif method == 'lof':
            # LocalOutlierFactor only supports fit_predict in outlier mode
            # (novelty=False, the default), so novelty is left unset here
            detector = LocalOutlierFactor(
                contamination=contamination
            )

        predictions = detector.fit_predict(X)
        outlier_mask = predictions == -1
        outlier_indices = np.where(outlier_mask)[0]

        print(f"Outliers detected: {outlier_mask.sum()} / {len(X)} ({outlier_mask.mean():.2%})")

        self.quality_report['n_outliers'] = outlier_mask.sum()
        return outlier_indices, outlier_mask

    def check_duplicates(self, X, y=None, threshold=0.99):
        """
        Detect duplicate samples

        Args:
            threshold: similarity threshold (1.0 = exact match)
        """
        from sklearn.metrics.pairwise import cosine_similarity

        if len(X) > 10000:
            sample_idx = np.random.choice(len(X), 10000, replace=False)
            X_sample = X[sample_idx]
        else:
            X_sample = X
            sample_idx = np.arange(len(X))

        sim_matrix = cosine_similarity(X_sample)
        np.fill_diagonal(sim_matrix, 0)

        duplicate_pairs = np.argwhere(sim_matrix >= threshold)
        duplicate_pairs = duplicate_pairs[duplicate_pairs[:, 0] < duplicate_pairs[:, 1]]

        print(f"Duplicate pairs found: {len(duplicate_pairs)}")

        if y is not None and len(duplicate_pairs) > 0:
            label_conflicts = 0
            for i, j in duplicate_pairs:
                if y[sample_idx[i]] != y[sample_idx[j]]:
                    label_conflicts += 1
            print(f"Label-conflicting duplicate pairs: {label_conflicts}")

        return duplicate_pairs

    def compute_data_quality_score(self, X, y):
        """Compute overall data quality score"""
        scores = {}

        if hasattr(X, 'isnull'):
            missing_rate = X.isnull().mean().mean()
        else:
            missing_rate = np.isnan(X).mean()
        scores['completeness'] = 1 - missing_rate

        from collections import Counter
        counts = Counter(y)
        n_classes = len(counts)
        ideal_count = len(y) / n_classes
        balance_score = sum(
            min(c, ideal_count) / ideal_count
            for c in counts.values()
        ) / n_classes
        scores['balance'] = balance_score

        overall_score = np.mean(list(scores.values()))
        scores['overall'] = overall_score

        print("Data quality scores:")
        for metric, score in scores.items():
            print(f"  {metric}: {score:.3f}")

        return scores
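
A minimal usage sketch of the checker on synthetic, imbalanced data (the dataset is an illustrative assumption):

from sklearn.datasets import make_classification

X, y = make_classification(
    n_samples=500, n_features=8, weights=[0.8, 0.2], random_state=1
)

checker = DataQualityChecker()
checker.check_class_distribution(y)
outlier_idx, _ = checker.detect_outliers(X, method='isolation_forest',
                                         contamination=0.05)
checker.check_duplicates(X, y)
checker.compute_data_quality_score(X, y)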

3. Labeling Strategies

3.1 High-Quality Labeling Guidelines

def compute_inter_rater_agreement(annotations):
    """
    Compute inter-rater agreement

    Args:
        annotations: array of shape (n_samples, n_raters)

    Returns:
        mean_kappa: mean pairwise Cohen's Kappa score
        majority_labels: majority-vote labels
    """
    from sklearn.metrics import cohen_kappa_score
    import numpy as np

    n_samples, n_raters = annotations.shape

    kappa_scores = []
    for i in range(n_raters):
        for j in range(i+1, n_raters):
            kappa = cohen_kappa_score(
                annotations[:, i],
                annotations[:, j]
            )
            kappa_scores.append((i, j, kappa))
            print(f"Rater {i} vs Rater {j}: kappa = {kappa:.3f}")

    mean_kappa = np.mean([k for _, _, k in kappa_scores])
    print(f"\nMean Cohen's Kappa: {mean_kappa:.3f}")

    if mean_kappa < 0.2:
        interpretation = "Slight agreement"
    elif mean_kappa < 0.4:
        interpretation = "Fair agreement"
    elif mean_kappa < 0.6:
        interpretation = "Moderate agreement"
    elif mean_kappa < 0.8:
        interpretation = "Substantial agreement"
    else:
        interpretation = "Almost perfect agreement"

    print(f"Interpretation: {interpretation}")

    from scipy import stats
    majority_labels = stats.mode(annotations, axis=1)[0].flatten()
    print(f"\nMajority-vote labels generated")

    return mean_kappa, majority_labels
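
A small usage sketch with a toy annotation matrix (three raters, six samples; the values are invented for illustration):

import numpy as np

annotations = np.array([
    [0, 0, 0],
    [1, 1, 1],
    [0, 1, 0],
    [1, 1, 0],
    [0, 0, 0],
    [1, 1, 1],
])

mean_kappa, majority = compute_inter_rater_agreement(annotations)
print("Majority-vote labels:", majority)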

3.2 Weak Supervision with Snorkel

def snorkel_programmatic_labeling_demo():
    """
    Programmatic labeling demo with Snorkel
    pip install snorkel
    """
    from snorkel.labeling import labeling_function, PandasLFApplier
    from snorkel.labeling.model import LabelModel
    import re

    POSITIVE = 1
    NEGATIVE = 0
    ABSTAIN = -1

    @labeling_function()
    def lf_positive_keywords(x):
        """Label based on positive keywords"""
        positive_words = ['good', 'great', 'excellent', 'amazing', 'love', 'best']
        if any(word in x.text.lower() for word in positive_words):
            return POSITIVE
        return ABSTAIN

    @labeling_function()
    def lf_negative_keywords(x):
        """Label based on negative keywords"""
        negative_words = ['bad', 'terrible', 'awful', 'hate', 'worst', 'horrible']
        if any(word in x.text.lower() for word in negative_words):
            return NEGATIVE
        return ABSTAIN

    @labeling_function()
    def lf_rating_high(x):
        """Label based on high rating"""
        if hasattr(x, 'rating') and x.rating >= 4:
            return POSITIVE
        return ABSTAIN

    @labeling_function()
    def lf_rating_low(x):
        """Label based on low rating"""
        if hasattr(x, 'rating') and x.rating <= 2:
            return NEGATIVE
        return ABSTAIN

    @labeling_function()
    def lf_negation_check(x):
        """Detect negation"""
        text = x.text.lower()
        if re.search(r"not (good|great|excellent)", text):
            return NEGATIVE
        if re.search(r"not (bad|terrible)", text):
            return POSITIVE
        return ABSTAIN

    lfs = [
        lf_positive_keywords,
        lf_negative_keywords,
        lf_rating_high,
        lf_rating_low,
        lf_negation_check,
    ]

    print("Snorkel Programmatic Labeling Pipeline:")
    print("1. Domain experts write labeling functions (LFs)")
    print("2. Apply LFs to unlabeled data")
    print("3. Combine multiple LFs with Label Model (noise-aware)")
    print("4. Train downstream model on soft labels")
    print(f"\nDefined labeling functions: {len(lfs)}")

    return lfs
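
Steps 2-4 of the printed pipeline can be sketched roughly as below. This assumes Snorkel is installed and that the unlabeled data sits in a pandas DataFrame with text and rating columns (the tiny df_train here is invented for illustration):

import pandas as pd
from snorkel.labeling import PandasLFApplier
from snorkel.labeling.model import LabelModel

lfs = snorkel_programmatic_labeling_demo()

df_train = pd.DataFrame({
    'text': [
        "great product, love it",
        "terrible, the worst purchase I ever made",
        "not good at all",
        "works fine, best value for the price",
    ],
    'rating': [5, 1, 2, 4],
})

# 2. Apply LFs to the unlabeled data -> label matrix L (n_samples x n_lfs)
applier = PandasLFApplier(lfs=lfs)
L_train = applier.apply(df=df_train)

# 3. Combine the noisy LF votes with the Label Model
label_model = LabelModel(cardinality=2, verbose=False)
label_model.fit(L_train=L_train, n_epochs=100, seed=42)

# 4. Probabilistic (soft) labels for training a downstream model
soft_labels = label_model.predict_proba(L=L_train)
print(soft_labels)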

4. Active Learning

Active learning minimizes labeling cost by selecting the most informative samples from a large unlabeled pool.

import numpy as np
from sklearn.metrics import accuracy_score

class ActiveLearner:
    """
    Active learning with multiple sampling strategies
    """

    def __init__(self, model, strategy='uncertainty', n_initial=100):
        self.model = model
        self.strategy = strategy
        self.n_initial = n_initial

    def uncertainty_sampling(self, X_unlabeled, n_samples):
        """
        Uncertainty sampling: select the samples the model is least confident about
        """
        probs = self._get_probs(X_unlabeled)

        if self.strategy == 'least_confidence':
            uncertainty = 1 - probs.max(axis=1)

        elif self.strategy == 'margin':
            sorted_probs = np.sort(probs, axis=1)[:, ::-1]
            uncertainty = 1 - (sorted_probs[:, 0] - sorted_probs[:, 1])

        elif self.strategy == 'entropy':
            uncertainty = -np.sum(probs * np.log(probs + 1e-10), axis=1)

        else:
            uncertainty = 1 - probs.max(axis=1)

        selected_indices = np.argsort(uncertainty)[-n_samples:]
        return selected_indices, uncertainty

    def diversity_sampling(self, X_unlabeled, X_labeled, n_samples):
        """
        Diversity-based sampling (CoreSet)
        Select samples most dissimilar to already-labeled data
        """
        from sklearn.metrics.pairwise import euclidean_distances

        selected = []
        remaining = list(range(len(X_unlabeled)))
        current_labeled = X_labeled.copy()

        for _ in range(n_samples):
            dists = euclidean_distances(
                X_unlabeled[remaining],
                current_labeled
            ).min(axis=1)

            best_idx = remaining[np.argmax(dists)]
            selected.append(best_idx)
            remaining.remove(best_idx)
            current_labeled = np.vstack([current_labeled, X_unlabeled[best_idx]])

        return np.array(selected)

    def batch_mode_active_learning(self, X_pool, y_oracle, X_test, y_test,
                                    n_iterations=10, n_per_iter=50):
        """
        Batch-mode active learning loop

        Args:
            X_pool: unlabeled data pool
            y_oracle: true labels (oracle)
            n_per_iter: number of samples to label per iteration
        """
        initial_indices = np.random.choice(
            len(X_pool), self.n_initial, replace=False
        )
        labeled_indices = list(initial_indices)
        unlabeled_indices = [
            i for i in range(len(X_pool)) if i not in labeled_indices
        ]

        accuracies = []
        n_labeled_list = []

        for iteration in range(n_iterations):
            X_labeled = X_pool[labeled_indices]
            y_labeled = y_oracle[labeled_indices]

            self.model.fit(X_labeled, y_labeled)

            acc = accuracy_score(y_test, self.model.predict(X_test))
            accuracies.append(acc)
            n_labeled_list.append(len(labeled_indices))

            print(f"Iteration {iteration+1}: n_labeled={len(labeled_indices)}, accuracy={acc:.3f}")

            if len(unlabeled_indices) == 0:
                break

            X_unlabeled = X_pool[unlabeled_indices]
            selected, _ = self.uncertainty_sampling(X_unlabeled, n_per_iter)

            actual_selected = [unlabeled_indices[i] for i in selected]
            labeled_indices.extend(actual_selected)
            unlabeled_indices = [
                i for i in unlabeled_indices if i not in actual_selected
            ]

        return accuracies, n_labeled_list

    def _get_probs(self, X):
        if hasattr(self.model, 'predict_proba'):
            return self.model.predict_proba(X)
        else:
            logits = self.model.predict(X)
            from scipy.special import softmax
            return softmax(logits, axis=1)
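
A minimal end-to-end sketch with a scikit-learn classifier on synthetic data (dataset, model, and budget sizes are illustrative assumptions):

from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split

X, y = make_classification(n_samples=3000, n_features=20, n_informative=10,
                           n_classes=3, random_state=0)
X_pool, X_test, y_pool, y_test = train_test_split(X, y, test_size=0.3,
                                                  random_state=0)

learner = ActiveLearner(LogisticRegression(max_iter=1000),
                        strategy='entropy', n_initial=100)
accuracies, n_labeled = learner.batch_mode_active_learning(
    X_pool, y_pool, X_test, y_test, n_iterations=5, n_per_iter=100
)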

5. Data Augmentation Deep Dive

5.1 Image Augmentation with Albumentations

import albumentations as A
from albumentations.pytorch import ToTensorV2
import numpy as np

def get_train_transforms(image_size=224):
    """
    Strong augmentation pipeline for training (Albumentations)
    """
    return A.Compose([
        # Geometric transforms
        A.RandomResizedCrop(
            height=image_size,
            width=image_size,
            scale=(0.7, 1.0),
            ratio=(0.75, 1.33)
        ),
        A.HorizontalFlip(p=0.5),
        A.ShiftScaleRotate(
            shift_limit=0.1,
            scale_limit=0.2,
            rotate_limit=30,
            p=0.5
        ),

        # Color transforms
        A.ColorJitter(
            brightness=0.3,
            contrast=0.3,
            saturation=0.3,
            hue=0.1,
            p=0.8
        ),
        A.ToGray(p=0.1),
        A.RandomGamma(gamma_limit=(80, 120), p=0.3),

        # Noise and blur
        A.GaussNoise(var_limit=(10, 50), p=0.3),
        A.OneOf([
            A.MotionBlur(blur_limit=7),
            A.GaussianBlur(blur_limit=7),
            A.MedianBlur(blur_limit=7),
        ], p=0.3),

        # Cutout / random erasing
        A.CoarseDropout(
            max_holes=8,
            max_height=32,
            max_width=32,
            fill_value=0,
            p=0.3
        ),

        # Grid distortion
        A.OneOf([
            A.GridDistortion(p=1),
            A.ElasticTransform(p=1),
            A.OpticalDistortion(p=1),
        ], p=0.2),

        # Normalize and convert to tensor
        A.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225]
        ),
        ToTensorV2(),
    ])


def mixup_augmentation(images, labels, alpha=0.4):
    """
    MixUp: blend two images and their labels
    Zhang et al., "mixup: Beyond Empirical Risk Minimization" (2018)
    """
    import torch

    batch_size = images.shape[0]
    lam = np.random.beta(alpha, alpha)
    perm = torch.randperm(batch_size)

    mixed_images = lam * images + (1 - lam) * images[perm]
    labels_a = labels
    labels_b = labels[perm]

    return mixed_images, labels_a, labels_b, lam


def cutmix_augmentation(images, labels, alpha=1.0):
    """
    CutMix: paste a patch from one image onto another
    Yun et al., "CutMix: Regularization Strategy to Train Strong
    Classifiers with Localizable Features" (2019)
    """
    import torch

    batch_size, c, h, w = images.shape
    lam = np.random.beta(alpha, alpha)
    perm = torch.randperm(batch_size)

    cut_ratio = np.sqrt(1 - lam)
    cut_h = int(h * cut_ratio)
    cut_w = int(w * cut_ratio)

    cx = np.random.randint(w)
    cy = np.random.randint(h)

    bbx1 = np.clip(cx - cut_w // 2, 0, w)
    bby1 = np.clip(cy - cut_h // 2, 0, h)
    bbx2 = np.clip(cx + cut_w // 2, 0, w)
    bby2 = np.clip(cy + cut_h // 2, 0, h)

    mixed_images = images.clone()
    mixed_images[:, :, bby1:bby2, bbx1:bbx2] = images[perm, :, bby1:bby2, bbx1:bbx2]

    lam = 1 - ((bbx2 - bbx1) * (bby2 - bby1) / (w * h))
    labels_a = labels
    labels_b = labels[perm]

    return mixed_images, labels_a, labels_b, lam
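
Both functions return two label sets and a mixing coefficient lam; the training loss is interpolated with that same coefficient. A minimal training-step sketch (model, optimizer, images, and labels are assumed to come from your own training loop):

import torch
import torch.nn as nn

criterion = nn.CrossEntropyLoss()

def train_step_with_mixup(model, optimizer, images, labels, alpha=0.4):
    mixed, labels_a, labels_b, lam = mixup_augmentation(images, labels, alpha)
    preds = model(mixed)
    # Interpolate the loss with the same coefficient used to mix the inputs
    loss = lam * criterion(preds, labels_a) + (1 - lam) * criterion(preds, labels_b)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    return loss.item()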

5.2 Text Augmentation

import numpy as np

class TextAugmenter:
    """
    Text data augmentation techniques
    """

    def __init__(self):
        pass

    def eda_synonym_replacement(self, text, n=1):
        """
        EDA: Synonym Replacement
        Wei and Zou, "EDA: Easy Data Augmentation Techniques
        for Boosting Performance on Text Classification Tasks" (2019)
        """
        import nltk
        from nltk.corpus import wordnet  # requires nltk.download('wordnet')

        words = text.split()
        new_words = words.copy()

        stop_words = set(['a', 'an', 'the', 'is', 'are', 'was', 'were',
                          'i', 'me', 'my', 'we', 'our', 'you', 'your'])

        replaceable = [
            (i, word) for i, word in enumerate(words)
            if word.lower() not in stop_words
        ]

        np.random.shuffle(replaceable)
        replaced = 0

        for idx, word in replaceable:
            if replaced >= n:
                break

            synsets = wordnet.synsets(word)
            if synsets:
                synonyms = [
                    lemma.name() for synset in synsets
                    for lemma in synset.lemmas()
                    if lemma.name() != word
                ]
                if synonyms:
                    new_words[idx] = np.random.choice(synonyms).replace('_', ' ')
                    replaced += 1

        return ' '.join(new_words)

    def eda_random_swap(self, text, n=1):
        """EDA: Random Swap"""
        words = text.split()
        if len(words) < 2:
            return text

        new_words = words.copy()
        for _ in range(n):
            i, j = np.random.choice(len(new_words), 2, replace=False)
            new_words[i], new_words[j] = new_words[j], new_words[i]

        return ' '.join(new_words)

    def eda_random_deletion(self, text, p=0.1):
        """EDA: Random Deletion"""
        words = text.split()
        if len(words) == 1:
            return text

        new_words = [word for word in words if np.random.random() > p]
        return ' '.join(new_words) if new_words else np.random.choice(words)

    def back_translation(self, text, src_lang='en', pivot_lang='fr'):
        """
        Back-Translation: en -> fr -> en
        Preserves meaning while diversifying expression
        """
        try:
            from transformers import pipeline

            translator_fwd = pipeline(
                f"translation_{src_lang}_to_{pivot_lang}",
                model=f"Helsinki-NLP/opus-mt-{src_lang}-{pivot_lang}"
            )

            translator_bwd = pipeline(
                f"translation_{pivot_lang}_to_{src_lang}",
                model=f"Helsinki-NLP/opus-mt-{pivot_lang}-{src_lang}"
            )

            pivot_text = translator_fwd(text)[0]['translation_text']
            back_translated = translator_bwd(pivot_text)[0]['translation_text']

            return back_translated

        except Exception as e:
            print(f"Translation error: {e}")
            return text

    def augment_dataset(self, texts, labels, n_aug=4):
        """Augment entire dataset"""
        augmented_texts = []
        augmented_labels = []

        for text, label in zip(texts, labels):
            augmented_texts.append(text)
            augmented_labels.append(label)

            for _ in range(n_aug):
                aug_type = np.random.choice(
                    ['synonym', 'swap', 'deletion']
                )

                if aug_type == 'synonym':
                    aug_text = self.eda_synonym_replacement(text)
                elif aug_type == 'swap':
                    aug_text = self.eda_random_swap(text)
                else:
                    aug_text = self.eda_random_deletion(text)

                augmented_texts.append(aug_text)
                augmented_labels.append(label)

        print(f"Original samples: {len(texts)}")
        print(f"Augmented samples: {len(augmented_texts)}")

        return augmented_texts, augmented_labels

5.3 Automatic Augmentation (RandAugment, SpecAugment)

import torch
import torchvision.transforms as transforms

def get_randaugment_transforms(n=2, m=9, image_size=224):
    """
    RandAugment: randomized augmentation policy
    Cubuk et al., "RandAugment: Practical Automated Data Augmentation" (2019)

    Args:
        n: number of augmentation operations to apply
        m: magnitude of augmentation (0-30)
    """
    transform = transforms.Compose([
        transforms.RandomResizedCrop(image_size),
        transforms.RandAugment(num_ops=n, magnitude=m),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225]
        )
    ])

    return transform


def specaugment_for_audio(spectrogram, freq_mask_param=27, time_mask_param=70):
    """
    SpecAugment: audio spectrogram augmentation
    Park et al., "SpecAugment: A Simple Data Augmentation Method
    for Automatic Speech Recognition" (2019)

    Args:
        spectrogram: input spectrogram (freq, time)
        freq_mask_param: max frequency mask size
        time_mask_param: max time mask size
    """
    import torchaudio.transforms as T

    freq_mask = T.FrequencyMasking(freq_mask_param=freq_mask_param)
    time_mask = T.TimeMasking(time_mask_param=time_mask_param)

    augmented = freq_mask(spectrogram)
    augmented = time_mask(augmented)

    return augmented

6. Synthetic Data Generation

6.1 Synthetic Text with LLMs

class SyntheticTextGenerator:
    """Generate synthetic training data using LLMs"""

    def __init__(self, llm_client, model_name='gpt-4'):
        self.llm = llm_client
        self.model_name = model_name

    def generate_classification_data(self, class_name, n_samples=100,
                                     domain='general', style='diverse'):
        """
        Generate synthetic data for a classification class

        Args:
            class_name: name of the target class
            n_samples: number of samples to generate
            domain: domain context (medical, legal, etc.)
            style: writing style (formal, casual, diverse)
        """
        prompt = f"""Generate {n_samples} diverse text examples for the class '{class_name}'.
Domain: {domain}
Style: {style}

Requirements:
- Each example should be 1-3 sentences
- Vary the vocabulary, sentence structure, and perspective
- Include both simple and complex cases
- Format as a JSON list: ["example1", "example2", ...]

Generate realistic examples that would appear in real-world {domain} data."""

        print(f"Synthetic data generation prompt for '{class_name}':")
        print(prompt[:300] + "...")
        print(f"\nPlanning to generate {n_samples} samples")

    def generate_edge_cases(self, class_examples, n_edge_cases=20):
        """Generate challenging edge cases"""
        prompt = f"""Based on these training examples:
{chr(10).join(class_examples[:5])}

Generate {n_edge_cases} challenging edge cases that:
1. Are ambiguous between different categories
2. Contain misleading keywords
3. Have unusual sentence structures
4. Test the model's true understanding

Format as JSON list."""

        print("Edge case generation prompt ready")

    def augment_with_paraphrase(self, texts, n_paraphrases=3):
        """Generate paraphrases using LLM"""
        augmented = []

        for text in texts:
            prompt = f"""Paraphrase the following text {n_paraphrases} times.
Keep the same meaning but use different words and sentence structures.

Original: "{text}"

Format as JSON list of {n_paraphrases} paraphrases."""

            augmented.append({
                'original': text,
                'paraphrases': []
            })

        return augmented


class SyntheticImageGenerator:
    """Generate synthetic images with Diffusion Models"""

    def __init__(self, model_name='stabilityai/stable-diffusion-2-1'):
        self.model_name = model_name

    def setup_pipeline(self):
        """
        Initialize Stable Diffusion pipeline
        pip install diffusers accelerate
        """
        try:
            from diffusers import StableDiffusionPipeline
            import torch

            self.pipe = StableDiffusionPipeline.from_pretrained(
                self.model_name,
                torch_dtype=torch.float16
            )

            if torch.cuda.is_available():
                self.pipe = self.pipe.to('cuda')

            print(f"Pipeline initialized: {self.model_name}")
        except Exception as e:
            print(f"Pipeline initialization error: {e}")

    def generate_class_images(self, class_name, n_images=50,
                               style_prompt="high quality, photorealistic"):
        """
        Generate synthetic images for a class

        Args:
            class_name: target class name
            n_images: number of images to generate
            style_prompt: style guidance
        """
        prompts = [
            f"A photo of {class_name}, {style_prompt}",
            f"{class_name} in natural environment, {style_prompt}",
            f"Close-up of {class_name}, detailed, {style_prompt}",
            f"{class_name} from different angle, {style_prompt}",
        ]

        print(f"Synthetic image generation plan for '{class_name}':")
        print(f"Images to generate: {n_images}")
        print("Sample prompts:")
        for p in prompts[:2]:
            print(f"  - {p}")

    def evaluate_synthetic_quality(self, real_images, synthetic_images):
        """Evaluate synthetic image quality using FID score"""
        try:
            from torchmetrics.image.fid import FrechetInceptionDistance

            fid = FrechetInceptionDistance(feature=64)
            fid.update(real_images, real=True)
            fid.update(synthetic_images, real=False)

            fid_score = fid.compute()
            print(f"FID Score: {fid_score:.2f}")
            print("(Lower is better; 0 is perfect)")

            return fid_score
        except Exception as e:
            print(f"FID computation error: {e}")

7. The Data Flywheel

7.1 Data Flywheel Concept

The data flywheel is a virtuous cycle of product, data, and model:

  1. Better model → Better product
  2. Better product → More users
  3. More users → More data
  4. More data → Better model

import numpy as np

class DataFlywheelPipeline:
    """Data Flywheel implementation pipeline"""

    def __init__(self, model, feedback_store):
        self.model = model
        self.feedback_store = feedback_store
        self.version = 0

    def collect_production_feedback(self, predictions, user_feedback):
        """
        Collect user feedback from production

        Args:
            predictions: model predictions
            user_feedback: user corrections/confirmations
        """
        valuable_samples = []

        for pred, feedback in zip(predictions, user_feedback):
            if feedback['corrected']:
                sample = {
                    'input': feedback['input'],
                    'model_prediction': pred,
                    'true_label': feedback['correction'],
                    'confidence': pred['confidence'],
                    'timestamp': feedback['timestamp'],
                    'value': 'high'
                }
                valuable_samples.append(sample)

            elif feedback['confirmed'] and pred['confidence'] < 0.7:
                sample = {
                    'input': feedback['input'],
                    'true_label': pred['label'],
                    'confidence': pred['confidence'],
                    'value': 'medium'
                }
                valuable_samples.append(sample)

        print(f"Valuable samples collected: {len(valuable_samples)}")
        return valuable_samples

    def prioritize_labeling_queue(self, unlabeled_pool, budget):
        """
        Prioritize labeling queue

        Priority criteria:
        1. Model uncertainty (higher = more priority)
        2. Class rarity (rarer classes = more priority)
        3. Data diversity (more different from existing = more priority)
        """
        priorities = []

        for sample in unlabeled_pool:
            score = 0

            # 1) Uncertainty: low top-class probability -> higher priority
            uncertainty = 1 - max(sample['predicted_probs'].values())
            score += uncertainty * 0.5

            # 2) Rarity: rarer predicted classes get higher priority
            predicted_class = max(sample['predicted_probs'],
                                  key=sample['predicted_probs'].get)
            rarity = 1 / (sample['class_counts'].get(predicted_class, 1) + 1)
            score += rarity * 0.3

            # 3) Spread of the predicted probabilities as a rough diversity proxy
            diversity = np.std(list(sample['predicted_probs'].values()))
            score += diversity * 0.2

            priorities.append((sample, score))

        priorities.sort(key=lambda x: x[1], reverse=True)
        selected = [s for s, _ in priorities[:budget]]

        return selected

8. Data Pipeline Best Practices

8.1 Reproducible Data Processing

import hashlib
import json
import os
from pathlib import Path
from datetime import datetime

import numpy as np

class ReproducibleDataPipeline:
    """
    Reproducible data pipeline
    - Tracks all processing steps
    - Verifies integrity via data hashes
    - Supports version control
    """

    def __init__(self, pipeline_name, base_dir='data/processed'):
        self.pipeline_name = pipeline_name
        self.base_dir = Path(base_dir)
        self.steps = []
        self.metadata = {
            'pipeline': pipeline_name,
            'created_at': datetime.now().isoformat(),
            'steps': []
        }

    def add_step(self, step_name, func, *args, **kwargs):
        """Add processing step"""
        self.steps.append({
            'name': step_name,
            'func': func,
            'args': args,
            'kwargs': kwargs
        })

    def compute_hash(self, data):
        """Compute data hash"""
        if isinstance(data, np.ndarray):
            return hashlib.md5(data.tobytes()).hexdigest()
        elif isinstance(data, (list, dict)):
            return hashlib.md5(
                json.dumps(data, sort_keys=True, default=str).encode()
            ).hexdigest()
        else:
            return hashlib.md5(str(data).encode()).hexdigest()

    def run(self, input_data):
        """Execute pipeline"""
        data = input_data

        for step in self.steps:
            print(f"Running: {step['name']}")

            hash_before = self.compute_hash(data)
            data = step['func'](data, *step['args'], **step['kwargs'])
            hash_after = self.compute_hash(data)

            self.metadata['steps'].append({
                'name': step['name'],
                'hash_before': hash_before,
                'hash_after': hash_after,
                'timestamp': datetime.now().isoformat()
            })

            print(f"  Done: {hash_before[:8]} -> {hash_after[:8]}")

        metadata_path = self.base_dir / f"{self.pipeline_name}_metadata.json"
        metadata_path.parent.mkdir(parents=True, exist_ok=True)
        with open(metadata_path, 'w') as f:
            json.dump(self.metadata, f, indent=2)

        print(f"\nPipeline complete. Metadata: {metadata_path}")
        return data
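
# Minimal usage sketch of ReproducibleDataPipeline with two toy processing
# steps (the step functions and the input array are invented for illustration,
# not part of the pipeline API):

def drop_nan_rows(data):
    return data[~np.isnan(data).any(axis=1)]

def standardize(data):
    return (data - data.mean(axis=0)) / (data.std(axis=0) + 1e-8)

pipeline = ReproducibleDataPipeline('toy_cleaning')
pipeline.add_step('drop_nan_rows', drop_nan_rows)
pipeline.add_step('standardize', standardize)

raw = np.array([[1.0, 2.0], [np.nan, 3.0], [4.0, 5.0]])
clean = pipeline.run(raw)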


class DataVersionControl:
    """DVC-style data version control"""

    def __init__(self, storage_path='data/.dvc'):
        self.storage_path = Path(storage_path)
        self.storage_path.mkdir(parents=True, exist_ok=True)

    def add(self, data_path):
        """Start tracking a data file"""
        data_path = Path(data_path)

        with open(data_path, 'rb') as f:
            file_hash = hashlib.md5(f.read()).hexdigest()

        dvc_file = data_path.with_suffix('.dvc')
        dvc_metadata = {
            'md5': file_hash,
            'size': os.path.getsize(data_path),
            'path': str(data_path.name),
            'version': datetime.now().isoformat()
        }

        with open(dvc_file, 'w') as f:
            json.dump(dvc_metadata, f, indent=2)

        print(f"Tracking: {data_path}")
        print(f"  MD5: {file_hash}")
        print(f"  Metafile: {dvc_file}")

        return file_hash

    def create_data_contract(self, schema):
        """
        Define a Data Contract
        - Schema definition
        - Quality criteria
        - SLA requirements
        """
        contract = {
            'version': '1.0',
            'schema': schema,
            'quality_rules': {
                'completeness': {'min_threshold': 0.99},
                'accuracy': {'label_error_rate': {'max': 0.05}},
                'consistency': {'duplicate_rate': {'max': 0.01}},
            },
            'sla': {
                'update_frequency': 'daily',
                'max_staleness_hours': 24,
            }
        }

        return contract


def demonstrate_full_data_pipeline():
    """Full Data-Centric AI pipeline demonstration"""
    print("=" * 60)
    print("Data-Centric AI Pipeline Demo")
    print("=" * 60)

    print("\nStep 1: Data Quality Assessment")
    print("  - Measure label error rate")
    print("  - Detect outliers")
    print("  - Remove duplicates")
    print("  - Analyze class distribution")

    print("\nStep 2: Label Refinement")
    print("  - Detect errors with Cleanlab")
    print("  - Correct via majority vote / expert review")
    print("  - Improve inter-rater agreement")

    print("\nStep 3: Data Augmentation")
    print("  - Images: Albumentations")
    print("  - Text: EDA, back-translation")
    print("  - Search for automatic augmentation policies")

    print("\nStep 4: Synthetic Data Generation")
    print("  - Synthesize text with LLMs")
    print("  - Synthesize images with Diffusion Models")
    print("  - Filter by quality (FID, classifier confidence)")

    print("\nStep 5: Active Learning")
    print("  - Prioritize labeling with uncertainty sampling")
    print("  - Ensure diversity with CoreSet method")

    print("\nStep 6: Version Control and Monitoring")
    print("  - Version data with DVC")
    print("  - Maintain quality standards with Data Contracts")
    print("  - Continuously improve with the Data Flywheel")

    print("\nConclusion: Improving data quality is often more impactful than improving models!")

9. Summary and Practical Guide

Data-Centric AI Checklist

1. Data Collection

  • Write labeling guidelines with domain experts
  • Measure inter-rater agreement (target Cohen's Kappa > 0.8)
  • Monitor class distribution during collection

2. Data Cleaning

  • Detect and fix label errors with Cleanlab
  • Remove duplicate samples
  • Review outliers (remove or correct)

3. Data Augmentation

  • Apply augmentation only to training data, never to validation/test (see the sketch after this list)
  • Validate data distribution after augmentation
  • Choose augmentation techniques appropriate for your domain
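
A minimal sketch of this split using torchvision transforms (the exact transform choices are illustrative):

import torchvision.transforms as transforms

# Training split: random augmentation
train_transform = transforms.Compose([
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225]),
])

# Validation/test splits: deterministic preprocessing only
eval_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225]),
])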

4. Continuous Improvement

  • Collect production error cases
  • Use active learning to make labeling efficient
  • Conduct regular data quality audits

Recommended Tools (all demonstrated in this guide):

  • Cleanlab: label error detection with Confident Learning
  • Snorkel: programmatic labeling (weak supervision)
  • Albumentations / torchvision: image augmentation
  • Hugging Face Transformers and Diffusers: back-translation and synthetic data generation
  • DVC-style versioning and Data Contracts: reproducible, quality-controlled data pipelines

Data-Centric AI is not merely a matter of tools or techniques. It is a mindset shift — from chasing "better models" to chasing "better data." In many real-world projects, this shift alone can deliver dramatic performance improvements.

The most important insight from Andrew Ng's Data-Centric AI movement: your model is only as good as the data it learns from. Investing in data quality, labeling consistency, and systematic data improvement is often the highest-return activity in an AI project.