[심층 강화학습] 19. 심층 강화학습의 실전 응용 사례
개요
심층 강화학습은 Atari 게임과 바둑을 넘어 다양한 실전 영역에서 활용되고 있다. 이 글에서는 로봇 제어, 자율주행, 자원 관리, 추천 시스템, 자연어 처리, 게임 AI 등에서의 응용 사례와 실전 적용 시 고려사항을 정리한다.
로봇 제어 (Robotics)
시뮬레이션에서 현실로 (Sim-to-Real)
로봇 강화학습의 가장 큰 도전은 시뮬레이션과 현실의 간극(sim-to-real gap)이다. 시뮬레이터에서 학습한 정책이 실제 로봇에서는 작동하지 않을 수 있다.
import torch
import torch.nn as nn
import numpy as np
class DomainRandomization:
    """Domain randomization: randomly perturb simulation physics parameters
    so a policy trained in simulation generalizes to the real environment
    (narrows the sim-to-real gap).

    Bug fix: the original never stored the sampled parameters, so
    ``step_with_randomization`` crashed with AttributeError when reading
    ``self.current_params``. The latest sample is now kept, with zero-noise
    defaults until ``randomize()`` is first called.
    """

    def __init__(self, base_env):
        # base_env is assumed to expose set_physics_params(dict) and a
        # gym-style step(action) -> (obs, reward, done, info) -- confirm
        # against the actual environment API.
        self.base_env = base_env
        # Zero-noise defaults so stepping before randomize() is safe.
        self.current_params = {'actuator_noise': 0.0, 'sensor_noise': 0.0}

    def randomize(self):
        """Sample a new set of physics parameters, apply them to the
        environment, remember them, and return them."""
        params = {
            'friction': np.random.uniform(0.5, 1.5),
            'mass_scale': np.random.uniform(0.8, 1.2),
            'gravity': np.random.uniform(9.5, 10.1),
            'actuator_noise': np.random.uniform(0.0, 0.05),
            'sensor_noise': np.random.uniform(0.0, 0.02),
        }
        # Remember the sample so step_with_randomization can use it.
        self.current_params = params
        self.base_env.set_physics_params(params)
        return params

    def step_with_randomization(self, action):
        """Step the base environment with actuator noise added to the
        action and sensor noise added to the returned observation."""
        noisy_action = action + np.random.normal(
            0, self.current_params['actuator_noise'], size=action.shape
        )
        obs, reward, done, info = self.base_env.step(noisy_action)
        noisy_obs = obs + np.random.normal(
            0, self.current_params['sensor_noise'], size=obs.shape
        )
        return noisy_obs, reward, done, info
로봇 매니퓰레이션 학습
class RobotGraspingPolicy(nn.Module):
    """Gaussian policy for robot grasping, conditioned on a camera image
    plus proprioceptive state (joint angles etc.)."""

    def __init__(self, image_size=84, proprioception_size=7,
                 action_size=4):
        super().__init__()
        # Convolutional trunk for the RGB camera image.
        self.vision = nn.Sequential(
            nn.Conv2d(3, 32, 8, stride=4), nn.ReLU(),
            nn.Conv2d(32, 64, 4, stride=2), nn.ReLU(),
            nn.Conv2d(64, 64, 3, stride=1), nn.ReLU(),
            nn.Flatten(),
        )
        # Probe the conv trunk once to learn its flattened output width.
        with torch.no_grad():
            probe = torch.zeros(1, 3, image_size, image_size)
            n_vision = self.vision(probe).shape[1]
        # Fuse vision features with proprioception.
        self.policy = nn.Sequential(
            nn.Linear(n_vision + proprioception_size, 256), nn.ReLU(),
            nn.Linear(256, 128), nn.ReLU(),
        )
        self.mu = nn.Linear(128, action_size)
        # State-independent log-std, one entry per action dimension.
        self.log_std = nn.Parameter(torch.zeros(action_size))

    def forward(self, image, proprioception):
        """Return (mu, std) of a diagonal Gaussian over actions."""
        fused = torch.cat([self.vision(image), proprioception], dim=-1)
        hidden = self.policy(fused)
        return self.mu(hidden), self.log_std.exp()
