[Deep RL] 07. DQN Extensions: Double DQN, Dueling DQN, Rainbow
Directions for Improving DQN
Basic DQN reached human-level performance on Atari games, but it still leaves plenty of room for improvement. In this article we look at six extension techniques that significantly boost DQN's performance.
1. N-step DQN
Problem: Limitations of the Single-Step TD Target
Basic DQN uses a 1-step TD target:
1-step: target = r_t + gamma * max_a Q(s_{t+1}, a)
This uses only the immediate reward directly and relies on estimation for everything else.
Solution: Use N-step Rewards
The N-step method uses the actual rewards from several steps to build a more accurate target:
N-step: target = r_t + gamma * r_{t+1} + ... + gamma^{n-1} * r_{t+n-1} + gamma^n * max_a Q(s_{t+n}, a)
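Before the buffer implementation, a quick standalone sanity check of this formula with toy numbers (the rewards and bootstrap value below are made up for illustration):

```python
import numpy as np

# Standalone check: the n-step target is the gamma-discounted sum of the
# observed rewards plus gamma^n times the bootstrapped value.
rewards = [1.0, 0.0, 2.0]   # toy r_t, r_{t+1}, r_{t+2}
gamma = 0.9
bootstrap_q = 5.0            # assumed max_a Q(s_{t+3}, a)

n_step_return = sum((gamma ** i) * r for i, r in enumerate(rewards))
target = n_step_return + (gamma ** len(rewards)) * bootstrap_q

print(round(n_step_return, 4))  # 1.0 + 0.9*0.0 + 0.81*2.0 = 2.62
print(round(target, 4))         # 2.62 + 0.729*5.0 = 6.265
```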
import torch
import torch.nn as nn
import numpy as np
from collections import deque

class NStepBuffer:
    """Buffer for accumulating N-step returns"""

    def __init__(self, n_steps, gamma):
        self.n_steps = n_steps
        self.gamma = gamma
        self.buffer = deque(maxlen=n_steps)

    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def get(self):
        """Return the N-step transition"""
        # State and action of the first transition
        state, action, _, _, _ = self.buffer[0]
        # Accumulate the N-step return, stopping at episode termination
        n_step_return = 0.0
        last_idx = len(self.buffer) - 1
        for i, (_, _, reward, _, done) in enumerate(self.buffer):
            n_step_return += (self.gamma ** i) * reward
            if done:
                last_idx = i
                break
        # Next state and done flag of the last transition actually used
        _, _, _, next_state, done = self.buffer[last_idx]
        return state, action, n_step_return, next_state, done

    def is_ready(self):
        return len(self.buffer) == self.n_steps

# Usage in N-step DQN training
def compute_nstep_target(n_step_return, next_state, done, target_net, gamma, n_steps, device):
    """Compute the N-step TD target"""
    if done:
        return n_step_return
    with torch.no_grad():
        next_q = target_net(
            torch.tensor([next_state], dtype=torch.float32).to(device)
        ).max(dim=1)[0].item()
    return n_step_return + (gamma ** n_steps) * next_q
2. Double DQN
Problem: Q-Value Overestimation
The max operator in basic DQN's target computation tends to systematically overestimate Q values:
Basic DQN target: r + gamma * max_a Q_target(s', a)
Because the max operation performs action selection and value evaluation at the same time, overestimation occurs whenever an action that looks good only because of estimation noise gets selected.
Solution: Separate Action Selection from Value Evaluation
Double DQN uses the online network for action selection and the target network for value evaluation:
Double DQN target:
a* = argmax_a Q_online(s', a)          # select the action with the online network
target = r + gamma * Q_target(s', a*)  # evaluate it with the target network
def compute_double_dqn_loss(online_net, target_net, states, actions,
                            rewards, next_states, dones, gamma, device):
    """Compute the Double DQN loss"""
    states_t = torch.tensor(states, dtype=torch.float32).to(device)
    actions_t = torch.tensor(actions, dtype=torch.long).to(device)
    rewards_t = torch.tensor(rewards, dtype=torch.float32).to(device)
    next_states_t = torch.tensor(next_states, dtype=torch.float32).to(device)
    dones_t = torch.tensor(dones, dtype=torch.bool).to(device)
    # Current Q values
    current_q = online_net(states_t).gather(1, actions_t.unsqueeze(1)).squeeze(1)
    with torch.no_grad():
        # Key step: select actions with the online network
        best_actions = online_net(next_states_t).argmax(dim=1)
        # Evaluate those actions with the target network
        next_q = target_net(next_states_t).gather(1, best_actions.unsqueeze(1)).squeeze(1)
        next_q[dones_t] = 0.0
        target_q = rewards_t + gamma * next_q
    loss = nn.MSELoss()(current_q, target_q)
    return loss
Reduced Overestimation
Double DQN is especially effective when the action space is large or rewards are noisy. Compared to basic DQN it is more stable and reaches better final performance.
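The bias itself is easy to see in isolation. The standalone sketch below (not from the original post) uses purely synthetic Q estimates: all true Q values are 0, yet taking a max over noisy estimates is biased upward, while selecting with one estimate and evaluating with an independent one removes the bias:

```python
import numpy as np

rng = np.random.default_rng(0)
n_actions, n_trials = 10, 10_000

# Noisy estimates of Q values whose true value is 0 for every action
noisy_q = rng.normal(loc=0.0, scale=1.0, size=(n_trials, n_actions))
single_max = noisy_q.max(axis=1).mean()  # E[max Q_est] > 0: overestimation

# Double-estimator trick: select with one estimate, evaluate with another
noisy_q2 = rng.normal(loc=0.0, scale=1.0, size=(n_trials, n_actions))
double_est = noisy_q2[np.arange(n_trials), noisy_q.argmax(axis=1)].mean()

print(f"max-based estimate: {single_max:.3f}")  # clearly positive
print(f"double-estimator:   {double_est:.3f}")  # near zero
```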
3. Noisy Networks
Problem: Limitations of Epsilon-Greedy
An epsilon-greedy policy explores at random with the same probability in every state; it cannot tune the level of exploration per state.
Solution: Parameter Noise
Learnable noise is added to the network weights, and the agent learns the degree of exploration automatically.
import torch
import torch.nn as nn
import math

class NoisyLinear(nn.Module):
    """Linear layer with factorized Gaussian noise"""

    def __init__(self, in_features, out_features, sigma_init=0.5):
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features
        # Learnable parameters
        self.weight_mu = nn.Parameter(torch.empty(out_features, in_features))
        self.weight_sigma = nn.Parameter(torch.empty(out_features, in_features))
        self.bias_mu = nn.Parameter(torch.empty(out_features))
        self.bias_sigma = nn.Parameter(torch.empty(out_features))
        # Buffers for the noise (not trained)
        self.register_buffer('weight_epsilon', torch.empty(out_features, in_features))
        self.register_buffer('bias_epsilon', torch.empty(out_features))
        self.sigma_init = sigma_init
        self.reset_parameters()
        self.reset_noise()

    def reset_parameters(self):
        mu_range = 1.0 / math.sqrt(self.in_features)
        self.weight_mu.data.uniform_(-mu_range, mu_range)
        self.weight_sigma.data.fill_(self.sigma_init / math.sqrt(self.in_features))
        self.bias_mu.data.uniform_(-mu_range, mu_range)
        self.bias_sigma.data.fill_(self.sigma_init / math.sqrt(self.out_features))

    def reset_noise(self):
        """Generate fresh noise"""
        epsilon_in = self._scale_noise(self.in_features)
        epsilon_out = self._scale_noise(self.out_features)
        self.weight_epsilon.copy_(epsilon_out.outer(epsilon_in))
        self.bias_epsilon.copy_(epsilon_out)

    def _scale_noise(self, size):
        x = torch.randn(size)
        return x.sign() * x.abs().sqrt()

    def forward(self, x):
        if self.training:
            weight = self.weight_mu + self.weight_sigma * self.weight_epsilon
            bias = self.bias_mu + self.bias_sigma * self.bias_epsilon
        else:
            weight = self.weight_mu
            bias = self.bias_mu
        return nn.functional.linear(x, weight, bias)

class NoisyDQN(nn.Module):
    """DQN that uses Noisy Networks"""

    def __init__(self, obs_size, n_actions):
        super().__init__()
        self.fc1 = nn.Linear(obs_size, 128)
        self.noisy_fc2 = NoisyLinear(128, 128)
        self.noisy_fc3 = NoisyLinear(128, n_actions)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.noisy_fc2(x))
        return self.noisy_fc3(x)

    def reset_noise(self):
        """Regenerate noise in every NoisyLinear layer"""
        self.noisy_fc2.reset_noise()
        self.noisy_fc3.reset_noise()
With a Noisy Network there is no epsilon schedule to tune; the network itself manages exploration.
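The factorized-Gaussian trick in `reset_noise` is what keeps this cheap: instead of sampling one noise value per weight, it samples only `in + out` values and combines them with an outer product. A standalone sketch of the difference (layer sizes here match the 128-unit layers above):

```python
import torch

def scale_noise(size):
    # f(x) = sign(x) * sqrt(|x|), as used by factorized noisy layers
    x = torch.randn(size)
    return x.sign() * x.abs().sqrt()

in_features, out_features = 128, 128

# Independent noise: one Gaussian sample per weight (128 * 128 = 16384 samples)
independent = torch.randn(out_features, in_features)

# Factorized noise: 128 + 128 = 256 samples, combined by an outer product
eps_in = scale_noise(in_features)
eps_out = scale_noise(out_features)
factorized = eps_out.outer(eps_in)

print(independent.shape, factorized.shape)  # both give a full (128, 128) noise matrix
```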
4. Prioritized Experience Replay
Problem: Inefficiency of Uniform Sampling
Basic experience replay samples every experience with equal probability, but some experiences (those with large TD errors) are more useful for learning than others.
Solution: TD-Error-Based Priorities
Experiences with larger TD errors are sampled more often.
class SumTree:
    """Sum tree for efficient priority-proportional sampling"""

    def __init__(self, capacity):
        self.capacity = capacity
        self.tree = np.zeros(2 * capacity - 1)
        self.data = [None] * capacity
        self.write_idx = 0
        self.size = 0

    def total(self):
        return self.tree[0]

    def add(self, priority, data):
        idx = self.write_idx + self.capacity - 1
        self.data[self.write_idx] = data
        self._update(idx, priority)
        self.write_idx = (self.write_idx + 1) % self.capacity
        self.size = min(self.size + 1, self.capacity)

    def _update(self, idx, priority):
        change = priority - self.tree[idx]
        self.tree[idx] = priority
        # Propagate the change up to the root
        while idx > 0:
            idx = (idx - 1) // 2
            self.tree[idx] += change

    def get(self, value):
        """Find the leaf whose cumulative-sum interval contains value"""
        idx = 0
        while idx < self.capacity - 1:
            left = 2 * idx + 1
            right = left + 1
            if value <= self.tree[left]:
                idx = left
            else:
                value -= self.tree[left]
                idx = right
        data_idx = idx - self.capacity + 1
        return idx, self.tree[idx], self.data[data_idx]

class PrioritizedReplayBuffer:
    """Prioritized experience replay buffer"""

    def __init__(self, capacity, alpha=0.6, beta_start=0.4, beta_frames=100000):
        self.tree = SumTree(capacity)
        self.alpha = alpha  # priority exponent (0 = uniform, 1 = fully prioritized)
        self.beta_start = beta_start
        self.beta_frames = beta_frames
        self.frame = 0
        self.max_priority = 1.0

    def get_beta(self):
        """Importance-sampling correction exponent (annealed toward 1)"""
        beta = self.beta_start + (1.0 - self.beta_start) * \
            min(1.0, self.frame / self.beta_frames)
        self.frame += 1
        return beta

    def push(self, state, action, reward, next_state, done):
        data = (state, action, reward, next_state, done)
        # New experiences get the maximum priority seen so far
        priority = self.max_priority ** self.alpha
        self.tree.add(priority, data)

    def sample(self, batch_size):
        beta = self.get_beta()
        batch = []
        indices = []
        priorities = []
        segment = self.tree.total() / batch_size
        for i in range(batch_size):
            low = segment * i
            high = segment * (i + 1)
            value = np.random.uniform(low, high)
            idx, priority, data = self.tree.get(value)
            batch.append(data)
            indices.append(idx)
            priorities.append(priority)
        # Importance-sampling weights
        probs = np.array(priorities) / self.tree.total()
        weights = (self.tree.size * probs) ** (-beta)
        weights = weights / weights.max()
        states, actions, rewards, next_states, dones = zip(*batch)
        return (
            np.array(states),
            np.array(actions),
            np.array(rewards, dtype=np.float32),
            np.array(next_states),
            np.array(dones, dtype=np.bool_),
            np.array(indices),
            torch.tensor(weights, dtype=torch.float32),
        )

    def update_priorities(self, indices, td_errors):
        """Update priorities from TD errors"""
        for idx, td_error in zip(indices, td_errors):
            # Track the raw priority here; alpha is applied once, when writing
            # to the tree (and again in push() from the raw max), not twice
            self.max_priority = max(self.max_priority, abs(td_error) + 1e-6)
            self.tree._update(idx, (abs(td_error) + 1e-6) ** self.alpha)
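Why the importance-sampling weights matter: prioritized sampling changes the data distribution, and the `(N * p)^(-beta)` weights undo that bias (fully once beta anneals to 1). A standalone toy check with made-up priorities, independent of the buffer classes above:

```python
import numpy as np

rng = np.random.default_rng(1)
values = rng.exponential(scale=1.0, size=1000)  # toy quantities stored in a buffer
priorities = values + 1e-6                      # prioritize large entries
probs = priorities / priorities.sum()

idx = rng.choice(len(values), size=200_000, p=probs)

biased = values[idx].mean()                     # inflated by prioritized sampling
weights = (len(values) * probs[idx]) ** -1.0    # IS correction with beta = 1
corrected = (weights * values[idx]).sum() / weights.sum()

print(f"uniform mean:   {values.mean():.4f}")
print(f"biased mean:    {biased:.4f}")     # noticeably larger than the uniform mean
print(f"corrected mean: {corrected:.4f}")  # close to the uniform mean again
```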
5. Dueling DQN
Core Idea
The Q value is split into a state value V(s) and an advantage A(s, a):
Q(s, a) = V(s) + A(s, a) - mean_a(A(s, a))
The benefit of this split is that in some states the outcome is similar no matter which action is taken, and there only V(s) needs to be estimated accurately. Subtracting the mean advantage also makes the decomposition identifiable: without it, the network could shift an arbitrary constant between V and A.
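A standalone numeric check of the mean-subtraction term (toy values, not from the original post): any constant added to the advantage stream is cancelled out, so it cannot masquerade as state value — V alone controls the overall level of Q:

```python
import numpy as np

v = 4.0                         # toy state value
a = np.array([-1.0, 1.0, 0.0])  # toy advantages (already mean-zero)

def dueling_q(v, a):
    # Q = V + (A - mean(A)), as in the Dueling forward pass
    return v + a - a.mean()

q = dueling_q(v, a)

# Shifting the advantage stream by any constant leaves Q unchanged
for c in (2.5, -7.0, 100.0):
    assert np.allclose(dueling_q(v, a + c), q)

print(q)  # [3. 5. 4.]
```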
class DuelingDQN(nn.Module):
    """Dueling DQN network"""

    def __init__(self, obs_size, n_actions):
        super().__init__()
        # Shared feature extractor
        self.feature = nn.Sequential(
            nn.Linear(obs_size, 128),
            nn.ReLU(),
        )
        # Value stream: estimates V(s)
        self.value_stream = nn.Sequential(
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, 1),
        )
        # Advantage stream: estimates A(s, a)
        self.advantage_stream = nn.Sequential(
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, n_actions),
        )

    def forward(self, x):
        features = self.feature(x)
        value = self.value_stream(features)
        advantage = self.advantage_stream(features)
        # Q = V + (A - mean(A))
        q_values = value + advantage - advantage.mean(dim=-1, keepdim=True)
        return q_values
6. Categorical DQN (C51)
Core Idea
Standard DQN estimates only the expected Q value. Categorical DQN estimates the full probability distribution of the return, represented as a histogram over a fixed set of support points ("atoms" — 51 of them, hence C51).
class CategoricalDQN(nn.Module):
    """Categorical DQN (C51) network"""

    def __init__(self, obs_size, n_actions, n_atoms=51, v_min=-10, v_max=10):
        super().__init__()
        self.n_actions = n_actions
        self.n_atoms = n_atoms
        self.v_min = v_min
        self.v_max = v_max
        self.delta_z = (v_max - v_min) / (n_atoms - 1)
        self.register_buffer(
            'support',
            torch.linspace(v_min, v_max, n_atoms)
        )
        self.network = nn.Sequential(
            nn.Linear(obs_size, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, n_actions * n_atoms),
        )

    def forward(self, x):
        logits = self.network(x)
        # (batch, n_actions * n_atoms) -> (batch, n_actions, n_atoms)
        logits = logits.view(-1, self.n_actions, self.n_atoms)
        # Softmax turns the logits into a probability distribution per action
        probs = torch.softmax(logits, dim=-1)
        return probs

    def get_q_values(self, x):
        """Compute Q values (expectations) from the distributions"""
        probs = self.forward(x)
        # Q(s, a) = sum_i p_i * z_i
        q_values = (probs * self.support.unsqueeze(0).unsqueeze(0)).sum(dim=-1)
        return q_values
def compute_categorical_loss(online_net, target_net, states, actions,
                             rewards, next_states, dones, gamma, device):
    """Compute the Categorical DQN loss (per-sample, so PER weights can be applied)"""
    batch_size = len(states)
    n_atoms = online_net.n_atoms
    v_min = online_net.v_min
    v_max = online_net.v_max
    delta_z = online_net.delta_z
    support = online_net.support
    states_t = torch.tensor(states, dtype=torch.float32).to(device)
    actions_t = torch.tensor(actions, dtype=torch.long).to(device)
    rewards_t = torch.tensor(rewards, dtype=torch.float32).to(device)
    next_states_t = torch.tensor(next_states, dtype=torch.float32).to(device)
    dones_t = torch.tensor(dones, dtype=torch.bool).to(device)
    with torch.no_grad():
        # Best next actions (Double DQN style: selected by the online network)
        next_q = online_net.get_q_values(next_states_t)
        next_actions = next_q.argmax(dim=1)
        # Target distribution
        next_probs = target_net(next_states_t)
        next_dist = next_probs[range(batch_size), next_actions]
        # Bellman update: T_z = r + gamma * z (future part zeroed at terminal states)
        t_z = rewards_t.unsqueeze(1) + gamma * support.unsqueeze(0) * (~dones_t).float().unsqueeze(1)
        t_z = t_z.clamp(v_min, v_max)
        # Projection onto the atoms
        b = (t_z - v_min) / delta_z
        l = b.floor().long()
        u = b.ceil().long()
        # When b lands exactly on an atom (l == u), shift one bound so no mass is lost
        l[(u > 0) & (l == u)] -= 1
        u[(l < n_atoms - 1) & (l == u)] += 1
        # Distribute probability mass between the two neighbouring atoms
        target_dist = torch.zeros_like(next_dist)
        for i in range(batch_size):
            for j in range(n_atoms):
                target_dist[i, l[i, j]] += next_dist[i, j] * (u[i, j].float() - b[i, j])
                target_dist[i, u[i, j]] += next_dist[i, j] * (b[i, j] - l[i, j].float())
    # Current distribution
    current_probs = online_net(states_t)
    current_dist = current_probs[range(batch_size), actions_t]
    # Per-sample cross-entropy loss (callers weight and average it)
    loss = -(target_dist * (current_dist + 1e-8).log()).sum(dim=-1)
    return loss
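A standalone sanity check of the projection step above (the reward, discount, and support bounds are made up): distributing each atom's mass between its two neighbours on the support must preserve total probability.

```python
import numpy as np

v_min, v_max, n_atoms = -10.0, 10.0, 51
delta_z = (v_max - v_min) / (n_atoms - 1)
support = np.linspace(v_min, v_max, n_atoms)

# A toy next-state distribution and Bellman update T_z = r + gamma * z
probs = np.full(n_atoms, 1.0 / n_atoms)
t_z = np.clip(1.0 + 0.99 * support, v_min, v_max)

b = (t_z - v_min) / delta_z
l = np.floor(b).astype(int)
u = np.ceil(b).astype(int)
# Shift one bound when b lands exactly on an atom so no mass is dropped
l[(u > 0) & (l == u)] -= 1
u[(l < n_atoms - 1) & (l == u)] += 1

projected = np.zeros(n_atoms)
np.add.at(projected, l, probs * (u - b))
np.add.at(projected, u, probs * (b - l))

print(projected.sum())  # 1.0 (up to floating point)
```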
Rainbow: Combining Everything
Rainbow is an algorithm that combines all six techniques above. Each contributes independently, and together they are synergistic.
Rainbow Components
- N-step returns: more accurate targets
- Double DQN: prevents overestimation
- Noisy Networks: automatic exploration management
- Prioritized Replay: efficient use of experience
- Dueling architecture: separates V and A
- Categorical DQN: learns the value distribution
class RainbowDQN(nn.Module):
    """Rainbow DQN: all extensions combined"""

    def __init__(self, obs_size, n_actions, n_atoms=51, v_min=-10, v_max=10):
        super().__init__()
        self.n_actions = n_actions
        self.n_atoms = n_atoms
        self.v_min = v_min
        self.v_max = v_max
        self.register_buffer('support', torch.linspace(v_min, v_max, n_atoms))
        # Shared feature extractor
        self.feature = nn.Sequential(
            nn.Linear(obs_size, 128),
            nn.ReLU(),
        )
        # Dueling + Noisy + Categorical
        # Value stream (noisy layers)
        self.value_noisy1 = NoisyLinear(128, 128)
        self.value_noisy2 = NoisyLinear(128, n_atoms)
        # Advantage stream (noisy layers)
        self.advantage_noisy1 = NoisyLinear(128, 128)
        self.advantage_noisy2 = NoisyLinear(128, n_actions * n_atoms)

    def forward(self, x):
        features = self.feature(x)
        # Value stream
        value = torch.relu(self.value_noisy1(features))
        value = self.value_noisy2(value).view(-1, 1, self.n_atoms)
        # Advantage stream
        advantage = torch.relu(self.advantage_noisy1(features))
        advantage = self.advantage_noisy2(advantage).view(-1, self.n_actions, self.n_atoms)
        # Dueling at the distribution level: Q = V + A - mean(A)
        q_dist = value + advantage - advantage.mean(dim=1, keepdim=True)
        probs = torch.softmax(q_dist, dim=-1)
        return probs

    def get_q_values(self, x):
        probs = self.forward(x)
        q_values = (probs * self.support.unsqueeze(0).unsqueeze(0)).sum(dim=-1)
        return q_values

    def reset_noise(self):
        self.value_noisy1.reset_noise()
        self.value_noisy2.reset_noise()
        self.advantage_noisy1.reset_noise()
        self.advantage_noisy2.reset_noise()
Core of the Rainbow Training Loop
def train_rainbow_step(online_net, target_net, optimizer, buffer, n_steps, gamma, device):
    """One Rainbow training step"""
    # 1. Priority-based sampling
    states, actions, rewards, next_states, dones, indices, weights = \
        buffer.sample(batch_size=32)
    weights = weights.to(device)
    # 2. Per-sample categorical loss (Double DQN style)
    loss = compute_categorical_loss(
        online_net, target_net,
        states, actions, rewards, next_states, dones,
        gamma=(gamma ** n_steps),  # N-step discount
        device=device,
    )
    # 3. Apply the importance-sampling weights
    weighted_loss = (loss * weights).mean()
    # 4. Backpropagation
    optimizer.zero_grad()
    weighted_loss.backward()
    torch.nn.utils.clip_grad_norm_(online_net.parameters(), 10)
    optimizer.step()
    # 5. Update priorities, using the per-sample losses as TD-error proxies
    td_errors = loss.detach().cpu().numpy()
    buffer.update_priorities(indices, td_errors)
    # 6. Regenerate noise
    online_net.reset_noise()
    target_net.reset_noise()
    return weighted_loss.item()
Contribution of Each Technique
Ablations on Atari games suggest roughly the following contributions:
| Technique | Primary Effect | Contribution |
|---|---|---|
| Prioritized Replay | Better learning efficiency | High |
| N-step Returns | Faster convergence | High |
| Categorical DQN | Value distribution learning | Medium-High |
| Dueling | State value separation | Medium |
| Double DQN | Overestimation prevention | Medium |
| Noisy Networks | Adaptive exploration | Medium |
Rainbow, which combines all of them, clearly outperforms any individual technique. The gain is largest in data efficiency: it reaches much higher scores within the same number of frames.
Comparing the Extensions on CartPole
import gymnasium as gym
from collections import deque
import random

def make_mlp(obs_size, n_actions):
    """Basic two-hidden-layer MLP Q-network"""
    return nn.Sequential(
        nn.Linear(obs_size, 128), nn.ReLU(),
        nn.Linear(128, 128), nn.ReLU(),
        nn.Linear(128, n_actions),
    )

def train_and_compare(variant="basic", n_episodes=300):
    """Compare DQN variants on CartPole"""
    env = gym.make("CartPole-v1")
    obs_size = env.observation_space.shape[0]
    n_actions = env.action_space.n
    device = torch.device("cpu")
    # Online and target networks must share the same architecture
    if variant == "dueling":
        net = DuelingDQN(obs_size, n_actions)
        target = DuelingDQN(obs_size, n_actions)
    elif variant == "noisy":
        net = NoisyDQN(obs_size, n_actions)
        target = NoisyDQN(obs_size, n_actions)
    else:  # "basic"
        net = make_mlp(obs_size, n_actions)
        target = make_mlp(obs_size, n_actions)
    target.load_state_dict(net.state_dict())
    optimizer = torch.optim.Adam(net.parameters(), lr=1e-3)
    buffer = ReplayBuffer(10000)  # uniform replay buffer from the previous article
    rewards_history = []
    epsilon = 1.0 if variant != "noisy" else 0.0
    for episode in range(n_episodes):
        obs, _ = env.reset()
        total_reward = 0
        while True:
            # Action selection
            if variant == "noisy":
                with torch.no_grad():
                    q = net(torch.tensor([obs], dtype=torch.float32))
                action = q.argmax(dim=1).item()
            elif random.random() < epsilon:
                action = env.action_space.sample()
            else:
                with torch.no_grad():
                    q = net(torch.tensor([obs], dtype=torch.float32))
                action = q.argmax(dim=1).item()
            next_obs, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            buffer.push(obs, action, reward, next_obs, done)
            total_reward += reward
            obs = next_obs
            # Training (Double DQN style)
            if len(buffer) >= 64:
                s, a, r, ns, d = buffer.sample(64)
                s_t = torch.tensor(s, dtype=torch.float32)
                a_t = torch.tensor(a, dtype=torch.long)
                r_t = torch.tensor(r, dtype=torch.float32)
                ns_t = torch.tensor(ns, dtype=torch.float32)
                d_t = torch.tensor(d, dtype=torch.bool)
                curr_q = net(s_t).gather(1, a_t.unsqueeze(1)).squeeze(1)
                with torch.no_grad():
                    # Double DQN: select with the online net, evaluate with the target net
                    best_a = net(ns_t).argmax(dim=1)
                    next_q = target(ns_t).gather(1, best_a.unsqueeze(1)).squeeze(1)
                    next_q[d_t] = 0.0
                    tgt = r_t + 0.99 * next_q
                loss = nn.MSELoss()(curr_q, tgt)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
            if done:
                break
        if variant != "noisy":
            epsilon = max(0.01, epsilon * 0.995)
        if hasattr(net, 'reset_noise'):
            net.reset_noise()
            target.reset_noise()
        rewards_history.append(total_reward)
        if episode % 50 == 0:
            mean_r = np.mean(rewards_history[-50:])
            print(f"[{variant}] episode {episode}: mean reward={mean_r:.1f}")
        if episode % 100 == 0:
            target.load_state_dict(net.state_dict())
    env.close()
    return rewards_history

# Run the comparison
# results_basic = train_and_compare("basic")
# results_dueling = train_and_compare("dueling")
# results_noisy = train_and_compare("noisy")
Summary
- N-step DQN: uses actual rewards from several steps to improve target accuracy
- Double DQN: separates action selection from value evaluation to curb overestimation
- Noisy Networks: learnable noise gives state-adaptive exploration
- Prioritized Replay: trains more often on experiences with large TD errors
- Dueling DQN: splits Q into V(s) and A(s, a) for more efficient learning
- Categorical DQN: learns the value distribution for richer information
- Rainbow: combines all six techniques for the best performance
In the next article, we will apply reinforcement learning to a real financial problem and build a stock trading agent.