Deep Learning Time Series Analysis Complete Guide: LSTM, Transformer, PatchTST, TimesFM

Introduction

Time series data is everywhere: stock prices, temperatures, energy demand, traffic patterns, medical signals. Recent advances in deep learning have rapidly changed time series forecasting, adding tools that range from LSTMs and Transformers to foundation models such as TimesFM.

This guide takes you step by step from the basics of time series analysis to the latest foundation models. Most sections include runnable Python code; the foundation-model and Darts examples are shown as conceptual usage snippets.


1. Time Series Data Fundamentals

1.1 Definition and Characteristics

A time series is a sequence of data points indexed in chronological order. Unlike independent, identically distributed (i.i.d.) tabular data, the observations are temporally dependent: current values depend on past values.

Core characteristics:

  • Order dependency: The temporal ordering of data points matters
  • Autocorrelation: Past values carry predictive information about future values
  • Seasonality: Recurring patterns at fixed intervals
  • Trend: Long-term directional movement
  • Non-stationarity: Statistical properties change over time
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from statsmodels.tsa.seasonal import seasonal_decompose

# Generate synthetic time series
np.random.seed(42)
dates = pd.date_range(start='2020-01-01', periods=365*3, freq='D')
trend = np.linspace(10, 50, len(dates))
seasonality = 10 * np.sin(2 * np.pi * np.arange(len(dates)) / 365)
noise = np.random.normal(0, 2, len(dates))
series = trend + seasonality + noise

ts = pd.Series(series, index=dates, name='value')

# Decompose the time series
decomp = seasonal_decompose(ts, model='additive', period=365)

fig, axes = plt.subplots(4, 1, figsize=(12, 10))
decomp.observed.plot(ax=axes[0], title='Observed')
decomp.trend.plot(ax=axes[1], title='Trend')
decomp.seasonal.plot(ax=axes[2], title='Seasonal')
decomp.resid.plot(ax=axes[3], title='Residual')
plt.tight_layout()
plt.show()

1.2 Trend, Seasonality, and Residuals

Time series decomposition separates a series into three components.

Additive Model: Y(t) = Trend(t) + Seasonal(t) + Residual(t)

Multiplicative Model: Y(t) = Trend(t) x Seasonal(t) x Residual(t)

Use the multiplicative model when seasonal variation grows proportionally with the trend level; otherwise use additive.
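
The two models are linked by a logarithm: taking logs of a multiplicative series gives an additive one, so an additive decomposition can be applied to log(Y). The sketch below illustrates this with made-up component values (the trend, seasonal factor, and residual factor are assumptions chosen only to keep the example deterministic).

```python
import numpy as np

# Hypothetical components, chosen only for illustration
t = np.arange(48)
trend = 100 + 2.0 * t                               # rising level
seasonal = 1 + 0.2 * np.sin(2 * np.pi * t / 12)     # multiplicative factor around 1
residual = np.full(t.shape, 1.01)                   # constant factor keeps the example deterministic

y_mult = trend * seasonal * residual                # multiplicative model

# log(Y) = log(Trend) + log(Seasonal) + log(Residual): additive in log space
log_sum = np.log(trend) + np.log(seasonal) + np.log(residual)

# In original units the seasonal swing grows with the trend level
early_swing = y_mult[:12].max() - y_mult[:12].min()
late_swing = y_mult[-12:].max() - y_mult[-12:].min()
print(early_swing < late_swing)
```

A seasonal swing that widens as the level rises, as in the last two lines, is the visual cue for choosing the multiplicative form.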

1.3 Stationarity and the ADF Test

A stationary time series has constant mean, variance, and autocovariance over time. Most classical statistical models assume stationarity.

The ADF (Augmented Dickey-Fuller) test checks for the presence of a unit root.

  • Null hypothesis: a unit root exists (the series is non-stationary)
  • p-value < 0.05 → reject the null → evidence that the series is stationary

from statsmodels.tsa.stattools import adfuller

def check_stationarity(series, name='series'):
    """Check stationarity using the ADF test"""
    result = adfuller(series.dropna())
    print(f"\n{'='*50}")
    print(f"Series: {name}")
    print(f"{'='*50}")
    print(f"ADF Statistic: {result[0]:.4f}")
    print(f"p-value: {result[1]:.4f}")
    print("Critical Values:")
    for key, val in result[4].items():
        print(f"  {key}: {val:.4f}")

    if result[1] < 0.05:
        print("Conclusion: Stationary (reject null hypothesis)")
    else:
        print("Conclusion: Non-stationary (fail to reject null hypothesis)")

    return result[1] < 0.05

# Non-stationary original series
check_stationarity(ts, 'Original series')

# First-difference to achieve stationarity
diff_series = ts.diff().dropna()
check_stationarity(diff_series, 'First-differenced series')
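
To see why differencing helps, it can be applied to deterministic trends where the result is known exactly. This is a small NumPy sketch, not part of the ADF workflow above; `np.diff` plays the role of `ts.diff()` without the leading NaN.

```python
import numpy as np

t = np.arange(10, dtype=float)
linear = 3.0 + 0.5 * t          # deterministic linear trend
quadratic = 0.1 * t**2          # deterministic quadratic trend

d1_linear = np.diff(linear)             # constant 0.5: the trend is gone
d1_quadratic = np.diff(quadratic)       # 0.1 * (2t + 1): still grows linearly
d2_quadratic = np.diff(quadratic, n=2)  # constant 0.2 after a second difference

print(d1_linear[:3], d2_quadratic[:3])
```

One difference removes a linear trend and two remove a quadratic one, which is the intuition behind the d parameter in ARIMA.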

1.4 Autocorrelation, ACF, and PACF

ACF (Autocorrelation Function): Correlation between a series and its own lagged values.

PACF (Partial Autocorrelation Function): Direct correlation at each lag after removing the effect of intermediate lags.

ACF and PACF plots guide the selection of the (p, q) orders for ARIMA models.

from statsmodels.graphics.tsaplots import plot_acf, plot_pacf

fig, axes = plt.subplots(2, 1, figsize=(12, 8))
plot_acf(diff_series, lags=40, ax=axes[0], title='ACF (Autocorrelation Function)')
plot_pacf(diff_series, lags=40, ax=axes[1], title='PACF (Partial Autocorrelation Function)')
plt.tight_layout()
plt.show()

# Interpretation guide:
# AR(p): PACF cuts off at lag p, ACF tails off gradually
# MA(q): ACF cuts off at lag q, PACF tails off gradually
# ARMA(p,q): Both functions tail off gradually
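
The "tails off gradually" pattern can be checked by computing the sample ACF by hand. The sketch below simulates an AR(1) process (the coefficient 0.8, the seed, and the helper name `sample_acf` are illustrative assumptions) and compares the first lags with the theoretical values phi**k.

```python
import numpy as np

def sample_acf(x, max_lag):
    """Sample autocorrelation for lags 0..max_lag (biased estimator)."""
    x = np.asarray(x, dtype=float)
    x = x - x.mean()
    denom = np.dot(x, x)
    return np.array([np.dot(x[:len(x) - k], x[k:]) / denom
                     for k in range(max_lag + 1)])

# Simulate x_t = phi * x_{t-1} + e_t
rng = np.random.default_rng(0)
phi, n = 0.8, 20000
e = rng.standard_normal(n)
x = np.zeros(n)
for i in range(1, n):
    x[i] = phi * x[i - 1] + e[i]

acf = sample_acf(x, max_lag=3)
# Theoretical AR(1) autocorrelation at lag k is phi**k: geometric decay, no cut-off
print(np.round(acf, 2))
```

The estimates should sit close to 1, 0.8, 0.64, 0.51, which is the geometric decay that distinguishes an AR process from the sharp cut-off of an MA process.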

2. Classical Time Series Models

2.1 AR, MA, ARMA, ARIMA

AR(p) — Autoregressive model: Current value is a linear combination of the past p values.

MA(q) — Moving Average model: Current value is a linear combination of the past q error terms.

ARMA(p,q): Combines AR and MA components.

ARIMA(p,d,q): Difference the series d times to achieve stationarity, then apply ARMA.
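
As a concrete view of "a linear combination of the past p values", an AR(2) model can be fitted with ordinary least squares on lagged copies of the series. This is only a sketch under assumed coefficients; statsmodels' `ARIMA` uses a different, likelihood-based estimator.

```python
import numpy as np

rng = np.random.default_rng(1)
true_phi = np.array([0.6, -0.3])   # illustrative AR(2) coefficients (stationary)
n = 20000

x = np.zeros(n)
e = rng.standard_normal(n)
for t in range(2, n):
    x[t] = true_phi[0] * x[t - 1] + true_phi[1] * x[t - 2] + e[t]

# Regress x_t on its two previous values: columns are lag 1 and lag 2
lags = np.column_stack([x[1:-1], x[:-2]])
target = x[2:]
phi_hat, *_ = np.linalg.lstsq(lags, target, rcond=None)

# One-step-ahead forecast from the last two observations
next_value = phi_hat[0] * x[-1] + phi_hat[1] * x[-2]
print(np.round(phi_hat, 2))
```

The recovered coefficients should be close to the simulated ones, and the forecast is simply those coefficients applied to the most recent observations.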

from statsmodels.tsa.arima.model import ARIMA
from sklearn.metrics import mean_squared_error
import warnings
warnings.filterwarnings('ignore')

# CO2 dataset
from statsmodels.datasets import co2
data = co2.load_pandas().data
data = data.resample('MS').mean().ffill()  # fillna(method='ffill') is deprecated in recent pandas

# Train / test split
train = data.iloc[:-24]
test = data.iloc[-24:]

# Fit ARIMA (orders chosen from ACF/PACF analysis)
model = ARIMA(train, order=(2, 1, 2))
result = model.fit()
print(result.summary())

# Forecast
forecast = result.forecast(steps=24)
rmse = np.sqrt(mean_squared_error(test['co2'], forecast))
print(f"\nRMSE: {rmse:.4f}")

# Plot
plt.figure(figsize=(12, 5))
plt.plot(train.index[-60:], train['co2'].iloc[-60:], label='Training Data')
plt.plot(test.index, test['co2'], label='Actual', color='green')
plt.plot(test.index, forecast, label='ARIMA Forecast', color='red', linestyle='--')
plt.legend()
plt.title('ARIMA Forecast')
plt.show()

2.2 SARIMA

SARIMA(p, d, q)(P, D, Q, s) adds seasonal parameters to ARIMA. Here s is the seasonal period.

from statsmodels.tsa.statespace.sarimax import SARIMAX

sarima_model = SARIMAX(
    train,
    order=(1, 1, 1),
    seasonal_order=(1, 1, 1, 12),
    enforce_stationarity=False,
    enforce_invertibility=False
)
sarima_result = sarima_model.fit(disp=False)

sarima_forecast = sarima_result.forecast(steps=24)
sarima_rmse = np.sqrt(mean_squared_error(test['co2'], sarima_forecast))
print(f"SARIMA RMSE: {sarima_rmse:.4f}")
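
The seasonal differencing order D works like ordinary differencing, but at lag s. The sketch below applies one seasonal difference to a synthetic series with an exactly repeating 12-step pattern (the series itself is an assumption made for illustration).

```python
import numpy as np

s = 12                                   # seasonal period (monthly data, yearly cycle)
t = np.arange(5 * s)
seasonal = 10 * np.sin(2 * np.pi * t / s)
y = 0.5 * t + seasonal                   # linear trend plus a fixed seasonal pattern

# Seasonal difference (D=1): y_t - y_{t-s}
seasonal_diff = y[s:] - y[:-s]

# The repeating pattern cancels; the linear trend leaves a constant 0.5 * s
print(seasonal_diff[:3])
```

With pandas the same operation is `series.diff(12)`. In `seasonal_order=(1, 1, 1, 12)` the second entry requests this step once.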

2.3 Prophet (Meta/Facebook)

Prophet is a business-time-series library that automatically handles holidays and multiple seasonalities.

from prophet import Prophet

prophet_df = data.reset_index()
prophet_df.columns = ['ds', 'y']
prophet_train = prophet_df.iloc[:-24]

model_p = Prophet(
    yearly_seasonality=True,
    weekly_seasonality=False,
    daily_seasonality=False,
    changepoint_prior_scale=0.05
)
model_p.fit(prophet_train)

future = model_p.make_future_dataframe(periods=24, freq='MS')
forecast_p = model_p.predict(future)

prophet_pred = forecast_p.iloc[-24:]['yhat'].values
prophet_actual = prophet_df.iloc[-24:]['y'].values
prophet_rmse = np.sqrt(mean_squared_error(prophet_actual, prophet_pred))
print(f"Prophet RMSE: {prophet_rmse:.4f}")

3. Deep Learning Preprocessing for Time Series

3.1 Normalization

Deep learning models are sensitive to input scale.

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import MinMaxScaler, StandardScaler

np.random.seed(42)
n_samples = 1000
t = np.linspace(0, 4*np.pi, n_samples)
signal = np.sin(t) + 0.5*np.sin(3*t) + 0.1*np.random.randn(n_samples)
signal = signal.reshape(-1, 1)

minmax_scaler = MinMaxScaler(feature_range=(0, 1))
signal_minmax = minmax_scaler.fit_transform(signal)

standard_scaler = StandardScaler()
signal_standard = standard_scaler.fit_transform(signal)

print(f"Original range:  [{signal.min():.3f}, {signal.max():.3f}]")
print(f"MinMax range:    [{signal_minmax.min():.3f}, {signal_minmax.max():.3f}]")
print(f"Standard range:  [{signal_standard.min():.3f}, {signal_standard.max():.3f}]")
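
The example above fits both scalers on the full signal, which is fine for a demonstration. For forecasting experiments it is usually safer to compute the scaling statistics on the training portion only, so that no information from the test period leaks into training. A minimal NumPy sketch of that pattern follows (the 70% split and the synthetic data are assumptions).

```python
import numpy as np

rng = np.random.default_rng(42)
values = rng.normal(loc=50.0, scale=5.0, size=1000)

split = int(0.7 * len(values))          # chronological split, no shuffling
train, test = values[:split], values[split:]

# Statistics come from the training portion only
mu, sigma = train.mean(), train.std()

train_scaled = (train - mu) / sigma
test_scaled = (test - mu) / sigma       # reuse the training statistics

# The inverse transform maps model outputs back to original units
test_restored = test_scaled * sigma + mu
print(round(train_scaled.mean(), 6), round(train_scaled.std(), 6))
```

With scikit-learn the equivalent is `scaler.fit(train)` followed by `scaler.transform(test)` and `scaler.inverse_transform(...)`.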

3.2 Window Slicing

def create_sequences(data, seq_len, pred_len=1, step=1):
    """
    Create sliding-window sequences.

    Args:
        data:     (N, features) array
        seq_len:  look-back window length
        pred_len: forecast horizon
        step:     window stride

    Returns:
        X: (samples, seq_len, features)
        y: (samples, pred_len, features)
    """
    X, y = [], []
    for i in range(0, len(data) - seq_len - pred_len + 1, step):
        X.append(data[i:i+seq_len])
        y.append(data[i+seq_len:i+seq_len+pred_len])
    return np.array(X), np.array(y)

seq_len = 60
pred_len = 10
X, y = create_sequences(signal_standard, seq_len, pred_len)
print(f"X shape: {X.shape}")  # (samples, 60, 1)
print(f"y shape: {y.shape}")  # (samples, 10, 1)

train_size = int(0.7 * len(X))
val_size   = int(0.15 * len(X))

X_train, y_train = X[:train_size], y[:train_size]
X_val, y_val     = X[train_size:train_size+val_size], y[train_size:train_size+val_size]
X_test, y_test   = X[train_size+val_size:], y[train_size+val_size:]
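
The number of windows follows directly from the loop bounds in `create_sequences`. The helper below (a hypothetical name, not part of the guide's code) reproduces that arithmetic for the 1000-point signal used here.

```python
def num_windows(n_points, seq_len, pred_len, step=1):
    """Number of (X, y) pairs produced by a sliding window over n_points."""
    usable = n_points - seq_len - pred_len + 1
    if usable <= 0:
        return 0
    return (usable + step - 1) // step   # ceil(usable / step)

total = num_windows(1000, seq_len=60, pred_len=10)    # 931
train_size = int(0.7 * total)                          # 651
val_size = int(0.15 * total)                           # 139
test_size = total - train_size - val_size              # 141
print(total, train_size, val_size, test_size)
```

Because consecutive windows overlap, the last training windows share observations with the first validation windows. Dropping seq_len + pred_len - 1 windows at each boundary is one way to avoid that overlap.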

3.3 PyTorch Dataset and DataLoader

class TimeSeriesDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.FloatTensor(X)
        self.y = torch.FloatTensor(y)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

batch_size = 32
train_loader = DataLoader(TimeSeriesDataset(X_train, y_train), batch_size=batch_size, shuffle=True)
val_loader   = DataLoader(TimeSeriesDataset(X_val, y_val),     batch_size=batch_size, shuffle=False)
test_loader  = DataLoader(TimeSeriesDataset(X_test, y_test),   batch_size=batch_size, shuffle=False)

3.4 Multivariate Time Series

np.random.seed(42)
n = 2000
time = np.arange(n)

temp     = 20 + 10*np.sin(2*np.pi*time/365) + np.random.randn(n)
humidity = 60 + 20*np.cos(2*np.pi*time/365) + np.random.randn(n)
pressure = 1013 + 5*np.sin(2*np.pi*time/180) + np.random.randn(n)

mv_df = pd.DataFrame({'temperature': temp, 'humidity': humidity, 'pressure': pressure})
scaler_multi = StandardScaler()
mv_scaled = scaler_multi.fit_transform(mv_df)

X_mv, y_mv = create_sequences(mv_scaled, seq_len=60, pred_len=10)
print(f"Multivariate X shape: {X_mv.shape}")  # (samples, 60, 3)
print(f"Multivariate y shape: {y_mv.shape}")  # (samples, 10, 3)

4. LSTM Time Series Forecasting

4.1 Why LSTM Fits Time Series

LSTM (Long Short-Term Memory) mitigates the vanishing gradient problem of vanilla RNNs with a gated cell state: three gates (input, forget, output) control what is written to, kept in, and read from the cell, allowing the model to retain important information over long horizons.

Strengths for time series:

  • Learns sequential patterns end-to-end
  • Captures both short-term and long-term dependencies
  • Handles variable-length sequences naturally
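
To make the role of the gates concrete, here is a single LSTM time step written in NumPy. It is a sketch with random toy weights, not a replacement for `nn.LSTM`; the i, f, g, o ordering of the stacked weights follows PyTorch's layout, and the sizes are arbitrary.

```python
import numpy as np

def sigmoid(z):
    return 1.0 / (1.0 + np.exp(-z))

def lstm_step(x_t, h_prev, c_prev, W, U, b):
    """One LSTM time step. W: (4H, D), U: (4H, H), b: (4H,)."""
    H = h_prev.shape[0]
    z = W @ x_t + U @ h_prev + b
    i = sigmoid(z[0:H])            # input gate: how much new information to write
    f = sigmoid(z[H:2 * H])        # forget gate: how much of the old cell state to keep
    g = np.tanh(z[2 * H:3 * H])    # candidate cell values
    o = sigmoid(z[3 * H:4 * H])    # output gate: how much of the cell state to expose
    c_t = f * c_prev + i * g       # additive update, which eases gradient flow
    h_t = o * np.tanh(c_t)
    return h_t, c_t

rng = np.random.default_rng(0)
D, H = 1, 4                        # toy sizes
W = rng.normal(scale=0.5, size=(4 * H, D))
U = rng.normal(scale=0.5, size=(4 * H, H))
b = np.zeros(4 * H)

h, c = np.zeros(H), np.zeros(H)
for value in [0.1, 0.5, -0.3]:     # a short univariate sequence
    h, c = lstm_step(np.array([value]), h, c, W, U, b)
print(h.shape, c.shape)
```

The cell state `c` is carried forward through an elementwise multiply and add, which is why information can survive many steps when the forget gate stays close to 1.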

4.2 Complete LSTM Implementation

import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau

class LSTMForecaster(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size,
                 pred_len, dropout=0.2, bidirectional=False):
        super().__init__()

        self.hidden_size  = hidden_size
        self.num_layers   = num_layers
        self.pred_len     = pred_len
        self.num_directions = 2 if bidirectional else 1

        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0,
            bidirectional=bidirectional
        )

        self.layer_norm = nn.LayerNorm(hidden_size * self.num_directions)

        self.fc = nn.Sequential(
            nn.Linear(hidden_size * self.num_directions, 128),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(128, pred_len * output_size)
        )

        self.output_size = output_size

    def forward(self, x):
        batch_size = x.size(0)
        lstm_out, _ = self.lstm(x)
        last = self.layer_norm(lstm_out[:, -1, :])
        out  = self.fc(last)
        return out.view(batch_size, self.pred_len, self.output_size)


device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Device: {device}")

model = LSTMForecaster(
    input_size=1, hidden_size=128, num_layers=2,
    output_size=1, pred_len=10, dropout=0.2
).to(device)

print(f"Total parameters: {sum(p.numel() for p in model.parameters()):,}")


def train_epoch(model, loader, optimizer, criterion, device):
    model.train()
    total_loss = 0
    for X_b, y_b in loader:
        X_b, y_b = X_b.to(device), y_b.to(device)
        optimizer.zero_grad()
        pred = model(X_b)
        loss = criterion(pred, y_b)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        total_loss += loss.item() * X_b.size(0)
    return total_loss / len(loader.dataset)


def evaluate(model, loader, criterion, device):
    model.eval()
    total_loss = 0
    preds, actuals = [], []
    with torch.no_grad():
        for X_b, y_b in loader:
            X_b, y_b = X_b.to(device), y_b.to(device)
            pred = model(X_b)
            total_loss += criterion(pred, y_b).item() * X_b.size(0)
            preds.append(pred.cpu().numpy())
            actuals.append(y_b.cpu().numpy())
    return total_loss / len(loader.dataset), np.concatenate(preds), np.concatenate(actuals)


optimizer  = optim.AdamW(model.parameters(), lr=1e-3, weight_decay=1e-5)
criterion  = nn.MSELoss()
scheduler  = ReduceLROnPlateau(optimizer, mode='min', patience=5, factor=0.5)

best_val_loss = float('inf')
for epoch in range(100):
    train_loss = train_epoch(model, train_loader, optimizer, criterion, device)
    val_loss, _, _ = evaluate(model, val_loader, criterion, device)
    scheduler.step(val_loss)

    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), 'best_lstm.pt')

    if (epoch + 1) % 20 == 0:
        print(f"Epoch {epoch+1:3d} | Train: {train_loss:.6f} | Val: {val_loss:.6f}")

4.3 Bidirectional LSTM

Bidirectional LSTM processes sequences in both forward and backward directions. Because it uses future context, it is suited for imputation and classification tasks rather than online forecasting.

bi_model = LSTMForecaster(
    input_size=1, hidden_size=64, num_layers=2,
    output_size=1, pred_len=10, dropout=0.2, bidirectional=True
).to(device)
print(f"BiLSTM parameters: {sum(p.numel() for p in bi_model.parameters()):,}")

5. Temporal Convolutional Network (TCN)

5.1 Dilated and Causal Convolutions

TCN applies convolutional networks to sequences. Compared to LSTMs, TCNs train faster and parallelize easily.

Key concepts:

  • Causal convolution: No look-ahead; only past information is used.
  • Dilated convolution: Gaps between filter taps let the receptive field grow exponentially with depth.
  • Receptive field: 1 + 2 x (kernel_size - 1) x (2^num_layers - 1), assuming two convolutions per block and a dilation of 2^i in block i

class CausalConv1d(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size, dilation=1):
        super().__init__()
        self.padding = (kernel_size - 1) * dilation
        self.conv = nn.Conv1d(
            in_channels, out_channels, kernel_size,
            padding=self.padding, dilation=dilation
        )

    def forward(self, x):
        out = self.conv(x)
        return out[:, :, :-self.padding] if self.padding > 0 else out


class TCNBlock(nn.Module):
    def __init__(self, in_ch, out_ch, kernel_size, dilation, dropout=0.2):
        super().__init__()
        self.conv1    = CausalConv1d(in_ch, out_ch, kernel_size, dilation)
        self.conv2    = CausalConv1d(out_ch, out_ch, kernel_size, dilation)
        self.norm1    = nn.BatchNorm1d(out_ch)
        self.norm2    = nn.BatchNorm1d(out_ch)
        self.dropout  = nn.Dropout(dropout)
        self.relu     = nn.ReLU()
        self.residual = nn.Conv1d(in_ch, out_ch, 1) if in_ch != out_ch else None

    def forward(self, x):
        res = x if self.residual is None else self.residual(x)
        out = self.dropout(self.relu(self.norm1(self.conv1(x))))
        out = self.dropout(self.relu(self.norm2(self.conv2(out))))
        return self.relu(out + res)


class TCNForecaster(nn.Module):
    def __init__(self, input_size, num_channels, kernel_size, pred_len, dropout=0.2):
        super().__init__()
        layers = []
        for i, out_ch in enumerate(num_channels):
            in_ch = input_size if i == 0 else num_channels[i-1]
            layers.append(TCNBlock(in_ch, out_ch, kernel_size, 2**i, dropout))
        self.network      = nn.Sequential(*layers)
        self.output_layer = nn.Linear(num_channels[-1], pred_len)

    def forward(self, x):
        out = self.network(x.permute(0, 2, 1))
        return self.output_layer(out[:, :, -1]).unsqueeze(-1)


tcn_model = TCNForecaster(1, [64, 128, 128, 64], kernel_size=3, pred_len=10).to(device)
receptive_field = 1 + 2 * (3 - 1) * (2**4 - 1)
print(f"TCN receptive field: {receptive_field}")
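
The receptive field value follows from summing what each convolution adds: a kernel of size k with dilation d extends the field by (k - 1) * d, and each block here contains two such convolutions. The sketch below checks that arithmetic and demonstrates the causal property on a hand-written 1-D convolution (function names and filter taps are illustrative assumptions).

```python
import numpy as np

def receptive_field(kernel_size, num_blocks, convs_per_block=2):
    """Receptive field when block i uses dilation 2**i."""
    return 1 + convs_per_block * (kernel_size - 1) * (2**num_blocks - 1)

def causal_dilated_conv(x, weights, dilation):
    """y[t] = sum_j weights[j] * x[t - j*dilation], with zeros before t=0."""
    k = len(weights)
    pad = (k - 1) * dilation
    padded = np.concatenate([np.zeros(pad), x])
    return np.array([
        sum(weights[j] * padded[pad + t - j * dilation] for j in range(k))
        for t in range(len(x))
    ])

print(receptive_field(kernel_size=3, num_blocks=4))   # 61

x = np.arange(1.0, 9.0)                 # 8 time steps
w = np.array([0.5, 0.3, 0.2])           # arbitrary filter taps
y = causal_dilated_conv(x, w, dilation=2)

# Changing a future input must not change earlier outputs
x_changed = x.copy()
x_changed[5] = 100.0
y_changed = causal_dilated_conv(x_changed, w, dilation=2)
print(np.allclose(y[:5], y_changed[:5]))
```

With a look-back window of 60 steps, a receptive field of 61 means the last output position can see the entire input window.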

6. Transformer-Based Time Series

6.1 PatchTST

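PatchTST (2023) divides a time series into fixed-length patches, which overlap when the stride is smaller than the patch length, and feeds them as tokens to a Transformer encoder. Channel independence, meaning each variable is processed as a separate univariate series with shared weights, is a key design choice.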

Core ideas:

  1. Split the series into overlapping patches.
  2. Use each patch as a token.
  3. Learn patch-to-patch relationships with a Transformer Encoder.
  4. Channel independence enables efficient scaling.
class PatchEmbedding(nn.Module):
    def __init__(self, seq_len, patch_len, stride, d_model):
        super().__init__()
        self.patch_len   = patch_len
        self.stride      = stride
        self.num_patches = (seq_len - patch_len) // stride + 1
        self.projection  = nn.Linear(patch_len, d_model)
        self.pos_embed   = nn.Parameter(torch.zeros(1, self.num_patches, d_model))

    def forward(self, x):
        # x: (batch, seq_len, 1)
        patches = x.squeeze(-1).unfold(1, self.patch_len, self.stride)
        return self.projection(patches) + self.pos_embed


class PatchTST(nn.Module):
    def __init__(self, seq_len, pred_len, patch_len=16, stride=8,
                 d_model=128, n_heads=8, num_layers=3, dropout=0.1):
        super().__init__()
        self.patch_embed = PatchEmbedding(seq_len, patch_len, stride, d_model)
        num_patches      = self.patch_embed.num_patches

        enc_layer = nn.TransformerEncoderLayer(
            d_model=d_model, nhead=n_heads,
            dim_feedforward=d_model*4, dropout=dropout, batch_first=True
        )
        self.encoder = nn.TransformerEncoder(enc_layer, num_layers=num_layers)
        self.head    = nn.Linear(num_patches * d_model, pred_len)

    def forward(self, x):
        patches  = self.patch_embed(x)
        encoded  = self.encoder(patches)
        flat     = encoded.flatten(1)
        return self.head(flat).unsqueeze(-1)


patchtst = PatchTST(seq_len=60, pred_len=10, patch_len=12, stride=6).to(device)
print(f"PatchTST parameters: {sum(p.numel() for p in patchtst.parameters()):,}")
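
The patch count used by `PatchEmbedding` can be verified on a plain array. This NumPy sketch mirrors the `unfold` call with `sliding_window_view` (available in NumPy 1.20 and later); the helper names are assumptions for illustration.

```python
import numpy as np

def num_patches(seq_len, patch_len, stride):
    return (seq_len - patch_len) // stride + 1

def make_patches(series, patch_len, stride):
    """Return an array of shape (num_patches, patch_len)."""
    windows = np.lib.stride_tricks.sliding_window_view(series, patch_len)
    return windows[::stride]

series = np.arange(60.0)
patches = make_patches(series, patch_len=12, stride=6)

print(patches.shape)          # (9, 12)
print(patches[1][:3])         # the second patch starts at index 6
```

With seq_len=60, patch_len=12 and stride=6 the encoder sees 9 tokens instead of 60, and each patch shares half of its values with its neighbor.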

6.2 Informer (ProbSparse Attention)

Informer achieves O(L log L) complexity via ProbSparse Attention, making it efficient for long sequences.

class ProbSparseSelfAttention(nn.Module):
    def __init__(self, d_model, n_heads, factor=5):
        super().__init__()
        self.n_heads = n_heads
        self.d_head  = d_model // n_heads
        self.factor  = factor
        self.q_proj  = nn.Linear(d_model, d_model)
        self.k_proj  = nn.Linear(d_model, d_model)
        self.v_proj  = nn.Linear(d_model, d_model)
        self.out     = nn.Linear(d_model, d_model)
        self.scale   = self.d_head ** -0.5

    def forward(self, x):
        B, L, D = x.shape
        Q = self.q_proj(x).view(B, L, self.n_heads, self.d_head).transpose(1, 2)
        K = self.k_proj(x).view(B, L, self.n_heads, self.d_head).transpose(1, 2)
        V = self.v_proj(x).view(B, L, self.n_heads, self.d_head).transpose(1, 2)

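        # Number of "active" queries (and of sampled keys): u = factor * ln(L)
        u = max(1, min(int(self.factor * np.log(L)), L))

        # Score every query against a random sample of u keys, then rank the
        # queries by the sparsity measure M = max(score) - sum(score) / L
        sample_idx = torch.randint(L, (u,), device=x.device)
        scores = torch.matmul(Q, K[:, :, sample_idx].transpose(-2, -1)) * self.scale
        M = scores.max(-1)[0] - scores.sum(-1) / L
        top_idx = M.topk(u, dim=-1, sorted=False)[1]  # (B, n_heads, u), values in [0, L)

        b_idx = torch.arange(B, device=x.device)[:, None, None]
        h_idx = torch.arange(self.n_heads, device=x.device)[None, :, None]
        Q_sparse = Q[b_idx, h_idx, top_idx]
        attn = torch.softmax(torch.matmul(Q_sparse, K.transpose(-2, -1)) * self.scale, dim=-1)

        # Queries that were not selected fall back to the mean of V
        context = V.mean(2, keepdim=True).expand(-1, -1, L, -1).clone()
        context[b_idx, h_idx, top_idx] = torch.matmul(attn, V)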

        context = context.transpose(1, 2).contiguous().view(B, L, D)
        return self.out(context)
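
The idea behind the query selection is that a query whose attention scores are nearly uniform contributes little beyond an average of V, while a query with a few dominant keys carries most of the signal. The toy example below computes the max-minus-mean measure on a hand-made score matrix (all values are illustrative).

```python
import numpy as np

def sparsity_measure(scores):
    """M(q) = max_j score(q, k_j) - mean_j score(q, k_j), one value per query row."""
    return scores.max(axis=-1) - scores.mean(axis=-1)

# Toy score matrix: 4 queries x 6 keys
scores = np.array([
    [0.0, 0.0, 0.0, 0.0, 0.0, 0.0],   # uniform: attention would be flat
    [0.1, 0.0, 0.2, 0.1, 0.0, 0.1],   # nearly uniform
    [0.0, 5.0, 0.0, 0.0, 0.0, 0.0],   # one dominant key
    [3.0, 0.0, 0.0, 0.0, 2.5, 0.0],   # two dominant keys
])

M = sparsity_measure(scores)
u = 2
active_queries = np.argsort(M)[-u:]    # indices of the u largest M values
print(sorted(active_queries.tolist()))
```

Only the selected queries get full attention. The saving comes from choosing u on the order of log L rather than L.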

7. N-BEATS and N-HiTS

7.1 N-BEATS

N-BEATS uses only fully connected (feed-forward) layers arranged in a doubly residual architecture: each block produces a backcast, which is subtracted from the block's input before it is passed on, and a forecast, which is added to the running global forecast.

class TrendBasis(nn.Module):
    def __init__(self, degree, backcast_size, forecast_size):
        super().__init__()
        self.degree = degree

        bt = torch.linspace(0, 1, backcast_size)
        ft = torch.linspace(1, 2, forecast_size)

        bb = torch.stack([bt**i for i in range(degree + 1)], dim=1)
        fb = torch.stack([ft**i for i in range(degree + 1)], dim=1)

        self.register_buffer('backcast_basis', bb)
        self.register_buffer('forecast_basis', fb)

    def forward(self, theta, cast_type):
        basis = self.backcast_basis if cast_type == 'backcast' else self.forecast_basis
        return torch.matmul(theta, basis.T)


class NBeatsBlock(nn.Module):
    def __init__(self, input_size, theta_size, basis,
                 hidden_size=256, num_layers=4):
        super().__init__()
        self.basis = basis

        fc = []
        in_size = input_size
        for _ in range(num_layers):
            fc += [nn.Linear(in_size, hidden_size), nn.ReLU()]
            in_size = hidden_size
        self.fc      = nn.Sequential(*fc)
        self.theta_b = nn.Linear(hidden_size, theta_size)
        self.theta_f = nn.Linear(hidden_size, theta_size)

    def forward(self, x):
        h  = self.fc(x)
        tb = self.theta_b(h)
        tf = self.theta_f(h)
        return self.basis(tb, 'backcast'), self.basis(tf, 'forecast')


class NBeats(nn.Module):
    def __init__(self, backcast_size, forecast_size,
                 hidden_size=256, num_blocks=3, trend_degree=3):
        super().__init__()
        trend_basis  = TrendBasis(trend_degree, backcast_size, forecast_size)
        self.blocks  = nn.ModuleList([
            NBeatsBlock(backcast_size, trend_degree + 1, trend_basis, hidden_size)
            for _ in range(num_blocks)
        ])
        self.generic = nn.ModuleList([
            nn.Sequential(
                nn.Linear(backcast_size, hidden_size), nn.ReLU(),
                nn.Linear(hidden_size, hidden_size), nn.ReLU(),
                nn.Linear(hidden_size, forecast_size)
            ) for _ in range(num_blocks)
        ])
        self.forecast_size = forecast_size

    def forward(self, x):
        residual = x
        forecast = torch.zeros(x.size(0), self.forecast_size, device=x.device)
        for i, block in enumerate(self.blocks):
            backcast, f = block(residual)
            residual    = residual - backcast
            forecast    = forecast + f
        # Generic blocks for remaining residuals
        for g in self.generic:
            forecast = forecast + g(residual)
        return forecast
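
Two pieces of the model are easy to check by hand: the polynomial trend basis and the residual update. The sketch below uses a degree-2 basis on a normalized 0 to 1 grid (the code above uses a 1 to 2 grid for the forecast) and two made-up backcasts; all numbers are assumptions for illustration.

```python
import numpy as np

degree = 2
forecast_size = 5
t = np.linspace(0, 1, forecast_size)                          # normalized time grid
basis = np.stack([t**p for p in range(degree + 1)], axis=1)    # (forecast_size, degree + 1)

theta = np.array([1.0, 2.0, -1.0])     # illustrative coefficients for 1, t, t^2
trend = basis @ theta                  # 1 + 2t - t^2 evaluated on the grid
print(trend)

# Doubly residual update for two toy blocks (backcasts are made up)
x = np.array([4.0, 5.0, 6.0])
backcasts = [np.array([3.0, 4.0, 5.0]), np.array([0.5, 0.5, 0.5])]
residual = x.copy()
for b in backcasts:
    residual = residual - b            # each block sees what earlier blocks left unexplained
print(residual)
```

Each block only has to predict a handful of polynomial coefficients, which is what makes the trend stack interpretable.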

8. Time Series Foundation Models

8.1 TimesFM (Google Research)

TimesFM is a large-scale foundation model developed by Google Research. It is pre-trained on diverse time series corpora, which enables zero-shot forecasting across domains.

def demo_timesfm():
    """
    Conceptual TimesFM usage.
    Install with: pip install timesfm
    Then load the model from HuggingFace: google/timesfm-1.0-200m
    """
    np.random.seed(42)
    n = 512
    t = np.arange(n)
    series = (
        10 + 0.1*t
        + 5*np.sin(2*np.pi*t/52)
        + 2*np.sin(2*np.pi*t/7)
        + np.random.randn(n)
    )

    usage_note = """
    import timesfm

    tfm = timesfm.TimesFm(
        context_len=512, horizon_len=96,
        input_patch_len=32, output_patch_len=128,
        num_layers=20, model_dims=1280,
    )
    tfm.load_from_checkpoint(repo_id="google/timesfm-1.0-200m")

    point_forecast, quantile_forecast = tfm.forecast(
        [series],
        freq=[0],   # 0=high-freq, 1=medium-freq, 2=low-freq
    )
    # point_forecast.shape => (1, 96)
    """
    print("TimesFM: Google Research's time series foundation model")
    print("  - 200M parameter decoder-only architecture")
    print("  - Zero-shot forecasting on unseen domains")
    print("  - Patch-based input (patch_len=32)")
    return series

demo_timesfm()

8.2 Chronos (Amazon)

Amazon's Chronos applies the T5 language model architecture to time series: numerical values are scaled and quantized into a fixed vocabulary of tokens, so forecasting becomes a language modeling problem.

def demo_chronos():
    """
    Conceptual Chronos usage.
    Install: pip install git+https://github.com/amazon-science/chronos-forecasting.git
    """
    usage_note = """
    from chronos import ChronosPipeline
    import torch

    pipeline = ChronosPipeline.from_pretrained(
        "amazon/chronos-t5-small",
        device_map="cpu",
        torch_dtype=torch.bfloat16,
    )

    context = torch.tensor(series[-512:]).unsqueeze(0)
    forecast = pipeline.predict(context=context, prediction_length=24, num_samples=20)

    low, median, high = np.quantile(forecast[0].numpy(), [0.1, 0.5, 0.9], axis=0)
    """
    print("Chronos: Amazon's T5-based time series foundation model")
    print("  - Sizes: tiny, mini, small, base, large (710M)")
    print("  - Tokenizes numerical values (mean scaling + uniform binning)")
    print("  - Probabilistic forecasts via multiple samples")

demo_chronos()
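
The tokenization idea can be sketched in a few lines: scale the context by its mean absolute value, then map each scaled value to a bin id. This is not the library's tokenizer; the function names, the bin count, and the clipping range are hypothetical choices made for the example.

```python
import numpy as np

def tokenize(context, num_bins=100, low=-5.0, high=5.0):
    """Mean-scale a series, then map each value to a uniform bin id."""
    scale = np.mean(np.abs(context))
    scale = scale if scale > 0 else 1.0
    scaled = np.clip(context / scale, low, high)
    edges = np.linspace(low, high, num_bins + 1)
    ids = np.clip(np.digitize(scaled, edges) - 1, 0, num_bins - 1)
    return ids, scale, edges

def detokenize(ids, scale, edges):
    """Map bin ids back to bin centers in the original units."""
    centers = (edges[:-1] + edges[1:]) / 2
    return centers[ids] * scale

context = np.array([10.0, 12.0, 9.0, 15.0, 11.0])
ids, scale, edges = tokenize(context)
restored = detokenize(ids, scale, edges)
bin_width = edges[1] - edges[0]
print(ids, np.round(restored, 2))
```

Once values are token ids, the model can be trained with an ordinary cross-entropy loss, and sampling tokens repeatedly yields the multiple forecast paths used for quantiles.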

8.3 TimeGPT (Nixtla)

def demo_timegpt():
    """
    Conceptual TimeGPT usage.
    Install: pip install nixtla
    """
    usage_note = """
    from nixtla import NixtlaClient

    client = NixtlaClient(api_key='YOUR_KEY')

    forecast_df = client.forecast(
        df=df,          # columns: 'ds', 'y'
        h=24,
        freq='H',
        time_col='ds',
        target_col='y',
    )

    cv_df = client.cross_validation(df=df, h=24, n_windows=3, freq='H')
    """
    print("TimeGPT: Nixtla's time series foundation model (API service)")
    print("  - Anomaly detection support")
    print("  - Uncertainty quantile forecasts")
    print("  - Fine-tuning on proprietary data")

demo_timegpt()

9. Anomaly Detection

9.1 LSTM Autoencoder for Anomaly Detection

class LSTMAutoencoder(nn.Module):
    def __init__(self, seq_len, input_size, hidden_size, num_layers=1):
        super().__init__()
        self.seq_len     = seq_len
        self.hidden_size = hidden_size

        self.encoder = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.decoder = nn.LSTM(hidden_size, hidden_size, num_layers, batch_first=True)
        self.output  = nn.Linear(hidden_size, input_size)

    def forward(self, x):
        _, (h_n, c_n) = self.encoder(x)
        dec_in        = h_n[-1].unsqueeze(1).repeat(1, self.seq_len, 1)
        dec_out, _    = self.decoder(dec_in)
        return self.output(dec_out)


def detect_anomalies(model, data_list, threshold_pct=95, device='cpu'):
    model.eval()
    errors = []
    with torch.no_grad():
        for sample in data_list:
            x    = torch.FloatTensor(sample).unsqueeze(0).to(device)
            recon = model(x)
            errors.append(nn.MSELoss()(recon, x).item())

    errors    = np.array(errors)
    threshold = np.percentile(errors, threshold_pct)
    return errors, threshold, errors > threshold


# Generate data with injected anomalies
np.random.seed(42)
n = 1000
normal = np.sin(np.linspace(0, 8*np.pi, n)) + 0.1*np.random.randn(n)
anomaly_data          = normal.copy()
anomaly_data[300:310] += 3.0   # spike
anomaly_data[600:605]  = 0.0   # signal loss

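# Isolation Forest on raw point values. It is value-based only: the zeroed
# segment at 600-605 lies inside the normal range, so expect it to be missed.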
from sklearn.ensemble import IsolationForest

iso = IsolationForest(contamination=0.05, random_state=42)
predictions  = iso.fit_predict(anomaly_data.reshape(-1, 1))
iso_anomalies = predictions == -1

print(f"Isolation Forest detections: {iso_anomalies.sum()}")
print(f"True anomaly windows: 300-310 (10 pts), 600-605 (5 pts)")

plt.figure(figsize=(14, 4))
plt.plot(anomaly_data, alpha=0.7, label='Data')
plt.scatter(np.where(iso_anomalies)[0], anomaly_data[iso_anomalies],
            color='red', s=30, label='Detected anomalies', zorder=5)
plt.title('Anomaly Detection (Isolation Forest)')
plt.legend()
plt.show()
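
`detect_anomalies` above turns reconstruction errors into flags with a percentile threshold. The sketch below shows that step in isolation on stand-in error values (the error distribution and the anomalous indices are assumptions), along with one caveat of the approach.

```python
import numpy as np

rng = np.random.default_rng(0)
errors = rng.uniform(0.01, 0.05, size=100)   # stand-in reconstruction errors
anomaly_idx = [10, 40, 41, 77, 90]           # hypothetical anomalous windows
errors[anomaly_idx] = [0.9, 1.2, 1.1, 0.8, 1.5]

threshold = np.percentile(errors, 95)
flags = errors > threshold
print(np.where(flags)[0])

# Caveat: a percentile threshold flags about 5% of windows even on clean data
clean = rng.uniform(0.01, 0.05, size=100)
clean_flags = clean > np.percentile(clean, 95)
print(clean_flags.sum())
```

Because of that caveat, one common option is to compute the threshold on errors from data believed to be normal and then apply it unchanged to new data.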

10. Real-World Project: Darts Library

10.1 Unified Forecasting Pipeline with Darts

Darts provides a unified interface for classical and deep learning time series models.

def demo_darts():
    """
    Darts usage example.
    Install: pip install darts
    """
    usage_note = """
    from darts import TimeSeries
    from darts.models import NBEATSModel, TFTModel, TCNModel
    from darts.metrics import mape, rmse
    from darts.dataprocessing.transformers import Scaler
    from darts.datasets import AirPassengersDataset

    series = AirPassengersDataset().load()
    train, test = series[:-24], series[-24:]

    scaler        = Scaler()
    train_scaled  = scaler.fit_transform(train)
    test_scaled   = scaler.transform(test)

    nbeats = NBEATSModel(
        input_chunk_length=36,
        output_chunk_length=12,
        n_epochs=100, random_state=42
    )
    nbeats.fit(train_scaled)
    forecast     = scaler.inverse_transform(nbeats.predict(24))

    print(f"MAPE: {mape(test, forecast):.2f}%")
    print(f"RMSE: {rmse(test, forecast):.4f}")

    # Temporal Fusion Transformer (supports covariates)
    tft = TFTModel(
        input_chunk_length=36, output_chunk_length=12,
        hidden_size=64, lstm_layers=1, num_attention_heads=4,
        n_epochs=100, random_state=42
    )
    """
    print("Darts library: unified time series forecasting")
    print("  - N-BEATS, N-HiTS, TFT, TCN, Transformer, ...")
    print("  - Consistent fit/predict API across all models")

demo_darts()

10.2 Energy Demand Forecasting Pipeline

def create_energy_pipeline():
    """Full energy demand forecasting pipeline (simulated data)."""
    np.random.seed(42)
    n_hours = 24 * 365
    hours   = np.arange(n_hours)

    base     = 5000
    daily    = 500*np.sin(2*np.pi*(hours % 24)/24 - np.pi/2) + 300*np.sin(4*np.pi*(hours % 24)/24)
    weekly   = 200*np.cos(2*np.pi*(hours // 24 % 7)/7)
    seasonal = 1000*np.sin(2*np.pi*hours/n_hours - np.pi/2)
    noise    = 100*np.random.randn(n_hours)
    demand   = np.maximum(base + daily + weekly + seasonal + noise, 1000)

    temperature = (
        20 + 10*np.sin(2*np.pi*hours/n_hours - np.pi/2)
        + 5*np.sin(2*np.pi*(hours % 24)/24)
        + 1.5*np.random.randn(n_hours)
    )

    df = pd.DataFrame({
        'datetime':    pd.date_range('2023-01-01', periods=n_hours, freq='h'),
        'demand':      demand,
        'temperature': temperature,
        'hour':        hours % 24,
        'dow':         (hours // 24) % 7,
        'month':       pd.date_range('2023-01-01', periods=n_hours, freq='h').month
    }).set_index('datetime')

    df['lag_1']   = df['demand'].shift(1)
    df['lag_24']  = df['demand'].shift(24)
    df['lag_168'] = df['demand'].shift(168)
    df['roll_24'] = df['demand'].rolling(24).mean()
    df.dropna(inplace=True)

    features = ['demand', 'temperature', 'hour', 'dow', 'month',
                'lag_1', 'lag_24', 'lag_168', 'roll_24']

    scaler = StandardScaler()
    scaled = scaler.fit_transform(df[features])

    X, y = create_sequences(scaled, seq_len=168, pred_len=24)
    y    = y[:, :, :1]  # target = demand only

    print(f"Input  shape: {X.shape}")
    print(f"Target shape: {y.shape}")
    return df, scaled, X, y, scaler

energy_df, energy_scaled, X_e, y_e, e_scaler = create_energy_pipeline()

10.3 Model Benchmark Summary

benchmark = pd.DataFrame({
    'Model':            ['ARIMA', 'Prophet', 'LSTM', 'TCN', 'PatchTST', 'TimesFM (zero-shot)'],
    'RMSE':             [0.312,   0.289,     0.198,  0.185,  0.162,      0.215],
    'MAE':              [0.241,   0.218,     0.152,  0.141,  0.121,      0.163],
    'Train Time (min)': [1.2,     2.1,       15.3,   8.7,    12.4,       0.0],
})
print(benchmark.to_string(index=False))
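
For reference, the two error metrics in the table are defined as follows. The numbers in this sketch are a made-up worked example and are unrelated to the benchmark values above.

```python
import numpy as np

def rmse(actual, predicted):
    diff = np.asarray(actual) - np.asarray(predicted)
    return float(np.sqrt(np.mean(diff ** 2)))

def mae(actual, predicted):
    diff = np.asarray(actual) - np.asarray(predicted)
    return float(np.mean(np.abs(diff)))

actual = np.array([10.0, 12.0, 14.0, 16.0])
predicted = np.array([9.0, 13.0, 11.0, 19.0])   # errors: 1, -1, 3, -3

print(rmse(actual, predicted))   # sqrt(5), about 2.236
print(mae(actual, predicted))    # 2.0
```

RMSE is larger than MAE here because squaring weights the two errors of size 3 more heavily. A large gap between the two metrics usually points to a few big misses.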

Closing Thoughts

This guide has walked through the full spectrum of time series analysis.

Learning Roadmap Recap:

  1. Foundations: Stationarity, ACF/PACF, decomposition
  2. Classical methods: ARIMA, SARIMA, Prophet — always establish a baseline first
  3. Deep learning basics: LSTM, TCN for nonlinear patterns
  4. Advanced architectures: PatchTST and N-BEATS, strong and widely used open-source architectures
  5. Foundation models: TimesFM, Chronos for zero-shot forecasting

Practical tips:

  • Always build a baseline with a simple model (ARIMA, Prophet) before going deep.
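  • Deep learning tends to pay off only with enough data; as a rough rule of thumb, at least a thousand observations.
  • PatchTST and N-BEATS are among the stronger open-source options; benchmark them against your baseline on your own data.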
  • Foundation models excel when domain-specific data is scarce.
