Skip to content

필사 모드: [심층 강화학습] 08. 강화학습으로 주식 트레이딩 만들기

한국어
0%
정확도 0%
💡 왼쪽 원문을 읽으면서 오른쪽에 따라 써보세요. Tab 키로 힌트를 받을 수 있습니다.
원문 렌더가 준비되기 전까지 텍스트 가이드로 표시합니다.

트레이딩과 강화학습

주식 트레이딩은 강화학습의 자연스러운 적용 분야입니다. 트레이더(에이전트)가 시장(환경)에서 매수/매도/보유(행동)를 결정하고, 수익(보상)을 얻는 구조가 강화학습 프레임워크와 정확히 일치합니다.

주의사항

이 글은 교육 목적의 실습이며, 실제 투자에 직접 적용하는 것은 권장하지 않습니다. 실제 금융 시장은 여기서 다루는 것보다 훨씬 복잡합니다.

트레이딩 기초 개념

기본 용어

- **매수 (Buy/Long)**: 주식을 구매하여 가격 상승에 베팅

- **매도 (Sell/Short)**: 보유 주식을 판매하여 포지션 청산

- **포지션 (Position)**: 현재 보유하고 있는 주식의 상태

- **수익률 (Return)**: 투자 대비 이익의 비율

- **수수료 (Commission)**: 거래 시 발생하는 비용

- **슬리피지 (Slippage)**: 주문 가격과 실제 체결 가격의 차이

강화학습으로 트레이딩을 모델링하기

| RL 요소 | 트레이딩 대응 |

| -------- | ------------------------------------------ |

| 상태 | 과거 가격 데이터, 기술적 지표, 현재 포지션 |

| 행동 | 매수, 매도, 보유 |

| 보상 | 실현 수익, 미실현 수익 변화량 |

| 에피소드 | 일정 기간의 트레이딩 세션 |

데이터 준비

가격 데이터 생성

실습을 위해 합성 데이터를 생성합니다. 실제 적용 시에는 Yahoo Finance 등에서 데이터를 가져올 수 있습니다.

def generate_stock_data(n_days=1000, initial_price=100.0, volatility=0.02, seed=42):

"""합성 주가 데이터 생성 (기하 브라운 운동 모델)"""

np.random.seed(seed)

일별 수익률 생성

daily_returns = np.random.normal(0.0005, volatility, n_days)

가격 계산

prices = initial_price * np.cumprod(1 + daily_returns)

OHLCV 데이터 생성

data = pd.DataFrame()

data['close'] = prices

data['open'] = prices * (1 + np.random.normal(0, 0.005, n_days))

data['high'] = np.maximum(data['open'], data['close']) * (1 + np.abs(np.random.normal(0, 0.01, n_days)))

data['low'] = np.minimum(data['open'], data['close']) * (1 - np.abs(np.random.normal(0, 0.01, n_days)))

data['volume'] = np.random.randint(100000, 1000000, n_days).astype(float)

return data

데이터 생성

stock_data = generate_stock_data(n_days=2000)

print(f"데이터 크기: {len(stock_data)}")

print(stock_data.head())

기술적 지표 계산

def add_technical_indicators(df):

"""기술적 지표 추가"""

이동평균

df['sma_10'] = df['close'].rolling(window=10).mean()

df['sma_30'] = df['close'].rolling(window=30).mean()

상대강도지수 (RSI)

delta = df['close'].diff()

gain = delta.where(delta > 0, 0).rolling(window=14).mean()

loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()

rs = gain / (loss + 1e-10)

df['rsi'] = 100 - (100 / (1 + rs))

볼린저 밴드

bb_mean = df['close'].rolling(window=20).mean()

bb_std = df['close'].rolling(window=20).std()

df['bb_upper'] = bb_mean + 2 * bb_std

df['bb_lower'] = bb_mean - 2 * bb_std

df['bb_position'] = (df['close'] - df['bb_lower']) / (df['bb_upper'] - df['bb_lower'] + 1e-10)

MACD

ema12 = df['close'].ewm(span=12).mean()

ema26 = df['close'].ewm(span=26).mean()

df['macd'] = ema12 - ema26

df['macd_signal'] = df['macd'].ewm(span=9).mean()

수익률

df['returns'] = df['close'].pct_change()

