Skip to content

Split View: 데이터 중심 AI(Data-Centric AI) 완전 가이드: 고품질 데이터로 AI 성능 극대화

|

데이터 중심 AI(Data-Centric AI) 완전 가이드: 고품질 데이터로 AI 성능 극대화

데이터 중심 AI(Data-Centric AI) 완전 가이드: 고품질 데이터로 AI 성능 극대화

2021년 Andrew Ng은 AI 커뮤니티에 도전적인 질문을 던졌습니다: "모델 아키텍처를 개선하는 데 집중하는 대신, 데이터 품질을 개선하는 데 집중하면 어떻게 될까?" 이것이 데이터 중심 AI(Data-Centric AI) 운동의 출발점입니다.

전통적인 모델 중심 접근법이 "더 좋은 알고리즘"을 추구한다면, 데이터 중심 접근법은 "더 좋은 데이터"를 추구합니다. 이 가이드는 데이터 중심 AI의 모든 측면을 실전 코드와 함께 완벽하게 다룹니다.

1. 데이터 중심 AI vs 모델 중심 AI

1.1 패러다임 전환

모델 중심 접근법 (Model-Centric AI)

  • 데이터는 고정, 코드를 개선
  • 더 나은 아키텍처 탐색
  • 하이퍼파라미터 튜닝에 집중
  • 기존 벤치마크: 데이터셋은 고정되고 모델만 변함

데이터 중심 접근법 (Data-Centric AI)

  • 모델은 고정, 데이터를 개선
  • 레이블 오류 수정
  • 일관된 레이블링 가이드라인 개선
  • 데이터 증강과 합성 데이터 추가

1.2 Andrew Ng의 핵심 주장

Andrew Ng은 다음과 같이 말합니다:

"AI 시스템 = 코드(모델/알고리즘) + 데이터"

많은 실용적인 AI 프로젝트에서 코드는 이미 충분히 좋습니다. 문제는 데이터 품질입니다.

실험 결과 (Andrew Ng, DeepLearning.AI):

결함이 있는 제조업 검사 데이터셋에서:

  • 기준 성능: 76.2%
  • 더 나은 모델만 사용: 0.02% 향상 (76.22%)
  • 더 나은 데이터만 사용: 16.9% 향상 (93.1%)

이 결과는 많은 실제 시나리오에서 데이터 개선이 모델 개선보다 훨씬 더 효과적임을 보여줍니다.

1.3 언제 데이터 중심 접근이 효과적인가?

데이터 중심 접근이 특히 효과적인 상황:

  1. 소규모 데이터셋: 데이터가 수천 개 이하일 때 품질이 더 중요
  2. 높은 레이블 노이즈: 레이블 오류율이 5% 이상일 때
  3. 도메인 특화 작업: 일반 모델이 없는 특수 도메인
  4. 불균형 클래스: 희귀 클래스의 품질이 성능을 결정
  5. 엄격한 정확도 요구: 의료, 금융 등 높은 정확도가 필요한 분야
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import matplotlib.pyplot as plt

def compare_model_vs_data_centric(X, y, model_class, noise_level=0.1):
    """
    모델 중심 vs 데이터 중심 접근법 성능 비교

    Args:
        X: 특성 행렬
        y: 레이블
        model_class: 기본 모델 클래스
        noise_level: 레이블 노이즈 비율
    """
    # 레이블 노이즈 추가
    noisy_y = y.copy()
    noise_idx = np.random.choice(len(y), int(len(y) * noise_level), replace=False)
    n_classes = len(np.unique(y))
    for idx in noise_idx:
        wrong_labels = [l for l in range(n_classes) if l != y[idx]]
        noisy_y[idx] = np.random.choice(wrong_labels)

    X_train, X_test, y_train_clean, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    _, _, y_train_noisy, _ = train_test_split(
        X, noisy_y, test_size=0.2, random_state=42
    )

    # --- 모델 중심 접근 ---
    # 노이즈 데이터 + 기본 모델
    base_model = model_class()
    base_model.fit(X_train, y_train_noisy)
    base_acc = accuracy_score(y_test, base_model.predict(X_test))

    # 노이즈 데이터 + 더 복잡한 모델 (앙상블)
    from sklearn.ensemble import GradientBoostingClassifier
    complex_model = GradientBoostingClassifier(n_estimators=200)
    complex_model.fit(X_train, y_train_noisy)
    complex_acc = accuracy_score(y_test, complex_model.predict(X_test))

    # --- 데이터 중심 접근 ---
    # 클린 데이터 + 기본 모델
    clean_model = model_class()
    clean_model.fit(X_train, y_train_clean)
    clean_acc = accuracy_score(y_test, clean_model.predict(X_test))

    print("=" * 50)
    print("모델 중심 vs 데이터 중심 비교")
    print("=" * 50)
    print(f"기본 모델 + 노이즈 데이터: {base_acc:.3f}")
    print(f"복잡한 모델 + 노이즈 데이터: {complex_acc:.3f}")
    print(f"기본 모델 + 클린 데이터: {clean_acc:.3f}")
    print(f"\n모델 개선 효과: +{(complex_acc - base_acc):.3f}")
    print(f"데이터 개선 효과: +{(clean_acc - base_acc):.3f}")

    return {
        'base_model_noisy_data': base_acc,
        'complex_model_noisy_data': complex_acc,
        'base_model_clean_data': clean_acc
    }

2. 데이터 품질 측정

2.1 Confident Learning과 레이블 오류 탐지

Confident Learning은 Northcutt et al.이 제안한 방법으로, 교차 검증된 예측 확률을 사용하여 레이블 오류를 체계적으로 탐지합니다.

핵심 아이디어: "모델이 클래스 A라고 높은 확률로 예측하는데, 레이블이 클래스 B라면 레이블 오류일 가능성이 높다"

import cleanlab
from cleanlab.filter import find_label_issues
from cleanlab.classification import CleanLearning
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_predict

def detect_label_errors_cleanlab(X, y, model=None):
    """
    Cleanlab으로 레이블 오류 탐지

    Args:
        X: 특성 행렬
        y: 레이블 배열
        model: 분류 모델 (기본: LogisticRegression)

    Returns:
        label_issues: 레이블 오류 인덱스와 정보
    """
    if model is None:
        model = LogisticRegression(max_iter=1000)

    # 교차 검증으로 클래스 확률 예측
    pred_probs = cross_val_predict(
        model, X, y,
        cv=5,
        method='predict_proba'
    )

    # 레이블 오류 찾기
    label_issues = find_label_issues(
        labels=y,
        pred_probs=pred_probs,
        return_indices_ranked_by='self_confidence'
    )

    print(f"총 샘플 수: {len(y)}")
    print(f"발견된 레이블 오류: {len(label_issues)}")
    print(f"오류율: {len(label_issues)/len(y):.2%}")

    return label_issues


def cleanlab_full_pipeline(X_train, y_train_noisy, X_test, y_test):
    """
    Cleanlab 전체 파이프라인:
    1. 레이블 오류 탐지
    2. 오류 수정 또는 제거
    3. 정제된 데이터로 모델 재학습
    """
    from cleanlab.classification import CleanLearning

    base_model = LogisticRegression(max_iter=1000)

    # CleanLearning: 레이블 오류를 자동으로 처리하며 학습
    cl = CleanLearning(base_model, seed=42)
    cl.fit(X_train, y_train_noisy)

    # 평가
    y_pred = cl.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    print(f"CleanLearning 정확도: {accuracy:.3f}")

    # 레이블 이슈 정보 출력
    label_issues_df = cl.get_label_issues()
    print(f"\n레이블 이슈 정보:")
    print(label_issues_df.head(10))

    return cl, label_issues_df


def confident_learning_manual(pred_probs, labels):
    """
    Confident Learning 수동 구현
    - 클래스별 임계값 계산
    - Confident Joint 행렬 구축
    """
    n_classes = pred_probs.shape[1]
    n_samples = len(labels)

    # 클래스별 임계값: 해당 클래스 샘플들의 평균 예측 확률
    thresholds = np.zeros(n_classes)
    for c in range(n_classes):
        class_mask = labels == c
        if class_mask.sum() > 0:
            thresholds[c] = pred_probs[class_mask, c].mean()

    # Confident Joint 행렬 C[s][y]
    # s: 예측 클래스, y: 주어진 레이블
    C = np.zeros((n_classes, n_classes), dtype=int)

    for i in range(n_samples):
        y_given = labels[i]
        # 가장 높은 확률 클래스 (임계값 이상인 것 중)
        over_threshold = pred_probs[i] >= thresholds

        if over_threshold.sum() == 0:
            y_hat = pred_probs[i].argmax()
        else:
            y_hat = (pred_probs[i] * over_threshold).argmax()

        C[y_hat, y_given] += 1

    # 대각선이 아닌 요소가 많은 클래스 쌍이 오류 후보
    off_diagonal = C.copy()
    np.fill_diagonal(off_diagonal, 0)

    print("Confident Joint 행렬 (행: 추정 실제 클래스, 열: 주어진 레이블):")
    print(C)
    print(f"\n잠재적 오류 샘플 수: {off_diagonal.sum()}")

    return C

2.2 데이터 이상치 탐지

from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
import torch
import numpy as np

