Skip to content
Published on

データ中心のAI完全ガイド:高品質データでAI性能を最大化する

Authors

データ中心のAI完全ガイド:高品質データでAI性能を最大化する

2021年、Andrew Ngは「モデルアーキテクチャの改善に注力する代わりに、データ品質の改善に注力したらどうだろうか?」という挑発的な問いをAIコミュニティに投げかけました。これが**データ中心のAI(Data-Centric AI)**運動の出発点となりました。

従来のモデル中心アプローチが「より良いアルゴリズム」を追求するのに対し、データ中心アプローチは「より良いデータ」を追求します。このガイドでは、理論から生産現場の実践まで、実践コードとともにData-Centric AIのすべての側面を網羅します。

1. データ中心のAI vs モデル中心のAI

1.1 パラダイムシフト

モデル中心のAI

  • データは固定、コードを改善する
  • より良いアーキテクチャを探索する
  • ハイパーパラメータ調整に注力する
  • 古典的ベンチマーク:データセットは固定、モデルだけが変化する

データ中心のAI

  • モデルは固定、データを改善する
  • ラベルエラーを修正する
  • 一貫性のためにラベリングガイドラインを改善する
  • データ拡張と合成データを追加する

1.2 Andrew Ngの核心的主張

Andrew Ngは次のように述べています:

"AIシステム = コード(モデル/アルゴリズム) + データ"

多くの実際のAIプロジェクトでは、コードはすでに十分です。ボトルネックはデータ品質です。

実験結果(Andrew Ng、DeepLearning.AI)

ラベルノイズのある製造検査データセットにて:

  • ベースライン:76.2%
  • より良いモデルのみ:+0.02%改善(76.22%)
  • より良いデータのみ:+16.9%改善(93.1%)

この結果は、多くの実世界シナリオにおいて、データ品質の改善がモデルアーキテクチャの改善よりもはるかに効果的であることを示しています。

1.3 データ中心アプローチが最も効果的な場面

データ中心アプローチは特に以下の場合に効果的です:

  1. 小規模データセット:数千件未満のサンプルでは品質がより重要
  2. 高いラベルノイズ:ラベルエラー率が5%を超える場合
  3. ドメイン固有タスク:強力な事前学習モデルがない専門ドメイン
  4. クラス不均衡:希少クラスの品質が全体性能を決定する
  5. 厳格な精度要件:医療、金融、安全性が重要なアプリケーション
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

def compare_model_vs_data_centric(X, y, model_class, noise_level=0.1):
    """
    モデル中心 vs データ中心アプローチの比較

    Args:
        X: 特徴行列
        y: ラベル
        model_class: ベースモデルクラス
        noise_level: ノイジーラベルの割合
    """
    # ラベルノイズを追加
    noisy_y = y.copy()
    noise_idx = np.random.choice(len(y), int(len(y) * noise_level), replace=False)
    n_classes = len(np.unique(y))
    for idx in noise_idx:
        wrong_labels = [l for l in range(n_classes) if l != y[idx]]
        noisy_y[idx] = np.random.choice(wrong_labels)

    X_train, X_test, y_train_clean, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    _, _, y_train_noisy, _ = train_test_split(
        X, noisy_y, test_size=0.2, random_state=42
    )

    # --- モデル中心アプローチ ---
    # ノイジーデータ + ベースモデル
    base_model = model_class()
    base_model.fit(X_train, y_train_noisy)
    base_acc = accuracy_score(y_test, base_model.predict(X_test))

    # ノイジーデータ + より複雑なモデル
    from sklearn.ensemble import GradientBoostingClassifier
    complex_model = GradientBoostingClassifier(n_estimators=200)
    complex_model.fit(X_train, y_train_noisy)
    complex_acc = accuracy_score(y_test, complex_model.predict(X_test))

    # --- データ中心アプローチ ---
    # クリーンデータ + ベースモデル
    clean_model = model_class()
    clean_model.fit(X_train, y_train_clean)
    clean_acc = accuracy_score(y_test, clean_model.predict(X_test))

    print("=" * 50)
    print("モデル中心 vs データ中心の比較")
    print("=" * 50)
    print(f"ベースモデル + ノイジーデータ: {base_acc:.3f}")
    print(f"複雑なモデル + ノイジーデータ: {complex_acc:.3f}")
    print(f"ベースモデル + クリーンデータ: {clean_acc:.3f}")
    print(f"\nモデル改善の効果: +{(complex_acc - base_acc):.3f}")
    print(f"データ改善の効果: +{(clean_acc - base_acc):.3f}")

    return {
        'base_model_noisy_data': base_acc,
        'complex_model_noisy_data': complex_acc,
        'base_model_clean_data': clean_acc
    }

2. データ品質の測定

2.1 Confident LearningとラベルエラーDetection

