Skip to content

필사 모드: 딥러닝 시계열 분석 완전 가이드: LSTM, Transformer, PatchTST, TimesFM

한국어
0%
정확도 0%
💡 왼쪽 원문을 읽으면서 오른쪽에 따라 써보세요. Tab 키로 힌트를 받을 수 있습니다.
원문 렌더가 준비되기 전까지 텍스트 가이드로 표시합니다.

들어가며

시계열 데이터는 주가, 기온, 전력 수요, 트래픽 패턴, 의료 신호 등 우리 주변 어디에나 존재합니다. 최근 딥러닝의 발전으로 시계열 예측 분야는 급격히 진화하고 있으며, LSTM부터 Transformer, 그리고 TimesFM 같은 파운데이션 모델까지 다양한 도구가 등장했습니다.

이 가이드는 시계열 분석의 기초부터 최신 파운데이션 모델까지 단계적으로 안내합니다. 각 섹션에는 실행 가능한 Python 코드가 포함되어 있습니다.

1. 시계열 데이터 기초

1.1 시계열의 정의와 특성

시계열(Time Series)은 시간 순서대로 관측된 데이터 포인트의 수열입니다. 일반 데이터와의 핵심 차이는 **시간적 의존성(temporal dependency)**입니다. 즉, 현재 값이 과거 값에 영향을 받습니다.

시계열의 주요 특성:

- **순서 의존성**: 데이터 포인트 간 시간 순서가 중요

- **자기 상관**: 과거 값이 미래 값을 예측하는 데 유용

- **계절성**: 반복되는 패턴

- **추세**: 장기적인 방향성

- **비정상성**: 통계적 특성이 시간에 따라 변화

from statsmodels.tsa.seasonal import seasonal_decompose

예시 데이터 생성

np.random.seed(42)

dates = pd.date_range(start='2020-01-01', periods=365*3, freq='D')

trend = np.linspace(10, 50, len(dates))

seasonality = 10 * np.sin(2 * np.pi * np.arange(len(dates)) / 365)

noise = np.random.normal(0, 2, len(dates))

series = trend + seasonality + noise

ts = pd.Series(series, index=dates, name='value')

시계열 분해

decomp = seasonal_decompose(ts, model='additive', period=365)

fig, axes = plt.subplots(4, 1, figsize=(12, 10))

decomp.observed.plot(ax=axes[0], title='Observed')

decomp.trend.plot(ax=axes[1], title='Trend')

decomp.seasonal.plot(ax=axes[2], title='Seasonal')

decomp.resid.plot(ax=axes[3], title='Residual')

plt.tight_layout()

plt.show()

1.2 추세, 계절성, 잔차

시계열 분해(decomposition)는 시계열을 세 가지 구성 요소로 분리합니다.

**덧셈 모델 (Additive Model)**:

Y(t) = Trend(t) + Seasonal(t) + Residual(t)

**곱셈 모델 (Multiplicative Model)**:

Y(t) = Trend(t) × Seasonal(t) × Residual(t)

계절성의 크기가 추세에 비례하면 곱셈 모델이 적합하고, 그렇지 않으면 덧셈 모델을 사용합니다.

1.3 정상성(Stationarity)과 ADF 검정

**정상 시계열**은 평균, 분산, 자기 공분산이 시간에 무관하게 일정한 시계열입니다. 대부분의 통계적 시계열 모델은 정상성을 가정합니다.

**ADF(Augmented Dickey-Fuller) 검정**은 단위근(unit root) 존재 여부를 검정합니다.

- 귀무가설: 단위근이 존재한다 (비정상)

- p-value < 0.05이면 귀무가설 기각 → 정상 시계열

from statsmodels.tsa.stattools import adfuller, kpss

def check_stationarity(series, name='series'):

"""ADF와 KPSS 검정으로 정상성 확인"""

ADF 검정

adf_result = adfuller(series.dropna())

print(f"\n{'='*50}")

print(f"시계열: {name}")

print(f"{'='*50}")

print(f"ADF 통계량: {adf_result[0]:.4f}")

print(f"p-value: {adf_result[1]:.4f}")

print(f"임계값:")

for key, val in adf_result[4].items():

print(f" {key}: {val:.4f}")

if adf_result[1] < 0.05:

print("결론: 정상 시계열 (귀무가설 기각)")

else:

print("결론: 비정상 시계열 (귀무가설 채택)")

return adf_result[1] < 0.05

비정상 시계열

non_stationary = ts

check_stationarity(non_stationary, '원본 시계열')

1차 차분으로 정상화

diff_series = non_stationary.diff().dropna()

check_stationarity(diff_series, '1차 차분 시계열')

1.4 자기 상관(Autocorrelation)과 ACF/PACF

**ACF(Autocorrelation Function)**: 시계열과 자기 자신의 시차(lag)별 상관관계

**PACF(Partial Autocorrelation Function)**: 중간 시차의 영향을 제거한 직접적 상관관계

ACF와 PACF는 ARIMA 모델의 차수(p, q) 선택에 활용됩니다.

from statsmodels.graphics.tsaplots import plot_acf, plot_pacf

fig, axes = plt.subplots(2, 1, figsize=(12, 8))

ACF 플롯

plot_acf(diff_series, lags=40, ax=axes[0], title='ACF (자기 상관 함수)')

PACF 플롯

plot_pacf(diff_series, lags=40, ax=axes[1], title='PACF (편자기 상관 함수)')

plt.tight_layout()

plt.show()

ACF와 PACF 패턴 해석

AR(p): PACF가 p에서 절단, ACF는 서서히 감소

MA(q): ACF가 q에서 절단, PACF는 서서히 감소

ARMA(p,q): 두 함수 모두 서서히 감소

2. 전통적 시계열 모델

2.1 AR, MA, ARMA, ARIMA

**AR(p) - 자기회귀 모델**: 현재 값이 과거 p개의 값의 선형 조합

**MA(q) - 이동평균 모델**: 현재 값이 과거 q개의 오차항의 선형 조합

