- Authors

- Name
- Youngju Kim
- @fjvbn20031
はじめに
TensorFlowは、2015年にGoogle Brainがオープンソース化した機械学習・深層学習フレームワークです。現在最も広く使われている深層学習フレームワークの一つで、研究から本番デプロイまで全プロセスをサポートしています。TensorFlow 2.x から公式高レベルAPIとして統合されたKerasは、直感的かつ迅速なモデル開発を可能にします。
このガイドでは、TensorFlowとKerasの基本概念から実際の本番環境へのモデルデプロイまでを段階的に解説します。
1. TensorFlowの紹介とインストール
TensorFlow vs PyTorch 比較
両フレームワークにはそれぞれ長所と短所があります。
| 特徴 | TensorFlow/Keras | PyTorch |
|---|---|---|
| 開発元 | Meta (Facebook) | |
| デプロイツール | TF Serving, TFLite, TF.js | TorchServe, TorchScript |
| 本番環境の成熟度 | 非常に高い | 高い |
| 研究での人気 | 高い | 非常に高い |
| 学習曲線 | 中程度 | 低い |
| モバイル/エッジ | TFLite 優秀 | ExecuTorch |
| エコシステム | TFX, TFHub など | HuggingFace 統合 |
インストール
pipを使ってインストールします。
# CPUのみ
pip install tensorflow
# GPU対応(CUDA自動検出、TF 2.9+)
pip install tensorflow[and-cuda]
# バージョン指定
pip install tensorflow==2.15.0
# conda環境
conda create -n tf_env python=3.10
conda activate tf_env
conda install -c conda-forge tensorflow
macOS Apple Siliconの場合:
pip install tensorflow-macos
pip install tensorflow-metal # GPU高速化
TensorFlow 2.x の主な変更点
TensorFlow 2.0 の最大の変更点は、Eager Executionがデフォルトで有効になったことです。TF 1.x では計算グラフを先に定義してSessionで実行する必要がありましたが、TF 2.x では通常のPythonコードのようにすぐに実行されます。
import tensorflow as tf
print(tf.__version__)
# Eager executionを確認
print(tf.executing_eagerly()) # True
# グラフ実行(TF 1.x互換スタイル)
@tf.function
def compute(x, y):
return x + y
result = compute(tf.constant(1.0), tf.constant(2.0))
print(result) # tf.Tensor(3.0, shape=(), dtype=float32)
GPU設定の確認
import tensorflow as tf
# 利用可能なGPUのリスト
gpus = tf.config.list_physical_devices('GPU')
print("利用可能なGPU:", gpus)
# メモリ増加を有効化(OOMエラーを防止)
if gpus:
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
# 特定のGPUのみを使用
if gpus:
tf.config.set_visible_devices(gpus[0], 'GPU')
# 1つの物理GPUを複数の論理GPUに分割
if gpus:
tf.config.set_logical_device_configuration(
gpus[0],
[tf.config.LogicalDeviceConfiguration(memory_limit=2048),
tf.config.LogicalDeviceConfiguration(memory_limit=2048)]
)
# 演算が実行されるデバイスを確認
with tf.device('/GPU:0'):
a = tf.constant([[1.0, 2.0], [3.0, 4.0]])
b = tf.constant([[5.0, 6.0], [7.0, 8.0]])
c = tf.matmul(a, b)
print(c.device)
2. TensorFlowテンソルの基礎
テンソルはTensorFlowのコアデータ構造です。NumPy配列に似ていますが、GPUで高速化でき、自動微分もサポートしています。
tf.constant と tf.Variable
import tensorflow as tf
import numpy as np
# スカラー(0次元テンソル)
scalar = tf.constant(42)
print(scalar) # tf.Tensor(42, shape=(), dtype=int32)
print(scalar.dtype) # tf.int32
print(scalar.shape) # ()
# ベクトル(1次元テンソル)
vector = tf.constant([1.0, 2.0, 3.0])
print(vector) # tf.Tensor([1. 2. 3.], shape=(3,), dtype=float32)
# 行列(2次元テンソル)
matrix = tf.constant([[1, 2, 3],
[4, 5, 6]], dtype=tf.float32)
print(matrix.shape) # (2, 3)
# 3次元テンソル
tensor_3d = tf.constant([[[1, 2], [3, 4]],
[[5, 6], [7, 8]]])
print(tensor_3d.shape) # (2, 2, 2)
# 特殊テンソル
zeros = tf.zeros([3, 4]) # 3x4のゼロ行列
ones = tf.ones([2, 3]) # 2x3の1行列
identity = tf.eye(4) # 4x4の単位行列
random = tf.random.normal([3, 3]) # 3x3の正規乱数行列
# tf.Variable - 学習可能なパラメータ(変更可能)
var = tf.Variable([1.0, 2.0, 3.0])
print(var) # <tf.Variable 'Variable:0' ...>
var.assign([4.0, 5.0, 6.0]) # 値を更新
var.assign_add([1.0, 1.0, 1.0]) # 値を加算
var.assign_sub([0.5, 0.5, 0.5]) # 値を減算
テンソル演算
import tensorflow as tf
a = tf.constant([[1.0, 2.0], [3.0, 4.0]])
b = tf.constant([[5.0, 6.0], [7.0, 8.0]])
# 基本演算(要素ごと)
print(a + b) # 加算
print(a - b) # 減算
print(a * b) # 乗算(要素ごと)
print(a / b) # 除算
print(a ** 2) # べき乗
# TF関数の等価形式
print(tf.add(a, b))
print(tf.subtract(a, b))
print(tf.multiply(a, b))
print(tf.divide(a, b))
# 行列乗算
print(tf.matmul(a, b)) # または a @ b
print(a @ b)
# 数学関数
x = tf.constant([1.0, 4.0, 9.0, 16.0])
print(tf.sqrt(x)) # [1, 2, 3, 4]
print(tf.exp(x)) # e^x
print(tf.math.log(x)) # 自然対数
# 縮約演算
matrix = tf.constant([[1.0, 2.0, 3.0],
[4.0, 5.0, 6.0]])
print(tf.reduce_sum(matrix)) # 合計: 21
print(tf.reduce_sum(matrix, axis=0)) # 列方向の合計: [5, 7, 9]
print(tf.reduce_sum(matrix, axis=1)) # 行方向の合計: [6, 15]
print(tf.reduce_mean(matrix)) # 平均
print(tf.reduce_max(matrix)) # 最大値
print(tf.reduce_min(matrix)) # 最小値
print(tf.argmax(matrix, axis=1)) # 行ごとの最大値インデックス
# 比較演算
print(tf.equal(a, b))
print(tf.greater(a, b))
print(tf.less_equal(a, b))
形状変換
import tensorflow as tf
t = tf.constant([[1, 2, 3, 4],
[5, 6, 7, 8]])
print(t.shape) # (2, 4)
# reshape
reshaped = tf.reshape(t, [4, 2])
print(reshaped.shape) # (4, 2)
reshaped2 = tf.reshape(t, [8])
print(reshaped2.shape) # (8,)
reshaped3 = tf.reshape(t, [-1, 2]) # -1 は自動推論
print(reshaped3.shape) # (4, 2)
# transpose
transposed = tf.transpose(t)
print(transposed.shape) # (4, 2)
# 高次元のtranspose
t3d = tf.random.normal([2, 3, 4])
transposed_3d = tf.transpose(t3d, perm=[0, 2, 1])
print(transposed_3d.shape) # (2, 4, 3)
# expand_dims - 次元を追加
t1d = tf.constant([1, 2, 3])
print(t1d.shape) # (3,)
expanded_0 = tf.expand_dims(t1d, axis=0)
print(expanded_0.shape) # (1, 3)
expanded_1 = tf.expand_dims(t1d, axis=1)
print(expanded_1.shape) # (3, 1)
# squeeze - サイズ1の次元を削除
t_squeezable = tf.constant([[[1, 2, 3]]])
print(t_squeezable.shape) # (1, 1, 3)
squeezed = tf.squeeze(t_squeezable)
print(squeezed.shape) # (3,)
# concat と stack
a = tf.constant([[1, 2], [3, 4]])
b = tf.constant([[5, 6], [7, 8]])
concat_0 = tf.concat([a, b], axis=0)
print(concat_0.shape) # (4, 2)
concat_1 = tf.concat([a, b], axis=1)
print(concat_1.shape) # (2, 4)
stacked = tf.stack([a, b], axis=0)
print(stacked.shape) # (2, 2, 2)
ブロードキャスト
import tensorflow as tf
# スカラーのブロードキャスト
matrix = tf.constant([[1.0, 2.0], [3.0, 4.0]])
print(matrix + 10) # 全要素に10を加算
# ベクトルのブロードキャスト
row_vector = tf.constant([10.0, 20.0]) # shape (2,)
print(matrix + row_vector) # 各行にrow_vectorを加算
col_vector = tf.constant([[10.0], [20.0]]) # shape (2, 1)
print(matrix + col_vector) # 各列にcol_vectorを加算
# バッチ演算でのブロードキャスト
batch = tf.random.normal([32, 128]) # バッチサイズ32、128特徴
mean = tf.reduce_mean(batch, axis=0, keepdims=True) # shape (1, 128)
std = tf.math.reduce_std(batch, axis=0, keepdims=True)
normalized = (batch - mean) / (std + 1e-8) # ブロードキャスト適用
NumPyとの相互運用性
import tensorflow as tf
import numpy as np
# NumPy -> TensorFlow
np_array = np.array([[1.0, 2.0], [3.0, 4.0]])
tf_tensor = tf.constant(np_array)
tf_tensor2 = tf.convert_to_tensor(np_array)
# TensorFlow -> NumPy
numpy_from_tf = tf_tensor.numpy()
print(type(numpy_from_tf)) # numpy.ndarray
# tf.Tensor はほとんどのNumPy関数を直接サポート
print(np.sin(tf_tensor))
print(np.sqrt(tf_tensor))
3. Keras Sequential API
Sequential APIはKerasでモデルを構築する最もシンプルな方法です。レイヤーを線形に積み重ねます。
レイヤーの積み重ねとモデルのコンパイル
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
# 方法1: add() メソッド
model = keras.Sequential()
model.add(layers.Dense(128, activation='relu', input_shape=(784,)))
model.add(layers.Dropout(0.2))
model.add(layers.Dense(64, activation='relu'))
model.add(layers.Dropout(0.2))
model.add(layers.Dense(10, activation='softmax'))
# 方法2: リスト定義
model = keras.Sequential([
layers.Dense(128, activation='relu', input_shape=(784,)),
layers.Dropout(0.2),
layers.Dense(64, activation='relu'),
layers.Dropout(0.2),
layers.Dense(10, activation='softmax')
])
# モデルアーキテクチャを確認
model.summary()
# コンパイル
model.compile(
optimizer='adam', # または keras.optimizers.Adam(learning_rate=0.001)
loss='sparse_categorical_crossentropy', # 整数ラベル
metrics=['accuracy']
)
# 各種オプティマイザと損失関数
model.compile(
optimizer=keras.optimizers.SGD(learning_rate=0.01, momentum=0.9),
loss=keras.losses.CategoricalCrossentropy(),
metrics=[keras.metrics.CategoricalAccuracy()]
)
MNIST分類の完全なサンプル
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import matplotlib.pyplot as plt
# データの読み込み
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
# 前処理
x_train = x_train.astype('float32') / 255.0
x_test = x_test.astype('float32') / 255.0
x_train = x_train.reshape(-1, 784)
x_test = x_test.reshape(-1, 784)
print(f"学習データ: {x_train.shape}, ラベル: {y_train.shape}")
print(f"テストデータ: {x_test.shape}, ラベル: {y_test.shape}")
# モデル定義
model = keras.Sequential([
layers.Dense(256, activation='relu', input_shape=(784,)),
layers.BatchNormalization(),
layers.Dropout(0.3),
layers.Dense(128, activation='relu'),
layers.BatchNormalization(),
layers.Dropout(0.3),
layers.Dense(10, activation='softmax')
])
model.summary()
# コンパイル
model.compile(
optimizer=keras.optimizers.Adam(learning_rate=1e-3),
loss='sparse_categorical_crossentropy',
metrics=['accuracy']
)
# 学習
history = model.fit(
x_train, y_train,
batch_size=128,
epochs=20,
validation_split=0.1,
verbose=1
)
# 評価
test_loss, test_acc = model.evaluate(x_test, y_test, verbose=0)
print(f"テスト損失: {test_loss:.4f}")
print(f"テスト精度: {test_acc:.4f}")
# 予測
predictions = model.predict(x_test[:10])
predicted_classes = tf.argmax(predictions, axis=1)
print("予測:", predicted_classes.numpy())
print("正解:", y_test[:10])
# 学習曲線のプロット
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))
ax1.plot(history.history['loss'], label='訓練損失')
ax1.plot(history.history['val_loss'], label='検証損失')
ax1.set_title('損失')
ax1.set_xlabel('エポック')
ax1.legend()
ax2.plot(history.history['accuracy'], label='訓練精度')
ax2.plot(history.history['val_accuracy'], label='検証精度')
ax2.set_title('精度')
ax2.set_xlabel('エポック')
ax2.legend()
plt.tight_layout()
plt.savefig('mnist_training.png')
plt.show()
4. Keras Functional API
Functional APIを使うと、複数入出力、残差接続、共有レイヤーなど、より複雑なモデルアーキテクチャを構築できます。
Functional APIの基本的な使い方
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
# 入力の定義
inputs = keras.Input(shape=(784,), name='input_layer')
# レイヤーを関数として接続
x = layers.Dense(256, activation='relu', name='dense_1')(inputs)
x = layers.BatchNormalization(name='bn_1')(x)
x = layers.Dropout(0.3, name='dropout_1')(x)
x = layers.Dense(128, activation='relu', name='dense_2')(x)
x = layers.BatchNormalization(name='bn_2')(x)
x = layers.Dropout(0.3, name='dropout_2')(x)
outputs = layers.Dense(10, activation='softmax', name='output')(x)
# モデルの作成
model = keras.Model(inputs=inputs, outputs=outputs, name='mnist_model')
model.summary()
複数入出力モデル
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
# 例:画像とメタデータを組み合わせる
# 画像入力
image_input = keras.Input(shape=(32, 32, 3), name='image')
x1 = layers.Conv2D(32, 3, activation='relu')(image_input)
x1 = layers.GlobalAveragePooling2D()(x1)
x1 = layers.Dense(64, activation='relu')(x1)
# メタデータ入力
meta_input = keras.Input(shape=(10,), name='metadata')
x2 = layers.Dense(32, activation='relu')(meta_input)
# マージ
combined = layers.concatenate([x1, x2])
combined = layers.Dense(64, activation='relu')(combined)
# 複数出力
main_output = layers.Dense(1, activation='sigmoid', name='main_output')(combined)
aux_output = layers.Dense(5, activation='softmax', name='aux_output')(combined)
# モデルの構築
model = keras.Model(
inputs=[image_input, meta_input],
outputs=[main_output, aux_output]
)
# 出力ごとの損失と重みでコンパイル
model.compile(
optimizer='adam',
loss={
'main_output': 'binary_crossentropy',
'aux_output': 'categorical_crossentropy'
},
loss_weights={
'main_output': 1.0,
'aux_output': 0.2
},
metrics={
'main_output': ['accuracy'],
'aux_output': ['accuracy']
}
)
残差接続
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
def residual_block(x, filters, stride=1):
"""ResNetスタイルの残差ブロック"""
shortcut = x
# メインパス
x = layers.Conv2D(filters, 3, strides=stride, padding='same')(x)
x = layers.BatchNormalization()(x)
x = layers.ReLU()(x)
x = layers.Conv2D(filters, 3, padding='same')(x)
x = layers.BatchNormalization()(x)
# 次元が異なる場合はショートカットを調整
if stride != 1 or shortcut.shape[-1] != filters:
shortcut = layers.Conv2D(filters, 1, strides=stride)(shortcut)
shortcut = layers.BatchNormalization()(shortcut)
# 残差を加算
x = layers.add([x, shortcut])
x = layers.ReLU()(x)
return x
# ResNetスタイルのモデルを構築
inputs = keras.Input(shape=(32, 32, 3))
x = layers.Conv2D(64, 7, strides=2, padding='same')(inputs)
x = layers.BatchNormalization()(x)
x = layers.ReLU()(x)
x = layers.MaxPooling2D(3, strides=2, padding='same')(x)
x = residual_block(x, 64)
x = residual_block(x, 64)
x = residual_block(x, 128, stride=2)
x = residual_block(x, 128)
x = residual_block(x, 256, stride=2)
x = residual_block(x, 256)
x = layers.GlobalAveragePooling2D()(x)
x = layers.Dense(256, activation='relu')(x)
outputs = layers.Dense(10, activation='softmax')(x)
model = keras.Model(inputs, outputs, name='mini_resnet')
model.summary()
共有レイヤー(Siameseネットワーク)
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
# 共有エンコーダを定義
shared_encoder = keras.Sequential([
layers.Dense(128, activation='relu'),
layers.Dense(64, activation='relu'),
layers.Dense(32)
], name='shared_encoder')
# 2つの入力
input_a = keras.Input(shape=(100,), name='input_a')
input_b = keras.Input(shape=(100,), name='input_b')
# 同じエンコーダで処理
encoded_a = shared_encoder(input_a)
encoded_b = shared_encoder(input_b)
# コサイン類似度
similarity = layers.Dot(axes=1, normalize=True)([encoded_a, encoded_b])
output = layers.Dense(1, activation='sigmoid')(similarity)
siamese_model = keras.Model(inputs=[input_a, input_b], outputs=output)
siamese_model.summary()
5. Keras Subclassing API
Subclassing APIはPyTorchに似た最も柔軟なアプローチです。カスタムレイヤーとモデルをPythonクラスとして定義します。
カスタムレイヤー
import tensorflow as tf
from tensorflow import keras
class MyDenseLayer(keras.layers.Layer):
def __init__(self, units, activation=None, **kwargs):
super().__init__(**kwargs)
self.units = units
self.activation = keras.activations.get(activation)
def build(self, input_shape):
# 重みを初期化(buildは初回呼び出し時に一度だけ実行)
self.w = self.add_weight(
name='kernel',
shape=(input_shape[-1], self.units),
initializer='glorot_uniform',
trainable=True
)
self.b = self.add_weight(
name='bias',
shape=(self.units,),
initializer='zeros',
trainable=True
)
super().build(input_shape)
def call(self, inputs, training=False):
output = tf.matmul(inputs, self.w) + self.b
if self.activation is not None:
output = self.activation(output)
return output
def get_config(self):
config = super().get_config()
config.update({'units': self.units, 'activation': keras.activations.serialize(self.activation)})
return config
# 使用例
layer = MyDenseLayer(64, activation='relu')
x = tf.random.normal([32, 128])
y = layer(x)
print(y.shape) # (32, 64)
class MultiHeadSelfAttention(keras.layers.Layer):
"""シンプルなマルチヘッド自己注意レイヤー"""
def __init__(self, embed_dim, num_heads, **kwargs):
super().__init__(**kwargs)
self.embed_dim = embed_dim
self.num_heads = num_heads
self.head_dim = embed_dim // num_heads
assert self.head_dim * num_heads == embed_dim
self.wq = keras.layers.Dense(embed_dim)
self.wk = keras.layers.Dense(embed_dim)
self.wv = keras.layers.Dense(embed_dim)
self.wo = keras.layers.Dense(embed_dim)
def split_heads(self, x, batch_size):
x = tf.reshape(x, (batch_size, -1, self.num_heads, self.head_dim))
return tf.transpose(x, perm=[0, 2, 1, 3])
def call(self, x, training=False):
batch_size = tf.shape(x)[0]
seq_len = tf.shape(x)[1]
q = self.split_heads(self.wq(x), batch_size)
k = self.split_heads(self.wk(x), batch_size)
v = self.split_heads(self.wv(x), batch_size)
# スケールドドット積アテンション
scale = tf.cast(self.head_dim, tf.float32) ** 0.5
scores = tf.matmul(q, k, transpose_b=True) / scale
weights = tf.nn.softmax(scores, axis=-1)
context = tf.matmul(weights, v)
# ヘッドをマージ
context = tf.transpose(context, perm=[0, 2, 1, 3])
context = tf.reshape(context, (batch_size, seq_len, self.embed_dim))
return self.wo(context)
カスタムモデル(Model Subclassing)
import tensorflow as tf
from tensorflow import keras
class ResidualBlock(keras.layers.Layer):
def __init__(self, filters, **kwargs):
super().__init__(**kwargs)
self.conv1 = keras.layers.Conv2D(filters, 3, padding='same')
self.conv2 = keras.layers.Conv2D(filters, 3, padding='same')
self.bn1 = keras.layers.BatchNormalization()
self.bn2 = keras.layers.BatchNormalization()
self.relu = keras.layers.ReLU()
def call(self, inputs, training=False):
x = self.conv1(inputs)
x = self.bn1(x, training=training)
x = self.relu(x)
x = self.conv2(x)
x = self.bn2(x, training=training)
x = x + inputs # 残差接続
return self.relu(x)
class CustomCNN(keras.Model):
def __init__(self, num_classes=10, **kwargs):
super().__init__(**kwargs)
self.conv_stem = keras.layers.Conv2D(32, 3, padding='same', activation='relu')
self.res_block1 = ResidualBlock(32)
self.res_block2 = ResidualBlock(32)
self.pool = keras.layers.MaxPooling2D(2)
self.conv_expand = keras.layers.Conv2D(64, 3, padding='same', activation='relu')
self.res_block3 = ResidualBlock(64)
self.gap = keras.layers.GlobalAveragePooling2D()
self.dropout = keras.layers.Dropout(0.5)
self.fc = keras.layers.Dense(num_classes, activation='softmax')
def call(self, inputs, training=False):
x = self.conv_stem(inputs)
x = self.res_block1(x, training=training)
x = self.res_block2(x, training=training)
x = self.pool(x)
x = self.conv_expand(x)
x = self.res_block3(x, training=training)
x = self.gap(x)
x = self.dropout(x, training=training)
return self.fc(x)
# 使用例
model = CustomCNN(num_classes=10)
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
# ダミーデータでテスト
dummy_input = tf.random.normal([4, 32, 32, 3])
output = model(dummy_input, training=False)
print(output.shape) # (4, 10)
model.summary()
6. CNN実装 (CIFAR-10)
基本的なCNNモデル
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
# CIFAR-10を読み込む
(x_train, y_train), (x_test, y_test) = keras.datasets.cifar10.load_data()
x_train = x_train.astype('float32') / 255.0
x_test = x_test.astype('float32') / 255.0
class_names = ['飛行機', '自動車', '鳥', '猫', '鹿',
'犬', 'カエル', '馬', '船', 'トラック']
print(f"学習データ: {x_train.shape}") # (50000, 32, 32, 3)
print(f"テストデータ: {x_test.shape}") # (10000, 32, 32, 3)
# CNNモデルの定義
def build_cnn_model():
model = keras.Sequential([
# 第1畳み込みブロック
layers.Conv2D(32, (3, 3), padding='same', input_shape=(32, 32, 3)),
layers.BatchNormalization(),
layers.Activation('relu'),
layers.Conv2D(32, (3, 3), padding='same'),
layers.BatchNormalization(),
layers.Activation('relu'),
layers.MaxPooling2D((2, 2)),
layers.Dropout(0.25),
# 第2畳み込みブロック
layers.Conv2D(64, (3, 3), padding='same'),
layers.BatchNormalization(),
layers.Activation('relu'),
layers.Conv2D(64, (3, 3), padding='same'),
layers.BatchNormalization(),
layers.Activation('relu'),
layers.MaxPooling2D((2, 2)),
layers.Dropout(0.25),
# 第3畳み込みブロック
layers.Conv2D(128, (3, 3), padding='same'),
layers.BatchNormalization(),
layers.Activation('relu'),
layers.Conv2D(128, (3, 3), padding='same'),
layers.BatchNormalization(),
layers.Activation('relu'),
layers.GlobalAveragePooling2D(),
layers.Dropout(0.5),
# 全結合層
layers.Dense(256, activation='relu'),
layers.BatchNormalization(),
layers.Dropout(0.5),
layers.Dense(10, activation='softmax')
])
return model
model = build_cnn_model()
model.summary()
model.compile(
optimizer=keras.optimizers.Adam(learning_rate=1e-3),
loss='sparse_categorical_crossentropy',
metrics=['accuracy']
)
データ拡張
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
# Keras組み込みの拡張レイヤー
data_augmentation = keras.Sequential([
layers.RandomFlip('horizontal'),
layers.RandomRotation(0.1),
layers.RandomZoom(0.1),
layers.RandomTranslation(0.1, 0.1),
layers.RandomContrast(0.1),
], name='data_augmentation')
# モデルに拡張を統合
inputs = keras.Input(shape=(32, 32, 3))
x = data_augmentation(inputs) # 学習時のみ適用
x = layers.Rescaling(1./255)(x)
x = layers.Conv2D(32, 3, padding='same', activation='relu')(x)
x = layers.MaxPooling2D()(x)
x = layers.Conv2D(64, 3, padding='same', activation='relu')(x)
x = layers.MaxPooling2D()(x)
x = layers.GlobalAveragePooling2D()(x)
outputs = layers.Dense(10, activation='softmax')(x)
augmented_model = keras.Model(inputs, outputs)
転移学習
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.applications import EfficientNetB0
# EfficientNetB0を使った転移学習
def build_transfer_model(num_classes=10):
# 事前学習済みベースモデル(分類ヘッドなし)
base_model = EfficientNetB0(
include_top=False,
weights='imagenet',
input_shape=(224, 224, 3)
)
# 最初はベースモデルを凍結
base_model.trainable = False
inputs = keras.Input(shape=(224, 224, 3))
# EfficientNetは内部で前処理を含む
x = base_model(inputs, training=False)
x = layers.GlobalAveragePooling2D()(x)
x = layers.Dense(256, activation='relu')(x)
x = layers.Dropout(0.5)(x)
outputs = layers.Dense(num_classes, activation='softmax')(x)
model = keras.Model(inputs, outputs)
return model, base_model
model, base_model = build_transfer_model(num_classes=10)
model.compile(
optimizer=keras.optimizers.Adam(1e-3),
loss='sparse_categorical_crossentropy',
metrics=['accuracy']
)
# フェーズ1: 凍結したベースで分類器を学習
# model.fit(train_dataset, epochs=10, ...)
# ファインチューニング: 一部のベース層を解凍
def fine_tune(model, base_model, fine_tune_at=100):
base_model.trainable = True
# fine_tune_at より前の層を凍結したまま
for layer in base_model.layers[:fine_tune_at]:
layer.trainable = False
model.compile(
optimizer=keras.optimizers.Adam(1e-5), # 非常に低い学習率
loss='sparse_categorical_crossentropy',
metrics=['accuracy']
)
return model
7. RNN/LSTM実装
時系列予測(LSTM)
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np
# 合成時系列データの生成
def generate_time_series(n_samples, n_steps):
t = np.linspace(0, 4 * np.pi, n_steps + 1)
series = np.sin(t) + 0.1 * np.random.randn(n_samples, n_steps + 1)
X = series[:, :-1].reshape(-1, n_steps, 1)
y = series[:, 1:].reshape(-1, n_steps, 1)
return X, y
X_train, y_train = generate_time_series(10000, 50)
X_val, y_val = generate_time_series(1000, 50)
X_test, y_test = generate_time_series(1000, 50)
# LSTMモデル
def build_lstm_model(n_steps=50):
model = keras.Sequential([
layers.LSTM(64, return_sequences=True, input_shape=(n_steps, 1)),
layers.Dropout(0.2),
layers.LSTM(64, return_sequences=True),
layers.Dropout(0.2),
layers.TimeDistributed(layers.Dense(1))
])
model.compile(
optimizer='adam',
loss='mse',
metrics=['mae']
)
return model
model = build_lstm_model()
model.summary()
history = model.fit(
X_train, y_train,
epochs=20,
batch_size=128,
validation_data=(X_val, y_val)
)
テキスト生成(文字レベルRNN)
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np
# サンプルテキスト(実際には大きなコーパスを使用)
text = """TensorFlowはGoogleが開発した深層学習フレームワークです。
KerasはTensorFlowの上で動作する高レベルAPIです。
両ライブラリを使えば、強力な深層学習モデルを素早く構築できます。"""
# 文字セットを作成
chars = sorted(set(text))
char2idx = {c: i for i, c in enumerate(chars)}
idx2char = np.array(chars)
vocab_size = len(chars)
# テキストをインデックスに変換
text_as_int = np.array([char2idx[c] for c in text])
# シーケンスを作成
seq_length = 50
sequences = tf.data.Dataset.from_tensor_slices(text_as_int)
sequences = sequences.batch(seq_length + 1, drop_remainder=True)
def split_input_target(chunk):
return chunk[:-1], chunk[1:]
dataset = sequences.map(split_input_target)
dataset = dataset.shuffle(100).batch(32, drop_remainder=True)
# 文字レベルRNNモデル
def build_char_rnn(vocab_size, embed_dim=64, rnn_units=256):
model = keras.Sequential([
layers.Embedding(vocab_size, embed_dim),
layers.GRU(rnn_units, return_sequences=True, stateful=False),
layers.GRU(rnn_units, return_sequences=True),
layers.Dense(vocab_size)
])
return model
model = build_char_rnn(vocab_size)
model.compile(
optimizer='adam',
loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics=['accuracy']
)
# テキスト生成関数
def generate_text(model, start_string, num_generate=100, temperature=1.0):
input_eval = [char2idx[s] for s in start_string]
input_eval = tf.expand_dims(input_eval, 0)
text_generated = []
for _ in range(num_generate):
predictions = model(input_eval)
predictions = tf.squeeze(predictions, 0) / temperature
predicted_id = tf.random.categorical(predictions, num_samples=1)[-1, 0].numpy()
input_eval = tf.expand_dims([predicted_id], 0)
text_generated.append(idx2char[predicted_id])
return start_string + ''.join(text_generated)
双方向LSTM
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
# テキスト分類のための双方向LSTM
def build_bidirectional_model(vocab_size, max_len=100, embed_dim=64):
model = keras.Sequential([
layers.Embedding(vocab_size, embed_dim, input_length=max_len),
layers.Bidirectional(layers.LSTM(64, return_sequences=True)),
layers.Bidirectional(layers.LSTM(32)),
layers.Dense(64, activation='relu'),
layers.Dropout(0.5),
layers.Dense(1, activation='sigmoid')
])
model.compile(
optimizer='adam',
loss='binary_crossentropy',
metrics=['accuracy']
)
return model
model = build_bidirectional_model(vocab_size=10000, max_len=100)
model.summary()
8. KerasによるTransformer
マルチヘッドアテンションとTransformerエンコーダ
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
class TransformerBlock(keras.layers.Layer):
def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1, **kwargs):
super().__init__(**kwargs)
self.att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
self.ffn = keras.Sequential([
layers.Dense(ff_dim, activation='relu'),
layers.Dense(embed_dim)
])
self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
self.dropout1 = layers.Dropout(rate)
self.dropout2 = layers.Dropout(rate)
def call(self, inputs, training=False):
attn_output = self.att(inputs, inputs)
attn_output = self.dropout1(attn_output, training=training)
out1 = self.layernorm1(inputs + attn_output)
ffn_output = self.ffn(out1)
ffn_output = self.dropout2(ffn_output, training=training)
return self.layernorm2(out1 + ffn_output)
class TokenAndPositionEmbedding(keras.layers.Layer):
def __init__(self, maxlen, vocab_size, embed_dim, **kwargs):
super().__init__(**kwargs)
self.token_emb = layers.Embedding(vocab_size, embed_dim)
self.pos_emb = layers.Embedding(maxlen, embed_dim)
def call(self, x):
maxlen = tf.shape(x)[-1]
positions = tf.range(start=0, limit=maxlen, delta=1)
positions = self.pos_emb(positions)
x = self.token_emb(x)
return x + positions
# テキスト分類のTransformer
def build_transformer_classifier(
vocab_size=20000,
maxlen=200,
embed_dim=32,
num_heads=2,
ff_dim=32,
num_classes=2
):
inputs = layers.Input(shape=(maxlen,))
embedding_layer = TokenAndPositionEmbedding(maxlen, vocab_size, embed_dim)
x = embedding_layer(inputs)
transformer_block = TransformerBlock(embed_dim, num_heads, ff_dim)
x = transformer_block(x)
x = layers.GlobalAveragePooling1D()(x)
x = layers.Dropout(0.1)(x)
x = layers.Dense(20, activation='relu')(x)
x = layers.Dropout(0.1)(x)
outputs = layers.Dense(num_classes, activation='softmax')(x)
model = keras.Model(inputs=inputs, outputs=outputs)
return model
model = build_transformer_classifier()
model.compile(
optimizer='adam',
loss='sparse_categorical_crossentropy',
metrics=['accuracy']
)
model.summary()
9. データパイプライン (tf.data)
tf.data.Datasetの基本的な使い方
import tensorflow as tf
import numpy as np
# テンソルからDatasetを作成
X = np.random.randn(1000, 10)
y = np.random.randint(0, 2, 1000)
dataset = tf.data.Dataset.from_tensor_slices((X, y))
print(dataset) # TensorSliceDataset
# 基本変換
dataset = (dataset
.shuffle(buffer_size=1000)
.batch(32)
.prefetch(tf.data.AUTOTUNE)
)
# データの確認
for batch_X, batch_y in dataset.take(1):
print(f"バッチX形状: {batch_X.shape}") # (32, 10)
print(f"バッチy形状: {batch_y.shape}") # (32,)
# map変換
def preprocess(x, y):
x = tf.cast(x, tf.float32)
y = tf.cast(y, tf.int32)
x = (x - tf.reduce_mean(x)) / tf.math.reduce_std(x)
return x, y
dataset = tf.data.Dataset.from_tensor_slices((X, y))
dataset = dataset.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE)
# filter変換
positive_dataset = dataset.filter(lambda x, y: y == 1)
10. 高度な学習テクニック
コールバック
import tensorflow as tf
from tensorflow import keras
import os
# ModelCheckpoint: 最良モデルを保存
checkpoint_cb = keras.callbacks.ModelCheckpoint(
filepath='best_model.keras',
monitor='val_accuracy',
mode='max',
save_best_only=True,
verbose=1
)
# EarlyStopping: 過学習を防止
early_stopping_cb = keras.callbacks.EarlyStopping(
monitor='val_loss',
patience=10,
restore_best_weights=True,
verbose=1
)
# ReduceLROnPlateau: 学習率を下げる
reduce_lr_cb = keras.callbacks.ReduceLROnPlateau(
monitor='val_loss',
factor=0.5,
patience=5,
min_lr=1e-7,
verbose=1
)
# TensorBoard: 可視化
tensorboard_cb = keras.callbacks.TensorBoard(
log_dir='logs/',
histogram_freq=1,
write_graph=True,
write_images=True,
update_freq='epoch'
)
callbacks = [
checkpoint_cb,
early_stopping_cb,
reduce_lr_cb,
tensorboard_cb,
]
カスタム学習ループ (GradientTape)
import tensorflow as tf
from tensorflow import keras
import time
# シンプルな分類モデル
model = keras.Sequential([
keras.layers.Dense(64, activation='relu', input_shape=(784,)),
keras.layers.Dense(64, activation='relu'),
keras.layers.Dense(10)
])
# 損失関数とオプティマイザ
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)
optimizer = keras.optimizers.Adam(learning_rate=1e-3)
# メトリクス
train_loss = keras.metrics.Mean(name='train_loss')
train_accuracy = keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')
# 1ステップの学習
@tf.function # JITコンパイルで高速化
def train_step(images, labels):
with tf.GradientTape() as tape:
predictions = model(images, training=True)
loss = loss_fn(labels, predictions)
# L2正則化
l2_loss = tf.add_n([tf.nn.l2_loss(v) for v in model.trainable_variables
if 'bias' not in v.name])
total_loss = loss + 1e-4 * l2_loss
gradients = tape.gradient(total_loss, model.trainable_variables)
# 勾配クリッピング
gradients, _ = tf.clip_by_global_norm(gradients, 1.0)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
train_loss.update_state(loss)
train_accuracy.update_state(labels, predictions)
return loss
分散学習 (tf.distribute.Strategy)
import tensorflow as tf
from tensorflow import keras
# マルチGPU戦略
strategy = tf.distribute.MirroredStrategy()
print(f"デバイス数: {strategy.num_replicas_in_sync}")
with strategy.scope():
# strategy.scope() の内側でモデルを定義してコンパイル
model = keras.Sequential([
keras.layers.Dense(64, activation='relu', input_shape=(784,)),
keras.layers.Dense(64, activation='relu'),
keras.layers.Dense(10, activation='softmax')
])
model.compile(
optimizer='adam',
loss='sparse_categorical_crossentropy',
metrics=['accuracy']
)
# GPU数に比例してバッチサイズをスケール
BATCH_SIZE_PER_REPLICA = 64
BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync
# 混合精度学習
from tensorflow.keras import mixed_precision
policy = mixed_precision.Policy('mixed_float16')
mixed_precision.set_global_policy(policy)
11. TensorBoard可視化
基本的なTensorBoard使用法
import tensorflow as tf
from tensorflow import keras
import datetime
# ログディレクトリ
log_dir = "logs/fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
# コールバック設定
tensorboard_callback = keras.callbacks.TensorBoard(
log_dir=log_dir,
histogram_freq=1, # 重みヒストグラムの頻度
write_graph=True, # 計算グラフを記録
write_images=True, # 重みの画像を記録
update_freq='epoch', # 更新頻度
profile_batch=2 # プロファイリングのバッチ
)
# TensorBoardの起動コマンド:
# tensorboard --logdir logs/fit
12. モデルの保存と変換
SavedModel形式
import tensorflow as tf
from tensorflow import keras
# モデルを保存
model.save('saved_model/my_model')
# 読み込み
loaded_model = keras.models.load_model('saved_model/my_model')
# HDF5形式(旧形式)
model.save('my_model.h5')
# 重みのみ保存/読み込み
model.save_weights('model_weights.h5')
model.load_weights('model_weights.h5')
# Keras形式(推奨)
model.save('my_model.keras')
loaded_keras = keras.models.load_model('my_model.keras')
TensorFlow Lite変換(モバイル/エッジ)
import tensorflow as tf
import numpy as np
# 基本的なTFLite変換
converter = tf.lite.TFLiteConverter.from_saved_model('saved_model/my_model')
tflite_model = converter.convert()
with open('model.tflite', 'wb') as f:
f.write(tflite_model)
# 動的範囲量子化
converter = tf.lite.TFLiteConverter.from_saved_model('saved_model/my_model')
converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_quantized = converter.convert()
# 完全整数量子化(INT8)
def representative_dataset():
for _ in range(100):
data = np.random.random((1, 784)).astype(np.float32)
yield [data]
converter = tf.lite.TFLiteConverter.from_saved_model('saved_model/my_model')
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.representative_dataset = representative_dataset
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
converter.inference_input_type = tf.int8
converter.inference_output_type = tf.int8
tflite_int8 = converter.convert()
13. TF-Servingによる本番デプロイ
TensorFlow Servingのセットアップ
# DockerでTF Servingを実行(最も簡単な方法)
docker pull tensorflow/serving
# モデルを提供
docker run -t --rm \
-p 8501:8501 \
-v "/path/to/saved_model:/models/my_model" \
-e MODEL_NAME=my_model \
tensorflow/serving
REST APIで予測を実行
import requests
import json
import numpy as np
# REST APIリクエスト
url = 'http://localhost:8501/v1/models/my_model:predict'
# 入力データを準備
data = np.random.random((1, 784)).astype(float)
payload = {
"instances": data.tolist()
}
response = requests.post(url, json=payload)
result = json.loads(response.text)
print("予測:", result['predictions'])
14. TensorFlow Extended (TFX)
TFXはTensorFlowを基盤とした本番機械学習パイプラインプラットフォームです。
MLパイプラインの概要
import tfx
from tfx.components import (
CsvExampleGen,
StatisticsGen,
SchemaGen,
ExampleValidator,
Transform,
Trainer,
Evaluator,
Pusher
)
from tfx.proto import pusher_pb2, trainer_pb2
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
# インタラクティブコンテキスト(ノートブック向け)
context = InteractiveContext()
# 1. ExampleGen: データ取り込み
example_gen = CsvExampleGen(input_base='data/')
context.run(example_gen)
# 2. StatisticsGen: データ統計
statistics_gen = StatisticsGen(
examples=example_gen.outputs['examples']
)
context.run(statistics_gen)
# 3. SchemaGen: スキーマ生成
schema_gen = SchemaGen(
statistics=statistics_gen.outputs['statistics'],
infer_feature_shape=True
)
context.run(schema_gen)
まとめ
このガイドではTensorFlowとKerasの基本概念から本番デプロイまでを解説しました。
重要なポイント
- Eager Execution: TF 2.x のデフォルト実行モード。グラフ最適化には
@tf.functionを使用 - 3つのKeras API: Sequential(シンプル)、Functional(複雑なトポロジー)、Subclassing(完全カスタマイズ)
- tf.data: map、filter、batch、shuffle、prefetchを使った効率的なデータパイプラインに不可欠
- GradientTape: カスタム学習ループと自動微分
- デプロイオプション: TF Serving(サーバー)、TFLite(モバイル/エッジ)、TF.js(ブラウザ)
- TFX: 本番MLパイプラインのスタンダード
クイズ
Q1. Kerasの3種類のモデル構築APIの違いは何ですか?
答え: Sequential APIは線形スタック向け、Functional APIは複雑な非線形トポロジー向け、Subclassing APIは最大の柔軟性が必要な場合に使います。
解説: 入門者はSequential、マルチ入出力モデルはFunctional、カスタム学習ループが必要な場合はSubclassingが適しています。
Q2. tf.VariableとtF.constantの違いは何ですか?
答え: tf.constantは不変のテンソルで、tf.Variableは値を変更できる学習可能なパラメータです。
解説: モデルの重みやバイアスはtf.Variableとして定義します。学習中にassign、assign_add、assign_subで値を更新できます。
Q3. TFLiteとTF Servingの使い分けは?
答え: TFLiteはモバイル・エッジデバイス向け、TF Servingはサーバーサイドの本番APIとして使います。
解説: TFLiteは量子化によるモデル圧縮とエッジでの推論最適化に特化しています。TF ServingはREST/gRPC APIとバージョン管理機能を提供します。