Durable Functions — Aggregator Entity

Stateful Actors, Event Counting, Real-time Totals


Introduction

While orchestrator functions manage workflow execution, entity functions maintain stateful, long-lived data structures. Entities suit aggregators, counters, stateful actors, and any scenario that needs durable state to persist across function executions.

Unlike orchestrations, which are about process flow, entities are about data. They provide:

  • Durable state: state survives function restarts and Azure infrastructure updates
  • Atomic operations: each operation runs to completion without interleaving with other operations on the same entity
  • Concurrency control: concurrent requests to one entity are queued and processed one at a time
  • Low-latency reads: fast access to current state

Entity Fundamentals

What is an Entity?

An entity is a durable object with:

  • A unique identity (entityId)
  • State that persists across operations
  • Operations that can be invoked to read or modify state
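
// Example: A simple counter entity
using Newtonsoft.Json;

[JsonObject(MemberSerialization.OptIn)]
public class Counter
{
    // With OptIn serialization, only [JsonProperty] members are persisted
    [JsonProperty]
    public int CurrentValue { get; set; }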
    
    // Operation to increment the counter
    public void Add(int amount)
    {
        CurrentValue += amount;
    }
    
    // Operation to get current value
    public int Get()
    {
        return CurrentValue;
    }
    
    // Operation to reset
    public void Reset()
    {
        CurrentValue = 0;
    }
}
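
To make "state that persists across operations" concrete, the following is a toy model only. `ToyEntityHost` and `ToyCounter` are hypothetical names, not Durable Functions types, and `System.Text.Json` stands in for whatever serializer the real runtime uses. The sketch assumes the runtime conceptually loads the saved state, runs one operation, and saves the state again:

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Plain counter with the same Add operation as the Counter entity above.
public class ToyCounter
{
    public int CurrentValue { get; set; }
    public void Add(int amount) => CurrentValue += amount;
}

// Hypothetical stand-in for the runtime (not a Durable Functions type):
// it stores each entity's state as a JSON string, keyed by entity key.
public class ToyEntityHost
{
    private readonly Dictionary<string, string> _store = new Dictionary<string, string>();

    // Load the state, run one operation against it, then save it again.
    public void Apply(string entityKey, Action<ToyCounter> operation)
    {
        var counter = Load(entityKey);
        operation(counter);
        _store[entityKey] = JsonSerializer.Serialize(counter);
    }

    // Read the last saved value; an entity that was never touched reads as 0.
    public int Read(string entityKey) => Load(entityKey).CurrentValue;

    private ToyCounter Load(string entityKey) =>
        _store.TryGetValue(entityKey, out var json)
            ? JsonSerializer.Deserialize<ToyCounter>(json)
            : new ToyCounter();
}
```

Calling `Apply("my-counter", c => c.Add(5))` and then `Apply("my-counter", c => c.Add(3))` on one host leaves `Read("my-counter")` at 8, because the second operation starts from the state the first one saved. Only members that the serializer writes out survive this round trip, which is why the choice of serialized members on an entity class matters.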

Implementing Entity Functions

The Entity Class

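using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Newtonsoft.Json;

[JsonObject(MemberSerialization.OptIn)]
public class OrderAggregator
{
    // The entity's state - persists across operations.
    // With OptIn serialization, only [JsonProperty] members are saved.
    [JsonProperty] public int TotalOrders { get; set; }
    [JsonProperty] public decimal TotalRevenue { get; set; }
    [JsonProperty] public List<string> RecentOrderIds { get; set; } = new();
    [JsonProperty] public DateTime LastOrderDate { get; set; }
    [JsonProperty] public Dictionary<string, int> OrdersByStatus { get; set; } = new();
    [JsonProperty] public DateTime LastModified { get; set; }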
    
    // Operation: Add a new order
    // (public methods are exposed as operations; no attribute is required)
    public void AddOrder(OrderInfo order)
    {
        TotalOrders++;
        TotalRevenue += order.TotalAmount;
        RecentOrderIds.Add(order.OrderId);
        
        // Keep only last 100 order IDs
        if (RecentOrderIds.Count > 100)
        {
            RecentOrderIds.RemoveAt(0);
        }
        
        // Track orders by status
        if (!OrdersByStatus.ContainsKey(order.Status))
        {
            OrdersByStatus[order.Status] = 0;
        }
        OrdersByStatus[order.Status]++;
        
        LastOrderDate = order.OrderDate;
        LastModified = DateTime.UtcNow;
    }
    
    // Operation: Update order status
    public void UpdateOrderStatus(OrderStatusUpdate update)
    {
        // Decrement old status count, never dropping below zero
        if (OrdersByStatus.TryGetValue(update.OldStatus, out var oldCount) && oldCount > 0)
        {
            OrdersByStatus[update.OldStatus] = oldCount - 1;
        }
        
        // Increment new status count
        if (!OrdersByStatus.ContainsKey(update.NewStatus))
        {
            OrdersByStatus[update.NewStatus] = 0;
        }
        OrdersByStatus[update.NewStatus]++;
        
        LastModified = DateTime.UtcNow;
    }
    
    // Operation: Get current snapshot (returns copies, so callers cannot mutate entity state)
    public OrderAggregatorSnapshot GetSnapshot()
    {
        return new OrderAggregatorSnapshot
        {
            TotalOrders = TotalOrders,
            TotalRevenue = TotalRevenue,
            RecentOrderIds = RecentOrderIds.ToList(),
            LastOrderDate = LastOrderDate,
            OrdersByStatus = OrdersByStatus.ToDictionary(kvp => kvp.Key, kvp => kvp.Value),
            LastModified = LastModified
        };
    }
    
    // Operation: Reset all data
    public void Reset()
    {
        TotalOrders = 0;
        TotalRevenue = 0;
        RecentOrderIds.Clear();
        OrdersByStatus.Clear();
        LastOrderDate = default;
        LastModified = DateTime.UtcNow;
    }
}

// Snapshot for read-only operations
public class OrderAggregatorSnapshot
{
    public int TotalOrders { get; set; }
    public decimal TotalRevenue { get; set; }
    public List<string> RecentOrderIds { get; set; }
    public DateTime LastOrderDate { get; set; }
    public Dictionary<string, int> OrdersByStatus { get; set; }
    public DateTime LastModified { get; set; }
}

Entity Trigger Function

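[FunctionName("OrderAggregatorEntity")]
public static Task OrderAggregatorEntity(
    [EntityTrigger] IDurableEntityContext context)
{
    // Dispatch the incoming operation to the matching method on the entity class.
    // Return the task so the runtime waits for the operation to finish.
    // The function name is the entity name that callers pass to EntityId.
    return context.DispatchAsync<OrderAggregator>();
}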

Calling Entities from Client Code

Sending Signals (Fire-and-Forget)

Signals are one-way operations that don't wait for a response:

[FunctionName("OrderCreatedTrigger")]
public static async Task OrderCreatedTrigger(
    [QueueTrigger("orders")] OrderInfo order,
    [DurableClient] IDurableEntityClient client)
{
    var entityId = new EntityId("OrderAggregatorEntity", "daily-orders");
    
    // Signal the entity: the await completes once the message is enqueued,
    // not when AddOrder has actually run
    await client.SignalEntityAsync(entityId, "AddOrder", order);
}

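Reading State from a Client (Request-Reply)

Client functions can signal an entity and read its state, but they cannot call an operation and wait for its result; only orchestrations can do that. When a client needs the current values, it reads the persisted state:

[FunctionName("GetOrderStats")]
public static async Task<OrderAggregatorSnapshot> GetOrderStats(
    [HttpTrigger] HttpRequest req,
    [DurableClient] IDurableEntityClient client)
{
    var entityId = new EntityId("OrderAggregatorEntity", "daily-orders");
    
    // Read the persisted state directly; no entity operation is executed.
    // The state deserializes into the snapshot type because the property names match.
    var result = await client.ReadEntityStateAsync<OrderAggregatorSnapshot>(entityId);
    
    // EntityExists is false until an operation has created the entity
    return result.EntityExists ? result.EntityState : null;
}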

Real-World Examples

Example 1: Real-Time Analytics Dashboard

// Entity that maintains real-time metrics
[JsonObject(MemberSerialization.OptIn)]
public class AnalyticsEntity
{
    // With OptIn serialization, only [JsonProperty] members are saved
    [JsonProperty] public long PageViews { get; set; }
    [JsonProperty] public long UniqueVisitors { get; set; }
    [JsonProperty] public decimal AverageSessionDuration { get; set; }
    [JsonProperty] public Dictionary<string, long> ViewsByPage { get; set; } = new();
    [JsonProperty] public Dictionary<string, long> ViewsByCountry { get; set; } = new();
    [JsonProperty] public List<RecentEvent> RecentEvents { get; set; } = new();
    [JsonProperty] public DateTime LastUpdated { get; set; }
    
    public void RecordPageView(PageViewEvent pageView)
    {
        PageViews++;
        
        // Track by page
        if (!ViewsByPage.ContainsKey(pageView.PageUrl))
            ViewsByPage[pageView.PageUrl] = 0;
        ViewsByPage[pageView.PageUrl]++;
        
        // Track by country
        if (!ViewsByCountry.ContainsKey(pageView.Country))
            ViewsByCountry[pageView.Country] = 0;
        ViewsByCountry[pageView.Country]++;
        
        // Track recent events
        RecentEvents.Add(new RecentEvent
        {
            Timestamp = pageView.Timestamp,
            EventType = "PageView",
            PageUrl = pageView.PageUrl
        });
        
        // Keep only last 1000 events
        if (RecentEvents.Count > 1000)
            RecentEvents.RemoveAt(0);
        
        LastUpdated = DateTime.UtcNow;
    }
    
    public void RecordSession(SessionMetrics metrics)
    {
        // Update running average
        var currentTotal = AverageSessionDuration * UniqueVisitors;
        UniqueVisitors++;
        AverageSessionDuration = (currentTotal + metrics.DurationSeconds) / UniqueVisitors;
        
        LastUpdated = DateTime.UtcNow;
    }
    
    public AnalyticsSnapshot GetSnapshot()
    {
        return new AnalyticsSnapshot
        {
            PageViews = PageViews,
            UniqueVisitors = UniqueVisitors,
            AverageSessionDurationSeconds = AverageSessionDuration,
            TopPages = ViewsByPage.OrderByDescending(x => x.Value).Take(10).ToDictionary(x => x.Key, x => x.Value),
            ViewsByCountry = ViewsByCountry.ToDictionary(x => x.Key, x => x.Value),
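            // Events are appended, so the newest 100 are at the end of the list
            RecentEvents = RecentEvents.Skip(Math.Max(0, RecentEvents.Count - 100)).ToList(),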
            LastUpdated = LastUpdated
        };
    }
}

// Trigger function for page views
[FunctionName("PageViewQueueTrigger")]
public static async Task RecordPageView(
    [QueueTrigger("pageviews")] PageViewEvent pageView,
    [DurableClient] IDurableEntityClient client)
{
    var entityId = new EntityId("AnalyticsEntity", "website-analytics");
    await client.SignalEntityAsync(entityId, "RecordPageView", pageView);
}
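
The running-average update in `RecordSession` can be checked in isolation. The sketch below is a plain C# class with hypothetical names (`RunningAverage`, `RunningAverageDemo`) that applies the same three steps: rebuild the previous total from the average and the count, add the new value, and divide by the new count.

```csharp
using System;

public class RunningAverage
{
    public long Count { get; private set; }
    public decimal Average { get; private set; }

    // Same update as RecordSession: rebuild the old total,
    // add the new value, then divide by the new count.
    public void Add(decimal value)
    {
        var currentTotal = Average * Count;
        Count++;
        Average = (currentTotal + value) / Count;
    }
}

public static class RunningAverageDemo
{
    // Feeds the values one at a time and returns the final running average.
    public static decimal Compute(params decimal[] values)
    {
        var average = new RunningAverage();
        foreach (var value in values)
        {
            average.Add(value);
        }
        return average.Average;
    }
}
```

For example, `RunningAverageDemo.Compute(10m, 20m, 60m)` passes through averages of 10 and 15 and ends at 30, the same as the batch average. When a division does not terminate in decimal, each step rounds slightly, so storing the total and the count and dividing only when a snapshot is taken may be the more precise design.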

Example 2: Inventory Management

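// Entity that tracks inventory levels
[JsonObject(MemberSerialization.OptIn)]
public class InventoryEntity
{
    // With OptIn serialization, only [JsonProperty] members are saved
    [JsonProperty] public Dictionary<string, InventoryItem> Items { get; set; } = new();
    [JsonProperty] public DateTime LastRestockDate { get; set; }
    // decimal, so a sum of quantity * unit price is not truncated
    [JsonProperty] public decimal TotalStockValue { get; set; }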
    
    public void AddStock(StockOperation operation)
    {
        if (!Items.ContainsKey(operation.ProductId))
        {
            Items[operation.ProductId] = new InventoryItem
            {
                ProductId = operation.ProductId,
                ProductName = operation.ProductName,
                Quantity = 0,
                UnitPrice = operation.UnitPrice
            };
        }
        
        var item = Items[operation.ProductId];
        item.Quantity += operation.Quantity;
        
        // Flag the item for reorder only while it is at or below its reorder level;
        // restocking above that level clears the flag
        item.NeedsReorder = item.Quantity <= item.ReorderLevel;
        
        TotalStockValue = Items.Values.Sum(i => i.Quantity * i.UnitPrice);
    }
    
    public void ReserveStock(ReservationRequest request)
    {
        if (!Items.ContainsKey(request.ProductId))
        {
            throw new InvalidOperationException($"Product {request.ProductId} not found");
        }
        
        var item = Items[request.ProductId];
        
        if (item.Quantity < request.Quantity)
        {
            throw new InvalidOperationException($"Insufficient stock for {request.ProductId}");
        }
        
        item.Quantity -= request.Quantity;
        
        if (item.Quantity <= item.ReorderLevel)
        {
            item.NeedsReorder = true;
        }
        
        TotalStockValue = Items.Values.Sum(i => i.Quantity * i.UnitPrice);
    }
    
    public InventoryStatus GetStatus(string productId)
    {
        if (!Items.ContainsKey(productId))
        {
            return null;
        }
        
        var item = Items[productId];
        return new InventoryStatus
        {
            ProductId = productId,
            ProductName = item.ProductName,
            Quantity = item.Quantity,
            NeedsReorder = item.NeedsReorder,
            StockValue = item.Quantity * item.UnitPrice
        };
    }
    
    public List<InventoryStatus> GetLowStockItems()
    {
        return Items.Values
            .Where(i => i.NeedsReorder)
            .Select(i => new InventoryStatus
            {
                ProductId = i.ProductId,
                ProductName = i.ProductName,
                Quantity = i.Quantity,
                NeedsReorder = true
            })
            .ToList();
    }
}

Entity Concurrency

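Operations on a single entity are processed one at a time, so entity code needs no locks:

// If multiple operations target the same entity at once:
// - They are queued and executed serially (one at a time)
// - Each operation sees the state left by the previous one
// - Operations on the same entity cannot race with each other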

[FunctionName("ConcurrentStockUpdate")]
public static async Task UpdateStock(
    [QueueTrigger("stock-updates")] StockOperation operation,
    [DurableClient] IDurableEntityClient client)
{
    // Even if 1000 messages come in at once,
    // each operation executes atomically.
    // The payload type matches the AddStock parameter.
    var entityId = new EntityId("InventoryEntity", "warehouse-1");
    await client.SignalEntityAsync(entityId, "AddStock", operation);
}
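
The effect of serialized execution can be illustrated without the Functions runtime. In the sketch below, `SerializedCounter` and `ConcurrencyDemo` are hypothetical names, and a `lock` stands in for the runtime's behavior of handling one operation at a time per entity; the real mechanism is different, but the observable result (no lost updates) is the same.

```csharp
using System;
using System.Threading.Tasks;

// Toy counter whose operations are forced to run one at a time.
public class SerializedCounter
{
    private readonly object _gate = new object();
    private int _value;

    public void Add(int amount)
    {
        lock (_gate)
        {
            _value += amount;
        }
    }

    public int Get()
    {
        lock (_gate)
        {
            return _value;
        }
    }
}

public static class ConcurrencyDemo
{
    // Issues operationCount concurrent Add(1) operations and returns the final value.
    public static int Run(int operationCount)
    {
        var counter = new SerializedCounter();
        Parallel.For(0, operationCount, _ => counter.Add(1));
        return counter.Get();
    }
}
```

`ConcurrencyDemo.Run(1000)` returns 1000 because no two increments overlap. Without the serialization, the read-modify-write in `Add` could lose updates under load.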

Using Entities with Orchestrations

Combine entities with orchestrations for complex workflows:

[FunctionName("OrderProcessingWithEntity")]
public static async Task<OrderProcessingResult> OrderProcessingWithEntity(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    var order = context.GetInput<Order>();
    
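    // Reserve inventory using entity
    var inventoryEntity = new EntityId("InventoryEntity", "warehouse-1");
    var firstItem = order.Items.First();
    
    try
    {
        // Orchestrations can call entities directly. Unlike a signal, a call waits
        // for the operation to finish and surfaces any exception it raised.
        // Assumption: order items expose a Quantity property.
        await context.CallEntityAsync(inventoryEntity, "ReserveStock",
            new ReservationRequest { ProductId = firstItem.ProductId, Quantity = firstItem.Quantity });
    }
    catch (Exception ex)
    {
        return new OrderProcessingResult 
        { 
            Success = false, 
            FailureReason = $"Inventory reservation failed: {ex.Message}" 
        };
    }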
    
    // Process payment
    await context.CallActivityAsync("ProcessPayment", order);
    
    // Update the entity with completed order
    await context.CallActivityAsync("RecordCompletedOrder",
        new { order, EntityId = inventoryEntity });
    
    // Get updated inventory status
    var status = await context.CallEntityAsync<InventoryStatus>(
        inventoryEntity, "GetStatus", order.Items.First().ProductId);
    
    return new OrderProcessingResult { Success = true, RemainingStock = status.Quantity };
}

Best Practices

Practice                     Description
Keep entities focused        One entity per aggregate root
Use meaningful entity IDs    e.g., "daily-orders-2024-01-15"
Limit state size             Don't store large collections
Use signals for writes       More efficient than calls
Use calls for reads          When you need the result
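
As an illustration of the "meaningful entity IDs" practice, a small helper can build a per-day key in the format shown in the table. `EntityKeys` and `DailyOrders` are hypothetical names introduced for this sketch, and partitioning by day is an assumption based on the example ID, not something the runtime requires.

```csharp
using System;
using System.Globalization;

public static class EntityKeys
{
    // Builds a per-day key such as "daily-orders-2024-01-15".
    // The invariant culture keeps the format stable regardless of server locale.
    public static string DailyOrders(DateTime utcDate) =>
        "daily-orders-" + utcDate.ToString("yyyy-MM-dd", CultureInfo.InvariantCulture);
}
```

A client could then address one aggregator per day with `new EntityId("OrderAggregatorEntity", EntityKeys.DailyOrders(order.OrderDate))`, which also keeps each entity's state small because it only ever holds one day of data.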

Entity vs Orchestrator Comparison

Feature        Entity               Orchestrator
Purpose        Maintain state       Control workflow
Operations     Add, Update, Get     Call activities
State          Persistent object    Event history
Invocation     Signal or Call       Await tasks
Concurrency    Automatic            Manual

Performance Considerations

  • Signals are cheaper than calls: use signals when you don't need a response
  • Batching: where possible, send one signal that carries several items instead of many single-item signals
  • State size: keep entity state under 64KB for optimal performance
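
One way to apply the batching advice is to group items before signaling. The helper below is a sketch: `Batcher.Split` is a hypothetical name, and using it assumes the entity exposes a batch operation (for example an `AddOrders` method taking a list), which the classes in this article do not define.

```csharp
using System;
using System.Collections.Generic;

public static class Batcher
{
    // Splits the items into consecutive batches of at most batchSize elements.
    public static List<List<T>> Split<T>(IEnumerable<T> items, int batchSize)
    {
        if (batchSize <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(batchSize));
        }

        var batches = new List<List<T>>();
        var current = new List<T>(batchSize);
        foreach (var item in items)
        {
            current.Add(item);
            if (current.Count == batchSize)
            {
                batches.Add(current);
                current = new List<T>(batchSize);
            }
        }

        // Keep the final, partially filled batch.
        if (current.Count > 0)
        {
            batches.Add(current);
        }
        return batches;
    }
}
```

Splitting five items with a batch size of two yields three batches of sizes 2, 2, and 1, so five single-item signals would become three. Each batch then costs one message and one state write instead of one per item.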

Azure Integration Hub - Advanced Level Durable Functions Series - Part 3 of 3