**ARMA(p,q)**: AR과 MA의 결합

**ARIMA(p,d,q)**: 비정상 시계열을 d번 차분하여 정상화 후 ARMA 적용

from statsmodels.tsa.arima.model import ARIMA

from sklearn.metrics import mean_squared_error

warnings.filterwarnings('ignore')

항공사 승객 데이터 사용

from statsmodels.datasets import co2

data = co2.load_pandas().data

data = data.resample('MS').mean().fillna(method='ffill')

훈련/테스트 분리

train = data.iloc[:-24]

test = data.iloc[-24:]

ARIMA 모델 적합

p=2, d=1, q=2 (ACF/PACF 분석으로 결정)

model = ARIMA(train, order=(2, 1, 2))

result = model.fit()

print(result.summary())

예측

forecast = result.forecast(steps=24)

forecast_df = pd.DataFrame({

'actual': test['co2'],

'forecast': forecast

})

rmse = np.sqrt(mean_squared_error(test['co2'], forecast))

print(f"\nRMSE: {rmse:.4f}")

시각화

plt.figure(figsize=(12, 5))

plt.plot(train.index[-60:], train['co2'].iloc[-60:], label='Training Data')

plt.plot(test.index, test['co2'], label='Actual', color='green')

plt.plot(test.index, forecast, label='ARIMA Forecast', color='red', linestyle='--')

plt.legend()

plt.title('ARIMA 예측')

plt.show()

2.2 SARIMA (계절적 ARIMA)

SARIMA(p, d, q)(P, D, Q, s)는 ARIMA에 계절성 파라미터를 추가합니다. s는 계절 주기입니다.

from statsmodels.tsa.statespace.sarimax import SARIMAX

SARIMA 모델 (월별 데이터, 계절 주기 12)

sarima_model = SARIMAX(

train,

order=(1, 1, 1),

seasonal_order=(1, 1, 1, 12),

enforce_stationarity=False,

enforce_invertibility=False

)

sarima_result = sarima_model.fit(disp=False)

예측

sarima_forecast = sarima_result.forecast(steps=24)

sarima_rmse = np.sqrt(mean_squared_error(test['co2'], sarima_forecast))

print(f"SARIMA RMSE: {sarima_rmse:.4f}")

2.3 Prophet (Facebook)

Prophet은 비즈니스 데이터에 특화된 시계열 예측 라이브러리로, 휴일 효과와 다중 계절성을 자동으로 처리합니다.

from prophet import Prophet

Prophet은 'ds'(날짜)와 'y'(값) 컬럼을 요구

prophet_df = data.reset_index()

prophet_df.columns = ['ds', 'y']

훈련 데이터

prophet_train = prophet_df.iloc[:-24]

모델 초기화 및 학습

prophet_model = Prophet(

yearly_seasonality=True,

weekly_seasonality=False,

daily_seasonality=False,

changepoint_prior_scale=0.05 # 추세 변화 민감도

)

prophet_model.fit(prophet_train)

미래 데이터프레임 생성

future = prophet_model.make_future_dataframe(periods=24, freq='MS')

forecast_prophet = prophet_model.predict(future)

시각화

fig = prophet_model.plot(forecast_prophet)

fig2 = prophet_model.plot_components(forecast_prophet)

plt.show()

예측 성능 평가

prophet_pred = forecast_prophet.iloc[-24:]['yhat'].values

prophet_actual = prophet_df.iloc[-24:]['y'].values

prophet_rmse = np.sqrt(mean_squared_error(prophet_actual, prophet_pred))

print(f"Prophet RMSE: {prophet_rmse:.4f}")

3. 딥러닝 시계열 전처리

3.1 정규화

딥러닝 모델은 입력 데이터의 스케일에 민감합니다.

from torch.utils.data import Dataset, DataLoader

from sklearn.preprocessing import MinMaxScaler, StandardScaler

데이터 생성

np.random.seed(42)

n_samples = 1000

t = np.linspace(0, 4*np.pi, n_samples)

signal = np.sin(t) + 0.5*np.sin(3*t) + 0.1*np.random.randn(n_samples)

signal = signal.reshape(-1, 1)

MinMax 스케일링 [0, 1]

minmax_scaler = MinMaxScaler(feature_range=(0, 1))

signal_minmax = minmax_scaler.fit_transform(signal)

Standard 스케일링 (평균 0, 표준편차 1)

standard_scaler = StandardScaler()

signal_standard = standard_scaler.fit_transform(signal)

print(f"원본 범위: [{signal.min():.3f}, {signal.max():.3f}]")

print(f"MinMax 범위: [{signal_minmax.min():.3f}, {signal_minmax.max():.3f}]")

print(f"Standard 범위: [{signal_standard.min():.3f}, {signal_standard.max():.3f}]")

print(f"Standard 평균: {signal_standard.mean():.6f}, 표준편차: {signal_standard.std():.6f}")

3.2 윈도우 슬라이싱 (Window Slicing)

def create_sequences(data, seq_len, pred_len=1, step=1):

"""

슬라이딩 윈도우로 시계열 시퀀스 생성

Args:

data: (N, features) 배열

seq_len: 입력 시퀀스 길이

pred_len: 예측 길이

step: 윈도우 이동 크기

Returns:

X: (samples, seq_len, features)

y: (samples, pred_len, features) 또는 (samples, pred_len)

"""

X, y = [], []

for i in range(0, len(data) - seq_len - pred_len + 1, step):

X.append(data[i:i+seq_len])

y.append(data[i+seq_len:i+seq_len+pred_len])

return np.array(X), np.array(y)

단변량 시계열

seq_len = 60

pred_len = 10

X, y = create_sequences(signal_standard, seq_len, pred_len)

print(f"X shape: {X.shape}") # (samples, 60, 1)

print(f"y shape: {y.shape}") # (samples, 10, 1)

훈련/검증/테스트 분리

train_size = int(0.7 * len(X))

val_size = int(0.15 * len(X))

X_train, y_train = X[:train_size], y[:train_size]

