Skip to content

필사 모드: Data-Centric AI Complete Guide: Maximizing AI Performance with High-Quality Data

English
0%
정확도 0%
💡 왼쪽 원문을 읽으면서 오른쪽에 따라 써보세요. Tab 키로 힌트를 받을 수 있습니다.
원문 렌더가 준비되기 전까지 텍스트 가이드로 표시합니다.

Data-Centric AI Complete Guide: Maximizing AI Performance with High-Quality Data

In 2021, Andrew Ng posed a provocative question to the AI community: "Instead of focusing on improving model architectures, what if we focused on improving data quality?" This became the starting point of the **Data-Centric AI** movement.

While the traditional model-centric approach chases "better algorithms," the data-centric approach chases "better data." This guide covers every aspect of Data-Centric AI with hands-on code, from theory to production practice.

1. Data-Centric AI vs Model-Centric AI

1.1 The Paradigm Shift

**Model-Centric AI**

- Data is fixed; improve the code

- Search for better architectures

- Focus on hyperparameter tuning

- Classic benchmarks: dataset is fixed, only models change

**Data-Centric AI**

- Model is fixed; improve the data

- Fix label errors

- Improve labeling guidelines for consistency

- Add data augmentation and synthetic data

1.2 Andrew Ng's Core Argument

Andrew Ng states:

> "AI system = Code (model/algorithm) + Data"

In many practical AI projects, the code is already good enough. The bottleneck is data quality.

**Experiment Results (Andrew Ng, DeepLearning.AI)**:

On a manufacturing inspection dataset with label noise:

- Baseline: 76.2%

- Better model only: +0.02% improvement (76.22%)

- Better data only: +16.9% improvement (93.1%)

This result demonstrates that in many real-world scenarios, improving data quality is far more effective than improving model architecture.

1.3 When Is Data-Centric Most Effective?

Data-centric approaches are especially impactful when:

1. **Small datasets**: Quality matters more when you have fewer than a few thousand examples

2. **High label noise**: When the label error rate exceeds 5%

3. **Domain-specific tasks**: Specialized domains without strong pre-trained models

4. **Imbalanced classes**: Rare class quality determines overall performance

5. **Strict accuracy requirements**: Medical, financial, or safety-critical applications

from sklearn.model_selection import train_test_split

from sklearn.metrics import accuracy_score

def compare_model_vs_data_centric(X, y, model_class, noise_level=0.1):

"""

Compare model-centric vs data-centric approaches

Args:

X: feature matrix

y: labels

model_class: base model class

noise_level: fraction of noisy labels

"""

Add label noise

noisy_y = y.copy()

noise_idx = np.random.choice(len(y), int(len(y) * noise_level), replace=False)

n_classes = len(np.unique(y))

for idx in noise_idx:

wrong_labels = [l for l in range(n_classes) if l != y[idx]]

noisy_y[idx] = np.random.choice(wrong_labels)

X_train, X_test, y_train_clean, y_test = train_test_split(

X, y, test_size=0.2, random_state=42

)

_, _, y_train_noisy, _ = train_test_split(

X, noisy_y, test_size=0.2, random_state=42

)

--- Model-centric approach ---

Noisy data + base model

base_model = model_class()

base_model.fit(X_train, y_train_noisy)

base_acc = accuracy_score(y_test, base_model.predict(X_test))

Noisy data + more complex model

from sklearn.ensemble import GradientBoostingClassifier

complex_model = GradientBoostingClassifier(n_estimators=200)

complex_model.fit(X_train, y_train_noisy)

complex_acc = accuracy_score(y_test, complex_model.predict(X_test))

--- Data-centric approach ---

Clean data + base model

clean_model = model_class()

clean_model.fit(X_train, y_train_clean)

clean_acc = accuracy_score(y_test, clean_model.predict(X_test))

print("=" * 50)

print("Model-Centric vs Data-Centric Comparison")

print("=" * 50)

print(f"Base model + noisy data: {base_acc:.3f}")

print(f"Complex model + noisy data: {complex_acc:.3f}")

print(f"Base model + clean data: {clean_acc:.3f}")

print(f"\nModel improvement effect: +{(complex_acc - base_acc):.3f}")

print(f"Data improvement effect: +{(clean_acc - base_acc):.3f}")

return {

'base_model_noisy_data': base_acc,

'complex_model_noisy_data': complex_acc,

'base_model_clean_data': clean_acc

}

2. Data Quality Measurement

2.1 Confident Learning and Label Error Detection

Confident Learning, proposed by Northcutt et al., uses cross-validated prediction probabilities to systematically detect label errors.

Core idea: "If the model predicts class A with high confidence but the label says class B, the label is likely wrong."

from cleanlab.filter import find_label_issues

from cleanlab.classification import CleanLearning

