Skip to content
Published on

WebSocketリアルタイム通信 & オブザーバビリティ(監視)完全ガイド

Authors

Part 1: リアルタイム通信

1. HTTPポーリング vs ロングポーリング vs SSE vs WebSocket

リアルタイムデータ配信方式は要件に応じて適切な技術が異なります。

比較表

特性HTTPポーリングロングポーリングSSEWebSocket
方向単方向(クライアント要求)単方向(サーバー応答待機)単方向(サーバープッシュ)双方向(全二重)
接続維持毎リクエスト新規接続応答後に再接続単一HTTP接続単一TCP接続
オーバーヘッド高(繰り返しリクエスト)非常に低い
遅延ポーリング間隔依存イベント即時イベント即時イベント即時
プロトコルHTTP/1.1HTTP/1.1HTTP/1.1ws:// または wss://
ブラウザ対応すべてすべてIE以外すべて
ユースケース簡易ステータス確認通知株価、ニュースフィードチャット、ゲーム、共同編集

ユースケース別推奨技術

  • ダッシュボードデータ更新: SSE(サーバーから一方的にデータプッシュ)
  • リアルタイムチャット: WebSocket(双方向メッセージ交換)
  • ファイルアップロード進捗: SSE(サーバーが進捗をプッシュ)
  • オンラインゲーム: WebSocket(超低遅延双方向通信)
  • IoTセンサー監視: WebSocketまたはMQTT

2. WebSocketプロトコル詳細

ハンドシェイクプロセス

WebSocket接続はHTTP Upgradeリクエストから始まります。

GET /chat HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13

サーバー応答:

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=

フレーム構造

WebSocketフレームは以下のバイナリ構造を持ちます。

 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len |    Extended payload length    |
|I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
|N|V|V|V|       |S|             |   (if payload len==126/127)   |
| |1|2|3|       |K|             |                               |
+-+-+-+-+-------+-+-------------+-------------------------------+

主なopcodeの値:

  • 0x1: テキストフレーム
  • 0x2: バイナリフレーム
  • 0x8: 接続クローズ
  • 0x9: Ping
  • 0xA: Pong

Ping/Pongハートビート

// サーバー側(Node.js)
const WebSocket = require('ws');
const wss = new WebSocket.Server({ port: 8080 });

wss.on('connection', (ws) => {
  ws.isAlive = true;

  ws.on('pong', () => {
    ws.isAlive = true;
  });
});

// 30秒ごとにハートビート確認
const interval = setInterval(() => {
  wss.clients.forEach((ws) => {
    if (!ws.isAlive) return ws.terminate();
    ws.isAlive = false;
    ws.ping();
  });
}, 30000);

接続管理とエラーハンドリング

// クライアント側
class WebSocketClient {
  constructor(url) {
    this.url = url;
    this.reconnectAttempts = 0;
    this.maxReconnectAttempts = 5;
    this.reconnectDelay = 1000;
    this.connect();
  }

  connect() {
    this.ws = new WebSocket(this.url);

    this.ws.onopen = () => {
      console.log('Connected');
      this.reconnectAttempts = 0;
    };

    this.ws.onclose = (event) => {
      if (event.code !== 1000) {
        this.reconnect();
      }
    };

    this.ws.onerror = (error) => {
      console.error('WebSocket error:', error);
    };
  }

  reconnect() {
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.error('Max reconnect attempts reached');
      return;
    }
    this.reconnectAttempts++;
    const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts);
    setTimeout(() => this.connect(), delay);
  }
}

3. Socket.IOの活用

Socket.IOはWebSocketの上に抽象化レイヤーを提供し、リアルタイム通信の開発を簡素化します。

ネームスペースとルーム

const io = require('socket.io')(server);

// ネームスペースの分離
const chatNamespace = io.of('/chat');
const notificationNamespace = io.of('/notifications');