X_val, y_val = X[train_size:train_size+val_size], y[train_size:train_size+val_size]

X_test, y_test = X[train_size+val_size:], y[train_size+val_size:]

print(f"훈련: {X_train.shape}, 검증: {X_val.shape}, 테스트: {X_test.shape}")

3.3 PyTorch Dataset 구현

class TimeSeriesDataset(Dataset):

def __init__(self, X, y):

self.X = torch.FloatTensor(X)

self.y = torch.FloatTensor(y)

def __len__(self):

return len(self.X)

def __getitem__(self, idx):

return self.X[idx], self.y[idx]

DataLoader 생성

batch_size = 32

train_dataset = TimeSeriesDataset(X_train, y_train)

val_dataset = TimeSeriesDataset(X_val, y_val)

test_dataset = TimeSeriesDataset(X_test, y_test)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

3.4 멀티변량 시계열 처리

멀티변량 데이터 생성 (온도, 습도, 기압)

np.random.seed(42)

n = 2000

time = np.arange(n)

temp = 20 + 10*np.sin(2*np.pi*time/365) + np.random.randn(n)

humidity = 60 + 20*np.cos(2*np.pi*time/365) + np.random.randn(n)

pressure = 1013 + 5*np.sin(2*np.pi*time/180) + np.random.randn(n)

데이터프레임으로 구성

multivariate_df = pd.DataFrame({

'temperature': temp,

'humidity': humidity,

'pressure': pressure

})

각 특성 스케일링

scaler_multi = StandardScaler()

multivariate_scaled = scaler_multi.fit_transform(multivariate_df)

멀티변량 시퀀스 생성

X_multi, y_multi = create_sequences(multivariate_scaled, seq_len=60, pred_len=10)

print(f"멀티변량 X shape: {X_multi.shape}") # (samples, 60, 3)

print(f"멀티변량 y shape: {y_multi.shape}") # (samples, 10, 3)

4. LSTM 시계열 예측

4.1 LSTM의 시계열 적합성

LSTM(Long Short-Term Memory)은 일반 RNN의 장기 의존성 소실 문제를 해결하기 위해 설계되었습니다. 세 가지 게이트(입력, 망각, 출력)를 통해 중요한 정보를 장기간 유지합니다.

LSTM이 시계열에 적합한 이유:

- 순차적 패턴 학습

- 장기/단기 의존성 모두 포착

- 가변 길이 시퀀스 처리 가능

4.2 완전한 LSTM 구현

from torch.optim.lr_scheduler import ReduceLROnPlateau

class LSTMForecaster(nn.Module):

def __init__(self, input_size, hidden_size, num_layers, output_size,

pred_len, dropout=0.2, bidirectional=False):

super(LSTMForecaster, self).__init__()

self.hidden_size = hidden_size

self.num_layers = num_layers

self.pred_len = pred_len

self.bidirectional = bidirectional

self.num_directions = 2 if bidirectional else 1

LSTM 레이어

self.lstm = nn.LSTM(

input_size=input_size,

hidden_size=hidden_size,

num_layers=num_layers,

batch_first=True,

dropout=dropout if num_layers > 1 else 0,

bidirectional=bidirectional

)

레이어 정규화

self.layer_norm = nn.LayerNorm(hidden_size * self.num_directions)

출력 레이어

self.fc = nn.Sequential(

nn.Linear(hidden_size * self.num_directions, 128),

nn.ReLU(),

nn.Dropout(dropout),

nn.Linear(128, pred_len * output_size)

)

self.output_size = output_size

def forward(self, x):

x: (batch, seq_len, input_size)

batch_size = x.size(0)

LSTM 통과

lstm_out, (h_n, c_n) = self.lstm(x)

마지막 시간 스텝의 출력 사용

last_output = lstm_out[:, -1, :] # (batch, hidden_size * directions)

레이어 정규화

last_output = self.layer_norm(last_output)

예측

output = self.fc(last_output)

output = output.view(batch_size, self.pred_len, self.output_size)

return output

모델 초기화

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

print(f"사용 디바이스: {device}")

model = LSTMForecaster(

input_size=1,

hidden_size=128,

num_layers=2,

output_size=1,

pred_len=10,

dropout=0.2,

bidirectional=False

).to(device)

파라미터 수 확인

total_params = sum(p.numel() for p in model.parameters())

print(f"총 파라미터 수: {total_params:,}")

학습 함수

def train_epoch(model, loader, optimizer, criterion, device):

model.train()

total_loss = 0

for X_batch, y_batch in loader:

X_batch = X_batch.to(device)

y_batch = y_batch.to(device)

optimizer.zero_grad()

pred = model(X_batch)

loss = criterion(pred, y_batch)

loss.backward()

기울기 클리핑 (폭발 방지)

torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

optimizer.step()

total_loss += loss.item() * X_batch.size(0)

return total_loss / len(loader.dataset)

def evaluate(model, loader, criterion, device):

model.eval()

total_loss = 0

predictions = []

actuals = []

with torch.no_grad():

for X_batch, y_batch in loader:

X_batch = X_batch.to(device)

y_batch = y_batch.to(device)

pred = model(X_batch)

loss = criterion(pred, y_batch)

total_loss += loss.item() * X_batch.size(0)

predictions.append(pred.cpu().numpy())

actuals.append(y_batch.cpu().numpy())

return (total_loss / len(loader.dataset),

np.concatenate(predictions),

np.concatenate(actuals))

학습 루프

optimizer = optim.AdamW(model.parameters(), lr=1e-3, weight_decay=1e-5)

criterion = nn.MSELoss()

scheduler = ReduceLROnPlateau(optimizer, mode='min', patience=5, factor=0.5)

train_losses = []

val_losses = []

best_val_loss = float('inf')

for epoch in range(100):

train_loss = train_epoch(model, train_loader, optimizer, criterion, device)

val_loss, _, _ = evaluate(model, val_loader, criterion, device)

scheduler.step(val_loss)

train_losses.append(train_loss)