df['returns_5d'] = df['close'].pct_change(5)

NaN 제거

df.dropna(inplace=True)

df.reset_index(drop=True, inplace=True)

return df

stock_data = add_technical_indicators(stock_data)

print(f"지표 추가 후 데이터 크기: {len(stock_data)}")

print(f"특성 목록: {list(stock_data.columns)}")

트레이딩 환경 설계

Gymnasium 인터페이스를 따르는 커스텀 트레이딩 환경을 구현합니다.

from gymnasium import spaces

class StockTradingEnv(gym.Env):

"""주식 트레이딩 환경"""

metadata = {"render_modes": ["human"]}

def __init__(self, df, window_size=30, commission=0.001,

initial_balance=100000):

super().__init__()

self.df = df

self.window_size = window_size

self.commission = commission

self.initial_balance = initial_balance

특성 열 선택

self.feature_columns = [

'close', 'volume', 'sma_10', 'sma_30', 'rsi',

'bb_position', 'macd', 'macd_signal', 'returns', 'returns_5d'

]

self.n_features = len(self.feature_columns)

행동 공간: 0=보유, 1=매수, 2=매도

self.action_space = spaces.Discrete(3)

관찰 공간: 가격 윈도우 + 포지션 정보

obs_shape = self.window_size * self.n_features + 3 # +3: 포지션, 수익률, 보유비율

self.observation_space = spaces.Box(

low=-np.inf, high=np.inf, shape=(obs_shape,), dtype=np.float32

)

def _get_observation(self):

"""현재 관찰값 생성"""

가격 윈도우 데이터

start = self.current_step - self.window_size

end = self.current_step

window_data = self.df[self.feature_columns].iloc[start:end].values

정규화 (각 특성별로 윈도우 내 min-max)

for i in range(self.n_features):

col = window_data[:, i]

min_val = col.min()

max_val = col.max()

if max_val - min_val > 0:

window_data[:, i] = (col - min_val) / (max_val - min_val)

else:

window_data[:, i] = 0.0

flat_window = window_data.flatten()

포지션 정보

position_info = np.array([

1.0 if self.position > 0 else 0.0, # 포지션 보유 여부

self.unrealized_pnl / self.initial_balance, # 미실현 수익률

self.shares * self.current_price / self.total_value, # 주식 보유 비율

], dtype=np.float32)

return np.concatenate([flat_window, position_info]).astype(np.float32)

@property

def current_price(self):

return self.df['close'].iloc[self.current_step]

@property

def unrealized_pnl(self):

if self.position > 0:

return self.shares * (self.current_price - self.entry_price)

return 0.0

@property

def total_value(self):

return self.balance + self.shares * self.current_price

def reset(self, seed=None, options=None):

super().reset(seed=seed)

self.current_step = self.window_size

self.balance = self.initial_balance

self.shares = 0

self.position = 0 # 0: 없음, 1: 롱

self.entry_price = 0.0

self.total_trades = 0

self.winning_trades = 0

self.trade_history = []

return self._get_observation(), {}

def step(self, action):

prev_total = self.total_value

reward = 0.0

trade_info = ""

current_price = self.current_price

if action == 1 and self.position == 0: # 매수

전체 잔고의 95%로 매수 (수수료 고려)

max_shares = int(self.balance * 0.95 / (current_price * (1 + self.commission)))

if max_shares > 0:

cost = max_shares * current_price * (1 + self.commission)

self.balance -= cost

self.shares = max_shares

self.position = 1

self.entry_price = current_price

trade_info = f"매수 {max_shares}주 @ {current_price:.2f}"

elif action == 2 and self.position == 1: # 매도

proceeds = self.shares * current_price * (1 - self.commission)

self.balance += proceeds

pnl = (current_price - self.entry_price) / self.entry_price

self.total_trades += 1

if pnl > 0:

self.winning_trades += 1

self.trade_history.append(pnl)

trade_info = f"매도 {self.shares}주 @ {current_price:.2f}, 수익률: {pnl:.2%}"

self.shares = 0

self.position = 0

self.entry_price = 0.0

다음 스텝으로 이동

self.current_step += 1

보상: 포트폴리오 가치 변화율

current_total = self.total_value

reward = (current_total - prev_total) / prev_total

종료 조건

