- Authors

- Name
- Youngju Kim
- @fjvbn20031
Overview
Deep reinforcement learning is being applied across diverse real-world domains beyond Atari games and Go. This post covers application examples and practical considerations in robot control, autonomous driving, resource management, recommendation systems, natural language processing, and game AI.
Robot Control (Robotics)
Simulation to Reality (Sim-to-Real)
The biggest challenge in robot RL is the sim-to-real gap. A policy trained in simulation may not work on a real robot.
import torch
import torch.nn as nn
import numpy as np
class DomainRandomization:
    """Domain randomization wrapper around a simulated environment.

    Randomly varies physics parameters and injects actuator/sensor noise
    so that a policy trained in simulation generalizes better to a real
    robot (narrows the sim-to-real gap).
    """

    def __init__(self, base_env):
        # base_env is expected to expose set_physics_params(dict) and
        # step(action) -> (obs, reward, done, info) — TODO confirm with caller.
        self.base_env = base_env
        # Bug fix: in the original, current_params was read in
        # step_with_randomization() but never assigned, so stepping always
        # raised AttributeError. Start from noise-free defaults; call
        # randomize() to sample a new set.
        self.current_params = {
            'friction': 1.0,
            'mass_scale': 1.0,
            'gravity': 9.8,
            'actuator_noise': 0.0,
            'sensor_noise': 0.0,
        }

    def randomize(self):
        """Sample, store, and apply a new set of physics parameters.

        Returns:
            dict: the sampled parameters (also kept in self.current_params
            so step_with_randomization() uses the matching noise scales).
        """
        params = {
            'friction': np.random.uniform(0.5, 1.5),
            'mass_scale': np.random.uniform(0.8, 1.2),
            'gravity': np.random.uniform(9.5, 10.1),
            'actuator_noise': np.random.uniform(0.0, 0.05),
            'sensor_noise': np.random.uniform(0.0, 0.02),
        }
        # Bug fix: remember the sample so the noise injection in
        # step_with_randomization() matches the applied physics.
        self.current_params = params
        self.base_env.set_physics_params(params)
        return params

    def step_with_randomization(self, action):
        """Environment step with Gaussian actuator and sensor noise added.

        Args:
            action: numpy array of actuator commands.

        Returns:
            (noisy_obs, reward, done, info) from the wrapped environment,
            with observation noise applied.
        """
        noisy_action = action + np.random.normal(
            0, self.current_params['actuator_noise'],
            size=action.shape
        )
        obs, reward, done, info = self.base_env.step(noisy_action)
        noisy_obs = obs + np.random.normal(
            0, self.current_params['sensor_noise'],
            size=obs.shape
        )
        return noisy_obs, reward, done, info
Robot Manipulation Learning
class RobotGraspingPolicy(nn.Module):
    """Gaussian policy for robot grasping.

    Fuses CNN features from an RGB image with proprioceptive state and
    outputs the mean and standard deviation of a diagonal Gaussian over
    continuous grasp actions.
    """

    def __init__(self, image_size=84, proprioception_size=7,
                 action_size=4):
        super().__init__()
        # Atari-style 3-layer convolutional encoder for the RGB image.
        self.vision = nn.Sequential(
            nn.Conv2d(3, 32, 8, stride=4), nn.ReLU(),
            nn.Conv2d(32, 64, 4, stride=2), nn.ReLU(),
            nn.Conv2d(64, 64, 3, stride=1), nn.ReLU(),
            nn.Flatten(),
        )
        # Probe the encoder with a dummy image to discover the flattened
        # feature size for the given image resolution.
        with torch.no_grad():
            probe = torch.zeros(1, 3, image_size, image_size)
            n_vision = self.vision(probe).shape[1]
        self.policy = nn.Sequential(
            nn.Linear(n_vision + proprioception_size, 256), nn.ReLU(),
            nn.Linear(256, 128), nn.ReLU(),
        )
        self.mu = nn.Linear(128, action_size)
        # State-independent log-std, learned as a free parameter.
        self.log_std = nn.Parameter(torch.zeros(action_size))

    def forward(self, image, proprioception):
        """Return (mu, std) of the action distribution."""
        img_feat = self.vision(image)
        joint = torch.cat([img_feat, proprioception], dim=-1)
        hidden = self.policy(joint)
        return self.mu(hidden), self.log_std.exp()
