Skip to content

필사 모드: データ中心のAI完全ガイド:高品質データでAI性能を最大化する

日本語
0%
정확도 0%
💡 왼쪽 원문을 읽으면서 오른쪽에 따라 써보세요. Tab 키로 힌트를 받을 수 있습니다.
원문 렌더가 준비되기 전까지 텍스트 가이드로 표시합니다.

データ中心のAI完全ガイド:高品質データでAI性能を最大化する

2021年、Andrew Ngは「モデルアーキテクチャの改善に注力する代わりに、データ品質の改善に注力したらどうだろうか?」という挑発的な問いをAIコミュニティに投げかけました。これが**データ中心のAI(Data-Centric AI)**運動の出発点となりました。

従来のモデル中心アプローチが「より良いアルゴリズム」を追求するのに対し、データ中心アプローチは「より良いデータ」を追求します。このガイドでは、理論から生産現場の実践まで、実践コードとともにData-Centric AIのすべての側面を網羅します。

1. データ中心のAI vs モデル中心のAI

1.1 パラダイムシフト

**モデル中心のAI**

- データは固定、コードを改善する

- より良いアーキテクチャを探索する

- ハイパーパラメータ調整に注力する

- 古典的ベンチマーク:データセットは固定、モデルだけが変化する

**データ中心のAI**

- モデルは固定、データを改善する

- ラベルエラーを修正する

- 一貫性のためにラベリングガイドラインを改善する

- データ拡張と合成データを追加する

1.2 Andrew Ngの核心的主張

Andrew Ngは次のように述べています:

> "AIシステム = コード(モデル/アルゴリズム) + データ"

多くの実際のAIプロジェクトでは、コードはすでに十分です。ボトルネックはデータ品質です。

**実験結果(Andrew Ng、DeepLearning.AI)**:

ラベルノイズのある製造検査データセットにて:

- ベースライン:76.2%

- より良いモデルのみ:+0.02%改善(76.22%)

- より良いデータのみ:+16.9%改善(93.1%)

この結果は、多くの実世界シナリオにおいて、データ品質の改善がモデルアーキテクチャの改善よりもはるかに効果的であることを示しています。

1.3 データ中心アプローチが最も効果的な場面

データ中心アプローチは特に以下の場合に効果的です:

1. **小規模データセット**:数千件未満のサンプルでは品質がより重要

2. **高いラベルノイズ**:ラベルエラー率が5%を超える場合

3. **ドメイン固有タスク**:強力な事前学習モデルがない専門ドメイン

4. **クラス不均衡**:希少クラスの品質が全体性能を決定する

5. **厳格な精度要件**:医療、金融、安全性が重要なアプリケーション

from sklearn.model_selection import train_test_split

from sklearn.metrics import accuracy_score

def compare_model_vs_data_centric(X, y, model_class, noise_level=0.1):

"""

モデル中心 vs データ中心アプローチの比較

Args:

X: 特徴行列

y: ラベル

model_class: ベースモデルクラス

noise_level: ノイジーラベルの割合

"""

ラベルノイズを追加

noisy_y = y.copy()

noise_idx = np.random.choice(len(y), int(len(y) * noise_level), replace=False)

n_classes = len(np.unique(y))

for idx in noise_idx:

wrong_labels = [l for l in range(n_classes) if l != y[idx]]

noisy_y[idx] = np.random.choice(wrong_labels)

X_train, X_test, y_train_clean, y_test = train_test_split(

X, y, test_size=0.2, random_state=42

)

_, _, y_train_noisy, _ = train_test_split(

X, noisy_y, test_size=0.2, random_state=42

)

--- モデル中心アプローチ ---

ノイジーデータ + ベースモデル

base_model = model_class()

base_model.fit(X_train, y_train_noisy)

base_acc = accuracy_score(y_test, base_model.predict(X_test))

ノイジーデータ + より複雑なモデル

from sklearn.ensemble import GradientBoostingClassifier

complex_model = GradientBoostingClassifier(n_estimators=200)

complex_model.fit(X_train, y_train_noisy)

complex_acc = accuracy_score(y_test, complex_model.predict(X_test))

--- データ中心アプローチ ---

クリーンデータ + ベースモデル

clean_model = model_class()

clean_model.fit(X_train, y_train_clean)

clean_acc = accuracy_score(y_test, clean_model.predict(X_test))

print("=" * 50)

print("モデル中心 vs データ中心の比較")

print("=" * 50)

print(f"ベースモデル + ノイジーデータ: {base_acc:.3f}")

print(f"複雑なモデル + ノイジーデータ: {complex_acc:.3f}")

print(f"ベースモデル + クリーンデータ: {clean_acc:.3f}")

