Skip to content
Published on

WebSocket & リアルタイム通信完全ガイド 2025: SSE, Socket.IO, チャット/通知/コラボレーション実装

Authors

TOC

1. なぜリアルタイム通信(つうしん)が重要(じゅうよう)なのか

現代(げんだい)のWebアプリケーションは、もはや単純(たんじゅん)なリクエスト・レスポンスモデルでは不十分(ふじゅうぶん)です。チャット、通知(つうち)、リアルタイムダッシュボード、協調編集(きょうちょうへんしゅう)、オンラインゲームなど、ユーザー体験(たいけん)の核心(かくしん)は**即時性(そくじせい)**にかかっています。

従来(じゅうらい)のHTTPリクエスト・レスポンスモデルの限界(げんかい):

  • クライアントが常(つね)にリクエストを先(さき)に送信(そうしん)しなければサーバーが応答(おうとう)しない
  • サーバーが自発的(じはつてき)にクライアントにデータを送信できない
  • ポーリングで回避(かいひ)できるが非効率(ひこうりつ)
  • レイテンシが高(たか)く、サーバーリソースの浪費(ろうひ)が大(おお)きい

このガイドでは、WebSocketSSE(Server-Sent Events)Socket.IO などリアルタイム通信技術(ぎじゅつ)のスペクトラム全体(ぜんたい)をカバーします。


2. 通信方式の比較(ひかく):Polling vs Long Polling vs SSE vs WebSocket

2.1 比較テーブル

特性Short PollingLong PollingSSEWebSocket
方向クライアント→サーバークライアント→サーバーサーバー→クライアント双方向
プロトコルHTTPHTTPHTTPws:// / wss://
接続維持毎回新規接続応答まで維持持続接続持続接続
レイテンシ高い(ポーリング間隔)中程度低い非常に低い
サーバー負荷高い中程度低い低い
ブラウザ対応全ブラウザ全ブラウザほぼ全てほぼ全て
自動再接続手動実装手動実装内蔵手動実装
バイナリデータ非効率非効率非対応対応

2.2 Short Polling

最(もっと)もシンプルな方式(ほうしき)です。一定間隔(いっていかんかく)でサーバーにリクエストを送信します。

// Short Polling 実装
function startPolling(interval = 5000) {
  const poll = async () => {
    try {
      const response = await fetch('/api/messages');
      const data = await response.json();
      updateUI(data);
    } catch (error) {
      console.error('Polling error:', error);
    }
  };

  // 即時実行後、インターバル設定
  poll();
  return setInterval(poll, interval);
}

// クリーンアップ
const pollId = startPolling(3000);
// clearInterval(pollId);

問題点(もんだいてん):新(あたら)しいデータがなくてもリクエストを送信し続(つづ)けるため、サーバーリソースが浪費されます。

2.3 Long Polling

サーバーが新しいデータが発生(はっせい)するまでレスポンスを遅延(ちえん)させます。

// Long Polling クライアント
async function longPoll(lastEventId = null) {
  try {
    const url = lastEventId
      ? `/api/events?since=${lastEventId}`
      : '/api/events';

    const response = await fetch(url, {
      signal: AbortSignal.timeout(30000), // 30秒タイムアウト
    });

    if (response.status === 200) {
      const data = await response.json();
      handleNewData(data);
      // 即座に次のLong Pollを開始
      longPoll(data.lastEventId);
    } else if (response.status === 204) {
      // データなし - タイムアウト後にリトライ
      longPoll(lastEventId);
    }
  } catch (error) {
    // エラー時は段階的にリトライ
    console.error('Long poll error:', error);
    setTimeout(() => longPoll(lastEventId), 3000);
  }
}
// Long Polling サーバー (Express)
app.get('/api/events', async (req, res) => {
  const since = req.query.since;
  const timeout = 25000; // 25秒

  const checkForData = () => {
    return new Promise((resolve) => {
      const timer = setTimeout(() => {
        resolve(null); // タイムアウト
      }, timeout);

      eventEmitter.once('newData', (data) => {
        clearTimeout(timer);
        resolve(data);
      });
    });
  };

  const data = await checkForData();
  if (data) {
    res.json(data);
  } else {
    res.status(204).end();
  }
});

2.4 どれを選(えら)ぶべきか

  • Short Polling: データ変更(へんこう)が稀(まれ)でリアルタイム性が重要でない場合
  • Long Polling: WebSocketが使用(しよう)できない環境(かんきょう)での次善策(じぜんさく)
  • SSE: サーバーからクライアントへの単方向(たんほうこう)ストリーミング(通知、フィード、リアルタイムログ)
  • WebSocket: 双方向(そうほうこう)通信が必要な場合(チャット、ゲーム、協調編集)

3. WebSocketプロトコル深層分析(しんそうぶんせき)

3.1 WebSocketハンドシェイク

WebSocketはHTTPアップグレードメカニズムを通(とお)じて接続(せつぞく)を確立(かくりつ)します。

// クライアントリクエスト
GET /chat HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
Sec-WebSocket-Protocol: chat, superchat

// サーバーレスポンス
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
Sec-WebSocket-Protocol: chat

