WebSocket & Real-Time Communication Complete Guide 2025: SSE, Socket.IO, and Building Chat, Notifications, and Collaboration

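1. Why Real-Time Communication Matters

Modern web applications can no longer get by on a plain request-response model. For chat, notifications, live dashboards, collaborative editing, and online games, the core of the user experience depends on **immediacy**.

Limitations of the traditional HTTP request-response model:

  • The server can respond only after the client sends a request
  • The server cannot push data to the client on its own initiative
  • Polling works around this, but inefficiently
  • Latency is high and server resources are wasted

This guide covers the full spectrum of real-time communication technologies: WebSocket, SSE (Server-Sent Events), and Socket.IO.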


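2. Comparing Approaches: Polling vs Long Polling vs SSE vs WebSocket

2.1 Comparison Table

| Characteristic | Short Polling | Long Polling | SSE | WebSocket |
|---|---|---|---|---|
| Direction | Client → server | Client → server | Server → client | Bidirectional |
| Protocol | HTTP | HTTP | HTTP | ws:// / wss:// |
| Connection | New connection each time | Held until response | Persistent | Persistent |
| Latency | High (polling interval) | Medium | Low | Very low |
| Server load | High | Medium | Low | Low |
| Browser support | All browsers | All browsers | Most | Most |
| Automatic reconnection | Implement yourself | Implement yourself | Built in | Implement yourself |
| Binary data | Inefficient | Inefficient | Not supported | Supported |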

2.2 Short Polling

This is the simplest approach: the client sends a request to the server at a fixed interval.

// Short polling implementation
function startPolling(interval = 5000) {
  const poll = async () => {
    try {
      const response = await fetch('/api/messages');
      if (!response.ok) {
        throw new Error(`HTTP ${response.status}`);
      }
      const data = await response.json();
      updateUI(data);
    } catch (error) {
      console.error('Polling error:', error);
    }
  };

  // Run once immediately, then on every interval
  poll();
  return setInterval(poll, interval);
}

// Start polling; keep the ID so it can be stopped later
const pollId = startPolling(3000);
// clearInterval(pollId);

Problem: requests keep going out even when there is no new data, which wastes server resources.
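
To put a rough number on that waste, here is a back-of-the-envelope sketch. The client count, polling interval, and update rate are made-up assumptions for illustration, not measurements, and `pollingOverhead` is a hypothetical helper.

```javascript
// Estimate how many polling requests return nothing new.
// All inputs are illustrative assumptions, not measurements.
function pollingOverhead({ clients, intervalMs, updatesPerClientPerMinute }) {
  const requestsPerMinute = clients * (60000 / intervalMs);
  // At most one request per update can carry new data.
  const usefulPerMinute = Math.min(
    requestsPerMinute,
    clients * updatesPerClientPerMinute
  );
  return {
    requestsPerMinute,
    wastedPerMinute: requestsPerMinute - usefulPerMinute,
    wastedRatio: 1 - usefulPerMinute / requestsPerMinute,
  };
}

const estimate = pollingOverhead({
  clients: 10000,
  intervalMs: 3000,
  updatesPerClientPerMinute: 1,
});
console.log(estimate);
```

With these assumed inputs, 190,000 of the 200,000 requests per minute (95%) come back with nothing new.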

2.3 Long Polling

The server holds the response until new data is available.

// Long polling client
async function longPoll(lastEventId = null) {
  try {
    const url = lastEventId
      ? `/api/events?since=${lastEventId}`
      : '/api/events';

    const response = await fetch(url, {
      signal: AbortSignal.timeout(30000), // 30-second timeout
    });

    if (response.status === 200) {
      const data = await response.json();
      handleNewData(data);
      // Start the next long poll immediately
      longPoll(data.lastEventId);
    } else if (response.status === 204) {
      // No data: the server timed out, so poll again
      longPoll(lastEventId);
    } else {
      // Treat any other status as an error so the loop does not stop silently
      throw new Error(`Unexpected status: ${response.status}`);
    }
  } catch (error) {
    // On error, retry after a fixed 3-second delay
    console.error('Long poll error:', error);
    setTimeout(() => longPoll(lastEventId), 3000);
  }
}

// Long polling server (Express)
app.get('/api/events', async (req, res) => {
  const since = req.query.since; // not used in this simplified example
  const timeout = 25000; // 25 seconds

  const checkForData = () => {
    return new Promise((resolve) => {
      const onData = (data) => {
        clearTimeout(timer);
        resolve(data);
      };

      const timer = setTimeout(() => {
        // Timed out: remove the listener so it does not leak
        eventEmitter.off('newData', onData);
        resolve(null);
      }, timeout);

      eventEmitter.once('newData', onData);
    });
  };

  const data = await checkForData();
  if (data) {
    res.json(data);
  } else {
    res.status(204).end();
  }
});

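2.4 Which One to Choose

  • Short Polling: when data changes rarely and real-time delivery is not important
  • Long Polling: a fallback for environments where WebSocket is not available
  • SSE: one-way streaming from server to client (notifications, feeds, live logs)
  • WebSocket: when bidirectional communication is required (chat, games, collaborative editing)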

3. WebSocket Protocol Deep Dive

3.1 WebSocket Handshake

WebSocket establishes a connection through the HTTP Upgrade mechanism.

// Client request
GET /chat HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
Sec-WebSocket-Protocol: chat, superchat

// Server response
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
Sec-WebSocket-Protocol: chat

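Key points:

  • The protocol switch is signaled with HTTP status 101
  • The Accept value is derived from Sec-WebSocket-Key combined with a fixed "magic" GUID
  • After the handshake, both sides exchange messages as binary frames in either direction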
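
The Accept derivation can be sketched in a few lines. This assumes Node.js and its built-in `crypto` module; `computeAccept` is a name chosen for this example. The GUID is the fixed value from RFC 6455, and the key is the one from the handshake shown above.

```javascript
const { createHash } = require('node:crypto');

// Fixed GUID defined by RFC 6455 for the opening handshake.
const WS_GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11';

// Derive Sec-WebSocket-Accept: base64(SHA-1(key + GUID)).
function computeAccept(secWebSocketKey) {
  return createHash('sha1')
    .update(secWebSocketKey + WS_GUID)
    .digest('base64');
}

console.log(computeAccept('dGhlIHNhbXBsZSBub25jZQ=='));
// → s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
```

The result matches the `Sec-WebSocket-Accept` header in the server response above, which is how the client confirms that the server actually understood the WebSocket upgrade.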

3.2 WebSocket Frame Structure

 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len |    Extended payload length    |
|I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
|N|V|V|V|       |S|             |   (if payload len==126/127)   |
| |1|2|3|       |K|             |                               |
+-+-+-+-+-------+-+-------------+-------------------------------+

Main opcodes:

  • 0x1: text frame
  • 0x2: binary frame
  • 0x8: connection close
  • 0x9: Ping
  • 0xA: Pong
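
To make the header layout concrete, the sketch below decodes a single, complete frame. It is a simplified illustration, not a production parser: `parseFrame` is a hypothetical helper, fragmented messages are not handled, and it relies on the 4-byte masking key (present when MASK is 1) and the payload that follow the header fields in the diagram.

```javascript
// Parse one complete WebSocket frame held in a Uint8Array.
// Sketch only: no fragmentation handling, and 64-bit lengths are
// assumed to fit in a JavaScript number.
function parseFrame(bytes) {
  const fin = (bytes[0] & 0x80) !== 0;
  const opcode = bytes[0] & 0x0f;
  const masked = (bytes[1] & 0x80) !== 0;
  let length = bytes[1] & 0x7f;
  let offset = 2;

  if (length === 126) {
    // 16-bit extended payload length
    length = (bytes[2] << 8) | bytes[3];
    offset = 4;
  } else if (length === 127) {
    // 64-bit extended payload length
    length = 0;
    for (let i = 0; i < 8; i++) length = length * 256 + bytes[2 + i];
    offset = 10;
  }

  let maskKey = null;
  if (masked) {
    maskKey = bytes.slice(offset, offset + 4);
    offset += 4;
  }

  // slice() copies, so unmasking does not modify the input buffer
  const payload = bytes.slice(offset, offset + length);
  if (maskKey) {
    for (let i = 0; i < payload.length; i++) payload[i] ^= maskKey[i % 4];
  }
  return { fin, opcode, masked, payload };
}

// Masked text frame carrying "Hello" (example bytes from RFC 6455, section 5.7)
const frame = parseFrame(
  Uint8Array.from([0x81, 0x85, 0x37, 0xfa, 0x21, 0x3d, 0x7f, 0x9f, 0x4d, 0x51, 0x58])
);
console.log(frame.opcode, new TextDecoder().decode(frame.payload)); // 1 Hello
```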

3.3 Basic WebSocket Server in Node.js

import { WebSocketServer } from 'ws';
import { createServer } from 'http';
import { randomUUID } from 'crypto';

const server = createServer();
const wss = new WebSocketServer({ server });

// Track connected clients
const clients = new Map();

wss.on('connection', (ws, req) => {
  const clientId = randomUUID();
  clients.set(clientId, { ws, alive: true });

  console.log(`Client connected: ${clientId}`);

  // Ping/Pong heartbeat
  ws.isAlive = true;
  ws.on('pong', () => {
    ws.isAlive = true;
  });

  // Incoming messages
  ws.on('message', (data, isBinary) => {
    const message = isBinary ? data : data.toString();

    try {
      const parsed = JSON.parse(message);
      handleMessage(clientId, parsed);
    } catch (e) {
      ws.send(JSON.stringify({ error: 'Invalid message format' }));
    }
  });

  // Connection closed
  ws.on('close', (code, reason) => {
    console.log(`Client disconnected: ${clientId}, code: ${code}`);
    clients.delete(clientId);
  });

  // Error handling
  ws.on('error', (error) => {
    console.error(`WebSocket error for ${clientId}:`, error);
  });

  // Welcome message
  ws.send(JSON.stringify({
    type: 'welcome',
    clientId,
    timestamp: Date.now(),
  }));
});

// Heartbeat interval
const heartbeat = setInterval(() => {
  wss.clients.forEach((ws) => {
    if (ws.isAlive === false) {
      return ws.terminate();
    }
    ws.isAlive = false;
    ws.ping();
  });
}, 30000);

wss.on('close', () => {
  clearInterval(heartbeat);
});

function handleMessage(clientId, message) {
  switch (message.type) {
    case 'broadcast':
      broadcast(clientId, message.payload);
      break;
    case 'direct':
      sendToClient(message.targetId, message.payload);
      break;
    default:
      console.warn(`Unknown message type: ${message.type}`);
  }
}

function broadcast(senderId, payload) {
  const message = JSON.stringify({
    type: 'broadcast',
    from: senderId,
    payload,
    timestamp: Date.now(),
  });

  clients.forEach((client, id) => {
    if (id !== senderId && client.ws.readyState === 1) {
      client.ws.send(message);
    }
  });
}

function sendToClient(targetId, payload) {
  const target = clients.get(targetId);
  if (target && target.ws.readyState === 1) {
    target.ws.send(JSON.stringify({ type: 'direct', payload }));
  }
}

server.listen(8080, () => {
  console.log('WebSocket server running on port 8080');
});

3.4 Client Implementation (Automatic Reconnection)

class ReconnectingWebSocket {
  constructor(url, options = {}) {
    this.url = url;
    this.maxRetries = options.maxRetries || 10;
    this.baseDelay = options.baseDelay || 1000;
    this.maxDelay = options.maxDelay || 30000;
    this.retryCount = 0;
    this.handlers = new Map();
    this.messageQueue = [];
    this.connect();
  }

  connect() {
    this.ws = new WebSocket(this.url);

    this.ws.onopen = () => {
      console.log('WebSocket connected');
      this.retryCount = 0;
      // Send any queued messages
      while (this.messageQueue.length > 0) {
        const msg = this.messageQueue.shift();
        this.ws.send(msg);
      }
      this.emit('open');
    };

    this.ws.onmessage = (event) => {
      try {
        const data = JSON.parse(event.data);
        this.emit('message', data);
        if (data.type) {
          this.emit(data.type, data);
        }
      } catch (e) {
        this.emit('message', event.data);
      }
    };

    this.ws.onclose = (event) => {
      this.emit('close', event);
      if (!event.wasClean) {
        this.reconnect();
      }
    };

    this.ws.onerror = (error) => {
      this.emit('error', error);
    };
  }

  reconnect() {
    if (this.retryCount >= this.maxRetries) {
      console.error('Max retries reached');
      this.emit('maxRetriesReached');
      return;
    }

    // Exponential backoff + jitter
    const delay = Math.min(
      this.baseDelay * Math.pow(2, this.retryCount) + Math.random() * 1000,
      this.maxDelay
    );

    console.log(`Reconnecting in ${delay}ms (attempt ${this.retryCount + 1})`);
    this.retryCount++;
    setTimeout(() => this.connect(), delay);
  }

  send(data) {
    const message = typeof data === 'string' ? data : JSON.stringify(data);
    if (this.ws.readyState === WebSocket.OPEN) {
      this.ws.send(message);
    } else {
      this.messageQueue.push(message);
    }
  }

  on(event, handler) {
    if (!this.handlers.has(event)) {
      this.handlers.set(event, []);
    }
    this.handlers.get(event).push(handler);
  }

  emit(event, data) {
    const handlers = this.handlers.get(event) || [];
    handlers.forEach((handler) => handler(data));
  }

  close() {
    this.maxRetries = 0; // Prevent reconnection
    this.ws.close(1000, 'Client closing');
  }
}

// Usage example
const ws = new ReconnectingWebSocket('wss://api.example.com/ws');
ws.on('message', (data) => console.log('Received:', data));
ws.send({ type: 'join', room: 'general' });
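
The delays produced by `reconnect()` are easier to reason about when listed out. The sketch below reproduces the same formula with the class defaults but leaves out the random jitter so the result is deterministic; `backoffDelay` is a helper name introduced here.

```javascript
// Delay before the n-th retry (0-based), without the random jitter term.
function backoffDelay(retryCount, baseDelay = 1000, maxDelay = 30000) {
  return Math.min(baseDelay * 2 ** retryCount, maxDelay);
}

const schedule = Array.from({ length: 7 }, (_, n) => backoffDelay(n));
console.log(schedule.join(', '));
// → 1000, 2000, 4000, 8000, 16000, 30000, 30000
```

The delay doubles on each attempt until it hits the 30-second cap. In the class, the added jitter of up to one second spreads out clients that lost their connection at the same moment so they do not all reconnect at once.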

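4. SSE (Server-Sent Events)

4.1 What Is SSE

SSE is a standard for one-way streaming from server to client. The server pushes events over an HTTP connection that is kept open.

Advantages:

  • Works naturally with HTTP/2
  • Built-in automatic reconnection
  • Event recovery via Last-Event-ID
  • Text-based, so easy to debug
  • Friendly to proxies and load balancers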
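
The wire format behind these properties is plain text: each event is a group of `field: value` lines terminated by a blank line. Here is a minimal sketch of a serializer; `formatSSE` is a hypothetical helper, not part of any library.

```javascript
// Serialize one event into the text/event-stream wire format.
function formatSSE({ id, event, data, retry }) {
  const lines = [];
  if (retry !== undefined) lines.push(`retry: ${retry}`);
  if (id !== undefined) lines.push(`id: ${id}`);
  if (event !== undefined) lines.push(`event: ${event}`);
  // Each line of the payload needs its own "data:" field;
  // the client joins them back together with "\n".
  for (const line of String(data).split(/\r\n|\r|\n/)) {
    lines.push(`data: ${line}`);
  }
  // A blank line terminates the event.
  return lines.join('\n') + '\n\n';
}

console.log(JSON.stringify(formatSSE({ id: 7, event: 'log', data: 'line one\nline two' })));
// → "id: 7\nevent: log\ndata: line one\ndata: line two\n\n"
```

Splitting the payload matters because a raw newline inside a single `data:` line would end the field early. Payloads serialized with `JSON.stringify` contain no raw newlines, so they fit on one `data:` line.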

4.2 SSE Server Implementation

// Express SSE server
import express from 'express';

const app = express();

// Track SSE clients
const sseClients = new Map();
let eventCounter = 0;

app.get('/api/events', (req, res) => {
  const clientId = Date.now().toString();

  // Set SSE headers
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    'X-Accel-Buffering': 'no', // Disable Nginx buffering
  });

  // Handle Last-Event-ID (recovery after a reconnect)
  const lastEventId = req.headers['last-event-id'];
  if (lastEventId) {
    // Resend missed events (getEventsSince is assumed to be defined elsewhere)
    const missedEvents = getEventsSince(parseInt(lastEventId, 10));
    missedEvents.forEach((event) => {
      res.write(`id: ${event.id}\n`);
      res.write(`event: ${event.type}\n`);
      res.write(`data: ${JSON.stringify(event.data)}\n\n`);
    });
  }

  // Register the client
  sseClients.set(clientId, res);

  // Set the reconnection interval (milliseconds)
  res.write('retry: 5000\n\n');

  // Confirm the initial connection
  res.write(`id: ${++eventCounter}\n`);
  res.write('event: connected\n');
  res.write(`data: ${JSON.stringify({ clientId })}\n\n`);

  // Keep-alive (send a comment line every 15 seconds)
  const keepAlive = setInterval(() => {
    res.write(':keep-alive\n\n');
  }, 15000);

  // Handle disconnection
  req.on('close', () => {
    clearInterval(keepAlive);
    sseClients.delete(clientId);
    console.log(`SSE client disconnected: ${clientId}`);
  });
});

// Broadcast an event to every SSE client
function broadcastSSE(eventType, data) {
  const id = ++eventCounter;
  sseClients.forEach((res) => {
    res.write(`id: ${id}\n`);
    res.write(`event: ${eventType}\n`);
    res.write(`data: ${JSON.stringify(data)}\n\n`);
  });
}

// Example: sending a notification
app.post('/api/notify', express.json(), (req, res) => {
  broadcastSSE('notification', {
    title: req.body.title,
    message: req.body.message,
    timestamp: Date.now(),
  });
  res.json({ success: true, clients: sseClients.size });
});

4.3 SSE Client Implementation

// Using the EventSource API
class SSEClient {
  constructor(url) {
    this.url = url;
    this.handlers = {};
    this.connect();
  }

  connect() {
    this.source = new EventSource(this.url);

    // Default event handlers
    this.source.onopen = () => {
      console.log('SSE connection opened');
    };

    this.source.onerror = (error) => {
      console.error('SSE error:', error);
      if (this.source.readyState === EventSource.CLOSED) {
        console.log('SSE connection closed');
      }
      // Unless the state is CLOSED, EventSource reconnects automatically
    };

    // Register custom events
    Object.entries(this.handlers).forEach(([event, handler]) => {
      this.source.addEventListener(event, handler);
    });
  }

  on(event, handler) {
    const wrappedHandler = (e) => {
      try {
        const data = JSON.parse(e.data);
        handler(data, e);
      } catch {
        handler(e.data, e);
      }
    };

    this.handlers[event] = wrappedHandler;
    if (this.source) {
      this.source.addEventListener(event, wrappedHandler);
    }
  }

  close() {
    if (this.source) {
      this.source.close();
    }
  }
}

// Usage example
const sse = new SSEClient('/api/events');
sse.on('notification', (data) => {
  showNotification(data.title, data.message);
});
sse.on('update', (data) => {
  refreshDashboard(data);
});

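4.4 Choosing Between SSE and WebSocket

Choose SSE when:

  • Data flows one way, from server to client
  • You are building real-time notifications, feeds, or dashboards
  • You can take advantage of HTTP/2 multiplexing
  • Load balancer and proxy configuration needs to stay simple

Choose WebSocket when:

  • You need bidirectional communication (chat, games)
  • You need to send binary data
  • You need very low latency
  • Client and server exchange messages frequently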

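5. Socket.IO Complete Guide

5.1 Socket.IO Architecture

Socket.IO is a library that uses WebSocket as its main transport. It is not a plain WebSocket implementation, so a Socket.IO client is required on the other end. It adds the following features:

  • Automatic reconnection (exponential backoff)
  • Packet buffering
  • Rooms and namespaces
  • Broadcasting
  • Multiplexing
  • HTTP long-polling fallback

// Server setup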
import { createServer } from 'http';
import { Server } from 'socket.io';

const httpServer = createServer();
const io = new Server(httpServer, {
  cors: {
    origin: ['https://example.com'],
    methods: ['GET', 'POST'],
    credentials: true,
  },
  // Transport settings
  transports: ['websocket', 'polling'],
  // Ping/Pong settings
  pingTimeout: 20000,
  pingInterval: 25000,
  // Maximum payload size
  maxHttpBufferSize: 1e6, // 1 MB
});

5.2 Namespaces and Rooms

// Namespaces: logical separation
const chatNamespace = io.of('/chat');
const adminNamespace = io.of('/admin');

// Chat namespace
chatNamespace.on('connection', (socket) => {
  console.log('User connected to chat:', socket.id);

  // Join a room
  socket.on('joinRoom', async (roomName) => {
    await socket.join(roomName);
    // Notify the other users in the room
    socket.to(roomName).emit('userJoined', {
      userId: socket.id,
      room: roomName,
    });

    // Send the room's member count
    const sockets = await chatNamespace.in(roomName).fetchSockets();
    chatNamespace.to(roomName).emit('roomInfo', {
      room: roomName,
      members: sockets.length,
    });
  });

  // Leave a room
  socket.on('leaveRoom', async (roomName) => {
    await socket.leave(roomName);
    socket.to(roomName).emit('userLeft', {
      userId: socket.id,
      room: roomName,
    });
  });

  // Send a message to a room
  socket.on('message', (data) => {
    // Only members of the room may post to it
    if (!socket.rooms.has(data.room)) return;
    chatNamespace.to(data.room).emit('message', {
      from: socket.id,
      text: data.text,
      room: data.room,
      timestamp: Date.now(),
    });
  });
});

// Admin namespace: authentication required
adminNamespace.use((socket, next) => {
  const token = socket.handshake.auth.token;
  if (verifyAdminToken(token)) {
    next();
  } else {
    next(new Error('Unauthorized'));
  }
});

5.3 Middleware and Authentication

// Global middleware
io.use((socket, next) => {
  const token = socket.handshake.auth.token;
  if (!token) {
    return next(new Error('Authentication required'));
  }

  try {
    const user = verifyToken(token);
    socket.data.user = user;
    next();
  } catch (err) {
    next(new Error('Invalid token'));
  }
});

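// Namespace middleware
// Note: io.use() covers only the main namespace, so the authentication
// middleware above must also be registered on chatNamespace for
// socket.data.user to be populated here.
chatNamespace.use((socket, next) => {
  // Additional permission check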
  if (socket.data.user.role === 'banned') {
    return next(new Error('User is banned'));
  }
  next();
});

// Per-event middleware after connection
io.on('connection', (socket) => {
  // Logging middleware for every event
  socket.use(([event, ...args], next) => {
    console.log(`Event: ${event}, User: ${socket.data.user.name}`);
    next();
  });

  // Rate-limiting middleware
  const rateLimiter = createRateLimiter(100, 60000); // 100 events per minute
  socket.use(([event, ...args], next) => {
    if (rateLimiter.isAllowed(socket.id)) {
      next();
    } else {
      next(new Error('Rate limit exceeded'));
    }
  });
});

5.4 Acknowledgements

// Server
io.on('connection', (socket) => {
  socket.on('createOrder', async (orderData, callback) => {
    try {
      const order = await processOrder(orderData);
      // Success response
      callback({
        status: 'ok',
        orderId: order.id,
      });
    } catch (error) {
      // Error response
      callback({
        status: 'error',
        message: error.message,
      });
    }
  });
});

// Client
socket.emit('createOrder', { item: 'laptop', qty: 1 }, (response) => {
  if (response.status === 'ok') {
    console.log('Order created:', response.orderId);
  } else {
    console.error('Order failed:', response.message);
  }
});

// emit with a timeout
socket.timeout(5000).emit('createOrder', orderData, (err, response) => {
  if (err) {
    // The server did not respond within 5 seconds
    console.error('Timeout');
  } else {
    console.log('Response:', response);
  }
});

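6. Building a Chat System

6.1 Guaranteeing Message Order

Guaranteeing message order is a core challenge in a distributed environment.

// Server: message ordering
// (the in-memory counter below is only correct for a single server process)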
class ChatRoom {
  constructor(roomId) {
    this.roomId = roomId;
    this.messageSequence = 0;
    this.pendingMessages = new Map();
  }

  async addMessage(userId, content) {
    const sequence = ++this.messageSequence;
    const message = {
      id: `${this.roomId}-${sequence}`,
      sequence,
      roomId: this.roomId,
      userId,
      content,
      timestamp: Date.now(),
      status: 'sent',
    };

    // Persist the message
    await this.persistMessage(message);

    return message;
  }

  async getMessagesSince(sequence, limit = 50) {
    // Fetch messages after the given sequence number
    return await db.messages
      .find({
        roomId: this.roomId,
        sequence: { $gt: sequence },
      })
      .sort({ sequence: 1 })
      .limit(limit)
      .toArray();
  }
}
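
On the receiving side, the `sequence` field can be used to deliver messages in order even when they arrive out of order. The following is a minimal sketch under the assumption that sequence numbers start at 1 and increase by 1 within a room; `SequenceBuffer` is a name made up for this example.

```javascript
// Client-side buffer that releases messages strictly in sequence order.
class SequenceBuffer {
  constructor(onDeliver, lastDelivered = 0) {
    this.onDeliver = onDeliver;
    this.lastDelivered = lastDelivered;
    this.pending = new Map(); // sequence -> message
  }

  receive(message) {
    if (message.sequence <= this.lastDelivered) return; // duplicate
    this.pending.set(message.sequence, message);
    // Deliver every consecutive message that is now available.
    while (this.pending.has(this.lastDelivered + 1)) {
      const next = this.pending.get(this.lastDelivered + 1);
      this.pending.delete(next.sequence);
      this.lastDelivered = next.sequence;
      this.onDeliver(next);
    }
  }

  // True while a gap is blocking delivery; the caller could then
  // request the missing range from the server.
  hasGap() {
    return this.pending.size > 0;
  }
}

const delivered = [];
const buffer = new SequenceBuffer((m) => delivered.push(m.content));
buffer.receive({ sequence: 2, content: 'b' }); // held back, 1 is missing
buffer.receive({ sequence: 1, content: 'a' }); // releases 1 and 2
buffer.receive({ sequence: 2, content: 'b' }); // duplicate, ignored
console.log(delivered.join(',')); // a,b
```

When `hasGap()` stays true for longer than expected, a client could call something like `getMessagesSince(lastDelivered)` through the server to fill the hole.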

6.2 Presence Management

// Presence system
class PresenceManager {
  constructor(io, redis) {
    this.io = io;
    this.redis = redis;
    this.TTL = 60; // 60 seconds
  }

  async setOnline(userId, socketId) {
    const key = `presence:${userId}`;
    await this.redis.hset(key, {
      status: 'online',
      socketId,
      lastSeen: Date.now().toString(),
    });
    await this.redis.expire(key, this.TTL);

    // Notify friends of the status change
    const friends = await this.getFriends(userId);
    friends.forEach((friendId) => {
      this.io.to(`user:${friendId}`).emit('presence', {
        userId,
        status: 'online',
      });
    });
  }

  async setOffline(userId) {
    const key = `presence:${userId}`;
    await this.redis.hset(key, {
      status: 'offline',
      lastSeen: Date.now().toString(),
    });

    const friends = await this.getFriends(userId);
    friends.forEach((friendId) => {
      this.io.to(`user:${friendId}`).emit('presence', {
        userId,
        status: 'offline',
        lastSeen: Date.now(),
      });
    });
  }

  async heartbeat(userId) {
    const key = `presence:${userId}`;
    await this.redis.expire(key, this.TTL);
  }

  async getStatus(userId) {
    const data = await this.redis.hgetall(`presence:${userId}`);
    // hgetall returns an empty object for a missing key, so check a field
    return data && data.status ? data : { status: 'offline' };
  }
}

6.3 Typing Indicator

// Client
let typingTimeout;

messageInput.addEventListener('input', () => {
  socket.emit('typing', { room: currentRoom });

  clearTimeout(typingTimeout);
  typingTimeout = setTimeout(() => {
    socket.emit('stopTyping', { room: currentRoom });
  }, 2000);
});

// Server
socket.on('typing', (data) => {
  socket.to(data.room).emit('typing', {
    userId: socket.data.user.id,
    userName: socket.data.user.name,
  });
});

socket.on('stopTyping', (data) => {
  socket.to(data.room).emit('stopTyping', {
    userId: socket.data.user.id,
  });
});

6.4 Read Receipts

// Server: handle read receipts
socket.on('markRead', async (data) => {
  const { roomId, messageId } = data;
  // Use the authenticated user rather than a client-supplied ID
  const userId = socket.data.user.id;

  // Store the read state in the database
  await db.readReceipts.upsert({
    roomId,
    userId,
    lastReadMessageId: messageId,
    readAt: new Date(),
  });

  // Notify the room's members
  socket.to(roomId).emit('readReceipt', {
    userId,
    messageId,
    readAt: Date.now(),
  });
});

// Client: display read receipts
socket.on('readReceipt', (receipt) => {
  updateMessageStatus(receipt.messageId, receipt.userId, 'read');
});

// Detect read messages automatically with IntersectionObserver
const observer = new IntersectionObserver(
  (entries) => {
    entries.forEach((entry) => {
      if (entry.isIntersecting) {
        const messageId = entry.target.dataset.messageId;
        socket.emit('markRead', {
          roomId: currentRoom,
          messageId,
          userId: currentUser.id,
        });
        observer.unobserve(entry.target);
      }
    });
  },
  { threshold: 0.5 }
);

7. Real-Time Notification System

7.1 Push Architecture

// Notification service
class NotificationService {
  constructor(io, redis) {
    this.io = io;
    this.redis = redis;
  }

  async send(userId, notification) {
    const enriched = {
      id: generateId(),
      ...notification,
      createdAt: Date.now(),
      read: false,
    };

    // Persist the notification
    await db.notifications.insertOne({
      userId,
      ...enriched,
    });

    // Deliver in real time
    this.io.to(`user:${userId}`).emit('notification', enriched);

    // If the user is offline, add to the push-notification queue
    const isOnline = await this.isUserOnline(userId);
    if (!isOnline) {
      await this.queuePushNotification(userId, enriched);
    }

    return enriched;
  }

  async sendBatch(userIds, notification) {
    // Send bulk notifications efficiently as a batch
    const enriched = {
      id: generateId(),
      ...notification,
      createdAt: Date.now(),
      read: false,
    };

    // Bulk insert
    const docs = userIds.map((userId) => ({
      userId,
      ...enriched,
    }));
    await db.notifications.insertMany(docs);

    // Deliver in real time to online users
    userIds.forEach((userId) => {
      this.io.to(`user:${userId}`).emit('notification', enriched);
    });
  }

  async markAsRead(userId, notificationIds) {
    // Match on the application-level `id` assigned in send(), not MongoDB's `_id`
    await db.notifications.updateMany(
      { id: { $in: notificationIds }, userId },
      { $set: { read: true, readAt: new Date() } }
    );

    // Update the unread count
    const unreadCount = await db.notifications.countDocuments({
      userId,
      read: false,
    });

    this.io.to(`user:${userId}`).emit('unreadCount', unreadCount);
  }
}

7.2 Notification Batching

// Batching for high-frequency events
class NotificationBatcher {
  constructor(flushInterval = 1000, maxBatchSize = 50) {
    this.buffer = new Map(); // userId -> notifications[]
    this.flushInterval = flushInterval;
    this.maxBatchSize = maxBatchSize;

    setInterval(() => this.flush(), this.flushInterval);
  }

  add(userId, notification) {
    if (!this.buffer.has(userId)) {
      this.buffer.set(userId, []);
    }

    const userBuffer = this.buffer.get(userId);
    userBuffer.push(notification);

    // Flush immediately once the batch size is reached
    if (userBuffer.length >= this.maxBatchSize) {
      this.flushUser(userId);
    }
  }

  flush() {
    this.buffer.forEach((notifications, userId) => {
      if (notifications.length > 0) {
        this.flushUser(userId);
      }
    });
  }

  flushUser(userId) {
    const notifications = this.buffer.get(userId) || [];
    if (notifications.length === 0) return;

    this.buffer.set(userId, []);

    // Group by type (this example does not deduplicate)
    const grouped = this.groupNotifications(notifications);
    io.to(`user:${userId}`).emit('notifications:batch', grouped);
  }

  groupNotifications(notifications) {
    // Group notifications of the same type
    const groups = {};
    notifications.forEach((n) => {
      const key = n.type;
      if (!groups[key]) groups[key] = [];
      groups[key].push(n);
    });
    return groups;
  }
}

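8. Collaborative Editing: OT vs CRDT

8.1 Operational Transformation (OT)

OT is the approach used by Google Docs: when edits happen concurrently, each operation is transformed against the others so that all replicas stay consistent.

// Basic idea of OT
// Two users edit at the same time:
// User A: insert('X', position=0) - insert 'X' at position 0
// User B: insert('Y', position=2) - insert 'Y' at position 2

// Transforming A's operation from B's point of view:
// B inserted at position 2, so A's operation is unchanged (position 0 is unaffected)

// Transforming B's operation from A's point of view:
// A inserted at position 0, so B's position is adjusted to 2+1=3

// Returns opB transformed so that it can be applied after opA.
// Assumes single-character inserts at different positions; equal positions
// need a deterministic tie-breaker (for example, comparing user IDs).
function transformInsert(opA, opB) {
  if (opA.position <= opB.position) {
    return { ...opB, position: opB.position + 1 };
  }
  return opB;
}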
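
The sketch below checks that two replicas converge. It is a simplified illustration rather than a full OT implementation: it handles inserts only, keeps the argument order of `transformInsert` above (already-applied operation first, incoming operation second), shifts by the inserted text's length, and adds a hypothetical `site` field to break ties when both users insert at the same position.

```javascript
// Transform `incoming` so it can be applied after `applied`.
// `site` is a hypothetical per-user ID used only to break position ties.
function transformInsert(applied, incoming) {
  const shifts =
    applied.position < incoming.position ||
    (applied.position === incoming.position && applied.site < incoming.site);
  return shifts
    ? { ...incoming, position: incoming.position + applied.text.length }
    : incoming;
}

function applyInsert(doc, op) {
  return doc.slice(0, op.position) + op.text + doc.slice(op.position);
}

const base = 'abcd';
const opA = { site: 'A', position: 0, text: 'X' };
const opB = { site: 'B', position: 2, text: 'Y' };

// Replica A applies its own op first, then B's transformed op.
const atA = applyInsert(applyInsert(base, opA), transformInsert(opA, opB));
// Replica B applies its own op first, then A's transformed op.
const atB = applyInsert(applyInsert(base, opB), transformInsert(opB, opA));

console.log(atA, atB); // XabYcd XabYcd
```

Both replicas end with the same text even though they applied the operations in a different order, which is the property OT is designed to preserve.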

8.2 CRDT (Conflict-free Replicated Data Type)

A CRDT is a data structure that guarantees eventual consistency even without a central server.

// CRDT-based collaborative editing with Y.js
import * as Y from 'yjs';
import { WebsocketProvider } from 'y-websocket';

// Create the shared document
const ydoc = new Y.Doc();

// Connect the WebSocket provider
const provider = new WebsocketProvider(
  'wss://collaboration.example.com',
  'document-room-123',
  ydoc
);

// Shared text
const ytext = ydoc.getText('content');

// Observe text changes
ytext.observe((event) => {
  console.log('Text changed:', event.delta);
});

// Edit the text
ytext.insert(0, 'Hello, ');
ytext.insert(7, 'World!');

// Shared map (key-value)
const ymap = ydoc.getMap('metadata');
ymap.set('title', 'My Document');
ymap.set('lastEdited', Date.now());

// Shared array
const yarray = ydoc.getArray('items');
yarray.push(['item1', 'item2']);

// Monitor connection status
provider.on('status', (event) => {
  console.log('Connection status:', event.status);
});

// Awareness (cursor position, user information)
const awareness = provider.awareness;
awareness.setLocalState({
  user: { name: 'Alice', color: '#ff0000' },
  cursor: { position: 42 },
});

awareness.on('change', () => {
  const states = awareness.getStates();
  updateCursors(states);
});
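
Independently of Y.js, the convergence property is easiest to see in one of the simplest CRDTs, a grow-only counter (G-Counter). The following sketch is for illustration only and says nothing about how Y.js represents text internally; the class and replica names are made up.

```javascript
// Grow-only counter: each replica increments only its own slot,
// and merge takes the per-replica maximum.
class GCounter {
  constructor(replicaId) {
    this.replicaId = replicaId;
    this.counts = {}; // replicaId -> count
  }

  increment(by = 1) {
    this.counts[this.replicaId] = (this.counts[this.replicaId] || 0) + by;
  }

  value() {
    return Object.values(this.counts).reduce((sum, n) => sum + n, 0);
  }

  // Merge is commutative, associative, and idempotent, so replicas can
  // exchange state in any order, any number of times.
  merge(other) {
    for (const [id, n] of Object.entries(other.counts)) {
      this.counts[id] = Math.max(this.counts[id] || 0, n);
    }
  }
}

const a = new GCounter('a');
const b = new GCounter('b');
a.increment(2);
b.increment(3);
a.merge(b);
b.merge(a);
b.merge(a); // merging again changes nothing
console.log(a.value(), b.value()); // 5 5
```

No server decides the outcome here: each replica reaches the same value just by merging whatever state it receives, which is why CRDTs suit offline and peer-to-peer scenarios.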

8.3 Y.js Server Implementation

// y-websocket server
import { WebSocketServer } from 'ws';
import { setupWSConnection } from 'y-websocket/bin/utils';

const wss = new WebSocketServer({ port: 1234 });

wss.on('connection', (ws, req) => {
  setupWSConnection(ws, req, {
    docname: req.url.slice(1), // Use the URL path as the document name
    gc: true, // Garbage collection
  });
});

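8.4 OT vs CRDT Comparison

| Characteristic | OT | CRDT |
|---|---|---|
| Server dependency | Central server required | Works without a server |
| Implementation complexity | High | Medium |
| Memory usage | Low | High (metadata) |
| Offline support | Difficult | Natural |
| Representative implementations | Google Docs | Y.js, Automerge |
| Scalability | Server can become a bottleneck | Can scale peer-to-peer |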

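9. Scaling WebSocket Horizontally

9.1 The Problem: Multiple Server Instances

A WebSocket connection is bound to one specific server instance. With multiple servers, a message sent by a client on one server has to be delivered to clients connected to the others.

9.2 Redis Adapter (Socket.IO)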

import { Server } from 'socket.io';
import { createAdapter } from '@socket.io/redis-adapter';
import { createClient } from 'redis';

const io = new Server(httpServer);

// Create the Redis clients
const pubClient = createClient({ url: 'redis://redis-host:6379' });
const subClient = pubClient.duplicate();

await Promise.all([pubClient.connect(), subClient.connect()]);

// Attach the Redis adapter
io.adapter(createAdapter(pubClient, subClient));

// Broadcasts now reach clients on every server instance
io.emit('globalEvent', { message: 'All servers receive this' });

// Send to a specific room (to that room's clients on every server)
io.to('room-123').emit('roomEvent', { data: 'value' });

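9.3 Sticky Sessions Configuration

When a Socket.IO session starts with HTTP long polling and then upgrades to WebSocket, every request in that session must be routed to the same server.

# Nginx configuration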
upstream websocket_backend {
    ip_hash;  # Sticky Session
    server backend1:3000;
    server backend2:3000;
    server backend3:3000;
}

server {
    listen 80;

    location /socket.io/ {
        proxy_pass http://websocket_backend;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

        # Timeout settings
        proxy_read_timeout 86400s;
        proxy_send_timeout 86400s;
    }
}

9.4 Pub/Sub with NATS

import { connect } from 'nats';

class NATSAdapter {
  constructor(io) {
    this.io = io;
    this.serverId = process.env.SERVER_ID || generateId();
  }

  async connect() {
    this.nc = await connect({ servers: 'nats://nats-host:4222' });
    this.js = this.nc.jetstream();

    // Subscribe to the broadcast channel
    const sub = this.nc.subscribe('ws.broadcast.>');
    (async () => {
      for await (const msg of sub) {
        // msg.data is a byte array, so decode it before parsing
        const data = JSON.parse(new TextDecoder().decode(msg.data));
        // Ignore messages published by this server
        if (data.serverId === this.serverId) continue;

        const topic = msg.subject.split('.').slice(2).join('.');
        this.io.to(topic).emit(data.event, data.payload);
      }
    })();
  }

  broadcast(room, event, payload) {
    const body = JSON.stringify({
      serverId: this.serverId,
      event,
      payload,
    });
    // Publish the payload as bytes
    this.nc.publish(`ws.broadcast.${room}`, new TextEncoder().encode(body));
  }
}

10. Security

10.1 Authentication

// Token-based authentication
io.use(async (socket, next) => {
  try {
    const token = socket.handshake.auth.token
      || socket.handshake.query.token;

    if (!token) {
      return next(new Error('Authentication token required'));
    }

    // Verify the JWT
    const payload = await verifyJWT(token);

    // Check token expiry
    if (payload.exp * 1000 < Date.now()) {
      return next(new Error('Token expired'));
    }

    // Attach user information
    socket.data.user = {
      id: payload.sub,
      roles: payload.roles,
    };

    next();
  } catch (err) {
    next(new Error('Invalid authentication token'));
  }
});

// Periodic token refresh
socket.on('refreshToken', async (refreshToken, callback) => {
  try {
    const newToken = await refreshAccessToken(refreshToken);
    callback({ status: 'ok', token: newToken });
  } catch (err) {
    callback({ status: 'error', message: 'Token refresh failed' });
    socket.disconnect();
  }
});

10.2 Rate Limiting

// Token bucket algorithm
class TokenBucket {
  constructor(capacity, refillRate) {
    this.capacity = capacity;
    this.tokens = capacity;
    this.refillRate = refillRate; // tokens added per second
    this.lastRefill = Date.now();
  }

  consume(count = 1) {
    this.refill();
    if (this.tokens >= count) {
      this.tokens -= count;
      return true;
    }
    return false;
  }

  refill() {
    const now = Date.now();
    const elapsed = (now - this.lastRefill) / 1000;
    this.tokens = Math.min(
      this.capacity,
      this.tokens + elapsed * this.refillRate
    );
    this.lastRefill = now;
  }
}

// Apply to Socket.IO
const rateLimiters = new Map();

io.use((socket, next) => {
  const userId = socket.data.user.id;
  if (!rateLimiters.has(userId)) {
    rateLimiters.set(userId, new TokenBucket(100, 10)); // capacity 100, refills 10 per second
  }
  next();
});

// Per-event check; socket.use must be registered inside the connection handler
io.on('connection', (socket) => {
  socket.use(([event, ...args], next) => {
    const bucket = rateLimiters.get(socket.data.user.id);
    if (bucket.consume()) {
      next();
    } else {
      next(new Error('Rate limit exceeded'));
    }
  });
});
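
To trace how the bucket behaves, the sketch below re-expresses the same algorithm as plain functions that take the current time as an argument instead of calling `Date.now()`. The function names are introduced for this example; the capacity of 100 and refill rate of 10 per second match the values used above.

```javascript
// Same token bucket algorithm, with the clock passed in explicitly
// so the behavior can be traced without waiting.
function createBucket(capacity, refillRate, now) {
  return { capacity, refillRate, tokens: capacity, lastRefill: now };
}

function tryConsume(bucket, now, count = 1) {
  const elapsedSeconds = (now - bucket.lastRefill) / 1000;
  bucket.tokens = Math.min(
    bucket.capacity,
    bucket.tokens + elapsedSeconds * bucket.refillRate
  );
  bucket.lastRefill = now;
  if (bucket.tokens < count) return false;
  bucket.tokens -= count;
  return true;
}

const bucket = createBucket(100, 10, 0);

// A burst of 120 messages at t = 0 ms: the first 100 pass, 20 are rejected.
let accepted = 0;
for (let i = 0; i < 120; i++) {
  if (tryConsume(bucket, 0)) accepted++;
}
console.log(accepted); // 100

// At t = 500 ms, 0.5 s × 10 tokens/s = 5 tokens are available again.
let acceptedLater = 0;
for (let i = 0; i < 10; i++) {
  if (tryConsume(bucket, 500)) acceptedLater++;
}
console.log(acceptedLater); // 5
```

The capacity sets how large a burst is tolerated, while the refill rate sets the sustained throughput a client can maintain.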

10.3 Origin Validation

const io = new Server(httpServer, {
  cors: {
    origin: (origin, callback) => {
      const allowedOrigins = [
        'https://app.example.com',
        'https://admin.example.com',
      ];

      if (!origin || allowedOrigins.includes(origin)) {
        callback(null, true);
      } else {
        callback(new Error('Not allowed by CORS'));
      }
    },
    credentials: true,
  },
});

// Additional origin-validation middleware
io.use((socket, next) => {
  const origin = socket.handshake.headers.origin;
  const allowedOrigins = process.env.ALLOWED_ORIGINS?.split(',') || [];

  if (allowedOrigins.length > 0 && !allowedOrigins.includes(origin)) {
    return next(new Error('Origin not allowed'));
  }
  next();
});

11. Performance Optimization

11.1 Message Compression

// permessage-deflate extension
import { WebSocketServer } from 'ws';

const wss = new WebSocketServer({
  port: 8080,
  perMessageDeflate: {
    zlibDeflateOptions: {
      chunkSize: 1024,
      memLevel: 7,
      level: 3, // compression level (1-9)
    },
    zlibInflateOptions: {
      chunkSize: 10 * 1024,
    },
    // Compress only messages at or above the threshold
    threshold: 1024, // 1 KB or larger
    clientNoContextTakeover: true,
    serverNoContextTakeover: true,
    serverMaxWindowBits: 10,
    concurrencyLimit: 10,
  },
});

11.2 Binary Protocol (Protocol Buffers)

// message.proto
syntax = "proto3";

message ChatMessage {
  string id = 1;
  string room_id = 2;
  string user_id = 3;
  string content = 4;
  int64 timestamp = 5;
  MessageType type = 6;

  enum MessageType {
    TEXT = 0;
    IMAGE = 1;
    FILE = 2;
    SYSTEM = 3;
  }
}

message WebSocketFrame {
  string event = 1;
  // PresenceUpdate and TypingIndicator are assumed to be defined elsewhere
  oneof payload {
    ChatMessage chat_message = 2;
    PresenceUpdate presence = 3;
    TypingIndicator typing = 4;
  }
}

// Using Protobuf
import protobuf from 'protobufjs';

const root = await protobuf.load('message.proto');
const ChatMessage = root.lookupType('ChatMessage');
const WebSocketFrame = root.lookupType('WebSocketFrame');

// Encoding
function encodeMessage(event, payload) {
  const frame = WebSocketFrame.create({ event, chatMessage: payload });
  return WebSocketFrame.encode(frame).finish();
}

// Decoding
function decodeMessage(buffer) {
  return WebSocketFrame.decode(new Uint8Array(buffer));
}

// Receiving binary messages over WebSocket
ws.on('message', (data, isBinary) => {
  if (isBinary) {
    const frame = decodeMessage(data);
    handleFrame(frame);
  }
});

11.3 Connection Pooling and Optimization

// Limit the number of connections
class ConnectionManager {
  constructor(maxConnectionsPerUser = 5) {
    this.connections = new Map(); // userId -> Set of socketIds
    this.maxPerUser = maxConnectionsPerUser;
  }

  addConnection(userId, socketId) {
    if (!this.connections.has(userId)) {
      this.connections.set(userId, new Set());
    }

    const userConnections = this.connections.get(userId);

    // When the limit is reached, close the oldest connection
    if (userConnections.size >= this.maxPerUser) {
      const oldest = userConnections.values().next().value;
      io.sockets.sockets.get(oldest)?.disconnect();
      userConnections.delete(oldest);
    }

    userConnections.add(socketId);
  }

  removeConnection(userId, socketId) {
    const userConnections = this.connections.get(userId);
    if (userConnections) {
      userConnections.delete(socketId);
      if (userConnections.size === 0) {
        this.connections.delete(userId);
      }
    }
  }

  getConnectionCount() {
    let total = 0;
    this.connections.forEach((sockets) => {
      total += sockets.size;
    });
    return total;
  }
}

12. Production Operations

12.1 Health Check

// WebSocket server health check
app.get('/health', (req, res) => {
  const health = {
    status: 'ok',
    uptime: process.uptime(),
    connections: io.engine.clientsCount,
    rooms: io.sockets.adapter.rooms.size,
    memory: process.memoryUsage(),
    timestamp: Date.now(),
  };

  // Check the connection-count threshold
  if (health.connections > 10000) {
    health.status = 'degraded';
    res.status(503);
  }

  res.json(health);
});

// Kubernetes readiness/liveness probe
app.get('/ready', (req, res) => {
  if (isReady) {
    res.status(200).json({ ready: true });
  } else {
    res.status(503).json({ ready: false });
  }
});

12.2 Graceful Shutdown

// Graceful shutdown
async function gracefulShutdown(signal) {
  console.log(`Received ${signal}. Starting graceful shutdown...`);

  // 1. Stop accepting new connections
  httpServer.close();

  // 2. Notify connected clients
  io.emit('server:shutdown', {
    message: 'Server is shutting down. Please reconnect.',
    reconnectDelay: 5000,
  });

  // 3. Force-close connections if the remaining steps exceed the grace period
  const disconnectTimeout = setTimeout(() => {
    io.disconnectSockets(true);
  }, 10000);

  // 4. Wait for in-flight work to finish
  await flushPendingMessages();

  // 5. Close any remaining connections before the Redis clients go away
  clearTimeout(disconnectTimeout);
  io.disconnectSockets(true);

  // 6. Clean up Redis connections
  await pubClient.quit();
  await subClient.quit();

  console.log('Graceful shutdown complete');
  process.exit(0);
}

process.on('SIGTERM', () => gracefulShutdown('SIGTERM'));
process.on('SIGINT', () => gracefulShutdown('SIGINT'));

12.3 Monitoring

// Prometheus metrics
import { Registry, Counter, Gauge, Histogram } from 'prom-client';

const register = new Registry();

const wsConnections = new Gauge({
  name: 'websocket_connections_active',
  help: 'Number of active WebSocket connections',
  registers: [register],
});

const wsMessages = new Counter({
  name: 'websocket_messages_total',
  help: 'Total WebSocket messages',
  labelNames: ['direction', 'type'],
  registers: [register],
});

const wsLatency = new Histogram({
  name: 'websocket_message_latency_seconds',
  help: 'WebSocket message processing latency',
  buckets: [0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1],
  registers: [register],
});

// Collect metrics
io.on('connection', (socket) => {
  wsConnections.inc();

  socket.on('disconnect', () => {
    wsConnections.dec();
  });

  socket.use(([event, ...args], next) => {
    const start = process.hrtime.bigint();
    wsMessages.inc({ direction: 'inbound', type: event });

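    next();

    // next() returns before asynchronous handlers complete, so this records
    // dispatch overhead rather than end-to-end handler time
    const duration = Number(process.hrtime.bigint() - start) / 1e9;
    wsLatency.observe(duration);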
  });
});

// Metrics endpoint
app.get('/metrics', async (req, res) => {
  res.set('Content-Type', register.contentType);
  res.end(await register.metrics());
});

12.4 Logging Best Practices

// Structured logging
import pino from 'pino';

const logger = pino({
  level: process.env.LOG_LEVEL || 'info',
  serializers: {
    socket: (socket) => ({
      id: socket.id,
      userId: socket.data?.user?.id,
      rooms: [...socket.rooms],
    }),
  },
});

io.on('connection', (socket) => {
  logger.info({ socket }, 'Client connected');

  socket.on('disconnect', (reason) => {
    logger.info({ socket, reason }, 'Client disconnected');
  });

  socket.on('error', (error) => {
    logger.error({ socket, error: error.message }, 'Socket error');
  });
});

13. Architecture Patterns in Practice

13.1 Full Chat Service Architecture

Client (React/Vue)
    |
    | WebSocket (Socket.IO)
    v
Load Balancer (Nginx, sticky sessions)
    |
    +---> App Server 1 ---+
    +---> App Server 2 ---+--> Redis Pub/Sub (message broadcast)
    +---> App Server 3 ---+
                           |
                           +--> PostgreSQL (persistent message storage)
                           +--> Redis (sessions, presence, cache)
                           +--> S3 (file uploads)

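13.2 Event-Driven Architecture

// Event bus pattern
// Redis needs separate connections for publishing and subscribing, because a
// connection in subscriber mode cannot issue PUBLISH. This sketch assumes an
// ioredis-style client that emits 'message' events.
class EventBus {
  constructor(pubClient, subClient) {
    this.pub = pubClient;
    this.sub = subClient;
    this.handlers = new Map();

    // Dispatch incoming messages to the handlers registered for the channel
    this.sub.on('message', (channel, message) => {
      const event = JSON.parse(message);
      (this.handlers.get(channel) || []).forEach((handler) => handler(event));
    });
  }

  async publish(channel, event) {
    await this.pub.publish(channel, JSON.stringify(event));
  }

  subscribe(channel, handler) {
    if (!this.handlers.has(channel)) {
      this.handlers.set(channel, []);
      this.sub.subscribe(channel);
    }
    this.handlers.get(channel).push(handler);
  }
}

// Usage example
const bus = new EventBus(pubClient, subClient);

bus.subscribe('orders', (event) => {
  if (event.type === 'ORDER_CREATED') {
    // Send a real-time notification to the affected user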
    notificationService.send(event.userId, {
      type: 'order',
      title: 'Order Confirmed',
      body: `Your order #${event.orderId} has been confirmed`,
    });
  }
});

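14. Quiz

Q1: Which HTTP status code is used in the WebSocket handshake?

Answer: 101 Switching Protocols

A WebSocket connection is established through the HTTP Upgrade mechanism. When the client sends a request with the Upgrade: websocket header, the server responds with status code 101 to signal that the protocol switch succeeded. Bidirectional binary frame communication begins after that.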

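Q2: Which header does the SSE (Server-Sent Events) auto-reconnect mechanism use?

Answer: Last-Event-ID

The EventSource API automatically tries to reconnect when the connection drops. When it does, it sends the ID of the last event it received in the Last-Event-ID request header. The server can then resume from the events that follow that ID. The retry field can also set the reconnection interval in milliseconds.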

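Q3: What is the difference between a Room and a Namespace in Socket.IO?

Answer:

A Namespace multiplexes several communication channels over a single WebSocket connection. Namespaces are distinguished by endpoint (for example /chat and /admin), and each can have its own middleware and event handlers.

A Room is a logical unit for grouping clients inside a Namespace. Clients are added with socket.join(room), and io.to(room).emit() sends a message only to that group. Rooms are a server-side concept, so clients cannot access them directly.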

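Q4: Explain the key difference between OT (Operational Transformation) and CRDT.

Answer:

OT relies on a central server to transform concurrent editing operations and keep replicas consistent. Google Docs is the best-known example. It depends on the server and has limited support for offline editing.

With a CRDT, the data structure itself resolves conflicts, so eventual consistency is guaranteed without a central server. Y.js and Automerge are representative libraries. CRDTs support P2P communication and offline editing naturally, but their metadata makes memory usage higher.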

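Q5: Why is the Redis Adapter needed when scaling WebSocket servers horizontally?

Answer:

A WebSocket connection is bound to a specific server instance. With several instances, a message sent by a client connected to server A still has to reach a client connected to server B.

The Redis Adapter uses Redis Pub/Sub to broadcast messages across all server instances. When one server calls io.to(room).emit(), the event propagates through Redis to the other servers, so clients on every server receive it. Sticky sessions are also required whenever the HTTP long-polling transport is enabled.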


15. References

  1. RFC 6455 - The WebSocket Protocol - https://datatracker.ietf.org/doc/html/rfc6455
  2. MDN - WebSocket API - https://developer.mozilla.org/en-US/docs/Web/API/WebSocket
  3. MDN - Server-Sent Events - https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events
  4. Socket.IO Documentation - https://socket.io/docs/v4/
  5. ws - WebSocket library for Node.js - https://github.com/websockets/ws
  6. Y.js - CRDT Framework - https://docs.yjs.dev/
  7. Automerge - CRDT Library - https://automerge.org/
  8. Redis Adapter for Socket.IO - https://socket.io/docs/v4/redis-adapter/
  9. NATS Messaging - https://nats.io/
  10. Protocol Buffers - https://protobuf.dev/
  11. Scaling WebSocket (Socket.IO docs) - https://socket.io/docs/v4/using-multiple-nodes/
  12. HTML Living Standard - Server-Sent Events - https://html.spec.whatwg.org/multipage/server-sent-events.html
  13. pino - Node.js Logger - https://getpino.io/

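Each of the real-time technologies covered in this guide has its own strengths. SSE suits simple one-way streaming, WebSocket suits bidirectional communication, and Socket.IO suits projects that need a richer feature set. In production, always put authentication, rate limiting, and monitoring in place, and configure the Redis Adapter and sticky sessions for horizontal scaling.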

WebSocket & Real-time Communication Complete Guide 2025: SSE, Socket.IO, Chat/Notifications/Collaboration

1. Why Real-time Communication Matters

A simple request-response model is no longer enough for modern web applications. For chat, notifications, live dashboards, collaborative editing, and online games, the user experience depends on immediacy.

Limitations of the traditional HTTP request-response model:

  • The client must always initiate requests before the server responds
  • The server cannot proactively send data to the client
  • Polling can work around this but is inefficient
  • High latency and significant server resource waste

This guide covers the full spectrum of real-time communication technologies including WebSocket, SSE (Server-Sent Events), and Socket.IO.


2. Communication Methods Compared: Polling vs Long Polling vs SSE vs WebSocket

2.1 Comparison Table

| Feature | Short Polling | Long Polling | SSE | WebSocket |
| --- | --- | --- | --- | --- |
| Direction | Client to Server | Client to Server | Server to Client | Bidirectional |
| Protocol | HTTP | HTTP | HTTP | ws:// / wss:// |
| Connection | New connection each time | Held until response | Persistent | Persistent |
| Latency | High (polling interval) | Medium | Low | Very Low |
| Server Load | High | Medium | Low | Low |
| Browser Support | All browsers | All browsers | Most | Most |
| Auto-reconnect | Manual | Manual | Built-in | Manual |
| Binary Data | Inefficient | Inefficient | Not supported | Supported |

2.2 Short Polling

The simplest approach. The client sends requests to the server at regular intervals.

// Short Polling implementation
function startPolling(interval = 5000) {
  const poll = async () => {
    try {
      const response = await fetch('/api/messages');
      const data = await response.json();
      updateUI(data);
    } catch (error) {
      console.error('Polling error:', error);
    }
  };

  // Execute immediately then set interval
  poll();
  return setInterval(poll, interval);
}

// Cleanup
const pollId = startPolling(3000);
// clearInterval(pollId);

Problem: Requests are sent continuously even when there is no new data, wasting server resources.

2.3 Long Polling

The server delays the response until new data is available.

// Long Polling client
async function longPoll(lastEventId = null) {
  try {
    const url = lastEventId
      ? `/api/events?since=${lastEventId}`
      : '/api/events';

    const response = await fetch(url, {
      signal: AbortSignal.timeout(30000), // 30-second timeout
    });

    if (response.status === 200) {
      const data = await response.json();
      handleNewData(data);
      // Immediately start next long poll
      longPoll(data.lastEventId);
    } else if (response.status === 204) {
      // No data - retry after timeout
      longPoll(lastEventId);
    }
  } catch (error) {
    // Retry after a fixed 3-second delay on error
    console.error('Long poll error:', error);
    setTimeout(() => longPoll(lastEventId), 3000);
  }
}
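
// Long Polling server (Express)
app.get('/api/events', async (req, res) => {
  const since = req.query.since; // ID of the last event the client has seen
  const timeout = 25000; // 25 seconds, shorter than the client's 30-second timeout

  const checkForData = () => {
    return new Promise((resolve) => {
      const onData = (data) => {
        clearTimeout(timer);
        resolve(data);
      };

      const timer = setTimeout(() => {
        // Remove the listener so timed-out requests do not leak handlers
        eventEmitter.off('newData', onData);
        resolve(null); // Timeout
      }, timeout);

      eventEmitter.once('newData', onData);
    });
  };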

  const data = await checkForData();
  if (data) {
    res.json(data);
  } else {
    res.status(204).end();
  }
});

2.4 Choosing the Right Approach

  • Short Polling: When data changes are infrequent and real-time is not critical
  • Long Polling: Fallback when WebSocket is unavailable
  • SSE: Unidirectional server-to-client streaming (notifications, feeds, live logs)
  • WebSocket: When bidirectional communication is needed (chat, games, collaborative editing)

3. WebSocket Protocol Deep Dive

3.1 WebSocket Handshake

WebSocket establishes connections through the HTTP Upgrade mechanism.

// Client request
GET /chat HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
Sec-WebSocket-Protocol: chat, superchat

// Server response
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
Sec-WebSocket-Protocol: chat

Key points:

  • HTTP 101 status code signals protocol switch
  • Sec-WebSocket-Accept is computed from the Key plus a magic GUID
  • After the handshake, bidirectional binary frame communication begins
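
The Sec-WebSocket-Accept computation mentioned above can be sketched in a few lines: concatenate the client's key with the fixed GUID from RFC 6455, hash with SHA-1, and Base64-encode the digest. The function name `computeAcceptKey` is just a label for this example, and CommonJS `require` is used so it runs as a standalone script.

```javascript
const { createHash } = require('node:crypto');

// Fixed GUID defined by RFC 6455 for the opening handshake.
const WS_GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11';

// Derives the Sec-WebSocket-Accept value from the client's Sec-WebSocket-Key.
function computeAcceptKey(secWebSocketKey) {
  return createHash('sha1')
    .update(secWebSocketKey + WS_GUID)
    .digest('base64');
}

// The key from the sample request above.
console.log(computeAcceptKey('dGhlIHNhbXBsZSBub25jZQ=='));
// prints "s3pPLMBiTxaQ9kYGzzhZRbK+xOo="
```

The output matches the Sec-WebSocket-Accept header in the sample response, which is how a client verifies that the server actually understood the WebSocket handshake.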

3.2 WebSocket Frame Structure

 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len |    Extended payload length    |
|I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
|N|V|V|V|       |S|             |   (if payload len==126/127)   |
| |1|2|3|       |K|             |                               |
+-+-+-+-+-------+-+-------------+-------------------------------+

Key opcodes:

  • 0x1: Text frame
  • 0x2: Binary frame
  • 0x8: Connection close
  • 0x9: Ping
  • 0xA: Pong
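
To make the header layout concrete, here is a minimal decoder for a single complete frame. It is a sketch only: it ignores fragmentation and extension bits, and the `decodeFrame` name is an assumption for this example. The masking key, which is not shown in the diagram above, is the 4 bytes that follow the length fields when the MASK bit is set (per RFC 6455).

```javascript
// Decodes one complete WebSocket frame held in a Buffer.
function decodeFrame(buf) {
  const fin = (buf[0] & 0x80) !== 0;    // FIN bit
  const opcode = buf[0] & 0x0f;         // low 4 bits of the first byte
  const masked = (buf[1] & 0x80) !== 0; // MASK bit
  let length = buf[1] & 0x7f;           // 7-bit payload length
  let offset = 2;

  if (length === 126) {
    length = buf.readUInt16BE(offset);  // 16-bit extended length
    offset += 2;
  } else if (length === 127) {
    const big = buf.readBigUInt64BE(offset); // 64-bit extended length
    if (big > BigInt(Number.MAX_SAFE_INTEGER)) throw new RangeError('Payload too large');
    length = Number(big);
    offset += 8;
  }

  let payload;
  if (masked) {
    const key = buf.subarray(offset, offset + 4);
    offset += 4;
    payload = Buffer.from(buf.subarray(offset, offset + length)); // copy before unmasking
    for (let i = 0; i < payload.length; i++) payload[i] ^= key[i % 4];
  } else {
    payload = buf.subarray(offset, offset + length);
  }
  return { fin, opcode, masked, payload };
}

// Example frames from RFC 6455 section 5.7: the text "Hello", unmasked and masked.
const unmaskedFrame = Buffer.from([0x81, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f]);
const maskedFrame = Buffer.from([0x81, 0x85, 0x37, 0xfa, 0x21, 0x3d, 0x7f, 0x9f, 0x4d, 0x51, 0x58]);

console.log(decodeFrame(unmaskedFrame).payload.toString()); // prints "Hello"
console.log(decodeFrame(maskedFrame).payload.toString());   // prints "Hello"
```

In practice a library such as `ws` handles this for you; the sketch is only meant to show how the bits in the diagram map to bytes on the wire.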

3.3 Basic Node.js WebSocket Server

import { WebSocketServer } from 'ws';
import { createServer } from 'http';

const server = createServer();
const wss = new WebSocketServer({ server });

// Connected client management
const clients = new Map();

wss.on('connection', (ws, req) => {
  const clientId = generateId();
  clients.set(clientId, { ws, alive: true });

  console.log(`Client connected: ${clientId}`);

  // Ping/Pong heartbeat
  ws.isAlive = true;
  ws.on('pong', () => {
    ws.isAlive = true;
  });

  // Message reception
  ws.on('message', (data, isBinary) => {
    const message = isBinary ? data : data.toString();

    try {
      const parsed = JSON.parse(message);
      handleMessage(clientId, parsed);
    } catch (e) {
      ws.send(JSON.stringify({ error: 'Invalid message format' }));
    }
  });

  // Connection close
  ws.on('close', (code, reason) => {
    console.log(`Client disconnected: ${clientId}, code: ${code}`);
    clients.delete(clientId);
  });

  // Error handling
  ws.on('error', (error) => {
    console.error(`WebSocket error for ${clientId}:`, error);
  });

  // Welcome message
  ws.send(JSON.stringify({
    type: 'welcome',
    clientId,
    timestamp: Date.now(),
  }));
});

// Heartbeat interval
const heartbeat = setInterval(() => {
  wss.clients.forEach((ws) => {
    if (ws.isAlive === false) {
      return ws.terminate();
    }
    ws.isAlive = false;
    ws.ping();
  });
}, 30000);

wss.on('close', () => {
  clearInterval(heartbeat);
});

function handleMessage(clientId, message) {
  switch (message.type) {
    case 'broadcast':
      broadcast(clientId, message.payload);
      break;
    case 'direct':
      sendToClient(message.targetId, message.payload);
      break;
    default:
      console.warn(`Unknown message type: ${message.type}`);
  }
}

function broadcast(senderId, payload) {
  const msg = JSON.stringify({
    type: 'broadcast',
    from: senderId,
    payload,
    timestamp: Date.now(),
  });

  clients.forEach((client, id) => {
    if (id !== senderId && client.ws.readyState === 1) {
      client.ws.send(msg);
    }
  });
}

function sendToClient(targetId, payload) {
  const target = clients.get(targetId);
  if (target && target.ws.readyState === 1) {
    target.ws.send(JSON.stringify({ type: 'direct', payload }));
  }
}

server.listen(8080, () => {
  console.log('WebSocket server running on port 8080');
});

3.4 Client Implementation (Auto-reconnect)

class ReconnectingWebSocket {
  constructor(url, options = {}) {
    this.url = url;
    this.maxRetries = options.maxRetries || 10;
    this.baseDelay = options.baseDelay || 1000;
    this.maxDelay = options.maxDelay || 30000;
    this.retryCount = 0;
    this.handlers = new Map();
    this.messageQueue = [];
    this.connect();
  }

  connect() {
    this.ws = new WebSocket(this.url);

    this.ws.onopen = () => {
      console.log('WebSocket connected');
      this.retryCount = 0;
      // Send queued messages
      while (this.messageQueue.length > 0) {
        const msg = this.messageQueue.shift();
        this.ws.send(msg);
      }
      this.emit('open');
    };

    this.ws.onmessage = (event) => {
      try {
        const data = JSON.parse(event.data);
        this.emit('message', data);
        if (data.type) {
          this.emit(data.type, data);
        }
      } catch (e) {
        this.emit('message', event.data);
      }
    };

    this.ws.onclose = (event) => {
      this.emit('close', event);
      if (!event.wasClean) {
        this.reconnect();
      }
    };

    this.ws.onerror = (error) => {
      this.emit('error', error);
    };
  }

  reconnect() {
    if (this.retryCount >= this.maxRetries) {
      console.error('Max retries reached');
      this.emit('maxRetriesReached');
      return;
    }

    // Exponential backoff with jitter
    const delay = Math.min(
      this.baseDelay * Math.pow(2, this.retryCount) + Math.random() * 1000,
      this.maxDelay
    );

    console.log(`Reconnecting in ${delay}ms (attempt ${this.retryCount + 1})`);
    this.retryCount++;
    setTimeout(() => this.connect(), delay);
  }

  send(data) {
    const message = typeof data === 'string' ? data : JSON.stringify(data);
    if (this.ws.readyState === WebSocket.OPEN) {
      this.ws.send(message);
    } else {
      this.messageQueue.push(message);
    }
  }

  on(event, handler) {
    if (!this.handlers.has(event)) {
      this.handlers.set(event, []);
    }
    this.handlers.get(event).push(handler);
  }

  emit(event, data) {
    const handlers = this.handlers.get(event) || [];
    handlers.forEach((handler) => handler(data));
  }

  close() {
    this.maxRetries = 0; // Prevent reconnection
    this.ws.close(1000, 'Client closing');
  }
}

// Usage
const ws = new ReconnectingWebSocket('wss://api.example.com/ws');
ws.on('message', (data) => console.log('Received:', data));
ws.send({ type: 'join', room: 'general' });
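
To see what the exponential backoff in `reconnect()` produces, the delay calculation can be pulled out into a pure function. This is an illustrative sketch: `backoffDelay` and its injectable `random` parameter are not part of the class above, they only make the schedule easy to inspect and test.

```javascript
// Pure version of the delay calculation used in reconnect().
// `random` is injectable so the schedule can be checked deterministically.
function backoffDelay(attempt, { baseDelay = 1000, maxDelay = 30000, random = Math.random } = {}) {
  const exponential = baseDelay * 2 ** attempt; // 1s, 2s, 4s, ...
  const jitter = random() * 1000;               // up to 1s of jitter
  return Math.min(exponential + jitter, maxDelay);
}

// Schedule with jitter disabled.
const schedule = Array.from({ length: 7 }, (_, attempt) =>
  backoffDelay(attempt, { random: () => 0 })
);
console.log(schedule);
// prints [ 1000, 2000, 4000, 8000, 16000, 30000, 30000 ]
```

With the defaults, the delay reaches the 30-second cap on the sixth attempt, so the ten permitted retries span roughly three minutes before `maxRetriesReached` fires.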

4. SSE (Server-Sent Events)

4.1 What is SSE

SSE is a standard for unidirectional streaming from server to client. It maintains an HTTP connection while the server pushes events.

Advantages:

  • Native compatibility with HTTP/2
  • Built-in auto-reconnection
  • Event recovery through Last-Event-ID
  • Text-based for easy debugging
  • Proxy/load balancer friendly
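
Because the format is plain text, an event can be serialized with a small helper. The sketch below follows the text/event-stream rules: one field per line, a separate `data:` line for each line of the payload, and a blank line to end the event. `formatSSE` is a hypothetical helper name, not an API of any library.

```javascript
// Serializes one event into the text/event-stream wire format.
function formatSSE({ id, event, data = '', retry }) {
  const lines = [];
  if (retry !== undefined) lines.push(`retry: ${retry}`);
  if (id !== undefined) lines.push(`id: ${id}`);
  if (event !== undefined) lines.push(`event: ${event}`);

  const text = typeof data === 'string' ? data : JSON.stringify(data);
  // Each line of the payload needs its own "data:" field.
  for (const line of text.split(/\r\n|\r|\n/)) lines.push(`data: ${line}`);

  // A blank line terminates the event.
  return lines.join('\n') + '\n\n';
}

console.log(JSON.stringify(formatSSE({ id: 7, event: 'notification', data: { title: 'Hi' } })));
console.log(JSON.stringify(formatSSE({ data: 'line one\nline two' })));
```

Splitting multi-line payloads matters for raw strings; JSON produced by `JSON.stringify` without an indent argument never contains literal newlines, so it always fits on a single `data:` line.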

4.2 SSE Server Implementation

// Express SSE server
import express from 'express';

const app = express();

// SSE client management
const sseClients = new Map();
let eventCounter = 0;

app.get('/api/events', (req, res) => {
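  // Date.now() alone can collide when two clients connect in the same millisecond
  const clientId = `${Date.now()}-${Math.random().toString(36).slice(2)}`;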

  // SSE headers
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    'X-Accel-Buffering': 'no', // Disable Nginx buffering
  });

  // Handle Last-Event-ID (recovery on reconnect)
  const lastEventId = req.headers['last-event-id'];
  if (lastEventId) {
    // Resend missed events
    const missedEvents = getEventsSince(parseInt(lastEventId, 10));
    missedEvents.forEach((event) => {
      res.write(`id: ${event.id}\n`);
      res.write(`event: ${event.type}\n`);
      res.write(`data: ${JSON.stringify(event.data)}\n\n`);
    });
  }

  // Register client
  sseClients.set(clientId, res);

  // Set reconnection interval (milliseconds)
  res.write('retry: 5000\n\n');

  // Initial connection confirmation
  res.write(`id: ${++eventCounter}\n`);
  res.write('event: connected\n');
  res.write(`data: ${JSON.stringify({ clientId })}\n\n`);

  // Keep-alive (send comment every 15s)
  const keepAlive = setInterval(() => {
    res.write(':keep-alive\n\n');
  }, 15000);

  // Handle disconnect
  req.on('close', () => {
    clearInterval(keepAlive);
    sseClients.delete(clientId);
    console.log(`SSE client disconnected: ${clientId}`);
  });
});

// Broadcast event to all SSE clients
function broadcastSSE(eventType, data) {
  const id = ++eventCounter;
  sseClients.forEach((res) => {
    res.write(`id: ${id}\n`);
    res.write(`event: ${eventType}\n`);
    res.write(`data: ${JSON.stringify(data)}\n\n`);
  });
}

// Notification endpoint
app.post('/api/notify', express.json(), (req, res) => {
  broadcastSSE('notification', {
    title: req.body.title,
    message: req.body.message,
    timestamp: Date.now(),
  });
  res.json({ success: true, clients: sseClients.size });
});
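
The server above calls `getEventsSince` without defining it. One possible backing store is a bounded in-memory log, sketched below. The `EventLog` class and its capacity are assumptions for illustration; a multi-instance deployment would need shared storage instead.

```javascript
// Bounded in-memory log of recent events, used to replay what a
// reconnecting client missed. Hypothetical helper, single process only.
class EventLog {
  constructor(capacity = 1000) {
    this.capacity = capacity;
    this.events = []; // ordered by ascending id
    this.nextId = 1;
  }

  append(type, data) {
    const event = { id: this.nextId++, type, data };
    this.events.push(event);
    if (this.events.length > this.capacity) this.events.shift(); // drop the oldest
    return event;
  }

  // Events with an id greater than lastEventId; an invalid id replays nothing.
  getEventsSince(lastEventId) {
    const since = Number.parseInt(lastEventId, 10);
    if (Number.isNaN(since)) return [];
    return this.events.filter((event) => event.id > since);
  }
}

const log = new EventLog(3);
['a', 'b', 'c', 'd'].forEach((name) => log.append('update', { name }));
console.log(log.getEventsSince('2').map((e) => e.id)); // prints [ 3, 4 ]
```

A client that was offline longer than the log's capacity cannot be caught up from memory alone, so a real system would fall back to a full refresh or a database query in that case.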

4.3 SSE Client Implementation

// EventSource API usage
class SSEClient {
  constructor(url) {
    this.url = url;
    this.handlers = {};
    this.connect();
  }

  connect() {
    this.source = new EventSource(this.url);

    this.source.onopen = () => {
      console.log('SSE connection opened');
    };

    this.source.onerror = (error) => {
      console.error('SSE error:', error);
      if (this.source.readyState === EventSource.CLOSED) {
        console.log('SSE connection closed');
      }
      // EventSource automatically attempts reconnection
    };

    // Register custom events
    Object.entries(this.handlers).forEach(([event, handler]) => {
      this.source.addEventListener(event, handler);
    });
  }

  on(event, handler) {
    const wrappedHandler = (e) => {
      try {
        const data = JSON.parse(e.data);
        handler(data, e);
      } catch {
        handler(e.data, e);
      }
    };

    this.handlers[event] = wrappedHandler;
    if (this.source) {
      this.source.addEventListener(event, wrappedHandler);
    }
  }

  close() {
    if (this.source) {
      this.source.close();
    }
  }
}

// Usage
const sse = new SSEClient('/api/events');
sse.on('notification', (data) => {
  showNotification(data.title, data.message);
});
sse.on('update', (data) => {
  refreshDashboard(data);
});

4.4 SSE vs WebSocket Selection Criteria

Choose SSE when:

  • Unidirectional data flow from server to client
  • Real-time notifications, feeds, dashboards
  • HTTP/2 multiplexing is available
  • Simple load balancer/proxy configuration is preferred

Choose WebSocket when:

  • Bidirectional communication is needed (chat, games)
  • Binary data transmission
  • Very low latency requirements
  • Frequent message exchange between client and server

5. Socket.IO Complete Guide

5.1 Socket.IO Architecture

Socket.IO is a library built on WebSocket that adds:

  • Auto-reconnection (exponential backoff)
  • Packet buffering
  • Rooms and Namespaces
  • Broadcasting
  • Multiplexing
  • HTTP Long Polling fallback

// Server setup
import { createServer } from 'http';
import { Server } from 'socket.io';

const httpServer = createServer();
const io = new Server(httpServer, {
  cors: {
    origin: ['https://example.com'],
    methods: ['GET', 'POST'],
    credentials: true,
  },
  // Transport configuration
  transports: ['websocket', 'polling'],
  // Ping/Pong settings
  pingTimeout: 20000,
  pingInterval: 25000,
  // Max payload
  maxHttpBufferSize: 1e6, // 1MB
});

5.2 Namespaces and Rooms

// Namespaces - logical separation
const chatNamespace = io.of('/chat');
const adminNamespace = io.of('/admin');

// Chat namespace
chatNamespace.on('connection', (socket) => {
  console.log('User connected to chat:', socket.id);

  // Join room
  socket.on('joinRoom', async (roomName) => {
    await socket.join(roomName);
    // Notify other users in the room
    socket.to(roomName).emit('userJoined', {
      userId: socket.id,
      room: roomName,
    });

    // Send room member count
    const sockets = await chatNamespace.in(roomName).fetchSockets();
    chatNamespace.to(roomName).emit('roomInfo', {
      room: roomName,
      members: sockets.length,
    });
  });

  // Leave room
  socket.on('leaveRoom', async (roomName) => {
    await socket.leave(roomName);
    socket.to(roomName).emit('userLeft', {
      userId: socket.id,
      room: roomName,
    });
  });

  // Send message within room
  socket.on('message', (data) => {
    chatNamespace.to(data.room).emit('message', {
      from: socket.id,
      text: data.text,
      room: data.room,
      timestamp: Date.now(),
    });
  });
});

// Admin namespace - requires authentication
adminNamespace.use((socket, next) => {
  const token = socket.handshake.auth.token;
  if (verifyAdminToken(token)) {
    next();
  } else {
    next(new Error('Unauthorized'));
  }
});

5.3 Middleware and Authentication

// Global middleware
io.use((socket, next) => {
  const token = socket.handshake.auth.token;
  if (!token) {
    return next(new Error('Authentication required'));
  }

  try {
    const user = verifyToken(token);
    socket.data.user = user;
    next();
  } catch (err) {
    next(new Error('Invalid token'));
  }
});

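// Namespace middleware
// io.use() applies only to the main namespace, so /chat must run its own
// authentication before socket.data.user can be relied on here.
chatNamespace.use((socket, next) => {
  // Additional permission checks
  if (!socket.data.user) {
    return next(new Error('Authentication required'));
  }
  if (socket.data.user.role === 'banned') {
    return next(new Error('User is banned'));
  }
  next();
});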

// Per-event middleware after connection
io.on('connection', (socket) => {
  // Logging middleware for all events
  socket.use(([event, ...args], next) => {
    console.log(`Event: ${event}, User: ${socket.data.user.name}`);
    next();
  });

  // Rate limiting middleware
  const rateLimiter = createRateLimiter(100, 60000); // 100 per minute
  socket.use(([event, ...args], next) => {
    if (rateLimiter.isAllowed(socket.id)) {
      next();
    } else {
      next(new Error('Rate limit exceeded'));
    }
  });
});
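
The `createRateLimiter(100, 60000)` helper used above is not defined in this guide. A minimal fixed-window version might look like the following; the third `now` parameter and the `reset` method are additions for this sketch, included so the limiter can be tested with a fake clock and cleaned up on disconnect.

```javascript
// Hypothetical implementation of createRateLimiter(limit, windowMs).
// Fixed-window counter per key; `now` is injectable for testing.
function createRateLimiter(limit, windowMs, now = Date.now) {
  const windows = new Map(); // key -> { start, count }

  return {
    isAllowed(key) {
      const current = now();
      const entry = windows.get(key);
      // Start a new window if none exists or the previous one has expired.
      if (!entry || current - entry.start >= windowMs) {
        windows.set(key, { start: current, count: 1 });
        return true;
      }
      if (entry.count < limit) {
        entry.count++;
        return true;
      }
      return false;
    },
    // Call on disconnect so the map does not grow without bound.
    reset(key) {
      windows.delete(key);
    },
  };
}

let fakeTime = 0;
const limiter = createRateLimiter(2, 1000, () => fakeTime);
const results = [limiter.isAllowed('s1'), limiter.isAllowed('s1'), limiter.isAllowed('s1')];
fakeTime = 1000; // the next window begins
results.push(limiter.isAllowed('s1'));
console.log(results); // prints [ true, true, false, true ]
```

A fixed window allows short bursts at window boundaries; a sliding window or token bucket is smoother if that matters for your workload.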

5.4 Acknowledgements

// Server
io.on('connection', (socket) => {
  socket.on('createOrder', async (orderData, callback) => {
    try {
      const order = await processOrder(orderData);
      // Success response
      callback({
        status: 'ok',
        orderId: order.id,
      });
    } catch (error) {
      // Error response
      callback({
        status: 'error',
        message: error.message,
      });
    }
  });
});

// Client
socket.emit('createOrder', { item: 'laptop', qty: 1 }, (response) => {
  if (response.status === 'ok') {
    console.log('Order created:', response.orderId);
  } else {
    console.error('Order failed:', response.message);
  }
});

// Emit with timeout
socket.timeout(5000).emit('createOrder', orderData, (err, response) => {
  if (err) {
    // Server did not respond within 5 seconds
    console.error('Timeout');
  } else {
    console.log('Response:', response);
  }
});

6. Chat System Implementation

6.1 Message Ordering

Guaranteeing message order in a distributed environment is a core challenge.

// Server: Message ordering management
class ChatRoom {
  constructor(roomId) {
    this.roomId = roomId;
    this.messageSequence = 0; // In-memory counter: valid only while a single process owns the room
    this.pendingMessages = new Map();
  }

  async addMessage(userId, content) {
    const sequence = ++this.messageSequence;
    const message = {
      id: `${this.roomId}-${sequence}`,
      sequence,
      roomId: this.roomId,
      userId,
      content,
      timestamp: Date.now(),
      status: 'sent',
    };

    // Persist to storage
    await this.persistMessage(message);

    return message;
  }

  async getMessagesSince(sequence, limit = 50) {
    // Retrieve messages after a specific sequence
    return await db.messages
      .find({
        roomId: this.roomId,
        sequence: { $gt: sequence },
      })
      .sort({ sequence: 1 })
      .limit(limit)
      .toArray();
  }
}
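
Sequence numbers only help if the receiver uses them. On the client side, a small reorder buffer can hold back messages that arrive early and drop duplicates. This is a sketch under the assumption that sequences start at 1 and increase by 1 per room; `SequenceBuffer` is a name made up for this example.

```javascript
// Delivers messages in sequence order, holding back any that arrive early.
class SequenceBuffer {
  constructor(onDeliver, lastDelivered = 0) {
    this.onDeliver = onDeliver;
    this.lastDelivered = lastDelivered;
    this.pending = new Map(); // sequence -> message
  }

  receive(message) {
    if (message.sequence <= this.lastDelivered) return; // duplicate, ignore
    this.pending.set(message.sequence, message);

    // Flush every consecutive message that is now available.
    while (this.pending.has(this.lastDelivered + 1)) {
      const next = this.pending.get(this.lastDelivered + 1);
      this.pending.delete(next.sequence);
      this.lastDelivered = next.sequence;
      this.onDeliver(next);
    }
  }

  // True while a gap exists, i.e. the client should request the missing messages.
  hasGap() {
    return this.pending.size > 0;
  }
}

const delivered = [];
const buffer = new SequenceBuffer((m) => delivered.push(m.sequence));
[2, 1, 1, 4, 3].forEach((sequence) => buffer.receive({ sequence }));
console.log(delivered); // prints [ 1, 2, 3, 4 ]
```

When `hasGap()` stays true for longer than a short timeout, the client can call an endpoint backed by `getMessagesSince(lastDelivered)` to fill the hole.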

6.2 Presence Management

// Presence system
class PresenceManager {
  constructor(io, redis) {
    this.io = io;
    this.redis = redis;
    this.TTL = 60; // 60 seconds
  }

  async setOnline(userId, socketId) {
    const key = `presence:${userId}`;
    await this.redis.hset(key, {
      status: 'online',
      socketId,
      lastSeen: Date.now().toString(),
    });
    await this.redis.expire(key, this.TTL);

    // Notify friends of status change
    const friends = await this.getFriends(userId);
    friends.forEach((friendId) => {
      this.io.to(`user:${friendId}`).emit('presence', {
        userId,
        status: 'online',
      });
    });
  }

  async setOffline(userId) {
    const key = `presence:${userId}`;
    await this.redis.hset(key, {
      status: 'offline',
      lastSeen: Date.now().toString(),
    });

    const friends = await this.getFriends(userId);
    friends.forEach((friendId) => {
      this.io.to(`user:${friendId}`).emit('presence', {
        userId,
        status: 'offline',
        lastSeen: Date.now(),
      });
    });
  }

  async heartbeat(userId) {
    const key = `presence:${userId}`;
    await this.redis.expire(key, this.TTL);
  }

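  async getStatus(userId) {
    const data = await this.redis.hgetall(`presence:${userId}`);
    // hgetall resolves to an empty object when the key does not exist
    return data && Object.keys(data).length > 0 ? data : { status: 'offline' };
  }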
}

6.3 Typing Indicators

// Client
let typingTimeout;

messageInput.addEventListener('input', () => {
  socket.emit('typing', { room: currentRoom });

  clearTimeout(typingTimeout);
  typingTimeout = setTimeout(() => {
    socket.emit('stopTyping', { room: currentRoom });
  }, 2000);
});

// Server
socket.on('typing', (data) => {
  socket.to(data.room).emit('typing', {
    userId: socket.data.user.id,
    userName: socket.data.user.name,
  });
});

socket.on('stopTyping', (data) => {
  socket.to(data.room).emit('stopTyping', {
    userId: socket.data.user.id,
  });
});

6.4 Read Receipts

// Server: Read receipt handling
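socket.on('markRead', async (data) => {
  const { roomId, messageId } = data;
  // Never trust a client-supplied user ID; use the authenticated identity
  const userId = socket.data.user.id;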

  // Save read status to DB
  await db.readReceipts.upsert({
    roomId,
    userId,
    lastReadMessageId: messageId,
    readAt: new Date(),
  });

  // Notify room participants
  socket.to(roomId).emit('readReceipt', {
    userId,
    messageId,
    readAt: Date.now(),
  });
});

// Client: Display read receipts
socket.on('readReceipt', (receipt) => {
  updateMessageStatus(receipt.messageId, receipt.userId, 'read');
});

// Auto-detect message visibility with Intersection Observer
const observer = new IntersectionObserver(
  (entries) => {
    entries.forEach((entry) => {
      if (entry.isIntersecting) {
        const messageId = entry.target.dataset.messageId;
        socket.emit('markRead', {
          roomId: currentRoom,
          messageId,
          userId: currentUser.id,
        });
        observer.unobserve(entry.target);
      }
    });
  },
  { threshold: 0.5 }
);

7. Real-time Notification System

7.1 Push Architecture

// Notification service
class NotificationService {
  constructor(io, redis) {
    this.io = io;
    this.redis = redis;
  }

  async send(userId, notification) {
    const enriched = {
      id: generateId(),
      ...notification,
      createdAt: Date.now(),
      read: false,
    };

    // Persist to database
    await db.notifications.insertOne({
      userId,
      ...enriched,
    });

    // Real-time delivery
    this.io.to(`user:${userId}`).emit('notification', enriched);

    // Queue push notification if offline
    const isOnline = await this.isUserOnline(userId);
    if (!isOnline) {
      await this.queuePushNotification(userId, enriched);
    }

    return enriched;
  }

  async sendBatch(userIds, notification) {
    // Efficient batch processing for bulk notifications
    const enriched = {
      id: generateId(),
      ...notification,
      createdAt: Date.now(),
      read: false,
    };

    // Bulk insert
    const docs = userIds.map((userId) => ({
      userId,
      ...enriched,
    }));
    await db.notifications.insertMany(docs);

    // Real-time delivery to online users
    userIds.forEach((userId) => {
      this.io.to(`user:${userId}`).emit('notification', enriched);
    });
  }

  async markAsRead(userId, notificationIds) {
    await db.notifications.updateMany(
      { _id: { $in: notificationIds }, userId },
      { $set: { read: true, readAt: new Date() } }
    );

    // Update unread count
    const unreadCount = await db.notifications.countDocuments({
      userId,
      read: false,
    });

    this.io.to(`user:${userId}`).emit('unreadCount', unreadCount);
  }
}

7.2 Notification Batching

// High-frequency event batching
class NotificationBatcher {
  constructor(io, flushInterval = 1000, maxBatchSize = 50) {
    this.io = io; // Socket.IO server used for delivery
    this.buffer = new Map(); // userId -> notifications[]
    this.flushInterval = flushInterval;
    this.maxBatchSize = maxBatchSize;

    // Keep the timer handle so the batcher can be stopped on shutdown
    this.timer = setInterval(() => this.flush(), this.flushInterval);
  }

  add(userId, notification) {
    if (!this.buffer.has(userId)) {
      this.buffer.set(userId, []);
    }

    const userBuffer = this.buffer.get(userId);
    userBuffer.push(notification);

    // Flush immediately once the batch size is reached
    if (userBuffer.length >= this.maxBatchSize) {
      this.flushUser(userId);
    }
  }

  flush() {
    // Snapshot the keys because flushUser() removes entries from the map
    [...this.buffer.keys()].forEach((userId) => this.flushUser(userId));
  }

  flushUser(userId) {
    const notifications = this.buffer.get(userId) || [];
    // Delete the entry so the map does not grow with every user ever seen
    this.buffer.delete(userId);
    if (notifications.length === 0) return;

    // Group by type before sending
    const grouped = this.groupNotifications(notifications);
    this.io.to(`user:${userId}`).emit('notifications:batch', grouped);
  }

  groupNotifications(notifications) {
    // Group notifications by type
    const groups = {};
    notifications.forEach((n) => {
      const key = n.type;
      if (!groups[key]) groups[key] = [];
      groups[key].push(n);
    });
    return groups;
  }
}
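
`groupNotifications` groups by type but does not remove duplicates. If the same notification can be added twice (for example after a retry), a dedupe step can run first. The following is a minimal sketch, assuming every notification carries a unique `id` field as the objects built in section 7.1 do; the helper name `dedupeAndGroup` is hypothetical.

```javascript
// Hypothetical helper: drop repeated notifications, then group by type.
// Assumes every notification has `id` and `type` fields.
function dedupeAndGroup(notifications) {
  const seen = new Set();
  const groups = {};
  for (const n of notifications) {
    if (seen.has(n.id)) continue; // keep only the first occurrence of each id
    seen.add(n.id);
    if (!groups[n.type]) groups[n.type] = [];
    groups[n.type].push(n);
  }
  return groups;
}

const grouped = dedupeAndGroup([
  { id: 'a', type: 'like' },
  { id: 'b', type: 'comment' },
  { id: 'a', type: 'like' }, // duplicate, dropped
  { id: 'c', type: 'like' },
]);
console.log(Object.keys(grouped)); // [ 'like', 'comment' ]
console.log(grouped.like.length); // 2
```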

8. Collaborative Editing: OT vs CRDT

8.1 Operational Transformation (OT)

OT, the approach used by Google Docs, transforms concurrent operations against each other so that every replica converges to the same document.

// OT basic concept
// When two users edit simultaneously:
// User A: insert('X', position=0) - Insert 'X' at position 0
// User B: insert('Y', position=2) - Insert 'Y' at position 2

// Transform A's operation from B's perspective:
// B inserted at position 2, so A's operation is unaffected (position 0)

// Transform B's operation from A's perspective:
// A inserted at position 0, so B's position shifts to 2+1=3

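// Returns opB adjusted so it can be applied after opA (both insert a single character).
// Caveat: when both positions are equal, applying this rule symmetrically on both
// replicas makes them diverge; a real system needs a deterministic tie-breaker
// such as a site ID.
function transformInsert(opA, opB) {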
  if (opA.position <= opB.position) {
    return { ...opB, position: opB.position + 1 };
  }
  return opB;
}
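
To make the convergence property concrete, the sketch below replays the scenario from the comments on two replicas and checks that both end up with the same text. The `site` field used for tie-breaking, the `text` field, and the helper names `applyInsert` and `transform` are assumptions added for illustration; production OT systems handle deletes and many more cases.

```javascript
// Apply an insert operation to a string
function applyInsert(doc, op) {
  return doc.slice(0, op.position) + op.text + doc.slice(op.position);
}

// Transform `op` so it can be applied after `against` has already been applied.
// Ties at the same position are broken by site ID (lower ID goes first).
function transform(op, against) {
  const againstGoesFirst =
    against.position < op.position ||
    (against.position === op.position && against.site < op.site);
  return againstGoesFirst
    ? { ...op, position: op.position + against.text.length }
    : op;
}

const base = 'abcd';
const opA = { site: 1, position: 0, text: 'X' };
const opB = { site: 2, position: 2, text: 'Y' };

// Replica A applies its own op first, then B's transformed op
const replicaA = applyInsert(applyInsert(base, opA), transform(opB, opA));
// Replica B applies its own op first, then A's transformed op
const replicaB = applyInsert(applyInsert(base, opB), transform(opA, opB));

console.log(replicaA, replicaB); // XabYcd XabYcd
```

The order in which the two operations arrive differs per replica, yet both produce the same document, which is the guarantee OT is designed to provide.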

8.2 CRDT (Conflict-free Replicated Data Type)

CRDT is a data structure that guarantees eventual consistency without a central server.

// CRDT-based collaborative editing with Y.js
import * as Y from 'yjs';
import { WebsocketProvider } from 'y-websocket';

// Create shared document
const ydoc = new Y.Doc();

// Connect WebSocket provider
const provider = new WebsocketProvider(
  'wss://collaboration.example.com',
  'document-room-123',
  ydoc
);

// Shared text
const ytext = ydoc.getText('content');

// Observe text changes
ytext.observe((event) => {
  console.log('Text changed:', event.delta);
});

// Edit text
ytext.insert(0, 'Hello, ');
ytext.insert(7, 'World!');

// Shared map (key-value)
const ymap = ydoc.getMap('metadata');
ymap.set('title', 'My Document');
ymap.set('lastEdited', Date.now());

// Shared array
const yarray = ydoc.getArray('items');
yarray.push(['item1', 'item2']);

// Connection status monitoring
provider.on('status', (event) => {
  console.log('Connection status:', event.status);
});

// Awareness (cursor positions, user info)
const awareness = provider.awareness;
awareness.setLocalState({
  user: { name: 'Alice', color: '#ff0000' },
  cursor: { position: 42 },
});

awareness.on('change', () => {
  const states = awareness.getStates();
  updateCursors(states);
});
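
Y.js implements a sequence CRDT, which is too involved to show here. To illustrate why CRDT merges converge without coordination, the sketch below uses a much simpler CRDT, a grow-only counter (G-Counter). It is not how Y.js works internally; the class and replica names are illustrative.

```javascript
// G-Counter: each replica increments only its own slot;
// merge takes the per-slot maximum, so merging is commutative and idempotent.
class GCounter {
  constructor(replicaId) {
    this.replicaId = replicaId;
    this.counts = {}; // replicaId -> count
  }

  increment(by = 1) {
    this.counts[this.replicaId] = (this.counts[this.replicaId] || 0) + by;
  }

  merge(other) {
    for (const [id, count] of Object.entries(other.counts)) {
      this.counts[id] = Math.max(this.counts[id] || 0, count);
    }
  }

  get value() {
    return Object.values(this.counts).reduce((sum, c) => sum + c, 0);
  }
}

const alice = new GCounter('alice');
const bob = new GCounter('bob');

// Both replicas make changes independently (e.g. while offline)
alice.increment();
alice.increment();
bob.increment(3);

// Exchange state in any order; repeating a merge changes nothing
alice.merge(bob);
bob.merge(alice);
bob.merge(alice);

console.log(alice.value, bob.value); // 5 5
```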

8.3 OT vs CRDT Comparison

| Feature | OT | CRDT |
| --- | --- | --- |
| Server Dependency | Central server required | Works without server |
| Implementation Complexity | High | Medium |
| Memory Usage | Low | Higher (metadata) |
| Offline Support | Difficult | Natural |
| Representative Implementations | Google Docs | Y.js, Automerge |
| Scalability | Server bottleneck possible | P2P scalable |

9. Scaling WebSocket Horizontally

9.1 The Problem: Multiple Server Instances

WebSocket connections are bound to specific server instances. When multiple servers exist, messages from a client on one server must reach clients on other servers.

9.2 Redis Adapter (Socket.IO)

import { Server } from 'socket.io';
import { createAdapter } from '@socket.io/redis-adapter';
import { createClient } from 'redis';

const io = new Server(httpServer);

// Create Redis clients
const pubClient = createClient({ url: 'redis://redis-host:6379' });
const subClient = pubClient.duplicate();

await Promise.all([pubClient.connect(), subClient.connect()]);

// Apply Redis Adapter
io.adapter(createAdapter(pubClient, subClient));

// Now broadcasting works across all server instances
io.emit('globalEvent', { message: 'All servers receive this' });

// Send to specific room (reaches clients on all servers)
io.to('room-123').emit('roomEvent', { data: 'value' });
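
Conceptually, the adapter relays each emit through a shared channel that every instance listens to. The in-memory simulation below sketches that idea only; `FakeBroker` and `FakeServer` are hypothetical stand-ins, not Socket.IO or Redis APIs.

```javascript
// In-memory stand-in for Redis Pub/Sub: every subscriber receives every message
class FakeBroker {
  constructor() {
    this.subscribers = [];
  }
  subscribe(fn) {
    this.subscribers.push(fn);
  }
  publish(message) {
    this.subscribers.forEach((fn) => fn(message));
  }
}

// One "server instance" that only knows about its own locally connected clients
class FakeServer {
  constructor(name, broker) {
    this.name = name;
    this.broker = broker;
    this.rooms = new Map(); // room -> array of client inboxes
    // Deliver every broker message to this instance's local room members
    broker.subscribe(({ room, event, payload }) => {
      (this.rooms.get(room) || []).forEach((inbox) => inbox.push({ event, payload }));
    });
  }

  join(room, inbox) {
    if (!this.rooms.has(room)) this.rooms.set(room, []);
    this.rooms.get(room).push(inbox);
  }

  // Similar in spirit to io.to(room).emit(event, payload) with an adapter attached
  emitToRoom(room, event, payload) {
    this.broker.publish({ room, event, payload });
  }
}

const broker = new FakeBroker();
const serverA = new FakeServer('A', broker);
const serverB = new FakeServer('B', broker);

const inboxA = []; // client connected to server A
const inboxB = []; // client connected to server B
serverA.join('room-123', inboxA);
serverB.join('room-123', inboxB);

serverA.emitToRoom('room-123', 'roomEvent', { data: 'value' });
console.log(inboxA.length, inboxB.length); // 1 1
```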

9.3 Sticky Sessions Configuration

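With Socket.IO, sticky sessions are required whenever the HTTP long-polling transport is enabled (the default). A session starts as a series of HTTP requests before it upgrades to WebSocket, and every one of those requests must reach the same server instance.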

# Nginx configuration
upstream websocket_backend {
    ip_hash;  # Sticky Session
    server backend1:3000;
    server backend2:3000;
    server backend3:3000;
}

server {
    listen 80;

    location /socket.io/ {
        proxy_pass http://websocket_backend;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

        # Timeout settings
        proxy_read_timeout 86400s;
        proxy_send_timeout 86400s;
    }
}

9.4 Pub/Sub with NATS

import { connect, StringCodec } from 'nats';

// NATS payloads are byte arrays; StringCodec converts to and from strings
const sc = StringCodec();

class NATSAdapter {
  constructor(io) {
    this.io = io;
    this.serverId = process.env.SERVER_ID || generateId();
  }

  async connect() {
    this.nc = await connect({ servers: 'nats://nats-host:4222' });

    // Subscribe to broadcast channel
    const sub = this.nc.subscribe('ws.broadcast.>');
    (async () => {
      for await (const msg of sub) {
        // msg.data is a Uint8Array, so decode it before parsing
        const data = JSON.parse(sc.decode(msg.data));
        // Ignore messages from self (already delivered locally in broadcast())
        if (data.serverId === this.serverId) continue;

        const topic = msg.subject.split('.').slice(2).join('.');
        this.io.to(topic).emit(data.event, data.payload);
      }
    })();
  }

  broadcast(room, event, payload) {
    // Deliver to clients connected to this instance
    this.io.to(room).emit(event, payload);

    // Publish for the other instances
    this.nc.publish(`ws.broadcast.${room}`, sc.encode(JSON.stringify({
      serverId: this.serverId,
      event,
      payload,
    })));
  }
}

10. Security

10.1 Authentication

// Token-based authentication
io.use(async (socket, next) => {
  try {
    // Prefer handshake.auth; tokens in the query string can leak into access logs
    const token = socket.handshake.auth.token
      || socket.handshake.query.token;

    if (!token) {
      return next(new Error('Authentication token required'));
    }

    // JWT verification
    const payload = await verifyJWT(token);

    // Check token expiration
    if (payload.exp * 1000 < Date.now()) {
      return next(new Error('Token expired'));
    }

    // Attach user info
    socket.data.user = {
      id: payload.sub,
      roles: payload.roles,
    };

    next();
  } catch (err) {
    next(new Error('Invalid authentication token'));
  }
});

// Periodic token refresh (per-socket handlers are registered inside the connection handler)
io.on('connection', (socket) => {
  socket.on('refreshToken', async (refreshToken, callback) => {
    try {
      const newToken = await refreshAccessToken(refreshToken);
      callback({ status: 'ok', token: newToken });
    } catch (err) {
      callback({ status: 'error', message: 'Token refresh failed' });
      socket.disconnect();
    }
  });
});

10.2 Rate Limiting

// Token bucket algorithm
class TokenBucket {
  constructor(capacity, refillRate) {
    this.capacity = capacity;
    this.tokens = capacity;
    this.refillRate = refillRate; // Tokens added per second
    this.lastRefill = Date.now();
  }

  consume(count = 1) {
    this.refill();
    if (this.tokens >= count) {
      this.tokens -= count;
      return true;
    }
    return false;
  }

  refill() {
    const now = Date.now();
    const elapsed = (now - this.lastRefill) / 1000;
    this.tokens = Math.min(
      this.capacity,
      this.tokens + elapsed * this.refillRate
    );
    this.lastRefill = now;
  }
}

// Apply to Socket.IO
const rateLimiters = new Map();

io.on('connection', (socket) => {
  // Assumes the authentication middleware has already set socket.data.user
  const userId = socket.data.user.id;
  if (!rateLimiters.has(userId)) {
    rateLimiters.set(userId, new TokenBucket(100, 10)); // 100 capacity, 10/sec refill
  }
  const bucket = rateLimiters.get(userId);

  // Per-packet middleware: runs for every incoming event on this socket
  socket.use(([event, ...args], next) => {
    if (bucket.consume()) {
      next();
    } else {
      next(new Error('Rate limit exceeded'));
    }
  });
});
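
The burst-then-refill behavior of a token bucket is easier to see with a controllable clock. The sketch below is a variant of the class above with an injectable `now` function; that parameter and the class name `TestableTokenBucket` are assumptions added here so the walk-through is deterministic.

```javascript
// Token bucket variant with an injectable clock (hypothetical, for demonstration)
class TestableTokenBucket {
  constructor(capacity, refillRate, now = Date.now) {
    this.capacity = capacity;
    this.tokens = capacity;
    this.refillRate = refillRate; // tokens added per second
    this.now = now;
    this.lastRefill = now();
  }

  consume(count = 1) {
    // Refill based on elapsed time, capped at capacity
    const current = this.now();
    const elapsedSeconds = (current - this.lastRefill) / 1000;
    this.tokens = Math.min(this.capacity, this.tokens + elapsedSeconds * this.refillRate);
    this.lastRefill = current;

    if (this.tokens >= count) {
      this.tokens -= count;
      return true;
    }
    return false;
  }
}

let fakeTime = 0;
const demoBucket = new TestableTokenBucket(3, 1, () => fakeTime);

// A burst of four requests: the first three drain the bucket, the fourth is rejected
const burst = [1, 2, 3, 4].map(() => demoBucket.consume());
console.log(burst); // [ true, true, true, false ]

// Two seconds later, two tokens have been refilled
fakeTime += 2000;
const later = [1, 2, 3].map(() => demoBucket.consume());
console.log(later); // [ true, true, false ]
```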

10.3 Origin Validation

const io = new Server(httpServer, {
  cors: {
    origin: (origin, callback) => {
      const allowedOrigins = [
        'https://app.example.com',
        'https://admin.example.com',
      ];

      if (!origin || allowedOrigins.includes(origin)) {
        callback(null, true);
      } else {
        callback(new Error('Not allowed by CORS'));
      }
    },
    credentials: true,
  },
});

// Additional origin validation middleware
io.use((socket, next) => {
  const origin = socket.handshake.headers.origin;
  const allowedOrigins = process.env.ALLOWED_ORIGINS?.split(',') || [];

  if (allowedOrigins.length > 0 && !allowedOrigins.includes(origin)) {
    return next(new Error('Origin not allowed'));
  }
  next();
});

11. Performance Optimization

11.1 Message Compression

// permessage-deflate extension
import { WebSocketServer } from 'ws';

const wss = new WebSocketServer({
  port: 8080,
  perMessageDeflate: {
    zlibDeflateOptions: {
      chunkSize: 1024,
      memLevel: 7,
      level: 3, // Compression level (1-9)
    },
    zlibInflateOptions: {
      chunkSize: 10 * 1024,
    },
    // Only compress above threshold
    threshold: 1024, // Only messages above 1KB
    clientNoContextTakeover: true,
    serverNoContextTakeover: true,
    serverMaxWindowBits: 10,
    concurrencyLimit: 10,
  },
});

11.2 Binary Protocol (Protocol Buffers)

// message.proto
syntax = "proto3";

message ChatMessage {
  string id = 1;
  string room_id = 2;
  string user_id = 3;
  string content = 4;
  int64 timestamp = 5;
  MessageType type = 6;

  enum MessageType {
    TEXT = 0;
    IMAGE = 1;
    FILE = 2;
    SYSTEM = 3;
  }
}

// PresenceUpdate and TypingIndicator are assumed to be defined elsewhere in this file
message WebSocketFrame {
  string event = 1;
  oneof payload {
    ChatMessage chat_message = 2;
    PresenceUpdate presence = 3;
    TypingIndicator typing = 4;
  }
}

// Using Protobuf
import protobuf from 'protobufjs';

const root = await protobuf.load('message.proto');
const ChatMessage = root.lookupType('ChatMessage');
const WebSocketFrame = root.lookupType('WebSocketFrame');

// Encoding
function encodeMessage(event, payload) {
  const frame = WebSocketFrame.create({ event, chatMessage: payload });
  return WebSocketFrame.encode(frame).finish();
}

// Decoding
function decodeMessage(buffer) {
  return WebSocketFrame.decode(new Uint8Array(buffer));
}

// Binary transmission over WebSocket
ws.on('message', (data, isBinary) => {
  if (isBinary) {
    const frame = decodeMessage(data);
    handleFrame(frame);
  }
});
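
To illustrate why a binary encoding is more compact than JSON, the sketch below hand-rolls a tiny frame layout with `DataView`. This is not the Protocol Buffers wire format; the layout (1-byte type, 8-byte timestamp, UTF-8 content) and the function names are assumptions chosen for illustration.

```javascript
const textEncoder = new TextEncoder();
const textDecoder = new TextDecoder();

// Layout: [type: uint8][timestamp: float64, big-endian][content: UTF-8 bytes]
function encodeFrame({ type, timestamp, content }) {
  const contentBytes = textEncoder.encode(content);
  const buffer = new ArrayBuffer(1 + 8 + contentBytes.length);
  const view = new DataView(buffer);
  view.setUint8(0, type);
  view.setFloat64(1, timestamp);
  new Uint8Array(buffer, 9).set(contentBytes);
  return new Uint8Array(buffer);
}

function decodeFrame(bytes) {
  const view = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength);
  return {
    type: view.getUint8(0),
    timestamp: view.getFloat64(1),
    content: textDecoder.decode(bytes.subarray(9)),
  };
}

const sample = { type: 0, timestamp: 1700000000000, content: 'Hello' };
const binary = encodeFrame(sample);
const json = textEncoder.encode(JSON.stringify(sample));

console.log(binary.length, json.length); // 14 54
console.log(decodeFrame(binary).content); // Hello
```

The saving comes from dropping field names and encoding numbers in fixed-width form. Protocol Buffers applies the same idea with a schema, field tags, and variable-length integers.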

11.3 Connection Pooling and Optimization

// Connection limit per user
class ConnectionManager {
  constructor(maxConnectionsPerUser = 5) {
    this.connections = new Map(); // userId -> Set of socketIds
    this.maxPerUser = maxConnectionsPerUser;
  }

  addConnection(userId, socketId) {
    if (!this.connections.has(userId)) {
      this.connections.set(userId, new Set());
    }

    const userConnections = this.connections.get(userId);

    // Disconnect oldest if max connections exceeded
    if (userConnections.size >= this.maxPerUser) {
      const oldest = userConnections.values().next().value;
      io.sockets.sockets.get(oldest)?.disconnect();
      userConnections.delete(oldest);
    }

    userConnections.add(socketId);
  }

  removeConnection(userId, socketId) {
    const userConnections = this.connections.get(userId);
    if (userConnections) {
      userConnections.delete(socketId);
      if (userConnections.size === 0) {
        this.connections.delete(userId);
      }
    }
  }

  getConnectionCount() {
    let total = 0;
    this.connections.forEach((sockets) => {
      total += sockets.size;
    });
    return total;
  }
}

12. Production Operations

12.1 Health Check

// WebSocket server health check
app.get('/health', (req, res) => {
  const health = {
    status: 'ok',
    uptime: process.uptime(),
    connections: io.engine.clientsCount,
    rooms: io.sockets.adapter.rooms.size,
    memory: process.memoryUsage(),
    timestamp: Date.now(),
  };

  // Check connection threshold
  if (health.connections > 10000) {
    health.status = 'degraded';
    res.status(503);
  }

  res.json(health);
});

// Kubernetes readiness/liveness probes
app.get('/ready', (req, res) => {
  if (isReady) {
    res.status(200).json({ ready: true });
  } else {
    res.status(503).json({ ready: false });
  }
});

12.2 Graceful Shutdown

// Graceful shutdown implementation
const GRACE_PERIOD_MS = 10000;

async function gracefulShutdown(signal) {
  console.log(`Received ${signal}. Starting graceful shutdown...`);

  // 1. Stop accepting new connections
  httpServer.close();

  // 2. Notify existing clients
  io.emit('server:shutdown', {
    message: 'Server is shutting down. Please reconnect.',
    reconnectDelay: 5000,
  });

  // 3. Wait for pending operations, but give up after the grace period
  await Promise.race([
    flushPendingMessages(),
    new Promise((resolve) => setTimeout(resolve, GRACE_PERIOD_MS)),
  ]);

  // 4. Disconnect any clients that are still connected
  io.disconnectSockets(true);

  // 5. Clean up Redis connections
  await pubClient.quit();
  await subClient.quit();

  console.log('Graceful shutdown complete');
  process.exit(0);
}

process.on('SIGTERM', () => gracefulShutdown('SIGTERM'));
process.on('SIGINT', () => gracefulShutdown('SIGINT'));
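
On the client side, if every socket reconnects exactly `reconnectDelay` milliseconds after the `server:shutdown` event, all clients hit the remaining servers at the same moment. A common mitigation is to add random jitter. The sketch below assumes the payload shape emitted above; `computeReconnectDelay` and its `jitterRatio` parameter are hypothetical.

```javascript
// Spread reconnect attempts over [base, base * (1 + jitterRatio)]
// `random` is injectable so the function can be tested deterministically
function computeReconnectDelay(baseDelayMs, jitterRatio = 0.5, random = Math.random) {
  const jitter = baseDelayMs * jitterRatio * random();
  return Math.round(baseDelayMs + jitter);
}

// With the 5000 ms delay from the shutdown message:
console.log(computeReconnectDelay(5000, 0.5, () => 0)); // 5000
console.log(computeReconnectDelay(5000, 0.5, () => 0.5)); // 6250
```

A client handler for `server:shutdown` could then schedule its own `socket.connect()` call with `setTimeout`, using the computed delay.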

12.3 Monitoring

// Prometheus metrics
import { Registry, Counter, Gauge, Histogram } from 'prom-client';

const register = new Registry();

const wsConnections = new Gauge({
  name: 'websocket_connections_active',
  help: 'Number of active WebSocket connections',
  registers: [register],
});

const wsMessages = new Counter({
  name: 'websocket_messages_total',
  help: 'Total WebSocket messages',
  labelNames: ['direction', 'type'],
  registers: [register],
});

const wsLatency = new Histogram({
  name: 'websocket_message_latency_seconds',
  help: 'WebSocket message processing latency',
  buckets: [0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1],
  registers: [register],
});

// Collect metrics
io.on('connection', (socket) => {
  wsConnections.inc();

  socket.on('disconnect', () => {
    wsConnections.dec();
  });

  socket.use(([event, ...args], next) => {
    const start = process.hrtime.bigint();
    wsMessages.inc({ direction: 'inbound', type: event });

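    next();

    // Note: next() returns synchronously, so this measures middleware dispatch
    // time only, not the duration of asynchronous event handlers
    const duration = Number(process.hrtime.bigint() - start) / 1e9;
    wsLatency.observe(duration);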
  });
});

// Metrics endpoint
app.get('/metrics', async (req, res) => {
  res.set('Content-Type', register.contentType);
  res.end(await register.metrics());
});

13. Production Architecture Patterns

13.1 Chat Service Full Architecture

Client (React/Vue)
    |
    | WebSocket (Socket.IO)
    v
Load Balancer (Nginx, sticky sessions)
    |
    +---> App Server 1 ---+
    +---> App Server 2 ---+--> Redis Pub/Sub (message broadcast)
    +---> App Server 3 ---+
                           |
                           +--> PostgreSQL (message persistence)
                           +--> Redis (sessions, presence, cache)
                           +--> S3 (file uploads)

13.2 Event-Driven Architecture

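// Event bus pattern (assumes node-redis v4 clients, as in section 9.2)
class EventBus {
  constructor(pubClient, subClient) {
    // A Redis connection in subscriber mode cannot publish,
    // so publishing and subscribing use separate clients
    this.pub = pubClient;
    this.sub = subClient;
    this.handlers = new Map();
  }

  async publish(channel, event) {
    await this.pub.publish(channel, JSON.stringify(event));
  }

  async subscribe(channel, handler) {
    if (!this.handlers.has(channel)) {
      this.handlers.set(channel, []);
      // Dispatch each incoming message to every registered handler
      await this.sub.subscribe(channel, (message) => {
        const event = JSON.parse(message);
        this.handlers.get(channel).forEach((h) => h(event));
      });
    }
    this.handlers.get(channel).push(handler);
  }
}

// Usage (both clients must already be connected; redisSubscriber can be
// created with redisClient.duplicate())
const bus = new EventBus(redisClient, redisSubscriber);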

bus.subscribe('orders', (event) => {
  if (event.type === 'ORDER_CREATED') {
    // Real-time notification to relevant users
    notificationService.send(event.userId, {
      type: 'order',
      title: 'Order Confirmed',
      body: `Your order #${event.orderId} has been confirmed`,
    });
  }
});
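
For unit tests, handlers like the one above can be exercised without a Redis server. The following is a sketch of an in-memory stand-in exposing the same `publish`/`subscribe` surface; `InMemoryEventBus` is a hypothetical helper, not part of any library.

```javascript
// Hypothetical in-memory stand-in for the Redis-backed event bus
class InMemoryEventBus {
  constructor() {
    this.handlers = new Map(); // channel -> handler[]
  }

  async publish(channel, event) {
    // Round-trip through JSON to mimic what would travel over Redis
    const copy = JSON.parse(JSON.stringify(event));
    (this.handlers.get(channel) || []).forEach((handler) => handler(copy));
  }

  subscribe(channel, handler) {
    if (!this.handlers.has(channel)) this.handlers.set(channel, []);
    this.handlers.get(channel).push(handler);
  }
}

const testBus = new InMemoryEventBus();
const confirmedOrders = [];

testBus.subscribe('orders', (event) => {
  if (event.type === 'ORDER_CREATED') confirmedOrders.push(event.orderId);
});

testBus.publish('orders', { type: 'ORDER_CREATED', userId: 'u1', orderId: 42 });
testBus.publish('orders', { type: 'ORDER_CANCELLED', userId: 'u1', orderId: 43 });

console.log(confirmedOrders); // [ 42 ]
```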

14. Quiz

Q1: What HTTP status code is used during the WebSocket handshake?

Answer: 101 Switching Protocols

WebSocket connections are established through the HTTP Upgrade mechanism. When the client sends a request with the Upgrade: websocket header, the server responds with status code 101 to indicate a successful protocol switch. After this, the connection carries bidirectional WebSocket frames (text or binary) instead of HTTP messages.

Q2: What header does SSE (Server-Sent Events) use for its auto-reconnect mechanism?

Answer: Last-Event-ID

The EventSource API automatically attempts reconnection when the connection is lost. It includes the ID of the last received event in the Last-Event-ID request header. The server can use this to resend events from after that ID. The retry field can also specify the reconnection interval in milliseconds.
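
On the wire, these fields are plain text lines in the `text/event-stream` format. The sketch below shows one way a server might serialize an event; the helper name `formatSSE` is hypothetical.

```javascript
// Serialize one event in the text/event-stream format
function formatSSE({ id, event, retry, data }) {
  const lines = [];
  if (id !== undefined) lines.push(`id: ${id}`); // echoed back as Last-Event-ID on reconnect
  if (event !== undefined) lines.push(`event: ${event}`);
  if (retry !== undefined) lines.push(`retry: ${retry}`); // reconnection delay in ms
  // Multi-line payloads need one `data:` line per line of text
  for (const line of String(data).split('\n')) {
    lines.push(`data: ${line}`);
  }
  return lines.join('\n') + '\n\n'; // a blank line terminates the event
}

const sseFrame = formatSSE({ id: 7, retry: 3000, data: 'line one\nline two' });
console.log(JSON.stringify(sseFrame));
// "id: 7\nretry: 3000\ndata: line one\ndata: line two\n\n"
```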

Q3: What is the difference between Socket.IO's Room and Namespace?

Answer:

A Namespace multiplexes multiple communication channels over a single underlying connection. Namespaces are distinguished by path (e.g., /chat, /admin), and each can have its own middleware and event handlers.

A Room is a logical grouping of clients within a Namespace. Clients are added with socket.join(room) on the server, and messages are sent to a specific group with io.to(room).emit(). Rooms are a server-side concept that clients cannot access directly.

Q4: Explain the key difference between OT (Operational Transformation) and CRDT.

Answer:

OT relies on a central server that transforms concurrent editing operations to maintain consistency. Google Docs is the quintessential example. It is server-dependent and has limitations with offline editing.

CRDT uses data structures that inherently resolve conflicts, guaranteeing eventual consistency without a central server. Y.js and Automerge are representative implementations. CRDTs naturally support P2P communication and offline editing but have higher memory usage due to metadata.

Q5: Why is Redis Adapter needed when scaling WebSocket servers horizontally?

Answer:

WebSocket connections are bound to specific server instances. When multiple server instances exist, messages from a client connected to Server A must be delivered to clients connected to Server B.

Redis Adapter uses Redis Pub/Sub to broadcast messages across all server instances. When io.to(room).emit() is called on one server, the event propagates through Redis to all other servers, delivering the message to clients across the entire cluster. Additionally, sticky sessions are required when the HTTP long-polling transport is enabled, so that every request belonging to one Socket.IO session reaches the same server.


15. References

  1. RFC 6455 - The WebSocket Protocol - https://datatracker.ietf.org/doc/html/rfc6455
  2. MDN - WebSocket API - https://developer.mozilla.org/en-US/docs/Web/API/WebSocket
  3. MDN - Server-Sent Events - https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events
  4. Socket.IO Documentation - https://socket.io/docs/v4/
  5. ws - WebSocket library for Node.js - https://github.com/websockets/ws
  6. Y.js - CRDT Framework - https://docs.yjs.dev/
  7. Automerge - CRDT Library - https://automerge.org/
  8. Redis Adapter for Socket.IO - https://socket.io/docs/v4/redis-adapter/
  9. NATS Messaging - https://nats.io/
  10. Protocol Buffers - https://protobuf.dev/
  11. Scaling WebSocket (Socket.IO docs) - https://socket.io/docs/v4/using-multiple-nodes/
  12. HTML Living Standard - Server-Sent Events - https://html.spec.whatwg.org/multipage/server-sent-events.html
  13. pino - Node.js Logger - https://getpino.io/

The real-time communication technologies covered in this guide each have unique strengths. SSE excels at simple unidirectional streaming, WebSocket at bidirectional communication, and Socket.IO at projects requiring rich features. In production, always ensure proper authentication, rate limiting, and monitoring are in place, and configure Redis Adapter with sticky sessions for horizontal scaling.