Event Sourcing and CQRS in 2026: Production Implementation Patterns
Event Sourcing and CQRS solve real problems in complex systems, but they introduce complexity that must be justified by the problem you're solving.
Last updated: 3/10/2026
Executive summary
Event Sourcing and Command Query Responsibility Segregation (CQRS) are architectural patterns that solve specific problems in complex systems: auditability, temporal queries, and high read/write throughput. They are not silver bullets—they introduce significant operational complexity that must be justified by actual requirements.
As of 2026, these patterns have matured from academic concepts into production-ready implementations with established tooling and operational practices. The decision to adopt them should rest on concrete problems: a need for event replay, audit trails, or separate read and write models.
When Event Sourcing and CQRS are justified
Problem symptoms that indicate these patterns might help
| Symptom | Traditional Approach | Event Sourcing/CQRS Solution |
|---|---|---|
| No audit trail | Manual logging, incomplete history | Immutable event log provides a complete audit trail |
| Cannot reconstruct past state | Only current state is stored | Event stream can be replayed to any point in time |
| Reads and writes compete for performance | Same model serves both | Separate models optimized for each workload |
| Complex queries | Joins across multiple tables | Projections pre-compute read models |
| Eventual consistency is acceptable | Synchronous transactions everywhere | Asynchronous event processing |
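To make the replay row concrete, here is a minimal sketch of reconstructing past state. It assumes a hypothetical account stream with `Deposited` and `Withdrawn` events; these names are illustrative and are not part of the order domain used later. State at any past moment is a fold over the events recorded up to that moment.

```typescript
// Hypothetical event shape, for illustration only
type AccountEvent =
  | { type: 'Deposited'; amount: number; at: Date }
  | { type: 'Withdrawn'; amount: number; at: Date };

// Fold every event recorded at or before `asOf` into a balance
function balanceAsOf(events: AccountEvent[], asOf: Date): number {
  return events
    .filter(e => e.at.getTime() <= asOf.getTime())
    .reduce(
      (balance, e) => (e.type === 'Deposited' ? balance + e.amount : balance - e.amount),
      0
    );
}

const accountHistory: AccountEvent[] = [
  { type: 'Deposited', amount: 100, at: new Date('2026-01-01T00:00:00Z') },
  { type: 'Withdrawn', amount: 30, at: new Date('2026-01-02T00:00:00Z') },
  { type: 'Deposited', amount: 50, at: new Date('2026-01-03T00:00:00Z') },
];

// Balance as it stood midway through January 2: 100 - 30 = 70
const balanceOnJan2 = balanceAsOf(accountHistory, new Date('2026-01-02T12:00:00Z'));
// Balance after all three events: 120
const balanceLatest = balanceAsOf(accountHistory, new Date('2026-01-04T00:00:00Z'));
```

A system that stores only the current balance cannot answer the first question without a separate history table; with an event log the answer is a filter and a reduce.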
When NOT to use these patterns
- Simple CRUD applications with straightforward queries
- Systems where strong consistency is mandatory
- Teams without capacity for architectural complexity
- Low-volume workloads where optimization isn't needed
- Regulatory environments requiring immediate consistency
Event Store design
Core event store contract
// Event store interface
interface EventStore {
appendEvents(
streamId: string,
expectedVersion: number | null, // null skips the optimistic concurrency check
events: DomainEvent[]
): Promise<void>;
readEvents(
streamId: string,
fromVersion?: number,
toVersion?: number
): Promise<DomainEvent[]>;
readAllEvents(
fromTimestamp?: Date,
toTimestamp?: Date
): Promise<DomainEvent[]>;
subscribeToStream(
streamId: string,
handler: EventHandler
): Promise<void>;
subscribeToAll(
handler: EventHandler
): Promise<void>;
}
// Domain event contract
interface DomainEvent {
eventId: string;
streamId: string;
eventType: string;
data: any;
metadata: EventMetadata;
version: number;
timestamp: Date;
}
interface EventMetadata {
causationId?: string;
correlationId?: string;
userId?: string;
source: string;
}
Event store implementation
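Before the PostgreSQL version, the optimistic-concurrency rule behind `expectedVersion` can be sketched in memory. This is a simplified illustration, not the article's implementation: `InMemoryEventStore`, `StoredEvent`, and `VersionConflictError` are hypothetical names, and the methods are synchronous for brevity even though the contract above is Promise-based.

```typescript
// Simplified event shape; the article's DomainEvent carries more fields
interface StoredEvent {
  streamId: string;
  version: number;
  eventType: string;
  data: unknown;
}

class VersionConflictError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'VersionConflictError';
  }
}

// Hypothetical in-memory store, useful for unit tests
class InMemoryEventStore {
  private streams = new Map<string, StoredEvent[]>();

  append(
    streamId: string,
    expectedVersion: number,
    events: { eventType: string; data: unknown }[]
  ): void {
    const stream = this.streams.get(streamId) ?? [];
    // Versions start at 1, so the stream length equals the latest version
    const currentVersion = stream.length;
    if (currentVersion !== expectedVersion) {
      throw new VersionConflictError(
        `Expected version ${expectedVersion} but found ${currentVersion} for stream ${streamId}`
      );
    }
    const stored = events.map((e, i) => ({ streamId, version: currentVersion + i + 1, ...e }));
    this.streams.set(streamId, [...stream, ...stored]);
  }

  read(streamId: string, fromVersion = 1): StoredEvent[] {
    return (this.streams.get(streamId) ?? []).filter(e => e.version >= fromVersion);
  }
}

const memoryStore = new InMemoryEventStore();
memoryStore.append('order-1', 0, [{ eventType: 'OrderCreated', data: { customerId: 'c-1' } }]);
memoryStore.append('order-1', 1, [{ eventType: 'OrderConfirmed', data: {} }]);

// A writer that still believes the stream is at version 1 is rejected
let conflictDetected = false;
try {
  memoryStore.append('order-1', 1, [{ eventType: 'OrderCancelled', data: {} }]);
} catch (error) {
  conflictDetected = (error as Error).name === 'VersionConflictError';
}
```

The rejected writer is expected to reload the stream, re-run its command against the new state, and retry. That retry loop is what replaces row locks in this design.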
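// PostgreSQL-based event store
import { Pool } from 'pg';
// Thrown when a stream's current version does not match the caller's expected version
class ConcurrencyError extends Error {}
// Note: subscribeToStream and subscribeToAll are omitted from this excerpt
class PostgreSQLEventStore implements EventStore {
constructor(private pool: Pool) {}
// Call once at startup and await it: a constructor cannot await async work,
// and an unawaited call would hide schema errors
async initializeSchema(): Promise<void> {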
await this.pool.query(`
CREATE TABLE IF NOT EXISTS events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
stream_id VARCHAR(255) NOT NULL,
version INTEGER NOT NULL,
event_type VARCHAR(255) NOT NULL,
data JSONB NOT NULL,
metadata JSONB,
causation_id UUID,
correlation_id UUID,
user_id UUID,
source VARCHAR(255),
timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
UNIQUE(stream_id, version)
);
-- No separate (stream_id, version) index is needed: the UNIQUE constraint above already creates one
CREATE INDEX IF NOT EXISTS idx_events_timestamp ON events(timestamp);
CREATE INDEX IF NOT EXISTS idx_events_event_type ON events(event_type);
CREATE INDEX IF NOT EXISTS idx_events_correlation_id ON events(correlation_id);
CREATE INDEX IF NOT EXISTS idx_events_causation_id ON events(causation_id);
CREATE TABLE IF NOT EXISTS projections (
id VARCHAR(255) PRIMARY KEY,
name VARCHAR(255) NOT NULL,
last_processed_event_id UUID NOT NULL,
last_processed_timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
UNIQUE(name)
);
`);
}
async appendEvents(
streamId: string,
expectedVersion: number | null,
events: DomainEvent[]
): Promise<void> {
const client = await this.pool.connect();
try {
await client.query('BEGIN');
// Check expected version
const versionResult = await client.query(
'SELECT MAX(version) as version FROM events WHERE stream_id = $1',
[streamId]
);
const currentVersion = versionResult.rows[0]?.version || 0;
if (expectedVersion !== null && currentVersion !== expectedVersion) {
throw new ConcurrencyError(
`Expected version ${expectedVersion} but found ${currentVersion} for stream ${streamId}`
);
}
// Insert events
for (let i = 0; i < events.length; i++) {
const event = events[i];
const version = currentVersion + i + 1;
await client.query(
`INSERT INTO events (
stream_id, version, event_type, data, metadata,
causation_id, correlation_id, user_id, source, timestamp
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`,
[
streamId,
version,
event.eventType,
JSON.stringify(event.data),
JSON.stringify(event.metadata),
event.metadata.causationId,
event.metadata.correlationId,
event.metadata.userId,
event.metadata.source,
event.timestamp
]
);
}
await client.query('COMMIT');
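} catch (error) {
await client.query('ROLLBACK');
// Two writers can both pass the version check before either inserts. The loser then
// violates UNIQUE(stream_id, version), which PostgreSQL reports as SQLSTATE 23505.
if ((error as { code?: string }).code === '23505') {
throw new ConcurrencyError(`Concurrent append detected for stream ${streamId}`);
}
throw error;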
} finally {
client.release();
}
}
async readEvents(
streamId: string,
fromVersion?: number,
toVersion?: number
): Promise<DomainEvent[]> {
let query = 'SELECT * FROM events WHERE stream_id = $1';
const params: any[] = [streamId];
let paramIndex = 2;
if (fromVersion !== undefined) {
query += ` AND version >= $${paramIndex}`;
params.push(fromVersion);
paramIndex++;
}
if (toVersion !== undefined) {
query += ` AND version <= $${paramIndex}`;
params.push(toVersion);
paramIndex++;
}
query += ' ORDER BY version ASC';
const result = await this.pool.query(query, params);
return result.rows.map(row => this.mapRowToEvent(row));
}
async readAllEvents(
fromTimestamp?: Date,
toTimestamp?: Date
): Promise<DomainEvent[]> {
let query = 'SELECT * FROM events';
const params: any[] = [];
let paramIndex = 1;
if (fromTimestamp !== undefined) {
query += ` WHERE timestamp >= $${paramIndex}`;
params.push(fromTimestamp);
paramIndex++;
}
if (toTimestamp !== undefined) {
query += (fromTimestamp !== undefined ? ' AND' : ' WHERE');
query += ` timestamp <= $${paramIndex}`;
params.push(toTimestamp);
paramIndex++;
}
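// Timestamps can tie, so add stream_id and version to keep the order deterministic
query += ' ORDER BY timestamp ASC, stream_id ASC, version ASC';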
const result = await this.pool.query(query, params);
return result.rows.map(row => this.mapRowToEvent(row));
}
private mapRowToEvent(row: any): DomainEvent {
return {
eventId: row.id,
streamId: row.stream_id,
eventType: row.event_type,
data: row.data,
metadata: {
causationId: row.causation_id,
correlationId: row.correlation_id,
userId: row.user_id,
source: row.source
},
version: row.version,
timestamp: row.timestamp
};
}
}
Aggregate design
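An event-sourced aggregate follows a three-step cycle: load state by folding its history, decide which new events a command produces, and append those events. The sketch below shows that cycle in a functional style with a hypothetical shopping cart; `CartEvent`, `CartState`, `evolve`, and `addItemToCart` are illustrative names, and the class-based version in this section expresses the same idea with methods.

```typescript
// Hypothetical minimal types, for illustration only
type CartEvent =
  | { type: 'ItemAdded'; sku: string }
  | { type: 'CheckedOut' };

interface CartState {
  version: number;
  items: string[];
  checkedOut: boolean;
}

const emptyCart: CartState = { version: 0, items: [], checkedOut: false };

// Pure state transition: no validation and no side effects, so it is safe to replay
function evolve(state: CartState, event: CartEvent): CartState {
  const next = { ...state, version: state.version + 1 };
  if (event.type === 'ItemAdded') {
    return { ...next, items: [...state.items, event.sku] };
  }
  return { ...next, checkedOut: true };
}

// Command handler: validates against current state and returns new events
function addItemToCart(state: CartState, sku: string): CartEvent[] {
  if (state.checkedOut) {
    throw new Error('Cannot add items after checkout');
  }
  return [{ type: 'ItemAdded', sku }];
}

// 1. Load: rebuild state from history
const cartHistory: CartEvent[] = [{ type: 'ItemAdded', sku: 'book' }];
const loadedCart = cartHistory.reduce(evolve, emptyCart);

// 2. Decide: the command produces events but mutates nothing
const newCartEvents = addItemToCart(loadedCart, 'pen');

// 3. Append: a real store would receive loadedCart.version as the expected version
const savedCartHistory = [...cartHistory, ...newCartEvents];
const reloadedCart = savedCartHistory.reduce(evolve, emptyCart);
```

Keeping validation in the command handler and out of `evolve` matters: historical events must always replay, even if the business rules that produced them have since changed.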
Aggregate root pattern
// Aggregate base class
abstract class AggregateRoot {
protected _id: string;
protected _version: number; // protected: subclasses read it when stamping new events
private _events: DomainEvent[] = [];
constructor(id: string, version: number = 0) {
this._id = id;
this._version = version;
}
get id(): string {
return this._id;
}
get version(): number {
return this._version;
}
protected applyEvent(event: DomainEvent): void {
this._events.push(event);
this._version++;
this.apply(event);
}
abstract apply(event: DomainEvent): void;
getUncommittedEvents(): DomainEvent[] {
return [...this._events];
}
markEventsAsCommitted(): void {
this._events = [];
}
static loadFromHistory<T extends AggregateRoot>(
AggregateClass: new (id: string, version: number) => T,
events: DomainEvent[]
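): T {
if (events.length === 0) {
throw new Error('Cannot load an aggregate from an empty event history');
}
const aggregate = new AggregateClass(events[0].streamId, events[0].version - 1);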
for (const event of events) {
aggregate.apply(event);
aggregate._version = event.version;
}
return aggregate;
}
}
// Order aggregate example
class Order extends AggregateRoot {
private status: OrderStatus;
private items: OrderItem[];
private customerId: string;
private totalAmount: number;
private createdAt: Date;
private updatedAt: Date;
constructor(id: string, version: number = 0) {
super(id, version);
this.status = OrderStatus.CREATED;
this.items = [];
this.totalAmount = 0;
this.createdAt = new Date();
this.updatedAt = new Date();
}
apply(event: DomainEvent): void {
switch (event.eventType) {
case 'OrderCreated':
this.applyOrderCreated(event);
break;
case 'ItemAdded':
this.applyItemAdded(event);
break;
case 'OrderConfirmed':
this.applyOrderConfirmed(event);
break;
// The apply methods for the three event types below are omitted from this excerpt
case 'OrderCancelled':
this.applyOrderCancelled(event);
break;
case 'PaymentReceived':
this.applyPaymentReceived(event);
break;
case 'OrderShipped':
this.applyOrderShipped(event);
break;
}
}
// Command methods
createOrder(customerId: string, items: OrderItem[]): void {
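// The constructor already sets status to CREATED, so status cannot detect an existing order;
// a non-zero version means events have already been applied
if (this.version !== 0) {
throw new Error('Order already exists');
}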
const totalAmount = items.reduce((sum, item) => sum + item.price * item.quantity, 0);
this.applyEvent({
eventId: this.generateEventId(),
streamId: this._id,
eventType: 'OrderCreated',
data: {
customerId,
items,
totalAmount
},
metadata: this.createMetadata(),
version: this._version + 1,
timestamp: new Date()
});
}
addItem(item: OrderItem): void {
if (this.status !== OrderStatus.CREATED && this.status !== OrderStatus.CONFIRMED) {
throw new Error('Cannot add item to order in this status');
}
this.applyEvent({
eventId: this.generateEventId(),
streamId: this._id,
eventType: 'ItemAdded',
data: { item },
metadata: this.createMetadata(),
version: this._version + 1,
timestamp: new Date()
});
}
confirmOrder(): void {
if (this.status !== OrderStatus.CREATED) {
throw new Error('Order must be in CREATED status to confirm');
}
if (this.items.length === 0) {
throw new Error('Cannot confirm empty order');
}
this.applyEvent({
eventId: this.generateEventId(),
streamId: this._id,
eventType: 'OrderConfirmed',
data: {},
metadata: this.createMetadata(),
version: this._version + 1,
timestamp: new Date()
});
}
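// Event application methods: derive all state, including timestamps, from the event
// so that replaying history always produces the same result
private applyOrderCreated(event: DomainEvent): void {
const data = event.data as OrderCreatedData;
this.customerId = data.customerId;
this.items = [...data.items]; // copy so later pushes do not mutate the event payload
this.totalAmount = data.totalAmount;
this.status = OrderStatus.CREATED;
this.createdAt = event.timestamp;
this.updatedAt = event.timestamp;
}
private applyItemAdded(event: DomainEvent): void {
const data = event.data as ItemAddedData;
this.items.push(data.item);
this.totalAmount += data.item.price * data.item.quantity;
this.updatedAt = event.timestamp;
}
private applyOrderConfirmed(event: DomainEvent): void {
this.status = OrderStatus.CONFIRMED;
this.updatedAt = event.timestamp;
}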
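private generateEventId(): string {
// Use a UUID to match the events.id column; crypto.randomUUID() is a global
// in Node.js 19+ and modern browsers
return crypto.randomUUID();
}
private createMetadata(): EventMetadata {
return {
source: 'order-service',
// In production, pass the correlation ID in from the incoming command
// instead of generating a fresh one per event
correlationId: this.generateCorrelationId()
};
}
private generateCorrelationId(): string {
// Must be a UUID to fit the correlation_id UUID column
return crypto.randomUUID();
}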
}
Projection patterns
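Projections are usually fed with at-least-once delivery, so the same event can arrive twice. One common way to stay correct, sketched here under simplified assumptions, is to store a per-stream checkpoint next to the read-model row and skip anything at or below it. `OrderTotalsProjection` and `ProjectedEvent` are hypothetical names, and the read model is held in memory for brevity.

```typescript
// Simplified event shape, for illustration only
interface ProjectedEvent {
  streamId: string;
  version: number;
  eventType: string;
  amount: number;
}

interface OrderTotal {
  total: number;
  lastVersion: number; // checkpoint: highest stream version already applied
}

// Hypothetical in-memory read model. A database-backed one would write
// lastVersion in the same transaction as the row update.
class OrderTotalsProjection {
  private rows = new Map<string, OrderTotal>();

  handle(event: ProjectedEvent): void {
    const row = this.rows.get(event.streamId) ?? { total: 0, lastVersion: 0 };
    // Skip anything at or below the checkpoint: it was already applied
    if (event.version <= row.lastVersion) {
      return;
    }
    if (event.eventType === 'ItemAdded') {
      row.total += event.amount;
    }
    row.lastVersion = event.version;
    this.rows.set(event.streamId, row);
  }

  get(streamId: string): OrderTotal | undefined {
    return this.rows.get(streamId);
  }
}

const totalsProjection = new OrderTotalsProjection();
const firstItem: ProjectedEvent = { streamId: 'order-1', version: 1, eventType: 'ItemAdded', amount: 25 };
totalsProjection.handle(firstItem);
totalsProjection.handle(firstItem); // redelivered: ignored
totalsProjection.handle({ streamId: 'order-1', version: 2, eventType: 'ItemAdded', amount: 10 });
```

Without the checkpoint, an increment such as `total_amount = total_amount + $1` would double-count on redelivery, which is why additive handlers need this guard more than handlers that overwrite a column.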
Read model projections
// Projection handler interface
interface ProjectionHandler<T> {
handle(event: DomainEvent): Promise<void>;
getProjection(id: string): Promise<T | null>;
rebuildProjection(): Promise<void>;
}
// Order summary projection
class OrderSummaryProjection implements ProjectionHandler<OrderSummary> {
constructor(
private db: Database,
private eventStore: EventStore // used by rebuildProjection
) {}
// Call once at startup and await it: a constructor cannot await async work
async initializeSchema(): Promise<void> {
await this.db.query(`
CREATE TABLE IF NOT EXISTS order_summaries (
order_id VARCHAR(255) PRIMARY KEY,
customer_id VARCHAR(255) NOT NULL,
status VARCHAR(50) NOT NULL,
total_amount DECIMAL(10, 2) NOT NULL,
item_count INTEGER NOT NULL,
created_at TIMESTAMP WITH TIME ZONE NOT NULL,
updated_at TIMESTAMP WITH TIME ZONE NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_order_summaries_customer_id ON order_summaries(customer_id);
CREATE INDEX IF NOT EXISTS idx_order_summaries_status ON order_summaries(status);
`);
}
async handle(event: DomainEvent): Promise<void> {
switch (event.eventType) {
case 'OrderCreated':
await this.handleOrderCreated(event);
break;
case 'ItemAdded':
await this.handleItemAdded(event);
break;
case 'OrderConfirmed':
await this.handleOrderConfirmed(event);
break;
// The handlers for the three event types below are omitted from this excerpt
case 'OrderCancelled':
await this.handleOrderCancelled(event);
break;
case 'PaymentReceived':
await this.handlePaymentReceived(event);
break;
case 'OrderShipped':
await this.handleOrderShipped(event);
break;
}
}
private async handleOrderCreated(event: DomainEvent): Promise<void> {
const data = event.data as OrderCreatedData;
await this.db.query(
`INSERT INTO order_summaries (
order_id, customer_id, status, total_amount, item_count, created_at, updated_at
) VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (order_id) DO NOTHING`, // stays idempotent if the event is redelivered
[
event.streamId,
data.customerId,
OrderStatus.CREATED,
data.totalAmount,
data.items.length,
event.timestamp,
event.timestamp
]
);
}
private async handleItemAdded(event: DomainEvent): Promise<void> {
const data = event.data as ItemAddedData;
await this.db.query(
`UPDATE order_summaries
SET total_amount = total_amount + $1,
item_count = item_count + 1,
updated_at = $2
WHERE order_id = $3`,
[
data.item.price * data.item.quantity,
event.timestamp,
event.streamId
]
);
}
private async handleOrderConfirmed(event: DomainEvent): Promise<void> {
await this.db.query(
`UPDATE order_summaries
SET status = $1, updated_at = $2
WHERE order_id = $3`,
[OrderStatus.CONFIRMED, event.timestamp, event.streamId]
);
}
async getProjection(orderId: string): Promise<OrderSummary | null> {
const result = await this.db.query(
'SELECT * FROM order_summaries WHERE order_id = $1',
[orderId]
);
if (result.rows.length === 0) {
return null;
}
return result.rows[0];
}
async rebuildProjection(): Promise<void> {
console.log('Rebuilding order summary projection...');
await this.db.query('TRUNCATE TABLE order_summaries');
const allEvents = await this.eventStore.readAllEvents();
for (const event of allEvents) {
await this.handle(event);
}
console.log('Order summary projection rebuilt');
}
}
Event versioning and evolution
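Stored events are immutable, so old payload shapes have to be lifted to the current shape at read time. A minimal sketch of chained upcasting follows. It assumes each payload carries an explicit `schemaVersion`, which is an assumption of this example rather than a field of the article's `DomainEvent`, and the split into a v1 to v2 step and a v2 to v3 step is invented for illustration.

```typescript
// Hypothetical versioned payload envelope
interface VersionedPayload {
  eventType: string;
  schemaVersion: number;
  data: Record<string, unknown>;
}

type Upcaster = (data: Record<string, unknown>) => Record<string, unknown>;

// Key format: `${eventType}:v${fromVersion}`. Each entry lifts a payload by exactly one version.
const upcasterRegistry = new Map<string, Upcaster>([
  ['OrderCreated:v1', data => ({ ...data, currency: 'USD' })],
  ['OrderCreated:v2', data => ({ ...data, paymentMethod: 'CREDIT_CARD' })],
]);

// Apply upcasters one step at a time until none is registered for the current version
function upcastToLatest(event: VersionedPayload): VersionedPayload {
  let current = event;
  for (;;) {
    const step = upcasterRegistry.get(`${current.eventType}:v${current.schemaVersion}`);
    if (!step) {
      return current;
    }
    current = {
      ...current,
      schemaVersion: current.schemaVersion + 1,
      data: step(current.data),
    };
  }
}

const storedV1: VersionedPayload = {
  eventType: 'OrderCreated',
  schemaVersion: 1,
  data: { customerId: 'c-1' },
};
const upcasted = upcastToLatest(storedV1);
// Upcasting an already-current event is a no-op because no step is registered for its version
const upcastedAgain = upcastToLatest(upcasted);
```

Keying on the stored version is what prevents double upcasting: an event that is already at the latest version finds no registered step and passes through unchanged.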
Schema evolution strategies
// Event upcasters for versioning
type EventUpcasterFn = (event: DomainEvent) => DomainEvent;
class EventUpcaster {
private upcasters: Map<string, EventUpcasterFn> = new Map();
constructor() {
this.registerUpcasters();
}
private registerUpcasters(): void {
this.upcasters.set('OrderCreated:v1', this.upcastOrderCreatedV1ToV2);
this.upcasters.set('ItemAdded:v1', this.upcastItemAddedV1ToV2);
}
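upcast(event: DomainEvent): DomainEvent {
// Assumption: payloads written with a newer schema carry data.schemaVersion, and payloads
// without it are v1. Looking up ':v1' unconditionally would upcast v2 events a second time.
const schemaVersion = (event.data as { schemaVersion?: number }).schemaVersion ?? 1;
const upcaster = this.upcasters.get(`${event.eventType}:v${schemaVersion}`);
if (upcaster) {
return upcaster(event);
}
return event;
}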
private upcastOrderCreatedV1ToV2(event: DomainEvent): DomainEvent {
// V1 event structure
const v1Data = event.data as OrderCreatedV1Data;
// V2 event structure: adds currency and paymentMethod, filled with defaults for old events
const v2Data: OrderCreatedV2Data = {
...v1Data,
currency: 'USD', // Default currency
paymentMethod: 'CREDIT_CARD' // New field
};
return {
...event,
eventType: 'OrderCreated',
version: event.version,
data: v2Data
};
}
private upcastItemAddedV1ToV2(event: DomainEvent): DomainEvent {
const v1Data = event.data as ItemAddedV1Data;
// V2 adds discount and taxAmount; the 10% tax rate below is a placeholder
const v2Data: ItemAddedV2Data = {
...v1Data,
discount: 0,
taxAmount: v1Data.item.price * v1Data.item.quantity * 0.1
};
return {
...event,
eventType: 'ItemAdded',
version: event.version,
data: v2Data
};
}
}
// Event store with versioning
class VersionedEventStore implements EventStore {
private baseEventStore: EventStore;
private upcaster: EventUpcaster;
constructor(baseEventStore: EventStore) {
this.baseEventStore = baseEventStore;
this.upcaster = new EventUpcaster();
}
async readEvents(
streamId: string,
fromVersion?: number,
toVersion?: number
): Promise<DomainEvent[]> {
const events = await this.baseEventStore.readEvents(
streamId,
fromVersion,
toVersion
);
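// Apply upcasters to old event versions
return events.map(event => this.upcaster.upcast(event));
}
// Projections rebuild from readAllEvents, so it needs the same upcasting
async readAllEvents(
fromTimestamp?: Date,
toTimestamp?: Date
): Promise<DomainEvent[]> {
const events = await this.baseEventStore.readAllEvents(fromTimestamp, toTimestamp);
return events.map(event => this.upcaster.upcast(event));
}
// appendEvents, subscribeToStream, and subscribeToAll are omitted from this excerpt;
// they would delegate to baseEventStore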
}
Operational considerations
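Reading every event into one array does not scale to large stores, so replays are usually paged. The sketch below assumes each event has a monotonically increasing global `position` (for example a sequence column), which the schema in this article does not define; `ReplayEvent`, `ReadPage`, and `replayInPages` are hypothetical names.

```typescript
interface ReplayEvent {
  position: number; // assumed global, strictly increasing sequence number
  eventType: string;
}

// Hypothetical page reader: returns up to `limit` events with position > `after`
type ReadPage = (after: number, limit: number) => Promise<ReplayEvent[]>;

// Replay page by page so memory use is bounded by the page size.
// Returns the last position handled, which can be stored as a checkpoint.
async function replayInPages(
  readPage: ReadPage,
  handle: (event: ReplayEvent) => Promise<void>,
  pageSize = 500
): Promise<number> {
  let checkpoint = 0;
  for (;;) {
    const page = await readPage(checkpoint, pageSize);
    if (page.length === 0) {
      return checkpoint;
    }
    for (const event of page) {
      await handle(event);
      checkpoint = event.position;
    }
  }
}

// Usage with an in-memory log of five events and a page size of two
const replayLog: ReplayEvent[] = Array.from({ length: 5 }, (_, i) => ({
  position: i + 1,
  eventType: 'ItemAdded',
}));
const readFromLog: ReadPage = async (after, limit) =>
  replayLog.filter(e => e.position > after).slice(0, limit);

const replayedPositions: number[] = [];
const replayDone = replayInPages(
  readFromLog,
  async event => {
    replayedPositions.push(event.position);
  },
  2
);
```

Paging by position rather than by timestamp avoids skipping or repeating events that share a timestamp, and the returned checkpoint lets an interrupted replay resume where it stopped.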
Event replay strategies
// Event replay service
class EventReplayService {
private eventStore: EventStore;
private projectionHandlers: ProjectionHandler<any>[];
constructor(
eventStore: EventStore,
projectionHandlers: ProjectionHandler<any>[]
) {
this.eventStore = eventStore;
this.projectionHandlers = projectionHandlers;
}
async replayFromTimestamp(timestamp: Date): Promise<void> {
console.log(`Replaying events from ${timestamp.toISOString()}`);
const events = await this.eventStore.readAllEvents(timestamp);
for (const projection of this.projectionHandlers) {
console.log(`Replaying events for ${projection.constructor.name}`);
await this.replayForProjection(projection, events);
}
console.log('Event replay completed');
}
private async replayForProjection(
projection: ProjectionHandler<any>,
events: DomainEvent[]
): Promise<void> {
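// Do not call projection.rebuildProjection() here: it truncates the read model and
// replays the full history itself, so replaying again would apply every event twice.
// A partial replay is only safe when the handlers are idempotent.
for (const event of events) {
try {
await projection.handle(event);
} catch (error) {
console.error(
`Failed to handle event ${event.eventId} for projection`,
error
);
// Stop rather than skip: continuing would leave the read model inconsistent
throw error;
}
}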
}
async replaySingleStream(streamId: string): Promise<void> {
console.log(`Replaying events for stream ${streamId}`);
const events = await this.eventStore.readEvents(streamId);
for (const projection of this.projectionHandlers) {
for (const event of events) {
await projection.handle(event);
}
}
console.log('Stream replay completed');
}
}
Monitoring and alerting
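The most useful projection metric is how far a read model trails the head of the event log. A minimal sketch of that calculation as a pure function follows; `projectionLagSeconds` and `shouldAlertOnLag` are hypothetical helpers, and the 300-second default mirrors the five-minute threshold used in this section.

```typescript
// Lag in whole seconds between the newest stored event and the last event
// the projection has processed. Never negative.
function projectionLagSeconds(latestEventAt: Date, lastProcessedAt: Date): number {
  const lagMs = latestEventAt.getTime() - lastProcessedAt.getTime();
  return Math.max(0, Math.floor(lagMs / 1000));
}

function shouldAlertOnLag(lagSeconds: number, thresholdSeconds = 300): boolean {
  return lagSeconds > thresholdSeconds;
}

// Projection is 6 minutes 30 seconds behind the head of the log
const behindLag = projectionLagSeconds(
  new Date('2026-03-10T12:10:00Z'),
  new Date('2026-03-10T12:03:30Z')
);

// Projection has processed the newest event: lag is zero however long ago that was
const caughtUpLag = projectionLagSeconds(
  new Date('2026-03-10T12:10:00Z'),
  new Date('2026-03-10T12:10:00Z')
);
```

Measuring against the newest event instead of the wall clock keeps an idle but fully caught-up system from paging anyone at night.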
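// Event store monitoring
// Assumption: MetricsClient and the helper methods getLatestEvent, getProjectionState,
// and alert are defined elsewhere; they are not shown in this excerpt
class EventStoreMonitor {
constructor(
private eventStore: EventStore,
private metrics: MetricsClient,
private projections: { name: string; streamId: string }[]
) {}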
async monitorEventProcessing(): Promise<void> {
// Event throughput
const hourlyThroughput = await this.calculateHourlyThroughput();
this.metrics.gauge('event_store_hourly_throughput', hourlyThroughput);
// Event lag: seconds since the most recent append, so a quiet system also reports a high value
const eventLag = await this.calculateEventLag();
this.metrics.gauge('event_store_lag_seconds', eventLag);
// Projection lag
for (const projection of this.projections) {
const projectionLag = await this.calculateProjectionLag(projection);
this.metrics.gauge(`projection_${projection.name}_lag_seconds`, projectionLag);
}
// Alert on thresholds
if (eventLag > 300) { // 5 minutes
await this.alert('Event store lag exceeds 5 minutes', {
lag: eventLag
});
}
}
private async calculateHourlyThroughput(): Promise<number> {
const oneHourAgo = new Date(Date.now() - 60 * 60 * 1000);
const events = await this.eventStore.readAllEvents(oneHourAgo);
return events.length;
}
private async calculateEventLag(): Promise<number> {
const latestEvent = await this.getLatestEvent();
if (!latestEvent) {
return 0;
}
return Math.floor((Date.now() - latestEvent.timestamp.getTime()) / 1000);
}
private async calculateProjectionLag(
projection: { name: string; streamId: string }
): Promise<number> {
const latestEvent = await this.getLatestEvent();
const projectionState = await this.getProjectionState(projection);
if (!latestEvent || !projectionState) {
return 0;
}
const projectedEvent = await this.eventStore.readEvents(
projection.streamId,
projectionState.lastProcessedVersion,
projectionState.lastProcessedVersion
);
if (projectedEvent.length === 0) {
return 0;
}
// Lag is how far the projection trails the newest stored event, not the wall clock;
// measuring against Date.now() would report lag on an idle but fully caught-up system
return Math.floor(
(latestEvent.timestamp.getTime() - projectedEvent[0].timestamp.getTime()) / 1000
);
}
}
Decision framework
When to adopt Event Sourcing and CQRS
Adopt if:
- You need complete audit trails
- You require event replay capabilities
- Read and write workloads have different performance requirements
- You're building complex business domains with rich state transitions
- You need to integrate with external systems through events
Avoid if:
- Simple CRUD operations suffice
- Strong consistency is required
- Your team lacks capacity for architectural complexity
- You're in early stages of product development
- The problem doesn't justify the solution
Conclusion
Event Sourcing and CQRS are powerful patterns for complex systems, but they're architectural commitments with significant operational implications. The decision to adopt them should be based on concrete requirements: audit trails, event replay, or separation of read/write models.
Start with the simplest solution that meets your needs. Implement these patterns incrementally, starting with a single bounded context, and expand only when justified by business requirements and team capacity.
The investment in Event Sourcing and CQRS pays dividends when you actually need what they provide: complete history, temporal queries, and optimized read/write models.