Key Challenges
- Safety: Real robots cannot attempt dangerous actions
- Sample efficiency: Data collection in real environments is slow and expensive
- Reward design: Defining rewards for complex manipulation tasks is difficult
Autonomous Driving
RL-Based Autonomous Driving Architecture
class DrivingPolicy(nn.Module):
    """Multi-sensor autonomous driving policy.

    Encodes camera, lidar, and vehicle-state inputs separately, fuses the
    features, and emits three bounded control heads: steering in [-1, 1]
    (tanh) and throttle/brake in [0, 1] (sigmoid).
    """

    def __init__(self):
        super().__init__()
        # Small CNN over the front camera image.
        self.camera_encoder = nn.Sequential(
            nn.Conv2d(3, 32, 5, stride=2), nn.ReLU(),
            nn.Conv2d(32, 64, 3, stride=2), nn.ReLU(),
            nn.Flatten(),
        )
        # MLP over a 360-beam lidar scan.
        self.lidar_encoder = nn.Sequential(
            nn.Linear(360, 128), nn.ReLU(),
            nn.Linear(128, 64), nn.ReLU(),
        )
        # Encoder for the 10-dim vehicle state (speed, heading, ...).
        self.state_encoder = nn.Sequential(
            nn.Linear(10, 32), nn.ReLU(),
        )
        # Fusion trunk; input width = camera (256) + lidar (64) + state (32).
        self.decision = nn.Sequential(
            nn.Linear(256 + 64 + 32, 256), nn.ReLU(),
            nn.Linear(256, 128), nn.ReLU(),
        )
        self.steering = nn.Linear(128, 1)
        self.throttle = nn.Linear(128, 1)
        self.brake = nn.Linear(128, 1)

    def forward(self, camera, lidar, state):
        """Return (steering, throttle, brake) control tensors."""
        fused = torch.cat(
            [
                self.camera_encoder(camera),
                self.lidar_encoder(lidar),
                self.state_encoder(state),
            ],
            dim=-1,
        )
        trunk = self.decision(fused)
        # Bounded control outputs.
        return (
            torch.tanh(self.steering(trunk)),
            torch.sigmoid(self.throttle(trunk)),
            torch.sigmoid(self.brake(trunk)),
        )
Reward Design
def driving_reward(state, action, next_state):
    """Shaped reward for autonomous driving.

    Combines goal progress, lane keeping, speed tracking, hard safety
    penalties (collision, traffic violation), and a comfort (jerk) term.

    Args:
        state: dict with at least 'distance_to_goal'.
        action: dict with 'acceleration_change'.
        next_state: dict with 'distance_to_goal', 'lane_offset', 'speed',
            'speed_limit', 'collision', 'traffic_violation'.

    Returns:
        float: scalar reward for the transition.
    """
    reward = 0.0
    # Progress toward goal: distance decreasing -> positive reward.
    progress = next_state['distance_to_goal'] - state['distance_to_goal']
    reward += -progress * 10.0
    # Lane keeping: penalize lateral deviation from lane center.
    lane_deviation = abs(next_state['lane_offset'])
    reward -= lane_deviation * 2.0
    # Speed tracking relative to the posted limit.
    target_speed = next_state['speed_limit']
    if target_speed > 0:
        # Bug fix: the original divided unconditionally and raised
        # ZeroDivisionError when the speed limit was 0 (e.g. full stop zone).
        speed_diff = abs(next_state['speed'] - target_speed) / target_speed
        reward -= speed_diff * 1.0
    # Hard safety penalties.
    if next_state['collision']:
        reward -= 100.0
    if next_state['traffic_violation']:
        reward -= 50.0
    # Comfort: penalize jerk (rate of change of acceleration).
    jerk = abs(action['acceleration_change'])
    reward -= jerk * 0.5
    return reward
Resource Management
Cloud Resource Scheduling
class ResourceScheduler(nn.Module):
    """Actor-critic network for cloud resource scheduling.

    A shared MLP trunk feeds two heads: per-action policy logits and a
    scalar state-value estimate.
    """

    def __init__(self, num_servers, num_job_types, num_actions):
        super().__init__()
        # State layout: 3 features per server, 2 per job type, 4 global
        # features — presumably load/queue statistics; confirm with caller.
        state_size = 3 * num_servers + 2 * num_job_types + 4
        self.net = nn.Sequential(
            nn.Linear(state_size, 256), nn.ReLU(),
            nn.Linear(256, 128), nn.ReLU(),
        )
        self.policy = nn.Linear(128, num_actions)
        self.value = nn.Linear(128, 1)

    def forward(self, state):
        """Return (policy_logits, state_value) for a batch of states."""
        trunk = self.net(state)
        logits = self.policy(trunk)
        value_estimate = self.value(trunk)
        return logits, value_estimate
