AI Model Serving and Inference Optimization Complete Guide: vLLM, TensorRT, Triton, Ollama
By Youngju Kim (@fjvbn20031)
Introduction
Developing an AI model and serving it efficiently in production are entirely different challenges. Serving a GPT-scale LLM to millions of users with sub-100ms response times, or handling real-time image classification on edge devices, requires substantial optimization expertise.
This guide gives you complete mastery of the core AI model serving tools (vLLM, TensorRT, NVIDIA Triton, Ollama) and optimization techniques through real-world examples.
1. The Challenges and Goals of AI Inference
1.1 Training vs Inference
Training and inference have fundamentally different computing requirements.
| Category | Training | Inference |
|---|---|---|
| Goal | Optimize model parameters | Fast prediction generation |
| Batch size | Large (128–2048) | Small or streaming |
| Memory | Gradient storage required | Only activations needed |
| Precision | FP32 or FP16 | INT8, INT4 possible |
| Accelerator | A100, H100 (expensive) | T4, L4, RTX (cheaper) |
| Cost pattern | One-time large cost | Ongoing small cost |
1.2 Latency vs Throughput
The two most important metrics in inference optimization:
Latency
- Response time for a single request
- Critical for real-time applications (chatbots, autocomplete)
- Measured by P50, P95, P99 percentiles
- Target: below 100ms (general), below 50ms (real-time)
Throughput
- Number of requests handled per unit time (QPS, Tokens/second)
- Critical for batch processing, offline inference
- Trade-off relationship with latency
```python
# Latency vs throughput measurement example
import time
import numpy as np
from typing import List

def measure_latency(model_fn, inputs: List, n_runs: int = 100):
    """Measure inference latency"""
    latencies = []
    # Warm-up
    for _ in range(10):
        _ = model_fn(inputs[0])
    # Measure
    for inp in inputs[:n_runs]:
        start = time.perf_counter()
        _ = model_fn(inp)
        end = time.perf_counter()
        latencies.append((end - start) * 1000)  # ms
    latencies = np.array(latencies)
    return {
        "p50_ms": np.percentile(latencies, 50),
        "p95_ms": np.percentile(latencies, 95),
        "p99_ms": np.percentile(latencies, 99),
        "mean_ms": np.mean(latencies),
        "std_ms": np.std(latencies),
    }

def measure_throughput(model_fn, inputs: List, duration_sec: int = 60):
    """Measure inference throughput"""
    count = 0
    start = time.time()
    while time.time() - start < duration_sec:
        _ = model_fn(inputs[count % len(inputs)])
        count += 1
    elapsed = time.time() - start
    return {
        "qps": count / elapsed,
        "total_requests": count,
        "duration_sec": elapsed,
    }
```
1.3 Hardware Selection Guide
GPU (NVIDIA):
- A100 80GB: Best performance, suited to both training and inference, expensive
- H100 80GB: Current top of the line, strongest for LLM inference
- A10G 24GB: Widely used on AWS, mid-range performance
- T4 16GB: Cost-efficient, inference-only, cheap on AWS/GCP
- L4 24GB: T4 successor, inference-optimized
- RTX 4090 24GB: Small-scale deployments, local LLMs

CPU:
- Pros: Cheap, universally available, large memory
- Cons: Limited parallelism, slow matrix ops
- Use: INT8-quantized models, small models, edge

TPU (Google):
- Cloud TPU v4: Large-scale LLM training/serving
- TPU v5e: Inference-optimized version

NPU (Edge):
- Apple Neural Engine: Core ML models on iPhone/Mac
- Qualcomm AI Engine: On-device Android inference
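A quick back-of-the-envelope memory estimate helps match a model to a GPU from the list above. The helper below is a rough sketch (the 1.2x overhead factor for KV cache, activations, and CUDA context is an assumption, not a measured constant):

```python
def estimate_serving_memory_gb(n_params_billion: float,
                               bytes_per_param: float,
                               overhead: float = 1.2) -> float:
    """Rough GPU memory needed to serve a model: weight bytes times an
    overhead factor for KV cache, activations, and CUDA context."""
    weights_gb = n_params_billion * bytes_per_param
    return weights_gb * overhead

# A 7B model in FP16 holds ~14 GB of weights, so a 16 GB T4 is tight
# while a 24 GB L4/A10G is comfortable; INT4 fits far smaller cards.
for name, bytes_pp in [("FP16", 2), ("INT8", 1), ("INT4", 0.5)]:
    print(f"7B @ {name}: ~{estimate_serving_memory_gb(7, bytes_pp):.1f} GB")
```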
2. Model Optimization Techniques
2.1 Quantization
Quantization reduces memory and computation by representing model weights and activations in lower bit precision.
| Precision | Memory | Speed | Accuracy loss |
|---|---|---|---|
| FP32 (32-bit) | 100% | baseline | none |
| FP16 (16-bit) | 50% | 1.5-2x | negligible |
| BF16 (16-bit) | 50% | 1.5-2x | negligible |
| INT8 (8-bit) | 25% | 2-4x | minor |
| INT4 (4-bit) | 12.5% | 4-8x | moderate |
Post-Training Quantization (PTQ)
```python
# PyTorch PTQ example
import torch
from torch.quantization import quantize_dynamic, prepare, convert

model = MyModel()  # your trained FP32 model
model.load_state_dict(torch.load("model.pth"))
model.eval()

# Dynamic quantization (weights only, INT8)
quantized_model = quantize_dynamic(
    model,
    {torch.nn.Linear},
    dtype=torch.qint8
)

def get_model_size_mb(model):
    import io
    buffer = io.BytesIO()
    torch.save(model.state_dict(), buffer)
    return buffer.tell() / (1024 * 1024)

print(f"Original model: {get_model_size_mb(model):.2f} MB")
print(f"Quantized model: {get_model_size_mb(quantized_model):.2f} MB")

# Static quantization (weights + activations, INT8)
from torch.quantization import get_default_qconfig

model.qconfig = get_default_qconfig('x86')
prepared_model = prepare(model)

# Calibrate with representative data (calibration_loader: your DataLoader)
with torch.no_grad():
    for batch in calibration_loader:
        prepared_model(batch)

static_quantized_model = convert(prepared_model)
```
GPTQ - LLM Quantization
```python
# INT4 LLM quantization using GPTQ
from transformers import AutoModelForCausalLM, AutoTokenizer, GPTQConfig

model_id = "meta-llama/Llama-2-7b-hf"
tokenizer = AutoTokenizer.from_pretrained(model_id)

gptq_config = GPTQConfig(
    bits=4,
    dataset="wikitext2",
    tokenizer=tokenizer,
    group_size=128,
    damp_percent=0.01,
)

quantized_model = AutoModelForCausalLM.from_pretrained(
    model_id,
    quantization_config=gptq_config,
    device_map="auto"
)

quantized_model.save_pretrained("llama2-7b-gptq-int4")
tokenizer.save_pretrained("llama2-7b-gptq-int4")
```
AWQ - Activation-Aware Quantization
```python
# AWQ quantization (higher-quality INT4)
from awq import AutoAWQForCausalLM
from transformers import AutoTokenizer

model_path = "meta-llama/Llama-2-7b-hf"
quant_path = "llama2-7b-awq"

quant_config = {
    "zero_point": True,
    "q_group_size": 128,
    "w_bit": 4,
    "version": "GEMM"
}

model = AutoAWQForCausalLM.from_pretrained(model_path, device_map="cuda")
tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)

model.quantize(tokenizer, quant_config=quant_config)
model.save_quantized(quant_path)
```
2.2 Pruning
```python
import torch
import torch.nn.utils.prune as prune

model = MyConvNet()  # your trained model

# Unstructured pruning (L1 norm-based, 50% sparsity)
prune.l1_unstructured(
    model.conv1,
    name='weight',
    amount=0.5
)

# Structured pruning (channel-level - actually accelerates inference)
prune.ln_structured(
    model.conv1,
    name='weight',
    amount=0.3,
    n=2,
    dim=0  # output channel dimension
)

# Global pruning (remove bottom 20% of weights across the entire model)
parameters_to_prune = (
    (model.conv1, 'weight'),
    (model.conv2, 'weight'),
    (model.fc1, 'weight'),
)
prune.global_unstructured(
    parameters_to_prune,
    pruning_method=prune.L1Unstructured,
    amount=0.2,
)

# Make pruning permanent
prune.remove(model.conv1, 'weight')

def print_sparsity(model):
    for name, module in model.named_modules():
        if isinstance(module, torch.nn.Conv2d):
            sparsity = 100. * float(torch.sum(module.weight == 0)) / float(module.weight.nelement())
            print(f"{name}: {sparsity:.1f}% sparsity")
```
2.3 Knowledge Distillation
```python
import torch
import torch.nn as nn
import torch.nn.functional as F

class DistillationTrainer:
    """Teacher-Student Knowledge Distillation"""
    def __init__(self, teacher, student, temperature=4.0, alpha=0.7):
        self.teacher = teacher
        self.student = student
        self.temperature = temperature
        self.alpha = alpha  # Soft label weight
        self.teacher.eval()  # Freeze teacher

    def distillation_loss(self, student_logits, teacher_logits, labels):
        """Distillation loss = soft label loss + hard label loss"""
        T = self.temperature
        # Soft label loss (uses teacher's knowledge)
        soft_targets = F.softmax(teacher_logits / T, dim=1)
        soft_pred = F.log_softmax(student_logits / T, dim=1)
        soft_loss = F.kl_div(soft_pred, soft_targets, reduction='batchmean') * (T ** 2)
        # Hard label loss (ground truth)
        hard_loss = F.cross_entropy(student_logits, labels)
        total_loss = self.alpha * soft_loss + (1 - self.alpha) * hard_loss
        return total_loss

    def train_step(self, inputs, labels, optimizer):
        optimizer.zero_grad()
        with torch.no_grad():
            teacher_logits = self.teacher(inputs)
        student_logits = self.student(inputs)
        loss = self.distillation_loss(student_logits, teacher_logits, labels)
        loss.backward()
        optimizer.step()
        return loss.item()
```
2.4 TorchScript and ONNX Conversion
```python
import torch
import torch.onnx

model = MyModel()
model.eval()
example_input = torch.randn(1, 3, 224, 224)

# TorchScript tracing
traced_model = torch.jit.trace(model, example_input)
traced_model.save("model_traced.pt")

# TorchScript scripting (supports dynamic control flow)
scripted_model = torch.jit.script(model)
scripted_model.save("model_scripted.pt")

# ONNX export
torch.onnx.export(
    model,
    example_input,
    "model.onnx",
    opset_version=17,
    input_names=["input"],
    output_names=["output"],
    dynamic_axes={
        "input": {0: "batch_size"},
        "output": {0: "batch_size"},
    },
    verbose=False
)

# Validate ONNX model
import onnx

onnx_model = onnx.load("model.onnx")
onnx.checker.check_model(onnx_model)

# ONNX Runtime inference
import onnxruntime as ort
import numpy as np

session = ort.InferenceSession(
    "model.onnx",
    providers=['CUDAExecutionProvider', 'CPUExecutionProvider']
)
input_name = session.get_inputs()[0].name
output_name = session.get_outputs()[0].name
input_data = np.random.randn(1, 3, 224, 224).astype(np.float32)
outputs = session.run([output_name], {input_name: input_data})
print(f"Output shape: {outputs[0].shape}")
```
3. TensorRT
3.1 Introduction to TensorRT
TensorRT is NVIDIA's deep learning inference optimization SDK. It automatically performs these optimizations:
- Layer Fusion: Merges Conv+BN+ReLU into a single operation
- Kernel Auto-Selection: Chooses optimized CUDA kernels for the GPU architecture
- FP16/INT8 Calibration: Minimizes accuracy loss during precision reduction
- Memory Reuse: Optimal tensor memory allocation
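Layer fusion is the easiest of these optimizations to see concretely: because a BatchNorm in eval mode is an affine transform, it can be folded into the preceding convolution's weights and bias, removing one op entirely. A minimal sketch of that folding arithmetic (the `fuse_conv_bn` helper is illustrative, not TensorRT's actual implementation):

```python
import torch
import torch.nn as nn

def fuse_conv_bn(conv: nn.Conv2d, bn: nn.BatchNorm2d) -> nn.Conv2d:
    """Fold an eval-mode BatchNorm into the preceding Conv:
    W' = W * gamma/std, b' = (b - mean) * gamma/std + beta."""
    fused = nn.Conv2d(conv.in_channels, conv.out_channels, conv.kernel_size,
                      conv.stride, conv.padding, bias=True)
    std = torch.sqrt(bn.running_var + bn.eps)
    scale = bn.weight / std
    fused.weight.data = conv.weight.data * scale.reshape(-1, 1, 1, 1)
    b = conv.bias.data if conv.bias is not None else torch.zeros(conv.out_channels)
    fused.bias.data = (b - bn.running_mean) * scale + bn.bias
    return fused
```

The fused module produces the same outputs as `bn(conv(x))` in eval mode while doing one pass over the data instead of two; TensorRT applies the same idea across Conv+BN+ReLU and many other patterns automatically.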
3.2 TensorRT Conversion via Python API
```python
import tensorrt as trt
import numpy as np
import pycuda.driver as cuda
import pycuda.autoinit

TRT_LOGGER = trt.Logger(trt.Logger.WARNING)

def build_engine_from_onnx(onnx_path: str, precision: str = "fp16") -> trt.ICudaEngine:
    """Convert ONNX model to TensorRT engine"""
    with trt.Builder(TRT_LOGGER) as builder, \
         builder.create_network(
             1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH)
         ) as network, \
         trt.OnnxParser(network, TRT_LOGGER) as parser:

        config = builder.create_builder_config()
        config.set_memory_pool_limit(
            trt.MemoryPoolType.WORKSPACE,
            4 * 1024 * 1024 * 1024  # 4GB
        )

        if precision == "fp16":
            config.set_flag(trt.BuilderFlag.FP16)
        elif precision == "int8":
            config.set_flag(trt.BuilderFlag.INT8)
            config.int8_calibrator = MyCalibrator()  # user-defined calibrator

        with open(onnx_path, 'rb') as model:
            if not parser.parse(model.read()):
                for error in range(parser.num_errors):
                    print(f"ONNX parse error: {parser.get_error(error)}")
                raise ValueError("ONNX parsing failed")

        # Dynamic input shape (variable batch size)
        profile = builder.create_optimization_profile()
        profile.set_shape(
            "input",
            min=(1, 3, 224, 224),
            opt=(8, 3, 224, 224),
            max=(32, 3, 224, 224)
        )
        config.add_optimization_profile(profile)

        serialized_engine = builder.build_serialized_network(network, config)
        runtime = trt.Runtime(TRT_LOGGER)
        return runtime.deserialize_cuda_engine(serialized_engine)

class TRTInferenceEngine:
    """TensorRT inference engine wrapper"""
    def __init__(self, engine_path: str):
        runtime = trt.Runtime(TRT_LOGGER)
        with open(engine_path, 'rb') as f:
            self.engine = runtime.deserialize_cuda_engine(f.read())
        self.context = self.engine.create_execution_context()
        self.inputs = []
        self.outputs = []
        self.bindings = []
        for binding in self.engine:
            size = trt.volume(self.engine.get_binding_shape(binding))
            dtype = trt.nptype(self.engine.get_binding_dtype(binding))
            host_mem = cuda.pagelocked_empty(size, dtype)
            device_mem = cuda.mem_alloc(host_mem.nbytes)
            self.bindings.append(int(device_mem))
            if self.engine.binding_is_input(binding):
                self.inputs.append({'host': host_mem, 'device': device_mem})
            else:
                self.outputs.append({'host': host_mem, 'device': device_mem})

    def infer(self, input_data: np.ndarray) -> np.ndarray:
        np.copyto(self.inputs[0]['host'], input_data.ravel())
        cuda.memcpy_htod(self.inputs[0]['device'], self.inputs[0]['host'])
        self.context.execute_v2(bindings=self.bindings)
        cuda.memcpy_dtoh(self.outputs[0]['host'], self.outputs[0]['device'])
        return self.outputs[0]['host'].copy()
```
3.3 Torch-TensorRT
```python
import torch
import torch_tensorrt

model = MyResNet50()
model.eval()
model.cuda()

traced_model = torch.jit.trace(model, torch.randn(1, 3, 224, 224).cuda())

trt_model = torch_tensorrt.compile(
    traced_model,
    inputs=[
        torch_tensorrt.Input(
            min_shape=[1, 3, 224, 224],
            opt_shape=[8, 3, 224, 224],
            max_shape=[32, 3, 224, 224],
            dtype=torch.float32
        )
    ],
    enabled_precisions={torch.float16},
    workspace_size=4 * 1024 * 1024 * 1024,
)

torch.jit.save(trt_model, "model_trt.ts")
loaded_model = torch.jit.load("model_trt.ts")

# Speed comparison (synchronize around timing so GPU work is fully counted)
import time

input_tensor = torch.randn(8, 3, 224, 224).cuda()

with torch.no_grad():
    torch.cuda.synchronize()
    start = time.perf_counter()
    for _ in range(100):
        _ = model(input_tensor)
    torch.cuda.synchronize()
    pytorch_time = (time.perf_counter() - start) / 100 * 1000

    start = time.perf_counter()
    for _ in range(100):
        _ = loaded_model(input_tensor)
    torch.cuda.synchronize()
    trt_time = (time.perf_counter() - start) / 100 * 1000

print(f"PyTorch: {pytorch_time:.2f}ms, TensorRT: {trt_time:.2f}ms")
print(f"Speedup: {pytorch_time / trt_time:.2f}x")
```
4. NVIDIA Triton Inference Server
4.1 Introduction to Triton
NVIDIA Triton Inference Server is an open-source inference server that serves models from various ML frameworks in production.
Key features:
- Multi-framework support (TensorRT, ONNX, PyTorch, TensorFlow, Python)
- Dynamic Batching
- Concurrent Model Execution
- Efficient GPU/CPU resource utilization
- Model ensemble pipelines
- gRPC and HTTP REST API
4.2 Model Repository Structure
```text
model_repository/
├── resnet50/
│   ├── config.pbtxt
│   ├── 1/
│   │   └── model.plan      # TensorRT engine
│   └── 2/
│       └── model.plan
├── bert_onnx/
│   ├── config.pbtxt
│   └── 1/
│       └── model.onnx
└── custom_model/
    ├── config.pbtxt
    └── 1/
        └── model.py
```
4.3 Configuration File (config.pbtxt)
```protobuf
# model_repository/resnet50/config.pbtxt
name: "resnet50"
platform: "tensorrt_plan"
max_batch_size: 32
input [
  {
    name: "input"
    data_type: TYPE_FP32
    dims: [3, 224, 224]
  }
]
output [
  {
    name: "output"
    data_type: TYPE_FP32
    dims: [1000]
  }
]
dynamic_batching {
  preferred_batch_size: [4, 8, 16, 32]
  max_queue_delay_microseconds: 5000  # wait up to 5ms to form a batch
}
instance_group [
  {
    count: 2
    kind: KIND_GPU
    gpus: [0]
  }
]
```
```protobuf
# model_repository/bert_onnx/config.pbtxt
name: "bert_onnx"
platform: "onnxruntime_onnx"
max_batch_size: 8
input [
  {
    name: "input_ids"
    data_type: TYPE_INT64
    dims: [128]
  },
  {
    name: "attention_mask"
    data_type: TYPE_INT64
    dims: [128]
  }
]
output [
  {
    name: "last_hidden_state"
    data_type: TYPE_FP32
    dims: [128, 768]
  }
]
dynamic_batching {
  max_queue_delay_microseconds: 10000
}
optimization {
  execution_accelerators {
    gpu_execution_accelerator: [
      {
        name: "tensorrt"
        parameters { key: "precision_mode" value: "FP16" }
        parameters { key: "max_workspace_size_bytes" value: "1073741824" }
      }
    ]
  }
}
```
4.4 Python Backend Model
```python
# model_repository/custom_model/1/model.py
import numpy as np
import triton_python_backend_utils as pb_utils
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification

class TritonPythonModel:
    def initialize(self, args):
        """Called once at server startup"""
        self.device = 'cuda' if args['model_instance_kind'] == 'GPU' else 'cpu'
        model_name = "distilbert-base-uncased-finetuned-sst-2-english"
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForSequenceClassification.from_pretrained(model_name)
        self.model.to(self.device)
        self.model.eval()

    def execute(self, requests):
        """Execute batch inference"""
        responses = []
        for request in requests:
            input_text = pb_utils.get_input_tensor_by_name(request, "TEXT")
            texts = input_text.as_numpy().tolist()
            texts = [t[0].decode('utf-8') for t in texts]
            inputs = self.tokenizer(
                texts,
                return_tensors="pt",
                padding=True,
                truncation=True,
                max_length=128
            ).to(self.device)
            with torch.no_grad():
                outputs = self.model(**inputs)
            probs = torch.softmax(outputs.logits, dim=1).cpu().numpy()
            output_tensor = pb_utils.Tensor("PROBABILITIES", probs.astype(np.float32))
            response = pb_utils.InferenceResponse(output_tensors=[output_tensor])
            responses.append(response)
        return responses

    def finalize(self):
        del self.model
        torch.cuda.empty_cache()
```
4.5 Deploying Triton with Docker
```bash
# Start Triton server
docker run --gpus all \
  -p 8000:8000 \
  -p 8001:8001 \
  -p 8002:8002 \
  -v /path/to/model_repository:/models \
  --shm-size=1g \
  nvcr.io/nvidia/tritonserver:24.02-py3 \
  tritonserver \
  --model-repository=/models \
  --log-verbose=1 \
  --strict-model-config=false

# Check readiness
curl http://localhost:8000/v2/health/ready

# Query model info
curl http://localhost:8000/v2/models/resnet50
```

```python
# Python client for Triton
import tritonclient.http as httpclient
import numpy as np

client = httpclient.InferenceServerClient(url="localhost:8000")

input_data = np.random.randn(1, 3, 224, 224).astype(np.float32)
inputs = [httpclient.InferInput("input", input_data.shape, "FP32")]
inputs[0].set_data_from_numpy(input_data)
outputs = [httpclient.InferRequestedOutput("output")]

result = client.infer(
    model_name="resnet50",
    model_version="1",
    inputs=inputs,
    outputs=outputs
)

output = result.as_numpy("output")
print(f"Output shape: {output.shape}")
print(f"Top-5 predictions: {np.argsort(output[0])[-5:][::-1]}")
```
5. vLLM - High-Speed LLM Serving
5.1 Introduction to vLLM
vLLM is a high-performance serving library for LLM inference. It achieves up to 24x higher throughput compared to standard HuggingFace Transformers.
Core technologies:
- PagedAttention: Manages KV cache in pages to minimize memory waste
- Continuous Batching: Dynamically processes requests instead of fixed batches
- CUDA Kernel Optimization: Optimized attention kernels including FlashAttention
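The arithmetic behind PagedAttention is worth seeing once. Each generated token must keep K and V vectors for every layer, and a naive server reserves the full `max_model_len` worth of that cache per request; paged allocation wastes at most one partially filled block instead. A small sketch, assuming a Llama-2-7B-like shape (32 layers, 32 KV heads, head dimension 128, FP16):

```python
def kv_cache_bytes_per_token(n_layers: int, n_kv_heads: int, head_dim: int,
                             bytes_per_elem: int = 2) -> int:
    """KV cache cost of one token: a K and a V vector in every layer."""
    return 2 * n_layers * n_kv_heads * head_dim * bytes_per_elem

per_token = kv_cache_bytes_per_token(32, 32, 128)
print(f"KV cache per token: {per_token / 1024:.0f} KB")  # 512 KB

# Naive serving reserves max_model_len slots per request up front;
# PagedAttention hands out fixed-size blocks on demand, so per-sequence
# waste is bounded by one partial block.
max_len, generated, block_size = 4096, 600, 16
naive_waste_mb = (max_len - generated) * per_token / 1024**2
paged_waste_mb = ((block_size - generated % block_size) % block_size) * per_token / 1024**2
print(f"Reserved-but-unused: naive {naive_waste_mb:.0f} MB vs paged {paged_waste_mb:.0f} MB")
```

At half a megabyte per token, the reserved-but-unused tail of a short completion costs gigabytes per batch under naive allocation, which is exactly the memory vLLM reclaims for larger batches.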
5.2 vLLM Installation and Basic Usage
```bash
# Install vLLM (CUDA 12.1)
pip install vllm

# Specific CUDA version
pip install vllm --extra-index-url https://download.pytorch.org/whl/cu121
```

```python
from vllm import LLM, SamplingParams

# Load model
llm = LLM(
    model="meta-llama/Llama-3.1-8B-Instruct",
    tensor_parallel_size=1,
    gpu_memory_utilization=0.9,
    max_model_len=4096,
)

# Sampling parameters
sampling_params = SamplingParams(
    temperature=0.8,
    top_p=0.95,
    max_tokens=512,
    stop=["</s>", "[INST]"],
)

# Batch inference (multiple prompts simultaneously)
prompts = [
    "Implement a fibonacci sequence in Python",
    "Explain the difference between machine learning and deep learning",
    "Describe the types of SQL JOINs",
]

outputs = llm.generate(prompts, sampling_params)

for output in outputs:
    print(f"Prompt: {output.prompt[:50]}...")
    print(f"Generated: {output.outputs[0].text}")
    print(f"Tokens: {len(output.outputs[0].token_ids)}")
    print("---")
```
5.3 OpenAI-Compatible API Server
```bash
# Start vLLM OpenAI-compatible server
python -m vllm.entrypoints.openai.api_server \
  --model meta-llama/Llama-3.1-8B-Instruct \
  --host 0.0.0.0 \
  --port 8000 \
  --tensor-parallel-size 1 \
  --gpu-memory-utilization 0.9 \
  --max-model-len 4096 \
  --served-model-name llama3-8b

# Serve quantized model
python -m vllm.entrypoints.openai.api_server \
  --model TheBloke/Llama-2-7B-Chat-GPTQ \
  --quantization gptq \
  --dtype float16 \
  --port 8000
```

```python
# Use vLLM with OpenAI client
from openai import OpenAI

client = OpenAI(
    base_url="http://localhost:8000/v1",
    api_key="not-needed"
)

# Chat completion
response = client.chat.completions.create(
    model="llama3-8b",
    messages=[
        {"role": "system", "content": "You are a helpful AI assistant."},
        {"role": "user", "content": "Explain asynchronous programming in Python"},
    ],
    temperature=0.7,
    max_tokens=1000,
    stream=False,
)
print(response.choices[0].message.content)

# Streaming response
stream = client.chat.completions.create(
    model="llama3-8b",
    messages=[{"role": "user", "content": "What is the capital of France?"}],
    stream=True,
)
for chunk in stream:
    if chunk.choices[0].delta.content is not None:
        print(chunk.choices[0].delta.content, end="", flush=True)
```
5.4 vLLM Quantization Serving
```python
from vllm import LLM, SamplingParams

# GPTQ INT4 quantized model
llm_gptq = LLM(
    model="TheBloke/Llama-2-7B-GPTQ",
    quantization="gptq",
    dtype="float16",
    gpu_memory_utilization=0.85,
)

# AWQ INT4 quantized model
llm_awq = LLM(
    model="TheBloke/Llama-2-7B-AWQ",
    quantization="awq",
    dtype="float16",
)

# FP8 quantization (best performance on H100)
llm_fp8 = LLM(
    model="meta-llama/Llama-3.1-8B-Instruct",
    quantization="fp8",
    dtype="bfloat16",
)

def benchmark_throughput(llm, prompts, n_iterations=5):
    import time
    sampling_params = SamplingParams(max_tokens=200, temperature=0.8)
    llm.generate(prompts[:2], sampling_params)  # Warm-up

    total_tokens = 0
    start = time.time()
    for _ in range(n_iterations):
        outputs = llm.generate(prompts, sampling_params)
        total_tokens += sum(len(o.outputs[0].token_ids) for o in outputs)
    elapsed = time.time() - start

    return {
        "tokens_per_second": total_tokens / elapsed,
        "latency_per_batch_ms": elapsed / n_iterations * 1000,
    }
```
5.5 LoRA Adapter Serving
```python
from vllm import LLM, SamplingParams
from vllm.lora.request import LoRARequest

llm = LLM(
    model="meta-llama/Llama-3.1-8B-Instruct",
    enable_lora=True,
    max_lora_rank=64,
    max_loras=4,
)

sampling_params = SamplingParams(temperature=0.0, max_tokens=200)

outputs = llm.generate(
    "Explain the history of the Roman Empire",
    sampling_params=sampling_params,
    lora_request=LoRARequest(
        "history-lora",                  # adapter name
        1,                               # adapter ID
        "/path/to/history-lora-adapter"  # adapter path
    )
)
```
6. Ollama - Local LLM Serving
6.1 Introduction to Ollama
Ollama makes it easy to run LLMs locally. You can run various LLMs with a single terminal command, without complex configuration.
6.2 Installation and Basic Usage
```bash
# macOS/Linux installation
curl -fsSL https://ollama.com/install.sh | sh

# Download and run models
ollama run llama3.1

# Other popular models
ollama run mistral
ollama run codellama
ollama run phi3
ollama run gemma2

# Start background service
ollama serve

# List installed models
ollama list

# Remove model
ollama rm llama3.1

# Show model info
ollama show llama3.1
```
6.3 REST API Usage
```python
import requests
import json

def ollama_generate(prompt: str, model: str = "llama3.1") -> str:
    response = requests.post(
        "http://localhost:11434/api/generate",
        json={
            "model": model,
            "prompt": prompt,
            "stream": False,
        }
    )
    return response.json()["response"]

def ollama_stream(prompt: str, model: str = "llama3.1"):
    response = requests.post(
        "http://localhost:11434/api/generate",
        json={
            "model": model,
            "prompt": prompt,
            "stream": True,
        },
        stream=True
    )
    for line in response.iter_lines():
        if line:
            data = json.loads(line)
            yield data.get("response", "")
            if data.get("done", False):
                break

def ollama_chat(messages: list, model: str = "llama3.1") -> str:
    response = requests.post(
        "http://localhost:11434/api/chat",
        json={
            "model": model,
            "messages": messages,
            "stream": False,
        }
    )
    return response.json()["message"]["content"]

# Usage examples
result = ollama_generate("Explain decorators in Python")
print(result)

for token in ollama_stream("Explain machine learning basics"):
    print(token, end="", flush=True)

messages = [
    {"role": "system", "content": "You are a Python expert."},
    {"role": "user", "content": "What is the difference between generators and iterators?"},
]
print(ollama_chat(messages))
```
6.4 Custom Modelfile
```
# Modelfile - Custom system prompt and settings
FROM llama3.1

SYSTEM """
You are a senior software engineer who provides clear, concise answers.
Always include code examples and be honest when you don't know something.
"""

PARAMETER temperature 0.7
PARAMETER top_p 0.9
PARAMETER top_k 40
PARAMETER num_ctx 4096
PARAMETER num_predict 512
PARAMETER stop "<|im_end|>"
```

```bash
# Create custom model
ollama create my-coding-assistant -f Modelfile

# Run it
ollama run my-coding-assistant
```
6.5 Python Client (ollama package)
```python
import ollama

# Synchronous generation
response = ollama.generate(
    model='llama3.1',
    prompt='Explain how to build a REST API with FastAPI',
    options={
        'temperature': 0.7,
        'num_ctx': 2048,
    }
)
print(response['response'])

# Chat with conversation history
messages = []

def chat(user_message: str, model: str = "llama3.1") -> str:
    messages.append({'role': 'user', 'content': user_message})
    response = ollama.chat(model=model, messages=messages)
    assistant_message = response['message']['content']
    messages.append({'role': 'assistant', 'content': assistant_message})
    return assistant_message

print(chat("I want to learn Python. Where should I start?"))
print(chat("What are the best learning resources?"))
print(chat("How long will it take to learn?"))

# Generate embeddings
embeddings = ollama.embeddings(
    model='nomic-embed-text',
    prompt='This is a sample text for embedding'
)
print(f"Embedding dimension: {len(embeddings['embedding'])}")
```
7. Text Generation Inference (TGI)
7.1 HuggingFace TGI Introduction
HuggingFace TGI (Text Generation Inference) is a high-performance toolkit for serving LLMs in production.
Key features:
- High throughput via Continuous Batching
- Multi-GPU support with Tensor Parallelism
- Flash Attention 2 integration
- Distributed tracing with OpenTelemetry
- Safetensors support
7.2 Deploying TGI with Docker
```bash
# Single GPU
docker run --gpus all \
  -p 8080:80 \
  -v /path/to/models:/data \
  --shm-size 1g \
  ghcr.io/huggingface/text-generation-inference:2.4 \
  --model-id meta-llama/Llama-3.1-8B-Instruct \
  --num-shard 1 \
  --max-input-length 2048 \
  --max-total-tokens 4096

# Multi-GPU (Tensor Parallelism)
docker run --gpus all \
  -p 8080:80 \
  -v /path/to/models:/data \
  --shm-size 2g \
  ghcr.io/huggingface/text-generation-inference:2.4 \
  --model-id meta-llama/Llama-3.1-70B-Instruct \
  --num-shard 4 \
  --quantize bitsandbytes
```

```python
# TGI client
from huggingface_hub import InferenceClient

client = InferenceClient(model="http://localhost:8080")

response = client.text_generation(
    "Explain the core concepts of async programming in Python",
    max_new_tokens=500,
    temperature=0.7,
    repetition_penalty=1.1,
    return_full_text=False,
)
print(response)

# Streaming
for token in client.text_generation(
    "Compare FastAPI vs Flask",
    stream=True,
    max_new_tokens=300,
):
    print(token, end="", flush=True)
```
8. Inference Performance Benchmarking
8.1 Latency and Throughput Measurement
```python
import asyncio
import aiohttp
import time
import numpy as np
from dataclasses import dataclass
from typing import List

@dataclass
class BenchmarkResult:
    total_requests: int
    successful_requests: int
    failed_requests: int
    total_time_sec: float
    mean_latency_ms: float
    p50_latency_ms: float
    p95_latency_ms: float
    p99_latency_ms: float
    requests_per_second: float

async def send_request(session: aiohttp.ClientSession, url: str, payload: dict) -> float:
    start = time.perf_counter()
    try:
        async with session.post(url, json=payload) as response:
            await response.json()
        return (time.perf_counter() - start) * 1000
    except Exception as e:
        print(f"Request failed: {e}")
        return -1.0

async def run_benchmark(
    url: str,
    payload: dict,
    n_requests: int = 1000,
    concurrency: int = 10,
) -> BenchmarkResult:
    semaphore = asyncio.Semaphore(concurrency)

    async def bounded_request(session):
        async with semaphore:
            return await send_request(session, url, payload)

    start_time = time.time()
    async with aiohttp.ClientSession() as session:
        tasks = [bounded_request(session) for _ in range(n_requests)]
        results = await asyncio.gather(*tasks)
    total_time = time.time() - start_time

    successful = [r for r in results if r > 0]
    failed = len(results) - len(successful)
    latencies = np.array(successful)

    return BenchmarkResult(
        total_requests=n_requests,
        successful_requests=len(successful),
        failed_requests=failed,
        total_time_sec=total_time,
        mean_latency_ms=np.mean(latencies),
        p50_latency_ms=np.percentile(latencies, 50),
        p95_latency_ms=np.percentile(latencies, 95),
        p99_latency_ms=np.percentile(latencies, 99),
        requests_per_second=len(successful) / total_time,
    )

# LLM-specific benchmark (token throughput)
def benchmark_llm_throughput(client, prompts: List[str], max_tokens: int = 200) -> dict:
    total_input_tokens = 0
    total_output_tokens = 0
    latencies = []

    start_time = time.time()
    for prompt in prompts:
        req_start = time.perf_counter()
        response = client.chat.completions.create(
            model="llama3-8b",
            messages=[{"role": "user", "content": prompt}],
            max_tokens=max_tokens,
        )
        req_latency = (time.perf_counter() - req_start) * 1000
        latencies.append(req_latency)
        total_input_tokens += response.usage.prompt_tokens
        total_output_tokens += response.usage.completion_tokens
    total_time = time.time() - start_time

    return {
        "total_requests": len(prompts),
        "total_time_sec": total_time,
        "input_tokens_per_sec": total_input_tokens / total_time,
        "output_tokens_per_sec": total_output_tokens / total_time,
        "mean_latency_ms": np.mean(latencies),
        "p99_latency_ms": np.percentile(latencies, 99),
        "requests_per_second": len(prompts) / total_time,
    }
```
8.2 GPU Memory Profiling
```python
import torch
from torch.profiler import profile, record_function, ProfilerActivity

def profile_model_inference(model, input_data):
    torch.cuda.reset_peak_memory_stats()
    torch.cuda.synchronize()

    with torch.no_grad():
        with profile(
            activities=[ProfilerActivity.CPU, ProfilerActivity.CUDA],
            record_shapes=True,
            profile_memory=True,
            with_stack=True,
        ) as prof:
            with record_function("model_inference"):
                output = model(input_data)

    torch.cuda.synchronize()
    max_memory = torch.cuda.max_memory_allocated() / 1024**3
    current_memory = torch.cuda.memory_allocated() / 1024**3
    print(f"Peak GPU Memory: {max_memory:.2f} GB")
    print(f"Current GPU Memory: {current_memory:.2f} GB")

    print("\nTop 10 CUDA kernels by CUDA time:")
    print(prof.key_averages().table(
        sort_by="cuda_time_total",
        row_limit=10
    ))

    prof.export_chrome_trace("trace.json")
    return output
```
9. Cost Optimization Strategies
9.1 Autoscaling (Kubernetes + KEDA)
```yaml
# keda-scaled-object.yaml - HTTP request-based autoscaling
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: model-server-scaler
  namespace: ml-platform
spec:
  scaleTargetRef:
    name: model-server
  minReplicaCount: 1
  maxReplicaCount: 20
  cooldownPeriod: 300
  triggers:
    - type: prometheus
      metadata:
        serverAddress: http://prometheus:9090
        metricName: http_requests_per_second
        threshold: '100'
        query: |
          sum(rate(http_requests_total{service="model-server"}[1m]))
```
9.2 Spot Instance Utilization
```python
# AWS Batch Spot instance for batch inference
import boto3
import time

def submit_spot_inference_job(
    job_queue: str,
    job_definition: str,
    input_s3_path: str,
    output_s3_path: str,
):
    client = boto3.client('batch')
    response = client.submit_job(
        jobName=f"inference-{int(time.time())}",
        jobQueue=job_queue,
        jobDefinition=job_definition,
        containerOverrides={
            'environment': [
                {'name': 'INPUT_PATH', 'value': input_s3_path},
                {'name': 'OUTPUT_PATH', 'value': output_s3_path},
            ],
            'resourceRequirements': [
                {'type': 'GPU', 'value': '1'},
                {'type': 'MEMORY', 'value': '16384'},
                {'type': 'VCPU', 'value': '4'},
            ]
        },
        retryStrategy={
            'attempts': 3,
            'evaluateOnExit': [
                {
                    'onReason': 'Host EC2*terminated',
                    'action': 'RETRY'
                }
            ]
        }
    )
    return response['jobId']
```
9.3 Edge Inference Optimization
```python
# TFLite conversion (mobile/edge)
import tensorflow as tf

converter = tf.lite.TFLiteConverter.from_saved_model("my_model")

# INT8 quantization
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.target_spec.supported_ops = [
    tf.lite.OpsSet.TFLITE_BUILTINS_INT8
]
converter.inference_input_type = tf.int8
converter.inference_output_type = tf.int8

def representative_dataset():
    # calibration_data: a representative tf.data.Dataset of inputs
    for data in calibration_data.batch(1).take(100):
        yield [data]

converter.representative_dataset = representative_dataset
tflite_model = converter.convert()

with open("model_int8.tflite", "wb") as f:
    f.write(tflite_model)

print(f"TFLite model size: {len(tflite_model) / 1024:.1f} KB")
```
9.4 Cost Optimization Checklist
1. Apply Quantization
- FP16: ~0% accuracy loss, 50% memory savings
- INT8: Minor accuracy loss, 75% memory savings, 2-4x speedup
- INT4 (GPTQ/AWQ): LLM-specialized, 87.5% memory savings
2. Use Batching
- Group small requests to maximize GPU utilization
- Dynamic Batching to balance latency and throughput
3. Model Compression
- Knowledge distillation to train smaller models (BERT to DistilBERT)
- Pruning to remove unnecessary weights
4. Infrastructure Optimization
- Spot/Preemptible instances for up to 70% cost reduction
- Autoscaling to eliminate idle resources
- Scale on load (RPS-based, not CPU-based)
5. Caching
- Cache responses for identical inputs
- Reuse KV cache (Prefix Caching in vLLM)
6. Edge Deployment
- Client-side inference with GGUF/TFLite
- Reduces server load and costs
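Item 5 of the checklist is easy to prototype: key the cache on a hash of the full request (model, prompt, and sampling options) so only truly identical requests hit. A minimal sketch (the `ResponseCache` class is illustrative; a production version would add TTLs and size limits, and caching is only safe when sampling is deterministic, e.g. temperature 0):

```python
import hashlib
import json

class ResponseCache:
    """Hash-keyed cache for identical (model, prompt, params) requests."""
    def __init__(self):
        self._store = {}
        self.hits = 0
        self.misses = 0

    def _key(self, model: str, prompt: str, params: dict) -> str:
        # Canonical JSON so semantically equal requests hash identically
        payload = json.dumps({"m": model, "p": prompt, "o": params}, sort_keys=True)
        return hashlib.sha256(payload.encode()).hexdigest()

    def get_or_generate(self, model: str, prompt: str, params: dict, generate_fn):
        key = self._key(model, prompt, params)
        if key in self._store:
            self.hits += 1
            return self._store[key]
        self.misses += 1
        result = generate_fn(prompt)  # fall through to the real model
        self._store[key] = result
        return result
```

Wrapping the serving client's call in `get_or_generate` turns repeated identical prompts into dictionary lookups, which directly cuts GPU time for workloads with duplicate traffic.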
Conclusion
Optimizing AI model serving is not simply about writing fast code. It requires a holistic understanding of model architecture, hardware characteristics, serving frameworks, and business requirements.
Remember the optimization approach order:
- Start with profiling: Identify bottlenecks first
- Simple things first: FP16 quantization → batching → TensorRT → INT8
- Measure accuracy: Always measure the accuracy impact of optimizations
- Test in production: Development and production performance can differ
- Balance cost and performance: Always consider cost-effectiveness
vLLM, TensorRT, Triton, and Ollama are each optimized for different use cases. Select the right tool for your requirements, continuously benchmark, and iterate toward improvement.