class DataQualityChecker:
    """
    종합적인 데이터 품질 검사 도구
    """

    def __init__(self):
        self.outlier_detector = None
        self.quality_report = {}

    def check_class_distribution(self, labels):
        """클래스 불균형 검사"""
        from collections import Counter
        import pandas as pd

        counts = Counter(labels)
        total = len(labels)

        df = pd.DataFrame([
            {'class': c, 'count': n, 'percentage': 100 * n / total}
            for c, n in sorted(counts.items())
        ])

        imbalance_ratio = max(counts.values()) / min(counts.values())

        print("클래스 분포:")
        print(df.to_string(index=False))
        print(f"\n불균형 비율: {imbalance_ratio:.2f}x")

        if imbalance_ratio > 10:
            print("경고: 심각한 클래스 불균형!")
        elif imbalance_ratio > 3:
            print("주의: 클래스 불균형 존재")

        self.quality_report['class_imbalance_ratio'] = imbalance_ratio
        return df

    def detect_outliers(self, X, method='isolation_forest', contamination=0.1):
        """
        이상치 탐지

        Args:
            method: 'isolation_forest' 또는 'lof'
            contamination: 예상 이상치 비율
        """
        if method == 'isolation_forest':
            detector = IsolationForest(
                contamination=contamination,
                random_state=42
            )
        elif method == 'lof':
            detector = LocalOutlierFactor(
                contamination=contamination,
                novelty=True
            )

        # 이상치 탐지 (-1: 이상치, 1: 정상)
        predictions = detector.fit_predict(X)
        outlier_mask = predictions == -1
        outlier_indices = np.where(outlier_mask)[0]

        print(f"탐지된 이상치: {outlier_mask.sum()} / {len(X)} ({outlier_mask.mean():.2%})")

        self.quality_report['n_outliers'] = outlier_mask.sum()
        return outlier_indices, outlier_mask

    def check_duplicates(self, X, y=None, threshold=0.99):
        """
        중복 샘플 탐지

        Args:
            threshold: 유사도 임계값 (1.0 = 완전 동일)
        """
        from sklearn.metrics.pairwise import cosine_similarity

        # 유사도 행렬 계산 (대규모 데이터에서는 샘플링)
        if len(X) > 10000:
            sample_idx = np.random.choice(len(X), 10000, replace=False)
            X_sample = X[sample_idx]
        else:
            X_sample = X
            sample_idx = np.arange(len(X))

        sim_matrix = cosine_similarity(X_sample)
        np.fill_diagonal(sim_matrix, 0)

        # 임계값 이상 유사한 쌍 찾기
        duplicate_pairs = np.argwhere(sim_matrix >= threshold)
        # 중복 제거 (i < j만 유지)
        duplicate_pairs = duplicate_pairs[duplicate_pairs[:, 0] < duplicate_pairs[:, 1]]

        print(f"중복 쌍 발견: {len(duplicate_pairs)}")

        if y is not None and len(duplicate_pairs) > 0:
            # 레이블이 다른 중복 (잠재적 오류)
            label_conflicts = 0
            for i, j in duplicate_pairs:
                if y[sample_idx[i]] != y[sample_idx[j]]:
                    label_conflicts += 1
            print(f"레이블 충돌 중복 쌍: {label_conflicts}")

        return duplicate_pairs

    def compute_data_quality_score(self, X, y):
        """종합 데이터 품질 점수 계산"""
        scores = {}

        # 결측값 비율
        if hasattr(X, 'isnull'):
            missing_rate = X.isnull().mean().mean()
        else:
            missing_rate = np.isnan(X).mean()
        scores['completeness'] = 1 - missing_rate

        # 클래스 균형
        from collections import Counter
        counts = Counter(y)
        n_classes = len(counts)
        ideal_count = len(y) / n_classes
        balance_score = sum(
            min(c, ideal_count) / ideal_count
            for c in counts.values()
        ) / n_classes
        scores['balance'] = balance_score

        # 최종 점수
        overall_score = np.mean(list(scores.values()))
        scores['overall'] = overall_score

        print("데이터 품질 점수:")
        for metric, score in scores.items():
            print(f"  {metric}: {score:.3f}")

        return scores

3. 레이블링 전략

3.1 고품질 레이블링 가이드라인

def compute_inter_rater_agreement(annotations):
    """
    레이블러 간 일치도 계산

    Args:
        annotations: shape (n_samples, n_raters)의 배열

    Returns:
        cohen_kappa: 코헨 카파 점수
        fleiss_kappa: 플라이스 카파 점수 (다수 레이블러)
    """
    from sklearn.metrics import cohen_kappa_score
    import numpy as np

    n_samples, n_raters = annotations.shape

    # 레이블러 쌍별 코헨 카파
    kappa_scores = []
    for i in range(n_raters):
        for j in range(i+1, n_raters):
            kappa = cohen_kappa_score(
                annotations[:, i],
                annotations[:, j]
            )
            kappa_scores.append((i, j, kappa))
            print(f"레이블러 {i} vs 레이블러 {j}: kappa = {kappa:.3f}")

    mean_kappa = np.mean([k for _, _, k in kappa_scores])
    print(f"\n평균 코헨 카파: {mean_kappa:.3f}")

    # 카파 해석
    if mean_kappa < 0.2:
        interpretation = "미약한 일치"
    elif mean_kappa < 0.4:
        interpretation = "보통 일치"
    elif mean_kappa < 0.6:
        interpretation = "적절한 일치"
    elif mean_kappa < 0.8:
        interpretation = "상당한 일치"
    else:
        interpretation = "거의 완벽한 일치"

    print(f"해석: {interpretation}")

    # 다수결 레이블 생성
    from scipy import stats
    majority_labels = stats.mode(annotations, axis=1)[0].flatten()
    print(f"\n다수결 레이블 생성 완료")

    return mean_kappa, majority_labels


def create_labeling_guidelines(task_name, examples):
    """
    레이블링 가이드라인 템플릿 생성

    좋은 레이블링 가이드라인의 요소:
    1. 명확한 정의와 경계
    2. 긍정/부정 예시
    3. 경계 케이스 처리 방법
    4. 일관성 체크리스트
    """
    guidelines = {
        'task': task_name,
        'categories': {},
        'edge_cases': [],
        'consistency_rules': []
    }

    print(f"'{task_name}' 레이블링 가이드라인 템플릿:")
    print("-" * 50)
    print("1. 각 클래스의 명확한 정의를 작성하세요")
    print("2. 최소 5개의 긍정 예시와 5개의 부정 예시를 포함하세요")
    print("3. 경계 케이스(Boundary Cases) 처리 방법을 명시하세요")
    print("4. 레이블러가 확신할 수 없을 때의 기본 규칙을 정의하세요")
    print("5. 품질 체크 기준을 포함하세요")

    return guidelines

3.2 약한 지도 학습 (Weak Supervision) with Snorkel

def snorkel_programmatic_labeling_demo():
    """
    Snorkel을 사용한 프로그래매틱 레이블링 데모
    pip install snorkel
    """
    from snorkel.labeling import labeling_function, PandasLFApplier
    from snorkel.labeling.model import LabelModel
    import pandas as pd
    import re

    # 감성 분류 예시
    POSITIVE = 1
    NEGATIVE = 0
    ABSTAIN = -1

    @labeling_function()
    def lf_positive_keywords(x):
        """긍정 키워드 기반 레이블링 함수"""
        positive_words = ['good', 'great', 'excellent', 'amazing', 'love', 'best']
        text = x.text.lower()
        if any(word in text for word in positive_words):
            return POSITIVE
        return ABSTAIN

    @labeling_function()
    def lf_negative_keywords(x):
        """부정 키워드 기반 레이블링 함수"""
        negative_words = ['bad', 'terrible', 'awful', 'hate', 'worst', 'horrible']
        text = x.text.lower()
        if any(word in text for word in negative_words):
            return NEGATIVE
        return ABSTAIN

    @labeling_function()
    def lf_rating_high(x):
        """높은 평점 기반 레이블링 함수"""
        if hasattr(x, 'rating') and x.rating >= 4:
            return POSITIVE
        return ABSTAIN

    @labeling_function()
    def lf_rating_low(x):
        """낮은 평점 기반 레이블링 함수"""
        if hasattr(x, 'rating') and x.rating <= 2:
            return NEGATIVE
        return ABSTAIN

    @labeling_function()
    def lf_negation_check(x):
        """부정 표현 감지 (NOT 연산자)"""
        text = x.text.lower()
        if re.search(r"not (good|great|excellent)", text):
            return NEGATIVE
        if re.search(r"not (bad|terrible)", text):
            return POSITIVE
        return ABSTAIN

    # 레이블링 함수 목록
    lfs = [
        lf_positive_keywords,
        lf_negative_keywords,
        lf_rating_high,
        lf_rating_low,
        lf_negation_check,
    ]

    # 데이터에 레이블링 함수 적용
    # df_train은 레이블 없는 텍스트 데이터프레임
    # applier = PandasLFApplier(lfs=lfs)
    # L_train = applier.apply(df_train)

    # 레이블 모델로 통합
    # label_model = LabelModel(cardinality=2, verbose=True)
    # label_model.fit(L_train=L_train, n_epochs=500, log_freq=100)

    # 확률적 레이블 생성
    # probs_train = label_model.predict_proba(L=L_train)

    print("Snorkel 프로그래매틱 레이블링 파이프라인:")
    print("1. 도메인 전문가가 레이블링 함수(LF) 작성")
    print("2. LF를 레이블 없는 데이터에 적용")
    print("3. 레이블 모델로 여러 LF 통합 (노이즈 제거)")
    print("4. 생성된 소프트 레이블로 다운스트림 모델 학습")
    print(f"\n정의된 레이블링 함수 수: {len(lfs)}")

    return lfs

4. 능동 학습 (Active Learning)

능동 학습은 레이블이 없는 대규모 데이터에서 가장 유익한 샘플을 선택하여 레이블링 비용을 최소화하는 방법입니다.

import numpy as np
from sklearn.base import BaseEstimator
import torch
import torch.nn as nn

class ActiveLearner:
    """
    능동 학습 구현
    여러 샘플 선택 전략 지원
    """

    def __init__(self, model, strategy='uncertainty', n_initial=100):
        self.model = model
        self.strategy = strategy
        self.n_initial = n_initial

    def uncertainty_sampling(self, X_unlabeled, n_samples):
        """
        불확실성 샘플링
        모델이 가장 확신하지 못하는 샘플 선택
        """
        # 예측 확률
        probs = self._get_probs(X_unlabeled)

        if self.strategy == 'least_confidence':
            # 가장 낮은 최대 확률
            uncertainty = 1 - probs.max(axis=1)

        elif self.strategy == 'margin':
            # 상위 두 클래스의 확률 차이
            sorted_probs = np.sort(probs, axis=1)[:, ::-1]
            uncertainty = 1 - (sorted_probs[:, 0] - sorted_probs[:, 1])

        elif self.strategy == 'entropy':
            # 엔트로피: 예측 분포의 불확실성
            uncertainty = -np.sum(probs * np.log(probs + 1e-10), axis=1)

        else:
            uncertainty = 1 - probs.max(axis=1)

        # 가장 불확실한 샘플 인덱스 반환
        selected_indices = np.argsort(uncertainty)[-n_samples:]
        return selected_indices, uncertainty

    def diversity_sampling(self, X_unlabeled, X_labeled, n_samples):
        """
        다양성 기반 샘플링 (CoreSet 방법)
        레이블된 데이터와 가장 다른 샘플 선택
        """
        from sklearn.metrics.pairwise import euclidean_distances

        # 레이블된 데이터와의 최소 거리
        distances = euclidean_distances(X_unlabeled, X_labeled)
        min_distances = distances.min(axis=1)

        # CoreSet: 가장 먼 점들을 순차적으로 선택
        selected = []
        remaining = list(range(len(X_unlabeled)))
        current_labeled = X_labeled.copy()

        for _ in range(n_samples):
            dists = euclidean_distances(
                X_unlabeled[remaining],
                current_labeled
            ).min(axis=1)

            # 가장 먼 점 선택
            best_idx = remaining[np.argmax(dists)]
            selected.append(best_idx)
            remaining.remove(best_idx)
            current_labeled = np.vstack([current_labeled, X_unlabeled[best_idx]])

        return np.array(selected)

    def batch_mode_active_learning(self, X_pool, y_oracle, X_test, y_test,
                                    n_iterations=10, n_per_iter=50):
        """
        배치 모드 능동 학습 루프

        Args:
            X_pool: 레이블 없는 데이터 풀
            y_oracle: 레이블러 (실제 레이블 소스)
            n_per_iter: 각 반복에서 레이블할 샘플 수
        """
        # 초기 레이블된 집합
        initial_indices = np.random.choice(
            len(X_pool), self.n_initial, replace=False
        )
        labeled_indices = list(initial_indices)
        unlabeled_indices = [
            i for i in range(len(X_pool)) if i not in labeled_indices
        ]

        accuracies = []
        n_labeled_list = []

        for iteration in range(n_iterations):
            X_labeled = X_pool[labeled_indices]
            y_labeled = y_oracle[labeled_indices]

            # 현재 레이블된 데이터로 모델 학습
            self.model.fit(X_labeled, y_labeled)

            # 테스트 정확도 평가
            acc = accuracy_score(y_test, self.model.predict(X_test))
            accuracies.append(acc)
            n_labeled_list.append(len(labeled_indices))

            print(f"반복 {iteration+1}: 레이블 수={len(labeled_indices)}, 정확도={acc:.3f}")

            if len(unlabeled_indices) == 0:
                break

            # 다음에 레이블할 샘플 선택
            X_unlabeled = X_pool[unlabeled_indices]
            selected, _ = self.uncertainty_sampling(X_unlabeled, n_per_iter)

            # 실제 인덱스로 변환
            actual_selected = [unlabeled_indices[i] for i in selected]

            # 레이블된 집합에 추가
            labeled_indices.extend(actual_selected)
            unlabeled_indices = [
                i for i in unlabeled_indices if i not in actual_selected
            ]

        return accuracies, n_labeled_list

    def _get_probs(self, X):
        """모델 예측 확률 반환"""
        if hasattr(self.model, 'predict_proba'):
            return self.model.predict_proba(X)
        else:
            logits = self.model.predict(X)
            from scipy.special import softmax
            return softmax(logits, axis=1)


