Skip to content
Published on

Edge AIとオンデバイスML完全ガイド:TFLite、ONNX、Core ML、llama.cpp

Authors

目次

  1. Edge AI概要
  2. TensorFlow Lite(TFLite)
  3. ONNXとONNX Runtime
  4. Core ML(Apple)
  5. NVIDIA Jetsonプラットフォーム
  6. Raspberry Pi AI
  7. MediaPipe
  8. llama.cppとGGUF
  9. Whisper.cpp
  10. Webブラウザ AI
  11. AIモデル最適化パイプライン

1. Edge AI概要

クラウドAI vs エッジAI

AIの推論をどこで実行するかは、アプリケーションの性質と要件に大きく依存します。従来のクラウドAIはデータをリモートサーバーに送信して推論を実行し、結果を返します。一方、エッジAIはデータが生成されるデバイス(スマートフォン、IoTセンサー、カメラなど)上で直接推論を実行します。

次元クラウドAIエッジAI
計算場所リモートサーバーローカルデバイス
レイテンシ数百ms〜数秒数ms〜数十ms
プライバシーデータがデバイスを離れるデータがデバイスに留まる
インターネット依存必要不要
コストAPI呼び出しごとの料金一度限りのモデルコスト
モデルサイズの制限なしメモリ/ストレージによる制限

エッジAIの利点

1. プライバシー保護

医療画像、生体情報、個人音声などの機密データはデバイスを離れることがありません。GDPR、HIPAAなどのデータ規制への準拠が自然に達成されます。

2. 超低レイテンシ

自動運転、産業自動化、リアルタイム翻訳、AR/VRなどのアプリケーションにはミリ秒単位の応答が必要です。ネットワークのラウンドトリップレイテンシがないため、一定した応答時間が保証されます。

3. コスト削減

クラウドAPIコールのコストがかかりません。大規模では、数百万台のデバイスでローカル推論を実行することで、中央サーバーのコストをほぼゼロに削減できます。

4. オフライン動作

インターネット接続が不安定または存在しない環境(農村地帯、地下、航空機など)でもAI機能が動作します。

5. リアルタイムデータ処理

IoTセンサーデータをアップロード前にローカルでフィルタリング、異常検出、分類することで、伝送量とストレージコストを大幅に削減できます。

エッジハードウェア

モバイルGPUとNPU

現代のスマートフォンには専用のAIハードウェアが搭載されています:

  • Apple Neural Engine(ANE):iPhone 8以降とM系Macに搭載。A17 Proは35 TOPSを実現
  • Qualcomm Hexagon DSP:Androidのフラッグシップ。Snapdragon 8 Gen 3にはHexagon NPUが搭載
  • Google Tensor:Pixel専用チップ、オンデバイス音声認識と翻訳に最適化
  • MediaTek APU:ミドルレンジのAndroidデバイスに広く採用

エッジコンピューティングボード

  • NVIDIA Jetson:自動運転、ロボット工学、スマートカメラ向け。Jetson Orinは275 TOPSを実現
  • Raspberry Pi 5:4GB/8GBメモリ、一般的なコンピュータビジョンタスクに適している
  • Google Coral:TFLite専用加速のためのEdge TPU
  • Intel Neural Compute Stick:USB推論アクセラレータ

エッジAIの応用分野

  • スマートフォン:顔認証、写真分類、リアルタイム翻訳、音声アシスタント
  • スマートホーム:音声コマンド処理、動体検知、エネルギー最適化
  • 産業用IoT:欠陥検出、予知保全、異常検知
  • 医療機器:心電図分析、血糖値予測、皮膚状態診断
  • 自動運転:リアルタイム物体検出、車線認識、障害物回避
  • 農業:ドローンによる作物監視、病害虫の検出

2. TensorFlow Lite(TFLite)

TensorFlow LiteはGoogleのモバイルとエッジデバイス向け軽量MLフレームワークです。TensorFlowモデルをTFLite形式(.tflite)に変換し、Android、iOS、組み込みLinux、マイクロコントローラにデプロイします。

モデルの変換(SavedModel → TFLite)

import tensorflow as tf

# 方法1:SavedModelから変換
converter = tf.lite.TFLiteConverter.from_saved_model('saved_model_dir')
tflite_model = converter.convert()

with open('model.tflite', 'wb') as f:
    f.write(tflite_model)

# 方法2:Kerasモデルから直接変換
model = tf.keras.applications.MobileNetV2(weights='imagenet')
converter = tf.lite.TFLiteConverter.from_keras_model(model)
tflite_model = converter.convert()

with open('mobilenetv2.tflite', 'wb') as f:
    f.write(tflite_model)

# 方法3:具象関数から変換
@tf.function(input_signature=[tf.TensorSpec(shape=[1, 224, 224, 3], dtype=tf.float32)])
def predict(x):
    return model(x)

converter = tf.lite.TFLiteConverter.from_concrete_functions(
    [predict.get_concrete_function()]
)
tflite_model = converter.convert()

量子化

量子化はモデルの重みと活性化を浮動小数点数から低精度整数に変換し、モデルサイズを削減して推論速度を向上させます。

Float16量子化

converter = tf.lite.TFLiteConverter.from_saved_model('saved_model_dir')
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.target_spec.supported_types = [tf.float16]
tflite_model = converter.convert()
# ~50%のサイズ削減、精度の低下は最小限

完全整数(INT8)量子化

import numpy as np

def representative_dataset():
    # 実際のデータから100〜1000の代表サンプルを使用
    for _ in range(100):
        data = np.random.rand(1, 224, 224, 3).astype(np.float32)
        yield [data]

converter = tf.lite.TFLiteConverter.from_saved_model('saved_model_dir')
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.representative_dataset = representative_dataset
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
converter.inference_input_type = tf.int8
converter.inference_output_type = tf.int8
tflite_model = converter.convert()
# ~75%のサイズ削減、推論2〜4倍高速化

動的範囲量子化

converter = tf.lite.TFLiteConverter.from_saved_model('saved_model_dir')
converter.optimizations = [tf.lite.Optimize.DEFAULT]
# 代表データセット不要 - 重みのみを量子化
tflite_model = converter.convert()

TFLiteインタープリタ

import tensorflow as tf
import numpy as np
from PIL import Image

# インタープリタの初期化
interpreter = tf.lite.Interpreter(model_path='mobilenetv2.tflite')
interpreter.allocate_tensors()

