Skip to content
Published on

AI/MLのためのPython高度テクニック: パフォーマンス最適化・並列処理・メモリ管理

Authors

概要

PythonはAI/MLエコシステムの共通言語です。しかし、素のPythonコードには本質的なパフォーマンスの限界があり、大規模なデータ処理やモデル学習パイプラインでボトルネックが生じます。このガイドでは、本番グレードのAI/MLシステムを構築するための高度なPythonテクニックを解説します。

純粋なPythonループより何百倍も速いコードの書き方、GPUの活用方法、メモリの効率的な管理方法を、実際に動かせるサンプルとともに学びます。


1. Pythonパフォーマンスプロファイリング

最適化の前に必ずプロファイリングを行いましょう。「測定できなければ改善できない」が鉄則です。

1.1 cProfileとpstats

Pythonの標準ライブラリに含まれるcProfileは、関数レベルのプロファイリングの基本ツールです。

import cProfile
import pstats
import io
import numpy as np

def slow_matrix_multiply(a, b):
    """Multiply two matrices given as nested lists using a naive triple loop.

    Intentionally slow: it exists to give the profiler something to measure.

    Args:
        a: Left operand, a non-empty list of equal-length rows.
        b: Right operand, with as many rows as ``a`` has columns.

    Returns:
        A new nested list holding the matrix product ``a @ b``.
    """
    n_rows, n_inner, n_cols = len(a), len(a[0]), len(b[0])
    product = [[0] * n_cols for _ in range(n_rows)]
    for r in range(n_rows):
        lhs_row = a[r]
        for c in range(n_cols):
            for k in range(n_inner):
                product[r][c] += lhs_row[k] * b[k][c]
    return product

def profile_function():
    """Build two 100x100 float matrices and multiply them with the slow routine."""
    size = 100
    axis = range(size)
    a = [[float(row + col) for col in axis] for row in axis]
    b = [[float(row * col + 1) for col in axis] for row in axis]
    slow_matrix_multiply(a, b)

# Run the workload with the profiler enabled only around the call of interest.
pr = cProfile.Profile()
pr.enable()
profile_function()
pr.disable()

# Analyse the results: write the report into an in-memory buffer, sorted by
# cumulative time and limited to the first 20 entries, then print it.
stream = io.StringIO()
ps = pstats.Stats(pr, stream=stream)
ps.sort_stats('cumulative')
ps.print_stats(20)
print(stream.getvalue())

コマンドラインから直接実行することもできます。

python -m cProfile -s cumulative your_script.py
python -m cProfile -o output.prof your_script.py

保存した.profファイルを分析する場合:

import pstats

p = pstats.Stats('output.prof')
p.sort_stats('cumulative')
p.print_stats(10)  # 上位10関数
p.print_callers('slow_function')  # この関数を呼び出した関数
p.print_callees('main')  # この関数が呼び出す関数

1.2 line_profiler(行レベル分析)

関数レベルではなく行レベルでパフォーマンスを測定します。

pip install line_profiler
from line_profiler import LineProfiler
import numpy as np

def process_batch(data):
    """Standardise, scale by 2 and clip every sample, then stack the results.

    Args:
        data: Iterable of NumPy arrays that all share one shape.

    Returns:
        One array with a new leading axis indexing the processed samples.
    """
    processed = []
    for sample in data:
        centered = sample - sample.mean()
        spread = sample.std() + 1e-8  # epsilon guards against zero variance
        processed.append(np.clip(centered / spread * 2.0, -5, 5))
    return np.stack(processed)

# Set up the profiler and wrap the function whose lines should be timed.
lp = LineProfiler()
lp_wrapper = lp(process_batch)

# Run the wrapped function on sample data (1000 vectors of 1000 values).
data = [np.random.randn(1000) for _ in range(1000)]
lp_wrapper(data)

# Show the per-line timing report.
lp.print_stats()

Jupyter Notebookではマジックコマンドを使用できます:

%load_ext line_profiler

%lprun -f process_batch process_batch(data)

1.3 memory_profiler

行ごとにメモリ使用量を追跡します。

pip install memory_profiler
from memory_profiler import profile
import numpy as np

@profile
def memory_intensive_function(n=1_000_000):
    """Allocate and release several large buffers so each step shows up in the memory profile.

    Args:
        n: Number of integers to generate.

    Returns:
        The sum of the squared values that exceed 100.
    """
    list_data = list(range(n))          # build the Python list
    arr = np.array(list_data)           # convert it to a NumPy array
    del list_data                       # drop the list

    squared = arr ** 2                  # allocates a new array
    filtered = squared[squared > 100]   # boolean-mask filtering

    return filtered.sum()

result = memory_intensive_function()
print(f"Result: {result}")
python -m memory_profiler your_script.py
mprof run your_script.py
mprof plot  # 可視化

1.4 py-spy(サンプリングプロファイラー)

コードを変更せずに実行中のPythonプロセスをプロファイリングできる強力なツールです。

pip install py-spy

# 実行中のプロセスをプロファイリング
py-spy top --pid 12345

# フレームグラフの生成
py-spy record -o profile.svg --pid 12345

# スクリプトを直接プロファイリング
py-spy record -o profile.svg -- python your_script.py

1.5 SnakeViz可視化

pip install snakeviz

python -m cProfile -o output.prof your_script.py
snakeviz output.prof

2. NumPyベクトル化の深掘り

NumPyのベクトル化はPython AI/MLパフォーマンス最適化の要です。

2.1 forループとベクトル化のパフォーマンス比較

import numpy as np
import time

def benchmark(func, *args, runs=5):
    """Time ``func(*args)`` over several runs.

    Args:
        func: Callable to measure.
        *args: Positional arguments forwarded to ``func``.
        runs: Number of timed invocations; must be at least 1.

    Returns:
        ``(mean_seconds, std_seconds, result)`` where ``result`` is the return
        value of the last invocation.

    Raises:
        ValueError: If ``runs`` is smaller than 1. Previously this surfaced as
            an ``UnboundLocalError`` because ``result`` was never assigned.
    """
    if runs < 1:
        raise ValueError(f"runs must be at least 1, got {runs}")

    times = []
    result = None
    for _ in range(runs):
        start = time.perf_counter()
        result = func(*args)
        times.append(time.perf_counter() - start)
    return np.mean(times), np.std(times), result

# 1. Pure-Python for loop
def relu_loop(x):
    """Apply ReLU element by element with an explicit Python loop."""
    activated = []
    for value in x:
        # Same selection as max(0, value): non-positive inputs become the int 0.
        activated.append(value if value > 0 else 0)
    return activated


# 2. NumPy vectorised version
def relu_numpy(x):
    """Apply ReLU with a single element-wise maximum."""
    floor = 0
    return np.maximum(floor, x)


# 3. NumPy clip
def relu_clip(x):
    """Apply ReLU by clipping from below at zero with no upper bound."""
    return np.clip(x, a_min=0, a_max=None)

n = 1_000_000
data_list = [np.random.randn() for _ in range(n)]
data_np = np.array(data_list)

t_loop, _, _ = benchmark(relu_loop, data_list)
t_numpy, _, _ = benchmark(relu_numpy, data_np)
t_clip, _, _ = benchmark(relu_clip, data_np)

print(f"Python loop:      {t_loop:.4f}s")
print(f"NumPy maximum:    {t_numpy:.4f}s  ({t_loop/t_numpy:.0f}x faster)")
print(f"NumPy clip:       {t_clip:.4f}s  ({t_loop/t_clip:.0f}x faster)")

2.2 ユニバーサル関数(ufunc)

import numpy as np

def custom_activation_scalar(x, alpha=0.1):
    """ELU activation for a single scalar.

    Returns ``x`` unchanged when it is non-negative, otherwise
    ``alpha * (exp(x) - 1)``.
    """
    if x >= 0:
        return x
    negative_branch = alpha * (np.exp(x) - 1)
    return negative_branch

# vectorizeでufuncを作成
elu_ufunc = np.vectorize(custom_activation_scalar)

# Fully vectorised NumPy version (faster than np.vectorize)
def elu_numpy(x, alpha=0.1):
    """Vectorised ELU activation.

    Args:
        x: Array-like input.
        alpha: Scale of the negative branch.

    Returns:
        Array with ``x`` where ``x >= 0`` and ``alpha * (exp(x) - 1)`` elsewhere.
    """
    x = np.asarray(x)
    # np.where evaluates both branches for every element. Clamping the
    # exponent at 0 keeps large positive inputs from overflowing in the branch
    # that is discarded anyway, and expm1 stays accurate for values near zero.
    negative_branch = alpha * np.expm1(np.minimum(x, 0))
    return np.where(x >= 0, x, negative_branch)

