Skip to content
Published on

[深層強化学習] 19. 深層強化学習の実践的な応用事例

Authors

概要

深層強化学習はAtariゲームや囲碁を超えて多様な実践領域で活用されています。この記事では、ロボット制御、自律走行、リソース管理、推薦システム、自然言語処理、ゲームAIなどでの応用事例と実践適用時の考慮事項を整理します。


ロボット制御(Robotics)

シミュレーションから現実へ(Sim-to-Real)

ロボット強化学習の最大の課題はシミュレーションと現実のギャップ(sim-to-real gap)です。シミュレータで学習した方策が実際のロボットでは動作しない可能性があります。

import torch
import torch.nn as nn
import numpy as np

class DomainRandomization:
    """ドメインランダム化:シミュレーションパラメータをランダムに変更して
    現実環境に対する汎化能力を向上"""

    def __init__(self, base_env):
        self.base_env = base_env

    def randomize(self):
        """物理パラメータをランダム化"""
        params = {
            'friction': np.random.uniform(0.5, 1.5),
            'mass_scale': np.random.uniform(0.8, 1.2),
            'gravity': np.random.uniform(9.5, 10.1),
            'actuator_noise': np.random.uniform(0.0, 0.05),
            'sensor_noise': np.random.uniform(0.0, 0.02),
        }
        self.base_env.set_physics_params(params)
        return params

    def step_with_randomization(self, action):
        """ノイズが追加された環境ステップ"""
        noisy_action = action + np.random.normal(
            0, self.current_params['actuator_noise'],
            size=action.shape
        )
        obs, reward, done, info = self.base_env.step(noisy_action)
        noisy_obs = obs + np.random.normal(
            0, self.current_params['sensor_noise'],
            size=obs.shape
        )
        return noisy_obs, reward, done, info

ロボットマニピュレーション学習

class RobotGraspingPolicy(nn.Module):
    """ロボット把持(grasping)方策"""

    def __init__(self, image_size=84, proprioception_size=7, action_size=4):
        super().__init__()
        self.vision = nn.Sequential(
            nn.Conv2d(3, 32, 8, stride=4), nn.ReLU(),
            nn.Conv2d(32, 64, 4, stride=2), nn.ReLU(),
            nn.Conv2d(64, 64, 3, stride=1), nn.ReLU(),
            nn.Flatten(),
        )
        with torch.no_grad():
            dummy = torch.zeros(1, 3, image_size, image_size)
            vision_size = self.vision(dummy).shape[1]
        self.policy = nn.Sequential(
            nn.Linear(vision_size + proprioception_size, 256), nn.ReLU(),
            nn.Linear(256, 128), nn.ReLU(),
        )
        self.mu = nn.Linear(128, action_size)
        self.log_std = nn.Parameter(torch.zeros(action_size))

    def forward(self, image, proprioception):
        vision_features = self.vision(image)
        combined = torch.cat([vision_features, proprioception], dim=-1)
        features = self.policy(combined)
        mu = self.mu(features)
        std = self.log_std.exp()
        return mu, std

核心的な課題

  • 安全性:実際のロボットは危険な行動を試すことができない
  • サンプル効率:実環境でのデータ収集が遅くコストがかかる
  • 報酬設計:複雑な操作タスクの報酬を定義するのが難しい

自律走行(Autonomous Driving)

RLベースの自律走行アーキテクチャ

class DrivingPolicy(nn.Module):
    """自律走行方策ネットワーク"""

    def __init__(self):
        super().__init__()
        self.camera_encoder = nn.Sequential(
            nn.Conv2d(3, 32, 5, stride=2), nn.ReLU(),
            nn.Conv2d(32, 64, 3, stride=2), nn.ReLU(),
            nn.Flatten(),
        )
        self.lidar_encoder = nn.Sequential(
            nn.Linear(360, 128), nn.ReLU(),
            nn.Linear(128, 64), nn.ReLU(),
        )
        self.state_encoder = nn.Sequential(nn.Linear(10, 32), nn.ReLU())
        self.decision = nn.Sequential(
            nn.Linear(256 + 64 + 32, 256), nn.ReLU(),
            nn.Linear(256, 128), nn.ReLU(),
        )
        self.steering = nn.Linear(128, 1)
        self.throttle = nn.Linear(128, 1)
        self.brake = nn.Linear(128, 1)

    def forward(self, camera, lidar, state):
        cam_feat = self.camera_encoder(camera)
        lid_feat = self.lidar_encoder(lidar)
        state_feat = self.state_encoder(state)
        combined = torch.cat([cam_feat, lid_feat, state_feat], dim=-1)
        features = self.decision(combined)
        return (torch.tanh(self.steering(features)),
                torch.sigmoid(self.throttle(features)),
                torch.sigmoid(self.brake(features)))