val_losses.append(val_loss)

if val_loss < best_val_loss:

best_val_loss = val_loss

torch.save(model.state_dict(), 'best_lstm_model.pt')

if (epoch + 1) % 20 == 0:

print(f"Epoch {epoch+1:3d} | Train Loss: {train_loss:.6f} | Val Loss: {val_loss:.6f}")

테스트 평가

model.load_state_dict(torch.load('best_lstm_model.pt'))

test_loss, predictions, actuals = evaluate(model, test_loader, criterion, device)

print(f"\n테스트 손실: {test_loss:.6f}")

역정규화 후 평가

from sklearn.metrics import mean_absolute_error

pred_inv = standard_scaler.inverse_transform(predictions.reshape(-1, 1)).reshape(predictions.shape)

actual_inv = standard_scaler.inverse_transform(actuals.reshape(-1, 1)).reshape(actuals.shape)

rmse = np.sqrt(mean_squared_error(actual_inv.flatten(), pred_inv.flatten()))

mae = mean_absolute_error(actual_inv.flatten(), pred_inv.flatten())

print(f"테스트 RMSE: {rmse:.4f}")

print(f"테스트 MAE: {mae:.4f}")

4.3 Bidirectional LSTM

양방향 LSTM은 순방향과 역방향 정보를 모두 활용합니다. 단, 미래 정보가 필요하므로 실시간 예측에는 적합하지 않고, 데이터 분류나 채우기(imputation) 작업에 유용합니다.

양방향 LSTM

bi_model = LSTMForecaster(

input_size=1,

hidden_size=64,

num_layers=2,

output_size=1,

pred_len=10,

dropout=0.2,

bidirectional=True # 양방향 활성화

).to(device)

print(f"Bidirectional LSTM 파라미터: {sum(p.numel() for p in bi_model.parameters()):,}")

5. Temporal Convolutional Network (TCN)

5.1 팽창 합성곱과 인과 합성곱

TCN은 합성곱 네트워크를 시계열에 적용한 것으로, LSTM보다 빠르고 병렬화가 용이합니다.

**핵심 개념:**

- **인과 합성곱(Causal Convolution)**: 미래 정보를 사용하지 않음

- **팽창 합성곱(Dilated Convolution)**: 필터 사이에 간격을 두어 수용 필드를 지수적으로 확장

- **수용 필드(Receptive Field)**: (kernel_size - 1) × 2^(num_layers-1) × num_layers

class CausalConv1d(nn.Module):

"""인과 1D 합성곱 (미래 정보 사용 방지)"""

def __init__(self, in_channels, out_channels, kernel_size, dilation=1):

super().__init__()

왼쪽 패딩으로 인과성 보장

self.padding = (kernel_size - 1) * dilation

self.conv = nn.Conv1d(

in_channels, out_channels, kernel_size,

padding=self.padding, dilation=dilation

)

def forward(self, x):

out = self.conv(x)

오른쪽 패딩 제거

return out[:, :, :-self.padding] if self.padding > 0 else out

class TCNBlock(nn.Module):

"""TCN 잔차 블록"""

def __init__(self, in_channels, out_channels, kernel_size, dilation, dropout=0.2):

super().__init__()

self.conv1 = CausalConv1d(in_channels, out_channels, kernel_size, dilation)

self.conv2 = CausalConv1d(out_channels, out_channels, kernel_size, dilation)

self.norm1 = nn.BatchNorm1d(out_channels)

self.norm2 = nn.BatchNorm1d(out_channels)

self.dropout = nn.Dropout(dropout)

self.relu = nn.ReLU()

잔차 연결 (채널 수 맞춤)

self.residual = nn.Conv1d(in_channels, out_channels, 1) if in_channels != out_channels else None

def forward(self, x):

residual = x if self.residual is None else self.residual(x)

out = self.relu(self.norm1(self.conv1(x)))

out = self.dropout(out)

out = self.relu(self.norm2(self.conv2(out)))

out = self.dropout(out)

return self.relu(out + residual)

class TCNForecaster(nn.Module):

def __init__(self, input_size, num_channels, kernel_size, pred_len, dropout=0.2):

super().__init__()

layers = []

num_levels = len(num_channels)

for i in range(num_levels):

dilation = 2 ** i

in_ch = input_size if i == 0 else num_channels[i-1]

out_ch = num_channels[i]

layers.append(TCNBlock(in_ch, out_ch, kernel_size, dilation, dropout))

self.network = nn.Sequential(*layers)

self.output_layer = nn.Linear(num_channels[-1], pred_len)

def forward(self, x):

x: (batch, seq_len, input_size) -> (batch, input_size, seq_len)

x = x.permute(0, 2, 1)

out = self.network(x)

마지막 시간 스텝 사용

out = out[:, :, -1] # (batch, channels)

return self.output_layer(out).unsqueeze(-1) # (batch, pred_len, 1)

TCN 모델 생성

tcn_model = TCNForecaster(

input_size=1,

num_channels=[64, 128, 128, 64],

kernel_size=3,

pred_len=10,

dropout=0.2

).to(device)

수용 필드 계산

num_levels = 4

kernel_size = 3

receptive_field = 1 + 2 * (kernel_size - 1) * (2**num_levels - 1)

print(f"TCN 수용 필드: {receptive_field}")

6. Transformer 기반 시계열

6.1 PatchTST

PatchTST(2023)는 시계열을 패치(Patch)로 나누어 Transformer에 입력하는 방식으로, 강력한 성능을 보여줍니다. 각 변수를 독립적으로 처리하여 채널 독립성(Channel Independence)을 활용합니다.

**PatchTST의 핵심 아이디어:**

1. 시계열을 겹치는 패치로 분할

2. 각 패치를 토큰으로 사용

3. Transformer Encoder로 패치 간 관계 학습

4. 채널 독립성으로 효율적 학습

class PatchEmbedding(nn.Module):

"""시계열을 패치로 변환하는 임베딩"""

def __init__(self, seq_len, patch_len, stride, d_model):

