Split View: [심층 강화학습] 02. OpenAI Gym으로 시작하는 강화학습
[심층 강화학습] 02. OpenAI Gym으로 시작하는 강화학습
에이전트의 구조 (Anatomy of an Agent)
강화학습에서 에이전트는 단순히 환경과 상호작용하는 주체가 아니라, 내부적으로 몇 가지 핵심 구성 요소를 갖고 있습니다.
에이전트의 핵심 구성 요소
- 정책 (Policy): 현재 상태에서 어떤 행동을 할지 결정하는 규칙
- 가치 함수 (Value Function): 각 상태 또는 상태-행동 쌍의 기대 보상을 추정
- 모델 (Model): 환경의 동작을 내부적으로 시뮬레이션 (선택적)
- 메모리 (Memory): 과거 경험을 저장하여 학습에 활용
class SimpleAgent:
"""에이전트의 기본 구조"""
def __init__(self, action_space):
self.action_space = action_space
self.memory = []
def select_action(self, observation):
"""정책: 관찰을 받아 행동을 결정"""
raise NotImplementedError
def store_experience(self, obs, action, reward, next_obs, done):
"""경험을 메모리에 저장"""
self.memory.append((obs, action, reward, next_obs, done))
def learn(self):
"""저장된 경험으로부터 학습"""
raise NotImplementedError
OpenAI Gym 소개
OpenAI Gym(현재 Gymnasium으로 이름이 변경됨)은 강화학습 알고리즘을 개발하고 비교하기 위한 표준 도구입니다. 다양한 환경을 일관된 인터페이스로 제공합니다.
설치
pip install gymnasium
pip install gymnasium[classic-control] # CartPole 등 클래식 환경
pip install gymnasium[atari] # Atari 게임 환경
pip install gymnasium[box2d] # Box2D 물리 환경
핵심 API
Gym의 모든 환경은 동일한 인터페이스를 따릅니다. 이것이 Gym의 가장 큰 장점입니다.
import gymnasium as gym
# 환경 생성
env = gym.make("CartPole-v1")
# 환경 초기화 - 초기 관찰과 정보를 반환
observation, info = env.reset()
# 한 스텝 실행 - 행동을 전달하고 결과를 받음
action = env.action_space.sample() # 무작위 행동 선택
observation, reward, terminated, truncated, info = env.step(action)
# 종료 조건 확인
done = terminated or truncated
# 환경 정리
env.close()
행동 공간과 관찰 공간
Gym 환경은 행동 공간(action space)과 관찰 공간(observation space)이라는 두 가지 핵심 속성을 가집니다.
이산 공간 (Discrete Space)
정해진 개수의 행동 중 하나를 선택합니다.
import gymnasium as gym
env = gym.make("CartPole-v1")
# 행동 공간 확인
print(f"행동 공간: {env.action_space}")
# 출력: Discrete(2) - 0: 왼쪽으로 밀기, 1: 오른쪽으로 밀기
print(f"행동 개수: {env.action_space.n}")
# 출력: 2
# 무작위 행동 샘플링
random_action = env.action_space.sample()
print(f"무작위 행동: {random_action}")
연속 공간 (Box Space)
연속적인 값의 범위를 가지는 공간입니다.
env = gym.make("MountainCarContinuous-v0")
# 관찰 공간 확인
print(f"관찰 공간: {env.observation_space}")
print(f"관찰 하한: {env.observation_space.low}")
print(f"관찰 상한: {env.observation_space.high}")
print(f"관찰 차원: {env.observation_space.shape}")
# 행동 공간 확인 (연속적)
print(f"행동 공간: {env.action_space}")
print(f"행동 하한: {env.action_space.low}")
print(f"행동 상한: {env.action_space.high}")
주요 환경의 공간 비교
| 환경 | 관찰 공간 | 행동 공간 |
|---|---|---|
| CartPole-v1 | Box(4,) | Discrete(2) |
| MountainCar-v0 | Box(2,) | Discrete(3) |
| Pendulum-v1 | Box(3,) | Box(1,) |
| Pong-v5 | Box(210,160,3) | Discrete(6) |
CartPole 환경 분석
CartPole은 강화학습 입문에서 가장 많이 사용되는 환경입니다. 카트 위에 세워진 막대가 쓰러지지 않도록 카트를 좌우로 움직이는 문제입니다.
환경 세부 사항
import gymnasium as gym
import numpy as np
env = gym.make("CartPole-v1")
observation, info = env.reset()
print("=== CartPole-v1 환경 정보 ===")
print(f"관찰 공간 차원: {env.observation_space.shape[0]}")
print(f" [0] 카트 위치: {env.observation_space.low[0]:.1f} ~ {env.observation_space.high[0]:.1f}")
print(f" [1] 카트 속도: {env.observation_space.low[1]:.1f} ~ {env.observation_space.high[1]:.1f}")
print(f" [2] 막대 각도: {env.observation_space.low[2]:.4f} ~ {env.observation_space.high[2]:.4f}")
print(f" [3] 막대 각속도: {env.observation_space.low[3]:.1f} ~ {env.observation_space.high[3]:.1f}")
print(f"\n행동: 0=왼쪽, 1=오른쪽")
print(f"\n초기 관찰값: {observation}")
종료 조건
CartPole 에피소드는 다음 조건에서 종료됩니다.
- 막대 각도가 12도를 초과
- 카트가 화면 경계(2.4 단위)를 벗어남
- 에피소드 길이가 500 스텝을 초과 (truncation)
무작위 에이전트 (Random Agent)
가장 단순한 에이전트는 환경의 행동 공간에서 무작위로 행동을 선택하는 것입니다.
import gymnasium as gym
import numpy as np
def run_random_agent(env_name, n_episodes=100):
"""무작위 에이전트 실행 및 성능 측정"""
env = gym.make(env_name)
episode_rewards = []
episode_lengths = []
for episode in range(n_episodes):
obs, info = env.reset()
total_reward = 0
steps = 0
while True:
action = env.action_space.sample()
obs, reward, terminated, truncated, info = env.step(action)
total_reward += reward
steps += 1
if terminated or truncated:
break
episode_rewards.append(total_reward)
episode_lengths.append(steps)
env.close()
print(f"=== {env_name} 무작위 에이전트 결과 ({n_episodes}회) ===")
print(f"평균 보상: {np.mean(episode_rewards):.2f} +/- {np.std(episode_rewards):.2f}")
print(f"최대 보상: {np.max(episode_rewards):.2f}")
print(f"최소 보상: {np.min(episode_rewards):.2f}")
print(f"평균 에피소드 길이: {np.mean(episode_lengths):.1f}")
return episode_rewards
# CartPole에서 무작위 에이전트 실행
rewards = run_random_agent("CartPole-v1", n_episodes=1000)
무작위 에이전트의 CartPole 평균 보상은 대략 20 내외입니다. 최대 500점까지 가능하므로 무작위 에이전트는 매우 나쁜 성능을 보입니다.
단순한 규칙 기반 에이전트
학습 없이도 간단한 규칙으로 무작위 에이전트보다 훨씬 좋은 성능을 낼 수 있습니다.
def run_heuristic_agent(n_episodes=100):
"""휴리스틱 에이전트: 막대가 기울어진 방향으로 카트를 움직임"""
env = gym.make("CartPole-v1")
episode_rewards = []
for episode in range(n_episodes):
obs, info = env.reset()
total_reward = 0
while True:
# 관찰값에서 막대 각도(인덱스 2)를 확인
pole_angle = obs[2]
# 간단한 규칙: 막대가 오른쪽으로 기울면 오른쪽으로
if pole_angle > 0:
action = 1
else:
action = 0
obs, reward, terminated, truncated, info = env.step(action)
total_reward += reward
if terminated or truncated:
break
episode_rewards.append(total_reward)
env.close()
print(f"=== 휴리스틱 에이전트 결과 ({n_episodes}회) ===")
print(f"평균 보상: {np.mean(episode_rewards):.2f}")
print(f"최대 보상: {np.max(episode_rewards):.2f}")
return episode_rewards
rewards = run_heuristic_agent()
이 단순한 규칙만으로도 평균 40~60점 정도의 성능을 얻을 수 있습니다. 하지만 최적 성능(500점)에는 여전히 한참 못 미칩니다.
Gym 래퍼 (Wrappers)
Gym 래퍼는 환경을 감싸서 관찰, 행동, 보상 등을 변환하는 기능을 제공합니다. 래퍼를 사용하면 환경의 핵심 로직을 수정하지 않고도 다양한 전처리를 적용할 수 있습니다.
래퍼의 종류
import gymnasium as gym
from gymnasium import Wrapper, ObservationWrapper, ActionWrapper, RewardWrapper
class ClipRewardWrapper(RewardWrapper):
"""보상을 -1, 0, +1로 클리핑하는 래퍼"""
def reward(self, reward):
if reward > 0:
return 1.0
elif reward < 0:
return -1.0
return 0.0
class NormalizeObservationWrapper(ObservationWrapper):
"""관찰값을 정규화하는 래퍼"""
def __init__(self, env):
super().__init__(env)
self.obs_mean = None
self.obs_var = None
self.count = 0
def observation(self, observation):
# 간단한 min-max 정규화
low = self.observation_space.low
high = self.observation_space.high
# 무한대 값 처리
low = np.clip(low, -10, 10)
high = np.clip(high, -10, 10)
return (observation - low) / (high - low + 1e-8)
래퍼 체이닝
여러 래퍼를 순서대로 적용하여 복잡한 전처리 파이프라인을 구성할 수 있습니다.
class EpisodeStatsWrapper(Wrapper):
"""에피소드 통계를 기록하는 래퍼"""
def __init__(self, env):
super().__init__(env)
self.episode_reward = 0
self.episode_length = 0
self.episode_rewards = []
self.episode_lengths = []
def reset(self, **kwargs):
if self.episode_length > 0:
self.episode_rewards.append(self.episode_reward)
self.episode_lengths.append(self.episode_length)
self.episode_reward = 0
self.episode_length = 0
return self.env.reset(**kwargs)
def step(self, action):
obs, reward, terminated, truncated, info = self.env.step(action)
self.episode_reward += reward
self.episode_length += 1
return obs, reward, terminated, truncated, info
def get_stats(self):
return {
'mean_reward': np.mean(self.episode_rewards) if self.episode_rewards else 0,
'mean_length': np.mean(self.episode_lengths) if self.episode_lengths else 0,
'n_episodes': len(self.episode_rewards),
}
# 래퍼 적용 예시
env = gym.make("CartPole-v1")
env = EpisodeStatsWrapper(env)
env = ClipRewardWrapper(env)
Atari 환경 전처리
Atari 게임 환경은 210x160 크기의 RGB 이미지를 관찰로 제공합니다. 이를 그대로 사용하면 입력이 너무 크므로 여러 전처리 과정이 필요합니다.
class AtariPreprocessingWrapper(ObservationWrapper):
"""Atari 환경 전처리 래퍼"""
def __init__(self, env, frame_size=84):
super().__init__(env)
self.frame_size = frame_size
self.observation_space = gym.spaces.Box(
low=0, high=255,
shape=(frame_size, frame_size, 1),
dtype=np.uint8
)
def observation(self, obs):
# RGB를 그레이스케일로 변환
gray = np.mean(obs, axis=2, keepdims=True).astype(np.uint8)
# 크기 조정 (간단한 다운샘플링)
h, w = gray.shape[:2]
h_step = h // self.frame_size
w_step = w // self.frame_size
resized = gray[::h_step, ::w_step][:self.frame_size, :self.frame_size]
return resized
class FrameStackWrapper(ObservationWrapper):
"""연속 프레임을 쌓아서 하나의 관찰로 만드는 래퍼"""
def __init__(self, env, n_frames=4):
super().__init__(env)
self.n_frames = n_frames
self.frames = []
old_space = env.observation_space
self.observation_space = gym.spaces.Box(
low=0, high=255,
shape=(old_space.shape[0], old_space.shape[1], n_frames),
dtype=np.uint8
)
def reset(self, **kwargs):
obs, info = self.env.reset(**kwargs)
self.frames = [obs] * self.n_frames
return np.concatenate(self.frames, axis=-1), info
def observation(self, obs):
self.frames.pop(0)
self.frames.append(obs)
return np.concatenate(self.frames, axis=-1)
영상 녹화와 모니터링
에이전트의 학습 과정을 시각적으로 확인하기 위해 영상을 녹화할 수 있습니다.
import gymnasium as gym
# RecordVideo 래퍼로 영상 녹화
env = gym.make("CartPole-v1", render_mode="rgb_array")
env = gym.wrappers.RecordVideo(
env,
video_folder="./videos",
episode_trigger=lambda ep: ep % 100 == 0 # 100 에피소드마다 녹화
)
# 에피소드 실행
obs, info = env.reset()
done = False
while not done:
action = env.action_space.sample()
obs, reward, terminated, truncated, info = env.step(action)
done = terminated or truncated
env.close()
print("영상이 ./videos 폴더에 저장되었습니다")
벡터 환경 (Vectorized Environments)
학습 속도를 높이기 위해 여러 환경을 병렬로 실행할 수 있습니다.
import gymnasium as gym
# 4개의 환경을 동시에 실행
envs = gym.make_vec("CartPole-v1", num_envs=4)
# 모든 환경 초기화
observations, infos = envs.reset()
print(f"관찰 shape: {observations.shape}") # (4, 4) - 4개 환경, 각 4차원 관찰
# 모든 환경에서 동시에 한 스텝 실행
actions = np.array([envs.single_action_space.sample() for _ in range(4)])
observations, rewards, terminateds, truncateds, infos = envs.step(actions)
print(f"보상: {rewards}") # 4개의 보상값
print(f"종료: {terminateds}")
envs.close()
커스텀 환경 만들기
Gym의 인터페이스를 따라 나만의 환경을 만들 수 있습니다.
import gymnasium as gym
from gymnasium import spaces
import numpy as np
class SimpleCorridorEnv(gym.Env):
"""간단한 1D 복도 환경"""
metadata = {"render_modes": ["human"]}
def __init__(self, corridor_length=10):
super().__init__()
self.corridor_length = corridor_length
# 행동: 0=왼쪽, 1=오른쪽
self.action_space = spaces.Discrete(2)
# 관찰: 현재 위치 (0 ~ corridor_length-1)
self.observation_space = spaces.Box(
low=0, high=corridor_length - 1,
shape=(1,), dtype=np.float32
)
self.position = 0
self.goal = corridor_length - 1
def reset(self, seed=None, options=None):
super().reset(seed=seed)
self.position = 0
return np.array([self.position], dtype=np.float32), {}
def step(self, action):
if action == 0: # 왼쪽
self.position = max(0, self.position - 1)
else: # 오른쪽
self.position = min(self.corridor_length - 1, self.position + 1)
observation = np.array([self.position], dtype=np.float32)
# 목표 도달 시 큰 보상
if self.position == self.goal:
reward = 10.0
terminated = True
else:
reward = -0.1 # 스텝 페널티
terminated = False
return observation, reward, terminated, False, {}
# 커스텀 환경 사용
env = SimpleCorridorEnv(corridor_length=5)
obs, info = env.reset()
print(f"초기 위치: {obs[0]}")
for step in range(20):
action = env.action_space.sample()
obs, reward, terminated, truncated, info = env.step(action)
direction = "오른쪽" if action == 1 else "왼쪽"
print(f" 스텝 {step + 1}: {direction} -> 위치={obs[0]:.0f}, 보상={reward:.1f}")
if terminated:
print(" 목표 도달!")
break
정리
이번 글에서 다룬 내용을 정리합니다.
- 에이전트의 구조: 정책, 가치 함수, 모델, 메모리로 구성
- Gym API:
reset(),step(),action_space,observation_space로 일관된 인터페이스 제공 - 공간 타입: Discrete(이산), Box(연속) 등 다양한 공간 타입 지원
- CartPole: 강화학습 입문의 대표 환경, 무작위 에이전트로 약 20점, 최적 500점
- 래퍼: 환경을 감싸서 관찰, 행동, 보상을 변환하는 유연한 메커니즘
- 벡터 환경: 여러 환경을 병렬로 실행하여 학습 속도 향상
다음 글에서는 PyTorch의 기초를 다루고, 신경망을 이용한 강화학습의 기반을 쌓겠습니다.
[Deep RL] 02. Getting Started with Reinforcement Learning Using OpenAI Gym
Anatomy of an Agent
In reinforcement learning, an agent is not simply an entity that interacts with the environment -- it internally has several core components.
Core Components of an Agent
- Policy: A rule that determines what action to take in the current state
- Value Function: Estimates the expected reward for each state or state-action pair
- Model: Internally simulates environment dynamics (optional)
- Memory: Stores past experiences for learning
class SimpleAgent:
"""에이전트의 기본 구조"""
def __init__(self, action_space):
self.action_space = action_space
self.memory = []
def select_action(self, observation):
"""정책: 관찰을 받아 행동을 결정"""
raise NotImplementedError
def store_experience(self, obs, action, reward, next_obs, done):
"""경험을 메모리에 저장"""
self.memory.append((obs, action, reward, next_obs, done))
def learn(self):
"""저장된 경험으로부터 학습"""
raise NotImplementedError
Introduction to OpenAI Gym
OpenAI Gym (now renamed to Gymnasium) is a standard tool for developing and comparing reinforcement learning algorithms. It provides diverse environments through a consistent interface.
Installation
pip install gymnasium
pip install gymnasium[classic-control] # CartPole 등 클래식 환경
pip install gymnasium[atari] # Atari 게임 환경
pip install gymnasium[box2d] # Box2D 물리 환경
Core API
All Gym environments follow the same interface. This is Gym's greatest advantage.
import gymnasium as gym
# 환경 생성
env = gym.make("CartPole-v1")
# 환경 초기화 - 초기 관찰과 정보를 반환
observation, info = env.reset()
# 한 스텝 실행 - 행동을 전달하고 결과를 받음
action = env.action_space.sample() # 무작위 행동 선택
observation, reward, terminated, truncated, info = env.step(action)
# 종료 조건 확인
done = terminated or truncated
# 환경 정리
env.close()
Action Space and Observation Space
Gym environments have two core attributes: action space and observation space.
Discrete Space
Select one from a fixed number of actions.
import gymnasium as gym
env = gym.make("CartPole-v1")
# 행동 공간 확인
print(f"행동 공간: {env.action_space}")
# 출력: Discrete(2) - 0: 왼쪽으로 밀기, 1: 오른쪽으로 밀기
print(f"행동 개수: {env.action_space.n}")
# 출력: 2
# 무작위 행동 샘플링
random_action = env.action_space.sample()
print(f"무작위 행동: {random_action}")
Continuous Space (Box Space)
A space with continuous value ranges.
env = gym.make("MountainCarContinuous-v0")
# 관찰 공간 확인
print(f"관찰 공간: {env.observation_space}")
print(f"관찰 하한: {env.observation_space.low}")
print(f"관찰 상한: {env.observation_space.high}")
print(f"관찰 차원: {env.observation_space.shape}")
# 행동 공간 확인 (연속적)
print(f"행동 공간: {env.action_space}")
print(f"행동 하한: {env.action_space.low}")
print(f"행동 상한: {env.action_space.high}")
Comparison of Major Environment Spaces
| Environment | Observation Space | Action Space |
|---|---|---|
| CartPole-v1 | Box(4,) | Discrete(2) |
| MountainCar-v0 | Box(2,) | Discrete(3) |
| Pendulum-v1 | Box(3,) | Box(1,) |
| Pong-v5 | Box(210,160,3) | Discrete(6) |
CartPole Environment Analysis
CartPole is the most commonly used environment for reinforcement learning introductions. The goal is to move a cart left and right to prevent a pole mounted on it from falling over.
Environment Details
import gymnasium as gym
import numpy as np
env = gym.make("CartPole-v1")
observation, info = env.reset()
print("=== CartPole-v1 환경 정보 ===")
print(f"관찰 공간 차원: {env.observation_space.shape[0]}")
print(f" [0] 카트 위치: {env.observation_space.low[0]:.1f} ~ {env.observation_space.high[0]:.1f}")
print(f" [1] 카트 속도: {env.observation_space.low[1]:.1f} ~ {env.observation_space.high[1]:.1f}")
print(f" [2] 막대 각도: {env.observation_space.low[2]:.4f} ~ {env.observation_space.high[2]:.4f}")
print(f" [3] 막대 각속도: {env.observation_space.low[3]:.1f} ~ {env.observation_space.high[3]:.1f}")
print(f"\n행동: 0=왼쪽, 1=오른쪽")
print(f"\n초기 관찰값: {observation}")
Termination Conditions
A CartPole episode terminates under the following conditions:
- Pole angle exceeds 12 degrees
- Cart moves beyond screen boundary (2.4 units)
- Episode length exceeds 500 steps (truncation)
Random Agent
The simplest agent randomly selects actions from the environment's action space.
import gymnasium as gym
import numpy as np
def run_random_agent(env_name, n_episodes=100):
"""무작위 에이전트 실행 및 성능 측정"""
env = gym.make(env_name)
episode_rewards = []
episode_lengths = []
for episode in range(n_episodes):
obs, info = env.reset()
total_reward = 0
steps = 0
while True:
action = env.action_space.sample()
obs, reward, terminated, truncated, info = env.step(action)
total_reward += reward
steps += 1
if terminated or truncated:
break
episode_rewards.append(total_reward)
episode_lengths.append(steps)
env.close()
print(f"=== {env_name} 무작위 에이전트 결과 ({n_episodes}회) ===")
print(f"평균 보상: {np.mean(episode_rewards):.2f} +/- {np.std(episode_rewards):.2f}")
print(f"최대 보상: {np.max(episode_rewards):.2f}")
print(f"최소 보상: {np.min(episode_rewards):.2f}")
print(f"평균 에피소드 길이: {np.mean(episode_lengths):.1f}")
return episode_rewards
# CartPole에서 무작위 에이전트 실행
rewards = run_random_agent("CartPole-v1", n_episodes=1000)
The random agent's average reward in CartPole is approximately 20. Since the maximum possible score is 500, the random agent performs very poorly.
Simple Rule-Based Agent
Even without learning, simple rules can achieve much better performance than a random agent.
def run_heuristic_agent(n_episodes=100):
"""휴리스틱 에이전트: 막대가 기울어진 방향으로 카트를 움직임"""
env = gym.make("CartPole-v1")
episode_rewards = []
for episode in range(n_episodes):
obs, info = env.reset()
total_reward = 0
while True:
# 관찰값에서 막대 각도(인덱스 2)를 확인
pole_angle = obs[2]
# 간단한 규칙: 막대가 오른쪽으로 기울면 오른쪽으로
if pole_angle > 0:
action = 1
else:
action = 0
obs, reward, terminated, truncated, info = env.step(action)
total_reward += reward
if terminated or truncated:
break
episode_rewards.append(total_reward)
env.close()
print(f"=== 휴리스틱 에이전트 결과 ({n_episodes}회) ===")
print(f"평균 보상: {np.mean(episode_rewards):.2f}")
print(f"최대 보상: {np.max(episode_rewards):.2f}")
return episode_rewards
rewards = run_heuristic_agent()
This simple rule alone can achieve an average score of around 40-60. However, it still falls far short of optimal performance (500).
Gym Wrappers
Gym wrappers provide functionality to wrap environments and transform observations, actions, and rewards. Using wrappers, you can apply various preprocessing without modifying the core logic of the environment.
Types of Wrappers
import gymnasium as gym
from gymnasium import Wrapper, ObservationWrapper, ActionWrapper, RewardWrapper
class ClipRewardWrapper(RewardWrapper):
"""보상을 -1, 0, +1로 클리핑하는 래퍼"""
def reward(self, reward):
if reward > 0:
return 1.0
elif reward < 0:
return -1.0
return 0.0
class NormalizeObservationWrapper(ObservationWrapper):
"""관찰값을 정규화하는 래퍼"""
def __init__(self, env):
super().__init__(env)
self.obs_mean = None
self.obs_var = None
self.count = 0
def observation(self, observation):
# 간단한 min-max 정규화
low = self.observation_space.low
high = self.observation_space.high
# 무한대 값 처리
low = np.clip(low, -10, 10)
high = np.clip(high, -10, 10)
return (observation - low) / (high - low + 1e-8)
Wrapper Chaining
Multiple wrappers can be applied sequentially to construct complex preprocessing pipelines.
class EpisodeStatsWrapper(Wrapper):
"""에피소드 통계를 기록하는 래퍼"""
def __init__(self, env):
super().__init__(env)
self.episode_reward = 0
self.episode_length = 0
self.episode_rewards = []
self.episode_lengths = []
def reset(self, **kwargs):
if self.episode_length > 0:
self.episode_rewards.append(self.episode_reward)
self.episode_lengths.append(self.episode_length)
self.episode_reward = 0
self.episode_length = 0
return self.env.reset(**kwargs)
def step(self, action):
obs, reward, terminated, truncated, info = self.env.step(action)
self.episode_reward += reward
self.episode_length += 1
return obs, reward, terminated, truncated, info
def get_stats(self):
return {
'mean_reward': np.mean(self.episode_rewards) if self.episode_rewards else 0,
'mean_length': np.mean(self.episode_lengths) if self.episode_lengths else 0,
'n_episodes': len(self.episode_rewards),
}
# 래퍼 적용 예시
env = gym.make("CartPole-v1")
env = EpisodeStatsWrapper(env)
env = ClipRewardWrapper(env)
Atari Environment Preprocessing
Atari game environments provide 210x160 RGB images as observations. Using them directly makes the input too large, requiring several preprocessing steps.
class AtariPreprocessingWrapper(ObservationWrapper):
"""Atari 환경 전처리 래퍼"""
def __init__(self, env, frame_size=84):
super().__init__(env)
self.frame_size = frame_size
self.observation_space = gym.spaces.Box(
low=0, high=255,
shape=(frame_size, frame_size, 1),
dtype=np.uint8
)
def observation(self, obs):
# RGB를 그레이스케일로 변환
gray = np.mean(obs, axis=2, keepdims=True).astype(np.uint8)
# 크기 조정 (간단한 다운샘플링)
h, w = gray.shape[:2]
h_step = h // self.frame_size
w_step = w // self.frame_size
resized = gray[::h_step, ::w_step][:self.frame_size, :self.frame_size]
return resized
class FrameStackWrapper(ObservationWrapper):
"""연속 프레임을 쌓아서 하나의 관찰로 만드는 래퍼"""
def __init__(self, env, n_frames=4):
super().__init__(env)
self.n_frames = n_frames
self.frames = []
old_space = env.observation_space
self.observation_space = gym.spaces.Box(
low=0, high=255,
shape=(old_space.shape[0], old_space.shape[1], n_frames),
dtype=np.uint8
)
def reset(self, **kwargs):
obs, info = self.env.reset(**kwargs)
self.frames = [obs] * self.n_frames
return np.concatenate(self.frames, axis=-1), info
def observation(self, obs):
self.frames.pop(0)
self.frames.append(obs)
return np.concatenate(self.frames, axis=-1)
Video Recording and Monitoring
You can record videos to visually check the agent's learning process.
import gymnasium as gym
# RecordVideo 래퍼로 영상 녹화
env = gym.make("CartPole-v1", render_mode="rgb_array")
env = gym.wrappers.RecordVideo(
env,
video_folder="./videos",
episode_trigger=lambda ep: ep % 100 == 0 # 100 에피소드마다 녹화
)
# 에피소드 실행
obs, info = env.reset()
done = False
while not done:
action = env.action_space.sample()
obs, reward, terminated, truncated, info = env.step(action)
done = terminated or truncated
env.close()
print("영상이 ./videos 폴더에 저장되었습니다")
Vectorized Environments
Multiple environments can be run in parallel to speed up training.
import gymnasium as gym
# 4개의 환경을 동시에 실행
envs = gym.make_vec("CartPole-v1", num_envs=4)
# 모든 환경 초기화
observations, infos = envs.reset()
print(f"관찰 shape: {observations.shape}") # (4, 4) - 4개 환경, 각 4차원 관찰
# 모든 환경에서 동시에 한 스텝 실행
actions = np.array([envs.single_action_space.sample() for _ in range(4)])
observations, rewards, terminateds, truncateds, infos = envs.step(actions)
print(f"보상: {rewards}") # 4개의 보상값
print(f"종료: {terminateds}")
envs.close()
Creating Custom Environments
You can create your own environments following Gym's interface.
import gymnasium as gym
from gymnasium import spaces
import numpy as np
class SimpleCorridorEnv(gym.Env):
"""간단한 1D 복도 환경"""
metadata = {"render_modes": ["human"]}
def __init__(self, corridor_length=10):
super().__init__()
self.corridor_length = corridor_length
# 행동: 0=왼쪽, 1=오른쪽
self.action_space = spaces.Discrete(2)
# 관찰: 현재 위치 (0 ~ corridor_length-1)
self.observation_space = spaces.Box(
low=0, high=corridor_length - 1,
shape=(1,), dtype=np.float32
)
self.position = 0
self.goal = corridor_length - 1
def reset(self, seed=None, options=None):
super().reset(seed=seed)
self.position = 0
return np.array([self.position], dtype=np.float32), {}
def step(self, action):
if action == 0: # 왼쪽
self.position = max(0, self.position - 1)
else: # 오른쪽
self.position = min(self.corridor_length - 1, self.position + 1)
observation = np.array([self.position], dtype=np.float32)
# 목표 도달 시 큰 보상
if self.position == self.goal:
reward = 10.0
terminated = True
else:
reward = -0.1 # 스텝 페널티
terminated = False
return observation, reward, terminated, False, {}
# 커스텀 환경 사용
env = SimpleCorridorEnv(corridor_length=5)
obs, info = env.reset()
print(f"초기 위치: {obs[0]}")
for step in range(20):
action = env.action_space.sample()
obs, reward, terminated, truncated, info = env.step(action)
direction = "오른쪽" if action == 1 else "왼쪽"
print(f" 스텝 {step + 1}: {direction} -> 위치={obs[0]:.0f}, 보상={reward:.1f}")
if terminated:
print(" 목표 도달!")
break
Summary
Here is a summary of the content covered in this article:
- Agent structure: Composed of policy, value function, model, and memory
- Gym API: Provides a consistent interface with
reset(),step(),action_space,observation_space - Space types: Supports various space types including Discrete and Box (continuous)
- CartPole: The representative introductory RL environment, scoring about 20 with a random agent and 500 optimally
- Wrappers: A flexible mechanism for transforming observations, actions, and rewards by wrapping environments
- Vectorized environments: Running multiple environments in parallel to improve training speed
In the next article, we will cover the basics of PyTorch and build the foundation for neural network-based reinforcement learning.