Event Sourcing Production Anti-Patterns: Schema Evolution, Snapshotting, and Event Store Scaling


Why This Post Exists

Previous posts in this series covered the fundamentals of Event Sourcing and CQRS implementation, including EventStoreDB-based architecture and Kafka-based microservice patterns with Saga orchestration. This post assumes you already understand the core concepts: append-only event streams, aggregate state reconstruction via replay, read model projections, and the separation of command and query paths.

This post focuses exclusively on what goes wrong in production and the patterns that prevent it. Specifically: event schema evolution and versioning strategies, snapshotting for performance, projection rebuilding at scale, event store technology selection trade-offs, anti-patterns that cause production incidents, and real failure case studies with recovery procedures.

Event Schema Evolution and Versioning

In an event-sourced system, events are immutable. Once persisted, they cannot be modified or deleted. But business requirements change constantly. Fields get added, renamed, or removed. Event structures evolve. This is the single most challenging operational problem in Event Sourcing.

The Four Versioning Strategies

There are four primary strategies for handling event schema changes, each with different trade-offs.

Strategy 1: Weak Schema (Tolerant Reader)

Add new optional fields while keeping old fields. Consumers ignore fields they do not understand.

// Version 1: Original event
interface OrderPlacedV1 {
  type: 'OrderPlaced'
  orderId: string
  customerId: string
  totalAmount: number
}

// Version 2: Added shippingAddress (optional for backward compat)
interface OrderPlacedV2 {
  type: 'OrderPlaced'
  orderId: string
  customerId: string
  totalAmount: number
  shippingAddress?: {
    street: string
    city: string
    zipCode: string
    country: string
  }
  currency?: string // default: 'USD'
}

// Consumer uses tolerant reader pattern
function handleOrderPlaced(event: OrderPlacedV1 | OrderPlacedV2) {
  const currency = 'currency' in event ? event.currency : 'USD'
  const address = 'shippingAddress' in event ? event.shippingAddress : null // fetch from customer profile as fallback
}

This works for additive changes only. It breaks down when you need to rename, restructure, or remove fields.

Strategy 2: Upcasting (On-Read Transformation)

Transform old events into the latest schema version at read time. The event store retains the original bytes, but the application layer applies an upcaster chain before processing.

from dataclasses import dataclass
from typing import Any, Callable

@dataclass
class EventEnvelope:
    stream_id: str
    event_type: str
    version: int
    data: dict
    metadata: dict

class UpcasterChain:
    """Chains upcasters to transform events from any version to the latest."""

    def __init__(self):
        self._upcasters: dict[tuple[str, int], Callable] = {}

    def register(self, event_type: str, from_version: int,
                 upcaster: Callable[[dict], dict]):
        self._upcasters[(event_type, from_version)] = upcaster

    def upcast(self, envelope: EventEnvelope,
               target_version: int) -> EventEnvelope:
        current = envelope
        while current.version < target_version:
            key = (current.event_type, current.version)
            if key not in self._upcasters:
                raise ValueError(
                    f"No upcaster for {current.event_type} "
                    f"v{current.version} -> v{current.version + 1}"
                )
            new_data = self._upcasters[key](current.data)
            current = EventEnvelope(
                stream_id=current.stream_id,
                event_type=current.event_type,
                version=current.version + 1,
                data=new_data,
                metadata=current.metadata,
            )
        return current

# Register upcasters
chain = UpcasterChain()

# V1 -> V2: split 'name' into 'firstName' and 'lastName'
chain.register('CustomerRegistered', 1, lambda d: {
    **d,
    'firstName': d['name'].split(' ')[0],
    'lastName': ' '.join(d['name'].split(' ')[1:]),
})

# V2 -> V3: add 'tier' field with default
chain.register('CustomerRegistered', 2, lambda d: {
    **d,
    'tier': 'standard',
})

Upcasting is the recommended approach for most schema evolution scenarios. The original events remain untouched in storage; only the in-memory representation changes.
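To make the mechanics concrete, here is a minimal standalone sketch of the same chain applied to a stored V1 event, using plain dicts instead of the `EventEnvelope` wrapper:

```python
# Upcasters keyed by (event_type, from_version), as in UpcasterChain above.
upcasters = {
    ('CustomerRegistered', 1): lambda d: {
        **d,
        'firstName': d['name'].split(' ')[0],
        'lastName': ' '.join(d['name'].split(' ')[1:]),
    },
    ('CustomerRegistered', 2): lambda d: {**d, 'tier': 'standard'},
}

def upcast(event_type: str, version: int, data: dict, target: int) -> dict:
    """Walk the chain one version at a time until the target is reached."""
    while version < target:
        data = upcasters[(event_type, version)](data)
        version += 1
    return data

v1 = {'name': 'Ada Lovelace', 'email': 'ada@example.com'}
v3 = upcast('CustomerRegistered', 1, v1, target=3)
# v3 now carries firstName='Ada', lastName='Lovelace', tier='standard'
```

Note that the original `name` field is still present after upcasting; dropping it would be a V3 -> V4 concern, not something an intermediate upcaster should do silently.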

Strategy 3: New Event Type

When the semantics of an event change fundamentally, introduce a new event type rather than versioning the existing one.

// Instead of OrderPlacedV3 with breaking changes,
// create a completely new event type
interface OrderSubmitted {
  type: 'OrderSubmitted' // new name signals new semantics
  orderId: string
  customer: {
    id: string
    tier: 'standard' | 'premium' | 'enterprise'
  }
  lineItems: Array<{
    productId: string
    quantity: number
    unitPrice: number
    discount: number
  }>
  pricing: {
    subtotal: number
    tax: number
    shipping: number
    total: number
    currency: string
  }
  submittedAt: string
}

This avoids version number proliferation but requires projections to handle both the old and new event types during the transition period.
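During that transition window a projection simply branches on both types. A sketch (field names follow the interfaces above; the read-model shape is illustrative):

```python
def project_order(event: dict, view: dict) -> dict:
    """Projects both the legacy OrderPlaced and the new OrderSubmitted
    into the same read-model entry, keyed by order ID."""
    if event['type'] == 'OrderPlaced':  # legacy shape (V1/V2)
        view[event['orderId']] = {
            'total': event['totalAmount'],
            'currency': event.get('currency', 'USD'),  # tolerant reader default
        }
    elif event['type'] == 'OrderSubmitted':  # new shape
        view[event['orderId']] = {
            'total': event['pricing']['total'],
            'currency': event['pricing']['currency'],
        }
    return view
```

Once the last `OrderPlaced` event ages out of every projection's replay window, the legacy branch can be deleted.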

Strategy 4: Copy-and-Transform (Event Store Migration)

Rewrite the entire event store by applying transformations to every event. This is the nuclear option. Use it only when upcasting becomes too expensive at read time or when you need to purge data for regulatory compliance (such as GDPR right to erasure).

-- WARNING: This is destructive. Take full backups first.
-- Step 1: Create a new events table
CREATE TABLE events_v2 (
    global_position BIGSERIAL PRIMARY KEY,
    stream_id UUID NOT NULL,
    stream_position INTEGER NOT NULL,
    event_type VARCHAR(256) NOT NULL,
    data JSONB NOT NULL,
    metadata JSONB NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    UNIQUE (stream_id, stream_position)
);

-- Step 2: Migrate with transformation
INSERT INTO events_v2 (stream_id, stream_position, event_type, data, metadata, created_at)
SELECT
    stream_id,
    stream_position,
    event_type,
    CASE
        WHEN event_type = 'CustomerRegistered' AND data->>'version' = '1'
        THEN jsonb_build_object(
            'version', '3',
            'firstName', split_part(data->>'name', ' ', 1),
            'lastName', regexp_replace(data->>'name', '^[^ ]+ ', ''),
            'email', data->>'email',
            'tier', 'standard'
        )
        ELSE data
    END,
    metadata,
    created_at
FROM events
ORDER BY global_position;

-- Step 3: Swap tables (during maintenance window)
ALTER TABLE events RENAME TO events_deprecated;
ALTER TABLE events_v2 RENAME TO events;

Schema Evolution Decision Matrix

| Change type        | Weak Schema  | Upcasting    | New Event Type | Copy-Transform |
|--------------------|--------------|--------------|----------------|----------------|
| Add optional field | Best choice  | Overkill     | Overkill       | Overkill       |
| Rename field       | Not possible | Best choice  | Good           | Heavy          |
| Split field        | Not possible | Best choice  | Good           | Heavy          |
| Remove field       | Risky        | Good         | Best choice    | Good           |
| Change semantics   | Not possible | Not possible | Best choice    | Good           |
| Regulatory purge   | Not possible | Not possible | Not possible   | Required       |

Snapshotting Strategies

As an aggregate accumulates thousands of events over its lifetime, replaying all events on every command becomes unacceptably slow. Snapshotting addresses this by periodically saving the aggregate's current state so that future loads only need to replay events after the snapshot.

Snapshot Implementation Patterns

Pattern 1: Periodic Snapshotting (Every N Events)

from dataclasses import dataclass
from typing import Optional
import json

@dataclass
class Snapshot:
    stream_id: str
    version: int  # event version at snapshot time
    state: dict
    created_at: str

class SnapshotStore:
    """PostgreSQL-backed snapshot store."""

    def __init__(self, conn):
        self._conn = conn

    async def save_snapshot(self, snapshot: Snapshot):
        await self._conn.execute("""
            INSERT INTO snapshots (stream_id, version, state, created_at)
            VALUES ($1, $2, $3, $4)
            ON CONFLICT (stream_id) DO UPDATE
            SET version = $2, state = $3, created_at = $4
        """, snapshot.stream_id, snapshot.version,
             json.dumps(snapshot.state), snapshot.created_at)

    async def load_snapshot(self, stream_id: str) -> Optional[Snapshot]:
        row = await self._conn.fetchrow("""
            SELECT stream_id, version, state, created_at
            FROM snapshots WHERE stream_id = $1
        """, stream_id)
        if row:
            return Snapshot(
                stream_id=row['stream_id'],
                version=row['version'],
                state=json.loads(row['state']),
                created_at=row['created_at'],
            )
        return None

class AggregateRepository:
    SNAPSHOT_INTERVAL = 100  # snapshot every 100 events

    def __init__(self, event_store, snapshot_store):
        self._event_store = event_store
        self._snapshot_store = snapshot_store

    async def load(self, stream_id: str):
        snapshot = await self._snapshot_store.load_snapshot(stream_id)

        if snapshot:
            aggregate = self._rebuild_from_snapshot(snapshot)
            start_version = snapshot.version + 1
        else:
            aggregate = self._create_empty()
            start_version = 0

        events = await self._event_store.read_stream(
            stream_id, from_version=start_version
        )
        for event in events:
            aggregate.apply(event)

        return aggregate

    async def save(self, aggregate):
        new_events = aggregate.uncommitted_events
        expected_version = aggregate.version  # stream version before the new events

        await self._event_store.append(
            aggregate.stream_id, new_events, expected_version
        )
        new_version = expected_version + len(new_events)

        # Snapshot when this write crosses an interval boundary. The snapshot
        # version must be the post-append version: to_dict() already reflects
        # the uncommitted events applied to the aggregate.
        if (new_version // self.SNAPSHOT_INTERVAL
                > expected_version // self.SNAPSHOT_INTERVAL):
            await self._snapshot_store.save_snapshot(Snapshot(
                stream_id=aggregate.stream_id,
                version=new_version,
                state=aggregate.to_dict(),
                created_at=self._now(),
            ))

Pattern 2: Temporal Snapshotting

Take snapshots based on time intervals rather than event counts. Useful when event frequency varies dramatically across aggregates.

Pattern 3: On-Demand Snapshotting

Create snapshots only when load latency exceeds a threshold. This avoids unnecessary snapshot writes for aggregates that are loaded infrequently.
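A sketch of the load path, with the repository collapsed into two injected callables so the timing logic stands alone (the 50 ms threshold is an assumed SLO, not a universal value):

```python
import time

LOAD_LATENCY_THRESHOLD_MS = 50.0  # assumed SLO; tune per system

def load_with_snapshot_on_demand(load, save_snapshot, stream_id):
    """load and save_snapshot stand in for the repository. A snapshot is
    written only when the replay was measurably slow."""
    start = time.monotonic()
    aggregate = load(stream_id)
    elapsed_ms = (time.monotonic() - start) * 1000
    if elapsed_ms > LOAD_LATENCY_THRESHOLD_MS:
        save_snapshot(stream_id, aggregate)
    return aggregate
```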

Snapshot Anti-Patterns

  • Snapshotting too early: Do not introduce snapshotting before you have a measured performance problem. Most aggregates have fewer than 100 events and replay is instant.
  • Snapshot version coupling: Snapshots must be versioned. If you change the aggregate state shape, old snapshots become unreadable. Always include a schema version in the snapshot.
  • Never deleting old snapshots: Snapshots accumulate storage. Implement a retention policy that keeps only the latest snapshot per aggregate.

Projection Rebuilding

Projections (read models) are derived from the event stream. When a projection has a bug, or when you introduce a new projection, you must rebuild it from scratch by replaying all relevant events.

Rebuilding at Scale

Rebuilding a projection over millions of events is not trivial. Naive approaches that process events one at a time from position 0 can take hours or days.

from dataclasses import dataclass

@dataclass
class ProjectionRebuildConfig:
    batch_size: int = 1000
    max_concurrent_batches: int = 5
    checkpoint_interval: int = 10_000
    progress_log_interval: int = 50_000

class ProjectionRebuilder:
    """Optimized projection rebuild with batching and checkpointing."""

    def __init__(self, event_store, projection_handler, checkpoint_store,
                 config: ProjectionRebuildConfig = None):
        self._event_store = event_store
        self._handler = projection_handler
        self._checkpoint_store = checkpoint_store
        self._config = config or ProjectionRebuildConfig()

    async def rebuild(self, projection_name: str):
        # Resume from last checkpoint if rebuild was interrupted
        last_position = await self._checkpoint_store.get(
            f"rebuild:{projection_name}"
        )
        start_position = last_position + 1 if last_position is not None else 0

        total_processed = 0
        position = start_position

        while True:
            events = await self._event_store.read_all(
                from_position=position,
                count=self._config.batch_size,
            )
            if not events:
                break

            # Process batch
            await self._handler.handle_batch(events)
            position = events[-1].global_position + 1
            total_processed += len(events)

            # Checkpoint periodically
            if total_processed % self._config.checkpoint_interval == 0:
                await self._checkpoint_store.save(
                    f"rebuild:{projection_name}",
                    events[-1].global_position,
                )

            if total_processed % self._config.progress_log_interval == 0:
                print(
                    f"Rebuild progress: {total_processed} events processed, "
                    f"position: {position}"
                )

        print(f"Rebuild complete: {total_processed} total events processed")

Key Principles for Projection Rebuilding

  1. Idempotency: Projections must be idempotent. Processing the same event twice must produce the same result. Use UPSERT or INSERT ... ON CONFLICT rather than blind inserts.
  2. Checkpointing: Store the last processed global position so rebuilds can resume after failure.
  3. Blue-Green projections: Build the new projection into a separate table or database, then swap atomically. This avoids downtime during rebuilds.
  4. Backpressure: If the projection writes to an external system (Elasticsearch, Redis), implement rate limiting to avoid overwhelming it during replay.
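For principle 4, a token bucket in front of the external writer is usually enough. A minimal sketch (the rates are illustrative; `handle_batch` would call `acquire(len(batch))` before each write):

```python
import time

class ReplayRateLimiter:
    """Token bucket used to throttle projection writes during a replay,
    so an external store (Elasticsearch, Redis) is not overwhelmed."""

    def __init__(self, rate_per_sec: float, burst: int):
        self._rate = rate_per_sec
        self._capacity = float(burst)
        self._tokens = float(burst)
        self._last = time.monotonic()

    def acquire(self, n: int = 1) -> None:
        """Block until n tokens are available, then consume them."""
        while True:
            now = time.monotonic()
            self._tokens = min(self._capacity,
                               self._tokens + (now - self._last) * self._rate)
            self._last = now
            if self._tokens >= n:
                self._tokens -= n
                return
            time.sleep((n - self._tokens) / self._rate)
```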

Event Store Technology Comparison

Choosing the right event store is a critical architectural decision. The following table compares four commonly used options across key dimensions.

| Dimension              | EventStoreDB                          | Apache Kafka                                     | PostgreSQL                       | DynamoDB                           |
|------------------------|---------------------------------------|--------------------------------------------------|----------------------------------|------------------------------------|
| Primary purpose        | Purpose-built event store             | Distributed streaming platform                   | General-purpose RDBMS            | Managed NoSQL (AWS)                |
| Stream-level reads     | Native (indexed by stream ID)         | Requires consumer filtering by key               | Custom query on stream_id column | Partition key query                |
| Global ordering        | Built-in global position              | Per-partition only, no global order              | BIGSERIAL global position column | No native global order             |
| Optimistic concurrency | Native expected version               | Not supported                                    | Custom version column check      | Condition expressions              |
| Built-in projections   | Yes (JavaScript-based)                | No (use Kafka Streams or ksqlDB)                 | No (custom code)                 | No (use DynamoDB Streams + Lambda) |
| Max throughput         | ~15,000 writes/sec per node           | 100,000+ writes/sec per cluster                  | 10,000-50,000 writes/sec (tuned) | Virtually unlimited (on-demand)    |
| Subscription model     | Catch-up and persistent subscriptions | Consumer groups with offset tracking             | Polling or LISTEN/NOTIFY         | DynamoDB Streams (CDC)             |
| Operational complexity | Medium (dedicated cluster)            | High (ZooKeeper/KRaft, brokers, schema registry) | Low (existing infrastructure)    | Low (fully managed)                |
| Data retention         | Unlimited (append-only)               | Configurable (retention policy)                  | Unlimited (manual management)    | Configurable (TTL)                 |
| GDPR deletion          | Crypto-shredding or stream deletion   | Topic compaction with tombstones                 | SQL DELETE (breaks immutability) | TTL or conditional delete          |
| Best for               | Dedicated ES systems                  | High-throughput event streaming                  | Teams with existing PostgreSQL   | AWS-native serverless              |

When to Use What

  • EventStoreDB: You are building a dedicated event-sourced system and want native support for streams, subscriptions, and projections out of the box. Your write throughput is under 15,000 events per second.
  • Apache Kafka: Your primary need is high-throughput event streaming across microservices. You are willing to build event sourcing primitives (stream reads, concurrency control) on top of Kafka yourself or with frameworks.
  • PostgreSQL: You already run PostgreSQL and want to start with event sourcing without introducing new infrastructure. You need strong transactional guarantees and can implement stream reads with standard SQL.
  • DynamoDB: You are on AWS and want a fully managed, serverless event store. DynamoDB Streams provides CDC for projections, and the pay-per-request pricing model works well for variable workloads.
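Whichever store you choose, the expected-version contract compared in the table is the same. An in-memory sketch of that contract (for illustration only, not a production store):

```python
class ConcurrencyError(Exception):
    """Raised when the stream has moved since the caller last read it."""

class InMemoryEventStore:
    """Minimal sketch of optimistic concurrency via an expected version.
    Real stores enforce the same check with version columns (PostgreSQL),
    condition expressions (DynamoDB), or native support (EventStoreDB)."""

    def __init__(self):
        self._streams: dict[str, list] = {}

    def append(self, stream_id: str, events: list,
               expected_version: int) -> int:
        stream = self._streams.setdefault(stream_id, [])
        if len(stream) != expected_version:
            raise ConcurrencyError(
                f"expected version {expected_version}, stream is at {len(stream)}"
            )
        stream.extend(events)
        return len(stream)  # the new stream version

    def read_stream(self, stream_id: str, from_version: int = 0) -> list:
        return self._streams.get(stream_id, [])[from_version:]
```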

Production Anti-Patterns

Anti-Pattern 1: Property Sourcing

Storing events like NameChanged, EmailChanged, AddressChanged instead of domain-meaningful events like CustomerRelocated or CustomerContactUpdated. Property-sourced events carry no business intent and make it impossible to understand why a change occurred.

// BAD: Property sourcing - no business intent
const badEvents = [
  { type: 'FieldUpdated', field: 'status', value: 'suspended' },
  { type: 'FieldUpdated', field: 'reason', value: 'payment_failed' },
]

// GOOD: Domain events with business meaning
const goodEvents = [
  {
    type: 'AccountSuspendedDueToPaymentFailure',
    accountId: 'ACC-123',
    failedPaymentId: 'PAY-456',
    suspendedAt: '2026-03-07T10:30:00Z',
    retryScheduledAt: '2026-03-10T10:30:00Z',
  },
]

Anti-Pattern 2: Using Event Sourcing as Inter-Service Communication

Event Sourcing is a local decision within a single bounded context. Internal domain events should not be directly published to other services. Instead, publish integration events on a message bus, mapping internal events to a public contract.

Anti-Pattern 3: Large Events (Fat Events)

Storing the entire aggregate state in every event defeats the purpose of event sourcing. Events should contain only the delta -- what changed and why.

Anti-Pattern 4: Missing Idempotency in Event Handlers

When a projection handler crashes mid-batch, events will be redelivered. Without idempotency, you get duplicated data in your read models.

-- BAD: Blind insert (duplicates on redelivery)
INSERT INTO order_summary (order_id, status, total)
VALUES ('ORD-001', 'placed', 150.00);

-- GOOD: Idempotent upsert
INSERT INTO order_summary (order_id, status, total, last_event_position)
VALUES ('ORD-001', 'placed', 150.00, 42)
ON CONFLICT (order_id) DO UPDATE
SET status = EXCLUDED.status,
    total = EXCLUDED.total,
    last_event_position = EXCLUDED.last_event_position
WHERE order_summary.last_event_position < EXCLUDED.last_event_position;

Anti-Pattern 5: No Correlation or Causation IDs

Without correlation IDs, debugging production issues across event streams is nearly impossible. Every event should carry correlationId (the originating request) and causationId (the event that directly caused this event).

Real-World Failure Case Studies

Case 1: Projection Lag Causing Stale Reads

Scenario: An e-commerce platform experienced checkout failures because the inventory projection lagged behind the event store by 30 seconds during peak traffic. Customers saw items as "in stock" in the read model, but the write model rejected orders because inventory was already depleted.

Root cause: The projection handler processed events sequentially with no parallelism. During flash sales, the event write rate exceeded projection processing speed.

Recovery:

  1. Implement partitioned projection processing -- parallelize by aggregate ID.
  2. Add a "freshness" indicator on read model queries returning the projection lag.
  3. For critical paths like checkout, read directly from the event stream (stronger consistency) rather than the projection.

Case 2: Snapshot Corruption After Aggregate Refactoring

Scenario: A team refactored the Order aggregate, splitting shippingInfo into separate address and carrier fields. After deployment, all orders with existing snapshots failed to load because deserialization threw errors on the old snapshot format.

Recovery:

  1. Invalidate all existing snapshots for the affected aggregate type.
  2. Deploy with snapshot loading wrapped in a try-catch that falls back to full replay on deserialization failure.
  3. Implement snapshot versioning with migration logic.

Case 3: Event Store Disk Exhaustion

Scenario: A high-volume IoT platform stored raw sensor readings as events. With 50,000 sensors reporting every second, the event store grew by 200 GB per day. After three months, the EventStoreDB cluster ran out of disk space during a weekend, causing a full outage.

Recovery:

  1. Archive old event streams to cold storage (S3) using a scheduled job.
  2. Use stream truncation for streams beyond the retention window.
  3. Reconsider whether raw sensor readings belong in the event store at all -- a time-series database may be more appropriate for high-frequency telemetry.

Operational Checklist

Before going to production with Event Sourcing, verify the following.

  • Event schema registry with version tracking is in place.
  • Upcasters are registered and tested for all historical event versions.
  • Snapshot strategy is defined with versioning and retention policies.
  • Projection rebuilding can be triggered without downtime (blue-green deployment).
  • All event handlers are idempotent with deduplication checks.
  • Correlation and causation IDs are present on every event.
  • Event store monitoring includes: append latency, subscription lag, disk usage, stream count.
  • Disaster recovery plan includes: event store backup/restore procedure, projection rebuild runbook.
  • GDPR compliance strategy is documented (crypto-shredding or stream deletion).

Architecture Diagram: Production Event Sourcing System

+------------------+     +-----------------+     +------------------+
|   API Gateway    |---->| Command Handler |---->| Domain Aggregate |
+------------------+     +-----------------+     +--------+---------+
                                                          |
                              +---------------------------+
                              | Domain Events
                              v
                    +--------------------+
                    |    Event Store     |
                    | (append-only log)  |
                    +--------+-----------+
                             |
              +--------------+--------------+
              |              |              |
              v              v              v
     +--------+---+  +------+------+  +----+--------+
     | Projection |  | Projection  |  | Projection  |
     | Handler A  |  | Handler B   |  | Handler C   |
     +--------+---+  +------+------+  +----+--------+
              |              |              |
              v              v              v
     +--------+---+  +------+------+  +----+--------+
      | PostgreSQL |  |Elasticsearch|  |    Redis    |
      | Read Model |  |Search Index |  |   Cache     |
     +------------+  +-------------+  +-------------+
Event Schema Evolution Pipeline:

  Event Store (raw bytes)
       |
       v
  +----------+     +-----------+     +-----------+
  | Upcaster |---->| Upcaster  |---->| Upcaster  |
  |  V1->V2  |     |  V2->V3   |     |  V3->V4   |
  +----------+     +-----------+     +-----------+
                                          |
                                          v
                                  +---------------+
                                  | Current Event |
                                  |  (Version 4)  |
                                  +---------------+
                                          |
                              +-----------+-----------+
                              |                       |
                              v                       v
                     +--------+------+      +---------+-----+
                     | Command Side  |      | Projection    |
                     | (Aggregate)   |      | (Read Model)  |
                     +---------------+      +---------------+

Conclusion

Event Sourcing delivers tremendous value in systems that require full audit trails, temporal queries, and independent scalability of reads and writes. But the operational complexity is real. Schema evolution, snapshotting, projection management, and event store scaling are not optional concerns -- they are the core challenges that determine whether your event-sourced system thrives or becomes an operational burden.

The key takeaways are: use upcasting as your primary schema evolution strategy, snapshot only when measured performance demands it, design projections for idempotent replay, and choose your event store technology based on your actual throughput requirements and operational capabilities.