# 入出力の詳細を確認
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()
print(f"Input shape: {input_details[0]['shape']}")
print(f"Input dtype: {input_details[0]['dtype']}")
print(f"Output shape: {output_details[0]['shape']}")

# 画像の前処理
img = Image.open('test_image.jpg').resize((224, 224))
input_data = np.expand_dims(np.array(img, dtype=np.float32) / 255.0, axis=0)

# 推論の実行
interpreter.set_tensor(input_details[0]['index'], input_data)
interpreter.invoke()

# 結果の取得
output_data = interpreter.get_tensor(output_details[0]['index'])
predicted_class = np.argmax(output_data[0])
confidence = output_data[0][predicted_class]
print(f"Predicted class: {predicted_class}, Confidence: {confidence:.4f}")

マルチスレッドとGPUデリゲート

# マルチスレッドの設定
interpreter = tf.lite.Interpreter(
    model_path='model.tflite',
    num_threads=4
)

# GPUデリゲート(Android/iOS)
try:
    from tensorflow.lite.python.interpreter import load_delegate
    gpu_delegate = load_delegate('libdelegate.so')
    interpreter = tf.lite.Interpreter(
        model_path='model.tflite',
        experimental_delegates=[gpu_delegate]
    )
    print("GPUデリゲート有効化")
except Exception as e:
    print(f"GPUデリゲート使用不可、CPUを使用: {e}")
    interpreter = tf.lite.Interpreter(model_path='model.tflite')

interpreter.allocate_tensors()

Androidデプロイ

build.gradle:

dependencies {
    implementation 'org.tensorflow:tensorflow-lite:2.13.0'
    implementation 'org.tensorflow:tensorflow-lite-gpu:2.13.0'
    implementation 'org.tensorflow:tensorflow-lite-support:0.4.4'
}

Kotlinコード:

import org.tensorflow.lite.Interpreter
import java.nio.ByteBuffer
import java.nio.ByteOrder

class TFLiteClassifier(private val context: Context) {

    private lateinit var interpreter: Interpreter
    private val inputSize = 224
    private val numClasses = 1000

    fun initialize() {
        val model = loadModelFile("mobilenetv2.tflite")
        val options = Interpreter.Options().apply {
            numThreads = 4
            useNNAPI = true  // Android Neural Networks API
        }
        interpreter = Interpreter(model, options)
    }

    private fun loadModelFile(filename: String): ByteBuffer {
        val assetFileDescriptor = context.assets.openFd(filename)
        val fileInputStream = FileInputStream(assetFileDescriptor.fileDescriptor)
        val fileChannel = fileInputStream.channel
        val startOffset = assetFileDescriptor.startOffset
        val declaredLength = assetFileDescriptor.declaredLength
        return fileChannel.map(FileChannel.MapMode.READ_ONLY, startOffset, declaredLength)
    }

    fun classify(bitmap: Bitmap): FloatArray {
        val resized = Bitmap.createScaledBitmap(bitmap, inputSize, inputSize, true)
        val inputBuffer = ByteBuffer.allocateDirect(1 * inputSize * inputSize * 3 * 4)
        inputBuffer.order(ByteOrder.nativeOrder())

        for (y in 0 until inputSize) {
            for (x in 0 until inputSize) {
                val pixel = resized.getPixel(x, y)
                inputBuffer.putFloat(((pixel shr 16 and 0xFF) / 255.0f))
                inputBuffer.putFloat(((pixel shr 8 and 0xFF) / 255.0f))
                inputBuffer.putFloat(((pixel and 0xFF) / 255.0f))
            }
        }

        val outputBuffer = Array(1) { FloatArray(numClasses) }
        interpreter.run(inputBuffer, outputBuffer)
        return outputBuffer[0]
    }
}

iOSデプロイ

import TensorFlowLite
import UIKit

class TFLiteImageClassifier {
    private var interpreter: Interpreter
    private let inputWidth = 224
    private let inputHeight = 224

    init(modelName: String) throws {
        guard let modelPath = Bundle.main.path(forResource: modelName, ofType: "tflite") else {
            throw NSError(domain: "ModelNotFound", code: 0, userInfo: nil)
        }
        var options = Interpreter.Options()
        options.threadCount = 4
        interpreter = try Interpreter(modelPath: modelPath, options: options)
        try interpreter.allocateTensors()
    }

    func classify(image: UIImage) throws -> [Float] {
        guard let cgImage = image.cgImage else { return [] }
        let inputData = preprocessImage(cgImage: cgImage)
        try interpreter.copy(inputData, toInputAt: 0)
        try interpreter.invoke()
        let outputTensor = try interpreter.output(at: 0)
        let results: [Float] = [Float](unsafeData: outputTensor.data) ?? []
        return results
    }

    private func preprocessImage(cgImage: CGImage) -> Data {
        var data = Data(count: inputWidth * inputHeight * 3 * 4)
        return data
    }
}

3. ONNXとONNX Runtime

ONNX(Open Neural Network Exchange)はMLモデルをフレームワーク間でポータブルにするオープン形式です。PyTorch、TensorFlow、scikit-learnでトレーニングされたモデルを単一の標準形式にエクスポートし、ONNX Runtimeを使ってどこでも実行できます。

PyTorchからONNXへの変換

import torch
import torch.nn as nn
import torchvision.models as models

# モデルの読み込み
model = models.resnet50(pretrained=True)
model.eval()

# ダミー入力の作成
dummy_input = torch.randn(1, 3, 224, 224)

# ONNXにエクスポート
torch.onnx.export(
    model,
    dummy_input,
    "resnet50.onnx",
    export_params=True,
    opset_version=17,
    do_constant_folding=True,
    input_names=['input'],
    output_names=['output'],
    dynamic_axes={
        'input': {0: 'batch_size'},
        'output': {0: 'batch_size'}
    }
)
print("ONNXエクスポート完了!")

# モデルの検証
import onnx
onnx_model = onnx.load("resnet50.onnx")
onnx.checker.check_model(onnx_model)
print(f"ONNX IRバージョン: {onnx_model.ir_version}")
print(f"Opsetバージョン: {onnx_model.opset_import[0].version}")

ONNX Runtime推論

import onnxruntime as ort
import numpy as np
from PIL import Image
import torchvision.transforms as transforms

# 実行プロバイダーでセッションを作成
providers = ['CUDAExecutionProvider', 'CPUExecutionProvider']
session = ort.InferenceSession("resnet50.onnx", providers=providers)

print(f"アクティブなプロバイダー: {session.get_providers()}")

