Skip to content

Split View: [심층 강화학습] 02. OpenAI Gym으로 시작하는 강화학습

|

[심층 강화학습] 02. OpenAI Gym으로 시작하는 강화학습

에이전트의 구조 (Anatomy of an Agent)

강화학습에서 에이전트는 단순히 환경과 상호작용하는 주체가 아니라, 내부적으로 몇 가지 핵심 구성 요소를 갖고 있습니다.

에이전트의 핵심 구성 요소

  1. 정책 (Policy): 현재 상태에서 어떤 행동을 할지 결정하는 규칙
  2. 가치 함수 (Value Function): 각 상태 또는 상태-행동 쌍의 기대 보상을 추정
  3. 모델 (Model): 환경의 동작을 내부적으로 시뮬레이션 (선택적)
  4. 메모리 (Memory): 과거 경험을 저장하여 학습에 활용
class SimpleAgent:
    """에이전트의 기본 구조"""
    def __init__(self, action_space):
        self.action_space = action_space
        self.memory = []

    def select_action(self, observation):
        """정책: 관찰을 받아 행동을 결정"""
        raise NotImplementedError

    def store_experience(self, obs, action, reward, next_obs, done):
        """경험을 메모리에 저장"""
        self.memory.append((obs, action, reward, next_obs, done))

    def learn(self):
        """저장된 경험으로부터 학습"""
        raise NotImplementedError

OpenAI Gym 소개

OpenAI Gym(현재 Gymnasium으로 이름이 변경됨)은 강화학습 알고리즘을 개발하고 비교하기 위한 표준 도구입니다. 다양한 환경을 일관된 인터페이스로 제공합니다.

설치

pip install gymnasium
pip install gymnasium[classic-control]  # CartPole 등 클래식 환경
pip install gymnasium[atari]            # Atari 게임 환경
pip install gymnasium[box2d]            # Box2D 물리 환경

핵심 API

Gym의 모든 환경은 동일한 인터페이스를 따릅니다. 이것이 Gym의 가장 큰 장점입니다.

import gymnasium as gym

# 환경 생성
env = gym.make("CartPole-v1")

# 환경 초기화 - 초기 관찰과 정보를 반환
observation, info = env.reset()

# 한 스텝 실행 - 행동을 전달하고 결과를 받음
action = env.action_space.sample()  # 무작위 행동 선택
observation, reward, terminated, truncated, info = env.step(action)

# 종료 조건 확인
done = terminated or truncated

# 환경 정리
env.close()

행동 공간과 관찰 공간

Gym 환경은 행동 공간(action space)과 관찰 공간(observation space)이라는 두 가지 핵심 속성을 가집니다.

이산 공간 (Discrete Space)

정해진 개수의 행동 중 하나를 선택합니다.

import gymnasium as gym

env = gym.make("CartPole-v1")

# 행동 공간 확인
print(f"행동 공간: {env.action_space}")
# 출력: Discrete(2)  - 0: 왼쪽으로 밀기, 1: 오른쪽으로 밀기

print(f"행동 개수: {env.action_space.n}")
# 출력: 2

# 무작위 행동 샘플링
random_action = env.action_space.sample()
print(f"무작위 행동: {random_action}")

연속 공간 (Box Space)

연속적인 값의 범위를 가지는 공간입니다.

env = gym.make("MountainCarContinuous-v0")

# 관찰 공간 확인
print(f"관찰 공간: {env.observation_space}")
print(f"관찰 하한: {env.observation_space.low}")
print(f"관찰 상한: {env.observation_space.high}")
print(f"관찰 차원: {env.observation_space.shape}")

# 행동 공간 확인 (연속적)
print(f"행동 공간: {env.action_space}")
print(f"행동 하한: {env.action_space.low}")
print(f"행동 상한: {env.action_space.high}")

주요 환경의 공간 비교

환경관찰 공간행동 공간
CartPole-v1Box(4,)Discrete(2)
MountainCar-v0Box(2,)Discrete(3)
Pendulum-v1Box(3,)Box(1,)
Pong-v5Box(210,160,3)Discrete(6)