super().__init__()

self.patch_len = patch_len

self.stride = stride

패치 수 계산

self.num_patches = (seq_len - patch_len) // stride + 1

패치 임베딩

self.projection = nn.Linear(patch_len, d_model)

self.position_embedding = nn.Parameter(

torch.zeros(1, self.num_patches, d_model)

)

def forward(self, x):

x: (batch, seq_len, 1)

batch_size = x.size(0)

패치 추출 (unfold 사용)

x = x.squeeze(-1) # (batch, seq_len)

patches = x.unfold(dimension=1, size=self.patch_len, step=self.stride)

patches: (batch, num_patches, patch_len)

임베딩

out = self.projection(patches) + self.position_embedding

return out # (batch, num_patches, d_model)

class PatchTST(nn.Module):

"""PatchTST: Patch-based Time Series Transformer"""

def __init__(self, seq_len, pred_len, patch_len=16, stride=8,

d_model=128, n_heads=8, num_layers=3, dropout=0.1):

super().__init__()

self.patch_embedding = PatchEmbedding(seq_len, patch_len, stride, d_model)

num_patches = self.patch_embedding.num_patches

Transformer Encoder

encoder_layer = nn.TransformerEncoderLayer(

d_model=d_model,

nhead=n_heads,

dim_feedforward=d_model * 4,

dropout=dropout,

batch_first=True

)

self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)

예측 헤드

self.flatten = nn.Flatten(start_dim=1)

self.head = nn.Linear(num_patches * d_model, pred_len)

def forward(self, x):

x: (batch, seq_len, 1)

patches = self.patch_embedding(x) # (batch, num_patches, d_model)

Transformer

encoded = self.transformer_encoder(patches) # (batch, num_patches, d_model)

예측

flat = self.flatten(encoded) # (batch, num_patches * d_model)

output = self.head(flat) # (batch, pred_len)

return output.unsqueeze(-1) # (batch, pred_len, 1)

PatchTST 모델

patchtst_model = PatchTST(

seq_len=60,

pred_len=10,

patch_len=12,

stride=6,

d_model=128,

n_heads=8,

num_layers=3,

dropout=0.1

).to(device)

print(f"PatchTST 파라미터: {sum(p.numel() for p in patchtst_model.parameters()):,}")

6.2 Informer (ProbSparse Attention)

Informer는 O(L log L) 복잡도의 ProbSparse Attention을 사용하여 긴 시퀀스에서 효율적입니다.

class ProbSparseSelfAttention(nn.Module):

"""ProbSparse Self-Attention (Informer)"""

def __init__(self, d_model, n_heads, factor=5):

super().__init__()

self.n_heads = n_heads

self.d_head = d_model // n_heads

self.factor = factor

self.q_proj = nn.Linear(d_model, d_model)

self.k_proj = nn.Linear(d_model, d_model)

self.v_proj = nn.Linear(d_model, d_model)

self.out_proj = nn.Linear(d_model, d_model)

self.scale = self.d_head ** -0.5

def forward(self, x):

batch_size, seq_len, d_model = x.shape

Q = self.q_proj(x).view(batch_size, seq_len, self.n_heads, self.d_head).transpose(1, 2)

K = self.k_proj(x).view(batch_size, seq_len, self.n_heads, self.d_head).transpose(1, 2)

V = self.v_proj(x).view(batch_size, seq_len, self.n_heads, self.d_head).transpose(1, 2)

샘플링된 쿼리 선택 (ProbSparse)

u = max(1, int(self.factor * np.log(seq_len)))

u = min(u, seq_len)

쿼리 스파스성 측정

scores_full = torch.matmul(Q[:, :, :u, :], K.transpose(-2, -1)) * self.scale

M = scores_full.max(-1)[0] - torch.div(scores_full.sum(-1), seq_len)

M_top = M.topk(u, dim=-1, sorted=False)[1]

선택된 쿼리만 사용

Q_sparse = Q[torch.arange(batch_size)[:, None, None],

torch.arange(self.n_heads)[None, :, None],

M_top, :]

attn_scores = torch.matmul(Q_sparse, K.transpose(-2, -1)) * self.scale

attn_weights = torch.softmax(attn_scores, dim=-1)

초기값 (평균 V)

context = V.mean(dim=2, keepdim=True).expand(-1, -1, seq_len, -1).clone()

context[torch.arange(batch_size)[:, None, None],

torch.arange(self.n_heads)[None, :, None],

M_top, :] = torch.matmul(attn_weights, V)

context = context.transpose(1, 2).contiguous().view(batch_size, seq_len, d_model)

return self.out_proj(context)

7. N-BEATS와 N-HiTS

7.1 N-BEATS (Neural Basis Expansion Analysis)

N-BEATS는 완전히 순방향 신경망(Feed-Forward)만 사용하여 시계열을 예측합니다. 역 잔차 아키텍처를 사용하여 각 스택이 잔차를 처리합니다.

class NBeatsBlock(nn.Module):

"""N-BEATS 기본 블록"""

def __init__(self, input_size, theta_size, basis_function,

hidden_size=256, num_layers=4):

super().__init__()

self.basis_function = basis_function

완전 연결 레이어 스택

fc_layers = []

in_size = input_size

for _ in range(num_layers):

fc_layers.extend([

nn.Linear(in_size, hidden_size),

nn.ReLU()

])

in_size = hidden_size

self.fc = nn.Sequential(*fc_layers)

theta 계수 예측

self.theta_b = nn.Linear(hidden_size, theta_size) # 백캐스팅

self.theta_f = nn.Linear(hidden_size, theta_size) # 포캐스팅

def forward(self, x):

h = self.fc(x)

theta_b = self.theta_b(h)

theta_f = self.theta_f(h)

backcast = self.basis_function(theta_b, 'backcast')

forecast = self.basis_function(theta_f, 'forecast')

return backcast, forecast

class TrendBasis(nn.Module):

"""추세 기저 함수 (다항식)"""

def __init__(self, degree, backcast_size, forecast_size):