input_name = session.get_inputs()[0].name
output_name = session.get_outputs()[0].name
input_shape = session.get_inputs()[0].shape
print(f"入力: {input_name}、シェイプ: {input_shape}")

# 画像の前処理
transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

img = Image.open('test.jpg')
input_tensor = transform(img).unsqueeze(0).numpy()

# 推論の実行
outputs = session.run([output_name], {input_name: input_tensor})
logits = outputs[0]
predicted_class = np.argmax(logits[0])
print(f"予測クラス: {predicted_class}")

ONNX Runtimeの最適化

from onnxruntime.transformers import optimizer
from onnxruntime.quantization import quantize_dynamic, QuantType

# トランスフォーマーモデルのグラフ最適化
optimized_model = optimizer.optimize_model(
    'bert_base.onnx',
    model_type='bert',
    num_heads=12,
    hidden_size=768
)
optimized_model.save_model_to_file('bert_optimized.onnx')

# 動的INT8量子化
quantize_dynamic(
    model_input='bert_optimized.onnx',
    model_output='bert_quantized_int8.onnx',
    weight_type=QuantType.QInt8,
    per_channel=True
)
print("量子化完了!")

# セッションオプションの調整
so = ort.SessionOptions()
so.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
so.intra_op_num_threads = 4
so.inter_op_num_threads = 2
so.execution_mode = ort.ExecutionMode.ORT_SEQUENTIAL

session = ort.InferenceSession('model.onnx', sess_options=so)

ONNX Runtime Web(ブラウザ)

// npm install onnxruntime-web
import * as ort from 'onnxruntime-web'

async function runInference() {
  // WebAssemblyバックエンドの設定
  ort.env.wasm.wasmPaths = '/static/'
  ort.env.wasm.numThreads = 4

  // セッションの作成
  const session = await ort.InferenceSession.create('/models/mobilenet.onnx', {
    executionProviders: ['webgpu', 'wasm'],
    graphOptimizationLevel: 'all',
  })

  // 入力テンソルの作成(1, 3, 224, 224)
  const inputData = new Float32Array(1 * 3 * 224 * 224).fill(0.5)
  const inputTensor = new ort.Tensor('float32', inputData, [1, 3, 224, 224])

  // 推論の実行
  const feeds = { input: inputTensor }
  const results = await session.run(feeds)

  const outputData = results.output.data
  const maxIndex = Array.from(outputData).indexOf(Math.max(...outputData))
  console.log('予測クラス:', maxIndex)
}

runInference()

4. Core ML(Apple)

Core MLはAppleプラットフォーム(iOS、macOS、watchOS、tvOS)でMLモデルを実行するためのAppleのフレームワークです。Neural Engineを活用して電力効率の高い高速推論を実現します。

coremltoolsによるモデル変換

import coremltools as ct
import torch
import torchvision.models as models

# PyTorchモデルの変換
torch_model = models.mobilenet_v2(pretrained=True)
torch_model.eval()

example_input = torch.rand(1, 3, 224, 224)
traced_model = torch.jit.trace(torch_model, example_input)

# Core MLに変換
mlmodel = ct.convert(
    traced_model,
    inputs=[ct.TensorType(name='input', shape=(1, 3, 224, 224))],
    compute_units=ct.ComputeUnit.ALL,  # CPU + GPU + Neural Engine
    minimum_deployment_target=ct.target.iOS16
)

# メタデータの追加
mlmodel.short_description = "MobileNetV2 Image Classifier"
mlmodel.author = "YJ Blog"
mlmodel.version = "1.0"

mlmodel.save("MobileNetV2.mlpackage")
print("Core ML変換完了!")

Float16とINT8量子化

import coremltools as ct
from coremltools.optimize.coreml import (
    OpLinearQuantizerConfig,
    OptimizationConfig,
    linearly_quantize_weights
)

mlmodel = ct.models.MLModel("MobileNetV2.mlpackage")

# 線形重み量子化(8ビット)
op_config = OpLinearQuantizerConfig(mode="linear_symmetric", dtype="int8")
config = OptimizationConfig(global_config=op_config)

compressed_model = linearly_quantize_weights(mlmodel, config)
compressed_model.save("MobileNetV2_int8.mlpackage")

# パレタイズ(4ビット)
from coremltools.optimize.coreml import palettize_weights, OpPalettizerConfig

palette_config = OptimizationConfig(
    global_config=OpPalettizerConfig(mode="kmeans", nbits=4)
)
palette_model = palettize_weights(mlmodel, palette_config)
palette_model.save("MobileNetV2_4bit.mlpackage")

SwiftでのCore MLの使用

import CoreML
import Vision
import UIKit

class CoreMLClassifier {
    private var model: VNCoreMLModel?

    func loadModel() {
        guard let modelURL = Bundle.main.url(forResource: "MobileNetV2", withExtension: "mlpackage") else {
            print("モデルファイルが見つかりません")
            return
        }

        let config = MLModelConfiguration()
        config.computeUnits = .all  // CPU + GPU + Neural Engine

        do {
            let coreMLModel = try MLModel(contentsOf: modelURL, configuration: config)
            model = try VNCoreMLModel(for: coreMLModel)
            print("モデルの読み込みに成功")
        } catch {
            print("モデルの読み込みに失敗: \(error)")
        }
    }

    func classify(image: UIImage, completion: @escaping ([VNClassificationObservation]?) -> Void) {
        guard let model = model,
              let cgImage = image.cgImage else {
            completion(nil)
            return
        }

        let request = VNCoreMLRequest(model: model) { request, error in
            guard let results = request.results as? [VNClassificationObservation] else {
                completion(nil)
                return
            }
            completion(Array(results.prefix(5)))
        }

        request.imageCropAndScaleOption = .centerCrop

        let handler = VNImageRequestHandler(cgImage: cgImage, options: [:])
        DispatchQueue.global(qos: .userInteractive).async {
            try? handler.perform([request])
        }
    }
}

Create MLによるカスタムモデルのトレーニング

import CreateML
import Foundation

// 画像分類器をトレーニング
let trainingData = MLImageClassifier.DataSource.labeledDirectories(
    at: URL(fileURLWithPath: "/path/to/training_data")
)

let parameters = MLImageClassifier.ModelParameters(
    featureExtractor: .scenePrint(revision: 2),
    maxIterations: 25,
    augmentation: [.flip, .crop, .rotation]
)

let classifier = try MLImageClassifier(
    trainingData: trainingData,
    parameters: parameters
)