x = np.random.randn(1_000_000)
t_vectorize, _, _ = benchmark(elu_ufunc, x)
t_numpy, _, _ = benchmark(elu_numpy, x)

print(f"np.vectorize: {t_vectorize:.4f}s")
print(f"np.where:     {t_numpy:.4f}s")

2.3 np.einsum(アインシュタイン縮約表記)

import numpy as np

# Dimensions used throughout the einsum examples below.
batch_size, seq_len, d_model = 32, 128, 512
heads, d_head = 8, 64

A = np.random.randn(batch_size, seq_len, d_model)
W = np.random.randn(d_model, d_model)

# Plain matrix product
result_matmul = A @ W  # (32, 128, 512)

# The same operation written with einsum
result_einsum = np.einsum('bsi,ij->bsj', A, W)

# Attention scores (batched matrix product)
Q = np.random.randn(batch_size, heads, seq_len, d_head)
K = np.random.randn(batch_size, heads, seq_len, d_head)

scores_manual = Q @ K.transpose(0, 1, 3, 2)  # (32, 8, 128, 128)
scores_einsum = np.einsum('bhid,bhjd->bhij', Q, K)

# Batched outer product
a = np.random.randn(batch_size, d_model)
b = np.random.randn(batch_size, d_model)
outer = np.einsum('bi,bj->bij', a, b)  # (32, 512, 512)

# Trace (sum of the diagonal) for each matrix in the batch
M = np.random.randn(batch_size, d_model, d_model)
trace = np.einsum('bii->b', M)  # (32,)

print(f"Attention scores shape: {scores_einsum.shape}")
print(f"Outer product shape: {outer.shape}")
print(f"Trace shape: {trace.shape}")

# Pass optimize= to let einsum search for the best contraction path
result = np.einsum('bhid,bhjd->bhij', Q, K, optimize='optimal')

2.4 ストライドトリック

import numpy as np
from numpy.lib.stride_tricks import as_strided, sliding_window_view

arr = np.arange(20, dtype=float)
window_size = 5

# Modern approach (NumPy 1.20 or newer)
windows = sliding_window_view(arr, window_size)
print(f"Sliding window shape: {windows.shape}")  # (16, 5)

# Moving average: mean over the window axis
moving_avg = windows.mean(axis=-1)
print(f"Moving average: {moving_avg[:5]}")

# 2-D sliding window (e.g. as convolution pre-processing)
image = np.random.randn(64, 64)
kernel_size = (3, 3)
windows_2d = sliding_window_view(image, kernel_size)
print(f"2D window shape: {windows_2d.shape}")  # (62, 62, 3, 3)

# Non-overlapping chunks. This needs no stride tricks: trimming the remainder
# and reshaping is enough.
batch = np.random.randn(1000, 10)
chunk_size = 50
n_chunks = len(batch) // chunk_size
chunks = batch[:n_chunks * chunk_size].reshape(n_chunks, chunk_size, -1)
print(f"Chunks shape: {chunks.shape}")  # (20, 50, 10)

3. Numba JITコンパイル

NumbaはPython関数をマシンコードにコンパイルし、C/C++レベルのパフォーマンスを実現します。

3.1 @jit、@njitデコレータ

from numba import jit, njit, prange
import numpy as np
import time

# @jit(nopython=False): fallback mode (Python objects are permitted)
@jit(nopython=False)
def sum_with_jit(arr):
    """Sum the elements of ``arr`` with an explicit loop under ``@jit``."""
    acc = 0.0
    for value in arr:
        acc += value
    return acc


# @njit: pure native mode (faster, recommended)
@njit
def sum_with_njit(arr):
    """Sum the elements of ``arr`` with an explicit loop under ``@njit``."""
    running = 0.0
    length = len(arr)
    for idx in range(length):
        running += arr[idx]
    return running

@njit
def compute_distances(points):
    """Compute the symmetric matrix of pairwise Euclidean distances.

    Only the upper triangle is computed; each value is mirrored into the lower
    triangle and the diagonal stays zero.
    """
    count = len(points)
    out = np.zeros((count, count))
    for a in range(count):
        anchor = points[a]
        for b in range(a + 1, count):
            delta = anchor - points[b]
            separation = np.sqrt(np.sum(delta ** 2))
            out[a, b] = separation
            out[b, a] = separation
    return out

arr = np.random.randn(1_000_000)

# The first call includes compilation time, so warm up before timing.
_ = sum_with_njit(arr)  # warm-up

start = time.perf_counter()
result_numpy = np.sum(arr)
print(f"NumPy sum: {time.perf_counter() - start:.4f}s")

start = time.perf_counter()
result_njit = sum_with_njit(arr)
print(f"Numba njit: {time.perf_counter() - start:.4f}s")

# Pairwise distances for a point cloud
points = np.random.randn(500, 3).astype(np.float64)
_ = compute_distances(points)  # warm-up

start = time.perf_counter()
dist_matrix = compute_distances(points)
print(f"Numba distance matrix ({points.shape[0]}x{points.shape[0]}): {time.perf_counter() - start:.4f}s")

3.2 型推論と明示的な型宣言

from numba import njit, float64, int64
import numpy as np

# An explicit signature compiles at decoration time, avoiding first-call latency
@njit(float64[:](float64[:], float64))
def scale_array_typed(arr, factor):
    """Return a new float64 array holding ``arr`` multiplied by ``factor``."""
    scaled = arr * factor
    return scaled

# 複数の型シグネチャ
from numba import float32, float64

@njit([
    float32[:](float32[:], float32),
    float64[:](float64[:], float64)
])
def scale_multi_type(arr, factor):
    """Return ``arr`` multiplied by ``factor``; compiled for float32 and float64."""
    scaled = arr * factor
    return scaled

@njit
def softmax_numba(x):
    """Numba-compiled softmax; subtracts the maximum before exponentiating."""
    shifted = x - np.max(x)
    weights = np.exp(shifted)
    return weights / np.sum(weights)

@njit
def batch_softmax(X):
    """Apply ``softmax_numba`` to every row of a 2-D array."""
    out = np.empty_like(X)
    n_rows = X.shape[0]
    for row in range(n_rows):
        out[row] = softmax_numba(X[row])
    return out

X = np.random.randn(1000, 512).astype(np.float64)
_ = batch_softmax(X)  # ウォームアップ

start = time.perf_counter()
result = batch_softmax(X)
print(f"Numba batch softmax: {time.perf_counter() - start:.4f}s")

3.3 並列処理(prange)

from numba import njit, prange
import numpy as np

@njit(parallel=True)
def parallel_normalize(X):
    """Standardise every row of ``X``; rows are distributed via ``prange``."""
    out = np.empty_like(X)
    for r in prange(X.shape[0]):  # iterations may run in parallel
        current = X[r]
        mu = current.mean()
        sigma = current.std()
        out[r] = (current - mu) / (sigma + 1e-8)
    return out

@njit(parallel=True)
def parallel_batch_process(data, weights):
    """Compute the weighted sum of every row of ``data``, one ``prange`` iteration per row."""
    n_samples, n_features = data.shape
    weighted = np.zeros(n_samples)
    for s in prange(n_samples):
        acc = 0.0
        for f in range(n_features):
            acc += data[s, f] * weights[f]
        weighted[s] = acc
    return weighted

X = np.random.randn(10000, 512).astype(np.float64)

_ = parallel_normalize(X)  # ウォームアップ

start = time.perf_counter()
result_parallel = parallel_normalize(X)
t_parallel = time.perf_counter() - start

@njit
def sequential_normalize(X):
    """Single-threaded counterpart of ``parallel_normalize``, used as the timing baseline."""
    out = np.empty_like(X)
    for r in range(X.shape[0]):
        current = X[r]
        mu = current.mean()
        sigma = current.std()
        out[r] = (current - mu) / (sigma + 1e-8)
    return out

_ = sequential_normalize(X)
start = time.perf_counter()
result_seq = sequential_normalize(X)
t_seq = time.perf_counter() - start