CartPole 환경 분석

CartPole은 강화학습 입문에서 가장 많이 사용되는 환경입니다. 카트 위에 세워진 막대가 쓰러지지 않도록 카트를 좌우로 움직이는 문제입니다.

환경 세부 사항

import gymnasium as gym
import numpy as np

env = gym.make("CartPole-v1")
observation, info = env.reset()

print("=== CartPole-v1 환경 정보 ===")
print(f"관찰 공간 차원: {env.observation_space.shape[0]}")
print(f"  [0] 카트 위치: {env.observation_space.low[0]:.1f} ~ {env.observation_space.high[0]:.1f}")
print(f"  [1] 카트 속도: {env.observation_space.low[1]:.1f} ~ {env.observation_space.high[1]:.1f}")
print(f"  [2] 막대 각도: {env.observation_space.low[2]:.4f} ~ {env.observation_space.high[2]:.4f}")
print(f"  [3] 막대 각속도: {env.observation_space.low[3]:.1f} ~ {env.observation_space.high[3]:.1f}")
print(f"\n행동: 0=왼쪽, 1=오른쪽")
print(f"\n초기 관찰값: {observation}")

종료 조건

CartPole 에피소드는 다음 조건에서 종료됩니다.

  • 막대 각도가 12도를 초과
  • 카트가 화면 경계(2.4 단위)를 벗어남
  • 에피소드 길이가 500 스텝을 초과 (truncation)

무작위 에이전트 (Random Agent)

가장 단순한 에이전트는 환경의 행동 공간에서 무작위로 행동을 선택하는 것입니다.

import gymnasium as gym
import numpy as np

def run_random_agent(env_name, n_episodes=100):
    """무작위 에이전트 실행 및 성능 측정"""
    env = gym.make(env_name)
    episode_rewards = []
    episode_lengths = []

    for episode in range(n_episodes):
        obs, info = env.reset()
        total_reward = 0
        steps = 0

        while True:
            action = env.action_space.sample()
            obs, reward, terminated, truncated, info = env.step(action)
            total_reward += reward
            steps += 1

            if terminated or truncated:
                break

        episode_rewards.append(total_reward)
        episode_lengths.append(steps)

    env.close()

    print(f"=== {env_name} 무작위 에이전트 결과 ({n_episodes}회) ===")
    print(f"평균 보상: {np.mean(episode_rewards):.2f} +/- {np.std(episode_rewards):.2f}")
    print(f"최대 보상: {np.max(episode_rewards):.2f}")
    print(f"최소 보상: {np.min(episode_rewards):.2f}")
    print(f"평균 에피소드 길이: {np.mean(episode_lengths):.1f}")

    return episode_rewards

# CartPole에서 무작위 에이전트 실행
rewards = run_random_agent("CartPole-v1", n_episodes=1000)

무작위 에이전트의 CartPole 평균 보상은 대략 20 내외입니다. 최대 500점까지 가능하므로 무작위 에이전트는 매우 나쁜 성능을 보입니다.


단순한 규칙 기반 에이전트

학습 없이도 간단한 규칙으로 무작위 에이전트보다 훨씬 좋은 성능을 낼 수 있습니다.

def run_heuristic_agent(n_episodes=100):
    """휴리스틱 에이전트: 막대가 기울어진 방향으로 카트를 움직임"""
    env = gym.make("CartPole-v1")
    episode_rewards = []

    for episode in range(n_episodes):
        obs, info = env.reset()
        total_reward = 0

        while True:
            # 관찰값에서 막대 각도(인덱스 2)를 확인
            pole_angle = obs[2]

            # 간단한 규칙: 막대가 오른쪽으로 기울면 오른쪽으로
            if pole_angle > 0:
                action = 1
            else:
                action = 0

            obs, reward, terminated, truncated, info = env.step(action)
            total_reward += reward

            if terminated or truncated:
                break

        episode_rewards.append(total_reward)

    env.close()

    print(f"=== 휴리스틱 에이전트 결과 ({n_episodes}회) ===")
    print(f"평균 보상: {np.mean(episode_rewards):.2f}")
    print(f"최대 보상: {np.max(episode_rewards):.2f}")

    return episode_rewards

rewards = run_heuristic_agent()

이 단순한 규칙만으로도 평균 40~60점 정도의 성능을 얻을 수 있습니다. 하지만 최적 성능(500점)에는 여전히 한참 못 미칩니다.


Gym 래퍼 (Wrappers)

Gym 래퍼는 환경을 감싸서 관찰, 행동, 보상 등을 변환하는 기능을 제공합니다. 래퍼를 사용하면 환경의 핵심 로직을 수정하지 않고도 다양한 전처리를 적용할 수 있습니다.

래퍼의 종류

import gymnasium as gym
from gymnasium import Wrapper, ObservationWrapper, ActionWrapper, RewardWrapper

class ClipRewardWrapper(RewardWrapper):
    """보상을 -1, 0, +1로 클리핑하는 래퍼"""
    def reward(self, reward):
        if reward > 0:
            return 1.0
        elif reward < 0:
            return -1.0
        return 0.0

class NormalizeObservationWrapper(ObservationWrapper):
    """관찰값을 정규화하는 래퍼"""
    def __init__(self, env):
        super().__init__(env)
        self.obs_mean = None
        self.obs_var = None
        self.count = 0

    def observation(self, observation):
        # 간단한 min-max 정규화
        low = self.observation_space.low
        high = self.observation_space.high
        # 무한대 값 처리
        low = np.clip(low, -10, 10)
        high = np.clip(high, -10, 10)
        return (observation - low) / (high - low + 1e-8)

래퍼 체이닝

여러 래퍼를 순서대로 적용하여 복잡한 전처리 파이프라인을 구성할 수 있습니다.

class EpisodeStatsWrapper(Wrapper):
    """에피소드 통계를 기록하는 래퍼"""
    def __init__(self, env):
        super().__init__(env)
        self.episode_reward = 0
        self.episode_length = 0
        self.episode_rewards = []
        self.episode_lengths = []

    def reset(self, **kwargs):
        if self.episode_length > 0:
            self.episode_rewards.append(self.episode_reward)
            self.episode_lengths.append(self.episode_length)
        self.episode_reward = 0
        self.episode_length = 0
        return self.env.reset(**kwargs)

    def step(self, action):
        obs, reward, terminated, truncated, info = self.env.step(action)
        self.episode_reward += reward
        self.episode_length += 1
        return obs, reward, terminated, truncated, info

    def get_stats(self):
        return {
            'mean_reward': np.mean(self.episode_rewards) if self.episode_rewards else 0,
            'mean_length': np.mean(self.episode_lengths) if self.episode_lengths else 0,
            'n_episodes': len(self.episode_rewards),
        }

# 래퍼 적용 예시
env = gym.make("CartPole-v1")
env = EpisodeStatsWrapper(env)
env = ClipRewardWrapper(env)

Atari 환경 전처리

Atari 게임 환경은 210x160 크기의 RGB 이미지를 관찰로 제공합니다. 이를 그대로 사용하면 입력이 너무 크므로 여러 전처리 과정이 필요합니다.

class AtariPreprocessingWrapper(ObservationWrapper):
    """Atari 환경 전처리 래퍼"""
    def __init__(self, env, frame_size=84):
        super().__init__(env)
        self.frame_size = frame_size
        self.observation_space = gym.spaces.Box(
            low=0, high=255,
            shape=(frame_size, frame_size, 1),
            dtype=np.uint8
        )

    def observation(self, obs):
        # RGB를 그레이스케일로 변환
        gray = np.mean(obs, axis=2, keepdims=True).astype(np.uint8)
        # 크기 조정 (간단한 다운샘플링)
        h, w = gray.shape[:2]
        h_step = h // self.frame_size
        w_step = w // self.frame_size
        resized = gray[::h_step, ::w_step][:self.frame_size, :self.frame_size]
        return resized

class FrameStackWrapper(ObservationWrapper):
    """연속 프레임을 쌓아서 하나의 관찰로 만드는 래퍼"""
    def __init__(self, env, n_frames=4):
        super().__init__(env)
        self.n_frames = n_frames
        self.frames = []
        old_space = env.observation_space
        self.observation_space = gym.spaces.Box(
            low=0, high=255,
            shape=(old_space.shape[0], old_space.shape[1], n_frames),
            dtype=np.uint8
        )

    def reset(self, **kwargs):
        obs, info = self.env.reset(**kwargs)
        self.frames = [obs] * self.n_frames
        return np.concatenate(self.frames, axis=-1), info

    def observation(self, obs):
        self.frames.pop(0)
        self.frames.append(obs)
        return np.concatenate(self.frames, axis=-1)

영상 녹화와 모니터링

에이전트의 학습 과정을 시각적으로 확인하기 위해 영상을 녹화할 수 있습니다.

import gymnasium as gym

# RecordVideo 래퍼로 영상 녹화
env = gym.make("CartPole-v1", render_mode="rgb_array")
env = gym.wrappers.RecordVideo(
    env,
    video_folder="./videos",
    episode_trigger=lambda ep: ep % 100 == 0  # 100 에피소드마다 녹화
)

# 에피소드 실행
obs, info = env.reset()
done = False
while not done:
    action = env.action_space.sample()
    obs, reward, terminated, truncated, info = env.step(action)
    done = terminated or truncated

env.close()
print("영상이 ./videos 폴더에 저장되었습니다")

벡터 환경 (Vectorized Environments)

학습 속도를 높이기 위해 여러 환경을 병렬로 실행할 수 있습니다.

import gymnasium as gym

# 4개의 환경을 동시에 실행
envs = gym.make_vec("CartPole-v1", num_envs=4)

# 모든 환경 초기화
observations, infos = envs.reset()
print(f"관찰 shape: {observations.shape}")  # (4, 4) - 4개 환경, 각 4차원 관찰

# 모든 환경에서 동시에 한 스텝 실행
actions = np.array([envs.single_action_space.sample() for _ in range(4)])
observations, rewards, terminateds, truncateds, infos = envs.step(actions)

print(f"보상: {rewards}")  # 4개의 보상값
print(f"종료: {terminateds}")

envs.close()

커스텀 환경 만들기

Gym의 인터페이스를 따라 나만의 환경을 만들 수 있습니다.

import gymnasium as gym
from gymnasium import spaces
import numpy as np

class SimpleCorridorEnv(gym.Env):
    """간단한 1D 복도 환경"""
    metadata = {"render_modes": ["human"]}

    def __init__(self, corridor_length=10):
        super().__init__()
        self.corridor_length = corridor_length

        # 행동: 0=왼쪽, 1=오른쪽
        self.action_space = spaces.Discrete(2)

        # 관찰: 현재 위치 (0 ~ corridor_length-1)
        self.observation_space = spaces.Box(
            low=0, high=corridor_length - 1,
            shape=(1,), dtype=np.float32
        )

        self.position = 0
        self.goal = corridor_length - 1

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        self.position = 0
        return np.array([self.position], dtype=np.float32), {}

    def step(self, action):
        if action == 0:  # 왼쪽
            self.position = max(0, self.position - 1)
        else:  # 오른쪽
            self.position = min(self.corridor_length - 1, self.position + 1)

        observation = np.array([self.position], dtype=np.float32)

        # 목표 도달 시 큰 보상
        if self.position == self.goal:
            reward = 10.0
            terminated = True
        else:
            reward = -0.1  # 스텝 페널티
            terminated = False

        return observation, reward, terminated, False, {}

# 커스텀 환경 사용
env = SimpleCorridorEnv(corridor_length=5)
obs, info = env.reset()
print(f"초기 위치: {obs[0]}")

for step in range(20):
    action = env.action_space.sample()
    obs, reward, terminated, truncated, info = env.step(action)
    direction = "오른쪽" if action == 1 else "왼쪽"
    print(f"  스텝 {step + 1}: {direction} -> 위치={obs[0]:.0f}, 보상={reward:.1f}")
    if terminated:
        print("  목표 도달!")
        break