def resource_reward(state, action, next_state, config):
    """Weighted multi-objective reward for resource management.

    Rewards job throughput; penalizes latency, resource cost, and SLA
    violations, with all weights supplied via config.

    Returns:
        float: throughput term minus the three penalty terms.
    """
    throughput_term = next_state['completed_jobs'] * config['throughput_weight']
    latency_term = next_state['avg_latency'] * config['latency_weight']
    cost_term = next_state['resource_cost'] * config['cost_weight']
    sla_term = next_state['sla_violations'] * config['sla_penalty']
    # 0.0 base keeps the result a float, as in the original.
    return 0.0 + throughput_term - latency_term - cost_term - sla_term
Network Routing Optimization
class NetworkRouter(nn.Module):
    """Network traffic routing agent.

    Maps the network state to a probability distribution over links
    (softmax), interpreted as per-link routing weights.
    """

    def __init__(self, num_nodes, num_links):
        super().__init__()
        # One feature per node plus two per link — presumably load and
        # capacity; confirm against the environment's state encoding.
        state_size = num_nodes + 2 * num_links
        self.encoder = nn.Sequential(
            nn.Linear(state_size, 128), nn.ReLU(),
            nn.Linear(128, 128), nn.ReLU(),
        )
        self.router = nn.Sequential(
            nn.Linear(128, num_links),
            nn.Softmax(dim=-1),
        )

    def forward(self, network_state):
        """Return routing weights that sum to 1 over the link dimension."""
        return self.router(self.encoder(network_state))
Recommendation Systems
Applying RL to Sequential Recommendation
Modeling recommendation systems with RL enables optimization of long-term user satisfaction.
class RecommendationAgent(nn.Module):
    """DQN-style agent for sequential recommendation.

    Encodes the user's item-interaction history with a GRU, combines it
    with static user features, and outputs one Q-value per item.
    """

    def __init__(self, num_items, embed_dim=64, hidden_size=128):
        super().__init__()
        self.item_embedding = nn.Embedding(num_items, embed_dim)
        # Single-layer GRU over the embedded interaction history.
        self.history_encoder = nn.GRU(embed_dim, hidden_size, batch_first=True)
        # Static user profile is assumed to be a 20-dim vector — confirm.
        self.user_encoder = nn.Linear(20, 32)
        self.q_network = nn.Sequential(
            nn.Linear(hidden_size + 32, 256), nn.ReLU(),
            nn.Linear(256, num_items),
        )

    def forward(self, user_history, user_features):
        """Return Q-values of shape (batch, num_items).

        Args:
            user_history: LongTensor (batch, seq_len) of item ids.
            user_features: FloatTensor (batch, 20) of profile features.
        """
        embedded = self.item_embedding(user_history)
        _, final_hidden = self.history_encoder(embedded)
        # GRU hidden is (num_layers, batch, hidden); take the last layer.
        session_repr = final_hidden[-1]
        profile_repr = self.user_encoder(user_features)
        joint = torch.cat([session_repr, profile_repr], dim=-1)
        return self.q_network(joint)
def recommendation_reward(user_response, item_type):
    """Immediate reward for a recommendation, keyed by user response.

    Args:
        user_response: one of 'click', 'purchase', 'skip'; any other
            response yields 0.0.
        item_type: unused here; kept for interface compatibility
            (presumably for long-term/item-specific shaping later).

    Returns:
        float: immediate reward value.
    """
    response_values = {
        'click': 1.0,
        'purchase': 5.0,
        'skip': -0.1,
    }
    return response_values.get(user_response, 0.0)
Natural Language Processing (NLP)
RLHF: Reinforcement Learning from Human Feedback
A core training technique for modern large language models (LLMs):
class RLHFTrainer:
    """Conceptual RLHF (reinforcement learning from human feedback) trainer.

    The policy is optimized against a learned reward model, with a KL
    penalty toward a frozen reference model that keeps generations close
    to the pretrained distribution.
    """

    def __init__(self, policy_model, reward_model, ref_model, kl_coef=0.1):
        self.policy = policy_model
        self.reward_model = reward_model
        self.ref_model = ref_model
        self.kl_coef = kl_coef
        self.optimizer = torch.optim.Adam(self.policy.parameters(), lr=1e-5)

    def compute_reward(self, prompt, response):
        """Return reward-model score minus the KL penalty for one sample."""
        score = self.reward_model.score(prompt, response)
        # Per-sample KL estimate: log p_policy - log p_ref on the response.
        kl_estimate = (self.policy.log_prob(prompt, response)
                       - self.ref_model.log_prob(prompt, response))
        return score - self.kl_coef * kl_estimate

    def train_step(self, prompts):
        """Sample responses, score them, and run one PPO update."""
        responses = self.policy.generate(prompts)
        rewards = [
            self.compute_reward(prompt, response)
            for prompt, response in zip(prompts, responses)
        ]
        # NOTE(review): _ppo_update is assumed to be provided elsewhere;
        # it is not defined in this conceptual sketch.
        self._ppo_update(prompts, responses, rewards)