let evaluationData = MLImageClassifier.DataSource.labeledDirectories(
    at: URL(fileURLWithPath: "/path/to/test_data")
)
let metrics = classifier.evaluation(on: evaluationData)
print("精度: \(1.0 - metrics.classificationError)")

try classifier.write(to: URL(fileURLWithPath: "MyClassifier.mlmodel"))

5. NVIDIA Jetsonプラットフォーム

NVIDIA Jetsonはロボット工学、自動運転、スマートカメラに広く使用される組み込みAIコンピューティングプラットフォームです。

Jetsonモデル比較

モデルAI性能RAM電力主な用途
Jetson Nano472 GFLOPS4GB10W教育、プロトタイピング
Jetson Xavier NX21 TOPS8/16GB15W産業用IoT
Jetson AGX Orin275 TOPS64GB60W自動運転、ロボット工学
Jetson Orin NX100 TOPS16GB25WエッジAI

TensorRT変換

import tensorrt as trt
import numpy as np
import pycuda.driver as cuda
import pycuda.autoinit

TRT_LOGGER = trt.Logger(trt.Logger.WARNING)

def build_engine_from_onnx(onnx_path, engine_path, fp16=True, int8=False):
    with trt.Builder(TRT_LOGGER) as builder, \
         builder.create_network(1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH)) as network, \
         trt.OnnxParser(network, TRT_LOGGER) as parser:

        config = builder.create_builder_config()
        config.max_workspace_size = 1 << 30  # 1GB

        if fp16:
            config.set_flag(trt.BuilderFlag.FP16)
        if int8:
            config.set_flag(trt.BuilderFlag.INT8)

        with open(onnx_path, 'rb') as f:
            if not parser.parse(f.read()):
                for error in range(parser.num_errors):
                    print(parser.get_error(error))
                return None

        print("TensorRTエンジンをビルド中(数分かかる場合があります)...")
        serialized_engine = builder.build_serialized_network(network, config)

        with open(engine_path, 'wb') as f:
            f.write(serialized_engine)
        print(f"エンジンを保存: {engine_path}")

build_engine_from_onnx('resnet50.onnx', 'resnet50_fp16.trt', fp16=True)

TensorRT推論

import tensorrt as trt
import pycuda.driver as cuda
import pycuda.autoinit
import numpy as np

class TRTInference:
    def __init__(self, engine_path):
        TRT_LOGGER = trt.Logger(trt.Logger.WARNING)

        with open(engine_path, 'rb') as f:
            runtime = trt.Runtime(TRT_LOGGER)
            self.engine = runtime.deserialize_cuda_engine(f.read())

        self.context = self.engine.create_execution_context()
        self.inputs, self.outputs, self.bindings, self.stream = self._allocate_buffers()

    def _allocate_buffers(self):
        inputs, outputs, bindings = [], [], []
        stream = cuda.Stream()

        for binding in self.engine:
            size = trt.volume(self.engine.get_binding_shape(binding))
            dtype = trt.nptype(self.engine.get_binding_dtype(binding))
            host_mem = cuda.pagelocked_empty(size, dtype)
            device_mem = cuda.mem_alloc(host_mem.nbytes)
            bindings.append(int(device_mem))

            if self.engine.binding_is_input(binding):
                inputs.append({'host': host_mem, 'device': device_mem})
            else:
                outputs.append({'host': host_mem, 'device': device_mem})

        return inputs, outputs, bindings, stream

    def infer(self, input_data):
        np.copyto(self.inputs[0]['host'], input_data.ravel())
        cuda.memcpy_htod_async(self.inputs[0]['device'], self.inputs[0]['host'], self.stream)
        self.context.execute_async_v2(bindings=self.bindings, stream_handle=self.stream.handle)
        cuda.memcpy_dtoh_async(self.outputs[0]['host'], self.outputs[0]['device'], self.stream)
        self.stream.synchronize()
        return self.outputs[0]['host']

trt_model = TRTInference('resnet50_fp16.trt')
input_array = np.random.rand(1, 3, 224, 224).astype(np.float32)
result = trt_model.infer(input_array)
print(f"予測クラス: {np.argmax(result)}")

ビデオパイプライン向けDeepStream SDK

import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst, GLib

Gst.init(None)

def create_pipeline():
    pipeline = Gst.Pipeline()

    # ソース:USBカメラ
    source = Gst.ElementFactory.make("v4l2src", "usb-cam-source")
    source.set_property("device", "/dev/video0")

    caps = Gst.ElementFactory.make("capsfilter", "capsfilter")
    caps.set_property("caps", Gst.Caps.from_string(
        "video/x-raw,width=1280,height=720,framerate=30/1"
    ))

    nvconv = Gst.ElementFactory.make("nvvideoconvert", "convertor")

    # nvinfer要素がTensorRT推論を実行
    nvinfer = Gst.ElementFactory.make("nvinfer", "primary-inference")
    nvinfer.set_property("config-file-path", "config_infer_primary.txt")

    tracker = Gst.ElementFactory.make("nvtracker", "tracker")
    tracker.set_property(
        "ll-lib-file",
        "/opt/nvidia/deepstream/deepstream/lib/libnvds_nvmultiobjecttracker.so"
    )

    osd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")
    sink = Gst.ElementFactory.make("nveglglessink", "nvvideo-renderer")

    for element in [source, caps, nvconv, nvinfer, tracker, osd, sink]:
        pipeline.add(element)

    source.link(caps)
    caps.link(nvconv)
    nvconv.link(nvinfer)
    nvinfer.link(tracker)
    tracker.link(osd)
    osd.link(sink)

    return pipeline

pipeline = create_pipeline()
pipeline.set_state(Gst.State.PLAYING)

6. Raspberry Pi AI

Raspberry Piは教育プラットフォームから本格的なエッジAIデプロイターゲットへと進化しました。

Raspberry Pi 5 + Hailo-8

Hailo-8はRaspberry Pi向けに26 TOPSを実現するAIアクセラレータHATです。

# Hailo SDKのインストール
pip install hailort

# 事前コンパイル済みモデルのダウンロード(ONNX -> HEF形式)
wget https://hailo-model-zoo.s3.eu-west-2.amazonaws.com/ModelZoo/Compiled/v2.11.0/hailo8/resnet_v1_50.hef
import hailo_platform as hp
import numpy as np