핵심 도전과제
- 안전성: 실제 로봇은 위험한 행동을 시도할 수 없다
- 샘플 효율성: 실제 환경에서의 데이터 수집이 느리고 비싸다
- 보상 설계: 복잡한 조작 작업의 보상을 정의하기 어렵다
자율주행 (Autonomous Driving)
RL 기반 자율주행 아키텍처
class DrivingPolicy(nn.Module):
    """Autonomous-driving policy with camera/lidar/vehicle-state sensor
    fusion and continuous steering / throttle / brake heads."""

    def __init__(self):
        super().__init__()
        # Camera branch (flattened conv features; the 256 below assumes a
        # matching input resolution -- adjust to the actual camera size).
        self.camera_encoder = nn.Sequential(
            nn.Conv2d(3, 32, 5, stride=2), nn.ReLU(),
            nn.Conv2d(32, 64, 3, stride=2), nn.ReLU(),
            nn.Flatten(),
        )
        # Lidar branch: 360 range readings, one per degree.
        self.lidar_encoder = nn.Sequential(
            nn.Linear(360, 128), nn.ReLU(),
            nn.Linear(128, 64), nn.ReLU(),
        )
        # Vehicle state branch (speed, acceleration, steering angle, ...).
        self.state_encoder = nn.Sequential(nn.Linear(10, 32), nn.ReLU())
        # Joint decision trunk over the concatenated branch features.
        self.decision = nn.Sequential(
            nn.Linear(256 + 64 + 32, 256), nn.ReLU(),
            nn.Linear(256, 128), nn.ReLU(),
        )
        # Continuous action heads.
        self.steering = nn.Linear(128, 1)
        self.throttle = nn.Linear(128, 1)
        self.brake = nn.Linear(128, 1)

    def forward(self, camera, lidar, state):
        """Return (steering in [-1, 1], throttle in [0, 1], brake in [0, 1])."""
        fused = torch.cat(
            [
                self.camera_encoder(camera),
                self.lidar_encoder(lidar),
                self.state_encoder(state),
            ],
            dim=-1,
        )
        h = self.decision(fused)
        return (
            torch.tanh(self.steering(h)),
            torch.sigmoid(self.throttle(h)),
            torch.sigmoid(self.brake(h)),
        )
보상 설계
def driving_reward(state, action, next_state):
    """Shaped reward for autonomous driving.

    Args:
        state: dict with at least 'distance_to_goal'.
        action: dict with 'acceleration_change' (comfort term).
        next_state: dict with 'distance_to_goal', 'lane_offset', 'speed',
            'speed_limit', 'collision', 'traffic_violation'.

    Returns:
        Scalar float combining progress, lane keeping, speed tracking,
        safety penalties, and ride comfort.
    """
    reward = 0.0
    # Progress: shrinking distance_to_goal => positive reward.
    progress = next_state['distance_to_goal'] - state['distance_to_goal']
    reward += -progress * 10.0
    # Lane keeping: penalize lateral offset from lane center.
    reward -= abs(next_state['lane_offset']) * 2.0
    # Speed tracking: penalize relative deviation from the speed limit.
    # Guard against a zero/negative speed limit (the original divided
    # unconditionally and raised ZeroDivisionError).
    target_speed = next_state['speed_limit']
    if target_speed > 0:
        reward -= abs(next_state['speed'] - target_speed) / target_speed
    # Safety: large penalties for collisions and traffic violations.
    if next_state['collision']:
        reward -= 100.0
    if next_state['traffic_violation']:
        reward -= 50.0
    # Comfort: penalize jerk (sudden acceleration changes).
    reward -= abs(action['acceleration_change']) * 0.5
    return reward
자원 관리 (Resource Management)
클라우드 자원 스케줄링
class ResourceScheduler(nn.Module):
    """Actor-critic network for cloud resource scheduling."""

    def __init__(self, num_servers, num_job_types, num_actions):
        super().__init__()
        # State layout: 3 load features per server, 2 queue features per
        # job type, plus 4 time features.
        state_size = num_servers * 3 + num_job_types * 2 + 4
        self.net = nn.Sequential(
            nn.Linear(state_size, 256), nn.ReLU(),
            nn.Linear(256, 128), nn.ReLU(),
        )
        self.policy = nn.Linear(128, num_actions)  # actor head
        self.value = nn.Linear(128, 1)             # critic head

    def forward(self, state):
        """Return (policy logits, state value) for a batch of states."""
        shared = self.net(state)
        return self.policy(shared), self.value(shared)
