Event Sourcing e CQRS em 2026: Padrões de Implementação em Produção
Event Sourcing e CQRS resolvem problemas reais em sistemas complexos, mas introduzem complexidade que deve ser justificada pelo problema que você está resolvendo.
Resumo executivo
Event Sourcing e CQRS resolvem problemas reais em sistemas complexos, mas introduzem complexidade que deve ser justificada pelo problema que você está resolvendo.
Ultima atualizacao: 10/03/2026
Resumo executivo
Event Sourcing e Command Query Responsibility Segregation (CQRS) são padrões arquiteturais que resolvem problemas específicos em sistemas complexos: auditabilidade, consultas temporais e throughput alto de leitura/escrita. Eles não são balas de prata—introduzem complexidade operacional significativa que deve ser justificada por requisitos reais.
Em 2026, esses padrões amadureceram de conceitos acadêmicos para implementações prontas para produção com ferramentas estabelecidas, padrões e práticas operacionais. A decisão de adotá-los deve ser baseada em problemas concretos: necessidade de replay de eventos, trilhas de auditoria, ou separação de modelos de leitura/escrita.
Quando Event Sourcing e CQRS são justificados
Sintomas de problema que indicam esses padrões podem ajudar
| Sintoma | Abordagem Tradicional | Solução Event Sourcing/CQRS |
|---|---|---|
| Trilha de auditoria ausente | Logging manual, histórico incompleto | Log de eventos imutável fornece auditoria completa |
| Incapacidade de replay | Estado atual apenas | Stream de eventos habilita replay temporal |
| Performance leitura/escrita | Mesmo modelo para ambos | Modelos separados otimizam cada workload |
| Consultas complexas | Joins através de múltiplas tabelas | Projeções pré-computam modelos de leitura |
| Consistência eventual | Transações síncronas | Processamento de eventos assíncrono aceitável |
Quando NÃO usar esses padrões
- Aplicações CRUD simples com consultas diretas
- Sistemas onde consistência forte é obrigatória
- Equipes sem capacidade para complexidade arquitetural
- Workloads de baixo volume onde otimização não é necessária
- Ambientes regulatórios exigindo consistência imediata
Design de Event Store
Contrato de event store principal
typescript// Interface de event store
interface EventStore {
appendEvents(
streamId: string,
expectedVersion: number,
events: DomainEvent[]
): Promise<void>;
readEvents(
streamId: string,
fromVersion?: number,
toVersion?: number
): Promise<DomainEvent[]>;
readAllEvents(
fromTimestamp?: Date,
toTimestamp?: Date
): Promise<DomainEvent[]>;
subscribeToStream(
streamId: string,
handler: EventHandler
): Promise<void>;
subscribeToAll(
handler: EventHandler
): Promise<void>;
}
// Contrato de evento de domínio
interface DomainEvent {
eventId: string;
streamId: string;
eventType: string;
data: any;
metadata: EventMetadata;
version: number;
timestamp: Date;
}
interface EventMetadata {
causationId?: string;
correlationId?: string;
userId?: string;
source: string;
}Implementação de event store
typescript// Event store baseado em PostgreSQL
import { Pool } from 'pg';
class PostgreSQLEventStore implements EventStore {
constructor(private pool: Pool) {
this.initializeSchema();
}
private async initializeSchema(): Promise<void> {
await this.pool.query(`
CREATE TABLE IF NOT EXISTS events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
stream_id VARCHAR(255) NOT NULL,
version INTEGER NOT NULL,
event_type VARCHAR(255) NOT NULL,
data JSONB NOT NULL,
metadata JSONB,
causation_id UUID,
correlation_id UUID,
user_id UUID,
source VARCHAR(255),
timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
UNIQUE(stream_id, version)
);
CREATE INDEX IF NOT EXISTS idx_events_stream_id ON events(stream_id, version);
CREATE INDEX IF NOT EXISTS idx_events_timestamp ON events(timestamp);
CREATE INDEX IF NOT EXISTS idx_events_event_type ON events(event_type);
CREATE INDEX IF NOT EXISTS idx_events_correlation_id ON events(correlation_id);
CREATE INDEX IF NOT EXISTS idx_events_causation_id ON events(causation_id);
CREATE TABLE IF NOT EXISTS projections (
id VARCHAR(255) PRIMARY KEY,
name VARCHAR(255) NOT NULL,
last_processed_event_id UUID NOT NULL,
last_processed_timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
UNIQUE(name)
);
`);
}
async appendEvents(
streamId: string,
expectedVersion: number,
events: DomainEvent[]
): Promise<void> {
const client = await this.pool.connect();
try {
await client.query('BEGIN');
// Verificar versão esperada
const versionResult = await client.query(
'SELECT MAX(version) as version FROM events WHERE stream_id = $1',
[streamId]
);
const currentVersion = versionResult.rows[0]?.version || 0;
if (expectedVersion !== null && currentVersion !== expectedVersion) {
throw new ConcurrencyError(
`Versão esperada ${expectedVersion} mas encontrada ${currentVersion} para stream ${streamId}`
);
}
// Inserir eventos
for (let i = 0; i < events.length; i++) {
const event = events[i];
const version = currentVersion + i + 1;
await client.query(
`INSERT INTO events (
stream_id, version, event_type, data, metadata,
causation_id, correlation_id, user_id, source, timestamp
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`,
[
streamId,
version,
event.eventType,
JSON.stringify(event.data),
JSON.stringify(event.metadata),
event.metadata.causationId,
event.metadata.correlationId,
event.metadata.userId,
event.metadata.source,
event.timestamp
]
);
}
await client.query('COMMIT');
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
async readEvents(
streamId: string,
fromVersion?: number,
toVersion?: number
): Promise<DomainEvent[]> {
let query = 'SELECT * FROM events WHERE stream_id = $1';
const params: any[] = [streamId];
let paramIndex = 2;
if (fromVersion !== undefined) {
query += ` AND version >= $${paramIndex}`;
params.push(fromVersion);
paramIndex++;
}
if (toVersion !== undefined) {
query += ` AND version <= $${paramIndex}`;
params.push(toVersion);
paramIndex++;
}
query += ' ORDER BY version ASC';
const result = await this.pool.query(query, params);
return result.rows.map(row => this.mapRowToEvent(row));
}
async readAllEvents(
fromTimestamp?: Date,
toTimestamp?: Date
): Promise<DomainEvent[]> {
let query = 'SELECT * FROM events';
const params: any[] = [];
let paramIndex = 1;
if (fromTimestamp !== undefined) {
query += ` WHERE timestamp >= $${paramIndex}`;
params.push(fromTimestamp);
paramIndex++;
}
if (toTimestamp !== undefined) {
query += (fromTimestamp !== undefined ? ' AND' : ' WHERE');
query += ` timestamp <= $${paramIndex}`;
params.push(toTimestamp);
paramIndex++;
}
query += ' ORDER BY timestamp ASC';
const result = await this.pool.query(query, params);
return result.rows.map(row => this.mapRowToEvent(row));
}
private mapRowToEvent(row: any): DomainEvent {
return {
eventId: row.id,
streamId: row.stream_id,
eventType: row.event_type,
data: row.data,
metadata: {
causationId: row.causation_id,
correlationId: row.correlation_id,
userId: row.user_id,
source: row.source
},
version: row.version,
timestamp: row.timestamp
};
}
}Design de agregado
Padrão de aggregate root
typescript// Classe base de agregado
abstract class AggregateRoot {
protected _id: string;
private _version: number;
private _events: DomainEvent[] = [];
constructor(id: string, version: number = 0) {
this._id = id;
this._version = version;
}
get id(): string {
return this._id;
}
get version(): number {
return this._version;
}
protected applyEvent(event: DomainEvent): void {
this._events.push(event);
this._version++;
this.apply(event);
}
abstract apply(event: DomainEvent): void;
getUncommittedEvents(): DomainEvent[] {
return [...this._events];
}
markEventsAsCommitted(): void {
this._events = [];
}
static loadFromHistory<T extends AggregateRoot>(
AggregateClass: new (id: string, version: number) => T,
events: DomainEvent[]
): T {
const aggregate = new AggregateClass(events[0].streamId, events[0].version - 1);
for (const event of events) {
aggregate.apply(event);
aggregate._version = event.version;
}
return aggregate;
}
}
// Exemplo de agregado de pedido
class Order extends AggregateRoot {
private status: OrderStatus;
private items: OrderItem[];
private customerId: string;
private totalAmount: number;
private createdAt: Date;
private updatedAt: Date;
constructor(id: string, version: number = 0) {
super(id, version);
this.status = OrderStatus.CREATED;
this.items = [];
this.totalAmount = 0;
this.createdAt = new Date();
this.updatedAt = new Date();
}
apply(event: DomainEvent): void {
switch (event.eventType) {
case 'OrderCreated':
this.applyOrderCreated(event);
break;
case 'ItemAdded':
this.applyItemAdded(event);
break;
case 'OrderConfirmed':
this.applyOrderConfirmed(event);
break;
case 'OrderCancelled':
this.applyOrderCancelled(event);
break;
case 'PaymentReceived':
this.applyPaymentReceived(event);
break;
case 'OrderShipped':
this.applyOrderShipped(event);
break;
}
}
// Métodos de comando
createOrder(customerId: string, items: OrderItem[]): void {
if (this.status !== OrderStatus.CREATED) {
throw new Error('Pedido já existe');
}
const totalAmount = items.reduce((sum, item) => sum + item.price * item.quantity, 0);
this.applyEvent({
eventId: this.generateEventId(),
streamId: this._id,
eventType: 'OrderCreated',
data: {
customerId,
items,
totalAmount
},
metadata: this.createMetadata(),
version: this._version + 1,
timestamp: new Date()
});
}
addItem(item: OrderItem): void {
if (this.status !== OrderStatus.CREATED && this.status !== OrderStatus.CONFIRMED) {
throw new Error('Não pode adicionar item ao pedido neste status');
}
this.applyEvent({
eventId: this.generateEventId(),
streamId: this._id,
eventType: 'ItemAdded',
data: { item },
metadata: this.createMetadata(),
version: this._version + 1,
timestamp: new Date()
});
}
confirmOrder(): void {
if (this.status !== OrderStatus.CREATED) {
throw new Error('Pedido deve estar em status CREATED para confirmar');
}
if (this.items.length === 0) {
throw new Error('Não pode confirmar pedido vazio');
}
this.applyEvent({
eventId: this.generateEventId(),
streamId: this._id,
eventType: 'OrderConfirmed',
data: {},
metadata: this.createMetadata(),
version: this._version + 1,
timestamp: new Date()
});
}
// Métodos de aplicação de evento
private applyOrderCreated(event: DomainEvent): void {
const data = event.data as OrderCreatedData;
this.customerId = data.customerId;
this.items = data.items;
this.totalAmount = data.totalAmount;
this.status = OrderStatus.CREATED;
}
private applyItemAdded(event: DomainEvent): void {
const data = event.data as ItemAddedData;
this.items.push(data.item);
this.totalAmount += data.item.price * data.item.quantity;
}
private applyOrderConfirmed(event: DomainEvent): void {
this.status = OrderStatus.CONFIRMED;
this.updatedAt = new Date();
}
private generateEventId(): string {
return `${this._id}-${this._version + 1}-${Date.now()}`;
}
private createMetadata(): EventMetadata {
return {
source: 'order-service',
correlationId: this.generateCorrelationId()
};
}
private generateCorrelationId(): string {
return `corr-${Date.now()}-${Math.random()}`;
}
}Padrões de projeção
Projeções de modelo de leitura
typescript// Interface de handler de projeção
interface ProjectionHandler<T> {
handle(event: DomainEvent): Promise<void>;
getProjection(id: string): Promise<T | null>;
rebuildProjection(): Promise<void>;
}
// Projeção de resumo de pedido
class OrderSummaryProjection implements ProjectionHandler<OrderSummary> {
private db: Database;
constructor(db: Database) {
this.db = db;
this.initializeSchema();
}
private async initializeSchema(): Promise<void> {
await this.db.query(`
CREATE TABLE IF NOT EXISTS order_summaries (
order_id VARCHAR(255) PRIMARY KEY,
customer_id VARCHAR(255) NOT NULL,
status VARCHAR(50) NOT NULL,
total_amount DECIMAL(10,2) NOT NULL,
item_count INTEGER NOT NULL,
created_at TIMESTAMP WITH TIME ZONE NOT NULL,
updated_at TIMESTAMP WITH TIME ZONE NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_order_summaries_customer_id ON order_summaries(customer_id);
CREATE INDEX IF NOT EXISTS idx_order_summaries_status ON order_summaries(status);
`);
}
async handle(event: DomainEvent): Promise<void> {
switch (event.eventType) {
case 'OrderCreated':
await this.handleOrderCreated(event);
break;
case 'ItemAdded':
await this.handleItemAdded(event);
break;
case 'OrderConfirmed':
await this.handleOrderConfirmed(event);
break;
case 'OrderCancelled':
await this.handleOrderCancelled(event);
break;
case 'PaymentReceived':
await this.handlePaymentReceived(event);
break;
case 'OrderShipped':
await this.handleOrderShipped(event);
break;
}
}
private async handleOrderCreated(event: DomainEvent): Promise<void> {
const data = event.data as OrderCreatedData;
await this.db.query(
`INSERT INTO order_summaries (
order_id, customer_id, status, total_amount, item_count, created_at, updated_at
) VALUES ($1, $2, $3, $4, $5, $6, $7)`,
[
event.streamId,
data.customerId,
OrderStatus.CREATED,
data.totalAmount,
data.items.length,
event.timestamp,
event.timestamp
]
);
}
private async handleItemAdded(event: DomainEvent): Promise<void> {
const data = event.data as ItemAddedData;
await this.db.query(
`UPDATE order_summaries
SET total_amount = total_amount + $1,
item_count = item_count + 1,
updated_at = $2
WHERE order_id = $3`,
[
data.item.price * data.item.quantity,
event.timestamp,
event.streamId
]
);
}
private async handleOrderConfirmed(event: DomainEvent): Promise<void> {
await this.db.query(
`UPDATE order_summaries
SET status = $1, updated_at = $2
WHERE order_id = $3`,
[OrderStatus.CONFIRMED, event.timestamp, event.streamId]
);
}
async getProjection(orderId: string): Promise<OrderSummary | null> {
const result = await this.db.query(
'SELECT * FROM order_summaries WHERE order_id = $1',
[orderId]
);
if (result.rows.length === 0) {
return null;
}
return result.rows[0];
}
async rebuildProjection(): Promise<void> {
console.log('Reconstruindo projeção de resumo de pedido...');
await this.db.query('TRUNCATE TABLE order_summaries');
const allEvents = await this.eventStore.readAllEvents();
for (const event of allEvents) {
await this.handle(event);
}
console.log('Projeção de resumo de pedido reconstruída');
}
}Versionamento e evolução de eventos
Estratégias de evolução de schema
typescript// Upcasters de evento para versionamento
class EventUpcaster {
private upcasters: Map<string, EventUpcasterFn> = new Map();
constructor() {
this.registerUpcasters();
}
private registerUpcasters(): void {
this.upcasters.set('OrderCreated:v1', this.upcastOrderCreatedV1ToV2);
this.upcasters.set('ItemAdded:v1', this.upcastItemAddedV1ToV2);
}
upcast(event: DomainEvent): DomainEvent {
const upcaster = this.upcasters.get(`${event.eventType}:v1`);
if (upcaster) {
return upcaster(event);
}
return event;
}
private upcastOrderCreatedV1ToV2(event: DomainEvent): DomainEvent {
// Estrutura de evento V1
const v1Data = event.data as OrderCreatedV1Data;
// Estrutura de evento V2 - adicionar campo de moeda
const v2Data: OrderCreatedV2Data = {
...v1Data,
currency: 'USD', // Moeda padrão
paymentMethod: 'CREDIT_CARD' // Novo campo
};
return {
...event,
eventType: 'OrderCreated',
version: event.version,
data: v2Data
};
}
private upcastItemAddedV1ToV2(event: DomainEvent): DomainEvent {
const v1Data = event.data as ItemAddedV1Data;
// V2 adiciona campo de desconto
const v2Data: ItemAddedV2Data = {
...v1Data,
discount: 0,
taxAmount: v1Data.item.price * v1Data.item.quantity * 0.1
};
return {
...event,
eventType: 'ItemAdded',
version: event.version,
data: v2Data
};
}
}
// Event store com versionamento
class VersionedEventStore implements EventStore {
private baseEventStore: EventStore;
private upcaster: EventUpcaster;
constructor(baseEventStore: EventStore) {
this.baseEventStore = baseEventStore;
this.upcaster = new EventUpcaster();
}
async readEvents(
streamId: string,
fromVersion?: number,
toVersion?: number
): Promise<DomainEvent[]> {
const events = await this.baseEventStore.readEvents(
streamId,
fromVersion,
toVersion
);
// Aplicar upcasters a versões antigas de eventos
return events.map(event => this.upcaster.upcast(event));
}
}Considerações operacionais
Estratégias de replay de eventos
typescript// Serviço de replay de eventos
class EventReplayService {
private eventStore: EventStore;
private projectionHandlers: ProjectionHandler<any>[];
constructor(
eventStore: EventStore,
projectionHandlers: ProjectionHandler<any>[]
) {
this.eventStore = eventStore;
this.projectionHandlers = projectionHandlers;
}
async replayFromTimestamp(timestamp: Date): Promise<void> {
console.log(`Reproduzindo eventos de ${timestamp.toISOString()}`);
const events = await this.eventStore.readAllEvents(timestamp);
for (const projection of this.projectionHandlers) {
console.log(`Reproduzindo eventos para ${projection.constructor.name}`);
await this.replayForProjection(projection, events);
}
console.log('Replay de eventos concluído');
}
private async replayForProjection(
projection: ProjectionHandler<any>,
events: DomainEvent[]
): Promise<void> {
// Limpar tabelas de projeção
await projection.rebuildProjection();
// Reproduzir eventos
for (const event of events) {
try {
await projection.handle(event);
} catch (error) {
console.error(
`Falha ao manipular evento ${event.eventId} para projeção`,
error
);
}
}
}
async replaySingleStream(streamId: string): Promise<void> {
console.log(`Reproduzindo eventos para stream ${streamId}`);
const events = await this.eventStore.readEvents(streamId);
for (const projection of this.projectionHandlers) {
for (const event of events) {
await projection.handle(event);
}
}
console.log('Replay de stream concluído');
}
}Monitoramento e alertas
typescript// Monitoramento de event store
class EventStoreMonitor {
private eventStore: EventStore;
private metrics: MetricsClient;
async monitorEventProcessing(): Promise<void> {
// Throughput de eventos
const hourlyThroughput = await this.calculateHourlyThroughput();
this.metrics.gauge('event_store_hourly_throughput', hourlyThroughput);
// Lag de eventos
const eventLag = await this.calculateEventLag();
this.metrics.gauge('event_store_lag_seconds', eventLag);
// Lag de projeção
for (const projection of this.projections) {
const projectionLag = await this.calculateProjectionLag(projection);
this.metrics.gauge(`projection_${projection.name}_lag_seconds`, projectionLag);
}
// Alertar em limites
if (eventLag > 300) { // 5 minutos
await this.alert('Lag de event store excede 5 minutos', {
lag: eventLag
});
}
}
private async calculateHourlyThroughput(): Promise<number> {
const oneHourAgo = new Date(Date.now() - 60 * 60 * 1000);
const events = await this.eventStore.readAllEvents(oneHourAgo);
return events.length;
}
private async calculateEventLag(): Promise<number> {
const latestEvent = await this.getLatestEvent();
if (!latestEvent) {
return 0;
}
return Math.floor((Date.now() - latestEvent.timestamp.getTime()) / 1000);
}
private async calculateProjectionLag(projection: ProjectionHandler): Promise<number> {
const latestEvent = await this.getLatestEvent();
const projectionState = await this.getProjectionState(projection);
if (!projectionState) {
return 0;
}
const projectedEvent = await this.eventStore.readEvents(
projection.streamId,
projectionState.lastProcessedVersion,
projectionState.lastProcessedVersion
);
if (!projectedEvent || projectedEvent.length === 0) {
return 0;
}
return Math.floor(
(Date.now() - projectedEvent[0].timestamp.getTime()) / 1000
);
}
}Framework de decisão
Quando adotar Event Sourcing e CQRS
Adote se:
- Você precisa de trilhas de auditoria completas
- Você requer capacidades de replay de eventos
- Workloads de leitura e escrita têm requisitos de desempenho diferentes
- Você está construindo domínios de negócio complexos com transições de estado ricas
- Você precisa de integrar com sistemas externos através de eventos
Evite se:
- Operações CRUD simples são suficientes
- Consistência forte é necessária
- Sua equipe falta capacidade para complexidade arquitetural
- Você está em estágios iniciais de desenvolvimento de produto
- O problema não justifica a solução
Conclusão
Event Sourcing e CQRS são padrões poderosos para sistemas complexos, mas são compromissos arquiteturais com implicações operacionais significativas. A decisão de adotá-los deve ser baseada em requisitos concretos: trilhas de auditoria, replay de eventos, ou separação de modelos de leitura/escrita.
Comece com a solução mais simples que atende suas necessidades. Implemente esses padrões incrementalmente, começando com um único contexto limitado, e expanda apenas quando justificado por requisitos de negócio e capacidade da equipe.
O investimento em Event Sourcing e CQRS paga dividendos quando você realmente precisa do que eles fornecem: histórico completo, consultas temporais e modelos de leitura/escrita otimizados.
Precisa de ajuda projetando uma arquitetura Event Sourcing e CQRS para seu sistema complexo? Fale com a Imperialis sobre padrões de arquitetura, design de event store e implementação em produção.
Fontes
- Martin Fowler: Event Sourcing — padrão Event Sourcing
- Martin Fowler: CQRS — padrão CQRS
- EventStoreDB Documentation — implementação de event store
- Axon Framework Documentation — framework Event Sourcing
- Greg Young: CQRS Documents — padrões CQRS