Split View: [심층 강화학습] 08. 강화학습으로 주식 트레이딩 만들기
[심층 강화학습] 08. 강화학습으로 주식 트레이딩 만들기
트레이딩과 강화학습
주식 트레이딩은 강화학습의 자연스러운 적용 분야입니다. 트레이더(에이전트)가 시장(환경)에서 매수/매도/보유(행동)를 결정하고, 수익(보상)을 얻는 구조가 강화학습 프레임워크와 정확히 일치합니다.
주의사항
이 글은 교육 목적의 실습이며, 실제 투자에 직접 적용하는 것은 권장하지 않습니다. 실제 금융 시장은 여기서 다루는 것보다 훨씬 복잡합니다.
트레이딩 기초 개념
기본 용어
- 매수 (Buy/Long): 주식을 구매하여 가격 상승에 베팅
- 매도 (Sell): 보유 주식을 판매하여 포지션 청산 (참고: 공매도(Short)는 빌린 주식을 팔아 가격 하락에 베팅하는 별개의 기법으로, 이 글에서는 다루지 않습니다)
- 포지션 (Position): 현재 보유하고 있는 주식의 상태
- 수익률 (Return): 투자 대비 이익의 비율
- 수수료 (Commission): 거래 시 발생하는 비용
- 슬리피지 (Slippage): 주문 가격과 실제 체결 가격의 차이
강화학습으로 트레이딩을 모델링하기
| RL 요소 | 트레이딩 대응 |
|---|---|
| 상태 | 과거 가격 데이터, 기술적 지표, 현재 포지션 |
| 행동 | 매수, 매도, 보유 |
| 보상 | 실현 수익, 미실현 수익 변화량 |
| 에피소드 | 일정 기간의 트레이딩 세션 |
데이터 준비
가격 데이터 생성
실습을 위해 합성 데이터를 생성합니다. 실제 적용 시에는 Yahoo Finance 등에서 데이터를 가져올 수 있습니다.
import numpy as np
import pandas as pd
def generate_stock_data(n_days=1000, initial_price=100.0, volatility=0.02, seed=42):
    """Create a synthetic OHLCV DataFrame via a geometric-Brownian-motion-style walk.

    Args:
        n_days: number of trading days to simulate.
        initial_price: starting close price.
        volatility: standard deviation of daily returns.
        seed: NumPy RNG seed for reproducibility.

    Returns:
        DataFrame with 'close', 'open', 'high', 'low', 'volume' columns.
    """
    np.random.seed(seed)
    # Daily returns with a small positive drift, compounded into a price path.
    # RNG calls are made in the same order as before so seeded output matches.
    returns = np.random.normal(0.0005, volatility, n_days)
    close = initial_price * np.cumprod(1.0 + returns)
    # Derive the remaining OHLCV columns around the close path.
    open_ = close * (1.0 + np.random.normal(0, 0.005, n_days))
    high = np.maximum(open_, close) * (1.0 + np.abs(np.random.normal(0, 0.01, n_days)))
    low = np.minimum(open_, close) * (1.0 - np.abs(np.random.normal(0, 0.01, n_days)))
    volume = np.random.randint(100000, 1000000, n_days).astype(float)
    return pd.DataFrame(
        {"close": close, "open": open_, "high": high, "low": low, "volume": volume}
    )
# Generate the synthetic price series used throughout the article.
stock_data = generate_stock_data(n_days=2000)
print(f"데이터 크기: {len(stock_data)}")
print(stock_data.head())
기술적 지표 계산
def add_technical_indicators(df):
    """Append technical-indicator columns to an OHLCV frame, in place.

    Adds simple moving averages, RSI(14), Bollinger-band position, MACD,
    and short-horizon returns, then drops the warm-up rows containing NaNs.

    Args:
        df: DataFrame with at least a 'close' column. Mutated in place.

    Returns:
        The same DataFrame with indicator columns and a reset index.
    """
    close = df['close']
    # Simple moving averages.
    df['sma_10'] = close.rolling(window=10).mean()
    df['sma_30'] = close.rolling(window=30).mean()
    # Relative Strength Index over 14 days (simple-moving-average variant).
    diff = close.diff()
    avg_gain = diff.where(diff > 0, 0).rolling(window=14).mean()
    avg_loss = (-diff.where(diff < 0, 0)).rolling(window=14).mean()
    strength = avg_gain / (avg_loss + 1e-10)  # epsilon avoids division by zero
    df['rsi'] = 100 - (100 / (1 + strength))
    # Bollinger bands (20-day, 2 sigma) and where the close sits inside them.
    mid = close.rolling(window=20).mean()
    sigma = close.rolling(window=20).std()
    df['bb_upper'] = mid + 2 * sigma
    df['bb_lower'] = mid - 2 * sigma
    df['bb_position'] = (close - df['bb_lower']) / (df['bb_upper'] - df['bb_lower'] + 1e-10)
    # MACD line (12/26 EMA spread) and its 9-day signal line.
    fast = close.ewm(span=12).mean()
    slow = close.ewm(span=26).mean()
    df['macd'] = fast - slow
    df['macd_signal'] = df['macd'].ewm(span=9).mean()
    # One-day and five-day percentage returns.
    df['returns'] = close.pct_change()
    df['returns_5d'] = close.pct_change(5)
    # Drop warm-up NaN rows and renumber from zero.
    df.dropna(inplace=True)
    df.reset_index(drop=True, inplace=True)
    return df
