AI/ML을 위한 Python 고급 기법: 성능 최적화, 병렬 처리, 메모리 관리
개요
Python은 AI/ML 생태계의 중심 언어입니다. 그러나 기본 Python 코드는 성능 한계가 있으며, 대규모 데이터 처리와 모델 학습 파이프라인에서 병목이 발생합니다. 이 가이드에서는 프로덕션 수준의 AI/ML 시스템을 구축하기 위한 Python 고급 기법을 다룹니다.
순수 Python for 루프보다 수백 배 빠른 코드를 작성하고, GPU를 활용하며, 메모리를 효율적으로 관리하는 방법을 실전 예제와 함께 학습합니다.
1. 파이썬 성능 프로파일링
코드를 최적화하기 전에 반드시 프로파일링을 먼저 수행해야 합니다. "측정하지 않으면 최적화할 수 없다"는 원칙을 항상 기억하세요.
1.1 cProfile과 pstats
Python 표준 라이브러리에 포함된 cProfile은 함수 단위 프로파일링의 기본 도구입니다.
import cProfile
import pstats
import io
import numpy as np
def slow_matrix_multiply(a, b):
    """Triple-loop matrix product over lists-of-lists (intentionally slow baseline)."""
    rows_a = len(a)
    cols_a = len(a[0])
    cols_b = len(b[0])
    # sum() accumulates in the same ascending-k order as the manual loop,
    # so float results are bit-identical to the original implementation.
    return [
        [sum(a[i][k] * b[k][j] for k in range(cols_a)) for j in range(cols_b)]
        for i in range(rows_a)
    ]
def profile_function():
    """Workload to profile: multiply two 100x100 Python-list matrices."""
    size = 100
    mat_a = [[float(r + c) for c in range(size)] for r in range(size)]
    mat_b = [[float(r * c + 1) for c in range(size)] for r in range(size)]
    slow_matrix_multiply(mat_a, mat_b)
# Run the profiler around the workload
pr = cProfile.Profile()
pr.enable()
profile_function()
pr.disable()
# Analyse the collected stats
stream = io.StringIO()
ps = pstats.Stats(pr, stream=stream)
ps.sort_stats('cumulative')
ps.print_stats(20)
print(stream.getvalue())
커맨드라인에서도 바로 실행할 수 있습니다.
python -m cProfile -s cumulative your_script.py
python -m cProfile -o output.prof your_script.py
저장된 .prof 파일을 분석할 때는 다음과 같이 합니다.
import pstats
# Load a previously saved .prof file for offline analysis
p = pstats.Stats('output.prof')
p.sort_stats('cumulative')
p.print_stats(10)  # top 10 functions
p.print_callers('slow_function')  # who calls a given function
p.print_callees('main')  # what a given function calls
1.2 line_profiler (라인별 분석)
함수 단위가 아닌 라인 단위 성능을 측정합니다.
pip install line_profiler
# line_profiler 사용 예시
from line_profiler import LineProfiler
import numpy as np
def process_batch(data):
    """Normalize, scale and clip each item, then stack them into one array."""
    processed = [
        np.clip(((item - item.mean()) / (item.std() + 1e-8)) * 2.0, -5, 5)
        for item in data
    ]
    return np.stack(processed)
# Set up the line-by-line profiler (third-party `line_profiler` package)
lp = LineProfiler()
lp_wrapper = lp(process_batch)
# Run against sample data
data = [np.random.randn(1000) for _ in range(1000)]
lp_wrapper(data)
# Print per-line timings
lp.print_stats()
Jupyter Notebook에서는 매직 커맨드를 사용할 수 있습니다.
# Jupyter에서 line_profiler 사용
%load_ext line_profiler
%lprun -f process_batch process_batch(data)
1.3 memory_profiler
메모리 사용량을 라인별로 추적합니다.
pip install memory_profiler
from memory_profiler import profile
import numpy as np
@profile  # line-by-line memory tracking (from memory_profiler)
def memory_intensive_function(n=1_000_000):
    """Allocate, convert and filter data to illustrate memory tracking."""
    list_data = list(range(n))  # build a Python list (boxed ints)
    arr = np.array(list_data)  # convert to a compact NumPy array
    del list_data  # drop the list reference so it can be freed
    squared = arr ** 2  # allocates a new array
    filtered = squared[squared > 100]  # boolean-mask filtering (another copy)
    return filtered.sum()
result = memory_intensive_function()
print(f"결과: {result}")
python -m memory_profiler your_script.py
mprof run your_script.py
mprof plot # 시각화
1.4 py-spy (샘플링 프로파일러)
실행 중인 Python 프로세스를 프로파일링하는 강력한 도구입니다.
pip install py-spy
# 실행 중인 프로세스 프로파일링
py-spy top --pid 12345
# 플레임 그래프 생성
py-spy record -o profile.svg --pid 12345
# 스크립트 직접 프로파일링
py-spy record -o profile.svg -- python your_script.py
1.5 SnakeViz 시각화
pip install snakeviz
python -m cProfile -o output.prof your_script.py
snakeviz output.prof
2. NumPy 벡터화 심화
NumPy 벡터화는 Python AI/ML 성능 최적화의 핵심입니다.
2.1 for loop vs 벡터화 성능 비교
import numpy as np
import time
# Timing helper
def benchmark(func, *args, runs=5):
    """Time func(*args) over `runs` runs; return (mean, std, last result)."""
    elapsed = []
    outcome = None
    for _ in range(runs):
        t0 = time.perf_counter()
        outcome = func(*args)
        elapsed.append(time.perf_counter() - t0)
    return np.mean(elapsed), np.std(elapsed), outcome
# 1. Plain Python loop version (baseline)
def relu_loop(x):
    """ReLU element-by-element in pure Python."""
    return [max(0, v) for v in x]
# 2. Vectorized NumPy version
def relu_numpy(x):
    """ReLU as a single vectorized ufunc call."""
    return np.maximum(0, x)
# 3. Clip-based NumPy version
def relu_clip(x):
    """ReLU expressed as clipping at zero from below."""
    return np.clip(x, 0, None)
# Comparison of the three ReLU implementations
n = 1_000_000
data_list = [np.random.randn() for _ in range(n)]
data_np = np.array(data_list)
t_loop, _, _ = benchmark(relu_loop, data_list)
t_numpy, _, _ = benchmark(relu_numpy, data_np)
t_clip, _, _ = benchmark(relu_clip, data_np)
print(f"Python loop: {t_loop:.4f}s")
print(f"NumPy maximum: {t_numpy:.4f}s ({t_loop/t_numpy:.0f}x faster)")
print(f"NumPy clip: {t_clip:.4f}s ({t_loop/t_clip:.0f}x faster)")
2.2 Universal Functions (ufuncs)
import numpy as np
# Build a custom ufunc from a scalar function
def custom_activation_scalar(x, alpha=0.1):
    """ELU activation for one scalar value."""
    return x if x >= 0 else alpha * (np.exp(x) - 1)
# np.vectorize wraps the scalar function (convenient, but still a Python loop)
elu_ufunc = np.vectorize(custom_activation_scalar)
# Fully vectorized formulation (much faster)
def elu_numpy(x, alpha=0.1):
    """ELU activation over a whole array via np.where."""
    return np.where(x >= 0, x, alpha * (np.exp(x) - 1))
# frompyfunc also produces a ufunc (note: returns object dtype)
elu_frompyfunc = np.frompyfunc(
    lambda x: x if x >= 0 else 0.1 * (np.exp(x) - 1),
    1, 1
)
# Performance test: np.vectorize vs. a truly vectorized expression
x = np.random.randn(1_000_000)
t_vectorize, _, _ = benchmark(elu_ufunc, x)
t_numpy, _, _ = benchmark(elu_numpy, x)
print(f"np.vectorize: {t_vectorize:.4f}s")
print(f"np.where: {t_numpy:.4f}s")
2.3 np.einsum (아인슈타인 합산)
import numpy as np
# Einstein-summation notation examples
batch_size, seq_len, d_model = 32, 128, 512
heads, d_head = 8, 64
# Matrix multiplication
A = np.random.randn(batch_size, seq_len, d_model)
W = np.random.randn(d_model, d_model)
# Standard form
result_matmul = A @ W  # (32, 128, 512)
# Same operation via einsum
result_einsum = np.einsum('bsi,ij->bsj', A, W)
# Attention scores (batched matrix product)
Q = np.random.randn(batch_size, heads, seq_len, d_head)
K = np.random.randn(batch_size, heads, seq_len, d_head)
# Standard form
scores_manual = Q @ K.transpose(0, 1, 3, 2)  # (32, 8, 128, 128)
# Attention scores via einsum
scores_einsum = np.einsum('bhid,bhjd->bhij', Q, K)
# Batched outer product
a = np.random.randn(batch_size, d_model)
b = np.random.randn(batch_size, d_model)
outer = np.einsum('bi,bj->bij', a, b)  # (32, 512, 512)
# Trace (sum along the diagonal, per batch element)
M = np.random.randn(batch_size, d_model, d_model)
trace = np.einsum('bii->b', M)  # (32,)
print(f"어텐션 스코어 shape: {scores_einsum.shape}")
print(f"외적 shape: {outer.shape}")
print(f"트레이스 shape: {trace.shape}")
# optimize= selects the best contraction path
result = np.einsum('bhid,bhjd->bhij', Q, K, optimize='optimal')
2.4 스트라이드 트릭 (stride tricks)
import numpy as np
from numpy.lib.stride_tricks import as_strided, sliding_window_view
# Sliding window via raw stride manipulation (legacy approach)
def sliding_window_strided(arr, window_size):
    """Overlapping windows over a 1-D array using as_strided (zero-copy view)."""
    win_count = arr.shape[0] - window_size + 1
    step = arr.strides[0]
    return as_strided(arr, shape=(win_count, window_size), strides=(step, step))
# Modern API (NumPy 1.20+)
arr = np.arange(20, dtype=float)
window_size = 5
# Preferred: sliding_window_view (safe, zero-copy)
windows = sliding_window_view(arr, window_size)
print(f"슬라이딩 윈도우 shape: {windows.shape}")  # (16, 5)
# Moving average over each window
moving_avg = windows.mean(axis=-1)
print(f"이동 평균: {moving_avg[:5]}")
# 2-D sliding windows (convolution preparation)
image = np.random.randn(64, 64)
kernel_size = (3, 3)
windows_2d = sliding_window_view(image, kernel_size)
print(f"2D 윈도우 shape: {windows_2d.shape}")  # (62, 62, 3, 3)
# Batch processing with reshape
batch = np.random.randn(1000, 10)  # (samples, features)
# Non-overlapping chunks; any remainder rows are dropped
chunk_size = 50
n_chunks = len(batch) // chunk_size
chunks = batch[:n_chunks * chunk_size].reshape(n_chunks, chunk_size, -1)
print(f"청크 shape: {chunks.shape}")  # (20, 50, 10)
3. Numba JIT 컴파일
Numba는 Python 함수를 기계어 코드로 컴파일하여 C/C++ 수준의 성능을 제공합니다.
3.1 @jit, @njit 데코레이터
from numba import jit, njit, prange
import numpy as np
import time
# @jit with nopython=False: allows object-mode fallback (Python objects permitted)
@jit(nopython=False)
def sum_with_jit(arr):
    total = 0.0
    for i in range(len(arr)):
        total += arr[i]
    return total
# @njit: pure nopython mode (faster; the recommended default)
@njit
def sum_with_njit(arr):
    total = 0.0
    for i in range(len(arr)):
        total += arr[i]
    return total
# A more involved example
@njit
def compute_distances(points):
    """Compute the pairwise Euclidean distance matrix for a point set."""
    n = len(points)
    distances = np.zeros((n, n))
    for i in range(n):
        for j in range(i + 1, n):
            diff = points[i] - points[j]
            dist = np.sqrt(np.sum(diff ** 2))
            distances[i, j] = dist
            # Mirror into the lower triangle (the matrix is symmetric)
            distances[j, i] = dist
    return distances
# Performance comparison
arr = np.random.randn(1_000_000)
# The first call includes JIT compilation time, so warm up first
_ = sum_with_njit(arr)  # warm-up
start = time.perf_counter()
result_numpy = np.sum(arr)
print(f"NumPy sum: {time.perf_counter() - start:.4f}s")
start = time.perf_counter()
result_njit = sum_with_njit(arr)
print(f"Numba njit: {time.perf_counter() - start:.4f}s")
# Point-cloud distance computation
points = np.random.randn(500, 3).astype(np.float64)
_ = compute_distances(points)  # warm-up (triggers compilation)
start = time.perf_counter()
dist_matrix = compute_distances(points)
print(f"Numba 거리 행렬 ({points.shape[0]}x{points.shape[0]}): {time.perf_counter() - start:.4f}s")
3.2 타입 추론과 명시적 타입 선언
from numba import njit, float64, int64
from numba import types
import numpy as np
# Explicit type signature: compiles eagerly (no first-call delay)
@njit(float64[:](float64[:], float64))
def scale_array_typed(arr, factor):
    return arr * factor
# Multiple type signatures for the same function
from numba import float32, float64
@njit([
    float32[:](float32[:], float32),
    float64[:](float64[:], float64)
])
def scale_multi_type(arr, factor):
    return arr * factor
# Inferred types (lazy compilation on first call)
@njit
def softmax_numba(x):
    """Numerically stable softmax accelerated with Numba."""
    max_val = np.max(x)
    exp_x = np.exp(x - max_val)
    return exp_x / np.sum(exp_x)
# Batched softmax
@njit
def batch_softmax(X):
    """Row-wise softmax over a 2-D array."""
    result = np.empty_like(X)
    for i in range(X.shape[0]):
        result[i] = softmax_numba(X[i])
    return result
# Quick timing check
X = np.random.randn(1000, 512).astype(np.float64)
_ = batch_softmax(X)  # warm-up
start = time.perf_counter()
result = batch_softmax(X)
# NOTE(review): relies on `time` being imported earlier in the file
print(f"Numba 배치 소프트맥스: {time.perf_counter() - start:.4f}s")
3.3 병렬 처리 (@prange)
from numba import njit, prange
import numpy as np
@njit(parallel=True)
def parallel_normalize(X):
    """Row-wise normalization (rows processed in parallel)."""
    result = np.empty_like(X)
    for i in prange(X.shape[0]):  # prange: Numba parallelizes this loop
        row = X[i]
        mean = row.mean()
        std = row.std()
        result[i] = (row - mean) / (std + 1e-8)
    return result
