Skip to content

필사 모드: TensorFlow & Keras 완전 정복 가이드: Zero to Hero - 설치부터 프로덕션 배포까지

한국어
0%
정확도 0%
💡 왼쪽 원문을 읽으면서 오른쪽에 따라 써보세요. Tab 키로 힌트를 받을 수 있습니다.
원문 렌더가 준비되기 전까지 텍스트 가이드로 표시합니다.

들어가며

TensorFlow는 Google Brain 팀이 2015년에 오픈소스로 공개한 머신러닝/딥러닝 프레임워크입니다. 현재 가장 널리 사용되는 딥러닝 프레임워크 중 하나로, 연구부터 프로덕션 배포까지 전 과정을 지원합니다. Keras는 TensorFlow 2.x부터 공식 고수준 API로 통합되어, 직관적이고 빠른 모델 개발을 가능하게 합니다.

이 가이드에서는 TensorFlow와 Keras의 기초부터 시작해 실제 프로덕션 환경에 배포하는 것까지 단계별로 완전히 다룹니다.

1. TensorFlow 소개와 설치

TensorFlow vs PyTorch 비교

두 프레임워크는 각각 장단점이 있습니다.

| 항목 | TensorFlow/Keras | PyTorch |

| --------------- | ------------------------- | ----------------------- |

| 개발사 | Google | Meta (Facebook) |

| 배포 도구 | TF Serving, TFLite, TF.js | TorchServe, TorchScript |

| 프로덕션 성숙도 | 매우 높음 | 높음 |

| 연구 인기도 | 높음 | 매우 높음 |

| 학습 곡선 | 중간 | 낮음 |

| 모바일/엣지 | TFLite 우수 | ExecuTorch |

| 생태계 | TFX, TFHub 등 | HuggingFace 통합 우수 |

설치

pip으로 설치하는 방법입니다.

CPU 전용

pip install tensorflow

GPU 지원 (CUDA 자동 감지, TF 2.9+)

pip install tensorflow[and-cuda]

특정 버전

pip install tensorflow==2.15.0

conda 환경

conda create -n tf_env python=3.10

conda activate tf_env

conda install -c conda-forge tensorflow

macOS Apple Silicon에서는 다음과 같이 설치합니다.

pip install tensorflow-macos

pip install tensorflow-metal # GPU 가속

TensorFlow 2.x 주요 변화

TensorFlow 2.0에서 가장 큰 변화는 Eager Execution의 기본 활성화입니다. TF 1.x에서는 계산 그래프를 먼저 정의하고 세션(Session)을 통해 실행했지만, TF 2.x에서는 Python 코드처럼 즉시 실행됩니다.

print(tf.__version__)

Eager execution 확인

print(tf.executing_eagerly()) # True

TF 1.x 스타일의 그래프 실행 (호환성)

@tf.function

def compute(x, y):

return x + y

result = compute(tf.constant(1.0), tf.constant(2.0))

print(result) # tf.Tensor(3.0, shape=(), dtype=float32)

GPU 설정 확인

GPU 목록 확인

gpus = tf.config.list_physical_devices('GPU')

print("사용 가능한 GPU:", gpus)

GPU 메모리 증분 할당 (OOM 방지)

if gpus:

for gpu in gpus:

tf.config.experimental.set_memory_growth(gpu, True)

특정 GPU만 사용

if gpus:

tf.config.set_visible_devices(gpus[0], 'GPU')

논리적 GPU 분할 (하나의 물리 GPU를 여러 개처럼 사용)

if gpus:

tf.config.set_logical_device_configuration(

gpus[0],

[tf.config.LogicalDeviceConfiguration(memory_limit=2048),

tf.config.LogicalDeviceConfiguration(memory_limit=2048)]

)

현재 연산이 실행되는 디바이스 확인

with tf.device('/GPU:0'):

a = tf.constant([[1.0, 2.0], [3.0, 4.0]])

b = tf.constant([[5.0, 6.0], [7.0, 8.0]])

c = tf.matmul(a, b)

print(c.device)

2. TensorFlow 텐서(Tensor) 기초

텐서(Tensor)는 TensorFlow의 핵심 데이터 구조입니다. NumPy 배열과 유사하지만 GPU에서 가속할 수 있으며, 자동 미분을 지원합니다.

tf.constant와 tf.Variable

스칼라 (0차원 텐서)

scalar = tf.constant(42)

print(scalar) # tf.Tensor(42, shape=(), dtype=int32)

print(scalar.dtype) # tf.int32

print(scalar.shape) # ()

벡터 (1차원 텐서)

vector = tf.constant([1.0, 2.0, 3.0])

print(vector) # tf.Tensor([1. 2. 3.], shape=(3,), dtype=float32)

행렬 (2차원 텐서)

matrix = tf.constant([[1, 2, 3],

[4, 5, 6]], dtype=tf.float32)

print(matrix.shape) # (2, 3)

3차원 텐서

tensor_3d = tf.constant([[[1, 2], [3, 4]],

[[5, 6], [7, 8]]])

print(tensor_3d.shape) # (2, 2, 2)

특수 텐서

zeros = tf.zeros([3, 4]) # 0으로 채운 3x4 행렬

ones = tf.ones([2, 3]) # 1로 채운 2x3 행렬

identity = tf.eye(4) # 4x4 단위 행렬

random = tf.random.normal([3, 3]) # 정규분포 랜덤 행렬

tf.Variable - 학습 파라미터에 사용 (값 변경 가능)

var = tf.Variable([1.0, 2.0, 3.0])

print(var) # <tf.Variable 'Variable:0' ...>

var.assign([4.0, 5.0, 6.0]) # 값 변경

var.assign_add([1.0, 1.0, 1.0]) # 더하기

var.assign_sub([0.5, 0.5, 0.5]) # 빼기

텐서 연산

a = tf.constant([[1.0, 2.0], [3.0, 4.0]])

b = tf.constant([[5.0, 6.0], [7.0, 8.0]])

기본 산술 연산 (element-wise)

print(a + b) # 덧셈

print(a - b) # 뺄셈

print(a * b) # 곱셈 (element-wise)

print(a / b) # 나눗셈

print(a ** 2) # 제곱

동등한 TF 함수들

print(tf.add(a, b))

print(tf.subtract(a, b))

print(tf.multiply(a, b))

print(tf.divide(a, b))

행렬 곱셈

print(tf.matmul(a, b)) # 또는 a @ b

print(a @ b)

수학 함수

x = tf.constant([1.0, 4.0, 9.0, 16.0])

print(tf.sqrt(x)) # [1, 2, 3, 4]

print(tf.exp(x)) # e^x

print(tf.math.log(x)) # 자연로그

집계 연산

matrix = tf.constant([[1.0, 2.0, 3.0],

[4.0, 5.0, 6.0]])

print(tf.reduce_sum(matrix)) # 전체 합: 21

print(tf.reduce_sum(matrix, axis=0)) # 열별 합: [5, 7, 9]

print(tf.reduce_sum(matrix, axis=1)) # 행별 합: [6, 15]

print(tf.reduce_mean(matrix)) # 평균

print(tf.reduce_max(matrix)) # 최대값

print(tf.reduce_min(matrix)) # 최소값

print(tf.argmax(matrix, axis=1)) # 행별 최대값 인덱스

비교 연산

print(tf.equal(a, b))

print(tf.greater(a, b))

print(tf.less_equal(a, b))

형태 변환

t = tf.constant([[1, 2, 3, 4],

[5, 6, 7, 8]])

print(t.shape) # (2, 4)

reshape

reshaped = tf.reshape(t, [4, 2])

print(reshaped.shape) # (4, 2)

reshaped2 = tf.reshape(t, [8])

print(reshaped2.shape) # (8,)

reshaped3 = tf.reshape(t, [-1, 2]) # -1은 자동 계산

print(reshaped3.shape) # (4, 2)

transpose

transposed = tf.transpose(t)

print(transposed.shape) # (4, 2)

고차원 transpose

t3d = tf.random.normal([2, 3, 4])

transposed_3d = tf.transpose(t3d, perm=[0, 2, 1])

print(transposed_3d.shape) # (2, 4, 3)

expand_dims - 차원 추가

t1d = tf.constant([1, 2, 3])

print(t1d.shape) # (3,)

expanded_0 = tf.expand_dims(t1d, axis=0)

print(expanded_0.shape) # (1, 3)

expanded_1 = tf.expand_dims(t1d, axis=1)

print(expanded_1.shape) # (3, 1)

squeeze - 크기 1인 차원 제거

t_squeezable = tf.constant([[[1, 2, 3]]])

print(t_squeezable.shape) # (1, 1, 3)

squeezed = tf.squeeze(t_squeezable)

print(squeezed.shape) # (3,)

concat과 stack

a = tf.constant([[1, 2], [3, 4]])

b = tf.constant([[5, 6], [7, 8]])

concat_0 = tf.concat([a, b], axis=0)

print(concat_0.shape) # (4, 2)

concat_1 = tf.concat([a, b], axis=1)

print(concat_1.shape) # (2, 4)

stacked = tf.stack([a, b], axis=0)

print(stacked.shape) # (2, 2, 2)

Broadcasting

스칼라 broadcasting

matrix = tf.constant([[1.0, 2.0], [3.0, 4.0]])

print(matrix + 10) # 모든 원소에 10 더함

벡터 broadcasting

row_vector = tf.constant([10.0, 20.0]) # shape (2,)

print(matrix + row_vector) # 각 행에 row_vector 더함

col_vector = tf.constant([[10.0], [20.0]]) # shape (2, 1)

print(matrix + col_vector) # 각 열에 col_vector 더함

배치 연산에서의 broadcasting

batch = tf.random.normal([32, 128]) # 배치 크기 32, 특징 128

mean = tf.reduce_mean(batch, axis=0, keepdims=True) # shape (1, 128)

std = tf.math.reduce_std(batch, axis=0, keepdims=True)

normalized = (batch - mean) / (std + 1e-8) # broadcasting 적용

NumPy와 상호 변환

NumPy -> TensorFlow

np_array = np.array([[1.0, 2.0], [3.0, 4.0]])

tf_tensor = tf.constant(np_array)

tf_tensor2 = tf.convert_to_tensor(np_array)

TensorFlow -> NumPy

numpy_from_tf = tf_tensor.numpy()

print(type(numpy_from_tf)) # numpy.ndarray

tf.Tensor는 NumPy 함수 대부분 직접 사용 가능

print(np.sin(tf_tensor))

print(np.sqrt(tf_tensor))

3. Keras Sequential API

Sequential API는 Keras에서 가장 간단한 모델 구축 방법입니다. 레이어를 순서대로 쌓아 선형적인 모델을 만듭니다.

레이어 쌓기와 모델 컴파일

from tensorflow import keras

from tensorflow.keras import layers

방법 1: add() 메서드

model = keras.Sequential()

model.add(layers.Dense(128, activation='relu', input_shape=(784,)))

model.add(layers.Dropout(0.2))

model.add(layers.Dense(64, activation='relu'))

model.add(layers.Dropout(0.2))

model.add(layers.Dense(10, activation='softmax'))

방법 2: 리스트로 정의

model = keras.Sequential([

layers.Dense(128, activation='relu', input_shape=(784,)),

layers.Dropout(0.2),

layers.Dense(64, activation='relu'),

layers.Dropout(0.2),

layers.Dense(10, activation='softmax')

])

모델 구조 확인

model.summary()

컴파일

model.compile(

optimizer='adam', # 또는 keras.optimizers.Adam(learning_rate=0.001)

loss='sparse_categorical_crossentropy', # 정수 레이블

metrics=['accuracy']

)

다양한 optimizer와 loss

model.compile(

optimizer=keras.optimizers.SGD(learning_rate=0.01, momentum=0.9),

loss=keras.losses.CategoricalCrossentropy(),

metrics=[keras.metrics.CategoricalAccuracy()]

)

MNIST 분류 완전 예제

from tensorflow import keras

from tensorflow.keras import layers

데이터 로딩

(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()

전처리

x_train = x_train.astype('float32') / 255.0

x_test = x_test.astype('float32') / 255.0

x_train = x_train.reshape(-1, 784)

x_test = x_test.reshape(-1, 784)

print(f"훈련 데이터: {x_train.shape}, 레이블: {y_train.shape}")

print(f"테스트 데이터: {x_test.shape}, 레이블: {y_test.shape}")

모델 정의

model = keras.Sequential([

layers.Dense(256, activation='relu', input_shape=(784,)),

layers.BatchNormalization(),

layers.Dropout(0.3),

layers.Dense(128, activation='relu'),

layers.BatchNormalization(),

layers.Dropout(0.3),

layers.Dense(10, activation='softmax')

])

model.summary()

컴파일

model.compile(

optimizer=keras.optimizers.Adam(learning_rate=1e-3),

loss='sparse_categorical_crossentropy',

metrics=['accuracy']

)

훈련

history = model.fit(

x_train, y_train,

batch_size=128,

epochs=20,

validation_split=0.1,

verbose=1

)

평가

test_loss, test_acc = model.evaluate(x_test, y_test, verbose=0)

print(f"테스트 손실: {test_loss:.4f}")

print(f"테스트 정확도: {test_acc:.4f}")

예측

predictions = model.predict(x_test[:10])

predicted_classes = tf.argmax(predictions, axis=1)

print("예측:", predicted_classes.numpy())

print("실제:", y_test[:10])

학습 곡선 시각화

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))

ax1.plot(history.history['loss'], label='훈련 손실')

ax1.plot(history.history['val_loss'], label='검증 손실')

ax1.set_title('손실')

ax1.set_xlabel('에포크')

ax1.legend()

ax2.plot(history.history['accuracy'], label='훈련 정확도')

ax2.plot(history.history['val_accuracy'], label='검증 정확도')

ax2.set_title('정확도')

ax2.set_xlabel('에포크')

ax2.legend()

plt.tight_layout()

plt.savefig('mnist_training.png')

plt.show()

4. Keras Functional API

Functional API는 더 복잡한 모델 아키텍처를 구성할 수 있습니다. 다중 입력/출력, 잔차 연결, 공유 레이어 등이 가능합니다.

기본 Functional API 사용법

from tensorflow import keras

from tensorflow.keras import layers

입력 정의

inputs = keras.Input(shape=(784,), name='input_layer')

레이어 연결 (함수처럼 호출)

x = layers.Dense(256, activation='relu', name='dense_1')(inputs)

x = layers.BatchNormalization(name='bn_1')(x)

x = layers.Dropout(0.3, name='dropout_1')(x)

x = layers.Dense(128, activation='relu', name='dense_2')(x)

x = layers.BatchNormalization(name='bn_2')(x)

x = layers.Dropout(0.3, name='dropout_2')(x)

outputs = layers.Dense(10, activation='softmax', name='output')(x)

모델 생성

model = keras.Model(inputs=inputs, outputs=outputs, name='mnist_model')

model.summary()

다중 입력/출력 모델

from tensorflow import keras

from tensorflow.keras import layers

다중 입력 모델 예시: 이미지 + 메타데이터 결합

이미지 입력

image_input = keras.Input(shape=(32, 32, 3), name='image')

x1 = layers.Conv2D(32, 3, activation='relu')(image_input)

x1 = layers.GlobalAveragePooling2D()(x1)

x1 = layers.Dense(64, activation='relu')(x1)

메타데이터 입력

meta_input = keras.Input(shape=(10,), name='metadata')

x2 = layers.Dense(32, activation='relu')(meta_input)

결합

combined = layers.concatenate([x1, x2])

combined = layers.Dense(64, activation='relu')(combined)

다중 출력

main_output = layers.Dense(1, activation='sigmoid', name='main_output')(combined)

aux_output = layers.Dense(5, activation='softmax', name='aux_output')(combined)

모델 생성

model = keras.Model(

inputs=[image_input, meta_input],

outputs=[main_output, aux_output]

)

컴파일 (출력별 손실 및 가중치 지정)

model.compile(

optimizer='adam',

loss={

'main_output': 'binary_crossentropy',

'aux_output': 'categorical_crossentropy'

},

loss_weights={

'main_output': 1.0,

'aux_output': 0.2

},

metrics={

'main_output': ['accuracy'],

'aux_output': ['accuracy']

}

)

잔차 연결 (Residual Connection)

from tensorflow import keras

from tensorflow.keras import layers

def residual_block(x, filters, stride=1):

"""ResNet 스타일 잔차 블록"""

shortcut = x

메인 경로

x = layers.Conv2D(filters, 3, strides=stride, padding='same')(x)

x = layers.BatchNormalization()(x)

x = layers.ReLU()(x)

x = layers.Conv2D(filters, 3, padding='same')(x)

x = layers.BatchNormalization()(x)

채널 수나 크기가 다를 경우 shortcut 조정

if stride != 1 or shortcut.shape[-1] != filters:

shortcut = layers.Conv2D(filters, 1, strides=stride)(shortcut)

shortcut = layers.BatchNormalization()(shortcut)

잔차 더하기

x = layers.add([x, shortcut])

x = layers.ReLU()(x)

return x

ResNet 스타일 모델 구성

inputs = keras.Input(shape=(32, 32, 3))

x = layers.Conv2D(64, 7, strides=2, padding='same')(inputs)

x = layers.BatchNormalization()(x)

x = layers.ReLU()(x)

x = layers.MaxPooling2D(3, strides=2, padding='same')(x)

x = residual_block(x, 64)

x = residual_block(x, 64)

x = residual_block(x, 128, stride=2)

x = residual_block(x, 128)

x = residual_block(x, 256, stride=2)

x = residual_block(x, 256)

x = layers.GlobalAveragePooling2D()(x)

x = layers.Dense(256, activation='relu')(x)

outputs = layers.Dense(10, activation='softmax')(x)

model = keras.Model(inputs, outputs, name='mini_resnet')

model.summary()

공유 레이어 (Siamese Network)

from tensorflow import keras

from tensorflow.keras import layers

공유 인코더 정의

shared_encoder = keras.Sequential([

layers.Dense(128, activation='relu'),

layers.Dense(64, activation='relu'),

layers.Dense(32)

], name='shared_encoder')

두 개의 입력

input_a = keras.Input(shape=(100,), name='input_a')

input_b = keras.Input(shape=(100,), name='input_b')

같은 인코더로 처리

encoded_a = shared_encoder(input_a)

encoded_b = shared_encoder(input_b)

거리 계산 (Cosine Similarity)

similarity = layers.Dot(axes=1, normalize=True)([encoded_a, encoded_b])

output = layers.Dense(1, activation='sigmoid')(similarity)

siamese_model = keras.Model(inputs=[input_a, input_b], outputs=output)

siamese_model.summary()

5. Keras Subclassing API

Subclassing API는 가장 유연한 방법으로, PyTorch와 유사한 방식입니다. 커스텀 레이어와 모델을 Python 클래스로 정의합니다.

커스텀 레이어

from tensorflow import keras

class MyDenseLayer(keras.layers.Layer):

def __init__(self, units, activation=None, **kwargs):

super().__init__(**kwargs)

self.units = units

self.activation = keras.activations.get(activation)

def build(self, input_shape):

가중치 초기화 (build는 첫 번째 호출 시 한 번만 실행)

self.w = self.add_weight(

name='kernel',

shape=(input_shape[-1], self.units),

initializer='glorot_uniform',

trainable=True

)

self.b = self.add_weight(

name='bias',

shape=(self.units,),

initializer='zeros',

trainable=True

)

super().build(input_shape)

def call(self, inputs, training=False):

output = tf.matmul(inputs, self.w) + self.b

if self.activation is not None:

output = self.activation(output)

return output

def get_config(self):

config = super().get_config()

config.update({'units': self.units, 'activation': keras.activations.serialize(self.activation)})

return config

사용

layer = MyDenseLayer(64, activation='relu')

x = tf.random.normal([32, 128])

y = layer(x)

print(y.shape) # (32, 64)

class MultiHeadSelfAttention(keras.layers.Layer):

"""간단한 멀티헤드 셀프어텐션 레이어"""

def __init__(self, embed_dim, num_heads, **kwargs):

super().__init__(**kwargs)

self.embed_dim = embed_dim

self.num_heads = num_heads

self.head_dim = embed_dim // num_heads

assert self.head_dim * num_heads == embed_dim

self.wq = keras.layers.Dense(embed_dim)

self.wk = keras.layers.Dense(embed_dim)

self.wv = keras.layers.Dense(embed_dim)

self.wo = keras.layers.Dense(embed_dim)

def split_heads(self, x, batch_size):

x = tf.reshape(x, (batch_size, -1, self.num_heads, self.head_dim))

return tf.transpose(x, perm=[0, 2, 1, 3])

def call(self, x, training=False):

batch_size = tf.shape(x)[0]

seq_len = tf.shape(x)[1]

q = self.split_heads(self.wq(x), batch_size)

k = self.split_heads(self.wk(x), batch_size)

v = self.split_heads(self.wv(x), batch_size)

스케일드 닷 프로덕트 어텐션

scale = tf.cast(self.head_dim, tf.float32) ** 0.5

scores = tf.matmul(q, k, transpose_b=True) / scale

weights = tf.nn.softmax(scores, axis=-1)

context = tf.matmul(weights, v)

헤드 결합

context = tf.transpose(context, perm=[0, 2, 1, 3])

context = tf.reshape(context, (batch_size, seq_len, self.embed_dim))

return self.wo(context)

커스텀 모델 (Model 서브클래싱)

from tensorflow import keras

class ResidualBlock(keras.layers.Layer):

def __init__(self, filters, **kwargs):

super().__init__(**kwargs)

self.conv1 = keras.layers.Conv2D(filters, 3, padding='same')

self.conv2 = keras.layers.Conv2D(filters, 3, padding='same')

self.bn1 = keras.layers.BatchNormalization()

self.bn2 = keras.layers.BatchNormalization()

self.relu = keras.layers.ReLU()

def call(self, inputs, training=False):

x = self.conv1(inputs)

x = self.bn1(x, training=training)

x = self.relu(x)

x = self.conv2(x)

x = self.bn2(x, training=training)

x = x + inputs # 잔차 연결

return self.relu(x)

class CustomCNN(keras.Model):

def __init__(self, num_classes=10, **kwargs):

super().__init__(**kwargs)

self.conv_stem = keras.layers.Conv2D(32, 3, padding='same', activation='relu')

self.res_block1 = ResidualBlock(32)

self.res_block2 = ResidualBlock(32)

self.pool = keras.layers.MaxPooling2D(2)

self.conv_expand = keras.layers.Conv2D(64, 3, padding='same', activation='relu')

self.res_block3 = ResidualBlock(64)

self.gap = keras.layers.GlobalAveragePooling2D()

self.dropout = keras.layers.Dropout(0.5)

self.fc = keras.layers.Dense(num_classes, activation='softmax')

def call(self, inputs, training=False):

x = self.conv_stem(inputs)

x = self.res_block1(x, training=training)

x = self.res_block2(x, training=training)

x = self.pool(x)

x = self.conv_expand(x)

x = self.res_block3(x, training=training)

x = self.gap(x)

x = self.dropout(x, training=training)

return self.fc(x)

사용

model = CustomCNN(num_classes=10)

model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])

더미 데이터로 테스트

dummy_input = tf.random.normal([4, 32, 32, 3])

output = model(dummy_input, training=False)

print(output.shape) # (4, 10)

model.summary()

6. CNN 구현 (CIFAR-10)

기본 CNN 모델

from tensorflow import keras

from tensorflow.keras import layers

CIFAR-10 데이터 로딩

(x_train, y_train), (x_test, y_test) = keras.datasets.cifar10.load_data()

x_train = x_train.astype('float32') / 255.0

x_test = x_test.astype('float32') / 255.0

class_names = ['비행기', '자동차', '새', '고양이', '사슴',

'개', '개구리', '말', '배', '트럭']

print(f"훈련 데이터: {x_train.shape}") # (50000, 32, 32, 3)

print(f"테스트 데이터: {x_test.shape}") # (10000, 32, 32, 3)

CNN 모델 정의

def build_cnn_model():

model = keras.Sequential([

첫 번째 합성곱 블록

layers.Conv2D(32, (3, 3), padding='same', input_shape=(32, 32, 3)),

layers.BatchNormalization(),

layers.Activation('relu'),

layers.Conv2D(32, (3, 3), padding='same'),

layers.BatchNormalization(),

layers.Activation('relu'),

layers.MaxPooling2D((2, 2)),

layers.Dropout(0.25),

두 번째 합성곱 블록

layers.Conv2D(64, (3, 3), padding='same'),

layers.BatchNormalization(),

layers.Activation('relu'),

layers.Conv2D(64, (3, 3), padding='same'),

layers.BatchNormalization(),

layers.Activation('relu'),

layers.MaxPooling2D((2, 2)),

layers.Dropout(0.25),

세 번째 합성곱 블록

layers.Conv2D(128, (3, 3), padding='same'),

layers.BatchNormalization(),

layers.Activation('relu'),

layers.Conv2D(128, (3, 3), padding='same'),

layers.BatchNormalization(),

layers.Activation('relu'),

layers.GlobalAveragePooling2D(),

layers.Dropout(0.5),

완전 연결층

layers.Dense(256, activation='relu'),

layers.BatchNormalization(),

layers.Dropout(0.5),

layers.Dense(10, activation='softmax')

])

return model

model = build_cnn_model()

model.summary()

model.compile(

optimizer=keras.optimizers.Adam(learning_rate=1e-3),

loss='sparse_categorical_crossentropy',

metrics=['accuracy']

)

데이터 증강 (Data Augmentation)

from tensorflow import keras

from tensorflow.keras import layers

Keras 내장 데이터 증강 레이어

data_augmentation = keras.Sequential([

layers.RandomFlip('horizontal'),

layers.RandomRotation(0.1),

layers.RandomZoom(0.1),

layers.RandomTranslation(0.1, 0.1),

layers.RandomContrast(0.1),

], name='data_augmentation')

모델에 증강 레이어 통합

inputs = keras.Input(shape=(32, 32, 3))

x = data_augmentation(inputs) # 훈련 시에만 적용됨

x = layers.Rescaling(1./255)(x) # 정규화

... CNN 블록들 ...

x = layers.Conv2D(32, 3, padding='same', activation='relu')(x)

x = layers.MaxPooling2D()(x)

x = layers.Conv2D(64, 3, padding='same', activation='relu')(x)

x = layers.MaxPooling2D()(x)

x = layers.GlobalAveragePooling2D()(x)

outputs = layers.Dense(10, activation='softmax')(x)

augmented_model = keras.Model(inputs, outputs)

tf.data를 이용한 데이터 증강 파이프라인

def augment_image(image, label):

image = tf.cast(image, tf.float32) / 255.0

image = tf.image.random_flip_left_right(image)

image = tf.image.random_brightness(image, 0.2)

image = tf.image.random_contrast(image, 0.8, 1.2)

image = tf.image.random_saturation(image, 0.8, 1.2)

image = tf.image.pad_to_bounding_box(image, 4, 4, 40, 40)

image = tf.image.random_crop(image, [32, 32, 3])

return image, label

(x_train, y_train), (x_test, y_test) = keras.datasets.cifar10.load_data()

y_train = y_train.flatten()

y_test = y_test.flatten()

train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))

train_dataset = train_dataset.map(augment_image, num_parallel_calls=tf.data.AUTOTUNE)

train_dataset = train_dataset.shuffle(10000).batch(128).prefetch(tf.data.AUTOTUNE)

전이학습 (Transfer Learning)

from tensorflow import keras

from tensorflow.keras import layers

from tensorflow.keras.applications import EfficientNetB0

EfficientNetB0으로 전이학습

def build_transfer_model(num_classes=10):

사전 훈련된 베이스 모델 (분류기 제외)

base_model = EfficientNetB0(

include_top=False,

weights='imagenet',

input_shape=(224, 224, 3)

)

초기에는 베이스 모델 동결

base_model.trainable = False

inputs = keras.Input(shape=(224, 224, 3))

EfficientNet은 내부에 전처리 포함

x = base_model(inputs, training=False)

x = layers.GlobalAveragePooling2D()(x)

x = layers.Dense(256, activation='relu')(x)

x = layers.Dropout(0.5)(x)

outputs = layers.Dense(num_classes, activation='softmax')(x)

model = keras.Model(inputs, outputs)

return model, base_model

model, base_model = build_transfer_model(num_classes=10)

model.compile(

optimizer=keras.optimizers.Adam(1e-3),

loss='sparse_categorical_crossentropy',

metrics=['accuracy']

)

1단계: 동결된 상태로 분류기 훈련

model.fit(train_dataset, epochs=10, ...)

Fine-tuning: 베이스 모델의 일부 레이어 해동

def fine_tune(model, base_model, fine_tune_at=100):

base_model.trainable = True

fine_tune_at 이전 레이어는 계속 동결

for layer in base_model.layers[:fine_tune_at]:

layer.trainable = False

model.compile(

optimizer=keras.optimizers.Adam(1e-5), # 매우 낮은 학습률

loss='sparse_categorical_crossentropy',

metrics=['accuracy']

)

return model

fine_tune(model, base_model, fine_tune_at=100)

model.fit(train_dataset, epochs=20, ...)

7. RNN/LSTM 구현

시계열 예측 (LSTM)

from tensorflow import keras

from tensorflow.keras import layers

시계열 데이터 생성

def generate_time_series(n_samples, n_steps):

t = np.linspace(0, 4 * np.pi, n_steps + 1)

series = np.sin(t) + 0.1 * np.random.randn(n_samples, n_steps + 1)

X = series[:, :-1].reshape(-1, n_steps, 1)

y = series[:, 1:].reshape(-1, n_steps, 1)

return X, y

X_train, y_train = generate_time_series(10000, 50)

X_val, y_val = generate_time_series(1000, 50)

X_test, y_test = generate_time_series(1000, 50)

LSTM 모델

def build_lstm_model(n_steps=50):

model = keras.Sequential([

layers.LSTM(64, return_sequences=True, input_shape=(n_steps, 1)),

layers.Dropout(0.2),

layers.LSTM(64, return_sequences=True),

layers.Dropout(0.2),

layers.TimeDistributed(layers.Dense(1))

])

model.compile(

optimizer='adam',

loss='mse',

metrics=['mae']

)

return model

model = build_lstm_model()

model.summary()

history = model.fit(

X_train, y_train,

epochs=20,

batch_size=128,

validation_data=(X_val, y_val)

)

텍스트 생성 (문자 수준 RNN)

from tensorflow import keras

from tensorflow.keras import layers

예시 텍스트 (실제로는 더 큰 텍스트 사용)

text = """TensorFlow는 구글이 만든 딥러닝 프레임워크입니다.

Keras는 TensorFlow 위에서 동작하는 고수준 API입니다.

두 라이브러리를 함께 사용하면 강력한 딥러닝 모델을 빠르게 구축할 수 있습니다."""

문자 집합 생성

chars = sorted(set(text))

char2idx = {c: i for i, c in enumerate(chars)}

idx2char = np.array(chars)

vocab_size = len(chars)

텍스트를 인덱스로 변환

text_as_int = np.array([char2idx[c] for c in text])

시퀀스 생성

seq_length = 50

sequences = tf.data.Dataset.from_tensor_slices(text_as_int)

sequences = sequences.batch(seq_length + 1, drop_remainder=True)

def split_input_target(chunk):

return chunk[:-1], chunk[1:]

dataset = sequences.map(split_input_target)

dataset = dataset.shuffle(100).batch(32, drop_remainder=True)

문자 수준 RNN 모델

def build_char_rnn(vocab_size, embed_dim=64, rnn_units=256):

model = keras.Sequential([

layers.Embedding(vocab_size, embed_dim),

layers.GRU(rnn_units, return_sequences=True, stateful=False),

layers.GRU(rnn_units, return_sequences=True),

layers.Dense(vocab_size)

])

return model

model = build_char_rnn(vocab_size)

model.compile(

optimizer='adam',

loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),

metrics=['accuracy']

)

텍스트 생성 함수

def generate_text(model, start_string, num_generate=100, temperature=1.0):

input_eval = [char2idx[s] for s in start_string]

input_eval = tf.expand_dims(input_eval, 0)

text_generated = []

for _ in range(num_generate):

predictions = model(input_eval)

predictions = tf.squeeze(predictions, 0) / temperature

predicted_id = tf.random.categorical(predictions, num_samples=1)[-1, 0].numpy()

input_eval = tf.expand_dims([predicted_id], 0)

text_generated.append(idx2char[predicted_id])

return start_string + ''.join(text_generated)

Bidirectional LSTM

from tensorflow import keras

from tensorflow.keras import layers

양방향 LSTM (텍스트 분류)

def build_bidirectional_model(vocab_size, max_len=100, embed_dim=64):

model = keras.Sequential([

layers.Embedding(vocab_size, embed_dim, input_length=max_len),

layers.Bidirectional(layers.LSTM(64, return_sequences=True)),

layers.Bidirectional(layers.LSTM(32)),

layers.Dense(64, activation='relu'),

layers.Dropout(0.5),

layers.Dense(1, activation='sigmoid')

])

model.compile(

optimizer='adam',

loss='binary_crossentropy',

metrics=['accuracy']

)

return model

model = build_bidirectional_model(vocab_size=10000, max_len=100)

model.summary()

8. Transformer with Keras

Multi-head Attention과 Transformer Encoder

from tensorflow import keras

from tensorflow.keras import layers

class TransformerBlock(keras.layers.Layer):

def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1, **kwargs):

super().__init__(**kwargs)

self.att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)

self.ffn = keras.Sequential([

layers.Dense(ff_dim, activation='relu'),

layers.Dense(embed_dim)

])

self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)

self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)

self.dropout1 = layers.Dropout(rate)

self.dropout2 = layers.Dropout(rate)

def call(self, inputs, training=False):

attn_output = self.att(inputs, inputs)

attn_output = self.dropout1(attn_output, training=training)

out1 = self.layernorm1(inputs + attn_output)

ffn_output = self.ffn(out1)

ffn_output = self.dropout2(ffn_output, training=training)

return self.layernorm2(out1 + ffn_output)

class TokenAndPositionEmbedding(keras.layers.Layer):

def __init__(self, maxlen, vocab_size, embed_dim, **kwargs):

super().__init__(**kwargs)

self.token_emb = layers.Embedding(vocab_size, embed_dim)

self.pos_emb = layers.Embedding(maxlen, embed_dim)

def call(self, x):

maxlen = tf.shape(x)[-1]

positions = tf.range(start=0, limit=maxlen, delta=1)

positions = self.pos_emb(positions)

x = self.token_emb(x)

return x + positions

텍스트 분류 Transformer 모델

def build_transformer_classifier(

vocab_size=20000,

maxlen=200,

embed_dim=32,

num_heads=2,

ff_dim=32,

num_classes=2

):

inputs = layers.Input(shape=(maxlen,))

embedding_layer = TokenAndPositionEmbedding(maxlen, vocab_size, embed_dim)

x = embedding_layer(inputs)

transformer_block = TransformerBlock(embed_dim, num_heads, ff_dim)

x = transformer_block(x)

x = layers.GlobalAveragePooling1D()(x)

x = layers.Dropout(0.1)(x)

x = layers.Dense(20, activation='relu')(x)

x = layers.Dropout(0.1)(x)

outputs = layers.Dense(num_classes, activation='softmax')(x)

model = keras.Model(inputs=inputs, outputs=outputs)

return model

model = build_transformer_classifier()

model.compile(

optimizer='adam',

loss='sparse_categorical_crossentropy',

metrics=['accuracy']

)

model.summary()

IMDB 감성 분류 예시

vocab_size = 20000

maxlen = 200

(x_train, y_train), (x_val, y_val) = keras.datasets.imdb.load_data(num_words=vocab_size)

x_train = keras.preprocessing.sequence.pad_sequences(x_train, maxlen=maxlen)

x_val = keras.preprocessing.sequence.pad_sequences(x_val, maxlen=maxlen)

model.fit(x_train, y_train, batch_size=32, epochs=5, validation_data=(x_val, y_val))

9. 데이터 파이프라인 (tf.data)

기본 tf.data.Dataset 사용법

텐서에서 Dataset 생성

X = np.random.randn(1000, 10)

y = np.random.randint(0, 2, 1000)

dataset = tf.data.Dataset.from_tensor_slices((X, y))

print(dataset) # TensorSliceDataset

기본 변환

dataset = (dataset

.shuffle(buffer_size=1000)

.batch(32)

.prefetch(tf.data.AUTOTUNE)

)

데이터 확인

for batch_X, batch_y in dataset.take(1):

print(f"배치 X 형태: {batch_X.shape}") # (32, 10)

print(f"배치 y 형태: {batch_y.shape}") # (32,)

map 변환

def preprocess(x, y):

x = tf.cast(x, tf.float32)

y = tf.cast(y, tf.int32)

x = (x - tf.reduce_mean(x)) / tf.math.reduce_std(x)

return x, y

dataset = tf.data.Dataset.from_tensor_slices((X, y))

dataset = dataset.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE)

filter 변환

positive_dataset = dataset.filter(lambda x, y: y == 1)

범위로 생성

range_dataset = tf.data.Dataset.range(100)

zip으로 결합

features_ds = tf.data.Dataset.from_tensor_slices(X)

labels_ds = tf.data.Dataset.from_tensor_slices(y)

combined_ds = tf.data.Dataset.zip((features_ds, labels_ds))

이미지 데이터 로딩 파이프라인

디렉토리 구조: data/train/class_name/image.jpg

def load_and_preprocess_image(path, label, image_size=(224, 224)):

image = tf.io.read_file(path)

image = tf.image.decode_jpeg(image, channels=3)

image = tf.image.resize(image, image_size)

image = tf.cast(image, tf.float32) / 255.0

return image, label

def create_image_dataset(data_dir, batch_size=32, image_size=(224, 224)):

class_names = sorted(os.listdir(data_dir))

class_map = {name: idx for idx, name in enumerate(class_names)}

file_paths = []

labels = []

for class_name in class_names:

class_dir = os.path.join(data_dir, class_name)

for fname in os.listdir(class_dir):

if fname.endswith(('.jpg', '.jpeg', '.png')):

file_paths.append(os.path.join(class_dir, fname))

labels.append(class_map[class_name])

path_ds = tf.data.Dataset.from_tensor_slices(file_paths)

label_ds = tf.data.Dataset.from_tensor_slices(labels)

combined = tf.data.Dataset.zip((path_ds, label_ds))

dataset = combined.map(

lambda p, l: load_and_preprocess_image(p, l, image_size),

num_parallel_calls=tf.data.AUTOTUNE

)

dataset = dataset.shuffle(1000).batch(batch_size).prefetch(tf.data.AUTOTUNE)

return dataset, class_names

keras 내장 image_dataset_from_directory 사용 (더 편리)

train_ds = keras.utils.image_dataset_from_directory(

'data/train',

image_size=(224, 224),

batch_size=32

)

TFRecord 형식

TFRecord 파일 작성

def bytes_feature(value):

if isinstance(value, type(tf.constant(0))):

value = value.numpy()

return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

def int64_feature(value):

return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

def float_feature(value):

return tf.train.Feature(float_list=tf.train.FloatList(value=[value]))

def image_to_tfrecord(image_path, label, writer):

image_string = open(image_path, 'rb').read()

feature = {

'image': bytes_feature(image_string),

'label': int64_feature(label),

}

example = tf.train.Example(features=tf.train.Features(feature=feature))

writer.write(example.SerializeToString())

TFRecord 파일 읽기

def parse_tfrecord(serialized_example):

feature_description = {

'image': tf.io.FixedLenFeature([], tf.string),

'label': tf.io.FixedLenFeature([], tf.int64),

}

example = tf.io.parse_single_example(serialized_example, feature_description)

image = tf.image.decode_jpeg(example['image'], channels=3)

image = tf.image.resize(image, [224, 224])

image = tf.cast(image, tf.float32) / 255.0

label = example['label']

return image, label

TFRecord Dataset 생성

dataset = tf.data.TFRecordDataset(['data.tfrecord'])

dataset = dataset.map(parse_tfrecord, num_parallel_calls=tf.data.AUTOTUNE)

dataset = dataset.shuffle(1000).batch(32).prefetch(tf.data.AUTOTUNE)

10. 훈련 고급 기법

콜백 (Callbacks)

from tensorflow import keras

ModelCheckpoint: 최적 모델 저장

checkpoint_cb = keras.callbacks.ModelCheckpoint(

filepath='best_model.keras',

monitor='val_accuracy',

mode='max',

save_best_only=True,

verbose=1

)

EarlyStopping: 과적합 방지

early_stopping_cb = keras.callbacks.EarlyStopping(

monitor='val_loss',

patience=10,

restore_best_weights=True,

verbose=1

)

ReduceLROnPlateau: 학습률 자동 감소

reduce_lr_cb = keras.callbacks.ReduceLROnPlateau(

monitor='val_loss',

factor=0.5,

patience=5,

min_lr=1e-7,

verbose=1

)

TensorBoard: 시각화

tensorboard_cb = keras.callbacks.TensorBoard(

log_dir='logs/',

histogram_freq=1,

write_graph=True,

write_images=True,

update_freq='epoch'

)

LearningRateScheduler: 학습률 스케줄링

def cosine_decay_schedule(epoch, lr):

initial_lr = 1e-3

total_epochs = 100

return initial_lr * (1 + math.cos(math.pi * epoch / total_epochs)) / 2

lr_scheduler_cb = keras.callbacks.LearningRateScheduler(

cosine_decay_schedule, verbose=1

)

CSV로 로그 저장

csv_logger_cb = keras.callbacks.CSVLogger('training_log.csv')

커스텀 콜백

class ConfusionMatrixCallback(keras.callbacks.Callback):

def __init__(self, validation_data, class_names):

super().__init__()

self.X_val, self.y_val = validation_data

self.class_names = class_names

def on_epoch_end(self, epoch, logs=None):

y_pred = tf.argmax(self.model.predict(self.X_val), axis=1)

cm = tf.math.confusion_matrix(self.y_val, y_pred)

if epoch % 10 == 0:

print(f"\n에포크 {epoch} 혼동 행렬:\n{cm.numpy()}")

모든 콜백을 하나의 리스트로

callbacks = [

checkpoint_cb,

early_stopping_cb,

reduce_lr_cb,

tensorboard_cb,

csv_logger_cb

]

사용 예시

history = model.fit(

X_train, y_train,

epochs=100,

validation_data=(X_val, y_val),

callbacks=callbacks

)

커스텀 훈련 루프 (GradientTape)

from tensorflow import keras

간단한 분류 모델

model = keras.Sequential([

keras.layers.Dense(64, activation='relu', input_shape=(784,)),

keras.layers.Dense(64, activation='relu'),

keras.layers.Dense(10)

])

손실 함수와 옵티마이저

loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)

optimizer = keras.optimizers.Adam(learning_rate=1e-3)

메트릭

train_loss = keras.metrics.Mean(name='train_loss')

train_accuracy = keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')

val_loss = keras.metrics.Mean(name='val_loss')

val_accuracy = keras.metrics.SparseCategoricalAccuracy(name='val_accuracy')

단일 훈련 스텝

@tf.function # JIT 컴파일로 속도 향상

def train_step(images, labels):

with tf.GradientTape() as tape:

predictions = model(images, training=True)

loss = loss_fn(labels, predictions)

L2 정규화 추가

l2_loss = tf.add_n([tf.nn.l2_loss(v) for v in model.trainable_variables

if 'bias' not in v.name])

total_loss = loss + 1e-4 * l2_loss

gradients = tape.gradient(total_loss, model.trainable_variables)

그래디언트 클리핑

gradients, _ = tf.clip_by_global_norm(gradients, 1.0)

optimizer.apply_gradients(zip(gradients, model.trainable_variables))

train_loss.update_state(loss)

train_accuracy.update_state(labels, predictions)

return loss

@tf.function

def val_step(images, labels):

predictions = model(images, training=False)

loss = loss_fn(labels, predictions)

val_loss.update_state(loss)

val_accuracy.update_state(labels, predictions)

전체 훈련 루프

def train(train_ds, val_ds, epochs=10):

for epoch in range(epochs):

start_time = time.time()

메트릭 초기화

train_loss.reset_states()

train_accuracy.reset_states()

val_loss.reset_states()

val_accuracy.reset_states()

훈련

for step, (images, labels) in enumerate(train_ds):

train_step(images, labels)

if step % 100 == 0:

print(f"스텝 {step}: 손실={train_loss.result():.4f}")

검증

for images, labels in val_ds:

val_step(images, labels)

elapsed = time.time() - start_time

print(f"에포크 {epoch+1}/{epochs} ({elapsed:.1f}s) - "

f"손실: {train_loss.result():.4f}, "

f"정확도: {train_accuracy.result():.4f}, "

f"검증 손실: {val_loss.result():.4f}, "

f"검증 정확도: {val_accuracy.result():.4f}")

분산 학습 (tf.distribute.Strategy)

from tensorflow import keras

다중 GPU 전략

strategy = tf.distribute.MirroredStrategy()

print(f"사용 가능한 디바이스 수: {strategy.num_replicas_in_sync}")

with strategy.scope():

모델 정의, 컴파일은 strategy.scope() 안에서

model = keras.Sequential([

keras.layers.Dense(64, activation='relu', input_shape=(784,)),

keras.layers.Dense(64, activation='relu'),

keras.layers.Dense(10, activation='softmax')

])

model.compile(

optimizer='adam',

loss='sparse_categorical_crossentropy',

metrics=['accuracy']

)

배치 크기는 GPU 수에 비례하여 증가

BATCH_SIZE_PER_REPLICA = 64

BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync

TPU 사용

resolver = tf.distribute.cluster_resolver.TPUClusterResolver()

tf.config.experimental_connect_to_cluster(resolver)

tf.tpu.experimental.initialize_tpu_system(resolver)

tpu_strategy = tf.distribute.TPUStrategy(resolver)

혼합 정밀도 훈련 (Mixed Precision)

from tensorflow.keras import mixed_precision

policy = mixed_precision.Policy('mixed_float16')

mixed_precision.set_global_policy(policy)

with strategy.scope():

inputs = keras.Input(shape=(784,))

x = keras.layers.Dense(64, activation='relu')(inputs)

마지막 출력은 float32로

outputs = keras.layers.Dense(10, activation='softmax', dtype='float32')(x)

model_mp = keras.Model(inputs, outputs)

opt = keras.optimizers.Adam(1e-3)

opt = mixed_precision.LossScaleOptimizer(opt)

model_mp.compile(

optimizer=opt,

loss='sparse_categorical_crossentropy',

metrics=['accuracy']

)

11. TensorBoard 시각화

기본 TensorBoard 사용

from tensorflow import keras

로그 디렉토리

log_dir = "logs/fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")

콜백 설정

tensorboard_callback = keras.callbacks.TensorBoard(

log_dir=log_dir,

histogram_freq=1, # 가중치 히스토그램 주기

write_graph=True, # 계산 그래프 기록

write_images=True, # 가중치 이미지 기록

update_freq='epoch', # 업데이트 주기

profile_batch=2 # 프로파일링 배치

)

커스텀 스칼라 기록

file_writer = tf.summary.create_file_writer(log_dir + '/custom_scalars')

def log_custom_metrics(epoch, logs):

with file_writer.as_default():

tf.summary.scalar('learning_rate',

data=keras.backend.get_value(model.optimizer.lr),

step=epoch)

lr_callback = keras.callbacks.LambdaCallback(on_epoch_end=log_custom_metrics)

이미지 기록

def log_images(epoch, logs):

테스트 이미지 가져오기

(_, _), (x_test, y_test) = keras.datasets.mnist.load_data()

x_test = x_test[:10].reshape(-1, 28, 28, 1).astype('float32') / 255.0

with file_writer.as_default():

tf.summary.image("테스트 샘플", x_test, step=epoch, max_outputs=10)

image_callback = keras.callbacks.LambdaCallback(on_epoch_end=log_images)

TensorBoard 시작 명령어

tensorboard --logdir logs/fit

임베딩 시각화

from tensorflow import keras

from tensorboard.plugins import projector

임베딩 레이어 학습 후 시각화

(x_train, y_train), _ = keras.datasets.mnist.load_data()

x_train = x_train.astype('float32').reshape(-1, 784) / 255.0

임베딩 모델

embedding_model = keras.Sequential([

keras.layers.Dense(64, activation='relu', input_shape=(784,)),

keras.layers.Dense(32, name='embedding') # 임베딩 레이어

])

임베딩 추출

embeddings = embedding_model.predict(x_train[:1000])

임베딩 파일 저장

log_dir = 'logs/embedding'

os.makedirs(log_dir, exist_ok=True)

np.savetxt(os.path.join(log_dir, 'vectors.tsv'), embeddings, delimiter='\t')

메타데이터 (레이블)

with open(os.path.join(log_dir, 'metadata.tsv'), 'w') as f:

for label in y_train[:1000]:

f.write(f"{label}\n")

Projector 설정

config = projector.ProjectorConfig()

embedding_config = config.embeddings.add()

embedding_config.tensor_name = "embedding/.ATTRIBUTES/VARIABLE_VALUE"

embedding_config.tensor_path = 'vectors.tsv'

embedding_config.metadata_path = 'metadata.tsv'

projector.visualize_embeddings(log_dir, config)

12. 모델 저장과 변환

SavedModel 형식

from tensorflow import keras

모델 저장

model.save('saved_model/my_model')

로드

loaded_model = keras.models.load_model('saved_model/my_model')

HDF5 형식 (레거시)

model.save('my_model.h5')

loaded_h5 = keras.models.load_model('my_model.h5')

가중치만 저장/로드

model.save_weights('model_weights.h5')

model.load_weights('model_weights.h5')

Keras 네이티브 형식 (권장)

model.save('my_model.keras')

loaded_keras = keras.models.load_model('my_model.keras')

서브클래싱 모델의 경우 get_config 구현 필요

class MyModel(keras.Model):

def __init__(self, units):

super().__init__()

self.units = units

self.dense = keras.layers.Dense(units)

def call(self, inputs):

return self.dense(inputs)

def get_config(self):

return {'units': self.units}

@classmethod

def from_config(cls, config):

return cls(**config)

TensorFlow Lite 변환 (모바일/엣지)

from tensorflow import keras

기본 TFLite 변환

converter = tf.lite.TFLiteConverter.from_saved_model('saved_model/my_model')

tflite_model = converter.convert()

with open('model.tflite', 'wb') as f:

f.write(tflite_model)

최적화 옵션들

동적 범위 양자화

converter = tf.lite.TFLiteConverter.from_saved_model('saved_model/my_model')

converter.optimizations = [tf.lite.Optimize.DEFAULT]

tflite_quantized = converter.convert()

전체 정수 양자화 (INT8)

def representative_dataset():

for _ in range(100):

data = np.random.random((1, 784)).astype(np.float32)

yield [data]

converter = tf.lite.TFLiteConverter.from_saved_model('saved_model/my_model')

converter.optimizations = [tf.lite.Optimize.DEFAULT]

converter.representative_dataset = representative_dataset

converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]

converter.inference_input_type = tf.int8

converter.inference_output_type = tf.int8

tflite_int8 = converter.convert()

TFLite 모델 실행

interpreter = tf.lite.Interpreter(model_content=tflite_quantized)

interpreter.allocate_tensors()

input_details = interpreter.get_input_details()

output_details = interpreter.get_output_details()

print("입력:", input_details[0]['shape'])

print("출력:", output_details[0]['shape'])

추론

input_data = np.random.random((1, 784)).astype(np.float32)

interpreter.set_tensor(input_details[0]['index'], input_data)

interpreter.invoke()

output_data = interpreter.get_tensor(output_details[0]['index'])

print("TFLite 출력:", output_data.shape)

TensorFlow.js 변환

tfjs 변환 도구 설치

pip install tensorflowjs

SavedModel에서 TFJS로 변환

tensorflowjs_converter \

--input_format=tf_saved_model \

--output_format=tfjs_graph_model \

--signature_name=serving_default \

saved_model/my_model \

tfjs_model/

13. TF-Serving으로 프로덕션 배포

TensorFlow Serving 설치 및 기본 사용

Docker로 TF Serving 실행 (가장 쉬운 방법)

docker pull tensorflow/serving

모델 서빙

docker run -t --rm \

-p 8501:8501 \

-v "/path/to/saved_model:/models/my_model" \

-e MODEL_NAME=my_model \

tensorflow/serving

GPU 지원

docker run --gpus all -t --rm \

-p 8501:8501 \

-v "/path/to/saved_model:/models/my_model" \

-e MODEL_NAME=my_model \

tensorflow/serving:latest-gpu

REST API로 추론 요청

REST API 요청

url = 'http://localhost:8501/v1/models/my_model:predict'

입력 데이터 준비

data = np.random.random((1, 784)).astype(float)

payload = {

"instances": data.tolist()

}

response = requests.post(url, json=payload)

result = json.loads(response.text)

print("예측 결과:", result['predictions'])

모델 정보 확인

info_url = 'http://localhost:8501/v1/models/my_model'

info_response = requests.get(info_url)

print("모델 정보:", json.loads(info_response.text))

gRPC 클라이언트

from tensorflow_serving.apis import predict_pb2

from tensorflow_serving.apis import prediction_service_pb2_grpc

gRPC 채널 생성

channel = grpc.insecure_channel('localhost:8500')

stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)

요청 생성

request = predict_pb2.PredictRequest()

request.model_spec.name = 'my_model'

request.model_spec.signature_name = 'serving_default'

입력 텐서 설정

input_data = np.random.random((1, 784)).astype(np.float32)

request.inputs['input_layer'].CopyFrom(

tf.make_tensor_proto(input_data, shape=input_data.shape)

)

추론 실행

response = stub.Predict(request, 10.0) # 10초 타임아웃

output = tf.make_ndarray(response.outputs['output'])

print("gRPC 예측 결과:", output)

버전 관리와 Canary 배포

model.config 파일

model_config_list {

config {

name: 'my_model'

base_path: '/models/my_model/'

model_platform: 'tensorflow'

model_version_policy {

specific {

versions: 1

versions: 2

}

}

version_labels {

key: 'stable'

value: 1

}

version_labels {

key: 'canary'

value: 2

}

}

}

설정 파일로 서빙

docker run -t --rm \

-p 8501:8501 -p 8500:8500 \

-v "/path/to/models:/models" \

-v "/path/to/model.config:/models/model.config" \

tensorflow/serving \

--model_config_file=/models/model.config

특정 버전으로 요청

url = 'http://localhost:8501/v1/models/my_model/versions/1:predict'

또는 레이블로 요청

url_label = 'http://localhost:8501/v1/models/my_model/labels/stable:predict'

14. TensorFlow Extended (TFX)

TFX는 TensorFlow 기반의 프로덕션 머신러닝 파이프라인 플랫폼입니다.

ML 파이프라인 개요

from tfx.components import (

CsvExampleGen,

StatisticsGen,

SchemaGen,

ExampleValidator,

Transform,

Trainer,

Evaluator,

Pusher

)

from tfx.proto import pusher_pb2, trainer_pb2

from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext

인터랙티브 컨텍스트 생성 (노트북에서 사용)

context = InteractiveContext()

1. ExampleGen: 데이터 수집

example_gen = CsvExampleGen(input_base='data/')

context.run(example_gen)

2. StatisticsGen: 데이터 통계

statistics_gen = StatisticsGen(

examples=example_gen.outputs['examples']

)

context.run(statistics_gen)

3. SchemaGen: 스키마 생성

schema_gen = SchemaGen(

statistics=statistics_gen.outputs['statistics'],

infer_feature_shape=True

)

context.run(schema_gen)

4. ExampleValidator: 데이터 검증

example_validator = ExampleValidator(

statistics=statistics_gen.outputs['statistics'],

schema=schema_gen.outputs['schema']

)

context.run(example_validator)

5. Transform: 특징 엔지니어링

transform.py 파일에 preprocessing_fn 정의 필요

6. Trainer: 모델 훈련

trainer = Trainer(

module_file='trainer_module.py',

examples=example_gen.outputs['examples'],

schema=schema_gen.outputs['schema'],

train_args=trainer_pb2.TrainArgs(num_steps=1000),

eval_args=trainer_pb2.EvalArgs(num_steps=500)

)

7. Pusher: 모델 배포

pusher = Pusher(

model=trainer.outputs['model'],

push_destination=pusher_pb2.PushDestination(

filesystem=pusher_pb2.PushDestination.Filesystem(

base_directory='serving_model/'

)

)

)

Transform 컴포넌트 예시

transform.py

FEATURE_KEYS = ['feature1', 'feature2', 'feature3']

LABEL_KEY = 'label'

def preprocessing_fn(inputs):

"""특징 전처리 함수"""

outputs = {}

for key in FEATURE_KEYS:

정규화

outputs[key] = tft.scale_to_z_score(inputs[key])

레이블 인코딩

outputs[LABEL_KEY] = tf.cast(inputs[LABEL_KEY], tf.int64)

return outputs

마무리

이 가이드에서는 TensorFlow와 Keras의 핵심 개념부터 프로덕션 배포까지 광범위하게 다루었습니다.

핵심 정리

- **Eager Execution**: TF 2.x의 기본 실행 모드, `@tf.function`으로 그래프 최적화

- **Keras 3가지 API**: Sequential(간단), Functional(복잡한 토폴로지), Subclassing(완전한 커스터마이징)

- **tf.data**: 효율적인 데이터 파이프라인을 위한 필수 도구

- **GradientTape**: 커스텀 훈련 루프와 자동 미분

- **배포 옵션**: TF Serving(서버), TFLite(모바일/엣지), TF.js(브라우저)

- **TFX**: 프로덕션 ML 파이프라인의 표준

더 배울 내용

- TensorFlow Probability (확률론적 딥러닝)

- Keras Tuner (하이퍼파라미터 최적화)

- TensorFlow Datasets (표준 데이터셋)

- TensorFlow Hub (사전 훈련된 모델)

- tf-agents (강화학습)

참고 자료

- [TensorFlow 공식 문서](https://www.tensorflow.org/api_docs/python/tf)

- [TensorFlow 튜토리얼](https://www.tensorflow.org/tutorials)

- [Keras API 문서](https://keras.io/api/)

- [TensorFlow Guide](https://www.tensorflow.org/guide/keras)

- [tf.data 가이드](https://www.tensorflow.org/guide/data)

- [TensorBoard 가이드](https://www.tensorflow.org/tensorboard)

- [TFX 가이드](https://www.tensorflow.org/tfx)

- [TFLite 가이드](https://www.tensorflow.org/lite)

- [TensorFlow.js](https://www.tensorflow.org/js)

현재 단락 (1/1153)

TensorFlow는 Google Brain 팀이 2015년에 오픈소스로 공개한 머신러닝/딥러닝 프레임워크입니다. 현재 가장 널리 사용되는 딥러닝 프레임워크 중 하나로, 연구부터 ...

작성 글자: 0원문 글자: 41,949작성 단락: 0/1153