Skip to content
Published on

[深層強化学習] 16. Black-Box最適化:進化戦略と遺伝的アルゴリズム

Authors

概要

これまで扱ってきた強化学習アルゴリズムはすべて勾配ベースでした。方策ネットワークのパラメータを逆伝播(backpropagation)で更新していました。しかし、報酬関数が微分不可能だったり、環境がブラックボックスだったりする場合はどうすればよいでしょうか?

Black-Box最適化は目的関数の内部構造を知らないまま、入出力関係のみで最適化する方法です。進化戦略(Evolution Strategies, ES)と遺伝的アルゴリズム(Genetic Algorithms, GA)が代表的です。


Black-Box手法の特徴

勾配ベース vs Black-Box

項目勾配ベースBlack-Box
微分の必要性目的関数が微分可能である必要不要
情報の活用勾配(方向 + 大きさ)関数値のみ使用
スケーラビリティ数百万パラメータ可能パラメータ数に敏感
並列化制限的(同期が必要)自然な並列化
探索の性質ローカル探索グローバル探索可能

長所

  • 報酬関数が微分不可能でも適用可能
  • 環境の内部構造を知る必要がない
  • 大規模並列化が自然
  • 局所最適に陥りにくい

短所

  • サンプル効率が低い
  • 高次元パラメータ空間で遅い
  • 勾配情報を活用できない

進化戦略(Evolution Strategies)

基本原理

ESはパラメータ空間で**ランダムな摂動(perturbation)**を作り、良い性能を示した摂動方向にパラメータを更新します。驚くべきことに、このプロセスが勾配推定と数学的に等価であることが明らかになっています。

ESの数学的直観

パラメータthetaにガウシアンノイズepsilonを加えたtheta + sigma * epsilonでの期待報酬を最大化します。この報酬の期待値に対するthetaの勾配をモンテカルロ推定すると、結局報酬で重み付けされたノイズ方向の平均になります。

CartPoleでのES実装

import torch
import torch.nn as nn
import numpy as np
import gymnasium as gym

class ESPolicy(nn.Module):
    """ES用方策ネットワーク"""
    def __init__(self, obs_size, act_size):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(obs_size, 64),
            nn.Tanh(),
            nn.Linear(64, act_size),
        )

    def forward(self, obs):
        return self.net(obs)

    def get_action(self, obs):
        with torch.no_grad():
            logits = self.forward(torch.FloatTensor(obs))
            return logits.argmax().item()

def evaluate_policy(env, policy, num_episodes=5):
    """方策の平均報酬を計算"""
    total_reward = 0.0
    for _ in range(num_episodes):
        obs, _ = env.reset()
        done = False
        while not done:
            action = policy.get_action(obs)
            obs, reward, terminated, truncated, _ = env.step(action)
            total_reward += reward
            done = terminated or truncated
    return total_reward / num_episodes

def train_es_cartpole(pop_size=50, num_generations=100,
                      sigma=0.1, lr=0.01):
    """CartPoleでのES学習"""
    env = gym.make('CartPole-v1')
    obs_size = env.observation_space.shape[0]
    act_size = env.action_space.n

    # 基本方策
    policy = ESPolicy(obs_size, act_size)
    base_params = nn.utils.parameters_to_vector(
        policy.parameters()
    ).detach()

    for gen in range(num_generations):
        # 1. ランダム摂動の生成
        noise = torch.randn(pop_size, len(base_params))

        # 2. 各摂動に対して報酬を評価
        rewards = []
        for i in range(pop_size):
            # 双方向摂動(ミラーリング)
            for sign in [1, -1]:
                perturbed = base_params + sign * sigma * noise[i]
                nn.utils.vector_to_parameters(perturbed, policy.parameters())
                reward = evaluate_policy(env, policy, num_episodes=3)
                rewards.append((reward, sign, i))

        # 3. 報酬で重み付けされた更新方向の計算
        rewards_array = np.array([r[0] for r in rewards])

        # 報酬正規化(ランクベース)
        ranked = np.argsort(np.argsort(-rewards_array)).astype(np.float32)
        ranked = (ranked - ranked.mean()) / (ranked.std() + 1e-8)

        # 勾配推定
        grad = torch.zeros_like(base_params)
        for idx, (_, sign, noise_idx) in enumerate(rewards):
            grad += ranked[idx] * sign * noise[noise_idx]
        grad /= (2 * pop_size * sigma)

        # 4. パラメータ更新
        base_params += lr * grad

        # 性能確認
        nn.utils.vector_to_parameters(base_params, policy.parameters())
        avg_reward = evaluate_policy(env, policy, num_episodes=10)
        if gen % 10 == 0:
            print(f"世代 {gen}: 平均報酬 = {avg_reward:.1f}")

        if avg_reward >= 495:
            print(f"世代 {gen}で解決!")
            break

    return policy

HalfCheetahでのES

連続行動空間でのESはより多くの集団(population)と世代(generation)が必要です:

class ESContinuousPolicy(nn.Module):
    def __init__(self, obs_size, act_size):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(obs_size, 256),
            nn.Tanh(),
            nn.Linear(256, 256),
            nn.Tanh(),
            nn.Linear(256, act_size),
            nn.Tanh(),
        )

    def forward(self, obs):
        return self.net(obs)

    def get_action(self, obs):
        with torch.no_grad():
            return self.forward(torch.FloatTensor(obs)).numpy()

def train_es_continuous(env_name='HalfCheetah-v4', pop_size=200,
                        num_generations=500, sigma=0.02, lr=0.01):
    """連続環境でのES学習(並列化可能)"""
    env = gym.make(env_name)
    obs_size = env.observation_space.shape[0]
    act_size = env.action_space.shape[0]

    policy = ESContinuousPolicy(obs_size, act_size)
    base_params = nn.utils.parameters_to_vector(
        policy.parameters()
    ).detach()

    # Adamモメンタム追跡
    m = torch.zeros_like(base_params)  # 1次モーメント
    v = torch.zeros_like(base_params)  # 2次モーメント
    beta1, beta2 = 0.9, 0.999

    for gen in range(num_generations):
        noise = torch.randn(pop_size, len(base_params))
        rewards_pos = np.zeros(pop_size)
        rewards_neg = np.zeros(pop_size)

        # 並列化可能な評価ループ
        for i in range(pop_size):
            # 双方向摂動の評価
            params_pos = base_params + sigma * noise[i]
            nn.utils.vector_to_parameters(params_pos, policy.parameters())
            rewards_pos[i] = evaluate_policy(env, policy, num_episodes=1)

            params_neg = base_params - sigma * noise[i]
            nn.utils.vector_to_parameters(params_neg, policy.parameters())
            rewards_neg[i] = evaluate_policy(env, policy, num_episodes=1)

        # 報酬差から勾配推定
        all_rewards = np.concatenate([rewards_pos, rewards_neg])
        reward_mean = all_rewards.mean()
        reward_std = all_rewards.std() + 1e-8

        grad = torch.zeros_like(base_params)
        for i in range(pop_size):
            normalized = (rewards_pos[i] - rewards_neg[i]) / reward_std
            grad += normalized * noise[i]
        grad /= (2 * pop_size * sigma)

        # Adam更新
        m = beta1 * m + (1 - beta1) * grad
        v = beta2 * v + (1 - beta2) * grad ** 2
        m_hat = m / (1 - beta1 ** (gen + 1))
        v_hat = v / (1 - beta2 ** (gen + 1))

        base_params += lr * m_hat / (torch.sqrt(v_hat) + 1e-8)

        nn.utils.vector_to_parameters(base_params, policy.parameters())
        if gen % 10 == 0:
            avg = evaluate_policy(env, policy, num_episodes=5)
            print(f"世代 {gen}: 平均報酬 = {avg:.1f}")

    return policy

遺伝的アルゴリズム(Genetic Algorithm)

GAの基本原理

遺伝的アルゴリズムは生物学的進化を模倣します:

  1. 集団(Population):複数の方策(個体)を維持
  2. 適応度評価(Fitness):各個体の報酬を測定
  3. 選択(Selection):適応度が高い個体を選択
  4. 交叉(Crossover):選択された個体のパラメータを混合
  5. 突然変異(Mutation):少量のランダムな変化を追加

CartPoleでのGA実装

import copy

def train_ga_cartpole(pop_size=100, num_generations=50,
                      elite_ratio=0.1, mutation_rate=0.1,
                      mutation_sigma=0.1):
    """遺伝的アルゴリズムでCartPoleを学習"""
    env = gym.make('CartPole-v1')
    obs_size = env.observation_space.shape[0]
    act_size = env.action_space.n

    # 初期集団の生成
    population = [ESPolicy(obs_size, act_size) for _ in range(pop_size)]
    num_elite = max(int(pop_size * elite_ratio), 1)

    for gen in range(num_generations):
        # 1. 適応度評価
        fitness = []
        for individual in population:
            reward = evaluate_policy(env, individual, num_episodes=5)
            fitness.append(reward)

        # ソート(適応度の降順)
        sorted_indices = np.argsort(fitness)[::-1]
        best_fitness = fitness[sorted_indices[0]]

        if gen % 5 == 0:
            print(f"世代 {gen}: 最高適応度 = {best_fitness:.1f}, "
                  f"平均 = {np.mean(fitness):.1f}")

        if best_fitness >= 495:
            print(f"世代 {gen}で解決!")
            return population[sorted_indices[0]]

        # 2. エリート選択
        elites = [copy.deepcopy(population[i]) for i in sorted_indices[:num_elite]]

        # 3. 次世代の生成
        new_population = list(elites)  # エリートはそのまま維持

        while len(new_population) < pop_size:
            # トーナメント選択
            parent1 = tournament_select(population, fitness)
            parent2 = tournament_select(population, fitness)

            # 交叉
            child = crossover(parent1, parent2)

            # 突然変異
            mutate(child, mutation_rate, mutation_sigma)

            new_population.append(child)

        population = new_population

    return population[sorted_indices[0]]