@njit(parallel=True)
def parallel_matrix_multiply(A, B):
    """Naive matrix multiply with the outer loop parallelized."""
    m, k = A.shape
    k2, n = B.shape
    C = np.zeros((m, n))
    for i in prange(m):
        for j in range(n):
            for l in range(k):
                C[i, j] += A[i, l] * B[l, j]
    return C
# Parallel batched processing
@njit(parallel=True)
def parallel_batch_process(data, weights):
    """Weighted sum per sample (parallel over samples)."""
    n_samples, n_features = data.shape
    results = np.zeros(n_samples)
    for i in prange(n_samples):
        dot = 0.0
        for j in range(n_features):
            dot += data[i, j] * weights[j]
        results[i] = dot
    return results
# Timing test
X = np.random.randn(10000, 512).astype(np.float64)
# Warm-up (compilation happens on the first call)
_ = parallel_normalize(X)
start = time.perf_counter()
result_parallel = parallel_normalize(X)
t_parallel = time.perf_counter() - start
# Baseline: sequential version of the same computation
@njit
def sequential_normalize(X):
    result = np.empty_like(X)
    for i in range(X.shape[0]):
        row = X[i]
        mean = row.mean()
        std = row.std()
        result[i] = (row - mean) / (std + 1e-8)
    return result
_ = sequential_normalize(X)
start = time.perf_counter()
result_seq = sequential_normalize(X)
t_seq = time.perf_counter() - start
print(f"순차 처리: {t_seq:.4f}s")
print(f"병렬 처리: {t_parallel:.4f}s ({t_seq/t_parallel:.1f}x 빠름)")
3.4 Numba 캐싱
from numba import njit
import numpy as np
# cache=True persists the compiled machine code to disk
@njit(cache=True)
def cached_computation(x, y):
    """Cached JIT function - loads instantly on the next interpreter run."""
    result = np.empty(len(x))
    for i in range(len(x)):
        result[i] = x[i] ** 2 + y[i] ** 2
    return result
# Inspect the compiled function
arr1 = np.random.randn(1000)
arr2 = np.random.randn(1000)
result = cached_computation(arr1, arr2)
# NOTE(review): `.stats` reports compilation statistics, not the on-disk cache path - verify
print(f"캐시 파일 위치: {cached_computation.stats}")
4. Cython
Cython은 Python 코드를 C 코드로 변환하여 컴파일하는 도구입니다.
4.1 .pyx 파일 작성
# setup.py - Cython build configuration
from setuptools import setup
from Cython.Build import cythonize
import numpy as np
setup(
    ext_modules=cythonize(
        "fast_ops.pyx",
        compiler_directives={
            'language_level': '3',
            'boundscheck': False,  # skip index bounds checks for speed
            'wraparound': False,   # disable negative-index wraparound
            'nonecheck': False,    # assume arguments are never None
        }
    ),
    include_dirs=[np.get_include()]  # NumPy C headers for `cimport numpy`
)
fast_ops.pyx 파일 예시입니다.
# fast_ops.pyx
# cython: language_level=3
# cython: boundscheck=False
# cython: wraparound=False
import numpy as np
cimport numpy as cnp
from libc.math cimport exp, sqrt
# C type declarations give near-C performance
def relu_cython(cnp.ndarray[cnp.double_t, ndim=1] x):
    # Element-wise ReLU over a 1-D double array
    cdef int n = len(x)
    cdef cnp.ndarray[cnp.double_t, ndim=1] result = np.empty(n)
    cdef int i
    cdef double val
    for i in range(n):
        val = x[i]
        result[i] = val if val > 0 else 0.0
    return result
# Matrix multiplication
def matrix_dot_cython(
    cnp.ndarray[cnp.double_t, ndim=2] A,
    cnp.ndarray[cnp.double_t, ndim=2] B
):
    # Naive triple-loop product with typed C locals
    cdef int m = A.shape[0]
    cdef int k = A.shape[1]
    cdef int n = B.shape[1]
    cdef cnp.ndarray[cnp.double_t, ndim=2] C = np.zeros((m, n))
    cdef int i, j, l
    cdef double sum_val
    for i in range(m):
        for j in range(n):
            sum_val = 0.0
            for l in range(k):
                sum_val += A[i, l] * B[l, j]
            C[i, j] = sum_val
    return C
# Softmax using the C math library
def softmax_cython(cnp.ndarray[cnp.double_t, ndim=1] x):
    cdef int n = len(x)
    cdef cnp.ndarray[cnp.double_t, ndim=1] result = np.empty(n)
    cdef double max_val = x[0]
    cdef double sum_exp = 0.0
    cdef int i
    # Find the maximum first for numerical stability
    for i in range(n):
        if x[i] > max_val:
            max_val = x[i]
    for i in range(n):
        result[i] = exp(x[i] - max_val)
        sum_exp += result[i]
    for i in range(n):
        result[i] /= sum_exp
    return result
빌드 및 사용합니다.
python setup.py build_ext --inplace
import numpy as np
import fast_ops  # the compiled Cython extension module
x = np.random.randn(1_000_000)
result = fast_ops.relu_cython(x)
4.2 IPython Cython 매직
# Jupyter에서 Cython 사용
%load_ext Cython
%%cython -a
# -a: 어노테이션 모드 (노란색 = Python, 흰색 = C)
import numpy as np
cimport numpy as cnp
def fast_l2_norm(cnp.ndarray[cnp.double_t, ndim=1] v):
    # Euclidean (L2) norm with typed C locals for a tight loop
    cdef int n = len(v)
    cdef double total = 0.0
    cdef int i
    for i in range(n):
        total += v[i] * v[i]
    return total ** 0.5
5. Python 멀티프로세싱
5.1 GIL (Global Interpreter Lock) 이해
GIL은 CPython에서 한 번에 하나의 스레드만 Python 바이트코드를 실행하도록 합니다. CPU 바운드 작업에서는 멀티스레딩이 효과 없으므로 멀티프로세싱을 사용해야 합니다.
import threading
import multiprocessing
import time
import numpy as np
def cpu_bound_task(n):
    """CPU-intensive work: the sum of i**2 for i in [0, n)."""
    return sum(i ** 2 for i in range(n))
# Threading version (no real parallelism for CPU work because of the GIL)
def run_threaded(n_tasks=4, n_per_task=1_000_000):
    """Run the CPU task on n_tasks threads; return elapsed wall time."""
    workers = []
    start = time.perf_counter()
    for _ in range(n_tasks):
        worker = threading.Thread(target=cpu_bound_task, args=(n_per_task,))
        workers.append(worker)
        worker.start()
    for worker in workers:
        worker.join()
    return time.perf_counter() - start
# Multiprocessing (true parallelism across cores)
def run_multiprocessing(n_tasks=4, n_per_task=1_000_000):
    start = time.perf_counter()
    with multiprocessing.Pool(processes=n_tasks) as pool:
        results = pool.map(cpu_bound_task, [n_per_task] * n_tasks)
    return time.perf_counter() - start
# NOTE(review): spawning processes at import time needs an
# `if __name__ == "__main__":` guard on spawn-based platforms (Windows/macOS)
t_threaded = run_threaded()
t_mp = run_multiprocessing()
print(f"스레딩: {t_threaded:.2f}s")
print(f"멀티프로세싱: {t_mp:.2f}s ({t_threaded/t_mp:.1f}x 빠름)")
5.2 multiprocessing.Pool
import multiprocessing
import numpy as np
from functools import partial
def preprocess_sample(args):
    """Normalize one sample and add small Gaussian noise as augmentation."""
    idx, data, mean, std = args
    z_scored = (data - mean) / (std + 1e-8)
    noisy = z_scored + np.random.randn(*z_scored.shape) * 0.01
    return idx, noisy
def preprocess_dataset_parallel(dataset, n_workers=None):
    """Preprocess every sample of `dataset` in a process pool."""
    if n_workers is None:
        n_workers = multiprocessing.cpu_count()
    mean = dataset.mean(axis=0)
    std = dataset.std(axis=0)
    # Package per-sample arguments (index kept so order can be restored)
    args = [(i, dataset[i], mean, std) for i in range(len(dataset))]
    with multiprocessing.Pool(processes=n_workers) as pool:
        # map preserves input order in its result list
        results = pool.map(preprocess_sample, args, chunksize=100)
    # Sort by index and re-stack into one array
    results.sort(key=lambda x: x[0])
    return np.stack([r[1] for r in results])
# imap_unordered로 스트리밍 처리
def _load_and_process(path):
    """Load one .npy file and return its (mean, std) summary statistics."""
    data = np.load(path)
    return data.mean(), data.std()

def process_large_dataset(data_path_list, n_workers=4):
    """Process many files in parallel, streaming results as they complete.

    Bug fix: the worker used to be a function nested inside this one.
    Nested functions cannot be pickled, so multiprocessing.Pool (spawn and
    forkserver start methods, and pickling in general) would fail with
    "Can't pickle local object". The worker now lives at module level.
    """
    results = []
    with multiprocessing.Pool(processes=n_workers) as pool:
        # imap_unordered yields results as they finish, keeping memory bounded
        for result in pool.imap_unordered(_load_and_process, data_path_list):
            results.append(result)
    return results
# Smoke test
# NOTE(review): runs at import time; add an `if __name__ == "__main__":`
# guard for spawn-based platforms
dataset = np.random.randn(10000, 128)
processed = preprocess_dataset_parallel(dataset, n_workers=4)
print(f"처리된 데이터셋 shape: {processed.shape}")
5.3 ProcessPoolExecutor
from concurrent.futures import ProcessPoolExecutor, as_completed
import numpy as np
def train_fold(fold_data):
    """Train and evaluate one cross-validation fold; returns (idx, accuracy, model)."""
    fold_idx, X_train, y_train, X_val, y_val = fold_data
    # Simple logistic regression as the example model
    from sklearn.linear_model import LogisticRegression
    from sklearn.metrics import accuracy_score
    model = LogisticRegression(max_iter=1000)
    model.fit(X_train, y_train)
    val_pred = model.predict(X_val)
    accuracy = accuracy_score(y_val, val_pred)
    return fold_idx, accuracy, model
def parallel_cross_validation(X, y, n_folds=5):
    """Run K-fold cross-validation with one worker process per fold."""
    from sklearn.model_selection import KFold
    kf = KFold(n_splits=n_folds, shuffle=True, random_state=42)
    fold_data_list = []
    for fold_idx, (train_idx, val_idx) in enumerate(kf.split(X)):
        fold_data_list.append((
            fold_idx,
            X[train_idx], y[train_idx],
            X[val_idx], y[val_idx]
        ))
    results = {}
    with ProcessPoolExecutor(max_workers=n_folds) as executor:
        # Submit every fold at once
        future_to_fold = {
            executor.submit(train_fold, fd): fd[0]
            for fd in fold_data_list
        }
        # Collect results as folds finish (completion order, not fold order)
        for future in as_completed(future_to_fold):
            fold_idx, accuracy, model = future.result()
            results[fold_idx] = {
                'accuracy': accuracy,
                'model': model
            }
            print(f"폴드 {fold_idx}: 정확도 = {accuracy:.4f}")
    avg_accuracy = np.mean([r['accuracy'] for r in results.values()])
    print(f"\n평균 정확도: {avg_accuracy:.4f}")
    return results
# Generate test data
# NOTE(review): process spawning at import time needs an
# `if __name__ == "__main__":` guard on spawn-based platforms
from sklearn.datasets import make_classification
X, y = make_classification(n_samples=5000, n_features=20, random_state=42)
results = parallel_cross_validation(X, y, n_folds=5)
5.4 공유 메모리 (SharedMemory)
from multiprocessing import shared_memory
import numpy as np
import multiprocessing
def worker_shared_memory(shm_name, shape, dtype, start_idx, end_idx):
    """Square one slice of the shared array in place."""
    # Attach to the existing shared-memory segment by name
    existing_shm = shared_memory.SharedMemory(name=shm_name)
    arr = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)
    # Process only this worker's slice
    arr[start_idx:end_idx] = arr[start_idx:end_idx] ** 2
    existing_shm.close()
def process_with_shared_memory(data):
    """Square `data` in parallel via a shared-memory array (no per-worker copies)."""
    # Create the shared segment and copy the input in once
    shm = shared_memory.SharedMemory(create=True, size=data.nbytes)
    shared_array = np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)
    shared_array[:] = data[:]
    n_workers = multiprocessing.cpu_count()
    chunk_size = len(data) // n_workers
    processes = []
    for i in range(n_workers):
        start = i * chunk_size
        # The last worker also takes any remainder rows
        end = start + chunk_size if i < n_workers - 1 else len(data)
        p = multiprocessing.Process(
            target=worker_shared_memory,
            args=(shm.name, data.shape, data.dtype, start, end)
        )
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    # Copy results out before releasing the segment
    result = shared_array.copy()
    shm.close()
    shm.unlink()  # free the shared-memory segment
    return result
# Smoke test
data = np.random.randn(1_000_000).astype(np.float64)
result = process_with_shared_memory(data)
print(f"공유 메모리 처리 완료: {result[:5]}")
6. 비동기 프로그래밍 (asyncio)
6.1 async/await 패턴
import asyncio
import time
# Basic async function
async def fetch_embeddings(text: str, model: str = "text-embedding-3-small") -> list:
    """Simulate an async embedding API request; returns a dummy vector."""
    # Stand-in for the network latency of a real API call
    await asyncio.sleep(0.1)
    return [0.1, 0.2, 0.3]
# Sequential baseline (for comparison)
async def sequential_processing(texts: list) -> list:
    """Await each embedding one after another."""
    embeddings = []
    for t in texts:
        embeddings.append(await fetch_embeddings(t))
    return embeddings
# Concurrent version
async def parallel_processing(texts: list) -> list:
    """Launch every request at once and gather the results in order."""
    gathered = await asyncio.gather(*(fetch_embeddings(t) for t in texts))
    return list(gathered)
# Compare the two approaches
async def compare_approaches():
    texts = [f"텍스트 샘플 {i}" for i in range(20)]
    start = time.perf_counter()
    sequential_results = await sequential_processing(texts)
    t_seq = time.perf_counter() - start
    start = time.perf_counter()
    parallel_results = await parallel_processing(texts)
    t_par = time.perf_counter() - start
    print(f"순차 처리: {t_seq:.2f}s ({len(sequential_results)} 결과)")
    print(f"병렬 처리: {t_par:.2f}s ({len(parallel_results)} 결과)")
    print(f"속도 향상: {t_seq/t_par:.1f}x")
