Skip to content
Published on

PyTorch 内部構造 & 高度な最適化: autograd、torch.compile、FSDP、Tritonまで

Authors

目次

  1. PyTorch 内部構造: ATen と Tensor 層
  2. Autograd エンジンと計算グラフ
  3. カスタム演算の実装
  4. torch.compile() と TorchInductor
  5. メモリ最適化技術
  6. 分散学習: DDP と FSDP
  7. 推論最適化
  8. デバッグツール
  9. クイズ

PyTorch 内部構造

ATen ライブラリ

PyTorch の核心は ATen (A Tensor Library) です。C++ ベースのテンソル演算ライブラリで、すべての PyTorch 演算の基盤実装です。

Python API (torch.*)
TorchDispatch / Dispatcher
ATen (C++ テンソル演算)
CUDA / CPU / MPS バックエンド

ATen の主要コンポーネント:

  • Tensor: storage、dtype、device、stride 情報を保持する多次元配列
  • Storage: 実際のメモリブロック (テンソル間で共有可能)
  • Dispatcher: 演算を適切なバックエンドへルーティング
import torch

x = torch.randn(3, 4)
print(x.storage())          # 実メモリブロック
print(x.stride())           # (4, 1) - 行優先レイアウト
print(x.storage_offset())   # 0

# View は storage を共有
y = x.view(2, 6)
print(x.storage().data_ptr() == y.storage().data_ptr())  # True

TorchDispatch

TorchDispatch は Python レベルで PyTorch 演算をインターセプトするメカニズムです。カスタムテンソル型の実装に活用されます。

import torch
from torch.utils._pytree import tree_map

class LoggingTensor(torch.Tensor):
    @staticmethod
    def __new__(cls, elem):
        return torch.Tensor._make_subclass(cls, elem)

    @classmethod
    def __torch_dispatch__(cls, func, types, args=(), kwargs=None):
        print(f"Calling: {func.__name__}")
        kwargs = kwargs or {}
        return func(*args, **kwargs)

x = LoggingTensor(torch.randn(3, 3))
y = x + x  # 出力: Calling: add.Tensor

Autograd エンジン

計算グラフ (DAG)

PyTorch autograd は動的計算グラフ (Dynamic Computational Graph) を使用します。演算が実行されるたびに DAG (Directed Acyclic Graph) が構築されます。

import torch

x = torch.tensor(2.0, requires_grad=True)  # leaf テンソル
y = x ** 2          # non-leaf、grad_fn=PowBackward0
z = y * 3           # non-leaf、grad_fn=MulBackward0

print(x.is_leaf)    # True
print(y.is_leaf)    # False
print(z.grad_fn)    # MulBackward0

z.backward()
print(x.grad)       # dz/dx = 3 * 2x = 12.0

Leaf テンソル vs Non-leaf テンソル:

  • Leaf テンソル: requires_grad=True でユーザーが直接作成したテンソル。.grad に勾配が累積される
  • Non-leaf テンソル: 演算の結果として生成されたテンソル。デフォルトでは .grad が None (.retain_grad() の呼び出しが必要)

勾配累積メカニズム

import torch

model = torch.nn.Linear(10, 1)
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

# 大きなバッチサイズをシミュレートする勾配累積
ACCUMULATION_STEPS = 4
for i, (x, y) in enumerate(dataloader):
    output = model(x)
    loss = criterion(output, y) / ACCUMULATION_STEPS
    loss.backward()  # .grad に勾配が累積

    if (i + 1) % ACCUMULATION_STEPS == 0:
        optimizer.step()
        optimizer.zero_grad()  # 累積勾配をリセット

retain_graph と create_graph

x = torch.tensor(3.0, requires_grad=True)
y = x ** 3

# 高階微分: create_graph=True で勾配グラフを保持
grad_1 = torch.autograd.grad(y, x, create_graph=True)[0]
grad_2 = torch.autograd.grad(grad_1, x)[0]

print(grad_1)  # 3x^2 = 27.0
print(grad_2)  # 6x   = 18.0

カスタム演算の実装

torch.autograd.Function

カスタムの forward/backward を定義する際に使用します。

import torch

class SigmoidFunction(torch.autograd.Function):
    @staticmethod
    def forward(ctx, x):
        # backward で必要な値を ctx に保存
        output = 1 / (1 + torch.exp(-x))
        ctx.save_for_backward(output)
        return output

    @staticmethod
    def backward(ctx, grad_output):
        (output,) = ctx.saved_tensors
        # sigmoid の微分: sigma(x) * (1 - sigma(x))
        grad_input = grad_output * output * (1 - output)
        return grad_input

# 使用例
x = torch.randn(4, requires_grad=True)
y = SigmoidFunction.apply(x)
y.sum().backward()
print(x.grad)

torch.library API (カスタム演算の登録)

import torch
from torch.library import Library, impl

my_lib = Library("my_ops", "DEF")
my_lib.define("relu_squared(Tensor x) -> Tensor")

@impl(my_lib, "relu_squared", "CPU")
def relu_squared_cpu(x):
    return torch.relu(x) ** 2

@impl(my_lib, "relu_squared", "CUDA")
def relu_squared_cuda(x):
    return torch.relu(x) ** 2

# カスタム演算を使用
x = torch.randn(5)
result = torch.ops.my_ops.relu_squared(x)

Triton を使ったカスタム CUDA カーネル

import triton
import triton.language as tl
import torch

@triton.jit
def relu_squared_kernel(
    x_ptr, out_ptr,
    n_elements,
    BLOCK_SIZE: tl.constexpr,
):
    pid = tl.program_id(0)
    offsets = pid * BLOCK_SIZE + tl.arange(0, BLOCK_SIZE)
    mask = offsets < n_elements

    x = tl.load(x_ptr + offsets, mask=mask)
    relu_x = tl.where(x > 0, x, 0.0)
    out = relu_x * relu_x

    tl.store(out_ptr + offsets, out, mask=mask)

def relu_squared_triton(x: torch.Tensor):
    out = torch.empty_like(x)
    n_elements = x.numel()
    BLOCK_SIZE = 1024
    grid = (triton.cdiv(n_elements, BLOCK_SIZE),)
    relu_squared_kernel[grid](x, out, n_elements, BLOCK_SIZE)
    return out

torch.compile()

Dynamo とグラフキャプチャ

torch.compile() は Python バイトコードを解析して計算グラフを抽出し、最適化します。

import torch

def model_forward(x, weight):
    x = torch.nn.functional.relu(x @ weight)
    return x.sum()

# fullgraph=True はグラフブレークを禁止
compiled_fn = torch.compile(model_forward, fullgraph=True, backend="inductor")

x = torch.randn(128, 256, device="cuda")
w = torch.randn(256, 512, device="cuda")
out = compiled_fn(x, w)

グラフブレークが発生する条件:

  • テンソル値に依存する制御フロー (例: if tensor.sum() > 0)
  • 外部ライブラリの呼び出し (numpy など)
  • サポートされていない Python パターン
import torch._dynamo
torch._dynamo.config.verbose = True  # グラフブレークのデバッグ

# fullgraph=False (デフォルト) はグラフブレークを許可
compiled = torch.compile(model, backend="inductor")

AOTAutograd と TorchInductor

torch.compile() パイプライン:
Python コード → Dynamo (グラフ抽出)
AOTAutograd (forward + backward の結合)
TorchInductor (カーネル生成)
Triton / C++ コード
import torch
import torch.nn as nn

class SimpleModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(512, 1024)
        self.fc2 = nn.Linear(1024, 10)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        return self.fc2(x)

model = SimpleModel().cuda()

# mode オプション: "default"、"reduce-overhead"、"max-autotune"
compiled_model = torch.compile(model, mode="max-autotune")

x = torch.randn(32, 512, device="cuda")
out = compiled_model(x)

メモリ最適化

Gradient Checkpointing

順伝播中の中間活性化を保存せず、逆伝播時に再計算してメモリを節約します。

import torch
import torch.utils.checkpoint as checkpoint
import torch.nn as nn

class CheckpointedBlock(nn.Module):
    def __init__(self, dim):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(dim, dim * 4),
            nn.GELU(),
            nn.Linear(dim * 4, dim),
        )

    def forward(self, x):
        # 逆伝播時に layers を再実行してメモリを節約
        return checkpoint.checkpoint(self.layers, x, use_reentrant=False)

model = nn.Sequential(*[CheckpointedBlock(512) for _ in range(24)]).cuda()
x = torch.randn(32, 512, device="cuda", requires_grad=True)
out = model(x)
out.sum().backward()

AMP (自動混合精度)

import torch
from torch.cuda.amp import autocast, GradScaler

model = SimpleModel().cuda()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
scaler = GradScaler()  # FP16 アンダーフロー防止

for x, y in dataloader:
    x, y = x.cuda(), y.cuda()
    optimizer.zero_grad()

    # autocast: 演算に応じて FP16/BF16 を自動適用
    with autocast(dtype=torch.float16):
        output = model(x)
        loss = criterion(output, y)

    # scaler: 損失をスケーリングして勾配を FP16 範囲内に保つ
    scaler.scale(loss).backward()
    scaler.unscale_(optimizer)
    torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
    scaler.step(optimizer)
    scaler.update()

メモリプロファイラー

import torch
from torch.profiler import profile, ProfilerActivity, record_function