with hp.VDevice() as vdevice:
    hef = hp.Hef("resnet_v1_50.hef")
    network_groups = vdevice.configure(hef)
    network_group = network_groups[0]

    input_vstreams_params = hp.InputVStreamParams.make_from_network_group(
        network_group, quantized=False, format_type=hp.FormatType.FLOAT32
    )
    output_vstreams_params = hp.OutputVStreamParams.make_from_network_group(
        network_group, quantized=False, format_type=hp.FormatType.FLOAT32
    )

    with hp.InferVStreams(network_group, input_vstreams_params, output_vstreams_params) as infer_pipeline:
        input_data = {"input_layer1": np.random.rand(1, 224, 224, 3).astype(np.float32)}
        with network_group.activate():
            infer_results = infer_pipeline.infer(input_data)
        output_key = 'resnet_v1_50/softmax1'
        print(f"結果: {np.argmax(infer_results[output_key])}")

OpenCV + Raspberry Piカメラ

import cv2
import numpy as np
import tflite_runtime.interpreter as tflite

# Raspberry Piでtflite_runtime(軽量版)を使用
interpreter = tflite.Interpreter(
    model_path='ssd_mobilenet_v2.tflite',
    num_threads=4
)
interpreter.allocate_tensors()
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

cap = cv2.VideoCapture(0)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

while True:
    ret, frame = cap.read()
    if not ret:
        break

    input_size = (input_details[0]['shape'][2], input_details[0]['shape'][1])
    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    resized = cv2.resize(rgb_frame, input_size)
    input_data = np.expand_dims(resized, axis=0).astype(np.uint8)

    interpreter.set_tensor(input_details[0]['index'], input_data)
    interpreter.invoke()

    boxes = interpreter.get_tensor(output_details[0]['index'])[0]
    classes = interpreter.get_tensor(output_details[1]['index'])[0]
    scores = interpreter.get_tensor(output_details[2]['index'])[0]

    h, w = frame.shape[:2]
    for i in range(len(scores)):
        if scores[i] > 0.5:
            ymin, xmin, ymax, xmax = boxes[i]
            cv2.rectangle(frame,
                         (int(xmin * w), int(ymin * h)),
                         (int(xmax * w), int(ymax * h)),
                         (0, 255, 0), 2)
            label = f"class {int(classes[i])}: {scores[i]:.2f}"
            cv2.putText(frame, label, (int(xmin * w), int(ymin * h) - 10),
                       cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

    cv2.imshow('Raspberry Pi AI', frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()

7. MediaPipe

Google MediaPipeは顔検出、手のトラッキング、姿勢推定、物体検出などのすぐに使えるMLソリューションを提供します。

Pythonでの手のトラッキング

import mediapipe as mp
import cv2
import numpy as np

mp_hands = mp.solutions.hands
mp_drawing = mp.solutions.drawing_utils
mp_drawing_styles = mp.solutions.drawing_styles

def run_hand_tracking():
    cap = cv2.VideoCapture(0)

    with mp_hands.Hands(
        static_image_mode=False,
        max_num_hands=2,
        min_detection_confidence=0.7,
        min_tracking_confidence=0.5
    ) as hands:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            rgb_frame.flags.writeable = False
            results = hands.process(rgb_frame)
            rgb_frame.flags.writeable = True

            frame = cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR)

            if results.multi_hand_landmarks:
                for hand_landmarks in results.multi_hand_landmarks:
                    mp_drawing.draw_landmarks(
                        frame,
                        hand_landmarks,
                        mp_hands.HAND_CONNECTIONS,
                        mp_drawing_styles.get_default_hand_landmarks_style(),
                        mp_drawing_styles.get_default_hand_connections_style()
                    )

                    # ランドマーク座標の抽出(21キーポイント)
                    for idx, landmark in enumerate(hand_landmarks.landmark):
                        h, w, _ = frame.shape
                        cx, cy = int(landmark.x * w), int(landmark.y * h)
                        if idx == 8:  # 人差し指の先端
                            cv2.circle(frame, (cx, cy), 10, (255, 0, 0), -1)

            cv2.imshow('Hand Tracking', frame)
            if cv2.waitKey(5) & 0xFF == ord('q'):
                break

    cap.release()
    cv2.destroyAllWindows()

run_hand_tracking()

姿勢推定

import mediapipe as mp
import cv2
import numpy as np

mp_pose = mp.solutions.pose

def calculate_angle(a, b, c):
    """3点間の角度を計算。"""
    a = np.array(a)
    b = np.array(b)
    c = np.array(c)

    radians = np.arctan2(c[1] - b[1], c[0] - b[0]) - \
              np.arctan2(a[1] - b[1], a[0] - b[0])
    angle = np.abs(radians * 180.0 / np.pi)

    if angle > 180.0:
        angle = 360 - angle
    return angle

cap = cv2.VideoCapture(0)

with mp_pose.Pose(
    min_detection_confidence=0.5,
    min_tracking_confidence=0.5,
    model_complexity=1  # 0: ライト、1: フル、2: ヘビー
) as pose:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = pose.process(rgb_frame)
        frame = cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR)

        if results.pose_landmarks:
            landmarks = results.pose_landmarks.landmark
            h, w, _ = frame.shape

            shoulder = [landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER.value].x * w,
                       landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER.value].y * h]
            elbow = [landmarks[mp_pose.PoseLandmark.LEFT_ELBOW.value].x * w,
                    landmarks[mp_pose.PoseLandmark.LEFT_ELBOW.value].y * h]
            wrist = [landmarks[mp_pose.PoseLandmark.LEFT_WRIST.value].x * w,
                    landmarks[mp_pose.PoseLandmark.LEFT_WRIST.value].y * h]

            angle = calculate_angle(shoulder, elbow, wrist)
            cv2.putText(frame, f"Elbow: {angle:.1f} deg",
                       (int(elbow[0]), int(elbow[1])),
                       cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)

            mp.solutions.drawing_utils.draw_landmarks(
                frame, results.pose_landmarks, mp_pose.POSE_CONNECTIONS
            )

        cv2.imshow('Pose Estimation', frame)
        if cv2.waitKey(5) & 0xFF == ord('q'):
            break

cap.release()
cv2.destroyAllWindows()

MediaPipe Tasks API

import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision

base_options = python.BaseOptions(model_asset_path='efficientdet_lite0.tflite')
options = vision.ObjectDetectorOptions(
    base_options=base_options,
    running_mode=vision.RunningMode.IMAGE,
    max_results=5,
    score_threshold=0.5
)

