- Authors

- Name
- Youngju Kim
- @fjvbn20031
- はじめに
- 1. 環境構築
- 2. テンソルの基礎
- 3. 自動微分(Autograd)
- 4. nn.Module — ニューラルネットワークの基盤
- 5. 線形回帰をゼロから実装
- 6. 多層パーセプトロン(MLP)— MNIST分類
- 7. 畳み込みニューラルネットワーク(CNN)— CIFAR-10分類
- 8. 再帰ニューラルネットワーク(RNN / LSTM)— 系列データ
- 9. Transformer — マルチヘッドアテンションをゼロから実装
- 10. データ読み込み — DatasetとDataLoader
- 11. オプティマイザー — SGD、Adam、AdamW
- 12. 学習率スケジューラー
- 13. 正則化 — Dropout、BatchNorm、LayerNorm
- 14. 転移学習
- 15. モデルの保存と読み込み
- 16. TorchScriptとモデルのデプロイ
- 17. 分散学習(DDP)— DistributedDataParallel
- 18. 高度なテクニック
- おわりに
はじめに
ディープラーニングの2大フレームワーク、TensorFlowとPyTorchの中で、PyTorchは研究者とエンジニアの双方に最も好まれる選択肢となっています。2016年にFacebook AI Research(現Meta AI)によってリリースされたPyTorchは、学術論文の実装における標準となり、現在では産業界での採用においてもTensorFlowを上回っています。
このガイドは基本的なPythonの知識を持つ読者を対象とし、PyTorchとの初めての接触から分散学習まですべてを順を追って解説します。各セクションには実行可能なコード例と公式ドキュメントへのリンクが含まれており、読みながら同時に実践することができます。
公式ドキュメント: https://pytorch.org/docs/stable/index.html 公式チュートリアル: https://pytorch.org/tutorials/
1. 環境構築
PyTorchのインストール
PyTorchはpipまたはcondaでインストールできます。GPUを使用する場合は、CUDAバージョンに対応するパッケージを選択してください。
pipインストール(CUDA 12.1):
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121
condaインストール:
conda install pytorch torchvision torchaudio pytorch-cuda=12.1 -c pytorch -c nvidia
CPUのみのインストール:
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cpu
GPU使用可能かどうかの確認
import torch
# PyTorchのバージョン確認
print(f"PyTorch version: {torch.__version__}")
# CUDA使用可能かどうかの確認
print(f"CUDA available: {torch.cuda.is_available()}")
# GPU数と情報
if torch.cuda.is_available():
print(f"GPU count: {torch.cuda.device_count()}")
print(f"Current GPU: {torch.cuda.get_device_name(0)}")
print(f"GPU memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")
# Apple Silicon (M1/M2/M3) MPSチェック
print(f"MPS available: {torch.backends.mps.is_available()}")
# デバイスの自動選択
if torch.cuda.is_available():
device = torch.device("cuda")
elif torch.backends.mps.is_available():
device = torch.device("mps")
else:
device = torch.device("cpu")
print(f"Using device: {device}")
2. テンソルの基礎
テンソルはPyTorchの基本的なデータ構造です。NumPyのndarrayに似ていますが、GPU計算と自動微分をサポートしています。
テンソルの作成
import torch
import numpy as np
# データから直接作成
t1 = torch.tensor([1, 2, 3, 4, 5])
print(f"1D tensor: {t1}, shape: {t1.shape}, dtype: {t1.dtype}")
# 2Dテンソル(行列)
t2 = torch.tensor([[1.0, 2.0, 3.0],
[4.0, 5.0, 6.0]])
print(f"2D tensor:\n{t2}, shape: {t2.shape}")
# 特殊なテンソルの作成
zeros = torch.zeros(3, 4) # すべてゼロ
ones = torch.ones(2, 3) # すべて1
rand = torch.rand(3, 3) # 一様分布 [0, 1)
randn = torch.randn(3, 3) # 標準正規分布
eye = torch.eye(4) # 単位行列
arange = torch.arange(0, 10, 2) # [0, 2, 4, 6, 8]
linspace = torch.linspace(0, 1, 5) # 5つの等間隔の値
# 既存テンソルと同じ形状で作成
t3 = torch.zeros_like(t2)
t4 = torch.ones_like(t2)
t5 = torch.rand_like(t2)
# NumPy配列から(共有メモリ)
np_arr = np.array([1.0, 2.0, 3.0])
t_from_np = torch.from_numpy(np_arr)
# テンソルをNumPyに変換(CPUのみ)
np_from_t = t1.numpy()
テンソルの属性と型変換
t = torch.rand(3, 4, 5)
print(f"shape: {t.shape}") # torch.Size([3, 4, 5])
print(f"ndim: {t.ndim}") # 3
print(f"dtype: {t.dtype}") # torch.float32
print(f"device: {t.device}") # cpu
print(f"numel: {t.numel()}") # 60 (総要素数)
# 型変換
t_int = t.to(torch.int32)
t_long = t.long() # torch.int64
t_float = t.float() # torch.float32
t_double = t.double() # torch.float64
t_half = t.half() # torch.float16
# GPUへ移動
if torch.cuda.is_available():
t_gpu = t.to("cuda")
t_back = t_gpu.cpu() # CPUに戻す
テンソルのリシェイプ
t = torch.arange(24) # 1Dテンソル 0..23
t_2d = t.reshape(4, 6)
t_3d = t.reshape(2, 3, 4)
t_auto = t.reshape(6, -1) # -1はサイズを推定(6x4)
# view: reshapeと同様だが連続したメモリが必要
t_view = t.view(3, 8)
# squeeze / unsqueeze
t = torch.zeros(1, 3, 1, 4)
t_sq = t.squeeze() # サイズ1の次元を削除 → [3, 4]
t_sq1 = t.squeeze(0) # dim 0のみ削除 → [3, 1, 4]
t_unsq = t_sq.unsqueeze(0) # dim 0に次元を追加 → [1, 3, 4]
# transpose / permute
t = torch.rand(2, 3, 4)
t_T = t.transpose(0, 1) # [3, 2, 4]
t_perm = t.permute(2, 0, 1) # [4, 2, 3]
t_cont = t_perm.contiguous() # 連続メモリを保証
テンソルの演算
a = torch.tensor([[1.0, 2.0], [3.0, 4.0]])
b = torch.tensor([[5.0, 6.0], [7.0, 8.0]])
# 要素ごとの算術演算
print(a + b) # torch.add(a, b)
print(a - b) # torch.sub(a, b)
print(a * b) # アダマール積
print(a / b)
print(a ** 2)
# 行列の乗算
matmul = a @ b # または torch.matmul(a, b)
mm = torch.mm(a, b) # 2Dのみ
# リダクション
t = torch.rand(3, 4)
print(t.sum())
print(t.mean())
print(t.max())
print(t.std())
print(t.sum(dim=0)) # 行方向の合計
print(t.sum(dim=1, keepdim=True))
# argmax / argmin
print(t.argmax())
print(t.argmax(dim=1))
ブロードキャスト
a = torch.tensor([[1, 2, 3],
[4, 5, 6]]) # shape: [2, 3]
b = torch.tensor([10, 20, 30]) # shape: [3]
# bは[2, 3]にブロードキャストされる
print(a + b)
# tensor([[11, 22, 33],
# [14, 25, 36]])
# 列ベクトル + 行ベクトル
col = torch.tensor([[1], [2], [3]]) # [3, 1]
row = torch.tensor([10, 20, 30]) # [3]
print(col + row) # [3, 3] 外積和
インデックスとスライス
t = torch.arange(24).reshape(2, 3, 4).float()
print(t[0]) # 最初の行列 [3, 4]
print(t[0, 1]) # 2行目 [4]
print(t[0, 1, 2]) # スカラー
print(t[:, 1:, :2]) # スライス
# ファンシーインデックス
indices = torch.tensor([0, 2])
print(t[:, indices, :])
# ブールマスク
mask = t > 10
print(t[mask]) # 10より大きい要素の1Dテンソル
# torch.where
a = torch.tensor([1.0, 2.0, 3.0, 4.0])
b = torch.tensor([10.0, 20.0, 30.0, 40.0])
print(torch.where(a > 2, b, a)) # tensor([ 1., 2., 30., 40.])
3. 自動微分(Autograd)
Autogradは計算グラフを自動的に構築し、逆伝播による勾配を計算します。これはすべてのニューラルネットワーク訓練の基盤となるエンジンです。
requires_gradと計算グラフ
import torch
x = torch.tensor(3.0, requires_grad=True)
y = torch.tensor(4.0, requires_grad=True)
z = x ** 2 + 2 * x * y + y ** 2 # (x + y)^2 = 49
z.backward()
# dz/dx = 2x + 2y = 2*3 + 2*4 = 14
print(f"dz/dx = {x.grad}") # 14.0
print(f"dz/dy = {y.grad}") # 14.0
多次元テンソルの勾配
x = torch.tensor([1.0, 2.0, 3.0], requires_grad=True)
y = x ** 2
z = y.sum()
z.backward()
print(f"x.grad: {x.grad}") # [2, 4, 6] (dz/dx = 2x)
# gradient引数を使った非スカラーのbackward
x = torch.tensor([1.0, 2.0, 3.0], requires_grad=True)
y = x ** 2
grad_output = torch.ones(3)
y.backward(gradient=grad_output)
print(f"x.grad: {x.grad}") # [2, 4, 6]
勾配の制御
x = torch.tensor(2.0, requires_grad=True)
for i in range(3):
y = x ** 2
y.backward()
print(f"iteration {i}: x.grad = {x.grad}")
x.grad.zero_() # 重要: ステップごとに勾配をリセット
# no_grad: 推論時に勾配追跡を無効化
with torch.no_grad():
y = x ** 2
print(f"y.requires_grad: {y.requires_grad}") # False
# detach: テンソルをグラフから切り離す
x = torch.tensor([1.0, 2.0], requires_grad=True)
z = (x * 2).detach()
print(f"z.requires_grad: {z.requires_grad}") # False
高次微分
x = torch.tensor(3.0, requires_grad=True)
y = x ** 4
# 1階微分: dy/dx = 4x^3
dy_dx = torch.autograd.grad(y, x, create_graph=True)[0]
print(f"First derivative: {dy_dx}") # 108
# 2階微分: d2y/dx2 = 12x^2
d2y_dx2 = torch.autograd.grad(dy_dx, x)[0]
print(f"Second derivative: {d2y_dx2}") # 108
4. nn.Module — ニューラルネットワークの基盤
torch.nn.ModuleはすべてのPyTorchモデルの基底クラスです。すべての層、活性化関数、および完全なモデルはこれを継承します。
公式ドキュメント: https://pytorch.org/docs/stable/nn.html
import torch
import torch.nn as nn
class SimpleModel(nn.Module):
def __init__(self, in_features, hidden_size, out_features):
super().__init__()
# 層は自動的にパラメータとして登録される
self.fc1 = nn.Linear(in_features, hidden_size)
self.relu = nn.ReLU()
self.fc2 = nn.Linear(hidden_size, out_features)
self.dropout = nn.Dropout(p=0.5)
def forward(self, x):
x = self.fc1(x)
x = self.relu(x)
x = self.dropout(x)
x = self.fc2(x)
return x
model = SimpleModel(784, 256, 10)
print(model)
# パラメータ数のカウント
total = sum(p.numel() for p in model.parameters())
trainable = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Total params: {total:,}")
print(f"Trainable params: {trainable:,}")
# 名前付きパラメータのイテレート
for name, param in model.named_parameters():
print(f"{name}: {param.shape}")
# フォワードパス
x = torch.randn(32, 784)
output = model(x)
print(f"Output shape: {output.shape}") # [32, 10]
Sequential、ModuleList、ModuleDict
# Sequential: 順番に層をスタック
seq_model = nn.Sequential(
nn.Linear(784, 256),
nn.ReLU(),
nn.Dropout(0.3),
nn.Linear(256, 128),
nn.ReLU(),
nn.Linear(128, 10)
)
# ModuleList: 層をリストとして管理
class ResidualNet(nn.Module):
def __init__(self, num_blocks, hidden_size):
super().__init__()
self.layers = nn.ModuleList([
nn.Linear(hidden_size, hidden_size)
for _ in range(num_blocks)
])
self.relu = nn.ReLU()
def forward(self, x):
for layer in self.layers:
x = self.relu(layer(x)) + x # 残差接続
return x
# ModuleDict: 層を辞書として管理
class MultiTaskModel(nn.Module):
def __init__(self):
super().__init__()
self.backbone = nn.Linear(784, 256)
self.heads = nn.ModuleDict({
'classification': nn.Linear(256, 10),
'regression': nn.Linear(256, 1)
})
def forward(self, x, task='classification'):
features = torch.relu(self.backbone(x))
return self.heads[task](features)
5. 線形回帰をゼロから実装
線形回帰は最もシンプルなディープラーニングモデルです。ゼロから構築することでトレーニングループの理解が深まります。
import torch
import torch.nn as nn
import torch.optim as optim
torch.manual_seed(42)
n_samples = 200
# 合成データの生成: y = 3x + 2 + ノイズ
X = torch.linspace(-5, 5, n_samples).unsqueeze(1)
y_true = 3 * X + 2
y = y_true + torch.randn_like(y_true) * 0.5
class LinearRegression(nn.Module):
def __init__(self):
super().__init__()
self.linear = nn.Linear(1, 1)
def forward(self, x):
return self.linear(x)
model = LinearRegression()
criterion = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=0.01)
n_epochs = 1000
losses = []
for epoch in range(n_epochs):
# 1. フォワードパス
y_pred = model(X)
# 2. 損失の計算
loss = criterion(y_pred, y)
losses.append(loss.item())
# 3. 勾配のゼロ化(重要!)
optimizer.zero_grad()
# 4. バックワードパス
loss.backward()
# 5. パラメータの更新
optimizer.step()
if (epoch + 1) % 200 == 0:
w = model.linear.weight.item()
b = model.linear.bias.item()
print(f"Epoch {epoch+1}: Loss={loss.item():.4f}, w={w:.4f}, b={b:.4f}")
print(f"\nLearned weight: {model.linear.weight.item():.4f} (true: 3.0)")
print(f"Learned bias: {model.linear.bias.item():.4f} (true: 2.0)")
6. 多層パーセプトロン(MLP)— MNIST分類
MNISTの手書き数字データセットで完全な分類モデルを構築します。
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
BATCH_SIZE = 64
LEARNING_RATE = 0.001
N_EPOCHS = 10
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
])
train_dataset = datasets.MNIST('./data', train=True, download=True, transform=transform)
test_dataset = datasets.MNIST('./data', train=False, transform=transform)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=2)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=2)
class MLP(nn.Module):
def __init__(self):
super().__init__()
self.network = nn.Sequential(
nn.Flatten(),
nn.Linear(784, 512),
nn.BatchNorm1d(512),
nn.ReLU(),
nn.Dropout(0.3),
nn.Linear(512, 256),
nn.BatchNorm1d(256),
nn.ReLU(),
nn.Dropout(0.2),
nn.Linear(256, 10)
)
def forward(self, x):
return self.network(x)
model = MLP().to(DEVICE)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
def train_epoch(model, loader, criterion, optimizer, device):
model.train()
total_loss = 0
correct = 0
total = 0
for data, target in loader:
data, target = data.to(device), target.to(device)
optimizer.zero_grad()
output = model(data)
loss = criterion(output, target)
loss.backward()
optimizer.step()
total_loss += loss.item()
pred = output.argmax(dim=1)
correct += pred.eq(target).sum().item()
total += target.size(0)
return total_loss / len(loader), 100.0 * correct / total
def evaluate(model, loader, criterion, device):
model.eval()
total_loss = 0
correct = 0
total = 0
with torch.no_grad():
for data, target in loader:
data, target = data.to(device), target.to(device)
output = model(data)
total_loss += criterion(output, target).item()
pred = output.argmax(dim=1)
correct += pred.eq(target).sum().item()
total += target.size(0)
return total_loss / len(loader), 100.0 * correct / total
for epoch in range(N_EPOCHS):
train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, DEVICE)
test_loss, test_acc = evaluate(model, test_loader, criterion, DEVICE)
print(f"Epoch {epoch+1}/{N_EPOCHS} | "
f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}% | "
f"Test Loss: {test_loss:.4f}, Test Acc: {test_acc:.2f}%")
7. 畳み込みニューラルネットワーク(CNN)— CIFAR-10分類
CIFAR-10での画像分類のためのVGGスタイルCNNの実装。
import torch
import torch.nn as nn
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
transform_train = transforms.Compose([
transforms.RandomCrop(32, padding=4),
transforms.RandomHorizontalFlip(),
transforms.ColorJitter(brightness=0.2, contrast=0.2),
transforms.ToTensor(),
transforms.Normalize((0.4914, 0.4822, 0.4465),
(0.2023, 0.1994, 0.2010))
])
transform_test = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.4914, 0.4822, 0.4465),
(0.2023, 0.1994, 0.2010))
])
train_data = datasets.CIFAR10('./data', train=True, download=True, transform=transform_train)
test_data = datasets.CIFAR10('./data', train=False, transform=transform_test)
train_loader = DataLoader(train_data, batch_size=128, shuffle=True, num_workers=4)
test_loader = DataLoader(test_data, batch_size=128, shuffle=False, num_workers=4)
CLASSES = ['airplane', 'automobile', 'bird', 'cat', 'deer',
'dog', 'frog', 'horse', 'ship', 'truck']
class CNN(nn.Module):
def __init__(self, num_classes=10):
super().__init__()
self.features = nn.Sequential(
# ブロック1: 3 → 64, 32x32 → 16x16
nn.Conv2d(3, 64, kernel_size=3, padding=1),
nn.BatchNorm2d(64),
nn.ReLU(inplace=True),
nn.Conv2d(64, 64, kernel_size=3, padding=1),
nn.BatchNorm2d(64),
nn.ReLU(inplace=True),
nn.MaxPool2d(2, 2),
nn.Dropout2d(0.1),
# ブロック2: 64 → 128, 16x16 → 8x8
nn.Conv2d(64, 128, kernel_size=3, padding=1),
nn.BatchNorm2d(128),
nn.ReLU(inplace=True),
nn.Conv2d(128, 128, kernel_size=3, padding=1),
nn.BatchNorm2d(128),
nn.ReLU(inplace=True),
nn.MaxPool2d(2, 2),
nn.Dropout2d(0.2),
# ブロック3: 128 → 256, 8x8 → 4x4
nn.Conv2d(128, 256, kernel_size=3, padding=1),
nn.BatchNorm2d(256),
nn.ReLU(inplace=True),
nn.Conv2d(256, 256, kernel_size=3, padding=1),
nn.BatchNorm2d(256),
nn.ReLU(inplace=True),
nn.MaxPool2d(2, 2),
)
self.classifier = nn.Sequential(
nn.Flatten(),
nn.Linear(256 * 4 * 4, 1024),
nn.ReLU(inplace=True),
nn.Dropout(0.5),
nn.Linear(1024, 512),
nn.ReLU(inplace=True),
nn.Dropout(0.3),
nn.Linear(512, num_classes)
)
def forward(self, x):
x = self.features(x)
x = self.classifier(x)
return x
model = CNN().to(DEVICE)
print(f"Model parameters: {sum(p.numel() for p in model.parameters()):,}")
8. 再帰ニューラルネットワーク(RNN / LSTM)— 系列データ
RNNとLSTMは時系列データと自然言語処理タスクに優れています。
import torch
import torch.nn as nn
import numpy as np
class LSTMPredictor(nn.Module):
def __init__(self, input_size=1, hidden_size=64, num_layers=2,
output_size=1, dropout=0.2):
super().__init__()
self.hidden_size = hidden_size
self.num_layers = num_layers
self.lstm = nn.LSTM(
input_size = input_size,
hidden_size = hidden_size,
num_layers = num_layers,
batch_first = True, # 入力: [batch, seq_len, features]
dropout = dropout if num_layers > 1 else 0,
bidirectional = False
)
self.fc = nn.Sequential(
nn.Linear(hidden_size, 32),
nn.ReLU(),
nn.Linear(32, output_size)
)
def forward(self, x):
batch_size = x.size(0)
h0 = torch.zeros(self.num_layers, batch_size, self.hidden_size).to(x.device)
c0 = torch.zeros(self.num_layers, batch_size, self.hidden_size).to(x.device)
# out: [batch_size, seq_len, hidden_size]
out, (hn, cn) = self.lstm(x, (h0, c0))
# 最後のタイムステップのみ使用
out = self.fc(out[:, -1, :])
return out
# サイン波データセットの生成
t = np.linspace(0, 100, 1000)
data = np.sin(0.5 * t) + 0.1 * np.random.randn(1000)
data = torch.FloatTensor(data).unsqueeze(1)
def create_sequences(data, seq_len=50):
X, y = [], []
for i in range(len(data) - seq_len):
X.append(data[i:i+seq_len])
y.append(data[i+seq_len])
return torch.stack(X), torch.stack(y)
X, y = create_sequences(data, seq_len=50)
print(f"X shape: {X.shape}") # [950, 50, 1]
print(f"y shape: {y.shape}") # [950, 1]
# GRU — LSTMよりパラメータが少ない
class GRUPredictor(nn.Module):
def __init__(self, input_size=1, hidden_size=64, num_layers=2):
super().__init__()
self.gru = nn.GRU(input_size, hidden_size, num_layers,
batch_first=True, dropout=0.2)
self.fc = nn.Linear(hidden_size, 1)
def forward(self, x):
out, _ = self.gru(x)
return self.fc(out[:, -1, :])
9. Transformer — マルチヘッドアテンションをゼロから実装
「Attention Is All You Need」論文の主要コンポーネントを実装します。
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
class MultiHeadAttention(nn.Module):
def __init__(self, d_model=512, num_heads=8, dropout=0.1):
super().__init__()
assert d_model % num_heads == 0
self.d_model = d_model
self.num_heads = num_heads
self.d_k = d_model // num_heads
self.W_q = nn.Linear(d_model, d_model)
self.W_k = nn.Linear(d_model, d_model)
self.W_v = nn.Linear(d_model, d_model)
self.W_o = nn.Linear(d_model, d_model)
self.dropout = nn.Dropout(dropout)
self.scale = math.sqrt(self.d_k)
def split_heads(self, x):
# [batch, seq, d_model] → [batch, num_heads, seq, d_k]
batch, seq, _ = x.shape
x = x.view(batch, seq, self.num_heads, self.d_k)
return x.transpose(1, 2)
def forward(self, query, key, value, mask=None):
batch_size = query.size(0)
Q = self.split_heads(self.W_q(query))
K = self.split_heads(self.W_k(key))
V = self.split_heads(self.W_v(value))
scores = torch.matmul(Q, K.transpose(-2, -1)) / self.scale
if mask is not None:
scores = scores.masked_fill(mask == 0, float('-inf'))
attn_weights = F.softmax(scores, dim=-1)
attn_weights = self.dropout(attn_weights)
context = torch.matmul(attn_weights, V)
context = context.transpose(1, 2).contiguous()
context = context.view(batch_size, -1, self.d_model)
output = self.W_o(context)
return output, attn_weights
class FeedForward(nn.Module):
def __init__(self, d_model=512, d_ff=2048, dropout=0.1):
super().__init__()
self.net = nn.Sequential(
nn.Linear(d_model, d_ff),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(d_ff, d_model)
)
def forward(self, x):
return self.net(x)
class TransformerEncoderLayer(nn.Module):
def __init__(self, d_model=512, num_heads=8, d_ff=2048, dropout=0.1):
super().__init__()
self.self_attn = MultiHeadAttention(d_model, num_heads, dropout)
self.ff = FeedForward(d_model, d_ff, dropout)
self.norm1 = nn.LayerNorm(d_model)
self.norm2 = nn.LayerNorm(d_model)
self.dropout = nn.Dropout(dropout)
def forward(self, x, mask=None):
attn_out, _ = self.self_attn(x, x, x, mask)
x = self.norm1(x + self.dropout(attn_out))
ff_out = self.ff(x)
x = self.norm2(x + self.dropout(ff_out))
return x
class PositionalEncoding(nn.Module):
def __init__(self, d_model=512, max_len=5000, dropout=0.1):
super().__init__()
self.dropout = nn.Dropout(dropout)
pe = torch.zeros(max_len, d_model)
position = torch.arange(0, max_len).unsqueeze(1).float()
div_term = torch.exp(
torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)
)
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
pe = pe.unsqueeze(0)
self.register_buffer('pe', pe)
def forward(self, x):
x = x + self.pe[:, :x.size(1)]
return self.dropout(x)
# 使用例
d_model = 512
encoder_layer = TransformerEncoderLayer(d_model=d_model, num_heads=8)
pos_enc = PositionalEncoding(d_model=d_model)
x = torch.randn(2, 10, d_model)
x = pos_enc(x)
output = encoder_layer(x)
print(f"Transformer Encoder output: {output.shape}") # [2, 10, 512]
10. データ読み込み — DatasetとDataLoader
効率的なデータパイプラインは訓練速度と柔軟性に直結します。
公式チュートリアル: https://pytorch.org/tutorials/beginner/basics/intro.html
import torch
from torch.utils.data import Dataset, DataLoader
import pandas as pd
import numpy as np
from PIL import Image
import os
# カスタム画像Dataset
class CustomImageDataset(Dataset):
def __init__(self, csv_file, img_dir, transform=None):
self.annotations = pd.read_csv(csv_file)
self.img_dir = img_dir
self.transform = transform
def __len__(self):
return len(self.annotations)
def __getitem__(self, idx):
img_path = os.path.join(self.img_dir, self.annotations.iloc[idx, 0])
image = Image.open(img_path).convert('RGB')
label = int(self.annotations.iloc[idx, 1])
if self.transform:
image = self.transform(image)
return image, label
# 表形式データセット
class TabularDataset(Dataset):
def __init__(self, X, y):
self.X = torch.FloatTensor(X)
self.y = torch.LongTensor(y)
def __len__(self):
return len(self.X)
def __getitem__(self, idx):
return self.X[idx], self.y[idx]
dataset = TabularDataset(
X=np.random.randn(1000, 20),
y=np.random.randint(0, 5, 1000)
)
# 高度なDataLoaderの設定
advanced_loader = DataLoader(
dataset,
batch_size = 64,
shuffle = True,
num_workers = 4, # 並列データ読み込み
pin_memory = True, # GPU転送の高速化
drop_last = True, # 不完全な最後のバッチを破棄
prefetch_factor = 2,
persistent_workers = True
)
for batch_X, batch_y in advanced_loader:
print(f"batch X: {batch_X.shape}") # [64, 20]
print(f"batch y: {batch_y.shape}") # [64]
break
# WeightedRandomSampler: クラス不均衡への対処
from torch.utils.data import WeightedRandomSampler
class_counts = [800, 150, 50]
weights = 1.0 / torch.tensor(class_counts, dtype=torch.float)
sample_weights = weights[dataset.y]
sampler = WeightedRandomSampler(
weights = sample_weights,
num_samples = len(dataset),
replacement = True
)
balanced_loader = DataLoader(dataset, batch_size=32, sampler=sampler)
11. オプティマイザー — SGD、Adam、AdamW
import torch.optim as optim
model = nn.Linear(100, 10)
# SGD(モメンタムと重み減衰付き)
sgd = optim.SGD(
model.parameters(),
lr = 0.01,
momentum = 0.9,
weight_decay = 1e-4,
nesterov = True
)
# Adam: 適応学習率
adam = optim.Adam(
model.parameters(),
lr = 0.001,
betas = (0.9, 0.999),
eps = 1e-8,
weight_decay = 0
)
# AdamW: 正しい分離した重み減衰(Transformerに推奨)
adamw = optim.AdamW(
model.parameters(),
lr = 1e-3,
betas = (0.9, 0.999),
weight_decay = 0.01
)
# パラメータごとの学習率(転移学習に有用)
optimizer = optim.Adam([
{'params': model.features.parameters(), 'lr': 1e-4},
{'params': model.classifier.parameters(), 'lr': 1e-3},
], lr=1e-3)
# オプティマイザーの状態の保存と復元
checkpoint = {
'model': model.state_dict(),
'optimizer': optimizer.state_dict(),
'epoch': 10
}
torch.save(checkpoint, 'checkpoint.pt')
ckpt = torch.load('checkpoint.pt')
model.load_state_dict(ckpt['model'])
optimizer.load_state_dict(ckpt['optimizer'])
12. 学習率スケジューラー
スケジューラーを使うと、固定学習率と比べてほぼ常に最終性能が向上します。
from torch.optim.lr_scheduler import (
StepLR, CosineAnnealingLR, OneCycleLR, ReduceLROnPlateau,
CosineAnnealingWarmRestarts
)
model = nn.Linear(10, 2)
optimizer = optim.SGD(model.parameters(), lr=0.1)
# StepLR: step_sizeエポックごとにLRをgamma倍する
step_scheduler = StepLR(optimizer, step_size=30, gamma=0.1)
# CosineAnnealingLR: コサイン減衰
cosine_scheduler = CosineAnnealingLR(optimizer, T_max=100, eta_min=1e-6)
# ReduceLROnPlateau: メトリクスの改善が止まったら減少
plateau_scheduler = ReduceLROnPlateau(
optimizer,
mode = 'min',
factor = 0.5,
patience = 10,
min_lr = 1e-7,
verbose = True
)
# OneCycleLR: 超収束
one_cycle = OneCycleLR(
optimizer,
max_lr = 0.01,
steps_per_epoch = 100,
epochs = 30,
pct_start = 0.3,
anneal_strategy = 'cos'
)
# CosineAnnealingWarmRestarts: 周期的リスタート
warm_restart = CosineAnnealingWarmRestarts(
optimizer,
T_0 = 10,
T_mult = 2,
eta_min = 1e-6
)
# トレーニングループでの使用
for epoch in range(100):
train_loss = 0.5 # 実際のトレーニングから
cosine_scheduler.step()
plateau_scheduler.step(train_loss) # メトリクスを渡す
print(f"Epoch {epoch+1}: LR = {optimizer.param_groups[0]['lr']:.6f}")
13. 正則化 — Dropout、BatchNorm、LayerNorm
正則化は過学習を防ぎ、トレーニングを安定させます。
import torch.nn as nn
# Dropout: トレーニング中にランダムにニューロンをゼロにする
class DropoutDemo(nn.Module):
def __init__(self):
super().__init__()
self.fc1 = nn.Linear(100, 50)
self.dropout = nn.Dropout(p=0.5)
self.fc2 = nn.Linear(50, 10)
def forward(self, x):
x = torch.relu(self.fc1(x))
x = self.dropout(x) # train()中は有効、eval()中は無効
return self.fc2(x)
# BatchNorm1d: バッチ次元で正規化(全結合層用)
bn_model = nn.Sequential(
nn.Linear(100, 64),
nn.BatchNorm1d(64),
nn.ReLU(),
nn.Linear(64, 32),
nn.BatchNorm1d(32),
nn.ReLU(),
nn.Linear(32, 10)
)
# BatchNorm2d: 2D特徴マップ用(Conv層の後)
cnn_bn = nn.Sequential(
nn.Conv2d(3, 32, 3, padding=1),
nn.BatchNorm2d(32),
nn.ReLU(),
)
# LayerNorm: 特徴次元で正規化(Transformerに推奨)
transformer_norm = nn.Sequential(
nn.Linear(512, 512),
nn.LayerNorm(512),
nn.ReLU()
)
# GroupNorm: BatchNormとLayerNormの中間
group_norm = nn.GroupNorm(num_groups=8, num_channels=64)
# InstanceNorm: スタイル転送で使用
instance_norm = nn.InstanceNorm2d(64)
# まとめ:
# BatchNorm → CNN、バッチレベルの統計、バッチサイズに依存
# LayerNorm → Transformer / RNN、特徴レベルの統計
# GroupNorm → BatchNormが不安定な小さいバッチ
# InstanceNorm → スタイル転送、画像生成
14. 転移学習
ImageNetで事前学習されたモデルを活用して、限られたデータで高い性能を実現します。
import torchvision.models as models
import torch.nn as nn
# 事前学習済みモデルの読み込み
resnet50 = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V2)
efficientnet = models.efficientnet_b0(weights=models.EfficientNet_B0_Weights.IMAGENET1K_V1)
vit = models.vit_b_16(weights=models.ViT_B_16_Weights.IMAGENET1K_V1)
# 戦略1: 特徴抽出器(バックボーンを凍結)
for param in resnet50.parameters():
param.requires_grad = False
num_classes = 5
resnet50.fc = nn.Linear(resnet50.fc.in_features, num_classes)
trainable = sum(p.numel() for p in resnet50.parameters() if p.requires_grad)
print(f"Trainable params: {trainable:,}") # ~2,050
# 戦略2: 層ごとの学習率でファインチューニング
resnet_ft = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V2)
resnet_ft.fc = nn.Linear(resnet_ft.fc.in_features, num_classes)
optimizer = torch.optim.AdamW([
{'params': resnet_ft.layer1.parameters(), 'lr': 1e-5},
{'params': resnet_ft.layer2.parameters(), 'lr': 1e-5},
{'params': resnet_ft.layer3.parameters(), 'lr': 1e-4},
{'params': resnet_ft.layer4.parameters(), 'lr': 1e-4},
{'params': resnet_ft.fc.parameters(), 'lr': 1e-3},
], lr=1e-4, weight_decay=0.01)
# 前処理用のImageNet正規化
from torchvision import transforms
preprocess = transforms.Compose([
transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406],
std =[0.229, 0.224, 0.225])
])
15. モデルの保存と読み込み
import torch
import torch.nn as nn
model = nn.Linear(10, 5)
optimizer = torch.optim.Adam(model.parameters())
# オプション1: state_dict(推奨)
torch.save(model.state_dict(), 'model_weights.pt')
loaded_model = nn.Linear(10, 5)
loaded_model.load_state_dict(torch.load('model_weights.pt', weights_only=True))
loaded_model.eval()
# オプション2: フルモデル(非推奨 — 移植性が低い)
torch.save(model, 'full_model.pt')
# オプション3: チェックポイント — 完全なトレーニング状態を保存
def save_checkpoint(model, optimizer, scheduler, epoch, loss, path):
torch.save({
'epoch': epoch,
'model_state_dict': model.state_dict(),
'optimizer_state_dict': optimizer.state_dict(),
'scheduler_state_dict': scheduler.state_dict() if scheduler else None,
'loss': loss,
}, path)
def load_checkpoint(path, model, optimizer=None, scheduler=None):
ckpt = torch.load(path, map_location='cpu', weights_only=True)
model.load_state_dict(ckpt['model_state_dict'])
if optimizer:
optimizer.load_state_dict(ckpt['optimizer_state_dict'])
if scheduler and ckpt['scheduler_state_dict']:
scheduler.load_state_dict(ckpt['scheduler_state_dict'])
return ckpt['epoch'], ckpt['loss']
# GPUモデルをCPUに読み込む
model_cpu = nn.Linear(10, 5)
model_cpu.load_state_dict(
torch.load('model_weights.pt', map_location='cpu', weights_only=True)
)
16. TorchScriptとモデルのデプロイ
訓練済みモデルを本番環境にデプロイします。
import torch
import torch.nn as nn
class SimpleNet(nn.Module):
def __init__(self):
super().__init__()
self.fc = nn.Linear(10, 5)
def forward(self, x):
return torch.relu(self.fc(x))
model = SimpleNet()
model.eval()
# オプション1: torch.jit.script — モデル全体をコンパイル
scripted_model = torch.jit.script(model)
scripted_model.save('model_scripted.pt')
loaded_scripted = torch.jit.load('model_scripted.pt')
x = torch.randn(4, 10)
with torch.no_grad():
out = loaded_scripted(x)
print(f"TorchScript output: {out.shape}")
# オプション2: torch.jit.trace — 入力例でトレース
example_input = torch.randn(1, 10)
traced_model = torch.jit.trace(model, example_input)
traced_model.save('model_traced.pt')
# オプション3: ONNXエクスポート(クロスフレームワーク互換性)
dummy_input = torch.randn(1, 10)
torch.onnx.export(
model,
dummy_input,
'model.onnx',
export_params = True,
opset_version = 17,
input_names = ['input'],
output_names = ['output'],
dynamic_axes = {
'input': {0: 'batch_size'},
'output': {0: 'batch_size'}
}
)
print("ONNX export complete")
# オプション4: torch.compile (PyTorch 2.0+)
compiled_model = torch.compile(model)
out = compiled_model(x)
print(f"torch.compile output: {out.shape}")
17. 分散学習(DDP)— DistributedDataParallel
複数のGPUを使ってトレーニングを大幅に高速化します。
公式チュートリアル: https://pytorch.org/tutorials/intermediate/ddp_tutorial.html
# train_ddp.py — スタンドアロンスクリプトとして実行
import os
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler
from torchvision import datasets, transforms
def setup(rank, world_size):
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
dist.init_process_group(
backend = 'nccl',
rank = rank,
world_size = world_size
)
def cleanup():
dist.destroy_process_group()
class SimpleModel(nn.Module):
def __init__(self):
super().__init__()
self.net = nn.Sequential(
nn.Linear(784, 256),
nn.ReLU(),
nn.Linear(256, 10)
)
def forward(self, x):
return self.net(x.view(x.size(0), -1))
def train(rank, world_size, num_epochs=5):
print(f"Process {rank}/{world_size} starting")
setup(rank, world_size)
torch.cuda.set_device(rank)
device = torch.device(f'cuda:{rank}')
# DDPでモデルをラップ
model = SimpleModel().to(device)
ddp_model = DDP(model, device_ids=[rank])
transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
])
dataset = datasets.MNIST('./data', train=True, download=True, transform=transform)
# DistributedSamplerで各プロセスに固有のデータシャードを割り当て
sampler = DistributedSampler(
dataset,
num_replicas = world_size,
rank = rank,
shuffle = True
)
loader = DataLoader(
dataset,
batch_size = 128,
sampler = sampler,
num_workers = 4,
pin_memory = True
)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(ddp_model.parameters(), lr=0.001)
for epoch in range(num_epochs):
sampler.set_epoch(epoch) # エポックごとに異なるシャッフル
ddp_model.train()
total_loss = 0.0
for data, target in loader:
data, target = data.to(device), target.to(device)
optimizer.zero_grad()
output = ddp_model(data)
loss = criterion(output, target)
loss.backward() # 勾配は自動的にall-reduceされる
optimizer.step()
total_loss += loss.item()
if rank == 0:
avg_loss = total_loss / len(loader)
print(f"Epoch {epoch+1}: avg loss = {avg_loss:.4f}")
cleanup()
if __name__ == '__main__':
import torch.multiprocessing as mp
world_size = torch.cuda.device_count()
mp.spawn(train, args=(world_size, 5), nprocs=world_size, join=True)
torchrunでの起動
# シングルノード、4 GPU
torchrun --nproc_per_node=4 train_ddp.py
# マルチノード(2ノードのうちノード0)
torchrun --nnodes=2 --nproc_per_node=4 \
--node_rank=0 \
--master_addr="192.168.1.100" \
--master_port=12355 \
train_ddp.py
DataParallelとDistributedDataParallelの比較
# DataParallel (DP): シンプルだが非効率
# - すべての勾配がGPU 0を通る → ボトルネック
# - マルチスレッド(マルチプロセスではない)
model_dp = nn.DataParallel(model, device_ids=[0, 1, 2, 3])
# DistributedDataParallel (DDP): 推奨
# - 各GPUが独立して勾配を計算
# - 効率的なall-reduce同期
# - シングルGPUでもDPより高速(Python GILを回避)
model_ddp = DDP(model, device_ids=[rank])
18. 高度なテクニック
混合精度トレーニング
from torch.cuda.amp import autocast, GradScaler
model = SimpleModel().to('cuda')
optimizer = torch.optim.Adam(model.parameters())
scaler = GradScaler()
for data, target in train_loader:
data, target = data.to('cuda'), target.to('cuda')
optimizer.zero_grad()
# FP16でのフォワードパス
with autocast():
output = model(data)
loss = criterion(output, target)
# スケールされたバックワードパス
scaler.scale(loss).backward()
scaler.step(optimizer)
scaler.update()
勾配クリッピング
# 勾配爆発を防ぐ
max_grad_norm = 1.0
optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), max_grad_norm)
optimizer.step()
再現性
import random
import numpy as np
def set_seed(seed=42):
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
set_seed(42)
おわりに
このガイドでは、PyTorchのコアコンセプトをゼロから、本番環境での分散学習まで網羅しました。推奨学習ロードマップ:
- 基礎: テンソル操作、Autograd、シンプルなモデル実装
- 中級: CNN、RNN、転移学習、DataLoaderの最適化
- 上級: Transformer、DDP、混合精度トレーニング
- デプロイ: TorchScript、ONNX、torch.compile
PyTorchエコシステムは継続的に進化しています。最新の機能とアップデートについては、公式ドキュメントとPyTorchブログをご確認ください。