Skip to content
Published on

[심층 강화학습] 08. 강화학습으로 주식 트레이딩 만들기

Authors

트레이딩과 강화학습

주식 트레이딩은 강화학습의 자연스러운 적용 분야입니다. 트레이더(에이전트)가 시장(환경)에서 매수/매도/보유(행동)를 결정하고, 수익(보상)을 얻는 구조가 강화학습 프레임워크와 정확히 일치합니다.

주의사항

이 글은 교육 목적의 실습이며, 실제 투자에 직접 적용하는 것은 권장하지 않습니다. 실제 금융 시장은 여기서 다루는 것보다 훨씬 복잡합니다.


트레이딩 기초 개념

기본 용어

  • 매수 (Buy/Long): 주식을 구매하여 가격 상승에 베팅
  • 매도 (Sell/Short): 보유 주식을 판매하여 포지션 청산
  • 포지션 (Position): 현재 보유하고 있는 주식의 상태
  • 수익률 (Return): 투자 대비 이익의 비율
  • 수수료 (Commission): 거래 시 발생하는 비용
  • 슬리피지 (Slippage): 주문 가격과 실제 체결 가격의 차이

강화학습으로 트레이딩을 모델링하기

RL 요소트레이딩 대응
상태과거 가격 데이터, 기술적 지표, 현재 포지션
행동매수, 매도, 보유
보상실현 수익, 미실현 수익 변화량
에피소드일정 기간의 트레이딩 세션

데이터 준비

가격 데이터 생성

실습을 위해 합성 데이터를 생성합니다. 실제 적용 시에는 Yahoo Finance 등에서 데이터를 가져올 수 있습니다.

import numpy as np
import pandas as pd

def generate_stock_data(n_days=1000, initial_price=100.0, volatility=0.02, seed=42):
    """합성 주가 데이터 생성 (기하 브라운 운동 모델)"""
    np.random.seed(seed)

    # 일별 수익률 생성
    daily_returns = np.random.normal(0.0005, volatility, n_days)

    # 가격 계산
    prices = initial_price * np.cumprod(1 + daily_returns)

    # OHLCV 데이터 생성
    data = pd.DataFrame()
    data['close'] = prices
    data['open'] = prices * (1 + np.random.normal(0, 0.005, n_days))
    data['high'] = np.maximum(data['open'], data['close']) * (1 + np.abs(np.random.normal(0, 0.01, n_days)))
    data['low'] = np.minimum(data['open'], data['close']) * (1 - np.abs(np.random.normal(0, 0.01, n_days)))
    data['volume'] = np.random.randint(100000, 1000000, n_days).astype(float)

    return data

# 데이터 생성
stock_data = generate_stock_data(n_days=2000)
print(f"데이터 크기: {len(stock_data)}")
print(stock_data.head())

기술적 지표 계산

def add_technical_indicators(df):
    """기술적 지표 추가"""
    # 이동평균
    df['sma_10'] = df['close'].rolling(window=10).mean()
    df['sma_30'] = df['close'].rolling(window=30).mean()

    # 상대강도지수 (RSI)
    delta = df['close'].diff()
    gain = delta.where(delta > 0, 0).rolling(window=14).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
    rs = gain / (loss + 1e-10)
    df['rsi'] = 100 - (100 / (1 + rs))

    # 볼린저 밴드
    bb_mean = df['close'].rolling(window=20).mean()
    bb_std = df['close'].rolling(window=20).std()
    df['bb_upper'] = bb_mean + 2 * bb_std
    df['bb_lower'] = bb_mean - 2 * bb_std
    df['bb_position'] = (df['close'] - df['bb_lower']) / (df['bb_upper'] - df['bb_lower'] + 1e-10)

    # MACD
    ema12 = df['close'].ewm(span=12).mean()
    ema26 = df['close'].ewm(span=26).mean()
    df['macd'] = ema12 - ema26
    df['macd_signal'] = df['macd'].ewm(span=9).mean()

    # 수익률
    df['returns'] = df['close'].pct_change()
    df['returns_5d'] = df['close'].pct_change(5)

    # NaN 제거
    df.dropna(inplace=True)
    df.reset_index(drop=True, inplace=True)

    return df

