Skip to content
Published on

TensorFlow & Keras 완전 정복 가이드: Zero to Hero - 설치부터 프로덕션 배포까지

Authors

들어가며

TensorFlow는 Google Brain 팀이 2015년에 오픈소스로 공개한 머신러닝/딥러닝 프레임워크입니다. 현재 가장 널리 사용되는 딥러닝 프레임워크 중 하나로, 연구부터 프로덕션 배포까지 전 과정을 지원합니다. Keras는 TensorFlow 2.x부터 공식 고수준 API로 통합되어, 직관적이고 빠른 모델 개발을 가능하게 합니다.

이 가이드에서는 TensorFlow와 Keras의 기초부터 시작해 실제 프로덕션 환경에 배포하는 것까지 단계별로 완전히 다룹니다.


1. TensorFlow 소개와 설치

TensorFlow vs PyTorch 비교

두 프레임워크는 각각 장단점이 있습니다.

항목TensorFlow/KerasPyTorch
개발사GoogleMeta (Facebook)
배포 도구TF Serving, TFLite, TF.jsTorchServe, TorchScript
프로덕션 성숙도매우 높음높음
연구 인기도높음매우 높음
학습 곡선중간낮음
모바일/엣지TFLite 우수ExecuTorch
생태계TFX, TFHub 등HuggingFace 통합 우수

설치

pip으로 설치하는 방법입니다.

# CPU 전용
pip install tensorflow

# GPU 지원 (CUDA 자동 감지, TF 2.9+)
pip install tensorflow[and-cuda]

# 특정 버전
pip install tensorflow==2.15.0

# conda 환경
conda create -n tf_env python=3.10
conda activate tf_env
conda install -c conda-forge tensorflow

macOS Apple Silicon에서는 다음과 같이 설치합니다.

pip install tensorflow-macos
pip install tensorflow-metal  # GPU 가속

TensorFlow 2.x 주요 변화

TensorFlow 2.0에서 가장 큰 변화는 Eager Execution의 기본 활성화입니다. TF 1.x에서는 계산 그래프를 먼저 정의하고 세션(Session)을 통해 실행했지만, TF 2.x에서는 Python 코드처럼 즉시 실행됩니다.

import tensorflow as tf
print(tf.__version__)

# Eager execution 확인
print(tf.executing_eagerly())  # True

# TF 1.x 스타일의 그래프 실행 (호환성)
@tf.function
def compute(x, y):
    return x + y

result = compute(tf.constant(1.0), tf.constant(2.0))
print(result)  # tf.Tensor(3.0, shape=(), dtype=float32)

GPU 설정 확인

import tensorflow as tf

# GPU 목록 확인
gpus = tf.config.list_physical_devices('GPU')
print("사용 가능한 GPU:", gpus)

# GPU 메모리 증분 할당 (OOM 방지)
if gpus:
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)

# 특정 GPU만 사용
if gpus:
    tf.config.set_visible_devices(gpus[0], 'GPU')

# 논리적 GPU 분할 (하나의 물리 GPU를 여러 개처럼 사용)
if gpus:
    tf.config.set_logical_device_configuration(
        gpus[0],
        [tf.config.LogicalDeviceConfiguration(memory_limit=2048),
         tf.config.LogicalDeviceConfiguration(memory_limit=2048)]
    )

# 현재 연산이 실행되는 디바이스 확인
with tf.device('/GPU:0'):
    a = tf.constant([[1.0, 2.0], [3.0, 4.0]])
    b = tf.constant([[5.0, 6.0], [7.0, 8.0]])
    c = tf.matmul(a, b)
    print(c.device)

2. TensorFlow 텐서(Tensor) 기초

텐서(Tensor)는 TensorFlow의 핵심 데이터 구조입니다. NumPy 배열과 유사하지만 GPU에서 가속할 수 있으며, 자동 미분을 지원합니다.

tf.constant와 tf.Variable

import tensorflow as tf
import numpy as np

# 스칼라 (0차원 텐서)
scalar = tf.constant(42)
print(scalar)          # tf.Tensor(42, shape=(), dtype=int32)
print(scalar.dtype)    # tf.int32
print(scalar.shape)    # ()

# 벡터 (1차원 텐서)
vector = tf.constant([1.0, 2.0, 3.0])
print(vector)          # tf.Tensor([1. 2. 3.], shape=(3,), dtype=float32)

# 행렬 (2차원 텐서)
matrix = tf.constant([[1, 2, 3],
                       [4, 5, 6]], dtype=tf.float32)
print(matrix.shape)    # (2, 3)

# 3차원 텐서
tensor_3d = tf.constant([[[1, 2], [3, 4]],
                          [[5, 6], [7, 8]]])
print(tensor_3d.shape)  # (2, 2, 2)

# 특수 텐서
zeros = tf.zeros([3, 4])          # 0으로 채운 3x4 행렬
ones = tf.ones([2, 3])            # 1로 채운 2x3 행렬
identity = tf.eye(4)              # 4x4 단위 행렬
random = tf.random.normal([3, 3]) # 정규분포 랜덤 행렬

# tf.Variable - 학습 파라미터에 사용 (값 변경 가능)
var = tf.Variable([1.0, 2.0, 3.0])
print(var)             # <tf.Variable 'Variable:0' ...>

var.assign([4.0, 5.0, 6.0])      # 값 변경
var.assign_add([1.0, 1.0, 1.0])  # 더하기
var.assign_sub([0.5, 0.5, 0.5])  # 빼기

텐서 연산

import tensorflow as tf

a = tf.constant([[1.0, 2.0], [3.0, 4.0]])
b = tf.constant([[5.0, 6.0], [7.0, 8.0]])

# 기본 산술 연산 (element-wise)
print(a + b)         # 덧셈
print(a - b)         # 뺄셈
print(a * b)         # 곱셈 (element-wise)
print(a / b)         # 나눗셈
print(a ** 2)        # 제곱

# 동등한 TF 함수들
print(tf.add(a, b))
print(tf.subtract(a, b))
print(tf.multiply(a, b))
print(tf.divide(a, b))

# 행렬 곱셈
print(tf.matmul(a, b))   # 또는 a @ b
print(a @ b)

# 수학 함수
x = tf.constant([1.0, 4.0, 9.0, 16.0])
print(tf.sqrt(x))        # [1, 2, 3, 4]
print(tf.exp(x))         # e^x
print(tf.math.log(x))    # 자연로그

# 집계 연산
matrix = tf.constant([[1.0, 2.0, 3.0],
                       [4.0, 5.0, 6.0]])
print(tf.reduce_sum(matrix))           # 전체 합: 21
print(tf.reduce_sum(matrix, axis=0))   # 열별 합: [5, 7, 9]
print(tf.reduce_sum(matrix, axis=1))   # 행별 합: [6, 15]
print(tf.reduce_mean(matrix))          # 평균
print(tf.reduce_max(matrix))           # 최대값
print(tf.reduce_min(matrix))           # 최소값
print(tf.argmax(matrix, axis=1))       # 행별 최대값 인덱스

# 비교 연산
print(tf.equal(a, b))
print(tf.greater(a, b))
print(tf.less_equal(a, b))

형태 변환

import tensorflow as tf

t = tf.constant([[1, 2, 3, 4],
                 [5, 6, 7, 8]])
print(t.shape)  # (2, 4)

# reshape
reshaped = tf.reshape(t, [4, 2])
print(reshaped.shape)  # (4, 2)

