Skip to content
Published on

[Deep RL] 14. Continuous Action Spaces: DDPG and Distributional Policies

Authors

Overview

Most environments we have covered so far had discrete action spaces (left/right, directional keys, etc.). However, real-world problems like robot control, autonomous driving, and physics simulations require continuous action spaces. Joint angles, torques, and accelerations are real-valued quantities.

This post covers policy design for continuous action spaces, A2C extension, DDPG (Deep Deterministic Policy Gradient), and distributional policy gradients.


Why Continuous Spaces Are Needed

Limitations of Discretization

Discretizing continuous actions (e.g., splitting torque into -1, -0.5, 0, 0.5, 1) is simple but problematic:

  • Curse of dimensionality: Discretizing each of N joints into K levels causes the action space to explode to K^N
  • Lack of precision: Fine-grained control between discretization intervals is impossible
  • Inefficiency: Most discrete action combinations are meaningless

For example, discretizing a 6-joint robot into 10 levels creates 10^6 = one million actions.

Representing Continuous Action Spaces

Continuous policies output probability distributions over actions:

  • Gaussian policy: Outputs mean and variance, samples actions from a normal distribution
  • Beta policy: Uses beta distribution suitable for bounded action ranges
  • Deterministic policy: Directly outputs a single action value (DDPG)

Action Space Design

import torch
import torch.nn as nn
import numpy as np

class ContinuousActionSpace:
    """Continuous action space definition"""

    def __init__(self, low, high):
        self.low = np.array(low, dtype=np.float32)
        self.high = np.array(high, dtype=np.float32)
        self.shape = self.low.shape

    def clip(self, action):
        return np.clip(action, self.low, self.high)

    def sample(self):
        return np.random.uniform(self.low, self.high)

Applying A2C to Continuous Action Spaces

We replace the categorical distribution in the existing A2C with a Gaussian distribution:

class ContinuousActorCritic(nn.Module):
    """Actor-Critic for continuous action spaces"""

    def __init__(self, obs_size, act_size):
        super().__init__()

        self.shared = nn.Sequential(
            nn.Linear(obs_size, 256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.ReLU(),
        )

        # Actor: outputs mean and log standard deviation
        self.mu = nn.Sequential(
            nn.Linear(256, act_size),
            nn.Tanh(),  # Constrains action range to -1, 1
        )
        # Log standard deviation is a state-independent learnable parameter
        self.log_std = nn.Parameter(torch.zeros(act_size))

        # Critic
        self.value = nn.Linear(256, 1)

    def forward(self, obs):
        features = self.shared(obs)
        mu = self.mu(features)
        std = self.log_std.exp()
        value = self.value(features)
        return mu, std, value

    def get_action(self, obs):
        mu, std, value = self.forward(obs)
        dist = torch.distributions.Normal(mu, std)
        action = dist.sample()
        log_prob = dist.log_prob(action).sum(dim=-1)
        return action, log_prob, value

    def evaluate(self, obs, action):
        mu, std, value = self.forward(obs)
        dist = torch.distributions.Normal(mu, std)
        log_prob = dist.log_prob(action).sum(dim=-1)
        entropy = dist.entropy().sum(dim=-1)
        return log_prob, entropy, value

Continuous A2C Training

def train_continuous_a2c(env, model, num_steps=2048, num_updates=1000,
                         gamma=0.99, lr=3e-4, entropy_coef=0.01):
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    obs = env.reset()
    for update in range(num_updates):
        # Collect experiences
        observations = []
        actions = []
        rewards = []
        dones = []
        log_probs = []
        values = []

        for _ in range(num_steps):
            obs_t = torch.FloatTensor(obs).unsqueeze(0)
            action, log_prob, value = model.get_action(obs_t)

            action_np = action.detach().numpy().flatten()
            action_clipped = np.clip(action_np, env.action_space.low,
                                     env.action_space.high)

            next_obs, reward, done, _ = env.step(action_clipped)

            observations.append(obs)
            actions.append(action.detach())
            rewards.append(reward)
            dones.append(done)
            log_probs.append(log_prob)
            values.append(value.squeeze())

            obs = next_obs if not done else env.reset()

        # GAE (Generalized Advantage Estimation) computation
        advantages, returns = compute_gae(rewards, values, dones, gamma)

        # Update
        obs_batch = torch.FloatTensor(np.array(observations))
        act_batch = torch.cat(actions)
        adv_batch = torch.FloatTensor(advantages)
        ret_batch = torch.FloatTensor(returns)

        new_log_probs, entropy, new_values = model.evaluate(
            obs_batch, act_batch
        )

        policy_loss = -(new_log_probs * adv_batch).mean()
        value_loss = (ret_batch - new_values.squeeze()).pow(2).mean()
        entropy_loss = -entropy.mean()

        loss = policy_loss + 0.5 * value_loss + entropy_coef * entropy_loss

        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5)
        optimizer.step()