ポイント:

  • HTTP 101ステータスコードでプロトコル切替(きりかえ)
  • Sec-WebSocket-KeyとマジックGUIDを組(く)み合(あ)わせてAccept値を生成
  • ハンドシェイク後、双方向バイナリフレーム通信が開始(かいし)

3.2 WebSocketフレーム構造(こうぞう)

 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len |    Extended payload length    |
|I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
|N|V|V|V|       |S|             |   (if payload len==126/127)   |
| |1|2|3|       |K|             |                               |
+-+-+-+-+-------+-+-------------+-------------------------------+

主要なopcode:

  • 0x1: テキストフレーム
  • 0x2: バイナリフレーム
  • 0x8: 接続終了
  • 0x9: Ping
  • 0xA: Pong

3.3 Node.js基本WebSocketサーバー

import { WebSocketServer } from 'ws';
import { createServer } from 'http';

const server = createServer();
const wss = new WebSocketServer({ server });

// 接続クライアント管理
const clients = new Map();

wss.on('connection', (ws, req) => {
  const clientId = generateId();
  clients.set(clientId, { ws, alive: true });

  console.log(`Client connected: ${clientId}`);

  // Ping/Pong ハートビート
  ws.isAlive = true;
  ws.on('pong', () => {
    ws.isAlive = true;
  });

  // メッセージ受信
  ws.on('message', (data, isBinary) => {
    const message = isBinary ? data : data.toString();

    try {
      const parsed = JSON.parse(message);
      handleMessage(clientId, parsed);
    } catch (e) {
      ws.send(JSON.stringify({ error: 'Invalid message format' }));
    }
  });

  // 接続終了
  ws.on('close', (code, reason) => {
    console.log(`Client disconnected: ${clientId}, code: ${code}`);
    clients.delete(clientId);
  });

  // エラー処理
  ws.on('error', (error) => {
    console.error(`WebSocket error for ${clientId}:`, error);
  });

  // ウェルカムメッセージ
  ws.send(JSON.stringify({
    type: 'welcome',
    clientId,
    timestamp: Date.now(),
  }));
});

// ハートビートインターバル
const heartbeat = setInterval(() => {
  wss.clients.forEach((ws) => {
    if (ws.isAlive === false) {
      return ws.terminate();
    }
    ws.isAlive = false;
    ws.ping();
  });
}, 30000);

wss.on('close', () => {
  clearInterval(heartbeat);
});

function handleMessage(clientId, message) {
  switch (message.type) {
    case 'broadcast':
      broadcast(clientId, message.payload);
      break;
    case 'direct':
      sendToClient(message.targetId, message.payload);
      break;
    default:
      console.warn(`Unknown message type: ${message.type}`);
  }
}

function broadcast(senderId, payload) {
  const msg = JSON.stringify({
    type: 'broadcast',
    from: senderId,
    payload,
    timestamp: Date.now(),
  });

  clients.forEach((client, id) => {
    if (id !== senderId && client.ws.readyState === 1) {
      client.ws.send(msg);
    }
  });
}

function sendToClient(targetId, payload) {
  const target = clients.get(targetId);
  if (target && target.ws.readyState === 1) {
    target.ws.send(JSON.stringify({ type: 'direct', payload }));
  }
}

server.listen(8080, () => {
  console.log('WebSocket server running on port 8080');
});

3.4 クライアント実装(自動再接続)

class ReconnectingWebSocket {
  constructor(url, options = {}) {
    this.url = url;
    this.maxRetries = options.maxRetries || 10;
    this.baseDelay = options.baseDelay || 1000;
    this.maxDelay = options.maxDelay || 30000;
    this.retryCount = 0;
    this.handlers = new Map();
    this.messageQueue = [];
    this.connect();
  }

  connect() {
    this.ws = new WebSocket(this.url);

    this.ws.onopen = () => {
      console.log('WebSocket connected');
      this.retryCount = 0;
      // キューに溜まったメッセージを送信
      while (this.messageQueue.length > 0) {
        const msg = this.messageQueue.shift();
        this.ws.send(msg);
      }
      this.emit('open');
    };

    this.ws.onmessage = (event) => {
      try {
        const data = JSON.parse(event.data);
        this.emit('message', data);
        if (data.type) {
          this.emit(data.type, data);
        }
      } catch (e) {
        this.emit('message', event.data);
      }
    };

    this.ws.onclose = (event) => {
      this.emit('close', event);
      if (!event.wasClean) {
        this.reconnect();
      }
    };

    this.ws.onerror = (error) => {
      this.emit('error', error);
    };
  }

  reconnect() {
    if (this.retryCount >= this.maxRetries) {
      console.error('Max retries reached');
      this.emit('maxRetriesReached');
      return;
    }

    // 指数バックオフ + ジッター
    const delay = Math.min(
      this.baseDelay * Math.pow(2, this.retryCount) + Math.random() * 1000,
      this.maxDelay
    );

    console.log(`Reconnecting in ${delay}ms (attempt ${this.retryCount + 1})`);
    this.retryCount++;
    setTimeout(() => this.connect(), delay);
  }