from sklearn.linear_model import LogisticRegression

from sklearn.model_selection import cross_val_predict

def detect_label_errors_cleanlab(X, y, model=None):

"""

Detect label errors using Cleanlab

Args:

X: feature matrix

y: label array

model: classifier (default: LogisticRegression)

Returns:

label_issues: indices and info about label issues

"""

if model is None:

model = LogisticRegression(max_iter=1000)

Predict class probabilities via cross-validation

pred_probs = cross_val_predict(

model, X, y,

cv=5,

method='predict_proba'

)

Find label issues

label_issues = find_label_issues(

labels=y,

pred_probs=pred_probs,

return_indices_ranked_by='self_confidence'

)

print(f"Total samples: {len(y)}")

print(f"Label issues found: {len(label_issues)}")

print(f"Error rate: {len(label_issues)/len(y):.2%}")

return label_issues

def cleanlab_full_pipeline(X_train, y_train_noisy, X_test, y_test):

"""

Complete Cleanlab pipeline:

1. Detect label errors

2. Remove or correct errors

3. Retrain on cleaned data

"""

from cleanlab.classification import CleanLearning

base_model = LogisticRegression(max_iter=1000)

CleanLearning automatically handles label errors during training

cl = CleanLearning(base_model, seed=42)

cl.fit(X_train, y_train_noisy)

y_pred = cl.predict(X_test)

accuracy = accuracy_score(y_test, y_pred)

print(f"CleanLearning accuracy: {accuracy:.3f}")

label_issues_df = cl.get_label_issues()

print(f"\nLabel issues info:")

print(label_issues_df.head(10))

return cl, label_issues_df

def confident_learning_manual(pred_probs, labels):

"""

Manual implementation of Confident Learning

- Compute per-class thresholds

- Build the Confident Joint matrix

"""

n_classes = pred_probs.shape[1]

n_samples = len(labels)

Per-class threshold: mean predicted probability for that class's samples

thresholds = np.zeros(n_classes)

for c in range(n_classes):

class_mask = labels == c

if class_mask.sum() > 0:

thresholds[c] = pred_probs[class_mask, c].mean()

Confident Joint matrix C[s][y]

s: estimated true class, y: given label

C = np.zeros((n_classes, n_classes), dtype=int)

for i in range(n_samples):

y_given = labels[i]

over_threshold = pred_probs[i] >= thresholds

if over_threshold.sum() == 0:

y_hat = pred_probs[i].argmax()

else:

y_hat = (pred_probs[i] * over_threshold).argmax()

C[y_hat, y_given] += 1

off_diagonal = C.copy()

np.fill_diagonal(off_diagonal, 0)

print("Confident Joint Matrix (rows: estimated true class, cols: given label):")

print(C)

print(f"\nEstimated mislabeled samples: {off_diagonal.sum()}")

return C

2.2 Data Outlier Detection

from sklearn.ensemble import IsolationForest

from sklearn.neighbors import LocalOutlierFactor

class DataQualityChecker:

"""Comprehensive data quality inspection toolkit"""

def __init__(self):

self.quality_report = {}

def check_class_distribution(self, labels):

"""Check class imbalance"""

from collections import Counter

counts = Counter(labels)

total = len(labels)

df = pd.DataFrame([

{'class': c, 'count': n, 'percentage': 100 * n / total}

for c, n in sorted(counts.items())

])

imbalance_ratio = max(counts.values()) / min(counts.values())

print("Class distribution:")

print(df.to_string(index=False))

print(f"\nImbalance ratio: {imbalance_ratio:.2f}x")

if imbalance_ratio > 10:

print("WARNING: Severe class imbalance!")

elif imbalance_ratio > 3:

print("NOTICE: Class imbalance detected")

self.quality_report['class_imbalance_ratio'] = imbalance_ratio

return df

def detect_outliers(self, X, method='isolation_forest', contamination=0.1):

"""

Outlier detection

Args:

method: 'isolation_forest' or 'lof'

contamination: expected fraction of outliers

"""

if method == 'isolation_forest':

detector = IsolationForest(

contamination=contamination,

random_state=42

)

elif method == 'lof':

detector = LocalOutlierFactor(

contamination=contamination,

novelty=True

)

predictions = detector.fit_predict(X)

outlier_mask = predictions == -1

outlier_indices = np.where(outlier_mask)[0]

print(f"Outliers detected: {outlier_mask.sum()} / {len(X)} ({outlier_mask.mean():.2%})")

self.quality_report['n_outliers'] = outlier_mask.sum()

return outlier_indices, outlier_mask

def check_duplicates(self, X, y=None, threshold=0.99):

"""

Detect duplicate samples

Args:

threshold: similarity threshold (1.0 = exact match)

"""

from sklearn.metrics.pairwise import cosine_similarity

if len(X) > 10000:

sample_idx = np.random.choice(len(X), 10000, replace=False)

X_sample = X[sample_idx]

else:

X_sample = X

sample_idx = np.arange(len(X))

sim_matrix = cosine_similarity(X_sample)

np.fill_diagonal(sim_matrix, 0)

duplicate_pairs = np.argwhere(sim_matrix >= threshold)

duplicate_pairs = duplicate_pairs[duplicate_pairs[:, 0] < duplicate_pairs[:, 1]]

print(f"Duplicate pairs found: {len(duplicate_pairs)}")

if y is not None and len(duplicate_pairs) > 0:

label_conflicts = 0

for i, j in duplicate_pairs:

if y[sample_idx[i]] != y[sample_idx[j]]:

label_conflicts += 1

print(f"Label-conflicting duplicate pairs: {label_conflicts}")

return duplicate_pairs

def compute_data_quality_score(self, X, y):

"""Compute overall data quality score"""

scores = {}

if hasattr(X, 'isnull'):

missing_rate = X.isnull().mean().mean()

else:

missing_rate = np.isnan(X).mean()

scores['completeness'] = 1 - missing_rate

from collections import Counter

counts = Counter(y)

n_classes = len(counts)

ideal_count = len(y) / n_classes

balance_score = sum(

min(c, ideal_count) / ideal_count

for c in counts.values()

) / n_classes

scores['balance'] = balance_score

overall_score = np.mean(list(scores.values()))

scores['overall'] = overall_score

print("Data quality scores:")

for metric, score in scores.items():

print(f" {metric}: {score:.3f}")

return scores

3. Labeling Strategies

3.1 High-Quality Labeling Guidelines

def compute_inter_rater_agreement(annotations):

"""

Compute inter-rater agreement

Args:

annotations: array of shape (n_samples, n_raters)

Returns:

cohen_kappa: Cohen's Kappa score

majority_labels: majority-vote labels

"""

from sklearn.metrics import cohen_kappa_score

n_samples, n_raters = annotations.shape

kappa_scores = []

for i in range(n_raters):

for j in range(i+1, n_raters):

kappa = cohen_kappa_score(

annotations[:, i],

annotations[:, j]

)

kappa_scores.append((i, j, kappa))

print(f"Rater {i} vs Rater {j}: kappa = {kappa:.3f}")

mean_kappa = np.mean([k for _, _, k in kappa_scores])

print(f"\nMean Cohen's Kappa: {mean_kappa:.3f}")

if mean_kappa < 0.2:

interpretation = "Slight agreement"

elif mean_kappa < 0.4:

interpretation = "Fair agreement"

elif mean_kappa < 0.6:

interpretation = "Moderate agreement"

elif mean_kappa < 0.8:

interpretation = "Substantial agreement"

else:

interpretation = "Almost perfect agreement"

print(f"Interpretation: {interpretation}")

from scipy import stats

majority_labels = stats.mode(annotations, axis=1)[0].flatten()

print(f"\nMajority-vote labels generated")

return mean_kappa, majority_labels

3.2 Weak Supervision with Snorkel

def snorkel_programmatic_labeling_demo():

"""

Programmatic labeling demo with Snorkel

pip install snorkel

"""

from snorkel.labeling import labeling_function, PandasLFApplier

from snorkel.labeling.model import LabelModel

POSITIVE = 1

NEGATIVE = 0

ABSTAIN = -1

@labeling_function()

def lf_positive_keywords(x):

"""Label based on positive keywords"""

positive_words = ['good', 'great', 'excellent', 'amazing', 'love', 'best']

if any(word in x.text.lower() for word in positive_words):

return POSITIVE

return ABSTAIN

@labeling_function()

def lf_negative_keywords(x):

"""Label based on negative keywords"""

negative_words = ['bad', 'terrible', 'awful', 'hate', 'worst', 'horrible']

if any(word in x.text.lower() for word in negative_words):

return NEGATIVE

return ABSTAIN

@labeling_function()

def lf_rating_high(x):

"""Label based on high rating"""

if hasattr(x, 'rating') and x.rating >= 4:

return POSITIVE

return ABSTAIN

@labeling_function()

def lf_rating_low(x):

"""Label based on low rating"""

if hasattr(x, 'rating') and x.rating <= 2:

return NEGATIVE

return ABSTAIN

@labeling_function()

def lf_negation_check(x):

"""Detect negation"""

text = x.text.lower()

if re.search(r"not (good|great|excellent)", text):

return NEGATIVE

if re.search(r"not (bad|terrible)", text):

return POSITIVE

return ABSTAIN

lfs = [

lf_positive_keywords,

lf_negative_keywords,

lf_rating_high,

lf_rating_low,

lf_negation_check,

]

print("Snorkel Programmatic Labeling Pipeline:")

