- Authors

- Name
- Youngju Kim
- @fjvbn20031
개요
심층 강화학습은 Atari 게임과 바둑을 넘어 다양한 실전 영역에서 활용되고 있다. 이 글에서는 로봇 제어, 자율주행, 자원 관리, 추천 시스템, 자연어 처리, 게임 AI 등에서의 응용 사례와 실전 적용 시 고려사항을 정리한다.
로봇 제어 (Robotics)
시뮬레이션에서 현실로 (Sim-to-Real)
로봇 강화학습의 가장 큰 도전은 시뮬레이션과 현실의 간극(sim-to-real gap)이다. 시뮬레이터에서 학습한 정책이 실제 로봇에서는 작동하지 않을 수 있다.
import torch
import torch.nn as nn
import numpy as np
class DomainRandomization:
"""도메인 랜덤화: 시뮬레이션 파라미터를 무작위로 변경하여
현실 환경에 대한 일반화 능력 향상"""
def __init__(self, base_env):
self.base_env = base_env
def randomize(self):
"""물리 파라미터를 랜덤화"""
params = {
'friction': np.random.uniform(0.5, 1.5),
'mass_scale': np.random.uniform(0.8, 1.2),
'gravity': np.random.uniform(9.5, 10.1),
'actuator_noise': np.random.uniform(0.0, 0.05),
'sensor_noise': np.random.uniform(0.0, 0.02),
}
self.base_env.set_physics_params(params)
return params
def step_with_randomization(self, action):
"""노이즈가 추가된 환경 스텝"""
# 행동에 노이즈 추가
noisy_action = action + np.random.normal(
0, self.current_params['actuator_noise'],
size=action.shape
)
obs, reward, done, info = self.base_env.step(noisy_action)
# 관측에 노이즈 추가
noisy_obs = obs + np.random.normal(
0, self.current_params['sensor_noise'],
size=obs.shape
)
return noisy_obs, reward, done, info
로봇 매니퓰레이션 학습
class RobotGraspingPolicy(nn.Module):
"""로봇 파지(grasping) 정책"""
def __init__(self, image_size=84, proprioception_size=7,
action_size=4):
super().__init__()
# 카메라 이미지 처리
self.vision = nn.Sequential(
nn.Conv2d(3, 32, 8, stride=4),
nn.ReLU(),
nn.Conv2d(32, 64, 4, stride=2),
nn.ReLU(),
nn.Conv2d(64, 64, 3, stride=1),
nn.ReLU(),
nn.Flatten(),
)
# 비전 특징 크기 계산
with torch.no_grad():
dummy = torch.zeros(1, 3, image_size, image_size)
vision_size = self.vision(dummy).shape[1]
# 고유 감각(관절 각도 등) + 비전 결합
self.policy = nn.Sequential(
nn.Linear(vision_size + proprioception_size, 256),
nn.ReLU(),
nn.Linear(256, 128),
nn.ReLU(),
)
self.mu = nn.Linear(128, action_size)
self.log_std = nn.Parameter(torch.zeros(action_size))
def forward(self, image, proprioception):
vision_features = self.vision(image)
combined = torch.cat([vision_features, proprioception], dim=-1)
features = self.policy(combined)
mu = self.mu(features)
std = self.log_std.exp()
return mu, std
핵심 도전과제
- 안전성: 실제 로봇은 위험한 행동을 시도할 수 없다
- 샘플 효율성: 실제 환경에서의 데이터 수집이 느리고 비싸다
- 보상 설계: 복잡한 조작 작업의 보상을 정의하기 어렵다
자율주행 (Autonomous Driving)
RL 기반 자율주행 아키텍처
class DrivingPolicy(nn.Module):
"""자율주행 정책 네트워크"""
def __init__(self):
super().__init__()
# 센서 퓨전: 카메라 + 라이다 + 레이더
self.camera_encoder = nn.Sequential(
nn.Conv2d(3, 32, 5, stride=2),
nn.ReLU(),
nn.Conv2d(32, 64, 3, stride=2),
nn.ReLU(),
nn.Flatten(),
)
self.lidar_encoder = nn.Sequential(
nn.Linear(360, 128), # 360도 라이다 포인트
nn.ReLU(),
nn.Linear(128, 64),
nn.ReLU(),
)
# 주행 상태 (속도, 가속도, 조향 각도 등)
self.state_encoder = nn.Sequential(
nn.Linear(10, 32),
nn.ReLU(),
)
# 통합 후 행동 결정
# 특징 크기는 실제 입력에 따라 조정
self.decision = nn.Sequential(
nn.Linear(256 + 64 + 32, 256),
nn.ReLU(),
nn.Linear(256, 128),
nn.ReLU(),
)
# 연속 행동: 조향, 가속, 제동
self.steering = nn.Linear(128, 1)
self.throttle = nn.Linear(128, 1)
self.brake = nn.Linear(128, 1)
def forward(self, camera, lidar, state):
cam_feat = self.camera_encoder(camera)
lid_feat = self.lidar_encoder(lidar)
state_feat = self.state_encoder(state)
combined = torch.cat([cam_feat, lid_feat, state_feat], dim=-1)
features = self.decision(combined)
steering = torch.tanh(self.steering(features))
throttle = torch.sigmoid(self.throttle(features))
brake = torch.sigmoid(self.brake(features))
return steering, throttle, brake
보상 설계
def driving_reward(state, action, next_state):
"""자율주행 보상 함수"""
reward = 0.0
# 진행 보상: 목적지 방향으로 이동
progress = next_state['distance_to_goal'] - state['distance_to_goal']
reward += -progress * 10.0 # 가까워지면 양수
# 차선 유지 보상
lane_deviation = abs(next_state['lane_offset'])
reward -= lane_deviation * 2.0
# 속도 보상: 적정 속도 유지
speed = next_state['speed']
target_speed = next_state['speed_limit']
speed_diff = abs(speed - target_speed) / target_speed
reward -= speed_diff * 1.0
# 안전 페널티
if next_state['collision']:
reward -= 100.0
if next_state['traffic_violation']:
reward -= 50.0
# 편안함: 급가속/급감속 페널티
jerk = abs(action['acceleration_change'])
reward -= jerk * 0.5
return reward
자원 관리 (Resource Management)
클라우드 자원 스케줄링
class ResourceScheduler(nn.Module):
"""클라우드 자원 스케줄링 RL 에이전트"""
def __init__(self, num_servers, num_job_types, num_actions):
super().__init__()
# 상태: 서버 부하 + 대기열 + 현재 시간 특성
state_size = num_servers * 3 + num_job_types * 2 + 4
self.net = nn.Sequential(
nn.Linear(state_size, 256),
nn.ReLU(),
nn.Linear(256, 128),
nn.ReLU(),
)
self.policy = nn.Linear(128, num_actions)
self.value = nn.Linear(128, 1)
def forward(self, state):
features = self.net(state)
return self.policy(features), self.value(features)
def resource_reward(state, action, next_state, config):
"""자원 관리 보상"""
reward = 0.0
# 처리량 최대화
throughput = next_state['completed_jobs']
reward += throughput * config['throughput_weight']
# 지연 시간 최소화
avg_latency = next_state['avg_latency']
reward -= avg_latency * config['latency_weight']
# 자원 비용 최소화
cost = next_state['resource_cost']
reward -= cost * config['cost_weight']
# SLA 위반 페널티
sla_violations = next_state['sla_violations']
reward -= sla_violations * config['sla_penalty']
return reward
네트워크 라우팅 최적화
class NetworkRouter(nn.Module):
"""네트워크 트래픽 라우팅 에이전트"""
def __init__(self, num_nodes, num_links):
super().__init__()
state_size = num_nodes + num_links * 2 # 노드 수요 + 링크 용량/부하
self.encoder = nn.Sequential(
nn.Linear(state_size, 128),
nn.ReLU(),
nn.Linear(128, 128),
nn.ReLU(),
)
# 각 링크의 트래픽 비율 결정
self.router = nn.Sequential(
nn.Linear(128, num_links),
nn.Softmax(dim=-1), # 트래픽 분배 비율
)
def forward(self, network_state):
features = self.encoder(network_state)
routing_weights = self.router(features)
return routing_weights
추천 시스템 (Recommendation Systems)
순차적 추천에 RL 적용
추천 시스템을 강화학습으로 모델링하면 장기적인 사용자 만족도를 최적화할 수 있다.
class RecommendationAgent(nn.Module):
"""RL 기반 추천 에이전트"""
def __init__(self, num_items, embed_dim=64, hidden_size=128):
super().__init__()
# 아이템 임베딩
self.item_embedding = nn.Embedding(num_items, embed_dim)
# 사용자 히스토리 인코딩 (GRU)
self.history_encoder = nn.GRU(
embed_dim, hidden_size, batch_first=True
)
# 사용자 특성 인코딩
self.user_encoder = nn.Linear(20, 32) # 사용자 프로파일
# Q-네트워크: 각 아이템의 가치 예측
self.q_network = nn.Sequential(
nn.Linear(hidden_size + 32, 256),
nn.ReLU(),
nn.Linear(256, num_items),
)
def forward(self, user_history, user_features):
# 히스토리 인코딩
history_embedded = self.item_embedding(user_history)
_, hidden = self.history_encoder(history_embedded)
history_feature = hidden.squeeze(0)
# 사용자 특성 인코딩
user_feature = self.user_encoder(user_features)
# Q 값 계산
combined = torch.cat([history_feature, user_feature], dim=-1)
q_values = self.q_network(combined)
return q_values
def recommendation_reward(user_response, item_type):
"""추천 보상: 단기 + 장기"""
immediate = 0.0
if user_response == 'click':
immediate += 1.0
elif user_response == 'purchase':
immediate += 5.0
elif user_response == 'skip':
immediate -= 0.1
# 다양성 보너스: 다양한 카테고리 추천 장려
# (장기적 사용자 만족을 위해)
return immediate
자연어 처리 (NLP)
RLHF: 인간 피드백 기반 강화학습
현대 대규모 언어모델(LLM)의 핵심 학습 기법이다:
class RLHFTrainer:
"""RLHF 학습 개념적 구현"""
def __init__(self, policy_model, reward_model, ref_model,
kl_coef=0.1):
self.policy = policy_model # 학습 대상 LLM
self.reward_model = reward_model # 인간 선호도 모델
self.ref_model = ref_model # 기준 모델 (고정)
self.kl_coef = kl_coef
self.optimizer = torch.optim.Adam(
self.policy.parameters(), lr=1e-5
)
def compute_reward(self, prompt, response):
"""보상 = 보상 모델 점수 - KL 페널티"""
# 보상 모델 점수
reward_score = self.reward_model.score(prompt, response)
# KL 발산 페널티 (기준 모델과의 차이)
policy_logprobs = self.policy.log_prob(prompt, response)
ref_logprobs = self.ref_model.log_prob(prompt, response)
kl_penalty = policy_logprobs - ref_logprobs
return reward_score - self.kl_coef * kl_penalty
def train_step(self, prompts):
"""PPO 기반 RLHF 학습 스텝"""
# 1. 현재 정책으로 응답 생성
responses = self.policy.generate(prompts)
# 2. 보상 계산
rewards = [
self.compute_reward(p, r)
for p, r in zip(prompts, responses)
]
# 3. PPO 업데이트
self._ppo_update(prompts, responses, rewards)
텍스트 요약에 RL 적용
class SummarizationRL:
"""강화학습 기반 텍스트 요약"""
def __init__(self, model, rouge_weight=1.0, length_weight=0.1):
self.model = model
self.rouge_weight = rouge_weight
self.length_weight = length_weight
def reward_function(self, source, generated_summary, reference):
"""ROUGE 점수 기반 보상"""
rouge_score = compute_rouge(generated_summary, reference)
# 길이 페널티: 너무 길거나 짧은 요약 방지
target_ratio = 0.3 # 원문의 30%
actual_ratio = len(generated_summary) / max(len(source), 1)
length_penalty = -abs(actual_ratio - target_ratio)
reward = (self.rouge_weight * rouge_score
+ self.length_weight * length_penalty)
return reward
게임 AI (Beyond Atari)
OpenAI Five (Dota 2)
OpenAI Five는 5명이 한 팀인 Dota 2에서 세계 챔피언 팀을 이겼다. 핵심 도전:
- 부분 관측: 전장 안개(fog of war)로 적 위치를 모름
- 장기 계획: 45분 이상의 긴 경기
- 팀 협력: 5명의 에이전트가 협력해야 함
- 거대한 행동 공간: 수천 개의 가능한 행동
멀티 에이전트 협력 학습
class MultiAgentPolicy(nn.Module):
"""멀티 에이전트 협력 정책 (파라미터 공유)"""
def __init__(self, obs_size, act_size, num_agents,
comm_size=32):
super().__init__()
self.num_agents = num_agents
# 개별 관측 인코더
self.obs_encoder = nn.Sequential(
nn.Linear(obs_size, 128),
nn.ReLU(),
)
# 통신 채널
self.comm_encoder = nn.Sequential(
nn.Linear(128, comm_size),
nn.ReLU(),
)
# 수신 메시지 통합
self.message_integrator = nn.Sequential(
nn.Linear(comm_size * (num_agents - 1), 64),
nn.ReLU(),
)
# 최종 정책
self.policy = nn.Sequential(
nn.Linear(128 + 64, 128),
nn.ReLU(),
nn.Linear(128, act_size),
)
def forward(self, observations):
"""
observations: (batch, num_agents, obs_size)
"""
batch_size = observations.shape[0]
# 각 에이전트의 관측 인코딩
obs_flat = observations.view(-1, observations.shape[-1])
encoded = self.obs_encoder(obs_flat)
encoded = encoded.view(batch_size, self.num_agents, -1)
# 통신 메시지 생성
messages = self.comm_encoder(
encoded.view(-1, encoded.shape[-1])
).view(batch_size, self.num_agents, -1)
# 각 에이전트별 타 에이전트 메시지 수집 및 정책 계산
all_policies = []
for i in range(self.num_agents):
other_msgs = torch.cat(
[messages[:, j] for j in range(self.num_agents) if j != i],
dim=-1
)
integrated = self.message_integrator(other_msgs)
combined = torch.cat([encoded[:, i], integrated], dim=-1)
policy_logits = self.policy(combined)
all_policies.append(policy_logits)
return torch.stack(all_policies, dim=1)
실전 적용 시 고려사항
공통 도전과제
- 보상 설계: 잘못된 보상은 의도하지 않은 행동을 유발한다 (reward hacking)
# 나쁜 예: 점수만 최대화하면 비윤리적 전략 학습 가능
reward = game_score
# 좋은 예: 다양한 목표를 균형 있게 반영
reward = (score_weight * game_score
- safety_weight * violations
+ fairness_weight * equity_metric)
-
샘플 효율성: 실제 환경에서의 데이터 수집이 비싸다
-
안전성: 학습 중 위험한 행동을 방지해야 한다
-
평가: 시뮬레이터 성능이 반드시 실전 성능을 보장하지 않는다
실전 적용 체크리스트
| 항목 | 확인 내용 |
|---|---|
| 문제 정의 | 상태/행동/보상이 명확하게 정의되었는가 |
| 시뮬레이터 | 충분히 현실적인 시뮬레이터가 있는가 |
| 기준선 | 간단한 규칙 기반 방법과 비교했는가 |
| 안전성 | 학습 중 안전 제약이 보장되는가 |
| 평가 | 다양한 시나리오에서 테스트했는가 |
| 배포 | 실시간 추론 성능이 충분한가 |
핵심 요약
- 강화학습은 로봇 제어, 자율주행, 자원 관리, 추천, NLP, 게임 등 폭넓게 활용된다
- Sim-to-Real, RLHF, 멀티 에이전트 등 도메인별 특화 기법이 중요하다
- 보상 설계, 안전성, 샘플 효율성이 실전 적용의 핵심 과제이다
- 실전 적용 시 간단한 기준선부터 시작하여 점진적으로 복잡도를 높이는 것이 좋다
다음 글에서는 이 시리즈의 마지막으로 심층 강화학습 알고리즘의 총정리와 선택 가이드를 다룬다.