  send(data) {
    const message = typeof data === 'string' ? data : JSON.stringify(data);
    if (this.ws.readyState === WebSocket.OPEN) {
      this.ws.send(message);
    } else {
      this.messageQueue.push(message);
    }
  }

  on(event, handler) {
    if (!this.handlers.has(event)) {
      this.handlers.set(event, []);
    }
    this.handlers.get(event).push(handler);
  }

  emit(event, data) {
    const handlers = this.handlers.get(event) || [];
    handlers.forEach((handler) => handler(data));
  }

  close() {
    this.maxRetries = 0; // 再接続を防止
    this.ws.close(1000, 'Client closing');
  }
}

// 使用例
const ws = new ReconnectingWebSocket('wss://api.example.com/ws');
ws.on('message', (data) => console.log('Received:', data));
ws.send({ type: 'join', room: 'general' });

4. SSE(Server-Sent Events)

4.1 SSEとは

SSEはサーバーからクライアントへの単方向ストリーミングのための標準(ひょうじゅん)です。HTTP接続を維持(いじ)しながらサーバーがイベントをプッシュします。

利点(りてん):

  • HTTP/2と自然(しぜん)に互換(ごかん)
  • 自動再接続(じどうさいせつぞく)が内蔵
  • Last-Event-IDによるイベント復旧(ふっきゅう)
  • テキストベースでデバッグが容易(ようい)
  • プロキシ/ロードバランサーとの親和性(しんわせい)が高い

4.2 SSEサーバー実装

// Express SSEサーバー
import express from 'express';

const app = express();

// SSEクライアント管理
const sseClients = new Map();
let eventCounter = 0;

app.get('/api/events', (req, res) => {
  const clientId = Date.now().toString();

  // SSEヘッダー設定
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    'X-Accel-Buffering': 'no', // Nginxバッファリング無効化
  });

  // Last-Event-ID処理(再接続時の復旧)
  const lastEventId = req.headers['last-event-id'];
  if (lastEventId) {
    // 欠落イベントの再送信
    const missedEvents = getEventsSince(parseInt(lastEventId));
    missedEvents.forEach((event) => {
      res.write(`id: ${event.id}\n`);
      res.write(`event: ${event.type}\n`);
      res.write(`data: ${JSON.stringify(event.data)}\n\n`);
    });
  }

  // クライアント登録
  sseClients.set(clientId, res);

  // 再接続間隔設定(ミリ秒)
  res.write('retry: 5000\n\n');

  // 初期接続確認
  res.write(`id: ${++eventCounter}\n`);
  res.write('event: connected\n');
  res.write(`data: ${JSON.stringify({ clientId })}\n\n`);

  // Keep-alive(15秒ごとにコメント送信)
  const keepAlive = setInterval(() => {
    res.write(':keep-alive\n\n');
  }, 15000);

  // 接続終了処理
  req.on('close', () => {
    clearInterval(keepAlive);
    sseClients.delete(clientId);
    console.log(`SSE client disconnected: ${clientId}`);
  });
});

// 全SSEクライアントにイベントをブロードキャスト
function broadcastSSE(eventType, data) {
  const id = ++eventCounter;
  sseClients.forEach((res) => {
    res.write(`id: ${id}\n`);
    res.write(`event: ${eventType}\n`);
    res.write(`data: ${JSON.stringify(data)}\n\n`);
  });
}

4.3 SSEクライアント実装

// EventSource API活用
class SSEClient {
  constructor(url) {
    this.url = url;
    this.handlers = {};
    this.connect();
  }

  connect() {
    this.source = new EventSource(this.url);

    this.source.onopen = () => {
      console.log('SSE connection opened');
    };

    this.source.onerror = (error) => {
      console.error('SSE error:', error);
      if (this.source.readyState === EventSource.CLOSED) {
        console.log('SSE connection closed');
      }
      // EventSourceが自動的に再接続を試みる
    };

    // カスタムイベント登録
    Object.entries(this.handlers).forEach(([event, handler]) => {
      this.source.addEventListener(event, handler);
    });
  }

  on(event, handler) {
    const wrappedHandler = (e) => {
      try {
        const data = JSON.parse(e.data);
        handler(data, e);
      } catch {
        handler(e.data, e);
      }
    };

    this.handlers[event] = wrappedHandler;
    if (this.source) {
      this.source.addEventListener(event, wrappedHandler);
    }
  }

  close() {
    if (this.source) {
      this.source.close();
    }
  }
}

// 使用例
const sse = new SSEClient('/api/events');
sse.on('notification', (data) => {
  showNotification(data.title, data.message);
});
sse.on('update', (data) => {
  refreshDashboard(data);
});

5. Socket.IO完全ガイド

5.1 Socket.IOアーキテクチャ

Socket.IOはWebSocketの上(うえ)に構築(こうちく)されたライブラリで、以下(いか)の機能(きのう)を追加します:

  • 自動再接続(指数バックオフ)
  • パケットバッファリング
  • ルーム(Room)とネームスペース(Namespace)
  • ブロードキャスト
  • マルチプレクシング
  • HTTP Long Pollingフォールバック
// サーバー設定
import { createServer } from 'http';
import { Server } from 'socket.io';

