Event Sourcing Architecture

Storing State as a Sequence of Events


Introduction

Event Sourcing is an architectural pattern in which application state is stored not as a current snapshot but as a sequence of immutable events. Instead of storing what the current state is, you store how it got there. This approach provides a complete audit trail, lets you rebuild or analyze state at any point in time by replaying events, and makes every state change explicit in the domain model.

This guide covers:

  • Event Sourcing fundamentals — Understanding the pattern
  • Implementation approaches — Event stores and projections
  • Azure implementation — Cosmos DB, Event Hub integration
  • CQRS integration — Read and write optimization
  • Event schema design — Versioning and structure
  • Best practices — Performance and consistency

Understanding Event Sourcing

Traditional vs Event Sourcing

┌─────────────────────────────────────────────────────────────────────┐
│                 TRADITIONAL vs EVENT SOURCING                       │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   TRADITIONAL (State-Based)                                         │
│   ───────────────────────────                                       │
│                                                                     │
│   Current State (Single Record):                                    │
│   ┌─────────────────────────────────────┐                           │
│   │ Order #12345                        │                           │
│   │ Status: Shipped                     │                           │
│   │ Customer: John Doe                  │                           │
│   │ Total: $99.99                       │                           │
│   │ Items: 2                            │                           │
│   └─────────────────────────────────────┘                           │
│                                                                     │
│   ❌ No history of how we got here                                  │
│   ❌ Can't recreate state from audit                                │
│   ❌ Hard to track changes                                          │
│                                                                     │
│   EVENT SOURCING (Event-Based)                                      │
│   ───────────────────────────                                       │
│                                                                     │
│   Event Store (Sequence):                                           │
│   ┌─────────────────────────────────────┐                           │
│   │ OrderCreated(#12345, John, $99.99)  │                           │
│   │ ItemAdded(Product: A, Qty: 1)       │                           │
│   │ ItemAdded(Product: B, Qty: 1)       │                           │
│   │ PaymentProcessed(Visa ****1234)     │                           │
│   │ ShipmentCreated(FedEx: TRK123)      │                           │
│   │ StatusChanged(Shipped)              │                           │
│   └─────────────────────────────────────┘                           │
│                                                                     │
│   ✓ Complete audit trail                                            │
│   ✓ Recreate state by replaying                                     │
│   ✓ Temporal queries (state at any point)                           │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
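The "recreate state by replaying" and "temporal queries" points above can be sketched as a fold over the event list. This is a minimal illustration, not the implementation used later in this guide: the record types (`OrderEvent`, `OrderCreated`, `ItemAdded`, `OrderShipped`, `OrderView`) and the `OrderReplay` helper are hypothetical names, and C# 9 or later is assumed for records.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical event types for illustration; not the IEvent hierarchy used later.
public abstract record OrderEvent(DateTime OccurredAt);
public record OrderCreated(DateTime OccurredAt, string Customer) : OrderEvent(OccurredAt);
public record ItemAdded(DateTime OccurredAt, string Product, int Quantity, decimal Price)
    : OrderEvent(OccurredAt);
public record OrderShipped(DateTime OccurredAt) : OrderEvent(OccurredAt);

// Immutable view of an order, derived purely from events.
public record OrderView(string Customer, string Status, int ItemCount, decimal Total)
{
    public static readonly OrderView Empty = new("", "None", 0, 0m);
}

public static class OrderReplay
{
    // Pure transition function: current view + one event -> next view.
    public static OrderView Apply(OrderView view, OrderEvent e) => e switch
    {
        OrderCreated c => view with { Customer = c.Customer, Status = "Pending" },
        ItemAdded i => view with
        {
            ItemCount = view.ItemCount + i.Quantity,
            Total = view.Total + i.Price * i.Quantity
        },
        OrderShipped => view with { Status = "Shipped" },
        _ => view // Unknown events are ignored rather than failing the replay.
    };

    // Current state: fold every event in the order given (oldest first).
    public static OrderView Replay(IEnumerable<OrderEvent> events) =>
        events.Aggregate(OrderView.Empty, Apply);

    // Temporal query: fold only the events recorded at or before a point in time.
    public static OrderView ReplayAsOf(IEnumerable<OrderEvent> events, DateTime asOf) =>
        Replay(events.Where(e => e.OccurredAt <= asOf));
}
```

Because `Apply` has no side effects, replaying the same events always yields the same view, which is what makes the audit trail and point-in-time queries trustworthy.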

Event Sourcing Flow

┌─────────────────────────────────────────────────────────────────────┐
│                    EVENT SOURCING FLOW                              │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   ┌──────────────────────────────────────────────────────────────┐  │
│   │                      EVENT STORE                             │  │
│   │   ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐            │  │
│   │   │ Event 1 │ │ Event 2 │ │ Event 3 │ │ Event N │            │  │
│   │   │Created  │ │Added    │ │Paid     │ │ Updated │            │  │
│   │   └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘            │  │
│   │        │           │           │           │                 │  │
│   │        └───────────┴───────────┴───────────┘                 │  │
│   │                      │                                       │  │
│   │           Append-only log                                    │  │
│   └──────────────────────┼───────────────────────────────────────┘  │
│                          │                                          │
│   ┌──────────────────────┼───────────────────────────────────────┐  │
│   │              PROJECTIONS                                     │  │
│   │                                                              │  │
│   │   ┌─────────────┐  ┌──────────────┐  ┌─────────────┐         │  │
│   │   │ Current     │  │ Dashboard    │  │ Analytics   │         │  │
│   │   │ State       │  │ Read Model   │  │ Model       │         │  │
│   │   │ (Order #1)  │  │ (Orders List)│  │ (Stats)     │         │  │
│   │   └─────────────┘  └──────────────┘  └─────────────┘         │  │
│   │                                                              │  │
│   └──────────────────────────────────────────────────────────────┘  │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

Azure Event Sourcing Implementation

Cosmos DB as Event Store

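// Assumes the Microsoft.Azure.Cosmos SDK (v3) configured with a camelCase naming
// policy, so AggregateId is stored as "aggregateId" and Id as the required "id",
// and a container partitioned by /aggregateId.
using System;
using System.Collections.Generic;
using System.Net;
using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;

public class EventStore
{
    private readonly Container _eventContainer;

    public EventStore(Container eventContainer) => _eventContainer = eventContainer;

    // expectedVersion is the aggregate version the caller loaded; the new event is
    // written at expectedVersion + 1.
    public async Task AppendEventAsync(
        string aggregateId, string aggregateType, IEvent @event, int expectedVersion)
    {
        var version = expectedVersion + 1;
        var eventDocument = new EventDocument
        {
            // A deterministic id makes Cosmos DB answer 409 Conflict when two writers
            // target the same version of the same aggregate. A random GUID would not.
            Id = $"{aggregateId}:{version}",
            AggregateId = aggregateId,
            AggregateType = aggregateType,
            EventType = @event.GetType().Name,
            // Serialize by runtime type so derived-class properties are included.
            EventData = JsonSerializer.Serialize(@event, @event.GetType()),
            Timestamp = DateTime.UtcNow,
            Version = version
        };

        try
        {
            await _eventContainer.CreateItemAsync(eventDocument, new PartitionKey(aggregateId));
        }
        catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.Conflict)
        {
            throw new ConcurrencyException($"Version conflict for aggregate {aggregateId}");
        }
    }

    public async Task<IReadOnlyList<IEvent>> GetEventsAsync(string aggregateId)
    {
        // Order by version, not timestamp: clocks can skew and timestamps can tie.
        var query = new QueryDefinition(
            "SELECT * FROM c WHERE c.aggregateId = @aggregateId ORDER BY c.version")
            .WithParameter("@aggregateId", aggregateId);

        var events = new List<IEvent>();

        using var iterator = _eventContainer.GetItemQueryIterator<EventDocument>(
            query,
            requestOptions: new QueryRequestOptions { PartitionKey = new PartitionKey(aggregateId) });

        // FeedIterator is paged and is not an IAsyncEnumerable, so drain it page by page.
        while (iterator.HasMoreResults)
        {
            foreach (var item in await iterator.ReadNextAsync())
            {
                var eventType = Type.GetType($"MyApp.Events.{item.EventType}")
                    ?? throw new InvalidOperationException($"Unknown event type {item.EventType}");
                events.Add((IEvent)JsonSerializer.Deserialize(item.EventData, eventType)!);
            }
        }

        return events;
    }

    public async Task<T> GetAggregateAsync<T>(string aggregateId) where T : AggregateRoot, new()
    {
        var events = await GetEventsAsync(aggregateId);

        var aggregate = new T();
        // Apply is protected on AggregateRoot, so this assumes the base class exposes
        // a public LoadFromHistory method that replays events through Apply.
        aggregate.LoadFromHistory(events);

        return aggregate;
    }
}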
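The version conflict handled above is an optimistic concurrency check: a writer states which version of the stream it based its decision on, and the append is rejected if the stream has moved on since. The in-memory sketch below isolates that rule without any Cosmos DB dependency. `InMemoryEventStore` and `StreamConcurrencyException` are hypothetical names introduced only for this illustration.

```csharp
using System;
using System.Collections.Generic;

// Thrown when another writer appended to the stream first.
public class StreamConcurrencyException : Exception
{
    public StreamConcurrencyException(string message) : base(message) { }
}

// In-memory stand-in for an event store; it illustrates the version check only.
public class InMemoryEventStore
{
    private readonly Dictionary<string, List<object>> _streams = new();
    private readonly object _gate = new();

    // expectedVersion is the number of events the caller saw when it loaded the aggregate.
    // Returns the new stream version after the append.
    public int Append(string aggregateId, int expectedVersion, IReadOnlyCollection<object> newEvents)
    {
        lock (_gate)
        {
            if (!_streams.TryGetValue(aggregateId, out var stream))
            {
                stream = new List<object>();
                _streams[aggregateId] = stream;
            }

            // Reject the write if someone else appended since the caller's read.
            if (stream.Count != expectedVersion)
                throw new StreamConcurrencyException(
                    $"Expected version {expectedVersion} but stream '{aggregateId}' is at {stream.Count}");

            stream.AddRange(newEvents);
            return stream.Count;
        }
    }

    // Returns a copy so callers cannot mutate the stored history.
    public IReadOnlyList<object> Read(string aggregateId)
    {
        lock (_gate)
        {
            return _streams.TryGetValue(aggregateId, out var stream)
                ? stream.ToArray()
                : Array.Empty<object>();
        }
    }
}
```

A caller that receives the exception would typically reload the aggregate, re-run the command against the fresh state, and retry the append.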

Aggregate Root Pattern

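public abstract class AggregateRoot
{
    private readonly List<IEvent> _uncommittedEvents = new();

    public string Id { get; protected set; } = string.Empty;
    public int Version { get; protected set; }

    // Mutates state only. It must be deterministic and free of side effects so that
    // replaying history always yields the same result.
    protected abstract void Apply(IEvent @event);

    public IReadOnlyList<IEvent> GetUncommittedEvents() => _uncommittedEvents.AsReadOnly();

    public void ClearUncommittedEvents() => _uncommittedEvents.Clear();

    // Rebuilds the aggregate from stored events without marking them as uncommitted.
    public void LoadFromHistory(IEnumerable<IEvent> history)
    {
        foreach (var @event in history)
            Apply(@event);
    }

    // Records a new event: apply it to state, then queue it for persistence.
    protected void RaiseEvent(IEvent @event)
    {
        Apply(@event);
        _uncommittedEvents.Add(@event);
    }
}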

public class OrderAggregate : AggregateRoot
{
    private OrderState _state = new();

    public void CreateOrder(string customerId, List<OrderItem> items)
    {
        if (_state.Status != OrderStatus.None)
            throw new InvalidOperationException("Order already created");

        RaiseEvent(new OrderCreatedEvent
        {
            OrderId = Guid.NewGuid().ToString(),
            CustomerId = customerId,
            Items = items,
            Total = items.Sum(i => i.Price * i.Quantity),
            CreatedAt = DateTime.UtcNow
        });
    }

    public void AddItem(Product product, int quantity)
    {
        if (_state.Status != OrderStatus.Pending)
            throw new InvalidOperationException("Cannot add items to this order");

        RaiseEvent(new ItemAddedEvent
        {
            OrderId = Id,
            ProductId = product.Id,
            ProductName = product.Name,
            Quantity = quantity,
            Price = product.Price
        });
    }

    public void MarkAsPaid(string paymentId, string paymentMethod)
    {
        if (_state.Status != OrderStatus.Pending)
            throw new InvalidOperationException("Order cannot be paid");

        RaiseEvent(new PaymentProcessedEvent
        {
            OrderId = Id,
            PaymentId = paymentId,
            PaymentMethod = paymentMethod,
            Amount = _state.Total,
            PaidAt = DateTime.UtcNow
        });
    }

    protected override void Apply(IEvent @event)
    {
        switch (@event)
        {
            case OrderCreatedEvent e:
                Id = e.OrderId;
                _state = new OrderState
                {
                    CustomerId = e.CustomerId,
                    // Copy the list so later ItemAdded events do not mutate the stored event.
                    Items = new List<OrderItem>(e.Items),
                    Total = e.Total,
                    Status = OrderStatus.Pending,
                    CreatedAt = e.CreatedAt
                };
                Version++;
                break;

            case ItemAddedEvent e:
                _state.Items.Add(new OrderItem
                {
                    ProductId = e.ProductId,
                    ProductName = e.ProductName,
                    Quantity = e.Quantity,
                    Price = e.Price
                });
                _state.Total += e.Price * e.Quantity;
                Version++;
                break;

            case PaymentProcessedEvent e:
                _state.Status = OrderStatus.Paid;
                _state.PaymentId = e.PaymentId;
                _state.PaidAt = e.PaidAt;
                Version++;
                break;
        }
    }
}

CQRS with Event Sourcing

Read Model Projections

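// Requires: using System.Linq; using Microsoft.Azure.Cosmos;
public class OrderReadModelProjection
{
    // Assumed to be supplied by a constructor (omitted here) and partitioned by order id.
    private readonly Container _readModelContainer;

    public async Task Project(OrderCreatedEvent @event)
    {
        var orderReadModel = new OrderReadModel
        {
            Id = @event.OrderId,
            CustomerId = @event.CustomerId,
            // Map domain items to the read-model item type used by the other handlers.
            Items = @event.Items.Select(i => new OrderItemReadModel
            {
                ProductId = i.ProductId,
                ProductName = i.ProductName,
                Quantity = i.Quantity,
                Price = i.Price
            }).ToList(),
            Total = @event.Total,
            Status = "Pending",
            CreatedAt = @event.CreatedAt
        };

        // Upsert rather than create, so a redelivered event does not fail with 409 Conflict.
        await _readModelContainer.UpsertItemAsync(orderReadModel, new PartitionKey(@event.OrderId));
    }

    public async Task Project(ItemAddedEvent @event)
    {
        // ReadItemAsync returns an ItemResponse<T>; the document itself is in Resource.
        var response = await _readModelContainer.ReadItemAsync<OrderReadModel>(
            @event.OrderId, new PartitionKey(@event.OrderId));
        var order = response.Resource;

        order.Items.Add(new OrderItemReadModel
        {
            ProductId = @event.ProductId,
            ProductName = @event.ProductName,
            Quantity = @event.Quantity,
            Price = @event.Price
        });
        order.Total += @event.Price * @event.Quantity;

        // Not idempotent as written: a redelivered event adds the item twice unless the
        // read model also records the stream position of the last event it applied.
        await _readModelContainer.ReplaceItemAsync(
            order, @event.OrderId, new PartitionKey(@event.OrderId));
    }

    public async Task Project(PaymentProcessedEvent @event)
    {
        var response = await _readModelContainer.ReadItemAsync<OrderReadModel>(
            @event.OrderId, new PartitionKey(@event.OrderId));
        var order = response.Resource;

        order.Status = "Paid"; // Setting a fixed value is safe to repeat.

        await _readModelContainer.ReplaceItemAsync(
            order, @event.OrderId, new PartitionKey(@event.OrderId));
    }
}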

Azure Functions Projection

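// Assumes the in-process model with Microsoft.Azure.WebJobs.Extensions.CosmosDB 4.x,
// which binds typed documents and uses the containerName / Connection property names.
public class OrderProjectionFunction
{
    private readonly OrderReadModelProjection _projection;

    // The projection is assumed to be registered with dependency injection.
    public OrderProjectionFunction(OrderReadModelProjection projection) => _projection = projection;

    [FunctionName("ProjectOrderEvent")]
    public async Task Run(
        [CosmosDBTrigger(
            databaseName: "EventStore",
            containerName: "Events",
            Connection = "CosmosConnection",
            LeaseContainerName = "leases",
            CreateLeaseContainerIfNotExists = true,
            StartFromBeginning = true)]
        IReadOnlyList<EventDocument> events,
        ILogger log)
    {
        // The change feed preserves order within a partition key, so handling each
        // batch sequentially keeps per-aggregate ordering intact.
        foreach (var eventDoc in events)
        {
            var eventType = Type.GetType($"MyApp.Events.{eventDoc.EventType}");
            if (eventType is null)
            {
                log.LogWarning("Skipping unknown event type {EventType}", eventDoc.EventType);
                continue;
            }

            var @event = JsonSerializer.Deserialize(eventDoc.EventData, eventType);

            switch (@event)
            {
                case OrderCreatedEvent e:
                    await _projection.Project(e);
                    break;
                case ItemAddedEvent e:
                    await _projection.Project(e);
                    break;
                case PaymentProcessedEvent e:
                    await _projection.Project(e);
                    break;
            }

            log.LogInformation("Projected event {EventType} for order {OrderId}",
                eventDoc.EventType, eventDoc.AggregateId);
        }
    }
}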

Event Schema Design

Event Structure

public interface IEvent
{
    string EventId { get; }
    string AggregateId { get; }
    int Version { get; }
    DateTime Timestamp { get; }
}

public abstract class EventBase : IEvent
{
    public string EventId { get; set; } = Guid.NewGuid().ToString();
    public abstract string AggregateId { get; }
    public int Version { get; set; }
    public DateTime Timestamp { get; set; } = DateTime.UtcNow;
}

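public class OrderCreatedEvent : EventBase
{
    // For order events the aggregate id is the order id. The base property is
    // get-only, so it is overridden as a computed value rather than given a setter.
    public string OrderId { get; set; }
    public override string AggregateId => OrderId;
    public string CustomerId { get; set; }
    public List<OrderItem> Items { get; set; } = new();
    public decimal Total { get; set; }
    public DateTime CreatedAt { get; set; }

    // Metadata for tracing and debugging
    public string CorrelationId { get; set; } // Shared by all events in one business flow
    public string CausationId { get; set; }   // Id of the command or event that caused this one
    public Dictionary<string, string> CustomMetadata { get; set; } = new();
}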

Versioning Strategy

public class EventUpgrader
{
    public IEvent Upgrade(IEvent @event, int fromVersion)
    {
        return fromVersion switch
        {
            1 => UpgradeV1ToV2(@event),
            2 => @event, // Already at latest
            _ => @event
        };
    }

    private IEvent UpgradeV1ToV2(IEvent @event)
    {
        // Add new fields introduced in V2
        if (@event is OrderCreatedEventV1 v1)
        {
            return new OrderCreatedEventV2
            {
                EventId = v1.EventId,
                OrderId = v1.OrderId, // AggregateId is get-only and derived from OrderId
                Version = 2,
                Timestamp = v1.Timestamp,
                CustomerId = v1.CustomerId,
                Items = v1.Items,
                Total = v1.Total,
                CreatedAt = v1.CreatedAt,
                // New V2 fields
                Currency = "USD",
                Source = "Web"
            };
        }
        
        return @event;
    }
}
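Once more than two schema versions exist, a single switch gets harder to maintain. One common alternative is to register one small step per version and apply the steps in sequence until the payload reaches the latest schema. The sketch below illustrates that idea under stated assumptions: `UpcasterChain` and the `OrderCreatedV1`/`V2`/`V3` payload records are hypothetical, and the schema version is assumed to be stored alongside each event document.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical payload shapes for three schema versions of the same event.
public record OrderCreatedV1(string OrderId, decimal Total);
public record OrderCreatedV2(string OrderId, decimal Total, string Currency);
public record OrderCreatedV3(string OrderId, decimal Total, string Currency, string Source);

public class UpcasterChain
{
    // Maps a schema version to the step that lifts a payload to the next version.
    private readonly Dictionary<int, Func<object, object>> _steps = new();

    public int LatestVersion { get; private set; } = 1;

    public UpcasterChain Register<TFrom, TTo>(int fromVersion, Func<TFrom, TTo> step)
        where TFrom : class
        where TTo : class
    {
        _steps[fromVersion] = payload => step((TFrom)payload);
        LatestVersion = Math.Max(LatestVersion, fromVersion + 1);
        return this;
    }

    // Applies steps one version at a time until the payload reaches the latest schema.
    public object Upgrade(object payload, int schemaVersion)
    {
        while (schemaVersion < LatestVersion)
        {
            if (!_steps.TryGetValue(schemaVersion, out var step))
                throw new InvalidOperationException(
                    $"No upcaster registered for schema version {schemaVersion}");

            payload = step(payload);
            schemaVersion++;
        }

        return payload;
    }
}
```

Registering a V1-to-V2 step that fills in `Currency = "USD"` and a V2-to-V3 step that fills in `Source = "Web"` lets a stored V1 payload be read as V3 without rewriting the stored event, which keeps the event store immutable.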

Best Practices

Implementation Checklist

Practice                    Description
──────────────────────────  ──────────────────────────────────────────
Immutable events            Never modify stored events
Unique IDs                  Each event needs a unique identifier
Version events              Include a version for compatibility
Idempotent projections      Handle duplicate events gracefully
Async projections           Don't block the write path
Snapshot for performance    Periodically snapshot aggregate state
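The "idempotent projections" item is the one most often missed, because change feeds and message brokers generally deliver at least once. A minimal way to handle duplicates, assuming each event carries its position in the aggregate stream, is to store the last applied position with the read-model row and skip anything at or below it. All names in this sketch (`ItemAddedEnvelope`, `OrderTotalRow`, `IdempotentOrderTotalProjection`) are hypothetical, and a dictionary stands in for the read-model container.

```csharp
using System.Collections.Generic;

// Hypothetical envelope: Version is the event's position in its aggregate stream.
public record ItemAddedEnvelope(string OrderId, int Version, decimal Price, int Quantity);

public class OrderTotalRow
{
    public decimal Total { get; set; }
    public int LastAppliedVersion { get; set; } // Checkpoint stored with the row.
}

public class IdempotentOrderTotalProjection
{
    private readonly Dictionary<string, OrderTotalRow> _rows = new();

    // Returns true if the event changed the read model, false if it was a duplicate.
    public bool Project(ItemAddedEnvelope e)
    {
        if (!_rows.TryGetValue(e.OrderId, out var row))
        {
            row = new OrderTotalRow();
            _rows[e.OrderId] = row;
        }

        // At-least-once delivery can redeliver events; skip anything already applied.
        if (e.Version <= row.LastAppliedVersion)
            return false;

        row.Total += e.Price * e.Quantity;
        row.LastAppliedVersion = e.Version;
        return true;
    }

    public decimal TotalFor(string orderId) =>
        _rows.TryGetValue(orderId, out var row) ? row.Total : 0m;
}
```

In a real store the total and the checkpoint should be written in the same document or transaction, so that a crash between the two cannot leave them out of step.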

Snapshot Strategy

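// Requires: using System.Linq; using Microsoft.Azure.Cosmos;
public class SnapshotProjection
{
    // Assumed to be supplied by a constructor (omitted here).
    private readonly Container _snapshotContainer;

    public async Task CreateSnapshot(OrderAggregate aggregate)
    {
        var snapshot = new OrderSnapshot
        {
            // Assumes OrderSnapshot has an Id mapped to Cosmos DB's required "id" field.
            // One document per aggregate and version keeps older snapshots available.
            Id = $"{aggregate.Id}:{aggregate.Version}",
            AggregateId = aggregate.Id,
            Version = aggregate.Version,
            State = aggregate.GetState(), // Assumes OrderAggregate exposes its serializable state
            SnapshotTime = DateTime.UtcNow
        };

        await _snapshotContainer.UpsertItemAsync(snapshot);
    }

    // Returns null when no snapshot exists, in which case the caller replays the full
    // stream. Otherwise the caller still has to apply the events recorded after the
    // snapshot's version to reach the current state.
    public async Task<OrderAggregate?> RestoreFromSnapshot(string aggregateId)
    {
        // Parameterized query: never interpolate ids into the query text.
        var query = new QueryDefinition(
            "SELECT TOP 1 * FROM c WHERE c.aggregateId = @aggregateId ORDER BY c.version DESC")
            .WithParameter("@aggregateId", aggregateId);

        using var iterator = _snapshotContainer.GetItemQueryIterator<OrderSnapshot>(query);

        // FeedIterator has no FirstOrDefaultAsync; read pages until one yields a result.
        OrderSnapshot? snapshot = null;
        while (iterator.HasMoreResults && snapshot is null)
            snapshot = (await iterator.ReadNextAsync()).FirstOrDefault();

        if (snapshot is null)
            return null; // No snapshot, need full replay

        var aggregate = new OrderAggregate();
        // Assumes OrderAggregate defines RestoreFromSnapshot(state, version).
        aggregate.RestoreFromSnapshot(snapshot.State, snapshot.Version);

        return aggregate;
    }
}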
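A snapshot only shortens the replay; it does not replace it. Loading an aggregate means starting from the snapshot and then applying every event recorded after the snapshot's version. The sketch below shows that rule on a deliberately tiny model, a counter whose events are signed deltas. `VersionedDelta`, `CounterSnapshot`, and `SnapshotLoader` are hypothetical names, and the "every N events" policy is only one possible choice.

```csharp
#nullable enable
using System.Collections.Generic;
using System.Linq;

// Hypothetical minimal types: a counter aggregate whose events are signed deltas.
public record VersionedDelta(int Version, int Amount);
public record CounterSnapshot(int Version, int Value);

public static class SnapshotLoader
{
    // Rebuilds state from the latest snapshot (if any) plus the events recorded after it.
    public static (int Value, int Version) Load(
        CounterSnapshot? snapshot, IEnumerable<VersionedDelta> stream)
    {
        var value = snapshot?.Value ?? 0;
        var fromVersion = snapshot?.Version ?? 0;
        var version = fromVersion;

        // Only the tail of the stream is replayed; events at or below the snapshot
        // version are already reflected in the snapshot's value.
        var tail = stream.Where(d => d.Version > fromVersion).OrderBy(d => d.Version);
        foreach (var delta in tail)
        {
            value += delta.Amount;
            version = delta.Version;
        }

        return (value, version);
    }

    // Simple policy: take a snapshot every `interval` events.
    public static bool ShouldSnapshot(int version, int interval) =>
        interval > 0 && version > 0 && version % interval == 0;
}
```

Loading with and without the snapshot must produce the same result. Checking that equivalence in a test is a cheap way to catch snapshots that have drifted from the event history.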

Related Topics


Azure Integration Hub - Architect Level Enterprise Integration Patterns