[Deep RL] 16. Black-Box Optimization: Evolution Strategies and Genetic Algorithms

Overview

All the reinforcement learning algorithms we have covered so far were gradient-based. Policy network parameters were updated through backpropagation. But what if the reward function is non-differentiable, or the environment is a black box?

Black-box optimization searches for good solutions using only input-output evaluations, without any knowledge of the objective function's internal structure. Evolution Strategies (ES) and Genetic Algorithms (GA) are the representative methods.


Characteristics of Black-Box Methods

Gradient-Based vs Black-Box

| Item | Gradient-Based | Black-Box |
|---|---|---|
| Differentiability | Objective function must be differentiable | Not required |
| Information usage | Gradient (direction + magnitude) | Only function values |
| Scalability | Millions of parameters possible | Sensitive to parameter count |
| Parallelization | Limited (requires synchronization) | Natural parallelization |
| Search character | Local search | Global search possible |

Advantages

  • Applicable even when the reward function is non-differentiable
  • No need to know the internal structure of the environment
  • Large-scale parallelization is natural
  • Less likely to get stuck in local optima

Disadvantages

  • Low sample efficiency
  • Slow in high-dimensional parameter spaces
  • Cannot leverage gradient information

Evolution Strategies (ES)

Basic Principle

ES creates random perturbations in the parameter space and updates parameters in the direction of perturbations that showed good performance. Remarkably, this process has been shown to be mathematically equivalent to gradient estimation.

Mathematical Intuition of ES

We maximize the expected reward at theta + sigma * epsilon, where epsilon is Gaussian noise added to parameter theta. The gradient of the expected reward with respect to theta, estimated via Monte Carlo, turns out to be the mean of noise directions weighted by rewards.
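This intuition can be written out explicitly. With Gaussian smoothing, the score-function identity gives an unbiased gradient estimator that uses only function values:

```latex
J(\theta) = \mathbb{E}_{\epsilon \sim \mathcal{N}(0, I)}\big[ F(\theta + \sigma \epsilon) \big],
\qquad
\nabla_\theta J(\theta) = \frac{1}{\sigma}\, \mathbb{E}_{\epsilon}\big[ F(\theta + \sigma \epsilon)\, \epsilon \big]
\approx \frac{1}{2 n \sigma} \sum_{i=1}^{n} \big( F(\theta + \sigma \epsilon_i) - F(\theta - \sigma \epsilon_i) \big)\, \epsilon_i
```

The last expression uses mirrored (antithetic) sampling, evaluating each noise vector in both the + and - direction, which is exactly what the implementations below divide by 2 * pop_size * sigma.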

ES Implementation on CartPole

import torch
import torch.nn as nn
import numpy as np
import gymnasium as gym

class ESPolicy(nn.Module):
    """Policy network for ES"""
    def __init__(self, obs_size, act_size):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(obs_size, 64),
            nn.Tanh(),
            nn.Linear(64, act_size),
        )

    def forward(self, obs):
        return self.net(obs)

    def get_action(self, obs):
        with torch.no_grad():
            logits = self.forward(torch.FloatTensor(obs))
            return logits.argmax().item()

def evaluate_policy(env, policy, num_episodes=5):
    """Compute average reward of a policy"""
    total_reward = 0.0
    for _ in range(num_episodes):
        obs, _ = env.reset()
        done = False
        while not done:
            action = policy.get_action(obs)
            obs, reward, terminated, truncated, _ = env.step(action)
            total_reward += reward
            done = terminated or truncated
    return total_reward / num_episodes

def train_es_cartpole(pop_size=50, num_generations=100,
                      sigma=0.1, lr=0.01):
    """ES training on CartPole"""
    env = gym.make('CartPole-v1')
    obs_size = env.observation_space.shape[0]
    act_size = env.action_space.n

    # Base policy
    policy = ESPolicy(obs_size, act_size)
    base_params = nn.utils.parameters_to_vector(
        policy.parameters()
    ).detach()

    for gen in range(num_generations):
        # 1. Generate random perturbations
        noise = torch.randn(pop_size, len(base_params))

        # 2. Evaluate reward for each perturbation
        rewards = []
        for i in range(pop_size):
            # Bidirectional perturbation (mirroring)
            for sign in [1, -1]:
                perturbed = base_params + sign * sigma * noise[i]
                nn.utils.vector_to_parameters(perturbed, policy.parameters())
                reward = evaluate_policy(env, policy, num_episodes=3)
                rewards.append((reward, sign, i))

        # 3. Compute update direction weighted by rewards
        rewards_array = np.array([r[0] for r in rewards])

        # Reward normalization (rank-based): higher reward -> higher rank score
        ranked = np.argsort(np.argsort(rewards_array)).astype(np.float32)
        ranked = (ranked - ranked.mean()) / (ranked.std() + 1e-8)

        # Gradient estimation
        grad = torch.zeros_like(base_params)
        for idx, (_, sign, noise_idx) in enumerate(rewards):
            grad += ranked[idx] * sign * noise[noise_idx]
        grad /= (2 * pop_size * sigma)

        # 4. Parameter update
        base_params += lr * grad

        # Check performance
        nn.utils.vector_to_parameters(base_params, policy.parameters())
        avg_reward = evaluate_policy(env, policy, num_episodes=10)
        if gen % 10 == 0:
            print(f"Generation {gen}: Average reward = {avg_reward:.1f}")

        if avg_reward >= 495:
            print(f"Solved at generation {gen}!")
            break

    return policy
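The rank-based normalization used above is worth checking in isolation. This numpy-only toy snippet (illustrative, separate from the CartPole code) shows how raw rewards map to zero-mean, unit-variance rank scores, which makes the update invariant to reward scaling and robust to outliers:

```python
import numpy as np

def rank_transform(rewards):
    """Map raw rewards to zero-mean, unit-variance rank scores.

    The highest reward receives the highest score, so the ES update
    moves toward well-performing perturbations regardless of the
    absolute reward scale.
    """
    rewards = np.asarray(rewards, dtype=np.float64)
    ranks = np.argsort(np.argsort(rewards)).astype(np.float64)
    return (ranks - ranks.mean()) / (ranks.std() + 1e-8)

rewards = [10.0, 500.0, 30.0, 20.0]
scores = rank_transform(rewards)
# The outlier 500 gets the top score, but no more weight than its rank implies
```

Note that the 500-reward outlier receives the same score it would have with reward 31: only the ordering matters.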

ES on HalfCheetah

ES on continuous action spaces requires a larger population and more generations:

class ESContinuousPolicy(nn.Module):
    def __init__(self, obs_size, act_size):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(obs_size, 256),
            nn.Tanh(),
            nn.Linear(256, 256),
            nn.Tanh(),
            nn.Linear(256, act_size),
            nn.Tanh(),
        )

    def forward(self, obs):
        return self.net(obs)

    def get_action(self, obs):
        with torch.no_grad():
            return self.forward(torch.FloatTensor(obs)).numpy()

def train_es_continuous(env_name='HalfCheetah-v4', pop_size=200,
                        num_generations=500, sigma=0.02, lr=0.01):
    """ES training on continuous environments (parallelizable)"""
    env = gym.make(env_name)
    obs_size = env.observation_space.shape[0]
    act_size = env.action_space.shape[0]

    policy = ESContinuousPolicy(obs_size, act_size)
    base_params = nn.utils.parameters_to_vector(
        policy.parameters()
    ).detach()

    # Adam momentum tracking
    m = torch.zeros_like(base_params)  # First moment
    v = torch.zeros_like(base_params)  # Second moment
    beta1, beta2 = 0.9, 0.999

    for gen in range(num_generations):
        noise = torch.randn(pop_size, len(base_params))
        rewards_pos = np.zeros(pop_size)
        rewards_neg = np.zeros(pop_size)

        # Parallelizable evaluation loop
        for i in range(pop_size):
            # Bidirectional perturbation evaluation
            params_pos = base_params + sigma * noise[i]
            nn.utils.vector_to_parameters(params_pos, policy.parameters())
            rewards_pos[i] = evaluate_policy(env, policy, num_episodes=1)

            params_neg = base_params - sigma * noise[i]
            nn.utils.vector_to_parameters(params_neg, policy.parameters())
            rewards_neg[i] = evaluate_policy(env, policy, num_episodes=1)

        # Gradient estimation from reward differences
        all_rewards = np.concatenate([rewards_pos, rewards_neg])
        reward_std = all_rewards.std() + 1e-8

        grad = torch.zeros_like(base_params)
        for i in range(pop_size):
            normalized = (rewards_pos[i] - rewards_neg[i]) / reward_std
            grad += normalized * noise[i]
        grad /= (2 * pop_size * sigma)

        # Adam update
        m = beta1 * m + (1 - beta1) * grad
        v = beta2 * v + (1 - beta2) * grad ** 2
        m_hat = m / (1 - beta1 ** (gen + 1))
        v_hat = v / (1 - beta2 ** (gen + 1))

        base_params += lr * m_hat / (torch.sqrt(v_hat) + 1e-8)

        nn.utils.vector_to_parameters(base_params, policy.parameters())
        if gen % 10 == 0:
            avg = evaluate_policy(env, policy, num_episodes=5)
            print(f"Generation {gen}: Average reward = {avg:.1f}")

    return policy
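The evaluation loop above is embarrassingly parallel: each worker only needs the current parameters and a noise vector, and only returns a scalar reward. A minimal numpy-only sketch of the same mirrored-sampling estimator on a toy quadratic objective, using a thread pool as a stand-in for the process or cluster workers a real ES system would use (all names here are illustrative, not part of the CartPole/HalfCheetah code):

```python
import numpy as np
from multiprocessing.pool import ThreadPool  # stand-in for process/cluster workers

def es_maximize(f, theta0, pop_size=50, sigma=0.1, lr=0.05, iters=300, workers=4):
    """Mirrored-sampling ES on a black-box objective f (to maximize)."""
    theta = np.asarray(theta0, dtype=np.float64)
    pool = ThreadPool(workers)
    for _ in range(iters):
        noise = np.random.randn(pop_size, theta.size)
        # Each worker evaluates one +/- pair; only f's scalar value is used
        def eval_pair(eps):
            return f(theta + sigma * eps) - f(theta - sigma * eps)
        diffs = np.array(pool.map(eval_pair, noise))
        # Estimated gradient: noise directions weighted by reward differences
        grad = (diffs[:, None] * noise).mean(axis=0) / (2 * sigma)
        theta = theta + lr * grad
    pool.close()
    return theta

# Toy objective: maximize -(x - target)^2, optimum at target
target = np.array([1.0, -2.0, 0.5])
theta = es_maximize(lambda x: -np.sum((x - target) ** 2), np.zeros(3))
```

After a few hundred iterations `theta` lands close to `target`, even though the optimizer never sees a gradient, only scalar evaluations.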

Genetic Algorithms (GA)

GA Basic Principle

Genetic algorithms mimic biological evolution:

  1. Population: Maintain multiple policies (individuals)
  2. Fitness evaluation: Measure each individual's reward
  3. Selection: Select individuals with high fitness
  4. Crossover: Mix parameters of selected individuals
  5. Mutation: Add small random changes

GA Implementation on CartPole

import copy

def train_ga_cartpole(pop_size=100, num_generations=50,
                      elite_ratio=0.1, mutation_rate=0.1,
                      mutation_sigma=0.1):
    """Train CartPole with genetic algorithm"""
    env = gym.make('CartPole-v1')
    obs_size = env.observation_space.shape[0]
    act_size = env.action_space.n

    # Create initial population
    population = [ESPolicy(obs_size, act_size) for _ in range(pop_size)]
    num_elite = max(int(pop_size * elite_ratio), 1)

    for gen in range(num_generations):
        # 1. Fitness evaluation
        fitness = []
        for individual in population:
            reward = evaluate_policy(env, individual, num_episodes=5)
            fitness.append(reward)

        # Sort (descending by fitness)
        sorted_indices = np.argsort(fitness)[::-1]
        best_fitness = fitness[sorted_indices[0]]

        if gen % 5 == 0:
            print(f"Generation {gen}: Best fitness = {best_fitness:.1f}, "
                  f"Average = {np.mean(fitness):.1f}")

        if best_fitness >= 495:
            print(f"Solved at generation {gen}!")
            return population[sorted_indices[0]]

        # 2. Elite selection
        elites = [copy.deepcopy(population[i]) for i in sorted_indices[:num_elite]]

        # 3. Create next generation
        new_population = list(elites)  # Elites carry over unchanged

        while len(new_population) < pop_size:
            # Tournament selection
            parent1 = tournament_select(population, fitness)
            parent2 = tournament_select(population, fitness)

            # Crossover
            child = crossover(parent1, parent2)

            # Mutation
            mutate(child, mutation_rate, mutation_sigma)

            new_population.append(child)

        population = new_population

    # If the loop ends without solving, re-evaluate the final generation
    fitness = [evaluate_policy(env, ind, num_episodes=5) for ind in population]
    return population[int(np.argmax(fitness))]

def tournament_select(population, fitness, k=3):
    """Tournament selection: select the best among k individuals"""
    indices = np.random.choice(len(population), k, replace=False)
    best_idx = indices[np.argmax([fitness[i] for i in indices])]
    return copy.deepcopy(population[best_idx])

def crossover(parent1, parent2):
    """Uniform crossover: select each parameter with 50% probability"""
    child = copy.deepcopy(parent1)
    for c_param, p2_param in zip(child.parameters(), parent2.parameters()):
        mask = torch.rand_like(c_param) > 0.5
        c_param.data[mask] = p2_param.data[mask]
    return child

def mutate(individual, rate=0.1, sigma=0.1):
    """Gaussian mutation"""
    for param in individual.parameters():
        mask = torch.rand_like(param) < rate
        noise = torch.randn_like(param) * sigma
        param.data += mask.float() * noise
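The same select/crossover/mutate loop can be sanity-checked without an environment or a neural network. A minimal numpy sketch (illustrative, separate from the CartPole code above) that evolves a parameter vector toward a known target:

```python
import numpy as np

def toy_ga(fitness_fn, dim, pop_size=60, generations=200,
           elite_frac=0.1, mutation_rate=0.3, mutation_sigma=0.3):
    """Elitist GA: tournament selection, uniform crossover, Gaussian mutation."""
    rng = np.random.default_rng(0)
    pop = rng.normal(size=(pop_size, dim))
    n_elite = max(int(pop_size * elite_frac), 1)
    for _ in range(generations):
        fit = np.array([fitness_fn(ind) for ind in pop])
        order = np.argsort(fit)[::-1]              # best first
        elites = pop[order[:n_elite]].copy()
        children = [e for e in elites]             # elites carry over unchanged
        while len(children) < pop_size:
            def tournament():                      # best of 3 random individuals
                idx = rng.choice(pop_size, 3, replace=False)
                return pop[idx[np.argmax(fit[idx])]]
            p1, p2 = tournament(), tournament()
            mask = rng.random(dim) < 0.5           # uniform crossover
            child = np.where(mask, p1, p2)
            mut = rng.random(dim) < mutation_rate  # Gaussian mutation
            child = child + mut * rng.normal(size=dim) * mutation_sigma
            children.append(child)
        pop = np.array(children)
    fit = np.array([fitness_fn(ind) for ind in pop])
    return pop[int(np.argmax(fit))]

target = np.array([2.0, -1.0, 0.5, 3.0])
best = toy_ga(lambda x: -np.sum((x - target) ** 2), dim=4)
```

Because elites are copied over unchanged, the best fitness never decreases between generations; `best` ends up near `target` without any gradient information.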

GA Improvement Techniques

Deep GA

Uber AI Labs' Deep GA is a technique for applying GA to large-scale neural networks:

class DeepGA:
    """Deep GA: memory-efficient GA through parameter encoding"""

    def __init__(self, policy_fn, pop_size=1000, seed_size=5):
        self.pop_size = pop_size
        self.policy_fn = policy_fn

        # Encode each individual as a seed sequence (memory savings)
        self.population = []
        for _ in range(pop_size):
            seeds = [np.random.randint(0, 2**31) for _ in range(seed_size)]
            self.population.append(seeds)

    def decode(self, seed_sequence):
        """Reconstruct policy parameters from seed sequence"""
        policy = self.policy_fn()
        base_params = nn.utils.parameters_to_vector(
            policy.parameters()
        ).detach()

        # The seed chain fully determines the genome; the fresh policy above is used only for shapes
        params = torch.zeros_like(base_params)
        for seed in seed_sequence:
            rng = np.random.RandomState(seed)
            noise = torch.FloatTensor(
                rng.randn(len(base_params))
            ) * 0.01
            params += noise

        nn.utils.vector_to_parameters(params, policy.parameters())
        return policy

    def evolve(self, fitness_fn, num_generations=100):
        for gen in range(num_generations):
            # Fitness evaluation
            fitness = []
            for seeds in self.population:
                policy = self.decode(seeds)
                fitness.append(fitness_fn(policy))

            # Selection: keep elites, then mutate by appending one new seed
            sorted_idx = np.argsort(fitness)[::-1]
            elites = [self.population[i] for i in sorted_idx[:10]]

            new_pop = [list(e) for e in elites]
            while len(new_pop) < self.pop_size:
                parent = elites[np.random.randint(len(elites))]
                child = list(parent) + [np.random.randint(0, 2**31)]
                new_pop.append(child)

            self.population = new_pop

            if gen % 10 == 0:
                print(f"Generation {gen}: Best = {max(fitness):.1f}")
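The seed-sequence encoding is easy to verify in isolation: the same chain of seeds always reconstructs the same parameter vector, so each individual can be stored and shipped between workers as a handful of integers instead of millions of floats. A numpy-only sketch (function names are illustrative):

```python
import numpy as np

def decode_seeds(seeds, dim, sigma=0.01):
    """Rebuild a parameter vector from a compact seed chain."""
    params = np.zeros(dim)
    for seed in seeds:
        rng = np.random.RandomState(seed)  # deterministic per seed
        params += sigma * rng.randn(dim)
    return params

lineage = [42, 7, 13]            # an individual = its mutation history
a = decode_seeds(lineage, dim=1000)
b = decode_seeds(lineage, dim=1000)
# Identical reconstruction: the genome is 3 integers, not 1000 floats
```

A child that mutates by appending a different final seed decodes to a different (but mostly shared) parameter vector, which is exactly how `DeepGA.evolve` above produces offspring.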

Novelty Search

Novelty search selects individuals based on behavioral novelty instead of fitness. Individuals showing new behavioral patterns are preferred, which drives exploration of diverse strategies:

class NoveltySearch:
    """Novelty search: select individuals by behavioral diversity"""

    def __init__(self, archive_size=1000, k_nearest=15):
        self.archive = []
        self.archive_size = archive_size
        self.k = k_nearest

    def behavior_characterization(self, trajectory):
        """Summarize agent behavior as a feature vector"""
        # e.g., final position, statistics of visited states
        if len(trajectory) == 0:
            return np.zeros(4)

        states = np.array(trajectory)
        return np.array([
            states[:, 0].mean(),  # Average x position
            states[:, 1].mean(),  # Average y position
            states[:, 0].std(),   # x standard deviation
            states[:, 1].std(),   # y standard deviation
        ])

    def novelty_score(self, behavior):
        """Average distance to k-nearest neighbors = novelty score"""
        if len(self.archive) < self.k:
            return float('inf')

        distances = [np.linalg.norm(behavior - b)
                     for b in self.archive]
        distances.sort()
        return np.mean(distances[:self.k])

    def add_to_archive(self, behavior):
        self.archive.append(behavior)
        if len(self.archive) > self.archive_size:
            # Remove one randomly
            idx = np.random.randint(len(self.archive))
            self.archive.pop(idx)
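A quick standalone sketch of the k-nearest-neighbor novelty computation (it mirrors `novelty_score` above with numpy only; the archive contents are made up for illustration). A behavior far from everything already archived scores much higher and would be preferred for selection:

```python
import numpy as np

def novelty(behavior, archive, k=3):
    """Average distance to the k nearest archived behaviors."""
    dists = sorted(np.linalg.norm(behavior - b) for b in archive)
    return float(np.mean(dists[:k]))

# Archive clustered near the origin: a behavior that wanders far away is "novel"
archive = [np.array([0.0, 0.0]), np.array([0.1, 0.0]),
           np.array([0.0, 0.1]), np.array([0.1, 0.1])]
common = novelty(np.array([0.05, 0.05]), archive)
rare = novelty(np.array([5.0, 5.0]), archive)
# rare >> common: the distant behavior gets the higher novelty score
```

Note that the score says nothing about reward; a policy that fails in a genuinely new way still scores well, which is the point.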

ES vs GA Comparison

| Item | Evolution Strategies (ES) | Genetic Algorithms (GA) |
|---|---|---|
| Update method | Gradient estimation (continuous) | Selection + crossover + mutation (discrete) |
| Mathematical basis | Normal distribution optimization | Mimicry of biological evolution |
| Population diversity | Sampling from a single distribution | Multiple independent individuals |
| Parallelization | Very easy | Easy |
| Large parameter counts | Scales relatively well | Crossover becomes inefficient |

Key Takeaways

  • Black-box optimization optimizes policies using only reward signals, without gradients
  • ES estimates gradients from rewards of random perturbations and is well-suited for large-scale parallelization
  • GA performs population-based optimization through selection, crossover, and mutation
  • Deep GA and novelty search can overcome GA limitations
  • Lower sample efficiency than gradient-based methods, but useful in special environments

In the next post, we will explore model-based reinforcement learning that learns and utilizes environment models.