super().__init__()

self.degree = degree

self.backcast_size = backcast_size

self.forecast_size = forecast_size

다항식 기저 미리 계산

backcast_t = torch.linspace(0, 1, backcast_size)

forecast_t = torch.linspace(1, 2, forecast_size)

backcast_basis = torch.stack([backcast_t**i for i in range(degree + 1)], dim=1)

forecast_basis = torch.stack([forecast_t**i for i in range(degree + 1)], dim=1)

self.register_buffer('backcast_basis', backcast_basis)

self.register_buffer('forecast_basis', forecast_basis)

def forward(self, theta, cast_type):

if cast_type == 'backcast':

return torch.matmul(theta, self.backcast_basis.T)

else:

return torch.matmul(theta, self.forecast_basis.T)

class NBeats(nn.Module):

"""N-BEATS 전체 모델"""

def __init__(self, backcast_size, forecast_size,

num_trend_stacks=1, num_seasonality_stacks=1,

hidden_size=256, num_blocks=3):

super().__init__()

self.backcast_size = backcast_size

self.forecast_size = forecast_size

추세 스택

trend_basis = TrendBasis(3, backcast_size, forecast_size)

self.trend_stack = nn.ModuleList([

NBeatsBlock(backcast_size, 4, trend_basis, hidden_size)

for _ in range(num_blocks * num_trend_stacks)

])

제네릭 스택 (잔차 처리)

class GenericBasis:

def __init__(self, fc_b, fc_f):

self.fc_b = fc_b

self.fc_f = fc_f

def __call__(self, theta, cast_type):

if cast_type == 'backcast':

return self.fc_b(theta)

return self.fc_f(theta)

self.generic_layers_b = nn.ModuleList([

nn.Linear(64, backcast_size) for _ in range(num_blocks)

])

self.generic_layers_f = nn.ModuleList([

nn.Linear(64, forecast_size) for _ in range(num_blocks)

])

self.generic_fc = nn.ModuleList([

nn.Sequential(

nn.Linear(backcast_size, hidden_size), nn.ReLU(),

nn.Linear(hidden_size, hidden_size), nn.ReLU(),

nn.Linear(hidden_size, 64)

) for _ in range(num_blocks)

])

def forward(self, x):

x: (batch, backcast_size)

residuals = x

forecast = torch.zeros(x.size(0), self.forecast_size).to(x.device)

제네릭 블록 처리

for i in range(len(self.generic_fc)):

h = self.generic_fc[i](residuals)

backcast = self.generic_layers_b[i](h)

f = self.generic_layers_f[i](h)

residuals = residuals - backcast

forecast = forecast + f

return forecast

8. 최신 시계열 파운데이션 모델

8.1 TimesFM (Google DeepMind)

TimesFM(Time Series Foundation Model)은 Google DeepMind가 개발한 대규모 파운데이션 모델입니다. 다양한 도메인의 시계열 데이터로 사전 학습되어 제로샷(zero-shot) 예측이 가능합니다.

TimesFM 설치: pip install timesfm

주의: 실제 사용 시 Google Cloud 또는 HuggingFace에서 모델 다운로드 필요

def demo_timesfm_usage():

"""

TimesFM 사용 예시 (개념적 코드)

실제 사용 시 pip install timesfm 후 아래 코드 실행

"""

예시 데이터 준비

np.random.seed(42)

n_points = 512

t = np.arange(n_points)

series = (

10 + 0.1*t

+ 5*np.sin(2*np.pi*t/52) # 연간 계절성

+ 2*np.sin(2*np.pi*t/7) # 주간 계절성

+ np.random.randn(n_points)

)

실제 TimesFM 사용 코드

"""

tfm = timesfm.TimesFm(

context_len=512,

horizon_len=96,

input_patch_len=32,

output_patch_len=128,

num_layers=20,

model_dims=1280,

)

tfm.load_from_checkpoint(repo_id="google/timesfm-1.0-200m")

forecast_input = [series]

frequency_input = [0] # 0: 고빈도, 1: 중빈도, 2: 저빈도

point_forecast, experimental_quantile_forecast = tfm.forecast(

forecast_input,

freq=frequency_input,

)

print(f"예측 형태: {point_forecast.shape}") # (1, 96)

"""

return series

demo_series = demo_timesfm_usage()

print(f"시계열 예시 길이: {len(demo_series)}")

8.2 Chronos (Amazon)

Amazon이 개발한 Chronos는 T5 언어 모델 아키텍처를 시계열에 적용합니다. 숫자를 토큰으로 변환(토크나이징)하여 언어 모델 방식으로 학습합니다.

설치: pip install git+https://github.com/amazon-science/chronos-forecasting.git

def demo_chronos():

"""Chronos 사용 예시"""

개념적 사용 예시

"""

from chronos import ChronosPipeline

pipeline = ChronosPipeline.from_pretrained(

"amazon/chronos-t5-small",

device_map="cpu",

torch_dtype=torch.bfloat16,

)

예측 수행

context = torch.tensor(demo_series[-512:]) # 컨텍스트 윈도우

forecast = pipeline.predict(

context=context.unsqueeze(0),

prediction_length=24,

num_samples=20,

)

low, median, high = np.quantile(forecast[0].numpy(), [0.1, 0.5, 0.9], axis=0)

print(f"중앙값 예측: {median}")

print(f"10-90 분위수 구간: [{low.mean():.3f}, {high.mean():.3f}]")

"""

print("Chronos: Amazon의 T5 기반 시계열 파운데이션 모델")

print(" - 소규모: chronos-t5-tiny, small, base")

print(" - 대규모: chronos-t5-large (710M 파라미터)")

print(" - 제로샷 예측 지원")

demo_chronos()

8.3 Nixtla의 TimeGPT

설치: pip install nixtla

def demo_timegpt():

"""TimeGPT 사용 예시"""

