Event Sourcing Architecture
Storing State as a Sequence of Events
Introduction
Event Sourcing is an architectural pattern where the state of an application is stored not as a current snapshot, but as a sequence of immutable events. Instead of storing "what" the current state is, you store "how" it got there. This approach provides a complete audit trail, enables powerful analytics by replaying events, and simplifies complex domain logic by making changes explicit.
This comprehensive guide covers:
- Event Sourcing fundamentals — Understanding the pattern
- Implementation approaches — Event stores and projections
- Azure implementation — Cosmos DB, Event Hub integration
- CQRS integration — Read and write optimization
- Event schema design — Versioning and structure
- Best practices — Performance and consistency
Understanding Event Sourcing
Traditional vs Event Sourcing
┌─────────────────────────────────────────────────────────────────────┐
│ TRADITIONAL vs EVENT SOURCING │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ TRADITIONAL (State-Based) │
│ ─────────────────────────── │
│ │
│ Current State (Single Record): │
│ ┌─────────────────────────────────────┐ │
│ │ Order #12345 │ │
│ │ Status: Shipped │ │
│ │ Customer: John Doe │ │
│ │ Total: $99.99 │ │
│ │ Items: 2 │ │
│ └─────────────────────────────────────┘ │
│ │
│ ❌ No history of how we got here │
│ ❌ Can't recreate state from audit │
│ ❌ Hard to track changes │
│ │
│ EVENT SOURCING (Event-Based) │
│ ─────────────────────────── │
│ │
│ Event Store (Sequence): │
│ ┌─────────────────────────────────────┐ │
│ │ OrderCreated(#12345, John, $99.99) │ │
│ │ ItemAdded(Product: A, Qty: 1) │ │
│ │ ItemAdded(Product: B, Qty: 1) │ │
│ │ PaymentProcessed(Visa ****1234) │ │
│ │ ShipmentCreated(FedEx: TRK123) │ │
│ │ StatusChanged(Shipped) │ │
│ └─────────────────────────────────────┘ │
│ │
│ ✓ Complete audit trail │
│ ✓ Recreate state by replaying │
│ ✓ Temporal queries (state at any point) │
│ │
└─────────────────────────────────────────────────────────────────────┘
Event Sourcing Flow
┌─────────────────────────────────────────────────────────────────────┐
│ EVENT SOURCING FLOW │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ EVENT STORE │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ Event 1 │ │ Event 2 │ │ Event 3 │ │ Event N │ │ │
│ │ │Created │ │Added │ │Paid │ │ Updated │ │ │
│ │ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ │ │
│ │ │ │ │ │ │ │
│ │ └───────────┴───────────┴───────────┘ │ │
│ │ │ │ │
│ │ Append-only log │ │
│ └──────────────────────┼───────────────────────────────────────┘ │
│ │ │
│ ┌──────────────────────┼───────────────────────────────────────┐ │
│ │ PROJECTIONS │ │
│ │ │ │
│ │ ┌─────────────┐ ┌──────────────┐ ┌─────────────┐ │ │
│ │ │ Current │ │ Dashboard │ │ Analytics │ │ │
│ │ │ State │ │ Read Model │ │ Model │ │ │
│ │ │ (Order #1) │ │ (Orders List)│ │ (Stats) │ │ │
│ │ └─────────────┘ └──────────────┘ └─────────────┘ │ │
│ │ │ │
│ └──────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
Azure Event Sourcing Implementation
Cosmos DB as Event Store
/// <summary>
/// Event store backed by a Cosmos DB container: appends one document per event and
/// rebuilds aggregates by replaying their events.
/// </summary>
public class EventStore
{
    // NOTE(review): nothing in this snippet assigns _eventContainer; presumably it is
    // injected elsewhere — confirm before use.
    private readonly Container _eventContainer;

    /// <summary>
    /// Wraps <paramref name="event"/> in an <c>EventDocument</c> and creates it in the container.
    /// </summary>
    /// <param name="aggregateId">Identifier of the aggregate the event belongs to.</param>
    /// <param name="event">The event to persist.</param>
    /// <exception cref="ConcurrencyException">
    /// Thrown when Cosmos DB answers the create with HTTP 409 (Conflict).
    /// </exception>
    public async Task AppendEventAsync(string aggregateId, IEvent @event)
    {
        var runtimeType = @event.GetType();

        var eventDocument = new EventDocument
        {
            // NOTE(review): the id is a fresh GUID, so a 409 on create presumably relies on a
            // unique-key policy over aggregate id + version on the container — confirm.
            Id = Guid.NewGuid().ToString(),
            AggregateId = aggregateId,
            // NOTE(review): this stores the event's type name, not the aggregate's — confirm intended.
            AggregateType = runtimeType.Name,
            EventType = runtimeType.Name,
            // Serialize with the runtime type: the declared type is IEvent, and System.Text.Json
            // would otherwise write only the interface's properties and drop the event payload.
            EventData = JsonSerializer.Serialize(@event, runtimeType),
            Timestamp = DateTime.UtcNow,
            // NOTE(review): GetNextVersionAsync is not defined in this snippet.
            Version = await GetNextVersionAsync(aggregateId)
        };

        try
        {
            await _eventContainer.CreateItemAsync(eventDocument);
        }
        catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.Conflict)
        {
            throw new ConcurrencyException($"Version conflict for aggregate {aggregateId}");
        }
    }

    /// <summary>
    /// Loads every stored event of an aggregate, ordered by version, and deserializes each
    /// one to the CLR type <c>MyApp.Events.{EventType}</c>.
    /// </summary>
    /// <param name="aggregateId">Identifier of the aggregate whose events are read.</param>
    /// <returns>The deserialized events in version order; empty when none are stored.</returns>
    /// <exception cref="InvalidOperationException">
    /// Thrown when a stored event type cannot be resolved or its payload does not
    /// deserialize to an <see cref="IEvent"/>.
    /// </exception>
    public async Task<IReadOnlyList<IEvent>> GetEventsAsync(string aggregateId)
    {
        // Order by the per-aggregate version rather than the wall-clock timestamp:
        // timestamps can tie or skew between writers, which would reorder the replay.
        var query = new QueryDefinition(
                "SELECT * FROM c WHERE c.aggregateId = @aggregateId ORDER BY c.version")
            .WithParameter("@aggregateId", aggregateId);

        var events = new List<IEvent>();

        // FeedIterator is paged: drain it page by page until no results remain.
        using var iterator = _eventContainer.GetItemQueryIterator<EventDocument>(query);
        while (iterator.HasMoreResults)
        {
            foreach (var item in await iterator.ReadNextAsync())
            {
                var eventType = Type.GetType($"MyApp.Events.{item.EventType}");
                if (eventType is null)
                {
                    throw new InvalidOperationException(
                        $"Unknown event type '{item.EventType}' for aggregate {aggregateId}");
                }

                if (JsonSerializer.Deserialize(item.EventData, eventType) is not IEvent @event)
                {
                    throw new InvalidOperationException(
                        $"Stored event '{item.EventType}' for aggregate {aggregateId} could not be deserialized");
                }

                events.Add(@event);
            }
        }

        return events;
    }

    /// <summary>
    /// Creates a new <typeparamref name="T"/> and applies every stored event of the
    /// aggregate to it in order.
    /// </summary>
    /// <typeparam name="T">Aggregate type with a public parameterless constructor.</typeparam>
    /// <param name="aggregateId">Identifier of the aggregate to rebuild.</param>
    /// <returns>The aggregate after all stored events have been applied.</returns>
    public async Task<T> GetAggregateAsync<T>(string aggregateId) where T : AggregateRoot, new()
    {
        var events = await GetEventsAsync(aggregateId);
        var aggregate = new T();

        foreach (var @event in events)
        {
            // NOTE(review): AggregateRoot.Apply is declared protected in this file, so this call
            // needs a publicly accessible replay entry point on the aggregate — confirm.
            aggregate.Apply(@event);
        }

        return aggregate;
    }
}
Aggregate Root Pattern
/// <summary>
/// Base class for event-sourced aggregates. Concrete aggregates change state only inside
/// <see cref="Apply"/>; new events go through <see cref="RaiseEvent"/>, which applies them
/// and records them as uncommitted.
/// </summary>
public abstract class AggregateRoot
{
    // Events raised since the last ClearUncommittedEvents call.
    private readonly List<IEvent> _uncommittedEvents = new();

    /// <summary>Identifier of the aggregate; assigned by derived classes.</summary>
    public string Id { get; protected set; }

    /// <summary>Version counter; maintained by derived classes.</summary>
    public int Version { get; protected set; }

    /// <summary>Mutates the aggregate's state according to <paramref name="event"/>.</summary>
    /// <param name="event">The event to apply.</param>
    protected abstract void Apply(IEvent @event);

    /// <summary>Returns a read-only view of the events raised but not yet cleared.</summary>
    public IReadOnlyList<IEvent> GetUncommittedEvents() => _uncommittedEvents.AsReadOnly();

    /// <summary>Empties the list of uncommitted events.</summary>
    public void ClearUncommittedEvents() => _uncommittedEvents.Clear();

    /// <summary>
    /// Replays already-stored events onto this aggregate without recording them as
    /// uncommitted. Gives code outside the class hierarchy a way to rehydrate an aggregate,
    /// since <see cref="Apply"/> itself is protected.
    /// </summary>
    /// <param name="history">Events to apply, in the order they should be replayed.</param>
    /// <exception cref="ArgumentNullException"><paramref name="history"/> is null.</exception>
    public void LoadFromHistory(IEnumerable<IEvent> history)
    {
        if (history is null)
        {
            throw new ArgumentNullException(nameof(history));
        }

        foreach (var @event in history)
        {
            Apply(@event);
        }
    }

    /// <summary>Applies a new event and records it as uncommitted.</summary>
    /// <param name="event">The event to raise.</param>
    /// <exception cref="ArgumentNullException"><paramref name="event"/> is null.</exception>
    protected void RaiseEvent(IEvent @event)
    {
        // Reject null up front so a null entry never lands in the uncommitted list.
        if (@event is null)
        {
            throw new ArgumentNullException(nameof(@event));
        }

        Apply(@event);
        _uncommittedEvents.Add(@event);
    }
}
/// <summary>
/// Order aggregate: command methods validate against the current state and raise events;
/// <see cref="Apply"/> folds each event into the private <c>OrderState</c>.
/// </summary>
public class OrderAggregate : AggregateRoot
{
    private OrderState _state = new();

    /// <summary>Raises an <c>OrderCreatedEvent</c> with a newly generated order id.</summary>
    /// <param name="customerId">Customer placing the order.</param>
    /// <param name="items">Initial order lines; the total is the sum of price × quantity.</param>
    /// <exception cref="ArgumentNullException"><paramref name="items"/> is null.</exception>
    /// <exception cref="InvalidOperationException">The order status is not <c>None</c>.</exception>
    public void CreateOrder(string customerId, List<OrderItem> items)
    {
        if (items is null)
        {
            throw new ArgumentNullException(nameof(items));
        }

        if (_state.Status != OrderStatus.None)
        {
            throw new InvalidOperationException("Order already created");
        }

        // Give the event its own list so later changes to the caller's list cannot
        // alter an event that has already been raised.
        var orderItems = new List<OrderItem>(items);

        RaiseEvent(new OrderCreatedEvent
        {
            OrderId = Guid.NewGuid().ToString(),
            CustomerId = customerId,
            Items = orderItems,
            Total = orderItems.Sum(i => i.Price * i.Quantity),
            CreatedAt = DateTime.UtcNow
        });
    }

    /// <summary>Raises an <c>ItemAddedEvent</c> for the given product.</summary>
    /// <param name="product">Product to add; its id, name and price are copied into the event.</param>
    /// <param name="quantity">Number of units; must be positive.</param>
    /// <exception cref="ArgumentNullException"><paramref name="product"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="quantity"/> is zero or negative.</exception>
    /// <exception cref="InvalidOperationException">The order status is not <c>Pending</c>.</exception>
    public void AddItem(Product product, int quantity)
    {
        if (product is null)
        {
            throw new ArgumentNullException(nameof(product));
        }

        if (quantity <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(quantity), quantity, "Quantity must be positive");
        }

        if (_state.Status != OrderStatus.Pending)
        {
            throw new InvalidOperationException("Cannot add items to this order");
        }

        RaiseEvent(new ItemAddedEvent
        {
            OrderId = Id,
            ProductId = product.Id,
            ProductName = product.Name,
            Quantity = quantity,
            Price = product.Price
        });
    }

    /// <summary>Raises a <c>PaymentProcessedEvent</c> for the current order total.</summary>
    /// <param name="paymentId">Identifier of the payment.</param>
    /// <param name="paymentMethod">Payment method recorded on the event.</param>
    /// <exception cref="InvalidOperationException">The order status is not <c>Pending</c>.</exception>
    public void MarkAsPaid(string paymentId, string paymentMethod)
    {
        if (_state.Status != OrderStatus.Pending)
        {
            throw new InvalidOperationException("Order cannot be paid");
        }

        RaiseEvent(new PaymentProcessedEvent
        {
            OrderId = Id,
            PaymentId = paymentId,
            PaymentMethod = paymentMethod,
            Amount = _state.Total,
            PaidAt = DateTime.UtcNow
        });
    }

    /// <summary>
    /// Folds one event into the order state and increments <c>Version</c> for every
    /// recognized event type. Unrecognized events are ignored.
    /// </summary>
    /// <param name="event">The event to apply.</param>
    protected override void Apply(IEvent @event)
    {
        switch (@event)
        {
            case OrderCreatedEvent e:
                Id = e.OrderId;
                _state = new OrderState
                {
                    CustomerId = e.CustomerId,
                    // Copy the list: the ItemAddedEvent case appends to the state's list, and
                    // sharing the event's list would rewrite the OrderCreatedEvent itself,
                    // so a later replay would count the added items twice.
                    Items = e.Items is null ? new List<OrderItem>() : new List<OrderItem>(e.Items),
                    Total = e.Total,
                    Status = OrderStatus.Pending,
                    CreatedAt = e.CreatedAt
                };
                Version++;
                break;

            case ItemAddedEvent e:
                _state.Items.Add(new OrderItem
                {
                    ProductId = e.ProductId,
                    ProductName = e.ProductName,
                    Quantity = e.Quantity,
                    Price = e.Price
                });
                _state.Total += e.Price * e.Quantity;
                Version++;
                break;

            case PaymentProcessedEvent e:
                _state.Status = OrderStatus.Paid;
                _state.PaymentId = e.PaymentId;
                _state.PaidAt = e.PaidAt;
                Version++;
                break;
        }
    }
}
CQRS with Event Sourcing
Read Model Projections
/// <summary>
/// Projects order events into <c>OrderReadModel</c> documents stored in a Cosmos DB container.
/// </summary>
public class OrderReadModelProjection
{
    // NOTE(review): nothing in this snippet assigns _readModelContainer; presumably it is
    // injected elsewhere — confirm before use.
    private readonly Container _readModelContainer;