Northcutt et al.が提案したConfident Learningは、交差検証した予測確率を使用してラベルエラーを体系的に検出します。

核心的なアイデア:「モデルがクラスAを高い確信度で予測しているのに、ラベルがクラスBを示している場合、そのラベルは誤っている可能性が高い。」

import cleanlab
from cleanlab.filter import find_label_issues
from cleanlab.classification import CleanLearning
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_predict

def detect_label_errors_cleanlab(X, y, model=None):
    """
    Cleanlabを使用したラベルエラーの検出

    Args:
        X: 特徴行列
        y: ラベル配列
        model: 分類器(デフォルト:LogisticRegression)

    Returns:
        label_issues: ラベル問題のインデックスと情報
    """
    if model is None:
        model = LogisticRegression(max_iter=1000)

    # 交差検証でクラス確率を予測
    pred_probs = cross_val_predict(
        model, X, y,
        cv=5,
        method='predict_proba'
    )

    # ラベル問題を検出
    label_issues = find_label_issues(
        labels=y,
        pred_probs=pred_probs,
        return_indices_ranked_by='self_confidence'
    )

    print(f"総サンプル数: {len(y)}")
    print(f"ラベル問題発見: {len(label_issues)}")
    print(f"エラー率: {len(label_issues)/len(y):.2%}")

    return label_issues


def cleanlab_full_pipeline(X_train, y_train_noisy, X_test, y_test):
    """
    完全なCleanlabパイプライン:
    1. ラベルエラーを検出
    2. エラーを除去または修正
    3. クリーニングしたデータで再学習
    """
    from cleanlab.classification import CleanLearning

    base_model = LogisticRegression(max_iter=1000)

    # CleanLearningは学習中に自動的にラベルエラーを処理
    cl = CleanLearning(base_model, seed=42)
    cl.fit(X_train, y_train_noisy)

    y_pred = cl.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    print(f"CleanLearning精度: {accuracy:.3f}")

    label_issues_df = cl.get_label_issues()
    print(f"\nラベル問題情報:")
    print(label_issues_df.head(10))

    return cl, label_issues_df


def confident_learning_manual(pred_probs, labels):
    """
    Confident Learningの手動実装
    - クラス毎の閾値を計算
    - Confident Joint行列を構築
    """
    n_classes = pred_probs.shape[1]
    n_samples = len(labels)

    # クラス毎の閾値:そのクラスのサンプルの平均予測確率
    thresholds = np.zeros(n_classes)
    for c in range(n_classes):
        class_mask = labels == c
        if class_mask.sum() > 0:
            thresholds[c] = pred_probs[class_mask, c].mean()

    # Confident Joint行列 C[s][y]
    # s: 推定された真のクラス, y: 与えられたラベル
    C = np.zeros((n_classes, n_classes), dtype=int)

    for i in range(n_samples):
        y_given = labels[i]
        over_threshold = pred_probs[i] >= thresholds

        if over_threshold.sum() == 0:
            y_hat = pred_probs[i].argmax()
        else:
            y_hat = (pred_probs[i] * over_threshold).argmax()

        C[y_hat, y_given] += 1

    off_diagonal = C.copy()
    np.fill_diagonal(off_diagonal, 0)

    print("Confident Joint行列(行:推定真クラス、列:与えられたラベル):")
    print(C)
    print(f"\n推定ミスラベルサンプル数: {off_diagonal.sum()}")

    return C

2.2 データ外れ値の検出

from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
import numpy as np