asyncio.run(compare_approaches())
6.2 aiohttp로 비동기 HTTP
import asyncio
import aiohttp
import time
from typing import Optional
class AsyncAIClient:
    """Async AI API client with connection pooling, a concurrency cap,
    and retry with exponential backoff on rate limits."""
    def __init__(self, api_key: str, base_url: str, max_concurrent: int = 10):
        self.api_key = api_key
        self.base_url = base_url
        self.semaphore = asyncio.Semaphore(max_concurrent)  # cap in-flight requests
        self.session: Optional[aiohttp.ClientSession] = None
    async def __aenter__(self):
        # One pooled session for the client's lifetime
        connector = aiohttp.TCPConnector(limit=100)
        self.session = aiohttp.ClientSession(
            connector=connector,
            headers={"Authorization": f"Bearer {self.api_key}"}
        )
        return self
    async def __aexit__(self, *args):
        if self.session:
            await self.session.close()
    async def get_embedding(self, text: str, retry: int = 3) -> list:
        """Request one embedding, retrying on rate limits and client errors."""
        async with self.semaphore:
            for attempt in range(retry):
                try:
                    async with self.session.post(
                        f"{self.base_url}/embeddings",
                        json={"input": text, "model": "text-embedding-3-small"}
                    ) as response:
                        if response.status == 200:
                            data = await response.json()
                            return data["data"][0]["embedding"]
                        elif response.status == 429:
                            # Rate limited: exponential backoff before retrying
                            await asyncio.sleep(2 ** attempt)
                        else:
                            response.raise_for_status()
                except aiohttp.ClientError as e:
                    if attempt == retry - 1:
                        raise
                    await asyncio.sleep(1)
        # All retries exhausted without a successful response
        return []
    async def batch_embeddings(self, texts: list, batch_size: int = 100) -> list:
        """Embed texts in batches; failed items become None in the output."""
        all_results = []
        for i in range(0, len(texts), batch_size):
            batch = texts[i:i + batch_size]
            tasks = [self.get_embedding(text) for text in batch]
            # return_exceptions=True keeps one failure from aborting the batch
            batch_results = await asyncio.gather(*tasks, return_exceptions=True)
            for j, result in enumerate(batch_results):
                if isinstance(result, Exception):
                    print(f"오류 발생 텍스트 {i+j}: {result}")
                    all_results.append(None)
                else:
                    all_results.append(result)
        return all_results
# Usage example
async def main():
    texts = [f"AI/ML 텍스트 샘플 {i}" for i in range(50)]
    async with AsyncAIClient(
        api_key="your-api-key",
        base_url="https://api.openai.com/v1",
        max_concurrent=20
    ) as client:
        # Would call the real API here; kept as a simulation in this example
        print("배치 임베딩 처리 시작...")
        # embeddings = await client.batch_embeddings(texts)
        # print(f"임베딩 수: {len(embeddings)}")
# asyncio.run(main())
6.3 AI API 배치 호출 최적화
import asyncio
from dataclasses import dataclass, field
from typing import Any
import time
@dataclass
class BatchProcessor:
    """Dynamic batcher: coalesces concurrent requests into batched API calls."""
    max_batch_size: int = 20
    max_wait_time: float = 0.05  # wait at most 50 ms to fill a batch
    queue: asyncio.Queue = field(default_factory=asyncio.Queue)
    results: dict = field(default_factory=dict)

    async def process_request(self, request_id: str, data: Any) -> Any:
        """Enqueue one request and await its batched result."""
        # Bug fix: asyncio.get_event_loop() is deprecated inside coroutines
        # and can bind the wrong loop; get_running_loop() is the correct API.
        future = asyncio.get_running_loop().create_future()
        await self.queue.put((request_id, data, future))
        return await future

    async def batch_worker(self):
        """Background worker: collect up to max_batch_size items (or until
        the deadline expires), dispatch the batch, repeat forever."""
        while True:
            batch = []
            deadline = time.perf_counter() + self.max_wait_time
            # Collect a batch until full or the deadline passes
            while len(batch) < self.max_batch_size:
                timeout = deadline - time.perf_counter()
                if timeout <= 0:
                    break
                try:
                    item = await asyncio.wait_for(
                        self.queue.get(),
                        timeout=timeout
                    )
                    batch.append(item)
                except asyncio.TimeoutError:
                    break
            if not batch:
                # Nothing queued; yield briefly so we don't busy-spin
                await asyncio.sleep(0.001)
                continue
            # Dispatch the batch (simulated API call)
            request_ids, data_list, futures = zip(*batch)
            try:
                results = await self._call_api_batch(list(data_list))
                for future, result in zip(futures, results):
                    future.set_result(result)
            except Exception as e:
                # Propagate the failure to every caller waiting on this batch
                for future in futures:
                    future.set_exception(e)

    async def _call_api_batch(self, data_list: list) -> list:
        """Simulated batched API call."""
        await asyncio.sleep(0.1)  # simulated API latency
        return [f"결과_{d}" for d in data_list]
# 테스트
async def test_batch_processor():
    """Smoke test: 50 concurrent requests funneled through the batcher."""
    processor = BatchProcessor(max_batch_size=10, max_wait_time=0.05)
    # Start the background worker
    worker_task = asyncio.create_task(processor.batch_worker())
    # Simulate concurrent callers
    async def make_request(i):
        result = await processor.process_request(f"req_{i}", f"data_{i}")
        return result
    start = time.perf_counter()
    tasks = [make_request(i) for i in range(50)]
    results = await asyncio.gather(*tasks)
    elapsed = time.perf_counter() - start
    print(f"50개 요청 처리: {elapsed:.3f}s")
    print(f"처음 5개 결과: {results[:5]}")
    worker_task.cancel()
asyncio.run(test_batch_processor())
7. 메모리 관리
7.1 Python 메모리 모델
import sys
import gc
# Shallow sizes of common object types
data_types = {
    'int': 42,
    'float': 3.14,
    'str (small)': "hello",
    'str (large)': "hello" * 1000,
    'list (empty)': [],
    'dict (empty)': {},
    'tuple (empty)': (),
}
print("객체 크기:")
for name, obj in data_types.items():
    # sys.getsizeof is shallow: it does not follow references
    print(f"  {name}: {sys.getsizeof(obj)} bytes")
# True recursive memory footprint of nested structures
def deep_size(obj, seen=None):
    """Recursively total the memory of obj plus everything it references."""
    seen = set() if seen is None else seen
    obj_id = id(obj)
    if obj_id in seen:
        return 0  # already counted (handles shared references and cycles)
    seen.add(obj_id)
    total = sys.getsizeof(obj)
    if isinstance(obj, dict):
        for key, value in obj.items():
            total += deep_size(key, seen) + deep_size(value, seen)
    elif hasattr(obj, '__iter__') and not isinstance(obj, (str, bytes)):
        total += sum(deep_size(elem, seen) for elem in obj)
    return total
# Compare against the shallow sys.getsizeof figures above
nested = {'data': [i for i in range(1000)], 'meta': {'name': 'test'}}
print(f"\n중첩 딕셔너리 크기: {deep_size(nested):,} bytes")
7.2 제너레이터로 메모리 절약
import numpy as np
from typing import Generator, Iterator
# Memory-efficient data loader for large datasets
def dataset_generator(
    file_paths: list,
    batch_size: int = 32
) -> Generator:
    """Yield (data, labels) mini-batches file by file, keeping memory bounded."""
    for path in file_paths:
        # In a real loader this would read from `path`
        data = np.random.randn(1000, 128)
        labels = np.random.randint(0, 10, 1000)
        for begin in range(0, len(data), batch_size):
            stop = min(begin + batch_size, len(data))
            yield data[begin:stop], labels[begin:stop]
# Memory-efficiency comparison helpers
def load_all_to_memory(n_samples: int = 100_000) -> list:
    """Materialize every sample in memory at once."""
    return [np.random.randn(128) for _ in range(n_samples)]
def streaming_process(n_samples: int = 100_000) -> float:
    """Process samples one at a time, keeping only a running total."""
    running = 0.0
    for _ in range(n_samples):
        running += np.random.randn(128).sum()
    return running
# Generator pipeline stages (chainable, lazy)
def augment_generator(gen: Iterator, noise_std: float = 0.01):
    """Add Gaussian noise to each batch as it streams through."""
    for X, y in gen:
        yield X + np.random.randn(*X.shape) * noise_std, y
def normalize_generator(gen: Iterator):
    """Row-normalize each batch as it streams through."""
    for X, y in gen:
        mu = X.mean(axis=1, keepdims=True)
        sigma = X.std(axis=1, keepdims=True)
        yield (X - mu) / (sigma + 1e-8), y
# Compose the pipeline lazily; nothing runs until iteration starts
fake_paths = [f"data_{i}.npy" for i in range(5)]
base_gen = dataset_generator(fake_paths, batch_size=32)
aug_gen = augment_generator(base_gen)
norm_gen = normalize_generator(aug_gen)
# Process one batch at a time
for i, (X_batch, y_batch) in enumerate(norm_gen):
    if i >= 3:
        break
    print(f"배치 {i}: shape={X_batch.shape}, 평균={X_batch.mean():.4f}")
7.3 __slots__ 최적화
import sys
# Regular class: attributes live in a per-instance __dict__
class RegularEmbedding:
    def __init__(self, vector, label, metadata):
        self.vector = vector
        self.label = label
        self.metadata = metadata
# __slots__ class: fixed attribute slots, no per-instance __dict__ (saves memory)
class SlottedEmbedding:
    __slots__ = ['vector', 'label', 'metadata']
    def __init__(self, vector, label, metadata):
        self.vector = vector
        self.label = label
        self.metadata = metadata
import numpy as np
# Memory comparison between the two classes
n = 100_000
vector_size = 128
regular_objects = [
    RegularEmbedding(
        np.random.randn(vector_size),
        i % 10,
        {'source': 'test'}
    ) for i in range(n)
]
slotted_objects = [
    SlottedEmbedding(
        np.random.randn(vector_size),
        i % 10,
        {'source': 'test'}
    ) for i in range(n)
]
# NOTE(review): shallow sizes only - the referenced vectors/dicts are not counted
regular_size = sys.getsizeof(regular_objects[0]) + sys.getsizeof(regular_objects[0].__dict__)
slotted_size = sys.getsizeof(slotted_objects[0])
print(f"일반 클래스 인스턴스: {regular_size} bytes")
print(f"__slots__ 클래스 인스턴스: {slotted_size} bytes")
print(f"메모리 절약: {(regular_size - slotted_size) / regular_size * 100:.1f}%")
7.4 weakref
import weakref
import gc
class ModelCache:
"""약한 참조를 사용한 모델 캐시"""
def __init__(self):
self._cache = weakref.WeakValueDictionary()
def get_or_load(self, model_name: str):
model = self._cache.get(model_name)
if model is None:
print(f"모델 로딩: {model_name}")
model = self._load_model(model_name)
self._cache[model_name] = model
else:
print(f"캐시에서 반환: {model_name}")
return model
def _load_model(self, model_name: str):
# 실제로는 모델 파일 로드
import numpy as np
return {'name': model_name, 'weights': np.random.randn(100, 100)}
cache = ModelCache()
# First load populates the cache
model = cache.get_or_load('resnet50')
print(f"캐시 크기: {len(cache._cache)}")
# Second request is served from the cache
model_again = cache.get_or_load('resnet50')
# Drop the strong references, then force a GC pass
del model
del model_again
gc.collect()
# The weak-value entry has now been evicted automatically
print(f"GC 후 캐시 크기: {len(cache._cache)}")
# Reload after eviction
new_model = cache.get_or_load('resnet50')
8. 파이썬 타입 힌팅
8.1 mypy로 정적 타입 검사
pip install mypy
mypy your_script.py --strict
from typing import Optional, Union, List, Dict, Tuple, Callable
import numpy as np
from numpy.typing import NDArray
# Basic type hints
def compute_loss(
    predictions: NDArray[np.float32],
    targets: NDArray[np.float32],
    reduction: str = 'mean'
) -> float:
    """Sum-of-squares loss; divided by sample count when reduction == 'mean'."""
    sq_error = np.sum((predictions - targets) ** 2)
    if reduction == 'mean':
        return float(sq_error / len(predictions))
    return float(sq_error)
# Optional and Union in signatures
def load_checkpoint(
    path: str,
    device: Optional[str] = None,
    strict: bool = True
) -> Optional[Dict[str, NDArray]]:
    """Load a checkpoint; returns None when the file is missing."""
    try:
        # Placeholder for real deserialization from `path`
        return {'weights': np.random.randn(100)}
    except FileNotFoundError:
        return None
# Multi-output return type
def split_dataset(
    X: NDArray,
    y: NDArray,
    test_size: float = 0.2,
    random_state: Optional[int] = None
) -> Tuple[NDArray, NDArray, NDArray, NDArray]:
    """Shuffle-split X/y into (X_train, X_test, y_train, y_test)."""
    if random_state is not None:
        # NOTE: mutates the global NumPy RNG state, mirroring the original API
        np.random.seed(random_state)
    n_total = len(X)
    n_test = int(n_total * test_size)
    order = np.random.permutation(n_total)
    test_idx, train_idx = order[:n_test], order[n_test:]
    return X[train_idx], X[test_idx], y[train_idx], y[test_idx]
8.2 제네릭 타입과 Protocol
from typing import TypeVar, Generic, Protocol, runtime_checkable
import numpy as np
T = TypeVar('T')
T_co = TypeVar('T_co', covariant=True)
# Generic class: batches items of any element type T
class DataLoader(Generic[T]):
    """Iterator yielding dataset slices of size batch_size."""
    def __init__(self, dataset: List[T], batch_size: int = 32):
        self.dataset = dataset
        self.batch_size = batch_size
        self._idx = 0
    def __iter__(self):
        # Restart from the beginning on each fresh iteration
        self._idx = 0
        return self
    def __next__(self) -> List[T]:
        if self._idx >= len(self.dataset):
            raise StopIteration
        chunk = self.dataset[self._idx:self._idx + self.batch_size]
        self._idx += self.batch_size
        return chunk
# Protocol (structural subtyping): anything with these methods conforms
@runtime_checkable
class Optimizer(Protocol):
    """Optimizer protocol."""
    def step(self) -> None: ...
    def zero_grad(self) -> None: ...
    def state_dict(self) -> Dict: ...
@runtime_checkable
class Model(Protocol):
    """Model protocol."""
    def forward(self, x: NDArray) -> NDArray: ...
    def parameters(self) -> List[NDArray]: ...
    def train(self) -> None: ...
    def eval(self) -> None: ...
