Skip to content
Published on

[深層強化学習] 02. OpenAI Gymで始める強化学習

Authors

エージェントの構造(Anatomy of an Agent)

強化学習においてエージェントは単に環境と相互作用する主体ではなく、内部的にいくつかの核心構成要素を持っています。

エージェントの核心構成要素

  1. 方策(Policy): 現在の状態でどの行動を取るかを決定するルール
  2. 価値関数(Value Function): 各状態または状態-行動ペアの期待報酬を推定
  3. モデル(Model): 環境の動作を内部的にシミュレーション(オプション)
  4. メモリ(Memory): 過去の経験を保存して学習に活用
class SimpleAgent:
    """에이전트의 기본 구조"""
    def __init__(self, action_space):
        self.action_space = action_space
        self.memory = []

    def select_action(self, observation):
        """정책: 관찰을 받아 행동을 결정"""
        raise NotImplementedError

    def store_experience(self, obs, action, reward, next_obs, done):
        """경험을 메모리에 저장"""
        self.memory.append((obs, action, reward, next_obs, done))

    def learn(self):
        """저장된 경험으로부터 학습"""
        raise NotImplementedError

OpenAI Gym紹介

OpenAI Gym(現在Gymnasiumに名称変更)は、強化学習アルゴリズムを開発・比較するための標準ツールです。多様な環境を一貫したインターフェースで提供します。

インストール

pip install gymnasium
pip install gymnasium[classic-control]  # CartPole 등 클래식 환경
pip install gymnasium[atari]            # Atari 게임 환경
pip install gymnasium[box2d]            # Box2D 물리 환경

コアAPI

Gymのすべての環境は同一のインターフェースに従います。これがGymの最大の利点です。

import gymnasium as gym

# 환경 생성
env = gym.make("CartPole-v1")

# 환경 초기화 - 초기 관찰과 정보를 반환
observation, info = env.reset()

# 한 스텝 실행 - 행동을 전달하고 결과를 받음
action = env.action_space.sample()  # 무작위 행동 선택
observation, reward, terminated, truncated, info = env.step(action)

# 종료 조건 확인
done = terminated or truncated

# 환경 정리
env.close()

行動空間と観測空間

Gym環境は行動空間(action space)と観測空間(observation space)という2つの核心属性を持ちます。

離散空間(Discrete Space)

決まった数の行動から1つを選択します。

import gymnasium as gym

env = gym.make("CartPole-v1")

# 행동 공간 확인
print(f"행동 공간: {env.action_space}")
# 출력: Discrete(2)  - 0: 왼쪽으로 밀기, 1: 오른쪽으로 밀기

print(f"행동 개수: {env.action_space.n}")
# 출력: 2

# 무작위 행동 샘플링
random_action = env.action_space.sample()
print(f"무작위 행동: {random_action}")

連続空間(Box Space)

連続的な値の範囲を持つ空間です。

env = gym.make("MountainCarContinuous-v0")

# 관찰 공간 확인
print(f"관찰 공간: {env.observation_space}")
print(f"관찰 하한: {env.observation_space.low}")
print(f"관찰 상한: {env.observation_space.high}")
print(f"관찰 차원: {env.observation_space.shape}")

# 행동 공간 확인 (연속적)
print(f"행동 공간: {env.action_space}")
print(f"행동 하한: {env.action_space.low}")
print(f"행동 상한: {env.action_space.high}")

主要環境の空間比較

環境観測空間行動空間
CartPole-v1Box(4,)Discrete(2)
MountainCar-v0Box(2,)Discrete(3)
Pendulum-v1Box(3,)Box(1,)
Pong-v5Box(210,160,3)Discrete(6)

CartPole環境分析

CartPoleは強化学習入門で最もよく使われる環境です。カートの上に立てた棒が倒れないようにカートを左右に動かす問題です。

環境詳細

import gymnasium as gym
import numpy as np

env = gym.make("CartPole-v1")
observation, info = env.reset()

