- Authors

- Name
- Youngju Kim
- @fjvbn20031
概要
これまで扱ってきた強化学習アルゴリズムはすべて勾配ベースでした。方策ネットワークのパラメータを逆伝播(backpropagation)で更新していました。しかし、報酬関数が微分不可能だったり、環境がブラックボックスだったりする場合はどうすればよいでしょうか?
Black-Box最適化は目的関数の内部構造を知らないまま、入出力関係のみで最適化する方法です。進化戦略(Evolution Strategies, ES)と遺伝的アルゴリズム(Genetic Algorithms, GA)が代表的です。
Black-Box手法の特徴
勾配ベース vs Black-Box
| 項目 | 勾配ベース | Black-Box |
|---|---|---|
| 微分の必要性 | 目的関数が微分可能である必要 | 不要 |
| 情報の活用 | 勾配(方向 + 大きさ) | 関数値のみ使用 |
| スケーラビリティ | 数百万パラメータ可能 | パラメータ数に敏感 |
| 並列化 | 制限的(同期が必要) | 自然な並列化 |
| 探索の性質 | ローカル探索 | グローバル探索可能 |
長所
- 報酬関数が微分不可能でも適用可能
- 環境の内部構造を知る必要がない
- 大規模並列化が自然
- 局所最適に陥りにくい
短所
- サンプル効率が低い
- 高次元パラメータ空間で遅い
- 勾配情報を活用できない
進化戦略(Evolution Strategies)
基本原理
ESはパラメータ空間で**ランダムな摂動(perturbation)**を作り、良い性能を示した摂動方向にパラメータを更新します。驚くべきことに、このプロセスが勾配推定と数学的に等価であることが明らかになっています。
ESの数学的直観
パラメータthetaにガウシアンノイズepsilonを加えたtheta + sigma * epsilonでの期待報酬を最大化します。この報酬の期待値に対するthetaの勾配をモンテカルロ推定すると、結局報酬で重み付けされたノイズ方向の平均になります。
CartPoleでのES実装
import torch
import torch.nn as nn
import numpy as np
import gymnasium as gym
class ESPolicy(nn.Module):
"""ES用方策ネットワーク"""
def __init__(self, obs_size, act_size):
super().__init__()
self.net = nn.Sequential(
nn.Linear(obs_size, 64),
nn.Tanh(),
nn.Linear(64, act_size),
)
def forward(self, obs):
return self.net(obs)
def get_action(self, obs):
with torch.no_grad():
logits = self.forward(torch.FloatTensor(obs))
return logits.argmax().item()
def evaluate_policy(env, policy, num_episodes=5):
"""方策の平均報酬を計算"""
total_reward = 0.0
for _ in range(num_episodes):
obs, _ = env.reset()
done = False
while not done:
action = policy.get_action(obs)
obs, reward, terminated, truncated, _ = env.step(action)
total_reward += reward
done = terminated or truncated
return total_reward / num_episodes
def train_es_cartpole(pop_size=50, num_generations=100,
sigma=0.1, lr=0.01):
"""CartPoleでのES学習"""
env = gym.make('CartPole-v1')
obs_size = env.observation_space.shape[0]
act_size = env.action_space.n
# 基本方策
policy = ESPolicy(obs_size, act_size)
base_params = nn.utils.parameters_to_vector(
policy.parameters()
).detach()
for gen in range(num_generations):
# 1. ランダム摂動の生成
noise = torch.randn(pop_size, len(base_params))
# 2. 各摂動に対して報酬を評価
rewards = []
for i in range(pop_size):
# 双方向摂動(ミラーリング)
for sign in [1, -1]:
perturbed = base_params + sign * sigma * noise[i]
nn.utils.vector_to_parameters(perturbed, policy.parameters())
reward = evaluate_policy(env, policy, num_episodes=3)
rewards.append((reward, sign, i))
# 3. 報酬で重み付けされた更新方向の計算
rewards_array = np.array([r[0] for r in rewards])
# 報酬正規化(ランクベース)
ranked = np.argsort(np.argsort(-rewards_array)).astype(np.float32)
ranked = (ranked - ranked.mean()) / (ranked.std() + 1e-8)
# 勾配推定
grad = torch.zeros_like(base_params)
for idx, (_, sign, noise_idx) in enumerate(rewards):
grad += ranked[idx] * sign * noise[noise_idx]
grad /= (2 * pop_size * sigma)
# 4. パラメータ更新
base_params += lr * grad
# 性能確認
nn.utils.vector_to_parameters(base_params, policy.parameters())
avg_reward = evaluate_policy(env, policy, num_episodes=10)
if gen % 10 == 0:
print(f"世代 {gen}: 平均報酬 = {avg_reward:.1f}")
if avg_reward >= 495:
print(f"世代 {gen}で解決!")
break
return policy
HalfCheetahでのES
連続行動空間でのESはより多くの集団(population)と世代(generation)が必要です:
class ESContinuousPolicy(nn.Module):
def __init__(self, obs_size, act_size):
super().__init__()
self.net = nn.Sequential(
nn.Linear(obs_size, 256),
nn.Tanh(),
nn.Linear(256, 256),
nn.Tanh(),
nn.Linear(256, act_size),
nn.Tanh(),
)
def forward(self, obs):
return self.net(obs)
def get_action(self, obs):
with torch.no_grad():
return self.forward(torch.FloatTensor(obs)).numpy()
def train_es_continuous(env_name='HalfCheetah-v4', pop_size=200,
num_generations=500, sigma=0.02, lr=0.01):
"""連続環境でのES学習(並列化可能)"""
env = gym.make(env_name)
obs_size = env.observation_space.shape[0]
act_size = env.action_space.shape[0]
policy = ESContinuousPolicy(obs_size, act_size)
base_params = nn.utils.parameters_to_vector(
policy.parameters()
).detach()
# Adamモメンタム追跡
m = torch.zeros_like(base_params) # 1次モーメント
v = torch.zeros_like(base_params) # 2次モーメント
beta1, beta2 = 0.9, 0.999
for gen in range(num_generations):
noise = torch.randn(pop_size, len(base_params))
rewards_pos = np.zeros(pop_size)
rewards_neg = np.zeros(pop_size)
# 並列化可能な評価ループ
for i in range(pop_size):
# 双方向摂動の評価
params_pos = base_params + sigma * noise[i]
nn.utils.vector_to_parameters(params_pos, policy.parameters())
rewards_pos[i] = evaluate_policy(env, policy, num_episodes=1)
params_neg = base_params - sigma * noise[i]
nn.utils.vector_to_parameters(params_neg, policy.parameters())
rewards_neg[i] = evaluate_policy(env, policy, num_episodes=1)
# 報酬差から勾配推定
all_rewards = np.concatenate([rewards_pos, rewards_neg])
reward_mean = all_rewards.mean()
reward_std = all_rewards.std() + 1e-8
grad = torch.zeros_like(base_params)
for i in range(pop_size):
normalized = (rewards_pos[i] - rewards_neg[i]) / reward_std
grad += normalized * noise[i]
grad /= (2 * pop_size * sigma)
# Adam更新
m = beta1 * m + (1 - beta1) * grad
v = beta2 * v + (1 - beta2) * grad ** 2
m_hat = m / (1 - beta1 ** (gen + 1))
v_hat = v / (1 - beta2 ** (gen + 1))
base_params += lr * m_hat / (torch.sqrt(v_hat) + 1e-8)
nn.utils.vector_to_parameters(base_params, policy.parameters())
if gen % 10 == 0:
avg = evaluate_policy(env, policy, num_episodes=5)
print(f"世代 {gen}: 平均報酬 = {avg:.1f}")
return policy
遺伝的アルゴリズム(Genetic Algorithm)
GAの基本原理
遺伝的アルゴリズムは生物学的進化を模倣します:
- 集団(Population):複数の方策(個体)を維持
- 適応度評価(Fitness):各個体の報酬を測定
- 選択(Selection):適応度が高い個体を選択
- 交叉(Crossover):選択された個体のパラメータを混合
- 突然変異(Mutation):少量のランダムな変化を追加
CartPoleでのGA実装
import copy
def train_ga_cartpole(pop_size=100, num_generations=50,
elite_ratio=0.1, mutation_rate=0.1,
mutation_sigma=0.1):
"""遺伝的アルゴリズムでCartPoleを学習"""
env = gym.make('CartPole-v1')
obs_size = env.observation_space.shape[0]
act_size = env.action_space.n
# 初期集団の生成
population = [ESPolicy(obs_size, act_size) for _ in range(pop_size)]
num_elite = max(int(pop_size * elite_ratio), 1)
for gen in range(num_generations):
# 1. 適応度評価
fitness = []
for individual in population:
reward = evaluate_policy(env, individual, num_episodes=5)
fitness.append(reward)
# ソート(適応度の降順)
sorted_indices = np.argsort(fitness)[::-1]
best_fitness = fitness[sorted_indices[0]]
if gen % 5 == 0:
print(f"世代 {gen}: 最高適応度 = {best_fitness:.1f}, "
f"平均 = {np.mean(fitness):.1f}")
if best_fitness >= 495:
print(f"世代 {gen}で解決!")
return population[sorted_indices[0]]
# 2. エリート選択
elites = [copy.deepcopy(population[i]) for i in sorted_indices[:num_elite]]
# 3. 次世代の生成
new_population = list(elites) # エリートはそのまま維持
while len(new_population) < pop_size:
# トーナメント選択
parent1 = tournament_select(population, fitness)
parent2 = tournament_select(population, fitness)
# 交叉
child = crossover(parent1, parent2)
# 突然変異
mutate(child, mutation_rate, mutation_sigma)
new_population.append(child)
population = new_population
return population[sorted_indices[0]]
def tournament_select(population, fitness, k=3):
"""トーナメント選択:k個中最高適応度を選択"""
indices = np.random.choice(len(population), k, replace=False)
best_idx = indices[np.argmax([fitness[i] for i in indices])]
return copy.deepcopy(population[best_idx])
def crossover(parent1, parent2):
"""一様交叉:各パラメータを50%の確率で選択"""
child = copy.deepcopy(parent1)
for c_param, p2_param in zip(child.parameters(), parent2.parameters()):
mask = torch.rand_like(c_param) > 0.5
c_param.data[mask] = p2_param.data[mask]
return child
def mutate(individual, rate=0.1, sigma=0.1):
"""ガウシアン突然変異"""
for param in individual.parameters():
mask = torch.rand_like(param) < rate
noise = torch.randn_like(param) * sigma
param.data += mask.float() * noise
GA改善技法
Deep GA
Uber AI LabsのDeep GAは大規模ニューラルネットワークにGAを適用するための技法です:
class DeepGA:
"""Deep GA: パラメータエンコーディングでメモリ効率的なGA"""
def __init__(self, policy_fn, pop_size=1000, seed_size=5):
self.pop_size = pop_size
self.policy_fn = policy_fn
# 各個体をシードシーケンスでエンコード(メモリ節約)
self.population = []
for _ in range(pop_size):
seeds = [np.random.randint(0, 2**31) for _ in range(seed_size)]
self.population.append(seeds)
def decode(self, seed_sequence):
"""シードシーケンスから方策パラメータを復元"""
policy = self.policy_fn()
base_params = nn.utils.parameters_to_vector(
policy.parameters()
).detach()
params = torch.zeros_like(base_params)
for seed in seed_sequence:
rng = np.random.RandomState(seed)
noise = torch.FloatTensor(
rng.randn(len(base_params))
) * 0.01
params += noise
nn.utils.vector_to_parameters(params, policy.parameters())
return policy
def evolve(self, fitness_fn, num_generations=100):
for gen in range(num_generations):
# 適応度評価
fitness = []
for seeds in self.population:
policy = self.decode(seeds)
fitness.append(fitness_fn(policy))
# 突然変異:シードを1つ追加
sorted_idx = np.argsort(fitness)[::-1]
elites = [self.population[i] for i in sorted_idx[:10]]
new_pop = [list(e) for e in elites]
while len(new_pop) < self.pop_size:
parent = elites[np.random.randint(len(elites))]
child = list(parent) + [np.random.randint(0, 2**31)]
new_pop.append(child)
self.population = new_pop
if gen % 10 == 0:
print(f"世代 {gen}: 最高 = {max(fitness):.1f}")
新規性探索(Novelty Search)
適応度の代わりに行動の新規性を基準に選択する方法です。新しい行動パターンを示す個体が好まれ、多様な戦略を探索します:
class NoveltySearch:
"""新規性探索:行動の多様性で個体を選択"""
def __init__(self, archive_size=1000, k_nearest=15):
self.archive = []
self.archive_size = archive_size
self.k = k_nearest
def behavior_characterization(self, trajectory):
"""エージェントの行動を特性ベクトルに要約"""
# 例:最終位置、訪問状態の統計など
if len(trajectory) == 0:
return np.zeros(4)
states = np.array(trajectory)
return np.array([
states[:, 0].mean(), # 平均x位置
states[:, 1].mean(), # 平均y位置
states[:, 0].std(), # x分散
states[:, 1].std(), # y分散
])
def novelty_score(self, behavior):
"""k最近傍との平均距離 = 新規性スコア"""
if len(self.archive) < self.k:
return float('inf')
distances = [np.linalg.norm(behavior - b)
for b in self.archive]
distances.sort()
return np.mean(distances[:self.k])
def add_to_archive(self, behavior):
self.archive.append(behavior)
if len(self.archive) > self.archive_size:
# ランダムに1つ除去
idx = np.random.randint(len(self.archive))
self.archive.pop(idx)
ES vs GAの比較
| 項目 | 進化戦略(ES) | 遺伝的アルゴリズム(GA) |
|---|---|---|
| 更新方式 | 勾配推定(連続的) | 選択 + 交叉 + 突然変異(離散的) |
| 数学的基盤 | 正規分布ベースの最適化 | 生物学的進化の模倣 |
| 集団の多様性 | 単一分布からサンプリング | 複数の個体が独立に存在 |
| 並列化 | 非常に簡単 | 簡単 |
| 大規模パラメータ | 比較的良くスケール | 交叉演算が非効率的 |
要点まとめ
- Black-Box最適化は勾配なしで報酬信号のみで方策を最適化する
- ESはランダム摂動の報酬から勾配を推定し、大規模並列化に有利
- GAは選択、交叉、突然変異を通じた集団ベースの最適化を行う
- Deep GAと新規性探索でGAの限界を克服できる
- 勾配ベース手法よりサンプル効率は低いが、特殊な環境では有用
次の記事では、環境モデルを学習して活用するモデルベース強化学習を見ていきます。