def resource_reward(state, action, next_state, config):
    """Weighted resource-management reward: reward completed jobs,
    penalize latency, resource cost, and SLA violations. All weights
    come from `config`; `state` and `action` are currently unused."""
    gains = next_state['completed_jobs'] * config['throughput_weight']
    penalties = (
        next_state['avg_latency'] * config['latency_weight']
        + next_state['resource_cost'] * config['cost_weight']
        + next_state['sla_violations'] * config['sla_penalty']
    )
    return gains - penalties
네트워크 라우팅 최적화
class NetworkRouter(nn.Module):
    """Traffic-routing agent that outputs a traffic split over links."""

    def __init__(self, num_nodes, num_links):
        super().__init__()
        # State = per-node demand plus capacity/load for each link.
        state_size = num_nodes + num_links * 2
        self.encoder = nn.Sequential(
            nn.Linear(state_size, 128), nn.ReLU(),
            nn.Linear(128, 128), nn.ReLU(),
        )
        # Softmax so the per-link weights form a distribution.
        self.router = nn.Sequential(
            nn.Linear(128, num_links),
            nn.Softmax(dim=-1),
        )

    def forward(self, network_state):
        """Return per-link traffic-split weights summing to 1."""
        return self.router(self.encoder(network_state))
추천 시스템 (Recommendation Systems)
순차적 추천에 RL 적용
추천 시스템을 강화학습으로 모델링하면 장기적인 사용자 만족도를 최적화할 수 있다.
class RecommendationAgent(nn.Module):
    """DQN-style recommender: predicts a Q-value per item from the user's
    interaction history and profile features."""

    def __init__(self, num_items, embed_dim=64, hidden_size=128):
        super().__init__()
        self.item_embedding = nn.Embedding(num_items, embed_dim)
        # GRU summarizes the interacted-item sequence.
        self.history_encoder = nn.GRU(embed_dim, hidden_size, batch_first=True)
        # 20-dim user profile -> 32-dim feature.
        self.user_encoder = nn.Linear(20, 32)
        # Q head: one value per candidate item.
        self.q_network = nn.Sequential(
            nn.Linear(hidden_size + 32, 256), nn.ReLU(),
            nn.Linear(256, num_items),
        )

    def forward(self, user_history, user_features):
        """user_history: (batch, seq) item ids; user_features: (batch, 20).
        Returns (batch, num_items) Q-values."""
        _, final_hidden = self.history_encoder(
            self.item_embedding(user_history)
        )
        history_vec = final_hidden.squeeze(0)  # (batch, hidden_size)
        profile_vec = self.user_encoder(user_features)
        return self.q_network(torch.cat([history_vec, profile_vec], dim=-1))
def recommendation_reward(user_response, item_type):
    """Immediate recommendation reward (short-term signal only).

    `item_type` is accepted for future use, e.g. a diversity bonus across
    categories to encourage long-term user satisfaction; it is currently
    unused. Unknown responses score 0.
    """
    payoff_table = {'click': 1.0, 'purchase': 5.0, 'skip': -0.1}
    return payoff_table.get(user_response, 0.0)
자연어 처리 (NLP)
RLHF: 인간 피드백 기반 강화학습
현대 대규모 언어모델(LLM)의 핵심 학습 기법이다:
class RLHFTrainer:
    """Conceptual RLHF training loop: PPO on a reward-model score with a
    KL penalty toward a frozen reference model."""

    def __init__(self, policy_model, reward_model, ref_model,
                 kl_coef=0.1):
        self.policy = policy_model        # LLM being fine-tuned
        self.reward_model = reward_model  # human-preference scorer
        self.ref_model = ref_model        # frozen reference policy
        self.kl_coef = kl_coef            # strength of the KL penalty
        self.optimizer = torch.optim.Adam(self.policy.parameters(), lr=1e-5)

    def compute_reward(self, prompt, response):
        """Reward = reward-model score - kl_coef * (log pi - log pi_ref)."""
        score = self.reward_model.score(prompt, response)
        kl_term = (self.policy.log_prob(prompt, response)
                   - self.ref_model.log_prob(prompt, response))
        return score - self.kl_coef * kl_term

    def train_step(self, prompts):
        """One PPO-style RLHF step: sample responses, score them, update."""
        responses = self.policy.generate(prompts)
        rewards = [self.compute_reward(p, r)
                   for p, r in zip(prompts, responses)]
        # NOTE(review): _ppo_update is assumed to be defined elsewhere --
        # this is conceptual code.
        self._ppo_update(prompts, responses, rewards)