chatNamespace.on('connection', (socket) => {
  // ルームに参加
  socket.on('joinRoom', (roomId) => {
    socket.join(roomId);
    chatNamespace.to(roomId).emit('userJoined', {
      userId: socket.id,
      room: roomId
    });
  });

  // ルーム内メッセージ送信
  socket.on('message', (data) => {
    chatNamespace.to(data.roomId).emit('newMessage', {
      userId: socket.id,
      content: data.content,
      timestamp: Date.now()
    });
  });

  // ルーム退出
  socket.on('leaveRoom', (roomId) => {
    socket.leave(roomId);
  });
});

自動再接続とフォールバック

// クライアント設定
const socket = io('https://api.example.com', {
  // トランスポート層フォールバック(WebSocket失敗時にHTTPロングポーリング)
  transports: ['websocket', 'polling'],

  // 再接続設定
  reconnection: true,
  reconnectionAttempts: 10,
  reconnectionDelay: 1000,
  reconnectionDelayMax: 5000,
  randomizationFactor: 0.5,

  // タイムアウト
  timeout: 20000,

  // 認証
  auth: {
    token: 'my-jwt-token'
  }
});

socket.on('connect_error', (error) => {
  if (error.message === 'unauthorized') {
    // トークン更新ロジック
    refreshToken().then((newToken) => {
      socket.auth.token = newToken;
      socket.connect();
    });
  }
});

4. gRPCストリーミング

gRPCはHTTP/2ベースで4つの通信モードをサポートします。

4つのストリーミングモード

syntax = "proto3";

service ChatService {
  // 1. Unary: リクエスト1つ - レスポンス1つ
  rpc SendMessage (ChatMessage) returns (MessageResponse);

  // 2. Server Streaming: リクエスト1つ - レスポンスストリーム
  rpc SubscribeMessages (SubscriptionRequest) returns (stream ChatMessage);

  // 3. Client Streaming: リクエストストリーム - レスポンス1つ
  rpc UploadLogs (stream LogEntry) returns (UploadSummary);

  // 4. Bidirectional Streaming: リクエストストリーム - レスポンスストリーム
  rpc Chat (stream ChatMessage) returns (stream ChatMessage);
}

message ChatMessage {
  string user_id = 1;
  string content = 2;
  int64 timestamp = 3;
}

Bidirectional Streaming実装(Go)

func (s *chatServer) Chat(stream pb.ChatService_ChatServer) error {
    for {
        msg, err := stream.Recv()
        if err == io.EOF {
            return nil
        }
        if err != nil {
            return err
        }

        // メッセージ処理とブロードキャスト
        response := &pb.ChatMessage{
            UserId:    msg.UserId,
            Content:   msg.Content,
            Timestamp: time.Now().UnixMilli(),
        }

        if err := stream.Send(response); err != nil {
            return err
        }
    }
}

5. 実践:リアルタイムチャットアーキテクチャ

Redis Pub/Sub + WebSocket

複数のWebSocketサーバーインスタンス間でメッセージを同期するためにRedis Pub/Subを使用します。

[クライアントA] <--ws--> [サーバー1] <--pub/sub--> [Redis]
[クライアントB] <--ws--> [サーバー2] <--pub/sub--> [Redis]
[クライアントC] <--ws--> [サーバー1] <--pub/sub--> [Redis]

Node.js実装

const Redis = require('ioredis');
const WebSocket = require('ws');

const redisPub = new Redis();
const redisSub = new Redis();
const wss = new WebSocket.Server({ port: 8080 });

// チャンネル別クライアント管理
const channels = new Map();

wss.on('connection', (ws) => {
  ws.on('message', (raw) => {
    const data = JSON.parse(raw);

    switch (data.type) {
      case 'subscribe':
        // チャンネル購読
        if (!channels.has(data.channel)) {
          channels.set(data.channel, new Set());
          redisSub.subscribe(data.channel);
        }
        channels.get(data.channel).add(ws);
        break;

      case 'message':
        // Redisを通じて全サーバーにメッセージを伝播
        redisPub.publish(data.channel, JSON.stringify({
          user: data.user,
          content: data.content,
          timestamp: Date.now()
        }));
        break;
    }
  });

  ws.on('close', () => {
    // 接続終了時にすべてのチャンネルから削除
    channels.forEach((clients, channel) => {
      clients.delete(ws);
      if (clients.size === 0) {
        channels.delete(channel);
        redisSub.unsubscribe(channel);
      }
    });
  });
});

// Redisメッセージ受信時、該当チャンネルの全クライアントに配信
redisSub.on('message', (channel, message) => {
  const clients = channels.get(channel);
  if (clients) {
    clients.forEach((ws) => {
      if (ws.readyState === WebSocket.OPEN) {
        ws.send(message);
      }
    });
  }
});

スケーラビリティの考慮事項

  • 水平スケーリング: Redis Pub/SubまたはRedis Streamsでサーバー間メッセージ同期
  • 接続制限: サーバーあたりの最大接続数設定(OSのファイルディスクリプタ制限を考慮)
  • ロードバランシング: スティッキーセッションまたはIPハッシュベースのロードバランシング
  • メッセージ永続化: Redis StreamsまたはKafkaでメッセージ履歴を保管
  • 認証: JWTベースのWebSocketハンドシェイク認証

Part 2: オブザーバビリティ

6. オブザーバビリティの3本柱

オブザーバビリティ(Observability)とは、システムの外部出力のみからその内部状態を把握できる度合いを意味します。

メトリクス(Metrics)

数値で表現される時系列データです。

- リクエスト数 (requests_total)
- レスポンス時間 (response_duration_seconds)
- エラー率 (error_rate_percent)
- CPU/メモリ使用率 (cpu_usage_percent, memory_usage_bytes)

ログ(Logs)

特定の時点で発生したイベントのテキスト記録です。

{
  "timestamp": "2026-04-12T16:00:00Z",
  "level": "ERROR",
  "service": "chat-server",
  "trace_id": "abc123def456",
  "message": "WebSocket connection failed",
  "details": {
    "user_id": "user-789",
    "error_code": 1006,
    "reason": "abnormal closure"
  }
}

トレース(Traces)

分散システムにおいてリクエストが複数のサービスを経由する全体経路を追跡します。

[API Gateway] ---> [Auth Service] ---> [Chat Service] ---> [Redis]
   Span 1            Span 2              Span 3            Span 4
   |____________ Trace ID: abc123 __________________________|

3本柱の相互関係

質問解決手段
何が問題なのか?メトリクス(エラー率急増を検知)
なぜ問題が起きたのか?トレース(ボトルネックの特定)
正確に何が起きたのか?ログ(詳細なエラーメッセージの確認)

7. OpenTelemetry

OpenTelemetry(OTel)はCNCF傘下のプロジェクトで、メトリクス・ログ・トレースをベンダー中立的に収集します。

アーキテクチャ

[アプリケーション + OTel SDK]
        |
        | OTLP (gRPC/HTTP)
        v
[OTel Collector]
   |       |       |
   v       v       v
[Jaeger] [Prom.] [Loki]

Auto-Instrumentation(Node.js)

// tracing.js - アプリケーション起動前にロード
const { NodeSDK } = require('@opentelemetry/sdk-node');
const { OTLPTraceExporter } = require('@opentelemetry/exporter-trace-otlp-grpc');
const { OTLPMetricExporter } = require('@opentelemetry/exporter-metrics-otlp-grpc');
const { getNodeAutoInstrumentations } = require('@opentelemetry/auto-instrumentations-node');
const { PeriodicExportingMetricReader } = require('@opentelemetry/sdk-metrics');

const sdk = new NodeSDK({
  serviceName: 'chat-service',
  traceExporter: new OTLPTraceExporter({
    url: 'http://otel-collector:4317',
  }),
  metricReader: new PeriodicExportingMetricReader({
    exporter: new OTLPMetricExporter({
      url: 'http://otel-collector:4317',
    }),
    exportIntervalMillis: 15000,
  }),
  instrumentations: [getNodeAutoInstrumentations()],
});

sdk.start();

OTel Collector設定

# otel-collector-config.yaml
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
      http:
        endpoint: 0.0.0.0:4318