# Enrich the raw OHLCV data with technical-indicator features.
stock_data = add_technical_indicators(stock_data)
print(f"지표 추가 후 데이터 크기: {len(stock_data)}")
print(f"특성 목록: {list(stock_data.columns)}")
트레이딩 환경 설계
Gymnasium 인터페이스를 따르는 커스텀 트레이딩 환경을 구현합니다.
import gymnasium as gym
from gymnasium import spaces
class StockTradingEnv(gym.Env):
    """Single-asset, long-only stock trading environment (Gymnasium API).

    Observation: a per-feature min-max-normalized price window (flattened)
    concatenated with three portfolio scalars. Actions: 0=hold, 1=buy, 2=sell.
    Reward: per-step fractional change in total portfolio value.
    """
    metadata = {"render_modes": ["human"]}

    def __init__(self, df, window_size=30, commission=0.001,
                 initial_balance=100000):
        super().__init__()
        self.df = df                            # indicator-enriched OHLCV frame
        self.window_size = window_size          # lookback length in steps
        self.commission = commission            # proportional fee per trade
        self.initial_balance = initial_balance  # starting cash
        # Feature columns fed to the agent each step.
        self.feature_columns = [
            'close', 'volume', 'sma_10', 'sma_30', 'rsi',
            'bb_position', 'macd', 'macd_signal', 'returns', 'returns_5d'
        ]
        self.n_features = len(self.feature_columns)
        # Action space: 0=hold, 1=buy, 2=sell.
        self.action_space = spaces.Discrete(3)
        # Observation: flattened price window + position info.
        obs_shape = self.window_size * self.n_features + 3  # +3: position flag, unrealized return, holding ratio
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf, shape=(obs_shape,), dtype=np.float32
        )

    def _get_observation(self):
        """Build the current observation vector.

        The window covers the `window_size` rows strictly before
        `current_step`; each feature is min-max scaled within the window.
        """
        start = self.current_step - self.window_size
        end = self.current_step
        window_data = self.df[self.feature_columns].iloc[start:end].values
        # Per-feature min-max normalization; a constant column collapses
        # to 0 to avoid division by zero.
        for i in range(self.n_features):
            col = window_data[:, i]
            min_val = col.min()
            max_val = col.max()
            if max_val - min_val > 0:
                window_data[:, i] = (col - min_val) / (max_val - min_val)
            else:
                window_data[:, i] = 0.0
        flat_window = window_data.flatten()
        # Portfolio scalars appended after the market window.
        position_info = np.array([
            1.0 if self.position > 0 else 0.0,  # currently holding a position?
            self.unrealized_pnl / self.initial_balance,  # unrealized return
            self.shares * self.current_price / self.total_value,  # stock share of portfolio
        ], dtype=np.float32)
        return np.concatenate([flat_window, position_info]).astype(np.float32)

    @property
    def current_price(self):
        # Close price at the current step.
        return self.df['close'].iloc[self.current_step]

    @property
    def unrealized_pnl(self):
        # Mark-to-market profit of the open long position (0 when flat).
        if self.position > 0:
            return self.shares * (self.current_price - self.entry_price)
        return 0.0

    @property
    def total_value(self):
        # Cash plus the market value of held shares.
        return self.balance + self.shares * self.current_price

    def reset(self, seed=None, options=None):
        """Reset portfolio state and start at the first full-window step."""
        super().reset(seed=seed)
        self.current_step = self.window_size
        self.balance = self.initial_balance
        self.shares = 0
        self.position = 0  # 0: flat, 1: long
        self.entry_price = 0.0
        self.total_trades = 0
        self.winning_trades = 0
        self.trade_history = []  # realized per-trade returns
        return self._get_observation(), {}

    def step(self, action):
        """Apply one action, advance one step, and return the transition."""
        prev_total = self.total_value
        reward = 0.0
        trade_info = ""
        current_price = self.current_price
        if action == 1 and self.position == 0:  # buy
            # Spend up to 95% of cash, leaving headroom for the commission.
            max_shares = int(self.balance * 0.95 / (current_price * (1 + self.commission)))
            if max_shares > 0:
                cost = max_shares * current_price * (1 + self.commission)
                self.balance -= cost
                self.shares = max_shares
                self.position = 1
                self.entry_price = current_price
                trade_info = f"매수 {max_shares}주 @ {current_price:.2f}"
        elif action == 2 and self.position == 1:  # sell
            proceeds = self.shares * current_price * (1 - self.commission)
            self.balance += proceeds
            pnl = (current_price - self.entry_price) / self.entry_price
            self.total_trades += 1
            if pnl > 0:
                self.winning_trades += 1
            self.trade_history.append(pnl)
            trade_info = f"매도 {self.shares}주 @ {current_price:.2f}, 수익률: {pnl:.2%}"
            self.shares = 0
            self.position = 0
            self.entry_price = 0.0
        # Advance to the next time step.
        self.current_step += 1
        # Reward: fractional change in portfolio value over this step.
        current_total = self.total_value
        reward = (current_total - prev_total) / prev_total
        # Episode ends at the last row; truncate on a 50%+ loss.
        terminated = self.current_step >= len(self.df) - 1
        truncated = self.total_value < self.initial_balance * 0.5
        info = {
            "total_value": self.total_value,
            "balance": self.balance,
            "position": self.position,
            "total_trades": self.total_trades,
            "trade_info": trade_info,
        }
        return self._get_observation(), reward, terminated, truncated, info
# Smoke-test the environment: reset once and inspect the observation shape.
env = StockTradingEnv(stock_data, window_size=30)
obs, info = env.reset()
print(f"관찰 차원: {obs.shape}")
print(f"초기 포트폴리오: {env.total_value:,.0f}원")
무작위 에이전트 기준선
def evaluate_random_agent(env, n_episodes=10):
    """Roll out a uniform-random policy and summarize episode results.

    Args:
        env: trading environment exposing the Gymnasium reset/step API.
        n_episodes: number of full episodes to run.

    Returns:
        List of per-episode dicts with final value, return, and trade count.
    """
    results = []
    for _ in range(n_episodes):
        env.reset()
        terminated = truncated = False
        info = {}
        while not (terminated or truncated):
            _, _, terminated, truncated, info = env.step(env.action_space.sample())
        final_value = info['total_value']
        results.append({
            'final_value': final_value,
            'return': (final_value - env.initial_balance) / env.initial_balance,
            'trades': info['total_trades'],
        })
    episode_returns = [r['return'] for r in results]
    print(f"=== 무작위 에이전트 ({n_episodes}회) ===")
    print(f"평균 수익률: {np.mean(episode_returns):.2%}")
    print(f"최대 수익률: {np.max(episode_returns):.2%}")
    print(f"최소 수익률: {np.min(episode_returns):.2%}")
    print(f"평균 거래 횟수: {np.mean([r['trades'] for r in results]):.1f}")
    return results