class DataQualityChecker:
    """総合的なデータ品質検査ツールキット"""

    def __init__(self):
        self.quality_report = {}

    def check_class_distribution(self, labels):
        """クラス不均衡の確認"""
        from collections import Counter
        import pandas as pd

        counts = Counter(labels)
        total = len(labels)

        df = pd.DataFrame([
            {'class': c, 'count': n, 'percentage': 100 * n / total}
            for c, n in sorted(counts.items())
        ])

        imbalance_ratio = max(counts.values()) / min(counts.values())

        print("クラス分布:")
        print(df.to_string(index=False))
        print(f"\n不均衡比率: {imbalance_ratio:.2f}x")

        if imbalance_ratio > 10:
            print("警告: 深刻なクラス不均衡!")
        elif imbalance_ratio > 3:
            print("注意: クラス不均衡を検出")

        self.quality_report['class_imbalance_ratio'] = imbalance_ratio
        return df

    def detect_outliers(self, X, method='isolation_forest', contamination=0.1):
        """
        外れ値の検出

        Args:
            method: 'isolation_forest' または 'lof'
            contamination: 外れ値の期待される割合
        """
        if method == 'isolation_forest':
            detector = IsolationForest(
                contamination=contamination,
                random_state=42
            )
        elif method == 'lof':
            detector = LocalOutlierFactor(
                contamination=contamination,
                novelty=True
            )

        predictions = detector.fit_predict(X)
        outlier_mask = predictions == -1
        outlier_indices = np.where(outlier_mask)[0]

        print(f"外れ値検出: {outlier_mask.sum()} / {len(X)} ({outlier_mask.mean():.2%})")

        self.quality_report['n_outliers'] = outlier_mask.sum()
        return outlier_indices, outlier_mask

    def check_duplicates(self, X, y=None, threshold=0.99):
        """
        重複サンプルの検出

        Args:
            threshold: 類似度閾値(1.0 = 完全一致)
        """
        from sklearn.metrics.pairwise import cosine_similarity

        if len(X) > 10000:
            sample_idx = np.random.choice(len(X), 10000, replace=False)
            X_sample = X[sample_idx]
        else:
            X_sample = X
            sample_idx = np.arange(len(X))

        sim_matrix = cosine_similarity(X_sample)
        np.fill_diagonal(sim_matrix, 0)

        duplicate_pairs = np.argwhere(sim_matrix >= threshold)
        duplicate_pairs = duplicate_pairs[duplicate_pairs[:, 0] < duplicate_pairs[:, 1]]

        print(f"重複ペア発見: {len(duplicate_pairs)}")

        if y is not None and len(duplicate_pairs) > 0:
            label_conflicts = 0
            for i, j in duplicate_pairs:
                if y[sample_idx[i]] != y[sample_idx[j]]:
                    label_conflicts += 1
            print(f"ラベルが異なる重複ペア: {label_conflicts}")

        return duplicate_pairs

    def compute_data_quality_score(self, X, y):
        """総合的なデータ品質スコアを計算"""
        scores = {}

        if hasattr(X, 'isnull'):
            missing_rate = X.isnull().mean().mean()
        else:
            missing_rate = np.isnan(X).mean()
        scores['completeness'] = 1 - missing_rate

        from collections import Counter
        counts = Counter(y)
        n_classes = len(counts)
        ideal_count = len(y) / n_classes
        balance_score = sum(
            min(c, ideal_count) / ideal_count
            for c in counts.values()
        ) / n_classes
        scores['balance'] = balance_score

        overall_score = np.mean(list(scores.values()))
        scores['overall'] = overall_score

        print("データ品質スコア:")
        for metric, score in scores.items():
            print(f"  {metric}: {score:.3f}")

        return scores

3. ラベリング戦略

3.1 高品質ラベリングガイドライン

def compute_inter_rater_agreement(annotations):
    """
    評価者間一致度を計算

    Args:
        annotations: 形状 (n_samples, n_raters) の配列

    Returns:
        cohen_kappa: Cohen's Kappaスコア
        majority_labels: 多数決ラベル
    """
    from sklearn.metrics import cohen_kappa_score
    import numpy as np

    n_samples, n_raters = annotations.shape

    kappa_scores = []
    for i in range(n_raters):
        for j in range(i+1, n_raters):
            kappa = cohen_kappa_score(
                annotations[:, i],
                annotations[:, j]
            )
            kappa_scores.append((i, j, kappa))
            print(f"評価者 {i} vs 評価者 {j}: kappa = {kappa:.3f}")

    mean_kappa = np.mean([k for _, _, k in kappa_scores])
    print(f"\n平均Cohen's Kappa: {mean_kappa:.3f}")

    if mean_kappa < 0.2:
        interpretation = "わずかな一致"
    elif mean_kappa < 0.4:
        interpretation = "まずまずの一致"
    elif mean_kappa < 0.6:
        interpretation = "中程度の一致"
    elif mean_kappa < 0.8:
        interpretation = "実質的な一致"
    else:
        interpretation = "ほぼ完全な一致"

    print(f"解釈: {interpretation}")

    from scipy import stats
    majority_labels = stats.mode(annotations, axis=1)[0].flatten()
    print(f"\n多数決ラベルを生成しました")

    return mean_kappa, majority_labels

3.2 Snorkelによる弱教師あり学習

