- Authors

- Name
- Youngju Kim
- @fjvbn20031
개요
이전 글에서 살펴본 A2C(Advantage Actor-Critic)는 단일 환경에서 수집한 경험 데이터의 상관관계(correlation) 문제가 있었다. 연속된 상태 전이는 서로 강하게 연관되어 있어 학습 효율이 떨어진다. DQN에서는 experience replay로 이 문제를 해결했지만, on-policy 방법인 Actor-Critic에서는 다른 접근이 필요하다.
A3C(Asynchronous Advantage Actor-Critic) 는 여러 환경을 동시에 실행하여 데이터의 상관관계를 깨뜨리는 방법이다. 2016년 DeepMind의 Mnih 등이 제안한 이 방법은, replay buffer 없이도 안정적인 학습을 달성한다.
상관관계와 샘플 효율성
왜 상관관계가 문제인가
강화학습에서 에이전트가 한 에피소드 동안 수집하는 전이(transition)들은 시간적으로 연속되어 있다. 예를 들어 Pong 게임에서 공이 오른쪽으로 날아가는 연속 프레임은 매우 유사한 상태를 가진다. 이런 상관된 데이터로 신경망을 업데이트하면:
- 그래디언트가 특정 방향으로 편향된다
- 학습이 불안정해지고 수렴 속도가 느려진다
- 최악의 경우 학습이 발산할 수 있다
해결 접근법 비교
| 방법 | 원리 | 장점 | 단점 |
|---|---|---|---|
| Experience Replay | 과거 전이를 버퍼에 저장하고 무작위 샘플링 | 샘플 효율적 | Off-policy만 가능 |
| 병렬 환경 | 여러 환경을 동시에 실행 | On-policy 호환 | 더 많은 연산 필요 |
| A3C | 비동기 병렬 환경 + 독립 학습 | 탐색 다양성 극대화 | 구현 복잡 |
A2C에서 A3C로: 추가된 A의 의미
A2C는 여러 환경을 병렬로 실행하되 동기적(synchronous) 으로 경험을 모아 한꺼번에 업데이트한다. A3C는 여기에 Asynchronous(비동기) 를 추가한다.
A2C의 동기 방식
# A2C: 모든 워커가 동기적으로 동작
class A2CAgent:
def __init__(self, num_envs, model):
self.envs = [make_env() for _ in range(num_envs)]
self.model = model # 공유 모델
def train_step(self):
# 1. 모든 환경에서 동시에 행동 수집
states = [env.get_state() for env in self.envs]
actions, values = self.model.predict(states)
# 2. 모든 환경에서 동시에 스텝 실행
rewards, next_states, dones = [], [], []
for env, action in zip(self.envs, actions):
r, ns, d = env.step(action)
rewards.append(r)
next_states.append(ns)
dones.append(d)
# 3. 모아서 한 번에 업데이트
self.model.update(states, actions, rewards, next_states, dones)
A3C의 비동기 방식
A3C에서는 각 워커가 독립적으로 환경과 상호작용하고, 자체적으로 그래디언트를 계산한 뒤 중앙 모델에 비동기적으로 반영한다.
핵심 차이점:
- 각 워커는 다른 워커를 기다리지 않는다
- 워커마다 다른 탐색 정책을 사용할 수 있다(엡실론 값 다르게 설정 등)
- 자연스럽게 탐색 다양성이 확보된다
Python 멀티프로세싱 기초
A3C를 구현하기 전에 Python의 멀티프로세싱을 이해해야 한다. GIL(Global Interpreter Lock) 때문에 스레드 기반 병렬화는 CPU 바운드 작업에 적합하지 않다.
import torch.multiprocessing as mp
def worker_process(worker_id, shared_model, optimizer, device):
"""각 워커 프로세스가 실행하는 함수"""
env = make_env()
local_model = ActorCritic(env.observation_space.shape[0],
env.action_space.n)
local_model.to(device)
while True:
# 공유 모델의 파라미터를 로컬 모델에 복사
local_model.load_state_dict(shared_model.state_dict())
# 로컬 환경에서 경험 수집
experiences = collect_experiences(env, local_model, n_steps=20)
# 로컬에서 그래디언트 계산
loss = compute_loss(local_model, experiences)
loss.backward()
# 공유 모델에 그래디언트 반영
for shared_param, local_param in zip(shared_model.parameters(),
local_model.parameters()):
shared_param.grad = local_param.grad
optimizer.step()
optimizer.zero_grad()
if __name__ == '__main__':
mp.set_start_method('spawn')
shared_model = ActorCritic(obs_size, act_size)
shared_model.share_memory() # 프로세스 간 메모리 공유
optimizer = SharedAdam(shared_model.parameters(), lr=1e-4)
optimizer.share_memory()
processes = []
for i in range(mp.cpu_count()):
p = mp.Process(target=worker_process,
args=(i, shared_model, optimizer, 'cpu'))
p.start()
processes.append(p)
for p in processes:
p.join()
SharedAdam 옵티마이저
Adam 옵티마이저의 모멘텀 상태도 프로세스 간 공유해야 한다:
import torch
class SharedAdam(torch.optim.Adam):
def __init__(self, params, lr=1e-3, betas=(0.9, 0.999), eps=1e-8):
super().__init__(params, lr=lr, betas=betas, eps=eps)
# Adam의 내부 상태를 공유 메모리로 이동
for group in self.param_groups:
for p in group['params']:
state = self.state[p]
state['step'] = torch.zeros(1)
state['exp_avg'] = torch.zeros_like(p.data)
state['exp_avg_sq'] = torch.zeros_like(p.data)
# 공유 메모리 설정
state['step'].share_memory_()
state['exp_avg'].share_memory_()
state['exp_avg_sq'].share_memory_()
A3C 데이터 병렬화
데이터 병렬화(Data Parallelism)는 각 워커가 경험 데이터를 수집하여 중앙에 전송하고, 중앙에서 모아서 학습하는 방식이다.
import torch
import torch.nn as nn
import torch.multiprocessing as mp
from collections import namedtuple
Experience = namedtuple('Experience',
['state', 'action', 'reward', 'done', 'next_state'])
class ActorCritic(nn.Module):
def __init__(self, obs_size, act_size):
super().__init__()
self.shared = nn.Sequential(
nn.Linear(obs_size, 256),
nn.ReLU(),
)
self.policy = nn.Sequential(
nn.Linear(256, act_size),
nn.Softmax(dim=-1),
)
self.value = nn.Linear(256, 1)
def forward(self, x):
shared_out = self.shared(x)
return self.policy(shared_out), self.value(shared_out)
def data_worker(worker_id, shared_model, data_queue, num_steps=20):
"""데이터 수집 워커: 경험을 큐에 넣는다"""
env = make_env()
state = env.reset()
while True:
# 공유 모델 파라미터 동기화
local_model = ActorCritic(env.observation_space.shape[0],
env.action_space.n)
local_model.load_state_dict(shared_model.state_dict())
experiences = []
for _ in range(num_steps):
state_t = torch.FloatTensor(state)
probs, _ = local_model(state_t.unsqueeze(0))
action = torch.multinomial(probs, 1).item()
next_state, reward, done, _ = env.step(action)
experiences.append(Experience(state, action, reward, done, next_state))
state = next_state
if done:
state = env.reset()
# 수집한 경험을 큐에 전송
data_queue.put(experiences)
A3C 그래디언트 병렬화
그래디언트 병렬화(Gradient Parallelism)는 각 워커가 경험 수집과 그래디언트 계산을 모두 수행하고, 계산된 그래디언트를 중앙 모델에 직접 적용하는 방식이다. 이것이 원래 A3C 논문의 방식이다.
def gradient_worker(worker_id, shared_model, optimizer, counter, lock,
max_episodes=10000, gamma=0.99, entropy_beta=0.01):
"""그래디언트 계산 워커: 로컬에서 그래디언트까지 계산"""
env = make_env()
local_model = ActorCritic(env.observation_space.shape[0],
env.action_space.n)
state = env.reset()
episode_reward = 0.0
while True:
# 공유 모델 동기화
local_model.load_state_dict(shared_model.state_dict())
log_probs = []
values = []
rewards = []
entropies = []
for _ in range(20): # n-step
state_t = torch.FloatTensor(state).unsqueeze(0)
probs, value = local_model(state_t)
dist = torch.distributions.Categorical(probs)
action = dist.sample()
log_prob = dist.log_prob(action)
entropy = dist.entropy()
next_state, reward, done, _ = env.step(action.item())
log_probs.append(log_prob)
values.append(value.squeeze())
rewards.append(reward)
entropies.append(entropy)
episode_reward += reward
state = next_state
if done:
state = env.reset()
with lock:
counter.value += 1
episode_reward = 0.0
break
# 부트스트랩 값 계산
if done:
R = torch.tensor(0.0)
else:
_, R = local_model(torch.FloatTensor(state).unsqueeze(0))
R = R.squeeze().detach()
# 역방향으로 리턴 계산 및 손실 계산
policy_loss = 0.0
value_loss = 0.0
entropy_loss = 0.0
for i in reversed(range(len(rewards))):
R = rewards[i] + gamma * R
advantage = R - values[i].detach()
policy_loss -= log_probs[i] * advantage
value_loss += 0.5 * (R - values[i]) ** 2
entropy_loss -= entropies[i]
total_loss = policy_loss + value_loss + entropy_beta * entropy_loss
# 로컬 그래디언트 계산
optimizer.zero_grad()
total_loss.backward()
# 그래디언트 클리핑
torch.nn.utils.clip_grad_norm_(local_model.parameters(), 40.0)
# 공유 모델에 그래디언트 전달 및 업데이트
for shared_param, local_param in zip(shared_model.parameters(),
local_model.parameters()):
if shared_param.grad is None:
shared_param.grad = local_param.grad.clone()
else:
shared_param.grad.copy_(local_param.grad)
optimizer.step()
전체 A3C 학습 루프
def train_a3c(env_name='CartPole-v1', num_workers=4, max_episodes=5000):
"""A3C 메인 학습 함수"""
env = make_env()
obs_size = env.observation_space.shape[0]
act_size = env.action_space.n
env.close()
shared_model = ActorCritic(obs_size, act_size)
shared_model.share_memory()
optimizer = SharedAdam(shared_model.parameters(), lr=1e-4)
optimizer.share_memory()
counter = mp.Value('i', 0)
lock = mp.Lock()
processes = []
for i in range(num_workers):
p = mp.Process(
target=gradient_worker,
args=(i, shared_model, optimizer, counter, lock, max_episodes)
)
p.start()
processes.append(p)
# 모니터링 프로세스
while counter.value < max_episodes:
import time
time.sleep(10)
print(f"완료 에피소드: {counter.value}/{max_episodes}")
for p in processes:
p.terminate()
p.join()
return shared_model
if __name__ == '__main__':
mp.set_start_method('spawn')
model = train_a3c()
torch.save(model.state_dict(), 'a3c_model.pth')
데이터 병렬화 vs 그래디언트 병렬화 비교
| 항목 | 데이터 병렬화 | 그래디언트 병렬화 |
|---|---|---|
| 워커 역할 | 경험 수집만 | 경험 수집 + 그래디언트 계산 |
| 통신 내용 | 전이 데이터 (상태, 행동, 보상) | 그래디언트 텐서 |
| 통신량 | 상태 크기에 비례 | 모델 파라미터 수에 비례 |
| 중앙 부담 | 학습 연산 집중 | 업데이트만 수행 |
| 구현 난이도 | 상대적으로 쉬움 | 공유 메모리 관리 필요 |
| 확장성 | 중앙 병목 가능 | 워커 수에 따라 선형 확장 |
실험 결과 비교
CartPole-v1 환경에서의 학습 성능:
- A2C (단일 환경): 약 500 에피소드에서 수렴
- A2C (8 병렬 환경): 약 200 에피소드에서 수렴
- A3C (8 워커, 데이터 병렬): 약 150 에피소드에서 수렴
- A3C (8 워커, 그래디언트 병렬): 약 120 에피소드에서 수렴
비동기 업데이트로 인한 약간의 노이즈가 있지만, 탐색 다양성 덕분에 전체적으로 더 빠르게 수렴하는 경향을 보인다.
실전 팁과 주의사항
-
워커 수 선택: CPU 코어 수와 동일하게 설정하는 것이 일반적이다. GPU 사용 시에는 워커 수를 줄이고 배치 크기를 늘리는 것이 효율적이다.
-
비동기 업데이트의 불안정성: 워커 간 모델 버전 차이(staleness)가 클수록 학습이 불안정해진다. 그래디언트 클리핑이 필수적이다.
-
A2C vs A3C 선택 기준: GPU를 사용한다면 A2C가 더 효율적인 경우가 많다. 벡터화된 환경을 사용하면 A2C도 충분한 탐색 다양성을 확보할 수 있다. A3C는 CPU 기반 학습에서 장점이 크다.
-
디버깅: 비동기 프로그램은 디버깅이 어렵다. 먼저 단일 워커로 정상 동작을 확인한 뒤 워커 수를 늘리는 것이 좋다.
핵심 요약
- A3C는 비동기 병렬 학습으로 데이터 상관관계 문제를 해결한다
- 데이터 병렬화는 경험 수집을 분산하고, 그래디언트 병렬화는 연산도 분산한다
- Python의 multiprocessing과 shared memory를 활용하여 구현한다
- 최근에는 GPU 효율성 때문에 동기 방식(A2C)이나 PPO가 더 선호되는 추세이다
다음 글에서는 강화학습을 자연어 처리에 적용하여 챗봇을 훈련하는 방법을 알아보겠다.