def tournament_select(population, fitness, k=3):
    """トーナメント選択:k個中最高適応度を選択"""
    indices = np.random.choice(len(population), k, replace=False)
    best_idx = indices[np.argmax([fitness[i] for i in indices])]
    return copy.deepcopy(population[best_idx])

def crossover(parent1, parent2):
    """一様交叉:各パラメータを50%の確率で選択"""
    child = copy.deepcopy(parent1)
    for c_param, p2_param in zip(child.parameters(), parent2.parameters()):
        mask = torch.rand_like(c_param) > 0.5
        c_param.data[mask] = p2_param.data[mask]
    return child

def mutate(individual, rate=0.1, sigma=0.1):
    """ガウシアン突然変異"""
    for param in individual.parameters():
        mask = torch.rand_like(param) < rate
        noise = torch.randn_like(param) * sigma
        param.data += mask.float() * noise

GA改善技法

Deep GA

Uber AI LabsのDeep GAは大規模ニューラルネットワークにGAを適用するための技法です:

class DeepGA:
    """Deep GA: パラメータエンコーディングでメモリ効率的なGA"""

    def __init__(self, policy_fn, pop_size=1000, seed_size=5):
        self.pop_size = pop_size
        self.policy_fn = policy_fn

        # 各個体をシードシーケンスでエンコード(メモリ節約)
        self.population = []
        for _ in range(pop_size):
            seeds = [np.random.randint(0, 2**31) for _ in range(seed_size)]
            self.population.append(seeds)

    def decode(self, seed_sequence):
        """シードシーケンスから方策パラメータを復元"""
        policy = self.policy_fn()
        base_params = nn.utils.parameters_to_vector(
            policy.parameters()
        ).detach()

        params = torch.zeros_like(base_params)
        for seed in seed_sequence:
            rng = np.random.RandomState(seed)
            noise = torch.FloatTensor(
                rng.randn(len(base_params))
            ) * 0.01
            params += noise

        nn.utils.vector_to_parameters(params, policy.parameters())
        return policy

    def evolve(self, fitness_fn, num_generations=100):
        for gen in range(num_generations):
            # 適応度評価
            fitness = []
            for seeds in self.population:
                policy = self.decode(seeds)
                fitness.append(fitness_fn(policy))

            # 突然変異:シードを1つ追加
            sorted_idx = np.argsort(fitness)[::-1]
            elites = [self.population[i] for i in sorted_idx[:10]]

            new_pop = [list(e) for e in elites]
            while len(new_pop) < self.pop_size:
                parent = elites[np.random.randint(len(elites))]
                child = list(parent) + [np.random.randint(0, 2**31)]
                new_pop.append(child)

            self.population = new_pop

            if gen % 10 == 0:
                print(f"世代 {gen}: 最高 = {max(fitness):.1f}")

適応度の代わりに行動の新規性を基準に選択する方法です。新しい行動パターンを示す個体が好まれ、多様な戦略を探索します:

class NoveltySearch:
    """新規性探索:行動の多様性で個体を選択"""

    def __init__(self, archive_size=1000, k_nearest=15):
        self.archive = []
        self.archive_size = archive_size
        self.k = k_nearest

    def behavior_characterization(self, trajectory):
        """エージェントの行動を特性ベクトルに要約"""
        # 例:最終位置、訪問状態の統計など
        if len(trajectory) == 0:
            return np.zeros(4)

        states = np.array(trajectory)
        return np.array([
            states[:, 0].mean(),  # 平均x位置
            states[:, 1].mean(),  # 平均y位置
            states[:, 0].std(),   # x分散
            states[:, 1].std(),   # y分散
        ])

    def novelty_score(self, behavior):
        """k最近傍との平均距離 = 新規性スコア"""
        if len(self.archive) < self.k:
            return float('inf')

        distances = [np.linalg.norm(behavior - b)
                     for b in self.archive]
        distances.sort()
        return np.mean(distances[:self.k])

    def add_to_archive(self, behavior):
        self.archive.append(behavior)
        if len(self.archive) > self.archive_size:
            # ランダムに1つ除去
            idx = np.random.randint(len(self.archive))
            self.archive.pop(idx)

ES vs GAの比較

項目進化戦略(ES)遺伝的アルゴリズム(GA)
更新方式勾配推定(連続的)選択 + 交叉 + 突然変異(離散的)
数学的基盤正規分布ベースの最適化生物学的進化の模倣
集団の多様性単一分布からサンプリング複数の個体が独立に存在
並列化非常に簡単簡単
大規模パラメータ比較的良くスケール交叉演算が非効率的

要点まとめ

  • Black-Box最適化は勾配なしで報酬信号のみで方策を最適化する
  • ESはランダム摂動の報酬から勾配を推定し、大規模並列化に有利
  • GAは選択、交叉、突然変異を通じた集団ベースの最適化を行う
  • Deep GAと新規性探索でGAの限界を克服できる
  • 勾配ベース手法よりサンプル効率は低いが、特殊な環境では有用

次の記事では、環境モデルを学習して活用するモデルベース強化学習を見ていきます。