with vision.ObjectDetector.create_from_options(options) as detector:
    image = mp.Image.create_from_file('test_image.jpg')
    detection_result = detector.detect(image)

    for detection in detection_result.detections:
        category = detection.categories[0]
        print(f"物体: {category.category_name}、スコア: {category.score:.2f}")
        bbox = detection.bounding_box
        print(f"  位置: ({bbox.origin_x}, {bbox.origin_y})、サイズ: {bbox.width}x{bbox.height}")

8. llama.cppとGGUF

llama.cppはMetaのLLaMAモデルのC++実装で、GPUなしのCPUで大規模言語モデルを実行できるようにします。

インストールと基本的な使用法

# ソースからビルド
git clone https://github.com/ggerganov/llama.cpp
cd llama.cpp

# CPUのみ
make -j4

# Apple Silicon(Metal GPU加速)
make LLAMA_METAL=1 -j4

# NVIDIA CUDA
make LLAMA_CUDA=1 -j4

# GGUFモデルのダウンロード
huggingface-cli download \
    bartowski/Llama-3.2-3B-Instruct-GGUF \
    Llama-3.2-3B-Instruct-Q4_K_M.gguf \
    --local-dir ./models

# インタラクティブチャット
./llama-cli \
    -m models/Llama-3.2-3B-Instruct-Q4_K_M.gguf \
    -n 512 \
    -p "You are a helpful AI assistant." \
    --repeat-penalty 1.1 \
    -t 8 \
    --color

量子化レベル(GGUF)

量子化ビット/重みサイズ(7B)品質使用タイミング
Q2_K約2.6ビット約2.7GBメモリが非常に限られている場合
Q4_04.5ビット約3.8GB中程度基本的な用途
Q4_K_M4.8ビット約4.1GB良好推奨:バランスが取れている
Q5_K_M5.7ビット約4.8GB非常に良好品質重視
Q6_K6.6ビット約5.5GB優秀高品質が必要な場合
Q8_08.5ビット約7.2GB最高十分なメモリがある場合

llama-cpp-python

from llama_cpp import Llama

# モデルの読み込み
llm = Llama(
    model_path="./models/Llama-3.2-3B-Instruct-Q4_K_M.gguf",
    n_ctx=4096,          # コンテキストウィンドウ
    n_threads=8,         # CPUスレッド
    n_gpu_layers=35,     # GPUにオフロードするレイヤー(-1で全て)
    verbose=False
)

# 基本的なテキスト生成
output = llm(
    "What is the capital of France?",
    max_tokens=128,
    temperature=0.7,
    top_p=0.95,
    top_k=40,
    repeat_penalty=1.1
)
print(output['choices'][0]['text'])

# チャット補完
messages = [
    {"role": "system", "content": "You are a helpful coding assistant."},
    {"role": "user", "content": "Write a Python function to compute the Fibonacci sequence."}
]

response = llm.create_chat_completion(
    messages=messages,
    max_tokens=512,
    temperature=0.7
)
print(response['choices'][0]['message']['content'])

# ストリーミング出力
stream = llm.create_chat_completion(
    messages=messages,
    max_tokens=512,
    stream=True
)

for chunk in stream:
    delta = chunk['choices'][0]['delta']
    if 'content' in delta:
        print(delta['content'], end='', flush=True)
print()

OpenAI互換サーバー

# llama.cppサーバーの起動
./llama-server \
    -m models/Llama-3.2-3B-Instruct-Q4_K_M.gguf \
    --port 8080 \
    --host 0.0.0.0 \
    -n 2048 \
    -t 8 \
    --n-gpu-layers 35
# ローカルサーバーでOpenAI SDKを使用
from openai import OpenAI

client = OpenAI(
    base_url="http://localhost:8080/v1",
    api_key="none"
)

response = client.chat.completions.create(
    model="local-model",
    messages=[
        {"role": "user", "content": "Explain the difference between machine learning and deep learning."}
    ],
    max_tokens=512,
    temperature=0.7
)
print(response.choices[0].message.content)

HuggingFaceモデルをGGUFに変換

cd llama.cpp

# Python依存関係のインストール
pip install -r requirements.txt

# HuggingFaceモデルをGGUFに変換
python convert_hf_to_gguf.py \
    /path/to/hf_model \
    --outfile models/my_model.gguf \
    --outtype f16

# モデルの量子化
./quantize models/my_model.gguf models/my_model_q4km.gguf Q4_K_M

9. Whisper.cpp

Whisper.cppはOpenAIのWhisper音声認識モデルのC++実装で、Raspberry Piからスマートフォンまでオフラインでの音声認識を可能にします。

インストールと基本的な使用法

# ビルド
git clone https://github.com/ggerganov/whisper.cpp
cd whisper.cpp
make -j4

# Apple SiliconのMetalを使用
make WHISPER_METAL=1 -j4

# モデルのダウンロード
bash ./models/download-ggml-model.sh base.en    # 英語のみ、142MB
bash ./models/download-ggml-model.sh medium     # 多言語、1.5GB
bash ./models/download-ggml-model.sh large-v3   # 最高品質、3.1GB

# 音声ファイルの文字起こし
./main -m models/ggml-medium.bin \
       -f audio.wav \
       -l en \
       --output-txt \
       -of output

# リアルタイムマイク入力
./stream -m models/ggml-medium.bin \
         -t 8 \
         --step 500 \
         --length 5000 \
         -l en

whisper-cpp-python

import whisper_cpp
import numpy as np
import soundfile as sf

# モデルの読み込み
model = whisper_cpp.Whisper.from_pretrained("medium")

# WAVファイルの文字起こし
audio, sr = sf.read("audio.wav", dtype="float32")
if audio.ndim > 1:
    audio = audio.mean(axis=1)  # ステレオ -> モノラル

# 必要に応じて16kHzにリサンプリング
if sr != 16000:
    import librosa
    audio = librosa.resample(audio, orig_sr=sr, target_sr=16000)

result = model.transcribe(audio, language="en")
print(f"文字起こし:\n{result['text']}")

# タイムスタンプ付き
for segment in result['segments']:
    start = segment['start']
    end = segment['end']
    text = segment['text']
    print(f"[{start:.2f}s -> {end:.2f}s] {text}")

Whisperモデルの量子化

# GGMLモデルの量子化
./quantize models/ggml-medium.bin models/ggml-medium-q5_0.bin q5_0

# サイズ比較
ls -lh models/ggml-medium*.bin
# ggml-medium.bin: 1.5GB
# ggml-medium-q5_0.bin: ~900MB

WhisperKitを使ったiOSでのWhisper

import WhisperKit

