Skip to content
Published on

[Deep RL] 08. Building a Stock Trading Agent with Reinforcement Learning

Authors

Trading and Reinforcement Learning

Stock trading is a natural application of reinforcement learning. The structure where a trader (agent) decides to buy/sell/hold (actions) in the market (environment) and earns profit (reward) matches the RL framework exactly.

Disclaimer

This article is for educational purposes only, and applying it directly to real investments is not recommended. Real financial markets are far more complex than what is covered here.


Trading Basics

Basic Terminology

  • Buy/Long: Purchase stocks betting on price increase
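  • Sell: Sell held stocks to close a long position (short selling, i.e. selling borrowed shares to profit from a price decline, is not modeled in this article)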
  • Position: Current state of held stocks
  • Return: Ratio of profit to investment
  • Commission: Cost incurred during transactions
  • Slippage: Difference between order price and actual execution price

Modeling Trading as Reinforcement Learning

| RL Element | Trading Counterpart |
| --- | --- |
| State | Historical price data, technical indicators, current position |
| Action | Buy, Sell, Hold |
| Reward | Realized profit, unrealized P&L change |
| Episode | A trading session over a given period |

Data Preparation

Price Data Generation

We generate synthetic data for practice. For real applications, data can be fetched from Yahoo Finance, etc.

import numpy as np
import pandas as pd

def generate_stock_data(n_days=1000, initial_price=100.0, volatility=0.02, seed=42):
    """Generate a synthetic OHLCV price series from compounded random returns.

    Daily returns are drawn from a normal distribution with mean 0.0005 and
    standard deviation ``volatility``; the close is ``initial_price`` times
    the cumulative product of ``1 + return``.

    Args:
        n_days: Number of rows to generate.
        initial_price: Level the closing prices are compounded from.
        volatility: Standard deviation of the daily returns.
        seed: Passed to ``np.random.seed``; this reseeds NumPy's global RNG.

    Returns:
        DataFrame with ``close``, ``open``, ``high``, ``low`` and ``volume``
        columns (in that order) and ``n_days`` rows.
    """
    np.random.seed(seed)

    # The order of the random draws below determines the values produced for
    # a given seed, so it must not be rearranged.
    close_px = initial_price * (1 + np.random.normal(0.0005, volatility, n_days)).cumprod()
    open_px = close_px * (1 + np.random.normal(0, 0.005, n_days))
    upper_wick = np.abs(np.random.normal(0, 0.01, n_days))
    lower_wick = np.abs(np.random.normal(0, 0.01, n_days))
    volume = np.random.randint(100000, 1000000, n_days).astype(float)

    # High/low are pushed outside the open-close range so that
    # low <= min(open, close) and high >= max(open, close) always hold.
    return pd.DataFrame({
        'close': close_px,
        'open': open_px,
        'high': np.maximum(open_px, close_px) * (1 + upper_wick),
        'low': np.minimum(open_px, close_px) * (1 - lower_wick),
        'volume': volume,
    })

# Generate a 2000-row synthetic price series.
stock_data = generate_stock_data(n_days=2000)
# Print the row count and the first rows as a quick sanity check.
print(f"데이터 크기: {len(stock_data)}")
print(stock_data.head())

Technical Indicators

def add_technical_indicators(df):
    """Append technical-indicator columns to ``df`` in place and return it.

    Added columns: ``sma_10``/``sma_30`` (simple moving averages), ``rsi``
    (14-period, from simple rolling means of gains and losses),
    ``bb_upper``/``bb_lower``/``bb_position`` (20-period bands at two
    standard deviations and the close's relative position between them),
    ``macd``/``macd_signal`` (12/26 EMA difference and its 9-span EMA), and
    ``returns``/``returns_5d`` (1- and 5-period percentage changes).

    Rows containing any NaN are then dropped and the index is reset.

    Args:
        df: DataFrame with a ``close`` column. It is modified in place.

    Returns:
        The same ``df`` object.
    """
    close = df['close']

    # Moving averages.
    for span in (10, 30):
        df[f'sma_{span}'] = close.rolling(window=span).mean()

    # RSI: ratio of average gain to average loss over 14 periods; the small
    # epsilon keeps the ratio finite when there were no losses.
    change = close.diff()
    avg_gain = change.where(change > 0, 0).rolling(window=14).mean()
    avg_loss = (-change.where(change < 0, 0)).rolling(window=14).mean()
    strength = avg_gain / (avg_loss + 1e-10)
    df['rsi'] = 100 - (100 / (1 + strength))

    # Bollinger Bands and where the close sits inside them.
    band_window = close.rolling(window=20)
    band_mid = band_window.mean()
    band_dev = band_window.std()
    df['bb_upper'] = band_mid + 2 * band_dev
    df['bb_lower'] = band_mid - 2 * band_dev
    band_width = df['bb_upper'] - df['bb_lower'] + 1e-10
    df['bb_position'] = (close - df['bb_lower']) / band_width

    # MACD line and its signal line.
    fast_ema = close.ewm(span=12).mean()
    slow_ema = close.ewm(span=26).mean()
    df['macd'] = fast_ema - slow_ema
    df['macd_signal'] = df['macd'].ewm(span=9).mean()

    # Percentage returns.
    df['returns'] = close.pct_change()
    df['returns_5d'] = close.pct_change(5)

    # Drop the rows whose rolling windows were still incomplete.
    df.dropna(inplace=True)
    df.reset_index(drop=True, inplace=True)
    return df

# Add the indicator columns; rows containing NaN are dropped inside the call,
# so the frame is shorter afterwards.
stock_data = add_technical_indicators(stock_data)
print(f"지표 추가 후 데이터 크기: {len(stock_data)}")
print(f"특성 목록: {list(stock_data.columns)}")

Trading Environment Design

We implement a custom trading environment following the Gymnasium interface.

import gymnasium as gym
from gymnasium import spaces

class StockTradingEnv(gym.Env):
    """Long-only, single-asset trading environment with a Gymnasium interface.

    Each step the agent picks one of three discrete actions. Action 1 opens a
    position when none is held, action 2 closes the held position, and every
    other action/position combination leaves the portfolio untouched. The
    reward is the relative change in total portfolio value over the step.

    The observation is the previous ``window_size`` rows of
    ``feature_columns`` (min-max scaled per column within the window and
    flattened), followed by three position values: an in-position flag, the
    unrealized P&L relative to the initial balance, and the fraction of the
    portfolio value currently held in shares.
    """
    metadata = {"render_modes": ["human"]}

    def __init__(self, df, window_size=30, commission=0.001,
                 initial_balance=100000):
        """Configure the environment.

        Args:
            df: DataFrame containing every column in ``feature_columns``.
            window_size: Number of past rows included in each observation.
            commission: Proportional fee applied to both buys and sells.
            initial_balance: Cash available at the start of each episode.

        Raises:
            ValueError: If ``df`` lacks a required column or is too short to
                hold one observation window plus at least one step.
        """
        super().__init__()
        self.df = df
        self.window_size = window_size
        self.commission = commission
        self.initial_balance = initial_balance
        self.feature_columns = [
            'close', 'volume', 'sma_10', 'sma_30', 'rsi',
            'bb_position', 'macd', 'macd_signal', 'returns', 'returns_5d'
        ]

        # Fail early with a clear message instead of a KeyError/IndexError
        # surfacing later from reset() or step().
        missing = [c for c in self.feature_columns if c not in df.columns]
        if missing:
            raise ValueError(f"df is missing required columns: {missing}")
        # reset() starts at index window_size and step() reads the price at
        # the following index, so at least window_size + 2 rows are needed.
        if len(df) < window_size + 2:
            raise ValueError(
                f"df needs at least window_size + 2 = {window_size + 2} rows, "
                f"got {len(df)}"
            )

        self.n_features = len(self.feature_columns)
        self.action_space = spaces.Discrete(3)
        obs_shape = self.window_size * self.n_features + 3
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf, shape=(obs_shape,), dtype=np.float32
        )

    def _get_observation(self):
        """Build the flat float32 observation for the current step."""
        start = self.current_step - self.window_size
        end = self.current_step
        # The array extracted from the frame may be a read-only view (pandas
        # Copy-on-Write), so the scaling below builds new arrays rather than
        # assigning into the window in place.
        window = self.df[self.feature_columns].iloc[start:end].to_numpy(dtype=np.float64)

        # Per-column min-max scaling within the window; a column with no
        # spread (or containing NaN) is mapped to zeros.
        col_min = window.min(axis=0)
        col_range = window.max(axis=0) - col_min
        varying = col_range > 0
        safe_range = np.where(varying, col_range, 1.0)
        scaled = np.where(varying, (window - col_min) / safe_range, 0.0)

        flat_window = scaled.flatten()
        position_info = np.array([
            1.0 if self.position > 0 else 0.0,
            self.unrealized_pnl / self.initial_balance,
            self.shares * self.current_price / self.total_value,
        ], dtype=np.float32)
        return np.concatenate([flat_window, position_info]).astype(np.float32)

    @property
    def current_price(self):
        """Closing price at the current step."""
        return self.df['close'].iloc[self.current_step]

    @property
    def unrealized_pnl(self):
        """Mark-to-market profit of the open position, or 0.0 when flat."""
        if self.position > 0:
            return self.shares * (self.current_price - self.entry_price)
        return 0.0

    @property
    def total_value(self):
        """Cash balance plus the held shares valued at the current price."""
        return self.balance + self.shares * self.current_price

    def reset(self, seed=None, options=None):
        """Start a new episode at the first step with a full window behind it.

        Returns:
            ``(observation, info)`` where ``info`` is an empty dict.
        """
        super().reset(seed=seed)
        self.current_step = self.window_size
        self.balance = self.initial_balance
        self.shares = 0
        self.position = 0
        self.entry_price = 0.0
        self.total_trades = 0
        self.winning_trades = 0
        self.trade_history = []
        return self._get_observation(), {}

    def step(self, action):
        """Apply ``action`` at the current close, then advance one step.

        Returns:
            ``(observation, reward, terminated, truncated, info)``. ``reward``
            is the relative change in total portfolio value across the step.
        """
        prev_total = self.total_value
        trade_info = ""
        current_price = self.current_price

        if action == 1 and self.position == 0:
            # Commit at most 95% of the cash, commission included.
            max_shares = int(self.balance * 0.95 / (current_price * (1 + self.commission)))
            if max_shares > 0:
                cost = max_shares * current_price * (1 + self.commission)
                self.balance -= cost
                self.shares = max_shares
                self.position = 1
                self.entry_price = current_price
                trade_info = f"매수 {max_shares}주 @ {current_price:.2f}"
        elif action == 2 and self.position == 1:
            proceeds = self.shares * current_price * (1 - self.commission)
            self.balance += proceeds
            # Trade return is measured on price alone (commission excluded).
            pnl = (current_price - self.entry_price) / self.entry_price
            self.total_trades += 1
            if pnl > 0:
                self.winning_trades += 1
            self.trade_history.append(pnl)
            trade_info = f"매도 {self.shares}주 @ {current_price:.2f}, 수익률: {pnl:.2%}"
            self.shares = 0
            self.position = 0
            self.entry_price = 0.0

        self.current_step += 1
        # total_value now reflects the next step's close, so the reward
        # includes both the commission paid and the price move.
        current_total = self.total_value
        reward = (current_total - prev_total) / prev_total
        # NOTE(review): Gymnasium's convention is the reverse (running out of
        # data is a truncation, losing half the capital a termination). The
        # flags are kept as originally written so callers see no change.
        terminated = self.current_step >= len(self.df) - 1
        truncated = self.total_value < self.initial_balance * 0.5

        info = {
            "total_value": self.total_value,
            "balance": self.balance,
            "position": self.position,
            "total_trades": self.total_trades,
            "trade_info": trade_info,
        }
        return self._get_observation(), reward, terminated, truncated, info

# Build the environment on the indicator-augmented data and start an episode.
env = StockTradingEnv(stock_data, window_size=30)
obs, info = env.reset()
# The observation is flat: window_size * n_features window values plus 3
# position values.
print(f"관찰 차원: {obs.shape}")
print(f"초기 포트폴리오: {env.total_value:,.0f}원")

Random Agent Baseline

def evaluate_random_agent(env, n_episodes=10):
    """Run a uniformly random policy and print summary return statistics.

    Args:
        env: Environment exposing ``reset``, ``step``, ``action_space.sample``
            and ``initial_balance``; ``step`` must report ``total_value`` and
            ``total_trades`` in its info dict.
        n_episodes: Number of episodes to play.

    Returns:
        One dict per episode with keys ``final_value``, ``return`` and
        ``trades``.
    """
    results = []
    for _ in range(n_episodes):
        env.reset()
        finished = False
        while not finished:
            _, _, terminated, truncated, info = env.step(env.action_space.sample())
            finished = terminated or truncated

        # The info dict of the last step carries the end-of-episode figures.
        final_value = info['total_value']
        results.append({
            'final_value': final_value,
            'return': (final_value - env.initial_balance) / env.initial_balance,
            'trades': info['total_trades'],
        })

    returns = [entry['return'] for entry in results]
    print(f"=== 무작위 에이전트 ({n_episodes}회) ===")
    print(f"평균 수익률: {np.mean(returns):.2%}")
    print(f"최대 수익률: {np.max(returns):.2%}")
    print(f"최소 수익률: {np.min(returns):.2%}")
    return results

# random_results = evaluate_random_agent(env)

Feedforward DQN Model

import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import random

class TradingDQN(nn.Module):
    """Feedforward Q-network mapping a flat observation to per-action values.

    Hidden widths are 256, 128 and 64 with ReLU activations; dropout (p=0.2)
    follows the first two hidden layers.
    """

    def __init__(self, obs_size, n_actions):
        super().__init__()
        layers = []
        in_width = obs_size
        # (hidden width, dropout probability or None) for each hidden layer.
        for out_width, drop_p in ((256, 0.2), (128, 0.2), (64, None)):
            layers.append(nn.Linear(in_width, out_width))
            layers.append(nn.ReLU())
            if drop_p is not None:
                layers.append(nn.Dropout(drop_p))
            in_width = out_width
        layers.append(nn.Linear(in_width, n_actions))
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Return Q-values for a batch of flat observations."""
        q_values = self.net(x)
        return q_values

CNN Model: Processing Price Charts Like Images

Temporal patterns in price data are captured using 1D convolutions.

class TradingCNN(nn.Module):
    """1D-convolutional Q-network over the observation's price window.

    The flat observation is split into the feature window (all but the last
    three values) and the three trailing position values. The window is
    convolved along time, average-pooled to a 64-vector, concatenated with
    the position values and passed through a small fully connected head.
    """

    def __init__(self, window_size, n_features, n_actions):
        super().__init__()
        self.window_size = window_size
        self.n_features = n_features

        feature_extractor = [
            nn.Conv1d(n_features, 32, kernel_size=5, padding=2),
            nn.ReLU(),
            nn.Conv1d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.AdaptiveAvgPool1d(1),
        ]
        self.conv = nn.Sequential(*feature_extractor)

        head = [
            nn.Linear(64 + 3, 64),
            nn.ReLU(),
            nn.Linear(64, n_actions),
        ]
        self.fc = nn.Sequential(*head)

    def forward(self, x):
        """Return Q-values for a batch of flat observations."""
        market, position_info = x[:, :-3], x[:, -3:]
        # (batch, window, features) -> (batch, features, window): Conv1d
        # treats the features as channels and convolves along time.
        series = market.view(x.size(0), self.window_size, self.n_features).permute(0, 2, 1)
        pooled = self.conv(series).squeeze(-1)
        return self.fc(torch.cat((pooled, position_info), dim=1))

Training the Trading Agent

class ReplayBuffer:
    """Fixed-capacity store of transitions; the oldest entry is evicted first."""

    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def __len__(self):
        return len(self.buffer)

    def push(self, state, action, reward, next_state, done):
        """Append one ``(state, action, reward, next_state, done)`` transition."""
        transition = (state, action, reward, next_state, done)
        self.buffer.append(transition)

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct transitions uniformly at random.

        Returns:
            Five arrays ``(states, actions, rewards, next_states, dones)``;
            rewards are float32 and dones are boolean.
        """
        picked = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*picked)
        return (
            np.asarray(states),
            np.asarray(actions),
            np.asarray(rewards, dtype=np.float32),
            np.asarray(next_states),
            np.asarray(dones, dtype=np.bool_),
        )

def train_trading_agent(env, model_type="ff", n_episodes=500):
    """Train a Double-DQN trading agent on ``env``.

    Actions are chosen epsilon-greedily (epsilon decays from 1.0 to 0.05 by a
    factor of 0.995 per episode). Transitions go into a replay buffer and the
    online network is updated after every environment step once the buffer
    holds a full batch. The target network is synchronised every 50 episodes.
    Whenever an episode's return beats the best so far, the online weights
    are saved to ``best_trading_model.pth`` in the working directory.

    Args:
        env: Trading environment exposing ``window_size``, ``n_features``,
            ``total_value`` and ``initial_balance`` besides the Gymnasium API.
        model_type: ``"cnn"`` selects ``TradingCNN``; any other value selects
            ``TradingDQN``.
        n_episodes: Number of training episodes.

    Returns:
        ``(online_net, returns_history)``: the trained network (left in
        training mode) and the per-episode portfolio returns.
    """
    obs_size = env.observation_space.shape[0]
    n_actions = env.action_space.n
    device = torch.device("cpu")

    if model_type == "cnn":
        online_net = TradingCNN(env.window_size, env.n_features, n_actions).to(device)
        target_net = TradingCNN(env.window_size, env.n_features, n_actions).to(device)
    else:
        online_net = TradingDQN(obs_size, n_actions).to(device)
        target_net = TradingDQN(obs_size, n_actions).to(device)

    target_net.load_state_dict(online_net.state_dict())
    # The target network only produces bootstrap targets; eval mode keeps
    # dropout from injecting noise into them.
    target_net.eval()

    optimizer = optim.Adam(online_net.parameters(), lr=1e-4)
    loss_fn = nn.SmoothL1Loss()
    buffer = ReplayBuffer(50000)
    epsilon = 1.0
    epsilon_min = 0.05
    epsilon_decay = 0.995
    gamma = 0.99
    batch_size = 64
    target_update = 50
    best_return = -float('inf')
    returns_history = []

    for episode in range(n_episodes):
        obs, _ = env.reset()
        while True:
            if random.random() < epsilon:
                action = env.action_space.sample()
            else:
                # Convert the ndarray directly: wrapping it in a list first
                # is slow and triggers a PyTorch warning.
                obs_t = torch.as_tensor(obs, dtype=torch.float32, device=device).unsqueeze(0)
                # The greedy choice should come from the deterministic
                # network, so dropout is switched off just for this pass.
                online_net.eval()
                with torch.no_grad():
                    q = online_net(obs_t)
                online_net.train()
                action = q.argmax(dim=1).item()

            next_obs, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated
            buffer.push(obs, action, reward, next_obs, done)
            obs = next_obs

            if len(buffer) >= batch_size:
                s, a, r, ns, d = buffer.sample(batch_size)
                s_t = torch.as_tensor(s, dtype=torch.float32, device=device)
                a_t = torch.as_tensor(a, dtype=torch.long, device=device)
                r_t = torch.as_tensor(r, dtype=torch.float32, device=device)
                ns_t = torch.as_tensor(ns, dtype=torch.float32, device=device)
                d_t = torch.as_tensor(d, dtype=torch.bool, device=device)

                current_q = online_net(s_t).gather(1, a_t.unsqueeze(1)).squeeze(1)
                with torch.no_grad():
                    # Double DQN: the online net picks the next action, the
                    # target net evaluates it.
                    best_a = online_net(ns_t).argmax(dim=1)
                    next_q = target_net(ns_t).gather(1, best_a.unsqueeze(1)).squeeze(1)
                    # No bootstrapping past the end of an episode.
                    next_q[d_t] = 0.0
                    target_q = r_t + gamma * next_q

                loss = loss_fn(current_q, target_q)
                optimizer.zero_grad()
                loss.backward()
                torch.nn.utils.clip_grad_norm_(online_net.parameters(), 1.0)
                optimizer.step()

            if done:
                break

        if episode % target_update == 0:
            target_net.load_state_dict(online_net.state_dict())
        epsilon = max(epsilon_min, epsilon * epsilon_decay)

        episode_return = (env.total_value - env.initial_balance) / env.initial_balance
        returns_history.append(episode_return)
        if episode_return > best_return:
            best_return = episode_return
            torch.save(online_net.state_dict(), "best_trading_model.pth")
        if episode % 50 == 0:
            mean_return = np.mean(returns_history[-50:])
            print(f"에피소드 {episode}: 수익률={episode_return:.2%}, 평균 수익률={mean_return:.2%}, 거래={info['total_trades']}, 엡실론={epsilon:.3f}")
    return online_net, returns_history

# trained_model, history = train_trading_agent(env, model_type="ff", n_episodes=500)

Agent Evaluation and Analysis

def backtest_agent(env, net, n_episodes=5):
    """Run the greedy policy of ``net`` on ``env`` and report performance.

    The network is switched to evaluation mode for the duration of the
    backtest so that dropout does not randomise the chosen actions; its
    previous mode is restored before returning.

    Args:
        env: Trading environment exposing ``total_value`` besides the
            Gymnasium API; ``step`` must report ``total_trades`` in its info.
        net: Q-network mapping a batch of observations to action values.
        n_episodes: Number of episodes to run.

    Returns:
        One dict per episode with keys ``total_return``, ``sharpe_ratio``
        (annualised with 252 periods), ``max_drawdown`` and ``total_trades``.
    """
    all_results = []
    was_training = net.training
    net.eval()
    try:
        for episode in range(n_episodes):
            obs, _ = env.reset()
            portfolio_values = [env.total_value]
            while True:
                # Convert the ndarray directly: wrapping it in a list first
                # is slow and triggers a PyTorch warning.
                obs_t = torch.as_tensor(obs, dtype=torch.float32).unsqueeze(0)
                with torch.no_grad():
                    q = net(obs_t)
                action = q.argmax(dim=1).item()
                obs, reward, terminated, truncated, info = env.step(action)
                portfolio_values.append(env.total_value)
                if terminated or truncated:
                    break

            values = np.asarray(portfolio_values, dtype=float)
            total_return = (values[-1] - values[0]) / values[0]
            daily_returns = np.diff(values) / values[:-1]
            # Epsilon guards against a zero standard deviation (flat curve).
            sharpe_ratio = np.mean(daily_returns) / (np.std(daily_returns) + 1e-10) * np.sqrt(252)
            # Drawdown is measured against the running maximum of the curve.
            peak = np.maximum.accumulate(values)
            drawdown = (values - peak) / peak
            max_drawdown = drawdown.min()

            result = {'total_return': total_return, 'sharpe_ratio': sharpe_ratio, 'max_drawdown': max_drawdown, 'total_trades': info['total_trades']}
            all_results.append(result)
            print(f"\n에피소드 {episode + 1}: 총 수익률: {total_return:.2%}, 샤프 비율: {sharpe_ratio:.2f}, 최대 낙폭: {max_drawdown:.2%}")
    finally:
        net.train(was_training)

    avg_return = np.mean([r['total_return'] for r in all_results])
    print(f"\n=== 전체 백테스트 결과 === 평균 수익률: {avg_return:.2%}")
    return all_results

# backtest_results = backtest_agent(env, trained_model)

Summary

  1. Problem definition: Model stock trading as an MDP (state=market data, action=buy/sell/hold, reward=profit)
  2. Data preparation: Add technical indicators to price data to construct observation space
  3. Environment design: Implement custom trading environment with Gymnasium interface
  4. Models: Both feedforward DQN and 1D CNN models can be used
  5. Reward design: Consider various factors beyond simple returns, such as risk adjustment and trading penalties
  6. Evaluation: Evaluate with diverse performance metrics including returns, Sharpe ratio, and maximum drawdown

In the next article, we will move beyond value-based methods to explore Policy Gradient methods that directly optimize the policy.