- Published on
WebSocket & リアルタイム通信完全ガイド 2025: SSE, Socket.IO, チャット/通知/コラボレーション実装
- Authors

- Name
- Youngju Kim
- @fjvbn20031
TOC
1. なぜリアルタイム通信(つうしん)が重要(じゅうよう)なのか
現代(げんだい)のWebアプリケーションは、もはや単純(たんじゅん)なリクエスト・レスポンスモデルでは不十分(ふじゅうぶん)です。チャット、通知(つうち)、リアルタイムダッシュボード、協調編集(きょうちょうへんしゅう)、オンラインゲームなど、ユーザー体験(たいけん)の核心(かくしん)は**即時性(そくじせい)**にかかっています。
従来(じゅうらい)のHTTPリクエスト・レスポンスモデルの限界(げんかい):
- クライアントが常(つね)にリクエストを先(さき)に送信(そうしん)しなければサーバーが応答(おうとう)しない
- サーバーが自発的(じはつてき)にクライアントにデータを送信できない
- ポーリングで回避(かいひ)できるが非効率(ひこうりつ)
- レイテンシが高(たか)く、サーバーリソースの浪費(ろうひ)が大(おお)きい
このガイドでは、WebSocket、SSE(Server-Sent Events)、Socket.IO などリアルタイム通信技術(ぎじゅつ)のスペクトラム全体(ぜんたい)をカバーします。
2. 通信方式の比較(ひかく):Polling vs Long Polling vs SSE vs WebSocket
2.1 比較テーブル
| 特性 | Short Polling | Long Polling | SSE | WebSocket |
|---|---|---|---|---|
| 方向 | クライアント→サーバー | クライアント→サーバー | サーバー→クライアント | 双方向 |
| プロトコル | HTTP | HTTP | HTTP | ws:// / wss:// |
| 接続維持 | 毎回新規接続 | 応答まで維持 | 持続接続 | 持続接続 |
| レイテンシ | 高い(ポーリング間隔) | 中程度 | 低い | 非常に低い |
| サーバー負荷 | 高い | 中程度 | 低い | 低い |
| ブラウザ対応 | 全ブラウザ | 全ブラウザ | ほぼ全て | ほぼ全て |
| 自動再接続 | 手動実装 | 手動実装 | 内蔵 | 手動実装 |
| バイナリデータ | 非効率 | 非効率 | 非対応 | 対応 |
2.2 Short Polling
最(もっと)もシンプルな方式(ほうしき)です。一定間隔(いっていかんかく)でサーバーにリクエストを送信します。
// Short Polling 実装
function startPolling(interval = 5000) {
const poll = async () => {
try {
const response = await fetch('/api/messages');
const data = await response.json();
updateUI(data);
} catch (error) {
console.error('Polling error:', error);
}
};
// 即時実行後、インターバル設定
poll();
return setInterval(poll, interval);
}
// クリーンアップ
const pollId = startPolling(3000);
// clearInterval(pollId);
問題点(もんだいてん):新(あたら)しいデータがなくてもリクエストを送信し続(つづ)けるため、サーバーリソースが浪費されます。
2.3 Long Polling
サーバーが新しいデータが発生(はっせい)するまでレスポンスを遅延(ちえん)させます。
// Long Polling クライアント
async function longPoll(lastEventId = null) {
try {
const url = lastEventId
? `/api/events?since=${lastEventId}`
: '/api/events';
const response = await fetch(url, {
signal: AbortSignal.timeout(30000), // 30秒タイムアウト
});
if (response.status === 200) {
const data = await response.json();
handleNewData(data);
// 即座に次のLong Pollを開始
longPoll(data.lastEventId);
} else if (response.status === 204) {
// データなし - タイムアウト後にリトライ
longPoll(lastEventId);
}
} catch (error) {
// エラー時は段階的にリトライ
console.error('Long poll error:', error);
setTimeout(() => longPoll(lastEventId), 3000);
}
}
// Long Polling サーバー (Express)
app.get('/api/events', async (req, res) => {
const since = req.query.since;
const timeout = 25000; // 25秒
const checkForData = () => {
return new Promise((resolve) => {
const timer = setTimeout(() => {
resolve(null); // タイムアウト
}, timeout);
eventEmitter.once('newData', (data) => {
clearTimeout(timer);
resolve(data);
});
});
};
const data = await checkForData();
if (data) {
res.json(data);
} else {
res.status(204).end();
}
});
2.4 どれを選(えら)ぶべきか
- Short Polling: データ変更(へんこう)が稀(まれ)でリアルタイム性が重要でない場合
- Long Polling: WebSocketが使用(しよう)できない環境(かんきょう)での次善策(じぜんさく)
- SSE: サーバーからクライアントへの単方向(たんほうこう)ストリーミング(通知、フィード、リアルタイムログ)
- WebSocket: 双方向(そうほうこう)通信が必要な場合(チャット、ゲーム、協調編集)
3. WebSocketプロトコル深層分析(しんそうぶんせき)
3.1 WebSocketハンドシェイク
WebSocketはHTTPアップグレードメカニズムを通(とお)じて接続(せつぞく)を確立(かくりつ)します。
// クライアントリクエスト
GET /chat HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
Sec-WebSocket-Protocol: chat, superchat
// サーバーレスポンス
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
Sec-WebSocket-Protocol: chat
ポイント:
- HTTP 101ステータスコードでプロトコル切替(きりかえ)
Sec-WebSocket-KeyとマジックGUIDを組(く)み合(あ)わせてAccept値を生成- ハンドシェイク後、双方向バイナリフレーム通信が開始(かいし)
3.2 WebSocketフレーム構造(こうぞう)
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len | Extended payload length |
|I|S|S|S| (4) |A| (7) | (16/64) |
|N|V|V|V| |S| | (if payload len==126/127) |
| |1|2|3| |K| | |
+-+-+-+-+-------+-+-------------+-------------------------------+
主要なopcode:
0x1: テキストフレーム0x2: バイナリフレーム0x8: 接続終了0x9: Ping0xA: Pong
3.3 Node.js基本WebSocketサーバー
import { WebSocketServer } from 'ws';
import { createServer } from 'http';
const server = createServer();
const wss = new WebSocketServer({ server });
// 接続クライアント管理
const clients = new Map();
wss.on('connection', (ws, req) => {
const clientId = generateId();
clients.set(clientId, { ws, alive: true });
console.log(`Client connected: ${clientId}`);
// Ping/Pong ハートビート
ws.isAlive = true;
ws.on('pong', () => {
ws.isAlive = true;
});
// メッセージ受信
ws.on('message', (data, isBinary) => {
const message = isBinary ? data : data.toString();
try {
const parsed = JSON.parse(message);
handleMessage(clientId, parsed);
} catch (e) {
ws.send(JSON.stringify({ error: 'Invalid message format' }));
}
});
// 接続終了
ws.on('close', (code, reason) => {
console.log(`Client disconnected: ${clientId}, code: ${code}`);
clients.delete(clientId);
});
// エラー処理
ws.on('error', (error) => {
console.error(`WebSocket error for ${clientId}:`, error);
});
// ウェルカムメッセージ
ws.send(JSON.stringify({
type: 'welcome',
clientId,
timestamp: Date.now(),
}));
});
// ハートビートインターバル
const heartbeat = setInterval(() => {
wss.clients.forEach((ws) => {
if (ws.isAlive === false) {
return ws.terminate();
}
ws.isAlive = false;
ws.ping();
});
}, 30000);
wss.on('close', () => {
clearInterval(heartbeat);
});
function handleMessage(clientId, message) {
switch (message.type) {
case 'broadcast':
broadcast(clientId, message.payload);
break;
case 'direct':
sendToClient(message.targetId, message.payload);
break;
default:
console.warn(`Unknown message type: ${message.type}`);
}
}
function broadcast(senderId, payload) {
const msg = JSON.stringify({
type: 'broadcast',
from: senderId,
payload,
timestamp: Date.now(),
});
clients.forEach((client, id) => {
if (id !== senderId && client.ws.readyState === 1) {
client.ws.send(msg);
}
});
}
function sendToClient(targetId, payload) {
const target = clients.get(targetId);
if (target && target.ws.readyState === 1) {
target.ws.send(JSON.stringify({ type: 'direct', payload }));
}
}
server.listen(8080, () => {
console.log('WebSocket server running on port 8080');
});
3.4 クライアント実装(自動再接続)
class ReconnectingWebSocket {
constructor(url, options = {}) {
this.url = url;
this.maxRetries = options.maxRetries || 10;
this.baseDelay = options.baseDelay || 1000;
this.maxDelay = options.maxDelay || 30000;
this.retryCount = 0;
this.handlers = new Map();
this.messageQueue = [];
this.connect();
}
connect() {
this.ws = new WebSocket(this.url);
this.ws.onopen = () => {
console.log('WebSocket connected');
this.retryCount = 0;
// キューに溜まったメッセージを送信
while (this.messageQueue.length > 0) {
const msg = this.messageQueue.shift();
this.ws.send(msg);
}
this.emit('open');
};
this.ws.onmessage = (event) => {
try {
const data = JSON.parse(event.data);
this.emit('message', data);
if (data.type) {
this.emit(data.type, data);
}
} catch (e) {
this.emit('message', event.data);
}
};
this.ws.onclose = (event) => {
this.emit('close', event);
if (!event.wasClean) {
this.reconnect();
}
};
this.ws.onerror = (error) => {
this.emit('error', error);
};
}
reconnect() {
if (this.retryCount >= this.maxRetries) {
console.error('Max retries reached');
this.emit('maxRetriesReached');
return;
}
// 指数バックオフ + ジッター
const delay = Math.min(
this.baseDelay * Math.pow(2, this.retryCount) + Math.random() * 1000,
this.maxDelay
);
console.log(`Reconnecting in ${delay}ms (attempt ${this.retryCount + 1})`);
this.retryCount++;
setTimeout(() => this.connect(), delay);
}
send(data) {
const message = typeof data === 'string' ? data : JSON.stringify(data);
if (this.ws.readyState === WebSocket.OPEN) {
this.ws.send(message);
} else {
this.messageQueue.push(message);
}
}
on(event, handler) {
if (!this.handlers.has(event)) {
this.handlers.set(event, []);
}
this.handlers.get(event).push(handler);
}
emit(event, data) {
const handlers = this.handlers.get(event) || [];
handlers.forEach((handler) => handler(data));
}
close() {
this.maxRetries = 0; // 再接続を防止
this.ws.close(1000, 'Client closing');
}
}
// 使用例
const ws = new ReconnectingWebSocket('wss://api.example.com/ws');
ws.on('message', (data) => console.log('Received:', data));
ws.send({ type: 'join', room: 'general' });
4. SSE(Server-Sent Events)
4.1 SSEとは
SSEはサーバーからクライアントへの単方向ストリーミングのための標準(ひょうじゅん)です。HTTP接続を維持(いじ)しながらサーバーがイベントをプッシュします。
利点(りてん):
- HTTP/2と自然(しぜん)に互換(ごかん)
- 自動再接続(じどうさいせつぞく)が内蔵
Last-Event-IDによるイベント復旧(ふっきゅう)- テキストベースでデバッグが容易(ようい)
- プロキシ/ロードバランサーとの親和性(しんわせい)が高い
4.2 SSEサーバー実装
// Express SSEサーバー
import express from 'express';
const app = express();
// SSEクライアント管理
const sseClients = new Map();
let eventCounter = 0;
app.get('/api/events', (req, res) => {
const clientId = Date.now().toString();
// SSEヘッダー設定
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'X-Accel-Buffering': 'no', // Nginxバッファリング無効化
});
// Last-Event-ID処理(再接続時の復旧)
const lastEventId = req.headers['last-event-id'];
if (lastEventId) {
// 欠落イベントの再送信
const missedEvents = getEventsSince(parseInt(lastEventId));
missedEvents.forEach((event) => {
res.write(`id: ${event.id}\n`);
res.write(`event: ${event.type}\n`);
res.write(`data: ${JSON.stringify(event.data)}\n\n`);
});
}
// クライアント登録
sseClients.set(clientId, res);
// 再接続間隔設定(ミリ秒)
res.write('retry: 5000\n\n');
// 初期接続確認
res.write(`id: ${++eventCounter}\n`);
res.write('event: connected\n');
res.write(`data: ${JSON.stringify({ clientId })}\n\n`);
// Keep-alive(15秒ごとにコメント送信)
const keepAlive = setInterval(() => {
res.write(':keep-alive\n\n');
}, 15000);
// 接続終了処理
req.on('close', () => {
clearInterval(keepAlive);
sseClients.delete(clientId);
console.log(`SSE client disconnected: ${clientId}`);
});
});
// 全SSEクライアントにイベントをブロードキャスト
function broadcastSSE(eventType, data) {
const id = ++eventCounter;
sseClients.forEach((res) => {
res.write(`id: ${id}\n`);
res.write(`event: ${eventType}\n`);
res.write(`data: ${JSON.stringify(data)}\n\n`);
});
}
4.3 SSEクライアント実装
// EventSource API活用
class SSEClient {
constructor(url) {
this.url = url;
this.handlers = {};
this.connect();
}
connect() {
this.source = new EventSource(this.url);
this.source.onopen = () => {
console.log('SSE connection opened');
};
this.source.onerror = (error) => {
console.error('SSE error:', error);
if (this.source.readyState === EventSource.CLOSED) {
console.log('SSE connection closed');
}
// EventSourceが自動的に再接続を試みる
};
// カスタムイベント登録
Object.entries(this.handlers).forEach(([event, handler]) => {
this.source.addEventListener(event, handler);
});
}
on(event, handler) {
const wrappedHandler = (e) => {
try {
const data = JSON.parse(e.data);
handler(data, e);
} catch {
handler(e.data, e);
}
};
this.handlers[event] = wrappedHandler;
if (this.source) {
this.source.addEventListener(event, wrappedHandler);
}
}
close() {
if (this.source) {
this.source.close();
}
}
}
// 使用例
const sse = new SSEClient('/api/events');
sse.on('notification', (data) => {
showNotification(data.title, data.message);
});
sse.on('update', (data) => {
refreshDashboard(data);
});
5. Socket.IO完全ガイド
5.1 Socket.IOアーキテクチャ
Socket.IOはWebSocketの上(うえ)に構築(こうちく)されたライブラリで、以下(いか)の機能(きのう)を追加します:
- 自動再接続(指数バックオフ)
- パケットバッファリング
- ルーム(Room)とネームスペース(Namespace)
- ブロードキャスト
- マルチプレクシング
- HTTP Long Pollingフォールバック
// サーバー設定
import { createServer } from 'http';
import { Server } from 'socket.io';
const httpServer = createServer();
const io = new Server(httpServer, {
cors: {
origin: ['https://example.com'],
methods: ['GET', 'POST'],
credentials: true,
},
transports: ['websocket', 'polling'],
pingTimeout: 20000,
pingInterval: 25000,
maxHttpBufferSize: 1e6, // 1MB
});
5.2 ネームスペースとルーム
// ネームスペース - 論理的分離
const chatNamespace = io.of('/chat');
const adminNamespace = io.of('/admin');
// チャットネームスペース
chatNamespace.on('connection', (socket) => {
console.log('User connected to chat:', socket.id);
// ルーム参加
socket.on('joinRoom', async (roomName) => {
await socket.join(roomName);
socket.to(roomName).emit('userJoined', {
userId: socket.id,
room: roomName,
});
const sockets = await chatNamespace.in(roomName).fetchSockets();
chatNamespace.to(roomName).emit('roomInfo', {
room: roomName,
members: sockets.length,
});
});
// ルーム退出
socket.on('leaveRoom', async (roomName) => {
await socket.leave(roomName);
socket.to(roomName).emit('userLeft', {
userId: socket.id,
room: roomName,
});
});
// ルーム内メッセージ送信
socket.on('message', (data) => {
chatNamespace.to(data.room).emit('message', {
from: socket.id,
text: data.text,
room: data.room,
timestamp: Date.now(),
});
});
});
// 管理者ネームスペース - 認証必要
adminNamespace.use((socket, next) => {
const token = socket.handshake.auth.token;
if (verifyAdminToken(token)) {
next();
} else {
next(new Error('Unauthorized'));
}
});
5.3 ミドルウェアと認証(にんしょう)
// グローバルミドルウェア
io.use((socket, next) => {
const token = socket.handshake.auth.token;
if (!token) {
return next(new Error('Authentication required'));
}
try {
const user = verifyToken(token);
socket.data.user = user;
next();
} catch (err) {
next(new Error('Invalid token'));
}
});
// イベント毎のミドルウェア
io.on('connection', (socket) => {
// 全イベントのロギングミドルウェア
socket.use(([event, ...args], next) => {
console.log(`Event: ${event}, User: ${socket.data.user.name}`);
next();
});
// レート制限ミドルウェア
const rateLimiter = createRateLimiter(100, 60000); // 1分あたり100件
socket.use(([event, ...args], next) => {
if (rateLimiter.isAllowed(socket.id)) {
next();
} else {
next(new Error('Rate limit exceeded'));
}
});
});
5.4 Acknowledgements(応答確認)
// サーバー
io.on('connection', (socket) => {
socket.on('createOrder', async (orderData, callback) => {
try {
const order = await processOrder(orderData);
callback({
status: 'ok',
orderId: order.id,
});
} catch (error) {
callback({
status: 'error',
message: error.message,
});
}
});
});
// クライアント
socket.emit('createOrder', { item: 'laptop', qty: 1 }, (response) => {
if (response.status === 'ok') {
console.log('Order created:', response.orderId);
} else {
console.error('Order failed:', response.message);
}
});
// タイムアウト付きemit
socket.timeout(5000).emit('createOrder', orderData, (err, response) => {
if (err) {
// サーバーが5秒以内に応答しなかった
console.error('Timeout');
} else {
console.log('Response:', response);
}
});
6. チャットシステム実装
6.1 メッセージ順序(じゅんじょ)保証(ほしょう)
分散(ぶんさん)環境でメッセージ順序を保証することは核心的(かくしんてき)な課題(かだい)です。
// サーバー:メッセージ順序管理
class ChatRoom {
constructor(roomId) {
this.roomId = roomId;
this.messageSequence = 0;
}
async addMessage(userId, content) {
const sequence = ++this.messageSequence;
const message = {
id: `${this.roomId}-${sequence}`,
sequence,
roomId: this.roomId,
userId,
content,
timestamp: Date.now(),
status: 'sent',
};
await this.persistMessage(message);
return message;
}
async getMessagesSince(sequence, limit = 50) {
return await db.messages
.find({
roomId: this.roomId,
sequence: { $gt: sequence },
})
.sort({ sequence: 1 })
.limit(limit)
.toArray();
}
}
6.2 プレゼンス(接続状態)管理(かんり)
// プレゼンスシステム
class PresenceManager {
constructor(io, redis) {
this.io = io;
this.redis = redis;
this.TTL = 60; // 60秒
}
async setOnline(userId, socketId) {
const key = `presence:${userId}`;
await this.redis.hset(key, {
status: 'online',
socketId,
lastSeen: Date.now().toString(),
});
await this.redis.expire(key, this.TTL);
// フレンドにステータス変更を通知
const friends = await this.getFriends(userId);
friends.forEach((friendId) => {
this.io.to(`user:${friendId}`).emit('presence', {
userId,
status: 'online',
});
});
}
async setOffline(userId) {
const key = `presence:${userId}`;
await this.redis.hset(key, {
status: 'offline',
lastSeen: Date.now().toString(),
});
const friends = await this.getFriends(userId);
friends.forEach((friendId) => {
this.io.to(`user:${friendId}`).emit('presence', {
userId,
status: 'offline',
lastSeen: Date.now(),
});
});
}
}
6.3 タイピングインジケーター
// クライアント
let typingTimeout;
messageInput.addEventListener('input', () => {
socket.emit('typing', { room: currentRoom });
clearTimeout(typingTimeout);
typingTimeout = setTimeout(() => {
socket.emit('stopTyping', { room: currentRoom });
}, 2000);
});
// サーバー
socket.on('typing', (data) => {
socket.to(data.room).emit('typing', {
userId: socket.data.user.id,
userName: socket.data.user.name,
});
});
socket.on('stopTyping', (data) => {
socket.to(data.room).emit('stopTyping', {
userId: socket.data.user.id,
});
});
6.4 既読確認(きどくかくにん)
// サーバー:既読確認処理
socket.on('markRead', async (data) => {
const { roomId, messageId, userId } = data;
await db.readReceipts.upsert({
roomId,
userId,
lastReadMessageId: messageId,
readAt: new Date(),
});
socket.to(roomId).emit('readReceipt', {
userId,
messageId,
readAt: Date.now(),
});
});
// クライアント:Intersection Observerで自動既読検知
const observer = new IntersectionObserver(
(entries) => {
entries.forEach((entry) => {
if (entry.isIntersecting) {
const messageId = entry.target.dataset.messageId;
socket.emit('markRead', {
roomId: currentRoom,
messageId,
userId: currentUser.id,
});
observer.unobserve(entry.target);
}
});
},
{ threshold: 0.5 }
);
7. リアルタイム通知(つうち)システム
7.1 Pushアーキテクチャ
// 通知サービス
class NotificationService {
constructor(io, redis) {
this.io = io;
this.redis = redis;
}
async send(userId, notification) {
const enriched = {
id: generateId(),
...notification,
createdAt: Date.now(),
read: false,
};
// 永続化
await db.notifications.insertOne({
userId,
...enriched,
});
// リアルタイム配信
this.io.to(`user:${userId}`).emit('notification', enriched);
// オフラインならプッシュ通知キューに追加
const isOnline = await this.isUserOnline(userId);
if (!isOnline) {
await this.queuePushNotification(userId, enriched);
}
return enriched;
}
async sendBatch(userIds, notification) {
const enriched = {
id: generateId(),
...notification,
createdAt: Date.now(),
read: false,
};
const docs = userIds.map((userId) => ({
userId,
...enriched,
}));
await db.notifications.insertMany(docs);
userIds.forEach((userId) => {
this.io.to(`user:${userId}`).emit('notification', enriched);
});
}
}
7.2 通知バッチ処理(しょり)
// 高頻度イベントのバッチ処理
class NotificationBatcher {
constructor(flushInterval = 1000, maxBatchSize = 50) {
this.buffer = new Map();
this.flushInterval = flushInterval;
this.maxBatchSize = maxBatchSize;
setInterval(() => this.flush(), this.flushInterval);
}
add(userId, notification) {
if (!this.buffer.has(userId)) {
this.buffer.set(userId, []);
}
const userBuffer = this.buffer.get(userId);
userBuffer.push(notification);
if (userBuffer.length >= this.maxBatchSize) {
this.flushUser(userId);
}
}
flush() {
this.buffer.forEach((notifications, userId) => {
if (notifications.length > 0) {
this.flushUser(userId);
}
});
}
flushUser(userId) {
const notifications = this.buffer.get(userId) || [];
if (notifications.length === 0) return;
this.buffer.set(userId, []);
const grouped = this.groupNotifications(notifications);
io.to(`user:${userId}`).emit('notifications:batch', grouped);
}
groupNotifications(notifications) {
const groups = {};
notifications.forEach((n) => {
const key = n.type;
if (!groups[key]) groups[key] = [];
groups[key].push(n);
});
return groups;
}
}
8. 協調編集(きょうちょうへんしゅう):OT vs CRDT
8.1 OT (Operational Transformation)
OTはGoogle Docsで使用(しよう)されている方式で、同時編集時(どうじへんしゅうじ)にオペレーションを変換(へんかん)して一貫性(いっかんせい)を維持します。
// OTの基本概念
// 2人のユーザーが同時に編集する場合:
// User A: insert('X', position=0) - 位置0に'X'を挿入
// User B: insert('Y', position=2) - 位置2に'Y'を挿入
// Aのオペレーションに対してBの観点で変換:
// Bが位置2に挿入したので、Aのオペレーションは影響なし(位置0)
// Bのオペレーションに対してAの観点で変換:
// Aが位置0に挿入したので、Bの位置は2+1=3に調整
function transformInsert(opA, opB) {
if (opA.position <= opB.position) {
return { ...opB, position: opB.position + 1 };
}
return opB;
}
8.2 CRDT (Conflict-free Replicated Data Type)
CRDTは中央(ちゅうおう)サーバーなしでも結果整合性(けっかせいごうせい)を保証するデータ構造です。
// Y.jsを活用したCRDTベースの協調編集
import * as Y from 'yjs';
import { WebsocketProvider } from 'y-websocket';
// 共有ドキュメント作成
const ydoc = new Y.Doc();
// WebSocketプロバイダー接続
const provider = new WebsocketProvider(
'wss://collaboration.example.com',
'document-room-123',
ydoc
);
// 共有テキスト
const ytext = ydoc.getText('content');
// テキスト変更監視
ytext.observe((event) => {
console.log('Text changed:', event.delta);
});
// テキスト編集
ytext.insert(0, 'Hello, ');
ytext.insert(7, 'World!');
// Awareness(カーソル位置、ユーザー情報)
const awareness = provider.awareness;
awareness.setLocalState({
user: { name: 'Alice', color: '#ff0000' },
cursor: { position: 42 },
});
awareness.on('change', () => {
const states = awareness.getStates();
updateCursors(states);
});
8.3 OT vs CRDT比較
| 特性 | OT | CRDT |
|---|---|---|
| サーバー依存性 | 中央サーバー必須 | サーバーなしでも可能 |
| 実装複雑度 | 高い | 中程度 |
| メモリ使用量 | 低い | 高い(メタデータ) |
| オフライン対応 | 困難 | 自然に対応 |
| 代表的実装 | Google Docs | Y.js, Automerge |
| スケーラビリティ | サーバーボトルネック | P2Pスケール可能 |
9. WebSocket水平(すいへい)スケーリング
9.1 問題:複数(ふくすう)サーバーインスタンス
WebSocket接続は特定(とくてい)のサーバーインスタンスにバインドされます。複数サーバーがある場合、あるサーバーのクライアントが送信したメッセージを別(べつ)のサーバーのクライアントに配信する必要があります。
9.2 Redis Adapter (Socket.IO)
import { Server } from 'socket.io';
import { createAdapter } from '@socket.io/redis-adapter';
import { createClient } from 'redis';
const io = new Server(httpServer);
const pubClient = createClient({ url: 'redis://redis-host:6379' });
const subClient = pubClient.duplicate();
await Promise.all([pubClient.connect(), subClient.connect()]);
// Redis Adapter適用
io.adapter(createAdapter(pubClient, subClient));
// どのサーバーインスタンスからでもブロードキャスト可能
io.emit('globalEvent', { message: 'All servers receive this' });
io.to('room-123').emit('roomEvent', { data: 'value' });
9.3 Sticky Sessions設定(せってい)
# Nginx設定
upstream websocket_backend {
ip_hash; # Sticky Session
server backend1:3000;
server backend2:3000;
server backend3:3000;
}
server {
listen 80;
location /socket.io/ {
proxy_pass http://websocket_backend;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_read_timeout 86400s;
proxy_send_timeout 86400s;
}
}
9.4 NATSを活用したPub/Sub
import { connect } from 'nats';
class NATSAdapter {
constructor(io) {
this.io = io;
this.serverId = process.env.SERVER_ID || generateId();
}
async connect() {
this.nc = await connect({ servers: 'nats://nats-host:4222' });
const sub = this.nc.subscribe('ws.broadcast.>');
(async () => {
for await (const msg of sub) {
const data = JSON.parse(msg.data.toString());
if (data.serverId === this.serverId) continue;
const topic = msg.subject.split('.').slice(2).join('.');
this.io.to(topic).emit(data.event, data.payload);
}
})();
}
broadcast(room, event, payload) {
this.nc.publish(`ws.broadcast.${room}`, JSON.stringify({
serverId: this.serverId,
event,
payload,
}));
}
}
10. セキュリティ
10.1 認証(にんしょう)
// トークンベース認証
io.use(async (socket, next) => {
try {
const token = socket.handshake.auth.token
|| socket.handshake.query.token;
if (!token) {
return next(new Error('Authentication token required'));
}
const payload = await verifyJWT(token);
if (payload.exp * 1000 < Date.now()) {
return next(new Error('Token expired'));
}
socket.data.user = {
id: payload.sub,
roles: payload.roles,
};
next();
} catch (err) {
next(new Error('Invalid authentication token'));
}
});
10.2 レート制限(せいげん)
// トークンバケットアルゴリズム
class TokenBucket {
constructor(capacity, refillRate) {
this.capacity = capacity;
this.tokens = capacity;
this.refillRate = refillRate;
this.lastRefill = Date.now();
}
consume(count = 1) {
this.refill();
if (this.tokens >= count) {
this.tokens -= count;
return true;
}
return false;
}
refill() {
const now = Date.now();
const elapsed = (now - this.lastRefill) / 1000;
this.tokens = Math.min(
this.capacity,
this.tokens + elapsed * this.refillRate
);
this.lastRefill = now;
}
}
// Socket.IOに適用
const rateLimiters = new Map();
socket.use(([event, ...args], next) => {
const bucket = rateLimiters.get(socket.data.user.id);
if (bucket.consume()) {
next();
} else {
next(new Error('Rate limit exceeded'));
}
});
10.3 Origin検証(けんしょう)
const io = new Server(httpServer, {
cors: {
origin: (origin, callback) => {
const allowedOrigins = [
'https://app.example.com',
'https://admin.example.com',
];
if (!origin || allowedOrigins.includes(origin)) {
callback(null, true);
} else {
callback(new Error('Not allowed by CORS'));
}
},
credentials: true,
},
});
11. パフォーマンス最適化(さいてきか)
11.1 メッセージ圧縮(あっしゅく)
// permessage-deflate拡張
import { WebSocketServer } from 'ws';
const wss = new WebSocketServer({
port: 8080,
perMessageDeflate: {
zlibDeflateOptions: {
chunkSize: 1024,
memLevel: 7,
level: 3,
},
threshold: 1024, // 1KB以上のみ圧縮
clientNoContextTakeover: true,
serverNoContextTakeover: true,
},
});
11.2 バイナリプロトコル(Protocol Buffers)
// message.proto
syntax = "proto3";
message ChatMessage {
string id = 1;
string room_id = 2;
string user_id = 3;
string content = 4;
int64 timestamp = 5;
MessageType type = 6;
enum MessageType {
TEXT = 0;
IMAGE = 1;
FILE = 2;
SYSTEM = 3;
}
}
// Protobuf使用
import protobuf from 'protobufjs';
const root = await protobuf.load('message.proto');
const ChatMessage = root.lookupType('ChatMessage');
// エンコード
function encodeMessage(event, payload) {
const frame = WebSocketFrame.create({ event, chatMessage: payload });
return WebSocketFrame.encode(frame).finish();
}
// デコード
function decodeMessage(buffer) {
return WebSocketFrame.decode(new Uint8Array(buffer));
}
// WebSocketでバイナリ送信
ws.on('message', (data, isBinary) => {
if (isBinary) {
const frame = decodeMessage(data);
handleFrame(frame);
}
});
12. プロダクション運用(うんよう)
12.1 ヘルスチェック
app.get('/health', (req, res) => {
const health = {
status: 'ok',
uptime: process.uptime(),
connections: io.engine.clientsCount,
rooms: io.sockets.adapter.rooms.size,
memory: process.memoryUsage(),
timestamp: Date.now(),
};
if (health.connections > 10000) {
health.status = 'degraded';
res.status(503);
}
res.json(health);
});
12.2 グレースフルシャットダウン
async function gracefulShutdown(signal) {
console.log(`Received ${signal}. Starting graceful shutdown...`);
// 1. 新規接続を拒否
httpServer.close();
// 2. 既存クライアントに終了を通知
io.emit('server:shutdown', {
message: 'Server is shutting down. Please reconnect.',
reconnectDelay: 5000,
});
// 3. 猶予期間後にすべての接続を切断
const disconnectTimeout = setTimeout(() => {
io.disconnectSockets(true);
}, 10000);
// 4. 保留中の操作を完了
await flushPendingMessages();
// 5. Redis接続をクリーンアップ
await pubClient.quit();
await subClient.quit();
clearTimeout(disconnectTimeout);
console.log('Graceful shutdown complete');
process.exit(0);
}
process.on('SIGTERM', () => gracefulShutdown('SIGTERM'));
process.on('SIGINT', () => gracefulShutdown('SIGINT'));
12.3 モニタリング
// Prometheusメトリクス
import { Registry, Counter, Gauge, Histogram } from 'prom-client';
const register = new Registry();
const wsConnections = new Gauge({
name: 'websocket_connections_active',
help: 'Number of active WebSocket connections',
registers: [register],
});
const wsMessages = new Counter({
name: 'websocket_messages_total',
help: 'Total WebSocket messages',
labelNames: ['direction', 'type'],
registers: [register],
});
const wsLatency = new Histogram({
name: 'websocket_message_latency_seconds',
help: 'WebSocket message processing latency',
buckets: [0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1],
registers: [register],
});
io.on('connection', (socket) => {
wsConnections.inc();
socket.on('disconnect', () => {
wsConnections.dec();
});
socket.use(([event, ...args], next) => {
const start = process.hrtime.bigint();
wsMessages.inc({ direction: 'inbound', type: event });
next();
const duration = Number(process.hrtime.bigint() - start) / 1e9;
wsLatency.observe(duration);
});
});
app.get('/metrics', async (req, res) => {
res.set('Content-Type', register.contentType);
res.end(await register.metrics());
});
13. プロダクションアーキテクチャパターン
13.1 チャットサービス全体(ぜんたい)アーキテクチャ
Client (React/Vue)
|
| WebSocket (Socket.IO)
v
Load Balancer (Nginx, sticky sessions)
|
+---> App Server 1 ---+
+---> App Server 2 ---+--> Redis Pub/Sub (メッセージブロードキャスト)
+---> App Server 3 ---+
|
+--> PostgreSQL (メッセージ永続化)
+--> Redis (セッション、プレゼンス、キャッシュ)
+--> S3 (ファイルアップロード)
13.2 イベント駆動(くどう)アーキテクチャ
// イベントバスパターン
class EventBus {
constructor(redis) {
this.redis = redis;
this.handlers = new Map();
}
async publish(channel, event) {
await this.redis.publish(channel, JSON.stringify(event));
}
subscribe(channel, handler) {
if (!this.handlers.has(channel)) {
this.handlers.set(channel, []);
this.redis.subscribe(channel);
}
this.handlers.get(channel).push(handler);
}
}
14. クイズ
Q1: WebSocketハンドシェイクで使用されるHTTPステータスコードは何ですか?
回答:101 Switching Protocols
WebSocket接続はHTTPアップグレードメカニズムを通じて確立されます。クライアントがUpgrade: websocketヘッダーを含むリクエストを送信すると、サーバーは101ステータスコードで応答し、プロトコル切替が成功したことを示します。その後、双方向バイナリフレーム通信が開始されます。
Q2: SSEの自動再接続メカニズムで使用されるヘッダーは何ですか?
回答:Last-Event-ID
EventSource APIは接続が切断されると自動的に再接続を試みます。その際、最後に受信したイベントのIDをLast-Event-IDリクエストヘッダーに含めてサーバーに送信します。サーバーはこのID以降のイベントから再送信できます。retryフィールドで再接続間隔(ミリ秒)を指定することもできます。
Q3: Socket.IOのRoomとNamespaceの違いは何ですか?
回答:
Namespaceは単一のWebSocket接続上で複数の通信チャネルをマルチプレクシングします。異なるエンドポイント(例:/chat、/admin)で区別され、それぞれ独立したミドルウェアとイベントハンドラーを持てます。
RoomはNamespace内部でクライアントをグループ化する論理的な単位です。socket.join(room)で参加し、io.to(room).emit()で特定グループにのみメッセージを送信できます。Roomはサーバー側の概念であり、クライアントは直接アクセスできません。
Q4: OTとCRDTの核心的な違いを説明してください。
回答:
OTは中央サーバーが同時編集オペレーションを変換して一貫性を維持します。Google Docsが代表例です。サーバー依存であり、オフライン編集に制限があります。
CRDTはデータ構造自体が競合を解決するため、中央サーバーなしでも結果整合性を保証します。Y.jsやAutomergeが代表例です。P2P通信とオフライン編集を自然にサポートしますが、メタデータによりメモリ使用量が大きくなります。
Q5: WebSocketサーバーを水平スケーリングする際にRedis Adapterが必要な理由は?
回答:
WebSocket接続は特定のサーバーインスタンスにバインドされます。複数のサーバーインスタンスがある場合、サーバーAに接続されたクライアントが送信したメッセージをサーバーBに接続されたクライアントに配信する必要があります。
Redis AdapterはRedisのPub/Sub機能を使用して、すべてのサーバーインスタンス間でメッセージをブロードキャストします。あるサーバーでio.to(room).emit()を呼び出すと、Redisを通じて他のすべてのサーバーにもイベントが伝播され、クラスター全体のクライアントにメッセージが配信されます。さらにSticky Sessionの設定も必要です。
15. 参考資料(さんこうしりょう)
- RFC 6455 - The WebSocket Protocol - https://datatracker.ietf.org/doc/html/rfc6455
- MDN - WebSocket API - https://developer.mozilla.org/en-US/docs/Web/API/WebSocket
- MDN - Server-Sent Events - https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events
- Socket.IO Documentation - https://socket.io/docs/v4/
- ws - Node.js WebSocketライブラリ - https://github.com/websockets/ws
- Y.js - CRDTフレームワーク - https://docs.yjs.dev/
- Automerge - CRDTライブラリ - https://automerge.org/
- Redis Adapter for Socket.IO - https://socket.io/docs/v4/redis-adapter/
- NATS Messaging - https://nats.io/
- Protocol Buffers - https://protobuf.dev/
- Scaling WebSocket (Socket.IO docs) - https://socket.io/docs/v4/using-multiple-nodes/
- HTML Living Standard - Server-Sent Events - https://html.spec.whatwg.org/multipage/server-sent-events.html
- pino - Node.jsロガー - https://getpino.io/
このガイドで取(と)り上(あ)げたリアルタイム通信技術はそれぞれ固有(こゆう)の強(つよ)みを持っています。SSEはシンプルな単方向ストリーミングに、WebSocketは双方向通信に、Socket.IOは豊富(ほうふ)な機能が必要なプロジェクトに最適です。プロダクションでは必ず認証、レート制限、モニタリングを整(ととの)え、水平スケーリングのためにRedis AdapterとSticky Sessionを設定してください。