[Deep RL] 19. Practical Applications of Deep Reinforcement Learning

Overview

Deep reinforcement learning is being applied across diverse real-world domains beyond Atari games and Go. This post covers application examples and practical considerations in robot control, autonomous driving, resource management, recommendation systems, natural language processing, and game AI.


Robot Control (Robotics)

Simulation to Reality (Sim-to-Real)

The biggest challenge in robot RL is the sim-to-real gap. A policy trained in simulation may not work on a real robot.

import torch
import torch.nn as nn
import numpy as np

class DomainRandomization:
    """Domain randomization: randomly vary simulation parameters
    to improve generalization to real environments"""

    def __init__(self, base_env):
        self.base_env = base_env
        self.current_params = None

    def randomize(self):
        """Randomize physics parameters (call once per episode)"""
        params = {
            'friction': np.random.uniform(0.5, 1.5),
            'mass_scale': np.random.uniform(0.8, 1.2),
            'gravity': np.random.uniform(9.5, 10.1),
            'actuator_noise': np.random.uniform(0.0, 0.05),
            'sensor_noise': np.random.uniform(0.0, 0.02),
        }
        self.base_env.set_physics_params(params)
        self.current_params = params  # used by step_with_randomization
        return params

    def step_with_randomization(self, action):
        """Environment step with added noise"""
        noisy_action = action + np.random.normal(
            0, self.current_params['actuator_noise'],
            size=action.shape
        )
        obs, reward, done, info = self.base_env.step(noisy_action)
        noisy_obs = obs + np.random.normal(
            0, self.current_params['sensor_noise'],
            size=obs.shape
        )
        return noisy_obs, reward, done, info

Robot Manipulation Learning

class RobotGraspingPolicy(nn.Module):
    """Robot grasping policy"""

    def __init__(self, image_size=84, proprioception_size=7,
                 action_size=4):
        super().__init__()
        self.vision = nn.Sequential(
            nn.Conv2d(3, 32, 8, stride=4), nn.ReLU(),
            nn.Conv2d(32, 64, 4, stride=2), nn.ReLU(),
            nn.Conv2d(64, 64, 3, stride=1), nn.ReLU(),
            nn.Flatten(),
        )
        with torch.no_grad():
            dummy = torch.zeros(1, 3, image_size, image_size)
            vision_size = self.vision(dummy).shape[1]
        self.policy = nn.Sequential(
            nn.Linear(vision_size + proprioception_size, 256), nn.ReLU(),
            nn.Linear(256, 128), nn.ReLU(),
        )
        self.mu = nn.Linear(128, action_size)
        self.log_std = nn.Parameter(torch.zeros(action_size))

    def forward(self, image, proprioception):
        vision_features = self.vision(image)
        combined = torch.cat([vision_features, proprioception], dim=-1)
        features = self.policy(combined)
        mu = self.mu(features)
        std = self.log_std.exp()
        return mu, std

Key Challenges

  • Safety: Real robots cannot attempt dangerous actions
  • Sample efficiency: Data collection in real environments is slow and expensive
  • Reward design: Defining rewards for complex manipulation tasks is difficult
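A common answer to the reward-design challenge is reward shaping: a dense distance-based term guides exploration toward the sparse success signal. A minimal sketch — the state fields and weights here are illustrative assumptions, not from any specific benchmark:

```python
import numpy as np

def shaped_grasp_reward(gripper_pos, object_pos, grasp_success,
                        dist_weight=1.0, success_bonus=10.0):
    """Dense shaping: negative gripper-to-object distance,
    plus a large bonus when the grasp succeeds."""
    dist = np.linalg.norm(np.asarray(gripper_pos) - np.asarray(object_pos))
    reward = -dist_weight * dist
    if grasp_success:
        reward += success_bonus
    return reward
```

The dense term keeps the gradient signal alive early in training, while the sparse bonus defines what actually counts as success.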

Autonomous Driving

RL-Based Autonomous Driving Architecture

class DrivingPolicy(nn.Module):
    """Autonomous driving policy network"""

    def __init__(self, image_size=64):
        super().__init__()
        self.camera_encoder = nn.Sequential(
            nn.Conv2d(3, 32, 5, stride=2), nn.ReLU(),
            nn.Conv2d(32, 64, 3, stride=2), nn.ReLU(),
            nn.Flatten(),
        )
        # Infer the flattened camera feature size from a dummy input
        # (as in RobotGraspingPolicy above) instead of hardcoding it
        with torch.no_grad():
            dummy = torch.zeros(1, 3, image_size, image_size)
            cam_size = self.camera_encoder(dummy).shape[1]
        self.lidar_encoder = nn.Sequential(
            nn.Linear(360, 128), nn.ReLU(),
            nn.Linear(128, 64), nn.ReLU(),
        )
        self.state_encoder = nn.Sequential(
            nn.Linear(10, 32), nn.ReLU(),
        )
        self.decision = nn.Sequential(
            nn.Linear(cam_size + 64 + 32, 256), nn.ReLU(),
            nn.Linear(256, 128), nn.ReLU(),
        )
        self.steering = nn.Linear(128, 1)
        self.throttle = nn.Linear(128, 1)
        self.brake = nn.Linear(128, 1)

    def forward(self, camera, lidar, state):
        cam_feat = self.camera_encoder(camera)
        lid_feat = self.lidar_encoder(lidar)
        state_feat = self.state_encoder(state)
        combined = torch.cat([cam_feat, lid_feat, state_feat], dim=-1)
        features = self.decision(combined)
        steering = torch.tanh(self.steering(features))
        throttle = torch.sigmoid(self.throttle(features))
        brake = torch.sigmoid(self.brake(features))
        return steering, throttle, brake

Reward Design

def driving_reward(state, action, next_state):
    """Autonomous driving reward function"""
    reward = 0.0
    # Progress toward the goal: positive when distance_to_goal shrinks
    progress = state['distance_to_goal'] - next_state['distance_to_goal']
    reward += progress * 10.0
    # Stay centered in the lane
    lane_deviation = abs(next_state['lane_offset'])
    reward -= lane_deviation * 2.0
    # Track the speed limit (relative deviation)
    speed = next_state['speed']
    target_speed = max(next_state['speed_limit'], 1e-6)
    speed_diff = abs(speed - target_speed) / target_speed
    reward -= speed_diff * 1.0
    # Hard penalties for safety-critical events
    if next_state['collision']:
        reward -= 100.0
    if next_state['traffic_violation']:
        reward -= 50.0
    # Comfort: penalize jerk (rapid changes in acceleration)
    jerk = abs(action['acceleration_change'])
    reward -= jerk * 0.5
    return reward

Resource Management

Cloud Resource Scheduling

class ResourceScheduler(nn.Module):
    """Cloud resource scheduling RL agent"""

    def __init__(self, num_servers, num_job_types, num_actions):
        super().__init__()
        state_size = num_servers * 3 + num_job_types * 2 + 4
        self.net = nn.Sequential(
            nn.Linear(state_size, 256), nn.ReLU(),
            nn.Linear(256, 128), nn.ReLU(),
        )
        self.policy = nn.Linear(128, num_actions)
        self.value = nn.Linear(128, 1)

    def forward(self, state):
        features = self.net(state)
        return self.policy(features), self.value(features)

def resource_reward(state, action, next_state, config):
    """Resource management reward"""
    reward = 0.0
    reward += next_state['completed_jobs'] * config['throughput_weight']
    reward -= next_state['avg_latency'] * config['latency_weight']
    reward -= next_state['resource_cost'] * config['cost_weight']
    reward -= next_state['sla_violations'] * config['sla_penalty']
    return reward

Network Routing Optimization

class NetworkRouter(nn.Module):
    """Network traffic routing agent"""

    def __init__(self, num_nodes, num_links):
        super().__init__()
        state_size = num_nodes + num_links * 2
        self.encoder = nn.Sequential(
            nn.Linear(state_size, 128), nn.ReLU(),
            nn.Linear(128, 128), nn.ReLU(),
        )
        self.router = nn.Sequential(
            nn.Linear(128, num_links),
            nn.Softmax(dim=-1),
        )

    def forward(self, network_state):
        features = self.encoder(network_state)
        routing_weights = self.router(features)
        return routing_weights

Recommendation Systems

Applying RL to Sequential Recommendation

Modeling recommendation systems with RL enables optimization of long-term user satisfaction.

class RecommendationAgent(nn.Module):
    """RL-based recommendation agent"""

    def __init__(self, num_items, embed_dim=64, hidden_size=128):
        super().__init__()
        self.item_embedding = nn.Embedding(num_items, embed_dim)
        self.history_encoder = nn.GRU(embed_dim, hidden_size, batch_first=True)
        self.user_encoder = nn.Linear(20, 32)
        self.q_network = nn.Sequential(
            nn.Linear(hidden_size + 32, 256), nn.ReLU(),
            nn.Linear(256, num_items),
        )

    def forward(self, user_history, user_features):
        history_embedded = self.item_embedding(user_history)
        _, hidden = self.history_encoder(history_embedded)
        history_feature = hidden.squeeze(0)
        user_feature = self.user_encoder(user_features)
        combined = torch.cat([history_feature, user_feature], dim=-1)
        q_values = self.q_network(combined)
        return q_values

def recommendation_reward(user_response):
    """Immediate recommendation reward. Long-term satisfaction is not
    added here; it enters through the Q-function's discounted return."""
    if user_response == 'purchase':
        return 5.0
    if user_response == 'click':
        return 1.0
    if user_response == 'skip':
        return -0.1
    return 0.0
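The immediate reward above only captures short-term feedback; long-term satisfaction enters through the discounted return that the Q-network is trained against. A minimal tabular sketch of that update (item indices and values are illustrative):

```python
import numpy as np

def q_learning_update(q, item, reward, next_q, alpha=0.1, gamma=0.9):
    """One Q-learning step: gamma folds expected future engagement
    into the value of recommending `item` now."""
    target = reward + gamma * np.max(next_q)
    q[item] += alpha * (target - q[item])
    return q

q = np.zeros(3)
q = q_learning_update(q, item=1, reward=1.0, next_q=np.zeros(3))
```

With gamma near 1, a recommendation that merely earns a click but drives the user away scores lower than one that sustains the session.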

Natural Language Processing (NLP)

RLHF: Reinforcement Learning from Human Feedback

A core training technique for modern large language models (LLMs):

class RLHFTrainer:
    """Conceptual implementation of RLHF training"""

    def __init__(self, policy_model, reward_model, ref_model, kl_coef=0.1):
        self.policy = policy_model
        self.reward_model = reward_model
        self.ref_model = ref_model
        self.kl_coef = kl_coef
        self.optimizer = torch.optim.Adam(self.policy.parameters(), lr=1e-5)

    def compute_reward(self, prompt, response):
        """Reward = reward model score - KL penalty"""
        reward_score = self.reward_model.score(prompt, response)
        policy_logprobs = self.policy.log_prob(prompt, response)
        ref_logprobs = self.ref_model.log_prob(prompt, response)
        kl_penalty = policy_logprobs - ref_logprobs
        return reward_score - self.kl_coef * kl_penalty

    def train_step(self, prompts):
        """PPO-based RLHF training step"""
        responses = self.policy.generate(prompts)
        rewards = [self.compute_reward(p, r) for p, r in zip(prompts, responses)]
        # _ppo_update: standard clipped-surrogate PPO update (omitted here)
        self._ppo_update(prompts, responses, rewards)

Applying RL to Text Summarization

class SummarizationRL:
    """RL-based text summarization"""

    def __init__(self, model, rouge_weight=1.0, length_weight=0.1):
        self.model = model
        self.rouge_weight = rouge_weight
        self.length_weight = length_weight

    def reward_function(self, source, generated_summary, reference):
        """ROUGE score-based reward"""
        rouge_score = compute_rouge(generated_summary, reference)
        target_ratio = 0.3
        actual_ratio = len(generated_summary) / max(len(source), 1)
        length_penalty = -abs(actual_ratio - target_ratio)
        reward = (self.rouge_weight * rouge_score
                  + self.length_weight * length_penalty)
        return reward
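The `compute_rouge` call above is assumed; in practice a library such as `rouge-score` would be used. For self-containment, a minimal ROUGE-1 F1 sketch over whitespace tokens:

```python
from collections import Counter

def compute_rouge(generated, reference):
    """Minimal ROUGE-1 F1 over whitespace tokens
    (a stand-in for a proper ROUGE implementation)."""
    gen = Counter(generated.split())
    ref = Counter(reference.split())
    overlap = sum((gen & ref).values())  # clipped unigram matches
    if overlap == 0:
        return 0.0
    precision = overlap / sum(gen.values())
    recall = overlap / sum(ref.values())
    return 2 * precision * recall / (precision + recall)
```

Real ROUGE adds stemming, ROUGE-2/ROUGE-L variants, and bootstrap confidence intervals, so this sketch should only be read as the shape of the reward signal.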

Game AI (Beyond Atari)

OpenAI Five (Dota 2)

OpenAI Five defeated the world champion team in Dota 2, a 5v5 game. Key challenges:

  • Partial observability: Fog of war hides enemy positions
  • Long-term planning: Matches lasting over 45 minutes
  • Team cooperation: 5 agents must cooperate
  • Enormous action space: Thousands of possible actions
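Enormous action spaces are commonly handled with action masking: logits of currently invalid actions are set to negative infinity before the softmax, so the policy can never sample them. A minimal PyTorch sketch (the three-action space is illustrative):

```python
import torch

def masked_action_probs(logits, valid_mask):
    """Mask invalid actions before softmax.
    valid_mask: boolean tensor, True where the action is legal."""
    masked_logits = logits.masked_fill(~valid_mask, float('-inf'))
    return torch.softmax(masked_logits, dim=-1)

logits = torch.tensor([1.0, 2.0, 3.0])
mask = torch.tensor([True, False, True])
probs = masked_action_probs(logits, mask)  # probability of action 1 is 0
```

Masking at the logit level keeps the remaining probabilities properly normalized, unlike zeroing probabilities after the softmax.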

Multi-Agent Cooperative Learning

class MultiAgentPolicy(nn.Module):
    """Multi-agent cooperative policy (parameter sharing)"""

    def __init__(self, obs_size, act_size, num_agents, comm_size=32):
        super().__init__()
        self.num_agents = num_agents
        self.obs_encoder = nn.Sequential(nn.Linear(obs_size, 128), nn.ReLU())
        self.comm_encoder = nn.Sequential(nn.Linear(128, comm_size), nn.ReLU())
        self.message_integrator = nn.Sequential(
            nn.Linear(comm_size * (num_agents - 1), 64), nn.ReLU(),
        )
        self.policy = nn.Sequential(
            nn.Linear(128 + 64, 128), nn.ReLU(),
            nn.Linear(128, act_size),
        )

    def forward(self, observations):
        batch_size = observations.shape[0]
        obs_flat = observations.view(-1, observations.shape[-1])
        encoded = self.obs_encoder(obs_flat)
        encoded = encoded.view(batch_size, self.num_agents, -1)
        messages = self.comm_encoder(
            encoded.view(-1, encoded.shape[-1])
        ).view(batch_size, self.num_agents, -1)

        all_policies = []
        for i in range(self.num_agents):
            other_msgs = torch.cat(
                [messages[:, j] for j in range(self.num_agents) if j != i],
                dim=-1
            )
            integrated = self.message_integrator(other_msgs)
            combined = torch.cat([encoded[:, i], integrated], dim=-1)
            policy_logits = self.policy(combined)
            all_policies.append(policy_logits)
        return torch.stack(all_policies, dim=1)

Practical Considerations for Real-World Deployment

Common Challenges

  1. Reward design: Incorrect rewards can induce unintended behavior (reward hacking)
# Bad example: maximizing score alone may learn unethical strategies
reward = game_score

# Good example: balance multiple objectives
reward = (score_weight * game_score
          - safety_weight * violations
          + fairness_weight * equity_metric)
  2. Sample efficiency: Data collection in real environments is expensive
  3. Safety: Dangerous actions must be prevented during training
  4. Evaluation: Simulator performance does not necessarily guarantee real-world performance
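The safety challenge is often addressed with a "shield" that overrides or clips any proposed action violating hard constraints before it reaches the environment, so learning only ever happens over safe actions. A minimal sketch with a box constraint (the bounds are illustrative; real shields may use model-based reachability checks):

```python
import numpy as np

def safety_shield(action, low, high):
    """Clip the policy's proposed action into a verified-safe box
    before execution."""
    return np.clip(action, low, high)

safe = safety_shield(np.array([2.0, -3.0]), low=-1.0, high=1.0)
```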

Practical Deployment Checklist

Item               | What to Verify
Problem definition | Are state/action/reward clearly defined?
Simulator          | Is the simulator sufficiently realistic?
Baseline           | Compared with simple rule-based methods?
Safety             | Are safety constraints guaranteed during training?
Evaluation         | Tested across diverse scenarios?
Deployment         | Is real-time inference performance sufficient?
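The deployment item in the checklist can be verified with a simple wall-clock benchmark of the policy's forward pass; the tiny network below is a placeholder for the trained model:

```python
import time
import torch
import torch.nn as nn

def mean_inference_latency_ms(model, example_input, n_runs=100):
    """Average forward-pass latency in milliseconds (CPU, batch size 1)."""
    model.eval()
    with torch.no_grad():
        model(example_input)  # warm-up pass
        start = time.perf_counter()
        for _ in range(n_runs):
            model(example_input)
        elapsed = time.perf_counter() - start
    return elapsed / n_runs * 1000.0

policy = nn.Sequential(nn.Linear(16, 64), nn.ReLU(), nn.Linear(64, 4))
latency = mean_inference_latency_ms(policy, torch.zeros(1, 16))
```

For a real control loop the measured latency must fit well inside the control period (e.g. under 10 ms for a 100 Hz controller), measured on the actual deployment hardware.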

Key Takeaways

  • RL is broadly applied in robotics, autonomous driving, resource management, recommendations, NLP, and games
  • Domain-specific techniques like Sim-to-Real, RLHF, and multi-agent are important
  • Reward design, safety, and sample efficiency are the core challenges of practical deployment
  • Start with simple baselines and gradually increase complexity for practical applications

In the next post, we will provide a comprehensive summary and selection guide for deep RL algorithms as the final installment of this series.