def compute_gae(rewards, values, dones, gamma, lam=0.95):
    """Generalized Advantage Estimation"""
    advantages = []
    gae = 0
    values_list = [v.detach().item() for v in values]
    values_list.append(0)  # Bootstrap

    for t in reversed(range(len(rewards))):
        if dones[t]:
            delta = rewards[t] - values_list[t]
            gae = delta
        else:
            delta = rewards[t] + gamma * values_list[t+1] - values_list[t]
            gae = delta + gamma * lam * gae
        advantages.insert(0, gae)

    returns = [a + v for a, v in zip(advantages, values_list[:-1])]
    return advantages, returns

Deterministic Policy Gradient (DDPG)

Core Idea of DDPG

DDPG (Deep Deterministic Policy Gradient) applies DQN ideas to continuous action spaces:

  • Deterministic policy: Instead of probabilistic sampling, directly outputs a single action for a given state
  • Experience replay: Stores past experiences in a buffer and reuses them, like DQN
  • Target network: Soft updates for learning stability
  • Exploration noise: Adds noise to the deterministic policy for exploration

Network Architecture

class DDPGActor(nn.Module):
    """Deterministic policy network"""

    def __init__(self, obs_size, act_size, act_limit):
        super().__init__()
        self.act_limit = act_limit
        self.net = nn.Sequential(
            nn.Linear(obs_size, 400),
            nn.ReLU(),
            nn.Linear(400, 300),
            nn.ReLU(),
            nn.Linear(300, act_size),
            nn.Tanh(),
        )

    def forward(self, obs):
        return self.act_limit * self.net(obs)

class DDPGCritic(nn.Module):
    """Action-value function Q(s, a)"""

    def __init__(self, obs_size, act_size):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(obs_size + act_size, 400),
            nn.ReLU(),
            nn.Linear(400, 300),
            nn.ReLU(),
            nn.Linear(300, 1),
        )

    def forward(self, obs, action):
        x = torch.cat([obs, action], dim=-1)
        return self.net(x)

Exploration Noise: Ornstein-Uhlenbeck Process

DDPG uses temporally correlated OU noise for smooth, continuous exploration:

class OrnsteinUhlenbeckNoise:
    """Temporally correlated exploration noise"""

    def __init__(self, size, mu=0.0, theta=0.15, sigma=0.2):
        self.size = size
        self.mu = mu
        self.theta = theta
        self.sigma = sigma
        self.state = np.ones(size) * mu

    def reset(self):
        self.state = np.ones(self.size) * self.mu

    def sample(self):
        dx = (self.theta * (self.mu - self.state)
              + self.sigma * np.random.randn(self.size))
        self.state += dx
        return self.state.copy()

Full DDPG Implementation

import copy
from collections import deque
import random

class ReplayBuffer:
    def __init__(self, capacity=1000000):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        return (np.array(states), np.array(actions),
                np.array(rewards, dtype=np.float32),
                np.array(next_states),
                np.array(dones, dtype=np.float32))

    def __len__(self):
        return len(self.buffer)

class DDPGAgent:
    def __init__(self, obs_size, act_size, act_limit,
                 gamma=0.99, tau=0.005, lr_actor=1e-4, lr_critic=1e-3):
        self.gamma = gamma
        self.tau = tau

        # Main networks
        self.actor = DDPGActor(obs_size, act_size, act_limit)
        self.critic = DDPGCritic(obs_size, act_size)

        # Target networks (soft update targets)
        self.target_actor = copy.deepcopy(self.actor)
        self.target_critic = copy.deepcopy(self.critic)

        self.actor_optimizer = torch.optim.Adam(
            self.actor.parameters(), lr=lr_actor)
        self.critic_optimizer = torch.optim.Adam(
            self.critic.parameters(), lr=lr_critic)

        self.replay_buffer = ReplayBuffer()
        self.noise = OrnsteinUhlenbeckNoise(act_size)

    def select_action(self, state, add_noise=True):
        state_t = torch.FloatTensor(state).unsqueeze(0)
        action = self.actor(state_t).detach().numpy().flatten()
        if add_noise:
            action += self.noise.sample()
        return np.clip(action, -self.actor.act_limit, self.actor.act_limit)

    def update(self, batch_size=256):
        if len(self.replay_buffer) < batch_size:
            return

        states, actions, rewards, next_states, dones = \
            self.replay_buffer.sample(batch_size)

        states_t = torch.FloatTensor(states)
        actions_t = torch.FloatTensor(actions)
        rewards_t = torch.FloatTensor(rewards).unsqueeze(1)
        next_states_t = torch.FloatTensor(next_states)
        dones_t = torch.FloatTensor(dones).unsqueeze(1)

        # === Critic update ===
        with torch.no_grad():
            next_actions = self.target_actor(next_states_t)
            target_q = self.target_critic(next_states_t, next_actions)
            target_value = rewards_t + self.gamma * (1 - dones_t) * target_q

        current_q = self.critic(states_t, actions_t)
        critic_loss = nn.MSELoss()(current_q, target_value)

        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()

        # === Actor update ===
        # Policy objective: maximize Q(s, mu(s))
        predicted_actions = self.actor(states_t)
        actor_loss = -self.critic(states_t, predicted_actions).mean()

        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()

        # === Target network soft update ===
        self._soft_update(self.actor, self.target_actor)
        self._soft_update(self.critic, self.target_critic)

    def _soft_update(self, source, target):
        for src_param, tgt_param in zip(source.parameters(),
                                         target.parameters()):
            tgt_param.data.copy_(
                self.tau * src_param.data + (1 - self.tau) * tgt_param.data
            )