print(f"\nモデル改善の効果: +{(complex_acc - base_acc):.3f}")

print(f"データ改善の効果: +{(clean_acc - base_acc):.3f}")

return {

'base_model_noisy_data': base_acc,

'complex_model_noisy_data': complex_acc,

'base_model_clean_data': clean_acc

}

2. データ品質の測定

2.1 Confident LearningとラベルエラーDetection

Northcutt et al.が提案したConfident Learningは、交差検証した予測確率を使用してラベルエラーを体系的に検出します。

核心的なアイデア:「モデルがクラスAを高い確信度で予測しているのに、ラベルがクラスBを示している場合、そのラベルは誤っている可能性が高い。」

from cleanlab.filter import find_label_issues

from cleanlab.classification import CleanLearning

from sklearn.linear_model import LogisticRegression

from sklearn.model_selection import cross_val_predict

def detect_label_errors_cleanlab(X, y, model=None):

"""

Cleanlabを使用したラベルエラーの検出

Args:

X: 特徴行列

y: ラベル配列

model: 分類器(デフォルト:LogisticRegression)

Returns:

label_issues: ラベル問題のインデックスと情報

"""

if model is None:

model = LogisticRegression(max_iter=1000)

交差検証でクラス確率を予測

pred_probs = cross_val_predict(

model, X, y,

cv=5,

method='predict_proba'

)

ラベル問題を検出

label_issues = find_label_issues(

labels=y,

pred_probs=pred_probs,

return_indices_ranked_by='self_confidence'

)

print(f"総サンプル数: {len(y)}")

print(f"ラベル問題発見: {len(label_issues)}")

print(f"エラー率: {len(label_issues)/len(y):.2%}")

return label_issues

def cleanlab_full_pipeline(X_train, y_train_noisy, X_test, y_test):

"""

完全なCleanlabパイプライン:

1. ラベルエラーを検出

2. エラーを除去または修正

3. クリーニングしたデータで再学習

"""

from cleanlab.classification import CleanLearning

base_model = LogisticRegression(max_iter=1000)

CleanLearningは学習中に自動的にラベルエラーを処理

cl = CleanLearning(base_model, seed=42)

cl.fit(X_train, y_train_noisy)

y_pred = cl.predict(X_test)

accuracy = accuracy_score(y_test, y_pred)

print(f"CleanLearning精度: {accuracy:.3f}")

label_issues_df = cl.get_label_issues()

print(f"\nラベル問題情報:")

print(label_issues_df.head(10))

return cl, label_issues_df

def confident_learning_manual(pred_probs, labels):

"""

Confident Learningの手動実装

- クラス毎の閾値を計算

- Confident Joint行列を構築

"""

n_classes = pred_probs.shape[1]

n_samples = len(labels)

クラス毎の閾値:そのクラスのサンプルの平均予測確率

thresholds = np.zeros(n_classes)

for c in range(n_classes):

class_mask = labels == c

if class_mask.sum() > 0:

thresholds[c] = pred_probs[class_mask, c].mean()

Confident Joint行列 C[s][y]

s: 推定された真のクラス, y: 与えられたラベル

C = np.zeros((n_classes, n_classes), dtype=int)

for i in range(n_samples):

y_given = labels[i]

over_threshold = pred_probs[i] >= thresholds

if over_threshold.sum() == 0:

y_hat = pred_probs[i].argmax()

else:

y_hat = (pred_probs[i] * over_threshold).argmax()

C[y_hat, y_given] += 1

off_diagonal = C.copy()

np.fill_diagonal(off_diagonal, 0)

print("Confident Joint行列(行:推定真クラス、列:与えられたラベル):")

print(C)

print(f"\n推定ミスラベルサンプル数: {off_diagonal.sum()}")

return C

2.2 データ外れ値の検出

from sklearn.ensemble import IsolationForest

from sklearn.neighbors import LocalOutlierFactor

class DataQualityChecker:

"""総合的なデータ品質検査ツールキット"""

def __init__(self):

self.quality_report = {}

def check_class_distribution(self, labels):

"""クラス不均衡の確認"""

from collections import Counter

counts = Counter(labels)

total = len(labels)

df = pd.DataFrame([

{'class': c, 'count': n, 'percentage': 100 * n / total}

for c, n in sorted(counts.items())

])

imbalance_ratio = max(counts.values()) / min(counts.values())

print("クラス分布:")

print(df.to_string(index=False))

print(f"\n不均衡比率: {imbalance_ratio:.2f}x")

if imbalance_ratio > 10:

print("警告: 深刻なクラス不均衡!")

elif imbalance_ratio > 3:

print("注意: クラス不均衡を検出")

self.quality_report['class_imbalance_ratio'] = imbalance_ratio

return df

