The Complete MLOps Guide: From ML Pipelines to Production Deployment


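Introduction

A modern machine learning project does not end once a model is built in a Jupyter Notebook. The whole lifecycle, from data collection and preprocessing through training, evaluation, deployment, monitoring, and retraining, has to be managed reliably. That is why **MLOps** emerged.

This guide covers MLOps from its basic concepts through hands-on use of the tooling.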

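1. What Is MLOps?

1.1 Definition of MLOps

MLOps (Machine Learning Operations) is a methodology that unifies the development and operation of ML systems. It applies the DevOps philosophy to ML workflows and automates continuous training (CT), continuous integration (CI), and continuous deployment (CD) of models.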

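1.2 MLOps vs DevOps vs DataOps

| Aspect | DevOps | DataOps | MLOps |
| --------------- | ----------------- | -------------- | -------------------------------------------- |
| Focus | Software delivery | Data pipelines | ML model lifecycle |
| Artifact | Application | Data/reports | ML model |
| Reproducibility | Code versions | Data versions | Code + data + model versions |
| Automation | CI/CD | Data testing | Automated training + evaluation + deployment |

The key difference between MLOps and DevOps is **data dependency**. The same code trained on different data produces an entirely different model, so model performance depends heavily on data quality as well as code quality.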

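1.3 What Makes ML Systems Different

ML systems have several characteristics that set them apart from traditional software:

1. **Data dependency**: model behavior is determined by the training data, not by code alone

2. **Experimental nature**: tens to hundreds of experiment iterations are needed

3. **Model decay**: model performance degrades as the data distribution shifts over time

4. **Reproducibility**: identical results are hard to guarantee even in an identical environment

5. **Multiple artifacts**: code, data, and models all need version control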

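1.4 MLOps Maturity Model

The levels below follow Google's MLOps maturity levels:

**Level 0 - Manual ML**

- Every step is manual

- Script-driven experiments

- Deployments are infrequent and manual

- No monitoring

**Level 1 - ML pipeline automation**

- The training pipeline is automated

- Continuous training becomes possible

- Experiment tracking begins

- A feature store is introduced

**Level 2 - CI/CD pipeline automation**

- CI/CD is fully automated

- A model registry is in use

- Retraining is triggered automatically

- Monitoring is comprehensive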

1.5 The MLOps Tool Ecosystem

- Data versioning: DVC, LakeFS, Delta Lake

- Experiment tracking: MLflow, W&B, Neptune, Comet ML

- Pipelines: Airflow, Prefect, Metaflow, Kubeflow Pipelines

- Model registry: MLflow Registry, W&B Artifacts, Vertex AI

- Containerization: Docker, Podman

- Orchestration: Kubernetes, ECS, GKE

- Model serving: Triton, TorchServe, BentoML, KServe

- Monitoring: Evidently, WhyLogs, Arize, Fiddler

- Feature store: Feast, Tecton, Vertex AI Feature Store

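2. Data Versioning (DVC)

2.1 Introducing DVC

DVC (Data Version Control) is a version control tool for ML projects that works alongside Git. It lets you version large datasets and ML models much as Git versions code: the data itself lives in a cache or in remote storage, while small `.dvc` pointer files are committed to Git.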

# Install DVC
pip install dvc

# Install with S3 support
pip install "dvc[s3]"

# Install with GCS support
pip install "dvc[gs]"

# Install with support for all remote storage types
pip install "dvc[all]"

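2.2 Initializing DVC and Basic Usage

# Initialize a Git repository (if you do not have one yet)
git init

# Initialize DVC
dvc init

# Check the generated files
ls .dvc/
# config  .gitignore  tmp/

# Start tracking data
dvc add data/train.csv

# A .dvc file is created (and is tracked with Git)
git add data/train.csv.dvc data/.gitignore
git commit -m "Add training data"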
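To make the pointer-file idea concrete, here is a minimal sketch of content-based tracking: hash the data file, and keep only a small record of that hash under version control. This is not DVC's implementation; the helper names `md5_of_file` and `make_pointer` and the record layout are assumptions made for illustration.

```python
import hashlib
import tempfile
from pathlib import Path


def md5_of_file(path: Path, chunk_size: int = 1 << 20) -> str:
    """Hash a file in chunks so large datasets do not need to fit in memory."""
    digest = hashlib.md5()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def make_pointer(path: Path) -> dict:
    """Build a small, Git-friendly record that identifies the data by content."""
    return {"md5": md5_of_file(path), "size": path.stat().st_size, "path": path.name}


with tempfile.TemporaryDirectory() as tmp:
    data = Path(tmp) / "train.csv"
    data.write_bytes(b"id,label\n1,0\n2,1\n")
    pointer_v1 = make_pointer(data)

    data.write_bytes(b"id,label\n1,0\n2,1\n3,1\n")  # the dataset changes
    pointer_v2 = make_pointer(data)

# Different content yields a different hash, so Git sees a changed pointer
print(pointer_v1["md5"] != pointer_v2["md5"])
```

Because only the small record changes in Git, switching branches or commits is cheap, and the matching data can be fetched from storage by its hash.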

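2.3 Configuring Remote Storage

# Configure an S3 remote (-d makes it the default remote)
dvc remote add -d myremote s3://my-bucket/dvc-store

# AWS credentials: prefer environment variables or ~/.aws/credentials.
# If you set them through DVC, use --local so the secrets are written to
# .dvc/config.local, which is not committed to Git.
dvc remote modify --local myremote access_key_id YOUR_ACCESS_KEY
dvc remote modify --local myremote secret_access_key YOUR_SECRET_KEY

# GCS remote
dvc remote add -d gcsstorage gs://my-bucket/dvc-store

# Local remote (for testing)
dvc remote add -d localremote /tmp/dvc-storage

# Push data
dvc push

# Pull data
dvc pull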

2.4 DVC Pipelines

A DVC pipeline tracks the dependencies of each stage and reruns only the stages whose inputs have changed.

# dvc.yaml
stages:
  prepare:
    cmd: python src/prepare.py
    deps:
      - src/prepare.py
      - data/raw
    outs:
      - data/prepared

  featurize:
    cmd: python src/featurize.py
    deps:
      - src/featurize.py
      - data/prepared
    outs:
      - data/features

  train:
    cmd: python src/train.py
    deps:
      - src/train.py
      - data/features
    params:
      - params.yaml:
          - train.lr
          - train.n_estimators
    outs:
      - models/model.pkl
    metrics:
      - metrics/scores.json:
          cache: false

  evaluate:
    cmd: python src/evaluate.py
    deps:
      - src/evaluate.py
      - models/model.pkl
      - data/features
    metrics:
      - metrics/eval.json:
          cache: false

# Run the pipeline
dvc repro

# Run up to a specific stage
dvc repro train

# Visualize the pipeline DAG
dvc dag

# Compare experiment results
dvc metrics show
dvc metrics diff HEAD~1
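The "rerun only what changed" behavior can be sketched in a few lines: fingerprint each stage's dependencies, compare against the fingerprint recorded by the previous run, and skip the stage when they match. This is a toy model rather than DVC's code; the `repro` function, the tuple format for stages, and the in-memory `lock` dictionary (standing in for `dvc.lock`) are all hypothetical.

```python
import hashlib
import json


def fingerprint(deps: dict) -> str:
    """Hash the stage's dependency contents into one stable string."""
    blob = json.dumps(deps, sort_keys=True).encode()
    return hashlib.sha256(blob).hexdigest()


def repro(stages: list, inputs: dict, lock: dict) -> list:
    """Run stages in order, skipping any whose dependencies are unchanged.

    stages: (name, dependency keys, output key) tuples, a made-up format
    inputs: current content of every dependency, keyed by name
    lock:   fingerprints recorded by the previous run (updated in place)
    """
    executed = []
    for name, dep_keys, out_key in stages:
        current = fingerprint({k: inputs[k] for k in dep_keys})
        if lock.get(name) == current:
            continue  # nothing this stage depends on has changed
        # Stand-in for running the stage command: derive the output from the deps
        inputs[out_key] = f"{name}({current[:8]})"
        lock[name] = current
        executed.append(name)
    return executed


stages = [
    ("prepare", ["raw"], "prepared"),
    ("featurize", ["prepared"], "features"),
    ("train", ["features", "lr"], "model"),
]
inputs = {"raw": "v1", "lr": 0.001}
lock = {}

first = repro(stages, inputs, lock)   # first run: every stage executes
second = repro(stages, inputs, lock)  # nothing changed: no stage executes
inputs["lr"] = 0.01
third = repro(stages, inputs, lock)   # only the stage that depends on lr reruns
print(first, second, third)
```

Changing `raw` instead of `lr` would rerun all three stages, because each stage's output feeds the fingerprint of the next one.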

2.5 Parameter Management

# params.yaml
train:
  lr: 0.001
  n_estimators: 100
  max_depth: 5
  batch_size: 32
  epochs: 10

data:
  test_size: 0.2
  random_state: 42

# src/train.py
import yaml
from sklearn.ensemble import RandomForestClassifier

# Load parameters
with open("params.yaml") as f:
    params = yaml.safe_load(f)

# lr is read here for illustration; RandomForestClassifier has no learning rate
lr = params["train"]["lr"]
n_estimators = params["train"]["n_estimators"]

# Train the model
model = RandomForestClassifier(
    n_estimators=n_estimators,
    max_depth=params["train"]["max_depth"],
    random_state=params["data"]["random_state"],
)

2.6 Git + DVC Workflow

# Create a new experiment branch
git checkout -b experiment/increase-lr

# Edit the parameters
# In params.yaml, change lr: 0.001 to lr: 0.01

# Rerun the pipeline
dvc repro

# Check the results
dvc metrics show

# Commit
git add dvc.lock params.yaml
git commit -m "Experiment: increase learning rate to 0.01"

# Compare experiments
git checkout main
dvc metrics diff experiment/increase-lr

3. Experiment Tracking

3.1 Installing and Configuring MLflow

# Install MLflow
pip install mlflow

# Start the MLflow UI
mlflow ui

# Specify a host and port
mlflow ui --host 0.0.0.0 --port 5001

# Point clients at a remote tracking server
export MLFLOW_TRACKING_URI=http://mlflow-server:5000

3.2 Basic MLflow Usage

import mlflow
import mlflow.sklearn
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score

# X and y (features and labels) are assumed to be loaded already

# Set the tracking URI (without this, runs are stored under ./mlruns)
mlflow.set_tracking_uri("http://localhost:5000")

# Select the experiment
mlflow.set_experiment("my-classification-experiment")

# Run the experiment
with mlflow.start_run(run_name="random-forest-v1"):
    # Log parameters
    n_estimators = 100
    max_depth = 5
    lr = 0.001

    mlflow.log_param("n_estimators", n_estimators)
    mlflow.log_param("max_depth", max_depth)
    mlflow.log_param("learning_rate", lr)

    # Train the model
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    model = RandomForestClassifier(
        n_estimators=n_estimators,
        max_depth=max_depth,
        random_state=42,
    )
    model.fit(X_train, y_train)

    # Log metrics
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred, average='weighted')

    mlflow.log_metric("accuracy", accuracy)
    mlflow.log_metric("f1_score", f1)

    # Learning curve (per-step metrics); the loss is simulated for illustration
    for epoch in range(10):
        train_loss = np.random.random() * 0.5 / (epoch + 1)
        mlflow.log_metric("train_loss", train_loss, step=epoch)

    # Log an artifact
    mlflow.log_artifact("data/train.csv")

    # Log the model and register it
    mlflow.sklearn.log_model(
        model,
        "model",
        registered_model_name="RandomForestClassifier",
    )

    print(f"Accuracy: {accuracy:.4f}, F1: {f1:.4f}")
    print(f"Run ID: {mlflow.active_run().info.run_id}")

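3.3 MLflow Autologging

# Enable autologging (scikit-learn example)
mlflow.sklearn.autolog()

with mlflow.start_run():
    model = RandomForestClassifier(n_estimators=100)
    model.fit(X_train, y_train)
    # Parameters, metrics, and the model are logged automatically

# PyTorch autologging (MLflow documents this for PyTorch Lightning training)
mlflow.pytorch.autolog()

# XGBoost autologging
mlflow.xgboost.autolog()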

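3.4 In Practice: Tracking PyTorch Training

import mlflow
import mlflow.pytorch
import torch
import torch.nn as nn
import torch.optim as optim

class SimpleNet(nn.Module):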

def __init__(self, input_dim, hidden_dim, output_dim):

super().__init__()

self.fc1 = nn.Linear(input_dim, hidden_dim)

self.fc2 = nn.Linear(hidden_dim, output_dim)

self.relu = nn.ReLU()

self.dropout = nn.Dropout(0.3)

def forward(self, x):

x = self.relu(self.fc1(x))

x = self.dropout(x)

return self.fc2(x)

def train_with_mlflow(

model, train_loader, val_loader,

epochs=10, lr=0.001, experiment_name="pytorch-training"

):

mlflow.set_experiment(experiment_name)

with mlflow.start_run():

# Log hyperparameters

mlflow.log_params({

"epochs": epochs,

"learning_rate": lr,

"model_type": "SimpleNet",

"optimizer": "Adam",

"batch_size": train_loader.batch_size,

})

optimizer = optim.Adam(model.parameters(), lr=lr)

criterion = nn.CrossEntropyLoss()

scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5)

for epoch in range(epochs):

# Training

model.train()

train_loss = 0.0

for batch_X, batch_y in train_loader:

optimizer.zero_grad()

outputs = model(batch_X)

loss = criterion(outputs, batch_y)

loss.backward()

optimizer.step()

train_loss += loss.item()

train_loss /= len(train_loader)

# Validation

model.eval()

val_loss = 0.0

correct = 0

total = 0

with torch.no_grad():

for batch_X, batch_y in val_loader:

outputs = model(batch_X)

loss = criterion(outputs, batch_y)

val_loss += loss.item()

_, predicted = outputs.max(1)

total += batch_y.size(0)

correct += predicted.eq(batch_y).sum().item()

val_loss /= len(val_loader)

val_accuracy = correct / total

# Log per-epoch metrics

mlflow.log_metrics({

"train_loss": train_loss,

"val_loss": val_loss,

"val_accuracy": val_accuracy,

"learning_rate": scheduler.get_last_lr()[0],

}, step=epoch)

scheduler.step()

print(f"Epoch {epoch+1}/{epochs} | "

f"Train Loss: {train_loss:.4f} | "

f"Val Loss: {val_loss:.4f} | "

f"Val Acc: {val_accuracy:.4f}")

# Save the final model

mlflow.pytorch.log_model(model, "model")

# Save the model architecture as a text artifact

with open("model_summary.txt", "w") as f:

f.write(str(model))

mlflow.log_artifact("model_summary.txt")

return mlflow.active_run().info.run_id

3.5 Using W&B (Weights & Biases)

# Install W&B
pip install wandb

# Log in
wandb login

import wandb

# model, optimizer, the data loaders, train_epoch/eval_epoch, and
# sample_predictions are assumed to be defined elsewhere

# Initialize W&B
wandb.init(
    project="my-ml-project",
    name="experiment-001",
    config={
        "learning_rate": 0.001,
        "epochs": 100,
        "batch_size": 64,
        "architecture": "ResNet50",
        "dataset": "ImageNet",
    },
)

# Access the config
lr = wandb.config.learning_rate

# Example training loop
for epoch in range(wandb.config.epochs):
    train_loss, train_acc = train_epoch(model, train_loader)
    val_loss, val_acc = eval_epoch(model, val_loader)

    # Log metrics
    wandb.log({
        "epoch": epoch,
        "train_loss": train_loss,
        "train_accuracy": train_acc,
        "val_loss": val_loss,
        "val_accuracy": val_acc,
        "learning_rate": optimizer.param_groups[0]['lr'],
    })

# Save the model file to the run
wandb.save("model.pth")

# Image logging example
wandb.log({
    "predictions": [
        wandb.Image(img, caption=f"Pred: {pred}, True: {true}")
        for img, pred, true in sample_predictions
    ]
})

# W&B Artifacts (dataset versioning)
artifact = wandb.Artifact("training-data", type="dataset")
artifact.add_dir("data/train")
wandb.log_artifact(artifact)

# Finish the run
wandb.finish()

3.6 W&B Sweeps (Hyperparameter Optimization)

# Sweep configuration

sweep_config = {

"method": "bayes", # random, grid, bayes

"metric": {

"name": "val_accuracy",

"goal": "maximize"

},

"parameters": {

"learning_rate": {

"distribution": "log_uniform_values",

"min": 1e-5,

"max": 1e-1,

},

"batch_size": {

"values": [16, 32, 64, 128]

},

"hidden_dim": {

"values": [64, 128, 256, 512]

},

"dropout": {

"distribution": "uniform",

"min": 0.1,

"max": 0.5,

}

}

}

def train_sweep():

with wandb.init() as run:

config = wandb.config

model = SimpleNet(

input_dim=784,

hidden_dim=config.hidden_dim,

output_dim=10

)

optimizer = torch.optim.Adam(

model.parameters(),

lr=config.learning_rate

)

# Training loop (simplified)

for epoch in range(10):

train_loss, val_loss, val_acc = run_epoch(

model, optimizer, config.batch_size

)

wandb.log({

"train_loss": train_loss,

"val_loss": val_loss,

"val_accuracy": val_acc,

})

# Create and run the sweep

sweep_id = wandb.sweep(sweep_config, project="my-project")

wandb.agent(sweep_id, function=train_sweep, count=50)

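4. ML Pipeline Orchestration

4.1 ML Pipelines with Apache Airflow

# dags/ml_pipeline.py
from datetime import datetime, timedelta

import mlflow
import mlflow.sklearn
import pandas as pd
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from sklearn.model_selection import train_test_split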

default_args = {

'owner': 'mlops-team',

'depends_on_past': False,

'start_date': datetime(2026, 1, 1),

'email_on_failure': True,

'email_on_retry': False,

'retries': 1,

'retry_delay': timedelta(minutes=5),

}

dag = DAG(

'ml_training_pipeline',

default_args=default_args,

description='ML model training pipeline',

schedule_interval='0 2 * * *', # every day at 02:00 (Airflow 2.4+ prefers the `schedule` argument)

catchup=False,

)

def extract_data(**kwargs):
    """Extract data"""
    import pandas as pd
    from sqlalchemy import create_engine

    engine = create_engine('postgresql://user:pass@db:5432/mldb')
    df = pd.read_sql("SELECT * FROM features WHERE date >= CURRENT_DATE - 7", engine)

    output_path = '/tmp/raw_data.parquet'
    df.to_parquet(output_path)

    # Pass the path to downstream tasks via XCom
    kwargs['ti'].xcom_push(key='data_path', value=output_path)
    return output_path

def preprocess_data(**kwargs):
    """Preprocess the data"""
    import pandas as pd
    from sklearn.preprocessing import StandardScaler

    ti = kwargs['ti']
    data_path = ti.xcom_pull(task_ids='extract_data', key='data_path')
    df = pd.read_parquet(data_path)

    # Scale the features, keeping the column names and the 'target' column
    # so that train_model can still split on 'target'
    scaler = StandardScaler()
    features = df.drop('target', axis=1)
    processed = pd.DataFrame(
        scaler.fit_transform(features),
        columns=features.columns,
        index=df.index,
    )
    processed['target'] = df['target']

    processed_path = '/tmp/processed_data.parquet'
    processed.to_parquet(processed_path)
    ti.xcom_push(key='processed_path', value=processed_path)

def train_model(**kwargs):

"""Train the model"""

ti = kwargs['ti']

processed_path = ti.xcom_pull(

task_ids='preprocess_data',

key='processed_path'

)

from sklearn.ensemble import GradientBoostingClassifier

mlflow.set_experiment("production-training")

with mlflow.start_run():

data = pd.read_parquet(processed_path)

X_train, X_test, y_train, y_test = train_test_split(

data.drop('target', axis=1),

data['target'],

test_size=0.2

)

model = GradientBoostingClassifier(n_estimators=200)

model.fit(X_train, y_train)

accuracy = model.score(X_test, y_test)

mlflow.log_metric("accuracy", accuracy)

mlflow.sklearn.log_model(model, "model")

run_id = mlflow.active_run().info.run_id

ti.xcom_push(key='run_id', value=run_id)

def evaluate_and_deploy(**kwargs):

"""Evaluate the model and register it if it passes"""

ti = kwargs['ti']

run_id = ti.xcom_pull(task_ids='train_model', key='run_id')

client = mlflow.tracking.MlflowClient()

run = client.get_run(run_id)

accuracy = run.data.metrics['accuracy']

# Register the model only if it meets the performance threshold

if accuracy >= 0.90:

model_uri = f"runs:/{run_id}/model"

mlflow.register_model(model_uri, "ProductionModel")

print(f"Model deployed with accuracy: {accuracy:.4f}")

else:

raise ValueError(f"Model accuracy {accuracy:.4f} below threshold 0.90")

# Task definitions

extract_task = PythonOperator(

task_id='extract_data',

python_callable=extract_data,

dag=dag,

)

preprocess_task = PythonOperator(

task_id='preprocess_data',

python_callable=preprocess_data,

dag=dag,

)

train_task = PythonOperator(

task_id='train_model',

python_callable=train_model,

dag=dag,

)

evaluate_task = PythonOperator(

task_id='evaluate_and_deploy',

python_callable=evaluate_and_deploy,

dag=dag,

)

# Set task dependencies

extract_task >> preprocess_task >> train_task >> evaluate_task

4.2 ML Pipelines with Prefect

from datetime import timedelta

import mlflow
import mlflow.sklearn
import pandas as pd
from prefect import flow, task
from prefect.tasks import task_input_hash

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def load_data(data_path: str) -> pd.DataFrame:
    """Load data (with caching)"""
    return pd.read_parquet(data_path)

@task(retries=3, retry_delay_seconds=10)
def preprocess(df: pd.DataFrame) -> pd.DataFrame:
    """Preprocess the data"""
    df = df.dropna()
    df = df.drop_duplicates()
    return df

@task
def train(df: pd.DataFrame, params: dict) -> str:
    """Train the model and log it to MLflow"""
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import train_test_split

    mlflow.set_experiment("prefect-ml-pipeline")
    X = df.drop('target', axis=1)
    y = df['target']
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )

    with mlflow.start_run() as run:
        model = RandomForestClassifier(**params)
        model.fit(X_train, y_train)
        mlflow.log_params(params)
        # Log held-out accuracy so that evaluate() has a metric to read back
        mlflow.log_metric("accuracy", model.score(X_test, y_test))
        mlflow.sklearn.log_model(model, "model")
        return run.info.run_id

@task
def evaluate(run_id: str) -> float:
    """Evaluate the model"""
    client = mlflow.tracking.MlflowClient()
    run = client.get_run(run_id)
    return run.data.metrics.get('accuracy', 0.0)

@flow(name="ml-training-pipeline")
def ml_pipeline(data_path: str, params: dict = None):
    """Main pipeline"""
    if params is None:
        params = {"n_estimators": 100, "max_depth": 5}

    df = load_data(data_path)
    processed_df = preprocess(df)
    run_id = train(processed_df, params)
    accuracy = evaluate(run_id)

    print(f"Pipeline complete. Accuracy: {accuracy:.4f}")
    return accuracy

# Run
if __name__ == "__main__":
    ml_pipeline(
        data_path="data/train.parquet",
        params={"n_estimators": 200, "max_depth": 7},
    )
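The caching on `load_data` keys each result by a hash of the task's inputs and reuses it until it expires. The sketch below reproduces that idea with the standard library only; it is not how Prefect implements it, and the `cached_task` decorator and `load_rows` function are hypothetical names.

```python
import hashlib
import json
import time


def cached_task(expiration_seconds: float):
    """Cache a function's result, keyed by a hash of its name and arguments."""
    def decorator(fn):
        store = {}  # cache key -> (timestamp, result)

        def wrapper(*args, **kwargs):
            payload = json.dumps([fn.__name__, args, kwargs], sort_keys=True, default=str)
            key = hashlib.sha256(payload.encode()).hexdigest()
            hit = store.get(key)
            if hit is not None and time.monotonic() - hit[0] < expiration_seconds:
                return hit[1]  # same inputs and not expired: reuse the result
            result = fn(*args, **kwargs)
            store[key] = (time.monotonic(), result)
            return result

        return wrapper
    return decorator


calls = []  # records which invocations actually executed


@cached_task(expiration_seconds=3600)
def load_rows(path: str) -> list:
    calls.append(path)
    return [f"row from {path}"]


load_rows("data/train.parquet")
load_rows("data/train.parquet")  # same input: served from the cache
load_rows("data/other.parquet")  # different input: executed
print(calls)
```

Hashing the inputs rather than the outputs means the cache cannot notice that the file behind an unchanged path was modified, which is one reason to keep the expiration short for data-loading tasks.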

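5. Model Registry

5.1 MLflow Model Registry

A model registry manages model versions in one central place and tracks whether each version is in Staging or Production. Recent MLflow releases deprecate these stages in favor of model version aliases, so treat the stage-based calls in this section as the classic workflow.

import mlflow
from mlflow.tracking import MlflowClient

client = MlflowClient()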

# Register a model
model_uri = "runs:/abc123def456/model"
registered_model = mlflow.register_model(model_uri, "MyClassifier")

# Check the version information
print(f"Version: {registered_model.version}")
print(f"Status: {registered_model.status}")

# Update the model version metadata
client.update_model_version(
    name="MyClassifier",
    version=registered_model.version,
    description="Random Forest with 200 estimators, accuracy 0.94",
)

# Add a tag
client.set_model_version_tag(
    name="MyClassifier",
    version=registered_model.version,
    key="validated_by",
    value="data-science-team",
)

5.2 Promoting from Staging to Production

# Transition to Staging
client.transition_model_version_stage(
    name="MyClassifier",
    version=1,
    stage="Staging",
    archive_existing_versions=False,
)

# Promote to Production after validation
def promote_to_production(model_name: str, version: int, min_accuracy: float = 0.90):
    """Promote a model version to Production"""
    # Look up the metrics of the run that produced this version
    model_version = client.get_model_version(model_name, version)
    run = client.get_run(model_version.run_id)
    accuracy = run.data.metrics.get('accuracy', 0)

    if accuracy < min_accuracy:
        raise ValueError(
            f"Model accuracy {accuracy:.4f} is below minimum {min_accuracy}"
        )

    # Promote to Production (the existing Production version becomes Archived)
    client.transition_model_version_stage(
        name=model_name,
        version=version,
        stage="Production",
        archive_existing_versions=True,
    )
    print(f"Model {model_name} v{version} promoted to Production!")
    print(f"Accuracy: {accuracy:.4f}")

# Load the Production model
production_model = mlflow.pyfunc.load_model(
    model_uri="models:/MyClassifier/Production"
)
predictions = production_model.predict(X_test)
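The promotion rule used above (check a metric gate, archive the current Production version, then promote the candidate) can be modeled without a tracking server. The following is a toy in-memory registry for illustration only; `ToyRegistry` and `ModelVersion` are hypothetical classes, not part of the MLflow API.

```python
from dataclasses import dataclass, field


@dataclass
class ModelVersion:
    version: int
    accuracy: float
    stage: str = "None"


@dataclass
class ToyRegistry:
    """In-memory stand-in for a model registry (not the MLflow API)."""
    versions: dict = field(default_factory=dict)

    def register(self, accuracy: float) -> ModelVersion:
        mv = ModelVersion(version=len(self.versions) + 1, accuracy=accuracy)
        self.versions[mv.version] = mv
        return mv

    def promote(self, version: int, min_accuracy: float = 0.90) -> None:
        candidate = self.versions[version]
        if candidate.accuracy < min_accuracy:
            raise ValueError(
                f"accuracy {candidate.accuracy:.4f} is below minimum {min_accuracy}"
            )
        # Archive whatever is currently in Production, then promote the candidate
        for mv in self.versions.values():
            if mv.stage == "Production":
                mv.stage = "Archived"
        candidate.stage = "Production"

    def production(self):
        """Return the single Production version, or None if there is none."""
        return next(
            (mv for mv in self.versions.values() if mv.stage == "Production"), None
        )


registry = ToyRegistry()
v1 = registry.register(accuracy=0.92)
v2 = registry.register(accuracy=0.95)
registry.promote(v1.version)
registry.promote(v2.version)
print(registry.production().version, v1.stage)
```

Running the gate before touching any stage means a failed promotion leaves the current Production version in place.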

5.3 Model Serving (Built into MLflow)

# Serve a model with MLflow
mlflow models serve \
    -m "models:/MyClassifier/Production" \
    --host 0.0.0.0 \
    --port 5001

# Send a prediction request
curl -X POST http://localhost:5001/invocations \
    -H "Content-Type: application/json" \
    -d '{"dataframe_split": {"columns": ["feature1", "feature2"], "data": [[1.0, 2.0]]}}'

6. Containerization (Docker)

6.1 Dockerizing the ML Environment

# Dockerfile.train - training image
FROM python:3.11-slim

# Set the working directory
WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    git \
    && rm -rf /var/lib/apt/lists/*

# Install Python packages (copied first to make use of layer caching)
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the source code
COPY src/ ./src/
COPY params.yaml .
COPY dvc.yaml .

# Run training
CMD ["python", "src/train.py"]

6.2 Multi-Stage Builds

# Dockerfile.serve - multi-stage build for serving

# Stage 1: builder
FROM python:3.11 AS builder

WORKDIR /build
COPY requirements.txt .
RUN pip install --no-cache-dir --target /install -r requirements.txt

# Stage 2: final image (slim)
FROM python:3.11-slim AS runtime

WORKDIR /app

# Copy the installed packages from the builder
COPY --from=builder /install /usr/local/lib/python3.11/site-packages

# Copy only the serving code
COPY src/serve.py .
COPY models/ ./models/

# Expose the port
EXPOSE 8080

# Run as a non-root user (security)
RUN useradd -m -u 1000 mluser
USER mluser

CMD ["python", "serve.py"]

6.3 GPU Docker (nvidia-docker)

# Dockerfile.gpu - image with GPU support
FROM nvidia/cuda:12.1.0-cudnn8-devel-ubuntu22.04

ENV DEBIAN_FRONTEND=noninteractive

RUN apt-get update && apt-get install -y \
    python3.11 \
    python3-pip \
    git \
    && rm -rf /var/lib/apt/lists/*

# Install the GPU build of PyTorch
RUN pip3 install torch torchvision torchaudio \
    --index-url https://download.pytorch.org/whl/cu121

WORKDIR /app
COPY requirements-gpu.txt .
RUN pip3 install --no-cache-dir -r requirements-gpu.txt

COPY src/ ./src/

CMD ["python3", "src/train_gpu.py"]

# Run the GPU container
docker run --gpus all \
    -v /data:/data \
    -v /models:/models \
    --shm-size=8gb \
    my-ml-gpu:latest

6.4 An ML Stack with Docker Compose

# docker-compose.yml

version: '3.8'

services:

# MLflow tracking server

mlflow:

image: ghcr.io/mlflow/mlflow:latest

command: >

mlflow server

--backend-store-uri postgresql://mlflow:password@postgres:5432/mlflow

--default-artifact-root s3://my-bucket/mlflow

--host 0.0.0.0

--port 5000

ports:

- '5000:5000'

environment:

AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}

AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}

depends_on:

- postgres

# PostgreSQL (MLflow backend store)

postgres:

image: postgres:15

environment:

POSTGRES_DB: mlflow

POSTGRES_USER: mlflow

POSTGRES_PASSWORD: password

volumes:

- postgres_data:/var/lib/postgresql/data

ports:

- '5432:5432'

# MinIO (S3-compatible storage)

minio:

image: minio/minio:latest

command: server /data --console-address ":9001"

environment:

MINIO_ROOT_USER: minioadmin

MINIO_ROOT_PASSWORD: minioadmin

ports:

- '9000:9000'

- '9001:9001'

volumes:

- minio_data:/data

# Training worker

trainer:

build:

context: .

dockerfile: Dockerfile.train

environment:

MLFLOW_TRACKING_URI: http://mlflow:5000

AWS_ACCESS_KEY_ID: minioadmin

AWS_SECRET_ACCESS_KEY: minioadmin

MLFLOW_S3_ENDPOINT_URL: http://minio:9000

volumes:

- ./data:/app/data

- ./models:/app/models

depends_on:

- mlflow

deploy:

resources:

reservations:

devices:

- driver: nvidia

count: 1

capabilities: [gpu]

volumes:

postgres_data:

minio_data:

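6.5 In Practice: A PyTorch Model Server Dockerfile

# Dockerfile.pytorch-serve
FROM python:3.11-slim

WORKDIR /app

# Dependencies: the CPU-only torch wheel comes from the PyTorch index, and
# --extra-index-url keeps PyPI available for fastapi, uvicorn, and pydantic
RUN pip install --no-cache-dir \
    torch==2.2.0+cpu \
    torchvision \
    fastapi \
    uvicorn \
    pydantic \
    numpy \
    pillow \
    --extra-index-url https://download.pytorch.org/whl/cpu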

COPY src/server.py .

COPY models/model.pt .

EXPOSE 8000

CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "8000"]

# src/server.py
from typing import List

import torch
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI(title="ML Model Server")

# Load the model once, at server startup
model = torch.jit.load("model.pt")
model.eval()

class PredictionRequest(BaseModel):

features: List[List[float]]

class PredictionResponse(BaseModel):

predictions: List[int]

probabilities: List[List[float]]

@app.get("/health")

async def health_check():

return {"status": "healthy"}

@app.post("/predict", response_model=PredictionResponse)

async def predict(request: PredictionRequest):

try:

tensor = torch.tensor(request.features, dtype=torch.float32)

with torch.no_grad():

outputs = model(tensor)

probabilities = torch.softmax(outputs, dim=1)

predictions = torch.argmax(probabilities, dim=1)

return PredictionResponse(

predictions=predictions.tolist(),

probabilities=probabilities.tolist()

)

except Exception as e:

raise HTTPException(status_code=500, detail=str(e))

7. Kubernetes for ML

7.1 Kubernetes Basics for ML Workloads

# k8s/namespace.yaml

apiVersion: v1

kind: Namespace

metadata:

name: ml-platform

labels:

app: ml-platform

environment: production

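7.2 Configuring a GPU Node Pool

# k8s/gpu-node-pool.yaml (GKE example; illustrative only, since GKE node pools are usually created with gcloud, the console, or Terraform rather than applied as a manifest)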

apiVersion: container.googleapis.com/v1

kind: NodePool

metadata:

name: gpu-pool

spec:

machineType: n1-standard-8

accelerators:

- acceleratorCount: 1

acceleratorType: nvidia-tesla-t4

autoscaling:

minNodeCount: 0

maxNodeCount: 10

nodeConfig:

labels:

accelerator: nvidia-t4

taints:

- key: nvidia.com/gpu

value: present

effect: NoSchedule

7.3 Training Job

# k8s/training-job.yaml

apiVersion: batch/v1

kind: Job

metadata:

name: ml-training-job

namespace: ml-platform

spec:

backoffLimit: 3

template:

spec:

restartPolicy: OnFailure

# Schedule onto GPU nodes

nodeSelector:

accelerator: nvidia-t4

tolerations:

- key: nvidia.com/gpu

operator: Exists

effect: NoSchedule

containers:

- name: trainer

image: my-registry/ml-trainer:v1.2.0

resources:

requests:

memory: '8Gi'

cpu: '4'

nvidia.com/gpu: '1'

limits:

memory: '16Gi'

cpu: '8'

nvidia.com/gpu: '1'

env:

- name: MLFLOW_TRACKING_URI

value: 'http://mlflow-service.ml-platform:5000'

- name: AWS_ACCESS_KEY_ID

valueFrom:

secretKeyRef:

name: aws-credentials

key: access_key_id

volumeMounts:

- name: training-data

mountPath: /data

- name: model-output

mountPath: /models

- name: dshm

mountPath: /dev/shm

volumes:

- name: training-data

persistentVolumeClaim:

claimName: training-data-pvc

- name: model-output

persistentVolumeClaim:

claimName: model-output-pvc

- name: dshm # shared memory for the PyTorch DataLoader

emptyDir:

medium: Memory

sizeLimit: 8Gi

7.4 Scheduled Retraining with a CronJob

# k8s/training-cronjob.yaml

apiVersion: batch/v1

kind: CronJob

metadata:

name: daily-retraining

namespace: ml-platform

spec:

schedule: '0 2 * * *' # every day at 02:00

concurrencyPolicy: Forbid # do not allow overlapping runs

successfulJobsHistoryLimit: 3

failedJobsHistoryLimit: 3

jobTemplate:

spec:

template:

spec:

restartPolicy: OnFailure

containers:

- name: retrainer

image: my-registry/ml-trainer:latest

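# Kubernetes does not run shell substitutions inside env values, so the date
# is computed in a shell right before the script starts
command:

- sh

- -c

- 'export TRAINING_DATE=$(date +%Y-%m-%d); exec python src/retrain.py'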

resources:

requests:

memory: '4Gi'

cpu: '2'

limits:

memory: '8Gi'

cpu: '4'

7.5 Model Serving Deployment

# k8s/model-serving-deployment.yaml

apiVersion: apps/v1

kind: Deployment

metadata:

name: model-server

namespace: ml-platform

spec:

replicas: 3

selector:

matchLabels:

app: model-server

template:

metadata:

labels:

app: model-server

spec:

containers:

- name: model-server

image: my-registry/model-server:v1.0.0

ports:

- containerPort: 8000

resources:

requests:

memory: '2Gi'

cpu: '1'

limits:

memory: '4Gi'

cpu: '2'

# Health checks

livenessProbe:

httpGet:

path: /health

port: 8000

initialDelaySeconds: 30

periodSeconds: 10

readinessProbe:

httpGet:

path: /health

port: 8000

initialDelaySeconds: 15

periodSeconds: 5

# Environment variables (from a ConfigMap)

envFrom:

- configMapRef:

name: model-server-config

---

apiVersion: v1

kind: Service

metadata:

name: model-server-service

namespace: ml-platform

spec:

selector:

app: model-server

ports:

- port: 80

targetPort: 8000

type: ClusterIP

---

# HPA (horizontal pod autoscaling)

apiVersion: autoscaling/v2

kind: HorizontalPodAutoscaler

metadata:

name: model-server-hpa

namespace: ml-platform

spec:

scaleTargetRef:

apiVersion: apps/v1

kind: Deployment

name: model-server

minReplicas: 2

maxReplicas: 20

metrics:

- type: Resource

resource:

name: cpu

target:

type: Utilization

averageUtilization: 70

- type: Resource

resource:

name: memory

target:

type: Utilization

averageUtilization: 80
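To get a feel for how an autoscaler with these targets reacts to load, the sketch below applies the scaling rule given in the Kubernetes documentation, desiredReplicas = ceil(currentReplicas * currentMetric / targetMetric), and clamps the result to the replica bounds. It is a simplification: the real controller also applies a tolerance band and stabilization behavior, and the helper name `desired_replicas` is made up for this example.

```python
import math


def desired_replicas(current_replicas, utilizations, targets,
                     min_replicas=2, max_replicas=20):
    """Approximate the HPA decision for several resource metrics.

    utilizations / targets: dicts of metric name -> average utilization in percent.
    With multiple metrics, the largest proposed replica count wins.
    """
    proposals = [
        math.ceil(current_replicas * utilizations[name] / targets[name])
        for name in targets
    ]
    return max(min_replicas, min(max_replicas, max(proposals)))


targets = {"cpu": 70, "memory": 80}  # same targets as the manifest above

busy = desired_replicas(3, {"cpu": 140, "memory": 40}, targets)       # CPU at 2x target
idle = desired_replicas(3, {"cpu": 20, "memory": 20}, targets)        # clamps to the minimum
overload = desired_replicas(10, {"cpu": 210, "memory": 90}, targets)  # clamps to the maximum
print(busy, idle, overload)
```

Utilization here is measured against each container's resource requests, so the requests set in the Deployment directly affect when scaling kicks in.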

8. Kubeflow

8.1 Introducing Kubeflow

Kubeflow is an open-source platform for managing ML workflows on Kubernetes. It consists of the following components:

- **Pipelines**: ML workflow orchestration

- **Notebooks**: managed Jupyter notebooks

- **Katib**: hyperparameter tuning

- **KServe**: model serving

- **Training Operator**: distributed training

8.2 Kubeflow Pipelines

# kubeflow_pipeline.py
import kfp

from kfp import compiler, dsl

# Component definitions

@dsl.component(base_image="python:3.11", packages_to_install=["pandas", "scikit-learn"])

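def prepare_data(data_path: str, output_path: kfp.dsl.OutputPath(str)):

"""Data preparation component"""

# Each component runs in its own container, so its imports go inside the function
import pandas as pd

from sklearn.model_selection import train_test_split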

df = pd.read_parquet(data_path)

train, test = train_test_split(df, test_size=0.2)

train.to_parquet(f"{output_path}/train.parquet")

test.to_parquet(f"{output_path}/test.parquet")

@dsl.component(

base_image="python:3.11",

packages_to_install=["scikit-learn", "mlflow", "pandas"]

)

def train_model(

data_path: str,

n_estimators: int,

max_depth: int,

model_output: kfp.dsl.OutputPath(str),

mlflow_uri: str

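):

"""Model training component"""

# Each component runs in its own container, so its imports go inside the function
import mlflow

import mlflow.sklearn

import pandas as pd

from sklearn.ensemble import RandomForestClassifier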

mlflow.set_tracking_uri(mlflow_uri)

df = pd.read_parquet(f"{data_path}/train.parquet")

X = df.drop('target', axis=1)

y = df['target']

with mlflow.start_run():

model = RandomForestClassifier(

n_estimators=n_estimators,

max_depth=max_depth

)

model.fit(X, y)

run_id = mlflow.active_run().info.run_id

mlflow.sklearn.log_model(model, "model")

with open(model_output, 'w') as f:

f.write(run_id)

@dsl.component(packages_to_install=["mlflow", "scikit-learn", "pandas"])

def evaluate_model(

data_path: str,

run_id_path: str,

mlflow_uri: str

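) -> float:

"""Model evaluation component"""

# Each component runs in its own container, so its imports go inside the function
import mlflow

import mlflow.sklearn

import pandas as pd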

mlflow.set_tracking_uri(mlflow_uri)

with open(run_id_path, 'r') as f:

run_id = f.read()

model = mlflow.sklearn.load_model(f"runs:/{run_id}/model")

test_df = pd.read_parquet(f"{data_path}/test.parquet")

X_test = test_df.drop('target', axis=1)

y_test = test_df['target']

accuracy = model.score(X_test, y_test)

return accuracy

# Pipeline definition

@dsl.pipeline(

name="ML Training Pipeline",

description="End-to-end ML training pipeline"

)

def ml_pipeline(

data_path: str = "gs://my-bucket/data/train.parquet",

n_estimators: int = 100,

max_depth: int = 5,

mlflow_uri: str = "http://mlflow:5000"

):

# Prepare the data

prepare_task = prepare_data(data_path=data_path)

# Train the model

train_task = train_model(

data_path=prepare_task.output,

n_estimators=n_estimators,

max_depth=max_depth,

mlflow_uri=mlflow_uri

)

# Evaluate the model

evaluate_task = evaluate_model(

data_path=prepare_task.output,

run_id_path=train_task.outputs['model_output'],

mlflow_uri=mlflow_uri

)

# Compile and run the pipeline

if __name__ == "__main__":

# Compile the pipeline

kfp.compiler.Compiler().compile(

pipeline_func=ml_pipeline,

package_path="ml_pipeline.yaml"

)

# Submit to Kubeflow

client = kfp.Client(host="http://kubeflow-host/pipeline")

run = client.create_run_from_pipeline_package(

pipeline_file="ml_pipeline.yaml",

arguments={

"n_estimators": 200,

"max_depth": 7,

},

run_name="training-run-001"

)

8.3 Hyperparameter Tuning with Katib

# katib-experiment.yaml

apiVersion: kubeflow.org/v1beta1

kind: Experiment

metadata:

name: hp-tuning-experiment

namespace: kubeflow

spec:

objective:

type: maximize

goal: 0.95

objectiveMetricName: accuracy

additionalMetricNames:

- f1_score

algorithm:

algorithmName: bayesianoptimization

parallelTrialCount: 3

maxTrialCount: 20

maxFailedTrialCount: 5

parameters:

- name: learning_rate

parameterType: double

feasibleSpace:

min: '0.0001'

max: '0.01'

- name: n_estimators

parameterType: int

feasibleSpace:

min: '50'

max: '500'

- name: max_depth

parameterType: int

feasibleSpace:

min: '3'

max: '10'

trialTemplate:

primaryContainerName: training-container

trialParameters:

- name: learning_rate

description: Learning rate

reference: learning_rate

- name: n_estimators

description: Number of estimators

reference: n_estimators

- name: max_depth

description: Max tree depth

reference: max_depth

trialSpec:

apiVersion: batch/v1

kind: Job

spec:

template:

spec:

restartPolicy: Never

containers:

- name: training-container

image: my-registry/trainer:latest

command:

- 'python'

- 'train.py'

- '--lr=${trialParameters.learning_rate}'

- '--n-estimators=${trialParameters.n_estimators}'

- '--max-depth=${trialParameters.max_depth}'

9. CI/CD for ML

9.1 GitHub Actions for ML

# .github/workflows/ml-pipeline.yml

name: ML CI/CD Pipeline

on:

push:

branches: [main, develop]

paths:

- 'src/**'

- 'params.yaml'

- 'requirements.txt'

pull_request:

branches: [main]

env:

MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}

AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}

AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}

jobs:

test:

name: Unit Tests

runs-on: ubuntu-latest

steps:

- uses: actions/checkout@v4

- name: Set up Python

uses: actions/setup-python@v5

with:

python-version: '3.11'

cache: 'pip'

- name: Install dependencies

run: |

pip install -r requirements.txt

pip install pytest pytest-cov

- name: Run unit tests

run: |

pytest tests/unit/ -v --cov=src --cov-report=xml

- name: Upload coverage

uses: codecov/codecov-action@v4

with:

file: coverage.xml

data-validation:

name: Data Validation

runs-on: ubuntu-latest

needs: test

steps:

- uses: actions/checkout@v4

- name: Setup DVC

run: pip install "dvc[s3]"

- name: Pull data

run: dvc pull data/

- name: Validate data

run: python src/validate_data.py

train-and-evaluate:

name: Train and Evaluate Model

runs-on: ubuntu-latest

needs: data-validation

if: github.ref == 'refs/heads/main'

steps:

- uses: actions/checkout@v4

- name: Setup Python

uses: actions/setup-python@v5

with:

python-version: '3.11'

- name: Install dependencies

run: pip install -r requirements.txt

- name: Run DVC pipeline

run: dvc repro

- name: Check model performance

run: |

python scripts/check_metrics.py \

--min-accuracy 0.90 \

--metrics-file metrics/scores.json

- name: Register model if metrics pass

run: python scripts/register_model.py

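- uses: iterative/setup-cml@v2

- name: Report metrics
  env:
    REPO_TOKEN: ${{ secrets.GITHUB_TOKEN }}
  run: |
    echo '## Model metrics' > report.md
    cat metrics/scores.json >> report.md
    cml comment create report.md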

build-and-push:

name: Build Docker Image

runs-on: ubuntu-latest

needs: train-and-evaluate

steps:

- uses: actions/checkout@v4

- name: Login to Container Registry

uses: docker/login-action@v3

with:

registry: ghcr.io

username: ${{ github.actor }}

password: ${{ secrets.GITHUB_TOKEN }}

- name: Build and push

uses: docker/build-push-action@v5

with:

context: .

file: Dockerfile.serve

push: true

tags: |

ghcr.io/${{ github.repository }}/model-server:latest

ghcr.io/${{ github.repository }}/model-server:${{ github.sha }}

deploy:

name: Deploy to Kubernetes

runs-on: ubuntu-latest

needs: build-and-push

environment: production

steps:

- uses: actions/checkout@v4

- name: Configure kubectl

uses: azure/k8s-set-context@v3

with:

kubeconfig: ${{ secrets.KUBECONFIG }}

- name: Deploy

run: |

kubectl set image deployment/model-server \

model-server=ghcr.io/${{ github.repository }}/model-server:${{ github.sha }} \

-n ml-platform

kubectl rollout status deployment/model-server -n ml-platform
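The workflow calls `scripts/check_metrics.py`, which this guide does not show. A minimal sketch of such a quality gate might look like the following. It assumes that `metrics/scores.json` is a flat JSON object with an `accuracy` key; that file layout and the helper name `metrics_pass` are assumptions, not something taken from the guide.

```python
import argparse
import json


def metrics_pass(metrics: dict, min_accuracy: float) -> bool:
    """Return True when the reported accuracy meets the threshold."""
    accuracy = metrics.get("accuracy")
    if accuracy is None:
        # A missing metric is treated as a failure rather than a silent pass
        return False
    return float(accuracy) >= min_accuracy


def main(argv=None) -> int:
    """Parse the CLI flags used in the workflow and return a process exit code."""
    parser = argparse.ArgumentParser(description="Fail CI when the model is below threshold")
    parser.add_argument("--min-accuracy", type=float, required=True)
    parser.add_argument("--metrics-file", required=True)
    args = parser.parse_args(argv)

    with open(args.metrics_file, encoding="utf-8") as f:
        metrics = json.load(f)

    passed = metrics_pass(metrics, args.min_accuracy)
    status = "OK" if passed else "FAIL"
    print(f"{status}: accuracy={metrics.get('accuracy')} threshold={args.min_accuracy}")
    return 0 if passed else 1
```

In a real script the last line would be `sys.exit(main())`: the non-zero exit code is what makes the GitHub Actions step fail, which in turn keeps the `build-and-push` and `deploy` jobs from running.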

10. Model Monitoring

10.1 Detecting Data Drift with Evidently AI

pip install evidently

# Note: these imports use Evidently's legacy Report API;
# recent Evidently releases have reorganized the import paths.
import pandas as pd
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, DataQualityPreset
from evidently.metrics import ColumnDriftMetric

# Reference data (the data used at training time)
reference_data = pd.read_parquet("data/reference.parquet")

# Current data (production prediction inputs)
current_data = pd.read_parquet("data/current.parquet")

# Data drift report
drift_report = Report(metrics=[
    DataDriftPreset(),
    DataQualityPreset(),
    ColumnDriftMetric(column_name="age"),
    ColumnDriftMetric(column_name="income"),
])

drift_report.run(
    reference_data=reference_data,
    current_data=current_data,
)

# Save the HTML report
drift_report.save_html("drift_report.html")

# Extract the results as a dict
result = drift_report.as_dict()
drift_detected = result['metrics'][0]['result']['dataset_drift']

if drift_detected:
    print("Data drift detected! Consider retraining.")
    # Send an alert (Slack, email, etc.); send_alert is a placeholder you must implement
    send_alert("Data drift detected in production model")
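To make the idea of a drift score concrete, here is a minimal sketch of one widely used statistic, the Population Stability Index (PSI), in plain Python. It is not how Evidently is implemented, and it is not necessarily the test Evidently picks for a given column. The bin count, the `1e-6` floor, and the sample data are illustrative assumptions.

```python
import math


def psi(reference, current, bins=10):
    """Population Stability Index between two numeric samples."""
    lo, hi = min(reference), max(reference)
    width = (hi - lo) / bins or 1.0  # guard against a constant reference column

    def bin_shares(values):
        counts = [0] * bins
        for v in values:
            # Clamp so out-of-range production values land in the edge bins
            idx = int((v - lo) / width)
            counts[max(0, min(bins - 1, idx))] += 1
        # Floor each share so the logarithm below is always defined
        return [max(c / len(values), 1e-6) for c in counts]

    ref_shares = bin_shares(reference)
    cur_shares = bin_shares(current)
    return sum((c - r) * math.log(c / r) for r, c in zip(ref_shares, cur_shares))


reference_ages = [20 + (i % 40) for i in range(1000)]   # evenly spread over 20..59
same_ages = [20 + ((i * 7) % 40) for i in range(1000)]  # same distribution, different order
shifted_ages = [35 + (i % 40) for i in range(1000)]     # shifted upward by 15 years

print(f"PSI, same distribution: {psi(reference_ages, same_ages):.4f}")
print(f"PSI, shifted:           {psi(reference_ages, shifted_ages):.4f}")
```

A score near zero means the two samples fill the bins in the same proportions. A common rule of thumb treats values above roughly 0.2 as significant drift, but the right threshold depends on the feature and should be tuned.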

10.2 Monitoring Model Performance

import pandas as pd
from evidently.report import Report
from evidently.metric_preset import ClassificationPreset
from evidently.metrics import ClassificationQualityMetric

# Prepare the prediction data (only possible once ground-truth labels are available).
# y_true, y_pred, y_proba, X_test and reference_predictions are assumed to exist already.
prediction_data = pd.DataFrame({
    'target': y_true,
    'prediction': y_pred,
    'prediction_proba': y_proba,
    'feature1': X_test['feature1'],
    'feature2': X_test['feature2'],
})

# Classification performance report
performance_report = Report(metrics=[
    ClassificationPreset(),
    ClassificationQualityMetric(),
])

performance_report.run(
    reference_data=reference_predictions,
    current_data=prediction_data,
)

# Extract the results
metrics = performance_report.as_dict()
current_accuracy = metrics['metrics'][0]['result']['current']['accuracy']
reference_accuracy = metrics['metrics'][0]['result']['reference']['accuracy']

degradation = reference_accuracy - current_accuracy

if degradation > 0.05:  # accuracy dropped by more than 5 percentage points
    # trigger_retraining is a placeholder you must implement
    trigger_retraining(f"Model degraded by {degradation:.2%}")
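The retraining rule above boils down to comparing two accuracies. A library-free sketch, assuming labels and predictions arrive as plain lists (the function names `accuracy` and `should_retrain` and the sample labels are illustrative):

```python
def accuracy(y_true, y_pred):
    """Share of predictions that match the ground-truth labels."""
    if len(y_true) != len(y_pred) or not y_true:
        raise ValueError("y_true and y_pred must be non-empty and the same length")
    return sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)


def should_retrain(reference_accuracy, current_accuracy, max_drop=0.05):
    """True when accuracy fell by more than max_drop (absolute, not relative)."""
    return (reference_accuracy - current_accuracy) > max_drop


labels = [1, 0, 1, 1, 0, 1, 0, 0, 1, 1]
reference_acc = accuracy(labels, [1, 0, 1, 1, 0, 1, 0, 0, 1, 0])  # one mistake
current_acc = accuracy(labels, [1, 1, 1, 0, 0, 1, 0, 1, 1, 0])    # four mistakes

print(reference_acc, current_acc, should_retrain(reference_acc, current_acc))
```

Note that the threshold is an absolute drop in accuracy. For imbalanced problems such as churn, the same check is usually more meaningful on AUC, precision, or recall than on accuracy.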

10.3 Real-Time Prediction Monitoring Pipeline

import logging

import pandas as pd
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset
from prometheus_client import Counter, Histogram, Gauge, start_http_server

# Prometheus metric definitions
prediction_counter = Counter('model_predictions_total', 'Total predictions')
prediction_latency = Histogram('model_prediction_duration_seconds', 'Prediction latency')
model_accuracy_gauge = Gauge('model_accuracy', 'Current model accuracy')
drift_score_gauge = Gauge('data_drift_score', 'Data drift score')


class MonitoredModelServer:
    def __init__(self, model, reference_data):
        self.model = model
        self.reference_data = reference_data
        # Buffer the input features: drift is measured against reference_data,
        # so the buffered rows must have the same columns
        self.features_buffer = []
        self.labels_buffer = []

        # Start the Prometheus metrics server
        start_http_server(8001)

    def predict(self, features):
        # Assumption: features is a one-row DataFrame with the same columns as reference_data
        with prediction_latency.time():
            prediction = self.model.predict(features)

        prediction_counter.inc()
        self.features_buffer.append(features)

        # Run monitoring once 1000 requests have been buffered
        if len(self.features_buffer) >= 1000:
            self._run_monitoring()

        return prediction

    def _run_monitoring(self):
        """Run drift monitoring on the buffered inputs."""
        current_df = pd.concat(self.features_buffer, ignore_index=True)

        # Drift detection
        drift_report = Report(metrics=[DataDriftPreset()])
        drift_report.run(
            reference_data=self.reference_data,
            current_data=current_df,
        )

        result = drift_report.as_dict()
        drift_share = result['metrics'][0]['result']['share_of_drifted_columns']

        drift_score_gauge.set(drift_share)

        if drift_share > 0.3:
            logging.warning(f"High drift detected: {drift_share:.2%} of columns drifted")

        # Reset the buffer
        self.features_buffer = []
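One design point in the server above is that monitoring runs per batch rather than per request, which keeps the expensive drift computation off the hot path. A minimal, dependency-free sketch of that batching pattern follows. `BatchTrigger` and its callback are hypothetical names, and the batch size of 3 is only for demonstration.

```python
from typing import Any, Callable, List


class BatchTrigger:
    """Collect items and hand them to a callback once batch_size is reached."""

    def __init__(self, batch_size: int, on_batch: Callable[[List[Any]], None]):
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")
        self.batch_size = batch_size
        self.on_batch = on_batch
        self._buffer: List[Any] = []

    def add(self, item: Any) -> None:
        self._buffer.append(item)
        if len(self._buffer) >= self.batch_size:
            # Swap the buffer out before calling back, so a failing callback
            # cannot leave the buffer growing without bound
            batch, self._buffer = self._buffer, []
            self.on_batch(batch)


seen_batches = []
trigger = BatchTrigger(batch_size=3, on_batch=seen_batches.append)
for row in range(7):
    trigger.add({"feature1": row})

print(len(seen_batches), "batches flushed,", len(trigger._buffer), "row still buffered")
```

In a production server the callback would typically run on a background worker or a scheduled job, so that the request that happens to fill the buffer does not pay for the whole drift report.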

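11. Feature Store

11.1 Feature Store Concepts

A Feature Store is infrastructure that lets you manage ML features centrally and reuse them across models.

**Key concepts:**

- **Online store**: low-latency feature lookup for real-time prediction (Redis, DynamoDB)

- **Offline store**: historical features for batch training (S3, BigQuery)

- **Feature view**: the mapping between feature definitions and their data source

- **Entity**: the subject a feature describes (user ID, product ID, etc.)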
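The difference between the two stores can be illustrated without any feature-store library. In the sketch below, all names and data are made up for illustration. The offline lookup returns the value that was valid at a given event time, which keeps future information from leaking into training data, while the online lookup only returns the latest value.

```python
from bisect import bisect_right
from datetime import datetime, timedelta

# Hypothetical history of one feature, keyed by entity (user_id).
# Each list is sorted by timestamp.
history = {
    1001: [
        (datetime(2026, 1, 1), 2),
        (datetime(2026, 1, 5), 5),
        (datetime(2026, 1, 9), 7),
    ],
}


def offline_lookup(user_id, event_time, ttl=timedelta(days=30)):
    """Point-in-time lookup: latest value at or before event_time, within ttl."""
    rows = history.get(user_id, [])
    timestamps = [ts for ts, _ in rows]
    pos = bisect_right(timestamps, event_time)
    if pos == 0:
        return None  # no value existed yet at event_time
    ts, value = rows[pos - 1]
    return value if event_time - ts <= ttl else None


def online_lookup(user_id):
    """Online lookup: only the most recent value per entity is served."""
    rows = history.get(user_id, [])
    return rows[-1][1] if rows else None


print(offline_lookup(1001, datetime(2026, 1, 6)))  # value as it was on Jan 6
print(online_lookup(1001))                         # latest value
```

The `ttl` argument plays the same role as a feature view's time-to-live: a value that is too old relative to the event time is treated as missing instead of being joined in.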

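11.2 Installing and Configuring Feast

# Install Feast
pip install feast

# Initialize a Feast project (use underscores: Feast project names allow only letters, digits, and underscores)
feast init my_feature_store
cd my_feature_store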

# feature_store.yaml
project: my_feature_store
registry: data/registry.db
provider: local
online_store:
  type: redis
  connection_string: "localhost:6379"
offline_store:
  type: file
entity_key_serialization_version: 2

# features.py - feature definitions
from datetime import timedelta

from feast import Entity, FeatureView, FileSource, ValueType, Field
from feast.types import Float64, Int64

# Entity definition
user = Entity(
    name="user_id",
    value_type=ValueType.INT64,
    description="User ID",
)

# Data source
user_stats_source = FileSource(
    path="data/user_stats.parquet",
    timestamp_field="event_timestamp",
)

# Feature view definition
user_stats_fv = FeatureView(
    name="user_statistics",
    entities=[user],
    ttl=timedelta(days=30),
    schema=[
        Field(name="purchase_count_7d", dtype=Int64),
        Field(name="purchase_amount_7d", dtype=Float64),
        Field(name="avg_session_duration", dtype=Float64),
        Field(name="last_purchase_days_ago", dtype=Int64),
    ],
    online=True,
    source=user_stats_source,
)

# feast_usage.py - using Feast
from datetime import datetime

import pandas as pd
from feast import FeatureStore

store = FeatureStore(repo_path=".")

# Offline feature retrieval (for training)
entity_df = pd.DataFrame({
    "user_id": [1001, 1002, 1003],
    "event_timestamp": pd.to_datetime(["2026-01-01", "2026-01-02", "2026-01-03"]),
})

training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_statistics:purchase_count_7d",
        "user_statistics:purchase_amount_7d",
        "user_statistics:avg_session_duration",
    ],
).to_df()

print(training_df.head())

# Online feature retrieval (for real-time prediction)
# First load the data into the online store
store.materialize_incremental(end_date=datetime.now())

# Real-time lookup
online_features = store.get_online_features(
    features=[
        "user_statistics:purchase_count_7d",
        "user_statistics:purchase_amount_7d",
    ],
    entity_rows=[
        {"user_id": 1001},
        {"user_id": 1002},
    ],
).to_dict()

print(online_features)

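12. Hands-On MLOps Project

12.1 End-to-End Pipeline Architecture

The stages run in the following order, each feeding the next:

1. Data sources (DB, S3, API)

2. Data ingestion (Airflow DAG)

3. Data validation (Great Expectations)

4. Feature engineering (Feast)

5. Model training (tracked with MLflow)

6. Model evaluation (automated validation)

7. Model registry (MLflow Registry)

8. CI/CD (GitHub Actions)

9. Container build (Docker)

10. K8s deployment (Kubernetes)

11. Serving (FastAPI + Triton)

12. Monitoring (Evidently + Prometheus + Grafana)

13. Alerting (triggers retraining when drift is detected)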
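The stages above form a chain in which a failed gate, such as data validation or model evaluation, should stop everything downstream. A toy sketch of that control flow follows. The stage functions are hypothetical stand-ins for the real tools, and the metric value is a placeholder, not a real result.

```python
from typing import Callable, Dict, List, Tuple

Stage = Tuple[str, Callable[[Dict], Dict]]


def run_pipeline(stages: List[Stage], context: Dict) -> Tuple[Dict, List[str]]:
    """Run stages in order and stop at the first stage that raises."""
    completed = []
    for name, stage in stages:
        try:
            context = stage(context)
        except Exception as exc:
            print(f"Stage '{name}' failed: {exc}; skipping downstream stages")
            break
        completed.append(name)
    return context, completed


def validate(ctx):
    if ctx["rows"] <= 1000:
        raise ValueError("too few samples")
    return ctx


def train(ctx):
    return {**ctx, "auc_roc": 0.88}  # placeholder metric for illustration


def evaluate(ctx):
    if ctx["auc_roc"] < ctx["min_auc"]:
        raise ValueError("model below quality gate")
    return {**ctx, "approved": True}


stages = [("validate", validate), ("train", train), ("evaluate", evaluate)]
ok_ctx, ok_done = run_pipeline(stages, {"rows": 5000, "min_auc": 0.85})
_, bad_done = run_pipeline(stages, {"rows": 500, "min_auc": 0.85})
print(ok_done, bad_done)
```

Orchestrators such as Airflow express the same idea declaratively through task dependencies; the sketch only shows why each gate sits before the stage it protects.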

12.2 Hands-On Project: Customer Churn Prediction System

# scripts/full_pipeline.py
"""
End-to-end MLOps pipeline: customer churn prediction
"""
import logging
from typing import Optional

import mlflow
import mlflow.sklearn
import numpy as np
import pandas as pd
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import accuracy_score, roc_auc_score
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class ChurnPredictionPipeline:
    """Customer churn prediction pipeline"""

    def __init__(self, mlflow_uri: str = "http://localhost:5000"):
        mlflow.set_tracking_uri(mlflow_uri)
        mlflow.set_experiment("churn-prediction")

    def load_data(self, data_path: str) -> pd.DataFrame:
        """Load the data and run basic validation"""
        logger.info(f"Loading data from {data_path}")
        df = pd.read_parquet(data_path)

        # Basic data quality checks (explicit raises, because asserts are skipped under python -O)
        if df.shape[0] <= 1000:
            raise ValueError("Too few samples")
        if 'churn' not in df.columns:
            raise ValueError("Target column 'churn' missing")
        if df['churn'].nunique() != 2:
            raise ValueError("Binary classification expected")

        missing_rate = df.isnull().mean()
        high_missing = missing_rate[missing_rate > 0.5].index.tolist()
        if high_missing:
            logger.warning(f"Columns with >50% missing: {high_missing}")
            df = df.drop(columns=high_missing)

        logger.info(f"Data loaded: {df.shape[0]} rows, {df.shape[1]} columns")
        return df

    def engineer_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Feature engineering"""
        logger.info("Engineering features...")
        df = df.copy()  # avoid mutating the caller's DataFrame

        # Impute missing values with the column median
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        df[numeric_cols] = df[numeric_cols].fillna(df[numeric_cols].median())

        # Derived features
        if 'tenure_months' in df.columns and 'monthly_charges' in df.columns:
            df['total_value'] = df['tenure_months'] * df['monthly_charges']

        if 'num_support_tickets' in df.columns and 'tenure_months' in df.columns:
            df['tickets_per_month'] = (
                df['num_support_tickets'] / (df['tenure_months'] + 1)
            )

        return df

    def train(self, df: pd.DataFrame, params: Optional[dict] = None) -> str:
        """Train the model and track the run in MLflow"""
        if params is None:
            params = {
                "n_estimators": 200,
                "max_depth": 5,
                "learning_rate": 0.05,
                "subsample": 0.8,
                "random_state": 42,
            }

        # Use numeric columns only; StandardScaler cannot handle string columns
        feature_cols = [
            c for c in df.select_dtypes(include=[np.number]).columns if c != 'churn'
        ]
        X = df[feature_cols]
        y = df['churn']

        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42, stratify=y
        )

        with mlflow.start_run(run_name="churn-gbt"):
            # Log parameters
            mlflow.log_params(params)
            mlflow.log_param("train_size", len(X_train))
            mlflow.log_param("test_size", len(X_test))
            mlflow.log_param("n_features", len(feature_cols))

            # Bundle scaling with the classifier so the logged model applies
            # the same scaling at serving time
            model = Pipeline([
                ("scaler", StandardScaler()),
                ("gbt", GradientBoostingClassifier(**params)),
            ])
            model.fit(X_train, y_train)

            # Evaluation
            y_pred = model.predict(X_test)
            y_proba = model.predict_proba(X_test)[:, 1]

            accuracy = accuracy_score(y_test, y_pred)
            auc_roc = roc_auc_score(y_test, y_proba)

            mlflow.log_metrics({
                "accuracy": accuracy,
                "auc_roc": auc_roc,
            })

            # Cross-validation (the scaler is refit inside each fold)
            cv_scores = cross_val_score(
                model, X_train, y_train, cv=5, scoring='roc_auc'
            )
            mlflow.log_metric("cv_auc_mean", cv_scores.mean())
            mlflow.log_metric("cv_auc_std", cv_scores.std())

            # Feature importance
            feature_importance = pd.DataFrame({
                'feature': feature_cols,
                'importance': model.named_steps["gbt"].feature_importances_,
            }).sort_values('importance', ascending=False)

            feature_importance.to_csv('/tmp/feature_importance.csv', index=False)
            mlflow.log_artifact('/tmp/feature_importance.csv')

            # Log the model artifact; registration happens once, in promote_best_model
            mlflow.sklearn.log_model(model, "model")

            run_id = mlflow.active_run().info.run_id
            logger.info(f"Training complete. Accuracy: {accuracy:.4f}, AUC-ROC: {auc_roc:.4f}")
            logger.info(f"MLflow Run ID: {run_id}")

        return run_id

    def promote_best_model(self, min_auc: float = 0.85):
        """Promote the best-performing model to Production"""
        client = mlflow.tracking.MlflowClient()

        # Find the best run in the experiment
        experiment = client.get_experiment_by_name("churn-prediction")
        runs = client.search_runs(
            experiment_ids=[experiment.experiment_id],
            filter_string="metrics.auc_roc > 0.80",
            order_by=["metrics.auc_roc DESC"],
            max_results=1,
        )

        if not runs:
            raise ValueError("No runs found meeting criteria")

        best_run = runs[0]
        auc = best_run.data.metrics['auc_roc']

        if auc < min_auc:
            raise ValueError(f"Best AUC {auc:.4f} below minimum {min_auc}")

        # Register the model and promote it to Production.
        # Note: model stages are deprecated in recent MLflow releases in favor of aliases.
        model_uri = f"runs:/{best_run.info.run_id}/model"
        mv = mlflow.register_model(model_uri, "ChurnPredictor")

        client.transition_model_version_stage(
            name="ChurnPredictor",
            version=mv.version,
            stage="Production",
            archive_existing_versions=True,
        )

        logger.info(f"Model v{mv.version} promoted to Production! AUC: {auc:.4f}")
        return mv.version


# Run
if __name__ == "__main__":
    pipeline = ChurnPredictionPipeline()
    df = pipeline.load_data("data/customers.parquet")
    df = pipeline.engineer_features(df)
    run_id = pipeline.train(df)
    pipeline.promote_best_model(min_auc=0.85)
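The selection logic inside `promote_best_model` can be exercised without an MLflow server. The sketch below mirrors it on plain dictionaries. The run records are invented for illustration, and `select_run_to_promote` is a hypothetical helper, not an MLflow API.

```python
def select_run_to_promote(runs, min_auc=0.85, metric="auc_roc"):
    """Return the run with the highest metric, or raise if none clears min_auc."""
    scored = [r for r in runs if metric in r["metrics"]]
    if not scored:
        raise ValueError("No runs found meeting criteria")
    best = max(scored, key=lambda r: r["metrics"][metric])
    best_value = best["metrics"][metric]
    if best_value < min_auc:
        raise ValueError(f"Best AUC {best_value:.4f} below minimum {min_auc}")
    return best


# Invented run records, for illustration only
runs = [
    {"run_id": "a1", "metrics": {"auc_roc": 0.83}},
    {"run_id": "b2", "metrics": {"auc_roc": 0.91}},
    {"run_id": "c3", "metrics": {"accuracy": 0.95}},  # no AUC logged, so it is ignored
]

print(select_run_to_promote(runs)["run_id"])
```

Keeping the gate in a pure function like this makes it easy to unit-test in the CI `test` job, separately from the registry calls that need a live tracking server.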

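Wrapping Up

MLOps is not just about learning tools. It is the culture and process of operating ML systems so that they are **reliable, reproducible, and scalable**.

Keep these core principles in mind:

1. **Version everything**: code, data, models, and environments

2. **Automate first**: manual steps are a source of errors

3. **Measure and monitor**: you cannot improve what you do not measure

4. **Detect failures fast**: catch drift and performance degradation immediately

5. **Guarantee reproducibility**: anyone should be able to reproduce the same result at any time

The MLOps journey is incremental. Start at Level 0 and raise your maturity to match your organization's needs.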

References

- [MLflow official documentation](https://mlflow.org/docs/latest/)

- [Weights & Biases documentation](https://docs.wandb.ai/)

- [Kubeflow official documentation](https://www.kubeflow.org/docs/)

- [DVC official documentation](https://dvc.org/doc)

- [Evidently AI documentation](https://docs.evidentlyai.com/)

- [Feast documentation](https://docs.feast.dev/)
