AI/ML을 위한 Python 완전 가이드
Python은 AI와 머신러닝 분야의 표준 언어입니다. 간결한 문법, 방대한 라이브러리 생태계, 활성화된 커뮤니티 덕분에 연구자와 엔지니어 모두가 선택하는 언어가 되었습니다. 이 가이드에서는 AI/ML 개발에 필요한 Python 핵심 라이브러리들을 완전히 마스터하는 방법을 다룹니다.
1. AI/ML을 위한 Python 환경 설정
Python 버전 선택
AI/ML 작업에는 Python 3.10 이상을 권장합니다. Python 3.10+는 구조적 패턴 매칭, 더 명확한 에러 메시지, 향상된 타입 힌트를 제공합니다. 2026년 현재 Python 3.12가 안정 버전으로 대부분의 ML 라이브러리와 호환됩니다.
Python 버전 확인
python --version
python3 --version
pyenv를 이용한 특정 버전 설치
pyenv install 3.12.0
pyenv global 3.12.0
가상환경 설정
가상환경은 프로젝트별 의존성을 격리하는 핵심 도구입니다.
**venv (표준 라이브러리)**
가상환경 생성
python -m venv ml_env
활성화 (Linux/Mac)
source ml_env/bin/activate
활성화 (Windows)
ml_env\Scripts\activate
비활성화
deactivate
**conda (Anaconda/Miniconda)**
환경 생성
conda create -n ml_env python=3.12
활성화
conda activate ml_env
패키지 설치
conda install numpy pandas scikit-learn matplotlib
환경 목록
conda env list
환경 내보내기
conda env export > environment.yml
환경 복원
conda env create -f environment.yml
**Poetry (의존성 관리 고급)**
Poetry 설치
curl -sSL https://install.python-poetry.org | python3 -
프로젝트 초기화
poetry new ml_project
cd ml_project
패키지 추가
poetry add numpy pandas scikit-learn torch
개발 의존성 추가
poetry add --dev pytest black flake8
환경 실행
poetry run python train.py
Jupyter Notebook/Lab 설정
JupyterLab 설치
pip install jupyterlab
커널 등록
python -m ipykernel install --user --name=ml_env --display-name "ML Environment"
JupyterLab 실행
jupyter lab
유용한 확장 설치
pip install jupyterlab-git
pip install nbformat
**Jupyter 설정 파일 (~/.jupyter/jupyter_lab_config.py)**
c.ServerApp.open_browser = True
c.ServerApp.port = 8888
c.ServerApp.ip = '0.0.0.0'
GPU Python 환경 (CUDA, cuDNN)
CUDA 버전 확인
nvidia-smi
nvcc --version
PyTorch with CUDA 설치 (CUDA 12.1)
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121
TensorFlow with GPU
pip install tensorflow[and-cuda]
GPU 사용 가능 여부 확인 (PyTorch)
python -c "import torch; print(torch.cuda.is_available())"
cuDNN 확인
python -c "import torch; print(torch.backends.cudnn.version())"
필수 패키지 목록
requirements.txt
numpy>=1.24.0
pandas>=2.0.0
matplotlib>=3.7.0
seaborn>=0.12.0
scikit-learn>=1.3.0
scipy>=1.11.0
torch>=2.0.0
torchvision>=0.15.0
tensorflow>=2.13.0
xgboost>=1.7.0
lightgbm>=4.0.0
optuna>=3.3.0
wandb>=0.15.0
tqdm>=4.65.0
jupyterlab>=4.0.0
black>=23.0.0
flake8>=6.0.0
pytest>=7.4.0
한 번에 설치
pip install -r requirements.txt
2. NumPy 완전 마스터
NumPy(Numerical Python)는 파이썬 과학 계산의 기반입니다. 다차원 배열과 수학 함수를 제공하며, 대부분의 ML 라이브러리가 내부적으로 NumPy를 사용합니다.
ndarray 생성
기본 배열 생성
arr1 = np.array([1, 2, 3, 4, 5])
arr2 = np.array([[1, 2, 3], [4, 5, 6]])
print(arr1.shape) # (5,)
print(arr2.shape) # (2, 3)
print(arr2.dtype) # int64
print(arr2.ndim) # 2
print(arr2.size) # 6
특수 배열
zeros = np.zeros((3, 4)) # 모든 원소 0
ones = np.ones((2, 3, 4)) # 모든 원소 1
full = np.full((3, 3), 7) # 모든 원소 7
eye = np.eye(4) # 단위 행렬
empty = np.empty((2, 3)) # 초기화 없는 배열
범위 배열
arange = np.arange(0, 10, 2) # [0, 2, 4, 6, 8]
linspace = np.linspace(0, 1, 5) # [0, 0.25, 0.5, 0.75, 1.0]
logspace = np.logspace(0, 3, 4) # [1, 10, 100, 1000]
난수 배열
np.random.seed(42)
rand_uniform = np.random.rand(3, 4) # [0, 1) 균등분포
rand_normal = np.random.randn(3, 4) # 표준 정규분포
rand_int = np.random.randint(0, 10, (3, 4)) # 정수 난수
rand_choice = np.random.choice([1, 2, 3, 4, 5], size=10, replace=True)
고급 난수 (권장 방식)
rng = np.random.default_rng(42)
samples = rng.normal(loc=0, scale=1, size=(100, 3))
기본 연산과 Broadcasting
a = np.array([[1, 2, 3], [4, 5, 6]])
b = np.array([[7, 8, 9], [10, 11, 12]])
기본 산술 연산 (원소별)
print(a + b) # 원소 덧셈
print(a - b) # 원소 뺄셈
print(a * b) # 원소 곱셈
print(a / b) # 원소 나눗셈
print(a ** 2) # 원소 제곱
print(a % 2) # 원소 나머지
Broadcasting - 서로 다른 형태의 배열 연산
규칙: 차원이 맞지 않으면 1인 차원을 확장
x = np.array([[1], [2], [3]]) # shape: (3, 1)
y = np.array([10, 20, 30]) # shape: (3,) → (1, 3)
Broadcasting 결과: (3, 3)
result = x + y
print(result)
[[11, 21, 31],
[12, 22, 32],
[13, 23, 33]]
실용적인 Broadcasting 예제
배치 데이터 정규화
data = np.random.randn(100, 10) # 100개 샘플, 10개 특성
mean = data.mean(axis=0) # 각 특성의 평균 (shape: 10,)
std = data.std(axis=0) # 각 특성의 표준편차 (shape: 10,)
normalized = (data - mean) / std # Broadcasting으로 정규화
print(normalized.mean(axis=0).round(10)) # ≈ 0
print(normalized.std(axis=0).round(10)) # ≈ 1
인덱싱, 슬라이싱, Boolean 인덱싱
arr = np.arange(24).reshape(4, 6)
print(arr)
[[ 0 1 2 3 4 5]
[ 6 7 8 9 10 11]
[12 13 14 15 16 17]
[18 19 20 21 22 23]]
기본 인덱싱
print(arr[0, 0]) # 0
print(arr[3, 5]) # 23
print(arr[-1, -1]) # 23
슬라이싱
print(arr[1:3, 2:5]) # 2~3행, 3~5열
print(arr[:, 0]) # 모든 행의 0번째 열
print(arr[::2, ::2]) # 2 간격으로 샘플링
Fancy 인덱싱
rows = np.array([0, 2])
cols = np.array([1, 4])
print(arr[rows, cols]) # [arr[0,1], arr[2,4]] = [1, 16]
Boolean 인덱싱 (마스킹)
mask = arr > 12
print(arr[mask]) # 12보다 큰 원소들
조건을 이용한 필터링
data = np.array([1, -2, 3, -4, 5, -6])
positive = data[data > 0] # [1, 3, 5]
print(positive)
np.where - 조건에 따라 선택
result = np.where(data > 0, data, 0) # 양수는 그대로, 음수는 0
print(result) # [1, 0, 3, 0, 5, 0]
np.where로 인덱스 찾기
indices = np.where(data > 0)
print(indices) # (array([0, 2, 4]),)
형태 변환
arr = np.arange(12)
reshape
a = arr.reshape(3, 4)
b = arr.reshape(2, 2, 3)
c = arr.reshape(-1, 4) # -1은 자동 계산: (3, 4)
flatten vs ravel
flat1 = a.flatten() # 복사본 반환
flat2 = a.ravel() # 가능하면 뷰 반환 (메모리 효율)
transpose
mat = np.random.randn(3, 4)
transposed = mat.T # (4, 3)
transposed2 = mat.transpose() # 동일
transposed3 = np.transpose(mat, (1, 0)) # 축 순서 지정
3D 배열 transpose
tensor = np.random.randn(2, 3, 4)
배치, 채널, 공간 → 배치, 공간, 채널
reordered = tensor.transpose(0, 2, 1) # (2, 4, 3)
squeeze와 expand_dims
x = np.array([[[1, 2, 3]]]) # shape: (1, 1, 3)
squeezed = np.squeeze(x) # (3,)
expanded = np.expand_dims(squeezed, axis=0) # (1, 3)
배열 이어붙이기
a = np.array([[1, 2], [3, 4]])
b = np.array([[5, 6], [7, 8]])
hstack = np.hstack([a, b]) # 수평으로 쌓기 (2, 4)
vstack = np.vstack([a, b]) # 수직으로 쌓기 (4, 2)
concat0 = np.concatenate([a, b], axis=0) # vstack과 동일
concat1 = np.concatenate([a, b], axis=1) # hstack과 동일
수학 함수
x = np.array([0, np.pi/6, np.pi/4, np.pi/3, np.pi/2])
삼각함수
sin_x = np.sin(x)
cos_x = np.cos(x)
tan_x = np.tan(x)
지수/로그
exp_x = np.exp(x) # e^x
log_x = np.log(x + 1) # 자연로그 (ln)
log2_x = np.log2(x + 1) # 밑이 2인 로그
log10_x = np.log10(x + 1) # 상용로그
제곱/제곱근
sqrt_x = np.sqrt(x)
square_x = np.square(x) # x^2
power_x = np.power(x, 3) # x^3
절댓값, 반올림
abs_x = np.abs(x)
ceil_x = np.ceil(x) # 올림
floor_x = np.floor(x) # 내림
round_x = np.round(x, 2) # 반올림
시그모이드 (직접 구현)
def sigmoid(x):
return 1 / (1 + np.exp(-x))
def softmax(x):
e_x = np.exp(x - x.max()) # 수치 안정성을 위해 max 빼기
return e_x / e_x.sum()
z = np.array([1.0, 2.0, 3.0])
print(sigmoid(z)) # [0.731, 0.880, 0.952]
print(softmax(z)) # [0.090, 0.245, 0.665]
선형대수
A = np.array([[1, 2], [3, 4]])
B = np.array([[5, 6], [7, 8]])
행렬 곱셈
C = np.dot(A, B) # 고전적 방식
C = A @ B # Python 3.5+ 권장 방식
C = np.matmul(A, B) # np.dot와 동일 (2D)
배치 행렬 곱 (3D 이상)
batch_A = np.random.randn(32, 3, 4)
batch_B = np.random.randn(32, 4, 5)
batch_C = batch_A @ batch_B # (32, 3, 5)
선형대수 함수
det = np.linalg.det(A) # 행렬식
inv = np.linalg.inv(A) # 역행렬
rank = np.linalg.matrix_rank(A) # 계수
trace = np.trace(A) # 대각합
고유값 분해
eigenvalues, eigenvectors = np.linalg.eig(A)
특이값 분해 (SVD)
U, S, Vt = np.linalg.svd(A)
연립방정식 풀기 Ax = b
b = np.array([5, 6])
x = np.linalg.solve(A, b)
놈 (Norm)
v = np.array([3, 4])
l1_norm = np.linalg.norm(v, ord=1) # L1 놈: 7
l2_norm = np.linalg.norm(v, ord=2) # L2 놈: 5
inf_norm = np.linalg.norm(v, ord=np.inf) # 최대값 놈: 4
통계 함수
data = np.random.randn(100, 5)
기본 통계
print(data.mean()) # 전체 평균
print(data.mean(axis=0)) # 열별 평균 (shape: 5,)
print(data.mean(axis=1)) # 행별 평균 (shape: 100,)
print(data.std()) # 표준편차
print(data.var()) # 분산
print(data.sum()) # 합
print(data.min()) # 최솟값
print(data.max()) # 최댓값
누적 연산
cumsum = data.cumsum(axis=0) # 누적 합
cumprod = data.cumprod(axis=0) # 누적 곱
정렬
sorted_arr = np.sort(data, axis=0)
sort_indices = np.argsort(data, axis=0) # 정렬 인덱스
분위수
q25 = np.percentile(data, 25)
q50 = np.percentile(data, 50) # 중앙값
q75 = np.percentile(data, 75)
median = np.median(data)
상관계수
corr = np.corrcoef(data.T) # 5x5 상관계수 행렬
히스토그램
counts, bin_edges = np.histogram(data[:, 0], bins=20)
벡터화 연산 vs for loop 성능 비교
n = 1_000_000
a = np.random.randn(n)
b = np.random.randn(n)
for loop 방식
start = time.time()
result_loop = []
for i in range(n):
result_loop.append(a[i] * b[i])
loop_time = time.time() - start
print(f"For loop: {loop_time:.4f}초")
벡터화 방식
start = time.time()
result_vec = a * b
vec_time = time.time() - start
print(f"Vectorized: {vec_time:.4f}초")
print(f"속도 향상: {loop_time / vec_time:.1f}배")
일반적으로 100~1000배 빠름
실전: 신경망 순전파 NumPy로 구현
class SimpleNeuralNetwork:
"""NumPy만으로 구현한 2층 신경망"""
def __init__(self, input_size, hidden_size, output_size, seed=42):
np.random.seed(seed)
He 초기화
self.W1 = np.random.randn(input_size, hidden_size) * np.sqrt(2.0 / input_size)
self.b1 = np.zeros((1, hidden_size))
self.W2 = np.random.randn(hidden_size, output_size) * np.sqrt(2.0 / hidden_size)
self.b2 = np.zeros((1, output_size))
def relu(self, z):
return np.maximum(0, z)
def relu_derivative(self, z):
return (z > 0).astype(float)
def softmax(self, z):
exp_z = np.exp(z - z.max(axis=1, keepdims=True))
return exp_z / exp_z.sum(axis=1, keepdims=True)
def forward(self, X):
층 1
self.Z1 = X @ self.W1 + self.b1
self.A1 = self.relu(self.Z1)
층 2
self.Z2 = self.A1 @ self.W2 + self.b2
self.A2 = self.softmax(self.Z2)
return self.A2
def cross_entropy_loss(self, y_pred, y_true):
m = y_true.shape[0]
log_probs = -np.log(y_pred[range(m), y_true] + 1e-8)
return log_probs.mean()
def backward(self, X, y_true, learning_rate=0.01):
m = X.shape[0]
출력층 기울기
dZ2 = self.A2.copy()
dZ2[range(m), y_true] -= 1
dZ2 /= m
dW2 = self.A1.T @ dZ2
db2 = dZ2.sum(axis=0, keepdims=True)
은닉층 기울기
dA1 = dZ2 @ self.W2.T
dZ1 = dA1 * self.relu_derivative(self.Z1)
dW1 = X.T @ dZ1
db1 = dZ1.sum(axis=0, keepdims=True)
가중치 업데이트
self.W1 -= learning_rate * dW1
self.b1 -= learning_rate * db1
self.W2 -= learning_rate * dW2
self.b2 -= learning_rate * db2
def train(self, X, y, epochs=100, learning_rate=0.01):
losses = []
for epoch in range(epochs):
y_pred = self.forward(X)
loss = self.cross_entropy_loss(y_pred, y)
losses.append(loss)
self.backward(X, y, learning_rate)
if epoch % 10 == 0:
acc = (y_pred.argmax(axis=1) == y).mean()
print(f"Epoch {epoch:3d}: Loss={loss:.4f}, Acc={acc:.4f}")
return losses
테스트
from sklearn.datasets import make_classification
X, y = make_classification(n_samples=1000, n_features=20,
n_classes=3, n_informative=15,
random_state=42)
nn = SimpleNeuralNetwork(input_size=20, hidden_size=64, output_size=3)
losses = nn.train(X, y, epochs=50, learning_rate=0.1)
3. Pandas 완전 마스터
Pandas는 표 형식 데이터를 다루는 핵심 라이브러리입니다. DataFrame과 Series 자료구조를 제공하며 데이터 정제, 변환, 분석의 전 과정을 지원합니다.
Series와 DataFrame
Series 생성
s1 = pd.Series([1, 2, 3, 4, 5])
s2 = pd.Series([10, 20, 30], index=['a', 'b', 'c'])
s3 = pd.Series({'x': 100, 'y': 200, 'z': 300})
print(s2['a']) # 10
print(s2[['a', 'c']]) # a=10, c=30
DataFrame 생성
data = {
'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
'age': [25, 30, 35, 28, 22],
'score': [88.5, 92.3, 78.1, 95.7, 83.2],
'passed': [True, True, False, True, True]
}
df = pd.DataFrame(data)
print(df.head())
print(df.tail(3))
print(df.info())
print(df.describe())
print(df.dtypes)
print(df.shape) # (5, 4)
데이터 읽기/쓰기
CSV
df_csv = pd.read_csv('data.csv',
sep=',',
header=0,
index_col=0,
parse_dates=['date'],
encoding='utf-8',
na_values=['N/A', 'null', ''])
df_csv.to_csv('output.csv', index=False, encoding='utf-8-sig')
Excel
df_excel = pd.read_excel('data.xlsx',
sheet_name='Sheet1',
header=0)
df_excel.to_excel('output.xlsx', sheet_name='Result', index=False)
JSON
df_json = pd.read_json('data.json', orient='records')
df_json.to_json('output.json', orient='records', force_ascii=False, indent=2)
Parquet (고성능 컬럼형 형식)
df.to_parquet('data.parquet', engine='pyarrow', compression='snappy')
df_parquet = pd.read_parquet('data.parquet')
SQL (SQLite 예시)
conn = sqlite3.connect('database.db')
df_sql = pd.read_sql_query("SELECT * FROM users WHERE age > 25", conn)
df.to_sql('new_table', conn, if_exists='replace', index=False)
인덱싱 (loc, iloc)
df = pd.DataFrame({
'A': range(10),
'B': range(10, 20),
'C': range(20, 30)
}, index=[f'row{i}' for i in range(10)])
loc: 레이블 기반 인덱싱
print(df.loc['row0', 'A']) # 단일 값
print(df.loc['row0':'row3', 'A':'B']) # 범위 (끝 포함)
print(df.loc[['row1', 'row5'], 'C']) # 리스트
iloc: 위치 기반 인덱싱
print(df.iloc[0, 0]) # 0행 0열
print(df.iloc[0:4, 0:2]) # 범위 (끝 미포함)
print(df.iloc[[1, 5], 2]) # 리스트
조건 기반 선택
mask = df['A'] > 5
filtered = df[mask]
filtered2 = df[df['B'].between(12, 17)]
filtered3 = df.query('A > 5 and B < 18')
복합 조건
condition = (df['A'] > 3) & (df['B'] < 17) | (df['C'] >= 28)
result = df[condition]
at/iat (단일 값 접근 - 빠름)
val = df.at['row3', 'A']
val2 = df.iat[3, 0]
결측치 처리
결측치 있는 데이터 생성
df = pd.DataFrame({
'age': [25, np.nan, 35, np.nan, 22],
'income': [50000, 60000, np.nan, 80000, np.nan],
'city': ['Seoul', 'Busan', None, 'Incheon', 'Seoul'],
'score': [88.5, 92.3, 78.1, np.nan, 83.2]
})
결측치 확인
print(df.isnull())
print(df.isnull().sum()) # 열별 결측치 수
print(df.isnull().sum() / len(df) * 100) # 결측 비율(%)
결측치 삭제
df_dropped_rows = df.dropna() # 결측치 있는 행 삭제
df_dropped_cols = df.dropna(axis=1) # 결측치 있는 열 삭제
df_thresh = df.dropna(thresh=3) # 최소 3개 비결측값 필요
결측치 채우기
df_filled_0 = df.fillna(0) # 0으로 채우기
df_filled_mean = df.fillna(df.mean()) # 평균으로 채우기
df_filled_dict = df.fillna({
'age': df['age'].mean(),
'income': df['income'].median(),
'city': 'Unknown',
'score': df['score'].mean()
})
앞/뒤 값으로 채우기
df_ffill = df.fillna(method='ffill') # 앞 값으로 채우기
df_bfill = df.fillna(method='bfill') # 뒤 값으로 채우기
보간
df_interpolated = df.interpolate(method='linear')
결측치 확인 후 처리 패턴
for col in df.columns:
missing_pct = df[col].isnull().mean()
if missing_pct > 0.5:
df.drop(columns=[col], inplace=True)
elif df[col].dtype == 'object':
df[col].fillna(df[col].mode()[0], inplace=True)
else:
df[col].fillna(df[col].median(), inplace=True)
데이터 변환
df = pd.DataFrame({
'text': ['hello world', 'PYTHON IS GREAT', 'data science'],
'value': [1, 2, 3],
'category': ['A', 'B', 'A']
})
apply: 함수 적용
df['text_upper'] = df['text'].apply(str.upper)
df['text_length'] = df['text'].apply(len)
복잡한 함수
def process_text(text):
return ' '.join(word.capitalize() for word in text.lower().split())
df['text_processed'] = df['text'].apply(process_text)
여러 열에 동시 적용
def feature_engineer(row):
return pd.Series({
'value_squared': row['value'] ** 2,
'category_is_A': int(row['category'] == 'A')
})
new_features = df.apply(feature_engineer, axis=1)
df = pd.concat([df, new_features], axis=1)
map: 매핑 테이블 적용
category_map = {'A': 'Alpha', 'B': 'Beta', 'C': 'Gamma'}
df['category_name'] = df['category'].map(category_map)
transform: 그룹 내 변환 (그룹 크기 유지)
df['numeric'] = [10, 20, 30, 40, 50, 60]
df['category'] = ['A', 'B', 'A', 'B', 'A', 'B']
df['group_mean'] = df.groupby('category')['numeric'].transform('mean')
문자열 연산 (벡터화)
texts = pd.Series(['Hello World', 'Python 3.12', 'Machine Learning'])
print(texts.str.lower())
print(texts.str.split())
print(texts.str.contains('Python'))
print(texts.str.extract(r'(\w+)\s+(\w+)'))
그룹화 (groupby)
np.random.seed(42)
df = pd.DataFrame({
'team': np.random.choice(['A', 'B', 'C'], 100),
'role': np.random.choice(['dev', 'ds', 'pm'], 100),
'score': np.random.randint(60, 100, 100),
'salary': np.random.randint(3000, 8000, 100)
})
기본 groupby
grouped = df.groupby('team')
print(grouped['score'].mean())
print(grouped['salary'].describe())
다중 키
multi_grouped = df.groupby(['team', 'role'])
print(multi_grouped['score'].mean().unstack())
집계 함수
agg_result = df.groupby('team').agg(
avg_score=('score', 'mean'),
total_salary=('salary', 'sum'),
count=('score', 'count'),
max_score=('score', 'max'),
min_salary=('salary', 'min')
)
print(agg_result)
사용자 정의 집계
def iqr(x):
return x.quantile(0.75) - x.quantile(0.25)
custom_agg = df.groupby('team')['score'].agg([
'mean', 'median', 'std', iqr
])
filter: 조건을 만족하는 그룹만 선택
large_teams = df.groupby('team').filter(lambda x: len(x) > 30)
병합 (merge, join, concat)
users = pd.DataFrame({
'user_id': [1, 2, 3, 4, 5],
'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
'age': [25, 30, 35, 28, 22]
})
orders = pd.DataFrame({
'order_id': [101, 102, 103, 104, 105, 106],
'user_id': [1, 2, 1, 3, 5, 6],
'amount': [150, 250, 80, 320, 190, 440]
})
Inner Join (교집합)
inner = pd.merge(users, orders, on='user_id', how='inner')
Left Join
left = pd.merge(users, orders, on='user_id', how='left')
Right Join
right = pd.merge(users, orders, on='user_id', how='right')
Outer Join (합집합)
outer = pd.merge(users, orders, on='user_id', how='outer')
다른 키로 병합
merged = pd.merge(users, orders,
left_on='user_id', right_on='user_id',
suffixes=('_user', '_order'))
concat
df1 = pd.DataFrame({'A': [1, 2], 'B': [3, 4]})
df2 = pd.DataFrame({'A': [5, 6], 'B': [7, 8]})
df3 = pd.DataFrame({'C': [9, 10], 'D': [11, 12]})
vertical = pd.concat([df1, df2], axis=0, ignore_index=True)
horizontal = pd.concat([df1, df3], axis=1)
실전: AI 훈련 데이터 전처리 파이프라인
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
def preprocess_ml_data(filepath):
"""AI 훈련을 위한 데이터 전처리 파이프라인"""
1. 데이터 로드
df = pd.read_csv(filepath)
print(f"원본 데이터: {df.shape}")
2. 중복 제거
df = df.drop_duplicates()
print(f"중복 제거 후: {df.shape}")
3. 결측치 처리
numeric_cols = df.select_dtypes(include=[np.number]).columns
cat_cols = df.select_dtypes(include=['object']).columns
for col in numeric_cols:
df[col].fillna(df[col].median(), inplace=True)
for col in cat_cols:
df[col].fillna(df[col].mode()[0], inplace=True)
4. 이상치 처리 (IQR 방법)
for col in numeric_cols:
Q1 = df[col].quantile(0.25)
Q3 = df[col].quantile(0.75)
IQR = Q3 - Q1
lower = Q1 - 1.5 * IQR
upper = Q3 + 1.5 * IQR
df[col] = df[col].clip(lower=lower, upper=upper)
5. 범주형 변수 인코딩
le = LabelEncoder()
for col in cat_cols:
if df[col].nunique() <= 10:
df[col] = le.fit_transform(df[col].astype(str))
else:
고카디널리티: 빈도 인코딩
freq_map = df[col].value_counts().to_dict()
df[col] = df[col].map(freq_map)
6. 특성 엔지니어링
if 'date' in df.columns:
df['date'] = pd.to_datetime(df['date'])
df['year'] = df['date'].dt.year
df['month'] = df['date'].dt.month
df['dayofweek'] = df['date'].dt.dayofweek
df.drop('date', axis=1, inplace=True)
return df
타이타닉 데이터 전처리 예시
def preprocess_titanic(df):
df = df.copy()
특성 엔지니어링
df['Title'] = df['Name'].str.extract(r' ([A-Za-z]+)\.')
title_map = {'Mr': 0, 'Miss': 1, 'Mrs': 2, 'Master': 3}
df['Title'] = df['Title'].map(title_map).fillna(4)
df['FamilySize'] = df['SibSp'] + df['Parch'] + 1
df['IsAlone'] = (df['FamilySize'] == 1).astype(int)
결측치 처리
df['Age'].fillna(df.groupby('Title')['Age'].transform('median'), inplace=True)
df['Fare'].fillna(df['Fare'].median(), inplace=True)
df['Embarked'].fillna(df['Embarked'].mode()[0], inplace=True)
인코딩
df['Sex'] = (df['Sex'] == 'male').astype(int)
df['Embarked'] = df['Embarked'].map({'S': 0, 'C': 1, 'Q': 2})
features = ['Pclass', 'Sex', 'Age', 'Fare', 'Embarked',
'FamilySize', 'IsAlone', 'Title']
return df[features]
4. Matplotlib & Seaborn 시각화
기본 플롯
한글 폰트 설정 (Mac)
plt.rcParams['font.family'] = 'AppleGothic'
plt.rcParams['axes.unicode_minus'] = False
기본 설정
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
라인 플롯
x = np.linspace(0, 2 * np.pi, 100)
axes[0, 0].plot(x, np.sin(x), 'b-', linewidth=2, label='sin(x)')
axes[0, 0].plot(x, np.cos(x), 'r--', linewidth=2, label='cos(x)')
axes[0, 0].set_title('삼각함수')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
막대 플롯
categories = ['분류', '회귀', '군집화', '차원축소']
values = [85, 72, 68, 91]
bars = axes[0, 1].bar(categories, values, color=['#3498db', '#e74c3c', '#2ecc71', '#f39c12'])
axes[0, 1].set_title('알고리즘별 정확도')
axes[0, 1].set_ylabel('정확도 (%)')
for bar, val in zip(bars, values):
axes[0, 1].text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.5,
f'{val}%', ha='center', va='bottom')
산점도
np.random.seed(42)
x_scatter = np.random.randn(100)
y_scatter = 2 * x_scatter + np.random.randn(100) * 0.5
axes[0, 2].scatter(x_scatter, y_scatter, alpha=0.6, c=y_scatter, cmap='viridis')
axes[0, 2].set_title('산점도')
히스토그램
data = np.concatenate([
np.random.normal(0, 1, 500),
np.random.normal(4, 1.5, 300)
])
axes[1, 0].hist(data, bins=50, density=True, alpha=0.7, color='steelblue')
axes[1, 0].set_title('데이터 분포')
박스 플롯
box_data = [np.random.normal(i, 1, 100) for i in range(5)]
axes[1, 1].boxplot(box_data, labels=[f'모델{i+1}' for i in range(5)])
axes[1, 1].set_title('모델별 성능 분포')
파이 차트
sizes = [35, 25, 20, 12, 8]
labels = ['Python', 'R', 'Scala', 'Java', '기타']
explode = (0.05, 0, 0, 0, 0)
axes[1, 2].pie(sizes, labels=labels, explode=explode, autopct='%1.1f%%',
shadow=True, startangle=90)
axes[1, 2].set_title('언어 사용 비율')
plt.tight_layout()
plt.savefig('basic_plots.png', dpi=150, bbox_inches='tight')
plt.show()
Seaborn 통계 시각화
스타일 설정
sns.set_theme(style='whitegrid', palette='husl', font_scale=1.2)
샘플 데이터
df = pd.DataFrame({
'model': np.repeat(['ResNet', 'VGG', 'EfficientNet', 'ViT'], 50),
'accuracy': np.concatenate([
np.random.normal(92, 2, 50),
np.random.normal(88, 3, 50),
np.random.normal(94, 1.5, 50),
np.random.normal(95, 2.5, 50)
]),
'params_M': np.concatenate([
np.random.normal(25, 2, 50),
np.random.normal(138, 5, 50),
np.random.normal(5.3, 0.3, 50),
np.random.normal(86, 3, 50)
]),
'training_time': np.concatenate([
np.random.exponential(10, 50),
np.random.exponential(20, 50),
np.random.exponential(8, 50),
np.random.exponential(15, 50)
])
})
fig, axes = plt.subplots(2, 3, figsize=(18, 12))
바이올린 플롯
sns.violinplot(data=df, x='model', y='accuracy', ax=axes[0, 0])
axes[0, 0].set_title('모델별 정확도 분포')
박스 플롯 with swarm
sns.boxplot(data=df, x='model', y='accuracy', ax=axes[0, 1])
sns.swarmplot(data=df, x='model', y='accuracy', color='black',
size=2, ax=axes[0, 1])
axes[0, 1].set_title('정확도 상세 분포')
히트맵 (상관계수)
corr_data = df[['accuracy', 'params_M', 'training_time']].corr()
sns.heatmap(corr_data, annot=True, fmt='.2f', cmap='RdYlGn',
center=0, ax=axes[0, 2])
axes[0, 2].set_title('변수 간 상관관계')
산점도 + 회귀선
sns.regplot(data=df, x='params_M', y='accuracy',
scatter_kws={'alpha': 0.4}, ax=axes[1, 0])
axes[1, 0].set_title('파라미터 수 vs 정확도')
분포 플롯 (KDE + 히스토그램)
for model in df['model'].unique():
subset = df[df['model'] == model]
sns.kdeplot(data=subset, x='accuracy', label=model, ax=axes[1, 1])
axes[1, 1].set_title('모델별 정확도 분포 (KDE)')
axes[1, 1].legend()
Facet Grid (고급)
axes[1, 2]는 별도로 처리
axes[1, 2].remove()
plt.tight_layout()
plt.savefig('seaborn_plots.png', dpi=150, bbox_inches='tight')
plt.show()
실전: 학습 곡선, 혼동 행렬 시각화
from sklearn.metrics import confusion_matrix
def plot_learning_curve(train_losses, val_losses, train_accs, val_accs):
"""학습 곡선 시각화"""
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))
epochs = range(1, len(train_losses) + 1)
ax1.plot(epochs, train_losses, 'b-', label='훈련 손실', linewidth=2)
ax1.plot(epochs, val_losses, 'r--', label='검증 손실', linewidth=2)
ax1.fill_between(epochs, train_losses, val_losses, alpha=0.1, color='gray')
ax1.set_xlabel('에폭')
ax1.set_ylabel('손실')
ax1.set_title('손실 곡선')
ax1.legend()
ax1.grid(True, alpha=0.3)
ax2.plot(epochs, train_accs, 'b-', label='훈련 정확도', linewidth=2)
ax2.plot(epochs, val_accs, 'r--', label='검증 정확도', linewidth=2)
best_epoch = np.argmax(val_accs)
ax2.axvline(x=best_epoch + 1, color='g', linestyle=':', label=f'최적 에폭 ({best_epoch+1})')
ax2.set_xlabel('에폭')
ax2.set_ylabel('정확도')
ax2.set_title('정확도 곡선')
ax2.legend()
ax2.grid(True, alpha=0.3)
plt.tight_layout()
return fig
def plot_confusion_matrix(y_true, y_pred, class_names):
"""혼동 행렬 시각화"""
cm = confusion_matrix(y_true, y_pred)
cm_pct = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))
절댓값
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
xticklabels=class_names, yticklabels=class_names, ax=ax1)
ax1.set_title('혼동 행렬 (절댓값)')
ax1.set_ylabel('실제 레이블')
ax1.set_xlabel('예측 레이블')
비율
sns.heatmap(cm_pct, annot=True, fmt='.2%', cmap='Greens',
xticklabels=class_names, yticklabels=class_names, ax=ax2)
ax2.set_title('혼동 행렬 (비율)')
ax2.set_ylabel('실제 레이블')
ax2.set_xlabel('예측 레이블')
plt.tight_layout()
return fig
5. Scikit-learn으로 머신러닝
데이터 전처리
from sklearn.preprocessing import (
StandardScaler, MinMaxScaler, RobustScaler,
LabelEncoder, OneHotEncoder, OrdinalEncoder
)
X = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]], dtype=float)
StandardScaler: 평균 0, 표준편차 1
scaler = StandardScaler()
X_standard = scaler.fit_transform(X)
MinMaxScaler: [0, 1] 범위
min_max = MinMaxScaler(feature_range=(0, 1))
X_minmax = min_max.fit_transform(X)
RobustScaler: 이상치에 강건 (중앙값, IQR 사용)
robust = RobustScaler()
X_robust = robust.fit_transform(X)
LabelEncoder: 범주형 → 숫자
le = LabelEncoder()
labels = ['cat', 'dog', 'bird', 'cat', 'dog']
encoded = le.fit_transform(labels) # [0, 2, 1, 0, 2]
decoded = le.inverse_transform(encoded)
OneHotEncoder
ohe = OneHotEncoder(sparse_output=False, handle_unknown='ignore')
categories = np.array([['red'], ['green'], ['blue'], ['red']])
encoded_ohe = ohe.fit_transform(categories)
특성 선택 및 추출
from sklearn.decomposition import PCA
from sklearn.feature_selection import (
SelectKBest, f_classif, mutual_info_classif,
RFE, SelectFromModel
)
from sklearn.ensemble import RandomForestClassifier
X = np.random.randn(200, 20)
y = (X[:, 0] + X[:, 1] + np.random.randn(200) * 0.1 > 0).astype(int)
PCA (주성분 분석)
pca = PCA(n_components=10)
X_pca = pca.fit_transform(X)
print(f"설명 분산 비율: {pca.explained_variance_ratio_.sum():.2%}")
설명 분산 누적 플롯
cumsum = np.cumsum(pca.explained_variance_ratio_)
plt.figure(figsize=(8, 4))
plt.plot(range(1, len(cumsum)+1), cumsum * 100)
plt.xlabel('주성분 수')
plt.ylabel('누적 설명 분산 (%)')
plt.axhline(y=95, color='r', linestyle='--', label='95%')
plt.legend()
plt.grid(True)
SelectKBest
selector = SelectKBest(f_classif, k=5)
X_kbest = selector.fit_transform(X, y)
selected_features = selector.get_support(indices=True)
print(f"선택된 특성 인덱스: {selected_features}")
RFE (재귀적 특성 제거)
rf = RandomForestClassifier(n_estimators=100, random_state=42)
rfe = RFE(estimator=rf, n_features_to_select=5)
X_rfe = rfe.fit_transform(X, y)
특성 중요도 기반 선택
rf.fit(X, y)
sfm = SelectFromModel(rf, threshold='mean')
X_sfm = sfm.fit_transform(X, y)
선형 모델
from sklearn.linear_model import (
LinearRegression, LogisticRegression,
Ridge, Lasso, ElasticNet
)
from sklearn.datasets import make_classification, make_regression
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score, accuracy_score
회귀
X_reg, y_reg = make_regression(n_samples=500, n_features=20,
noise=0.1, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X_reg, y_reg, test_size=0.2)
Linear Regression
lr = LinearRegression()
lr.fit(X_train, y_train)
y_pred = lr.predict(X_test)
print(f"Linear R2: {r2_score(y_test, y_pred):.4f}")
Ridge (L2 정규화)
ridge = Ridge(alpha=1.0)
ridge.fit(X_train, y_train)
print(f"Ridge R2: {r2_score(y_test, ridge.predict(X_test)):.4f}")
Lasso (L1 정규화, 특성 선택 효과)
lasso = Lasso(alpha=0.01)
lasso.fit(X_train, y_train)
print(f"Lasso R2: {r2_score(y_test, lasso.predict(X_test)):.4f}")
print(f"Non-zero coefficients: {np.sum(lasso.coef_ != 0)}")
분류
X_cls, y_cls = make_classification(n_samples=500, n_features=20,
n_classes=3, n_informative=10,
random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X_cls, y_cls, test_size=0.2)
logistic = LogisticRegression(C=1.0, max_iter=1000, solver='lbfgs',
multi_class='multinomial')
logistic.fit(X_train, y_train)
print(f"Logistic Accuracy: {accuracy_score(y_test, logistic.predict(X_test)):.4f}")
트리 기반 모델
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import (
RandomForestClassifier, GradientBoostingClassifier,
AdaBoostClassifier, ExtraTreesClassifier
)
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
X, y = make_classification(n_samples=1000, n_features=20,
n_classes=2, n_informative=10,
random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
models = {
'Decision Tree': DecisionTreeClassifier(max_depth=5, random_state=42),
'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42, n_jobs=-1),
'Gradient Boosting': GradientBoostingClassifier(n_estimators=100, random_state=42),
'XGBoost': xgb.XGBClassifier(n_estimators=100, random_state=42, eval_metric='logloss'),
}
results = {}
for name, model in models.items():
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
acc = (y_pred == y_test).mean()
results[name] = acc
print(f"{name}: {acc:.4f}")
특성 중요도 시각화
rf_model = models['Random Forest']
importances = rf_model.feature_importances_
indices = np.argsort(importances)[::-1][:10]
plt.figure(figsize=(10, 6))
plt.bar(range(10), importances[indices])
plt.xticks(range(10), [f'F{i}' for i in indices])
plt.title('상위 10개 특성 중요도')
plt.xlabel('특성')
plt.ylabel('중요도')
plt.tight_layout()
plt.show()
모델 평가와 교차 검증
from sklearn.model_selection import (
cross_val_score, StratifiedKFold,
GridSearchCV, RandomizedSearchCV
)
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, roc_auc_score
from sklearn.datasets import make_classification
X, y = make_classification(n_samples=1000, n_features=20, random_state=42)
K-Fold 교차 검증
rf = RandomForestClassifier(n_estimators=100, random_state=42)
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
scores = cross_val_score(rf, X, y, cv=cv, scoring='accuracy', n_jobs=-1)
print(f"CV Accuracy: {scores.mean():.4f} (+/- {scores.std() * 2:.4f})")
GridSearchCV
param_grid = {
'n_estimators': [50, 100, 200],
'max_depth': [None, 5, 10],
'min_samples_split': [2, 5, 10]
}
grid_search = GridSearchCV(
RandomForestClassifier(random_state=42),
param_grid,
cv=5,
scoring='accuracy',
n_jobs=-1,
verbose=1
)
grid_search.fit(X, y)
print(f"최적 파라미터: {grid_search.best_params_}")
print(f"최고 CV 점수: {grid_search.best_score_:.4f}")
최종 모델로 테스트셋 평가
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
best_model = grid_search.best_estimator_
best_model.fit(X_train, y_train)
y_pred = best_model.predict(X_test)
print(classification_report(y_test, y_pred))
파이프라인
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import RandomForestClassifier
예시 데이터
np.random.seed(42)
n = 500
df = pd.DataFrame({
'age': np.random.randint(18, 70, n).astype(float),
'income': np.random.randint(20000, 100000, n).astype(float),
'education': np.random.choice(['High School', 'Bachelor', 'Master', 'PhD'], n),
'city': np.random.choice(['Seoul', 'Busan', 'Incheon', 'Daegu'], n),
'target': np.random.randint(0, 2, n)
})
일부 결측치 추가
df.loc[np.random.choice(n, 50), 'age'] = np.nan
df.loc[np.random.choice(n, 30), 'income'] = np.nan
df.loc[np.random.choice(n, 20), 'education'] = None
X = df.drop('target', axis=1)
y = df['target']
수치형, 범주형 열 분리
numeric_features = ['age', 'income']
categorical_features = ['education', 'city']
수치형 전처리 파이프라인
numeric_transformer = Pipeline(steps=[
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler())
])
범주형 전처리 파이프라인
categorical_transformer = Pipeline(steps=[
('imputer', SimpleImputer(strategy='most_frequent')),
('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])
ColumnTransformer로 합치기
preprocessor = ColumnTransformer(transformers=[
('num', numeric_transformer, numeric_features),
('cat', categorical_transformer, categorical_features)
])
전체 파이프라인
full_pipeline = Pipeline(steps=[
('preprocessor', preprocessor),
('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
])
학습 및 평가
from sklearn.model_selection import cross_val_score
scores = cross_val_score(full_pipeline, X, y, cv=5, scoring='accuracy')
print(f"파이프라인 CV 정확도: {scores.mean():.4f} ± {scores.std():.4f}")
실전: 타이타닉 생존 예측
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import cross_val_score, GridSearchCV
from sklearn.metrics import classification_report, roc_auc_score
데이터 로드 (실제로는 pd.read_csv('titanic.csv'))
여기서는 예시 데이터 생성
np.random.seed(42)
n = 891
df = pd.DataFrame({
'Pclass': np.random.choice([1, 2, 3], n, p=[0.24, 0.21, 0.55]),
'Sex': np.random.choice(['male', 'female'], n, p=[0.65, 0.35]),
'Age': np.random.uniform(1, 80, n),
'SibSp': np.random.randint(0, 5, n),
'Parch': np.random.randint(0, 5, n),
'Fare': np.random.exponential(50, n),
'Embarked': np.random.choice(['S', 'C', 'Q'], n, p=[0.72, 0.19, 0.09]),
'Survived': np.random.randint(0, 2, n)
})
결측치 추가
df.loc[np.random.choice(n, 177), 'Age'] = np.nan
특성 엔지니어링
df['FamilySize'] = df['SibSp'] + df['Parch'] + 1
df['IsAlone'] = (df['FamilySize'] == 1).astype(int)
df['Sex_binary'] = (df['Sex'] == 'male').astype(int)
df['Fare_log'] = np.log1p(df['Fare'])
feature_cols = ['Pclass', 'Sex_binary', 'Age', 'FamilySize', 'IsAlone',
'Fare_log', 'Embarked']
X = df[feature_cols]
y = df['Survived']
전처리 파이프라인
numeric_features = ['Age', 'FamilySize', 'Fare_log', 'Pclass']
categorical_features = ['Embarked']
binary_features = ['Sex_binary', 'IsAlone']
preprocessor = ColumnTransformer(transformers=[
('num', Pipeline([
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler())
]), numeric_features),
('cat', Pipeline([
('imputer', SimpleImputer(strategy='most_frequent')),
('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
]), categorical_features),
('bin', 'passthrough', binary_features)
])
모델 파이프라인
pipeline = Pipeline([
('preprocessor', preprocessor),
('classifier', GradientBoostingClassifier(random_state=42))
])
하이퍼파라미터 탐색
param_grid = {
'classifier__n_estimators': [100, 200],
'classifier__max_depth': [3, 5],
'classifier__learning_rate': [0.05, 0.1]
}
grid_search = GridSearchCV(pipeline, param_grid, cv=5, scoring='roc_auc', n_jobs=-1)
grid_search.fit(X, y)
print(f"최적 AUC: {grid_search.best_score_:.4f}")
print(f"최적 파라미터: {grid_search.best_params_}")
6. Python 성능 최적화
리스트 컴프리헨션 vs map vs for
n = 1_000_000
data = list(range(n))
for loop
start = time.time()
result_for = []
for x in data:
result_for.append(x ** 2)
print(f"For loop: {time.time() - start:.4f}s")
리스트 컴프리헨션
start = time.time()
result_lc = [x ** 2 for x in data]
print(f"List comprehension: {time.time() - start:.4f}s")
map
start = time.time()
result_map = list(map(lambda x: x ** 2, data))
print(f"Map: {time.time() - start:.4f}s")
NumPy 벡터화
arr = np.array(data)
start = time.time()
result_np = arr ** 2
print(f"NumPy: {time.time() - start:.4f}s")
딕셔너리/집합 컴프리헨션
squares_dict = {x: x**2 for x in range(10)}
even_set = {x for x in range(20) if x % 2 == 0}
제너레이터
리스트 vs 제너레이터 메모리 비교
list_comp = [x**2 for x in range(1_000_000)]
gen_expr = (x**2 for x in range(1_000_000))
print(f"List size: {sys.getsizeof(list_comp):,} bytes") # ~8MB
print(f"Generator size: {sys.getsizeof(gen_expr)} bytes") # ~120 bytes
제너레이터 함수
def infinite_data_loader(dataset, batch_size=32):
"""무한 데이터 로더 제너레이터"""
while True:
indices = np.random.permutation(len(dataset))
for i in range(0, len(dataset), batch_size):
batch_indices = indices[i:i + batch_size]
yield dataset[batch_indices]
대용량 파일 처리
def read_large_csv(filepath, chunk_size=1000):
"""대용량 CSV를 청크 단위로 읽기"""
for chunk in pd.read_csv(filepath, chunksize=chunk_size):
yield chunk
yield from
def flatten(nested):
for item in nested:
if isinstance(item, list):
yield from flatten(item)
else:
yield item
print(list(flatten([[1, [2, 3]], [4, [5, [6]]]])))
[1, 2, 3, 4, 5, 6]
병렬 처리
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def cpu_intensive_task(n):
"""CPU 집약적 작업"""
return sum(i**2 for i in range(n))
def io_bound_task(url):
"""I/O 집약적 작업 (시뮬레이션)"""
time.sleep(0.1)
return f"Fetched: {url}"
ThreadPoolExecutor: I/O 작업에 적합
urls = [f"https://example.com/data/{i}" for i in range(20)]
start = time.time()
with ThreadPoolExecutor(max_workers=10) as executor:
results = list(executor.map(io_bound_task, urls))
print(f"ThreadPool 시간: {time.time() - start:.2f}s")
ProcessPoolExecutor: CPU 작업에 적합
numbers = [1_000_000] * 8
start = time.time()
with ProcessPoolExecutor(max_workers=4) as executor:
results = list(executor.map(cpu_intensive_task, numbers))
print(f"ProcessPool 시간: {time.time() - start:.2f}s")
multiprocessing Pool
def worker(args):
data, label = args
return data * label
if __name__ == '__main__':
with multiprocessing.Pool(processes=4) as pool:
tasks = [(np.random.randn(100), i) for i in range(100)]
results = pool.map(worker, tasks)
Numba로 JIT 컴파일
from numba import jit, njit, prange
JIT 컴파일 (첫 실행 시 컴파일, 이후 빠름)
@njit(parallel=True)
def fast_matrix_norm(A):
"""병렬화된 행렬 놈 계산"""
n, m = A.shape
result = 0.0
for i in prange(n):
for j in prange(m):
result += A[i, j] ** 2
return result ** 0.5
A = np.random.randn(1000, 1000)
워밍업 (첫 실행 = JIT 컴파일)
_ = fast_matrix_norm(A)
실제 벤치마크
start = time.time()
for _ in range(10):
result = fast_matrix_norm(A)
print(f"Numba: {time.time() - start:.4f}s")
start = time.time()
for _ in range(10):
result = np.linalg.norm(A)
print(f"NumPy: {time.time() - start:.4f}s")
7. AI/ML 유틸리티 라이브러리
tqdm - 진행 바
from tqdm import tqdm, trange
기본 사용
for i in tqdm(range(100)):
time.sleep(0.01)
커스텀 설명
items = list(range(50))
for item in tqdm(items, desc='처리 중', unit='샘플'):
pass
Nested tqdm
for epoch in trange(10, desc='에폭'):
for batch in trange(100, desc='배치', leave=False):
pass
수동 업데이트
with tqdm(total=100, desc='학습') as pbar:
for i in range(10):
pbar.update(10)
pbar.set_postfix({'loss': 0.5 - i * 0.04, 'acc': 0.7 + i * 0.02})
Pandas 통합
tqdm.pandas()
df = pd.DataFrame({'x': range(1000)})
df['x_squared'] = df['x'].progress_apply(lambda x: x ** 2)
Weights & Biases (wandb) - 실험 추적
초기화
wandb.init(
project="ml-experiment",
name="run-001",
config={
"learning_rate": 0.001,
"batch_size": 32,
"epochs": 100,
"model": "ResNet50",
"optimizer": "AdamW"
}
)
학습 루프에서 메트릭 기록
for epoch in range(100):
train_loss = 1.0 - epoch * 0.009 + np.random.normal(0, 0.01)
val_loss = 1.0 - epoch * 0.008 + np.random.normal(0, 0.02)
train_acc = epoch * 0.009 + np.random.normal(0, 0.01)
val_acc = epoch * 0.008 + np.random.normal(0, 0.02)
wandb.log({
"epoch": epoch,
"train/loss": train_loss,
"val/loss": val_loss,
"train/acc": train_acc,
"val/acc": val_acc,
"learning_rate": 0.001 * (0.95 ** epoch)
})
모델 저장
wandb.save('model.pt')
wandb.finish()
Hydra - 설정 관리
config/config.yaml
model:
type: resnet50
pretrained: true
training:
epochs: 100
batch_size: 32
learning_rate: 0.001
data:
path: /data/imagenet
num_workers: 4
from hydra import initialize, compose
from omegaconf import DictConfig, OmegaConf
@hydra.main(config_path="config", config_name="config", version_base="1.3")
def train(cfg: DictConfig) -> None:
print(OmegaConf.to_yaml(cfg))
설정 사용
model_type = cfg.model.type
lr = cfg.training.learning_rate
epochs = cfg.training.epochs
print(f"모델: {model_type}, LR: {lr}, 에폭: {epochs}")
커맨드라인에서 오버라이드:
python train.py model.type=vgg16 training.learning_rate=0.0001
pytest - 테스트
tests/test_preprocessing.py
def normalize(x):
"""데이터 정규화"""
return (x - x.mean()) / x.std()
def process_dataframe(df):
"""DataFrame 전처리"""
df = df.copy()
df = df.dropna()
df['value'] = normalize(df['value'])
return df
class TestNormalize:
def test_mean_zero(self):
x = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
result = normalize(x)
assert abs(result.mean()) < 1e-10
def test_std_one(self):
x = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
result = normalize(x)
assert abs(result.std() - 1.0) < 1e-10
def test_shape_preserved(self):
x = np.random.randn(10, 5)
result = normalize(x)
assert result.shape == x.shape
@pytest.fixture
def sample_df():
return pd.DataFrame({
'value': [1.0, 2.0, np.nan, 4.0, 5.0],
'label': ['a', 'b', 'c', 'd', 'e']
})
def test_process_dataframe_removes_nan(sample_df):
result = process_dataframe(sample_df)
assert result.isnull().sum().sum() == 0
def test_process_dataframe_normalizes(sample_df):
result = process_dataframe(sample_df)
assert abs(result['value'].mean()) < 1e-10
실행: pytest tests/ -v --coverage
마무리
이 가이드에서는 AI/ML 개발에 필요한 Python 생태계의 핵심을 다뤘습니다.
- **환경 설정**: venv, conda, poetry로 프로젝트 격리
- **NumPy**: 벡터화 연산으로 고성능 수치 계산
- **Pandas**: 데이터 전처리와 분석 파이프라인 구축
- **Matplotlib/Seaborn**: 풍부한 시각화로 인사이트 발견
- **Scikit-learn**: 전처리부터 모델 평가까지 완전한 ML 워크플로
- **성능 최적화**: 제너레이터, 병렬 처리, Numba JIT
- **유틸리티**: tqdm, wandb, hydra, pytest
실전 ML 프로젝트에서는 이 모든 도구를 조합하여 데이터 로드 → 전처리 → 특성 엔지니어링 → 모델 학습 → 평가 → 배포의 파이프라인을 구축합니다. 각 도구의 공식 문서를 참고하여 더 깊이 학습하시길 권장합니다.
참고 자료
- [NumPy 공식 문서](https://numpy.org/doc/stable/)
- [Pandas 공식 문서](https://pandas.pydata.org/docs/)
- [Scikit-learn 공식 문서](https://scikit-learn.org/stable/)
- [Matplotlib 공식 문서](https://matplotlib.org/stable/)
- [Seaborn 공식 문서](https://seaborn.pydata.org/)
- [Weights & Biases 문서](https://docs.wandb.ai/)
현재 단락 (1/1040)
Python은 AI와 머신러닝 분야의 표준 언어입니다. 간결한 문법, 방대한 라이브러리 생태계, 활성화된 커뮤니티 덕분에 연구자와 엔지니어 모두가 선택하는 언어가 되었습니다. 이 ...