print(f"Sequential: {t_seq:.4f}s")
print(f"Parallel:   {t_parallel:.4f}s ({t_seq/t_parallel:.1f}x faster)")

3.4 Numbaキャッシュ

from numba import njit
import numpy as np

# cache=True stores the compiled code on disk
@njit(cache=True)
def cached_computation(x, y):
    """Return ``x[i]**2 + y[i]**2`` for each index; compiled code is cached for later runs."""
    length = len(x)
    out = np.empty(length)
    for idx in range(length):
        out[idx] = x[idx] ** 2 + y[idx] ** 2
    return out

arr1 = np.random.randn(1000)
arr2 = np.random.randn(1000)
result = cached_computation(arr1, arr2)

4. Cython

CythonはPythonコードをCにトランスパイルし、最大のパフォーマンスを実現します。

4.1 .pyxファイルの作成

# setup.py — Cythonビルド設定
from setuptools import setup
from Cython.Build import cythonize
import numpy as np

# Build fast_ops.pyx as an extension module.
setup(
    ext_modules=cythonize(
        "fast_ops.pyx",
        compiler_directives={
            'language_level': '3',
            # The three directives below switch off runtime safety checks.
            'boundscheck': False,
            'wraparound': False,
            'nonecheck': False,
        }
    ),
    # NumPy headers are needed because the .pyx file does `cimport numpy`.
    include_dirs=[np.get_include()]
)

fast_ops.pyxの例:

# fast_ops.pyx
# cython: language_level=3
# cython: boundscheck=False
# cython: wraparound=False

import numpy as np
cimport numpy as cnp
from libc.math cimport exp, sqrt

def relu_cython(cnp.ndarray[cnp.double_t, ndim=1] x):
    """Return ReLU of a 1-D float64 array as a new array.

    Lengths and indices use ``Py_ssize_t`` so arrays longer than a C ``int``
    can address are handled correctly.
    """
    cdef Py_ssize_t n = len(x)
    cdef cnp.ndarray[cnp.double_t, ndim=1] result = np.empty(n, dtype=np.float64)
    cdef Py_ssize_t i
    cdef double val

    for i in range(n):
        val = x[i]
        result[i] = val if val > 0 else 0.0

    return result

def softmax_cython(cnp.ndarray[cnp.double_t, ndim=1] x):
    """Return the softmax of a 1-D float64 array as a new array.

    The maximum is subtracted before exponentiating. An empty input yields an
    empty output; without that guard ``x[0]`` would read out of bounds, because
    bounds checking is disabled for this module.
    """
    cdef Py_ssize_t n = len(x)
    cdef cnp.ndarray[cnp.double_t, ndim=1] result = np.empty(n, dtype=np.float64)
    cdef double max_val
    cdef double sum_exp = 0.0
    cdef Py_ssize_t i

    if n == 0:
        return result

    # Pass 1: find the maximum.
    max_val = x[0]
    for i in range(1, n):
        if x[i] > max_val:
            max_val = x[i]

    # Pass 2: exponentiate the shifted values and accumulate their sum.
    for i in range(n):
        result[i] = exp(x[i] - max_val)
        sum_exp += result[i]

    # Pass 3: normalise.
    for i in range(n):
        result[i] /= sum_exp

    return result

ビルドと使用:

python setup.py build_ext --inplace
import numpy as np
import fast_ops  # コンパイル済みCythonモジュール

x = np.random.randn(1_000_000)
result = fast_ops.relu_cython(x)

4.2 IPython Cythonマジック

%load_ext Cython

%%cython -a
# -a: アノテーションモード(黄色=Python呼び出し、白=C)
import numpy as np
cimport numpy as cnp

def fast_l2_norm(cnp.ndarray[cnp.double_t, ndim=1] v):
    """Return the Euclidean norm of a 1-D float64 array.

    The length and index are ``Py_ssize_t`` so very long arrays do not
    overflow a C ``int``.
    """
    cdef Py_ssize_t n = len(v)
    cdef double total = 0.0
    cdef Py_ssize_t i
    for i in range(n):
        total += v[i] * v[i]
    return total ** 0.5

5. Pythonマルチプロセッシング

5.1 GILの理解

GIL(Global Interpreter Lock)があるため、CPythonでは一度に1つのスレッドしかPythonバイトコードを実行できません。そのためCPUバウンドなタスクでは、マルチスレッドは実質的な並列処理を提供しません — 代わりにマルチプロセッシングを使用します。

import threading
import multiprocessing
import time
import numpy as np

def cpu_bound_task(n):
    """Return the sum of ``i ** 2`` for ``i`` in ``range(n)`` (pure-Python CPU work)."""
    return sum(i ** 2 for i in range(n))

def run_threaded(n_tasks=4, n_per_task=1_000_000):
    """Run ``cpu_bound_task`` on ``n_tasks`` threads and return the elapsed seconds."""
    began = time.perf_counter()
    workers = [
        threading.Thread(target=cpu_bound_task, args=(n_per_task,))
        for _ in range(n_tasks)
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    return time.perf_counter() - began

def run_multiprocessing(n_tasks=4, n_per_task=1_000_000):
    """Run ``cpu_bound_task`` in a pool of ``n_tasks`` processes and return the elapsed seconds."""
    began = time.perf_counter()
    workloads = [n_per_task] * n_tasks
    with multiprocessing.Pool(processes=n_tasks) as pool:
        pool.map(cpu_bound_task, workloads)
    return time.perf_counter() - began

# Under the "spawn" start method (the default on Windows and macOS) every child
# process re-imports this module. Without the guard each child would start its
# own pool and the run would fail.
if __name__ == "__main__":
    t_threaded = run_threaded()
    t_mp = run_multiprocessing()

    print(f"Threading:        {t_threaded:.2f}s")
    print(f"Multiprocessing:  {t_mp:.2f}s ({t_threaded/t_mp:.1f}x faster)")

5.2 multiprocessing.Pool

import multiprocessing
import numpy as np

def preprocess_sample(args):
    """Standardise one sample and add small Gaussian noise.

    Args:
        args: Tuple ``(idx, data, mean, std)``. Packed into one argument so the
            function can be used with ``Pool.map``.

    Returns:
        ``(idx, augmented)`` where ``augmented`` is the standardised sample
        plus noise with standard deviation 0.01.
    """
    idx, data, mean, std = args
    normalized = (data - mean) / (std + 1e-8)
    # Forked pool workers inherit an identical copy of the global NumPy RNG
    # state, so np.random.randn would produce the same noise in every worker.
    # A fresh Generator is seeded from OS entropy and is independent per call.
    rng = np.random.default_rng()
    augmented = normalized + rng.standard_normal(normalized.shape) * 0.01
    return idx, augmented

def preprocess_dataset_parallel(dataset, n_workers=None):
    """Preprocess every row of ``dataset`` in a process pool.

    Args:
        dataset: 2-D array whose rows are samples.
        n_workers: Pool size; defaults to the machine's CPU count.

    Returns:
        Array of preprocessed rows in the original row order.
    """
    worker_count = multiprocessing.cpu_count() if n_workers is None else n_workers

    feature_mean = dataset.mean(axis=0)
    feature_std = dataset.std(axis=0)

    tasks = [
        (row_idx, dataset[row_idx], feature_mean, feature_std)
        for row_idx in range(len(dataset))
    ]

    with multiprocessing.Pool(processes=worker_count) as pool:
        indexed = pool.map(preprocess_sample, tasks, chunksize=100)

    ordered = sorted(indexed, key=lambda pair: pair[0])
    return np.stack([sample for _, sample in ordered])

# Guard the driver so that pool workers started with the "spawn" method can
# re-import this module without re-running it.
if __name__ == "__main__":
    dataset = np.random.randn(10000, 128)
    processed = preprocess_dataset_parallel(dataset, n_workers=4)
    print(f"Processed dataset shape: {processed.shape}")

5.3 ProcessPoolExecutor

from concurrent.futures import ProcessPoolExecutor, as_completed
import numpy as np

def train_fold(fold_data):
    """Fit a logistic regression on one fold and score it on the validation split.

    Args:
        fold_data: Tuple ``(fold_idx, X_train, y_train, X_val, y_val)``.

    Returns:
        ``(fold_idx, accuracy, model)``.
    """
    fold_idx, X_train, y_train, X_val, y_val = fold_data

    # Imported here so each worker process loads scikit-learn on demand.
    from sklearn.linear_model import LogisticRegression
    from sklearn.metrics import accuracy_score

    classifier = LogisticRegression(max_iter=1000)
    classifier.fit(X_train, y_train)

    accuracy = accuracy_score(y_val, classifier.predict(X_val))
    return fold_idx, accuracy, classifier

def parallel_cross_validation(X, y, n_folds=5):
    """Run K-fold cross-validation with one worker process per fold.

    Args:
        X: Feature matrix.
        y: Target vector.
        n_folds: Number of folds, also used as the pool size.

    Returns:
        Dict mapping fold index to ``{'accuracy': ..., 'model': ...}``.
    """
    from sklearn.model_selection import KFold

    splitter = KFold(n_splits=n_folds, shuffle=True, random_state=42)
    fold_payloads = [
        (fold_idx, X[train_idx], y[train_idx], X[val_idx], y[val_idx])
        for fold_idx, (train_idx, val_idx) in enumerate(splitter.split(X))
    ]

    results = {}

    with ProcessPoolExecutor(max_workers=n_folds) as executor:
        pending = {
            executor.submit(train_fold, payload): payload[0]
            for payload in fold_payloads
        }

        # Report folds in completion order, not submission order.
        for finished in as_completed(pending):
            fold_idx, accuracy, model = finished.result()
            results[fold_idx] = {'accuracy': accuracy, 'model': model}
            print(f"Fold {fold_idx}: accuracy = {accuracy:.4f}")

    avg_accuracy = np.mean([entry['accuracy'] for entry in results.values()])
    print(f"\nMean accuracy: {avg_accuracy:.4f}")
    return results

from sklearn.datasets import make_classification

# ProcessPoolExecutor workers may re-import this module ("spawn" start
# method), so the driver must only run when the file is executed directly.
if __name__ == "__main__":
    X, y = make_classification(n_samples=5000, n_features=20, random_state=42)
    results = parallel_cross_validation(X, y, n_folds=5)

5.4 共有メモリ

from multiprocessing import shared_memory
import numpy as np
import multiprocessing

def worker_shared_memory(shm_name, shape, dtype, start_idx, end_idx):
    """Square the slice ``[start_idx:end_idx]`` of a shared array in place.

    Args:
        shm_name: Name of an existing shared-memory segment.
        shape: Shape of the array stored in the segment.
        dtype: dtype of the array stored in the segment.
        start_idx: First index (inclusive) this worker handles.
        end_idx: Last index (exclusive) this worker handles.
    """
    existing_shm = shared_memory.SharedMemory(name=shm_name)
    try:
        arr = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)
        arr[start_idx:end_idx] = arr[start_idx:end_idx] ** 2
    finally:
        # Always detach from the segment, even if the computation raised.
        existing_shm.close()

