- Authors

- Name
- Youngju Kim
- @fjvbn20031
목차
- 엣지 AI 개요
- TensorFlow Lite (TFLite)
- ONNX와 ONNX Runtime
- Core ML (Apple)
- NVIDIA Jetson 플랫폼
- Raspberry Pi AI
- MediaPipe
- llama.cpp와 GGUF
- Whisper.cpp
- 웹 브라우저 AI
- AI 모델 최적화 파이프라인
1. 엣지 AI 개요
클라우드 AI vs 엣지 AI
AI 추론을 어디서 실행하느냐는 애플리케이션의 성격과 요구사항에 따라 크게 달라집니다. 전통적인 클라우드 AI 방식은 데이터를 원격 서버로 전송하고, 서버에서 추론한 뒤 결과를 다시 받아오는 구조입니다. 반면 엣지 AI는 데이터가 생성되는 기기(스마트폰, IoT 센서, 카메라 등)에서 직접 추론을 수행합니다.
| 구분 | 클라우드 AI | 엣지 AI |
|---|---|---|
| 연산 위치 | 원격 서버 | 로컬 디바이스 |
| 지연 시간 | 수백 ms ~ 수 초 | 수 ms ~ 수십 ms |
| 프라이버시 | 데이터 외부 전송 | 데이터 로컬 처리 |
| 인터넷 의존성 | 필수 | 불필요 |
| 비용 | API 호출 비용 발생 | 초기 모델 비용만 |
| 모델 크기 제한 | 없음 | 메모리/스토리지 제한 |
엣지 AI의 장점
1. 프라이버시 보호
의료 이미지, 생체 인식 데이터, 개인 음성 등 민감한 정보가 기기를 벗어나지 않습니다. GDPR, HIPAA 등 데이터 규제 준수가 자연스럽게 이루어집니다.
2. 초저지연 응답
자율주행, 산업 자동화, 실시간 번역, AR/VR 등 밀리초 단위 응답이 필요한 애플리케이션에서 결정적 이점을 제공합니다. 네트워크 왕복 지연이 없으므로 일관된 응답 속도를 보장합니다.
3. 비용 절감
클라우드 API 호출 비용이 없으며, 대규모 배포 시 서버 비용이 크게 줄어듭니다. 수백만 기기가 로컬에서 추론하면 중앙 서버 비용이 사실상 0에 가깝습니다.
4. 오프라인 동작
인터넷 연결이 불안정하거나 없는 환경(농촌, 지하, 항공기 등)에서도 AI 기능이 작동합니다.
5. 실시간 데이터 처리
IoT 센서 데이터를 클라우드로 올리기 전에 로컬에서 필터링, 이상 탐지, 분류를 수행하면 전송 데이터량과 스토리지 비용을 크게 줄일 수 있습니다.
엣지 하드웨어
모바일 GPU/NPU
현대 스마트폰에는 AI 전용 하드웨어가 탑재되어 있습니다.
- Apple Neural Engine (ANE): iPhone 8 이후 탑재, M-시리즈 맥에도 포함. A17 Pro는 35 TOPS 성능
- Qualcomm Hexagon DSP: Android 플래그십 폰. Snapdragon 8 Gen 3는 Hexagon NPU 탑재
- Google Tensor: Pixel 폰 전용 칩. 온디바이스 음성 인식, 번역 최적화
- MediaTek APU: 미들레인지 Android 폰 광범위 지원
엣지 컴퓨팅 보드
- NVIDIA Jetson: 자율주행, 로봇공학, 스마트 카메라용. Jetson Orin은 275 TOPS
- Raspberry Pi 5: 4GB/8GB 메모리, 일반 컴퓨터 비전 태스크에 적합
- Google Coral: Edge TPU 탑재, TFLite 전용 가속
- Intel Neural Compute Stick: USB 형태의 추론 가속기
엣지 AI 응용 분야
- 스마트폰: 얼굴 잠금 해제, 사진 분류, 실시간 번역, 음성 어시스턴트
- 스마트홈: 음성 명령 처리, 동작 감지, 에너지 최적화
- 산업 IoT: 불량품 검출, 예측 유지보수, 이상 탐지
- 의료기기: 심전도 분석, 혈당 예측, 피부 질환 진단
- 자율주행: 실시간 객체 탐지, 차선 인식, 장애물 회피
- 농업: 드론 기반 작물 모니터링, 병해충 탐지
2. TensorFlow Lite (TFLite)
TensorFlow Lite는 Google이 개발한 모바일 및 엣지 기기용 경량 ML 프레임워크입니다. TensorFlow 모델을 TFLite 형식(.tflite)으로 변환하여 Android, iOS, 임베디드 Linux, 마이크로컨트롤러에서 실행합니다.
TFLite 변환 (SavedModel → TFLite)
import tensorflow as tf
# 방법 1: SavedModel에서 변환
converter = tf.lite.TFLiteConverter.from_saved_model('saved_model_dir')
tflite_model = converter.convert()
with open('model.tflite', 'wb') as f:
f.write(tflite_model)
# 방법 2: Keras 모델에서 직접 변환
model = tf.keras.applications.MobileNetV2(weights='imagenet')
converter = tf.lite.TFLiteConverter.from_keras_model(model)
tflite_model = converter.convert()
with open('mobilenetv2.tflite', 'wb') as f:
f.write(tflite_model)
# 방법 3: concrete function에서 변환
@tf.function(input_signature=[tf.TensorSpec(shape=[1, 224, 224, 3], dtype=tf.float32)])
def predict(x):
return model(x)
converter = tf.lite.TFLiteConverter.from_concrete_functions(
[predict.get_concrete_function()]
)
tflite_model = converter.convert()
양자화 (Quantization)
양자화는 모델의 가중치와 활성화를 부동소수점에서 저정밀도 정수로 변환하여 모델 크기를 줄이고 추론 속도를 높입니다.
Float16 양자화
converter = tf.lite.TFLiteConverter.from_saved_model('saved_model_dir')
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.target_spec.supported_types = [tf.float16]
tflite_model = converter.convert()
# 모델 크기 약 50% 감소, 정확도 거의 유지
INT8 전체 정수 양자화
import numpy as np
def representative_dataset():
# 실제 데이터 100~1000개 샘플 사용
for _ in range(100):
data = np.random.rand(1, 224, 224, 3).astype(np.float32)
yield [data]
converter = tf.lite.TFLiteConverter.from_saved_model('saved_model_dir')
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.representative_dataset = representative_dataset
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
converter.inference_input_type = tf.int8
converter.inference_output_type = tf.int8
tflite_model = converter.convert()
# 모델 크기 약 75% 감소, 추론 속도 2-4배 향상
동적 범위 양자화
converter = tf.lite.TFLiteConverter.from_saved_model('saved_model_dir')
converter.optimizations = [tf.lite.Optimize.DEFAULT]
# representative_dataset 없이도 가능 - 가중치만 양자화
tflite_model = converter.convert()
TFLite Interpreter
import tensorflow as tf
import numpy as np
from PIL import Image
# 인터프리터 초기화
interpreter = tf.lite.Interpreter(model_path='mobilenetv2.tflite')
interpreter.allocate_tensors()
# 입출력 텐서 정보 확인
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()
print(f"입력 shape: {input_details[0]['shape']}")
print(f"입력 dtype: {input_details[0]['dtype']}")
print(f"출력 shape: {output_details[0]['shape']}")
# 이미지 전처리
img = Image.open('test_image.jpg').resize((224, 224))
input_data = np.expand_dims(np.array(img, dtype=np.float32) / 255.0, axis=0)
# 추론 실행
interpreter.set_tensor(input_details[0]['index'], input_data)
interpreter.invoke()
# 결과 추출
output_data = interpreter.get_tensor(output_details[0]['index'])
predicted_class = np.argmax(output_data[0])
confidence = output_data[0][predicted_class]
print(f"예측 클래스: {predicted_class}, 신뢰도: {confidence:.4f}")
멀티스레드 및 GPU 델리게이트
# 멀티스레드 설정
interpreter = tf.lite.Interpreter(
model_path='model.tflite',
num_threads=4
)
# GPU 델리게이트 (Android/iOS)
try:
from tensorflow.lite.python.interpreter import load_delegate
gpu_delegate = load_delegate('libdelegate.so')
interpreter = tf.lite.Interpreter(
model_path='model.tflite',
experimental_delegates=[gpu_delegate]
)
print("GPU 델리게이트 활성화")
except Exception as e:
print(f"GPU 델리게이트 실패, CPU 사용: {e}")
interpreter = tf.lite.Interpreter(model_path='model.tflite')
interpreter.allocate_tensors()
Android 배포
build.gradle 설정:
dependencies {
implementation 'org.tensorflow:tensorflow-lite:2.13.0'
implementation 'org.tensorflow:tensorflow-lite-gpu:2.13.0'
implementation 'org.tensorflow:tensorflow-lite-support:0.4.4'
}
Kotlin 코드:
import org.tensorflow.lite.Interpreter
import org.tensorflow.lite.support.image.TensorImage
import org.tensorflow.lite.support.tensorbuffer.TensorBuffer
import java.nio.ByteBuffer
import java.nio.ByteOrder
class TFLiteClassifier(private val context: Context) {
private lateinit var interpreter: Interpreter
private val inputSize = 224
private val numClasses = 1000
fun initialize() {
val model = loadModelFile("mobilenetv2.tflite")
val options = Interpreter.Options().apply {
numThreads = 4
useNNAPI = true // Android Neural Networks API
}
interpreter = Interpreter(model, options)
}
private fun loadModelFile(filename: String): ByteBuffer {
val assetFileDescriptor = context.assets.openFd(filename)
val fileInputStream = FileInputStream(assetFileDescriptor.fileDescriptor)
val fileChannel = fileInputStream.channel
val startOffset = assetFileDescriptor.startOffset
val declaredLength = assetFileDescriptor.declaredLength
return fileChannel.map(FileChannel.MapMode.READ_ONLY, startOffset, declaredLength)
}
fun classify(bitmap: Bitmap): FloatArray {
val resized = Bitmap.createScaledBitmap(bitmap, inputSize, inputSize, true)
val inputBuffer = ByteBuffer.allocateDirect(1 * inputSize * inputSize * 3 * 4)
inputBuffer.order(ByteOrder.nativeOrder())
for (y in 0 until inputSize) {
for (x in 0 until inputSize) {
val pixel = resized.getPixel(x, y)
inputBuffer.putFloat(((pixel shr 16 and 0xFF) / 255.0f))
inputBuffer.putFloat(((pixel shr 8 and 0xFF) / 255.0f))
inputBuffer.putFloat(((pixel and 0xFF) / 255.0f))
}
}
val outputBuffer = Array(1) { FloatArray(numClasses) }
interpreter.run(inputBuffer, outputBuffer)
return outputBuffer[0]
}
}
iOS 배포
Swift 코드:
import TensorFlowLite
import UIKit
class TFLiteImageClassifier {
private var interpreter: Interpreter
private let inputWidth = 224
private let inputHeight = 224
init(modelName: String) throws {
guard let modelPath = Bundle.main.path(forResource: modelName, ofType: "tflite") else {
throw NSError(domain: "ModelNotFound", code: 0, userInfo: nil)
}
var options = Interpreter.Options()
options.threadCount = 4
interpreter = try Interpreter(modelPath: modelPath, options: options)
try interpreter.allocateTensors()
}
func classify(image: UIImage) throws -> [Float] {
guard let cgImage = image.cgImage else { return [] }
let inputTensor = try interpreter.input(at: 0)
let batchSize = 1
let inputChannels = 3
let inputData = preprocessImage(cgImage: cgImage)
try interpreter.copy(inputData, toInputAt: 0)
try interpreter.invoke()
let outputTensor = try interpreter.output(at: 0)
let results: [Float] = [Float](unsafeData: outputTensor.data) ?? []
return results
}
private func preprocessImage(cgImage: CGImage) -> Data {
var data = Data(count: inputWidth * inputHeight * 3 * 4)
// 이미지 리사이즈 및 정규화 처리
return data
}
}
3. ONNX와 ONNX Runtime
ONNX(Open Neural Network Exchange)는 ML 모델을 다양한 프레임워크 간에 이식 가능하게 하는 개방형 포맷입니다. PyTorch, TensorFlow, scikit-learn 등에서 학습한 모델을 하나의 표준 형식으로 내보내고, ONNX Runtime으로 어디서든 실행할 수 있습니다.
PyTorch에서 ONNX로 변환
import torch
import torch.nn as nn
import torchvision.models as models
# 모델 로드
model = models.resnet50(pretrained=True)
model.eval()
# 더미 입력 생성
dummy_input = torch.randn(1, 3, 224, 224)
# ONNX 내보내기
torch.onnx.export(
model,
dummy_input,
"resnet50.onnx",
export_params=True,
opset_version=17,
do_constant_folding=True,
input_names=['input'],
output_names=['output'],
dynamic_axes={
'input': {0: 'batch_size'},
'output': {0: 'batch_size'}
}
)
print("ONNX 변환 완료!")
# ONNX 모델 검증
import onnx
onnx_model = onnx.load("resnet50.onnx")
onnx.checker.check_model(onnx_model)
print(f"ONNX IR 버전: {onnx_model.ir_version}")
print(f"Opset 버전: {onnx_model.opset_import[0].version}")
ONNX Runtime 추론
import onnxruntime as ort
import numpy as np
from PIL import Image
import torchvision.transforms as transforms
# 세션 생성 및 공급자 설정
providers = ['CUDAExecutionProvider', 'CPUExecutionProvider']
session = ort.InferenceSession("resnet50.onnx", providers=providers)
# 실제 사용된 공급자 확인
print(f"사용 공급자: {session.get_providers()}")
# 입출력 정보 확인
input_name = session.get_inputs()[0].name
output_name = session.get_outputs()[0].name
input_shape = session.get_inputs()[0].shape
print(f"입력 이름: {input_name}, shape: {input_shape}")
# 이미지 전처리
transform = transforms.Compose([
transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
img = Image.open('test.jpg')
input_tensor = transform(img).unsqueeze(0).numpy()
# 추론
outputs = session.run([output_name], {input_name: input_tensor})
logits = outputs[0]
predicted_class = np.argmax(logits[0])
print(f"예측 클래스: {predicted_class}")
ONNX Runtime 최적화
from onnxruntime.transformers import optimizer
from onnxruntime.quantization import quantize_dynamic, QuantType
# 그래프 최적화 (트랜스포머 모델)
optimized_model = optimizer.optimize_model(
'bert_base.onnx',
model_type='bert',
num_heads=12,
hidden_size=768,
optimization_options=None
)
optimized_model.save_model_to_file('bert_optimized.onnx')
# 동적 양자화 (INT8)
quantize_dynamic(
model_input='bert_optimized.onnx',
model_output='bert_quantized_int8.onnx',
weight_type=QuantType.QInt8,
per_channel=True
)
print("양자화 완료!")
# 세션 옵션 튜닝
so = ort.SessionOptions()
so.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
so.intra_op_num_threads = 4
so.inter_op_num_threads = 2
so.execution_mode = ort.ExecutionMode.ORT_SEQUENTIAL
session = ort.InferenceSession('model.onnx', sess_options=so)
ONNX Runtime Web (브라우저)
// npm install onnxruntime-web
import * as ort from 'onnxruntime-web'
async function runInference() {
// WebAssembly 백엔드 설정
ort.env.wasm.wasmPaths = '/static/'
ort.env.wasm.numThreads = 4
// 세션 생성
const session = await ort.InferenceSession.create('/models/mobilenet.onnx', {
executionProviders: ['webgpu', 'wasm'],
graphOptimizationLevel: 'all',
})
// 입력 텐서 생성 (1, 3, 224, 224)
const inputData = new Float32Array(1 * 3 * 224 * 224).fill(0.5)
const inputTensor = new ort.Tensor('float32', inputData, [1, 3, 224, 224])
// 추론 실행
const feeds = { input: inputTensor }
const results = await session.run(feeds)
// 결과 처리
const outputData = results.output.data
const maxIndex = Array.from(outputData).indexOf(Math.max(...outputData))
console.log('예측 클래스:', maxIndex)
}
runInference()
4. Core ML (Apple)
Core ML은 Apple 플랫폼(iOS, macOS, watchOS, tvOS)에서 ML 모델을 실행하기 위한 프레임워크입니다. Neural Engine을 활용해 전력 효율적이고 빠른 추론을 제공합니다.
coremltools를 사용한 변환
import coremltools as ct
import torch
import torchvision.models as models
# PyTorch 모델 변환
torch_model = models.mobilenet_v2(pretrained=True)
torch_model.eval()
example_input = torch.rand(1, 3, 224, 224)
traced_model = torch.jit.trace(torch_model, example_input)
# Core ML 변환
mlmodel = ct.convert(
traced_model,
inputs=[ct.TensorType(name='input', shape=(1, 3, 224, 224))],
compute_units=ct.ComputeUnit.ALL, # CPU + GPU + Neural Engine 모두 활용
minimum_deployment_target=ct.target.iOS16
)
# 메타데이터 추가
mlmodel.short_description = "MobileNetV2 이미지 분류기"
mlmodel.author = "YJ Blog"
mlmodel.version = "1.0"
mlmodel.save("MobileNetV2.mlpackage")
print("Core ML 변환 완료!")
Float16 및 INT8 양자화
import coremltools as ct
from coremltools.optimize.coreml import (
OpLinearQuantizerConfig,
OptimizationConfig,
linearly_quantize_weights
)
# 원본 모델 로드
mlmodel = ct.models.MLModel("MobileNetV2.mlpackage")
# 선형 가중치 양자화 (8비트)
op_config = OpLinearQuantizerConfig(mode="linear_symmetric", dtype="int8")
config = OptimizationConfig(global_config=op_config)
compressed_model = linearly_quantize_weights(mlmodel, config)
compressed_model.save("MobileNetV2_int8.mlpackage")
# 팔렛트 양자화 (4비트)
from coremltools.optimize.coreml import palettize_weights, OpPalettizerConfig
palette_config = OptimizationConfig(
global_config=OpPalettizerConfig(mode="kmeans", nbits=4)
)
palette_model = palettize_weights(mlmodel, palette_config)
palette_model.save("MobileNetV2_4bit.mlpackage")
Swift에서 Core ML 사용
import CoreML
import Vision
import UIKit
class CoreMLClassifier {
private var model: VNCoreMLModel?
func loadModel() {
guard let modelURL = Bundle.main.url(forResource: "MobileNetV2", withExtension: "mlpackage") else {
print("모델 파일을 찾을 수 없습니다")
return
}
let config = MLModelConfiguration()
config.computeUnits = .all // CPU + GPU + Neural Engine
do {
let coreMLModel = try MLModel(contentsOf: modelURL, configuration: config)
model = try VNCoreMLModel(for: coreMLModel)
print("모델 로드 성공")
} catch {
print("모델 로드 실패: \(error)")
}
}
func classify(image: UIImage, completion: @escaping ([VNClassificationObservation]?) -> Void) {
guard let model = model,
let cgImage = image.cgImage else {
completion(nil)
return
}
let request = VNCoreMLRequest(model: model) { request, error in
guard let results = request.results as? [VNClassificationObservation] else {
completion(nil)
return
}
let topResults = results.prefix(5)
completion(Array(topResults))
}
request.imageCropAndScaleOption = .centerCrop
let handler = VNImageRequestHandler(cgImage: cgImage, options: [:])
DispatchQueue.global(qos: .userInteractive).async {
do {
try handler.perform([request])
} catch {
print("추론 실패: \(error)")
completion(nil)
}
}
}
}
// 사용 예시
let classifier = CoreMLClassifier()
classifier.loadModel()
classifier.classify(image: UIImage(named: "test")!) { results in
results?.forEach { result in
print("\(result.identifier): \(String(format: "%.2f", result.confidence * 100))%")
}
}
Create ML로 커스텀 모델 학습
import CreateML
import Foundation
// 이미지 분류 모델 학습
let trainingData = MLImageClassifier.DataSource.labeledDirectories(
at: URL(fileURLWithPath: "/path/to/training_data")
)
let parameters = MLImageClassifier.ModelParameters(
featureExtractor: .scenePrint(revision: 2),
maxIterations: 25,
augmentation: [.flip, .crop, .rotation]
)
let classifier = try MLImageClassifier(
trainingData: trainingData,
parameters: parameters
)
// 평가
let evaluationData = MLImageClassifier.DataSource.labeledDirectories(
at: URL(fileURLWithPath: "/path/to/test_data")
)
let metrics = classifier.evaluation(on: evaluationData)
print("정확도: \(metrics.classificationError)")
// 저장
try classifier.write(to: URL(fileURLWithPath: "MyClassifier.mlmodel"))
5. NVIDIA Jetson 플랫폼
NVIDIA Jetson은 AI 엣지 컴퓨팅을 위한 임베디드 플랫폼으로, 로봇공학, 자율주행, 스마트 카메라 등에 광범위하게 사용됩니다.
Jetson 모델 비교
| 모델 | AI 성능 | RAM | 전력 | 주요 용도 |
|---|---|---|---|---|
| Jetson Nano | 472 GFLOPS | 4GB | 10W | 교육, 프로토타입 |
| Jetson Xavier NX | 21 TOPS | 8/16GB | 15W | 산업 IoT |
| Jetson AGX Orin | 275 TOPS | 64GB | 60W | 자율주행, 로봇 |
| Jetson Orin NX | 100 TOPS | 16GB | 25W | 엣지 AI |
TensorRT 변환
import tensorrt as trt
import numpy as np
import pycuda.driver as cuda
import pycuda.autoinit
# TensorRT 로거 생성
TRT_LOGGER = trt.Logger(trt.Logger.WARNING)
def build_engine_from_onnx(onnx_path, engine_path, fp16=True, int8=False):
with trt.Builder(TRT_LOGGER) as builder, \
builder.create_network(1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH)) as network, \
trt.OnnxParser(network, TRT_LOGGER) as parser:
config = builder.create_builder_config()
config.max_workspace_size = 1 << 30 # 1GB
if fp16:
config.set_flag(trt.BuilderFlag.FP16)
if int8:
config.set_flag(trt.BuilderFlag.INT8)
# ONNX 파싱
with open(onnx_path, 'rb') as f:
if not parser.parse(f.read()):
for error in range(parser.num_errors):
print(parser.get_error(error))
return None
# 엔진 빌드
print("TensorRT 엔진 빌드 중... (수 분 소요)")
serialized_engine = builder.build_serialized_network(network, config)
# 저장
with open(engine_path, 'wb') as f:
f.write(serialized_engine)
print(f"엔진 저장 완료: {engine_path}")
build_engine_from_onnx('resnet50.onnx', 'resnet50_fp16.trt', fp16=True)
TensorRT 추론
import tensorrt as trt
import pycuda.driver as cuda
import pycuda.autoinit
import numpy as np
class TRTInference:
def __init__(self, engine_path):
TRT_LOGGER = trt.Logger(trt.Logger.WARNING)
with open(engine_path, 'rb') as f:
runtime = trt.Runtime(TRT_LOGGER)
self.engine = runtime.deserialize_cuda_engine(f.read())
self.context = self.engine.create_execution_context()
self.inputs, self.outputs, self.bindings, self.stream = self._allocate_buffers()
def _allocate_buffers(self):
inputs, outputs, bindings = [], [], []
stream = cuda.Stream()
for binding in self.engine:
size = trt.volume(self.engine.get_binding_shape(binding))
dtype = trt.nptype(self.engine.get_binding_dtype(binding))
host_mem = cuda.pagelocked_empty(size, dtype)
device_mem = cuda.mem_alloc(host_mem.nbytes)
bindings.append(int(device_mem))
if self.engine.binding_is_input(binding):
inputs.append({'host': host_mem, 'device': device_mem})
else:
outputs.append({'host': host_mem, 'device': device_mem})
return inputs, outputs, bindings, stream
def infer(self, input_data):
np.copyto(self.inputs[0]['host'], input_data.ravel())
cuda.memcpy_htod_async(self.inputs[0]['device'], self.inputs[0]['host'], self.stream)
self.context.execute_async_v2(bindings=self.bindings, stream_handle=self.stream.handle)
cuda.memcpy_dtoh_async(self.outputs[0]['host'], self.outputs[0]['device'], self.stream)
self.stream.synchronize()
return self.outputs[0]['host']
# 사용
trt_model = TRTInference('resnet50_fp16.trt')
input_array = np.random.rand(1, 3, 224, 224).astype(np.float32)
result = trt_model.infer(input_array)
print(f"예측 클래스: {np.argmax(result)}")
DeepStream SDK
# DeepStream Python 바인딩을 사용한 비디오 파이프라인
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst, GLib
Gst.init(None)
def create_pipeline():
pipeline = Gst.Pipeline()
# 소스: USB 카메라
source = Gst.ElementFactory.make("v4l2src", "usb-cam-source")
source.set_property("device", "/dev/video0")
# 캡스필터
caps = Gst.ElementFactory.make("capsfilter", "capsfilter")
caps.set_property("caps", Gst.Caps.from_string("video/x-raw,width=1280,height=720,framerate=30/1"))
# NVARGUS (카메라 ISP)
nvconv = Gst.ElementFactory.make("nvvideoconvert", "convertor")
# nvinfer (TensorRT 추론)
nvinfer = Gst.ElementFactory.make("nvinfer", "primary-inference")
nvinfer.set_property("config-file-path", "config_infer_primary.txt")
# 트래커
tracker = Gst.ElementFactory.make("nvtracker", "tracker")
tracker.set_property("ll-lib-file", "/opt/nvidia/deepstream/deepstream/lib/libnvds_nvmultiobjecttracker.so")
# OSD (On-Screen Display)
osd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")
# 출력
sink = Gst.ElementFactory.make("nveglglessink", "nvvideo-renderer")
for element in [source, caps, nvconv, nvinfer, tracker, osd, sink]:
pipeline.add(element)
source.link(caps)
caps.link(nvconv)
nvconv.link(nvinfer)
nvinfer.link(tracker)
tracker.link(osd)
osd.link(sink)
return pipeline
pipeline = create_pipeline()
pipeline.set_state(Gst.State.PLAYING)
6. Raspberry Pi AI
라즈베리파이는 교육용에서 시작해 이제 실제 엣지 AI 배포 플랫폼으로 널리 사용됩니다.
Raspberry Pi 5 + Hailo-8
Hailo-8은 라즈베리파이 HAT 형태의 AI 가속기로 26 TOPS 성능을 제공합니다.
# Hailo SDK 설치
pip install hailort
# 모델 변환 (ONNX -> HEF)
# Hailo Model Zoo에서 사전 변환된 모델 제공
wget https://hailo-model-zoo.s3.eu-west-2.amazonaws.com/ModelZoo/Compiled/v2.11.0/hailo8/resnet_v1_50.hef
import hailo_platform as hp
import numpy as np
# HEF 로드 및 추론
with hp.VDevice() as vdevice:
hef = hp.Hef("resnet_v1_50.hef")
network_groups = vdevice.configure(hef)
network_group = network_groups[0]
input_vstreams_params = hp.InputVStreamParams.make_from_network_group(
network_group, quantized=False, format_type=hp.FormatType.FLOAT32
)
output_vstreams_params = hp.OutputVStreamParams.make_from_network_group(
network_group, quantized=False, format_type=hp.FormatType.FLOAT32
)
with hp.InferVStreams(network_group, input_vstreams_params, output_vstreams_params) as infer_pipeline:
input_data = {"input_layer1": np.random.rand(1, 224, 224, 3).astype(np.float32)}
with network_group.activate():
infer_results = infer_pipeline.infer(input_data)
print(f"결과: {np.argmax(infer_results['resnet_v1_50/softmax1'])}")
OpenCV + 라즈베리파이 카메라
import cv2
import numpy as np
import tflite_runtime.interpreter as tflite
# TFLite Interpreter (라즈베리파이에서 경량 버전 사용)
interpreter = tflite.Interpreter(
model_path='ssd_mobilenet_v2.tflite',
num_threads=4
)
interpreter.allocate_tensors()
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()
# 카메라 초기화 (라즈베리파이 카메라 v2)
cap = cv2.VideoCapture(0)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
while True:
ret, frame = cap.read()
if not ret:
break
# 전처리
input_size = (input_details[0]['shape'][2], input_details[0]['shape'][1])
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
resized = cv2.resize(rgb_frame, input_size)
input_data = np.expand_dims(resized, axis=0).astype(np.uint8)
# 추론
interpreter.set_tensor(input_details[0]['index'], input_data)
interpreter.invoke()
# 결과 추출 (SSD MobileNet)
boxes = interpreter.get_tensor(output_details[0]['index'])[0]
classes = interpreter.get_tensor(output_details[1]['index'])[0]
scores = interpreter.get_tensor(output_details[2]['index'])[0]
# 결과 시각화
h, w = frame.shape[:2]
for i in range(len(scores)):
if scores[i] > 0.5:
ymin, xmin, ymax, xmax = boxes[i]
cv2.rectangle(frame,
(int(xmin * w), int(ymin * h)),
(int(xmax * w), int(ymax * h)),
(0, 255, 0), 2)
label = f"클래스 {int(classes[i])}: {scores[i]:.2f}"
cv2.putText(frame, label, (int(xmin * w), int(ymin * h) - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
cv2.imshow('라즈베리파이 AI', frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cap.release()
cv2.destroyAllWindows()
7. MediaPipe
Google MediaPipe는 얼굴 감지, 손 추적, 자세 추정, 객체 탐지 등 다양한 비전 ML 솔루션을 제공하는 프레임워크입니다.
Python으로 손 추적
import mediapipe as mp
import cv2
import numpy as np
mp_hands = mp.solutions.hands
mp_drawing = mp.solutions.drawing_utils
mp_drawing_styles = mp.solutions.drawing_styles
def run_hand_tracking():
cap = cv2.VideoCapture(0)
with mp_hands.Hands(
static_image_mode=False,
max_num_hands=2,
min_detection_confidence=0.7,
min_tracking_confidence=0.5
) as hands:
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
# BGR -> RGB 변환
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
rgb_frame.flags.writeable = False
results = hands.process(rgb_frame)
rgb_frame.flags.writeable = True
frame = cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR)
if results.multi_hand_landmarks:
for hand_landmarks in results.multi_hand_landmarks:
mp_drawing.draw_landmarks(
frame,
hand_landmarks,
mp_hands.HAND_CONNECTIONS,
mp_drawing_styles.get_default_hand_landmarks_style(),
mp_drawing_styles.get_default_hand_connections_style()
)
# 랜드마크 좌표 추출 (21개 키포인트)
for idx, landmark in enumerate(hand_landmarks.landmark):
h, w, _ = frame.shape
cx, cy = int(landmark.x * w), int(landmark.y * h)
if idx == 8: # 검지 끝
cv2.circle(frame, (cx, cy), 10, (255, 0, 0), -1)
cv2.imshow('손 추적', frame)
if cv2.waitKey(5) & 0xFF == ord('q'):
break
cap.release()
cv2.destroyAllWindows()
run_hand_tracking()
자세 추정 (Pose Estimation)
import mediapipe as mp
import cv2
mp_pose = mp.solutions.pose
def calculate_angle(a, b, c):
"""세 점으로 각도 계산"""
import numpy as np
a = np.array(a)
b = np.array(b)
c = np.array(c)
radians = np.arctan2(c[1] - b[1], c[0] - b[0]) - \
np.arctan2(a[1] - b[1], a[0] - b[0])
angle = np.abs(radians * 180.0 / np.pi)
if angle > 180.0:
angle = 360 - angle
return angle
cap = cv2.VideoCapture(0)
with mp_pose.Pose(
min_detection_confidence=0.5,
min_tracking_confidence=0.5,
model_complexity=1 # 0: Lite, 1: Full, 2: Heavy
) as pose:
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
results = pose.process(rgb_frame)
frame = cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR)
if results.pose_landmarks:
landmarks = results.pose_landmarks.landmark
h, w, _ = frame.shape
# 팔꿈치 각도 계산
shoulder = [landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER.value].x * w,
landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER.value].y * h]
elbow = [landmarks[mp_pose.PoseLandmark.LEFT_ELBOW.value].x * w,
landmarks[mp_pose.PoseLandmark.LEFT_ELBOW.value].y * h]
wrist = [landmarks[mp_pose.PoseLandmark.LEFT_WRIST.value].x * w,
landmarks[mp_pose.PoseLandmark.LEFT_WRIST.value].y * h]
angle = calculate_angle(shoulder, elbow, wrist)
cv2.putText(frame, f"팔꿈치 각도: {angle:.1f}도",
(int(elbow[0]), int(elbow[1])),
cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
mp.solutions.drawing_utils.draw_landmarks(
frame, results.pose_landmarks, mp_pose.POSE_CONNECTIONS
)
cv2.imshow('자세 추정', frame)
if cv2.waitKey(5) & 0xFF == ord('q'):
break
cap.release()
cv2.destroyAllWindows()
MediaPipe Tasks API (최신 버전)
import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
# 객체 탐지 태스크
base_options = python.BaseOptions(model_asset_path='efficientdet_lite0.tflite')
options = vision.ObjectDetectorOptions(
base_options=base_options,
running_mode=vision.RunningMode.IMAGE,
max_results=5,
score_threshold=0.5
)
with vision.ObjectDetector.create_from_options(options) as detector:
image = mp.Image.create_from_file('test_image.jpg')
detection_result = detector.detect(image)
for detection in detection_result.detections:
category = detection.categories[0]
print(f"객체: {category.category_name}, 신뢰도: {category.score:.2f}")
bbox = detection.bounding_box
print(f" 위치: ({bbox.origin_x}, {bbox.origin_y}), 크기: {bbox.width}x{bbox.height}")
8. llama.cpp와 GGUF
llama.cpp는 Meta의 LLaMA 모델을 C++로 구현한 프레임워크로, GPU 없이 CPU만으로도 대형 언어 모델을 실행할 수 있게 해줍니다.
설치 및 기본 사용법
# 소스 빌드
git clone https://github.com/ggerganov/llama.cpp
cd llama.cpp
# CPU only
make -j4
# Apple Silicon (Metal GPU 가속)
make LLAMA_METAL=1 -j4
# NVIDIA CUDA
make LLAMA_CUDA=1 -j4
# GGUF 모델 다운로드 (예: Llama 3.2 3B)
huggingface-cli download \
bartowski/Llama-3.2-3B-Instruct-GGUF \
Llama-3.2-3B-Instruct-Q4_K_M.gguf \
--local-dir ./models
# 인터랙티브 채팅
./llama-cli \
-m models/Llama-3.2-3B-Instruct-Q4_K_M.gguf \
-n 512 \
-p "당신은 친절한 AI 어시스턴트입니다." \
--repeat-penalty 1.1 \
-t 8 \
--color
양자화 수준 (GGUF)
GGUF 파일의 양자화 수준은 모델 크기와 품질 간의 트레이드오프를 결정합니다.
| 양자화 | 비트/가중치 | 크기(7B) | 품질 | 권장 상황 |
|---|---|---|---|---|
| Q2_K | ~2.6비트 | ~2.7GB | 낮음 | 매우 제한적 메모리 |
| Q4_0 | 4.5비트 | ~3.8GB | 보통 | 기본 사용 |
| Q4_K_M | 4.8비트 | ~4.1GB | 양호 | 추천: 균형 |
| Q5_K_M | 5.7비트 | ~4.8GB | 좋음 | 품질 중시 |
| Q6_K | 6.6비트 | ~5.5GB | 매우 좋음 | 고품질 필요 |
| Q8_0 | 8.5비트 | ~7.2GB | 최고 | 메모리 여유 있을 때 |
llama-cpp-python
from llama_cpp import Llama
# 모델 로드
llm = Llama(
model_path="./models/Llama-3.2-3B-Instruct-Q4_K_M.gguf",
n_ctx=4096, # 컨텍스트 윈도우
n_threads=8, # CPU 스레드 수
n_gpu_layers=35, # GPU에 올릴 레이어 수 (-1이면 전체)
verbose=False
)
# 기본 텍스트 생성
output = llm(
"한국의 수도는 어디인가요?",
max_tokens=128,
temperature=0.7,
top_p=0.95,
top_k=40,
repeat_penalty=1.1
)
print(output['choices'][0]['text'])
# 채팅 형식
messages = [
{"role": "system", "content": "당신은 친절한 AI 어시스턴트입니다. 한국어로 답변하세요."},
{"role": "user", "content": "파이썬으로 피보나치 수열을 구하는 코드를 작성해주세요."}
]
response = llm.create_chat_completion(
messages=messages,
max_tokens=512,
temperature=0.7
)
print(response['choices'][0]['message']['content'])
# 스트리밍 출력
stream = llm.create_chat_completion(
messages=messages,
max_tokens=512,
stream=True
)
for chunk in stream:
delta = chunk['choices'][0]['delta']
if 'content' in delta:
print(delta['content'], end='', flush=True)
print()
OpenAI 호환 서버
# llama.cpp 서버 실행
./llama-server \
-m models/Llama-3.2-3B-Instruct-Q4_K_M.gguf \
--port 8080 \
--host 0.0.0.0 \
-n 2048 \
-t 8 \
--n-gpu-layers 35
# OpenAI SDK로 로컬 서버 사용
from openai import OpenAI
client = OpenAI(
base_url="http://localhost:8080/v1",
api_key="none"
)
response = client.chat.completions.create(
model="local-model",
messages=[
{"role": "user", "content": "머신러닝과 딥러닝의 차이를 설명해주세요."}
],
max_tokens=512,
temperature=0.7
)
print(response.choices[0].message.content)
PyTorch 모델 → GGUF 변환
# HuggingFace 모델 → GGUF
cd llama.cpp
# 파이썬 의존성 설치
pip install -r requirements.txt
# 변환
python convert_hf_to_gguf.py \
/path/to/hf_model \
--outfile models/my_model.gguf \
--outtype f16
# 양자화
./quantize models/my_model.gguf models/my_model_q4km.gguf Q4_K_M
9. Whisper.cpp
Whisper.cpp는 OpenAI Whisper 음성 인식 모델을 C++로 구현한 것으로, 라즈베리파이부터 스마트폰까지 다양한 환경에서 오프라인 음성 인식을 가능하게 합니다.
설치 및 기본 사용법
# 빌드
git clone https://github.com/ggerganov/whisper.cpp
cd whisper.cpp
make -j4
# Apple Silicon
make WHISPER_METAL=1 -j4
# 모델 다운로드
bash ./models/download-ggml-model.sh base.en # 영어 전용, 142MB
bash ./models/download-ggml-model.sh medium # 다국어, 1.5GB
bash ./models/download-ggml-model.sh large-v3 # 최고 품질, 3.1GB
# 음성 파일 전사
./main -m models/ggml-medium.bin \
-f audio.wav \
-l ko \
--output-txt \
-of output
# 실시간 마이크 입력
./stream -m models/ggml-medium.bin \
-t 8 \
--step 500 \
--length 5000 \
-l ko
whisper-cpp-python
import whisper_cpp
import numpy as np
import soundfile as sf
# 모델 로드
model = whisper_cpp.Whisper.from_pretrained("medium")
# WAV 파일 전사
audio, sr = sf.read("audio.wav", dtype="float32")
if audio.ndim > 1:
audio = audio.mean(axis=1) # 스테레오 -> 모노
# 샘플레이트 변환 (16kHz 필요)
if sr != 16000:
import librosa
audio = librosa.resample(audio, orig_sr=sr, target_sr=16000)
result = model.transcribe(audio, language="ko")
print(f"전사 결과:\n{result['text']}")
# 타임스탬프 포함
for segment in result['segments']:
start = segment['start']
end = segment['end']
text = segment['text']
print(f"[{start:.2f}s -> {end:.2f}s] {text}")
양자화 Whisper 모델
# Whisper GGML 모델 양자화
./quantize models/ggml-medium.bin models/ggml-medium-q5_0.bin q5_0
# 크기 비교
ls -lh models/ggml-medium*.bin
# ggml-medium.bin: 1.5GB
# ggml-medium-q5_0.bin: 약 900MB
iOS/Android에서 Whisper.cpp
iOS는 Metal GPU 가속을 지원합니다.
// iOS에서 whisper.cpp 사용 (WhisperKit 라이브러리)
import WhisperKit
class SpeechRecognizer {
var whisperKit: WhisperKit?
func initialize() async {
do {
whisperKit = try await WhisperKit(
model: "openai_whisper-medium",
computeOptions: ModelComputeOptions(melCompute: .cpuAndGPU)
)
print("Whisper 모델 로드 완료")
} catch {
print("초기화 실패: \(error)")
}
}
func transcribe(audioURL: URL) async -> String? {
guard let whisperKit = whisperKit else { return nil }
do {
let result = try await whisperKit.transcribe(
audioPath: audioURL.path,
decodeOptions: DecodingOptions(language: "ko")
)
return result.map(\.text).joined(separator: " ")
} catch {
print("전사 실패: \(error)")
return nil
}
}
}
10. 웹 브라우저 AI
WebAssembly와 WebGPU의 발전으로 브라우저에서도 강력한 AI 추론이 가능해졌습니다.
TensorFlow.js
<!DOCTYPE html>
<html>
<head>
<title>브라우저 이미지 분류</title>
<script src="https://cdn.jsdelivr.net/npm/@tensorflow/tfjs@latest"></script>
<script src="https://cdn.jsdelivr.net/npm/@tensorflow-models/mobilenet@2.1.1"></script>
</head>
<body>
<input type="file" id="imageInput" accept="image/*" />
<img id="preview" style="max-width: 400px;" />
<div id="result"></div>
<script>
let model
async function loadModel() {
model = await mobilenet.load({ version: 2, alpha: 1.0 })
console.log('모델 로드 완료!')
document.getElementById('result').textContent = '모델 준비됨. 이미지를 선택하세요.'
}
document.getElementById('imageInput').addEventListener('change', async (e) => {
const file = e.target.files[0]
if (!file) return
const img = document.getElementById('preview')
img.src = URL.createObjectURL(file)
img.onload = async () => {
const predictions = await model.classify(img, 5)
const resultDiv = document.getElementById('result')
resultDiv.innerHTML = '<h3>예측 결과:</h3>'
predictions.forEach((pred) => {
resultDiv.innerHTML += ``
})
}
})
loadModel()
</script>
</body>
</html>
Transformers.js (HuggingFace)
import { pipeline, env } from '@xenova/transformers'
// WASM 경로 설정
env.backends.onnx.wasm.wasmPaths = 'https://cdn.jsdelivr.net/npm/onnxruntime-web/dist/'
// 텍스트 분류 파이프라인
async function runTextClassification() {
const classifier = await pipeline(
'sentiment-analysis',
'Xenova/distilbert-base-uncased-finetuned-sst-2-english'
)
const results = await classifier(['I love machine learning!', 'This is terrible.'])
results.forEach((result, i) => {
console.log(`텍스트 ${i + 1}: ${result.label} (${(result.score * 100).toFixed(2)}%)`)
})
}
// 이미지 분류
async function runImageClassification() {
const classifier = await pipeline('image-classification', 'Xenova/vit-base-patch16-224')
const result = await classifier('https://example.com/image.jpg')
console.log('이미지 분류 결과:', result)
}
// 텍스트 생성 (소규모 LLM)
async function runTextGeneration() {
const generator = await pipeline('text-generation', 'Xenova/gpt2')
const output = await generator('인공지능의 미래는', {
max_new_tokens: 100,
temperature: 0.7,
})
console.log('생성된 텍스트:', output[0].generated_text)
}
runTextClassification()
WebGPU 가속 추론
import * as ort from 'onnxruntime-web'
async function runWithWebGPU() {
// WebGPU 지원 확인
if (!navigator.gpu) {
console.log('WebGPU를 지원하지 않는 브라우저입니다.')
return
}
const adapter = await navigator.gpu.requestAdapter()
const device = await adapter.requestDevice()
console.log('WebGPU 어댑터:', adapter.info)
// ONNX Runtime에서 WebGPU 사용
ort.env.wasm.wasmPaths = '/'
const session = await ort.InferenceSession.create('/models/resnet50.onnx', {
executionProviders: ['webgpu'],
graphOptimizationLevel: 'all',
})
// 배치 추론
const batchSize = 4
const inputData = new Float32Array(batchSize * 3 * 224 * 224)
const inputTensor = new ort.Tensor('float32', inputData, [batchSize, 3, 224, 224])
const startTime = performance.now()
const output = await session.run({ input: inputTensor })
const elapsed = performance.now() - startTime
console.log(`WebGPU 추론 시간: ${elapsed.toFixed(2)}ms`)
console.log(`배치 처리량: ${((batchSize / elapsed) * 1000).toFixed(1)} images/sec`)
}
runWithWebGPU()
11. AI 모델 최적화 파이프라인
학습 → 최적화 → 배포 전체 흐름
학습 (PyTorch/TF)
↓
프루닝 (불필요한 가중치 제거)
↓
지식 증류 (Teacher-Student)
↓
양자화 인식 학습 (QAT)
↓
형식 변환 (ONNX/TFLite/GGUF)
↓
런타임 최적화 (TensorRT/OpenVINO)
↓
배포 (모바일/엣지/웹)
프루닝 (Pruning)
import torch
import torch.nn.utils.prune as prune
import torchvision.models as models
model = models.resnet50(pretrained=True)
# 구조적 프루닝: Conv2d 레이어 50% 필터 제거
for name, module in model.named_modules():
if isinstance(module, torch.nn.Conv2d):
prune.l1_unstructured(module, name='weight', amount=0.3)
prune.remove(module, 'weight') # 마스크 영구 적용
# 모델 크기 확인
original_params = sum(p.numel() for p in models.resnet50().parameters())
pruned_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"원본 파라미터: {original_params:,}")
print(f"프루닝 후: {pruned_params:,}")
print(f"감소율: {(1 - pruned_params/original_params)*100:.1f}%")
지식 증류 (Knowledge Distillation)
import torch
import torch.nn as nn
import torch.nn.functional as F
class DistillationLoss(nn.Module):
def __init__(self, temperature=4.0, alpha=0.7):
super().__init__()
self.T = temperature
self.alpha = alpha
def forward(self, student_logits, teacher_logits, labels):
# 소프트 타겟 손실 (증류 손실)
soft_loss = F.kl_div(
F.log_softmax(student_logits / self.T, dim=1),
F.softmax(teacher_logits / self.T, dim=1),
reduction='batchmean'
) * (self.T ** 2)
# 하드 타겟 손실 (교차 엔트로피)
hard_loss = F.cross_entropy(student_logits, labels)
return self.alpha * soft_loss + (1 - self.alpha) * hard_loss
def train_student(teacher, student, dataloader, epochs=10):
teacher.eval()
student.train()
optimizer = torch.optim.AdamW(student.parameters(), lr=1e-4)
criterion = DistillationLoss(temperature=4.0, alpha=0.7)
for epoch in range(epochs):
total_loss = 0
for images, labels in dataloader:
with torch.no_grad():
teacher_logits = teacher(images)
student_logits = student(images)
loss = criterion(student_logits, teacher_logits, labels)
optimizer.zero_grad()
loss.backward()
optimizer.step()
total_loss += loss.item()
print(f"Epoch {epoch+1}/{epochs}, Loss: {total_loss/len(dataloader):.4f}")
# 교사: ResNet50, 학생: MobileNetV2
teacher = models.resnet50(pretrained=True)
student = models.mobilenet_v2(pretrained=False)
양자화 인식 학습 (QAT)
import torch
from torch.quantization import get_default_qat_qconfig, prepare_qat, convert
model = models.mobilenet_v2(pretrained=True)
model.train()
# QAT 설정
model.qconfig = get_default_qat_qconfig('qnnpack') # ARM/모바일
# model.qconfig = get_default_qat_qconfig('fbgemm') # x86
# QAT 준비 (가짜 양자화 삽입)
model = prepare_qat(model, inplace=False)
# 미세 조정 학습 (소규모 에폭)
optimizer = torch.optim.SGD(model.parameters(), lr=0.0001)
model.train()
for epoch in range(5):
for images, labels in dataloader:
outputs = model(images)
loss = nn.CrossEntropyLoss()(outputs, labels)
optimizer.zero_grad()
loss.backward()
optimizer.step()
print(f"QAT Epoch {epoch+1}/5 완료")
# INT8 모델로 변환
model.eval()
quantized_model = convert(model.eval(), inplace=False)
torch.save(quantized_model.state_dict(), 'mobilenetv2_int8.pth')
print("QAT 완료! 모델 정확도를 거의 유지하면서 4배 작아짐")
종합 벤치마크 도구
import time
import numpy as np
import psutil
import os
class EdgeAIBenchmark:
def __init__(self, model_path, framework='tflite'):
self.model_path = model_path
self.framework = framework
self.results = {}
def measure_latency(self, input_data, num_runs=100, warmup=10):
"""평균 추론 지연 시간 측정"""
import tensorflow as tf
interpreter = tf.lite.Interpreter(model_path=self.model_path)
interpreter.allocate_tensors()
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()
# 워밍업
for _ in range(warmup):
interpreter.set_tensor(input_details[0]['index'], input_data)
interpreter.invoke()
# 측정
latencies = []
for _ in range(num_runs):
start = time.perf_counter()
interpreter.set_tensor(input_details[0]['index'], input_data)
interpreter.invoke()
_ = interpreter.get_tensor(output_details[0]['index'])
latencies.append((time.perf_counter() - start) * 1000)
self.results['latency_mean_ms'] = np.mean(latencies)
self.results['latency_p99_ms'] = np.percentile(latencies, 99)
self.results['throughput_fps'] = 1000 / np.mean(latencies)
return self.results
def measure_memory(self):
"""메모리 사용량 측정"""
process = psutil.Process(os.getpid())
before = process.memory_info().rss / 1024 / 1024
import tensorflow as tf
interpreter = tf.lite.Interpreter(model_path=self.model_path)
interpreter.allocate_tensors()
after = process.memory_info().rss / 1024 / 1024
self.results['memory_mb'] = after - before
return self.results
def measure_model_size(self):
"""모델 파일 크기"""
size_bytes = os.path.getsize(self.model_path)
self.results['model_size_mb'] = size_bytes / 1024 / 1024
return self.results
def run_full_benchmark(self, input_data):
self.measure_model_size()
self.measure_memory()
self.measure_latency(input_data)
print(f"\n=== {self.model_path} 벤치마크 ===")
print(f"모델 크기: {self.results.get('model_size_mb', 0):.2f} MB")
print(f"메모리 사용: {self.results.get('memory_mb', 0):.2f} MB")
print(f"평균 지연: {self.results.get('latency_mean_ms', 0):.2f} ms")
print(f"P99 지연: {self.results.get('latency_p99_ms', 0):.2f} ms")
print(f"처리량: {self.results.get('throughput_fps', 0):.1f} FPS")
return self.results
# 사용
import numpy as np
input_data = np.random.rand(1, 224, 224, 3).astype(np.float32)
bench = EdgeAIBenchmark('mobilenetv2.tflite')
results = bench.run_full_benchmark(input_data)
bench_q = EdgeAIBenchmark('mobilenetv2_int8.tflite')
results_q = bench_q.run_full_benchmark(input_data)
print("\n=== 양자화 효과 ===")
size_reduction = (1 - results_q['model_size_mb'] / results['model_size_mb']) * 100
speed_improvement = results['latency_mean_ms'] / results_q['latency_mean_ms']
print(f"크기 감소: {size_reduction:.1f}%")
print(f"속도 향상: {speed_improvement:.1f}배")
마무리
엣지 AI는 더 이상 연구 영역이 아니라 실제 제품에 적용되는 기술입니다. 이 가이드에서 다룬 주요 사항을 정리하면:
- TFLite: 모바일 앱에서 가장 광범위하게 사용. Android/iOS 모두 지원
- ONNX Runtime: 프레임워크 독립적. 크로스 플랫폼 배포에 최적
- Core ML: Apple 기기에서 Neural Engine 최대 활용
- TensorRT: NVIDIA GPU 가속 최대화
- llama.cpp: CPU에서 LLM 실행. Apple Silicon에서 특히 강력
- Whisper.cpp: 오프라인 음성 인식의 표준
- MediaPipe: 비전 ML 솔루션의 빠른 프로토타이핑
- Transformers.js: 브라우저에서 HuggingFace 모델 직접 실행
모델 선택과 최적화 전략은 타겟 하드웨어, 요구 정확도, 지연 시간 목표에 따라 결정됩니다. INT8 양자화는 대부분의 시나리오에서 정확도 손실을 최소화하면서 크기와 속도를 크게 개선하므로, 엣지 AI 프로젝트의 첫 번째 최적화 단계로 강력히 추천합니다.