Distributional Policy Gradient

D4PG: Distributed Distributional DDPG

D4PG combines distributional reinforcement learning with DDPG. Instead of learning the expected Q-value, it learns the entire distribution of returns.

class DistributionalCritic(nn.Module):
    """Distributional Q function: predicts the distribution of returns"""

    def __init__(self, obs_size, act_size,
                 num_atoms=51, v_min=-10.0, v_max=10.0):
        super().__init__()
        self.num_atoms = num_atoms
        self.v_min = v_min
        self.v_max = v_max

        self.support = torch.linspace(v_min, v_max, num_atoms)
        self.delta_z = (v_max - v_min) / (num_atoms - 1)

        self.net = nn.Sequential(
            nn.Linear(obs_size + act_size, 400),
            nn.ReLU(),
            nn.Linear(400, 300),
            nn.ReLU(),
            nn.Linear(300, num_atoms),
        )

    def forward(self, obs, action):
        x = torch.cat([obs, action], dim=-1)
        logits = self.net(x)
        probs = torch.softmax(logits, dim=-1)
        return probs

    def get_q_value(self, obs, action):
        """Compute expected Q-value"""
        probs = self.forward(obs, action)
        return (probs * self.support.unsqueeze(0)).sum(dim=-1, keepdim=True)

Distributional TD Learning

def distributional_critic_loss(critic, target_critic, target_actor,
                                states, actions, rewards, next_states,
                                dones, gamma=0.99):
    """Distributional critic loss computation (categorical projection)"""
    num_atoms = critic.num_atoms
    v_min = critic.v_min
    v_max = critic.v_max
    delta_z = critic.delta_z
    support = critic.support

    with torch.no_grad():
        next_actions = target_actor(next_states)
        target_probs = target_critic(next_states, next_actions)

        # Bellman-updated support
        tz = rewards.unsqueeze(1) + gamma * (1 - dones.unsqueeze(1)) * \
             support.unsqueeze(0)
        tz = tz.clamp(v_min, v_max)

        # Categorical projection
        b = (tz - v_min) / delta_z
        l = b.floor().long()
        u = b.ceil().long()

        projected = torch.zeros_like(target_probs)
        for i in range(num_atoms):
            projected.scatter_add_(1, l[:, i:i+1],
                                   target_probs[:, i:i+1] * (u[:, i:i+1].float() - b[:, i:i+1]))
            projected.scatter_add_(1, u[:, i:i+1],
                                   target_probs[:, i:i+1] * (b[:, i:i+1] - l[:, i:i+1].float()))

    # Cross-entropy loss
    current_logits = critic.net(torch.cat([states, actions], dim=-1))
    loss = -(projected * torch.log_softmax(current_logits, dim=-1)).sum(dim=-1).mean()

    return loss

Experimental Results Comparison

Performance comparison on MuJoCo's HalfCheetah-v4 environment:

AlgorithmAverage reward at 1M stepsLearning stabilitySample efficiency
A2C (continuous)~3000MediumLow
DDPG~8000Low (sensitive)High
D4PG~9500HighHigh

DDPG is sensitive to hyperparameters but achieves high performance when well-tuned. D4PG is more stable thanks to distributional learning.


Key Takeaways

  • Continuous action spaces are handled with Gaussian policies (A2C) or deterministic policies (DDPG)
  • DDPG applies DQN's core techniques (replay buffer, target network) to continuous spaces
  • OU noise provides temporally correlated, smooth exploration
  • Distributional policy gradient (D4PG) improves stability by learning the distribution of returns

In the next post, we will cover Trust Region methods (TRPO, PPO, ACKTR) that guarantee stability of policy updates.