개요
이전 글에서 살펴본 A2C(Advantage Actor-Critic)는 단일 환경에서 수집한 경험 데이터의 **상관관계(correlation)** 문제가 있었다. 연속된 상태 전이는 서로 강하게 연관되어 있어 학습 효율이 떨어진다. DQN에서는 experience replay로 이 문제를 해결했지만, on-policy 방법인 Actor-Critic에서는 다른 접근이 필요하다.
**A3C(Asynchronous Advantage Actor-Critic)** 는 여러 환경을 동시에 실행하여 데이터의 상관관계를 깨뜨리는 방법이다. 2016년 DeepMind의 Mnih 등이 제안한 이 방법은, replay buffer 없이도 안정적인 학습을 달성한다.
상관관계와 샘플 효율성
왜 상관관계가 문제인가
강화학습에서 에이전트가 한 에피소드 동안 수집하는 전이(transition)들은 시간적으로 연속되어 있다. 예를 들어 Pong 게임에서 공이 오른쪽으로 날아가는 연속 프레임은 매우 유사한 상태를 가진다. 이런 상관된 데이터로 신경망을 업데이트하면:
- 그래디언트가 특정 방향으로 편향된다
- 학습이 불안정해지고 수렴 속도가 느려진다
- 최악의 경우 학습이 발산할 수 있다
해결 접근법 비교
| 방법 | 원리 | 장점 | 단점 |
| ----------------- | ----------------------------------------- | ------------------ | ----------------- |
| Experience Replay | 과거 전이를 버퍼에 저장하고 무작위 샘플링 | 샘플 효율적 | Off-policy만 가능 |
| 병렬 환경 | 여러 환경을 동시에 실행 | On-policy 호환 | 더 많은 연산 필요 |
| A3C | 비동기 병렬 환경 + 독립 학습 | 탐색 다양성 극대화 | 구현 복잡 |
A2C에서 A3C로: 추가된 A의 의미
A2C는 여러 환경을 병렬로 실행하되 **동기적(synchronous)** 으로 경험을 모아 한꺼번에 업데이트한다. A3C는 여기에 **Asynchronous(비동기)** 를 추가한다.
A2C의 동기 방식
A2C: 모든 워커가 동기적으로 동작
class A2CAgent:
def __init__(self, num_envs, model):
self.envs = [make_env() for _ in range(num_envs)]
self.model = model # 공유 모델
def train_step(self):
1. 모든 환경에서 동시에 행동 수집
states = [env.get_state() for env in self.envs]
actions, values = self.model.predict(states)
2. 모든 환경에서 동시에 스텝 실행
rewards, next_states, dones = [], [], []
for env, action in zip(self.envs, actions):
r, ns, d = env.step(action)
rewards.append(r)
next_states.append(ns)
dones.append(d)
3. 모아서 한 번에 업데이트
self.model.update(states, actions, rewards, next_states, dones)
A3C의 비동기 방식
A3C에서는 각 워커가 **독립적으로** 환경과 상호작용하고, 자체적으로 그래디언트를 계산한 뒤 중앙 모델에 비동기적으로 반영한다.
핵심 차이점:
- 각 워커는 다른 워커를 기다리지 않는다
- 워커마다 다른 탐색 정책을 사용할 수 있다(엡실론 값 다르게 설정 등)
- 자연스럽게 탐색 다양성이 확보된다
Python 멀티프로세싱 기초
A3C를 구현하기 전에 Python의 멀티프로세싱을 이해해야 한다. GIL(Global Interpreter Lock) 때문에 스레드 기반 병렬화는 CPU 바운드 작업에 적합하지 않다.
def worker_process(worker_id, shared_model, optimizer, device):
"""각 워커 프로세스가 실행하는 함수"""
env = make_env()
local_model = ActorCritic(env.observation_space.shape[0],
env.action_space.n)
local_model.to(device)
while True:
공유 모델의 파라미터를 로컬 모델에 복사
local_model.load_state_dict(shared_model.state_dict())
로컬 환경에서 경험 수집
experiences = collect_experiences(env, local_model, n_steps=20)
로컬에서 그래디언트 계산
loss = compute_loss(local_model, experiences)
loss.backward()
공유 모델에 그래디언트 반영
for shared_param, local_param in zip(shared_model.parameters(),
local_model.parameters()):
shared_param.grad = local_param.grad
optimizer.step()
optimizer.zero_grad()
if __name__ == '__main__':
mp.set_start_method('spawn')
shared_model = ActorCritic(obs_size, act_size)
shared_model.share_memory() # 프로세스 간 메모리 공유
optimizer = SharedAdam(shared_model.parameters(), lr=1e-4)
optimizer.share_memory()
processes = []
for i in range(mp.cpu_count()):
p = mp.Process(target=worker_process,
args=(i, shared_model, optimizer, 'cpu'))
p.start()
processes.append(p)
for p in processes:
p.join()
SharedAdam 옵티마이저
Adam 옵티마이저의 모멘텀 상태도 프로세스 간 공유해야 한다:
class SharedAdam(torch.optim.Adam):
def __init__(self, params, lr=1e-3, betas=(0.9, 0.999), eps=1e-8):
super().__init__(params, lr=lr, betas=betas, eps=eps)
Adam의 내부 상태를 공유 메모리로 이동
for group in self.param_groups:
for p in group['params']:
state = self.state[p]
state['step'] = torch.zeros(1)
state['exp_avg'] = torch.zeros_like(p.data)
state['exp_avg_sq'] = torch.zeros_like(p.data)
공유 메모리 설정
state['step'].share_memory_()
state['exp_avg'].share_memory_()
state['exp_avg_sq'].share_memory_()
A3C 데이터 병렬화
데이터 병렬화(Data Parallelism)는 각 워커가 **경험 데이터를 수집**하여 중앙에 전송하고, 중앙에서 모아서 학습하는 방식이다.
from collections import namedtuple
Experience = namedtuple('Experience',
['state', 'action', 'reward', 'done', 'next_state'])
class ActorCritic(nn.Module):
def __init__(self, obs_size, act_size):
super().__init__()
self.shared = nn.Sequential(
nn.Linear(obs_size, 256),
nn.ReLU(),
)
self.policy = nn.Sequential(
nn.Linear(256, act_size),
nn.Softmax(dim=-1),
)
self.value = nn.Linear(256, 1)
def forward(self, x):
shared_out = self.shared(x)
return self.policy(shared_out), self.value(shared_out)
def data_worker(worker_id, shared_model, data_queue, num_steps=20):
"""데이터 수집 워커: 경험을 큐에 넣는다"""
env = make_env()
state = env.reset()
while True:
공유 모델 파라미터 동기화
local_model = ActorCritic(env.observation_space.shape[0],
env.action_space.n)
local_model.load_state_dict(shared_model.state_dict())
experiences = []
for _ in range(num_steps):
state_t = torch.FloatTensor(state)
probs, _ = local_model(state_t.unsqueeze(0))
action = torch.multinomial(probs, 1).item()
next_state, reward, done, _ = env.step(action)
experiences.append(Experience(state, action, reward, done, next_state))
state = next_state
if done:
state = env.reset()
수집한 경험을 큐에 전송
data_queue.put(experiences)
A3C 그래디언트 병렬화
그래디언트 병렬화(Gradient Parallelism)는 각 워커가 **경험 수집과 그래디언트 계산**을 모두 수행하고, 계산된 그래디언트를 중앙 모델에 직접 적용하는 방식이다. 이것이 원래 A3C 논문의 방식이다.
def gradient_worker(worker_id, shared_model, optimizer, counter, lock,
max_episodes=10000, gamma=0.99, entropy_beta=0.01):
"""그래디언트 계산 워커: 로컬에서 그래디언트까지 계산"""
env = make_env()
local_model = ActorCritic(env.observation_space.shape[0],
env.action_space.n)
state = env.reset()
episode_reward = 0.0
while True:
공유 모델 동기화
local_model.load_state_dict(shared_model.state_dict())
log_probs = []
values = []
rewards = []
entropies = []
for _ in range(20): # n-step
state_t = torch.FloatTensor(state).unsqueeze(0)
probs, value = local_model(state_t)
dist = torch.distributions.Categorical(probs)
action = dist.sample()
log_prob = dist.log_prob(action)
entropy = dist.entropy()
next_state, reward, done, _ = env.step(action.item())
log_probs.append(log_prob)
values.append(value.squeeze())
rewards.append(reward)
entropies.append(entropy)
episode_reward += reward
state = next_state
if done:
state = env.reset()
with lock:
counter.value += 1
episode_reward = 0.0
break
부트스트랩 값 계산
if done:
R = torch.tensor(0.0)
else:
_, R = local_model(torch.FloatTensor(state).unsqueeze(0))
R = R.squeeze().detach()
역방향으로 리턴 계산 및 손실 계산
policy_loss = 0.0
value_loss = 0.0
entropy_loss = 0.0
for i in reversed(range(len(rewards))):
R = rewards[i] + gamma * R
advantage = R - values[i].detach()
policy_loss -= log_probs[i] * advantage
value_loss += 0.5 * (R - values[i]) ** 2
entropy_loss -= entropies[i]
total_loss = policy_loss + value_loss + entropy_beta * entropy_loss
로컬 그래디언트 계산
optimizer.zero_grad()
total_loss.backward()
그래디언트 클리핑
torch.nn.utils.clip_grad_norm_(local_model.parameters(), 40.0)
공유 모델에 그래디언트 전달 및 업데이트
for shared_param, local_param in zip(shared_model.parameters(),
local_model.parameters()):
if shared_param.grad is None:
shared_param.grad = local_param.grad.clone()
else:
shared_param.grad.copy_(local_param.grad)
optimizer.step()
전체 A3C 학습 루프
def train_a3c(env_name='CartPole-v1', num_workers=4, max_episodes=5000):
"""A3C 메인 학습 함수"""
env = make_env()
obs_size = env.observation_space.shape[0]
act_size = env.action_space.n
env.close()
shared_model = ActorCritic(obs_size, act_size)
shared_model.share_memory()
optimizer = SharedAdam(shared_model.parameters(), lr=1e-4)
optimizer.share_memory()
counter = mp.Value('i', 0)
lock = mp.Lock()
processes = []
for i in range(num_workers):
p = mp.Process(
target=gradient_worker,
args=(i, shared_model, optimizer, counter, lock, max_episodes)
)
p.start()
processes.append(p)
모니터링 프로세스
while counter.value < max_episodes:
time.sleep(10)
print(f"완료 에피소드: {counter.value}/{max_episodes}")
for p in processes:
p.terminate()
p.join()
return shared_model
if __name__ == '__main__':
mp.set_start_method('spawn')
model = train_a3c()
torch.save(model.state_dict(), 'a3c_model.pth')
데이터 병렬화 vs 그래디언트 병렬화 비교
| 항목 | 데이터 병렬화 | 그래디언트 병렬화 |
| ----------- | ------------------------------ | --------------------------- |
| 워커 역할 | 경험 수집만 | 경험 수집 + 그래디언트 계산 |
| 통신 내용 | 전이 데이터 (상태, 행동, 보상) | 그래디언트 텐서 |
| 통신량 | 상태 크기에 비례 | 모델 파라미터 수에 비례 |
| 중앙 부담 | 학습 연산 집중 | 업데이트만 수행 |
| 구현 난이도 | 상대적으로 쉬움 | 공유 메모리 관리 필요 |
| 확장성 | 중앙 병목 가능 | 워커 수에 따라 선형 확장 |
실험 결과 비교
CartPole-v1 환경에서의 학습 성능:
- **A2C (단일 환경)**: 약 500 에피소드에서 수렴
- **A2C (8 병렬 환경)**: 약 200 에피소드에서 수렴
- **A3C (8 워커, 데이터 병렬)**: 약 150 에피소드에서 수렴
- **A3C (8 워커, 그래디언트 병렬)**: 약 120 에피소드에서 수렴
비동기 업데이트로 인한 약간의 노이즈가 있지만, 탐색 다양성 덕분에 전체적으로 더 빠르게 수렴하는 경향을 보인다.
실전 팁과 주의사항
1. **워커 수 선택**: CPU 코어 수와 동일하게 설정하는 것이 일반적이다. GPU 사용 시에는 워커 수를 줄이고 배치 크기를 늘리는 것이 효율적이다.
2. **비동기 업데이트의 불안정성**: 워커 간 모델 버전 차이(staleness)가 클수록 학습이 불안정해진다. 그래디언트 클리핑이 필수적이다.
3. **A2C vs A3C 선택 기준**: GPU를 사용한다면 A2C가 더 효율적인 경우가 많다. 벡터화된 환경을 사용하면 A2C도 충분한 탐색 다양성을 확보할 수 있다. A3C는 CPU 기반 학습에서 장점이 크다.
4. **디버깅**: 비동기 프로그램은 디버깅이 어렵다. 먼저 단일 워커로 정상 동작을 확인한 뒤 워커 수를 늘리는 것이 좋다.
핵심 요약
- A3C는 비동기 병렬 학습으로 데이터 상관관계 문제를 해결한다
- 데이터 병렬화는 경험 수집을 분산하고, 그래디언트 병렬화는 연산도 분산한다
- Python의 multiprocessing과 shared memory를 활용하여 구현한다
- 최근에는 GPU 효율성 때문에 동기 방식(A2C)이나 PPO가 더 선호되는 추세이다
다음 글에서는 강화학습을 자연어 처리에 적용하여 **챗봇을 훈련하는 방법**을 알아보겠다.
현재 단락 (1/227)
이전 글에서 살펴본 A2C(Advantage Actor-Critic)는 단일 환경에서 수집한 경험 데이터의 **상관관계(correlation)** 문제가 있었다. 연속된 상태 전이...