const httpServer = createServer();
const io = new Server(httpServer, {
  cors: {
    origin: ['https://example.com'],
    methods: ['GET', 'POST'],
    credentials: true,
  },
  transports: ['websocket', 'polling'],
  pingTimeout: 20000,
  pingInterval: 25000,
  maxHttpBufferSize: 1e6, // 1MB
});

5.2 ネームスペースとルーム

// ネームスペース - 論理的分離
const chatNamespace = io.of('/chat');
const adminNamespace = io.of('/admin');

// チャットネームスペース
chatNamespace.on('connection', (socket) => {
  console.log('User connected to chat:', socket.id);

  // ルーム参加
  socket.on('joinRoom', async (roomName) => {
    await socket.join(roomName);
    socket.to(roomName).emit('userJoined', {
      userId: socket.id,
      room: roomName,
    });

    const sockets = await chatNamespace.in(roomName).fetchSockets();
    chatNamespace.to(roomName).emit('roomInfo', {
      room: roomName,
      members: sockets.length,
    });
  });

  // ルーム退出
  socket.on('leaveRoom', async (roomName) => {
    await socket.leave(roomName);
    socket.to(roomName).emit('userLeft', {
      userId: socket.id,
      room: roomName,
    });
  });

  // ルーム内メッセージ送信
  socket.on('message', (data) => {
    chatNamespace.to(data.room).emit('message', {
      from: socket.id,
      text: data.text,
      room: data.room,
      timestamp: Date.now(),
    });
  });
});

// 管理者ネームスペース - 認証必要
adminNamespace.use((socket, next) => {
  const token = socket.handshake.auth.token;
  if (verifyAdminToken(token)) {
    next();
  } else {
    next(new Error('Unauthorized'));
  }
});

5.3 ミドルウェアと認証(にんしょう)

// グローバルミドルウェア
io.use((socket, next) => {
  const token = socket.handshake.auth.token;
  if (!token) {
    return next(new Error('Authentication required'));
  }

  try {
    const user = verifyToken(token);
    socket.data.user = user;
    next();
  } catch (err) {
    next(new Error('Invalid token'));
  }
});

// イベント毎のミドルウェア
io.on('connection', (socket) => {
  // 全イベントのロギングミドルウェア
  socket.use(([event, ...args], next) => {
    console.log(`Event: ${event}, User: ${socket.data.user.name}`);
    next();
  });

  // レート制限ミドルウェア
  const rateLimiter = createRateLimiter(100, 60000); // 1分あたり100件
  socket.use(([event, ...args], next) => {
    if (rateLimiter.isAllowed(socket.id)) {
      next();
    } else {
      next(new Error('Rate limit exceeded'));
    }
  });
});

5.4 Acknowledgements(応答確認)

// サーバー
io.on('connection', (socket) => {
  socket.on('createOrder', async (orderData, callback) => {
    try {
      const order = await processOrder(orderData);
      callback({
        status: 'ok',
        orderId: order.id,
      });
    } catch (error) {
      callback({
        status: 'error',
        message: error.message,
      });
    }
  });
});

// クライアント
socket.emit('createOrder', { item: 'laptop', qty: 1 }, (response) => {
  if (response.status === 'ok') {
    console.log('Order created:', response.orderId);
  } else {
    console.error('Order failed:', response.message);
  }
});

// タイムアウト付きemit
socket.timeout(5000).emit('createOrder', orderData, (err, response) => {
  if (err) {
    // サーバーが5秒以内に応答しなかった
    console.error('Timeout');
  } else {
    console.log('Response:', response);
  }
});

6. チャットシステム実装

6.1 メッセージ順序(じゅんじょ)保証(ほしょう)

分散(ぶんさん)環境でメッセージ順序を保証することは核心的(かくしんてき)な課題(かだい)です。

// サーバー:メッセージ順序管理
class ChatRoom {
  constructor(roomId) {
    this.roomId = roomId;
    this.messageSequence = 0;
  }

  async addMessage(userId, content) {
    const sequence = ++this.messageSequence;
    const message = {
      id: `${this.roomId}-${sequence}`,
      sequence,
      roomId: this.roomId,
      userId,
      content,
      timestamp: Date.now(),
      status: 'sent',
    };

    await this.persistMessage(message);
    return message;
  }

  async getMessagesSince(sequence, limit = 50) {
    return await db.messages
      .find({
        roomId: this.roomId,
        sequence: { $gt: sequence },
      })
      .sort({ sequence: 1 })
      .limit(limit)
      .toArray();
  }
}

6.2 プレゼンス(接続状態)管理(かんり)

// プレゼンスシステム
class PresenceManager {
  constructor(io, redis) {
    this.io = io;
    this.redis = redis;
    this.TTL = 60; // 60秒
  }

  async setOnline(userId, socketId) {
    const key = `presence:${userId}`;
    await this.redis.hset(key, {
      status: 'online',
      socketId,
      lastSeen: Date.now().toString(),
    });
    await this.redis.expire(key, this.TTL);

    // フレンドにステータス変更を通知
    const friends = await this.getFriends(userId);
    friends.forEach((friendId) => {
      this.io.to(`user:${friendId}`).emit('presence', {
        userId,
        status: 'online',
      });
    });
  }

