MongoDB Sharding and Replica Set Operations Guide: From Cluster Design to Troubleshooting


Introduction

MongoDB provides two core distributed mechanisms for large-scale data processing and high availability: Sharding and Replica Sets. Sharding distributes data horizontally across multiple servers to scale storage capacity and throughput, while Replica Sets maintain redundant copies of the data and fail over automatically when a node goes down. Combining these two mechanisms properly in production is the foundation of reliable MongoDB operations.

This guide covers the full spectrum of production cluster operations: from Sharding architecture components and Shard Key selection strategies, through Replica Set failover mechanisms, chunk migration and balancer management, read/write concern configuration, monitoring and performance tuning, backup/recovery, and real-world troubleshooting scenarios.

Sharding Architecture Overview

A MongoDB Sharded Cluster consists of three components.

mongos (Query Router): Acts as a proxy that routes client requests to the appropriate shard. Being stateless, multiple mongos instances can be deployed behind a load balancer.

Config Server: A Replica Set that stores cluster metadata (shard list, chunk ranges, data distribution information). Since MongoDB 3.4, Config Servers must be deployed as a Replica Set.

Shard: A Replica Set that stores actual data. Each shard is responsible for a subset (chunks) of the total data.

# Sharded Cluster Architecture (Minimum Configuration)
#
# Client
#   |
#   v
# mongos (Router) x 2  -- behind a load balancer
#   |
#   v
# Config Server Replica Set (3 nodes)
#   |
#   +--- Shard 1 Replica Set (Primary + 2 Secondary)
#   +--- Shard 2 Replica Set (Primary + 2 Secondary)
#   +--- Shard 3 Replica Set (Primary + 2 Secondary)
#
# Total nodes: 2 (mongos) + 3 (config) + 9 (shard) = 14

Initial Sharded Cluster Setup

// Connect to mongos and add shards
sh.addShard('shard1/mongo-shard1-a:27018,mongo-shard1-b:27018,mongo-shard1-c:27018')
sh.addShard('shard2/mongo-shard2-a:27018,mongo-shard2-b:27018,mongo-shard2-c:27018')
sh.addShard('shard3/mongo-shard3-a:27018,mongo-shard3-b:27018,mongo-shard3-c:27018')

// Enable sharding for the database
sh.enableSharding('myapp')

// Shard a collection with a Hashed key
sh.shardCollection('myapp.orders', { customerId: 'hashed' })

// Shard a collection with a Ranged key
sh.shardCollection('myapp.logs', { timestamp: 1 })

// Check cluster status
sh.status()

Shard Key Selection Strategies

The Shard Key is extremely difficult to change once set (resharding is supported from MongoDB 5.0 onward), so it must be chosen carefully during the design phase. A good Shard Key should have high cardinality, low frequency, and non-monotonic characteristics.
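As a rough screening step, cardinality and frequency can be estimated from a sample of documents before committing to a key. A minimal sketch (the sample data and the `evaluateShardKey` helper are illustrative, not a MongoDB API; monotonicity still has to be judged from the field's semantics):

```javascript
// Estimate cardinality and frequency of a candidate shard key field
// from a sample of documents. We want high cardinality (many distinct
// values) and low frequency (no single value dominating).
function evaluateShardKey(docs, field) {
  const counts = new Map();
  for (const doc of docs) {
    const v = String(doc[field]);
    counts.set(v, (counts.get(v) || 0) + 1);
  }
  const maxCount = Math.max(...counts.values());
  return {
    cardinality: counts.size,             // number of distinct values
    topFrequency: maxCount / docs.length, // share of the most common value
  };
}

// Hypothetical sample: 'status' is a poor key, 'customerId' a better one
const sample = [
  { customerId: 'u1', status: 'paid' },
  { customerId: 'u2', status: 'paid' },
  { customerId: 'u3', status: 'paid' },
  { customerId: 'u4', status: 'pending' },
];
console.log(evaluateShardKey(sample, 'status'));     // { cardinality: 2, topFrequency: 0.75 }
console.log(evaluateShardKey(sample, 'customerId')); // { cardinality: 4, topFrequency: 0.25 }
```

In practice the same numbers can be pulled from a real collection with an aggregation (`$group` on the candidate field), but the screening logic is the same.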

Shard Key Strategy Comparison

| Strategy | Description | Pros | Cons | Ideal Use Case |
| --- | --- | --- | --- | --- |
| Hashed Sharding | Distributes by hash of field value | Even distribution, prevents hotspots | Inefficient range queries, requires all-shard scans | High-frequency writes, no range queries needed |
| Ranged Sharding | Distributes by value range | Efficient range queries, targets specific shards | Possible hotspots, monotonic key issues | Time-based logs, frequent range queries |
| Zone Sharding | Places data by region/condition | Data locality, regulatory compliance | Complex setup, potential imbalance | Multi-region, data sovereignty requirements |
| Compound Key | Combination of multiple fields | Higher cardinality, query optimization | Increased design complexity | When a single field is insufficient |

Shard Key Configuration Examples

// 1. Hashed Shard Key - when even distribution is the top priority
sh.shardCollection('myapp.users', { _id: 'hashed' })

// 2. Compound Shard Key - satisfying cardinality and query patterns
sh.shardCollection('myapp.events', { tenantId: 1, createdAt: 1 })

// 3. Zone Sharding - data separation by region
sh.addShardToZone('shard1', 'KR')
sh.addShardToZone('shard2', 'US')
sh.addShardToZone('shard3', 'EU')

sh.updateZoneKeyRange(
  'myapp.users',
  { region: 'KR', _id: MinKey },
  { region: 'KR', _id: MaxKey },
  'KR'
)
sh.updateZoneKeyRange(
  'myapp.users',
  { region: 'US', _id: MinKey },
  { region: 'US', _id: MaxKey },
  'US'
)

// 4. MongoDB 5.0+ resharding - when a shard key change is needed
sh.reshardCollection('myapp.orders', { customerId: 1, orderId: 1 })

Shard Key Anti-Patterns

Using a monotonically increasing field (e.g., ObjectId, timestamp) as a Ranged Shard Key causes all new writes to concentrate on the last chunk, creating a Hot Shard. In this case, use Hashed Sharding or combine with a compound key. Fields with very low cardinality (e.g., boolean, status enums) cannot distribute data evenly and may cause Jumbo Chunks.
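The hot-shard effect is easy to see with a toy simulation: under range partitioning, monotonically increasing keys all land in the last chunk, while hashing spreads them. A sketch (the chunk boundaries and the hash function are illustrative; MongoDB's hashed index uses a different hash):

```javascript
// Toy model: 4 range chunks with upper bounds; a write lands in the
// first chunk whose upper bound exceeds the key.
const bounds = [250, 500, 750, Infinity];

function routeRange(key) {
  return bounds.findIndex((b) => key < b);
}

// Simple multiplicative hash for illustration only
function routeHashed(key, nChunks) {
  return ((key * 2654435761) >>> 0) % nChunks;
}

const rangeHits = [0, 0, 0, 0];
const hashHits = [0, 0, 0, 0];
for (let key = 1000; key < 2000; key++) { // monotonically increasing keys
  rangeHits[routeRange(key)]++;
  hashHits[routeHashed(key, 4)]++;
}
console.log('range:', rangeHits); // [ 0, 0, 0, 1000 ] -- every write hits the last chunk
console.log('hashed:', hashHits); // writes spread evenly across chunks
```

The range-routed counters show exactly the Hot Shard pattern described above: one chunk (and therefore one shard) absorbs 100% of new writes.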

Replica Set Configuration and Failover

Replica Set Initialization

// Initialize Replica Set on the Primary
rs.initiate({
  _id: 'shard1',
  members: [
    { _id: 0, host: 'mongo-shard1-a:27018', priority: 10 },
    { _id: 1, host: 'mongo-shard1-b:27018', priority: 5 },
    { _id: 2, host: 'mongo-shard1-c:27018', priority: 1 },
  ],
  settings: {
    electionTimeoutMillis: 10000, // default 10 seconds
    heartbeatTimeoutSecs: 10, // heartbeat timeout
    chainingAllowed: true, // allow secondary-to-secondary chaining
  },
})

// Check Replica Set status
rs.status()

// Check replication lag
rs.printReplicationInfo()
rs.printSecondaryReplicationInfo()

Failover Mechanism

MongoDB Replica Set failover operates on a Raft-based consensus algorithm. When the Primary becomes unresponsive, the remaining members initiate an election, and the member that receives a majority of votes becomes the new Primary.

Election conditions:

  • A majority of voting members must be able to communicate with each other
  • Members with priority 0 cannot become Primary
  • Hidden and delayed members can vote, but they are excluded from Primary candidacy (both must be configured with priority 0)
  • An election starts when the Primary has been unreachable for longer than electionTimeoutMillis (default 10 seconds)
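The majority requirement is worth internalizing with numbers. A sketch of the strict-majority arithmetic (helper names are illustrative):

```javascript
// Strict majority needed to elect a Primary, and how many member
// failures a Replica Set can absorb while still holding elections.
function majority(voters) {
  return Math.floor(voters / 2) + 1;
}
function faultTolerance(voters) {
  return voters - majority(voters);
}

for (const n of [3, 4, 5]) {
  console.log(`${n} members: majority=${majority(n)}, tolerates ${faultTolerance(n)} failure(s)`);
}
// 3 members: majority=2, tolerates 1 failure(s)
// 4 members: majority=3, tolerates 1 failure(s)
// 5 members: majority=3, tolerates 2 failure(s)
```

Note that a 4-member set tolerates no more failures than a 3-member set, which is why odd-sized Replica Sets are recommended.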
// Manual failover (for maintenance)
rs.stepDown(60) // relinquish Primary role for 60 seconds

// Promote a specific member by adjusting priority
cfg = rs.conf()
cfg.members[1].priority = 100
rs.reconfig(cfg)

// Hidden member setup (for backup)
cfg = rs.conf()
cfg.members[2].hidden = true
cfg.members[2].priority = 0
rs.reconfig(cfg)

// Delayed member setup (for data recovery, 1-hour delay)
cfg = rs.conf()
cfg.members[2].secondaryDelaySecs = 3600
cfg.members[2].hidden = true
cfg.members[2].priority = 0
rs.reconfig(cfg)

Chunk Migration and Balancer

How the Balancer Works

The balancer runs on the Config Server Primary. Since MongoDB 6.0 it migrates data when the data-size difference between shards exceeds the migration threshold (earlier versions balanced on chunk counts). In a cluster with n shards, at most n/2 migrations can run concurrently.

// Check balancer status
sh.getBalancerState()
sh.isBalancerRunning()

// Stop/start balancer
sh.stopBalancer()
sh.startBalancer()

// Set balancer time window (run only between 2-6 AM)
use config
db.settings.updateOne(
  { _id: 'balancer' },
  { $set: { activeWindow: { start: '02:00', stop: '06:00' } } },
  { upsert: true }
)

// Disable balancing for a specific collection
sh.disableBalancing('myapp.orders')

// Manual chunk move
sh.moveChunk('myapp.orders', { customerId: 'user123' }, 'shard3')

// Check current migration status
db.adminCommand({ currentOp: true, desc: /migrate/ })

Chunk Management

// Change chunk size (default 128MB since MongoDB 6.0)
use config
db.settings.updateOne(
  { _id: "chunksize" },
  { $set: { value: 64 } },
  { upsert: true }
)

// Find jumbo chunks
db.chunks.find({ jumbo: true }).forEach(function(chunk) {
  print("Jumbo chunk: " + chunk.ns + " on " + chunk.shard)
})

// Force split a jumbo chunk
sh.splitAt("myapp.orders", { customerId: "splitPoint" })

// Check chunk distribution
db.chunks.aggregate([
  { $group: { _id: "$shard", count: { $sum: 1 } } },
  { $sort: { count: -1 } }
])

Read/Write Concern Configuration

Read Preference Mode Comparison

| Mode | Read Target | Consistency | Latency | Ideal Use Case |
| --- | --- | --- | --- | --- |
| primary | Primary only | Strong consistency | Baseline | Real-time data required |
| primaryPreferred | Primary first, fallback to Secondary | Mostly strong | Baseline | Primary failure fallback |
| secondary | Secondary only | Eventual consistency | Low (nearest node) | Reports, analytics queries |
| secondaryPreferred | Secondary first, fallback to Primary | Mostly eventual | Low | Read load distribution |
| nearest | Lowest network latency node | Eventual consistency | Minimum | Geo-distributed deployments |

Write Concern Level Comparison

| Write Concern | Description | Durability | Performance | Ideal Use Case |
| --- | --- | --- | --- | --- |
| w: 0 | No acknowledgment | Very low | Highest | Logs, metrics (loss acceptable) |
| w: 1 | Primary acknowledgment only | Moderate | High | General writes |
| w: "majority" | Majority acknowledgment | High | Moderate | Critical data (recommended default) |
| w: N | N-node acknowledgment | Very high | Low | Finance, strict requirements |
| j: true | Journal write confirmation | Highest | Low | Durability first |

// Read/Write Concern configuration examples

// 1. Collection-level Read Preference
db.orders.find({ status: 'pending' }).readPref('secondaryPreferred')

// 2. Connection string configuration
// mongodb://mongos1:27017,mongos2:27017/myapp?readPreference=secondaryPreferred&w=majority

// 3. Write Concern specification
db.orders.insertOne(
  { customerId: 'user123', amount: 50000 },
  { writeConcern: { w: 'majority', j: true, wtimeout: 5000 } }
)

// 4. Set global default Read/Write Concern
db.adminCommand({
  setDefaultRWConcern: 1,
  defaultWriteConcern: { w: 'majority', j: true },
  defaultReadConcern: { level: 'majority' },
})

Monitoring and Performance Tuning

Essential Monitoring Queries

// 1. Check shard data distribution
db.orders.getShardDistribution()

// 2. Find long-running operations
db.currentOp({ secs_running: { $gt: 10 } })

// 3. Enable slow query profiling
db.setProfilingLevel(1, { slowms: 100 })

// 4. Analyze slow queries
db.system.profile
  .find({
    millis: { $gt: 100 },
    ns: /myapp\.orders/,
  })
  .sort({ ts: -1 })
  .limit(10)

// 5. Server status overview
db.serverStatus().opcounters // operation counters
db.serverStatus().connections // connection count
db.serverStatus().wiredTiger.cache // cache status

// 6. Check Replica Set replication lag
db.adminCommand({ replSetGetStatus: 1 }).members.forEach(function (m) {
  if (m.stateStr === 'SECONDARY') {
    var lag = (new Date() - m.optimeDate) / 1000
    print(m.name + ' lag: ' + lag + 's')
  }
})

// 7. Index usage statistics
db.orders.aggregate([{ $indexStats: {} }])

// 8. Config Server metadata consistency check (MongoDB 7.0+)
db.adminCommand({ checkMetadataConsistency: 1 })

Performance Tuning Checklist

  • Index Covering: Use compound indexes that include the Shard Key so queries are processed on a single shard
  • Minimize Scatter-Gather: Include the Shard Key in query filters to target specific shards
  • Connection Pooling: Set appropriate mongos connection pool sizes (default maxPoolSize=100)
  • Read Preference: Route analytics queries to Secondary nodes
  • Chunk Size Adjustment: For write-heavy workloads, reduce chunk size to minimize migration impact
  • WiredTiger Cache: Set cacheSizeGB to 50-60% of system RAM
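Several of the items above (pool size, read preference, durable writes) come together in the client connection string. A sketch that assembles one (the hostnames and the `buildUri` helper are illustrative, not a driver API):

```javascript
// Assemble a mongos connection string carrying the pool, read
// preference, and write concern settings from the checklist above.
function buildUri(hosts, db, options) {
  const query = Object.entries(options)
    .map(([k, v]) => `${k}=${v}`)
    .join('&');
  return `mongodb://${hosts.join(',')}/${db}?${query}`;
}

const uri = buildUri(['mongos1:27017', 'mongos2:27017'], 'myapp', {
  readPreference: 'secondaryPreferred', // route reads to Secondaries
  maxPoolSize: 100,                     // driver default; tune per workload
  w: 'majority',                        // durable writes by default
});
console.log(uri);
// mongodb://mongos1:27017,mongos2:27017/myapp?readPreference=secondaryPreferred&maxPoolSize=100&w=majority
```

Listing both mongos hosts in the URI lets the driver fail over between routers without an external load balancer.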

Backup and Recovery

Backup with mongodump/mongorestore

# 1. Stop balancer before backup (sharded cluster)
mongosh --host mongos1:27017 --eval "sh.stopBalancer()"

# 2. Back up individual shards (--oplog for point-in-time consistency)
mongodump --host shard1/mongo-shard1-a:27018 \
  --oplog --out /backup/shard1-$(date +%Y%m%d)

mongodump --host shard2/mongo-shard2-a:27018 \
  --oplog --out /backup/shard2-$(date +%Y%m%d)

# 3. Back up Config Server
mongodump --host configRS/config1:27019 \
  --oplog --out /backup/config-$(date +%Y%m%d)

# 4. Restart balancer
mongosh --host mongos1:27017 --eval "sh.startBalancer()"

# 5. Restore (--oplogReplay for point-in-time recovery)
mongorestore --host shard1/mongo-shard1-a:27018 \
  --oplogReplay /backup/shard1-20260313

Point-in-Time Recovery (PITR) and Oplog

// Check oplog size and retention
db.getReplicationInfo()
// Example output:
// configured oplog size:   2048MB
// log length start to end: 172800secs (48hrs)
// oplog first event time:  ...
// oplog last event time:   ...

// Resize oplog (recommend minimum 48 hours of retention)
db.adminCommand({ replSetResizeOplog: 1, size: 4096 }) // 4GB

Production backup recommendations:

  • For small clusters, mongodump is sufficient; for large-scale environments, use filesystem snapshots (LVM/EBS) or MongoDB Atlas Backup
  • Configure oplog size to retain at least 48 hours of writes
  • Use Delayed Secondary members for quick recovery from logical errors (accidental data deletion)
  • Perform backup restoration tests regularly (at least monthly)
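The 48-hour oplog guideline can be turned into a sizing formula: required size ≈ average oplog traffic per hour × desired retention window, plus headroom for write spikes. A back-of-envelope sketch (the traffic figure and helper are assumed examples):

```javascript
// Estimate the oplog size needed for a target retention window.
// Oplog traffic can be sampled from db.getReplicationInfo():
// usedMB divided by the logged time span gives MB per hour.
function requiredOplogMB(writeMBPerHour, retentionHours, headroom = 1.5) {
  return Math.ceil(writeMBPerHour * retentionHours * headroom);
}

// Assumed example: ~40 MB/h of oplog traffic, 48 h retention target
const sizeMB = requiredOplogMB(40, 48);
console.log(`resize with: db.adminCommand({ replSetResizeOplog: 1, size: ${sizeMB} })`);
// 40 * 48 * 1.5 = 2880 MB
```

Re-run the estimate after significant workload changes; a growing write rate silently shrinks the retention window of a fixed-size oplog.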

Troubleshooting Scenarios

1. Shard Key Hotspot

Symptoms: Only one shard shows high CPU/disk I/O while the rest remain idle

Cause: Using a monotonically increasing field (ObjectId, timestamp) as a Ranged Shard Key, concentrating all new writes on the last chunk

Resolution:

  • Perform resharding with a Hashed Shard Key (MongoDB 5.0+)
  • Use a Compound Shard Key to increase cardinality
  • Use sh.shardAndDistributeCollection() in MongoDB 8.0 for fast redistribution

2. Split-brain (Network Partition)

Symptoms: Two or more nodes acting as Primary simultaneously, risking data inconsistency

Cause: Network partition splits the Replica Set into two groups, each electing its own Primary

Resolution:

  • Maintain an odd number of Replica Set members (3, 5, 7) so that at most one side of a partition can hold a majority
  • During a network partition, the Primary on the minority side automatically steps down to Secondary, so two Primaries cannot coexist for long
  • Use the w: "majority" Write Concern so that acknowledged writes are never rolled back after a failover

3. Chunk Migration Failure

Symptoms: Balancer logs show "Data transfer error" or "WriteConcernFailed" errors

Cause: Severe replication lag, insufficient network bandwidth, or lack of disk space on the target shard

Resolution:

// Enable _secondaryThrottle to ensure replication sync
use config
db.settings.updateOne(
  { _id: "balancer" },
  { $set: { "_secondaryThrottle": { w: 2 } } },
  { upsert: true }
)

// Reduce rangeDeleter batch size
db.adminCommand({
  setParameter: 1,
  rangeDeleterBatchSize: 32,
  rangeDeleterBatchDelayMS: 100
})

4. Replica Lag

Symptoms: Secondary optimeDate falls tens of seconds or more behind the Primary

Cause: Heavy write load, slow disk I/O, index builds, network bottlenecks

Resolution:

  • Check and increase WiredTiger cache size if needed
  • Identify and terminate heavy queries running on Secondaries
  • Verify network bandwidth and optimize oplog transfer paths
  • Perform index builds in a rolling fashion (one node at a time)

Operations Checklist

Pre-deployment:

  • Was the Shard Key chosen based on query patterns, cardinality, and write distribution?
  • Is each Replica Set configured with at least 3 members (odd number)?
  • Is the Config Server deployed as a separate Replica Set (3 nodes)?
  • Are there 2+ mongos instances behind a load balancer?
  • Are authentication (keyFile or x.509) and network encryption (TLS) configured?

Daily operations:

  • Is the balancer running normally with even chunk distribution?
  • Is Replica Lag within threshold (e.g., 10 seconds)?
  • Is the connection count appropriate relative to maxIncomingConnections?
  • Are slow query logs analyzed regularly?
  • Is disk usage below 70%?

Backup/Recovery:

  • Is automated backup running daily?
  • Does the oplog size retain at least 48 hours of data?
  • Was a backup restoration test performed within the last 30 days?
  • Is a Delayed Secondary configured for logical error protection?

Disaster preparedness:

  • Was an automatic failover test performed within the last quarter?
  • Are monitoring alerts set for Replica Lag, disk usage, and connection count?
  • Is a recovery procedure for split-brain scenarios documented?
