Skip to content

필사 모드: [심층 강화학습] 07. DQN 확장: Double DQN, Dueling DQN, Rainbow

한국어
0%
정확도 0%
💡 왼쪽 원문을 읽으면서 오른쪽에 따라 써보세요. Tab 키로 힌트를 받을 수 있습니다.
원문 렌더가 준비되기 전까지 텍스트 가이드로 표시합니다.

DQN의 개선 방향

기본 DQN은 Atari 게임에서 인간 수준의 성능을 보여주었지만, 여전히 개선할 여지가 많습니다. 이번 글에서는 DQN의 성능을 크게 향상시키는 6가지 확장 기법을 살펴보겠습니다.

1. N-step DQN

문제: 단일 스텝 TD 타겟의 한계

기본 DQN은 1-step TD 타겟을 사용합니다.

1-step: target = r_t + gamma * max_a Q(s_{t+1}, a)

이는 즉각적인 보상만 직접 사용하고 나머지는 추정에 의존합니다.

해결: N-step 보상 사용

N-step 방법은 여러 스텝의 실제 보상을 사용하여 더 정확한 타겟을 만듭니다.

N-step: target = r_t + gamma * r_{t+1} + ... + gamma^{n-1} * r_{t+n-1} + gamma^n * max_a Q(s_{t+n}, a)

from collections import deque

class NStepBuffer:

"""N-step 리턴을 위한 버퍼"""

def __init__(self, n_steps, gamma):

self.n_steps = n_steps

self.gamma = gamma

self.buffer = deque(maxlen=n_steps)

def push(self, state, action, reward, next_state, done):

self.buffer.append((state, action, reward, next_state, done))

def get(self):

"""N-step 전이 데이터를 반환"""

첫 번째 전이의 상태와 행동

state, action, _, _, _ = self.buffer[0]

N-step 리턴 계산

n_step_return = 0.0

for i, (_, _, reward, _, done) in enumerate(self.buffer):

n_step_return += (self.gamma ** i) * reward

if done:

break

마지막 전이의 다음 상태

_, _, _, next_state, done = self.buffer[-1]

return state, action, n_step_return, next_state, done

def is_ready(self):

return len(self.buffer) == self.n_steps

N-step DQN 학습에서의 사용

def compute_nstep_target(n_step_return, next_state, done, target_net, gamma, n_steps, device):

"""N-step TD 타겟 계산"""

if done:

return n_step_return

with torch.no_grad():

next_q = target_net(

torch.tensor([next_state], dtype=torch.float32).to(device)

).max(dim=1)[0].item()

return n_step_return + (gamma ** n_steps) * next_q

2. Double DQN

문제: Q값 과대추정 (Overestimation)

기본 DQN의 타겟 계산에서 max 연산자는 Q값을 체계적으로 과대추정하는 경향이 있습니다.