텍스트 요약에 RL 적용
class SummarizationRL:
    """RL-based text summarization with a ROUGE + length-shaping reward."""

    def __init__(self, model, rouge_weight=1.0, length_weight=0.1):
        self.model = model
        self.rouge_weight = rouge_weight
        self.length_weight = length_weight

    def reward_function(self, source, generated_summary, reference):
        """ROUGE score against the reference plus a penalty for straying
        from a target compression ratio of 30% of the source length."""
        # NOTE(review): compute_rouge is assumed to be provided elsewhere.
        quality = compute_rouge(generated_summary, reference)
        target_ratio = 0.3
        # max(..., 1) guards against an empty source.
        compression = len(generated_summary) / max(len(source), 1)
        shaping = -abs(compression - target_ratio)
        return self.rouge_weight * quality + self.length_weight * shaping
게임 AI (Beyond Atari)
OpenAI Five (Dota 2)
OpenAI Five는 5명이 한 팀인 Dota 2에서 세계 챔피언 팀을 이겼다. 핵심 도전:
- 부분 관측: 전장 안개(fog of war)로 적 위치를 모름
- 장기 계획: 45분 이상의 긴 경기
- 팀 협력: 5명의 에이전트가 협력해야 함
- 거대한 행동 공간: 수천 개의 가능한 행동
멀티 에이전트 협력 학습
class MultiAgentPolicy(nn.Module):
    """Cooperative multi-agent policy with parameter sharing and a learned
    communication channel between agents."""

    def __init__(self, obs_size, act_size, num_agents,
                 comm_size=32):
        super().__init__()
        self.num_agents = num_agents
        # Shared per-agent observation encoder.
        self.obs_encoder = nn.Sequential(
            nn.Linear(obs_size, 128), nn.ReLU(),
        )
        # Outgoing message produced from the encoded observation.
        self.comm_encoder = nn.Sequential(
            nn.Linear(128, comm_size), nn.ReLU(),
        )
        # Integrates the messages received from all other agents.
        self.message_integrator = nn.Sequential(
            nn.Linear(comm_size * (num_agents - 1), 64), nn.ReLU(),
        )
        # Final policy head over own encoding + integrated messages.
        self.policy = nn.Sequential(
            nn.Linear(128 + 64, 128), nn.ReLU(),
            nn.Linear(128, act_size),
        )

    def forward(self, observations):
        """observations: (batch, num_agents, obs_size) ->
        (batch, num_agents, act_size) policy logits."""
        batch = observations.shape[0]
        flat = observations.reshape(-1, observations.shape[-1])
        own = self.obs_encoder(flat).reshape(batch, self.num_agents, -1)
        # Every agent broadcasts one message computed from its encoding.
        msgs = self.comm_encoder(
            own.reshape(-1, own.shape[-1])
        ).reshape(batch, self.num_agents, -1)
        logits_per_agent = []
        for agent in range(self.num_agents):
            # Collect the messages from everyone except this agent.
            inbox = torch.cat(
                [msgs[:, other] for other in range(self.num_agents)
                 if other != agent],
                dim=-1,
            )
            context = self.message_integrator(inbox)
            head_input = torch.cat([own[:, agent], context], dim=-1)
            logits_per_agent.append(self.policy(head_input))
        return torch.stack(logits_per_agent, dim=1)