processors:
  batch:
    timeout: 5s
    send_batch_size: 1024
  memory_limiter:
    check_interval: 1s
    limit_mib: 512

exporters:
  prometheus:
    endpoint: 0.0.0.0:8889
  otlp/jaeger:
    endpoint: jaeger:4317
    tls:
      insecure: true
  loki:
    endpoint: http://loki:3100/loki/api/v1/push

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch, memory_limiter]
      exporters: [otlp/jaeger]
    metrics:
      receivers: [otlp]
      processors: [batch]
      exporters: [prometheus]
    logs:
      receivers: [otlp]
      processors: [batch]
      exporters: [loki]

8. Prometheus + Grafana

Prometheusのメトリクスタイプ

1. Counter   - 単調増加値(例:総リクエスト数)
2. Gauge     - 増減可能値(例:現在のアクティブ接続数)
3. Histogram - 値の分布(例:レスポンス時間の分布)
4. Summary   - パーセンタイル(例:p99レスポンス時間)

WebSocketカスタムメトリクスの例(Node.js)

const promClient = require('prom-client');

// Counter: 総接続数
const wsConnectionsTotal = new promClient.Counter({
  name: 'websocket_connections_total',
  help: 'Total WebSocket connections',
  labelNames: ['status'],
});

// Gauge: 現在のアクティブ接続
const wsActiveConnections = new promClient.Gauge({
  name: 'websocket_active_connections',
  help: 'Current active WebSocket connections',
  labelNames: ['namespace'],
});

// Histogram: メッセージ処理時間
const wsMessageDuration = new promClient.Histogram({
  name: 'websocket_message_duration_seconds',
  help: 'WebSocket message processing duration',
  buckets: [0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1],
});

// WebSocketイベントにメトリクスを連携
wss.on('connection', (ws) => {
  wsConnectionsTotal.inc({ status: 'connected' });
  wsActiveConnections.inc({ namespace: 'default' });

  ws.on('message', (data) => {
    const end = wsMessageDuration.startTimer();
    processMessage(data);
    end();
  });

  ws.on('close', () => {
    wsActiveConnections.dec({ namespace: 'default' });
  });
});

主要PromQLクエリ

# 秒あたりWebSocket接続リクエスト率(5分移動平均)
rate(websocket_connections_total[5m])

# 現在のアクティブWebSocket接続数
websocket_active_connections

# メッセージ処理時間 p99
histogram_quantile(0.99, rate(websocket_message_duration_seconds_bucket[5m]))

# エラー率(パーセント)
rate(websocket_errors_total[5m]) / rate(websocket_connections_total[5m]) * 100

# CPU使用率80%超過のインスタンス
node_cpu_usage_percent > 80

Grafanaダッシュボード設計

{
  "dashboard": {
    "title": "WebSocket Realtime Monitoring",
    "panels": [
      {
        "title": "Active Connections",
        "type": "stat",
        "targets": [
          { "expr": "sum(websocket_active_connections)" }
        ]
      },
      {
        "title": "Connection Rate",
        "type": "timeseries",
        "targets": [
          { "expr": "rate(websocket_connections_total[5m])" }
        ]
      },
      {
        "title": "Message Latency (p50/p95/p99)",
        "type": "timeseries",
        "targets": [
          { "expr": "histogram_quantile(0.50, rate(websocket_message_duration_seconds_bucket[5m]))", "legendFormat": "p50" },
          { "expr": "histogram_quantile(0.95, rate(websocket_message_duration_seconds_bucket[5m]))", "legendFormat": "p95" },
          { "expr": "histogram_quantile(0.99, rate(websocket_message_duration_seconds_bucket[5m]))", "legendFormat": "p99" }
        ]
      },
      {
        "title": "Error Rate",
        "type": "gauge",
        "targets": [
          { "expr": "rate(websocket_errors_total[5m]) / rate(websocket_connections_total[5m]) * 100" }
        ]
      }
    ]
  }
}

9. 分散トレーシング

スパン(Span)とトレース(Trace)の構造

