← Back to Articles | Functions

Durable Functions — Implementing Entity Patterns and State Management

Building stateful workflows with Durable Functions entities, counters, queues, and reliable long-running operations.

Durable Functions — Implementing Entity Patterns and State Management

The Problem

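Your system needs durable, addressable state that survives restarts and scale-out: shared counters, work queues, distributed locks, session tracking, and long-running workflows that can fail partway and recover.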

You need Durable Functions entity patterns.

Solution Implementation

Step 1: Entity Basics

// entities/Counter.ts - Simple counter entity
import { EntityFunctionContext } from "@azure/functions";

/** State shape read and written by the Counter entity. */
interface CounterState {
    /** Running total maintained by the entity's `add` and `reset` operations. */
    value: number;
}

/**
 * Counter entity handler: dispatches on the operation name and persists a running total.
 *
 * Operations:
 * - `add`   — adds the operation input to the total; a missing input adds 1.
 * - `reset` — sets the total back to 0.
 * - `get`   — returns the current total to the caller.
 *
 * Any other operation name leaves the total unchanged. The state is written back
 * after every operation, so a never-used entity is initialized to `{ value: 0 }`.
 *
 * @param context Entity invocation context providing state, input and result access.
 */
export const entityFunction = function (context: EntityFunctionContext): void {
    const state = context.df.getState<CounterState>() || { value: 0 };

    switch (context.df.operationName) {
        case "add":
            // `??` rather than `||`: an explicit input of 0 must add 0, not fall back to 1.
            state.value += context.df.getInput<number>() ?? 1;
            break;
        case "reset":
            state.value = 0;
            break;
        case "get":
            context.df.return(state.value);
            break;
    }

    context.df.setState(state);
};

// Client to interact with entity
import { DurableEntityClient } from "@azure/durable-functions";

// Module-level entity client used by incrementCounter and getCounterValue.
// NOTE(review): `durableClient` is not defined anywhere in the visible code — confirm
// where it comes from and that DurableEntityClient is constructible this way.
const client = new DurableEntityClient(durableClient);

/**
 * Sends an `add` signal with input 1 to the Counter entity keyed `counter-<entityId>`.
 *
 * @param entityId Caller-facing counter identifier; it is prefixed with `counter-`
 *                 to form the entity key.
 * @returns A promise that settles when `client.signalEntity` settles.
 */
async function incrementCounter(entityId: string): Promise<void> {
    const counterTarget = new EntityId("Counter", `counter-${entityId}`);
    await client.signalEntity(counterTarget, "add", 1);
}

/**
 * Reads the current total of the Counter entity keyed `counter-<entityId>`.
 *
 * The Counter entity persists an object of the form `{ value: number }`, so the
 * numeric total is unwrapped from the stored state rather than returned as-is.
 *
 * @param entityId Caller-facing counter identifier; it is prefixed with `counter-`
 *                 to form the entity key.
 * @returns The stored total, or 0 when the response carries no state (for example
 *          an entity that has never been signaled).
 */
async function getCounterValue(entityId: string): Promise<number> {
    const counterTarget = new EntityId("Counter", `counter-${entityId}`);

    const response = await client.readEntityState<{ value: number }>(counterTarget);
    // entityState is absent when the entity does not exist yet; treat that as 0.
    return response.entityState?.value ?? 0;
}

Step 2: Queue Entity Pattern

// entities/Queue.ts - Queue entity
/** State shape read and written by the queue entity. */
interface QueueState<T> {
    /** Queued items; the entity appends with push and removes from the front with shift. */
    items: T[];
    /** Upper bound on items.length checked by the `enqueue` operation. */
    maxSize: number;
}

/**
 * FIFO queue entity handler. State defaults to an empty queue with `maxSize` 100.
 *
 * Operations:
 * - `enqueue` — appends the operation input and returns the new length.
 * - `dequeue` — removes and returns the front item, or `null` when empty.
 * - `peek`    — returns the front item without removing it, or `null` when empty.
 * - `size`    — returns the number of queued items.
 * - `clear`   — removes every item and returns the new length (0).
 *
 * The state is written back after every operation that completes without throwing.
 *
 * @param context Entity invocation context providing state, input and result access.
 * @throws Error "Queue is full" when `enqueue` is invoked at `maxSize`; the state is
 *         not written back in that case.
 */
export const queueEntity = function <T>(context: EntityFunctionContext): void {
    const state = context.df.getState<QueueState<T>>() || { items: [], maxSize: 100 };

    switch (context.df.operationName) {
        case "enqueue": {
            if (state.items.length >= state.maxSize) {
                throw new Error("Queue is full");
            }
            const item = context.df.getInput<T>();
            state.items.push(item);
            context.df.return(state.items.length);
            break;
        }

        case "dequeue": {
            if (state.items.length === 0) {
                context.df.return(null);
            } else {
                context.df.return(state.items.shift());
            }
            break;
        }

        case "peek":
            // `??` rather than `||`: a falsy front item (0, "", false) is still a real item.
            context.df.return(state.items[0] ?? null);
            break;

        case "size":
            context.df.return(state.items.length);
            break;

        case "clear":
            state.items = [];
            context.df.return(state.items.length);
            break;
    }

    context.df.setState(state);
};

// Usage in orchestrator
// Example id addressing the "TaskQueue" entity instance keyed "pending-tasks".
// Not referenced by the orchestrator in this snippet, which builds its own id from its input.
const queueEntityId = new EntityId("TaskQueue", "pending-tasks");

/**
 * Orchestrator that fills a TaskQueue entity with three tasks and then drains it,
 * running the "ProcessTask" activity once per dequeued task.
 *
 * This must be a generator function: each `yield` hands a task to the caller driving
 * the generator and resumes with that task's result. The dequeue call has to be
 * yielded as well — without the yield, `task` would be the pending task object
 * rather than the dequeued item, and the drain loop could never observe an empty queue.
 *
 * @param context Orchestration context; its input is the key of the TaskQueue entity to use.
 */
const orchestrator = function* (
    context: DFOrchestrationContext
): Generator<unknown, void, unknown> {
    const queueId = context.df.getInput<string>();
    const entityId = new EntityId("TaskQueue", queueId);

    // Enqueue tasks (one-way signals; no result is awaited).
    for (const task of ["task1", "task2", "task3"]) {
        context.df.signalEntity(entityId, "enqueue", task);
    }

    // Process all tasks: the queue entity answers `dequeue` with null once it is empty.
    for (;;) {
        const task = yield context.df.callEntity(entityId, "dequeue", null);
        if (task === null || task === undefined) {
            break;
        }
        yield context.df.callActivity("ProcessTask", task);
    }
};

Step 3: Distributed Lock Entity

// entities/DistributedLock.ts
/** State persisted by the distributed lock entity while a lock has been granted. */
interface LockState {
    /** Owner identifier supplied with the `acquire` operation. */
    owner: string;
    /** ISO-8601 timestamp (Date#toISOString) taken when the lock was granted. */
    acquiredAt: string;
    /** ISO-8601 timestamp after which the lock no longer blocks other owners. */
    expiresAt: string;
}

/**
 * Lease-style lock entity handler. A lock is held while state exists and its
 * `expiresAt` lies in the future; an expired lock can be taken over by anyone.
 *
 * Operations:
 * - `acquire` (input `{ owner, timeout }`, timeout in seconds) — grants or renews the
 *   lock unless a different owner currently holds it. Returns `{ success: true }`, or
 *   `{ success: false, reason }` when it is already locked or the request is malformed.
 * - `release` (input: owner string) — clears the lock if the caller is the recorded owner.
 * - `forceRelease` — clears the lock unconditionally.
 * - `status` — returns `{ isLocked, owner, expiresAt }`; `isLocked` is always a boolean.
 *
 * @param context Entity invocation context providing state, input and result access.
 */
export const distributedLock = function (context: EntityFunctionContext): void {
    const state = context.df.getState<LockState>();
    // Read the clock once so every comparison and timestamp in this invocation agrees.
    const nowMs = Date.now();
    const isHeld = state != null && new Date(state.expiresAt).getTime() > nowMs;

    switch (context.df.operationName) {
        case "acquire": {
            const input = context.df.getInput<{ owner: string; timeout: number }>();

            // Reject malformed requests instead of throwing on a missing input or
            // producing an invalid expiry date from a non-finite timeout.
            if (!input || typeof input.owner !== "string" || !Number.isFinite(input.timeout)) {
                context.df.return({ success: false, reason: "Invalid acquire request" });
                break;
            }

            if (isHeld && state?.owner !== input.owner) {
                context.df.return({ success: false, reason: "Already locked" });
            } else {
                context.df.setState({
                    owner: input.owner,
                    acquiredAt: new Date(nowMs).toISOString(),
                    expiresAt: new Date(nowMs + input.timeout * 1000).toISOString()
                });
                context.df.return({ success: true });
            }
            break;
        }

        case "release": {
            const releaseOwner = context.df.getInput<string>();

            if (state && state.owner === releaseOwner) {
                context.df.setState(null); // Clear state
                context.df.return({ success: true });
            } else {
                context.df.return({ success: false, reason: "Not owner" });
            }
            break;
        }

        case "forceRelease":
            // Admin operation to force release
            context.df.setState(null);
            context.df.return({ success: true });
            break;

        case "status":
            context.df.return({
                // Explicit boolean: previously this was undefined/null when no state existed.
                isLocked: isHeld,
                owner: state?.owner,
                expiresAt: state?.expiresAt
            });
            break;
    }
};

Step 4: Reliable Workflow with Checkpointing

// orchestrators/OrderProcessing.ts
/**
 * Order-processing orchestrator: validate -> reserve inventory -> take payment ->
 * create shipment -> notify, recording each completed step in the returned result.
 *
 * This must be a generator function: every `yield` hands a task to the caller driving
 * the generator and resumes with that task's result (or throws its failure into the
 * `try` block).
 *
 * On failure the error message is recorded, a "failed" step is appended, and the steps
 * that had already completed are compensated (inventory released, payment refunded).
 * A failure inside a compensation activity is not caught here and propagates.
 *
 * @param context Orchestration context; its input is the order to process.
 * @returns The processing result; `completed` is true only when every step succeeded.
 */
const orderOrchestrator = function* (
    context: DFOrchestrationContext
    // Activity results are opaque at this level, so the resume type is left open.
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
): Generator<unknown, OrderProcessingResult, any> {
    const order = context.df.getInput<Order>();
    const result: OrderProcessingResult = {
        orderId: order.id,
        steps: [],
        completed: false
    };

    try {
        // Step 1: Validate order
        context.df.setCustomStatus("Validating order");
        yield context.df.callActivity("ValidateOrder", order);
        result.steps.push({ step: "validate", status: "completed" });

        // Step 2: Reserve inventory (retried on failure)
        // NOTE(review): the retry settings are passed wrapped in `{ retryOptions: ... }`;
        // confirm this matches the shape callActivityWithRetry expects in the SDK version in use.
        context.df.setCustomStatus("Reserving inventory");
        const inventoryResult = yield context.df.callActivityWithRetry("ReserveInventory", {
            retryOptions: {
                firstRetryIntervalInMilliseconds: 500,
                maxNumberOfAttempts: 3
            }
        }, order.items);
        result.steps.push({ step: "inventory", status: "completed", result: inventoryResult });

        // Step 3: Process payment
        context.df.setCustomStatus("Processing payment");
        const paymentResult = yield context.df.callActivity("ProcessPayment", {
            orderId: order.id,
            amount: order.total
        });
        result.steps.push({ step: "payment", status: "completed", result: paymentResult });

        // Step 4: Create shipment
        context.df.setCustomStatus("Creating shipment");
        const shipment = yield context.df.callActivity("CreateShipment", order);
        result.steps.push({ step: "shipment", status: "completed", result: shipment });

        // Step 5: Send notification
        yield context.df.callActivity("SendOrderNotification", order);

        result.completed = true;
        context.df.setCustomStatus("Order completed successfully");
    }
    catch (error: unknown) {
        // Anything can be thrown, so narrow before reading `.message`.
        const message = error instanceof Error ? error.message : String(error);
        result.error = message;
        result.steps.push({ step: "failed", status: "failed", error: message });

        // Compensate only for the steps that were recorded as completed.
        if (result.steps.some(s => s.step === "inventory")) {
            yield context.df.callActivity("ReleaseInventory", order.items);
        }
        if (result.steps.some(s => s.step === "payment")) {
            yield context.df.callActivity("RefundPayment", order.id);
        }

        context.df.setCustomStatus("Order failed, compensation complete");
    }

    return result;
};

Step 5: Timer-Based Cleanup Entity

// entities/SessionManager.ts
/**
 * State persisted by the session manager entity.
 *
 * Sessions are kept in a plain object keyed by session id rather than a `Map`:
 * a `Map` serializes to `{}` under JSON, so its contents (and its methods) would be
 * lost as soon as the state is stored and read back.
 */
interface SessionState {
    sessions: Record<string, { createdAt: string; lastAccessed: string }>;
}

/**
 * Session manager entity handler.
 *
 * Operations:
 * - `create` (input: session id) — registers the session and returns its id.
 * - `touch`  (input: session id) — refreshes `lastAccessed` if the session exists.
 * - `delete` (input: session id) — removes the session and returns `true`.
 * - `cleanup` (input: max idle age in ms, default 3600000) — removes sessions idle for
 *   longer than the given age and returns `{ cleaned, remaining }`.
 * - `count` — returns the number of stored sessions.
 *
 * The state is written back after every operation.
 *
 * @param context Entity invocation context providing state, input and result access.
 */
export const sessionManager = function (context: EntityFunctionContext): void {
    const state = context.df.getState<SessionState>() || { sessions: {} };
    // Own-property check so ids such as "constructor" never resolve to inherited members.
    const hasSession = (id: string): boolean =>
        Object.prototype.hasOwnProperty.call(state.sessions, id);

    switch (context.df.operationName) {
        case "create": {
            const sessionId = context.df.getInput<string>();
            state.sessions[sessionId] = {
                createdAt: new Date().toISOString(),
                lastAccessed: new Date().toISOString()
            };
            context.df.return(sessionId);
            break;
        }

        case "touch": {
            const touchId = context.df.getInput<string>();
            if (hasSession(touchId)) {
                state.sessions[touchId].lastAccessed = new Date().toISOString();
            }
            break;
        }

        case "delete": {
            const deleteId = context.df.getInput<string>();
            delete state.sessions[deleteId];
            context.df.return(true);
            break;
        }

        case "cleanup": {
            // `??` rather than `||` so an explicit max age of 0 is honored.
            const maxAge = context.df.getInput<number>() ?? 3600000; // 1 hour
            const now = Date.now();
            let cleaned = 0;

            // Object.entries returns a snapshot, so deleting while iterating is safe.
            for (const [id, session] of Object.entries(state.sessions)) {
                const lastAccessed = new Date(session.lastAccessed).getTime();
                if (now - lastAccessed > maxAge) {
                    delete state.sessions[id];
                    cleaned++;
                }
            }

            context.df.return({ cleaned, remaining: Object.keys(state.sessions).length });
            break;
        }

        case "count":
            context.df.return(Object.keys(state.sessions).length);
            break;
    }

    context.df.setState(state);
};

// Timer trigger to cleanup old sessions
/**
 * Timer-triggered handler that signals the "main" SessionManager entity to run its
 * `cleanup` operation with a one-hour maximum idle age.
 *
 * NOTE(review): this relies on `context.df` being available on the timer function's
 * context — confirm against the SDK version in use; a durable client may be required.
 *
 * @param context Invocation context used to send the entity signal.
 * @param timer   Timer trigger payload; not used by this handler.
 */
const cleanupTimer = function (context: Context, timer: Timer): void {
    const ONE_HOUR_MS = 3600000;
    const sessionManagerId = new EntityId("SessionManager", "main");

    context.df.signalEntity(sessionManagerId, "cleanup", ONE_HOUR_MS);
};

Step 6: Fan-Out/Fan-In Pattern

// orchestrators/BatchProcessing.ts
/**
 * Fan-out/fan-in orchestrator: splits the input items into batches, starts one
 * "ProcessBatch" activity per batch, waits for all of them, and aggregates the counts.
 *
 * This must be a generator function: the single `yield` hands the combined task to the
 * caller driving the generator and resumes with the array of per-batch results.
 *
 * @param context Orchestration context; its input is `{ items, batchSize }`.
 * @returns Batch count, summed `processed` and `failed` counts, and the raw per-batch results.
 */
const batchOrchestrator = function* (
    context: DFOrchestrationContext
    // Per-batch results are produced by the activity and are opaque at this level.
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
): Generator<unknown, BatchResult, any> {
    const input = context.df.getInput<{ items: string[]; batchSize: number }>();

    // Fan-Out: split items into batches and start every activity without yielding,
    // so the batches are scheduled together rather than one after another.
    const batches = chunkArray(input.items, input.batchSize);
    const tasks = batches.map((items, batchId) =>
        context.df.callActivity("ProcessBatch", { batchId, items })
    );

    // Fan-In: Wait for all to complete
    const batchResults = yield context.df.taskAll(tasks);

    // Aggregate results
    const totalProcessed = batchResults.reduce(
        (sum: number, r: { processed: number }) => sum + r.processed,
        0
    );
    const totalFailed = batchResults.reduce(
        (sum: number, r: { failed: number }) => sum + r.failed,
        0
    );

    return {
        totalBatches: batches.length,
        totalProcessed,
        totalFailed,
        results: batchResults
    };
};

/**
 * Splits an array into consecutive chunks of at most `size` elements.
 * The input array is not modified; the last chunk may be shorter than `size`.
 *
 * @param array Items to split.
 * @param size  Maximum chunk length; must be a positive integer.
 * @returns The chunks in original order (an empty array for empty input).
 * @throws RangeError when `size` is not a positive integer — a zero or negative
 *         step would otherwise never advance and loop forever.
 */
function chunkArray<T>(array: T[], size: number): T[][] {
    if (!Number.isInteger(size) || size <= 0) {
        throw new RangeError(`chunk size must be a positive integer, got ${size}`);
    }

    const chunks: T[][] = [];
    for (let start = 0; start < array.length; start += size) {
        chunks.push(array.slice(start, start + size));
    }
    return chunks;
}

Summary