
Advanced Python Techniques for AI/ML: Performance Optimization, Parallel Processing, and Memory Management


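Overview

Python is the lingua franca of the AI/ML ecosystem. Plain Python code, however, has inherent performance limits, and these become bottlenecks in large-scale data processing and model training pipelines. This guide covers advanced Python techniques for building production-grade AI/ML systems.

You will learn how to write code that can run hundreds of times faster than pure Python loops, how to use the GPU, and how to manage memory efficiently, with runnable examples throughout.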

1. Python Performance Profiling

Always profile before you optimize. The rule of thumb is "you cannot improve what you cannot measure."
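Before reaching for a full profiler, a quick wall-clock comparison is often enough to tell whether an optimization is worth pursuing. The following is a minimal sketch using the standard-library `timeit` module; the functions `sum_squares_loop` and `sum_squares_builtin` are hypothetical names introduced only for this illustration.

```python
import timeit


def sum_squares_loop(n):
    """Explicit loop with manual accumulation."""
    total = 0
    for i in range(n):
        total += i * i
    return total


def sum_squares_builtin(n):
    """Same computation, delegated to the built-in sum()."""
    return sum(i * i for i in range(n))


n = 10_000

# timeit.repeat runs each statement `number` times per trial; taking the
# minimum over trials reduces noise from other processes on the machine.
t_loop = min(timeit.repeat(lambda: sum_squares_loop(n), number=20, repeat=3))
t_builtin = min(timeit.repeat(lambda: sum_squares_builtin(n), number=20, repeat=3))

print(f"loop:    {t_loop:.4f}s for 20 calls")
print(f"builtin: {t_builtin:.4f}s for 20 calls")
```

Which variant wins depends on the interpreter version and the workload, which is exactly why measuring comes first.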

1.1 cProfile and pstats

`cProfile`, part of the Python standard library, is the basic tool for function-level profiling.

import cProfile
import io
import pstats

def slow_matrix_multiply(a, b):
    """Inefficient matrix multiplication (pure-Python triple loop)."""
    rows_a = len(a)
    cols_a = len(a[0])
    cols_b = len(b[0])
    result = [[0] * cols_b for _ in range(rows_a)]
    for i in range(rows_a):
        for j in range(cols_b):
            for k in range(cols_a):
                result[i][j] += a[i][k] * b[k][j]
    return result

def profile_function():
    size = 100
    a = [[float(i + j) for j in range(size)] for i in range(size)]
    b = [[float(i * j + 1) for j in range(size)] for i in range(size)]
    slow_matrix_multiply(a, b)

# Run the profiler
pr = cProfile.Profile()
pr.enable()
profile_function()
pr.disable()

# Analyze the results
stream = io.StringIO()
ps = pstats.Stats(pr, stream=stream)
ps.sort_stats('cumulative')
ps.print_stats(20)
print(stream.getvalue())

You can also run it directly from the command line.

python -m cProfile -s cumulative your_script.py
python -m cProfile -o output.prof your_script.py

To analyze a saved `.prof` file:

import pstats

p = pstats.Stats('output.prof')
p.sort_stats('cumulative')
p.print_stats(10)                 # top 10 functions
p.print_callers('slow_function')  # functions that called this function
p.print_callees('main')           # functions this function calls

1.2 line_profiler (Line-Level Analysis)

line_profiler measures performance per line rather than per function.

pip install line_profiler

import numpy as np
from line_profiler import LineProfiler

def process_batch(data):
    result = []
    for item in data:
        normalized = (item - item.mean()) / (item.std() + 1e-8)
        scaled = normalized * 2.0
        clipped = np.clip(scaled, -5, 5)
        result.append(clipped)
    return np.stack(result)

# Set up the profiler
lp = LineProfiler()
lp_wrapper = lp(process_batch)

# Run on sample data
data = [np.random.randn(1000) for _ in range(1000)]
lp_wrapper(data)

# Show the results
lp.print_stats()

In a Jupyter notebook you can use the magic commands:

%load_ext line_profiler
%lprun -f process_batch process_batch(data)

1.3 memory_profiler

memory_profiler tracks memory usage line by line.

pip install memory_profiler

import numpy as np
from memory_profiler import profile

@profile
def memory_intensive_function(n=1_000_000):
    list_data = list(range(n))          # build a list
    arr = np.array(list_data)           # convert to a NumPy array
    del list_data                       # drop the list
    squared = arr ** 2                  # allocate a new array
    filtered = squared[squared > 100]   # filter
    return filtered.sum()

result = memory_intensive_function()
print(f"Result: {result}")

Run the script under the profiler, or record and plot usage over time with mprof:

python -m memory_profiler your_script.py
mprof run your_script.py
mprof plot  # visualize

1.4 py-spy (Sampling Profiler)

py-spy is a powerful tool that profiles a running Python process without any code changes.

pip install py-spy

# Profile a running process
py-spy top --pid 12345

# Generate a flame graph
py-spy record -o profile.svg --pid 12345

# Profile a script directly
py-spy record -o profile.svg -- python your_script.py

1.5 Visualization with SnakeViz

pip install snakeviz
python -m cProfile -o output.prof your_script.py
snakeviz output.prof

2. A Deep Dive into NumPy Vectorization

NumPy vectorization is the cornerstone of Python performance optimization for AI/ML.

2.1 Performance Comparison: for Loops vs. Vectorization

import time
import numpy as np

def benchmark(func, *args, runs=5):
    """Return (mean time, std of time, last result) over several runs."""
    times = []
    for _ in range(runs):
        start = time.perf_counter()
        result = func(*args)
        end = time.perf_counter()
        times.append(end - start)
    return np.mean(times), np.std(times), result

# 1. Python for loop
def relu_loop(x):
    result = []
    for val in x:
        result.append(max(0, val))
    return result

# 2. NumPy vectorization
def relu_numpy(x):
    return np.maximum(0, x)

# 3. NumPy clip
def relu_clip(x):
    return np.clip(x, 0, None)

n = 1_000_000
data_list = [np.random.randn() for _ in range(n)]
data_np = np.array(data_list)

t_loop, _, _ = benchmark(relu_loop, data_list)
t_numpy, _, _ = benchmark(relu_numpy, data_np)
t_clip, _, _ = benchmark(relu_clip, data_np)

print(f"Python loop: {t_loop:.4f}s")
print(f"NumPy maximum: {t_numpy:.4f}s ({t_loop/t_numpy:.0f}x faster)")
print(f"NumPy clip: {t_clip:.4f}s ({t_loop/t_clip:.0f}x faster)")

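2.2 Universal Functions (ufuncs)

import numpy as np

def custom_activation_scalar(x, alpha=0.1):
    """ELU activation function (scalar version)."""
    if x >= 0:
        return x
    else:
        return alpha * (np.exp(x) - 1)

# Wrap the scalar function with np.vectorize.
# Note: np.vectorize is a convenience wrapper around a Python-level loop,
# not a compiled ufunc, so it brings no real speedup.
elu_ufunc = np.vectorize(custom_activation_scalar)

# Fully vectorized version built from real NumPy ufuncs (faster)
def elu_numpy(x, alpha=0.1):
    return np.where(x >= 0, x, alpha * (np.exp(x) - 1))

x = np.random.randn(1_000_000)

# benchmark() is the helper defined in section 2.1
t_vectorize, _, _ = benchmark(elu_ufunc, x)
t_numpy, _, _ = benchmark(elu_numpy, x)

print(f"np.vectorize: {t_vectorize:.4f}s")
print(f"np.where: {t_numpy:.4f}s")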
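One detail worth keeping in mind with the `np.where` formulation: both branch expressions are evaluated for every element before the selection happens, so `np.exp(x)` is also computed for large positive inputs and can overflow. The sketch below shows one way to guard against that; `elu_safe` is a hypothetical name, and clamping with `np.minimum` plus `np.expm1` is a substitution made here for illustration, not something taken from the original text.

```python
import numpy as np


def elu_safe(x, alpha=0.1):
    """ELU that never feeds positive values into the exponential."""
    # Clamp to <= 0 first: positive entries become 0, so expm1() stays finite.
    # np.expm1(v) computes exp(v) - 1 with better precision near zero.
    negative_branch = alpha * np.expm1(np.minimum(x, 0.0))
    return np.where(x >= 0, x, negative_branch)


x = np.array([-2.0, 0.0, 3.0, 1000.0])
y = elu_safe(x)
print(y)
```

For inputs drawn from a standard normal distribution the difference does not matter, but activations inside a real network can grow large enough to trigger overflow warnings.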

2.3 np.einsum (Einstein Summation Notation)

import numpy as np

batch_size, seq_len, d_model = 32, 128, 512
heads, d_head = 8, 64

A = np.random.randn(batch_size, seq_len, d_model)
W = np.random.randn(d_model, d_model)

# Standard matrix product
result_matmul = A @ W  # (32, 128, 512)

# The same operation with einsum
result_einsum = np.einsum('bsi,ij->bsj', A, W)

# Attention scores (batched matrix product)
Q = np.random.randn(batch_size, heads, seq_len, d_head)
K = np.random.randn(batch_size, heads, seq_len, d_head)
scores_manual = Q @ K.transpose(0, 1, 3, 2)  # (32, 8, 128, 128)
scores_einsum = np.einsum('bhid,bhjd->bhij', Q, K)

# Batched outer product
a = np.random.randn(batch_size, d_model)
b = np.random.randn(batch_size, d_model)
outer = np.einsum('bi,bj->bij', a, b)  # (32, 512, 512)

# Trace (sum of the diagonal)
M = np.random.randn(batch_size, d_model, d_model)
trace = np.einsum('bii->b', M)  # (32,)

print(f"Attention scores shape: {scores_einsum.shape}")
print(f"Outer product shape: {outer.shape}")
print(f"Trace shape: {trace.shape}")

# Use the optimize parameter to search for the best contraction path
result = np.einsum('bhid,bhjd->bhij', Q, K, optimize='optimal')
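When writing a new einsum subscript string, it helps to check it against a reference implementation on small inputs before using it at full size. A minimal sketch of such a check follows; the tiny shapes and the fixed seed are arbitrary choices made for this illustration.

```python
import numpy as np

rng = np.random.default_rng(0)

# Small tensors: (batch, heads, seq_len, d_head)
Q = rng.standard_normal((2, 3, 4, 5))
K = rng.standard_normal((2, 3, 4, 5))

# Attention scores: einsum vs. explicit transpose + matmul
scores_matmul = Q @ K.transpose(0, 1, 3, 2)
scores_einsum = np.einsum('bhid,bhjd->bhij', Q, K)
scores_match = np.allclose(scores_matmul, scores_einsum)

# Batched trace: einsum vs. np.trace over the last two axes
M = rng.standard_normal((2, 6, 6))
trace_einsum = np.einsum('bii->b', M)
trace_ref = np.trace(M, axis1=1, axis2=2)
trace_match = np.allclose(trace_einsum, trace_ref)

print(f"scores match: {scores_match}, trace match: {trace_match}")
```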

2.4 Stride Tricks

import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

arr = np.arange(20, dtype=float)
window_size = 5

# Modern approach (NumPy 1.20+)
windows = sliding_window_view(arr, window_size)
print(f"Sliding window shape: {windows.shape}")  # (16, 5)

# Moving average
moving_avg = windows.mean(axis=-1)
print(f"Moving average: {moving_avg[:5]}")

# 2D sliding window (preprocessing for convolution)
image = np.random.randn(64, 64)
kernel_size = (3, 3)
windows_2d = sliding_window_view(image, kernel_size)
print(f"2D window shape: {windows_2d.shape}")  # (62, 62, 3, 3)

# Non-overlapping chunks (a plain reshape is enough here; no stride tricks needed)
batch = np.random.randn(1000, 10)
chunk_size = 50
n_chunks = len(batch) // chunk_size
chunks = batch[:n_chunks * chunk_size].reshape(n_chunks, chunk_size, -1)
print(f"Chunks shape: {chunks.shape}")  # (20, 50, 10)
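The moving average computed from sliding windows can be cross-checked against a convolution with a uniform kernel. The sketch below does that and also shows that the window array is a read-only view rather than a copy; comparing against `np.convolve` is an addition made here for illustration.

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

arr = np.arange(20, dtype=float)
window_size = 5

# Each row of `windows` is a length-5 view into `arr`; no data is copied.
windows = sliding_window_view(arr, window_size)
moving_avg = windows.mean(axis=-1)

# Reference: convolution with a uniform kernel, keeping only full overlaps.
kernel = np.ones(window_size) / window_size
moving_avg_ref = np.convolve(arr, kernel, mode='valid')

print(np.allclose(moving_avg, moving_avg_ref))
print(windows.flags.writeable)  # the view is read-only by default
```

Because the windows overlap in memory, writing through such a view would modify several windows at once, which is why NumPy returns it read-only unless asked otherwise.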

3. Numba JIT Compilation

Numba compiles Python functions to machine code and can reach C/C++-level performance on numeric loops.

3.1 The @jit and @njit Decorators

import time
import numpy as np
from numba import jit, njit

# @jit with nopython=False: on older Numba releases this may fall back to
# object mode, which allows arbitrary Python objects but is much slower
@jit(nopython=False)
def sum_with_jit(arr):
    total = 0.0
    for i in range(len(arr)):
        total += arr[i]
    return total

# @njit: pure native mode (faster, recommended)
@njit
def sum_with_njit(arr):
    total = 0.0
    for i in range(len(arr)):
        total += arr[i]
    return total

@njit
def compute_distances(points):
    """Compute the Euclidean distance matrix between points."""
    n = len(points)
    distances = np.zeros((n, n))
    for i in range(n):
        for j in range(i + 1, n):
            diff = points[i] - points[j]
            dist = np.sqrt(np.sum(diff ** 2))
            distances[i, j] = dist
            distances[j, i] = dist
    return distances

arr = np.random.randn(1_000_000)

# The first call includes compilation time
_ = sum_with_njit(arr)  # warm-up

start = time.perf_counter()
result_numpy = np.sum(arr)
print(f"NumPy sum: {time.perf_counter() - start:.4f}s")

start = time.perf_counter()
result_njit = sum_with_njit(arr)
print(f"Numba njit: {time.perf_counter() - start:.4f}s")

# Distance computation on a point cloud
points = np.random.randn(500, 3).astype(np.float64)
_ = compute_distances(points)  # warm-up

start = time.perf_counter()
dist_matrix = compute_distances(points)
print(f"Numba distance matrix ({points.shape[0]}x{points.shape[0]}): {time.perf_counter() - start:.4f}s")

3.2 Type Inference and Explicit Type Declarations

import time
import numpy as np
from numba import njit, float32, float64

# An explicit signature compiles the function when it is defined,
# which avoids the compilation delay on the first call
@njit(float64[:](float64[:], float64))
def scale_array_typed(arr, factor):
    return arr * factor

# Multiple type signatures
@njit([
    float32[:](float32[:], float32),
    float64[:](float64[:], float64)
])
def scale_multi_type(arr, factor):
    return arr * factor

@njit
def softmax_numba(x):
    """Numba-accelerated softmax."""
    max_val = np.max(x)
    exp_x = np.exp(x - max_val)
    return exp_x / np.sum(exp_x)

@njit
def batch_softmax(X):
    """Row-wise softmax over a 2D array."""
    result = np.empty_like(X)
    for i in range(X.shape[0]):
        result[i] = softmax_numba(X[i])
    return result

X = np.random.randn(1000, 512).astype(np.float64)
_ = batch_softmax(X)  # warm-up

start = time.perf_counter()
result = batch_softmax(X)
print(f"Numba batch softmax: {time.perf_counter() - start:.4f}s")

3.3 Parallel Processing (prange)

import time
import numpy as np
from numba import njit, prange

@njit(parallel=True)
def parallel_normalize(X):
    """Row-wise normalization with automatic parallelization."""
    result = np.empty_like(X)
    for i in prange(X.shape[0]):  # iterations are distributed across threads
        row = X[i]
        mean = row.mean()
        std = row.std()
        result[i] = (row - mean) / (std + 1e-8)
    return result

@njit(parallel=True)
def parallel_batch_process(data, weights):
    """Parallel weighted sum over a batch."""
    n_samples, n_features = data.shape
    results = np.zeros(n_samples)
    for i in prange(n_samples):
        dot = 0.0
        for j in range(n_features):
            dot += data[i, j] * weights[j]
        results[i] = dot
    return results

@njit
def sequential_normalize(X):
    """Same computation without parallel=True, used as the baseline."""
    result = np.empty_like(X)
    for i in range(X.shape[0]):
        row = X[i]
        mean = row.mean()
        std = row.std()
        result[i] = (row - mean) / (std + 1e-8)
    return result

X = np.random.randn(10000, 512).astype(np.float64)

_ = parallel_normalize(X)  # warm-up
start = time.perf_counter()
result_parallel = parallel_normalize(X)
t_parallel = time.perf_counter() - start

_ = sequential_normalize(X)  # warm-up
start = time.perf_counter()
result_seq = sequential_normalize(X)
t_seq = time.perf_counter() - start

print(f"Sequential: {t_seq:.4f}s")
print(f"Parallel: {t_parallel:.4f}s ({t_seq/t_parallel:.1f}x faster)")

3.4 Numba Caching

import numpy as np
from numba import njit

# cache=True saves the compiled code to disk
@njit(cache=True)
def cached_computation(x, y):
    """Cached JIT function: later runs load the compiled code instead of recompiling."""
    result = np.empty(len(x))
    for i in range(len(x)):
        result[i] = x[i] ** 2 + y[i] ** 2
    return result

arr1 = np.random.randn(1000)
arr2 = np.random.randn(1000)
result = cached_computation(arr1, arr2)

4. Cython

Cython transpiles Python code to C, giving you the most control over performance.

4.1 Writing a .pyx File

# setup.py: Cython build configuration
import numpy as np
from setuptools import setup
from Cython.Build import cythonize

setup(
    ext_modules=cythonize(
        "fast_ops.pyx",
        compiler_directives={
            'language_level': '3',
            'boundscheck': False,
            'wraparound': False,
            'nonecheck': False,
        }
    ),
    include_dirs=[np.get_include()]
)

An example `fast_ops.pyx`:

# fast_ops.pyx
# cython: language_level=3
# cython: boundscheck=False
# cython: wraparound=False
import numpy as np
cimport numpy as cnp
from libc.math cimport exp

def relu_cython(cnp.ndarray[cnp.double_t, ndim=1] x):
    cdef int n = len(x)
    cdef cnp.ndarray[cnp.double_t, ndim=1] result = np.empty(n)
    cdef int i
    cdef double val
    for i in range(n):
        val = x[i]
        result[i] = val if val > 0 else 0.0
    return result

def softmax_cython(cnp.ndarray[cnp.double_t, ndim=1] x):
    cdef int n = len(x)
    cdef cnp.ndarray[cnp.double_t, ndim=1] result = np.empty(n)
    cdef double max_val = x[0]
    cdef double sum_exp = 0.0
    cdef int i
    # Pass 1: find the maximum for numerical stability
    for i in range(n):
        if x[i] > max_val:
            max_val = x[i]
    # Pass 2: exponentiate and accumulate the normalizer
    for i in range(n):
        result[i] = exp(x[i] - max_val)
        sum_exp += result[i]
    # Pass 3: normalize
    for i in range(n):
        result[i] /= sum_exp
    return result

Build and use:

python setup.py build_ext --inplace

import numpy as np
import fast_ops

x = np.random.randn(1_000_000)
result = fast_ops.relu_cython(x)

4.2 The IPython Cython Magic

%load_ext Cython

%%cython -a
# -a: annotation mode (yellow = Python calls, white = C)
cimport numpy as cnp

def fast_l2_norm(cnp.ndarray[cnp.double_t, ndim=1] v):
    cdef int n = len(v)
    cdef double total = 0.0
    cdef int i
    for i in range(n):
        total += v[i] * v[i]
    return total ** 0.5

5. Python Multiprocessing

5.1 Understanding the GIL

The GIL (Global Interpreter Lock) allows only one thread at a time to execute Python bytecode. For CPU-bound tasks, multithreading therefore provides no real parallelism; use multiprocessing instead.

import multiprocessing
import threading
import time

def cpu_bound_task(n):
    total = 0
    for i in range(n):
        total += i ** 2
    return total

def run_threaded(n_tasks=4, n_per_task=1_000_000):
    threads = []
    start = time.perf_counter()
    for _ in range(n_tasks):
        t = threading.Thread(target=cpu_bound_task, args=(n_per_task,))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start

def run_multiprocessing(n_tasks=4, n_per_task=1_000_000):
    start = time.perf_counter()
    with multiprocessing.Pool(processes=n_tasks) as pool:
        results = pool.map(cpu_bound_task, [n_per_task] * n_tasks)
    return time.perf_counter() - start

# The guard is required on platforms that spawn worker processes (Windows, macOS)
if __name__ == "__main__":
    t_threaded = run_threaded()
    t_mp = run_multiprocessing()
    print(f"Threading: {t_threaded:.2f}s")
    print(f"Multiprocessing: {t_mp:.2f}s ({t_threaded/t_mp:.1f}x faster)")
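The picture is different for I/O-bound work: a thread that is waiting on the network or disk releases the GIL, so other threads can run in the meantime. The sketch below illustrates this with `time.sleep` standing in for an I/O wait; `io_bound_task` and the 0.2-second delay are hypothetical choices made for this illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def io_bound_task(delay):
    """Simulated I/O wait; sleeping releases the GIL like a blocking read would."""
    time.sleep(delay)
    return delay


delays = [0.2] * 4

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:
    # The four waits overlap, so total time is close to one delay, not four.
    results = list(pool.map(io_bound_task, delays))
elapsed = time.perf_counter() - start

print(f"4 tasks of 0.2s each finished in {elapsed:.2f}s")
```

Threads are therefore a reasonable choice for tasks such as downloading files or calling remote APIs, while CPU-bound work belongs in separate processes.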

5.2 multiprocessing.Pool

import multiprocessing
import numpy as np

def preprocess_sample(args):
    idx, data, mean, std = args
    normalized = (data - mean) / (std + 1e-8)
    # Use a fresh generator: forked workers inherit the parent's global
    # np.random state and would otherwise draw identical noise.
    rng = np.random.default_rng()
    augmented = normalized + rng.standard_normal(normalized.shape) * 0.01
    return idx, augmented

def preprocess_dataset_parallel(dataset, n_workers=None):
    if n_workers is None:
        n_workers = multiprocessing.cpu_count()
    mean = dataset.mean(axis=0)
    std = dataset.std(axis=0)
    args = [(i, dataset[i], mean, std) for i in range(len(dataset))]
    with multiprocessing.Pool(processes=n_workers) as pool:
        results = pool.map(preprocess_sample, args, chunksize=100)
    # pool.map already preserves input order; the sort is a defensive no-op
    results.sort(key=lambda x: x[0])
    return np.stack([r[1] for r in results])

if __name__ == "__main__":
    dataset = np.random.randn(10000, 128)
    processed = preprocess_dataset_parallel(dataset, n_workers=4)
    print(f"Processed dataset shape: {processed.shape}")

5.3 ProcessPoolExecutor

from concurrent.futures import ProcessPoolExecutor, as_completed
import numpy as np

def train_fold(fold_data):
    fold_idx, X_train, y_train, X_val, y_val = fold_data
    from sklearn.linear_model import LogisticRegression
    from sklearn.metrics import accuracy_score
    model = LogisticRegression(max_iter=1000)
    model.fit(X_train, y_train)
    val_pred = model.predict(X_val)
    accuracy = accuracy_score(y_val, val_pred)
    return fold_idx, accuracy, model

def parallel_cross_validation(X, y, n_folds=5):
    from sklearn.model_selection import KFold
    kf = KFold(n_splits=n_folds, shuffle=True, random_state=42)
    fold_data_list = []
    for fold_idx, (train_idx, val_idx) in enumerate(kf.split(X)):
        fold_data_list.append((
            fold_idx,
            X[train_idx], y[train_idx],
            X[val_idx], y[val_idx]
        ))
    results = {}
    with ProcessPoolExecutor(max_workers=n_folds) as executor:
        future_to_fold = {
            executor.submit(train_fold, fd): fd[0]
            for fd in fold_data_list
        }
        # Folds are reported in completion order, not submission order
        for future in as_completed(future_to_fold):
            fold_idx, accuracy, model = future.result()
            results[fold_idx] = {'accuracy': accuracy, 'model': model}
            print(f"Fold {fold_idx}: accuracy = {accuracy:.4f}")
    avg_accuracy = np.mean([r['accuracy'] for r in results.values()])
    print(f"\nMean accuracy: {avg_accuracy:.4f}")
    return results

if __name__ == "__main__":
    from sklearn.datasets import make_classification
    X, y = make_classification(n_samples=5000, n_features=20, random_state=42)
    results = parallel_cross_validation(X, y, n_folds=5)

5.4 Shared Memory

import multiprocessing
from multiprocessing import shared_memory
import numpy as np

def worker_shared_memory(shm_name, shape, dtype, start_idx, end_idx):
    # Attach to the existing block and square this worker's slice in place
    existing_shm = shared_memory.SharedMemory(name=shm_name)
    arr = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)
    arr[start_idx:end_idx] = arr[start_idx:end_idx] ** 2
    existing_shm.close()

def process_with_shared_memory(data):
    shm = shared_memory.SharedMemory(create=True, size=data.nbytes)
    shared_array = np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)
    shared_array[:] = data[:]
    n_workers = multiprocessing.cpu_count()
    chunk_size = len(data) // n_workers
    processes = []
    for i in range(n_workers):
        start = i * chunk_size
        # The last worker also takes any remainder
        end = start + chunk_size if i < n_workers - 1 else len(data)
        p = multiprocessing.Process(
            target=worker_shared_memory,
            args=(shm.name, data.shape, data.dtype, start, end)
        )
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    result = shared_array.copy()  # copy out before releasing the block
    shm.close()
    shm.unlink()
    return result

if __name__ == "__main__":
    data = np.random.randn(1_000_000).astype(np.float64)
    result = process_with_shared_memory(data)
    print(f"Shared memory processing complete: {result[:5]}")

6. Asynchronous Programming (asyncio)

6.1 The async/await Pattern

import asyncio
import time

async def fetch_embeddings(text: str, model: str = "text-embedding-3-small") -> list:
    """Simulated asynchronous embedding request."""
    await asyncio.sleep(0.1)
    return [0.1, 0.2, 0.3]

async def sequential_processing(texts: list) -> list:
    results = []
    for text in texts:
        embedding = await fetch_embeddings(text)
        results.append(embedding)
    return results

async def parallel_processing(texts: list) -> list:
    # All requests are in flight concurrently on a single thread
    tasks = [fetch_embeddings(text) for text in texts]
    results = await asyncio.gather(*tasks)
    return list(results)

async def compare_approaches():
    texts = [f"sample text {i}" for i in range(20)]

    start = time.perf_counter()
    sequential_results = await sequential_processing(texts)
    t_seq = time.perf_counter() - start

    start = time.perf_counter()
    parallel_results = await parallel_processing(texts)
    t_par = time.perf_counter() - start

    print(f"Sequential: {t_seq:.2f}s ({len(sequential_results)} results)")
    print(f"Parallel: {t_par:.2f}s ({len(parallel_results)} results)")
    print(f"Speedup: {t_seq/t_par:.1f}x")

asyncio.run(compare_approaches())

6.2 Asynchronous HTTP with aiohttp

import asyncio
from typing import Optional

import aiohttp

class AsyncAIClient:
    """Asynchronous AI API client with concurrency limiting."""

    def __init__(self, api_key: str, base_url: str, max_concurrent: int = 10):
        self.api_key = api_key
        self.base_url = base_url
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self):
        connector = aiohttp.TCPConnector(limit=100)
        self.session = aiohttp.ClientSession(
            connector=connector,
            headers={"Authorization": f"Bearer {self.api_key}"}
        )
        return self

    async def __aexit__(self, *args):
        if self.session:
            await self.session.close()

    async def get_embedding(self, text: str, retry: int = 3) -> list:
        """Embedding request with exponential-backoff retries."""
        async with self.semaphore:
            for attempt in range(retry):
                try:
                    async with self.session.post(
                        f"{self.base_url}/embeddings",
                        json={"input": text, "model": "text-embedding-3-small"}
                    ) as response:
                        if response.status == 200:
                            data = await response.json()
                            return data["data"][0]["embedding"]
                        elif response.status == 429:
                            # Rate limited: exponential backoff
                            await asyncio.sleep(2 ** attempt)
                        else:
                            response.raise_for_status()
                except aiohttp.ClientError:
                    if attempt == retry - 1:
                        raise
                    await asyncio.sleep(1)
        # All attempts were rate limited
        return []

    async def batch_embeddings(self, texts: list, batch_size: int = 100) -> list:
        """Process embeddings in batches."""
        all_results = []
        for i in range(0, len(texts), batch_size):
            batch = texts[i:i + batch_size]
            tasks = [self.get_embedding(text) for text in batch]
            batch_results = await asyncio.gather(*tasks, return_exceptions=True)
            for j, result in enumerate(batch_results):
                if isinstance(result, Exception):
                    print(f"Error for text {i+j}: {result}")
                    all_results.append(None)
                else:
                    all_results.append(result)
        return all_results

6.3 A Dynamic Batch Processor for AI APIs

import asyncio
import time
from dataclasses import dataclass, field
from typing import Any

@dataclass
class BatchProcessor:
    """Dynamic request batcher."""
    max_batch_size: int = 20
    max_wait_time: float = 0.05  # 50 ms
    queue: asyncio.Queue = field(default_factory=asyncio.Queue)

    async def process_request(self, request_id: str, data: Any) -> Any:
        # get_running_loop() is the preferred call inside a coroutine
        future = asyncio.get_running_loop().create_future()
        await self.queue.put((request_id, data, future))
        return await future

    async def batch_worker(self):
        while True:
            batch = []
            deadline = time.perf_counter() + self.max_wait_time
            # Collect requests until the batch is full or the deadline passes
            while len(batch) < self.max_batch_size:
                timeout = deadline - time.perf_counter()
                if timeout <= 0:
                    break
                try:
                    item = await asyncio.wait_for(self.queue.get(), timeout=timeout)
                    batch.append(item)
                except asyncio.TimeoutError:
                    break
            if not batch:
                await asyncio.sleep(0.001)
                continue
            request_ids, data_list, futures = zip(*batch)
            try:
                results = await self._call_api_batch(list(data_list))
                for future, result in zip(futures, results):
                    future.set_result(result)
            except Exception as e:
                for future in futures:
                    future.set_exception(e)

    async def _call_api_batch(self, data_list: list) -> list:
        await asyncio.sleep(0.1)  # simulated API latency
        return [f"result_{d}" for d in data_list]

async def test_batch_processor():
    processor = BatchProcessor(max_batch_size=10, max_wait_time=0.05)
    worker_task = asyncio.create_task(processor.batch_worker())

    async def make_request(i):
        return await processor.process_request(f"req_{i}", f"data_{i}")

    start = time.perf_counter()
    tasks = [make_request(i) for i in range(50)]
    results = await asyncio.gather(*tasks)
    elapsed = time.perf_counter() - start

    print(f"50 requests processed: {elapsed:.3f}s")
    print(f"First 5 results: {results[:5]}")
    worker_task.cancel()

asyncio.run(test_batch_processor())

7. Memory Management

7.1 The Python Memory Model

import sys

data_types = {
    'int': 42,
    'float': 3.14,
    'str (small)': "hello",
    'str (large)': "hello" * 1000,
    'list (empty)': [],
    'dict (empty)': {},
    'tuple (empty)': (),
}

print("Object sizes:")
for name, obj in data_types.items():
    print(f" {name}: {sys.getsizeof(obj)} bytes")

def deep_size(obj, seen=None):
    """Total memory of an object plus every object it references."""
    if seen is None:
        seen = set()
    obj_id = id(obj)
    if obj_id in seen:
        return 0  # already counted (shared or cyclic reference)
    seen.add(obj_id)
    size = sys.getsizeof(obj)
    if isinstance(obj, dict):
        size += sum(deep_size(k, seen) + deep_size(v, seen)
                    for k, v in obj.items())
    elif hasattr(obj, '__iter__') and not isinstance(obj, (str, bytes)):
        size += sum(deep_size(item, seen) for item in obj)
    return size

nested = {'data': [i for i in range(1000)], 'meta': {'name': 'test'}}
print(f"\nNested dict size: {deep_size(nested):,} bytes")
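`sys.getsizeof` reports the size of individual objects; to see how much memory a piece of code actually allocates, the standard-library `tracemalloc` module can be used instead. The following is a minimal sketch; the list of 100,000 floats is an arbitrary workload chosen for this illustration.

```python
import tracemalloc

tracemalloc.start()
before, _ = tracemalloc.get_traced_memory()

# Workload under measurement: a list of 100,000 distinct float objects
data = [float(i) for i in range(100_000)]

# get_traced_memory() returns (current, peak) in bytes since start()
after, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()

allocated = after - before
print(f"Allocated: {allocated / 1024:.0f} KiB, peak: {peak / 1024:.0f} KiB")
```

Unlike a recursive size function, this counts only memory allocated while tracing was active, so it is well suited to before/after comparisons around a specific code path.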

7.2 Generators for Memory Efficiency

from typing import Generator, Iterator
import numpy as np

def dataset_generator(
    file_paths: list,
    batch_size: int = 32
) -> Generator:
    """Memory-efficient data loader."""
    for path in file_paths:
        # Simulated data; a real loader would read `path` here
        data = np.random.randn(1000, 128)
        labels = np.random.randint(0, 10, 1000)
        for start in range(0, len(data), batch_size):
            end = min(start + batch_size, len(data))
            yield data[start:end], labels[start:end]

def augment_generator(gen: Iterator, noise_std: float = 0.01):
    """Chained augmentation generator."""
    for X, y in gen:
        X_aug = X + np.random.randn(*X.shape) * noise_std
        yield X_aug, y

def normalize_generator(gen: Iterator):
    """Normalization generator."""
    for X, y in gen:
        X_norm = (X - X.mean(axis=1, keepdims=True)) / (X.std(axis=1, keepdims=True) + 1e-8)
        yield X_norm, y

# Compose the pipeline
fake_paths = [f"data_{i}.npy" for i in range(5)]
base_gen = dataset_generator(fake_paths, batch_size=32)
aug_gen = augment_generator(base_gen)
norm_gen = normalize_generator(aug_gen)

for i, (X_batch, y_batch) in enumerate(norm_gen):
    if i >= 3:
        break
    print(f"Batch {i}: shape={X_batch.shape}, mean={X_batch.mean():.4f}")

7.3 Optimizing with `__slots__`

import sys
import numpy as np

class RegularEmbedding:
    def __init__(self, vector, label, metadata):
        self.vector = vector
        self.label = label
        self.metadata = metadata

class SlottedEmbedding:
    __slots__ = ['vector', 'label', 'metadata']

    def __init__(self, vector, label, metadata):
        self.vector = vector
        self.label = label
        self.metadata = metadata

n = 100_000
vector_size = 128

regular_objects = [
    RegularEmbedding(np.random.randn(vector_size), i % 10, {'source': 'test'})
    for i in range(n)
]
slotted_objects = [
    SlottedEmbedding(np.random.randn(vector_size), i % 10, {'source': 'test'})
    for i in range(n)
]

# Per-instance overhead only; the referenced vector and metadata are not counted
regular_size = sys.getsizeof(regular_objects[0]) + sys.getsizeof(regular_objects[0].__dict__)
slotted_size = sys.getsizeof(slotted_objects[0])

print(f"Regular class instance: {regular_size} bytes")
print(f"__slots__ class instance: {slotted_size} bytes")
print(f"Memory saved: {(regular_size - slotted_size) / regular_size * 100:.1f}%")
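The memory saving comes with a behavioral trade-off: a slotted instance has no per-instance `__dict__`, so attributes not listed in `__slots__` cannot be added later. A minimal sketch of that behavior follows; the `Point` class is a hypothetical example introduced only for this illustration.

```python
class Point:
    __slots__ = ('x', 'y')

    def __init__(self, x, y):
        self.x = x
        self.y = y


p = Point(1.0, 2.0)

# No per-instance dictionary is allocated for slotted classes
has_dict = hasattr(p, '__dict__')

# Assigning an attribute that is not declared in __slots__ fails
try:
    p.z = 3.0
    rejected = False
except AttributeError:
    rejected = True

print(f"has __dict__: {has_dict}, undeclared attribute rejected: {rejected}")
```

This makes `__slots__` a good fit for small record-like classes created in large numbers, and a poor fit for objects that need attributes attached dynamically.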

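7.4 weakref

import gc
import weakref
import numpy as np

class LoadedModel:
    """Container for a loaded model.

    Plain dicts cannot be weakly referenced, so the cached value
    must be an instance of a regular class.
    """
    def __init__(self, name, weights):
        self.name = name
        self.weights = weights

class ModelCache:
    """Model cache backed by weak references."""

    def __init__(self):
        self._cache = weakref.WeakValueDictionary()

    def get_or_load(self, model_name: str):
        model = self._cache.get(model_name)
        if model is None:
            print(f"Loading model: {model_name}")
            model = self._load_model(model_name)
            self._cache[model_name] = model
        else:
            print(f"Returning from cache: {model_name}")
        return model

    def _load_model(self, model_name: str):
        return LoadedModel(model_name, np.random.randn(100, 100))

cache = ModelCache()
model = cache.get_or_load('resnet50')
print(f"Cache size: {len(cache._cache)}")

model_again = cache.get_or_load('resnet50')  # served from the cache

# Once the last strong reference is gone, the entry disappears
del model
del model_again
gc.collect()
print(f"Cache size after GC: {len(cache._cache)}")

new_model = cache.get_or_load('resnet50')  # loaded again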

8. Python Type Hints

8.1 Static Type Checking with mypy

pip install mypy
mypy your_script.py --strict

from typing import Dict, Optional, Tuple

import numpy as np
from numpy.typing import NDArray

def compute_loss(
    predictions: NDArray[np.float32],
    targets: NDArray[np.float32],
    reduction: str = 'mean'
) -> float:
    diff = predictions - targets
    loss = np.sum(diff ** 2)
    if reduction == 'mean':
        return float(loss / len(predictions))
    return float(loss)

def load_checkpoint(
    path: str,
    device: Optional[str] = None,
    strict: bool = True
) -> Optional[Dict[str, NDArray]]:
    try:
        # Placeholder: a real implementation would read the file at `path`
        return {'weights': np.random.randn(100)}
    except FileNotFoundError:
        return None

def split_dataset(
    X: NDArray,
    y: NDArray,
    test_size: float = 0.2,
    random_state: Optional[int] = None
) -> Tuple[NDArray, NDArray, NDArray, NDArray]:
    if random_state is not None:
        np.random.seed(random_state)
    n = len(X)
    n_test = int(n * test_size)
    indices = np.random.permutation(n)
    test_idx = indices[:n_test]
    train_idx = indices[n_test:]
    return X[train_idx], X[test_idx], y[train_idx], y[test_idx]

8.2 Generics and Protocol

from typing import TypeVar, Generic, Protocol, runtime_checkable, List, Dict

from numpy.typing import NDArray

T = TypeVar('T')

class DataLoader(Generic[T]):
    def __init__(self, dataset: List[T], batch_size: int = 32):
        self.dataset = dataset
        self.batch_size = batch_size
        self._idx = 0

    def __iter__(self):
        self._idx = 0
        return self

    def __next__(self) -> List[T]:
        if self._idx >= len(self.dataset):
            raise StopIteration
        batch = self.dataset[self._idx:self._idx + self.batch_size]
        self._idx += self.batch_size
        return batch

@runtime_checkable
class Optimizer(Protocol):
    def step(self) -> None: ...
    def zero_grad(self) -> None: ...
    def state_dict(self) -> Dict: ...

@runtime_checkable
class Model(Protocol):
    def forward(self, x: 'NDArray') -> 'NDArray': ...
    def parameters(self) -> List['NDArray']: ...
    def train(self) -> None: ...
    def eval(self) -> None: ...
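A protocol describes an interface structurally: any class with the right methods satisfies it, without inheriting from it. With `@runtime_checkable`, `isinstance` can check that the required methods are present (it does not verify their signatures). The sketch below illustrates this; `SGD` and `NotAnOptimizer` are hypothetical classes introduced only for this example.

```python
from typing import Dict, Protocol, runtime_checkable


@runtime_checkable
class Optimizer(Protocol):
    def step(self) -> None: ...
    def zero_grad(self) -> None: ...
    def state_dict(self) -> Dict: ...


class SGD:
    """Does not inherit from Optimizer, but provides all three methods."""

    def __init__(self, lr: float = 0.01):
        self.lr = lr

    def step(self) -> None:
        pass

    def zero_grad(self) -> None:
        pass

    def state_dict(self) -> Dict:
        return {'lr': self.lr}


class NotAnOptimizer:
    """Missing zero_grad() and state_dict(), so it does not match."""

    def step(self) -> None:
        pass


is_optimizer = isinstance(SGD(), Optimizer)
is_not_optimizer = isinstance(NotAnOptimizer(), Optimizer)
print(is_optimizer, is_not_optimizer)
```

Static checkers such as mypy perform the stricter check, including argument and return types, so the runtime check is best treated as a sanity guard rather than a full validation.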

8.3 dataclass and TypedDict

from dataclasses import asdict, dataclass, field
from typing import TypedDict, List, Dict

class ModelConfig(TypedDict):
    hidden_size: int
    num_layers: int
    dropout: float
    activation: str

@dataclass
class ExperimentConfig:
    experiment_name: str
    model_type: str
    learning_rate: float = 1e-3
    batch_size: int = 32
    max_epochs: int = 100
    seed: int = 42
    tags: List[str] = field(default_factory=list)
    hyperparams: Dict[str, float] = field(default_factory=dict)

    def __post_init__(self):
        # Validate right after the generated __init__ has run
        if self.learning_rate <= 0:
            raise ValueError(f"Learning rate must be positive: {self.learning_rate}")
        if not self.experiment_name:
            raise ValueError("Experiment name is required")

    def to_dict(self) -> Dict:
        return asdict(self)

config = ExperimentConfig(
    experiment_name="bert-finetuning-v1",
    model_type="bert-base",
    learning_rate=2e-5,
    batch_size=16,
    tags=["nlp", "classification"]
)

print(f"Experiment: {config.experiment_name}")
print(f"Config: {config.to_dict()}")

8.4 Data Validation with pydantic

# Written against the pydantic v2 API (field_validator, model_validator,
# min_length, model_dump_json), matching the pydantic = "^2.0" pin in section 9.2.
import os
from typing import List

from pydantic import BaseModel, Field, field_validator, model_validator

class DataConfig(BaseModel):
    data_path: str
    batch_size: int = Field(ge=1, le=1024, default=32)
    shuffle: bool = True
    num_workers: int = Field(ge=0, le=16, default=4)

    @field_validator('data_path')
    @classmethod
    def path_must_exist(cls, v):
        if not os.path.exists(v) and v != "test_path":
            raise ValueError(f"Data path does not exist: {v}")
        return v

class ModelConfig(BaseModel):
    architecture: str
    hidden_sizes: List[int] = Field(min_length=1)
    dropout_rate: float = Field(ge=0.0, le=0.9, default=0.1)
    activation: str = "relu"

    @field_validator('activation')
    @classmethod
    def valid_activation(cls, v):
        valid = ['relu', 'gelu', 'silu', 'tanh', 'sigmoid']
        if v not in valid:
            raise ValueError(f"Activation must be one of {valid}")
        return v

class TrainingConfig(BaseModel):
    data: DataConfig
    model: ModelConfig
    learning_rate: float = Field(gt=0, le=1.0, default=1e-3)
    weight_decay: float = Field(ge=0, default=1e-4)
    max_epochs: int = Field(ge=1, le=10000, default=100)

    @model_validator(mode='after')
    def check_consistency(self):
        if self.model.dropout_rate > 0.5:
            print("Warning: High dropout rate may cause underfitting")
        return self

try:
    config = TrainingConfig(
        data=DataConfig(data_path="test_path", batch_size=64),
        model=ModelConfig(
            architecture="transformer",
            hidden_sizes=[512, 256, 128],
            dropout_rate=0.1
        ),
        learning_rate=2e-5
    )
    print(f"Validation passed: {config.model.architecture}")
    print(f"JSON: {config.model_dump_json(indent=2)}")
except Exception as e:
    print(f"Validation error: {e}")

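9. Package Management and Dependencies

9.1 Poetry

# Install Poetry
curl -sSL https://install.python-poetry.org | python3 -

poetry new my-ml-project
cd my-ml-project

# Add dependencies
poetry add numpy torch torchvision
poetry add --group dev pytest black ruff mypy

# Activate the virtual environment
# (on Poetry 2.x this command is provided by the poetry-plugin-shell plugin)
poetry shell

# Install all dependencies
poetry install

# Export to requirements.txt
# (on recent Poetry releases this requires the poetry-plugin-export plugin)
poetry export -f requirements.txt --output requirements.txt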

9.2 pyproject.toml

[tool.poetry]

name = "my-ml-project"

version = "0.1.0"

description = "AI/ML project"

authors = ["Your Name <email@example.com>"]

license = "MIT"

readme = "README.md"

packages = [{include = "my_ml_project"}]

[tool.poetry.dependencies]

python = "^3.10"

numpy = "^1.24"

torch = "^2.1"

torchvision = "^0.16"

transformers = "^4.35"

pydantic = "^2.0"

aiohttp = "^3.9"

[tool.poetry.group.dev.dependencies]

pytest = "^7.4"

pytest-cov = "^4.1"

black = "^23.0"

ruff = "^0.1"

mypy = "^1.6"

pre-commit = "^3.5"

[build-system]

requires = ["poetry-core"]

build-backend = "poetry.core.masonry.api"

[tool.black]

line-length = 88

target-version = ['py310', 'py311']

[tool.ruff]

select = ["E", "F", "I", "N", "W"]

ignore = ["E501"]

line-length = 88

[tool.mypy]

python_version = "3.10"

warn_return_any = true

warn_unused_configs = true

disallow_untyped_defs = true

[tool.pytest.ini_options]

testpaths = ["tests"]

addopts = "-v --cov=my_ml_project --cov-report=html"

9.3 uv (A Fast Package Manager)

# Install uv (Rust-based and very fast)
curl -LsSf https://astral.sh/uv/install.sh | sh

# Initialize a project
uv init my-project
cd my-project

# Add dependencies (reported to be 10-100x faster than pip)
uv add numpy torch transformers
uv add --dev pytest ruff mypy

# Sync the environment
uv sync

# Manage Python versions
uv python install 3.11
uv python pin 3.11

# Run a script
uv run python train.py

10. MLのためのクリーンコード

10.1 デザインパターン(ファクトリー、ストラテジー)

from abc import ABC, abstractmethod

from typing import Dict, Type

ストラテジーパターン — 損失関数

class LossFunction(ABC):

@abstractmethod

def __call__(self, predictions: np.ndarray, targets: np.ndarray) -> float:

pass

@abstractmethod

def gradient(self, predictions: np.ndarray, targets: np.ndarray) -> np.ndarray:

pass

class MSELoss(LossFunction):

def __call__(self, predictions, targets):

return float(np.mean((predictions - targets) ** 2))

def gradient(self, predictions, targets):

return 2 * (predictions - targets) / len(predictions)

class HuberLoss(LossFunction):

def __init__(self, delta: float = 1.0):

self.delta = delta

def __call__(self, predictions, targets):

diff = predictions - targets

is_small = np.abs(diff) <= self.delta

loss = np.where(is_small, 0.5 * diff**2,

self.delta * np.abs(diff) - 0.5 * self.delta**2)

return float(np.mean(loss))

def gradient(self, predictions, targets):

diff = predictions - targets

return np.where(np.abs(diff) <= self.delta, diff,

self.delta * np.sign(diff))

# Factory pattern: create models by their registered name
class ModelFactory:
    _registry: Dict[str, Type] = {}

    @classmethod
    def register(cls, name: str):
        """Class decorator that stores model_class under the given name."""
        def decorator(model_class):
            cls._registry[name] = model_class
            return model_class
        return decorator

    @classmethod
    def create(cls, name: str, **kwargs):
        if name not in cls._registry:
            available = list(cls._registry.keys())
            raise ValueError(f"Unknown model: {name}. Available: {available}")
        return cls._registry[name](**kwargs)

@ModelFactory.register("linear")
class LinearModel:
    def __init__(self, input_size: int, output_size: int):
        self.W = np.random.randn(input_size, output_size) * 0.01
        self.b = np.zeros(output_size)

    def forward(self, X: np.ndarray) -> np.ndarray:
        return X @ self.W + self.b

@ModelFactory.register("mlp")
class MLPModel:
    def __init__(self, layer_sizes: list):
        self.layers = []
        for i in range(len(layer_sizes) - 1):
            W = np.random.randn(layer_sizes[i], layer_sizes[i+1]) * 0.01
            b = np.zeros(layer_sizes[i+1])
            self.layers.append((W, b))

    def forward(self, X: np.ndarray) -> np.ndarray:
        out = X
        for i, (W, b) in enumerate(self.layers):
            out = out @ W + b
            # ReLU on hidden layers only; the output layer stays linear
            if i < len(self.layers) - 1:
                out = np.maximum(0, out)
        return out

model = ModelFactory.create("mlp", layer_sizes=[128, 256, 10])
X = np.random.randn(32, 128)
output = model.forward(X)
print(f"Model output shape: {output.shape}")  # Model output shape: (32, 10)
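The two patterns combine naturally: a registry picks a loss by name, and the training routine receives it as a strategy without knowing which one it is. The following is a minimal sketch under stated assumptions. The names `Loss`, `MeanSquaredError`, `MeanAbsoluteError`, `LOSSES`, and `fit_linear` are hypothetical and exist only for this illustration; they are not part of the classes above.

```python
from abc import ABC, abstractmethod

import numpy as np


class Loss(ABC):
    """Hypothetical minimal strategy interface for this sketch."""

    @abstractmethod
    def value(self, pred: np.ndarray, target: np.ndarray) -> float: ...

    @abstractmethod
    def grad(self, pred: np.ndarray, target: np.ndarray) -> np.ndarray: ...


class MeanSquaredError(Loss):
    def value(self, pred, target):
        return float(np.mean((pred - target) ** 2))

    def grad(self, pred, target):
        return 2.0 * (pred - target) / pred.size


class MeanAbsoluteError(Loss):
    def value(self, pred, target):
        return float(np.mean(np.abs(pred - target)))

    def grad(self, pred, target):
        return np.sign(pred - target) / pred.size


# Registry used like a tiny factory: look up a loss class by name.
LOSSES = {"mse": MeanSquaredError, "mae": MeanAbsoluteError}


def fit_linear(X, y, loss: Loss, lr: float = 0.1, steps: int = 200):
    """Gradient descent on y ~ X @ w; the loss is injected as a strategy."""
    w = np.zeros(X.shape[1])
    for _ in range(steps):
        pred = X @ w
        # Chain rule: dL/dw = X^T @ dL/dpred
        w -= lr * (X.T @ loss.grad(pred, y))
    return w


rng = np.random.default_rng(0)
X = rng.normal(size=(256, 3))
true_w = np.array([1.5, -2.0, 0.5])
y = X @ true_w

fitted = {}
for name, loss_cls in LOSSES.items():
    loss = loss_cls()
    fitted[name] = fit_linear(X, y, loss)
    print(name, np.round(fitted[name], 2), f"loss={loss.value(X @ fitted[name], y):.4f}")
```

`fit_linear` never branches on the loss type, so adding a new loss means adding one class and one registry entry. With a fixed step size, the sign-based MAE gradient tends to hover around the optimum rather than settle on it, which is why the two losses are not expected to give identical weights.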

10.2 Testing ML code with pytest

# tests/test_models.py
import numpy as np
import pytest
from numpy.testing import assert_array_almost_equal

# Assumption: MSELoss and LinearModel from section 10.1 are importable here.
# The module path below is hypothetical; adjust it to your project layout.
# from my_ml_project.models import LinearModel, MSELoss

class TestLossFunctions:
    def test_mse_zero_loss(self):
        """A perfect prediction should produce zero loss."""
        x = np.array([1.0, 2.0, 3.0])
        loss = MSELoss()(x, x)
        assert abs(loss) < 1e-10

    def test_mse_positive(self):
        """MSE must always be non-negative."""
        predictions = np.random.randn(100)
        targets = np.random.randn(100)
        loss = MSELoss()(predictions, targets)
        assert loss >= 0

    @pytest.mark.parametrize("batch_size", [1, 16, 32, 128])
    def test_batch_sizes(self, batch_size):
        """The model must handle a range of batch sizes."""
        X = np.random.randn(batch_size, 64)
        model = LinearModel(64, 10)
        output = model.forward(X)
        assert output.shape == (batch_size, 10)

    def test_numerical_gradient(self):
        """Verify that the analytical gradient matches the numerical gradient."""
        loss_fn = MSELoss()
        pred = np.array([0.5, 0.3, 0.2])
        target = np.array([0.6, 0.2, 0.1])

        analytical_grad = loss_fn.gradient(pred, target)

        # Central differences: perturb one coordinate at a time by +/- eps
        eps = 1e-5
        numerical_grad = np.zeros_like(pred)
        for i in range(len(pred)):
            pred_plus = pred.copy()
            pred_plus[i] += eps
            pred_minus = pred.copy()
            pred_minus[i] -= eps
            numerical_grad[i] = (loss_fn(pred_plus, target) - loss_fn(pred_minus, target)) / (2 * eps)

        assert_array_almost_equal(analytical_grad, numerical_grad, decimal=5)

To run the tests:

pytest tests/ -v --cov=src --cov-report=html -x
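The central-difference check in `test_numerical_gradient` generalizes to any scalar loss. The sketch below factors it into a reusable helper and applies it to a Huber loss whose gradient is scaled by the element count, so that it matches a mean-reduced loss. The function names `numerical_gradient`, `huber`, and `huber_grad` are hypothetical and introduced only for this example; it uses plain assertions so it runs with or without pytest.

```python
import numpy as np


def numerical_gradient(fn, x: np.ndarray, eps: float = 1e-5) -> np.ndarray:
    """Central-difference estimate of d fn(x) / d x for a scalar-valued fn."""
    grad = np.zeros_like(x, dtype=float)
    for i in range(x.size):
        step = np.zeros_like(x, dtype=float)
        step.flat[i] = eps
        grad.flat[i] = (fn(x + step) - fn(x - step)) / (2 * eps)
    return grad


def huber(pred, target, delta=1.0):
    """Mean Huber loss: quadratic for small errors, linear for large ones."""
    diff = pred - target
    quad = 0.5 * diff**2
    lin = delta * np.abs(diff) - 0.5 * delta**2
    return float(np.mean(np.where(np.abs(diff) <= delta, quad, lin)))


def huber_grad(pred, target, delta=1.0):
    """Analytical gradient of huber() with respect to pred."""
    diff = pred - target
    per_elem = np.where(np.abs(diff) <= delta, diff, delta * np.sign(diff))
    return per_elem / diff.size  # matches the mean in huber()


rng = np.random.default_rng(42)  # fixed seed keeps the check deterministic
pred = rng.normal(size=8) * 3    # wide spread so both Huber regions are likely hit
target = rng.normal(size=8)

analytic = huber_grad(pred, target)
numeric = numerical_gradient(lambda p: huber(p, target), pred)
np.testing.assert_allclose(analytic, numeric, atol=1e-6)
```

Seeding the generator is a deliberate choice: a gradient test that fails only for some random draws is hard to reproduce in CI.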

10.3 Logging best practices

import logging
import sys
from pathlib import Path
from typing import Optional

def setup_logger(
    name: str,
    level: int = logging.INFO,
    log_file: Optional[str] = None,
    format_str: Optional[str] = None
) -> logging.Logger:
    if format_str is None:
        format_str = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    formatter = logging.Formatter(format_str, datefmt='%Y-%m-%d %H:%M:%S')

    logger = logging.getLogger(name)
    logger.setLevel(level)

    # Attach handlers only once: getLogger() returns the same object for a
    # given name, so repeated calls would otherwise duplicate every log line.
    if not logger.handlers:
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setFormatter(formatter)
        logger.addHandler(console_handler)

        if log_file:
            # Create the log directory if it does not exist yet
            Path(log_file).parent.mkdir(parents=True, exist_ok=True)
            file_handler = logging.FileHandler(log_file)
            file_handler.setFormatter(formatter)
            logger.addHandler(file_handler)

    return logger

class TrainingLogger:
    def __init__(self, experiment_name: str, log_dir: str = "logs"):
        self.logger = setup_logger(
            f"training.{experiment_name}",
            log_file=f"{log_dir}/{experiment_name}.log"
        )
        self.step = 0

    def log_metrics(self, metrics: dict, phase: str = "train"):
        metrics_str = ", ".join(f"{k}={v:.4f}" for k, v in metrics.items())
        self.logger.info(f"[{phase}] step={self.step} {metrics_str}")
        self.step += 1

    def log_epoch(self, epoch: int, train_loss: float, val_loss: float):
        self.logger.info(
            f"Epoch {epoch}: train_loss={train_loss:.4f}, val_loss={val_loss:.4f}"
        )

    def log_hyperparams(self, params: dict):
        self.logger.info(f"Hyperparameters: {params}")

logger = TrainingLogger("bert-finetuning-v1")
logger.log_hyperparams({"lr": 2e-5, "batch_size": 32, "epochs": 10})

for epoch in range(3):
    for step in range(10):
        logger.log_metrics(
            {"loss": 2.0 - epoch * 0.3 - step * 0.01, "acc": 0.5 + epoch * 0.1},
            phase="train"
        )
    logger.log_epoch(epoch, 1.5 - epoch * 0.3, 1.8 - epoch * 0.25)
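The `if not logger.handlers` check in `setup_logger` matters because `logging.getLogger(name)` returns the same logger object every time it is called with the same name. The sketch below shows this behavior in isolation. The helper `get_logger` and the logger name `demo.training` are hypothetical, and an in-memory buffer stands in for the console so the output can be inspected.

```python
import io
import logging


def get_logger(name: str, stream) -> logging.Logger:
    """Return a logger with exactly one stream handler, however often called."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.propagate = False  # avoid a second copy via the root logger
    if not logger.handlers:   # getLogger() returns the same object per name
        handler = logging.StreamHandler(stream)
        handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))
        logger.addHandler(handler)
    return logger


buffer = io.StringIO()
first = get_logger("demo.training", buffer)
second = get_logger("demo.training", buffer)  # e.g. a notebook cell run twice

# %-style arguments are only formatted if the record is actually emitted.
second.info("epoch=%d loss=%.4f", 1, 0.1234)

lines = buffer.getvalue().splitlines()
print(lines)  # ['INFO epoch=1 loss=0.1234']
```

Without the guard, the second call would attach another handler and the same message would appear twice, a common surprise when a training script is re-run in an interactive session.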

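Summary

When you apply the techniques in this guide to a real AI/ML project, work through them in the following order of priority:

1. **Profile first**: Always locate the bottleneck with cProfile and line_profiler before optimizing anything.

2. **Vectorize with NumPy**: In many cases, replacing Python loops with vectorized array operations is enough on its own to yield a 10x to 100x speedup.

3. **Apply Numba to hot loops**: Use Numba JIT when a pure Python loop genuinely cannot be avoided.

4. **Use multiprocessing for CPU-bound work**: Use ProcessPoolExecutor for independent tasks such as data preprocessing and cross-validation.

5. **Use asyncio for I/O-bound pipelines**: API-heavy or file-I/O-heavy workflows can see throughput gains of 10x or more.

6. **Type hints and tests**: Combine mypy, pytest, and pydantic to keep the ML codebase robust as it scales.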
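The first two priorities can be tried together in a few lines: confirm that two implementations agree, then measure them. The sketch below compares a pure-Python z-score normalization with its NumPy equivalent using `timeit`. The function names and the array size are assumptions chosen for illustration, and the measured speedup will vary by machine, so no particular figure is claimed.

```python
import timeit

import numpy as np


def normalize_loop(values):
    """Pure-Python z-score normalization."""
    n = len(values)
    mean = sum(values) / n
    var = sum((v - mean) ** 2 for v in values) / n
    std = var ** 0.5
    return [(v - mean) / (std + 1e-8) for v in values]


def normalize_vectorized(values: np.ndarray) -> np.ndarray:
    """Same computation expressed as NumPy array operations."""
    return (values - values.mean()) / (values.std() + 1e-8)


rng = np.random.default_rng(0)
data = rng.normal(size=20_000)
data_list = data.tolist()

# Check correctness first; a speedup means nothing if the results differ.
results_match = np.allclose(normalize_loop(data_list), normalize_vectorized(data))

t_loop = timeit.timeit(lambda: normalize_loop(data_list), number=5)
t_vec = timeit.timeit(lambda: normalize_vectorized(data), number=5)
print(f"match={results_match}  loop={t_loop:.4f}s  vectorized={t_vec:.4f}s  "
      f"speedup={t_loop / t_vec:.0f}x")
```

For a function-level or line-level breakdown rather than a single wall-clock number, the same two functions can be run under cProfile or line_profiler.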

References

- [Numba documentation](https://numba.readthedocs.io/)

- [Cython documentation](https://cython.readthedocs.io/)

- [Python asyncio documentation](https://docs.python.org/3/library/asyncio.html)

- [NumPy user guide (quickstart)](https://numpy.org/doc/stable/user/quickstart.html)

- [Poetry documentation](https://python-poetry.org/docs/)

- [Pydantic v2 documentation](https://docs.pydantic.dev/)