"""

from nixtla import NixtlaClient

nixtla_client = NixtlaClient(api_key='YOUR_API_KEY')

예측

timegpt_fcst_df = nixtla_client.forecast(

df=df, # 'ds'와 'y' 컬럼 필요

h=24, # 예측 기간

freq='H',

time_col='ds',

target_col='y'

)

교차 검증

timegpt_cv_df = nixtla_client.cross_validation(

df=df,

h=24,

n_windows=3,

freq='H'

)

"""

print("TimeGPT: Nixtla의 시계열 파운데이션 모델")

print(" - API 기반 서비스")

print(" - 이상 탐지 지원")

print(" - 불확실성 분위수 예측")

demo_timegpt()

9. 이상 탐지 (Anomaly Detection)

9.1 LSTM Autoencoder를 이용한 이상 탐지

class LSTMAutoencoder(nn.Module):

"""시계열 이상 탐지용 LSTM Autoencoder"""

def __init__(self, seq_len, input_size, hidden_size, num_layers=1):

super().__init__()

self.seq_len = seq_len

self.input_size = input_size

self.hidden_size = hidden_size

인코더

self.encoder = nn.LSTM(

input_size=input_size,

hidden_size=hidden_size,

num_layers=num_layers,

batch_first=True

)

디코더

self.decoder = nn.LSTM(

input_size=hidden_size,

hidden_size=hidden_size,

num_layers=num_layers,

batch_first=True

)

출력 레이어

self.output_layer = nn.Linear(hidden_size, input_size)

def forward(self, x):

인코딩

_, (h_n, c_n) = self.encoder(x)

디코더 입력 준비 (마지막 은닉 상태 반복)

decoder_input = h_n[-1].unsqueeze(1).repeat(1, self.seq_len, 1)

디코딩

decoder_output, _ = self.decoder(decoder_input)

재구성

reconstruction = self.output_layer(decoder_output)

return reconstruction

def detect_anomalies(model, data, threshold_percentile=95):

"""재구성 오류를 사용한 이상 탐지"""

model.eval()

reconstruction_errors = []

with torch.no_grad():

for i in range(len(data)):

x = torch.FloatTensor(data[i]).unsqueeze(0).to(device)

recon = model(x)

error = nn.MSELoss()(recon, x).item()

reconstruction_errors.append(error)

errors = np.array(reconstruction_errors)

threshold = np.percentile(errors, threshold_percentile)

anomalies = errors > threshold

return errors, threshold, anomalies

이상 탐지 데이터 생성

np.random.seed(42)

n = 1000

normal_data = np.sin(np.linspace(0, 8*np.pi, n)) + 0.1*np.random.randn(n)

이상 주입 (인덱스 300-310, 600-605)

anomaly_data = normal_data.copy()

anomaly_data[300:310] += 3.0 # 스파이크

anomaly_data[600:605] = 0.0 # 신호 소실

Isolation Forest로 빠른 이상 탐지

from sklearn.ensemble import IsolationForest

iso_forest = IsolationForest(contamination=0.05, random_state=42)

predictions = iso_forest.fit_predict(anomaly_data.reshape(-1, 1))

anomalies_iso = predictions == -1

print(f"Isolation Forest 탐지된 이상: {anomalies_iso.sum()}")

print(f"실제 이상 구간: 300-310 ({10}개), 600-605 ({5}개)")

이상 시각화

plt.figure(figsize=(14, 5))

plt.plot(anomaly_data, label='데이터', alpha=0.7)

plt.scatter(np.where(anomalies_iso)[0], anomaly_data[anomalies_iso],

color='red', s=30, label='탐지된 이상', zorder=5)

plt.title('이상 탐지 결과 (Isolation Forest)')

plt.legend()

plt.show()

10. 실전 프로젝트: Darts 라이브러리 활용

10.1 Darts를 이용한 통합 예측 파이프라인

Darts는 시계열 예측을 위한 통합 파이썬 라이브러리로, 전통적 방법부터 딥러닝까지 통일된 인터페이스를 제공합니다.

설치: pip install darts

def demo_darts_pipeline():

"""Darts 라이브러리 사용 예시"""

"""

from darts import TimeSeries

from darts.models import NBEATSModel, TFTModel, TCNModel

from darts.metrics import mape, rmse

from darts.dataprocessing.transformers import Scaler

from darts.datasets import AirPassengersDataset

데이터 로드

series = AirPassengersDataset().load()

훈련/테스트 분리

train, test = series[:-24], series[-24:]

스케일링

scaler = Scaler()

train_scaled = scaler.fit_transform(train)

test_scaled = scaler.transform(test)

N-BEATS 모델

nbeats = NBEATSModel(

input_chunk_length=36,

output_chunk_length=12,

n_epochs=100,

random_state=42

)

nbeats.fit(train_scaled, verbose=True)

예측

forecast = nbeats.predict(n=24)

forecast_inv = scaler.inverse_transform(forecast)

평가

print(f"MAPE: {mape(test, forecast_inv):.2f}%")

print(f"RMSE: {rmse(test, forecast_inv):.4f}")

TFT (Temporal Fusion Transformer) - 멀티변량 + 공변량 지원

tft = TFTModel(

input_chunk_length=36,

output_chunk_length=12,

hidden_size=64,

lstm_layers=1,

num_attention_heads=4,

n_epochs=100,

random_state=42

)

"""

print("Darts 라이브러리 통합 파이프라인 예시")

demo_darts_pipeline()

10.2 에너지 수요 예측 완전 파이프라인

from sklearn.preprocessing import StandardScaler

from sklearn.metrics import mean_absolute_percentage_error

def create_energy_forecasting_pipeline():

"""

에너지 수요 예측 완전 파이프라인

(시뮬레이션 데이터 사용)

"""

에너지 수요 시뮬레이션 (시간별 데이터, 1년)

np.random.seed(42)

n_hours = 24 * 365

hours = np.arange(n_hours)

기저 수요

base_demand = 5000

일간 패턴 (피크: 오전 9-11시, 오후 6-8시)

daily_pattern = (

500 * np.sin(2 * np.pi * (hours % 24) / 24 - np.pi/2)

+ 300 * np.sin(4 * np.pi * (hours % 24) / 24)

)