실전 적용 시 고려사항
공통 도전과제
- 보상 설계: 잘못된 보상은 의도하지 않은 행동을 유발한다 (reward hacking)
# 나쁜 예: 점수만 최대화하면 비윤리적 전략 학습 가능
reward = game_score
# 좋은 예: 다양한 목표를 균형 있게 반영
reward = (score_weight * game_score
- safety_weight * violations
+ fairness_weight * equity_metric)
- 샘플 효율성: 실제 환경에서의 데이터 수집이 비싸다
- 안전성: 학습 중 위험한 행동을 방지해야 한다
- 평가: 시뮬레이터 성능이 반드시 실전 성능을 보장하지 않는다
실전 적용 체크리스트
| 항목 | 확인 내용 |
|---|---|
| 문제 정의 | 상태/행동/보상이 명확하게 정의되었는가 |
| 시뮬레이터 | 충분히 현실적인 시뮬레이터가 있는가 |
| 기준선 | 간단한 규칙 기반 방법과 비교했는가 |
| 안전성 | 학습 중 안전 제약이 보장되는가 |
| 평가 | 다양한 시나리오에서 테스트했는가 |
| 배포 | 실시간 추론 성능이 충분한가 |
핵심 요약
- 강화학습은 로봇 제어, 자율주행, 자원 관리, 추천, NLP, 게임 등 폭넓게 활용된다
- Sim-to-Real, RLHF, 멀티 에이전트 등 도메인별 특화 기법이 중요하다
- 보상 설계, 안전성, 샘플 효율성이 실전 적용의 핵심 과제이다
- 실전 적용 시 간단한 기준선부터 시작하여 점진적으로 복잡도를 높이는 것이 좋다
다음 글에서는 이 시리즈의 마지막으로 심층 강화학습 알고리즘의 총정리와 선택 가이드를 다룬다.
[Deep RL] 19. Practical Applications of Deep Reinforcement Learning
Overview
Deep reinforcement learning is being applied across diverse real-world domains beyond Atari games and Go. This post covers application examples and practical considerations in robot control, autonomous driving, resource management, recommendation systems, natural language processing, and game AI.
Robot Control (Robotics)
Simulation to Reality (Sim-to-Real)
The biggest challenge in robot RL is the sim-to-real gap. A policy trained in simulation may not work on a real robot.
import torch
import torch.nn as nn
import numpy as np
class DomainRandomization:
    """Domain randomization: randomly vary simulation parameters to improve
    generalization to real environments (narrows the sim-to-real gap).

    Bug fix: the original never stored the sampled parameters, so
    ``step_with_randomization`` crashed with AttributeError when reading
    ``self.current_params``. The latest sample is now kept, with zero-noise
    defaults until ``randomize()`` is first called.
    """

    def __init__(self, base_env):
        # base_env is assumed to expose set_physics_params(dict) and a
        # gym-style step(action) -> (obs, reward, done, info).
        self.base_env = base_env
        # Zero-noise defaults so stepping before randomize() is safe.
        self.current_params = {'actuator_noise': 0.0, 'sensor_noise': 0.0}

    def randomize(self):
        """Sample, apply, remember, and return new physics parameters."""
        params = {
            'friction': np.random.uniform(0.5, 1.5),
            'mass_scale': np.random.uniform(0.8, 1.2),
            'gravity': np.random.uniform(9.5, 10.1),
            'actuator_noise': np.random.uniform(0.0, 0.05),
            'sensor_noise': np.random.uniform(0.0, 0.02),
        }
        # Remember the sample so step_with_randomization can use it.
        self.current_params = params
        self.base_env.set_physics_params(params)
        return params

    def step_with_randomization(self, action):
        """Environment step with actuator noise on the action and sensor
        noise on the returned observation."""
        noisy_action = action + np.random.normal(
            0, self.current_params['actuator_noise'], size=action.shape
        )
        obs, reward, done, info = self.base_env.step(noisy_action)
        noisy_obs = obs + np.random.normal(
            0, self.current_params['sensor_noise'], size=obs.shape
        )
        return noisy_obs, reward, done, info
Robot Manipulation Learning
class RobotGraspingPolicy(nn.Module):
    """Gaussian grasping policy over camera images and proprioception."""

    def __init__(self, image_size=84, proprioception_size=7,
                 action_size=4):
        super().__init__()
        # Convolutional feature extractor for the RGB image.
        self.vision = nn.Sequential(
            nn.Conv2d(3, 32, 8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, 4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, 3, stride=1),
            nn.ReLU(),
            nn.Flatten(),
        )
        # Run a dummy image through the trunk to size the first FC layer.
        with torch.no_grad():
            sample = torch.zeros(1, 3, image_size, image_size)
            feat_dim = self.vision(sample).shape[1]
        self.policy = nn.Sequential(
            nn.Linear(feat_dim + proprioception_size, 256),
            nn.ReLU(),
            nn.Linear(256, 128),
            nn.ReLU(),
        )
        self.mu = nn.Linear(128, action_size)
        # Learned state-independent log standard deviation.
        self.log_std = nn.Parameter(torch.zeros(action_size))

    def forward(self, image, proprioception):
        """Return the mean and std of a diagonal Gaussian over actions."""
        img_feat = self.vision(image)
        joint = torch.cat([img_feat, proprioception], dim=-1)
        trunk = self.policy(joint)
        return self.mu(trunk), self.log_std.exp()