# random_results = evaluate_random_agent(env)
피드포워드 DQN 모델
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import random
class TradingDQN(nn.Module):
    """Feedforward Q-network: flat observation vector -> Q-value per action."""

    def __init__(self, obs_size, n_actions):
        super().__init__()
        # Build 256 -> 128 -> 64 hidden stack; dropout after the first two
        # hidden layers only. Layer order matches the original, so saved
        # state_dicts remain compatible.
        hidden_sizes = [256, 128, 64]
        layers = []
        in_dim = obs_size
        for idx, out_dim in enumerate(hidden_sizes):
            layers += [nn.Linear(in_dim, out_dim), nn.ReLU()]
            if idx < 2:
                layers.append(nn.Dropout(0.2))
            in_dim = out_dim
        layers.append(nn.Linear(in_dim, n_actions))
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Return Q-values with shape (batch, n_actions)."""
        return self.net(x)
CNN 모델: 가격 차트를 이미지처럼 처리
가격 데이터의 시간적 패턴을 1D 합성곱으로 포착합니다.
class TradingCNN(nn.Module):
    """1D-CNN Q-network: convolves the feature window over time, then mixes
    in the three portfolio scalars before the Q-value head."""

    def __init__(self, window_size, n_features, n_actions):
        super().__init__()
        # Temporal convolutions over the window; adaptive pooling collapses
        # the time axis to length 1.
        self.conv = nn.Sequential(
            nn.Conv1d(n_features, 32, kernel_size=5, padding=2),
            nn.ReLU(),
            nn.Conv1d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.AdaptiveAvgPool1d(1),
        )
        # Head consumes conv features plus the 3 portfolio scalars.
        self.fc = nn.Sequential(
            nn.Linear(64 + 3, 64),
            nn.ReLU(),
            nn.Linear(64, n_actions),
        )
        self.window_size = window_size
        self.n_features = n_features

    def forward(self, x):
        """x: (batch, window*features + 3) flat observation -> (batch, n_actions)."""
        n_batch = x.shape[0]
        # Split the flat observation into market window and portfolio scalars.
        market, portfolio = x[:, :-3], x[:, -3:]
        # (batch, time, features) -> channels-first (batch, features, time).
        series = market.view(n_batch, self.window_size, self.n_features).transpose(1, 2)
        features = self.conv(series).squeeze(-1)
        return self.fc(torch.cat([features, portfolio], dim=1))
트레이딩 에이전트 학습
class ReplayBuffer:
    """Fixed-capacity FIFO store of transitions with uniform sampling."""

    def __init__(self, capacity):
        # deque drops the oldest transition once capacity is reached.
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        """Append one (s, a, r, s', done) transition."""
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        """Uniformly sample a batch and stack each field into an array."""
        transitions = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*transitions)
        return (
            np.array(states),
            np.array(actions),
            np.array(rewards, dtype=np.float32),
            np.array(next_states),
            np.array(dones, dtype=np.bool_),
        )

    def __len__(self):
        return len(self.buffer)
def train_trading_agent(env, model_type="ff", n_episodes=500):
    """Train a Double-DQN trading agent on `env`.

    Args:
        env: StockTradingEnv instance (or compatible Gymnasium env).
        model_type: "cnn" for the 1D-CNN network, anything else for the MLP.
        n_episodes: number of training episodes.

    Returns:
        Tuple of (trained online network, list of per-episode returns).
    """
    obs_size = env.observation_space.shape[0]
    n_actions = env.action_space.n
    device = torch.device("cpu")
    # Online/target network pair for Double DQN.
    if model_type == "cnn":
        online_net = TradingCNN(env.window_size, env.n_features, n_actions).to(device)
        target_net = TradingCNN(env.window_size, env.n_features, n_actions).to(device)
    else:
        online_net = TradingDQN(obs_size, n_actions).to(device)
        target_net = TradingDQN(obs_size, n_actions).to(device)
    target_net.load_state_dict(online_net.state_dict())
    optimizer = optim.Adam(online_net.parameters(), lr=1e-4)
    buffer = ReplayBuffer(50000)
    # Exploration and learning hyperparameters.
    epsilon = 1.0
    epsilon_min = 0.05
    epsilon_decay = 0.995  # multiplicative decay per episode
    gamma = 0.99
    batch_size = 64
    target_update = 50  # episodes between target-network syncs
    best_return = -float('inf')
    returns_history = []
    for episode in range(n_episodes):
        obs, _ = env.reset()
        total_reward = 0
        while True:
            # Epsilon-greedy action selection.
            if random.random() < epsilon:
                action = env.action_space.sample()
            else:
                with torch.no_grad():
                    q = online_net(torch.tensor([obs], dtype=torch.float32).to(device))
                    action = q.argmax(dim=1).item()
            next_obs, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated
            buffer.push(obs, action, reward, next_obs, done)
            total_reward += reward
            obs = next_obs
            # One gradient step per env step once the buffer has a full batch.
            if len(buffer) >= batch_size:
                s, a, r, ns, d = buffer.sample(batch_size)
                s_t = torch.tensor(s, dtype=torch.float32).to(device)
                a_t = torch.tensor(a, dtype=torch.long).to(device)
                r_t = torch.tensor(r, dtype=torch.float32).to(device)
                ns_t = torch.tensor(ns, dtype=torch.float32).to(device)
                d_t = torch.tensor(d, dtype=torch.bool).to(device)
                current_q = online_net(s_t).gather(1, a_t.unsqueeze(1)).squeeze(1)
                with torch.no_grad():
                    # Double DQN: online net picks the action, target net scores it.
                    best_a = online_net(ns_t).argmax(dim=1)
                    next_q = target_net(ns_t).gather(1, best_a.unsqueeze(1)).squeeze(1)
                    next_q[d_t] = 0.0  # no bootstrap past terminal states
                    target_q = r_t + gamma * next_q
                loss = nn.SmoothL1Loss()(current_q, target_q)
                optimizer.zero_grad()
                loss.backward()
                torch.nn.utils.clip_grad_norm_(online_net.parameters(), 1.0)
                optimizer.step()
            if done:
                break
        # Periodically sync the target network with the online network.
        if episode % target_update == 0:
            target_net.load_state_dict(online_net.state_dict())
        epsilon = max(epsilon_min, epsilon * epsilon_decay)
        # Track the episode return; checkpoint the best model so far.
        episode_return = (env.total_value - env.initial_balance) / env.initial_balance
        returns_history.append(episode_return)
        if episode_return > best_return:
            best_return = episode_return
            torch.save(online_net.state_dict(), "best_trading_model.pth")
        if episode % 50 == 0:
            mean_return = np.mean(returns_history[-50:])
            print(
                f"에피소드 {episode}: "
                f"수익률={episode_return:.2%}, "
                f"평균 수익률={mean_return:.2%}, "
                f"거래={info['total_trades']}, "
                f"엡실론={epsilon:.3f}"
            )
    return online_net, returns_history
