Data-Centric AI Complete Guide: Maximizing AI Performance with High-Quality Data

By Youngju Kim (@fjvbn20031)
In 2021, Andrew Ng posed a provocative question to the AI community: "Instead of focusing on improving model architectures, what if we focused on improving data quality?" This became the starting point of the Data-Centric AI movement.
While the traditional model-centric approach chases "better algorithms," the data-centric approach chases "better data." This guide covers every aspect of Data-Centric AI with hands-on code, from theory to production practice.
1. Data-Centric AI vs Model-Centric AI
1.1 The Paradigm Shift
Model-Centric AI
- Data is fixed; improve the code
- Search for better architectures
- Focus on hyperparameter tuning
- Classic benchmarks: dataset is fixed, only models change
Data-Centric AI
- Model is fixed; improve the data
- Fix label errors
- Improve labeling guidelines for consistency
- Add data augmentation and synthetic data
1.2 Andrew Ng's Core Argument
Andrew Ng states:
"AI system = Code (model/algorithm) + Data"
In many practical AI projects, the code is already good enough. The bottleneck is data quality.
Experiment Results (Andrew Ng, DeepLearning.AI):
On a manufacturing inspection dataset with label noise:
- Baseline: 76.2%
- Better model only: +0.02% improvement (76.22%)
- Better data only: +16.9% improvement (93.1%)
This result demonstrates that in many real-world scenarios, improving data quality is far more effective than improving model architecture.
1.3 When Is Data-Centric Most Effective?
Data-centric approaches are especially impactful when:
- Small datasets: Quality matters more when you have fewer than a few thousand examples
- High label noise: When the label error rate exceeds 5%
- Domain-specific tasks: Specialized domains without strong pre-trained models
- Imbalanced classes: Rare class quality determines overall performance
- Strict accuracy requirements: Medical, financial, or safety-critical applications
```python
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

def compare_model_vs_data_centric(X, y, model_class, noise_level=0.1):
    """
    Compare model-centric vs data-centric approaches.

    Args:
        X: feature matrix
        y: labels
        model_class: base model class
        noise_level: fraction of noisy labels
    """
    # Add label noise
    noisy_y = y.copy()
    noise_idx = np.random.choice(len(y), int(len(y) * noise_level), replace=False)
    n_classes = len(np.unique(y))
    for idx in noise_idx:
        wrong_labels = [l for l in range(n_classes) if l != y[idx]]
        noisy_y[idx] = np.random.choice(wrong_labels)

    X_train, X_test, y_train_clean, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    _, _, y_train_noisy, _ = train_test_split(
        X, noisy_y, test_size=0.2, random_state=42
    )

    # --- Model-centric approach ---
    # Noisy data + base model
    base_model = model_class()
    base_model.fit(X_train, y_train_noisy)
    base_acc = accuracy_score(y_test, base_model.predict(X_test))

    # Noisy data + more complex model
    from sklearn.ensemble import GradientBoostingClassifier
    complex_model = GradientBoostingClassifier(n_estimators=200)
    complex_model.fit(X_train, y_train_noisy)
    complex_acc = accuracy_score(y_test, complex_model.predict(X_test))

    # --- Data-centric approach ---
    # Clean data + base model
    clean_model = model_class()
    clean_model.fit(X_train, y_train_clean)
    clean_acc = accuracy_score(y_test, clean_model.predict(X_test))

    print("=" * 50)
    print("Model-Centric vs Data-Centric Comparison")
    print("=" * 50)
    print(f"Base model + noisy data:    {base_acc:.3f}")
    print(f"Complex model + noisy data: {complex_acc:.3f}")
    print(f"Base model + clean data:    {clean_acc:.3f}")
    print(f"\nModel improvement effect: +{(complex_acc - base_acc):.3f}")
    print(f"Data improvement effect:  +{(clean_acc - base_acc):.3f}")

    return {
        'base_model_noisy_data': base_acc,
        'complex_model_noisy_data': complex_acc,
        'base_model_clean_data': clean_acc
    }
```
2. Data Quality Measurement
2.1 Confident Learning and Label Error Detection
Confident Learning, proposed by Northcutt et al., uses cross-validated prediction probabilities to systematically detect label errors.
Core idea: "If the model predicts class A with high confidence but the label says class B, the label is likely wrong."
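The core idea fits in a few lines. Here is a toy sketch with invented probabilities (standing in for the cross-validated predicted probabilities that Confident Learning consumes): a sample is flagged as a suspected label error when the model is confident and its prediction disagrees with the given label.

```python
import numpy as np

# Invented cross-validated class probabilities for a 2-class toy problem
pred_probs = np.array([
    [0.95, 0.05],  # model very confident: class 0
    [0.10, 0.90],  # model very confident: class 1
    [0.92, 0.08],  # confident class 0 ...
])
labels = np.array([0, 1, 1])  # ... but sample 2 is labeled 1 -> suspect

confident = pred_probs.max(axis=1) > 0.9
disagree = pred_probs.argmax(axis=1) != labels
suspects = np.where(confident & disagree)[0]
print(suspects)  # [2]
```

Cleanlab generalizes exactly this check with per-class confidence thresholds instead of a fixed 0.9 cutoff.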
```python
import numpy as np
from cleanlab.filter import find_label_issues
from cleanlab.classification import CleanLearning
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from sklearn.model_selection import cross_val_predict

def detect_label_errors_cleanlab(X, y, model=None):
    """
    Detect label errors using Cleanlab.

    Args:
        X: feature matrix
        y: label array
        model: classifier (default: LogisticRegression)
    Returns:
        label_issues: indices and info about label issues
    """
    if model is None:
        model = LogisticRegression(max_iter=1000)

    # Predict class probabilities via cross-validation
    pred_probs = cross_val_predict(
        model, X, y,
        cv=5,
        method='predict_proba'
    )

    # Find label issues
    label_issues = find_label_issues(
        labels=y,
        pred_probs=pred_probs,
        return_indices_ranked_by='self_confidence'
    )

    print(f"Total samples: {len(y)}")
    print(f"Label issues found: {len(label_issues)}")
    print(f"Error rate: {len(label_issues)/len(y):.2%}")
    return label_issues

def cleanlab_full_pipeline(X_train, y_train_noisy, X_test, y_test):
    """
    Complete Cleanlab pipeline:
    1. Detect label errors
    2. Remove or correct errors
    3. Retrain on cleaned data
    """
    base_model = LogisticRegression(max_iter=1000)

    # CleanLearning automatically handles label errors during training
    cl = CleanLearning(base_model, seed=42)
    cl.fit(X_train, y_train_noisy)

    y_pred = cl.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    print(f"CleanLearning accuracy: {accuracy:.3f}")

    label_issues_df = cl.get_label_issues()
    print("\nLabel issues info:")
    print(label_issues_df.head(10))
    return cl, label_issues_df

def confident_learning_manual(pred_probs, labels):
    """
    Manual implementation of Confident Learning:
    - Compute per-class thresholds
    - Build the Confident Joint matrix
    """
    n_classes = pred_probs.shape[1]
    n_samples = len(labels)

    # Per-class threshold: mean predicted probability for that class's samples
    thresholds = np.zeros(n_classes)
    for c in range(n_classes):
        class_mask = labels == c
        if class_mask.sum() > 0:
            thresholds[c] = pred_probs[class_mask, c].mean()

    # Confident Joint matrix C[s][y]
    # s: estimated true class, y: given label
    C = np.zeros((n_classes, n_classes), dtype=int)
    for i in range(n_samples):
        y_given = labels[i]
        over_threshold = pred_probs[i] >= thresholds
        if over_threshold.sum() == 0:
            y_hat = pred_probs[i].argmax()
        else:
            y_hat = (pred_probs[i] * over_threshold).argmax()
        C[y_hat, y_given] += 1

    off_diagonal = C.copy()
    np.fill_diagonal(off_diagonal, 0)
    print("Confident Joint Matrix (rows: estimated true class, cols: given label):")
    print(C)
    print(f"\nEstimated mislabeled samples: {off_diagonal.sum()}")
    return C
```
2.2 Data Outlier Detection
```python
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor

class DataQualityChecker:
    """Comprehensive data quality inspection toolkit"""

    def __init__(self):
        self.quality_report = {}

    def check_class_distribution(self, labels):
        """Check class imbalance"""
        from collections import Counter
        import pandas as pd

        counts = Counter(labels)
        total = len(labels)
        df = pd.DataFrame([
            {'class': c, 'count': n, 'percentage': 100 * n / total}
            for c, n in sorted(counts.items())
        ])
        imbalance_ratio = max(counts.values()) / min(counts.values())

        print("Class distribution:")
        print(df.to_string(index=False))
        print(f"\nImbalance ratio: {imbalance_ratio:.2f}x")
        if imbalance_ratio > 10:
            print("WARNING: Severe class imbalance!")
        elif imbalance_ratio > 3:
            print("NOTICE: Class imbalance detected")

        self.quality_report['class_imbalance_ratio'] = imbalance_ratio
        return df

    def detect_outliers(self, X, method='isolation_forest', contamination=0.1):
        """
        Outlier detection.

        Args:
            method: 'isolation_forest' or 'lof'
            contamination: expected fraction of outliers
        """
        if method == 'isolation_forest':
            detector = IsolationForest(
                contamination=contamination,
                random_state=42
            )
        elif method == 'lof':
            # novelty must stay False (the default): fit_predict is only
            # available in outlier-detection mode
            detector = LocalOutlierFactor(contamination=contamination)
        else:
            raise ValueError(f"Unknown method: {method}")

        predictions = detector.fit_predict(X)
        outlier_mask = predictions == -1
        outlier_indices = np.where(outlier_mask)[0]

        print(f"Outliers detected: {outlier_mask.sum()} / {len(X)} ({outlier_mask.mean():.2%})")
        self.quality_report['n_outliers'] = int(outlier_mask.sum())
        return outlier_indices, outlier_mask

    def check_duplicates(self, X, y=None, threshold=0.99):
        """
        Detect duplicate samples.

        Args:
            threshold: similarity threshold (1.0 = exact match)
        """
        from sklearn.metrics.pairwise import cosine_similarity

        # Subsample for tractability on large datasets
        if len(X) > 10000:
            sample_idx = np.random.choice(len(X), 10000, replace=False)
            X_sample = X[sample_idx]
        else:
            X_sample = X
            sample_idx = np.arange(len(X))

        sim_matrix = cosine_similarity(X_sample)
        np.fill_diagonal(sim_matrix, 0)

        duplicate_pairs = np.argwhere(sim_matrix >= threshold)
        duplicate_pairs = duplicate_pairs[duplicate_pairs[:, 0] < duplicate_pairs[:, 1]]
        print(f"Duplicate pairs found: {len(duplicate_pairs)}")

        if y is not None and len(duplicate_pairs) > 0:
            label_conflicts = 0
            for i, j in duplicate_pairs:
                if y[sample_idx[i]] != y[sample_idx[j]]:
                    label_conflicts += 1
            print(f"Label-conflicting duplicate pairs: {label_conflicts}")

        return duplicate_pairs

    def compute_data_quality_score(self, X, y):
        """Compute overall data quality score"""
        from collections import Counter

        scores = {}

        # Completeness: fraction of non-missing values
        if hasattr(X, 'isnull'):
            missing_rate = X.isnull().mean().mean()
        else:
            missing_rate = np.isnan(X).mean()
        scores['completeness'] = 1 - missing_rate

        # Balance: how close each class is to the ideal uniform count
        counts = Counter(y)
        n_classes = len(counts)
        ideal_count = len(y) / n_classes
        balance_score = sum(
            min(c, ideal_count) / ideal_count
            for c in counts.values()
        ) / n_classes
        scores['balance'] = balance_score

        overall_score = np.mean(list(scores.values()))
        scores['overall'] = overall_score

        print("Data quality scores:")
        for metric, score in scores.items():
            print(f"  {metric}: {score:.3f}")
        return scores
```
3. Labeling Strategies
3.1 High-Quality Labeling Guidelines
```python
import numpy as np
from scipy import stats
from sklearn.metrics import cohen_kappa_score

def compute_inter_rater_agreement(annotations):
    """
    Compute inter-rater agreement.

    Args:
        annotations: array of shape (n_samples, n_raters)
    Returns:
        mean_kappa: mean pairwise Cohen's Kappa
        majority_labels: majority-vote labels
    """
    n_samples, n_raters = annotations.shape
    kappa_scores = []
    for i in range(n_raters):
        for j in range(i + 1, n_raters):
            kappa = cohen_kappa_score(
                annotations[:, i],
                annotations[:, j]
            )
            kappa_scores.append((i, j, kappa))
            print(f"Rater {i} vs Rater {j}: kappa = {kappa:.3f}")

    mean_kappa = np.mean([k for _, _, k in kappa_scores])
    print(f"\nMean Cohen's Kappa: {mean_kappa:.3f}")

    # Landis & Koch interpretation bands
    if mean_kappa < 0.2:
        interpretation = "Slight agreement"
    elif mean_kappa < 0.4:
        interpretation = "Fair agreement"
    elif mean_kappa < 0.6:
        interpretation = "Moderate agreement"
    elif mean_kappa < 0.8:
        interpretation = "Substantial agreement"
    else:
        interpretation = "Almost perfect agreement"
    print(f"Interpretation: {interpretation}")

    # keepdims=True preserves the pre-1.9 scipy return shape
    majority_labels = stats.mode(annotations, axis=1, keepdims=True)[0].flatten()
    print("\nMajority-vote labels generated")
    return mean_kappa, majority_labels
```
3.2 Weak Supervision with Snorkel
```python
def snorkel_programmatic_labeling_demo():
    """
    Programmatic labeling demo with Snorkel.
    pip install snorkel
    """
    import re
    from snorkel.labeling import labeling_function, PandasLFApplier
    from snorkel.labeling.model import LabelModel

    POSITIVE = 1
    NEGATIVE = 0
    ABSTAIN = -1

    @labeling_function()
    def lf_positive_keywords(x):
        """Label based on positive keywords"""
        positive_words = ['good', 'great', 'excellent', 'amazing', 'love', 'best']
        if any(word in x.text.lower() for word in positive_words):
            return POSITIVE
        return ABSTAIN

    @labeling_function()
    def lf_negative_keywords(x):
        """Label based on negative keywords"""
        negative_words = ['bad', 'terrible', 'awful', 'hate', 'worst', 'horrible']
        if any(word in x.text.lower() for word in negative_words):
            return NEGATIVE
        return ABSTAIN

    @labeling_function()
    def lf_rating_high(x):
        """Label based on high rating"""
        if hasattr(x, 'rating') and x.rating >= 4:
            return POSITIVE
        return ABSTAIN

    @labeling_function()
    def lf_rating_low(x):
        """Label based on low rating"""
        if hasattr(x, 'rating') and x.rating <= 2:
            return NEGATIVE
        return ABSTAIN

    @labeling_function()
    def lf_negation_check(x):
        """Detect negation"""
        text = x.text.lower()
        if re.search(r"not (good|great|excellent)", text):
            return NEGATIVE
        if re.search(r"not (bad|terrible)", text):
            return POSITIVE
        return ABSTAIN

    lfs = [
        lf_positive_keywords,
        lf_negative_keywords,
        lf_rating_high,
        lf_rating_low,
        lf_negation_check,
    ]

    print("Snorkel Programmatic Labeling Pipeline:")
    print("1. Domain experts write labeling functions (LFs)")
    print("2. Apply LFs to unlabeled data")
    print("3. Combine multiple LFs with Label Model (noise-aware)")
    print("4. Train downstream model on soft labels")
    print(f"\nDefined labeling functions: {len(lfs)}")
    return lfs
```
4. Active Learning
Active learning minimizes labeling cost by selecting the most informative samples from a large unlabeled pool.
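The selection signal itself is simple. As a minimal sketch (the three probability rows are invented for illustration), entropy-based uncertainty scoring picks the sample whose predicted distribution is closest to uniform:

```python
import numpy as np

def entropy_uncertainty(probs):
    """Shannon entropy of predicted class probabilities; higher = more uncertain."""
    return -np.sum(probs * np.log(probs + 1e-10), axis=1)

probs = np.array([
    [0.98, 0.01, 0.01],   # confident -> low entropy
    [0.34, 0.33, 0.33],   # near-uniform -> highest entropy
    [0.70, 0.20, 0.10],
])
scores = entropy_uncertainty(probs)
picked = np.argsort(scores)[-1:]  # send the most uncertain sample to the labeler
print(picked)  # [1]
```

The ActiveLearner class that follows wraps this scoring (plus margin and least-confidence variants) in a full labeling loop.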
```python
import numpy as np
from sklearn.metrics import accuracy_score

class ActiveLearner:
    """
    Active learning with multiple sampling strategies.
    """

    def __init__(self, model, strategy='uncertainty', n_initial=100):
        self.model = model
        self.strategy = strategy
        self.n_initial = n_initial

    def uncertainty_sampling(self, X_unlabeled, n_samples):
        """
        Uncertainty sampling: select the samples the model is least confident about.
        """
        probs = self._get_probs(X_unlabeled)

        if self.strategy == 'least_confidence':
            uncertainty = 1 - probs.max(axis=1)
        elif self.strategy == 'margin':
            # Small margin between the top-2 classes = high uncertainty
            sorted_probs = np.sort(probs, axis=1)[:, ::-1]
            uncertainty = 1 - (sorted_probs[:, 0] - sorted_probs[:, 1])
        elif self.strategy == 'entropy':
            uncertainty = -np.sum(probs * np.log(probs + 1e-10), axis=1)
        else:
            uncertainty = 1 - probs.max(axis=1)

        selected_indices = np.argsort(uncertainty)[-n_samples:]
        return selected_indices, uncertainty

    def diversity_sampling(self, X_unlabeled, X_labeled, n_samples):
        """
        Diversity-based sampling (CoreSet):
        select samples most dissimilar to already-labeled data.
        """
        from sklearn.metrics.pairwise import euclidean_distances

        selected = []
        remaining = list(range(len(X_unlabeled)))
        current_labeled = X_labeled.copy()

        for _ in range(n_samples):
            dists = euclidean_distances(
                X_unlabeled[remaining],
                current_labeled
            ).min(axis=1)
            best_idx = remaining[np.argmax(dists)]
            selected.append(best_idx)
            remaining.remove(best_idx)
            current_labeled = np.vstack([current_labeled, X_unlabeled[best_idx]])

        return np.array(selected)

    def batch_mode_active_learning(self, X_pool, y_oracle, X_test, y_test,
                                   n_iterations=10, n_per_iter=50):
        """
        Batch-mode active learning loop.

        Args:
            X_pool: unlabeled data pool
            y_oracle: true labels (oracle)
            n_per_iter: number of samples to label per iteration
        """
        initial_indices = np.random.choice(
            len(X_pool), self.n_initial, replace=False
        )
        labeled_indices = list(initial_indices)
        unlabeled_indices = [
            i for i in range(len(X_pool)) if i not in labeled_indices
        ]

        accuracies = []
        n_labeled_list = []
        for iteration in range(n_iterations):
            X_labeled = X_pool[labeled_indices]
            y_labeled = y_oracle[labeled_indices]
            self.model.fit(X_labeled, y_labeled)

            acc = accuracy_score(y_test, self.model.predict(X_test))
            accuracies.append(acc)
            n_labeled_list.append(len(labeled_indices))
            print(f"Iteration {iteration+1}: n_labeled={len(labeled_indices)}, accuracy={acc:.3f}")

            if len(unlabeled_indices) == 0:
                break
            X_unlabeled = X_pool[unlabeled_indices]
            selected, _ = self.uncertainty_sampling(X_unlabeled, n_per_iter)
            actual_selected = [unlabeled_indices[i] for i in selected]
            labeled_indices.extend(actual_selected)
            unlabeled_indices = [
                i for i in unlabeled_indices if i not in actual_selected
            ]

        return accuracies, n_labeled_list

    def _get_probs(self, X):
        if hasattr(self.model, 'predict_proba'):
            return self.model.predict_proba(X)
        # Fallback: treat raw model outputs as logits and normalize
        from scipy.special import softmax
        logits = self.model.predict(X)
        return softmax(logits, axis=1)
```
5. Data Augmentation Deep Dive
5.1 Image Augmentation with Albumentations
```python
import numpy as np
import albumentations as A
from albumentations.pytorch import ToTensorV2

def get_train_transforms(image_size=224):
    """
    Strong augmentation pipeline for training (Albumentations).
    """
    return A.Compose([
        # Geometric transforms
        A.RandomResizedCrop(
            height=image_size,
            width=image_size,
            scale=(0.7, 1.0),
            ratio=(0.75, 1.33)
        ),
        A.HorizontalFlip(p=0.5),
        A.ShiftScaleRotate(
            shift_limit=0.1,
            scale_limit=0.2,
            rotate_limit=30,
            p=0.5
        ),
        # Color transforms
        A.ColorJitter(
            brightness=0.3,
            contrast=0.3,
            saturation=0.3,
            hue=0.1,
            p=0.8
        ),
        A.ToGray(p=0.1),
        A.RandomGamma(gamma_limit=(80, 120), p=0.3),
        # Noise and blur
        A.GaussNoise(var_limit=(10, 50), p=0.3),
        A.OneOf([
            A.MotionBlur(blur_limit=7),
            A.GaussianBlur(blur_limit=7),
            A.MedianBlur(blur_limit=7),
        ], p=0.3),
        # Cutout / random erasing
        A.CoarseDropout(
            max_holes=8,
            max_height=32,
            max_width=32,
            fill_value=0,
            p=0.3
        ),
        # Grid distortion
        A.OneOf([
            A.GridDistortion(p=1),
            A.ElasticTransform(p=1),
            A.OpticalDistortion(p=1),
        ], p=0.2),
        # Normalize and convert to tensor
        A.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225]
        ),
        ToTensorV2(),
    ])

def mixup_augmentation(images, labels, alpha=0.4):
    """
    MixUp: blend two images and their labels.
    Zhang et al., "mixup: Beyond Empirical Risk Minimization" (2018)
    """
    import torch

    batch_size = images.shape[0]
    lam = np.random.beta(alpha, alpha)
    perm = torch.randperm(batch_size)

    mixed_images = lam * images + (1 - lam) * images[perm]
    labels_a = labels
    labels_b = labels[perm]
    return mixed_images, labels_a, labels_b, lam

def cutmix_augmentation(images, labels, alpha=1.0):
    """
    CutMix: paste a patch from one image onto another.
    Yun et al., "CutMix: Regularization Strategy to Train Strong
    Classifiers with Localizable Features" (2019)
    """
    import torch

    batch_size, c, h, w = images.shape
    lam = np.random.beta(alpha, alpha)
    perm = torch.randperm(batch_size)

    # Patch size proportional to sqrt(1 - lam)
    cut_ratio = np.sqrt(1 - lam)
    cut_h = int(h * cut_ratio)
    cut_w = int(w * cut_ratio)
    cx = np.random.randint(w)
    cy = np.random.randint(h)
    bbx1 = np.clip(cx - cut_w // 2, 0, w)
    bby1 = np.clip(cy - cut_h // 2, 0, h)
    bbx2 = np.clip(cx + cut_w // 2, 0, w)
    bby2 = np.clip(cy + cut_h // 2, 0, h)

    mixed_images = images.clone()
    mixed_images[:, :, bby1:bby2, bbx1:bbx2] = images[perm, :, bby1:bby2, bbx1:bbx2]

    # Recompute lam from the actual patch area
    lam = 1 - ((bbx2 - bbx1) * (bby2 - bby1) / (w * h))
    labels_a = labels
    labels_b = labels[perm]
    return mixed_images, labels_a, labels_b, lam
```
5.2 Text Augmentation
```python
import numpy as np

class TextAugmenter:
    """
    Text data augmentation techniques.
    """

    def eda_synonym_replacement(self, text, n=1):
        """
        EDA: Synonym Replacement.
        Wei and Zou, "EDA: Easy Data Augmentation Techniques
        for Boosting Performance on Text Classification Tasks" (2019)
        """
        from nltk.corpus import wordnet  # requires nltk.download('wordnet')

        words = text.split()
        new_words = words.copy()
        stop_words = set(['a', 'an', 'the', 'is', 'are', 'was', 'were',
                          'i', 'me', 'my', 'we', 'our', 'you', 'your'])
        replaceable = [
            (i, word) for i, word in enumerate(words)
            if word.lower() not in stop_words
        ]
        np.random.shuffle(replaceable)

        replaced = 0
        for idx, word in replaceable:
            if replaced >= n:
                break
            synsets = wordnet.synsets(word)
            if synsets:
                synonyms = [
                    lemma.name() for synset in synsets
                    for lemma in synset.lemmas()
                    if lemma.name() != word
                ]
                if synonyms:
                    new_words[idx] = np.random.choice(synonyms).replace('_', ' ')
                    replaced += 1
        return ' '.join(new_words)

    def eda_random_swap(self, text, n=1):
        """EDA: Random Swap"""
        words = text.split()
        if len(words) < 2:
            return text
        new_words = words.copy()
        for _ in range(n):
            i, j = np.random.choice(len(new_words), 2, replace=False)
            new_words[i], new_words[j] = new_words[j], new_words[i]
        return ' '.join(new_words)

    def eda_random_deletion(self, text, p=0.1):
        """EDA: Random Deletion"""
        words = text.split()
        if len(words) == 1:
            return text
        new_words = [word for word in words if np.random.random() > p]
        return ' '.join(new_words) if new_words else np.random.choice(words)

    def back_translation(self, text, src_lang='en', pivot_lang='fr'):
        """
        Back-Translation: en -> fr -> en.
        Preserves meaning while diversifying expression.
        """
        try:
            from transformers import pipeline
            translator_fwd = pipeline(
                f"translation_{src_lang}_to_{pivot_lang}",
                model=f"Helsinki-NLP/opus-mt-{src_lang}-{pivot_lang}"
            )
            translator_bwd = pipeline(
                f"translation_{pivot_lang}_to_{src_lang}",
                model=f"Helsinki-NLP/opus-mt-{pivot_lang}-{src_lang}"
            )
            pivot_text = translator_fwd(text)[0]['translation_text']
            back_translated = translator_bwd(pivot_text)[0]['translation_text']
            return back_translated
        except Exception as e:
            print(f"Translation error: {e}")
            return text

    def augment_dataset(self, texts, labels, n_aug=4):
        """Augment entire dataset"""
        augmented_texts = []
        augmented_labels = []
        for text, label in zip(texts, labels):
            augmented_texts.append(text)
            augmented_labels.append(label)
            for _ in range(n_aug):
                aug_type = np.random.choice(['synonym', 'swap', 'deletion'])
                if aug_type == 'synonym':
                    aug_text = self.eda_synonym_replacement(text)
                elif aug_type == 'swap':
                    aug_text = self.eda_random_swap(text)
                else:
                    aug_text = self.eda_random_deletion(text)
                augmented_texts.append(aug_text)
                augmented_labels.append(label)

        print(f"Original samples: {len(texts)}")
        print(f"Augmented samples: {len(augmented_texts)}")
        return augmented_texts, augmented_labels
```
5.3 Automatic Augmentation (RandAugment, SpecAugment)
```python
import torchvision.transforms as transforms

def get_randaugment_transforms(n=2, m=9, image_size=224):
    """
    RandAugment: randomized augmentation policy.
    Cubuk et al., "RandAugment: Practical Automated Data Augmentation" (2019)

    Args:
        n: number of augmentation operations to apply
        m: magnitude of augmentation (0-30)
    """
    transform = transforms.Compose([
        transforms.RandomResizedCrop(image_size),
        transforms.RandAugment(num_ops=n, magnitude=m),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225]
        )
    ])
    return transform

def specaugment_for_audio(spectrogram, freq_mask_param=27, time_mask_param=70):
    """
    SpecAugment: audio spectrogram augmentation.
    Park et al., "SpecAugment: A Simple Data Augmentation Method
    for Automatic Speech Recognition" (2019)

    Args:
        spectrogram: input spectrogram (freq, time)
        freq_mask_param: max frequency mask size
        time_mask_param: max time mask size
    """
    import torchaudio.transforms as T

    freq_mask = T.FrequencyMasking(freq_mask_param=freq_mask_param)
    time_mask = T.TimeMasking(time_mask_param=time_mask_param)
    augmented = freq_mask(spectrogram)
    augmented = time_mask(augmented)
    return augmented
```
6. Synthetic Data Generation
6.1 Synthetic Text with LLMs
```python
class SyntheticTextGenerator:
    """Generate synthetic training data using LLMs"""

    def __init__(self, llm_client, model_name='gpt-4'):
        self.llm = llm_client
        self.model_name = model_name

    def generate_classification_data(self, class_name, n_samples=100,
                                     domain='general', style='diverse'):
        """
        Generate synthetic data for a classification class.

        Args:
            class_name: name of the target class
            n_samples: number of samples to generate
            domain: domain context (medical, legal, etc.)
            style: writing style (formal, casual, diverse)
        """
        prompt = f"""Generate {n_samples} diverse text examples for the class '{class_name}'.
Domain: {domain}
Style: {style}

Requirements:
- Each example should be 1-3 sentences
- Vary the vocabulary, sentence structure, and perspective
- Include both simple and complex cases
- Format as a JSON list: ["example1", "example2", ...]

Generate realistic examples that would appear in real-world {domain} data."""

        print(f"Synthetic data generation prompt for '{class_name}':")
        print(prompt[:300] + "...")
        print(f"\nPlanning to generate {n_samples} samples")
        return prompt

    def generate_edge_cases(self, class_examples, n_edge_cases=20):
        """Generate challenging edge cases"""
        prompt = f"""Based on these training examples:
{chr(10).join(class_examples[:5])}

Generate {n_edge_cases} challenging edge cases that:
1. Are ambiguous between different categories
2. Contain misleading keywords
3. Have unusual sentence structures
4. Test the model's true understanding

Format as JSON list."""
        print("Edge case generation prompt ready")
        return prompt

    def augment_with_paraphrase(self, texts, n_paraphrases=3):
        """Generate paraphrases using an LLM"""
        augmented = []
        for text in texts:
            prompt = f"""Paraphrase the following text {n_paraphrases} times.
Keep the same meaning but use different words and sentence structures.

Original: "{text}"

Format as JSON list of {n_paraphrases} paraphrases."""
            # The actual LLM call is omitted in this demo
            augmented.append({
                'original': text,
                'paraphrases': []
            })
        return augmented

class SyntheticImageGenerator:
    """Generate synthetic images with Diffusion Models"""

    def __init__(self, model_name='stabilityai/stable-diffusion-2-1'):
        self.model_name = model_name

    def setup_pipeline(self):
        """
        Initialize Stable Diffusion pipeline.
        pip install diffusers accelerate
        """
        try:
            from diffusers import StableDiffusionPipeline
            import torch

            self.pipe = StableDiffusionPipeline.from_pretrained(
                self.model_name,
                torch_dtype=torch.float16
            )
            if torch.cuda.is_available():
                self.pipe = self.pipe.to('cuda')
            print(f"Pipeline initialized: {self.model_name}")
        except Exception as e:
            print(f"Pipeline initialization error: {e}")

    def generate_class_images(self, class_name, n_images=50,
                              style_prompt="high quality, photorealistic"):
        """
        Generate synthetic images for a class.

        Args:
            class_name: target class name
            n_images: number of images to generate
            style_prompt: style guidance
        """
        prompts = [
            f"A photo of {class_name}, {style_prompt}",
            f"{class_name} in natural environment, {style_prompt}",
            f"Close-up of {class_name}, detailed, {style_prompt}",
            f"{class_name} from different angle, {style_prompt}",
        ]
        print(f"Synthetic image generation plan for '{class_name}':")
        print(f"Images to generate: {n_images}")
        print("Sample prompts:")
        for p in prompts[:2]:
            print(f"  - {p}")
        return prompts

    def evaluate_synthetic_quality(self, real_images, synthetic_images):
        """Evaluate synthetic image quality using the FID score"""
        try:
            from torchmetrics.image.fid import FrechetInceptionDistance

            fid = FrechetInceptionDistance(feature=64)
            fid.update(real_images, real=True)
            fid.update(synthetic_images, real=False)
            fid_score = fid.compute()
            print(f"FID Score: {fid_score:.2f}")
            print("(Lower is better; 0 is perfect)")
            return fid_score
        except Exception as e:
            print(f"FID computation error: {e}")
```
7. The Data Flywheel
7.1 Data Flywheel Concept
The data flywheel is a virtuous cycle of product, data, and model:
- Better model → Better product
- Better product → More users
- More users → More data
- More data → Better model
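The compounding nature of this cycle can be illustrated with a toy simulation (the growth constants below are invented for illustration, not empirical):

```python
# Toy flywheel: each turn, users contribute data, data nudges accuracy up,
# and accuracy gains attract more users. All coefficients are illustrative.
users, data, acc = 1000, 10_000, 0.80
for turn in range(5):
    data += users * 2                                      # ~2 labeled examples per user
    acc = min(0.99, acc + 0.3 * (1 - acc) * (users * 2 / data))  # diminishing returns
    users = int(users * (1 + (acc - 0.80)))                # growth tracks quality gains
    print(f"turn {turn+1}: users={users}, data={data}, acc={acc:.3f}")
```

Each quantity feeds the next, which is why small, steady data improvements can compound into a durable advantage.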
```python
import numpy as np

class DataFlywheelPipeline:
    """Data Flywheel implementation pipeline"""

    def __init__(self, model, feedback_store):
        self.model = model
        self.feedback_store = feedback_store
        self.version = 0

    def collect_production_feedback(self, predictions, user_feedback):
        """
        Collect user feedback from production.

        Args:
            predictions: model predictions
            user_feedback: user corrections/confirmations
        """
        valuable_samples = []
        for pred, feedback in zip(predictions, user_feedback):
            if feedback['corrected']:
                # User corrected the model: the most valuable kind of sample
                sample = {
                    'input': feedback['input'],
                    'model_prediction': pred,
                    'true_label': feedback['correction'],
                    'confidence': pred['confidence'],
                    'timestamp': feedback['timestamp'],
                    'value': 'high'
                }
                valuable_samples.append(sample)
            elif feedback['confirmed'] and pred['confidence'] < 0.7:
                # Confirmed low-confidence prediction: useful positive signal
                sample = {
                    'input': feedback['input'],
                    'true_label': pred['label'],
                    'confidence': pred['confidence'],
                    'value': 'medium'
                }
                valuable_samples.append(sample)

        print(f"Valuable samples collected: {len(valuable_samples)}")
        return valuable_samples

    def prioritize_labeling_queue(self, unlabeled_pool, budget):
        """
        Prioritize the labeling queue.

        Priority criteria:
        1. Model uncertainty (higher = more priority)
        2. Class rarity (rarer classes = more priority)
        3. Data diversity (more different from existing = more priority)
        """
        priorities = []
        for sample in unlabeled_pool:
            score = 0
            # predicted_probs is a dict of class -> probability
            uncertainty = 1 - max(sample['predicted_probs'].values())
            score += uncertainty * 0.5

            predicted_class = max(sample['predicted_probs'],
                                  key=sample['predicted_probs'].get)
            rarity = 1 / (sample['class_counts'].get(predicted_class, 1) + 1)
            score += rarity * 0.3

            diversity = np.std(list(sample['predicted_probs'].values()))
            score += diversity * 0.2
            priorities.append((sample, score))

        priorities.sort(key=lambda x: x[1], reverse=True)
        selected = [s for s, _ in priorities[:budget]]
        return selected
```
8. Data Pipeline Best Practices
8.1 Reproducible Data Processing
```python
import hashlib
import json
import os
from datetime import datetime
from pathlib import Path

import numpy as np

class ReproducibleDataPipeline:
    """
    Reproducible data pipeline:
    - Tracks all processing steps
    - Verifies integrity via data hashes
    - Supports version control
    """

    def __init__(self, pipeline_name, base_dir='data/processed'):
        self.pipeline_name = pipeline_name
        self.base_dir = Path(base_dir)
        self.steps = []
        self.metadata = {
            'pipeline': pipeline_name,
            'created_at': datetime.now().isoformat(),
            'steps': []
        }

    def add_step(self, step_name, func, *args, **kwargs):
        """Add a processing step"""
        self.steps.append({
            'name': step_name,
            'func': func,
            'args': args,
            'kwargs': kwargs
        })

    def compute_hash(self, data):
        """Compute a data hash"""
        if isinstance(data, np.ndarray):
            return hashlib.md5(data.tobytes()).hexdigest()
        elif isinstance(data, (list, dict)):
            return hashlib.md5(
                json.dumps(data, sort_keys=True, default=str).encode()
            ).hexdigest()
        else:
            return hashlib.md5(str(data).encode()).hexdigest()

    def run(self, input_data):
        """Execute the pipeline"""
        data = input_data
        for step in self.steps:
            print(f"Running: {step['name']}")
            hash_before = self.compute_hash(data)
            data = step['func'](data, *step['args'], **step['kwargs'])
            hash_after = self.compute_hash(data)
            self.metadata['steps'].append({
                'name': step['name'],
                'hash_before': hash_before,
                'hash_after': hash_after,
                'timestamp': datetime.now().isoformat()
            })
            print(f"  Done: {hash_before[:8]} -> {hash_after[:8]}")

        metadata_path = self.base_dir / f"{self.pipeline_name}_metadata.json"
        metadata_path.parent.mkdir(parents=True, exist_ok=True)
        with open(metadata_path, 'w') as f:
            json.dump(self.metadata, f, indent=2)
        print(f"\nPipeline complete. Metadata: {metadata_path}")
        return data

class DataVersionControl:
    """DVC-style data version control"""

    def __init__(self, storage_path='data/.dvc'):
        self.storage_path = Path(storage_path)
        self.storage_path.mkdir(parents=True, exist_ok=True)

    def add(self, data_path):
        """Start tracking a data file"""
        data_path = Path(data_path)
        with open(data_path, 'rb') as f:
            file_hash = hashlib.md5(f.read()).hexdigest()

        dvc_file = data_path.with_suffix('.dvc')
        dvc_metadata = {
            'md5': file_hash,
            'size': os.path.getsize(data_path),
            'path': str(data_path.name),
            'version': datetime.now().isoformat()
        }
        with open(dvc_file, 'w') as f:
            json.dump(dvc_metadata, f, indent=2)

        print(f"Tracking: {data_path}")
        print(f"  MD5: {file_hash}")
        print(f"  Metafile: {dvc_file}")
        return file_hash

    def create_data_contract(self, schema):
        """
        Define a Data Contract:
        - Schema definition
        - Quality criteria
        - SLA requirements
        """
        contract = {
            'version': '1.0',
            'schema': schema,
            'quality_rules': {
                'completeness': {'min_threshold': 0.99},
                'accuracy': {'label_error_rate': {'max': 0.05}},
                'consistency': {'duplicate_rate': {'max': 0.01}},
            },
            'sla': {
                'update_frequency': 'daily',
                'max_staleness_hours': 24,
            }
        }
        return contract
```
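A contract only helps if it is enforced. Here is a minimal, hypothetical validator sketch: `validate_against_contract` is an invented helper that checks a dataset against the completeness and duplicate-rate rules from the contract format above (the label-error rule would need model-based checks, e.g. Cleanlab, and is omitted):

```python
import numpy as np

def validate_against_contract(X, contract):
    """Check a dataset against the contract's quality_rules. Returns (passed, report)."""
    rules = contract['quality_rules']
    report = {}
    # Completeness: fraction of non-missing values
    completeness = 1 - np.isnan(X).mean()
    report['completeness'] = (completeness,
                              completeness >= rules['completeness']['min_threshold'])
    # Consistency: fraction of exact duplicate rows
    dup_rate = 1 - len(np.unique(X, axis=0)) / len(X)
    report['duplicate_rate'] = (dup_rate,
                                dup_rate <= rules['consistency']['duplicate_rate']['max'])
    passed = all(ok for _, ok in report.values())
    return passed, report

contract = {
    'quality_rules': {
        'completeness': {'min_threshold': 0.99},
        'consistency': {'duplicate_rate': {'max': 0.01}},
    }
}
X = np.arange(200.0).reshape(50, 4)  # toy data: no NaNs, no duplicate rows
passed, report = validate_against_contract(X, contract)
print(passed)  # True
```

A check like this belongs in CI or in the ingestion step of the pipeline, so that contract violations block a dataset from reaching training.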
```python
def demonstrate_full_data_pipeline():
    """Full Data-Centric AI pipeline demonstration"""
    print("=" * 60)
    print("Data-Centric AI Pipeline Demo")
    print("=" * 60)

    print("\nStep 1: Data Quality Assessment")
    print("  - Measure label error rate")
    print("  - Detect outliers")
    print("  - Remove duplicates")
    print("  - Analyze class distribution")

    print("\nStep 2: Label Refinement")
    print("  - Detect errors with Cleanlab")
    print("  - Correct via majority vote / expert review")
    print("  - Improve inter-rater agreement")

    print("\nStep 3: Data Augmentation")
    print("  - Images: Albumentations")
    print("  - Text: EDA, back-translation")
    print("  - Search for automatic augmentation policies")

    print("\nStep 4: Synthetic Data Generation")
    print("  - Synthesize text with LLMs")
    print("  - Synthesize images with Diffusion Models")
    print("  - Filter by quality (FID, classifier confidence)")

    print("\nStep 5: Active Learning")
    print("  - Prioritize labeling with uncertainty sampling")
    print("  - Ensure diversity with CoreSet method")

    print("\nStep 6: Version Control and Monitoring")
    print("  - Version data with DVC")
    print("  - Maintain quality standards with Data Contracts")
    print("  - Continuously improve with the Data Flywheel")

    print("\nConclusion: Improving data quality is often more impactful than improving models!")
```
9. Summary and Practical Guide
Data-Centric AI Checklist
1. Data Collection
- Write labeling guidelines with domain experts
- Measure inter-rater agreement (target Cohen's Kappa > 0.8)
- Monitor class distribution during collection
2. Data Cleaning
- Detect and fix label errors with Cleanlab
- Remove duplicate samples
- Review outliers (remove or correct)
3. Data Augmentation
- Apply augmentation only to training data (not validation/test)
- Validate data distribution after augmentation
- Choose augmentation techniques appropriate for your domain
4. Continuous Improvement
- Collect production error cases
- Use active learning to make labeling efficient
- Conduct regular data quality audits
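One checklist item trips up many teams in practice: augmentation must happen after the train/test split, never before, or augmented copies of the same example leak across the boundary. A minimal sketch (`augment_train_only` is an illustrative helper; the Gaussian-noise augmenter stands in for any domain-appropriate transform):

```python
import numpy as np
from sklearn.model_selection import train_test_split

def augment_train_only(X, y, augment_fn, n_aug=1, test_size=0.2, seed=42):
    """Split first, then augment ONLY the training portion."""
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=seed
    )
    X_parts, y_parts = [X_train], [y_train]
    for _ in range(n_aug):
        X_parts.append(augment_fn(X_train))  # test set is never augmented
        y_parts.append(y_train)
    return np.vstack(X_parts), np.concatenate(y_parts), X_test, y_test

rng = np.random.default_rng(0)
X = rng.normal(size=(100, 4))
y = rng.integers(0, 2, size=100)
noise = lambda X: X + rng.normal(scale=0.05, size=X.shape)

X_tr, y_tr, X_te, y_te = augment_train_only(X, y, noise, n_aug=2)
print(X_tr.shape, X_te.shape)  # (240, 4) (20, 4)
```

The test set stays at its original 20 samples while the training set triples, so evaluation still reflects the real data distribution.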
Recommended Tools:
- Label quality: Cleanlab (https://github.com/cleanlab/cleanlab)
- Weak supervision: Snorkel (https://snorkel.ai/)
- Labeling platform: Label Studio (https://labelstud.io/)
- Image augmentation: Albumentations (https://albumentations.ai/)
- Active learning: modAL (https://modal-python.readthedocs.io/)
- Data version control: DVC (https://dvc.org/)
Data-Centric AI is not merely a matter of tools or techniques. It is a mindset shift — from chasing "better models" to chasing "better data." In many real-world projects, this shift alone can deliver dramatic performance improvements.
The most important insight from Andrew Ng's Data-Centric AI movement: your model is only as good as the data it learns from. Investing in data quality, labeling consistency, and systematic data improvement is often the highest-return activity in an AI project.