def snorkel_programmatic_labeling_demo():
    """
    Snorkelによるプログラマティックラベリングのデモ
    pip install snorkel
    """
    from snorkel.labeling import labeling_function, PandasLFApplier
    from snorkel.labeling.model import LabelModel
    import re

    POSITIVE = 1
    NEGATIVE = 0
    ABSTAIN = -1

    @labeling_function()
    def lf_positive_keywords(x):
        """ポジティブなキーワードに基づくラベル付け"""
        positive_words = ['good', 'great', 'excellent', 'amazing', 'love', 'best']
        if any(word in x.text.lower() for word in positive_words):
            return POSITIVE
        return ABSTAIN

    @labeling_function()
    def lf_negative_keywords(x):
        """ネガティブなキーワードに基づくラベル付け"""
        negative_words = ['bad', 'terrible', 'awful', 'hate', 'worst', 'horrible']
        if any(word in x.text.lower() for word in negative_words):
            return NEGATIVE
        return ABSTAIN

    @labeling_function()
    def lf_rating_high(x):
        """高い評価に基づくラベル付け"""
        if hasattr(x, 'rating') and x.rating >= 4:
            return POSITIVE
        return ABSTAIN

    @labeling_function()
    def lf_rating_low(x):
        """低い評価に基づくラベル付け"""
        if hasattr(x, 'rating') and x.rating <= 2:
            return NEGATIVE
        return ABSTAIN

    @labeling_function()
    def lf_negation_check(x):
        """否定の検出"""
        text = x.text.lower()
        if re.search(r"not (good|great|excellent)", text):
            return NEGATIVE
        if re.search(r"not (bad|terrible)", text):
            return POSITIVE
        return ABSTAIN

    lfs = [
        lf_positive_keywords,
        lf_negative_keywords,
        lf_rating_high,
        lf_rating_low,
        lf_negation_check,
    ]

    print("Snorkelプログラマティックラベリングパイプライン:")
    print("1. ドメイン専門家がラベリング関数(LF)を作成")
    print("2. LFをラベルなしデータに適用")
    print("3. Label Modelで複数のLFを統合(ノイズ考慮)")
    print("4. ソフトラベルで下流モデルを学習")
    print(f"\n定義されたラベリング関数: {len(lfs)}")

    return lfs

4. 能動学習(Active Learning)

能動学習は、大規模なラベルなしプールから最も情報価値の高いサンプルを選択することでラベリングコストを最小化します。

import numpy as np
import torch
import torch.nn as nn

class ActiveLearner:
    """
    複数のサンプリング戦略を持つ能動学習
    """

    def __init__(self, model, strategy='uncertainty', n_initial=100):
        self.model = model
        self.strategy = strategy
        self.n_initial = n_initial

    def uncertainty_sampling(self, X_unlabeled, n_samples):
        """
        不確実性サンプリング:モデルが最も確信を持てないサンプルを選択
        """
        probs = self._get_probs(X_unlabeled)

        if self.strategy == 'least_confidence':
            uncertainty = 1 - probs.max(axis=1)

        elif self.strategy == 'margin':
            sorted_probs = np.sort(probs, axis=1)[:, ::-1]
            uncertainty = 1 - (sorted_probs[:, 0] - sorted_probs[:, 1])

        elif self.strategy == 'entropy':
            uncertainty = -np.sum(probs * np.log(probs + 1e-10), axis=1)

        else:
            uncertainty = 1 - probs.max(axis=1)

        selected_indices = np.argsort(uncertainty)[-n_samples:]
        return selected_indices, uncertainty

    def diversity_sampling(self, X_unlabeled, X_labeled, n_samples):
        """
        多様性ベースのサンプリング(CoreSet)
        既にラベル付けされたデータと最も異なるサンプルを選択
        """
        from sklearn.metrics.pairwise import euclidean_distances

        selected = []
        remaining = list(range(len(X_unlabeled)))
        current_labeled = X_labeled.copy()

        for _ in range(n_samples):
            dists = euclidean_distances(
                X_unlabeled[remaining],
                current_labeled
            ).min(axis=1)

            best_idx = remaining[np.argmax(dists)]
            selected.append(best_idx)
            remaining.remove(best_idx)
            current_labeled = np.vstack([current_labeled, X_unlabeled[best_idx]])

        return np.array(selected)

    def batch_mode_active_learning(self, X_pool, y_oracle, X_test, y_test,
                                    n_iterations=10, n_per_iter=50):
        """
        バッチモード能動学習ループ

        Args:
            X_pool: ラベルなしデータプール
            y_oracle: 真のラベル(オラクル)
            n_per_iter: 反復毎にラベル付けするサンプル数
        """
        initial_indices = np.random.choice(
            len(X_pool), self.n_initial, replace=False
        )
        labeled_indices = list(initial_indices)
        unlabeled_indices = [
            i for i in range(len(X_pool)) if i not in labeled_indices
        ]

        accuracies = []
        n_labeled_list = []

        for iteration in range(n_iterations):
            X_labeled = X_pool[labeled_indices]
            y_labeled = y_oracle[labeled_indices]

            self.model.fit(X_labeled, y_labeled)

            acc = accuracy_score(y_test, self.model.predict(X_test))
            accuracies.append(acc)
            n_labeled_list.append(len(labeled_indices))

            print(f"反復 {iteration+1}: n_labeled={len(labeled_indices)}, accuracy={acc:.3f}")

            if len(unlabeled_indices) == 0:
                break

            X_unlabeled = X_pool[unlabeled_indices]
            selected, _ = self.uncertainty_sampling(X_unlabeled, n_per_iter)

            actual_selected = [unlabeled_indices[i] for i in selected]
            labeled_indices.extend(actual_selected)
            unlabeled_indices = [
                i for i in unlabeled_indices if i not in actual_selected
            ]

        return accuracies, n_labeled_list

    def _get_probs(self, X):
        if hasattr(self.model, 'predict_proba'):
            return self.model.predict_proba(X)
        else:
            logits = self.model.predict(X)
            from scipy.special import softmax
            return softmax(logits, axis=1)

