- Authors

- Name
- Youngju Kim
- @fjvbn20031
目次
- PyTorch 内部構造: ATen と Tensor 層
- Autograd エンジンと計算グラフ
- カスタム演算の実装
- torch.compile() と TorchInductor
- メモリ最適化技術
- 分散学習: DDP と FSDP
- 推論最適化
- デバッグツール
- クイズ
PyTorch 内部構造
ATen ライブラリ
PyTorch の核心は ATen (A Tensor Library) です。C++ ベースのテンソル演算ライブラリで、すべての PyTorch 演算の基盤実装です。
Python API (torch.*)
↓
TorchDispatch / Dispatcher
↓
ATen (C++ テンソル演算)
↓
CUDA / CPU / MPS バックエンド
ATen の主要コンポーネント:
- Tensor: storage、dtype、device、stride 情報を保持する多次元配列
- Storage: 実際のメモリブロック (テンソル間で共有可能)
- Dispatcher: 演算を適切なバックエンドへルーティング
import torch
x = torch.randn(3, 4)
print(x.storage()) # 実メモリブロック
print(x.stride()) # (4, 1) - 行優先レイアウト
print(x.storage_offset()) # 0
# View は storage を共有
y = x.view(2, 6)
print(x.storage().data_ptr() == y.storage().data_ptr()) # True
TorchDispatch
TorchDispatch は Python レベルで PyTorch 演算をインターセプトするメカニズムです。カスタムテンソル型の実装に活用されます。
import torch
from torch.utils._pytree import tree_map
class LoggingTensor(torch.Tensor):
@staticmethod
def __new__(cls, elem):
return torch.Tensor._make_subclass(cls, elem)
@classmethod
def __torch_dispatch__(cls, func, types, args=(), kwargs=None):
print(f"Calling: {func.__name__}")
kwargs = kwargs or {}
return func(*args, **kwargs)
x = LoggingTensor(torch.randn(3, 3))
y = x + x # 出力: Calling: add.Tensor
Autograd エンジン
計算グラフ (DAG)
PyTorch autograd は動的計算グラフ (Dynamic Computational Graph) を使用します。演算が実行されるたびに DAG (Directed Acyclic Graph) が構築されます。
import torch
x = torch.tensor(2.0, requires_grad=True) # leaf テンソル
y = x ** 2 # non-leaf、grad_fn=PowBackward0
z = y * 3 # non-leaf、grad_fn=MulBackward0
print(x.is_leaf) # True
print(y.is_leaf) # False
print(z.grad_fn) # MulBackward0
z.backward()
print(x.grad) # dz/dx = 3 * 2x = 12.0
Leaf テンソル vs Non-leaf テンソル:
- Leaf テンソル:
requires_grad=Trueでユーザーが直接作成したテンソル。.gradに勾配が累積される - Non-leaf テンソル: 演算の結果として生成されたテンソル。デフォルトでは
.gradが None (.retain_grad()の呼び出しが必要)
勾配累積メカニズム
import torch
model = torch.nn.Linear(10, 1)
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
# 大きなバッチサイズをシミュレートする勾配累積
ACCUMULATION_STEPS = 4
for i, (x, y) in enumerate(dataloader):
output = model(x)
loss = criterion(output, y) / ACCUMULATION_STEPS
loss.backward() # .grad に勾配が累積
if (i + 1) % ACCUMULATION_STEPS == 0:
optimizer.step()
optimizer.zero_grad() # 累積勾配をリセット
retain_graph と create_graph
x = torch.tensor(3.0, requires_grad=True)
y = x ** 3
# 高階微分: create_graph=True で勾配グラフを保持
grad_1 = torch.autograd.grad(y, x, create_graph=True)[0]
grad_2 = torch.autograd.grad(grad_1, x)[0]
print(grad_1) # 3x^2 = 27.0
print(grad_2) # 6x = 18.0
カスタム演算の実装
torch.autograd.Function
カスタムの forward/backward を定義する際に使用します。
import torch
class SigmoidFunction(torch.autograd.Function):
@staticmethod
def forward(ctx, x):
# backward で必要な値を ctx に保存
output = 1 / (1 + torch.exp(-x))
ctx.save_for_backward(output)
return output
@staticmethod
def backward(ctx, grad_output):
(output,) = ctx.saved_tensors
# sigmoid の微分: sigma(x) * (1 - sigma(x))
grad_input = grad_output * output * (1 - output)
return grad_input
# 使用例
x = torch.randn(4, requires_grad=True)
y = SigmoidFunction.apply(x)
y.sum().backward()
print(x.grad)
torch.library API (カスタム演算の登録)
import torch
from torch.library import Library, impl
my_lib = Library("my_ops", "DEF")
my_lib.define("relu_squared(Tensor x) -> Tensor")
@impl(my_lib, "relu_squared", "CPU")
def relu_squared_cpu(x):
return torch.relu(x) ** 2
@impl(my_lib, "relu_squared", "CUDA")
def relu_squared_cuda(x):
return torch.relu(x) ** 2
# カスタム演算を使用
x = torch.randn(5)
result = torch.ops.my_ops.relu_squared(x)
Triton を使ったカスタム CUDA カーネル
import triton
import triton.language as tl
import torch
@triton.jit
def relu_squared_kernel(
x_ptr, out_ptr,
n_elements,
BLOCK_SIZE: tl.constexpr,
):
pid = tl.program_id(0)
offsets = pid * BLOCK_SIZE + tl.arange(0, BLOCK_SIZE)
mask = offsets < n_elements
x = tl.load(x_ptr + offsets, mask=mask)
relu_x = tl.where(x > 0, x, 0.0)
out = relu_x * relu_x
tl.store(out_ptr + offsets, out, mask=mask)
def relu_squared_triton(x: torch.Tensor):
out = torch.empty_like(x)
n_elements = x.numel()
BLOCK_SIZE = 1024
grid = (triton.cdiv(n_elements, BLOCK_SIZE),)
relu_squared_kernel[grid](x, out, n_elements, BLOCK_SIZE)
return out
torch.compile()
Dynamo とグラフキャプチャ
torch.compile() は Python バイトコードを解析して計算グラフを抽出し、最適化します。
import torch
def model_forward(x, weight):
x = torch.nn.functional.relu(x @ weight)
return x.sum()
# fullgraph=True はグラフブレークを禁止
compiled_fn = torch.compile(model_forward, fullgraph=True, backend="inductor")
x = torch.randn(128, 256, device="cuda")
w = torch.randn(256, 512, device="cuda")
out = compiled_fn(x, w)
グラフブレークが発生する条件:
- テンソル値に依存する制御フロー (例:
if tensor.sum() > 0) - 外部ライブラリの呼び出し (numpy など)
- サポートされていない Python パターン
import torch._dynamo
torch._dynamo.config.verbose = True # グラフブレークのデバッグ
# fullgraph=False (デフォルト) はグラフブレークを許可
compiled = torch.compile(model, backend="inductor")
AOTAutograd と TorchInductor
torch.compile() パイプライン:
Python コード → Dynamo (グラフ抽出)
→ AOTAutograd (forward + backward の結合)
→ TorchInductor (カーネル生成)
→ Triton / C++ コード
import torch
import torch.nn as nn
class SimpleModel(nn.Module):
def __init__(self):
super().__init__()
self.fc1 = nn.Linear(512, 1024)
self.fc2 = nn.Linear(1024, 10)
def forward(self, x):
x = torch.relu(self.fc1(x))
return self.fc2(x)
model = SimpleModel().cuda()
# mode オプション: "default"、"reduce-overhead"、"max-autotune"
compiled_model = torch.compile(model, mode="max-autotune")
x = torch.randn(32, 512, device="cuda")
out = compiled_model(x)
メモリ最適化
Gradient Checkpointing
順伝播中の中間活性化を保存せず、逆伝播時に再計算してメモリを節約します。
import torch
import torch.utils.checkpoint as checkpoint
import torch.nn as nn
class CheckpointedBlock(nn.Module):
def __init__(self, dim):
super().__init__()
self.layers = nn.Sequential(
nn.Linear(dim, dim * 4),
nn.GELU(),
nn.Linear(dim * 4, dim),
)
def forward(self, x):
# 逆伝播時に layers を再実行してメモリを節約
return checkpoint.checkpoint(self.layers, x, use_reentrant=False)
model = nn.Sequential(*[CheckpointedBlock(512) for _ in range(24)]).cuda()
x = torch.randn(32, 512, device="cuda", requires_grad=True)
out = model(x)
out.sum().backward()
AMP (自動混合精度)
import torch
from torch.cuda.amp import autocast, GradScaler
model = SimpleModel().cuda()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
scaler = GradScaler() # FP16 アンダーフロー防止
for x, y in dataloader:
x, y = x.cuda(), y.cuda()
optimizer.zero_grad()
# autocast: 演算に応じて FP16/BF16 を自動適用
with autocast(dtype=torch.float16):
output = model(x)
loss = criterion(output, y)
# scaler: 損失をスケーリングして勾配を FP16 範囲内に保つ
scaler.scale(loss).backward()
scaler.unscale_(optimizer)
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
scaler.step(optimizer)
scaler.update()
メモリプロファイラー
import torch
from torch.profiler import profile, ProfilerActivity, record_function
with profile(
activities=[ProfilerActivity.CPU, ProfilerActivity.CUDA],
profile_memory=True,
record_shapes=True,
) as prof:
with record_function("model_inference"):
output = model(x)
print(prof.key_averages().table(
sort_by="cuda_memory_usage", row_limit=10
))
prof.export_chrome_trace("trace.json")
活性化のオフロード
# 活性化を CPU にオフロードして GPU メモリを解放
def offload_checkpoint(module, x):
"""活性化を CPU にオフロードし、逆伝播時に GPU に戻す"""
def forward_and_save(*inputs):
output = module(*inputs)
return output
return checkpoint.checkpoint(forward_and_save, x, use_reentrant=False)
分散学習
DDP (DistributedDataParallel)
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
import os
def setup(rank, world_size):
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "12355"
dist.init_process_group("nccl", rank=rank, world_size=world_size)
def cleanup():
dist.destroy_process_group()
def train(rank, world_size):
setup(rank, world_size)
torch.cuda.set_device(rank)
model = SimpleModel().cuda(rank)
ddp_model = DDP(model, device_ids=[rank])
optimizer = torch.optim.Adam(ddp_model.parameters())
for x, y in dataloader:
x, y = x.cuda(rank), y.cuda(rank)
output = ddp_model(x)
loss = criterion(output, y)
loss.backward() # gradient の all-reduce が自動実行
optimizer.step()
optimizer.zero_grad()
cleanup()
FSDP (Fully Sharded Data Parallel)
FSDP はパラメータ、勾配、オプティマイザの状態をすべての GPU に分散します。
import torch
from torch.distributed.fsdp import (
FullyShardedDataParallel as FSDP,
MixedPrecision,
ShardingStrategy,
)
from torch.distributed.fsdp.wrap import transformer_auto_wrap_policy
import functools
# Transformer ブロック単位で FSDP を適用
auto_wrap_policy = functools.partial(
transformer_auto_wrap_policy,
transformer_layer_cls={nn.TransformerEncoderLayer},
)
# 混合精度ポリシー
mp_policy = MixedPrecision(
param_dtype=torch.bfloat16,
reduce_dtype=torch.float32,
buffer_dtype=torch.bfloat16,
)
model = LargeTransformer().cuda()
fsdp_model = FSDP(
model,
auto_wrap_policy=auto_wrap_policy,
mixed_precision=mp_policy,
sharding_strategy=ShardingStrategy.FULL_SHARD, # ZeRO-3 相当
)
optimizer = torch.optim.AdamW(fsdp_model.parameters(), lr=1e-4)
FSDP と DDP のメモリ比較:
DDP は各 GPU がモデル全体のコピーを保持します。FSDP はパラメータを world_size で分割し、各 GPU のメモリ使用量を約 1/N に削減します。
DeepSpeed ZeRO 統合
# deepspeed_config.json:
# {
# "zero_optimization": {"stage": 3},
# "fp16": {"enabled": true},
# "gradient_accumulation_steps": 4
# }
import deepspeed
model_engine, optimizer, _, _ = deepspeed.initialize(
model=model,
model_parameters=model.parameters(),
config="deepspeed_config.json"
)
for x, y in dataloader:
output = model_engine(x)
loss = criterion(output, y)
model_engine.backward(loss)
model_engine.step()
推論最適化
torch.export() と ONNX
import torch
from torch.export import export
model = SimpleModel().eval()
x = torch.randn(1, 512)
# torch.export: 静的計算グラフを抽出
exported = export(model, (x,))
print(exported.graph)
# ONNX エクスポート
torch.onnx.export(
model, x,
"model.onnx",
input_names=["input"],
output_names=["output"],
dynamic_axes={"input": {0: "batch_size"}},
opset_version=17,
)
量子化対応学習 (QAT)
import torch
from torch.quantization import get_default_qat_qconfig, prepare_qat, convert
model = SimpleModel()
model.qconfig = get_default_qat_qconfig("fbgemm")
# フェイク量子化を挿入
model_prepared = prepare_qat(model.train())
# 通常通り学習
for x, y in dataloader:
output = model_prepared(x)
loss = criterion(output, y)
loss.backward()
optimizer.step()
# INT8 モデルへ変換
model_int8 = convert(model_prepared.eval())
デバッグツール
torch.profiler
import torch
from torch.profiler import profile, ProfilerActivity, schedule
with profile(
activities=[ProfilerActivity.CPU, ProfilerActivity.CUDA],
schedule=schedule(wait=1, warmup=1, active=3, repeat=2),
on_trace_ready=torch.profiler.tensorboard_trace_handler("./log"),
record_shapes=True,
profile_memory=True,
with_stack=True,
) as prof:
for step, (x, y) in enumerate(dataloader):
output = model(x.cuda())
loss = criterion(output, y.cuda())
loss.backward()
optimizer.step()
optimizer.zero_grad()
prof.step()
異常検知 (Anomaly Detection)
# NaN/Inf 勾配をスタックトレース付きで検知
with torch.autograd.detect_anomaly():
output = model(x)
loss = output.sum()
loss.backward() # NaN 発生時にスタックトレースを出力
grad_fn のトレース
def trace_grad_fn(tensor, depth=0):
if tensor.grad_fn is None:
print(" " * depth + f"Leaf: {tensor.shape}")
return
print(" " * depth + f"{tensor.grad_fn.__class__.__name__}: {tensor.shape}")
for inp, _ in tensor.grad_fn.next_functions:
if inp is not None:
trace_grad_fn(inp.variable if hasattr(inp, 'variable') else inp, depth + 1)
x = torch.randn(3, requires_grad=True)
y = torch.randn(3, requires_grad=True)
z = (x * y).sum()
trace_grad_fn(z)
クイズ
Q1. PyTorch autograd における leaf テンソルと non-leaf テンソルの違い、および勾配累積の仕組みは?
答え: leaf テンソルはユーザーが直接作成し requires_grad=True を持つテンソルです。.grad 属性に勾配が累積されます。
解説: non-leaf テンソルは演算の結果として生成されたテンソルで、デフォルトでは勾配が保存されません (メモリ節約のため)。retain_grad() を呼び出すと non-leaf テンソルの勾配も保持できます。勾配累積は、zero_grad() を呼ばずに backward() を複数回呼び出すと .grad に加算される仕組みを利用します。これを活用して GPU メモリを増やさずに実効バッチサイズを仮想的に大きくできます。
Q2. torch.compile() の Dynamo が Python バイトコードをトレースする方法と、グラフブレークの発生条件は?
答え: Dynamo は PEP 523 の API を用いて Python フレーム評価をインターセプトし、バイトコードをシンボリックにトレースします。サポートされないパターンでグラフブレークが発生します。
解説: Dynamo は CPython のフレーム評価フックを使用してバイトコードを追跡します。グラフブレークは次の場合に発生します: テンソル値に依存する制御フロー (例: if tensor.sum() > 0)、サポートされていない外部ライブラリの呼び出し、C 拡張の使用など。各ブレーク地点で Dynamo はそこまでのグラフをコンパイルし、残りは通常の Python として実行します。fullgraph=True を設定するとグラフブレークをエラーとして扱います。
Q3. パラメータシャーディングの観点から、FSDP が DDP よりメモリ効率が高い理由は?
答え: FSDP はパラメータ、勾配、オプティマイザの状態すべてを world_size 個の GPU に分散し、各 GPU のメモリを約 1/N に削減します。
解説: DDP は各 GPU がモデルパラメータ全体を複製して保持します。100 億パラメータのモデルは FP32 で GPU あたり約 40 GB 必要です。FSDP (FULL_SHARD 戦略、ZeRO-3 相当) は forward/backward で必要なパラメータのみ all-gather で収集し、使用後すぐに解放します。8 GPU 環境では GPU あたりのメモリ使用量が約 1/8 に削減され、単一 GPU に収まらない巨大モデルの学習が可能になります。
Q4. Gradient checkpointing における forward pass の再実行トレードオフは?
答え: メモリ使用量を O(sqrt(N)) に削減できる代わりに、逆伝播時間が約 33% 増加する演算とメモリのトレードオフです。
解説: 通常の逆伝播はすべての forward 活性化を保存するため O(N) メモリが必要です。Gradient checkpointing はチェックポイント境界の活性化のみを保存し、逆伝播時にその区間の forward を再実行します。Transformer でレイヤーごとにチェックポイントを設定すると、メモリはレイヤー数に比例するのではなくその平方根に比例します。再計算のオーバーヘッドは全学習時間を約 30〜40% 増加させますが、バッチサイズを増やせることで実効スループットが改善する場合があります。
Q5. AMP の GradScaler が FP16 学習でアンダーフローを防止する方法は?
答え: GradScaler は損失に大きなスケール値を掛けて backward を実行し、勾配を FP16 の表現可能な範囲に収め、オプティマイザ更新前にスケールを逆適用します。
解説: FP16 の最小正規化値は約 6e-5 で、小さい勾配はゼロにアンダーフローします。GradScaler は損失にスケール係数 (初期値 65536 など) を掛けることで勾配が比例して大きくなり、FP16 でも表現できます。scaler.unscale_() で勾配を元のスケールに戻します。Inf/NaN が検出されるとそのステップをスキップしてスケールを半分にします。BF16 は FP32 と同じ指数範囲を持つため GradScaler は不要です。