with profile(
    activities=[ProfilerActivity.CPU, ProfilerActivity.CUDA],
    profile_memory=True,
    record_shapes=True,
) as prof:
    with record_function("model_inference"):
        output = model(x)

print(prof.key_averages().table(
    sort_by="cuda_memory_usage", row_limit=10
))
prof.export_chrome_trace("trace.json")

活性化のオフロード

# 活性化を CPU にオフロードして GPU メモリを解放
def offload_checkpoint(module, x):
    """活性化を CPU にオフロードし、逆伝播時に GPU に戻す"""
    def forward_and_save(*inputs):
        output = module(*inputs)
        return output

    return checkpoint.checkpoint(forward_and_save, x, use_reentrant=False)

分散学習

DDP (DistributedDataParallel)

import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
import os

def setup(rank, world_size):
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "12355"
    dist.init_process_group("nccl", rank=rank, world_size=world_size)

def cleanup():
    dist.destroy_process_group()

def train(rank, world_size):
    setup(rank, world_size)
    torch.cuda.set_device(rank)

    model = SimpleModel().cuda(rank)
    ddp_model = DDP(model, device_ids=[rank])

    optimizer = torch.optim.Adam(ddp_model.parameters())

    for x, y in dataloader:
        x, y = x.cuda(rank), y.cuda(rank)
        output = ddp_model(x)
        loss = criterion(output, y)
        loss.backward()     # gradient の all-reduce が自動実行
        optimizer.step()
        optimizer.zero_grad()

    cleanup()

FSDP (Fully Sharded Data Parallel)

FSDP はパラメータ、勾配、オプティマイザの状態をすべての GPU に分散します。

import torch
from torch.distributed.fsdp import (
    FullyShardedDataParallel as FSDP,
    MixedPrecision,
    ShardingStrategy,
)
from torch.distributed.fsdp.wrap import transformer_auto_wrap_policy
import functools

# Transformer ブロック単位で FSDP を適用
auto_wrap_policy = functools.partial(
    transformer_auto_wrap_policy,
    transformer_layer_cls={nn.TransformerEncoderLayer},
)

# 混合精度ポリシー
mp_policy = MixedPrecision(
    param_dtype=torch.bfloat16,
    reduce_dtype=torch.float32,
    buffer_dtype=torch.bfloat16,
)

model = LargeTransformer().cuda()
fsdp_model = FSDP(
    model,
    auto_wrap_policy=auto_wrap_policy,
    mixed_precision=mp_policy,
    sharding_strategy=ShardingStrategy.FULL_SHARD,  # ZeRO-3 相当
)

optimizer = torch.optim.AdamW(fsdp_model.parameters(), lr=1e-4)

FSDP と DDP のメモリ比較:

DDP は各 GPU がモデル全体のコピーを保持します。FSDP はパラメータを world_size で分割し、各 GPU のメモリ使用量を約 1/N に削減します。

DeepSpeed ZeRO 統合

# deepspeed_config.json:
# {
#   "zero_optimization": {"stage": 3},
#   "fp16": {"enabled": true},
#   "gradient_accumulation_steps": 4
# }

import deepspeed

model_engine, optimizer, _, _ = deepspeed.initialize(
    model=model,
    model_parameters=model.parameters(),
    config="deepspeed_config.json"
)

for x, y in dataloader:
    output = model_engine(x)
    loss = criterion(output, y)
    model_engine.backward(loss)
    model_engine.step()

推論最適化

torch.export() と ONNX

import torch
from torch.export import export

model = SimpleModel().eval()
x = torch.randn(1, 512)

# torch.export: 静的計算グラフを抽出
exported = export(model, (x,))
print(exported.graph)

# ONNX エクスポート
torch.onnx.export(
    model, x,
    "model.onnx",
    input_names=["input"],
    output_names=["output"],
    dynamic_axes={"input": {0: "batch_size"}},
    opset_version=17,
)

量子化対応学習 (QAT)

import torch
from torch.quantization import get_default_qat_qconfig, prepare_qat, convert

model = SimpleModel()
model.qconfig = get_default_qat_qconfig("fbgemm")

# フェイク量子化を挿入
model_prepared = prepare_qat(model.train())

# 通常通り学習
for x, y in dataloader:
    output = model_prepared(x)
    loss = criterion(output, y)
    loss.backward()
    optimizer.step()

# INT8 モデルへ変換
model_int8 = convert(model_prepared.eval())

デバッグツール

torch.profiler

import torch
from torch.profiler import profile, ProfilerActivity, schedule

with profile(
    activities=[ProfilerActivity.CPU, ProfilerActivity.CUDA],
    schedule=schedule(wait=1, warmup=1, active=3, repeat=2),
    on_trace_ready=torch.profiler.tensorboard_trace_handler("./log"),
    record_shapes=True,
    profile_memory=True,
    with_stack=True,
) as prof:
    for step, (x, y) in enumerate(dataloader):
        output = model(x.cuda())
        loss = criterion(output, y.cuda())
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
        prof.step()