def process_with_shared_memory(data):
    """Square ``data`` element-wise using one worker process per CPU.

    The data is copied into a shared-memory segment, each worker squares its
    own slice in place, and the result is copied out again.

    Args:
        data: NumPy array indexed along its first axis.

    Returns:
        A new array with the squared values.

    Raises:
        RuntimeError: If any worker process exits with a non-zero code.
    """
    if data.size == 0:
        # SharedMemory rejects size=0, and there is nothing to compute.
        return data.copy()

    shm = shared_memory.SharedMemory(create=True, size=data.nbytes)
    try:
        shared_array = np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)
        shared_array[:] = data[:]

        n_workers = multiprocessing.cpu_count()
        chunk_size = len(data) // n_workers

        processes = []
        for i in range(n_workers):
            start = i * chunk_size
            # The last worker also takes the remainder.
            end = start + chunk_size if i < n_workers - 1 else len(data)

            p = multiprocessing.Process(
                target=worker_shared_memory,
                args=(shm.name, data.shape, data.dtype, start, end)
            )
            processes.append(p)
            p.start()

        for p in processes:
            p.join()

        failed = [p.exitcode for p in processes if p.exitcode != 0]
        if failed:
            raise RuntimeError(f"{len(failed)} shared-memory worker(s) failed: exit codes {failed}")

        # Copy out before the segment is released below.
        return shared_array.copy()
    finally:
        # Release and remove the segment on success and on error alike, so it
        # does not outlive the process.
        shm.close()
        shm.unlink()

# multiprocessing.Process children may re-import this module ("spawn" start
# method); the guard keeps them from re-running the driver.
if __name__ == "__main__":
    data = np.random.randn(1_000_000).astype(np.float64)
    result = process_with_shared_memory(data)
    print(f"Shared memory processing complete: {result[:5]}")

6. 非同期プログラミング(asyncio)

6.1 async/awaitパターン

import asyncio
import time

async def fetch_embeddings(text: str, model: str = "text-embedding-3-small") -> list:
    """Simulate an asynchronous embedding request.

    Waits 0.1 seconds in place of network latency and returns a fixed vector;
    ``text`` and ``model`` are accepted but not used.
    """
    simulated_latency = 0.1
    await asyncio.sleep(simulated_latency)
    embedding = [0.1, 0.2, 0.3]
    return embedding

async def sequential_processing(texts: list) -> list:
    """Fetch embeddings one after another, awaiting each before starting the next."""
    return [await fetch_embeddings(text) for text in texts]

async def parallel_processing(texts: list) -> list:
    """Fetch all embeddings concurrently and return them in input order."""
    gathered = await asyncio.gather(*(fetch_embeddings(text) for text in texts))
    return list(gathered)

async def compare_approaches():
    """Time sequential versus concurrent fetching for 20 sample texts and print the speed-up."""
    texts = [f"sample text {i}" for i in range(20)]

    tick = time.perf_counter()
    sequential_results = await sequential_processing(texts)
    t_seq = time.perf_counter() - tick

    tick = time.perf_counter()
    parallel_results = await parallel_processing(texts)
    t_par = time.perf_counter() - tick

    report = (
        f"Sequential: {t_seq:.2f}s ({len(sequential_results)} results)",
        f"Parallel:   {t_par:.2f}s ({len(parallel_results)} results)",
        f"Speedup: {t_seq/t_par:.1f}x",
    )
    for line in report:
        print(line)

asyncio.run(compare_approaches())

6.2 aiohttpによる非同期HTTP

import asyncio
import aiohttp
import time
from typing import Optional

class AsyncAIClient:
    """Asynchronous AI API client with a concurrency limit.

    Use it as an async context manager: the HTTP session is created on entry
    and closed on exit.
    """

    def __init__(self, api_key: str, base_url: str, max_concurrent: int = 10):
        """Store the credentials and create the semaphore that caps in-flight requests."""
        self.api_key = api_key
        self.base_url = base_url
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self):
        connector = aiohttp.TCPConnector(limit=100)
        self.session = aiohttp.ClientSession(
            connector=connector,
            headers={"Authorization": f"Bearer {self.api_key}"}
        )
        return self

    async def __aexit__(self, *args):
        if self.session:
            await self.session.close()
            # Drop the closed session so later misuse fails with a clear error.
            self.session = None

    async def get_embedding(self, text: str, retry: int = 3) -> list:
        """Request one embedding, retrying on rate limits and client errors.

        Args:
            text: Input text to embed.
            retry: Maximum number of attempts.

        Returns:
            The embedding vector, or ``[]`` when every attempt ended without a
            200 response and without an error (for example repeated 429s).

        Raises:
            RuntimeError: If the client is used outside ``async with``.
            aiohttp.ClientError: If the final attempt fails with a client error.
        """
        if self.session is None:
            raise RuntimeError(
                "AsyncAIClient has no open session; use it inside 'async with'"
            )

        async with self.semaphore:
            for attempt in range(retry):
                is_last_attempt = attempt == retry - 1
                try:
                    async with self.session.post(
                        f"{self.base_url}/embeddings",
                        json={"input": text, "model": "text-embedding-3-small"}
                    ) as response:
                        if response.status == 200:
                            data = await response.json()
                            return data["data"][0]["embedding"]
                        if response.status != 429:
                            response.raise_for_status()
                            # Non-200 status that is not an error: try again.
                            continue
                except aiohttp.ClientError:
                    if is_last_attempt:
                        raise
                    await asyncio.sleep(1)
                    continue

                # Rate limited (429): exponential back-off. The response has
                # been released by now, and no time is spent sleeping after
                # the final attempt.
                if not is_last_attempt:
                    await asyncio.sleep(2 ** attempt)
        return []

    async def batch_embeddings(self, texts: list, batch_size: int = 100) -> list:
        """Embed ``texts`` in batches of ``batch_size`` concurrent requests.

        Returns:
            One entry per input text, in order. A failed request is reported on
            stdout and contributes ``None``.
        """
        all_results = []

        for i in range(0, len(texts), batch_size):
            batch = texts[i:i + batch_size]
            tasks = [self.get_embedding(text) for text in batch]
            batch_results = await asyncio.gather(*tasks, return_exceptions=True)

            for j, result in enumerate(batch_results):
                if isinstance(result, Exception):
                    print(f"Error for text {i+j}: {result}")
                    all_results.append(None)
                else:
                    all_results.append(result)

        return all_results