print("=== CartPole-v1 환경 정보 ===")
print(f"관찰 공간 차원: {env.observation_space.shape[0]}")
print(f"  [0] 카트 위치: {env.observation_space.low[0]:.1f} ~ {env.observation_space.high[0]:.1f}")
print(f"  [1] 카트 속도: {env.observation_space.low[1]:.1f} ~ {env.observation_space.high[1]:.1f}")
print(f"  [2] 막대 각도: {env.observation_space.low[2]:.4f} ~ {env.observation_space.high[2]:.4f}")
print(f"  [3] 막대 각속도: {env.observation_space.low[3]:.1f} ~ {env.observation_space.high[3]:.1f}")
print(f"\n행동: 0=왼쪽, 1=오른쪽")
print(f"\n초기 관찰값: {observation}")

終了条件

CartPoleエピソードは以下の条件で終了します:

  • 棒の角度が12度を超過
  • カートが画面境界(2.4単位)を超える
  • エピソード長が500ステップを超過(truncation)

ランダムエージェント(Random Agent)

最も単純なエージェントは、環境の行動空間からランダムに行動を選択するものです。

import gymnasium as gym
import numpy as np

def run_random_agent(env_name, n_episodes=100):
    """무작위 에이전트 실행 및 성능 측정"""
    env = gym.make(env_name)
    episode_rewards = []
    episode_lengths = []

    for episode in range(n_episodes):
        obs, info = env.reset()
        total_reward = 0
        steps = 0

        while True:
            action = env.action_space.sample()
            obs, reward, terminated, truncated, info = env.step(action)
            total_reward += reward
            steps += 1

            if terminated or truncated:
                break

        episode_rewards.append(total_reward)
        episode_lengths.append(steps)

    env.close()

    print(f"=== {env_name} 무작위 에이전트 결과 ({n_episodes}회) ===")
    print(f"평균 보상: {np.mean(episode_rewards):.2f} +/- {np.std(episode_rewards):.2f}")
    print(f"최대 보상: {np.max(episode_rewards):.2f}")
    print(f"최소 보상: {np.min(episode_rewards):.2f}")
    print(f"평균 에피소드 길이: {np.mean(episode_lengths):.1f}")

    return episode_rewards

# CartPole에서 무작위 에이전트 실행
rewards = run_random_agent("CartPole-v1", n_episodes=1000)

ランダムエージェントのCartPole平均報酬は約20前後です。最大500点まで可能なので、ランダムエージェントは非常に低い性能を示します。


単純なルールベースエージェント

学習なしでも簡単なルールでランダムエージェントよりはるかに良い性能を出すことができます。

def run_heuristic_agent(n_episodes=100):
    """휴리스틱 에이전트: 막대가 기울어진 방향으로 카트를 움직임"""
    env = gym.make("CartPole-v1")
    episode_rewards = []

    for episode in range(n_episodes):
        obs, info = env.reset()
        total_reward = 0

        while True:
            # 관찰값에서 막대 각도(인덱스 2)를 확인
            pole_angle = obs[2]

            # 간단한 규칙: 막대가 오른쪽으로 기울면 오른쪽으로
            if pole_angle > 0:
                action = 1
            else:
                action = 0

            obs, reward, terminated, truncated, info = env.step(action)
            total_reward += reward

            if terminated or truncated:
                break

        episode_rewards.append(total_reward)

    env.close()

    print(f"=== 휴리스틱 에이전트 결과 ({n_episodes}회) ===")
    print(f"평균 보상: {np.mean(episode_rewards):.2f}")
    print(f"최대 보상: {np.max(episode_rewards):.2f}")

    return episode_rewards

rewards = run_heuristic_agent()

この単純なルールだけでも平均40〜60点程度の性能を得ることができます。しかし最適性能(500点)にはまだ遠く及びません。


Gymラッパー(Wrappers)

Gymラッパーは環境をラップして観測、行動、報酬などを変換する機能を提供します。ラッパーを使えば、環境の核心ロジックを修正せずに様々な前処理を適用できます。

ラッパーの種類

import gymnasium as gym
from gymnasium import Wrapper, ObservationWrapper, ActionWrapper, RewardWrapper

class ClipRewardWrapper(RewardWrapper):
    """보상을 -1, 0, +1로 클리핑하는 래퍼"""
    def reward(self, reward):
        if reward > 0:
            return 1.0
        elif reward < 0:
            return -1.0
        return 0.0

class NormalizeObservationWrapper(ObservationWrapper):
    """관찰값을 정규화하는 래퍼"""
    def __init__(self, env):
        super().__init__(env)
        self.obs_mean = None
        self.obs_var = None
        self.count = 0

    def observation(self, observation):
        # 간단한 min-max 정규화
        low = self.observation_space.low
        high = self.observation_space.high
        # 무한대 값 처리
        low = np.clip(low, -10, 10)
        high = np.clip(high, -10, 10)
        return (observation - low) / (high - low + 1e-8)

ラッパーチェイニング

複数のラッパーを順番に適用して複雑な前処理パイプラインを構成できます。

class EpisodeStatsWrapper(Wrapper):
    """에피소드 통계를 기록하는 래퍼"""
    def __init__(self, env):
        super().__init__(env)
        self.episode_reward = 0
        self.episode_length = 0
        self.episode_rewards = []
        self.episode_lengths = []

    def reset(self, **kwargs):
        if self.episode_length > 0:
            self.episode_rewards.append(self.episode_reward)
            self.episode_lengths.append(self.episode_length)
        self.episode_reward = 0
        self.episode_length = 0
        return self.env.reset(**kwargs)

    def step(self, action):
        obs, reward, terminated, truncated, info = self.env.step(action)
        self.episode_reward += reward
        self.episode_length += 1
        return obs, reward, terminated, truncated, info

    def get_stats(self):
        return {
            'mean_reward': np.mean(self.episode_rewards) if self.episode_rewards else 0,
            'mean_length': np.mean(self.episode_lengths) if self.episode_lengths else 0,
            'n_episodes': len(self.episode_rewards),
        }

# 래퍼 적용 예시
env = gym.make("CartPole-v1")
env = EpisodeStatsWrapper(env)
env = ClipRewardWrapper(env)

Atari環境の前処理

Atariゲーム環境は210x160サイズのRGB画像を観測として提供します。これをそのまま使うと入力が大きすぎるため、いくつかの前処理が必要です。

class AtariPreprocessingWrapper(ObservationWrapper):
    """Atari 환경 전처리 래퍼"""
    def __init__(self, env, frame_size=84):
        super().__init__(env)
        self.frame_size = frame_size
        self.observation_space = gym.spaces.Box(
            low=0, high=255,
            shape=(frame_size, frame_size, 1),
            dtype=np.uint8
        )

    def observation(self, obs):
        # RGB를 그레이스케일로 변환
        gray = np.mean(obs, axis=2, keepdims=True).astype(np.uint8)
        # 크기 조정 (간단한 다운샘플링)
        h, w = gray.shape[:2]
        h_step = h // self.frame_size
        w_step = w // self.frame_size
        resized = gray[::h_step, ::w_step][:self.frame_size, :self.frame_size]
        return resized

class FrameStackWrapper(ObservationWrapper):
    """연속 프레임을 쌓아서 하나의 관찰로 만드는 래퍼"""
    def __init__(self, env, n_frames=4):
        super().__init__(env)
        self.n_frames = n_frames
        self.frames = []
        old_space = env.observation_space
        self.observation_space = gym.spaces.Box(
            low=0, high=255,
            shape=(old_space.shape[0], old_space.shape[1], n_frames),
            dtype=np.uint8
        )

    def reset(self, **kwargs):
        obs, info = self.env.reset(**kwargs)
        self.frames = [obs] * self.n_frames
        return np.concatenate(self.frames, axis=-1), info

    def observation(self, obs):
        self.frames.pop(0)
        self.frames.append(obs)
        return np.concatenate(self.frames, axis=-1)

映像録画とモニタリング