Trace ID: 7f3a8b2c1d4e5f6a

Span A: API Gateway        [============                    ] 0-200ms
  Span B: Auth Service        [====                         ] 10-50ms
  Span C: Chat Service            [==================       ] 60-180ms
    Span D: Redis Pub/Sub              [====                ] 80-120ms
    Span E: Database Write                  [=======        ] 130-170ms

Jaegerセットアップ(Docker Compose)

version: '3.8'
services:
  jaeger:
    image: jaegertracing/all-in-one:1.54
    ports:
      - "16686:16686"   # Jaeger UI
      - "4317:4317"     # OTLP gRPC
      - "4318:4318"     # OTLP HTTP
    environment:
      - COLLECTOR_OTLP_ENABLED=true
      - SPAN_STORAGE_TYPE=elasticsearch
      - ES_SERVER_URLS=http://elasticsearch:9200

手動スパン作成(Python)

from opentelemetry import trace
from opentelemetry.trace import StatusCode

tracer = trace.get_tracer("chat-service")

def handle_websocket_message(message):
    with tracer.start_as_current_span("process_message") as span:
        span.set_attribute("message.type", message.get("type"))
        span.set_attribute("message.size", len(str(message)))

        try:
            # メッセージ処理ロジック
            result = process(message)
            span.set_attribute("processing.result", "success")
            return result
        except Exception as e:
            span.set_status(StatusCode.ERROR, str(e))
            span.record_exception(e)
            raise

10. ログ集約

ELKスタック(Elasticsearch + Logstash + Kibana)

# logstash.conf
input {
  tcp {
    port => 5000
    codec => json
  }
}

filter {
  if [level] == "ERROR" {
    mutate {
      add_tag => ["alert"]
    }
  }
  date {
    match => ["timestamp", "ISO8601"]
  }
}

output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "websocket-logs-%{+YYYY.MM.dd}"
  }
}

Loki + Grafana(軽量代替)

# loki-config.yaml
auth_enabled: false

server:
  http_listen_port: 3100

ingester:
  lifecycler:
    ring:
      kvstore:
        store: inmemory
      replication_factor: 1

schema_config:
  configs:
    - from: 2026-01-01
      store: tsdb
      object_store: filesystem
      schema: v13
      index:
        prefix: index_
        period: 24h

storage_config:
  tsdb_shipper:
    active_index_directory: /loki/index
    cache_location: /loki/cache
  filesystem:
    directory: /loki/chunks

構造化ロギング(Node.js + Winston)

const winston = require('winston');

const logger = winston.createLogger({
  level: 'info',
  format: winston.format.combine(
    winston.format.timestamp(),
    winston.format.json()
  ),
  defaultMeta: { service: 'websocket-server' },
  transports: [
    new winston.transports.Console(),
    new winston.transports.Http({
      host: 'loki-gateway',
      port: 3100,
      path: '/loki/api/v1/push',
    }),
  ],
});

// 使用例
logger.info('WebSocket connection established', {
  userId: 'user-123',
  remoteAddress: '192.168.1.100',
  protocol: 'wss',
});

logger.error('Message processing failed', {
  userId: 'user-456',
  errorCode: 'PARSE_ERROR',
  rawMessage: '<sanitized>',
  traceId: 'abc123',
});

11. アラート設計

アラート疲れ(Alert Fatigue)の防止

アラートが多すぎると、重要なアラートさえも無視されるようになります。効果的なアラート設計の原則は以下の通りです。

原則説明
アクション可能性アラートを受信したら即座に対処できること
症状ベース原因ではなくユーザー影響(症状)に基づくアラート
階層化Critical / Warning / Infoの3段階に分離
自動解決検知問題解決時に自動的にアラートを終了
グルーピング関連アラートをまとめて1回だけ送信

Prometheusアラートルール

