AI/ML을 위한 Python 완전 가이드: NumPy, Pandas, Matplotlib, Scikit-learn 마스터하기
AI/ML을 위한 Python 완전 가이드: NumPy, Pandas, Matplotlib, Scikit-learn 마스터하기
AI/ML을 위한 Python 완전 가이드
Python은 AI와 머신러닝 분야의 표준 언어입니다. 간결한 문법, 방대한 라이브러리 생태계, 활성화된 커뮤니티 덕분에 연구자와 엔지니어 모두가 선택하는 언어가 되었습니다. 이 가이드에서는 AI/ML 개발에 필요한 Python 핵심 라이브러리들을 완전히 마스터하는 방법을 다룹니다.
1. AI/ML을 위한 Python 환경 설정
Python 버전 선택
AI/ML 작업에는 Python 3.10 이상을 권장합니다. Python 3.10+는 구조적 패턴 매칭, 더 명확한 에러 메시지, 향상된 타입 힌트를 제공합니다. 이 글 작성 시점 기준으로 Python 3.12는 대부분의 ML 라이브러리와 호환되는 안정적인 선택입니다.
# Python 버전 확인
python --version
python3 --version
# pyenv를 이용한 특정 버전 설치
pyenv install 3.12.0
pyenv global 3.12.0
가상환경 설정
가상환경은 프로젝트별 의존성을 격리하는 핵심 도구입니다.
venv (표준 라이브러리)
# 가상환경 생성
python -m venv ml_env
# 활성화 (Linux/Mac)
source ml_env/bin/activate
# 활성화 (Windows)
ml_env\Scripts\activate
# 비활성화
deactivate
conda (Anaconda/Miniconda)
# 환경 생성
conda create -n ml_env python=3.12
# 활성화
conda activate ml_env
# 패키지 설치
conda install numpy pandas scikit-learn matplotlib
# 환경 목록
conda env list
# 환경 내보내기
conda env export > environment.yml
# 환경 복원
conda env create -f environment.yml
Poetry (의존성 관리 고급)
# Poetry 설치
curl -sSL https://install.python-poetry.org | python3 -
# 프로젝트 초기화
poetry new ml_project
cd ml_project
# 패키지 추가
poetry add numpy pandas scikit-learn torch
# 개발 의존성 추가
poetry add --dev pytest black flake8
# 환경 실행
poetry run python train.py
Jupyter Notebook/Lab 설정
# JupyterLab 설치
pip install jupyterlab
# 커널 등록
python -m ipykernel install --user --name=ml_env --display-name "ML Environment"
# JupyterLab 실행
jupyter lab
# 유용한 확장 설치
pip install jupyterlab-git
pip install nbformat
Jupyter 설정 파일 (~/.jupyter/jupyter_lab_config.py)
c.ServerApp.open_browser = True
c.ServerApp.port = 8888
c.ServerApp.ip = '0.0.0.0'
GPU Python 환경 (CUDA, cuDNN)
# CUDA 버전 확인
nvidia-smi
nvcc --version
# PyTorch with CUDA 설치 (CUDA 12.1)
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121
# TensorFlow with GPU
pip install tensorflow[and-cuda]
# GPU 사용 가능 여부 확인 (PyTorch)
python -c "import torch; print(torch.cuda.is_available())"
# cuDNN 확인
python -c "import torch; print(torch.backends.cudnn.version())"
필수 패키지 목록
# requirements.txt
numpy>=1.24.0
pandas>=2.0.0
matplotlib>=3.7.0
seaborn>=0.12.0
scikit-learn>=1.3.0
scipy>=1.11.0
torch>=2.0.0
torchvision>=0.15.0
tensorflow>=2.13.0
xgboost>=1.7.0
lightgbm>=4.0.0
optuna>=3.3.0
wandb>=0.15.0
tqdm>=4.65.0
jupyterlab>=4.0.0
black>=23.0.0
flake8>=6.0.0
pytest>=7.4.0
# 한 번에 설치
pip install -r requirements.txt
2. NumPy 완전 마스터
NumPy(Numerical Python)는 파이썬 과학 계산의 기반입니다. 다차원 배열과 수학 함수를 제공하며, 대부분의 ML 라이브러리가 내부적으로 NumPy를 사용합니다.
ndarray 생성
import numpy as np
# 기본 배열 생성
arr1 = np.array([1, 2, 3, 4, 5])
arr2 = np.array([[1, 2, 3], [4, 5, 6]])
print(arr1.shape) # (5,)
print(arr2.shape) # (2, 3)
print(arr2.dtype) # int64
print(arr2.ndim) # 2
print(arr2.size) # 6
# 특수 배열
zeros = np.zeros((3, 4)) # 모든 원소 0
ones = np.ones((2, 3, 4)) # 모든 원소 1
full = np.full((3, 3), 7) # 모든 원소 7
eye = np.eye(4) # 단위 행렬
empty = np.empty((2, 3)) # 초기화 없는 배열
# 범위 배열
arange = np.arange(0, 10, 2) # [0, 2, 4, 6, 8]
linspace = np.linspace(0, 1, 5) # [0, 0.25, 0.5, 0.75, 1.0]
logspace = np.logspace(0, 3, 4) # [1, 10, 100, 1000]
# 난수 배열
np.random.seed(42)
rand_uniform = np.random.rand(3, 4) # [0, 1) 균등분포
rand_normal = np.random.randn(3, 4) # 표준 정규분포
rand_int = np.random.randint(0, 10, (3, 4)) # 정수 난수
rand_choice = np.random.choice([1, 2, 3, 4, 5], size=10, replace=True)
# 고급 난수 (권장 방식)
rng = np.random.default_rng(42)
samples = rng.normal(loc=0, scale=1, size=(100, 3))
기본 연산과 Broadcasting
import numpy as np
a = np.array([[1, 2, 3], [4, 5, 6]])
b = np.array([[7, 8, 9], [10, 11, 12]])
# 기본 산술 연산 (원소별)
print(a + b) # 원소 덧셈
print(a - b) # 원소 뺄셈
print(a * b) # 원소 곱셈
print(a / b) # 원소 나눗셈
print(a ** 2) # 원소 제곱
print(a % 2) # 원소 나머지
# Broadcasting - 서로 다른 형태의 배열 연산
# 규칙: 차원이 맞지 않으면 1인 차원을 확장
x = np.array([[1], [2], [3]]) # shape: (3, 1)
y = np.array([10, 20, 30]) # shape: (3,) → (1, 3)
# Broadcasting 결과: (3, 3)
result = x + y
print(result)
# [[11, 21, 31],
# [12, 22, 32],
# [13, 23, 33]]
# 실용적인 Broadcasting 예제
# 배치 데이터 정규화
data = np.random.randn(100, 10) # 100개 샘플, 10개 특성
mean = data.mean(axis=0) # 각 특성의 평균 (shape: 10,)
std = data.std(axis=0) # 각 특성의 표준편차 (shape: 10,)
normalized = (data - mean) / std # Broadcasting으로 정규화
print(normalized.mean(axis=0).round(10)) # ≈ 0
print(normalized.std(axis=0).round(10)) # ≈ 1
인덱싱, 슬라이싱, Boolean 인덱싱
import numpy as np

arr = np.arange(24).reshape(4, 6)
print(arr)

# Plain integer indexing; negative indices count back from the end
print(arr[0, 0])     # 0
print(arr[3, 5])     # 23
print(arr[-1, -1])   # 23

# Slicing: [row-range, column-range], with ::k for strided sampling
print(arr[1:3, 2:5])
print(arr[:, 0])
print(arr[::2, ::2])

# Fancy indexing: paired index arrays select individual elements
rows = np.array([0, 2])
cols = np.array([1, 4])
print(arr[rows, cols])   # [arr[0,1], arr[2,4]] = [1, 16]

# Boolean masking keeps only elements where the condition holds
mask = arr > 12
print(arr[mask])

data = np.array([1, -2, 3, -4, 5, -6])
positive = data[data > 0]
print(positive)

# np.where as a vectorized if/else: keep positives, zero out the rest
result = np.where(data > 0, data, 0)
print(result)

# np.where with a single argument returns the matching indices
indices = np.where(data > 0)
print(indices)
형태 변환
import numpy as np
arr = np.arange(12)
# reshape
a = arr.reshape(3, 4)
b = arr.reshape(2, 2, 3)
c = arr.reshape(-1, 4) # -1은 자동 계산: (3, 4)
# flatten vs ravel
flat1 = a.flatten() # 복사본 반환
flat2 = a.ravel() # 가능하면 뷰 반환 (메모리 효율)
# transpose
mat = np.random.randn(3, 4)
transposed = mat.T # (4, 3)
transposed2 = mat.transpose() # 동일
transposed3 = np.transpose(mat, (1, 0)) # 축 순서 지정
# 3D 배열 transpose
tensor = np.random.randn(2, 3, 4)
# 배치, 채널, 공간 → 배치, 공간, 채널
reordered = tensor.transpose(0, 2, 1) # (2, 4, 3)
# squeeze와 expand_dims
x = np.array([[[1, 2, 3]]]) # shape: (1, 1, 3)
squeezed = np.squeeze(x) # (3,)
expanded = np.expand_dims(squeezed, axis=0) # (1, 3)
# 배열 이어붙이기
a = np.array([[1, 2], [3, 4]])
b = np.array([[5, 6], [7, 8]])
hstack = np.hstack([a, b]) # 수평으로 쌓기 (2, 4)
vstack = np.vstack([a, b]) # 수직으로 쌓기 (4, 2)
concat0 = np.concatenate([a, b], axis=0) # vstack과 동일
concat1 = np.concatenate([a, b], axis=1) # hstack과 동일
수학 함수
import numpy as np
x = np.array([0, np.pi/6, np.pi/4, np.pi/3, np.pi/2])
# 삼각함수
sin_x = np.sin(x)
cos_x = np.cos(x)
tan_x = np.tan(x)
# 지수/로그
exp_x = np.exp(x) # e^x
log_x = np.log(x + 1) # 자연로그 (ln)
log2_x = np.log2(x + 1) # 밑이 2인 로그
log10_x = np.log10(x + 1) # 상용로그
# 제곱/제곱근
sqrt_x = np.sqrt(x)
square_x = np.square(x) # x^2
power_x = np.power(x, 3) # x^3
# 절댓값, 반올림
abs_x = np.abs(x)
ceil_x = np.ceil(x) # 올림
floor_x = np.floor(x) # 내림
round_x = np.round(x, 2) # 반올림
# Sigmoid and softmax, implemented from scratch
def sigmoid(x):
    """Logistic sigmoid 1 / (1 + e^-x), applied element-wise."""
    return 1 / (1 + np.exp(-x))

def softmax(x, axis=-1):
    """Softmax along `axis` (default: last axis).

    Subtracting the max before exponentiating is the standard numerical
    stability trick: it cancels in the ratio but prevents exp() overflow.
    The `axis` parameter generalizes the original global-max/global-sum
    version so that 2-D batches are normalized row by row; 1-D behavior
    is unchanged.
    """
    e_x = np.exp(x - x.max(axis=axis, keepdims=True))
    return e_x / e_x.sum(axis=axis, keepdims=True)

z = np.array([1.0, 2.0, 3.0])
print(sigmoid(z))  # [0.731, 0.880, 0.952]
print(softmax(z))  # [0.090, 0.245, 0.665]
선형대수
import numpy as np

A = np.array([[1, 2], [3, 4]])
B = np.array([[5, 6], [7, 8]])

# Matrix multiplication -- three equivalent spellings
C = np.dot(A, B)       # classic
C = A @ B              # preferred since Python 3.5
C = np.matmul(A, B)    # same as @ for 2-D inputs

# Batched matmul: the leading dimension is broadcast
batch_A = np.random.randn(32, 3, 4)
batch_B = np.random.randn(32, 4, 5)
batch_C = batch_A @ batch_B            # (32, 3, 5)

# Core linear-algebra routines
det = np.linalg.det(A)                 # determinant
inv = np.linalg.inv(A)                 # inverse
rank = np.linalg.matrix_rank(A)        # rank
trace = np.trace(A)                    # sum of the diagonal

# Eigen-decomposition and singular value decomposition
eigenvalues, eigenvectors = np.linalg.eig(A)
U, S, Vt = np.linalg.svd(A)

# Solve the linear system Ax = b
b = np.array([5, 6])
x = np.linalg.solve(A, b)

# Vector norms
v = np.array([3, 4])
l1_norm = np.linalg.norm(v, ord=1)         # |3| + |4| = 7
l2_norm = np.linalg.norm(v, ord=2)         # sqrt(9 + 16) = 5
inf_norm = np.linalg.norm(v, ord=np.inf)   # max(|3|, |4|) = 4
통계 함수
import numpy as np
data = np.random.randn(100, 5)
# 기본 통계
print(data.mean()) # 전체 평균
print(data.mean(axis=0)) # 열별 평균 (shape: 5,)
print(data.mean(axis=1)) # 행별 평균 (shape: 100,)
print(data.std()) # 표준편차
print(data.var()) # 분산
print(data.sum()) # 합
print(data.min()) # 최솟값
print(data.max()) # 최댓값
# 누적 연산
cumsum = data.cumsum(axis=0) # 누적 합
cumprod = data.cumprod(axis=0) # 누적 곱
# 정렬
sorted_arr = np.sort(data, axis=0)
sort_indices = np.argsort(data, axis=0) # 정렬 인덱스
# 분위수
q25 = np.percentile(data, 25)
q50 = np.percentile(data, 50) # 중앙값
q75 = np.percentile(data, 75)
median = np.median(data)
# 상관계수
corr = np.corrcoef(data.T) # 5x5 상관계수 행렬
# 히스토그램
counts, bin_edges = np.histogram(data[:, 0], bins=20)
벡터화 연산 vs for loop 성능 비교
import numpy as np
import time

