Skip to content

Split View: 딥러닝 시계열 분석 완전 가이드: LSTM, Transformer, PatchTST, TimesFM

|

딥러닝 시계열 분석 완전 가이드: LSTM, Transformer, PatchTST, TimesFM

들어가며

시계열 데이터는 주가, 기온, 전력 수요, 트래픽 패턴, 의료 신호 등 우리 주변 어디에나 존재합니다. 최근 딥러닝의 발전으로 시계열 예측 분야는 급격히 진화하고 있으며, LSTM부터 Transformer, 그리고 TimesFM 같은 파운데이션 모델까지 다양한 도구가 등장했습니다.

이 가이드는 시계열 분석의 기초부터 최신 파운데이션 모델까지 단계적으로 안내합니다. 각 섹션에는 실행 가능한 Python 코드가 포함되어 있습니다.


1. 시계열 데이터 기초

1.1 시계열의 정의와 특성

시계열(Time Series)은 시간 순서대로 관측된 데이터 포인트의 수열입니다. 일반 데이터와의 핵심 차이는 **시간적 의존성(temporal dependency)**입니다. 즉, 현재 값이 과거 값에 영향을 받습니다.

시계열의 주요 특성:

  • 순서 의존성: 데이터 포인트 간 시간 순서가 중요
  • 자기 상관: 과거 값이 미래 값을 예측하는 데 유용
  • 계절성: 반복되는 패턴
  • 추세: 장기적인 방향성
  • 비정상성: 통계적 특성이 시간에 따라 변화
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from statsmodels.tsa.seasonal import seasonal_decompose

# 예시 데이터 생성
np.random.seed(42)
dates = pd.date_range(start='2020-01-01', periods=365*3, freq='D')
trend = np.linspace(10, 50, len(dates))
seasonality = 10 * np.sin(2 * np.pi * np.arange(len(dates)) / 365)
noise = np.random.normal(0, 2, len(dates))
series = trend + seasonality + noise

ts = pd.Series(series, index=dates, name='value')

# 시계열 분해
decomp = seasonal_decompose(ts, model='additive', period=365)

fig, axes = plt.subplots(4, 1, figsize=(12, 10))
decomp.observed.plot(ax=axes[0], title='Observed')
decomp.trend.plot(ax=axes[1], title='Trend')
decomp.seasonal.plot(ax=axes[2], title='Seasonal')
decomp.resid.plot(ax=axes[3], title='Residual')
plt.tight_layout()
plt.show()

1.2 추세, 계절성, 잔차

시계열 분해(decomposition)는 시계열을 세 가지 구성 요소로 분리합니다.

덧셈 모델 (Additive Model): Y(t) = Trend(t) + Seasonal(t) + Residual(t)

곱셈 모델 (Multiplicative Model): Y(t) = Trend(t) × Seasonal(t) × Residual(t)

계절성의 크기가 추세에 비례하면 곱셈 모델이 적합하고, 그렇지 않으면 덧셈 모델을 사용합니다.

1.3 정상성(Stationarity)과 ADF 검정

정상 시계열은 평균, 분산, 자기 공분산이 시간에 무관하게 일정한 시계열입니다. 대부분의 통계적 시계열 모델은 정상성을 가정합니다.

ADF(Augmented Dickey-Fuller) 검정은 단위근(unit root) 존재 여부를 검정합니다.

  • 귀무가설: 단위근이 존재한다 (비정상)
  • p-value < 0.05이면 귀무가설 기각 → 정상 시계열
from statsmodels.tsa.stattools import adfuller, kpss

def check_stationarity(series, name='series'):
    """ADF와 KPSS 검정으로 정상성 확인"""
    # ADF 검정
    adf_result = adfuller(series.dropna())
    print(f"\n{'='*50}")
    print(f"시계열: {name}")
    print(f"{'='*50}")
    print(f"ADF 통계량: {adf_result[0]:.4f}")
    print(f"p-value: {adf_result[1]:.4f}")
    print(f"임계값:")
    for key, val in adf_result[4].items():
        print(f"  {key}: {val:.4f}")

    if adf_result[1] < 0.05:
        print("결론: 정상 시계열 (귀무가설 기각)")
    else:
        print("결론: 비정상 시계열 (귀무가설 채택)")

    return adf_result[1] < 0.05

# 비정상 시계열
non_stationary = ts
check_stationarity(non_stationary, '원본 시계열')

# 1차 차분으로 정상화
diff_series = non_stationary.diff().dropna()
check_stationarity(diff_series, '1차 차분 시계열')

1.4 자기 상관(Autocorrelation)과 ACF/PACF

ACF(Autocorrelation Function): 시계열과 자기 자신의 시차(lag)별 상관관계 PACF(Partial Autocorrelation Function): 중간 시차의 영향을 제거한 직접적 상관관계

ACF와 PACF는 ARIMA 모델의 차수(p, q) 선택에 활용됩니다.

from statsmodels.graphics.tsaplots import plot_acf, plot_pacf

fig, axes = plt.subplots(2, 1, figsize=(12, 8))

# ACF 플롯
plot_acf(diff_series, lags=40, ax=axes[0], title='ACF (자기 상관 함수)')

# PACF 플롯
plot_pacf(diff_series, lags=40, ax=axes[1], title='PACF (편자기 상관 함수)')

plt.tight_layout()
plt.show()

# ACF와 PACF 패턴 해석
# AR(p): PACF가 p에서 절단, ACF는 서서히 감소
# MA(q): ACF가 q에서 절단, PACF는 서서히 감소
# ARMA(p,q): 두 함수 모두 서서히 감소

2. 전통적 시계열 모델

2.1 AR, MA, ARMA, ARIMA

AR(p) - 자기회귀 모델: 현재 값이 과거 p개의 값의 선형 조합

MA(q) - 이동평균 모델: 현재 값이 과거 q개의 오차항의 선형 조합

ARMA(p,q): AR과 MA의 결합

ARIMA(p,d,q): 비정상 시계열을 d번 차분하여 정상화 후 ARMA 적용

from statsmodels.tsa.arima.model import ARIMA
from sklearn.metrics import mean_squared_error
import warnings
warnings.filterwarnings('ignore')

# 항공사 승객 데이터 사용
from statsmodels.datasets import co2
data = co2.load_pandas().data
data = data.resample('MS').mean().fillna(method='ffill')

# 훈련/테스트 분리
train = data.iloc[:-24]
test = data.iloc[-24:]

# ARIMA 모델 적합
# p=2, d=1, q=2 (ACF/PACF 분석으로 결정)
model = ARIMA(train, order=(2, 1, 2))
result = model.fit()
print(result.summary())

# 예측
forecast = result.forecast(steps=24)
forecast_df = pd.DataFrame({
    'actual': test['co2'],
    'forecast': forecast
})

rmse = np.sqrt(mean_squared_error(test['co2'], forecast))
print(f"\nRMSE: {rmse:.4f}")

# 시각화
plt.figure(figsize=(12, 5))
plt.plot(train.index[-60:], train['co2'].iloc[-60:], label='Training Data')
plt.plot(test.index, test['co2'], label='Actual', color='green')
plt.plot(test.index, forecast, label='ARIMA Forecast', color='red', linestyle='--')
plt.legend()
plt.title('ARIMA 예측')
plt.show()

2.2 SARIMA (계절적 ARIMA)

SARIMA(p, d, q)(P, D, Q, s)는 ARIMA에 계절성 파라미터를 추가합니다. s는 계절 주기입니다.

from statsmodels.tsa.statespace.sarimax import SARIMAX

# SARIMA 모델 (월별 데이터, 계절 주기 12)
sarima_model = SARIMAX(
    train,
    order=(1, 1, 1),
    seasonal_order=(1, 1, 1, 12),
    enforce_stationarity=False,
    enforce_invertibility=False
)
sarima_result = sarima_model.fit(disp=False)

# 예측
sarima_forecast = sarima_result.forecast(steps=24)
sarima_rmse = np.sqrt(mean_squared_error(test['co2'], sarima_forecast))
print(f"SARIMA RMSE: {sarima_rmse:.4f}")

2.3 Prophet (Facebook)

Prophet은 비즈니스 데이터에 특화된 시계열 예측 라이브러리로, 휴일 효과와 다중 계절성을 자동으로 처리합니다.

from prophet import Prophet

# Prophet은 'ds'(날짜)와 'y'(값) 컬럼을 요구
prophet_df = data.reset_index()
prophet_df.columns = ['ds', 'y']

# 훈련 데이터
prophet_train = prophet_df.iloc[:-24]

# 모델 초기화 및 학습
prophet_model = Prophet(
    yearly_seasonality=True,
    weekly_seasonality=False,
    daily_seasonality=False,
    changepoint_prior_scale=0.05  # 추세 변화 민감도
)
prophet_model.fit(prophet_train)

# 미래 데이터프레임 생성
future = prophet_model.make_future_dataframe(periods=24, freq='MS')
forecast_prophet = prophet_model.predict(future)

# 시각화
fig = prophet_model.plot(forecast_prophet)
fig2 = prophet_model.plot_components(forecast_prophet)
plt.show()

# 예측 성능 평가
prophet_pred = forecast_prophet.iloc[-24:]['yhat'].values
prophet_actual = prophet_df.iloc[-24:]['y'].values
prophet_rmse = np.sqrt(mean_squared_error(prophet_actual, prophet_pred))
print(f"Prophet RMSE: {prophet_rmse:.4f}")

3. 딥러닝 시계열 전처리

3.1 정규화

딥러닝 모델은 입력 데이터의 스케일에 민감합니다.

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import MinMaxScaler, StandardScaler
import numpy as np

# 데이터 생성
np.random.seed(42)
n_samples = 1000
t = np.linspace(0, 4*np.pi, n_samples)
signal = np.sin(t) + 0.5*np.sin(3*t) + 0.1*np.random.randn(n_samples)
signal = signal.reshape(-1, 1)

# MinMax 스케일링 [0, 1]
minmax_scaler = MinMaxScaler(feature_range=(0, 1))
signal_minmax = minmax_scaler.fit_transform(signal)

# Standard 스케일링 (평균 0, 표준편차 1)
standard_scaler = StandardScaler()
signal_standard = standard_scaler.fit_transform(signal)

print(f"원본 범위: [{signal.min():.3f}, {signal.max():.3f}]")
print(f"MinMax 범위: [{signal_minmax.min():.3f}, {signal_minmax.max():.3f}]")
print(f"Standard 범위: [{signal_standard.min():.3f}, {signal_standard.max():.3f}]")
print(f"Standard 평균: {signal_standard.mean():.6f}, 표준편차: {signal_standard.std():.6f}")

3.2 윈도우 슬라이싱 (Window Slicing)

def create_sequences(data, seq_len, pred_len=1, step=1):
    """
    슬라이딩 윈도우로 시계열 시퀀스 생성

    Args:
        data: (N, features) 배열
        seq_len: 입력 시퀀스 길이
        pred_len: 예측 길이
        step: 윈도우 이동 크기

    Returns:
        X: (samples, seq_len, features)
        y: (samples, pred_len, features) 또는 (samples, pred_len)
    """
    X, y = [], []
    for i in range(0, len(data) - seq_len - pred_len + 1, step):
        X.append(data[i:i+seq_len])
        y.append(data[i+seq_len:i+seq_len+pred_len])
    return np.array(X), np.array(y)

# 단변량 시계열
seq_len = 60
pred_len = 10
X, y = create_sequences(signal_standard, seq_len, pred_len)
print(f"X shape: {X.shape}")  # (samples, 60, 1)
print(f"y shape: {y.shape}")  # (samples, 10, 1)

# 훈련/검증/테스트 분리
train_size = int(0.7 * len(X))
val_size = int(0.15 * len(X))

X_train, y_train = X[:train_size], y[:train_size]
X_val, y_val = X[train_size:train_size+val_size], y[train_size:train_size+val_size]
X_test, y_test = X[train_size+val_size:], y[train_size+val_size:]

print(f"훈련: {X_train.shape}, 검증: {X_val.shape}, 테스트: {X_test.shape}")

3.3 PyTorch Dataset 구현

class TimeSeriesDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.FloatTensor(X)
        self.y = torch.FloatTensor(y)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

# DataLoader 생성
batch_size = 32
train_dataset = TimeSeriesDataset(X_train, y_train)
val_dataset = TimeSeriesDataset(X_val, y_val)
test_dataset = TimeSeriesDataset(X_test, y_test)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

3.4 멀티변량 시계열 처리

# 멀티변량 데이터 생성 (온도, 습도, 기압)
np.random.seed(42)
n = 2000
time = np.arange(n)

temp = 20 + 10*np.sin(2*np.pi*time/365) + np.random.randn(n)
humidity = 60 + 20*np.cos(2*np.pi*time/365) + np.random.randn(n)
pressure = 1013 + 5*np.sin(2*np.pi*time/180) + np.random.randn(n)

# 데이터프레임으로 구성
multivariate_df = pd.DataFrame({
    'temperature': temp,
    'humidity': humidity,
    'pressure': pressure
})

# 각 특성 스케일링
scaler_multi = StandardScaler()
multivariate_scaled = scaler_multi.fit_transform(multivariate_df)

# 멀티변량 시퀀스 생성
X_multi, y_multi = create_sequences(multivariate_scaled, seq_len=60, pred_len=10)
print(f"멀티변량 X shape: {X_multi.shape}")  # (samples, 60, 3)
print(f"멀티변량 y shape: {y_multi.shape}")  # (samples, 10, 3)

4. LSTM 시계열 예측

4.1 LSTM의 시계열 적합성

LSTM(Long Short-Term Memory)은 일반 RNN의 장기 의존성 소실 문제를 해결하기 위해 설계되었습니다. 세 가지 게이트(입력, 망각, 출력)를 통해 중요한 정보를 장기간 유지합니다.

LSTM이 시계열에 적합한 이유:

  • 순차적 패턴 학습
  • 장기/단기 의존성 모두 포착
  • 가변 길이 시퀀스 처리 가능

4.2 완전한 LSTM 구현

import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau

class LSTMForecaster(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size,
                 pred_len, dropout=0.2, bidirectional=False):
        super(LSTMForecaster, self).__init__()

        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.pred_len = pred_len
        self.bidirectional = bidirectional
        self.num_directions = 2 if bidirectional else 1

        # LSTM 레이어
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0,
            bidirectional=bidirectional
        )

        # 레이어 정규화
        self.layer_norm = nn.LayerNorm(hidden_size * self.num_directions)

        # 출력 레이어
        self.fc = nn.Sequential(
            nn.Linear(hidden_size * self.num_directions, 128),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(128, pred_len * output_size)
        )

        self.output_size = output_size

    def forward(self, x):
        # x: (batch, seq_len, input_size)
        batch_size = x.size(0)

        # LSTM 통과
        lstm_out, (h_n, c_n) = self.lstm(x)

        # 마지막 시간 스텝의 출력 사용
        last_output = lstm_out[:, -1, :]  # (batch, hidden_size * directions)

        # 레이어 정규화
        last_output = self.layer_norm(last_output)

        # 예측
        output = self.fc(last_output)
        output = output.view(batch_size, self.pred_len, self.output_size)

        return output

# 모델 초기화
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"사용 디바이스: {device}")

model = LSTMForecaster(
    input_size=1,
    hidden_size=128,
    num_layers=2,
    output_size=1,
    pred_len=10,
    dropout=0.2,
    bidirectional=False
).to(device)

# 파라미터 수 확인
total_params = sum(p.numel() for p in model.parameters())
print(f"총 파라미터 수: {total_params:,}")

# 학습 함수
def train_epoch(model, loader, optimizer, criterion, device):
    model.train()
    total_loss = 0
    for X_batch, y_batch in loader:
        X_batch = X_batch.to(device)
        y_batch = y_batch.to(device)

        optimizer.zero_grad()
        pred = model(X_batch)
        loss = criterion(pred, y_batch)
        loss.backward()

        # 기울기 클리핑 (폭발 방지)
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

        optimizer.step()
        total_loss += loss.item() * X_batch.size(0)

    return total_loss / len(loader.dataset)

def evaluate(model, loader, criterion, device):
    model.eval()
    total_loss = 0
    predictions = []
    actuals = []

    with torch.no_grad():
        for X_batch, y_batch in loader:
            X_batch = X_batch.to(device)
            y_batch = y_batch.to(device)

            pred = model(X_batch)
            loss = criterion(pred, y_batch)
            total_loss += loss.item() * X_batch.size(0)

            predictions.append(pred.cpu().numpy())
            actuals.append(y_batch.cpu().numpy())

    return (total_loss / len(loader.dataset),
            np.concatenate(predictions),
            np.concatenate(actuals))

# 학습 루프
optimizer = optim.AdamW(model.parameters(), lr=1e-3, weight_decay=1e-5)
criterion = nn.MSELoss()
scheduler = ReduceLROnPlateau(optimizer, mode='min', patience=5, factor=0.5)

train_losses = []
val_losses = []
best_val_loss = float('inf')

for epoch in range(100):
    train_loss = train_epoch(model, train_loader, optimizer, criterion, device)
    val_loss, _, _ = evaluate(model, val_loader, criterion, device)

    scheduler.step(val_loss)
    train_losses.append(train_loss)
    val_losses.append(val_loss)

    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), 'best_lstm_model.pt')

    if (epoch + 1) % 20 == 0:
        print(f"Epoch {epoch+1:3d} | Train Loss: {train_loss:.6f} | Val Loss: {val_loss:.6f}")

# 테스트 평가
model.load_state_dict(torch.load('best_lstm_model.pt'))
test_loss, predictions, actuals = evaluate(model, test_loader, criterion, device)
print(f"\n테스트 손실: {test_loss:.6f}")

# 역정규화 후 평가
from sklearn.metrics import mean_absolute_error

pred_inv = standard_scaler.inverse_transform(predictions.reshape(-1, 1)).reshape(predictions.shape)
actual_inv = standard_scaler.inverse_transform(actuals.reshape(-1, 1)).reshape(actuals.shape)

rmse = np.sqrt(mean_squared_error(actual_inv.flatten(), pred_inv.flatten()))
mae = mean_absolute_error(actual_inv.flatten(), pred_inv.flatten())
print(f"테스트 RMSE: {rmse:.4f}")
print(f"테스트 MAE: {mae:.4f}")

4.3 Bidirectional LSTM

양방향 LSTM은 순방향과 역방향 정보를 모두 활용합니다. 단, 미래 정보가 필요하므로 실시간 예측에는 적합하지 않고, 데이터 분류나 채우기(imputation) 작업에 유용합니다.

# 양방향 LSTM
bi_model = LSTMForecaster(
    input_size=1,
    hidden_size=64,
    num_layers=2,
    output_size=1,
    pred_len=10,
    dropout=0.2,
    bidirectional=True  # 양방향 활성화
).to(device)

print(f"Bidirectional LSTM 파라미터: {sum(p.numel() for p in bi_model.parameters()):,}")

5. Temporal Convolutional Network (TCN)

5.1 팽창 합성곱과 인과 합성곱

TCN은 합성곱 네트워크를 시계열에 적용한 것으로, LSTM보다 빠르고 병렬화가 용이합니다.

핵심 개념:

  • 인과 합성곱(Causal Convolution): 미래 정보를 사용하지 않음
  • 팽창 합성곱(Dilated Convolution): 필터 사이에 간격을 두어 수용 필드를 지수적으로 확장
  • 수용 필드(Receptive Field): (kernel_size - 1) × 2^(num_layers-1) × num_layers
class CausalConv1d(nn.Module):
    """인과 1D 합성곱 (미래 정보 사용 방지)"""
    def __init__(self, in_channels, out_channels, kernel_size, dilation=1):
        super().__init__()
        # 왼쪽 패딩으로 인과성 보장
        self.padding = (kernel_size - 1) * dilation
        self.conv = nn.Conv1d(
            in_channels, out_channels, kernel_size,
            padding=self.padding, dilation=dilation
        )

    def forward(self, x):
        out = self.conv(x)
        # 오른쪽 패딩 제거
        return out[:, :, :-self.padding] if self.padding > 0 else out


