Skip to content

Split View: [심층 강화학습] 08. 강화학습으로 주식 트레이딩 만들기

|

[심층 강화학습] 08. 강화학습으로 주식 트레이딩 만들기

트레이딩과 강화학습

주식 트레이딩은 강화학습의 자연스러운 적용 분야입니다. 트레이더(에이전트)가 시장(환경)에서 매수/매도/보유(행동)를 결정하고, 수익(보상)을 얻는 구조가 강화학습 프레임워크와 정확히 일치합니다.

주의사항

이 글은 교육 목적의 실습이며, 실제 투자에 직접 적용하는 것은 권장하지 않습니다. 실제 금융 시장은 여기서 다루는 것보다 훨씬 복잡합니다.


트레이딩 기초 개념

기본 용어

  • 매수 (Buy/Long): 주식을 구매하여 가격 상승에 베팅
  • 매도 (Sell/Short): 보유 주식을 판매하여 포지션 청산
  • 포지션 (Position): 현재 보유하고 있는 주식의 상태
  • 수익률 (Return): 투자 대비 이익의 비율
  • 수수료 (Commission): 거래 시 발생하는 비용
  • 슬리피지 (Slippage): 주문 가격과 실제 체결 가격의 차이

강화학습으로 트레이딩을 모델링하기

RL 요소트레이딩 대응
상태과거 가격 데이터, 기술적 지표, 현재 포지션
행동매수, 매도, 보유
보상실현 수익, 미실현 수익 변화량
에피소드일정 기간의 트레이딩 세션

데이터 준비

가격 데이터 생성

실습을 위해 합성 데이터를 생성합니다. 실제 적용 시에는 Yahoo Finance 등에서 데이터를 가져올 수 있습니다.

import numpy as np
import pandas as pd

def generate_stock_data(n_days=1000, initial_price=100.0, volatility=0.02, seed=42):
    """합성 주가 데이터 생성 (기하 브라운 운동 모델)"""
    np.random.seed(seed)

    # 일별 수익률 생성
    daily_returns = np.random.normal(0.0005, volatility, n_days)

    # 가격 계산
    prices = initial_price * np.cumprod(1 + daily_returns)

    # OHLCV 데이터 생성
    data = pd.DataFrame()
    data['close'] = prices
    data['open'] = prices * (1 + np.random.normal(0, 0.005, n_days))
    data['high'] = np.maximum(data['open'], data['close']) * (1 + np.abs(np.random.normal(0, 0.01, n_days)))
    data['low'] = np.minimum(data['open'], data['close']) * (1 - np.abs(np.random.normal(0, 0.01, n_days)))
    data['volume'] = np.random.randint(100000, 1000000, n_days).astype(float)

    return data

# 데이터 생성
stock_data = generate_stock_data(n_days=2000)
print(f"데이터 크기: {len(stock_data)}")
print(stock_data.head())

기술적 지표 계산

def add_technical_indicators(df):
    """기술적 지표 추가"""
    # 이동평균
    df['sma_10'] = df['close'].rolling(window=10).mean()
    df['sma_30'] = df['close'].rolling(window=30).mean()

    # 상대강도지수 (RSI)
    delta = df['close'].diff()
    gain = delta.where(delta > 0, 0).rolling(window=14).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
    rs = gain / (loss + 1e-10)
    df['rsi'] = 100 - (100 / (1 + rs))

    # 볼린저 밴드
    bb_mean = df['close'].rolling(window=20).mean()
    bb_std = df['close'].rolling(window=20).std()
    df['bb_upper'] = bb_mean + 2 * bb_std
    df['bb_lower'] = bb_mean - 2 * bb_std
    df['bb_position'] = (df['close'] - df['bb_lower']) / (df['bb_upper'] - df['bb_lower'] + 1e-10)

    # MACD
    ema12 = df['close'].ewm(span=12).mean()
    ema26 = df['close'].ewm(span=26).mean()
    df['macd'] = ema12 - ema26
    df['macd_signal'] = df['macd'].ewm(span=9).mean()

    # 수익률
    df['returns'] = df['close'].pct_change()
    df['returns_5d'] = df['close'].pct_change(5)

    # NaN 제거
    df.dropna(inplace=True)
    df.reset_index(drop=True, inplace=True)

    return df

stock_data = add_technical_indicators(stock_data)
print(f"지표 추가 후 데이터 크기: {len(stock_data)}")
print(f"특성 목록: {list(stock_data.columns)}")

트레이딩 환경 설계

Gymnasium 인터페이스를 따르는 커스텀 트레이딩 환경을 구현합니다.

import gymnasium as gym
from gymnasium import spaces