5. データ拡張の深掘り

5.1 Albumentationsによる画像拡張

import albumentations as A
from albumentations.pytorch import ToTensorV2
import numpy as np

def get_train_transforms(image_size=224):
    """
    学習用の強力な拡張パイプライン(Albumentations)
    """
    return A.Compose([
        # 幾何学的変換
        A.RandomResizedCrop(
            height=image_size,
            width=image_size,
            scale=(0.7, 1.0),
            ratio=(0.75, 1.33)
        ),
        A.HorizontalFlip(p=0.5),
        A.ShiftScaleRotate(
            shift_limit=0.1,
            scale_limit=0.2,
            rotate_limit=30,
            p=0.5
        ),

        # カラー変換
        A.ColorJitter(
            brightness=0.3,
            contrast=0.3,
            saturation=0.3,
            hue=0.1,
            p=0.8
        ),
        A.ToGray(p=0.1),
        A.RandomGamma(gamma_limit=(80, 120), p=0.3),

        # ノイズとぼかし
        A.GaussNoise(var_limit=(10, 50), p=0.3),
        A.OneOf([
            A.MotionBlur(blur_limit=7),
            A.GaussianBlur(blur_limit=7),
            A.MedianBlur(blur_limit=7),
        ], p=0.3),

        # カットアウト / ランダム消去
        A.CoarseDropout(
            max_holes=8,
            max_height=32,
            max_width=32,
            fill_value=0,
            p=0.3
        ),

        # グリッド歪み
        A.OneOf([
            A.GridDistortion(p=1),
            A.ElasticTransform(p=1),
            A.OpticalDistortion(p=1),
        ], p=0.2),

        # 正規化とテンソル変換
        A.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225]
        ),
        ToTensorV2(),
    ])


def mixup_augmentation(images, labels, alpha=0.4):
    """
    MixUp: 2つの画像とそのラベルを混合する
    Zhang et al., "mixup: Beyond Empirical Risk Minimization" (2018)
    """
    import torch

    batch_size = images.shape[0]
    lam = np.random.beta(alpha, alpha)
    perm = torch.randperm(batch_size)

    mixed_images = lam * images + (1 - lam) * images[perm]
    labels_a = labels
    labels_b = labels[perm]

    return mixed_images, labels_a, labels_b, lam


