- Authors

- Name
- Youngju Kim
- @fjvbn20031
가치 기반 vs 정책 기반
지금까지 다룬 DQN 계열의 방법은 가치 기반(value-based) 접근법이었습니다. Q 함수를 학습하고, Q값이 가장 높은 행동을 선택하는 간접적인 방식입니다.
정책 기반(policy-based) 방법은 정책을 직접 파라미터화하고 최적화합니다. 정책 네트워크 pi(a|s; theta)가 각 상태에서 행동의 확률 분포를 출력합니다.
가치 기반의 한계
- 이산 행동에 한정: DQN은 연속 행동 공간에 직접 적용하기 어렵습니다
- 결정적 정책: Q값의 argmax를 취하므로 확률적 정책을 자연스럽게 표현하기 어렵습니다
- 수렴 불안정: 가치 함수의 작은 변화가 정책의 급격한 변화를 야기할 수 있습니다
정책 기반의 장점
- 연속 행동 공간: 가우시안 정책 등으로 자연스럽게 연속 행동을 다룰 수 있습니다
- 확률적 정책: 탐색이 정책에 내장되어 있어 별도의 엡실론 스케줄이 불필요합니다
- 수렴 보장: 지역 최적해로의 수렴이 이론적으로 보장됩니다 (적절한 학습률 하에)
- 부분 관찰 환경: 확률적 정책이 부분 관찰 문제에서 더 자연스럽습니다
정책 표현 (Policy Representation)
이산 행동: 소프트맥스 정책
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
class DiscretePolicyNetwork(nn.Module):
"""이산 행동 공간을 위한 정책 네트워크"""
def __init__(self, obs_size, n_actions, hidden_size=128):
super().__init__()
self.net = nn.Sequential(
nn.Linear(obs_size, hidden_size),
nn.ReLU(),
nn.Linear(hidden_size, hidden_size),
nn.ReLU(),
nn.Linear(hidden_size, n_actions),
)
def forward(self, x):
logits = self.net(x)
return logits
def get_action_prob(self, state):
"""행동 확률 분포 반환"""
logits = self.forward(state)
probs = F.softmax(logits, dim=-1)
return probs
def select_action(self, state):
"""확률에 따라 행동 샘플링"""
probs = self.get_action_prob(state)
dist = torch.distributions.Categorical(probs)
action = dist.sample()
log_prob = dist.log_prob(action)
return action.item(), log_prob
연속 행동: 가우시안 정책
class ContinuousPolicyNetwork(nn.Module):
"""연속 행동 공간을 위한 가우시안 정책 네트워크"""
def __init__(self, obs_size, action_size, hidden_size=128):
super().__init__()
self.shared = nn.Sequential(
nn.Linear(obs_size, hidden_size),
nn.ReLU(),
nn.Linear(hidden_size, hidden_size),
nn.ReLU(),
)
# 평균과 로그 분산을 각각 출력
self.mean_head = nn.Linear(hidden_size, action_size)
self.log_std_head = nn.Linear(hidden_size, action_size)
def forward(self, x):
features = self.shared(x)
mean = self.mean_head(features)
log_std = self.log_std_head(features).clamp(-20, 2)
std = log_std.exp()
return mean, std
def select_action(self, state):
"""가우시안 분포에서 행동 샘플링"""
mean, std = self.forward(state)
dist = torch.distributions.Normal(mean, std)
action = dist.sample()
log_prob = dist.log_prob(action).sum(dim=-1)
return action.detach().numpy(), log_prob
Policy Gradient 유도
목표 함수
정책 기반 방법의 목표는 기대 누적 보상을 최대화하는 것입니다.
J(theta) = E_pi[ sum_t gamma^t * r_t ]
이 목표 함수를 theta에 대해 미분하여 그래디언트를 구해야 합니다.
Policy Gradient Theorem
핵심 결과는 다음과 같습니다.
grad J(theta) = E_pi[ sum_t grad log pi(a_t | s_t; theta) * G_t ]
여기서 G_t는 시점 t부터의 할인 누적 보상입니다.
이 정리의 의미를 직관적으로 설명하면 다음과 같습니다.
- 높은 보상을 받은 행동: log pi의 그래디언트 방향으로 파라미터를 업데이트하여 해당 행동의 확률을 높임
- 낮은 보상을 받은 행동: 반대 방향으로 업데이트하여 해당 행동의 확률을 낮춤
유도 과정의 핵심
유도의 핵심 트릭은 "로그 미분 트릭(log-derivative trick)"입니다.
grad pi(a|s; theta) = pi(a|s; theta) * grad log pi(a|s; theta)
이를 통해 기대값 형태로 변환할 수 있고, 샘플링을 통한 근사가 가능해집니다.
REINFORCE 알고리즘
REINFORCE는 가장 기본적인 Policy Gradient 알고리즘입니다. 몬테카를로 방식으로 전체 에피소드를 수집한 후 업데이트합니다.
알고리즘 의사 코드
1. 정책 네트워크 pi(a|s; theta) 초기화
2. 반복:
a. 현재 정책으로 에피소드 하나를 수집
- 각 스텝에서 (s_t, a_t, r_t, log pi(a_t|s_t))를 기록
b. 각 시점의 할인 누적 보상 G_t를 계산
c. 정책 그래디언트 계산:
loss = -sum_t log pi(a_t|s_t) * G_t
d. 역전파로 theta 업데이트
CartPole REINFORCE 구현
import gymnasium as gym
import torch
import torch.optim as optim
def train_reinforce_cartpole():
"""REINFORCE로 CartPole 학습"""
env = gym.make("CartPole-v1")
obs_size = env.observation_space.shape[0]
n_actions = env.action_space.n
policy = DiscretePolicyNetwork(obs_size, n_actions, hidden_size=128)
optimizer = optim.Adam(policy.parameters(), lr=0.001)
gamma = 0.99
rewards_history = []
for episode in range(1000):
# 에피소드 수집
log_probs = []
rewards = []
obs, _ = env.reset()
while True:
obs_tensor = torch.tensor([obs], dtype=torch.float32)
action, log_prob = policy.select_action(obs_tensor)
next_obs, reward, terminated, truncated, _ = env.step(action)
log_probs.append(log_prob)
rewards.append(reward)
obs = next_obs
if terminated or truncated:
break
# 할인 누적 보상 계산
returns = []
G = 0
for r in reversed(rewards):
G = r + gamma * G
returns.insert(0, G)
returns = torch.tensor(returns, dtype=torch.float32)
# 리턴 정규화 (분산 감소)
if len(returns) > 1:
returns = (returns - returns.mean()) / (returns.std() + 1e-8)
# 정책 그래디언트 손실
log_probs_tensor = torch.stack(log_probs)
policy_loss = -(log_probs_tensor * returns).sum()
# 업데이트
optimizer.zero_grad()
policy_loss.backward()
optimizer.step()
total_reward = sum(rewards)
rewards_history.append(total_reward)
if episode % 50 == 0:
mean_reward = np.mean(rewards_history[-50:])
print(f"에피소드 {episode}: 보상={total_reward:.0f}, 평균={mean_reward:.1f}")
if mean_reward >= 475:
print(f"에피소드 {episode}에서 해결!")
break
env.close()
return policy, rewards_history
# policy, history = train_reinforce_cartpole()
베이스라인을 이용한 분산 감소
문제: 높은 분산
기본 REINFORCE의 그래디언트 추정은 분산이 매우 높습니다. 하나의 에피소드에서 계산한 그래디언트가 매우 노이지하여 학습이 불안정합니다.
해결: 베이스라인 함수
그래디언트에 상수 베이스라인 b를 빼도 기대값은 변하지 않지만 분산은 줄어듭니다.
grad J(theta) = E_pi[ sum_t grad log pi(a_t|s_t; theta) * (G_t - b) ]
가장 일반적인 베이스라인은 **상태 가치 함수 V(s)**입니다.
class PolicyWithBaseline(nn.Module):
"""베이스라인이 있는 정책 네트워크"""
def __init__(self, obs_size, n_actions, hidden_size=128):
super().__init__()
# 공유 특징 추출기
self.shared = nn.Sequential(
nn.Linear(obs_size, hidden_size),
nn.ReLU(),
)
# 정책 헤드
self.policy_head = nn.Sequential(
nn.Linear(hidden_size, hidden_size),
nn.ReLU(),
nn.Linear(hidden_size, n_actions),
)
# 가치 헤드 (베이스라인)
self.value_head = nn.Sequential(
nn.Linear(hidden_size, hidden_size),
nn.ReLU(),
nn.Linear(hidden_size, 1),
)
def forward(self, x):
features = self.shared(x)
logits = self.policy_head(features)
value = self.value_head(features)
return logits, value
def select_action(self, state):
logits, value = self.forward(state)
probs = F.softmax(logits, dim=-1)
dist = torch.distributions.Categorical(probs)
action = dist.sample()
log_prob = dist.log_prob(action)
return action.item(), log_prob, value
베이스라인을 사용한 REINFORCE
def train_reinforce_with_baseline(n_episodes=1000):
"""베이스라인을 사용한 REINFORCE"""
env = gym.make("CartPole-v1")
obs_size = env.observation_space.shape[0]
n_actions = env.action_space.n
model = PolicyWithBaseline(obs_size, n_actions)
optimizer = optim.Adam(model.parameters(), lr=0.001)
gamma = 0.99
rewards_history = []
for episode in range(n_episodes):
log_probs = []
values = []
rewards = []
obs, _ = env.reset()
while True:
obs_tensor = torch.tensor([obs], dtype=torch.float32)
action, log_prob, value = model.select_action(obs_tensor)
next_obs, reward, terminated, truncated, _ = env.step(action)
log_probs.append(log_prob)
values.append(value.squeeze())
rewards.append(reward)
obs = next_obs
if terminated or truncated:
break
# 할인 누적 보상 계산
returns = []
G = 0
for r in reversed(rewards):
G = r + gamma * G
returns.insert(0, G)
returns = torch.tensor(returns, dtype=torch.float32)
log_probs_t = torch.stack(log_probs)
values_t = torch.stack(values)
# 어드밴티지 = 리턴 - 베이스라인(가치 추정)
advantages = returns - values_t.detach()
# 정규화
if len(advantages) > 1:
advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
# 정책 손실
policy_loss = -(log_probs_t * advantages).sum()
# 가치 손실 (베이스라인 학습)
value_loss = F.mse_loss(values_t, returns)
# 전체 손실
total_loss = policy_loss + 0.5 * value_loss
optimizer.zero_grad()
total_loss.backward()
optimizer.step()
total_reward = sum(rewards)
rewards_history.append(total_reward)
if episode % 50 == 0:
mean_reward = np.mean(rewards_history[-50:])
print(f"에피소드 {episode}: 보상={total_reward:.0f}, 평균={mean_reward:.1f}")
if mean_reward >= 475:
print(f"에피소드 {episode}에서 해결!")
break
env.close()
return model, rewards_history
# model, history = train_reinforce_with_baseline()
Pong에서의 REINFORCE
Atari Pong에 REINFORCE를 적용하려면 CNN 기반 정책 네트워크를 사용합니다.
class PongPolicy(nn.Module):
"""Pong용 CNN 정책 네트워크"""
def __init__(self, input_channels=4, n_actions=6):
super().__init__()
self.conv = nn.Sequential(
nn.Conv2d(input_channels, 32, kernel_size=8, stride=4),
nn.ReLU(),
nn.Conv2d(32, 64, kernel_size=4, stride=2),
nn.ReLU(),
nn.Conv2d(64, 64, kernel_size=3, stride=1),
nn.ReLU(),
)
conv_out_size = self._get_conv_out(input_channels)
self.fc = nn.Sequential(
nn.Linear(conv_out_size, 512),
nn.ReLU(),
nn.Linear(512, n_actions),
)
def _get_conv_out(self, channels):
o = self.conv(torch.zeros(1, channels, 84, 84))
return int(np.prod(o.size()))
def forward(self, x):
x = x.float() / 255.0
conv_out = self.conv(x).view(x.size(0), -1)
logits = self.fc(conv_out)
return logits
def select_action(self, state):
logits = self.forward(state)
probs = F.softmax(logits, dim=-1)
dist = torch.distributions.Categorical(probs)
action = dist.sample()
return action.item(), dist.log_prob(action)
def train_reinforce_pong(n_episodes=5000):
"""REINFORCE로 Pong 학습"""
# make_atari_env는 06장에서 정의한 전처리 함수
# env = make_atari_env("ALE/Pong-v5")
# 여기서는 의사 코드로 작성
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
policy = PongPolicy(input_channels=4, n_actions=6).to(device)
optimizer = optim.Adam(policy.parameters(), lr=1e-4)
gamma = 0.99
for episode in range(n_episodes):
log_probs = []
rewards = []
# obs, _ = env.reset()
# while True:
# obs_t = torch.tensor([obs], dtype=torch.uint8).to(device)
# action, log_prob = policy.select_action(obs_t)
# next_obs, reward, terminated, truncated, _ = env.step(action)
# log_probs.append(log_prob)
# rewards.append(reward)
# obs = next_obs
# if terminated or truncated:
# break
# 할인 누적 보상 계산 및 업데이트
# (CartPole과 동일한 방식)
pass
# REINFORCE는 Pong에서 학습이 매우 느림 (수천 에피소드 필요)
# Actor-Critic 방법이 더 효율적
print("REINFORCE는 Pong에서 수천 에피소드가 필요합니다")
print("다음 글에서 다룰 Actor-Critic이 더 효율적입니다")
탐색 문제와 엔트로피 보너스
조기 수렴 문제
정책 기반 방법은 높은 보상을 받은 행동의 확률을 빠르게 높이기 때문에, 충분히 탐색하기 전에 차선의 정책으로 수렴할 수 있습니다.
엔트로피 보너스
정책의 엔트로피를 손실 함수에 추가하면 탐색을 장려할 수 있습니다.
total_loss = policy_loss + value_loss_coef * value_loss - entropy_coef * entropy
엔트로피가 높다는 것은 행동 확률이 균등하다는 의미이므로, 엔트로피를 최대화하면 탐색을 촉진합니다.
def compute_entropy_loss(logits):
"""정책 엔트로피 계산"""
probs = F.softmax(logits, dim=-1)
log_probs = F.log_softmax(logits, dim=-1)
entropy = -(probs * log_probs).sum(dim=-1)
return entropy.mean()
def compute_total_loss(log_probs, advantages, values, returns, logits,
value_loss_coef=0.5, entropy_coef=0.01):
"""정책 손실 + 가치 손실 - 엔트로피 보너스"""
# 정책 손실
policy_loss = -(log_probs * advantages.detach()).mean()
# 가치 손실
value_loss = F.mse_loss(values, returns)
# 엔트로피 보너스 (탐색 장려)
entropy = compute_entropy_loss(logits)
total_loss = policy_loss + value_loss_coef * value_loss - entropy_coef * entropy
return total_loss, policy_loss.item(), value_loss.item(), entropy.item()
분산 문제의 직관적 이해
왜 REINFORCE의 분산이 높은가
에피소드 중간에 좋은 행동과 나쁜 행동이 섞여 있을 때, 전체 에피소드의 리턴이 높으면 나쁜 행동도 강화되고, 리턴이 낮으면 좋은 행동도 약화됩니다.
def illustrate_variance_problem():
"""분산 문제를 시각적으로 설명하는 예시"""
# 에피소드 1: 나쁜 행동 이후 운 좋게 높은 보상
ep1_actions = ["나쁜 행동", "좋은 행동", "나쁜 행동"]
ep1_rewards = [-1, 10, -1]
ep1_return = sum(ep1_rewards) # 8
print(f"에피소드 1: 리턴={ep1_return}")
print(" -> 나쁜 행동들도 리턴 8로 강화됨 (문제!)")
# 에피소드 2: 좋은 행동이지만 운 나쁘게 낮은 보상
ep2_actions = ["좋은 행동", "좋은 행동", "좋은 행동"]
ep2_rewards = [1, 1, -5]
ep2_return = sum(ep2_rewards) # -3
print(f"\n에피소드 2: 리턴={ep2_return}")
print(" -> 좋은 행동들이 리턴 -3으로 약화됨 (문제!)")
# 베이스라인 사용 시
baseline = (ep1_return + ep2_return) / 2 # 2.5
print(f"\n베이스라인: {baseline}")
print(f"에피소드 1 어드밴티지: {ep1_return - baseline:.1f}")
print(f"에피소드 2 어드밴티지: {ep2_return - baseline:.1f}")
print(" -> 상대적 차이로 업데이트하여 분산 감소")
# illustrate_variance_problem()
분산 감소 기법 비교
| 기법 | 설명 | 분산 감소 효과 |
|---|---|---|
| 리턴 정규화 | G_t를 평균 0, 분산 1로 정규화 | 중간 |
| 베이스라인 | G_t - V(s_t)로 어드밴티지 사용 | 높음 |
| 시간 의존 베이스라인 | 미래 보상만 고려 | 높음 |
| GAE | 여러 스텝의 어드밴티지를 가중 평균 | 매우 높음 |
정리
- 정책 기반 방법: 정책을 직접 파라미터화하고 최적화하는 접근법
- Policy Gradient Theorem: 기대 보상의 그래디언트를 로그 확률과 리턴의 곱으로 표현
- REINFORCE: 가장 기본적인 몬테카를로 Policy Gradient 알고리즘
- 베이스라인: 가치 함수를 베이스라인으로 사용하여 분산을 크게 감소
- 엔트로피 보너스: 정책의 엔트로피를 높여 조기 수렴 방지
- 한계: 높은 분산, 온폴리시 학습으로 인한 낮은 데이터 효율
REINFORCE는 CartPole 같은 간단한 환경에서는 잘 작동하지만, Pong 같은 복잡한 환경에서는 학습이 매우 느립니다. 다음 글에서는 이 문제를 해결하는 Actor-Critic 방법을 다루겠습니다.