    /// <summary>Creates the read model for a newly created order with status "Pending".</summary>
    /// <param name="event">The order-created event to project.</param>
    public async Task Project(OrderCreatedEvent @event)
    {
        var orderReadModel = new OrderReadModel
        {
            Id = @event.OrderId,
            CustomerId = @event.CustomerId,
            // The read model holds OrderItemReadModel entries, so map each order line
            // instead of assigning the event's List<OrderItem> directly.
            Items = @event.Items.ConvertAll(item => new OrderItemReadModel
            {
                ProductId = item.ProductId,
                ProductName = item.ProductName,
                Quantity = item.Quantity,
                Price = item.Price
            }),
            Total = @event.Total,
            Status = "Pending",
            CreatedAt = @event.CreatedAt
        };

        try
        {
            await _readModelContainer.CreateItemAsync(orderReadModel);
        }
        catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.Conflict)
        {
            // The document already exists, so this event was projected before. Ignore the
            // duplicate rather than fail, and leave any later updates to the document intact.
        }
    }

    /// <summary>Adds the event's item to the order's read model and increases its total.</summary>
    /// <param name="event">The item-added event to project.</param>
    public async Task Project(ItemAddedEvent @event)
    {
        // NOTE(review): applying the same ItemAddedEvent twice adds the line twice; detecting
        // that needs a last-applied marker the read model does not show — confirm.
        var order = await LoadOrderAsync(@event.OrderId);

        order.Items.Add(new OrderItemReadModel
        {
            ProductId = @event.ProductId,
            ProductName = @event.ProductName,
            Quantity = @event.Quantity,
            Price = @event.Price
        });
        order.Total += @event.Price * @event.Quantity;

        await SaveOrderAsync(order, @event.OrderId);
    }

    /// <summary>Sets the order's read-model status to "Paid".</summary>
    /// <param name="event">The payment-processed event to project.</param>
    public async Task Project(PaymentProcessedEvent @event)
    {
        var order = await LoadOrderAsync(@event.OrderId);

        order.Status = "Paid";

        await SaveOrderAsync(order, @event.OrderId);
    }

    // Reads the read model by id, using the order id as the partition key value.
    private async Task<OrderReadModel> LoadOrderAsync(string orderId)
    {
        var response = await _readModelContainer.ReadItemAsync<OrderReadModel>(
            orderId, new PartitionKey(orderId));

        // ReadItemAsync returns an ItemResponse wrapper; the document is in Resource.
        return response.Resource;
    }

    // Replaces the stored read model, using the order id as the partition key value.
    private async Task SaveOrderAsync(OrderReadModel order, string orderId)
    {
        await _readModelContainer.ReplaceItemAsync(order, orderId, new PartitionKey(orderId));
    }
}
Azure Functions Projection
/// <summary>
/// Azure Function triggered by the Cosmos DB change feed of the "Events" collection;
/// deserializes each event document and hands it to <c>OrderReadModelProjection</c>.
/// </summary>
public class OrderProjectionFunction
{
    /// <summary>Projects every event document in the delivered batch.</summary>
    /// <param name="events">Batch of changed event documents supplied by the trigger.</param>
    /// <param name="log">Logger for per-event diagnostics.</param>
    [FunctionName("ProjectOrderEvent")]
    public async Task Run(
        [CosmosDBTrigger(
            databaseName: "EventStore",
            collectionName: "Events",
            ConnectionStringSetting = "CosmosConnection",
            StartFromBeginning = true)]
        IReadOnlyList<OrderEventDocument> events,
        ILogger log)
    {
        // NOTE(review): OrderReadModelProjection, as shown in this file, has no way to receive
        // its container through this parameterless construction — confirm how it is wired.
        var projection = new OrderReadModelProjection();

        foreach (var eventDoc in events)
        {
            var eventType = Type.GetType($"MyApp.Events.{eventDoc.EventType}");
            if (eventType is null)
            {
                // An unresolved type would otherwise reach Deserialize as null and fail the
                // whole batch; skip the document and keep processing the rest.
                log.LogWarning("Skipping event with unknown type {EventType} for aggregate {AggregateId}",
                    eventDoc.EventType, eventDoc.AggregateId);
                continue;
            }

            var @event = JsonSerializer.Deserialize(eventDoc.EventData, eventType);

            switch (@event)
            {
                case OrderCreatedEvent e:
                    await projection.Project(e);
                    break;
                case ItemAddedEvent e:
                    await projection.Project(e);
                    break;
                case PaymentProcessedEvent e:
                    await projection.Project(e);
                    break;
                default:
                    // Nothing was projected, so do not emit the "Projected" message below.
                    log.LogWarning("No projection for event type {EventType} for aggregate {AggregateId}",
                        eventDoc.EventType, eventDoc.AggregateId);
                    continue;
            }

            log.LogInformation("Projected event {EventType} for order {OrderId}",
                eventDoc.EventType, eventDoc.AggregateId);
        }
    }
}
Event Schema Design
Event Structure
/// <summary>
/// Contract shared by all events: read-only identity, aggregate, version and time metadata.
/// </summary>
public interface IEvent
{
/// <summary>Identifier of this individual event.</summary>
string EventId { get; }
/// <summary>Identifier of the aggregate the event belongs to.</summary>
string AggregateId { get; }
/// <summary>
/// Version number carried by the event.
/// NOTE(review): the interface does not say whether this is a schema version or a
/// per-aggregate sequence number — confirm.
/// </summary>
int Version { get; }
/// <summary>Time associated with the event.</summary>
DateTime Timestamp { get; }
}
/// <summary>
/// Convenience base for events: supplies settable metadata and leaves the aggregate id
/// to the concrete event type.
/// </summary>
public abstract class EventBase : IEvent
{
    /// <summary>Gives every new event a fresh GUID id and the current UTC time.</summary>
    protected EventBase()
    {
        EventId = Guid.NewGuid().ToString();
        Timestamp = DateTime.UtcNow;
    }

    /// <summary>Identifier of this event; a new GUID string unless overwritten.</summary>
    public string EventId { get; set; }

    /// <summary>Identifier of the owning aggregate, provided by the concrete event.</summary>
    public abstract string AggregateId { get; }

    /// <summary>Version number carried by the event.</summary>
    public int Version { get; set; }

    /// <summary>Event time; the UTC time of construction unless overwritten.</summary>
    public DateTime Timestamp { get; set; }
}
/// <summary>
/// Raised when an order is created; carries the initial order lines and total plus
/// optional tracing metadata.
/// </summary>
public class OrderCreatedEvent : EventBase
{
    /// <summary>Identifier of the created order.</summary>
    public string OrderId { get; set; }

    /// <summary>
    /// The order id doubles as the aggregate id. The base property is get-only, so an
    /// override cannot add a setter; set <see cref="OrderId"/> instead.
    /// </summary>
    public override string AggregateId => OrderId;

    /// <summary>Customer who placed the order.</summary>
    public string CustomerId { get; set; }

    /// <summary>Order lines at creation time.</summary>
    public List<OrderItem> Items { get; set; }

    /// <summary>Order total at creation time.</summary>
    public decimal Total { get; set; }

    /// <summary>When the order was created.</summary>
    public DateTime CreatedAt { get; set; }