# Benchmark: Python-level loop vs NumPy vectorization.
# Uses time.perf_counter() (monotonic, high-resolution) instead of
# time.time(), which is a wall-clock and unsuitable for timing.
n = 1_000_000
rng = np.random.default_rng(42)   # seeded Generator: reproducible inputs
a = rng.standard_normal(n)
b = rng.standard_normal(n)

# --- Python for-loop version (intentionally naive, for comparison) ---
start = time.perf_counter()
result_loop = []
for i in range(n):
    result_loop.append(a[i] * b[i])
loop_time = time.perf_counter() - start
print(f"For loop: {loop_time:.4f}초")

# --- Vectorized version: one C-level elementwise multiply ---
start = time.perf_counter()
result_vec = a * b
vec_time = time.perf_counter() - start
print(f"Vectorized: {vec_time:.4f}초")
print(f"속도 향상: {loop_time / vec_time:.1f}배")
# Typically 100-1000x faster
실전: 신경망 순전파 NumPy로 구현
import numpy as np
class SimpleNeuralNetwork:
    """A two-layer fully-connected classifier implemented with NumPy only.

    Architecture: input -> ReLU hidden layer -> softmax output.
    Trained by full-batch gradient descent on cross-entropy loss.
    """

    def __init__(self, input_size, hidden_size, output_size, seed=42):
        np.random.seed(seed)
        # He initialization keeps activation variance stable under ReLU
        self.W1 = np.random.randn(input_size, hidden_size) * np.sqrt(2.0 / input_size)
        self.b1 = np.zeros((1, hidden_size))
        self.W2 = np.random.randn(hidden_size, output_size) * np.sqrt(2.0 / hidden_size)
        self.b2 = np.zeros((1, output_size))

    def relu(self, z):
        """Element-wise ReLU: max(0, z)."""
        return np.maximum(0, z)

    def relu_derivative(self, z):
        """ReLU gradient: 1 where z > 0, else 0."""
        return (z > 0).astype(float)

    def softmax(self, z):
        """Row-wise softmax; subtracting the row max avoids exp() overflow."""
        shifted = np.exp(z - z.max(axis=1, keepdims=True))
        return shifted / shifted.sum(axis=1, keepdims=True)

    def forward(self, X):
        """Forward pass; caches intermediates (Z1, A1, Z2, A2) for backward()."""
        self.Z1 = X @ self.W1 + self.b1
        self.A1 = self.relu(self.Z1)
        self.Z2 = self.A1 @ self.W2 + self.b2
        self.A2 = self.softmax(self.Z2)
        return self.A2

    def cross_entropy_loss(self, y_pred, y_true):
        """Mean negative log-likelihood of the true classes (integer labels).

        The 1e-8 term guards against log(0).
        """
        batch = y_true.shape[0]
        return -np.log(y_pred[range(batch), y_true] + 1e-8).mean()

    def backward(self, X, y_true, learning_rate=0.01):
        """One gradient-descent step using the activations cached by forward()."""
        batch = X.shape[0]
        # For softmax + cross-entropy, dL/dZ2 = (probs - one_hot) / batch
        dZ2 = self.A2.copy()
        dZ2[range(batch), y_true] -= 1
        dZ2 /= batch
        dW2 = self.A1.T @ dZ2
        db2 = dZ2.sum(axis=0, keepdims=True)
        # Backpropagate through the hidden layer
        dA1 = dZ2 @ self.W2.T
        dZ1 = dA1 * self.relu_derivative(self.Z1)
        dW1 = X.T @ dZ1
        db1 = dZ1.sum(axis=0, keepdims=True)
        # Parameter update
        self.W1 -= learning_rate * dW1
        self.b1 -= learning_rate * db1
        self.W2 -= learning_rate * dW2
        self.b2 -= learning_rate * db2

    def train(self, X, y, epochs=100, learning_rate=0.01):
        """Full-batch training loop; returns the per-epoch loss history."""
        losses = []
        for epoch in range(epochs):
            y_pred = self.forward(X)
            loss = self.cross_entropy_loss(y_pred, y)
            losses.append(loss)
            self.backward(X, y, learning_rate)
            if epoch % 10 == 0:
                acc = (y_pred.argmax(axis=1) == y).mean()
                print(f"Epoch {epoch:3d}: Loss={loss:.4f}, Acc={acc:.4f}")
        return losses
# Smoke test on a synthetic 3-class dataset
from sklearn.datasets import make_classification

X, y = make_classification(
    n_samples=1000,
    n_features=20,
    n_classes=3,
    n_informative=15,
    random_state=42,
)
nn = SimpleNeuralNetwork(input_size=20, hidden_size=64, output_size=3)
losses = nn.train(X, y, epochs=50, learning_rate=0.1)
3. Pandas 완전 마스터
Pandas는 표 형식 데이터를 다루는 핵심 라이브러리입니다. DataFrame과 Series 자료구조를 제공하며 데이터 정제, 변환, 분석의 전 과정을 지원합니다.
Series와 DataFrame
import pandas as pd
import numpy as np
# Series 생성
s1 = pd.Series([1, 2, 3, 4, 5])
s2 = pd.Series([10, 20, 30], index=['a', 'b', 'c'])
s3 = pd.Series({'x': 100, 'y': 200, 'z': 300})
print(s2['a']) # 10
print(s2[['a', 'c']]) # a=10, c=30
# DataFrame 생성
data = {
'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
'age': [25, 30, 35, 28, 22],
'score': [88.5, 92.3, 78.1, 95.7, 83.2],
'passed': [True, True, False, True, True]
}
df = pd.DataFrame(data)
print(df.head())
print(df.tail(3))
print(df.info())
print(df.describe())
print(df.dtypes)
print(df.shape) # (5, 4)
데이터 읽기/쓰기
import pandas as pd
# CSV
df_csv = pd.read_csv('data.csv',
sep=',',
header=0,
index_col=0,
parse_dates=['date'],
encoding='utf-8',
na_values=['N/A', 'null', ''])
df_csv.to_csv('output.csv', index=False, encoding='utf-8-sig')
# Excel
df_excel = pd.read_excel('data.xlsx',
sheet_name='Sheet1',
header=0)
df_excel.to_excel('output.xlsx', sheet_name='Result', index=False)
# JSON
df_json = pd.read_json('data.json', orient='records')
df_json.to_json('output.json', orient='records', force_ascii=False, indent=2)
# Parquet (고성능 컬럼형 형식)
df.to_parquet('data.parquet', engine='pyarrow', compression='snappy')
df_parquet = pd.read_parquet('data.parquet')
# SQL (SQLite 예시)
import sqlite3
conn = sqlite3.connect('database.db')
df_sql = pd.read_sql_query("SELECT * FROM users WHERE age > 25", conn)
df.to_sql('new_table', conn, if_exists='replace', index=False)
인덱싱 (loc, iloc)
import pandas as pd
import numpy as np
df = pd.DataFrame({
'A': range(10),
'B': range(10, 20),
'C': range(20, 30)
}, index=[f'row{i}' for i in range(10)])
# loc: 레이블 기반 인덱싱
print(df.loc['row0', 'A']) # 단일 값
print(df.loc['row0':'row3', 'A':'B']) # 범위 (끝 포함)
print(df.loc[['row1', 'row5'], 'C']) # 리스트
# iloc: 위치 기반 인덱싱
print(df.iloc[0, 0]) # 0행 0열
print(df.iloc[0:4, 0:2]) # 범위 (끝 미포함)
print(df.iloc[[1, 5], 2]) # 리스트
# 조건 기반 선택
mask = df['A'] > 5
filtered = df[mask]
filtered2 = df[df['B'].between(12, 17)]
filtered3 = df.query('A > 5 and B < 18')
# 복합 조건
condition = (df['A'] > 3) & (df['B'] < 17) | (df['C'] >= 28)
result = df[condition]
# at/iat (단일 값 접근 - 빠름)
val = df.at['row3', 'A']
val2 = df.iat[3, 0]
결측치 처리
import pandas as pd
import numpy as np

# Sample frame with missing values in numeric and string columns
df = pd.DataFrame({
    'age': [25, np.nan, 35, np.nan, 22],
    'income': [50000, 60000, np.nan, 80000, np.nan],
    'city': ['Seoul', 'Busan', None, 'Incheon', 'Seoul'],
    'score': [88.5, 92.3, 78.1, np.nan, 83.2]
})

# --- Inspect missingness ---
print(df.isnull())
print(df.isnull().sum())                  # missing count per column
print(df.isnull().sum() / len(df) * 100)  # missing ratio (%)

# --- Drop ---
df_dropped_rows = df.dropna()          # drop rows containing any NaN
df_dropped_cols = df.dropna(axis=1)    # drop columns containing any NaN
df_thresh = df.dropna(thresh=3)        # keep rows with >= 3 non-null values

# --- Fill ---
df_filled_0 = df.fillna(0)
# numeric_only=True is required: df.mean() on mixed dtypes raises
# TypeError in pandas >= 2.0
df_filled_mean = df.fillna(df.mean(numeric_only=True))
df_filled_dict = df.fillna({
    'age': df['age'].mean(),
    'income': df['income'].median(),
    'city': 'Unknown',
    'score': df['score'].mean()
})

# Forward/backward fill: fillna(method=...) is deprecated since pandas 2.1
df_ffill = df.ffill()   # propagate the previous value forward
df_bfill = df.bfill()   # propagate the next value backward

# Linear interpolation applies to numeric columns only (interpolating an
# object column raises in recent pandas)
num_cols = df.select_dtypes(include='number').columns
df_interpolated = df.copy()
df_interpolated[num_cols] = df_interpolated[num_cols].interpolate(method='linear')

# Typical per-column policy: drop very sparse columns, mode-fill strings,
# median-fill numerics. Column-wise assignment replaces the chained
# `inplace=True` pattern, which stops mutating under pandas Copy-on-Write.
# list(...) snapshots the columns so dropping during iteration is safe.
for col in list(df.columns):
    missing_pct = df[col].isnull().mean()
    if missing_pct > 0.5:
        df = df.drop(columns=[col])
    elif df[col].dtype == 'object':
        df[col] = df[col].fillna(df[col].mode()[0])
    else:
        df[col] = df[col].fillna(df[col].median())
데이터 변환
import pandas as pd
import numpy as np

df = pd.DataFrame({
    'text': ['hello world', 'PYTHON IS GREAT', 'data science'],
    'value': [1, 2, 3],
    'category': ['A', 'B', 'A']
})

# apply: run a function over every element of a Series
df['text_upper'] = df['text'].apply(str.upper)
df['text_length'] = df['text'].apply(len)

def process_text(text):
    """Lower-case the text, then capitalize each word."""
    return ' '.join(word.capitalize() for word in text.lower().split())

df['text_processed'] = df['text'].apply(process_text)

# Row-wise apply that produces several new columns at once
def feature_engineer(row):
    return pd.Series({
        'value_squared': row['value'] ** 2,
        'category_is_A': int(row['category'] == 'A')
    })

new_features = df.apply(feature_engineer, axis=1)
df = pd.concat([df, new_features], axis=1)

# map: translate values via a lookup table (unmatched keys become NaN)
category_map = {'A': 'Alpha', 'B': 'Beta', 'C': 'Gamma'}
df['category_name'] = df['category'].map(category_map)

# transform: group-wise computation broadcast back to the original length.
# NOTE: a separate frame is used here -- the original listing assigned
# 6-element lists to the 3-row frame above, which raises ValueError.
df_groups = pd.DataFrame({
    'numeric': [10, 20, 30, 40, 50, 60],
    'category': ['A', 'B', 'A', 'B', 'A', 'B']
})
df_groups['group_mean'] = df_groups.groupby('category')['numeric'].transform('mean')

# Vectorized string operations via the .str accessor
texts = pd.Series(['Hello World', 'Python 3.12', 'Machine Learning'])
print(texts.str.lower())
print(texts.str.split())
print(texts.str.contains('Python'))
print(texts.str.extract(r'(\w+)\s+(\w+)'))
그룹화 (groupby)
import pandas as pd
import numpy as np

np.random.seed(42)
df = pd.DataFrame({
    'team': np.random.choice(['A', 'B', 'C'], 100),
    'role': np.random.choice(['dev', 'ds', 'pm'], 100),
    'score': np.random.randint(60, 100, 100),
    'salary': np.random.randint(3000, 8000, 100)
})

# Group by a single key
by_team = df.groupby('team')
print(by_team['score'].mean())
print(by_team['salary'].describe())

# Group by several keys; unstack() pivots the inner level into columns
by_team_role = df.groupby(['team', 'role'])
print(by_team_role['score'].mean().unstack())

# Named aggregation: output column -> (input column, aggregation)
agg_result = df.groupby('team').agg(
    avg_score=('score', 'mean'),
    total_salary=('salary', 'sum'),
    count=('score', 'count'),
    max_score=('score', 'max'),
    min_salary=('salary', 'min')
)
print(agg_result)

def iqr(x):
    """Interquartile range: Q3 - Q1."""
    return x.quantile(0.75) - x.quantile(0.25)

# Built-in aggregation names mix freely with user-defined callables
custom_agg = df.groupby('team')['score'].agg(['mean', 'median', 'std', iqr])

# filter keeps only the rows that belong to groups passing the predicate
large_teams = df.groupby('team').filter(lambda g: len(g) > 30)
병합 (merge, join, concat)
import pandas as pd

users = pd.DataFrame({
    'user_id': [1, 2, 3, 4, 5],
    'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
    'age': [25, 30, 35, 28, 22]
})
orders = pd.DataFrame({
    'order_id': [101, 102, 103, 104, 105, 106],
    'user_id': [1, 2, 1, 3, 5, 6],
    'amount': [150, 250, 80, 320, 190, 440]
})

# The four classic join types, keyed on user_id
inner = pd.merge(users, orders, on='user_id', how='inner')   # intersection
left = pd.merge(users, orders, on='user_id', how='left')     # keep every user
right = pd.merge(users, orders, on='user_id', how='right')   # keep every order
outer = pd.merge(users, orders, on='user_id', how='outer')   # union of keys

# Explicit key columns plus suffixes for overlapping column names
merged = pd.merge(
    users,
    orders,
    left_on='user_id',
    right_on='user_id',
    suffixes=('_user', '_order'),
)