class StockTradingEnv(gym.Env):
    """주식 트레이딩 환경"""
    metadata = {"render_modes": ["human"]}

    def __init__(self, df, window_size=30, commission=0.001,
                 initial_balance=100000):
        super().__init__()

        self.df = df
        self.window_size = window_size
        self.commission = commission
        self.initial_balance = initial_balance

        # 특성 열 선택
        self.feature_columns = [
            'close', 'volume', 'sma_10', 'sma_30', 'rsi',
            'bb_position', 'macd', 'macd_signal', 'returns', 'returns_5d'
        ]
        self.n_features = len(self.feature_columns)

        # 행동 공간: 0=보유, 1=매수, 2=매도
        self.action_space = spaces.Discrete(3)

        # 관찰 공간: 가격 윈도우 + 포지션 정보
        obs_shape = self.window_size * self.n_features + 3  # +3: 포지션, 수익률, 보유비율
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf, shape=(obs_shape,), dtype=np.float32
        )

    def _get_observation(self):
        """현재 관찰값 생성"""
        # 가격 윈도우 데이터
        start = self.current_step - self.window_size
        end = self.current_step
        window_data = self.df[self.feature_columns].iloc[start:end].values

        # 정규화 (각 특성별로 윈도우 내 min-max)
        for i in range(self.n_features):
            col = window_data[:, i]
            min_val = col.min()
            max_val = col.max()
            if max_val - min_val > 0:
                window_data[:, i] = (col - min_val) / (max_val - min_val)
            else:
                window_data[:, i] = 0.0

        flat_window = window_data.flatten()

        # 포지션 정보
        position_info = np.array([
            1.0 if self.position > 0 else 0.0,  # 포지션 보유 여부
            self.unrealized_pnl / self.initial_balance,  # 미실현 수익률
            self.shares * self.current_price / self.total_value,  # 주식 보유 비율
        ], dtype=np.float32)

        return np.concatenate([flat_window, position_info]).astype(np.float32)

    @property
    def current_price(self):
        return self.df['close'].iloc[self.current_step]

    @property
    def unrealized_pnl(self):
        if self.position > 0:
            return self.shares * (self.current_price - self.entry_price)
        return 0.0

    @property
    def total_value(self):
        return self.balance + self.shares * self.current_price

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        self.current_step = self.window_size
        self.balance = self.initial_balance
        self.shares = 0
        self.position = 0  # 0: 없음, 1: 롱
        self.entry_price = 0.0
        self.total_trades = 0
        self.winning_trades = 0
        self.trade_history = []

        return self._get_observation(), {}

    def step(self, action):
        prev_total = self.total_value
        reward = 0.0
        trade_info = ""

        current_price = self.current_price

        if action == 1 and self.position == 0:  # 매수
            # 전체 잔고의 95%로 매수 (수수료 고려)
            max_shares = int(self.balance * 0.95 / (current_price * (1 + self.commission)))
            if max_shares > 0:
                cost = max_shares * current_price * (1 + self.commission)
                self.balance -= cost
                self.shares = max_shares
                self.position = 1
                self.entry_price = current_price
                trade_info = f"매수 {max_shares}주 @ {current_price:.2f}"

        elif action == 2 and self.position == 1:  # 매도
            proceeds = self.shares * current_price * (1 - self.commission)
            self.balance += proceeds
            pnl = (current_price - self.entry_price) / self.entry_price
            self.total_trades += 1
            if pnl > 0:
                self.winning_trades += 1
            self.trade_history.append(pnl)
            trade_info = f"매도 {self.shares}주 @ {current_price:.2f}, 수익률: {pnl:.2%}"
            self.shares = 0
            self.position = 0
            self.entry_price = 0.0

        # 다음 스텝으로 이동
        self.current_step += 1

        # 보상: 포트폴리오 가치 변화율
        current_total = self.total_value
        reward = (current_total - prev_total) / prev_total

        # 종료 조건
        terminated = self.current_step >= len(self.df) - 1
        truncated = self.total_value < self.initial_balance * 0.5  # 50% 이상 손실

        info = {
            "total_value": self.total_value,
            "balance": self.balance,
            "position": self.position,
            "total_trades": self.total_trades,
            "trade_info": trade_info,
        }

        return self._get_observation(), reward, terminated, truncated, info

# 환경 테스트
env = StockTradingEnv(stock_data, window_size=30)
obs, info = env.reset()
print(f"관찰 차원: {obs.shape}")
print(f"초기 포트폴리오: {env.total_value:,.0f}원")

무작위 에이전트 기준선