terminated = self.current_step >= len(self.df) - 1

truncated = self.total_value < self.initial_balance * 0.5 # 50% 이상 손실

info = {

"total_value": self.total_value,

"balance": self.balance,

"position": self.position,

"total_trades": self.total_trades,

"trade_info": trade_info,

}

return self._get_observation(), reward, terminated, truncated, info

환경 테스트

env = StockTradingEnv(stock_data, window_size=30)

obs, info = env.reset()

print(f"관찰 차원: {obs.shape}")

print(f"초기 포트폴리오: {env.total_value:,.0f}원")

무작위 에이전트 기준선

def evaluate_random_agent(env, n_episodes=10):

"""무작위 에이전트 평가"""

results = []

for episode in range(n_episodes):

obs, _ = env.reset()

while True:

action = env.action_space.sample()

obs, reward, terminated, truncated, info = env.step(action)

if terminated or truncated:

break

final_value = info['total_value']

total_return = (final_value - env.initial_balance) / env.initial_balance

results.append({

'final_value': final_value,

'return': total_return,

'trades': info['total_trades'],

})

returns = [r['return'] for r in results]

print(f"=== 무작위 에이전트 ({n_episodes}회) ===")

print(f"평균 수익률: {np.mean(returns):.2%}")

print(f"최대 수익률: {np.max(returns):.2%}")

print(f"최소 수익률: {np.min(returns):.2%}")

print(f"평균 거래 횟수: {np.mean([r['trades'] for r in results]):.1f}")

return results

random_results = evaluate_random_agent(env)

피드포워드 DQN 모델

from collections import deque

class TradingDQN(nn.Module):

"""트레이딩용 피드포워드 DQN"""

def __init__(self, obs_size, n_actions):

super().__init__()

self.net = nn.Sequential(

nn.Linear(obs_size, 256),

nn.ReLU(),

nn.Dropout(0.2),

nn.Linear(256, 128),

nn.ReLU(),

nn.Dropout(0.2),

nn.Linear(128, 64),

nn.ReLU(),

nn.Linear(64, n_actions),

)

def forward(self, x):

return self.net(x)

CNN 모델: 가격 차트를 이미지처럼 처리

가격 데이터의 시간적 패턴을 1D 합성곱으로 포착합니다.

class TradingCNN(nn.Module):

"""1D CNN 기반 트레이딩 모델"""

def __init__(self, window_size, n_features, n_actions):

super().__init__()

self.conv = nn.Sequential(

nn.Conv1d(n_features, 32, kernel_size=5, padding=2),

nn.ReLU(),

nn.Conv1d(32, 64, kernel_size=3, padding=1),

nn.ReLU(),

nn.AdaptiveAvgPool1d(1), # 시간 차원을 1로 축소

)

CNN 출력 + 포지션 정보 (3개)

self.fc = nn.Sequential(

nn.Linear(64 + 3, 64),

nn.ReLU(),

nn.Linear(64, n_actions),

)

self.window_size = window_size

self.n_features = n_features

def forward(self, x):

batch_size = x.shape[0]

윈도우 데이터와 포지션 정보 분리

window_data = x[:, :-3].view(batch_size, self.window_size, self.n_features)

position_info = x[:, -3:]

CNN: (batch, features, time) 형태로 변환

window_data = window_data.transpose(1, 2)

conv_out = self.conv(window_data).squeeze(-1)

포지션 정보와 결합

combined = torch.cat([conv_out, position_info], dim=1)

return self.fc(combined)

트레이딩 에이전트 학습

class ReplayBuffer:

def __init__(self, capacity):

self.buffer = deque(maxlen=capacity)

def push(self, state, action, reward, next_state, done):

self.buffer.append((state, action, reward, next_state, done))

def sample(self, batch_size):

batch = random.sample(self.buffer, batch_size)

s, a, r, ns, d = zip(*batch)

return (np.array(s), np.array(a), np.array(r, dtype=np.float32),

np.array(ns), np.array(d, dtype=np.bool_))

def __len__(self):

return len(self.buffer)

def train_trading_agent(env, model_type="ff", n_episodes=500):

"""트레이딩 에이전트 학습"""

obs_size = env.observation_space.shape[0]

n_actions = env.action_space.n

device = torch.device("cpu")