  async setOffline(userId) {
    const key = `presence:${userId}`;
    await this.redis.hset(key, {
      status: 'offline',
      lastSeen: Date.now().toString(),
    });

    const friends = await this.getFriends(userId);
    friends.forEach((friendId) => {
      this.io.to(`user:${friendId}`).emit('presence', {
        userId,
        status: 'offline',
        lastSeen: Date.now(),
      });
    });
  }
}

6.3 タイピングインジケーター

// クライアント
let typingTimeout;

messageInput.addEventListener('input', () => {
  socket.emit('typing', { room: currentRoom });

  clearTimeout(typingTimeout);
  typingTimeout = setTimeout(() => {
    socket.emit('stopTyping', { room: currentRoom });
  }, 2000);
});

// サーバー
socket.on('typing', (data) => {
  socket.to(data.room).emit('typing', {
    userId: socket.data.user.id,
    userName: socket.data.user.name,
  });
});

socket.on('stopTyping', (data) => {
  socket.to(data.room).emit('stopTyping', {
    userId: socket.data.user.id,
  });
});

6.4 既読確認(きどくかくにん)

// サーバー:既読確認処理
socket.on('markRead', async (data) => {
  const { roomId, messageId, userId } = data;

  await db.readReceipts.upsert({
    roomId,
    userId,
    lastReadMessageId: messageId,
    readAt: new Date(),
  });

  socket.to(roomId).emit('readReceipt', {
    userId,
    messageId,
    readAt: Date.now(),
  });
});

// クライアント:Intersection Observerで自動既読検知
const observer = new IntersectionObserver(
  (entries) => {
    entries.forEach((entry) => {
      if (entry.isIntersecting) {
        const messageId = entry.target.dataset.messageId;
        socket.emit('markRead', {
          roomId: currentRoom,
          messageId,
          userId: currentUser.id,
        });
        observer.unobserve(entry.target);
      }
    });
  },
  { threshold: 0.5 }
);

7. リアルタイム通知(つうち)システム

7.1 Pushアーキテクチャ

// 通知サービス
class NotificationService {
  constructor(io, redis) {
    this.io = io;
    this.redis = redis;
  }

  async send(userId, notification) {
    const enriched = {
      id: generateId(),
      ...notification,
      createdAt: Date.now(),
      read: false,
    };

    // 永続化
    await db.notifications.insertOne({
      userId,
      ...enriched,
    });

    // リアルタイム配信
    this.io.to(`user:${userId}`).emit('notification', enriched);

    // オフラインならプッシュ通知キューに追加
    const isOnline = await this.isUserOnline(userId);
    if (!isOnline) {
      await this.queuePushNotification(userId, enriched);
    }

    return enriched;
  }

  async sendBatch(userIds, notification) {
    const enriched = {
      id: generateId(),
      ...notification,
      createdAt: Date.now(),
      read: false,
    };

    const docs = userIds.map((userId) => ({
      userId,
      ...enriched,
    }));
    await db.notifications.insertMany(docs);

    userIds.forEach((userId) => {
      this.io.to(`user:${userId}`).emit('notification', enriched);
    });
  }
}

7.2 通知バッチ処理(しょり)

// 高頻度イベントのバッチ処理
class NotificationBatcher {
  constructor(flushInterval = 1000, maxBatchSize = 50) {
    this.buffer = new Map();
    this.flushInterval = flushInterval;
    this.maxBatchSize = maxBatchSize;

    setInterval(() => this.flush(), this.flushInterval);
  }

  add(userId, notification) {
    if (!this.buffer.has(userId)) {
      this.buffer.set(userId, []);
    }

    const userBuffer = this.buffer.get(userId);
    userBuffer.push(notification);

    if (userBuffer.length >= this.maxBatchSize) {
      this.flushUser(userId);
    }
  }

  flush() {
    this.buffer.forEach((notifications, userId) => {
      if (notifications.length > 0) {
        this.flushUser(userId);
      }
    });
  }

  flushUser(userId) {
    const notifications = this.buffer.get(userId) || [];
    if (notifications.length === 0) return;

    this.buffer.set(userId, []);

    const grouped = this.groupNotifications(notifications);
    io.to(`user:${userId}`).emit('notifications:batch', grouped);
  }

  groupNotifications(notifications) {
    const groups = {};
    notifications.forEach((n) => {
      const key = n.type;
      if (!groups[key]) groups[key] = [];
      groups[key].push(n);
    });
    return groups;
  }
}

8. 協調編集(きょうちょうへんしゅう):OT vs CRDT

8.1 OT (Operational Transformation)

OTはGoogle Docsで使用(しよう)されている方式で、同時編集時(どうじへんしゅうじ)にオペレーションを変換(へんかん)して一貫性(いっかんせい)を維持します。

// OTの基本概念
// 2人のユーザーが同時に編集する場合:
// User A: insert('X', position=0) - 位置0に'X'を挿入
// User B: insert('Y', position=2) - 位置2に'Y'を挿入

// Aのオペレーションに対してBの観点で変換:
// Bが位置2に挿入したので、Aのオペレーションは影響なし(位置0)

// Bのオペレーションに対してAの観点で変換:
// Aが位置0に挿入したので、Bの位置は2+1=3に調整