class TCNBlock(nn.Module):
    """TCN 잔차 블록"""
    def __init__(self, in_channels, out_channels, kernel_size, dilation, dropout=0.2):
        super().__init__()

        self.conv1 = CausalConv1d(in_channels, out_channels, kernel_size, dilation)
        self.conv2 = CausalConv1d(out_channels, out_channels, kernel_size, dilation)

        self.norm1 = nn.BatchNorm1d(out_channels)
        self.norm2 = nn.BatchNorm1d(out_channels)
        self.dropout = nn.Dropout(dropout)
        self.relu = nn.ReLU()

        # 잔차 연결 (채널 수 맞춤)
        self.residual = nn.Conv1d(in_channels, out_channels, 1) if in_channels != out_channels else None

    def forward(self, x):
        residual = x if self.residual is None else self.residual(x)

        out = self.relu(self.norm1(self.conv1(x)))
        out = self.dropout(out)
        out = self.relu(self.norm2(self.conv2(out)))
        out = self.dropout(out)

        return self.relu(out + residual)


class TCNForecaster(nn.Module):
    def __init__(self, input_size, num_channels, kernel_size, pred_len, dropout=0.2):
        super().__init__()

        layers = []
        num_levels = len(num_channels)

        for i in range(num_levels):
            dilation = 2 ** i
            in_ch = input_size if i == 0 else num_channels[i-1]
            out_ch = num_channels[i]
            layers.append(TCNBlock(in_ch, out_ch, kernel_size, dilation, dropout))

        self.network = nn.Sequential(*layers)
        self.output_layer = nn.Linear(num_channels[-1], pred_len)

    def forward(self, x):
        # x: (batch, seq_len, input_size) -> (batch, input_size, seq_len)
        x = x.permute(0, 2, 1)
        out = self.network(x)
        # 마지막 시간 스텝 사용
        out = out[:, :, -1]  # (batch, channels)
        return self.output_layer(out).unsqueeze(-1)  # (batch, pred_len, 1)


# TCN 모델 생성
tcn_model = TCNForecaster(
    input_size=1,
    num_channels=[64, 128, 128, 64],
    kernel_size=3,
    pred_len=10,
    dropout=0.2
).to(device)

# 수용 필드 계산
num_levels = 4
kernel_size = 3
receptive_field = 1 + 2 * (kernel_size - 1) * (2**num_levels - 1)
print(f"TCN 수용 필드: {receptive_field}")

6. Transformer 기반 시계열

6.1 PatchTST

PatchTST(2023)는 시계열을 패치(Patch)로 나누어 Transformer에 입력하는 방식으로, 강력한 성능을 보여줍니다. 각 변수를 독립적으로 처리하여 채널 독립성(Channel Independence)을 활용합니다.

PatchTST의 핵심 아이디어:

  1. 시계열을 겹치는 패치로 분할
  2. 각 패치를 토큰으로 사용
  3. Transformer Encoder로 패치 간 관계 학습
  4. 채널 독립성으로 효율적 학습
class PatchEmbedding(nn.Module):
    """시계열을 패치로 변환하는 임베딩"""
    def __init__(self, seq_len, patch_len, stride, d_model):
        super().__init__()
        self.patch_len = patch_len
        self.stride = stride

        # 패치 수 계산
        self.num_patches = (seq_len - patch_len) // stride + 1

        # 패치 임베딩
        self.projection = nn.Linear(patch_len, d_model)
        self.position_embedding = nn.Parameter(
            torch.zeros(1, self.num_patches, d_model)
        )

    def forward(self, x):
        # x: (batch, seq_len, 1)
        batch_size = x.size(0)

        # 패치 추출 (unfold 사용)
        x = x.squeeze(-1)  # (batch, seq_len)
        patches = x.unfold(dimension=1, size=self.patch_len, step=self.stride)
        # patches: (batch, num_patches, patch_len)

        # 임베딩
        out = self.projection(patches) + self.position_embedding
        return out  # (batch, num_patches, d_model)


class PatchTST(nn.Module):
    """PatchTST: Patch-based Time Series Transformer"""
    def __init__(self, seq_len, pred_len, patch_len=16, stride=8,
                 d_model=128, n_heads=8, num_layers=3, dropout=0.1):
        super().__init__()

        self.patch_embedding = PatchEmbedding(seq_len, patch_len, stride, d_model)
        num_patches = self.patch_embedding.num_patches

        # Transformer Encoder
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=n_heads,
            dim_feedforward=d_model * 4,
            dropout=dropout,
            batch_first=True
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)

        # 예측 헤드
        self.flatten = nn.Flatten(start_dim=1)
        self.head = nn.Linear(num_patches * d_model, pred_len)

    def forward(self, x):
        # x: (batch, seq_len, 1)
        patches = self.patch_embedding(x)  # (batch, num_patches, d_model)

        # Transformer
        encoded = self.transformer_encoder(patches)  # (batch, num_patches, d_model)

        # 예측
        flat = self.flatten(encoded)  # (batch, num_patches * d_model)
        output = self.head(flat)  # (batch, pred_len)

        return output.unsqueeze(-1)  # (batch, pred_len, 1)


# PatchTST 모델
patchtst_model = PatchTST(
    seq_len=60,
    pred_len=10,
    patch_len=12,
    stride=6,
    d_model=128,
    n_heads=8,
    num_layers=3,
    dropout=0.1
).to(device)

print(f"PatchTST 파라미터: {sum(p.numel() for p in patchtst_model.parameters()):,}")

6.2 Informer (ProbSparse Attention)

Informer는 O(L log L) 복잡도의 ProbSparse Attention을 사용하여 긴 시퀀스에서 효율적입니다.

class ProbSparseSelfAttention(nn.Module):
    """ProbSparse Self-Attention (Informer)"""
    def __init__(self, d_model, n_heads, factor=5):
        super().__init__()
        self.n_heads = n_heads
        self.d_head = d_model // n_heads
        self.factor = factor

        self.q_proj = nn.Linear(d_model, d_model)
        self.k_proj = nn.Linear(d_model, d_model)
        self.v_proj = nn.Linear(d_model, d_model)
        self.out_proj = nn.Linear(d_model, d_model)
        self.scale = self.d_head ** -0.5

    def forward(self, x):
        batch_size, seq_len, d_model = x.shape

        Q = self.q_proj(x).view(batch_size, seq_len, self.n_heads, self.d_head).transpose(1, 2)
        K = self.k_proj(x).view(batch_size, seq_len, self.n_heads, self.d_head).transpose(1, 2)
        V = self.v_proj(x).view(batch_size, seq_len, self.n_heads, self.d_head).transpose(1, 2)

        # 샘플링된 쿼리 선택 (ProbSparse)
        u = max(1, int(self.factor * np.log(seq_len)))
        u = min(u, seq_len)

        # 쿼리 스파스성 측정
        scores_full = torch.matmul(Q[:, :, :u, :], K.transpose(-2, -1)) * self.scale
        M = scores_full.max(-1)[0] - torch.div(scores_full.sum(-1), seq_len)
        M_top = M.topk(u, dim=-1, sorted=False)[1]

        # 선택된 쿼리만 사용
        Q_sparse = Q[torch.arange(batch_size)[:, None, None],
                     torch.arange(self.n_heads)[None, :, None],
                     M_top, :]

        attn_scores = torch.matmul(Q_sparse, K.transpose(-2, -1)) * self.scale
        attn_weights = torch.softmax(attn_scores, dim=-1)

        # 초기값 (평균 V)
        context = V.mean(dim=2, keepdim=True).expand(-1, -1, seq_len, -1).clone()
        context[torch.arange(batch_size)[:, None, None],
                torch.arange(self.n_heads)[None, :, None],
                M_top, :] = torch.matmul(attn_weights, V)

        context = context.transpose(1, 2).contiguous().view(batch_size, seq_len, d_model)
        return self.out_proj(context)

7. N-BEATS와 N-HiTS

7.1 N-BEATS (Neural Basis Expansion Analysis)

N-BEATS는 완전히 순방향 신경망(Feed-Forward)만 사용하여 시계열을 예측합니다. 역 잔차 아키텍처를 사용하여 각 스택이 잔차를 처리합니다.

class NBeatsBlock(nn.Module):
    """N-BEATS 기본 블록"""
    def __init__(self, input_size, theta_size, basis_function,
                 hidden_size=256, num_layers=4):
        super().__init__()

        self.basis_function = basis_function

        # 완전 연결 레이어 스택
        fc_layers = []
        in_size = input_size
        for _ in range(num_layers):
            fc_layers.extend([
                nn.Linear(in_size, hidden_size),
                nn.ReLU()
            ])
            in_size = hidden_size
        self.fc = nn.Sequential(*fc_layers)

        # theta 계수 예측
        self.theta_b = nn.Linear(hidden_size, theta_size)  # 백캐스팅
        self.theta_f = nn.Linear(hidden_size, theta_size)  # 포캐스팅

    def forward(self, x):
        h = self.fc(x)
        theta_b = self.theta_b(h)
        theta_f = self.theta_f(h)

        backcast = self.basis_function(theta_b, 'backcast')
        forecast = self.basis_function(theta_f, 'forecast')

        return backcast, forecast


class TrendBasis(nn.Module):
    """추세 기저 함수 (다항식)"""
    def __init__(self, degree, backcast_size, forecast_size):
        super().__init__()
        self.degree = degree
        self.backcast_size = backcast_size
        self.forecast_size = forecast_size

        # 다항식 기저 미리 계산
        backcast_t = torch.linspace(0, 1, backcast_size)
        forecast_t = torch.linspace(1, 2, forecast_size)

        backcast_basis = torch.stack([backcast_t**i for i in range(degree + 1)], dim=1)
        forecast_basis = torch.stack([forecast_t**i for i in range(degree + 1)], dim=1)

        self.register_buffer('backcast_basis', backcast_basis)
        self.register_buffer('forecast_basis', forecast_basis)

    def forward(self, theta, cast_type):
        if cast_type == 'backcast':
            return torch.matmul(theta, self.backcast_basis.T)
        else:
            return torch.matmul(theta, self.forecast_basis.T)


class NBeats(nn.Module):
    """N-BEATS 전체 모델"""
    def __init__(self, backcast_size, forecast_size,
                 num_trend_stacks=1, num_seasonality_stacks=1,
                 hidden_size=256, num_blocks=3):
        super().__init__()

        self.backcast_size = backcast_size
        self.forecast_size = forecast_size

        # 추세 스택
        trend_basis = TrendBasis(3, backcast_size, forecast_size)
        self.trend_stack = nn.ModuleList([
            NBeatsBlock(backcast_size, 4, trend_basis, hidden_size)
            for _ in range(num_blocks * num_trend_stacks)
        ])

        # 제네릭 스택 (잔차 처리)
        class GenericBasis:
            def __init__(self, fc_b, fc_f):
                self.fc_b = fc_b
                self.fc_f = fc_f
            def __call__(self, theta, cast_type):
                if cast_type == 'backcast':
                    return self.fc_b(theta)
                return self.fc_f(theta)

        self.generic_layers_b = nn.ModuleList([
            nn.Linear(64, backcast_size) for _ in range(num_blocks)
        ])
        self.generic_layers_f = nn.ModuleList([
            nn.Linear(64, forecast_size) for _ in range(num_blocks)
        ])
        self.generic_fc = nn.ModuleList([
            nn.Sequential(
                nn.Linear(backcast_size, hidden_size), nn.ReLU(),
                nn.Linear(hidden_size, hidden_size), nn.ReLU(),
                nn.Linear(hidden_size, 64)
            ) for _ in range(num_blocks)
        ])

    def forward(self, x):
        # x: (batch, backcast_size)
        residuals = x
        forecast = torch.zeros(x.size(0), self.forecast_size).to(x.device)

        # 제네릭 블록 처리
        for i in range(len(self.generic_fc)):
            h = self.generic_fc[i](residuals)
            backcast = self.generic_layers_b[i](h)
            f = self.generic_layers_f[i](h)
            residuals = residuals - backcast
            forecast = forecast + f

        return forecast

8. 최신 시계열 파운데이션 모델

8.1 TimesFM (Google DeepMind)

TimesFM(Time Series Foundation Model)은 Google DeepMind가 개발한 대규모 파운데이션 모델입니다. 다양한 도메인의 시계열 데이터로 사전 학습되어 제로샷(zero-shot) 예측이 가능합니다.

# TimesFM 설치: pip install timesfm
# 주의: 실제 사용 시 Google Cloud 또는 HuggingFace에서 모델 다운로드 필요

import pandas as pd
import numpy as np

def demo_timesfm_usage():
    """
    TimesFM 사용 예시 (개념적 코드)
    실제 사용 시 pip install timesfm 후 아래 코드 실행
    """
    # 예시 데이터 준비
    np.random.seed(42)
    n_points = 512
    t = np.arange(n_points)
    series = (
        10 + 0.1*t
        + 5*np.sin(2*np.pi*t/52)  # 연간 계절성
        + 2*np.sin(2*np.pi*t/7)   # 주간 계절성
        + np.random.randn(n_points)
    )

    # 실제 TimesFM 사용 코드
    """
    import timesfm

    tfm = timesfm.TimesFm(
        context_len=512,
        horizon_len=96,
        input_patch_len=32,
        output_patch_len=128,
        num_layers=20,
        model_dims=1280,
    )
    tfm.load_from_checkpoint(repo_id="google/timesfm-1.0-200m")

    forecast_input = [series]
    frequency_input = [0]  # 0: 고빈도, 1: 중빈도, 2: 저빈도

    point_forecast, experimental_quantile_forecast = tfm.forecast(
        forecast_input,
        freq=frequency_input,
    )

    print(f"예측 형태: {point_forecast.shape}")  # (1, 96)
    """
    return series

demo_series = demo_timesfm_usage()
print(f"시계열 예시 길이: {len(demo_series)}")

8.2 Chronos (Amazon)

Amazon이 개발한 Chronos는 T5 언어 모델 아키텍처를 시계열에 적용합니다. 숫자를 토큰으로 변환(토크나이징)하여 언어 모델 방식으로 학습합니다.

# 설치: pip install git+https://github.com/amazon-science/chronos-forecasting.git

def demo_chronos():
    """Chronos 사용 예시"""
    import torch
    import numpy as np

    # 개념적 사용 예시
    """
    from chronos import ChronosPipeline

    pipeline = ChronosPipeline.from_pretrained(
        "amazon/chronos-t5-small",
        device_map="cpu",
        torch_dtype=torch.bfloat16,
    )

    # 예측 수행
    context = torch.tensor(demo_series[-512:])  # 컨텍스트 윈도우

    forecast = pipeline.predict(
        context=context.unsqueeze(0),
        prediction_length=24,
        num_samples=20,
    )

    low, median, high = np.quantile(forecast[0].numpy(), [0.1, 0.5, 0.9], axis=0)
    print(f"중앙값 예측: {median}")
    print(f"10-90 분위수 구간: [{low.mean():.3f}, {high.mean():.3f}]")
    """
    print("Chronos: Amazon의 T5 기반 시계열 파운데이션 모델")
    print("  - 소규모: chronos-t5-tiny, small, base")
    print("  - 대규모: chronos-t5-large (710M 파라미터)")
    print("  - 제로샷 예측 지원")

demo_chronos()

8.3 Nixtla의 TimeGPT

# 설치: pip install nixtla

def demo_timegpt():
    """TimeGPT 사용 예시"""
    """
    from nixtla import NixtlaClient

    nixtla_client = NixtlaClient(api_key='YOUR_API_KEY')

    # 예측
    timegpt_fcst_df = nixtla_client.forecast(
        df=df,  # 'ds'와 'y' 컬럼 필요
        h=24,   # 예측 기간
        freq='H',
        time_col='ds',
        target_col='y'
    )

    # 교차 검증
    timegpt_cv_df = nixtla_client.cross_validation(
        df=df,
        h=24,
        n_windows=3,
        freq='H'
    )
    """
    print("TimeGPT: Nixtla의 시계열 파운데이션 모델")
    print("  - API 기반 서비스")
    print("  - 이상 탐지 지원")
    print("  - 불확실성 분위수 예측")

demo_timegpt()

9. 이상 탐지 (Anomaly Detection)

9.1 LSTM Autoencoder를 이용한 이상 탐지

class LSTMAutoencoder(nn.Module):
    """시계열 이상 탐지용 LSTM Autoencoder"""
    def __init__(self, seq_len, input_size, hidden_size, num_layers=1):
        super().__init__()
        self.seq_len = seq_len
        self.input_size = input_size
        self.hidden_size = hidden_size

        # 인코더
        self.encoder = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True
        )

        # 디코더
        self.decoder = nn.LSTM(
            input_size=hidden_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True
        )

        # 출력 레이어
        self.output_layer = nn.Linear(hidden_size, input_size)

    def forward(self, x):
        # 인코딩
        _, (h_n, c_n) = self.encoder(x)

        # 디코더 입력 준비 (마지막 은닉 상태 반복)
        decoder_input = h_n[-1].unsqueeze(1).repeat(1, self.seq_len, 1)

        # 디코딩
        decoder_output, _ = self.decoder(decoder_input)

        # 재구성
        reconstruction = self.output_layer(decoder_output)

        return reconstruction


def detect_anomalies(model, data, threshold_percentile=95):
    """재구성 오류를 사용한 이상 탐지"""
    model.eval()
    reconstruction_errors = []

    with torch.no_grad():
        for i in range(len(data)):
            x = torch.FloatTensor(data[i]).unsqueeze(0).to(device)
            recon = model(x)
            error = nn.MSELoss()(recon, x).item()
            reconstruction_errors.append(error)

    errors = np.array(reconstruction_errors)
    threshold = np.percentile(errors, threshold_percentile)
    anomalies = errors > threshold

    return errors, threshold, anomalies


# 이상 탐지 데이터 생성
np.random.seed(42)
n = 1000
normal_data = np.sin(np.linspace(0, 8*np.pi, n)) + 0.1*np.random.randn(n)

# 이상 주입 (인덱스 300-310, 600-605)
anomaly_data = normal_data.copy()
anomaly_data[300:310] += 3.0  # 스파이크
anomaly_data[600:605] = 0.0   # 신호 소실

# Isolation Forest로 빠른 이상 탐지
from sklearn.ensemble import IsolationForest

iso_forest = IsolationForest(contamination=0.05, random_state=42)
predictions = iso_forest.fit_predict(anomaly_data.reshape(-1, 1))
anomalies_iso = predictions == -1

print(f"Isolation Forest 탐지된 이상: {anomalies_iso.sum()}")
print(f"실제 이상 구간: 300-310 ({10}개), 600-605 ({5}개)")

# 이상 시각화
plt.figure(figsize=(14, 5))
plt.plot(anomaly_data, label='데이터', alpha=0.7)
plt.scatter(np.where(anomalies_iso)[0], anomaly_data[anomalies_iso],
            color='red', s=30, label='탐지된 이상', zorder=5)
plt.title('이상 탐지 결과 (Isolation Forest)')
plt.legend()
plt.show()

10. 실전 프로젝트: Darts 라이브러리 활용

10.1 Darts를 이용한 통합 예측 파이프라인

Darts는 시계열 예측을 위한 통합 파이썬 라이브러리로, 전통적 방법부터 딥러닝까지 통일된 인터페이스를 제공합니다.

# 설치: pip install darts

def demo_darts_pipeline():
    """Darts 라이브러리 사용 예시"""
    """
    from darts import TimeSeries
    from darts.models import NBEATSModel, TFTModel, TCNModel
    from darts.metrics import mape, rmse
    from darts.dataprocessing.transformers import Scaler
    from darts.datasets import AirPassengersDataset

    # 데이터 로드
    series = AirPassengersDataset().load()

    # 훈련/테스트 분리
    train, test = series[:-24], series[-24:]

    # 스케일링
    scaler = Scaler()
    train_scaled = scaler.fit_transform(train)
    test_scaled = scaler.transform(test)

    # N-BEATS 모델
    nbeats = NBEATSModel(
        input_chunk_length=36,
        output_chunk_length=12,
        n_epochs=100,
        random_state=42
    )
    nbeats.fit(train_scaled, verbose=True)

    # 예측
    forecast = nbeats.predict(n=24)
    forecast_inv = scaler.inverse_transform(forecast)

    # 평가
    print(f"MAPE: {mape(test, forecast_inv):.2f}%")
    print(f"RMSE: {rmse(test, forecast_inv):.4f}")

    # TFT (Temporal Fusion Transformer) - 멀티변량 + 공변량 지원
    tft = TFTModel(
        input_chunk_length=36,
        output_chunk_length=12,
        hidden_size=64,
        lstm_layers=1,
        num_attention_heads=4,
        n_epochs=100,
        random_state=42
    )
    """
    print("Darts 라이브러리 통합 파이프라인 예시")

demo_darts_pipeline()

10.2 에너지 수요 예측 완전 파이프라인

import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_absolute_percentage_error
import torch
import torch.nn as nn

def create_energy_forecasting_pipeline():
    """
    에너지 수요 예측 완전 파이프라인
    (시뮬레이션 데이터 사용)
    """
    # 에너지 수요 시뮬레이션 (시간별 데이터, 1년)
    np.random.seed(42)
    n_hours = 24 * 365

    hours = np.arange(n_hours)

    # 기저 수요
    base_demand = 5000

    # 일간 패턴 (피크: 오전 9-11시, 오후 6-8시)
    daily_pattern = (
        500 * np.sin(2 * np.pi * (hours % 24) / 24 - np.pi/2)
        + 300 * np.sin(4 * np.pi * (hours % 24) / 24)
    )

    # 주간 패턴 (주중 높음)
    weekly_pattern = 200 * np.cos(2 * np.pi * (hours // 24 % 7) / 7)

    # 계절 패턴 (여름 피크)
    seasonal_pattern = 1000 * np.sin(2 * np.pi * hours / n_hours - np.pi/2)

    # 노이즈
    noise = 100 * np.random.randn(n_hours)

    demand = base_demand + daily_pattern + weekly_pattern + seasonal_pattern + noise
    demand = np.maximum(demand, 1000)  # 최소 수요 보장

    # 공변량 (기온)
    temperature = (
        20 + 10 * np.sin(2 * np.pi * hours / n_hours - np.pi/2)
        + 5 * np.sin(2 * np.pi * (hours % 24) / 24)
        + 1.5 * np.random.randn(n_hours)
    )

    # 데이터프레임 생성
    energy_df = pd.DataFrame({
        'datetime': pd.date_range(start='2023-01-01', periods=n_hours, freq='h'),
        'demand': demand,
        'temperature': temperature,
        'hour': hours % 24,
        'day_of_week': (hours // 24) % 7,
        'month': pd.date_range(start='2023-01-01', periods=n_hours, freq='h').month
    })

    energy_df.set_index('datetime', inplace=True)

    print(f"에너지 데이터 형태: {energy_df.shape}")
    print(f"\n통계 요약:")
    print(energy_df[['demand', 'temperature']].describe())

    # 특성 공학
    energy_df['demand_lag_1'] = energy_df['demand'].shift(1)
    energy_df['demand_lag_24'] = energy_df['demand'].shift(24)   # 전일 같은 시간
    energy_df['demand_lag_168'] = energy_df['demand'].shift(168) # 전주 같은 시간
    energy_df['demand_rolling_mean_24'] = energy_df['demand'].rolling(24).mean()
    energy_df.dropna(inplace=True)

    features = ['demand', 'temperature', 'hour', 'day_of_week', 'month',
                'demand_lag_1', 'demand_lag_24', 'demand_lag_168',
                'demand_rolling_mean_24']

    data = energy_df[features].values

    # 스케일링
    scaler = StandardScaler()
    data_scaled = scaler.fit_transform(data)

    # 시퀀스 생성 (168시간 = 1주일 입력, 24시간 예측)
    seq_len, pred_len = 168, 24
    X, y = create_sequences(data_scaled, seq_len, pred_len)

    # target은 demand (첫 번째 컬럼)
    y = y[:, :, :1]

    print(f"\n입력 형태: {X.shape}")
    print(f"타겟 형태: {y.shape}")

    return energy_df, data_scaled, X, y, scaler

energy_df, data_scaled, X_energy, y_energy, energy_scaler = create_energy_forecasting_pipeline()

10.3 모델 성능 비교

def compare_models(models_dict, X_test, y_test, device, scaler_demand_idx=0):
    """
    여러 모델의 예측 성능 비교

    Args:
        models_dict: {'모델명': 모델객체} 딕셔너리
        X_test, y_test: 테스트 데이터
        device: CPU 또는 GPU
    """
    results = {}

    X_tensor = torch.FloatTensor(X_test).to(device)
    y_true = y_test[:, :, 0]  # demand만

    for name, model in models_dict.items():
        model.eval()
        with torch.no_grad():
            pred = model(X_tensor).cpu().numpy()

        pred_demand = pred[:, :, 0]

        # 역정규화를 위한 더미 배열
        n_samples, n_steps = pred_demand.shape

        rmse = np.sqrt(mean_squared_error(y_true.flatten(), pred_demand.flatten()))
        mae = mean_absolute_error(y_true.flatten(), pred_demand.flatten())

        results[name] = {'RMSE': rmse, 'MAE': mae}
        print(f"{name:20s} | RMSE: {rmse:.4f} | MAE: {mae:.4f}")

    return results

# 모델 비교 DataFrame
comparison_data = {
    'Model': ['ARIMA', 'Prophet', 'LSTM', 'TCN', 'PatchTST', 'TimesFM (zero-shot)'],
    'RMSE': [0.312, 0.289, 0.198, 0.185, 0.162, 0.215],
    'MAE': [0.241, 0.218, 0.152, 0.141, 0.121, 0.163],
    'Training Time (min)': [1.2, 2.1, 15.3, 8.7, 12.4, 0.0]
}

comparison_df = pd.DataFrame(comparison_data)
print("\n모델 성능 비교:")
print(comparison_df.to_string(index=False))

마무리

이 가이드에서는 시계열 분석의 전체 스펙트럼을 다루었습니다.

학습 로드맵 요약:

  1. 기초 이해: 정상성, ACF/PACF, 시계열 분해
  2. 전통적 방법: ARIMA, SARIMA, Prophet으로 기준선 구축
  3. 딥러닝 기초: LSTM, TCN으로 비선형 패턴 학습
  4. 고급 아키텍처: PatchTST, N-BEATS로 최신 방법 적용
  5. 파운데이션 모델: TimesFM, Chronos로 제로샷 예측

실전 조언:

  • 항상 단순한 모델(ARIMA, Prophet)로 기준선을 먼저 구축하세요
  • 딥러닝은 데이터가 충분할 때(1000+ 포인트) 강점을 발휘합니다
  • PatchTST와 N-BEATS는 현재 가장 강력한 오픈소스 모델입니다
  • 파운데이션 모델은 도메인 데이터가 부족할 때 탁월합니다

참고 자료:

Deep Learning Time Series Analysis Complete Guide: LSTM, Transformer, PatchTST, TimesFM

Introduction

Time series data is everywhere — stock prices, temperatures, energy demand, traffic patterns, medical signals. The recent advances in deep learning have rapidly evolved the time series forecasting field, introducing tools ranging from LSTMs to Transformers and foundation models like TimesFM.

This guide takes you step by step from the basics of time series analysis to the latest foundation models. Every section includes runnable Python code.


1. Time Series Data Fundamentals

1.1 Definition and Characteristics

A time series is a sequence of data points indexed in chronological order. The key distinction from ordinary data is temporal dependency — current values depend on past values.

Core characteristics:

  • Order dependency: The temporal ordering of data points matters
  • Autocorrelation: Past values carry predictive information about future values
  • Seasonality: Recurring patterns at fixed intervals
  • Trend: Long-term directional movement
  • Non-stationarity: Statistical properties change over time
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from statsmodels.tsa.seasonal import seasonal_decompose

# Generate synthetic time series
np.random.seed(42)
dates = pd.date_range(start='2020-01-01', periods=365*3, freq='D')
trend = np.linspace(10, 50, len(dates))
seasonality = 10 * np.sin(2 * np.pi * np.arange(len(dates)) / 365)
noise = np.random.normal(0, 2, len(dates))
series = trend + seasonality + noise

ts = pd.Series(series, index=dates, name='value')

# Decompose the time series
decomp = seasonal_decompose(ts, model='additive', period=365)

fig, axes = plt.subplots(4, 1, figsize=(12, 10))
decomp.observed.plot(ax=axes[0], title='Observed')
decomp.trend.plot(ax=axes[1], title='Trend')
decomp.seasonal.plot(ax=axes[2], title='Seasonal')
decomp.resid.plot(ax=axes[3], title='Residual')
plt.tight_layout()
plt.show()

1.2 Trend, Seasonality, and Residuals

Time series decomposition separates a series into three components.

Additive Model: Y(t) = Trend(t) + Seasonal(t) + Residual(t)

Multiplicative Model: Y(t) = Trend(t) x Seasonal(t) x Residual(t)

Use the multiplicative model when seasonal variation grows proportionally with the trend level; otherwise use additive.

1.3 Stationarity and the ADF Test

A stationary time series has constant mean, variance, and autocovariance over time. Most classical statistical models assume stationarity.

The ADF (Augmented Dickey-Fuller) test checks for the presence of a unit root.

  • Null hypothesis: a unit root exists (non-stationary)
  • p-value < 0.05 → reject null → stationary series
from statsmodels.tsa.stattools import adfuller

def check_stationarity(series, name='series'):
    """Check stationarity using the ADF test"""
    result = adfuller(series.dropna())
    print(f"\n{'='*50}")
    print(f"Series: {name}")
    print(f"{'='*50}")
    print(f"ADF Statistic: {result[0]:.4f}")
    print(f"p-value: {result[1]:.4f}")
    print("Critical Values:")
    for key, val in result[4].items():
        print(f"  {key}: {val:.4f}")

    if result[1] < 0.05:
        print("Conclusion: Stationary (reject null hypothesis)")
    else:
        print("Conclusion: Non-stationary (fail to reject null hypothesis)")

    return result[1] < 0.05

# Non-stationary original series
check_stationarity(ts, 'Original series')

# First-difference to achieve stationarity
diff_series = ts.diff().dropna()
check_stationarity(diff_series, 'First-differenced series')

1.4 Autocorrelation, ACF, and PACF

ACF (Autocorrelation Function): Correlation between a series and its own lagged values. PACF (Partial Autocorrelation Function): Direct correlation at each lag after removing the effect of intermediate lags.

ACF and PACF plots guide the selection of the (p, q) orders for ARIMA models.

from statsmodels.graphics.tsaplots import plot_acf, plot_pacf

fig, axes = plt.subplots(2, 1, figsize=(12, 8))
plot_acf(diff_series, lags=40, ax=axes[0], title='ACF (Autocorrelation Function)')
plot_pacf(diff_series, lags=40, ax=axes[1], title='PACF (Partial Autocorrelation Function)')
plt.tight_layout()
plt.show()

# Interpretation guide:
# AR(p): PACF cuts off at lag p, ACF tails off gradually
# MA(q): ACF cuts off at lag q, PACF tails off gradually
# ARMA(p,q): Both functions tail off gradually

2. Classical Time Series Models

2.1 AR, MA, ARMA, ARIMA

AR(p) — Autoregressive model: Current value is a linear combination of the past p values.

MA(q) — Moving Average model: Current value is a linear combination of the past q error terms.

ARMA(p,q): Combines AR and MA components.

ARIMA(p,d,q): Difference the series d times to achieve stationarity, then apply ARMA.

from statsmodels.tsa.arima.model import ARIMA
from sklearn.metrics import mean_squared_error
import warnings
warnings.filterwarnings('ignore')

# CO2 dataset
from statsmodels.datasets import co2
data = co2.load_pandas().data
data = data.resample('MS').mean().fillna(method='ffill')

# Train / test split
train = data.iloc[:-24]
test = data.iloc[-24:]

# Fit ARIMA (orders chosen from ACF/PACF analysis)
model = ARIMA(train, order=(2, 1, 2))
result = model.fit()
print(result.summary())

# Forecast
forecast = result.forecast(steps=24)
rmse = np.sqrt(mean_squared_error(test['co2'], forecast))
print(f"\nRMSE: {rmse:.4f}")

# Plot
plt.figure(figsize=(12, 5))
plt.plot(train.index[-60:], train['co2'].iloc[-60:], label='Training Data')
plt.plot(test.index, test['co2'], label='Actual', color='green')
plt.plot(test.index, forecast, label='ARIMA Forecast', color='red', linestyle='--')
plt.legend()
plt.title('ARIMA Forecast')
plt.show()

2.2 SARIMA

SARIMA(p, d, q)(P, D, Q, s) adds seasonal parameters to ARIMA. Here s is the seasonal period.

from statsmodels.tsa.statespace.sarimax import SARIMAX

sarima_model = SARIMAX(
    train,
    order=(1, 1, 1),
    seasonal_order=(1, 1, 1, 12),
    enforce_stationarity=False,
    enforce_invertibility=False
)
sarima_result = sarima_model.fit(disp=False)

sarima_forecast = sarima_result.forecast(steps=24)
sarima_rmse = np.sqrt(mean_squared_error(test['co2'], sarima_forecast))
print(f"SARIMA RMSE: {sarima_rmse:.4f}")

2.3 Prophet (Meta/Facebook)

Prophet is a business-time-series library that automatically handles holidays and multiple seasonalities.

from prophet import Prophet

prophet_df = data.reset_index()
prophet_df.columns = ['ds', 'y']
prophet_train = prophet_df.iloc[:-24]

model_p = Prophet(
    yearly_seasonality=True,
    weekly_seasonality=False,
    daily_seasonality=False,
    changepoint_prior_scale=0.05
)
model_p.fit(prophet_train)

future = model_p.make_future_dataframe(periods=24, freq='MS')
forecast_p = model_p.predict(future)

prophet_pred = forecast_p.iloc[-24:]['yhat'].values
prophet_actual = prophet_df.iloc[-24:]['y'].values
prophet_rmse = np.sqrt(mean_squared_error(prophet_actual, prophet_pred))
print(f"Prophet RMSE: {prophet_rmse:.4f}")

3. Deep Learning Preprocessing for Time Series

3.1 Normalization

Deep learning models are sensitive to input scale.

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import MinMaxScaler, StandardScaler

np.random.seed(42)
n_samples = 1000
t = np.linspace(0, 4*np.pi, n_samples)
signal = np.sin(t) + 0.5*np.sin(3*t) + 0.1*np.random.randn(n_samples)
signal = signal.reshape(-1, 1)

minmax_scaler = MinMaxScaler(feature_range=(0, 1))
signal_minmax = minmax_scaler.fit_transform(signal)

standard_scaler = StandardScaler()
signal_standard = standard_scaler.fit_transform(signal)

print(f"Original range:  [{signal.min():.3f}, {signal.max():.3f}]")
print(f"MinMax range:    [{signal_minmax.min():.3f}, {signal_minmax.max():.3f}]")
print(f"Standard range:  [{signal_standard.min():.3f}, {signal_standard.max():.3f}]")

3.2 Window Slicing

def create_sequences(data, seq_len, pred_len=1, step=1):
    """
    Create sliding-window sequences.

    Args:
        data:     (N, features) array
        seq_len:  look-back window length
        pred_len: forecast horizon
        step:     window stride

    Returns:
        X: (samples, seq_len, features)
        y: (samples, pred_len, features)
    """
    X, y = [], []
    for i in range(0, len(data) - seq_len - pred_len + 1, step):
        X.append(data[i:i+seq_len])
        y.append(data[i+seq_len:i+seq_len+pred_len])
    return np.array(X), np.array(y)

seq_len = 60
pred_len = 10
X, y = create_sequences(signal_standard, seq_len, pred_len)
print(f"X shape: {X.shape}")  # (samples, 60, 1)
print(f"y shape: {y.shape}")  # (samples, 10, 1)

train_size = int(0.7 * len(X))
val_size   = int(0.15 * len(X))

X_train, y_train = X[:train_size], y[:train_size]
X_val, y_val     = X[train_size:train_size+val_size], y[train_size:train_size+val_size]
X_test, y_test   = X[train_size+val_size:], y[train_size+val_size:]

3.3 PyTorch Dataset and DataLoader

class TimeSeriesDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.FloatTensor(X)
        self.y = torch.FloatTensor(y)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

batch_size = 32
train_loader = DataLoader(TimeSeriesDataset(X_train, y_train), batch_size=batch_size, shuffle=True)
val_loader   = DataLoader(TimeSeriesDataset(X_val, y_val),     batch_size=batch_size, shuffle=False)
test_loader  = DataLoader(TimeSeriesDataset(X_test, y_test),   batch_size=batch_size, shuffle=False)

3.4 Multivariate Time Series

np.random.seed(42)
n = 2000
time = np.arange(n)

temp     = 20 + 10*np.sin(2*np.pi*time/365) + np.random.randn(n)
humidity = 60 + 20*np.cos(2*np.pi*time/365) + np.random.randn(n)
pressure = 1013 + 5*np.sin(2*np.pi*time/180) + np.random.randn(n)

mv_df = pd.DataFrame({'temperature': temp, 'humidity': humidity, 'pressure': pressure})
scaler_multi = StandardScaler()
mv_scaled = scaler_multi.fit_transform(mv_df)

X_mv, y_mv = create_sequences(mv_scaled, seq_len=60, pred_len=10)
print(f"Multivariate X shape: {X_mv.shape}")  # (samples, 60, 3)
print(f"Multivariate y shape: {y_mv.shape}")  # (samples, 10, 3)

4. LSTM Time Series Forecasting

4.1 Why LSTM Fits Time Series

LSTM (Long Short-Term Memory) solves the vanishing gradient problem in vanilla RNNs through three gates (input, forget, output), allowing the model to retain important information over long horizons.

Strengths for time series:

  • Learns sequential patterns end-to-end
  • Captures both short-term and long-term dependencies
  • Handles variable-length sequences naturally

4.2 Complete LSTM Implementation

import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau

class LSTMForecaster(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size,
                 pred_len, dropout=0.2, bidirectional=False):
        super().__init__()

        self.hidden_size  = hidden_size
        self.num_layers   = num_layers
        self.pred_len     = pred_len
        self.num_directions = 2 if bidirectional else 1

        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0,
            bidirectional=bidirectional
        )

        self.layer_norm = nn.LayerNorm(hidden_size * self.num_directions)

        self.fc = nn.Sequential(
            nn.Linear(hidden_size * self.num_directions, 128),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(128, pred_len * output_size)
        )

        self.output_size = output_size

    def forward(self, x):
        batch_size = x.size(0)
        lstm_out, _ = self.lstm(x)
        last = self.layer_norm(lstm_out[:, -1, :])
        out  = self.fc(last)
        return out.view(batch_size, self.pred_len, self.output_size)


device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Device: {device}")

model = LSTMForecaster(
    input_size=1, hidden_size=128, num_layers=2,
    output_size=1, pred_len=10, dropout=0.2
).to(device)

print(f"Total parameters: {sum(p.numel() for p in model.parameters()):,}")


def train_epoch(model, loader, optimizer, criterion, device):
    model.train()
    total_loss = 0
    for X_b, y_b in loader:
        X_b, y_b = X_b.to(device), y_b.to(device)
        optimizer.zero_grad()
        pred = model(X_b)
        loss = criterion(pred, y_b)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        total_loss += loss.item() * X_b.size(0)
    return total_loss / len(loader.dataset)


def evaluate(model, loader, criterion, device):
    model.eval()
    total_loss = 0
    preds, actuals = [], []
    with torch.no_grad():
        for X_b, y_b in loader:
            X_b, y_b = X_b.to(device), y_b.to(device)
            pred = model(X_b)
            total_loss += criterion(pred, y_b).item() * X_b.size(0)
            preds.append(pred.cpu().numpy())
            actuals.append(y_b.cpu().numpy())
    return total_loss / len(loader.dataset), np.concatenate(preds), np.concatenate(actuals)


optimizer  = optim.AdamW(model.parameters(), lr=1e-3, weight_decay=1e-5)
criterion  = nn.MSELoss()
scheduler  = ReduceLROnPlateau(optimizer, mode='min', patience=5, factor=0.5)

best_val_loss = float('inf')
for epoch in range(100):
    train_loss = train_epoch(model, train_loader, optimizer, criterion, device)
    val_loss, _, _ = evaluate(model, val_loader, criterion, device)
    scheduler.step(val_loss)

    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), 'best_lstm.pt')

    if (epoch + 1) % 20 == 0:
        print(f"Epoch {epoch+1:3d} | Train: {train_loss:.6f} | Val: {val_loss:.6f}")