print("1. Domain experts write labeling functions (LFs)")

print("2. Apply LFs to unlabeled data")

print("3. Combine multiple LFs with Label Model (noise-aware)")

print("4. Train downstream model on soft labels")

print(f"\nDefined labeling functions: {len(lfs)}")

return lfs

4. Active Learning

Active learning minimizes labeling cost by selecting the most informative samples from a large unlabeled pool.

class ActiveLearner:

"""

Active learning with multiple sampling strategies

"""

def __init__(self, model, strategy='uncertainty', n_initial=100):

self.model = model

self.strategy = strategy

self.n_initial = n_initial

def uncertainty_sampling(self, X_unlabeled, n_samples):

"""

Uncertainty sampling: select the samples the model is least confident about

"""

probs = self._get_probs(X_unlabeled)

if self.strategy == 'least_confidence':

uncertainty = 1 - probs.max(axis=1)

elif self.strategy == 'margin':

sorted_probs = np.sort(probs, axis=1)[:, ::-1]

uncertainty = 1 - (sorted_probs[:, 0] - sorted_probs[:, 1])

elif self.strategy == 'entropy':

uncertainty = -np.sum(probs * np.log(probs + 1e-10), axis=1)

else:

uncertainty = 1 - probs.max(axis=1)

selected_indices = np.argsort(uncertainty)[-n_samples:]

return selected_indices, uncertainty

def diversity_sampling(self, X_unlabeled, X_labeled, n_samples):

"""

Diversity-based sampling (CoreSet)

Select samples most dissimilar to already-labeled data

"""

from sklearn.metrics.pairwise import euclidean_distances

selected = []

remaining = list(range(len(X_unlabeled)))

current_labeled = X_labeled.copy()

for _ in range(n_samples):

dists = euclidean_distances(

X_unlabeled[remaining],

current_labeled

).min(axis=1)

best_idx = remaining[np.argmax(dists)]

selected.append(best_idx)

remaining.remove(best_idx)

current_labeled = np.vstack([current_labeled, X_unlabeled[best_idx]])

return np.array(selected)

def batch_mode_active_learning(self, X_pool, y_oracle, X_test, y_test,

n_iterations=10, n_per_iter=50):

"""

Batch-mode active learning loop

Args:

X_pool: unlabeled data pool

y_oracle: true labels (oracle)

n_per_iter: number of samples to label per iteration

"""

initial_indices = np.random.choice(

len(X_pool), self.n_initial, replace=False

)

labeled_indices = list(initial_indices)

unlabeled_indices = [

i for i in range(len(X_pool)) if i not in labeled_indices

]

accuracies = []

n_labeled_list = []

for iteration in range(n_iterations):

X_labeled = X_pool[labeled_indices]

y_labeled = y_oracle[labeled_indices]

self.model.fit(X_labeled, y_labeled)

acc = accuracy_score(y_test, self.model.predict(X_test))

accuracies.append(acc)

n_labeled_list.append(len(labeled_indices))

print(f"Iteration {iteration+1}: n_labeled={len(labeled_indices)}, accuracy={acc:.3f}")

if len(unlabeled_indices) == 0:

break

X_unlabeled = X_pool[unlabeled_indices]

selected, _ = self.uncertainty_sampling(X_unlabeled, n_per_iter)

actual_selected = [unlabeled_indices[i] for i in selected]

labeled_indices.extend(actual_selected)

unlabeled_indices = [

i for i in unlabeled_indices if i not in actual_selected

]

return accuracies, n_labeled_list

def _get_probs(self, X):

if hasattr(self.model, 'predict_proba'):

return self.model.predict_proba(X)

else:

logits = self.model.predict(X)

from scipy.special import softmax

return softmax(logits, axis=1)

5. Data Augmentation Deep Dive

5.1 Image Augmentation with Albumentations

from albumentations.pytorch import ToTensorV2

def get_train_transforms(image_size=224):

"""

Strong augmentation pipeline for training (Albumentations)

"""

return A.Compose([

Geometric transforms

A.RandomResizedCrop(

height=image_size,

width=image_size,

scale=(0.7, 1.0),

ratio=(0.75, 1.33)

),

A.HorizontalFlip(p=0.5),

A.ShiftScaleRotate(

shift_limit=0.1,

scale_limit=0.2,

rotate_limit=30,

p=0.5

),

Color transforms

A.ColorJitter(

brightness=0.3,

contrast=0.3,

saturation=0.3,

hue=0.1,

p=0.8

),

A.ToGray(p=0.1),

A.RandomGamma(gamma_limit=(80, 120), p=0.3),

Noise and blur

A.GaussNoise(var_limit=(10, 50), p=0.3),

A.OneOf([

A.MotionBlur(blur_limit=7),

A.GaussianBlur(blur_limit=7),

A.MedianBlur(blur_limit=7),

], p=0.3),

Cutout / random erasing

A.CoarseDropout(

max_holes=8,

max_height=32,

max_width=32,

fill_value=0,

p=0.3

),

Grid distortion

A.OneOf([

A.GridDistortion(p=1),

A.ElasticTransform(p=1),

A.OpticalDistortion(p=1),

], p=0.2),

Normalize and convert to tensor

A.Normalize(

mean=[0.485, 0.456, 0.406],

std=[0.229, 0.224, 0.225]

),

ToTensorV2(),

])