6.3 AI API用動的バッチプロセッサー

import asyncio
from dataclasses import dataclass, field
from typing import Any
import time

@dataclass
class BatchProcessor:
    """Dynamic request batcher.

    Callers submit single requests through ``process_request``; one
    ``batch_worker`` task groups them into batches of at most
    ``max_batch_size`` items, waiting at most ``max_wait_time`` seconds after
    the first item of a batch arrives.
    """
    max_batch_size: int = 20
    max_wait_time: float = 0.05  # 50ms
    queue: asyncio.Queue = field(default_factory=asyncio.Queue)

    async def process_request(self, request_id: str, data: Any) -> Any:
        """Enqueue one request and wait for its result from the batch worker."""
        future = asyncio.get_running_loop().create_future()
        await self.queue.put((request_id, data, future))
        return await future

    async def batch_worker(self):
        """Collect queued requests into batches and resolve their futures. Runs until cancelled."""
        while True:
            # Block until there is work instead of waking up periodically.
            batch = [await self.queue.get()]
            deadline = time.perf_counter() + self.max_wait_time

            while len(batch) < self.max_batch_size:
                timeout = deadline - time.perf_counter()
                if timeout <= 0:
                    break
                try:
                    item = await asyncio.wait_for(self.queue.get(), timeout=timeout)
                    batch.append(item)
                except asyncio.TimeoutError:
                    break

            _request_ids, data_list, futures = zip(*batch)

            try:
                results = await self._call_api_batch(list(data_list))
                if len(results) != len(futures):
                    # A short result list would leave some callers waiting forever.
                    raise RuntimeError(
                        f"batch API returned {len(results)} results for {len(futures)} requests"
                    )
            except Exception as e:
                for future in futures:
                    if not future.done():
                        future.set_exception(e)
                continue

            for future, result in zip(futures, results):
                # A caller may have been cancelled while waiting. Resolving its
                # future would raise InvalidStateError and stop the worker.
                if not future.done():
                    future.set_result(result)

    async def _call_api_batch(self, data_list: list) -> list:
        """Simulated batch API call: one result string per input item."""
        await asyncio.sleep(0.1)  # simulated API latency
        return [f"result_{d}" for d in data_list]

async def test_batch_processor():
    """Send 50 concurrent requests through a BatchProcessor and print the timing."""
    processor = BatchProcessor(max_batch_size=10, max_wait_time=0.05)
    worker_task = asyncio.create_task(processor.batch_worker())

    async def make_request(i):
        response = await processor.process_request(f"req_{i}", f"data_{i}")
        return response

    began = time.perf_counter()
    results = await asyncio.gather(*(make_request(i) for i in range(50)))
    elapsed = time.perf_counter() - began

    print(f"50 requests processed: {elapsed:.3f}s")
    print(f"First 5 results: {results[:5]}")

    # The worker loops forever; stop it once all requests are answered.
    worker_task.cancel()

asyncio.run(test_batch_processor())

7. メモリ管理

7.1 Pythonメモリモデル

import sys
import gc

data_types = {
    'int': 42,
    'float': 3.14,
    'str (small)': "hello",
    'str (large)': "hello" * 1000,
    'list (empty)': [],
    'dict (empty)': {},
    'tuple (empty)': (),
}

print("Object sizes:")
for name, obj in data_types.items():
    print(f"  {name}: {sys.getsizeof(obj)} bytes")

def deep_size(obj, seen=None):
    """Return the total ``sys.getsizeof`` of ``obj`` and everything it contains.

    Dicts contribute their keys and values, other iterables contribute their
    items. Each object is counted once, tracked by ``id``. Strings, bytes and
    one-shot iterators (generators, ``iter(...)`` objects) are measured
    shallowly, because iterating an iterator would consume it.

    Args:
        obj: Object to measure.
        seen: Set of already-counted object ids; used by the recursion.

    Returns:
        Size in bytes.
    """
    from collections.abc import Iterator

    if seen is None:
        seen = set()
    obj_id = id(obj)
    if obj_id in seen:
        return 0
    seen.add(obj_id)
    size = sys.getsizeof(obj)
    if isinstance(obj, dict):
        size += sum(deep_size(k, seen) + deep_size(v, seen)
                    for k, v in obj.items())
    elif isinstance(obj, (str, bytes, Iterator)):
        # Measured shallowly; see the docstring.
        pass
    elif hasattr(obj, '__iter__'):
        size += sum(deep_size(item, seen) for item in obj)
    return size

nested = {'data': [i for i in range(1000)], 'meta': {'name': 'test'}}
print(f"\nNested dict size: {deep_size(nested):,} bytes")

7.2 メモリ効率のためのジェネレータ

import numpy as np
from typing import Generator, Iterator

def dataset_generator(
    file_paths: list,
    batch_size: int = 32
) -> Generator:
    """Yield mini-batches lazily, one simulated file at a time.

    For every entry in ``file_paths`` a random (1000, 128) feature array and
    1000 integer labels in [0, 10) are generated; the path itself is not read.

    Yields:
        ``(features, labels)`` slices of at most ``batch_size`` rows.
    """
    for _path in file_paths:
        features = np.random.randn(1000, 128)
        targets = np.random.randint(0, 10, 1000)
        total = len(features)

        for lo in range(0, total, batch_size):
            window = slice(lo, min(lo + batch_size, total))
            yield features[window], targets[window]

def augment_generator(gen: Iterator, noise_std: float = 0.01):
    """Chainable augmentation stage: add Gaussian noise to each feature batch.

    Yields:
        ``(noisy_features, labels)``; labels pass through unchanged.
    """
    for features, labels in gen:
        jitter = np.random.randn(*features.shape) * noise_std
        yield features + jitter, labels

def normalize_generator(gen: Iterator):
    """Normalisation stage: standardise every row of each feature batch.

    Yields:
        ``(normalised_features, labels)``; labels pass through unchanged.
    """
    for features, labels in gen:
        row_mean = features.mean(axis=1, keepdims=True)
        row_std = features.std(axis=1, keepdims=True)
        yield (features - row_mean) / (row_std + 1e-8), labels

# Assemble the pipeline: load -> augment -> normalise. Nothing is computed
# until the final generator is iterated.
fake_paths = [f"data_{i}.npy" for i in range(5)]
base_gen = dataset_generator(fake_paths, batch_size=32)
aug_gen = augment_generator(base_gen)
norm_gen = normalize_generator(aug_gen)

# Pull only the first three batches.
for i, (X_batch, y_batch) in enumerate(norm_gen):
    if i >= 3:
        break
    print(f"Batch {i}: shape={X_batch.shape}, mean={X_batch.mean():.4f}")

7.3 __slots__の最適化

import sys

class RegularEmbedding:
    """Embedding record stored in an ordinary per-instance ``__dict__``."""

    def __init__(self, vector, label, metadata):
        self.vector, self.label, self.metadata = vector, label, metadata

class SlottedEmbedding:
    """Embedding record using ``__slots__``, so instances carry no ``__dict__``."""

    __slots__ = ['vector', 'label', 'metadata']

    def __init__(self, vector, label, metadata):
        self.vector, self.label, self.metadata = vector, label, metadata

import numpy as np

n = 100_000
vector_size = 128

regular_objects = [
    RegularEmbedding(np.random.randn(vector_size), i % 10, {'source': 'test'})
    for i in range(n)
]

slotted_objects = [
    SlottedEmbedding(np.random.randn(vector_size), i % 10, {'source': 'test'})
    for i in range(n)
]

regular_size = sys.getsizeof(regular_objects[0]) + sys.getsizeof(regular_objects[0].__dict__)
slotted_size = sys.getsizeof(slotted_objects[0])