stock_data = add_technical_indicators(stock_data)
print(f"지표 추가 후 데이터 크기: {len(stock_data)}")
print(f"특성 목록: {list(stock_data.columns)}")

트레이딩 환경 설계

Gymnasium 인터페이스를 따르는 커스텀 트레이딩 환경을 구현합니다.

import gymnasium as gym
from gymnasium import spaces

class StockTradingEnv(gym.Env):
    """주식 트레이딩 환경"""
    metadata = {"render_modes": ["human"]}

    def __init__(self, df, window_size=30, commission=0.001,
                 initial_balance=100000):
        super().__init__()

        self.df = df
        self.window_size = window_size
        self.commission = commission
        self.initial_balance = initial_balance

        # 특성 열 선택
        self.feature_columns = [
            'close', 'volume', 'sma_10', 'sma_30', 'rsi',
            'bb_position', 'macd', 'macd_signal', 'returns', 'returns_5d'
        ]
        self.n_features = len(self.feature_columns)

        # 행동 공간: 0=보유, 1=매수, 2=매도
        self.action_space = spaces.Discrete(3)

        # 관찰 공간: 가격 윈도우 + 포지션 정보
        obs_shape = self.window_size * self.n_features + 3  # +3: 포지션, 수익률, 보유비율
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf, shape=(obs_shape,), dtype=np.float32
        )

    def _get_observation(self):
        """현재 관찰값 생성"""
        # 가격 윈도우 데이터
        start = self.current_step - self.window_size
        end = self.current_step
        window_data = self.df[self.feature_columns].iloc[start:end].values

        # 정규화 (각 특성별로 윈도우 내 min-max)
        for i in range(self.n_features):
            col = window_data[:, i]
            min_val = col.min()
            max_val = col.max()
            if max_val - min_val > 0:
                window_data[:, i] = (col - min_val) / (max_val - min_val)
            else:
                window_data[:, i] = 0.0

        flat_window = window_data.flatten()

        # 포지션 정보
        position_info = np.array([
            1.0 if self.position > 0 else 0.0,  # 포지션 보유 여부
            self.unrealized_pnl / self.initial_balance,  # 미실현 수익률
            self.shares * self.current_price / self.total_value,  # 주식 보유 비율
        ], dtype=np.float32)

        return np.concatenate([flat_window, position_info]).astype(np.float32)

    @property
    def current_price(self):
        return self.df['close'].iloc[self.current_step]

    @property
    def unrealized_pnl(self):
        if self.position > 0:
            return self.shares * (self.current_price - self.entry_price)
        return 0.0

    @property
    def total_value(self):
        return self.balance + self.shares * self.current_price

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        self.current_step = self.window_size
        self.balance = self.initial_balance
        self.shares = 0
        self.position = 0  # 0: 없음, 1: 롱
        self.entry_price = 0.0
        self.total_trades = 0
        self.winning_trades = 0
        self.trade_history = []

        return self._get_observation(), {}

    def step(self, action):
        prev_total = self.total_value
        reward = 0.0
        trade_info = ""

        current_price = self.current_price

        if action == 1 and self.position == 0:  # 매수
            # 전체 잔고의 95%로 매수 (수수료 고려)
            max_shares = int(self.balance * 0.95 / (current_price * (1 + self.commission)))
            if max_shares > 0:
                cost = max_shares * current_price * (1 + self.commission)
                self.balance -= cost
                self.shares = max_shares
                self.position = 1
                self.entry_price = current_price
                trade_info = f"매수 {max_shares}주 @ {current_price:.2f}"

        elif action == 2 and self.position == 1:  # 매도
            proceeds = self.shares * current_price * (1 - self.commission)
            self.balance += proceeds
            pnl = (current_price - self.entry_price) / self.entry_price
            self.total_trades += 1
            if pnl > 0:
                self.winning_trades += 1
            self.trade_history.append(pnl)
            trade_info = f"매도 {self.shares}주 @ {current_price:.2f}, 수익률: {pnl:.2%}"
            self.shares = 0
            self.position = 0
            self.entry_price = 0.0

        # 다음 스텝으로 이동
        self.current_step += 1

        # 보상: 포트폴리오 가치 변화율
        current_total = self.total_value
        reward = (current_total - prev_total) / prev_total

        # 종료 조건
        terminated = self.current_step >= len(self.df) - 1
        truncated = self.total_value < self.initial_balance * 0.5  # 50% 이상 손실

        info = {
            "total_value": self.total_value,
            "balance": self.balance,
            "position": self.position,
            "total_trades": self.total_trades,
            "trade_info": trade_info,
        }

        return self._get_observation(), reward, terminated, truncated, info