# 학습 실행
# trained_model, history = train_trading_agent(env, model_type="ff", n_episodes=500)
에이전트 평가 및 분석
def backtest_agent(env, net, n_episodes=5):
    """Backtest a trained Q-network greedily (no exploration) on `env`.

    Reports total return, annualized Sharpe ratio, and maximum drawdown
    per episode, plus averages across episodes.

    Returns:
        List of per-episode result dicts.
    """
    all_results = []
    for episode in range(n_episodes):
        obs, _ = env.reset()
        portfolio_values = [env.total_value]
        trade_log = []
        while True:
            # Greedy (argmax-Q) action; no epsilon exploration at test time.
            with torch.no_grad():
                q = net(torch.tensor([obs], dtype=torch.float32))
                action = q.argmax(dim=1).item()
            obs, reward, terminated, truncated, info = env.step(action)
            portfolio_values.append(env.total_value)
            if info.get('trade_info'):
                trade_log.append(info['trade_info'])
            if terminated or truncated:
                break
        total_return = (portfolio_values[-1] - portfolio_values[0]) / portfolio_values[0]
        # Performance metrics from the per-step value series.
        daily_returns = np.diff(portfolio_values) / portfolio_values[:-1]
        # Annualized Sharpe ratio (252 trading days, zero risk-free rate).
        sharpe_ratio = np.mean(daily_returns) / (np.std(daily_returns) + 1e-10) * np.sqrt(252)
        # Maximum drawdown relative to the running peak.
        peak = np.maximum.accumulate(portfolio_values)
        drawdown = (np.array(portfolio_values) - peak) / peak
        max_drawdown = drawdown.min()
        result = {
            'total_return': total_return,
            'sharpe_ratio': sharpe_ratio,
            'max_drawdown': max_drawdown,
            'total_trades': info['total_trades'],
            'portfolio_values': portfolio_values,
        }
        all_results.append(result)
        print(f"\n에피소드 {episode + 1}:")
        print(f"  총 수익률: {total_return:.2%}")
        print(f"  샤프 비율: {sharpe_ratio:.2f}")
        print(f"  최대 낙폭: {max_drawdown:.2%}")
        print(f"  총 거래 횟수: {info['total_trades']}")
    # Averages across all backtest episodes.
    print("\n=== 전체 백테스트 결과 ===")
    avg_return = np.mean([r['total_return'] for r in all_results])
    avg_sharpe = np.mean([r['sharpe_ratio'] for r in all_results])
    avg_mdd = np.mean([r['max_drawdown'] for r in all_results])
    print(f"평균 수익률: {avg_return:.2%}")
    print(f"평균 샤프 비율: {avg_sharpe:.2f}")
    print(f"평균 최대 낙폭: {avg_mdd:.2%}")
    return all_results
# backtest_results = backtest_agent(env, trained_model)
바이앤홀드 전략과 비교
def compare_with_buy_and_hold(df, agent_results, initial_balance=100000, window_size=30):
    """Compare the RL agent's average return against a buy-and-hold baseline.

    The baseline buys at the first tradable step (right after the
    observation warm-up window) and holds until the last close.

    Improvements over the original: the warm-up offset is a parameter
    instead of a hard-coded 30, and the comparison is returned so callers
    can use it programmatically rather than only reading stdout.

    Args:
        df: price DataFrame with a 'close' column.
        agent_results: list of backtest result dicts containing 'total_return'.
        initial_balance: kept for interface compatibility; both strategies
            are compared on price-ratio returns, so it has no effect.
        window_size: observation warm-up length (must match the env's).

    Returns:
        Dict with 'buy_and_hold', 'agent', and 'excess' returns.
    """
    start_price = df['close'].iloc[window_size]
    end_price = df['close'].iloc[-1]
    bnh_return = (end_price - start_price) / start_price
    agent_return = np.mean([r['total_return'] for r in agent_results])
    print("=== 전략 비교 ===")
    print(f"바이앤홀드 수익률: {bnh_return:.2%}")
    print(f"RL 에이전트 수익률: {agent_return:.2%}")
    print(f"초과 수익률: {agent_return - bnh_return:.2%}")
    return {
        'buy_and_hold': bnh_return,
        'agent': agent_return,
        'excess': agent_return - bnh_return,
    }