エージェントの学習過程を視覚的に確認するために映像を録画できます。

import gymnasium as gym

# RecordVideo 래퍼로 영상 녹화
env = gym.make("CartPole-v1", render_mode="rgb_array")
env = gym.wrappers.RecordVideo(
    env,
    video_folder="./videos",
    episode_trigger=lambda ep: ep % 100 == 0  # 100 에피소드마다 녹화
)

# 에피소드 실행
obs, info = env.reset()
done = False
while not done:
    action = env.action_space.sample()
    obs, reward, terminated, truncated, info = env.step(action)
    done = terminated or truncated

env.close()
print("영상이 ./videos 폴더에 저장되었습니다")

ベクトル環境(Vectorized Environments)

学習速度を上げるために複数の環境を並列で実行できます。

import gymnasium as gym

# 4개의 환경을 동시에 실행
envs = gym.make_vec("CartPole-v1", num_envs=4)

# 모든 환경 초기화
observations, infos = envs.reset()
print(f"관찰 shape: {observations.shape}")  # (4, 4) - 4개 환경, 각 4차원 관찰

# 모든 환경에서 동시에 한 스텝 실행
actions = np.array([envs.single_action_space.sample() for _ in range(4)])
observations, rewards, terminateds, truncateds, infos = envs.step(actions)

print(f"보상: {rewards}")  # 4개의 보상값
print(f"종료: {terminateds}")

envs.close()

カスタム環境の作成

Gymのインターフェースに従って独自の環境を作成できます。

import gymnasium as gym
from gymnasium import spaces
import numpy as np

class SimpleCorridorEnv(gym.Env):
    """간단한 1D 복도 환경"""
    metadata = {"render_modes": ["human"]}

    def __init__(self, corridor_length=10):
        super().__init__()
        self.corridor_length = corridor_length

        # 행동: 0=왼쪽, 1=오른쪽
        self.action_space = spaces.Discrete(2)

        # 관찰: 현재 위치 (0 ~ corridor_length-1)
        self.observation_space = spaces.Box(
            low=0, high=corridor_length - 1,
            shape=(1,), dtype=np.float32
        )

        self.position = 0
        self.goal = corridor_length - 1

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        self.position = 0
        return np.array([self.position], dtype=np.float32), {}

    def step(self, action):
        if action == 0:  # 왼쪽
            self.position = max(0, self.position - 1)
        else:  # 오른쪽
            self.position = min(self.corridor_length - 1, self.position + 1)

        observation = np.array([self.position], dtype=np.float32)

        # 목표 도달 시 큰 보상
        if self.position == self.goal:
            reward = 10.0
            terminated = True
        else:
            reward = -0.1  # 스텝 페널티
            terminated = False

        return observation, reward, terminated, False, {}

# 커스텀 환경 사용
env = SimpleCorridorEnv(corridor_length=5)
obs, info = env.reset()
print(f"초기 위치: {obs[0]}")

for step in range(20):
    action = env.action_space.sample()
    obs, reward, terminated, truncated, info = env.step(action)
    direction = "오른쪽" if action == 1 else "왼쪽"
    print(f"  스텝 {step + 1}: {direction} -> 위치={obs[0]:.0f}, 보상={reward:.1f}")
    if terminated:
        print("  목표 도달!")
        break

まとめ

この記事で扱った内容を整理します。

  1. エージェントの構造: 方策、価値関数、モデル、メモリで構成
  2. Gym API: reset()step()action_spaceobservation_spaceで一貫したインターフェースを提供
  3. 空間タイプ: Discrete(離散)、Box(連続)など多様な空間タイプをサポート
  4. CartPole: 強化学習入門の代表環境、ランダムエージェントで約20点、最適500点
  5. ラッパー: 環境をラップして観測、行動、報酬を変換する柔軟なメカニズム
  6. ベクトル環境: 複数の環境を並列実行して学習速度を向上

次の記事では、PyTorchの基礎を扱い、ニューラルネットワークを利用した強化学習の基盤を築きます。