Applying RL to Text Summarization
class SummarizationRL:
    """RL fine-tuning wrapper for abstractive text summarization."""

    def __init__(self, model, rouge_weight=1.0, length_weight=0.1):
        self.model = model
        self.rouge_weight = rouge_weight
        self.length_weight = length_weight

    def reward_function(self, source, generated_summary, reference):
        """Reward = weighted ROUGE score plus a length-ratio penalty.

        Relies on an external compute_rouge(hypothesis, reference)
        helper defined elsewhere in the project.
        """
        # Quality: overlap between the generated and reference summaries.
        quality = compute_rouge(generated_summary, reference)
        # Brevity: penalize deviation from a ~30% compression ratio;
        # max(..., 1) guards against an empty source.
        desired_ratio = 0.3
        actual_ratio = len(generated_summary) / max(len(source), 1)
        brevity = -abs(actual_ratio - desired_ratio)
        return self.rouge_weight * quality + self.length_weight * brevity
Game AI (Beyond Atari)
OpenAI Five (Dota 2)
OpenAI Five defeated the world champion team in Dota 2, a 5v5 game. Key challenges:
- Partial observability: Fog of war hides enemy positions
- Long-term planning: Matches lasting over 45 minutes
- Team cooperation: 5 agents must cooperate
- Enormous action space: Thousands of possible actions
Multi-Agent Cooperative Learning
class MultiAgentPolicy(nn.Module):
    """Cooperative multi-agent policy with shared parameters.

    Each agent encodes its own observation, broadcasts a compressed
    message, integrates the messages received from all other agents, and
    produces its own action logits. All agents share the same weights.
    """

    def __init__(self, obs_size, act_size, num_agents, comm_size=32):
        super().__init__()
        self.num_agents = num_agents
        self.obs_encoder = nn.Sequential(nn.Linear(obs_size, 128), nn.ReLU())
        # Compress the observation feature into an outgoing message.
        self.comm_encoder = nn.Sequential(nn.Linear(128, comm_size), nn.ReLU())
        # Integrate the (num_agents - 1) incoming messages.
        self.message_integrator = nn.Sequential(
            nn.Linear(comm_size * (num_agents - 1), 64), nn.ReLU(),
        )
        self.policy = nn.Sequential(
            nn.Linear(128 + 64, 128), nn.ReLU(),
            nn.Linear(128, act_size),
        )

    def forward(self, observations):
        """Map (batch, num_agents, obs_size) to per-agent action logits.

        Returns:
            Tensor of shape (batch, num_agents, act_size).
        """
        # nn.Linear acts on the trailing dimension, so the encoders can be
        # applied to the (batch, agent, feature) tensor directly — this is
        # numerically identical to flattening and reshaping.
        agent_feats = self.obs_encoder(observations)
        outgoing = self.comm_encoder(agent_feats)
        logits_per_agent = []
        for agent in range(self.num_agents):
            peers = [
                outgoing[:, other]
                for other in range(self.num_agents)
                if other != agent
            ]
            inbox = self.message_integrator(torch.cat(peers, dim=-1))
            fused = torch.cat([agent_feats[:, agent], inbox], dim=-1)
            logits_per_agent.append(self.policy(fused))
        return torch.stack(logits_per_agent, dim=1)
Practical Considerations for Real-World Deployment
Common Challenges
- Reward design: Incorrect rewards can induce unintended behavior (reward hacking)
# Bad example: maximizing score alone may learn unethical strategies
reward = game_score
# Good example: balance multiple objectives
reward = (score_weight * game_score
- safety_weight * violations
+ fairness_weight * equity_metric)
- Sample efficiency: Data collection in real environments is expensive
- Safety: Dangerous actions must be prevented during training
- Evaluation: Simulator performance does not necessarily guarantee real-world performance
Practical Deployment Checklist
| Item | What to Verify |
|---|---|
| Problem definition | Are state/action/reward clearly defined? |
| Simulator | Is the simulator sufficiently realistic? |
| Baseline | Compared with simple rule-based methods? |
| Safety | Are safety constraints guaranteed during training? |
| Evaluation | Tested across diverse scenarios? |
| Deployment | Is real-time inference performance sufficient? |
Key Takeaways
- RL is broadly applied in robotics, autonomous driving, resource management, recommendations, NLP, and games
- Domain-specific techniques like Sim-to-Real, RLHF, and multi-agent are important
- Reward design, safety, and sample efficiency are the core challenges of practical deployment
- Start with simple baselines and gradually increase complexity for practical applications
In the next post, we will provide a comprehensive summary and selection guide for deep RL algorithms as the final installment of this series.