print(f"Regular class instance: {regular_size} bytes")
print(f"__slots__ class instance: {slotted_size} bytes")
print(f"Memory saved: {(regular_size - slotted_size) / regular_size * 100:.1f}%")

7.4 weakref

import weakref
import gc

class _WeakReferenceableDict(dict):
    """Dict subclass used for cached models.

    Plain ``dict`` instances cannot be weakly referenced, so storing one in a
    ``WeakValueDictionary`` raises ``TypeError``. Instances of a Python-level
    subclass can.
    """


class ModelCache:
    """Model cache that holds its entries through weak references.

    An entry disappears once no caller holds a strong reference to the model,
    so the cache never keeps a model alive on its own.
    """

    def __init__(self):
        self._cache = weakref.WeakValueDictionary()

    def get_or_load(self, model_name: str):
        """Return the cached model for ``model_name``, loading it if absent."""
        model = self._cache.get(model_name)
        if model is None:
            print(f"Loading model: {model_name}")
            model = self._load_model(model_name)
            self._cache[model_name] = model
        else:
            print(f"Returning from cache: {model_name}")
        return model

    def _load_model(self, model_name: str):
        """Simulate loading a model: a dict-like object with a name and random weights."""
        import numpy as np
        return _WeakReferenceableDict(
            name=model_name,
            weights=np.random.randn(100, 100),
        )

cache = ModelCache()

model = cache.get_or_load('resnet50')
print(f"Cache size: {len(cache._cache)}")

model_again = cache.get_or_load('resnet50')

del model
del model_again
gc.collect()

print(f"Cache size after GC: {len(cache._cache)}")

new_model = cache.get_or_load('resnet50')

8. Python型ヒント

8.1 mypyによる静的型チェック

pip install mypy
mypy your_script.py --strict
from typing import Optional, Union, List, Dict, Tuple, Callable
import numpy as np
from numpy.typing import NDArray

def compute_loss(
    predictions: NDArray[np.float32],
    targets: NDArray[np.float32],
    reduction: str = 'mean'
) -> float:
    """Compute the squared-error loss between predictions and targets.

    Args:
        predictions: Predicted values.
        targets: Ground-truth values, broadcastable against ``predictions``.
        reduction: ``'mean'`` divides the summed squared error by
            ``len(predictions)`` (the size of the first axis); ``'sum'``
            returns the plain sum.

    Returns:
        The loss as a Python float.

    Raises:
        ValueError: If ``reduction`` is neither ``'mean'`` nor ``'sum'``.
            A misspelt value used to be treated silently as ``'sum'``.
    """
    if reduction not in ('mean', 'sum'):
        raise ValueError(f"reduction must be 'mean' or 'sum', got {reduction!r}")

    diff = predictions - targets
    loss = np.sum(diff ** 2)
    if reduction == 'mean':
        return float(loss / len(predictions))
    return float(loss)

def load_checkpoint(
    path: str,
    device: Optional[str] = None,
    strict: bool = True
) -> Optional[Dict[str, NDArray]]:
    """Return a checkpoint dict, or ``None`` if the file is missing.

    This is a typing example: the body fabricates random weights and does not
    read ``path``, ``device`` or ``strict``.
    """
    try:
        checkpoint = {'weights': np.random.randn(100)}
    except FileNotFoundError:
        return None
    return checkpoint

def split_dataset(
    X: NDArray,
    y: NDArray,
    test_size: float = 0.2,
    random_state: Optional[int] = None
) -> Tuple[NDArray, NDArray, NDArray, NDArray]:
    """Shuffle and split ``X`` and ``y`` into train and test parts.

    Args:
        X: Samples, indexed along the first axis.
        y: Targets aligned with ``X``.
        test_size: Fraction of samples assigned to the test part (rounded down).
        random_state: Seed for a reproducible shuffle. When ``None`` the global
            NumPy RNG is used.

    Returns:
        ``(X_train, X_test, y_train, y_test)``.
    """
    n = len(X)
    n_test = int(n * test_size)

    if random_state is not None:
        # A private RandomState gives the same permutation as seeding the
        # global RNG, without resetting the global state for other code.
        indices = np.random.RandomState(random_state).permutation(n)
    else:
        indices = np.random.permutation(n)

    test_idx = indices[:n_test]
    train_idx = indices[n_test:]

    return X[train_idx], X[test_idx], y[train_idx], y[test_idx]

8.2 ジェネリクスとProtocol

from typing import TypeVar, Generic, Protocol, runtime_checkable, List, Dict
import numpy as np

T = TypeVar('T')


class DataLoader(Generic[T]):
    """Iterate over a list-like dataset in consecutive batches.

    The loader is its own iterator; calling ``iter()`` on it restarts from the
    beginning.
    """

    def __init__(self, dataset: List[T], batch_size: int = 32):
        """Store the dataset and batch size.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1. Such a loader
                would never advance and would yield empty batches forever.
        """
        if batch_size < 1:
            raise ValueError(f"batch_size must be at least 1, got {batch_size}")
        self.dataset = dataset
        self.batch_size = batch_size
        self._idx = 0

    def __iter__(self):
        self._idx = 0
        return self

    def __next__(self) -> List[T]:
        if self._idx >= len(self.dataset):
            raise StopIteration
        batch = self.dataset[self._idx:self._idx + self.batch_size]
        self._idx += self.batch_size
        return batch

# Structural interface for optimizers. Because of @runtime_checkable,
# isinstance() checks only that these method names exist, not their signatures.
@runtime_checkable
class Optimizer(Protocol):
    def step(self) -> None: ...
    def zero_grad(self) -> None: ...
    def state_dict(self) -> Dict: ...

# Structural interface for models. 'NDArray' is written as a string (forward
# reference), so the name does not have to be importable at definition time.
@runtime_checkable
class Model(Protocol):
    def forward(self, x: 'NDArray') -> 'NDArray': ...
    def parameters(self) -> List['NDArray']: ...
    def train(self) -> None: ...
    def eval(self) -> None: ...

8.3 dataclassとTypedDict

from dataclasses import dataclass, field
from typing import TypedDict, List, Dict, Optional

# Typed description of a plain-dict model configuration. All four keys are
# required, since the class does not set total=False.
class ModelConfig(TypedDict):
    hidden_size: int
    num_layers: int
    dropout: float
    activation: str

@dataclass
class ExperimentConfig:
    """Experiment settings, validated immediately after construction."""

    # Required identification
    experiment_name: str
    model_type: str

    # Training hyper-parameters
    learning_rate: float = 1e-3
    batch_size: int = 32
    max_epochs: int = 100
    seed: int = 42

    # Mutable defaults are created per instance via default_factory.
    tags: List[str] = field(default_factory=list)
    hyperparams: Dict[str, float] = field(default_factory=dict)

    def __post_init__(self):
        """Reject a non-positive learning rate and an empty experiment name."""
        rate = self.learning_rate
        if rate <= 0:
            raise ValueError(f"Learning rate must be positive: {rate}")
        name = self.experiment_name
        if not name:
            raise ValueError("Experiment name is required")

    def to_dict(self) -> Dict:
        """Return the configuration as a recursively converted plain dict."""
        import dataclasses
        return dataclasses.asdict(self)

config = ExperimentConfig(
    experiment_name="bert-finetuning-v1",
    model_type="bert-base",
    learning_rate=2e-5,
    batch_size=16,
    tags=["nlp", "classification"]
)

print(f"Experiment: {config.experiment_name}")
print(f"Config: {config.to_dict()}")

8.4 pydanticによるデータ検証

from typing import List, Optional

from pydantic import (
    BaseModel,
    Field,
    field_validator,
    model_validator,
    root_validator,
    validator,
)

class DataConfig(BaseModel):
    """Data-loading settings, written against the Pydantic v2 API."""

    data_path: str
    batch_size: int = Field(ge=1, le=1024, default=32)
    shuffle: bool = True
    num_workers: int = Field(ge=0, le=16, default=4)

    # field_validator is the Pydantic v2 replacement for the deprecated
    # @validator; v2 validators are declared as classmethods.
    @field_validator('data_path')
    @classmethod
    def path_must_exist(cls, v):
        """Require the path to exist; the literal "test_path" is always accepted."""
        import os
        if not os.path.exists(v) and v != "test_path":
            raise ValueError(f"Data path does not exist: {v}")
        return v

