- Authors

- Name
- Youngju Kim
- @fjvbn20031
트레이딩과 강화학습
주식 트레이딩은 강화학습의 자연스러운 적용 분야입니다. 트레이더(에이전트)가 시장(환경)에서 매수/매도/보유(행동)를 결정하고, 수익(보상)을 얻는 구조가 강화학습 프레임워크와 정확히 일치합니다.
주의사항
이 글은 교육 목적의 실습이며, 실제 투자에 직접 적용하는 것은 권장하지 않습니다. 실제 금융 시장은 여기서 다루는 것보다 훨씬 복잡합니다.
트레이딩 기초 개념
기본 용어
- 매수 (Buy/Long): 주식을 구매하여 가격 상승에 베팅
- 매도 (Sell/Short): 보유 주식을 판매하여 포지션 청산
- 포지션 (Position): 현재 보유하고 있는 주식의 상태
- 수익률 (Return): 투자 대비 이익의 비율
- 수수료 (Commission): 거래 시 발생하는 비용
- 슬리피지 (Slippage): 주문 가격과 실제 체결 가격의 차이
강화학습으로 트레이딩을 모델링하기
| RL 요소 | 트레이딩 대응 |
|---|---|
| 상태 | 과거 가격 데이터, 기술적 지표, 현재 포지션 |
| 행동 | 매수, 매도, 보유 |
| 보상 | 실현 수익, 미실현 수익 변화량 |
| 에피소드 | 일정 기간의 트레이딩 세션 |
데이터 준비
가격 데이터 생성
실습을 위해 합성 데이터를 생성합니다. 실제 적용 시에는 Yahoo Finance 등에서 데이터를 가져올 수 있습니다.
import numpy as np
import pandas as pd
def generate_stock_data(n_days=1000, initial_price=100.0, volatility=0.02, seed=42):
"""합성 주가 데이터 생성 (기하 브라운 운동 모델)"""
np.random.seed(seed)
# 일별 수익률 생성
daily_returns = np.random.normal(0.0005, volatility, n_days)
# 가격 계산
prices = initial_price * np.cumprod(1 + daily_returns)
# OHLCV 데이터 생성
data = pd.DataFrame()
data['close'] = prices
data['open'] = prices * (1 + np.random.normal(0, 0.005, n_days))
data['high'] = np.maximum(data['open'], data['close']) * (1 + np.abs(np.random.normal(0, 0.01, n_days)))
data['low'] = np.minimum(data['open'], data['close']) * (1 - np.abs(np.random.normal(0, 0.01, n_days)))
data['volume'] = np.random.randint(100000, 1000000, n_days).astype(float)
return data
# 데이터 생성
stock_data = generate_stock_data(n_days=2000)
print(f"데이터 크기: {len(stock_data)}")
print(stock_data.head())
기술적 지표 계산
def add_technical_indicators(df):
"""기술적 지표 추가"""
# 이동평균
df['sma_10'] = df['close'].rolling(window=10).mean()
df['sma_30'] = df['close'].rolling(window=30).mean()
# 상대강도지수 (RSI)
delta = df['close'].diff()
gain = delta.where(delta > 0, 0).rolling(window=14).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
rs = gain / (loss + 1e-10)
df['rsi'] = 100 - (100 / (1 + rs))
# 볼린저 밴드
bb_mean = df['close'].rolling(window=20).mean()
bb_std = df['close'].rolling(window=20).std()
df['bb_upper'] = bb_mean + 2 * bb_std
df['bb_lower'] = bb_mean - 2 * bb_std
df['bb_position'] = (df['close'] - df['bb_lower']) / (df['bb_upper'] - df['bb_lower'] + 1e-10)
# MACD
ema12 = df['close'].ewm(span=12).mean()
ema26 = df['close'].ewm(span=26).mean()
df['macd'] = ema12 - ema26
df['macd_signal'] = df['macd'].ewm(span=9).mean()
# 수익률
df['returns'] = df['close'].pct_change()
df['returns_5d'] = df['close'].pct_change(5)
# NaN 제거
df.dropna(inplace=True)
df.reset_index(drop=True, inplace=True)
return df
stock_data = add_technical_indicators(stock_data)
print(f"지표 추가 후 데이터 크기: {len(stock_data)}")
print(f"특성 목록: {list(stock_data.columns)}")
트레이딩 환경 설계
Gymnasium 인터페이스를 따르는 커스텀 트레이딩 환경을 구현합니다.
import gymnasium as gym
from gymnasium import spaces
class StockTradingEnv(gym.Env):
"""주식 트레이딩 환경"""
metadata = {"render_modes": ["human"]}
def __init__(self, df, window_size=30, commission=0.001,
initial_balance=100000):
super().__init__()
self.df = df
self.window_size = window_size
self.commission = commission
self.initial_balance = initial_balance
# 특성 열 선택
self.feature_columns = [
'close', 'volume', 'sma_10', 'sma_30', 'rsi',
'bb_position', 'macd', 'macd_signal', 'returns', 'returns_5d'
]
self.n_features = len(self.feature_columns)
# 행동 공간: 0=보유, 1=매수, 2=매도
self.action_space = spaces.Discrete(3)
# 관찰 공간: 가격 윈도우 + 포지션 정보
obs_shape = self.window_size * self.n_features + 3 # +3: 포지션, 수익률, 보유비율
self.observation_space = spaces.Box(
low=-np.inf, high=np.inf, shape=(obs_shape,), dtype=np.float32
)
def _get_observation(self):
"""현재 관찰값 생성"""
# 가격 윈도우 데이터
start = self.current_step - self.window_size
end = self.current_step
window_data = self.df[self.feature_columns].iloc[start:end].values
# 정규화 (각 특성별로 윈도우 내 min-max)
for i in range(self.n_features):
col = window_data[:, i]
min_val = col.min()
max_val = col.max()
if max_val - min_val > 0:
window_data[:, i] = (col - min_val) / (max_val - min_val)
else:
window_data[:, i] = 0.0
flat_window = window_data.flatten()
# 포지션 정보
position_info = np.array([
1.0 if self.position > 0 else 0.0, # 포지션 보유 여부
self.unrealized_pnl / self.initial_balance, # 미실현 수익률
self.shares * self.current_price / self.total_value, # 주식 보유 비율
], dtype=np.float32)
return np.concatenate([flat_window, position_info]).astype(np.float32)
@property
def current_price(self):
return self.df['close'].iloc[self.current_step]
@property
def unrealized_pnl(self):
if self.position > 0:
return self.shares * (self.current_price - self.entry_price)
return 0.0
@property
def total_value(self):
return self.balance + self.shares * self.current_price
def reset(self, seed=None, options=None):
super().reset(seed=seed)
self.current_step = self.window_size
self.balance = self.initial_balance
self.shares = 0
self.position = 0 # 0: 없음, 1: 롱
self.entry_price = 0.0
self.total_trades = 0
self.winning_trades = 0
self.trade_history = []
return self._get_observation(), {}
def step(self, action):
prev_total = self.total_value
reward = 0.0
trade_info = ""
current_price = self.current_price
if action == 1 and self.position == 0: # 매수
# 전체 잔고의 95%로 매수 (수수료 고려)
max_shares = int(self.balance * 0.95 / (current_price * (1 + self.commission)))
if max_shares > 0:
cost = max_shares * current_price * (1 + self.commission)
self.balance -= cost
self.shares = max_shares
self.position = 1
self.entry_price = current_price
trade_info = f"매수 {max_shares}주 @ {current_price:.2f}"
elif action == 2 and self.position == 1: # 매도
proceeds = self.shares * current_price * (1 - self.commission)
self.balance += proceeds
pnl = (current_price - self.entry_price) / self.entry_price
self.total_trades += 1
if pnl > 0:
self.winning_trades += 1
self.trade_history.append(pnl)
trade_info = f"매도 {self.shares}주 @ {current_price:.2f}, 수익률: {pnl:.2%}"
self.shares = 0
self.position = 0
self.entry_price = 0.0
# 다음 스텝으로 이동
self.current_step += 1
# 보상: 포트폴리오 가치 변화율
current_total = self.total_value
reward = (current_total - prev_total) / prev_total
# 종료 조건
terminated = self.current_step >= len(self.df) - 1
truncated = self.total_value < self.initial_balance * 0.5 # 50% 이상 손실
info = {
"total_value": self.total_value,
"balance": self.balance,
"position": self.position,
"total_trades": self.total_trades,
"trade_info": trade_info,
}
return self._get_observation(), reward, terminated, truncated, info
# 환경 테스트
env = StockTradingEnv(stock_data, window_size=30)
obs, info = env.reset()
print(f"관찰 차원: {obs.shape}")
print(f"초기 포트폴리오: {env.total_value:,.0f}원")
무작위 에이전트 기준선
def evaluate_random_agent(env, n_episodes=10):
"""무작위 에이전트 평가"""
results = []
for episode in range(n_episodes):
obs, _ = env.reset()
while True:
action = env.action_space.sample()
obs, reward, terminated, truncated, info = env.step(action)
if terminated or truncated:
break
final_value = info['total_value']
total_return = (final_value - env.initial_balance) / env.initial_balance
results.append({
'final_value': final_value,
'return': total_return,
'trades': info['total_trades'],
})
returns = [r['return'] for r in results]
print(f"=== 무작위 에이전트 ({n_episodes}회) ===")
print(f"평균 수익률: {np.mean(returns):.2%}")
print(f"최대 수익률: {np.max(returns):.2%}")
print(f"최소 수익률: {np.min(returns):.2%}")
print(f"평균 거래 횟수: {np.mean([r['trades'] for r in results]):.1f}")
return results
# random_results = evaluate_random_agent(env)
피드포워드 DQN 모델
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import random
class TradingDQN(nn.Module):
"""트레이딩용 피드포워드 DQN"""
def __init__(self, obs_size, n_actions):
super().__init__()
self.net = nn.Sequential(
nn.Linear(obs_size, 256),
nn.ReLU(),
nn.Dropout(0.2),
nn.Linear(256, 128),
nn.ReLU(),
nn.Dropout(0.2),
nn.Linear(128, 64),
nn.ReLU(),
nn.Linear(64, n_actions),
)
def forward(self, x):
return self.net(x)
CNN 모델: 가격 차트를 이미지처럼 처리
가격 데이터의 시간적 패턴을 1D 합성곱으로 포착합니다.
class TradingCNN(nn.Module):
"""1D CNN 기반 트레이딩 모델"""
def __init__(self, window_size, n_features, n_actions):
super().__init__()
self.conv = nn.Sequential(
nn.Conv1d(n_features, 32, kernel_size=5, padding=2),
nn.ReLU(),
nn.Conv1d(32, 64, kernel_size=3, padding=1),
nn.ReLU(),
nn.AdaptiveAvgPool1d(1), # 시간 차원을 1로 축소
)
# CNN 출력 + 포지션 정보 (3개)
self.fc = nn.Sequential(
nn.Linear(64 + 3, 64),
nn.ReLU(),
nn.Linear(64, n_actions),
)
self.window_size = window_size
self.n_features = n_features
def forward(self, x):
batch_size = x.shape[0]
# 윈도우 데이터와 포지션 정보 분리
window_data = x[:, :-3].view(batch_size, self.window_size, self.n_features)
position_info = x[:, -3:]
# CNN: (batch, features, time) 형태로 변환
window_data = window_data.transpose(1, 2)
conv_out = self.conv(window_data).squeeze(-1)
# 포지션 정보와 결합
combined = torch.cat([conv_out, position_info], dim=1)
return self.fc(combined)
트레이딩 에이전트 학습
class ReplayBuffer:
def __init__(self, capacity):
self.buffer = deque(maxlen=capacity)
def push(self, state, action, reward, next_state, done):
self.buffer.append((state, action, reward, next_state, done))
def sample(self, batch_size):
batch = random.sample(self.buffer, batch_size)
s, a, r, ns, d = zip(*batch)
return (np.array(s), np.array(a), np.array(r, dtype=np.float32),
np.array(ns), np.array(d, dtype=np.bool_))
def __len__(self):
return len(self.buffer)
def train_trading_agent(env, model_type="ff", n_episodes=500):
"""트레이딩 에이전트 학습"""
obs_size = env.observation_space.shape[0]
n_actions = env.action_space.n
device = torch.device("cpu")
if model_type == "cnn":
online_net = TradingCNN(env.window_size, env.n_features, n_actions).to(device)
target_net = TradingCNN(env.window_size, env.n_features, n_actions).to(device)
else:
online_net = TradingDQN(obs_size, n_actions).to(device)
target_net = TradingDQN(obs_size, n_actions).to(device)
target_net.load_state_dict(online_net.state_dict())
optimizer = optim.Adam(online_net.parameters(), lr=1e-4)
buffer = ReplayBuffer(50000)
epsilon = 1.0
epsilon_min = 0.05
epsilon_decay = 0.995
gamma = 0.99
batch_size = 64
target_update = 50
best_return = -float('inf')
returns_history = []
for episode in range(n_episodes):
obs, _ = env.reset()
total_reward = 0
while True:
# 엡실론-탐욕 행동 선택
if random.random() < epsilon:
action = env.action_space.sample()
else:
with torch.no_grad():
q = online_net(torch.tensor([obs], dtype=torch.float32).to(device))
action = q.argmax(dim=1).item()
next_obs, reward, terminated, truncated, info = env.step(action)
done = terminated or truncated
buffer.push(obs, action, reward, next_obs, done)
total_reward += reward
obs = next_obs
# 학습
if len(buffer) >= batch_size:
s, a, r, ns, d = buffer.sample(batch_size)
s_t = torch.tensor(s, dtype=torch.float32).to(device)
a_t = torch.tensor(a, dtype=torch.long).to(device)
r_t = torch.tensor(r, dtype=torch.float32).to(device)
ns_t = torch.tensor(ns, dtype=torch.float32).to(device)
d_t = torch.tensor(d, dtype=torch.bool).to(device)
current_q = online_net(s_t).gather(1, a_t.unsqueeze(1)).squeeze(1)
with torch.no_grad():
# Double DQN
best_a = online_net(ns_t).argmax(dim=1)
next_q = target_net(ns_t).gather(1, best_a.unsqueeze(1)).squeeze(1)
next_q[d_t] = 0.0
target_q = r_t + gamma * next_q
loss = nn.SmoothL1Loss()(current_q, target_q)
optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(online_net.parameters(), 1.0)
optimizer.step()
if done:
break
# 타겟 네트워크 업데이트
if episode % target_update == 0:
target_net.load_state_dict(online_net.state_dict())
epsilon = max(epsilon_min, epsilon * epsilon_decay)
# 결과 기록
episode_return = (env.total_value - env.initial_balance) / env.initial_balance
returns_history.append(episode_return)
if episode_return > best_return:
best_return = episode_return
torch.save(online_net.state_dict(), "best_trading_model.pth")
if episode % 50 == 0:
mean_return = np.mean(returns_history[-50:])
print(
f"에피소드 {episode}: "
f"수익률={episode_return:.2%}, "
f"평균 수익률={mean_return:.2%}, "
f"거래={info['total_trades']}, "
f"엡실론={epsilon:.3f}"
)
return online_net, returns_history
# 학습 실행
# trained_model, history = train_trading_agent(env, model_type="ff", n_episodes=500)
에이전트 평가 및 분석
def backtest_agent(env, net, n_episodes=5):
"""학습된 에이전트 백테스트"""
all_results = []
for episode in range(n_episodes):
obs, _ = env.reset()
portfolio_values = [env.total_value]
trade_log = []
while True:
with torch.no_grad():
q = net(torch.tensor([obs], dtype=torch.float32))
action = q.argmax(dim=1).item()
obs, reward, terminated, truncated, info = env.step(action)
portfolio_values.append(env.total_value)
if info.get('trade_info'):
trade_log.append(info['trade_info'])
if terminated or truncated:
break
total_return = (portfolio_values[-1] - portfolio_values[0]) / portfolio_values[0]
# 성과 지표 계산
daily_returns = np.diff(portfolio_values) / portfolio_values[:-1]
sharpe_ratio = np.mean(daily_returns) / (np.std(daily_returns) + 1e-10) * np.sqrt(252)
# 최대 낙폭 (MDD)
peak = np.maximum.accumulate(portfolio_values)
drawdown = (np.array(portfolio_values) - peak) / peak
max_drawdown = drawdown.min()
result = {
'total_return': total_return,
'sharpe_ratio': sharpe_ratio,
'max_drawdown': max_drawdown,
'total_trades': info['total_trades'],
'portfolio_values': portfolio_values,
}
all_results.append(result)
print(f"\n에피소드 {episode + 1}:")
print(f" 총 수익률: {total_return:.2%}")
print(f" 샤프 비율: {sharpe_ratio:.2f}")
print(f" 최대 낙폭: {max_drawdown:.2%}")
print(f" 총 거래 횟수: {info['total_trades']}")
# 전체 평균
print("\n=== 전체 백테스트 결과 ===")
avg_return = np.mean([r['total_return'] for r in all_results])
avg_sharpe = np.mean([r['sharpe_ratio'] for r in all_results])
avg_mdd = np.mean([r['max_drawdown'] for r in all_results])
print(f"평균 수익률: {avg_return:.2%}")
print(f"평균 샤프 비율: {avg_sharpe:.2f}")
print(f"평균 최대 낙폭: {avg_mdd:.2%}")
return all_results
# backtest_results = backtest_agent(env, trained_model)
바이앤홀드 전략과 비교
def compare_with_buy_and_hold(df, agent_results, initial_balance=100000):
"""바이앤홀드 전략과 RL 에이전트 비교"""
# 바이앤홀드: 처음에 매수하고 끝까지 보유
start_price = df['close'].iloc[30] # window_size 이후
end_price = df['close'].iloc[-1]
bnh_return = (end_price - start_price) / start_price
agent_return = np.mean([r['total_return'] for r in agent_results])
print("=== 전략 비교 ===")
print(f"바이앤홀드 수익률: {bnh_return:.2%}")
print(f"RL 에이전트 수익률: {agent_return:.2%}")
print(f"초과 수익률: {agent_return - bnh_return:.2%}")
# compare_with_buy_and_hold(stock_data, backtest_results)
보상 함수 설계의 중요성
보상 함수의 설계는 트레이딩 에이전트의 성능에 결정적입니다.
class ImprovedRewardEnv(StockTradingEnv):
"""개선된 보상 함수를 사용하는 트레이딩 환경"""
def _compute_reward(self, prev_total, action):
"""다양한 보상 설계 방식"""
current_total = self.total_value
# 1. 단순 포트폴리오 변화율
basic_reward = (current_total - prev_total) / prev_total
# 2. 리스크 조정 보상 (샤프 비율 유사)
# 수익률에서 변동성 페널티를 차감
volatility_penalty = 0.0
if len(self.trade_history) > 1:
recent_returns = self.trade_history[-10:]
volatility_penalty = np.std(recent_returns) * 0.1
# 3. 과도한 거래 페널티
trade_penalty = 0.0
if action != 0: # 보유가 아닌 경우
trade_penalty = -0.0001
# 4. 승률 보너스
win_bonus = 0.0
if self.total_trades > 10:
win_rate = self.winning_trades / self.total_trades
if win_rate > 0.5:
win_bonus = 0.0001
return basic_reward - volatility_penalty + trade_penalty + win_bonus
정리
- 문제 정의: 주식 트레이딩을 MDP로 모델링 (상태=시장 데이터, 행동=매수/매도/보유, 보상=수익)
- 데이터 준비: 가격 데이터에 기술적 지표를 추가하여 관찰 공간 구성
- 환경 설계: Gymnasium 인터페이스의 커스텀 트레이딩 환경 구현
- 모델: 피드포워드 DQN과 1D CNN 모델 모두 활용 가능
- 보상 설계: 단순 수익률 외에 리스크 조정, 거래 페널티 등 다양한 요소 고려
- 평가: 수익률, 샤프 비율, 최대 낙폭 등 다양한 성과 지표로 평가
다음 글에서는 가치 기반 방법에서 벗어나 정책을 직접 최적화하는 Policy Gradient 방법을 살펴보겠습니다.