정리

이번 글에서 다룬 내용을 정리합니다.

  1. 에이전트의 구조: 정책, 가치 함수, 모델, 메모리로 구성
  2. Gym API: reset(), step(), action_space, observation_space로 일관된 인터페이스 제공
  3. 공간 타입: Discrete(이산), Box(연속) 등 다양한 공간 타입 지원
  4. CartPole: 강화학습 입문의 대표 환경, 무작위 에이전트로 약 20점, 최적 500점
  5. 래퍼: 환경을 감싸서 관찰, 행동, 보상을 변환하는 유연한 메커니즘
  6. 벡터 환경: 여러 환경을 병렬로 실행하여 학습 속도 향상

다음 글에서는 PyTorch의 기초를 다루고, 신경망을 이용한 강화학습의 기반을 쌓겠습니다.

[Deep RL] 02. Getting Started with Reinforcement Learning Using OpenAI Gym

Anatomy of an Agent

In reinforcement learning, an agent is not simply an entity that interacts with the environment -- it internally has several core components.

Core Components of an Agent

  1. Policy: A rule that determines what action to take in the current state
  2. Value Function: Estimates the expected reward for each state or state-action pair
  3. Model: Internally simulates environment dynamics (optional)
  4. Memory: Stores past experiences for learning
class SimpleAgent:
    """에이전트의 기본 구조"""
    def __init__(self, action_space):
        self.action_space = action_space
        self.memory = []

    def select_action(self, observation):
        """정책: 관찰을 받아 행동을 결정"""
        raise NotImplementedError

    def store_experience(self, obs, action, reward, next_obs, done):
        """경험을 메모리에 저장"""
        self.memory.append((obs, action, reward, next_obs, done))

    def learn(self):
        """저장된 경험으로부터 학습"""
        raise NotImplementedError

Introduction to OpenAI Gym

OpenAI Gym (now renamed to Gymnasium) is a standard tool for developing and comparing reinforcement learning algorithms. It provides diverse environments through a consistent interface.

Installation

pip install gymnasium
pip install gymnasium[classic-control]  # CartPole 등 클래식 환경
pip install gymnasium[atari]            # Atari 게임 환경
pip install gymnasium[box2d]            # Box2D 물리 환경

Core API

All Gym environments follow the same interface. This is Gym's greatest advantage.

import gymnasium as gym

# 환경 생성
env = gym.make("CartPole-v1")

# 환경 초기화 - 초기 관찰과 정보를 반환
observation, info = env.reset()

# 한 스텝 실행 - 행동을 전달하고 결과를 받음
action = env.action_space.sample()  # 무작위 행동 선택
observation, reward, terminated, truncated, info = env.step(action)

# 종료 조건 확인
done = terminated or truncated

# 환경 정리
env.close()

Action Space and Observation Space

Gym environments have two core attributes: action space and observation space.

Discrete Space

Select one from a fixed number of actions.

import gymnasium as gym

env = gym.make("CartPole-v1")

# 행동 공간 확인
print(f"행동 공간: {env.action_space}")
# 출력: Discrete(2)  - 0: 왼쪽으로 밀기, 1: 오른쪽으로 밀기

print(f"행동 개수: {env.action_space.n}")
# 출력: 2

# 무작위 행동 샘플링
random_action = env.action_space.sample()
print(f"무작위 행동: {random_action}")

Continuous Space (Box Space)

A space with continuous value ranges.

env = gym.make("MountainCarContinuous-v0")

# 관찰 공간 확인
print(f"관찰 공간: {env.observation_space}")
print(f"관찰 하한: {env.observation_space.low}")
print(f"관찰 상한: {env.observation_space.high}")
print(f"관찰 차원: {env.observation_space.shape}")

# 행동 공간 확인 (연속적)
print(f"행동 공간: {env.action_space}")
print(f"행동 하한: {env.action_space.low}")
print(f"행동 상한: {env.action_space.high}")

Comparison of Major Environment Spaces