異常検知 (Anomaly Detection)

# NaN/Inf 勾配をスタックトレース付きで検知
with torch.autograd.detect_anomaly():
    output = model(x)
    loss = output.sum()
    loss.backward()  # NaN 発生時にスタックトレースを出力

grad_fn のトレース

def trace_grad_fn(tensor, depth=0):
    if tensor.grad_fn is None:
        print("  " * depth + f"Leaf: {tensor.shape}")
        return
    print("  " * depth + f"{tensor.grad_fn.__class__.__name__}: {tensor.shape}")
    for inp, _ in tensor.grad_fn.next_functions:
        if inp is not None:
            trace_grad_fn(inp.variable if hasattr(inp, 'variable') else inp, depth + 1)

x = torch.randn(3, requires_grad=True)
y = torch.randn(3, requires_grad=True)
z = (x * y).sum()
trace_grad_fn(z)

クイズ

Q1. PyTorch autograd における leaf テンソルと non-leaf テンソルの違い、および勾配累積の仕組みは?

答え: leaf テンソルはユーザーが直接作成し requires_grad=True を持つテンソルです。.grad 属性に勾配が累積されます。

解説: non-leaf テンソルは演算の結果として生成されたテンソルで、デフォルトでは勾配が保存されません (メモリ節約のため)。retain_grad() を呼び出すと non-leaf テンソルの勾配も保持できます。勾配累積は、zero_grad() を呼ばずに backward() を複数回呼び出すと .grad に加算される仕組みを利用します。これを活用して GPU メモリを増やさずに実効バッチサイズを仮想的に大きくできます。

Q2. torch.compile() の Dynamo が Python バイトコードをトレースする方法と、グラフブレークの発生条件は?

答え: Dynamo は PEP 523 の API を用いて Python フレーム評価をインターセプトし、バイトコードをシンボリックにトレースします。サポートされないパターンでグラフブレークが発生します。

解説: Dynamo は CPython のフレーム評価フックを使用してバイトコードを追跡します。グラフブレークは次の場合に発生します: テンソル値に依存する制御フロー (例: if tensor.sum() > 0)、サポートされていない外部ライブラリの呼び出し、C 拡張の使用など。各ブレーク地点で Dynamo はそこまでのグラフをコンパイルし、残りは通常の Python として実行します。fullgraph=True を設定するとグラフブレークをエラーとして扱います。

Q3. パラメータシャーディングの観点から、FSDP が DDP よりメモリ効率が高い理由は?

答え: FSDP はパラメータ、勾配、オプティマイザの状態すべてを world_size 個の GPU に分散し、各 GPU のメモリを約 1/N に削減します。

解説: DDP は各 GPU がモデルパラメータ全体を複製して保持します。100 億パラメータのモデルは FP32 で GPU あたり約 40 GB 必要です。FSDP (FULL_SHARD 戦略、ZeRO-3 相当) は forward/backward で必要なパラメータのみ all-gather で収集し、使用後すぐに解放します。8 GPU 環境では GPU あたりのメモリ使用量が約 1/8 に削減され、単一 GPU に収まらない巨大モデルの学習が可能になります。

Q4. Gradient checkpointing における forward pass の再実行トレードオフは?

答え: メモリ使用量を O(sqrt(N)) に削減できる代わりに、逆伝播時間が約 33% 増加する演算とメモリのトレードオフです。

解説: 通常の逆伝播はすべての forward 活性化を保存するため O(N) メモリが必要です。Gradient checkpointing はチェックポイント境界の活性化のみを保存し、逆伝播時にその区間の forward を再実行します。Transformer でレイヤーごとにチェックポイントを設定すると、メモリはレイヤー数に比例するのではなくその平方根に比例します。再計算のオーバーヘッドは全学習時間を約 30〜40% 増加させますが、バッチサイズを増やせることで実効スループットが改善する場合があります。

Q5. AMP の GradScaler が FP16 学習でアンダーフローを防止する方法は?

答え: GradScaler は損失に大きなスケール値を掛けて backward を実行し、勾配を FP16 の表現可能な範囲に収め、オプティマイザ更新前にスケールを逆適用します。

解説: FP16 の最小正規化値は約 6e-5 で、小さい勾配はゼロにアンダーフローします。GradScaler は損失にスケール係数 (初期値 65536 など) を掛けることで勾配が比例して大きくなり、FP16 でも表現できます。scaler.unscale_() で勾配を元のスケールに戻します。Inf/NaN が検出されるとそのステップをスキップしてスケールを半分にします。BF16 は FP32 と同じ指数範囲を持つため GradScaler は不要です。