# 환경 테스트
env = StockTradingEnv(stock_data, window_size=30)
obs, info = env.reset()
print(f"관찰 차원: {obs.shape}")
print(f"초기 포트폴리오: {env.total_value:,.0f}원")

무작위 에이전트 기준선

def evaluate_random_agent(env, n_episodes=10):
    """무작위 에이전트 평가"""
    results = []

    for episode in range(n_episodes):
        obs, _ = env.reset()
        while True:
            action = env.action_space.sample()
            obs, reward, terminated, truncated, info = env.step(action)
            if terminated or truncated:
                break

        final_value = info['total_value']
        total_return = (final_value - env.initial_balance) / env.initial_balance
        results.append({
            'final_value': final_value,
            'return': total_return,
            'trades': info['total_trades'],
        })

    returns = [r['return'] for r in results]
    print(f"=== 무작위 에이전트 ({n_episodes}회) ===")
    print(f"평균 수익률: {np.mean(returns):.2%}")
    print(f"최대 수익률: {np.max(returns):.2%}")
    print(f"최소 수익률: {np.min(returns):.2%}")
    print(f"평균 거래 횟수: {np.mean([r['trades'] for r in results]):.1f}")

    return results

# random_results = evaluate_random_agent(env)

피드포워드 DQN 모델

import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import random

class TradingDQN(nn.Module):
    """트레이딩용 피드포워드 DQN"""
    def __init__(self, obs_size, n_actions):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(obs_size, 256),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, n_actions),
        )

    def forward(self, x):
        return self.net(x)

CNN 모델: 가격 차트를 이미지처럼 처리

가격 데이터의 시간적 패턴을 1D 합성곱으로 포착합니다.

class TradingCNN(nn.Module):
    """1D CNN 기반 트레이딩 모델"""
    def __init__(self, window_size, n_features, n_actions):
        super().__init__()

        self.conv = nn.Sequential(
            nn.Conv1d(n_features, 32, kernel_size=5, padding=2),
            nn.ReLU(),
            nn.Conv1d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.AdaptiveAvgPool1d(1),  # 시간 차원을 1로 축소
        )

        # CNN 출력 + 포지션 정보 (3개)
        self.fc = nn.Sequential(
            nn.Linear(64 + 3, 64),
            nn.ReLU(),
            nn.Linear(64, n_actions),
        )

        self.window_size = window_size
        self.n_features = n_features

    def forward(self, x):
        batch_size = x.shape[0]

        # 윈도우 데이터와 포지션 정보 분리
        window_data = x[:, :-3].view(batch_size, self.window_size, self.n_features)
        position_info = x[:, -3:]

        # CNN: (batch, features, time) 형태로 변환
        window_data = window_data.transpose(1, 2)
        conv_out = self.conv(window_data).squeeze(-1)

        # 포지션 정보와 결합
        combined = torch.cat([conv_out, position_info], dim=1)
        return self.fc(combined)

트레이딩 에이전트 학습

class ReplayBuffer:
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        s, a, r, ns, d = zip(*batch)
        return (np.array(s), np.array(a), np.array(r, dtype=np.float32),
                np.array(ns), np.array(d, dtype=np.bool_))

    def __len__(self):
        return len(self.buffer)