기본 DQN 타겟: r + gamma * max_a Q_target(s', a)

max 연산이 행동 선택과 가치 평가를 동시에 수행하기 때문에, 노이즈에 의해 높게 추정된 행동이 선택되면 과대추정이 발생합니다.

해결: 행동 선택과 가치 평가 분리

Double DQN은 행동 선택에는 온라인 네트워크를, 가치 평가에는 타겟 네트워크를 사용합니다.

Double DQN 타겟:

a* = argmax_a Q_online(s', a) # 온라인 네트워크로 행동 선택

target = r + gamma * Q_target(s', a*) # 타겟 네트워크로 가치 평가

def compute_double_dqn_loss(online_net, target_net, states, actions,

rewards, next_states, dones, gamma, device):

"""Double DQN 손실 계산"""

states_t = torch.tensor(states, dtype=torch.float32).to(device)

actions_t = torch.tensor(actions, dtype=torch.long).to(device)

rewards_t = torch.tensor(rewards, dtype=torch.float32).to(device)

next_states_t = torch.tensor(next_states, dtype=torch.float32).to(device)

dones_t = torch.tensor(dones, dtype=torch.bool).to(device)

현재 Q값

current_q = online_net(states_t).gather(1, actions_t.unsqueeze(1)).squeeze(1)

with torch.no_grad():

핵심: 온라인 네트워크로 행동 선택

best_actions = online_net(next_states_t).argmax(dim=1)

타겟 네트워크로 해당 행동의 가치 평가

next_q = target_net(next_states_t).gather(1, best_actions.unsqueeze(1)).squeeze(1)

next_q[dones_t] = 0.0

target_q = rewards_t + gamma * next_q

loss = nn.MSELoss()(current_q, target_q)

return loss

과대추정 감소 효과

Double DQN은 특히 행동 공간이 크거나 보상이 노이즈가 많은 환경에서 효과적입니다. 기본 DQN 대비 안정적이고 더 나은 최종 성능을 보여줍니다.

3. Noisy Networks

문제: 엡실론-탐욕의 한계

엡실론-탐욕 정책은 모든 상태에서 동일한 확률로 무작위 탐색을 수행합니다. 이는 상태별로 적절한 탐색 수준을 조절하지 못합니다.

해결: 파라미터 노이즈

네트워크 가중치에 학습 가능한 노이즈를 추가합니다. 에이전트가 자동으로 탐색의 정도를 학습합니다.

class NoisyLinear(nn.Module):

"""Factorized Gaussian Noise를 사용하는 선형 레이어"""

def __init__(self, in_features, out_features, sigma_init=0.5):

super().__init__()

self.in_features = in_features

self.out_features = out_features

학습 가능한 파라미터

self.weight_mu = nn.Parameter(torch.empty(out_features, in_features))

self.weight_sigma = nn.Parameter(torch.empty(out_features, in_features))

self.bias_mu = nn.Parameter(torch.empty(out_features))

self.bias_sigma = nn.Parameter(torch.empty(out_features))

노이즈를 위한 버퍼 (학습 대상 아님)

self.register_buffer('weight_epsilon', torch.empty(out_features, in_features))

self.register_buffer('bias_epsilon', torch.empty(out_features))

self.sigma_init = sigma_init

self.reset_parameters()

self.reset_noise()

def reset_parameters(self):

mu_range = 1.0 / math.sqrt(self.in_features)

self.weight_mu.data.uniform_(-mu_range, mu_range)

self.weight_sigma.data.fill_(self.sigma_init / math.sqrt(self.in_features))

self.bias_mu.data.uniform_(-mu_range, mu_range)

self.bias_sigma.data.fill_(self.sigma_init / math.sqrt(self.out_features))

def reset_noise(self):

"""새로운 노이즈 생성"""

epsilon_in = self._scale_noise(self.in_features)

epsilon_out = self._scale_noise(self.out_features)

self.weight_epsilon.copy_(epsilon_out.outer(epsilon_in))

self.bias_epsilon.copy_(epsilon_out)

def _scale_noise(self, size):

x = torch.randn(size)

return x.sign() * x.abs().sqrt()

def forward(self, x):

if self.training:

weight = self.weight_mu + self.weight_sigma * self.weight_epsilon

bias = self.bias_mu + self.bias_sigma * self.bias_epsilon

else:

weight = self.weight_mu

bias = self.bias_mu

return nn.functional.linear(x, weight, bias)

class NoisyDQN(nn.Module):

"""Noisy Network를 사용하는 DQN"""

def __init__(self, obs_size, n_actions):

super().__init__()

self.fc1 = nn.Linear(obs_size, 128)

self.noisy_fc2 = NoisyLinear(128, 128)

self.noisy_fc3 = NoisyLinear(128, n_actions)

def forward(self, x):

x = torch.relu(self.fc1(x))

x = torch.relu(self.noisy_fc2(x))

return self.noisy_fc3(x)

def reset_noise(self):

"""모든 NoisyLinear 레이어의 노이즈 재생성"""

self.noisy_fc2.reset_noise()

self.noisy_fc3.reset_noise()

Noisy Network를 사용하면 엡실론 스케줄이 필요 없습니다. 네트워크 자체가 탐색을 관리합니다.

4. Prioritized Experience Replay

문제: 균일 샘플링의 비효율성

기본 경험 리플레이는 모든 경험을 동일한 확률로 샘플링합니다. 하지만 학습에 더 유용한 경험(TD 오차가 큰 경험)이 있습니다.

해결: TD 오차 기반 우선순위

TD 오차가 큰 경험을 더 자주 샘플링합니다.

class SumTree:

"""효율적인 우선순위 샘플링을 위한 합 트리"""

def __init__(self, capacity):

self.capacity = capacity

self.tree = np.zeros(2 * capacity - 1)

self.data = [None] * capacity

self.write_idx = 0

self.size = 0

def total(self):

return self.tree[0]

def add(self, priority, data):

idx = self.write_idx + self.capacity - 1

self.data[self.write_idx] = data

self._update(idx, priority)

self.write_idx = (self.write_idx + 1) % self.capacity

self.size = min(self.size + 1, self.capacity)

def _update(self, idx, priority):

change = priority - self.tree[idx]

self.tree[idx] = priority

while idx > 0:

idx = (idx - 1) // 2

self.tree[idx] += change

def get(self, value):

"""값에 해당하는 데이터를 검색"""

idx = 0

while idx < self.capacity - 1:

left = 2 * idx + 1

right = left + 1

if value <= self.tree[left]:

idx = left

else:

value -= self.tree[left]

idx = right

data_idx = idx - self.capacity + 1

return idx, self.tree[idx], self.data[data_idx]

class PrioritizedReplayBuffer:

"""우선순위 경험 리플레이 버퍼"""

def __init__(self, capacity, alpha=0.6, beta_start=0.4, beta_frames=100000):

self.tree = SumTree(capacity)

self.alpha = alpha # 우선순위의 지수 (0=균일, 1=완전 우선순위)

self.beta_start = beta_start

self.beta_frames = beta_frames

self.frame = 0

self.max_priority = 1.0

def get_beta(self):

"""중요도 샘플링 보정 계수 (점차 1로 증가)"""

beta = self.beta_start + (1.0 - self.beta_start) * \

min(1.0, self.frame / self.beta_frames)

self.frame += 1

return beta

def push(self, state, action, reward, next_state, done):

data = (state, action, reward, next_state, done)

priority = self.max_priority ** self.alpha

self.tree.add(priority, data)

def sample(self, batch_size):

beta = self.get_beta()

batch = []

indices = []

priorities = []

segment = self.tree.total() / batch_size

for i in range(batch_size):

low = segment * i

high = segment * (i + 1)

value = np.random.uniform(low, high)

idx, priority, data = self.tree.get(value)

batch.append(data)

indices.append(idx)

priorities.append(priority)

중요도 샘플링 가중치 계산

probs = np.array(priorities) / self.tree.total()

weights = (self.tree.size * probs) ** (-beta)

weights = weights / weights.max()

states, actions, rewards, next_states, dones = zip(*batch)

return (

np.array(states),

np.array(actions),

np.array(rewards, dtype=np.float32),

np.array(next_states),

np.array(dones, dtype=np.bool_),

np.array(indices),

torch.tensor(weights, dtype=torch.float32),

)

def update_priorities(self, indices, td_errors):

"""TD 오차를 기반으로 우선순위 업데이트"""

for idx, td_error in zip(indices, td_errors):

priority = (abs(td_error) + 1e-6) ** self.alpha

self.max_priority = max(self.max_priority, priority)

self.tree._update(idx, priority)

5. Dueling DQN

핵심 아이디어

Q값을 **상태 가치 V(s)**와 **어드밴티지 A(s, a)**로 분리합니다.

Q(s, a) = V(s) + A(s, a) - mean_a(A(s, a))

이 분리의 장점은, 어떤 상태에서는 어떤 행동을 취하든 결과가 비슷한 경우가 있는데, 이때 V(s)만 정확히 추정하면 된다는 것입니다.

class DuelingDQN(nn.Module):

"""Dueling DQN 네트워크"""

def __init__(self, obs_size, n_actions):

super().__init__()

공유 특징 추출기

self.feature = nn.Sequential(

nn.Linear(obs_size, 128),

nn.ReLU(),

)

가치 스트림: V(s) 추정

self.value_stream = nn.Sequential(

nn.Linear(128, 128),

nn.ReLU(),

nn.Linear(128, 1),

)

어드밴티지 스트림: A(s, a) 추정

self.advantage_stream = nn.Sequential(

nn.Linear(128, 128),

nn.ReLU(),

nn.Linear(128, n_actions),

)

def forward(self, x):

features = self.feature(x)

value = self.value_stream(features)

advantage = self.advantage_stream(features)

Q = V + (A - mean(A))

q_values = value + advantage - advantage.mean(dim=-1, keepdim=True)

return q_values

6. Categorical DQN (C51)

핵심 아이디어

기존 DQN은 Q값의 기대값만 추정합니다. Categorical DQN은 Q값의 **전체 확률 분포**를 추정합니다.

class CategoricalDQN(nn.Module):

"""Categorical DQN (C51) 네트워크"""

def __init__(self, obs_size, n_actions, n_atoms=51, v_min=-10, v_max=10):

super().__init__()

self.n_actions = n_actions

self.n_atoms = n_atoms

self.v_min = v_min

self.v_max = v_max

self.delta_z = (v_max - v_min) / (n_atoms - 1)

self.register_buffer(

'support',

torch.linspace(v_min, v_max, n_atoms)

)

self.network = nn.Sequential(

nn.Linear(obs_size, 128),

nn.ReLU(),

nn.Linear(128, 128),

nn.ReLU(),

nn.Linear(128, n_actions * n_atoms),

)

def forward(self, x):

logits = self.network(x)

(batch, n_actions * n_atoms) -> (batch, n_actions, n_atoms)

logits = logits.view(-1, self.n_actions, self.n_atoms)

소프트맥스로 확률 분포 변환

probs = torch.softmax(logits, dim=-1)

return probs

def get_q_values(self, x):

"""확률 분포로부터 Q값(기대값) 계산"""

probs = self.forward(x)

Q(s, a) = sum_i p_i * z_i

q_values = (probs * self.support.unsqueeze(0).unsqueeze(0)).sum(dim=-1)

return q_values

def compute_categorical_loss(online_net, target_net, states, actions,

rewards, next_states, dones, gamma, device):

"""Categorical DQN 손실 계산"""

batch_size = len(states)

n_atoms = online_net.n_atoms

v_min = online_net.v_min

v_max = online_net.v_max

delta_z = online_net.delta_z

support = online_net.support

states_t = torch.tensor(states, dtype=torch.float32).to(device)

actions_t = torch.tensor(actions, dtype=torch.long).to(device)

rewards_t = torch.tensor(rewards, dtype=torch.float32).to(device)

next_states_t = torch.tensor(next_states, dtype=torch.float32).to(device)

dones_t = torch.tensor(dones, dtype=torch.bool).to(device)

with torch.no_grad():

다음 상태의 최적 행동 (Double DQN 스타일)

next_q = online_net.get_q_values(next_states_t)

next_actions = next_q.argmax(dim=1)

타겟 분포

next_probs = target_net(next_states_t)

next_dist = next_probs[range(batch_size), next_actions]

벨만 업데이트: T_z = r + gamma * z

t_z = rewards_t.unsqueeze(1) + gamma * support.unsqueeze(0) * (~dones_t).float().unsqueeze(1)

t_z = t_z.clamp(v_min, v_max)

원자(atom)에 투영

b = (t_z - v_min) / delta_z

l = b.floor().long()

u = b.ceil().long()

분포 투영

target_dist = torch.zeros_like(next_dist)

for i in range(batch_size):

for j in range(n_atoms):

target_dist[i, l[i, j]] += next_dist[i, j] * (u[i, j].float() - b[i, j])

target_dist[i, u[i, j]] += next_dist[i, j] * (b[i, j] - l[i, j].float())

현재 분포

current_probs = online_net(states_t)

current_dist = current_probs[range(batch_size), actions_t]

Cross-Entropy 손실

loss = -(target_dist * (current_dist + 1e-8).log()).sum(dim=-1).mean()

return loss

Rainbow: 모두 결합하기

Rainbow는 위의 6가지 기법을 모두 결합한 알고리즘입니다. 각 기법이 독립적으로 기여하며, 결합하면 시너지 효과를 냅니다.

Rainbow 구성 요소

1. **N-step returns**: 더 정확한 타겟

2. **Double DQN**: 과대추정 방지

3. **Noisy Networks**: 자동 탐색 관리

4. **Prioritized Replay**: 효율적 경험 활용

5. **Dueling Architecture**: V와 A 분리

6. **Categorical DQN**: 가치 분포 학습

class RainbowDQN(nn.Module):

"""Rainbow DQN: 모든 확장 기법 결합"""

def __init__(self, obs_size, n_actions, n_atoms=51, v_min=-10, v_max=10):

super().__init__()

self.n_actions = n_actions

self.n_atoms = n_atoms

self.v_min = v_min

self.v_max = v_max

self.register_buffer('support', torch.linspace(v_min, v_max, n_atoms))

공유 특징 추출기

self.feature = nn.Sequential(

nn.Linear(obs_size, 128),

nn.ReLU(),

)

Dueling + Noisy + Categorical

가치 스트림 (Noisy 레이어 사용)

self.value_noisy1 = NoisyLinear(128, 128)

self.value_noisy2 = NoisyLinear(128, n_atoms)

어드밴티지 스트림 (Noisy 레이어 사용)

self.advantage_noisy1 = NoisyLinear(128, 128)

self.advantage_noisy2 = NoisyLinear(128, n_actions * n_atoms)

def forward(self, x):

features = self.feature(x)

가치 스트림

value = torch.relu(self.value_noisy1(features))

value = self.value_noisy2(value).view(-1, 1, self.n_atoms)

어드밴티지 스트림

advantage = torch.relu(self.advantage_noisy1(features))

advantage = self.advantage_noisy2(advantage).view(-1, self.n_actions, self.n_atoms)

Dueling: Q = V + A - mean(A) (분포 수준에서)

q_dist = value + advantage - advantage.mean(dim=1, keepdim=True)

probs = torch.softmax(q_dist, dim=-1)

return probs

def get_q_values(self, x):

probs = self.forward(x)

q_values = (probs * self.support.unsqueeze(0).unsqueeze(0)).sum(dim=-1)

return q_values

def reset_noise(self):

self.value_noisy1.reset_noise()

self.value_noisy2.reset_noise()

self.advantage_noisy1.reset_noise()

self.advantage_noisy2.reset_noise()

Rainbow 학습 루프의 핵심

def train_rainbow_step(online_net, target_net, optimizer, buffer, n_steps, gamma, device):

"""Rainbow 한 스텝 학습"""

1. 우선순위 기반 샘플링

states, actions, rewards, next_states, dones, indices, weights = \

buffer.sample(batch_size=32)

weights = weights.to(device)

2. Categorical 손실 계산 (Double DQN 스타일)

loss = compute_categorical_loss(

online_net, target_net,

states, actions, rewards, next_states, dones,

gamma=(gamma ** n_steps), # N-step 감마

device=device,

)

3. 중요도 샘플링 가중치 적용

weighted_loss = (loss * weights).mean()

4. 역전파

optimizer.zero_grad()

weighted_loss.backward()

torch.nn.utils.clip_grad_norm_(online_net.parameters(), 10)

optimizer.step()

5. 우선순위 업데이트

td_errors = loss.detach().cpu().numpy()

buffer.update_priorities(indices, td_errors)

6. 노이즈 재생성

online_net.reset_noise()

target_net.reset_noise()

return weighted_loss.item()

각 기법의 성능 기여도

Atari 게임에서의 각 기법의 기여도를 살펴보면 다음과 같습니다.

| 기법 | 주요 효과 | 기여도 |

| ------------------ | -------------- | --------- |

| Prioritized Replay | 학습 효율 향상 | 높음 |

| N-step Returns | 더 빠른 수렴 | 높음 |

| Categorical DQN | 가치 분포 학습 | 중간~높음 |

| Dueling | 상태 가치 분리 | 중간 |

| Double DQN | 과대추정 방지 | 중간 |

| Noisy Networks | 적응적 탐색 | 중간 |

모든 기법을 결합한 Rainbow는 개별 기법보다 훨씬 뛰어난 성능을 보입니다. 특히 데이터 효율성에서 큰 향상을 보여, 같은 수의 프레임에서 훨씬 높은 점수를 달성합니다.

CartPole에서 확장 기법 비교 실험

from collections import deque

def train_and_compare(variant="basic", n_episodes=300):

"""다양한 DQN 변형을 CartPole에서 비교"""

env = gym.make("CartPole-v1")

obs_size = env.observation_space.shape[0]

n_actions = env.action_space.n

device = torch.device("cpu")

if variant == "basic":

net = nn.Sequential(

nn.Linear(obs_size, 128), nn.ReLU(),

nn.Linear(128, 128), nn.ReLU(),

nn.Linear(128, n_actions),

)

elif variant == "dueling":

net = DuelingDQN(obs_size, n_actions)

elif variant == "noisy":

net = NoisyDQN(obs_size, n_actions)

else:

net = nn.Sequential(

nn.Linear(obs_size, 128), nn.ReLU(),

nn.Linear(128, n_actions),

)

target = type(net)(obs_size, n_actions) if variant in ["dueling", "noisy"] else \

nn.Sequential(

nn.Linear(obs_size, 128), nn.ReLU(),

nn.Linear(128, 128), nn.ReLU(),

nn.Linear(128, n_actions),

)

target.load_state_dict(net.state_dict())

optimizer = torch.optim.Adam(net.parameters(), lr=1e-3)

buffer = ReplayBuffer(10000)

rewards_history = []

epsilon = 1.0 if variant != "noisy" else 0.0

for episode in range(n_episodes):

obs, _ = env.reset()

total_reward = 0

while True:

행동 선택

if variant == "noisy":

with torch.no_grad():

q = net(torch.tensor([obs], dtype=torch.float32))

action = q.argmax(dim=1).item()

elif random.random() < epsilon:

action = env.action_space.sample()

else:

with torch.no_grad():

q = net(torch.tensor([obs], dtype=torch.float32))

action = q.argmax(dim=1).item()

next_obs, reward, terminated, truncated, _ = env.step(action)

done = terminated or truncated

buffer.push(obs, action, reward, next_obs, done)

total_reward += reward

obs = next_obs

학습 (Double DQN 방식)

if len(buffer) >= 64:

s, a, r, ns, d = buffer.sample(64)

s_t = torch.tensor(s, dtype=torch.float32)

a_t = torch.tensor(a, dtype=torch.long)

r_t = torch.tensor(r, dtype=torch.float32)

ns_t = torch.tensor(ns, dtype=torch.float32)

d_t = torch.tensor(d, dtype=torch.bool)

curr_q = net(s_t).gather(1, a_t.unsqueeze(1)).squeeze(1)

with torch.no_grad():

Double DQN: 온라인으로 행동 선택, 타겟으로 평가

best_a = net(ns_t).argmax(dim=1)

next_q = target(ns_t).gather(1, best_a.unsqueeze(1)).squeeze(1)

next_q[d_t] = 0.0

tgt = r_t + 0.99 * next_q

loss = nn.MSELoss()(curr_q, tgt)

optimizer.zero_grad()

loss.backward()

optimizer.step()

if done:

break

if variant != "noisy":

epsilon = max(0.01, epsilon * 0.995)

if hasattr(net, 'reset_noise'):

net.reset_noise()

target.reset_noise()

rewards_history.append(total_reward)

if episode % 50 == 0:

mean_r = np.mean(rewards_history[-50:])

print(f"[{variant}] 에피소드 {episode}: 평균 보상={mean_r:.1f}")

if episode % 100 == 0:

target.load_state_dict(net.state_dict())

env.close()

return rewards_history

비교 실행

results_basic = train_and_compare("basic")

results_dueling = train_and_compare("dueling")

results_noisy = train_and_compare("noisy")

정리

1. **N-step DQN**: 여러 스텝의 실제 보상을 사용하여 타겟의 정확도 향상

2. **Double DQN**: 행동 선택과 가치 평가를 분리하여 과대추정 방지

3. **Noisy Networks**: 학습 가능한 노이즈로 상태별 적응적 탐색

4. **Prioritized Replay**: TD 오차가 큰 경험을 더 자주 학습

5. **Dueling DQN**: Q값을 V(s)와 A(s, a)로 분리하여 학습 효율 향상

6. **Categorical DQN**: 가치의 확률 분포를 학습하여 더 풍부한 정보 활용

7. **Rainbow**: 6가지 기법의 결합으로 최고 성능 달성

다음 글에서는 강화학습을 실제 금융 문제에 적용하여 주식 트레이딩 에이전트를 만들어 보겠습니다.

현재 단락 (1/461)

기본 DQN은 Atari 게임에서 인간 수준의 성능을 보여주었지만, 여전히 개선할 여지가 많습니다. 이번 글에서는 DQN의 성능을 크게 향상시키는 6가지 확장 기법을 살펴보겠습니다...

작성 글자: 0원문 글자: 15,527작성 단락: 0/461