報酬設計

def driving_reward(state, action, next_state):
    """自律走行報酬関数"""
    reward = 0.0
    progress = next_state['distance_to_goal'] - state['distance_to_goal']
    reward += -progress * 10.0
    lane_deviation = abs(next_state['lane_offset'])
    reward -= lane_deviation * 2.0
    speed = next_state['speed']
    target_speed = next_state['speed_limit']
    speed_diff = abs(speed - target_speed) / target_speed
    reward -= speed_diff * 1.0
    if next_state['collision']:
        reward -= 100.0
    if next_state['traffic_violation']:
        reward -= 50.0
    jerk = abs(action['acceleration_change'])
    reward -= jerk * 0.5
    return reward

リソース管理(Resource Management)

クラウドリソーススケジューリング

class ResourceScheduler(nn.Module):
    """クラウドリソーススケジューリングRLエージェント"""

    def __init__(self, num_servers, num_job_types, num_actions):
        super().__init__()
        state_size = num_servers * 3 + num_job_types * 2 + 4
        self.net = nn.Sequential(
            nn.Linear(state_size, 256), nn.ReLU(),
            nn.Linear(256, 128), nn.ReLU(),
        )
        self.policy = nn.Linear(128, num_actions)
        self.value = nn.Linear(128, 1)

    def forward(self, state):
        features = self.net(state)
        return self.policy(features), self.value(features)

def resource_reward(state, action, next_state, config):
    """リソース管理報酬"""
    reward = 0.0
    reward += next_state['completed_jobs'] * config['throughput_weight']
    reward -= next_state['avg_latency'] * config['latency_weight']
    reward -= next_state['resource_cost'] * config['cost_weight']
    reward -= next_state['sla_violations'] * config['sla_penalty']
    return reward

ネットワークルーティング最適化

class NetworkRouter(nn.Module):
    """ネットワークトラフィックルーティングエージェント"""

    def __init__(self, num_nodes, num_links):
        super().__init__()
        state_size = num_nodes + num_links * 2
        self.encoder = nn.Sequential(
            nn.Linear(state_size, 128), nn.ReLU(),
            nn.Linear(128, 128), nn.ReLU(),
        )
        self.router = nn.Sequential(
            nn.Linear(128, num_links), nn.Softmax(dim=-1),
        )

    def forward(self, network_state):
        features = self.encoder(network_state)
        return self.router(features)

推薦システム(Recommendation Systems)

逐次推薦にRLを適用

推薦システムを強化学習でモデリングすると長期的なユーザー満足度を最適化できます。

class RecommendationAgent(nn.Module):
    """RLベースの推薦エージェント"""

    def __init__(self, num_items, embed_dim=64, hidden_size=128):
        super().__init__()
        self.item_embedding = nn.Embedding(num_items, embed_dim)
        self.history_encoder = nn.GRU(embed_dim, hidden_size, batch_first=True)
        self.user_encoder = nn.Linear(20, 32)
        self.q_network = nn.Sequential(
            nn.Linear(hidden_size + 32, 256), nn.ReLU(),
            nn.Linear(256, num_items),
        )

    def forward(self, user_history, user_features):
        history_embedded = self.item_embedding(user_history)
        _, hidden = self.history_encoder(history_embedded)
        history_feature = hidden.squeeze(0)
        user_feature = self.user_encoder(user_features)
        combined = torch.cat([history_feature, user_feature], dim=-1)
        return self.q_network(combined)

def recommendation_reward(user_response, item_type):
    """推薦報酬:短期 + 長期"""
    immediate = 0.0
    if user_response == 'click':
        immediate += 1.0
    elif user_response == 'purchase':
        immediate += 5.0
    elif user_response == 'skip':
        immediate -= 0.1
    return immediate

自然言語処理(NLP)

RLHF:人間フィードバックによる強化学習

現代の大規模言語モデル(LLM)の核心的な学習技法です:

class RLHFTrainer:
    """RLHF学習の概念的実装"""

    def __init__(self, policy_model, reward_model, ref_model, kl_coef=0.1):
        self.policy = policy_model
        self.reward_model = reward_model
        self.ref_model = ref_model
        self.kl_coef = kl_coef
        self.optimizer = torch.optim.Adam(self.policy.parameters(), lr=1e-5)

    def compute_reward(self, prompt, response):
        """報酬 = 報酬モデルスコア - KLペナルティ"""
        reward_score = self.reward_model.score(prompt, response)
        policy_logprobs = self.policy.log_prob(prompt, response)
        ref_logprobs = self.ref_model.log_prob(prompt, response)
        kl_penalty = policy_logprobs - ref_logprobs
        return reward_score - self.kl_coef * kl_penalty

    def train_step(self, prompts):
        """PPOベースRLHF学習ステップ"""
        responses = self.policy.generate(prompts)
        rewards = [self.compute_reward(p, r) for p, r in zip(prompts, responses)]
        self._ppo_update(prompts, responses, rewards)

