- Part 1: リアルタイム通信
- Part 2: オブザーバビリティ
Part 1: リアルタイム通信
1. HTTPポーリング vs ロングポーリング vs SSE vs WebSocket
リアルタイムデータ配信方式は要件に応じて適切な技術が異なります。
比較表
| 特性 | HTTPポーリング | ロングポーリング | SSE | WebSocket |
|---|---|---|---|---|
| 方向 | 単方向(クライアント要求) | 単方向(サーバー応答待機) | 単方向(サーバープッシュ) | 双方向(全二重) |
| 接続維持 | 毎リクエスト新規接続 | 応答後に再接続 | 単一HTTP接続 | 単一TCP接続 |
| オーバーヘッド | 高(繰り返しリクエスト) | 中 | 低 | 非常に低い |
| 遅延 | ポーリング間隔依存 | イベント即時 | イベント即時 | イベント即時 |
| プロトコル | HTTP/1.1 | HTTP/1.1 | HTTP/1.1 | ws:// または wss:// |
| ブラウザ対応 | すべて | すべて | IE以外 | すべて |
| ユースケース | 簡易ステータス確認 | 通知 | 株価、ニュースフィード | チャット、ゲーム、共同編集 |
ユースケース別推奨技術
- ダッシュボードデータ更新: SSE(サーバーから一方的にデータプッシュ)
- リアルタイムチャット: WebSocket(双方向メッセージ交換)
- ファイルアップロード進捗: SSE(サーバーが進捗をプッシュ)
- オンラインゲーム: WebSocket(超低遅延双方向通信)
- IoTセンサー監視: WebSocketまたはMQTT
2. WebSocketプロトコル詳細
ハンドシェイクプロセス
WebSocket接続はHTTP Upgradeリクエストから始まります。
GET /chat HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
サーバー応答:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
フレーム構造
WebSocketフレームは以下のバイナリ構造を持ちます。
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len | Extended payload length |
|I|S|S|S| (4) |A| (7) | (16/64) |
|N|V|V|V| |S| | (if payload len==126/127) |
| |1|2|3| |K| | |
+-+-+-+-+-------+-+-------------+-------------------------------+
主なopcodeの値:
0x1: テキストフレーム0x2: バイナリフレーム0x8: 接続クローズ0x9: Ping0xA: Pong
Ping/Pongハートビート
// サーバー側(Node.js)
const WebSocket = require('ws');
const wss = new WebSocket.Server({ port: 8080 });
wss.on('connection', (ws) => {
ws.isAlive = true;
ws.on('pong', () => {
ws.isAlive = true;
});
});
// 30秒ごとにハートビート確認
const interval = setInterval(() => {
wss.clients.forEach((ws) => {
if (!ws.isAlive) return ws.terminate();
ws.isAlive = false;
ws.ping();
});
}, 30000);
接続管理とエラーハンドリング
// クライアント側
class WebSocketClient {
constructor(url) {
this.url = url;
this.reconnectAttempts = 0;
this.maxReconnectAttempts = 5;
this.reconnectDelay = 1000;
this.connect();
}
connect() {
this.ws = new WebSocket(this.url);
this.ws.onopen = () => {
console.log('Connected');
this.reconnectAttempts = 0;
};
this.ws.onclose = (event) => {
if (event.code !== 1000) {
this.reconnect();
}
};
this.ws.onerror = (error) => {
console.error('WebSocket error:', error);
};
}
reconnect() {
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
console.error('Max reconnect attempts reached');
return;
}
this.reconnectAttempts++;
const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts);
setTimeout(() => this.connect(), delay);
}
}
3. Socket.IOの活用
Socket.IOはWebSocketの上に抽象化レイヤーを提供し、リアルタイム通信の開発を簡素化します。
ネームスペースとルーム
const io = require('socket.io')(server);
// ネームスペースの分離
const chatNamespace = io.of('/chat');
const notificationNamespace = io.of('/notifications');
chatNamespace.on('connection', (socket) => {
// ルームに参加
socket.on('joinRoom', (roomId) => {
socket.join(roomId);
chatNamespace.to(roomId).emit('userJoined', {
userId: socket.id,
room: roomId
});
});
// ルーム内メッセージ送信
socket.on('message', (data) => {
chatNamespace.to(data.roomId).emit('newMessage', {
userId: socket.id,
content: data.content,
timestamp: Date.now()
});
});
// ルーム退出
socket.on('leaveRoom', (roomId) => {
socket.leave(roomId);
});
});
自動再接続とフォールバック
// クライアント設定
const socket = io('https://api.example.com', {
// トランスポート層フォールバック(WebSocket失敗時にHTTPロングポーリング)
transports: ['websocket', 'polling'],
// 再接続設定
reconnection: true,
reconnectionAttempts: 10,
reconnectionDelay: 1000,
reconnectionDelayMax: 5000,
randomizationFactor: 0.5,
// タイムアウト
timeout: 20000,
// 認証
auth: {
token: 'my-jwt-token'
}
});
socket.on('connect_error', (error) => {
if (error.message === 'unauthorized') {
// トークン更新ロジック
refreshToken().then((newToken) => {
socket.auth.token = newToken;
socket.connect();
});
}
});
4. gRPCストリーミング
gRPCはHTTP/2ベースで4つの通信モードをサポートします。
4つのストリーミングモード
syntax = "proto3";
service ChatService {
// 1. Unary: リクエスト1つ - レスポンス1つ
rpc SendMessage (ChatMessage) returns (MessageResponse);
// 2. Server Streaming: リクエスト1つ - レスポンスストリーム
rpc SubscribeMessages (SubscriptionRequest) returns (stream ChatMessage);
// 3. Client Streaming: リクエストストリーム - レスポンス1つ
rpc UploadLogs (stream LogEntry) returns (UploadSummary);
// 4. Bidirectional Streaming: リクエストストリーム - レスポンスストリーム
rpc Chat (stream ChatMessage) returns (stream ChatMessage);
}
message ChatMessage {
string user_id = 1;
string content = 2;
int64 timestamp = 3;
}
Bidirectional Streaming実装(Go)
func (s *chatServer) Chat(stream pb.ChatService_ChatServer) error {
for {
msg, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
// メッセージ処理とブロードキャスト
response := &pb.ChatMessage{
UserId: msg.UserId,
Content: msg.Content,
Timestamp: time.Now().UnixMilli(),
}
if err := stream.Send(response); err != nil {
return err
}
}
}
5. 実践:リアルタイムチャットアーキテクチャ
Redis Pub/Sub + WebSocket
複数のWebSocketサーバーインスタンス間でメッセージを同期するためにRedis Pub/Subを使用します。
[クライアントA] <--ws--> [サーバー1] <--pub/sub--> [Redis]
[クライアントB] <--ws--> [サーバー2] <--pub/sub--> [Redis]
[クライアントC] <--ws--> [サーバー1] <--pub/sub--> [Redis]
Node.js実装
const Redis = require('ioredis');
const WebSocket = require('ws');
const redisPub = new Redis();
const redisSub = new Redis();
const wss = new WebSocket.Server({ port: 8080 });
// チャンネル別クライアント管理
const channels = new Map();
wss.on('connection', (ws) => {
ws.on('message', (raw) => {
const data = JSON.parse(raw);
switch (data.type) {
case 'subscribe':
// チャンネル購読
if (!channels.has(data.channel)) {
channels.set(data.channel, new Set());
redisSub.subscribe(data.channel);
}
channels.get(data.channel).add(ws);
break;
case 'message':
// Redisを通じて全サーバーにメッセージを伝播
redisPub.publish(data.channel, JSON.stringify({
user: data.user,
content: data.content,
timestamp: Date.now()
}));
break;
}
});
ws.on('close', () => {
// 接続終了時にすべてのチャンネルから削除
channels.forEach((clients, channel) => {
clients.delete(ws);
if (clients.size === 0) {
channels.delete(channel);
redisSub.unsubscribe(channel);
}
});
});
});
// Redisメッセージ受信時、該当チャンネルの全クライアントに配信
redisSub.on('message', (channel, message) => {
const clients = channels.get(channel);
if (clients) {
clients.forEach((ws) => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(message);
}
});
}
});
スケーラビリティの考慮事項
- 水平スケーリング: Redis Pub/SubまたはRedis Streamsでサーバー間メッセージ同期
- 接続制限: サーバーあたりの最大接続数設定(OSのファイルディスクリプタ制限を考慮)
- ロードバランシング: スティッキーセッションまたはIPハッシュベースのロードバランシング
- メッセージ永続化: Redis StreamsまたはKafkaでメッセージ履歴を保管
- 認証: JWTベースのWebSocketハンドシェイク認証
Part 2: オブザーバビリティ
6. オブザーバビリティの3本柱
オブザーバビリティ(Observability)とは、システムの外部出力のみからその内部状態を把握できる度合いを意味します。
メトリクス(Metrics)
数値で表現される時系列データです。
- リクエスト数 (requests_total)
- レスポンス時間 (response_duration_seconds)
- エラー率 (error_rate_percent)
- CPU/メモリ使用率 (cpu_usage_percent, memory_usage_bytes)
ログ(Logs)
特定の時点で発生したイベントのテキスト記録です。
{
"timestamp": "2026-04-12T16:00:00Z",
"level": "ERROR",
"service": "chat-server",
"trace_id": "abc123def456",
"message": "WebSocket connection failed",
"details": {
"user_id": "user-789",
"error_code": 1006,
"reason": "abnormal closure"
}
}
トレース(Traces)
分散システムにおいてリクエストが複数のサービスを経由する全体経路を追跡します。
[API Gateway] ---> [Auth Service] ---> [Chat Service] ---> [Redis]
Span 1 Span 2 Span 3 Span 4
|____________ Trace ID: abc123 __________________________|
3本柱の相互関係
| 質問 | 解決手段 |
|---|---|
| 何が問題なのか? | メトリクス(エラー率急増を検知) |
| なぜ問題が起きたのか? | トレース(ボトルネックの特定) |
| 正確に何が起きたのか? | ログ(詳細なエラーメッセージの確認) |
7. OpenTelemetry
OpenTelemetry(OTel)はCNCF傘下のプロジェクトで、メトリクス・ログ・トレースをベンダー中立的に収集します。
アーキテクチャ
[アプリケーション + OTel SDK]
|
| OTLP (gRPC/HTTP)
v
[OTel Collector]
| | |
v v v
[Jaeger] [Prom.] [Loki]
Auto-Instrumentation(Node.js)
// tracing.js - アプリケーション起動前にロード
const { NodeSDK } = require('@opentelemetry/sdk-node');
const { OTLPTraceExporter } = require('@opentelemetry/exporter-trace-otlp-grpc');
const { OTLPMetricExporter } = require('@opentelemetry/exporter-metrics-otlp-grpc');
const { getNodeAutoInstrumentations } = require('@opentelemetry/auto-instrumentations-node');
const { PeriodicExportingMetricReader } = require('@opentelemetry/sdk-metrics');
const sdk = new NodeSDK({
serviceName: 'chat-service',
traceExporter: new OTLPTraceExporter({
url: 'http://otel-collector:4317',
}),
metricReader: new PeriodicExportingMetricReader({
exporter: new OTLPMetricExporter({
url: 'http://otel-collector:4317',
}),
exportIntervalMillis: 15000,
}),
instrumentations: [getNodeAutoInstrumentations()],
});
sdk.start();
OTel Collector設定
# otel-collector-config.yaml
receivers:
otlp:
protocols:
grpc:
endpoint: 0.0.0.0:4317
http:
endpoint: 0.0.0.0:4318
processors:
batch:
timeout: 5s
send_batch_size: 1024
memory_limiter:
check_interval: 1s
limit_mib: 512
exporters:
prometheus:
endpoint: 0.0.0.0:8889
otlp/jaeger:
endpoint: jaeger:4317
tls:
insecure: true
loki:
endpoint: http://loki:3100/loki/api/v1/push
service:
pipelines:
traces:
receivers: [otlp]
processors: [batch, memory_limiter]
exporters: [otlp/jaeger]
metrics:
receivers: [otlp]
processors: [batch]
exporters: [prometheus]
logs:
receivers: [otlp]
processors: [batch]
exporters: [loki]
8. Prometheus + Grafana
Prometheusのメトリクスタイプ
1. Counter - 単調増加値(例:総リクエスト数)
2. Gauge - 増減可能値(例:現在のアクティブ接続数)
3. Histogram - 値の分布(例:レスポンス時間の分布)
4. Summary - パーセンタイル(例:p99レスポンス時間)
WebSocketカスタムメトリクスの例(Node.js)
const promClient = require('prom-client');
// Counter: 総接続数
const wsConnectionsTotal = new promClient.Counter({
name: 'websocket_connections_total',
help: 'Total WebSocket connections',
labelNames: ['status'],
});
// Gauge: 現在のアクティブ接続
const wsActiveConnections = new promClient.Gauge({
name: 'websocket_active_connections',
help: 'Current active WebSocket connections',
labelNames: ['namespace'],
});
// Histogram: メッセージ処理時間
const wsMessageDuration = new promClient.Histogram({
name: 'websocket_message_duration_seconds',
help: 'WebSocket message processing duration',
buckets: [0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1],
});
// WebSocketイベントにメトリクスを連携
wss.on('connection', (ws) => {
wsConnectionsTotal.inc({ status: 'connected' });
wsActiveConnections.inc({ namespace: 'default' });
ws.on('message', (data) => {
const end = wsMessageDuration.startTimer();
processMessage(data);
end();
});
ws.on('close', () => {
wsActiveConnections.dec({ namespace: 'default' });
});
});
主要PromQLクエリ
# 秒あたりWebSocket接続リクエスト率(5分移動平均)
rate(websocket_connections_total[5m])
# 現在のアクティブWebSocket接続数
websocket_active_connections
# メッセージ処理時間 p99
histogram_quantile(0.99, rate(websocket_message_duration_seconds_bucket[5m]))
# エラー率(パーセント)
rate(websocket_errors_total[5m]) / rate(websocket_connections_total[5m]) * 100
# CPU使用率80%超過のインスタンス
node_cpu_usage_percent > 80
Grafanaダッシュボード設計
{
"dashboard": {
"title": "WebSocket Realtime Monitoring",
"panels": [
{
"title": "Active Connections",
"type": "stat",
"targets": [
{ "expr": "sum(websocket_active_connections)" }
]
},
{
"title": "Connection Rate",
"type": "timeseries",
"targets": [
{ "expr": "rate(websocket_connections_total[5m])" }
]
},
{
"title": "Message Latency (p50/p95/p99)",
"type": "timeseries",
"targets": [
{ "expr": "histogram_quantile(0.50, rate(websocket_message_duration_seconds_bucket[5m]))", "legendFormat": "p50" },
{ "expr": "histogram_quantile(0.95, rate(websocket_message_duration_seconds_bucket[5m]))", "legendFormat": "p95" },
{ "expr": "histogram_quantile(0.99, rate(websocket_message_duration_seconds_bucket[5m]))", "legendFormat": "p99" }
]
},
{
"title": "Error Rate",
"type": "gauge",
"targets": [
{ "expr": "rate(websocket_errors_total[5m]) / rate(websocket_connections_total[5m]) * 100" }
]
}
]
}
}
9. 分散トレーシング
スパン(Span)とトレース(Trace)の構造
Trace ID: 7f3a8b2c1d4e5f6a
Span A: API Gateway [============ ] 0-200ms
Span B: Auth Service [==== ] 10-50ms
Span C: Chat Service [================== ] 60-180ms
Span D: Redis Pub/Sub [==== ] 80-120ms
Span E: Database Write [======= ] 130-170ms
Jaegerセットアップ(Docker Compose)
version: '3.8'
services:
jaeger:
image: jaegertracing/all-in-one:1.54
ports:
- "16686:16686" # Jaeger UI
- "4317:4317" # OTLP gRPC
- "4318:4318" # OTLP HTTP
environment:
- COLLECTOR_OTLP_ENABLED=true
- SPAN_STORAGE_TYPE=elasticsearch
- ES_SERVER_URLS=http://elasticsearch:9200
手動スパン作成(Python)
from opentelemetry import trace
from opentelemetry.trace import StatusCode
tracer = trace.get_tracer("chat-service")
def handle_websocket_message(message):
with tracer.start_as_current_span("process_message") as span:
span.set_attribute("message.type", message.get("type"))
span.set_attribute("message.size", len(str(message)))
try:
# メッセージ処理ロジック
result = process(message)
span.set_attribute("processing.result", "success")
return result
except Exception as e:
span.set_status(StatusCode.ERROR, str(e))
span.record_exception(e)
raise
10. ログ集約
ELKスタック(Elasticsearch + Logstash + Kibana)
# logstash.conf
input {
tcp {
port => 5000
codec => json
}
}
filter {
if [level] == "ERROR" {
mutate {
add_tag => ["alert"]
}
}
date {
match => ["timestamp", "ISO8601"]
}
}
output {
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "websocket-logs-%{+YYYY.MM.dd}"
}
}
Loki + Grafana(軽量代替)
# loki-config.yaml
auth_enabled: false
server:
http_listen_port: 3100
ingester:
lifecycler:
ring:
kvstore:
store: inmemory
replication_factor: 1
schema_config:
configs:
- from: 2026-01-01
store: tsdb
object_store: filesystem
schema: v13
index:
prefix: index_
period: 24h
storage_config:
tsdb_shipper:
active_index_directory: /loki/index
cache_location: /loki/cache
filesystem:
directory: /loki/chunks
構造化ロギング(Node.js + Winston)
const winston = require('winston');
const logger = winston.createLogger({
level: 'info',
format: winston.format.combine(
winston.format.timestamp(),
winston.format.json()
),
defaultMeta: { service: 'websocket-server' },
transports: [
new winston.transports.Console(),
new winston.transports.Http({
host: 'loki-gateway',
port: 3100,
path: '/loki/api/v1/push',
}),
],
});
// 使用例
logger.info('WebSocket connection established', {
userId: 'user-123',
remoteAddress: '192.168.1.100',
protocol: 'wss',
});
logger.error('Message processing failed', {
userId: 'user-456',
errorCode: 'PARSE_ERROR',
rawMessage: '<sanitized>',
traceId: 'abc123',
});
11. アラート設計
アラート疲れ(Alert Fatigue)の防止
アラートが多すぎると、重要なアラートさえも無視されるようになります。効果的なアラート設計の原則は以下の通りです。
| 原則 | 説明 |
|---|---|
| アクション可能性 | アラートを受信したら即座に対処できること |
| 症状ベース | 原因ではなくユーザー影響(症状)に基づくアラート |
| 階層化 | Critical / Warning / Infoの3段階に分離 |
| 自動解決検知 | 問題解決時に自動的にアラートを終了 |
| グルーピング | 関連アラートをまとめて1回だけ送信 |
Prometheusアラートルール
# alerting-rules.yaml
groups:
- name: websocket-alerts
rules:
- alert: HighConnectionDropRate
expr: rate(websocket_disconnections_total[5m]) > 10
for: 5m
labels:
severity: warning
annotations:
summary: "WebSocket接続切断率が高い"
description: "5分間で秒間10件以上の接続切断が発生"
- alert: WebSocketLatencyHigh
expr: histogram_quantile(0.99, rate(websocket_message_duration_seconds_bucket[5m])) > 1
for: 3m
labels:
severity: critical
annotations:
summary: "WebSocketメッセージ遅延時間が高い"
description: "p99遅延時間が1秒を超過"
- alert: ActiveConnectionsSpiking
expr: websocket_active_connections > 10000
for: 1m
labels:
severity: warning
annotations:
summary: "アクティブWebSocket接続数が急増"
description: "アクティブ接続数が10,000を超過"
Alertmanagerルーティング設定
# alertmanager.yaml
global:
resolve_timeout: 5m
route:
receiver: 'default-receiver'
group_by: ['alertname', 'severity']
group_wait: 30s
group_interval: 5m
repeat_interval: 4h
routes:
- match:
severity: critical
receiver: 'pagerduty-critical'
repeat_interval: 1h
- match:
severity: warning
receiver: 'slack-warnings'
repeat_interval: 4h
receivers:
- name: 'default-receiver'
slack_configs:
- api_url: 'https://hooks.slack.com/services/XXXX'
channel: '#alerts-general'
- name: 'pagerduty-critical'
pagerduty_configs:
- service_key: 'your-pagerduty-key'
severity: critical
- name: 'slack-warnings'
slack_configs:
- api_url: 'https://hooks.slack.com/services/XXXX'
channel: '#alerts-warning'
title: 'Warning Alert'
オンコール連携:PagerDuty / OpsGenie
オンコールローテーションとエスカレーションポリシーを設定すると、障害発生時に担当者へ自動的にアラートが送信されます。
[Prometheusアラート発生]
|
v
[Alertmanager受信およびグルーピング]
|
v
[PagerDuty / OpsGenieインシデント作成]
|
v
[オンコール担当者に電話/SMS/Slack通知]
|
v
[ACK(確認)がない場合 -> エスカレーション(マネージャーへ通知)]
まとめチェックリスト
リアルタイム通信とオブザーバビリティを本番環境に適用する際の確認項目です。
リアルタイム通信
- WebSocketハンドシェイク認証の実装
- Ping/Pongハートビート間隔の設定
- 自動再接続と指数バックオフの実装
- Redis Pub/Subによる水平スケーリング対応
- メッセージキュー(Kafka/Redis Streams)によるメッセージ永続化
オブザーバビリティ
- メトリクス・ログ・トレースの3本柱すべてを収集
- OpenTelemetry Collectorのデプロイとパイプライン設定
- Grafanaダッシュボードに主要指標を可視化
- 症状ベースのアラートルール設定
- オンコールローテーションとエスカレーションポリシーの策定
- アラート疲れ防止のためのしきい値の継続的なチューニング
クイズ:リアルタイム通信とオブザーバビリティの重要ポイント
Q1. WebSocketとSSEの最大の違いは何ですか?
A1. WebSocketは双方向(全二重)通信をサポートし、SSEはサーバーからクライアントへの単方向プッシュのみをサポートします。WebSocketはチャットやゲームのように双方がデータを送信する必要がある場合に適しており、SSEは株価やニュースフィードのようにサーバーが一方的にデータを送信する場合に適しています。
Q2. オブザーバビリティの3本柱とは何であり、それぞれの役割は?
A2. メトリクス(Metrics)は時系列の数値データで問題検知に、トレース(Traces)は分散システムのリクエスト経路追跡でボトルネック特定に、ログ(Logs)はイベントの詳細記録で根本原因分析に使用されます。
Q3. アラート疲れを防止するための核心的な原則は?
A3. アクション可能なアラートのみを設定し、原因ではなく症状ベースでアラートを構成し、Critical/Warning/Infoの階層化、関連アラートのグルーピング、自動解決検知を適用します。
현재 단락 (1/626)
リアルタイムデータ配信方式は要件に応じて適切な技術が異なります。