def mixup_augmentation(images, labels, alpha=0.4):

"""

MixUp: blend two images and their labels

Zhang et al., "mixup: Beyond Empirical Risk Minimization" (2018)

"""

batch_size = images.shape[0]

lam = np.random.beta(alpha, alpha)

perm = torch.randperm(batch_size)

mixed_images = lam * images + (1 - lam) * images[perm]

labels_a = labels

labels_b = labels[perm]

return mixed_images, labels_a, labels_b, lam

def cutmix_augmentation(images, labels, alpha=1.0):

"""

CutMix: paste a patch from one image onto another

Yun et al., "CutMix: Training Strategy that Makes Use of

Sample Mixing" (2019)

"""

batch_size, c, h, w = images.shape

lam = np.random.beta(alpha, alpha)

perm = torch.randperm(batch_size)

cut_ratio = np.sqrt(1 - lam)

cut_h = int(h * cut_ratio)

cut_w = int(w * cut_ratio)

cx = np.random.randint(w)

cy = np.random.randint(h)

bbx1 = np.clip(cx - cut_w // 2, 0, w)

bby1 = np.clip(cy - cut_h // 2, 0, h)

bbx2 = np.clip(cx + cut_w // 2, 0, w)

bby2 = np.clip(cy + cut_h // 2, 0, h)

mixed_images = images.clone()

mixed_images[:, :, bby1:bby2, bbx1:bbx2] = images[perm, :, bby1:bby2, bbx1:bbx2]

lam = 1 - ((bbx2 - bbx1) * (bby2 - bby1) / (w * h))

labels_a = labels

labels_b = labels[perm]

return mixed_images, labels_a, labels_b, lam

5.2 Text Augmentation

class TextAugmenter:

"""

Text data augmentation techniques

"""

def __init__(self):

pass

def eda_synonym_replacement(self, text, n=1):

"""

EDA: Synonym Replacement

Wei and Zou, "EDA: Easy Data Augmentation Techniques

for Boosting Performance on Text Classification Tasks" (2019)

"""

from nltk.corpus import wordnet

words = text.split()

new_words = words.copy()

stop_words = set(['a', 'an', 'the', 'is', 'are', 'was', 'were',

'i', 'me', 'my', 'we', 'our', 'you', 'your'])

replaceable = [

(i, word) for i, word in enumerate(words)

if word.lower() not in stop_words

]

np.random.shuffle(replaceable)

replaced = 0

for idx, word in replaceable:

if replaced >= n:

break

synsets = wordnet.synsets(word)

if synsets:

synonyms = [

lemma.name() for synset in synsets

for lemma in synset.lemmas()

if lemma.name() != word

]

if synonyms:

new_words[idx] = np.random.choice(synonyms).replace('_', ' ')

replaced += 1

return ' '.join(new_words)

def eda_random_swap(self, text, n=1):

"""EDA: Random Swap"""

words = text.split()

if len(words) < 2:

return text

new_words = words.copy()

for _ in range(n):

i, j = np.random.choice(len(new_words), 2, replace=False)

new_words[i], new_words[j] = new_words[j], new_words[i]

return ' '.join(new_words)

def eda_random_deletion(self, text, p=0.1):

"""EDA: Random Deletion"""

words = text.split()

if len(words) == 1:

return text

new_words = [word for word in words if np.random.random() > p]

return ' '.join(new_words) if new_words else np.random.choice(words)

def back_translation(self, text, src_lang='en', pivot_lang='fr'):

"""

Back-Translation: en -> fr -> en

Preserves meaning while diversifying expression

"""

try:

from transformers import pipeline

translator_fwd = pipeline(

f"translation_{src_lang}_to_{pivot_lang}",

model=f"Helsinki-NLP/opus-mt-{src_lang}-{pivot_lang}"

)

translator_bwd = pipeline(

f"translation_{pivot_lang}_to_{src_lang}",

model=f"Helsinki-NLP/opus-mt-{pivot_lang}-{src_lang}"

)

pivot_text = translator_fwd(text)[0]['translation_text']

back_translated = translator_bwd(pivot_text)[0]['translation_text']

return back_translated

except Exception as e:

print(f"Translation error: {e}")

return text

def augment_dataset(self, texts, labels, n_aug=4):

"""Augment entire dataset"""

augmented_texts = []

augmented_labels = []

for text, label in zip(texts, labels):

augmented_texts.append(text)

augmented_labels.append(label)

for _ in range(n_aug):

aug_type = np.random.choice(

['synonym', 'swap', 'deletion']

)

if aug_type == 'synonym':

aug_text = self.eda_synonym_replacement(text)

elif aug_type == 'swap':

aug_text = self.eda_random_swap(text)

else:

aug_text = self.eda_random_deletion(text)

augmented_texts.append(aug_text)

augmented_labels.append(label)

print(f"Original samples: {len(texts)}")

print(f"Augmented samples: {len(augmented_texts)}")

return augmented_texts, augmented_labels

5.3 Automatic Augmentation (RandAugment, SpecAugment)

def get_randaugment_transforms(n=2, m=9, image_size=224):

"""

RandAugment: randomized augmentation policy

Cubuk et al., "RandAugment: Practical Automated Data Augmentation" (2019)

Args:

n: number of augmentation operations to apply

m: magnitude of augmentation (0-30)

"""

transform = transforms.Compose([

transforms.RandomResizedCrop(image_size),

transforms.RandAugment(num_ops=n, magnitude=m),

transforms.ToTensor(),

transforms.Normalize(

mean=[0.485, 0.456, 0.406],

std=[0.229, 0.224, 0.225]

)

])

return transform

def specaugment_for_audio(spectrogram, freq_mask_param=27, time_mask_param=70):

"""

SpecAugment: audio spectrogram augmentation

Park et al., "SpecAugment: A Simple Data Augmentation Method

for Automatic Speech Recognition" (2019)

Args:

spectrogram: input spectrogram (freq, time)

freq_mask_param: max frequency mask size

time_mask_param: max time mask size

"""

freq_mask = T.FrequencyMasking(freq_mask_param=freq_mask_param)

time_mask = T.TimeMasking(time_mask_param=time_mask_param)

augmented = freq_mask(spectrogram)

augmented = time_mask(augmented)

return augmented

6. Synthetic Data Generation

6.1 Synthetic Text with LLMs

class SyntheticTextGenerator:

"""Generate synthetic training data using LLMs"""

def __init__(self, llm_client, model_name='gpt-4'):

self.llm = llm_client

self.model_name = model_name

def generate_classification_data(self, class_name, n_samples=100,

domain='general', style='diverse'):

"""

Generate synthetic data for a classification class

Args:

class_name: name of the target class

n_samples: number of samples to generate

domain: domain context (medical, legal, etc.)

style: writing style (formal, casual, diverse)

"""

prompt = f"""Generate {n_samples} diverse text examples for the class '{class_name}'.

Domain: {domain}

Style: {style}

Requirements:

- Each example should be 1-3 sentences

- Vary the vocabulary, sentence structure, and perspective

- Include both simple and complex cases

- Format as a JSON list: ["example1", "example2", ...]

Generate realistic examples that would appear in real-world {domain} data."""

print(f"Synthetic data generation prompt for '{class_name}':")

print(prompt[:300] + "...")

print(f"\nPlanning to generate {n_samples} samples")

def generate_edge_cases(self, class_examples, n_edge_cases=20):

"""Generate challenging edge cases"""

prompt = f"""Based on these training examples:

{chr(10).join(class_examples[:5])}

Generate {n_edge_cases} challenging edge cases that:

1. Are ambiguous between different categories

2. Contain misleading keywords

3. Have unusual sentence structures

4. Test the model's true understanding

Format as JSON list."""

print("Edge case generation prompt ready")

def augment_with_paraphrase(self, texts, n_paraphrases=3):

"""Generate paraphrases using LLM"""

augmented = []

for text in texts:

prompt = f"""Paraphrase the following text {n_paraphrases} times.

Keep the same meaning but use different words and sentence structures.

Original: "{text}"

Format as JSON list of {n_paraphrases} paraphrases."""

augmented.append({

'original': text,

'paraphrases': []

})

return augmented

class SyntheticImageGenerator:

"""Generate synthetic images with Diffusion Models"""

def __init__(self, model_name='stabilityai/stable-diffusion-2-1'):

self.model_name = model_name

def setup_pipeline(self):

"""

Initialize Stable Diffusion pipeline

pip install diffusers accelerate

"""

try:

from diffusers import StableDiffusionPipeline

self.pipe = StableDiffusionPipeline.from_pretrained(

self.model_name,

torch_dtype=torch.float16

)

if torch.cuda.is_available():

self.pipe = self.pipe.to('cuda')

print(f"Pipeline initialized: {self.model_name}")

except Exception as e:

print(f"Pipeline initialization error: {e}")

def generate_class_images(self, class_name, n_images=50,

style_prompt="high quality, photorealistic"):

"""

Generate synthetic images for a class

Args:

class_name: target class name

n_images: number of images to generate

style_prompt: style guidance

"""

prompts = [

f"A photo of {class_name}, {style_prompt}",

f"{class_name} in natural environment, {style_prompt}",

f"Close-up of {class_name}, detailed, {style_prompt}",

f"{class_name} from different angle, {style_prompt}",

]

print(f"Synthetic image generation plan for '{class_name}':")

print(f"Images to generate: {n_images}")

print("Sample prompts:")

for p in prompts[:2]:

print(f" - {p}")

def evaluate_synthetic_quality(self, real_images, synthetic_images):

"""Evaluate synthetic image quality using FID score"""

try:

from torchmetrics.image.fid import FrechetInceptionDistance

fid = FrechetInceptionDistance(feature=64)

fid.update(real_images, real=True)

fid.update(synthetic_images, real=False)

fid_score = fid.compute()

print(f"FID Score: {fid_score:.2f}")

print("(Lower is better; 0 is perfect)")

return fid_score

except Exception as e:

print(f"FID computation error: {e}")

7. The Data Flywheel

7.1 Data Flywheel Concept

The data flywheel is a virtuous cycle of product, data, and model:

1. **Better model** → Better product

2. **Better product** → More users

3. **More users** → More data

4. **More data** → Better model

class DataFlywheelPipeline:

"""Data Flywheel implementation pipeline"""

def __init__(self, model, feedback_store):

self.model = model

self.feedback_store = feedback_store

self.version = 0

def collect_production_feedback(self, predictions, user_feedback):

"""

Collect user feedback from production

Args:

predictions: model predictions

user_feedback: user corrections/confirmations

"""

valuable_samples = []

for pred, feedback in zip(predictions, user_feedback):

if feedback['corrected']:

sample = {

'input': feedback['input'],

'model_prediction': pred,

'true_label': feedback['correction'],

'confidence': pred['confidence'],

'timestamp': feedback['timestamp'],

'value': 'high'

}

valuable_samples.append(sample)

elif feedback['confirmed'] and pred['confidence'] < 0.7:

sample = {

'input': feedback['input'],

'true_label': pred['label'],

'confidence': pred['confidence'],

'value': 'medium'

}

valuable_samples.append(sample)

print(f"Valuable samples collected: {len(valuable_samples)}")

return valuable_samples

def prioritize_labeling_queue(self, unlabeled_pool, budget):

"""

Prioritize labeling queue

Priority criteria:

1. Model uncertainty (higher = more priority)

2. Class rarity (rarer classes = more priority)

3. Data diversity (more different from existing = more priority)

"""

priorities = []

for sample in unlabeled_pool:

score = 0

uncertainty = 1 - max(sample['predicted_probs'])

score += uncertainty * 0.5

predicted_class = max(sample['predicted_probs'],

key=sample['predicted_probs'].get)

rarity = 1 / (sample['class_counts'].get(predicted_class, 1) + 1)

score += rarity * 0.3

diversity = np.std(list(sample['predicted_probs'].values()))

score += diversity * 0.2

priorities.append((sample, score))

priorities.sort(key=lambda x: x[1], reverse=True)

selected = [s for s, _ in priorities[:budget]]

return selected

8. Data Pipeline Best Practices

8.1 Reproducible Data Processing

from pathlib import Path

from datetime import datetime

class ReproducibleDataPipeline:

"""

Reproducible data pipeline

- Tracks all processing steps

- Verifies integrity via data hashes

- Supports version control

"""

def __init__(self, pipeline_name, base_dir='data/processed'):

self.pipeline_name = pipeline_name

self.base_dir = Path(base_dir)

self.steps = []

self.metadata = {

'pipeline': pipeline_name,

'created_at': datetime.now().isoformat(),

'steps': []

}

def add_step(self, step_name, func, *args, **kwargs):

"""Add processing step"""

self.steps.append({

'name': step_name,

'func': func,

'args': args,

'kwargs': kwargs

})

def compute_hash(self, data):

"""Compute data hash"""

if isinstance(data, np.ndarray):

return hashlib.md5(data.tobytes()).hexdigest()

elif isinstance(data, (list, dict)):

return hashlib.md5(

json.dumps(data, sort_keys=True, default=str).encode()

).hexdigest()

else:

return hashlib.md5(str(data).encode()).hexdigest()

def run(self, input_data):

"""Execute pipeline"""

data = input_data

for step in self.steps:

print(f"Running: {step['name']}")

hash_before = self.compute_hash(data)

data = step['func'](data, *step['args'], **step['kwargs'])

hash_after = self.compute_hash(data)

self.metadata['steps'].append({

'name': step['name'],

'hash_before': hash_before,

'hash_after': hash_after,

'timestamp': datetime.now().isoformat()

})

print(f" Done: {hash_before[:8]} -> {hash_after[:8]}")

metadata_path = self.base_dir / f"{self.pipeline_name}_metadata.json"

metadata_path.parent.mkdir(parents=True, exist_ok=True)

with open(metadata_path, 'w') as f:

json.dump(self.metadata, f, indent=2)

print(f"\nPipeline complete. Metadata: {metadata_path}")

return data

class DataVersionControl:

"""DVC-style data version control"""

def __init__(self, storage_path='data/.dvc'):

self.storage_path = Path(storage_path)

self.storage_path.mkdir(parents=True, exist_ok=True)

def add(self, data_path):

"""Start tracking a data file"""

data_path = Path(data_path)

with open(data_path, 'rb') as f:

file_hash = hashlib.md5(f.read()).hexdigest()

dvc_file = data_path.with_suffix('.dvc')

dvc_metadata = {

'md5': file_hash,

'size': os.path.getsize(data_path),

'path': str(data_path.name),

'version': datetime.now().isoformat()

}

with open(dvc_file, 'w') as f:

json.dump(dvc_metadata, f, indent=2)

print(f"Tracking: {data_path}")

print(f" MD5: {file_hash}")

print(f" Metafile: {dvc_file}")

return file_hash

def create_data_contract(self, schema):

"""

Define a Data Contract

- Schema definition

- Quality criteria

- SLA requirements

"""

contract = {

'version': '1.0',

'schema': schema,

'quality_rules': {

'completeness': {'min_threshold': 0.99},

'accuracy': {'label_error_rate': {'max': 0.05}},

'consistency': {'duplicate_rate': {'max': 0.01}},

},

'sla': {

'update_frequency': 'daily',

'max_staleness_hours': 24,

}

}

return contract

def demonstrate_full_data_pipeline():

"""Full Data-Centric AI pipeline demonstration"""

print("=" * 60)

print("Data-Centric AI Pipeline Demo")

print("=" * 60)

print("\nStep 1: Data Quality Assessment")

print(" - Measure label error rate")

print(" - Detect outliers")

print(" - Remove duplicates")

print(" - Analyze class distribution")

print("\nStep 2: Label Refinement")

print(" - Detect errors with Cleanlab")

print(" - Correct via majority vote / expert review")

print(" - Improve inter-rater agreement")

print("\nStep 3: Data Augmentation")

print(" - Images: Albumentations")

print(" - Text: EDA, back-translation")

print(" - Search for automatic augmentation policies")

print("\nStep 4: Synthetic Data Generation")

print(" - Synthesize text with LLMs")

print(" - Synthesize images with Diffusion Models")

print(" - Filter by quality (FID, classifier confidence)")

print("\nStep 5: Active Learning")

print(" - Prioritize labeling with uncertainty sampling")

print(" - Ensure diversity with CoreSet method")

print("\nStep 6: Version Control and Monitoring")

print(" - Version data with DVC")

print(" - Maintain quality standards with Data Contracts")

print(" - Continuously improve with the Data Flywheel")

print("\nConclusion: Improving data quality is often more impactful than improving models!")

9. Summary and Practical Guide

Data-Centric AI Checklist

**1. Data Collection**

- Write labeling guidelines with domain experts

- Measure inter-rater agreement (target Cohen's Kappa > 0.8)

- Monitor class distribution during collection

**2. Data Cleaning**

- Detect and fix label errors with Cleanlab

- Remove duplicate samples

- Review outliers (remove or correct)

**3. Data Augmentation**

- Apply augmentation only to training data (not validation/test)

- Validate data distribution after augmentation

- Choose augmentation techniques appropriate for your domain

**4. Continuous Improvement**

- Collect production error cases

- Use active learning to make labeling efficient

- Conduct regular data quality audits

**Recommended Tools:**

- Label quality: Cleanlab (https://github.com/cleanlab/cleanlab)

- Weak supervision: Snorkel (https://snorkel.ai/)

- Labeling platform: Label Studio (https://labelstud.io/)

- Image augmentation: Albumentations (https://albumentations.ai/)

- Active learning: modAL (https://modal.readthedocs.io/)

- Data version control: DVC (https://dvc.org/)

Data-Centric AI is not merely a matter of tools or techniques. It is a **mindset shift** — from chasing "better models" to chasing "better data." In many real-world projects, this shift alone can deliver dramatic performance improvements.

The most important insight from Andrew Ng's Data-Centric AI movement: your model is only as good as the data it learns from. Investing in data quality, labeling consistency, and systematic data improvement is often the highest-return activity in an AI project.

현재 단락 (1/970)

In 2021, Andrew Ng posed a provocative question to the AI community: "Instead of focusing on improvi...

작성 글자: 0원문 글자: 30,310작성 단락: 0/970