テキスト要約にRLを適用

class SummarizationRL:
    """強化学習ベースのテキスト要約"""

    def __init__(self, model, rouge_weight=1.0, length_weight=0.1):
        self.model = model
        self.rouge_weight = rouge_weight
        self.length_weight = length_weight

    def reward_function(self, source, generated_summary, reference):
        """ROUGEスコアベースの報酬"""
        rouge_score = compute_rouge(generated_summary, reference)
        target_ratio = 0.3
        actual_ratio = len(generated_summary) / max(len(source), 1)
        length_penalty = -abs(actual_ratio - target_ratio)
        return (self.rouge_weight * rouge_score
                + self.length_weight * length_penalty)

ゲームAI(Beyond Atari)

OpenAI Five(Dota 2)

OpenAI Fiveは5人1チームのDota 2で世界チャンピオンチームを破りました。主な課題:

  • 部分観測:戦場の霧(fog of war)で敵の位置が分からない
  • 長期計画:45分以上の長い試合
  • チーム協力:5人のエージェントが協力する必要がある
  • 巨大な行動空間:数千個の可能な行動

マルチエージェント協力学習

class MultiAgentPolicy(nn.Module):
    """マルチエージェント協力方策(パラメータ共有)"""

    def __init__(self, obs_size, act_size, num_agents, comm_size=32):
        super().__init__()
        self.num_agents = num_agents
        self.obs_encoder = nn.Sequential(nn.Linear(obs_size, 128), nn.ReLU())
        self.comm_encoder = nn.Sequential(nn.Linear(128, comm_size), nn.ReLU())
        self.message_integrator = nn.Sequential(
            nn.Linear(comm_size * (num_agents - 1), 64), nn.ReLU(),
        )
        self.policy = nn.Sequential(
            nn.Linear(128 + 64, 128), nn.ReLU(),
            nn.Linear(128, act_size),
        )

    def forward(self, observations):
        batch_size = observations.shape[0]
        obs_flat = observations.view(-1, observations.shape[-1])
        encoded = self.obs_encoder(obs_flat)
        encoded = encoded.view(batch_size, self.num_agents, -1)
        messages = self.comm_encoder(
            encoded.view(-1, encoded.shape[-1])
        ).view(batch_size, self.num_agents, -1)

        all_policies = []
        for i in range(self.num_agents):
            other_msgs = torch.cat(
                [messages[:, j] for j in range(self.num_agents) if j != i],
                dim=-1
            )
            integrated = self.message_integrator(other_msgs)
            combined = torch.cat([encoded[:, i], integrated], dim=-1)
            all_policies.append(self.policy(combined))
        return torch.stack(all_policies, dim=1)

実践適用時の考慮事項

共通の課題

  1. 報酬設計:間違った報酬は意図しない行動を誘発する(reward hacking)
# 悪い例:スコアのみ最大化すると非倫理的な戦略を学習する可能性
reward = game_score

# 良い例:多様な目標をバランスよく反映
reward = (score_weight * game_score
          - safety_weight * violations
          + fairness_weight * equity_metric)
  1. サンプル効率:実環境でのデータ収集にコストがかかる
  2. 安全性:学習中の危険な行動を防止する必要がある
  3. 評価:シミュレータの性能が必ずしも実践性能を保証しない

実践適用チェックリスト

項目確認内容
問題定義状態/行動/報酬が明確に定義されているか
シミュレータ十分に現実的なシミュレータがあるか
ベースライン簡単なルールベース手法と比較したか
安全性学習中の安全制約が保証されるか
評価多様なシナリオでテストしたか
デプロイリアルタイム推論性能は十分か

要点まとめ

  • 強化学習はロボット制御、自律走行、リソース管理、推薦、NLP、ゲームなど幅広く活用される
  • Sim-to-Real、RLHF、マルチエージェントなどドメイン別の特化技法が重要
  • 報酬設計、安全性、サンプル効率が実践適用の核心課題
  • 実践適用時はシンプルなベースラインから始めて段階的に複雑度を高めるのが良い

次の記事では、このシリーズの最後として深層強化学習アルゴリズムの総整理と選択ガイドを扱います。