# compare_with_buy_and_hold(stock_data, backtest_results)
보상 함수 설계의 중요성
보상 함수의 설계는 트레이딩 에이전트의 성능에 결정적입니다.
class ImprovedRewardEnv(StockTradingEnv):
    """Trading environment with a risk- and behavior-shaped reward.

    Fix over the original: `_compute_reward` was defined but never invoked —
    the base class's plain portfolio-change reward was still in effect.
    `step` is now overridden so the shaped reward actually reaches the agent.
    """

    def step(self, action):
        """Run the base transition, then replace the reward with the shaped one."""
        # Snapshot portfolio value before the base class mutates state.
        prev_total = self.total_value
        obs, _, terminated, truncated, info = super().step(action)
        reward = self._compute_reward(prev_total, action)
        return obs, reward, terminated, truncated, info

    def _compute_reward(self, prev_total, action):
        """Blend several reward signals into one scalar.

        Components:
          1. Basic portfolio value change ratio.
          2. Volatility penalty from recent closed-trade returns (Sharpe-like).
          3. Small fixed penalty for any non-hold action (discourages churn).
          4. Small bonus while the running win rate exceeds 50%.
        """
        current_total = self.total_value
        # 1. Plain fractional change in portfolio value.
        basic_reward = (current_total - prev_total) / prev_total
        # 2. Penalize volatility of the last (up to) 10 realized trade returns.
        volatility_penalty = 0.0
        if len(self.trade_history) > 1:
            recent_returns = self.trade_history[-10:]
            volatility_penalty = np.std(recent_returns) * 0.1
        # 3. Flat cost for trading actions (buy/sell) to curb overtrading.
        trade_penalty = 0.0
        if action != 0:
            trade_penalty = -0.0001
        # 4. Reward a sustained win rate above 50% (after enough trades).
        win_bonus = 0.0
        if self.total_trades > 10:
            win_rate = self.winning_trades / self.total_trades
            if win_rate > 0.5:
                win_bonus = 0.0001
        return basic_reward - volatility_penalty + trade_penalty + win_bonus
정리
- 문제 정의: 주식 트레이딩을 MDP로 모델링 (상태=시장 데이터, 행동=매수/매도/보유, 보상=수익)
- 데이터 준비: 가격 데이터에 기술적 지표를 추가하여 관찰 공간 구성
- 환경 설계: Gymnasium 인터페이스의 커스텀 트레이딩 환경 구현
- 모델: 피드포워드 DQN과 1D CNN 모델 모두 활용 가능
- 보상 설계: 단순 수익률 외에 리스크 조정, 거래 페널티 등 다양한 요소 고려
- 평가: 수익률, 샤프 비율, 최대 낙폭 등 다양한 성과 지표로 평가
다음 글에서는 가치 기반 방법에서 벗어나 정책을 직접 최적화하는 Policy Gradient 방법을 살펴보겠습니다.
[Deep RL] 08. Building a Stock Trading Agent with Reinforcement Learning
Trading and Reinforcement Learning
Stock trading is a natural application of reinforcement learning. The structure where a trader (agent) decides to buy/sell/hold (actions) in the market (environment) and earns profit (reward) matches the RL framework exactly.
Disclaimer
This article is for educational purposes only, and applying it directly to real investments is not recommended. Real financial markets are far more complex than what is covered here.
Trading Basics
Basic Terminology
- Buy/Long: Purchase stocks betting on price increase
- Sell: Sell held stocks to close a position (note: a short sale — selling borrowed shares to bet on a price decline — is a distinct technique not used in this article)
- Position: Current state of held stocks
- Return: Ratio of profit to investment
- Commission: Cost incurred during transactions
- Slippage: Difference between order price and actual execution price
Modeling Trading as Reinforcement Learning
| RL Element | Trading Counterpart |
|---|---|
| State | Historical price data, technical indicators, current position |
| Action | Buy, Sell, Hold |
| Reward | Realized profit, unrealized P&L change |
| Episode | A trading session over a given period |
Data Preparation
Price Data Generation
We generate synthetic data for practice. For real applications, data can be fetched from Yahoo Finance, etc.
import numpy as np
import pandas as pd
def generate_stock_data(n_days=1000, initial_price=100.0, volatility=0.02, seed=42):
    """Create a synthetic OHLCV DataFrame via a geometric-Brownian-motion-style walk.

    Args:
        n_days: number of trading days to simulate.
        initial_price: starting close price.
        volatility: standard deviation of daily returns.
        seed: NumPy RNG seed for reproducibility.

    Returns:
        DataFrame with 'close', 'open', 'high', 'low', 'volume' columns.
    """
    np.random.seed(seed)
    # Daily returns with a small positive drift, compounded into a price path.
    # RNG calls are made in the same order as before so seeded output matches.
    returns = np.random.normal(0.0005, volatility, n_days)
    close = initial_price * np.cumprod(1.0 + returns)
    # Derive the remaining OHLCV columns around the close path.
    open_ = close * (1.0 + np.random.normal(0, 0.005, n_days))
    high = np.maximum(open_, close) * (1.0 + np.abs(np.random.normal(0, 0.01, n_days)))
    low = np.minimum(open_, close) * (1.0 - np.abs(np.random.normal(0, 0.01, n_days)))
    volume = np.random.randint(100000, 1000000, n_days).astype(float)
    return pd.DataFrame(
        {"close": close, "open": open_, "high": high, "low": low, "volume": volume}
    )
