Durable Functions — Implementing Entity Patterns and State Management
The Problem
Your system needs:
- Reliable long-running workflows that survive process restarts
- State management across multiple workflow executions
- Coordination of distributed transactions
- Tracking of complex multi-step processes
You need Durable Functions entity patterns.
Solution Implementation
Step 1: Entity Basics
// entities/Counter.ts - Simple counter entity
import { EntityFunctionContext } from "@azure/functions";
interface CounterState {
value: number;
}
export const entityFunction = function (context: EntityFunctionContext): void {
const state = context.df.getState<CounterState>() || { value: 0 };
switch (context.df.operationName) {
case "add":
state.value += context.df.getInput<number>() || 1;
break;
case "reset":
state.value = 0;
break;
case "get":
context.df.return(state.value);
break;
}
context.df.setState(state);
};
// Client to interact with entity
import { DurableEntityClient } from "@azure/durable-functions";
const client = new DurableEntityClient(durableClient);
async function incrementCounter(entityId: string): Promise<void> {
const entityKey = `counter-${entityId}`;
const entityId2 = new EntityId("Counter", entityKey);
await client.signalEntity(entityId2, "add", 1);
}
async function getCounterValue(entityId: string): Promise<number> {
const entityKey = `counter-${entityId}`;
const entityId2 = new EntityId("Counter", entityKey);
const response = await client.readEntityState<number>(entityId2);
return response.entityState;
}
Step 2: Queue Entity Pattern
// entities/Queue.ts - Queue entity
interface QueueState<T> {
items: T[];
maxSize: number;
}
export const queueEntity = function <T>(context: EntityFunctionContext): void {
const state = context.df.getState<QueueState<T>>() || { items: [], maxSize: 100 };
switch (context.df.operationName) {
case "enqueue":
if (state.items.length >= state.maxSize) {
throw new Error("Queue is full");
}
const item = context.df.getInput<T>();
state.items.push(item);
context.df.return(state.items.length);
break;
case "dequeue":
if (state.items.length === 0) {
context.df.return(null);
} else {
const item = state.items.shift();
context.df.return(item);
}
break;
case "peek":
context.df.return(state.items[0] || null);
break;
case "size":
context.df.return(state.items.length);
break;
case "clear":
state.items = [];
context.df.return(state.items.length);
break;
}
context.df.setState(state);
};
// Usage in orchestrator
const queueEntityId = new EntityId("TaskQueue", "pending-tasks");
const orchestrator = function (context: DFOrchestrationContext): void {
const queueId = context.df.getInput<string>();
const entityId = new EntityId("TaskQueue", queueId);
// Enqueue tasks
const tasks = ["task1", "task2", "task3"];
for (const task of tasks) {
context.df.signalEntity(entityId, "enqueue", task);
}
// Process all tasks
let task;
do {
task = context.df.callEntity(entityId, "dequeue", null);
if (task) {
yield context.df.callActivity("ProcessTask", task);
}
} while (task !== null);
};
Step 3: Distributed Lock Entity
// entities/DistributedLock.ts
interface LockState {
owner: string;
acquiredAt: string;
expiresAt: string;
}
export const distributedLock = function (context: EntityFunctionContext): void {
const state = context.df.getState<LockState>();
switch (context.df.operationName) {
case "acquire":
const input = context.df.getInput<{ owner: string; timeout: number }>();
if (state && new Date(state.expiresAt) > new Date() && state.owner !== input.owner) {
context.df.return({ success: false, reason: "Already locked" });
} else {
const expiresAt = new Date(Date.now() + input.timeout * 1000);
context.df.setState({
owner: input.owner,
acquiredAt: new Date().toISOString(),
expiresAt: expiresAt.toISOString()
});
context.df.return({ success: true });
}
break;
case "release":
const releaseOwner = context.df.getInput<string>();
if (state && state.owner === releaseOwner) {
context.df.setState(null); // Clear state
context.df.return({ success: true });
} else {
context.df.return({ success: false, reason: "Not owner" });
}
break;
case "forceRelease":
// Admin operation to force release
context.df.setState(null);
context.df.return({ success: true });
break;
case "status":
context.df.return({
isLocked: state && new Date(state.expiresAt) > new Date(),
owner: state?.owner,
expiresAt: state?.expiresAt
});
break;
}
};
Step 4: Reliable Workflow with Checkpointing
// orchestrators/OrderProcessing.ts
const orderOrchestrator = function (context: DFOrchestrationContext): Promise<any> {
const order = context.df.getInput<Order>();
const result: OrderProcessingResult = {
orderId: order.id,
steps: [],
completed: false
};
try {
// Step 1: Validate order
context.df.setCustomStatus("Validating order");
yield context.df.callActivity("ValidateOrder", order);
result.steps.push({ step: "validate", status: "completed" });
// Step 2: Reserve inventory (with checkpoint)
context.df.setCustomStatus("Reserving inventory");
const inventoryResult = yield context.df.callActivityWithRetry("ReserveInventory", {
retryOptions: {
firstRetryIntervalInMilliseconds: 500,
maxNumberOfAttempts: 3
}
}, order.items);
result.steps.push({ step: "inventory", status: "completed", result: inventoryResult });
// Step 3: Process payment
context.df.setCustomStatus("Processing payment");
const paymentResult = yield context.df.callActivity("ProcessPayment", {
orderId: order.id,
amount: order.total
});
result.steps.push({ step: "payment", status: "completed", result: paymentResult });
// Step 4: Create shipment
context.df.setCustomStatus("Creating shipment");
const shipment = yield context.df.callActivity("CreateShipment", order);
result.steps.push({ step: "shipment", status: "completed", result: shipment });
// Step 5: Send notification
yield context.df.callActivity("SendOrderNotification", order);
result.completed = true;
context.df.setCustomStatus("Order completed successfully");
}
catch (error) {
result.error = error.message;
result.steps.push({ step: "failed", status: "failed", error: error.message });
// Compensate for completed steps
if (result.steps.find(s => s.step === "inventory")) {
yield context.df.callActivity("ReleaseInventory", order.items);
}
if (result.steps.find(s => s.step === "payment")) {
yield context.df.callActivity("RefundPayment", order.id);
}
context.df.setCustomStatus("Order failed, compensation complete");
}
return result;
};
Step 5: Timer-Based Cleanup Entity
// entities/SessionManager.ts
interface SessionState {
sessions: Map<string, { createdAt: string; lastAccessed: string }>;
}
export const sessionManager = function (context: EntityFunctionContext): void {
const state = context.df.getState<SessionState>() || { sessions: new Map() };
switch (context.df.operationName) {
case "create":
const sessionId = context.df.getInput<string>();
state.sessions.set(sessionId, {
createdAt: new Date().toISOString(),
lastAccessed: new Date().toISOString()
});
context.df.return(sessionId);
break;
case "touch":
const touchId = context.df.getInput<string>();
const session = state.sessions.get(touchId);
if (session) {
session.lastAccessed = new Date().toISOString();
}
break;
case "delete":
const deleteId = context.df.getInput<string>();
state.sessions.delete(deleteId);
context.df.return(true);
break;
case "cleanup":
const maxAge = context.df.getInput<number>() || 3600000; // 1 hour
const now = Date.now();
let cleaned = 0;
for (const [id, session] of state.sessions) {
const lastAccessed = new Date(session.lastAccessed).getTime();
if (now - lastAccessed > maxAge) {
state.sessions.delete(id);
cleaned++;
}
}
context.df.return({ cleaned, remaining: state.sessions.size });
break;
case "count":
context.df.return(state.sessions.size);
break;
}
context.df.setState(state);
};
// Timer trigger to cleanup old sessions
const cleanupTimer = function (context: Context, timer: Timer): void {
const entityId = new EntityId("SessionManager", "main");
context.df.signalEntity(entityId, "cleanup", 3600000); // 1 hour
};
Step 6: Fan-Out/Fan-In Pattern
// orchestrators/BatchProcessing.ts
const batchOrchestrator = function (context: DFOrchestrationContext): Promise<BatchResult> {
const input = context.df.getInput<{ items: string[]; batchSize: number }>();
const results: any[] = [];
// Fan-Out: Split items into batches
const batches = chunkArray(input.items, input.batchSize);
const tasks: any[] = [];
for (let i = 0; i < batches.length; i++) {
const task = context.df.callActivity("ProcessBatch", {
batchId: i,
items: batches[i]
});
tasks.push(task);
}
// Fan-In: Wait for all to complete
const batchResults = yield context.df.taskAll(tasks);
// Aggregate results
const totalProcessed = batchResults.reduce((sum, r) => sum + r.processed, 0);
const totalFailed = batchResults.reduce((sum, r) => sum + r.failed, 0);
return {
totalBatches: batches.length,
totalProcessed,
totalFailed,
results: batchResults
};
};
function chunkArray<T>(array: T[], size: number): T[][] {
const chunks: T[][] = [];
for (let i = 0; i < array.length; i += size) {
chunks.push(array.slice(i, i + size));
}
return chunks;
}
Summary
- Use entity functions for persistent state
- Implement queue, lock, and session patterns
- Use orchestrators for workflow coordination
- Add checkpointing with retry policies
- Implement fan-out/fan-in for parallel processing