def train_trading_agent(env, model_type="ff", n_episodes=500):
    """트레이딩 에이전트 학습"""
    obs_size = env.observation_space.shape[0]
    n_actions = env.action_space.n
    device = torch.device("cpu")

    if model_type == "cnn":
        online_net = TradingCNN(env.window_size, env.n_features, n_actions).to(device)
        target_net = TradingCNN(env.window_size, env.n_features, n_actions).to(device)
    else:
        online_net = TradingDQN(obs_size, n_actions).to(device)
        target_net = TradingDQN(obs_size, n_actions).to(device)

    target_net.load_state_dict(online_net.state_dict())
    optimizer = optim.Adam(online_net.parameters(), lr=1e-4)
    buffer = ReplayBuffer(50000)

    epsilon = 1.0
    epsilon_min = 0.05
    epsilon_decay = 0.995
    gamma = 0.99
    batch_size = 64
    target_update = 50

    best_return = -float('inf')
    returns_history = []

    for episode in range(n_episodes):
        obs, _ = env.reset()
        total_reward = 0

        while True:
            # 엡실론-탐욕 행동 선택
            if random.random() < epsilon:
                action = env.action_space.sample()
            else:
                with torch.no_grad():
                    q = online_net(torch.tensor([obs], dtype=torch.float32).to(device))
                action = q.argmax(dim=1).item()

            next_obs, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated
            buffer.push(obs, action, reward, next_obs, done)
            total_reward += reward
            obs = next_obs

            # 학습
            if len(buffer) >= batch_size:
                s, a, r, ns, d = buffer.sample(batch_size)
                s_t = torch.tensor(s, dtype=torch.float32).to(device)
                a_t = torch.tensor(a, dtype=torch.long).to(device)
                r_t = torch.tensor(r, dtype=torch.float32).to(device)
                ns_t = torch.tensor(ns, dtype=torch.float32).to(device)
                d_t = torch.tensor(d, dtype=torch.bool).to(device)

                current_q = online_net(s_t).gather(1, a_t.unsqueeze(1)).squeeze(1)

                with torch.no_grad():
                    # Double DQN
                    best_a = online_net(ns_t).argmax(dim=1)
                    next_q = target_net(ns_t).gather(1, best_a.unsqueeze(1)).squeeze(1)
                    next_q[d_t] = 0.0
                    target_q = r_t + gamma * next_q

                loss = nn.SmoothL1Loss()(current_q, target_q)
                optimizer.zero_grad()
                loss.backward()
                torch.nn.utils.clip_grad_norm_(online_net.parameters(), 1.0)
                optimizer.step()

            if done:
                break

        # 타겟 네트워크 업데이트
        if episode % target_update == 0:
            target_net.load_state_dict(online_net.state_dict())

        epsilon = max(epsilon_min, epsilon * epsilon_decay)

        # 결과 기록
        episode_return = (env.total_value - env.initial_balance) / env.initial_balance
        returns_history.append(episode_return)

        if episode_return > best_return:
            best_return = episode_return
            torch.save(online_net.state_dict(), "best_trading_model.pth")

        if episode % 50 == 0:
            mean_return = np.mean(returns_history[-50:])
            print(
                f"에피소드 {episode}: "
                f"수익률={episode_return:.2%}, "
                f"평균 수익률={mean_return:.2%}, "
                f"거래={info['total_trades']}, "
                f"엡실론={epsilon:.3f}"
            )

    return online_net, returns_history

# 학습 실행
# trained_model, history = train_trading_agent(env, model_type="ff", n_episodes=500)

에이전트 평가 및 분석