def detect_outliers(self, X, method='isolation_forest', contamination=0.1):

"""

外れ値の検出

Args:

method: 'isolation_forest' または 'lof'

contamination: 外れ値の期待される割合

"""

if method == 'isolation_forest':

detector = IsolationForest(

contamination=contamination,

random_state=42

)

elif method == 'lof':

detector = LocalOutlierFactor(

contamination=contamination,

novelty=True

)

predictions = detector.fit_predict(X)

outlier_mask = predictions == -1

outlier_indices = np.where(outlier_mask)[0]

print(f"外れ値検出: {outlier_mask.sum()} / {len(X)} ({outlier_mask.mean():.2%})")

self.quality_report['n_outliers'] = outlier_mask.sum()

return outlier_indices, outlier_mask

def check_duplicates(self, X, y=None, threshold=0.99):

"""

重複サンプルの検出

Args:

threshold: 類似度閾値(1.0 = 完全一致)

"""

from sklearn.metrics.pairwise import cosine_similarity

if len(X) > 10000:

sample_idx = np.random.choice(len(X), 10000, replace=False)

X_sample = X[sample_idx]

else:

X_sample = X

sample_idx = np.arange(len(X))

sim_matrix = cosine_similarity(X_sample)

np.fill_diagonal(sim_matrix, 0)

duplicate_pairs = np.argwhere(sim_matrix >= threshold)

duplicate_pairs = duplicate_pairs[duplicate_pairs[:, 0] < duplicate_pairs[:, 1]]

print(f"重複ペア発見: {len(duplicate_pairs)}")

if y is not None and len(duplicate_pairs) > 0:

label_conflicts = 0

for i, j in duplicate_pairs:

if y[sample_idx[i]] != y[sample_idx[j]]:

label_conflicts += 1

print(f"ラベルが異なる重複ペア: {label_conflicts}")

return duplicate_pairs

def compute_data_quality_score(self, X, y):

"""総合的なデータ品質スコアを計算"""

scores = {}

if hasattr(X, 'isnull'):

missing_rate = X.isnull().mean().mean()

else:

missing_rate = np.isnan(X).mean()

scores['completeness'] = 1 - missing_rate

from collections import Counter

counts = Counter(y)

n_classes = len(counts)

ideal_count = len(y) / n_classes

balance_score = sum(

min(c, ideal_count) / ideal_count

for c in counts.values()

) / n_classes

scores['balance'] = balance_score

overall_score = np.mean(list(scores.values()))

scores['overall'] = overall_score

print("データ品質スコア:")

for metric, score in scores.items():

print(f" {metric}: {score:.3f}")

return scores

3. ラベリング戦略

3.1 高品質ラベリングガイドライン

def compute_inter_rater_agreement(annotations):

"""

評価者間一致度を計算

Args:

annotations: 形状 (n_samples, n_raters) の配列

Returns:

cohen_kappa: Cohen's Kappaスコア

majority_labels: 多数決ラベル

"""

from sklearn.metrics import cohen_kappa_score

n_samples, n_raters = annotations.shape

kappa_scores = []

for i in range(n_raters):

for j in range(i+1, n_raters):

kappa = cohen_kappa_score(

annotations[:, i],

annotations[:, j]

)

kappa_scores.append((i, j, kappa))

print(f"評価者 {i} vs 評価者 {j}: kappa = {kappa:.3f}")

mean_kappa = np.mean([k for _, _, k in kappa_scores])

print(f"\n平均Cohen's Kappa: {mean_kappa:.3f}")

if mean_kappa < 0.2:

interpretation = "わずかな一致"

elif mean_kappa < 0.4:

interpretation = "まずまずの一致"

elif mean_kappa < 0.6:

interpretation = "中程度の一致"

elif mean_kappa < 0.8:

interpretation = "実質的な一致"

else:

interpretation = "ほぼ完全な一致"

print(f"解釈: {interpretation}")

from scipy import stats

majority_labels = stats.mode(annotations, axis=1)[0].flatten()

print(f"\n多数決ラベルを生成しました")

return mean_kappa, majority_labels

3.2 Snorkelによる弱教師あり学習

def snorkel_programmatic_labeling_demo():

"""

Snorkelによるプログラマティックラベリングのデモ

pip install snorkel

"""

from snorkel.labeling import labeling_function, PandasLFApplier

from snorkel.labeling.model import LabelModel

POSITIVE = 1

NEGATIVE = 0

ABSTAIN = -1

@labeling_function()

def lf_positive_keywords(x):

"""ポジティブなキーワードに基づくラベル付け"""

positive_words = ['good', 'great', 'excellent', 'amazing', 'love', 'best']

if any(word in x.text.lower() for word in positive_words):

return POSITIVE

return ABSTAIN