class SpeechRecognizer {
    var whisperKit: WhisperKit?

    func initialize() async {
        do {
            whisperKit = try await WhisperKit(
                model: "openai_whisper-medium",
                computeOptions: ModelComputeOptions(melCompute: .cpuAndGPU)
            )
            print("Whisperモデルを読み込みました")
        } catch {
            print("初期化に失敗: \(error)")
        }
    }

    func transcribe(audioURL: URL) async -> String? {
        guard let whisperKit = whisperKit else { return nil }

        do {
            let result = try await whisperKit.transcribe(
                audioPath: audioURL.path,
                decodeOptions: DecodingOptions(language: "en")
            )
            return result.map(\.text).joined(separator: " ")
        } catch {
            print("文字起こしに失敗: \(error)")
            return nil
        }
    }
}

10. WebブラウザAI

WebAssemblyとWebGPUのおかげで、ブラウザでの強力なAI推論が実用的になっています。

TensorFlow.js

<!DOCTYPE html>
<html>
  <head>
    <title>ブラウザ画像分類</title>
    <script src="https://cdn.jsdelivr.net/npm/@tensorflow/tfjs@latest"></script>
    <script src="https://cdn.jsdelivr.net/npm/@tensorflow-models/mobilenet@2.1.1"></script>
  </head>
  <body>
    <input type="file" id="imageInput" accept="image/*" />
    <img id="preview" style="max-width: 400px;" />
    <div id="result"></div>

    <script>
      let model

      async function loadModel() {
        model = await mobilenet.load({ version: 2, alpha: 1.0 })
        console.log('モデルを読み込みました!')
        document.getElementById('result').textContent = 'モデル準備完了。画像を選択してください。'
      }

      document.getElementById('imageInput').addEventListener('change', async (e) => {
        const file = e.target.files[0]
        if (!file) return

        const img = document.getElementById('preview')
        img.src = URL.createObjectURL(file)
        img.onload = async () => {
          const predictions = await model.classify(img, 5)
          const resultDiv = document.getElementById('result')
          resultDiv.innerHTML = '<h3>上位予測:</h3>'
          predictions.forEach((pred) => {
            resultDiv.innerHTML += `
                    <p>${pred.className}: ${(pred.probability * 100).toFixed(2)}%</p>
                `
          })
        }
      })

      loadModel()
    </script>
  </body>
</html>

Transformers.js(HuggingFace)

import { pipeline, env } from '@xenova/transformers'

env.backends.onnx.wasm.wasmPaths = 'https://cdn.jsdelivr.net/npm/onnxruntime-web/dist/'

// 感情分析パイプライン
async function runTextClassification() {
  const classifier = await pipeline(
    'sentiment-analysis',
    'Xenova/distilbert-base-uncased-finetuned-sst-2-english'
  )

  const results = await classifier(['I love machine learning!', 'This is terrible.'])
  results.forEach((result, i) => {
    console.log(`Text ${i + 1}: ${result.label} (${(result.score * 100).toFixed(2)}%)`)
  })
}

// 画像分類
async function runImageClassification() {
  const classifier = await pipeline('image-classification', 'Xenova/vit-base-patch16-224')
  const result = await classifier('https://example.com/image.jpg')
  console.log('画像分類結果:', result)
}

// 小型LLMでのテキスト生成
async function runTextGeneration() {
  const generator = await pipeline('text-generation', 'Xenova/gpt2')
  const output = await generator('The future of AI is', {
    max_new_tokens: 100,
    temperature: 0.7,
  })
  console.log('生成されたテキスト:', output[0].generated_text)
}

runTextClassification()

WebGPUアクセラレーション推論

import * as ort from 'onnxruntime-web'

async function runWithWebGPU() {
  if (!navigator.gpu) {
    console.log('このブラウザはWebGPUをサポートしていません。')
    return
  }

  const adapter = await navigator.gpu.requestAdapter()
  const device = await adapter.requestDevice()
  console.log('WebGPUアダプター:', adapter.info)

  ort.env.wasm.wasmPaths = '/'
  const session = await ort.InferenceSession.create('/models/resnet50.onnx', {
    executionProviders: ['webgpu'],
    graphOptimizationLevel: 'all',
  })

  const batchSize = 4
  const inputData = new Float32Array(batchSize * 3 * 224 * 224)
  const inputTensor = new ort.Tensor('float32', inputData, [batchSize, 3, 224, 224])

  const startTime = performance.now()
  const output = await session.run({ input: inputTensor })
  const elapsed = performance.now() - startTime

  console.log(`WebGPU推論時間: ${elapsed.toFixed(2)}ms`)
  console.log(`スループット: ${((batchSize / elapsed) * 1000).toFixed(1)} 画像/秒`)
}

runWithWebGPU()

11. AIモデル最適化パイプライン

エンドツーエンドフロー:トレーニング → 最適化 → デプロイ