# Generate the synthetic price series used throughout the article.
stock_data = generate_stock_data(n_days=2000)
print(f"데이터 크기: {len(stock_data)}")
print(stock_data.head())
Technical Indicators
def add_technical_indicators(df):
    """Append technical-indicator columns to an OHLCV frame, in place.

    Adds simple moving averages, RSI(14), Bollinger-band position, MACD,
    and short-horizon returns, then drops the warm-up rows containing NaNs.

    Args:
        df: DataFrame with at least a 'close' column. Mutated in place.

    Returns:
        The same DataFrame with indicator columns and a reset index.
    """
    close = df['close']
    # Simple moving averages.
    df['sma_10'] = close.rolling(window=10).mean()
    df['sma_30'] = close.rolling(window=30).mean()
    # Relative Strength Index over 14 days (simple-moving-average variant).
    diff = close.diff()
    avg_gain = diff.where(diff > 0, 0).rolling(window=14).mean()
    avg_loss = (-diff.where(diff < 0, 0)).rolling(window=14).mean()
    strength = avg_gain / (avg_loss + 1e-10)  # epsilon avoids division by zero
    df['rsi'] = 100 - (100 / (1 + strength))
    # Bollinger bands (20-day, 2 sigma) and where the close sits inside them.
    mid = close.rolling(window=20).mean()
    sigma = close.rolling(window=20).std()
    df['bb_upper'] = mid + 2 * sigma
    df['bb_lower'] = mid - 2 * sigma
    df['bb_position'] = (close - df['bb_lower']) / (df['bb_upper'] - df['bb_lower'] + 1e-10)
    # MACD line (12/26 EMA spread) and its 9-day signal line.
    fast = close.ewm(span=12).mean()
    slow = close.ewm(span=26).mean()
    df['macd'] = fast - slow
    df['macd_signal'] = df['macd'].ewm(span=9).mean()
    # One-day and five-day percentage returns.
    df['returns'] = close.pct_change()
    df['returns_5d'] = close.pct_change(5)
    # Drop warm-up NaN rows and renumber from zero.
    df.dropna(inplace=True)
    df.reset_index(drop=True, inplace=True)
    return df
# Enrich the raw OHLCV data with technical-indicator features.
stock_data = add_technical_indicators(stock_data)
print(f"지표 추가 후 데이터 크기: {len(stock_data)}")
print(f"특성 목록: {list(stock_data.columns)}")
Trading Environment Design
We implement a custom trading environment following the Gymnasium interface.
import gymnasium as gym
from gymnasium import spaces
class StockTradingEnv(gym.Env):
    """Single-asset, long-only stock trading environment (Gymnasium API).

    Observation: a per-feature min-max-normalized price window (flattened)
    concatenated with three portfolio scalars. Actions: 0=hold, 1=buy, 2=sell.
    Reward: per-step fractional change in total portfolio value.
    """
    metadata = {"render_modes": ["human"]}

    def __init__(self, df, window_size=30, commission=0.001,
                 initial_balance=100000):
        super().__init__()
        self.df = df                            # indicator-enriched OHLCV frame
        self.window_size = window_size          # lookback length in steps
        self.commission = commission            # proportional fee per trade
        self.initial_balance = initial_balance  # starting cash
        # Feature columns fed to the agent each step.
        self.feature_columns = [
            'close', 'volume', 'sma_10', 'sma_30', 'rsi',
            'bb_position', 'macd', 'macd_signal', 'returns', 'returns_5d'
        ]
        self.n_features = len(self.feature_columns)
        # Action space: 0=hold, 1=buy, 2=sell.
        self.action_space = spaces.Discrete(3)
        # Observation: flattened price window + 3 portfolio scalars.
        obs_shape = self.window_size * self.n_features + 3
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf, shape=(obs_shape,), dtype=np.float32
        )

    def _get_observation(self):
        """Build the current observation vector.

        The window covers the `window_size` rows strictly before
        `current_step`; each feature is min-max scaled within the window.
        """
        start = self.current_step - self.window_size
        end = self.current_step
        window_data = self.df[self.feature_columns].iloc[start:end].values
        # Per-feature min-max normalization; a constant column collapses
        # to 0 to avoid division by zero.
        for i in range(self.n_features):
            col = window_data[:, i]
            min_val = col.min()
            max_val = col.max()
            if max_val - min_val > 0:
                window_data[:, i] = (col - min_val) / (max_val - min_val)
            else:
                window_data[:, i] = 0.0
        flat_window = window_data.flatten()
        # Portfolio scalars appended after the market window.
        position_info = np.array([
            1.0 if self.position > 0 else 0.0,  # currently holding a position?
            self.unrealized_pnl / self.initial_balance,  # unrealized return
            self.shares * self.current_price / self.total_value,  # stock share of portfolio
        ], dtype=np.float32)
        return np.concatenate([flat_window, position_info]).astype(np.float32)

    @property
    def current_price(self):
        # Close price at the current step.
        return self.df['close'].iloc[self.current_step]

    @property
    def unrealized_pnl(self):
        # Mark-to-market profit of the open long position (0 when flat).
        if self.position > 0:
            return self.shares * (self.current_price - self.entry_price)
        return 0.0

    @property
    def total_value(self):
        # Cash plus the market value of held shares.
        return self.balance + self.shares * self.current_price

    def reset(self, seed=None, options=None):
        """Reset portfolio state and start at the first full-window step."""
        super().reset(seed=seed)
        self.current_step = self.window_size
        self.balance = self.initial_balance
        self.shares = 0
        self.position = 0  # 0: flat, 1: long
        self.entry_price = 0.0
        self.total_trades = 0
        self.winning_trades = 0
        self.trade_history = []  # realized per-trade returns
        return self._get_observation(), {}

    def step(self, action):
        """Apply one action, advance one step, and return the transition."""
        prev_total = self.total_value
        reward = 0.0
        trade_info = ""
        current_price = self.current_price
        if action == 1 and self.position == 0:  # buy
            # Spend up to 95% of cash, leaving headroom for the commission.
            max_shares = int(self.balance * 0.95 / (current_price * (1 + self.commission)))
            if max_shares > 0:
                cost = max_shares * current_price * (1 + self.commission)
                self.balance -= cost
                self.shares = max_shares
                self.position = 1
                self.entry_price = current_price
                trade_info = f"매수 {max_shares}주 @ {current_price:.2f}"
        elif action == 2 and self.position == 1:  # sell
            proceeds = self.shares * current_price * (1 - self.commission)
            self.balance += proceeds
            pnl = (current_price - self.entry_price) / self.entry_price
            self.total_trades += 1
            if pnl > 0:
                self.winning_trades += 1
            self.trade_history.append(pnl)
            trade_info = f"매도 {self.shares}주 @ {current_price:.2f}, 수익률: {pnl:.2%}"
            self.shares = 0
            self.position = 0
            self.entry_price = 0.0
        # Advance to the next time step.
        self.current_step += 1
        # Reward: fractional change in portfolio value over this step.
        current_total = self.total_value
        reward = (current_total - prev_total) / prev_total
        # Episode ends at the last row; truncate on a 50%+ loss.
        terminated = self.current_step >= len(self.df) - 1
        truncated = self.total_value < self.initial_balance * 0.5
        info = {
            "total_value": self.total_value,
            "balance": self.balance,
            "position": self.position,
            "total_trades": self.total_trades,
            "trade_info": trade_info,
        }
        return self._get_observation(), reward, terminated, truncated, info