# concat stacks frames along an axis instead of joining on keys
df1 = pd.DataFrame({'A': [1, 2], 'B': [3, 4]})
df2 = pd.DataFrame({'A': [5, 6], 'B': [7, 8]})
df3 = pd.DataFrame({'C': [9, 10], 'D': [11, 12]})
vertical = pd.concat([df1, df2], axis=0, ignore_index=True)
horizontal = pd.concat([df1, df3], axis=1)
실전: AI 훈련 데이터 전처리 파이프라인
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
def preprocess_ml_data(filepath):
    """Generic preprocessing pipeline for tabular ML training data.

    Steps: load -> deduplicate -> impute -> clip outliers (IQR) ->
    encode categoricals -> expand date features.

    Parameters
    ----------
    filepath : str or path-like
        CSV file to load.

    Returns
    -------
    pandas.DataFrame
        Model-ready frame.
    """
    # 1. Load
    df = pd.read_csv(filepath)
    print(f"원본 데이터: {df.shape}")

    # 2. Drop exact duplicate rows
    df = df.drop_duplicates()
    print(f"중복 제거 후: {df.shape}")

    # 3. Impute: median for numerics, mode for categoricals.
    #    Column assignment replaces chained `inplace=True`, which stops
    #    mutating the frame under pandas Copy-on-Write.
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    cat_cols = df.select_dtypes(include=['object']).columns
    for col in numeric_cols:
        df[col] = df[col].fillna(df[col].median())
    for col in cat_cols:
        df[col] = df[col].fillna(df[col].mode()[0])

    # 4. Clip outliers to the 1.5 * IQR whiskers
    for col in numeric_cols:
        q1 = df[col].quantile(0.25)
        q3 = df[col].quantile(0.75)
        iqr = q3 - q1
        df[col] = df[col].clip(lower=q1 - 1.5 * iqr, upper=q3 + 1.5 * iqr)

    # 5. Encode categoricals: label-encode low-cardinality columns,
    #    frequency-encode high-cardinality ones. A fresh encoder per
    #    column avoids any cross-column state.
    for col in cat_cols:
        if df[col].nunique() <= 10:
            df[col] = LabelEncoder().fit_transform(df[col].astype(str))
        else:
            freq_map = df[col].value_counts().to_dict()
            df[col] = df[col].map(freq_map)

    # 6. Date feature expansion, if a 'date' column exists
    if 'date' in df.columns:
        df['date'] = pd.to_datetime(df['date'])
        df['year'] = df['date'].dt.year
        df['month'] = df['date'].dt.month
        df['dayofweek'] = df['date'].dt.dayofweek
        df = df.drop('date', axis=1)
    return df
# Titanic preprocessing example
def preprocess_titanic(df):
    """Turn the raw Titanic frame into a purely numeric feature matrix.

    Engineered features: Title (parsed from Name), FamilySize, IsAlone.
    Missing Age is imputed with the median age of the same Title group.
    Returns only the model-ready feature columns; the input frame is not
    modified.
    """
    df = df.copy()

    # Title from "Surname, Title. Given names"; unmapped titles become 4
    df['Title'] = df['Name'].str.extract(r' ([A-Za-z]+)\.')
    title_map = {'Mr': 0, 'Miss': 1, 'Mrs': 2, 'Master': 3}
    df['Title'] = df['Title'].map(title_map).fillna(4)

    df['FamilySize'] = df['SibSp'] + df['Parch'] + 1
    df['IsAlone'] = (df['FamilySize'] == 1).astype(int)

    # Column assignment replaces fillna(..., inplace=True): the chained
    # form no longer mutates the frame under pandas Copy-on-Write.
    df['Age'] = df['Age'].fillna(df.groupby('Title')['Age'].transform('median'))
    df['Fare'] = df['Fare'].fillna(df['Fare'].median())
    df['Embarked'] = df['Embarked'].fillna(df['Embarked'].mode()[0])

    # Binary / ordinal encodings
    df['Sex'] = (df['Sex'] == 'male').astype(int)
    df['Embarked'] = df['Embarked'].map({'S': 0, 'C': 1, 'Q': 2})

    features = ['Pclass', 'Sex', 'Age', 'Fare', 'Embarked',
                'FamilySize', 'IsAlone', 'Title']
    return df[features]
4. Matplotlib & Seaborn 시각화
기본 플롯
import matplotlib.pyplot as plt
import numpy as np
# 한글 폰트 설정 (Mac)
import matplotlib.font_manager as fm
plt.rcParams['font.family'] = 'AppleGothic'
plt.rcParams['axes.unicode_minus'] = False
# 기본 설정
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
# 라인 플롯
x = np.linspace(0, 2 * np.pi, 100)
axes[0, 0].plot(x, np.sin(x), 'b-', linewidth=2, label='sin(x)')
axes[0, 0].plot(x, np.cos(x), 'r--', linewidth=2, label='cos(x)')
axes[0, 0].set_title('삼각함수')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
# 막대 플롯
categories = ['분류', '회귀', '군집화', '차원축소']
values = [85, 72, 68, 91]
bars = axes[0, 1].bar(categories, values, color=['#3498db', '#e74c3c', '#2ecc71', '#f39c12'])
axes[0, 1].set_title('알고리즘별 정확도')
axes[0, 1].set_ylabel('정확도 (%)')
for bar, val in zip(bars, values):
axes[0, 1].text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.5,
f'{val}%', ha='center', va='bottom')
# 산점도
np.random.seed(42)
x_scatter = np.random.randn(100)
y_scatter = 2 * x_scatter + np.random.randn(100) * 0.5
axes[0, 2].scatter(x_scatter, y_scatter, alpha=0.6, c=y_scatter, cmap='viridis')
axes[0, 2].set_title('산점도')
# 히스토그램
data = np.concatenate([
np.random.normal(0, 1, 500),
np.random.normal(4, 1.5, 300)
])
axes[1, 0].hist(data, bins=50, density=True, alpha=0.7, color='steelblue')
axes[1, 0].set_title('데이터 분포')
# 박스 플롯
box_data = [np.random.normal(i, 1, 100) for i in range(5)]
axes[1, 1].boxplot(box_data, labels=[f'모델{i+1}' for i in range(5)])
axes[1, 1].set_title('모델별 성능 분포')
# 파이 차트
sizes = [35, 25, 20, 12, 8]
labels = ['Python', 'R', 'Scala', 'Java', '기타']
explode = (0.05, 0, 0, 0, 0)
axes[1, 2].pie(sizes, labels=labels, explode=explode, autopct='%1.1f%%',
shadow=True, startangle=90)
axes[1, 2].set_title('언어 사용 비율')
plt.tight_layout()
plt.savefig('basic_plots.png', dpi=150, bbox_inches='tight')
plt.show()
Seaborn 통계 시각화
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
# 스타일 설정
sns.set_theme(style='whitegrid', palette='husl', font_scale=1.2)
# 샘플 데이터
df = pd.DataFrame({
'model': np.repeat(['ResNet', 'VGG', 'EfficientNet', 'ViT'], 50),
'accuracy': np.concatenate([
np.random.normal(92, 2, 50),
np.random.normal(88, 3, 50),
np.random.normal(94, 1.5, 50),
np.random.normal(95, 2.5, 50)
]),
'params_M': np.concatenate([
np.random.normal(25, 2, 50),
np.random.normal(138, 5, 50),
np.random.normal(5.3, 0.3, 50),
np.random.normal(86, 3, 50)
]),
'training_time': np.concatenate([
np.random.exponential(10, 50),
np.random.exponential(20, 50),
np.random.exponential(8, 50),
np.random.exponential(15, 50)
])
})
fig, axes = plt.subplots(2, 3, figsize=(18, 12))
# 바이올린 플롯
sns.violinplot(data=df, x='model', y='accuracy', ax=axes[0, 0])
axes[0, 0].set_title('모델별 정확도 분포')
# 박스 플롯 with swarm
sns.boxplot(data=df, x='model', y='accuracy', ax=axes[0, 1])
sns.swarmplot(data=df, x='model', y='accuracy', color='black',
size=2, ax=axes[0, 1])
axes[0, 1].set_title('정확도 상세 분포')
# 히트맵 (상관계수)
corr_data = df[['accuracy', 'params_M', 'training_time']].corr()
sns.heatmap(corr_data, annot=True, fmt='.2f', cmap='RdYlGn',
center=0, ax=axes[0, 2])
axes[0, 2].set_title('변수 간 상관관계')
# 산점도 + 회귀선
sns.regplot(data=df, x='params_M', y='accuracy',
scatter_kws={'alpha': 0.4}, ax=axes[1, 0])
axes[1, 0].set_title('파라미터 수 vs 정확도')
# 분포 플롯 (KDE + 히스토그램)
for model in df['model'].unique():
subset = df[df['model'] == model]
sns.kdeplot(data=subset, x='accuracy', label=model, ax=axes[1, 1])
axes[1, 1].set_title('모델별 정확도 분포 (KDE)')
axes[1, 1].legend()
# Facet Grid (고급)
# axes[1, 2]는 별도로 처리
axes[1, 2].remove()
plt.tight_layout()
plt.savefig('seaborn_plots.png', dpi=150, bbox_inches='tight')
plt.show()
실전: 학습 곡선, 혼동 행렬 시각화
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
from sklearn.metrics import confusion_matrix
def plot_learning_curve(train_losses, val_losses, train_accs, val_accs):
    """Plot loss and accuracy curves side by side.

    The best epoch (highest validation accuracy) is marked with a dotted
    vertical line on the accuracy panel. Returns the Figure.
    """
    fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(14, 5))
    epochs = range(1, len(train_losses) + 1)

    # Left panel: loss curves with the train/val gap shaded
    loss_ax.plot(epochs, train_losses, 'b-', label='훈련 손실', linewidth=2)
    loss_ax.plot(epochs, val_losses, 'r--', label='검증 손실', linewidth=2)
    loss_ax.fill_between(epochs, train_losses, val_losses, alpha=0.1, color='gray')
    loss_ax.set_xlabel('에폭')
    loss_ax.set_ylabel('손실')
    loss_ax.set_title('손실 곡선')
    loss_ax.legend()
    loss_ax.grid(True, alpha=0.3)

    # Right panel: accuracy curves with the best epoch marked
    acc_ax.plot(epochs, train_accs, 'b-', label='훈련 정확도', linewidth=2)
    acc_ax.plot(epochs, val_accs, 'r--', label='검증 정확도', linewidth=2)
    best_epoch = np.argmax(val_accs)
    acc_ax.axvline(x=best_epoch + 1, color='g', linestyle=':', label=f'최적 에폭 ({best_epoch+1})')
    acc_ax.set_xlabel('에폭')
    acc_ax.set_ylabel('정확도')
    acc_ax.set_title('정확도 곡선')
    acc_ax.legend()
    acc_ax.grid(True, alpha=0.3)

    plt.tight_layout()
    return fig
def plot_confusion_matrix(y_true, y_pred, class_names):
    """Draw the confusion matrix twice: raw counts and row-normalized rates.

    Rows are true labels, columns are predicted labels. Returns the Figure.
    """
    cm = confusion_matrix(y_true, y_pred)
    # Normalize each row so cells read as "fraction of the true class"
    cm_pct = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]

    fig, (count_ax, rate_ax) = plt.subplots(1, 2, figsize=(14, 5))

    # Left: absolute counts
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_names, yticklabels=class_names, ax=count_ax)
    count_ax.set_title('혼동 행렬 (절댓값)')
    count_ax.set_ylabel('실제 레이블')
    count_ax.set_xlabel('예측 레이블')

    # Right: per-class rates
    sns.heatmap(cm_pct, annot=True, fmt='.2%', cmap='Greens',
                xticklabels=class_names, yticklabels=class_names, ax=rate_ax)
    rate_ax.set_title('혼동 행렬 (비율)')
    rate_ax.set_ylabel('실제 레이블')
    rate_ax.set_xlabel('예측 레이블')

    plt.tight_layout()
    return fig