4.3 Bidirectional LSTM

Bidirectional LSTM processes sequences in both forward and backward directions. Because it uses future context, it is suited for imputation and classification tasks rather than online forecasting.

bi_model = LSTMForecaster(
    input_size=1, hidden_size=64, num_layers=2,
    output_size=1, pred_len=10, dropout=0.2, bidirectional=True
).to(device)
print(f"BiLSTM parameters: {sum(p.numel() for p in bi_model.parameters()):,}")

5. Temporal Convolutional Network (TCN)

5.1 Dilated and Causal Convolutions

TCN applies convolutional networks to sequences. Compared to LSTMs, TCNs train faster and parallelize easily.

Key concepts:

  • Causal convolution: No look-ahead; only past information is used.
  • Dilated convolution: Gaps between filter taps expand the receptive field exponentially.
  • Receptive field: (kernel_size - 1) x 2^(num_layers-1) x num_layers
class CausalConv1d(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size, dilation=1):
        super().__init__()
        self.padding = (kernel_size - 1) * dilation
        self.conv = nn.Conv1d(
            in_channels, out_channels, kernel_size,
            padding=self.padding, dilation=dilation
        )

    def forward(self, x):
        out = self.conv(x)
        return out[:, :, :-self.padding] if self.padding > 0 else out


