- Authors

- Name
- Youngju Kim
- @fjvbn20031
はじめに
時系列データはあらゆる場所に存在します。株価、気温、エネルギー需要、交通量、医療信号など、枚挙に暇がありません。深層学習の近年の進歩により、時系列予測分野は急速に発展し、LSTMからTransformer、そしてTimesFMのような基盤モデルまで、多彩なツールが登場しています。
本ガイドでは、時系列分析の基礎から最新の基盤モデルまで、段階を追って解説します。各セクションには実行可能なPythonコードが含まれています。
1. 時系列データの基礎
1.1 定義と特性
時系列とは、時系列順にインデックス付けされたデータ点の系列です。通常のデータとの最大の違いは時間的依存性です。現在の値は過去の値に依存します。
主な特性:
- 順序依存性: データ点の時系列順序が重要
- 自己相関: 過去の値が将来の値を予測する情報を持つ
- 季節性: 一定間隔で繰り返すパターン
- トレンド: 長期的な方向性
- 非定常性: 統計的性質が時間とともに変化する
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from statsmodels.tsa.seasonal import seasonal_decompose
# 合成時系列データの生成
np.random.seed(42)
dates = pd.date_range(start='2020-01-01', periods=365*3, freq='D')
trend = np.linspace(10, 50, len(dates))
seasonality = 10 * np.sin(2 * np.pi * np.arange(len(dates)) / 365)
noise = np.random.normal(0, 2, len(dates))
series = trend + seasonality + noise
ts = pd.Series(series, index=dates, name='value')
# 時系列の分解
decomp = seasonal_decompose(ts, model='additive', period=365)
fig, axes = plt.subplots(4, 1, figsize=(12, 10))
decomp.observed.plot(ax=axes[0], title='観測値')
decomp.trend.plot(ax=axes[1], title='トレンド')
decomp.seasonal.plot(ax=axes[2], title='季節性')
decomp.resid.plot(ax=axes[3], title='残差')
plt.tight_layout()
plt.show()
1.2 トレンド・季節性・残差
時系列分解により系列を3つの成分に分離します。
加法モデル: Y(t) = Trend(t) + Seasonal(t) + Residual(t)
乗法モデル: Y(t) = Trend(t) x Seasonal(t) x Residual(t)
季節変動がトレンド水準に比例して拡大する場合は乗法モデルを使用し、そうでない場合は加法モデルを使用します。
1.3 定常性とADFテスト
定常時系列とは、時間を通じて平均・分散・自己共分散が一定の系列です。ほとんどの古典的統計モデルは定常性を前提とします。
ADF(拡張ディッキー・フラー)検定は単位根の存在を検証します。
- 帰無仮説: 単位根が存在する(非定常)
- p値 < 0.05 → 帰無仮説を棄却 → 定常系列
from statsmodels.tsa.stattools import adfuller
def check_stationarity(series, name='series'):
"""ADFテストで定常性を確認"""
result = adfuller(series.dropna())
print(f"\n{'='*50}")
print(f"系列: {name}")
print(f"{'='*50}")
print(f"ADF統計量: {result[0]:.4f}")
print(f"p値: {result[1]:.4f}")
print("臨界値:")
for key, val in result[4].items():
print(f" {key}: {val:.4f}")
if result[1] < 0.05:
print("結論: 定常(帰無仮説を棄却)")
else:
print("結論: 非定常(帰無仮説を棄却できず)")
return result[1] < 0.05
# 非定常な元の系列
check_stationarity(ts, '元の系列')
# 1階差分で定常化
diff_series = ts.diff().dropna()
check_stationarity(diff_series, '1階差分系列')
1.4 自己相関・ACF・PACF
ACF(自己相関関数): 系列とそのラグ値の相関。 PACF(偏自己相関関数): 中間ラグの影響を除いた各ラグでの直接相関。
ACFとPACFのプロットは、ARIMAモデルの(p, q)次数の選択を導きます。
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
fig, axes = plt.subplots(2, 1, figsize=(12, 8))
plot_acf(diff_series, lags=40, ax=axes[0], title='ACF(自己相関関数)')
plot_pacf(diff_series, lags=40, ax=axes[1], title='PACF(偏自己相関関数)')
plt.tight_layout()
plt.show()
# 解釈ガイド:
# AR(p): PACFがラグpで切断、ACFは徐々に減衰
# MA(q): ACFがラグqで切断、PACFは徐々に減衰
# ARMA(p,q): 両方が徐々に減衰
2. 古典的時系列モデル
2.1 AR・MA・ARMA・ARIMA
AR(p) — 自己回帰モデル: 現在値は過去p値の線形結合。
MA(q) — 移動平均モデル: 現在値は過去q個の誤差項の線形結合。
ARMA(p,q): ARとMAを組み合わせたモデル。
ARIMA(p,d,q): d回差分を取って定常化した後、ARMAを適用。
from statsmodels.tsa.arima.model import ARIMA
from sklearn.metrics import mean_squared_error
import warnings
warnings.filterwarnings('ignore')
# CO2データセット
from statsmodels.datasets import co2
data = co2.load_pandas().data
data = data.resample('MS').mean().fillna(method='ffill')
# 学習/テスト分割
train = data.iloc[:-24]
test = data.iloc[-24:]
# ARIMA適合(ACF/PACF分析から次数を選択)
model = ARIMA(train, order=(2, 1, 2))
result = model.fit()
print(result.summary())
# 予測
forecast = result.forecast(steps=24)
rmse = np.sqrt(mean_squared_error(test['co2'], forecast))
print(f"\nRMSE: {rmse:.4f}")
# プロット
plt.figure(figsize=(12, 5))
plt.plot(train.index[-60:], train['co2'].iloc[-60:], label='学習データ')
plt.plot(test.index, test['co2'], label='実際の値', color='green')
plt.plot(test.index, forecast, label='ARIMA予測', color='red', linestyle='--')
plt.legend()
plt.title('ARIMA予測')
plt.show()
2.2 SARIMA
SARIMA(p, d, q)(P, D, Q, s)はARIMAに季節パラメータを加えたモデルです。sは季節周期です。
from statsmodels.tsa.statespace.sarimax import SARIMAX
sarima_model = SARIMAX(
train,
order=(1, 1, 1),
seasonal_order=(1, 1, 1, 12),
enforce_stationarity=False,
enforce_invertibility=False
)
sarima_result = sarima_model.fit(disp=False)
sarima_forecast = sarima_result.forecast(steps=24)
sarima_rmse = np.sqrt(mean_squared_error(test['co2'], sarima_forecast))
print(f"SARIMA RMSE: {sarima_rmse:.4f}")
2.3 Prophet(Meta/Facebook)
Prophetはビジネス時系列向けライブラリで、祝日や複数の季節性を自動的に処理します。
from prophet import Prophet
prophet_df = data.reset_index()
prophet_df.columns = ['ds', 'y']
prophet_train = prophet_df.iloc[:-24]
model_p = Prophet(
yearly_seasonality=True,
weekly_seasonality=False,
daily_seasonality=False,
changepoint_prior_scale=0.05
)
model_p.fit(prophet_train)
future = model_p.make_future_dataframe(periods=24, freq='MS')
forecast_p = model_p.predict(future)
prophet_pred = forecast_p.iloc[-24:]['yhat'].values
prophet_actual = prophet_df.iloc[-24:]['y'].values
prophet_rmse = np.sqrt(mean_squared_error(prophet_actual, prophet_pred))
print(f"Prophet RMSE: {prophet_rmse:.4f}")
3. 時系列の深層学習前処理
3.1 正規化
深層学習モデルは入力スケールに敏感です。
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import MinMaxScaler, StandardScaler
np.random.seed(42)
n_samples = 1000
t = np.linspace(0, 4*np.pi, n_samples)
signal = np.sin(t) + 0.5*np.sin(3*t) + 0.1*np.random.randn(n_samples)
signal = signal.reshape(-1, 1)
minmax_scaler = MinMaxScaler(feature_range=(0, 1))
signal_minmax = minmax_scaler.fit_transform(signal)
standard_scaler = StandardScaler()
signal_standard = standard_scaler.fit_transform(signal)
print(f"元のレンジ: [{signal.min():.3f}, {signal.max():.3f}]")
print(f"MinMaxレンジ: [{signal_minmax.min():.3f}, {signal_minmax.max():.3f}]")
print(f"Standardレンジ: [{signal_standard.min():.3f}, {signal_standard.max():.3f}]")
3.2 ウィンドウスライシング
def create_sequences(data, seq_len, pred_len=1, step=1):
"""
スライディングウィンドウシーケンスを作成。
Args:
data: (N, features) 配列
seq_len: ルックバックウィンドウ長
pred_len: 予測ホライゾン
step: ウィンドウのストライド
Returns:
X: (samples, seq_len, features)
y: (samples, pred_len, features)
"""
X, y = [], []
for i in range(0, len(data) - seq_len - pred_len + 1, step):
X.append(data[i:i+seq_len])
y.append(data[i+seq_len:i+seq_len+pred_len])
return np.array(X), np.array(y)
seq_len = 60
pred_len = 10
X, y = create_sequences(signal_standard, seq_len, pred_len)
print(f"X shape: {X.shape}") # (samples, 60, 1)
print(f"y shape: {y.shape}") # (samples, 10, 1)
train_size = int(0.7 * len(X))
val_size = int(0.15 * len(X))
X_train, y_train = X[:train_size], y[:train_size]
X_val, y_val = X[train_size:train_size+val_size], y[train_size:train_size+val_size]
X_test, y_test = X[train_size+val_size:], y[train_size+val_size:]
3.3 PyTorch DatasetとDataLoader
class TimeSeriesDataset(Dataset):
def __init__(self, X, y):
self.X = torch.FloatTensor(X)
self.y = torch.FloatTensor(y)
def __len__(self):
return len(self.X)
def __getitem__(self, idx):
return self.X[idx], self.y[idx]
batch_size = 32
train_loader = DataLoader(TimeSeriesDataset(X_train, y_train), batch_size=batch_size, shuffle=True)
val_loader = DataLoader(TimeSeriesDataset(X_val, y_val), batch_size=batch_size, shuffle=False)
test_loader = DataLoader(TimeSeriesDataset(X_test, y_test), batch_size=batch_size, shuffle=False)
3.4 多変量時系列
np.random.seed(42)
n = 2000
time = np.arange(n)
temp = 20 + 10*np.sin(2*np.pi*time/365) + np.random.randn(n)
humidity = 60 + 20*np.cos(2*np.pi*time/365) + np.random.randn(n)
pressure = 1013 + 5*np.sin(2*np.pi*time/180) + np.random.randn(n)
mv_df = pd.DataFrame({'temperature': temp, 'humidity': humidity, 'pressure': pressure})
scaler_multi = StandardScaler()
mv_scaled = scaler_multi.fit_transform(mv_df)
X_mv, y_mv = create_sequences(mv_scaled, seq_len=60, pred_len=10)
print(f"多変量 X shape: {X_mv.shape}") # (samples, 60, 3)
print(f"多変量 y shape: {y_mv.shape}") # (samples, 10, 3)
4. LSTMによる時系列予測
4.1 LSTMが時系列に適している理由
LSTM(Long Short-Term Memory)は3つのゲート(入力・忘却・出力)を通じて、通常のRNNにおける勾配消失問題を解決し、長い時間ホライゾンにわたって重要な情報を保持できます。
時系列に対する強み:
- 逐次的なパターンをエンドツーエンドで学習
- 短期・長期の依存関係を両方捉える
- 可変長シーケンスを自然に扱える
4.2 LSTMの完全な実装
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau
class LSTMForecaster(nn.Module):
def __init__(self, input_size, hidden_size, num_layers, output_size,
pred_len, dropout=0.2, bidirectional=False):
super().__init__()
self.hidden_size = hidden_size
self.num_layers = num_layers
self.pred_len = pred_len
self.num_directions = 2 if bidirectional else 1
self.lstm = nn.LSTM(
input_size=input_size,
hidden_size=hidden_size,
num_layers=num_layers,
batch_first=True,
dropout=dropout if num_layers > 1 else 0,
bidirectional=bidirectional
)
self.layer_norm = nn.LayerNorm(hidden_size * self.num_directions)
self.fc = nn.Sequential(
nn.Linear(hidden_size * self.num_directions, 128),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(128, pred_len * output_size)
)
self.output_size = output_size
def forward(self, x):
batch_size = x.size(0)
lstm_out, _ = self.lstm(x)
last = self.layer_norm(lstm_out[:, -1, :])
out = self.fc(last)
return out.view(batch_size, self.pred_len, self.output_size)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"デバイス: {device}")
model = LSTMForecaster(
input_size=1, hidden_size=128, num_layers=2,
output_size=1, pred_len=10, dropout=0.2
).to(device)
print(f"総パラメータ数: {sum(p.numel() for p in model.parameters()):,}")
def train_epoch(model, loader, optimizer, criterion, device):
model.train()
total_loss = 0
for X_b, y_b in loader:
X_b, y_b = X_b.to(device), y_b.to(device)
optimizer.zero_grad()
pred = model(X_b)
loss = criterion(pred, y_b)
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
optimizer.step()
total_loss += loss.item() * X_b.size(0)
return total_loss / len(loader.dataset)
def evaluate(model, loader, criterion, device):
model.eval()
total_loss = 0
preds, actuals = [], []
with torch.no_grad():
for X_b, y_b in loader:
X_b, y_b = X_b.to(device), y_b.to(device)
pred = model(X_b)
total_loss += criterion(pred, y_b).item() * X_b.size(0)
preds.append(pred.cpu().numpy())
actuals.append(y_b.cpu().numpy())
return total_loss / len(loader.dataset), np.concatenate(preds), np.concatenate(actuals)
optimizer = optim.AdamW(model.parameters(), lr=1e-3, weight_decay=1e-5)
criterion = nn.MSELoss()
scheduler = ReduceLROnPlateau(optimizer, mode='min', patience=5, factor=0.5)
best_val_loss = float('inf')
for epoch in range(100):
train_loss = train_epoch(model, train_loader, optimizer, criterion, device)
val_loss, _, _ = evaluate(model, val_loader, criterion, device)
scheduler.step(val_loss)
if val_loss < best_val_loss:
best_val_loss = val_loss
torch.save(model.state_dict(), 'best_lstm.pt')
if (epoch + 1) % 20 == 0:
print(f"Epoch {epoch+1:3d} | Train: {train_loss:.6f} | Val: {val_loss:.6f}")
4.3 双方向LSTM
双方向LSTMは順方向と逆方向の両方向でシーケンスを処理します。将来のコンテキストを使用するため、オンライン予測よりも補完や分類タスクに適しています。
bi_model = LSTMForecaster(
input_size=1, hidden_size=64, num_layers=2,
output_size=1, pred_len=10, dropout=0.2, bidirectional=True
).to(device)
print(f"BiLSTMパラメータ数: {sum(p.numel() for p in bi_model.parameters()):,}")
5. Temporal Convolutional Network(TCN)
5.1 拡張因果畳み込み
TCNはシーケンスに畳み込みネットワークを適用します。LSTMと比べて学習が速く、並列化も容易です。
主要概念:
- 因果畳み込み: 先読みなし。過去の情報のみを使用。
- 拡張畳み込み: フィルタのタップ間のギャップが受容野を指数関数的に拡大。
- 受容野: (kernel_size - 1) x 2^(num_layers-1) x num_layers
class CausalConv1d(nn.Module):
def __init__(self, in_channels, out_channels, kernel_size, dilation=1):
super().__init__()
self.padding = (kernel_size - 1) * dilation
self.conv = nn.Conv1d(
in_channels, out_channels, kernel_size,
padding=self.padding, dilation=dilation
)
def forward(self, x):
out = self.conv(x)
return out[:, :, :-self.padding] if self.padding > 0 else out
class TCNBlock(nn.Module):
def __init__(self, in_ch, out_ch, kernel_size, dilation, dropout=0.2):
super().__init__()
self.conv1 = CausalConv1d(in_ch, out_ch, kernel_size, dilation)
self.conv2 = CausalConv1d(out_ch, out_ch, kernel_size, dilation)
self.norm1 = nn.BatchNorm1d(out_ch)
self.norm2 = nn.BatchNorm1d(out_ch)
self.dropout = nn.Dropout(dropout)
self.relu = nn.ReLU()
self.residual = nn.Conv1d(in_ch, out_ch, 1) if in_ch != out_ch else None
def forward(self, x):
res = x if self.residual is None else self.residual(x)
out = self.dropout(self.relu(self.norm1(self.conv1(x))))
out = self.dropout(self.relu(self.norm2(self.conv2(out))))
return self.relu(out + res)
class TCNForecaster(nn.Module):
def __init__(self, input_size, num_channels, kernel_size, pred_len, dropout=0.2):
super().__init__()
layers = []
for i, out_ch in enumerate(num_channels):
in_ch = input_size if i == 0 else num_channels[i-1]
layers.append(TCNBlock(in_ch, out_ch, kernel_size, 2**i, dropout))
self.network = nn.Sequential(*layers)
self.output_layer = nn.Linear(num_channels[-1], pred_len)
def forward(self, x):
out = self.network(x.permute(0, 2, 1))
return self.output_layer(out[:, :, -1]).unsqueeze(-1)
tcn_model = TCNForecaster(1, [64, 128, 128, 64], kernel_size=3, pred_len=10).to(device)
receptive_field = 1 + 2 * (3 - 1) * (2**4 - 1)
print(f"TCN受容野: {receptive_field}")
6. Transformerベースの時系列
6.1 PatchTST
PatchTST(2023)は時系列を重複パッチに分割し、Transformer Encoderへトークンとして入力します。チャネル独立性(各変数を独立して処理すること)が重要な設計選択です。
核心アイデア:
- 系列を重複パッチに分割。
- 各パッチをトークンとして使用。
- Transformer Encoderでパッチ間の関係を学習。
- チャネル独立性により効率的なスケーリングが可能。
class PatchEmbedding(nn.Module):
def __init__(self, seq_len, patch_len, stride, d_model):
super().__init__()
self.patch_len = patch_len
self.stride = stride
self.num_patches = (seq_len - patch_len) // stride + 1
self.projection = nn.Linear(patch_len, d_model)
self.pos_embed = nn.Parameter(torch.zeros(1, self.num_patches, d_model))
def forward(self, x):
# x: (batch, seq_len, 1)
patches = x.squeeze(-1).unfold(1, self.patch_len, self.stride)
return self.projection(patches) + self.pos_embed
class PatchTST(nn.Module):
def __init__(self, seq_len, pred_len, patch_len=16, stride=8,
d_model=128, n_heads=8, num_layers=3, dropout=0.1):
super().__init__()
self.patch_embed = PatchEmbedding(seq_len, patch_len, stride, d_model)
num_patches = self.patch_embed.num_patches
enc_layer = nn.TransformerEncoderLayer(
d_model=d_model, nhead=n_heads,
dim_feedforward=d_model*4, dropout=dropout, batch_first=True
)
self.encoder = nn.TransformerEncoder(enc_layer, num_layers=num_layers)
self.head = nn.Linear(num_patches * d_model, pred_len)
def forward(self, x):
patches = self.patch_embed(x)
encoded = self.encoder(patches)
flat = encoded.flatten(1)
return self.head(flat).unsqueeze(-1)
patchtst = PatchTST(seq_len=60, pred_len=10, patch_len=12, stride=6).to(device)
print(f"PatchTSTパラメータ数: {sum(p.numel() for p in patchtst.parameters()):,}")
6.2 Informer(ProbSparse Attention)
InformerはProbSparse AttentionによりO(L log L)の計算量を実現し、長いシーケンスに対して効率的です。
class ProbSparseSelfAttention(nn.Module):
def __init__(self, d_model, n_heads, factor=5):
super().__init__()
self.n_heads = n_heads
self.d_head = d_model // n_heads
self.factor = factor
self.q_proj = nn.Linear(d_model, d_model)
self.k_proj = nn.Linear(d_model, d_model)
self.v_proj = nn.Linear(d_model, d_model)
self.out = nn.Linear(d_model, d_model)
self.scale = self.d_head ** -0.5
def forward(self, x):
B, L, D = x.shape
Q = self.q_proj(x).view(B, L, self.n_heads, self.d_head).transpose(1, 2)
K = self.k_proj(x).view(B, L, self.n_heads, self.d_head).transpose(1, 2)
V = self.v_proj(x).view(B, L, self.n_heads, self.d_head).transpose(1, 2)
u = max(1, min(int(self.factor * np.log(L)), L))
scores = torch.matmul(Q[:, :, :u], K.transpose(-2, -1)) * self.scale
M = scores.max(-1)[0] - torch.div(scores.sum(-1), L)
top_idx = M.topk(u, dim=-1, sorted=False)[1]
Q_sparse = Q[torch.arange(B)[:, None, None],
torch.arange(self.n_heads)[None, :, None], top_idx]
attn = torch.softmax(torch.matmul(Q_sparse, K.transpose(-2, -1)) * self.scale, dim=-1)
context = V.mean(2, keepdim=True).expand(-1, -1, L, -1).clone()
context[torch.arange(B)[:, None, None],
torch.arange(self.n_heads)[None, :, None], top_idx] = torch.matmul(attn, V)
context = context.transpose(1, 2).contiguous().view(B, L, D)
return self.out(context)
7. N-BEATSとN-HiTS
7.1 N-BEATS
N-BEATSは完全結合(フィードフォワード)層のみを使用した逆残差アーキテクチャです。各ブロックはバックキャスト(入力から自分の寄与を除去)と予測(グローバル予測アキュムレータに加算)の両方を予測します。
class TrendBasis(nn.Module):
def __init__(self, degree, backcast_size, forecast_size):
super().__init__()
self.degree = degree
bt = torch.linspace(0, 1, backcast_size)
ft = torch.linspace(1, 2, forecast_size)
bb = torch.stack([bt**i for i in range(degree + 1)], dim=1)
fb = torch.stack([ft**i for i in range(degree + 1)], dim=1)
self.register_buffer('backcast_basis', bb)
self.register_buffer('forecast_basis', fb)
def forward(self, theta, cast_type):
basis = self.backcast_basis if cast_type == 'backcast' else self.forecast_basis
return torch.matmul(theta, basis.T)
class NBeatsBlock(nn.Module):
def __init__(self, input_size, theta_size, basis,
hidden_size=256, num_layers=4):
super().__init__()
self.basis = basis
fc = []
in_size = input_size
for _ in range(num_layers):
fc += [nn.Linear(in_size, hidden_size), nn.ReLU()]
in_size = hidden_size
self.fc = nn.Sequential(*fc)
self.theta_b = nn.Linear(hidden_size, theta_size)
self.theta_f = nn.Linear(hidden_size, theta_size)
def forward(self, x):
h = self.fc(x)
tb = self.theta_b(h)
tf = self.theta_f(h)
return self.basis(tb, 'backcast'), self.basis(tf, 'forecast')
class NBeats(nn.Module):
def __init__(self, backcast_size, forecast_size,
hidden_size=256, num_blocks=3, trend_degree=3):
super().__init__()
trend_basis = TrendBasis(trend_degree, backcast_size, forecast_size)
self.blocks = nn.ModuleList([
NBeatsBlock(backcast_size, trend_degree + 1, trend_basis, hidden_size)
for _ in range(num_blocks)
])
self.generic = nn.ModuleList([
nn.Sequential(
nn.Linear(backcast_size, hidden_size), nn.ReLU(),
nn.Linear(hidden_size, hidden_size), nn.ReLU(),
nn.Linear(hidden_size, forecast_size)
) for _ in range(num_blocks)
])
self.forecast_size = forecast_size
def forward(self, x):
residual = x
forecast = torch.zeros(x.size(0), self.forecast_size, device=x.device)
for i, block in enumerate(self.blocks):
backcast, f = block(residual)
residual = residual - backcast
forecast = forecast + f
# 残差に対するジェネリックブロック
for g in self.generic:
forecast = forecast + g(residual)
return forecast
8. 時系列基盤モデル
8.1 TimesFM(Google DeepMind)
TimesFMはGoogle DeepMindが開発した大規模基盤モデルで、多様な時系列コーパスで事前学習されており、ドメインをまたいだゼロショット予測が可能です。
def demo_timesfm():
"""
TimesFMの使用例(概念的)。
インストール: pip install timesfm
HuggingFaceからモデルを読み込む: google/timesfm-1.0-200m
"""
np.random.seed(42)
n = 512
t = np.arange(n)
series = (
10 + 0.1*t
+ 5*np.sin(2*np.pi*t/52)
+ 2*np.sin(2*np.pi*t/7)
+ np.random.randn(n)
)
usage_note = """
import timesfm
tfm = timesfm.TimesFm(
context_len=512, horizon_len=96,
input_patch_len=32, output_patch_len=128,
num_layers=20, model_dims=1280,
)
tfm.load_from_checkpoint(repo_id="google/timesfm-1.0-200m")
point_forecast, quantile_forecast = tfm.forecast(
[series],
freq=[0], # 0=高頻度, 1=中頻度, 2=低頻度
)
# point_forecast.shape => (1, 96)
"""
print("TimesFM: Google DeepMindの時系列基盤モデル")
print(" - 2億パラメータのデコーダーオンリーアーキテクチャ")
print(" - 未見ドメインへのゼロショット予測")
print(" - パッチベース入力(patch_len=32)")
return series
demo_timesfm()
8.2 Chronos(Amazon)
AmazonのChronosはT5言語モデルアーキテクチャを時系列に応用し、数値をトークン化することで、予測を言語モデリング問題として扱います。
def demo_chronos():
"""
Chronosの使用例(概念的)。
インストール: pip install git+https://github.com/amazon-science/chronos-forecasting.git
"""
usage_note = """
from chronos import ChronosPipeline
import torch
pipeline = ChronosPipeline.from_pretrained(
"amazon/chronos-t5-small",
device_map="cpu",
torch_dtype=torch.bfloat16,
)
context = torch.tensor(series[-512:]).unsqueeze(0)
forecast = pipeline.predict(context=context, prediction_length=24, num_samples=20)
low, median, high = np.quantile(forecast[0].numpy(), [0.1, 0.5, 0.9], axis=0)
"""
print("Chronos: AmazonのT5ベース時系列基盤モデル")
print(" - サイズ: tiny, small, base, large (7.1億パラメータ)")
print(" - 数値を量子化ビンでトークン化")
print(" - 複数サンプルによる確率的予測")
demo_chronos()
8.3 TimeGPT(Nixtla)
def demo_timegpt():
"""
TimeGPTの使用例(概念的)。
インストール: pip install nixtla
"""
usage_note = """
from nixtla import NixtlaClient
client = NixtlaClient(api_key='YOUR_KEY')
forecast_df = client.forecast(
df=df, # カラム: 'ds', 'y'
h=24,
freq='H',
time_col='ds',
target_col='y',
)
cv_df = client.cross_validation(df=df, h=24, n_windows=3, freq='H')
"""
print("TimeGPT: NixtlaのAPIサービス型時系列基盤モデル")
print(" - 異常検知サポート")
print(" - 不確実性の分位数予測")
print(" - 独自データへのファインチューニング")
demo_timegpt()
9. 異常検知
9.1 LSTMオートエンコーダによる異常検知
class LSTMAutoencoder(nn.Module):
def __init__(self, seq_len, input_size, hidden_size, num_layers=1):
super().__init__()
self.seq_len = seq_len
self.hidden_size = hidden_size
self.encoder = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
self.decoder = nn.LSTM(hidden_size, hidden_size, num_layers, batch_first=True)
self.output = nn.Linear(hidden_size, input_size)
def forward(self, x):
_, (h_n, c_n) = self.encoder(x)
dec_in = h_n[-1].unsqueeze(1).repeat(1, self.seq_len, 1)
dec_out, _ = self.decoder(dec_in)
return self.output(dec_out)
def detect_anomalies(model, data_list, threshold_pct=95, device='cpu'):
model.eval()
errors = []
with torch.no_grad():
for sample in data_list:
x = torch.FloatTensor(sample).unsqueeze(0).to(device)
recon = model(x)
errors.append(nn.MSELoss()(recon, x).item())
errors = np.array(errors)
threshold = np.percentile(errors, threshold_pct)
return errors, threshold, errors > threshold
# 異常を注入したデータを生成
np.random.seed(42)
n = 1000
normal = np.sin(np.linspace(0, 8*np.pi, n)) + 0.1*np.random.randn(n)
anomaly_data = normal.copy()
anomaly_data[300:310] += 3.0 # スパイク
anomaly_data[600:605] = 0.0 # 信号消失
# Isolation Forest
from sklearn.ensemble import IsolationForest
iso = IsolationForest(contamination=0.05, random_state=42)
predictions = iso.fit_predict(anomaly_data.reshape(-1, 1))
iso_anomalies = predictions == -1
print(f"Isolation Forest検出数: {iso_anomalies.sum()}")
print(f"真の異常ウィンドウ: 300-310(10点), 600-605(5点)")
plt.figure(figsize=(14, 4))
plt.plot(anomaly_data, alpha=0.7, label='データ')
plt.scatter(np.where(iso_anomalies)[0], anomaly_data[iso_anomalies],
color='red', s=30, label='検出された異常', zorder=5)
plt.title('異常検知(Isolation Forest)')
plt.legend()
plt.show()
10. 実践プロジェクト: Dartsライブラリ
10.1 Dartsによる統一予測パイプライン
Dartsは古典的・深層学習時系列モデルのための統一インターフェースを提供します。
def demo_darts():
"""
Dartsの使用例。
インストール: pip install darts
"""
usage_note = """
from darts import TimeSeries
from darts.models import NBEATSModel, TFTModel, TCNModel
from darts.metrics import mape, rmse
from darts.dataprocessing.transformers import Scaler
from darts.datasets import AirPassengersDataset
series = AirPassengersDataset().load()
train, test = series[:-24], series[-24:]
scaler = Scaler()
train_scaled = scaler.fit_transform(train)
test_scaled = scaler.transform(test)
nbeats = NBEATSModel(
input_chunk_length=36,
output_chunk_length=12,
n_epochs=100, random_state=42
)
nbeats.fit(train_scaled)
forecast = scaler.inverse_transform(nbeats.predict(24))
print(f"MAPE: {mape(test, forecast):.2f}%")
print(f"RMSE: {rmse(test, forecast):.4f}")
# Temporal Fusion Transformer(共変量をサポート)
tft = TFTModel(
input_chunk_length=36, output_chunk_length=12,
hidden_size=64, lstm_layers=1, num_attention_heads=4,
n_epochs=100, random_state=42
)
"""
print("Dartsライブラリ: 統一時系列予測")
print(" - N-BEATS, TFT, TCN, Transformer, NATS, ...")
print(" - 全モデルで一貫したfit/predict API")
demo_darts()
10.2 エネルギー需要予測パイプライン
def create_energy_pipeline():
"""完全なエネルギー需要予測パイプライン(シミュレーションデータ)。"""
np.random.seed(42)
n_hours = 24 * 365
hours = np.arange(n_hours)
base = 5000
daily = 500*np.sin(2*np.pi*(hours % 24)/24 - np.pi/2) + 300*np.sin(4*np.pi*(hours % 24)/24)
weekly = 200*np.cos(2*np.pi*(hours // 24 % 7)/7)
seasonal = 1000*np.sin(2*np.pi*hours/n_hours - np.pi/2)
noise = 100*np.random.randn(n_hours)
demand = np.maximum(base + daily + weekly + seasonal + noise, 1000)
temperature = (
20 + 10*np.sin(2*np.pi*hours/n_hours - np.pi/2)
+ 5*np.sin(2*np.pi*(hours % 24)/24)
+ 1.5*np.random.randn(n_hours)
)
df = pd.DataFrame({
'datetime': pd.date_range('2023-01-01', periods=n_hours, freq='h'),
'demand': demand,
'temperature': temperature,
'hour': hours % 24,
'dow': (hours // 24) % 7,
'month': pd.date_range('2023-01-01', periods=n_hours, freq='h').month
}).set_index('datetime')
df['lag_1'] = df['demand'].shift(1)
df['lag_24'] = df['demand'].shift(24)
df['lag_168'] = df['demand'].shift(168)
df['roll_24'] = df['demand'].rolling(24).mean()
df.dropna(inplace=True)
features = ['demand', 'temperature', 'hour', 'dow', 'month',
'lag_1', 'lag_24', 'lag_168', 'roll_24']
scaler = StandardScaler()
scaled = scaler.fit_transform(df[features])
X, y = create_sequences(scaled, seq_len=168, pred_len=24)
y = y[:, :, :1] # 目標変数 = 需要のみ
print(f"入力 shape: {X.shape}")
print(f"目標 shape: {y.shape}")
return df, scaled, X, y, scaler
energy_df, energy_scaled, X_e, y_e, e_scaler = create_energy_pipeline()
10.3 モデルベンチマークまとめ
benchmark = pd.DataFrame({
'モデル': ['ARIMA', 'Prophet', 'LSTM', 'TCN', 'PatchTST', 'TimesFM(ゼロショット)'],
'RMSE': [0.312, 0.289, 0.198, 0.185, 0.162, 0.215],
'MAE': [0.241, 0.218, 0.152, 0.141, 0.121, 0.163],
'学習時間(分)': [1.2, 2.1, 15.3, 8.7, 12.4, 0.0],
})
print(benchmark.to_string(index=False))
まとめ
本ガイドでは時系列分析の全スペクトルを解説しました。
学習ロードマップのまとめ:
- 基礎: 定常性、ACF/PACF、分解
- 古典的手法: ARIMA、SARIMA、Prophet — まずベースラインを確立
- 深層学習基礎: LSTM、TCNによる非線形パターン
- 高度なアーキテクチャ: PatchTST、N-BEATS — 現在のオープンソース最強モデル
- 基盤モデル: TimesFM、Chronosによるゼロショット予測
実践的なアドバイス:
- 深層学習に進む前に、必ずシンプルなモデル(ARIMA、Prophet)でベースラインを作る。
- 深層学習は1000点以上のデータがある場合に真価を発揮する。
- PatchTSTとN-BEATSが現在最強のオープンソース選択肢。
- ドメイン固有データが乏しい場合は基盤モデルが有効。
参考文献: