필사 모드: TensorFlow & Keras Complete Guide: Zero to Hero - From Installation to Production Deployment
EnglishIntroduction
TensorFlow is a machine learning and deep learning framework open-sourced by Google Brain in 2015. It is one of the most widely used deep learning frameworks today, supporting the entire process from research to production deployment. Keras, integrated as the official high-level API starting from TensorFlow 2.x, enables intuitive and rapid model development.
This guide covers TensorFlow and Keras comprehensively, from fundamental concepts to deploying models in real production environments, step by step.
1. Introduction to TensorFlow and Installation
TensorFlow vs PyTorch Comparison
Both frameworks have their strengths and weaknesses.
| Feature | TensorFlow/Keras | PyTorch |
| ------------------- | ------------------------- | ----------------------- |
| Creator | Google | Meta (Facebook) |
| Deployment Tools | TF Serving, TFLite, TF.js | TorchServe, TorchScript |
| Production Maturity | Very High | High |
| Research Popularity | High | Very High |
| Learning Curve | Moderate | Low |
| Mobile/Edge | TFLite Excellent | ExecuTorch |
| Ecosystem | TFX, TFHub etc. | HuggingFace Integration |
Installation
Install using pip:
CPU only
pip install tensorflow
GPU support (auto-detects CUDA, TF 2.9+)
pip install tensorflow[and-cuda]
Specific version
pip install tensorflow==2.15.0
conda environment
conda create -n tf_env python=3.10
conda activate tf_env
conda install -c conda-forge tensorflow
For macOS Apple Silicon:
pip install tensorflow-macos
pip install tensorflow-metal # GPU acceleration
Key Changes in TensorFlow 2.x
The most significant change in TensorFlow 2.0 is that Eager Execution is enabled by default. In TF 1.x, you had to define a computation graph first and run it through a Session. In TF 2.x, operations execute immediately like regular Python code.
print(tf.__version__)
Check eager execution
print(tf.executing_eagerly()) # True
Graph execution (TF 1.x compatible style)
@tf.function
def compute(x, y):
return x + y
result = compute(tf.constant(1.0), tf.constant(2.0))
print(result) # tf.Tensor(3.0, shape=(), dtype=float32)
Verifying GPU Setup
List available GPUs
gpus = tf.config.list_physical_devices('GPU')
print("Available GPUs:", gpus)
Enable memory growth (prevents OOM errors)
if gpus:
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
Use only specific GPU
if gpus:
tf.config.set_visible_devices(gpus[0], 'GPU')
Split one physical GPU into multiple logical GPUs
if gpus:
tf.config.set_logical_device_configuration(
gpus[0],
[tf.config.LogicalDeviceConfiguration(memory_limit=2048),
tf.config.LogicalDeviceConfiguration(memory_limit=2048)]
)
Check device where operation runs
with tf.device('/GPU:0'):
a = tf.constant([[1.0, 2.0], [3.0, 4.0]])
b = tf.constant([[5.0, 6.0], [7.0, 8.0]])
c = tf.matmul(a, b)
print(c.device)
2. TensorFlow Tensor Basics
Tensors are the core data structure in TensorFlow. Similar to NumPy arrays but can be accelerated on GPUs and support automatic differentiation.
tf.constant and tf.Variable
Scalar (0-dimensional tensor)
scalar = tf.constant(42)
print(scalar) # tf.Tensor(42, shape=(), dtype=int32)
print(scalar.dtype) # tf.int32
print(scalar.shape) # ()
Vector (1-dimensional tensor)
vector = tf.constant([1.0, 2.0, 3.0])
print(vector) # tf.Tensor([1. 2. 3.], shape=(3,), dtype=float32)
Matrix (2-dimensional tensor)
matrix = tf.constant([[1, 2, 3],
[4, 5, 6]], dtype=tf.float32)
print(matrix.shape) # (2, 3)
3-dimensional tensor
tensor_3d = tf.constant([[[1, 2], [3, 4]],
[[5, 6], [7, 8]]])
print(tensor_3d.shape) # (2, 2, 2)
Special tensors
zeros = tf.zeros([3, 4]) # 3x4 matrix of zeros
ones = tf.ones([2, 3]) # 2x3 matrix of ones
identity = tf.eye(4) # 4x4 identity matrix
random = tf.random.normal([3, 3]) # 3x3 random normal matrix
tf.Variable - used for trainable parameters (mutable)
var = tf.Variable([1.0, 2.0, 3.0])
print(var) # <tf.Variable 'Variable:0' ...>
var.assign([4.0, 5.0, 6.0]) # Update value
var.assign_add([1.0, 1.0, 1.0]) # Add to value
var.assign_sub([0.5, 0.5, 0.5]) # Subtract from value
Tensor Operations
a = tf.constant([[1.0, 2.0], [3.0, 4.0]])
b = tf.constant([[5.0, 6.0], [7.0, 8.0]])
Basic arithmetic (element-wise)
print(a + b) # Addition
print(a - b) # Subtraction
print(a * b) # Multiplication (element-wise)
print(a / b) # Division
print(a ** 2) # Power
Equivalent TF functions
print(tf.add(a, b))
print(tf.subtract(a, b))
print(tf.multiply(a, b))
print(tf.divide(a, b))
Matrix multiplication
print(tf.matmul(a, b)) # or a @ b
print(a @ b)
Math functions
x = tf.constant([1.0, 4.0, 9.0, 16.0])
print(tf.sqrt(x)) # [1, 2, 3, 4]
print(tf.exp(x)) # e^x
print(tf.math.log(x)) # Natural log
Reduction operations
matrix = tf.constant([[1.0, 2.0, 3.0],
[4.0, 5.0, 6.0]])
print(tf.reduce_sum(matrix)) # Total sum: 21
print(tf.reduce_sum(matrix, axis=0)) # Column sum: [5, 7, 9]
print(tf.reduce_sum(matrix, axis=1)) # Row sum: [6, 15]
print(tf.reduce_mean(matrix)) # Mean
print(tf.reduce_max(matrix)) # Max value
print(tf.reduce_min(matrix)) # Min value
print(tf.argmax(matrix, axis=1)) # Index of max per row
Comparison operations
print(tf.equal(a, b))
print(tf.greater(a, b))
print(tf.less_equal(a, b))
Shape Transformations
t = tf.constant([[1, 2, 3, 4],
[5, 6, 7, 8]])
print(t.shape) # (2, 4)
reshape
reshaped = tf.reshape(t, [4, 2])
print(reshaped.shape) # (4, 2)
reshaped2 = tf.reshape(t, [8])
print(reshaped2.shape) # (8,)
reshaped3 = tf.reshape(t, [-1, 2]) # -1 is inferred
print(reshaped3.shape) # (4, 2)
transpose
transposed = tf.transpose(t)
print(transposed.shape) # (4, 2)
Higher-dimensional transpose
t3d = tf.random.normal([2, 3, 4])
transposed_3d = tf.transpose(t3d, perm=[0, 2, 1])
print(transposed_3d.shape) # (2, 4, 3)
expand_dims - add dimension
t1d = tf.constant([1, 2, 3])
print(t1d.shape) # (3,)
expanded_0 = tf.expand_dims(t1d, axis=0)
print(expanded_0.shape) # (1, 3)
expanded_1 = tf.expand_dims(t1d, axis=1)
print(expanded_1.shape) # (3, 1)
squeeze - remove size-1 dimensions
t_squeezable = tf.constant([[[1, 2, 3]]])
print(t_squeezable.shape) # (1, 1, 3)
squeezed = tf.squeeze(t_squeezable)
print(squeezed.shape) # (3,)
concat and stack
a = tf.constant([[1, 2], [3, 4]])
b = tf.constant([[5, 6], [7, 8]])
concat_0 = tf.concat([a, b], axis=0)
print(concat_0.shape) # (4, 2)
concat_1 = tf.concat([a, b], axis=1)
print(concat_1.shape) # (2, 4)
stacked = tf.stack([a, b], axis=0)
print(stacked.shape) # (2, 2, 2)
Broadcasting
Scalar broadcasting
matrix = tf.constant([[1.0, 2.0], [3.0, 4.0]])
print(matrix + 10) # Adds 10 to every element
Vector broadcasting
row_vector = tf.constant([10.0, 20.0]) # shape (2,)
print(matrix + row_vector) # Adds row_vector to each row
col_vector = tf.constant([[10.0], [20.0]]) # shape (2, 1)
print(matrix + col_vector) # Adds col_vector to each column
Broadcasting in batch operations
batch = tf.random.normal([32, 128]) # batch size 32, 128 features
mean = tf.reduce_mean(batch, axis=0, keepdims=True) # shape (1, 128)
std = tf.math.reduce_std(batch, axis=0, keepdims=True)
normalized = (batch - mean) / (std + 1e-8) # broadcasting applied
Interoperability with NumPy
NumPy -> TensorFlow
np_array = np.array([[1.0, 2.0], [3.0, 4.0]])
tf_tensor = tf.constant(np_array)
tf_tensor2 = tf.convert_to_tensor(np_array)
TensorFlow -> NumPy
numpy_from_tf = tf_tensor.numpy()
print(type(numpy_from_tf)) # numpy.ndarray
tf.Tensor supports most NumPy functions directly
print(np.sin(tf_tensor))
print(np.sqrt(tf_tensor))
3. Keras Sequential API
The Sequential API is the simplest way to build models in Keras. It stacks layers in a linear sequence.
Stacking Layers and Compiling the Model
from tensorflow import keras
from tensorflow.keras import layers
Method 1: add() method
model = keras.Sequential()
model.add(layers.Dense(128, activation='relu', input_shape=(784,)))
model.add(layers.Dropout(0.2))
model.add(layers.Dense(64, activation='relu'))
model.add(layers.Dropout(0.2))
model.add(layers.Dense(10, activation='softmax'))
Method 2: List definition
model = keras.Sequential([
layers.Dense(128, activation='relu', input_shape=(784,)),
layers.Dropout(0.2),
layers.Dense(64, activation='relu'),
layers.Dropout(0.2),
layers.Dense(10, activation='softmax')
])
View model architecture
model.summary()
Compile
model.compile(
optimizer='adam', # or keras.optimizers.Adam(learning_rate=0.001)
loss='sparse_categorical_crossentropy', # integer labels
metrics=['accuracy']
)
Various optimizers and losses
model.compile(
optimizer=keras.optimizers.SGD(learning_rate=0.01, momentum=0.9),
loss=keras.losses.CategoricalCrossentropy(),
metrics=[keras.metrics.CategoricalAccuracy()]
)
Complete MNIST Classification Example
from tensorflow import keras
from tensorflow.keras import layers
Load data
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
Preprocessing
x_train = x_train.astype('float32') / 255.0
x_test = x_test.astype('float32') / 255.0
x_train = x_train.reshape(-1, 784)
x_test = x_test.reshape(-1, 784)
print(f"Training data: {x_train.shape}, Labels: {y_train.shape}")
print(f"Test data: {x_test.shape}, Labels: {y_test.shape}")
Define model
model = keras.Sequential([
layers.Dense(256, activation='relu', input_shape=(784,)),
layers.BatchNormalization(),
layers.Dropout(0.3),
layers.Dense(128, activation='relu'),
layers.BatchNormalization(),
layers.Dropout(0.3),
layers.Dense(10, activation='softmax')
])
model.summary()
Compile
model.compile(
optimizer=keras.optimizers.Adam(learning_rate=1e-3),
loss='sparse_categorical_crossentropy',
metrics=['accuracy']
)
Train
history = model.fit(
x_train, y_train,
batch_size=128,
epochs=20,
validation_split=0.1,
verbose=1
)
Evaluate
test_loss, test_acc = model.evaluate(x_test, y_test, verbose=0)
print(f"Test loss: {test_loss:.4f}")
print(f"Test accuracy: {test_acc:.4f}")
Predict
predictions = model.predict(x_test[:10])
predicted_classes = tf.argmax(predictions, axis=1)
print("Predicted:", predicted_classes.numpy())
print("Actual:", y_test[:10])
Plot training curves
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))
ax1.plot(history.history['loss'], label='Train Loss')
ax1.plot(history.history['val_loss'], label='Val Loss')
ax1.set_title('Loss')
ax1.set_xlabel('Epoch')
ax1.legend()
ax2.plot(history.history['accuracy'], label='Train Accuracy')
ax2.plot(history.history['val_accuracy'], label='Val Accuracy')
ax2.set_title('Accuracy')
ax2.set_xlabel('Epoch')
ax2.legend()
plt.tight_layout()
plt.savefig('mnist_training.png')
plt.show()
4. Keras Functional API
The Functional API allows building more complex model architectures, including multiple inputs/outputs, residual connections, and shared layers.
Basic Functional API Usage
from tensorflow import keras
from tensorflow.keras import layers
Define input
inputs = keras.Input(shape=(784,), name='input_layer')
Connect layers (called as functions)
x = layers.Dense(256, activation='relu', name='dense_1')(inputs)
x = layers.BatchNormalization(name='bn_1')(x)
x = layers.Dropout(0.3, name='dropout_1')(x)
x = layers.Dense(128, activation='relu', name='dense_2')(x)
x = layers.BatchNormalization(name='bn_2')(x)
x = layers.Dropout(0.3, name='dropout_2')(x)
outputs = layers.Dense(10, activation='softmax', name='output')(x)
Create model
model = keras.Model(inputs=inputs, outputs=outputs, name='mnist_model')
model.summary()
Multiple Input/Output Model
from tensorflow import keras
from tensorflow.keras import layers
Example: Combining image + metadata
Image input
image_input = keras.Input(shape=(32, 32, 3), name='image')
x1 = layers.Conv2D(32, 3, activation='relu')(image_input)
x1 = layers.GlobalAveragePooling2D()(x1)
x1 = layers.Dense(64, activation='relu')(x1)
Metadata input
meta_input = keras.Input(shape=(10,), name='metadata')
x2 = layers.Dense(32, activation='relu')(meta_input)
Merge
combined = layers.concatenate([x1, x2])
combined = layers.Dense(64, activation='relu')(combined)
Multiple outputs
main_output = layers.Dense(1, activation='sigmoid', name='main_output')(combined)
aux_output = layers.Dense(5, activation='softmax', name='aux_output')(combined)
Build model
model = keras.Model(
inputs=[image_input, meta_input],
outputs=[main_output, aux_output]
)
Compile with per-output loss and weights
model.compile(
optimizer='adam',
loss={
'main_output': 'binary_crossentropy',
'aux_output': 'categorical_crossentropy'
},
loss_weights={
'main_output': 1.0,
'aux_output': 0.2
},
metrics={
'main_output': ['accuracy'],
'aux_output': ['accuracy']
}
)
Residual Connections
from tensorflow import keras
from tensorflow.keras import layers
def residual_block(x, filters, stride=1):
"""ResNet-style residual block"""
shortcut = x
Main path
x = layers.Conv2D(filters, 3, strides=stride, padding='same')(x)
x = layers.BatchNormalization()(x)
x = layers.ReLU()(x)
x = layers.Conv2D(filters, 3, padding='same')(x)
x = layers.BatchNormalization()(x)
Adjust shortcut when dimensions differ
if stride != 1 or shortcut.shape[-1] != filters:
shortcut = layers.Conv2D(filters, 1, strides=stride)(shortcut)
shortcut = layers.BatchNormalization()(shortcut)
Add residual
x = layers.add([x, shortcut])
x = layers.ReLU()(x)
return x
Build a ResNet-style model
inputs = keras.Input(shape=(32, 32, 3))
x = layers.Conv2D(64, 7, strides=2, padding='same')(inputs)
x = layers.BatchNormalization()(x)
x = layers.ReLU()(x)
x = layers.MaxPooling2D(3, strides=2, padding='same')(x)
x = residual_block(x, 64)
x = residual_block(x, 64)
x = residual_block(x, 128, stride=2)
x = residual_block(x, 128)
x = residual_block(x, 256, stride=2)
x = residual_block(x, 256)
x = layers.GlobalAveragePooling2D()(x)
x = layers.Dense(256, activation='relu')(x)
outputs = layers.Dense(10, activation='softmax')(x)
model = keras.Model(inputs, outputs, name='mini_resnet')
model.summary()
Shared Layers (Siamese Network)
from tensorflow import keras
from tensorflow.keras import layers
Define shared encoder
shared_encoder = keras.Sequential([
layers.Dense(128, activation='relu'),
layers.Dense(64, activation='relu'),
layers.Dense(32)
], name='shared_encoder')
Two inputs
input_a = keras.Input(shape=(100,), name='input_a')
input_b = keras.Input(shape=(100,), name='input_b')
Process with the same encoder
encoded_a = shared_encoder(input_a)
encoded_b = shared_encoder(input_b)
Cosine similarity
similarity = layers.Dot(axes=1, normalize=True)([encoded_a, encoded_b])
output = layers.Dense(1, activation='sigmoid')(similarity)
siamese_model = keras.Model(inputs=[input_a, input_b], outputs=output)
siamese_model.summary()
5. Keras Subclassing API
The Subclassing API is the most flexible approach, similar to PyTorch. Define custom layers and models as Python classes.
Custom Layers
from tensorflow import keras
class MyDenseLayer(keras.layers.Layer):
def __init__(self, units, activation=None, **kwargs):
super().__init__(**kwargs)
self.units = units
self.activation = keras.activations.get(activation)
def build(self, input_shape):
Initialize weights (build runs once on first call)
self.w = self.add_weight(
name='kernel',
shape=(input_shape[-1], self.units),
initializer='glorot_uniform',
trainable=True
)
self.b = self.add_weight(
name='bias',
shape=(self.units,),
initializer='zeros',
trainable=True
)
super().build(input_shape)
def call(self, inputs, training=False):
output = tf.matmul(inputs, self.w) + self.b
if self.activation is not None:
output = self.activation(output)
return output
def get_config(self):
config = super().get_config()
config.update({'units': self.units, 'activation': keras.activations.serialize(self.activation)})
return config
Usage
layer = MyDenseLayer(64, activation='relu')
x = tf.random.normal([32, 128])
y = layer(x)
print(y.shape) # (32, 64)
class MultiHeadSelfAttention(keras.layers.Layer):
"""Simple multi-head self-attention layer"""
def __init__(self, embed_dim, num_heads, **kwargs):
super().__init__(**kwargs)
self.embed_dim = embed_dim
self.num_heads = num_heads
self.head_dim = embed_dim // num_heads
assert self.head_dim * num_heads == embed_dim
self.wq = keras.layers.Dense(embed_dim)
self.wk = keras.layers.Dense(embed_dim)
self.wv = keras.layers.Dense(embed_dim)
self.wo = keras.layers.Dense(embed_dim)
def split_heads(self, x, batch_size):
x = tf.reshape(x, (batch_size, -1, self.num_heads, self.head_dim))
return tf.transpose(x, perm=[0, 2, 1, 3])
def call(self, x, training=False):
batch_size = tf.shape(x)[0]
seq_len = tf.shape(x)[1]
q = self.split_heads(self.wq(x), batch_size)
k = self.split_heads(self.wk(x), batch_size)
v = self.split_heads(self.wv(x), batch_size)
Scaled dot-product attention
scale = tf.cast(self.head_dim, tf.float32) ** 0.5
scores = tf.matmul(q, k, transpose_b=True) / scale
weights = tf.nn.softmax(scores, axis=-1)
context = tf.matmul(weights, v)
Merge heads
context = tf.transpose(context, perm=[0, 2, 1, 3])
context = tf.reshape(context, (batch_size, seq_len, self.embed_dim))
return self.wo(context)
Custom Model (Model Subclassing)
from tensorflow import keras
class ResidualBlock(keras.layers.Layer):
def __init__(self, filters, **kwargs):
super().__init__(**kwargs)
self.conv1 = keras.layers.Conv2D(filters, 3, padding='same')
self.conv2 = keras.layers.Conv2D(filters, 3, padding='same')
self.bn1 = keras.layers.BatchNormalization()
self.bn2 = keras.layers.BatchNormalization()
self.relu = keras.layers.ReLU()
def call(self, inputs, training=False):
x = self.conv1(inputs)
x = self.bn1(x, training=training)
x = self.relu(x)
x = self.conv2(x)
x = self.bn2(x, training=training)
x = x + inputs # residual connection
return self.relu(x)
class CustomCNN(keras.Model):
def __init__(self, num_classes=10, **kwargs):
super().__init__(**kwargs)
self.conv_stem = keras.layers.Conv2D(32, 3, padding='same', activation='relu')
self.res_block1 = ResidualBlock(32)
self.res_block2 = ResidualBlock(32)
self.pool = keras.layers.MaxPooling2D(2)
self.conv_expand = keras.layers.Conv2D(64, 3, padding='same', activation='relu')
self.res_block3 = ResidualBlock(64)
self.gap = keras.layers.GlobalAveragePooling2D()
self.dropout = keras.layers.Dropout(0.5)
self.fc = keras.layers.Dense(num_classes, activation='softmax')
def call(self, inputs, training=False):
x = self.conv_stem(inputs)
x = self.res_block1(x, training=training)
x = self.res_block2(x, training=training)
x = self.pool(x)
x = self.conv_expand(x)
x = self.res_block3(x, training=training)
x = self.gap(x)
x = self.dropout(x, training=training)
return self.fc(x)
Usage
model = CustomCNN(num_classes=10)
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
Test with dummy data
dummy_input = tf.random.normal([4, 32, 32, 3])
output = model(dummy_input, training=False)
print(output.shape) # (4, 10)
model.summary()
6. CNN Implementation (CIFAR-10)
Basic CNN Model
from tensorflow import keras
from tensorflow.keras import layers
Load CIFAR-10
(x_train, y_train), (x_test, y_test) = keras.datasets.cifar10.load_data()
x_train = x_train.astype('float32') / 255.0
x_test = x_test.astype('float32') / 255.0
class_names = ['airplane', 'automobile', 'bird', 'cat', 'deer',
'dog', 'frog', 'horse', 'ship', 'truck']
print(f"Training data: {x_train.shape}") # (50000, 32, 32, 3)
print(f"Test data: {x_test.shape}") # (10000, 32, 32, 3)
Define CNN model
def build_cnn_model():
model = keras.Sequential([
First conv block
layers.Conv2D(32, (3, 3), padding='same', input_shape=(32, 32, 3)),
layers.BatchNormalization(),
layers.Activation('relu'),
layers.Conv2D(32, (3, 3), padding='same'),
layers.BatchNormalization(),
layers.Activation('relu'),
layers.MaxPooling2D((2, 2)),
layers.Dropout(0.25),
Second conv block
layers.Conv2D(64, (3, 3), padding='same'),
layers.BatchNormalization(),
layers.Activation('relu'),
layers.Conv2D(64, (3, 3), padding='same'),
layers.BatchNormalization(),
layers.Activation('relu'),
layers.MaxPooling2D((2, 2)),
layers.Dropout(0.25),
Third conv block
layers.Conv2D(128, (3, 3), padding='same'),
layers.BatchNormalization(),
layers.Activation('relu'),
layers.Conv2D(128, (3, 3), padding='same'),
layers.BatchNormalization(),
layers.Activation('relu'),
layers.GlobalAveragePooling2D(),
layers.Dropout(0.5),
Fully connected layers
layers.Dense(256, activation='relu'),
layers.BatchNormalization(),
layers.Dropout(0.5),
layers.Dense(10, activation='softmax')
])
return model
model = build_cnn_model()
model.summary()
model.compile(
optimizer=keras.optimizers.Adam(learning_rate=1e-3),
loss='sparse_categorical_crossentropy',
metrics=['accuracy']
)
Data Augmentation
from tensorflow import keras
from tensorflow.keras import layers
Keras built-in augmentation layers
data_augmentation = keras.Sequential([
layers.RandomFlip('horizontal'),
layers.RandomRotation(0.1),
layers.RandomZoom(0.1),
layers.RandomTranslation(0.1, 0.1),
layers.RandomContrast(0.1),
], name='data_augmentation')
Integrate augmentation into the model
inputs = keras.Input(shape=(32, 32, 3))
x = data_augmentation(inputs) # Only applied during training
x = layers.Rescaling(1./255)(x)
x = layers.Conv2D(32, 3, padding='same', activation='relu')(x)
x = layers.MaxPooling2D()(x)
x = layers.Conv2D(64, 3, padding='same', activation='relu')(x)
x = layers.MaxPooling2D()(x)
x = layers.GlobalAveragePooling2D()(x)
outputs = layers.Dense(10, activation='softmax')(x)
augmented_model = keras.Model(inputs, outputs)
tf.data augmentation pipeline
def augment_image(image, label):
image = tf.cast(image, tf.float32) / 255.0
image = tf.image.random_flip_left_right(image)
image = tf.image.random_brightness(image, 0.2)
image = tf.image.random_contrast(image, 0.8, 1.2)
image = tf.image.random_saturation(image, 0.8, 1.2)
image = tf.image.pad_to_bounding_box(image, 4, 4, 40, 40)
image = tf.image.random_crop(image, [32, 32, 3])
return image, label
(x_train, y_train), (x_test, y_test) = keras.datasets.cifar10.load_data()
y_train = y_train.flatten()
y_test = y_test.flatten()
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.map(augment_image, num_parallel_calls=tf.data.AUTOTUNE)
train_dataset = train_dataset.shuffle(10000).batch(128).prefetch(tf.data.AUTOTUNE)
Transfer Learning
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.applications import EfficientNetB0
Transfer learning with EfficientNetB0
def build_transfer_model(num_classes=10):
Pre-trained base model (without classifier head)
base_model = EfficientNetB0(
include_top=False,
weights='imagenet',
input_shape=(224, 224, 3)
)
Freeze base model initially
base_model.trainable = False
inputs = keras.Input(shape=(224, 224, 3))
EfficientNet includes internal preprocessing
x = base_model(inputs, training=False)
x = layers.GlobalAveragePooling2D()(x)
x = layers.Dense(256, activation='relu')(x)
x = layers.Dropout(0.5)(x)
outputs = layers.Dense(num_classes, activation='softmax')(x)
model = keras.Model(inputs, outputs)
return model, base_model
model, base_model = build_transfer_model(num_classes=10)
model.compile(
optimizer=keras.optimizers.Adam(1e-3),
loss='sparse_categorical_crossentropy',
metrics=['accuracy']
)
Phase 1: Train classifier with frozen base
model.fit(train_dataset, epochs=10, ...)
Fine-tuning: Unfreeze some base layers
def fine_tune(model, base_model, fine_tune_at=100):
base_model.trainable = True
Keep layers before fine_tune_at frozen
for layer in base_model.layers[:fine_tune_at]:
layer.trainable = False
model.compile(
optimizer=keras.optimizers.Adam(1e-5), # Very low learning rate
loss='sparse_categorical_crossentropy',
metrics=['accuracy']
)
return model
fine_tune(model, base_model, fine_tune_at=100)
model.fit(train_dataset, epochs=20, ...)
7. RNN/LSTM Implementation
Time Series Prediction (LSTM)
from tensorflow import keras
from tensorflow.keras import layers
Generate synthetic time series data
def generate_time_series(n_samples, n_steps):
t = np.linspace(0, 4 * np.pi, n_steps + 1)
series = np.sin(t) + 0.1 * np.random.randn(n_samples, n_steps + 1)
X = series[:, :-1].reshape(-1, n_steps, 1)
y = series[:, 1:].reshape(-1, n_steps, 1)
return X, y
X_train, y_train = generate_time_series(10000, 50)
X_val, y_val = generate_time_series(1000, 50)
X_test, y_test = generate_time_series(1000, 50)
LSTM model
def build_lstm_model(n_steps=50):
model = keras.Sequential([
layers.LSTM(64, return_sequences=True, input_shape=(n_steps, 1)),
layers.Dropout(0.2),
layers.LSTM(64, return_sequences=True),
layers.Dropout(0.2),
layers.TimeDistributed(layers.Dense(1))
])
model.compile(
optimizer='adam',
loss='mse',
metrics=['mae']
)
return model
model = build_lstm_model()
model.summary()
history = model.fit(
X_train, y_train,
epochs=20,
batch_size=128,
validation_data=(X_val, y_val)
)
Text Generation (Character-Level RNN)
from tensorflow import keras
from tensorflow.keras import layers
Example text (use larger corpus in practice)
text = """TensorFlow is a deep learning framework created by Google.
Keras is a high-level API that runs on top of TensorFlow.
Using both libraries together, you can build powerful deep learning models quickly."""
Create character set
chars = sorted(set(text))
char2idx = {c: i for i, c in enumerate(chars)}
idx2char = np.array(chars)
vocab_size = len(chars)
Convert text to indices
text_as_int = np.array([char2idx[c] for c in text])
Create sequences
seq_length = 50
sequences = tf.data.Dataset.from_tensor_slices(text_as_int)
sequences = sequences.batch(seq_length + 1, drop_remainder=True)
def split_input_target(chunk):
return chunk[:-1], chunk[1:]
dataset = sequences.map(split_input_target)
dataset = dataset.shuffle(100).batch(32, drop_remainder=True)
Character-level RNN model
def build_char_rnn(vocab_size, embed_dim=64, rnn_units=256):
model = keras.Sequential([
layers.Embedding(vocab_size, embed_dim),
layers.GRU(rnn_units, return_sequences=True, stateful=False),
layers.GRU(rnn_units, return_sequences=True),
layers.Dense(vocab_size)
])
return model
model = build_char_rnn(vocab_size)
model.compile(
optimizer='adam',
loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics=['accuracy']
)
Text generation function
def generate_text(model, start_string, num_generate=100, temperature=1.0):
input_eval = [char2idx[s] for s in start_string]
input_eval = tf.expand_dims(input_eval, 0)
text_generated = []
for _ in range(num_generate):
predictions = model(input_eval)
predictions = tf.squeeze(predictions, 0) / temperature
predicted_id = tf.random.categorical(predictions, num_samples=1)[-1, 0].numpy()
input_eval = tf.expand_dims([predicted_id], 0)
text_generated.append(idx2char[predicted_id])
return start_string + ''.join(text_generated)
Bidirectional LSTM
from tensorflow import keras
from tensorflow.keras import layers
Bidirectional LSTM for text classification
def build_bidirectional_model(vocab_size, max_len=100, embed_dim=64):
model = keras.Sequential([
layers.Embedding(vocab_size, embed_dim, input_length=max_len),
layers.Bidirectional(layers.LSTM(64, return_sequences=True)),
layers.Bidirectional(layers.LSTM(32)),
layers.Dense(64, activation='relu'),
layers.Dropout(0.5),
layers.Dense(1, activation='sigmoid')
])
model.compile(
optimizer='adam',
loss='binary_crossentropy',
metrics=['accuracy']
)
return model
model = build_bidirectional_model(vocab_size=10000, max_len=100)
model.summary()
8. Transformer with Keras
Multi-head Attention and Transformer Encoder
from tensorflow import keras
from tensorflow.keras import layers
class TransformerBlock(keras.layers.Layer):
def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1, **kwargs):
super().__init__(**kwargs)
self.att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
self.ffn = keras.Sequential([
layers.Dense(ff_dim, activation='relu'),
layers.Dense(embed_dim)
])
self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
self.dropout1 = layers.Dropout(rate)
self.dropout2 = layers.Dropout(rate)
def call(self, inputs, training=False):
attn_output = self.att(inputs, inputs)
attn_output = self.dropout1(attn_output, training=training)
out1 = self.layernorm1(inputs + attn_output)
ffn_output = self.ffn(out1)
ffn_output = self.dropout2(ffn_output, training=training)
return self.layernorm2(out1 + ffn_output)
class TokenAndPositionEmbedding(keras.layers.Layer):
def __init__(self, maxlen, vocab_size, embed_dim, **kwargs):
super().__init__(**kwargs)
self.token_emb = layers.Embedding(vocab_size, embed_dim)
self.pos_emb = layers.Embedding(maxlen, embed_dim)
def call(self, x):
maxlen = tf.shape(x)[-1]
positions = tf.range(start=0, limit=maxlen, delta=1)
positions = self.pos_emb(positions)
x = self.token_emb(x)
return x + positions
Transformer classifier for text
def build_transformer_classifier(
vocab_size=20000,
maxlen=200,
embed_dim=32,
num_heads=2,
ff_dim=32,
num_classes=2
):
inputs = layers.Input(shape=(maxlen,))
embedding_layer = TokenAndPositionEmbedding(maxlen, vocab_size, embed_dim)
x = embedding_layer(inputs)
transformer_block = TransformerBlock(embed_dim, num_heads, ff_dim)
x = transformer_block(x)
x = layers.GlobalAveragePooling1D()(x)
x = layers.Dropout(0.1)(x)
x = layers.Dense(20, activation='relu')(x)
x = layers.Dropout(0.1)(x)
outputs = layers.Dense(num_classes, activation='softmax')(x)
model = keras.Model(inputs=inputs, outputs=outputs)
return model
model = build_transformer_classifier()
model.compile(
optimizer='adam',
loss='sparse_categorical_crossentropy',
metrics=['accuracy']
)
model.summary()
IMDB sentiment classification
vocab_size = 20000
maxlen = 200
(x_train, y_train), (x_val, y_val) = keras.datasets.imdb.load_data(num_words=vocab_size)
x_train = keras.preprocessing.sequence.pad_sequences(x_train, maxlen=maxlen)
x_val = keras.preprocessing.sequence.pad_sequences(x_val, maxlen=maxlen)
model.fit(x_train, y_train, batch_size=32, epochs=5, validation_data=(x_val, y_val))
9. Data Pipeline (tf.data)
Basic tf.data.Dataset Usage
Create Dataset from tensors
X = np.random.randn(1000, 10)
y = np.random.randint(0, 2, 1000)
dataset = tf.data.Dataset.from_tensor_slices((X, y))
print(dataset) # TensorSliceDataset
Basic transformations
dataset = (dataset
.shuffle(buffer_size=1000)
.batch(32)
.prefetch(tf.data.AUTOTUNE)
)
Inspect data
for batch_X, batch_y in dataset.take(1):
print(f"Batch X shape: {batch_X.shape}") # (32, 10)
print(f"Batch y shape: {batch_y.shape}") # (32,)
map transformation
def preprocess(x, y):
x = tf.cast(x, tf.float32)
y = tf.cast(y, tf.int32)
x = (x - tf.reduce_mean(x)) / tf.math.reduce_std(x)
return x, y
dataset = tf.data.Dataset.from_tensor_slices((X, y))
dataset = dataset.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE)
filter transformation
positive_dataset = dataset.filter(lambda x, y: y == 1)
Create from range
range_dataset = tf.data.Dataset.range(100)
zip to combine
features_ds = tf.data.Dataset.from_tensor_slices(X)
labels_ds = tf.data.Dataset.from_tensor_slices(y)
combined_ds = tf.data.Dataset.zip((features_ds, labels_ds))
Image Data Loading Pipeline
def load_and_preprocess_image(path, label, image_size=(224, 224)):
image = tf.io.read_file(path)
image = tf.image.decode_jpeg(image, channels=3)
image = tf.image.resize(image, image_size)
image = tf.cast(image, tf.float32) / 255.0
return image, label
def create_image_dataset(data_dir, batch_size=32, image_size=(224, 224)):
class_names = sorted(os.listdir(data_dir))
class_map = {name: idx for idx, name in enumerate(class_names)}
file_paths = []
labels = []
for class_name in class_names:
class_dir = os.path.join(data_dir, class_name)
for fname in os.listdir(class_dir):
if fname.endswith(('.jpg', '.jpeg', '.png')):
file_paths.append(os.path.join(class_dir, fname))
labels.append(class_map[class_name])
path_ds = tf.data.Dataset.from_tensor_slices(file_paths)
label_ds = tf.data.Dataset.from_tensor_slices(labels)
combined = tf.data.Dataset.zip((path_ds, label_ds))
dataset = combined.map(
lambda p, l: load_and_preprocess_image(p, l, image_size),
num_parallel_calls=tf.data.AUTOTUNE
)
dataset = dataset.shuffle(1000).batch(batch_size).prefetch(tf.data.AUTOTUNE)
return dataset, class_names
Alternatively, use the built-in helper (easier)
train_ds = keras.utils.image_dataset_from_directory(
'data/train',
image_size=(224, 224),
batch_size=32
)
TFRecord Format
Writing TFRecord files
def bytes_feature(value):
if isinstance(value, type(tf.constant(0))):
value = value.numpy()
return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))
def int64_feature(value):
return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))
def float_feature(value):
return tf.train.Feature(float_list=tf.train.FloatList(value=[value]))
def image_to_tfrecord(image_path, label, writer):
image_string = open(image_path, 'rb').read()
feature = {
'image': bytes_feature(image_string),
'label': int64_feature(label),
}
example = tf.train.Example(features=tf.train.Features(feature=feature))
writer.write(example.SerializeToString())
Reading TFRecord files
def parse_tfrecord(serialized_example):
feature_description = {
'image': tf.io.FixedLenFeature([], tf.string),
'label': tf.io.FixedLenFeature([], tf.int64),
}
example = tf.io.parse_single_example(serialized_example, feature_description)
image = tf.image.decode_jpeg(example['image'], channels=3)
image = tf.image.resize(image, [224, 224])
image = tf.cast(image, tf.float32) / 255.0
label = example['label']
return image, label
Create TFRecord Dataset
dataset = tf.data.TFRecordDataset(['data.tfrecord'])
dataset = dataset.map(parse_tfrecord, num_parallel_calls=tf.data.AUTOTUNE)
dataset = dataset.shuffle(1000).batch(32).prefetch(tf.data.AUTOTUNE)
10. Advanced Training Techniques
Callbacks
from tensorflow import keras
ModelCheckpoint: Save best model
checkpoint_cb = keras.callbacks.ModelCheckpoint(
filepath='best_model.keras',
monitor='val_accuracy',
mode='max',
save_best_only=True,
verbose=1
)
EarlyStopping: Prevent overfitting
early_stopping_cb = keras.callbacks.EarlyStopping(
monitor='val_loss',
patience=10,
restore_best_weights=True,
verbose=1
)
ReduceLROnPlateau: Reduce learning rate
reduce_lr_cb = keras.callbacks.ReduceLROnPlateau(
monitor='val_loss',
factor=0.5,
patience=5,
min_lr=1e-7,
verbose=1
)
TensorBoard: Visualization
tensorboard_cb = keras.callbacks.TensorBoard(
log_dir='logs/',
histogram_freq=1,
write_graph=True,
write_images=True,
update_freq='epoch'
)
LearningRateScheduler: Custom schedule
def cosine_decay_schedule(epoch, lr):
initial_lr = 1e-3
total_epochs = 100
return initial_lr * (1 + math.cos(math.pi * epoch / total_epochs)) / 2
lr_scheduler_cb = keras.callbacks.LearningRateScheduler(
cosine_decay_schedule, verbose=1
)
CSV logging
csv_logger_cb = keras.callbacks.CSVLogger('training_log.csv')
Custom callback
class ConfusionMatrixCallback(keras.callbacks.Callback):
def __init__(self, validation_data, class_names):
super().__init__()
self.X_val, self.y_val = validation_data
self.class_names = class_names
def on_epoch_end(self, epoch, logs=None):
y_pred = tf.argmax(self.model.predict(self.X_val), axis=1)
cm = tf.math.confusion_matrix(self.y_val, y_pred)
if epoch % 10 == 0:
print(f"\nEpoch {epoch} confusion matrix:\n{cm.numpy()}")
callbacks = [
checkpoint_cb,
early_stopping_cb,
reduce_lr_cb,
tensorboard_cb,
csv_logger_cb
]
Usage example
history = model.fit(
X_train, y_train,
epochs=100,
validation_data=(X_val, y_val),
callbacks=callbacks
)
Custom Training Loop (GradientTape)
from tensorflow import keras
Simple classification model
model = keras.Sequential([
keras.layers.Dense(64, activation='relu', input_shape=(784,)),
keras.layers.Dense(64, activation='relu'),
keras.layers.Dense(10)
])
Loss and optimizer
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)
optimizer = keras.optimizers.Adam(learning_rate=1e-3)
Metrics
train_loss = keras.metrics.Mean(name='train_loss')
train_accuracy = keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')
val_loss = keras.metrics.Mean(name='val_loss')
val_accuracy = keras.metrics.SparseCategoricalAccuracy(name='val_accuracy')
Single training step
@tf.function # JIT compilation for speedup
def train_step(images, labels):
with tf.GradientTape() as tape:
predictions = model(images, training=True)
loss = loss_fn(labels, predictions)
L2 regularization
l2_loss = tf.add_n([tf.nn.l2_loss(v) for v in model.trainable_variables
if 'bias' not in v.name])
total_loss = loss + 1e-4 * l2_loss
gradients = tape.gradient(total_loss, model.trainable_variables)
Gradient clipping
gradients, _ = tf.clip_by_global_norm(gradients, 1.0)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
train_loss.update_state(loss)
train_accuracy.update_state(labels, predictions)
return loss
@tf.function
def val_step(images, labels):
predictions = model(images, training=False)
loss = loss_fn(labels, predictions)
val_loss.update_state(loss)
val_accuracy.update_state(labels, predictions)
Full training loop
def train(train_ds, val_ds, epochs=10):
for epoch in range(epochs):
start_time = time.time()
Reset metrics
train_loss.reset_states()
train_accuracy.reset_states()
val_loss.reset_states()
val_accuracy.reset_states()
Training
for step, (images, labels) in enumerate(train_ds):
train_step(images, labels)
if step % 100 == 0:
print(f"Step {step}: loss={train_loss.result():.4f}")
Validation
for images, labels in val_ds:
val_step(images, labels)
elapsed = time.time() - start_time
print(f"Epoch {epoch+1}/{epochs} ({elapsed:.1f}s) - "
f"loss: {train_loss.result():.4f}, "
f"accuracy: {train_accuracy.result():.4f}, "
f"val_loss: {val_loss.result():.4f}, "
f"val_accuracy: {val_accuracy.result():.4f}")
Distributed Training (tf.distribute.Strategy)
from tensorflow import keras
Multi-GPU strategy
strategy = tf.distribute.MirroredStrategy()
print(f"Number of devices: {strategy.num_replicas_in_sync}")
with strategy.scope():
Define and compile model inside strategy.scope()
model = keras.Sequential([
keras.layers.Dense(64, activation='relu', input_shape=(784,)),
keras.layers.Dense(64, activation='relu'),
keras.layers.Dense(10, activation='softmax')
])
model.compile(
optimizer='adam',
loss='sparse_categorical_crossentropy',
metrics=['accuracy']
)
Scale batch size proportionally to number of GPUs
BATCH_SIZE_PER_REPLICA = 64
BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync
Mixed precision training
from tensorflow.keras import mixed_precision
policy = mixed_precision.Policy('mixed_float16')
mixed_precision.set_global_policy(policy)
with strategy.scope():
inputs = keras.Input(shape=(784,))
x = keras.layers.Dense(64, activation='relu')(inputs)
Output layer must use float32
outputs = keras.layers.Dense(10, activation='softmax', dtype='float32')(x)
model_mp = keras.Model(inputs, outputs)
opt = keras.optimizers.Adam(1e-3)
opt = mixed_precision.LossScaleOptimizer(opt)
model_mp.compile(
optimizer=opt,
loss='sparse_categorical_crossentropy',
metrics=['accuracy']
)
11. TensorBoard Visualization
Basic TensorBoard Usage
from tensorflow import keras
Log directory
log_dir = "logs/fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
Callback setup
tensorboard_callback = keras.callbacks.TensorBoard(
log_dir=log_dir,
histogram_freq=1, # Weight histogram frequency
write_graph=True, # Record computation graph
write_images=True, # Record weight images
update_freq='epoch', # Update frequency
profile_batch=2 # Profiling batch
)
Custom scalar logging
file_writer = tf.summary.create_file_writer(log_dir + '/custom_scalars')
def log_custom_metrics(epoch, logs):
with file_writer.as_default():
tf.summary.scalar('learning_rate',
data=keras.backend.get_value(model.optimizer.lr),
step=epoch)
lr_callback = keras.callbacks.LambdaCallback(on_epoch_end=log_custom_metrics)
Log images
def log_images(epoch, logs):
(_, _), (x_test, y_test) = keras.datasets.mnist.load_data()
x_test = x_test[:10].reshape(-1, 28, 28, 1).astype('float32') / 255.0
with file_writer.as_default():
tf.summary.image("Test Samples", x_test, step=epoch, max_outputs=10)
image_callback = keras.callbacks.LambdaCallback(on_epoch_end=log_images)
Start TensorBoard with:
tensorboard --logdir logs/fit
Embedding Visualization
from tensorflow import keras
from tensorboard.plugins import projector
Train and visualize embedding layer
(x_train, y_train), _ = keras.datasets.mnist.load_data()
x_train = x_train.astype('float32').reshape(-1, 784) / 255.0
Embedding model
embedding_model = keras.Sequential([
keras.layers.Dense(64, activation='relu', input_shape=(784,)),
keras.layers.Dense(32, name='embedding')
])
Extract embeddings
embeddings = embedding_model.predict(x_train[:1000])
Save embedding file
log_dir = 'logs/embedding'
os.makedirs(log_dir, exist_ok=True)
np.savetxt(os.path.join(log_dir, 'vectors.tsv'), embeddings, delimiter='\t')
Save metadata (labels)
with open(os.path.join(log_dir, 'metadata.tsv'), 'w') as f:
for label in y_train[:1000]:
f.write(f"{label}\n")
Configure Projector
config = projector.ProjectorConfig()
embedding_config = config.embeddings.add()
embedding_config.tensor_name = "embedding/.ATTRIBUTES/VARIABLE_VALUE"
embedding_config.tensor_path = 'vectors.tsv'
embedding_config.metadata_path = 'metadata.tsv'
projector.visualize_embeddings(log_dir, config)
12. Saving and Converting Models
SavedModel Format
from tensorflow import keras
Save model
model.save('saved_model/my_model')
Load
loaded_model = keras.models.load_model('saved_model/my_model')
HDF5 format (legacy)
model.save('my_model.h5')
loaded_h5 = keras.models.load_model('my_model.h5')
Save/load weights only
model.save_weights('model_weights.h5')
model.load_weights('model_weights.h5')
Keras native format (recommended)
model.save('my_model.keras')
loaded_keras = keras.models.load_model('my_model.keras')
Subclassed models require get_config
class MyModel(keras.Model):
def __init__(self, units):
super().__init__()
self.units = units
self.dense = keras.layers.Dense(units)
def call(self, inputs):
return self.dense(inputs)
def get_config(self):
return {'units': self.units}
@classmethod
def from_config(cls, config):
return cls(**config)
TensorFlow Lite Conversion (Mobile/Edge)
from tensorflow import keras
Basic TFLite conversion
converter = tf.lite.TFLiteConverter.from_saved_model('saved_model/my_model')
tflite_model = converter.convert()
with open('model.tflite', 'wb') as f:
f.write(tflite_model)
Dynamic range quantization
converter = tf.lite.TFLiteConverter.from_saved_model('saved_model/my_model')
converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_quantized = converter.convert()
Full integer quantization (INT8)
def representative_dataset():
for _ in range(100):
data = np.random.random((1, 784)).astype(np.float32)
yield [data]
converter = tf.lite.TFLiteConverter.from_saved_model('saved_model/my_model')
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.representative_dataset = representative_dataset
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
converter.inference_input_type = tf.int8
converter.inference_output_type = tf.int8
tflite_int8 = converter.convert()
Run TFLite model
interpreter = tf.lite.Interpreter(model_content=tflite_quantized)
interpreter.allocate_tensors()
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()
print("Input:", input_details[0]['shape'])
print("Output:", output_details[0]['shape'])
Run inference
input_data = np.random.random((1, 784)).astype(np.float32)
interpreter.set_tensor(input_details[0]['index'], input_data)
interpreter.invoke()
output_data = interpreter.get_tensor(output_details[0]['index'])
print("TFLite output:", output_data.shape)
TensorFlow.js Conversion
Install tfjs conversion tool
pip install tensorflowjs
Convert from SavedModel to TFJS
tensorflowjs_converter \
--input_format=tf_saved_model \
--output_format=tfjs_graph_model \
--signature_name=serving_default \
saved_model/my_model \
tfjs_model/
13. Production Deployment with TF-Serving
TensorFlow Serving Setup
Run TF Serving with Docker (easiest approach)
docker pull tensorflow/serving
Serve the model
docker run -t --rm \
-p 8501:8501 \
-v "/path/to/saved_model:/models/my_model" \
-e MODEL_NAME=my_model \
tensorflow/serving
GPU support
docker run --gpus all -t --rm \
-p 8501:8501 \
-v "/path/to/saved_model:/models/my_model" \
-e MODEL_NAME=my_model \
tensorflow/serving:latest-gpu
Making Predictions via REST API
REST API request
url = 'http://localhost:8501/v1/models/my_model:predict'
Prepare input data
data = np.random.random((1, 784)).astype(float)
payload = {
"instances": data.tolist()
}
response = requests.post(url, json=payload)
result = json.loads(response.text)
print("Predictions:", result['predictions'])
Check model info
info_url = 'http://localhost:8501/v1/models/my_model'
info_response = requests.get(info_url)
print("Model info:", json.loads(info_response.text))
gRPC Client
from tensorflow_serving.apis import predict_pb2
from tensorflow_serving.apis import prediction_service_pb2_grpc
Create gRPC channel
channel = grpc.insecure_channel('localhost:8500')
stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)
Build request
request = predict_pb2.PredictRequest()
request.model_spec.name = 'my_model'
request.model_spec.signature_name = 'serving_default'
Set input tensor
input_data = np.random.random((1, 784)).astype(np.float32)
request.inputs['input_layer'].CopyFrom(
tf.make_tensor_proto(input_data, shape=input_data.shape)
)
Run inference
response = stub.Predict(request, 10.0) # 10-second timeout
output = tf.make_ndarray(response.outputs['output'])
print("gRPC prediction:", output)
Version Management and Canary Deployment
model.config file
model_config_list {
config {
name: 'my_model'
base_path: '/models/my_model/'
model_platform: 'tensorflow'
model_version_policy {
specific {
versions: 1
versions: 2
}
}
version_labels {
key: 'stable'
value: 1
}
version_labels {
key: 'canary'
value: 2
}
}
}
Serve with config file
docker run -t --rm \
-p 8501:8501 -p 8500:8500 \
-v "/path/to/models:/models" \
-v "/path/to/model.config:/models/model.config" \
tensorflow/serving \
--model_config_file=/models/model.config
Request a specific version
url = 'http://localhost:8501/v1/models/my_model/versions/1:predict'
Or request by label
url_label = 'http://localhost:8501/v1/models/my_model/labels/stable:predict'
14. TensorFlow Extended (TFX)
TFX is a production machine learning pipeline platform built on TensorFlow.
ML Pipeline Overview
from tfx.components import (
CsvExampleGen,
StatisticsGen,
SchemaGen,
ExampleValidator,
Transform,
Trainer,
Evaluator,
Pusher
)
from tfx.proto import pusher_pb2, trainer_pb2
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
Interactive context (for notebooks)
context = InteractiveContext()
1. ExampleGen: Data ingestion
example_gen = CsvExampleGen(input_base='data/')
context.run(example_gen)
2. StatisticsGen: Data statistics
statistics_gen = StatisticsGen(
examples=example_gen.outputs['examples']
)
context.run(statistics_gen)
3. SchemaGen: Schema generation
schema_gen = SchemaGen(
statistics=statistics_gen.outputs['statistics'],
infer_feature_shape=True
)
context.run(schema_gen)
4. ExampleValidator: Data validation
example_validator = ExampleValidator(
statistics=statistics_gen.outputs['statistics'],
schema=schema_gen.outputs['schema']
)
context.run(example_validator)
5. Transform: Feature engineering
Requires preprocessing_fn defined in transform.py
6. Trainer: Model training
trainer = Trainer(
module_file='trainer_module.py',
examples=example_gen.outputs['examples'],
schema=schema_gen.outputs['schema'],
train_args=trainer_pb2.TrainArgs(num_steps=1000),
eval_args=trainer_pb2.EvalArgs(num_steps=500)
)
7. Pusher: Model deployment
pusher = Pusher(
model=trainer.outputs['model'],
push_destination=pusher_pb2.PushDestination(
filesystem=pusher_pb2.PushDestination.Filesystem(
base_directory='serving_model/'
)
)
)
Transform Component Example
transform.py
FEATURE_KEYS = ['feature1', 'feature2', 'feature3']
LABEL_KEY = 'label'
def preprocessing_fn(inputs):
"""Feature preprocessing function"""
outputs = {}
for key in FEATURE_KEYS:
Normalize with z-score
outputs[key] = tft.scale_to_z_score(inputs[key])
Encode label
outputs[LABEL_KEY] = tf.cast(inputs[LABEL_KEY], tf.int64)
return outputs
Conclusion
This guide has covered TensorFlow and Keras comprehensively, from core concepts to production deployment.
Key Takeaways
- **Eager Execution**: Default execution mode in TF 2.x; use `@tf.function` for graph optimization
- **Three Keras APIs**: Sequential (simple), Functional (complex topologies), Subclassing (full customization)
- **tf.data**: Essential tool for efficient data pipelines with map, filter, batch, shuffle, prefetch
- **GradientTape**: Custom training loops and automatic differentiation
- **Deployment Options**: TF Serving (server), TFLite (mobile/edge), TF.js (browser)
- **TFX**: The standard for production ML pipelines
Topics for Further Study
- TensorFlow Probability (probabilistic deep learning)
- Keras Tuner (hyperparameter optimization)
- TensorFlow Datasets (standard datasets)
- TensorFlow Hub (pre-trained models)
- tf-agents (reinforcement learning)
References
- [TensorFlow Official Documentation](https://www.tensorflow.org/api_docs/python/tf)
- [TensorFlow Tutorials](https://www.tensorflow.org/tutorials)
- [Keras API Documentation](https://keras.io/api/)
- [TensorFlow Keras Guide](https://www.tensorflow.org/guide/keras)
- [tf.data Guide](https://www.tensorflow.org/guide/data)
- [TensorBoard Guide](https://www.tensorflow.org/tensorboard)
- [TFX Guide](https://www.tensorflow.org/tfx)
- [TFLite Guide](https://www.tensorflow.org/lite)
- [TensorFlow.js](https://www.tensorflow.org/js)
현재 단락 (1/1153)
TensorFlow is a machine learning and deep learning framework open-sourced by Google Brain in 2015. I...