function transformInsert(opA, opB) {
  if (opA.position <= opB.position) {
    return { ...opB, position: opB.position + 1 };
  }
  return opB;
}

8.2 CRDT (Conflict-free Replicated Data Type)

CRDTは中央(ちゅうおう)サーバーなしでも結果整合性(けっかせいごうせい)を保証するデータ構造です。

// Y.jsを活用したCRDTベースの協調編集
import * as Y from 'yjs';
import { WebsocketProvider } from 'y-websocket';

// 共有ドキュメント作成
const ydoc = new Y.Doc();

// WebSocketプロバイダー接続
const provider = new WebsocketProvider(
  'wss://collaboration.example.com',
  'document-room-123',
  ydoc
);

// 共有テキスト
const ytext = ydoc.getText('content');

// テキスト変更監視
ytext.observe((event) => {
  console.log('Text changed:', event.delta);
});

// テキスト編集
ytext.insert(0, 'Hello, ');
ytext.insert(7, 'World!');

// Awareness(カーソル位置、ユーザー情報)
const awareness = provider.awareness;
awareness.setLocalState({
  user: { name: 'Alice', color: '#ff0000' },
  cursor: { position: 42 },
});

awareness.on('change', () => {
  const states = awareness.getStates();
  updateCursors(states);
});

8.3 OT vs CRDT比較

特性OTCRDT
サーバー依存性中央サーバー必須サーバーなしでも可能
実装複雑度高い中程度
メモリ使用量低い高い(メタデータ)
オフライン対応困難自然に対応
代表的実装Google DocsY.js, Automerge
スケーラビリティサーバーボトルネックP2Pスケール可能

9. WebSocket水平(すいへい)スケーリング

9.1 問題:複数(ふくすう)サーバーインスタンス

WebSocket接続は特定(とくてい)のサーバーインスタンスにバインドされます。複数サーバーがある場合、あるサーバーのクライアントが送信したメッセージを別(べつ)のサーバーのクライアントに配信する必要があります。

9.2 Redis Adapter (Socket.IO)

import { Server } from 'socket.io';
import { createAdapter } from '@socket.io/redis-adapter';
import { createClient } from 'redis';

const io = new Server(httpServer);

const pubClient = createClient({ url: 'redis://redis-host:6379' });
const subClient = pubClient.duplicate();

await Promise.all([pubClient.connect(), subClient.connect()]);

// Redis Adapter適用
io.adapter(createAdapter(pubClient, subClient));

// どのサーバーインスタンスからでもブロードキャスト可能
io.emit('globalEvent', { message: 'All servers receive this' });
io.to('room-123').emit('roomEvent', { data: 'value' });

9.3 Sticky Sessions設定(せってい)

# Nginx設定
upstream websocket_backend {
    ip_hash;  # Sticky Session
    server backend1:3000;
    server backend2:3000;
    server backend3:3000;
}

server {
    listen 80;

    location /socket.io/ {
        proxy_pass http://websocket_backend;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

        proxy_read_timeout 86400s;
        proxy_send_timeout 86400s;
    }
}

9.4 NATSを活用したPub/Sub

import { connect } from 'nats';

class NATSAdapter {
  constructor(io) {
    this.io = io;
    this.serverId = process.env.SERVER_ID || generateId();
  }

  async connect() {
    this.nc = await connect({ servers: 'nats://nats-host:4222' });

    const sub = this.nc.subscribe('ws.broadcast.>');
    (async () => {
      for await (const msg of sub) {
        const data = JSON.parse(msg.data.toString());
        if (data.serverId === this.serverId) continue;

        const topic = msg.subject.split('.').slice(2).join('.');
        this.io.to(topic).emit(data.event, data.payload);
      }
    })();
  }

  broadcast(room, event, payload) {
    this.nc.publish(`ws.broadcast.${room}`, JSON.stringify({
      serverId: this.serverId,
      event,
      payload,
    }));
  }
}

10. セキュリティ

10.1 認証(にんしょう)

// トークンベース認証
io.use(async (socket, next) => {
  try {
    const token = socket.handshake.auth.token
      || socket.handshake.query.token;

    if (!token) {
      return next(new Error('Authentication token required'));
    }

    const payload = await verifyJWT(token);

    if (payload.exp * 1000 < Date.now()) {
      return next(new Error('Token expired'));
    }

    socket.data.user = {
      id: payload.sub,
      roles: payload.roles,
    };

    next();
  } catch (err) {
    next(new Error('Invalid authentication token'));
  }
});

10.2 レート制限(せいげん)

// トークンバケットアルゴリズム
class TokenBucket {
  constructor(capacity, refillRate) {
    this.capacity = capacity;
    this.tokens = capacity;
    this.refillRate = refillRate;
    this.lastRefill = Date.now();
  }

  consume(count = 1) {
    this.refill();
    if (this.tokens >= count) {
      this.tokens -= count;
      return true;
    }
    return false;
  }

  refill() {
    const now = Date.now();
    const elapsed = (now - this.lastRefill) / 1000;
    this.tokens = Math.min(
      this.capacity,
      this.tokens + elapsed * this.refillRate
    );
    this.lastRefill = now;
  }
}

