Split View: Ray Serve로 구현하는 확장 가능한 LLM 서빙 파이프라인
Ray Serve로 구현하는 확장 가능한 LLM 서빙 파이프라인
- 들어가며
- Ray Serve 핵심 개념
- LLM 서빙 (vLLM 통합)
- 멀티모델 파이프라인
- 배치 추론
- 멀티노드 배포
- Kubernetes 배포 (KubeRay)
- 모니터링
- A/B 테스트
- 퀴즈
- 마무리
- 참고 자료

들어가며
ML/LLM 모델을 프로덕션에 배포할 때 가장 큰 과제는 확장성과 복잡한 파이프라인 관리입니다. 단일 모델 서빙은 간단하지만, 실제 서비스는 전처리 → 모델 추론 → 후처리 → 재랭킹 같은 멀티스텝 파이프라인이 필요합니다.
Ray Serve는 Ray 위에 구축된 확장 가능한 모델 서빙 프레임워크로, Python 네이티브 코드로 복잡한 서빙 파이프라인을 구성할 수 있습니다.
Ray Serve 핵심 개념
Deployment
from ray import serve
import ray
# Connect to the Ray cluster
ray.init()


# Simplest possible deployment
@serve.deployment
class HelloModel:
    """Deployment that answers every call with the same greeting payload."""

    def __call__(self, request):
        # The incoming request is ignored; the response is constant.
        greeting = {"message": "Hello from Ray Serve!"}
        return greeting


# Deploy: bind the deployment into an application and serve it under /hello
app = HelloModel.bind()
serve.run(app, route_prefix="/hello")
Deployment 설정
@serve.deployment(
    num_replicas=3,  # number of replicas
    max_ongoing_requests=100,  # max concurrent requests per replica
    ray_actor_options={
        "num_cpus": 2,
        "num_gpus": 1,
        "memory": 4 * 1024**3,  # 4GB
    },
    health_check_period_s=10,
    health_check_timeout_s=30,
    graceful_shutdown_wait_loop_s=2,
    graceful_shutdown_timeout_s=20,
)
class MLModel:
    """Deployment that serves predictions from an object loaded via ``torch.load``."""

    def __init__(self, model_path: str):
        # Loading happens once per replica, at construction time.
        self.model = self.load_model(model_path)

    def load_model(self, path):
        """Deserialize and return whatever object is stored at ``path``."""
        import torch

        # NOTE(review): torch.load unpickles the file; presumably ``path`` is
        # always a trusted artifact -- confirm before pointing it elsewhere.
        loaded = torch.load(path)
        return loaded

    async def __call__(self, request):
        """Predict on the ``features`` field of the JSON request body."""
        payload = await request.json()
        features = payload["features"]
        # Assumes the loaded object exposes ``predict`` and that its result
        # has ``tolist`` -- TODO confirm against the saved model type.
        output = self.model.predict(features)
        return {"prediction": output.tolist()}
오토스케일링
@serve.deployment(
    autoscaling_config=dict(
        min_replicas=1,
        max_replicas=10,
        initial_replicas=2,
        target_ongoing_requests=5,  # target concurrent requests per replica
        upscale_delay_s=30,
        downscale_delay_s=300,
        upscaling_factor=1.5,  # scale-up multiplier
        downscaling_factor=0.7,  # scale-down multiplier
        smoothing_factor=0.5,
        metrics_interval_s=10,
    )
)
class AutoScaledModel:
    """Sentiment classifier deployment whose replica count is autoscaled."""

    def __init__(self):
        from transformers import pipeline

        # No model name is given, so the pipeline's default model is used.
        self.classifier = pipeline("sentiment-analysis")

    async def __call__(self, request):
        """Classify the ``text`` field of the JSON request body."""
        body = await request.json()
        return self.classifier(body["text"])
LLM 서빙 (vLLM 통합)
from ray import serve
from ray.serve.llm import LLMConfig, build_openai_app

# Ray Serve LLM module (vLLM backend).
# LLMConfig does not take `model=` or engine options as top-level arguments:
# the model is described by `model_loading_config`, and vLLM engine arguments
# are forwarded through `engine_kwargs`.
llm_config = LLMConfig(
    model_loading_config={
        "model_id": "meta-llama/Llama-3.1-8B-Instruct",
        "model_source": "meta-llama/Llama-3.1-8B-Instruct",
    },
    engine_kwargs={
        "tensor_parallel_size": 1,
        "max_model_len": 8192,
        "gpu_memory_utilization": 0.9,
    },
)

# build_openai_app places the LLM server behind an OpenAI-compatible router;
# binding the server class directly would not expose those endpoints.
app = build_openai_app({"llm_configs": [llm_config]})
serve.run(app)
커스텀 LLM 서빙
@serve.deployment(
    ray_actor_options={"num_gpus": 1},
    autoscaling_config={
        "min_replicas": 1,
        "max_replicas": 4,
        "target_ongoing_requests": 3,
    },
)
class CustomLLMDeployment:
    """Serve a vLLM engine behind a JSON endpoint.

    The request body must contain ``prompt`` and may contain ``temperature``,
    ``top_p`` and ``max_tokens``; omitted values fall back to
    ``self.default_params``.
    """

    def __init__(self):
        from vllm import LLM, SamplingParams

        # The import above is local to __init__, so __call__ cannot see the
        # name `SamplingParams`; keep the class on the instance instead.
        self._sampling_params_cls = SamplingParams
        self.llm = LLM(
            model="meta-llama/Llama-3.1-8B-Instruct",
            dtype="auto",
            max_model_len=4096,
            gpu_memory_utilization=0.85,
        )
        self.default_params = SamplingParams(
            temperature=0.7,
            top_p=0.9,
            max_tokens=1024,
        )

    async def __call__(self, request):
        """Generate a completion for ``prompt``.

        Returns a dict with the generated ``text`` and the number of
        generated ``tokens``. Raises ``KeyError`` if ``prompt`` is missing.
        """
        data = await request.json()
        prompt = data["prompt"]
        defaults = self.default_params
        # Per-request overrides on top of the configured defaults (top_p was
        # previously dropped whenever request parameters were built).
        params = self._sampling_params_cls(
            temperature=data.get("temperature", defaults.temperature),
            top_p=data.get("top_p", defaults.top_p),
            max_tokens=data.get("max_tokens", defaults.max_tokens),
        )
        # NOTE(review): generate() is a synchronous call inside an async
        # handler -- confirm this is acceptable for the expected concurrency.
        outputs = self.llm.generate([prompt], params)
        completion = outputs[0].outputs[0]
        return {
            "text": completion.text,
            "tokens": len(completion.token_ids),
        }
멀티모델 파이프라인
@serve.deployment(num_replicas=2, ray_actor_options={"num_cpus": 1})
class Preprocessor:
    """Text preprocessing: whitespace/case normalization and tokenization."""

    def __init__(self):
        from transformers import AutoTokenizer

        self.tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

    async def preprocess(self, text: str) -> dict:
        """Return the normalized text, its token ids and the token count."""
        # Normalize: trim surrounding whitespace and lowercase.
        normalized = text.strip().lower()
        # Tokenize, truncating to at most 512 tokens.
        token_ids = self.tokenizer.encode(normalized, max_length=512, truncation=True)
        return dict(text=normalized, tokens=token_ids, length=len(token_ids))
@serve.deployment(ray_actor_options={"num_gpus": 1})
class SentimentModel:
    """Sentiment analysis model."""

    def __init__(self):
        from transformers import pipeline

        self.model = pipeline(
            "sentiment-analysis",
            model="nlptown/bert-base-multilingual-uncased-sentiment",
            device=0,
        )

    async def predict(self, text: str) -> dict:
        """Return the ``label`` and ``score`` of the first pipeline result."""
        top = self.model(text)[0]
        return {field: top[field] for field in ("label", "score")}
@serve.deployment(ray_actor_options={"num_gpus": 1})
class SummaryModel:
    """Summarization model."""

    def __init__(self):
        from transformers import pipeline

        self.model = pipeline("summarization", device=0)

    async def summarize(self, text: str) -> str:
        """Summarize ``text``; inputs under 30 words are returned unchanged."""
        word_count = len(text.split())
        if word_count >= 30:
            summaries = self.model(text, max_length=100, min_length=30)
            return summaries[0]["summary_text"]
        # Too short to be worth summarizing.
        return text
@serve.deployment
class Pipeline:
    """Orchestrator that combines the preprocessing and model deployments."""

    def __init__(self, preprocessor, sentiment, summary):
        # Each argument is the handle of a bound downstream deployment.
        self.preprocessor = preprocessor
        self.sentiment = sentiment
        self.summary = summary

    async def __call__(self, request):
        """Analyze the ``text`` field of the JSON request body."""
        text = (await request.json())["text"]

        # Preprocessing is awaited before the model calls are issued.
        processed = await self.preprocessor.preprocess.remote(text)

        # Issue both model calls first, then await them, so they can overlap.
        pending_sentiment = self.sentiment.predict.remote(text)
        pending_summary = self.summary.summarize.remote(text)
        sentiment_out = await pending_sentiment
        summary_out = await pending_summary

        response = {
            "original_length": processed["length"],
            "sentiment": sentiment_out,
            "summary": summary_out,
        }
        return response


# Pipeline composition (DAG)
preprocessor = Preprocessor.bind()
sentiment = SentimentModel.bind()
summary = SummaryModel.bind()
app = Pipeline.bind(
    preprocessor=preprocessor,
    sentiment=sentiment,
    summary=summary,
)
serve.run(app, route_prefix="/analyze")
배치 추론
@serve.deployment(
    ray_actor_options={"num_gpus": 1},
)
class BatchedModel:
    """Text classifier whose inference calls are batched by Ray Serve."""

    def __init__(self):
        from transformers import pipeline

        self.model = pipeline("text-classification", device=0)

    @serve.batch(max_batch_size=32, batch_wait_timeout_s=0.1)
    async def _classify_batch(self, texts: list[str]) -> list[dict]:
        """Classify a batch of texts.

        Ray Serve collects individual calls into one list: up to 32 items,
        or whatever has arrived after waiting 0.1 seconds. The returned list
        must have one entry per input.
        """
        return self.model(texts)

    async def __call__(self, request):
        """Classify a single input and return its result.

        ``request`` may be a plain string (call through a deployment handle)
        or an HTTP request whose JSON body has a ``text`` field. Batching
        ``__call__`` directly would hand the model a list of request objects
        for HTTP traffic, so the text is extracted here first.
        """
        if isinstance(request, str):
            text = request
        else:
            payload = await request.json()
            text = payload["text"]
        # Each caller passes one item; the decorator returns that item's result.
        return await self._classify_batch(text)
멀티노드 배포
# Large model sharded over several GPUs of a single node (tensor parallelism)
@serve.deployment(
    ray_actor_options={
        "num_gpus": 4,  # use 4 GPUs
    },
    # Serve rejects `placement_group_strategy` unless `placement_group_bundles`
    # is also given; the first bundle must cover the replica actor's own
    # resources (1 CPU by default plus the 4 GPUs requested above).
    placement_group_bundles=[{"CPU": 1, "GPU": 4}],
    placement_group_strategy="STRICT_PACK",  # place on the same node
)
class LargeModel:
    """Replica holding a 70B model split across the 4 GPUs it reserves."""

    def __init__(self):
        from vllm import LLM

        self.llm = LLM(
            model="meta-llama/Llama-3.1-70B-Instruct",
            tensor_parallel_size=4,
            # NOTE(review): the replica owns all 4 GPUs in one bundle, so the
            # workers are local processes rather than extra Ray actors --
            # confirm against the vLLM version in use.
            distributed_executor_backend="mp",
        )
Kubernetes 배포 (KubeRay)
# RayService definition: Serve application config plus the Ray cluster spec
apiVersion: ray.io/v1
kind: RayService
metadata:
  name: llm-service
spec:
  serveConfigV2: |
    applications:
      - name: llm-app
        import_path: serve_app:app
        runtime_env:
          pip:
            - vllm>=0.6.0
            - transformers
        deployments:
          - name: LLMDeployment
            ray_actor_options:
              num_gpus: 1
            # num_replicas cannot be combined with autoscaling_config;
            # the starting replica count is expressed as initial_replicas.
            autoscaling_config:
              min_replicas: 1
              max_replicas: 4
              initial_replicas: 2
  rayClusterConfig:
    headGroupSpec:
      rayStartParams:
        dashboard-host: '0.0.0.0'
      template:
        spec:
          containers:
            - name: ray-head
              image: rayproject/ray-ml:2.40.0-py310-gpu
              resources:
                limits:
                  cpu: '4'
                  memory: '16Gi'
              ports:
                - containerPort: 6379
                - containerPort: 8265
                - containerPort: 8000
    workerGroupSpecs:
      - groupName: gpu-workers
        replicas: 2
        minReplicas: 1
        maxReplicas: 4
        rayStartParams: {}
        template:
          spec:
            containers:
              - name: ray-worker
                image: rayproject/ray-ml:2.40.0-py310-gpu
                resources:
                  limits:
                    cpu: '8'
                    memory: '32Gi'
                    nvidia.com/gpu: 1
모니터링
# Ray Dashboard: http://localhost:8265
# Serve 메트릭: http://localhost:8000/-/metrics
# Prometheus 메트릭
# ray_serve_deployment_request_counter
# ray_serve_deployment_error_counter
# ray_serve_deployment_processing_latency_ms
# ray_serve_deployment_replica_starts
# ray_serve_num_ongoing_requests
# Grafana dashboard queries
panels:
  - title: 'Request Latency (p99)'
    query: |
      histogram_quantile(0.99,
        rate(ray_serve_deployment_processing_latency_ms_bucket[5m])
      )
  - title: 'Throughput (req/s)'
    query: |
      rate(ray_serve_deployment_request_counter[1m])
  - title: 'Active Replicas'
    # The health metric is a per-replica gauge (1 = healthy, 0 = unhealthy)
    # without a `_total` suffix; summing it yields the healthy replica count.
    query: |
      sum(ray_serve_deployment_replica_healthy)
A/B 테스트
import random
@serve.deployment
class ABRouter:
    """Route each request to one of two model handles at random."""

    def __init__(self, model_a, model_b, traffic_split=0.9):
        self.model_a = model_a  # stable version
        self.model_b = model_b  # experimental version
        self.split = traffic_split

    async def __call__(self, request):
        """Forward ``request`` to model_a or model_b and return its reply."""
        # A draw below `split` goes to model_a; everything else to model_b.
        use_stable = random.random() < self.split
        target = self.model_a if use_stable else self.model_b
        return await target.__call__.remote(request)


# NOTE(review): StableModel and ExperimentalModel are not defined in the
# visible source -- presumably deployments declared elsewhere.
model_v1 = StableModel.bind()
model_v2 = ExperimentalModel.bind()
app = ABRouter.bind(model_v1, model_v2, traffic_split=0.95)
퀴즈
Q1. Ray Serve의 Deployment란?
단일 ML 모델이나 비즈니스 로직을 감싸는 확장 가능한 단위입니다. 복제본 수, GPU 할당, 오토스케일링 등을 독립적으로 설정할 수 있습니다.
Q2. Ray Serve에서 멀티모델 파이프라인의 장점은?
각 모델을 독립적인 Deployment로 분리하여 개별적으로 스케일링하고, DAG 형태로 병렬/순차 처리를 자유롭게 구성할 수 있습니다.
Q3. @serve.batch 데코레이터의 역할은?
개별 요청을 자동으로 모아서 배치로 처리합니다. GPU 활용률을 높이고 처리량을 극대화합니다. max_batch_size와 batch_wait_timeout_s로 제어합니다.
Q4. target_ongoing_requests 오토스케일링 설정의 의미는?
복제본당 목표 동시 처리 요청 수입니다. 이 값을 초과하면 스케일 업, 미만이면 스케일 다운합니다.
Q5. KubeRay의 RayService 리소스는 어떤 역할을 하나요?
Kubernetes에서 Ray 클러스터를 자동 생성하고, Ray Serve 애플리케이션을 배포/관리합니다. Worker 노드 오토스케일링도 처리합니다.
Q6. placement_group_strategy의 STRICT_PACK 의미는?
플레이스먼트 그룹의 모든 번들(요청한 GPU 포함)이 같은 물리 노드에 배치되도록 강제하며, 한 노드에 담을 수 없으면 스케줄링되지 않습니다. Tensor Parallelism처럼 GPU 간 고속 통신이 필요한 경우에 사용합니다.
Q7. Ray Serve vs 직접 Flask/FastAPI 서빙의 차이는?
Ray Serve는 분산 컴퓨팅, 오토스케일링, GPU 관리, 배치 처리, 멀티모델 파이프라인을 네이티브로 지원합니다. Flask/FastAPI는 단일 프로세스 서빙에 적합합니다.
마무리
Ray Serve는 복잡한 ML/LLM 서빙 파이프라인을 Python 코드만으로 구성하고 확장할 수 있는 강력한 프레임워크입니다. vLLM 통합으로 LLM 서빙이 간편해졌고, KubeRay를 통한 Kubernetes 네이티브 배포로 프로덕션 운영도 용이합니다.
참고 자료
Building Scalable LLM Serving Pipelines with Ray Serve
- Introduction
- Ray Serve Core Concepts
- LLM Serving (vLLM Integration)
- Multi-Model Pipeline
- Batch Inference
- Multi-Node Deployment
- Kubernetes Deployment (KubeRay)
- Monitoring
- A/B Testing
- Quiz
- Conclusion
- References

Introduction
When deploying ML/LLM models to production, the biggest challenges are scalability and complex pipeline management. Serving a single model is straightforward, but real-world services require multi-step pipelines such as preprocessing, model inference, post-processing, and re-ranking.
Ray Serve is a scalable model serving framework built on top of Ray that lets you compose complex serving pipelines using native Python code.
Ray Serve Core Concepts
Deployment
from ray import serve
import ray
# Connect to the Ray cluster
ray.init()


# Simplest possible deployment
@serve.deployment
class HelloModel:
    """Deployment that answers every call with the same greeting payload."""

    def __call__(self, request):
        # The incoming request is ignored; the response is constant.
        greeting = {"message": "Hello from Ray Serve!"}
        return greeting


# Deploy: bind the deployment into an application and serve it under /hello
app = HelloModel.bind()
serve.run(app, route_prefix="/hello")
Deployment Configuration
@serve.deployment(
    num_replicas=3,  # number of replicas
    max_ongoing_requests=100,  # max concurrent requests per replica
    ray_actor_options={
        "num_cpus": 2,
        "num_gpus": 1,
        "memory": 4 * 1024**3,  # 4GB
    },
    health_check_period_s=10,
    health_check_timeout_s=30,
    graceful_shutdown_wait_loop_s=2,
    graceful_shutdown_timeout_s=20,
)
class MLModel:
    """Deployment that serves predictions from an object loaded via ``torch.load``."""

    def __init__(self, model_path: str):
        # Loading happens once per replica, at construction time.
        self.model = self.load_model(model_path)

    def load_model(self, path):
        """Deserialize and return whatever object is stored at ``path``."""
        import torch

        # NOTE(review): torch.load unpickles the file; presumably ``path`` is
        # always a trusted artifact -- confirm before pointing it elsewhere.
        loaded = torch.load(path)
        return loaded

    async def __call__(self, request):
        """Predict on the ``features`` field of the JSON request body."""
        payload = await request.json()
        features = payload["features"]
        # Assumes the loaded object exposes ``predict`` and that its result
        # has ``tolist`` -- TODO confirm against the saved model type.
        output = self.model.predict(features)
        return {"prediction": output.tolist()}
Autoscaling
@serve.deployment(
    autoscaling_config=dict(
        min_replicas=1,
        max_replicas=10,
        initial_replicas=2,
        target_ongoing_requests=5,  # target concurrent requests per replica
        upscale_delay_s=30,
        downscale_delay_s=300,
        upscaling_factor=1.5,  # scale-up multiplier
        downscaling_factor=0.7,  # scale-down multiplier
        smoothing_factor=0.5,
        metrics_interval_s=10,
    )
)
class AutoScaledModel:
    """Sentiment classifier deployment whose replica count is autoscaled."""

    def __init__(self):
        from transformers import pipeline

        # No model name is given, so the pipeline's default model is used.
        self.classifier = pipeline("sentiment-analysis")

    async def __call__(self, request):
        """Classify the ``text`` field of the JSON request body."""
        body = await request.json()
        return self.classifier(body["text"])
LLM Serving (vLLM Integration)
from ray import serve
from ray.serve.llm import LLMConfig, build_openai_app

# Ray Serve LLM module (vLLM backend).
# LLMConfig does not take `model=` or engine options as top-level arguments:
# the model is described by `model_loading_config`, and vLLM engine arguments
# are forwarded through `engine_kwargs`.
llm_config = LLMConfig(
    model_loading_config={
        "model_id": "meta-llama/Llama-3.1-8B-Instruct",
        "model_source": "meta-llama/Llama-3.1-8B-Instruct",
    },
    engine_kwargs={
        "tensor_parallel_size": 1,
        "max_model_len": 8192,
        "gpu_memory_utilization": 0.9,
    },
)

# build_openai_app places the LLM server behind an OpenAI-compatible router;
# binding the server class directly would not expose those endpoints.
app = build_openai_app({"llm_configs": [llm_config]})
serve.run(app)
Custom LLM Serving
@serve.deployment(
    ray_actor_options={"num_gpus": 1},
    autoscaling_config={
        "min_replicas": 1,
        "max_replicas": 4,
        "target_ongoing_requests": 3,
    },
)
class CustomLLMDeployment:
    """Serve a vLLM engine behind a JSON endpoint.

    The request body must contain ``prompt`` and may contain ``temperature``,
    ``top_p`` and ``max_tokens``; omitted values fall back to
    ``self.default_params``.
    """

    def __init__(self):
        from vllm import LLM, SamplingParams

        # The import above is local to __init__, so __call__ cannot see the
        # name `SamplingParams`; keep the class on the instance instead.
        self._sampling_params_cls = SamplingParams
        self.llm = LLM(
            model="meta-llama/Llama-3.1-8B-Instruct",
            dtype="auto",
            max_model_len=4096,
            gpu_memory_utilization=0.85,
        )
        self.default_params = SamplingParams(
            temperature=0.7,
            top_p=0.9,
            max_tokens=1024,
        )

    async def __call__(self, request):
        """Generate a completion for ``prompt``.

        Returns a dict with the generated ``text`` and the number of
        generated ``tokens``. Raises ``KeyError`` if ``prompt`` is missing.
        """
        data = await request.json()
        prompt = data["prompt"]
        defaults = self.default_params
        # Per-request overrides on top of the configured defaults (top_p was
        # previously dropped whenever request parameters were built).
        params = self._sampling_params_cls(
            temperature=data.get("temperature", defaults.temperature),
            top_p=data.get("top_p", defaults.top_p),
            max_tokens=data.get("max_tokens", defaults.max_tokens),
        )
        # NOTE(review): generate() is a synchronous call inside an async
        # handler -- confirm this is acceptable for the expected concurrency.
        outputs = self.llm.generate([prompt], params)
        completion = outputs[0].outputs[0]
        return {
            "text": completion.text,
            "tokens": len(completion.token_ids),
        }
Multi-Model Pipeline
@serve.deployment(num_replicas=2, ray_actor_options={"num_cpus": 1})
class Preprocessor:
    """Text preprocessing: whitespace/case normalization and tokenization."""

    def __init__(self):
        from transformers import AutoTokenizer

        self.tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

    async def preprocess(self, text: str) -> dict:
        """Return the normalized text, its token ids and the token count."""
        # Normalize: trim surrounding whitespace and lowercase.
        normalized = text.strip().lower()
        # Tokenize, truncating to at most 512 tokens.
        token_ids = self.tokenizer.encode(normalized, max_length=512, truncation=True)
        return dict(text=normalized, tokens=token_ids, length=len(token_ids))
@serve.deployment(ray_actor_options={"num_gpus": 1})
class SentimentModel:
    """Sentiment analysis model."""

    def __init__(self):
        from transformers import pipeline

        self.model = pipeline(
            "sentiment-analysis",
            model="nlptown/bert-base-multilingual-uncased-sentiment",
            device=0,
        )

    async def predict(self, text: str) -> dict:
        """Return the ``label`` and ``score`` of the first pipeline result."""
        top = self.model(text)[0]
        return {field: top[field] for field in ("label", "score")}
@serve.deployment(ray_actor_options={"num_gpus": 1})
class SummaryModel:
    """Summarization model."""

    def __init__(self):
        from transformers import pipeline

        self.model = pipeline("summarization", device=0)

    async def summarize(self, text: str) -> str:
        """Summarize ``text``; inputs under 30 words are returned unchanged."""
        word_count = len(text.split())
        if word_count >= 30:
            summaries = self.model(text, max_length=100, min_length=30)
            return summaries[0]["summary_text"]
        # Too short to be worth summarizing.
        return text
@serve.deployment
class Pipeline:
    """Orchestrator that combines the preprocessing and model deployments."""

    def __init__(self, preprocessor, sentiment, summary):
        # Each argument is the handle of a bound downstream deployment.
        self.preprocessor = preprocessor
        self.sentiment = sentiment
        self.summary = summary

    async def __call__(self, request):
        """Analyze the ``text`` field of the JSON request body."""
        text = (await request.json())["text"]

        # Preprocessing is awaited before the model calls are issued.
        processed = await self.preprocessor.preprocess.remote(text)

        # Issue both model calls first, then await them, so they can overlap.
        pending_sentiment = self.sentiment.predict.remote(text)
        pending_summary = self.summary.summarize.remote(text)
        sentiment_out = await pending_sentiment
        summary_out = await pending_summary

        response = {
            "original_length": processed["length"],
            "sentiment": sentiment_out,
            "summary": summary_out,
        }
        return response


# Pipeline composition (DAG)
preprocessor = Preprocessor.bind()
sentiment = SentimentModel.bind()
summary = SummaryModel.bind()
app = Pipeline.bind(
    preprocessor=preprocessor,
    sentiment=sentiment,
    summary=summary,
)
serve.run(app, route_prefix="/analyze")
Batch Inference
@serve.deployment(
    ray_actor_options={"num_gpus": 1},
)
class BatchedModel:
    """Text classifier whose inference calls are batched by Ray Serve."""

    def __init__(self):
        from transformers import pipeline

        self.model = pipeline("text-classification", device=0)

    @serve.batch(max_batch_size=32, batch_wait_timeout_s=0.1)
    async def _classify_batch(self, texts: list[str]) -> list[dict]:
        """Classify a batch of texts.

        Ray Serve collects individual calls into one list: up to 32 items,
        or whatever has arrived after waiting 0.1 seconds. The returned list
        must have one entry per input.
        """
        return self.model(texts)

    async def __call__(self, request):
        """Classify a single input and return its result.

        ``request`` may be a plain string (call through a deployment handle)
        or an HTTP request whose JSON body has a ``text`` field. Batching
        ``__call__`` directly would hand the model a list of request objects
        for HTTP traffic, so the text is extracted here first.
        """
        if isinstance(request, str):
            text = request
        else:
            payload = await request.json()
            text = payload["text"]
        # Each caller passes one item; the decorator returns that item's result.
        return await self._classify_batch(text)
Multi-Node Deployment
# Large model sharded over several GPUs of a single node (tensor parallelism)
@serve.deployment(
    ray_actor_options={
        "num_gpus": 4,  # use 4 GPUs
    },
    # Serve rejects `placement_group_strategy` unless `placement_group_bundles`
    # is also given; the first bundle must cover the replica actor's own
    # resources (1 CPU by default plus the 4 GPUs requested above).
    placement_group_bundles=[{"CPU": 1, "GPU": 4}],
    placement_group_strategy="STRICT_PACK",  # place on the same node
)
class LargeModel:
    """Replica holding a 70B model split across the 4 GPUs it reserves."""

    def __init__(self):
        from vllm import LLM

        self.llm = LLM(
            model="meta-llama/Llama-3.1-70B-Instruct",
            tensor_parallel_size=4,
            # NOTE(review): the replica owns all 4 GPUs in one bundle, so the
            # workers are local processes rather than extra Ray actors --
            # confirm against the vLLM version in use.
            distributed_executor_backend="mp",
        )
Kubernetes Deployment (KubeRay)
# RayService definition: Serve application config plus the Ray cluster spec
apiVersion: ray.io/v1
kind: RayService
metadata:
  name: llm-service
spec:
  serveConfigV2: |
    applications:
      - name: llm-app
        import_path: serve_app:app
        runtime_env:
          pip:
            - vllm>=0.6.0
            - transformers
        deployments:
          - name: LLMDeployment
            ray_actor_options:
              num_gpus: 1
            # num_replicas cannot be combined with autoscaling_config;
            # the starting replica count is expressed as initial_replicas.
            autoscaling_config:
              min_replicas: 1
              max_replicas: 4
              initial_replicas: 2
  rayClusterConfig:
    headGroupSpec:
      rayStartParams:
        dashboard-host: '0.0.0.0'
      template:
        spec:
          containers:
            - name: ray-head
              image: rayproject/ray-ml:2.40.0-py310-gpu
              resources:
                limits:
                  cpu: '4'
                  memory: '16Gi'
              ports:
                - containerPort: 6379
                - containerPort: 8265
                - containerPort: 8000
    workerGroupSpecs:
      - groupName: gpu-workers
        replicas: 2
        minReplicas: 1
        maxReplicas: 4
        rayStartParams: {}
        template:
          spec:
            containers:
              - name: ray-worker
                image: rayproject/ray-ml:2.40.0-py310-gpu
                resources:
                  limits:
                    cpu: '8'
                    memory: '32Gi'
                    nvidia.com/gpu: 1
Monitoring
# Ray Dashboard: http://localhost:8265
# Serve metrics: http://localhost:8000/-/metrics
# Prometheus metrics
# ray_serve_deployment_request_counter
# ray_serve_deployment_error_counter
# ray_serve_deployment_processing_latency_ms
# ray_serve_deployment_replica_starts
# ray_serve_num_ongoing_requests
# Grafana dashboard queries
panels:
  - title: 'Request Latency (p99)'
    query: |
      histogram_quantile(0.99,
        rate(ray_serve_deployment_processing_latency_ms_bucket[5m])
      )
  - title: 'Throughput (req/s)'
    query: |
      rate(ray_serve_deployment_request_counter[1m])
  - title: 'Active Replicas'
    # The health metric is a per-replica gauge (1 = healthy, 0 = unhealthy)
    # without a `_total` suffix; summing it yields the healthy replica count.
    query: |
      sum(ray_serve_deployment_replica_healthy)
A/B Testing
import random
@serve.deployment
class ABRouter:
    """Route each request to one of two model handles at random."""

    def __init__(self, model_a, model_b, traffic_split=0.9):
        self.model_a = model_a  # stable version
        self.model_b = model_b  # experimental version
        self.split = traffic_split

    async def __call__(self, request):
        """Forward ``request`` to model_a or model_b and return its reply."""
        # A draw below `split` goes to model_a; everything else to model_b.
        use_stable = random.random() < self.split
        target = self.model_a if use_stable else self.model_b
        return await target.__call__.remote(request)


# NOTE(review): StableModel and ExperimentalModel are not defined in the
# visible source -- presumably deployments declared elsewhere.
model_v1 = StableModel.bind()
model_v2 = ExperimentalModel.bind()
app = ABRouter.bind(model_v1, model_v2, traffic_split=0.95)
Quiz
Q1. What is a Deployment in Ray Serve?
A scalable unit that wraps a single ML model or business logic. You can independently configure the number of replicas, GPU allocation, autoscaling, and more.
Q2. What are the advantages of multi-model pipelines in Ray Serve?
Each model is separated into an independent Deployment that can be scaled individually, and you can freely compose parallel/sequential processing in a DAG structure.
Q3. What does the @serve.batch decorator do?
It automatically collects individual requests and processes them as a batch. This maximizes GPU utilization and throughput. It is controlled via max_batch_size and batch_wait_timeout_s.
Q4. What does the target_ongoing_requests autoscaling setting mean?
It is the target number of concurrent requests per replica. When this value is exceeded, the system scales up; when it falls below, it scales down.
Q5. What role does KubeRay's RayService resource play?
It automatically creates a Ray cluster on Kubernetes and deploys/manages Ray Serve applications. It also handles worker node autoscaling.
Q6. What does STRICT_PACK mean in placement_group_strategy?
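It forces every bundle of the placement group (including the requested GPUs) onto the same physical node; if they cannot all fit on one node, the group is not scheduled. This is used when high-speed inter-GPU communication is required, such as with Tensor Parallelism.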
Q7. How does Ray Serve differ from serving directly with Flask/FastAPI?
Ray Serve natively supports distributed computing, autoscaling, GPU management, batch processing, and multi-model pipelines. Flask/FastAPI is better suited for single-process serving.
Conclusion
Ray Serve is a powerful framework that lets you compose and scale complex ML/LLM serving pipelines using nothing but Python code. LLM serving has become straightforward with vLLM integration, and production operations are simplified through Kubernetes-native deployment via KubeRay.