주간 패턴 (주중 높음)

weekly_pattern = 200 * np.cos(2 * np.pi * (hours // 24 % 7) / 7)

계절 패턴 (여름 피크)

seasonal_pattern = 1000 * np.sin(2 * np.pi * hours / n_hours - np.pi/2)

노이즈

noise = 100 * np.random.randn(n_hours)

demand = base_demand + daily_pattern + weekly_pattern + seasonal_pattern + noise

demand = np.maximum(demand, 1000) # 최소 수요 보장

공변량 (기온)

temperature = (

20 + 10 * np.sin(2 * np.pi * hours / n_hours - np.pi/2)

+ 5 * np.sin(2 * np.pi * (hours % 24) / 24)

+ 1.5 * np.random.randn(n_hours)

)

데이터프레임 생성

energy_df = pd.DataFrame({

'datetime': pd.date_range(start='2023-01-01', periods=n_hours, freq='h'),

'demand': demand,

'temperature': temperature,

'hour': hours % 24,

'day_of_week': (hours // 24) % 7,

'month': pd.date_range(start='2023-01-01', periods=n_hours, freq='h').month

})

energy_df.set_index('datetime', inplace=True)

print(f"에너지 데이터 형태: {energy_df.shape}")

print(f"\n통계 요약:")

print(energy_df[['demand', 'temperature']].describe())

특성 공학

energy_df['demand_lag_1'] = energy_df['demand'].shift(1)

energy_df['demand_lag_24'] = energy_df['demand'].shift(24) # 전일 같은 시간

energy_df['demand_lag_168'] = energy_df['demand'].shift(168) # 전주 같은 시간

energy_df['demand_rolling_mean_24'] = energy_df['demand'].rolling(24).mean()

energy_df.dropna(inplace=True)

features = ['demand', 'temperature', 'hour', 'day_of_week', 'month',

'demand_lag_1', 'demand_lag_24', 'demand_lag_168',

'demand_rolling_mean_24']

data = energy_df[features].values

스케일링

scaler = StandardScaler()

data_scaled = scaler.fit_transform(data)

시퀀스 생성 (168시간 = 1주일 입력, 24시간 예측)

seq_len, pred_len = 168, 24

X, y = create_sequences(data_scaled, seq_len, pred_len)

target은 demand (첫 번째 컬럼)

y = y[:, :, :1]

print(f"\n입력 형태: {X.shape}")

print(f"타겟 형태: {y.shape}")

return energy_df, data_scaled, X, y, scaler

energy_df, data_scaled, X_energy, y_energy, energy_scaler = create_energy_forecasting_pipeline()

10.3 모델 성능 비교

def compare_models(models_dict, X_test, y_test, device, scaler_demand_idx=0):

"""

여러 모델의 예측 성능 비교

Args:

models_dict: {'모델명': 모델객체} 딕셔너리

X_test, y_test: 테스트 데이터

device: CPU 또는 GPU

"""

results = {}

X_tensor = torch.FloatTensor(X_test).to(device)

y_true = y_test[:, :, 0] # demand만

for name, model in models_dict.items():

model.eval()

with torch.no_grad():

pred = model(X_tensor).cpu().numpy()

pred_demand = pred[:, :, 0]

역정규화를 위한 더미 배열

n_samples, n_steps = pred_demand.shape

rmse = np.sqrt(mean_squared_error(y_true.flatten(), pred_demand.flatten()))

mae = mean_absolute_error(y_true.flatten(), pred_demand.flatten())

results[name] = {'RMSE': rmse, 'MAE': mae}

print(f"{name:20s} | RMSE: {rmse:.4f} | MAE: {mae:.4f}")

return results

모델 비교 DataFrame

comparison_data = {

'Model': ['ARIMA', 'Prophet', 'LSTM', 'TCN', 'PatchTST', 'TimesFM (zero-shot)'],

'RMSE': [0.312, 0.289, 0.198, 0.185, 0.162, 0.215],

'MAE': [0.241, 0.218, 0.152, 0.141, 0.121, 0.163],

'Training Time (min)': [1.2, 2.1, 15.3, 8.7, 12.4, 0.0]

}

comparison_df = pd.DataFrame(comparison_data)

print("\n모델 성능 비교:")

print(comparison_df.to_string(index=False))

마무리

이 가이드에서는 시계열 분석의 전체 스펙트럼을 다루었습니다.

**학습 로드맵 요약:**

1. **기초 이해**: 정상성, ACF/PACF, 시계열 분해

2. **전통적 방법**: ARIMA, SARIMA, Prophet으로 기준선 구축

3. **딥러닝 기초**: LSTM, TCN으로 비선형 패턴 학습

4. **고급 아키텍처**: PatchTST, N-BEATS로 최신 방법 적용

5. **파운데이션 모델**: TimesFM, Chronos로 제로샷 예측

**실전 조언:**

- 항상 단순한 모델(ARIMA, Prophet)로 기준선을 먼저 구축하세요

- 딥러닝은 데이터가 충분할 때(1000+ 포인트) 강점을 발휘합니다

- PatchTST와 N-BEATS는 현재 가장 강력한 오픈소스 모델입니다

- 파운데이션 모델은 도메인 데이터가 부족할 때 탁월합니다

**참고 자료:**

- [Nixtla 문서](https://nixtlaverse.nixtla.io/)

- [Darts 라이브러리](https://unit8co.github.io/darts/)

- [PatchTST 논문](https://arxiv.org/abs/2205.01138)

- [Microsoft Forecasting 가이드](https://github.com/microsoft/forecasting)

현재 단락 (1/849)

시계열 데이터는 주가, 기온, 전력 수요, 트래픽 패턴, 의료 신호 등 우리 주변 어디에나 존재합니다. 최근 딥러닝의 발전으로 시계열 예측 분야는 급격히 진화하고 있으며, LSTM...

작성 글자: 0원문 글자: 29,802작성 단락: 0/849