reshaped2 = tf.reshape(t, [8])
print(reshaped2.shape)  # (8,)

reshaped3 = tf.reshape(t, [-1, 2])  # -1은 자동 계산
print(reshaped3.shape)  # (4, 2)

# transpose
transposed = tf.transpose(t)
print(transposed.shape)  # (4, 2)

# 고차원 transpose
t3d = tf.random.normal([2, 3, 4])
transposed_3d = tf.transpose(t3d, perm=[0, 2, 1])
print(transposed_3d.shape)  # (2, 4, 3)

# expand_dims - 차원 추가
t1d = tf.constant([1, 2, 3])
print(t1d.shape)             # (3,)

expanded_0 = tf.expand_dims(t1d, axis=0)
print(expanded_0.shape)      # (1, 3)

expanded_1 = tf.expand_dims(t1d, axis=1)
print(expanded_1.shape)      # (3, 1)

# squeeze - 크기 1인 차원 제거
t_squeezable = tf.constant([[[1, 2, 3]]])
print(t_squeezable.shape)    # (1, 1, 3)
squeezed = tf.squeeze(t_squeezable)
print(squeezed.shape)        # (3,)

# concat과 stack
a = tf.constant([[1, 2], [3, 4]])
b = tf.constant([[5, 6], [7, 8]])

concat_0 = tf.concat([a, b], axis=0)
print(concat_0.shape)  # (4, 2)

concat_1 = tf.concat([a, b], axis=1)
print(concat_1.shape)  # (2, 4)

stacked = tf.stack([a, b], axis=0)
print(stacked.shape)   # (2, 2, 2)

Broadcasting

import tensorflow as tf

# 스칼라 broadcasting
matrix = tf.constant([[1.0, 2.0], [3.0, 4.0]])
print(matrix + 10)    # 모든 원소에 10 더함

# 벡터 broadcasting
row_vector = tf.constant([10.0, 20.0])  # shape (2,)
print(matrix + row_vector)  # 각 행에 row_vector 더함

col_vector = tf.constant([[10.0], [20.0]])  # shape (2, 1)
print(matrix + col_vector)  # 각 열에 col_vector 더함

# 배치 연산에서의 broadcasting
batch = tf.random.normal([32, 128])  # 배치 크기 32, 특징 128
mean = tf.reduce_mean(batch, axis=0, keepdims=True)  # shape (1, 128)
std = tf.math.reduce_std(batch, axis=0, keepdims=True)
normalized = (batch - mean) / (std + 1e-8)  # broadcasting 적용

NumPy와 상호 변환

import tensorflow as tf
import numpy as np

# NumPy -> TensorFlow
np_array = np.array([[1.0, 2.0], [3.0, 4.0]])
tf_tensor = tf.constant(np_array)
tf_tensor2 = tf.convert_to_tensor(np_array)

# TensorFlow -> NumPy
numpy_from_tf = tf_tensor.numpy()
print(type(numpy_from_tf))  # numpy.ndarray

# tf.Tensor는 NumPy 함수 대부분 직접 사용 가능
print(np.sin(tf_tensor))
print(np.sqrt(tf_tensor))

3. Keras Sequential API

Sequential API는 Keras에서 가장 간단한 모델 구축 방법입니다. 레이어를 순서대로 쌓아 선형적인 모델을 만듭니다.

레이어 쌓기와 모델 컴파일

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

# 방법 1: add() 메서드
model = keras.Sequential()
model.add(layers.Dense(128, activation='relu', input_shape=(784,)))
model.add(layers.Dropout(0.2))
model.add(layers.Dense(64, activation='relu'))
model.add(layers.Dropout(0.2))
model.add(layers.Dense(10, activation='softmax'))

# 방법 2: 리스트로 정의
model = keras.Sequential([
    layers.Dense(128, activation='relu', input_shape=(784,)),
    layers.Dropout(0.2),
    layers.Dense(64, activation='relu'),
    layers.Dropout(0.2),
    layers.Dense(10, activation='softmax')
])

# 모델 구조 확인
model.summary()

# 컴파일
model.compile(
    optimizer='adam',                        # 또는 keras.optimizers.Adam(learning_rate=0.001)
    loss='sparse_categorical_crossentropy',  # 정수 레이블
    metrics=['accuracy']
)

# 다양한 optimizer와 loss
model.compile(
    optimizer=keras.optimizers.SGD(learning_rate=0.01, momentum=0.9),
    loss=keras.losses.CategoricalCrossentropy(),
    metrics=[keras.metrics.CategoricalAccuracy()]
)

MNIST 분류 완전 예제

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import matplotlib.pyplot as plt

# 데이터 로딩
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()

# 전처리
x_train = x_train.astype('float32') / 255.0
x_test = x_test.astype('float32') / 255.0
x_train = x_train.reshape(-1, 784)
x_test = x_test.reshape(-1, 784)

print(f"훈련 데이터: {x_train.shape}, 레이블: {y_train.shape}")
print(f"테스트 데이터: {x_test.shape}, 레이블: {y_test.shape}")

# 모델 정의
model = keras.Sequential([
    layers.Dense(256, activation='relu', input_shape=(784,)),
    layers.BatchNormalization(),
    layers.Dropout(0.3),
    layers.Dense(128, activation='relu'),
    layers.BatchNormalization(),
    layers.Dropout(0.3),
    layers.Dense(10, activation='softmax')
])

model.summary()

# 컴파일
model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=1e-3),
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)

# 훈련
history = model.fit(
    x_train, y_train,
    batch_size=128,
    epochs=20,
    validation_split=0.1,
    verbose=1
)

# 평가
test_loss, test_acc = model.evaluate(x_test, y_test, verbose=0)
print(f"테스트 손실: {test_loss:.4f}")
print(f"테스트 정확도: {test_acc:.4f}")

# 예측
predictions = model.predict(x_test[:10])
predicted_classes = tf.argmax(predictions, axis=1)
print("예측:", predicted_classes.numpy())
print("실제:", y_test[:10])

# 학습 곡선 시각화
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))

ax1.plot(history.history['loss'], label='훈련 손실')
ax1.plot(history.history['val_loss'], label='검증 손실')
ax1.set_title('손실')
ax1.set_xlabel('에포크')
ax1.legend()

ax2.plot(history.history['accuracy'], label='훈련 정확도')
ax2.plot(history.history['val_accuracy'], label='검증 정확도')
ax2.set_title('정확도')
ax2.set_xlabel('에포크')
ax2.legend()

plt.tight_layout()
plt.savefig('mnist_training.png')
plt.show()

4. Keras Functional API

Functional API는 더 복잡한 모델 아키텍처를 구성할 수 있습니다. 다중 입력/출력, 잔차 연결, 공유 레이어 등이 가능합니다.

기본 Functional API 사용법

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

# 입력 정의
inputs = keras.Input(shape=(784,), name='input_layer')

# 레이어 연결 (함수처럼 호출)
x = layers.Dense(256, activation='relu', name='dense_1')(inputs)
x = layers.BatchNormalization(name='bn_1')(x)
x = layers.Dropout(0.3, name='dropout_1')(x)
x = layers.Dense(128, activation='relu', name='dense_2')(x)
x = layers.BatchNormalization(name='bn_2')(x)
x = layers.Dropout(0.3, name='dropout_2')(x)
outputs = layers.Dense(10, activation='softmax', name='output')(x)

# 모델 생성
model = keras.Model(inputs=inputs, outputs=outputs, name='mnist_model')
model.summary()

다중 입력/출력 모델

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

# 다중 입력 모델 예시: 이미지 + 메타데이터 결합
# 이미지 입력
image_input = keras.Input(shape=(32, 32, 3), name='image')
x1 = layers.Conv2D(32, 3, activation='relu')(image_input)
x1 = layers.GlobalAveragePooling2D()(x1)
x1 = layers.Dense(64, activation='relu')(x1)

# 메타데이터 입력
meta_input = keras.Input(shape=(10,), name='metadata')
x2 = layers.Dense(32, activation='relu')(meta_input)

# 결합
combined = layers.concatenate([x1, x2])
combined = layers.Dense(64, activation='relu')(combined)

# 다중 출력
main_output = layers.Dense(1, activation='sigmoid', name='main_output')(combined)
aux_output = layers.Dense(5, activation='softmax', name='aux_output')(combined)

# 모델 생성
model = keras.Model(
    inputs=[image_input, meta_input],
    outputs=[main_output, aux_output]
)

# 컴파일 (출력별 손실 및 가중치 지정)
model.compile(
    optimizer='adam',
    loss={
        'main_output': 'binary_crossentropy',
        'aux_output': 'categorical_crossentropy'
    },
    loss_weights={
        'main_output': 1.0,
        'aux_output': 0.2
    },
    metrics={
        'main_output': ['accuracy'],
        'aux_output': ['accuracy']
    }
)

잔차 연결 (Residual Connection)

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

def residual_block(x, filters, stride=1):
    """ResNet 스타일 잔차 블록"""
    shortcut = x

    # 메인 경로
    x = layers.Conv2D(filters, 3, strides=stride, padding='same')(x)
    x = layers.BatchNormalization()(x)
    x = layers.ReLU()(x)
    x = layers.Conv2D(filters, 3, padding='same')(x)
    x = layers.BatchNormalization()(x)

    # 채널 수나 크기가 다를 경우 shortcut 조정
    if stride != 1 or shortcut.shape[-1] != filters:
        shortcut = layers.Conv2D(filters, 1, strides=stride)(shortcut)
        shortcut = layers.BatchNormalization()(shortcut)

    # 잔차 더하기
    x = layers.add([x, shortcut])
    x = layers.ReLU()(x)
    return x

# ResNet 스타일 모델 구성
inputs = keras.Input(shape=(32, 32, 3))
x = layers.Conv2D(64, 7, strides=2, padding='same')(inputs)
x = layers.BatchNormalization()(x)
x = layers.ReLU()(x)
x = layers.MaxPooling2D(3, strides=2, padding='same')(x)

x = residual_block(x, 64)
x = residual_block(x, 64)
x = residual_block(x, 128, stride=2)
x = residual_block(x, 128)
x = residual_block(x, 256, stride=2)
x = residual_block(x, 256)

x = layers.GlobalAveragePooling2D()(x)
x = layers.Dense(256, activation='relu')(x)
outputs = layers.Dense(10, activation='softmax')(x)

model = keras.Model(inputs, outputs, name='mini_resnet')
model.summary()

공유 레이어 (Siamese Network)

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

# 공유 인코더 정의
shared_encoder = keras.Sequential([
    layers.Dense(128, activation='relu'),
    layers.Dense(64, activation='relu'),
    layers.Dense(32)
], name='shared_encoder')

# 두 개의 입력
input_a = keras.Input(shape=(100,), name='input_a')
input_b = keras.Input(shape=(100,), name='input_b')

# 같은 인코더로 처리
encoded_a = shared_encoder(input_a)
encoded_b = shared_encoder(input_b)

# 거리 계산 (Cosine Similarity)
similarity = layers.Dot(axes=1, normalize=True)([encoded_a, encoded_b])
output = layers.Dense(1, activation='sigmoid')(similarity)

siamese_model = keras.Model(inputs=[input_a, input_b], outputs=output)
siamese_model.summary()

5. Keras Subclassing API

Subclassing API는 가장 유연한 방법으로, PyTorch와 유사한 방식입니다. 커스텀 레이어와 모델을 Python 클래스로 정의합니다.

커스텀 레이어

import tensorflow as tf
from tensorflow import keras

class MyDenseLayer(keras.layers.Layer):
    def __init__(self, units, activation=None, **kwargs):
        super().__init__(**kwargs)
        self.units = units
        self.activation = keras.activations.get(activation)

    def build(self, input_shape):
        # 가중치 초기화 (build는 첫 번째 호출 시 한 번만 실행)
        self.w = self.add_weight(
            name='kernel',
            shape=(input_shape[-1], self.units),
            initializer='glorot_uniform',
            trainable=True
        )
        self.b = self.add_weight(
            name='bias',
            shape=(self.units,),
            initializer='zeros',
            trainable=True
        )
        super().build(input_shape)

    def call(self, inputs, training=False):
        output = tf.matmul(inputs, self.w) + self.b
        if self.activation is not None:
            output = self.activation(output)
        return output

    def get_config(self):
        config = super().get_config()
        config.update({'units': self.units, 'activation': keras.activations.serialize(self.activation)})
        return config

# 사용
layer = MyDenseLayer(64, activation='relu')
x = tf.random.normal([32, 128])
y = layer(x)
print(y.shape)  # (32, 64)


class MultiHeadSelfAttention(keras.layers.Layer):
    """간단한 멀티헤드 셀프어텐션 레이어"""
    def __init__(self, embed_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.head_dim = embed_dim // num_heads
        assert self.head_dim * num_heads == embed_dim

        self.wq = keras.layers.Dense(embed_dim)
        self.wk = keras.layers.Dense(embed_dim)
        self.wv = keras.layers.Dense(embed_dim)
        self.wo = keras.layers.Dense(embed_dim)

    def split_heads(self, x, batch_size):
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.head_dim))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, x, training=False):
        batch_size = tf.shape(x)[0]
        seq_len = tf.shape(x)[1]

        q = self.split_heads(self.wq(x), batch_size)
        k = self.split_heads(self.wk(x), batch_size)
        v = self.split_heads(self.wv(x), batch_size)

        # 스케일드 닷 프로덕트 어텐션
        scale = tf.cast(self.head_dim, tf.float32) ** 0.5
        scores = tf.matmul(q, k, transpose_b=True) / scale
        weights = tf.nn.softmax(scores, axis=-1)
        context = tf.matmul(weights, v)

        # 헤드 결합
        context = tf.transpose(context, perm=[0, 2, 1, 3])
        context = tf.reshape(context, (batch_size, seq_len, self.embed_dim))
        return self.wo(context)

커스텀 모델 (Model 서브클래싱)

import tensorflow as tf
from tensorflow import keras

class ResidualBlock(keras.layers.Layer):
    def __init__(self, filters, **kwargs):
        super().__init__(**kwargs)
        self.conv1 = keras.layers.Conv2D(filters, 3, padding='same')
        self.conv2 = keras.layers.Conv2D(filters, 3, padding='same')
        self.bn1 = keras.layers.BatchNormalization()
        self.bn2 = keras.layers.BatchNormalization()
        self.relu = keras.layers.ReLU()

    def call(self, inputs, training=False):
        x = self.conv1(inputs)
        x = self.bn1(x, training=training)
        x = self.relu(x)
        x = self.conv2(x)
        x = self.bn2(x, training=training)
        x = x + inputs  # 잔차 연결
        return self.relu(x)


class CustomCNN(keras.Model):
    def __init__(self, num_classes=10, **kwargs):
        super().__init__(**kwargs)
        self.conv_stem = keras.layers.Conv2D(32, 3, padding='same', activation='relu')
        self.res_block1 = ResidualBlock(32)
        self.res_block2 = ResidualBlock(32)
        self.pool = keras.layers.MaxPooling2D(2)
        self.conv_expand = keras.layers.Conv2D(64, 3, padding='same', activation='relu')
        self.res_block3 = ResidualBlock(64)
        self.gap = keras.layers.GlobalAveragePooling2D()
        self.dropout = keras.layers.Dropout(0.5)
        self.fc = keras.layers.Dense(num_classes, activation='softmax')

    def call(self, inputs, training=False):
        x = self.conv_stem(inputs)
        x = self.res_block1(x, training=training)
        x = self.res_block2(x, training=training)
        x = self.pool(x)
        x = self.conv_expand(x)
        x = self.res_block3(x, training=training)
        x = self.gap(x)
        x = self.dropout(x, training=training)
        return self.fc(x)


# 사용
model = CustomCNN(num_classes=10)
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# 더미 데이터로 테스트
dummy_input = tf.random.normal([4, 32, 32, 3])
output = model(dummy_input, training=False)
print(output.shape)  # (4, 10)
model.summary()

6. CNN 구현 (CIFAR-10)

기본 CNN 모델

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

# CIFAR-10 데이터 로딩
(x_train, y_train), (x_test, y_test) = keras.datasets.cifar10.load_data()
x_train = x_train.astype('float32') / 255.0
x_test = x_test.astype('float32') / 255.0

class_names = ['비행기', '자동차', '새', '고양이', '사슴',
               '개', '개구리', '말', '배', '트럭']

print(f"훈련 데이터: {x_train.shape}")  # (50000, 32, 32, 3)
print(f"테스트 데이터: {x_test.shape}")  # (10000, 32, 32, 3)

# CNN 모델 정의
def build_cnn_model():
    model = keras.Sequential([
        # 첫 번째 합성곱 블록
        layers.Conv2D(32, (3, 3), padding='same', input_shape=(32, 32, 3)),
        layers.BatchNormalization(),
        layers.Activation('relu'),
        layers.Conv2D(32, (3, 3), padding='same'),
        layers.BatchNormalization(),
        layers.Activation('relu'),
        layers.MaxPooling2D((2, 2)),
        layers.Dropout(0.25),

        # 두 번째 합성곱 블록
        layers.Conv2D(64, (3, 3), padding='same'),
        layers.BatchNormalization(),
        layers.Activation('relu'),
        layers.Conv2D(64, (3, 3), padding='same'),
        layers.BatchNormalization(),
        layers.Activation('relu'),
        layers.MaxPooling2D((2, 2)),
        layers.Dropout(0.25),

        # 세 번째 합성곱 블록
        layers.Conv2D(128, (3, 3), padding='same'),
        layers.BatchNormalization(),
        layers.Activation('relu'),
        layers.Conv2D(128, (3, 3), padding='same'),
        layers.BatchNormalization(),
        layers.Activation('relu'),
        layers.GlobalAveragePooling2D(),
        layers.Dropout(0.5),

        # 완전 연결층
        layers.Dense(256, activation='relu'),
        layers.BatchNormalization(),
        layers.Dropout(0.5),
        layers.Dense(10, activation='softmax')
    ])
    return model

model = build_cnn_model()
model.summary()

model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=1e-3),
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)

데이터 증강 (Data Augmentation)

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

# Keras 내장 데이터 증강 레이어
data_augmentation = keras.Sequential([
    layers.RandomFlip('horizontal'),
    layers.RandomRotation(0.1),
    layers.RandomZoom(0.1),
    layers.RandomTranslation(0.1, 0.1),
    layers.RandomContrast(0.1),
], name='data_augmentation')

# 모델에 증강 레이어 통합
inputs = keras.Input(shape=(32, 32, 3))
x = data_augmentation(inputs)  # 훈련 시에만 적용됨
x = layers.Rescaling(1./255)(x)  # 정규화

# ... CNN 블록들 ...
x = layers.Conv2D(32, 3, padding='same', activation='relu')(x)
x = layers.MaxPooling2D()(x)
x = layers.Conv2D(64, 3, padding='same', activation='relu')(x)
x = layers.MaxPooling2D()(x)
x = layers.GlobalAveragePooling2D()(x)
outputs = layers.Dense(10, activation='softmax')(x)

augmented_model = keras.Model(inputs, outputs)

# tf.data를 이용한 데이터 증강 파이프라인
def augment_image(image, label):
    image = tf.cast(image, tf.float32) / 255.0
    image = tf.image.random_flip_left_right(image)
    image = tf.image.random_brightness(image, 0.2)
    image = tf.image.random_contrast(image, 0.8, 1.2)
    image = tf.image.random_saturation(image, 0.8, 1.2)
    image = tf.image.pad_to_bounding_box(image, 4, 4, 40, 40)
    image = tf.image.random_crop(image, [32, 32, 3])
    return image, label

(x_train, y_train), (x_test, y_test) = keras.datasets.cifar10.load_data()
y_train = y_train.flatten()
y_test = y_test.flatten()

train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.map(augment_image, num_parallel_calls=tf.data.AUTOTUNE)
train_dataset = train_dataset.shuffle(10000).batch(128).prefetch(tf.data.AUTOTUNE)

전이학습 (Transfer Learning)

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.applications import EfficientNetB0

# EfficientNetB0으로 전이학습
def build_transfer_model(num_classes=10):
    # 사전 훈련된 베이스 모델 (분류기 제외)
    base_model = EfficientNetB0(
        include_top=False,
        weights='imagenet',
        input_shape=(224, 224, 3)
    )

    # 초기에는 베이스 모델 동결
    base_model.trainable = False

    inputs = keras.Input(shape=(224, 224, 3))
    # EfficientNet은 내부에 전처리 포함
    x = base_model(inputs, training=False)
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dense(256, activation='relu')(x)
    x = layers.Dropout(0.5)(x)
    outputs = layers.Dense(num_classes, activation='softmax')(x)

    model = keras.Model(inputs, outputs)
    return model, base_model

model, base_model = build_transfer_model(num_classes=10)

model.compile(
    optimizer=keras.optimizers.Adam(1e-3),
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)

# 1단계: 동결된 상태로 분류기 훈련
# model.fit(train_dataset, epochs=10, ...)

# Fine-tuning: 베이스 모델의 일부 레이어 해동
def fine_tune(model, base_model, fine_tune_at=100):
    base_model.trainable = True
    # fine_tune_at 이전 레이어는 계속 동결
    for layer in base_model.layers[:fine_tune_at]:
        layer.trainable = False

    model.compile(
        optimizer=keras.optimizers.Adam(1e-5),  # 매우 낮은 학습률
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )
    return model

# fine_tune(model, base_model, fine_tune_at=100)
# model.fit(train_dataset, epochs=20, ...)

7. RNN/LSTM 구현

시계열 예측 (LSTM)

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np

# 시계열 데이터 생성
def generate_time_series(n_samples, n_steps):
    t = np.linspace(0, 4 * np.pi, n_steps + 1)
    series = np.sin(t) + 0.1 * np.random.randn(n_samples, n_steps + 1)
    X = series[:, :-1].reshape(-1, n_steps, 1)
    y = series[:, 1:].reshape(-1, n_steps, 1)
    return X, y

X_train, y_train = generate_time_series(10000, 50)
X_val, y_val = generate_time_series(1000, 50)
X_test, y_test = generate_time_series(1000, 50)

# LSTM 모델
def build_lstm_model(n_steps=50):
    model = keras.Sequential([
        layers.LSTM(64, return_sequences=True, input_shape=(n_steps, 1)),
        layers.Dropout(0.2),
        layers.LSTM(64, return_sequences=True),
        layers.Dropout(0.2),
        layers.TimeDistributed(layers.Dense(1))
    ])
    model.compile(
        optimizer='adam',
        loss='mse',
        metrics=['mae']
    )
    return model

model = build_lstm_model()
model.summary()

history = model.fit(
    X_train, y_train,
    epochs=20,
    batch_size=128,
    validation_data=(X_val, y_val)
)

텍스트 생성 (문자 수준 RNN)

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np

# 예시 텍스트 (실제로는 더 큰 텍스트 사용)
text = """TensorFlow는 구글이 만든 딥러닝 프레임워크입니다.
Keras는 TensorFlow 위에서 동작하는 고수준 API입니다.
두 라이브러리를 함께 사용하면 강력한 딥러닝 모델을 빠르게 구축할 수 있습니다."""

# 문자 집합 생성
chars = sorted(set(text))
char2idx = {c: i for i, c in enumerate(chars)}
idx2char = np.array(chars)
vocab_size = len(chars)

# 텍스트를 인덱스로 변환
text_as_int = np.array([char2idx[c] for c in text])

# 시퀀스 생성
seq_length = 50
sequences = tf.data.Dataset.from_tensor_slices(text_as_int)
sequences = sequences.batch(seq_length + 1, drop_remainder=True)

def split_input_target(chunk):
    return chunk[:-1], chunk[1:]

dataset = sequences.map(split_input_target)
dataset = dataset.shuffle(100).batch(32, drop_remainder=True)

# 문자 수준 RNN 모델
def build_char_rnn(vocab_size, embed_dim=64, rnn_units=256):
    model = keras.Sequential([
        layers.Embedding(vocab_size, embed_dim),
        layers.GRU(rnn_units, return_sequences=True, stateful=False),
        layers.GRU(rnn_units, return_sequences=True),
        layers.Dense(vocab_size)
    ])
    return model

model = build_char_rnn(vocab_size)
model.compile(
    optimizer='adam',
    loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=['accuracy']
)

# 텍스트 생성 함수
def generate_text(model, start_string, num_generate=100, temperature=1.0):
    input_eval = [char2idx[s] for s in start_string]
    input_eval = tf.expand_dims(input_eval, 0)
    text_generated = []

    for _ in range(num_generate):
        predictions = model(input_eval)
        predictions = tf.squeeze(predictions, 0) / temperature
        predicted_id = tf.random.categorical(predictions, num_samples=1)[-1, 0].numpy()
        input_eval = tf.expand_dims([predicted_id], 0)
        text_generated.append(idx2char[predicted_id])

    return start_string + ''.join(text_generated)

Bidirectional LSTM

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

# 양방향 LSTM (텍스트 분류)
def build_bidirectional_model(vocab_size, max_len=100, embed_dim=64):
    model = keras.Sequential([
        layers.Embedding(vocab_size, embed_dim, input_length=max_len),
        layers.Bidirectional(layers.LSTM(64, return_sequences=True)),
        layers.Bidirectional(layers.LSTM(32)),
        layers.Dense(64, activation='relu'),
        layers.Dropout(0.5),
        layers.Dense(1, activation='sigmoid')
    ])
    model.compile(
        optimizer='adam',
        loss='binary_crossentropy',
        metrics=['accuracy']
    )
    return model

model = build_bidirectional_model(vocab_size=10000, max_len=100)
model.summary()

8. Transformer with Keras

Multi-head Attention과 Transformer Encoder

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

class TransformerBlock(keras.layers.Layer):
    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1, **kwargs):
        super().__init__(**kwargs)
        self.att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.ffn = keras.Sequential([
            layers.Dense(ff_dim, activation='relu'),
            layers.Dense(embed_dim)
        ])
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)

    def call(self, inputs, training=False):
        attn_output = self.att(inputs, inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)

        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)


class TokenAndPositionEmbedding(keras.layers.Layer):
    def __init__(self, maxlen, vocab_size, embed_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_emb = layers.Embedding(vocab_size, embed_dim)
        self.pos_emb = layers.Embedding(maxlen, embed_dim)

    def call(self, x):
        maxlen = tf.shape(x)[-1]
        positions = tf.range(start=0, limit=maxlen, delta=1)
        positions = self.pos_emb(positions)
        x = self.token_emb(x)
        return x + positions


# 텍스트 분류 Transformer 모델
def build_transformer_classifier(
    vocab_size=20000,
    maxlen=200,
    embed_dim=32,
    num_heads=2,
    ff_dim=32,
    num_classes=2
):
    inputs = layers.Input(shape=(maxlen,))
    embedding_layer = TokenAndPositionEmbedding(maxlen, vocab_size, embed_dim)
    x = embedding_layer(inputs)

    transformer_block = TransformerBlock(embed_dim, num_heads, ff_dim)
    x = transformer_block(x)

    x = layers.GlobalAveragePooling1D()(x)
    x = layers.Dropout(0.1)(x)
    x = layers.Dense(20, activation='relu')(x)
    x = layers.Dropout(0.1)(x)
    outputs = layers.Dense(num_classes, activation='softmax')(x)

    model = keras.Model(inputs=inputs, outputs=outputs)
    return model


model = build_transformer_classifier()
model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)
model.summary()

# IMDB 감성 분류 예시
vocab_size = 20000
maxlen = 200

(x_train, y_train), (x_val, y_val) = keras.datasets.imdb.load_data(num_words=vocab_size)
x_train = keras.preprocessing.sequence.pad_sequences(x_train, maxlen=maxlen)
x_val = keras.preprocessing.sequence.pad_sequences(x_val, maxlen=maxlen)

# model.fit(x_train, y_train, batch_size=32, epochs=5, validation_data=(x_val, y_val))

9. 데이터 파이프라인 (tf.data)

기본 tf.data.Dataset 사용법

import tensorflow as tf
import numpy as np

# 텐서에서 Dataset 생성
X = np.random.randn(1000, 10)
y = np.random.randint(0, 2, 1000)

dataset = tf.data.Dataset.from_tensor_slices((X, y))
print(dataset)  # TensorSliceDataset

# 기본 변환
dataset = (dataset
    .shuffle(buffer_size=1000)
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)

# 데이터 확인
for batch_X, batch_y in dataset.take(1):
    print(f"배치 X 형태: {batch_X.shape}")  # (32, 10)
    print(f"배치 y 형태: {batch_y.shape}")  # (32,)

# map 변환
def preprocess(x, y):
    x = tf.cast(x, tf.float32)
    y = tf.cast(y, tf.int32)
    x = (x - tf.reduce_mean(x)) / tf.math.reduce_std(x)
    return x, y

dataset = tf.data.Dataset.from_tensor_slices((X, y))
dataset = dataset.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE)

# filter 변환
positive_dataset = dataset.filter(lambda x, y: y == 1)

# 범위로 생성
range_dataset = tf.data.Dataset.range(100)

# zip으로 결합
features_ds = tf.data.Dataset.from_tensor_slices(X)
labels_ds = tf.data.Dataset.from_tensor_slices(y)
combined_ds = tf.data.Dataset.zip((features_ds, labels_ds))

이미지 데이터 로딩 파이프라인

import tensorflow as tf
import os

# 디렉토리 구조: data/train/class_name/image.jpg
def load_and_preprocess_image(path, label, image_size=(224, 224)):
    image = tf.io.read_file(path)
    image = tf.image.decode_jpeg(image, channels=3)
    image = tf.image.resize(image, image_size)
    image = tf.cast(image, tf.float32) / 255.0
    return image, label

def create_image_dataset(data_dir, batch_size=32, image_size=(224, 224)):
    class_names = sorted(os.listdir(data_dir))
    class_map = {name: idx for idx, name in enumerate(class_names)}

    file_paths = []
    labels = []
    for class_name in class_names:
        class_dir = os.path.join(data_dir, class_name)
        for fname in os.listdir(class_dir):
            if fname.endswith(('.jpg', '.jpeg', '.png')):
                file_paths.append(os.path.join(class_dir, fname))
                labels.append(class_map[class_name])

    path_ds = tf.data.Dataset.from_tensor_slices(file_paths)
    label_ds = tf.data.Dataset.from_tensor_slices(labels)
    combined = tf.data.Dataset.zip((path_ds, label_ds))

    dataset = combined.map(
        lambda p, l: load_and_preprocess_image(p, l, image_size),
        num_parallel_calls=tf.data.AUTOTUNE
    )
    dataset = dataset.shuffle(1000).batch(batch_size).prefetch(tf.data.AUTOTUNE)
    return dataset, class_names

# keras 내장 image_dataset_from_directory 사용 (더 편리)
# train_ds = keras.utils.image_dataset_from_directory(
#     'data/train',
#     image_size=(224, 224),
#     batch_size=32
# )

TFRecord 형식

import tensorflow as tf

# TFRecord 파일 작성
def bytes_feature(value):
    if isinstance(value, type(tf.constant(0))):
        value = value.numpy()
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

def int64_feature(value):
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

def float_feature(value):
    return tf.train.Feature(float_list=tf.train.FloatList(value=[value]))

def image_to_tfrecord(image_path, label, writer):
    image_string = open(image_path, 'rb').read()
    feature = {
        'image': bytes_feature(image_string),
        'label': int64_feature(label),
    }
    example = tf.train.Example(features=tf.train.Features(feature=feature))
    writer.write(example.SerializeToString())

# TFRecord 파일 읽기
def parse_tfrecord(serialized_example):
    feature_description = {
        'image': tf.io.FixedLenFeature([], tf.string),
        'label': tf.io.FixedLenFeature([], tf.int64),
    }
    example = tf.io.parse_single_example(serialized_example, feature_description)
    image = tf.image.decode_jpeg(example['image'], channels=3)
    image = tf.image.resize(image, [224, 224])
    image = tf.cast(image, tf.float32) / 255.0
    label = example['label']
    return image, label

# TFRecord Dataset 생성
# dataset = tf.data.TFRecordDataset(['data.tfrecord'])
# dataset = dataset.map(parse_tfrecord, num_parallel_calls=tf.data.AUTOTUNE)
# dataset = dataset.shuffle(1000).batch(32).prefetch(tf.data.AUTOTUNE)

10. 훈련 고급 기법

콜백 (Callbacks)

import tensorflow as tf
from tensorflow import keras
import os

# ModelCheckpoint: 최적 모델 저장
checkpoint_cb = keras.callbacks.ModelCheckpoint(
    filepath='best_model.keras',
    monitor='val_accuracy',
    mode='max',
    save_best_only=True,
    verbose=1
)

# EarlyStopping: 과적합 방지
early_stopping_cb = keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=10,
    restore_best_weights=True,
    verbose=1
)

# ReduceLROnPlateau: 학습률 자동 감소
reduce_lr_cb = keras.callbacks.ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.5,
    patience=5,
    min_lr=1e-7,
    verbose=1
)

# TensorBoard: 시각화
tensorboard_cb = keras.callbacks.TensorBoard(
    log_dir='logs/',
    histogram_freq=1,
    write_graph=True,
    write_images=True,
    update_freq='epoch'
)

# LearningRateScheduler: 학습률 스케줄링
def cosine_decay_schedule(epoch, lr):
    import math
    initial_lr = 1e-3
    total_epochs = 100
    return initial_lr * (1 + math.cos(math.pi * epoch / total_epochs)) / 2

lr_scheduler_cb = keras.callbacks.LearningRateScheduler(
    cosine_decay_schedule, verbose=1
)

# CSV로 로그 저장
csv_logger_cb = keras.callbacks.CSVLogger('training_log.csv')

# 커스텀 콜백
class ConfusionMatrixCallback(keras.callbacks.Callback):
    def __init__(self, validation_data, class_names):
        super().__init__()
        self.X_val, self.y_val = validation_data
        self.class_names = class_names

    def on_epoch_end(self, epoch, logs=None):
        y_pred = tf.argmax(self.model.predict(self.X_val), axis=1)
        cm = tf.math.confusion_matrix(self.y_val, y_pred)
        if epoch % 10 == 0:
            print(f"\n에포크 {epoch} 혼동 행렬:\n{cm.numpy()}")


# 모든 콜백을 하나의 리스트로
callbacks = [
    checkpoint_cb,
    early_stopping_cb,
    reduce_lr_cb,
    tensorboard_cb,
    csv_logger_cb
]

# 사용 예시
# history = model.fit(
#     X_train, y_train,
#     epochs=100,
#     validation_data=(X_val, y_val),
#     callbacks=callbacks
# )

커스텀 훈련 루프 (GradientTape)

import tensorflow as tf
from tensorflow import keras
import time

# 간단한 분류 모델
model = keras.Sequential([
    keras.layers.Dense(64, activation='relu', input_shape=(784,)),
    keras.layers.Dense(64, activation='relu'),
    keras.layers.Dense(10)
])

# 손실 함수와 옵티마이저
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)
optimizer = keras.optimizers.Adam(learning_rate=1e-3)

# 메트릭
train_loss = keras.metrics.Mean(name='train_loss')
train_accuracy = keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')
val_loss = keras.metrics.Mean(name='val_loss')
val_accuracy = keras.metrics.SparseCategoricalAccuracy(name='val_accuracy')

# 단일 훈련 스텝
@tf.function  # JIT 컴파일로 속도 향상
def train_step(images, labels):
    with tf.GradientTape() as tape:
        predictions = model(images, training=True)
        loss = loss_fn(labels, predictions)
        # L2 정규화 추가
        l2_loss = tf.add_n([tf.nn.l2_loss(v) for v in model.trainable_variables
                            if 'bias' not in v.name])
        total_loss = loss + 1e-4 * l2_loss

    gradients = tape.gradient(total_loss, model.trainable_variables)
    # 그래디언트 클리핑
    gradients, _ = tf.clip_by_global_norm(gradients, 1.0)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    train_loss.update_state(loss)
    train_accuracy.update_state(labels, predictions)
    return loss

@tf.function
def val_step(images, labels):
    predictions = model(images, training=False)
    loss = loss_fn(labels, predictions)
    val_loss.update_state(loss)
    val_accuracy.update_state(labels, predictions)

# 전체 훈련 루프
def train(train_ds, val_ds, epochs=10):
    for epoch in range(epochs):
        start_time = time.time()

        # 메트릭 초기화
        train_loss.reset_states()
        train_accuracy.reset_states()
        val_loss.reset_states()
        val_accuracy.reset_states()

        # 훈련
        for step, (images, labels) in enumerate(train_ds):
            train_step(images, labels)
            if step % 100 == 0:
                print(f"스텝 {step}: 손실={train_loss.result():.4f}")

        # 검증
        for images, labels in val_ds:
            val_step(images, labels)

        elapsed = time.time() - start_time
        print(f"에포크 {epoch+1}/{epochs} ({elapsed:.1f}s) - "
              f"손실: {train_loss.result():.4f}, "
              f"정확도: {train_accuracy.result():.4f}, "
              f"검증 손실: {val_loss.result():.4f}, "
              f"검증 정확도: {val_accuracy.result():.4f}")

분산 학습 (tf.distribute.Strategy)

import tensorflow as tf
from tensorflow import keras

# 다중 GPU 전략
strategy = tf.distribute.MirroredStrategy()
print(f"사용 가능한 디바이스 수: {strategy.num_replicas_in_sync}")

with strategy.scope():
    # 모델 정의, 컴파일은 strategy.scope() 안에서
    model = keras.Sequential([
        keras.layers.Dense(64, activation='relu', input_shape=(784,)),
        keras.layers.Dense(64, activation='relu'),
        keras.layers.Dense(10, activation='softmax')
    ])
    model.compile(
        optimizer='adam',
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )

# 배치 크기는 GPU 수에 비례하여 증가
BATCH_SIZE_PER_REPLICA = 64
BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync

# TPU 사용
# resolver = tf.distribute.cluster_resolver.TPUClusterResolver()
# tf.config.experimental_connect_to_cluster(resolver)
# tf.tpu.experimental.initialize_tpu_system(resolver)
# tpu_strategy = tf.distribute.TPUStrategy(resolver)

# 혼합 정밀도 훈련 (Mixed Precision)
from tensorflow.keras import mixed_precision
policy = mixed_precision.Policy('mixed_float16')
mixed_precision.set_global_policy(policy)

with strategy.scope():
    inputs = keras.Input(shape=(784,))
    x = keras.layers.Dense(64, activation='relu')(inputs)
    # 마지막 출력은 float32로
    outputs = keras.layers.Dense(10, activation='softmax', dtype='float32')(x)
    model_mp = keras.Model(inputs, outputs)

    opt = keras.optimizers.Adam(1e-3)
    opt = mixed_precision.LossScaleOptimizer(opt)

    model_mp.compile(
        optimizer=opt,
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )

11. TensorBoard 시각화

기본 TensorBoard 사용

import tensorflow as tf
from tensorflow import keras
import datetime
import numpy as np

# 로그 디렉토리
log_dir = "logs/fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")

# 콜백 설정
tensorboard_callback = keras.callbacks.TensorBoard(
    log_dir=log_dir,
    histogram_freq=1,     # 가중치 히스토그램 주기
    write_graph=True,     # 계산 그래프 기록
    write_images=True,    # 가중치 이미지 기록
    update_freq='epoch',  # 업데이트 주기
    profile_batch=2       # 프로파일링 배치
)

# 커스텀 스칼라 기록
file_writer = tf.summary.create_file_writer(log_dir + '/custom_scalars')

def log_custom_metrics(epoch, logs):
    with file_writer.as_default():
        tf.summary.scalar('learning_rate',
                          data=keras.backend.get_value(model.optimizer.lr),
                          step=epoch)

lr_callback = keras.callbacks.LambdaCallback(on_epoch_end=log_custom_metrics)

# 이미지 기록
def log_images(epoch, logs):
    # 테스트 이미지 가져오기
    (_, _), (x_test, y_test) = keras.datasets.mnist.load_data()
    x_test = x_test[:10].reshape(-1, 28, 28, 1).astype('float32') / 255.0

    with file_writer.as_default():
        tf.summary.image("테스트 샘플", x_test, step=epoch, max_outputs=10)

image_callback = keras.callbacks.LambdaCallback(on_epoch_end=log_images)

# TensorBoard 시작 명령어
# tensorboard --logdir logs/fit

임베딩 시각화

import tensorflow as tf
from tensorflow import keras
import os
import numpy as np
from tensorboard.plugins import projector

# 임베딩 레이어 학습 후 시각화
(x_train, y_train), _ = keras.datasets.mnist.load_data()
x_train = x_train.astype('float32').reshape(-1, 784) / 255.0

# 임베딩 모델
embedding_model = keras.Sequential([
    keras.layers.Dense(64, activation='relu', input_shape=(784,)),
    keras.layers.Dense(32, name='embedding')  # 임베딩 레이어
])

# 임베딩 추출
embeddings = embedding_model.predict(x_train[:1000])

# 임베딩 파일 저장
log_dir = 'logs/embedding'
os.makedirs(log_dir, exist_ok=True)
np.savetxt(os.path.join(log_dir, 'vectors.tsv'), embeddings, delimiter='\t')

# 메타데이터 (레이블)
with open(os.path.join(log_dir, 'metadata.tsv'), 'w') as f:
    for label in y_train[:1000]:
        f.write(f"{label}\n")

# Projector 설정
config = projector.ProjectorConfig()
embedding_config = config.embeddings.add()
embedding_config.tensor_name = "embedding/.ATTRIBUTES/VARIABLE_VALUE"
embedding_config.tensor_path = 'vectors.tsv'
embedding_config.metadata_path = 'metadata.tsv'
projector.visualize_embeddings(log_dir, config)

12. 모델 저장과 변환

SavedModel 형식

import tensorflow as tf
from tensorflow import keras

# 모델 저장
model.save('saved_model/my_model')

# 로드
loaded_model = keras.models.load_model('saved_model/my_model')

# HDF5 형식 (레거시)
model.save('my_model.h5')
loaded_h5 = keras.models.load_model('my_model.h5')

# 가중치만 저장/로드
model.save_weights('model_weights.h5')
model.load_weights('model_weights.h5')

# Keras 네이티브 형식 (권장)
model.save('my_model.keras')
loaded_keras = keras.models.load_model('my_model.keras')

# 서브클래싱 모델의 경우 get_config 구현 필요
class MyModel(keras.Model):
    def __init__(self, units):
        super().__init__()
        self.units = units
        self.dense = keras.layers.Dense(units)

    def call(self, inputs):
        return self.dense(inputs)

    def get_config(self):
        return {'units': self.units}

    @classmethod
    def from_config(cls, config):
        return cls(**config)

TensorFlow Lite 변환 (모바일/엣지)

import tensorflow as tf
from tensorflow import keras
import numpy as np

# 기본 TFLite 변환
converter = tf.lite.TFLiteConverter.from_saved_model('saved_model/my_model')
tflite_model = converter.convert()

with open('model.tflite', 'wb') as f:
    f.write(tflite_model)

# 최적화 옵션들
# 동적 범위 양자화
converter = tf.lite.TFLiteConverter.from_saved_model('saved_model/my_model')
converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_quantized = converter.convert()

# 전체 정수 양자화 (INT8)
def representative_dataset():
    for _ in range(100):
        data = np.random.random((1, 784)).astype(np.float32)
        yield [data]

converter = tf.lite.TFLiteConverter.from_saved_model('saved_model/my_model')
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.representative_dataset = representative_dataset
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
converter.inference_input_type = tf.int8
converter.inference_output_type = tf.int8
tflite_int8 = converter.convert()

# TFLite 모델 실행
interpreter = tf.lite.Interpreter(model_content=tflite_quantized)
interpreter.allocate_tensors()

input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

print("입력:", input_details[0]['shape'])
print("출력:", output_details[0]['shape'])

# 추론
input_data = np.random.random((1, 784)).astype(np.float32)
interpreter.set_tensor(input_details[0]['index'], input_data)
interpreter.invoke()
output_data = interpreter.get_tensor(output_details[0]['index'])
print("TFLite 출력:", output_data.shape)

TensorFlow.js 변환

# tfjs 변환 도구 설치
pip install tensorflowjs

# SavedModel에서 TFJS로 변환
tensorflowjs_converter \
    --input_format=tf_saved_model \
    --output_format=tfjs_graph_model \
    --signature_name=serving_default \
    saved_model/my_model \
    tfjs_model/

13. TF-Serving으로 프로덕션 배포

TensorFlow Serving 설치 및 기본 사용

# Docker로 TF Serving 실행 (가장 쉬운 방법)
docker pull tensorflow/serving

# 모델 서빙
docker run -t --rm \
    -p 8501:8501 \
    -v "/path/to/saved_model:/models/my_model" \
    -e MODEL_NAME=my_model \
    tensorflow/serving

# GPU 지원
docker run --gpus all -t --rm \
    -p 8501:8501 \
    -v "/path/to/saved_model:/models/my_model" \
    -e MODEL_NAME=my_model \
    tensorflow/serving:latest-gpu

REST API로 추론 요청

import requests
import json
import numpy as np

# REST API 요청
url = 'http://localhost:8501/v1/models/my_model:predict'

# 입력 데이터 준비
data = np.random.random((1, 784)).astype(float)
payload = {
    "instances": data.tolist()
}

response = requests.post(url, json=payload)
result = json.loads(response.text)
print("예측 결과:", result['predictions'])

# 모델 정보 확인
info_url = 'http://localhost:8501/v1/models/my_model'
info_response = requests.get(info_url)
print("모델 정보:", json.loads(info_response.text))

gRPC 클라이언트

import grpc
import numpy as np
from tensorflow_serving.apis import predict_pb2
from tensorflow_serving.apis import prediction_service_pb2_grpc
import tensorflow as tf

# gRPC 채널 생성
channel = grpc.insecure_channel('localhost:8500')
stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)

# 요청 생성
request = predict_pb2.PredictRequest()
request.model_spec.name = 'my_model'
request.model_spec.signature_name = 'serving_default'

# 입력 텐서 설정
input_data = np.random.random((1, 784)).astype(np.float32)
request.inputs['input_layer'].CopyFrom(
    tf.make_tensor_proto(input_data, shape=input_data.shape)
)

# 추론 실행
response = stub.Predict(request, 10.0)  # 10초 타임아웃
output = tf.make_ndarray(response.outputs['output'])
print("gRPC 예측 결과:", output)

버전 관리와 Canary 배포

# model.config 파일
model_config_list {
  config {
    name: 'my_model'
    base_path: '/models/my_model/'
    model_platform: 'tensorflow'
    model_version_policy {
      specific {
        versions: 1
        versions: 2
      }
    }
    version_labels {
      key: 'stable'
      value: 1
    }
    version_labels {
      key: 'canary'
      value: 2
    }
  }
}
# 설정 파일로 서빙
docker run -t --rm \
    -p 8501:8501 -p 8500:8500 \
    -v "/path/to/models:/models" \
    -v "/path/to/model.config:/models/model.config" \
    tensorflow/serving \
    --model_config_file=/models/model.config
# 특정 버전으로 요청
url = 'http://localhost:8501/v1/models/my_model/versions/1:predict'

# 또는 레이블로 요청
url_label = 'http://localhost:8501/v1/models/my_model/labels/stable:predict'

14. TensorFlow Extended (TFX)

TFX는 TensorFlow 기반의 프로덕션 머신러닝 파이프라인 플랫폼입니다.

ML 파이프라인 개요

import tfx
from tfx.components import (
    CsvExampleGen,
    StatisticsGen,
    SchemaGen,
    ExampleValidator,
    Transform,
    Trainer,
    Evaluator,
    Pusher
)
from tfx.proto import pusher_pb2, trainer_pb2
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext

# 인터랙티브 컨텍스트 생성 (노트북에서 사용)
context = InteractiveContext()

# 1. ExampleGen: 데이터 수집
example_gen = CsvExampleGen(input_base='data/')
context.run(example_gen)

# 2. StatisticsGen: 데이터 통계
statistics_gen = StatisticsGen(
    examples=example_gen.outputs['examples']
)
context.run(statistics_gen)

# 3. SchemaGen: 스키마 생성
schema_gen = SchemaGen(
    statistics=statistics_gen.outputs['statistics'],
    infer_feature_shape=True
)
context.run(schema_gen)

# 4. ExampleValidator: 데이터 검증
example_validator = ExampleValidator(
    statistics=statistics_gen.outputs['statistics'],
    schema=schema_gen.outputs['schema']
)
context.run(example_validator)

# 5. Transform: 특징 엔지니어링
# transform.py 파일에 preprocessing_fn 정의 필요

# 6. Trainer: 모델 훈련
trainer = Trainer(
    module_file='trainer_module.py',
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    train_args=trainer_pb2.TrainArgs(num_steps=1000),
    eval_args=trainer_pb2.EvalArgs(num_steps=500)
)

# 7. Pusher: 모델 배포
pusher = Pusher(
    model=trainer.outputs['model'],
    push_destination=pusher_pb2.PushDestination(
        filesystem=pusher_pb2.PushDestination.Filesystem(
            base_directory='serving_model/'
        )
    )
)

Transform 컴포넌트 예시

import tensorflow as tf
import tensorflow_transform as tft

# transform.py
FEATURE_KEYS = ['feature1', 'feature2', 'feature3']
LABEL_KEY = 'label'

def preprocessing_fn(inputs):
    """특징 전처리 함수"""
    outputs = {}

    for key in FEATURE_KEYS:
        # 정규화
        outputs[key] = tft.scale_to_z_score(inputs[key])

    # 레이블 인코딩
    outputs[LABEL_KEY] = tf.cast(inputs[LABEL_KEY], tf.int64)

    return outputs

마무리

이 가이드에서는 TensorFlow와 Keras의 핵심 개념부터 프로덕션 배포까지 광범위하게 다루었습니다.

핵심 정리

  • Eager Execution: TF 2.x의 기본 실행 모드, @tf.function으로 그래프 최적화
  • Keras 3가지 API: Sequential(간단), Functional(복잡한 토폴로지), Subclassing(완전한 커스터마이징)
  • tf.data: 효율적인 데이터 파이프라인을 위한 필수 도구
  • GradientTape: 커스텀 훈련 루프와 자동 미분
  • 배포 옵션: TF Serving(서버), TFLite(모바일/엣지), TF.js(브라우저)
  • TFX: 프로덕션 ML 파이프라인의 표준

더 배울 내용

  • TensorFlow Probability (확률론적 딥러닝)
  • Keras Tuner (하이퍼파라미터 최적화)
  • TensorFlow Datasets (표준 데이터셋)
  • TensorFlow Hub (사전 훈련된 모델)
  • tf-agents (강화학습)

참고 자료