def backtest_agent(env, net, n_episodes=5):
    """학습된 에이전트 백테스트"""
    all_results = []

    for episode in range(n_episodes):
        obs, _ = env.reset()
        portfolio_values = [env.total_value]
        trade_log = []

        while True:
            with torch.no_grad():
                q = net(torch.tensor([obs], dtype=torch.float32))
            action = q.argmax(dim=1).item()

            obs, reward, terminated, truncated, info = env.step(action)
            portfolio_values.append(env.total_value)

            if info.get('trade_info'):
                trade_log.append(info['trade_info'])

            if terminated or truncated:
                break

        total_return = (portfolio_values[-1] - portfolio_values[0]) / portfolio_values[0]

        # 성과 지표 계산
        daily_returns = np.diff(portfolio_values) / portfolio_values[:-1]
        sharpe_ratio = np.mean(daily_returns) / (np.std(daily_returns) + 1e-10) * np.sqrt(252)

        # 최대 낙폭 (MDD)
        peak = np.maximum.accumulate(portfolio_values)
        drawdown = (np.array(portfolio_values) - peak) / peak
        max_drawdown = drawdown.min()

        result = {
            'total_return': total_return,
            'sharpe_ratio': sharpe_ratio,
            'max_drawdown': max_drawdown,
            'total_trades': info['total_trades'],
            'portfolio_values': portfolio_values,
        }
        all_results.append(result)

        print(f"\n에피소드 {episode + 1}:")
        print(f"  총 수익률: {total_return:.2%}")
        print(f"  샤프 비율: {sharpe_ratio:.2f}")
        print(f"  최대 낙폭: {max_drawdown:.2%}")
        print(f"  총 거래 횟수: {info['total_trades']}")

    # 전체 평균
    print("\n=== 전체 백테스트 결과 ===")
    avg_return = np.mean([r['total_return'] for r in all_results])
    avg_sharpe = np.mean([r['sharpe_ratio'] for r in all_results])
    avg_mdd = np.mean([r['max_drawdown'] for r in all_results])
    print(f"평균 수익률: {avg_return:.2%}")
    print(f"평균 샤프 비율: {avg_sharpe:.2f}")
    print(f"평균 최대 낙폭: {avg_mdd:.2%}")

    return all_results

# backtest_results = backtest_agent(env, trained_model)

바이앤홀드 전략과 비교

def compare_with_buy_and_hold(df, agent_results, initial_balance=100000):
    """바이앤홀드 전략과 RL 에이전트 비교"""
    # 바이앤홀드: 처음에 매수하고 끝까지 보유
    start_price = df['close'].iloc[30]  # window_size 이후
    end_price = df['close'].iloc[-1]
    bnh_return = (end_price - start_price) / start_price

    agent_return = np.mean([r['total_return'] for r in agent_results])

    print("=== 전략 비교 ===")
    print(f"바이앤홀드 수익률: {bnh_return:.2%}")
    print(f"RL 에이전트 수익률: {agent_return:.2%}")
    print(f"초과 수익률: {agent_return - bnh_return:.2%}")

# compare_with_buy_and_hold(stock_data, backtest_results)

보상 함수 설계의 중요성

보상 함수의 설계는 트레이딩 에이전트의 성능에 결정적입니다.

class ImprovedRewardEnv(StockTradingEnv):
    """개선된 보상 함수를 사용하는 트레이딩 환경"""

    def _compute_reward(self, prev_total, action):
        """다양한 보상 설계 방식"""
        current_total = self.total_value

        # 1. 단순 포트폴리오 변화율
        basic_reward = (current_total - prev_total) / prev_total

        # 2. 리스크 조정 보상 (샤프 비율 유사)
        # 수익률에서 변동성 페널티를 차감
        volatility_penalty = 0.0
        if len(self.trade_history) > 1:
            recent_returns = self.trade_history[-10:]
            volatility_penalty = np.std(recent_returns) * 0.1

        # 3. 과도한 거래 페널티
        trade_penalty = 0.0
        if action != 0:  # 보유가 아닌 경우
            trade_penalty = -0.0001

        # 4. 승률 보너스
        win_bonus = 0.0
        if self.total_trades > 10:
            win_rate = self.winning_trades / self.total_trades
            if win_rate > 0.5:
                win_bonus = 0.0001

        return basic_reward - volatility_penalty + trade_penalty + win_bonus

정리

  1. 문제 정의: 주식 트레이딩을 MDP로 모델링 (상태=시장 데이터, 행동=매수/매도/보유, 보상=수익)
  2. 데이터 준비: 가격 데이터에 기술적 지표를 추가하여 관찰 공간 구성
  3. 환경 설계: Gymnasium 인터페이스의 커스텀 트레이딩 환경 구현
  4. 모델: 피드포워드 DQN과 1D CNN 모델 모두 활용 가능
  5. 보상 설계: 단순 수익률 외에 리스크 조정, 거래 페널티 등 다양한 요소 고려
  6. 평가: 수익률, 샤프 비율, 최대 낙폭 등 다양한 성과 지표로 평가

다음 글에서는 가치 기반 방법에서 벗어나 정책을 직접 최적화하는 Policy Gradient 방법을 살펴보겠습니다.