class ModelConfig(BaseModel):
    """Model architecture settings, written against the Pydantic v2 API."""

    architecture: str
    # Pydantic v2 renamed min_items to min_length.
    hidden_sizes: List[int] = Field(min_length=1)
    dropout_rate: float = Field(ge=0.0, le=0.9, default=0.1)
    activation: str = "relu"

    # field_validator is the Pydantic v2 replacement for the deprecated @validator.
    @field_validator('activation')
    @classmethod
    def valid_activation(cls, v):
        """Accept only the known activation names."""
        valid = ['relu', 'gelu', 'silu', 'tanh', 'sigmoid']
        if v not in valid:
            raise ValueError(f"Activation must be one of {valid}")
        return v

class TrainingConfig(BaseModel):
    # Top-level training settings: nested data/model configs plus optimiser
    # hyper-parameters, each range-checked by its Field constraints.
    data: DataConfig
    model: ModelConfig
    learning_rate: float = Field(gt=0, le=1.0, default=1e-3)
    weight_decay: float = Field(ge=0, default=1e-4)
    max_epochs: int = Field(ge=1, le=10000, default=100)

    # skip_on_failure=True is required for a post (pre=False) root validator
    # on pydantic 2.x, which otherwise rejects the class at definition time.
    # pydantic 1.x accepts the same argument. The check then runs only when
    # every field validated, so 'model' is a ModelConfig instance here.
    @root_validator(skip_on_failure=True)
    def check_consistency(cls, values):
        """Print a warning when the model's dropout rate exceeds 0.5.

        The check never fails validation; the values dict is returned
        unchanged.
        """
        model = values.get('model')
        if model and model.dropout_rate > 0.5:
            print("Warning: High dropout rate may cause underfitting")
        return values

from pydantic import ValidationError

# Only model construction can fail validation, so it alone sits in the try
# body; the success path goes in `else` so unrelated errors are not
# mislabelled as validation errors.
try:
    config = TrainingConfig(
        data=DataConfig(data_path="test_path", batch_size=64),
        model=ModelConfig(
            architecture="transformer",
            hidden_sizes=[512, 256, 128],
            dropout_rate=0.1
        ),
        learning_rate=2e-5
    )
except ValidationError as e:
    print(f"Validation error: {e}")
else:
    print(f"Validation passed: {config.model.architecture}")
    # pydantic 2.x: .json() no longer accepts json.dumps kwargs such as
    # indent; model_dump_json is the supported serializer.
    print(f"JSON: {config.model_dump_json(indent=2)}")

9. パッケージ管理と依存関係

9.1 Poetry

# Install Poetry and scaffold a new project
curl -sSL https://install.python-poetry.org | python3 -
poetry new my-ml-project
cd my-ml-project

# Add runtime dependencies, then development-only ones in the "dev" group
poetry add numpy torch torchvision
poetry add --group dev pytest black ruff mypy

# Activate the virtual environment
# NOTE(review): newer Poetry releases provide `shell` and `export` as
# plugins rather than built-in commands - confirm against the installed version.
poetry shell

# Install all dependencies
poetry install

# Export to requirements.txt
poetry export -f requirements.txt --output requirements.txt

9.2 pyproject.toml

# Package metadata read by Poetry.
[tool.poetry]
name = "my-ml-project"
version = "0.1.0"
description = "AI/ML project"
authors = ["Your Name <email@example.com>"]
license = "MIT"
readme = "README.md"
packages = [{include = "my_ml_project"}]

# Runtime dependencies (caret constraints).
# NOTE(review): pydantic is pinned to 2.x here; confirm that example code
# using the v1-style validator API is meant to run against 2.x.
[tool.poetry.dependencies]
python = "^3.10"
numpy = "^1.24"
torch = "^2.1"
torchvision = "^0.16"
transformers = "^4.35"
pydantic = "^2.0"
aiohttp = "^3.9"

# Development-only dependencies (the "dev" group).
[tool.poetry.group.dev.dependencies]
pytest = "^7.4"
pytest-cov = "^4.1"
black = "^23.0"
ruff = "^0.1"
mypy = "^1.6"
pre-commit = "^3.5"

# Build backend used to build the package.
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

# Formatter settings.
[tool.black]
line-length = 88
target-version = ['py310', 'py311']

# Linter settings: enabled rule families, with E501 excluded.
[tool.ruff]
select = ["E", "F", "I", "N", "W"]
ignore = ["E501"]
line-length = 88

# Static type-checker settings.
[tool.mypy]
python_version = "3.10"
warn_return_any = true
warn_unused_configs = true
disallow_untyped_defs = true

# pytest defaults: test directory and always-on coverage options.
[tool.pytest.ini_options]
testpaths = ["tests"]
addopts = "-v --cov=my_ml_project --cov-report=html"

9.3 uv(高速パッケージマネージャー)

# Install uv (Rust-based, very fast)
curl -LsSf https://astral.sh/uv/install.sh | sh

# Initialise a project
uv init my-project
cd my-project

# Add dependencies (10-100x faster than pip)
uv add numpy torch transformers
uv add --dev pytest ruff mypy

# Sync the environment
uv sync

# Manage the Python version
uv python install 3.11
uv python pin 3.11

# Run a script
uv run python train.py

10. MLのためのクリーンコード

10.1 デザインパターン(ファクトリー、ストラテジー)

from abc import ABC, abstractmethod
from typing import Dict, Type
import numpy as np

# ストラテジーパターン — 損失関数
class LossFunction(ABC):
    """Strategy interface: a scalar loss plus its gradient.

    ``gradient`` must be the derivative of the value returned by
    ``__call__`` with respect to ``predictions``.
    """

    @abstractmethod
    def __call__(self, predictions: np.ndarray, targets: np.ndarray) -> float:
        """Return the scalar loss of ``predictions`` against ``targets``."""

    @abstractmethod
    def gradient(self, predictions: np.ndarray, targets: np.ndarray) -> np.ndarray:
        """Return d(loss)/d(predictions), one entry per prediction element."""

class MSELoss(LossFunction):
    """Mean squared error, averaged over every element."""

    def __call__(self, predictions, targets):
        """Return ``mean((predictions - targets) ** 2)`` as a Python float."""
        return float(np.mean((predictions - targets) ** 2))

    def gradient(self, predictions, targets):
        """Return the gradient of the mean squared error."""
        diff = predictions - targets
        # np.mean divides by the total element count, so the gradient must
        # as well; len() would only count the first axis of a batched input.
        return 2 * diff / np.size(diff)

class HuberLoss(LossFunction):
    """Huber loss: quadratic for ``|diff| <= delta``, linear beyond it."""

    def __init__(self, delta: float = 1.0):
        """Store ``delta``, the threshold between the two regimes."""
        self.delta = delta

    def __call__(self, predictions, targets):
        """Return the element-wise Huber loss averaged over all elements."""
        diff = predictions - targets
        is_small = np.abs(diff) <= self.delta
        loss = np.where(is_small, 0.5 * diff**2,
                       self.delta * np.abs(diff) - 0.5 * self.delta**2)
        return float(np.mean(loss))

    def gradient(self, predictions, targets):
        """Return the gradient of the mean Huber loss."""
        diff = predictions - targets
        per_element = np.where(np.abs(diff) <= self.delta, diff,
                               self.delta * np.sign(diff))
        # __call__ returns a mean, so each element's derivative carries a
        # 1/N factor (the same scaling MSELoss.gradient applies).
        return per_element / np.size(diff)

# ファクトリーパターン — モデルの作成
class ModelFactory:
    """Registry mapping a name to a model class, with a builder method."""

    # Shared by the whole class: classes are added via register().
    _registry: Dict[str, Type] = {}

    @classmethod
    def register(cls, name: str):
        """Return a class decorator that stores the class under ``name``."""
        def decorator(model_class):
            cls._registry.update({name: model_class})
            return model_class
        return decorator

    @classmethod
    def create(cls, name: str, **kwargs):
        """Instantiate the class registered under ``name`` with ``kwargs``.

        Raises:
            ValueError: if nothing is registered under ``name``.
        """
        registry = cls._registry
        if name in registry:
            model_class = registry[name]
            return model_class(**kwargs)

        available = list(registry)
        raise ValueError(f"Unknown model: {name}. Available: {available}")