def evaluate_random_agent(env, n_episodes=10):
    """무작위 에이전트 평가"""
    results = []

    for episode in range(n_episodes):
        obs, _ = env.reset()
        while True:
            action = env.action_space.sample()
            obs, reward, terminated, truncated, info = env.step(action)
            if terminated or truncated:
                break

        final_value = info['total_value']
        total_return = (final_value - env.initial_balance) / env.initial_balance
        results.append({
            'final_value': final_value,
            'return': total_return,
            'trades': info['total_trades'],
        })

    returns = [r['return'] for r in results]
    print(f"=== 무작위 에이전트 ({n_episodes}회) ===")
    print(f"평균 수익률: {np.mean(returns):.2%}")
    print(f"최대 수익률: {np.max(returns):.2%}")
    print(f"최소 수익률: {np.min(returns):.2%}")
    print(f"평균 거래 횟수: {np.mean([r['trades'] for r in results]):.1f}")

    return results

# random_results = evaluate_random_agent(env)

피드포워드 DQN 모델

import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import random

class TradingDQN(nn.Module):
    """트레이딩용 피드포워드 DQN"""
    def __init__(self, obs_size, n_actions):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(obs_size, 256),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, n_actions),
        )

    def forward(self, x):
        return self.net(x)

CNN 모델: 가격 차트를 이미지처럼 처리

가격 데이터의 시간적 패턴을 1D 합성곱으로 포착합니다.

class TradingCNN(nn.Module):
    """1D CNN 기반 트레이딩 모델"""
    def __init__(self, window_size, n_features, n_actions):
        super().__init__()

        self.conv = nn.Sequential(
            nn.Conv1d(n_features, 32, kernel_size=5, padding=2),
            nn.ReLU(),
            nn.Conv1d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.AdaptiveAvgPool1d(1),  # 시간 차원을 1로 축소
        )

        # CNN 출력 + 포지션 정보 (3개)
        self.fc = nn.Sequential(
            nn.Linear(64 + 3, 64),
            nn.ReLU(),
            nn.Linear(64, n_actions),
        )

        self.window_size = window_size
        self.n_features = n_features

    def forward(self, x):
        batch_size = x.shape[0]

        # 윈도우 데이터와 포지션 정보 분리
        window_data = x[:, :-3].view(batch_size, self.window_size, self.n_features)
        position_info = x[:, -3:]

        # CNN: (batch, features, time) 형태로 변환
        window_data = window_data.transpose(1, 2)
        conv_out = self.conv(window_data).squeeze(-1)

        # 포지션 정보와 결합
        combined = torch.cat([conv_out, position_info], dim=1)
        return self.fc(combined)

트레이딩 에이전트 학습

class ReplayBuffer:
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        s, a, r, ns, d = zip(*batch)
        return (np.array(s), np.array(a), np.array(r, dtype=np.float32),
                np.array(ns), np.array(d, dtype=np.bool_))

    def __len__(self):
        return len(self.buffer)