// Socket.IOに適用
const rateLimiters = new Map();

socket.use(([event, ...args], next) => {
  const bucket = rateLimiters.get(socket.data.user.id);
  if (bucket.consume()) {
    next();
  } else {
    next(new Error('Rate limit exceeded'));
  }
});

10.3 Origin検証(けんしょう)

const io = new Server(httpServer, {
  cors: {
    origin: (origin, callback) => {
      const allowedOrigins = [
        'https://app.example.com',
        'https://admin.example.com',
      ];

      if (!origin || allowedOrigins.includes(origin)) {
        callback(null, true);
      } else {
        callback(new Error('Not allowed by CORS'));
      }
    },
    credentials: true,
  },
});

11. パフォーマンス最適化(さいてきか)

11.1 メッセージ圧縮(あっしゅく)

// permessage-deflate拡張
import { WebSocketServer } from 'ws';

const wss = new WebSocketServer({
  port: 8080,
  perMessageDeflate: {
    zlibDeflateOptions: {
      chunkSize: 1024,
      memLevel: 7,
      level: 3,
    },
    threshold: 1024, // 1KB以上のみ圧縮
    clientNoContextTakeover: true,
    serverNoContextTakeover: true,
  },
});

11.2 バイナリプロトコル(Protocol Buffers)

// message.proto
syntax = "proto3";

message ChatMessage {
  string id = 1;
  string room_id = 2;
  string user_id = 3;
  string content = 4;
  int64 timestamp = 5;
  MessageType type = 6;

  enum MessageType {
    TEXT = 0;
    IMAGE = 1;
    FILE = 2;
    SYSTEM = 3;
  }
}
// Protobuf使用
import protobuf from 'protobufjs';

const root = await protobuf.load('message.proto');
const ChatMessage = root.lookupType('ChatMessage');

// エンコード
function encodeMessage(event, payload) {
  const frame = WebSocketFrame.create({ event, chatMessage: payload });
  return WebSocketFrame.encode(frame).finish();
}

// デコード
function decodeMessage(buffer) {
  return WebSocketFrame.decode(new Uint8Array(buffer));
}

// WebSocketでバイナリ送信
ws.on('message', (data, isBinary) => {
  if (isBinary) {
    const frame = decodeMessage(data);
    handleFrame(frame);
  }
});

12. プロダクション運用(うんよう)

12.1 ヘルスチェック

app.get('/health', (req, res) => {
  const health = {
    status: 'ok',
    uptime: process.uptime(),
    connections: io.engine.clientsCount,
    rooms: io.sockets.adapter.rooms.size,
    memory: process.memoryUsage(),
    timestamp: Date.now(),
  };

  if (health.connections > 10000) {
    health.status = 'degraded';
    res.status(503);
  }

  res.json(health);
});

12.2 グレースフルシャットダウン

async function gracefulShutdown(signal) {
  console.log(`Received ${signal}. Starting graceful shutdown...`);

  // 1. 新規接続を拒否
  httpServer.close();

  // 2. 既存クライアントに終了を通知
  io.emit('server:shutdown', {
    message: 'Server is shutting down. Please reconnect.',
    reconnectDelay: 5000,
  });

  // 3. 猶予期間後にすべての接続を切断
  const disconnectTimeout = setTimeout(() => {
    io.disconnectSockets(true);
  }, 10000);

  // 4. 保留中の操作を完了
  await flushPendingMessages();

  // 5. Redis接続をクリーンアップ
  await pubClient.quit();
  await subClient.quit();

  clearTimeout(disconnectTimeout);

  console.log('Graceful shutdown complete');
  process.exit(0);
}

process.on('SIGTERM', () => gracefulShutdown('SIGTERM'));
process.on('SIGINT', () => gracefulShutdown('SIGINT'));

12.3 モニタリング

// Prometheusメトリクス
import { Registry, Counter, Gauge, Histogram } from 'prom-client';

const register = new Registry();

const wsConnections = new Gauge({
  name: 'websocket_connections_active',
  help: 'Number of active WebSocket connections',
  registers: [register],
});

const wsMessages = new Counter({
  name: 'websocket_messages_total',
  help: 'Total WebSocket messages',
  labelNames: ['direction', 'type'],
  registers: [register],
});

const wsLatency = new Histogram({
  name: 'websocket_message_latency_seconds',
  help: 'WebSocket message processing latency',
  buckets: [0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1],
  registers: [register],
});

io.on('connection', (socket) => {
  wsConnections.inc();

  socket.on('disconnect', () => {
    wsConnections.dec();
  });

  socket.use(([event, ...args], next) => {
    const start = process.hrtime.bigint();
    wsMessages.inc({ direction: 'inbound', type: event });
    next();
    const duration = Number(process.hrtime.bigint() - start) / 1e9;
    wsLatency.observe(duration);
  });
});

app.get('/metrics', async (req, res) => {
  res.set('Content-Type', register.contentType);
  res.end(await register.metrics());
});

13. プロダクションアーキテクチャパターン

13.1 チャットサービス全体(ぜんたい)アーキテクチャ

Client (React/Vue)
    |
    | WebSocket (Socket.IO)
    v