@ModelFactory.register("linear")
class LinearModel:
    """Single affine layer computing ``X @ W + b``."""

    def __init__(self, input_size: int, output_size: int):
        """Draw small random weights and start the bias at zero."""
        weight_shape = (input_size, output_size)
        self.W = 0.01 * np.random.randn(*weight_shape)
        self.b = np.zeros(output_size)

    def forward(self, X: np.ndarray) -> np.ndarray:
        """Apply the affine map to ``X`` and return the result."""
        projected = np.matmul(X, self.W)
        return projected + self.b

@ModelFactory.register("mlp")
class MLPModel:
    """Stack of affine layers with ReLU between them (none after the last)."""

    def __init__(self, layer_sizes: list):
        """Create one ``(W, b)`` pair per consecutive pair of sizes."""
        self.layers = []
        for fan_in, fan_out in zip(layer_sizes, layer_sizes[1:]):
            weights = np.random.randn(fan_in, fan_out) * 0.01
            bias = np.zeros(fan_out)
            self.layers.append((weights, bias))

    def forward(self, X: np.ndarray) -> np.ndarray:
        """Run ``X`` through every layer and return the final output."""
        last_index = len(self.layers) - 1
        activations = X
        for index, (weights, bias) in enumerate(self.layers):
            activations = activations @ weights + bias
            # ReLU on every layer except the output layer.
            if index != last_index:
                activations = np.maximum(0, activations)
        return activations

# Build the class registered under "mlp" and run one forward pass.
model = ModelFactory.create("mlp", layer_sizes=[128, 256, 10])
X = np.random.randn(32, 128)  # random input: 32 rows of 128 values
output = model.forward(X)
print(f"Model output shape: {output.shape}")

10.2 pytestを使ったMLテスト

# tests/test_models.py
import pytest
import numpy as np
from numpy.testing import assert_array_almost_equal

class TestLossFunctions:
    """Checks for the MSE loss and for the linear model's output shape."""

    # NOTE(review): MSELoss and LinearModel are used below but are not among
    # this snippet's imports; import them from the module under test.

    def test_mse_zero_loss(self):
        """A perfect prediction must give zero loss."""
        values = np.array([1.0, 2.0, 3.0])
        assert abs(MSELoss()(values, values)) < 1e-10

    def test_mse_positive(self):
        """MSE must never be negative."""
        predictions = np.random.randn(100)
        targets = np.random.randn(100)
        assert MSELoss()(predictions, targets) >= 0

    @pytest.mark.parametrize("batch_size", [1, 16, 32, 128])
    def test_batch_sizes(self, batch_size):
        """The model must handle a range of batch sizes."""
        X = np.random.randn(batch_size, 64)
        output = LinearModel(64, 10).forward(X)
        expected_shape = (batch_size, 10)
        assert output.shape == expected_shape

    def test_numerical_gradient(self):
        """The analytical gradient must match a central-difference estimate."""
        loss_fn = MSELoss()
        pred = np.array([0.5, 0.3, 0.2])
        target = np.array([0.6, 0.2, 0.1])

        analytical_grad = loss_fn.gradient(pred, target)

        eps = 1e-5
        numerical_grad = np.zeros_like(pred)
        for index in range(pred.size):
            # Perturb exactly one coordinate by +/- eps.
            offset = np.zeros_like(pred)
            offset[index] = eps
            loss_up = loss_fn(pred + offset, target)
            loss_down = loss_fn(pred - offset, target)
            numerical_grad[index] = (loss_up - loss_down) / (2 * eps)

        assert_array_almost_equal(analytical_grad, numerical_grad, decimal=5)

# 実行方法:
# pytest tests/ -v --cov=src --cov-report=html -x

10.3 ロギングのベストプラクティス

import logging
import sys
from pathlib import Path
from typing import Optional

def setup_logger(
    name: str,
    level: int = logging.INFO,
    log_file: Optional[str] = None,
    format_str: Optional[str] = None
) -> logging.Logger:
    """Return the logger called ``name`` with console and optional file output.

    Safe to call repeatedly for the same name: no handler is attached twice,
    and a ``log_file`` passed on a later call is still honoured.

    Args:
        name: Logger name passed to ``logging.getLogger``.
        level: Level set on the logger on every call.
        log_file: Optional path of a log file; missing parent directories
            are created.
        format_str: Optional record format; a timestamp/name/level/message
            layout is used when omitted.

    Returns:
        The configured ``logging.Logger``.
    """
    import os  # local import: only needed to normalise the log-file path

    if format_str is None:
        format_str = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"

    formatter = logging.Formatter(format_str, datefmt='%Y-%m-%d %H:%M:%S')

    logger = logging.getLogger(name)
    logger.setLevel(level)

    # A console handler is added only to a logger that has no handlers yet,
    # so repeated calls do not duplicate console output.
    if not logger.handlers:
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setFormatter(formatter)
        logger.addHandler(console_handler)

    # The file handler is checked separately: previously a log_file passed
    # for an already-configured logger was silently ignored.
    if log_file:
        # FileHandler stores its path as an absolute path in baseFilename.
        target = os.path.abspath(log_file)
        already_attached = any(
            isinstance(handler, logging.FileHandler)
            and handler.baseFilename == target
            for handler in logger.handlers
        )
        if not already_attached:
            Path(log_file).parent.mkdir(parents=True, exist_ok=True)
            # Explicit encoding so non-ASCII messages do not depend on the
            # platform's locale default.
            file_handler = logging.FileHandler(log_file, encoding="utf-8")
            file_handler.setFormatter(formatter)
            logger.addHandler(file_handler)

    return logger

class TrainingLogger:
    """Thin wrapper that writes training progress through one logger."""

    def __init__(self, experiment_name: str, log_dir: str = "logs"):
        """Create the experiment's logger and start the step counter at 0."""
        logger_name = f"training.{experiment_name}"
        log_path = f"{log_dir}/{experiment_name}.log"
        self.logger = setup_logger(logger_name, log_file=log_path)
        self.step = 0

    def log_metrics(self, metrics: dict, phase: str = "train"):
        """Log every metric with four decimals, then advance the step."""
        parts = [f"{k}={v:.4f}" for k, v in metrics.items()]
        metrics_str = ", ".join(parts)
        message = f"[{phase}] step={self.step} {metrics_str}"
        self.logger.info(message)
        self.step += 1

    def log_epoch(self, epoch: int, train_loss: float, val_loss: float):
        """Log one line with the epoch's training and validation loss."""
        summary = f"Epoch {epoch}: train_loss={train_loss:.4f}, val_loss={val_loss:.4f}"
        self.logger.info(summary)

    def log_hyperparams(self, params: dict):
        """Log the hyperparameter dictionary as-is."""
        message = f"Hyperparameters: {params}"
        self.logger.info(message)

# Create a logger for this experiment and record the hyperparameters once.
logger = TrainingLogger("bert-finetuning-v1")
logger.log_hyperparams({"lr": 2e-5, "batch_size": 32, "epochs": 10})

# Simulated run: 3 epochs of 10 steps, logging synthetic metrics per step
# and a loss summary at the end of each epoch.
for epoch in range(3):
    for step in range(10):
        logger.log_metrics(
            {"loss": 2.0 - epoch * 0.3 - step * 0.01, "acc": 0.5 + epoch * 0.1},
            phase="train"
        )
    logger.log_epoch(epoch, 1.5 - epoch * 0.3, 1.8 - epoch * 0.25)

まとめ

このガイドで解説したテクニックを実際のAI/MLプロジェクトに適用する際は、以下の優先順位に従いましょう:

  1. まずプロファイリング:最適化前に必ずcProfileとline_profilerでボトルネックを特定する。

  2. NumPyでベクトル化:ほとんどの場合、ベクトル化だけで10〜100倍の高速化が実現できる。

  3. ホットループにNumbaを適用:純粋なPythonループが本当に避けられない場合にNumba JITを使用する。

  4. CPUバウンドな処理にはマルチプロセッシングを使用:データ前処理やクロスバリデーションなどの独立したタスクにProcessPoolExecutorを活用する。

  5. I/Oバウンドなパイプラインにはasyncioを使用:API呼び出しやファイルI/Oが中心のワークフローでは、10倍以上のスループット向上が期待できる。

  6. 型ヒントとテスト:mypy、pytest、pydanticを組み合わせて、安全にスケールできる堅牢なMLコードベースを維持する。


参考文献