Durable Functions — Aggregator Entity
Stateful Actors, Event Counting, Real-time Totals
Introduction
While orchestrator functions manage workflow execution, Entity functions provide a way to maintain stateful, long-running data structures. Entities are perfect for building aggregators, counters, stateful actors, and any scenario where you need durable state that persists across function executions.
Unlike orchestrations which are about process flow, entities are about data. They provide:
- Durable state — State that survives function restarts and Azure infrastructure updates
- Atomic operations — Operations that are guaranteed to be atomic
- Concurrency control — Built-in handling of concurrent operations
- Low latency reads — Fast access to current state
Entity Fundamentals
What is an Entity?
An entity is a durable object with:
- A unique identity (entityId)
- State that persists across operations
- Operations that can be invoked to read or modify state
// Example: A simple counter entity
[JsonObject(MemberSerialization.OptIn)]
public class Counter
{
public int CurrentValue { get; set; }
// Operation to increment the counter
public void Add(int amount)
{
CurrentValue += amount;
}
// Operation to get current value
public int Get()
{
return CurrentValue;
}
// Operation to reset
public void Reset()
{
CurrentValue = 0;
}
}
Implementing Entity Functions
The Entity Class
using Microsoft.Azure.WebJobs.DurableTask;
[JsonObject(MemberSerialization.OptIn)]
public class OrderAggregator
{
// The entity's state - persists across operations
public int TotalOrders { get; set; }
public decimal TotalRevenue { get; set; }
public List<string> RecentOrderIds { get; set; } = new();
public DateTime LastOrderDate { get; set; }
public Dictionary<string, int> OrdersByStatus { get; set; } = new();
public DateTime LastModified { get; set; }
// Operation: Add a new order
[EntityMethod]
public void AddOrder(OrderInfo order)
{
TotalOrders++;
TotalRevenue += order.TotalAmount;
RecentOrderIds.Add(order.OrderId);
// Keep only last 100 order IDs
if (RecentOrderIds.Count > 100)
{
RecentOrderIds.RemoveAt(0);
}
// Track orders by status
if (!OrdersByStatus.ContainsKey(order.Status))
{
OrdersByStatus[order.Status] = 0;
}
OrdersByStatus[order.Status]++;
LastOrderDate = order.OrderDate;
LastModified = DateTime.UtcNow;
}
// Operation: Update order status
[EntityMethod]
public void UpdateOrderStatus(OrderStatusUpdate update)
{
// Decrement old status count
if (OrdersByStatus.ContainsKey(update.OldStatus))
{
OrdersByStatus[update.OldStatus]--;
}
// Increment new status count
if (!OrdersByStatus.ContainsKey(update.NewStatus))
{
OrdersByStatus[update.NewStatus] = 0;
}
OrdersByStatus[update.NewStatus]++;
LastModified = DateTime.UtcNow;
}
// Operation: Get current snapshot
[EntityMethod]
public OrderAggregatorSnapshot GetSnapshot()
{
return new OrderAggregatorSnapshot
{
TotalOrders = TotalOrders,
TotalRevenue = TotalRevenue,
RecentOrderIds = RecentOrderIds.ToList(),
LastOrderDate = LastOrderDate,
OrdersByStatus = OrdersByStatus.ToDictionary(kvp => kvp.Key, kvp => kvp.Value),
LastModified = LastModified
};
}
// Operation: Reset all data
[EntityMethod]
public void Reset()
{
TotalOrders = 0;
TotalRevenue = 0;
RecentOrderIds.Clear();
OrdersByStatus.Clear();
LastModified = DateTime.UtcNow;
}
}
// Snapshot for read-only operations
public class OrderAggregatorSnapshot
{
public int TotalOrders { get; set; }
public decimal TotalRevenue { get; set; }
public List<string> RecentOrderIds { get; set; }
public DateTime LastOrderDate { get; set; }
public Dictionary<string, int> OrdersByStatus { get; set; }
public DateTime LastModified { get; set; }
}
Entity Trigger Function
[FunctionName("OrderAggregatorEntity")]
public static async Task OrderAggregatorEntity(
[EntityTrigger] IDurableEntityContext context)
{
// Dispatch to the entity class
context.DispatchAsync<OrderAggregator>();
}
Calling Entities from Client Code
Sending Signals (Fire-and-Forget)
Signals are one-way operations that don't wait for a response:
[FunctionName("OrderCreatedTrigger")]
public static async Task OrderCreatedTrigger(
[QueueTrigger("orders")] OrderInfo order,
[DurableClient] IDurableEntityClient client)
{
var entityId = new EntityId("OrderAggregatorEntity", "daily-orders");
// Signal the entity - doesn't wait for response
await client.SignalEntityAsync(entityId, "AddOrder", order);
}
Calling for Response (Request-Reply)
When you need the result back:
[FunctionName("GetOrderStats")]
public static async Task<OrderAggregatorSnapshot> GetOrderStats(
[HttpTrigger] HttpRequest req,
[DurableClient] IDurableEntityClient client)
{
var entityId = new EntityId("OrderAggregatorEntity", "daily-orders");
// Call the entity and get response
var result = await client.ReadEntityStateAsync<OrderAggregatorSnapshot>(entityId);
return result.EntityState;
}
Real-World Examples
Example 1: Real-Time Analytics Dashboard
// Entity that maintains real-time metrics
[JsonObject(MemberSerialization.OptIn)]
public class AnalyticsEntity
{
public long PageViews { get; set; }
public long UniqueVisitors { get; set; }
public decimal AverageSessionDuration { get; set; }
public Dictionary<string, long> ViewsByPage { get; set; } = new();
public Dictionary<string, long> ViewsByCountry { get; set; } = new();
public List<RecentEvent> RecentEvents { get; set; } = new();
public DateTime LastUpdated { get; set; }
[EntityMethod]
public void RecordPageView(PageViewEvent pageView)
{
PageViews++;
// Track by page
if (!ViewsByPage.ContainsKey(pageView.PageUrl))
ViewsByPage[pageView.PageUrl] = 0;
ViewsByPage[pageView.PageUrl]++;
// Track by country
if (!ViewsByCountry.ContainsKey(pageView.Country))
ViewsByCountry[pageView.Country] = 0;
ViewsByCountry[pageView.Country]++;
// Track recent events
RecentEvents.Add(new RecentEvent
{
Timestamp = pageView.Timestamp,
EventType = "PageView",
PageUrl = pageView.PageUrl
});
// Keep only last 1000 events
if (RecentEvents.Count > 1000)
RecentEvents.RemoveAt(0);
LastUpdated = DateTime.UtcNow;
}
[EntityMethod]
public void RecordSession(SessionMetrics metrics)
{
// Update running average
var currentTotal = AverageSessionDuration * UniqueVisitors;
UniqueVisitors++;
AverageSessionDuration = (currentTotal + metrics.DurationSeconds) / UniqueVisitors;
LastUpdated = DateTime.UtcNow;
}
[EntityMethod]
public AnalyticsSnapshot GetSnapshot()
{
return new AnalyticsSnapshot
{
PageViews = PageViews,
UniqueVisitors = UniqueVisitors,
AverageSessionDurationSeconds = AverageSessionDuration,
TopPages = ViewsByPage.OrderByDescending(x => x.Value).Take(10).ToDictionary(x => x.Key, x => x.Value),
ViewsByCountry = ViewsByCountry.ToDictionary(x => x.Key, x => x.Value),
RecentEvents = RecentEvents.Take(100).ToList(),
LastUpdated = LastUpdated
};
}
}
// Trigger function for page views
[FunctionName("PageViewQueueTrigger")]
public static async Task RecordPageView(
[QueueTrigger("pageviews")] PageViewEvent pageView,
[DurableClient] IDurableEntityClient client)
{
var entityId = new EntityId("AnalyticsEntity", "website-analytics");
await client.SignalEntityAsync(entityId, "RecordPageView", pageView);
}
Example 2: Inventory Management
// Entity that tracks inventory levels
[JsonObject(MemberSerialization.OptIn)]
public class InventoryEntity
{
public Dictionary<string, InventoryItem> Items { get; set; } = new();
public DateTime LastRestockDate { get; set; }
public int TotalStockValue { get; set; }
[EntityMethod]
public void AddStock(StockOperation operation)
{
if (!Items.ContainsKey(operation.ProductId))
{
Items[operation.ProductId] = new InventoryItem
{
ProductId = operation.ProductId,
ProductName = operation.ProductName,
Quantity = 0,
UnitPrice = operation.UnitPrice
};
}
var item = Items[operation.ProductId];
item.Quantity += operation.Quantity;
if (item.Quantity > item.ReorderLevel && !item.NeedsReorder)
{
item.NeedsReorder = false;
}
// Check if below reorder level after adding
if (item.Quantity <= item.ReorderLevel)
{
item.NeedsReorder = true;
}
TotalStockValue = Items.Values.Sum(i => i.Quantity * i.UnitPrice);
}
[EntityMethod]
public void ReserveStock(ReservationRequest request)
{
if (!Items.ContainsKey(request.ProductId))
{
throw new InvalidOperationException($"Product {request.ProductId} not found");
}
var item = Items[request.ProductId];
if (item.Quantity < request.Quantity)
{
throw new InvalidOperationException($"Insufficient stock for {request.ProductId}");
}
item.Quantity -= request.Quantity;
if (item.Quantity <= item.ReorderLevel)
{
item.NeedsReorder = true;
}
TotalStockValue = Items.Values.Sum(i => i.Quantity * i.UnitPrice);
}
[EntityMethod]
public InventoryStatus GetStatus(string productId)
{
if (!Items.ContainsKey(productId))
{
return null;
}
var item = Items[productId];
return new InventoryStatus
{
ProductId = productId,
ProductName = item.ProductName,
Quantity = item.Quantity,
NeedsReorder = item.NeedsReorder,
StockValue = item.Quantity * item.UnitPrice
};
}
[EntityMethod]
public List<InventoryStatus> GetLowStockItems()
{
return Items.Values
.Where(i => i.NeedsReorder)
.Select(i => new InventoryStatus
{
ProductId = i.ProductId,
ProductName = i.ProductName,
Quantity = i.Quantity,
NeedsReorder = true
})
.ToList();
}
}
Entity Concurrency
Entities handle concurrency automatically:
// If multiple operations arrive simultaneously:
// - They are serialized (one at a time)
// - The state is always consistent
// - No race conditions possible
[FunctionName("ConcurrentStockUpdate")]
public static async Task UpdateStock(
[QueueTrigger("stock-updates")] StockUpdate update,
[DurableClient] IDurableEntityClient client)
{
// Even if 1000 messages come in at once,
// each operation executes atomically
var entityId = new EntityId("InventoryEntity", "warehouse-1");
await client.SignalEntityAsync(entityId, "AddStock", update);
}
Using Entities with Orchestrations
Combine entities with orchestrations for complex workflows:
[FunctionName("OrderProcessingWithEntity")]
public static async Task<OrderProcessingResult> OrderProcessingWithEntity(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
var order = context.GetInput<Order>();
// Reserve inventory using entity
var inventoryEntity = new EntityId("InventoryEntity", "warehouse-1");
try
{
// Try to reserve - this is a signal, so we use CallActivity as workaround
await context.CallActivityAsync("ReserveInventoryActivity",
new { order, EntityId = inventoryEntity });
}
catch (Exception ex)
{
return new OrderProcessingResult
{
Success = false,
FailureReason = "Insufficient inventory"
};
}
// Process payment
await context.CallActivityAsync("ProcessPayment", order);
// Update the entity with completed order
await context.CallActivityAsync("RecordCompletedOrder",
new { order, EntityId = inventoryEntity });
// Get updated inventory status
var status = await context.CallEntityAsync<InventoryStatus>(
inventoryEntity, "GetStatus", order.Items.First().ProductId);
return new OrderProcessingResult { Success = true, RemainingStock = status.Quantity };
}
Best Practices
| Practice | Description |
|---|---|
| Keep entities focused | One entity per aggregate root |
| Use meaningful entity IDs | e.g., "daily-orders-2024-01-15" |
| Limit state size | Don't store large collections |
| Use signals for writes | More efficient than calls |
| Use calls for reads | When you need the result |
Entity vs Orchestrator Comparison
| Feature | Entity | Orchestrator |
|---|---|---|
| Purpose | Maintain state | Control workflow |
| Operations | Add, Update, Get | Call activities |
| State | Persistent object | Event history |
| Invocation | Signal or Call | Await tasks |
| Concurrency | Automatic | Manual |
Performance Considerations
- Signals are cheaper than calls — use signals when you don't need a response
- Batching — Group multiple signals if possible
- State size — Keep entity state under 64KB for optimal performance
Azure Integration Hub - Advanced Level Durable Functions Series - Part 3 of 3