class TCNBlock(nn.Module):
    def __init__(self, in_ch, out_ch, kernel_size, dilation, dropout=0.2):
        super().__init__()
        self.conv1    = CausalConv1d(in_ch, out_ch, kernel_size, dilation)
        self.conv2    = CausalConv1d(out_ch, out_ch, kernel_size, dilation)
        self.norm1    = nn.BatchNorm1d(out_ch)
        self.norm2    = nn.BatchNorm1d(out_ch)
        self.dropout  = nn.Dropout(dropout)
        self.relu     = nn.ReLU()
        self.residual = nn.Conv1d(in_ch, out_ch, 1) if in_ch != out_ch else None

    def forward(self, x):
        res = x if self.residual is None else self.residual(x)
        out = self.dropout(self.relu(self.norm1(self.conv1(x))))
        out = self.dropout(self.relu(self.norm2(self.conv2(out))))
        return self.relu(out + res)


class TCNForecaster(nn.Module):
    def __init__(self, input_size, num_channels, kernel_size, pred_len, dropout=0.2):
        super().__init__()
        layers = []
        for i, out_ch in enumerate(num_channels):
            in_ch = input_size if i == 0 else num_channels[i-1]
            layers.append(TCNBlock(in_ch, out_ch, kernel_size, 2**i, dropout))
        self.network      = nn.Sequential(*layers)
        self.output_layer = nn.Linear(num_channels[-1], pred_len)

    def forward(self, x):
        out = self.network(x.permute(0, 2, 1))
        return self.output_layer(out[:, :, -1]).unsqueeze(-1)


tcn_model = TCNForecaster(1, [64, 128, 128, 64], kernel_size=3, pred_len=10).to(device)
receptive_field = 1 + 2 * (3 - 1) * (2**4 - 1)
print(f"TCN receptive field: {receptive_field}")

6. Transformer-Based Time Series

6.1 PatchTST

PatchTST (2023) divides a time series into overlapping patches and feeds them as tokens to a Transformer Encoder. Channel Independence — processing each variable independently — is a key design choice.

Core ideas:

  1. Split the series into overlapping patches.
  2. Use each patch as a token.
  3. Learn patch-to-patch relationships with a Transformer Encoder.
  4. Channel independence enables efficient scaling.
class PatchEmbedding(nn.Module):
    def __init__(self, seq_len, patch_len, stride, d_model):
        super().__init__()
        self.patch_len   = patch_len
        self.stride      = stride
        self.num_patches = (seq_len - patch_len) // stride + 1
        self.projection  = nn.Linear(patch_len, d_model)
        self.pos_embed   = nn.Parameter(torch.zeros(1, self.num_patches, d_model))

    def forward(self, x):
        # x: (batch, seq_len, 1)
        patches = x.squeeze(-1).unfold(1, self.patch_len, self.stride)
        return self.projection(patches) + self.pos_embed


class PatchTST(nn.Module):
    def __init__(self, seq_len, pred_len, patch_len=16, stride=8,
                 d_model=128, n_heads=8, num_layers=3, dropout=0.1):
        super().__init__()
        self.patch_embed = PatchEmbedding(seq_len, patch_len, stride, d_model)
        num_patches      = self.patch_embed.num_patches

        enc_layer = nn.TransformerEncoderLayer(
            d_model=d_model, nhead=n_heads,
            dim_feedforward=d_model*4, dropout=dropout, batch_first=True
        )
        self.encoder = nn.TransformerEncoder(enc_layer, num_layers=num_layers)
        self.head    = nn.Linear(num_patches * d_model, pred_len)

    def forward(self, x):
        patches  = self.patch_embed(x)
        encoded  = self.encoder(patches)
        flat     = encoded.flatten(1)
        return self.head(flat).unsqueeze(-1)


patchtst = PatchTST(seq_len=60, pred_len=10, patch_len=12, stride=6).to(device)
print(f"PatchTST parameters: {sum(p.numel() for p in patchtst.parameters()):,}")

6.2 Informer (ProbSparse Attention)

Informer achieves O(L log L) complexity via ProbSparse Attention, making it efficient for long sequences.

class ProbSparseSelfAttention(nn.Module):
    def __init__(self, d_model, n_heads, factor=5):
        super().__init__()
        self.n_heads = n_heads
        self.d_head  = d_model // n_heads
        self.factor  = factor
        self.q_proj  = nn.Linear(d_model, d_model)
        self.k_proj  = nn.Linear(d_model, d_model)
        self.v_proj  = nn.Linear(d_model, d_model)
        self.out     = nn.Linear(d_model, d_model)
        self.scale   = self.d_head ** -0.5

    def forward(self, x):
        B, L, D = x.shape
        Q = self.q_proj(x).view(B, L, self.n_heads, self.d_head).transpose(1, 2)
        K = self.k_proj(x).view(B, L, self.n_heads, self.d_head).transpose(1, 2)
        V = self.v_proj(x).view(B, L, self.n_heads, self.d_head).transpose(1, 2)

        u = max(1, min(int(self.factor * np.log(L)), L))
        scores = torch.matmul(Q[:, :, :u], K.transpose(-2, -1)) * self.scale
        M = scores.max(-1)[0] - torch.div(scores.sum(-1), L)
        top_idx = M.topk(u, dim=-1, sorted=False)[1]

        Q_sparse = Q[torch.arange(B)[:, None, None],
                     torch.arange(self.n_heads)[None, :, None], top_idx]
        attn = torch.softmax(torch.matmul(Q_sparse, K.transpose(-2, -1)) * self.scale, dim=-1)

        context = V.mean(2, keepdim=True).expand(-1, -1, L, -1).clone()
        context[torch.arange(B)[:, None, None],
                torch.arange(self.n_heads)[None, :, None], top_idx] = torch.matmul(attn, V)

        context = context.transpose(1, 2).contiguous().view(B, L, D)
        return self.out(context)

7. N-BEATS and N-HiTS

7.1 N-BEATS

N-BEATS uses only fully-connected (feed-forward) layers with a backward-residual architecture: each block predicts both a backcast (removing its contribution from the input) and a forecast (added to the global forecast accumulator).

class TrendBasis(nn.Module):
    def __init__(self, degree, backcast_size, forecast_size):
        super().__init__()
        self.degree = degree

        bt = torch.linspace(0, 1, backcast_size)
        ft = torch.linspace(1, 2, forecast_size)

        bb = torch.stack([bt**i for i in range(degree + 1)], dim=1)
        fb = torch.stack([ft**i for i in range(degree + 1)], dim=1)

        self.register_buffer('backcast_basis', bb)
        self.register_buffer('forecast_basis', fb)

    def forward(self, theta, cast_type):
        basis = self.backcast_basis if cast_type == 'backcast' else self.forecast_basis
        return torch.matmul(theta, basis.T)


class NBeatsBlock(nn.Module):
    def __init__(self, input_size, theta_size, basis,
                 hidden_size=256, num_layers=4):
        super().__init__()
        self.basis = basis

        fc = []
        in_size = input_size
        for _ in range(num_layers):
            fc += [nn.Linear(in_size, hidden_size), nn.ReLU()]
            in_size = hidden_size
        self.fc      = nn.Sequential(*fc)
        self.theta_b = nn.Linear(hidden_size, theta_size)
        self.theta_f = nn.Linear(hidden_size, theta_size)

    def forward(self, x):
        h  = self.fc(x)
        tb = self.theta_b(h)
        tf = self.theta_f(h)
        return self.basis(tb, 'backcast'), self.basis(tf, 'forecast')


class NBeats(nn.Module):
    def __init__(self, backcast_size, forecast_size,
                 hidden_size=256, num_blocks=3, trend_degree=3):
        super().__init__()
        trend_basis  = TrendBasis(trend_degree, backcast_size, forecast_size)
        self.blocks  = nn.ModuleList([
            NBeatsBlock(backcast_size, trend_degree + 1, trend_basis, hidden_size)
            for _ in range(num_blocks)
        ])
        self.generic = nn.ModuleList([
            nn.Sequential(
                nn.Linear(backcast_size, hidden_size), nn.ReLU(),
                nn.Linear(hidden_size, hidden_size), nn.ReLU(),
                nn.Linear(hidden_size, forecast_size)
            ) for _ in range(num_blocks)
        ])
        self.forecast_size = forecast_size

    def forward(self, x):
        residual = x
        forecast = torch.zeros(x.size(0), self.forecast_size, device=x.device)
        for i, block in enumerate(self.blocks):
            backcast, f = block(residual)
            residual    = residual - backcast
            forecast    = forecast + f
        # Generic blocks for remaining residuals
        for g in self.generic:
            forecast = forecast + g(residual)
        return forecast

8. Time Series Foundation Models

8.1 TimesFM (Google DeepMind)

TimesFM is a large-scale foundation model developed by Google DeepMind, pre-trained on diverse time series corpora enabling zero-shot forecasting across domains.

