- Authors

- Name
- Youngju Kim
- @fjvbn20031
DQN의 개선 방향
기본 DQN은 Atari 게임에서 인간 수준의 성능을 보여주었지만, 여전히 개선할 여지가 많습니다. 이번 글에서는 DQN의 성능을 크게 향상시키는 6가지 확장 기법을 살펴보겠습니다.
1. N-step DQN
문제: 단일 스텝 TD 타겟의 한계
기본 DQN은 1-step TD 타겟을 사용합니다.
1-step: target = r_t + gamma * max_a Q(s_{t+1}, a)
이는 즉각적인 보상만 직접 사용하고 나머지는 추정에 의존합니다.
해결: N-step 보상 사용
N-step 방법은 여러 스텝의 실제 보상을 사용하여 더 정확한 타겟을 만듭니다.
N-step: target = r_t + gamma * r_{t+1} + ... + gamma^{n-1} * r_{t+n-1} + gamma^n * max_a Q(s_{t+n}, a)
import torch
import torch.nn as nn
import numpy as np
from collections import deque
class NStepBuffer:
"""N-step 리턴을 위한 버퍼"""
def __init__(self, n_steps, gamma):
self.n_steps = n_steps
self.gamma = gamma
self.buffer = deque(maxlen=n_steps)
def push(self, state, action, reward, next_state, done):
self.buffer.append((state, action, reward, next_state, done))
def get(self):
"""N-step 전이 데이터를 반환"""
# 첫 번째 전이의 상태와 행동
state, action, _, _, _ = self.buffer[0]
# N-step 리턴 계산
n_step_return = 0.0
for i, (_, _, reward, _, done) in enumerate(self.buffer):
n_step_return += (self.gamma ** i) * reward
if done:
break
# 마지막 전이의 다음 상태
_, _, _, next_state, done = self.buffer[-1]
return state, action, n_step_return, next_state, done
def is_ready(self):
return len(self.buffer) == self.n_steps
# N-step DQN 학습에서의 사용
def compute_nstep_target(n_step_return, next_state, done, target_net, gamma, n_steps, device):
"""N-step TD 타겟 계산"""
if done:
return n_step_return
with torch.no_grad():
next_q = target_net(
torch.tensor([next_state], dtype=torch.float32).to(device)
).max(dim=1)[0].item()
return n_step_return + (gamma ** n_steps) * next_q
2. Double DQN
문제: Q값 과대추정 (Overestimation)
기본 DQN의 타겟 계산에서 max 연산자는 Q값을 체계적으로 과대추정하는 경향이 있습니다.
기본 DQN 타겟: r + gamma * max_a Q_target(s', a)
max 연산이 행동 선택과 가치 평가를 동시에 수행하기 때문에, 노이즈에 의해 높게 추정된 행동이 선택되면 과대추정이 발생합니다.
해결: 행동 선택과 가치 평가 분리
Double DQN은 행동 선택에는 온라인 네트워크를, 가치 평가에는 타겟 네트워크를 사용합니다.
Double DQN 타겟:
a* = argmax_a Q_online(s', a) # 온라인 네트워크로 행동 선택
target = r + gamma * Q_target(s', a*) # 타겟 네트워크로 가치 평가
def compute_double_dqn_loss(online_net, target_net, states, actions,
rewards, next_states, dones, gamma, device):
"""Double DQN 손실 계산"""
states_t = torch.tensor(states, dtype=torch.float32).to(device)
actions_t = torch.tensor(actions, dtype=torch.long).to(device)
rewards_t = torch.tensor(rewards, dtype=torch.float32).to(device)
next_states_t = torch.tensor(next_states, dtype=torch.float32).to(device)
dones_t = torch.tensor(dones, dtype=torch.bool).to(device)
# 현재 Q값
current_q = online_net(states_t).gather(1, actions_t.unsqueeze(1)).squeeze(1)
with torch.no_grad():
# 핵심: 온라인 네트워크로 행동 선택
best_actions = online_net(next_states_t).argmax(dim=1)
# 타겟 네트워크로 해당 행동의 가치 평가
next_q = target_net(next_states_t).gather(1, best_actions.unsqueeze(1)).squeeze(1)
next_q[dones_t] = 0.0
target_q = rewards_t + gamma * next_q
loss = nn.MSELoss()(current_q, target_q)
return loss
과대추정 감소 효과
Double DQN은 특히 행동 공간이 크거나 보상이 노이즈가 많은 환경에서 효과적입니다. 기본 DQN 대비 안정적이고 더 나은 최종 성능을 보여줍니다.
3. Noisy Networks
문제: 엡실론-탐욕의 한계
엡실론-탐욕 정책은 모든 상태에서 동일한 확률로 무작위 탐색을 수행합니다. 이는 상태별로 적절한 탐색 수준을 조절하지 못합니다.
해결: 파라미터 노이즈
네트워크 가중치에 학습 가능한 노이즈를 추가합니다. 에이전트가 자동으로 탐색의 정도를 학습합니다.
import torch
import torch.nn as nn
import math
class NoisyLinear(nn.Module):
"""Factorized Gaussian Noise를 사용하는 선형 레이어"""
def __init__(self, in_features, out_features, sigma_init=0.5):
super().__init__()
self.in_features = in_features
self.out_features = out_features
# 학습 가능한 파라미터
self.weight_mu = nn.Parameter(torch.empty(out_features, in_features))
self.weight_sigma = nn.Parameter(torch.empty(out_features, in_features))
self.bias_mu = nn.Parameter(torch.empty(out_features))
self.bias_sigma = nn.Parameter(torch.empty(out_features))
# 노이즈를 위한 버퍼 (학습 대상 아님)
self.register_buffer('weight_epsilon', torch.empty(out_features, in_features))
self.register_buffer('bias_epsilon', torch.empty(out_features))
self.sigma_init = sigma_init
self.reset_parameters()
self.reset_noise()
def reset_parameters(self):
mu_range = 1.0 / math.sqrt(self.in_features)
self.weight_mu.data.uniform_(-mu_range, mu_range)
self.weight_sigma.data.fill_(self.sigma_init / math.sqrt(self.in_features))
self.bias_mu.data.uniform_(-mu_range, mu_range)
self.bias_sigma.data.fill_(self.sigma_init / math.sqrt(self.out_features))
def reset_noise(self):
"""새로운 노이즈 생성"""
epsilon_in = self._scale_noise(self.in_features)
epsilon_out = self._scale_noise(self.out_features)
self.weight_epsilon.copy_(epsilon_out.outer(epsilon_in))
self.bias_epsilon.copy_(epsilon_out)
def _scale_noise(self, size):
x = torch.randn(size)
return x.sign() * x.abs().sqrt()
def forward(self, x):
if self.training:
weight = self.weight_mu + self.weight_sigma * self.weight_epsilon
bias = self.bias_mu + self.bias_sigma * self.bias_epsilon
else:
weight = self.weight_mu
bias = self.bias_mu
return nn.functional.linear(x, weight, bias)
class NoisyDQN(nn.Module):
"""Noisy Network를 사용하는 DQN"""
def __init__(self, obs_size, n_actions):
super().__init__()
self.fc1 = nn.Linear(obs_size, 128)
self.noisy_fc2 = NoisyLinear(128, 128)
self.noisy_fc3 = NoisyLinear(128, n_actions)
def forward(self, x):
x = torch.relu(self.fc1(x))
x = torch.relu(self.noisy_fc2(x))
return self.noisy_fc3(x)
def reset_noise(self):
"""모든 NoisyLinear 레이어의 노이즈 재생성"""
self.noisy_fc2.reset_noise()
self.noisy_fc3.reset_noise()
Noisy Network를 사용하면 엡실론 스케줄이 필요 없습니다. 네트워크 자체가 탐색을 관리합니다.
4. Prioritized Experience Replay
문제: 균일 샘플링의 비효율성
기본 경험 리플레이는 모든 경험을 동일한 확률로 샘플링합니다. 하지만 학습에 더 유용한 경험(TD 오차가 큰 경험)이 있습니다.
해결: TD 오차 기반 우선순위
TD 오차가 큰 경험을 더 자주 샘플링합니다.
class SumTree:
"""효율적인 우선순위 샘플링을 위한 합 트리"""
def __init__(self, capacity):
self.capacity = capacity
self.tree = np.zeros(2 * capacity - 1)
self.data = [None] * capacity
self.write_idx = 0
self.size = 0
def total(self):
return self.tree[0]
def add(self, priority, data):
idx = self.write_idx + self.capacity - 1
self.data[self.write_idx] = data
self._update(idx, priority)
self.write_idx = (self.write_idx + 1) % self.capacity
self.size = min(self.size + 1, self.capacity)
def _update(self, idx, priority):
change = priority - self.tree[idx]
self.tree[idx] = priority
while idx > 0:
idx = (idx - 1) // 2
self.tree[idx] += change
def get(self, value):
"""값에 해당하는 데이터를 검색"""
idx = 0
while idx < self.capacity - 1:
left = 2 * idx + 1
right = left + 1
if value <= self.tree[left]:
idx = left
else:
value -= self.tree[left]
idx = right
data_idx = idx - self.capacity + 1
return idx, self.tree[idx], self.data[data_idx]
class PrioritizedReplayBuffer:
"""우선순위 경험 리플레이 버퍼"""
def __init__(self, capacity, alpha=0.6, beta_start=0.4, beta_frames=100000):
self.tree = SumTree(capacity)
self.alpha = alpha # 우선순위의 지수 (0=균일, 1=완전 우선순위)
self.beta_start = beta_start
self.beta_frames = beta_frames
self.frame = 0
self.max_priority = 1.0
def get_beta(self):
"""중요도 샘플링 보정 계수 (점차 1로 증가)"""
beta = self.beta_start + (1.0 - self.beta_start) * \
min(1.0, self.frame / self.beta_frames)
self.frame += 1
return beta
def push(self, state, action, reward, next_state, done):
data = (state, action, reward, next_state, done)
priority = self.max_priority ** self.alpha
self.tree.add(priority, data)
def sample(self, batch_size):
beta = self.get_beta()
batch = []
indices = []
priorities = []
segment = self.tree.total() / batch_size
for i in range(batch_size):
low = segment * i
high = segment * (i + 1)
value = np.random.uniform(low, high)
idx, priority, data = self.tree.get(value)
batch.append(data)
indices.append(idx)
priorities.append(priority)
# 중요도 샘플링 가중치 계산
probs = np.array(priorities) / self.tree.total()
weights = (self.tree.size * probs) ** (-beta)
weights = weights / weights.max()
states, actions, rewards, next_states, dones = zip(*batch)
return (
np.array(states),
np.array(actions),
np.array(rewards, dtype=np.float32),
np.array(next_states),
np.array(dones, dtype=np.bool_),
np.array(indices),
torch.tensor(weights, dtype=torch.float32),
)
def update_priorities(self, indices, td_errors):
"""TD 오차를 기반으로 우선순위 업데이트"""
for idx, td_error in zip(indices, td_errors):
priority = (abs(td_error) + 1e-6) ** self.alpha
self.max_priority = max(self.max_priority, priority)
self.tree._update(idx, priority)
5. Dueling DQN
핵심 아이디어
Q값을 **상태 가치 V(s)**와 **어드밴티지 A(s, a)**로 분리합니다.
Q(s, a) = V(s) + A(s, a) - mean_a(A(s, a))
이 분리의 장점은, 어떤 상태에서는 어떤 행동을 취하든 결과가 비슷한 경우가 있는데, 이때 V(s)만 정확히 추정하면 된다는 것입니다.
class DuelingDQN(nn.Module):
"""Dueling DQN 네트워크"""
def __init__(self, obs_size, n_actions):
super().__init__()
# 공유 특징 추출기
self.feature = nn.Sequential(
nn.Linear(obs_size, 128),
nn.ReLU(),
)
# 가치 스트림: V(s) 추정
self.value_stream = nn.Sequential(
nn.Linear(128, 128),
nn.ReLU(),
nn.Linear(128, 1),
)
# 어드밴티지 스트림: A(s, a) 추정
self.advantage_stream = nn.Sequential(
nn.Linear(128, 128),
nn.ReLU(),
nn.Linear(128, n_actions),
)
def forward(self, x):
features = self.feature(x)
value = self.value_stream(features)
advantage = self.advantage_stream(features)
# Q = V + (A - mean(A))
q_values = value + advantage - advantage.mean(dim=-1, keepdim=True)
return q_values
6. Categorical DQN (C51)
핵심 아이디어
기존 DQN은 Q값의 기대값만 추정합니다. Categorical DQN은 Q값의 전체 확률 분포를 추정합니다.
class CategoricalDQN(nn.Module):
"""Categorical DQN (C51) 네트워크"""
def __init__(self, obs_size, n_actions, n_atoms=51, v_min=-10, v_max=10):
super().__init__()
self.n_actions = n_actions
self.n_atoms = n_atoms
self.v_min = v_min
self.v_max = v_max
self.delta_z = (v_max - v_min) / (n_atoms - 1)
self.register_buffer(
'support',
torch.linspace(v_min, v_max, n_atoms)
)
self.network = nn.Sequential(
nn.Linear(obs_size, 128),
nn.ReLU(),
nn.Linear(128, 128),
nn.ReLU(),
nn.Linear(128, n_actions * n_atoms),
)
def forward(self, x):
logits = self.network(x)
# (batch, n_actions * n_atoms) -> (batch, n_actions, n_atoms)
logits = logits.view(-1, self.n_actions, self.n_atoms)
# 소프트맥스로 확률 분포 변환
probs = torch.softmax(logits, dim=-1)
return probs
def get_q_values(self, x):
"""확률 분포로부터 Q값(기대값) 계산"""
probs = self.forward(x)
# Q(s, a) = sum_i p_i * z_i
q_values = (probs * self.support.unsqueeze(0).unsqueeze(0)).sum(dim=-1)
return q_values
def compute_categorical_loss(online_net, target_net, states, actions,
rewards, next_states, dones, gamma, device):
"""Categorical DQN 손실 계산"""
batch_size = len(states)
n_atoms = online_net.n_atoms
v_min = online_net.v_min
v_max = online_net.v_max
delta_z = online_net.delta_z
support = online_net.support
states_t = torch.tensor(states, dtype=torch.float32).to(device)
actions_t = torch.tensor(actions, dtype=torch.long).to(device)
rewards_t = torch.tensor(rewards, dtype=torch.float32).to(device)
next_states_t = torch.tensor(next_states, dtype=torch.float32).to(device)
dones_t = torch.tensor(dones, dtype=torch.bool).to(device)
with torch.no_grad():
# 다음 상태의 최적 행동 (Double DQN 스타일)
next_q = online_net.get_q_values(next_states_t)
next_actions = next_q.argmax(dim=1)
# 타겟 분포
next_probs = target_net(next_states_t)
next_dist = next_probs[range(batch_size), next_actions]
# 벨만 업데이트: T_z = r + gamma * z
t_z = rewards_t.unsqueeze(1) + gamma * support.unsqueeze(0) * (~dones_t).float().unsqueeze(1)
t_z = t_z.clamp(v_min, v_max)
# 원자(atom)에 투영
b = (t_z - v_min) / delta_z
l = b.floor().long()
u = b.ceil().long()
# 분포 투영
target_dist = torch.zeros_like(next_dist)
for i in range(batch_size):
for j in range(n_atoms):
target_dist[i, l[i, j]] += next_dist[i, j] * (u[i, j].float() - b[i, j])
target_dist[i, u[i, j]] += next_dist[i, j] * (b[i, j] - l[i, j].float())
# 현재 분포
current_probs = online_net(states_t)
current_dist = current_probs[range(batch_size), actions_t]
# Cross-Entropy 손실
loss = -(target_dist * (current_dist + 1e-8).log()).sum(dim=-1).mean()
return loss
Rainbow: 모두 결합하기
Rainbow는 위의 6가지 기법을 모두 결합한 알고리즘입니다. 각 기법이 독립적으로 기여하며, 결합하면 시너지 효과를 냅니다.
Rainbow 구성 요소
- N-step returns: 더 정확한 타겟
- Double DQN: 과대추정 방지
- Noisy Networks: 자동 탐색 관리
- Prioritized Replay: 효율적 경험 활용
- Dueling Architecture: V와 A 분리
- Categorical DQN: 가치 분포 학습
class RainbowDQN(nn.Module):
"""Rainbow DQN: 모든 확장 기법 결합"""
def __init__(self, obs_size, n_actions, n_atoms=51, v_min=-10, v_max=10):
super().__init__()
self.n_actions = n_actions
self.n_atoms = n_atoms
self.v_min = v_min
self.v_max = v_max
self.register_buffer('support', torch.linspace(v_min, v_max, n_atoms))
# 공유 특징 추출기
self.feature = nn.Sequential(
nn.Linear(obs_size, 128),
nn.ReLU(),
)
# Dueling + Noisy + Categorical
# 가치 스트림 (Noisy 레이어 사용)
self.value_noisy1 = NoisyLinear(128, 128)
self.value_noisy2 = NoisyLinear(128, n_atoms)
# 어드밴티지 스트림 (Noisy 레이어 사용)
self.advantage_noisy1 = NoisyLinear(128, 128)
self.advantage_noisy2 = NoisyLinear(128, n_actions * n_atoms)
def forward(self, x):
features = self.feature(x)
# 가치 스트림
value = torch.relu(self.value_noisy1(features))
value = self.value_noisy2(value).view(-1, 1, self.n_atoms)
# 어드밴티지 스트림
advantage = torch.relu(self.advantage_noisy1(features))
advantage = self.advantage_noisy2(advantage).view(-1, self.n_actions, self.n_atoms)
# Dueling: Q = V + A - mean(A) (분포 수준에서)
q_dist = value + advantage - advantage.mean(dim=1, keepdim=True)
probs = torch.softmax(q_dist, dim=-1)
return probs
def get_q_values(self, x):
probs = self.forward(x)
q_values = (probs * self.support.unsqueeze(0).unsqueeze(0)).sum(dim=-1)
return q_values
def reset_noise(self):
self.value_noisy1.reset_noise()
self.value_noisy2.reset_noise()
self.advantage_noisy1.reset_noise()
self.advantage_noisy2.reset_noise()
Rainbow 학습 루프의 핵심
def train_rainbow_step(online_net, target_net, optimizer, buffer, n_steps, gamma, device):
"""Rainbow 한 스텝 학습"""
# 1. 우선순위 기반 샘플링
states, actions, rewards, next_states, dones, indices, weights = \
buffer.sample(batch_size=32)
weights = weights.to(device)
# 2. Categorical 손실 계산 (Double DQN 스타일)
loss = compute_categorical_loss(
online_net, target_net,
states, actions, rewards, next_states, dones,
gamma=(gamma ** n_steps), # N-step 감마
device=device,
)
# 3. 중요도 샘플링 가중치 적용
weighted_loss = (loss * weights).mean()
# 4. 역전파
optimizer.zero_grad()
weighted_loss.backward()
torch.nn.utils.clip_grad_norm_(online_net.parameters(), 10)
optimizer.step()
# 5. 우선순위 업데이트
td_errors = loss.detach().cpu().numpy()
buffer.update_priorities(indices, td_errors)
# 6. 노이즈 재생성
online_net.reset_noise()
target_net.reset_noise()
return weighted_loss.item()
각 기법의 성능 기여도
Atari 게임에서의 각 기법의 기여도를 살펴보면 다음과 같습니다.
| 기법 | 주요 효과 | 기여도 |
|---|---|---|
| Prioritized Replay | 학습 효율 향상 | 높음 |
| N-step Returns | 더 빠른 수렴 | 높음 |
| Categorical DQN | 가치 분포 학습 | 중간~높음 |
| Dueling | 상태 가치 분리 | 중간 |
| Double DQN | 과대추정 방지 | 중간 |
| Noisy Networks | 적응적 탐색 | 중간 |
모든 기법을 결합한 Rainbow는 개별 기법보다 훨씬 뛰어난 성능을 보입니다. 특히 데이터 효율성에서 큰 향상을 보여, 같은 수의 프레임에서 훨씬 높은 점수를 달성합니다.
CartPole에서 확장 기법 비교 실험
import gymnasium as gym
from collections import deque
import random
def train_and_compare(variant="basic", n_episodes=300):
"""다양한 DQN 변형을 CartPole에서 비교"""
env = gym.make("CartPole-v1")
obs_size = env.observation_space.shape[0]
n_actions = env.action_space.n
device = torch.device("cpu")
if variant == "basic":
net = nn.Sequential(
nn.Linear(obs_size, 128), nn.ReLU(),
nn.Linear(128, 128), nn.ReLU(),
nn.Linear(128, n_actions),
)
elif variant == "dueling":
net = DuelingDQN(obs_size, n_actions)
elif variant == "noisy":
net = NoisyDQN(obs_size, n_actions)
else:
net = nn.Sequential(
nn.Linear(obs_size, 128), nn.ReLU(),
nn.Linear(128, n_actions),
)
target = type(net)(obs_size, n_actions) if variant in ["dueling", "noisy"] else \
nn.Sequential(
nn.Linear(obs_size, 128), nn.ReLU(),
nn.Linear(128, 128), nn.ReLU(),
nn.Linear(128, n_actions),
)
target.load_state_dict(net.state_dict())
optimizer = torch.optim.Adam(net.parameters(), lr=1e-3)
buffer = ReplayBuffer(10000)
rewards_history = []
epsilon = 1.0 if variant != "noisy" else 0.0
for episode in range(n_episodes):
obs, _ = env.reset()
total_reward = 0
while True:
# 행동 선택
if variant == "noisy":
with torch.no_grad():
q = net(torch.tensor([obs], dtype=torch.float32))
action = q.argmax(dim=1).item()
elif random.random() < epsilon:
action = env.action_space.sample()
else:
with torch.no_grad():
q = net(torch.tensor([obs], dtype=torch.float32))
action = q.argmax(dim=1).item()
next_obs, reward, terminated, truncated, _ = env.step(action)
done = terminated or truncated
buffer.push(obs, action, reward, next_obs, done)
total_reward += reward
obs = next_obs
# 학습 (Double DQN 방식)
if len(buffer) >= 64:
s, a, r, ns, d = buffer.sample(64)
s_t = torch.tensor(s, dtype=torch.float32)
a_t = torch.tensor(a, dtype=torch.long)
r_t = torch.tensor(r, dtype=torch.float32)
ns_t = torch.tensor(ns, dtype=torch.float32)
d_t = torch.tensor(d, dtype=torch.bool)
curr_q = net(s_t).gather(1, a_t.unsqueeze(1)).squeeze(1)
with torch.no_grad():
# Double DQN: 온라인으로 행동 선택, 타겟으로 평가
best_a = net(ns_t).argmax(dim=1)
next_q = target(ns_t).gather(1, best_a.unsqueeze(1)).squeeze(1)
next_q[d_t] = 0.0
tgt = r_t + 0.99 * next_q
loss = nn.MSELoss()(curr_q, tgt)
optimizer.zero_grad()
loss.backward()
optimizer.step()
if done:
break
if variant != "noisy":
epsilon = max(0.01, epsilon * 0.995)
if hasattr(net, 'reset_noise'):
net.reset_noise()
target.reset_noise()
rewards_history.append(total_reward)
if episode % 50 == 0:
mean_r = np.mean(rewards_history[-50:])
print(f"[{variant}] 에피소드 {episode}: 평균 보상={mean_r:.1f}")
if episode % 100 == 0:
target.load_state_dict(net.state_dict())
env.close()
return rewards_history
# 비교 실행
# results_basic = train_and_compare("basic")
# results_dueling = train_and_compare("dueling")
# results_noisy = train_and_compare("noisy")
정리
- N-step DQN: 여러 스텝의 실제 보상을 사용하여 타겟의 정확도 향상
- Double DQN: 행동 선택과 가치 평가를 분리하여 과대추정 방지
- Noisy Networks: 학습 가능한 노이즈로 상태별 적응적 탐색
- Prioritized Replay: TD 오차가 큰 경험을 더 자주 학습
- Dueling DQN: Q값을 V(s)와 A(s, a)로 분리하여 학습 효율 향상
- Categorical DQN: 가치의 확률 분포를 학습하여 더 풍부한 정보 활용
- Rainbow: 6가지 기법의 결합으로 최고 성능 달성
다음 글에서는 강화학습을 실제 금융 문제에 적용하여 주식 트레이딩 에이전트를 만들어 보겠습니다.