@labeling_function()

def lf_negative_keywords(x):

"""ネガティブなキーワードに基づくラベル付け"""

negative_words = ['bad', 'terrible', 'awful', 'hate', 'worst', 'horrible']

if any(word in x.text.lower() for word in negative_words):

return NEGATIVE

return ABSTAIN

@labeling_function()

def lf_rating_high(x):

"""高い評価に基づくラベル付け"""

if hasattr(x, 'rating') and x.rating >= 4:

return POSITIVE

return ABSTAIN

@labeling_function()

def lf_rating_low(x):

"""低い評価に基づくラベル付け"""

if hasattr(x, 'rating') and x.rating <= 2:

return NEGATIVE

return ABSTAIN

@labeling_function()

def lf_negation_check(x):

"""否定の検出"""

text = x.text.lower()

if re.search(r"not (good|great|excellent)", text):

return NEGATIVE

if re.search(r"not (bad|terrible)", text):

return POSITIVE

return ABSTAIN

lfs = [

lf_positive_keywords,

lf_negative_keywords,

lf_rating_high,

lf_rating_low,

lf_negation_check,

]

print("Snorkelプログラマティックラベリングパイプライン:")

print("1. ドメイン専門家がラベリング関数(LF)を作成")

print("2. LFをラベルなしデータに適用")

print("3. Label Modelで複数のLFを統合(ノイズ考慮)")

print("4. ソフトラベルで下流モデルを学習")

print(f"\n定義されたラベリング関数: {len(lfs)}")

return lfs

4. 能動学習(Active Learning)

能動学習は、大規模なラベルなしプールから最も情報価値の高いサンプルを選択することでラベリングコストを最小化します。

class ActiveLearner:

"""

複数のサンプリング戦略を持つ能動学習

"""

def __init__(self, model, strategy='uncertainty', n_initial=100):

self.model = model

self.strategy = strategy

self.n_initial = n_initial

def uncertainty_sampling(self, X_unlabeled, n_samples):

"""

不確実性サンプリング:モデルが最も確信を持てないサンプルを選択

"""

probs = self._get_probs(X_unlabeled)

if self.strategy == 'least_confidence':

uncertainty = 1 - probs.max(axis=1)

elif self.strategy == 'margin':

sorted_probs = np.sort(probs, axis=1)[:, ::-1]

uncertainty = 1 - (sorted_probs[:, 0] - sorted_probs[:, 1])

elif self.strategy == 'entropy':

uncertainty = -np.sum(probs * np.log(probs + 1e-10), axis=1)

else:

uncertainty = 1 - probs.max(axis=1)

selected_indices = np.argsort(uncertainty)[-n_samples:]

return selected_indices, uncertainty

def diversity_sampling(self, X_unlabeled, X_labeled, n_samples):

"""

多様性ベースのサンプリング(CoreSet)

既にラベル付けされたデータと最も異なるサンプルを選択

"""

from sklearn.metrics.pairwise import euclidean_distances

selected = []

remaining = list(range(len(X_unlabeled)))

current_labeled = X_labeled.copy()

for _ in range(n_samples):

dists = euclidean_distances(

X_unlabeled[remaining],

current_labeled

).min(axis=1)

best_idx = remaining[np.argmax(dists)]

selected.append(best_idx)

remaining.remove(best_idx)

current_labeled = np.vstack([current_labeled, X_unlabeled[best_idx]])

return np.array(selected)

def batch_mode_active_learning(self, X_pool, y_oracle, X_test, y_test,

n_iterations=10, n_per_iter=50):

"""

バッチモード能動学習ループ

Args:

X_pool: ラベルなしデータプール

y_oracle: 真のラベル(オラクル)

n_per_iter: 反復毎にラベル付けするサンプル数

"""

initial_indices = np.random.choice(

len(X_pool), self.n_initial, replace=False

)

labeled_indices = list(initial_indices)

unlabeled_indices = [

i for i in range(len(X_pool)) if i not in labeled_indices

]

accuracies = []

n_labeled_list = []

for iteration in range(n_iterations):

X_labeled = X_pool[labeled_indices]

y_labeled = y_oracle[labeled_indices]

self.model.fit(X_labeled, y_labeled)

acc = accuracy_score(y_test, self.model.predict(X_test))

accuracies.append(acc)

n_labeled_list.append(len(labeled_indices))

print(f"反復 {iteration+1}: n_labeled={len(labeled_indices)}, accuracy={acc:.3f}")

if len(unlabeled_indices) == 0:

break

X_unlabeled = X_pool[unlabeled_indices]

selected, _ = self.uncertainty_sampling(X_unlabeled, n_per_iter)

actual_selected = [unlabeled_indices[i] for i in selected]