    // Metadata for debugging
    public string CorrelationId { get; set; }
    public string CausationId { get; set; }
    public Dictionary<string, string> CustomMetadata { get; set; }
}
Versioning Strategy
/// <summary>
/// Upgrades events stored in an older schema version to the latest one.
/// </summary>
public class EventUpgrader
{
    /// <summary>
    /// Returns the latest-schema form of <paramref name="event"/>.
    /// </summary>
    /// <param name="event">The event as it was loaded.</param>
    /// <param name="fromVersion">Schema version the event was stored in.</param>
    /// <returns>
    /// The V2 form when <paramref name="fromVersion"/> is 1 and a migration exists;
    /// otherwise the same instance, unchanged.
    /// </returns>
    public IEvent Upgrade(IEvent @event, int fromVersion)
    {
        // Version 1 is the only one with a migration; 2 is current and any other
        // value is passed through untouched.
        if (fromVersion == 1)
        {
            return UpgradeV1ToV2(@event);
        }

        return @event;
    }

    // Migrates an OrderCreatedEventV1 to V2; any other event is returned as-is.
    private IEvent UpgradeV1ToV2(IEvent @event)
    {
        if (@event is not OrderCreatedEventV1 legacy)
        {
            return @event;
        }

        var upgraded = new OrderCreatedEventV2
        {
            // Carried over from V1
            EventId = legacy.EventId,
            AggregateId = legacy.AggregateId,
            Version = 2,
            Timestamp = legacy.Timestamp,
            CustomerId = legacy.CustomerId,
            Items = legacy.Items,
            Total = legacy.Total,
            CreatedAt = legacy.CreatedAt,
            // Defaults for the fields V2 introduced
            Currency = "USD",
            Source = "Web"
        };

        return upgraded;
    }
}
Best Practices
Implementation Checklist
| Practice | Description |
|---|---|
| Immutable events | Never modify stored events |
| Unique IDs | Each event needs a unique identifier |
| Version events | Include version for compatibility |
| Idempotent projections | Handle duplicate events gracefully |
| Async projections | Don't block write path |
| Snapshot for performance | Periodically snapshot aggregate state |
Snapshot Strategy
/// <summary>
/// Writes and reads order aggregate snapshots in a Cosmos DB container.
/// </summary>
public class SnapshotProjection
{
    // NOTE(review): nothing in this snippet assigns _snapshotContainer; presumably it is
    // injected elsewhere — confirm before use.
    private readonly Container _snapshotContainer;

