Event Sourcing and CQRS in 2026: Production Implementation Patterns

Event Sourcing and CQRS solve real problems in complex systems, but they introduce complexity that must be justified by the problem you're solving.

3/10/2026 · 9 min read · Cloud

Last updated: 3/10/2026

Executive summary

Event Sourcing and Command Query Responsibility Segregation (CQRS) are architectural patterns that solve specific problems in complex systems: auditability, temporal queries, and high read/write throughput. They are not silver bullets; they introduce significant operational complexity that must be justified by actual requirements.

By 2026, these patterns have matured from academic concepts into production-ready implementations with established tooling, conventions, and operational practices. The decision to adopt them should rest on concrete needs such as event replay, audit trails, or separate read and write models.

When Event Sourcing and CQRS are justified

Problem symptoms that indicate these patterns might help

| Symptom | Traditional Approach | Event Sourcing/CQRS Solution |
| --- | --- | --- |
| Audit trail missing | Manual logging, incomplete history | Immutable event log provides complete audit |
| Unable to replay | Current state only | Event stream enables temporal replay |
| Read/write performance | Same model for both | Separate models optimize each workload |
| Complex queries | Joins across multiple tables | Projections pre-compute read models |
| Eventual consistency | Synchronous transactions | Asynchronous event processing acceptable |
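
To make the "temporal replay" row concrete, here is a minimal sketch of the underlying idea: current state is a fold over the event stream, so the state at any past moment is the same fold over the events recorded up to that moment. The `AccountEvent` type and the `balanceAt` function are hypothetical names used only for illustration and are not part of the order model used later in this article.

```typescript
// Hypothetical event type for illustration only
type AccountEvent =
  | { type: 'Deposited'; amount: number; at: Date }
  | { type: 'Withdrawn'; amount: number; at: Date };

// Apply a single event to the running balance
function applyAccountEvent(balance: number, event: AccountEvent): number {
  switch (event.type) {
    case 'Deposited':
      return balance + event.amount;
    case 'Withdrawn':
      return balance - event.amount;
  }
}

// State as of a point in time is a fold over the events recorded up to then
function balanceAt(events: AccountEvent[], asOf: Date): number {
  return events
    .filter(event => event.at.getTime() <= asOf.getTime())
    .reduce(applyAccountEvent, 0);
}

const history: AccountEvent[] = [
  { type: 'Deposited', amount: 100, at: new Date('2026-01-01T00:00:00Z') },
  { type: 'Withdrawn', amount: 30, at: new Date('2026-02-01T00:00:00Z') },
  { type: 'Deposited', amount: 50, at: new Date('2026-03-01T00:00:00Z') },
];

const balanceMidFebruary = balanceAt(history, new Date('2026-02-15T00:00:00Z')); // 70
const balanceNow = balanceAt(history, new Date('2026-03-10T00:00:00Z')); // 120
```

A system that stores only the current balance cannot answer the mid-February question without a separate history table; with an event log the answer falls out of the same code path that computes current state.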

When NOT to use these patterns

  • Simple CRUD applications with straightforward queries
  • Systems where strong consistency is mandatory
  • Teams without capacity for architectural complexity
  • Low-volume workloads where optimization isn't needed
  • Regulatory environments requiring immediate consistency

Event Store design

Core event store contract

typescript
// Event store interface
interface EventStore {
  appendEvents(
    streamId: string,
    expectedVersion: number | null, // null skips the optimistic concurrency check
    events: DomainEvent[]
  ): Promise<void>;

  readEvents(
    streamId: string,
    fromVersion?: number,
    toVersion?: number
  ): Promise<DomainEvent[]>;

  readAllEvents(
    fromTimestamp?: Date,
    toTimestamp?: Date
  ): Promise<DomainEvent[]>;

  subscribeToStream(
    streamId: string,
    handler: EventHandler
  ): Promise<void>;

  subscribeToAll(
    handler: EventHandler
  ): Promise<void>;
}

// Domain event contract
interface DomainEvent {
  eventId: string;
  streamId: string;
  eventType: string;
  data: any;
  metadata: EventMetadata;
  version: number;
  timestamp: Date;
}

interface EventMetadata {
  causationId?: string;
  correlationId?: string;
  userId?: string;
  source: string;
}
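
The `expectedVersion` parameter is what gives the contract optimistic concurrency: an append succeeds only if the stream is still at the version the writer last saw. The sketch below shows that rule in isolation using a simplified, synchronous, in-memory store. `InMemoryEventStore`, `StoredEvent`, and `VersionConflictError` are hypothetical names for this illustration; this is not a full implementation of the `EventStore` interface.

```typescript
// Simplified event shape for this sketch (illustrative only)
interface StoredEvent {
  streamId: string;
  version: number;
  eventType: string;
  data: unknown;
}

class VersionConflictError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'VersionConflictError';
  }
}

class InMemoryEventStore {
  private streams = new Map<string, StoredEvent[]>();

  append(
    streamId: string,
    expectedVersion: number,
    events: Array<{ eventType: string; data: unknown }>
  ): void {
    const stream = this.streams.get(streamId) ?? [];
    // The current version equals the number of events already in the stream
    if (stream.length !== expectedVersion) {
      throw new VersionConflictError(
        `Expected version ${expectedVersion} but found ${stream.length}`
      );
    }
    const appended: StoredEvent[] = events.map((event, i) => ({
      streamId,
      version: expectedVersion + i + 1,
      eventType: event.eventType,
      data: event.data,
    }));
    this.streams.set(streamId, [...stream, ...appended]);
  }

  read(streamId: string, fromVersion = 1): StoredEvent[] {
    return (this.streams.get(streamId) ?? []).filter(e => e.version >= fromVersion);
  }
}

const store = new InMemoryEventStore();
store.append('order-1', 0, [{ eventType: 'OrderCreated', data: {} }]);

// A second writer that still believes the stream is empty is rejected
let conflictDetected = false;
try {
  store.append('order-1', 0, [{ eventType: 'ItemAdded', data: {} }]);
} catch (error) {
  conflictDetected = error instanceof Error && error.name === 'VersionConflictError';
}
```

The rejected writer would normally reload the stream, re-run its command against the fresh state, and try again.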

Event store implementation

typescript
// PostgreSQL-based event store
import { Pool } from 'pg';

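// Note: subscribeToStream and subscribeToAll are omitted from this listing; a
// complete implementation must provide them to satisfy the EventStore interface.
class PostgreSQLEventStore implements EventStore {
  constructor(private pool: Pool) {}

  // Call once at startup and await it. A constructor cannot await async work,
  // so starting this from the constructor would hide schema errors and allow
  // queries to run before the tables exist.
  async initializeSchema(): Promise<void> {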
    await this.pool.query(`
      CREATE TABLE IF NOT EXISTS events (
        id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
        stream_id VARCHAR(255) NOT NULL,
        version INTEGER NOT NULL,
        event_type VARCHAR(255) NOT NULL,
        data JSONB NOT NULL,
        metadata JSONB,
        causation_id UUID,
        correlation_id UUID,
        user_id UUID,
        source VARCHAR(255),
        timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),

        UNIQUE(stream_id, version)
      );

      -- UNIQUE(stream_id, version) above already creates an index on (stream_id, version)
      CREATE INDEX IF NOT EXISTS idx_events_timestamp ON events(timestamp);
      CREATE INDEX IF NOT EXISTS idx_events_event_type ON events(event_type);
      CREATE INDEX IF NOT EXISTS idx_events_correlation_id ON events(correlation_id);
      CREATE INDEX IF NOT EXISTS idx_events_causation_id ON events(causation_id);

      CREATE TABLE IF NOT EXISTS projections (
        id VARCHAR(255) PRIMARY KEY,
        name VARCHAR(255) NOT NULL,
        last_processed_event_id UUID NOT NULL,
        last_processed_timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
        UNIQUE(name)
      );
    `);
  }

  async appendEvents(
    streamId: string,
    expectedVersion: number,
    events: DomainEvent[]
  ): Promise<void> {
    const client = await this.pool.connect();

    try {
      await client.query('BEGIN');

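      // Check expected version. Two concurrent writers can both pass this check;
      // the UNIQUE(stream_id, version) constraint then rejects the later insert.
      const versionResult = await client.query(
        'SELECT MAX(version) as version FROM events WHERE stream_id = $1',
        [streamId]
      );

      // MAX() returns NULL for an empty stream, so a new stream starts at version 0
      const currentVersion = versionResult.rows[0]?.version ?? 0;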

      if (expectedVersion !== null && currentVersion !== expectedVersion) {
        throw new ConcurrencyError(
          `Expected version ${expectedVersion} but found ${currentVersion} for stream ${streamId}`
        );
      }

      // Insert events
      for (let i = 0; i < events.length; i++) {
        const event = events[i];
        const version = currentVersion + i + 1;

        await client.query(
          `INSERT INTO events (
            stream_id, version, event_type, data, metadata,
            causation_id, correlation_id, user_id, source, timestamp
          ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`,
          [
            streamId,
            version,
            event.eventType,
            JSON.stringify(event.data),
            JSON.stringify(event.metadata),
            event.metadata.causationId,
            event.metadata.correlationId,
            event.metadata.userId,
            event.metadata.source,
            event.timestamp
          ]
        );
      }

      await client.query('COMMIT');
    } catch (error) {
      await client.query('ROLLBACK');
      throw error;
    } finally {
      client.release();
    }
  }

  async readEvents(
    streamId: string,
    fromVersion?: number,
    toVersion?: number
  ): Promise<DomainEvent[]> {
    let query = 'SELECT * FROM events WHERE stream_id = $1';
    const params: any[] = [streamId];
    let paramIndex = 2;

    if (fromVersion !== undefined) {
      query += ` AND version >= $${paramIndex}`;
      params.push(fromVersion);
      paramIndex++;
    }

    if (toVersion !== undefined) {
      query += ` AND version <= $${paramIndex}`;
      params.push(toVersion);
      paramIndex++;
    }

    query += ' ORDER BY version ASC';

    const result = await this.pool.query(query, params);

    return result.rows.map(row => this.mapRowToEvent(row));
  }

  async readAllEvents(
    fromTimestamp?: Date,
    toTimestamp?: Date
  ): Promise<DomainEvent[]> {
    let query = 'SELECT * FROM events';
    const params: any[] = [];
    let paramIndex = 1;

    if (fromTimestamp !== undefined) {
      query += ` WHERE timestamp >= $${paramIndex}`;
      params.push(fromTimestamp);
      paramIndex++;
    }

    if (toTimestamp !== undefined) {
      query += (fromTimestamp !== undefined ? ' AND' : ' WHERE');
      query += ` timestamp <= $${paramIndex}`;
      params.push(toTimestamp);
      paramIndex++;
    }

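    // Tie-break on stream and version so events with equal timestamps come back
    // in a deterministic order
    query += ' ORDER BY timestamp ASC, stream_id ASC, version ASC';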

    const result = await this.pool.query(query, params);

    return result.rows.map(row => this.mapRowToEvent(row));
  }

  private mapRowToEvent(row: any): DomainEvent {
    return {
      eventId: row.id,
      streamId: row.stream_id,
      eventType: row.event_type,
      data: row.data,
      metadata: {
        causationId: row.causation_id,
        correlationId: row.correlation_id,
        userId: row.user_id,
        source: row.source
      },
      version: row.version,
      timestamp: row.timestamp
    };
  }
}
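
One gap in the listing above is worth spelling out. When two writers race, the loser fails on the `UNIQUE(stream_id, version)` constraint, and PostgreSQL reports that as SQLSTATE `23505` (unique_violation), which node-postgres exposes on the error's `code` property. The sketch below shows one way to translate that into the `ConcurrencyError` the article throws. `ConcurrencyError` is referenced but never defined in the article, so the class shape here is an assumption, and `isUniqueViolation` and `toAppendError` are hypothetical helper names.

```typescript
// One possible shape for the ConcurrencyError used by appendEvents (assumption)
class ConcurrencyError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'ConcurrencyError';
  }
}

// SQLSTATE 23505 is PostgreSQL's unique_violation error code
const UNIQUE_VIOLATION = '23505';

function isUniqueViolation(error: unknown): boolean {
  return (
    typeof error === 'object' &&
    error !== null &&
    (error as { code?: unknown }).code === UNIQUE_VIOLATION
  );
}

// Translate a lost race on UNIQUE(stream_id, version) into a domain-level conflict;
// every other error passes through unchanged
function toAppendError(error: unknown, streamId: string): unknown {
  if (isUniqueViolation(error)) {
    return new ConcurrencyError(`Concurrent append detected for stream ${streamId}`);
  }
  return error;
}

// Inside the catch block of appendEvents this would read:
//   throw toAppendError(error, streamId);
const translated = toAppendError({ code: '23505' }, 'order-1');
const untouched = toAppendError(new Error('connection lost'), 'order-1');
```

With this in place, callers can treat both failure paths (the explicit version check and the constraint violation) as the same retryable condition.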

Aggregate design

Aggregate root pattern

typescript
// Aggregate base class
abstract class AggregateRoot {
  protected _id: string;
  protected _version: number; // protected so subclasses can compute the next event version
  private _events: DomainEvent[] = [];

  constructor(id: string, version: number = 0) {
    this._id = id;
    this._version = version;
  }

  get id(): string {
    return this._id;
  }

  get version(): number {
    return this._version;
  }

  protected applyEvent(event: DomainEvent): void {
    this._events.push(event);
    this._version++;
    this.apply(event);
  }

  abstract apply(event: DomainEvent): void;

  getUncommittedEvents(): DomainEvent[] {
    return [...this._events];
  }

  markEventsAsCommitted(): void {
    this._events = [];
  }

  static loadFromHistory<T extends AggregateRoot>(
    AggregateClass: new (id: string, version: number) => T,
    events: DomainEvent[]
  ): T {
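    if (events.length === 0) {
      throw new Error('Cannot load an aggregate from an empty event history');
    }

    const aggregate = new AggregateClass(events[0].streamId, events[0].version - 1);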

    for (const event of events) {
      aggregate.apply(event);
      aggregate._version = event.version;
    }

    return aggregate;
  }
}

// Order aggregate example
class Order extends AggregateRoot {
  private status: OrderStatus;
  private items: OrderItem[];
  private customerId: string;
  private totalAmount: number;
  private createdAt: Date;
  private updatedAt: Date;

  constructor(id: string, version: number = 0) {
    super(id, version);
    this.status = OrderStatus.CREATED;
    this.items = [];
    this.totalAmount = 0;
    this.createdAt = new Date();
    this.updatedAt = new Date();
  }

  apply(event: DomainEvent): void {
    switch (event.eventType) {
      case 'OrderCreated':
        this.applyOrderCreated(event);
        break;
      case 'ItemAdded':
        this.applyItemAdded(event);
        break;
      case 'OrderConfirmed':
        this.applyOrderConfirmed(event);
        break;
      // OrderCancelled, PaymentReceived, and OrderShipped follow the same
      // pattern; their apply methods are omitted from this listing.
      default:
        // Unknown event types are ignored
        break;
    }
  }

  // Command methods
  createOrder(customerId: string, items: OrderItem[]): void {
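    // A freshly constructed order also has status CREATED, so the status cannot
    // tell a new aggregate from an existing one; the version can.
    if (this.version !== 0) {
      throw new Error('Order already exists');
    }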

    const totalAmount = items.reduce((sum, item) => sum + item.price * item.quantity, 0);

    this.applyEvent({
      eventId: this.generateEventId(),
      streamId: this._id,
      eventType: 'OrderCreated',
      data: {
        customerId,
        items,
        totalAmount
      },
      metadata: this.createMetadata(),
      version: this._version + 1,
      timestamp: new Date()
    });
  }

  addItem(item: OrderItem): void {
    if (this.status !== OrderStatus.CREATED && this.status !== OrderStatus.CONFIRMED) {
      throw new Error('Cannot add item to order in this status');
    }

    this.applyEvent({
      eventId: this.generateEventId(),
      streamId: this._id,
      eventType: 'ItemAdded',
      data: { item },
      metadata: this.createMetadata(),
      version: this._version + 1,
      timestamp: new Date()
    });
  }

  confirmOrder(): void {
    if (this.status !== OrderStatus.CREATED) {
      throw new Error('Order must be in CREATED status to confirm');
    }

    if (this.items.length === 0) {
      throw new Error('Cannot confirm empty order');
    }

    this.applyEvent({
      eventId: this.generateEventId(),
      streamId: this._id,
      eventType: 'OrderConfirmed',
      data: {},
      metadata: this.createMetadata(),
      version: this._version + 1,
      timestamp: new Date()
    });
  }

  // Event application methods
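  private applyOrderCreated(event: DomainEvent): void {
    const data = event.data as OrderCreatedData;
    this.customerId = data.customerId;
    this.items = [...data.items]; // copy so later changes do not alter the event payload
    this.totalAmount = data.totalAmount;
    this.status = OrderStatus.CREATED;
    // Take timestamps from the event so that replay is deterministic
    this.createdAt = event.timestamp;
    this.updatedAt = event.timestamp;
  }

  private applyItemAdded(event: DomainEvent): void {
    const data = event.data as ItemAddedData;
    this.items.push(data.item);
    this.totalAmount += data.item.price * data.item.quantity;
    this.updatedAt = event.timestamp;
  }

  private applyOrderConfirmed(event: DomainEvent): void {
    this.status = OrderStatus.CONFIRMED;
    // Use the event time, not the replay time
    this.updatedAt = event.timestamp;
  }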

  private generateEventId(): string {
    // The events table uses UUID columns. crypto.randomUUID() is available as a
    // global in Node.js 20+ and in browsers.
    return crypto.randomUUID();
  }

  private createMetadata(): EventMetadata {
    return {
      source: 'order-service',
      correlationId: this.generateCorrelationId()
    };
  }

  private generateCorrelationId(): string {
    // correlation_id is a UUID column, so a "corr-..." string would be rejected
    return crypto.randomUUID();
  }
}
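
The aggregate above never shows how it is saved, and one detail is easy to get wrong. Because `applyEvent` increments the version for every new event, the `expectedVersion` passed to the store is not the aggregate's current version but the version it had when it was loaded. A minimal sketch, assuming structural interfaces that mirror the methods shown above (`VersionedAggregate`, `AppendOnlyStore`, `expectedVersionFor`, and `saveAggregate` are hypothetical names):

```typescript
// Minimal structural view of the aggregate API shown above
interface VersionedAggregate {
  readonly id: string;
  readonly version: number;
  getUncommittedEvents(): unknown[];
  markEventsAsCommitted(): void;
}

// Minimal structural view of the write side of the event store
interface AppendOnlyStore {
  appendEvents(streamId: string, expectedVersion: number, events: unknown[]): Promise<void>;
}

// The version the stream had when the aggregate was loaded, which is what the
// store should still contain if nobody else has written in the meantime
function expectedVersionFor(aggregate: VersionedAggregate): number {
  return aggregate.version - aggregate.getUncommittedEvents().length;
}

async function saveAggregate(
  store: AppendOnlyStore,
  aggregate: VersionedAggregate
): Promise<void> {
  const events = aggregate.getUncommittedEvents();
  if (events.length === 0) {
    return; // nothing to persist
  }
  await store.appendEvents(aggregate.id, expectedVersionFor(aggregate), events);
  // Clear the buffer only after the append has succeeded
  aggregate.markEventsAsCommitted();
}

// Example: loaded at version 3, then two commands each produced one event
const loadedThenChanged: VersionedAggregate = {
  id: 'order-1',
  version: 5,
  getUncommittedEvents: () => [{}, {}],
  markEventsAsCommitted: () => {},
};

const expected = expectedVersionFor(loadedThenChanged); // 3
```

Clearing the uncommitted buffer only after a successful append means a failed save can be retried without losing events.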

Projection patterns

Read model projections

typescript
// Projection handler interface
interface ProjectionHandler<T> {
  handle(event: DomainEvent): Promise<void>;
  getProjection(id: string): Promise<T | null>;
  rebuildProjection(): Promise<void>;
}

// Order summary projection
class OrderSummaryProjection implements ProjectionHandler<OrderSummary> {
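  // rebuildProjection() reads from the event store, so it is injected as well
  constructor(
    private db: Database,
    private eventStore: EventStore
  ) {}

  // Call once at startup and await it; a constructor cannot await async work
  async initializeSchema(): Promise<void> {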
    await this.db.query(`
      CREATE TABLE IF NOT EXISTS order_summaries (
        order_id VARCHAR(255) PRIMARY KEY,
        customer_id VARCHAR(255) NOT NULL,
        status VARCHAR(50) NOT NULL,
        total_amount DECIMAL(10, 2) NOT NULL,
        item_count INTEGER NOT NULL,
        created_at TIMESTAMP WITH TIME ZONE NOT NULL,
        updated_at TIMESTAMP WITH TIME ZONE NOT NULL
      );

      CREATE INDEX IF NOT EXISTS idx_order_summaries_customer_id ON order_summaries(customer_id);
      CREATE INDEX IF NOT EXISTS idx_order_summaries_status ON order_summaries(status);
    `);
  }

  async handle(event: DomainEvent): Promise<void> {
    switch (event.eventType) {
      case 'OrderCreated':
        await this.handleOrderCreated(event);
        break;
      case 'ItemAdded':
        await this.handleItemAdded(event);
        break;
      case 'OrderConfirmed':
        await this.handleOrderConfirmed(event);
        break;
      // OrderCancelled, PaymentReceived, and OrderShipped follow the same
      // pattern; their handlers are omitted from this listing.
      default:
        break;
    }
  }

  private async handleOrderCreated(event: DomainEvent): Promise<void> {
    const data = event.data as OrderCreatedData;

    await this.db.query(
      `INSERT INTO order_summaries (
        order_id, customer_id, status, total_amount, item_count, created_at, updated_at
      ) VALUES ($1, $2, $3, $4, $5, $6, $7)`,
      [
        event.streamId,
        data.customerId,
        OrderStatus.CREATED,
        data.totalAmount,
        data.items.length,
        event.timestamp,
        event.timestamp
      ]
    );
  }

  private async handleItemAdded(event: DomainEvent): Promise<void> {
    const data = event.data as ItemAddedData;

    await this.db.query(
      `UPDATE order_summaries
       SET total_amount = total_amount + $1,
           item_count = item_count + 1,
           updated_at = $2
       WHERE order_id = $3`,
      [
        data.item.price * data.item.quantity,
        event.timestamp,
        event.streamId
      ]
    );
  }

  private async handleOrderConfirmed(event: DomainEvent): Promise<void> {
    await this.db.query(
      `UPDATE order_summaries
       SET status = $1, updated_at = $2
       WHERE order_id = $3`,
      [OrderStatus.CONFIRMED, event.timestamp, event.streamId]
    );
  }

  async getProjection(orderId: string): Promise<OrderSummary | null> {
    const result = await this.db.query(
      'SELECT * FROM order_summaries WHERE order_id = $1',
      [orderId]
    );

    if (result.rows.length === 0) {
      return null;
    }

    return result.rows[0];
  }

  async rebuildProjection(): Promise<void> {
    console.log('Rebuilding order summary projection...');

    await this.db.query('TRUNCATE TABLE order_summaries');

    const allEvents = await this.eventStore.readAllEvents();

    for (const event of allEvents) {
      await this.handle(event);
    }

    console.log('Order summary projection rebuilt');
  }
}
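
The handlers above are not idempotent: if `ItemAdded` is delivered twice, the total is incremented twice. A common remedy is to record a checkpoint per stream and skip anything at or below it. The sketch below shows the rule with an in-memory projection; `IdempotentTotalsProjection` and `ProjectedEvent` are hypothetical names, and the `amount` field is a stand-in payload for this illustration.

```typescript
interface ProjectedEvent {
  streamId: string;
  version: number;
  eventType: string;
  amount: number; // hypothetical payload field, used only in this sketch
}

class IdempotentTotalsProjection {
  private totals = new Map<string, number>();
  private lastVersion = new Map<string, number>();

  // Returns true if the event was applied, false if it was a duplicate
  handle(event: ProjectedEvent): boolean {
    const checkpoint = this.lastVersion.get(event.streamId) ?? 0;
    // Anything at or below the checkpoint has already been applied
    if (event.version <= checkpoint) {
      return false;
    }
    const current = this.totals.get(event.streamId) ?? 0;
    this.totals.set(event.streamId, current + event.amount);
    this.lastVersion.set(event.streamId, event.version);
    return true;
  }

  totalFor(streamId: string): number {
    return this.totals.get(streamId) ?? 0;
  }
}

const projection = new IdempotentTotalsProjection();
const itemAdded: ProjectedEvent = {
  streamId: 'order-1',
  version: 1,
  eventType: 'ItemAdded',
  amount: 25,
};

const firstDelivery = projection.handle(itemAdded); // applied
const secondDelivery = projection.handle(itemAdded); // ignored as a duplicate
const total = projection.totalFor('order-1'); // 25
```

In a SQL-backed projection the read-model update and the checkpoint update would need to happen in the same transaction; otherwise a crash between the two reintroduces the double-apply problem.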

Event versioning and evolution

Schema evolution strategies

typescript
// Event upcasters for versioning
class EventUpcaster {
  private upcasters: Map<string, EventUpcasterFn> = new Map();

  constructor() {
    this.registerUpcasters();
  }

  private registerUpcasters(): void {
    this.upcasters.set('OrderCreated:v1', this.upcastOrderCreatedV1ToV2);
    this.upcasters.set('ItemAdded:v1', this.upcastItemAddedV1ToV2);
  }

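  upcast(event: DomainEvent): DomainEvent {
    // Assumption: writers stamp a numeric schemaVersion into the event metadata,
    // and events without one are treated as v1. DomainEvent.version is the
    // position in the stream, not the schema version, so it cannot be used here.
    const metadata = event.metadata as EventMetadata & { schemaVersion?: number };
    const schemaVersion = metadata.schemaVersion ?? 1;
    const upcaster = this.upcasters.get(`${event.eventType}:v${schemaVersion}`);

    // Events already at the latest schema have no registered upcaster
    return upcaster ? upcaster(event) : event;
  }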

  private upcastOrderCreatedV1ToV2(event: DomainEvent): DomainEvent {
    // V1 event structure
    const v1Data = event.data as OrderCreatedV1Data;

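    // V2 event structure: adds currency and paymentMethod. The defaults assume
    // that orders recorded before V2 were USD credit-card orders.
    const v2Data: OrderCreatedV2Data = {
      ...v1Data,
      currency: 'USD', // Default for events recorded before the field existed
      paymentMethod: 'CREDIT_CARD' // Default for events recorded before the field existed
    };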

    return {
      ...event,
      eventType: 'OrderCreated',
      version: event.version,
      data: v2Data
    };
  }

  private upcastItemAddedV1ToV2(event: DomainEvent): DomainEvent {
    const v1Data = event.data as ItemAddedV1Data;

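    // V2 adds discount and taxAmount. The flat 10% tax rate is a placeholder;
    // derive the real value from the rules in force when the event was recorded.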
    const v2Data: ItemAddedV2Data = {
      ...v1Data,
      discount: 0,
      taxAmount: v1Data.item.price * v1Data.item.quantity * 0.1
    };

    return {
      ...event,
      eventType: 'ItemAdded',
      version: event.version,
      data: v2Data
    };
  }
}

// Event store with versioning
class VersionedEventStore implements EventStore {
  private baseEventStore: EventStore;
  private upcaster: EventUpcaster;

  constructor(baseEventStore: EventStore) {
    this.baseEventStore = baseEventStore;
    this.upcaster = new EventUpcaster();
  }

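  async readEvents(
    streamId: string,
    fromVersion?: number,
    toVersion?: number
  ): Promise<DomainEvent[]> {
    const events = await this.baseEventStore.readEvents(
      streamId,
      fromVersion,
      toVersion
    );

    // Apply upcasters to old event versions
    return events.map(event => this.upcaster.upcast(event));
  }

  async readAllEvents(fromTimestamp?: Date, toTimestamp?: Date): Promise<DomainEvent[]> {
    const events = await this.baseEventStore.readAllEvents(fromTimestamp, toTimestamp);
    return events.map(event => this.upcaster.upcast(event));
  }

  // Writes and subscriptions are delegated unchanged; subscribers receive
  // events as stored, without upcasting
  appendEvents(streamId: string, expectedVersion: number, events: DomainEvent[]): Promise<void> {
    return this.baseEventStore.appendEvents(streamId, expectedVersion, events);
  }

  subscribeToStream(streamId: string, handler: EventHandler): Promise<void> {
    return this.baseEventStore.subscribeToStream(streamId, handler);
  }

  subscribeToAll(handler: EventHandler): Promise<void> {
    return this.baseEventStore.subscribeToAll(handler);
  }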
}
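
Once an event type has gone through more than one schema change, a single v1-to-v2 step is no longer enough. A common approach is to register one upcaster per version step and apply them in sequence until no further step exists. The sketch below assumes an explicit `schemaVersion` on each stored event, which the article's `DomainEvent` does not have; `VersionedPayload`, `upcasterRegistry`, `upcastToLatest`, and the v3 `channel` field are all hypothetical.

```typescript
// Hypothetical envelope with an explicit schema version (not a field of DomainEvent)
interface VersionedPayload {
  eventType: string;
  schemaVersion: number;
  data: Record<string, unknown>;
}

type Upcaster = (event: VersionedPayload) => VersionedPayload;

// Each upcaster lifts exactly one schema version; keys are "<eventType>:v<from>"
const upcasterRegistry = new Map<string, Upcaster>([
  [
    'OrderCreated:v1',
    e => ({ ...e, schemaVersion: 2, data: { ...e.data, currency: 'USD' } }),
  ],
  [
    'OrderCreated:v2',
    e => ({ ...e, schemaVersion: 3, data: { ...e.data, channel: 'unknown' } }),
  ],
]);

// Apply upcasters until no further step is registered for the current version
function upcastToLatest(event: VersionedPayload): VersionedPayload {
  let current = event;
  let next = upcasterRegistry.get(`${current.eventType}:v${current.schemaVersion}`);
  while (next) {
    current = next(current);
    next = upcasterRegistry.get(`${current.eventType}:v${current.schemaVersion}`);
  }
  return current;
}

const stored: VersionedPayload = {
  eventType: 'OrderCreated',
  schemaVersion: 1,
  data: { customerId: 'c-1' },
};

// latest.schemaVersion is 3; latest.data has customerId, currency, and channel
const latest = upcastToLatest(stored);
```

Each upcaster returns a new object, so the stored event is never mutated, and adding a v4 later means registering one more step rather than rewriting the existing ones.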

Operational considerations

Event replay strategies

typescript
// Event replay service
class EventReplayService {
  private eventStore: EventStore;
  private projectionHandlers: ProjectionHandler<any>[];

  constructor(
    eventStore: EventStore,
    projectionHandlers: ProjectionHandler<any>[]
  ) {
    this.eventStore = eventStore;
    this.projectionHandlers = projectionHandlers;
  }

  async replayFromTimestamp(timestamp: Date): Promise<void> {
    console.log(`Replaying events from ${timestamp.toISOString()}`);

    const events = await this.eventStore.readAllEvents(timestamp);

    for (const projection of this.projectionHandlers) {
      console.log(`Replaying events for ${projection.constructor.name}`);

      await this.replayForProjection(projection, events);
    }

    console.log('Event replay completed');
  }

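  private async replayForProjection(
    projection: ProjectionHandler<any>,
    events: DomainEvent[]
  ): Promise<void> {
    // Do not call projection.rebuildProjection() here: it truncates the read
    // model and replays the full history itself, so replaying `events`
    // afterwards would apply them a second time. A partial replay is only safe
    // when handlers are idempotent or the read model has first been restored to
    // its state at the start timestamp.
    for (const event of events) {
      try {
        await projection.handle(event);
      } catch (error) {
        console.error(
          `Failed to handle event ${event.eventId} for projection`,
          error
        );
        // Stop at the first failure: later events may depend on this one
        throw error;
      }
    }
  }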

  async replaySingleStream(streamId: string): Promise<void> {
    console.log(`Replaying events for stream ${streamId}`);

    const events = await this.eventStore.readEvents(streamId);

    for (const projection of this.projectionHandlers) {
      for (const event of events) {
        await projection.handle(event);
      }
    }

    console.log('Stream replay completed');
  }
}
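
The replay service above loads every matching event into memory before processing, which stops being practical once the log is large. A minimal sketch of the alternative is to page through the log with a cursor. It assumes the store can return events after a given global position; the article's schema has no such column, so `position`, `ReadPage`, and `replayInBatches` are hypothetical.

```typescript
interface PositionedEvent {
  position: number; // hypothetical global, monotonically increasing sequence number
  eventType: string;
}

// Hypothetical paging function: up to `limit` events with position > afterPosition
type ReadPage = (afterPosition: number, limit: number) => PositionedEvent[];

// Walk the log in fixed-size batches so a replay never holds the full history
// in memory; returns the number of events replayed
function replayInBatches(
  readPage: ReadPage,
  batchSize: number,
  onBatch: (batch: PositionedEvent[]) => void
): number {
  let cursor = 0;
  let total = 0;
  for (;;) {
    const batch = readPage(cursor, batchSize);
    if (batch.length === 0) {
      return total;
    }
    onBatch(batch);
    total += batch.length;
    // Resume after the last event of this batch
    cursor = batch[batch.length - 1].position;
  }
}

// In-memory stand-in for the event table, with positions 1 to 5
const log: PositionedEvent[] = [1, 2, 3, 4, 5].map(position => ({
  position,
  eventType: 'ItemAdded',
}));
const readFromLog: ReadPage = (after, limit) =>
  log.filter(e => e.position > after).slice(0, limit);

const batchSizes: number[] = [];
const replayed = replayInBatches(readFromLog, 2, batch => {
  batchSizes.push(batch.length);
});
// batchSizes is [2, 2, 1] and replayed is 5
```

Persisting the cursor after each batch also makes a long replay resumable after a crash.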

Monitoring and alerting

typescript
// Event store monitoring
class EventStoreMonitor {
  // getLatestEvent, getProjectionState, and alert are helper methods that are
  // omitted from this listing
  constructor(
    private eventStore: EventStore,
    private metrics: MetricsClient,
    private projections: ProjectionHandler<unknown>[]
  ) {}

  async monitorEventProcessing(): Promise<void> {
    // Event throughput
    const hourlyThroughput = await this.calculateHourlyThroughput();
    this.metrics.gauge('event_store_hourly_throughput', hourlyThroughput);

    // Time since the most recent event was written
    const eventLag = await this.calculateEventLag();
    this.metrics.gauge('event_store_lag_seconds', eventLag);

    // Projection lag, labelled by handler class name
    for (const projection of this.projections) {
      const projectionLag = await this.calculateProjectionLag(projection);
      this.metrics.gauge(`projection_${projection.constructor.name}_lag_seconds`, projectionLag);
    }

    // Alert on thresholds
    if (eventLag > 300) { // 5 minutes
      await this.alert('Event store lag exceeds 5 minutes', {
        lag: eventLag
      });
    }
  }

  private async calculateHourlyThroughput(): Promise<number> {
    const oneHourAgo = new Date(Date.now() - 60 * 60 * 1000);
    const events = await this.eventStore.readAllEvents(oneHourAgo);
    return events.length;
  }

  private async calculateEventLag(): Promise<number> {
    const latestEvent = await this.getLatestEvent();
    if (!latestEvent) {
      return 0;
    }

    return Math.floor((Date.now() - latestEvent.timestamp.getTime()) / 1000);
  }

  private async calculateProjectionLag(projection: ProjectionHandler<unknown>): Promise<number> {
    const latestEvent = await this.getLatestEvent();
    // Assumed to return the projection's row from the projections table,
    // including last_processed_timestamp as lastProcessedTimestamp
    const projectionState = await this.getProjectionState(projection);

    if (!latestEvent || !projectionState) {
      return 0;
    }

    // Lag is the gap between the newest stored event and the last event the
    // projection has processed, not the age of the last processed event
    const lagMs =
      latestEvent.timestamp.getTime() -
      projectionState.lastProcessedTimestamp.getTime();

    return Math.max(0, Math.floor(lagMs / 1000));
  }
}
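
The lag arithmetic is easier to test when it is separated from the database and metrics calls. The sketch below isolates it as two pure functions. `projectionLagSeconds` and `classifyLag` are hypothetical names; the 300-second critical threshold mirrors the 5-minute alert above, and the 60-second warning level is an assumption.

```typescript
// Lag in whole seconds between the newest stored event and the projection checkpoint
function projectionLagSeconds(latestEventAt: Date, lastProcessedAt: Date): number {
  const lagMs = latestEventAt.getTime() - lastProcessedAt.getTime();
  // Clamp at zero in case the checkpoint is recorded slightly ahead of the log
  return Math.max(0, Math.floor(lagMs / 1000));
}

type LagSeverity = 'ok' | 'warning' | 'critical';

// Hypothetical thresholds; tune them to the freshness your read models need
function classifyLag(
  lagSeconds: number,
  warnAfter = 60,
  criticalAfter = 300
): LagSeverity {
  if (lagSeconds >= criticalAfter) {
    return 'critical';
  }
  if (lagSeconds >= warnAfter) {
    return 'warning';
  }
  return 'ok';
}

const lag = projectionLagSeconds(
  new Date('2026-03-10T12:05:30Z'),
  new Date('2026-03-10T12:00:00Z')
); // 330

const severity = classifyLag(lag); // 'critical'
```

Measuring against the newest stored event rather than the wall clock keeps a quiet system from reporting lag just because no new events have arrived.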

Decision framework

When to adopt Event Sourcing and CQRS

Adopt if:

  • You need complete audit trails
  • You require event replay capabilities
  • Read and write workloads have different performance requirements
  • You're building complex business domains with rich state transitions
  • You need to integrate with external systems through events

Avoid if:

  • Simple CRUD operations suffice
  • Strong consistency is required
  • Your team lacks capacity for architectural complexity
  • You're in early stages of product development
  • The problem doesn't justify the solution

Conclusion

Event Sourcing and CQRS are powerful patterns for complex systems, but they're architectural commitments with significant operational implications. The decision to adopt them should be based on concrete requirements: audit trails, event replay, or separation of read/write models.

Start with the simplest solution that meets your needs. Implement these patterns incrementally, starting with a single bounded context, and expand only when justified by business requirements and team capacity.

The investment in Event Sourcing and CQRS pays dividends when you actually need what they provide: complete history, temporal queries, and optimized read/write models.