# A function written against the protocols above
def train_epoch(
    model: Model,
    optimizer: Optimizer,
    dataloader: DataLoader,
    loss_fn: Callable[[NDArray, NDArray], float]
) -> float:
    """One training-epoch skeleton; returns the mean loss over batches."""
    model.train()
    total_loss = 0.0
    n_batches = 0
    for _batch in dataloader:
        optimizer.zero_grad()
        # Real forward/backward/step logic would go here
        n_batches += 1
    return total_loss / max(n_batches, 1)
8.3 dataclass와 TypedDict
from dataclasses import dataclass, field
from typing import TypedDict, Optional
import numpy as np
# TypedDict: type hints for dict-shaped data
class ModelConfig(TypedDict):
    hidden_size: int
    num_layers: int
    dropout: float
    activation: str
# total=False makes every key optional
class TrainingConfig(TypedDict, total=False):
    learning_rate: float
    batch_size: int
    max_epochs: int
    early_stopping: bool
# dataclass
@dataclass
class ExperimentConfig:
    """Experiment configuration validated at construction time."""
    # Required fields
    experiment_name: str
    model_type: str
    # Fields with simple defaults
    learning_rate: float = 1e-3
    batch_size: int = 32
    max_epochs: int = 100
    seed: int = 42
    # Mutable defaults must use field(default_factory=...)
    tags: List[str] = field(default_factory=list)
    hyperparams: Dict[str, float] = field(default_factory=dict)
    # Post-construction validation hook
    def __post_init__(self):
        if self.learning_rate <= 0:
            raise ValueError(f"학습률은 양수여야 합니다: {self.learning_rate}")
        if not self.experiment_name:
            raise ValueError("실험 이름이 필요합니다")
    def to_dict(self) -> Dict:
        # Serialize to a plain dict (recursively converts nested dataclasses)
        from dataclasses import asdict
        return asdict(self)
# Usage example
config = ExperimentConfig(
    experiment_name="bert-finetuning-v1",
    model_type="bert-base",
    learning_rate=2e-5,
    batch_size=16,
    tags=["nlp", "classification"]
)
print(f"실험: {config.experiment_name}")
print(f"설정: {config.to_dict()}")
8.4 pydantic으로 데이터 검증
# NOTE(review): this uses the pydantic v1 API (validator / root_validator);
# pydantic v2 renamed these to field_validator / model_validator - verify target version
from pydantic import BaseModel, Field, validator, root_validator
from typing import Optional, List
import numpy as np
class DataConfig(BaseModel):
    """Data-loading settings validated by pydantic."""
    data_path: str
    batch_size: int = Field(ge=1, le=1024, default=32)
    shuffle: bool = True
    num_workers: int = Field(ge=0, le=16, default=4)
    @validator('data_path')
    def path_must_exist(cls, v):
        import os
        # "test_path" is allowed through for tests/examples
        if not os.path.exists(v) and v != "test_path":
            raise ValueError(f"데이터 경로가 존재하지 않습니다: {v}")
        return v
class ModelConfig(BaseModel):
    """Model architecture settings."""
    architecture: str
    hidden_sizes: List[int] = Field(min_items=1)
    dropout_rate: float = Field(ge=0.0, le=0.9, default=0.1)
    activation: str = "relu"
    @validator('activation')
    def valid_activation(cls, v):
        valid = ['relu', 'gelu', 'silu', 'tanh', 'sigmoid']
        if v not in valid:
            raise ValueError(f"활성화 함수는 {valid} 중 하나여야 합니다")
        return v
    @validator('hidden_sizes')
    def sizes_must_be_positive(cls, v):
        if any(s <= 0 for s in v):
            raise ValueError("히든 크기는 양수여야 합니다")
        return v
class TrainingConfig(BaseModel):
    """Training settings (nested models)."""
    data: DataConfig
    model: ModelConfig
    learning_rate: float = Field(gt=0, le=1.0, default=1e-3)
    weight_decay: float = Field(ge=0, default=1e-4)
    max_epochs: int = Field(ge=1, le=10000, default=100)
    @root_validator
    def check_consistency(cls, values):
        # Cross-field sanity check
        model = values.get('model')
        if model and model.dropout_rate > 0.5:
            print("경고: 높은 드롭아웃 비율은 과소적합을 유발할 수 있습니다")
        return values
    class Config:
        # Example payload for the generated JSON schema
        schema_extra = {
            "example": {
                "data": {
                    "data_path": "/data/train",
                    "batch_size": 32
                },
                "model": {
                    "architecture": "transformer",
                    "hidden_sizes": [512, 256]
                },
                "learning_rate": 1e-4
            }
        }
# 사용 및 검증
try:
config = TrainingConfig(
data=DataConfig(data_path="test_path", batch_size=64),
model=ModelConfig(
architecture="transformer",
hidden_sizes=[512, 256, 128],
dropout_rate=0.1
),
learning_rate=2e-5
)
print(f"설정 검증 성공: {config.model.architecture}")
print(f"JSON: {config.json(indent=2)}")
except Exception as e:
print(f"검증 오류: {e}")
9. 패키지 관리와 의존성
9.1 poetry vs pip vs conda
# Poetry 설치 및 프로젝트 초기화
curl -sSL https://install.python-poetry.org | python3 -
poetry new my-ml-project
cd my-ml-project
# 의존성 추가
poetry add numpy torch torchvision
poetry add --group dev pytest black ruff mypy
# 가상환경 활성화
poetry shell
# 패키지 설치
poetry install
# requirements.txt 내보내기
poetry export -f requirements.txt --output requirements.txt
9.2 pyproject.toml
[tool.poetry]
name = "my-ml-project"
version = "0.1.0"
description = "AI/ML 프로젝트"
authors = ["개발자 이름 <email@example.com>"]
license = "MIT"
readme = "README.md"
packages = [{include = "my_ml_project"}]
[tool.poetry.dependencies]
python = "^3.10"
numpy = "^1.24"
torch = "^2.1"
torchvision = "^0.16"
transformers = "^4.35"
pydantic = "^2.0"
aiohttp = "^3.9"
[tool.poetry.group.dev.dependencies]
pytest = "^7.4"
pytest-cov = "^4.1"
black = "^23.0"
ruff = "^0.1"
mypy = "^1.6"
pre-commit = "^3.5"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
[tool.black]
line-length = 88
target-version = ['py310', 'py311']
[tool.ruff]
select = ["E", "F", "I", "N", "W"]
ignore = ["E501"]
line-length = 88
[tool.mypy]
python_version = "3.10"
warn_return_any = true
warn_unused_configs = true
disallow_untyped_defs = true
[tool.pytest.ini_options]
testpaths = ["tests"]
python_files = ["test_*.py"]
python_classes = ["Test*"]
python_functions = ["test_*"]
addopts = "-v --cov=my_ml_project --cov-report=html"
9.3 uv (빠른 패키지 매니저)
# uv 설치 (Rust 기반, 매우 빠름)
curl -LsSf https://astral.sh/uv/install.sh | sh
# 프로젝트 생성
uv init my-project
cd my-project
# 의존성 추가 (pip보다 10-100x 빠름)
uv add numpy torch transformers
uv add --dev pytest ruff mypy
# 가상환경 생성 및 설치
uv sync
# Python 버전 관리
uv python install 3.11
uv python pin 3.11
# 빠른 스크립트 실행
uv run python train.py
10. 클린 코드 for ML
10.1 디자인 패턴 (Factory, Strategy)
from abc import ABC, abstractmethod
from typing import Dict, Type, Optional
import numpy as np
# Strategy 패턴 - 손실 함수
class LossFunction(ABC):
    """Interface for a differentiable loss: scalar value plus gradient."""

    @abstractmethod
    def __call__(self, predictions: np.ndarray, targets: np.ndarray) -> float:
        """Return the scalar loss value."""

    @abstractmethod
    def gradient(self, predictions: np.ndarray, targets: np.ndarray) -> np.ndarray:
        """Return d(loss)/d(predictions), same shape as predictions."""


class MSELoss(LossFunction):
    """Mean squared error."""

    def __call__(self, predictions, targets):
        squared_error = (predictions - targets) ** 2
        return float(np.mean(squared_error))

    def gradient(self, predictions, targets):
        # d/dp mean((p - t)^2) = 2 (p - t) / n
        return 2 * (predictions - targets) / len(predictions)


class CrossEntropyLoss(LossFunction):
    """Categorical cross-entropy over probability inputs."""

    def __call__(self, predictions, targets):
        eps = 1e-8  # guards log(0)
        log_probs = np.log(predictions + eps)
        return float(-np.mean(targets * log_probs))

    def gradient(self, predictions, targets):
        return -targets / (predictions + 1e-8)


class HuberLoss(LossFunction):
    """Huber loss: quadratic near zero, linear beyond `delta`."""

    def __init__(self, delta: float = 1.0):
        self.delta = delta

    def __call__(self, predictions, targets):
        residual = predictions - targets
        quadratic = 0.5 * residual ** 2
        linear = self.delta * np.abs(residual) - 0.5 * self.delta ** 2
        per_element = np.where(np.abs(residual) <= self.delta, quadratic, linear)
        return float(np.mean(per_element))

    def gradient(self, predictions, targets):
        residual = predictions - targets
        inside = np.abs(residual) <= self.delta
        return np.where(inside, residual, self.delta * np.sign(residual))
# Factory 패턴 - 모델 생성
class ModelFactory:
    """Registry mapping string names to model classes (Factory pattern)."""

    _registry: Dict[str, Type] = {}

    @classmethod
    def register(cls, name: str):
        """Class decorator: publish a model class under `name`."""
        def decorator(model_class):
            cls._registry[name] = model_class
            return model_class
        return decorator

    @classmethod
    def create(cls, name: str, **kwargs):
        """Instantiate the model registered under `name`, forwarding kwargs."""
        try:
            model_class = cls._registry[name]
        except KeyError:
            available = list(cls._registry.keys())
            raise ValueError(f"알 수 없는 모델: {name}. 사용 가능: {available}") from None
        return model_class(**kwargs)
@ModelFactory.register("linear")
class LinearModel:
    """Single affine layer: y = XW + b."""

    def __init__(self, input_size: int, output_size: int):
        # Small random init for weights, zeros for the bias
        self.W = np.random.randn(input_size, output_size) * 0.01
        self.b = np.zeros(output_size)

    def forward(self, X: np.ndarray) -> np.ndarray:
        """Apply the affine map to a batch of inputs."""
        return np.matmul(X, self.W) + self.b
@ModelFactory.register("mlp")
class MLPModel:
    """Fully-connected network with ReLU between hidden layers."""

    def __init__(self, layer_sizes: list):
        # One (W, b) pair per consecutive pair of layer sizes
        self.layers = [
            (np.random.randn(n_in, n_out) * 0.01, np.zeros(n_out))
            for n_in, n_out in zip(layer_sizes[:-1], layer_sizes[1:])
        ]

    def forward(self, X: np.ndarray) -> np.ndarray:
        """Forward pass; ReLU is applied to every layer except the last."""
        activation = X
        last = len(self.layers) - 1
        for idx, (W, b) in enumerate(self.layers):
            activation = activation @ W + b
            if idx != last:
                activation = np.maximum(0, activation)  # ReLU
        return activation
# Usage example: build an MLP via the factory and score random one-hot targets
model = ModelFactory.create("mlp", layer_sizes=[128, 256, 10])
loss_fn = CrossEntropyLoss()
X = np.random.randn(32, 128)
y = np.zeros((32, 10))
for i in range(32):
    # One random "hot" class per sample
    y[i, np.random.randint(10)] = 1
# NOTE(review): forward() returns raw affine outputs (possibly negative),
# not probabilities — CrossEntropyLoss takes log() of them, so NaNs are
# possible here; presumably acceptable for a demo, but verify.
predictions = model.forward(X)
loss = loss_fn(predictions, y)
print(f"손실: {loss:.4f}")
10.2 Hydra와 OmegaConf로 설정 관리
pip install hydra-core omegaconf
# config/train.yaml 내용 (주석으로 표시)
# model:
# type: transformer
# hidden_size: 512
# num_layers: 6
# training:
# lr: 1e-4
# batch_size: 32
# max_epochs: 100
import hydra
from omegaconf import DictConfig, OmegaConf
@hydra.main(version_base=None, config_path="config", config_name="train")
def train(cfg: DictConfig):
    """Entry point: print the composed config, then read a few values.

    Hydra injects `cfg`, composed from config/train.yaml plus any
    command-line overrides (e.g. `python train.py training.lr=1e-3`).
    """
    print(OmegaConf.to_yaml(cfg))
    # Attribute-style access into the nested config
    model_type = cfg.model.type
    lr = cfg.training.lr
    print(f"모델: {model_type}, 학습률: {lr}")
# 커맨드라인 오버라이드
# python train.py training.lr=1e-3 model.hidden_size=256
10.3 ML 코드 테스팅 (pytest)
# tests/test_models.py
import pytest
import numpy as np
from numpy.testing import assert_array_almost_equal
class TestLossFunctions:
    """Unit tests for the loss functions."""

    @pytest.fixture
    def sample_data(self):
        """Random (probabilities, one-hot targets) pair.

        FIX: NumPy has no `np.random.softmax`; normalize the logits to
        probabilities explicitly instead.
        """
        np.random.seed(42)
        logits = np.random.randn(10, 5)
        exp = np.exp(logits - logits.max(axis=-1, keepdims=True))
        predictions = exp / exp.sum(axis=-1, keepdims=True)
        targets = np.eye(5)[np.random.randint(0, 5, 10)]
        return predictions, targets

    def test_mse_zero_loss(self):
        """Perfect prediction => loss == 0."""
        x = np.array([1.0, 2.0, 3.0])
        loss = MSELoss()(x, x)
        assert abs(loss) < 1e-10

    def test_mse_positive(self):
        """MSE is always non-negative."""
        predictions = np.random.randn(100)
        targets = np.random.randn(100)
        loss = MSELoss()(predictions, targets)
        assert loss >= 0

    def test_gradient_shape(self, sample_data):
        """Gradient must match the prediction shape."""
        predictions, targets = sample_data
        grad = CrossEntropyLoss().gradient(predictions, targets)
        assert grad.shape == predictions.shape

    @pytest.mark.parametrize("batch_size", [1, 16, 32, 128])
    def test_batch_sizes(self, batch_size):
        """The model forward pass works across batch sizes."""
        X = np.random.randn(batch_size, 64)
        y = np.random.randn(batch_size, 10)
        model = LinearModel(64, 10)
        output = model.forward(X)
        assert output.shape == (batch_size, 10)

    def test_numerical_gradient(self):
        """Central-difference gradient matches the analytical gradient."""
        loss_fn = MSELoss()
        pred = np.array([0.5, 0.3, 0.2])
        target = np.array([0.6, 0.2, 0.1])
        analytical_grad = loss_fn.gradient(pred, target)
        # Numerical gradient via central differences
        eps = 1e-5
        numerical_grad = np.zeros_like(pred)
        for i in range(len(pred)):
            pred_plus = pred.copy()
            pred_plus[i] += eps
            pred_minus = pred.copy()
            pred_minus[i] -= eps
            numerical_grad[i] = (loss_fn(pred_plus, target) - loss_fn(pred_minus, target)) / (2 * eps)
        assert_array_almost_equal(analytical_grad, numerical_grad, decimal=5)
# conftest.py에서 공통 픽스처
# tests/conftest.py
import pytest
import numpy as np
@pytest.fixture(scope="session")
def sample_dataset():
    """Dataset shared across the whole test session (built exactly once)."""
    # Fixed seed so every test sees identical data
    np.random.seed(42)
    X = np.random.randn(1000, 128)
    y = np.random.randint(0, 10, 1000)
    return X, y
# pytest 실행
# pytest tests/ -v --cov=src --cov-report=html -x
10.4 로깅 모범 사례
import logging
import sys
from pathlib import Path
from typing import Optional
def setup_logger(
    name: str,
    level: int = logging.INFO,
    log_file: Optional[str] = None,
    format_str: Optional[str] = None
) -> logging.Logger:
    """Create (or fetch) a logger with console and optional file output.

    Args:
        name: dotted logger name.
        level: logging level (default INFO).
        log_file: if given, also log to this file (parent dirs created).
        format_str: record format; a sensible default is used when None.

    Returns:
        The configured ``logging.Logger`` instance.
    """
    if format_str is None:
        format_str = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    formatter = logging.Formatter(format_str, datefmt='%Y-%m-%d %H:%M:%S')

    logger = logging.getLogger(name)
    logger.setLevel(level)

    # Attach handlers only on first setup to avoid duplicated output
    if logger.handlers:
        return logger

    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setFormatter(formatter)
    logger.addHandler(stdout_handler)

    if log_file:
        Path(log_file).parent.mkdir(parents=True, exist_ok=True)
        file_handler = logging.FileHandler(log_file)
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)

    return logger
# ML 학습 로거 예시
class TrainingLogger:
    """Logger dedicated to training progress (metrics, epochs, hparams)."""

    def __init__(self, experiment_name: str, log_dir: str = "logs"):
        # Delegates handler setup to setup_logger; one log file per experiment
        self.logger = setup_logger(
            f"training.{experiment_name}",
            log_file=f"{log_dir}/{experiment_name}.log"
        )
        self.step = 0

    def log_metrics(self, metrics: dict, phase: str = "train"):
        """Log a metrics dict at the current step, then advance the step."""
        formatted = ", ".join(f"{k}={v:.4f}" for k, v in metrics.items())
        self.logger.info(f"[{phase}] step={self.step} {formatted}")
        self.step += 1

    def log_epoch(self, epoch: int, train_loss: float, val_loss: float):
        """Log end-of-epoch train/validation losses."""
        self.logger.info(
            f"Epoch {epoch}: train_loss={train_loss:.4f}, val_loss={val_loss:.4f}"
        )

    def log_hyperparams(self, params: dict):
        """Log the run's hyperparameters once at start-up."""
        self.logger.info(f"하이퍼파라미터: {params}")
# Usage example: simulate a short training run that emits log lines
logger = TrainingLogger("bert-finetuning-v1")
logger.log_hyperparams({
    "lr": 2e-5,
    "batch_size": 32,
    "epochs": 10
})
for epoch in range(3):
    for step in range(10):
        # Fabricated, steadily-improving metrics for demonstration
        logger.log_metrics(
            {"loss": 2.0 - epoch * 0.3 - step * 0.01, "acc": 0.5 + epoch * 0.1},
            phase="train"
        )
    logger.log_epoch(epoch, 1.5 - epoch * 0.3, 1.8 - epoch * 0.25)
결론
이 가이드에서 다룬 주요 기법들을 실제 AI/ML 프로젝트에 적용할 때는 다음 우선순위를 권장합니다.
1. 먼저 프로파일링: 코드를 최적화하기 전에 반드시 cProfile과 line_profiler로 병목을 찾습니다.
2. NumPy 벡터화 우선: 대부분의 경우 벡터화만으로도 10-100배 성능 향상이 가능합니다.
3. Numba로 핫 코드 최적화: 순수 Python 루프가 필요한 곳에 Numba JIT을 적용합니다.
4. 멀티프로세싱으로 CPU 활용: 데이터 전처리, 교차 검증 등 독립적인 작업은 ProcessPoolExecutor를 활용합니다.
5. asyncio로 I/O 최적화: API 호출, 파일 I/O가 많은 파이프라인에서 asyncio로 10배 이상 속도를 높입니다.
6. 타입 힌팅과 테스팅: 코드 품질을 위해 mypy, pytest, pydantic을 조합하여 견고한 ML 코드베이스를 유지합니다.
참고 자료
Python Advanced Techniques for AI/ML: Performance Optimization, Parallel Processing, Memory Management
Overview
Python is the lingua franca of the AI/ML ecosystem. However, vanilla Python code has inherent performance limitations, and bottlenecks emerge in large-scale data processing and model training pipelines. This guide covers advanced Python techniques for building production-grade AI/ML systems.
You will learn how to write code that is hundreds of times faster than pure Python loops, leverage GPUs, and manage memory efficiently — all demonstrated with hands-on examples.
1. Python Performance Profiling
Always profile before optimizing. "If you can't measure it, you can't improve it" is the golden rule.
1.1 cProfile and pstats
cProfile, included in the Python standard library, is the foundational tool for function-level profiling.
import cProfile
import pstats
import io
import numpy as np
def slow_matrix_multiply(a, b):
    """Inefficient triple-loop matrix multiplication (profiling target)."""
    n_rows, inner, n_cols = len(a), len(a[0]), len(b[0])
    product = [[0] * n_cols for _ in range(n_rows)]
    for r in range(n_rows):
        row_a = a[r]
        out_row = product[r]
        for c in range(n_cols):
            acc = 0
            for k in range(inner):
                acc += row_a[k] * b[k][c]
            out_row[c] += acc
    return product
def profile_function():
    """Workload driver: multiply two 100x100 float matrices."""
    size = 100
    mat_a = [[float(r + c) for c in range(size)] for r in range(size)]
    mat_b = [[float(r * c + 1) for c in range(size)] for r in range(size)]
    slow_matrix_multiply(mat_a, mat_b)
# Run profiling
pr = cProfile.Profile()
pr.enable()
profile_function()
pr.disable()
# Analyze results
stream = io.StringIO()
ps = pstats.Stats(pr, stream=stream)
ps.sort_stats('cumulative')
ps.print_stats(20)
print(stream.getvalue())
You can also run it directly from the command line.
python -m cProfile -s cumulative your_script.py
python -m cProfile -o output.prof your_script.py
Analyzing a saved .prof file:
import pstats
p = pstats.Stats('output.prof')
p.sort_stats('cumulative')
p.print_stats(10) # Top 10 functions
p.print_callers('slow_function') # Who called this function
p.print_callees('main') # What this function calls
1.2 line_profiler (Line-by-line Analysis)
Measures performance at the line level rather than the function level.
pip install line_profiler
from line_profiler import LineProfiler
import numpy as np
def process_batch(data):
    """Standardize, scale, and clip each item, then stack into one array."""
    processed = []
    for item in data:
        standardized = (item - item.mean()) / (item.std() + 1e-8)
        processed.append(np.clip(standardized * 2.0, -5, 5))
    return np.stack(processed)
# Set up profiler
lp = LineProfiler()
lp_wrapper = lp(process_batch)
# Run with sample data
data = [np.random.randn(1000) for _ in range(1000)]
lp_wrapper(data)
# Print results
lp.print_stats()
In Jupyter Notebook you can use magic commands:
%load_ext line_profiler
%lprun -f process_batch process_batch(data)
1.3 memory_profiler
Tracks memory usage line by line.
pip install memory_profiler
from memory_profiler import profile
import numpy as np
@profile
def memory_intensive_function(n=1_000_000):
    """Allocate, transform, and filter data to exercise the memory profiler."""
    boxed = list(range(n))          # large Python list of boxed ints
    values = np.array(boxed)        # contiguous NumPy copy of the list
    del boxed                       # release the list before further allocations
    squares = values ** 2           # fresh array allocation
    large = squares[squares > 100]  # boolean-mask copy
    return large.sum()
result = memory_intensive_function()
print(f"Result: {result}")
python -m memory_profiler your_script.py
mprof run your_script.py
mprof plot # Visualize
1.4 py-spy (Sampling Profiler)
A powerful tool for profiling a running Python process without modifying its code.
pip install py-spy
# Profile a running process
py-spy top --pid 12345
# Generate a flame graph
py-spy record -o profile.svg --pid 12345
# Profile a script directly
py-spy record -o profile.svg -- python your_script.py
1.5 SnakeViz Visualization
pip install snakeviz
python -m cProfile -o output.prof your_script.py
snakeviz output.prof
2. NumPy Vectorization Deep Dive
NumPy vectorization is the cornerstone of Python AI/ML performance optimization.
2.1 for loop vs. Vectorized Performance Comparison
import numpy as np
import time
def benchmark(func, *args, runs=5):
    """Time `func(*args)` over `runs` calls.

    Returns (mean_seconds, std_seconds, result_of_last_call).
    """
    durations = []
    for _ in range(runs):
        t0 = time.perf_counter()
        result = func(*args)
        durations.append(time.perf_counter() - t0)
    return np.mean(durations), np.std(durations), result
# 1. Python for loop
def relu_loop(x):
    """ReLU over an iterable, element by element in Python (baseline)."""
    return [max(0, value) for value in x]
# 2. NumPy vectorized
def relu_numpy(x):
    """Vectorized ReLU: element-wise maximum with zero."""
    return np.maximum(x, 0)
# 3. NumPy clip
def relu_clip(x):
    """ReLU expressed as a one-sided clip at zero."""
    return np.clip(x, a_min=0, a_max=None)
n = 1_000_000
data_list = [np.random.randn() for _ in range(n)]
data_np = np.array(data_list)
t_loop, _, _ = benchmark(relu_loop, data_list)
t_numpy, _, _ = benchmark(relu_numpy, data_np)
t_clip, _, _ = benchmark(relu_clip, data_np)
print(f"Python loop: {t_loop:.4f}s")
print(f"NumPy maximum: {t_numpy:.4f}s ({t_loop/t_numpy:.0f}x faster)")
print(f"NumPy clip: {t_clip:.4f}s ({t_loop/t_clip:.0f}x faster)")
2.2 Universal Functions (ufuncs)
import numpy as np
def custom_activation_scalar(x, alpha=0.1):
    """ELU activation for a single scalar value."""
    return x if x >= 0 else alpha * (np.exp(x) - 1)
# Create ufunc with vectorize
elu_ufunc = np.vectorize(custom_activation_scalar)
# Full NumPy ufunc (faster)
def elu_numpy(x, alpha=0.1):
    """Vectorized ELU: identity for x >= 0, alpha*(exp(x)-1) otherwise."""
    negative_branch = alpha * (np.exp(x) - 1)
    return np.where(x >= 0, x, negative_branch)
x = np.random.randn(1_000_000)
t_vectorize, _, _ = benchmark(elu_ufunc, x)
t_numpy, _, _ = benchmark(elu_numpy, x)
print(f"np.vectorize: {t_vectorize:.4f}s")
print(f"np.where: {t_numpy:.4f}s")
2.3 np.einsum (Einstein Summation)
import numpy as np
batch_size, seq_len, d_model = 32, 128, 512
heads, d_head = 8, 64
A = np.random.randn(batch_size, seq_len, d_model)
W = np.random.randn(d_model, d_model)
# Standard matmul
result_matmul = A @ W # (32, 128, 512)
# Same operation with einsum
result_einsum = np.einsum('bsi,ij->bsj', A, W)
# Attention scores (batched matmul)
Q = np.random.randn(batch_size, heads, seq_len, d_head)
K = np.random.randn(batch_size, heads, seq_len, d_head)
scores_manual = Q @ K.transpose(0, 1, 3, 2) # (32, 8, 128, 128)
scores_einsum = np.einsum('bhid,bhjd->bhij', Q, K)
# Batch outer product
a = np.random.randn(batch_size, d_model)
b = np.random.randn(batch_size, d_model)
outer = np.einsum('bi,bj->bij', a, b) # (32, 512, 512)
# Trace (sum of diagonal)
M = np.random.randn(batch_size, d_model, d_model)
trace = np.einsum('bii->b', M) # (32,)
print(f"Attention scores shape: {scores_einsum.shape}")
print(f"Outer product shape: {outer.shape}")
print(f"Trace shape: {trace.shape}")
# Use optimize parameter for the best contraction path
result = np.einsum('bhid,bhjd->bhij', Q, K, optimize='optimal')
2.4 Stride Tricks
import numpy as np
from numpy.lib.stride_tricks import as_strided, sliding_window_view
arr = np.arange(20, dtype=float)
window_size = 5
# Modern approach (NumPy 1.20+)
windows = sliding_window_view(arr, window_size)
print(f"Sliding window shape: {windows.shape}") # (16, 5)
# Moving average
moving_avg = windows.mean(axis=-1)
print(f"Moving average: {moving_avg[:5]}")
# 2D sliding window (for convolution prep)
image = np.random.randn(64, 64)
kernel_size = (3, 3)
windows_2d = sliding_window_view(image, kernel_size)
print(f"2D window shape: {windows_2d.shape}") # (62, 62, 3, 3)
# Non-overlapping chunks via stride tricks
batch = np.random.randn(1000, 10)
chunk_size = 50
n_chunks = len(batch) // chunk_size
chunks = batch[:n_chunks * chunk_size].reshape(n_chunks, chunk_size, -1)
print(f"Chunks shape: {chunks.shape}") # (20, 50, 10)
3. Numba JIT Compilation
Numba compiles Python functions to machine code, delivering C/C++-level performance.
3.1 @jit, @njit Decorators
from numba import jit, njit, prange
import numpy as np
import time
# @jit: fallback mode (allows Python objects)
@jit(nopython=False)
def sum_with_jit(arr):
total = 0.0
for i in range(len(arr)):
total += arr[i]
return total
# @njit: pure native mode (faster, recommended)
@njit
def sum_with_njit(arr):
total = 0.0
for i in range(len(arr)):
total += arr[i]
return total
@njit
def compute_distances(points):
"""Compute Euclidean distance matrix between points"""
n = len(points)
distances = np.zeros((n, n))
for i in range(n):
for j in range(i + 1, n):
diff = points[i] - points[j]
dist = np.sqrt(np.sum(diff ** 2))
distances[i, j] = dist
distances[j, i] = dist
return distances
arr = np.random.randn(1_000_000)
# First call includes compilation time
_ = sum_with_njit(arr) # Warm-up
start = time.perf_counter()
result_numpy = np.sum(arr)
print(f"NumPy sum: {time.perf_counter() - start:.4f}s")
start = time.perf_counter()
result_njit = sum_with_njit(arr)
print(f"Numba njit: {time.perf_counter() - start:.4f}s")
# Point cloud distance calculation
points = np.random.randn(500, 3).astype(np.float64)
_ = compute_distances(points) # Warm-up
start = time.perf_counter()
dist_matrix = compute_distances(points)
print(f"Numba distance matrix ({points.shape[0]}x{points.shape[0]}): {time.perf_counter() - start:.4f}s")
3.2 Type Inference and Explicit Type Declarations
from numba import njit, float64, int64
import numpy as np
# Explicit type avoids first-call delay
@njit(float64[:](float64[:], float64))
def scale_array_typed(arr, factor):
return arr * factor
# Multiple type signatures
from numba import float32, float64
@njit([
float32[:](float32[:], float32),
float64[:](float64[:], float64)
])
def scale_multi_type(arr, factor):
return arr * factor
@njit
def softmax_numba(x):
"""Numba-accelerated softmax"""
max_val = np.max(x)
exp_x = np.exp(x - max_val)
return exp_x / np.sum(exp_x)
@njit
def batch_softmax(X):
"""Batch softmax for 2D array"""
result = np.empty_like(X)
for i in range(X.shape[0]):
result[i] = softmax_numba(X[i])
return result
import time  # FIX: `time` is used below but was never imported in this snippet

X = np.random.randn(1000, 512).astype(np.float64)
_ = batch_softmax(X)  # Warm-up call so timing excludes JIT compilation
start = time.perf_counter()
result = batch_softmax(X)
print(f"Numba batch softmax: {time.perf_counter() - start:.4f}s")
3.3 Parallel Processing (@prange)
from numba import njit, prange
import numpy as np
@njit(parallel=True)
def parallel_normalize(X):
"""Row-wise normalization with automatic parallelism"""
result = np.empty_like(X)
for i in prange(X.shape[0]): # Auto-parallelized
row = X[i]
mean = row.mean()
std = row.std()
result[i] = (row - mean) / (std + 1e-8)
return result
@njit(parallel=True)
def parallel_batch_process(data, weights):
"""Weighted sum over batch in parallel"""
n_samples, n_features = data.shape
results = np.zeros(n_samples)
for i in prange(n_samples):
dot = 0.0
for j in range(n_features):
dot += data[i, j] * weights[j]
results[i] = dot
return results
import time  # FIX: `time` is used below but was never imported in this snippet

X = np.random.randn(10000, 512).astype(np.float64)
_ = parallel_normalize(X)  # Warm-up (triggers JIT compilation)
start = time.perf_counter()
result_parallel = parallel_normalize(X)
t_parallel = time.perf_counter() - start
@njit
def sequential_normalize(X):
result = np.empty_like(X)
for i in range(X.shape[0]):
row = X[i]
mean = row.mean()
std = row.std()
result[i] = (row - mean) / (std + 1e-8)
return result
_ = sequential_normalize(X)
start = time.perf_counter()
result_seq = sequential_normalize(X)
t_seq = time.perf_counter() - start
print(f"Sequential: {t_seq:.4f}s")
print(f"Parallel: {t_parallel:.4f}s ({t_seq/t_parallel:.1f}x faster)")
3.4 Numba Caching
from numba import njit
import numpy as np
# cache=True saves compiled code to disk
@njit(cache=True)
def cached_computation(x, y):
"""Cached JIT function — loads instantly on next run"""
result = np.empty(len(x))
for i in range(len(x)):
result[i] = x[i] ** 2 + y[i] ** 2
return result
arr1 = np.random.randn(1000)
arr2 = np.random.randn(1000)
result = cached_computation(arr1, arr2)
4. Cython
Cython transpiles Python code to C for maximum performance.
4.1 Writing .pyx Files
# setup.py — Cython build configuration
from setuptools import setup
from Cython.Build import cythonize
import numpy as np
setup(
ext_modules=cythonize(
"fast_ops.pyx",
compiler_directives={
'language_level': '3',
'boundscheck': False,
'wraparound': False,
'nonecheck': False,
}
),
include_dirs=[np.get_include()]
)
fast_ops.pyx example:
# fast_ops.pyx
# cython: language_level=3
# cython: boundscheck=False
# cython: wraparound=False
import numpy as np
cimport numpy as cnp
from libc.math cimport exp, sqrt
def relu_cython(cnp.ndarray[cnp.double_t, ndim=1] x):
cdef int n = len(x)
cdef cnp.ndarray[cnp.double_t, ndim=1] result = np.empty(n)
cdef int i
cdef double val
for i in range(n):
val = x[i]
result[i] = val if val > 0 else 0.0
return result
def softmax_cython(cnp.ndarray[cnp.double_t, ndim=1] x):
cdef int n = len(x)
cdef cnp.ndarray[cnp.double_t, ndim=1] result = np.empty(n)
cdef double max_val = x[0]
cdef double sum_exp = 0.0
cdef int i
for i in range(n):
if x[i] > max_val:
max_val = x[i]
for i in range(n):
result[i] = exp(x[i] - max_val)
sum_exp += result[i]
for i in range(n):
result[i] /= sum_exp
return result
Build and use:
python setup.py build_ext --inplace
import numpy as np
import fast_ops # Compiled Cython module
x = np.random.randn(1_000_000)
result = fast_ops.relu_cython(x)
4.2 IPython Cython Magic
%load_ext Cython
%%cython -a
# -a: annotation mode (yellow = Python calls, white = C)
import numpy as np
cimport numpy as cnp
def fast_l2_norm(cnp.ndarray[cnp.double_t, ndim=1] v):
cdef int n = len(v)
cdef double total = 0.0
cdef int i
for i in range(n):
total += v[i] * v[i]
return total ** 0.5
5. Python Multiprocessing
5.1 Understanding the GIL
The GIL (Global Interpreter Lock) allows only one thread to execute Python bytecode at a time. For CPU-bound tasks, multithreading provides no real parallelism — use multiprocessing instead.
import threading
import multiprocessing
import time
import numpy as np
def cpu_bound_task(n):
    """Pure-CPU work: sum of squares 0..n-1 (holds the GIL throughout)."""
    return sum(i ** 2 for i in range(n))
def run_threaded(n_tasks=4, n_per_task=1_000_000):
threads = []
start = time.perf_counter()
for _ in range(n_tasks):
t = threading.Thread(target=cpu_bound_task, args=(n_per_task,))
threads.append(t)
t.start()
for t in threads:
t.join()
return time.perf_counter() - start
def run_multiprocessing(n_tasks=4, n_per_task=1_000_000):
start = time.perf_counter()
with multiprocessing.Pool(processes=n_tasks) as pool:
results = pool.map(cpu_bound_task, [n_per_task] * n_tasks)
return time.perf_counter() - start
t_threaded = run_threaded()
t_mp = run_multiprocessing()
print(f"Threading: {t_threaded:.2f}s")
print(f"Multiprocessing: {t_mp:.2f}s ({t_threaded/t_mp:.1f}x faster)")
5.2 multiprocessing.Pool
import multiprocessing
import numpy as np
def preprocess_sample(args):
idx, data, mean, std = args
normalized = (data - mean) / (std + 1e-8)
augmented = normalized + np.random.randn(*normalized.shape) * 0.01
return idx, augmented
def preprocess_dataset_parallel(dataset, n_workers=None):
    """Normalize and augment every sample of `dataset` across worker processes.

    Stats (mean/std) are computed once in the parent and shipped to workers
    together with each sample; results are re-sorted by index because pool
    tasks may complete out of order.
    """
    if n_workers is None:
        n_workers = multiprocessing.cpu_count()
    mean = dataset.mean(axis=0)
    std = dataset.std(axis=0)
    args = [(i, dataset[i], mean, std) for i in range(len(dataset))]
    # chunksize=100 amortizes IPC overhead across many small tasks
    with multiprocessing.Pool(processes=n_workers) as pool:
        results = pool.map(preprocess_sample, args, chunksize=100)
    results.sort(key=lambda x: x[0])
    return np.stack([r[1] for r in results])
dataset = np.random.randn(10000, 128)
processed = preprocess_dataset_parallel(dataset, n_workers=4)
print(f"Processed dataset shape: {processed.shape}")
5.3 ProcessPoolExecutor
from concurrent.futures import ProcessPoolExecutor, as_completed
import numpy as np
def train_fold(fold_data):
fold_idx, X_train, y_train, X_val, y_val = fold_data
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
model = LogisticRegression(max_iter=1000)
model.fit(X_train, y_train)
val_pred = model.predict(X_val)
accuracy = accuracy_score(y_val, val_pred)
return fold_idx, accuracy, model
def parallel_cross_validation(X, y, n_folds=5):
from sklearn.model_selection import KFold
kf = KFold(n_splits=n_folds, shuffle=True, random_state=42)
fold_data_list = []
for fold_idx, (train_idx, val_idx) in enumerate(kf.split(X)):
fold_data_list.append((
fold_idx,
X[train_idx], y[train_idx],
X[val_idx], y[val_idx]
))
results = {}
with ProcessPoolExecutor(max_workers=n_folds) as executor:
future_to_fold = {
executor.submit(train_fold, fd): fd[0]
for fd in fold_data_list
}
for future in as_completed(future_to_fold):
fold_idx, accuracy, model = future.result()
results[fold_idx] = {'accuracy': accuracy, 'model': model}
print(f"Fold {fold_idx}: accuracy = {accuracy:.4f}")
avg_accuracy = np.mean([r['accuracy'] for r in results.values()])
print(f"\nMean accuracy: {avg_accuracy:.4f}")
return results
from sklearn.datasets import make_classification
X, y = make_classification(n_samples=5000, n_features=20, random_state=42)
results = parallel_cross_validation(X, y, n_folds=5)
5.4 Shared Memory
from multiprocessing import shared_memory
import numpy as np
import multiprocessing
def worker_shared_memory(shm_name, shape, dtype, start_idx, end_idx):
    """Square the [start_idx:end_idx) slice of the shared array in place.

    Runs in a child process: attaches to the existing shared-memory block
    by name, views it as an ndarray, mutates its slice, then detaches
    (close) without unlinking — the parent owns the block's lifetime.
    """
    existing_shm = shared_memory.SharedMemory(name=shm_name)
    arr = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)
    arr[start_idx:end_idx] = arr[start_idx:end_idx] ** 2
    existing_shm.close()
def process_with_shared_memory(data):
    """Square `data` element-wise using worker processes over shared memory.

    Copies `data` into a SharedMemory block, splits the index range across
    cpu_count() workers (the last worker absorbs the remainder), joins the
    workers, then copies the result out and unlinks the block.
    """
    shm = shared_memory.SharedMemory(create=True, size=data.nbytes)
    shared_array = np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)
    shared_array[:] = data[:]
    n_workers = multiprocessing.cpu_count()
    chunk_size = len(data) // n_workers
    processes = []
    for i in range(n_workers):
        start = i * chunk_size
        # Last chunk absorbs any remainder from the integer division
        end = start + chunk_size if i < n_workers - 1 else len(data)
        p = multiprocessing.Process(
            target=worker_shared_memory,
            args=(shm.name, data.shape, data.dtype, start, end)
        )
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    result = shared_array.copy()  # copy out before the buffer is freed
    shm.close()
    shm.unlink()  # parent owns the block: free it exactly once
    return result
data = np.random.randn(1_000_000).astype(np.float64)
result = process_with_shared_memory(data)
print(f"Shared memory processing complete: {result[:5]}")
6. Asynchronous Programming (asyncio)
6.1 async/await Pattern
import asyncio
import time
async def fetch_embeddings(text: str, model: str = "text-embedding-3-small") -> list:
    """Simulate an async embedding API call with a fixed 100 ms latency."""
    await asyncio.sleep(0.1)
    return [0.1, 0.2, 0.3]
async def sequential_processing(texts: list) -> list:
    """Fetch embeddings one at a time — each await serializes the latency."""
    return [await fetch_embeddings(text) for text in texts]
async def parallel_processing(texts: list) -> list:
    """Fetch all embeddings concurrently via asyncio.gather."""
    gathered = await asyncio.gather(*(fetch_embeddings(text) for text in texts))
    return list(gathered)
async def compare_approaches():
texts = [f"sample text {i}" for i in range(20)]
start = time.perf_counter()
sequential_results = await sequential_processing(texts)
t_seq = time.perf_counter() - start
start = time.perf_counter()
parallel_results = await parallel_processing(texts)
t_par = time.perf_counter() - start
print(f"Sequential: {t_seq:.2f}s ({len(sequential_results)} results)")
print(f"Parallel: {t_par:.2f}s ({len(parallel_results)} results)")
print(f"Speedup: {t_seq/t_par:.1f}x")
asyncio.run(compare_approaches())
6.2 Async HTTP with aiohttp
import asyncio
import aiohttp
import time
from typing import Optional
class AsyncAIClient:
"""Async AI API client with rate limiting"""
def __init__(self, api_key: str, base_url: str, max_concurrent: int = 10):
self.api_key = api_key
self.base_url = base_url
self.semaphore = asyncio.Semaphore(max_concurrent)
self.session: Optional[aiohttp.ClientSession] = None
async def __aenter__(self):
connector = aiohttp.TCPConnector(limit=100)
self.session = aiohttp.ClientSession(
connector=connector,
headers={"Authorization": f"Bearer {self.api_key}"}
)
return self
async def __aexit__(self, *args):
if self.session:
await self.session.close()
async def get_embedding(self, text: str, retry: int = 3) -> list:
"""Embedding request with exponential backoff retry"""
async with self.semaphore:
for attempt in range(retry):
try:
async with self.session.post(
f"{self.base_url}/embeddings",
json={"input": text, "model": "text-embedding-3-small"}
) as response:
if response.status == 200:
data = await response.json()
return data["data"][0]["embedding"]
elif response.status == 429:
# Rate limit — exponential backoff
await asyncio.sleep(2 ** attempt)
else:
response.raise_for_status()
except aiohttp.ClientError as e:
if attempt == retry - 1:
raise
await asyncio.sleep(1)
return []
async def batch_embeddings(self, texts: list, batch_size: int = 100) -> list:
"""Process embeddings in batches"""
all_results = []
for i in range(0, len(texts), batch_size):
batch = texts[i:i + batch_size]
tasks = [self.get_embedding(text) for text in batch]
batch_results = await asyncio.gather(*tasks, return_exceptions=True)
for j, result in enumerate(batch_results):
if isinstance(result, Exception):
print(f"Error for text {i+j}: {result}")
all_results.append(None)
else:
all_results.append(result)
return all_results
6.3 Dynamic Batch Processor for AI APIs
import asyncio
from dataclasses import dataclass, field
from typing import Any
import time
@dataclass
class BatchProcessor:
    """Dynamic request batcher.

    Individual `process_request` calls are queued; `batch_worker` drains
    the queue into batches of up to `max_batch_size`, waiting at most
    `max_wait_time` seconds for a batch to fill, and resolves each
    caller's future with its slice of the batched API result.
    """
    max_batch_size: int = 20
    max_wait_time: float = 0.05  # 50ms
    queue: asyncio.Queue = field(default_factory=asyncio.Queue)

    async def process_request(self, request_id: str, data: Any) -> Any:
        """Enqueue one request and await its batched result."""
        # FIX: inside a coroutine, get_running_loop() is the correct call;
        # get_event_loop() is deprecated in this context since Python 3.10.
        future = asyncio.get_running_loop().create_future()
        await self.queue.put((request_id, data, future))
        return await future

    async def batch_worker(self):
        """Run forever: collect queued requests into batches and dispatch."""
        while True:
            batch = []
            deadline = time.perf_counter() + self.max_wait_time
            # Fill the batch until it is full or the deadline passes
            while len(batch) < self.max_batch_size:
                timeout = deadline - time.perf_counter()
                if timeout <= 0:
                    break
                try:
                    item = await asyncio.wait_for(self.queue.get(), timeout=timeout)
                    batch.append(item)
                except asyncio.TimeoutError:
                    break
            if not batch:
                await asyncio.sleep(0.001)  # idle: yield control briefly
                continue
            request_ids, data_list, futures = zip(*batch)
            try:
                results = await self._call_api_batch(list(data_list))
                for future, result in zip(futures, results):
                    future.set_result(result)
            except Exception as e:
                # Propagate a single failure to every caller in the batch
                for future in futures:
                    future.set_exception(e)

    async def _call_api_batch(self, data_list: list) -> list:
        """Stand-in for a real batched API call (fixed 100 ms latency)."""
        await asyncio.sleep(0.1)  # Simulate API latency
        return [f"result_{d}" for d in data_list]
async def test_batch_processor():
    """Smoke-test the batcher with 50 concurrent requests."""
    processor = BatchProcessor(max_batch_size=10, max_wait_time=0.05)
    worker_task = asyncio.create_task(processor.batch_worker())

    async def make_request(i):
        return await processor.process_request(f"req_{i}", f"data_{i}")

    start = time.perf_counter()
    results = await asyncio.gather(*(make_request(i) for i in range(50)))
    elapsed = time.perf_counter() - start
    print(f"50 requests processed: {elapsed:.3f}s")
    print(f"First 5 results: {results[:5]}")
    worker_task.cancel()

asyncio.run(test_batch_processor())
7. Memory Management
7.1 Python Memory Model
import sys
import gc
# Shallow per-object sizes as reported by sys.getsizeof.
# Note: getsizeof does NOT follow references into container contents.
data_types = {
    'int': 42,
    'float': 3.14,
    'str (small)': "hello",
    'str (large)': "hello" * 1000,
    'list (empty)': [],
    'dict (empty)': {},
    'tuple (empty)': (),
}

print("Object sizes:")
for label, sample in data_types.items():
    print(f" {label}: {sys.getsizeof(sample)} bytes")
def deep_size(obj, seen=None):
    """Recursively sum sys.getsizeof over *obj* and everything it references.

    *seen* tracks visited object ids so shared (or cyclic) references are
    counted only once. Strings/bytes are treated as leaves even though
    they are iterable.
    """
    if seen is None:
        seen = set()
    marker = id(obj)
    if marker in seen:
        return 0
    seen.add(marker)

    total = sys.getsizeof(obj)
    if isinstance(obj, dict):
        for key, value in obj.items():
            total += deep_size(key, seen)
            total += deep_size(value, seen)
    elif hasattr(obj, '__iter__') and not isinstance(obj, (str, bytes)):
        for element in obj:
            total += deep_size(element, seen)
    return total

nested = {'data': [i for i in range(1000)], 'meta': {'name': 'test'}}
print(f"\nNested dict size: {deep_size(nested):,} bytes")
7.2 Generators for Memory Efficiency
import numpy as np
from typing import Generator, Iterator
def dataset_generator(
    file_paths: list,
    batch_size: int = 32
) -> Generator:
    """Yield (data, labels) mini-batches one file at a time.

    Only one file's worth of data is resident at once, so memory stays
    bounded no matter how many paths are supplied. (File loading is
    simulated here with random arrays of fixed shape.)
    """
    for path in file_paths:
        data = np.random.randn(1000, 128)
        labels = np.random.randint(0, 10, 1000)
        total = len(data)
        start = 0
        while start < total:
            stop = min(start + batch_size, total)
            yield data[start:stop], labels[start:stop]
            start += batch_size
def augment_generator(gen: Iterator, noise_std: float = 0.01):
    """Yield each (X, y) pair with Gaussian noise (std *noise_std*) added to X."""
    for features, labels in gen:
        noisy = features + np.random.randn(*features.shape) * noise_std
        yield noisy, labels
def normalize_generator(gen: Iterator):
    """Yield (X, y) with X standardized per row (zero mean, unit variance)."""
    for features, labels in gen:
        mean = features.mean(axis=1, keepdims=True)
        std = features.std(axis=1, keepdims=True)
        # Epsilon keeps a constant row from producing a division by zero.
        yield (features - mean) / (std + 1e-8), labels
# Compose the pipeline lazily: load -> augment -> normalize.
# Nothing is materialized; each batch flows through all three stages.
fake_paths = [f"data_{i}.npy" for i in range(5)]
norm_gen = normalize_generator(
    augment_generator(
        dataset_generator(fake_paths, batch_size=32)
    )
)
for i, (X_batch, y_batch) in enumerate(norm_gen):
    if i >= 3:
        break
    print(f"Batch {i}: shape={X_batch.shape}, mean={X_batch.mean():.4f}")
7.3 __slots__ Optimization
import sys
class RegularEmbedding:
    """Embedding record backed by a normal per-instance __dict__."""

    def __init__(self, vector, label, metadata):
        self.vector, self.label, self.metadata = vector, label, metadata
class SlottedEmbedding:
    """Embedding record using __slots__ — no per-instance __dict__,
    so attribute storage is fixed and considerably smaller."""

    __slots__ = ['vector', 'label', 'metadata']

    def __init__(self, vector, label, metadata):
        self.vector, self.label, self.metadata = vector, label, metadata
import numpy as np

# Measuring per-instance overhead only needs one instance of each class;
# the original demo built 100,000 of each (~200 MB of arrays) to print
# the very same two numbers.
vector_size = 128
sample_vector = np.random.randn(vector_size)
regular_example = RegularEmbedding(sample_vector, 0, {'source': 'test'})
slotted_example = SlottedEmbedding(sample_vector, 0, {'source': 'test'})

# getsizeof is shallow: for the regular class, count the instance plus its
# attribute dict; the slotted class has no __dict__ at all.
regular_size = sys.getsizeof(regular_example) + sys.getsizeof(regular_example.__dict__)
slotted_size = sys.getsizeof(slotted_example)
print(f"Regular class instance: {regular_size} bytes")
print(f"__slots__ class instance: {slotted_size} bytes")
print(f"Memory saved: {(regular_size - slotted_size) / regular_size * 100:.1f}%")
7.4 weakref
import weakref
import gc
class ModelCache:
    """Model cache that holds entries only while callers still use them.

    WeakValueDictionary drops an entry as soon as the last strong
    reference to the cached model disappears, so idle models can be
    garbage-collected instead of pinning memory.
    """

    def __init__(self):
        self._cache = weakref.WeakValueDictionary()

    def get_or_load(self, model_name: str):
        """Return the cached model, loading (and caching) it on a miss."""
        model = self._cache.get(model_name)
        if model is None:
            print(f"Loading model: {model_name}")
            model = self._load_model(model_name)
            self._cache[model_name] = model
        else:
            print(f"Returning from cache: {model_name}")
        return model

    def _load_model(self, model_name: str):
        import numpy as np

        # BUG FIX: a plain dict cannot be weakly referenced, so storing one
        # in a WeakValueDictionary raised TypeError. A trivial dict subclass
        # supports weak references while keeping the same mapping API.
        class _Model(dict):
            pass

        return _Model(name=model_name, weights=np.random.randn(100, 100))
cache = ModelCache()
model = cache.get_or_load('resnet50')           # miss -> loads
print(f"Cache size: {len(cache._cache)}")
model_again = cache.get_or_load('resnet50')     # hit -> cached object

# Drop every strong reference; the weak entry should disappear after GC.
del model
del model_again
gc.collect()
print(f"Cache size after GC: {len(cache._cache)}")
new_model = cache.get_or_load('resnet50')       # reloads after eviction
8. Python Type Hints
8.1 Static Type Checking with mypy
pip install mypy
mypy your_script.py --strict
from typing import Optional, Union, List, Dict, Tuple, Callable
import numpy as np
from numpy.typing import NDArray
def compute_loss(
    predictions: NDArray[np.float32],
    targets: NDArray[np.float32],
    reduction: str = 'mean'
) -> float:
    """Squared-error loss between predictions and targets.

    Args:
        predictions: model outputs.
        targets: ground-truth values, same shape as predictions.
        reduction: 'mean' averages over the first axis; any other value
            returns the plain sum (kept for backward compatibility).

    Returns:
        The (optionally averaged) squared-error loss as a Python float.
    """
    diff = predictions - targets
    loss = np.sum(diff ** 2)
    if reduction == 'mean':
        # FIX: guard the empty-input case — len() of an empty array is 0
        # and the original code raised ZeroDivisionError here.
        n = len(predictions)
        return float(loss / n) if n else 0.0
    return float(loss)
def load_checkpoint(
    path: str,
    device: Optional[str] = None,
    strict: bool = True
) -> Optional[Dict[str, NDArray]]:
    """Load a checkpoint dict, returning None when the file is missing.

    (Actual disk I/O is stubbed out here with a random weight vector;
    the signature mirrors a real loader's contract.)
    """
    try:
        checkpoint = {'weights': np.random.randn(100)}
    except FileNotFoundError:
        return None
    return checkpoint
def split_dataset(
    X: NDArray,
    y: NDArray,
    test_size: float = 0.2,
    random_state: Optional[int] = None
) -> Tuple[NDArray, NDArray, NDArray, NDArray]:
    """Shuffle and split (X, y) into train/test partitions.

    Returns (X_train, X_test, y_train, y_test). Pass *random_state* for a
    reproducible permutation.
    """
    if random_state is not None:
        np.random.seed(random_state)
    total = len(X)
    cut = int(total * test_size)
    order = np.random.permutation(total)
    test_part = order[:cut]
    train_part = order[cut:]
    return X[train_part], X[test_part], y[train_part], y[test_part]
8.2 Generics and Protocol
from typing import TypeVar, Generic, Protocol, runtime_checkable, List, Dict
import numpy as np
T = TypeVar('T')

class DataLoader(Generic[T]):
    """Minimal iterator over a list dataset in fixed-size batches.

    Iterating restarts from the beginning each time, so the same loader
    can be consumed once per epoch.
    """

    def __init__(self, dataset: List[T], batch_size: int = 32):
        self.dataset = dataset
        self.batch_size = batch_size
        self._idx = 0

    def __iter__(self):
        # Reset the cursor so every `for` loop starts a fresh pass.
        self._idx = 0
        return self

    def __next__(self) -> List[T]:
        if self._idx >= len(self.dataset):
            raise StopIteration
        chunk = self.dataset[self._idx:self._idx + self.batch_size]
        self._idx += self.batch_size
        return chunk
@runtime_checkable
class Optimizer(Protocol):
    """Structural (duck-typed) optimizer interface.

    Any object with these three methods satisfies the protocol;
    @runtime_checkable additionally allows isinstance() checks.
    """

    def step(self) -> None: ...
    def zero_grad(self) -> None: ...
    def state_dict(self) -> Dict: ...
@runtime_checkable
class Model(Protocol):
    """Structural interface a trainable model must expose.

    'NDArray' is a string forward reference — numpy.typing.NDArray is
    imported in an earlier snippet of this guide, not in this one.
    """

    def forward(self, x: 'NDArray') -> 'NDArray': ...
    def parameters(self) -> List['NDArray']: ...
    def train(self) -> None: ...
    def eval(self) -> None: ...
8.3 dataclass and TypedDict
from dataclasses import dataclass, field
from typing import TypedDict, List, Dict, Optional
class ModelConfig(TypedDict):
    """Dict schema for model hyperparameters (keys checked statically by mypy).

    NOTE(review): a different, pydantic-based ModelConfig appears later in
    this guide; in a single real module the two names would collide.
    """

    hidden_size: int
    num_layers: int
    dropout: float
    activation: str
@dataclass
class ExperimentConfig:
    """Validated container for one training experiment's settings."""

    experiment_name: str
    model_type: str
    learning_rate: float = 1e-3
    batch_size: int = 32
    max_epochs: int = 100
    seed: int = 42
    tags: List[str] = field(default_factory=list)
    hyperparams: Dict[str, float] = field(default_factory=dict)

    def __post_init__(self):
        # Fail fast on invalid settings — learning rate checked first,
        # then the name, matching the original validation order.
        if self.learning_rate <= 0:
            raise ValueError(f"Learning rate must be positive: {self.learning_rate}")
        if not self.experiment_name:
            raise ValueError("Experiment name is required")

    def to_dict(self) -> Dict:
        """Return a plain-dict snapshot of every field."""
        from dataclasses import asdict
        return asdict(self)