トレーニング(PyTorch/TF     |
プルーニング(不要な重みを削除)
     |
知識蒸留(Teacher-Student)
     |
量子化対応トレーニング(QAT     |
形式変換(ONNX/TFLite/GGUF     |
ランタイム最適化(TensorRT/OpenVINO)
     |
デプロイ(モバイル/エッジ/Web)

プルーニング

import torch
import torch.nn.utils.prune as prune
import torchvision.models as models

model = models.resnet50(pretrained=True)

# 非構造化プルーニング:Conv2d層の重みの30%を削除
for name, module in model.named_modules():
    if isinstance(module, torch.nn.Conv2d):
        prune.l1_unstructured(module, name='weight', amount=0.3)
        prune.remove(module, 'weight')  # マスクを恒久化

original_params = sum(p.numel() for p in models.resnet50().parameters())
pruned_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"元のパラメータ数: {original_params:,}")
print(f"プルーニング後: {pruned_params:,}")
print(f"削減率: {(1 - pruned_params/original_params)*100:.1f}%")

知識蒸留

import torch
import torch.nn as nn
import torch.nn.functional as F

class DistillationLoss(nn.Module):
    def __init__(self, temperature=4.0, alpha=0.7):
        super().__init__()
        self.T = temperature
        self.alpha = alpha

    def forward(self, student_logits, teacher_logits, labels):
        # ソフトターゲット損失(蒸留)
        soft_loss = F.kl_div(
            F.log_softmax(student_logits / self.T, dim=1),
            F.softmax(teacher_logits / self.T, dim=1),
            reduction='batchmean'
        ) * (self.T ** 2)

        # ハードターゲット損失(クロスエントロピー)
        hard_loss = F.cross_entropy(student_logits, labels)

        return self.alpha * soft_loss + (1 - self.alpha) * hard_loss

def train_student(teacher, student, dataloader, epochs=10):
    teacher.eval()
    student.train()

    optimizer = torch.optim.AdamW(student.parameters(), lr=1e-4)
    criterion = DistillationLoss(temperature=4.0, alpha=0.7)

    for epoch in range(epochs):
        total_loss = 0
        for images, labels in dataloader:
            with torch.no_grad():
                teacher_logits = teacher(images)

            student_logits = student(images)
            loss = criterion(student_logits, teacher_logits, labels)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        print(f"Epoch {epoch+1}/{epochs}, Loss: {total_loss/len(dataloader):.4f}")

# Teacher: ResNet50、Student: MobileNetV2
teacher = models.resnet50(pretrained=True)
student = models.mobilenet_v2(pretrained=False)

量子化対応トレーニング(QAT)

import torch
from torch.quantization import get_default_qat_qconfig, prepare_qat, convert

model = models.mobilenet_v2(pretrained=True)
model.train()

# QAT設定
model.qconfig = get_default_qat_qconfig('qnnpack')  # ARM/モバイル
# model.qconfig = get_default_qat_qconfig('fbgemm')  # x86

# QATの準備(疑似量子化ノードを挿入)
model = prepare_qat(model, inplace=False)

# 数エポックでQATのファインチューニング
optimizer = torch.optim.SGD(model.parameters(), lr=0.0001)
model.train()
for epoch in range(5):
    for images, labels in dataloader:
        outputs = model(images)
        loss = nn.CrossEntropyLoss()(outputs, labels)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    print(f"QATエポック {epoch+1}/5 完了")

# INT8モデルに変換
model.eval()
quantized_model = convert(model.eval(), inplace=False)
torch.save(quantized_model.state_dict(), 'mobilenetv2_int8.pth')
print("QAT完了!精度の低下を最小限に4倍小さいモデルになりました")

包括的なベンチマークツール

import time
import numpy as np
import psutil
import os

class EdgeAIBenchmark:
    def __init__(self, model_path, framework='tflite'):
        self.model_path = model_path
        self.framework = framework
        self.results = {}

    def measure_latency(self, input_data, num_runs=100, warmup=10):
        """平均推論レイテンシを測定。"""
        import tensorflow as tf

        interpreter = tf.lite.Interpreter(model_path=self.model_path)
        interpreter.allocate_tensors()
        input_details = interpreter.get_input_details()
        output_details = interpreter.get_output_details()

        # ウォームアップ
        for _ in range(warmup):
            interpreter.set_tensor(input_details[0]['index'], input_data)
            interpreter.invoke()

        # 測定
        latencies = []
        for _ in range(num_runs):
            start = time.perf_counter()
            interpreter.set_tensor(input_details[0]['index'], input_data)
            interpreter.invoke()
            _ = interpreter.get_tensor(output_details[0]['index'])
            latencies.append((time.perf_counter() - start) * 1000)

        self.results['latency_mean_ms'] = np.mean(latencies)
        self.results['latency_p99_ms'] = np.percentile(latencies, 99)
        self.results['throughput_fps'] = 1000 / np.mean(latencies)
        return self.results

    def measure_memory(self):
        """メモリ消費量を測定。"""
        process = psutil.Process(os.getpid())
        before = process.memory_info().rss / 1024 / 1024

        import tensorflow as tf
        interpreter = tf.lite.Interpreter(model_path=self.model_path)
        interpreter.allocate_tensors()

        after = process.memory_info().rss / 1024 / 1024
        self.results['memory_mb'] = after - before
        return self.results

    def measure_model_size(self):
        """モデルファイルサイズを測定。"""
        size_bytes = os.path.getsize(self.model_path)
        self.results['model_size_mb'] = size_bytes / 1024 / 1024
        return self.results

    def run_full_benchmark(self, input_data):
        self.measure_model_size()
        self.measure_memory()
        self.measure_latency(input_data)

        print(f"\n=== {self.model_path} ベンチマーク ===")
        print(f"モデルサイズ: {self.results.get('model_size_mb', 0):.2f} MB")
        print(f"メモリ使用量: {self.results.get('memory_mb', 0):.2f} MB")
        print(f"平均レイテンシ: {self.results.get('latency_mean_ms', 0):.2f} ms")
        print(f"P99レイテンシ: {self.results.get('latency_p99_ms', 0):.2f} ms")
        print(f"スループット: {self.results.get('throughput_fps', 0):.1f} FPS")
        return self.results


# 使用例
input_data = np.random.rand(1, 224, 224, 3).astype(np.float32)

bench = EdgeAIBenchmark('mobilenetv2.tflite')
results = bench.run_full_benchmark(input_data)

bench_q = EdgeAIBenchmark('mobilenetv2_int8.tflite')
results_q = bench_q.run_full_benchmark(input_data)

print("\n=== 量子化の影響 ===")
size_reduction = (1 - results_q['model_size_mb'] / results['model_size_mb']) * 100
speed_improvement = results['latency_mean_ms'] / results_q['latency_mean_ms']
print(f"サイズ削減: {size_reduction:.1f}%")
print(f"速度向上: {speed_improvement:.1f}倍")

まとめ

エッジAIはもはや研究の好奇心ではなく、実際の製品に大規模にデプロイされています。このガイドで取り上げたフレームワークの簡易リファレンスです:

  1. TFLite:モバイルアプリで最も広く採用。AndroidとiOSの両方をネイティブにサポート
  2. ONNX Runtime:フレームワーク非依存。クロスプラットフォームデプロイに最適
  3. Core ML:Appleデバイス上でApple Neural Engineを最大限に活用
  4. TensorRT:JetsonとサーバーでNVIDIA GPU加速を最大化
  5. llama.cpp:CPUでLLMを実行。特にApple Siliconで強力
  6. Whisper.cpp:オフライン音声認識のデファクトスタンダード
  7. MediaPipe:ビジョンMLソリューションの迅速なプロトタイピング
  8. Transformers.js:ブラウザ上でHuggingFaceモデルを直接実行

モデルの選択と最適化戦略は、ターゲットハードウェア、精度要件、レイテンシ目標によって異なります。INT8量子化は、ほぼすべてのエッジAIプロジェクトで強く推奨される最初の最適化ステップです。精度の低下を最小限に抑えながら、サイズを劇的に削減し、速度を向上させます。


参考文献