Mastering Generative AI: GAN, VAE, and Diffusion Models

Introduction

Generative AI is currently the hottest area in the technology world. DALL-E 3 turns a single line of text into photorealistic images, Stable Diffusion generates works of art, and Sora creates video.

Behind all of these breakthroughs are generative models developed over decades. From VAEs (variational autoencoders) and GANs (generative adversarial networks) to the diffusion models that dominate today, let's master the ideas, the mathematics, and the practical implementation of each.


1. Overview of Generative Models

1.1 Generative vs. Discriminative Models

Deep learning models broadly fall into two categories.

Discriminative model: learns the conditional probability P(y|x) of a label y given input data x. Image classification and object detection fall into this category.

Generative model: learns the data distribution P(x) or the joint distribution P(x, y). Once trained, it can generate new data samples.

The central question of generative modeling: "How was this data generated, and how can we draw new samples from the same distribution?"

1.2 The Latent Space

Most generative models make use of a low-dimensional representation space called the **latent space**.

For example, a 28x28 MNIST image (784 dimensions) can be compressed into a latent vector of 2-100 dimensions. In this latent space:

  • The digits '3' and '8' lie close to each other
  • Decoding a vector halfway between them yields a shape between '3' and '8'
  • Walking through the latent space (interpolation) produces continuous transformations
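The interpolation idea above is just a weighted average of two latent vectors. A minimal sketch with made-up 2-D latent vectors (the specific numbers are illustrative):

```python
import numpy as np

# Hypothetical 2-D latent vectors for two digits
z1 = np.array([0.0, 2.0])
z2 = np.array([4.0, 0.0])

# Walk from z1 to z2 in 5 evenly spaced steps
path = [(1 - a) * z1 + a * z2 for a in np.linspace(0, 1, 5)]
print(path[2])  # [2. 1.] -- the midpoint between the two vectors
```

Decoding each point on the path is what produces the continuous morph described above.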

1.3 Applications of Generative Models

  • Image generation: realistic faces, landscapes, artwork
  • Image-to-image translation: day photo → night photo, sketch → photo
  • Data augmentation: supplementing scarce training data
  • Drug discovery: generating new molecular structures
  • Text generation: GPT-family language models
  • Music/speech synthesis: composing new music, TTS

2. Autoencoders

We begin with the autoencoder, the foundation needed to understand VAEs.

2.1 Encoder-Decoder Structure

An autoencoder consists of two parts.

  • Encoder: high-dimensional input x → low-dimensional latent vector z
  • Decoder: low-dimensional latent vector z → reconstructed output x'

Goal: train so that x' is as close to x as possible (minimize the reconstruction loss).

import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader

class Autoencoder(nn.Module):
    """기본 오토인코더 (MNIST용)"""
    def __init__(self, input_dim=784, latent_dim=32):
        super().__init__()

        # Encoder
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Linear(128, latent_dim),
            nn.ReLU()
        )

        # Decoder
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 256),
            nn.ReLU(),
            nn.Linear(256, input_dim),
            nn.Sigmoid()  # constrain pixel values to [0, 1]
        )

    def forward(self, x):
        # Flatten
        x = x.view(x.size(0), -1)
        z = self.encoder(x)
        x_reconstructed = self.decoder(z)
        return x_reconstructed, z

    def encode(self, x):
        x = x.view(x.size(0), -1)
        return self.encoder(x)

    def decode(self, z):
        return self.decoder(z).view(-1, 1, 28, 28)


def train_autoencoder(epochs=20):
    """오토인코더 학습"""
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # MNIST dataset
    transform = transforms.Compose([
        transforms.ToTensor(),
    ])
    train_dataset = datasets.MNIST('./data', train=True, download=True, transform=transform)
    train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)

    model = Autoencoder().to(device)
    optimizer = optim.Adam(model.parameters(), lr=1e-3)
    criterion = nn.BCELoss()  # Binary Cross-Entropy

    for epoch in range(epochs):
        total_loss = 0
        for batch_idx, (data, _) in enumerate(train_loader):
            data = data.to(device)

            optimizer.zero_grad()
            reconstructed, z = model(data)
            target = data.view(data.size(0), -1)
            loss = criterion(reconstructed, target)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        avg_loss = total_loss / len(train_loader)
        print(f"Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.4f}")

    return model

2.2 Limitations of the Autoencoder

A basic autoencoder compresses data well but struggles to generate new samples. Its latent space is not regular, so decoding an arbitrary latent vector can produce a meaningless image.

The VAE solves this problem.


3. Variational Autoencoders (VAE)

The VAE (Variational Autoencoder) was proposed by Kingma and Welling in their landmark 2013 paper (arXiv:1312.6114).

3.1 The Core Idea of VAE

The core of the VAE is learning a probability distribution over the latent space.

  • Basic autoencoder: z = encoder(x) (a deterministic point)
  • VAE: z ~ N(μ, σ²) (sampled from a Gaussian distribution)

Instead of a latent vector z, the encoder now outputs **the parameters of a distribution (mean μ, variance σ²)**.

To generate a new image, we sample z from the standard normal distribution N(0, I) and feed it to the decoder.

3.2 ELBO (Evidence Lower Bound)

The VAE's training objective is to maximize the data log-likelihood log P(x). Since this is hard to optimize directly, we maximize the ELBO instead.

log P(x) >= E_q[log P(x|z)] - KL[q(z|x) || P(z)]
                    ↑                    ↑
          reconstruction term       KL divergence

ELBO = reconstruction term - KL divergence, so maximizing the ELBO amounts to minimizing the training loss: reconstruction loss + KL divergence.

  • Reconstruction loss: how closely the decoded output matches the original after encoding
  • KL divergence: how close the learned latent distribution q(z|x) is to the prior P(z) = N(0, I)
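For a diagonal Gaussian q(z|x) = N(μ, σ²) and the prior N(0, I), the KL term has the closed form -0.5 * Σ(1 + log σ² - μ² - σ²), which is exactly what the loss function later in this section computes. A quick sanity check of that expression with hand-picked values:

```python
import torch

def kl_to_standard_normal(mu, logvar):
    # Closed-form KL[N(mu, sigma^2) || N(0, I)] for a diagonal Gaussian
    return -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())

# With sigma = 1 (logvar = 0) the KL reduces to 0.5 * sum(mu^2)
mu = torch.tensor([1.0, 0.0])
logvar = torch.zeros(2)
print(kl_to_standard_normal(mu, logvar).item())  # 0.5
```

When μ = 0 and σ = 1 the KL is exactly zero, i.e. the posterior already matches the prior.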

3.3 The Reparameterization Trick

Sampling z directly from N(μ, σ²) makes backpropagation impossible, since gradients cannot flow through a random sampling operation. The reparameterization trick solves this.

z = μ + σ * ε,  ε ~ N(0, I)

This separates the randomness (ε) from the network parameters, so gradients with respect to μ and σ can be computed.
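A tiny demonstration that gradients really do flow through a reparameterized sample (the specific numbers are arbitrary):

```python
import torch

mu = torch.tensor([0.5], requires_grad=True)
logvar = torch.tensor([0.0], requires_grad=True)

std = torch.exp(0.5 * logvar)
eps = torch.randn(1)       # the randomness lives outside the computation graph
z = mu + eps * std         # z is differentiable w.r.t. mu and logvar

z.sum().backward()
print(mu.grad)             # tensor([1.]) -- dz/dmu = 1, regardless of eps
```

Sampling z with a raw random-draw operation instead would break this gradient path entirely.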

3.4 A Complete VAE Implementation (MNIST)

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
import numpy as np

class VAE(nn.Module):
    """변분 오토인코더"""
    def __init__(self, input_dim=784, hidden_dim=400, latent_dim=20):
        super(VAE, self).__init__()
        self.latent_dim = latent_dim

        # Encoder: input -> μ, log(σ²)
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc_mu = nn.Linear(hidden_dim, latent_dim)      # mean
        self.fc_logvar = nn.Linear(hidden_dim, latent_dim)  # log variance

        # Decoder: latent vector -> reconstruction
        self.fc3 = nn.Linear(latent_dim, hidden_dim)
        self.fc4 = nn.Linear(hidden_dim, input_dim)

    def encode(self, x):
        """인코더: x -> (μ, log_var)"""
        h = F.relu(self.fc1(x))
        return self.fc_mu(h), self.fc_logvar(h)

    def reparameterize(self, mu, logvar):
        """재파라미터화 트릭: z = μ + ε * σ"""
        if self.training:
            std = torch.exp(0.5 * logvar)  # σ = exp(0.5 * log σ²)
            eps = torch.randn_like(std)     # ε ~ N(0, I)
            return mu + eps * std
        else:
            return mu  # use only the mean at inference time

    def decode(self, z):
        """디코더: z -> x'"""
        h = F.relu(self.fc3(z))
        return torch.sigmoid(self.fc4(h))

    def forward(self, x):
        x_flat = x.view(-1, 784)
        mu, logvar = self.encode(x_flat)
        z = self.reparameterize(mu, logvar)
        x_recon = self.decode(z)
        return x_recon, mu, logvar

    def generate(self, num_samples, device):
        """표준 정규 분포에서 샘플링하여 이미지 생성"""
        with torch.no_grad():
            z = torch.randn(num_samples, self.latent_dim).to(device)
            samples = self.decode(z)
            return samples.view(num_samples, 1, 28, 28)


def vae_loss(x_recon, x, mu, logvar, beta=1.0):
    """
    VAE 손실 = 재구성 손실 + β * KL Divergence
    beta=1: 표준 VAE
    beta>1: β-VAE (더 분리된 잠재 표현)
    """
    # Reconstruction loss (BCE)
    recon_loss = F.binary_cross_entropy(
        x_recon, x.view(-1, 784),
        reduction='sum'
    )

    # KL Divergence: KL[N(μ, σ²) || N(0, 1)]
    # = -0.5 * Σ(1 + log σ² - μ² - σ²)
    kl_loss = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())

    return recon_loss + beta * kl_loss


def train_vae(epochs=50, latent_dim=20, beta=1.0):
    """VAE 학습"""
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    transform = transforms.Compose([transforms.ToTensor()])
    train_dataset = datasets.MNIST('./data', train=True, download=True, transform=transform)
    train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)
    test_dataset = datasets.MNIST('./data', train=False, transform=transform)
    test_loader = DataLoader(test_dataset, batch_size=128)

    model = VAE(latent_dim=latent_dim).to(device)
    optimizer = optim.Adam(model.parameters(), lr=1e-3)

    train_losses = []

    for epoch in range(epochs):
        model.train()
        total_loss = 0

        for data, _ in train_loader:
            data = data.to(device)
            optimizer.zero_grad()

            x_recon, mu, logvar = model(data)
            loss = vae_loss(x_recon, data, mu, logvar, beta)

            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        avg_loss = total_loss / len(train_dataset)
        train_losses.append(avg_loss)

        if (epoch + 1) % 10 == 0:
            print(f"Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.4f}")

    return model, train_losses


def interpolate_latent_space(model, img1, img2, steps=10, device='cpu'):
    """잠재 공간에서 두 이미지 사이를 보간"""
    model.eval()
    with torch.no_grad():
        # Encode the images to latent vectors
        z1_flat = img1.view(-1, 784).to(device)
        z2_flat = img2.view(-1, 784).to(device)

        mu1, _ = model.encode(z1_flat)
        mu2, _ = model.encode(z2_flat)

        # Linear interpolation
        interpolated_images = []
        for alpha in np.linspace(0, 1, steps):
            z_interp = (1 - alpha) * mu1 + alpha * mu2
            img_recon = model.decode(z_interp)
            interpolated_images.append(img_recon.view(1, 28, 28))

    return interpolated_images

3.5 Convolutional VAE (for CIFAR-10)

class ConvVAE(nn.Module):
    """합성곱 VAE (컬러 이미지용)"""
    def __init__(self, latent_dim=128):
        super().__init__()
        self.latent_dim = latent_dim

        # Convolutional encoder
        self.encoder_conv = nn.Sequential(
            nn.Conv2d(3, 32, 4, stride=2, padding=1),  # 16x16
            nn.ReLU(),
            nn.Conv2d(32, 64, 4, stride=2, padding=1), # 8x8
            nn.ReLU(),
            nn.Conv2d(64, 128, 4, stride=2, padding=1),# 4x4
            nn.ReLU(),
        )
        self.fc_mu = nn.Linear(128 * 4 * 4, latent_dim)
        self.fc_logvar = nn.Linear(128 * 4 * 4, latent_dim)

        # Transposed-convolution decoder
        self.decoder_fc = nn.Linear(latent_dim, 128 * 4 * 4)
        self.decoder_conv = nn.Sequential(
            nn.ConvTranspose2d(128, 64, 4, stride=2, padding=1),  # 8x8
            nn.ReLU(),
            nn.ConvTranspose2d(64, 32, 4, stride=2, padding=1),   # 16x16
            nn.ReLU(),
            nn.ConvTranspose2d(32, 3, 4, stride=2, padding=1),    # 32x32
            nn.Sigmoid()
        )

    def encode(self, x):
        h = self.encoder_conv(x).view(x.size(0), -1)
        return self.fc_mu(h), self.fc_logvar(h)

    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std

    def decode(self, z):
        h = F.relu(self.decoder_fc(z)).view(-1, 128, 4, 4)
        return self.decoder_conv(h)

    def forward(self, x):
        mu, logvar = self.encode(x)
        z = self.reparameterize(mu, logvar)
        return self.decode(z), mu, logvar

4. Generative Adversarial Networks (GAN)

The GAN (Generative Adversarial Network) is the groundbreaking idea proposed by Ian Goodfellow in his 2014 paper (arXiv:1406.2661).

4.1 The Game Between Generator and Discriminator

A GAN trains two neural networks in competition.

Generator (G): produces fake data from random noise z.

  • Goal: generate data realistic enough to fool the Discriminator

Discriminator (D): distinguishes real data from generated fakes.

  • Goal: classify real data as 1 and fake data as 0

The two networks compete in this game until they reach a Nash equilibrium.

4.2 The Minimax Loss

min_G max_D [E_x[log D(x)] + E_z[log(1 - D(G(z)))]]

  • D maximizes: log D(x) on real data and log(1 - D(G(z))) on fake data
  • G minimizes: log(1 - D(G(z))), so that G(z) fools D

In practice, G is trained with the non-saturating loss -log(D(G(z))), which provides stronger gradients early in training.

4.3 A Basic GAN Implementation

import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import numpy as np

class Generator(nn.Module):
    """GAN Generator"""
    def __init__(self, noise_dim=100, output_dim=784):
        super().__init__()
        self.model = nn.Sequential(
            nn.Linear(noise_dim, 256),
            nn.LeakyReLU(0.2),
            nn.BatchNorm1d(256),
            nn.Linear(256, 512),
            nn.LeakyReLU(0.2),
            nn.BatchNorm1d(512),
            nn.Linear(512, 1024),
            nn.LeakyReLU(0.2),
            nn.BatchNorm1d(1024),
            nn.Linear(1024, output_dim),
            nn.Tanh()  # outputs in the [-1, 1] range
        )

    def forward(self, z):
        return self.model(z).view(-1, 1, 28, 28)


class Discriminator(nn.Module):
    """GAN Discriminator"""
    def __init__(self, input_dim=784):
        super().__init__()
        self.model = nn.Sequential(
            nn.Linear(input_dim, 1024),
            nn.LeakyReLU(0.2),
            nn.Dropout(0.3),
            nn.Linear(1024, 512),
            nn.LeakyReLU(0.2),
            nn.Dropout(0.3),
            nn.Linear(512, 256),
            nn.LeakyReLU(0.2),
            nn.Dropout(0.3),
            nn.Linear(256, 1),
            nn.Sigmoid()
        )

    def forward(self, x):
        return self.model(x.view(x.size(0), -1))


def train_gan(epochs=200, noise_dim=100, lr=2e-4):
    """GAN 학습"""
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize([0.5], [0.5])  # normalize to [-1, 1]
    ])
    train_dataset = datasets.MNIST('./data', train=True, download=True, transform=transform)
    dataloader = DataLoader(train_dataset, batch_size=64, shuffle=True)

    G = Generator(noise_dim).to(device)
    D = Discriminator().to(device)

    # Separate optimizers for G and D
    optimizer_G = optim.Adam(G.parameters(), lr=lr, betas=(0.5, 0.999))
    optimizer_D = optim.Adam(D.parameters(), lr=lr, betas=(0.5, 0.999))

    criterion = nn.BCELoss()

    for epoch in range(epochs):
        for i, (real_imgs, _) in enumerate(dataloader):
            batch_size = real_imgs.size(0)
            real_imgs = real_imgs.to(device)

            real_labels = torch.ones(batch_size, 1).to(device)
            fake_labels = torch.zeros(batch_size, 1).to(device)

            # === Update the Discriminator ===
            optimizer_D.zero_grad()

            # Loss on real images
            d_real = D(real_imgs)
            d_loss_real = criterion(d_real, real_labels)

            # Loss on fake images
            z = torch.randn(batch_size, noise_dim).to(device)
            fake_imgs = G(z).detach()  # block gradients into G
            d_fake = D(fake_imgs)
            d_loss_fake = criterion(d_fake, fake_labels)

            d_loss = d_loss_real + d_loss_fake
            d_loss.backward()
            optimizer_D.step()

            # === Update the Generator ===
            optimizer_G.zero_grad()

            z = torch.randn(batch_size, noise_dim).to(device)
            fake_imgs = G(z)
            g_pred = D(fake_imgs)
            # G tries to fool D: make fakes be classified as real
            g_loss = criterion(g_pred, real_labels)

            g_loss.backward()
            optimizer_G.step()

        if (epoch + 1) % 20 == 0:
            print(f"Epoch {epoch+1}/{epochs} | D Loss: {d_loss.item():.4f} | G Loss: {g_loss.item():.4f}")

    return G, D

4.4 The Mode Collapse Problem

This is one of the biggest problems with GANs: instead of generating diverse images, the Generator repeatedly produces only a few patterns that happen to fool the Discriminator.
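One crude way to monitor collapse during training is to track the diversity of a batch of generated samples, e.g. their average pairwise distance. This metric and the stand-in tensors below are illustrative, not part of the GAN above:

```python
import torch

def avg_pairwise_distance(samples):
    """Mean L2 distance over all pairs of flattened samples.
    A value near zero suggests the generator is collapsing."""
    flat = samples.view(samples.size(0), -1)
    dists = torch.cdist(flat, flat)           # (N, N) pairwise distance matrix
    n = flat.size(0)
    return dists.sum() / (n * (n - 1))        # average, excluding the zero diagonal

diverse = torch.randn(64, 1, 28, 28)                       # varied "samples"
collapsed = torch.randn(1, 1, 28, 28).repeat(64, 1, 1, 1)  # one image repeated
print(avg_pairwise_distance(diverse) > avg_pairwise_distance(collapsed))  # tensor(True)
```

A healthy generator keeps this number roughly stable; a sudden drop toward zero is a warning sign.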


5. The Evolution of GANs

5.1 DCGAN (Deep Convolutional GAN)

DCGAN (Radford et al., 2015, arXiv:1511.06434) successfully applied convolutional neural networks to GANs.

class DCGANGenerator(nn.Module):
    """DCGAN Generator (64x64 이미지용)"""
    def __init__(self, noise_dim=100, ngf=64, nc=3):
        super().__init__()
        self.main = nn.Sequential(
            # Input: noise_dim x 1 x 1
            nn.ConvTranspose2d(noise_dim, ngf * 8, 4, 1, 0, bias=False),
            nn.BatchNorm2d(ngf * 8),
            nn.ReLU(True),
            # State: (ngf*8) x 4 x 4
            nn.ConvTranspose2d(ngf * 8, ngf * 4, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ngf * 4),
            nn.ReLU(True),
            # State: (ngf*4) x 8 x 8
            nn.ConvTranspose2d(ngf * 4, ngf * 2, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ngf * 2),
            nn.ReLU(True),
            # State: (ngf*2) x 16 x 16
            nn.ConvTranspose2d(ngf * 2, ngf, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ngf),
            nn.ReLU(True),
            # State: (ngf) x 32 x 32
            nn.ConvTranspose2d(ngf, nc, 4, 2, 1, bias=False),
            nn.Tanh()
            # Output: nc x 64 x 64
        )

    def forward(self, z):
        z = z.view(z.size(0), -1, 1, 1)
        return self.main(z)


class DCGANDiscriminator(nn.Module):
    """DCGAN Discriminator (64x64 이미지용)"""
    def __init__(self, ndf=64, nc=3):
        super().__init__()
        self.main = nn.Sequential(
            # Input: nc x 64 x 64
            nn.Conv2d(nc, ndf, 4, 2, 1, bias=False),
            nn.LeakyReLU(0.2, inplace=True),
            # State: (ndf) x 32 x 32
            nn.Conv2d(ndf, ndf * 2, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ndf * 2),
            nn.LeakyReLU(0.2, inplace=True),
            # State: (ndf*2) x 16 x 16
            nn.Conv2d(ndf * 2, ndf * 4, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ndf * 4),
            nn.LeakyReLU(0.2, inplace=True),
            # State: (ndf*4) x 8 x 8
            nn.Conv2d(ndf * 4, ndf * 8, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ndf * 8),
            nn.LeakyReLU(0.2, inplace=True),
            # State: (ndf*8) x 4 x 4
            nn.Conv2d(ndf * 8, 1, 4, 1, 0, bias=False),
            nn.Sigmoid()
        )

    def forward(self, x):
        return self.main(x).view(-1, 1)

5.2 WGAN (Wasserstein GAN)

WGAN (arXiv:1701.07875) uses the Wasserstein distance instead of the Jensen-Shannon divergence, greatly improving training stability.

class WGANDiscriminator(nn.Module):
    """WGAN Critic (Discriminator): Sigmoid 없음"""
    def __init__(self, input_dim=784):
        super().__init__()
        self.model = nn.Sequential(
            nn.Linear(input_dim, 512),
            nn.LeakyReLU(0.2),
            nn.Linear(512, 256),
            nn.LeakyReLU(0.2),
            nn.Linear(256, 1)
            # No Sigmoid! The critic outputs an unbounded score for the Wasserstein distance
        )

    def forward(self, x):
        return self.model(x.view(x.size(0), -1))


def train_wgan(G, D, dataloader, device, epochs=100,
               n_critic=5, clip_value=0.01, lr=5e-5):
    """WGAN 학습"""
    optimizer_G = optim.RMSprop(G.parameters(), lr=lr)
    optimizer_D = optim.RMSprop(D.parameters(), lr=lr)

    for epoch in range(epochs):
        for i, (real_imgs, _) in enumerate(dataloader):
            real_imgs = real_imgs.to(device)
            batch_size = real_imgs.size(0)

            # Update the critic n_critic times
            for _ in range(n_critic):
                optimizer_D.zero_grad()

                z = torch.randn(batch_size, 100).to(device)
                fake_imgs = G(z).detach()

                # Critic loss: minimize -(E[D(real)] - E[D(fake)])
                d_loss = -torch.mean(D(real_imgs)) + torch.mean(D(fake_imgs))
                d_loss.backward()
                optimizer_D.step()

                # Weight clipping (enforces the Lipschitz constraint)
                for p in D.parameters():
                    p.data.clamp_(-clip_value, clip_value)

            # Update the generator
            optimizer_G.zero_grad()
            z = torch.randn(batch_size, 100).to(device)
            fake_imgs = G(z)
            g_loss = -torch.mean(D(fake_imgs))
            g_loss.backward()
            optimizer_G.step()

5.3 WGAN-GP (Gradient Penalty)

Instead of WGAN's weight clipping, WGAN-GP enforces the Lipschitz constraint with a gradient penalty.

def gradient_penalty(D, real_imgs, fake_imgs, device):
    """그래디언트 패널티 계산"""
    batch_size = real_imgs.size(0)
    # Random interpolation between real and fake images
    alpha = torch.rand(batch_size, 1, 1, 1).to(device)
    interpolated = alpha * real_imgs + (1 - alpha) * fake_imgs
    interpolated.requires_grad_(True)

    d_interpolated = D(interpolated)

    gradients = torch.autograd.grad(
        outputs=d_interpolated,
        inputs=interpolated,
        grad_outputs=torch.ones_like(d_interpolated),
        create_graph=True,
        retain_graph=True
    )[0]

    gradients = gradients.view(batch_size, -1)
    gradient_norm = gradients.norm(2, dim=1)
    penalty = ((gradient_norm - 1) ** 2).mean()
    return penalty


def wgan_gp_d_loss(D, real_imgs, fake_imgs, device, lambda_gp=10):
    """WGAN-GP Discriminator 손실"""
    d_real = D(real_imgs).mean()
    d_fake = D(fake_imgs).mean()
    gp = gradient_penalty(D, real_imgs, fake_imgs, device)
    return -d_real + d_fake + lambda_gp * gp

5.4 Conditional GAN (cGAN)

Adding a label as a condition lets the model generate images of a specific class.

class ConditionalGenerator(nn.Module):
    """조건부 GAN Generator"""
    def __init__(self, noise_dim=100, num_classes=10, embed_dim=50):
        super().__init__()
        self.label_embedding = nn.Embedding(num_classes, embed_dim)
        self.model = nn.Sequential(
            nn.Linear(noise_dim + embed_dim, 256),
            nn.LeakyReLU(0.2),
            nn.Linear(256, 512),
            nn.LeakyReLU(0.2),
            nn.Linear(512, 784),
            nn.Tanh()
        )

    def forward(self, z, labels):
        label_embed = self.label_embedding(labels)
        x = torch.cat([z, label_embed], dim=1)
        return self.model(x).view(-1, 1, 28, 28)
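The conditioning mechanism above boils down to appending a label embedding to the noise vector. A minimal sketch of just that step, with dimensions matching the generator above:

```python
import torch
import torch.nn as nn

noise_dim, embed_dim, num_classes = 100, 50, 10
label_embedding = nn.Embedding(num_classes, embed_dim)

z = torch.randn(8, noise_dim)
labels = torch.full((8,), 3, dtype=torch.long)     # request eight images of class '3'
g_input = torch.cat([z, label_embedding(labels)], dim=1)
print(g_input.shape)  # torch.Size([8, 150])
```

The generator's first Linear layer then consumes this 150-dimensional conditioned input.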

6. Diffusion Models

Diffusion models are the newest family of generative models and the current standard for image generation. DDPM (Denoising Diffusion Probabilistic Models, Ho et al., 2020, arXiv:2006.11239) pioneered the field.

6.1 The Intuition Behind Diffusion Models

The core idea behind diffusion models lies in two processes.

Forward process (adding noise): Gaussian noise is added to a real image step by step until only pure noise remains. After T steps the result follows a standard normal distribution.

Reverse process (removing noise): starting from pure noise, the noise is removed step by step to recover a real image. A neural network learns this reverse process.

6.2 The Forward Process Equations

Each step adds a small amount of noise.

q(x_t | x_{t-1}) = N(x_t; sqrt(1-β_t) * x_{t-1}, β_t * I)

where β_t is the noise schedule.

Jumping t steps in one shot (a key property):

q(x_t | x_0) = N(x_t; sqrt(ā_t) * x_0, (1-ā_t) * I)

where ā_t is the product of (1 - β_s) over s = 1, ..., t.

This means the noisy image at any timestep t can be computed directly.
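A quick numeric check of this property with the linear schedule used below: ā_t starts near 1 (almost no noise) and decays toward 0 (pure noise) as t approaches T:

```python
import torch

betas = torch.linspace(1e-4, 0.02, 1000)       # linear noise schedule
alpha_bar = torch.cumprod(1.0 - betas, dim=0)  # ā_t = product of (1 - β_s)
print(alpha_bar[0].item() > 0.99, alpha_bar[-1].item() < 0.01)  # True True
```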

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import numpy as np

class NoiseSchedule:
    """노이즈 스케줄 관리"""
    def __init__(self, timesteps=1000, beta_start=1e-4, beta_end=0.02):
        self.timesteps = timesteps

        # Linear noise schedule
        self.betas = torch.linspace(beta_start, beta_end, timesteps)
        self.alphas = 1.0 - self.betas
        self.alphas_cumprod = torch.cumprod(self.alphas, dim=0)
        self.alphas_cumprod_prev = F.pad(self.alphas_cumprod[:-1], (1, 0), value=1.0)

        # Forward-process coefficients
        self.sqrt_alphas_cumprod = torch.sqrt(self.alphas_cumprod)
        self.sqrt_one_minus_alphas_cumprod = torch.sqrt(1.0 - self.alphas_cumprod)

        # Reverse-process coefficients
        self.posterior_variance = (
            self.betas * (1.0 - self.alphas_cumprod_prev) /
            (1.0 - self.alphas_cumprod)
        )

    def q_sample(self, x_start, t, noise=None):
        """Forward process: add t steps of noise to x_0 in one shot"""
        if noise is None:
            noise = torch.randn_like(x_start)

        # Extract the ā_t coefficients (schedule tensors live on the CPU)
        t = t.cpu()
        sqrt_alphas_cumprod_t = self.sqrt_alphas_cumprod[t].view(-1, 1, 1, 1).to(x_start.device)
        sqrt_one_minus_alphas_cumprod_t = self.sqrt_one_minus_alphas_cumprod[t].view(-1, 1, 1, 1).to(x_start.device)

        return sqrt_alphas_cumprod_t * x_start + sqrt_one_minus_alphas_cumprod_t * noise

    def predict_start_from_noise(self, x_t, t, noise):
        """Recover x_0 from the predicted noise"""
        t = t.cpu()  # schedule tensors live on the CPU
        sqrt_recip_alphas_cumprod = (1.0 / self.sqrt_alphas_cumprod[t]).view(-1, 1, 1, 1).to(x_t.device)
        sqrt_recipm1_alphas_cumprod = (
            torch.sqrt(1.0 / self.alphas_cumprod[t] - 1.0).view(-1, 1, 1, 1).to(x_t.device)
        )
        return sqrt_recip_alphas_cumprod * x_t - sqrt_recipm1_alphas_cumprod * noise

6.3 The U-Net Architecture

The noise-prediction network in diffusion models uses a U-Net architecture, conditioned on the timestep t via sinusoidal embeddings.

class SinusoidalPositionEmbeddings(nn.Module):
    """타임스텝을 위한 사인파 위치 임베딩"""
    def __init__(self, dim):
        super().__init__()
        self.dim = dim

    def forward(self, time):
        device = time.device
        half_dim = self.dim // 2
        embeddings = np.log(10000) / (half_dim - 1)
        embeddings = torch.exp(torch.arange(half_dim, device=device) * -embeddings)
        embeddings = time[:, None] * embeddings[None, :]
        embeddings = torch.cat((embeddings.sin(), embeddings.cos()), dim=-1)
        return embeddings


class ResidualBlock(nn.Module):
    """타임스텝 조건화가 있는 잔차 블록"""
    def __init__(self, in_channels, out_channels, time_emb_dim):
        super().__init__()
        self.time_mlp = nn.Sequential(
            nn.SiLU(),
            nn.Linear(time_emb_dim, out_channels)
        )
        self.block1 = nn.Sequential(
            nn.GroupNorm(8, in_channels),
            nn.SiLU(),
            nn.Conv2d(in_channels, out_channels, 3, padding=1)
        )
        self.block2 = nn.Sequential(
            nn.GroupNorm(8, out_channels),
            nn.SiLU(),
            nn.Conv2d(out_channels, out_channels, 3, padding=1)
        )
        self.residual_conv = (
            nn.Conv2d(in_channels, out_channels, 1)
            if in_channels != out_channels else nn.Identity()
        )

    def forward(self, x, time_emb):
        h = self.block1(x)
        time_emb = self.time_mlp(time_emb)[:, :, None, None]
        h = h + time_emb
        h = self.block2(h)
        return h + self.residual_conv(x)


class SimpleUNet(nn.Module):
    """간소화된 U-Net (DDPM용)"""
    def __init__(self, in_channels=1, model_channels=64, time_emb_dim=256):
        super().__init__()

        # Timestep embedding
        self.time_mlp = nn.Sequential(
            SinusoidalPositionEmbeddings(model_channels),
            nn.Linear(model_channels, time_emb_dim),
            nn.SiLU(),
            nn.Linear(time_emb_dim, time_emb_dim)
        )

        # Encoder
        self.down1 = ResidualBlock(in_channels, model_channels, time_emb_dim)
        self.down2 = ResidualBlock(model_channels, model_channels * 2, time_emb_dim)
        self.pool = nn.MaxPool2d(2)

        # Bottleneck
        self.bottleneck = ResidualBlock(model_channels * 2, model_channels * 2, time_emb_dim)

        # Decoder
        self.up1 = ResidualBlock(model_channels * 4, model_channels, time_emb_dim)
        self.up2 = ResidualBlock(model_channels * 2, model_channels, time_emb_dim)
        self.upsample = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)

        # Output
        self.final = nn.Conv2d(model_channels, in_channels, 1)

    def forward(self, x, t):
        time_emb = self.time_mlp(t)

        # Encoder path
        x1 = self.down1(x, time_emb)
        x2 = self.down2(self.pool(x1), time_emb)

        # Bottleneck
        x_mid = self.bottleneck(x2, time_emb)

        # Decoder path (skip connections)
        x = self.up1(torch.cat([self.upsample(x_mid), x2], dim=1), time_emb)
        x = self.up2(torch.cat([self.upsample(x), x1], dim=1), time_emb)

        return self.final(x)

6.4 DDPM Training and Sampling

class DDPM:
    """DDPM 학습 및 샘플링"""
    def __init__(self, model, noise_schedule, device):
        self.model = model
        self.schedule = noise_schedule
        self.device = device

    def get_loss(self, x_start, t):
        """학습 손실: 노이즈 예측 오차"""
        noise = torch.randn_like(x_start)
        x_noisy = self.schedule.q_sample(x_start, t, noise)

        # The network predicts the noise that was added
        predicted_noise = self.model(x_noisy, t)

        # Simple MSE loss
        return F.mse_loss(noise, predicted_noise)

    @torch.no_grad()
    def p_sample(self, x_t, t):
        """Reverse process: one denoising step from x_t to x_{t-1}"""
        t_cpu = t.cpu()  # schedule tensors live on the CPU
        betas_t = self.schedule.betas[t_cpu].view(-1, 1, 1, 1).to(self.device)
        sqrt_one_minus_alphas_cumprod_t = (
            self.schedule.sqrt_one_minus_alphas_cumprod[t_cpu].view(-1, 1, 1, 1).to(self.device)
        )
        sqrt_recip_alphas_t = torch.sqrt(
            1.0 / self.schedule.alphas[t_cpu]
        ).view(-1, 1, 1, 1).to(self.device)

        # Predicted noise
        predicted_noise = self.model(x_t, t)

        # Posterior mean
        model_mean = sqrt_recip_alphas_t * (
            x_t - betas_t * predicted_noise / sqrt_one_minus_alphas_cumprod_t
        )

        if t[0] == 0:
            return model_mean
        else:
            # Add the posterior variance
            posterior_variance_t = self.schedule.posterior_variance[t_cpu].view(-1, 1, 1, 1).to(self.device)
            noise = torch.randn_like(x_t)
            return model_mean + torch.sqrt(posterior_variance_t) * noise

    @torch.no_grad()
    def sample(self, batch_size, image_shape):
        """순수 노이즈에서 시작하여 이미지 생성"""
        img = torch.randn(batch_size, *image_shape).to(self.device)

        for i in reversed(range(self.schedule.timesteps)):
            t = torch.full((batch_size,), i, dtype=torch.long, device=self.device)
            img = self.p_sample(img, t)

        return img


def train_ddpm(epochs=100):
    """DDPM 학습"""
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize([0.5], [0.5])  # normalize to [-1, 1]
    ])
    dataset = datasets.MNIST('./data', train=True, download=True, transform=transform)
    dataloader = DataLoader(dataset, batch_size=128, shuffle=True)

    model = SimpleUNet(in_channels=1).to(device)
    schedule = NoiseSchedule(timesteps=1000)
    ddpm = DDPM(model, schedule, device)

    optimizer = optim.Adam(model.parameters(), lr=2e-4)

    for epoch in range(epochs):
        total_loss = 0
        for batch, (x, _) in enumerate(dataloader):
            x = x.to(device)
            # Sample random timesteps
            t = torch.randint(0, schedule.timesteps, (x.size(0),), device=device)

            loss = ddpm.get_loss(x, t)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        if (epoch + 1) % 10 == 0:
            avg_loss = total_loss / len(dataloader)
            print(f"Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.4f}")
            # Generate samples
            samples = ddpm.sample(16, (1, 28, 28))
            print(f"Generated samples: {samples.shape}")

    return model, ddpm

7. DDIM - Fast Sampling

DDPM needs 1000 sampling steps, which makes it slow. DDIM (Denoising Diffusion Implicit Models, Song et al., 2020, arXiv:2010.02502) provides faster, deterministic sampling.

7.1 The Idea Behind DDIM

DDIM redefines the sampling procedure as a non-Markovian process. The key point is that the same trained model is reused while the number of sampling steps is reduced drastically (1000 → 50).

@torch.no_grad()
def ddim_sample(model, schedule, batch_size, image_shape,
                ddim_timesteps=50, eta=0.0, device='cpu'):
    """
    DDIM 샘플링
    eta=0.0: 완전 결정론적
    eta=1.0: DDPM과 동일
    """
    # Pick evenly spaced timesteps
    c = schedule.timesteps // ddim_timesteps
    timestep_seq = list(range(0, schedule.timesteps, c))[::-1]

    img = torch.randn(batch_size, *image_shape).to(device)

    for i, t in enumerate(timestep_seq):
        t_tensor = torch.full((batch_size,), t, dtype=torch.long, device=device)
        t_prev = timestep_seq[i + 1] if i + 1 < len(timestep_seq) else -1

        # Coefficients for the current timestep
        alpha_bar = schedule.alphas_cumprod[t].to(device)
        alpha_bar_prev = (
            schedule.alphas_cumprod[t_prev].to(device) if t_prev >= 0
            else torch.tensor(1.0, device=device)
        )

        # Predict the noise
        pred_noise = model(img, t_tensor)

        # Predict x_0
        pred_x0 = (img - torch.sqrt(1 - alpha_bar) * pred_noise) / torch.sqrt(alpha_bar)
        pred_x0 = torch.clamp(pred_x0, -1, 1)

        # Compute sigma
        sigma = eta * torch.sqrt(
            (1 - alpha_bar_prev) / (1 - alpha_bar) * (1 - alpha_bar / alpha_bar_prev)
        )

        # Direction pointing toward x_t
        pred_dir = torch.sqrt(1 - alpha_bar_prev - sigma**2) * pred_noise

        # Image at the next step
        noise = torch.randn_like(img) if t_prev >= 0 else 0
        img = torch.sqrt(alpha_bar_prev) * pred_x0 + pred_dir + sigma * noise

    return img

8. Analyzing Stable Diffusion

Stable Diffusion (arXiv:2112.10752) is a breakthrough approach that performs diffusion in a **latent space** rather than in pixel space.

8.1 Latent Diffusion Models (LDM)

Applying diffusion directly to high-resolution images (512x512) is very expensive. An LDM instead:

  1. Compresses the image into the latent space with a VAE encoder (512x512x3 → 64x64x4)
  2. Runs diffusion in the latent space
  3. Restores the original resolution with the VAE decoder

This cuts the computational cost dramatically, since each spatial side is reduced by a factor of 8.
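The compression factor is easy to verify: each spatial side shrinks by 8×, and the total number of values per image drops by 48×:

```python
# Pixel-space vs. latent-space sizes for Stable Diffusion's autoencoder
pixel_values = 512 * 512 * 3   # 786,432 values per image
latent_values = 64 * 64 * 4    # 16,384 values per latent
print(512 // 64, pixel_values / latent_values)  # 8 48.0
```

So the diffusion U-Net operates on 48× fewer values than it would in pixel space.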

8.2 Components of Stable Diffusion

Text prompt → CLIP text encoder → text embedding
Pure noise z_T → [U-Net with cross-attention] → latent vector z_0
                                    VAE decoder → final image

CLIP text encoder: converts the text into a meaningful vector representation.

U-Net with cross-attention: uses the text embedding as conditioning via cross-attention.

Classifier-Free Guidance (CFG): combines the prediction made without the text condition with the text-conditioned prediction to increase fidelity to the prompt.

guided_noise = uncond_noise + guidance_scale * (cond_noise - uncond_noise)
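As a sketch, the guidance formula above is a single line of tensor arithmetic (the toy tensors here are stand-ins for the U-Net's two noise predictions):

```python
import torch

def cfg_combine(uncond_noise, cond_noise, guidance_scale):
    # Move from the unconditional prediction toward the conditional one
    return uncond_noise + guidance_scale * (cond_noise - uncond_noise)

uncond = torch.zeros(1, 4, 8, 8)   # stand-in: unconditional prediction
cond = torch.ones(1, 4, 8, 8)      # stand-in: text-conditioned prediction
guided = cfg_combine(uncond, cond, guidance_scale=7.5)
print(guided[0, 0, 0, 0].item())   # 7.5 -- pushed beyond the conditional value
```

With guidance_scale = 1 this reduces to the conditional prediction; larger values trade diversity for prompt fidelity.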

8.3 Using Stable Diffusion with the diffusers Library

from diffusers import StableDiffusionPipeline, DPMSolverMultistepScheduler
import torch

# 모델 로드
model_id = "stabilityai/stable-diffusion-2-1"
pipe = StableDiffusionPipeline.from_pretrained(
    model_id,
    torch_dtype=torch.float16,
    use_safetensors=True,
)

# 빠른 스케줄러 사용
pipe.scheduler = DPMSolverMultistepScheduler.from_config(pipe.scheduler.config)
pipe = pipe.to("cuda")

# 이미지 생성
prompt = "a photorealistic landscape of mountains at sunset, 8k, highly detailed"
negative_prompt = "blurry, low quality, distorted"

image = pipe(
    prompt,
    negative_prompt=negative_prompt,
    num_inference_steps=25,    # DDPM의 1000 스텝을 25 스텝으로
    guidance_scale=7.5,        # CFG 스케일
    height=512,
    width=512,
    generator=torch.Generator("cuda").manual_seed(42)
).images[0]

image.save("generated_image.png")


# 이미지-이미지 변환
from diffusers import StableDiffusionImg2ImgPipeline
from PIL import Image

img2img_pipe = StableDiffusionImg2ImgPipeline.from_pretrained(
    model_id,
    torch_dtype=torch.float16
).to("cuda")

init_image = Image.open("input.jpg").resize((512, 512))
result = img2img_pipe(
    prompt="a painting in Van Gogh style",
    image=init_image,
    strength=0.75,  # 변형 강도 (0-1): 높을수록 원본에서 멀어짐
    guidance_scale=7.5,
    num_inference_steps=50
).images[0]

8.4 Inpainting (이미지 인페인팅)

from diffusers import StableDiffusionInpaintPipeline

inpaint_pipe = StableDiffusionInpaintPipeline.from_pretrained(
    "runwayml/stable-diffusion-inpainting",
    torch_dtype=torch.float16
).to("cuda")

# 원본 이미지와 마스크
image = Image.open("photo.jpg").resize((512, 512))
mask = Image.open("mask.jpg").resize((512, 512))  # 흰색 = 인페인팅 영역

result = inpaint_pipe(
    prompt="a beautiful garden with flowers",
    image=image,
    mask_image=mask,
    num_inference_steps=50
).images[0]

9. ControlNet - 세밀한 이미지 제어

9.1 ControlNet 아키텍처

ControlNet(Zhang et al., 2023)은 Stable Diffusion에 추가적인 제어 신호(Canny 엣지, 깊이 맵, 포즈 등)를 조건으로 추가할 수 있게 합니다.

기존 U-Net 가중치는 고정하고, U-Net 인코더 블록을 복사한 별도의 제어 네트워크를 추가하는 방식입니다. SD 1.5 기준으로 약 360M 파라미터의 복사본만 새로 훈련하므로, 전체 모델을 미세 조정하는 것보다 훨씬 효율적입니다.

from diffusers import StableDiffusionControlNetPipeline, ControlNetModel
from diffusers.utils import load_image
import cv2
import numpy as np

# Canny ControlNet 로드
controlnet = ControlNetModel.from_pretrained(
    "lllyasviel/sd-controlnet-canny",
    torch_dtype=torch.float16
)
pipe = StableDiffusionControlNetPipeline.from_pretrained(
    "runwayml/stable-diffusion-v1-5",
    controlnet=controlnet,
    torch_dtype=torch.float16
).to("cuda")

# Canny 엣지 추출
image = load_image("input.jpg")
image_array = np.array(image)
canny = cv2.Canny(image_array, 100, 200)
canny = np.stack([canny] * 3, axis=-1)  # 1채널 엣지 맵 → 3채널 (파이프라인 입력 형식)
canny_image = Image.fromarray(canny)

# ControlNet으로 이미지 생성
result = pipe(
    prompt="a beautiful oil painting",
    image=canny_image,       # Canny 엣지를 제어 신호로 사용
    controlnet_conditioning_scale=1.0,
    num_inference_steps=50,
    guidance_scale=7.5
).images[0]

10. 최신 생성 모델 트렌드

10.1 DiT (Diffusion Transformers)

Peebles & Xie (2022)가 제안한 DiT는 U-Net 대신 Transformer 아키텍처를 디퓨전 모델의 백본으로 사용합니다. Sora, Flux, SD3 등 최신 모델들이 DiT 기반입니다.

class DiTBlock(nn.Module):
    """Diffusion Transformer Block"""
    def __init__(self, hidden_dim, num_heads, mlp_ratio=4.0):
        super().__init__()
        self.norm1 = nn.LayerNorm(hidden_dim)
        self.attn = nn.MultiheadAttention(hidden_dim, num_heads, batch_first=True)
        self.norm2 = nn.LayerNorm(hidden_dim)

        mlp_dim = int(hidden_dim * mlp_ratio)
        self.mlp = nn.Sequential(
            nn.Linear(hidden_dim, mlp_dim),
            nn.GELU(),
            nn.Linear(mlp_dim, hidden_dim)
        )

        # AdaLN (Adaptive Layer Normalization): 타임스텝 조건화
        self.adaLN_modulation = nn.Sequential(
            nn.SiLU(),
            nn.Linear(hidden_dim, 6 * hidden_dim)
        )

    def forward(self, x, c):
        # 타임스텝/조건 임베딩에서 변조 파라미터 계산
        shift_msa, scale_msa, gate_msa, shift_mlp, scale_mlp, gate_mlp = (
            self.adaLN_modulation(c).chunk(6, dim=-1)
        )

        # 변조된 정규화
        x_norm = (1 + scale_msa.unsqueeze(1)) * self.norm1(x) + shift_msa.unsqueeze(1)
        attn_out, _ = self.attn(x_norm, x_norm, x_norm)
        x = x + gate_msa.unsqueeze(1) * attn_out

        x_norm = (1 + scale_mlp.unsqueeze(1)) * self.norm2(x) + shift_mlp.unsqueeze(1)
        x = x + gate_mlp.unsqueeze(1) * self.mlp(x_norm)

        return x

10.2 SDXL (Stable Diffusion XL)

더 고해상도(1024x1024)의 이미지를 생성하는 Stable Diffusion의 업그레이드 버전입니다.

from diffusers import StableDiffusionXLPipeline

pipe = StableDiffusionXLPipeline.from_pretrained(
    "stabilityai/stable-diffusion-xl-base-1.0",
    torch_dtype=torch.float16,
    use_safetensors=True,
    variant="fp16"
).to("cuda")

image = pipe(
    prompt="a majestic lion in photorealistic style, 4k",
    negative_prompt="cartoon, blurry",
    height=1024,
    width=1024,
    num_inference_steps=50,
    guidance_scale=5.0
).images[0]

10.3 생성 모델 비교

모델 | 연도 | 핵심 아이디어 | 장점 | 단점
VAE | 2013 | 잠재 분포 학습 | 안정적, 잠재 공간 해석 가능 | 흐릿한 이미지
GAN | 2014 | 적대적 학습 | 날카로운 이미지 | 학습 불안정, 모드 붕괴
DDPM | 2020 | 반복적 디노이징 | 높은 품질, 다양성 | 느린 샘플링
LDM | 2022 | 잠재 공간 디퓨전 | 효율적, 텍스트 조건 | 복잡한 구조
DiT | 2022 | Transformer 백본 | 스케일링 효율 | 높은 계산 비용

마무리

생성형 AI의 여정을 VAE의 잠재 공간 이론부터 시작하여, GAN의 적대적 게임, DDPM의 반복적 디노이징, 그리고 Stable Diffusion의 잠재 공간 확산까지 완전히 탐험했습니다.

이 분야는 매우 빠르게 발전하고 있습니다. 오늘의 최신 기술이 내일은 기본이 됩니다. 기초를 탄탄히 이해하고, 최신 논문을 지속적으로 팔로우하는 것이 중요합니다.

계속 학습을 위한 추천 자료입니다.

  • Hugging Face Diffusers: https://huggingface.co/docs/diffusers/
  • VAE 원논문: arXiv:1312.6114
  • GAN 원논문: arXiv:1406.2661
  • DDPM 원논문: arXiv:2006.11239
  • LDM/Stable Diffusion: arXiv:2112.10752

참고 자료

  • Kingma, D. P., & Welling, M. (2013). Auto-Encoding Variational Bayes. arXiv:1312.6114
  • Goodfellow, I., et al. (2014). Generative Adversarial Networks. arXiv:1406.2661
  • Ho, J., et al. (2020). Denoising Diffusion Probabilistic Models. arXiv:2006.11239
  • Rombach, R., et al. (2022). High-Resolution Image Synthesis with Latent Diffusion Models. arXiv:2112.10752
  • Arjovsky, M., et al. (2017). Wasserstein GAN. arXiv:1701.07875
  • Peebles, W., & Xie, S. (2022). Scalable Diffusion Models with Transformers. arXiv:2212.09748
  • Hugging Face Diffusers: https://huggingface.co/docs/diffusers/

Generative AI Complete Guide: Master GANs, VAEs, and Diffusion Models

Introduction

Generative AI is the hottest area in technology today. DALL-E 3 creates photorealistic images from a single line of text, Stable Diffusion generates artwork, and Sora produces videos. Behind all these breakthroughs lie generative models developed over decades.

This guide provides complete coverage of VAE (Variational Autoencoder), GAN (Generative Adversarial Network), and modern diffusion models — with mathematical intuition and full PyTorch code implementations.


1. Overview of Generative Models

1.1 Generative vs Discriminative Models

Deep learning models are broadly divided into two categories.

Discriminative Model: Given input data x, learns the conditional probability P(y|x). Used for image classification, object detection, etc.

Generative Model: Learns the data distribution P(x), or the joint distribution P(x, y). After training, it can generate new data samples.

The core question of generative models: "How was this data generated? How can we generate new samples from the same distribution?"

1.2 The Concept of Latent Space

Most generative models leverage a latent space — a lower-dimensional representation space.

For example, a 28x28 MNIST image (784 dimensions) can be compressed into a 2-to-100 dimensional latent vector. In this latent space:

  • The digit '3' and digit '8' are positioned close to each other
  • Decoding a midpoint vector produces something between '3' and '8'
  • Walking through (interpolating) the latent space enables continuous transformations

1.3 Applications of Generative Models

  • Image generation: Realistic faces, landscapes, artwork
  • Image-to-image translation: Day photo to night photo, sketch to photo
  • Data augmentation: Supplement scarce training data
  • Drug discovery: Generate novel molecular structures
  • Text generation: GPT family language models
  • Music/speech synthesis: Compose music, TTS systems

2. Autoencoder

We begin with the autoencoder, the foundation needed to understand VAEs.

2.1 Encoder-Decoder Architecture

An autoencoder has two parts.

  • Encoder: High-dimensional input x → low-dimensional latent vector z
  • Decoder: Low-dimensional latent vector z → reconstructed output x'

Goal: Train so that x' is as similar as possible to x (minimize reconstruction loss).

import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader

class Autoencoder(nn.Module):
    """Basic Autoencoder for MNIST"""
    def __init__(self, input_dim=784, latent_dim=32):
        super().__init__()

        # Encoder
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Linear(128, latent_dim),
            nn.ReLU()
        )

        # Decoder
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 256),
            nn.ReLU(),
            nn.Linear(256, input_dim),
            nn.Sigmoid()  # Constrain pixel values to [0, 1]
        )

    def forward(self, x):
        x = x.view(x.size(0), -1)
        z = self.encoder(x)
        x_reconstructed = self.decoder(z)
        return x_reconstructed, z

    def encode(self, x):
        x = x.view(x.size(0), -1)
        return self.encoder(x)

    def decode(self, z):
        return self.decoder(z).view(-1, 1, 28, 28)


def train_autoencoder(epochs=20):
    """Train Autoencoder"""
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    transform = transforms.Compose([transforms.ToTensor()])
    train_dataset = datasets.MNIST('./data', train=True, download=True, transform=transform)
    train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)

    model = Autoencoder().to(device)
    optimizer = optim.Adam(model.parameters(), lr=1e-3)
    criterion = nn.BCELoss()

    for epoch in range(epochs):
        total_loss = 0
        for data, _ in train_loader:
            data = data.to(device)
            optimizer.zero_grad()
            reconstructed, z = model(data)
            target = data.view(data.size(0), -1)
            loss = criterion(reconstructed, target)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        avg_loss = total_loss / len(train_loader)
        print(f"Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.4f}")

    return model

2.2 Limitations of Basic Autoencoders

Basic autoencoders compress data well, but generating new samples is difficult. The latent space is not regularized, so decoding a random latent vector often produces meaningless images.

This is exactly the problem that VAEs solve.


3. Variational Autoencoder (VAE)

The VAE was proposed in the groundbreaking 2013 paper by Kingma and Welling (arXiv:1312.6114).

3.1 Core Idea of VAE

The core of VAE is learning a probability distribution in the latent space.

  • Basic autoencoder: z = encoder(x) (deterministic point)
  • VAE: z ~ N(μ, σ²) (sampled from a Gaussian distribution)

The encoder now outputs the distribution parameters (mean μ, variance σ²) instead of a latent vector z directly.

When generating new images, we sample z from a standard normal distribution N(0, I) and feed it to the decoder.

3.2 ELBO (Evidence Lower Bound)

The training objective of VAE is to maximize the data log-likelihood log P(x). Since this is hard to optimize directly, we maximize the ELBO instead.

log P(x) >= E_q[log P(x|z)] - KL[q(z|x) || P(z)]
                 ↑                     ↑
        Reconstruction loss       KL Divergence

Minimizing the negative ELBO gives the VAE training objective: VAE Loss = Reconstruction Loss + KL Divergence

  • Reconstruction loss: How similar is the decoded output to the original?
  • KL Divergence: How close is the learned latent distribution q(z|x) to the prior P(z) = N(0, I)?
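For the Gaussian posterior q(z|x) = N(μ, σ²) and prior P(z) = N(0, I) above, the KL term has a closed form, which is exactly what the loss implementation in section 3.4 computes:

```latex
\mathrm{KL}\left[\,\mathcal{N}(\mu,\sigma^2)\,\big\|\,\mathcal{N}(0, I)\,\right]
= -\frac{1}{2}\sum_{j=1}^{d}\left(1 + \log\sigma_j^2 - \mu_j^2 - \sigma_j^2\right)
```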

3.3 Reparameterization Trick

Directly sampling z ~ N(μ, σ²) blocks backpropagation. The reparameterization trick solves this.

z = μ + σ * ε,  ε ~ N(0, I)

This separates the randomness (ε) from the network parameters, enabling gradient computation with respect to μ and σ.
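A minimal standalone sketch of why the trick matters: with z = μ + σ·ε, the sample is a differentiable function of μ and σ, so autograd can push gradients through it (this is an illustration, not part of the VAE class).

```python
import torch

mu = torch.tensor([0.5], requires_grad=True)
log_sigma = torch.tensor([0.0], requires_grad=True)  # sigma = exp(0) = 1
eps = torch.randn(1)  # randomness lives outside the computation graph

z = mu + torch.exp(log_sigma) * eps  # reparameterized sample
loss = (z ** 2).sum()
loss.backward()

# Gradients reach both distribution parameters through the sample
print(mu.grad, log_sigma.grad)
```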

3.4 Complete VAE Implementation (MNIST)

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import numpy as np

class VAE(nn.Module):
    """Variational Autoencoder"""
    def __init__(self, input_dim=784, hidden_dim=400, latent_dim=20):
        super(VAE, self).__init__()
        self.latent_dim = latent_dim

        # Encoder: input -> μ, log(σ²)
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc_mu = nn.Linear(hidden_dim, latent_dim)       # Mean
        self.fc_logvar = nn.Linear(hidden_dim, latent_dim)   # Log variance

        # Decoder: latent vector -> reconstruction
        self.fc3 = nn.Linear(latent_dim, hidden_dim)
        self.fc4 = nn.Linear(hidden_dim, input_dim)

    def encode(self, x):
        """Encoder: x -> (μ, log_var)"""
        h = F.relu(self.fc1(x))
        return self.fc_mu(h), self.fc_logvar(h)

    def reparameterize(self, mu, logvar):
        """Reparameterization trick: z = μ + ε * σ"""
        if self.training:
            std = torch.exp(0.5 * logvar)  # σ = exp(0.5 * log σ²)
            eps = torch.randn_like(std)     # ε ~ N(0, I)
            return mu + eps * std
        else:
            return mu  # During inference, use mean only

    def decode(self, z):
        """Decoder: z -> x'"""
        h = F.relu(self.fc3(z))
        return torch.sigmoid(self.fc4(h))

    def forward(self, x):
        x_flat = x.view(-1, 784)
        mu, logvar = self.encode(x_flat)
        z = self.reparameterize(mu, logvar)
        x_recon = self.decode(z)
        return x_recon, mu, logvar

    def generate(self, num_samples, device):
        """Generate images by sampling from standard normal distribution"""
        with torch.no_grad():
            z = torch.randn(num_samples, self.latent_dim).to(device)
            samples = self.decode(z)
            return samples.view(num_samples, 1, 28, 28)


def vae_loss(x_recon, x, mu, logvar, beta=1.0):
    """
    VAE loss = Reconstruction loss + β * KL Divergence
    beta=1: Standard VAE
    beta>1: β-VAE (more disentangled representations)
    """
    # Reconstruction loss (BCE)
    recon_loss = F.binary_cross_entropy(
        x_recon, x.view(-1, 784),
        reduction='sum'
    )

    # KL Divergence: KL[N(μ, σ²) || N(0, 1)]
    # = -0.5 * Σ(1 + log σ² - μ² - σ²)
    kl_loss = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())

    return recon_loss + beta * kl_loss


def train_vae(epochs=50, latent_dim=20, beta=1.0):
    """Train VAE"""
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    transform = transforms.Compose([transforms.ToTensor()])
    train_dataset = datasets.MNIST('./data', train=True, download=True, transform=transform)
    train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)

    model = VAE(latent_dim=latent_dim).to(device)
    optimizer = optim.Adam(model.parameters(), lr=1e-3)
    train_losses = []

    for epoch in range(epochs):
        model.train()
        total_loss = 0

        for data, _ in train_loader:
            data = data.to(device)
            optimizer.zero_grad()
            x_recon, mu, logvar = model(data)
            loss = vae_loss(x_recon, data, mu, logvar, beta)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        avg_loss = total_loss / len(train_dataset)
        train_losses.append(avg_loss)

        if (epoch + 1) % 10 == 0:
            print(f"Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.4f}")

    return model, train_losses


def interpolate_latent_space(model, img1, img2, steps=10, device='cpu'):
    """Interpolate between two images in latent space"""
    model.eval()
    with torch.no_grad():
        z1_flat = img1.view(-1, 784).to(device)
        z2_flat = img2.view(-1, 784).to(device)

        mu1, _ = model.encode(z1_flat)
        mu2, _ = model.encode(z2_flat)

        # Linear interpolation
        interpolated_images = []
        for alpha in np.linspace(0, 1, steps):
            z_interp = (1 - alpha) * mu1 + alpha * mu2
            img_recon = model.decode(z_interp)
            interpolated_images.append(img_recon.view(1, 28, 28))

    return interpolated_images

3.5 Convolutional VAE (for CIFAR-10)

class ConvVAE(nn.Module):
    """Convolutional VAE for color images"""
    def __init__(self, latent_dim=128):
        super().__init__()
        self.latent_dim = latent_dim

        # Convolutional encoder
        self.encoder_conv = nn.Sequential(
            nn.Conv2d(3, 32, 4, stride=2, padding=1),   # 16x16
            nn.ReLU(),
            nn.Conv2d(32, 64, 4, stride=2, padding=1),  # 8x8
            nn.ReLU(),
            nn.Conv2d(64, 128, 4, stride=2, padding=1), # 4x4
            nn.ReLU(),
        )
        self.fc_mu = nn.Linear(128 * 4 * 4, latent_dim)
        self.fc_logvar = nn.Linear(128 * 4 * 4, latent_dim)

        # Transposed convolutional decoder
        self.decoder_fc = nn.Linear(latent_dim, 128 * 4 * 4)
        self.decoder_conv = nn.Sequential(
            nn.ConvTranspose2d(128, 64, 4, stride=2, padding=1),  # 8x8
            nn.ReLU(),
            nn.ConvTranspose2d(64, 32, 4, stride=2, padding=1),   # 16x16
            nn.ReLU(),
            nn.ConvTranspose2d(32, 3, 4, stride=2, padding=1),    # 32x32
            nn.Sigmoid()
        )

    def encode(self, x):
        h = self.encoder_conv(x).view(x.size(0), -1)
        return self.fc_mu(h), self.fc_logvar(h)

    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std

    def decode(self, z):
        h = F.relu(self.decoder_fc(z)).view(-1, 128, 4, 4)
        return self.decoder_conv(h)

    def forward(self, x):
        mu, logvar = self.encode(x)
        z = self.reparameterize(mu, logvar)
        return self.decode(z), mu, logvar

4. Generative Adversarial Networks (GAN)

GANs were proposed by Ian Goodfellow et al. in a revolutionary 2014 paper (arXiv:1406.2661).

4.1 The Generator-Discriminator Game

GAN involves two neural networks competing and learning from each other.

Generator (G): Creates fake data from random noise z.

  • Goal: Generate data realistic enough to fool the Discriminator

Discriminator (D): Distinguishes between real and generated fake data.

  • Goal: Classify real data as 1, fake data as 0

In this game, both networks compete until they reach a Nash Equilibrium.

4.2 Minimax Loss

min_G max_D [E_x[log D(x)] + E_z[log(1 - D(G(z)))]]
  • D maximizes: Maximize log D(x) on real data, maximize log(1 - D(G(z))) on fake data
  • G minimizes: Minimize log(1 - D(G(z))) so that G(z) fools D

In practice, we use -log(D(G(z))) as the generator loss (non-saturating loss).
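A quick numeric check of the difference (a standalone sketch, not part of the training loop): when D confidently rejects a fake, say D(G(z)) = 0.01, the saturating loss log(1 - d) yields a tiny gradient, while -log(d) yields a large one, so the generator keeps learning early in training.

```python
import torch

d = torch.tensor(0.01, requires_grad=True)  # D(G(z)): D confidently rejects the fake

# Saturating generator loss: minimize log(1 - d); gradient -1/(1 - d) is small
grad_sat, = torch.autograd.grad(torch.log(1 - d), d)

# Non-saturating loss: minimize -log(d); gradient -1/d is large near d = 0
grad_ns, = torch.autograd.grad(-torch.log(d), d)

print(grad_sat.item(), grad_ns.item())  # roughly -1.01 vs -100.0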

4.3 Basic GAN Implementation

import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import numpy as np

class Generator(nn.Module):
    """GAN Generator"""
    def __init__(self, noise_dim=100, output_dim=784):
        super().__init__()
        self.model = nn.Sequential(
            nn.Linear(noise_dim, 256),
            nn.LeakyReLU(0.2),
            nn.BatchNorm1d(256),
            nn.Linear(256, 512),
            nn.LeakyReLU(0.2),
            nn.BatchNorm1d(512),
            nn.Linear(512, 1024),
            nn.LeakyReLU(0.2),
            nn.BatchNorm1d(1024),
            nn.Linear(1024, output_dim),
            nn.Tanh()  # Range [-1, 1]
        )

    def forward(self, z):
        return self.model(z).view(-1, 1, 28, 28)


class Discriminator(nn.Module):
    """GAN Discriminator"""
    def __init__(self, input_dim=784):
        super().__init__()
        self.model = nn.Sequential(
            nn.Linear(input_dim, 1024),
            nn.LeakyReLU(0.2),
            nn.Dropout(0.3),
            nn.Linear(1024, 512),
            nn.LeakyReLU(0.2),
            nn.Dropout(0.3),
            nn.Linear(512, 256),
            nn.LeakyReLU(0.2),
            nn.Dropout(0.3),
            nn.Linear(256, 1),
            nn.Sigmoid()
        )

    def forward(self, x):
        return self.model(x.view(x.size(0), -1))


def train_gan(epochs=200, noise_dim=100, lr=2e-4):
    """Train GAN"""
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize([0.5], [0.5])  # Normalize to [-1, 1]
    ])
    train_dataset = datasets.MNIST('./data', train=True, download=True, transform=transform)
    dataloader = DataLoader(train_dataset, batch_size=64, shuffle=True)

    G = Generator(noise_dim).to(device)
    D = Discriminator().to(device)

    optimizer_G = optim.Adam(G.parameters(), lr=lr, betas=(0.5, 0.999))
    optimizer_D = optim.Adam(D.parameters(), lr=lr, betas=(0.5, 0.999))

    criterion = nn.BCELoss()

    for epoch in range(epochs):
        for i, (real_imgs, _) in enumerate(dataloader):
            batch_size = real_imgs.size(0)
            real_imgs = real_imgs.to(device)

            real_labels = torch.ones(batch_size, 1).to(device)
            fake_labels = torch.zeros(batch_size, 1).to(device)

            # === Update Discriminator ===
            optimizer_D.zero_grad()

            d_real = D(real_imgs)
            d_loss_real = criterion(d_real, real_labels)

            z = torch.randn(batch_size, noise_dim).to(device)
            fake_imgs = G(z).detach()  # Detach G's gradients
            d_fake = D(fake_imgs)
            d_loss_fake = criterion(d_fake, fake_labels)

            d_loss = d_loss_real + d_loss_fake
            d_loss.backward()
            optimizer_D.step()

            # === Update Generator ===
            optimizer_G.zero_grad()

            z = torch.randn(batch_size, noise_dim).to(device)
            fake_imgs = G(z)
            g_pred = D(fake_imgs)
            # G wants D to classify fakes as real
            g_loss = criterion(g_pred, real_labels)

            g_loss.backward()
            optimizer_G.step()

        if (epoch + 1) % 20 == 0:
            print(f"Epoch {epoch+1}/{epochs} | D Loss: {d_loss.item():.4f} | G Loss: {g_loss.item():.4f}")

    return G, D

4.4 Mode Collapse

One of the biggest problems with GANs. The Generator stops producing diverse images and repeatedly generates only a few patterns that successfully fool the Discriminator.


5. Evolution of GANs

5.1 DCGAN (Deep Convolutional GAN)

Proposed in 2015 (Radford et al., arXiv:1511.06434), DCGAN successfully applied convolutional neural networks to GANs.

class DCGANGenerator(nn.Module):
    """DCGAN Generator (for 64x64 images)"""
    def __init__(self, noise_dim=100, ngf=64, nc=3):
        super().__init__()
        self.main = nn.Sequential(
            # Input: noise_dim x 1 x 1
            nn.ConvTranspose2d(noise_dim, ngf * 8, 4, 1, 0, bias=False),
            nn.BatchNorm2d(ngf * 8),
            nn.ReLU(True),
            # State: (ngf*8) x 4 x 4
            nn.ConvTranspose2d(ngf * 8, ngf * 4, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ngf * 4),
            nn.ReLU(True),
            # State: (ngf*4) x 8 x 8
            nn.ConvTranspose2d(ngf * 4, ngf * 2, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ngf * 2),
            nn.ReLU(True),
            # State: (ngf*2) x 16 x 16
            nn.ConvTranspose2d(ngf * 2, ngf, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ngf),
            nn.ReLU(True),
            # State: (ngf) x 32 x 32
            nn.ConvTranspose2d(ngf, nc, 4, 2, 1, bias=False),
            nn.Tanh()
            # Output: nc x 64 x 64
        )

    def forward(self, z):
        z = z.view(z.size(0), -1, 1, 1)
        return self.main(z)


class DCGANDiscriminator(nn.Module):
    """DCGAN Discriminator (for 64x64 images)"""
    def __init__(self, ndf=64, nc=3):
        super().__init__()
        self.main = nn.Sequential(
            # Input: nc x 64 x 64
            nn.Conv2d(nc, ndf, 4, 2, 1, bias=False),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(ndf, ndf * 2, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ndf * 2),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(ndf * 2, ndf * 4, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ndf * 4),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(ndf * 4, ndf * 8, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ndf * 8),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(ndf * 8, 1, 4, 1, 0, bias=False),
            nn.Sigmoid()
        )

    def forward(self, x):
        return self.main(x).view(-1, 1)

5.2 WGAN (Wasserstein GAN)

WGAN (arXiv:1701.07875) uses Wasserstein distance instead of Jensen-Shannon Divergence, greatly improving training stability.

class WGANDiscriminator(nn.Module):
    """WGAN Critic: no Sigmoid output"""
    def __init__(self, input_dim=784):
        super().__init__()
        self.model = nn.Sequential(
            nn.Linear(input_dim, 512),
            nn.LeakyReLU(0.2),
            nn.Linear(512, 256),
            nn.LeakyReLU(0.2),
            nn.Linear(256, 1)
            # No Sigmoid: the critic outputs an unbounded score used to estimate the Wasserstein distance
        )

    def forward(self, x):
        return self.model(x.view(x.size(0), -1))


def train_wgan(G, D, dataloader, device, epochs=100,
               n_critic=5, clip_value=0.01, lr=5e-5):
    """Train WGAN"""
    optimizer_G = optim.RMSprop(G.parameters(), lr=lr)
    optimizer_D = optim.RMSprop(D.parameters(), lr=lr)

    for epoch in range(epochs):
        for i, (real_imgs, _) in enumerate(dataloader):
            real_imgs = real_imgs.to(device)
            batch_size = real_imgs.size(0)

            # Update Critic n_critic times
            for _ in range(n_critic):
                optimizer_D.zero_grad()

                z = torch.randn(batch_size, 100).to(device)
                fake_imgs = G(z).detach()

                # Critic loss: -(E[D(real)] - E[D(fake)]), minimized by gradient descent
                d_loss = -torch.mean(D(real_imgs)) + torch.mean(D(fake_imgs))
                d_loss.backward()
                optimizer_D.step()

                # Weight clipping (Lipschitz constraint)
                for p in D.parameters():
                    p.data.clamp_(-clip_value, clip_value)

            # Update Generator
            optimizer_G.zero_grad()
            z = torch.randn(batch_size, 100).to(device)
            fake_imgs = G(z)
            g_loss = -torch.mean(D(fake_imgs))
            g_loss.backward()
            optimizer_G.step()

5.3 WGAN-GP (Gradient Penalty)

Instead of weight clipping in WGAN, gradient penalty enforces the Lipschitz constraint more effectively.

def gradient_penalty(D, real_imgs, fake_imgs, device):
    """Compute gradient penalty"""
    batch_size = real_imgs.size(0)
    # Random interpolation between real and fake images
    alpha = torch.rand(batch_size, 1, 1, 1).to(device)
    interpolated = alpha * real_imgs + (1 - alpha) * fake_imgs
    interpolated.requires_grad_(True)

    d_interpolated = D(interpolated)

    gradients = torch.autograd.grad(
        outputs=d_interpolated,
        inputs=interpolated,
        grad_outputs=torch.ones_like(d_interpolated),
        create_graph=True,
        retain_graph=True
    )[0]

    gradients = gradients.view(batch_size, -1)
    gradient_norm = gradients.norm(2, dim=1)
    penalty = ((gradient_norm - 1) ** 2).mean()
    return penalty


def wgan_gp_d_loss(D, real_imgs, fake_imgs, device, lambda_gp=10):
    """WGAN-GP Discriminator loss"""
    d_real = D(real_imgs).mean()
    d_fake = D(fake_imgs).mean()
    gp = gradient_penalty(D, real_imgs, fake_imgs, device)
    return -d_real + d_fake + lambda_gp * gp

5.4 Conditional GAN (cGAN)

Generate images of specific classes by adding a label condition.

class ConditionalGenerator(nn.Module):
    """Conditional GAN Generator"""
    def __init__(self, noise_dim=100, num_classes=10, embed_dim=50):
        super().__init__()
        self.label_embedding = nn.Embedding(num_classes, embed_dim)
        self.model = nn.Sequential(
            nn.Linear(noise_dim + embed_dim, 256),
            nn.LeakyReLU(0.2),
            nn.Linear(256, 512),
            nn.LeakyReLU(0.2),
            nn.Linear(512, 784),
            nn.Tanh()
        )

    def forward(self, z, labels):
        label_embed = self.label_embedding(labels)
        x = torch.cat([z, label_embed], dim=1)
        return self.model(x).view(-1, 1, 28, 28)

6. Diffusion Models

Diffusion models are now the standard for image generation. DDPM (Denoising Diffusion Probabilistic Models, Ho et al., 2020, arXiv:2006.11239) pioneered this field.

6.1 Intuition for Diffusion Models

The core idea has two processes.

Forward Process (Adding Noise): Gradually add Gaussian noise to a real image until it becomes pure noise. After T steps, it becomes a standard normal distribution.

Reverse Process (Removing Noise): Starting from pure noise, progressively remove noise to recover the original image. A neural network learns this reverse process.

6.2 Forward Process Math

At each step, noise is added.

q(x_t | x_{t-1}) = N(x_t; sqrt(1-β_t) * x_{t-1}, β_t * I)

Where β_t is the noise schedule.

Jump directly to step t (important property):

q(x_t | x_0) = N(x_t; sqrt(ā_t) * x_0, (1-ā_t) * I)

Where ā_t = product of (1 - β_s) for s from 1 to t.

This allows computing the noisy image at any arbitrary timestep t directly.

import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np

class NoiseSchedule:
    """Manage the noise schedule"""
    def __init__(self, timesteps=1000, beta_start=1e-4, beta_end=0.02):
        self.timesteps = timesteps

        # Linear noise schedule
        self.betas = torch.linspace(beta_start, beta_end, timesteps)
        self.alphas = 1.0 - self.betas
        self.alphas_cumprod = torch.cumprod(self.alphas, dim=0)
        self.alphas_cumprod_prev = F.pad(self.alphas_cumprod[:-1], (1, 0), value=1.0)

        # Forward process coefficients
        self.sqrt_alphas_cumprod = torch.sqrt(self.alphas_cumprod)
        self.sqrt_one_minus_alphas_cumprod = torch.sqrt(1.0 - self.alphas_cumprod)

        # Reverse process coefficients
        self.posterior_variance = (
            self.betas * (1.0 - self.alphas_cumprod_prev) /
            (1.0 - self.alphas_cumprod)
        )

    def q_sample(self, x_start, t, noise=None):
        """Forward process: add t steps of noise to x_0"""
        if noise is None:
            noise = torch.randn_like(x_start)

        # Move schedule tensors to the input's device before indexing with t
        sqrt_alphas_cumprod_t = self.sqrt_alphas_cumprod.to(x_start.device)[t].view(-1, 1, 1, 1)
        sqrt_one_minus_alphas_cumprod_t = self.sqrt_one_minus_alphas_cumprod.to(x_start.device)[t].view(-1, 1, 1, 1)

        return sqrt_alphas_cumprod_t * x_start + sqrt_one_minus_alphas_cumprod_t * noise
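A quick sanity check of the linear schedule (coefficients recomputed inline so it runs standalone): the signal coefficient sqrt(ā_t) decays from nearly 1 to nearly 0 over the 1000 steps, so x_T is almost pure noise.

```python
import torch

# Linear beta schedule from section 6.2
timesteps = 1000
betas = torch.linspace(1e-4, 0.02, timesteps)
alphas_cumprod = torch.cumprod(1.0 - betas, dim=0)

# At t=0 the image is nearly clean; at t=999 it is almost pure noise
print(alphas_cumprod[0].sqrt().item())    # ~0.99995
print(alphas_cumprod[-1].sqrt().item())   # ~0.006
```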

6.3 U-Net Architecture

The noise prediction network of diffusion models uses a U-Net architecture, conditioned on the timestep t via sinusoidal embeddings.

class SinusoidalPositionEmbeddings(nn.Module):
    """Sinusoidal position embeddings for timesteps"""
    def __init__(self, dim):
        super().__init__()
        self.dim = dim

    def forward(self, time):
        device = time.device
        half_dim = self.dim // 2
        embeddings = np.log(10000) / (half_dim - 1)
        embeddings = torch.exp(torch.arange(half_dim, device=device) * -embeddings)
        embeddings = time[:, None] * embeddings[None, :]
        embeddings = torch.cat((embeddings.sin(), embeddings.cos()), dim=-1)
        return embeddings


class ResidualBlock(nn.Module):
    """Residual block with timestep conditioning"""
    def __init__(self, in_channels, out_channels, time_emb_dim):
        super().__init__()
        self.time_mlp = nn.Sequential(
            nn.SiLU(),
            nn.Linear(time_emb_dim, out_channels)
        )
        self.block1 = nn.Sequential(
            # min(8, in_channels) keeps GroupNorm valid when in_channels < 8
            # (e.g. the 1-channel MNIST input; GroupNorm(8, 1) would raise)
            nn.GroupNorm(min(8, in_channels), in_channels),
            nn.SiLU(),
            nn.Conv2d(in_channels, out_channels, 3, padding=1)
        )
        self.block2 = nn.Sequential(
            nn.GroupNorm(8, out_channels),
            nn.SiLU(),
            nn.Conv2d(out_channels, out_channels, 3, padding=1)
        )
        self.residual_conv = (
            nn.Conv2d(in_channels, out_channels, 1)
            if in_channels != out_channels else nn.Identity()
        )

    def forward(self, x, time_emb):
        h = self.block1(x)
        time_emb = self.time_mlp(time_emb)[:, :, None, None]
        h = h + time_emb
        h = self.block2(h)
        return h + self.residual_conv(x)


class SimpleUNet(nn.Module):
    """Simplified U-Net for DDPM"""
    def __init__(self, in_channels=1, model_channels=64, time_emb_dim=256):
        super().__init__()

        # Timestep embedding
        self.time_mlp = nn.Sequential(
            SinusoidalPositionEmbeddings(model_channels),
            nn.Linear(model_channels, time_emb_dim),
            nn.SiLU(),
            nn.Linear(time_emb_dim, time_emb_dim)
        )

        # Encoder
        self.down1 = ResidualBlock(in_channels, model_channels, time_emb_dim)
        self.down2 = ResidualBlock(model_channels, model_channels * 2, time_emb_dim)
        self.pool = nn.MaxPool2d(2)

        # Bottleneck
        self.bottleneck = ResidualBlock(model_channels * 2, model_channels * 2, time_emb_dim)

        # Decoder
        self.up1 = ResidualBlock(model_channels * 4, model_channels, time_emb_dim)
        self.up2 = ResidualBlock(model_channels * 2, model_channels, time_emb_dim)
        self.upsample = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)

        # Output
        self.final = nn.Conv2d(model_channels, in_channels, 1)

    def forward(self, x, t):
        time_emb = self.time_mlp(t)

        # Encoder path
        x1 = self.down1(x, time_emb)
        x2 = self.down2(self.pool(x1), time_emb)

        # Bottleneck
        x_mid = self.bottleneck(x2, time_emb)

        # Decoder path (with skip connections)
        # up1 runs at the pooled resolution so its input matches x2 spatially;
        # upsampling before concatenating with x2 would cause a size mismatch
        x = self.up1(torch.cat([x_mid, x2], dim=1), time_emb)
        x = self.up2(torch.cat([self.upsample(x), x1], dim=1), time_emb)

        return self.final(x)

6.4 DDPM Training and Sampling

class DDPM:
    """DDPM training and sampling"""
    def __init__(self, model, noise_schedule, device):
        self.model = model
        self.schedule = noise_schedule
        self.device = device

    def get_loss(self, x_start, t):
        """Training loss: noise prediction error"""
        noise = torch.randn_like(x_start)
        x_noisy = self.schedule.q_sample(x_start, t, noise)

        # Neural network predicts the added noise
        predicted_noise = self.model(x_noisy, t)

        return F.mse_loss(noise, predicted_noise)

    @torch.no_grad()
    def p_sample(self, x_t, t):
        """Reverse process: denoise one step from x_t to x_{t-1}"""
        # Index the CPU-resident schedule tensors with a CPU copy of t,
        # then move the result to the compute device
        t_cpu = t.cpu()
        betas_t = self.schedule.betas[t_cpu].view(-1, 1, 1, 1).to(self.device)
        sqrt_one_minus_alphas_cumprod_t = (
            self.schedule.sqrt_one_minus_alphas_cumprod[t_cpu].view(-1, 1, 1, 1).to(self.device)
        )
        sqrt_recip_alphas_t = torch.sqrt(
            1.0 / self.schedule.alphas[t_cpu]
        ).view(-1, 1, 1, 1).to(self.device)

        predicted_noise = self.model(x_t, t)

        model_mean = sqrt_recip_alphas_t * (
            x_t - betas_t * predicted_noise / sqrt_one_minus_alphas_cumprod_t
        )

        if t[0] == 0:
            return model_mean
        else:
            posterior_variance_t = (
                self.schedule.posterior_variance[t.cpu()].view(-1, 1, 1, 1).to(self.device)
            )
            noise = torch.randn_like(x_t)
            return model_mean + torch.sqrt(posterior_variance_t) * noise

    @torch.no_grad()
    def sample(self, batch_size, image_shape):
        """Generate images starting from pure noise"""
        img = torch.randn(batch_size, *image_shape).to(self.device)

        for i in reversed(range(self.schedule.timesteps)):
            t = torch.full((batch_size,), i, dtype=torch.long, device=self.device)
            img = self.p_sample(img, t)

        return img


def train_ddpm(epochs=100):
    """Train DDPM"""
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize([0.5], [0.5])
    ])
    dataset = datasets.MNIST('./data', train=True, download=True, transform=transform)
    dataloader = DataLoader(dataset, batch_size=128, shuffle=True)

    model = SimpleUNet(in_channels=1).to(device)
    schedule = NoiseSchedule(timesteps=1000)
    ddpm = DDPM(model, schedule, device)

    optimizer = optim.Adam(model.parameters(), lr=2e-4)

    for epoch in range(epochs):
        total_loss = 0
        for batch, (x, _) in enumerate(dataloader):
            x = x.to(device)
            # Random timestep sampling
            t = torch.randint(0, schedule.timesteps, (x.size(0),), device=device)

            loss = ddpm.get_loss(x, t)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        if (epoch + 1) % 10 == 0:
            avg_loss = total_loss / len(dataloader)
            print(f"Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.4f}")

    return model, ddpm

7. DDIM - Fast Sampling

DDPM sampling requires around 1000 sequential denoising steps (one network evaluation per step), making generation slow. DDIM (Denoising Diffusion Implicit Models, Song et al., 2020) provides faster, deterministic sampling from the same trained model.

7.1 DDIM's Idea

DDIM reformulates sampling as a non-Markovian process that shares the same marginal distributions as DDPM. The key consequence is that the same trained noise predictor can be reused while the number of sampling steps is drastically reduced (e.g., 1000 → 50).

@torch.no_grad()
def ddim_sample(model, schedule, batch_size, image_shape,
                ddim_timesteps=50, eta=0.0, device='cpu'):
    """
    DDIM sampling
    eta=0.0: fully deterministic
    eta=1.0: equivalent to DDPM
    """
    # Select evenly spaced timesteps
    c = schedule.timesteps // ddim_timesteps
    timestep_seq = list(range(0, schedule.timesteps, c))[::-1]

    img = torch.randn(batch_size, *image_shape).to(device)

    for i, t in enumerate(timestep_seq):
        t_tensor = torch.full((batch_size,), t, dtype=torch.long, device=device)
        t_prev = timestep_seq[i + 1] if i + 1 < len(timestep_seq) else -1

        alpha_bar = schedule.alphas_cumprod[t].to(device)
        alpha_bar_prev = (
            schedule.alphas_cumprod[t_prev].to(device) if t_prev >= 0
            else torch.tensor(1.0, device=device)
        )

        # Predict noise
        pred_noise = model(img, t_tensor)

        # Predict x_0
        pred_x0 = (img - torch.sqrt(1 - alpha_bar) * pred_noise) / torch.sqrt(alpha_bar)
        pred_x0 = torch.clamp(pred_x0, -1, 1)

        # Compute sigma
        sigma = eta * torch.sqrt(
            (1 - alpha_bar_prev) / (1 - alpha_bar) * (1 - alpha_bar / alpha_bar_prev)
        )

        # Direction towards x_t
        pred_dir = torch.sqrt(1 - alpha_bar_prev - sigma**2) * pred_noise

        # Next step image
        noise = torch.randn_like(img) if t_prev >= 0 else 0
        img = torch.sqrt(alpha_bar_prev) * pred_x0 + pred_dir + sigma * noise

    return img
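
The `pred_x0` line inside `ddim_sample` is just the forward-process equation x_t = √ᾱ·x_0 + √(1-ᾱ)·ε solved for x_0. A toy check (arbitrary shapes and ᾱ value) confirms that with the true noise plugged in, x_0 is recovered exactly up to floating-point error:

```python
import torch

torch.manual_seed(0)
x0 = torch.randn(4, 1, 8, 8)
eps = torch.randn_like(x0)
alpha_bar = torch.tensor(0.3)   # arbitrary cumulative-product value

# Forward process: noise x0, then invert using the (here, known) noise
x_t = torch.sqrt(alpha_bar) * x0 + torch.sqrt(1 - alpha_bar) * eps
pred_x0 = (x_t - torch.sqrt(1 - alpha_bar) * eps) / torch.sqrt(alpha_bar)
```

In practice the model's noise prediction is imperfect, which is why DDIM still iterates over several steps rather than jumping straight to x_0.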

8. Stable Diffusion Analysis

Stable Diffusion (arXiv:2112.10752) is a groundbreaking approach that performs diffusion in latent space rather than pixel space.

8.1 Latent Diffusion Models (LDM)

Applying diffusion directly to high-resolution images (512x512) is computationally expensive. LDM:

  1. Compresses images to latent space using a VAE encoder (512x512x3 → 64x64x4)
  2. Performs diffusion in latent space
  3. Recovers original resolution with the VAE decoder

Since the U-Net now operates on tensors with 8× smaller spatial dimensions per side, each denoising step is dramatically cheaper in both compute and memory.
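
The saving is easy to quantify: with the shapes above, each denoising step touches 48× fewer values than pixel-space diffusion would:

```python
# Element counts before and after VAE compression (Stable Diffusion shapes)
pixel_elems = 512 * 512 * 3    # RGB pixel space
latent_elems = 64 * 64 * 4     # latent space: f=8 downsampling, 4 channels

ratio = pixel_elems / latent_elems   # 786432 / 16384 = 48.0
```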

8.2 Stable Diffusion Components

Text prompt → CLIP text encoder → Text embeddings
Pure noise z_T → [U-Net + Cross-Attention] → Latent z_0 → VAE decoder → Final image

CLIP Text Encoder: Transforms text into meaningful vector representations.

U-Net with Cross-Attention: Uses text embeddings as conditions via cross-attention.
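
A minimal sketch shows where the text enters: queries come from the image latents, keys and values from the text embeddings. This uses PyTorch ≥ 2.0's `scaled_dot_product_attention`; the dimensions are illustrative, and the learned per-head projections of the real model are omitted:

```python
import torch
import torch.nn.functional as F

torch.manual_seed(0)
latents = torch.randn(1, 64 * 64, 320)   # (batch, spatial positions, channels)
text_emb = torch.randn(1, 77, 320)       # (batch, CLIP tokens, channels)

q = latents        # in SD, q/k/v each pass through learned linear projections
k = v = text_emb
out = F.scaled_dot_product_attention(q, k, v)  # same shape as the latents
```

Every spatial position attends over all 77 token embeddings, which is how the prompt steers each region of the image.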

Classifier-Free Guidance (CFG): Combines conditional and unconditional predictions to improve text fidelity.

guided_noise = uncond_noise + guidance_scale * (cond_noise - uncond_noise)

8.3 Using Stable Diffusion with diffusers

from diffusers import StableDiffusionPipeline, DPMSolverMultistepScheduler
import torch

model_id = "stabilityai/stable-diffusion-2-1"
pipe = StableDiffusionPipeline.from_pretrained(
    model_id,
    torch_dtype=torch.float16,
    use_safetensors=True,
)

# Use a fast scheduler
pipe.scheduler = DPMSolverMultistepScheduler.from_config(pipe.scheduler.config)
pipe = pipe.to("cuda")

# Generate image
prompt = "a photorealistic landscape of mountains at sunset, 8k, highly detailed"
negative_prompt = "blurry, low quality, distorted"

image = pipe(
    prompt,
    negative_prompt=negative_prompt,
    num_inference_steps=25,    # 1000 DDPM steps → 25 steps
    guidance_scale=7.5,        # CFG scale
    height=512,
    width=512,
    generator=torch.Generator("cuda").manual_seed(42)
).images[0]

image.save("generated_image.png")


# Image-to-image translation
from diffusers import StableDiffusionImg2ImgPipeline
from PIL import Image

img2img_pipe = StableDiffusionImg2ImgPipeline.from_pretrained(
    model_id,
    torch_dtype=torch.float16
).to("cuda")

init_image = Image.open("input.jpg").resize((512, 512))
result = img2img_pipe(
    prompt="a painting in Van Gogh style",
    image=init_image,
    strength=0.75,  # How much to transform (0-1)
    guidance_scale=7.5,
    num_inference_steps=50
).images[0]

8.4 Inpainting

from diffusers import StableDiffusionInpaintPipeline

inpaint_pipe = StableDiffusionInpaintPipeline.from_pretrained(
    "runwayml/stable-diffusion-inpainting",
    torch_dtype=torch.float16
).to("cuda")

image = Image.open("photo.jpg").resize((512, 512))
mask = Image.open("mask.jpg").resize((512, 512))  # White = area to inpaint

result = inpaint_pipe(
    prompt="a beautiful garden with flowers",
    image=image,
    mask_image=mask,
    num_inference_steps=50
).images[0]

9. ControlNet - Fine-grained Image Control

9.1 ControlNet Architecture

ControlNet (Zhang et al., 2023) allows Stable Diffusion to be conditioned on additional control signals (Canny edges, depth maps, pose, etc.).

The original U-Net weights are frozen, and a trainable copy of the U-Net encoder (roughly 360M additional parameters for SD 1.5) processes the control signal. Its outputs are injected back into the frozen network through zero-initialized convolutions, so training starts without disturbing the base model's behavior.
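
The zero-convolution wiring can be sketched in a few lines (channel count illustrative): because the 1x1 conv starts at exactly zero, the control branch initially contributes nothing, and the base model's output is untouched at the start of training:

```python
import torch
import torch.nn as nn

# 1x1 "zero convolution" joining the trainable copy to the frozen U-Net
zero_conv = nn.Conv2d(320, 320, kernel_size=1)
nn.init.zeros_(zero_conv.weight)
nn.init.zeros_(zero_conv.bias)

frozen_feat = torch.randn(1, 320, 64, 64)    # feature from the frozen U-Net
control_feat = torch.randn(1, 320, 64, 64)   # feature from the trainable copy

# At initialization the control term is exactly zero
combined = frozen_feat + zero_conv(control_feat)
```

As training proceeds, gradients gradually grow the zero convs away from zero, letting control information flow in without an abrupt disruption.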

from diffusers import StableDiffusionControlNetPipeline, ControlNetModel
from diffusers.utils import load_image
import cv2
import numpy as np

# Load Canny ControlNet
controlnet = ControlNetModel.from_pretrained(
    "lllyasviel/sd-controlnet-canny",
    torch_dtype=torch.float16
)
pipe = StableDiffusionControlNetPipeline.from_pretrained(
    "runwayml/stable-diffusion-v1-5",
    controlnet=controlnet,
    torch_dtype=torch.float16
).to("cuda")

# Extract Canny edges
image = load_image("input.jpg")
image_array = np.array(image)
canny = cv2.Canny(image_array, 100, 200)
canny_image = Image.fromarray(canny)

# Generate with ControlNet
result = pipe(
    prompt="a beautiful oil painting",
    image=canny_image,   # Canny edges as control signal
    controlnet_conditioning_scale=1.0,
    num_inference_steps=50,
    guidance_scale=7.5
).images[0]

10. Latest Architectures

10.1 DiT (Diffusion Transformers)

Proposed by Peebles and Xie (2022), DiT uses Transformer architecture as the backbone for diffusion models instead of U-Net. Modern models like Sora, Flux, and SD3 are DiT-based.

class DiTBlock(nn.Module):
    """Diffusion Transformer Block"""
    def __init__(self, hidden_dim, num_heads, mlp_ratio=4.0):
        super().__init__()
        self.norm1 = nn.LayerNorm(hidden_dim)
        self.attn = nn.MultiheadAttention(hidden_dim, num_heads, batch_first=True)
        self.norm2 = nn.LayerNorm(hidden_dim)

        mlp_dim = int(hidden_dim * mlp_ratio)
        self.mlp = nn.Sequential(
            nn.Linear(hidden_dim, mlp_dim),
            nn.GELU(),
            nn.Linear(mlp_dim, hidden_dim)
        )

        # AdaLN (Adaptive Layer Normalization): timestep conditioning
        self.adaLN_modulation = nn.Sequential(
            nn.SiLU(),
            nn.Linear(hidden_dim, 6 * hidden_dim)
        )

    def forward(self, x, c):
        # Compute modulation parameters from timestep/condition embedding
        shift_msa, scale_msa, gate_msa, shift_mlp, scale_mlp, gate_mlp = (
            self.adaLN_modulation(c).chunk(6, dim=-1)
        )

        # Modulated normalization
        x_norm = (1 + scale_msa.unsqueeze(1)) * self.norm1(x) + shift_msa.unsqueeze(1)
        attn_out, _ = self.attn(x_norm, x_norm, x_norm)
        x = x + gate_msa.unsqueeze(1) * attn_out

        x_norm = (1 + scale_mlp.unsqueeze(1)) * self.norm2(x) + shift_mlp.unsqueeze(1)
        x = x + gate_mlp.unsqueeze(1) * self.mlp(x_norm)

        return x
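
To see the adaLN mechanics in isolation (toy dimensions): a single conditioning vector per sample is expanded into six modulation tensors, and each scale/shift pair modulates the normalized activations element-wise:

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
hidden_dim, tokens = 32, 16
x = torch.randn(2, tokens, hidden_dim)   # token sequence (e.g. image patches)
c = torch.randn(2, hidden_dim)           # timestep/class conditioning embedding

# One linear layer produces all six modulation parameters at once
modulation = nn.Linear(hidden_dim, 6 * hidden_dim)
shift_msa, scale_msa, gate_msa, shift_mlp, scale_mlp, gate_mlp = (
    modulation(c).chunk(6, dim=-1)
)

norm = nn.LayerNorm(hidden_dim)
# Broadcast per-sample (2, 32) parameters over the token dimension
x_mod = (1 + scale_msa.unsqueeze(1)) * norm(x) + shift_msa.unsqueeze(1)
```

The DiT paper additionally initializes the gates to zero ("adaLN-Zero"), so each block starts as the identity function.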

10.2 SDXL (Stable Diffusion XL)

An upgraded version of Stable Diffusion that generates higher-resolution (1024x1024) images natively, using a larger U-Net and two text encoders.

from diffusers import StableDiffusionXLPipeline

pipe = StableDiffusionXLPipeline.from_pretrained(
    "stabilityai/stable-diffusion-xl-base-1.0",
    torch_dtype=torch.float16,
    use_safetensors=True,
    variant="fp16"
).to("cuda")

image = pipe(
    prompt="a majestic lion in photorealistic style, 4k",
    negative_prompt="cartoon, blurry",
    height=1024,
    width=1024,
    num_inference_steps=50,
    guidance_scale=5.0
).images[0]

10.3 Generative Model Comparison

| Model | Year | Core Idea | Strengths | Weaknesses |
|-------|------|-----------|-----------|------------|
| VAE   | 2013 | Learn latent distribution | Stable, interpretable latent space | Blurry images |
| GAN   | 2014 | Adversarial training | Sharp images | Unstable training, mode collapse |
| DDPM  | 2020 | Iterative denoising | High quality, diversity | Slow sampling |
| LDM   | 2022 | Latent space diffusion | Efficient, text conditioning | Complex architecture |
| DiT   | 2022 | Transformer backbone | Scaling efficiency | High compute cost |

Conclusion

We have fully explored the journey of generative AI — from the latent space theory of VAEs, through the adversarial game of GANs, to the iterative denoising of DDPM, and finally to the latent space diffusion of Stable Diffusion.

This field evolves extremely rapidly. Today's cutting-edge becomes tomorrow's baseline. Understanding the fundamentals deeply, while continuously following new research papers, is essential.

Recommended resources for continued learning:

  • Hugging Face Diffusers: https://huggingface.co/docs/diffusers/
  • VAE paper: arXiv:1312.6114
  • GAN paper: arXiv:1406.2661
  • DDPM paper: arXiv:2006.11239
  • Latent Diffusion / Stable Diffusion: arXiv:2112.10752

References

  • Kingma, D. P., & Welling, M. (2013). Auto-Encoding Variational Bayes. arXiv:1312.6114
  • Goodfellow, I., et al. (2014). Generative Adversarial Networks. arXiv:1406.2661
  • Ho, J., et al. (2020). Denoising Diffusion Probabilistic Models. arXiv:2006.11239
  • Song, J., et al. (2020). Denoising Diffusion Implicit Models. arXiv:2010.02502
  • Rombach, R., et al. (2022). High-Resolution Image Synthesis with Latent Diffusion Models. arXiv:2112.10752
  • Arjovsky, M., et al. (2017). Wasserstein GAN. arXiv:1701.07875
  • Peebles, W., & Xie, S. (2022). Scalable Diffusion Models with Transformers. arXiv:2212.09748
  • Zhang, L., et al. (2023). Adding Conditional Control to Text-to-Image Diffusion Models. arXiv:2302.05543