# Example: construct a validated experiment config; __post_init__ raises
# ValueError immediately if any setting is invalid.
config = ExperimentConfig(
    experiment_name="bert-finetuning-v1",
    model_type="bert-base",
    learning_rate=2e-5,
    batch_size=16,
    tags=["nlp", "classification"]
)
print(f"Experiment: {config.experiment_name}")
print(f"Config: {config.to_dict()}")
8.4 Data Validation with pydantic
from pydantic import BaseModel, Field, validator, root_validator
from typing import Optional, List
class DataConfig(BaseModel):
    """Pydantic-validated data-loading settings.

    NOTE(review): @validator is the pydantic v1 API; under pydantic v2 this
    needs @field_validator — confirm which version the project pins.
    """

    data_path: str
    batch_size: int = Field(ge=1, le=1024, default=32)
    shuffle: bool = True
    num_workers: int = Field(ge=0, le=16, default=4)

    @validator('data_path')
    def path_must_exist(cls, v):
        """Reject nonexistent paths ("test_path" is whitelisted for demos)."""
        import os
        if not os.path.exists(v) and v != "test_path":
            raise ValueError(f"Data path does not exist: {v}")
        return v
class ModelConfig(BaseModel):
    """Pydantic-validated model architecture settings.

    NOTE(review): @validator and Field(min_items=...) are pydantic v1 API
    (v2 uses @field_validator and min_length) — confirm the pinned version.
    """

    architecture: str
    hidden_sizes: List[int] = Field(min_items=1)
    dropout_rate: float = Field(ge=0.0, le=0.9, default=0.1)
    activation: str = "relu"

    @validator('activation')
    def valid_activation(cls, v):
        """Restrict activation to the supported set."""
        valid = ['relu', 'gelu', 'silu', 'tanh', 'sigmoid']
        if v not in valid:
            raise ValueError(f"Activation must be one of {valid}")
        return v
class TrainingConfig(BaseModel):
    """Top-level training config composing data and model sub-configs.

    NOTE(review): a bare @root_validator errors on pydantic v2 (and warns
    on late v1) — may need @root_validator(skip_on_failure=True) or v2's
    @model_validator; confirm the pinned pydantic version.
    """

    data: DataConfig
    model: ModelConfig
    learning_rate: float = Field(gt=0, le=1.0, default=1e-3)
    weight_decay: float = Field(ge=0, default=1e-4)
    max_epochs: int = Field(ge=1, le=10000, default=100)

    @root_validator
    def check_consistency(cls, values):
        """Cross-field check: warn (not fail) on unusually high dropout."""
        model = values.get('model')
        if model and model.dropout_rate > 0.5:
            print("Warning: High dropout rate may cause underfitting")
        return values
# Build a fully-validated nested config; any constraint violation in any
# sub-model surfaces here as a single ValidationError.
try:
    config = TrainingConfig(
        data=DataConfig(data_path="test_path", batch_size=64),
        model=ModelConfig(
            architecture="transformer",
            hidden_sizes=[512, 256, 128],
            dropout_rate=0.1
        ),
        learning_rate=2e-5
    )
    print(f"Validation passed: {config.model.architecture}")
    # .json() is the pydantic v1 serialization API (v2: model_dump_json)
    print(f"JSON: {config.json(indent=2)}")
except Exception as e:
    print(f"Validation error: {e}")
9. Package Management and Dependencies
9.1 Poetry
# Install Poetry
curl -sSL https://install.python-poetry.org | python3 -
poetry new my-ml-project
cd my-ml-project
# Add dependencies
poetry add numpy torch torchvision
poetry add --group dev pytest black ruff mypy
# Activate virtual environment
poetry shell
# Install all dependencies
poetry install
# Export to requirements.txt
poetry export -f requirements.txt --output requirements.txt
9.2 pyproject.toml
[tool.poetry]
name = "my-ml-project"
version = "0.1.0"
description = "AI/ML project"
authors = ["Your Name <email@example.com>"]
license = "MIT"
readme = "README.md"
packages = [{include = "my_ml_project"}]
[tool.poetry.dependencies]
python = "^3.10"
numpy = "^1.24"
torch = "^2.1"
torchvision = "^0.16"
transformers = "^4.35"
pydantic = "^2.0"
aiohttp = "^3.9"
[tool.poetry.group.dev.dependencies]
pytest = "^7.4"
pytest-cov = "^4.1"
black = "^23.0"
ruff = "^0.1"
mypy = "^1.6"
pre-commit = "^3.5"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
[tool.black]
line-length = 88
target-version = ['py310', 'py311']
[tool.ruff]
select = ["E", "F", "I", "N", "W"]
ignore = ["E501"]
line-length = 88
[tool.mypy]
python_version = "3.10"
warn_return_any = true
warn_unused_configs = true
disallow_untyped_defs = true
[tool.pytest.ini_options]
testpaths = ["tests"]
addopts = "-v --cov=my_ml_project --cov-report=html"
9.3 uv (Fast Package Manager)
# Install uv (Rust-based, extremely fast)
curl -LsSf https://astral.sh/uv/install.sh | sh
# Initialize project
uv init my-project
cd my-project
# Add dependencies (10-100x faster than pip)
uv add numpy torch transformers
uv add --dev pytest ruff mypy
# Sync environment
uv sync
# Manage Python versions
uv python install 3.11
uv python pin 3.11
# Run scripts
uv run python train.py
10. Clean Code for ML
10.1 Design Patterns (Factory, Strategy)
from abc import ABC, abstractmethod
from typing import Dict, Type
import numpy as np
# Strategy Pattern — Loss Functions
class LossFunction(ABC):
    """Interface for differentiable losses: a scalar value plus its gradient."""

    @abstractmethod
    def __call__(self, predictions: np.ndarray, targets: np.ndarray) -> float:
        """Return the scalar loss."""

    @abstractmethod
    def gradient(self, predictions: np.ndarray, targets: np.ndarray) -> np.ndarray:
        """Return d(loss)/d(predictions)."""


class MSELoss(LossFunction):
    """Mean squared error."""

    def __call__(self, predictions, targets):
        residual = predictions - targets
        return float(np.mean(residual * residual))

    def gradient(self, predictions, targets):
        # d/dp mean((p - t)^2) = 2(p - t)/n
        return 2 * (predictions - targets) / len(predictions)


class HuberLoss(LossFunction):
    """Huber loss: quadratic near zero, linear beyond *delta*."""

    def __init__(self, delta: float = 1.0):
        self.delta = delta

    def __call__(self, predictions, targets):
        residual = predictions - targets
        abs_res = np.abs(residual)
        quadratic = 0.5 * residual ** 2
        linear = self.delta * abs_res - 0.5 * self.delta ** 2
        return float(np.mean(np.where(abs_res <= self.delta, quadratic, linear)))

    def gradient(self, predictions, targets):
        residual = predictions - targets
        # Inside the delta band the gradient is the residual itself;
        # outside, it saturates at +/- delta.
        return np.where(np.abs(residual) <= self.delta,
                        residual,
                        self.delta * np.sign(residual))
# Factory Pattern — Model Creation
class ModelFactory:
    """Registry-based factory: model classes self-register under a string key."""

    _registry: Dict[str, Type] = {}

    @classmethod
    def register(cls, name: str):
        """Class decorator recording *name* -> decorated class."""
        def decorator(model_class):
            cls._registry[name] = model_class
            return model_class
        return decorator

    @classmethod
    def create(cls, name: str, **kwargs):
        """Instantiate the class registered under *name*.

        Raises ValueError (listing the known names) for an unknown key.
        """
        if name not in cls._registry:
            available = list(cls._registry.keys())
            raise ValueError(f"Unknown model: {name}. Available: {available}")
        return cls._registry[name](**kwargs)


@ModelFactory.register("linear")
class LinearModel:
    """Single affine layer: y = XW + b."""

    def __init__(self, input_size: int, output_size: int):
        self.W = np.random.randn(input_size, output_size) * 0.01
        self.b = np.zeros(output_size)

    def forward(self, X: np.ndarray) -> np.ndarray:
        return X @ self.W + self.b


@ModelFactory.register("mlp")
class MLPModel:
    """Fully-connected network with ReLU between hidden layers."""

    def __init__(self, layer_sizes: list):
        self.layers = [
            (np.random.randn(n_in, n_out) * 0.01, np.zeros(n_out))
            for n_in, n_out in zip(layer_sizes, layer_sizes[1:])
        ]

    def forward(self, X: np.ndarray) -> np.ndarray:
        out = X
        last = len(self.layers) - 1
        for i, (W, b) in enumerate(self.layers):
            out = out @ W + b
            if i < last:
                out = np.maximum(0, out)  # ReLU on all but the output layer
        return out
# Build a model by registry name and run one forward pass.
model = ModelFactory.create("mlp", layer_sizes=[128, 256, 10])
X = np.random.randn(32, 128)
output = model.forward(X)
print(f"Model output shape: {output.shape}")
10.2 ML Testing with pytest
# tests/test_models.py
import pytest
import numpy as np
from numpy.testing import assert_array_almost_equal
class TestLossFunctions:
    """pytest suite for section 10.1's loss functions and models.

    NOTE(review): as excerpted, this file never imports MSELoss or
    LinearModel — a real tests/test_models.py needs an import such as
    `from my_ml_project.models import MSELoss, LinearModel`.
    """

    def test_mse_zero_loss(self):
        """Perfect predictions should yield zero loss"""
        x = np.array([1.0, 2.0, 3.0])
        loss = MSELoss()(x, x)
        assert abs(loss) < 1e-10

    def test_mse_positive(self):
        """MSE must always be non-negative"""
        predictions = np.random.randn(100)
        targets = np.random.randn(100)
        loss = MSELoss()(predictions, targets)
        assert loss >= 0

    @pytest.mark.parametrize("batch_size", [1, 16, 32, 128])
    def test_batch_sizes(self, batch_size):
        """Model must handle various batch sizes"""
        X = np.random.randn(batch_size, 64)
        model = LinearModel(64, 10)
        output = model.forward(X)
        assert output.shape == (batch_size, 10)

    def test_numerical_gradient(self):
        """Verify analytical gradient matches numerical gradient"""
        loss_fn = MSELoss()
        pred = np.array([0.5, 0.3, 0.2])
        target = np.array([0.6, 0.2, 0.1])
        analytical_grad = loss_fn.gradient(pred, target)
        # Central finite difference: (f(x+eps) - f(x-eps)) / (2*eps),
        # accurate to O(eps^2), hence the decimal=5 tolerance below.
        eps = 1e-5
        numerical_grad = np.zeros_like(pred)
        for i in range(len(pred)):
            pred_plus = pred.copy()
            pred_plus[i] += eps
            pred_minus = pred.copy()
            pred_minus[i] -= eps
            numerical_grad[i] = (loss_fn(pred_plus, target) - loss_fn(pred_minus, target)) / (2 * eps)
        assert_array_almost_equal(analytical_grad, numerical_grad, decimal=5)
# Run with:
# pytest tests/ -v --cov=src --cov-report=html -x
10.3 Logging Best Practices
import logging
import sys
from pathlib import Path
from typing import Optional
def setup_logger(
    name: str,
    level: int = logging.INFO,
    log_file: Optional[str] = None,
    format_str: Optional[str] = None
) -> logging.Logger:
    """Create (or fetch) a named logger with console and optional file output.

    Handlers are attached only on the first call for a given name, so
    repeated calls are safe; the level is refreshed on every call.
    """
    if format_str is None:
        format_str = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    formatter = logging.Formatter(format_str, datefmt='%Y-%m-%d %H:%M:%S')

    logger = logging.getLogger(name)
    logger.setLevel(level)

    if not logger.handlers:
        stream_handler = logging.StreamHandler(sys.stdout)
        stream_handler.setFormatter(formatter)
        logger.addHandler(stream_handler)
        if log_file:
            # Make sure the log directory exists before opening the file.
            Path(log_file).parent.mkdir(parents=True, exist_ok=True)
            file_handler = logging.FileHandler(log_file)
            file_handler.setFormatter(formatter)
            logger.addHandler(file_handler)
    return logger
class TrainingLogger:
    """Thin wrapper around setup_logger for experiment-style logging."""

    def __init__(self, experiment_name: str, log_dir: str = "logs"):
        self.logger = setup_logger(
            f"training.{experiment_name}",
            log_file=f"{log_dir}/{experiment_name}.log"
        )
        self.step = 0  # global step counter, advanced by log_metrics()

    def log_metrics(self, metrics: dict, phase: str = "train"):
        """Log one step's metrics as comma-joined k=v pairs, then advance."""
        rendered = ", ".join(f"{k}={v:.4f}" for k, v in metrics.items())
        self.logger.info(f"[{phase}] step={self.step} {rendered}")
        self.step += 1

    def log_epoch(self, epoch: int, train_loss: float, val_loss: float):
        """Log end-of-epoch training and validation losses."""
        self.logger.info(
            f"Epoch {epoch}: train_loss={train_loss:.4f}, val_loss={val_loss:.4f}"
        )

    def log_hyperparams(self, params: dict):
        """Record the run's hyperparameters once at startup."""
        self.logger.info(f"Hyperparameters: {params}")
# Demo run: three epochs of ten steps each, logged to console and to
# logs/bert-finetuning-v1.log.
logger = TrainingLogger("bert-finetuning-v1")
logger.log_hyperparams({"lr": 2e-5, "batch_size": 32, "epochs": 10})
for epoch in range(3):
    for step in range(10):
        logger.log_metrics(
            {"loss": 2.0 - epoch * 0.3 - step * 0.01, "acc": 0.5 + epoch * 0.1},
            phase="train"
        )
    logger.log_epoch(epoch, 1.5 - epoch * 0.3, 1.8 - epoch * 0.25)
Conclusion
When applying the techniques covered in this guide to real AI/ML projects, follow this priority order:
1. Profile first: Always identify bottlenecks with cProfile and line_profiler before optimizing.
2. Vectorize with NumPy: In most cases, vectorization alone achieves 10-100x speedups.
3. Apply Numba to hot loops: Use Numba JIT where pure Python loops are truly unavoidable.
4. Use multiprocessing for CPU saturation: Leverage ProcessPoolExecutor for independent tasks like data preprocessing and cross-validation.
5. asyncio for I/O-bound pipelines: Achieve 10x+ throughput gains on API-heavy or file I/O-heavy workflows.
6. Type hints and tests: Combine mypy, pytest, and pydantic to maintain a robust ML codebase that scales safely.