Split View: 엣지 AI와 온디바이스 ML 완전 가이드: TFLite, ONNX, Core ML, llama.cpp
엣지 AI와 온디바이스 ML 완전 가이드: TFLite, ONNX, Core ML, llama.cpp
목차
- 엣지 AI 개요
- TensorFlow Lite (TFLite)
- ONNX와 ONNX Runtime
- Core ML (Apple)
- NVIDIA Jetson 플랫폼
- Raspberry Pi AI
- MediaPipe
- llama.cpp와 GGUF
- Whisper.cpp
- 웹 브라우저 AI
- AI 모델 최적화 파이프라인
1. 엣지 AI 개요
클라우드 AI vs 엣지 AI
AI 추론을 어디서 실행하느냐는 애플리케이션의 성격과 요구사항에 따라 크게 달라집니다. 전통적인 클라우드 AI 방식은 데이터를 원격 서버로 전송하고, 서버에서 추론한 뒤 결과를 다시 받아오는 구조입니다. 반면 엣지 AI는 데이터가 생성되는 기기(스마트폰, IoT 센서, 카메라 등)에서 직접 추론을 수행합니다.
| 구분 | 클라우드 AI | 엣지 AI |
|---|---|---|
| 연산 위치 | 원격 서버 | 로컬 디바이스 |
| 지연 시간 | 수백 ms ~ 수 초 | 수 ms ~ 수십 ms |
| 프라이버시 | 데이터 외부 전송 | 데이터 로컬 처리 |
| 인터넷 의존성 | 필수 | 불필요 |
| 비용 | API 호출 비용 발생 | 초기 모델 비용만 |
| 모델 크기 제한 | 없음 | 메모리/스토리지 제한 |
엣지 AI의 장점
1. 프라이버시 보호
의료 이미지, 생체 인식 데이터, 개인 음성 등 민감한 정보가 기기를 벗어나지 않습니다. GDPR, HIPAA 등 데이터 규제 준수가 자연스럽게 이루어집니다.
2. 초저지연 응답
자율주행, 산업 자동화, 실시간 번역, AR/VR 등 밀리초 단위 응답이 필요한 애플리케이션에서 결정적 이점을 제공합니다. 네트워크 왕복 지연이 없으므로 일관된 응답 속도를 보장합니다.
3. 비용 절감
클라우드 API 호출 비용이 없으며, 대규모 배포 시 서버 비용이 크게 줄어듭니다. 수백만 기기가 로컬에서 추론하면 중앙 서버 비용이 사실상 0에 가깝습니다.
4. 오프라인 동작
인터넷 연결이 불안정하거나 없는 환경(농촌, 지하, 항공기 등)에서도 AI 기능이 작동합니다.
5. 실시간 데이터 처리
IoT 센서 데이터를 클라우드로 올리기 전에 로컬에서 필터링, 이상 탐지, 분류를 수행하면 전송 데이터량과 스토리지 비용을 크게 줄일 수 있습니다.
엣지 하드웨어
모바일 GPU/NPU
현대 스마트폰에는 AI 전용 하드웨어가 탑재되어 있습니다.
- Apple Neural Engine (ANE): iPhone 8 이후 탑재, M-시리즈 맥에도 포함. A17 Pro는 35 TOPS 성능
- Qualcomm Hexagon DSP: Android 플래그십 폰. Snapdragon 8 Gen 3는 Hexagon NPU 탑재
- Google Tensor: Pixel 폰 전용 칩. 온디바이스 음성 인식, 번역 최적화
- MediaTek APU: 미들레인지 Android 폰 광범위 지원
엣지 컴퓨팅 보드
- NVIDIA Jetson: 자율주행, 로봇공학, 스마트 카메라용. Jetson Orin은 275 TOPS
- Raspberry Pi 5: 4GB/8GB 메모리, 일반 컴퓨터 비전 태스크에 적합
- Google Coral: Edge TPU 탑재, TFLite 전용 가속
- Intel Neural Compute Stick: USB 형태의 추론 가속기
엣지 AI 응용 분야
- 스마트폰: 얼굴 잠금 해제, 사진 분류, 실시간 번역, 음성 어시스턴트
- 스마트홈: 음성 명령 처리, 동작 감지, 에너지 최적화
- 산업 IoT: 불량품 검출, 예측 유지보수, 이상 탐지
- 의료기기: 심전도 분석, 혈당 예측, 피부 질환 진단
- 자율주행: 실시간 객체 탐지, 차선 인식, 장애물 회피
- 농업: 드론 기반 작물 모니터링, 병해충 탐지
2. TensorFlow Lite (TFLite)
TensorFlow Lite는 Google이 개발한 모바일 및 엣지 기기용 경량 ML 프레임워크입니다. TensorFlow 모델을 TFLite 형식(.tflite)으로 변환하여 Android, iOS, 임베디드 Linux, 마이크로컨트롤러에서 실행합니다.
TFLite 변환 (SavedModel → TFLite)
import tensorflow as tf
# 방법 1: SavedModel에서 변환
converter = tf.lite.TFLiteConverter.from_saved_model('saved_model_dir')
tflite_model = converter.convert()
with open('model.tflite', 'wb') as f:
f.write(tflite_model)
# 방법 2: Keras 모델에서 직접 변환
model = tf.keras.applications.MobileNetV2(weights='imagenet')
converter = tf.lite.TFLiteConverter.from_keras_model(model)
tflite_model = converter.convert()
with open('mobilenetv2.tflite', 'wb') as f:
f.write(tflite_model)
# 방법 3: concrete function에서 변환
@tf.function(input_signature=[tf.TensorSpec(shape=[1, 224, 224, 3], dtype=tf.float32)])
def predict(x):
return model(x)
converter = tf.lite.TFLiteConverter.from_concrete_functions(
[predict.get_concrete_function()]
)
tflite_model = converter.convert()
양자화 (Quantization)
양자화는 모델의 가중치와 활성화를 부동소수점에서 저정밀도 정수로 변환하여 모델 크기를 줄이고 추론 속도를 높입니다.
Float16 양자화
converter = tf.lite.TFLiteConverter.from_saved_model('saved_model_dir')
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.target_spec.supported_types = [tf.float16]
tflite_model = converter.convert()
# 모델 크기 약 50% 감소, 정확도 거의 유지
INT8 전체 정수 양자화
import numpy as np
def representative_dataset():
# 실제 데이터 100~1000개 샘플 사용
for _ in range(100):
data = np.random.rand(1, 224, 224, 3).astype(np.float32)
yield [data]
converter = tf.lite.TFLiteConverter.from_saved_model('saved_model_dir')
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.representative_dataset = representative_dataset
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
converter.inference_input_type = tf.int8
converter.inference_output_type = tf.int8
tflite_model = converter.convert()
# 모델 크기 약 75% 감소, 추론 속도 2-4배 향상
동적 범위 양자화
converter = tf.lite.TFLiteConverter.from_saved_model('saved_model_dir')
converter.optimizations = [tf.lite.Optimize.DEFAULT]
# representative_dataset 없이도 가능 - 가중치만 양자화
tflite_model = converter.convert()
TFLite Interpreter
import tensorflow as tf
import numpy as np
from PIL import Image
# 인터프리터 초기화
interpreter = tf.lite.Interpreter(model_path='mobilenetv2.tflite')
interpreter.allocate_tensors()
# 입출력 텐서 정보 확인
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()
print(f"입력 shape: {input_details[0]['shape']}")
print(f"입력 dtype: {input_details[0]['dtype']}")
print(f"출력 shape: {output_details[0]['shape']}")
# 이미지 전처리
img = Image.open('test_image.jpg').resize((224, 224))
input_data = np.expand_dims(np.array(img, dtype=np.float32) / 255.0, axis=0)
# 추론 실행
interpreter.set_tensor(input_details[0]['index'], input_data)
interpreter.invoke()
# 결과 추출
output_data = interpreter.get_tensor(output_details[0]['index'])
predicted_class = np.argmax(output_data[0])
confidence = output_data[0][predicted_class]
print(f"예측 클래스: {predicted_class}, 신뢰도: {confidence:.4f}")
멀티스레드 및 GPU 델리게이트
# 멀티스레드 설정
interpreter = tf.lite.Interpreter(
model_path='model.tflite',
num_threads=4
)
# GPU 델리게이트 (Android/iOS)
try:
from tensorflow.lite.python.interpreter import load_delegate
gpu_delegate = load_delegate('libdelegate.so')
interpreter = tf.lite.Interpreter(
model_path='model.tflite',
experimental_delegates=[gpu_delegate]
)
print("GPU 델리게이트 활성화")
except Exception as e:
print(f"GPU 델리게이트 실패, CPU 사용: {e}")
interpreter = tf.lite.Interpreter(model_path='model.tflite')
interpreter.allocate_tensors()
Android 배포
build.gradle 설정:
dependencies {
implementation 'org.tensorflow:tensorflow-lite:2.13.0'
implementation 'org.tensorflow:tensorflow-lite-gpu:2.13.0'
implementation 'org.tensorflow:tensorflow-lite-support:0.4.4'
}
Kotlin 코드:
import org.tensorflow.lite.Interpreter
import org.tensorflow.lite.support.image.TensorImage
import org.tensorflow.lite.support.tensorbuffer.TensorBuffer
import java.nio.ByteBuffer
import java.nio.ByteOrder
class TFLiteClassifier(private val context: Context) {
private lateinit var interpreter: Interpreter
private val inputSize = 224
private val numClasses = 1000
fun initialize() {
val model = loadModelFile("mobilenetv2.tflite")
val options = Interpreter.Options().apply {
numThreads = 4
useNNAPI = true // Android Neural Networks API
}
interpreter = Interpreter(model, options)
}
private fun loadModelFile(filename: String): ByteBuffer {
val assetFileDescriptor = context.assets.openFd(filename)
val fileInputStream = FileInputStream(assetFileDescriptor.fileDescriptor)
val fileChannel = fileInputStream.channel
val startOffset = assetFileDescriptor.startOffset
val declaredLength = assetFileDescriptor.declaredLength
return fileChannel.map(FileChannel.MapMode.READ_ONLY, startOffset, declaredLength)
}
fun classify(bitmap: Bitmap): FloatArray {
val resized = Bitmap.createScaledBitmap(bitmap, inputSize, inputSize, true)
val inputBuffer = ByteBuffer.allocateDirect(1 * inputSize * inputSize * 3 * 4)
inputBuffer.order(ByteOrder.nativeOrder())
for (y in 0 until inputSize) {
for (x in 0 until inputSize) {
val pixel = resized.getPixel(x, y)
inputBuffer.putFloat(((pixel shr 16 and 0xFF) / 255.0f))
inputBuffer.putFloat(((pixel shr 8 and 0xFF) / 255.0f))
inputBuffer.putFloat(((pixel and 0xFF) / 255.0f))
}
}
val outputBuffer = Array(1) { FloatArray(numClasses) }
interpreter.run(inputBuffer, outputBuffer)
return outputBuffer[0]
}
}
iOS 배포
Swift 코드:
import TensorFlowLite
import UIKit
class TFLiteImageClassifier {
private var interpreter: Interpreter
private let inputWidth = 224
private let inputHeight = 224
init(modelName: String) throws {
guard let modelPath = Bundle.main.path(forResource: modelName, ofType: "tflite") else {
throw NSError(domain: "ModelNotFound", code: 0, userInfo: nil)
}
var options = Interpreter.Options()
options.threadCount = 4
interpreter = try Interpreter(modelPath: modelPath, options: options)
try interpreter.allocateTensors()
}
func classify(image: UIImage) throws -> [Float] {
guard let cgImage = image.cgImage else { return [] }
let inputTensor = try interpreter.input(at: 0)
let batchSize = 1
let inputChannels = 3
let inputData = preprocessImage(cgImage: cgImage)
try interpreter.copy(inputData, toInputAt: 0)
try interpreter.invoke()
let outputTensor = try interpreter.output(at: 0)
let results: [Float] = [Float](unsafeData: outputTensor.data) ?? []
return results
}
private func preprocessImage(cgImage: CGImage) -> Data {
var data = Data(count: inputWidth * inputHeight * 3 * 4)
// 이미지 리사이즈 및 정규화 처리
return data
}
}
3. ONNX와 ONNX Runtime
ONNX(Open Neural Network Exchange)는 ML 모델을 다양한 프레임워크 간에 이식 가능하게 하는 개방형 포맷입니다. PyTorch, TensorFlow, scikit-learn 등에서 학습한 모델을 하나의 표준 형식으로 내보내고, ONNX Runtime으로 어디서든 실행할 수 있습니다.
PyTorch에서 ONNX로 변환
import torch
import torch.nn as nn
import torchvision.models as models
# 모델 로드
model = models.resnet50(pretrained=True)
model.eval()
# 더미 입력 생성
dummy_input = torch.randn(1, 3, 224, 224)
# ONNX 내보내기
torch.onnx.export(
model,
dummy_input,
"resnet50.onnx",
export_params=True,
opset_version=17,
do_constant_folding=True,
input_names=['input'],
output_names=['output'],
dynamic_axes={
'input': {0: 'batch_size'},
'output': {0: 'batch_size'}
}
)
print("ONNX 변환 완료!")
# ONNX 모델 검증
import onnx
onnx_model = onnx.load("resnet50.onnx")
onnx.checker.check_model(onnx_model)
print(f"ONNX IR 버전: {onnx_model.ir_version}")
print(f"Opset 버전: {onnx_model.opset_import[0].version}")
ONNX Runtime 추론
import onnxruntime as ort
import numpy as np
from PIL import Image
import torchvision.transforms as transforms
# 세션 생성 및 공급자 설정
providers = ['CUDAExecutionProvider', 'CPUExecutionProvider']
session = ort.InferenceSession("resnet50.onnx", providers=providers)
# 실제 사용된 공급자 확인
print(f"사용 공급자: {session.get_providers()}")
# 입출력 정보 확인
input_name = session.get_inputs()[0].name
output_name = session.get_outputs()[0].name
input_shape = session.get_inputs()[0].shape
print(f"입력 이름: {input_name}, shape: {input_shape}")
# 이미지 전처리
transform = transforms.Compose([
transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
img = Image.open('test.jpg')
input_tensor = transform(img).unsqueeze(0).numpy()
# 추론
outputs = session.run([output_name], {input_name: input_tensor})
logits = outputs[0]
predicted_class = np.argmax(logits[0])
print(f"예측 클래스: {predicted_class}")
ONNX Runtime 최적화
from onnxruntime.transformers import optimizer
from onnxruntime.quantization import quantize_dynamic, QuantType
# 그래프 최적화 (트랜스포머 모델)
optimized_model = optimizer.optimize_model(
'bert_base.onnx',
model_type='bert',
num_heads=12,
hidden_size=768,
optimization_options=None
)
optimized_model.save_model_to_file('bert_optimized.onnx')
# 동적 양자화 (INT8)
quantize_dynamic(
model_input='bert_optimized.onnx',
model_output='bert_quantized_int8.onnx',
weight_type=QuantType.QInt8,
per_channel=True
)
print("양자화 완료!")
# 세션 옵션 튜닝
so = ort.SessionOptions()
so.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
so.intra_op_num_threads = 4
so.inter_op_num_threads = 2
so.execution_mode = ort.ExecutionMode.ORT_SEQUENTIAL
session = ort.InferenceSession('model.onnx', sess_options=so)
ONNX Runtime Web (브라우저)
// npm install onnxruntime-web
import * as ort from 'onnxruntime-web'
async function runInference() {
// WebAssembly 백엔드 설정
ort.env.wasm.wasmPaths = '/static/'
ort.env.wasm.numThreads = 4
// 세션 생성
const session = await ort.InferenceSession.create('/models/mobilenet.onnx', {
executionProviders: ['webgpu', 'wasm'],
graphOptimizationLevel: 'all',
})
// 입력 텐서 생성 (1, 3, 224, 224)
const inputData = new Float32Array(1 * 3 * 224 * 224).fill(0.5)
const inputTensor = new ort.Tensor('float32', inputData, [1, 3, 224, 224])
// 추론 실행
const feeds = { input: inputTensor }
const results = await session.run(feeds)
// 결과 처리
const outputData = results.output.data
const maxIndex = Array.from(outputData).indexOf(Math.max(...outputData))
console.log('예측 클래스:', maxIndex)
}
runInference()
4. Core ML (Apple)
Core ML은 Apple 플랫폼(iOS, macOS, watchOS, tvOS)에서 ML 모델을 실행하기 위한 프레임워크입니다. Neural Engine을 활용해 전력 효율적이고 빠른 추론을 제공합니다.
coremltools를 사용한 변환
import coremltools as ct
import torch
import torchvision.models as models
# PyTorch 모델 변환
torch_model = models.mobilenet_v2(pretrained=True)
torch_model.eval()
example_input = torch.rand(1, 3, 224, 224)
traced_model = torch.jit.trace(torch_model, example_input)
# Core ML 변환
mlmodel = ct.convert(
traced_model,
inputs=[ct.TensorType(name='input', shape=(1, 3, 224, 224))],
compute_units=ct.ComputeUnit.ALL, # CPU + GPU + Neural Engine 모두 활용
minimum_deployment_target=ct.target.iOS16
)
# 메타데이터 추가
mlmodel.short_description = "MobileNetV2 이미지 분류기"
mlmodel.author = "YJ Blog"
mlmodel.version = "1.0"
mlmodel.save("MobileNetV2.mlpackage")
print("Core ML 변환 완료!")
Float16 및 INT8 양자화
import coremltools as ct
from coremltools.optimize.coreml import (
OpLinearQuantizerConfig,
OptimizationConfig,
linearly_quantize_weights
)
# 원본 모델 로드
mlmodel = ct.models.MLModel("MobileNetV2.mlpackage")
# 선형 가중치 양자화 (8비트)
op_config = OpLinearQuantizerConfig(mode="linear_symmetric", dtype="int8")
config = OptimizationConfig(global_config=op_config)
compressed_model = linearly_quantize_weights(mlmodel, config)
compressed_model.save("MobileNetV2_int8.mlpackage")
# 팔렛트 양자화 (4비트)
from coremltools.optimize.coreml import palettize_weights, OpPalettizerConfig
palette_config = OptimizationConfig(
global_config=OpPalettizerConfig(mode="kmeans", nbits=4)
)
palette_model = palettize_weights(mlmodel, palette_config)
palette_model.save("MobileNetV2_4bit.mlpackage")
Swift에서 Core ML 사용
import CoreML
import Vision
import UIKit
class CoreMLClassifier {
private var model: VNCoreMLModel?
func loadModel() {
guard let modelURL = Bundle.main.url(forResource: "MobileNetV2", withExtension: "mlpackage") else {
print("모델 파일을 찾을 수 없습니다")
return
}
let config = MLModelConfiguration()
config.computeUnits = .all // CPU + GPU + Neural Engine
do {
let coreMLModel = try MLModel(contentsOf: modelURL, configuration: config)
model = try VNCoreMLModel(for: coreMLModel)
print("모델 로드 성공")
} catch {
print("모델 로드 실패: \(error)")
}
}
func classify(image: UIImage, completion: @escaping ([VNClassificationObservation]?) -> Void) {
guard let model = model,
let cgImage = image.cgImage else {
completion(nil)
return
}
let request = VNCoreMLRequest(model: model) { request, error in
guard let results = request.results as? [VNClassificationObservation] else {
completion(nil)
return
}
let topResults = results.prefix(5)
completion(Array(topResults))
}
request.imageCropAndScaleOption = .centerCrop
let handler = VNImageRequestHandler(cgImage: cgImage, options: [:])
DispatchQueue.global(qos: .userInteractive).async {
do {
try handler.perform([request])
} catch {
print("추론 실패: \(error)")
completion(nil)
}
}
}
}
// 사용 예시
let classifier = CoreMLClassifier()
classifier.loadModel()
classifier.classify(image: UIImage(named: "test")!) { results in
results?.forEach { result in
print("\(result.identifier): \(String(format: "%.2f", result.confidence * 100))%")
}
}
Create ML로 커스텀 모델 학습
import CreateML
import Foundation
// 이미지 분류 모델 학습
let trainingData = MLImageClassifier.DataSource.labeledDirectories(
at: URL(fileURLWithPath: "/path/to/training_data")
)
let parameters = MLImageClassifier.ModelParameters(
featureExtractor: .scenePrint(revision: 2),
maxIterations: 25,
augmentation: [.flip, .crop, .rotation]
)
let classifier = try MLImageClassifier(
trainingData: trainingData,
parameters: parameters
)
// 평가
let evaluationData = MLImageClassifier.DataSource.labeledDirectories(
at: URL(fileURLWithPath: "/path/to/test_data")
)
let metrics = classifier.evaluation(on: evaluationData)
print("정확도: \(metrics.classificationError)")
// 저장
try classifier.write(to: URL(fileURLWithPath: "MyClassifier.mlmodel"))
5. NVIDIA Jetson 플랫폼
NVIDIA Jetson은 AI 엣지 컴퓨팅을 위한 임베디드 플랫폼으로, 로봇공학, 자율주행, 스마트 카메라 등에 광범위하게 사용됩니다.
Jetson 모델 비교
| 모델 | AI 성능 | RAM | 전력 | 주요 용도 |
|---|---|---|---|---|
| Jetson Nano | 472 GFLOPS | 4GB | 10W | 교육, 프로토타입 |
| Jetson Xavier NX | 21 TOPS | 8/16GB | 15W | 산업 IoT |
| Jetson AGX Orin | 275 TOPS | 64GB | 60W | 자율주행, 로봇 |
| Jetson Orin NX | 100 TOPS | 16GB | 25W | 엣지 AI |
TensorRT 변환
import tensorrt as trt
import numpy as np
import pycuda.driver as cuda
import pycuda.autoinit
# TensorRT 로거 생성
TRT_LOGGER = trt.Logger(trt.Logger.WARNING)
def build_engine_from_onnx(onnx_path, engine_path, fp16=True, int8=False):
with trt.Builder(TRT_LOGGER) as builder, \
builder.create_network(1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH)) as network, \
trt.OnnxParser(network, TRT_LOGGER) as parser:
config = builder.create_builder_config()
config.max_workspace_size = 1 << 30 # 1GB
if fp16:
config.set_flag(trt.BuilderFlag.FP16)
if int8:
config.set_flag(trt.BuilderFlag.INT8)
# ONNX 파싱
with open(onnx_path, 'rb') as f:
if not parser.parse(f.read()):
for error in range(parser.num_errors):
print(parser.get_error(error))
return None
# 엔진 빌드
print("TensorRT 엔진 빌드 중... (수 분 소요)")
serialized_engine = builder.build_serialized_network(network, config)
# 저장
with open(engine_path, 'wb') as f:
f.write(serialized_engine)
print(f"엔진 저장 완료: {engine_path}")
build_engine_from_onnx('resnet50.onnx', 'resnet50_fp16.trt', fp16=True)
TensorRT 추론
import tensorrt as trt
import pycuda.driver as cuda
import pycuda.autoinit
import numpy as np
class TRTInference:
def __init__(self, engine_path):
TRT_LOGGER = trt.Logger(trt.Logger.WARNING)
with open(engine_path, 'rb') as f:
runtime = trt.Runtime(TRT_LOGGER)
self.engine = runtime.deserialize_cuda_engine(f.read())
self.context = self.engine.create_execution_context()
self.inputs, self.outputs, self.bindings, self.stream = self._allocate_buffers()
def _allocate_buffers(self):
inputs, outputs, bindings = [], [], []
stream = cuda.Stream()
for binding in self.engine:
size = trt.volume(self.engine.get_binding_shape(binding))
dtype = trt.nptype(self.engine.get_binding_dtype(binding))
host_mem = cuda.pagelocked_empty(size, dtype)
device_mem = cuda.mem_alloc(host_mem.nbytes)
bindings.append(int(device_mem))
if self.engine.binding_is_input(binding):
inputs.append({'host': host_mem, 'device': device_mem})
else:
outputs.append({'host': host_mem, 'device': device_mem})
return inputs, outputs, bindings, stream
def infer(self, input_data):
np.copyto(self.inputs[0]['host'], input_data.ravel())
cuda.memcpy_htod_async(self.inputs[0]['device'], self.inputs[0]['host'], self.stream)
self.context.execute_async_v2(bindings=self.bindings, stream_handle=self.stream.handle)
cuda.memcpy_dtoh_async(self.outputs[0]['host'], self.outputs[0]['device'], self.stream)
self.stream.synchronize()
return self.outputs[0]['host']
# 사용
trt_model = TRTInference('resnet50_fp16.trt')
input_array = np.random.rand(1, 3, 224, 224).astype(np.float32)
result = trt_model.infer(input_array)
print(f"예측 클래스: {np.argmax(result)}")
DeepStream SDK
# DeepStream Python 바인딩을 사용한 비디오 파이프라인
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst, GLib
Gst.init(None)
def create_pipeline():
pipeline = Gst.Pipeline()
# 소스: USB 카메라
source = Gst.ElementFactory.make("v4l2src", "usb-cam-source")
source.set_property("device", "/dev/video0")
# 캡스필터
caps = Gst.ElementFactory.make("capsfilter", "capsfilter")
caps.set_property("caps", Gst.Caps.from_string("video/x-raw,width=1280,height=720,framerate=30/1"))
# NVARGUS (카메라 ISP)
nvconv = Gst.ElementFactory.make("nvvideoconvert", "convertor")
# nvinfer (TensorRT 추론)
nvinfer = Gst.ElementFactory.make("nvinfer", "primary-inference")
nvinfer.set_property("config-file-path", "config_infer_primary.txt")
# 트래커
tracker = Gst.ElementFactory.make("nvtracker", "tracker")
tracker.set_property("ll-lib-file", "/opt/nvidia/deepstream/deepstream/lib/libnvds_nvmultiobjecttracker.so")
# OSD (On-Screen Display)
osd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")
# 출력
sink = Gst.ElementFactory.make("nveglglessink", "nvvideo-renderer")
for element in [source, caps, nvconv, nvinfer, tracker, osd, sink]:
pipeline.add(element)
source.link(caps)
caps.link(nvconv)
nvconv.link(nvinfer)
nvinfer.link(tracker)
tracker.link(osd)
osd.link(sink)
return pipeline
pipeline = create_pipeline()
pipeline.set_state(Gst.State.PLAYING)
6. Raspberry Pi AI
라즈베리파이는 교육용에서 시작해 이제 실제 엣지 AI 배포 플랫폼으로 널리 사용됩니다.
Raspberry Pi 5 + Hailo-8
Hailo-8은 라즈베리파이 HAT 형태의 AI 가속기로 26 TOPS 성능을 제공합니다.
# Hailo SDK 설치
pip install hailort
# 모델 변환 (ONNX -> HEF)
# Hailo Model Zoo에서 사전 변환된 모델 제공
wget https://hailo-model-zoo.s3.eu-west-2.amazonaws.com/ModelZoo/Compiled/v2.11.0/hailo8/resnet_v1_50.hef
import hailo_platform as hp
import numpy as np
# HEF 로드 및 추론
with hp.VDevice() as vdevice:
hef = hp.Hef("resnet_v1_50.hef")
network_groups = vdevice.configure(hef)
network_group = network_groups[0]
input_vstreams_params = hp.InputVStreamParams.make_from_network_group(
network_group, quantized=False, format_type=hp.FormatType.FLOAT32
)
output_vstreams_params = hp.OutputVStreamParams.make_from_network_group(
network_group, quantized=False, format_type=hp.FormatType.FLOAT32
)
with hp.InferVStreams(network_group, input_vstreams_params, output_vstreams_params) as infer_pipeline:
input_data = {"input_layer1": np.random.rand(1, 224, 224, 3).astype(np.float32)}
with network_group.activate():
infer_results = infer_pipeline.infer(input_data)
print(f"결과: {np.argmax(infer_results['resnet_v1_50/softmax1'])}")
OpenCV + 라즈베리파이 카메라
import cv2
import numpy as np
import tflite_runtime.interpreter as tflite
# TFLite Interpreter (라즈베리파이에서 경량 버전 사용)
interpreter = tflite.Interpreter(
model_path='ssd_mobilenet_v2.tflite',
num_threads=4
)
interpreter.allocate_tensors()
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()
# 카메라 초기화 (라즈베리파이 카메라 v2)
cap = cv2.VideoCapture(0)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
while True:
ret, frame = cap.read()
if not ret:
break
# 전처리
input_size = (input_details[0]['shape'][2], input_details[0]['shape'][1])
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
resized = cv2.resize(rgb_frame, input_size)
input_data = np.expand_dims(resized, axis=0).astype(np.uint8)
# 추론
interpreter.set_tensor(input_details[0]['index'], input_data)
interpreter.invoke()
# 결과 추출 (SSD MobileNet)
boxes = interpreter.get_tensor(output_details[0]['index'])[0]
classes = interpreter.get_tensor(output_details[1]['index'])[0]
scores = interpreter.get_tensor(output_details[2]['index'])[0]
# 결과 시각화
h, w = frame.shape[:2]
for i in range(len(scores)):
if scores[i] > 0.5:
ymin, xmin, ymax, xmax = boxes[i]
cv2.rectangle(frame,
(int(xmin * w), int(ymin * h)),
(int(xmax * w), int(ymax * h)),
(0, 255, 0), 2)
label = f"클래스 {int(classes[i])}: {scores[i]:.2f}"
cv2.putText(frame, label, (int(xmin * w), int(ymin * h) - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
cv2.imshow('라즈베리파이 AI', frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cap.release()
cv2.destroyAllWindows()
7. MediaPipe
Google MediaPipe는 얼굴 감지, 손 추적, 자세 추정, 객체 탐지 등 다양한 비전 ML 솔루션을 제공하는 프레임워크입니다.
Python으로 손 추적
import mediapipe as mp
import cv2
import numpy as np
mp_hands = mp.solutions.hands
mp_drawing = mp.solutions.drawing_utils
mp_drawing_styles = mp.solutions.drawing_styles
def run_hand_tracking():
cap = cv2.VideoCapture(0)
with mp_hands.Hands(
static_image_mode=False,
max_num_hands=2,
min_detection_confidence=0.7,
min_tracking_confidence=0.5
) as hands:
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
# BGR -> RGB 변환
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
rgb_frame.flags.writeable = False
results = hands.process(rgb_frame)
rgb_frame.flags.writeable = True
frame = cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR)
if results.multi_hand_landmarks:
for hand_landmarks in results.multi_hand_landmarks:
mp_drawing.draw_landmarks(
frame,
hand_landmarks,
mp_hands.HAND_CONNECTIONS,
mp_drawing_styles.get_default_hand_landmarks_style(),
mp_drawing_styles.get_default_hand_connections_style()
)
# 랜드마크 좌표 추출 (21개 키포인트)
for idx, landmark in enumerate(hand_landmarks.landmark):
h, w, _ = frame.shape
cx, cy = int(landmark.x * w), int(landmark.y * h)
if idx == 8: # 검지 끝
cv2.circle(frame, (cx, cy), 10, (255, 0, 0), -1)
cv2.imshow('손 추적', frame)
if cv2.waitKey(5) & 0xFF == ord('q'):
break
cap.release()
cv2.destroyAllWindows()
run_hand_tracking()
자세 추정 (Pose Estimation)
import mediapipe as mp
import cv2
mp_pose = mp.solutions.pose
def calculate_angle(a, b, c):
"""세 점으로 각도 계산"""
import numpy as np
a = np.array(a)
b = np.array(b)
c = np.array(c)
radians = np.arctan2(c[1] - b[1], c[0] - b[0]) - \
np.arctan2(a[1] - b[1], a[0] - b[0])
angle = np.abs(radians * 180.0 / np.pi)
if angle > 180.0:
angle = 360 - angle
return angle
cap = cv2.VideoCapture(0)
with mp_pose.Pose(
min_detection_confidence=0.5,
min_tracking_confidence=0.5,
model_complexity=1 # 0: Lite, 1: Full, 2: Heavy
) as pose:
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
results = pose.process(rgb_frame)
frame = cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR)
if results.pose_landmarks:
landmarks = results.pose_landmarks.landmark
h, w, _ = frame.shape
# 팔꿈치 각도 계산
shoulder = [landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER.value].x * w,
landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER.value].y * h]
elbow = [landmarks[mp_pose.PoseLandmark.LEFT_ELBOW.value].x * w,
landmarks[mp_pose.PoseLandmark.LEFT_ELBOW.value].y * h]
wrist = [landmarks[mp_pose.PoseLandmark.LEFT_WRIST.value].x * w,
landmarks[mp_pose.PoseLandmark.LEFT_WRIST.value].y * h]
angle = calculate_angle(shoulder, elbow, wrist)
cv2.putText(frame, f"팔꿈치 각도: {angle:.1f}도",
(int(elbow[0]), int(elbow[1])),
cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
mp.solutions.drawing_utils.draw_landmarks(
frame, results.pose_landmarks, mp_pose.POSE_CONNECTIONS
)
cv2.imshow('자세 추정', frame)
if cv2.waitKey(5) & 0xFF == ord('q'):
break
cap.release()
cv2.destroyAllWindows()
MediaPipe Tasks API (최신 버전)
import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
# 객체 탐지 태스크
base_options = python.BaseOptions(model_asset_path='efficientdet_lite0.tflite')
options = vision.ObjectDetectorOptions(
base_options=base_options,
running_mode=vision.RunningMode.IMAGE,
max_results=5,
score_threshold=0.5
)
with vision.ObjectDetector.create_from_options(options) as detector:
image = mp.Image.create_from_file('test_image.jpg')
detection_result = detector.detect(image)
for detection in detection_result.detections:
category = detection.categories[0]
print(f"객체: {category.category_name}, 신뢰도: {category.score:.2f}")
bbox = detection.bounding_box
print(f" 위치: ({bbox.origin_x}, {bbox.origin_y}), 크기: {bbox.width}x{bbox.height}")
8. llama.cpp와 GGUF
llama.cpp는 Meta의 LLaMA 모델을 C++로 구현한 프레임워크로, GPU 없이 CPU만으로도 대형 언어 모델을 실행할 수 있게 해줍니다.
설치 및 기본 사용법
# 소스 빌드
git clone https://github.com/ggerganov/llama.cpp
cd llama.cpp
# CPU only
make -j4
# Apple Silicon (Metal GPU 가속)
make LLAMA_METAL=1 -j4
# NVIDIA CUDA
make LLAMA_CUDA=1 -j4
# GGUF 모델 다운로드 (예: Llama 3.2 3B)
huggingface-cli download \
bartowski/Llama-3.2-3B-Instruct-GGUF \
Llama-3.2-3B-Instruct-Q4_K_M.gguf \
--local-dir ./models
# 인터랙티브 채팅
./llama-cli \
-m models/Llama-3.2-3B-Instruct-Q4_K_M.gguf \
-n 512 \
-p "당신은 친절한 AI 어시스턴트입니다." \
--repeat-penalty 1.1 \
-t 8 \
--color
양자화 수준 (GGUF)
GGUF 파일의 양자화 수준은 모델 크기와 품질 간의 트레이드오프를 결정합니다.
| 양자화 | 비트/가중치 | 크기(7B) | 품질 | 권장 상황 |
|---|---|---|---|---|
| Q2_K | ~2.6비트 | ~2.7GB | 낮음 | 매우 제한적 메모리 |
| Q4_0 | 4.5비트 | ~3.8GB | 보통 | 기본 사용 |
| Q4_K_M | 4.8비트 | ~4.1GB | 양호 | 추천: 균형 |
| Q5_K_M | 5.7비트 | ~4.8GB | 좋음 | 품질 중시 |
| Q6_K | 6.6비트 | ~5.5GB | 매우 좋음 | 고품질 필요 |
| Q8_0 | 8.5비트 | ~7.2GB | 최고 | 메모리 여유 있을 때 |
llama-cpp-python
from llama_cpp import Llama
# 모델 로드
llm = Llama(
model_path="./models/Llama-3.2-3B-Instruct-Q4_K_M.gguf",
n_ctx=4096, # 컨텍스트 윈도우
n_threads=8, # CPU 스레드 수
n_gpu_layers=35, # GPU에 올릴 레이어 수 (-1이면 전체)
verbose=False
)
# 기본 텍스트 생성
output = llm(
"한국의 수도는 어디인가요?",
max_tokens=128,
temperature=0.7,
top_p=0.95,
top_k=40,
repeat_penalty=1.1
)
print(output['choices'][0]['text'])
# 채팅 형식
messages = [
{"role": "system", "content": "당신은 친절한 AI 어시스턴트입니다. 한국어로 답변하세요."},
{"role": "user", "content": "파이썬으로 피보나치 수열을 구하는 코드를 작성해주세요."}
]
response = llm.create_chat_completion(
messages=messages,
max_tokens=512,
temperature=0.7
)
print(response['choices'][0]['message']['content'])
# 스트리밍 출력
stream = llm.create_chat_completion(
messages=messages,
max_tokens=512,
stream=True
)
for chunk in stream:
delta = chunk['choices'][0]['delta']
if 'content' in delta:
print(delta['content'], end='', flush=True)
print()
OpenAI 호환 서버
# llama.cpp 서버 실행
./llama-server \
-m models/Llama-3.2-3B-Instruct-Q4_K_M.gguf \
--port 8080 \
--host 0.0.0.0 \
-n 2048 \
-t 8 \
--n-gpu-layers 35
# OpenAI SDK로 로컬 서버 사용
from openai import OpenAI
client = OpenAI(
base_url="http://localhost:8080/v1",
api_key="none"
)
response = client.chat.completions.create(
model="local-model",
messages=[
{"role": "user", "content": "머신러닝과 딥러닝의 차이를 설명해주세요."}
],
max_tokens=512,
temperature=0.7
)
print(response.choices[0].message.content)
PyTorch 모델 → GGUF 변환
# HuggingFace 모델 → GGUF
cd llama.cpp
# 파이썬 의존성 설치
pip install -r requirements.txt
# 변환
python convert_hf_to_gguf.py \
/path/to/hf_model \
--outfile models/my_model.gguf \
--outtype f16
# 양자화
./quantize models/my_model.gguf models/my_model_q4km.gguf Q4_K_M
9. Whisper.cpp
Whisper.cpp는 OpenAI Whisper 음성 인식 모델을 C++로 구현한 것으로, 라즈베리파이부터 스마트폰까지 다양한 환경에서 오프라인 음성 인식을 가능하게 합니다.
설치 및 기본 사용법
# 빌드
git clone https://github.com/ggerganov/whisper.cpp
cd whisper.cpp
make -j4
# Apple Silicon
make WHISPER_METAL=1 -j4
# 모델 다운로드
bash ./models/download-ggml-model.sh base.en # 영어 전용, 142MB
bash ./models/download-ggml-model.sh medium # 다국어, 1.5GB
bash ./models/download-ggml-model.sh large-v3 # 최고 품질, 3.1GB
# 음성 파일 전사
./main -m models/ggml-medium.bin \
-f audio.wav \
-l ko \
--output-txt \
-of output
# 실시간 마이크 입력
./stream -m models/ggml-medium.bin \
-t 8 \
--step 500 \
--length 5000 \
-l ko
whisper-cpp-python
import whisper_cpp
import numpy as np
import soundfile as sf
# 모델 로드
model = whisper_cpp.Whisper.from_pretrained("medium")
# WAV 파일 전사
audio, sr = sf.read("audio.wav", dtype="float32")
if audio.ndim > 1:
audio = audio.mean(axis=1) # 스테레오 -> 모노
# 샘플레이트 변환 (16kHz 필요)
if sr != 16000:
import librosa
audio = librosa.resample(audio, orig_sr=sr, target_sr=16000)
result = model.transcribe(audio, language="ko")
print(f"전사 결과:\n{result['text']}")
# 타임스탬프 포함
for segment in result['segments']:
start = segment['start']
end = segment['end']
text = segment['text']
print(f"[{start:.2f}s -> {end:.2f}s] {text}")
양자화 Whisper 모델
# Whisper GGML 모델 양자화
./quantize models/ggml-medium.bin models/ggml-medium-q5_0.bin q5_0
# 크기 비교
ls -lh models/ggml-medium*.bin
# ggml-medium.bin: 1.5GB
# ggml-medium-q5_0.bin: 약 900MB
iOS/Android에서 Whisper.cpp
iOS는 Metal GPU 가속을 지원합니다.
// iOS에서 whisper.cpp 사용 (WhisperKit 라이브러리)
import WhisperKit
class SpeechRecognizer {
var whisperKit: WhisperKit?
func initialize() async {
do {
whisperKit = try await WhisperKit(
model: "openai_whisper-medium",
computeOptions: ModelComputeOptions(melCompute: .cpuAndGPU)
)
print("Whisper 모델 로드 완료")
} catch {
print("초기화 실패: \(error)")
}
}
func transcribe(audioURL: URL) async -> String? {
guard let whisperKit = whisperKit else { return nil }
do {
let result = try await whisperKit.transcribe(
audioPath: audioURL.path,
decodeOptions: DecodingOptions(language: "ko")
)
return result.map(\.text).joined(separator: " ")
} catch {
print("전사 실패: \(error)")
return nil
}
}
}
10. 웹 브라우저 AI
WebAssembly와 WebGPU의 발전으로 브라우저에서도 강력한 AI 추론이 가능해졌습니다.
TensorFlow.js
<!DOCTYPE html>
<html>
<head>
<title>브라우저 이미지 분류</title>
<script src="https://cdn.jsdelivr.net/npm/@tensorflow/tfjs@latest"></script>
<script src="https://cdn.jsdelivr.net/npm/@tensorflow-models/mobilenet@2.1.1"></script>
</head>
<body>
<input type="file" id="imageInput" accept="image/*" />
<img id="preview" style="max-width: 400px;" />
<div id="result"></div>
<script>
let model
async function loadModel() {
model = await mobilenet.load({ version: 2, alpha: 1.0 })
console.log('모델 로드 완료!')
document.getElementById('result').textContent = '모델 준비됨. 이미지를 선택하세요.'
}
document.getElementById('imageInput').addEventListener('change', async (e) => {
const file = e.target.files[0]
if (!file) return
const img = document.getElementById('preview')
img.src = URL.createObjectURL(file)
img.onload = async () => {
const predictions = await model.classify(img, 5)
const resultDiv = document.getElementById('result')
resultDiv.innerHTML = '<h3>예측 결과:</h3>'
predictions.forEach((pred) => {
resultDiv.innerHTML += ``
})
}
})
loadModel()
</script>
</body>
</html>
Transformers.js (HuggingFace)
import { pipeline, env } from '@xenova/transformers'
// WASM 경로 설정
env.backends.onnx.wasm.wasmPaths = 'https://cdn.jsdelivr.net/npm/onnxruntime-web/dist/'
// 텍스트 분류 파이프라인
async function runTextClassification() {
const classifier = await pipeline(
'sentiment-analysis',
'Xenova/distilbert-base-uncased-finetuned-sst-2-english'
)
const results = await classifier(['I love machine learning!', 'This is terrible.'])
results.forEach((result, i) => {
console.log(`텍스트 ${i + 1}: ${result.label} (${(result.score * 100).toFixed(2)}%)`)
})
}
// 이미지 분류
async function runImageClassification() {
const classifier = await pipeline('image-classification', 'Xenova/vit-base-patch16-224')
const result = await classifier('https://example.com/image.jpg')
console.log('이미지 분류 결과:', result)
}
// 텍스트 생성 (소규모 LLM)
async function runTextGeneration() {
const generator = await pipeline('text-generation', 'Xenova/gpt2')
const output = await generator('인공지능의 미래는', {
max_new_tokens: 100,
temperature: 0.7,
})
console.log('생성된 텍스트:', output[0].generated_text)
}
runTextClassification()
WebGPU 가속 추론
import * as ort from 'onnxruntime-web'
async function runWithWebGPU() {
// WebGPU 지원 확인
if (!navigator.gpu) {
console.log('WebGPU를 지원하지 않는 브라우저입니다.')
return
}
const adapter = await navigator.gpu.requestAdapter()
const device = await adapter.requestDevice()
console.log('WebGPU 어댑터:', adapter.info)
// ONNX Runtime에서 WebGPU 사용
ort.env.wasm.wasmPaths = '/'
const session = await ort.InferenceSession.create('/models/resnet50.onnx', {
executionProviders: ['webgpu'],
graphOptimizationLevel: 'all',
})
// 배치 추론
const batchSize = 4
const inputData = new Float32Array(batchSize * 3 * 224 * 224)
const inputTensor = new ort.Tensor('float32', inputData, [batchSize, 3, 224, 224])
const startTime = performance.now()
const output = await session.run({ input: inputTensor })
const elapsed = performance.now() - startTime
console.log(`WebGPU 추론 시간: ${elapsed.toFixed(2)}ms`)
console.log(`배치 처리량: ${((batchSize / elapsed) * 1000).toFixed(1)} images/sec`)
}
runWithWebGPU()
11. AI 모델 최적화 파이프라인
학습 → 최적화 → 배포 전체 흐름
학습 (PyTorch/TF)
↓
프루닝 (불필요한 가중치 제거)
↓
지식 증류 (Teacher-Student)
↓
양자화 인식 학습 (QAT)
↓
형식 변환 (ONNX/TFLite/GGUF)
↓
런타임 최적화 (TensorRT/OpenVINO)
↓
배포 (모바일/엣지/웹)
프루닝 (Pruning)
import torch
import torch.nn.utils.prune as prune
import torchvision.models as models
model = models.resnet50(pretrained=True)
# 구조적 프루닝: Conv2d 레이어 50% 필터 제거
for name, module in model.named_modules():
if isinstance(module, torch.nn.Conv2d):
prune.l1_unstructured(module, name='weight', amount=0.3)
prune.remove(module, 'weight') # 마스크 영구 적용
# 모델 크기 확인
original_params = sum(p.numel() for p in models.resnet50().parameters())
pruned_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"원본 파라미터: {original_params:,}")
print(f"프루닝 후: {pruned_params:,}")
print(f"감소율: {(1 - pruned_params/original_params)*100:.1f}%")
지식 증류 (Knowledge Distillation)
import torch
import torch.nn as nn
import torch.nn.functional as F
class DistillationLoss(nn.Module):
def __init__(self, temperature=4.0, alpha=0.7):
super().__init__()
self.T = temperature
self.alpha = alpha
def forward(self, student_logits, teacher_logits, labels):
# 소프트 타겟 손실 (증류 손실)
soft_loss = F.kl_div(
F.log_softmax(student_logits / self.T, dim=1),
F.softmax(teacher_logits / self.T, dim=1),
reduction='batchmean'
) * (self.T ** 2)
# 하드 타겟 손실 (교차 엔트로피)
hard_loss = F.cross_entropy(student_logits, labels)
return self.alpha * soft_loss + (1 - self.alpha) * hard_loss
def train_student(teacher, student, dataloader, epochs=10):
teacher.eval()
student.train()
optimizer = torch.optim.AdamW(student.parameters(), lr=1e-4)
criterion = DistillationLoss(temperature=4.0, alpha=0.7)
for epoch in range(epochs):
total_loss = 0
for images, labels in dataloader:
with torch.no_grad():
teacher_logits = teacher(images)
student_logits = student(images)
loss = criterion(student_logits, teacher_logits, labels)
optimizer.zero_grad()
loss.backward()
optimizer.step()
total_loss += loss.item()
print(f"Epoch {epoch+1}/{epochs}, Loss: {total_loss/len(dataloader):.4f}")
# 교사: ResNet50, 학생: MobileNetV2
teacher = models.resnet50(pretrained=True)
student = models.mobilenet_v2(pretrained=False)
양자화 인식 학습 (QAT)
import torch
from torch.quantization import get_default_qat_qconfig, prepare_qat, convert
model = models.mobilenet_v2(pretrained=True)
model.train()
# QAT 설정
model.qconfig = get_default_qat_qconfig('qnnpack') # ARM/모바일
# model.qconfig = get_default_qat_qconfig('fbgemm') # x86
# QAT 준비 (가짜 양자화 삽입)
model = prepare_qat(model, inplace=False)
# 미세 조정 학습 (소규모 에폭)
optimizer = torch.optim.SGD(model.parameters(), lr=0.0001)
model.train()
for epoch in range(5):
for images, labels in dataloader:
outputs = model(images)
loss = nn.CrossEntropyLoss()(outputs, labels)
optimizer.zero_grad()
loss.backward()
optimizer.step()
print(f"QAT Epoch {epoch+1}/5 완료")
# INT8 모델로 변환
model.eval()
quantized_model = convert(model.eval(), inplace=False)
torch.save(quantized_model.state_dict(), 'mobilenetv2_int8.pth')
print("QAT 완료! 모델 정확도를 거의 유지하면서 4배 작아짐")
종합 벤치마크 도구
import time
import numpy as np
import psutil
import os
class EdgeAIBenchmark:
def __init__(self, model_path, framework='tflite'):
self.model_path = model_path
self.framework = framework
self.results = {}
def measure_latency(self, input_data, num_runs=100, warmup=10):
"""평균 추론 지연 시간 측정"""
import tensorflow as tf
interpreter = tf.lite.Interpreter(model_path=self.model_path)
interpreter.allocate_tensors()
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()
# 워밍업
for _ in range(warmup):
interpreter.set_tensor(input_details[0]['index'], input_data)
interpreter.invoke()
# 측정
latencies = []
for _ in range(num_runs):
start = time.perf_counter()
interpreter.set_tensor(input_details[0]['index'], input_data)
interpreter.invoke()
_ = interpreter.get_tensor(output_details[0]['index'])
latencies.append((time.perf_counter() - start) * 1000)
self.results['latency_mean_ms'] = np.mean(latencies)
self.results['latency_p99_ms'] = np.percentile(latencies, 99)
self.results['throughput_fps'] = 1000 / np.mean(latencies)
return self.results
def measure_memory(self):
"""메모리 사용량 측정"""
process = psutil.Process(os.getpid())
before = process.memory_info().rss / 1024 / 1024
import tensorflow as tf
interpreter = tf.lite.Interpreter(model_path=self.model_path)
interpreter.allocate_tensors()
after = process.memory_info().rss / 1024 / 1024
self.results['memory_mb'] = after - before
return self.results
def measure_model_size(self):
"""모델 파일 크기"""
size_bytes = os.path.getsize(self.model_path)
self.results['model_size_mb'] = size_bytes / 1024 / 1024
return self.results
def run_full_benchmark(self, input_data):
self.measure_model_size()
self.measure_memory()
self.measure_latency(input_data)
print(f"\n=== {self.model_path} 벤치마크 ===")
print(f"모델 크기: {self.results.get('model_size_mb', 0):.2f} MB")
print(f"메모리 사용: {self.results.get('memory_mb', 0):.2f} MB")
print(f"평균 지연: {self.results.get('latency_mean_ms', 0):.2f} ms")
print(f"P99 지연: {self.results.get('latency_p99_ms', 0):.2f} ms")
print(f"처리량: {self.results.get('throughput_fps', 0):.1f} FPS")
return self.results
# 사용
import numpy as np
input_data = np.random.rand(1, 224, 224, 3).astype(np.float32)
bench = EdgeAIBenchmark('mobilenetv2.tflite')
results = bench.run_full_benchmark(input_data)
bench_q = EdgeAIBenchmark('mobilenetv2_int8.tflite')
results_q = bench_q.run_full_benchmark(input_data)
print("\n=== 양자화 효과 ===")
size_reduction = (1 - results_q['model_size_mb'] / results['model_size_mb']) * 100
speed_improvement = results['latency_mean_ms'] / results_q['latency_mean_ms']
print(f"크기 감소: {size_reduction:.1f}%")
print(f"속도 향상: {speed_improvement:.1f}배")
마무리
엣지 AI는 더 이상 연구 영역이 아니라 실제 제품에 적용되는 기술입니다. 이 가이드에서 다룬 주요 사항을 정리하면:
- TFLite: 모바일 앱에서 가장 광범위하게 사용. Android/iOS 모두 지원
- ONNX Runtime: 프레임워크 독립적. 크로스 플랫폼 배포에 최적
- Core ML: Apple 기기에서 Neural Engine 최대 활용
- TensorRT: NVIDIA GPU 가속 최대화
- llama.cpp: CPU에서 LLM 실행. Apple Silicon에서 특히 강력
- Whisper.cpp: 오프라인 음성 인식의 표준
- MediaPipe: 비전 ML 솔루션의 빠른 프로토타이핑
- Transformers.js: 브라우저에서 HuggingFace 모델 직접 실행
모델 선택과 최적화 전략은 타겟 하드웨어, 요구 정확도, 지연 시간 목표에 따라 결정됩니다. INT8 양자화는 대부분의 시나리오에서 정확도 손실을 최소화하면서 크기와 속도를 크게 개선하므로, 엣지 AI 프로젝트의 첫 번째 최적화 단계로 강력히 추천합니다.
참고 자료
Edge AI and On-Device ML Complete Guide: TFLite, ONNX, Core ML, llama.cpp
Table of Contents
- Edge AI Overview
- TensorFlow Lite (TFLite)
- ONNX and ONNX Runtime
- Core ML (Apple)
- NVIDIA Jetson Platform
- Raspberry Pi AI
- MediaPipe
- llama.cpp and GGUF
- Whisper.cpp
- Web Browser AI
- AI Model Optimization Pipeline
1. Edge AI Overview
Cloud AI vs Edge AI
Where AI inference runs depends heavily on the nature and requirements of your application. Traditional cloud AI sends data to a remote server, performs inference, and returns results. Edge AI, by contrast, runs inference directly on the device where data is generated — smartphones, IoT sensors, cameras, and more.
| Dimension | Cloud AI | Edge AI |
|---|---|---|
| Compute location | Remote server | Local device |
| Latency | Hundreds of ms to seconds | A few ms to tens of ms |
| Privacy | Data leaves device | Data stays on device |
| Internet dependency | Required | Not required |
| Cost | Per-API-call charges | One-time model cost |
| Model size limit | None | Memory/storage constrained |
Advantages of Edge AI
1. Privacy Protection
Sensitive data such as medical images, biometrics, and personal audio never leaves the device. Compliance with GDPR, HIPAA, and other data regulations is naturally achieved.
2. Ultra-Low Latency
Applications like autonomous vehicles, industrial automation, real-time translation, and AR/VR require millisecond responses. Without network round-trip latency, consistent response times are guaranteed.
3. Cost Reduction
No cloud API call costs. At scale, running inference locally across millions of devices reduces central server costs to near zero.
4. Offline Operation
AI features work in environments with unreliable or no internet connectivity — rural areas, underground locations, aircraft, etc.
5. Real-Time Data Processing
Filtering, anomaly detection, and classification of IoT sensor data locally before uploading dramatically reduces transmission volume and storage costs.
Edge Hardware
Mobile GPUs and NPUs
Modern smartphones include dedicated AI hardware:
- Apple Neural Engine (ANE): Available since iPhone 8 and in M-series Macs. A17 Pro delivers 35 TOPS
- Qualcomm Hexagon DSP: Android flagships. Snapdragon 8 Gen 3 features Hexagon NPU
- Google Tensor: Pixel-exclusive chip, optimized for on-device speech recognition and translation
- MediaTek APU: Widely used in mid-range Android devices
Edge Computing Boards
- NVIDIA Jetson: For autonomous driving, robotics, smart cameras. Jetson Orin delivers 275 TOPS
- Raspberry Pi 5: 4GB/8GB memory, suitable for general computer vision tasks
- Google Coral: Edge TPU for TFLite-specific acceleration
- Intel Neural Compute Stick: USB inference accelerator
Edge AI Application Areas
- Smartphones: Face unlock, photo classification, real-time translation, voice assistants
- Smart Home: Voice command processing, motion detection, energy optimization
- Industrial IoT: Defect detection, predictive maintenance, anomaly detection
- Medical Devices: ECG analysis, glucose prediction, skin condition diagnosis
- Autonomous Vehicles: Real-time object detection, lane recognition, obstacle avoidance
- Agriculture: Drone-based crop monitoring, pest and disease detection
2. TensorFlow Lite (TFLite)
TensorFlow Lite is Google's lightweight ML framework for mobile and edge devices. It converts TensorFlow models to the TFLite format (.tflite) for deployment on Android, iOS, embedded Linux, and microcontrollers.
Converting Models (SavedModel to TFLite)
import tensorflow as tf
# Method 1: Convert from SavedModel
converter = tf.lite.TFLiteConverter.from_saved_model('saved_model_dir')
tflite_model = converter.convert()
with open('model.tflite', 'wb') as f:
f.write(tflite_model)
# Method 2: Convert directly from a Keras model
model = tf.keras.applications.MobileNetV2(weights='imagenet')
converter = tf.lite.TFLiteConverter.from_keras_model(model)
tflite_model = converter.convert()
with open('mobilenetv2.tflite', 'wb') as f:
f.write(tflite_model)
# Method 3: Convert from a concrete function
@tf.function(input_signature=[tf.TensorSpec(shape=[1, 224, 224, 3], dtype=tf.float32)])
def predict(x):
return model(x)
converter = tf.lite.TFLiteConverter.from_concrete_functions(
[predict.get_concrete_function()]
)
tflite_model = converter.convert()
Quantization
Quantization converts model weights and activations from floating point to lower-precision integers, reducing model size and improving inference speed.
Float16 Quantization
converter = tf.lite.TFLiteConverter.from_saved_model('saved_model_dir')
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.target_spec.supported_types = [tf.float16]
tflite_model = converter.convert()
# ~50% size reduction, minimal accuracy loss
Full Integer (INT8) Quantization
import numpy as np
def representative_dataset():
# Use 100-1000 representative samples from real data
for _ in range(100):
data = np.random.rand(1, 224, 224, 3).astype(np.float32)
yield [data]
converter = tf.lite.TFLiteConverter.from_saved_model('saved_model_dir')
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.representative_dataset = representative_dataset
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
converter.inference_input_type = tf.int8
converter.inference_output_type = tf.int8
tflite_model = converter.convert()
# ~75% size reduction, 2-4x inference speedup
Dynamic Range Quantization
converter = tf.lite.TFLiteConverter.from_saved_model('saved_model_dir')
converter.optimizations = [tf.lite.Optimize.DEFAULT]
# No representative dataset needed - quantizes weights only
tflite_model = converter.convert()
TFLite Interpreter
import tensorflow as tf
import numpy as np
from PIL import Image
# Initialize interpreter
interpreter = tf.lite.Interpreter(model_path='mobilenetv2.tflite')
interpreter.allocate_tensors()
# Inspect input/output details
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()
print(f"Input shape: {input_details[0]['shape']}")
print(f"Input dtype: {input_details[0]['dtype']}")
print(f"Output shape: {output_details[0]['shape']}")
# Preprocess image
img = Image.open('test_image.jpg').resize((224, 224))
input_data = np.expand_dims(np.array(img, dtype=np.float32) / 255.0, axis=0)
# Run inference
interpreter.set_tensor(input_details[0]['index'], input_data)
interpreter.invoke()
# Extract result
output_data = interpreter.get_tensor(output_details[0]['index'])
predicted_class = np.argmax(output_data[0])
confidence = output_data[0][predicted_class]
print(f"Predicted class: {predicted_class}, Confidence: {confidence:.4f}")
Multithreading and GPU Delegate
# Configure multithreading
interpreter = tf.lite.Interpreter(
model_path='model.tflite',
num_threads=4
)
# GPU delegate (Android/iOS)
try:
from tensorflow.lite.python.interpreter import load_delegate
gpu_delegate = load_delegate('libdelegate.so')
interpreter = tf.lite.Interpreter(
model_path='model.tflite',
experimental_delegates=[gpu_delegate]
)
print("GPU delegate activated")
except Exception as e:
print(f"GPU delegate unavailable, using CPU: {e}")
interpreter = tf.lite.Interpreter(model_path='model.tflite')
interpreter.allocate_tensors()
Android Deployment
build.gradle:
dependencies {
implementation 'org.tensorflow:tensorflow-lite:2.13.0'
implementation 'org.tensorflow:tensorflow-lite-gpu:2.13.0'
implementation 'org.tensorflow:tensorflow-lite-support:0.4.4'
}
Kotlin code:
import org.tensorflow.lite.Interpreter
import java.nio.ByteBuffer
import java.nio.ByteOrder
class TFLiteClassifier(private val context: Context) {
private lateinit var interpreter: Interpreter
private val inputSize = 224
private val numClasses = 1000
fun initialize() {
val model = loadModelFile("mobilenetv2.tflite")
val options = Interpreter.Options().apply {
numThreads = 4
useNNAPI = true // Android Neural Networks API
}
interpreter = Interpreter(model, options)
}
private fun loadModelFile(filename: String): ByteBuffer {
val assetFileDescriptor = context.assets.openFd(filename)
val fileInputStream = FileInputStream(assetFileDescriptor.fileDescriptor)
val fileChannel = fileInputStream.channel
val startOffset = assetFileDescriptor.startOffset
val declaredLength = assetFileDescriptor.declaredLength
return fileChannel.map(FileChannel.MapMode.READ_ONLY, startOffset, declaredLength)
}
fun classify(bitmap: Bitmap): FloatArray {
val resized = Bitmap.createScaledBitmap(bitmap, inputSize, inputSize, true)
val inputBuffer = ByteBuffer.allocateDirect(1 * inputSize * inputSize * 3 * 4)
inputBuffer.order(ByteOrder.nativeOrder())
for (y in 0 until inputSize) {
for (x in 0 until inputSize) {
val pixel = resized.getPixel(x, y)
inputBuffer.putFloat(((pixel shr 16 and 0xFF) / 255.0f))
inputBuffer.putFloat(((pixel shr 8 and 0xFF) / 255.0f))
inputBuffer.putFloat(((pixel and 0xFF) / 255.0f))
}
}
val outputBuffer = Array(1) { FloatArray(numClasses) }
interpreter.run(inputBuffer, outputBuffer)
return outputBuffer[0]
}
}
iOS Deployment
import TensorFlowLite
import UIKit
class TFLiteImageClassifier {
private var interpreter: Interpreter
private let inputWidth = 224
private let inputHeight = 224
init(modelName: String) throws {
guard let modelPath = Bundle.main.path(forResource: modelName, ofType: "tflite") else {
throw NSError(domain: "ModelNotFound", code: 0, userInfo: nil)
}
var options = Interpreter.Options()
options.threadCount = 4
interpreter = try Interpreter(modelPath: modelPath, options: options)
try interpreter.allocateTensors()
}
func classify(image: UIImage) throws -> [Float] {
guard let cgImage = image.cgImage else { return [] }
let inputData = preprocessImage(cgImage: cgImage)
try interpreter.copy(inputData, toInputAt: 0)
try interpreter.invoke()
let outputTensor = try interpreter.output(at: 0)
let results: [Float] = [Float](unsafeData: outputTensor.data) ?? []
return results
}
private func preprocessImage(cgImage: CGImage) -> Data {
var data = Data(count: inputWidth * inputHeight * 3 * 4)
return data
}
}
3. ONNX and ONNX Runtime
ONNX (Open Neural Network Exchange) is an open format that makes ML models portable across frameworks. Models trained in PyTorch, TensorFlow, or scikit-learn can be exported to a single standard format and run with ONNX Runtime anywhere.
Converting PyTorch to ONNX
import torch
import torch.nn as nn
import torchvision.models as models
# Load model
model = models.resnet50(pretrained=True)
model.eval()
# Create dummy input
dummy_input = torch.randn(1, 3, 224, 224)
# Export to ONNX
torch.onnx.export(
model,
dummy_input,
"resnet50.onnx",
export_params=True,
opset_version=17,
do_constant_folding=True,
input_names=['input'],
output_names=['output'],
dynamic_axes={
'input': {0: 'batch_size'},
'output': {0: 'batch_size'}
}
)
print("ONNX export complete!")
# Validate the model
import onnx
onnx_model = onnx.load("resnet50.onnx")
onnx.checker.check_model(onnx_model)
print(f"ONNX IR version: {onnx_model.ir_version}")
print(f"Opset version: {onnx_model.opset_import[0].version}")
ONNX Runtime Inference
import onnxruntime as ort
import numpy as np
from PIL import Image
import torchvision.transforms as transforms
# Create session with execution providers
providers = ['CUDAExecutionProvider', 'CPUExecutionProvider']
session = ort.InferenceSession("resnet50.onnx", providers=providers)
print(f"Active providers: {session.get_providers()}")
input_name = session.get_inputs()[0].name
output_name = session.get_outputs()[0].name
input_shape = session.get_inputs()[0].shape
print(f"Input: {input_name}, shape: {input_shape}")
# Preprocess image
transform = transforms.Compose([
transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
img = Image.open('test.jpg')
input_tensor = transform(img).unsqueeze(0).numpy()
# Run inference
outputs = session.run([output_name], {input_name: input_tensor})
logits = outputs[0]
predicted_class = np.argmax(logits[0])
print(f"Predicted class: {predicted_class}")
ONNX Runtime Optimization
from onnxruntime.transformers import optimizer
from onnxruntime.quantization import quantize_dynamic, QuantType
# Graph optimization for transformer models
optimized_model = optimizer.optimize_model(
'bert_base.onnx',
model_type='bert',
num_heads=12,
hidden_size=768
)
optimized_model.save_model_to_file('bert_optimized.onnx')
# Dynamic INT8 quantization
quantize_dynamic(
model_input='bert_optimized.onnx',
model_output='bert_quantized_int8.onnx',
weight_type=QuantType.QInt8,
per_channel=True
)
print("Quantization complete!")
# Session options tuning
so = ort.SessionOptions()
so.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
so.intra_op_num_threads = 4
so.inter_op_num_threads = 2
so.execution_mode = ort.ExecutionMode.ORT_SEQUENTIAL
session = ort.InferenceSession('model.onnx', sess_options=so)
ONNX Runtime Web (Browser)
// npm install onnxruntime-web
import * as ort from 'onnxruntime-web'
async function runInference() {
// Configure WebAssembly backend
ort.env.wasm.wasmPaths = '/static/'
ort.env.wasm.numThreads = 4
// Create session
const session = await ort.InferenceSession.create('/models/mobilenet.onnx', {
executionProviders: ['webgpu', 'wasm'],
graphOptimizationLevel: 'all',
})
// Create input tensor (1, 3, 224, 224)
const inputData = new Float32Array(1 * 3 * 224 * 224).fill(0.5)
const inputTensor = new ort.Tensor('float32', inputData, [1, 3, 224, 224])
// Run inference
const feeds = { input: inputTensor }
const results = await session.run(feeds)
const outputData = results.output.data
const maxIndex = Array.from(outputData).indexOf(Math.max(...outputData))
console.log('Predicted class:', maxIndex)
}
runInference()
4. Core ML (Apple)
Core ML is Apple's framework for running ML models on Apple platforms (iOS, macOS, watchOS, tvOS). It leverages the Neural Engine for power-efficient and fast inference.
Converting Models with coremltools
import coremltools as ct
import torch
import torchvision.models as models
# PyTorch model conversion
torch_model = models.mobilenet_v2(pretrained=True)
torch_model.eval()
example_input = torch.rand(1, 3, 224, 224)
traced_model = torch.jit.trace(torch_model, example_input)
# Convert to Core ML
mlmodel = ct.convert(
traced_model,
inputs=[ct.TensorType(name='input', shape=(1, 3, 224, 224))],
compute_units=ct.ComputeUnit.ALL, # CPU + GPU + Neural Engine
minimum_deployment_target=ct.target.iOS16
)
# Add metadata
mlmodel.short_description = "MobileNetV2 Image Classifier"
mlmodel.author = "YJ Blog"
mlmodel.version = "1.0"
mlmodel.save("MobileNetV2.mlpackage")
print("Core ML conversion complete!")
Float16 and INT8 Quantization
import coremltools as ct
from coremltools.optimize.coreml import (
OpLinearQuantizerConfig,
OptimizationConfig,
linearly_quantize_weights
)
mlmodel = ct.models.MLModel("MobileNetV2.mlpackage")
# Linear weight quantization (8-bit)
op_config = OpLinearQuantizerConfig(mode="linear_symmetric", dtype="int8")
config = OptimizationConfig(global_config=op_config)
compressed_model = linearly_quantize_weights(mlmodel, config)
compressed_model.save("MobileNetV2_int8.mlpackage")
# Palettization (4-bit)
from coremltools.optimize.coreml import palettize_weights, OpPalettizerConfig
palette_config = OptimizationConfig(
global_config=OpPalettizerConfig(mode="kmeans", nbits=4)
)
palette_model = palettize_weights(mlmodel, palette_config)
palette_model.save("MobileNetV2_4bit.mlpackage")
Using Core ML in Swift
import CoreML
import Vision
import UIKit
class CoreMLClassifier {
private var model: VNCoreMLModel?
func loadModel() {
guard let modelURL = Bundle.main.url(forResource: "MobileNetV2", withExtension: "mlpackage") else {
print("Model file not found")
return
}
let config = MLModelConfiguration()
config.computeUnits = .all // CPU + GPU + Neural Engine
do {
let coreMLModel = try MLModel(contentsOf: modelURL, configuration: config)
model = try VNCoreMLModel(for: coreMLModel)
print("Model loaded successfully")
} catch {
print("Failed to load model: \(error)")
}
}
func classify(image: UIImage, completion: @escaping ([VNClassificationObservation]?) -> Void) {
guard let model = model,
let cgImage = image.cgImage else {
completion(nil)
return
}
let request = VNCoreMLRequest(model: model) { request, error in
guard let results = request.results as? [VNClassificationObservation] else {
completion(nil)
return
}
completion(Array(results.prefix(5)))
}
request.imageCropAndScaleOption = .centerCrop
let handler = VNImageRequestHandler(cgImage: cgImage, options: [:])
DispatchQueue.global(qos: .userInteractive).async {
try? handler.perform([request])
}
}
}
Training Custom Models with Create ML
import CreateML
import Foundation
// Train an image classifier
let trainingData = MLImageClassifier.DataSource.labeledDirectories(
at: URL(fileURLWithPath: "/path/to/training_data")
)
let parameters = MLImageClassifier.ModelParameters(
featureExtractor: .scenePrint(revision: 2),
maxIterations: 25,
augmentation: [.flip, .crop, .rotation]
)
let classifier = try MLImageClassifier(
trainingData: trainingData,
parameters: parameters
)
let evaluationData = MLImageClassifier.DataSource.labeledDirectories(
at: URL(fileURLWithPath: "/path/to/test_data")
)
let metrics = classifier.evaluation(on: evaluationData)
print("Accuracy: \(1.0 - metrics.classificationError)")
try classifier.write(to: URL(fileURLWithPath: "MyClassifier.mlmodel"))
5. NVIDIA Jetson Platform
NVIDIA Jetson is an embedded AI computing platform widely used in robotics, autonomous vehicles, and smart cameras.
Jetson Model Comparison
| Model | AI Performance | RAM | Power | Primary Use |
|---|---|---|---|---|
| Jetson Nano | 472 GFLOPS | 4GB | 10W | Education, prototyping |
| Jetson Xavier NX | 21 TOPS | 8/16GB | 15W | Industrial IoT |
| Jetson AGX Orin | 275 TOPS | 64GB | 60W | Autonomous vehicles, robotics |
| Jetson Orin NX | 100 TOPS | 16GB | 25W | Edge AI |
TensorRT Conversion
import tensorrt as trt
import numpy as np
import pycuda.driver as cuda
import pycuda.autoinit
TRT_LOGGER = trt.Logger(trt.Logger.WARNING)
def build_engine_from_onnx(onnx_path, engine_path, fp16=True, int8=False):
with trt.Builder(TRT_LOGGER) as builder, \
builder.create_network(1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH)) as network, \
trt.OnnxParser(network, TRT_LOGGER) as parser:
config = builder.create_builder_config()
config.max_workspace_size = 1 << 30 # 1GB
if fp16:
config.set_flag(trt.BuilderFlag.FP16)
if int8:
config.set_flag(trt.BuilderFlag.INT8)
with open(onnx_path, 'rb') as f:
if not parser.parse(f.read()):
for error in range(parser.num_errors):
print(parser.get_error(error))
return None
print("Building TensorRT engine (may take several minutes)...")
serialized_engine = builder.build_serialized_network(network, config)
with open(engine_path, 'wb') as f:
f.write(serialized_engine)
print(f"Engine saved: {engine_path}")
build_engine_from_onnx('resnet50.onnx', 'resnet50_fp16.trt', fp16=True)
TensorRT Inference
import tensorrt as trt
import pycuda.driver as cuda
import pycuda.autoinit
import numpy as np
class TRTInference:
def __init__(self, engine_path):
TRT_LOGGER = trt.Logger(trt.Logger.WARNING)
with open(engine_path, 'rb') as f:
runtime = trt.Runtime(TRT_LOGGER)
self.engine = runtime.deserialize_cuda_engine(f.read())
self.context = self.engine.create_execution_context()
self.inputs, self.outputs, self.bindings, self.stream = self._allocate_buffers()
def _allocate_buffers(self):
inputs, outputs, bindings = [], [], []
stream = cuda.Stream()
for binding in self.engine:
size = trt.volume(self.engine.get_binding_shape(binding))
dtype = trt.nptype(self.engine.get_binding_dtype(binding))
host_mem = cuda.pagelocked_empty(size, dtype)
device_mem = cuda.mem_alloc(host_mem.nbytes)
bindings.append(int(device_mem))
if self.engine.binding_is_input(binding):
inputs.append({'host': host_mem, 'device': device_mem})
else:
outputs.append({'host': host_mem, 'device': device_mem})
return inputs, outputs, bindings, stream
def infer(self, input_data):
np.copyto(self.inputs[0]['host'], input_data.ravel())
cuda.memcpy_htod_async(self.inputs[0]['device'], self.inputs[0]['host'], self.stream)
self.context.execute_async_v2(bindings=self.bindings, stream_handle=self.stream.handle)
cuda.memcpy_dtoh_async(self.outputs[0]['host'], self.outputs[0]['device'], self.stream)
self.stream.synchronize()
return self.outputs[0]['host']
trt_model = TRTInference('resnet50_fp16.trt')
input_array = np.random.rand(1, 3, 224, 224).astype(np.float32)
result = trt_model.infer(input_array)
print(f"Predicted class: {np.argmax(result)}")
DeepStream SDK for Video Pipelines
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst, GLib
Gst.init(None)
def create_pipeline():
pipeline = Gst.Pipeline()
# Source: USB camera
source = Gst.ElementFactory.make("v4l2src", "usb-cam-source")
source.set_property("device", "/dev/video0")
caps = Gst.ElementFactory.make("capsfilter", "capsfilter")
caps.set_property("caps", Gst.Caps.from_string(
"video/x-raw,width=1280,height=720,framerate=30/1"
))
nvconv = Gst.ElementFactory.make("nvvideoconvert", "convertor")
# nvinfer element runs TensorRT inference
nvinfer = Gst.ElementFactory.make("nvinfer", "primary-inference")
nvinfer.set_property("config-file-path", "config_infer_primary.txt")
tracker = Gst.ElementFactory.make("nvtracker", "tracker")
tracker.set_property(
"ll-lib-file",
"/opt/nvidia/deepstream/deepstream/lib/libnvds_nvmultiobjecttracker.so"
)
osd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")
sink = Gst.ElementFactory.make("nveglglessink", "nvvideo-renderer")
for element in [source, caps, nvconv, nvinfer, tracker, osd, sink]:
pipeline.add(element)
source.link(caps)
caps.link(nvconv)
nvconv.link(nvinfer)
nvinfer.link(tracker)
tracker.link(osd)
osd.link(sink)
return pipeline
pipeline = create_pipeline()
pipeline.set_state(Gst.State.PLAYING)
6. Raspberry Pi AI
Raspberry Pi has evolved from an educational platform into a genuine edge AI deployment target.
Raspberry Pi 5 + Hailo-8
The Hailo-8 is an AI accelerator HAT that delivers 26 TOPS for Raspberry Pi.
# Install Hailo SDK
pip install hailort
# Download pre-compiled model (ONNX -> HEF format)
wget https://hailo-model-zoo.s3.eu-west-2.amazonaws.com/ModelZoo/Compiled/v2.11.0/hailo8/resnet_v1_50.hef
import hailo_platform as hp
import numpy as np
with hp.VDevice() as vdevice:
hef = hp.Hef("resnet_v1_50.hef")
network_groups = vdevice.configure(hef)
network_group = network_groups[0]
input_vstreams_params = hp.InputVStreamParams.make_from_network_group(
network_group, quantized=False, format_type=hp.FormatType.FLOAT32
)
output_vstreams_params = hp.OutputVStreamParams.make_from_network_group(
network_group, quantized=False, format_type=hp.FormatType.FLOAT32
)
with hp.InferVStreams(network_group, input_vstreams_params, output_vstreams_params) as infer_pipeline:
input_data = {"input_layer1": np.random.rand(1, 224, 224, 3).astype(np.float32)}
with network_group.activate():
infer_results = infer_pipeline.infer(input_data)
output_key = 'resnet_v1_50/softmax1'
print(f"Result: {np.argmax(infer_results[output_key])}")
OpenCV + Raspberry Pi Camera
import cv2
import numpy as np
import tflite_runtime.interpreter as tflite
# Use tflite_runtime (lightweight) on Raspberry Pi
interpreter = tflite.Interpreter(
model_path='ssd_mobilenet_v2.tflite',
num_threads=4
)
interpreter.allocate_tensors()
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()
cap = cv2.VideoCapture(0)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
while True:
ret, frame = cap.read()
if not ret:
break
input_size = (input_details[0]['shape'][2], input_details[0]['shape'][1])
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
resized = cv2.resize(rgb_frame, input_size)
input_data = np.expand_dims(resized, axis=0).astype(np.uint8)
interpreter.set_tensor(input_details[0]['index'], input_data)
interpreter.invoke()
boxes = interpreter.get_tensor(output_details[0]['index'])[0]
classes = interpreter.get_tensor(output_details[1]['index'])[0]
scores = interpreter.get_tensor(output_details[2]['index'])[0]
h, w = frame.shape[:2]
for i in range(len(scores)):
if scores[i] > 0.5:
ymin, xmin, ymax, xmax = boxes[i]
cv2.rectangle(frame,
(int(xmin * w), int(ymin * h)),
(int(xmax * w), int(ymax * h)),
(0, 255, 0), 2)
label = f"class {int(classes[i])}: {scores[i]:.2f}"
cv2.putText(frame, label, (int(xmin * w), int(ymin * h) - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
cv2.imshow('Raspberry Pi AI', frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cap.release()
cv2.destroyAllWindows()
7. MediaPipe
Google MediaPipe provides ready-to-use ML solutions for face detection, hand tracking, pose estimation, object detection, and more.
Hand Tracking in Python
import mediapipe as mp
import cv2
import numpy as np
mp_hands = mp.solutions.hands
mp_drawing = mp.solutions.drawing_utils
mp_drawing_styles = mp.solutions.drawing_styles
def run_hand_tracking():
cap = cv2.VideoCapture(0)
with mp_hands.Hands(
static_image_mode=False,
max_num_hands=2,
min_detection_confidence=0.7,
min_tracking_confidence=0.5
) as hands:
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
rgb_frame.flags.writeable = False
results = hands.process(rgb_frame)
rgb_frame.flags.writeable = True
frame = cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR)
if results.multi_hand_landmarks:
for hand_landmarks in results.multi_hand_landmarks:
mp_drawing.draw_landmarks(
frame,
hand_landmarks,
mp_hands.HAND_CONNECTIONS,
mp_drawing_styles.get_default_hand_landmarks_style(),
mp_drawing_styles.get_default_hand_connections_style()
)
# Extract landmark coordinates (21 keypoints)
for idx, landmark in enumerate(hand_landmarks.landmark):
h, w, _ = frame.shape
cx, cy = int(landmark.x * w), int(landmark.y * h)
if idx == 8: # Index fingertip
cv2.circle(frame, (cx, cy), 10, (255, 0, 0), -1)
cv2.imshow('Hand Tracking', frame)
if cv2.waitKey(5) & 0xFF == ord('q'):
break
cap.release()
cv2.destroyAllWindows()
run_hand_tracking()
Pose Estimation
import mediapipe as mp
import cv2
import numpy as np
mp_pose = mp.solutions.pose
def calculate_angle(a, b, c):
"""Calculate angle between three points."""
a = np.array(a)
b = np.array(b)
c = np.array(c)
radians = np.arctan2(c[1] - b[1], c[0] - b[0]) - \
np.arctan2(a[1] - b[1], a[0] - b[0])
angle = np.abs(radians * 180.0 / np.pi)
if angle > 180.0:
angle = 360 - angle
return angle
cap = cv2.VideoCapture(0)
with mp_pose.Pose(
min_detection_confidence=0.5,
min_tracking_confidence=0.5,
model_complexity=1 # 0: Lite, 1: Full, 2: Heavy
) as pose:
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
results = pose.process(rgb_frame)
frame = cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR)
if results.pose_landmarks:
landmarks = results.pose_landmarks.landmark
h, w, _ = frame.shape
shoulder = [landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER.value].x * w,
landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER.value].y * h]
elbow = [landmarks[mp_pose.PoseLandmark.LEFT_ELBOW.value].x * w,
landmarks[mp_pose.PoseLandmark.LEFT_ELBOW.value].y * h]
wrist = [landmarks[mp_pose.PoseLandmark.LEFT_WRIST.value].x * w,
landmarks[mp_pose.PoseLandmark.LEFT_WRIST.value].y * h]
angle = calculate_angle(shoulder, elbow, wrist)
cv2.putText(frame, f"Elbow: {angle:.1f} deg",
(int(elbow[0]), int(elbow[1])),
cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
mp.solutions.drawing_utils.draw_landmarks(
frame, results.pose_landmarks, mp_pose.POSE_CONNECTIONS
)
cv2.imshow('Pose Estimation', frame)
if cv2.waitKey(5) & 0xFF == ord('q'):
break
cap.release()
cv2.destroyAllWindows()
MediaPipe Tasks API
import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
base_options = python.BaseOptions(model_asset_path='efficientdet_lite0.tflite')
options = vision.ObjectDetectorOptions(
base_options=base_options,
running_mode=vision.RunningMode.IMAGE,
max_results=5,
score_threshold=0.5
)
with vision.ObjectDetector.create_from_options(options) as detector:
image = mp.Image.create_from_file('test_image.jpg')
detection_result = detector.detect(image)
for detection in detection_result.detections:
category = detection.categories[0]
print(f"Object: {category.category_name}, Score: {category.score:.2f}")
bbox = detection.bounding_box
print(f" Location: ({bbox.origin_x}, {bbox.origin_y}), Size: {bbox.width}x{bbox.height}")
8. llama.cpp and GGUF
llama.cpp is a C++ implementation of Meta's LLaMA model that enables running large language models on CPU without requiring a GPU.
Installation and Basic Usage
# Build from source
git clone https://github.com/ggerganov/llama.cpp
cd llama.cpp
# CPU only
make -j4
# Apple Silicon (Metal GPU acceleration)
make LLAMA_METAL=1 -j4
# NVIDIA CUDA
make LLAMA_CUDA=1 -j4
# Download a GGUF model
huggingface-cli download \
bartowski/Llama-3.2-3B-Instruct-GGUF \
Llama-3.2-3B-Instruct-Q4_K_M.gguf \
--local-dir ./models
# Interactive chat
./llama-cli \
-m models/Llama-3.2-3B-Instruct-Q4_K_M.gguf \
-n 512 \
-p "You are a helpful AI assistant." \
--repeat-penalty 1.1 \
-t 8 \
--color
Quantization Levels (GGUF)
| Quantization | Bits/weight | Size (7B) | Quality | When to Use |
|---|---|---|---|---|
| Q2_K | ~2.6 bits | ~2.7GB | Low | Very limited memory |
| Q4_0 | 4.5 bits | ~3.8GB | Moderate | Basic use |
| Q4_K_M | 4.8 bits | ~4.1GB | Good | Recommended: balanced |
| Q5_K_M | 5.7 bits | ~4.8GB | Very Good | Quality-focused |
| Q6_K | 6.6 bits | ~5.5GB | Excellent | High quality needed |
| Q8_0 | 8.5 bits | ~7.2GB | Best | Ample memory available |
llama-cpp-python
from llama_cpp import Llama
# Load model
llm = Llama(
model_path="./models/Llama-3.2-3B-Instruct-Q4_K_M.gguf",
n_ctx=4096, # Context window
n_threads=8, # CPU threads
n_gpu_layers=35, # Layers to offload to GPU (-1 for all)
verbose=False
)
# Basic text generation
output = llm(
"What is the capital of France?",
max_tokens=128,
temperature=0.7,
top_p=0.95,
top_k=40,
repeat_penalty=1.1
)
print(output['choices'][0]['text'])
# Chat completion
messages = [
{"role": "system", "content": "You are a helpful coding assistant."},
{"role": "user", "content": "Write a Python function to compute the Fibonacci sequence."}
]
response = llm.create_chat_completion(
messages=messages,
max_tokens=512,
temperature=0.7
)
print(response['choices'][0]['message']['content'])
# Streaming output
stream = llm.create_chat_completion(
messages=messages,
max_tokens=512,
stream=True
)
for chunk in stream:
delta = chunk['choices'][0]['delta']
if 'content' in delta:
print(delta['content'], end='', flush=True)
print()
OpenAI-Compatible Server
# Start llama.cpp server
./llama-server \
-m models/Llama-3.2-3B-Instruct-Q4_K_M.gguf \
--port 8080 \
--host 0.0.0.0 \
-n 2048 \
-t 8 \
--n-gpu-layers 35
# Use OpenAI SDK with local server
from openai import OpenAI
client = OpenAI(
base_url="http://localhost:8080/v1",
api_key="none"
)
response = client.chat.completions.create(
model="local-model",
messages=[
{"role": "user", "content": "Explain the difference between machine learning and deep learning."}
],
max_tokens=512,
temperature=0.7
)
print(response.choices[0].message.content)
Converting HuggingFace Models to GGUF
cd llama.cpp
# Install Python dependencies
pip install -r requirements.txt
# Convert HuggingFace model to GGUF
python convert_hf_to_gguf.py \
/path/to/hf_model \
--outfile models/my_model.gguf \
--outtype f16
# Quantize the model
./quantize models/my_model.gguf models/my_model_q4km.gguf Q4_K_M
9. Whisper.cpp
Whisper.cpp is a C++ implementation of OpenAI's Whisper speech recognition model, enabling offline speech recognition from Raspberry Pi to smartphones.
Installation and Basic Usage
# Build
git clone https://github.com/ggerganov/whisper.cpp
cd whisper.cpp
make -j4
# Apple Silicon with Metal
make WHISPER_METAL=1 -j4
# Download models
bash ./models/download-ggml-model.sh base.en # English only, 142MB
bash ./models/download-ggml-model.sh medium # Multilingual, 1.5GB
bash ./models/download-ggml-model.sh large-v3 # Best quality, 3.1GB
# Transcribe an audio file
./main -m models/ggml-medium.bin \
-f audio.wav \
-l en \
--output-txt \
-of output
# Real-time microphone input
./stream -m models/ggml-medium.bin \
-t 8 \
--step 500 \
--length 5000 \
-l en
whisper-cpp-python
import whisper_cpp
import numpy as np
import soundfile as sf
# Load model
model = whisper_cpp.Whisper.from_pretrained("medium")
# Transcribe a WAV file
audio, sr = sf.read("audio.wav", dtype="float32")
if audio.ndim > 1:
audio = audio.mean(axis=1) # Stereo -> mono
# Resample to 16kHz if needed
if sr != 16000:
import librosa
audio = librosa.resample(audio, orig_sr=sr, target_sr=16000)
result = model.transcribe(audio, language="en")
print(f"Transcript:\n{result['text']}")
# With timestamps
for segment in result['segments']:
start = segment['start']
end = segment['end']
text = segment['text']
print(f"[{start:.2f}s -> {end:.2f}s] {text}")
Quantizing Whisper Models
# Quantize GGML model
./quantize models/ggml-medium.bin models/ggml-medium-q5_0.bin q5_0
# Size comparison
ls -lh models/ggml-medium*.bin
# ggml-medium.bin: 1.5GB
# ggml-medium-q5_0.bin: ~900MB
Whisper on iOS with WhisperKit
import WhisperKit
class SpeechRecognizer {
var whisperKit: WhisperKit?
func initialize() async {
do {
whisperKit = try await WhisperKit(
model: "openai_whisper-medium",
computeOptions: ModelComputeOptions(melCompute: .cpuAndGPU)
)
print("Whisper model loaded")
} catch {
print("Initialization failed: \(error)")
}
}
func transcribe(audioURL: URL) async -> String? {
guard let whisperKit = whisperKit else { return nil }
do {
let result = try await whisperKit.transcribe(
audioPath: audioURL.path,
decodeOptions: DecodingOptions(language: "en")
)
return result.map(\.text).joined(separator: " ")
} catch {
print("Transcription failed: \(error)")
return nil
}
}
}
10. Web Browser AI
Thanks to WebAssembly and WebGPU, powerful AI inference in the browser is now practical.
TensorFlow.js
<!DOCTYPE html>
<html>
<head>
<title>Browser Image Classification</title>
<script src="https://cdn.jsdelivr.net/npm/@tensorflow/tfjs@latest"></script>
<script src="https://cdn.jsdelivr.net/npm/@tensorflow-models/mobilenet@2.1.1"></script>
</head>
<body>
<input type="file" id="imageInput" accept="image/*" />
<img id="preview" style="max-width: 400px;" />
<div id="result"></div>
<script>
let model
async function loadModel() {
model = await mobilenet.load({ version: 2, alpha: 1.0 })
console.log('Model loaded!')
document.getElementById('result').textContent = 'Model ready. Select an image.'
}
document.getElementById('imageInput').addEventListener('change', async (e) => {
const file = e.target.files[0]
if (!file) return
const img = document.getElementById('preview')
img.src = URL.createObjectURL(file)
img.onload = async () => {
const predictions = await model.classify(img, 5)
const resultDiv = document.getElementById('result')
resultDiv.innerHTML = '<h3>Top Predictions:</h3>'
predictions.forEach((pred) => {
resultDiv.innerHTML += ``
})
}
})
loadModel()
</script>
</body>
</html>
Transformers.js (HuggingFace)
import { pipeline, env } from '@xenova/transformers'
env.backends.onnx.wasm.wasmPaths = 'https://cdn.jsdelivr.net/npm/onnxruntime-web/dist/'
// Sentiment analysis pipeline
async function runTextClassification() {
const classifier = await pipeline(
'sentiment-analysis',
'Xenova/distilbert-base-uncased-finetuned-sst-2-english'
)
const results = await classifier(['I love machine learning!', 'This is terrible.'])
results.forEach((result, i) => {
console.log(`Text ${i + 1}: ${result.label} (${(result.score * 100).toFixed(2)}%)`)
})
}
// Image classification
async function runImageClassification() {
const classifier = await pipeline('image-classification', 'Xenova/vit-base-patch16-224')
const result = await classifier('https://example.com/image.jpg')
console.log('Image classification result:', result)
}
// Text generation with a small LLM
async function runTextGeneration() {
const generator = await pipeline('text-generation', 'Xenova/gpt2')
const output = await generator('The future of AI is', {
max_new_tokens: 100,
temperature: 0.7,
})
console.log('Generated text:', output[0].generated_text)
}
runTextClassification()
WebGPU-Accelerated Inference
import * as ort from 'onnxruntime-web'
async function runWithWebGPU() {
if (!navigator.gpu) {
console.log('WebGPU is not supported in this browser.')
return
}
const adapter = await navigator.gpu.requestAdapter()
const device = await adapter.requestDevice()
console.log('WebGPU adapter:', adapter.info)
ort.env.wasm.wasmPaths = '/'
const session = await ort.InferenceSession.create('/models/resnet50.onnx', {
executionProviders: ['webgpu'],
graphOptimizationLevel: 'all',
})
const batchSize = 4
const inputData = new Float32Array(batchSize * 3 * 224 * 224)
const inputTensor = new ort.Tensor('float32', inputData, [batchSize, 3, 224, 224])
const startTime = performance.now()
const output = await session.run({ input: inputTensor })
const elapsed = performance.now() - startTime
console.log(`WebGPU inference time: ${elapsed.toFixed(2)}ms`)
console.log(`Throughput: ${((batchSize / elapsed) * 1000).toFixed(1)} images/sec`)
}
runWithWebGPU()
11. AI Model Optimization Pipeline
End-to-End Flow: Train → Optimize → Deploy
Training (PyTorch/TF)
|
Pruning (remove unnecessary weights)
|
Knowledge Distillation (Teacher-Student)
|
Quantization-Aware Training (QAT)
|
Format Conversion (ONNX/TFLite/GGUF)
|
Runtime Optimization (TensorRT/OpenVINO)
|
Deployment (Mobile/Edge/Web)
Pruning
import torch
import torch.nn.utils.prune as prune
import torchvision.models as models
model = models.resnet50(pretrained=True)
# Unstructured pruning: remove 30% of weights in Conv2d layers
for name, module in model.named_modules():
if isinstance(module, torch.nn.Conv2d):
prune.l1_unstructured(module, name='weight', amount=0.3)
prune.remove(module, 'weight') # Make mask permanent
original_params = sum(p.numel() for p in models.resnet50().parameters())
pruned_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Original parameters: {original_params:,}")
print(f"After pruning: {pruned_params:,}")
print(f"Reduction: {(1 - pruned_params/original_params)*100:.1f}%")
Knowledge Distillation
import torch
import torch.nn as nn
import torch.nn.functional as F
class DistillationLoss(nn.Module):
def __init__(self, temperature=4.0, alpha=0.7):
super().__init__()
self.T = temperature
self.alpha = alpha
def forward(self, student_logits, teacher_logits, labels):
# Soft target loss (distillation)
soft_loss = F.kl_div(
F.log_softmax(student_logits / self.T, dim=1),
F.softmax(teacher_logits / self.T, dim=1),
reduction='batchmean'
) * (self.T ** 2)
# Hard target loss (cross-entropy)
hard_loss = F.cross_entropy(student_logits, labels)
return self.alpha * soft_loss + (1 - self.alpha) * hard_loss
def train_student(teacher, student, dataloader, epochs=10):
teacher.eval()
student.train()
optimizer = torch.optim.AdamW(student.parameters(), lr=1e-4)
criterion = DistillationLoss(temperature=4.0, alpha=0.7)
for epoch in range(epochs):
total_loss = 0
for images, labels in dataloader:
with torch.no_grad():
teacher_logits = teacher(images)
student_logits = student(images)
loss = criterion(student_logits, teacher_logits, labels)
optimizer.zero_grad()
loss.backward()
optimizer.step()
total_loss += loss.item()
print(f"Epoch {epoch+1}/{epochs}, Loss: {total_loss/len(dataloader):.4f}")
# Teacher: ResNet50, Student: MobileNetV2
teacher = models.resnet50(pretrained=True)
student = models.mobilenet_v2(pretrained=False)
Quantization-Aware Training (QAT)
import torch
from torch.quantization import get_default_qat_qconfig, prepare_qat, convert
model = models.mobilenet_v2(pretrained=True)
model.train()
# Set QAT config
model.qconfig = get_default_qat_qconfig('qnnpack') # ARM/mobile
# model.qconfig = get_default_qat_qconfig('fbgemm') # x86
# Prepare for QAT (insert fake quantization nodes)
model = prepare_qat(model, inplace=False)
# Fine-tune with QAT for a few epochs
optimizer = torch.optim.SGD(model.parameters(), lr=0.0001)
model.train()
for epoch in range(5):
for images, labels in dataloader:
outputs = model(images)
loss = nn.CrossEntropyLoss()(outputs, labels)
optimizer.zero_grad()
loss.backward()
optimizer.step()
print(f"QAT Epoch {epoch+1}/5 complete")
# Convert to INT8 model
model.eval()
quantized_model = convert(model.eval(), inplace=False)
torch.save(quantized_model.state_dict(), 'mobilenetv2_int8.pth')
print("QAT complete! 4x smaller model with minimal accuracy loss")
Comprehensive Benchmark Tool
import time
import numpy as np
import psutil
import os
class EdgeAIBenchmark:
def __init__(self, model_path, framework='tflite'):
self.model_path = model_path
self.framework = framework
self.results = {}
def measure_latency(self, input_data, num_runs=100, warmup=10):
"""Measure average inference latency."""
import tensorflow as tf
interpreter = tf.lite.Interpreter(model_path=self.model_path)
interpreter.allocate_tensors()
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()
# Warmup
for _ in range(warmup):
interpreter.set_tensor(input_details[0]['index'], input_data)
interpreter.invoke()
# Measurement
latencies = []
for _ in range(num_runs):
start = time.perf_counter()
interpreter.set_tensor(input_details[0]['index'], input_data)
interpreter.invoke()
_ = interpreter.get_tensor(output_details[0]['index'])
latencies.append((time.perf_counter() - start) * 1000)
self.results['latency_mean_ms'] = np.mean(latencies)
self.results['latency_p99_ms'] = np.percentile(latencies, 99)
self.results['throughput_fps'] = 1000 / np.mean(latencies)
return self.results
def measure_memory(self):
"""Measure memory consumption."""
process = psutil.Process(os.getpid())
before = process.memory_info().rss / 1024 / 1024
import tensorflow as tf
interpreter = tf.lite.Interpreter(model_path=self.model_path)
interpreter.allocate_tensors()
after = process.memory_info().rss / 1024 / 1024
self.results['memory_mb'] = after - before
return self.results
def measure_model_size(self):
"""Measure model file size."""
size_bytes = os.path.getsize(self.model_path)
self.results['model_size_mb'] = size_bytes / 1024 / 1024
return self.results
def run_full_benchmark(self, input_data):
self.measure_model_size()
self.measure_memory()
self.measure_latency(input_data)
print(f"\n=== {self.model_path} Benchmark ===")
print(f"Model size: {self.results.get('model_size_mb', 0):.2f} MB")
print(f"Memory usage: {self.results.get('memory_mb', 0):.2f} MB")
print(f"Mean latency: {self.results.get('latency_mean_ms', 0):.2f} ms")
print(f"P99 latency: {self.results.get('latency_p99_ms', 0):.2f} ms")
print(f"Throughput: {self.results.get('throughput_fps', 0):.1f} FPS")
return self.results
# Usage
input_data = np.random.rand(1, 224, 224, 3).astype(np.float32)
bench = EdgeAIBenchmark('mobilenetv2.tflite')
results = bench.run_full_benchmark(input_data)
bench_q = EdgeAIBenchmark('mobilenetv2_int8.tflite')
results_q = bench_q.run_full_benchmark(input_data)
print("\n=== Quantization Impact ===")
size_reduction = (1 - results_q['model_size_mb'] / results['model_size_mb']) * 100
speed_improvement = results['latency_mean_ms'] / results_q['latency_mean_ms']
print(f"Size reduction: {size_reduction:.1f}%")
print(f"Speed improvement: {speed_improvement:.1f}x")
Summary
Edge AI is no longer a research curiosity — it is deployed in real products at scale. Here is a quick reference for the frameworks covered in this guide:
- TFLite: Widest adoption in mobile apps. Supports both Android and iOS natively
- ONNX Runtime: Framework-agnostic. Ideal for cross-platform deployment
- Core ML: Maximizes Apple Neural Engine on Apple devices
- TensorRT: Maximizes NVIDIA GPU acceleration on Jetson and servers
- llama.cpp: Runs LLMs on CPU. Especially powerful on Apple Silicon
- Whisper.cpp: The de facto standard for offline speech recognition
- MediaPipe: Fast prototyping of vision ML solutions
- Transformers.js: Run HuggingFace models directly in the browser
Model selection and optimization strategy depend on target hardware, accuracy requirements, and latency goals. INT8 quantization is the first optimization step strongly recommended for virtually every edge AI project — it dramatically reduces size and improves speed with minimal accuracy loss.