def train_trading_agent(env, model_type="ff", n_episodes=500):
    """트레이딩 에이전트 학습"""
    obs_size = env.observation_space.shape[0]
    n_actions = env.action_space.n
    device = torch.device("cpu")

    if model_type == "cnn":
        online_net = TradingCNN(env.window_size, env.n_features, n_actions).to(device)
        target_net = TradingCNN(env.window_size, env.n_features, n_actions).to(device)
    else:
        online_net = TradingDQN(obs_size, n_actions).to(device)
        target_net = TradingDQN(obs_size, n_actions).to(device)

    target_net.load_state_dict(online_net.state_dict())
    optimizer = optim.Adam(online_net.parameters(), lr=1e-4)
    buffer = ReplayBuffer(50000)

    epsilon = 1.0
    epsilon_min = 0.05
    epsilon_decay = 0.995
    gamma = 0.99
    batch_size = 64
    target_update = 50

    best_return = -float('inf')
    returns_history = []

    for episode in range(n_episodes):
        obs, _ = env.reset()
        total_reward = 0

        while True:
            # 엡실론-탐욕 행동 선택
            if random.random() < epsilon:
                action = env.action_space.sample()
            else:
                with torch.no_grad():
                    q = online_net(torch.tensor([obs], dtype=torch.float32).to(device))
                action = q.argmax(dim=1).item()

            next_obs, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated
            buffer.push(obs, action, reward, next_obs, done)
            total_reward += reward
            obs = next_obs

            # 학습
            if len(buffer) >= batch_size:
                s, a, r, ns, d = buffer.sample(batch_size)
                s_t = torch.tensor(s, dtype=torch.float32).to(device)
                a_t = torch.tensor(a, dtype=torch.long).to(device)
                r_t = torch.tensor(r, dtype=torch.float32).to(device)
                ns_t = torch.tensor(ns, dtype=torch.float32).to(device)
                d_t = torch.tensor(d, dtype=torch.bool).to(device)

                current_q = online_net(s_t).gather(1, a_t.unsqueeze(1)).squeeze(1)

                with torch.no_grad():
                    # Double DQN
                    best_a = online_net(ns_t).argmax(dim=1)
                    next_q = target_net(ns_t).gather(1, best_a.unsqueeze(1)).squeeze(1)
                    next_q[d_t] = 0.0
                    target_q = r_t + gamma * next_q

                loss = nn.SmoothL1Loss()(current_q, target_q)
                optimizer.zero_grad()
                loss.backward()
                torch.nn.utils.clip_grad_norm_(online_net.parameters(), 1.0)
                optimizer.step()

            if done:
                break

        # 타겟 네트워크 업데이트
        if episode % target_update == 0:
            target_net.load_state_dict(online_net.state_dict())

        epsilon = max(epsilon_min, epsilon * epsilon_decay)

        # 결과 기록
        episode_return = (env.total_value - env.initial_balance) / env.initial_balance
        returns_history.append(episode_return)

        if episode_return > best_return:
            best_return = episode_return
            torch.save(online_net.state_dict(), "best_trading_model.pth")

        if episode % 50 == 0:
            mean_return = np.mean(returns_history[-50:])
            print(
                f"에피소드 {episode}: "
                f"수익률={episode_return:.2%}, "
                f"평균 수익률={mean_return:.2%}, "
                f"거래={info['total_trades']}, "
                f"엡실론={epsilon:.3f}"
            )

    return online_net, returns_history

# 학습 실행
# trained_model, history = train_trading_agent(env, model_type="ff", n_episodes=500)

에이전트 평가 및 분석

def backtest_agent(env, net, n_episodes=5):
    """학습된 에이전트 백테스트"""
    all_results = []

    for episode in range(n_episodes):
        obs, _ = env.reset()
        portfolio_values = [env.total_value]
        trade_log = []

        while True:
            with torch.no_grad():
                q = net(torch.tensor([obs], dtype=torch.float32))
            action = q.argmax(dim=1).item()

            obs, reward, terminated, truncated, info = env.step(action)
            portfolio_values.append(env.total_value)

            if info.get('trade_info'):
                trade_log.append(info['trade_info'])

            if terminated or truncated:
                break

        total_return = (portfolio_values[-1] - portfolio_values[0]) / portfolio_values[0]

        # 성과 지표 계산
        daily_returns = np.diff(portfolio_values) / portfolio_values[:-1]
        sharpe_ratio = np.mean(daily_returns) / (np.std(daily_returns) + 1e-10) * np.sqrt(252)

        # 최대 낙폭 (MDD)
        peak = np.maximum.accumulate(portfolio_values)
        drawdown = (np.array(portfolio_values) - peak) / peak
        max_drawdown = drawdown.min()

        result = {
            'total_return': total_return,
            'sharpe_ratio': sharpe_ratio,
            'max_drawdown': max_drawdown,
            'total_trades': info['total_trades'],
            'portfolio_values': portfolio_values,
        }
        all_results.append(result)

        print(f"\n에피소드 {episode + 1}:")
        print(f"  총 수익률: {total_return:.2%}")
        print(f"  샤프 비율: {sharpe_ratio:.2f}")
        print(f"  최대 낙폭: {max_drawdown:.2%}")
        print(f"  총 거래 횟수: {info['total_trades']}")

    # 전체 평균
    print("\n=== 전체 백테스트 결과 ===")
    avg_return = np.mean([r['total_return'] for r in all_results])
    avg_sharpe = np.mean([r['sharpe_ratio'] for r in all_results])
    avg_mdd = np.mean([r['max_drawdown'] for r in all_results])
    print(f"평균 수익률: {avg_return:.2%}")
    print(f"평균 샤프 비율: {avg_sharpe:.2f}")
    print(f"평균 최대 낙폭: {avg_mdd:.2%}")

    return all_results

# backtest_results = backtest_agent(env, trained_model)

바이앤홀드 전략과 비교

def compare_with_buy_and_hold(df, agent_results, initial_balance=100000):
    """바이앤홀드 전략과 RL 에이전트 비교"""
    # 바이앤홀드: 처음에 매수하고 끝까지 보유
    start_price = df['close'].iloc[30]  # window_size 이후
    end_price = df['close'].iloc[-1]
    bnh_return = (end_price - start_price) / start_price

    agent_return = np.mean([r['total_return'] for r in agent_results])

    print("=== 전략 비교 ===")
    print(f"바이앤홀드 수익률: {bnh_return:.2%}")
    print(f"RL 에이전트 수익률: {agent_return:.2%}")
    print(f"초과 수익률: {agent_return - bnh_return:.2%}")