EnvironmentObservation SpaceAction Space
CartPole-v1Box(4,)Discrete(2)
MountainCar-v0Box(2,)Discrete(3)
Pendulum-v1Box(3,)Box(1,)
Pong-v5Box(210,160,3)Discrete(6)

CartPole Environment Analysis

CartPole is the most commonly used environment for reinforcement learning introductions. The goal is to move a cart left and right to prevent a pole mounted on it from falling over.

Environment Details

import gymnasium as gym
import numpy as np

env = gym.make("CartPole-v1")
observation, info = env.reset()

print("=== CartPole-v1 환경 정보 ===")
print(f"관찰 공간 차원: {env.observation_space.shape[0]}")
print(f"  [0] 카트 위치: {env.observation_space.low[0]:.1f} ~ {env.observation_space.high[0]:.1f}")
print(f"  [1] 카트 속도: {env.observation_space.low[1]:.1f} ~ {env.observation_space.high[1]:.1f}")
print(f"  [2] 막대 각도: {env.observation_space.low[2]:.4f} ~ {env.observation_space.high[2]:.4f}")
print(f"  [3] 막대 각속도: {env.observation_space.low[3]:.1f} ~ {env.observation_space.high[3]:.1f}")
print(f"\n행동: 0=왼쪽, 1=오른쪽")
print(f"\n초기 관찰값: {observation}")

Termination Conditions

A CartPole episode terminates under the following conditions:

  • Pole angle exceeds 12 degrees
  • Cart moves beyond screen boundary (2.4 units)
  • Episode length exceeds 500 steps (truncation)

Random Agent

The simplest agent randomly selects actions from the environment's action space.

import gymnasium as gym
import numpy as np

def run_random_agent(env_name, n_episodes=100):
    """무작위 에이전트 실행 및 성능 측정"""
    env = gym.make(env_name)
    episode_rewards = []
    episode_lengths = []

    for episode in range(n_episodes):
        obs, info = env.reset()
        total_reward = 0
        steps = 0

        while True:
            action = env.action_space.sample()
            obs, reward, terminated, truncated, info = env.step(action)
            total_reward += reward
            steps += 1

            if terminated or truncated:
                break

        episode_rewards.append(total_reward)
        episode_lengths.append(steps)

    env.close()

    print(f"=== {env_name} 무작위 에이전트 결과 ({n_episodes}회) ===")
    print(f"평균 보상: {np.mean(episode_rewards):.2f} +/- {np.std(episode_rewards):.2f}")
    print(f"최대 보상: {np.max(episode_rewards):.2f}")
    print(f"최소 보상: {np.min(episode_rewards):.2f}")
    print(f"평균 에피소드 길이: {np.mean(episode_lengths):.1f}")

    return episode_rewards

# CartPole에서 무작위 에이전트 실행
rewards = run_random_agent("CartPole-v1", n_episodes=1000)

The random agent's average reward in CartPole is approximately 20. Since the maximum possible score is 500, the random agent performs very poorly.


Simple Rule-Based Agent

Even without learning, simple rules can achieve much better performance than a random agent.

def run_heuristic_agent(n_episodes=100):
    """휴리스틱 에이전트: 막대가 기울어진 방향으로 카트를 움직임"""
    env = gym.make("CartPole-v1")
    episode_rewards = []

    for episode in range(n_episodes):
        obs, info = env.reset()
        total_reward = 0

        while True:
            # 관찰값에서 막대 각도(인덱스 2)를 확인
            pole_angle = obs[2]

            # 간단한 규칙: 막대가 오른쪽으로 기울면 오른쪽으로
            if pole_angle > 0:
                action = 1
            else:
                action = 0

            obs, reward, terminated, truncated, info = env.step(action)
            total_reward += reward

            if terminated or truncated:
                break

        episode_rewards.append(total_reward)

    env.close()

    print(f"=== 휴리스틱 에이전트 결과 ({n_episodes}회) ===")
    print(f"평균 보상: {np.mean(episode_rewards):.2f}")
    print(f"최대 보상: {np.max(episode_rewards):.2f}")

    return episode_rewards

rewards = run_heuristic_agent()

