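TensorFlow & Keras Complete Guide: Zero to Hero - From Installation to Production Deployment
Introduction
TensorFlow is a machine learning and deep learning framework that the Google Brain team open-sourced in 2015. It is one of the most widely used deep learning frameworks and supports the whole workflow from research to production deployment. Keras became TensorFlow's official high-level API as of TensorFlow 2.x, which makes model development intuitive and fast.
This guide starts with the basics of TensorFlow and Keras and works step by step up to deployment in a real production environment.
1. Introduction to TensorFlow and Installation
TensorFlow vs. PyTorch
Each framework has its own strengths and weaknesses.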
| Item | TensorFlow/Keras | PyTorch |
|---|---|---|
| Developer | Google | Meta (Facebook) |
| Deployment tools | TF Serving, TFLite, TF.js | TorchServe, TorchScript |
| Production maturity | Very high | High |
| Research popularity | High | Very high |
| Learning curve | Moderate | Gentle |
| Mobile/edge | TFLite (strong) | ExecuTorch |
| Ecosystem | TFX, TFHub, etc. | Strong HuggingFace integration |
Installation
Install with pip as follows.
# CPU only
pip install tensorflow
# GPU support (pip also installs the matching CUDA libraries; TF 2.14+)
pip install tensorflow[and-cuda]
# A specific version
pip install tensorflow==2.15.0
# conda environment
conda create -n tf_env python=3.10
conda activate tf_env
conda install -c conda-forge tensorflow
On macOS with Apple Silicon, install as follows.
pip install tensorflow-macos
pip install tensorflow-metal # GPU acceleration
Key Changes in TensorFlow 2.x
The biggest change in TensorFlow 2.0 is that eager execution is enabled by default. In TF 1.x you first defined a computation graph and then ran it through a Session; in TF 2.x operations run immediately, like ordinary Python code.
import tensorflow as tf
print(tf.__version__)
# Check that eager execution is enabled
print(tf.executing_eagerly()) # True
# Graph execution: tf.function traces the Python function into a graph
@tf.function
def compute(x, y):
    return x + y
result = compute(tf.constant(1.0), tf.constant(2.0))
print(result) # tf.Tensor(3.0, shape=(), dtype=float32)
Checking the GPU Configuration
import tensorflow as tf
# List the available GPUs
gpus = tf.config.list_physical_devices('GPU')
print("Available GPUs:", gpus)
# The device settings below must be applied before the GPUs are initialized,
# that is, before any op has run on them.
# Allocate GPU memory incrementally (helps avoid OOM)
if gpus:
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
# Use only a specific GPU
if gpus:
    tf.config.set_visible_devices(gpus[0], 'GPU')
# Split one physical GPU into several logical GPUs.
# Treat this as an alternative to memory growth on the same GPU rather than combining the two.
if gpus:
    tf.config.set_logical_device_configuration(
        gpus[0],
        [tf.config.LogicalDeviceConfiguration(memory_limit=2048),
         tf.config.LogicalDeviceConfiguration(memory_limit=2048)]
    )
# Check which device an op runs on
with tf.device('/GPU:0'):
    a = tf.constant([[1.0, 2.0], [3.0, 4.0]])
    b = tf.constant([[5.0, 6.0], [7.0, 8.0]])
    c = tf.matmul(a, b)
print(c.device)
2. TensorFlow Tensor Basics
A tensor is TensorFlow's core data structure. It resembles a NumPy array, but it can be accelerated on a GPU and supports automatic differentiation.
tf.constant and tf.Variable
import tensorflow as tf
import numpy as np
# Scalar (0-D tensor)
scalar = tf.constant(42)
print(scalar) # tf.Tensor(42, shape=(), dtype=int32)
print(scalar.dtype) # tf.int32
print(scalar.shape) # ()
# Vector (1-D tensor)
vector = tf.constant([1.0, 2.0, 3.0])
print(vector) # tf.Tensor([1. 2. 3.], shape=(3,), dtype=float32)
# Matrix (2-D tensor)
matrix = tf.constant([[1, 2, 3],
                      [4, 5, 6]], dtype=tf.float32)
print(matrix.shape) # (2, 3)
# 3-D tensor
tensor_3d = tf.constant([[[1, 2], [3, 4]],
                         [[5, 6], [7, 8]]])
print(tensor_3d.shape) # (2, 2, 2)
# Special tensors
zeros = tf.zeros([3, 4]) # 3x4 matrix filled with 0
ones = tf.ones([2, 3]) # 2x3 matrix filled with 1
identity = tf.eye(4) # 4x4 identity matrix
random = tf.random.normal([3, 3]) # random matrix drawn from a normal distribution
# tf.Variable - used for trainable parameters (the value can be changed)
var = tf.Variable([1.0, 2.0, 3.0])
print(var) # <tf.Variable 'Variable:0' ...>
var.assign([4.0, 5.0, 6.0]) # overwrite the value
var.assign_add([1.0, 1.0, 1.0]) # add in place
var.assign_sub([0.5, 0.5, 0.5]) # subtract in place
Tensor Operations
import tensorflow as tf
a = tf.constant([[1.0, 2.0], [3.0, 4.0]])
b = tf.constant([[5.0, 6.0], [7.0, 8.0]])
# Basic arithmetic (element-wise)
print(a + b) # addition
print(a - b) # subtraction
print(a * b) # multiplication (element-wise)
print(a / b) # division
print(a ** 2) # square
# Equivalent TF functions
print(tf.add(a, b))
print(tf.subtract(a, b))
print(tf.multiply(a, b))
print(tf.divide(a, b))
# Matrix multiplication
print(tf.matmul(a, b)) # same as a @ b
print(a @ b)
# Math functions
x = tf.constant([1.0, 4.0, 9.0, 16.0])
print(tf.sqrt(x)) # [1, 2, 3, 4]
print(tf.exp(x)) # e^x
print(tf.math.log(x)) # natural logarithm
# Reductions
matrix = tf.constant([[1.0, 2.0, 3.0],
                      [4.0, 5.0, 6.0]])
print(tf.reduce_sum(matrix)) # sum of all elements: 21
print(tf.reduce_sum(matrix, axis=0)) # column sums: [5, 7, 9]
print(tf.reduce_sum(matrix, axis=1)) # row sums: [6, 15]
print(tf.reduce_mean(matrix)) # mean
print(tf.reduce_max(matrix)) # maximum
print(tf.reduce_min(matrix)) # minimum
print(tf.argmax(matrix, axis=1)) # index of the maximum in each row
# Comparisons
print(tf.equal(a, b))
print(tf.greater(a, b))
print(tf.less_equal(a, b))
Shape Transformations
import tensorflow as tf
t = tf.constant([[1, 2, 3, 4],
                 [5, 6, 7, 8]])
print(t.shape) # (2, 4)
# reshape
reshaped = tf.reshape(t, [4, 2])
print(reshaped.shape) # (4, 2)
reshaped2 = tf.reshape(t, [8])
print(reshaped2.shape) # (8,)
reshaped3 = tf.reshape(t, [-1, 2]) # -1 is inferred automatically
print(reshaped3.shape) # (4, 2)
# transpose
transposed = tf.transpose(t)
print(transposed.shape) # (4, 2)
# Transposing a higher-rank tensor
t3d = tf.random.normal([2, 3, 4])
transposed_3d = tf.transpose(t3d, perm=[0, 2, 1])
print(transposed_3d.shape) # (2, 4, 3)
# expand_dims - add a dimension
t1d = tf.constant([1, 2, 3])
print(t1d.shape) # (3,)
expanded_0 = tf.expand_dims(t1d, axis=0)
print(expanded_0.shape) # (1, 3)
expanded_1 = tf.expand_dims(t1d, axis=1)
print(expanded_1.shape) # (3, 1)
# squeeze - remove dimensions of size 1
t_squeezable = tf.constant([[[1, 2, 3]]])
print(t_squeezable.shape) # (1, 1, 3)
squeezed = tf.squeeze(t_squeezable)
print(squeezed.shape) # (3,)
# concat and stack
a = tf.constant([[1, 2], [3, 4]])
b = tf.constant([[5, 6], [7, 8]])
concat_0 = tf.concat([a, b], axis=0)
print(concat_0.shape) # (4, 2)
concat_1 = tf.concat([a, b], axis=1)
print(concat_1.shape) # (2, 4)
stacked = tf.stack([a, b], axis=0)
print(stacked.shape) # (2, 2, 2)
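To make the `-1` entry in `tf.reshape` less mysterious, here is a minimal pure-Python sketch of how a single unknown dimension can be inferred from the element count. The helper name `resolve_reshape` is hypothetical and only illustrates the arithmetic; it is not TensorFlow's implementation.

```python
from math import prod

def resolve_reshape(shape, target):
    """Return the concrete target shape, filling in at most one -1 entry.

    Hypothetical helper that mirrors the arithmetic behind reshape's -1.
    """
    target = list(target)
    if target.count(-1) > 1:
        raise ValueError("at most one dimension may be -1")
    total = prod(shape)                           # number of elements in the source
    known = prod(d for d in target if d != -1)    # product of the fixed dimensions
    if -1 in target:
        if known == 0 or total % known != 0:
            raise ValueError(f"cannot reshape {tuple(shape)} into {target}")
        return tuple(total // known if d == -1 else d for d in target)
    if known != total:
        raise ValueError(f"cannot reshape {tuple(shape)} into {target}")
    return tuple(target)

print(resolve_reshape((2, 4), [-1, 2]))   # (4, 2)
print(resolve_reshape((2, 4), [8]))       # (8,)
```

The element count never changes under a reshape, so the missing dimension is simply the total divided by the product of the known dimensions.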
Broadcasting
import tensorflow as tf
# Broadcasting a scalar
matrix = tf.constant([[1.0, 2.0], [3.0, 4.0]])
print(matrix + 10) # adds 10 to every element
# Broadcasting a vector
row_vector = tf.constant([10.0, 20.0]) # shape (2,)
print(matrix + row_vector) # adds row_vector to every row
col_vector = tf.constant([[10.0], [20.0]]) # shape (2, 1)
print(matrix + col_vector) # adds col_vector to every column
# Broadcasting in batch computations
batch = tf.random.normal([32, 128]) # batch size 32, 128 features
mean = tf.reduce_mean(batch, axis=0, keepdims=True) # shape (1, 128)
std = tf.math.reduce_std(batch, axis=0, keepdims=True)
normalized = (batch - mean) / (std + 1e-8) # broadcasting is applied here
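The broadcasting examples above follow the NumPy-style rule: shapes are aligned from the trailing dimension, and two dimensions are compatible when they are equal or one of them is 1. The sketch below works that rule out in plain Python; `broadcast_shape` is a hypothetical helper written for illustration, not a TensorFlow function.

```python
from itertools import zip_longest

def broadcast_shape(a, b):
    """Compute the shape that results from broadcasting shapes a and b."""
    result = []
    # Walk both shapes from the last dimension; missing dimensions count as 1.
    for da, db in zip_longest(reversed(a), reversed(b), fillvalue=1):
        if da == db or db == 1:
            result.append(da)
        elif da == 1:
            result.append(db)
        else:
            raise ValueError(f"shapes {a} and {b} are not broadcastable")
    return tuple(reversed(result))

print(broadcast_shape((2, 2), (2,)))        # matrix + row_vector
print(broadcast_shape((2, 2), (2, 1)))      # matrix + col_vector
print(broadcast_shape((32, 128), (1, 128))) # batch - mean
```

Checking shapes this way before writing an expression is a cheap way to catch a silent `(N,)` versus `(N, 1)` mix-up, which would otherwise broadcast to an `(N, N)` result.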
Converting to and from NumPy
import tensorflow as tf
import numpy as np
# NumPy -> TensorFlow
np_array = np.array([[1.0, 2.0], [3.0, 4.0]])
tf_tensor = tf.constant(np_array)
tf_tensor2 = tf.convert_to_tensor(np_array)
# TensorFlow -> NumPy
numpy_from_tf = tf_tensor.numpy()
print(type(numpy_from_tf)) # numpy.ndarray
# Most NumPy functions accept a tf.Tensor directly
print(np.sin(tf_tensor))
print(np.sqrt(tf_tensor))
3. Keras Sequential API
The Sequential API is the simplest way to build a model in Keras. You stack layers in order to form a linear model.
Stacking Layers and Compiling the Model
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
# Option 1: the add() method
model = keras.Sequential()
model.add(layers.Dense(128, activation='relu', input_shape=(784,)))
model.add(layers.Dropout(0.2))
model.add(layers.Dense(64, activation='relu'))
model.add(layers.Dropout(0.2))
model.add(layers.Dense(10, activation='softmax'))
# Option 2: define the layers as a list
model = keras.Sequential([
    layers.Dense(128, activation='relu', input_shape=(784,)),
    layers.Dropout(0.2),
    layers.Dense(64, activation='relu'),
    layers.Dropout(0.2),
    layers.Dense(10, activation='softmax')
])
# Inspect the model structure
model.summary()
# Compile
model.compile(
    optimizer='adam', # or keras.optimizers.Adam(learning_rate=0.001)
    loss='sparse_categorical_crossentropy', # integer labels
    metrics=['accuracy']
)
# Other optimizers and losses (CategoricalCrossentropy expects one-hot labels)
model.compile(
    optimizer=keras.optimizers.SGD(learning_rate=0.01, momentum=0.9),
    loss=keras.losses.CategoricalCrossentropy(),
    metrics=[keras.metrics.CategoricalAccuracy()]
)
Complete MNIST Classification Example
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import matplotlib.pyplot as plt
# Load the data
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
# Preprocess: scale to [0, 1] and flatten 28x28 images to 784-dim vectors
x_train = x_train.astype('float32') / 255.0
x_test = x_test.astype('float32') / 255.0
x_train = x_train.reshape(-1, 784)
x_test = x_test.reshape(-1, 784)
print(f"Training data: {x_train.shape}, labels: {y_train.shape}")
print(f"Test data: {x_test.shape}, labels: {y_test.shape}")
# Define the model
model = keras.Sequential([
    layers.Dense(256, activation='relu', input_shape=(784,)),
    layers.BatchNormalization(),
    layers.Dropout(0.3),
    layers.Dense(128, activation='relu'),
    layers.BatchNormalization(),
    layers.Dropout(0.3),
    layers.Dense(10, activation='softmax')
])
model.summary()
# Compile
model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=1e-3),
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)
# Train
history = model.fit(
    x_train, y_train,
    batch_size=128,
    epochs=20,
    validation_split=0.1,
    verbose=1
)
# Evaluate
test_loss, test_acc = model.evaluate(x_test, y_test, verbose=0)
print(f"Test loss: {test_loss:.4f}")
print(f"Test accuracy: {test_acc:.4f}")
# Predict
predictions = model.predict(x_test[:10])
predicted_classes = tf.argmax(predictions, axis=1)
print("Predicted:", predicted_classes.numpy())
print("Actual:", y_test[:10])
# Plot the learning curves
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))
ax1.plot(history.history['loss'], label='Training loss')
ax1.plot(history.history['val_loss'], label='Validation loss')
ax1.set_title('Loss')
ax1.set_xlabel('Epoch')
ax1.legend()
ax2.plot(history.history['accuracy'], label='Training accuracy')
ax2.plot(history.history['val_accuracy'], label='Validation accuracy')
ax2.set_title('Accuracy')
ax2.set_xlabel('Epoch')
ax2.legend()
plt.tight_layout()
plt.savefig('mnist_training.png')
plt.show()
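When reading the `model.summary()` output for the MNIST model above, it helps to know where the parameter counts come from. The following sketch recomputes them by hand, assuming the default layer settings: a Dense layer has a kernel plus a bias, and a BatchNormalization layer has two trainable vectors (gamma, beta) and two non-trainable ones (moving mean, moving variance). The helper names are hypothetical.

```python
def dense_params(n_in, n_out):
    """Kernel (n_in x n_out) plus bias (n_out)."""
    return n_in * n_out + n_out

def batchnorm_params(n_features):
    """Return (trainable, non_trainable) counts for default BatchNormalization."""
    return 2 * n_features, 2 * n_features

# 784 -> 256 -> 128 -> 10, with BatchNormalization after each hidden Dense layer
layer_sizes = [784, 256, 128, 10]
trainable = 0
non_trainable = 0
for index, (n_in, n_out) in enumerate(zip(layer_sizes, layer_sizes[1:])):
    trainable += dense_params(n_in, n_out)
    is_hidden = index < len(layer_sizes) - 2   # the output layer has no BatchNormalization
    if is_hidden:
        bn_trainable, bn_non_trainable = batchnorm_params(n_out)
        trainable += bn_trainable
        non_trainable += bn_non_trainable

total = trainable + non_trainable
print(f"trainable={trainable}, non-trainable={non_trainable}, total={total}")
# trainable=235914, non-trainable=768, total=236682
```

Dropout contributes no parameters, which is why it does not appear in the arithmetic.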
4. Keras Functional API
The Functional API lets you build more complex architectures, including models with multiple inputs or outputs, residual connections, and shared layers.
Basic Functional API Usage
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
# Define the input
inputs = keras.Input(shape=(784,), name='input_layer')
# Connect layers by calling them like functions
x = layers.Dense(256, activation='relu', name='dense_1')(inputs)
x = layers.BatchNormalization(name='bn_1')(x)
x = layers.Dropout(0.3, name='dropout_1')(x)
x = layers.Dense(128, activation='relu', name='dense_2')(x)
x = layers.BatchNormalization(name='bn_2')(x)
x = layers.Dropout(0.3, name='dropout_2')(x)
outputs = layers.Dense(10, activation='softmax', name='output')(x)
# Create the model
model = keras.Model(inputs=inputs, outputs=outputs, name='mnist_model')
model.summary()
Multi-Input/Multi-Output Models
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
# Multi-input example: combine an image with metadata
# Image input
image_input = keras.Input(shape=(32, 32, 3), name='image')
x1 = layers.Conv2D(32, 3, activation='relu')(image_input)
x1 = layers.GlobalAveragePooling2D()(x1)
x1 = layers.Dense(64, activation='relu')(x1)
# Metadata input
meta_input = keras.Input(shape=(10,), name='metadata')
x2 = layers.Dense(32, activation='relu')(meta_input)
# Merge the two branches
combined = layers.concatenate([x1, x2])
combined = layers.Dense(64, activation='relu')(combined)
# Multiple outputs
main_output = layers.Dense(1, activation='sigmoid', name='main_output')(combined)
aux_output = layers.Dense(5, activation='softmax', name='aux_output')(combined)
# Create the model
model = keras.Model(
    inputs=[image_input, meta_input],
    outputs=[main_output, aux_output]
)
# Compile (one loss and one weight per output)
model.compile(
    optimizer='adam',
    loss={
        'main_output': 'binary_crossentropy',
        'aux_output': 'categorical_crossentropy'
    },
    loss_weights={
        'main_output': 1.0,
        'aux_output': 0.2
    },
    metrics={
        'main_output': ['accuracy'],
        'aux_output': ['accuracy']
    }
)
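To see what `loss_weights` does in the compile call above, here is a small worked example in plain Python: the total training loss is the weighted sum of the per-output losses. The sample labels and predicted probabilities are made up for illustration, and the two loss functions are simplified single-example versions (Keras additionally clips probabilities and averages over the batch).

```python
import math

def binary_crossentropy(y_true, p):
    """Single-example binary cross-entropy for a predicted probability p."""
    return -(y_true * math.log(p) + (1 - y_true) * math.log(1 - p))

def categorical_crossentropy(one_hot, probs):
    """Single-example categorical cross-entropy for a one-hot target."""
    return -sum(t * math.log(p) for t, p in zip(one_hot, probs))

# Hypothetical predictions for one example
main_loss = binary_crossentropy(1.0, 0.8)
aux_loss = categorical_crossentropy([0, 0, 1, 0, 0], [0.1, 0.1, 0.6, 0.1, 0.1])

# Same weights as in the compile() call: main_output 1.0, aux_output 0.2
loss_weights = {'main_output': 1.0, 'aux_output': 0.2}
total_loss = (loss_weights['main_output'] * main_loss
              + loss_weights['aux_output'] * aux_loss)

print(f"main={main_loss:.4f}, aux={aux_loss:.4f}, total={total_loss:.4f}")
```

A small auxiliary weight such as 0.2 keeps the auxiliary task from dominating the gradients while still regularizing the shared layers.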
Residual Connections
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
def residual_block(x, filters, stride=1):
    """ResNet-style residual block."""
    shortcut = x
    # Main path
    x = layers.Conv2D(filters, 3, strides=stride, padding='same')(x)
    x = layers.BatchNormalization()(x)
    x = layers.ReLU()(x)
    x = layers.Conv2D(filters, 3, padding='same')(x)
    x = layers.BatchNormalization()(x)
    # Project the shortcut when the channel count or spatial size changes
    if stride != 1 or shortcut.shape[-1] != filters:
        shortcut = layers.Conv2D(filters, 1, strides=stride)(shortcut)
        shortcut = layers.BatchNormalization()(shortcut)
    # Add the residual
    x = layers.add([x, shortcut])
    x = layers.ReLU()(x)
    return x
# Build a ResNet-style model
inputs = keras.Input(shape=(32, 32, 3))
x = layers.Conv2D(64, 7, strides=2, padding='same')(inputs)
x = layers.BatchNormalization()(x)
x = layers.ReLU()(x)
x = layers.MaxPooling2D(3, strides=2, padding='same')(x)
x = residual_block(x, 64)
x = residual_block(x, 64)
x = residual_block(x, 128, stride=2)
x = residual_block(x, 128)
x = residual_block(x, 256, stride=2)
x = residual_block(x, 256)
x = layers.GlobalAveragePooling2D()(x)
x = layers.Dense(256, activation='relu')(x)
outputs = layers.Dense(10, activation='softmax')(x)
model = keras.Model(inputs, outputs, name='mini_resnet')
model.summary()
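It can be useful to trace the feature-map shapes through `mini_resnet` by hand before training. The sketch below does this in plain Python, relying on the fact that a convolution or pooling layer with `padding='same'` produces a spatial size of `ceil(input_size / stride)`. The `stages` list is a hand-written summary of the model above, so treat it as an assumption to re-check if the model changes.

```python
import math

def same_padding_output(size, stride):
    """Spatial size after a conv/pool layer with padding='same'."""
    return math.ceil(size / stride)

# (description, output channels or None to keep them, stride) per stage
stages = [
    ("stem conv 7x7", 64, 2),
    ("max pool 3x3", None, 2),
    ("residual block", 64, 1),
    ("residual block", 64, 1),
    ("residual block", 128, 2),
    ("residual block", 128, 1),
    ("residual block", 256, 2),
    ("residual block", 256, 1),
]

def trace_shapes(height, width, channels):
    """Return the (H, W, C) shape after the input and after every stage."""
    shapes = [(height, width, channels)]
    for _, out_channels, stride in stages:
        height = same_padding_output(height, stride)
        width = same_padding_output(width, stride)
        if out_channels is not None:
            channels = out_channels
        shapes.append((height, width, channels))
    return shapes

shapes = trace_shapes(32, 32, 3)
for (name, _, _), shape in zip(stages, shapes[1:]):
    print(f"{name:<16} -> {shape}")
```

For 32x32 inputs the stem and pooling already reduce the feature map to 8x8, and the last stage works on 2x2 maps; for small images like CIFAR-10 a less aggressive stem (for example stride 1) is a common alternative.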
Shared Layers (Siamese Network)
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
# Define the shared encoder
shared_encoder = keras.Sequential([
    layers.Dense(128, activation='relu'),
    layers.Dense(64, activation='relu'),
    layers.Dense(32)
], name='shared_encoder')
# Two inputs
input_a = keras.Input(shape=(100,), name='input_a')
input_b = keras.Input(shape=(100,), name='input_b')
# Process both with the same encoder (the weights are shared)
encoded_a = shared_encoder(input_a)
encoded_b = shared_encoder(input_b)
# Similarity score (cosine similarity)
similarity = layers.Dot(axes=1, normalize=True)([encoded_a, encoded_b])
output = layers.Dense(1, activation='sigmoid')(similarity)
siamese_model = keras.Model(inputs=[input_a, input_b], outputs=output)
siamese_model.summary()
5. Keras Subclassing API
The Subclassing API is the most flexible approach and resembles the way models are written in PyTorch. You define custom layers and models as Python classes.
Custom Layers
import tensorflow as tf
from tensorflow import keras
class MyDenseLayer(keras.layers.Layer):
    def __init__(self, units, activation=None, **kwargs):
        super().__init__(**kwargs)
        self.units = units
        self.activation = keras.activations.get(activation)
    def build(self, input_shape):
        # Create the weights (build runs once, on the first call)
        self.w = self.add_weight(
            name='kernel',
            shape=(input_shape[-1], self.units),
            initializer='glorot_uniform',
            trainable=True
        )
        self.b = self.add_weight(
            name='bias',
            shape=(self.units,),
            initializer='zeros',
            trainable=True
        )
        super().build(input_shape)
    def call(self, inputs, training=False):
        output = tf.matmul(inputs, self.w) + self.b
        if self.activation is not None:
            output = self.activation(output)
        return output
    def get_config(self):
        # Needed so the layer can be saved and reloaded
        config = super().get_config()
        config.update({'units': self.units, 'activation': keras.activations.serialize(self.activation)})
        return config
# Usage
layer = MyDenseLayer(64, activation='relu')
x = tf.random.normal([32, 128])
y = layer(x)
print(y.shape) # (32, 64)
class MultiHeadSelfAttention(keras.layers.Layer):
    """A simple multi-head self-attention layer."""
    def __init__(self, embed_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.head_dim = embed_dim // num_heads
        assert self.head_dim * num_heads == embed_dim
        self.wq = keras.layers.Dense(embed_dim)
        self.wk = keras.layers.Dense(embed_dim)
        self.wv = keras.layers.Dense(embed_dim)
        self.wo = keras.layers.Dense(embed_dim)
    def split_heads(self, x, batch_size):
        # (batch, seq, embed) -> (batch, heads, seq, head_dim)
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.head_dim))
        return tf.transpose(x, perm=[0, 2, 1, 3])
    def call(self, x, training=False):
        batch_size = tf.shape(x)[0]
        seq_len = tf.shape(x)[1]
        q = self.split_heads(self.wq(x), batch_size)
        k = self.split_heads(self.wk(x), batch_size)
        v = self.split_heads(self.wv(x), batch_size)
        # Scaled dot-product attention
        scale = tf.cast(self.head_dim, tf.float32) ** 0.5
        scores = tf.matmul(q, k, transpose_b=True) / scale
        weights = tf.nn.softmax(scores, axis=-1)
        context = tf.matmul(weights, v)
        # Merge the heads
        context = tf.transpose(context, perm=[0, 2, 1, 3])
        context = tf.reshape(context, (batch_size, seq_len, self.embed_dim))
        return self.wo(context)
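The core of the attention layer above is scaled dot-product attention: `softmax(Q K^T / sqrt(d)) V`. As a rough illustration, the sketch below computes it for a single head on tiny hand-made matrices using only the standard library. The function names and the sample values are made up for this example; a real layer would of course use tensor ops as shown in the class.

```python
import math

def softmax(row):
    """Numerically stable softmax over one list of scores."""
    peak = max(row)
    exps = [math.exp(value - peak) for value in row]
    total = sum(exps)
    return [e / total for e in exps]

def scaled_dot_product_attention(q, k, v):
    """Single-head attention on lists of row vectors; returns (weights, context)."""
    dim = len(q[0])
    # scores[i][j] = (q_i . k_j) / sqrt(dim)
    scores = [
        [sum(qi * ki for qi, ki in zip(q_row, k_row)) / math.sqrt(dim) for k_row in k]
        for q_row in q
    ]
    weights = [softmax(row) for row in scores]
    # context[i] = sum_j weights[i][j] * v_j
    context = [
        [sum(w * v_row[col] for w, v_row in zip(w_row, v)) for col in range(len(v[0]))]
        for w_row in weights
    ]
    return weights, context

# Two tokens with 2-dimensional queries/keys and 2-dimensional values
q = [[1.0, 0.0], [0.0, 1.0]]
k = [[1.0, 0.0], [0.0, 1.0]]
v = [[1.0, 2.0], [3.0, 4.0]]
weights, context = scaled_dot_product_attention(q, k, v)
print("weights:", weights)
print("context:", context)
```

Each row of `weights` sums to 1, so every output vector is a weighted average of the value vectors; dividing by `sqrt(dim)` keeps the scores from growing with the head dimension.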
Custom Models (Subclassing Model)
import tensorflow as tf
from tensorflow import keras
class ResidualBlock(keras.layers.Layer):
    def __init__(self, filters, **kwargs):
        super().__init__(**kwargs)
        self.conv1 = keras.layers.Conv2D(filters, 3, padding='same')
        self.conv2 = keras.layers.Conv2D(filters, 3, padding='same')
        self.bn1 = keras.layers.BatchNormalization()
        self.bn2 = keras.layers.BatchNormalization()
        self.relu = keras.layers.ReLU()
    def call(self, inputs, training=False):
        x = self.conv1(inputs)
        x = self.bn1(x, training=training)
        x = self.relu(x)
        x = self.conv2(x)
        x = self.bn2(x, training=training)
        x = x + inputs # residual connection (inputs must already have `filters` channels)
        return self.relu(x)
class CustomCNN(keras.Model):
    def __init__(self, num_classes=10, **kwargs):
        super().__init__(**kwargs)
        self.conv_stem = keras.layers.Conv2D(32, 3, padding='same', activation='relu')
        self.res_block1 = ResidualBlock(32)
        self.res_block2 = ResidualBlock(32)
        self.pool = keras.layers.MaxPooling2D(2)
        self.conv_expand = keras.layers.Conv2D(64, 3, padding='same', activation='relu')
        self.res_block3 = ResidualBlock(64)
        self.gap = keras.layers.GlobalAveragePooling2D()
        self.dropout = keras.layers.Dropout(0.5)
        self.fc = keras.layers.Dense(num_classes, activation='softmax')
    def call(self, inputs, training=False):
        x = self.conv_stem(inputs)
        x = self.res_block1(x, training=training)
        x = self.res_block2(x, training=training)
        x = self.pool(x)
        x = self.conv_expand(x)
        x = self.res_block3(x, training=training)
        x = self.gap(x)
        x = self.dropout(x, training=training)
        return self.fc(x)
# Usage
model = CustomCNN(num_classes=10)
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
# Test with dummy data (this first call also builds the weights)
dummy_input = tf.random.normal([4, 32, 32, 3])
output = model(dummy_input, training=False)
print(output.shape) # (4, 10)
model.summary()
6. Implementing a CNN (CIFAR-10)
Basic CNN Model
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
# Load CIFAR-10
(x_train, y_train), (x_test, y_test) = keras.datasets.cifar10.load_data()
x_train = x_train.astype('float32') / 255.0
x_test = x_test.astype('float32') / 255.0
class_names = ['airplane', 'automobile', 'bird', 'cat', 'deer',
               'dog', 'frog', 'horse', 'ship', 'truck']
print(f"Training data: {x_train.shape}") # (50000, 32, 32, 3)
print(f"Test data: {x_test.shape}") # (10000, 32, 32, 3)
# Define the CNN model
def build_cnn_model():
    model = keras.Sequential([
        # First convolutional block
        layers.Conv2D(32, (3, 3), padding='same', input_shape=(32, 32, 3)),
        layers.BatchNormalization(),
        layers.Activation('relu'),
        layers.Conv2D(32, (3, 3), padding='same'),
        layers.BatchNormalization(),
        layers.Activation('relu'),
        layers.MaxPooling2D((2, 2)),
        layers.Dropout(0.25),
        # Second convolutional block
        layers.Conv2D(64, (3, 3), padding='same'),
        layers.BatchNormalization(),
        layers.Activation('relu'),
        layers.Conv2D(64, (3, 3), padding='same'),
        layers.BatchNormalization(),
        layers.Activation('relu'),
        layers.MaxPooling2D((2, 2)),
        layers.Dropout(0.25),
        # Third convolutional block
        layers.Conv2D(128, (3, 3), padding='same'),
        layers.BatchNormalization(),
        layers.Activation('relu'),
        layers.Conv2D(128, (3, 3), padding='same'),
        layers.BatchNormalization(),
        layers.Activation('relu'),
        layers.GlobalAveragePooling2D(),
        layers.Dropout(0.5),
        # Fully connected layers
        layers.Dense(256, activation='relu'),
        layers.BatchNormalization(),
        layers.Dropout(0.5),
        layers.Dense(10, activation='softmax')
    ])
    return model
model = build_cnn_model()
model.summary()
model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=1e-3),
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)
Data Augmentation
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
# Built-in Keras augmentation layers
data_augmentation = keras.Sequential([
    layers.RandomFlip('horizontal'),
    layers.RandomRotation(0.1),
    layers.RandomZoom(0.1),
    layers.RandomTranslation(0.1, 0.1),
    layers.RandomContrast(0.1),
], name='data_augmentation')
# Build the augmentation layers into the model (this model takes raw 0-255 pixels)
inputs = keras.Input(shape=(32, 32, 3))
x = data_augmentation(inputs) # active only during training
x = layers.Rescaling(1./255)(x) # normalize to [0, 1]
# ... CNN blocks ...
x = layers.Conv2D(32, 3, padding='same', activation='relu')(x)
x = layers.MaxPooling2D()(x)
x = layers.Conv2D(64, 3, padding='same', activation='relu')(x)
x = layers.MaxPooling2D()(x)
x = layers.GlobalAveragePooling2D()(x)
outputs = layers.Dense(10, activation='softmax')(x)
augmented_model = keras.Model(inputs, outputs)
# Augmentation pipeline with tf.data
def augment_image(image, label):
    image = tf.cast(image, tf.float32) / 255.0
    image = tf.image.random_flip_left_right(image)
    image = tf.image.random_brightness(image, 0.2)
    image = tf.image.random_contrast(image, 0.8, 1.2)
    image = tf.image.random_saturation(image, 0.8, 1.2)
    # Pad to 40x40, then take a random 32x32 crop
    image = tf.image.pad_to_bounding_box(image, 4, 4, 40, 40)
    image = tf.image.random_crop(image, [32, 32, 3])
    return image, label
(x_train, y_train), (x_test, y_test) = keras.datasets.cifar10.load_data()
y_train = y_train.flatten()
y_test = y_test.flatten()
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.map(augment_image, num_parallel_calls=tf.data.AUTOTUNE)
train_dataset = train_dataset.shuffle(10000).batch(128).prefetch(tf.data.AUTOTUNE)
Transfer Learning
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.applications import EfficientNetB0
# Transfer learning with EfficientNetB0
def build_transfer_model(num_classes=10):
    # Pretrained base model (without the classifier head)
    base_model = EfficientNetB0(
        include_top=False,
        weights='imagenet',
        input_shape=(224, 224, 3)
    )
    # Freeze the base model at first
    base_model.trainable = False
    inputs = keras.Input(shape=(224, 224, 3))
    # Keras EfficientNet includes its own preprocessing (it expects pixel values in [0, 255])
    x = base_model(inputs, training=False)
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dense(256, activation='relu')(x)
    x = layers.Dropout(0.5)(x)
    outputs = layers.Dense(num_classes, activation='softmax')(x)
    model = keras.Model(inputs, outputs)
    return model, base_model
model, base_model = build_transfer_model(num_classes=10)
model.compile(
    optimizer=keras.optimizers.Adam(1e-3),
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)
# Stage 1: train the classifier head with the base frozen
# model.fit(train_dataset, epochs=10, ...)
# Fine-tuning: unfreeze part of the base model
def fine_tune(model, base_model, fine_tune_at=100):
    base_model.trainable = True
    # Keep the layers before fine_tune_at frozen
    for layer in base_model.layers[:fine_tune_at]:
        layer.trainable = False
    # Recompile so the trainable changes take effect
    model.compile(
        optimizer=keras.optimizers.Adam(1e-5), # very low learning rate
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )
    return model
# fine_tune(model, base_model, fine_tune_at=100)
# model.fit(train_dataset, epochs=20, ...)
7. Implementing RNNs/LSTMs
Time Series Forecasting (LSTM)
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np
# Generate time series data: noisy sine waves
def generate_time_series(n_samples, n_steps):
    t = np.linspace(0, 4 * np.pi, n_steps + 1)
    series = np.sin(t) + 0.1 * np.random.randn(n_samples, n_steps + 1)
    # Inputs are steps 0..n-1; targets are the same series shifted one step ahead
    X = series[:, :-1].reshape(-1, n_steps, 1)
    y = series[:, 1:].reshape(-1, n_steps, 1)
    return X, y
X_train, y_train = generate_time_series(10000, 50)
X_val, y_val = generate_time_series(1000, 50)
X_test, y_test = generate_time_series(1000, 50)
# LSTM model
def build_lstm_model(n_steps=50):
    model = keras.Sequential([
        layers.LSTM(64, return_sequences=True, input_shape=(n_steps, 1)),
        layers.Dropout(0.2),
        layers.LSTM(64, return_sequences=True),
        layers.Dropout(0.2),
        layers.TimeDistributed(layers.Dense(1))
    ])
    model.compile(
        optimizer='adam',
        loss='mse',
        metrics=['mae']
    )
    return model
model = build_lstm_model()
model.summary()
history = model.fit(
    X_train, y_train,
    epochs=20,
    batch_size=128,
    validation_data=(X_val, y_val)
)
Text Generation (Character-Level RNN)
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np
# Sample text (use a much larger corpus in practice)
text = """TensorFlow는 구글이 만든 딥러닝 프레임워크입니다.
Keras는 TensorFlow 위에서 동작하는 고수준 API입니다.
두 라이브러리를 함께 사용하면 강력한 딥러닝 모델을 빠르게 구축할 수 있습니다."""
# Build the character vocabulary
chars = sorted(set(text))
char2idx = {c: i for i, c in enumerate(chars)}
idx2char = np.array(chars)
vocab_size = len(chars)
# Convert the text to indices
text_as_int = np.array([char2idx[c] for c in text])
# Create sequences
seq_length = 50
sequences = tf.data.Dataset.from_tensor_slices(text_as_int)
sequences = sequences.batch(seq_length + 1, drop_remainder=True)
def split_input_target(chunk):
    # The target is the input shifted by one character
    return chunk[:-1], chunk[1:]
dataset = sequences.map(split_input_target)
# Note: the short sample text yields only a couple of sequences, so with
# drop_remainder=True no full batch of 32 is produced; train on a larger corpus.
dataset = dataset.shuffle(100).batch(32, drop_remainder=True)
# Character-level RNN model
def build_char_rnn(vocab_size, embed_dim=64, rnn_units=256):
    model = keras.Sequential([
        layers.Embedding(vocab_size, embed_dim),
        layers.GRU(rnn_units, return_sequences=True, stateful=False),
        layers.GRU(rnn_units, return_sequences=True),
        layers.Dense(vocab_size)
    ])
    return model
model = build_char_rnn(vocab_size)
model.compile(
    optimizer='adam',
    loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=['accuracy']
)
# Text generation function
def generate_text(model, start_string, num_generate=100, temperature=1.0):
    input_eval = [char2idx[s] for s in start_string]
    input_eval = tf.expand_dims(input_eval, 0)
    text_generated = []
    for _ in range(num_generate):
        predictions = model(input_eval)
        predictions = tf.squeeze(predictions, 0) / temperature
        # Sample the next character from the logits of the last time step
        predicted_id = tf.random.categorical(predictions, num_samples=1)[-1, 0].numpy()
        # The GRU layers are not stateful, so append the new character to keep the full context
        next_id = tf.constant([[int(predicted_id)]], dtype=input_eval.dtype)
        input_eval = tf.concat([input_eval, next_id], axis=1)
        text_generated.append(idx2char[predicted_id])
    return start_string + ''.join(text_generated)
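The `temperature` argument of `generate_text` divides the logits before sampling. The pure-Python sketch below shows the effect on the resulting probability distribution; the logits are made-up values and the helper names are hypothetical, so read it as an illustration of the idea rather than the code path TensorFlow takes.

```python
import math
import random

def softmax_with_temperature(logits, temperature=1.0):
    """Convert logits to probabilities after dividing by the temperature."""
    scaled = [value / temperature for value in logits]
    peak = max(scaled)
    exps = [math.exp(value - peak) for value in scaled]
    total = sum(exps)
    return [e / total for e in exps]

def sample_id(logits, temperature, rng):
    """Draw one index according to the temperature-scaled distribution."""
    probs = softmax_with_temperature(logits, temperature)
    return rng.choices(range(len(probs)), weights=probs, k=1)[0]

logits = [2.0, 1.0, 0.1]   # hypothetical scores for three characters
for temperature in (0.5, 1.0, 2.0):
    probs = softmax_with_temperature(logits, temperature)
    print(temperature, [round(p, 3) for p in probs])

rng = random.Random(0)     # seeded only to make the demo repeatable
print([sample_id(logits, 1.0, rng) for _ in range(10)])
```

Temperatures below 1 sharpen the distribution, giving more predictable text, while temperatures above 1 flatten it, giving more varied but noisier text.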
Bidirectional LSTM
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
# Bidirectional LSTM (text classification)
def build_bidirectional_model(vocab_size, max_len=100, embed_dim=64):
    model = keras.Sequential([
        # An explicit Input replaces Embedding's input_length argument, which newer Keras versions deprecate
        keras.Input(shape=(max_len,)),
        layers.Embedding(vocab_size, embed_dim),
        layers.Bidirectional(layers.LSTM(64, return_sequences=True)),
        layers.Bidirectional(layers.LSTM(32)),
        layers.Dense(64, activation='relu'),
        layers.Dropout(0.5),
        layers.Dense(1, activation='sigmoid')
    ])
    model.compile(
        optimizer='adam',
        loss='binary_crossentropy',
        metrics=['accuracy']
    )
    return model
model = build_bidirectional_model(vocab_size=10000, max_len=100)
model.summary()
8. Transformer with Keras
Multi-Head Attention and the Transformer Encoder
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
class TransformerBlock(keras.layers.Layer):
    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1, **kwargs):
        super().__init__(**kwargs)
        self.att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.ffn = keras.Sequential([
            layers.Dense(ff_dim, activation='relu'),
            layers.Dense(embed_dim)
        ])
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)
    def call(self, inputs, training=False):
        # Self-attention sublayer with residual connection and layer normalization
        attn_output = self.att(inputs, inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)
        # Feed-forward sublayer with residual connection and layer normalization
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)
class TokenAndPositionEmbedding(keras.layers.Layer):
    def __init__(self, maxlen, vocab_size, embed_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_emb = layers.Embedding(vocab_size, embed_dim)
        self.pos_emb = layers.Embedding(maxlen, embed_dim)
    def call(self, x):
        maxlen = tf.shape(x)[-1]
        positions = tf.range(start=0, limit=maxlen, delta=1)
        positions = self.pos_emb(positions)
        x = self.token_emb(x)
        return x + positions
# Transformer model for text classification
def build_transformer_classifier(
    vocab_size=20000,
    maxlen=200,
    embed_dim=32,
    num_heads=2,
    ff_dim=32,
    num_classes=2
):
    inputs = layers.Input(shape=(maxlen,))
    embedding_layer = TokenAndPositionEmbedding(maxlen, vocab_size, embed_dim)
    x = embedding_layer(inputs)
    transformer_block = TransformerBlock(embed_dim, num_heads, ff_dim)
    x = transformer_block(x)
    x = layers.GlobalAveragePooling1D()(x)
    x = layers.Dropout(0.1)(x)
    x = layers.Dense(20, activation='relu')(x)
    x = layers.Dropout(0.1)(x)
    outputs = layers.Dense(num_classes, activation='softmax')(x)
    model = keras.Model(inputs=inputs, outputs=outputs)
    return model
model = build_transformer_classifier()
model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)
model.summary()
# Example: IMDB sentiment classification
vocab_size = 20000
maxlen = 200
(x_train, y_train), (x_val, y_val) = keras.datasets.imdb.load_data(num_words=vocab_size)
x_train = keras.preprocessing.sequence.pad_sequences(x_train, maxlen=maxlen)
x_val = keras.preprocessing.sequence.pad_sequences(x_val, maxlen=maxlen)
# model.fit(x_train, y_train, batch_size=32, epochs=5, validation_data=(x_val, y_val))
9. Data Pipelines (tf.data)
Basic tf.data.Dataset Usage
import tensorflow as tf
import numpy as np
# Create a Dataset from tensors
X = np.random.randn(1000, 10)
y = np.random.randint(0, 2, 1000)
dataset = tf.data.Dataset.from_tensor_slices((X, y))
print(dataset) # TensorSliceDataset
# Basic transformations
dataset = (dataset
    .shuffle(buffer_size=1000)
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)
# Inspect one batch
for batch_X, batch_y in dataset.take(1):
    print(f"Batch X shape: {batch_X.shape}") # (32, 10)
    print(f"Batch y shape: {batch_y.shape}") # (32,)
# map transformation
def preprocess(x, y):
    x = tf.cast(x, tf.float32)
    y = tf.cast(y, tf.int32)
    # Standardize each example across its own features
    x = (x - tf.reduce_mean(x)) / tf.math.reduce_std(x)
    return x, y
dataset = tf.data.Dataset.from_tensor_slices((X, y))
dataset = dataset.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE)
# filter transformation
positive_dataset = dataset.filter(lambda x, y: y == 1)
# Create from a range
range_dataset = tf.data.Dataset.range(100)
# Combine with zip
features_ds = tf.data.Dataset.from_tensor_slices(X)
labels_ds = tf.data.Dataset.from_tensor_slices(y)
combined_ds = tf.data.Dataset.zip((features_ds, labels_ds))
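The meaning of `buffer_size` in `shuffle` is easier to see with a small model of the mechanism: the dataset keeps a buffer of that many elements, emits a random one, and refills the slot from the input stream. The generator-based sketch below imitates `shuffle` followed by `batch` in plain Python. It is a simplified mental model with hypothetical function names, not tf.data's actual implementation.

```python
import random

def shuffle_buffer(items, buffer_size, rng):
    """Yield items in a randomized order using a fixed-size buffer."""
    buffer = []
    for item in items:
        if len(buffer) < buffer_size:
            buffer.append(item)          # still filling the buffer
            continue
        index = rng.randrange(buffer_size)
        yield buffer[index]              # emit a random buffered element
        buffer[index] = item             # and replace it with the new one
    rng.shuffle(buffer)                  # drain whatever is left at the end
    yield from buffer

def batch(items, batch_size, drop_remainder=False):
    """Group consecutive items into lists of batch_size."""
    current = []
    for item in items:
        current.append(item)
        if len(current) == batch_size:
            yield current
            current = []
    if current and not drop_remainder:
        yield current

rng = random.Random(42)
batches = list(batch(shuffle_buffer(range(10), buffer_size=5, rng=rng), batch_size=4))
print(batches)
```

With a buffer of size 1 the order does not change at all, which is why a buffer at least as large as the dataset (as in `shuffle(buffer_size=1000)` for 1000 examples above) is needed for a uniform shuffle.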
Image Loading Pipeline
import tensorflow as tf
import os
# Directory layout: data/train/class_name/image.jpg
def load_and_preprocess_image(path, label, image_size=(224, 224)):
    image = tf.io.read_file(path)
    image = tf.image.decode_jpeg(image, channels=3)
    image = tf.image.resize(image, image_size)
    image = tf.cast(image, tf.float32) / 255.0
    return image, label
def create_image_dataset(data_dir, batch_size=32, image_size=(224, 224)):
    # Map each class directory name to an integer label
    class_names = sorted(os.listdir(data_dir))
    class_map = {name: idx for idx, name in enumerate(class_names)}
    file_paths = []
    labels = []
    for class_name in class_names:
        class_dir = os.path.join(data_dir, class_name)
        for fname in os.listdir(class_dir):
            if fname.endswith(('.jpg', '.jpeg', '.png')):
                file_paths.append(os.path.join(class_dir, fname))
                labels.append(class_map[class_name])
    path_ds = tf.data.Dataset.from_tensor_slices(file_paths)
    label_ds = tf.data.Dataset.from_tensor_slices(labels)
    combined = tf.data.Dataset.zip((path_ds, label_ds))
    dataset = combined.map(
        lambda p, l: load_and_preprocess_image(p, l, image_size),
        num_parallel_calls=tf.data.AUTOTUNE
    )
    dataset = dataset.shuffle(1000).batch(batch_size).prefetch(tf.data.AUTOTUNE)
    return dataset, class_names
# Keras's built-in image_dataset_from_directory is more convenient
# train_ds = keras.utils.image_dataset_from_directory(
#     'data/train',
#     image_size=(224, 224),
#     batch_size=32
# )
TFRecord Format
import tensorflow as tf
# Writing a TFRecord file
def bytes_feature(value):
    # Eager tensors must be converted to bytes before being stored in a BytesList
    if isinstance(value, type(tf.constant(0))):
        value = value.numpy()
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))
def int64_feature(value):
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))
def float_feature(value):
    return tf.train.Feature(float_list=tf.train.FloatList(value=[value]))
def image_to_tfrecord(image_path, label, writer):
    with open(image_path, 'rb') as f:
        image_string = f.read()
    feature = {
        'image': bytes_feature(image_string),
        'label': int64_feature(label),
    }
    example = tf.train.Example(features=tf.train.Features(feature=feature))
    writer.write(example.SerializeToString())
# Reading a TFRecord file
def parse_tfrecord(serialized_example):
    feature_description = {
        'image': tf.io.FixedLenFeature([], tf.string),
        'label': tf.io.FixedLenFeature([], tf.int64),
    }
    example = tf.io.parse_single_example(serialized_example, feature_description)
    image = tf.image.decode_jpeg(example['image'], channels=3)
    image = tf.image.resize(image, [224, 224])
    image = tf.cast(image, tf.float32) / 255.0
    label = example['label']
    return image, label
# Create a TFRecord Dataset
# dataset = tf.data.TFRecordDataset(['data.tfrecord'])
# dataset = dataset.map(parse_tfrecord, num_parallel_calls=tf.data.AUTOTUNE)
# dataset = dataset.shuffle(1000).batch(32).prefetch(tf.data.AUTOTUNE)
10. Advanced Training Techniques
Callbacks
import math
import os
import tensorflow as tf
from tensorflow import keras
# ModelCheckpoint: save the best model
checkpoint_cb = keras.callbacks.ModelCheckpoint(
    filepath='best_model.keras',
    monitor='val_accuracy',
    mode='max',
    save_best_only=True,
    verbose=1
)
# EarlyStopping: stop once validation loss stops improving (limits overfitting)
early_stopping_cb = keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=10,
    restore_best_weights=True,
    verbose=1
)
# ReduceLROnPlateau: lower the learning rate automatically
reduce_lr_cb = keras.callbacks.ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.5,
    patience=5,
    min_lr=1e-7,
    verbose=1
)
# TensorBoard: visualization
tensorboard_cb = keras.callbacks.TensorBoard(
    log_dir='logs/',
    histogram_freq=1,
    write_graph=True,
    write_images=True,
    update_freq='epoch'
)
# LearningRateScheduler: learning-rate scheduling
def cosine_decay_schedule(epoch, lr):
    # The incoming lr is ignored; the rate is recomputed from the epoch index
    initial_lr = 1e-3
    total_epochs = 100
    return initial_lr * (1 + math.cos(math.pi * epoch / total_epochs)) / 2
lr_scheduler_cb = keras.callbacks.LearningRateScheduler(
    cosine_decay_schedule, verbose=1
)
# Save logs as CSV
csv_logger_cb = keras.callbacks.CSVLogger('training_log.csv')
# Custom callback
class ConfusionMatrixCallback(keras.callbacks.Callback):
    def __init__(self, validation_data, class_names):
        super().__init__()
        self.X_val, self.y_val = validation_data
        self.class_names = class_names
    def on_epoch_end(self, epoch, logs=None):
        # Only run the extra prediction pass on the epochs that are reported
        if epoch % 10 != 0:
            return
        y_pred = tf.argmax(self.model.predict(self.X_val), axis=1)
        cm = tf.math.confusion_matrix(self.y_val, y_pred)
        print(f"\nEpoch {epoch} confusion matrix:\n{cm.numpy()}")
# Combine the callbacks into one list
# (lr_scheduler_cb is left out: it sets the learning rate every epoch and would undo ReduceLROnPlateau)
callbacks = [
    checkpoint_cb,
    early_stopping_cb,
    reduce_lr_cb,
    tensorboard_cb,
    csv_logger_cb
]
# Usage example
# history = model.fit(
#     X_train, y_train,
#     epochs=100,
#     validation_data=(X_val, y_val),
#     callbacks=callbacks
# )
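To see what the cosine schedule in the callback block produces, the formula can be evaluated in plain Python without TensorFlow. This is a minimal sketch that reuses the `initial_lr = 1e-3` and `total_epochs = 100` values from that block; the helper name `cosine_lr` and the sampled epochs are assumptions made for illustration.

```python
import math

def cosine_lr(epoch, initial_lr=1e-3, total_epochs=100):
    """Cosine decay: starts at initial_lr and falls to 0 at total_epochs."""
    return initial_lr * (1 + math.cos(math.pi * epoch / total_epochs)) / 2

# Sample the schedule at a few epochs to check its shape.
samples = {epoch: cosine_lr(epoch) for epoch in (0, 25, 50, 75, 100)}
for epoch, lr in samples.items():
    print(f"epoch {epoch:3d}: lr = {lr:.6f}")
```

The rate falls slowly at first, fastest around the midpoint, and slowly again near the end, which is why the schedule is often paired with a fixed epoch budget.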
Custom Training Loop (GradientTape)
import tensorflow as tf
from tensorflow import keras
import time
# Simple classification model
model = keras.Sequential([
    keras.layers.Dense(64, activation='relu', input_shape=(784,)),
    keras.layers.Dense(64, activation='relu'),
    keras.layers.Dense(10)  # outputs logits, so the loss uses from_logits=True
])
# Loss function and optimizer
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)
optimizer = keras.optimizers.Adam(learning_rate=1e-3)
# Metrics
train_loss = keras.metrics.Mean(name='train_loss')
train_accuracy = keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')
val_loss = keras.metrics.Mean(name='val_loss')
val_accuracy = keras.metrics.SparseCategoricalAccuracy(name='val_accuracy')
# Single training step
@tf.function  # traces the step into a graph for faster execution
def train_step(images, labels):
    with tf.GradientTape() as tape:
        predictions = model(images, training=True)
        loss = loss_fn(labels, predictions)
        # Add L2 regularization (kernels only, biases excluded)
        l2_loss = tf.add_n([tf.nn.l2_loss(v) for v in model.trainable_variables
                            if 'bias' not in v.name])
        total_loss = loss + 1e-4 * l2_loss
    gradients = tape.gradient(total_loss, model.trainable_variables)
    # Gradient clipping
    gradients, _ = tf.clip_by_global_norm(gradients, 1.0)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    train_loss.update_state(loss)
    train_accuracy.update_state(labels, predictions)
    return loss
@tf.function
def val_step(images, labels):
    predictions = model(images, training=False)
    loss = loss_fn(labels, predictions)
    val_loss.update_state(loss)
    val_accuracy.update_state(labels, predictions)
# Full training loop
def train(train_ds, val_ds, epochs=10):
    for epoch in range(epochs):
        start_time = time.time()
        # Reset metrics (reset_state replaces the deprecated reset_states)
        train_loss.reset_state()
        train_accuracy.reset_state()
        val_loss.reset_state()
        val_accuracy.reset_state()
        # Training
        for step, (images, labels) in enumerate(train_ds):
            train_step(images, labels)
            if step % 100 == 0:
                print(f"Step {step}: loss={train_loss.result():.4f}")
        # Validation
        for images, labels in val_ds:
            val_step(images, labels)
        elapsed = time.time() - start_time
        print(f"Epoch {epoch+1}/{epochs} ({elapsed:.1f}s) - "
              f"loss: {train_loss.result():.4f}, "
              f"accuracy: {train_accuracy.result():.4f}, "
              f"val loss: {val_loss.result():.4f}, "
              f"val accuracy: {val_accuracy.result():.4f}")
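The training step clips gradients with `tf.clip_by_global_norm`. The arithmetic behind that call can be sketched in plain Python: all gradients are treated as one long vector, and every entry is scaled by the same factor when the joint L2 norm exceeds the limit. The function below is a stand-in written for illustration, not TensorFlow's implementation, and the gradient values are made up.

```python
import math

def clip_by_global_norm(grads, clip_norm):
    """Scale all gradients by one factor so their joint L2 norm is at most clip_norm."""
    global_norm = math.sqrt(sum(g * g for grad in grads for g in grad))
    # If the norm is already within the limit, the factor is 1 and nothing changes.
    scale = clip_norm / max(global_norm, clip_norm)
    clipped = [[g * scale for g in grad] for grad in grads]
    return clipped, global_norm

grads = [[3.0, 4.0], [12.0]]  # two "variables" with made-up gradients
clipped, norm_before = clip_by_global_norm(grads, clip_norm=1.0)
norm_after = math.sqrt(sum(g * g for grad in clipped for g in grad))
print("norm before:", norm_before)
print("norm after: ", norm_after)
```

Because one factor is shared by every variable, the direction of the overall update is preserved; only its length shrinks.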
Distributed Training (tf.distribute.Strategy)
import tensorflow as tf
from tensorflow import keras
# Multi-GPU strategy
strategy = tf.distribute.MirroredStrategy()
print(f"Number of devices: {strategy.num_replicas_in_sync}")
with strategy.scope():
    # Define and compile the model inside strategy.scope()
    model = keras.Sequential([
        keras.layers.Dense(64, activation='relu', input_shape=(784,)),
        keras.layers.Dense(64, activation='relu'),
        keras.layers.Dense(10, activation='softmax')
    ])
    model.compile(
        optimizer='adam',
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )
# Scale the global batch size with the number of replicas
BATCH_SIZE_PER_REPLICA = 64
BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync
# Using a TPU
# resolver = tf.distribute.cluster_resolver.TPUClusterResolver()
# tf.config.experimental_connect_to_cluster(resolver)
# tf.tpu.experimental.initialize_tpu_system(resolver)
# tpu_strategy = tf.distribute.TPUStrategy(resolver)
# Mixed precision training
from tensorflow.keras import mixed_precision
policy = mixed_precision.Policy('mixed_float16')
mixed_precision.set_global_policy(policy)
with strategy.scope():
    inputs = keras.Input(shape=(784,))
    x = keras.layers.Dense(64, activation='relu')(inputs)
    # Keep the final output in float32 for numerical stability
    outputs = keras.layers.Dense(10, activation='softmax', dtype='float32')(x)
    model_mp = keras.Model(inputs, outputs)
    opt = keras.optimizers.Adam(1e-3)
    opt = mixed_precision.LossScaleOptimizer(opt)
    model_mp.compile(
        optimizer=opt,
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )
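The reason mixed precision uses a `LossScaleOptimizer` is that very small gradients underflow to zero in float16. The effect can be reproduced with only the standard library, since `struct` supports the IEEE half-precision format (`'e'`). This is a rough sketch: the gradient value and the fixed scale of 1024 are made-up numbers, whereas Keras chooses and adjusts its loss scale dynamically.

```python
import struct

def to_float16(x):
    """Round a Python float to the nearest IEEE half-precision value."""
    return struct.unpack('e', struct.pack('e', x))[0]

tiny_grad = 1e-8     # made-up gradient, below float16's smallest subnormal (about 6e-8)
loss_scale = 1024.0  # illustrative constant, not the value Keras would pick

unscaled = to_float16(tiny_grad)             # underflows to zero in float16
scaled = to_float16(tiny_grad * loss_scale)  # large enough to survive in float16
recovered = scaled / loss_scale              # unscaling happens in higher precision

print("float16 without scaling:", unscaled)
print("recovered after scaling:", recovered)
```

Multiplying the loss by the scale multiplies every gradient by the same factor (chain rule), so dividing the gradients by the scale before the weight update restores their original magnitude.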
11. TensorBoard Visualization
Basic TensorBoard Usage
import tensorflow as tf
from tensorflow import keras
import datetime
import numpy as np
# Log directory
log_dir = "logs/fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
# Callback configuration
tensorboard_callback = keras.callbacks.TensorBoard(
    log_dir=log_dir,
    histogram_freq=1,     # how often (in epochs) to record weight histograms
    write_graph=True,     # record the computation graph
    write_images=True,    # record weights as images
    update_freq='epoch',  # how often to write metrics
    profile_batch=2       # batch to profile
)
# Logging custom scalars
file_writer = tf.summary.create_file_writer(log_dir + '/custom_scalars')
def log_custom_metrics(epoch, logs):
    # Assumes `model` is the compiled model being trained
    with file_writer.as_default():
        tf.summary.scalar('learning_rate',
                          data=keras.backend.get_value(model.optimizer.learning_rate),
                          step=epoch)
lr_callback = keras.callbacks.LambdaCallback(on_epoch_end=log_custom_metrics)
# Logging images
def log_images(epoch, logs):
    # Fetch test images
    (_, _), (x_test, y_test) = keras.datasets.mnist.load_data()
    x_test = x_test[:10].reshape(-1, 28, 28, 1).astype('float32') / 255.0
    with file_writer.as_default():
        tf.summary.image("Test samples", x_test, step=epoch, max_outputs=10)
image_callback = keras.callbacks.LambdaCallback(on_epoch_end=log_images)
# Command to launch TensorBoard
# tensorboard --logdir logs/fit
Embedding Visualization
import tensorflow as tf
from tensorflow import keras
import os
import numpy as np
from tensorboard.plugins import projector
# Visualize embeddings (the model below is untrained; train it first in practice)
(x_train, y_train), _ = keras.datasets.mnist.load_data()
x_train = x_train.astype('float32').reshape(-1, 784) / 255.0
# Embedding model
embedding_model = keras.Sequential([
    keras.layers.Dense(64, activation='relu', input_shape=(784,)),
    keras.layers.Dense(32, name='embedding')  # embedding layer
])
# Extract embeddings
embeddings = embedding_model.predict(x_train[:1000])
# Save the embedding vectors
log_dir = 'logs/embedding'
os.makedirs(log_dir, exist_ok=True)
np.savetxt(os.path.join(log_dir, 'vectors.tsv'), embeddings, delimiter='\t')
# Metadata (labels)
with open(os.path.join(log_dir, 'metadata.tsv'), 'w') as f:
    for label in y_train[:1000]:
        f.write(f"{label}\n")
# Projector configuration
config = projector.ProjectorConfig()
embedding_config = config.embeddings.add()
embedding_config.tensor_name = "embedding/.ATTRIBUTES/VARIABLE_VALUE"
embedding_config.tensor_path = 'vectors.tsv'
embedding_config.metadata_path = 'metadata.tsv'
projector.visualize_embeddings(log_dir, config)
12. Saving and Converting Models
SavedModel Format
import tensorflow as tf
from tensorflow import keras
# Assumes `model` is a trained Keras model from the earlier sections
# Save as a SavedModel directory (works with TF 2.15 and earlier;
# with Keras 3 / TF 2.16+, use model.export('saved_model/my_model') instead)
model.save('saved_model/my_model')
# Load
loaded_model = keras.models.load_model('saved_model/my_model')
# HDF5 format (legacy)
model.save('my_model.h5')
loaded_h5 = keras.models.load_model('my_model.h5')
# Save/load weights only (Keras 3 requires the file name to end in '.weights.h5')
model.save_weights('model_weights.h5')
model.load_weights('model_weights.h5')
# Keras native format (recommended)
model.save('my_model.keras')
loaded_keras = keras.models.load_model('my_model.keras')
# Subclassed models need get_config to be implemented
class MyModel(keras.Model):
    def __init__(self, units):
        super().__init__()
        self.units = units
        self.dense = keras.layers.Dense(units)
    def call(self, inputs):
        return self.dense(inputs)
    def get_config(self):
        return {'units': self.units}
    @classmethod
    def from_config(cls, config):
        return cls(**config)
TensorFlow Lite Conversion (Mobile/Edge)
import tensorflow as tf
from tensorflow import keras
import numpy as np
# Basic TFLite conversion
converter = tf.lite.TFLiteConverter.from_saved_model('saved_model/my_model')
tflite_model = converter.convert()
with open('model.tflite', 'wb') as f:
    f.write(tflite_model)
# Optimization options
# Dynamic range quantization
converter = tf.lite.TFLiteConverter.from_saved_model('saved_model/my_model')
converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_quantized = converter.convert()
# Full integer quantization (INT8)
def representative_dataset():
    # Random data is a placeholder; use real input samples for calibration in practice
    for _ in range(100):
        data = np.random.random((1, 784)).astype(np.float32)
        yield [data]
converter = tf.lite.TFLiteConverter.from_saved_model('saved_model/my_model')
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.representative_dataset = representative_dataset
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
converter.inference_input_type = tf.int8
converter.inference_output_type = tf.int8
tflite_int8 = converter.convert()
# Running a TFLite model (the dynamic-range model keeps float32 inputs and outputs)
interpreter = tf.lite.Interpreter(model_content=tflite_quantized)
interpreter.allocate_tensors()
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()
print("Input:", input_details[0]['shape'])
print("Output:", output_details[0]['shape'])
# Inference
input_data = np.random.random((1, 784)).astype(np.float32)
interpreter.set_tensor(input_details[0]['index'], input_data)
interpreter.invoke()
output_data = interpreter.get_tensor(output_details[0]['index'])
print("TFLite output:", output_data.shape)
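Full integer quantization maps each float to an 8-bit integer through a scale and a zero point. The sketch below works through that affine mapping in plain Python for a single made-up value range; the helper names are hypothetical, and a real converter derives the range from the representative dataset and may choose its parameters differently (for example per channel).

```python
def quant_params(x_min, x_max, q_min=-128, q_max=127):
    """Derive scale and zero point so that [x_min, x_max] maps onto [q_min, q_max]."""
    scale = (x_max - x_min) / (q_max - q_min)
    zero_point = round(q_min - x_min / scale)
    return scale, zero_point

def quantize(x, scale, zero_point, q_min=-128, q_max=127):
    q = round(x / scale) + zero_point
    return max(q_min, min(q_max, q))  # clamp to the int8 range

def dequantize(q, scale, zero_point):
    return scale * (q - zero_point)

scale, zero_point = quant_params(-1.0, 3.0)  # made-up activation range
for x in (-1.0, 0.0, 0.5, 3.0):
    q = quantize(x, scale, zero_point)
    print(f"x={x:5.2f} -> q={q:4d} -> x'={dequantize(q, scale, zero_point):.4f}")
```

The round trip is lossy: each value lands on the nearest multiple of `scale`, which is the accuracy cost paid for the smaller, faster integer model.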
TensorFlow.js Conversion
# Install the tfjs converter
pip install tensorflowjs
# Convert a SavedModel to TFJS
tensorflowjs_converter \
  --input_format=tf_saved_model \
  --output_format=tfjs_graph_model \
  --signature_name=serving_default \
  saved_model/my_model \
  tfjs_model/
13. Production Deployment with TF Serving
Installing TensorFlow Serving and Basic Usage
# Run TF Serving with Docker (the easiest way)
docker pull tensorflow/serving
# Serve a model
# The mounted directory must contain numbered version subdirectories (e.g. /models/my_model/1/)
docker run -t --rm \
  -p 8501:8501 \
  -v "/path/to/saved_model:/models/my_model" \
  -e MODEL_NAME=my_model \
  tensorflow/serving
# GPU support
docker run --gpus all -t --rm \
  -p 8501:8501 \
  -v "/path/to/saved_model:/models/my_model" \
  -e MODEL_NAME=my_model \
  tensorflow/serving:latest-gpu
Inference Requests via the REST API
import requests
import json
import numpy as np
# REST API request
url = 'http://localhost:8501/v1/models/my_model:predict'
# Prepare input data
data = np.random.random((1, 784)).astype(float)
payload = {
    "instances": data.tolist()
}
response = requests.post(url, json=payload)
result = json.loads(response.text)
print("Predictions:", result['predictions'])
# Check model status
info_url = 'http://localhost:8501/v1/models/my_model'
info_response = requests.get(info_url)
print("Model info:", json.loads(info_response.text))
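The predict endpoint accepts two JSON layouts: a row format under `"instances"` (answered with `"predictions"`) and a columnar format under `"inputs"` (answered with `"outputs"`). The sketch below builds both request bodies and parses a row-format reply without contacting a server. The sample numbers, the `parse_predictions` helper, and the fake response body are assumptions made for illustration.

```python
import json

rows = [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]]  # two made-up examples, 3 features each

# Row format: one entry per example; the server answers with "predictions".
row_body = json.dumps({"instances": rows})

# Columnar format: the whole batch as one tensor; the server answers with "outputs".
col_body = json.dumps({"inputs": rows})

def parse_predictions(response_text):
    """Extract predictions from a row-format response, surfacing server errors."""
    payload = json.loads(response_text)
    if "error" in payload:
        raise RuntimeError(payload["error"])
    return payload["predictions"]

# Hypothetical response body, shaped like a row-format reply.
fake_response = json.dumps({"predictions": [[0.9, 0.1], [0.2, 0.8]]})
print(parse_predictions(fake_response))
```

Checking for an `"error"` key before indexing into the payload avoids a confusing `KeyError` when the request shape does not match the model signature.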
gRPC Client
import grpc
import numpy as np
from tensorflow_serving.apis import predict_pb2
from tensorflow_serving.apis import prediction_service_pb2_grpc
import tensorflow as tf
# Create a gRPC channel (the container must also publish port 8500, e.g. -p 8500:8500)
channel = grpc.insecure_channel('localhost:8500')
stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)
# Build the request
request = predict_pb2.PredictRequest()
request.model_spec.name = 'my_model'
request.model_spec.signature_name = 'serving_default'
# Set the input tensor (the key must match the input name in the serving signature)
input_data = np.random.random((1, 784)).astype(np.float32)
request.inputs['input_layer'].CopyFrom(
    tf.make_tensor_proto(input_data, shape=input_data.shape)
)
# Run inference
response = stub.Predict(request, 10.0)  # 10-second timeout
output = tf.make_ndarray(response.outputs['output'])
print("gRPC prediction:", output)
Version Management and Canary Deployment
# model.config file
model_config_list {
  config {
    name: 'my_model'
    base_path: '/models/my_model/'
    model_platform: 'tensorflow'
    model_version_policy {
      specific {
        versions: 1
        versions: 2
      }
    }
    version_labels {
      key: 'stable'
      value: 1
    }
    version_labels {
      key: 'canary'
      value: 2
    }
  }
}
# Serve with the config file
# (labels assigned at startup point at versions that are not loaded yet, which needs the extra flag below)
docker run -t --rm \
  -p 8501:8501 -p 8500:8500 \
  -v "/path/to/models:/models" \
  -v "/path/to/model.config:/models/model.config" \
  tensorflow/serving \
  --model_config_file=/models/model.config \
  --allow_version_labels_for_unavailable_models=true
# Request a specific version
url = 'http://localhost:8501/v1/models/my_model/versions/1:predict'
# Or request by label
url_label = 'http://localhost:8501/v1/models/my_model/labels/stable:predict'
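The `stable` and `canary` labels only name versions; deciding which requests go to the canary is left to the client or a load balancer. One simple approach is a weighted random choice on the client side, sketched below. The helper names and the 10% canary share are assumptions for illustration, and the code only builds URLs without sending requests.

```python
import random

BASE_URL = "http://localhost:8501/v1/models/my_model"

def pick_label(canary_fraction, rng):
    """Return 'canary' for roughly canary_fraction of calls, otherwise 'stable'."""
    return "canary" if rng.random() < canary_fraction else "stable"

def predict_url(label):
    """Build the label-based predict URL for the chosen deployment."""
    return f"{BASE_URL}/labels/{label}:predict"

rng = random.Random(0)  # seeded so the run is reproducible
labels = [pick_label(0.1, rng) for _ in range(10_000)]
canary_share = labels.count("canary") / len(labels)

print(f"canary share: {canary_share:.3f}")
print(predict_url("canary"))
```

Promoting the canary then only requires pointing the `stable` label at the new version in `model.config`; client code that requests by label stays unchanged.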
14. TensorFlow Extended (TFX)
TFX is a TensorFlow-based platform for production machine learning pipelines.
ML Pipeline Overview
import tfx
from tfx.components import (
    CsvExampleGen,
    StatisticsGen,
    SchemaGen,
    ExampleValidator,
    Transform,
    Trainer,
    Evaluator,
    Pusher
)
from tfx.proto import pusher_pb2, trainer_pb2
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
# Create an interactive context (for use in notebooks)
context = InteractiveContext()
# 1. ExampleGen: data ingestion
example_gen = CsvExampleGen(input_base='data/')
context.run(example_gen)
# 2. StatisticsGen: dataset statistics
statistics_gen = StatisticsGen(
    examples=example_gen.outputs['examples']
)
context.run(statistics_gen)
# 3. SchemaGen: schema inference
schema_gen = SchemaGen(
    statistics=statistics_gen.outputs['statistics'],
    infer_feature_shape=True
)
context.run(schema_gen)
# 4. ExampleValidator: data validation
example_validator = ExampleValidator(
    statistics=statistics_gen.outputs['statistics'],
    schema=schema_gen.outputs['schema']
)
context.run(example_validator)
# 5. Transform: feature engineering
# Requires a preprocessing_fn defined in transform.py
# 6. Trainer: model training
trainer = Trainer(
    module_file='trainer_module.py',
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    train_args=trainer_pb2.TrainArgs(num_steps=1000),
    eval_args=trainer_pb2.EvalArgs(num_steps=500)
)
# 7. Pusher: model deployment
pusher = Pusher(
    model=trainer.outputs['model'],
    push_destination=pusher_pb2.PushDestination(
        filesystem=pusher_pb2.PushDestination.Filesystem(
            base_directory='serving_model/'
        )
    )
)
Transform Component Example
import tensorflow as tf
import tensorflow_transform as tft
# transform.py
FEATURE_KEYS = ['feature1', 'feature2', 'feature3']
LABEL_KEY = 'label'
def preprocessing_fn(inputs):
    """Feature preprocessing function."""
    outputs = {}
    for key in FEATURE_KEYS:
        # Standardize to zero mean and unit variance
        outputs[key] = tft.scale_to_z_score(inputs[key])
    # Cast the label to an integer type
    outputs[LABEL_KEY] = tf.cast(inputs[LABEL_KEY], tf.int64)
    return outputs
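`tft.scale_to_z_score` standardizes a feature using a mean and standard deviation computed over the whole dataset in an analysis pass, then applies them as constants. The arithmetic itself can be sketched in plain Python. The column values are made up, and the use of the population standard deviation here is an assumption of this sketch rather than a statement about the library's internals.

```python
import math

def z_score(values):
    """Standardize values to zero mean and unit variance (population std, an assumption)."""
    mean = sum(values) / len(values)
    variance = sum((v - mean) ** 2 for v in values) / len(values)
    std = math.sqrt(variance)
    return [(v - mean) / std for v in values]

feature1 = [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]  # made-up column
scaled = z_score(feature1)
print([round(v, 2) for v in scaled])
```

Computing the statistics once over the full dataset, rather than per batch, is what keeps training and serving consistent: the same constants are baked into the exported transform graph.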
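Conclusion
This guide covered TensorFlow and Keras broadly, from core concepts to production deployment.
Key Takeaways
- Eager Execution: the default execution mode in TF 2.x; use @tf.function for graph optimization
- Three Keras APIs: Sequential (simple), Functional (complex topologies), Subclassing (full customization)
- tf.data: an essential tool for efficient data pipelines
- GradientTape: custom training loops and automatic differentiation
- Deployment options: TF Serving (server), TFLite (mobile/edge), TF.js (browser)
- TFX: TensorFlow's platform for production ML pipelines
Further Learning
- TensorFlow Probability (probabilistic deep learning)
- Keras Tuner (hyperparameter optimization)
- TensorFlow Datasets (standard datasets)
- TensorFlow Hub (pretrained models)
- tf-agents (reinforcement learning)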
TensorFlow & Keras Complete Guide: Zero to Hero - From Installation to Production Deployment
Introduction
TensorFlow is a machine learning and deep learning framework open-sourced by Google Brain in 2015. It is one of the most widely used deep learning frameworks today, supporting the entire process from research to production deployment. Keras, integrated as the official high-level API starting from TensorFlow 2.x, enables intuitive and rapid model development.
This guide covers TensorFlow and Keras comprehensively, from fundamental concepts to deploying models in real production environments, step by step.
1. Introduction to TensorFlow and Installation
TensorFlow vs PyTorch Comparison
Both frameworks have their strengths and weaknesses.
| Feature | TensorFlow/Keras | PyTorch |
|---|---|---|
| Creator | Google | Meta (Facebook) |
| Deployment Tools | TF Serving, TFLite, TF.js | TorchServe, TorchScript |
| Production Maturity | Very High | High |
| Research Popularity | High | Very High |
| Learning Curve | Moderate | Low |
| Mobile/Edge | TFLite Excellent | ExecuTorch |
| Ecosystem | TFX, TFHub etc. | HuggingFace Integration |
Installation
Install using pip:
# CPU only
pip install tensorflow
# GPU support (installs matching CUDA libraries via pip; TF 2.14+)
pip install tensorflow[and-cuda]
# Specific version
pip install tensorflow==2.15.0
# conda environment
conda create -n tf_env python=3.10
conda activate tf_env
conda install -c conda-forge tensorflow
For macOS Apple Silicon:
pip install tensorflow-macos
pip install tensorflow-metal # GPU acceleration
Key Changes in TensorFlow 2.x
The most significant change in TensorFlow 2.0 is that Eager Execution is enabled by default. In TF 1.x, you had to define a computation graph first and run it through a Session. In TF 2.x, operations execute immediately like regular Python code.
import tensorflow as tf
print(tf.__version__)
# Check eager execution
print(tf.executing_eagerly()) # True
# Graph execution: @tf.function traces the Python function into a TensorFlow graph
@tf.function
def compute(x, y):
    return x + y
result = compute(tf.constant(1.0), tf.constant(2.0))
print(result) # tf.Tensor(3.0, shape=(), dtype=float32)
Verifying GPU Setup
import tensorflow as tf
# List available GPUs
gpus = tf.config.list_physical_devices('GPU')
print("Available GPUs:", gpus)
# The settings below are alternatives and must run before the GPUs are initialized;
# memory growth and a logical-device split cannot both be applied to the same GPU.
# Enable memory growth (allocate GPU memory on demand instead of reserving it all)
if gpus:
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
# Use only specific GPU
if gpus:
    tf.config.set_visible_devices(gpus[0], 'GPU')
# Split one physical GPU into multiple logical GPUs
if gpus:
    tf.config.set_logical_device_configuration(
        gpus[0],
        [tf.config.LogicalDeviceConfiguration(memory_limit=2048),
         tf.config.LogicalDeviceConfiguration(memory_limit=2048)]
    )
# Check device where operation runs
with tf.device('/GPU:0'):
    a = tf.constant([[1.0, 2.0], [3.0, 4.0]])
    b = tf.constant([[5.0, 6.0], [7.0, 8.0]])
    c = tf.matmul(a, b)
    print(c.device)
2. TensorFlow Tensor Basics
Tensors are the core data structure in TensorFlow. They are similar to NumPy arrays, but they can be accelerated on GPUs and support automatic differentiation.
tf.constant and tf.Variable
import tensorflow as tf
import numpy as np
# Scalar (0-dimensional tensor)
scalar = tf.constant(42)
print(scalar) # tf.Tensor(42, shape=(), dtype=int32)
print(scalar.dtype) # tf.int32
print(scalar.shape) # ()
# Vector (1-dimensional tensor)
vector = tf.constant([1.0, 2.0, 3.0])
print(vector) # tf.Tensor([1. 2. 3.], shape=(3,), dtype=float32)
# Matrix (2-dimensional tensor)
matrix = tf.constant([[1, 2, 3],
[4, 5, 6]], dtype=tf.float32)
print(matrix.shape) # (2, 3)
# 3-dimensional tensor
tensor_3d = tf.constant([[[1, 2], [3, 4]],
[[5, 6], [7, 8]]])
print(tensor_3d.shape) # (2, 2, 2)
# Special tensors
zeros = tf.zeros([3, 4]) # 3x4 matrix of zeros
ones = tf.ones([2, 3]) # 2x3 matrix of ones
identity = tf.eye(4) # 4x4 identity matrix
random = tf.random.normal([3, 3]) # 3x3 random normal matrix
# tf.Variable - used for trainable parameters (mutable)
var = tf.Variable([1.0, 2.0, 3.0])
print(var) # <tf.Variable 'Variable:0' ...>
var.assign([4.0, 5.0, 6.0]) # Update value
var.assign_add([1.0, 1.0, 1.0]) # Add to value
var.assign_sub([0.5, 0.5, 0.5]) # Subtract from value
Tensor Operations
import tensorflow as tf
a = tf.constant([[1.0, 2.0], [3.0, 4.0]])
b = tf.constant([[5.0, 6.0], [7.0, 8.0]])
# Basic arithmetic (element-wise)
print(a + b) # Addition
print(a - b) # Subtraction
print(a * b) # Multiplication (element-wise)
print(a / b) # Division
print(a ** 2) # Power
# Equivalent TF functions
print(tf.add(a, b))
print(tf.subtract(a, b))
print(tf.multiply(a, b))
print(tf.divide(a, b))
# Matrix multiplication
print(tf.matmul(a, b)) # or a @ b
print(a @ b)
# Math functions
x = tf.constant([1.0, 4.0, 9.0, 16.0])
print(tf.sqrt(x)) # [1, 2, 3, 4]
print(tf.exp(x)) # e^x
print(tf.math.log(x)) # Natural log
# Reduction operations
matrix = tf.constant([[1.0, 2.0, 3.0],
[4.0, 5.0, 6.0]])
print(tf.reduce_sum(matrix)) # Total sum: 21
print(tf.reduce_sum(matrix, axis=0)) # Column sum: [5, 7, 9]
print(tf.reduce_sum(matrix, axis=1)) # Row sum: [6, 15]
print(tf.reduce_mean(matrix)) # Mean
print(tf.reduce_max(matrix)) # Max value
print(tf.reduce_min(matrix)) # Min value
print(tf.argmax(matrix, axis=1)) # Index of max per row
# Comparison operations
print(tf.equal(a, b))
print(tf.greater(a, b))
print(tf.less_equal(a, b))
Shape Transformations
import tensorflow as tf
t = tf.constant([[1, 2, 3, 4],
[5, 6, 7, 8]])
print(t.shape) # (2, 4)
# reshape
reshaped = tf.reshape(t, [4, 2])
print(reshaped.shape) # (4, 2)
reshaped2 = tf.reshape(t, [8])
print(reshaped2.shape) # (8,)
reshaped3 = tf.reshape(t, [-1, 2]) # -1 is inferred
print(reshaped3.shape) # (4, 2)
# transpose
transposed = tf.transpose(t)
print(transposed.shape) # (4, 2)
# Higher-dimensional transpose
t3d = tf.random.normal([2, 3, 4])
transposed_3d = tf.transpose(t3d, perm=[0, 2, 1])
print(transposed_3d.shape) # (2, 4, 3)
# expand_dims - add dimension
t1d = tf.constant([1, 2, 3])
print(t1d.shape) # (3,)
expanded_0 = tf.expand_dims(t1d, axis=0)
print(expanded_0.shape) # (1, 3)
expanded_1 = tf.expand_dims(t1d, axis=1)
print(expanded_1.shape) # (3, 1)
# squeeze - remove size-1 dimensions
t_squeezable = tf.constant([[[1, 2, 3]]])
print(t_squeezable.shape) # (1, 1, 3)
squeezed = tf.squeeze(t_squeezable)
print(squeezed.shape) # (3,)
# concat and stack
a = tf.constant([[1, 2], [3, 4]])
b = tf.constant([[5, 6], [7, 8]])
concat_0 = tf.concat([a, b], axis=0)
print(concat_0.shape) # (4, 2)
concat_1 = tf.concat([a, b], axis=1)
print(concat_1.shape) # (2, 4)
stacked = tf.stack([a, b], axis=0)
print(stacked.shape) # (2, 2, 2)
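The `-1` in `tf.reshape(t, [-1, 2])` asks TensorFlow to infer that dimension from the total element count. The rule is simple enough to sketch in plain Python; `infer_shape` is a hypothetical helper written for illustration, not a TensorFlow function.

```python
from math import prod

def infer_shape(current_shape, new_shape):
    """Resolve a single -1 in new_shape so the element count is preserved."""
    total = prod(current_shape)
    if new_shape.count(-1) > 1:
        raise ValueError("only one dimension can be -1")
    known = prod(d for d in new_shape if d != -1)
    if known == 0:
        raise ValueError("cannot infer -1 next to a zero-sized dimension")
    resolved = tuple(total // known if d == -1 else d for d in new_shape)
    if prod(resolved) != total:
        raise ValueError(f"cannot reshape {current_shape} into {new_shape}")
    return resolved

print(infer_shape((2, 4), (-1, 2)))
print(infer_shape((2, 3, 4), (2, -1)))
```

The final element-count check is what rejects requests such as reshaping 8 elements into rows of 3.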
Broadcasting
import tensorflow as tf
# Scalar broadcasting
matrix = tf.constant([[1.0, 2.0], [3.0, 4.0]])
print(matrix + 10) # Adds 10 to every element
# Vector broadcasting
row_vector = tf.constant([10.0, 20.0]) # shape (2,)
print(matrix + row_vector) # Adds row_vector to each row
col_vector = tf.constant([[10.0], [20.0]]) # shape (2, 1)
print(matrix + col_vector) # Adds col_vector to each column
# Broadcasting in batch operations
batch = tf.random.normal([32, 128]) # batch size 32, 128 features
mean = tf.reduce_mean(batch, axis=0, keepdims=True) # shape (1, 128)
std = tf.math.reduce_std(batch, axis=0, keepdims=True)
normalized = (batch - mean) / (std + 1e-8) # broadcasting applied
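The broadcasting examples all follow one rule: shapes are aligned from the right, and each pair of dimensions must either match or contain a 1. A plain-Python sketch of that rule is shown below; `broadcast_shape` is a hypothetical helper for illustration.

```python
from itertools import zip_longest

def broadcast_shape(shape_a, shape_b):
    """Align shapes from the right; each pair of sizes must match or one must be 1."""
    result = []
    for a, b in zip_longest(reversed(shape_a), reversed(shape_b), fillvalue=1):
        if a != b and a != 1 and b != 1:
            raise ValueError(f"shapes {shape_a} and {shape_b} are not broadcastable")
        result.append(b if a == 1 else a)  # a size-1 dimension stretches to the other size
    return tuple(reversed(result))

print(broadcast_shape((2, 2), (2,)))         # matrix + row vector
print(broadcast_shape((2, 2), (2, 1)))       # matrix + column vector
print(broadcast_shape((32, 128), (1, 128)))  # batch - per-feature mean
```

Missing leading dimensions are treated as size 1, which is why a shape `(2,)` vector is applied to every row of a `(2, 2)` matrix.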
Interoperability with NumPy
import tensorflow as tf
import numpy as np
# NumPy -> TensorFlow
np_array = np.array([[1.0, 2.0], [3.0, 4.0]])
tf_tensor = tf.constant(np_array)
tf_tensor2 = tf.convert_to_tensor(np_array)
# TensorFlow -> NumPy
numpy_from_tf = tf_tensor.numpy()
print(type(numpy_from_tf)) # numpy.ndarray
# NumPy functions accept tf.Tensor inputs directly (the tensor is converted to an ndarray)
print(np.sin(tf_tensor))
print(np.sqrt(tf_tensor))
3. Keras Sequential API
The Sequential API is the simplest way to build models in Keras. It stacks layers in a linear sequence.
Stacking Layers and Compiling the Model
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
# Method 1: add() method
model = keras.Sequential()
model.add(layers.Dense(128, activation='relu', input_shape=(784,)))
model.add(layers.Dropout(0.2))
model.add(layers.Dense(64, activation='relu'))
model.add(layers.Dropout(0.2))
model.add(layers.Dense(10, activation='softmax'))
# Method 2: List definition
model = keras.Sequential([
layers.Dense(128, activation='relu', input_shape=(784,)),
layers.Dropout(0.2),
layers.Dense(64, activation='relu'),
layers.Dropout(0.2),
layers.Dense(10, activation='softmax')
])
# View model architecture
model.summary()
# Compile
model.compile(
optimizer='adam', # or keras.optimizers.Adam(learning_rate=0.001)
loss='sparse_categorical_crossentropy', # integer labels
metrics=['accuracy']
)
# Various optimizers and losses
model.compile(
optimizer=keras.optimizers.SGD(learning_rate=0.01, momentum=0.9),
loss=keras.losses.CategoricalCrossentropy(),
metrics=[keras.metrics.CategoricalAccuracy()]
)
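The parameter counts that `model.summary()` reports for this network can be checked by hand: a Dense layer holds one kernel weight per input-unit pair plus one bias per unit, and Dropout adds none. A small sketch of that arithmetic follows; `dense_params` is a hypothetical helper for illustration.

```python
def dense_params(inputs, units, use_bias=True):
    """Weights of a fully connected layer: kernel (inputs x units) plus optional biases."""
    return inputs * units + (units if use_bias else 0)

layer_units = [128, 64, 10]  # Dense sizes from the Sequential model; Dropout adds no weights
inputs = 784
total = 0
for units in layer_units:
    count = dense_params(inputs, units)
    print(f"Dense({units}): {count} parameters")
    total += count
    inputs = units  # this layer's output feeds the next layer

print("Total:", total)
```

Most of the parameters sit in the first layer, because it connects all 784 input pixels to 128 units.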
Complete MNIST Classification Example
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import matplotlib.pyplot as plt
# Load data
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
# Preprocessing
x_train = x_train.astype('float32') / 255.0
x_test = x_test.astype('float32') / 255.0
x_train = x_train.reshape(-1, 784)
x_test = x_test.reshape(-1, 784)
print(f"Training data: {x_train.shape}, Labels: {y_train.shape}")
print(f"Test data: {x_test.shape}, Labels: {y_test.shape}")
# Define model
model = keras.Sequential([
layers.Dense(256, activation='relu', input_shape=(784,)),
layers.BatchNormalization(),
layers.Dropout(0.3),
layers.Dense(128, activation='relu'),
layers.BatchNormalization(),
layers.Dropout(0.3),
layers.Dense(10, activation='softmax')
])
model.summary()
# Compile
model.compile(
optimizer=keras.optimizers.Adam(learning_rate=1e-3),
loss='sparse_categorical_crossentropy',
metrics=['accuracy']
)
# Train
history = model.fit(
x_train, y_train,
batch_size=128,
epochs=20,
validation_split=0.1,
verbose=1
)
# Evaluate
test_loss, test_acc = model.evaluate(x_test, y_test, verbose=0)
print(f"Test loss: {test_loss:.4f}")
print(f"Test accuracy: {test_acc:.4f}")
# Predict
predictions = model.predict(x_test[:10])
predicted_classes = tf.argmax(predictions, axis=1)
print("Predicted:", predicted_classes.numpy())
print("Actual:", y_test[:10])
# Plot training curves
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))
ax1.plot(history.history['loss'], label='Train Loss')
ax1.plot(history.history['val_loss'], label='Val Loss')
ax1.set_title('Loss')
ax1.set_xlabel('Epoch')
ax1.legend()
ax2.plot(history.history['accuracy'], label='Train Accuracy')
ax2.plot(history.history['val_accuracy'], label='Val Accuracy')
ax2.set_title('Accuracy')
ax2.set_xlabel('Epoch')
ax2.legend()
plt.tight_layout()
plt.savefig('mnist_training.png')
plt.show()
4. Keras Functional API
The Functional API allows building more complex model architectures, including multiple inputs/outputs, residual connections, and shared layers.
Basic Functional API Usage
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
# Define input
inputs = keras.Input(shape=(784,), name='input_layer')
# Connect layers (called as functions)
x = layers.Dense(256, activation='relu', name='dense_1')(inputs)
x = layers.BatchNormalization(name='bn_1')(x)
x = layers.Dropout(0.3, name='dropout_1')(x)
x = layers.Dense(128, activation='relu', name='dense_2')(x)
x = layers.BatchNormalization(name='bn_2')(x)
x = layers.Dropout(0.3, name='dropout_2')(x)
outputs = layers.Dense(10, activation='softmax', name='output')(x)
# Create model
model = keras.Model(inputs=inputs, outputs=outputs, name='mnist_model')
model.summary()
Multiple Input/Output Model
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
# Example: Combining image + metadata
# Image input
image_input = keras.Input(shape=(32, 32, 3), name='image')
x1 = layers.Conv2D(32, 3, activation='relu')(image_input)
x1 = layers.GlobalAveragePooling2D()(x1)
x1 = layers.Dense(64, activation='relu')(x1)
# Metadata input
meta_input = keras.Input(shape=(10,), name='metadata')
x2 = layers.Dense(32, activation='relu')(meta_input)
# Merge
combined = layers.concatenate([x1, x2])
combined = layers.Dense(64, activation='relu')(combined)
# Multiple outputs
main_output = layers.Dense(1, activation='sigmoid', name='main_output')(combined)
aux_output = layers.Dense(5, activation='softmax', name='aux_output')(combined)
# Build model
model = keras.Model(
inputs=[image_input, meta_input],
outputs=[main_output, aux_output]
)
# Compile with per-output loss and weights
model.compile(
optimizer='adam',
loss={
'main_output': 'binary_crossentropy',
'aux_output': 'categorical_crossentropy'
},
loss_weights={
'main_output': 1.0,
'aux_output': 0.2
},
metrics={
'main_output': ['accuracy'],
'aux_output': ['accuracy']
}
)
Residual Connections
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
def residual_block(x, filters, stride=1):
    """ResNet-style residual block"""
    shortcut = x
    # Main path
    x = layers.Conv2D(filters, 3, strides=stride, padding='same')(x)
    x = layers.BatchNormalization()(x)
    x = layers.ReLU()(x)
    x = layers.Conv2D(filters, 3, padding='same')(x)
    x = layers.BatchNormalization()(x)
    # Adjust shortcut when dimensions differ
    if stride != 1 or shortcut.shape[-1] != filters:
        shortcut = layers.Conv2D(filters, 1, strides=stride)(shortcut)
        shortcut = layers.BatchNormalization()(shortcut)
    # Add residual
    x = layers.add([x, shortcut])
    x = layers.ReLU()(x)
    return x
# Build a ResNet-style model
inputs = keras.Input(shape=(32, 32, 3))
x = layers.Conv2D(64, 7, strides=2, padding='same')(inputs)
x = layers.BatchNormalization()(x)
x = layers.ReLU()(x)
x = layers.MaxPooling2D(3, strides=2, padding='same')(x)
x = residual_block(x, 64)
x = residual_block(x, 64)
x = residual_block(x, 128, stride=2)
x = residual_block(x, 128)
x = residual_block(x, 256, stride=2)
x = residual_block(x, 256)
x = layers.GlobalAveragePooling2D()(x)
x = layers.Dense(256, activation='relu')(x)
outputs = layers.Dense(10, activation='softmax')(x)
model = keras.Model(inputs, outputs, name='mini_resnet')
model.summary()
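It can help to trace the feature-map shape through this network before reading `model.summary()`. With `'same'` padding, a convolution or pooling layer with stride `s` produces `ceil(size / s)` outputs per spatial dimension. The sketch below applies that rule stage by stage; the stage list is a hand-written summary of the model definition, not something extracted from Keras.

```python
import math

def same_padding_out(size, stride):
    """Output length of a 'same'-padded conv or pool: ceil(size / stride)."""
    return math.ceil(size / stride)

# (name, stride, output channels) per stage; None keeps the incoming channel count.
stages = [
    ("stem conv 7x7", 2, 64),
    ("max pool 3x3", 2, None),
    ("res 64 (two blocks)", 1, 64),
    ("res 128 (first block strided)", 2, 128),
    ("res 256 (first block strided)", 2, 256),
]

size, channels = 32, 3  # CIFAR-sized input: 32 x 32 x 3
for name, stride, out_channels in stages:
    size = same_padding_out(size, stride)
    channels = out_channels or channels
    print(f"{name:32s} -> ({size}, {size}, {channels})")
```

By the last stage the spatial size is down to 2 x 2, so global average pooling reduces each of the 256 channels to a single value.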
Shared Layers (Siamese Network)
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
# Define shared encoder
shared_encoder = keras.Sequential([
layers.Dense(128, activation='relu'),
layers.Dense(64, activation='relu'),
layers.Dense(32)
], name='shared_encoder')
# Two inputs
input_a = keras.Input(shape=(100,), name='input_a')
input_b = keras.Input(shape=(100,), name='input_b')
# Process with the same encoder
encoded_a = shared_encoder(input_a)
encoded_b = shared_encoder(input_b)
# Cosine similarity
similarity = layers.Dot(axes=1, normalize=True)([encoded_a, encoded_b])
output = layers.Dense(1, activation='sigmoid')(similarity)
siamese_model = keras.Model(inputs=[input_a, input_b], outputs=output)
siamese_model.summary()
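`layers.Dot(axes=1, normalize=True)` L2-normalizes both encodings before taking their dot product, which yields cosine similarity. The computation can be sketched in plain Python as follows, assuming non-zero vectors; the sample vectors are made up.

```python
import math

def cosine_similarity(a, b):
    """Dot product of two vectors after scaling each to unit L2 length (non-zero inputs assumed)."""
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return sum(x * y for x, y in zip(a, b)) / (norm_a * norm_b)

print(cosine_similarity([1.0, 2.0], [2.0, 4.0]))   # same direction
print(cosine_similarity([1.0, 0.0], [0.0, 1.0]))   # orthogonal
print(cosine_similarity([1.0, 0.0], [-1.0, 0.0]))  # opposite direction
```

Because the result depends only on direction, two encodings that differ just in magnitude are treated as identical, which is usually the desired behavior when comparing embeddings.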
5. Keras Subclassing API
The Subclassing API is the most flexible approach, similar to PyTorch. Define custom layers and models as Python classes.
Custom Layers
import tensorflow as tf
from tensorflow import keras
class MyDenseLayer(keras.layers.Layer):
def __init__(self, units, activation=None, **kwargs):
super().__init__(**kwargs)
self.units = units
self.activation = keras.activations.get(activation)
def build(self, input_shape):
# Initialize weights (build runs once on first call)
self.w = self.add_weight(
name='kernel',
shape=(input_shape[-1], self.units),
initializer='glorot_uniform',
trainable=True
)
self.b = self.add_weight(
name='bias',
shape=(self.units,),
initializer='zeros',
trainable=True
)
super().build(input_shape)
def call(self, inputs, training=False):
output = tf.matmul(inputs, self.w) + self.b
if self.activation is not None:
output = self.activation(output)
return output
def get_config(self):
config = super().get_config()
config.update({'units': self.units, 'activation': keras.activations.serialize(self.activation)})
return config
# Usage
layer = MyDenseLayer(64, activation='relu')
x = tf.random.normal([32, 128])
y = layer(x)
print(y.shape) # (32, 64)
class MultiHeadSelfAttention(keras.layers.Layer):
"""Simple multi-head self-attention layer"""
def __init__(self, embed_dim, num_heads, **kwargs):
super().__init__(**kwargs)
self.embed_dim = embed_dim
self.num_heads = num_heads
self.head_dim = embed_dim // num_heads
assert self.head_dim * num_heads == embed_dim
self.wq = keras.layers.Dense(embed_dim)
self.wk = keras.layers.Dense(embed_dim)
self.wv = keras.layers.Dense(embed_dim)
self.wo = keras.layers.Dense(embed_dim)
def split_heads(self, x, batch_size):
x = tf.reshape(x, (batch_size, -1, self.num_heads, self.head_dim))
return tf.transpose(x, perm=[0, 2, 1, 3])
def call(self, x, training=False):
batch_size = tf.shape(x)[0]
seq_len = tf.shape(x)[1]
q = self.split_heads(self.wq(x), batch_size)
k = self.split_heads(self.wk(x), batch_size)
v = self.split_heads(self.wv(x), batch_size)
# Scaled dot-product attention
scale = tf.cast(self.head_dim, tf.float32) ** 0.5
scores = tf.matmul(q, k, transpose_b=True) / scale
weights = tf.nn.softmax(scores, axis=-1)
context = tf.matmul(weights, v)
# Merge heads
context = tf.transpose(context, perm=[0, 2, 1, 3])
context = tf.reshape(context, (batch_size, seq_len, self.embed_dim))
return self.wo(context)
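The attention step inside `call` can be checked by hand on a tiny input. The following is a minimal plain-Python sketch of scaled dot-product attention for a single head; the helper names and the toy values are assumptions made for illustration and are not part of the layer above.

```python
import math

def softmax(row):
    """Numerically stable softmax over a list of floats."""
    m = max(row)
    exps = [math.exp(v - m) for v in row]
    total = sum(exps)
    return [e / total for e in exps]

def scaled_dot_product_attention(q, k, v):
    """q, k, v are lists of vectors (seq_len x head_dim) for one head."""
    scale = math.sqrt(len(q[0]))  # sqrt(head_dim), as in the layer above
    context = []
    for q_vec in q:
        # Similarity of this query with every key, divided by the scale
        scores = [sum(a * b for a, b in zip(q_vec, k_vec)) / scale for k_vec in k]
        weights = softmax(scores)
        # Weighted average of the value vectors
        context.append([
            sum(w * v_vec[d] for w, v_vec in zip(weights, v))
            for d in range(len(v[0]))
        ])
    return context

# Hypothetical toy input: 2 tokens, head_dim = 2
q = k = [[1.0, 0.0], [0.0, 1.0]]
v = [[1.0, 2.0], [3.0, 4.0]]
out = scaled_dot_product_attention(q, k, v)
print(out)
```

Each output row is a convex combination of the value vectors, weighted toward the token whose key best matches the query.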
Custom Model (Model Subclassing)
import tensorflow as tf
from tensorflow import keras
class ResidualBlock(keras.layers.Layer):
def __init__(self, filters, **kwargs):
super().__init__(**kwargs)
self.conv1 = keras.layers.Conv2D(filters, 3, padding='same')
self.conv2 = keras.layers.Conv2D(filters, 3, padding='same')
self.bn1 = keras.layers.BatchNormalization()
self.bn2 = keras.layers.BatchNormalization()
self.relu = keras.layers.ReLU()
def call(self, inputs, training=False):
x = self.conv1(inputs)
x = self.bn1(x, training=training)
x = self.relu(x)
x = self.conv2(x)
x = self.bn2(x, training=training)
x = x + inputs # residual connection
return self.relu(x)
class CustomCNN(keras.Model):
def __init__(self, num_classes=10, **kwargs):
super().__init__(**kwargs)
self.conv_stem = keras.layers.Conv2D(32, 3, padding='same', activation='relu')
self.res_block1 = ResidualBlock(32)
self.res_block2 = ResidualBlock(32)
self.pool = keras.layers.MaxPooling2D(2)
self.conv_expand = keras.layers.Conv2D(64, 3, padding='same', activation='relu')
self.res_block3 = ResidualBlock(64)
self.gap = keras.layers.GlobalAveragePooling2D()
self.dropout = keras.layers.Dropout(0.5)
self.fc = keras.layers.Dense(num_classes, activation='softmax')
def call(self, inputs, training=False):
x = self.conv_stem(inputs)
x = self.res_block1(x, training=training)
x = self.res_block2(x, training=training)
x = self.pool(x)
x = self.conv_expand(x)
x = self.res_block3(x, training=training)
x = self.gap(x)
x = self.dropout(x, training=training)
return self.fc(x)
# Usage
model = CustomCNN(num_classes=10)
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
# Test with dummy data
dummy_input = tf.random.normal([4, 32, 32, 3])
output = model(dummy_input, training=False)
print(output.shape) # (4, 10)
model.summary()
6. CNN Implementation (CIFAR-10)
Basic CNN Model
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
# Load CIFAR-10
(x_train, y_train), (x_test, y_test) = keras.datasets.cifar10.load_data()
x_train = x_train.astype('float32') / 255.0
x_test = x_test.astype('float32') / 255.0
class_names = ['airplane', 'automobile', 'bird', 'cat', 'deer',
'dog', 'frog', 'horse', 'ship', 'truck']
print(f"Training data: {x_train.shape}") # (50000, 32, 32, 3)
print(f"Test data: {x_test.shape}") # (10000, 32, 32, 3)
# Define CNN model
def build_cnn_model():
model = keras.Sequential([
# First conv block
layers.Conv2D(32, (3, 3), padding='same', input_shape=(32, 32, 3)),
layers.BatchNormalization(),
layers.Activation('relu'),
layers.Conv2D(32, (3, 3), padding='same'),
layers.BatchNormalization(),
layers.Activation('relu'),
layers.MaxPooling2D((2, 2)),
layers.Dropout(0.25),
# Second conv block
layers.Conv2D(64, (3, 3), padding='same'),
layers.BatchNormalization(),
layers.Activation('relu'),
layers.Conv2D(64, (3, 3), padding='same'),
layers.BatchNormalization(),
layers.Activation('relu'),
layers.MaxPooling2D((2, 2)),
layers.Dropout(0.25),
# Third conv block
layers.Conv2D(128, (3, 3), padding='same'),
layers.BatchNormalization(),
layers.Activation('relu'),
layers.Conv2D(128, (3, 3), padding='same'),
layers.BatchNormalization(),
layers.Activation('relu'),
layers.GlobalAveragePooling2D(),
layers.Dropout(0.5),
# Fully connected layers
layers.Dense(256, activation='relu'),
layers.BatchNormalization(),
layers.Dropout(0.5),
layers.Dense(10, activation='softmax')
])
return model
model = build_cnn_model()
model.summary()
model.compile(
optimizer=keras.optimizers.Adam(learning_rate=1e-3),
loss='sparse_categorical_crossentropy',
metrics=['accuracy']
)
Data Augmentation
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
# Keras built-in augmentation layers
data_augmentation = keras.Sequential([
layers.RandomFlip('horizontal'),
layers.RandomRotation(0.1),
layers.RandomZoom(0.1),
layers.RandomTranslation(0.1, 0.1),
layers.RandomContrast(0.1),
], name='data_augmentation')
# Integrate augmentation into the model
inputs = keras.Input(shape=(32, 32, 3))
x = data_augmentation(inputs) # Only applied during training
x = layers.Rescaling(1./255)(x)
x = layers.Conv2D(32, 3, padding='same', activation='relu')(x)
x = layers.MaxPooling2D()(x)
x = layers.Conv2D(64, 3, padding='same', activation='relu')(x)
x = layers.MaxPooling2D()(x)
x = layers.GlobalAveragePooling2D()(x)
outputs = layers.Dense(10, activation='softmax')(x)
augmented_model = keras.Model(inputs, outputs)
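# tf.data augmentation pipeline (an alternative to the in-model layers above;
# it already scales pixels to [0, 1], so do not pair it with a Rescaling layer)
def augment_image(image, label):
    image = tf.cast(image, tf.float32) / 255.0
    image = tf.image.random_flip_left_right(image)
    image = tf.image.random_brightness(image, 0.2)
    image = tf.image.random_contrast(image, 0.8, 1.2)
    image = tf.image.random_saturation(image, 0.8, 1.2)
    # Color jitter can push values outside [0, 1], so clip them back
    image = tf.clip_by_value(image, 0.0, 1.0)
    # Pad 32x32 to 40x40, then take a random 32x32 crop
    image = tf.image.pad_to_bounding_box(image, 4, 4, 40, 40)
    image = tf.image.random_crop(image, [32, 32, 3])
    return image, label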
(x_train, y_train), (x_test, y_test) = keras.datasets.cifar10.load_data()
y_train = y_train.flatten()
y_test = y_test.flatten()
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.map(augment_image, num_parallel_calls=tf.data.AUTOTUNE)
train_dataset = train_dataset.shuffle(10000).batch(128).prefetch(tf.data.AUTOTUNE)
Transfer Learning
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.applications import EfficientNetB0
# Transfer learning with EfficientNetB0
def build_transfer_model(num_classes=10):
# Pre-trained base model (without classifier head)
base_model = EfficientNetB0(
include_top=False,
weights='imagenet',
input_shape=(224, 224, 3)
)
# Freeze base model initially
base_model.trainable = False
inputs = keras.Input(shape=(224, 224, 3))
# Keras EfficientNet rescales inputs internally, so feed raw pixels in [0, 255]
x = base_model(inputs, training=False)
x = layers.GlobalAveragePooling2D()(x)
x = layers.Dense(256, activation='relu')(x)
x = layers.Dropout(0.5)(x)
outputs = layers.Dense(num_classes, activation='softmax')(x)
model = keras.Model(inputs, outputs)
return model, base_model
model, base_model = build_transfer_model(num_classes=10)
model.compile(
optimizer=keras.optimizers.Adam(1e-3),
loss='sparse_categorical_crossentropy',
metrics=['accuracy']
)
# Phase 1: Train classifier with frozen base
# model.fit(train_dataset, epochs=10, ...)
# Fine-tuning: Unfreeze some base layers
def fine_tune(model, base_model, fine_tune_at=100):
base_model.trainable = True
# Keep layers before fine_tune_at frozen
for layer in base_model.layers[:fine_tune_at]:
layer.trainable = False
model.compile(
optimizer=keras.optimizers.Adam(1e-5), # Very low learning rate
loss='sparse_categorical_crossentropy',
metrics=['accuracy']
)
return model
# fine_tune(model, base_model, fine_tune_at=100)
# model.fit(train_dataset, epochs=20, ...)
7. RNN/LSTM Implementation
Time Series Prediction (LSTM)
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np
# Generate synthetic time series data
def generate_time_series(n_samples, n_steps):
t = np.linspace(0, 4 * np.pi, n_steps + 1)
series = np.sin(t) + 0.1 * np.random.randn(n_samples, n_steps + 1)
X = series[:, :-1].reshape(-1, n_steps, 1)
y = series[:, 1:].reshape(-1, n_steps, 1)
return X, y
X_train, y_train = generate_time_series(10000, 50)
X_val, y_val = generate_time_series(1000, 50)
X_test, y_test = generate_time_series(1000, 50)
# LSTM model
def build_lstm_model(n_steps=50):
model = keras.Sequential([
layers.LSTM(64, return_sequences=True, input_shape=(n_steps, 1)),
layers.Dropout(0.2),
layers.LSTM(64, return_sequences=True),
layers.Dropout(0.2),
layers.TimeDistributed(layers.Dense(1))
])
model.compile(
optimizer='adam',
loss='mse',
metrics=['mae']
)
return model
model = build_lstm_model()
model.summary()
history = model.fit(
X_train, y_train,
epochs=20,
batch_size=128,
validation_data=(X_val, y_val)
)
Text Generation (Character-Level RNN)
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np
# Example text (use larger corpus in practice)
text = """TensorFlow is a deep learning framework created by Google.
Keras is a high-level API that runs on top of TensorFlow.
Using both libraries together, you can build powerful deep learning models quickly."""
# Create character set
chars = sorted(set(text))
char2idx = {c: i for i, c in enumerate(chars)}
idx2char = np.array(chars)
vocab_size = len(chars)
# Convert text to indices
text_as_int = np.array([char2idx[c] for c in text])
# Create sequences
seq_length = 50
sequences = tf.data.Dataset.from_tensor_slices(text_as_int)
sequences = sequences.batch(seq_length + 1, drop_remainder=True)
def split_input_target(chunk):
return chunk[:-1], chunk[1:]
dataset = sequences.map(split_input_target)
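# The sample text yields only a few sequences, so keep the partial batch;
# drop_remainder=True would leave this tiny dataset empty.
dataset = dataset.shuffle(100).batch(32, drop_remainder=False)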
# Character-level RNN model
def build_char_rnn(vocab_size, embed_dim=64, rnn_units=256):
model = keras.Sequential([
layers.Embedding(vocab_size, embed_dim),
layers.GRU(rnn_units, return_sequences=True, stateful=False),
layers.GRU(rnn_units, return_sequences=True),
layers.Dense(vocab_size)
])
return model
model = build_char_rnn(vocab_size)
model.compile(
optimizer='adam',
loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics=['accuracy']
)
# Text generation function
def generate_text(model, start_string, num_generate=100, temperature=1.0):
    # The GRU layers are stateless, so every call must see the full history;
    # feeding only the last character would discard all context.
    ids = [char2idx[s] for s in start_string]
    text_generated = []
    for _ in range(num_generate):
        logits = model(tf.expand_dims(ids, 0))    # (1, len(ids), vocab_size)
        logits = logits[:, -1, :] / temperature   # keep the last time step only
        predicted_id = int(tf.random.categorical(logits, num_samples=1)[0, 0])
        ids.append(predicted_id)
        text_generated.append(idx2char[predicted_id])
    return start_string + ''.join(text_generated)
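The `temperature` argument divides the logits before sampling. A minimal plain-Python sketch of the effect follows; the function name and the example logits are assumptions chosen for illustration.

```python
import math

def softmax_with_temperature(logits, temperature=1.0):
    """Turn logits into probabilities after dividing by the temperature."""
    scaled = [x / temperature for x in logits]
    m = max(scaled)
    exps = [math.exp(s - m) for s in scaled]
    total = sum(exps)
    return [e / total for e in exps]

logits = [2.0, 1.0, 0.0]  # hypothetical scores for three characters
sharp = softmax_with_temperature(logits, temperature=0.5)
neutral = softmax_with_temperature(logits, temperature=1.0)
flat = softmax_with_temperature(logits, temperature=2.0)
print(sharp, neutral, flat)
```

Temperatures below 1 concentrate probability on the most likely character, which gives more predictable text. Temperatures above 1 flatten the distribution and give more varied output.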
Bidirectional LSTM
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
# Bidirectional LSTM for text classification
def build_bidirectional_model(vocab_size, max_len=100, embed_dim=64):
model = keras.Sequential([
layers.Embedding(vocab_size, embed_dim, input_length=max_len),
layers.Bidirectional(layers.LSTM(64, return_sequences=True)),
layers.Bidirectional(layers.LSTM(32)),
layers.Dense(64, activation='relu'),
layers.Dropout(0.5),
layers.Dense(1, activation='sigmoid')
])
model.compile(
optimizer='adam',
loss='binary_crossentropy',
metrics=['accuracy']
)
return model
model = build_bidirectional_model(vocab_size=10000, max_len=100)
model.summary()
8. Transformer with Keras
Multi-head Attention and Transformer Encoder
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
class TransformerBlock(keras.layers.Layer):
def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1, **kwargs):
super().__init__(**kwargs)
self.att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
self.ffn = keras.Sequential([
layers.Dense(ff_dim, activation='relu'),
layers.Dense(embed_dim)
])
self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
self.dropout1 = layers.Dropout(rate)
self.dropout2 = layers.Dropout(rate)
def call(self, inputs, training=False):
attn_output = self.att(inputs, inputs)
attn_output = self.dropout1(attn_output, training=training)
out1 = self.layernorm1(inputs + attn_output)
ffn_output = self.ffn(out1)
ffn_output = self.dropout2(ffn_output, training=training)
return self.layernorm2(out1 + ffn_output)
class TokenAndPositionEmbedding(keras.layers.Layer):
def __init__(self, maxlen, vocab_size, embed_dim, **kwargs):
super().__init__(**kwargs)
self.token_emb = layers.Embedding(vocab_size, embed_dim)
self.pos_emb = layers.Embedding(maxlen, embed_dim)
def call(self, x):
maxlen = tf.shape(x)[-1]
positions = tf.range(start=0, limit=maxlen, delta=1)
positions = self.pos_emb(positions)
x = self.token_emb(x)
return x + positions
# Transformer classifier for text
def build_transformer_classifier(
vocab_size=20000,
maxlen=200,
embed_dim=32,
num_heads=2,
ff_dim=32,
num_classes=2
):
inputs = layers.Input(shape=(maxlen,))
embedding_layer = TokenAndPositionEmbedding(maxlen, vocab_size, embed_dim)
x = embedding_layer(inputs)
transformer_block = TransformerBlock(embed_dim, num_heads, ff_dim)
x = transformer_block(x)
x = layers.GlobalAveragePooling1D()(x)
x = layers.Dropout(0.1)(x)
x = layers.Dense(20, activation='relu')(x)
x = layers.Dropout(0.1)(x)
outputs = layers.Dense(num_classes, activation='softmax')(x)
model = keras.Model(inputs=inputs, outputs=outputs)
return model
model = build_transformer_classifier()
model.compile(
optimizer='adam',
loss='sparse_categorical_crossentropy',
metrics=['accuracy']
)
model.summary()
# IMDB sentiment classification
vocab_size = 20000
maxlen = 200
(x_train, y_train), (x_val, y_val) = keras.datasets.imdb.load_data(num_words=vocab_size)
x_train = keras.preprocessing.sequence.pad_sequences(x_train, maxlen=maxlen)
x_val = keras.preprocessing.sequence.pad_sequences(x_val, maxlen=maxlen)
# model.fit(x_train, y_train, batch_size=32, epochs=5, validation_data=(x_val, y_val))
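By default, `pad_sequences` pads on the left with zeros and, for reviews longer than `maxlen`, keeps the last `maxlen` tokens. The plain-Python sketch below imitates those defaults for a single sequence. The helper name is an assumption, and the real function also handles dtypes and the `padding`/`truncating` options.

```python
def pad_sequence(seq, maxlen, value=0):
    """Imitate the defaults: keep the last maxlen tokens, left-pad with value."""
    seq = list(seq)[-maxlen:]
    return [value] * (maxlen - len(seq)) + seq

short = pad_sequence([11, 12, 13], maxlen=5)
long = pad_sequence([1, 2, 3, 4, 5, 6, 7], maxlen=5)
print(short)
print(long)
```

Every review therefore ends aligned at the right edge of the `maxlen`-wide input, and index 0 acts as the padding token.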
9. Data Pipeline (tf.data)
Basic tf.data.Dataset Usage
import tensorflow as tf
import numpy as np
# Create Dataset from tensors
X = np.random.randn(1000, 10)
y = np.random.randint(0, 2, 1000)
dataset = tf.data.Dataset.from_tensor_slices((X, y))
print(dataset) # TensorSliceDataset
# Basic transformations
dataset = (dataset
.shuffle(buffer_size=1000)
.batch(32)
.prefetch(tf.data.AUTOTUNE)
)
# Inspect data
for batch_X, batch_y in dataset.take(1):
print(f"Batch X shape: {batch_X.shape}") # (32, 10)
print(f"Batch y shape: {batch_y.shape}") # (32,)
# map transformation
def preprocess(x, y):
x = tf.cast(x, tf.float32)
y = tf.cast(y, tf.int32)
x = (x - tf.reduce_mean(x)) / tf.math.reduce_std(x)
return x, y
dataset = tf.data.Dataset.from_tensor_slices((X, y))
dataset = dataset.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE)
# filter transformation
positive_dataset = dataset.filter(lambda x, y: y == 1)
# Create from range
range_dataset = tf.data.Dataset.range(100)
# zip to combine
features_ds = tf.data.Dataset.from_tensor_slices(X)
labels_ds = tf.data.Dataset.from_tensor_slices(y)
combined_ds = tf.data.Dataset.zip((features_ds, labels_ds))
Image Data Loading Pipeline
import tensorflow as tf
import os
def load_and_preprocess_image(path, label, image_size=(224, 224)):
image = tf.io.read_file(path)
image = tf.image.decode_jpeg(image, channels=3)
image = tf.image.resize(image, image_size)
image = tf.cast(image, tf.float32) / 255.0
return image, label
def create_image_dataset(data_dir, batch_size=32, image_size=(224, 224)):
    class_names = sorted(os.listdir(data_dir))
    class_map = {name: idx for idx, name in enumerate(class_names)}
    file_paths = []
    labels = []
    for class_name in class_names:
        class_dir = os.path.join(data_dir, class_name)
        for fname in os.listdir(class_dir):
            # Match extensions case-insensitively (.JPG, .Png, ...)
            if fname.lower().endswith(('.jpg', '.jpeg', '.png')):
                file_paths.append(os.path.join(class_dir, fname))
                labels.append(class_map[class_name])
    path_ds = tf.data.Dataset.from_tensor_slices(file_paths)
    label_ds = tf.data.Dataset.from_tensor_slices(labels)
    combined = tf.data.Dataset.zip((path_ds, label_ds))
    # Paths are grouped by class, so shuffle across the whole list before
    # decoding; shuffling path strings is also far cheaper than shuffling images
    combined = combined.shuffle(max(len(file_paths), 1))
    dataset = combined.map(
        lambda p, l: load_and_preprocess_image(p, l, image_size),
        num_parallel_calls=tf.data.AUTOTUNE
    )
    dataset = dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)
    return dataset, class_names
# Alternatively, use the built-in helper (easier)
# train_ds = keras.utils.image_dataset_from_directory(
# 'data/train',
# image_size=(224, 224),
# batch_size=32
# )
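`shuffle(buffer_size)` only mixes elements inside a sliding buffer, which matters when files are listed class by class as in `create_image_dataset`. The plain-Python sketch below imitates that behaviour. It is a simplification written for illustration, not TensorFlow's implementation, and `buffered_shuffle` is a hypothetical helper.

```python
import random

def buffered_shuffle(items, buffer_size, seed=0):
    """Yield items in an order produced by a fixed-size shuffle buffer."""
    rng = random.Random(seed)
    buffer = []
    for item in items:
        if len(buffer) < buffer_size:
            buffer.append(item)          # still filling the buffer
            continue
        idx = rng.randrange(buffer_size)
        yield buffer[idx]                # emit a random buffered element
        buffer[idx] = item               # and replace it with the new one
    rng.shuffle(buffer)                  # drain what is left at the end
    yield from buffer

small_buffer = list(buffered_shuffle(range(10), buffer_size=3))
no_shuffle = list(buffered_shuffle(range(5), buffer_size=1))
print(small_buffer)
print(no_shuffle)
```

With a buffer of 3, the first element can only come from the first three items, so a buffer much smaller than the dataset leaves class-ordered data largely in order. A buffer at least as large as the dataset gives a full shuffle.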
TFRecord Format
import tensorflow as tf
# Writing TFRecord files
def bytes_feature(value):
if isinstance(value, type(tf.constant(0))):
value = value.numpy()
return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))
def int64_feature(value):
return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))
def float_feature(value):
return tf.train.Feature(float_list=tf.train.FloatList(value=[value]))
def image_to_tfrecord(image_path, label, writer):
    # Read the encoded bytes and close the file promptly
    with open(image_path, 'rb') as f:
        image_string = f.read()
    feature = {
        'image': bytes_feature(image_string),
        'label': int64_feature(label),
    }
    example = tf.train.Example(features=tf.train.Features(feature=feature))
    writer.write(example.SerializeToString())
# Reading TFRecord files
def parse_tfrecord(serialized_example):
feature_description = {
'image': tf.io.FixedLenFeature([], tf.string),
'label': tf.io.FixedLenFeature([], tf.int64),
}
example = tf.io.parse_single_example(serialized_example, feature_description)
image = tf.image.decode_jpeg(example['image'], channels=3)
image = tf.image.resize(image, [224, 224])
image = tf.cast(image, tf.float32) / 255.0
label = example['label']
return image, label
# Create TFRecord Dataset
# dataset = tf.data.TFRecordDataset(['data.tfrecord'])
# dataset = dataset.map(parse_tfrecord, num_parallel_calls=tf.data.AUTOTUNE)
# dataset = dataset.shuffle(1000).batch(32).prefetch(tf.data.AUTOTUNE)
10. Advanced Training Techniques
Callbacks
import tensorflow as tf
from tensorflow import keras
import os
# ModelCheckpoint: Save best model
checkpoint_cb = keras.callbacks.ModelCheckpoint(
filepath='best_model.keras',
monitor='val_accuracy',
mode='max',
save_best_only=True,
verbose=1
)
# EarlyStopping: Prevent overfitting
early_stopping_cb = keras.callbacks.EarlyStopping(
monitor='val_loss',
patience=10,
restore_best_weights=True,
verbose=1
)
# ReduceLROnPlateau: Reduce learning rate
reduce_lr_cb = keras.callbacks.ReduceLROnPlateau(
monitor='val_loss',
factor=0.5,
patience=5,
min_lr=1e-7,
verbose=1
)
# TensorBoard: Visualization
tensorboard_cb = keras.callbacks.TensorBoard(
log_dir='logs/',
histogram_freq=1,
write_graph=True,
write_images=True,
update_freq='epoch'
)
# LearningRateScheduler: Custom schedule
def cosine_decay_schedule(epoch, lr):
import math
initial_lr = 1e-3
total_epochs = 100
return initial_lr * (1 + math.cos(math.pi * epoch / total_epochs)) / 2
lr_scheduler_cb = keras.callbacks.LearningRateScheduler(
cosine_decay_schedule, verbose=1
)
# CSV logging
csv_logger_cb = keras.callbacks.CSVLogger('training_log.csv')
# Custom callback
class ConfusionMatrixCallback(keras.callbacks.Callback):
    def __init__(self, validation_data, class_names):
        super().__init__()
        self.X_val, self.y_val = validation_data
        self.class_names = class_names

    def on_epoch_end(self, epoch, logs=None):
        # Run prediction only on the epochs that are actually reported
        if epoch % 10 != 0:
            return
        y_pred = tf.argmax(self.model.predict(self.X_val, verbose=0), axis=1)
        cm = tf.math.confusion_matrix(self.y_val, y_pred)
        print(f"\nEpoch {epoch} confusion matrix:\n{cm.numpy()}")
callbacks = [
checkpoint_cb,
early_stopping_cb,
reduce_lr_cb,
tensorboard_cb,
csv_logger_cb
]
# Usage example
# history = model.fit(
# X_train, y_train,
# epochs=100,
# validation_data=(X_val, y_val),
# callbacks=callbacks
# )
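To see what the cosine schedule passed to `LearningRateScheduler` produces, it can be evaluated outside of training. The sketch below repeats the same formula in plain Python. The added clamp on `epoch` is an assumption that is not in the original function: without it, the cosine turns back up once training runs past `total_epochs`.

```python
import math

def cosine_decay_schedule(epoch, lr=None, initial_lr=1e-3, total_epochs=100):
    """Cosine decay from initial_lr to 0; lr is accepted but unused."""
    epoch = min(epoch, total_epochs)  # assumption: hold at 0 after the last epoch
    return initial_lr * (1 + math.cos(math.pi * epoch / total_epochs)) / 2

for epoch in (0, 25, 50, 75, 100):
    print(f"epoch {epoch:3d}: lr = {cosine_decay_schedule(epoch):.6f}")
```

The rate starts at `initial_lr`, reaches half of it at the midpoint, and approaches zero at `total_epochs`.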
Custom Training Loop (GradientTape)
import tensorflow as tf
from tensorflow import keras
import time
# Simple classification model
model = keras.Sequential([
keras.layers.Dense(64, activation='relu', input_shape=(784,)),
keras.layers.Dense(64, activation='relu'),
keras.layers.Dense(10)
])
# Loss and optimizer
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)
optimizer = keras.optimizers.Adam(learning_rate=1e-3)
# Metrics
train_loss = keras.metrics.Mean(name='train_loss')
train_accuracy = keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')
val_loss = keras.metrics.Mean(name='val_loss')
val_accuracy = keras.metrics.SparseCategoricalAccuracy(name='val_accuracy')
# Single training step
@tf.function  # Traces the step into a TensorFlow graph for speed
def train_step(images, labels):
    with tf.GradientTape() as tape:
        predictions = model(images, training=True)
        loss = loss_fn(labels, predictions)
        # L2 regularization on kernels only (biases are skipped)
        l2_loss = tf.add_n([tf.nn.l2_loss(v) for v in model.trainable_variables
                            if 'bias' not in v.name])
        total_loss = loss + 1e-4 * l2_loss
    gradients = tape.gradient(total_loss, model.trainable_variables)
    # Gradient clipping
    gradients, _ = tf.clip_by_global_norm(gradients, 1.0)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    train_loss.update_state(loss)
    train_accuracy.update_state(labels, predictions)
    return loss

@tf.function
def val_step(images, labels):
    predictions = model(images, training=False)
    loss = loss_fn(labels, predictions)
    val_loss.update_state(loss)
    val_accuracy.update_state(labels, predictions)
# Full training loop
def train(train_ds, val_ds, epochs=10):
    for epoch in range(epochs):
        start_time = time.time()
        # Reset metrics (reset_state() replaces the deprecated reset_states())
        train_loss.reset_state()
        train_accuracy.reset_state()
        val_loss.reset_state()
        val_accuracy.reset_state()
        # Training
        for step, (images, labels) in enumerate(train_ds):
            train_step(images, labels)
            if step % 100 == 0:
                print(f"Step {step}: loss={train_loss.result():.4f}")
        # Validation
        for images, labels in val_ds:
            val_step(images, labels)
        elapsed = time.time() - start_time
        print(f"Epoch {epoch+1}/{epochs} ({elapsed:.1f}s) - "
              f"loss: {train_loss.result():.4f}, "
              f"accuracy: {train_accuracy.result():.4f}, "
              f"val_loss: {val_loss.result():.4f}, "
              f"val_accuracy: {val_accuracy.result():.4f}")
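`tf.clip_by_global_norm` rescales all gradients by the same factor, `clip_norm / max(global_norm, clip_norm)`, so their relative directions are preserved. A minimal plain-Python sketch of that arithmetic follows; the gradient values are made up for illustration.

```python
import math

def clip_by_global_norm(grads, clip_norm):
    """Scale a list of gradient vectors so their joint L2 norm is <= clip_norm."""
    global_norm = math.sqrt(sum(g * g for grad in grads for g in grad))
    scale = clip_norm / max(global_norm, clip_norm)
    clipped = [[g * scale for g in grad] for grad in grads]
    return clipped, global_norm

grads = [[3.0, 0.0], [0.0, 4.0]]  # hypothetical gradients for two variables
clipped, norm = clip_by_global_norm(grads, clip_norm=1.0)
print(norm)
print(clipped)
```

Gradients whose global norm is already below `clip_norm` pass through unchanged, because the scale factor is then exactly 1.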
Distributed Training (tf.distribute.Strategy)
import tensorflow as tf
from tensorflow import keras
# Multi-GPU strategy
strategy = tf.distribute.MirroredStrategy()
print(f"Number of devices: {strategy.num_replicas_in_sync}")
with strategy.scope():
# Define and compile model inside strategy.scope()
model = keras.Sequential([
keras.layers.Dense(64, activation='relu', input_shape=(784,)),
keras.layers.Dense(64, activation='relu'),
keras.layers.Dense(10, activation='softmax')
])
model.compile(
optimizer='adam',
loss='sparse_categorical_crossentropy',
metrics=['accuracy']
)
# Scale batch size proportionally to number of GPUs
BATCH_SIZE_PER_REPLICA = 64
BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync
# Mixed precision training
from tensorflow.keras import mixed_precision
policy = mixed_precision.Policy('mixed_float16')
mixed_precision.set_global_policy(policy)
with strategy.scope():
inputs = keras.Input(shape=(784,))
x = keras.layers.Dense(64, activation='relu')(inputs)
# Output layer must use float32
outputs = keras.layers.Dense(10, activation='softmax', dtype='float32')(x)
model_mp = keras.Model(inputs, outputs)
opt = keras.optimizers.Adam(1e-3)
opt = mixed_precision.LossScaleOptimizer(opt)
model_mp.compile(
optimizer=opt,
loss='sparse_categorical_crossentropy',
metrics=['accuracy']
)
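Loss scaling exists because very small gradients underflow to zero in float16. The sketch below shows the effect with Python's `struct` module, which can round-trip a value through IEEE 754 half precision. The gradient value and the fixed scale are assumptions for illustration; `LossScaleOptimizer` adjusts its scale dynamically by default.

```python
import struct

def to_float16(x):
    """Round-trip a Python float through IEEE 754 half precision."""
    return struct.unpack('e', struct.pack('e', x))[0]

tiny_grad = 1e-8           # hypothetical gradient value
loss_scale = 2.0 ** 15     # hypothetical fixed loss scale

unscaled = to_float16(tiny_grad)              # too small for float16
scaled = to_float16(tiny_grad * loss_scale)   # representable after scaling
recovered = scaled / loss_scale               # unscale in higher precision

print(unscaled, scaled, recovered)
```

Multiplying the loss by the scale shifts gradients into the representable float16 range. Dividing by the same scale before the weight update restores their magnitude.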
11. TensorBoard Visualization
Basic TensorBoard Usage
import tensorflow as tf
from tensorflow import keras
import datetime
import numpy as np
# Log directory
log_dir = "logs/fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
# Callback setup
tensorboard_callback = keras.callbacks.TensorBoard(
log_dir=log_dir,
histogram_freq=1, # Weight histogram frequency
write_graph=True, # Record computation graph
write_images=True, # Record weight images
update_freq='epoch', # Update frequency
profile_batch=2 # Profiling batch
)
# Custom scalar logging
file_writer = tf.summary.create_file_writer(log_dir + '/custom_scalars')
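def log_custom_metrics(epoch, logs):
    with file_writer.as_default():
        # `model` is the compiled model being trained; use `learning_rate`
        # rather than the deprecated `lr` alias
        tf.summary.scalar('learning_rate',
                          data=keras.backend.get_value(model.optimizer.learning_rate),
                          step=epoch)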
lr_callback = keras.callbacks.LambdaCallback(on_epoch_end=log_custom_metrics)
# Log images
def log_images(epoch, logs):
(_, _), (x_test, y_test) = keras.datasets.mnist.load_data()
x_test = x_test[:10].reshape(-1, 28, 28, 1).astype('float32') / 255.0
with file_writer.as_default():
tf.summary.image("Test Samples", x_test, step=epoch, max_outputs=10)
image_callback = keras.callbacks.LambdaCallback(on_epoch_end=log_images)
# Start TensorBoard with:
# tensorboard --logdir logs/fit
Embedding Visualization
import tensorflow as tf
from tensorflow import keras
import os
import numpy as np
from tensorboard.plugins import projector
# Train and visualize embedding layer
(x_train, y_train), _ = keras.datasets.mnist.load_data()
x_train = x_train.astype('float32').reshape(-1, 784) / 255.0
# Embedding model
embedding_model = keras.Sequential([
keras.layers.Dense(64, activation='relu', input_shape=(784,)),
keras.layers.Dense(32, name='embedding')
])
# Extract embeddings
embeddings = embedding_model.predict(x_train[:1000])
# Save embedding file
log_dir = 'logs/embedding'
os.makedirs(log_dir, exist_ok=True)
np.savetxt(os.path.join(log_dir, 'vectors.tsv'), embeddings, delimiter='\t')
# Save metadata (labels)
with open(os.path.join(log_dir, 'metadata.tsv'), 'w') as f:
for label in y_train[:1000]:
f.write(f"{label}\n")
# Configure Projector
config = projector.ProjectorConfig()
embedding_config = config.embeddings.add()
embedding_config.tensor_name = "embedding/.ATTRIBUTES/VARIABLE_VALUE"
embedding_config.tensor_path = 'vectors.tsv'
embedding_config.metadata_path = 'metadata.tsv'
projector.visualize_embeddings(log_dir, config)
12. Saving and Converting Models
SavedModel Format
import tensorflow as tf
from tensorflow import keras
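# Save model as a SavedModel directory (TF 2.x with Keras 2; Keras 3 requires a
# .keras or .h5 suffix here and uses model.export() to write a SavedModel)
model.save('saved_model/my_model')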
# Load
loaded_model = keras.models.load_model('saved_model/my_model')
# HDF5 format (legacy)
model.save('my_model.h5')
loaded_h5 = keras.models.load_model('my_model.h5')
# Save/load weights only
model.save_weights('model_weights.h5')
model.load_weights('model_weights.h5')
# Keras native format (recommended)
model.save('my_model.keras')
loaded_keras = keras.models.load_model('my_model.keras')
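# Subclassed models need get_config()/from_config(), and the class must be
# registered (or passed via custom_objects) so load_model can find it
@keras.saving.register_keras_serializable(package='custom')
class MyModel(keras.Model):
    def __init__(self, units, **kwargs):
        super().__init__(**kwargs)
        self.units = units
        self.dense = keras.layers.Dense(units)

    def call(self, inputs):
        return self.dense(inputs)

    def get_config(self):
        return {'units': self.units}

    @classmethod
    def from_config(cls, config):
        return cls(**config)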
TensorFlow Lite Conversion (Mobile/Edge)
import tensorflow as tf
from tensorflow import keras
import numpy as np
# Basic TFLite conversion
converter = tf.lite.TFLiteConverter.from_saved_model('saved_model/my_model')
tflite_model = converter.convert()
with open('model.tflite', 'wb') as f:
f.write(tflite_model)
# Dynamic range quantization
converter = tf.lite.TFLiteConverter.from_saved_model('saved_model/my_model')
converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_quantized = converter.convert()
# Full integer quantization (INT8)
def representative_dataset():
for _ in range(100):
data = np.random.random((1, 784)).astype(np.float32)
yield [data]
converter = tf.lite.TFLiteConverter.from_saved_model('saved_model/my_model')
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.representative_dataset = representative_dataset
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
converter.inference_input_type = tf.int8
converter.inference_output_type = tf.int8
tflite_int8 = converter.convert()
# Run TFLite model
interpreter = tf.lite.Interpreter(model_content=tflite_quantized)
interpreter.allocate_tensors()
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()
print("Input:", input_details[0]['shape'])
print("Output:", output_details[0]['shape'])
# Run inference
input_data = np.random.random((1, 784)).astype(np.float32)
interpreter.set_tensor(input_details[0]['index'], input_data)
interpreter.invoke()
output_data = interpreter.get_tensor(output_details[0]['index'])
print("TFLite output:", output_data.shape)
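Full integer quantization stores each tensor as int8 values plus a scale and zero point, related by `real_value = (int8_value - zero_point) * scale`. A minimal plain-Python sketch of that mapping follows. The scale and zero point are assumptions picked for inputs in [0, 1]; the converter derives the real parameters from the representative dataset.

```python
def quantize(x, scale, zero_point):
    """Map a float to int8 using affine quantization, clamped to [-128, 127]."""
    q = round(x / scale) + zero_point
    return max(-128, min(127, q))

def dequantize(q, scale, zero_point):
    """Map an int8 value back to an approximate float."""
    return (q - zero_point) * scale

scale = 1.0 / 255.0   # hypothetical parameters for values in [0, 1]
zero_point = -128

q_low = quantize(0.0, scale, zero_point)
q_high = quantize(1.0, scale, zero_point)
q_mid = quantize(0.5, scale, zero_point)
approx_mid = dequantize(q_mid, scale, zero_point)
print(q_low, q_high, q_mid, approx_mid)
```

The round trip loses at most about half a quantization step per value. When `inference_input_type` is `tf.int8`, callers must apply the same mapping to their inputs using the parameters reported by `get_input_details()`.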
TensorFlow.js Conversion
# Install tfjs conversion tool
pip install tensorflowjs
# Convert from SavedModel to TFJS
tensorflowjs_converter \
--input_format=tf_saved_model \
--output_format=tfjs_graph_model \
--signature_name=serving_default \
saved_model/my_model \
tfjs_model/
13. Production Deployment with TF-Serving
TensorFlow Serving Setup
# Run TF Serving with Docker (easiest approach)
docker pull tensorflow/serving
# Serve the model
docker run -t --rm \
-p 8501:8501 \
-v "/path/to/saved_model:/models/my_model" \
-e MODEL_NAME=my_model \
tensorflow/serving
# GPU support
docker run --gpus all -t --rm \
-p 8501:8501 \
-v "/path/to/saved_model:/models/my_model" \
-e MODEL_NAME=my_model \
tensorflow/serving:latest-gpu
Making Predictions via REST API
import requests
import json
import numpy as np
# REST API request
url = 'http://localhost:8501/v1/models/my_model:predict'
# Prepare input data
data = np.random.random((1, 784)).astype(float)
payload = {
"instances": data.tolist()
}
response = requests.post(url, json=payload, timeout=10)
response.raise_for_status()  # surface HTTP errors instead of a KeyError below
result = response.json()
print("Predictions:", result['predictions'])
# Check model info
info_url = 'http://localhost:8501/v1/models/my_model'
info_response = requests.get(info_url)
print("Model info:", json.loads(info_response.text))
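The TF Serving REST API accepts the row format used above (`"instances"`, answered with `"predictions"`) and a columnar format (`"inputs"`, answered with `"outputs"`). The sketch below builds both request bodies and reads either reply. The helper names are assumptions, and the server reply is hard-coded so the example runs offline.

```python
import json

def build_payload(batch, columnar=False):
    """Build a predict request body for a batch of feature rows."""
    key = "inputs" if columnar else "instances"
    return {key: batch}

def extract_predictions(response_body):
    """Return the predictions from either response format."""
    if "predictions" in response_body:   # reply to a row-format request
        return response_body["predictions"]
    return response_body["outputs"]      # reply to a columnar request

batch = [[0.0] * 784]
row_body = json.dumps(build_payload(batch))
col_body = json.dumps(build_payload(batch, columnar=True))

# Hypothetical server reply, hard-coded for illustration
fake_reply = json.loads('{"predictions": [[0.1, 0.9]]}')
preds = extract_predictions(fake_reply)
print(preds)
```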
gRPC Client
import grpc
import numpy as np
from tensorflow_serving.apis import predict_pb2
from tensorflow_serving.apis import prediction_service_pb2_grpc
import tensorflow as tf
# Create gRPC channel (the serving container must also publish port 8500, e.g. -p 8500:8500)
channel = grpc.insecure_channel('localhost:8500')
stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)
# Build request
request = predict_pb2.PredictRequest()
request.model_spec.name = 'my_model'
request.model_spec.signature_name = 'serving_default'
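# Set input tensor. 'input_layer' here and 'output' below are placeholders;
# list the real signature names with: saved_model_cli show --dir <model_dir> --all
input_data = np.random.random((1, 784)).astype(np.float32)
request.inputs['input_layer'].CopyFrom(
    tf.make_tensor_proto(input_data, shape=input_data.shape)
)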
# Run inference
response = stub.Predict(request, 10.0) # 10-second timeout
output = tf.make_ndarray(response.outputs['output'])
print("gRPC prediction:", output)
Version Management and Canary Deployment
# model.config file
model_config_list {
config {
name: 'my_model'
base_path: '/models/my_model/'
model_platform: 'tensorflow'
model_version_policy {
specific {
versions: 1
versions: 2
}
}
version_labels {
key: 'stable'
value: 1
}
version_labels {
key: 'canary'
value: 2
}
}
}
# Serve with config file
docker run -t --rm \
-p 8501:8501 -p 8500:8500 \
-v "/path/to/models:/models" \
-v "/path/to/model.config:/models/model.config" \
tensorflow/serving \
--model_config_file=/models/model.config
# Request a specific version
url = 'http://localhost:8501/v1/models/my_model/versions/1:predict'
# Or request by label
url_label = 'http://localhost:8501/v1/models/my_model/labels/stable:predict'
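With the `stable` and `canary` labels in place, a client or gateway can send a small share of traffic to the canary version. The sketch below is one simple way to do that split on the client side. The function name and the 10% fraction are assumptions, and production setups often do this routing in a load balancer instead.

```python
import random

BASE_URL = 'http://localhost:8501/v1/models/my_model'

def pick_predict_url(canary_fraction, rng=random):
    """Choose the label-based predict URL for one request."""
    label = 'canary' if rng.random() < canary_fraction else 'stable'
    return f'{BASE_URL}/labels/{label}:predict'

# Simulate 1000 requests with roughly 10% routed to the canary
rng = random.Random(0)
urls = [pick_predict_url(0.1, rng) for _ in range(1000)]
canary_count = sum(url.endswith('/labels/canary:predict') for url in urls)
print(f"canary requests: {canary_count} / {len(urls)}")
```

Raising `canary_fraction` step by step while watching error rates and latency gives a gradual rollout. Pointing the `stable` label at version 2 in `model.config` completes the promotion.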
14. TensorFlow Extended (TFX)
TFX is a production machine learning pipeline platform built on TensorFlow.
ML Pipeline Overview
import tfx
from tfx.components import (
CsvExampleGen,
StatisticsGen,
SchemaGen,
ExampleValidator,
Transform,
Trainer,
Evaluator,
Pusher
)
from tfx.proto import pusher_pb2, trainer_pb2
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
# Interactive context (for notebooks)
context = InteractiveContext()
# 1. ExampleGen: Data ingestion
example_gen = CsvExampleGen(input_base='data/')
context.run(example_gen)
# 2. StatisticsGen: Data statistics
statistics_gen = StatisticsGen(
examples=example_gen.outputs['examples']
)
context.run(statistics_gen)
# 3. SchemaGen: Schema generation
schema_gen = SchemaGen(
statistics=statistics_gen.outputs['statistics'],
infer_feature_shape=True
)
context.run(schema_gen)
# 4. ExampleValidator: Data validation
example_validator = ExampleValidator(
statistics=statistics_gen.outputs['statistics'],
schema=schema_gen.outputs['schema']
)
context.run(example_validator)
# 5. Transform: Feature engineering
# Requires preprocessing_fn defined in transform.py
# 6. Trainer: Model training
trainer = Trainer(
module_file='trainer_module.py',
examples=example_gen.outputs['examples'],
schema=schema_gen.outputs['schema'],
train_args=trainer_pb2.TrainArgs(num_steps=1000),
eval_args=trainer_pb2.EvalArgs(num_steps=500)
)
# 7. Pusher: Model deployment
pusher = Pusher(
model=trainer.outputs['model'],
push_destination=pusher_pb2.PushDestination(
filesystem=pusher_pb2.PushDestination.Filesystem(
base_directory='serving_model/'
)
)
)
Transform Component Example
import tensorflow as tf
import tensorflow_transform as tft
# transform.py
FEATURE_KEYS = ['feature1', 'feature2', 'feature3']
LABEL_KEY = 'label'
def preprocessing_fn(inputs):
"""Feature preprocessing function"""
outputs = {}
for key in FEATURE_KEYS:
# Normalize with z-score
outputs[key] = tft.scale_to_z_score(inputs[key])
# Encode label
outputs[LABEL_KEY] = tf.cast(inputs[LABEL_KEY], tf.int64)
return outputs
Conclusion
This guide has covered TensorFlow and Keras comprehensively, from core concepts to production deployment.
Key Takeaways
- Eager Execution: Default execution mode in TF 2.x; use @tf.function for graph optimization
- Three Keras APIs: Sequential (simple), Functional (complex topologies), Subclassing (full customization)
- tf.data: Essential tool for efficient data pipelines with map, filter, batch, shuffle, prefetch
- GradientTape: Custom training loops and automatic differentiation
- Deployment Options: TF Serving (server), TFLite (mobile/edge), TF.js (browser)
- TFX: TensorFlow's end-to-end platform for production ML pipelines
Topics for Further Study
- TensorFlow Probability (probabilistic deep learning)
- Keras Tuner (hyperparameter optimization)
- TensorFlow Datasets (standard datasets)
- TensorFlow Hub (pre-trained models)
- tf-agents (reinforcement learning)