if model_type == "cnn":

online_net = TradingCNN(env.window_size, env.n_features, n_actions).to(device)

target_net = TradingCNN(env.window_size, env.n_features, n_actions).to(device)

else:

online_net = TradingDQN(obs_size, n_actions).to(device)

target_net = TradingDQN(obs_size, n_actions).to(device)

target_net.load_state_dict(online_net.state_dict())

optimizer = optim.Adam(online_net.parameters(), lr=1e-4)

buffer = ReplayBuffer(50000)

epsilon = 1.0

epsilon_min = 0.05

epsilon_decay = 0.995

gamma = 0.99

batch_size = 64

target_update = 50

best_return = -float('inf')

returns_history = []

for episode in range(n_episodes):

obs, _ = env.reset()

total_reward = 0

while True:

엡실론-탐욕 행동 선택

if random.random() < epsilon:

action = env.action_space.sample()

else:

with torch.no_grad():

q = online_net(torch.tensor([obs], dtype=torch.float32).to(device))

action = q.argmax(dim=1).item()

next_obs, reward, terminated, truncated, info = env.step(action)

done = terminated or truncated

buffer.push(obs, action, reward, next_obs, done)

total_reward += reward

obs = next_obs

학습

if len(buffer) >= batch_size:

s, a, r, ns, d = buffer.sample(batch_size)

s_t = torch.tensor(s, dtype=torch.float32).to(device)

a_t = torch.tensor(a, dtype=torch.long).to(device)

r_t = torch.tensor(r, dtype=torch.float32).to(device)

ns_t = torch.tensor(ns, dtype=torch.float32).to(device)

d_t = torch.tensor(d, dtype=torch.bool).to(device)

current_q = online_net(s_t).gather(1, a_t.unsqueeze(1)).squeeze(1)

with torch.no_grad():

Double DQN

best_a = online_net(ns_t).argmax(dim=1)

next_q = target_net(ns_t).gather(1, best_a.unsqueeze(1)).squeeze(1)

next_q[d_t] = 0.0

target_q = r_t + gamma * next_q

loss = nn.SmoothL1Loss()(current_q, target_q)

optimizer.zero_grad()

loss.backward()

torch.nn.utils.clip_grad_norm_(online_net.parameters(), 1.0)

optimizer.step()

if done:

break

타겟 네트워크 업데이트

if episode % target_update == 0:

target_net.load_state_dict(online_net.state_dict())

epsilon = max(epsilon_min, epsilon * epsilon_decay)

결과 기록

episode_return = (env.total_value - env.initial_balance) / env.initial_balance

returns_history.append(episode_return)

if episode_return > best_return:

best_return = episode_return

torch.save(online_net.state_dict(), "best_trading_model.pth")

if episode % 50 == 0:

mean_return = np.mean(returns_history[-50:])

print(

f"에피소드 {episode}: "

f"수익률={episode_return:.2%}, "

f"평균 수익률={mean_return:.2%}, "

f"거래={info['total_trades']}, "

f"엡실론={epsilon:.3f}"

)

return online_net, returns_history

학습 실행

trained_model, history = train_trading_agent(env, model_type="ff", n_episodes=500)

에이전트 평가 및 분석

def backtest_agent(env, net, n_episodes=5):

"""학습된 에이전트 백테스트"""

all_results = []

for episode in range(n_episodes):

obs, _ = env.reset()

portfolio_values = [env.total_value]

trade_log = []

while True:

with torch.no_grad():

q = net(torch.tensor([obs], dtype=torch.float32))

action = q.argmax(dim=1).item()

obs, reward, terminated, truncated, info = env.step(action)

portfolio_values.append(env.total_value)

if info.get('trade_info'):

trade_log.append(info['trade_info'])

if terminated or truncated:

break

total_return = (portfolio_values[-1] - portfolio_values[0]) / portfolio_values[0]

성과 지표 계산

daily_returns = np.diff(portfolio_values) / portfolio_values[:-1]

sharpe_ratio = np.mean(daily_returns) / (np.std(daily_returns) + 1e-10) * np.sqrt(252)

최대 낙폭 (MDD)

peak = np.maximum.accumulate(portfolio_values)

drawdown = (np.array(portfolio_values) - peak) / peak

max_drawdown = drawdown.min()

result = {

'total_return': total_return,

'sharpe_ratio': sharpe_ratio,

'max_drawdown': max_drawdown,

'total_trades': info['total_trades'],

'portfolio_values': portfolio_values,

}

all_results.append(result)

print(f"\n에피소드 {episode + 1}:")

print(f" 총 수익률: {total_return:.2%}")

print(f" 샤프 비율: {sharpe_ratio:.2f}")

print(f" 최대 낙폭: {max_drawdown:.2%}")

print(f" 총 거래 횟수: {info['total_trades']}")

전체 평균

print("\n=== 전체 백테스트 결과 ===")

avg_return = np.mean([r['total_return'] for r in all_results])

avg_sharpe = np.mean([r['sharpe_ratio'] for r in all_results])

avg_mdd = np.mean([r['max_drawdown'] for r in all_results])

print(f"평균 수익률: {avg_return:.2%}")

print(f"평균 샤프 비율: {avg_sharpe:.2f}")

print(f"평균 최대 낙폭: {avg_mdd:.2%}")

return all_results

backtest_results = backtest_agent(env, trained_model)

바이앤홀드 전략과 비교

def compare_with_buy_and_hold(df, agent_results, initial_balance=100000):

"""바이앤홀드 전략과 RL 에이전트 비교"""

바이앤홀드: 처음에 매수하고 끝까지 보유

start_price = df['close'].iloc[30] # window_size 이후

end_price = df['close'].iloc[-1]

bnh_return = (end_price - start_price) / start_price

agent_return = np.mean([r['total_return'] for r in agent_results])

print("=== 전략 비교 ===")

print(f"바이앤홀드 수익률: {bnh_return:.2%}")

print(f"RL 에이전트 수익률: {agent_return:.2%}")

print(f"초과 수익률: {agent_return - bnh_return:.2%}")

compare_with_buy_and_hold(stock_data, backtest_results)

보상 함수 설계의 중요성

보상 함수의 설계는 트레이딩 에이전트의 성능에 결정적입니다.

class ImprovedRewardEnv(StockTradingEnv):

"""개선된 보상 함수를 사용하는 트레이딩 환경"""

def _compute_reward(self, prev_total, action):

"""다양한 보상 설계 방식"""

current_total = self.total_value

1. 단순 포트폴리오 변화율

basic_reward = (current_total - prev_total) / prev_total

2. 리스크 조정 보상 (샤프 비율 유사)

수익률에서 변동성 페널티를 차감

volatility_penalty = 0.0

if len(self.trade_history) > 1:

recent_returns = self.trade_history[-10:]

volatility_penalty = np.std(recent_returns) * 0.1

3. 과도한 거래 페널티

trade_penalty = 0.0

if action != 0: # 보유가 아닌 경우

trade_penalty = -0.0001

4. 승률 보너스

win_bonus = 0.0

if self.total_trades > 10:

win_rate = self.winning_trades / self.total_trades

if win_rate > 0.5:

win_bonus = 0.0001

return basic_reward - volatility_penalty + trade_penalty + win_bonus

정리

1. **문제 정의**: 주식 트레이딩을 MDP로 모델링 (상태=시장 데이터, 행동=매수/매도/보유, 보상=수익)

2. **데이터 준비**: 가격 데이터에 기술적 지표를 추가하여 관찰 공간 구성

3. **환경 설계**: Gymnasium 인터페이스의 커스텀 트레이딩 환경 구현

4. **모델**: 피드포워드 DQN과 1D CNN 모델 모두 활용 가능

5. **보상 설계**: 단순 수익률 외에 리스크 조정, 거래 페널티 등 다양한 요소 고려

6. **평가**: 수익률, 샤프 비율, 최대 낙폭 등 다양한 성과 지표로 평가

다음 글에서는 가치 기반 방법에서 벗어나 정책을 직접 최적화하는 Policy Gradient 방법을 살펴보겠습니다.

현재 단락 (1/400)

주식 트레이딩은 강화학습의 자연스러운 적용 분야입니다. 트레이더(에이전트)가 시장(환경)에서 매수/매도/보유(행동)를 결정하고, 수익(보상)을 얻는 구조가 강화학습 프레임워크와 정...

작성 글자: 0원문 글자: 13,572작성 단락: 0/400