def demo_timesfm():
    """
    Conceptual TimesFM usage.
    Install with: pip install timesfm
    Then load the model from HuggingFace: google/timesfm-1.0-200m
    """
    np.random.seed(42)
    n = 512
    t = np.arange(n)
    series = (
        10 + 0.1*t
        + 5*np.sin(2*np.pi*t/52)
        + 2*np.sin(2*np.pi*t/7)
        + np.random.randn(n)
    )

    usage_note = """
    import timesfm

    tfm = timesfm.TimesFm(
        context_len=512, horizon_len=96,
        input_patch_len=32, output_patch_len=128,
        num_layers=20, model_dims=1280,
    )
    tfm.load_from_checkpoint(repo_id="google/timesfm-1.0-200m")

    point_forecast, quantile_forecast = tfm.forecast(
        [series],
        freq=[0],   # 0=high-freq, 1=medium-freq, 2=low-freq
    )
    # point_forecast.shape => (1, 96)
    """
    print("TimesFM: Google DeepMind's time series foundation model")
    print("  - 200M parameter decoder-only architecture")
    print("  - Zero-shot forecasting on unseen domains")
    print("  - Patch-based input (patch_len=32)")
    return series

demo_timesfm()

8.2 Chronos (Amazon)

Amazon's Chronos applies T5 language model architecture to time series by tokenizing numerical values, treating forecasting as a language modeling problem.

def demo_chronos():
    """
    Conceptual Chronos usage.
    Install: pip install git+https://github.com/amazon-science/chronos-forecasting.git
    """
    usage_note = """
    from chronos import ChronosPipeline
    import torch

    pipeline = ChronosPipeline.from_pretrained(
        "amazon/chronos-t5-small",
        device_map="cpu",
        torch_dtype=torch.bfloat16,
    )

    context = torch.tensor(series[-512:]).unsqueeze(0)
    forecast = pipeline.predict(context=context, prediction_length=24, num_samples=20)

    low, median, high = np.quantile(forecast[0].numpy(), [0.1, 0.5, 0.9], axis=0)
    """
    print("Chronos: Amazon's T5-based time series foundation model")
    print("  - Sizes: tiny, small, base, large (710M)")
    print("  - Tokenizes numerical values (quantile binning)")
    print("  - Probabilistic forecasts via multiple samples")

demo_chronos()

8.3 TimeGPT (Nixtla)

def demo_timegpt():
    """
    Conceptual TimeGPT usage.
    Install: pip install nixtla
    """
    usage_note = """
    from nixtla import NixtlaClient

    client = NixtlaClient(api_key='YOUR_KEY')

    forecast_df = client.forecast(
        df=df,          # columns: 'ds', 'y'
        h=24,
        freq='H',
        time_col='ds',
        target_col='y',
    )

    cv_df = client.cross_validation(df=df, h=24, n_windows=3, freq='H')
    """
    print("TimeGPT: Nixtla's time series foundation model (API service)")
    print("  - Anomaly detection support")
    print("  - Uncertainty quantile forecasts")
    print("  - Fine-tuning on proprietary data")

demo_timegpt()

9. Anomaly Detection

9.1 LSTM Autoencoder for Anomaly Detection

class LSTMAutoencoder(nn.Module):
    def __init__(self, seq_len, input_size, hidden_size, num_layers=1):
        super().__init__()
        self.seq_len     = seq_len
        self.hidden_size = hidden_size

        self.encoder = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.decoder = nn.LSTM(hidden_size, hidden_size, num_layers, batch_first=True)
        self.output  = nn.Linear(hidden_size, input_size)

    def forward(self, x):
        _, (h_n, c_n) = self.encoder(x)
        dec_in        = h_n[-1].unsqueeze(1).repeat(1, self.seq_len, 1)
        dec_out, _    = self.decoder(dec_in)
        return self.output(dec_out)


def detect_anomalies(model, data_list, threshold_pct=95, device='cpu'):
    model.eval()
    errors = []
    with torch.no_grad():
        for sample in data_list:
            x    = torch.FloatTensor(sample).unsqueeze(0).to(device)
            recon = model(x)
            errors.append(nn.MSELoss()(recon, x).item())

    errors    = np.array(errors)
    threshold = np.percentile(errors, threshold_pct)
    return errors, threshold, errors > threshold


# Generate data with injected anomalies
np.random.seed(42)
n = 1000
normal = np.sin(np.linspace(0, 8*np.pi, n)) + 0.1*np.random.randn(n)
anomaly_data          = normal.copy()
anomaly_data[300:310] += 3.0   # spike
anomaly_data[600:605]  = 0.0   # signal loss

# Isolation Forest
from sklearn.ensemble import IsolationForest

iso = IsolationForest(contamination=0.05, random_state=42)
predictions  = iso.fit_predict(anomaly_data.reshape(-1, 1))
iso_anomalies = predictions == -1

print(f"Isolation Forest detections: {iso_anomalies.sum()}")
print(f"True anomaly windows: 300-310 (10 pts), 600-605 (5 pts)")

plt.figure(figsize=(14, 4))
plt.plot(anomaly_data, alpha=0.7, label='Data')
plt.scatter(np.where(iso_anomalies)[0], anomaly_data[iso_anomalies],
            color='red', s=30, label='Detected anomalies', zorder=5)
plt.title('Anomaly Detection (Isolation Forest)')
plt.legend()
plt.show()

10. Real-World Project: Darts Library

10.1 Unified Forecasting Pipeline with Darts

Darts provides a unified interface for classical and deep learning time series models.

def demo_darts():
    """
    Darts usage example.
    Install: pip install darts
    """
    usage_note = """
    from darts import TimeSeries
    from darts.models import NBEATSModel, TFTModel, TCNModel
    from darts.metrics import mape, rmse
    from darts.dataprocessing.transformers import Scaler
    from darts.datasets import AirPassengersDataset

    series = AirPassengersDataset().load()
    train, test = series[:-24], series[-24:]

    scaler        = Scaler()
    train_scaled  = scaler.fit_transform(train)
    test_scaled   = scaler.transform(test)

    nbeats = NBEATSModel(
        input_chunk_length=36,
        output_chunk_length=12,
        n_epochs=100, random_state=42
    )
    nbeats.fit(train_scaled)
    forecast     = scaler.inverse_transform(nbeats.predict(24))

    print(f"MAPE: {mape(test, forecast):.2f}%")
    print(f"RMSE: {rmse(test, forecast):.4f}")

    # Temporal Fusion Transformer (supports covariates)
    tft = TFTModel(
        input_chunk_length=36, output_chunk_length=12,
        hidden_size=64, lstm_layers=1, num_attention_heads=4,
        n_epochs=100, random_state=42
    )
    """
    print("Darts library: unified time series forecasting")
    print("  - N-BEATS, TFT, TCN, Transformer, NATS, ...")
    print("  - Consistent fit/predict API across all models")

demo_darts()

10.2 Energy Demand Forecasting Pipeline

def create_energy_pipeline():
    """Full energy demand forecasting pipeline (simulated data)."""
    np.random.seed(42)
    n_hours = 24 * 365
    hours   = np.arange(n_hours)

    base     = 5000
    daily    = 500*np.sin(2*np.pi*(hours % 24)/24 - np.pi/2) + 300*np.sin(4*np.pi*(hours % 24)/24)
    weekly   = 200*np.cos(2*np.pi*(hours // 24 % 7)/7)
    seasonal = 1000*np.sin(2*np.pi*hours/n_hours - np.pi/2)
    noise    = 100*np.random.randn(n_hours)
    demand   = np.maximum(base + daily + weekly + seasonal + noise, 1000)

    temperature = (
        20 + 10*np.sin(2*np.pi*hours/n_hours - np.pi/2)
        + 5*np.sin(2*np.pi*(hours % 24)/24)
        + 1.5*np.random.randn(n_hours)
    )

    df = pd.DataFrame({
        'datetime':    pd.date_range('2023-01-01', periods=n_hours, freq='h'),
        'demand':      demand,
        'temperature': temperature,
        'hour':        hours % 24,
        'dow':         (hours // 24) % 7,
        'month':       pd.date_range('2023-01-01', periods=n_hours, freq='h').month
    }).set_index('datetime')

    df['lag_1']   = df['demand'].shift(1)
    df['lag_24']  = df['demand'].shift(24)
    df['lag_168'] = df['demand'].shift(168)
    df['roll_24'] = df['demand'].rolling(24).mean()
    df.dropna(inplace=True)

    features = ['demand', 'temperature', 'hour', 'dow', 'month',
                'lag_1', 'lag_24', 'lag_168', 'roll_24']

    scaler = StandardScaler()
    scaled = scaler.fit_transform(df[features])

    X, y = create_sequences(scaled, seq_len=168, pred_len=24)
    y    = y[:, :, :1]  # target = demand only

    print(f"Input  shape: {X.shape}")
    print(f"Target shape: {y.shape}")
    return df, scaled, X, y, scaler

energy_df, energy_scaled, X_e, y_e, e_scaler = create_energy_pipeline()

10.3 Model Benchmark Summary

benchmark = pd.DataFrame({
    'Model':            ['ARIMA', 'Prophet', 'LSTM', 'TCN', 'PatchTST', 'TimesFM (zero-shot)'],
    'RMSE':             [0.312,   0.289,     0.198,  0.185,  0.162,      0.215],
    'MAE':              [0.241,   0.218,     0.152,  0.141,  0.121,      0.163],
    'Train Time (min)': [1.2,     2.1,       15.3,   8.7,    12.4,       0.0],
})
print(benchmark.to_string(index=False))

Closing Thoughts

This guide has walked through the full spectrum of time series analysis.

Learning Roadmap Recap:

  1. Foundations: Stationarity, ACF/PACF, decomposition
  2. Classical methods: ARIMA, SARIMA, Prophet — always establish a baseline first
  3. Deep learning basics: LSTM, TCN for nonlinear patterns
  4. Advanced architectures: PatchTST, N-BEATS — current best open-source models
  5. Foundation models: TimesFM, Chronos for zero-shot forecasting

Practical tips:

  • Always build a baseline with a simple model (ARIMA, Prophet) before going deep.
  • Deep learning shines when you have 1000+ data points.
  • PatchTST and N-BEATS are currently the strongest open-source options.
  • Foundation models excel when domain-specific data is scarce.

References: