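Overview
Python is the lingua franca of the AI/ML ecosystem. Plain Python code, however, has inherent performance limits, and these become bottlenecks in large-scale data processing and model-training pipelines. This guide covers advanced Python techniques for building production-grade AI/ML systems.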
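It shows how to write code that can run orders of magnitude faster than pure-Python loops, how to parallelize work, and how to manage memory efficiently, with runnable samples throughout.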
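1. Python performance profiling
Always profile before optimizing. The rule of thumb is "you can't improve what you can't measure."
1.1 cProfile and pstats
`cProfile`, part of the Python standard library, is the basic tool for function-level profiling.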
import cProfile
import io
import pstats
def slow_matrix_multiply(a, b):
    """Inefficient matrix multiplication using nested Python loops."""
    rows_a = len(a)
    cols_a = len(a[0])
    cols_b = len(b[0])
    result = [[0] * cols_b for _ in range(rows_a)]
    for i in range(rows_a):
        for j in range(cols_b):
            for k in range(cols_a):
                result[i][j] += a[i][k] * b[k][j]
    return result
def profile_function():
    size = 100
    a = [[float(i + j) for j in range(size)] for i in range(size)]
    b = [[float(i * j + 1) for j in range(size)] for i in range(size)]
    slow_matrix_multiply(a, b)
# Run the profiler
pr = cProfile.Profile()
pr.enable()
profile_function()
pr.disable()
# Analyze the results
stream = io.StringIO()
ps = pstats.Stats(pr, stream=stream)
ps.sort_stats('cumulative')
ps.print_stats(20)
print(stream.getvalue())
You can also run it directly from the command line.
python -m cProfile -s cumulative your_script.py
python -m cProfile -o output.prof your_script.py
To analyze a saved `.prof` file:
import pstats
p = pstats.Stats('output.prof')
p.sort_stats('cumulative')
p.print_stats(10)  # top 10 functions
p.print_callers('slow_function')  # functions that called this function
p.print_callees('main')  # functions that this function calls
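The enable/disable pattern above can also be wrapped in a small helper. The following is a minimal sketch, assuming Python 3.8+ (where `cProfile.Profile` works as a context manager); `sum_of_squares` and `profile_to_text` are hypothetical names introduced only for illustration.

```python
import cProfile
import io
import pstats
from pstats import SortKey


def sum_of_squares(n: int) -> int:
    """Deliberately simple CPU-bound function (hypothetical workload)."""
    return sum(i * i for i in range(n))


def profile_to_text(func, *args, top: int = 5) -> str:
    """Profile a single call of func and return the pstats report as text."""
    stream = io.StringIO()
    with cProfile.Profile() as pr:  # context-manager form, Python 3.8+
        func(*args)
    stats = pstats.Stats(pr, stream=stream)
    # strip_dirs() shortens file paths; SortKey.CUMULATIVE equals 'cumulative'
    stats.strip_dirs().sort_stats(SortKey.CUMULATIVE).print_stats(top)
    return stream.getvalue()


report = profile_to_text(sum_of_squares, 100_000)
print(report)
```

Returning the report as a string makes it easy to log or attach to a benchmark run instead of printing to the console.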
1.2 line_profiler (line-level analysis)
Measures performance per line rather than per function.
pip install line_profiler
import numpy as np
from line_profiler import LineProfiler
def process_batch(data):
    result = []
    for item in data:
        normalized = (item - item.mean()) / (item.std() + 1e-8)
        scaled = normalized * 2.0
        clipped = np.clip(scaled, -5, 5)
        result.append(clipped)
    return np.stack(result)
# Set up the profiler
lp = LineProfiler()
lp_wrapper = lp(process_batch)
# Run on sample data
data = [np.random.randn(1000) for _ in range(1000)]
lp_wrapper(data)
# Show the results
lp.print_stats()
In a Jupyter notebook you can use the magic commands:
%load_ext line_profiler
%lprun -f process_batch process_batch(data)
1.3 memory_profiler
Tracks memory usage line by line.
pip install memory_profiler
import numpy as np
from memory_profiler import profile
@profile
def memory_intensive_function(n=1_000_000):
    list_data = list(range(n))  # build a list
    arr = np.array(list_data)  # convert to a NumPy array
    del list_data  # drop the list
    squared = arr ** 2  # allocates a new array
    filtered = squared[squared > 100]  # boolean-mask filtering
    return filtered.sum()
result = memory_intensive_function()
print(f"Result: {result}")
python -m memory_profiler your_script.py
mprof run your_script.py
mprof plot  # visualize
1.4 py-spy (sampling profiler)
A powerful tool that can profile a running Python process without any code changes.
pip install py-spy
# Profile a running process
py-spy top --pid 12345
# Generate a flame graph
py-spy record -o profile.svg --pid 12345
# Profile a script directly
py-spy record -o profile.svg -- python your_script.py
1.5 Visualization with SnakeViz
pip install snakeviz
python -m cProfile -o output.prof your_script.py
snakeviz output.prof
2. NumPy vectorization in depth
NumPy vectorization is the cornerstone of performance optimization for Python AI/ML code.
2.1 Performance comparison: for loops vs. vectorization
import time
import numpy as np
def benchmark(func, *args, runs=5):
    """Return (mean seconds, std seconds, last result) over `runs` calls."""
    times = []
    for _ in range(runs):
        start = time.perf_counter()
        result = func(*args)
        end = time.perf_counter()
        times.append(end - start)
    return np.mean(times), np.std(times), result
# 1. Pure-Python for loop
def relu_loop(x):
    result = []
    for val in x:
        result.append(max(0, val))
    return result
# 2. NumPy vectorization
def relu_numpy(x):
    return np.maximum(0, x)
# 3. NumPy clip
def relu_clip(x):
    return np.clip(x, 0, None)
n = 1_000_000
data_list = [np.random.randn() for _ in range(n)]
data_np = np.array(data_list)
t_loop, _, _ = benchmark(relu_loop, data_list)
t_numpy, _, _ = benchmark(relu_numpy, data_np)
t_clip, _, _ = benchmark(relu_clip, data_np)
print(f"Python loop: {t_loop:.4f}s")
print(f"NumPy maximum: {t_numpy:.4f}s ({t_loop/t_numpy:.0f}x faster)")
print(f"NumPy clip: {t_clip:.4f}s ({t_loop/t_clip:.0f}x faster)")
2.2 Universal functions (ufuncs)
def custom_activation_scalar(x, alpha=0.1):
    """ELU activation for a single scalar."""
    if x >= 0:
        return x
    else:
        return alpha * (np.exp(x) - 1)
# np.vectorize gives a ufunc-like interface, but it is essentially a Python-level loop
elu_ufunc = np.vectorize(custom_activation_scalar)
# Built entirely from real NumPy ufuncs (faster)
def elu_numpy(x, alpha=0.1):
    return np.where(x >= 0, x, alpha * (np.exp(x) - 1))
x = np.random.randn(1_000_000)
t_vectorize, _, _ = benchmark(elu_ufunc, x)
t_numpy, _, _ = benchmark(elu_numpy, x)
print(f"np.vectorize: {t_vectorize:.4f}s")
print(f"np.where: {t_numpy:.4f}s")
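One detail worth knowing about the `np.where` form: both branches are evaluated for every element, so `np.exp(x)` can overflow for large positive inputs even though that branch is discarded. The sketch below shows one possible way around this, clamping the exponent input with `np.minimum` and using `np.expm1`; the function names `elu_where` and `elu_safe` are hypothetical.

```python
import numpy as np


def elu_where(x: np.ndarray, alpha: float = 0.1) -> np.ndarray:
    """Same formula as the np.where version above: both branches run eagerly."""
    return np.where(x >= 0, x, alpha * (np.exp(x) - 1))


def elu_safe(x: np.ndarray, alpha: float = 0.1) -> np.ndarray:
    """Clamp the exponent input to <= 0 so the exponential never overflows."""
    negative_part = alpha * np.expm1(np.minimum(x, 0.0))
    return np.where(x >= 0, x, negative_part)


x = np.array([-2.0, -0.5, 0.0, 0.5, 1000.0])

# exp(1000.0) overflows to inf in the discarded branch, so silence the warning here
with np.errstate(over="ignore"):
    reference = elu_where(x)

result = elu_safe(x)
print(result)
```

Both functions return the same values; the clamped variant simply avoids the overflow warning on inputs like `1000.0`.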
2.3 np.einsum (Einstein summation notation)
import numpy as np
batch_size, seq_len, d_model = 32, 128, 512
heads, d_head = 8, 64
A = np.random.randn(batch_size, seq_len, d_model)
W = np.random.randn(d_model, d_model)
# Standard matrix product
result_matmul = A @ W  # (32, 128, 512)
# The same operation with einsum
result_einsum = np.einsum('bsi,ij->bsj', A, W)
# Attention scores (batched matrix product)
Q = np.random.randn(batch_size, heads, seq_len, d_head)
K = np.random.randn(batch_size, heads, seq_len, d_head)
scores_manual = Q @ K.transpose(0, 1, 3, 2)  # (32, 8, 128, 128)
scores_einsum = np.einsum('bhid,bhjd->bhij', Q, K)
# Batched outer product
a = np.random.randn(batch_size, d_model)
b = np.random.randn(batch_size, d_model)
outer = np.einsum('bi,bj->bij', a, b)  # (32, 512, 512)
# Trace (sum of the diagonal elements)
M = np.random.randn(batch_size, d_model, d_model)
trace = np.einsum('bii->b', M)  # (32,)
print(f"Attention scores shape: {scores_einsum.shape}")
print(f"Outer product shape: {outer.shape}")
print(f"Trace shape: {trace.shape}")
# Use the optimize parameter to search for the best contraction order
# (this matters mainly when three or more operands are contracted)
result = np.einsum('bhid,bhjd->bhij', Q, K, optimize='optimal')
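To see what the `optimize` argument actually chooses, `np.einsum_path` can report the contraction order before any computation happens. The following is a small sketch with three operands; the matrix shapes are arbitrary values picked for illustration.

```python
import numpy as np

rng = np.random.default_rng(0)
A = rng.standard_normal((8, 64))
B = rng.standard_normal((64, 128))
C = rng.standard_normal((128, 4))

# einsum_path returns the contraction path and a human-readable cost report
path, report = np.einsum_path("ij,jk,kl->il", A, B, C, optimize="optimal")
print(report)

# A precomputed path can be reused so the search is not repeated on every call
result = np.einsum("ij,jk,kl->il", A, B, C, optimize=path)
expected = A @ B @ C
```

Reusing a precomputed path is mostly useful when the same contraction runs many times with identically shaped operands.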
2.4 Stride tricks
import numpy as np
from numpy.lib.stride_tricks import as_strided, sliding_window_view
arr = np.arange(20, dtype=float)
window_size = 5
# Modern approach (NumPy 1.20+)
windows = sliding_window_view(arr, window_size)
print(f"Sliding window shape: {windows.shape}")  # (16, 5)
# Moving average
moving_avg = windows.mean(axis=-1)
print(f"Moving average: {moving_avg[:5]}")
# 2D sliding window (e.g. as preprocessing for convolution)
image = np.random.randn(64, 64)
kernel_size = (3, 3)
windows_2d = sliding_window_view(image, kernel_size)
print(f"2D window shape: {windows_2d.shape}")  # (62, 62, 3, 3)
# Non-overlapping chunks: a plain reshape is enough here, no stride tricks needed
batch = np.random.randn(1000, 10)
chunk_size = 50
n_chunks = len(batch) // chunk_size
chunks = batch[:n_chunks * chunk_size].reshape(n_chunks, chunk_size, -1)
print(f"Chunks shape: {chunks.shape}")  # (20, 50, 10)
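For comparison, the same 1D sliding window can be built by hand with `as_strided`. This is only a sketch of the lower-level mechanism: `as_strided` performs no bounds checking, so a wrong shape or stride reads arbitrary memory, which is why `sliding_window_view` is usually the safer choice.

```python
import numpy as np
from numpy.lib.stride_tricks import as_strided, sliding_window_view

arr = np.arange(20, dtype=float)
window_size = 5
n_windows = arr.size - window_size + 1
step = arr.strides[0]  # bytes between consecutive elements

# Each row starts one element after the previous one; no data is copied
manual = as_strided(
    arr,
    shape=(n_windows, window_size),
    strides=(step, step),
    writeable=False,  # guard against accidental writes through overlapping views
)
reference = sliding_window_view(arr, window_size)
print(manual.shape)
```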
3. Numba JIT compilation
Numba compiles Python functions to machine code and, for numeric loops, can approach C/C++-level performance.
3.1 The @jit and @njit decorators
import time
import numpy as np
from numba import jit, njit, prange
# @jit(nopython=False): historically allowed falling back to object mode (Python objects permitted).
# That fallback was deprecated and, as of Numba 0.59, @jit compiles in nopython mode by default.
@jit(nopython=False)
def sum_with_jit(arr):
    total = 0.0
    for i in range(len(arr)):
        total += arr[i]
    return total
# @njit: nopython mode (faster, recommended)
@njit
def sum_with_njit(arr):
    total = 0.0
    for i in range(len(arr)):
        total += arr[i]
    return total
@njit
def compute_distances(points):
    """Compute the Euclidean distance matrix between points."""
    n = len(points)
    distances = np.zeros((n, n))
    for i in range(n):
        for j in range(i + 1, n):
            diff = points[i] - points[j]
            dist = np.sqrt(np.sum(diff ** 2))
            distances[i, j] = dist
            distances[j, i] = dist
    return distances
arr = np.random.randn(1_000_000)
# The first call includes compilation time
_ = sum_with_njit(arr)  # warm-up
start = time.perf_counter()
result_numpy = np.sum(arr)
print(f"NumPy sum: {time.perf_counter() - start:.4f}s")
start = time.perf_counter()
result_njit = sum_with_njit(arr)
print(f"Numba njit: {time.perf_counter() - start:.4f}s")
# Distance computation for a point cloud
points = np.random.randn(500, 3).astype(np.float64)
_ = compute_distances(points)  # warm-up
start = time.perf_counter()
dist_matrix = compute_distances(points)
print(f"Numba distance matrix ({points.shape[0]}x{points.shape[0]}): {time.perf_counter() - start:.4f}s")
3.2 Type inference and explicit type signatures
import time
import numpy as np
from numba import njit, float32, float64
# An explicit signature compiles the function eagerly at decoration time,
# moving the compilation cost from the first call to import time
@njit(float64[:](float64[:], float64))
def scale_array_typed(arr, factor):
    return arr * factor
# Multiple type signatures
@njit([
    float32[:](float32[:], float32),
    float64[:](float64[:], float64)
])
def scale_multi_type(arr, factor):
    return arr * factor
@njit
def softmax_numba(x):
    """Numba-accelerated softmax."""
    max_val = np.max(x)
    exp_x = np.exp(x - max_val)
    return exp_x / np.sum(exp_x)
@njit
def batch_softmax(X):
    """Batched softmax over the rows of a 2D array."""
    result = np.empty_like(X)
    for i in range(X.shape[0]):
        result[i] = softmax_numba(X[i])
    return result
X = np.random.randn(1000, 512).astype(np.float64)
_ = batch_softmax(X)  # warm-up
start = time.perf_counter()
result = batch_softmax(X)
print(f"Numba batch softmax: {time.perf_counter() - start:.4f}s")
3.3 Parallel loops (prange)
import time
import numpy as np
from numba import njit, prange
@njit(parallel=True)
def parallel_normalize(X):
    """Row-wise normalization with automatic parallelization."""
    result = np.empty_like(X)
    for i in prange(X.shape[0]):  # iterations are distributed across threads
        row = X[i]
        mean = row.mean()
        std = row.std()
        result[i] = (row - mean) / (std + 1e-8)
    return result
@njit(parallel=True)
def parallel_batch_process(data, weights):
    """Parallel weighted sum over a batch."""
    n_samples, n_features = data.shape
    results = np.zeros(n_samples)
    for i in prange(n_samples):
        dot = 0.0
        for j in range(n_features):
            dot += data[i, j] * weights[j]
        results[i] = dot
    return results
X = np.random.randn(10000, 512).astype(np.float64)
_ = parallel_normalize(X)  # warm-up
start = time.perf_counter()
result_parallel = parallel_normalize(X)
t_parallel = time.perf_counter() - start
@njit
def sequential_normalize(X):
result = np.empty_like(X)
for i in range(X.shape[0]):
row = X[i]
mean = row.mean()
std = row.std()
result[i] = (row - mean) / (std + 1e-8)
return result
_ = sequential_normalize(X)
start = time.perf_counter()
result_seq = sequential_normalize(X)
t_seq = time.perf_counter() - start
print(f"Sequential: {t_seq:.4f}s")
print(f"Parallel: {t_parallel:.4f}s ({t_seq/t_parallel:.1f}x faster)")
3.4 Numba caching
import numpy as np
from numba import njit
# cache=True saves the compiled code to disk
@njit(cache=True)
def cached_computation(x, y):
    """Cached JIT function: later runs load the compiled code from disk instead of recompiling."""
    result = np.empty(len(x))
    for i in range(len(x)):
        result[i] = x[i] ** 2 + y[i] ** 2
    return result
arr1 = np.random.randn(1000)
arr2 = np.random.randn(1000)
result = cached_computation(arr1, arr2)
4. Cython
Cython translates Python-like code to C and, with static type declarations, can deliver very high performance.
4.1 Writing a .pyx file
# setup.py: Cython build configuration
import numpy as np
from setuptools import setup
from Cython.Build import cythonize
setup(
    ext_modules=cythonize(
        "fast_ops.pyx",
        compiler_directives={
            'language_level': '3',
            'boundscheck': False,
            'wraparound': False,
            'nonecheck': False,
        }
    ),
    include_dirs=[np.get_include()]
)
Example `fast_ops.pyx`:
# fast_ops.pyx
# cython: language_level=3
# cython: boundscheck=False
# cython: wraparound=False
import numpy as np
cimport numpy as cnp
from libc.math cimport exp, sqrt
def relu_cython(cnp.ndarray[cnp.double_t, ndim=1] x):
cdef int n = len(x)
cdef cnp.ndarray[cnp.double_t, ndim=1] result = np.empty(n)
cdef int i
cdef double val
for i in range(n):
val = x[i]
result[i] = val if val > 0 else 0.0
return result
def softmax_cython(cnp.ndarray[cnp.double_t, ndim=1] x):
cdef int n = len(x)
cdef cnp.ndarray[cnp.double_t, ndim=1] result = np.empty(n)
cdef double max_val = x[0]
cdef double sum_exp = 0.0
cdef int i
for i in range(n):
if x[i] > max_val:
max_val = x[i]
for i in range(n):
result[i] = exp(x[i] - max_val)
sum_exp += result[i]
for i in range(n):
result[i] /= sum_exp
return result
Build and use:
python setup.py build_ext --inplace
import numpy as np
import fast_ops
x = np.random.randn(1_000_000)
result = fast_ops.relu_cython(x)
4.2 IPython Cython magic
%load_ext Cython
%%cython -a
# -a: annotation mode (yellow = interaction with the Python runtime, white = pure C)
cimport numpy as cnp
def fast_l2_norm(cnp.ndarray[cnp.double_t, ndim=1] v):
cdef int n = len(v)
cdef double total = 0.0
cdef int i
for i in range(n):
total += v[i] * v[i]
return total ** 0.5
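5. Python multiprocessing
5.1 Understanding the GIL
The GIL (Global Interpreter Lock) allows only one thread at a time to execute Python bytecode. For CPU-bound tasks, multithreading therefore provides no real parallelism; use multiprocessing instead.
import multiprocessing
import threading
import time
def cpu_bound_task(n):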
total = 0
for i in range(n):
total += i ** 2
return total
def run_threaded(n_tasks=4, n_per_task=1_000_000):
threads = []
start = time.perf_counter()
for _ in range(n_tasks):
t = threading.Thread(target=cpu_bound_task, args=(n_per_task,))
threads.append(t)
t.start()
for t in threads:
t.join()
return time.perf_counter() - start
def run_multiprocessing(n_tasks=4, n_per_task=1_000_000):
start = time.perf_counter()
with multiprocessing.Pool(processes=n_tasks) as pool:
results = pool.map(cpu_bound_task, [n_per_task] * n_tasks)
return time.perf_counter() - start
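# The __main__ guard is required on platforms that start workers with "spawn" (Windows, macOS)
if __name__ == "__main__":
    t_threaded = run_threaded()
    t_mp = run_multiprocessing()
    print(f"Threading: {t_threaded:.2f}s")
    print(f"Multiprocessing: {t_mp:.2f}s ({t_threaded/t_mp:.1f}x faster)")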
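The GIL limitation applies to CPU-bound work. When a task mostly waits (network, disk), the waiting thread releases the GIL, so threads can still overlap. The sketch below illustrates this with `time.sleep` standing in for an I/O wait; the function names and the 0.2-second delay are hypothetical choices.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def io_bound_task(delay: float) -> float:
    """Stand-in for a network or disk wait; sleeping releases the GIL."""
    time.sleep(delay)
    return delay


def run_sequential(n_tasks: int = 4, delay: float = 0.2) -> float:
    start = time.perf_counter()
    for _ in range(n_tasks):
        io_bound_task(delay)
    return time.perf_counter() - start


def run_thread_pool(n_tasks: int = 4, delay: float = 0.2) -> float:
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=n_tasks) as pool:
        # All waits overlap, so total time is close to a single delay
        list(pool.map(io_bound_task, [delay] * n_tasks))
    return time.perf_counter() - start


t_seq = run_sequential()
t_pool = run_thread_pool()
print(f"Sequential: {t_seq:.2f}s, thread pool: {t_pool:.2f}s")
```

In short: processes for CPU-bound work, threads or asyncio for work that is dominated by waiting.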
5.2 multiprocessing.Pool
import multiprocessing
import numpy as np
def preprocess_sample(args):
idx, data, mean, std = args
normalized = (data - mean) / (std + 1e-8)
augmented = normalized + np.random.randn(*normalized.shape) * 0.01
return idx, augmented
def preprocess_dataset_parallel(dataset, n_workers=None):
if n_workers is None:
n_workers = multiprocessing.cpu_count()
mean = dataset.mean(axis=0)
std = dataset.std(axis=0)
args = [(i, dataset[i], mean, std) for i in range(len(dataset))]
with multiprocessing.Pool(processes=n_workers) as pool:
results = pool.map(preprocess_sample, args, chunksize=100)
results.sort(key=lambda x: x[0])
return np.stack([r[1] for r in results])
if __name__ == "__main__":
    dataset = np.random.randn(10000, 128)
    processed = preprocess_dataset_parallel(dataset, n_workers=4)
    print(f"Processed dataset shape: {processed.shape}")
5.3 ProcessPoolExecutor
import numpy as np
from concurrent.futures import ProcessPoolExecutor, as_completed
def train_fold(fold_data):
fold_idx, X_train, y_train, X_val, y_val = fold_data
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
model = LogisticRegression(max_iter=1000)
model.fit(X_train, y_train)
val_pred = model.predict(X_val)
accuracy = accuracy_score(y_val, val_pred)
return fold_idx, accuracy, model
def parallel_cross_validation(X, y, n_folds=5):
from sklearn.model_selection import KFold
kf = KFold(n_splits=n_folds, shuffle=True, random_state=42)
fold_data_list = []
for fold_idx, (train_idx, val_idx) in enumerate(kf.split(X)):
fold_data_list.append((
fold_idx,
X[train_idx], y[train_idx],
X[val_idx], y[val_idx]
))
results = {}
with ProcessPoolExecutor(max_workers=n_folds) as executor:
future_to_fold = {
executor.submit(train_fold, fd): fd[0]
for fd in fold_data_list
}
for future in as_completed(future_to_fold):
fold_idx, accuracy, model = future.result()
results[fold_idx] = {'accuracy': accuracy, 'model': model}
print(f"Fold {fold_idx}: accuracy = {accuracy:.4f}")
avg_accuracy = np.mean([r['accuracy'] for r in results.values()])
print(f"\nMean accuracy: {avg_accuracy:.4f}")
return results
if __name__ == "__main__":
    from sklearn.datasets import make_classification
    X, y = make_classification(n_samples=5000, n_features=20, random_state=42)
    results = parallel_cross_validation(X, y, n_folds=5)
5.4 Shared memory
import multiprocessing
import numpy as np
from multiprocessing import shared_memory
def worker_shared_memory(shm_name, shape, dtype, start_idx, end_idx):
existing_shm = shared_memory.SharedMemory(name=shm_name)
arr = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)
arr[start_idx:end_idx] = arr[start_idx:end_idx] ** 2
existing_shm.close()
def process_with_shared_memory(data):
shm = shared_memory.SharedMemory(create=True, size=data.nbytes)
shared_array = np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)
shared_array[:] = data[:]
n_workers = multiprocessing.cpu_count()
chunk_size = len(data) // n_workers
processes = []
for i in range(n_workers):
start = i * chunk_size
end = start + chunk_size if i < n_workers - 1 else len(data)
p = multiprocessing.Process(
target=worker_shared_memory,
args=(shm.name, data.shape, data.dtype, start, end)
)
processes.append(p)
p.start()
for p in processes:
p.join()
result = shared_array.copy()
shm.close()
shm.unlink()
return result
if __name__ == "__main__":
    data = np.random.randn(1_000_000).astype(np.float64)
    result = process_with_shared_memory(data)
    print(f"Shared memory processing complete: {result[:5]}")
6. Asynchronous programming (asyncio)
6.1 The async/await pattern
import asyncio
import time
async def fetch_embeddings(text: str, model: str = "text-embedding-3-small") -> list:
    """Simulate an asynchronous embedding request."""
await asyncio.sleep(0.1)
return [0.1, 0.2, 0.3]
async def sequential_processing(texts: list) -> list:
results = []
for text in texts:
embedding = await fetch_embeddings(text)
results.append(embedding)
return results
async def parallel_processing(texts: list) -> list:
tasks = [fetch_embeddings(text) for text in texts]
results = await asyncio.gather(*tasks)
return list(results)
async def compare_approaches():
texts = [f"sample text {i}" for i in range(20)]
start = time.perf_counter()
sequential_results = await sequential_processing(texts)
t_seq = time.perf_counter() - start
start = time.perf_counter()
parallel_results = await parallel_processing(texts)
t_par = time.perf_counter() - start
print(f"Sequential: {t_seq:.2f}s ({len(sequential_results)} results)")
print(f"Parallel: {t_par:.2f}s ({len(parallel_results)} results)")
print(f"Speedup: {t_seq/t_par:.1f}x")
asyncio.run(compare_approaches())
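`asyncio.gather` starts every coroutine at once, which can overwhelm a real API. One common way to cap the number of in-flight requests is an `asyncio.Semaphore`. The following is a sketch under that assumption; `fake_request`, `run_bounded`, and the limits used are hypothetical.

```python
import asyncio


async def fake_request(i: int, sem: asyncio.Semaphore, state: dict) -> int:
    """Simulated request that records how many calls are in flight at once."""
    async with sem:  # at most `max_concurrent` coroutines pass this point
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.01)  # stand-in for network latency
        state["active"] -= 1
        return i * 2


async def run_bounded(n_requests: int = 20, max_concurrent: int = 5):
    sem = asyncio.Semaphore(max_concurrent)
    state = {"active": 0, "peak": 0}
    # gather preserves input order regardless of completion order
    results = await asyncio.gather(
        *(fake_request(i, sem, state) for i in range(n_requests))
    )
    return list(results), state["peak"]


results, peak = asyncio.run(run_bounded())
print(f"{len(results)} results, peak concurrency = {peak}")
```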
6.2 Asynchronous HTTP with aiohttp
import asyncio
import aiohttp
from typing import Optional
class AsyncAIClient:
    """Rate-limited asynchronous client for an AI API."""
def __init__(self, api_key: str, base_url: str, max_concurrent: int = 10):
self.api_key = api_key
self.base_url = base_url
self.semaphore = asyncio.Semaphore(max_concurrent)
self.session: Optional[aiohttp.ClientSession] = None
async def __aenter__(self):
connector = aiohttp.TCPConnector(limit=100)
self.session = aiohttp.ClientSession(
connector=connector,
headers={"Authorization": f"Bearer {self.api_key}"}
)
return self
async def __aexit__(self, *args):
if self.session:
await self.session.close()
    async def get_embedding(self, text: str, retry: int = 3) -> list:
        """Embedding request with retries and exponential backoff on HTTP 429."""
async with self.semaphore:
for attempt in range(retry):
try:
async with self.session.post(
f"{self.base_url}/embeddings",
json={"input": text, "model": "text-embedding-3-small"}
) as response:
if response.status == 200:
data = await response.json()
return data["data"][0]["embedding"]
elif response.status == 429:
                            # Rate limited: back off exponentially
await asyncio.sleep(2 ** attempt)
else:
response.raise_for_status()
except aiohttp.ClientError as e:
if attempt == retry - 1:
raise
await asyncio.sleep(1)
return []
    async def batch_embeddings(self, texts: list, batch_size: int = 100) -> list:
        """Process embeddings in batches."""
all_results = []
for i in range(0, len(texts), batch_size):
batch = texts[i:i + batch_size]
tasks = [self.get_embedding(text) for text in batch]
batch_results = await asyncio.gather(*tasks, return_exceptions=True)
for j, result in enumerate(batch_results):
if isinstance(result, Exception):
print(f"Error for text {i+j}: {result}")
all_results.append(None)
else:
all_results.append(result)
return all_results
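The retry loop above waits `2 ** attempt` seconds after a 429 response. When many clients retry on the same schedule they tend to collide again, so a randomized (jittered) delay is a common refinement. The helper below is a sketch of that idea and is not part of the client above; `backoff_delays`, `base`, and `cap` are hypothetical names and values.

```python
import random
from typing import List, Optional


def backoff_delays(
    retries: int,
    base: float = 1.0,
    cap: float = 30.0,
    rng: Optional[random.Random] = None,
) -> List[float]:
    """Return one randomized delay per attempt, bounded by an exponential ceiling."""
    rng = rng or random.Random()
    delays = []
    for attempt in range(retries):
        ceiling = min(cap, base * 2 ** attempt)  # 1, 2, 4, 8, ... capped at `cap`
        delays.append(rng.uniform(0, ceiling))   # pick anywhere below the ceiling
    return delays


delays = backoff_delays(5, rng=random.Random(0))
print([round(d, 2) for d in delays])
```

Inside an async retry loop, each value would be passed to `await asyncio.sleep(delay)` in place of the fixed `2 ** attempt`.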
6.3 Dynamic batch processor for AI APIs
import asyncio
import time
from dataclasses import dataclass, field
from typing import Any
@dataclass
class BatchProcessor:
    """Dynamic request batcher."""
max_batch_size: int = 20
max_wait_time: float = 0.05 # 50ms
queue: asyncio.Queue = field(default_factory=asyncio.Queue)
async def process_request(self, request_id: str, data: Any) -> Any:
        future = asyncio.get_running_loop().create_future()
await self.queue.put((request_id, data, future))
return await future
async def batch_worker(self):
while True:
batch = []
deadline = time.perf_counter() + self.max_wait_time
while len(batch) < self.max_batch_size:
timeout = deadline - time.perf_counter()
if timeout <= 0:
break
try:
item = await asyncio.wait_for(self.queue.get(), timeout=timeout)
batch.append(item)
except asyncio.TimeoutError:
break
if not batch:
await asyncio.sleep(0.001)
continue
request_ids, data_list, futures = zip(*batch)
try:
results = await self._call_api_batch(list(data_list))
for future, result in zip(futures, results):
future.set_result(result)
except Exception as e:
for future in futures:
future.set_exception(e)
async def _call_api_batch(self, data_list: list) -> list:
        await asyncio.sleep(0.1)  # simulate API latency
return [f"result_{d}" for d in data_list]
async def test_batch_processor():
processor = BatchProcessor(max_batch_size=10, max_wait_time=0.05)
worker_task = asyncio.create_task(processor.batch_worker())
async def make_request(i):
return await processor.process_request(f"req_{i}", f"data_{i}")
start = time.perf_counter()
tasks = [make_request(i) for i in range(50)]
results = await asyncio.gather(*tasks)
elapsed = time.perf_counter() - start
print(f"50 requests processed: {elapsed:.3f}s")
print(f"First 5 results: {results[:5]}")
worker_task.cancel()
asyncio.run(test_batch_processor())
7. Memory management
7.1 The Python memory model
import sys
data_types = {
'int': 42,
'float': 3.14,
'str (small)': "hello",
'str (large)': "hello" * 1000,
'list (empty)': [],
'dict (empty)': {},
'tuple (empty)': (),
}
print("Object sizes:")
for name, obj in data_types.items():
print(f" {name}: {sys.getsizeof(obj)} bytes")
def deep_size(obj, seen=None):
    """Total memory of an object plus every object it references."""
if seen is None:
seen = set()
obj_id = id(obj)
if obj_id in seen:
return 0
seen.add(obj_id)
size = sys.getsizeof(obj)
if isinstance(obj, dict):
size += sum(deep_size(k, seen) + deep_size(v, seen)
for k, v in obj.items())
elif hasattr(obj, '__iter__') and not isinstance(obj, (str, bytes)):
size += sum(deep_size(item, seen) for item in obj)
return size
nested = {'data': [i for i in range(1000)], 'meta': {'name': 'test'}}
print(f"\nNested dict size: {deep_size(nested):,} bytes")
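`sys.getsizeof` reports the size of individual objects. To see where allocations actually happen while code runs, the standard-library `tracemalloc` module can compare snapshots. This is a minimal sketch; the list-of-lists workload is an arbitrary example.

```python
import tracemalloc

tracemalloc.start()
before = tracemalloc.take_snapshot()

# Hypothetical workload: 1,000 small lists
data = [list(range(100)) for _ in range(1_000)]

after = tracemalloc.take_snapshot()
current, peak = tracemalloc.get_traced_memory()  # bytes traced now / at peak
tracemalloc.stop()

# Rank source lines by how much memory they allocated between the snapshots
top_stats = after.compare_to(before, "lineno")
for stat in top_stats[:3]:
    print(stat)
print(f"current={current:,} bytes, peak={peak:,} bytes")
```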
7.2 Generators for memory efficiency
import numpy as np
from typing import Generator, Iterator
def dataset_generator(
    file_paths: list,
    batch_size: int = 32
) -> Generator:
    """Memory-efficient data loader (random data stands in for loading each file)."""
for path in file_paths:
data = np.random.randn(1000, 128)
labels = np.random.randint(0, 10, 1000)
for start in range(0, len(data), batch_size):
end = min(start + batch_size, len(data))
yield data[start:end], labels[start:end]
def augment_generator(gen: Iterator, noise_std: float = 0.01):
    """Chained augmentation generator: adds Gaussian noise to each batch."""
    for X, y in gen:
        X_aug = X + np.random.randn(*X.shape) * noise_std
        yield X_aug, y
def normalize_generator(gen: Iterator):
    """Normalization generator: standardizes each sample across its features."""
    for X, y in gen:
        X_norm = (X - X.mean(axis=1, keepdims=True)) / (X.std(axis=1, keepdims=True) + 1e-8)
        yield X_norm, y
# Compose the pipeline
fake_paths = [f"data_{i}.npy" for i in range(5)]
base_gen = dataset_generator(fake_paths, batch_size=32)
aug_gen = augment_generator(base_gen)
norm_gen = normalize_generator(aug_gen)
for i, (X_batch, y_batch) in enumerate(norm_gen):
if i >= 3:
break
print(f"Batch {i}: shape={X_batch.shape}, mean={X_batch.mean():.4f}")
7.3 Optimizing with `__slots__`
import sys
import numpy as np
class RegularEmbedding:
def __init__(self, vector, label, metadata):
self.vector = vector
self.label = label
self.metadata = metadata
class SlottedEmbedding:
__slots__ = ['vector', 'label', 'metadata']
def __init__(self, vector, label, metadata):
self.vector = vector
self.label = label
self.metadata = metadata
n = 100_000
vector_size = 128
regular_objects = [
RegularEmbedding(np.random.randn(vector_size), i % 10, {'source': 'test'})
for i in range(n)
]
slotted_objects = [
SlottedEmbedding(np.random.randn(vector_size), i % 10, {'source': 'test'})
for i in range(n)
]
regular_size = sys.getsizeof(regular_objects[0]) + sys.getsizeof(regular_objects[0].__dict__)
slotted_size = sys.getsizeof(slotted_objects[0])
print(f"Regular class instance: {regular_size} bytes")
print(f"__slots__ class instance: {slotted_size} bytes")
print(f"Memory saved: {(regular_size - slotted_size) / regular_size * 100:.1f}%")
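7.4 weakref
import gc
import weakref
import numpy as np
class LoadedModel:
    """Simple container for model data; unlike a plain dict, its instances can be weakly referenced."""
    def __init__(self, name, weights):
        self.name = name
        self.weights = weights
class ModelCache:
    """Model cache built on weak references."""
    def __init__(self):
        self._cache = weakref.WeakValueDictionary()
    def get_or_load(self, model_name: str):
        model = self._cache.get(model_name)
        if model is None:
            print(f"Loading model: {model_name}")
            model = self._load_model(model_name)
            self._cache[model_name] = model
        else:
            print(f"Returning from cache: {model_name}")
        return model
    def _load_model(self, model_name: str):
        return LoadedModel(model_name, np.random.randn(100, 100))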
cache = ModelCache()
model = cache.get_or_load('resnet50')
print(f"Cache size: {len(cache._cache)}")
model_again = cache.get_or_load('resnet50')
del model
del model_again
gc.collect()
print(f"Cache size after GC: {len(cache._cache)}")
new_model = cache.get_or_load('resnet50')
8. Python type hints
8.1 Static type checking with mypy
pip install mypy
mypy your_script.py --strict
import numpy as np
from typing import Optional, Union, List, Dict, Tuple, Callable
from numpy.typing import NDArray
def compute_loss(
predictions: NDArray[np.float32],
targets: NDArray[np.float32],
reduction: str = 'mean'
) -> float:
diff = predictions - targets
loss = np.sum(diff ** 2)
if reduction == 'mean':
return float(loss / len(predictions))
return float(loss)
def load_checkpoint(
path: str,
device: Optional[str] = None,
strict: bool = True
) -> Optional[Dict[str, NDArray]]:
try:
return {'weights': np.random.randn(100)}
except FileNotFoundError:
return None
def split_dataset(
X: NDArray,
y: NDArray,
test_size: float = 0.2,
random_state: Optional[int] = None
) -> Tuple[NDArray, NDArray, NDArray, NDArray]:
if random_state is not None:
np.random.seed(random_state)
n = len(X)
n_test = int(n * test_size)
indices = np.random.permutation(n)
test_idx = indices[:n_test]
train_idx = indices[n_test:]
return X[train_idx], X[test_idx], y[train_idx], y[test_idx]
8.2 Generics and Protocol
from typing import TypeVar, Generic, Protocol, runtime_checkable, List, Dict
T = TypeVar('T')
class DataLoader(Generic[T]):
def __init__(self, dataset: List[T], batch_size: int = 32):
self.dataset = dataset
self.batch_size = batch_size
self._idx = 0
def __iter__(self):
self._idx = 0
return self
def __next__(self) -> List[T]:
if self._idx >= len(self.dataset):
raise StopIteration
batch = self.dataset[self._idx:self._idx + self.batch_size]
self._idx += self.batch_size
return batch
@runtime_checkable
class Optimizer(Protocol):
def step(self) -> None: ...
def zero_grad(self) -> None: ...
def state_dict(self) -> Dict: ...
@runtime_checkable
class Model(Protocol):
def forward(self, x: 'NDArray') -> 'NDArray': ...
def parameters(self) -> List['NDArray']: ...
def train(self) -> None: ...
def eval(self) -> None: ...
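A `Protocol` matches on structure, not inheritance: any class with the right methods satisfies it. With `@runtime_checkable`, `isinstance` checks that the methods exist (it does not check their signatures). The sketch below uses hypothetical classes `SupportsStep`, `PlainSGD`, and `NotAnOptimizer` to illustrate this.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class SupportsStep(Protocol):
    def step(self) -> None: ...
    def zero_grad(self) -> None: ...


class PlainSGD:
    """Does not inherit from SupportsStep, but has both required methods."""

    def __init__(self) -> None:
        self.steps = 0

    def step(self) -> None:
        self.steps += 1

    def zero_grad(self) -> None:
        pass


class NotAnOptimizer:
    """Missing zero_grad, so it does not satisfy the protocol."""

    def step(self) -> None:
        pass


def train_one_step(opt: SupportsStep) -> None:
    opt.zero_grad()
    opt.step()


sgd = PlainSGD()
train_one_step(sgd)
print(isinstance(sgd, SupportsStep), isinstance(NotAnOptimizer(), SupportsStep))
```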
8.3 dataclass and TypedDict
from dataclasses import dataclass, field
from typing import TypedDict, List, Dict, Optional
class ModelConfig(TypedDict):
hidden_size: int
num_layers: int
dropout: float
activation: str
@dataclass
class ExperimentConfig:
experiment_name: str
model_type: str
learning_rate: float = 1e-3
batch_size: int = 32
max_epochs: int = 100
seed: int = 42
tags: List[str] = field(default_factory=list)
hyperparams: Dict[str, float] = field(default_factory=dict)
def __post_init__(self):
if self.learning_rate <= 0:
raise ValueError(f"Learning rate must be positive: {self.learning_rate}")
if not self.experiment_name:
raise ValueError("Experiment name is required")
def to_dict(self) -> Dict:
from dataclasses import asdict
return asdict(self)
config = ExperimentConfig(
experiment_name="bert-finetuning-v1",
model_type="bert-base",
learning_rate=2e-5,
batch_size=16,
tags=["nlp", "classification"]
)
print(f"Experiment: {config.experiment_name}")
print(f"Config: {config.to_dict()}")
8.4 Data validation with pydantic
import os
from typing import List
from pydantic import BaseModel, Field, field_validator, model_validator
# Written against the pydantic v2 API, matching the `pydantic = "^2.0"` pin in pyproject.toml
class DataConfig(BaseModel):
    data_path: str
    batch_size: int = Field(ge=1, le=1024, default=32)
    shuffle: bool = True
    num_workers: int = Field(ge=0, le=16, default=4)
    @field_validator('data_path')
    @classmethod
    def path_must_exist(cls, v: str) -> str:
        if not os.path.exists(v) and v != "test_path":
            raise ValueError(f"Data path does not exist: {v}")
        return v
class ModelConfig(BaseModel):
    architecture: str
    hidden_sizes: List[int] = Field(min_length=1)
    dropout_rate: float = Field(ge=0.0, le=0.9, default=0.1)
    activation: str = "relu"
    @field_validator('activation')
    @classmethod
    def valid_activation(cls, v: str) -> str:
        valid = ['relu', 'gelu', 'silu', 'tanh', 'sigmoid']
        if v not in valid:
            raise ValueError(f"Activation must be one of {valid}")
        return v
class TrainingConfig(BaseModel):
    data: DataConfig
    model: ModelConfig
    learning_rate: float = Field(gt=0, le=1.0, default=1e-3)
    weight_decay: float = Field(ge=0, default=1e-4)
    max_epochs: int = Field(ge=1, le=10000, default=100)
    @model_validator(mode='after')
    def check_consistency(self):
        # Runs after field validation, so self.model is already a ModelConfig
        if self.model.dropout_rate > 0.5:
            print("Warning: High dropout rate may cause underfitting")
        return self
try:
    config = TrainingConfig(
        data=DataConfig(data_path="test_path", batch_size=64),
        model=ModelConfig(
            architecture="transformer",
            hidden_sizes=[512, 256, 128],
            dropout_rate=0.1
        ),
        learning_rate=2e-5
    )
    print(f"Validation passed: {config.model.architecture}")
    print(f"JSON: {config.model_dump_json(indent=2)}")
except Exception as e:
    print(f"Validation error: {e}")
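9. Package management and dependencies
9.1 Poetry
# Install Poetry
curl -sSL https://install.python-poetry.org | python3 -
poetry new my-ml-project
cd my-ml-project
# Add dependencies
poetry add numpy torch torchvision
poetry add --group dev pytest black ruff mypy
# Activate the virtual environment
# (in Poetry 2.x the `shell` command is provided by a separate plugin)
poetry shell
# Install all dependencies
poetry install
# Export to requirements.txt
# (in Poetry 2.x `export` requires the poetry-plugin-export plugin)
poetry export -f requirements.txt --output requirements.txt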
9.2 pyproject.toml
[tool.poetry]
name = "my-ml-project"
version = "0.1.0"
description = "AI/ML project"
authors = ["Your Name <email@example.com>"]
license = "MIT"
readme = "README.md"
packages = [{include = "my_ml_project"}]
[tool.poetry.dependencies]
python = "^3.10"
numpy = "^1.24"
torch = "^2.1"
torchvision = "^0.16"
transformers = "^4.35"
pydantic = "^2.0"
aiohttp = "^3.9"
[tool.poetry.group.dev.dependencies]
pytest = "^7.4"
pytest-cov = "^4.1"
black = "^23.0"
ruff = "^0.1"
mypy = "^1.6"
pre-commit = "^3.5"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
[tool.black]
line-length = 88
target-version = ['py310', 'py311']
[tool.ruff]
select = ["E", "F", "I", "N", "W"]
ignore = ["E501"]
line-length = 88
[tool.mypy]
python_version = "3.10"
warn_return_any = true
warn_unused_configs = true
disallow_untyped_defs = true
[tool.pytest.ini_options]
testpaths = ["tests"]
addopts = "-v --cov=my_ml_project --cov-report=html"
9.3 uv (a fast package manager)
# Install uv (written in Rust, very fast)
curl -LsSf https://astral.sh/uv/install.sh | sh
# Initialize a project
uv init my-project
cd my-project
# Add dependencies (the uv project reports 10-100x speedups over pip)
uv add numpy torch transformers
uv add --dev pytest ruff mypy
# Sync the environment
uv sync
# Manage Python versions
uv python install 3.11
uv python pin 3.11
# Run a script
uv run python train.py
10. Clean code for ML
10.1 Design patterns (factory, strategy)
import numpy as np
from abc import ABC, abstractmethod
from typing import Dict, Type
# Strategy pattern: loss functions
class LossFunction(ABC):
@abstractmethod
def __call__(self, predictions: np.ndarray, targets: np.ndarray) -> float:
pass
@abstractmethod
def gradient(self, predictions: np.ndarray, targets: np.ndarray) -> np.ndarray:
pass
class MSELoss(LossFunction):
def __call__(self, predictions, targets):
return float(np.mean((predictions - targets) ** 2))
def gradient(self, predictions, targets):
return 2 * (predictions - targets) / len(predictions)
class HuberLoss(LossFunction):
def __init__(self, delta: float = 1.0):
self.delta = delta
def __call__(self, predictions, targets):
diff = predictions - targets
is_small = np.abs(diff) <= self.delta
loss = np.where(is_small, 0.5 * diff**2,
self.delta * np.abs(diff) - 0.5 * self.delta**2)
return float(np.mean(loss))
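    def gradient(self, predictions, targets):
        diff = predictions - targets
        # Divide by the sample count so the gradient matches the mean taken in __call__
        return np.where(np.abs(diff) <= self.delta, diff,
                        self.delta * np.sign(diff)) / diff.size
# Factory pattern: creating models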
class ModelFactory:
_registry: Dict[str, Type] = {}
@classmethod
def register(cls, name: str):
def decorator(model_class):
cls._registry[name] = model_class
return model_class
return decorator
@classmethod
def create(cls, name: str, **kwargs):
if name not in cls._registry:
available = list(cls._registry.keys())
raise ValueError(f"Unknown model: {name}. Available: {available}")
return cls._registry[name](**kwargs)
@ModelFactory.register("linear")
class LinearModel:
def __init__(self, input_size: int, output_size: int):
self.W = np.random.randn(input_size, output_size) * 0.01
self.b = np.zeros(output_size)
def forward(self, X: np.ndarray) -> np.ndarray:
return X @ self.W + self.b
@ModelFactory.register("mlp")
class MLPModel:
def __init__(self, layer_sizes: list):
self.layers = []
for i in range(len(layer_sizes) - 1):
W = np.random.randn(layer_sizes[i], layer_sizes[i+1]) * 0.01
b = np.zeros(layer_sizes[i+1])
self.layers.append((W, b))
def forward(self, X: np.ndarray) -> np.ndarray:
out = X
for i, (W, b) in enumerate(self.layers):
out = out @ W + b
if i < len(self.layers) - 1:
out = np.maximum(0, out)
return out
model = ModelFactory.create("mlp", layer_sizes=[128, 256, 10])
X = np.random.randn(32, 128)
output = model.forward(X)
print(f"Model output shape: {output.shape}")
10.2 Testing ML code with pytest
# tests/test_models.py
import numpy as np
import pytest
from numpy.testing import assert_array_almost_equal
# Assumes MSELoss and LinearModel from section 10.1 are importable;
# the module path below is hypothetical
from my_ml_project.models import MSELoss, LinearModel
class TestLossFunctions:
    def test_mse_zero_loss(self):
        """A perfect prediction should give zero loss."""
        x = np.array([1.0, 2.0, 3.0])
        loss = MSELoss()(x, x)
        assert abs(loss) < 1e-10
    def test_mse_positive(self):
        """MSE must always be non-negative."""
        predictions = np.random.randn(100)
        targets = np.random.randn(100)
        loss = MSELoss()(predictions, targets)
        assert loss >= 0
    @pytest.mark.parametrize("batch_size", [1, 16, 32, 128])
    def test_batch_sizes(self, batch_size):
        """The model must handle a range of batch sizes."""
        X = np.random.randn(batch_size, 64)
        model = LinearModel(64, 10)
        output = model.forward(X)
        assert output.shape == (batch_size, 10)
    def test_numerical_gradient(self):
        """Verify that the analytical gradient matches the numerical gradient."""
loss_fn = MSELoss()
pred = np.array([0.5, 0.3, 0.2])
target = np.array([0.6, 0.2, 0.1])
analytical_grad = loss_fn.gradient(pred, target)
eps = 1e-5
numerical_grad = np.zeros_like(pred)
for i in range(len(pred)):
pred_plus = pred.copy()
pred_plus[i] += eps
pred_minus = pred.copy()
pred_minus[i] -= eps
numerical_grad[i] = (loss_fn(pred_plus, target) - loss_fn(pred_minus, target)) / (2 * eps)
assert_array_almost_equal(analytical_grad, numerical_grad, decimal=5)
To run:
pytest tests/ -v --cov=src --cov-report=html -x
10.3 Logging best practices
import logging
import sys
from pathlib import Path
from typing import Optional
def setup_logger(
name: str,
level: int = logging.INFO,
log_file: Optional[str] = None,
format_str: Optional[str] = None
) -> logging.Logger:
if format_str is None:
format_str = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
formatter = logging.Formatter(format_str, datefmt='%Y-%m-%d %H:%M:%S')
logger = logging.getLogger(name)
logger.setLevel(level)
if not logger.handlers:
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
if log_file:
Path(log_file).parent.mkdir(parents=True, exist_ok=True)
file_handler = logging.FileHandler(log_file)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
return logger
class TrainingLogger:
def __init__(self, experiment_name: str, log_dir: str = "logs"):
self.logger = setup_logger(
f"training.{experiment_name}",
log_file=f"{log_dir}/{experiment_name}.log"
)
self.step = 0
def log_metrics(self, metrics: dict, phase: str = "train"):
metrics_str = ", ".join(f"{k}={v:.4f}" for k, v in metrics.items())
self.logger.info(f"[{phase}] step={self.step} {metrics_str}")
self.step += 1
def log_epoch(self, epoch: int, train_loss: float, val_loss: float):
self.logger.info(
f"Epoch {epoch}: train_loss={train_loss:.4f}, val_loss={val_loss:.4f}"
)
def log_hyperparams(self, params: dict):
self.logger.info(f"Hyperparameters: {params}")
logger = TrainingLogger("bert-finetuning-v1")
logger.log_hyperparams({"lr": 2e-5, "batch_size": 32, "epochs": 10})
for epoch in range(3):
for step in range(10):
logger.log_metrics(
{"loss": 2.0 - epoch * 0.3 - step * 0.01, "acc": 0.5 + epoch * 0.1},
phase="train"
)
logger.log_epoch(epoch, 1.5 - epoch * 0.3, 1.8 - epoch * 0.25)
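Plain-text log lines are easy to read but awkward to query later. One option, sketched below, is a formatter that emits one JSON object per record so metrics can be parsed by downstream tools. `JsonFormatter`, the `metrics` key passed through `extra`, and the logger name are hypothetical choices made for this example.

```python
import io
import json
import logging


class JsonFormatter(logging.Formatter):
    """Format each record as a single JSON line."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Values passed via `extra={"metrics": {...}}` become record.metrics
        payload.update(getattr(record, "metrics", {}))
        return json.dumps(payload)


stream = io.StringIO()  # in-memory target; a FileHandler would work the same way
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter())

json_logger = logging.getLogger("training.json_demo")
json_logger.setLevel(logging.INFO)
json_logger.propagate = False  # keep the demo output out of the root logger
json_logger.addHandler(handler)

json_logger.info("train step", extra={"metrics": {"step": 1, "loss": 0.25}})
line = stream.getvalue().strip()
print(line)
```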
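Summary
When applying the techniques in this guide to a real AI/ML project, follow this order of priority:
1. **Profile first**: before optimizing, always locate the bottleneck with cProfile and line_profiler.
2. **Vectorize with NumPy**: for loop-heavy numeric code, vectorization alone often yields a 10-100x speedup.
3. **Apply Numba to hot loops**: use Numba JIT when an explicit Python loop truly cannot be avoided.
4. **Use multiprocessing to saturate the CPU**: use ProcessPoolExecutor for independent tasks such as data preprocessing and cross-validation.
5. **Use asyncio for I/O-bound pipelines**: API-heavy or file-I/O-heavy workflows can see throughput gains of 10x or more, depending on how much of the time is spent waiting.
6. **Type hints and tests**: combine mypy, pytest, and pydantic to keep an ML codebase robust enough to scale safely.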