labeled_indices.extend(actual_selected)

unlabeled_indices = [

i for i in unlabeled_indices if i not in actual_selected

]

return accuracies, n_labeled_list

def _get_probs(self, X):

if hasattr(self.model, 'predict_proba'):

return self.model.predict_proba(X)

else:

logits = self.model.predict(X)

from scipy.special import softmax

return softmax(logits, axis=1)

5. データ拡張の深掘り

5.1 Albumentationsによる画像拡張

from albumentations.pytorch import ToTensorV2

def get_train_transforms(image_size=224):

"""

学習用の強力な拡張パイプライン(Albumentations)

"""

return A.Compose([

幾何学的変換

A.RandomResizedCrop(

height=image_size,

width=image_size,

scale=(0.7, 1.0),

ratio=(0.75, 1.33)

),

A.HorizontalFlip(p=0.5),

A.ShiftScaleRotate(

shift_limit=0.1,

scale_limit=0.2,

rotate_limit=30,

p=0.5

),

カラー変換

A.ColorJitter(

brightness=0.3,

contrast=0.3,

saturation=0.3,

hue=0.1,

p=0.8

),

A.ToGray(p=0.1),

A.RandomGamma(gamma_limit=(80, 120), p=0.3),

ノイズとぼかし

A.GaussNoise(var_limit=(10, 50), p=0.3),

A.OneOf([

A.MotionBlur(blur_limit=7),

A.GaussianBlur(blur_limit=7),

A.MedianBlur(blur_limit=7),

], p=0.3),

カットアウト / ランダム消去

A.CoarseDropout(

max_holes=8,

max_height=32,

max_width=32,

fill_value=0,

p=0.3

),

グリッド歪み

A.OneOf([

A.GridDistortion(p=1),

A.ElasticTransform(p=1),

A.OpticalDistortion(p=1),

], p=0.2),

正規化とテンソル変換

A.Normalize(

mean=[0.485, 0.456, 0.406],

std=[0.229, 0.224, 0.225]

),

ToTensorV2(),

])

def mixup_augmentation(images, labels, alpha=0.4):

"""

MixUp: 2つの画像とそのラベルを混合する

Zhang et al., "mixup: Beyond Empirical Risk Minimization" (2018)

"""

batch_size = images.shape[0]

lam = np.random.beta(alpha, alpha)

perm = torch.randperm(batch_size)

mixed_images = lam * images + (1 - lam) * images[perm]

labels_a = labels

labels_b = labels[perm]

return mixed_images, labels_a, labels_b, lam

def cutmix_augmentation(images, labels, alpha=1.0):

"""

CutMix: 一方の画像から他方へパッチを貼り付ける

Yun et al., "CutMix: Training Strategy that Makes Use of

Sample Mixing" (2019)

"""

batch_size, c, h, w = images.shape

lam = np.random.beta(alpha, alpha)

perm = torch.randperm(batch_size)

cut_ratio = np.sqrt(1 - lam)

cut_h = int(h * cut_ratio)

cut_w = int(w * cut_ratio)

cx = np.random.randint(w)

cy = np.random.randint(h)

