Author: Youngju Kim (@fjvbn20031)
Overview
Python is the lingua franca of the AI/ML ecosystem. Plain Python code, however, has inherent performance limits that turn into bottlenecks in large-scale data processing and model-training pipelines. This guide walks through advanced Python techniques for building production-grade AI/ML systems.
You will learn how to write code that runs hundreds of times faster than pure-Python loops and how to manage memory efficiently, with runnable samples throughout.
1. Python Performance Profiling
Always profile before you optimize. The iron rule: you cannot improve what you cannot measure.
1.1 cProfile and pstats
cProfile, part of the Python standard library, is the basic tool for function-level profiling.
import cProfile
import pstats
import io
import numpy as np

def slow_matrix_multiply(a, b):
    """Inefficient matrix multiplication"""
    rows_a = len(a)
    cols_a = len(a[0])
    cols_b = len(b[0])
    result = [[0] * cols_b for _ in range(rows_a)]
    for i in range(rows_a):
        for j in range(cols_b):
            for k in range(cols_a):
                result[i][j] += a[i][k] * b[k][j]
    return result

def profile_function():
    size = 100
    a = [[float(i + j) for j in range(size)] for i in range(size)]
    b = [[float(i * j + 1) for j in range(size)] for i in range(size)]
    slow_matrix_multiply(a, b)

# Run the profiler
pr = cProfile.Profile()
pr.enable()
profile_function()
pr.disable()

# Analyze the results
stream = io.StringIO()
ps = pstats.Stats(pr, stream=stream)
ps.sort_stats('cumulative')
ps.print_stats(20)
print(stream.getvalue())
You can also run it directly from the command line:
python -m cProfile -s cumulative your_script.py
python -m cProfile -o output.prof your_script.py
To analyze a saved .prof file:
import pstats

p = pstats.Stats('output.prof')
p.sort_stats('cumulative')
p.print_stats(10)                 # top 10 functions
p.print_callers('slow_function')  # who calls this function
p.print_callees('main')           # what this function calls
1.2 line_profiler (Line-Level Analysis)
Measures performance line by line rather than per function.
pip install line_profiler
from line_profiler import LineProfiler
import numpy as np

def process_batch(data):
    result = []
    for item in data:
        normalized = (item - item.mean()) / (item.std() + 1e-8)
        scaled = normalized * 2.0
        clipped = np.clip(scaled, -5, 5)
        result.append(clipped)
    return np.stack(result)

# Set up the profiler
lp = LineProfiler()
lp_wrapper = lp(process_batch)

# Run on sample data
data = [np.random.randn(1000) for _ in range(1000)]
lp_wrapper(data)

# Show the results
lp.print_stats()
In Jupyter notebooks you can use the magic commands instead:
%load_ext line_profiler
%lprun -f process_batch process_batch(data)
1.3 memory_profiler
Tracks memory usage line by line.
pip install memory_profiler
from memory_profiler import profile
import numpy as np

@profile
def memory_intensive_function(n=1_000_000):
    list_data = list(range(n))         # create a list
    arr = np.array(list_data)          # convert to a NumPy array
    del list_data                      # free the list
    squared = arr ** 2                 # allocate a new array
    filtered = squared[squared > 100]  # filter
    return filtered.sum()

result = memory_intensive_function()
print(f"Result: {result}")
python -m memory_profiler your_script.py
mprof run your_script.py
mprof plot  # visualize
1.4 py-spy (Sampling Profiler)
A powerful tool that profiles a running Python process without any code changes.
pip install py-spy
# Profile a running process
py-spy top --pid 12345
# Generate a flame graph
py-spy record -o profile.svg --pid 12345
# Profile a script directly
py-spy record -o profile.svg -- python your_script.py
1.5 SnakeViz Visualization
SnakeViz renders a saved cProfile output file as an interactive icicle or sunburst chart in the browser:
pip install snakeviz
python -m cProfile -o output.prof your_script.py
snakeviz output.prof
2. A Deep Dive into NumPy Vectorization
NumPy vectorization is the cornerstone of Python performance optimization for AI/ML.
2.1 Benchmarking For Loops vs. Vectorization
import numpy as np
import time

def benchmark(func, *args, runs=5):
    times = []
    for _ in range(runs):
        start = time.perf_counter()
        result = func(*args)
        end = time.perf_counter()
        times.append(end - start)
    return np.mean(times), np.std(times), result

# 1. Pure-Python for loop
def relu_loop(x):
    result = []
    for val in x:
        result.append(max(0, val))
    return result

# 2. NumPy vectorization
def relu_numpy(x):
    return np.maximum(0, x)

# 3. NumPy clip
def relu_clip(x):
    return np.clip(x, 0, None)

n = 1_000_000
data_list = [np.random.randn() for _ in range(n)]
data_np = np.array(data_list)

t_loop, _, _ = benchmark(relu_loop, data_list)
t_numpy, _, _ = benchmark(relu_numpy, data_np)
t_clip, _, _ = benchmark(relu_clip, data_np)

print(f"Python loop: {t_loop:.4f}s")
print(f"NumPy maximum: {t_numpy:.4f}s ({t_loop/t_numpy:.0f}x faster)")
print(f"NumPy clip: {t_clip:.4f}s ({t_loop/t_clip:.0f}x faster)")
2.2 Universal Functions (ufuncs)
import numpy as np

def custom_activation_scalar(x, alpha=0.1):
    """ELU activation function"""
    if x >= 0:
        return x
    else:
        return alpha * (np.exp(x) - 1)

# np.vectorize wraps the scalar function in a Python loop: convenient, but not compiled
elu_ufunc = np.vectorize(custom_activation_scalar)

# Fully vectorized NumPy version (much faster)
def elu_numpy(x, alpha=0.1):
    return np.where(x >= 0, x, alpha * (np.exp(x) - 1))

x = np.random.randn(1_000_000)
t_vectorize, _, _ = benchmark(elu_ufunc, x)  # benchmark() from section 2.1
t_numpy, _, _ = benchmark(elu_numpy, x)
print(f"np.vectorize: {t_vectorize:.4f}s")
print(f"np.where: {t_numpy:.4f}s")
2.3 np.einsum (Einstein Summation Notation)
import numpy as np

batch_size, seq_len, d_model = 32, 128, 512
heads, d_head = 8, 64

A = np.random.randn(batch_size, seq_len, d_model)
W = np.random.randn(d_model, d_model)

# Standard matrix product
result_matmul = A @ W  # (32, 128, 512)

# The same operation with einsum
result_einsum = np.einsum('bsi,ij->bsj', A, W)

# Attention scores (batched matrix product)
Q = np.random.randn(batch_size, heads, seq_len, d_head)
K = np.random.randn(batch_size, heads, seq_len, d_head)
scores_manual = Q @ K.transpose(0, 1, 3, 2)  # (32, 8, 128, 128)
scores_einsum = np.einsum('bhid,bhjd->bhij', Q, K)

# Batched outer product
a = np.random.randn(batch_size, d_model)
b = np.random.randn(batch_size, d_model)
outer = np.einsum('bi,bj->bij', a, b)  # (32, 512, 512)

# Trace (sum of diagonal elements)
M = np.random.randn(batch_size, d_model, d_model)
trace = np.einsum('bii->b', M)  # (32,)

print(f"Attention scores shape: {scores_einsum.shape}")
print(f"Outer product shape: {outer.shape}")
print(f"Trace shape: {trace.shape}")

# Use the optimize parameter for an optimal contraction path
result = np.einsum('bhid,bhjd->bhij', Q, K, optimize='optimal')
2.4 Stride Tricks
import numpy as np
from numpy.lib.stride_tricks import as_strided, sliding_window_view

arr = np.arange(20, dtype=float)
window_size = 5

# Modern approach (NumPy 1.20+)
windows = sliding_window_view(arr, window_size)
print(f"Sliding window shape: {windows.shape}")  # (16, 5)

# Moving average
moving_avg = windows.mean(axis=-1)
print(f"Moving average: {moving_avg[:5]}")

# 2D sliding window (e.g., as a convolution preprocessing step)
image = np.random.randn(64, 64)
kernel_size = (3, 3)
windows_2d = sliding_window_view(image, kernel_size)
print(f"2D window shape: {windows_2d.shape}")  # (62, 62, 3, 3)

# Non-overlapping chunks via reshape (no stride tricks needed)
batch = np.random.randn(1000, 10)
chunk_size = 50
n_chunks = len(batch) // chunk_size
chunks = batch[:n_chunks * chunk_size].reshape(n_chunks, chunk_size, -1)
print(f"Chunks shape: {chunks.shape}")  # (20, 50, 10)
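as_strided is imported above but not demonstrated. Below is a minimal sketch of overlapping windows with a custom step, reusing arr and window_size from the example; unlike sliding_window_view, as_strided does no bounds checking, so treat it as a sharp tool.

# Overlapping windows with a step of 2, built directly from strides.
# WARNING: as_strided performs no bounds checking; a wrong shape or stride
# silently exposes invalid memory. Treat the result as read-only.
step = 2
n_windows = (len(arr) - window_size) // step + 1
strided = as_strided(
    arr,
    shape=(n_windows, window_size),
    strides=(arr.strides[0] * step, arr.strides[0]),
)
print(f"Strided windows shape: {strided.shape}")  # (8, 5)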
3. Numba JIT Compilation
Numba compiles Python functions to machine code, delivering C/C++-class performance.
3.1 The @jit and @njit Decorators
from numba import jit, njit
import numpy as np
import time

# @jit: fallback mode (Python objects permitted)
@jit(nopython=False)
def sum_with_jit(arr):
    total = 0.0
    for i in range(len(arr)):
        total += arr[i]
    return total

# @njit: pure nopython mode (faster, recommended)
@njit
def sum_with_njit(arr):
    total = 0.0
    for i in range(len(arr)):
        total += arr[i]
    return total

@njit
def compute_distances(points):
    """Compute the pairwise Euclidean distance matrix"""
    n = len(points)
    distances = np.zeros((n, n))
    for i in range(n):
        for j in range(i + 1, n):
            diff = points[i] - points[j]
            dist = np.sqrt(np.sum(diff ** 2))
            distances[i, j] = dist
            distances[j, i] = dist
    return distances

arr = np.random.randn(1_000_000)

# The first call includes compilation time
_ = sum_with_njit(arr)  # warm-up

start = time.perf_counter()
result_numpy = np.sum(arr)
print(f"NumPy sum: {time.perf_counter() - start:.4f}s")

start = time.perf_counter()
result_njit = sum_with_njit(arr)
print(f"Numba njit: {time.perf_counter() - start:.4f}s")

# Distance computation on a point cloud
points = np.random.randn(500, 3).astype(np.float64)
_ = compute_distances(points)  # warm-up
start = time.perf_counter()
dist_matrix = compute_distances(points)
print(f"Numba distance matrix ({points.shape[0]}x{points.shape[0]}): {time.perf_counter() - start:.4f}s")
3.2 Type Inference and Explicit Type Signatures
from numba import njit, float32, float64
import numpy as np
import time

# An explicit signature avoids the first-call compilation delay
@njit(float64[:](float64[:], float64))
def scale_array_typed(arr, factor):
    return arr * factor

# Multiple type signatures
@njit([
    float32[:](float32[:], float32),
    float64[:](float64[:], float64)
])
def scale_multi_type(arr, factor):
    return arr * factor

@njit
def softmax_numba(x):
    """Numba-accelerated softmax"""
    max_val = np.max(x)
    exp_x = np.exp(x - max_val)
    return exp_x / np.sum(exp_x)

@njit
def batch_softmax(X):
    """Batched softmax over the rows of a 2-D array"""
    result = np.empty_like(X)
    for i in range(X.shape[0]):
        result[i] = softmax_numba(X[i])
    return result

X = np.random.randn(1000, 512).astype(np.float64)
_ = batch_softmax(X)  # warm-up
start = time.perf_counter()
result = batch_softmax(X)
print(f"Numba batch softmax: {time.perf_counter() - start:.4f}s")
3.3 Parallelization with @prange
from numba import njit, prange
import numpy as np
import time

@njit(parallel=True)
def parallel_normalize(X):
    """Row-wise normalization with automatic parallelization"""
    result = np.empty_like(X)
    for i in prange(X.shape[0]):  # parallelized automatically
        row = X[i]
        mean = row.mean()
        std = row.std()
        result[i] = (row - mean) / (std + 1e-8)
    return result

@njit(parallel=True)
def parallel_batch_process(data, weights):
    """Parallel weighted sum over a batch"""
    n_samples, n_features = data.shape
    results = np.zeros(n_samples)
    for i in prange(n_samples):
        dot = 0.0
        for j in range(n_features):
            dot += data[i, j] * weights[j]
        results[i] = dot
    return results

X = np.random.randn(10000, 512).astype(np.float64)
_ = parallel_normalize(X)  # warm-up
start = time.perf_counter()
result_parallel = parallel_normalize(X)
t_parallel = time.perf_counter() - start

@njit
def sequential_normalize(X):
    result = np.empty_like(X)
    for i in range(X.shape[0]):
        row = X[i]
        mean = row.mean()
        std = row.std()
        result[i] = (row - mean) / (std + 1e-8)
    return result

_ = sequential_normalize(X)  # warm-up
start = time.perf_counter()
result_seq = sequential_normalize(X)
t_seq = time.perf_counter() - start

print(f"Sequential: {t_seq:.4f}s")
print(f"Parallel: {t_parallel:.4f}s ({t_seq/t_parallel:.1f}x faster)")
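The thread count for parallel=True regions can be inspected and capped at runtime with Numba's get_num_threads/set_num_threads (the NUMBA_NUM_THREADS environment variable works as well); a quick sketch:

from numba import get_num_threads, set_num_threads

print(f"Default threads: {get_num_threads()}")
set_num_threads(4)         # cap parallel regions at 4 threads
_ = parallel_normalize(X)  # this call now uses at most 4 threads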
3.4 Numba Caching
from numba import njit
import numpy as np

# cache=True saves the compiled machine code to disk
@njit(cache=True)
def cached_computation(x, y):
    """Cached JIT function: later runs load the binary instantly"""
    result = np.empty(len(x))
    for i in range(len(x)):
        result[i] = x[i] ** 2 + y[i] ** 2
    return result

arr1 = np.random.randn(1000)
arr2 = np.random.randn(1000)
result = cached_computation(arr1, arr2)
4. Cython
Cython transpiles Python code to C for maximum performance.
4.1 Writing a .pyx File
# setup.py: Cython build configuration
from setuptools import setup, Extension
from Cython.Build import cythonize
import numpy as np

setup(
    ext_modules=cythonize(
        Extension(
            "fast_ops",
            ["fast_ops.pyx"],
            include_dirs=[np.get_include()],  # NumPy headers for cimport numpy
        ),
        compiler_directives={
            'language_level': '3',
            'boundscheck': False,
            'wraparound': False,
            'nonecheck': False,
        }
    )
)
An example fast_ops.pyx:
# fast_ops.pyx
# cython: language_level=3
# cython: boundscheck=False
# cython: wraparound=False
import numpy as np
cimport numpy as cnp
from libc.math cimport exp, sqrt

def relu_cython(cnp.ndarray[cnp.double_t, ndim=1] x):
    cdef int n = len(x)
    cdef cnp.ndarray[cnp.double_t, ndim=1] result = np.empty(n)
    cdef int i
    cdef double val
    for i in range(n):
        val = x[i]
        result[i] = val if val > 0 else 0.0
    return result

def softmax_cython(cnp.ndarray[cnp.double_t, ndim=1] x):
    cdef int n = len(x)
    cdef cnp.ndarray[cnp.double_t, ndim=1] result = np.empty(n)
    cdef double max_val = x[0]
    cdef double sum_exp = 0.0
    cdef int i
    for i in range(n):
        if x[i] > max_val:
            max_val = x[i]
    for i in range(n):
        result[i] = exp(x[i] - max_val)
        sum_exp += result[i]
    for i in range(n):
        result[i] /= sum_exp
    return result
Build and use:
python setup.py build_ext --inplace
import numpy as np
import fast_ops  # the compiled Cython module
x = np.random.randn(1_000_000)
result = fast_ops.relu_cython(x)
4.2 The IPython Cython Magic
%load_ext Cython
%%cython -a
# -a: annotation mode (yellow = Python interaction, white = pure C)
import numpy as np
cimport numpy as cnp

def fast_l2_norm(cnp.ndarray[cnp.double_t, ndim=1] v):
    cdef int n = len(v)
    cdef double total = 0.0
    cdef int i
    for i in range(n):
        total += v[i] * v[i]
    return total ** 0.5
5. Python Multiprocessing
5.1 Understanding the GIL
The GIL (Global Interpreter Lock) allows only one thread at a time to execute Python bytecode. For CPU-bound tasks, multithreading therefore provides no real parallelism; use multiprocessing instead.
import threading
import multiprocessing
import time
import numpy as np

def cpu_bound_task(n):
    total = 0
    for i in range(n):
        total += i ** 2
    return total

def run_threaded(n_tasks=4, n_per_task=1_000_000):
    threads = []
    start = time.perf_counter()
    for _ in range(n_tasks):
        t = threading.Thread(target=cpu_bound_task, args=(n_per_task,))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start

def run_multiprocessing(n_tasks=4, n_per_task=1_000_000):
    start = time.perf_counter()
    with multiprocessing.Pool(processes=n_tasks) as pool:
        results = pool.map(cpu_bound_task, [n_per_task] * n_tasks)
    return time.perf_counter() - start

if __name__ == "__main__":  # required on spawn-based platforms (Windows/macOS)
    t_threaded = run_threaded()
    t_mp = run_multiprocessing()
    print(f"Threading: {t_threaded:.2f}s")
    print(f"Multiprocessing: {t_mp:.2f}s ({t_threaded/t_mp:.1f}x faster)")
5.2 multiprocessing.Pool
import multiprocessing
import numpy as np

def preprocess_sample(args):
    idx, data, mean, std = args
    normalized = (data - mean) / (std + 1e-8)
    augmented = normalized + np.random.randn(*normalized.shape) * 0.01
    return idx, augmented

def preprocess_dataset_parallel(dataset, n_workers=None):
    if n_workers is None:
        n_workers = multiprocessing.cpu_count()
    mean = dataset.mean(axis=0)
    std = dataset.std(axis=0)
    args = [(i, dataset[i], mean, std) for i in range(len(dataset))]
    with multiprocessing.Pool(processes=n_workers) as pool:
        results = pool.map(preprocess_sample, args, chunksize=100)
    results.sort(key=lambda x: x[0])  # restore the original order
    return np.stack([r[1] for r in results])

if __name__ == "__main__":
    dataset = np.random.randn(10000, 128)
    processed = preprocess_dataset_parallel(dataset, n_workers=4)
    print(f"Processed dataset shape: {processed.shape}")
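When the dataset is too large to hold every processed result in memory at once, pool.imap_unordered streams results back as workers finish. A minimal sketch reusing preprocess_sample from above:

def preprocess_dataset_streaming(dataset, n_workers=4):
    """Yield (index, sample) pairs as soon as each worker finishes."""
    mean = dataset.mean(axis=0)
    std = dataset.std(axis=0)
    args = ((i, dataset[i], mean, std) for i in range(len(dataset)))
    with multiprocessing.Pool(processes=n_workers) as pool:
        # Results arrive in completion order, not index order
        for idx, sample in pool.imap_unordered(preprocess_sample, args, chunksize=100):
            yield idx, sample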
5.3 ProcessPoolExecutor
from concurrent.futures import ProcessPoolExecutor, as_completed
import numpy as np

def train_fold(fold_data):
    fold_idx, X_train, y_train, X_val, y_val = fold_data
    from sklearn.linear_model import LogisticRegression
    from sklearn.metrics import accuracy_score
    model = LogisticRegression(max_iter=1000)
    model.fit(X_train, y_train)
    val_pred = model.predict(X_val)
    accuracy = accuracy_score(y_val, val_pred)
    return fold_idx, accuracy, model

def parallel_cross_validation(X, y, n_folds=5):
    from sklearn.model_selection import KFold
    kf = KFold(n_splits=n_folds, shuffle=True, random_state=42)
    fold_data_list = []
    for fold_idx, (train_idx, val_idx) in enumerate(kf.split(X)):
        fold_data_list.append((
            fold_idx,
            X[train_idx], y[train_idx],
            X[val_idx], y[val_idx]
        ))
    results = {}
    with ProcessPoolExecutor(max_workers=n_folds) as executor:
        future_to_fold = {
            executor.submit(train_fold, fd): fd[0]
            for fd in fold_data_list
        }
        for future in as_completed(future_to_fold):
            fold_idx, accuracy, model = future.result()
            results[fold_idx] = {'accuracy': accuracy, 'model': model}
            print(f"Fold {fold_idx}: accuracy = {accuracy:.4f}")
    avg_accuracy = np.mean([r['accuracy'] for r in results.values()])
    print(f"\nMean accuracy: {avg_accuracy:.4f}")
    return results

if __name__ == "__main__":
    from sklearn.datasets import make_classification
    X, y = make_classification(n_samples=5000, n_features=20, random_state=42)
    results = parallel_cross_validation(X, y, n_folds=5)
5.4 Shared Memory
from multiprocessing import shared_memory
import numpy as np
import multiprocessing

def worker_shared_memory(shm_name, shape, dtype, start_idx, end_idx):
    # Attach to the existing shared block and square a slice in place
    existing_shm = shared_memory.SharedMemory(name=shm_name)
    arr = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)
    arr[start_idx:end_idx] = arr[start_idx:end_idx] ** 2
    existing_shm.close()

def process_with_shared_memory(data):
    shm = shared_memory.SharedMemory(create=True, size=data.nbytes)
    shared_array = np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)
    shared_array[:] = data[:]
    n_workers = multiprocessing.cpu_count()
    chunk_size = len(data) // n_workers
    processes = []
    for i in range(n_workers):
        start = i * chunk_size
        end = start + chunk_size if i < n_workers - 1 else len(data)
        p = multiprocessing.Process(
            target=worker_shared_memory,
            args=(shm.name, data.shape, data.dtype, start, end)
        )
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    result = shared_array.copy()
    shm.close()
    shm.unlink()
    return result

if __name__ == "__main__":
    data = np.random.randn(1_000_000).astype(np.float64)
    result = process_with_shared_memory(data)
    print(f"Shared memory processing complete: {result[:5]}")
6. Asynchronous Programming (asyncio)
6.1 The async/await Pattern
import asyncio
import time

async def fetch_embeddings(text: str, model: str = "text-embedding-3-small") -> list:
    """Simulate an async embedding request"""
    await asyncio.sleep(0.1)
    return [0.1, 0.2, 0.3]

async def sequential_processing(texts: list) -> list:
    results = []
    for text in texts:
        embedding = await fetch_embeddings(text)
        results.append(embedding)
    return results

async def parallel_processing(texts: list) -> list:
    tasks = [fetch_embeddings(text) for text in texts]
    results = await asyncio.gather(*tasks)
    return list(results)

async def compare_approaches():
    texts = [f"sample text {i}" for i in range(20)]
    start = time.perf_counter()
    sequential_results = await sequential_processing(texts)
    t_seq = time.perf_counter() - start
    start = time.perf_counter()
    parallel_results = await parallel_processing(texts)
    t_par = time.perf_counter() - start
    print(f"Sequential: {t_seq:.2f}s ({len(sequential_results)} results)")
    print(f"Parallel: {t_par:.2f}s ({len(parallel_results)} results)")
    print(f"Speedup: {t_seq/t_par:.1f}x")

asyncio.run(compare_approaches())
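If you want progress reporting instead of one big gather, asyncio.as_completed yields awaitables in completion order. A small sketch building on fetch_embeddings above:

async def with_progress(texts: list) -> list:
    tasks = [asyncio.create_task(fetch_embeddings(t)) for t in texts]
    results = []
    for i, coro in enumerate(asyncio.as_completed(tasks), start=1):
        results.append(await coro)  # completion order, not submission order
        if i % 10 == 0:
            print(f"{i}/{len(tasks)} embeddings complete")
    return results

# asyncio.run(with_progress([f"text {i}" for i in range(30)]))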
6.2 Async HTTP with aiohttp
import asyncio
import aiohttp
import time
from typing import Optional

class AsyncAIClient:
    """Async AI API client with rate limiting"""
    def __init__(self, api_key: str, base_url: str, max_concurrent: int = 10):
        self.api_key = api_key
        self.base_url = base_url
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self):
        connector = aiohttp.TCPConnector(limit=100)
        self.session = aiohttp.ClientSession(
            connector=connector,
            headers={"Authorization": f"Bearer {self.api_key}"}
        )
        return self

    async def __aexit__(self, *args):
        if self.session:
            await self.session.close()

    async def get_embedding(self, text: str, retry: int = 3) -> list:
        """Embedding request with exponential-backoff retry"""
        async with self.semaphore:
            for attempt in range(retry):
                try:
                    async with self.session.post(
                        f"{self.base_url}/embeddings",
                        json={"input": text, "model": "text-embedding-3-small"}
                    ) as response:
                        if response.status == 200:
                            data = await response.json()
                            return data["data"][0]["embedding"]
                        elif response.status == 429:
                            # Rate limited: back off exponentially
                            await asyncio.sleep(2 ** attempt)
                        else:
                            response.raise_for_status()
                except aiohttp.ClientError as e:
                    if attempt == retry - 1:
                        raise
                    await asyncio.sleep(1)
        return []

    async def batch_embeddings(self, texts: list, batch_size: int = 100) -> list:
        """Process embeddings in batches"""
        all_results = []
        for i in range(0, len(texts), batch_size):
            batch = texts[i:i + batch_size]
            tasks = [self.get_embedding(text) for text in batch]
            batch_results = await asyncio.gather(*tasks, return_exceptions=True)
            for j, result in enumerate(batch_results):
                if isinstance(result, Exception):
                    print(f"Error for text {i+j}: {result}")
                    all_results.append(None)
                else:
                    all_results.append(result)
        return all_results
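A usage sketch for the client above; the API key and base URL are placeholders, so substitute your provider's real values:

async def main():
    texts = [f"document {i}" for i in range(250)]
    async with AsyncAIClient(
        api_key="YOUR_API_KEY",                 # placeholder
        base_url="https://api.example.com/v1",  # placeholder endpoint
        max_concurrent=10,
    ) as client:
        embeddings = await client.batch_embeddings(texts, batch_size=100)
    ok = sum(e is not None for e in embeddings)
    print(f"{ok}/{len(texts)} embeddings retrieved")

# asyncio.run(main())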
6.3 A Dynamic Batch Processor for AI APIs
import asyncio
from dataclasses import dataclass, field
from typing import Any
import time

@dataclass
class BatchProcessor:
    """Dynamic request batcher"""
    max_batch_size: int = 20
    max_wait_time: float = 0.05  # 50 ms
    queue: asyncio.Queue = field(default_factory=asyncio.Queue)

    async def process_request(self, request_id: str, data: Any) -> Any:
        future = asyncio.get_running_loop().create_future()
        await self.queue.put((request_id, data, future))
        return await future

    async def batch_worker(self):
        while True:
            batch = []
            deadline = time.perf_counter() + self.max_wait_time
            while len(batch) < self.max_batch_size:
                timeout = deadline - time.perf_counter()
                if timeout <= 0:
                    break
                try:
                    item = await asyncio.wait_for(self.queue.get(), timeout=timeout)
                    batch.append(item)
                except asyncio.TimeoutError:
                    break
            if not batch:
                await asyncio.sleep(0.001)
                continue
            request_ids, data_list, futures = zip(*batch)
            try:
                results = await self._call_api_batch(list(data_list))
                for future, result in zip(futures, results):
                    future.set_result(result)
            except Exception as e:
                for future in futures:
                    future.set_exception(e)

    async def _call_api_batch(self, data_list: list) -> list:
        await asyncio.sleep(0.1)  # simulate API latency
        return [f"result_{d}" for d in data_list]

async def test_batch_processor():
    processor = BatchProcessor(max_batch_size=10, max_wait_time=0.05)
    worker_task = asyncio.create_task(processor.batch_worker())

    async def make_request(i):
        return await processor.process_request(f"req_{i}", f"data_{i}")

    start = time.perf_counter()
    tasks = [make_request(i) for i in range(50)]
    results = await asyncio.gather(*tasks)
    elapsed = time.perf_counter() - start
    print(f"50 requests processed: {elapsed:.3f}s")
    print(f"First 5 results: {results[:5]}")
    worker_task.cancel()

asyncio.run(test_batch_processor())
7. Memory Management
7.1 The Python Memory Model
import sys

data_types = {
    'int': 42,
    'float': 3.14,
    'str (small)': "hello",
    'str (large)': "hello" * 1000,
    'list (empty)': [],
    'dict (empty)': {},
    'tuple (empty)': (),
}

print("Object sizes:")
for name, obj in data_types.items():
    print(f"  {name}: {sys.getsizeof(obj)} bytes")

def deep_size(obj, seen=None):
    """Total memory of an object plus everything it references"""
    if seen is None:
        seen = set()
    obj_id = id(obj)
    if obj_id in seen:
        return 0
    seen.add(obj_id)
    size = sys.getsizeof(obj)
    if isinstance(obj, dict):
        size += sum(deep_size(k, seen) + deep_size(v, seen)
                    for k, v in obj.items())
    elif hasattr(obj, '__iter__') and not isinstance(obj, (str, bytes)):
        size += sum(deep_size(item, seen) for item in obj)
    return size

nested = {'data': [i for i in range(1000)], 'meta': {'name': 'test'}}
print(f"\nNested dict size: {deep_size(nested):,} bytes")
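Beyond per-object sizes, the standard-library tracemalloc module tracks allocations and reports current and peak traced memory; a minimal sketch:

import tracemalloc

tracemalloc.start()
big_list = [i ** 2 for i in range(100_000)]             # kept alive so it counts
nested_lists = [list(range(1000)) for _ in range(100)]  # ditto
current, peak = tracemalloc.get_traced_memory()
print(f"Current: {current / 1e6:.1f} MB, peak: {peak / 1e6:.1f} MB")
tracemalloc.stop()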
7.2 Generators for Memory Efficiency
import numpy as np
from typing import Generator, Iterator

def dataset_generator(
    file_paths: list,
    batch_size: int = 32
) -> Generator:
    """Memory-efficient data loader"""
    for path in file_paths:
        # Stand-in for np.load(path): generate fake data per file
        data = np.random.randn(1000, 128)
        labels = np.random.randint(0, 10, 1000)
        for start in range(0, len(data), batch_size):
            end = min(start + batch_size, len(data))
            yield data[start:end], labels[start:end]

def augment_generator(gen: Iterator, noise_std: float = 0.01):
    """Chained augmentation generator"""
    for X, y in gen:
        X_aug = X + np.random.randn(*X.shape) * noise_std
        yield X_aug, y

def normalize_generator(gen: Iterator):
    """Normalization generator"""
    for X, y in gen:
        X_norm = (X - X.mean(axis=1, keepdims=True)) / (X.std(axis=1, keepdims=True) + 1e-8)
        yield X_norm, y

# Compose the pipeline
fake_paths = [f"data_{i}.npy" for i in range(5)]
base_gen = dataset_generator(fake_paths, batch_size=32)
aug_gen = augment_generator(base_gen)
norm_gen = normalize_generator(aug_gen)

for i, (X_batch, y_batch) in enumerate(norm_gen):
    if i >= 3:
        break
    print(f"Batch {i}: shape={X_batch.shape}, mean={X_batch.mean():.4f}")
7.3 The __slots__ Optimization
import sys
import numpy as np

class RegularEmbedding:
    def __init__(self, vector, label, metadata):
        self.vector = vector
        self.label = label
        self.metadata = metadata

class SlottedEmbedding:
    __slots__ = ['vector', 'label', 'metadata']
    def __init__(self, vector, label, metadata):
        self.vector = vector
        self.label = label
        self.metadata = metadata

n = 100_000
vector_size = 128

regular_objects = [
    RegularEmbedding(np.random.randn(vector_size), i % 10, {'source': 'test'})
    for i in range(n)
]
slotted_objects = [
    SlottedEmbedding(np.random.randn(vector_size), i % 10, {'source': 'test'})
    for i in range(n)
]

# Per-instance overhead: a slotted instance carries no __dict__
regular_size = sys.getsizeof(regular_objects[0]) + sys.getsizeof(regular_objects[0].__dict__)
slotted_size = sys.getsizeof(slotted_objects[0])
print(f"Regular class instance: {regular_size} bytes")
print(f"__slots__ class instance: {slotted_size} bytes")
print(f"Memory saved: {(regular_size - slotted_size) / regular_size * 100:.1f}%")
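The trade-off: a __slots__ class rejects any attribute that was not declared, which catches typos early but rules out ad-hoc attributes. A quick demonstration:

e = SlottedEmbedding(np.zeros(3), 0, {'source': 'test'})
try:
    e.extra_field = 1  # not declared in __slots__, and there is no __dict__
except AttributeError as err:
    print(f"AttributeError: {err}")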
7.4 weakref
import weakref
import gc
import numpy as np

class LoadedModel(dict):
    """dict subclass: plain dicts cannot be weakly referenced, but subclasses can"""
    pass

class ModelCache:
    """Model cache backed by weak references"""
    def __init__(self):
        self._cache = weakref.WeakValueDictionary()

    def get_or_load(self, model_name: str):
        model = self._cache.get(model_name)
        if model is None:
            print(f"Loading model: {model_name}")
            model = self._load_model(model_name)
            self._cache[model_name] = model
        else:
            print(f"Returning from cache: {model_name}")
        return model

    def _load_model(self, model_name: str):
        return LoadedModel(name=model_name, weights=np.random.randn(100, 100))

cache = ModelCache()
model = cache.get_or_load('resnet50')
print(f"Cache size: {len(cache._cache)}")
model_again = cache.get_or_load('resnet50')

del model
del model_again
gc.collect()
print(f"Cache size after GC: {len(cache._cache)}")

new_model = cache.get_or_load('resnet50')  # reloaded after collection
8. Python Type Hints
8.1 Static Type Checking with mypy
pip install mypy
mypy your_script.py --strict
from typing import Optional, Dict, Tuple
import numpy as np
from numpy.typing import NDArray

def compute_loss(
    predictions: NDArray[np.float32],
    targets: NDArray[np.float32],
    reduction: str = 'mean'
) -> float:
    diff = predictions - targets
    loss = np.sum(diff ** 2)
    if reduction == 'mean':
        return float(loss / len(predictions))
    return float(loss)

def load_checkpoint(
    path: str,
    device: Optional[str] = None,
    strict: bool = True
) -> Optional[Dict[str, NDArray]]:
    try:
        return {'weights': np.random.randn(100)}
    except FileNotFoundError:
        return None

def split_dataset(
    X: NDArray,
    y: NDArray,
    test_size: float = 0.2,
    random_state: Optional[int] = None
) -> Tuple[NDArray, NDArray, NDArray, NDArray]:
    if random_state is not None:
        np.random.seed(random_state)
    n = len(X)
    n_test = int(n * test_size)
    indices = np.random.permutation(n)
    test_idx = indices[:n_test]
    train_idx = indices[n_test:]
    return X[train_idx], X[test_idx], y[train_idx], y[test_idx]
8.2 Generics and Protocols
from typing import TypeVar, Generic, Protocol, runtime_checkable, List, Dict
import numpy as np
from numpy.typing import NDArray

T = TypeVar('T')

class DataLoader(Generic[T]):
    def __init__(self, dataset: List[T], batch_size: int = 32):
        self.dataset = dataset
        self.batch_size = batch_size
        self._idx = 0

    def __iter__(self):
        self._idx = 0
        return self

    def __next__(self) -> List[T]:
        if self._idx >= len(self.dataset):
            raise StopIteration
        batch = self.dataset[self._idx:self._idx + self.batch_size]
        self._idx += self.batch_size
        return batch

@runtime_checkable
class Optimizer(Protocol):
    def step(self) -> None: ...
    def zero_grad(self) -> None: ...
    def state_dict(self) -> Dict: ...

@runtime_checkable
class Model(Protocol):
    def forward(self, x: NDArray) -> NDArray: ...
    def parameters(self) -> List[NDArray]: ...
    def train(self) -> None: ...
    def eval(self) -> None: ...
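Because the protocols are marked @runtime_checkable, isinstance performs a structural check: any class with the right methods passes, with no inheritance required. A short sketch with a hypothetical SGD class:

class SGD:
    """Satisfies the Optimizer protocol without inheriting from it."""
    def step(self) -> None:
        print("step")
    def zero_grad(self) -> None:
        print("zero_grad")
    def state_dict(self) -> Dict:
        return {}

opt = SGD()
print(isinstance(opt, Optimizer))  # True: a purely structural check

loader: DataLoader[int] = DataLoader(list(range(10)), batch_size=4)
for batch in loader:
    print(batch)  # [0, 1, 2, 3], then [4, 5, 6, 7], then [8, 9]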
8.3 dataclass and TypedDict
from dataclasses import dataclass, field, asdict
from typing import TypedDict, List, Dict

class ModelConfig(TypedDict):
    hidden_size: int
    num_layers: int
    dropout: float
    activation: str

@dataclass
class ExperimentConfig:
    experiment_name: str
    model_type: str
    learning_rate: float = 1e-3
    batch_size: int = 32
    max_epochs: int = 100
    seed: int = 42
    tags: List[str] = field(default_factory=list)
    hyperparams: Dict[str, float] = field(default_factory=dict)

    def __post_init__(self):
        if self.learning_rate <= 0:
            raise ValueError(f"Learning rate must be positive: {self.learning_rate}")
        if not self.experiment_name:
            raise ValueError("Experiment name is required")

    def to_dict(self) -> Dict:
        return asdict(self)

config = ExperimentConfig(
    experiment_name="bert-finetuning-v1",
    model_type="bert-base",
    learning_rate=2e-5,
    batch_size=16,
    tags=["nlp", "classification"]
)
print(f"Experiment: {config.experiment_name}")
print(f"Config: {config.to_dict()}")
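A TypedDict is a plain dict at runtime; the annotations exist only for static checkers such as mypy, which will flag missing keys or wrongly typed values. A brief sketch using ModelConfig from above:

# A ModelConfig is an ordinary dict at runtime
model_cfg: ModelConfig = {
    "hidden_size": 768,
    "num_layers": 12,
    "dropout": 0.1,
    "activation": "gelu",
}
print(model_cfg["hidden_size"])
# mypy would reject: model_cfg["num_layers"] = "twelve"  (int expected)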
8.4 Data Validation with pydantic
The example below uses the pydantic v2 API (field_validator, model_validator, model_dump_json), consistent with the pydantic = "^2.0" pin in section 9.2.

from pydantic import BaseModel, Field, field_validator, model_validator
from typing import List

class DataConfig(BaseModel):
    data_path: str
    batch_size: int = Field(default=32, ge=1, le=1024)
    shuffle: bool = True
    num_workers: int = Field(default=4, ge=0, le=16)

    @field_validator('data_path')
    @classmethod
    def path_must_exist(cls, v):
        import os
        if not os.path.exists(v) and v != "test_path":
            raise ValueError(f"Data path does not exist: {v}")
        return v

class ModelConfig(BaseModel):
    architecture: str
    hidden_sizes: List[int] = Field(min_length=1)
    dropout_rate: float = Field(default=0.1, ge=0.0, le=0.9)
    activation: str = "relu"

    @field_validator('activation')
    @classmethod
    def valid_activation(cls, v):
        valid = ['relu', 'gelu', 'silu', 'tanh', 'sigmoid']
        if v not in valid:
            raise ValueError(f"Activation must be one of {valid}")
        return v

class TrainingConfig(BaseModel):
    data: DataConfig
    model: ModelConfig
    learning_rate: float = Field(default=1e-3, gt=0, le=1.0)
    weight_decay: float = Field(default=1e-4, ge=0)
    max_epochs: int = Field(default=100, ge=1, le=10000)

    @model_validator(mode='after')
    def check_consistency(self):
        if self.model.dropout_rate > 0.5:
            print("Warning: High dropout rate may cause underfitting")
        return self

try:
    config = TrainingConfig(
        data=DataConfig(data_path="test_path", batch_size=64),
        model=ModelConfig(
            architecture="transformer",
            hidden_sizes=[512, 256, 128],
            dropout_rate=0.1
        ),
        learning_rate=2e-5
    )
    print(f"Validation passed: {config.model.architecture}")
    print(f"JSON: {config.model_dump_json(indent=2)}")
except Exception as e:
    print(f"Validation error: {e}")
9. Package and Dependency Management
9.1 Poetry
# Install Poetry
curl -sSL https://install.python-poetry.org | python3 -

poetry new my-ml-project
cd my-ml-project

# Add dependencies
poetry add numpy torch torchvision
poetry add --group dev pytest black ruff mypy

# Activate the virtual environment
poetry shell

# Install all dependencies
poetry install

# Export to requirements.txt
poetry export -f requirements.txt --output requirements.txt
9.2 pyproject.toml
[tool.poetry]
name = "my-ml-project"
version = "0.1.0"
description = "AI/ML project"
authors = ["Your Name <email@example.com>"]
license = "MIT"
readme = "README.md"
packages = [{include = "my_ml_project"}]
[tool.poetry.dependencies]
python = "^3.10"
numpy = "^1.24"
torch = "^2.1"
torchvision = "^0.16"
transformers = "^4.35"
pydantic = "^2.0"
aiohttp = "^3.9"
[tool.poetry.group.dev.dependencies]
pytest = "^7.4"
pytest-cov = "^4.1"
black = "^23.0"
ruff = "^0.1"
mypy = "^1.6"
pre-commit = "^3.5"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
[tool.black]
line-length = 88
target-version = ['py310', 'py311']
[tool.ruff]
select = ["E", "F", "I", "N", "W"]
ignore = ["E501"]
line-length = 88
[tool.mypy]
python_version = "3.10"
warn_return_any = true
warn_unused_configs = true
disallow_untyped_defs = true
[tool.pytest.ini_options]
testpaths = ["tests"]
addopts = "-v --cov=my_ml_project --cov-report=html"
9.3 uv (A Fast Package Manager)
# Install uv (Rust-based, extremely fast)
curl -LsSf https://astral.sh/uv/install.sh | sh

# Initialize a project
uv init my-project
cd my-project

# Add dependencies (10-100x faster than pip)
uv add numpy torch transformers
uv add --dev pytest ruff mypy

# Sync the environment
uv sync

# Manage Python versions
uv python install 3.11
uv python pin 3.11

# Run a script
uv run python train.py
10. Clean Code for ML
10.1 Design Patterns (Factory and Strategy)
from abc import ABC, abstractmethod
from typing import Dict, Type
import numpy as np

# Strategy pattern: loss functions
class LossFunction(ABC):
    @abstractmethod
    def __call__(self, predictions: np.ndarray, targets: np.ndarray) -> float:
        pass

    @abstractmethod
    def gradient(self, predictions: np.ndarray, targets: np.ndarray) -> np.ndarray:
        pass

class MSELoss(LossFunction):
    def __call__(self, predictions, targets):
        return float(np.mean((predictions - targets) ** 2))

    def gradient(self, predictions, targets):
        return 2 * (predictions - targets) / len(predictions)

class HuberLoss(LossFunction):
    def __init__(self, delta: float = 1.0):
        self.delta = delta

    def __call__(self, predictions, targets):
        diff = predictions - targets
        is_small = np.abs(diff) <= self.delta
        loss = np.where(is_small, 0.5 * diff**2,
                        self.delta * np.abs(diff) - 0.5 * self.delta**2)
        return float(np.mean(loss))

    def gradient(self, predictions, targets):
        diff = predictions - targets
        # Divide by n so the gradient matches the mean taken in __call__
        return np.where(np.abs(diff) <= self.delta, diff,
                        self.delta * np.sign(diff)) / len(predictions)

# Factory pattern: model creation
class ModelFactory:
    _registry: Dict[str, Type] = {}

    @classmethod
    def register(cls, name: str):
        def decorator(model_class):
            cls._registry[name] = model_class
            return model_class
        return decorator

    @classmethod
    def create(cls, name: str, **kwargs):
        if name not in cls._registry:
            available = list(cls._registry.keys())
            raise ValueError(f"Unknown model: {name}. Available: {available}")
        return cls._registry[name](**kwargs)

@ModelFactory.register("linear")
class LinearModel:
    def __init__(self, input_size: int, output_size: int):
        self.W = np.random.randn(input_size, output_size) * 0.01
        self.b = np.zeros(output_size)

    def forward(self, X: np.ndarray) -> np.ndarray:
        return X @ self.W + self.b

@ModelFactory.register("mlp")
class MLPModel:
    def __init__(self, layer_sizes: list):
        self.layers = []
        for i in range(len(layer_sizes) - 1):
            W = np.random.randn(layer_sizes[i], layer_sizes[i+1]) * 0.01
            b = np.zeros(layer_sizes[i+1])
            self.layers.append((W, b))

    def forward(self, X: np.ndarray) -> np.ndarray:
        out = X
        for i, (W, b) in enumerate(self.layers):
            out = out @ W + b
            if i < len(self.layers) - 1:
                out = np.maximum(0, out)  # ReLU between hidden layers
        return out

model = ModelFactory.create("mlp", layer_sizes=[128, 256, 10])
X = np.random.randn(32, 128)
output = model.forward(X)
print(f"Model output shape: {output.shape}")
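The payoff of the strategy pattern is that losses are interchangeable without touching the surrounding training code. A short sketch continuing from the example above:

# Swap HuberLoss for MSELoss without changing any surrounding code
targets = np.random.randn(32, 10)
for loss_fn in (MSELoss(), HuberLoss(delta=1.0)):
    loss = loss_fn(output, targets)
    grad = loss_fn.gradient(output, targets)
    print(f"{type(loss_fn).__name__}: loss={loss:.4f}, grad shape={grad.shape}")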
10.2 Testing ML Code with pytest
# tests/test_models.py
import pytest
import numpy as np
from numpy.testing import assert_array_almost_equal

# Assumes the classes from section 10.1 live in src/models.py
from src.models import MSELoss, LinearModel

class TestLossFunctions:
    def test_mse_zero_loss(self):
        """A perfect prediction should produce zero loss"""
        x = np.array([1.0, 2.0, 3.0])
        loss = MSELoss()(x, x)
        assert abs(loss) < 1e-10

    def test_mse_positive(self):
        """MSE must always be non-negative"""
        predictions = np.random.randn(100)
        targets = np.random.randn(100)
        loss = MSELoss()(predictions, targets)
        assert loss >= 0

    @pytest.mark.parametrize("batch_size", [1, 16, 32, 128])
    def test_batch_sizes(self, batch_size):
        """The model must handle a variety of batch sizes"""
        X = np.random.randn(batch_size, 64)
        model = LinearModel(64, 10)
        output = model.forward(X)
        assert output.shape == (batch_size, 10)

    def test_numerical_gradient(self):
        """Verify the analytical gradient against a numerical one"""
        loss_fn = MSELoss()
        pred = np.array([0.5, 0.3, 0.2])
        target = np.array([0.6, 0.2, 0.1])
        analytical_grad = loss_fn.gradient(pred, target)
        eps = 1e-5
        numerical_grad = np.zeros_like(pred)
        for i in range(len(pred)):
            pred_plus = pred.copy()
            pred_plus[i] += eps
            pred_minus = pred.copy()
            pred_minus[i] -= eps
            numerical_grad[i] = (loss_fn(pred_plus, target) - loss_fn(pred_minus, target)) / (2 * eps)
        assert_array_almost_equal(analytical_grad, numerical_grad, decimal=5)

# Run with:
# pytest tests/ -v --cov=src --cov-report=html -x
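pytest fixtures pair naturally with these tests when you want deterministic randomness; a small sketch (the rng fixture name is our own choice):

@pytest.fixture
def rng():
    # A seeded generator shared by any test that declares `rng` as a parameter
    return np.random.default_rng(42)

def test_linear_forward_is_deterministic(rng):
    X = rng.standard_normal((8, 64))
    model = LinearModel(64, 10)
    assert_array_almost_equal(model.forward(X), model.forward(X))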
10.3 Logging Best Practices
import logging
import sys
from pathlib import Path
from typing import Optional

def setup_logger(
    name: str,
    level: int = logging.INFO,
    log_file: Optional[str] = None,
    format_str: Optional[str] = None
) -> logging.Logger:
    if format_str is None:
        format_str = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    formatter = logging.Formatter(format_str, datefmt='%Y-%m-%d %H:%M:%S')
    logger = logging.getLogger(name)
    logger.setLevel(level)
    if not logger.handlers:
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setFormatter(formatter)
        logger.addHandler(console_handler)
        if log_file:
            Path(log_file).parent.mkdir(parents=True, exist_ok=True)
            file_handler = logging.FileHandler(log_file)
            file_handler.setFormatter(formatter)
            logger.addHandler(file_handler)
    return logger

class TrainingLogger:
    def __init__(self, experiment_name: str, log_dir: str = "logs"):
        self.logger = setup_logger(
            f"training.{experiment_name}",
            log_file=f"{log_dir}/{experiment_name}.log"
        )
        self.step = 0

    def log_metrics(self, metrics: dict, phase: str = "train"):
        metrics_str = ", ".join(f"{k}={v:.4f}" for k, v in metrics.items())
        self.logger.info(f"[{phase}] step={self.step} {metrics_str}")
        self.step += 1

    def log_epoch(self, epoch: int, train_loss: float, val_loss: float):
        self.logger.info(
            f"Epoch {epoch}: train_loss={train_loss:.4f}, val_loss={val_loss:.4f}"
        )

    def log_hyperparams(self, params: dict):
        self.logger.info(f"Hyperparameters: {params}")

logger = TrainingLogger("bert-finetuning-v1")
logger.log_hyperparams({"lr": 2e-5, "batch_size": 32, "epochs": 10})
for epoch in range(3):
    for step in range(10):
        logger.log_metrics(
            {"loss": 2.0 - epoch * 0.3 - step * 0.01, "acc": 0.5 + epoch * 0.1},
            phase="train"
        )
    logger.log_epoch(epoch, 1.5 - epoch * 0.3, 1.8 - epoch * 0.25)
Summary
When applying the techniques covered in this guide to real AI/ML projects, follow these priorities:
1. Profile first: always locate the bottleneck with cProfile and line_profiler before optimizing anything.
2. Vectorize with NumPy: in most cases vectorization alone delivers a 10-100x speedup.
3. Apply Numba to hot loops: reach for JIT compilation when a pure-Python loop is genuinely unavoidable.
4. Use multiprocessing to saturate the CPU: hand independent work such as data preprocessing and cross-validation to ProcessPoolExecutor.
5. Use asyncio for I/O-bound pipelines: API-heavy or file-I/O-heavy workflows can gain 10x or more in throughput.
6. Type hints and tests: combine mypy, pytest, and pydantic to keep an ML codebase robust enough to scale safely.