5. Scikit-learn으로 머신러닝
데이터 전처리
from sklearn.preprocessing import (
StandardScaler, MinMaxScaler, RobustScaler,
LabelEncoder, OneHotEncoder, OrdinalEncoder
)
import numpy as np
X = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]], dtype=float)
# StandardScaler: 평균 0, 표준편차 1
scaler = StandardScaler()
X_standard = scaler.fit_transform(X)
# MinMaxScaler: [0, 1] 범위
min_max = MinMaxScaler(feature_range=(0, 1))
X_minmax = min_max.fit_transform(X)
# RobustScaler: 이상치에 강건 (중앙값, IQR 사용)
robust = RobustScaler()
X_robust = robust.fit_transform(X)
# LabelEncoder: 범주형 → 숫자
le = LabelEncoder()
labels = ['cat', 'dog', 'bird', 'cat', 'dog']
encoded = le.fit_transform(labels) # [0, 2, 1, 0, 2]
decoded = le.inverse_transform(encoded)
# OneHotEncoder
ohe = OneHotEncoder(sparse_output=False, handle_unknown='ignore')
categories = np.array([['red'], ['green'], ['blue'], ['red']])
encoded_ohe = ohe.fit_transform(categories)
특성 선택 및 추출
from sklearn.decomposition import PCA
from sklearn.feature_selection import (
SelectKBest, f_classif, mutual_info_classif,
RFE, SelectFromModel
)
from sklearn.ensemble import RandomForestClassifier
import numpy as np
X = np.random.randn(200, 20)
y = (X[:, 0] + X[:, 1] + np.random.randn(200) * 0.1 > 0).astype(int)
# PCA (주성분 분석)
pca = PCA(n_components=10)
X_pca = pca.fit_transform(X)
print(f"설명 분산 비율: {pca.explained_variance_ratio_.sum():.2%}")
# 설명 분산 누적 플롯
import matplotlib.pyplot as plt
cumsum = np.cumsum(pca.explained_variance_ratio_)
plt.figure(figsize=(8, 4))
plt.plot(range(1, len(cumsum)+1), cumsum * 100)
plt.xlabel('주성분 수')
plt.ylabel('누적 설명 분산 (%)')
plt.axhline(y=95, color='r', linestyle='--', label='95%')
plt.legend()
plt.grid(True)
# SelectKBest
selector = SelectKBest(f_classif, k=5)
X_kbest = selector.fit_transform(X, y)
selected_features = selector.get_support(indices=True)
print(f"선택된 특성 인덱스: {selected_features}")
# RFE (재귀적 특성 제거)
rf = RandomForestClassifier(n_estimators=100, random_state=42)
rfe = RFE(estimator=rf, n_features_to_select=5)
X_rfe = rfe.fit_transform(X, y)
# 특성 중요도 기반 선택
rf.fit(X, y)
sfm = SelectFromModel(rf, threshold='mean')
X_sfm = sfm.fit_transform(X, y)
선형 모델
from sklearn.linear_model import (
LinearRegression, LogisticRegression,
Ridge, Lasso, ElasticNet
)
from sklearn.datasets import make_classification, make_regression
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score, accuracy_score
import numpy as np
# 회귀
X_reg, y_reg = make_regression(n_samples=500, n_features=20,
noise=0.1, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X_reg, y_reg, test_size=0.2)
# Linear Regression
lr = LinearRegression()
lr.fit(X_train, y_train)
y_pred = lr.predict(X_test)
print(f"Linear R2: {r2_score(y_test, y_pred):.4f}")
# Ridge (L2 정규화)
ridge = Ridge(alpha=1.0)
ridge.fit(X_train, y_train)
print(f"Ridge R2: {r2_score(y_test, ridge.predict(X_test)):.4f}")
# Lasso (L1 정규화, 특성 선택 효과)
lasso = Lasso(alpha=0.01)
lasso.fit(X_train, y_train)
print(f"Lasso R2: {r2_score(y_test, lasso.predict(X_test)):.4f}")
print(f"Non-zero coefficients: {np.sum(lasso.coef_ != 0)}")
# 분류
X_cls, y_cls = make_classification(n_samples=500, n_features=20,
n_classes=3, n_informative=10,
random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X_cls, y_cls, test_size=0.2)
logistic = LogisticRegression(C=1.0, max_iter=1000, solver='lbfgs',
multi_class='multinomial')
logistic.fit(X_train, y_train)
print(f"Logistic Accuracy: {accuracy_score(y_test, logistic.predict(X_test)):.4f}")
트리 기반 모델
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import (
RandomForestClassifier, GradientBoostingClassifier,
AdaBoostClassifier, ExtraTreesClassifier
)
import xgboost as xgb
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
import matplotlib.pyplot as plt
import numpy as np
X, y = make_classification(n_samples=1000, n_features=20,
n_classes=2, n_informative=10,
random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
models = {
'Decision Tree': DecisionTreeClassifier(max_depth=5, random_state=42),
'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42, n_jobs=-1),
'Gradient Boosting': GradientBoostingClassifier(n_estimators=100, random_state=42),
'XGBoost': xgb.XGBClassifier(n_estimators=100, random_state=42, eval_metric='logloss'),
}
results = {}
for name, model in models.items():
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
acc = (y_pred == y_test).mean()
results[name] = acc
print(f"{name}: {acc:.4f}")
# 특성 중요도 시각화
rf_model = models['Random Forest']
importances = rf_model.feature_importances_
indices = np.argsort(importances)[::-1][:10]
plt.figure(figsize=(10, 6))
plt.bar(range(10), importances[indices])
plt.xticks(range(10), [f'F{i}' for i in indices])
plt.title('상위 10개 특성 중요도')
plt.xlabel('특성')
plt.ylabel('중요도')
plt.tight_layout()
plt.show()
모델 평가와 교차 검증
from sklearn.model_selection import (
cross_val_score, StratifiedKFold,
GridSearchCV, RandomizedSearchCV
)
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, roc_auc_score
from sklearn.datasets import make_classification
import numpy as np
X, y = make_classification(n_samples=1000, n_features=20, random_state=42)
# K-Fold 교차 검증
rf = RandomForestClassifier(n_estimators=100, random_state=42)
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
scores = cross_val_score(rf, X, y, cv=cv, scoring='accuracy', n_jobs=-1)
print(f"CV Accuracy: {scores.mean():.4f} (+/- {scores.std() * 2:.4f})")
# GridSearchCV
param_grid = {
'n_estimators': [50, 100, 200],
'max_depth': [None, 5, 10],
'min_samples_split': [2, 5, 10]
}
grid_search = GridSearchCV(
RandomForestClassifier(random_state=42),
param_grid,
cv=5,
scoring='accuracy',
n_jobs=-1,
verbose=1
)
grid_search.fit(X, y)
print(f"최적 파라미터: {grid_search.best_params_}")
print(f"최고 CV 점수: {grid_search.best_score_:.4f}")
# 최종 모델로 테스트셋 평가
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
best_model = grid_search.best_estimator_
best_model.fit(X_train, y_train)
y_pred = best_model.predict(X_test)
print(classification_report(y_test, y_pred))
파이프라인
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import RandomForestClassifier
import pandas as pd
import numpy as np
# Example data
np.random.seed(42)
n = 500
df = pd.DataFrame({
    'age': np.random.randint(18, 70, n).astype(float),
    'income': np.random.randint(20000, 100000, n).astype(float),
    'education': np.random.choice(['High School', 'Bachelor', 'Master', 'PhD'], n),
    'city': np.random.choice(['Seoul', 'Busan', 'Incheon', 'Daegu'], n),
    'target': np.random.randint(0, 2, n)
})
# Inject some missing values (indices are drawn with replacement, so the
# number of distinct missing cells may be slightly lower than requested)
df.loc[np.random.choice(n, 50), 'age'] = np.nan
df.loc[np.random.choice(n, 30), 'income'] = np.nan
df.loc[np.random.choice(n, 20), 'education'] = None
X = df.drop('target', axis=1)
y = df['target']
# Split columns into numeric and categorical groups
numeric_features = ['age', 'income']
categorical_features = ['education', 'city']
# Numeric preprocessing: impute with median, then standardize
numeric_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])
# Categorical preprocessing: impute with mode, then one-hot encode
categorical_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])
# Route each column group through its transformer
preprocessor = ColumnTransformer(transformers=[
    ('num', numeric_transformer, numeric_features),
    ('cat', categorical_transformer, categorical_features)
])
# Full pipeline: fitting preprocessing inside CV prevents train/test leakage
full_pipeline = Pipeline(steps=[
    ('preprocessor', preprocessor),
    ('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
])
# Train and evaluate with cross-validation
from sklearn.model_selection import cross_val_score
scores = cross_val_score(full_pipeline, X, y, cv=5, scoring='accuracy')
print(f"파이프라인 CV 정확도: {scores.mean():.4f} ± {scores.std():.4f}")
실전: 타이타닉 생존 예측
import pandas as pd
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import cross_val_score, GridSearchCV
from sklearn.metrics import classification_report, roc_auc_score
import matplotlib.pyplot as plt
# Load data (in practice: pd.read_csv('titanic.csv'))
# Synthetic example data is generated here instead
np.random.seed(42)
n = 891
df = pd.DataFrame({
    'Pclass': np.random.choice([1, 2, 3], n, p=[0.24, 0.21, 0.55]),
    'Sex': np.random.choice(['male', 'female'], n, p=[0.65, 0.35]),
    'Age': np.random.uniform(1, 80, n),
    'SibSp': np.random.randint(0, 5, n),
    'Parch': np.random.randint(0, 5, n),
    'Fare': np.random.exponential(50, n),
    'Embarked': np.random.choice(['S', 'C', 'Q'], n, p=[0.72, 0.19, 0.09]),
    'Survived': np.random.randint(0, 2, n)
})
# Inject missing ages (the real dataset has 177 missing Age values)
df.loc[np.random.choice(n, 177), 'Age'] = np.nan
# Feature engineering
df['FamilySize'] = df['SibSp'] + df['Parch'] + 1
df['IsAlone'] = (df['FamilySize'] == 1).astype(int)
df['Sex_binary'] = (df['Sex'] == 'male').astype(int)
df['Fare_log'] = np.log1p(df['Fare'])  # log(1 + x) tames the fare skew
feature_cols = ['Pclass', 'Sex_binary', 'Age', 'FamilySize', 'IsAlone',
                'Fare_log', 'Embarked']
X = df[feature_cols]
y = df['Survived']
# Preprocessing: impute+scale numerics, impute+one-hot categoricals,
# and pass already-binary columns through untouched
numeric_features = ['Age', 'FamilySize', 'Fare_log', 'Pclass']
categorical_features = ['Embarked']
binary_features = ['Sex_binary', 'IsAlone']
preprocessor = ColumnTransformer(transformers=[
    ('num', Pipeline([
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler())
    ]), numeric_features),
    ('cat', Pipeline([
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
    ]), categorical_features),
    ('bin', 'passthrough', binary_features)
])
# Model pipeline
pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', GradientBoostingClassifier(random_state=42))
])
# Hyperparameter search; pipeline steps are addressed as 'step__param'
param_grid = {
    'classifier__n_estimators': [100, 200],
    'classifier__max_depth': [3, 5],
    'classifier__learning_rate': [0.05, 0.1]
}
grid_search = GridSearchCV(pipeline, param_grid, cv=5, scoring='roc_auc', n_jobs=-1)
grid_search.fit(X, y)
print(f"최적 AUC: {grid_search.best_score_:.4f}")
print(f"최적 파라미터: {grid_search.best_params_}")
6. Python 성능 최적화
리스트 컴프리헨션 vs map vs for
import time
import numpy as np
n = 1_000_000
data = list(range(n))
# Plain for loop with append
start = time.time()
result_for = []
for x in data:
    result_for.append(x ** 2)
print(f"For loop: {time.time() - start:.4f}s")
# List comprehension (usually faster than append in a loop)
start = time.time()
result_lc = [x ** 2 for x in data]
print(f"List comprehension: {time.time() - start:.4f}s")
# map with a lambda (lazy iterator; list() forces evaluation)
start = time.time()
result_map = list(map(lambda x: x ** 2, data))
print(f"Map: {time.time() - start:.4f}s")
# NumPy vectorization (single C-level loop, typically fastest by far)
arr = np.array(data)
start = time.time()
result_np = arr ** 2
print(f"NumPy: {time.time() - start:.4f}s")
# Dict / set comprehensions
squares_dict = {x: x**2 for x in range(10)}
even_set = {x for x in range(20) if x % 2 == 0}
제너레이터
import sys
# Memory comparison: list vs generator expression
list_comp = [x**2 for x in range(1_000_000)]
gen_expr = (x**2 for x in range(1_000_000))
# getsizeof is shallow: the generator stores only its frame, not the values
print(f"List size: {sys.getsizeof(list_comp):,} bytes") # ~8MB
print(f"Generator size: {sys.getsizeof(gen_expr)} bytes") # ~120 bytes
# 제너레이터 함수
def infinite_data_loader(dataset, batch_size=32):
    """Endlessly yield shuffled mini-batches from *dataset*.

    Each pass over the data reshuffles the index order, then slices it
    into consecutive chunks of at most ``batch_size`` indices. The
    dataset must support fancy (array) indexing, e.g. a NumPy array.
    """
    size = len(dataset)
    while True:
        order = np.random.permutation(size)
        for start in range(0, size, batch_size):
            yield dataset[order[start:start + batch_size]]
# 대용량 파일 처리
def read_large_csv(filepath, chunk_size=1000):
    """Stream a large CSV file as DataFrame chunks of *chunk_size* rows."""
    import pandas as pd
    reader = pd.read_csv(filepath, chunksize=chunk_size)
    yield from reader
# yield from
def flatten(nested):
    """Lazily yield the leaves of an arbitrarily nested list structure.

    Only ``list`` instances are descended into; every other element
    (including tuples and strings) is yielded as-is.
    """
    for element in nested:
        if not isinstance(element, list):
            yield element
        else:
            yield from flatten(element)
print(list(flatten([[1, [2, 3]], [4, [5, [6]]]])))
# [1, 2, 3, 4, 5, 6]
병렬 처리
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import multiprocessing
import time
def cpu_intensive_task(n):
    """CPU-bound task: sum of squares up to n."""
    return sum(i**2 for i in range(n))
def io_bound_task(url):
    """I/O-bound task (simulated with a short sleep)."""
    import time
    time.sleep(0.1)
    return f"Fetched: {url}"
# ThreadPoolExecutor: suited to I/O-bound work (GIL is released while blocking)
urls = [f"https://example.com/data/{i}" for i in range(20)]
start = time.time()
with ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(io_bound_task, urls))
print(f"ThreadPool 시간: {time.time() - start:.2f}s")
# ProcessPoolExecutor: suited to CPU-bound work (true multi-core parallelism)
# NOTE(review): on spawn-based platforms (Windows/macOS) this should also run
# under the `if __name__ == '__main__'` guard used below — confirm target OS.
numbers = [1_000_000] * 8
start = time.time()
with ProcessPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(cpu_intensive_task, numbers))
print(f"ProcessPool 시간: {time.time() - start:.2f}s")
# multiprocessing Pool (guarded so child processes don't re-run this block)
def worker(args):
    """Scale an array by its integer label; args is a (data, label) pair."""
    data, label = args
    return data * label
if __name__ == '__main__':
    with multiprocessing.Pool(processes=4) as pool:
        # NOTE(review): relies on `np` imported by an earlier snippet
        tasks = [(np.random.randn(100), i) for i in range(100)]
        results = pool.map(worker, tasks)
Numba로 JIT 컴파일
from numba import jit, njit, prange
import numpy as np
import time
# JIT compilation (compiled on first call, fast afterwards)
@njit(parallel=True)
def fast_matrix_norm(A):
    """Frobenius norm of a 2-D array, row-parallelized with Numba.

    Only the outermost prange loop is parallelized by Numba, so the inner
    loop uses a plain range (a nested prange is treated as a sequential
    range anyway). The scalar `result` accumulation is recognized by
    Numba's auto-parallelizer as a reduction.
    """
    n, m = A.shape
    result = 0.0
    for i in prange(n):
        for j in range(m):
            result += A[i, j] ** 2
    return result ** 0.5