def compare_active_learning_strategies(X, y, model, n_initial=50, n_budget=500):
    """여러 능동 학습 전략 비교"""
    strategies = ['least_confidence', 'margin', 'entropy']
    results = {}

    for strategy in strategies:
        learner = ActiveLearner(model, strategy=strategy, n_initial=n_initial)
        accs, n_labeled = learner.batch_mode_active_learning(
            X, y,
            X_test=X[:100],
            y_test=y[:100],
            n_iterations=10,
            n_per_iter=50
        )
        results[strategy] = {'accuracies': accs, 'n_labeled': n_labeled}

    # 랜덤 샘플링 기준
    random_learner = ActiveLearner(model, strategy='random', n_initial=n_initial)
    # (랜덤 전략 구현은 생략)

    return results

5. 데이터 증강 심층 가이드

5.1 이미지 증강: Albumentations

import albumentations as A
from albumentations.pytorch import ToTensorV2
import cv2
import numpy as np
from PIL import Image

def get_train_transforms(image_size=224):
    """
    학습용 강력한 증강 파이프라인 (Albumentations)
    """
    return A.Compose([
        # 기하학적 변환
        A.RandomResizedCrop(
            height=image_size,
            width=image_size,
            scale=(0.7, 1.0),
            ratio=(0.75, 1.33)
        ),
        A.HorizontalFlip(p=0.5),
        A.ShiftScaleRotate(
            shift_limit=0.1,
            scale_limit=0.2,
            rotate_limit=30,
            p=0.5
        ),

        # 색상 변환
        A.ColorJitter(
            brightness=0.3,
            contrast=0.3,
            saturation=0.3,
            hue=0.1,
            p=0.8
        ),
        A.ToGray(p=0.1),
        A.RandomGamma(gamma_limit=(80, 120), p=0.3),

        # 노이즈 및 블러
        A.GaussNoise(var_limit=(10, 50), p=0.3),
        A.OneOf([
            A.MotionBlur(blur_limit=7),
            A.GaussianBlur(blur_limit=7),
            A.MedianBlur(blur_limit=7),
        ], p=0.3),

        # 컷아웃 / 랜덤 지우기
        A.CoarseDropout(
            max_holes=8,
            max_height=32,
            max_width=32,
            fill_value=0,
            p=0.3
        ),

        # 그리드 왜곡
        A.OneOf([
            A.GridDistortion(p=1),
            A.ElasticTransform(p=1),
            A.OpticalDistortion(p=1),
        ], p=0.2),

        # 정규화 및 텐서 변환
        A.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225]
        ),
        ToTensorV2(),
    ])


def get_val_transforms(image_size=224):
    """검증용 변환 (증강 없이 정규화만)"""
    return A.Compose([
        A.Resize(height=image_size, width=image_size),
        A.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225]
        ),
        ToTensorV2(),
    ])


def mixup_augmentation(images, labels, alpha=0.4):
    """
    MixUp 증강: 두 이미지와 레이블을 혼합
    Zhang et al., "mixup: Beyond Empirical Risk Minimization" (2018)
    """
    import torch

    batch_size = images.shape[0]
    lam = np.random.beta(alpha, alpha)

    # 랜덤 순서로 섞기
    perm = torch.randperm(batch_size)

    # 이미지 혼합
    mixed_images = lam * images + (1 - lam) * images[perm]

    # 레이블 혼합 (soft labels)
    labels_a = labels
    labels_b = labels[perm]

    return mixed_images, labels_a, labels_b, lam


def cutmix_augmentation(images, labels, alpha=1.0):
    """
    CutMix 증강: 이미지 일부를 다른 이미지로 잘라 붙이기
    Yun et al., "CutMix: Training Strategy that Makes Use of
    Sample Mixing" (2019)
    """
    import torch

    batch_size, c, h, w = images.shape
    lam = np.random.beta(alpha, alpha)

    perm = torch.randperm(batch_size)

    # 랜덤 박스 생성
    cut_ratio = np.sqrt(1 - lam)
    cut_h = int(h * cut_ratio)
    cut_w = int(w * cut_ratio)

    cx = np.random.randint(w)
    cy = np.random.randint(h)

    bbx1 = np.clip(cx - cut_w // 2, 0, w)
    bby1 = np.clip(cy - cut_h // 2, 0, h)
    bbx2 = np.clip(cx + cut_w // 2, 0, w)
    bby2 = np.clip(cy + cut_h // 2, 0, h)

    # 박스 영역을 다른 이미지로 교체
    mixed_images = images.clone()
    mixed_images[:, :, bby1:bby2, bbx1:bbx2] = images[perm, :, bby1:bby2, bbx1:bbx2]

    # 실제 혼합 비율 조정
    lam = 1 - ((bbx2 - bbx1) * (bby2 - bby1) / (w * h))

    labels_a = labels
    labels_b = labels[perm]

    return mixed_images, labels_a, labels_b, lam

5.2 텍스트 증강

class TextAugmenter:
    """
    텍스트 데이터 증강 기법들
    """

    def __init__(self, language='en'):
        self.language = language

    def eda_synonym_replacement(self, text, n=1):
        """
        EDA (Easy Data Augmentation): 동의어 교체
        Wei and Zou, "EDA: Easy Data Augmentation Techniques
        for Boosting Performance on Text Classification Tasks" (2019)
        """
        import nltk
        from nltk.corpus import wordnet

        words = text.split()
        new_words = words.copy()

        # 불용어 제외
        stop_words = set(['a', 'an', 'the', 'is', 'are', 'was', 'were',
                          'i', 'me', 'my', 'we', 'our', 'you', 'your'])

        # 교체 가능한 단어 찾기
        replaceable = [
            (i, word) for i, word in enumerate(words)
            if word.lower() not in stop_words
        ]

        np.random.shuffle(replaceable)

        replaced = 0
        for idx, word in replaceable:
            if replaced >= n:
                break

            synsets = wordnet.synsets(word)
            if synsets:
                synonyms = [
                    lemma.name() for synset in synsets
                    for lemma in synset.lemmas()
                    if lemma.name() != word
                ]
                if synonyms:
                    new_words[idx] = np.random.choice(synonyms).replace('_', ' ')
                    replaced += 1

        return ' '.join(new_words)

    def eda_random_insertion(self, text, n=1):
        """EDA: 랜덤 위치에 동의어 삽입"""
        import nltk
        from nltk.corpus import wordnet

        words = text.split()
        new_words = words.copy()

        for _ in range(n):
            # 랜덤 단어의 동의어 찾기
            word = np.random.choice(words)
            synsets = wordnet.synsets(word)

            if synsets:
                synonyms = [
                    lemma.name() for synset in synsets
                    for lemma in synset.lemmas()
                ]
                if synonyms:
                    synonym = np.random.choice(synonyms).replace('_', ' ')
                    insert_pos = np.random.randint(0, len(new_words) + 1)
                    new_words.insert(insert_pos, synonym)

        return ' '.join(new_words)

    def eda_random_swap(self, text, n=1):
        """EDA: 랜덤 단어 교환"""
        words = text.split()
        if len(words) < 2:
            return text

        new_words = words.copy()
        for _ in range(n):
            i, j = np.random.choice(len(new_words), 2, replace=False)
            new_words[i], new_words[j] = new_words[j], new_words[i]

        return ' '.join(new_words)

    def eda_random_deletion(self, text, p=0.1):
        """EDA: 랜덤 단어 삭제"""
        words = text.split()
        if len(words) == 1:
            return text

        new_words = [
            word for word in words
            if np.random.random() > p
        ]

        return ' '.join(new_words) if new_words else np.random.choice(words)

    def back_translation(self, text, src_lang='en', pivot_lang='fr'):
        """
        역번역 증강: 영어 -> 프랑스어 -> 영어
        의미는 유지하면서 표현 다양화
        (실제 구현에서는 번역 API 또는 모델 사용)
        """
        # 예시: Helsinki-NLP/opus-mt 모델 사용
        try:
            from transformers import pipeline

            # 영어 -> 피벗 언어
            translator_fwd = pipeline(
                f"translation_{src_lang}_to_{pivot_lang}",
                model=f"Helsinki-NLP/opus-mt-{src_lang}-{pivot_lang}"
            )

            # 피벗 언어 -> 영어
            translator_bwd = pipeline(
                f"translation_{pivot_lang}_to_{src_lang}",
                model=f"Helsinki-NLP/opus-mt-{pivot_lang}-{src_lang}"
            )

            # 번역 수행
            pivot_text = translator_fwd(text)[0]['translation_text']
            back_translated = translator_bwd(pivot_text)[0]['translation_text']

            return back_translated

        except Exception as e:
            print(f"번역 오류: {e}")
            return text

    def augment_dataset(self, texts, labels, n_aug=4):
        """전체 데이터셋 증강"""
        augmented_texts = []
        augmented_labels = []

        for text, label in zip(texts, labels):
            augmented_texts.append(text)
            augmented_labels.append(label)

            for _ in range(n_aug):
                aug_type = np.random.choice(
                    ['synonym', 'insertion', 'swap', 'deletion']
                )

                if aug_type == 'synonym':
                    aug_text = self.eda_synonym_replacement(text)
                elif aug_type == 'insertion':
                    aug_text = self.eda_random_insertion(text)
                elif aug_type == 'swap':
                    aug_text = self.eda_random_swap(text)
                else:
                    aug_text = self.eda_random_deletion(text)

                augmented_texts.append(aug_text)
                augmented_labels.append(label)

        print(f"원본 샘플: {len(texts)}")
        print(f"증강 후 샘플: {len(augmented_texts)}")

        return augmented_texts, augmented_labels

5.3 자동 증강 (AutoAugment, RandAugment)

import torch
import torchvision.transforms as transforms

def get_randaugment_transforms(n=2, m=9, image_size=224):
    """
    RandAugment: 랜덤 증강 정책
    Cubuk et al., "RandAugment: Practical Automated Data Augmentation" (2019)

    Args:
        n: 적용할 증강 연산 수
        m: 증강 강도 (0-30)
    """
    # PyTorch 내장 RandAugment 사용 (torchvision >= 0.12)
    transform = transforms.Compose([
        transforms.RandomResizedCrop(image_size),
        transforms.RandAugment(num_ops=n, magnitude=m),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225]
        )
    ])

    return transform


def specaugment_for_audio(spectrogram, freq_mask_param=27, time_mask_param=70):
    """
    SpecAugment: 오디오 스펙트로그램 증강
    Park et al., "SpecAugment: A Simple Data Augmentation Method
    for Automatic Speech Recognition" (2019)

    Args:
        spectrogram: 입력 스펙트로그램 (freq, time)
        freq_mask_param: 주파수 마스크 최대 크기
        time_mask_param: 시간 마스크 최대 크기
    """
    import torch
    import torchaudio.transforms as T

    freq_mask = T.FrequencyMasking(freq_mask_param=freq_mask_param)
    time_mask = T.TimeMasking(time_mask_param=time_mask_param)

    # 주파수 마스킹
    augmented = freq_mask(spectrogram)
    # 시간 마스킹
    augmented = time_mask(augmented)

    return augmented

6. 합성 데이터 생성

6.1 LLM으로 합성 텍스트 생성

class SyntheticTextGenerator:
    """
    LLM을 사용한 합성 학습 데이터 생성
    """

    def __init__(self, llm_client, model_name='gpt-4'):
        self.llm = llm_client
        self.model_name = model_name

    def generate_classification_data(self, class_name, n_samples=100,
                                     domain='general', style='diverse'):
        """
        분류 작업을 위한 합성 데이터 생성

        Args:
            class_name: 생성할 클래스 이름
            n_samples: 생성할 샘플 수
            domain: 도메인 (medical, legal, etc.)
            style: 스타일 (formal, casual, diverse)
        """
        prompt = f"""Generate {n_samples} diverse text examples for the class '{class_name}'.
Domain: {domain}
Style: {style}

Requirements:
- Each example should be 1-3 sentences
- Vary the vocabulary, sentence structure, and perspective
- Include both simple and complex cases
- Format as a JSON list: ["example1", "example2", ...]

Generate realistic examples that would appear in real-world {domain} data."""

        # LLM 호출 (실제 구현에서는 API 사용)
        # response = self.llm.complete(prompt)
        # examples = json.loads(response)

        print(f"'{class_name}' 클래스 합성 데이터 생성 프롬프트:")
        print(prompt[:300] + "...")
        print(f"\n{n_samples}개 샘플 생성 예정")

    def generate_edge_cases(self, class_examples, n_edge_cases=20):
        """경계 케이스 합성 데이터 생성"""
        prompt = f"""Based on these training examples:
{chr(10).join(class_examples[:5])}

Generate {n_edge_cases} challenging edge cases that:
1. Are ambiguous between different categories
2. Contain misleading keywords
3. Have unusual sentence structures
4. Test the model's true understanding

Format as JSON list."""

        print("경계 케이스 생성 프롬프트 준비 완료")

    def augment_with_paraphrase(self, texts, n_paraphrases=3):
        """
        LLM으로 의역(paraphrase) 생성
        """
        augmented = []

        for text in texts:
            prompt = f"""Paraphrase the following text {n_paraphrases} times.
Keep the same meaning but use different words and sentence structures.

Original: "{text}"

Format as JSON list of {n_paraphrases} paraphrases."""

            augmented.append({
                'original': text,
                'paraphrases': []  # LLM 응답으로 채워짐
            })

        return augmented


class SyntheticImageGenerator:
    """
    Diffusion Model로 합성 이미지 생성
    """

    def __init__(self, model_name='stabilityai/stable-diffusion-2-1'):
        self.model_name = model_name

    def setup_pipeline(self):
        """
        Stable Diffusion 파이프라인 초기화
        pip install diffusers accelerate
        """
        try:
            from diffusers import StableDiffusionPipeline
            import torch

            self.pipe = StableDiffusionPipeline.from_pretrained(
                self.model_name,
                torch_dtype=torch.float16
            )

            if torch.cuda.is_available():
                self.pipe = self.pipe.to('cuda')

            print(f"파이프라인 초기화 완료: {self.model_name}")
        except Exception as e:
            print(f"파이프라인 초기화 오류: {e}")

    def generate_class_images(self, class_name, n_images=50,
                               style_prompt="high quality, photorealistic"):
        """
        특정 클래스의 합성 이미지 생성

        Args:
            class_name: 생성할 클래스
            n_images: 생성할 이미지 수
            style_prompt: 스타일 프롬프트
        """
        prompts = [
            f"A photo of {class_name}, {style_prompt}",
            f"{class_name} in natural environment, {style_prompt}",
            f"Close-up of {class_name}, detailed, {style_prompt}",
            f"{class_name} from different angle, {style_prompt}",
        ]

        print(f"'{class_name}' 합성 이미지 생성 계획:")
        print(f"생성할 이미지 수: {n_images}")
        print("사용할 프롬프트 예시:")
        for p in prompts[:2]:
            print(f"  - {p}")

        # 실제 생성 코드
        # images = []
        # for i in range(n_images):
        #     prompt = prompts[i % len(prompts)]
        #     image = self.pipe(prompt).images[0]
        #     images.append(image)
        # return images

    def evaluate_synthetic_quality(self, real_images, synthetic_images):
        """
        합성 이미지 품질 평가 (FID 스코어)
        """
        try:
            from torchmetrics.image.fid import FrechetInceptionDistance

            fid = FrechetInceptionDistance(feature=64)

            # 실제 이미지 추가
            fid.update(real_images, real=True)
            # 합성 이미지 추가
            fid.update(synthetic_images, real=False)

            fid_score = fid.compute()
            print(f"FID 스코어: {fid_score:.2f}")
            print("(낮을수록 좋음, 0이 완벽)")

            return fid_score
        except Exception as e:
            print(f"FID 계산 오류: {e}")

7. 데이터 플라이휠 (Data Flywheel)

7.1 데이터 플라이휠 개념

데이터 플라이휠은 제품-데이터-모델의 선순환 구조입니다:

  1. 더 나은 모델 → 더 나은 제품
  2. 더 나은 제품 → 더 많은 사용자
  3. 더 많은 사용자 → 더 많은 데이터
  4. 더 많은 데이터 → 더 나은 모델
class DataFlywheelPipeline:
    """
    데이터 플라이휠 구현 파이프라인
    """

    def __init__(self, model, feedback_store):
        self.model = model
        self.feedback_store = feedback_store
        self.version = 0

    def collect_production_feedback(self, predictions, user_feedback):
        """
        프로덕션에서 사용자 피드백 수집

        Args:
            predictions: 모델 예측값
            user_feedback: 사용자 교정/확인 데이터
        """
        valuable_samples = []

        for pred, feedback in zip(predictions, user_feedback):
            if feedback['corrected']:
                # 사용자가 수정한 샘플 (오류 케이스)
                sample = {
                    'input': feedback['input'],
                    'model_prediction': pred,
                    'true_label': feedback['correction'],
                    'confidence': pred['confidence'],
                    'timestamp': feedback['timestamp'],
                    'value': 'high'  # 오류 케이스는 가치 높음
                }
                valuable_samples.append(sample)

            elif feedback['confirmed'] and pred['confidence'] < 0.7:
                # 낮은 신뢰도지만 정확한 예측
                sample = {
                    'input': feedback['input'],
                    'true_label': pred['label'],
                    'confidence': pred['confidence'],
                    'value': 'medium'
                }
                valuable_samples.append(sample)

        print(f"수집된 가치 있는 샘플: {len(valuable_samples)}")
        return valuable_samples

    def prioritize_labeling_queue(self, unlabeled_pool, budget):
        """
        레이블링 우선순위 결정

        우선순위 기준:
        1. 모델 불확실성 (높을수록 우선)
        2. 클래스 희귀도 (희귀 클래스 우선)
        3. 데이터 다양성 (기존과 다른 샘플 우선)
        """
        priorities = []

        for sample in unlabeled_pool:
            score = 0

            # 불확실성 점수
            uncertainty = 1 - max(sample['predicted_probs'])
            score += uncertainty * 0.5

            # 클래스 희귀도 점수
            predicted_class = max(sample['predicted_probs'],
                                  key=sample['predicted_probs'].get)
            rarity = 1 / (sample['class_counts'].get(predicted_class, 1) + 1)
            score += rarity * 0.3

            # 다양성 점수 (간단한 근사)
            diversity = np.std(sample['predicted_probs'].values())
            score += diversity * 0.2

            priorities.append((sample, score))

        # 우선순위 정렬
        priorities.sort(key=lambda x: x[1], reverse=True)

        # 예산 내에서 선택
        selected = [s for s, _ in priorities[:budget]]

        return selected

    def continuous_model_improvement(self, new_data, evaluation_set):
        """
        지속적 모델 개선 루프
        """
        metrics_history = []

        while True:  # 실제로는 종료 조건 추가
            # 1. 새 데이터 수집
            new_samples = self.collect_production_feedback(
                # 실제 구현에서는 실시간 데이터 스트림
                predictions=[],
                user_feedback=[]
            )

            if len(new_samples) < 100:  # 최소 샘플 수
                continue

            # 2. 데이터 품질 검사
            checker = DataQualityChecker()
            # quality_scores = checker.compute_data_quality_score(...)

            # 3. 모델 재학습
            # self.model.finetune(new_samples)

            # 4. 평가 및 A/B 테스트
            # metrics = evaluate_model(self.model, evaluation_set)
            # metrics_history.append(metrics)

            self.version += 1
            print(f"모델 버전 {self.version} 학습 완료")

            break  # 데모용

        return metrics_history

8. 데이터 파이프라인 모범 사례

8.1 재현 가능한 데이터 처리

import hashlib
import json
import os
from pathlib import Path
from datetime import datetime

class ReproducibleDataPipeline:
    """
    재현 가능한 데이터 파이프라인
    - 모든 처리 단계 추적
    - 데이터 해시로 무결성 검증
    - 버전 관리 지원
    """

    def __init__(self, pipeline_name, base_dir='data/processed'):
        self.pipeline_name = pipeline_name
        self.base_dir = Path(base_dir)
        self.steps = []
        self.metadata = {
            'pipeline': pipeline_name,
            'created_at': datetime.now().isoformat(),
            'steps': []
        }

    def add_step(self, step_name, func, *args, **kwargs):
        """처리 단계 추가"""
        self.steps.append({
            'name': step_name,
            'func': func,
            'args': args,
            'kwargs': kwargs
        })

    def compute_hash(self, data):
        """데이터 해시 계산"""
        if isinstance(data, np.ndarray):
            return hashlib.md5(data.tobytes()).hexdigest()
        elif isinstance(data, (list, dict)):
            return hashlib.md5(
                json.dumps(data, sort_keys=True, default=str).encode()
            ).hexdigest()
        else:
            return hashlib.md5(str(data).encode()).hexdigest()

    def run(self, input_data):
        """파이프라인 실행"""
        data = input_data

        for step in self.steps:
            print(f"실행 중: {step['name']}")

            # 처리 전 해시
            hash_before = self.compute_hash(data)

            # 처리 실행
            data = step['func'](data, *step['args'], **step['kwargs'])

            # 처리 후 해시
            hash_after = self.compute_hash(data)

            # 메타데이터 기록
            self.metadata['steps'].append({
                'name': step['name'],
                'hash_before': hash_before,
                'hash_after': hash_after,
                'timestamp': datetime.now().isoformat()
            })

            print(f"  완료: {hash_before[:8]} -> {hash_after[:8]}")

        # 메타데이터 저장
        metadata_path = self.base_dir / f"{self.pipeline_name}_metadata.json"
        metadata_path.parent.mkdir(parents=True, exist_ok=True)
        with open(metadata_path, 'w') as f:
            json.dump(self.metadata, f, indent=2)

        print(f"\n파이프라인 완료. 메타데이터: {metadata_path}")
        return data


class DataVersionControl:
    """
    DVC 스타일 데이터 버전 관리
    (실제 사용에서는 dvc.org 사용 권장)
    """

    def __init__(self, storage_path='data/.dvc'):
        self.storage_path = Path(storage_path)
        self.storage_path.mkdir(parents=True, exist_ok=True)

    def add(self, data_path):
        """데이터 파일 추적 시작"""
        data_path = Path(data_path)

        # 해시 계산
        with open(data_path, 'rb') as f:
            file_hash = hashlib.md5(f.read()).hexdigest()

        # .dvc 메타파일 생성
        dvc_file = data_path.with_suffix('.dvc')
        dvc_metadata = {
            'md5': file_hash,
            'size': os.path.getsize(data_path),
            'path': str(data_path.name),
            'version': datetime.now().isoformat()
        }

        with open(dvc_file, 'w') as f:
            json.dump(dvc_metadata, f, indent=2)

        print(f"추적 시작: {data_path}")
        print(f"  MD5: {file_hash}")
        print(f"  메타파일: {dvc_file}")

        return file_hash

    def create_data_contract(self, schema):
        """
        데이터 계약(Data Contract) 정의
        - 스키마 정의
        - 품질 기준
        - SLA 요구사항
        """
        contract = {
            'version': '1.0',
            'schema': schema,
            'quality_rules': {
                'completeness': {'min_threshold': 0.99},
                'accuracy': {'label_error_rate': {'max': 0.05}},
                'consistency': {'duplicate_rate': {'max': 0.01}},
            },
            'sla': {
                'update_frequency': 'daily',
                'max_staleness_hours': 24,
            }
        }

        return contract


def demonstrate_full_data_pipeline():
    """
    데이터 중심 AI 전체 파이프라인 데모
    """
    print("=" * 60)
    print("데이터 중심 AI 파이프라인 데모")
    print("=" * 60)

    # 1. 데이터 품질 검사
    print("\n1단계: 데이터 품질 검사")
    print("  - 레이블 오류율 측정")
    print("  - 이상치 탐지")
    print("  - 중복 샘플 제거")
    print("  - 클래스 불균형 분석")

    # 2. 레이블 정제
    print("\n2단계: 레이블 정제")
    print("  - Cleanlab으로 오류 레이블 탐지")
    print("  - 다수결/전문가 검토로 수정")
    print("  - 레이블러 간 일치도 개선")

    # 3. 데이터 증강
    print("\n3단계: 데이터 증강")
    print("  - 이미지: Albumentations")
    print("  - 텍스트: EDA, 역번역")
    print("  - 자동 증강 정책 탐색")

    # 4. 합성 데이터
    print("\n4단계: 합성 데이터 생성")
    print("  - LLM으로 텍스트 합성")
    print("  - Diffusion Model로 이미지 합성")
    print("  - 품질 필터링 (FID, 분류기 신뢰도)")

    # 5. 능동 학습
    print("\n5단계: 능동 학습")
    print("  - 불확실성 샘플링으로 레이블링 우선순위 결정")
    print("  - 코어셋 방법으로 다양성 보장")

    # 6. 파이프라인 버전 관리
    print("\n6단계: 버전 관리 및 모니터링")
    print("  - DVC로 데이터 버전 관리")
    print("  - 데이터 계약으로 품질 기준 유지")
    print("  - 데이터 플라이휠로 지속 개선")

    print("\n결론: 데이터 품질 개선이 모델 개선보다 효과적일 때가 많습니다!")

9. 종합 요약과 실천 가이드

데이터 중심 AI 체크리스트

1. 데이터 수집 단계

  • 도메인 전문가의 레이블링 가이드라인 작성
  • 레이블러 간 일치도 측정 (Cohen's Kappa > 0.8 목표)
  • 클래스 분포 모니터링

2. 데이터 정제 단계

  • Cleanlab으로 레이블 오류 탐지 및 수정
  • 중복 샘플 제거
  • 이상치 검토 (제거 또는 수정)

3. 데이터 증강 단계

  • 훈련 데이터에만 증강 적용 (검증/테스트 제외)
  • 증강 후 데이터 분포 검증
  • 도메인에 적합한 증강 기법 선택

4. 지속적 개선 단계

  • 프로덕션 오류 케이스 수집
  • 능동 학습으로 레이블링 효율화
  • 정기적인 데이터 품질 감사

권장 도구:

데이터 중심 AI는 단순히 도구나 기법의 문제가 아닙니다. 이것은 "더 좋은 모델"이 아닌 "더 좋은 데이터"에 집중하는 사고방식의 전환입니다. 많은 실제 프로젝트에서 이 전환만으로도 극적인 성능 개선을 경험할 수 있습니다.

Data-Centric AI Complete Guide: Maximizing AI Performance with High-Quality Data

Data-Centric AI Complete Guide: Maximizing AI Performance with High-Quality Data

In 2021, Andrew Ng posed a provocative question to the AI community: "Instead of focusing on improving model architectures, what if we focused on improving data quality?" This became the starting point of the Data-Centric AI movement.

While the traditional model-centric approach chases "better algorithms," the data-centric approach chases "better data." This guide covers every aspect of Data-Centric AI with hands-on code, from theory to production practice.

1. Data-Centric AI vs Model-Centric AI

1.1 The Paradigm Shift

Model-Centric AI

  • Data is fixed; improve the code
  • Search for better architectures
  • Focus on hyperparameter tuning
  • Classic benchmarks: dataset is fixed, only models change

Data-Centric AI

  • Model is fixed; improve the data
  • Fix label errors
  • Improve labeling guidelines for consistency
  • Add data augmentation and synthetic data

1.2 Andrew Ng's Core Argument

Andrew Ng states:

"AI system = Code (model/algorithm) + Data"

In many practical AI projects, the code is already good enough. The bottleneck is data quality.

Experiment Results (Andrew Ng, DeepLearning.AI):

On a manufacturing inspection dataset with label noise:

  • Baseline: 76.2%
  • Better model only: +0.02% improvement (76.22%)
  • Better data only: +16.9% improvement (93.1%)

This result demonstrates that in many real-world scenarios, improving data quality is far more effective than improving model architecture.

1.3 When Is Data-Centric Most Effective?

Data-centric approaches are especially impactful when:

  1. Small datasets: Quality matters more when you have fewer than a few thousand examples
  2. High label noise: When the label error rate exceeds 5%
  3. Domain-specific tasks: Specialized domains without strong pre-trained models
  4. Imbalanced classes: Rare class quality determines overall performance
  5. Strict accuracy requirements: Medical, financial, or safety-critical applications
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

def compare_model_vs_data_centric(X, y, model_class, noise_level=0.1):
    """
    Compare model-centric vs data-centric approaches

    Args:
        X: feature matrix
        y: labels
        model_class: base model class
        noise_level: fraction of noisy labels
    """
    # Add label noise
    noisy_y = y.copy()
    noise_idx = np.random.choice(len(y), int(len(y) * noise_level), replace=False)
    n_classes = len(np.unique(y))
    for idx in noise_idx:
        wrong_labels = [l for l in range(n_classes) if l != y[idx]]
        noisy_y[idx] = np.random.choice(wrong_labels)

    X_train, X_test, y_train_clean, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    _, _, y_train_noisy, _ = train_test_split(
        X, noisy_y, test_size=0.2, random_state=42
    )

    # --- Model-centric approach ---
    # Noisy data + base model
    base_model = model_class()
    base_model.fit(X_train, y_train_noisy)
    base_acc = accuracy_score(y_test, base_model.predict(X_test))

    # Noisy data + more complex model
    from sklearn.ensemble import GradientBoostingClassifier
    complex_model = GradientBoostingClassifier(n_estimators=200)
    complex_model.fit(X_train, y_train_noisy)
    complex_acc = accuracy_score(y_test, complex_model.predict(X_test))

    # --- Data-centric approach ---
    # Clean data + base model
    clean_model = model_class()
    clean_model.fit(X_train, y_train_clean)
    clean_acc = accuracy_score(y_test, clean_model.predict(X_test))

    print("=" * 50)
    print("Model-Centric vs Data-Centric Comparison")
    print("=" * 50)
    print(f"Base model + noisy data: {base_acc:.3f}")
    print(f"Complex model + noisy data: {complex_acc:.3f}")
    print(f"Base model + clean data: {clean_acc:.3f}")
    print(f"\nModel improvement effect: +{(complex_acc - base_acc):.3f}")
    print(f"Data improvement effect: +{(clean_acc - base_acc):.3f}")

    return {
        'base_model_noisy_data': base_acc,
        'complex_model_noisy_data': complex_acc,
        'base_model_clean_data': clean_acc
    }

2. Data Quality Measurement

2.1 Confident Learning and Label Error Detection

Confident Learning, proposed by Northcutt et al., uses cross-validated prediction probabilities to systematically detect label errors.

Core idea: "If the model predicts class A with high confidence but the label says class B, the label is likely wrong."

import cleanlab
from cleanlab.filter import find_label_issues
from cleanlab.classification import CleanLearning
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_predict

def detect_label_errors_cleanlab(X, y, model=None):
    """
    Detect label errors using Cleanlab

    Args:
        X: feature matrix
        y: label array
        model: classifier (default: LogisticRegression)

    Returns:
        label_issues: indices and info about label issues
    """
    if model is None:
        model = LogisticRegression(max_iter=1000)

    # Predict class probabilities via cross-validation
    pred_probs = cross_val_predict(
        model, X, y,
        cv=5,
        method='predict_proba'
    )

    # Find label issues
    label_issues = find_label_issues(
        labels=y,
        pred_probs=pred_probs,
        return_indices_ranked_by='self_confidence'
    )

    print(f"Total samples: {len(y)}")
    print(f"Label issues found: {len(label_issues)}")
    print(f"Error rate: {len(label_issues)/len(y):.2%}")

    return label_issues


def cleanlab_full_pipeline(X_train, y_train_noisy, X_test, y_test):
    """
    Complete Cleanlab pipeline:
    1. Detect label errors
    2. Remove or correct errors
    3. Retrain on cleaned data
    """
    from cleanlab.classification import CleanLearning

    base_model = LogisticRegression(max_iter=1000)

    # CleanLearning automatically handles label errors during training
    cl = CleanLearning(base_model, seed=42)
    cl.fit(X_train, y_train_noisy)

    y_pred = cl.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    print(f"CleanLearning accuracy: {accuracy:.3f}")

    label_issues_df = cl.get_label_issues()
    print(f"\nLabel issues info:")
    print(label_issues_df.head(10))

    return cl, label_issues_df


def confident_learning_manual(pred_probs, labels):
    """
    Manual implementation of Confident Learning
    - Compute per-class thresholds
    - Build the Confident Joint matrix
    """
    n_classes = pred_probs.shape[1]
    n_samples = len(labels)

    # Per-class threshold: mean predicted probability for that class's samples
    thresholds = np.zeros(n_classes)
    for c in range(n_classes):
        class_mask = labels == c
        if class_mask.sum() > 0:
            thresholds[c] = pred_probs[class_mask, c].mean()

    # Confident Joint matrix C[s][y]
    # s: estimated true class, y: given label
    C = np.zeros((n_classes, n_classes), dtype=int)

    for i in range(n_samples):
        y_given = labels[i]
        over_threshold = pred_probs[i] >= thresholds

        if over_threshold.sum() == 0:
            y_hat = pred_probs[i].argmax()
        else:
            y_hat = (pred_probs[i] * over_threshold).argmax()

        C[y_hat, y_given] += 1

    off_diagonal = C.copy()
    np.fill_diagonal(off_diagonal, 0)

    print("Confident Joint Matrix (rows: estimated true class, cols: given label):")
    print(C)
    print(f"\nEstimated mislabeled samples: {off_diagonal.sum()}")

    return C

2.2 Data Outlier Detection

from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
import numpy as np

class DataQualityChecker:
    """Comprehensive data quality inspection toolkit"""

    def __init__(self):
        self.quality_report = {}

    def check_class_distribution(self, labels):
        """Check class imbalance"""
        from collections import Counter
        import pandas as pd

        counts = Counter(labels)
        total = len(labels)

        df = pd.DataFrame([
            {'class': c, 'count': n, 'percentage': 100 * n / total}
            for c, n in sorted(counts.items())
        ])

        imbalance_ratio = max(counts.values()) / min(counts.values())

        print("Class distribution:")
        print(df.to_string(index=False))
        print(f"\nImbalance ratio: {imbalance_ratio:.2f}x")

        if imbalance_ratio > 10:
            print("WARNING: Severe class imbalance!")
        elif imbalance_ratio > 3:
            print("NOTICE: Class imbalance detected")

        self.quality_report['class_imbalance_ratio'] = imbalance_ratio
        return df

    def detect_outliers(self, X, method='isolation_forest', contamination=0.1):
        """
        Outlier detection

        Args:
            method: 'isolation_forest' or 'lof'
            contamination: expected fraction of outliers
        """
        if method == 'isolation_forest':
            detector = IsolationForest(
                contamination=contamination,
                random_state=42
            )
        elif method == 'lof':
            detector = LocalOutlierFactor(
                contamination=contamination,
                novelty=True
            )

        predictions = detector.fit_predict(X)
        outlier_mask = predictions == -1
        outlier_indices = np.where(outlier_mask)[0]

        print(f"Outliers detected: {outlier_mask.sum()} / {len(X)} ({outlier_mask.mean():.2%})")

        self.quality_report['n_outliers'] = outlier_mask.sum()
        return outlier_indices, outlier_mask

    def check_duplicates(self, X, y=None, threshold=0.99):
        """
        Detect duplicate samples

        Args:
            threshold: similarity threshold (1.0 = exact match)
        """
        from sklearn.metrics.pairwise import cosine_similarity

        if len(X) > 10000:
            sample_idx = np.random.choice(len(X), 10000, replace=False)
            X_sample = X[sample_idx]
        else:
            X_sample = X
            sample_idx = np.arange(len(X))

        sim_matrix = cosine_similarity(X_sample)
        np.fill_diagonal(sim_matrix, 0)

        duplicate_pairs = np.argwhere(sim_matrix >= threshold)
        duplicate_pairs = duplicate_pairs[duplicate_pairs[:, 0] < duplicate_pairs[:, 1]]

        print(f"Duplicate pairs found: {len(duplicate_pairs)}")

        if y is not None and len(duplicate_pairs) > 0:
            label_conflicts = 0
            for i, j in duplicate_pairs:
                if y[sample_idx[i]] != y[sample_idx[j]]:
                    label_conflicts += 1
            print(f"Label-conflicting duplicate pairs: {label_conflicts}")

        return duplicate_pairs

    def compute_data_quality_score(self, X, y):
        """Compute overall data quality score"""
        scores = {}

        if hasattr(X, 'isnull'):
            missing_rate = X.isnull().mean().mean()
        else:
            missing_rate = np.isnan(X).mean()
        scores['completeness'] = 1 - missing_rate

        from collections import Counter
        counts = Counter(y)
        n_classes = len(counts)
        ideal_count = len(y) / n_classes
        balance_score = sum(
            min(c, ideal_count) / ideal_count
            for c in counts.values()
        ) / n_classes
        scores['balance'] = balance_score

        overall_score = np.mean(list(scores.values()))
        scores['overall'] = overall_score

        print("Data quality scores:")
        for metric, score in scores.items():
            print(f"  {metric}: {score:.3f}")

        return scores

3. Labeling Strategies

3.1 High-Quality Labeling Guidelines

def compute_inter_rater_agreement(annotations):
    """
    Compute inter-rater agreement

    Args:
        annotations: array of shape (n_samples, n_raters)

    Returns:
        cohen_kappa: Cohen's Kappa score
        majority_labels: majority-vote labels
    """
    from sklearn.metrics import cohen_kappa_score
    import numpy as np

    n_samples, n_raters = annotations.shape

    kappa_scores = []
    for i in range(n_raters):
        for j in range(i+1, n_raters):
            kappa = cohen_kappa_score(
                annotations[:, i],
                annotations[:, j]
            )
            kappa_scores.append((i, j, kappa))
            print(f"Rater {i} vs Rater {j}: kappa = {kappa:.3f}")

    mean_kappa = np.mean([k for _, _, k in kappa_scores])
    print(f"\nMean Cohen's Kappa: {mean_kappa:.3f}")

    if mean_kappa < 0.2:
        interpretation = "Slight agreement"
    elif mean_kappa < 0.4:
        interpretation = "Fair agreement"
    elif mean_kappa < 0.6:
        interpretation = "Moderate agreement"
    elif mean_kappa < 0.8:
        interpretation = "Substantial agreement"
    else:
        interpretation = "Almost perfect agreement"

    print(f"Interpretation: {interpretation}")

    from scipy import stats
    majority_labels = stats.mode(annotations, axis=1)[0].flatten()
    print(f"\nMajority-vote labels generated")

    return mean_kappa, majority_labels

3.2 Weak Supervision with Snorkel

def snorkel_programmatic_labeling_demo():
    """
    Programmatic labeling demo with Snorkel
    pip install snorkel
    """
    from snorkel.labeling import labeling_function, PandasLFApplier
    from snorkel.labeling.model import LabelModel
    import re

    POSITIVE = 1
    NEGATIVE = 0
    ABSTAIN = -1

    @labeling_function()
    def lf_positive_keywords(x):
        """Label based on positive keywords"""
        positive_words = ['good', 'great', 'excellent', 'amazing', 'love', 'best']
        if any(word in x.text.lower() for word in positive_words):
            return POSITIVE
        return ABSTAIN

    @labeling_function()
    def lf_negative_keywords(x):
        """Label based on negative keywords"""
        negative_words = ['bad', 'terrible', 'awful', 'hate', 'worst', 'horrible']
        if any(word in x.text.lower() for word in negative_words):
            return NEGATIVE
        return ABSTAIN

    @labeling_function()
    def lf_rating_high(x):
        """Label based on high rating"""
        if hasattr(x, 'rating') and x.rating >= 4:
            return POSITIVE
        return ABSTAIN

    @labeling_function()
    def lf_rating_low(x):
        """Label based on low rating"""
        if hasattr(x, 'rating') and x.rating <= 2:
            return NEGATIVE
        return ABSTAIN

    @labeling_function()
    def lf_negation_check(x):
        """Detect negation"""
        text = x.text.lower()
        if re.search(r"not (good|great|excellent)", text):
            return NEGATIVE
        if re.search(r"not (bad|terrible)", text):
            return POSITIVE
        return ABSTAIN

    lfs = [
        lf_positive_keywords,
        lf_negative_keywords,
        lf_rating_high,
        lf_rating_low,
        lf_negation_check,
    ]

    print("Snorkel Programmatic Labeling Pipeline:")
    print("1. Domain experts write labeling functions (LFs)")
    print("2. Apply LFs to unlabeled data")
    print("3. Combine multiple LFs with Label Model (noise-aware)")
    print("4. Train downstream model on soft labels")
    print(f"\nDefined labeling functions: {len(lfs)}")

    return lfs

4. Active Learning

Active learning minimizes labeling cost by selecting the most informative samples from a large unlabeled pool.

import numpy as np
import torch
import torch.nn as nn

class ActiveLearner:
    """
    Active learning with multiple sampling strategies
    """

    def __init__(self, model, strategy='uncertainty', n_initial=100):
        self.model = model
        self.strategy = strategy
        self.n_initial = n_initial

    def uncertainty_sampling(self, X_unlabeled, n_samples):
        """
        Uncertainty sampling: select the samples the model is least confident about
        """
        probs = self._get_probs(X_unlabeled)

        if self.strategy == 'least_confidence':
            uncertainty = 1 - probs.max(axis=1)

        elif self.strategy == 'margin':
            sorted_probs = np.sort(probs, axis=1)[:, ::-1]
            uncertainty = 1 - (sorted_probs[:, 0] - sorted_probs[:, 1])

        elif self.strategy == 'entropy':
            uncertainty = -np.sum(probs * np.log(probs + 1e-10), axis=1)

        else:
            uncertainty = 1 - probs.max(axis=1)

        selected_indices = np.argsort(uncertainty)[-n_samples:]
        return selected_indices, uncertainty

    def diversity_sampling(self, X_unlabeled, X_labeled, n_samples):
        """
        Diversity-based sampling (CoreSet)
        Select samples most dissimilar to already-labeled data
        """
        from sklearn.metrics.pairwise import euclidean_distances

        selected = []
        remaining = list(range(len(X_unlabeled)))
        current_labeled = X_labeled.copy()

        for _ in range(n_samples):
            dists = euclidean_distances(
                X_unlabeled[remaining],
                current_labeled
            ).min(axis=1)

            best_idx = remaining[np.argmax(dists)]
            selected.append(best_idx)
            remaining.remove(best_idx)
            current_labeled = np.vstack([current_labeled, X_unlabeled[best_idx]])

        return np.array(selected)

    def batch_mode_active_learning(self, X_pool, y_oracle, X_test, y_test,
                                    n_iterations=10, n_per_iter=50):
        """
        Batch-mode active learning loop

        Args:
            X_pool: unlabeled data pool
            y_oracle: true labels (oracle)
            n_per_iter: number of samples to label per iteration
        """
        initial_indices = np.random.choice(
            len(X_pool), self.n_initial, replace=False
        )
        labeled_indices = list(initial_indices)
        unlabeled_indices = [
            i for i in range(len(X_pool)) if i not in labeled_indices
        ]

        accuracies = []
        n_labeled_list = []

        for iteration in range(n_iterations):
            X_labeled = X_pool[labeled_indices]
            y_labeled = y_oracle[labeled_indices]

            self.model.fit(X_labeled, y_labeled)

            acc = accuracy_score(y_test, self.model.predict(X_test))
            accuracies.append(acc)
            n_labeled_list.append(len(labeled_indices))

            print(f"Iteration {iteration+1}: n_labeled={len(labeled_indices)}, accuracy={acc:.3f}")

            if len(unlabeled_indices) == 0:
                break

            X_unlabeled = X_pool[unlabeled_indices]
            selected, _ = self.uncertainty_sampling(X_unlabeled, n_per_iter)

            actual_selected = [unlabeled_indices[i] for i in selected]
            labeled_indices.extend(actual_selected)
            unlabeled_indices = [
                i for i in unlabeled_indices if i not in actual_selected
            ]

        return accuracies, n_labeled_list

    def _get_probs(self, X):
        if hasattr(self.model, 'predict_proba'):
            return self.model.predict_proba(X)
        else:
            logits = self.model.predict(X)
            from scipy.special import softmax
            return softmax(logits, axis=1)

5. Data Augmentation Deep Dive

5.1 Image Augmentation with Albumentations

import albumentations as A
from albumentations.pytorch import ToTensorV2
import numpy as np

def get_train_transforms(image_size=224):
    """
    Strong augmentation pipeline for training (Albumentations)
    """
    return A.Compose([
        # Geometric transforms
        A.RandomResizedCrop(
            height=image_size,
            width=image_size,
            scale=(0.7, 1.0),
            ratio=(0.75, 1.33)
        ),
        A.HorizontalFlip(p=0.5),
        A.ShiftScaleRotate(
            shift_limit=0.1,
            scale_limit=0.2,
            rotate_limit=30,
            p=0.5
        ),

        # Color transforms
        A.ColorJitter(
            brightness=0.3,
            contrast=0.3,
            saturation=0.3,
            hue=0.1,
            p=0.8
        ),
        A.ToGray(p=0.1),
        A.RandomGamma(gamma_limit=(80, 120), p=0.3),

        # Noise and blur
        A.GaussNoise(var_limit=(10, 50), p=0.3),
        A.OneOf([
            A.MotionBlur(blur_limit=7),
            A.GaussianBlur(blur_limit=7),
            A.MedianBlur(blur_limit=7),
        ], p=0.3),

        # Cutout / random erasing
        A.CoarseDropout(
            max_holes=8,
            max_height=32,
            max_width=32,
            fill_value=0,
            p=0.3
        ),

        # Grid distortion
        A.OneOf([
            A.GridDistortion(p=1),
            A.ElasticTransform(p=1),
            A.OpticalDistortion(p=1),
        ], p=0.2),

        # Normalize and convert to tensor
        A.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225]
        ),
        ToTensorV2(),
    ])


def mixup_augmentation(images, labels, alpha=0.4):
    """
    MixUp: blend two images and their labels
    Zhang et al., "mixup: Beyond Empirical Risk Minimization" (2018)
    """
    import torch

    batch_size = images.shape[0]
    lam = np.random.beta(alpha, alpha)
    perm = torch.randperm(batch_size)

    mixed_images = lam * images + (1 - lam) * images[perm]
    labels_a = labels
    labels_b = labels[perm]

    return mixed_images, labels_a, labels_b, lam


def cutmix_augmentation(images, labels, alpha=1.0):
    """
    CutMix: paste a patch from one image onto another
    Yun et al., "CutMix: Training Strategy that Makes Use of
    Sample Mixing" (2019)
    """
    import torch

    batch_size, c, h, w = images.shape
    lam = np.random.beta(alpha, alpha)
    perm = torch.randperm(batch_size)

    cut_ratio = np.sqrt(1 - lam)
    cut_h = int(h * cut_ratio)
    cut_w = int(w * cut_ratio)

    cx = np.random.randint(w)
    cy = np.random.randint(h)

    bbx1 = np.clip(cx - cut_w // 2, 0, w)
    bby1 = np.clip(cy - cut_h // 2, 0, h)
    bbx2 = np.clip(cx + cut_w // 2, 0, w)
    bby2 = np.clip(cy + cut_h // 2, 0, h)

    mixed_images = images.clone()
    mixed_images[:, :, bby1:bby2, bbx1:bbx2] = images[perm, :, bby1:bby2, bbx1:bbx2]

    lam = 1 - ((bbx2 - bbx1) * (bby2 - bby1) / (w * h))
    labels_a = labels
    labels_b = labels[perm]

    return mixed_images, labels_a, labels_b, lam

5.2 Text Augmentation

class TextAugmenter:
    """
    Text data augmentation techniques
    """

    def __init__(self):
        pass

    def eda_synonym_replacement(self, text, n=1):
        """
        EDA: Synonym Replacement
        Wei and Zou, "EDA: Easy Data Augmentation Techniques
        for Boosting Performance on Text Classification Tasks" (2019)
        """
        import nltk
        from nltk.corpus import wordnet

        words = text.split()
        new_words = words.copy()

        stop_words = set(['a', 'an', 'the', 'is', 'are', 'was', 'were',
                          'i', 'me', 'my', 'we', 'our', 'you', 'your'])

        replaceable = [
            (i, word) for i, word in enumerate(words)
            if word.lower() not in stop_words
        ]

        np.random.shuffle(replaceable)
        replaced = 0

        for idx, word in replaceable:
            if replaced >= n:
                break

            synsets = wordnet.synsets(word)
            if synsets:
                synonyms = [
                    lemma.name() for synset in synsets
                    for lemma in synset.lemmas()
                    if lemma.name() != word
                ]
                if synonyms:
                    new_words[idx] = np.random.choice(synonyms).replace('_', ' ')
                    replaced += 1

        return ' '.join(new_words)

    def eda_random_swap(self, text, n=1):
        """EDA: Random Swap"""
        words = text.split()
        if len(words) < 2:
            return text

        new_words = words.copy()
        for _ in range(n):
            i, j = np.random.choice(len(new_words), 2, replace=False)
            new_words[i], new_words[j] = new_words[j], new_words[i]

        return ' '.join(new_words)

    def eda_random_deletion(self, text, p=0.1):
        """EDA: Random Deletion"""
        words = text.split()
        if len(words) == 1:
            return text

        new_words = [word for word in words if np.random.random() > p]
        return ' '.join(new_words) if new_words else np.random.choice(words)

    def back_translation(self, text, src_lang='en', pivot_lang='fr'):
        """
        Back-Translation: en -> fr -> en
        Preserves meaning while diversifying expression
        """
        try:
            from transformers import pipeline

            translator_fwd = pipeline(
                f"translation_{src_lang}_to_{pivot_lang}",
                model=f"Helsinki-NLP/opus-mt-{src_lang}-{pivot_lang}"
            )

            translator_bwd = pipeline(
                f"translation_{pivot_lang}_to_{src_lang}",
                model=f"Helsinki-NLP/opus-mt-{pivot_lang}-{src_lang}"
            )

            pivot_text = translator_fwd(text)[0]['translation_text']
            back_translated = translator_bwd(pivot_text)[0]['translation_text']

            return back_translated

        except Exception as e:
            print(f"Translation error: {e}")
            return text

    def augment_dataset(self, texts, labels, n_aug=4):
        """Augment entire dataset"""
        augmented_texts = []
        augmented_labels = []

        for text, label in zip(texts, labels):
            augmented_texts.append(text)
            augmented_labels.append(label)

            for _ in range(n_aug):
                aug_type = np.random.choice(
                    ['synonym', 'swap', 'deletion']
                )

                if aug_type == 'synonym':
                    aug_text = self.eda_synonym_replacement(text)
                elif aug_type == 'swap':
                    aug_text = self.eda_random_swap(text)
                else:
                    aug_text = self.eda_random_deletion(text)

                augmented_texts.append(aug_text)
                augmented_labels.append(label)

        print(f"Original samples: {len(texts)}")
        print(f"Augmented samples: {len(augmented_texts)}")

        return augmented_texts, augmented_labels

5.3 Automatic Augmentation (RandAugment, SpecAugment)

import torch
import torchvision.transforms as transforms

def get_randaugment_transforms(n=2, m=9, image_size=224):
    """
    RandAugment: randomized augmentation policy
    Cubuk et al., "RandAugment: Practical Automated Data Augmentation" (2019)

    Args:
        n: number of augmentation operations to apply
        m: magnitude of augmentation (0-30)
    """
    transform = transforms.Compose([
        transforms.RandomResizedCrop(image_size),
        transforms.RandAugment(num_ops=n, magnitude=m),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225]
        )
    ])

    return transform


def specaugment_for_audio(spectrogram, freq_mask_param=27, time_mask_param=70):
    """
    SpecAugment: audio spectrogram augmentation
    Park et al., "SpecAugment: A Simple Data Augmentation Method
    for Automatic Speech Recognition" (2019)

    Args:
        spectrogram: input spectrogram (freq, time)
        freq_mask_param: max frequency mask size
        time_mask_param: max time mask size
    """
    import torchaudio.transforms as T

    freq_mask = T.FrequencyMasking(freq_mask_param=freq_mask_param)
    time_mask = T.TimeMasking(time_mask_param=time_mask_param)

    augmented = freq_mask(spectrogram)
    augmented = time_mask(augmented)

    return augmented

6. Synthetic Data Generation

6.1 Synthetic Text with LLMs

class SyntheticTextGenerator:
    """Generate synthetic training data using LLMs"""

    def __init__(self, llm_client, model_name='gpt-4'):
        self.llm = llm_client
        self.model_name = model_name

    def generate_classification_data(self, class_name, n_samples=100,
                                     domain='general', style='diverse'):
        """
        Generate synthetic data for a classification class

        Args:
            class_name: name of the target class
            n_samples: number of samples to generate
            domain: domain context (medical, legal, etc.)
            style: writing style (formal, casual, diverse)
        """
        prompt = f"""Generate {n_samples} diverse text examples for the class '{class_name}'.
Domain: {domain}
Style: {style}

Requirements:
- Each example should be 1-3 sentences
- Vary the vocabulary, sentence structure, and perspective
- Include both simple and complex cases
- Format as a JSON list: ["example1", "example2", ...]

Generate realistic examples that would appear in real-world {domain} data."""

        print(f"Synthetic data generation prompt for '{class_name}':")
        print(prompt[:300] + "...")
        print(f"\nPlanning to generate {n_samples} samples")

    def generate_edge_cases(self, class_examples, n_edge_cases=20):
        """Generate challenging edge cases"""
        prompt = f"""Based on these training examples:
{chr(10).join(class_examples[:5])}

Generate {n_edge_cases} challenging edge cases that:
1. Are ambiguous between different categories
2. Contain misleading keywords
3. Have unusual sentence structures
4. Test the model's true understanding

Format as JSON list."""

        print("Edge case generation prompt ready")

    def augment_with_paraphrase(self, texts, n_paraphrases=3):
        """Generate paraphrases using LLM"""
        augmented = []

        for text in texts:
            prompt = f"""Paraphrase the following text {n_paraphrases} times.
Keep the same meaning but use different words and sentence structures.

Original: "{text}"

Format as JSON list of {n_paraphrases} paraphrases."""

            augmented.append({
                'original': text,
                'paraphrases': []
            })

        return augmented


class SyntheticImageGenerator:
    """Generate synthetic images with Diffusion Models"""

    def __init__(self, model_name='stabilityai/stable-diffusion-2-1'):
        self.model_name = model_name

    def setup_pipeline(self):
        """
        Initialize Stable Diffusion pipeline
        pip install diffusers accelerate
        """
        try:
            from diffusers import StableDiffusionPipeline
            import torch

            self.pipe = StableDiffusionPipeline.from_pretrained(
                self.model_name,
                torch_dtype=torch.float16
            )

            if torch.cuda.is_available():
                self.pipe = self.pipe.to('cuda')

            print(f"Pipeline initialized: {self.model_name}")
        except Exception as e:
            print(f"Pipeline initialization error: {e}")

    def generate_class_images(self, class_name, n_images=50,
                               style_prompt="high quality, photorealistic"):
        """
        Generate synthetic images for a class

        Args:
            class_name: target class name
            n_images: number of images to generate
            style_prompt: style guidance
        """
        prompts = [
            f"A photo of {class_name}, {style_prompt}",
            f"{class_name} in natural environment, {style_prompt}",
            f"Close-up of {class_name}, detailed, {style_prompt}",
            f"{class_name} from different angle, {style_prompt}",
        ]

        print(f"Synthetic image generation plan for '{class_name}':")
        print(f"Images to generate: {n_images}")
        print("Sample prompts:")
        for p in prompts[:2]:
            print(f"  - {p}")

    def evaluate_synthetic_quality(self, real_images, synthetic_images):
        """Evaluate synthetic image quality using FID score"""
        try:
            from torchmetrics.image.fid import FrechetInceptionDistance

            fid = FrechetInceptionDistance(feature=64)
            fid.update(real_images, real=True)
            fid.update(synthetic_images, real=False)

            fid_score = fid.compute()
            print(f"FID Score: {fid_score:.2f}")
            print("(Lower is better; 0 is perfect)")

            return fid_score
        except Exception as e:
            print(f"FID computation error: {e}")

7. The Data Flywheel

7.1 Data Flywheel Concept

The data flywheel is a virtuous cycle of product, data, and model:

  1. Better model → Better product
  2. Better product → More users
  3. More users → More data
  4. More data → Better model
class DataFlywheelPipeline:
    """Data Flywheel implementation pipeline"""

    def __init__(self, model, feedback_store):
        self.model = model
        self.feedback_store = feedback_store
        self.version = 0

    def collect_production_feedback(self, predictions, user_feedback):
        """
        Collect user feedback from production

        Args:
            predictions: model predictions
            user_feedback: user corrections/confirmations
        """
        valuable_samples = []

        for pred, feedback in zip(predictions, user_feedback):
            if feedback['corrected']:
                sample = {
                    'input': feedback['input'],
                    'model_prediction': pred,
                    'true_label': feedback['correction'],
                    'confidence': pred['confidence'],
                    'timestamp': feedback['timestamp'],
                    'value': 'high'
                }
                valuable_samples.append(sample)

            elif feedback['confirmed'] and pred['confidence'] < 0.7:
                sample = {
                    'input': feedback['input'],
                    'true_label': pred['label'],
                    'confidence': pred['confidence'],
                    'value': 'medium'
                }
                valuable_samples.append(sample)

        print(f"Valuable samples collected: {len(valuable_samples)}")
        return valuable_samples

    def prioritize_labeling_queue(self, unlabeled_pool, budget):
        """
        Prioritize labeling queue

        Priority criteria:
        1. Model uncertainty (higher = more priority)
        2. Class rarity (rarer classes = more priority)
        3. Data diversity (more different from existing = more priority)
        """
        priorities = []

        for sample in unlabeled_pool:
            score = 0

            uncertainty = 1 - max(sample['predicted_probs'])
            score += uncertainty * 0.5

            predicted_class = max(sample['predicted_probs'],
                                  key=sample['predicted_probs'].get)
            rarity = 1 / (sample['class_counts'].get(predicted_class, 1) + 1)
            score += rarity * 0.3

            diversity = np.std(list(sample['predicted_probs'].values()))
            score += diversity * 0.2

            priorities.append((sample, score))

        priorities.sort(key=lambda x: x[1], reverse=True)
        selected = [s for s, _ in priorities[:budget]]

        return selected

8. Data Pipeline Best Practices

8.1 Reproducible Data Processing

import hashlib
import json
import os
from pathlib import Path
from datetime import datetime

class ReproducibleDataPipeline:
    """
    Reproducible data pipeline
    - Tracks all processing steps
    - Verifies integrity via data hashes
    - Supports version control
    """

    def __init__(self, pipeline_name, base_dir='data/processed'):
        self.pipeline_name = pipeline_name
        self.base_dir = Path(base_dir)
        self.steps = []
        self.metadata = {
            'pipeline': pipeline_name,
            'created_at': datetime.now().isoformat(),
            'steps': []
        }

    def add_step(self, step_name, func, *args, **kwargs):
        """Add processing step"""
        self.steps.append({
            'name': step_name,
            'func': func,
            'args': args,
            'kwargs': kwargs
        })

    def compute_hash(self, data):
        """Compute data hash"""
        if isinstance(data, np.ndarray):
            return hashlib.md5(data.tobytes()).hexdigest()
        elif isinstance(data, (list, dict)):
            return hashlib.md5(
                json.dumps(data, sort_keys=True, default=str).encode()
            ).hexdigest()
        else:
            return hashlib.md5(str(data).encode()).hexdigest()

    def run(self, input_data):
        """Execute pipeline"""
        data = input_data

        for step in self.steps:
            print(f"Running: {step['name']}")

            hash_before = self.compute_hash(data)
            data = step['func'](data, *step['args'], **step['kwargs'])
            hash_after = self.compute_hash(data)

            self.metadata['steps'].append({
                'name': step['name'],
                'hash_before': hash_before,
                'hash_after': hash_after,
                'timestamp': datetime.now().isoformat()
            })

            print(f"  Done: {hash_before[:8]} -> {hash_after[:8]}")

        metadata_path = self.base_dir / f"{self.pipeline_name}_metadata.json"
        metadata_path.parent.mkdir(parents=True, exist_ok=True)
        with open(metadata_path, 'w') as f:
            json.dump(self.metadata, f, indent=2)

        print(f"\nPipeline complete. Metadata: {metadata_path}")
        return data


class DataVersionControl:
    """DVC-style data version control"""

    def __init__(self, storage_path='data/.dvc'):
        self.storage_path = Path(storage_path)
        self.storage_path.mkdir(parents=True, exist_ok=True)

    def add(self, data_path):
        """Start tracking a data file"""
        data_path = Path(data_path)

        with open(data_path, 'rb') as f:
            file_hash = hashlib.md5(f.read()).hexdigest()

        dvc_file = data_path.with_suffix('.dvc')
        dvc_metadata = {
            'md5': file_hash,
            'size': os.path.getsize(data_path),
            'path': str(data_path.name),
            'version': datetime.now().isoformat()
        }

        with open(dvc_file, 'w') as f:
            json.dump(dvc_metadata, f, indent=2)

        print(f"Tracking: {data_path}")
        print(f"  MD5: {file_hash}")
        print(f"  Metafile: {dvc_file}")

        return file_hash

    def create_data_contract(self, schema):
        """
        Define a Data Contract
        - Schema definition
        - Quality criteria
        - SLA requirements
        """
        contract = {
            'version': '1.0',
            'schema': schema,
            'quality_rules': {
                'completeness': {'min_threshold': 0.99},
                'accuracy': {'label_error_rate': {'max': 0.05}},
                'consistency': {'duplicate_rate': {'max': 0.01}},
            },
            'sla': {
                'update_frequency': 'daily',
                'max_staleness_hours': 24,
            }
        }

        return contract


def demonstrate_full_data_pipeline():
    """Full Data-Centric AI pipeline demonstration"""
    print("=" * 60)
    print("Data-Centric AI Pipeline Demo")
    print("=" * 60)

    print("\nStep 1: Data Quality Assessment")
    print("  - Measure label error rate")
    print("  - Detect outliers")
    print("  - Remove duplicates")
    print("  - Analyze class distribution")

    print("\nStep 2: Label Refinement")
    print("  - Detect errors with Cleanlab")
    print("  - Correct via majority vote / expert review")
    print("  - Improve inter-rater agreement")

    print("\nStep 3: Data Augmentation")
    print("  - Images: Albumentations")
    print("  - Text: EDA, back-translation")
    print("  - Search for automatic augmentation policies")

    print("\nStep 4: Synthetic Data Generation")
    print("  - Synthesize text with LLMs")
    print("  - Synthesize images with Diffusion Models")
    print("  - Filter by quality (FID, classifier confidence)")

    print("\nStep 5: Active Learning")
    print("  - Prioritize labeling with uncertainty sampling")
    print("  - Ensure diversity with CoreSet method")

    print("\nStep 6: Version Control and Monitoring")
    print("  - Version data with DVC")
    print("  - Maintain quality standards with Data Contracts")
    print("  - Continuously improve with the Data Flywheel")

    print("\nConclusion: Improving data quality is often more impactful than improving models!")

9. Summary and Practical Guide

Data-Centric AI Checklist

1. Data Collection

  • Write labeling guidelines with domain experts
  • Measure inter-rater agreement (target Cohen's Kappa > 0.8)
  • Monitor class distribution during collection

2. Data Cleaning

  • Detect and fix label errors with Cleanlab
  • Remove duplicate samples
  • Review outliers (remove or correct)

3. Data Augmentation

  • Apply augmentation only to training data (not validation/test)
  • Validate data distribution after augmentation
  • Choose augmentation techniques appropriate for your domain

4. Continuous Improvement

  • Collect production error cases
  • Use active learning to make labeling efficient
  • Conduct regular data quality audits

Recommended Tools:

Data-Centric AI is not merely a matter of tools or techniques. It is a mindset shift — from chasing "better models" to chasing "better data." In many real-world projects, this shift alone can deliver dramatic performance improvements.

The most important insight from Andrew Ng's Data-Centric AI movement: your model is only as good as the data it learns from. Investing in data quality, labeling consistency, and systematic data improvement is often the highest-return activity in an AI project.