bbx1 = np.clip(cx - cut_w // 2, 0, w)

bby1 = np.clip(cy - cut_h // 2, 0, h)

bbx2 = np.clip(cx + cut_w // 2, 0, w)

bby2 = np.clip(cy + cut_h // 2, 0, h)

mixed_images = images.clone()

mixed_images[:, :, bby1:bby2, bbx1:bbx2] = images[perm, :, bby1:bby2, bbx1:bbx2]

lam = 1 - ((bbx2 - bbx1) * (bby2 - bby1) / (w * h))

labels_a = labels

labels_b = labels[perm]

return mixed_images, labels_a, labels_b, lam

5.2 テキスト拡張

class TextAugmenter:

"""

テキストデータ拡張技術

"""

def __init__(self):

pass

def eda_synonym_replacement(self, text, n=1):

"""

EDA: 同義語置換

Wei and Zou, "EDA: Easy Data Augmentation Techniques

for Boosting Performance on Text Classification Tasks" (2019)

"""

from nltk.corpus import wordnet

words = text.split()

new_words = words.copy()

stop_words = set(['a', 'an', 'the', 'is', 'are', 'was', 'were',

'i', 'me', 'my', 'we', 'our', 'you', 'your'])

replaceable = [

(i, word) for i, word in enumerate(words)

if word.lower() not in stop_words

]

np.random.shuffle(replaceable)

replaced = 0

for idx, word in replaceable:

if replaced >= n:

break

synsets = wordnet.synsets(word)

if synsets:

synonyms = [

lemma.name() for synset in synsets

for lemma in synset.lemmas()

if lemma.name() != word

]

if synonyms:

new_words[idx] = np.random.choice(synonyms).replace('_', ' ')

replaced += 1

return ' '.join(new_words)

def eda_random_swap(self, text, n=1):

"""EDA: ランダムスワップ"""

words = text.split()

if len(words) < 2:

return text

new_words = words.copy()

for _ in range(n):

i, j = np.random.choice(len(new_words), 2, replace=False)

new_words[i], new_words[j] = new_words[j], new_words[i]

return ' '.join(new_words)

def eda_random_deletion(self, text, p=0.1):

"""EDA: ランダム削除"""

words = text.split()

if len(words) == 1:

return text

new_words = [word for word in words if np.random.random() > p]

return ' '.join(new_words) if new_words else np.random.choice(words)

def back_translation(self, text, src_lang='en', pivot_lang='fr'):

"""

逆翻訳: en -> fr -> en

意味を保ちながら表現を多様化する

"""

try:

from transformers import pipeline

translator_fwd = pipeline(

f"translation_{src_lang}_to_{pivot_lang}",

model=f"Helsinki-NLP/opus-mt-{src_lang}-{pivot_lang}"

)

translator_bwd = pipeline(

f"translation_{pivot_lang}_to_{src_lang}",

model=f"Helsinki-NLP/opus-mt-{pivot_lang}-{src_lang}"

)

pivot_text = translator_fwd(text)[0]['translation_text']

back_translated = translator_bwd(pivot_text)[0]['translation_text']

return back_translated

except Exception as e:

print(f"翻訳エラー: {e}")

return text

def augment_dataset(self, texts, labels, n_aug=4):

"""データセット全体を拡張"""

augmented_texts = []

augmented_labels = []

for text, label in zip(texts, labels):

augmented_texts.append(text)

augmented_labels.append(label)

for _ in range(n_aug):

aug_type = np.random.choice(

['synonym', 'swap', 'deletion']

)

if aug_type == 'synonym':

aug_text = self.eda_synonym_replacement(text)

elif aug_type == 'swap':

aug_text = self.eda_random_swap(text)

else:

aug_text = self.eda_random_deletion(text)

augmented_texts.append(aug_text)

augmented_labels.append(label)

print(f"元のサンプル数: {len(texts)}")

print(f"拡張後のサンプル数: {len(augmented_texts)}")

return augmented_texts, augmented_labels

5.3 自動拡張(RandAugment、SpecAugment)

def get_randaugment_transforms(n=2, m=9, image_size=224):

"""

RandAugment: ランダム化された拡張ポリシー

Cubuk et al., "RandAugment: Practical Automated Data Augmentation" (2019)

Args:

n: 適用する拡張操作の数

m: 拡張の強度(0-30)

"""

transform = transforms.Compose([

transforms.RandomResizedCrop(image_size),

transforms.RandAugment(num_ops=n, magnitude=m),

transforms.ToTensor(),

transforms.Normalize(

mean=[0.485, 0.456, 0.406],

std=[0.229, 0.224, 0.225]

)

])

return transform

def specaugment_for_audio(spectrogram, freq_mask_param=27, time_mask_param=70):

"""

SpecAugment: 音声スペクトログラム拡張

Park et al., "SpecAugment: A Simple Data Augmentation Method

for Automatic Speech Recognition" (2019)

Args:

spectrogram: 入力スペクトログラム(周波数, 時間)

freq_mask_param: 最大周波数マスクサイズ

time_mask_param: 最大時間マスクサイズ

"""

freq_mask = T.FrequencyMasking(freq_mask_param=freq_mask_param)

time_mask = T.TimeMasking(time_mask_param=time_mask_param)

augmented = freq_mask(spectrogram)

augmented = time_mask(augmented)

return augmented

6. 合成データ生成

6.1 LLMによる合成テキスト

class SyntheticTextGenerator:

"""LLMを使用して合成学習データを生成"""

def __init__(self, llm_client, model_name='gpt-4'):

self.llm = llm_client

self.model_name = model_name

def generate_classification_data(self, class_name, n_samples=100,

domain='general', style='diverse'):

"""

分類クラス用の合成データを生成

Args:

class_name: ターゲットクラス名

n_samples: 生成するサンプル数

domain: ドメインコンテキスト(医療、法律など)

style: 文章スタイル(フォーマル、カジュアル、多様)

"""

prompt = f"""Generate {n_samples} diverse text examples for the class '{class_name}'.

Domain: {domain}

Style: {style}

Requirements:

- Each example should be 1-3 sentences

- Vary the vocabulary, sentence structure, and perspective

- Include both simple and complex cases

- Format as a JSON list: ["example1", "example2", ...]

Generate realistic examples that would appear in real-world {domain} data."""

print(f"'{class_name}'の合成データ生成プロンプト:")

print(prompt[:300] + "...")

print(f"\n{n_samples}サンプルの生成を予定")

def generate_edge_cases(self, class_examples, n_edge_cases=20):

"""困難なエッジケースを生成"""

prompt = f"""Based on these training examples:

{chr(10).join(class_examples[:5])}

Generate {n_edge_cases} challenging edge cases that:

1. Are ambiguous between different categories

2. Contain misleading keywords

3. Have unusual sentence structures

4. Test the model's true understanding

Format as JSON list."""

print("エッジケース生成プロンプトを準備しました")

def augment_with_paraphrase(self, texts, n_paraphrases=3):

"""LLMを使用してパラフレーズを生成"""

augmented = []

for text in texts:

prompt = f"""Paraphrase the following text {n_paraphrases} times.

Keep the same meaning but use different words and sentence structures.

Original: "{text}"

Format as JSON list of {n_paraphrases} paraphrases."""

augmented.append({

'original': text,

'paraphrases': []

})

return augmented

class SyntheticImageGenerator:

"""拡散モデルで合成画像を生成"""

def __init__(self, model_name='stabilityai/stable-diffusion-2-1'):

self.model_name = model_name

def setup_pipeline(self):

"""

Stable Diffusionパイプラインを初期化

pip install diffusers accelerate

"""

try:

from diffusers import StableDiffusionPipeline

self.pipe = StableDiffusionPipeline.from_pretrained(

self.model_name,

torch_dtype=torch.float16

)

if torch.cuda.is_available():

self.pipe = self.pipe.to('cuda')

print(f"パイプライン初期化: {self.model_name}")

except Exception as e:

print(f"パイプライン初期化エラー: {e}")

def generate_class_images(self, class_name, n_images=50,

style_prompt="high quality, photorealistic"):

"""

クラス用の合成画像を生成

Args:

class_name: ターゲットクラス名

n_images: 生成する画像数

style_prompt: スタイルガイダンス

"""

prompts = [

f"A photo of {class_name}, {style_prompt}",

f"{class_name} in natural environment, {style_prompt}",

f"Close-up of {class_name}, detailed, {style_prompt}",

f"{class_name} from different angle, {style_prompt}",

]

print(f"'{class_name}'の合成画像生成計画:")

print(f"生成する画像数: {n_images}")

print("サンプルプロンプト:")

for p in prompts[:2]:

print(f" - {p}")

def evaluate_synthetic_quality(self, real_images, synthetic_images):

"""FIDスコアを使用して合成画像の品質を評価"""

try:

from torchmetrics.image.fid import FrechetInceptionDistance

fid = FrechetInceptionDistance(feature=64)

fid.update(real_images, real=True)

fid.update(synthetic_images, real=False)

fid_score = fid.compute()

print(f"FIDスコア: {fid_score:.2f}")

print("(低いほど良い;0が完璧)")

return fid_score

except Exception as e:

print(f"FID計算エラー: {e}")

7. データフライホイール

7.1 データフライホイールの概念

データフライホイールとは、製品、データ、モデルの好循環です:

1. **より良いモデル** → より良い製品

2. **より良い製品** → より多くのユーザー

3. **より多くのユーザー** → より多くのデータ

4. **より多くのデータ** → より良いモデル

class DataFlywheelPipeline:

"""データフライホイールの実装パイプライン"""

def __init__(self, model, feedback_store):

self.model = model

self.feedback_store = feedback_store

self.version = 0

def collect_production_feedback(self, predictions, user_feedback):

"""

本番環境からユーザーフィードバックを収集

Args:

predictions: モデルの予測

user_feedback: ユーザーの訂正/確認

"""

valuable_samples = []

for pred, feedback in zip(predictions, user_feedback):

if feedback['corrected']:

sample = {

'input': feedback['input'],

'model_prediction': pred,

'true_label': feedback['correction'],

'confidence': pred['confidence'],

'timestamp': feedback['timestamp'],

'value': 'high'

}

valuable_samples.append(sample)

elif feedback['confirmed'] and pred['confidence'] < 0.7:

sample = {

'input': feedback['input'],

'true_label': pred['label'],

'confidence': pred['confidence'],

'value': 'medium'

}

valuable_samples.append(sample)

print(f"価値あるサンプルを収集: {len(valuable_samples)}")

return valuable_samples

def prioritize_labeling_queue(self, unlabeled_pool, budget):

"""

ラベリングキューの優先順位付け

優先度基準:

1. モデルの不確実性(高いほど優先度高)

2. クラスの希少性(希少なクラスほど優先度高)

3. データの多様性(既存データと異なるほど優先度高)

"""

priorities = []

for sample in unlabeled_pool:

score = 0

uncertainty = 1 - max(sample['predicted_probs'])

score += uncertainty * 0.5

predicted_class = max(sample['predicted_probs'],

key=sample['predicted_probs'].get)

rarity = 1 / (sample['class_counts'].get(predicted_class, 1) + 1)

score += rarity * 0.3

diversity = np.std(list(sample['predicted_probs'].values()))

score += diversity * 0.2

priorities.append((sample, score))

priorities.sort(key=lambda x: x[1], reverse=True)

selected = [s for s, _ in priorities[:budget]]

return selected

8. データパイプラインのベストプラクティス

8.1 再現可能なデータ処理

from pathlib import Path

from datetime import datetime

class ReproducibleDataPipeline:

"""

再現可能なデータパイプライン

- すべての処理ステップを追跡

- データハッシュで整合性を検証

- バージョン管理をサポート

"""

def __init__(self, pipeline_name, base_dir='data/processed'):

self.pipeline_name = pipeline_name

self.base_dir = Path(base_dir)

self.steps = []

self.metadata = {

'pipeline': pipeline_name,

'created_at': datetime.now().isoformat(),

'steps': []

}

def add_step(self, step_name, func, *args, **kwargs):

"""処理ステップを追加"""

self.steps.append({

'name': step_name,

'func': func,

'args': args,

'kwargs': kwargs

})

def compute_hash(self, data):

"""データハッシュを計算"""

if isinstance(data, np.ndarray):

return hashlib.md5(data.tobytes()).hexdigest()

elif isinstance(data, (list, dict)):

return hashlib.md5(

json.dumps(data, sort_keys=True, default=str).encode()

).hexdigest()

else:

return hashlib.md5(str(data).encode()).hexdigest()

def run(self, input_data):

"""パイプラインを実行"""

data = input_data

for step in self.steps:

print(f"実行中: {step['name']}")

hash_before = self.compute_hash(data)

data = step['func'](data, *step['args'], **step['kwargs'])

hash_after = self.compute_hash(data)

self.metadata['steps'].append({

'name': step['name'],

'hash_before': hash_before,

'hash_after': hash_after,

'timestamp': datetime.now().isoformat()

})

print(f" 完了: {hash_before[:8]} -> {hash_after[:8]}")

metadata_path = self.base_dir / f"{self.pipeline_name}_metadata.json"

metadata_path.parent.mkdir(parents=True, exist_ok=True)

with open(metadata_path, 'w') as f:

json.dump(self.metadata, f, indent=2)

print(f"\nパイプライン完了。メタデータ: {metadata_path}")

return data

9. まとめと実践ガイド

データ中心のAIチェックリスト

**1. データ収集**

- ドメイン専門家とラベリングガイドラインを作成する

- 評価者間一致度を測定する(目標:Cohen's Kappa > 0.8)

- 収集中にクラス分布を監視する

**2. データクリーニング**

- Cleanlabでラベルエラーを検出・修正する

- 重複サンプルを除去する

- 外れ値をレビューする(除去または修正)

**3. データ拡張**

- 拡張は学習データのみに適用する(検証/テストデータには不適用)

- 拡張後のデータ分布を検証する

- ドメインに適した拡張技術を選択する

**4. 継続的改善**

- 本番環境のエラーケースを収集する

- 能動学習を使用してラベリングを効率化する

- 定期的なデータ品質監査を実施する

**推奨ツール:**

- ラベル品質:Cleanlab (https://github.com/cleanlab/cleanlab)

- 弱教師あり学習:Snorkel (https://snorkel.ai/)

- ラベリングプラットフォーム:Label Studio (https://labelstud.io/)

- 画像拡張:Albumentations (https://albumentations.ai/)

- 能動学習:modAL (https://modal.readthedocs.io/)

- データバージョン管理:DVC (https://dvc.org/)

データ中心のAIは単なるツールや技術の問題ではありません。それは「より良いモデル」を追求することから「より良いデータ」を追求することへの**マインドセットの転換**です。多くの実世界プロジェクトでは、この転換だけで劇的な性能改善が達成できます。

Andrew NgのData-Centric AI運動から得られる最も重要な洞察:あなたのモデルの品質は、そのモデルが学習するデータの品質を超えることはできません。データ品質、ラベリングの一貫性、体系的なデータ改善への投資は、AIプロジェクトにおいて最も高いリターンをもたらす活動であることが多いのです。

현재 단락 (1/896)

2021年、Andrew Ngは「モデルアーキテクチャの改善に注力する代わりに、データ品質の改善に注力したらどうだろうか?」という挑発的な問いをAIコミュニティに投げかけました。これが**データ中心の...

작성 글자: 0원문 글자: 24,575작성 단락: 0/896