- Authors

- Name
- Youngju Kim
- @fjvbn20031
개요
지금까지 다룬 대부분의 환경은 이산 행동 공간(왼쪽/오른쪽, 각 방향키 등)을 가졌다. 하지만 로봇 제어, 자율주행, 물리 시뮬레이션 같은 현실 문제에서는 연속 행동 공간이 필요하다. 관절 각도, 토크, 가속도 등은 실수 값이기 때문이다.
이 글에서는 연속 행동 공간에 대한 정책 설계, A2C 확장, DDPG(Deep Deterministic Policy Gradient), 그리고 분포 정책 그래디언트를 다룬다.
왜 연속 공간이 필요한가
이산화의 한계
연속 행동을 이산화하는 방법(예: 토크를 -1, -0.5, 0, 0.5, 1로 분할)은 간단하지만 문제가 있다:
- 차원의 저주: N개의 관절 각각을 K단계로 이산화하면 행동 공간이 K^N으로 폭발한다
- 정밀도 부족: 이산화 간격 사이의 세밀한 제어가 불가능하다
- 비효율성: 대부분의 이산 행동 조합은 무의미하다
예를 들어 6관절 로봇을 10단계로 이산화하면 10^6 = 백만 개의 행동이 생긴다.
연속 행동 공간의 표현
연속 정책은 행동의 확률 분포를 출력한다:
- 가우시안 정책: 평균과 분산을 출력하여 정규분포에서 행동을 샘플링
- 베타 정책: 유한 범위의 행동에 적합한 베타 분포 사용
- 결정적 정책: 하나의 행동 값을 직접 출력 (DDPG)
행동 공간 설계
import torch
import torch.nn as nn
import numpy as np
class ContinuousActionSpace:
"""연속 행동 공간 정의"""
def __init__(self, low, high):
self.low = np.array(low, dtype=np.float32)
self.high = np.array(high, dtype=np.float32)
self.shape = self.low.shape
def clip(self, action):
return np.clip(action, self.low, self.high)
def sample(self):
return np.random.uniform(self.low, self.high)
A2C를 연속 행동 공간에 적용
기존 A2C의 카테고리컬 분포를 가우시안 분포로 교체한다:
class ContinuousActorCritic(nn.Module):
"""연속 행동 공간용 Actor-Critic"""
def __init__(self, obs_size, act_size):
super().__init__()
self.shared = nn.Sequential(
nn.Linear(obs_size, 256),
nn.ReLU(),
nn.Linear(256, 256),
nn.ReLU(),
)
# Actor: 평균과 로그 표준편차 출력
self.mu = nn.Sequential(
nn.Linear(256, act_size),
nn.Tanh(), # 행동 범위를 -1, 1로 제한
)
# 로그 표준편차는 상태 독립 파라미터로 학습
self.log_std = nn.Parameter(torch.zeros(act_size))
# Critic
self.value = nn.Linear(256, 1)
def forward(self, obs):
features = self.shared(obs)
mu = self.mu(features)
std = self.log_std.exp()
value = self.value(features)
return mu, std, value
def get_action(self, obs):
mu, std, value = self.forward(obs)
dist = torch.distributions.Normal(mu, std)
action = dist.sample()
log_prob = dist.log_prob(action).sum(dim=-1)
return action, log_prob, value
def evaluate(self, obs, action):
mu, std, value = self.forward(obs)
dist = torch.distributions.Normal(mu, std)
log_prob = dist.log_prob(action).sum(dim=-1)
entropy = dist.entropy().sum(dim=-1)
return log_prob, entropy, value
연속 A2C 학습
def train_continuous_a2c(env, model, num_steps=2048, num_updates=1000,
gamma=0.99, lr=3e-4, entropy_coef=0.01):
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
obs = env.reset()
for update in range(num_updates):
# 경험 수집
observations = []
actions = []
rewards = []
dones = []
log_probs = []
values = []
for _ in range(num_steps):
obs_t = torch.FloatTensor(obs).unsqueeze(0)
action, log_prob, value = model.get_action(obs_t)
action_np = action.detach().numpy().flatten()
action_clipped = np.clip(action_np, env.action_space.low,
env.action_space.high)
next_obs, reward, done, _ = env.step(action_clipped)
observations.append(obs)
actions.append(action.detach())
rewards.append(reward)
dones.append(done)
log_probs.append(log_prob)
values.append(value.squeeze())
obs = next_obs if not done else env.reset()
# GAE(Generalized Advantage Estimation) 계산
advantages, returns = compute_gae(rewards, values, dones, gamma)
# 업데이트
obs_batch = torch.FloatTensor(np.array(observations))
act_batch = torch.cat(actions)
adv_batch = torch.FloatTensor(advantages)
ret_batch = torch.FloatTensor(returns)
new_log_probs, entropy, new_values = model.evaluate(
obs_batch, act_batch
)
policy_loss = -(new_log_probs * adv_batch).mean()
value_loss = (ret_batch - new_values.squeeze()).pow(2).mean()
entropy_loss = -entropy.mean()
loss = policy_loss + 0.5 * value_loss + entropy_coef * entropy_loss
optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5)
optimizer.step()
def compute_gae(rewards, values, dones, gamma, lam=0.95):
"""Generalized Advantage Estimation"""
advantages = []
gae = 0
values_list = [v.detach().item() for v in values]
values_list.append(0) # 부트스트랩
for t in reversed(range(len(rewards))):
if dones[t]:
delta = rewards[t] - values_list[t]
gae = delta
else:
delta = rewards[t] + gamma * values_list[t+1] - values_list[t]
gae = delta + gamma * lam * gae
advantages.insert(0, gae)
returns = [a + v for a, v in zip(advantages, values_list[:-1])]
return advantages, returns
결정적 정책 그래디언트 (DDPG)
DDPG의 핵심 아이디어
DDPG(Deep Deterministic Policy Gradient) 는 DQN의 아이디어를 연속 행동 공간에 적용한 알고리즘이다:
- 결정적 정책: 확률적으로 샘플링하지 않고, 상태에 대해 하나의 행동을 직접 출력
- 경험 리플레이: DQN처럼 과거 경험을 버퍼에 저장하고 재사용
- 타겟 네트워크: 학습 안정화를 위한 소프트 업데이트
- 탐색 노이즈: 결정적 정책에 노이즈를 추가하여 탐색
네트워크 구조
class DDPGActor(nn.Module):
"""결정적 정책 네트워크"""
def __init__(self, obs_size, act_size, act_limit):
super().__init__()
self.act_limit = act_limit
self.net = nn.Sequential(
nn.Linear(obs_size, 400),
nn.ReLU(),
nn.Linear(400, 300),
nn.ReLU(),
nn.Linear(300, act_size),
nn.Tanh(),
)
def forward(self, obs):
return self.act_limit * self.net(obs)
class DDPGCritic(nn.Module):
"""행동 가치 함수 Q(s, a)"""
def __init__(self, obs_size, act_size):
super().__init__()
self.net = nn.Sequential(
nn.Linear(obs_size + act_size, 400),
nn.ReLU(),
nn.Linear(400, 300),
nn.ReLU(),
nn.Linear(300, 1),
)
def forward(self, obs, action):
x = torch.cat([obs, action], dim=-1)
return self.net(x)
탐색 노이즈: Ornstein-Uhlenbeck 프로세스
DDPG는 시간적 상관관계가 있는 OU 노이즈를 사용하여 연속적이고 부드러운 탐색을 수행한다:
class OrnsteinUhlenbeckNoise:
"""시간 상관 탐색 노이즈"""
def __init__(self, size, mu=0.0, theta=0.15, sigma=0.2):
self.size = size
self.mu = mu
self.theta = theta
self.sigma = sigma
self.state = np.ones(size) * mu
def reset(self):
self.state = np.ones(self.size) * self.mu
def sample(self):
dx = (self.theta * (self.mu - self.state)
+ self.sigma * np.random.randn(self.size))
self.state += dx
return self.state.copy()
DDPG 전체 구현
import copy
from collections import deque
import random
class ReplayBuffer:
def __init__(self, capacity=1000000):
self.buffer = deque(maxlen=capacity)
def push(self, state, action, reward, next_state, done):
self.buffer.append((state, action, reward, next_state, done))
def sample(self, batch_size):
batch = random.sample(self.buffer, batch_size)
states, actions, rewards, next_states, dones = zip(*batch)
return (np.array(states), np.array(actions),
np.array(rewards, dtype=np.float32),
np.array(next_states),
np.array(dones, dtype=np.float32))
def __len__(self):
return len(self.buffer)
class DDPGAgent:
def __init__(self, obs_size, act_size, act_limit,
gamma=0.99, tau=0.005, lr_actor=1e-4, lr_critic=1e-3):
self.gamma = gamma
self.tau = tau
# 메인 네트워크
self.actor = DDPGActor(obs_size, act_size, act_limit)
self.critic = DDPGCritic(obs_size, act_size)
# 타겟 네트워크 (소프트 업데이트 대상)
self.target_actor = copy.deepcopy(self.actor)
self.target_critic = copy.deepcopy(self.critic)
self.actor_optimizer = torch.optim.Adam(
self.actor.parameters(), lr=lr_actor)
self.critic_optimizer = torch.optim.Adam(
self.critic.parameters(), lr=lr_critic)
self.replay_buffer = ReplayBuffer()
self.noise = OrnsteinUhlenbeckNoise(act_size)
def select_action(self, state, add_noise=True):
state_t = torch.FloatTensor(state).unsqueeze(0)
action = self.actor(state_t).detach().numpy().flatten()
if add_noise:
action += self.noise.sample()
return np.clip(action, -self.actor.act_limit, self.actor.act_limit)
def update(self, batch_size=256):
if len(self.replay_buffer) < batch_size:
return
states, actions, rewards, next_states, dones = \
self.replay_buffer.sample(batch_size)
states_t = torch.FloatTensor(states)
actions_t = torch.FloatTensor(actions)
rewards_t = torch.FloatTensor(rewards).unsqueeze(1)
next_states_t = torch.FloatTensor(next_states)
dones_t = torch.FloatTensor(dones).unsqueeze(1)
# === Critic 업데이트 ===
with torch.no_grad():
next_actions = self.target_actor(next_states_t)
target_q = self.target_critic(next_states_t, next_actions)
target_value = rewards_t + self.gamma * (1 - dones_t) * target_q
current_q = self.critic(states_t, actions_t)
critic_loss = nn.MSELoss()(current_q, target_value)
self.critic_optimizer.zero_grad()
critic_loss.backward()
self.critic_optimizer.step()
# === Actor 업데이트 ===
# 정책의 목표: Q(s, mu(s))를 최대화
predicted_actions = self.actor(states_t)
actor_loss = -self.critic(states_t, predicted_actions).mean()
self.actor_optimizer.zero_grad()
actor_loss.backward()
self.actor_optimizer.step()
# === 타겟 네트워크 소프트 업데이트 ===
self._soft_update(self.actor, self.target_actor)
self._soft_update(self.critic, self.target_critic)
def _soft_update(self, source, target):
for src_param, tgt_param in zip(source.parameters(),
target.parameters()):
tgt_param.data.copy_(
self.tau * src_param.data + (1 - self.tau) * tgt_param.data
)
분포 정책 그래디언트
D4PG: Distributed Distributional DDPG
D4PG는 DDPG에 분포 강화학습(Distributional RL)을 결합한 알고리즘이다. Q 값의 기대값 대신 전체 분포를 학습한다.
class DistributionalCritic(nn.Module):
"""분포 Q 함수: 리턴의 분포를 예측"""
def __init__(self, obs_size, act_size,
num_atoms=51, v_min=-10.0, v_max=10.0):
super().__init__()
self.num_atoms = num_atoms
self.v_min = v_min
self.v_max = v_max
self.support = torch.linspace(v_min, v_max, num_atoms)
self.delta_z = (v_max - v_min) / (num_atoms - 1)
self.net = nn.Sequential(
nn.Linear(obs_size + act_size, 400),
nn.ReLU(),
nn.Linear(400, 300),
nn.ReLU(),
nn.Linear(300, num_atoms),
)
def forward(self, obs, action):
x = torch.cat([obs, action], dim=-1)
logits = self.net(x)
probs = torch.softmax(logits, dim=-1)
return probs
def get_q_value(self, obs, action):
"""기대 Q값 계산"""
probs = self.forward(obs, action)
return (probs * self.support.unsqueeze(0)).sum(dim=-1, keepdim=True)
분포 TD 학습
def distributional_critic_loss(critic, target_critic, target_actor,
states, actions, rewards, next_states,
dones, gamma=0.99):
"""분포 크리틱의 손실 계산 (카테고리컬 프로젝션)"""
num_atoms = critic.num_atoms
v_min = critic.v_min
v_max = critic.v_max
delta_z = critic.delta_z
support = critic.support
with torch.no_grad():
next_actions = target_actor(next_states)
target_probs = target_critic(next_states, next_actions)
# 벨만 업데이트된 서포트
tz = rewards.unsqueeze(1) + gamma * (1 - dones.unsqueeze(1)) * \
support.unsqueeze(0)
tz = tz.clamp(v_min, v_max)
# 카테고리컬 프로젝션
b = (tz - v_min) / delta_z
l = b.floor().long()
u = b.ceil().long()
projected = torch.zeros_like(target_probs)
for i in range(num_atoms):
projected.scatter_add_(1, l[:, i:i+1],
target_probs[:, i:i+1] * (u[:, i:i+1].float() - b[:, i:i+1]))
projected.scatter_add_(1, u[:, i:i+1],
target_probs[:, i:i+1] * (b[:, i:i+1] - l[:, i:i+1].float()))
# 크로스 엔트로피 손실
current_logits = critic.net(torch.cat([states, actions], dim=-1))
loss = -(projected * torch.log_softmax(current_logits, dim=-1)).sum(dim=-1).mean()
return loss
실험 결과 비교
MuJoCo의 HalfCheetah-v4 환경에서의 성능 비교:
| 알고리즘 | 100만 스텝 평균 보상 | 학습 안정성 | 샘플 효율성 |
|---|---|---|---|
| A2C (연속) | 약 3000 | 중간 | 낮음 |
| DDPG | 약 8000 | 낮음 (민감) | 높음 |
| D4PG | 약 9500 | 높음 | 높음 |
DDPG는 하이퍼파라미터에 민감하지만 잘 튜닝하면 높은 성능을 보인다. D4PG는 분포 학습 덕분에 더 안정적이다.
핵심 요약
- 연속 행동 공간은 가우시안 정책(A2C) 또는 결정적 정책(DDPG)으로 처리한다
- DDPG는 DQN의 핵심 기법(리플레이 버퍼, 타겟 네트워크)을 연속 공간에 적용한다
- OU 노이즈로 시간적 상관이 있는 부드러운 탐색을 수행한다
- 분포 정책 그래디언트(D4PG)는 리턴의 분포를 학습하여 안정성을 높인다
다음 글에서는 정책 업데이트의 안정성을 보장하는 Trust Region 방법(TRPO, PPO, ACKTR) 을 다룬다.