def cutmix_augmentation(images, labels, alpha=1.0):
    """
    CutMix: 一方の画像から他方へパッチを貼り付ける
    Yun et al., "CutMix: Training Strategy that Makes Use of
    Sample Mixing" (2019)
    """
    import torch

    batch_size, c, h, w = images.shape
    lam = np.random.beta(alpha, alpha)
    perm = torch.randperm(batch_size)

    cut_ratio = np.sqrt(1 - lam)
    cut_h = int(h * cut_ratio)
    cut_w = int(w * cut_ratio)

    cx = np.random.randint(w)
    cy = np.random.randint(h)

    bbx1 = np.clip(cx - cut_w // 2, 0, w)
    bby1 = np.clip(cy - cut_h // 2, 0, h)
    bbx2 = np.clip(cx + cut_w // 2, 0, w)
    bby2 = np.clip(cy + cut_h // 2, 0, h)

    mixed_images = images.clone()
    mixed_images[:, :, bby1:bby2, bbx1:bbx2] = images[perm, :, bby1:bby2, bbx1:bbx2]

    lam = 1 - ((bbx2 - bbx1) * (bby2 - bby1) / (w * h))
    labels_a = labels
    labels_b = labels[perm]

    return mixed_images, labels_a, labels_b, lam

5.2 テキスト拡張

class TextAugmenter:
    """
    テキストデータ拡張技術
    """

    def __init__(self):
        pass

    def eda_synonym_replacement(self, text, n=1):
        """
        EDA: 同義語置換
        Wei and Zou, "EDA: Easy Data Augmentation Techniques
        for Boosting Performance on Text Classification Tasks" (2019)
        """
        import nltk
        from nltk.corpus import wordnet

        words = text.split()
        new_words = words.copy()

        stop_words = set(['a', 'an', 'the', 'is', 'are', 'was', 'were',
                          'i', 'me', 'my', 'we', 'our', 'you', 'your'])

        replaceable = [
            (i, word) for i, word in enumerate(words)
            if word.lower() not in stop_words
        ]

        np.random.shuffle(replaceable)
        replaced = 0

        for idx, word in replaceable:
            if replaced >= n:
                break

            synsets = wordnet.synsets(word)
            if synsets:
                synonyms = [
                    lemma.name() for synset in synsets
                    for lemma in synset.lemmas()
                    if lemma.name() != word
                ]
                if synonyms:
                    new_words[idx] = np.random.choice(synonyms).replace('_', ' ')
                    replaced += 1

        return ' '.join(new_words)

    def eda_random_swap(self, text, n=1):
        """EDA: ランダムスワップ"""
        words = text.split()
        if len(words) < 2:
            return text

        new_words = words.copy()
        for _ in range(n):
            i, j = np.random.choice(len(new_words), 2, replace=False)
            new_words[i], new_words[j] = new_words[j], new_words[i]

        return ' '.join(new_words)

    def eda_random_deletion(self, text, p=0.1):
        """EDA: ランダム削除"""
        words = text.split()
        if len(words) == 1:
            return text

        new_words = [word for word in words if np.random.random() > p]
        return ' '.join(new_words) if new_words else np.random.choice(words)

    def back_translation(self, text, src_lang='en', pivot_lang='fr'):
        """
        逆翻訳: en -> fr -> en
        意味を保ちながら表現を多様化する
        """
        try:
            from transformers import pipeline

            translator_fwd = pipeline(
                f"translation_{src_lang}_to_{pivot_lang}",
                model=f"Helsinki-NLP/opus-mt-{src_lang}-{pivot_lang}"
            )

            translator_bwd = pipeline(
                f"translation_{pivot_lang}_to_{src_lang}",
                model=f"Helsinki-NLP/opus-mt-{pivot_lang}-{src_lang}"
            )

            pivot_text = translator_fwd(text)[0]['translation_text']
            back_translated = translator_bwd(pivot_text)[0]['translation_text']

            return back_translated

        except Exception as e:
            print(f"翻訳エラー: {e}")
            return text

    def augment_dataset(self, texts, labels, n_aug=4):
        """データセット全体を拡張"""
        augmented_texts = []
        augmented_labels = []

        for text, label in zip(texts, labels):
            augmented_texts.append(text)
            augmented_labels.append(label)

            for _ in range(n_aug):
                aug_type = np.random.choice(
                    ['synonym', 'swap', 'deletion']
                )

                if aug_type == 'synonym':
                    aug_text = self.eda_synonym_replacement(text)
                elif aug_type == 'swap':
                    aug_text = self.eda_random_swap(text)
                else:
                    aug_text = self.eda_random_deletion(text)

                augmented_texts.append(aug_text)
                augmented_labels.append(label)

        print(f"元のサンプル数: {len(texts)}")
        print(f"拡張後のサンプル数: {len(augmented_texts)}")

        return augmented_texts, augmented_labels

5.3 自動拡張(RandAugment、SpecAugment)

import torch
import torchvision.transforms as transforms

def get_randaugment_transforms(n=2, m=9, image_size=224):
    """
    RandAugment: ランダム化された拡張ポリシー
    Cubuk et al., "RandAugment: Practical Automated Data Augmentation" (2019)

    Args:
        n: 適用する拡張操作の数
        m: 拡張の強度(0-30)
    """
    transform = transforms.Compose([
        transforms.RandomResizedCrop(image_size),
        transforms.RandAugment(num_ops=n, magnitude=m),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225]
        )
    ])

    return transform


def specaugment_for_audio(spectrogram, freq_mask_param=27, time_mask_param=70):
    """
    SpecAugment: 音声スペクトログラム拡張
    Park et al., "SpecAugment: A Simple Data Augmentation Method
    for Automatic Speech Recognition" (2019)

    Args:
        spectrogram: 入力スペクトログラム(周波数, 時間)
        freq_mask_param: 最大周波数マスクサイズ
        time_mask_param: 最大時間マスクサイズ
    """
    import torchaudio.transforms as T

    freq_mask = T.FrequencyMasking(freq_mask_param=freq_mask_param)
    time_mask = T.TimeMasking(time_mask_param=time_mask_param)

    augmented = freq_mask(spectrogram)
    augmented = time_mask(augmented)

    return augmented

6. 合成データ生成

6.1 LLMによる合成テキスト

class SyntheticTextGenerator:
    """LLMを使用して合成学習データを生成"""

    def __init__(self, llm_client, model_name='gpt-4'):
        self.llm = llm_client
        self.model_name = model_name

    def generate_classification_data(self, class_name, n_samples=100,
                                     domain='general', style='diverse'):
        """
        分類クラス用の合成データを生成

        Args:
            class_name: ターゲットクラス名
            n_samples: 生成するサンプル数
            domain: ドメインコンテキスト(医療、法律など)
            style: 文章スタイル(フォーマル、カジュアル、多様)
        """
        prompt = f"""Generate {n_samples} diverse text examples for the class '{class_name}'.
Domain: {domain}
Style: {style}

Requirements:
- Each example should be 1-3 sentences
- Vary the vocabulary, sentence structure, and perspective
- Include both simple and complex cases
- Format as a JSON list: ["example1", "example2", ...]

Generate realistic examples that would appear in real-world {domain} data."""

        print(f"'{class_name}'の合成データ生成プロンプト:")
        print(prompt[:300] + "...")
        print(f"\n{n_samples}サンプルの生成を予定")

    def generate_edge_cases(self, class_examples, n_edge_cases=20):
        """困難なエッジケースを生成"""
        prompt = f"""Based on these training examples:
{chr(10).join(class_examples[:5])}

Generate {n_edge_cases} challenging edge cases that:
1. Are ambiguous between different categories
2. Contain misleading keywords
3. Have unusual sentence structures
4. Test the model's true understanding

Format as JSON list."""

        print("エッジケース生成プロンプトを準備しました")

    def augment_with_paraphrase(self, texts, n_paraphrases=3):
        """LLMを使用してパラフレーズを生成"""
        augmented = []

        for text in texts:
            prompt = f"""Paraphrase the following text {n_paraphrases} times.
Keep the same meaning but use different words and sentence structures.

Original: "{text}"

Format as JSON list of {n_paraphrases} paraphrases."""

            augmented.append({
                'original': text,
                'paraphrases': []
            })

        return augmented


class SyntheticImageGenerator:
    """拡散モデルで合成画像を生成"""

    def __init__(self, model_name='stabilityai/stable-diffusion-2-1'):
        self.model_name = model_name

    def setup_pipeline(self):
        """
        Stable Diffusionパイプラインを初期化
        pip install diffusers accelerate
        """
        try:
            from diffusers import StableDiffusionPipeline
            import torch

            self.pipe = StableDiffusionPipeline.from_pretrained(
                self.model_name,
                torch_dtype=torch.float16
            )

            if torch.cuda.is_available():
                self.pipe = self.pipe.to('cuda')

            print(f"パイプライン初期化: {self.model_name}")
        except Exception as e:
            print(f"パイプライン初期化エラー: {e}")

    def generate_class_images(self, class_name, n_images=50,
                               style_prompt="high quality, photorealistic"):
        """
        クラス用の合成画像を生成

        Args:
            class_name: ターゲットクラス名
            n_images: 生成する画像数
            style_prompt: スタイルガイダンス
        """
        prompts = [
            f"A photo of {class_name}, {style_prompt}",
            f"{class_name} in natural environment, {style_prompt}",
            f"Close-up of {class_name}, detailed, {style_prompt}",
            f"{class_name} from different angle, {style_prompt}",
        ]

        print(f"'{class_name}'の合成画像生成計画:")
        print(f"生成する画像数: {n_images}")
        print("サンプルプロンプト:")
        for p in prompts[:2]:
            print(f"  - {p}")

    def evaluate_synthetic_quality(self, real_images, synthetic_images):
        """FIDスコアを使用して合成画像の品質を評価"""
        try:
            from torchmetrics.image.fid import FrechetInceptionDistance

            fid = FrechetInceptionDistance(feature=64)
            fid.update(real_images, real=True)
            fid.update(synthetic_images, real=False)

            fid_score = fid.compute()
            print(f"FIDスコア: {fid_score:.2f}")
            print("(低いほど良い;0が完璧)")

            return fid_score
        except Exception as e:
            print(f"FID計算エラー: {e}")

7. データフライホイール

7.1 データフライホイールの概念

データフライホイールとは、製品、データ、モデルの好循環です:

  1. より良いモデル → より良い製品
  2. より良い製品 → より多くのユーザー
  3. より多くのユーザー → より多くのデータ
  4. より多くのデータ → より良いモデル
class DataFlywheelPipeline:
    """データフライホイールの実装パイプライン"""

    def __init__(self, model, feedback_store):
        self.model = model
        self.feedback_store = feedback_store
        self.version = 0

    def collect_production_feedback(self, predictions, user_feedback):
        """
        本番環境からユーザーフィードバックを収集

        Args:
            predictions: モデルの予測
            user_feedback: ユーザーの訂正/確認
        """
        valuable_samples = []

        for pred, feedback in zip(predictions, user_feedback):
            if feedback['corrected']:
                sample = {
                    'input': feedback['input'],
                    'model_prediction': pred,
                    'true_label': feedback['correction'],
                    'confidence': pred['confidence'],
                    'timestamp': feedback['timestamp'],
                    'value': 'high'
                }
                valuable_samples.append(sample)

            elif feedback['confirmed'] and pred['confidence'] < 0.7:
                sample = {
                    'input': feedback['input'],
                    'true_label': pred['label'],
                    'confidence': pred['confidence'],
                    'value': 'medium'
                }
                valuable_samples.append(sample)

        print(f"価値あるサンプルを収集: {len(valuable_samples)}")
        return valuable_samples

    def prioritize_labeling_queue(self, unlabeled_pool, budget):
        """
        ラベリングキューの優先順位付け

        優先度基準:
        1. モデルの不確実性(高いほど優先度高)
        2. クラスの希少性(希少なクラスほど優先度高)
        3. データの多様性(既存データと異なるほど優先度高)
        """
        priorities = []

        for sample in unlabeled_pool:
            score = 0

            uncertainty = 1 - max(sample['predicted_probs'])
            score += uncertainty * 0.5

            predicted_class = max(sample['predicted_probs'],
                                  key=sample['predicted_probs'].get)
            rarity = 1 / (sample['class_counts'].get(predicted_class, 1) + 1)
            score += rarity * 0.3

            diversity = np.std(list(sample['predicted_probs'].values()))
            score += diversity * 0.2

            priorities.append((sample, score))

        priorities.sort(key=lambda x: x[1], reverse=True)
        selected = [s for s, _ in priorities[:budget]]

        return selected

8. データパイプラインのベストプラクティス

8.1 再現可能なデータ処理

import hashlib
import json
import os
from pathlib import Path
from datetime import datetime

class ReproducibleDataPipeline:
    """
    再現可能なデータパイプライン
    - すべての処理ステップを追跡
    - データハッシュで整合性を検証
    - バージョン管理をサポート
    """

    def __init__(self, pipeline_name, base_dir='data/processed'):
        self.pipeline_name = pipeline_name
        self.base_dir = Path(base_dir)
        self.steps = []
        self.metadata = {
            'pipeline': pipeline_name,
            'created_at': datetime.now().isoformat(),
            'steps': []
        }

    def add_step(self, step_name, func, *args, **kwargs):
        """処理ステップを追加"""
        self.steps.append({
            'name': step_name,
            'func': func,
            'args': args,
            'kwargs': kwargs
        })

    def compute_hash(self, data):
        """データハッシュを計算"""
        if isinstance(data, np.ndarray):
            return hashlib.md5(data.tobytes()).hexdigest()
        elif isinstance(data, (list, dict)):
            return hashlib.md5(
                json.dumps(data, sort_keys=True, default=str).encode()
            ).hexdigest()
        else:
            return hashlib.md5(str(data).encode()).hexdigest()

    def run(self, input_data):
        """パイプラインを実行"""
        data = input_data

        for step in self.steps:
            print(f"実行中: {step['name']}")

            hash_before = self.compute_hash(data)
            data = step['func'](data, *step['args'], **step['kwargs'])
            hash_after = self.compute_hash(data)

            self.metadata['steps'].append({
                'name': step['name'],
                'hash_before': hash_before,
                'hash_after': hash_after,
                'timestamp': datetime.now().isoformat()
            })

            print(f"  完了: {hash_before[:8]} -> {hash_after[:8]}")

        metadata_path = self.base_dir / f"{self.pipeline_name}_metadata.json"
        metadata_path.parent.mkdir(parents=True, exist_ok=True)
        with open(metadata_path, 'w') as f:
            json.dump(self.metadata, f, indent=2)

        print(f"\nパイプライン完了。メタデータ: {metadata_path}")
        return data

9. まとめと実践ガイド

データ中心のAIチェックリスト

1. データ収集

  • ドメイン専門家とラベリングガイドラインを作成する
  • 評価者間一致度を測定する(目標:Cohen's Kappa > 0.8)
  • 収集中にクラス分布を監視する

2. データクリーニング

  • Cleanlabでラベルエラーを検出・修正する
  • 重複サンプルを除去する
  • 外れ値をレビューする(除去または修正)

3. データ拡張

  • 拡張は学習データのみに適用する(検証/テストデータには不適用)
  • 拡張後のデータ分布を検証する
  • ドメインに適した拡張技術を選択する

4. 継続的改善

  • 本番環境のエラーケースを収集する
  • 能動学習を使用してラベリングを効率化する
  • 定期的なデータ品質監査を実施する

推奨ツール:

データ中心のAIは単なるツールや技術の問題ではありません。それは「より良いモデル」を追求することから「より良いデータ」を追求することへのマインドセットの転換です。多くの実世界プロジェクトでは、この転換だけで劇的な性能改善が達成できます。

Andrew NgのData-Centric AI運動から得られる最も重要な洞察:あなたのモデルの品質は、そのモデルが学習するデータの品質を超えることはできません。データ品質、ラベリングの一貫性、体系的なデータ改善への投資は、AIプロジェクトにおいて最も高いリターンをもたらす活動であることが多いのです。