This simple rule alone can achieve an average score of around 40-60. However, it still falls far short of optimal performance (500).


Gym Wrappers

Gym wrappers provide functionality to wrap environments and transform observations, actions, and rewards. Using wrappers, you can apply various preprocessing without modifying the core logic of the environment.

Types of Wrappers

import gymnasium as gym
from gymnasium import Wrapper, ObservationWrapper, ActionWrapper, RewardWrapper

class ClipRewardWrapper(RewardWrapper):
    """보상을 -1, 0, +1로 클리핑하는 래퍼"""
    def reward(self, reward):
        if reward > 0:
            return 1.0
        elif reward < 0:
            return -1.0
        return 0.0

class NormalizeObservationWrapper(ObservationWrapper):
    """관찰값을 정규화하는 래퍼"""
    def __init__(self, env):
        super().__init__(env)
        self.obs_mean = None
        self.obs_var = None
        self.count = 0

    def observation(self, observation):
        # 간단한 min-max 정규화
        low = self.observation_space.low
        high = self.observation_space.high
        # 무한대 값 처리
        low = np.clip(low, -10, 10)
        high = np.clip(high, -10, 10)
        return (observation - low) / (high - low + 1e-8)

Wrapper Chaining

Multiple wrappers can be applied sequentially to construct complex preprocessing pipelines.

class EpisodeStatsWrapper(Wrapper):
    """에피소드 통계를 기록하는 래퍼"""
    def __init__(self, env):
        super().__init__(env)
        self.episode_reward = 0
        self.episode_length = 0
        self.episode_rewards = []
        self.episode_lengths = []

    def reset(self, **kwargs):
        if self.episode_length > 0:
            self.episode_rewards.append(self.episode_reward)
            self.episode_lengths.append(self.episode_length)
        self.episode_reward = 0
        self.episode_length = 0
        return self.env.reset(**kwargs)

    def step(self, action):
        obs, reward, terminated, truncated, info = self.env.step(action)
        self.episode_reward += reward
        self.episode_length += 1
        return obs, reward, terminated, truncated, info

    def get_stats(self):
        return {
            'mean_reward': np.mean(self.episode_rewards) if self.episode_rewards else 0,
            'mean_length': np.mean(self.episode_lengths) if self.episode_lengths else 0,
            'n_episodes': len(self.episode_rewards),
        }

# 래퍼 적용 예시
env = gym.make("CartPole-v1")
env = EpisodeStatsWrapper(env)
env = ClipRewardWrapper(env)

Atari Environment Preprocessing

Atari game environments provide 210x160 RGB images as observations. Using them directly makes the input too large, requiring several preprocessing steps.

class AtariPreprocessingWrapper(ObservationWrapper):
    """Atari 환경 전처리 래퍼"""
    def __init__(self, env, frame_size=84):
        super().__init__(env)
        self.frame_size = frame_size
        self.observation_space = gym.spaces.Box(
            low=0, high=255,
            shape=(frame_size, frame_size, 1),
            dtype=np.uint8
        )

    def observation(self, obs):
        # RGB를 그레이스케일로 변환
        gray = np.mean(obs, axis=2, keepdims=True).astype(np.uint8)
        # 크기 조정 (간단한 다운샘플링)
        h, w = gray.shape[:2]
        h_step = h // self.frame_size
        w_step = w // self.frame_size
        resized = gray[::h_step, ::w_step][:self.frame_size, :self.frame_size]
        return resized

class FrameStackWrapper(ObservationWrapper):
    """연속 프레임을 쌓아서 하나의 관찰로 만드는 래퍼"""
    def __init__(self, env, n_frames=4):
        super().__init__(env)
        self.n_frames = n_frames
        self.frames = []
        old_space = env.observation_space
        self.observation_space = gym.spaces.Box(
            low=0, high=255,
            shape=(old_space.shape[0], old_space.shape[1], n_frames),
            dtype=np.uint8
        )

    def reset(self, **kwargs):
        obs, info = self.env.reset(**kwargs)
        self.frames = [obs] * self.n_frames
        return np.concatenate(self.frames, axis=-1), info

    def observation(self, obs):
        self.frames.pop(0)
        self.frames.append(obs)
        return np.concatenate(self.frames, axis=-1)

