Python Advanced Techniques for AI/ML: Performance Optimization, Parallel Processing, Memory Management


Overview

Python is the lingua franca of the AI/ML ecosystem. However, vanilla Python code has inherent performance limitations, and bottlenecks emerge in large-scale data processing and model training pipelines. This guide covers advanced Python techniques for building production-grade AI/ML systems.

You will learn how to write code that can run orders of magnitude faster than pure Python loops, leverage GPUs, and manage memory efficiently, all demonstrated with hands-on examples.

1. Python Performance Profiling

Always profile before optimizing. "If you can't measure it, you can't improve it" is the golden rule.

1.1 cProfile and pstats

`cProfile`, included in the Python standard library, is the foundational tool for function-level profiling.

import cProfile
import io
import pstats

def slow_matrix_multiply(a, b):
    """Inefficient matrix multiplication with a triple nested loop."""
    rows_a = len(a)
    cols_a = len(a[0])
    cols_b = len(b[0])
    result = [[0] * cols_b for _ in range(rows_a)]
    for i in range(rows_a):
        for j in range(cols_b):
            for k in range(cols_a):
                result[i][j] += a[i][k] * b[k][j]
    return result

def profile_function():
    size = 100
    a = [[float(i + j) for j in range(size)] for i in range(size)]
    b = [[float(i * j + 1) for j in range(size)] for i in range(size)]
    slow_matrix_multiply(a, b)

# Run profiling
pr = cProfile.Profile()
pr.enable()
profile_function()
pr.disable()

# Analyze results: sort by cumulative time and print the top 20 entries
stream = io.StringIO()
ps = pstats.Stats(pr, stream=stream)
ps.sort_stats('cumulative')
ps.print_stats(20)
print(stream.getvalue())

You can also run it directly from the command line.

python -m cProfile -s cumulative your_script.py

python -m cProfile -o output.prof your_script.py

Analyzing a saved `.prof` file:

import pstats

p = pstats.Stats('output.prof')
p.sort_stats('cumulative')
p.print_stats(10)                 # Top 10 functions
p.print_callers('slow_function')  # Who called this function
p.print_callees('main')           # What this function calls
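For quick experiments it can be convenient to wrap the enable/disable pair in a small helper. The sketch below assumes Python 3.8 or later, where `cProfile.Profile` can be used as a context manager. The names `hot_loop` and `profile_to_text` are hypothetical and exist only for this illustration.

```python
import cProfile
import io
import pstats

def hot_loop(n):
    """Hypothetical CPU-heavy function to profile."""
    total = 0
    for i in range(n):
        total += i * i
    return total

def profile_to_text(func, *args, top=5):
    """Run func under cProfile and return (result, report text)."""
    with cProfile.Profile() as profiler:  # context-manager form, Python 3.8+
        result = func(*args)
    buffer = io.StringIO()
    stats = pstats.Stats(profiler, stream=buffer)
    # strip_dirs() shortens file paths; SortKey avoids typos in sort names
    stats.strip_dirs().sort_stats(pstats.SortKey.CUMULATIVE).print_stats(top)
    return result, buffer.getvalue()

result, report = profile_to_text(hot_loop, 100_000)
print(report)
```

Returning the report as a string makes it easy to log or compare between runs instead of reading it off the terminal.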

1.2 line_profiler (Line-by-line Analysis)

Measures performance at the line level rather than the function level.

pip install line_profiler

import numpy as np
from line_profiler import LineProfiler

def process_batch(data):
    result = []
    for item in data:
        normalized = (item - item.mean()) / (item.std() + 1e-8)
        scaled = normalized * 2.0
        clipped = np.clip(scaled, -5, 5)
        result.append(clipped)
    return np.stack(result)

# Set up the profiler and wrap the function to be measured
lp = LineProfiler()
lp_wrapper = lp(process_batch)

# Run with sample data
data = [np.random.randn(1000) for _ in range(1000)]
lp_wrapper(data)

# Print per-line timings
lp.print_stats()

In Jupyter Notebook you can use magic commands:

%load_ext line_profiler

%lprun -f process_batch process_batch(data)

1.3 memory_profiler

Tracks memory usage line by line.

pip install memory_profiler

import numpy as np
from memory_profiler import profile

@profile
def memory_intensive_function(n=1_000_000):
    list_data = list(range(n))         # Create list
    arr = np.array(list_data)          # Convert to NumPy array
    del list_data                      # Delete list
    squared = arr ** 2                 # Create new array
    filtered = squared[squared > 100]  # Filter
    return filtered.sum()

result = memory_intensive_function()
print(f"Result: {result}")

python -m memory_profiler your_script.py

mprof run your_script.py

mprof plot # Visualize
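When installing `memory_profiler` is not an option, the standard library's `tracemalloc` module can answer a coarser version of the same question. This is a different tool, not part of `memory_profiler`, and it only sees allocations made through Python's allocator. The function `build_squares` is a hypothetical workload.

```python
import tracemalloc

def build_squares(n):
    """Allocate a list of n Python ints (hypothetical workload)."""
    return [i * i for i in range(n)]

tracemalloc.start()
before, _ = tracemalloc.get_traced_memory()
data = build_squares(200_000)
after, peak = tracemalloc.get_traced_memory()
snapshot = tracemalloc.take_snapshot()
tracemalloc.stop()

allocated = after - before
print(f"Allocated: {allocated / 1e6:.2f} MB (peak {peak / 1e6:.2f} MB)")

# Show the three source lines responsible for the most live memory
for stat in snapshot.statistics("lineno")[:3]:
    print(stat)
```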

1.4 py-spy (Sampling Profiler)

A powerful tool for profiling a running Python process without modifying its code.

pip install py-spy

# Profile a running process
py-spy top --pid 12345

# Generate a flame graph
py-spy record -o profile.svg --pid 12345

# Profile a script directly
py-spy record -o profile.svg -- python your_script.py

1.5 SnakeViz Visualization

pip install snakeviz

python -m cProfile -o output.prof your_script.py

snakeviz output.prof

2. NumPy Vectorization Deep Dive

NumPy vectorization is the cornerstone of Python AI/ML performance optimization.

2.1 for loop vs. Vectorized Performance Comparison

import time

import numpy as np

def benchmark(func, *args, runs=5):
    """Return (mean seconds, std seconds, last result) over several runs."""
    times = []
    for _ in range(runs):
        start = time.perf_counter()
        result = func(*args)
        end = time.perf_counter()
        times.append(end - start)
    return np.mean(times), np.std(times), result

# 1. Python for loop
def relu_loop(x):
    result = []
    for val in x:
        result.append(max(0, val))
    return result

# 2. NumPy vectorized
def relu_numpy(x):
    return np.maximum(0, x)

# 3. NumPy clip
def relu_clip(x):
    return np.clip(x, 0, None)

n = 1_000_000
data_list = [np.random.randn() for _ in range(n)]
data_np = np.array(data_list)

t_loop, _, _ = benchmark(relu_loop, data_list)
t_numpy, _, _ = benchmark(relu_numpy, data_np)
t_clip, _, _ = benchmark(relu_clip, data_np)

print(f"Python loop: {t_loop:.4f}s")
print(f"NumPy maximum: {t_numpy:.4f}s ({t_loop/t_numpy:.0f}x faster)")
print(f"NumPy clip: {t_clip:.4f}s ({t_loop/t_clip:.0f}x faster)")
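Hand-rolled timing loops are sensitive to one-off noise such as a garbage collection pause. The standard library's `timeit` handles the repetition, and taking the minimum over several rounds is a common way to get a stable figure. A minimal sketch, where `best_time` is a hypothetical helper name:

```python
import timeit

def best_time(func, *args, repeat=5, number=10):
    """Return the best per-call time in seconds over `repeat` rounds."""
    timings = timeit.repeat(lambda: func(*args), repeat=repeat, number=number)
    return min(timings) / number

def relu_loop(values):
    """Pure-Python ReLU used as the function under test."""
    return [max(0.0, v) for v in values]

sample = [float(i - 500) for i in range(1000)]
per_call = best_time(relu_loop, sample)
print(f"relu_loop: {per_call * 1e6:.1f} microseconds per call")
```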

2.2 Universal Functions (ufuncs)

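import numpy as np

def custom_activation_scalar(x, alpha=0.1):
    """ELU activation function for a single scalar."""
    if x >= 0:
        return x
    else:
        return alpha * (np.exp(x) - 1)

# np.vectorize is a convenience wrapper: it still calls the Python function
# once per element, so it is not a compiled ufunc
elu_ufunc = np.vectorize(custom_activation_scalar)

# Built from real NumPy ufuncs (faster). np.where evaluates both branches,
# so the exp argument is clamped to avoid overflow for large positive x
def elu_numpy(x, alpha=0.1):
    return np.where(x >= 0, x, alpha * (np.exp(np.minimum(x, 0.0)) - 1))

x = np.random.randn(1_000_000)

# benchmark() is the helper defined in section 2.1
t_vectorize, _, _ = benchmark(elu_ufunc, x)
t_numpy, _, _ = benchmark(elu_numpy, x)

print(f"np.vectorize: {t_vectorize:.4f}s")
print(f"np.where: {t_numpy:.4f}s")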

2.3 np.einsum (Einstein Summation)

import numpy as np

batch_size, seq_len, d_model = 32, 128, 512
heads, d_head = 8, 64

A = np.random.randn(batch_size, seq_len, d_model)
W = np.random.randn(d_model, d_model)

# Standard matmul
result_matmul = A @ W  # (32, 128, 512)

# Same operation with einsum
result_einsum = np.einsum('bsi,ij->bsj', A, W)

# Attention scores (batched matmul)
Q = np.random.randn(batch_size, heads, seq_len, d_head)
K = np.random.randn(batch_size, heads, seq_len, d_head)

scores_manual = Q @ K.transpose(0, 1, 3, 2)  # (32, 8, 128, 128)
scores_einsum = np.einsum('bhid,bhjd->bhij', Q, K)

# Batch outer product
a = np.random.randn(batch_size, d_model)
b = np.random.randn(batch_size, d_model)
outer = np.einsum('bi,bj->bij', a, b)  # (32, 512, 512)

# Trace (sum of diagonal)
M = np.random.randn(batch_size, d_model, d_model)
trace = np.einsum('bii->b', M)  # (32,)

print(f"Attention scores shape: {scores_einsum.shape}")
print(f"Outer product shape: {outer.shape}")
print(f"Trace shape: {trace.shape}")

# The optimize parameter searches for a good contraction order. It matters
# mainly for three or more operands; with two there is only one order.
result = np.einsum('bhid,bhjd->bhij', Q, K, optimize='optimal')
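It is easy to get an index string subtly wrong, so it is worth checking an `einsum` expression against an explicit form on small inputs. The shapes below are deliberately tiny and chosen only for illustration.

```python
import numpy as np

rng = np.random.default_rng(0)
Q = rng.standard_normal((2, 3, 5, 4))  # (batch, heads, seq_len, d_head)
K = rng.standard_normal((2, 3, 5, 4))

# 'bhid,bhjd->bhij': for each batch b and head h, dot query i with key j over d
scores_einsum = np.einsum('bhid,bhjd->bhij', Q, K)
scores_matmul = Q @ K.transpose(0, 1, 3, 2)

# Explicit loops for a single (batch, head) pair as the ground truth
reference = np.zeros((5, 5))
for i in range(5):
    for j in range(5):
        reference[i, j] = sum(Q[0, 0, i, d] * K[0, 0, j, d] for d in range(4))

print(np.allclose(scores_einsum, scores_matmul))
print(np.allclose(scores_einsum[0, 0], reference))
```

Repeated letters that are missing from the output (`d` here) are summed over; letters kept in the output (`b`, `h`, `i`, `j`) index the result.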

2.4 Stride Tricks

import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

arr = np.arange(20, dtype=float)
window_size = 5

# Modern approach (NumPy 1.20+): returns a read-only view, no data is copied
windows = sliding_window_view(arr, window_size)
print(f"Sliding window shape: {windows.shape}")  # (16, 5)

# Moving average
moving_avg = windows.mean(axis=-1)
print(f"Moving average: {moving_avg[:5]}")

# 2D sliding window (for convolution prep)
image = np.random.randn(64, 64)
kernel_size = (3, 3)
windows_2d = sliding_window_view(image, kernel_size)
print(f"2D window shape: {windows_2d.shape}")  # (62, 62, 3, 3)

# Non-overlapping chunks need no stride tricks: reshaping a contiguous
# array also returns a view
batch = np.random.randn(1000, 10)
chunk_size = 50
n_chunks = len(batch) // chunk_size
chunks = batch[:n_chunks * chunk_size].reshape(n_chunks, chunk_size, -1)
print(f"Chunks shape: {chunks.shape}")  # (20, 50, 10)

3. Numba JIT Compilation

Numba compiles numerical Python functions to machine code at run time. For loop-heavy array code, the result can approach the speed of hand-written C/C++.

3.1 @jit, @njit Decorators

import time

import numpy as np
from numba import jit, njit

# @jit: since Numba 0.59 this compiles in nopython mode by default; older
# releases could silently fall back to the slower object mode
@jit
def sum_with_jit(arr):
    total = 0.0
    for i in range(len(arr)):
        total += arr[i]
    return total

# @njit: alias for @jit(nopython=True), pure native mode (recommended)
@njit
def sum_with_njit(arr):
    total = 0.0
    for i in range(len(arr)):
        total += arr[i]
    return total

@njit
def compute_distances(points):
    """Compute Euclidean distance matrix between points"""
    n = len(points)
    distances = np.zeros((n, n))
    for i in range(n):
        for j in range(i + 1, n):
            diff = points[i] - points[j]
            dist = np.sqrt(np.sum(diff ** 2))
            distances[i, j] = dist
            distances[j, i] = dist
    return distances

arr = np.random.randn(1_000_000)

# First call includes compilation time, so warm up before timing
_ = sum_with_njit(arr)

start = time.perf_counter()
result_numpy = np.sum(arr)
print(f"NumPy sum: {time.perf_counter() - start:.4f}s")

start = time.perf_counter()
result_njit = sum_with_njit(arr)
print(f"Numba njit: {time.perf_counter() - start:.4f}s")

# Point cloud distance calculation
points = np.random.randn(500, 3).astype(np.float64)
_ = compute_distances(points)  # Warm-up

start = time.perf_counter()
dist_matrix = compute_distances(points)
print(f"Numba distance matrix ({points.shape[0]}x{points.shape[0]}): {time.perf_counter() - start:.4f}s")

3.2 Type Inference and Explicit Type Declarations

import time

import numpy as np
from numba import njit, float32, float64

# An explicit signature makes Numba compile eagerly when the function is
# defined, so the first call does not pay the compilation cost
@njit(float64[:](float64[:], float64))
def scale_array_typed(arr, factor):
    return arr * factor

# Multiple type signatures
@njit([
    float32[:](float32[:], float32),
    float64[:](float64[:], float64),
])
def scale_multi_type(arr, factor):
    return arr * factor

@njit
def softmax_numba(x):
    """Numba-accelerated softmax"""
    max_val = np.max(x)
    exp_x = np.exp(x - max_val)
    return exp_x / np.sum(exp_x)

@njit
def batch_softmax(X):
    """Batch softmax for 2D array"""
    result = np.empty_like(X)
    for i in range(X.shape[0]):
        result[i] = softmax_numba(X[i])
    return result

X = np.random.randn(1000, 512).astype(np.float64)
_ = batch_softmax(X)  # Warm-up

start = time.perf_counter()
result = batch_softmax(X)
print(f"Numba batch softmax: {time.perf_counter() - start:.4f}s")

3.3 Parallel Processing (prange)

import time

import numpy as np
from numba import njit, prange

@njit(parallel=True)
def parallel_normalize(X):
    """Row-wise normalization with automatic parallelism"""
    result = np.empty_like(X)
    for i in prange(X.shape[0]):  # Iterations are split across threads
        row = X[i]
        mean = row.mean()
        std = row.std()
        result[i] = (row - mean) / (std + 1e-8)
    return result

@njit(parallel=True)
def parallel_batch_process(data, weights):
    """Weighted sum over batch in parallel"""
    n_samples, n_features = data.shape
    results = np.zeros(n_samples)
    for i in prange(n_samples):
        dot = 0.0
        for j in range(n_features):
            dot += data[i, j] * weights[j]
        results[i] = dot
    return results

X = np.random.randn(10000, 512).astype(np.float64)
_ = parallel_normalize(X)  # Warm-up

start = time.perf_counter()
result_parallel = parallel_normalize(X)
t_parallel = time.perf_counter() - start

# Sequential baseline: same body, plain range and no parallel=True
@njit
def sequential_normalize(X):
    result = np.empty_like(X)
    for i in range(X.shape[0]):
        row = X[i]
        mean = row.mean()
        std = row.std()
        result[i] = (row - mean) / (std + 1e-8)
    return result

_ = sequential_normalize(X)  # Warm-up

start = time.perf_counter()
result_seq = sequential_normalize(X)
t_seq = time.perf_counter() - start

print(f"Sequential: {t_seq:.4f}s")
print(f"Parallel: {t_parallel:.4f}s ({t_seq/t_parallel:.1f}x faster)")

3.4 Numba Caching

import numpy as np
from numba import njit

# cache=True saves the compiled code to disk
@njit(cache=True)
def cached_computation(x, y):
    """Cached JIT function: later runs load the compiled code instead of recompiling"""
    result = np.empty(len(x))
    for i in range(len(x)):
        result[i] = x[i] ** 2 + y[i] ** 2
    return result

arr1 = np.random.randn(1000)
arr2 = np.random.randn(1000)
result = cached_computation(arr1, arr2)

4. Cython

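Cython translates Python code, optionally annotated with static C types, into C that is compiled as an extension module. Typed loops avoid most interpreter overhead.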

4.1 Writing .pyx Files

# setup.py: Cython build configuration
import numpy as np
from setuptools import setup
from Cython.Build import cythonize

setup(
    ext_modules=cythonize(
        "fast_ops.pyx",
        compiler_directives={
            'language_level': '3',
            'boundscheck': False,
            'wraparound': False,
            'nonecheck': False,
        }
    ),
    include_dirs=[np.get_include()]
)

`fast_ops.pyx` example:

# fast_ops.pyx
# cython: language_level=3
# cython: boundscheck=False
# cython: wraparound=False

import numpy as np
cimport numpy as cnp
from libc.math cimport exp

def relu_cython(cnp.ndarray[cnp.double_t, ndim=1] x):
    cdef Py_ssize_t n = x.shape[0]
    cdef cnp.ndarray[cnp.double_t, ndim=1] result = np.empty(n)
    cdef Py_ssize_t i
    cdef double val
    for i in range(n):
        val = x[i]
        result[i] = val if val > 0 else 0.0
    return result

def softmax_cython(cnp.ndarray[cnp.double_t, ndim=1] x):
    cdef Py_ssize_t n = x.shape[0]
    cdef cnp.ndarray[cnp.double_t, ndim=1] result = np.empty(n)
    cdef double max_val = x[0]
    cdef double sum_exp = 0.0
    cdef Py_ssize_t i
    # Pass 1: find the maximum for numerical stability
    for i in range(n):
        if x[i] > max_val:
            max_val = x[i]
    # Pass 2: exponentiate and accumulate the normalizer
    for i in range(n):
        result[i] = exp(x[i] - max_val)
        sum_exp += result[i]
    # Pass 3: normalize
    for i in range(n):
        result[i] /= sum_exp
    return result

Build and use:

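# Shell: build the extension in place
python setup.py build_ext --inplace

# Python: import and call the compiled module
import numpy as np
import fast_ops

x = np.random.randn(1_000_000)
result = fast_ops.relu_cython(x)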

4.2 IPython Cython Magic

%load_ext Cython

%%cython -a
# -a: annotation mode (yellow = Python calls, white = C)
# Note: %%cython must be the first line of its own notebook cell
cimport numpy as cnp

def fast_l2_norm(cnp.ndarray[cnp.double_t, ndim=1] v):
    cdef Py_ssize_t n = v.shape[0]
    cdef double total = 0.0
    cdef Py_ssize_t i
    for i in range(n):
        total += v[i] * v[i]
    return total ** 0.5

5. Python Multiprocessing

5.1 Understanding the GIL

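The GIL (Global Interpreter Lock) in standard CPython allows only one thread to execute Python bytecode at a time. For CPU-bound pure-Python work, multithreading therefore provides no real parallelism, so use multiprocessing instead. Threads still help with I/O-bound work, and C extensions such as NumPy can release the GIL during long-running operations.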

import multiprocessing
import threading
import time

def cpu_bound_task(n):
    total = 0
    for i in range(n):
        total += i ** 2
    return total

def run_threaded(n_tasks=4, n_per_task=1_000_000):
    threads = []
    start = time.perf_counter()
    for _ in range(n_tasks):
        t = threading.Thread(target=cpu_bound_task, args=(n_per_task,))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start

def run_multiprocessing(n_tasks=4, n_per_task=1_000_000):
    start = time.perf_counter()
    with multiprocessing.Pool(processes=n_tasks) as pool:
        results = pool.map(cpu_bound_task, [n_per_task] * n_tasks)
    return time.perf_counter() - start

# The guard is required on platforms that start workers with "spawn"
# (Windows, macOS), where each child process re-imports this module
if __name__ == "__main__":
    t_threaded = run_threaded()
    t_mp = run_multiprocessing()
    print(f"Threading: {t_threaded:.2f}s")
    print(f"Multiprocessing: {t_mp:.2f}s ({t_threaded/t_mp:.1f}x faster)")
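The GIL limits CPU-bound threads, but it is released while a thread waits on I/O, so threads remain useful for that kind of work. The sketch below uses `time.sleep` as a stand-in for a network or disk wait; `fake_io_task` is a hypothetical name.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_io_task(task_id):
    """Simulate an I/O wait; time.sleep releases the GIL like blocking I/O does."""
    time.sleep(0.1)
    return task_id

n_tasks = 8

# Sequential baseline: the waits add up
start = time.perf_counter()
sequential = [fake_io_task(i) for i in range(n_tasks)]
t_sequential = time.perf_counter() - start

# Threaded version: the waits overlap
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=n_tasks) as executor:
    threaded = list(executor.map(fake_io_task, range(n_tasks)))
t_threaded = time.perf_counter() - start

print(f"Sequential: {t_sequential:.2f}s")
print(f"Threaded:   {t_threaded:.2f}s")
```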

5.2 multiprocessing.Pool

import multiprocessing

import numpy as np

def preprocess_sample(args):
    idx, data, mean, std = args
    normalized = (data - mean) / (std + 1e-8)
    # Seed a per-sample generator: forked workers inherit the same global
    # NumPy RNG state and could otherwise draw identical noise
    rng = np.random.default_rng(idx)
    augmented = normalized + rng.standard_normal(normalized.shape) * 0.01
    return idx, augmented

def preprocess_dataset_parallel(dataset, n_workers=None):
    if n_workers is None:
        n_workers = multiprocessing.cpu_count()
    mean = dataset.mean(axis=0)
    std = dataset.std(axis=0)
    args = [(i, dataset[i], mean, std) for i in range(len(dataset))]
    with multiprocessing.Pool(processes=n_workers) as pool:
        # pool.map returns results in input order, so no re-sorting is needed
        results = pool.map(preprocess_sample, args, chunksize=100)
    return np.stack([augmented for _, augmented in results])

if __name__ == "__main__":
    dataset = np.random.randn(10000, 128)
    processed = preprocess_dataset_parallel(dataset, n_workers=4)
    print(f"Processed dataset shape: {processed.shape}")

5.3 ProcessPoolExecutor

from concurrent.futures import ProcessPoolExecutor, as_completed

import numpy as np
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from sklearn.model_selection import KFold

def train_fold(fold_data):
    """Train and evaluate one fold; runs in a worker process."""
    fold_idx, X_train, y_train, X_val, y_val = fold_data
    model = LogisticRegression(max_iter=1000)
    model.fit(X_train, y_train)
    val_pred = model.predict(X_val)
    accuracy = accuracy_score(y_val, val_pred)
    return fold_idx, accuracy, model

def parallel_cross_validation(X, y, n_folds=5):
    kf = KFold(n_splits=n_folds, shuffle=True, random_state=42)
    fold_data_list = []
    for fold_idx, (train_idx, val_idx) in enumerate(kf.split(X)):
        fold_data_list.append((
            fold_idx,
            X[train_idx], y[train_idx],
            X[val_idx], y[val_idx]
        ))

    results = {}
    with ProcessPoolExecutor(max_workers=n_folds) as executor:
        future_to_fold = {
            executor.submit(train_fold, fd): fd[0]
            for fd in fold_data_list
        }
        # Folds finish in arbitrary order, so key the results by fold index
        for future in as_completed(future_to_fold):
            fold_idx, accuracy, model = future.result()
            results[fold_idx] = {'accuracy': accuracy, 'model': model}
            print(f"Fold {fold_idx}: accuracy = {accuracy:.4f}")

    avg_accuracy = np.mean([r['accuracy'] for r in results.values()])
    print(f"\nMean accuracy: {avg_accuracy:.4f}")
    return results

if __name__ == "__main__":
    X, y = make_classification(n_samples=5000, n_features=20, random_state=42)
    results = parallel_cross_validation(X, y, n_folds=5)

5.4 Shared Memory

import multiprocessing
from multiprocessing import shared_memory

import numpy as np

def worker_shared_memory(shm_name, shape, dtype, start_idx, end_idx):
    # Attach to the existing block by name; no data is copied
    existing_shm = shared_memory.SharedMemory(name=shm_name)
    arr = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)
    arr[start_idx:end_idx] = arr[start_idx:end_idx] ** 2
    existing_shm.close()

def process_with_shared_memory(data):
    shm = shared_memory.SharedMemory(create=True, size=data.nbytes)
    try:
        shared_array = np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)
        shared_array[:] = data[:]

        n_workers = multiprocessing.cpu_count()
        chunk_size = len(data) // n_workers
        processes = []
        for i in range(n_workers):
            start = i * chunk_size
            # The last worker also takes the remainder
            end = start + chunk_size if i < n_workers - 1 else len(data)
            p = multiprocessing.Process(
                target=worker_shared_memory,
                args=(shm.name, data.shape, data.dtype, start, end)
            )
            processes.append(p)
            p.start()
        for p in processes:
            p.join()

        # Copy out before the block is released
        result = shared_array.copy()
    finally:
        shm.close()
        shm.unlink()  # Free the block; only the creator should unlink
    return result

if __name__ == "__main__":
    data = np.random.randn(1_000_000).astype(np.float64)
    result = process_with_shared_memory(data)
    print(f"Shared memory processing complete: {result[:5]}")

6. Asynchronous Programming (asyncio)

6.1 async/await Pattern

import asyncio
import time

async def fetch_embeddings(text: str, model: str = "text-embedding-3-small") -> list:
    """Simulate async embedding request"""
    await asyncio.sleep(0.1)
    return [0.1, 0.2, 0.3]

async def sequential_processing(texts: list) -> list:
    # Each request waits for the previous one to finish
    results = []
    for text in texts:
        embedding = await fetch_embeddings(text)
        results.append(embedding)
    return results

async def parallel_processing(texts: list) -> list:
    # All requests are in flight at once; gather preserves input order
    tasks = [fetch_embeddings(text) for text in texts]
    results = await asyncio.gather(*tasks)
    return list(results)

async def compare_approaches():
    texts = [f"sample text {i}" for i in range(20)]

    start = time.perf_counter()
    sequential_results = await sequential_processing(texts)
    t_seq = time.perf_counter() - start

    start = time.perf_counter()
    parallel_results = await parallel_processing(texts)
    t_par = time.perf_counter() - start

    print(f"Sequential: {t_seq:.2f}s ({len(sequential_results)} results)")
    print(f"Parallel: {t_par:.2f}s ({len(parallel_results)} results)")
    print(f"Speedup: {t_seq/t_par:.1f}x")

asyncio.run(compare_approaches())
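`asyncio.gather` starts every coroutine at once, which can overwhelm a rate-limited service. One common way to cap concurrency is an `asyncio.Semaphore`. The sketch below uses a simulated request; `fake_request`, `bounded_gather`, and the `stats` dictionary are hypothetical names used only to show that the limit holds.

```python
import asyncio

async def bounded_gather(coroutine_factories, limit):
    """Run the given coroutine factories with at most `limit` in flight."""
    semaphore = asyncio.Semaphore(limit)

    async def run_one(factory):
        async with semaphore:
            return await factory()

    return await asyncio.gather(*(run_one(f) for f in coroutine_factories))

stats = {"in_flight": 0, "max_in_flight": 0}

async def fake_request(i):
    """Simulated request that records how many calls overlap."""
    stats["in_flight"] += 1
    stats["max_in_flight"] = max(stats["max_in_flight"], stats["in_flight"])
    await asyncio.sleep(0.01)
    stats["in_flight"] -= 1
    return i * 2

async def main():
    # i=i binds the loop variable at definition time
    factories = [lambda i=i: fake_request(i) for i in range(20)]
    return await bounded_gather(factories, limit=5)

results = asyncio.run(main())
print(results[:5], "max in flight:", stats["max_in_flight"])
```

Passing factories rather than coroutine objects means no request is created until a semaphore slot is free.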

6.2 Async HTTP with aiohttp

import asyncio
from typing import Optional

import aiohttp

class AsyncAIClient:
    """Async AI API client with rate limiting"""

    def __init__(self, api_key: str, base_url: str, max_concurrent: int = 10):
        self.api_key = api_key
        self.base_url = base_url
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self):
        connector = aiohttp.TCPConnector(limit=100)
        self.session = aiohttp.ClientSession(
            connector=connector,
            headers={"Authorization": f"Bearer {self.api_key}"}
        )
        return self

    async def __aexit__(self, *args):
        if self.session:
            await self.session.close()

    async def get_embedding(self, text: str, retry: int = 3) -> list:
        """Embedding request with exponential backoff retry"""
        async with self.semaphore:
            for attempt in range(retry):
                try:
                    async with self.session.post(
                        f"{self.base_url}/embeddings",
                        json={"input": text, "model": "text-embedding-3-small"}
                    ) as response:
                        if response.status == 200:
                            data = await response.json()
                            return data["data"][0]["embedding"]
                        elif response.status == 429:
                            # Rate limited: exponential backoff before the next attempt
                            await asyncio.sleep(2 ** attempt)
                        else:
                            response.raise_for_status()
                except aiohttp.ClientError:
                    if attempt == retry - 1:
                        raise
                    await asyncio.sleep(1)
        # Every attempt was rate limited: fail loudly instead of returning
        # an empty embedding that could be mistaken for a real result
        raise RuntimeError(f"Embedding request failed after {retry} attempts")

    async def batch_embeddings(self, texts: list, batch_size: int = 100) -> list:
        """Process embeddings in batches"""
        all_results = []
        for i in range(0, len(texts), batch_size):
            batch = texts[i:i + batch_size]
            tasks = [self.get_embedding(text) for text in batch]
            # return_exceptions=True keeps one failure from cancelling the batch
            batch_results = await asyncio.gather(*tasks, return_exceptions=True)
            for j, result in enumerate(batch_results):
                if isinstance(result, Exception):
                    print(f"Error for text {i+j}: {result}")
                    all_results.append(None)
                else:
                    all_results.append(result)
        return all_results
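A fixed exponential backoff makes all clients that failed together retry together. Adding random jitter and capping the delay is a common refinement. The sketch below is independent of `aiohttp`; `flaky_call`, `retry_with_backoff`, and the delay values are illustrative assumptions, not part of any library.

```python
import asyncio
import random

async def retry_with_backoff(make_call, retries=4, base_delay=0.01, max_delay=0.1):
    """Await make_call(), retrying on ConnectionError with capped, jittered backoff."""
    for attempt in range(retries):
        try:
            return await make_call()
        except ConnectionError:
            if attempt == retries - 1:
                raise  # Out of attempts: surface the last error
            delay = min(max_delay, base_delay * 2 ** attempt)
            # "Full jitter": sleep a random amount between 0 and the capped delay
            await asyncio.sleep(random.uniform(0, delay))

attempts = {"count": 0}

async def flaky_call():
    """Fails twice, then succeeds (simulated transient errors)."""
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("simulated transient failure")
    return "ok"

result = asyncio.run(retry_with_backoff(flaky_call))
print(result, "after", attempts["count"], "attempts")
```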

6.3 Dynamic Batch Processor for AI APIs

import asyncio
import time
from dataclasses import dataclass, field
from typing import Any

@dataclass
class BatchProcessor:
    """Dynamic request batcher"""
    max_batch_size: int = 20
    max_wait_time: float = 0.05  # 50ms
    queue: asyncio.Queue = field(default_factory=asyncio.Queue)

    async def process_request(self, request_id: str, data: Any) -> Any:
        # get_running_loop() is the supported call inside a coroutine
        future = asyncio.get_running_loop().create_future()
        await self.queue.put((request_id, data, future))
        return await future

    async def batch_worker(self):
        while True:
            batch = []
            deadline = time.perf_counter() + self.max_wait_time

            # Collect requests until the batch is full or the deadline passes
            while len(batch) < self.max_batch_size:
                timeout = deadline - time.perf_counter()
                if timeout <= 0:
                    break
                try:
                    item = await asyncio.wait_for(self.queue.get(), timeout=timeout)
                    batch.append(item)
                except asyncio.TimeoutError:
                    break

            if not batch:
                await asyncio.sleep(0.001)
                continue

            request_ids, data_list, futures = zip(*batch)
            try:
                results = await self._call_api_batch(list(data_list))
                for future, result in zip(futures, results):
                    future.set_result(result)
            except Exception as e:
                for future in futures:
                    future.set_exception(e)

    async def _call_api_batch(self, data_list: list) -> list:
        await asyncio.sleep(0.1)  # Simulate API latency
        return [f"result_{d}" for d in data_list]

async def test_batch_processor():
    processor = BatchProcessor(max_batch_size=10, max_wait_time=0.05)
    worker_task = asyncio.create_task(processor.batch_worker())

    async def make_request(i):
        return await processor.process_request(f"req_{i}", f"data_{i}")

    start = time.perf_counter()
    tasks = [make_request(i) for i in range(50)]
    results = await asyncio.gather(*tasks)
    elapsed = time.perf_counter() - start

    print(f"50 requests processed: {elapsed:.3f}s")
    print(f"First 5 results: {results[:5]}")

    # Cancel the worker and wait for the cancellation to finish
    worker_task.cancel()
    try:
        await worker_task
    except asyncio.CancelledError:
        pass

asyncio.run(test_batch_processor())

7. Memory Management

7.1 Python Memory Model

import sys

data_types = {
    'int': 42,
    'float': 3.14,
    'str (small)': "hello",
    'str (large)': "hello" * 1000,
    'list (empty)': [],
    'dict (empty)': {},
    'tuple (empty)': (),
}

print("Object sizes:")
for name, obj in data_types.items():
    print(f" {name}: {sys.getsizeof(obj)} bytes")

def deep_size(obj, seen=None):
    """Total memory of an object and all its referenced objects"""
    if seen is None:
        seen = set()
    # Track visited ids so shared or cyclic references are counted once
    obj_id = id(obj)
    if obj_id in seen:
        return 0
    seen.add(obj_id)

    size = sys.getsizeof(obj)  # Shallow size only
    if isinstance(obj, dict):
        size += sum(deep_size(k, seen) + deep_size(v, seen)
                    for k, v in obj.items())
    elif hasattr(obj, '__iter__') and not isinstance(obj, (str, bytes)):
        size += sum(deep_size(item, seen) for item in obj)
    return size

nested = {'data': [i for i in range(1000)], 'meta': {'name': 'test'}}
print(f"\nNested dict size: {deep_size(nested):,} bytes")

7.2 Generators for Memory Efficiency

from typing import Generator, Iterator

import numpy as np

def dataset_generator(
    file_paths: list,
    batch_size: int = 32
) -> Generator:
    """Memory-efficient data loader"""
    for path in file_paths:
        # Random data stands in for loading `path` from disk
        data = np.random.randn(1000, 128)
        labels = np.random.randint(0, 10, 1000)
        for start in range(0, len(data), batch_size):
            end = min(start + batch_size, len(data))
            yield data[start:end], labels[start:end]

def augment_generator(gen: Iterator, noise_std: float = 0.01):
    """Chained augmentation generator"""
    for X, y in gen:
        X_aug = X + np.random.randn(*X.shape) * noise_std
        yield X_aug, y

def normalize_generator(gen: Iterator):
    """Normalization generator"""
    for X, y in gen:
        X_norm = (X - X.mean(axis=1, keepdims=True)) / (X.std(axis=1, keepdims=True) + 1e-8)
        yield X_norm, y

# Compose the pipeline: nothing is computed until batches are requested
fake_paths = [f"data_{i}.npy" for i in range(5)]
base_gen = dataset_generator(fake_paths, batch_size=32)
aug_gen = augment_generator(base_gen)
norm_gen = normalize_generator(aug_gen)

for i, (X_batch, y_batch) in enumerate(norm_gen):
    if i >= 3:
        break
    print(f"Batch {i}: shape={X_batch.shape}, mean={X_batch.mean():.4f}")
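The same lazy-batching idea works for any iterable, not only arrays. Below is a small standard-library sketch using `itertools.islice`. The `batched` helper is a hypothetical name defined here; Python 3.12 ships a similar `itertools.batched` that yields tuples instead of lists.

```python
import sys
from itertools import islice

def batched(iterable, batch_size):
    """Yield lists of up to batch_size items without materializing the input."""
    iterator = iter(iterable)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch

squares_list = [i * i for i in range(100_000)]
squares_gen = (i * i for i in range(100_000))

# The generator object stays small no matter how many items it will produce
print(f"list:      {sys.getsizeof(squares_list):,} bytes")
print(f"generator: {sys.getsizeof(squares_gen):,} bytes")

# Only the first eight squares are ever computed here
first_batches = list(islice(batched(squares_gen, 4), 2))
print(first_batches)
```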

7.3 `__slots__` Optimization

import sys

import numpy as np

class RegularEmbedding:
    def __init__(self, vector, label, metadata):
        self.vector = vector
        self.label = label
        self.metadata = metadata

class SlottedEmbedding:
    # Fixed attribute set: instances have no per-object __dict__
    __slots__ = ['vector', 'label', 'metadata']

    def __init__(self, vector, label, metadata):
        self.vector = vector
        self.label = label
        self.metadata = metadata

n = 100_000
vector_size = 128

regular_objects = [
    RegularEmbedding(np.random.randn(vector_size), i % 10, {'source': 'test'})
    for i in range(n)
]
slotted_objects = [
    SlottedEmbedding(np.random.randn(vector_size), i % 10, {'source': 'test'})
    for i in range(n)
]

# Per-instance overhead only; the referenced vector and metadata are excluded
regular_size = sys.getsizeof(regular_objects[0]) + sys.getsizeof(regular_objects[0].__dict__)
slotted_size = sys.getsizeof(slotted_objects[0])

print(f"Regular class instance: {regular_size} bytes")
print(f"__slots__ class instance: {slotted_size} bytes")
print(f"Memory saved: {(regular_size - slotted_size) / regular_size * 100:.1f}%")
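`sys.getsizeof` looks at a single instance. To see the effect across many objects, `tracemalloc` can measure the total allocation instead. This is a sketch with two hypothetical classes (`RegularPoint`, `SlottedPoint`); the exact numbers depend on the Python version, so none are quoted here.

```python
import tracemalloc

class RegularPoint:
    def __init__(self, x, y, label):
        self.x = x
        self.y = y
        self.label = label

class SlottedPoint:
    __slots__ = ("x", "y", "label")

    def __init__(self, x, y, label):
        self.x = x
        self.y = y
        self.label = label

def allocated_bytes(cls, n=50_000):
    """Bytes still allocated after creating n instances of cls."""
    tracemalloc.start()
    before, _ = tracemalloc.get_traced_memory()
    objects = [cls(1.0, 2.0, None) for _ in range(n)]
    after, _ = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    return after - before

regular_bytes = allocated_bytes(RegularPoint)
slotted_bytes = allocated_bytes(SlottedPoint)
print(f"Regular: {regular_bytes / 1e6:.2f} MB")
print(f"Slotted: {slotted_bytes / 1e6:.2f} MB")

# __slots__ also rejects attributes that were not declared
try:
    SlottedPoint(1.0, 2.0, None).extra = 1
    slots_reject_new_attributes = False
except AttributeError:
    slots_reject_new_attributes = True
print("Undeclared attribute rejected:", slots_reject_new_attributes)
```

The trade-off is flexibility: slotted instances cannot gain new attributes at run time, which matters for code that attaches ad hoc metadata to objects.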

7.4 weakref

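import gc
import weakref

import numpy as np

class LoadedModel:
    """Container for loaded weights.

    Plain dicts cannot be weakly referenced, so the cache stores instances
    of a regular class instead.
    """
    def __init__(self, name, weights):
        self.name = name
        self.weights = weights

class ModelCache:
    """Model cache using weak references"""

    def __init__(self):
        # Entries vanish automatically once no strong reference remains
        self._cache = weakref.WeakValueDictionary()

    def get_or_load(self, model_name: str):
        model = self._cache.get(model_name)
        if model is None:
            print(f"Loading model: {model_name}")
            model = self._load_model(model_name)
            self._cache[model_name] = model
        else:
            print(f"Returning from cache: {model_name}")
        return model

    def _load_model(self, model_name: str):
        return LoadedModel(model_name, np.random.randn(100, 100))

cache = ModelCache()
model = cache.get_or_load('resnet50')
print(f"Cache size: {len(cache._cache)}")

model_again = cache.get_or_load('resnet50')

# Drop every strong reference; the cache entry is then collected
del model
del model_again
gc.collect()
print(f"Cache size after GC: {len(cache._cache)}")

new_model = cache.get_or_load('resnet50')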

8. Python Type Hints

8.1 Static Type Checking with mypy

pip install mypy

mypy your_script.py --strict

from typing import Dict, Optional, Tuple

import numpy as np
from numpy.typing import NDArray

def compute_loss(
    predictions: NDArray[np.float32],
    targets: NDArray[np.float32],
    reduction: str = 'mean'
) -> float:
    """Squared-error loss, summed or averaged over the first axis."""
    diff = predictions - targets
    loss = np.sum(diff ** 2)
    if reduction == 'mean':
        return float(loss / len(predictions))
    return float(loss)

def load_checkpoint(
    path: str,
    device: Optional[str] = None,
    strict: bool = True
) -> Optional[Dict[str, NDArray]]:
    try:
        # Placeholder for real checkpoint loading from `path`
        return {'weights': np.random.randn(100)}
    except FileNotFoundError:
        return None

def split_dataset(
    X: NDArray,
    y: NDArray,
    test_size: float = 0.2,
    random_state: Optional[int] = None
) -> Tuple[NDArray, NDArray, NDArray, NDArray]:
    # A local generator avoids reseeding NumPy's global random state
    rng = np.random.default_rng(random_state)
    n = len(X)
    n_test = int(n * test_size)
    indices = rng.permutation(n)
    test_idx = indices[:n_test]
    train_idx = indices[n_test:]
    return X[train_idx], X[test_idx], y[train_idx], y[test_idx]

8.2 Generics and Protocol

from typing import TypeVar, Generic, Protocol, runtime_checkable, List, Dict

from numpy.typing import NDArray

T = TypeVar('T')

class DataLoader(Generic[T]):
    def __init__(self, dataset: List[T], batch_size: int = 32):
        self.dataset = dataset
        self.batch_size = batch_size
        self._idx = 0

    def __iter__(self):
        self._idx = 0
        return self

    def __next__(self) -> List[T]:
        if self._idx >= len(self.dataset):
            raise StopIteration
        batch = self.dataset[self._idx:self._idx + self.batch_size]
        self._idx += self.batch_size
        return batch

# Protocols describe the methods an object must have (structural typing);
# implementations do not need to inherit from them
@runtime_checkable
class Optimizer(Protocol):
    def step(self) -> None: ...
    def zero_grad(self) -> None: ...
    def state_dict(self) -> Dict: ...

@runtime_checkable
class Model(Protocol):
    def forward(self, x: NDArray) -> NDArray: ...
    def parameters(self) -> List[NDArray]: ...
    def train(self) -> None: ...
    def eval(self) -> None: ...
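Because a protocol marked `@runtime_checkable` supports `isinstance`, an object can be checked for the required methods without inheriting from the protocol. This runtime check looks only at whether the methods exist, not at their signatures. The protocol is repeated below so the sketch runs on its own; `SGD`, `NotAnOptimizer`, and `run_training_step` are hypothetical.

```python
from typing import Dict, Protocol, runtime_checkable

@runtime_checkable
class Optimizer(Protocol):
    def step(self) -> None: ...
    def zero_grad(self) -> None: ...
    def state_dict(self) -> Dict: ...

class SGD:
    """Does not inherit from Optimizer, but provides the same methods."""
    def __init__(self, lr: float = 0.01) -> None:
        self.lr = lr
        self.steps = 0

    def step(self) -> None:
        self.steps += 1

    def zero_grad(self) -> None:
        pass

    def state_dict(self) -> Dict:
        return {"lr": self.lr, "steps": self.steps}

class NotAnOptimizer:
    """Missing zero_grad and state_dict, so it does not satisfy the protocol."""
    def step(self) -> None:
        pass

def run_training_step(optimizer: Optimizer) -> Dict:
    optimizer.zero_grad()
    optimizer.step()
    return optimizer.state_dict()

print(isinstance(SGD(), Optimizer))
print(isinstance(NotAnOptimizer(), Optimizer))
print(run_training_step(SGD()))
```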

8.3 dataclass and TypedDict

from dataclasses import asdict, dataclass, field
from typing import TypedDict, List, Dict

# TypedDict: a plain dict at run time, with keys checked by the type checker
class ModelConfig(TypedDict):
    hidden_size: int
    num_layers: int
    dropout: float
    activation: str

@dataclass
class ExperimentConfig:
    experiment_name: str
    model_type: str
    learning_rate: float = 1e-3
    batch_size: int = 32
    max_epochs: int = 100
    seed: int = 42
    # Mutable defaults must go through default_factory
    tags: List[str] = field(default_factory=list)
    hyperparams: Dict[str, float] = field(default_factory=dict)

    def __post_init__(self):
        if self.learning_rate <= 0:
            raise ValueError(f"Learning rate must be positive: {self.learning_rate}")
        if not self.experiment_name:
            raise ValueError("Experiment name is required")

    def to_dict(self) -> Dict:
        return asdict(self)

config = ExperimentConfig(
    experiment_name="bert-finetuning-v1",
    model_type="bert-base",
    learning_rate=2e-5,
    batch_size=16,
    tags=["nlp", "classification"]
)

print(f"Experiment: {config.experiment_name}")
print(f"Config: {config.to_dict()}")
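For hyperparameter sweeps it is often convenient to treat a config as immutable and derive variants from it. One possible approach uses `frozen=True` together with `dataclasses.replace`. It is shown here with a smaller hypothetical `SweepConfig` class so the sketch stays self-contained.

```python
from dataclasses import FrozenInstanceError, asdict, dataclass, replace

@dataclass(frozen=True)
class SweepConfig:
    experiment_name: str
    learning_rate: float = 1e-3
    batch_size: int = 32

    def __post_init__(self) -> None:
        if self.learning_rate <= 0:
            raise ValueError(f"Learning rate must be positive: {self.learning_rate}")

base = SweepConfig(experiment_name="baseline")

# replace() builds a new instance, so __post_init__ validation runs again
variants = [replace(base, learning_rate=lr) for lr in (1e-4, 1e-3, 1e-2)]
for variant in variants:
    print(asdict(variant))

# Frozen instances reject in-place mutation
try:
    base.batch_size = 64
    mutation_blocked = False
except FrozenInstanceError:
    mutation_blocked = True
print("In-place mutation blocked:", mutation_blocked)
```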

8.4 Data Validation with pydantic

# Written against the Pydantic v2 API, matching the pydantic "^2.0" pin in
# section 9.2 (v1 used validator, root_validator, min_items and .json())
import os
from typing import List

from pydantic import BaseModel, Field, ValidationError, field_validator, model_validator

class DataConfig(BaseModel):
    data_path: str
    batch_size: int = Field(ge=1, le=1024, default=32)
    shuffle: bool = True
    num_workers: int = Field(ge=0, le=16, default=4)

    @field_validator('data_path')
    @classmethod
    def path_must_exist(cls, v):
        # "test_path" is allowed so the example runs without real data
        if not os.path.exists(v) and v != "test_path":
            raise ValueError(f"Data path does not exist: {v}")
        return v

class ModelConfig(BaseModel):
    architecture: str
    hidden_sizes: List[int] = Field(min_length=1)
    dropout_rate: float = Field(ge=0.0, le=0.9, default=0.1)
    activation: str = "relu"

    @field_validator('activation')
    @classmethod
    def valid_activation(cls, v):
        valid = ['relu', 'gelu', 'silu', 'tanh', 'sigmoid']
        if v not in valid:
            raise ValueError(f"Activation must be one of {valid}")
        return v

class TrainingConfig(BaseModel):
    data: DataConfig
    model: ModelConfig
    learning_rate: float = Field(gt=0, le=1.0, default=1e-3)
    weight_decay: float = Field(ge=0, default=1e-4)
    max_epochs: int = Field(ge=1, le=10000, default=100)

    # Runs after all fields are validated, so cross-field checks are safe
    @model_validator(mode='after')
    def check_consistency(self):
        if self.model.dropout_rate > 0.5:
            print("Warning: High dropout rate may cause underfitting")
        return self

try:
    config = TrainingConfig(
        data=DataConfig(data_path="test_path", batch_size=64),
        model=ModelConfig(
            architecture="transformer",
            hidden_sizes=[512, 256, 128],
            dropout_rate=0.1
        ),
        learning_rate=2e-5
    )
    print(f"Validation passed: {config.model.architecture}")
    print(f"JSON: {config.model_dump_json(indent=2)}")
except ValidationError as e:
    print(f"Validation error: {e}")

9. Package Management and Dependencies

9.1 Poetry

# Install Poetry
curl -sSL https://install.python-poetry.org | python3 -

# Create a new project
poetry new my-ml-project
cd my-ml-project

# Add dependencies
poetry add numpy torch torchvision
poetry add --group dev pytest black ruff mypy

# Activate virtual environment (built in for Poetry 1.x; Poetry 2.x moved this command to a plugin)
poetry shell

# Install all dependencies
poetry install

# Export to requirements.txt (recent Poetry versions need the poetry-plugin-export plugin)
poetry export -f requirements.txt --output requirements.txt

9.2 pyproject.toml

```toml
[tool.poetry]
name = "my-ml-project"
version = "0.1.0"
description = "AI/ML project"
authors = ["Your Name <email@example.com>"]
license = "MIT"
readme = "README.md"
packages = [{include = "my_ml_project"}]

[tool.poetry.dependencies]
python = "^3.10"
numpy = "^1.24"
torch = "^2.1"
torchvision = "^0.16"
transformers = "^4.35"
pydantic = "^2.0"
aiohttp = "^3.9"

[tool.poetry.group.dev.dependencies]
pytest = "^7.4"
pytest-cov = "^4.1"
black = "^23.0"
ruff = "^0.1"
mypy = "^1.6"
pre-commit = "^3.5"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

[tool.black]
line-length = 88
target-version = ['py310', 'py311']

[tool.ruff]
select = ["E", "F", "I", "N", "W"]
ignore = ["E501"]
line-length = 88

[tool.mypy]
python_version = "3.10"
warn_return_any = true
warn_unused_configs = true
disallow_untyped_defs = true

[tool.pytest.ini_options]
testpaths = ["tests"]
addopts = "-v --cov=my_ml_project --cov-report=html"
```
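The caret constraints in the dependency tables above are easy to misread, especially for `0.x` packages. The sketch below expands a caret spec into its lower and upper bound following Poetry's caret rule (the leftmost non-zero component may not change). `caret_range` is a hypothetical helper written for illustration, not part of Poetry, and it ignores pre-release tags and every other operator.

```python
def caret_range(spec: str) -> tuple[str, str]:
    """Expand a caret spec such as "^1.24" into (lower bound, upper bound)."""
    parts = [int(p) for p in spec.lstrip("^").split(".")]
    lower = parts + [0] * (3 - len(parts))  # pad to major.minor.patch

    # Bump the leftmost non-zero component; if every given component is zero,
    # bump the last component that was written.
    bump = next((i for i, v in enumerate(parts) if v != 0), len(parts) - 1)
    upper = lower[:bump] + [lower[bump] + 1] + [0] * (2 - bump)

    def fmt(version: list[int]) -> str:
        return ".".join(str(n) for n in version)

    return f">={fmt(lower)}", f"<{fmt(upper)}"


if __name__ == "__main__":
    # Specs taken from the pyproject.toml above
    for spec in ["^3.10", "^1.24", "^0.16", "^0.1"]:
        low, high = caret_range(spec)
        print(f"{spec:>6}  ->  {low}, {high}")
```

The practical consequence is that `ruff = "^0.1"` only admits 0.1.x releases, while `numpy = "^1.24"` admits anything below 2.0.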

9.3 uv (Fast Package Manager)

```bash
# Install uv (Rust-based, extremely fast)
curl -LsSf https://astral.sh/uv/install.sh | sh

# Initialize project
uv init my-project
cd my-project

# Add dependencies (10-100x faster than pip)
uv add numpy torch transformers
uv add --dev pytest ruff mypy

# Sync environment
uv sync

# Manage Python versions
uv python install 3.11
uv python pin 3.11

# Run scripts
uv run python train.py
```

10. Clean Code for ML

10.1 Design Patterns (Factory, Strategy)

```python
from abc import ABC, abstractmethod
from typing import Dict, Type

import numpy as np


# Strategy pattern: interchangeable loss functions behind one interface
class LossFunction(ABC):
    @abstractmethod
    def __call__(self, predictions: np.ndarray, targets: np.ndarray) -> float:
        pass

    @abstractmethod
    def gradient(self, predictions: np.ndarray, targets: np.ndarray) -> np.ndarray:
        pass


class MSELoss(LossFunction):
    def __call__(self, predictions, targets):
        return float(np.mean((predictions - targets) ** 2))

    def gradient(self, predictions, targets):
        return 2 * (predictions - targets) / len(predictions)


class HuberLoss(LossFunction):
    def __init__(self, delta: float = 1.0):
        self.delta = delta

    def __call__(self, predictions, targets):
        diff = predictions - targets
        is_small = np.abs(diff) <= self.delta
        # Quadratic near zero, linear in the tails
        loss = np.where(is_small, 0.5 * diff**2,
                        self.delta * np.abs(diff) - 0.5 * self.delta**2)
        return float(np.mean(loss))

    def gradient(self, predictions, targets):
        diff = predictions - targets
        grad = np.where(np.abs(diff) <= self.delta, diff,
                        self.delta * np.sign(diff))
        # Divide by N because __call__ returns the mean loss (consistent with MSELoss)
        return grad / len(predictions)


# Factory pattern: create models by registered name
class ModelFactory:
    _registry: Dict[str, Type] = {}

    @classmethod
    def register(cls, name: str):
        def decorator(model_class):
            cls._registry[name] = model_class
            return model_class
        return decorator

    @classmethod
    def create(cls, name: str, **kwargs):
        if name not in cls._registry:
            available = list(cls._registry.keys())
            raise ValueError(f"Unknown model: {name}. Available: {available}")
        return cls._registry[name](**kwargs)


@ModelFactory.register("linear")
class LinearModel:
    def __init__(self, input_size: int, output_size: int):
        self.W = np.random.randn(input_size, output_size) * 0.01
        self.b = np.zeros(output_size)

    def forward(self, X: np.ndarray) -> np.ndarray:
        return X @ self.W + self.b


@ModelFactory.register("mlp")
class MLPModel:
    def __init__(self, layer_sizes: list):
        self.layers = []
        for i in range(len(layer_sizes) - 1):
            W = np.random.randn(layer_sizes[i], layer_sizes[i+1]) * 0.01
            b = np.zeros(layer_sizes[i+1])
            self.layers.append((W, b))

    def forward(self, X: np.ndarray) -> np.ndarray:
        out = X
        for i, (W, b) in enumerate(self.layers):
            out = out @ W + b
            # ReLU on every layer except the output layer
            if i < len(self.layers) - 1:
                out = np.maximum(0, out)
        return out


model = ModelFactory.create("mlp", layer_sizes=[128, 256, 10])
X = np.random.randn(32, 128)
output = model.forward(X)
print(f"Model output shape: {output.shape}")
```
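The value of the Strategy pattern shows up when the training code stays the same while the loss is swapped. The following is a minimal pure-Python sketch of that idea, not the NumPy classes above: `ScalarLoss`, `SquaredError`, `Huber`, and `fit_constant` are hypothetical names introduced for illustration, and the "model" is a single constant fitted by gradient descent.

```python
from abc import ABC, abstractmethod
from typing import Sequence


class ScalarLoss(ABC):
    """Hypothetical scalar stand-in for the LossFunction interface."""

    @abstractmethod
    def gradient(self, prediction: float, target: float) -> float:
        ...


class SquaredError(ScalarLoss):
    def gradient(self, prediction: float, target: float) -> float:
        return 2.0 * (prediction - target)


class Huber(ScalarLoss):
    def __init__(self, delta: float = 1.0):
        self.delta = delta

    def gradient(self, prediction: float, target: float) -> float:
        diff = prediction - target
        if abs(diff) <= self.delta:
            return diff
        # Outside the delta band the gradient is clipped to +/- delta
        return self.delta if diff > 0 else -self.delta


def fit_constant(targets: Sequence[float], loss: ScalarLoss,
                 lr: float = 0.05, steps: int = 2000) -> float:
    """Fit one constant by gradient descent; the loop never inspects the loss type."""
    estimate = 0.0
    for _ in range(steps):
        grad = sum(loss.gradient(estimate, t) for t in targets) / len(targets)
        estimate -= lr * grad
    return estimate


data = [1.0, 1.1, 0.9, 1.0, 50.0]  # one large outlier
mse_fit = fit_constant(data, SquaredError())
huber_fit = fit_constant(data, Huber(delta=1.0))

if __name__ == "__main__":
    print(f"squared error fit: {mse_fit:.3f}")
    print(f"huber fit:         {huber_fit:.3f}")
```

With this data the squared-error fit converges to the mean (10.8), while the Huber fit settles near 1.25 because the outlier's gradient is clipped. Only the strategy object passed to `fit_constant` differs between the two runs.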

10.2 ML Testing with pytest

```python
# tests/test_models.py
import numpy as np
import pytest
from numpy.testing import assert_array_almost_equal

# Hypothetical module path; adjust it to wherever MSELoss and LinearModel live
from my_ml_project.models import LinearModel, MSELoss


class TestLossFunctions:
    def test_mse_zero_loss(self):
        """Perfect predictions should yield zero loss"""
        x = np.array([1.0, 2.0, 3.0])
        loss = MSELoss()(x, x)
        assert abs(loss) < 1e-10

    def test_mse_positive(self):
        """MSE must always be non-negative"""
        predictions = np.random.randn(100)
        targets = np.random.randn(100)
        loss = MSELoss()(predictions, targets)
        assert loss >= 0

    @pytest.mark.parametrize("batch_size", [1, 16, 32, 128])
    def test_batch_sizes(self, batch_size):
        """Model must handle various batch sizes"""
        X = np.random.randn(batch_size, 64)
        model = LinearModel(64, 10)
        output = model.forward(X)
        assert output.shape == (batch_size, 10)

    def test_numerical_gradient(self):
        """Verify analytical gradient matches numerical gradient"""
        loss_fn = MSELoss()
        pred = np.array([0.5, 0.3, 0.2])
        target = np.array([0.6, 0.2, 0.1])

        analytical_grad = loss_fn.gradient(pred, target)

        # Central finite difference, one coordinate at a time
        eps = 1e-5
        numerical_grad = np.zeros_like(pred)
        for i in range(len(pred)):
            pred_plus = pred.copy()
            pred_plus[i] += eps
            pred_minus = pred.copy()
            pred_minus[i] -= eps
            numerical_grad[i] = (loss_fn(pred_plus, target) - loss_fn(pred_minus, target)) / (2 * eps)

        assert_array_almost_equal(analytical_grad, numerical_grad, decimal=5)
```

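Run with (the coverage target matches the `my_ml_project` package declared in `pyproject.toml`):

```bash
pytest tests/ -v --cov=my_ml_project --cov-report=html -x
```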
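Tests that draw random inputs, such as `test_mse_positive`, are easier to debug when the data is reproducible. The sketch below shows one way to do that with a seeded generator. It uses only the standard library so it runs anywhere; `mean_squared_error` and `make_batch` are hypothetical helpers, and in NumPy-based tests the analogous tool would be `np.random.default_rng(seed)`.

```python
import random
from typing import List, Tuple


def mean_squared_error(predictions: List[float], targets: List[float]) -> float:
    return sum((p - t) ** 2 for p, t in zip(predictions, targets)) / len(predictions)


def make_batch(seed: int, size: int = 100) -> Tuple[List[float], List[float]]:
    """Build a deterministic batch: the same seed always yields the same data."""
    rng = random.Random(seed)  # local generator, does not touch global state
    predictions = [rng.gauss(0.0, 1.0) for _ in range(size)]
    targets = [rng.gauss(0.0, 1.0) for _ in range(size)]
    return predictions, targets


def test_mse_non_negative_seeded():
    predictions, targets = make_batch(seed=42)
    assert mean_squared_error(predictions, targets) >= 0.0


def test_same_seed_same_data():
    # A failing run can be replayed exactly by reusing the seed
    assert make_batch(seed=42) == make_batch(seed=42)
```

Keeping the generator local to each test avoids order-dependent failures when tests run in a different sequence or in parallel.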

10.3 Logging Best Practices

```python
import logging
import sys
from pathlib import Path
from typing import Optional


def setup_logger(
    name: str,
    level: int = logging.INFO,
    log_file: Optional[str] = None,
    format_str: Optional[str] = None
) -> logging.Logger:
    if format_str is None:
        format_str = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"

    formatter = logging.Formatter(format_str, datefmt='%Y-%m-%d %H:%M:%S')

    logger = logging.getLogger(name)
    logger.setLevel(level)

    # Attach handlers only once so repeated calls do not duplicate log lines
    if not logger.handlers:
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setFormatter(formatter)
        logger.addHandler(console_handler)

        if log_file:
            # Create the log directory if it does not exist yet
            Path(log_file).parent.mkdir(parents=True, exist_ok=True)
            file_handler = logging.FileHandler(log_file)
            file_handler.setFormatter(formatter)
            logger.addHandler(file_handler)

    return logger


class TrainingLogger:
    def __init__(self, experiment_name: str, log_dir: str = "logs"):
        self.logger = setup_logger(
            f"training.{experiment_name}",
            log_file=f"{log_dir}/{experiment_name}.log"
        )
        self.step = 0

    def log_metrics(self, metrics: dict, phase: str = "train"):
        metrics_str = ", ".join(f"{k}={v:.4f}" for k, v in metrics.items())
        self.logger.info(f"[{phase}] step={self.step} {metrics_str}")
        self.step += 1

    def log_epoch(self, epoch: int, train_loss: float, val_loss: float):
        self.logger.info(
            f"Epoch {epoch}: train_loss={train_loss:.4f}, val_loss={val_loss:.4f}"
        )

    def log_hyperparams(self, params: dict):
        self.logger.info(f"Hyperparameters: {params}")


logger = TrainingLogger("bert-finetuning-v1")
logger.log_hyperparams({"lr": 2e-5, "batch_size": 32, "epochs": 10})

for epoch in range(3):
    for step in range(10):
        logger.log_metrics(
            {"loss": 2.0 - epoch * 0.3 - step * 0.01, "acc": 0.5 + epoch * 0.1},
            phase="train"
        )
    logger.log_epoch(epoch, 1.5 - epoch * 0.3, 1.8 - epoch * 0.25)
```
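The `if not logger.handlers:` check in `setup_logger` matters because `logging.getLogger(name)` returns the same object every time, so handlers accumulate across calls. The sketch below demonstrates the effect with in-memory streams. `attach_handler` and the `demo.*` logger names are hypothetical and exist only for this illustration.

```python
import io
import logging


def attach_handler(name: str, stream: io.StringIO, guard: bool) -> logging.Logger:
    """Configure a named logger, optionally skipping setup if it already has handlers."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.propagate = False  # keep the demo output out of the root logger
    if not guard or not logger.handlers:
        logger.addHandler(logging.StreamHandler(stream))
    return logger


# Without the guard, calling setup twice attaches two handlers
stream_a = io.StringIO()
for _ in range(2):
    unguarded = attach_handler("demo.unguarded", stream_a, guard=False)
unguarded.info("hello")

# With the guard, the second call is a no-op
stream_b = io.StringIO()
for _ in range(2):
    guarded = attach_handler("demo.guarded", stream_b, guard=True)
guarded.info("hello")

unguarded_lines = stream_a.getvalue().splitlines()
guarded_lines = stream_b.getvalue().splitlines()

if __name__ == "__main__":
    print(f"unguarded wrote {len(unguarded_lines)} line(s)")
    print(f"guarded wrote {len(guarded_lines)} line(s)")
```

The unguarded logger writes the message twice and the guarded one writes it once, which is why notebook cells or modules that re-run setup code benefit from the check.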

Conclusion

When applying the techniques covered in this guide to real AI/ML projects, follow this priority order:

1. **Profile first**: Always identify bottlenecks with cProfile and line_profiler before optimizing.

2. **Vectorize with NumPy**: In most cases, vectorization alone achieves 10-100x speedups.

3. **Apply Numba to hot loops**: Use Numba JIT where pure Python loops are truly unavoidable.

4. **Use multiprocessing for CPU saturation**: Leverage ProcessPoolExecutor for independent tasks like data preprocessing and cross-validation.

5. **Use asyncio for I/O-bound pipelines**: Overlapping the time spent waiting on requests can raise throughput substantially (often 10x or more) on API-heavy or file I/O-heavy workflows.

6. **Add type hints and tests**: Combine mypy, pytest, and pydantic to keep an ML codebase robust as it grows.
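To make the asyncio item in the list above concrete, here is a small sketch that compares awaiting simulated requests one by one with awaiting them concurrently via `asyncio.gather`. `fake_request` is a hypothetical placeholder that sleeps instead of performing real I/O, and the request count and 50 ms delay are arbitrary assumptions.

```python
import asyncio
import time


async def fake_request(i: int, delay: float = 0.05) -> int:
    """Stand-in for a network call: waits without occupying the CPU."""
    await asyncio.sleep(delay)
    return i


async def run_sequential(n: int) -> list:
    # Each await finishes before the next request starts
    return [await fake_request(i) for i in range(n)]


async def run_concurrent(n: int) -> list:
    # All requests wait at the same time on one event loop
    return list(await asyncio.gather(*(fake_request(i) for i in range(n))))


def timed(coro_fn, n: int):
    start = time.perf_counter()
    result = asyncio.run(coro_fn(n))
    return result, time.perf_counter() - start


seq_result, seq_time = timed(run_sequential, 10)
con_result, con_time = timed(run_concurrent, 10)

if __name__ == "__main__":
    print(f"sequential: {seq_time:.2f}s, concurrent: {con_time:.2f}s")
```

The sequential version takes roughly the sum of the delays, while the concurrent version takes roughly the longest single delay. The gain applies only while tasks are waiting; CPU-bound work still needs multiprocessing, as item 4 notes.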

References

- [Numba Official Documentation](https://numba.readthedocs.io/)

- [Cython Official Documentation](https://cython.readthedocs.io/)

- [Python asyncio Documentation](https://docs.python.org/3/library/asyncio.html)

- [NumPy Quickstart Guide](https://numpy.org/doc/stable/user/quickstart.html)

- [Poetry Documentation](https://python-poetry.org/docs/)

- [Pydantic v2 Documentation](https://docs.pydantic.dev/)