    /// <summary>Upserts a snapshot of the aggregate's current state and version.</summary>
    /// <param name="aggregate">The aggregate to snapshot.</param>
    public async Task CreateSnapshot(OrderAggregate aggregate)
    {
        var snapshot = new OrderSnapshot
        {
            AggregateId = aggregate.Id,
            Version = aggregate.Version,
            State = aggregate.GetState(), // Serialize current state
            SnapshotTime = DateTime.UtcNow
        };

        await _snapshotContainer.UpsertItemAsync(snapshot);
    }

    /// <summary>
    /// Restores an aggregate from its highest-version snapshot.
    /// </summary>
    /// <param name="aggregateId">Identifier of the aggregate to restore.</param>
    /// <returns>
    /// The aggregate restored to the snapshot's state and version, or <c>null</c> when no
    /// snapshot exists and a full replay is needed.
    /// </returns>
    public async Task<OrderAggregate> RestoreFromSnapshot(string aggregateId)
    {
        // Bind the id as a query parameter instead of interpolating it into the SQL text,
        // so a crafted aggregate id cannot change the query.
        var query = new QueryDefinition(
                "SELECT TOP 1 * FROM c WHERE c.aggregateId = @aggregateId ORDER BY c.version DESC")
            .WithParameter("@aggregateId", aggregateId);

        // NOTE(review): snapshots are written as OrderSnapshot but read as SnapshotDocument —
        // confirm the two types share the same document shape.
        SnapshotDocument snapshot = null;

        // FeedIterator is paged and an early page may be empty: keep reading until a
        // document shows up or the results are exhausted.
        using var iterator = _snapshotContainer.GetItemQueryIterator<SnapshotDocument>(query);
        while (snapshot is null && iterator.HasMoreResults)
        {
            foreach (var candidate in await iterator.ReadNextAsync())
            {
                snapshot = candidate;
                break;
            }
        }

        if (snapshot is null)
        {
            return null; // No snapshot, need full replay
        }

        var aggregate = new OrderAggregate();
        aggregate.RestoreFromSnapshot(snapshot.State, snapshot.Version);
        return aggregate;
    }
}
Related Topics
- Saga Pattern — Distributed transactions
- Outbox Pattern — Reliable event publishing
- Distributed Tracing — Event correlation
Azure Integration Hub - Architect Level Enterprise Integration Patterns