Load Balancer (Nginx, sticky sessions)
    |
    +---> App Server 1 ---+
    +---> App Server 2 ---+--> Redis Pub/Sub (メッセージブロードキャスト)
    +---> App Server 3 ---+
                           |
                           +--> PostgreSQL (メッセージ永続化)
                           +--> Redis (セッション、プレゼンス、キャッシュ)
                           +--> S3 (ファイルアップロード)

13.2 イベント駆動(くどう)アーキテクチャ

// イベントバスパターン
class EventBus {
  constructor(redis) {
    this.redis = redis;
    this.handlers = new Map();
  }

  async publish(channel, event) {
    await this.redis.publish(channel, JSON.stringify(event));
  }

  subscribe(channel, handler) {
    if (!this.handlers.has(channel)) {
      this.handlers.set(channel, []);
      this.redis.subscribe(channel);
    }
    this.handlers.get(channel).push(handler);
  }
}

14. クイズ

Q1: WebSocketハンドシェイクで使用されるHTTPステータスコードは何ですか?

回答:101 Switching Protocols

WebSocket接続はHTTPアップグレードメカニズムを通じて確立されます。クライアントがUpgrade: websocketヘッダーを含むリクエストを送信すると、サーバーは101ステータスコードで応答し、プロトコル切替が成功したことを示します。その後、双方向バイナリフレーム通信が開始されます。

Q2: SSEの自動再接続メカニズムで使用されるヘッダーは何ですか?

回答:Last-Event-ID

EventSource APIは接続が切断されると自動的に再接続を試みます。その際、最後に受信したイベントのIDをLast-Event-IDリクエストヘッダーに含めてサーバーに送信します。サーバーはこのID以降のイベントから再送信できます。retryフィールドで再接続間隔(ミリ秒)を指定することもできます。

Q3: Socket.IOのRoomとNamespaceの違いは何ですか?

回答:

Namespaceは単一のWebSocket接続上で複数の通信チャネルをマルチプレクシングします。異なるエンドポイント(例:/chat/admin)で区別され、それぞれ独立したミドルウェアとイベントハンドラーを持てます。

RoomはNamespace内部でクライアントをグループ化する論理的な単位です。socket.join(room)で参加し、io.to(room).emit()で特定グループにのみメッセージを送信できます。Roomはサーバー側の概念であり、クライアントは直接アクセスできません。

Q4: OTとCRDTの核心的な違いを説明してください。

回答:

OTは中央サーバーが同時編集オペレーションを変換して一貫性を維持します。Google Docsが代表例です。サーバー依存であり、オフライン編集に制限があります。

CRDTはデータ構造自体が競合を解決するため、中央サーバーなしでも結果整合性を保証します。Y.jsやAutomergeが代表例です。P2P通信とオフライン編集を自然にサポートしますが、メタデータによりメモリ使用量が大きくなります。

Q5: WebSocketサーバーを水平スケーリングする際にRedis Adapterが必要な理由は?

回答:

WebSocket接続は特定のサーバーインスタンスにバインドされます。複数のサーバーインスタンスがある場合、サーバーAに接続されたクライアントが送信したメッセージをサーバーBに接続されたクライアントに配信する必要があります。

Redis AdapterはRedisのPub/Sub機能を使用して、すべてのサーバーインスタンス間でメッセージをブロードキャストします。あるサーバーでio.to(room).emit()を呼び出すと、Redisを通じて他のすべてのサーバーにもイベントが伝播され、クラスター全体のクライアントにメッセージが配信されます。さらにSticky Sessionの設定も必要です。


15. 参考資料(さんこうしりょう)

  1. RFC 6455 - The WebSocket Protocol - https://datatracker.ietf.org/doc/html/rfc6455
  2. MDN - WebSocket API - https://developer.mozilla.org/en-US/docs/Web/API/WebSocket
  3. MDN - Server-Sent Events - https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events
  4. Socket.IO Documentation - https://socket.io/docs/v4/
  5. ws - Node.js WebSocketライブラリ - https://github.com/websockets/ws
  6. Y.js - CRDTフレームワーク - https://docs.yjs.dev/
  7. Automerge - CRDTライブラリ - https://automerge.org/
  8. Redis Adapter for Socket.IO - https://socket.io/docs/v4/redis-adapter/
  9. NATS Messaging - https://nats.io/
  10. Protocol Buffers - https://protobuf.dev/
  11. Scaling WebSocket (Socket.IO docs) - https://socket.io/docs/v4/using-multiple-nodes/
  12. HTML Living Standard - Server-Sent Events - https://html.spec.whatwg.org/multipage/server-sent-events.html
  13. pino - Node.jsロガー - https://getpino.io/

このガイドで取(と)り上(あ)げたリアルタイム通信技術はそれぞれ固有(こゆう)の強(つよ)みを持っています。SSEはシンプルな単方向ストリーミングに、WebSocketは双方向通信に、Socket.IOは豊富(ほうふ)な機能が必要なプロジェクトに最適です。プロダクションでは必ず認証、レート制限、モニタリングを整(ととの)え、水平スケーリングのためにRedis AdapterとSticky Sessionを設定してください。