- Authors

- Name
- Youngju Kim
- @fjvbn20031
데이터 중심 AI(Data-Centric AI) 완전 가이드: 고품질 데이터로 AI 성능 극대화
2021년 Andrew Ng은 AI 커뮤니티에 도전적인 질문을 던졌습니다: "모델 아키텍처를 개선하는 데 집중하는 대신, 데이터 품질을 개선하는 데 집중하면 어떻게 될까?" 이것이 데이터 중심 AI(Data-Centric AI) 운동의 출발점입니다.
전통적인 모델 중심 접근법이 "더 좋은 알고리즘"을 추구한다면, 데이터 중심 접근법은 "더 좋은 데이터"를 추구합니다. 이 가이드는 데이터 중심 AI의 모든 측면을 실전 코드와 함께 완벽하게 다룹니다.
1. 데이터 중심 AI vs 모델 중심 AI
1.1 패러다임 전환
모델 중심 접근법 (Model-Centric AI)
- 데이터는 고정, 코드를 개선
- 더 나은 아키텍처 탐색
- 하이퍼파라미터 튜닝에 집중
- 기존 벤치마크: 데이터셋은 고정되고 모델만 변함
데이터 중심 접근법 (Data-Centric AI)
- 모델은 고정, 데이터를 개선
- 레이블 오류 수정
- 일관된 레이블링 가이드라인 개선
- 데이터 증강과 합성 데이터 추가
1.2 Andrew Ng의 핵심 주장
Andrew Ng은 다음과 같이 말합니다:
"AI 시스템 = 코드(모델/알고리즘) + 데이터"
많은 실용적인 AI 프로젝트에서 코드는 이미 충분히 좋습니다. 문제는 데이터 품질입니다.
실험 결과 (Andrew Ng, DeepLearning.AI):
결함이 있는 제조업 검사 데이터셋에서:
- 기준 성능: 76.2%
- 더 나은 모델만 사용: 0.02% 향상 (76.22%)
- 더 나은 데이터만 사용: 16.9% 향상 (93.1%)
이 결과는 많은 실제 시나리오에서 데이터 개선이 모델 개선보다 훨씬 더 효과적임을 보여줍니다.
1.3 언제 데이터 중심 접근이 효과적인가?
데이터 중심 접근이 특히 효과적인 상황:
- 소규모 데이터셋: 데이터가 수천 개 이하일 때 품질이 더 중요
- 높은 레이블 노이즈: 레이블 오류율이 5% 이상일 때
- 도메인 특화 작업: 일반 모델이 없는 특수 도메인
- 불균형 클래스: 희귀 클래스의 품질이 성능을 결정
- 엄격한 정확도 요구: 의료, 금융 등 높은 정확도가 필요한 분야
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import matplotlib.pyplot as plt
def compare_model_vs_data_centric(X, y, model_class, noise_level=0.1):
"""
모델 중심 vs 데이터 중심 접근법 성능 비교
Args:
X: 특성 행렬
y: 레이블
model_class: 기본 모델 클래스
noise_level: 레이블 노이즈 비율
"""
# 레이블 노이즈 추가
noisy_y = y.copy()
noise_idx = np.random.choice(len(y), int(len(y) * noise_level), replace=False)
n_classes = len(np.unique(y))
for idx in noise_idx:
wrong_labels = [l for l in range(n_classes) if l != y[idx]]
noisy_y[idx] = np.random.choice(wrong_labels)
X_train, X_test, y_train_clean, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
_, _, y_train_noisy, _ = train_test_split(
X, noisy_y, test_size=0.2, random_state=42
)
# --- 모델 중심 접근 ---
# 노이즈 데이터 + 기본 모델
base_model = model_class()
base_model.fit(X_train, y_train_noisy)
base_acc = accuracy_score(y_test, base_model.predict(X_test))
# 노이즈 데이터 + 더 복잡한 모델 (앙상블)
from sklearn.ensemble import GradientBoostingClassifier
complex_model = GradientBoostingClassifier(n_estimators=200)
complex_model.fit(X_train, y_train_noisy)
complex_acc = accuracy_score(y_test, complex_model.predict(X_test))
# --- 데이터 중심 접근 ---
# 클린 데이터 + 기본 모델
clean_model = model_class()
clean_model.fit(X_train, y_train_clean)
clean_acc = accuracy_score(y_test, clean_model.predict(X_test))
print("=" * 50)
print("모델 중심 vs 데이터 중심 비교")
print("=" * 50)
print(f"기본 모델 + 노이즈 데이터: {base_acc:.3f}")
print(f"복잡한 모델 + 노이즈 데이터: {complex_acc:.3f}")
print(f"기본 모델 + 클린 데이터: {clean_acc:.3f}")
print(f"\n모델 개선 효과: +{(complex_acc - base_acc):.3f}")
print(f"데이터 개선 효과: +{(clean_acc - base_acc):.3f}")
return {
'base_model_noisy_data': base_acc,
'complex_model_noisy_data': complex_acc,
'base_model_clean_data': clean_acc
}
2. 데이터 품질 측정
2.1 Confident Learning과 레이블 오류 탐지
Confident Learning은 Northcutt et al.이 제안한 방법으로, 교차 검증된 예측 확률을 사용하여 레이블 오류를 체계적으로 탐지합니다.
핵심 아이디어: "모델이 클래스 A라고 높은 확률로 예측하는데, 레이블이 클래스 B라면 레이블 오류일 가능성이 높다"
import cleanlab
from cleanlab.filter import find_label_issues
from cleanlab.classification import CleanLearning
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_predict
def detect_label_errors_cleanlab(X, y, model=None):
"""
Cleanlab으로 레이블 오류 탐지
Args:
X: 특성 행렬
y: 레이블 배열
model: 분류 모델 (기본: LogisticRegression)
Returns:
label_issues: 레이블 오류 인덱스와 정보
"""
if model is None:
model = LogisticRegression(max_iter=1000)
# 교차 검증으로 클래스 확률 예측
pred_probs = cross_val_predict(
model, X, y,
cv=5,
method='predict_proba'
)
# 레이블 오류 찾기
label_issues = find_label_issues(
labels=y,
pred_probs=pred_probs,
return_indices_ranked_by='self_confidence'
)
print(f"총 샘플 수: {len(y)}")
print(f"발견된 레이블 오류: {len(label_issues)}")
print(f"오류율: {len(label_issues)/len(y):.2%}")
return label_issues
def cleanlab_full_pipeline(X_train, y_train_noisy, X_test, y_test):
"""
Cleanlab 전체 파이프라인:
1. 레이블 오류 탐지
2. 오류 수정 또는 제거
3. 정제된 데이터로 모델 재학습
"""
from cleanlab.classification import CleanLearning
base_model = LogisticRegression(max_iter=1000)
# CleanLearning: 레이블 오류를 자동으로 처리하며 학습
cl = CleanLearning(base_model, seed=42)
cl.fit(X_train, y_train_noisy)
# 평가
y_pred = cl.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
print(f"CleanLearning 정확도: {accuracy:.3f}")
# 레이블 이슈 정보 출력
label_issues_df = cl.get_label_issues()
print(f"\n레이블 이슈 정보:")
print(label_issues_df.head(10))
return cl, label_issues_df
def confident_learning_manual(pred_probs, labels):
"""
Confident Learning 수동 구현
- 클래스별 임계값 계산
- Confident Joint 행렬 구축
"""
n_classes = pred_probs.shape[1]
n_samples = len(labels)
# 클래스별 임계값: 해당 클래스 샘플들의 평균 예측 확률
thresholds = np.zeros(n_classes)
for c in range(n_classes):
class_mask = labels == c
if class_mask.sum() > 0:
thresholds[c] = pred_probs[class_mask, c].mean()
# Confident Joint 행렬 C[s][y]
# s: 예측 클래스, y: 주어진 레이블
C = np.zeros((n_classes, n_classes), dtype=int)
for i in range(n_samples):
y_given = labels[i]
# 가장 높은 확률 클래스 (임계값 이상인 것 중)
over_threshold = pred_probs[i] >= thresholds
if over_threshold.sum() == 0:
y_hat = pred_probs[i].argmax()
else:
y_hat = (pred_probs[i] * over_threshold).argmax()
C[y_hat, y_given] += 1
# 대각선이 아닌 요소가 많은 클래스 쌍이 오류 후보
off_diagonal = C.copy()
np.fill_diagonal(off_diagonal, 0)
print("Confident Joint 행렬 (행: 추정 실제 클래스, 열: 주어진 레이블):")
print(C)
print(f"\n잠재적 오류 샘플 수: {off_diagonal.sum()}")
return C
2.2 데이터 이상치 탐지
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
import torch
import numpy as np
class DataQualityChecker:
"""
종합적인 데이터 품질 검사 도구
"""
def __init__(self):
self.outlier_detector = None
self.quality_report = {}
def check_class_distribution(self, labels):
"""클래스 불균형 검사"""
from collections import Counter
import pandas as pd
counts = Counter(labels)
total = len(labels)
df = pd.DataFrame([
{'class': c, 'count': n, 'percentage': 100 * n / total}
for c, n in sorted(counts.items())
])
imbalance_ratio = max(counts.values()) / min(counts.values())
print("클래스 분포:")
print(df.to_string(index=False))
print(f"\n불균형 비율: {imbalance_ratio:.2f}x")
if imbalance_ratio > 10:
print("경고: 심각한 클래스 불균형!")
elif imbalance_ratio > 3:
print("주의: 클래스 불균형 존재")
self.quality_report['class_imbalance_ratio'] = imbalance_ratio
return df
def detect_outliers(self, X, method='isolation_forest', contamination=0.1):
"""
이상치 탐지
Args:
method: 'isolation_forest' 또는 'lof'
contamination: 예상 이상치 비율
"""
if method == 'isolation_forest':
detector = IsolationForest(
contamination=contamination,
random_state=42
)
elif method == 'lof':
detector = LocalOutlierFactor(
contamination=contamination,
novelty=True
)
# 이상치 탐지 (-1: 이상치, 1: 정상)
predictions = detector.fit_predict(X)
outlier_mask = predictions == -1
outlier_indices = np.where(outlier_mask)[0]
print(f"탐지된 이상치: {outlier_mask.sum()} / {len(X)} ({outlier_mask.mean():.2%})")
self.quality_report['n_outliers'] = outlier_mask.sum()
return outlier_indices, outlier_mask
def check_duplicates(self, X, y=None, threshold=0.99):
"""
중복 샘플 탐지
Args:
threshold: 유사도 임계값 (1.0 = 완전 동일)
"""
from sklearn.metrics.pairwise import cosine_similarity
# 유사도 행렬 계산 (대규모 데이터에서는 샘플링)
if len(X) > 10000:
sample_idx = np.random.choice(len(X), 10000, replace=False)
X_sample = X[sample_idx]
else:
X_sample = X
sample_idx = np.arange(len(X))
sim_matrix = cosine_similarity(X_sample)
np.fill_diagonal(sim_matrix, 0)
# 임계값 이상 유사한 쌍 찾기
duplicate_pairs = np.argwhere(sim_matrix >= threshold)
# 중복 제거 (i < j만 유지)
duplicate_pairs = duplicate_pairs[duplicate_pairs[:, 0] < duplicate_pairs[:, 1]]
print(f"중복 쌍 발견: {len(duplicate_pairs)}")
if y is not None and len(duplicate_pairs) > 0:
# 레이블이 다른 중복 (잠재적 오류)
label_conflicts = 0
for i, j in duplicate_pairs:
if y[sample_idx[i]] != y[sample_idx[j]]:
label_conflicts += 1
print(f"레이블 충돌 중복 쌍: {label_conflicts}")
return duplicate_pairs
def compute_data_quality_score(self, X, y):
"""종합 데이터 품질 점수 계산"""
scores = {}
# 결측값 비율
if hasattr(X, 'isnull'):
missing_rate = X.isnull().mean().mean()
else:
missing_rate = np.isnan(X).mean()
scores['completeness'] = 1 - missing_rate
# 클래스 균형
from collections import Counter
counts = Counter(y)
n_classes = len(counts)
ideal_count = len(y) / n_classes
balance_score = sum(
min(c, ideal_count) / ideal_count
for c in counts.values()
) / n_classes
scores['balance'] = balance_score
# 최종 점수
overall_score = np.mean(list(scores.values()))
scores['overall'] = overall_score
print("데이터 품질 점수:")
for metric, score in scores.items():
print(f" {metric}: {score:.3f}")
return scores
3. 레이블링 전략
3.1 고품질 레이블링 가이드라인
def compute_inter_rater_agreement(annotations):
"""
레이블러 간 일치도 계산
Args:
annotations: shape (n_samples, n_raters)의 배열
Returns:
cohen_kappa: 코헨 카파 점수
fleiss_kappa: 플라이스 카파 점수 (다수 레이블러)
"""
from sklearn.metrics import cohen_kappa_score
import numpy as np
n_samples, n_raters = annotations.shape
# 레이블러 쌍별 코헨 카파
kappa_scores = []
for i in range(n_raters):
for j in range(i+1, n_raters):
kappa = cohen_kappa_score(
annotations[:, i],
annotations[:, j]
)
kappa_scores.append((i, j, kappa))
print(f"레이블러 {i} vs 레이블러 {j}: kappa = {kappa:.3f}")
mean_kappa = np.mean([k for _, _, k in kappa_scores])
print(f"\n평균 코헨 카파: {mean_kappa:.3f}")
# 카파 해석
if mean_kappa < 0.2:
interpretation = "미약한 일치"
elif mean_kappa < 0.4:
interpretation = "보통 일치"
elif mean_kappa < 0.6:
interpretation = "적절한 일치"
elif mean_kappa < 0.8:
interpretation = "상당한 일치"
else:
interpretation = "거의 완벽한 일치"
print(f"해석: {interpretation}")
# 다수결 레이블 생성
from scipy import stats
majority_labels = stats.mode(annotations, axis=1)[0].flatten()
print(f"\n다수결 레이블 생성 완료")
return mean_kappa, majority_labels
def create_labeling_guidelines(task_name, examples):
"""
레이블링 가이드라인 템플릿 생성
좋은 레이블링 가이드라인의 요소:
1. 명확한 정의와 경계
2. 긍정/부정 예시
3. 경계 케이스 처리 방법
4. 일관성 체크리스트
"""
guidelines = {
'task': task_name,
'categories': {},
'edge_cases': [],
'consistency_rules': []
}
print(f"'{task_name}' 레이블링 가이드라인 템플릿:")
print("-" * 50)
print("1. 각 클래스의 명확한 정의를 작성하세요")
print("2. 최소 5개의 긍정 예시와 5개의 부정 예시를 포함하세요")
print("3. 경계 케이스(Boundary Cases) 처리 방법을 명시하세요")
print("4. 레이블러가 확신할 수 없을 때의 기본 규칙을 정의하세요")
print("5. 품질 체크 기준을 포함하세요")
return guidelines
3.2 약한 지도 학습 (Weak Supervision) with Snorkel
def snorkel_programmatic_labeling_demo():
"""
Snorkel을 사용한 프로그래매틱 레이블링 데모
pip install snorkel
"""
from snorkel.labeling import labeling_function, PandasLFApplier
from snorkel.labeling.model import LabelModel
import pandas as pd
import re
# 감성 분류 예시
POSITIVE = 1
NEGATIVE = 0
ABSTAIN = -1
@labeling_function()
def lf_positive_keywords(x):
"""긍정 키워드 기반 레이블링 함수"""
positive_words = ['good', 'great', 'excellent', 'amazing', 'love', 'best']
text = x.text.lower()
if any(word in text for word in positive_words):
return POSITIVE
return ABSTAIN
@labeling_function()
def lf_negative_keywords(x):
"""부정 키워드 기반 레이블링 함수"""
negative_words = ['bad', 'terrible', 'awful', 'hate', 'worst', 'horrible']
text = x.text.lower()
if any(word in text for word in negative_words):
return NEGATIVE
return ABSTAIN
@labeling_function()
def lf_rating_high(x):
"""높은 평점 기반 레이블링 함수"""
if hasattr(x, 'rating') and x.rating >= 4:
return POSITIVE
return ABSTAIN
@labeling_function()
def lf_rating_low(x):
"""낮은 평점 기반 레이블링 함수"""
if hasattr(x, 'rating') and x.rating <= 2:
return NEGATIVE
return ABSTAIN
@labeling_function()
def lf_negation_check(x):
"""부정 표현 감지 (NOT 연산자)"""
text = x.text.lower()
if re.search(r"not (good|great|excellent)", text):
return NEGATIVE
if re.search(r"not (bad|terrible)", text):
return POSITIVE
return ABSTAIN
# 레이블링 함수 목록
lfs = [
lf_positive_keywords,
lf_negative_keywords,
lf_rating_high,
lf_rating_low,
lf_negation_check,
]
# 데이터에 레이블링 함수 적용
# df_train은 레이블 없는 텍스트 데이터프레임
# applier = PandasLFApplier(lfs=lfs)
# L_train = applier.apply(df_train)
# 레이블 모델로 통합
# label_model = LabelModel(cardinality=2, verbose=True)
# label_model.fit(L_train=L_train, n_epochs=500, log_freq=100)
# 확률적 레이블 생성
# probs_train = label_model.predict_proba(L=L_train)
print("Snorkel 프로그래매틱 레이블링 파이프라인:")
print("1. 도메인 전문가가 레이블링 함수(LF) 작성")
print("2. LF를 레이블 없는 데이터에 적용")
print("3. 레이블 모델로 여러 LF 통합 (노이즈 제거)")
print("4. 생성된 소프트 레이블로 다운스트림 모델 학습")
print(f"\n정의된 레이블링 함수 수: {len(lfs)}")
return lfs
4. 능동 학습 (Active Learning)
능동 학습은 레이블이 없는 대규모 데이터에서 가장 유익한 샘플을 선택하여 레이블링 비용을 최소화하는 방법입니다.
import numpy as np
from sklearn.base import BaseEstimator
import torch
import torch.nn as nn
class ActiveLearner:
"""
능동 학습 구현
여러 샘플 선택 전략 지원
"""
def __init__(self, model, strategy='uncertainty', n_initial=100):
self.model = model
self.strategy = strategy
self.n_initial = n_initial
def uncertainty_sampling(self, X_unlabeled, n_samples):
"""
불확실성 샘플링
모델이 가장 확신하지 못하는 샘플 선택
"""
# 예측 확률
probs = self._get_probs(X_unlabeled)
if self.strategy == 'least_confidence':
# 가장 낮은 최대 확률
uncertainty = 1 - probs.max(axis=1)
elif self.strategy == 'margin':
# 상위 두 클래스의 확률 차이
sorted_probs = np.sort(probs, axis=1)[:, ::-1]
uncertainty = 1 - (sorted_probs[:, 0] - sorted_probs[:, 1])
elif self.strategy == 'entropy':
# 엔트로피: 예측 분포의 불확실성
uncertainty = -np.sum(probs * np.log(probs + 1e-10), axis=1)
else:
uncertainty = 1 - probs.max(axis=1)
# 가장 불확실한 샘플 인덱스 반환
selected_indices = np.argsort(uncertainty)[-n_samples:]
return selected_indices, uncertainty
def diversity_sampling(self, X_unlabeled, X_labeled, n_samples):
"""
다양성 기반 샘플링 (CoreSet 방법)
레이블된 데이터와 가장 다른 샘플 선택
"""
from sklearn.metrics.pairwise import euclidean_distances
# 레이블된 데이터와의 최소 거리
distances = euclidean_distances(X_unlabeled, X_labeled)
min_distances = distances.min(axis=1)
# CoreSet: 가장 먼 점들을 순차적으로 선택
selected = []
remaining = list(range(len(X_unlabeled)))
current_labeled = X_labeled.copy()
for _ in range(n_samples):
dists = euclidean_distances(
X_unlabeled[remaining],
current_labeled
).min(axis=1)
# 가장 먼 점 선택
best_idx = remaining[np.argmax(dists)]
selected.append(best_idx)
remaining.remove(best_idx)
current_labeled = np.vstack([current_labeled, X_unlabeled[best_idx]])
return np.array(selected)
def batch_mode_active_learning(self, X_pool, y_oracle, X_test, y_test,
n_iterations=10, n_per_iter=50):
"""
배치 모드 능동 학습 루프
Args:
X_pool: 레이블 없는 데이터 풀
y_oracle: 레이블러 (실제 레이블 소스)
n_per_iter: 각 반복에서 레이블할 샘플 수
"""
# 초기 레이블된 집합
initial_indices = np.random.choice(
len(X_pool), self.n_initial, replace=False
)
labeled_indices = list(initial_indices)
unlabeled_indices = [
i for i in range(len(X_pool)) if i not in labeled_indices
]
accuracies = []
n_labeled_list = []
for iteration in range(n_iterations):
X_labeled = X_pool[labeled_indices]
y_labeled = y_oracle[labeled_indices]
# 현재 레이블된 데이터로 모델 학습
self.model.fit(X_labeled, y_labeled)
# 테스트 정확도 평가
acc = accuracy_score(y_test, self.model.predict(X_test))
accuracies.append(acc)
n_labeled_list.append(len(labeled_indices))
print(f"반복 {iteration+1}: 레이블 수={len(labeled_indices)}, 정확도={acc:.3f}")
if len(unlabeled_indices) == 0:
break
# 다음에 레이블할 샘플 선택
X_unlabeled = X_pool[unlabeled_indices]
selected, _ = self.uncertainty_sampling(X_unlabeled, n_per_iter)
# 실제 인덱스로 변환
actual_selected = [unlabeled_indices[i] for i in selected]
# 레이블된 집합에 추가
labeled_indices.extend(actual_selected)
unlabeled_indices = [
i for i in unlabeled_indices if i not in actual_selected
]
return accuracies, n_labeled_list
def _get_probs(self, X):
"""모델 예측 확률 반환"""
if hasattr(self.model, 'predict_proba'):
return self.model.predict_proba(X)
else:
logits = self.model.predict(X)
from scipy.special import softmax
return softmax(logits, axis=1)
def compare_active_learning_strategies(X, y, model, n_initial=50, n_budget=500):
"""여러 능동 학습 전략 비교"""
strategies = ['least_confidence', 'margin', 'entropy']
results = {}
for strategy in strategies:
learner = ActiveLearner(model, strategy=strategy, n_initial=n_initial)
accs, n_labeled = learner.batch_mode_active_learning(
X, y,
X_test=X[:100],
y_test=y[:100],
n_iterations=10,
n_per_iter=50
)
results[strategy] = {'accuracies': accs, 'n_labeled': n_labeled}
# 랜덤 샘플링 기준
random_learner = ActiveLearner(model, strategy='random', n_initial=n_initial)
# (랜덤 전략 구현은 생략)
return results
5. 데이터 증강 심층 가이드
5.1 이미지 증강: Albumentations
import albumentations as A
from albumentations.pytorch import ToTensorV2
import cv2
import numpy as np
from PIL import Image
def get_train_transforms(image_size=224):
"""
학습용 강력한 증강 파이프라인 (Albumentations)
"""
return A.Compose([
# 기하학적 변환
A.RandomResizedCrop(
height=image_size,
width=image_size,
scale=(0.7, 1.0),
ratio=(0.75, 1.33)
),
A.HorizontalFlip(p=0.5),
A.ShiftScaleRotate(
shift_limit=0.1,
scale_limit=0.2,
rotate_limit=30,
p=0.5
),
# 색상 변환
A.ColorJitter(
brightness=0.3,
contrast=0.3,
saturation=0.3,
hue=0.1,
p=0.8
),
A.ToGray(p=0.1),
A.RandomGamma(gamma_limit=(80, 120), p=0.3),
# 노이즈 및 블러
A.GaussNoise(var_limit=(10, 50), p=0.3),
A.OneOf([
A.MotionBlur(blur_limit=7),
A.GaussianBlur(blur_limit=7),
A.MedianBlur(blur_limit=7),
], p=0.3),
# 컷아웃 / 랜덤 지우기
A.CoarseDropout(
max_holes=8,
max_height=32,
max_width=32,
fill_value=0,
p=0.3
),
# 그리드 왜곡
A.OneOf([
A.GridDistortion(p=1),
A.ElasticTransform(p=1),
A.OpticalDistortion(p=1),
], p=0.2),
# 정규화 및 텐서 변환
A.Normalize(
mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225]
),
ToTensorV2(),
])
def get_val_transforms(image_size=224):
"""검증용 변환 (증강 없이 정규화만)"""
return A.Compose([
A.Resize(height=image_size, width=image_size),
A.Normalize(
mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225]
),
ToTensorV2(),
])
def mixup_augmentation(images, labels, alpha=0.4):
"""
MixUp 증강: 두 이미지와 레이블을 혼합
Zhang et al., "mixup: Beyond Empirical Risk Minimization" (2018)
"""
import torch
batch_size = images.shape[0]
lam = np.random.beta(alpha, alpha)
# 랜덤 순서로 섞기
perm = torch.randperm(batch_size)
# 이미지 혼합
mixed_images = lam * images + (1 - lam) * images[perm]
# 레이블 혼합 (soft labels)
labels_a = labels
labels_b = labels[perm]
return mixed_images, labels_a, labels_b, lam
def cutmix_augmentation(images, labels, alpha=1.0):
"""
CutMix 증강: 이미지 일부를 다른 이미지로 잘라 붙이기
Yun et al., "CutMix: Training Strategy that Makes Use of
Sample Mixing" (2019)
"""
import torch
batch_size, c, h, w = images.shape
lam = np.random.beta(alpha, alpha)
perm = torch.randperm(batch_size)
# 랜덤 박스 생성
cut_ratio = np.sqrt(1 - lam)
cut_h = int(h * cut_ratio)
cut_w = int(w * cut_ratio)
cx = np.random.randint(w)
cy = np.random.randint(h)
bbx1 = np.clip(cx - cut_w // 2, 0, w)
bby1 = np.clip(cy - cut_h // 2, 0, h)
bbx2 = np.clip(cx + cut_w // 2, 0, w)
bby2 = np.clip(cy + cut_h // 2, 0, h)
# 박스 영역을 다른 이미지로 교체
mixed_images = images.clone()
mixed_images[:, :, bby1:bby2, bbx1:bbx2] = images[perm, :, bby1:bby2, bbx1:bbx2]
# 실제 혼합 비율 조정
lam = 1 - ((bbx2 - bbx1) * (bby2 - bby1) / (w * h))
labels_a = labels
labels_b = labels[perm]
return mixed_images, labels_a, labels_b, lam
5.2 텍스트 증강
class TextAugmenter:
"""
텍스트 데이터 증강 기법들
"""
def __init__(self, language='en'):
self.language = language
def eda_synonym_replacement(self, text, n=1):
"""
EDA (Easy Data Augmentation): 동의어 교체
Wei and Zou, "EDA: Easy Data Augmentation Techniques
for Boosting Performance on Text Classification Tasks" (2019)
"""
import nltk
from nltk.corpus import wordnet
words = text.split()
new_words = words.copy()
# 불용어 제외
stop_words = set(['a', 'an', 'the', 'is', 'are', 'was', 'were',
'i', 'me', 'my', 'we', 'our', 'you', 'your'])
# 교체 가능한 단어 찾기
replaceable = [
(i, word) for i, word in enumerate(words)
if word.lower() not in stop_words
]
np.random.shuffle(replaceable)
replaced = 0
for idx, word in replaceable:
if replaced >= n:
break
synsets = wordnet.synsets(word)
if synsets:
synonyms = [
lemma.name() for synset in synsets
for lemma in synset.lemmas()
if lemma.name() != word
]
if synonyms:
new_words[idx] = np.random.choice(synonyms).replace('_', ' ')
replaced += 1
return ' '.join(new_words)
def eda_random_insertion(self, text, n=1):
"""EDA: 랜덤 위치에 동의어 삽입"""
import nltk
from nltk.corpus import wordnet
words = text.split()
new_words = words.copy()
for _ in range(n):
# 랜덤 단어의 동의어 찾기
word = np.random.choice(words)
synsets = wordnet.synsets(word)
if synsets:
synonyms = [
lemma.name() for synset in synsets
for lemma in synset.lemmas()
]
if synonyms:
synonym = np.random.choice(synonyms).replace('_', ' ')
insert_pos = np.random.randint(0, len(new_words) + 1)
new_words.insert(insert_pos, synonym)
return ' '.join(new_words)
def eda_random_swap(self, text, n=1):
"""EDA: 랜덤 단어 교환"""
words = text.split()
if len(words) < 2:
return text
new_words = words.copy()
for _ in range(n):
i, j = np.random.choice(len(new_words), 2, replace=False)
new_words[i], new_words[j] = new_words[j], new_words[i]
return ' '.join(new_words)
def eda_random_deletion(self, text, p=0.1):
"""EDA: 랜덤 단어 삭제"""
words = text.split()
if len(words) == 1:
return text
new_words = [
word for word in words
if np.random.random() > p
]
return ' '.join(new_words) if new_words else np.random.choice(words)
def back_translation(self, text, src_lang='en', pivot_lang='fr'):
"""
역번역 증강: 영어 -> 프랑스어 -> 영어
의미는 유지하면서 표현 다양화
(실제 구현에서는 번역 API 또는 모델 사용)
"""
# 예시: Helsinki-NLP/opus-mt 모델 사용
try:
from transformers import pipeline
# 영어 -> 피벗 언어
translator_fwd = pipeline(
f"translation_{src_lang}_to_{pivot_lang}",
model=f"Helsinki-NLP/opus-mt-{src_lang}-{pivot_lang}"
)
# 피벗 언어 -> 영어
translator_bwd = pipeline(
f"translation_{pivot_lang}_to_{src_lang}",
model=f"Helsinki-NLP/opus-mt-{pivot_lang}-{src_lang}"
)
# 번역 수행
pivot_text = translator_fwd(text)[0]['translation_text']
back_translated = translator_bwd(pivot_text)[0]['translation_text']
return back_translated
except Exception as e:
print(f"번역 오류: {e}")
return text
def augment_dataset(self, texts, labels, n_aug=4):
"""전체 데이터셋 증강"""
augmented_texts = []
augmented_labels = []
for text, label in zip(texts, labels):
augmented_texts.append(text)
augmented_labels.append(label)
for _ in range(n_aug):
aug_type = np.random.choice(
['synonym', 'insertion', 'swap', 'deletion']
)
if aug_type == 'synonym':
aug_text = self.eda_synonym_replacement(text)
elif aug_type == 'insertion':
aug_text = self.eda_random_insertion(text)
elif aug_type == 'swap':
aug_text = self.eda_random_swap(text)
else:
aug_text = self.eda_random_deletion(text)
augmented_texts.append(aug_text)
augmented_labels.append(label)
print(f"원본 샘플: {len(texts)}")
print(f"증강 후 샘플: {len(augmented_texts)}")
return augmented_texts, augmented_labels
5.3 자동 증강 (AutoAugment, RandAugment)
import torch
import torchvision.transforms as transforms
def get_randaugment_transforms(n=2, m=9, image_size=224):
"""
RandAugment: 랜덤 증강 정책
Cubuk et al., "RandAugment: Practical Automated Data Augmentation" (2019)
Args:
n: 적용할 증강 연산 수
m: 증강 강도 (0-30)
"""
# PyTorch 내장 RandAugment 사용 (torchvision >= 0.12)
transform = transforms.Compose([
transforms.RandomResizedCrop(image_size),
transforms.RandAugment(num_ops=n, magnitude=m),
transforms.ToTensor(),
transforms.Normalize(
mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225]
)
])
return transform
def specaugment_for_audio(spectrogram, freq_mask_param=27, time_mask_param=70):
"""
SpecAugment: 오디오 스펙트로그램 증강
Park et al., "SpecAugment: A Simple Data Augmentation Method
for Automatic Speech Recognition" (2019)
Args:
spectrogram: 입력 스펙트로그램 (freq, time)
freq_mask_param: 주파수 마스크 최대 크기
time_mask_param: 시간 마스크 최대 크기
"""
import torch
import torchaudio.transforms as T
freq_mask = T.FrequencyMasking(freq_mask_param=freq_mask_param)
time_mask = T.TimeMasking(time_mask_param=time_mask_param)
# 주파수 마스킹
augmented = freq_mask(spectrogram)
# 시간 마스킹
augmented = time_mask(augmented)
return augmented
6. 합성 데이터 생성
6.1 LLM으로 합성 텍스트 생성
class SyntheticTextGenerator:
"""
LLM을 사용한 합성 학습 데이터 생성
"""
def __init__(self, llm_client, model_name='gpt-4'):
self.llm = llm_client
self.model_name = model_name
def generate_classification_data(self, class_name, n_samples=100,
domain='general', style='diverse'):
"""
분류 작업을 위한 합성 데이터 생성
Args:
class_name: 생성할 클래스 이름
n_samples: 생성할 샘플 수
domain: 도메인 (medical, legal, etc.)
style: 스타일 (formal, casual, diverse)
"""
prompt = f"""Generate {n_samples} diverse text examples for the class '{class_name}'.
Domain: {domain}
Style: {style}
Requirements:
- Each example should be 1-3 sentences
- Vary the vocabulary, sentence structure, and perspective
- Include both simple and complex cases
- Format as a JSON list: ["example1", "example2", ...]
Generate realistic examples that would appear in real-world {domain} data."""
# LLM 호출 (실제 구현에서는 API 사용)
# response = self.llm.complete(prompt)
# examples = json.loads(response)
print(f"'{class_name}' 클래스 합성 데이터 생성 프롬프트:")
print(prompt[:300] + "...")
print(f"\n{n_samples}개 샘플 생성 예정")
def generate_edge_cases(self, class_examples, n_edge_cases=20):
"""경계 케이스 합성 데이터 생성"""
prompt = f"""Based on these training examples:
{chr(10).join(class_examples[:5])}
Generate {n_edge_cases} challenging edge cases that:
1. Are ambiguous between different categories
2. Contain misleading keywords
3. Have unusual sentence structures
4. Test the model's true understanding
Format as JSON list."""
print("경계 케이스 생성 프롬프트 준비 완료")
def augment_with_paraphrase(self, texts, n_paraphrases=3):
"""
LLM으로 의역(paraphrase) 생성
"""
augmented = []
for text in texts:
prompt = f"""Paraphrase the following text {n_paraphrases} times.
Keep the same meaning but use different words and sentence structures.
Original: "{text}"
Format as JSON list of {n_paraphrases} paraphrases."""
augmented.append({
'original': text,
'paraphrases': [] # LLM 응답으로 채워짐
})
return augmented
class SyntheticImageGenerator:
"""
Diffusion Model로 합성 이미지 생성
"""
def __init__(self, model_name='stabilityai/stable-diffusion-2-1'):
self.model_name = model_name
def setup_pipeline(self):
"""
Stable Diffusion 파이프라인 초기화
pip install diffusers accelerate
"""
try:
from diffusers import StableDiffusionPipeline
import torch
self.pipe = StableDiffusionPipeline.from_pretrained(
self.model_name,
torch_dtype=torch.float16
)
if torch.cuda.is_available():
self.pipe = self.pipe.to('cuda')
print(f"파이프라인 초기화 완료: {self.model_name}")
except Exception as e:
print(f"파이프라인 초기화 오류: {e}")
def generate_class_images(self, class_name, n_images=50,
style_prompt="high quality, photorealistic"):
"""
특정 클래스의 합성 이미지 생성
Args:
class_name: 생성할 클래스
n_images: 생성할 이미지 수
style_prompt: 스타일 프롬프트
"""
prompts = [
f"A photo of {class_name}, {style_prompt}",
f"{class_name} in natural environment, {style_prompt}",
f"Close-up of {class_name}, detailed, {style_prompt}",
f"{class_name} from different angle, {style_prompt}",
]
print(f"'{class_name}' 합성 이미지 생성 계획:")
print(f"생성할 이미지 수: {n_images}")
print("사용할 프롬프트 예시:")
for p in prompts[:2]:
print(f" - {p}")
# 실제 생성 코드
# images = []
# for i in range(n_images):
# prompt = prompts[i % len(prompts)]
# image = self.pipe(prompt).images[0]
# images.append(image)
# return images
def evaluate_synthetic_quality(self, real_images, synthetic_images):
"""
합성 이미지 품질 평가 (FID 스코어)
"""
try:
from torchmetrics.image.fid import FrechetInceptionDistance
fid = FrechetInceptionDistance(feature=64)
# 실제 이미지 추가
fid.update(real_images, real=True)
# 합성 이미지 추가
fid.update(synthetic_images, real=False)
fid_score = fid.compute()
print(f"FID 스코어: {fid_score:.2f}")
print("(낮을수록 좋음, 0이 완벽)")
return fid_score
except Exception as e:
print(f"FID 계산 오류: {e}")
7. 데이터 플라이휠 (Data Flywheel)
7.1 데이터 플라이휠 개념
데이터 플라이휠은 제품-데이터-모델의 선순환 구조입니다:
- 더 나은 모델 → 더 나은 제품
- 더 나은 제품 → 더 많은 사용자
- 더 많은 사용자 → 더 많은 데이터
- 더 많은 데이터 → 더 나은 모델
class DataFlywheelPipeline:
"""
데이터 플라이휠 구현 파이프라인
"""
def __init__(self, model, feedback_store):
self.model = model
self.feedback_store = feedback_store
self.version = 0
def collect_production_feedback(self, predictions, user_feedback):
"""
프로덕션에서 사용자 피드백 수집
Args:
predictions: 모델 예측값
user_feedback: 사용자 교정/확인 데이터
"""
valuable_samples = []
for pred, feedback in zip(predictions, user_feedback):
if feedback['corrected']:
# 사용자가 수정한 샘플 (오류 케이스)
sample = {
'input': feedback['input'],
'model_prediction': pred,
'true_label': feedback['correction'],
'confidence': pred['confidence'],
'timestamp': feedback['timestamp'],
'value': 'high' # 오류 케이스는 가치 높음
}
valuable_samples.append(sample)
elif feedback['confirmed'] and pred['confidence'] < 0.7:
# 낮은 신뢰도지만 정확한 예측
sample = {
'input': feedback['input'],
'true_label': pred['label'],
'confidence': pred['confidence'],
'value': 'medium'
}
valuable_samples.append(sample)
print(f"수집된 가치 있는 샘플: {len(valuable_samples)}")
return valuable_samples
def prioritize_labeling_queue(self, unlabeled_pool, budget):
"""
레이블링 우선순위 결정
우선순위 기준:
1. 모델 불확실성 (높을수록 우선)
2. 클래스 희귀도 (희귀 클래스 우선)
3. 데이터 다양성 (기존과 다른 샘플 우선)
"""
priorities = []
for sample in unlabeled_pool:
score = 0
# 불확실성 점수
uncertainty = 1 - max(sample['predicted_probs'])
score += uncertainty * 0.5
# 클래스 희귀도 점수
predicted_class = max(sample['predicted_probs'],
key=sample['predicted_probs'].get)
rarity = 1 / (sample['class_counts'].get(predicted_class, 1) + 1)
score += rarity * 0.3
# 다양성 점수 (간단한 근사)
diversity = np.std(sample['predicted_probs'].values())
score += diversity * 0.2
priorities.append((sample, score))
# 우선순위 정렬
priorities.sort(key=lambda x: x[1], reverse=True)
# 예산 내에서 선택
selected = [s for s, _ in priorities[:budget]]
return selected
def continuous_model_improvement(self, new_data, evaluation_set):
"""
지속적 모델 개선 루프
"""
metrics_history = []
while True: # 실제로는 종료 조건 추가
# 1. 새 데이터 수집
new_samples = self.collect_production_feedback(
# 실제 구현에서는 실시간 데이터 스트림
predictions=[],
user_feedback=[]
)
if len(new_samples) < 100: # 최소 샘플 수
continue
# 2. 데이터 품질 검사
checker = DataQualityChecker()
# quality_scores = checker.compute_data_quality_score(...)
# 3. 모델 재학습
# self.model.finetune(new_samples)
# 4. 평가 및 A/B 테스트
# metrics = evaluate_model(self.model, evaluation_set)
# metrics_history.append(metrics)
self.version += 1
print(f"모델 버전 {self.version} 학습 완료")
break # 데모용
return metrics_history
8. 데이터 파이프라인 모범 사례
8.1 재현 가능한 데이터 처리
import hashlib
import json
import os
from pathlib import Path
from datetime import datetime
class ReproducibleDataPipeline:
"""
재현 가능한 데이터 파이프라인
- 모든 처리 단계 추적
- 데이터 해시로 무결성 검증
- 버전 관리 지원
"""
def __init__(self, pipeline_name, base_dir='data/processed'):
self.pipeline_name = pipeline_name
self.base_dir = Path(base_dir)
self.steps = []
self.metadata = {
'pipeline': pipeline_name,
'created_at': datetime.now().isoformat(),
'steps': []
}
def add_step(self, step_name, func, *args, **kwargs):
"""처리 단계 추가"""
self.steps.append({
'name': step_name,
'func': func,
'args': args,
'kwargs': kwargs
})
def compute_hash(self, data):
"""데이터 해시 계산"""
if isinstance(data, np.ndarray):
return hashlib.md5(data.tobytes()).hexdigest()
elif isinstance(data, (list, dict)):
return hashlib.md5(
json.dumps(data, sort_keys=True, default=str).encode()
).hexdigest()
else:
return hashlib.md5(str(data).encode()).hexdigest()
def run(self, input_data):
"""파이프라인 실행"""
data = input_data
for step in self.steps:
print(f"실행 중: {step['name']}")
# 처리 전 해시
hash_before = self.compute_hash(data)
# 처리 실행
data = step['func'](data, *step['args'], **step['kwargs'])
# 처리 후 해시
hash_after = self.compute_hash(data)
# 메타데이터 기록
self.metadata['steps'].append({
'name': step['name'],
'hash_before': hash_before,
'hash_after': hash_after,
'timestamp': datetime.now().isoformat()
})
print(f" 완료: {hash_before[:8]} -> {hash_after[:8]}")
# 메타데이터 저장
metadata_path = self.base_dir / f"{self.pipeline_name}_metadata.json"
metadata_path.parent.mkdir(parents=True, exist_ok=True)
with open(metadata_path, 'w') as f:
json.dump(self.metadata, f, indent=2)
print(f"\n파이프라인 완료. 메타데이터: {metadata_path}")
return data
class DataVersionControl:
"""
DVC 스타일 데이터 버전 관리
(실제 사용에서는 dvc.org 사용 권장)
"""
def __init__(self, storage_path='data/.dvc'):
self.storage_path = Path(storage_path)
self.storage_path.mkdir(parents=True, exist_ok=True)
def add(self, data_path):
"""데이터 파일 추적 시작"""
data_path = Path(data_path)
# 해시 계산
with open(data_path, 'rb') as f:
file_hash = hashlib.md5(f.read()).hexdigest()
# .dvc 메타파일 생성
dvc_file = data_path.with_suffix('.dvc')
dvc_metadata = {
'md5': file_hash,
'size': os.path.getsize(data_path),
'path': str(data_path.name),
'version': datetime.now().isoformat()
}
with open(dvc_file, 'w') as f:
json.dump(dvc_metadata, f, indent=2)
print(f"추적 시작: {data_path}")
print(f" MD5: {file_hash}")
print(f" 메타파일: {dvc_file}")
return file_hash
def create_data_contract(self, schema):
"""
데이터 계약(Data Contract) 정의
- 스키마 정의
- 품질 기준
- SLA 요구사항
"""
contract = {
'version': '1.0',
'schema': schema,
'quality_rules': {
'completeness': {'min_threshold': 0.99},
'accuracy': {'label_error_rate': {'max': 0.05}},
'consistency': {'duplicate_rate': {'max': 0.01}},
},
'sla': {
'update_frequency': 'daily',
'max_staleness_hours': 24,
}
}
return contract
def demonstrate_full_data_pipeline():
"""
데이터 중심 AI 전체 파이프라인 데모
"""
print("=" * 60)
print("데이터 중심 AI 파이프라인 데모")
print("=" * 60)
# 1. 데이터 품질 검사
print("\n1단계: 데이터 품질 검사")
print(" - 레이블 오류율 측정")
print(" - 이상치 탐지")
print(" - 중복 샘플 제거")
print(" - 클래스 불균형 분석")
# 2. 레이블 정제
print("\n2단계: 레이블 정제")
print(" - Cleanlab으로 오류 레이블 탐지")
print(" - 다수결/전문가 검토로 수정")
print(" - 레이블러 간 일치도 개선")
# 3. 데이터 증강
print("\n3단계: 데이터 증강")
print(" - 이미지: Albumentations")
print(" - 텍스트: EDA, 역번역")
print(" - 자동 증강 정책 탐색")
# 4. 합성 데이터
print("\n4단계: 합성 데이터 생성")
print(" - LLM으로 텍스트 합성")
print(" - Diffusion Model로 이미지 합성")
print(" - 품질 필터링 (FID, 분류기 신뢰도)")
# 5. 능동 학습
print("\n5단계: 능동 학습")
print(" - 불확실성 샘플링으로 레이블링 우선순위 결정")
print(" - 코어셋 방법으로 다양성 보장")
# 6. 파이프라인 버전 관리
print("\n6단계: 버전 관리 및 모니터링")
print(" - DVC로 데이터 버전 관리")
print(" - 데이터 계약으로 품질 기준 유지")
print(" - 데이터 플라이휠로 지속 개선")
print("\n결론: 데이터 품질 개선이 모델 개선보다 효과적일 때가 많습니다!")
9. 종합 요약과 실천 가이드
데이터 중심 AI 체크리스트
1. 데이터 수집 단계
- 도메인 전문가의 레이블링 가이드라인 작성
- 레이블러 간 일치도 측정 (Cohen's Kappa > 0.8 목표)
- 클래스 분포 모니터링
2. 데이터 정제 단계
- Cleanlab으로 레이블 오류 탐지 및 수정
- 중복 샘플 제거
- 이상치 검토 (제거 또는 수정)
3. 데이터 증강 단계
- 훈련 데이터에만 증강 적용 (검증/테스트 제외)
- 증강 후 데이터 분포 검증
- 도메인에 적합한 증강 기법 선택
4. 지속적 개선 단계
- 프로덕션 오류 케이스 수집
- 능동 학습으로 레이블링 효율화
- 정기적인 데이터 품질 감사
권장 도구:
- 레이블 품질: Cleanlab (https://github.com/cleanlab/cleanlab)
- 약한 지도 학습: Snorkel (https://snorkel.ai/)
- 레이블링 플랫폼: Label Studio (https://labelstud.io/)
- 이미지 증강: Albumentations (https://albumentations.ai/)
- 능동 학습: modAL (https://modal.readthedocs.io/)
- 데이터 버전 관리: DVC (https://dvc.org/)
데이터 중심 AI는 단순히 도구나 기법의 문제가 아닙니다. 이것은 "더 좋은 모델"이 아닌 "더 좋은 데이터"에 집중하는 사고방식의 전환입니다. 많은 실제 프로젝트에서 이 전환만으로도 극적인 성능 개선을 경험할 수 있습니다.