# Smoke-test the environment: reset once and inspect the observation shape.
env = StockTradingEnv(stock_data, window_size=30)
obs, info = env.reset()
print(f"관찰 차원: {obs.shape}")
print(f"초기 포트폴리오: {env.total_value:,.0f}원")
Random Agent Baseline
def evaluate_random_agent(env, n_episodes=10):
    """Roll out a uniform-random policy and summarize episode results.

    Args:
        env: trading environment exposing the Gymnasium reset/step API.
        n_episodes: number of full episodes to run.

    Returns:
        List of per-episode dicts with final value, return, and trade count.
    """
    results = []
    for _ in range(n_episodes):
        env.reset()
        terminated = truncated = False
        info = {}
        while not (terminated or truncated):
            _, _, terminated, truncated, info = env.step(env.action_space.sample())
        final_value = info['total_value']
        results.append({
            'final_value': final_value,
            'return': (final_value - env.initial_balance) / env.initial_balance,
            'trades': info['total_trades'],
        })
    episode_returns = [r['return'] for r in results]
    print(f"=== 무작위 에이전트 ({n_episodes}회) ===")
    print(f"평균 수익률: {np.mean(episode_returns):.2%}")
    print(f"최대 수익률: {np.max(episode_returns):.2%}")
    print(f"최소 수익률: {np.min(episode_returns):.2%}")
    return results
# random_results = evaluate_random_agent(env)
Feedforward DQN Model
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import random
class TradingDQN(nn.Module):
    """Feedforward Q-network: flat observation vector -> Q-value per action."""

    def __init__(self, obs_size, n_actions):
        super().__init__()
        # Build 256 -> 128 -> 64 hidden stack; dropout after the first two
        # hidden layers only. Layer order matches the original, so saved
        # state_dicts remain compatible.
        hidden_sizes = [256, 128, 64]
        layers = []
        in_dim = obs_size
        for idx, out_dim in enumerate(hidden_sizes):
            layers += [nn.Linear(in_dim, out_dim), nn.ReLU()]
            if idx < 2:
                layers.append(nn.Dropout(0.2))
            in_dim = out_dim
        layers.append(nn.Linear(in_dim, n_actions))
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Return Q-values with shape (batch, n_actions)."""
        return self.net(x)
CNN Model: Processing Price Charts Like Images
Temporal patterns in price data are captured using 1D convolutions.
class TradingCNN(nn.Module):
    """1D-CNN Q-network: convolves the feature window over time, then mixes
    in the three portfolio scalars before the Q-value head."""

    def __init__(self, window_size, n_features, n_actions):
        super().__init__()
        # Temporal convolutions over the window; adaptive pooling collapses
        # the time axis to length 1.
        self.conv = nn.Sequential(
            nn.Conv1d(n_features, 32, kernel_size=5, padding=2),
            nn.ReLU(),
            nn.Conv1d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.AdaptiveAvgPool1d(1),
        )
        # Head consumes conv features plus the 3 portfolio scalars.
        self.fc = nn.Sequential(
            nn.Linear(64 + 3, 64),
            nn.ReLU(),
            nn.Linear(64, n_actions),
        )
        self.window_size = window_size
        self.n_features = n_features

    def forward(self, x):
        """x: (batch, window*features + 3) flat observation -> (batch, n_actions)."""
        n_batch = x.shape[0]
        # Split the flat observation into market window and portfolio scalars.
        market, portfolio = x[:, :-3], x[:, -3:]
        # (batch, time, features) -> channels-first (batch, features, time).
        series = market.view(n_batch, self.window_size, self.n_features).transpose(1, 2)
        features = self.conv(series).squeeze(-1)
        return self.fc(torch.cat([features, portfolio], dim=1))
Training the Trading Agent
class ReplayBuffer:
    """Fixed-capacity FIFO store of transitions with uniform sampling."""

    def __init__(self, capacity):
        # deque drops the oldest transition once capacity is reached.
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        """Append one (s, a, r, s', done) transition."""
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        """Uniformly sample a batch and stack each field into an array."""
        transitions = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*transitions)
        return (
            np.array(states),
            np.array(actions),
            np.array(rewards, dtype=np.float32),
            np.array(next_states),
            np.array(dones, dtype=np.bool_),
        )

    def __len__(self):
        return len(self.buffer)
