WebSocket & Real-time Communication Complete Guide 2025: SSE, Socket.IO, Chat/Notifications/Collaboration

1. Why Real-time Communication Matters

Modern web applications no longer operate on a simple request-response model. Chat, notifications, live dashboards, collaborative editing, online gaming -- the core of user experience depends on immediacy.

Limitations of the traditional HTTP request-response model:

  • The client must always initiate requests before the server responds
  • The server cannot proactively send data to the client
  • Polling can work around this but is inefficient
  • High latency and significant server resource waste

This guide covers the full spectrum of real-time communication technologies including WebSocket, SSE (Server-Sent Events), and Socket.IO.


2. Communication Methods Compared: Polling vs Long Polling vs SSE vs WebSocket

2.1 Comparison Table

| Feature | Short Polling | Long Polling | SSE | WebSocket |
| --- | --- | --- | --- | --- |
| Direction | Client to Server | Client to Server | Server to Client | Bidirectional |
| Protocol | HTTP | HTTP | HTTP | ws:// / wss:// |
| Connection | New connection each time | Held until response | Persistent | Persistent |
| Latency | High (polling interval) | Medium | Low | Very Low |
| Server Load | High | Medium | Low | Low |
| Browser Support | All browsers | All browsers | Most | Most |
| Auto-reconnect | Manual | Manual | Built-in | Manual |
| Binary Data | Inefficient | Inefficient | Not supported | Supported |

2.2 Short Polling

The simplest approach. The client sends requests to the server at regular intervals.

// Short Polling implementation
function startPolling(interval = 5000) {
  const poll = async () => {
    try {
      const response = await fetch('/api/messages');
      const data = await response.json();
      updateUI(data);
    } catch (error) {
      console.error('Polling error:', error);
    }
  };

  // Execute immediately then set interval
  poll();
  return setInterval(poll, interval);
}

// Cleanup
const pollId = startPolling(3000);
// clearInterval(pollId);

Problem: Requests are sent continuously even when there is no new data, wasting server resources.

2.3 Long Polling

The server delays the response until new data is available.

// Long Polling client
async function longPoll(lastEventId = null) {
  try {
    const url = lastEventId
      ? `/api/events?since=${lastEventId}`
      : '/api/events';

    const response = await fetch(url, {
      signal: AbortSignal.timeout(30000), // 30-second timeout
    });

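    if (response.status === 200) {
      const data = await response.json();
      handleNewData(data);
      // Immediately start next long poll
      longPoll(data.lastEventId);
    } else if (response.status === 204) {
      // Server timed out with no new data: reconnect immediately
      longPoll(lastEventId);
    } else {
      // Unexpected status: treat it as an error so polling does not stop silently
      throw new Error(`Unexpected status ${response.status}`);
    }
  } catch (error) {
    // Retry after a fixed delay on error or client-side timeout
    console.error('Long poll error:', error);
    setTimeout(() => longPoll(lastEventId), 3000);
  }
}
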
// Long Polling server (Express)
app.get('/api/events', async (req, res) => {
  const since = req.query.since;
  const timeout = 25000; // 25 seconds

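  const checkForData = () => {
    return new Promise((resolve) => {
      const onData = (data) => {
        clearTimeout(timer);
        resolve(data);
      };

      const timer = setTimeout(() => {
        // Remove the listener on timeout so listeners do not accumulate
        eventEmitter.off('newData', onData);
        resolve(null);
      }, timeout);

      eventEmitter.once('newData', onData);
    });
  };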

  const data = await checkForData();
  if (data) {
    res.json(data);
  } else {
    res.status(204).end();
  }
});

2.4 Choosing the Right Approach

  • Short Polling: When data changes are infrequent and real-time is not critical
  • Long Polling: Fallback when WebSocket is unavailable
  • SSE: Unidirectional server-to-client streaming (notifications, feeds, live logs)
  • WebSocket: When bidirectional communication is needed (chat, games, collaborative editing)

3. WebSocket Protocol Deep Dive

3.1 WebSocket Handshake

WebSocket establishes connections through the HTTP Upgrade mechanism.

// Client request
GET /chat HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
Sec-WebSocket-Protocol: chat, superchat

// Server response
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
Sec-WebSocket-Protocol: chat

Key points:

  • HTTP 101 status code signals protocol switch
  • Sec-WebSocket-Accept is computed from the Key plus a magic GUID
  • After the handshake, both sides exchange framed text or binary messages over the same TCP connection

3.2 WebSocket Frame Structure

 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len |    Extended payload length    |
|I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
|N|V|V|V|       |S|             |   (if payload len==126/127)   |
| |1|2|3|       |K|             |                               |
+-+-+-+-+-------+-+-------------+-------------------------------+

Key opcodes:

  • 0x1: Text frame
  • 0x2: Binary frame
  • 0x8: Connection close
  • 0x9: Ping
  • 0xA: Pong
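
To make the layout concrete, here is a small sketch that decodes the fixed part of a frame header from raw bytes. The function name `parseFrameHeader` and the shape of the returned object are illustrative assumptions; it reads only the header fields and does not unmask or extract the payload.

```javascript
// Hypothetical helper: decode FIN, opcode, MASK and payload length from a frame
function parseFrameHeader(bytes) {
  const view = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength);
  const byte0 = view.getUint8(0);
  const byte1 = view.getUint8(1);

  const header = {
    fin: (byte0 & 0x80) !== 0,    // final fragment flag
    opcode: byte0 & 0x0f,         // 0x1 text, 0x2 binary, 0x8 close, ...
    masked: (byte1 & 0x80) !== 0, // client-to-server frames are masked
    payloadLength: byte1 & 0x7f,
    headerLength: 2,
  };

  if (header.payloadLength === 126) {
    // Next 2 bytes hold the real length (big-endian)
    header.payloadLength = view.getUint16(2);
    header.headerLength += 2;
  } else if (header.payloadLength === 127) {
    // Next 8 bytes hold the real length; Number() loses precision above 2^53
    header.payloadLength = Number(view.getBigUint64(2));
    header.headerLength += 8;
  }

  if (header.masked) header.headerLength += 4; // 4-byte masking key follows

  return header;
}

// Unmasked text frame with a 5-byte payload
console.log(parseFrameHeader(new Uint8Array([0x81, 0x05])));
```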

3.3 Basic Node.js WebSocket Server

import { WebSocketServer } from 'ws';
import { createServer } from 'http';

const server = createServer();
const wss = new WebSocketServer({ server });

// Connected client management
const clients = new Map();

wss.on('connection', (ws, req) => {
  const clientId = generateId();
  clients.set(clientId, { ws, alive: true });

  console.log(`Client connected: ${clientId}`);

  // Ping/Pong heartbeat
  ws.isAlive = true;
  ws.on('pong', () => {
    ws.isAlive = true;
  });

  // Message reception
  ws.on('message', (data, isBinary) => {
    const message = isBinary ? data : data.toString();

    try {
      const parsed = JSON.parse(message);
      handleMessage(clientId, parsed);
    } catch (e) {
      ws.send(JSON.stringify({ error: 'Invalid message format' }));
    }
  });

  // Connection close
  ws.on('close', (code, reason) => {
    console.log(`Client disconnected: ${clientId}, code: ${code}`);
    clients.delete(clientId);
  });

  // Error handling
  ws.on('error', (error) => {
    console.error(`WebSocket error for ${clientId}:`, error);
  });

  // Welcome message
  ws.send(JSON.stringify({
    type: 'welcome',
    clientId,
    timestamp: Date.now(),
  }));
});

// Heartbeat interval
const heartbeat = setInterval(() => {
  wss.clients.forEach((ws) => {
    if (ws.isAlive === false) {
      return ws.terminate();
    }
    ws.isAlive = false;
    ws.ping();
  });
}, 30000);

wss.on('close', () => {
  clearInterval(heartbeat);
});

function handleMessage(clientId, message) {
  switch (message.type) {
    case 'broadcast':
      broadcast(clientId, message.payload);
      break;
    case 'direct':
      sendToClient(message.targetId, message.payload);
      break;
    default:
      console.warn(`Unknown message type: ${message.type}`);
  }
}

function broadcast(senderId, payload) {
  const msg = JSON.stringify({
    type: 'broadcast',
    from: senderId,
    payload,
    timestamp: Date.now(),
  });

  clients.forEach((client, id) => {
    if (id !== senderId && client.ws.readyState === 1) {
      client.ws.send(msg);
    }
  });
}

function sendToClient(targetId, payload) {
  const target = clients.get(targetId);
  if (target && target.ws.readyState === 1) {
    target.ws.send(JSON.stringify({ type: 'direct', payload }));
  }
}

server.listen(8080, () => {
  console.log('WebSocket server running on port 8080');
});

3.4 Client Implementation (Auto-reconnect)

class ReconnectingWebSocket {
  constructor(url, options = {}) {
    this.url = url;
    this.maxRetries = options.maxRetries || 10;
    this.baseDelay = options.baseDelay || 1000;
    this.maxDelay = options.maxDelay || 30000;
    this.retryCount = 0;
    this.handlers = new Map();
    this.messageQueue = [];
    this.connect();
  }

  connect() {
    this.ws = new WebSocket(this.url);

    this.ws.onopen = () => {
      console.log('WebSocket connected');
      this.retryCount = 0;
      // Send queued messages
      while (this.messageQueue.length > 0) {
        const msg = this.messageQueue.shift();
        this.ws.send(msg);
      }
      this.emit('open');
    };

    this.ws.onmessage = (event) => {
      try {
        const data = JSON.parse(event.data);
        this.emit('message', data);
        if (data.type) {
          this.emit(data.type, data);
        }
      } catch (e) {
        this.emit('message', event.data);
      }
    };

    this.ws.onclose = (event) => {
      this.emit('close', event);
      if (!event.wasClean) {
        this.reconnect();
      }
    };

    this.ws.onerror = (error) => {
      this.emit('error', error);
    };
  }

  reconnect() {
    // Never reconnect after an explicit close()
    if (this.closedByUser) return;

    if (this.retryCount >= this.maxRetries) {
      console.error('Max retries reached');
      this.emit('maxRetriesReached');
      return;
    }

    // Exponential backoff with jitter
    const delay = Math.round(Math.min(
      this.baseDelay * Math.pow(2, this.retryCount) + Math.random() * 1000,
      this.maxDelay
    ));

    console.log(`Reconnecting in ${delay}ms (attempt ${this.retryCount + 1})`);
    this.retryCount++;
    // Keep the timer handle so close() can cancel a scheduled attempt
    this.reconnectTimer = setTimeout(() => this.connect(), delay);
  }

  send(data) {
    const message = typeof data === 'string' ? data : JSON.stringify(data);
    if (this.ws.readyState === WebSocket.OPEN) {
      this.ws.send(message);
    } else {
      // Queue until the connection (re)opens
      this.messageQueue.push(message);
    }
  }

  on(event, handler) {
    if (!this.handlers.has(event)) {
      this.handlers.set(event, []);
    }
    this.handlers.get(event).push(handler);
  }

  emit(event, data) {
    const handlers = this.handlers.get(event) || [];
    handlers.forEach((handler) => handler(data));
  }

  close() {
    // Prevent reconnection, including an attempt that is already scheduled
    this.closedByUser = true;
    clearTimeout(this.reconnectTimer);
    this.ws.close(1000, 'Client closing');
  }
}

// Usage
const ws = new ReconnectingWebSocket('wss://api.example.com/ws');
ws.on('message', (data) => console.log('Received:', data));
ws.send({ type: 'join', room: 'general' });

4. SSE (Server-Sent Events)

4.1 What is SSE

SSE is a standard for unidirectional streaming from server to client. It maintains an HTTP connection while the server pushes events.

Advantages:

  • Native compatibility with HTTP/2
  • Built-in auto-reconnection
  • Event recovery through Last-Event-ID
  • Text-based for easy debugging
  • Proxy/load balancer friendly
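
The text-based wire format is simple enough to build by hand. The sketch below assumes a hypothetical helper named `formatSSE`; the field names `id`, `event`, `data`, and `retry` are the ones defined by the SSE format. Each line of a multi-line payload gets its own `data:` prefix, and a blank line terminates the event.

```javascript
// Hypothetical helper: serialize one event into the text/event-stream format
function formatSSE({ id, event, data, retry }) {
  const lines = [];
  if (retry !== undefined) lines.push(`retry: ${retry}`);
  if (id !== undefined) lines.push(`id: ${id}`);
  if (event !== undefined) lines.push(`event: ${event}`);

  // Non-string payloads are sent as JSON
  const text = typeof data === 'string' ? data : JSON.stringify(data);
  // A payload containing newlines must be split into one data line per line
  for (const line of text.split(/\r\n|\r|\n/)) {
    lines.push(`data: ${line}`);
  }

  // A blank line marks the end of the event
  return lines.join('\n') + '\n\n';
}

console.log(formatSSE({ id: 1, event: 'notification', data: { title: 'Hi' } }));
```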

4.2 SSE Server Implementation

// Express SSE server
import express from 'express';

const app = express();

// SSE client management
const sseClients = new Map();
let eventCounter = 0;

app.get('/api/events', (req, res) => {
  const clientId = Date.now().toString();

  // SSE headers
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    'X-Accel-Buffering': 'no', // Disable Nginx buffering
  });

  // Handle Last-Event-ID (recovery on reconnect)
  const lastEventId = req.headers['last-event-id'];
  if (lastEventId) {
    // Resend missed events
    const missedEvents = getEventsSince(parseInt(lastEventId, 10));
    missedEvents.forEach((event) => {
      res.write(`id: ${event.id}\n`);
      res.write(`event: ${event.type}\n`);
      res.write(`data: ${JSON.stringify(event.data)}\n\n`);
    });
  }

  // Register client
  sseClients.set(clientId, res);

  // Set reconnection interval (milliseconds)
  res.write('retry: 5000\n\n');

  // Initial connection confirmation
  res.write(`id: ${++eventCounter}\n`);
  res.write('event: connected\n');
  res.write(`data: ${JSON.stringify({ clientId })}\n\n`);

  // Keep-alive (send comment every 15s)
  const keepAlive = setInterval(() => {
    res.write(':keep-alive\n\n');
  }, 15000);

  // Handle disconnect
  req.on('close', () => {
    clearInterval(keepAlive);
    sseClients.delete(clientId);
    console.log(`SSE client disconnected: ${clientId}`);
  });
});

// Broadcast event to all SSE clients
function broadcastSSE(eventType, data) {
  const id = ++eventCounter;
  sseClients.forEach((res) => {
    res.write(`id: ${id}\n`);
    res.write(`event: ${eventType}\n`);
    res.write(`data: ${JSON.stringify(data)}\n\n`);
  });
}

// Notification endpoint
app.post('/api/notify', express.json(), (req, res) => {
  broadcastSSE('notification', {
    title: req.body.title,
    message: req.body.message,
    timestamp: Date.now(),
  });
  res.json({ success: true, clients: sseClients.size });
});
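
The server above calls `getEventsSince`, which is not defined in this guide. One possible way to back it, shown here purely as an assumption, is a bounded in-memory log that remembers the most recent events so they can be replayed after a reconnect. The class name `EventLog` and its methods are hypothetical.

```javascript
// Hypothetical bounded log of recent events for Last-Event-ID replay
class EventLog {
  constructor(capacity = 1000) {
    this.capacity = capacity;
    this.events = [];
    this.nextId = 1;
  }

  // Store an event and assign it a monotonically increasing ID
  append(type, data) {
    const event = { id: this.nextId++, type, data };
    this.events.push(event);
    // Drop the oldest event once the log is full
    if (this.events.length > this.capacity) {
      this.events.shift();
    }
    return event;
  }

  // Events with an ID greater than the one the client saw last
  since(lastEventId) {
    return this.events.filter((event) => event.id > lastEventId);
  }
}

const log = new EventLog(100);
log.append('notification', { title: 'first' });
log.append('notification', { title: 'second' });
console.log(log.since(1)); // only the second event
```

A client that has been offline longer than the log covers cannot be fully caught up this way; in that case it needs a full reload of state instead of a replay.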

4.3 SSE Client Implementation

// EventSource API usage
class SSEClient {
  constructor(url) {
    this.url = url;
    this.handlers = {};
    this.connect();
  }

  connect() {
    this.source = new EventSource(this.url);

    this.source.onopen = () => {
      console.log('SSE connection opened');
    };

    this.source.onerror = (error) => {
      console.error('SSE error:', error);
      if (this.source.readyState === EventSource.CLOSED) {
        console.log('SSE connection closed');
      }
      // EventSource automatically attempts reconnection
    };

    // Register custom events
    Object.entries(this.handlers).forEach(([event, handler]) => {
      this.source.addEventListener(event, handler);
    });
  }

  on(event, handler) {
    const wrappedHandler = (e) => {
      try {
        const data = JSON.parse(e.data);
        handler(data, e);
      } catch {
        handler(e.data, e);
      }
    };

    this.handlers[event] = wrappedHandler;
    if (this.source) {
      this.source.addEventListener(event, wrappedHandler);
    }
  }

  close() {
    if (this.source) {
      this.source.close();
    }
  }
}

// Usage
const sse = new SSEClient('/api/events');
sse.on('notification', (data) => {
  showNotification(data.title, data.message);
});
sse.on('update', (data) => {
  refreshDashboard(data);
});

4.4 SSE vs WebSocket Selection Criteria

Choose SSE when:

  • Unidirectional data flow from server to client
  • Real-time notifications, feeds, dashboards
  • HTTP/2 multiplexing is available
  • Simple load balancer/proxy configuration is preferred

Choose WebSocket when:

  • Bidirectional communication is needed (chat, games)
  • Binary data transmission
  • Very low latency requirements
  • Frequent message exchange between client and server

5. Socket.IO Complete Guide

5.1 Socket.IO Architecture

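Socket.IO is a library built on top of Engine.IO, which uses WebSocket as its primary transport. It is not a plain WebSocket implementation: it defines its own protocol, so a Socket.IO client is required to talk to a Socket.IO server. On top of the transport it adds:

  • Auto-reconnection (exponential backoff)
  • Packet buffering
  • Rooms and Namespaces
  • Broadcasting
  • Multiplexing
  • HTTP Long Polling fallback

// Server setup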
import { createServer } from 'http';
import { Server } from 'socket.io';

const httpServer = createServer();
const io = new Server(httpServer, {
  cors: {
    origin: ['https://example.com'],
    methods: ['GET', 'POST'],
    credentials: true,
  },
  // Transport configuration
  transports: ['websocket', 'polling'],
  // Ping/Pong settings
  pingTimeout: 20000,
  pingInterval: 25000,
  // Max payload
  maxHttpBufferSize: 1e6, // 1MB
});

5.2 Namespaces and Rooms

// Namespaces - logical separation
const chatNamespace = io.of('/chat');
const adminNamespace = io.of('/admin');

// Chat namespace
chatNamespace.on('connection', (socket) => {
  console.log('User connected to chat:', socket.id);

  // Join room
  socket.on('joinRoom', async (roomName) => {
    await socket.join(roomName);
    // Notify other users in the room
    socket.to(roomName).emit('userJoined', {
      userId: socket.id,
      room: roomName,
    });

    // Send room member count
    const sockets = await chatNamespace.in(roomName).fetchSockets();
    chatNamespace.to(roomName).emit('roomInfo', {
      room: roomName,
      members: sockets.length,
    });
  });

  // Leave room
  socket.on('leaveRoom', async (roomName) => {
    await socket.leave(roomName);
    socket.to(roomName).emit('userLeft', {
      userId: socket.id,
      room: roomName,
    });
  });

  // Send message within room
  socket.on('message', (data) => {
    chatNamespace.to(data.room).emit('message', {
      from: socket.id,
      text: data.text,
      room: data.room,
      timestamp: Date.now(),
    });
  });
});

// Admin namespace - requires authentication
adminNamespace.use((socket, next) => {
  const token = socket.handshake.auth.token;
  if (verifyAdminToken(token)) {
    next();
  } else {
    next(new Error('Unauthorized'));
  }
});

5.3 Middleware and Authentication

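// Authentication middleware.
// Note: in Socket.IO v3 and later, io.use() only applies to the default "/"
// namespace, so the same function is registered on each custom namespace too.
const authenticate = (socket, next) => {
  const token = socket.handshake.auth.token;
  if (!token) {
    return next(new Error('Authentication required'));
  }

  try {
    const user = verifyToken(token);
    socket.data.user = user;
    next();
  } catch (err) {
    next(new Error('Invalid token'));
  }
};

io.use(authenticate);
chatNamespace.use(authenticate);

// Namespace middleware (runs after authenticate, so socket.data.user is set)
chatNamespace.use((socket, next) => {
  // Additional permission checks
  if (socket.data.user.role === 'banned') {
    return next(new Error('User is banned'));
  }
  next();
});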

// Per-event middleware after connection
io.on('connection', (socket) => {
  // Logging middleware for all events
  socket.use(([event, ...args], next) => {
    console.log(`Event: ${event}, User: ${socket.data.user.name}`);
    next();
  });

  // Rate limiting middleware
  const rateLimiter = createRateLimiter(100, 60000); // 100 per minute
  socket.use(([event, ...args], next) => {
    if (rateLimiter.isAllowed(socket.id)) {
      next();
    } else {
      next(new Error('Rate limit exceeded'));
    }
  });
});

5.4 Acknowledgements

// Server
io.on('connection', (socket) => {
  socket.on('createOrder', async (orderData, callback) => {
    try {
      const order = await processOrder(orderData);
      // Success response
      callback({
        status: 'ok',
        orderId: order.id,
      });
    } catch (error) {
      // Error response
      callback({
        status: 'error',
        message: error.message,
      });
    }
  });
});

// Client
socket.emit('createOrder', { item: 'laptop', qty: 1 }, (response) => {
  if (response.status === 'ok') {
    console.log('Order created:', response.orderId);
  } else {
    console.error('Order failed:', response.message);
  }
});

// Emit with timeout
socket.timeout(5000).emit('createOrder', orderData, (err, response) => {
  if (err) {
    // Server did not respond within 5 seconds
    console.error('Timeout');
  } else {
    console.log('Response:', response);
  }
});

6. Chat System Implementation

6.1 Message Ordering

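Guaranteeing message order in a distributed environment is a core challenge. The class below assigns a monotonically increasing sequence number per room; with several server instances, that counter would have to live in shared storage rather than in process memory.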

// Server: Message ordering management
class ChatRoom {
  constructor(roomId) {
    this.roomId = roomId;
    this.messageSequence = 0;
    this.pendingMessages = new Map();
  }

  async addMessage(userId, content) {
    const sequence = ++this.messageSequence;
    const message = {
      id: `${this.roomId}-${sequence}`,
      sequence,
      roomId: this.roomId,
      userId,
      content,
      timestamp: Date.now(),
      status: 'sent',
    };

    // Persist to storage
    await this.persistMessage(message);

    return message;
  }

  async getMessagesSince(sequence, limit = 50) {
    // Retrieve messages after a specific sequence
    return await db.messages
      .find({
        roomId: this.roomId,
        sequence: { $gt: sequence },
      })
      .sort({ sequence: 1 })
      .limit(limit)
      .toArray();
  }
}
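
On the receiving side, sequence numbers let a client restore order and notice gaps. The following is a minimal sketch under the assumption that every message carries the `sequence` field assigned by `ChatRoom`; the class name `SequenceBuffer` and its methods are hypothetical.

```javascript
// Hypothetical client-side buffer that delivers messages in sequence order
class SequenceBuffer {
  constructor(onDeliver, startAfter = 0) {
    this.onDeliver = onDeliver;
    this.lastDelivered = startAfter;
    this.pending = new Map(); // sequence -> message waiting for earlier ones
  }

  receive(message) {
    // Ignore duplicates and messages that were already delivered
    if (message.sequence <= this.lastDelivered) return;
    this.pending.set(message.sequence, message);

    // Deliver as long as the next expected sequence is available
    while (this.pending.has(this.lastDelivered + 1)) {
      const next = this.pending.get(this.lastDelivered + 1);
      this.pending.delete(next.sequence);
      this.lastDelivered = next.sequence;
      this.onDeliver(next);
    }
  }

  // True when later messages are held back because an earlier one is missing
  hasGap() {
    return this.pending.size > 0;
  }
}

const delivered = [];
const buffer = new SequenceBuffer((m) => delivered.push(m.sequence));
buffer.receive({ sequence: 2 });
buffer.receive({ sequence: 1 });
buffer.receive({ sequence: 3 });
console.log(delivered); // [ 1, 2, 3 ]
```

When `hasGap()` stays true for a while, the client can request the missing range from the server, for example through `getMessagesSince(buffer.lastDelivered)`.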

6.2 Presence Management

// Presence system
class PresenceManager {
  constructor(io, redis) {
    this.io = io;
    this.redis = redis;
    this.TTL = 60; // 60 seconds
  }

  async setOnline(userId, socketId) {
    const key = `presence:${userId}`;
    await this.redis.hset(key, {
      status: 'online',
      socketId,
      lastSeen: Date.now().toString(),
    });
    await this.redis.expire(key, this.TTL);

    // Notify friends of status change
    const friends = await this.getFriends(userId);
    friends.forEach((friendId) => {
      this.io.to(`user:${friendId}`).emit('presence', {
        userId,
        status: 'online',
      });
    });
  }

  async setOffline(userId) {
    const key = `presence:${userId}`;
    await this.redis.hset(key, {
      status: 'offline',
      lastSeen: Date.now().toString(),
    });

    const friends = await this.getFriends(userId);
    friends.forEach((friendId) => {
      this.io.to(`user:${friendId}`).emit('presence', {
        userId,
        status: 'offline',
        lastSeen: Date.now(),
      });
    });
  }

  async heartbeat(userId) {
    const key = `presence:${userId}`;
    await this.redis.expire(key, this.TTL);
  }

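  async getStatus(userId) {
    const data = await this.redis.hgetall(`presence:${userId}`);
    // HGETALL yields an empty object for a missing or expired key
    if (!data || Object.keys(data).length === 0) {
      return { status: 'offline' };
    }
    return data;
  }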
}

6.3 Typing Indicators

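// Client
let typingTimeout;
let isTyping = false;

messageInput.addEventListener('input', () => {
  // Emit 'typing' once per burst instead of on every keystroke
  if (!isTyping) {
    isTyping = true;
    socket.emit('typing', { room: currentRoom });
  }

  clearTimeout(typingTimeout);
  typingTimeout = setTimeout(() => {
    isTyping = false;
    socket.emit('stopTyping', { room: currentRoom });
  }, 2000);
});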

// Server
socket.on('typing', (data) => {
  socket.to(data.room).emit('typing', {
    userId: socket.data.user.id,
    userName: socket.data.user.name,
  });
});

socket.on('stopTyping', (data) => {
  socket.to(data.room).emit('stopTyping', {
    userId: socket.data.user.id,
  });
});

6.4 Read Receipts

// Server: Read receipt handling
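socket.on('markRead', async (data) => {
  const { roomId, messageId } = data;
  // Take the user ID from the authenticated socket, not from the client payload,
  // so one user cannot mark messages as read on behalf of another
  const userId = socket.data.user.id;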

  // Save read status to DB
  await db.readReceipts.upsert({
    roomId,
    userId,
    lastReadMessageId: messageId,
    readAt: new Date(),
  });

  // Notify room participants
  socket.to(roomId).emit('readReceipt', {
    userId,
    messageId,
    readAt: Date.now(),
  });
});

// Client: Display read receipts
socket.on('readReceipt', (receipt) => {
  updateMessageStatus(receipt.messageId, receipt.userId, 'read');
});

// Auto-detect message visibility with Intersection Observer
const observer = new IntersectionObserver(
  (entries) => {
    entries.forEach((entry) => {
      if (entry.isIntersecting) {
        const messageId = entry.target.dataset.messageId;
        socket.emit('markRead', {
          roomId: currentRoom,
          messageId,
          userId: currentUser.id,
        });
        observer.unobserve(entry.target);
      }
    });
  },
  { threshold: 0.5 }
);

7. Real-time Notification System

7.1 Push Architecture

// Notification service
class NotificationService {
  constructor(io, redis) {
    this.io = io;
    this.redis = redis;
  }

  async send(userId, notification) {
    const enriched = {
      id: generateId(),
      ...notification,
      createdAt: Date.now(),
      read: false,
    };

    // Persist to database
    await db.notifications.insertOne({
      userId,
      ...enriched,
    });

    // Real-time delivery
    this.io.to(`user:${userId}`).emit('notification', enriched);

    // Queue push notification if offline
    const isOnline = await this.isUserOnline(userId);
    if (!isOnline) {
      await this.queuePushNotification(userId, enriched);
    }

    return enriched;
  }

  async sendBatch(userIds, notification) {
    // Efficient batch processing for bulk notifications
    const enriched = {
      id: generateId(),
      ...notification,
      createdAt: Date.now(),
      read: false,
    };

    // Bulk insert
    const docs = userIds.map((userId) => ({
      userId,
      ...enriched,
    }));
    await db.notifications.insertMany(docs);

    // Real-time delivery to online users
    userIds.forEach((userId) => {
      this.io.to(`user:${userId}`).emit('notification', enriched);
    });
  }

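  async markAsRead(userId, notificationIds) {
    // Match on the application-level `id` that send() generates and delivers
    // to clients; the database-assigned `_id` is never sent to them
    await db.notifications.updateMany(
      { id: { $in: notificationIds }, userId },
      { $set: { read: true, readAt: new Date() } }
    );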

    // Update unread count
    const unreadCount = await db.notifications.countDocuments({
      userId,
      read: false,
    });

    this.io.to(`user:${userId}`).emit('unreadCount', unreadCount);
  }
}

7.2 Notification Batching

// High-frequency event batching
class NotificationBatcher {
  constructor(flushInterval = 1000, maxBatchSize = 50) {
    this.buffer = new Map(); // userId -> notifications[]
    this.flushInterval = flushInterval;
    this.maxBatchSize = maxBatchSize;

    setInterval(() => this.flush(), this.flushInterval);
  }

  add(userId, notification) {
    if (!this.buffer.has(userId)) {
      this.buffer.set(userId, []);
    }

    const userBuffer = this.buffer.get(userId);
    userBuffer.push(notification);

    // Flush immediately if batch size exceeded
    if (userBuffer.length >= this.maxBatchSize) {
      this.flushUser(userId);
    }
  }

  flush() {
    this.buffer.forEach((notifications, userId) => {
      if (notifications.length > 0) {
        this.flushUser(userId);
      }
    });
  }

  flushUser(userId) {
    const notifications = this.buffer.get(userId) || [];
    if (notifications.length === 0) return;

    // Delete the entry so the map does not grow with every user ever seen
    this.buffer.delete(userId);

    // Group by type before sending
    const grouped = this.groupNotifications(notifications);
    io.to(`user:${userId}`).emit('notifications:batch', grouped);
  }

  groupNotifications(notifications) {
    // Group notifications by type
    const groups = {};
    notifications.forEach((n) => {
      const key = n.type;
      if (!groups[key]) groups[key] = [];
      groups[key].push(n);
    });
    return groups;
  }
}

8. Collaborative Editing: OT vs CRDT

8.1 Operational Transformation (OT)

OT is the approach used by Google Docs, transforming concurrent operations to maintain consistency.

// OT basic concept
// When two users edit simultaneously:
// User A: insert('X', position=0) - Insert 'X' at position 0
// User B: insert('Y', position=2) - Insert 'Y' at position 2

// Transform A's operation from B's perspective:
// B inserted at position 2, so A's operation is unaffected (position 0)

// Transform B's operation from A's perspective:
// A inserted at position 0, so B's position shifts to 2+1=3

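// Transform opB so that it can be applied after opA.
// Assumes single-character inserts (hence the shift by 1). When both positions
// are equal, every site must break the tie the same way (for example by
// user ID); otherwise replicas can diverge.
function transformInsert(opA, opB) {
  if (opA.position <= opB.position) {
    return { ...opB, position: opB.position + 1 };
  }
  return opB;
}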
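
As a worked illustration of the idea, the sketch below applies two concurrent inserts in both orders and checks that the results converge. It is a simplified variant, not the algorithm of any particular editor: the helper names `transformAgainst` and `applyInsert`, the `siteId` field, and the tie-breaking rule (the lower `siteId` goes first) are assumptions made for the example.

```javascript
// Hypothetical variant: transform `op` so it can be applied after `against`
function transformAgainst(op, against) {
  const shifts =
    against.position < op.position ||
    // Equal positions: break the tie deterministically by site ID
    (against.position === op.position && against.siteId < op.siteId);
  return shifts
    ? { ...op, position: op.position + against.text.length }
    : op;
}

function applyInsert(doc, op) {
  return doc.slice(0, op.position) + op.text + doc.slice(op.position);
}

const base = 'abc';
const opA = { siteId: 'A', position: 0, text: 'X' };
const opB = { siteId: 'B', position: 2, text: 'Y' };

// Site A applies its own operation first, then B's transformed operation
const atSiteA = applyInsert(applyInsert(base, opA), transformAgainst(opB, opA));
// Site B applies its own operation first, then A's transformed operation
const atSiteB = applyInsert(applyInsert(base, opB), transformAgainst(opA, opB));

console.log(atSiteA, atSiteB); // XabYc XabYc
```

Production OT systems also handle deletes, multi-step histories, and server-side ordering, which this sketch leaves out.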

8.2 CRDT (Conflict-free Replicated Data Type)

CRDT is a data structure that guarantees eventual consistency without a central server.

// CRDT-based collaborative editing with Y.js
import * as Y from 'yjs';
import { WebsocketProvider } from 'y-websocket';

// Create shared document
const ydoc = new Y.Doc();

// Connect WebSocket provider
const provider = new WebsocketProvider(
  'wss://collaboration.example.com',
  'document-room-123',
  ydoc
);

// Shared text
const ytext = ydoc.getText('content');

// Observe text changes
ytext.observe((event) => {
  console.log('Text changed:', event.delta);
});

// Edit text
ytext.insert(0, 'Hello, ');
ytext.insert(7, 'World!');

// Shared map (key-value)
const ymap = ydoc.getMap('metadata');
ymap.set('title', 'My Document');
ymap.set('lastEdited', Date.now());

// Shared array
const yarray = ydoc.getArray('items');
yarray.push(['item1', 'item2']);

// Connection status monitoring
provider.on('status', (event) => {
  console.log('Connection status:', event.status);
});

// Awareness (cursor positions, user info)
const awareness = provider.awareness;
awareness.setLocalState({
  user: { name: 'Alice', color: '#ff0000' },
  cursor: { position: 42 },
});

awareness.on('change', () => {
  const states = awareness.getStates();
  updateCursors(states);
});
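
Independent of Y.js, the convergence property can be seen in one of the simplest CRDTs, a grow-only counter. This is a minimal sketch for illustration; the class name `GCounter` is hypothetical and it is far simpler than the sequence CRDT a text editor needs. Each replica increments only its own slot, and merging takes the per-replica maximum, so merges can happen in any order and any number of times.

```javascript
// Hypothetical grow-only counter (G-Counter) CRDT
class GCounter {
  constructor(replicaId) {
    this.replicaId = replicaId;
    this.counts = {}; // replicaId -> increments made by that replica
  }

  increment(amount = 1) {
    this.counts[this.replicaId] = (this.counts[this.replicaId] || 0) + amount;
  }

  value() {
    return Object.values(this.counts).reduce((sum, n) => sum + n, 0);
  }

  // Merge is commutative, associative and idempotent: take the max per replica
  merge(other) {
    for (const [id, count] of Object.entries(other.counts)) {
      this.counts[id] = Math.max(this.counts[id] || 0, count);
    }
  }
}

const alice = new GCounter('alice');
const bob = new GCounter('bob');
alice.increment(2); // edits made while disconnected
bob.increment(3);

alice.merge(bob);
bob.merge(alice);
console.log(alice.value(), bob.value()); // 5 5
```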

8.3 OT vs CRDT Comparison

| Feature | OT | CRDT |
| --- | --- | --- |
| Server Dependency | Central server required | Works without server |
| Implementation Complexity | High | Medium |
| Memory Usage | Low | Higher (metadata) |
| Offline Support | Difficult | Natural |
| Representative Implementations | Google Docs | Y.js, Automerge |
| Scalability | Server bottleneck possible | P2P scalable |

9. Scaling WebSocket Horizontally

9.1 The Problem: Multiple Server Instances

WebSocket connections are bound to specific server instances. When multiple servers exist, messages from a client on one server must reach clients on other servers.
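
The usual remedy is a shared publish/subscribe channel that every instance listens to. The sketch below simulates that in a single process to show the message flow; `MessageBus` is a hypothetical stand-in for a broker such as Redis or NATS, and `ServerInstance` is a hypothetical stand-in for one WebSocket server process.

```javascript
// Hypothetical in-process stand-in for an external pub/sub broker
class MessageBus {
  constructor() {
    this.subscribers = new Set();
  }
  subscribe(handler) {
    this.subscribers.add(handler);
  }
  publish(message) {
    for (const handler of this.subscribers) handler(message);
  }
}

// Hypothetical stand-in for one server process holding its own connections
class ServerInstance {
  constructor(name, bus) {
    this.name = name;
    this.bus = bus;
    this.localClients = new Map(); // clientId -> delivery callback
    // Every instance relays bus messages to its locally connected clients
    bus.subscribe((message) => this.deliverLocally(message));
  }

  connect(clientId, onMessage) {
    this.localClients.set(clientId, onMessage);
  }

  // Publish to the bus instead of writing to local sockets directly
  broadcast(payload) {
    this.bus.publish({ origin: this.name, payload });
  }

  deliverLocally({ payload }) {
    for (const onMessage of this.localClients.values()) onMessage(payload);
  }
}

const bus = new MessageBus();
const serverA = new ServerInstance('A', bus);
const serverB = new ServerInstance('B', bus);

const received = [];
serverA.connect('alice', (p) => received.push(`alice:${p}`));
serverB.connect('bob', (p) => received.push(`bob:${p}`));

serverA.broadcast('hello');
console.log(received); // [ 'alice:hello', 'bob:hello' ]
```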

9.2 Redis Adapter (Socket.IO)

import { Server } from 'socket.io';
import { createAdapter } from '@socket.io/redis-adapter';
import { createClient } from 'redis';

const io = new Server(httpServer);

// Create Redis clients
const pubClient = createClient({ url: 'redis://redis-host:6379' });
const subClient = pubClient.duplicate();

await Promise.all([pubClient.connect(), subClient.connect()]);

// Apply Redis Adapter
io.adapter(createAdapter(pubClient, subClient));

// Now broadcasting works across all server instances
io.emit('globalEvent', { message: 'All servers receive this' });

// Send to specific room (reaches clients on all servers)
io.to('room-123').emit('roomEvent', { data: 'value' });

9.3 Sticky Sessions Configuration

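Socket.IO needs sticky sessions whenever HTTP long polling is enabled: a polling session consists of several HTTP requests, and all of them, including the request that upgrades to WebSocket, must reach the server instance that holds the session.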

# Nginx configuration
upstream websocket_backend {
    ip_hash;  # Sticky Session
    server backend1:3000;
    server backend2:3000;
    server backend3:3000;
}

server {
    listen 80;

    location /socket.io/ {
        proxy_pass http://websocket_backend;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

        # Timeout settings
        proxy_read_timeout 86400s;
        proxy_send_timeout 86400s;
    }
}

9.4 Pub/Sub with NATS

import { connect, StringCodec } from 'nats';

// NATS payloads are byte arrays; StringCodec converts to and from strings
const sc = StringCodec();

class NATSAdapter {
  constructor(io) {
    this.io = io;
    this.serverId = process.env.SERVER_ID || generateId();
  }

  async connect() {
    this.nc = await connect({ servers: 'nats://nats-host:4222' });

    // Subscribe to broadcast channel
    const sub = this.nc.subscribe('ws.broadcast.>');
    (async () => {
      for await (const msg of sub) {
        const data = JSON.parse(sc.decode(msg.data));
        // Ignore messages from self (already delivered locally in broadcast)
        if (data.serverId === this.serverId) continue;

        const topic = msg.subject.split('.').slice(2).join('.');
        this.io.to(topic).emit(data.event, data.payload);
      }
    })();
  }

  broadcast(room, event, payload) {
    // Deliver to clients connected to this instance
    this.io.to(room).emit(event, payload);

    // Relay to the other instances
    this.nc.publish(`ws.broadcast.${room}`, sc.encode(JSON.stringify({
      serverId: this.serverId,
      event,
      payload,
    })));
  }
}

10. Security

10.1 Authentication

// Token-based authentication
io.use(async (socket, next) => {
  try {
    const token = socket.handshake.auth.token
      || socket.handshake.query.token;

    if (!token) {
      return next(new Error('Authentication token required'));
    }

    // JWT verification
    const payload = await verifyJWT(token);

    // Check token expiration
    if (payload.exp * 1000 < Date.now()) {
      return next(new Error('Token expired'));
    }

    // Attach user info
    socket.data.user = {
      id: payload.sub,
      roles: payload.roles,
    };

    next();
  } catch (err) {
    next(new Error('Invalid authentication token'));
  }
});

// Periodic token refresh
socket.on('refreshToken', async (refreshToken, callback) => {
  try {
    const newToken = await refreshAccessToken(refreshToken);
    callback({ status: 'ok', token: newToken });
  } catch (err) {
    callback({ status: 'error', message: 'Token refresh failed' });
    socket.disconnect();
  }
});

10.2 Rate Limiting

// Token bucket algorithm
class TokenBucket {
  constructor(capacity, refillRate) {
    this.capacity = capacity;
    this.tokens = capacity;
    this.refillRate = refillRate; // Tokens added per second
    this.lastRefill = Date.now();
  }

  consume(count = 1) {
    this.refill();
    if (this.tokens >= count) {
      this.tokens -= count;
      return true;
    }
    return false;
  }

  refill() {
    const now = Date.now();
    const elapsed = (now - this.lastRefill) / 1000;
    this.tokens = Math.min(
      this.capacity,
      this.tokens + elapsed * this.refillRate
    );
    this.lastRefill = now;
  }
}

// Apply to Socket.IO
const rateLimiters = new Map();

io.use((socket, next) => {
  const userId = socket.data.user.id;
  if (!rateLimiters.has(userId)) {
    rateLimiters.set(userId, new TokenBucket(100, 10)); // 100 capacity, 10/sec refill
  }
  next();
});

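// Per-packet check; socket.use() must be registered inside the connection handler
io.on('connection', (socket) => {
  socket.use(([event, ...args], next) => {
    const bucket = rateLimiters.get(socket.data.user.id);
    if (bucket.consume()) {
      next();
    } else {
      next(new Error('Rate limit exceeded'));
    }
  });
});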
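To make the refill arithmetic concrete, here is a small sketch of the same token bucket with the clock passed in, so the behaviour can be checked deterministically. The `now` parameter and the `TestableTokenBucket` name are additions for illustration only.

```javascript
// Same algorithm as the TokenBucket above, with an injectable clock.
class TestableTokenBucket {
  constructor(capacity, refillRate, now = () => Date.now()) {
    this.capacity = capacity;
    this.tokens = capacity;
    this.refillRate = refillRate; // Tokens added per second
    this.now = now;
    this.lastRefill = now();
  }

  consume(count = 1) {
    this.refill();
    if (this.tokens >= count) {
      this.tokens -= count;
      return true;
    }
    return false;
  }

  refill() {
    const current = this.now();
    const elapsedSeconds = (current - this.lastRefill) / 1000;
    this.tokens = Math.min(
      this.capacity,
      this.tokens + elapsedSeconds * this.refillRate
    );
    this.lastRefill = current;
  }
}

let fakeTime = 0;
const bucket = new TestableTokenBucket(3, 1, () => fakeTime); // 3 capacity, 1/sec refill

// A burst of four calls: the first three succeed, the fourth finds the bucket empty
const burst = [bucket.consume(), bucket.consume(), bucket.consume(), bucket.consume()];

// Two seconds later, two tokens have been refilled
fakeTime += 2000;
const afterWait = [bucket.consume(), bucket.consume(), bucket.consume()];
```

With these numbers, `burst` is `[true, true, true, false]` and `afterWait` is `[true, true, false]`: capacity bounds the burst size, while the refill rate bounds the sustained throughput.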

10.3 Origin Validation

const io = new Server(httpServer, {
  cors: {
    origin: (origin, callback) => {
      const allowedOrigins = [
        'https://app.example.com',
        'https://admin.example.com',
      ];

      // Requests without an Origin header (e.g., non-browser clients) are allowed
      if (!origin || allowedOrigins.includes(origin)) {
        callback(null, true);
      } else {
        callback(new Error('Not allowed by CORS'));
      }
    },
    credentials: true,
  },
});

// Additional origin validation middleware
io.use((socket, next) => {
  const origin = socket.handshake.headers.origin;
  const allowedOrigins = process.env.ALLOWED_ORIGINS?.split(',') || [];

  if (allowedOrigins.length > 0 && !allowedOrigins.includes(origin)) {
    return next(new Error('Origin not allowed'));
  }
  next();
});
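A comma-separated environment variable is easy to get subtly wrong (stray spaces, trailing slashes, mixed case). The sketch below shows one way to normalise the allowlist before comparing. The `parseAllowedOrigins` and `isOriginAllowed` helpers are hypothetical and rely only on the standard `URL` class.

```javascript
// Parses a comma-separated allowlist such as the ALLOWED_ORIGINS variable above.
// Entries are trimmed and normalised by the URL parser; malformed entries are skipped.
function parseAllowedOrigins(raw) {
  const origins = new Set();
  for (const entry of (raw || '').split(',')) {
    const trimmed = entry.trim();
    if (!trimmed) continue;
    try {
      origins.add(new URL(trimmed).origin);
    } catch {
      // Ignore malformed entries rather than crash at startup
    }
  }
  return origins;
}

// Exact match on the normalised origin; substring or prefix checks are avoided on purpose.
function isOriginAllowed(origin, allowedOrigins) {
  if (typeof origin !== 'string') return false; // Missing Origin header
  try {
    return allowedOrigins.has(new URL(origin).origin);
  } catch {
    return false;
  }
}

const allowed = parseAllowedOrigins(' https://app.example.com , https://admin.example.com/ ');
```

Note that this sketch rejects requests that carry no Origin header at all, which is stricter than the CORS callback above. Whether non-browser clients should be admitted is a policy decision for each deployment.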

11. Performance Optimization

11.1 Message Compression

// permessage-deflate extension
import { WebSocketServer } from 'ws';

const wss = new WebSocketServer({
  port: 8080,
  perMessageDeflate: {
    zlibDeflateOptions: {
      chunkSize: 1024,
      memLevel: 7,
      level: 3, // zlib compression level (0-9; higher compresses more but costs CPU)
    },
    zlibInflateOptions: {
      chunkSize: 10 * 1024,
    },
    // Only compress above threshold
    threshold: 1024, // Only messages above 1KB
    clientNoContextTakeover: true,
    serverNoContextTakeover: true,
    serverMaxWindowBits: 10,
    concurrencyLimit: 10,
  },
});

11.2 Binary Protocol (Protocol Buffers)

// message.proto
syntax = "proto3";

message ChatMessage {
  string id = 1;
  string room_id = 2;
  string user_id = 3;
  string content = 4;
  int64 timestamp = 5;
  MessageType type = 6;

  enum MessageType {
    TEXT = 0;
    IMAGE = 1;
    FILE = 2;
    SYSTEM = 3;
  }
}

// PresenceUpdate and TypingIndicator are assumed to be defined elsewhere in this file
message WebSocketFrame {
  string event = 1;
  oneof payload {
    ChatMessage chat_message = 2;
    PresenceUpdate presence = 3;
    TypingIndicator typing = 4;
  }
}

// Using Protobuf
import protobuf from 'protobufjs';

const root = await protobuf.load('message.proto');
const ChatMessage = root.lookupType('ChatMessage');
const WebSocketFrame = root.lookupType('WebSocketFrame');

// Encoding
function encodeMessage(event, payload) {
  const frame = WebSocketFrame.create({ event, chatMessage: payload });
  return WebSocketFrame.encode(frame).finish();
}

// Decoding
function decodeMessage(buffer) {
  return WebSocketFrame.decode(new Uint8Array(buffer));
}

// Binary transmission over WebSocket
ws.on('message', (data, isBinary) => {
  if (isBinary) {
    const frame = decodeMessage(data);
    handleFrame(frame);
  }
});
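To illustrate why a binary encoding is more compact than JSON, the sketch below hand-encodes a small typing event with `DataView`. This is a hand-rolled fixed layout, not Protocol Buffers, and the 13-byte layout, the numeric `userId`, and the function names are assumptions chosen for the example.

```javascript
const EVENT_TYPING = 1;

// Layout (big-endian): 1 byte event type, 4 bytes user id, 8 bytes timestamp in ms
function encodeTyping(userId, timestampMs) {
  const buffer = new ArrayBuffer(13);
  const view = new DataView(buffer);
  view.setUint8(0, EVENT_TYPING);
  view.setUint32(1, userId);
  view.setBigUint64(5, BigInt(timestampMs));
  return new Uint8Array(buffer);
}

function decodeTyping(bytes) {
  // Respect byteOffset so this also works on views into a larger buffer
  const view = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength);
  if (view.getUint8(0) !== EVENT_TYPING) {
    throw new Error('Unexpected event type');
  }
  return {
    userId: view.getUint32(1),
    timestampMs: Number(view.getBigUint64(5)),
  };
}

const binary = encodeTyping(42, 1700000000000);
const json = new TextEncoder().encode(
  JSON.stringify({ event: 'typing', userId: 42, timestampMs: 1700000000000 })
);
// binary.length is 13 bytes; the JSON form of the same event is several times larger
```

A fixed layout like this is compact but brittle: adding a field breaks older clients. That schema-evolution problem is what Protocol Buffers field numbers are meant to address.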

11.3 Connection Pooling and Optimization

// Connection limit per user
class ConnectionManager {
  constructor(maxConnectionsPerUser = 5) {
    this.connections = new Map(); // userId -> Set of socketIds
    this.maxPerUser = maxConnectionsPerUser;
  }

  addConnection(userId, socketId) {
    if (!this.connections.has(userId)) {
      this.connections.set(userId, new Set());
    }

    const userConnections = this.connections.get(userId);

    // Disconnect oldest if max connections exceeded
    if (userConnections.size >= this.maxPerUser) {
      const oldest = userConnections.values().next().value;
      io.sockets.sockets.get(oldest)?.disconnect();
      userConnections.delete(oldest);
    }

    userConnections.add(socketId);
  }

  removeConnection(userId, socketId) {
    const userConnections = this.connections.get(userId);
    if (userConnections) {
      userConnections.delete(socketId);
      if (userConnections.size === 0) {
        this.connections.delete(userId);
      }
    }
  }

  getConnectionCount() {
    let total = 0;
    this.connections.forEach((sockets) => {
      total += sockets.size;
    });
    return total;
  }
}
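The eviction rule above depends on a global `io`, which makes it hard to exercise in isolation. The following sketch shows the same oldest-first eviction with the disconnect action passed in as a callback. The `BoundedConnectionSet` name and the `onEvict` hook are hypothetical.

```javascript
// Variant of the manager above that receives the eviction action as a callback.
class BoundedConnectionSet {
  constructor(maxPerUser, onEvict) {
    if (!Number.isInteger(maxPerUser) || maxPerUser < 1) {
      throw new RangeError('maxPerUser must be a positive integer');
    }
    this.maxPerUser = maxPerUser;
    this.onEvict = onEvict;
    this.connections = new Map(); // userId -> Set of socketIds
  }

  add(userId, socketId) {
    let sockets = this.connections.get(userId);
    if (!sockets) {
      sockets = new Set();
      this.connections.set(userId, sockets);
    }
    // A Set iterates in insertion order, so the first value is the oldest socket
    while (sockets.size >= this.maxPerUser) {
      const oldest = sockets.values().next().value;
      sockets.delete(oldest);
      this.onEvict(oldest);
    }
    sockets.add(socketId);
  }

  remove(userId, socketId) {
    const sockets = this.connections.get(userId);
    if (!sockets) return;
    sockets.delete(socketId);
    if (sockets.size === 0) this.connections.delete(userId);
  }
}

const evicted = [];
const manager = new BoundedConnectionSet(2, (socketId) => evicted.push(socketId));
manager.add('alice', 's1');
manager.add('alice', 's2');
manager.add('alice', 's3'); // Exceeds the limit of 2, so 's1' is evicted
```

In a Socket.IO server the callback could be `(id) => io.sockets.sockets.get(id)?.disconnect()`, matching the behaviour of the class above.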

12. Production Operations

12.1 Health Check

// WebSocket server health check
app.get('/health', (req, res) => {
  const health = {
    status: 'ok',
    uptime: process.uptime(),
    connections: io.engine.clientsCount,
    rooms: io.sockets.adapter.rooms.size,
    memory: process.memoryUsage(),
    timestamp: Date.now(),
  };

  // Check connection threshold
  if (health.connections > 10000) {
    health.status = 'degraded';
    res.status(503);
  }

  res.json(health);
});

// Kubernetes readiness/liveness probes
// isReady is a module-level flag; set it to true once startup has finished
let isReady = false;

app.get('/ready', (req, res) => {
  if (isReady) {
    res.status(200).json({ ready: true });
  } else {
    res.status(503).json({ ready: false });
  }
});

12.2 Graceful Shutdown

// Graceful shutdown implementation
async function gracefulShutdown(signal) {
  console.log(`Received ${signal}. Starting graceful shutdown...`);

  // 1. Stop accepting new connections
  httpServer.close();

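  // 2. Notify clients connected to this node only
  //    (with an adapter, io.emit would reach clients on every node)
  io.local.emit('server:shutdown', {
    message: 'Server is shutting down. Please reconnect.',
    reconnectDelay: 5000,
  });

  // 3. Safety net: force-disconnect if flushing outlasts the grace period
  const disconnectTimeout = setTimeout(() => {
    io.local.disconnectSockets(true);
  }, 10000);

  // 4. Wait for pending operations
  await flushPendingMessages();

  // 5. Flushing finished: cancel the timer and close any remaining local sockets
  clearTimeout(disconnectTimeout);
  io.local.disconnectSockets(true);

  // 6. Clean up Redis connections
  await pubClient.quit();
  await subClient.quit();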

  console.log('Graceful shutdown complete');
  process.exit(0);
}

process.on('SIGTERM', () => gracefulShutdown('SIGTERM'));
process.on('SIGINT', () => gracefulShutdown('SIGINT'));
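If every client waits exactly the same `reconnectDelay`, they all reconnect at the same moment and hit the remaining servers as one spike. A common mitigation is to add random jitter on the client. The sketch below assumes a hypothetical `computeReconnectDelay` helper; the jitter window is an illustrative choice.

```javascript
// Spreads reconnect attempts across [baseDelayMs, baseDelayMs + maxJitterMs).
// `random` is injectable so the result can be checked deterministically.
function computeReconnectDelay(baseDelayMs, maxJitterMs, random = Math.random) {
  return Math.floor(baseDelayMs + random() * maxJitterMs);
}

// Example: a base delay of 5000 ms with up to 5000 ms of jitter
const exampleDelay = computeReconnectDelay(5000, 5000);
```

On the client, the handler for `server:shutdown` could pass the received `reconnectDelay` as the base and schedule `socket.connect()` after the computed delay.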

12.3 Monitoring

// Prometheus metrics
import { Registry, Counter, Gauge, Histogram } from 'prom-client';

const register = new Registry();

const wsConnections = new Gauge({
  name: 'websocket_connections_active',
  help: 'Number of active WebSocket connections',
  registers: [register],
});

const wsMessages = new Counter({
  name: 'websocket_messages_total',
  help: 'Total WebSocket messages',
  labelNames: ['direction', 'type'],
  registers: [register],
});

const wsLatency = new Histogram({
  name: 'websocket_message_latency_seconds',
  help: 'WebSocket message processing latency',
  buckets: [0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1],
  registers: [register],
});

// Collect metrics
io.on('connection', (socket) => {
  wsConnections.inc();

  socket.on('disconnect', () => {
    wsConnections.dec();
  });

  socket.use(([event, ...args], next) => {
    const start = process.hrtime.bigint();
    wsMessages.inc({ direction: 'inbound', type: event });

    next();

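    // Captures only the synchronous work triggered by next();
    // anything awaited inside async handlers finishes after this point
    const duration = Number(process.hrtime.bigint() - start) / 1e9;
    wsLatency.observe(duration);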
  });
});

// Metrics endpoint
app.get('/metrics', async (req, res) => {
  res.set('Content-Type', register.contentType);
  res.end(await register.metrics());
});
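One caveat with the counter above: the `type` label is taken from the client-supplied event name, so a misbehaving client can create an unbounded number of time series. A minimal sketch of bounding the label values follows. The `KNOWN_EVENTS` list and the `eventLabel` helper are hypothetical.

```javascript
// Hypothetical allowlist of event names this server actually handles.
const KNOWN_EVENTS = new Set(['chat:send', 'chat:typing', 'presence:update']);

// Maps arbitrary client-supplied event names to a bounded set of label values.
function eventLabel(event) {
  return typeof event === 'string' && KNOWN_EVENTS.has(event) ? event : 'other';
}
```

The counter call would then become `wsMessages.inc({ direction: 'inbound', type: eventLabel(event) })`, keeping the number of label combinations fixed regardless of what clients send.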

13. Production Architecture Patterns

13.1 Chat Service Full Architecture

Client (React/Vue)
    |
    | WebSocket (Socket.IO)
    v
Load Balancer (Nginx, sticky sessions)
    |
    +---> App Server 1 ---+
    +---> App Server 2 ---+--> Redis Pub/Sub (message broadcast)
    +---> App Server 3 ---+
                           |
                           +--> PostgreSQL (message persistence)
                           +--> Redis (sessions, presence, cache)
                           +--> S3 (file uploads)

13.2 Event-Driven Architecture

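// Event bus pattern (assumes an ioredis-style client)
class EventBus {
  constructor(redis) {
    this.pub = redis;
    // A connection in subscriber mode cannot publish, so use a dedicated one
    this.sub = redis.duplicate();
    this.handlers = new Map();

    // Dispatch incoming messages to the handlers registered for the channel
    this.sub.on('message', (channel, message) => {
      const event = JSON.parse(message);
      for (const handler of this.handlers.get(channel) || []) {
        handler(event);
      }
    });
  }

  async publish(channel, event) {
    await this.pub.publish(channel, JSON.stringify(event));
  }

  subscribe(channel, handler) {
    if (!this.handlers.has(channel)) {
      this.handlers.set(channel, []);
      this.sub.subscribe(channel);
    }
    this.handlers.get(channel).push(handler);
  }
}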

// Usage
const bus = new EventBus(redisClient);

bus.subscribe('orders', (event) => {
  if (event.type === 'ORDER_CREATED') {
    // Real-time notification to relevant users
    notificationService.send(event.userId, {
      type: 'order',
      title: 'Order Confirmed',
      body: `Your order #${event.orderId} has been confirmed`,
    });
  }
});

14. Quiz

Q1: What HTTP status code is used during the WebSocket handshake?

Answer: 101 Switching Protocols

WebSocket connections are established through the HTTP Upgrade mechanism. When the client sends a request with the Upgrade: websocket header, the server responds with status code 101 to indicate a successful protocol switch. After this, the connection carries bidirectional WebSocket frames (text or binary) instead of HTTP messages.

Q2: What header does SSE (Server-Sent Events) use for its auto-reconnect mechanism?

Answer: Last-Event-ID

The EventSource API automatically attempts reconnection when the connection is lost. It includes the ID of the last received event in the Last-Event-ID request header. The server can use this to resend events from after that ID. The retry field can also specify the reconnection interval in milliseconds.
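The reconnect behaviour described above can be sketched on the server side as follows. `formatSSE` serialises one event in the `text/event-stream` format, and `eventsAfter` picks the events a reconnecting client has missed. Both helpers are hypothetical, and the sketch assumes event IDs are increasing integers.

```javascript
// Serialises one event in the text/event-stream wire format.
function formatSSE({ id, event, data, retry }) {
  const lines = [];
  if (retry !== undefined) lines.push(`retry: ${retry}`); // Reconnect interval in ms
  if (id !== undefined) lines.push(`id: ${id}`);
  if (event !== undefined) lines.push(`event: ${event}`);
  // Each line of a multi-line payload needs its own "data:" field
  for (const line of String(data).split('\n')) {
    lines.push(`data: ${line}`);
  }
  // A blank line terminates the event
  return lines.join('\n') + '\n\n';
}

// Returns the events a client has not seen, given its Last-Event-ID header value.
function eventsAfter(history, lastEventId) {
  const last = Number.parseInt(lastEventId, 10);
  if (Number.isNaN(last)) return history; // First connection: send everything retained
  return history.filter((entry) => entry.id > last);
}

const history = [
  { id: 1, data: 'first' },
  { id: 2, data: 'second' },
  { id: 3, data: 'third' },
];
const missed = eventsAfter(history, '2'); // Client last saw event 2
const wire = missed.map(formatSSE).join('');
```

Here `wire` contains only the third event. In a real handler the header would be read from the incoming request, and `history` would be a bounded buffer rather than an ever-growing array.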

Q3: What is the difference between Socket.IO's Room and Namespace?

Answer:

A Namespace multiplexes multiple communication channels over a single underlying connection. Namespaces are distinguished by different endpoints (e.g., /chat, /admin), and each can have its own middleware and event handlers.

Room is a logical grouping of clients within a Namespace. Clients join with socket.join(room) and messages are sent to specific groups with io.to(room).emit(). Rooms are a server-side concept that clients cannot access directly.

Q4: Explain the key difference between OT (Operational Transformation) and CRDT.

Answer:

OT relies on a central server that transforms concurrent editing operations to maintain consistency. Google Docs is the quintessential example. Because OT depends on that server, it has limitations with offline editing.

CRDT uses data structures that inherently resolve conflicts, guaranteeing eventual consistency without a central server. Y.js and Automerge are representative implementations. CRDTs naturally support P2P communication and offline editing but have higher memory usage due to metadata.
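As a concrete illustration of a data structure that "inherently resolves conflicts", here is a minimal sketch of a grow-only counter (G-Counter), one of the simplest CRDTs. It is far simpler than the text CRDTs used by Y.js or Automerge, and the function names are chosen for this example only.

```javascript
// G-Counter state: a map of replicaId -> count.
// Each replica only ever increments its own slot.
function increment(state, replicaId, amount = 1) {
  return { ...state, [replicaId]: (state[replicaId] || 0) + amount };
}

// Merge takes the per-replica maximum, which makes it commutative,
// associative, and idempotent, so replicas converge in any merge order.
function merge(a, b) {
  const merged = { ...a };
  for (const [replicaId, count] of Object.entries(b)) {
    merged[replicaId] = Math.max(merged[replicaId] || 0, count);
  }
  return merged;
}

// The counter's value is the sum over all replicas.
function value(state) {
  return Object.values(state).reduce((sum, count) => sum + count, 0);
}

// Two replicas update independently (for example, while offline)
const replicaA = increment(increment({}, 'A'), 'A'); // A counted 2
const replicaB = increment({}, 'B');                 // B counted 1

// Merging in either order yields the same total of 3
const mergedAB = merge(replicaA, replicaB);
const mergedBA = merge(replicaB, replicaA);
```

No server had to order or transform the two updates. The per-replica slots are also the kind of metadata that grows with the number of participants, which is the memory cost mentioned above.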

Q5: Why is Redis Adapter needed when scaling WebSocket servers horizontally?

Answer:

WebSocket connections are bound to specific server instances. When multiple server instances exist, messages from a client connected to Server A must be delivered to clients connected to Server B.

Redis Adapter uses Redis Pub/Sub to broadcast messages across all server instances. When io.to(room).emit() is called on one server, the event propagates through Redis to all other servers, delivering the message to clients across the entire cluster. Additionally, sticky sessions are required whenever the HTTP long-polling transport is enabled, so that every request belonging to a session reaches the same server. They are not needed if clients connect with the WebSocket transport only.


15. References

  1. RFC 6455 - The WebSocket Protocol - https://datatracker.ietf.org/doc/html/rfc6455
  2. MDN - WebSocket API - https://developer.mozilla.org/en-US/docs/Web/API/WebSocket
  3. MDN - Server-Sent Events - https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events
  4. Socket.IO Documentation - https://socket.io/docs/v4/
  5. ws - WebSocket library for Node.js - https://github.com/websockets/ws
  6. Y.js - CRDT Framework - https://docs.yjs.dev/
  7. Automerge - CRDT Library - https://automerge.org/
  8. Redis Adapter for Socket.IO - https://socket.io/docs/v4/redis-adapter/
  9. NATS Messaging - https://nats.io/
  10. Protocol Buffers - https://protobuf.dev/
  11. Scaling WebSocket (Socket.IO docs) - https://socket.io/docs/v4/using-multiple-nodes/
  12. HTML Living Standard - Server-Sent Events - https://html.spec.whatwg.org/multipage/server-sent-events.html
  13. pino - Node.js Logger - https://getpino.io/

The real-time communication technologies covered in this guide each have unique strengths. SSE excels at simple unidirectional streaming, WebSocket at bidirectional communication, and Socket.IO at projects requiring rich features. In production, always ensure proper authentication, rate limiting, and monitoring are in place, and configure Redis Adapter with sticky sessions for horizontal scaling.