# alerting-rules.yaml
groups:
  - name: websocket-alerts
    rules:
      - alert: HighConnectionDropRate
        expr: rate(websocket_disconnections_total[5m]) > 10
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "WebSocket接続切断率が高い"
          description: "5分間で秒間10件以上の接続切断が発生"

      - alert: WebSocketLatencyHigh
        expr: histogram_quantile(0.99, rate(websocket_message_duration_seconds_bucket[5m])) > 1
        for: 3m
        labels:
          severity: critical
        annotations:
          summary: "WebSocketメッセージ遅延時間が高い"
          description: "p99遅延時間が1秒を超過"

      - alert: ActiveConnectionsSpiking
        expr: websocket_active_connections > 10000
        for: 1m
        labels:
          severity: warning
        annotations:
          summary: "アクティブWebSocket接続数が急増"
          description: "アクティブ接続数が10,000を超過"

Alertmanagerルーティング設定

# alertmanager.yaml
global:
  resolve_timeout: 5m

route:
  receiver: 'default-receiver'
  group_by: ['alertname', 'severity']
  group_wait: 30s
  group_interval: 5m
  repeat_interval: 4h
  routes:
    - match:
        severity: critical
      receiver: 'pagerduty-critical'
      repeat_interval: 1h
    - match:
        severity: warning
      receiver: 'slack-warnings'
      repeat_interval: 4h

receivers:
  - name: 'default-receiver'
    slack_configs:
      - api_url: 'https://hooks.slack.com/services/XXXX'
        channel: '#alerts-general'

  - name: 'pagerduty-critical'
    pagerduty_configs:
      - service_key: 'your-pagerduty-key'
        severity: critical

  - name: 'slack-warnings'
    slack_configs:
      - api_url: 'https://hooks.slack.com/services/XXXX'
        channel: '#alerts-warning'
        title: 'Warning Alert'

オンコール連携:PagerDuty / OpsGenie

オンコールローテーションとエスカレーションポリシーを設定すると、障害発生時に担当者へ自動的にアラートが送信されます。

[Prometheusアラート発生]
        |
        v
[Alertmanager受信およびグルーピング]
        |
        v
[PagerDuty / OpsGenieインシデント作成]
        |
        v
[オンコール担当者に電話/SMS/Slack通知]
        |
        v
[ACK(確認)がない場合 -> エスカレーション(マネージャーへ通知)]

まとめチェックリスト

リアルタイム通信とオブザーバビリティを本番環境に適用する際の確認項目です。

リアルタイム通信

  • WebSocketハンドシェイク認証の実装
  • Ping/Pongハートビート間隔の設定
  • 自動再接続と指数バックオフの実装
  • Redis Pub/Subによる水平スケーリング対応
  • メッセージキュー(Kafka/Redis Streams)によるメッセージ永続化

オブザーバビリティ

  • メトリクス・ログ・トレースの3本柱すべてを収集
  • OpenTelemetry Collectorのデプロイとパイプライン設定
  • Grafanaダッシュボードに主要指標を可視化
  • 症状ベースのアラートルール設定
  • オンコールローテーションとエスカレーションポリシーの策定
  • アラート疲れ防止のためのしきい値の継続的なチューニング

クイズ:リアルタイム通信とオブザーバビリティの重要ポイント

Q1. WebSocketとSSEの最大の違いは何ですか?

A1. WebSocketは双方向(全二重)通信をサポートし、SSEはサーバーからクライアントへの単方向プッシュのみをサポートします。WebSocketはチャットやゲームのように双方がデータを送信する必要がある場合に適しており、SSEは株価やニュースフィードのようにサーバーが一方的にデータを送信する場合に適しています。

Q2. オブザーバビリティの3本柱とは何であり、それぞれの役割は?

A2. メトリクス(Metrics)は時系列の数値データで問題検知に、トレース(Traces)は分散システムのリクエスト経路追跡でボトルネック特定に、ログ(Logs)はイベントの詳細記録で根本原因分析に使用されます。

Q3. アラート疲れを防止するための核心的な原則は?

A3. アクション可能なアラートのみを設定し、原因ではなく症状ベースでアラートを構成し、Critical/Warning/Infoの階層化、関連アラートのグルーピング、自動解決検知を適用します。