def train_trading_agent(env, model_type="ff", n_episodes=500):
    """Train a Double-DQN trading agent on `env`.

    Args:
        env: StockTradingEnv instance (or compatible Gymnasium env).
        model_type: "cnn" for the 1D-CNN network, anything else for the MLP.
        n_episodes: number of training episodes.

    Returns:
        Tuple of (trained online network, list of per-episode returns).
    """
    obs_size = env.observation_space.shape[0]
    n_actions = env.action_space.n
    device = torch.device("cpu")
    # Online/target network pair for Double DQN.
    if model_type == "cnn":
        online_net = TradingCNN(env.window_size, env.n_features, n_actions).to(device)
        target_net = TradingCNN(env.window_size, env.n_features, n_actions).to(device)
    else:
        online_net = TradingDQN(obs_size, n_actions).to(device)
        target_net = TradingDQN(obs_size, n_actions).to(device)
    target_net.load_state_dict(online_net.state_dict())
    optimizer = optim.Adam(online_net.parameters(), lr=1e-4)
    buffer = ReplayBuffer(50000)
    # Exploration and learning hyperparameters.
    epsilon = 1.0
    epsilon_min = 0.05
    epsilon_decay = 0.995  # multiplicative decay per episode
    gamma = 0.99
    batch_size = 64
    target_update = 50  # episodes between target-network syncs
    best_return = -float('inf')
    returns_history = []
    for episode in range(n_episodes):
        obs, _ = env.reset()
        total_reward = 0
        while True:
            # Epsilon-greedy action selection.
            if random.random() < epsilon:
                action = env.action_space.sample()
            else:
                with torch.no_grad():
                    q = online_net(torch.tensor([obs], dtype=torch.float32).to(device))
                    action = q.argmax(dim=1).item()
            next_obs, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated
            buffer.push(obs, action, reward, next_obs, done)
            total_reward += reward
            obs = next_obs
            # One gradient step per env step once the buffer has a full batch.
            if len(buffer) >= batch_size:
                s, a, r, ns, d = buffer.sample(batch_size)
                s_t = torch.tensor(s, dtype=torch.float32).to(device)
                a_t = torch.tensor(a, dtype=torch.long).to(device)
                r_t = torch.tensor(r, dtype=torch.float32).to(device)
                ns_t = torch.tensor(ns, dtype=torch.float32).to(device)
                d_t = torch.tensor(d, dtype=torch.bool).to(device)
                current_q = online_net(s_t).gather(1, a_t.unsqueeze(1)).squeeze(1)
                with torch.no_grad():
                    # Double DQN: online net picks the action, target net scores it.
                    best_a = online_net(ns_t).argmax(dim=1)
                    next_q = target_net(ns_t).gather(1, best_a.unsqueeze(1)).squeeze(1)
                    next_q[d_t] = 0.0  # no bootstrap past terminal states
                    target_q = r_t + gamma * next_q
                loss = nn.SmoothL1Loss()(current_q, target_q)
                optimizer.zero_grad()
                loss.backward()
                torch.nn.utils.clip_grad_norm_(online_net.parameters(), 1.0)
                optimizer.step()
            if done:
                break
        # Periodically sync the target network with the online network.
        if episode % target_update == 0:
            target_net.load_state_dict(online_net.state_dict())
        epsilon = max(epsilon_min, epsilon * epsilon_decay)
        # Track the episode return; checkpoint the best model so far.
        episode_return = (env.total_value - env.initial_balance) / env.initial_balance
        returns_history.append(episode_return)
        if episode_return > best_return:
            best_return = episode_return
            torch.save(online_net.state_dict(), "best_trading_model.pth")
        if episode % 50 == 0:
            mean_return = np.mean(returns_history[-50:])
            print(f"에피소드 {episode}: 수익률={episode_return:.2%}, 평균 수익률={mean_return:.2%}, 거래={info['total_trades']}, 엡실론={epsilon:.3f}")
    return online_net, returns_history
# trained_model, history = train_trading_agent(env, model_type="ff", n_episodes=500)
Agent Evaluation and Analysis
def backtest_agent(env, net, n_episodes=5):
    """Backtest a trained Q-network greedily (no exploration) on `env`.

    Reports total return, annualized Sharpe ratio (252 trading days, zero
    risk-free rate), and maximum drawdown per episode.

    Returns:
        List of per-episode result dicts.
    """
    all_results = []
    for episode in range(n_episodes):
        obs, _ = env.reset()
        portfolio_values = [env.total_value]
        while True:
            # Greedy (argmax-Q) action; no epsilon exploration at test time.
            with torch.no_grad():
                q = net(torch.tensor([obs], dtype=torch.float32))
                action = q.argmax(dim=1).item()
            obs, reward, terminated, truncated, info = env.step(action)
            portfolio_values.append(env.total_value)
            if terminated or truncated:
                break
        total_return = (portfolio_values[-1] - portfolio_values[0]) / portfolio_values[0]
        # Per-step returns feed the Sharpe-ratio estimate.
        daily_returns = np.diff(portfolio_values) / portfolio_values[:-1]
        sharpe_ratio = np.mean(daily_returns) / (np.std(daily_returns) + 1e-10) * np.sqrt(252)
        # Maximum drawdown relative to the running peak.
        peak = np.maximum.accumulate(portfolio_values)
        drawdown = (np.array(portfolio_values) - peak) / peak
        max_drawdown = drawdown.min()
        result = {'total_return': total_return, 'sharpe_ratio': sharpe_ratio, 'max_drawdown': max_drawdown, 'total_trades': info['total_trades']}
        all_results.append(result)
        print(f"\n에피소드 {episode + 1}: 총 수익률: {total_return:.2%}, 샤프 비율: {sharpe_ratio:.2f}, 최대 낙폭: {max_drawdown:.2%}")
    # Average across all backtest episodes.
    avg_return = np.mean([r['total_return'] for r in all_results])
    print(f"\n=== 전체 백테스트 결과 === 평균 수익률: {avg_return:.2%}")
    return all_results
# backtest_results = backtest_agent(env, trained_model)
Summary
- Problem definition: Model stock trading as an MDP (state=market data, action=buy/sell/hold, reward=profit)
- Data preparation: Add technical indicators to price data to construct observation space
- Environment design: Implement custom trading environment with Gymnasium interface
- Models: Both feedforward DQN and 1D CNN models can be used
- Reward design: Consider various factors beyond simple returns, such as risk adjustment and trading penalties
- Evaluation: Evaluate with diverse performance metrics including returns, Sharpe ratio, and maximum drawdown
In the next article, we will move beyond value-based methods to explore Policy Gradient methods that directly optimize the policy.