A = np.random.randn(1000, 1000)
# Warm-up (the first call triggers JIT compilation)
_ = fast_matrix_norm(A)
# Actual benchmark
start = time.time()
for _ in range(10):
    result = fast_matrix_norm(A)
print(f"Numba: {time.time() - start:.4f}s")
start = time.time()
for _ in range(10):
    result = np.linalg.norm(A)
print(f"NumPy: {time.time() - start:.4f}s")
7. AI/ML 유틸리티 라이브러리
tqdm - 진행 바
from tqdm import tqdm, trange
import time
# Basic usage
for i in tqdm(range(100)):
    time.sleep(0.01)
# Custom description and unit label
items = list(range(50))
for item in tqdm(items, desc='처리 중', unit='샘플'):
    pass
# Nested progress bars (leave=False clears the inner bar when it finishes)
for epoch in trange(10, desc='에폭'):
    for batch in trange(100, desc='배치', leave=False):
        pass
# Manual updates with a postfix of live metrics
with tqdm(total=100, desc='학습') as pbar:
    for i in range(10):
        pbar.update(10)
        pbar.set_postfix({'loss': 0.5 - i * 0.04, 'acc': 0.7 + i * 0.02})
# Pandas integration: progress_apply shows a bar during apply
import pandas as pd
tqdm.pandas()
df = pd.DataFrame({'x': range(1000)})
df['x_squared'] = df['x'].progress_apply(lambda x: x ** 2)
Weights & Biases (wandb) - 실험 추적
import wandb
import numpy as np
# Initialize a run with its hyperparameter config
wandb.init(
    project="ml-experiment",
    name="run-001",
    config={
        "learning_rate": 0.001,
        "batch_size": 32,
        "epochs": 100,
        "model": "ResNet50",
        "optimizer": "AdamW"
    }
)
# Log metrics from the training loop (values are simulated here)
for epoch in range(100):
    train_loss = 1.0 - epoch * 0.009 + np.random.normal(0, 0.01)
    val_loss = 1.0 - epoch * 0.008 + np.random.normal(0, 0.02)
    train_acc = epoch * 0.009 + np.random.normal(0, 0.01)
    val_acc = epoch * 0.008 + np.random.normal(0, 0.02)
    wandb.log({
        "epoch": epoch,
        "train/loss": train_loss,
        "val/loss": val_loss,
        "train/acc": train_acc,
        "val/acc": val_acc,
        "learning_rate": 0.001 * (0.95 ** epoch)
    })
# Save model artifacts
# wandb.save('model.pt')
wandb.finish()
Hydra - 설정 관리
# config/config.yaml
# model:
# type: resnet50
# pretrained: true
# training:
# epochs: 100
# batch_size: 32
# learning_rate: 0.001
# data:
# path: /data/imagenet
# num_workers: 4
from hydra import initialize, compose
from omegaconf import DictConfig, OmegaConf
import hydra
@hydra.main(config_path="config", config_name="config", version_base="1.3")
def train(cfg: DictConfig) -> None:
    """Training entry point; Hydra injects the composed config as `cfg`."""
    print(OmegaConf.to_yaml(cfg))
    # Access config values via attribute-style dotted paths
    model_type = cfg.model.type
    lr = cfg.training.learning_rate
    epochs = cfg.training.epochs
    print(f"모델: {model_type}, LR: {lr}, 에폭: {epochs}")
# Override from the command line:
# python train.py model.type=vgg16 training.learning_rate=0.0001
# NOTE(review): call train() under `if __name__ == '__main__':` to actually run.
pytest - 테스트
# tests/test_preprocessing.py
import pytest
import numpy as np
import pandas as pd
def normalize(x):
    """Standardize data to zero mean and unit standard deviation."""
    centered = x - x.mean()
    return centered / x.std()
def process_dataframe(df):
    """Return a copy of *df* with NaN rows dropped and 'value' standardized."""
    df = df.copy()
    cleaned = df.dropna()
    cleaned['value'] = normalize(cleaned['value'])
    return cleaned
class TestNormalize:
    """Unit tests for normalize(): zero mean, unit std, shape preserved."""
    def test_mean_zero(self):
        x = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
        result = normalize(x)
        assert abs(result.mean()) < 1e-10
    def test_std_one(self):
        x = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
        result = normalize(x)
        assert abs(result.std() - 1.0) < 1e-10
    def test_shape_preserved(self):
        # Statistics are computed over the whole array, so the output
        # must keep the input's 2-D shape
        x = np.random.randn(10, 5)
        result = normalize(x)
        assert result.shape == x.shape
@pytest.fixture
def sample_df():
    """Small DataFrame with one NaN in 'value' for pipeline tests."""
    return pd.DataFrame({
        'value': [1.0, 2.0, np.nan, 4.0, 5.0],
        'label': ['a', 'b', 'c', 'd', 'e']
    })
def test_process_dataframe_removes_nan(sample_df):
    result = process_dataframe(sample_df)
    assert result.isnull().sum().sum() == 0
def test_process_dataframe_normalizes(sample_df):
    result = process_dataframe(sample_df)
    assert abs(result['value'].mean()) < 1e-10
# Run: pytest tests/ -v --cov   (coverage reporting requires the pytest-cov plugin)
마무리
이 가이드에서는 AI/ML 개발에 필요한 Python 생태계의 핵심을 다뤘습니다.
- 환경 설정: venv, conda, poetry로 프로젝트 격리
- NumPy: 벡터화 연산으로 고성능 수치 계산
- Pandas: 데이터 전처리와 분석 파이프라인 구축
- Matplotlib/Seaborn: 풍부한 시각화로 인사이트 발견
- Scikit-learn: 전처리부터 모델 평가까지 완전한 ML 워크플로
- 성능 최적화: 제너레이터, 병렬 처리, Numba JIT
- 유틸리티: tqdm, wandb, hydra, pytest
실전 ML 프로젝트에서는 이 모든 도구를 조합하여 데이터 로드 → 전처리 → 특성 엔지니어링 → 모델 학습 → 평가 → 배포의 파이프라인을 구축합니다. 각 도구의 공식 문서를 참고하여 더 깊이 학습하시길 권장합니다.
참고 자료
Python Complete Guide for AI/ML: Master NumPy, Pandas, Matplotlib, and Scikit-learn
Python Complete Guide for AI/ML
Python is the standard language for AI and machine learning. Its clean syntax, vast library ecosystem, and active community have made it the go-to choice for both researchers and engineers. This guide covers everything you need to master the core Python libraries for AI/ML development.
1. Setting Up Your Python AI/ML Environment
Choosing a Python Version
For AI/ML work, Python 3.10 or later is recommended. Python 3.10+ offers structural pattern matching, clearer error messages, and improved type hints. As of 2026, Python 3.12 is stable and compatible with most ML libraries.
# Check Python version
python --version
python3 --version
# Install a specific version with pyenv
pyenv install 3.12.0
pyenv global 3.12.0
Setting Up Virtual Environments
Virtual environments isolate dependencies per project.
venv (Standard Library)
# Create a virtual environment
python -m venv ml_env
# Activate (Linux/Mac)
source ml_env/bin/activate
# Activate (Windows)
ml_env\Scripts\activate
# Deactivate
deactivate
conda (Anaconda/Miniconda)
# Create environment
conda create -n ml_env python=3.12
# Activate
conda activate ml_env
# Install packages
conda install numpy pandas scikit-learn matplotlib
# List environments
conda env list
# Export environment
conda env export > environment.yml
# Restore environment
conda env create -f environment.yml
Poetry (Advanced Dependency Management)
# Install Poetry
curl -sSL https://install.python-poetry.org | python3 -
# Initialize a project
poetry new ml_project
cd ml_project
# Add packages
poetry add numpy pandas scikit-learn torch
# Add dev dependencies
poetry add --dev pytest black flake8
# Run inside the environment
poetry run python train.py
Jupyter Notebook/Lab Setup
# Install JupyterLab
pip install jupyterlab
# Register kernel
python -m ipykernel install --user --name=ml_env --display-name "ML Environment"
# Launch JupyterLab
jupyter lab
# Install useful extensions
pip install jupyterlab-git
pip install nbformat
Jupyter config file (~/.jupyter/jupyter_lab_config.py)
c.ServerApp.open_browser = True
c.ServerApp.port = 8888
c.ServerApp.ip = '0.0.0.0'
GPU Python Environment (CUDA, cuDNN)
# Check CUDA version
nvidia-smi
nvcc --version
# Install PyTorch with CUDA (CUDA 12.1)
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121
# Install TensorFlow with GPU
pip install tensorflow[and-cuda]
# Verify GPU availability (PyTorch)
python -c "import torch; print(torch.cuda.is_available())"
# Check cuDNN
python -c "import torch; print(torch.backends.cudnn.version())"
Essential Package List
# requirements.txt
numpy>=1.24.0
pandas>=2.0.0
matplotlib>=3.7.0
seaborn>=0.12.0
scikit-learn>=1.3.0
scipy>=1.11.0
torch>=2.0.0
torchvision>=0.15.0
tensorflow>=2.13.0
xgboost>=1.7.0
lightgbm>=4.0.0
optuna>=3.3.0
wandb>=0.15.0
tqdm>=4.65.0
jupyterlab>=4.0.0
black>=23.0.0
flake8>=6.0.0
pytest>=7.4.0
# Install all at once
pip install -r requirements.txt
2. Mastering NumPy
NumPy (Numerical Python) is the foundation of scientific computing in Python. It provides multidimensional arrays and mathematical functions, and most ML libraries use NumPy internally.
Creating ndarrays
import numpy as np
# Basic array creation
arr1 = np.array([1, 2, 3, 4, 5])
arr2 = np.array([[1, 2, 3], [4, 5, 6]])
print(arr1.shape) # (5,)
print(arr2.shape) # (2, 3)
print(arr2.dtype) # int64 (platform-dependent; int32 on Windows)
print(arr2.ndim) # 2
print(arr2.size) # 6
# Special arrays
zeros = np.zeros((3, 4)) # all zeros
ones = np.ones((2, 3, 4)) # all ones
full = np.full((3, 3), 7) # all sevens
eye = np.eye(4) # identity matrix
empty = np.empty((2, 3)) # uninitialized (contents are arbitrary)
# Range arrays
arange = np.arange(0, 10, 2) # [0, 2, 4, 6, 8]
linspace = np.linspace(0, 1, 5) # [0.0, 0.25, 0.5, 0.75, 1.0]
logspace = np.logspace(0, 3, 4) # [1, 10, 100, 1000]
# Random arrays (legacy global-state API)
np.random.seed(42)
rand_uniform = np.random.rand(3, 4) # uniform [0, 1)
rand_normal = np.random.randn(3, 4) # standard normal
rand_int = np.random.randint(0, 10, (3, 4)) # random integers
# Modern random API (recommended): independent Generator instance
rng = np.random.default_rng(42)
samples = rng.normal(loc=0, scale=1, size=(100, 3))
Basic Operations and Broadcasting
import numpy as np
a = np.array([[1, 2, 3], [4, 5, 6]])
b = np.array([[7, 8, 9], [10, 11, 12]])
# Element-wise arithmetic
print(a + b) # element-wise addition
print(a - b) # element-wise subtraction
print(a * b) # element-wise multiplication
print(a / b) # element-wise division
print(a ** 2) # element-wise squaring
print(a % 2) # element-wise modulo
# Broadcasting - operations on arrays of different shapes
x = np.array([[1], [2], [3]]) # shape: (3, 1)
y = np.array([10, 20, 30]) # shape: (3,) treated as (1, 3)
# Broadcasting result: (3, 3)
result = x + y
print(result)
# [[11, 21, 31],
#  [12, 22, 32],
#  [13, 23, 33]]
# Practical broadcasting: feature-wise standardization
data = np.random.randn(100, 10) # 100 samples, 10 features
mean = data.mean(axis=0) # per-feature mean (shape: 10,)
std = data.std(axis=0) # per-feature std (shape: 10,)
normalized = (data - mean) / std # (100,10) - (10,) broadcasts over rows
print(normalized.mean(axis=0).round(10)) # approximately 0
print(normalized.std(axis=0).round(10)) # approximately 1
Indexing, Slicing, and Boolean Indexing
import numpy as np
arr = np.arange(24).reshape(4, 6)
print(arr)
# [[ 0 1 2 3 4 5]
#  [ 6 7 8 9 10 11]
#  [12 13 14 15 16 17]
#  [18 19 20 21 22 23]]
# Basic indexing
print(arr[0, 0]) # 0
print(arr[3, 5]) # 23
print(arr[-1, -1]) # 23 (negative indices count from the end)
# Slicing
print(arr[1:3, 2:5]) # rows 1-2, cols 2-4
print(arr[:, 0]) # all rows, column 0
print(arr[::2, ::2]) # every 2nd row and column
# Fancy indexing: picks element pairs, not a sub-grid
rows = np.array([0, 2])
cols = np.array([1, 4])
print(arr[rows, cols]) # [arr[0,1], arr[2,4]] = [1, 16]
# Boolean indexing (masking)
mask = arr > 12
print(arr[mask]) # elements greater than 12 (flattened 1-D result)
# Filtering with conditions
data = np.array([1, -2, 3, -4, 5, -6])
positive = data[data > 0] # [1, 3, 5]
# np.where - conditional selection
result = np.where(data > 0, data, 0) # keep positives, zero out negatives
print(result) # [1, 0, 3, 0, 5, 0]
# np.where with a single argument returns matching indices
indices = np.where(data > 0)
print(indices) # (array([0, 2, 4]),)
Shape Transformations
import numpy as np
arr = np.arange(12)
# reshape: total element count must stay the same
a = arr.reshape(3, 4)
b = arr.reshape(2, 2, 3)
c = arr.reshape(-1, 4) # -1 infers the size: gives (3, 4)
# flatten vs ravel
flat1 = a.flatten() # always returns a copy
flat2 = a.ravel() # returns a view when possible (more memory-efficient)
# transpose
mat = np.random.randn(3, 4)
transposed = mat.T
transposed2 = mat.transpose()
transposed3 = np.transpose(mat, (1, 0))
# 3D transpose: permute axes by position
tensor = np.random.randn(2, 3, 4)
# batch, channels, spatial -> batch, spatial, channels
reordered = tensor.transpose(0, 2, 1) # (2, 4, 3)
# squeeze and expand_dims
x = np.array([[[1, 2, 3]]]) # shape: (1, 1, 3)
squeezed = np.squeeze(x) # (3,) - drops all length-1 axes
expanded = np.expand_dims(squeezed, axis=0) # (1, 3)
# Stacking arrays
a = np.array([[1, 2], [3, 4]])
b = np.array([[5, 6], [7, 8]])
hstack = np.hstack([a, b]) # horizontal stack (2, 4)
vstack = np.vstack([a, b]) # vertical stack (4, 2)
Mathematical Functions
import numpy as np
x = np.array([0, np.pi/6, np.pi/4, np.pi/3, np.pi/2])
# Trigonometry (arguments in radians)
sin_x = np.sin(x)
cos_x = np.cos(x)
tan_x = np.tan(x)
# Exponential and logarithm
exp_x = np.exp(x) # e^x
log_x = np.log(x + 1) # natural log (ln); +1 avoids log(0) at x=0
log2_x = np.log2(x + 1) # base-2 log
log10_x = np.log10(x + 1)
# Power and root
sqrt_x = np.sqrt(x)
square_x = np.square(x) # x^2
power_x = np.power(x, 3) # x^3
# Sigmoid and softmax
def sigmoid(x):
    """Element-wise logistic function 1 / (1 + exp(-x))."""
    denom = 1 + np.exp(-x)
    return 1 / denom
def softmax(x):
    """Softmax over all elements; the max is subtracted first so that
    exp() never overflows (the result is mathematically unchanged)."""
    shifted = x - x.max()
    exps = np.exp(shifted)
    return exps / exps.sum()
z = np.array([1.0, 2.0, 3.0])
print(sigmoid(z)) # [0.731, 0.880, 0.952]
print(softmax(z)) # [0.090, 0.245, 0.665]
Linear Algebra
import numpy as np
A = np.array([[1, 2], [3, 4]])
B = np.array([[5, 6], [7, 8]])
# Matrix multiplication
C = np.dot(A, B) # classic approach
C = A @ B # preferred in Python 3.5+
C = np.matmul(A, B) # same as np.dot for 2D
# Batched matrix multiplication (3D+): @ broadcasts over leading axes
batch_A = np.random.randn(32, 3, 4)
batch_B = np.random.randn(32, 4, 5)
batch_C = batch_A @ batch_B # (32, 3, 5)
# Linear algebra functions
det = np.linalg.det(A) # determinant
inv = np.linalg.inv(A) # inverse
rank = np.linalg.matrix_rank(A) # rank
trace = np.trace(A) # trace (sum of the diagonal)
# Eigendecomposition
eigenvalues, eigenvectors = np.linalg.eig(A)
# Singular Value Decomposition (SVD)
U, S, Vt = np.linalg.svd(A)
# Solve linear system Ax = b (preferred over inv(A) @ b: faster, more stable)
b = np.array([5, 6])
x = np.linalg.solve(A, b)
# Norms
v = np.array([3, 4])
l1_norm = np.linalg.norm(v, ord=1) # L1 norm: 7
l2_norm = np.linalg.norm(v, ord=2) # L2 norm: 5
inf_norm = np.linalg.norm(v, ord=np.inf) # max norm: 4
Vectorized Operations vs For Loops
import numpy as np
import time
n = 1_000_000
a = np.random.randn(n)
b = np.random.randn(n)
# For loop (pure Python: one interpreter round-trip per element)
start = time.time()
result_loop = []
for i in range(n):
    result_loop.append(a[i] * b[i])
loop_time = time.time() - start
print(f"For loop: {loop_time:.4f}s")
# Vectorized (a single C-level loop)
start = time.time()
result_vec = a * b
vec_time = time.time() - start
print(f"Vectorized: {vec_time:.4f}s")
print(f"Speedup: {loop_time / vec_time:.1f}x")
# Typically 100-1000x faster
Practical: Neural Network Forward Pass with NumPy
import numpy as np
class SimpleNeuralNetwork:
    """Two-layer neural network implemented with NumPy only.

    Architecture: input -> ReLU hidden layer -> softmax output,
    trained with full-batch gradient descent on cross-entropy loss.
    Labels are integer class indices, not one-hot vectors.
    """
    def __init__(self, input_size, hidden_size, output_size, seed=42):
        np.random.seed(seed)
        # He initialization: variance scaled by 2/fan_in, suited to ReLU
        self.W1 = np.random.randn(input_size, hidden_size) * np.sqrt(2.0 / input_size)
        self.b1 = np.zeros((1, hidden_size))
        self.W2 = np.random.randn(hidden_size, output_size) * np.sqrt(2.0 / hidden_size)
        self.b2 = np.zeros((1, output_size))
    def relu(self, z):
        """Element-wise max(0, z)."""
        return np.maximum(0, z)
    def relu_derivative(self, z):
        """1 where z > 0, else 0."""
        return (z > 0).astype(float)
    def softmax(self, z):
        """Row-wise softmax; the row max is subtracted for numerical stability."""
        exp_z = np.exp(z - z.max(axis=1, keepdims=True))
        return exp_z / exp_z.sum(axis=1, keepdims=True)
    def forward(self, X):
        """Forward pass; caches Z1/A1/Z2/A2 for the backward pass."""
        # Layer 1
        self.Z1 = X @ self.W1 + self.b1
        self.A1 = self.relu(self.Z1)
        # Layer 2
        self.Z2 = self.A1 @ self.W2 + self.b2
        self.A2 = self.softmax(self.Z2)
        return self.A2
    def cross_entropy_loss(self, y_pred, y_true):
        """Mean negative log-likelihood of the true integer labels."""
        m = y_true.shape[0]
        # 1e-8 guards against log(0)
        log_probs = -np.log(y_pred[range(m), y_true] + 1e-8)
        return log_probs.mean()
    def backward(self, X, y_true, learning_rate=0.01):
        """Backpropagate gradients and apply one gradient-descent step in place."""
        m = X.shape[0]
        # Output layer: softmax + cross-entropy gradient simplifies to (p - y)/m
        dZ2 = self.A2.copy()
        dZ2[range(m), y_true] -= 1
        dZ2 /= m
        dW2 = self.A1.T @ dZ2
        db2 = dZ2.sum(axis=0, keepdims=True)
        # Hidden layer gradient
        dA1 = dZ2 @ self.W2.T
        dZ1 = dA1 * self.relu_derivative(self.Z1)
        dW1 = X.T @ dZ1
        db1 = dZ1.sum(axis=0, keepdims=True)
        # Weight update
        self.W1 -= learning_rate * dW1
        self.b1 -= learning_rate * db1
        self.W2 -= learning_rate * dW2
        self.b2 -= learning_rate * db2
    def train(self, X, y, epochs=100, learning_rate=0.01):
        """Full-batch training loop; returns the per-epoch loss history."""
        losses = []
        for epoch in range(epochs):
            y_pred = self.forward(X)
            loss = self.cross_entropy_loss(y_pred, y)
            losses.append(loss)
            self.backward(X, y, learning_rate)
            if epoch % 10 == 0:
                acc = (y_pred.argmax(axis=1) == y).mean()
                print(f"Epoch {epoch:3d}: Loss={loss:.4f}, Acc={acc:.4f}")
        return losses
# Smoke test on a synthetic 3-class problem
from sklearn.datasets import make_classification
X, y = make_classification(n_samples=1000, n_features=20,
                           n_classes=3, n_informative=15,
                           random_state=42)
nn = SimpleNeuralNetwork(input_size=20, hidden_size=64, output_size=3)
losses = nn.train(X, y, epochs=50, learning_rate=0.1)
3. Mastering Pandas
Pandas is the core library for working with tabular data. It provides DataFrame and Series data structures and supports every step of data cleaning, transformation, and analysis.
Series and DataFrame
import pandas as pd
import numpy as np
# Creating a Series (1-D labeled array)
s1 = pd.Series([1, 2, 3, 4, 5])
s2 = pd.Series([10, 20, 30], index=['a', 'b', 'c'])
s3 = pd.Series({'x': 100, 'y': 200, 'z': 300})
print(s2['a']) # 10
print(s2[['a', 'c']]) # a=10, c=30 (a list of labels returns a sub-Series)
# Creating a DataFrame (2-D labeled table)
data = {
    'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
    'age': [25, 30, 35, 28, 22],
    'score': [88.5, 92.3, 78.1, 95.7, 83.2],
    'passed': [True, True, False, True, True]
}
df = pd.DataFrame(data)
print(df.head())
print(df.tail(3))
print(df.info()) # dtypes, non-null counts, memory usage
print(df.describe()) # summary statistics of numeric columns
print(df.dtypes)
print(df.shape) # (5, 4)
Reading and Writing Data
import pandas as pd
# CSV
df_csv = pd.read_csv('data.csv',
                     sep=',',
                     header=0,
                     index_col=0,
                     parse_dates=['date'],
                     encoding='utf-8',
                     na_values=['N/A', 'null', ''])
df_csv.to_csv('output.csv', index=False, encoding='utf-8')
# Excel
df_excel = pd.read_excel('data.xlsx', sheet_name='Sheet1', header=0)
df_excel.to_excel('output.xlsx', sheet_name='Result', index=False)
# JSON
df_json = pd.read_json('data.json', orient='records')
df_json.to_json('output.json', orient='records', indent=2)
# Parquet (high-performance columnar format)
# NOTE(review): `df` here refers to a frame created by an earlier snippet
df.to_parquet('data.parquet', engine='pyarrow', compression='snappy')
df_parquet = pd.read_parquet('data.parquet')
# SQL (SQLite example)
import sqlite3
conn = sqlite3.connect('database.db')
df_sql = pd.read_sql_query("SELECT * FROM users WHERE age > 25", conn)
df.to_sql('new_table', conn, if_exists='replace', index=False)
Indexing with loc and iloc
import pandas as pd
import numpy as np
df = pd.DataFrame({
    'A': range(10),
    'B': range(10, 20),
    'C': range(20, 30)
}, index=[f'row{i}' for i in range(10)])
# loc: label-based indexing
print(df.loc['row0', 'A']) # single value
print(df.loc['row0':'row3', 'A':'B']) # range (end inclusive)
print(df.loc[['row1', 'row5'], 'C']) # list
# iloc: position-based indexing
print(df.iloc[0, 0]) # row 0, col 0
print(df.iloc[0:4, 0:2]) # range (end exclusive, like Python slices)
print(df.iloc[[1, 5], 2]) # list
# Condition-based selection
mask = df['A'] > 5
filtered = df[mask]
filtered2 = df[df['B'].between(12, 17)]
filtered3 = df.query('A > 5 and B < 18') # expression-string alternative
Handling Missing Values
import pandas as pd
import numpy as np
# Create data with missing values
df = pd.DataFrame({
    'age': [25, np.nan, 35, np.nan, 22],
    'income': [50000, 60000, np.nan, 80000, np.nan],
    'city': ['Seoul', 'Busan', None, 'Incheon', 'Seoul'],
    'score': [88.5, 92.3, 78.1, np.nan, 83.2]
})
# Inspect missing values
print(df.isnull().sum()) # count per column
print(df.isnull().sum() / len(df) * 100) # missing percentage
# Drop missing values
df_dropped_rows = df.dropna() # drop rows with any NaN
df_dropped_cols = df.dropna(axis=1) # drop columns with any NaN
df_thresh = df.dropna(thresh=3) # keep rows with at least 3 non-NaN
# Fill missing values
df_filled_0 = df.fillna(0)
# numeric_only=True: without it, df.mean() on a frame that contains an
# object column ('city') raises TypeError in pandas 2.x
df_filled_mean = df.fillna(df.mean(numeric_only=True))
df_filled_dict = df.fillna({
    'age': df['age'].mean(),
    'income': df['income'].median(),
    'city': 'Unknown',
    'score': df['score'].mean()
})
# Forward/backward fill; fillna(method=...) is deprecated since pandas 2.1
df_ffill = df.ffill()
df_bfill = df.bfill()
# Interpolation (numeric columns only: interpolating object dtype is deprecated)
df_interpolated = df.select_dtypes(include='number').interpolate(method='linear')
# Smart handling pattern: drop mostly-empty columns, mode-fill categoricals,
# median-fill numerics
for col in df.columns:
    missing_pct = df[col].isnull().mean()
    if missing_pct > 0.5:
        df.drop(columns=[col], inplace=True)
    elif df[col].dtype == 'object':
        # Assign back instead of fillna(..., inplace=True) on a column
        # selection: chained-assignment inplace is deprecated in pandas 2.x
        # and unreliable under copy-on-write
        df[col] = df[col].fillna(df[col].mode()[0])
    else:
        df[col] = df[col].fillna(df[col].median())
Data Transformation
import pandas as pd
import numpy as np
df = pd.DataFrame({
    'text': ['hello world', 'PYTHON IS GREAT', 'data science'],
    'value': [1, 2, 3],
    'category': ['A', 'B', 'A']
})
# apply: apply a function to each element of a Series
df['text_upper'] = df['text'].apply(str.upper)
df['text_length'] = df['text'].apply(len)
# Complex function
def process_text(text):
    """Lowercase the text, then capitalize each word."""
    return ' '.join(word.capitalize() for word in text.lower().split())
df['text_processed'] = df['text'].apply(process_text)
# Multiple columns simultaneously (axis=1: row-wise; slower than vectorized ops)
def feature_engineer(row):
    """Derive two new features from one row."""
    return pd.Series({
        'value_squared': row['value'] ** 2,
        'category_is_A': int(row['category'] == 'A')
    })
new_features = df.apply(feature_engineer, axis=1)
df = pd.concat([df, new_features], axis=1)
# map: apply a mapping table (unmapped keys become NaN)
category_map = {'A': 'Alpha', 'B': 'Beta', 'C': 'Gamma'}
df['category_name'] = df['category'].map(category_map)
# String operations (vectorized via the .str accessor)
texts = pd.Series(['Hello World', 'Python 3.12', 'Machine Learning'])
print(texts.str.lower())
print(texts.str.split())
print(texts.str.contains('Python'))
print(texts.str.extract(r'(\w+)\s+(\w+)'))
Groupby Aggregation
import pandas as pd
import numpy as np
np.random.seed(42)
df = pd.DataFrame({
    'team': np.random.choice(['A', 'B', 'C'], 100),
    'role': np.random.choice(['dev', 'ds', 'pm'], 100),
    'score': np.random.randint(60, 100, 100),
    'salary': np.random.randint(3000, 8000, 100)
})
# Basic groupby
grouped = df.groupby('team')
print(grouped['score'].mean())
print(grouped['salary'].describe())
# Multiple keys; unstack pivots the inner key level into columns
multi_grouped = df.groupby(['team', 'role'])
print(multi_grouped['score'].mean().unstack())
# Named aggregation: output_column=(input_column, func)
agg_result = df.groupby('team').agg(
    avg_score=('score', 'mean'),
    total_salary=('salary', 'sum'),
    count=('score', 'count'),
    max_score=('score', 'max'),
    min_salary=('salary', 'min')
)
# Custom aggregation function
def iqr(x):
    """Interquartile range (Q3 - Q1)."""
    return x.quantile(0.75) - x.quantile(0.25)
custom_agg = df.groupby('team')['score'].agg(['mean', 'median', 'std', iqr])
# filter: keep the rows of groups matching a condition
large_teams = df.groupby('team').filter(lambda x: len(x) > 30)
Merging DataFrames
import pandas as pd
users = pd.DataFrame({
    'user_id': [1, 2, 3, 4, 5],
    'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
    'age': [25, 30, 35, 28, 22]
})
orders = pd.DataFrame({
    'order_id': [101, 102, 103, 104, 105, 106],
    'user_id': [1, 2, 1, 3, 5, 6],
    'amount': [150, 250, 80, 320, 190, 440]
})
# Inner join (intersection of keys)
inner = pd.merge(users, orders, on='user_id', how='inner')
# Left join (all users, NaN where there is no order)
left = pd.merge(users, orders, on='user_id', how='left')
# Right join (all orders, NaN for the unknown user 6)
right = pd.merge(users, orders, on='user_id', how='right')
# Outer join (union of keys)
outer = pd.merge(users, orders, on='user_id', how='outer')
# concat: stack frames along an axis
df1 = pd.DataFrame({'A': [1, 2], 'B': [3, 4]})
df2 = pd.DataFrame({'A': [5, 6], 'B': [7, 8]})
vertical = pd.concat([df1, df2], axis=0, ignore_index=True)
Practical: AI Training Data Preprocessing Pipeline
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
def preprocess_titanic(df):
    """Titanic dataset preprocessing pipeline.

    Expects the raw Kaggle Titanic columns (Name, SibSp, Parch, Age,
    Fare, Embarked, Sex, Pclass) and returns an all-numeric feature
    frame; the input frame is not modified.
    """
    df = df.copy()
    # Feature engineering: title extracted from the name, family size
    df['Title'] = df['Name'].str.extract(r' ([A-Za-z]+)\.')
    title_map = {'Mr': 0, 'Miss': 1, 'Mrs': 2, 'Master': 3}
    df['Title'] = df['Title'].map(title_map).fillna(4)
    df['FamilySize'] = df['SibSp'] + df['Parch'] + 1
    df['IsAlone'] = (df['FamilySize'] == 1).astype(int)
    # Handle missing values — assign back instead of fillna(..., inplace=True)
    # on a column selection (chained-assignment inplace is deprecated in
    # pandas 2.x and silently broken under copy-on-write)
    df['Age'] = df['Age'].fillna(df.groupby('Title')['Age'].transform('median'))
    df['Fare'] = df['Fare'].fillna(df['Fare'].median())
    df['Embarked'] = df['Embarked'].fillna(df['Embarked'].mode()[0])
    # Encoding
    df['Sex'] = (df['Sex'] == 'male').astype(int)
    df['Embarked'] = df['Embarked'].map({'S': 0, 'C': 1, 'Q': 2})
    features = ['Pclass', 'Sex', 'Age', 'Fare', 'Embarked',
                'FamilySize', 'IsAlone', 'Title']
    return df[features]
4. Matplotlib and Seaborn Visualization
Basic Plots
import matplotlib.pyplot as plt
import numpy as np
# 2x3 grid of axes; axes[1, 2] is intentionally left empty
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
# Line plot
x = np.linspace(0, 2 * np.pi, 100)
axes[0, 0].plot(x, np.sin(x), 'b-', linewidth=2, label='sin(x)')
axes[0, 0].plot(x, np.cos(x), 'r--', linewidth=2, label='cos(x)')
axes[0, 0].set_title('Trigonometric Functions')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
# Bar plot
categories = ['Classification', 'Regression', 'Clustering', 'Dim. Reduction']
values = [85, 72, 68, 91]
bars = axes[0, 1].bar(categories, values,
                      color=['#3498db', '#e74c3c', '#2ecc71', '#f39c12'])
axes[0, 1].set_title('Algorithm Accuracy')
axes[0, 1].set_ylabel('Accuracy (%)')
# Annotate each bar with its value
for bar, val in zip(bars, values):
    axes[0, 1].text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.5,
                    f'{val}%', ha='center', va='bottom')
# Scatter plot
np.random.seed(42)
x_scatter = np.random.randn(100)
y_scatter = 2 * x_scatter + np.random.randn(100) * 0.5
axes[0, 2].scatter(x_scatter, y_scatter, alpha=0.6, c=y_scatter, cmap='viridis')
axes[0, 2].set_title('Scatter Plot')
# Histogram of a bimodal mixture
data = np.concatenate([
    np.random.normal(0, 1, 500),
    np.random.normal(4, 1.5, 300)
])
axes[1, 0].hist(data, bins=50, density=True, alpha=0.7, color='steelblue')
axes[1, 0].set_title('Data Distribution')
# Box plot
# NOTE(review): `labels=` was renamed `tick_labels=` in Matplotlib 3.9;
# `labels` is kept here for compatibility with matplotlib>=3.7.
box_data = [np.random.normal(i, 1, 100) for i in range(5)]
axes[1, 1].boxplot(box_data, labels=[f'Model{i+1}' for i in range(5)])
axes[1, 1].set_title('Model Performance Distribution')
plt.tight_layout()
plt.savefig('basic_plots.png', dpi=150, bbox_inches='tight')
plt.show()
Seaborn Statistical Visualization
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
# Global seaborn theme applied to all subsequent plots
sns.set_theme(style='whitegrid', palette='husl', font_scale=1.2)
# Sample data: 50 simulated runs per model
df = pd.DataFrame({
    'model': np.repeat(['ResNet', 'VGG', 'EfficientNet', 'ViT'], 50),
    'accuracy': np.concatenate([
        np.random.normal(92, 2, 50),
        np.random.normal(88, 3, 50),
        np.random.normal(94, 1.5, 50),
        np.random.normal(95, 2.5, 50)
    ]),
    'params_M': np.concatenate([
        np.random.normal(25, 2, 50),
        np.random.normal(138, 5, 50),
        np.random.normal(5.3, 0.3, 50),
        np.random.normal(86, 3, 50)
    ])
})
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
# Violin plot
sns.violinplot(data=df, x='model', y='accuracy', ax=axes[0, 0])
axes[0, 0].set_title('Accuracy Distribution by Model')
# Heatmap (correlation)
corr_data = df[['accuracy', 'params_M']].corr()
sns.heatmap(corr_data, annot=True, fmt='.2f', cmap='RdYlGn',
            center=0, ax=axes[0, 1])
axes[0, 1].set_title('Correlation Matrix')
# Scatter with regression line
sns.regplot(data=df, x='params_M', y='accuracy',
            scatter_kws={'alpha': 0.4}, ax=axes[1, 0])
axes[1, 0].set_title('Parameters vs Accuracy')
# One KDE curve per model on shared axes
for model in df['model'].unique():
    subset = df[df['model'] == model]
    sns.kdeplot(data=subset, x='accuracy', label=model, ax=axes[1, 1])
axes[1, 1].set_title('Accuracy KDE by Model')
axes[1, 1].legend()
plt.tight_layout()
plt.savefig('seaborn_plots.png', dpi=150, bbox_inches='tight')
plt.show()
Practical: Learning Curves and Confusion Matrix
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
from sklearn.metrics import confusion_matrix


def plot_learning_curve(train_losses, val_losses, train_accs, val_accs):
    """Plot loss and accuracy curves for a training run.

    Args:
        train_losses, val_losses: per-epoch loss values.
        train_accs, val_accs: per-epoch accuracy values.

    Returns:
        The matplotlib Figure containing both panels.
    """
    fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(14, 5))
    epochs = range(1, len(train_losses) + 1)

    # Left panel: loss curves, train/val gap shaded to show over/underfitting.
    loss_ax.plot(epochs, train_losses, 'b-', label='Train Loss', linewidth=2)
    loss_ax.plot(epochs, val_losses, 'r--', label='Val Loss', linewidth=2)
    loss_ax.fill_between(epochs, train_losses, val_losses, alpha=0.1, color='gray')
    loss_ax.set_xlabel('Epoch')
    loss_ax.set_ylabel('Loss')
    loss_ax.set_title('Loss Curve')
    loss_ax.legend()
    loss_ax.grid(True, alpha=0.3)

    # Right panel: accuracy curves; vertical line marks the best val epoch.
    acc_ax.plot(epochs, train_accs, 'b-', label='Train Accuracy', linewidth=2)
    acc_ax.plot(epochs, val_accs, 'r--', label='Val Accuracy', linewidth=2)
    best_epoch = np.argmax(val_accs)  # 0-based; +1 below for display
    acc_ax.axvline(x=best_epoch + 1, color='g', linestyle=':',
                   label=f'Best Epoch ({best_epoch+1})')
    acc_ax.set_xlabel('Epoch')
    acc_ax.set_ylabel('Accuracy')
    acc_ax.set_title('Accuracy Curve')
    acc_ax.legend()
    acc_ax.grid(True, alpha=0.3)

    plt.tight_layout()
    return fig
def plot_confusion_matrix(y_true, y_pred, class_names):
    """Plot raw-count and row-normalized confusion matrices side by side.

    Args:
        y_true: ground-truth labels.
        y_pred: predicted labels (same length as y_true).
        class_names: tick labels for the axes, in label order.

    Returns:
        The matplotlib Figure with both heatmaps.
    """
    cm = confusion_matrix(y_true, y_pred)
    # Row-normalize safely: a class that never appears in y_true has a
    # zero row sum, and the naive division would produce NaNs (0/0).
    row_sums = cm.sum(axis=1, keepdims=True)
    cm_pct = np.divide(cm.astype(float), row_sums,
                       out=np.zeros(cm.shape, dtype=float),
                       where=row_sums != 0)
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))
    # Left: absolute counts.
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_names, yticklabels=class_names, ax=ax1)
    ax1.set_title('Confusion Matrix (Counts)')
    ax1.set_ylabel('True Label')
    ax1.set_xlabel('Predicted Label')
    # Right: per-true-class rates (each row sums to 1 where defined).
    sns.heatmap(cm_pct, annot=True, fmt='.2%', cmap='Greens',
                xticklabels=class_names, yticklabels=class_names, ax=ax2)
    ax2.set_title('Confusion Matrix (Rates)')
    ax2.set_ylabel('True Label')
    ax2.set_xlabel('Predicted Label')
    plt.tight_layout()
    return fig
5. Machine Learning with Scikit-learn
Data Preprocessing
from sklearn.preprocessing import (
    StandardScaler, MinMaxScaler, RobustScaler,
    LabelEncoder, OneHotEncoder
)
import numpy as np

# Toy feature matrix: 4 samples x 3 features.
X = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]], dtype=float)

# StandardScaler: mean 0, std 1 (fitted per column)
scaler = StandardScaler()
X_standard = scaler.fit_transform(X)

# MinMaxScaler: scale to [0, 1]
min_max = MinMaxScaler(feature_range=(0, 1))
X_minmax = min_max.fit_transform(X)

# RobustScaler: uses median and IQR, robust to outliers
robust = RobustScaler()
X_robust = robust.fit_transform(X)

# LabelEncoder assigns integers in sorted class order
# (bird=0, cat=1, dog=2), so this yields [1, 2, 0, 1, 2].
le = LabelEncoder()
labels = ['cat', 'dog', 'bird', 'cat', 'dog']
encoded = le.fit_transform(labels)  # [1, 2, 0, 1, 2]

# OneHotEncoder: one binary column per category;
# handle_unknown='ignore' maps unseen categories to all-zero rows.
ohe = OneHotEncoder(sparse_output=False, handle_unknown='ignore')
categories = np.array([['red'], ['green'], ['blue'], ['red']])
encoded_ohe = ohe.fit_transform(categories)
Feature Selection and Extraction
from sklearn.decomposition import PCA
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.ensemble import RandomForestClassifier  # NOTE(review): imported but unused in this snippet
import numpy as np

# Synthetic data: 200 samples, 20 features; the label depends (noisily)
# on features 0 and 1 only, so selection should recover those.
X = np.random.randn(200, 20)
y = (X[:, 0] + X[:, 1] + np.random.randn(200) * 0.1 > 0).astype(int)

# PCA: project onto the 10 directions of maximum variance.
pca = PCA(n_components=10)
X_pca = pca.fit_transform(X)
print(f"Explained variance: {pca.explained_variance_ratio_.sum():.2%}")

# SelectKBest: keep the 5 features with the highest ANOVA F-score vs y.
selector = SelectKBest(f_classif, k=5)
X_kbest = selector.fit_transform(X, y)
selected = selector.get_support(indices=True)
print(f"Selected feature indices: {selected}")
Linear Models
from sklearn.linear_model import (
    LinearRegression, LogisticRegression, Ridge, Lasso
)
from sklearn.datasets import make_classification, make_regression
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score, accuracy_score
import numpy as np

# Regression: synthetic data with a fixed seed.
X_reg, y_reg = make_regression(n_samples=500, n_features=20, noise=0.1, random_state=42)
# random_state added so the split — and therefore the printed R2 scores —
# are reproducible, consistent with the seeded calls elsewhere in this guide.
X_train, X_test, y_train, y_test = train_test_split(
    X_reg, y_reg, test_size=0.2, random_state=42
)

# Ordinary least squares baseline.
lr = LinearRegression()
lr.fit(X_train, y_train)
print(f"Linear R2: {r2_score(y_test, lr.predict(X_test)):.4f}")

# Ridge: L2 penalty shrinks coefficients toward zero.
ridge = Ridge(alpha=1.0)
ridge.fit(X_train, y_train)
print(f"Ridge R2: {r2_score(y_test, ridge.predict(X_test)):.4f}")

# Lasso: L1 penalty drives some coefficients exactly to zero (sparsity).
lasso = Lasso(alpha=0.01)
lasso.fit(X_train, y_train)
print(f"Lasso R2: {r2_score(y_test, lasso.predict(X_test)):.4f}")
print(f"Non-zero coefficients: {np.sum(lasso.coef_ != 0)}")
Tree-based Models
from sklearn.ensemble import (
    RandomForestClassifier, GradientBoostingClassifier
)
import xgboost as xgb
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report  # NOTE(review): imported but unused here
import numpy as np

# Synthetic binary classification set: 10 of the 20 features carry signal.
X, y = make_classification(n_samples=1000, n_features=20,
                           n_classes=2, n_informative=10,
                           random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Three tree ensembles compared on identical data with fixed seeds.
models = {
    'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42, n_jobs=-1),
    'Gradient Boosting': GradientBoostingClassifier(n_estimators=100, random_state=42),
    # eval_metric given explicitly to silence XGBoost's default-metric warning.
    'XGBoost': xgb.XGBClassifier(n_estimators=100, random_state=42, eval_metric='logloss'),
}
for name, model in models.items():
    model.fit(X_train, y_train)
    # Accuracy = fraction of matching test predictions.
    acc = (model.predict(X_test) == y_test).mean()
    print(f"{name}: {acc:.4f}")
Model Evaluation and Cross-Validation
from sklearn.model_selection import cross_val_score, StratifiedKFold, GridSearchCV
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report  # NOTE(review): imported but unused here
from sklearn.datasets import make_classification

X, y = make_classification(n_samples=1000, n_features=20, random_state=42)

rf = RandomForestClassifier(n_estimators=100, random_state=42)
# StratifiedKFold keeps the class ratio identical across folds.
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
scores = cross_val_score(rf, X, y, cv=cv, scoring='accuracy', n_jobs=-1)
# +/- 2 std ~ 95% interval under a normality assumption on fold scores.
print(f"CV Accuracy: {scores.mean():.4f} (+/- {scores.std() * 2:.4f})")

# GridSearchCV: exhaustive search over 3*3*3 = 27 combinations,
# each scored with 5-fold CV (135 model fits total).
param_grid = {
    'n_estimators': [50, 100, 200],
    'max_depth': [None, 5, 10],
    'min_samples_split': [2, 5, 10]
}
grid_search = GridSearchCV(
    RandomForestClassifier(random_state=42),
    param_grid, cv=5, scoring='accuracy', n_jobs=-1, verbose=1
)
grid_search.fit(X, y)
print(f"Best params: {grid_search.best_params_}")
print(f"Best CV score: {grid_search.best_score_:.4f}")
Pipelines
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import RandomForestClassifier
import pandas as pd
import numpy as np

# Synthetic tabular data: two numeric and two categorical columns plus a
# random binary target (so CV accuracy should hover near 0.5).
np.random.seed(42)
n = 500
df = pd.DataFrame({
    'age': np.random.randint(18, 70, n).astype(float),
    'income': np.random.randint(20000, 100000, n).astype(float),
    'education': np.random.choice(['High School', 'Bachelor', 'Master', 'PhD'], n),
    'city': np.random.choice(['New York', 'Chicago', 'Houston', 'Phoenix'], n),
    'target': np.random.randint(0, 2, n)
})
X = df.drop('target', axis=1)
y = df['target']

numeric_features = ['age', 'income']
categorical_features = ['education', 'city']

# Numeric branch: median imputation, then standardization.
numeric_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])
# Categorical branch: mode imputation, then one-hot encoding
# (handle_unknown='ignore' keeps unseen categories from crashing transform).
categorical_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])
# Route each column group into its own preprocessing branch.
preprocessor = ColumnTransformer(transformers=[
    ('num', numeric_transformer, numeric_features),
    ('cat', categorical_transformer, categorical_features)
])
# One estimator = preprocessing + model; safe to cross-validate because the
# preprocessing is re-fit inside each training fold (no data leakage).
full_pipeline = Pipeline(steps=[
    ('preprocessor', preprocessor),
    ('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
])

from sklearn.model_selection import cross_val_score
scores = cross_val_score(full_pipeline, X, y, cv=5, scoring='accuracy')
print(f"Pipeline CV accuracy: {scores.mean():.4f} +/- {scores.std():.4f}")
6. Python Performance Optimization
List Comprehensions vs map vs for
import time
import numpy as np

n = 1_000_000
data = list(range(n))

# For loop: one interpreted iteration + method call per element.
# time.perf_counter() replaces time.time(): it is monotonic and
# high-resolution, which is what the docs recommend for benchmarking.
start = time.perf_counter()
result_for = []
for x in data:
    result_for.append(x ** 2)
print(f"For loop: {time.perf_counter() - start:.4f}s")

# List comprehension: same work, loop machinery runs in C.
start = time.perf_counter()
result_lc = [x ** 2 for x in data]
print(f"List comprehension: {time.perf_counter() - start:.4f}s")

# map: C-level loop, but the per-element lambda call keeps it slow.
start = time.perf_counter()
result_map = list(map(lambda x: x ** 2, data))
print(f"Map: {time.perf_counter() - start:.4f}s")

# NumPy vectorization: one native operation over the whole array.
arr = np.array(data)
start = time.perf_counter()
result_np = arr ** 2
print(f"NumPy: {time.perf_counter() - start:.4f}s")
Generators
import sys
import numpy as np  # required by infinite_data_loader below; missing in the original snippet

# List vs generator memory comparison: the list materializes all values,
# the generator holds only its frame.
list_comp = [x**2 for x in range(1_000_000)]
gen_expr = (x**2 for x in range(1_000_000))
print(f"List size: {sys.getsizeof(list_comp):,} bytes")  # ~8MB
print(f"Generator size: {sys.getsizeof(gen_expr)} bytes")  # ~120 bytes


def infinite_data_loader(dataset, batch_size=32):
    """Yield shuffled mini-batches from `dataset` forever.

    Args:
        dataset: an indexable that supports fancy (array) indexing,
            e.g. a NumPy array — a plain Python list will not work.
        batch_size: samples per batch; the last batch of each epoch may
            be smaller when len(dataset) is not a multiple of it.

    Yields:
        Batches of `dataset` rows, reshuffled at every epoch boundary.
    """
    while True:
        # New random permutation = new epoch order.
        indices = np.random.permutation(len(dataset))
        for i in range(0, len(dataset), batch_size):
            batch_indices = indices[i:i + batch_size]
            yield dataset[batch_indices]
Parallel Processing
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time


def cpu_intensive_task(n):
    """Sum of squares 0..n-1 — pure-Python CPU work (holds the GIL)."""
    return sum(i**2 for i in range(n))


def io_bound_task(url):
    """Simulated network fetch; sleeping releases the GIL like real I/O."""
    time.sleep(0.1)
    return f"Fetched: {url}"


# Guard required: ProcessPoolExecutor re-imports this module in worker
# processes under the 'spawn' start method (the default on Windows and
# macOS); without it, pool creation at import time recurses endlessly.
if __name__ == '__main__':
    # ThreadPoolExecutor: best for I/O-bound tasks (waits overlap).
    urls = [f"https://example.com/data/{i}" for i in range(20)]
    start = time.time()
    with ThreadPoolExecutor(max_workers=10) as executor:
        results = list(executor.map(io_bound_task, urls))
    print(f"ThreadPool time: {time.time() - start:.2f}s")

    # ProcessPoolExecutor: best for CPU-bound tasks (true parallelism).
    numbers = [1_000_000] * 8
    start = time.time()
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(cpu_intensive_task, numbers))
    print(f"ProcessPool time: {time.time() - start:.2f}s")
Numba JIT Compilation
from numba import njit, prange
import numpy as np
import time


@njit(parallel=True)
def fast_matrix_norm(A):
    """Frobenius norm of a 2-D array, parallelized over rows.

    Only the OUTER loop uses prange: Numba parallelizes the outermost
    prange loop (treating `result +=` as a parallel reduction); a nested
    prange inner loop runs serially anyway and only obscures intent, so
    the inner loop is a plain range.
    """
    n, m = A.shape
    result = 0.0
    for i in prange(n):  # parallel reduction across rows
        for j in range(m):
            result += A[i, j] ** 2
    return result ** 0.5


A = np.random.randn(1000, 1000)
# Warmup (first call triggers JIT compilation; excluded from timing)
_ = fast_matrix_norm(A)
start = time.time()
for _ in range(10):
    result = fast_matrix_norm(A)
print(f"Numba: {time.time() - start:.4f}s")
start = time.time()
for _ in range(10):
    result = np.linalg.norm(A)
print(f"NumPy: {time.time() - start:.4f}s")
7. AI/ML Utility Libraries
tqdm - Progress Bars
from tqdm import tqdm, trange
import time

# Basic usage: wrap any iterable to get a live progress bar.
for i in tqdm(range(100)):
    time.sleep(0.01)

# Custom description and unit label shown on the bar.
items = list(range(50))
for item in tqdm(items, desc='Processing', unit='sample'):
    pass

# Nested progress bars: leave=False clears the inner bar after each epoch.
for epoch in trange(10, desc='Epochs'):
    for batch in trange(100, desc='Batches', leave=False):
        pass

# Manual update with metrics: advance by explicit step counts and show
# live metrics (loss/acc) as a postfix after the bar.
with tqdm(total=100, desc='Training') as pbar:
    for i in range(10):
        pbar.update(10)
        pbar.set_postfix({'loss': 0.5 - i * 0.04, 'acc': 0.7 + i * 0.02})
Weights and Biases (wandb) - Experiment Tracking
import wandb
import numpy as np

# One run = one wandb.init(); config values are recorded as hyperparameters.
wandb.init(
    project="ml-experiment",
    name="run-001",
    config={
        "learning_rate": 0.001,
        "batch_size": 32,
        "epochs": 100,
        "model": "ResNet50",
        "optimizer": "AdamW"
    }
)
# Simulated training: linearly improving metrics plus Gaussian noise.
for epoch in range(100):
    train_loss = 1.0 - epoch * 0.009 + np.random.normal(0, 0.01)
    val_loss = 1.0 - epoch * 0.008 + np.random.normal(0, 0.02)
    train_acc = epoch * 0.009 + np.random.normal(0, 0.01)
    val_acc = epoch * 0.008 + np.random.normal(0, 0.02)
    # Slash-separated keys group panels (train/..., val/...) in the W&B UI.
    wandb.log({
        "epoch": epoch,
        "train/loss": train_loss,
        "val/loss": val_loss,
        "train/acc": train_acc,
        "val/acc": val_acc,
        "learning_rate": 0.001 * (0.95 ** epoch)  # exponential decay schedule
    })
# Flush buffered logs and close the run.
wandb.finish()
pytest - Testing
# tests/test_preprocessing.py
import pytest
import numpy as np
import pandas as pd


def normalize(x):
    # Z-score normalization over ALL elements (global mean/std, not per-axis).
    return (x - x.mean()) / x.std()


class TestNormalize:
    # Unit tests for normalize(); grouped in a class so pytest collects
    # them together under one node.
    def test_mean_zero(self):
        x = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
        result = normalize(x)
        assert abs(result.mean()) < 1e-10

    def test_std_one(self):
        x = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
        result = normalize(x)
        assert abs(result.std() - 1.0) < 1e-10

    def test_shape_preserved(self):
        x = np.random.randn(10, 5)
        result = normalize(x)
        assert result.shape == x.shape


@pytest.fixture
def sample_df():
    # Shared fixture: small frame with one NaN to exercise dropna paths.
    return pd.DataFrame({
        'value': [1.0, 2.0, np.nan, 4.0, 5.0],
        'label': ['a', 'b', 'c', 'd', 'e']
    })


def test_dropna(sample_df):
    result = sample_df.dropna()
    assert result.isnull().sum().sum() == 0


# Run with: pytest tests/ -v --cov
# (the coverage flag from pytest-cov is --cov, not --coverage)
Conclusion
This guide covered the essential Python ecosystem for AI/ML development:
- Environment Setup: Project isolation with venv, conda, and poetry
- NumPy: High-performance numerical computation through vectorized operations
- Pandas: Building data preprocessing and analysis pipelines
- Matplotlib/Seaborn: Rich visualizations to surface insights
- Scikit-learn: Complete ML workflow from preprocessing to model evaluation
- Performance Optimization: Generators, parallel processing, and Numba JIT
- Utilities: tqdm, wandb, and pytest
In real-world ML projects, these tools combine to build a complete pipeline: data loading, preprocessing, feature engineering, model training, evaluation, and deployment. Consult the official documentation for each tool to go deeper.