Skip to content
Published on

[深層強化学習] 07. DQN拡張:Double DQN、Dueling DQN、Rainbow

Authors

DQNの改善方向

基本DQNはAtariゲームで人間レベルの性能を示しましたが、まだ改善の余地が多くあります。この記事ではDQNの性能を大きく向上させる6つの拡張技法を見ていきます。


1. N-step DQN

問題:単一ステップTDターゲットの限界

基本DQNは1-step TDターゲットを使用します:

1-step: target = r_t + gamma * max_a Q(s_{t+1}, a)

これは即時的な報酬のみ直接使用し、残りは推定に依存します。

解決:N-step報酬の使用

N-step方法は複数ステップの実際の報酬を使用してより正確なターゲットを作ります:

N-step: target = r_t + gamma * r_{t+1} + ... + gamma^{n-1} * r_{t+n-1} + gamma^n * max_a Q(s_{t+n}, a)
import torch
import torch.nn as nn
import numpy as np
from collections import deque

class NStepBuffer:
    """N-step 리턴을 위한 버퍼"""
    def __init__(self, n_steps, gamma):
        self.n_steps = n_steps
        self.gamma = gamma
        self.buffer = deque(maxlen=n_steps)

    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def get(self):
        """N-step 전이 데이터를 반환"""
        state, action, _, _, _ = self.buffer[0]
        n_step_return = 0.0
        for i, (_, _, reward, _, done) in enumerate(self.buffer):
            n_step_return += (self.gamma ** i) * reward
            if done:
                break
        _, _, _, next_state, done = self.buffer[-1]
        return state, action, n_step_return, next_state, done

    def is_ready(self):
        return len(self.buffer) == self.n_steps

def compute_nstep_target(n_step_return, next_state, done, target_net, gamma, n_steps, device):
    """N-step TD 타겟 계산"""
    if done:
        return n_step_return
    with torch.no_grad():
        next_q = target_net(
            torch.tensor([next_state], dtype=torch.float32).to(device)
        ).max(dim=1)[0].item()
    return n_step_return + (gamma ** n_steps) * next_q

2. Double DQN

問題:Q値の過大推定(Overestimation)

基本DQNのターゲット計算でmax演算子はQ値を体系的に過大推定する傾向があります:

기본 DQN 타겟: r + gamma * max_a Q_target(s', a)

max演算が行動選択と価値評価を同時に行うため、ノイズにより高く推定された行動が選択されると過大推定が発生します。

解決:行動選択と価値評価の分離

Double DQNは行動選択にはオンラインネットワークを、価値評価にはターゲットネットワークを使用します:

Double DQN 타겟:
  a* = argmax_a Q_online(s', a)           # 온라인 네트워크로 행동 선택
  target = r + gamma * Q_target(s', a*)   # 타겟 네트워크로 가치 평가
def compute_double_dqn_loss(online_net, target_net, states, actions,
                             rewards, next_states, dones, gamma, device):
    """Double DQN 손실 계산"""
    states_t = torch.tensor(states, dtype=torch.float32).to(device)
    actions_t = torch.tensor(actions, dtype=torch.long).to(device)
    rewards_t = torch.tensor(rewards, dtype=torch.float32).to(device)
    next_states_t = torch.tensor(next_states, dtype=torch.float32).to(device)
    dones_t = torch.tensor(dones, dtype=torch.bool).to(device)

    current_q = online_net(states_t).gather(1, actions_t.unsqueeze(1)).squeeze(1)

    with torch.no_grad():
        best_actions = online_net(next_states_t).argmax(dim=1)
        next_q = target_net(next_states_t).gather(1, best_actions.unsqueeze(1)).squeeze(1)
        next_q[dones_t] = 0.0
        target_q = rewards_t + gamma * next_q

    loss = nn.MSELoss()(current_q, target_q)
    return loss

過大推定削減効果

Double DQNは特に行動空間が大きいか報酬にノイズが多い環境で効果的です。基本DQN対比で安定的かつより良い最終性能を示します。


3. Noisy Networks

問題:イプシロン貪欲の限界

イプシロン貪欲方策はすべての状態で同一の確率でランダム探索を行います。状態別に適切な探索レベルを調整できません。

解決:パラメータノイズ

ネットワーク重みに学習可能なノイズを追加します。エージェントが自動的に探索の程度を学習します。

import torch
import torch.nn as nn
import math

class NoisyLinear(nn.Module):
    """Factorized Gaussian Noise를 사용하는 선형 레이어"""
    def __init__(self, in_features, out_features, sigma_init=0.5):
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.weight_mu = nn.Parameter(torch.empty(out_features, in_features))
        self.weight_sigma = nn.Parameter(torch.empty(out_features, in_features))
        self.bias_mu = nn.Parameter(torch.empty(out_features))
        self.bias_sigma = nn.Parameter(torch.empty(out_features))
        self.register_buffer('weight_epsilon', torch.empty(out_features, in_features))
        self.register_buffer('bias_epsilon', torch.empty(out_features))
        self.sigma_init = sigma_init
        self.reset_parameters()
        self.reset_noise()

    def reset_parameters(self):
        mu_range = 1.0 / math.sqrt(self.in_features)
        self.weight_mu.data.uniform_(-mu_range, mu_range)
        self.weight_sigma.data.fill_(self.sigma_init / math.sqrt(self.in_features))
        self.bias_mu.data.uniform_(-mu_range, mu_range)
        self.bias_sigma.data.fill_(self.sigma_init / math.sqrt(self.out_features))

    def reset_noise(self):
        """새로운 노이즈 생성"""
        epsilon_in = self._scale_noise(self.in_features)
        epsilon_out = self._scale_noise(self.out_features)
        self.weight_epsilon.copy_(epsilon_out.outer(epsilon_in))
        self.bias_epsilon.copy_(epsilon_out)

    def _scale_noise(self, size):
        x = torch.randn(size)
        return x.sign() * x.abs().sqrt()

    def forward(self, x):
        if self.training:
            weight = self.weight_mu + self.weight_sigma * self.weight_epsilon
            bias = self.bias_mu + self.bias_sigma * self.bias_epsilon
        else:
            weight = self.weight_mu
            bias = self.bias_mu
        return nn.functional.linear(x, weight, bias)


class NoisyDQN(nn.Module):
    """Noisy Network를 사용하는 DQN"""
    def __init__(self, obs_size, n_actions):
        super().__init__()
        self.fc1 = nn.Linear(obs_size, 128)
        self.noisy_fc2 = NoisyLinear(128, 128)
        self.noisy_fc3 = NoisyLinear(128, n_actions)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.noisy_fc2(x))
        return self.noisy_fc3(x)

    def reset_noise(self):
        """모든 NoisyLinear 레이어의 노이즈 재생성"""
        self.noisy_fc2.reset_noise()
        self.noisy_fc3.reset_noise()

Noisy Networkを使えばイプシロンスケジュールが不要です。ネットワーク自体が探索を管理します。


4. Prioritized Experience Replay

問題:均一サンプリングの非効率性

基本経験リプレイはすべての経験を同一の確率でサンプリングします。しかし学習により有用な経験(TD誤差が大きい経験)があります。

解決:TD誤差ベースの優先順位

TD誤差が大きい経験をより頻繁にサンプリングします。

class SumTree:
    """효율적인 우선순위 샘플링을 위한 합 트리"""
    def __init__(self, capacity):
        self.capacity = capacity
        self.tree = np.zeros(2 * capacity - 1)
        self.data = [None] * capacity
        self.write_idx = 0
        self.size = 0

    def total(self):
        return self.tree[0]

    def add(self, priority, data):
        idx = self.write_idx + self.capacity - 1
        self.data[self.write_idx] = data
        self._update(idx, priority)
        self.write_idx = (self.write_idx + 1) % self.capacity
        self.size = min(self.size + 1, self.capacity)

    def _update(self, idx, priority):
        change = priority - self.tree[idx]
        self.tree[idx] = priority
        while idx > 0:
            idx = (idx - 1) // 2
            self.tree[idx] += change

    def get(self, value):
        idx = 0
        while idx < self.capacity - 1:
            left = 2 * idx + 1
            right = left + 1
            if value <= self.tree[left]:
                idx = left
            else:
                value -= self.tree[left]
                idx = right
        data_idx = idx - self.capacity + 1
        return idx, self.tree[idx], self.data[data_idx]

class PrioritizedReplayBuffer:
    """우선순위 경험 리플레이 버퍼"""
    def __init__(self, capacity, alpha=0.6, beta_start=0.4, beta_frames=100000):
        self.tree = SumTree(capacity)
        self.alpha = alpha
        self.beta_start = beta_start
        self.beta_frames = beta_frames
        self.frame = 0
        self.max_priority = 1.0

    def get_beta(self):
        beta = self.beta_start + (1.0 - self.beta_start) * \
            min(1.0, self.frame / self.beta_frames)
        self.frame += 1
        return beta

    def push(self, state, action, reward, next_state, done):
        data = (state, action, reward, next_state, done)
        priority = self.max_priority ** self.alpha
        self.tree.add(priority, data)

    def sample(self, batch_size):
        beta = self.get_beta()
        batch = []
        indices = []
        priorities = []
        segment = self.tree.total() / batch_size
        for i in range(batch_size):
            low = segment * i
            high = segment * (i + 1)
            value = np.random.uniform(low, high)
            idx, priority, data = self.tree.get(value)
            batch.append(data)
            indices.append(idx)
            priorities.append(priority)
        probs = np.array(priorities) / self.tree.total()
        weights = (self.tree.size * probs) ** (-beta)
        weights = weights / weights.max()
        states, actions, rewards, next_states, dones = zip(*batch)
        return (
            np.array(states), np.array(actions),
            np.array(rewards, dtype=np.float32), np.array(next_states),
            np.array(dones, dtype=np.bool_), np.array(indices),
            torch.tensor(weights, dtype=torch.float32),
        )

    def update_priorities(self, indices, td_errors):
        for idx, td_error in zip(indices, td_errors):
            priority = (abs(td_error) + 1e-6) ** self.alpha
            self.max_priority = max(self.max_priority, priority)
            self.tree._update(idx, priority)

5. Dueling DQN

核心アイデア

Q値を**状態価値V(s)アドバンテージA(s, a)**に分離します:

Q(s, a) = V(s) + A(s, a) - mean_a(A(s, a))

この分離の利点は、ある状態ではどの行動を取っても結果が似ている場合があり、その時V(s)だけ正確に推定すればよいということです。

class DuelingDQN(nn.Module):
    """Dueling DQN 네트워크"""
    def __init__(self, obs_size, n_actions):
        super().__init__()
        self.feature = nn.Sequential(nn.Linear(obs_size, 128), nn.ReLU())
        self.value_stream = nn.Sequential(nn.Linear(128, 128), nn.ReLU(), nn.Linear(128, 1))
        self.advantage_stream = nn.Sequential(nn.Linear(128, 128), nn.ReLU(), nn.Linear(128, n_actions))

    def forward(self, x):
        features = self.feature(x)
        value = self.value_stream(features)
        advantage = self.advantage_stream(features)
        q_values = value + advantage - advantage.mean(dim=-1, keepdim=True)
        return q_values

6. Categorical DQN (C51)

核心アイデア

既存DQNはQ値の期待値のみ推定します。Categorical DQNはQ値の全体確率分布を推定します。

class CategoricalDQN(nn.Module):
    """Categorical DQN (C51) 네트워크"""
    def __init__(self, obs_size, n_actions, n_atoms=51, v_min=-10, v_max=10):
        super().__init__()
        self.n_actions = n_actions
        self.n_atoms = n_atoms
        self.v_min = v_min
        self.v_max = v_max
        self.delta_z = (v_max - v_min) / (n_atoms - 1)
        self.register_buffer('support', torch.linspace(v_min, v_max, n_atoms))
        self.network = nn.Sequential(
            nn.Linear(obs_size, 128), nn.ReLU(),
            nn.Linear(128, 128), nn.ReLU(),
            nn.Linear(128, n_actions * n_atoms),
        )

    def forward(self, x):
        logits = self.network(x)
        logits = logits.view(-1, self.n_actions, self.n_atoms)
        probs = torch.softmax(logits, dim=-1)
        return probs

    def get_q_values(self, x):
        probs = self.forward(x)
        q_values = (probs * self.support.unsqueeze(0).unsqueeze(0)).sum(dim=-1)
        return q_values

Rainbow:すべてを結合

RainbowはWの6つの技法をすべて結合したアルゴリズムです。各技法が独立して貢献し、結合するとシナジー効果が生まれます。

Rainbow構成要素

  1. N-step returns: より正確なターゲット
  2. Double DQN: 過大推定防止
  3. Noisy Networks: 自動探索管理
  4. Prioritized Replay: 効率的な経験活用
  5. Dueling Architecture: VとAの分離
  6. Categorical DQN: 価値分布学習
class RainbowDQN(nn.Module):
    """Rainbow DQN: 모든 확장 기법 결합"""
    def __init__(self, obs_size, n_actions, n_atoms=51, v_min=-10, v_max=10):
        super().__init__()
        self.n_actions = n_actions
        self.n_atoms = n_atoms
        self.v_min = v_min
        self.v_max = v_max
        self.register_buffer('support', torch.linspace(v_min, v_max, n_atoms))
        self.feature = nn.Sequential(nn.Linear(obs_size, 128), nn.ReLU())
        self.value_noisy1 = NoisyLinear(128, 128)
        self.value_noisy2 = NoisyLinear(128, n_atoms)
        self.advantage_noisy1 = NoisyLinear(128, 128)
        self.advantage_noisy2 = NoisyLinear(128, n_actions * n_atoms)

    def forward(self, x):
        features = self.feature(x)
        value = torch.relu(self.value_noisy1(features))
        value = self.value_noisy2(value).view(-1, 1, self.n_atoms)
        advantage = torch.relu(self.advantage_noisy1(features))
        advantage = self.advantage_noisy2(advantage).view(-1, self.n_actions, self.n_atoms)
        q_dist = value + advantage - advantage.mean(dim=1, keepdim=True)
        probs = torch.softmax(q_dist, dim=-1)
        return probs

    def get_q_values(self, x):
        probs = self.forward(x)
        q_values = (probs * self.support.unsqueeze(0).unsqueeze(0)).sum(dim=-1)
        return q_values

    def reset_noise(self):
        self.value_noisy1.reset_noise()
        self.value_noisy2.reset_noise()
        self.advantage_noisy1.reset_noise()
        self.advantage_noisy2.reset_noise()

各技法の性能貢献度

Atariゲームでの各技法の貢献度:

技法主要効果貢献度
Prioritized Replay学習効率向上高い
N-step Returnsより速い収束高い
Categorical DQN価値分布学習中〜高
Dueling状態価値分離中程度
Double DQN過大推定防止中程度
Noisy Networks適応的探索中程度

すべての技法を結合したRainbowは個別技法よりはるかに優れた性能を示します。特にデータ効率性で大きな向上を見せ、同じ数のフレームではるかに高いスコアを達成します。


まとめ

  1. N-step DQN: 複数ステップの実際の報酬を使用してターゲットの正確度を向上
  2. Double DQN: 行動選択と価値評価を分離して過大推定を防止
  3. Noisy Networks: 学習可能なノイズで状態別適応的探索
  4. Prioritized Replay: TD誤差が大きい経験をより頻繁に学習
  5. Dueling DQN: Q値をV(s)とA(s, a)に分離して学習効率を向上
  6. Categorical DQN: 価値の確率分布を学習してより豊かな情報を活用
  7. Rainbow: 6つの技法の結合で最高性能を達成

次の記事では強化学習を実際の金融問題に適用して株式トレーディングエージェントを作ります。