Key Challenges
- Safety: Real robots cannot attempt dangerous actions
- Sample efficiency: Data collection in real environments is slow and expensive
- Reward design: Defining rewards for complex manipulation tasks is difficult
Autonomous Driving
RL-Based Autonomous Driving Architecture
class DrivingPolicy(nn.Module):
    """Driving policy fusing camera, lidar, and vehicle-state inputs into
    continuous steering / throttle / brake commands."""

    def __init__(self):
        super().__init__()
        # Camera branch (the 256 in the decision trunk assumes a matching
        # input resolution -- adjust for the actual camera size).
        self.camera_encoder = nn.Sequential(
            nn.Conv2d(3, 32, 5, stride=2),
            nn.ReLU(),
            nn.Conv2d(32, 64, 3, stride=2),
            nn.ReLU(),
            nn.Flatten(),
        )
        # Lidar branch: 360 range readings, one per degree.
        self.lidar_encoder = nn.Sequential(
            nn.Linear(360, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
        )
        # Vehicle state branch (speed, acceleration, steering angle, ...).
        self.state_encoder = nn.Sequential(
            nn.Linear(10, 32),
            nn.ReLU(),
        )
        # Joint decision trunk over the concatenated features.
        self.decision = nn.Sequential(
            nn.Linear(256 + 64 + 32, 256),
            nn.ReLU(),
            nn.Linear(256, 128),
            nn.ReLU(),
        )
        self.steering = nn.Linear(128, 1)
        self.throttle = nn.Linear(128, 1)
        self.brake = nn.Linear(128, 1)

    def forward(self, camera, lidar, state):
        """Return (steering in [-1, 1], throttle in [0, 1], brake in [0, 1])."""
        branch_feats = [
            self.camera_encoder(camera),
            self.lidar_encoder(lidar),
            self.state_encoder(state),
        ]
        h = self.decision(torch.cat(branch_feats, dim=-1))
        steering = torch.tanh(self.steering(h))
        throttle = torch.sigmoid(self.throttle(h))
        brake = torch.sigmoid(self.brake(h))
        return steering, throttle, brake
Reward Design
def driving_reward(state, action, next_state):
    """Shaped autonomous-driving reward.

    Args:
        state: dict with at least 'distance_to_goal'.
        action: dict with 'acceleration_change' (comfort term).
        next_state: dict with 'distance_to_goal', 'lane_offset', 'speed',
            'speed_limit', 'collision', 'traffic_violation'.

    Returns:
        Scalar float combining progress, lane keeping, speed tracking,
        safety penalties, and ride comfort.
    """
    reward = 0.0
    # Progress toward the goal: negative delta means we got closer.
    progress = next_state['distance_to_goal'] - state['distance_to_goal']
    reward += -progress * 10.0
    # Lane keeping.
    reward -= abs(next_state['lane_offset']) * 2.0
    # Speed tracking with a guard against a zero speed limit (the
    # original divided unconditionally and raised ZeroDivisionError).
    target_speed = next_state['speed_limit']
    if target_speed > 0:
        reward -= abs(next_state['speed'] - target_speed) / target_speed
    # Safety penalties.
    if next_state['collision']:
        reward -= 100.0
    if next_state['traffic_violation']:
        reward -= 50.0
    # Comfort: penalize jerk.
    reward -= abs(action['acceleration_change']) * 0.5
    return reward
Resource Management
Cloud Resource Scheduling
class ResourceScheduler(nn.Module):
    """Actor-critic agent for cloud resource scheduling."""

    def __init__(self, num_servers, num_job_types, num_actions):
        super().__init__()
        # State: 3 load features per server, 2 queue features per job
        # type, plus 4 time features.
        state_size = num_servers * 3 + num_job_types * 2 + 4
        self.net = nn.Sequential(
            nn.Linear(state_size, 256),
            nn.ReLU(),
            nn.Linear(256, 128),
            nn.ReLU(),
        )
        self.policy = nn.Linear(128, num_actions)  # actor head
        self.value = nn.Linear(128, 1)             # critic head

    def forward(self, state):
        """Return (policy logits, state value) for a batch of states."""
        shared = self.net(state)
        return self.policy(shared), self.value(shared)
def resource_reward(state, action, next_state, config):
    """Weighted resource-management reward: reward completed jobs,
    penalize latency, cost, and SLA violations (weights from `config`;
    `state` and `action` are currently unused)."""
    gains = next_state['completed_jobs'] * config['throughput_weight']
    penalties = (
        next_state['avg_latency'] * config['latency_weight']
        + next_state['resource_cost'] * config['cost_weight']
        + next_state['sla_violations'] * config['sla_penalty']
    )
    return gains - penalties
Network Routing Optimization
class NetworkRouter(nn.Module):
    """Network traffic-routing agent producing a split over links."""

    def __init__(self, num_nodes, num_links):
        super().__init__()
        # State = per-node demand plus capacity/load per link.
        state_size = num_nodes + num_links * 2
        self.encoder = nn.Sequential(
            nn.Linear(state_size, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
        )
        # Softmax so the per-link weights form a distribution.
        self.router = nn.Sequential(
            nn.Linear(128, num_links),
            nn.Softmax(dim=-1),
        )

    def forward(self, network_state):
        """Return per-link traffic-split weights summing to 1."""
        return self.router(self.encoder(network_state))
Recommendation Systems
Applying RL to Sequential Recommendation
Modeling recommendation systems with RL enables optimization of long-term user satisfaction.
class RecommendationAgent(nn.Module):
    """DQN-style recommender: one Q-value per item, from the user's
    interaction history and profile features."""

    def __init__(self, num_items, embed_dim=64, hidden_size=128):
        super().__init__()
        self.item_embedding = nn.Embedding(num_items, embed_dim)
        # GRU summarizes the interacted-item sequence.
        self.history_encoder = nn.GRU(
            embed_dim, hidden_size, batch_first=True
        )
        # 20-dim user profile -> 32-dim feature.
        self.user_encoder = nn.Linear(20, 32)
        self.q_network = nn.Sequential(
            nn.Linear(hidden_size + 32, 256),
            nn.ReLU(),
            nn.Linear(256, num_items),
        )

    def forward(self, user_history, user_features):
        """user_history: (batch, seq) item ids; user_features: (batch, 20).
        Returns (batch, num_items) Q-values."""
        _, final_hidden = self.history_encoder(
            self.item_embedding(user_history)
        )
        history_vec = final_hidden.squeeze(0)  # (batch, hidden_size)
        profile_vec = self.user_encoder(user_features)
        return self.q_network(torch.cat([history_vec, profile_vec], dim=-1))
def recommendation_reward(user_response, item_type):
    """Immediate recommendation reward (short-term signal only).

    `item_type` is accepted for future use (e.g. diversity bonuses for
    long-term satisfaction); it is currently unused. Unknown responses
    score 0.
    """
    payoff_table = {'click': 1.0, 'purchase': 5.0, 'skip': -0.1}
    return payoff_table.get(user_response, 0.0)
Natural Language Processing (NLP)
RLHF: Reinforcement Learning from Human Feedback
A core training technique for modern large language models (LLMs):
class RLHFTrainer:
    """Conceptual RLHF loop: PPO on a reward-model score with a KL
    penalty toward a frozen reference model."""

    def __init__(self, policy_model, reward_model, ref_model, kl_coef=0.1):
        self.policy = policy_model        # LLM being fine-tuned
        self.reward_model = reward_model  # human-preference scorer
        self.ref_model = ref_model        # frozen reference policy
        self.kl_coef = kl_coef            # strength of the KL penalty
        self.optimizer = torch.optim.Adam(
            self.policy.parameters(), lr=1e-5
        )

    def compute_reward(self, prompt, response):
        """Reward = reward-model score - kl_coef * (log pi - log pi_ref)."""
        score = self.reward_model.score(prompt, response)
        kl_term = (self.policy.log_prob(prompt, response)
                   - self.ref_model.log_prob(prompt, response))
        return score - self.kl_coef * kl_term

    def train_step(self, prompts):
        """One PPO-style RLHF step: sample responses, score them, update."""
        responses = self.policy.generate(prompts)
        rewards = [self.compute_reward(p, r)
                   for p, r in zip(prompts, responses)]
        # NOTE(review): _ppo_update is assumed to be defined elsewhere --
        # this is conceptual code.
        self._ppo_update(prompts, responses, rewards)
Applying RL to Text Summarization
class SummarizationRL:
    """RL-based summarization with a ROUGE + length-shaping reward."""

    def __init__(self, model, rouge_weight=1.0, length_weight=0.1):
        self.model = model
        self.rouge_weight = rouge_weight
        self.length_weight = length_weight

    def reward_function(self, source, generated_summary, reference):
        """ROUGE against the reference plus a penalty for straying from a
        target compression ratio of 30% of the source length."""
        # NOTE(review): compute_rouge is assumed to be provided elsewhere.
        quality = compute_rouge(generated_summary, reference)
        target_ratio = 0.3
        # max(..., 1) guards against an empty source.
        compression = len(generated_summary) / max(len(source), 1)
        shaping = -abs(compression - target_ratio)
        return self.rouge_weight * quality + self.length_weight * shaping
Game AI (Beyond Atari)
OpenAI Five (Dota 2)
OpenAI Five defeated the world champion team in Dota 2, a 5v5 game. Key challenges:
- Partial observability: Fog of war hides enemy positions
- Long-term planning: Matches lasting over 45 minutes
- Team cooperation: 5 agents must cooperate
- Enormous action space: Thousands of possible actions
Multi-Agent Cooperative Learning
class MultiAgentPolicy(nn.Module):
    """Cooperative multi-agent policy (shared parameters) with a learned
    inter-agent communication channel."""

    def __init__(self, obs_size, act_size, num_agents, comm_size=32):
        super().__init__()
        self.num_agents = num_agents
        # Shared per-agent observation encoder.
        self.obs_encoder = nn.Sequential(
            nn.Linear(obs_size, 128),
            nn.ReLU(),
        )
        # Outgoing message computed from the encoded observation.
        self.comm_encoder = nn.Sequential(
            nn.Linear(128, comm_size),
            nn.ReLU(),
        )
        # Integrates the messages received from all other agents.
        self.message_integrator = nn.Sequential(
            nn.Linear(comm_size * (num_agents - 1), 64),
            nn.ReLU(),
        )
        # Policy head over own encoding + integrated messages.
        self.policy = nn.Sequential(
            nn.Linear(128 + 64, 128),
            nn.ReLU(),
            nn.Linear(128, act_size),
        )

    def forward(self, observations):
        """observations: (batch, num_agents, obs_size) ->
        (batch, num_agents, act_size) policy logits."""
        batch = observations.shape[0]
        flat = observations.reshape(-1, observations.shape[-1])
        own = self.obs_encoder(flat).reshape(batch, self.num_agents, -1)
        # Every agent broadcasts one message.
        msgs = self.comm_encoder(
            own.reshape(-1, own.shape[-1])
        ).reshape(batch, self.num_agents, -1)
        logits_per_agent = []
        for agent in range(self.num_agents):
            # Concatenate messages from everyone except this agent.
            inbox = torch.cat(
                [msgs[:, other] for other in range(self.num_agents)
                 if other != agent],
                dim=-1,
            )
            context = self.message_integrator(inbox)
            head_input = torch.cat([own[:, agent], context], dim=-1)
            logits_per_agent.append(self.policy(head_input))
        return torch.stack(logits_per_agent, dim=1)
Practical Considerations for Real-World Deployment
Common Challenges
- Reward design: Incorrect rewards can induce unintended behavior (reward hacking)
# Bad example: maximizing score alone may learn unethical strategies
reward = game_score
# Good example: balance multiple objectives
reward = (score_weight * game_score
- safety_weight * violations
+ fairness_weight * equity_metric)
- Sample efficiency: Data collection in real environments is expensive
- Safety: Dangerous actions must be prevented during training
- Evaluation: Simulator performance does not necessarily guarantee real-world performance
Practical Deployment Checklist
| Item | What to Verify |
|---|---|
| Problem definition | Are state/action/reward clearly defined? |
| Simulator | Is the simulator sufficiently realistic? |
| Baseline | Compared with simple rule-based methods? |
| Safety | Are safety constraints guaranteed during training? |
| Evaluation | Tested across diverse scenarios? |
| Deployment | Is real-time inference performance sufficient? |
Key Takeaways
- RL is broadly applied in robotics, autonomous driving, resource management, recommendations, NLP, and games
- Domain-specific techniques like Sim-to-Real, RLHF, and multi-agent are important
- Reward design, safety, and sample efficiency are the core challenges of practical deployment
- Start with simple baselines and gradually increase complexity for practical applications
In the next post, we will provide a comprehensive summary and selection guide for deep RL algorithms as the final installment of this series.