# compare_with_buy_and_hold(stock_data, backtest_results)

보상 함수 설계의 중요성

보상 함수의 설계는 트레이딩 에이전트의 성능에 결정적입니다.

class ImprovedRewardEnv(StockTradingEnv):
    """개선된 보상 함수를 사용하는 트레이딩 환경"""

    def _compute_reward(self, prev_total, action):
        """다양한 보상 설계 방식"""
        current_total = self.total_value

        # 1. 단순 포트폴리오 변화율
        basic_reward = (current_total - prev_total) / prev_total

        # 2. 리스크 조정 보상 (샤프 비율 유사)
        # 수익률에서 변동성 페널티를 차감
        volatility_penalty = 0.0
        if len(self.trade_history) > 1:
            recent_returns = self.trade_history[-10:]
            volatility_penalty = np.std(recent_returns) * 0.1

        # 3. 과도한 거래 페널티
        trade_penalty = 0.0
        if action != 0:  # 보유가 아닌 경우
            trade_penalty = -0.0001

        # 4. 승률 보너스
        win_bonus = 0.0
        if self.total_trades > 10:
            win_rate = self.winning_trades / self.total_trades
            if win_rate > 0.5:
                win_bonus = 0.0001

        return basic_reward - volatility_penalty + trade_penalty + win_bonus

정리

  1. 문제 정의: 주식 트레이딩을 MDP로 모델링 (상태=시장 데이터, 행동=매수/매도/보유, 보상=수익)
  2. 데이터 준비: 가격 데이터에 기술적 지표를 추가하여 관찰 공간 구성
  3. 환경 설계: Gymnasium 인터페이스의 커스텀 트레이딩 환경 구현
  4. 모델: 피드포워드 DQN과 1D CNN 모델 모두 활용 가능
  5. 보상 설계: 단순 수익률 외에 리스크 조정, 거래 페널티 등 다양한 요소 고려
  6. 평가: 수익률, 샤프 비율, 최대 낙폭 등 다양한 성과 지표로 평가

다음 글에서는 가치 기반 방법에서 벗어나 정책을 직접 최적화하는 Policy Gradient 방법을 살펴보겠습니다.

[Deep RL] 08. Building a Stock Trading Agent with Reinforcement Learning

Trading and Reinforcement Learning

Stock trading is a natural application of reinforcement learning. The structure where a trader (agent) decides to buy/sell/hold (actions) in the market (environment) and earns profit (reward) matches the RL framework exactly.

Disclaimer

This article is for educational purposes only, and applying it directly to real investments is not recommended. Real financial markets are far more complex than what is covered here.


Trading Basics

Basic Terminology

  • Buy/Long: Purchase stocks betting on price increase
  • Sell/Short: Sell held stocks to close a position
  • Position: Current state of held stocks
  • Return: Ratio of profit to investment
  • Commission: Cost incurred during transactions
  • Slippage: Difference between order price and actual execution price

Modeling Trading as Reinforcement Learning

RL ElementTrading Counterpart
StateHistorical price data, technical indicators, current position
ActionBuy, Sell, Hold
RewardRealized profit, unrealized P&L change
EpisodeA trading session over a given period

Data Preparation

Price Data Generation

We generate synthetic data for practice. For real applications, data can be fetched from Yahoo Finance, etc.

import numpy as np
import pandas as pd

def generate_stock_data(n_days=1000, initial_price=100.0, volatility=0.02, seed=42):
    """합성 주가 데이터 생성 (기하 브라운 운동 모델)"""
    np.random.seed(seed)
    daily_returns = np.random.normal(0.0005, volatility, n_days)
    prices = initial_price * np.cumprod(1 + daily_returns)
    data = pd.DataFrame()
    data['close'] = prices
    data['open'] = prices * (1 + np.random.normal(0, 0.005, n_days))
    data['high'] = np.maximum(data['open'], data['close']) * (1 + np.abs(np.random.normal(0, 0.01, n_days)))
    data['low'] = np.minimum(data['open'], data['close']) * (1 - np.abs(np.random.normal(0, 0.01, n_days)))
    data['volume'] = np.random.randint(100000, 1000000, n_days).astype(float)
    return data

stock_data = generate_stock_data(n_days=2000)
print(f"데이터 크기: {len(stock_data)}")
print(stock_data.head())

Technical Indicators

def add_technical_indicators(df):
    """기술적 지표 추가"""
    df['sma_10'] = df['close'].rolling(window=10).mean()
    df['sma_30'] = df['close'].rolling(window=30).mean()
    delta = df['close'].diff()
    gain = delta.where(delta > 0, 0).rolling(window=14).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
    rs = gain / (loss + 1e-10)
    df['rsi'] = 100 - (100 / (1 + rs))
    bb_mean = df['close'].rolling(window=20).mean()
    bb_std = df['close'].rolling(window=20).std()
    df['bb_upper'] = bb_mean + 2 * bb_std
    df['bb_lower'] = bb_mean - 2 * bb_std
    df['bb_position'] = (df['close'] - df['bb_lower']) / (df['bb_upper'] - df['bb_lower'] + 1e-10)
    ema12 = df['close'].ewm(span=12).mean()
    ema26 = df['close'].ewm(span=26).mean()
    df['macd'] = ema12 - ema26
    df['macd_signal'] = df['macd'].ewm(span=9).mean()
    df['returns'] = df['close'].pct_change()
    df['returns_5d'] = df['close'].pct_change(5)
    df.dropna(inplace=True)
    df.reset_index(drop=True, inplace=True)
    return df

stock_data = add_technical_indicators(stock_data)
print(f"지표 추가 후 데이터 크기: {len(stock_data)}")
print(f"특성 목록: {list(stock_data.columns)}")

Trading Environment Design

We implement a custom trading environment following the Gymnasium interface.

import gymnasium as gym
from gymnasium import spaces

class StockTradingEnv(gym.Env):
    """주식 트레이딩 환경"""
    metadata = {"render_modes": ["human"]}

    def __init__(self, df, window_size=30, commission=0.001,
                 initial_balance=100000):
        super().__init__()
        self.df = df
        self.window_size = window_size
        self.commission = commission
        self.initial_balance = initial_balance
        self.feature_columns = [
            'close', 'volume', 'sma_10', 'sma_30', 'rsi',
            'bb_position', 'macd', 'macd_signal', 'returns', 'returns_5d'
        ]
        self.n_features = len(self.feature_columns)
        self.action_space = spaces.Discrete(3)
        obs_shape = self.window_size * self.n_features + 3
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf, shape=(obs_shape,), dtype=np.float32
        )

    def _get_observation(self):
        """현재 관찰값 생성"""
        start = self.current_step - self.window_size
        end = self.current_step
        window_data = self.df[self.feature_columns].iloc[start:end].values
        for i in range(self.n_features):
            col = window_data[:, i]
            min_val = col.min()
            max_val = col.max()
            if max_val - min_val > 0:
                window_data[:, i] = (col - min_val) / (max_val - min_val)
            else:
                window_data[:, i] = 0.0
        flat_window = window_data.flatten()
        position_info = np.array([
            1.0 if self.position > 0 else 0.0,
            self.unrealized_pnl / self.initial_balance,
            self.shares * self.current_price / self.total_value,
        ], dtype=np.float32)
        return np.concatenate([flat_window, position_info]).astype(np.float32)

    @property
    def current_price(self):
        return self.df['close'].iloc[self.current_step]

    @property
    def unrealized_pnl(self):
        if self.position > 0:
            return self.shares * (self.current_price - self.entry_price)
        return 0.0

    @property
    def total_value(self):
        return self.balance + self.shares * self.current_price

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        self.current_step = self.window_size
        self.balance = self.initial_balance
        self.shares = 0
        self.position = 0
        self.entry_price = 0.0
        self.total_trades = 0
        self.winning_trades = 0
        self.trade_history = []
        return self._get_observation(), {}

    def step(self, action):
        prev_total = self.total_value
        reward = 0.0
        trade_info = ""
        current_price = self.current_price

        if action == 1 and self.position == 0:
            max_shares = int(self.balance * 0.95 / (current_price * (1 + self.commission)))
            if max_shares > 0:
                cost = max_shares * current_price * (1 + self.commission)
                self.balance -= cost
                self.shares = max_shares
                self.position = 1
                self.entry_price = current_price
                trade_info = f"매수 {max_shares}주 @ {current_price:.2f}"
        elif action == 2 and self.position == 1:
            proceeds = self.shares * current_price * (1 - self.commission)
            self.balance += proceeds
            pnl = (current_price - self.entry_price) / self.entry_price
            self.total_trades += 1
            if pnl > 0:
                self.winning_trades += 1
            self.trade_history.append(pnl)
            trade_info = f"매도 {self.shares}주 @ {current_price:.2f}, 수익률: {pnl:.2%}"
            self.shares = 0
            self.position = 0
            self.entry_price = 0.0

        self.current_step += 1
        current_total = self.total_value
        reward = (current_total - prev_total) / prev_total
        terminated = self.current_step >= len(self.df) - 1
        truncated = self.total_value < self.initial_balance * 0.5

        info = {
            "total_value": self.total_value,
            "balance": self.balance,
            "position": self.position,
            "total_trades": self.total_trades,
            "trade_info": trade_info,
        }
        return self._get_observation(), reward, terminated, truncated, info

env = StockTradingEnv(stock_data, window_size=30)
obs, info = env.reset()
print(f"관찰 차원: {obs.shape}")
print(f"초기 포트폴리오: {env.total_value:,.0f}원")

Random Agent Baseline

def evaluate_random_agent(env, n_episodes=10):
    """무작위 에이전트 평가"""
    results = []
    for episode in range(n_episodes):
        obs, _ = env.reset()
        while True:
            action = env.action_space.sample()
            obs, reward, terminated, truncated, info = env.step(action)
            if terminated or truncated:
                break
        final_value = info['total_value']
        total_return = (final_value - env.initial_balance) / env.initial_balance
        results.append({'final_value': final_value, 'return': total_return, 'trades': info['total_trades']})

    returns = [r['return'] for r in results]
    print(f"=== 무작위 에이전트 ({n_episodes}회) ===")
    print(f"평균 수익률: {np.mean(returns):.2%}")
    print(f"최대 수익률: {np.max(returns):.2%}")
    print(f"최소 수익률: {np.min(returns):.2%}")
    return results

# random_results = evaluate_random_agent(env)

Feedforward DQN Model

import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import random

class TradingDQN(nn.Module):
    """트레이딩용 피드포워드 DQN"""
    def __init__(self, obs_size, n_actions):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(obs_size, 256), nn.ReLU(), nn.Dropout(0.2),
            nn.Linear(256, 128), nn.ReLU(), nn.Dropout(0.2),
            nn.Linear(128, 64), nn.ReLU(),
            nn.Linear(64, n_actions),
        )

    def forward(self, x):
        return self.net(x)

CNN Model: Processing Price Charts Like Images

Temporal patterns in price data are captured using 1D convolutions.

class TradingCNN(nn.Module):
    """1D CNN 기반 트레이딩 모델"""
    def __init__(self, window_size, n_features, n_actions):
        super().__init__()
        self.conv = nn.Sequential(
            nn.Conv1d(n_features, 32, kernel_size=5, padding=2), nn.ReLU(),
            nn.Conv1d(32, 64, kernel_size=3, padding=1), nn.ReLU(),
            nn.AdaptiveAvgPool1d(1),
        )
        self.fc = nn.Sequential(
            nn.Linear(64 + 3, 64), nn.ReLU(),
            nn.Linear(64, n_actions),
        )
        self.window_size = window_size
        self.n_features = n_features

    def forward(self, x):
        batch_size = x.shape[0]
        window_data = x[:, :-3].view(batch_size, self.window_size, self.n_features)
        position_info = x[:, -3:]
        window_data = window_data.transpose(1, 2)
        conv_out = self.conv(window_data).squeeze(-1)
        combined = torch.cat([conv_out, position_info], dim=1)
        return self.fc(combined)

Training the Trading Agent

class ReplayBuffer:
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)
    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))
    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        s, a, r, ns, d = zip(*batch)
        return (np.array(s), np.array(a), np.array(r, dtype=np.float32),
                np.array(ns), np.array(d, dtype=np.bool_))
    def __len__(self):
        return len(self.buffer)

def train_trading_agent(env, model_type="ff", n_episodes=500):
    """트레이딩 에이전트 학습"""
    obs_size = env.observation_space.shape[0]
    n_actions = env.action_space.n
    device = torch.device("cpu")

    if model_type == "cnn":
        online_net = TradingCNN(env.window_size, env.n_features, n_actions).to(device)
        target_net = TradingCNN(env.window_size, env.n_features, n_actions).to(device)
    else:
        online_net = TradingDQN(obs_size, n_actions).to(device)
        target_net = TradingDQN(obs_size, n_actions).to(device)

    target_net.load_state_dict(online_net.state_dict())
    optimizer = optim.Adam(online_net.parameters(), lr=1e-4)
    buffer = ReplayBuffer(50000)
    epsilon = 1.0
    epsilon_min = 0.05
    epsilon_decay = 0.995
    gamma = 0.99
    batch_size = 64
    target_update = 50
    best_return = -float('inf')
    returns_history = []

    for episode in range(n_episodes):
        obs, _ = env.reset()
        total_reward = 0
        while True:
            if random.random() < epsilon:
                action = env.action_space.sample()
            else:
                with torch.no_grad():
                    q = online_net(torch.tensor([obs], dtype=torch.float32).to(device))
                action = q.argmax(dim=1).item()
            next_obs, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated
            buffer.push(obs, action, reward, next_obs, done)
            total_reward += reward
            obs = next_obs
            if len(buffer) >= batch_size:
                s, a, r, ns, d = buffer.sample(batch_size)
                s_t = torch.tensor(s, dtype=torch.float32).to(device)
                a_t = torch.tensor(a, dtype=torch.long).to(device)
                r_t = torch.tensor(r, dtype=torch.float32).to(device)
                ns_t = torch.tensor(ns, dtype=torch.float32).to(device)
                d_t = torch.tensor(d, dtype=torch.bool).to(device)
                current_q = online_net(s_t).gather(1, a_t.unsqueeze(1)).squeeze(1)
                with torch.no_grad():
                    best_a = online_net(ns_t).argmax(dim=1)
                    next_q = target_net(ns_t).gather(1, best_a.unsqueeze(1)).squeeze(1)
                    next_q[d_t] = 0.0
                    target_q = r_t + gamma * next_q
                loss = nn.SmoothL1Loss()(current_q, target_q)
                optimizer.zero_grad()
                loss.backward()
                torch.nn.utils.clip_grad_norm_(online_net.parameters(), 1.0)
                optimizer.step()
            if done:
                break
        if episode % target_update == 0:
            target_net.load_state_dict(online_net.state_dict())
        epsilon = max(epsilon_min, epsilon * epsilon_decay)
        episode_return = (env.total_value - env.initial_balance) / env.initial_balance
        returns_history.append(episode_return)
        if episode_return > best_return:
            best_return = episode_return
            torch.save(online_net.state_dict(), "best_trading_model.pth")
        if episode % 50 == 0:
            mean_return = np.mean(returns_history[-50:])
            print(f"에피소드 {episode}: 수익률={episode_return:.2%}, 평균 수익률={mean_return:.2%}, 거래={info['total_trades']}, 엡실론={epsilon:.3f}")
    return online_net, returns_history

# trained_model, history = train_trading_agent(env, model_type="ff", n_episodes=500)

Agent Evaluation and Analysis

def backtest_agent(env, net, n_episodes=5):
    """학습된 에이전트 백테스트"""
    all_results = []
    for episode in range(n_episodes):
        obs, _ = env.reset()
        portfolio_values = [env.total_value]
        while True:
            with torch.no_grad():
                q = net(torch.tensor([obs], dtype=torch.float32))
            action = q.argmax(dim=1).item()
            obs, reward, terminated, truncated, info = env.step(action)
            portfolio_values.append(env.total_value)
            if terminated or truncated:
                break
        total_return = (portfolio_values[-1] - portfolio_values[0]) / portfolio_values[0]
        daily_returns = np.diff(portfolio_values) / portfolio_values[:-1]
        sharpe_ratio = np.mean(daily_returns) / (np.std(daily_returns) + 1e-10) * np.sqrt(252)
        peak = np.maximum.accumulate(portfolio_values)
        drawdown = (np.array(portfolio_values) - peak) / peak
        max_drawdown = drawdown.min()
        result = {'total_return': total_return, 'sharpe_ratio': sharpe_ratio, 'max_drawdown': max_drawdown, 'total_trades': info['total_trades']}
        all_results.append(result)
        print(f"\n에피소드 {episode + 1}: 총 수익률: {total_return:.2%}, 샤프 비율: {sharpe_ratio:.2f}, 최대 낙폭: {max_drawdown:.2%}")
    avg_return = np.mean([r['total_return'] for r in all_results])
    print(f"\n=== 전체 백테스트 결과 === 평균 수익률: {avg_return:.2%}")
    return all_results

# backtest_results = backtest_agent(env, trained_model)

Summary

  1. Problem definition: Model stock trading as an MDP (state=market data, action=buy/sell/hold, reward=profit)
  2. Data preparation: Add technical indicators to price data to construct observation space
  3. Environment design: Implement custom trading environment with Gymnasium interface
  4. Models: Both feedforward DQN and 1D CNN models can be used
  5. Reward design: Consider various factors beyond simple returns, such as risk adjustment and trading penalties
  6. Evaluation: Evaluate with diverse performance metrics including returns, Sharpe ratio, and maximum drawdown

In the next article, we will move beyond value-based methods to explore Policy Gradient methods that directly optimize the policy.