Video Recording and Monitoring

You can record videos to visually check the agent's learning process.

import gymnasium as gym

# RecordVideo 래퍼로 영상 녹화
env = gym.make("CartPole-v1", render_mode="rgb_array")
env = gym.wrappers.RecordVideo(
    env,
    video_folder="./videos",
    episode_trigger=lambda ep: ep % 100 == 0  # 100 에피소드마다 녹화
)

# 에피소드 실행
obs, info = env.reset()
done = False
while not done:
    action = env.action_space.sample()
    obs, reward, terminated, truncated, info = env.step(action)
    done = terminated or truncated

env.close()
print("영상이 ./videos 폴더에 저장되었습니다")

Vectorized Environments

Multiple environments can be run in parallel to speed up training.

import gymnasium as gym

# 4개의 환경을 동시에 실행
envs = gym.make_vec("CartPole-v1", num_envs=4)

# 모든 환경 초기화
observations, infos = envs.reset()
print(f"관찰 shape: {observations.shape}")  # (4, 4) - 4개 환경, 각 4차원 관찰

# 모든 환경에서 동시에 한 스텝 실행
actions = np.array([envs.single_action_space.sample() for _ in range(4)])
observations, rewards, terminateds, truncateds, infos = envs.step(actions)

print(f"보상: {rewards}")  # 4개의 보상값
print(f"종료: {terminateds}")

envs.close()

Creating Custom Environments

You can create your own environments following Gym's interface.

import gymnasium as gym
from gymnasium import spaces
import numpy as np

class SimpleCorridorEnv(gym.Env):
    """간단한 1D 복도 환경"""
    metadata = {"render_modes": ["human"]}

    def __init__(self, corridor_length=10):
        super().__init__()
        self.corridor_length = corridor_length

        # 행동: 0=왼쪽, 1=오른쪽
        self.action_space = spaces.Discrete(2)

        # 관찰: 현재 위치 (0 ~ corridor_length-1)
        self.observation_space = spaces.Box(
            low=0, high=corridor_length - 1,
            shape=(1,), dtype=np.float32
        )

        self.position = 0
        self.goal = corridor_length - 1

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        self.position = 0
        return np.array([self.position], dtype=np.float32), {}

    def step(self, action):
        if action == 0:  # 왼쪽
            self.position = max(0, self.position - 1)
        else:  # 오른쪽
            self.position = min(self.corridor_length - 1, self.position + 1)

        observation = np.array([self.position], dtype=np.float32)

        # 목표 도달 시 큰 보상
        if self.position == self.goal:
            reward = 10.0
            terminated = True
        else:
            reward = -0.1  # 스텝 페널티
            terminated = False

        return observation, reward, terminated, False, {}

# 커스텀 환경 사용
env = SimpleCorridorEnv(corridor_length=5)
obs, info = env.reset()
print(f"초기 위치: {obs[0]}")

for step in range(20):
    action = env.action_space.sample()
    obs, reward, terminated, truncated, info = env.step(action)
    direction = "오른쪽" if action == 1 else "왼쪽"
    print(f"  스텝 {step + 1}: {direction} -> 위치={obs[0]:.0f}, 보상={reward:.1f}")
    if terminated:
        print("  목표 도달!")
        break

Summary

Here is a summary of the content covered in this article:

  1. Agent structure: Composed of policy, value function, model, and memory
  2. Gym API: Provides a consistent interface with reset(), step(), action_space, observation_space
  3. Space types: Supports various space types including Discrete and Box (continuous)
  4. CartPole: The representative introductory RL environment, scoring about 20 with a random agent and 500 optimally
  5. Wrappers: A flexible mechanism for transforming observations, actions, and rewards by wrapping environments
  6. Vectorized environments: Running multiple environments in parallel to improve training speed

In the next article, we will cover the basics of PyTorch and build the foundation for neural network-based reinforcement learning.