Durable Functions — Orchestrator & Activities

Stateful Workflows, Fan-out/Fan-in, Chaining


Introduction

Azure Durable Functions extend Azure Functions with stateful workflow capabilities. While standard Azure Functions are stateless, Durable Functions maintain workflow state across long-running operations, making them perfect for complex business processes that may span minutes, hours, or even days.

When to Use Durable Functions

  • Long-running workflows — Processes that take longer than the timeout limits of regular functions
  • Complex state management — Need to track progress across multiple steps
  • Human interaction — Workflows that require manual approval or intervention
  • Reliable execution — Must handle failures and resume from the last successful state

Architecture Overview

┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│   Orchestrator  │────▶│   Activity 1    │────▶│   Activity 2    │
│   (Control)     │     │   (Worker)      │     │   (Worker)      │
└─────────────────┘     └─────────────────┘     └─────────────────┘
        │
        ▼
┌─────────────────┐
│    Entity       │
│   (State)       │
└─────────────────┘

The orchestrator acts as the "brain" of the workflow, controlling execution order while the actual work happens in stateless activity functions.


Basic Chaining Pattern

The chaining pattern executes activities sequentially, where each activity's output feeds into the next:

[FunctionName("OrderProcessingOrchestration")]
public static async Task<OrderProcessingResult> OrderProcessingOrchestration(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    // Get input - the order ID to process
    string orderId = context.GetInput<string>();
    
    context.SetCustomStatus("Validating order");
    
    // Step 1: Validate Order - Call activity to validate
    var validationResult = await context.CallActivityAsync<ValidationResult>(
        "ValidateOrderActivity", 
        orderId);
    
    // If validation failed, stop here
    if (!validationResult.IsValid)
    {
        return new OrderProcessingResult 
        { 
            Success = false, 
            FailureReason = validationResult.FailureReason 
        };
    }
    
    context.SetCustomStatus("Processing payment");
    
    // Step 2: Process Payment - Call activity to process payment
    var paymentResult = await context.CallActivityAsync<PaymentResult>(
        "ProcessPaymentActivity", 
        new PaymentRequest 
        { 
            OrderId = orderId, 
            Amount = validationResult.TotalAmount 
        });
    
    if (!paymentResult.Success)
    {
        // Handle payment failure - perhaps notify customer
        await context.CallActivityAsync("NotifyPaymentFailure", orderId);
        return new OrderProcessingResult { Success = false, FailureReason = "Payment failed" };
    }
    
    context.SetCustomStatus("Updating inventory");
    
    // Step 3: Update Inventory
    await context.CallActivityAsync(
        "UpdateInventoryActivity", 
        validationResult.Items);
    
    context.SetCustomStatus("Initiating shipment");
    
    // Step 4: Ship Order - Call shipment service
    var shipmentInfo = await context.CallActivityAsync<ShipmentInfo>(
        "InitiateShipmentActivity", 
        orderId);
    
    context.SetCustomStatus("Sending notification");
    
    // Step 5: Send Notification
    await context.CallActivityAsync(
        "SendOrderConfirmationActivity", 
        new OrderConfirmation 
        { 
            OrderId = orderId,
            ShipmentId = shipmentInfo.TrackingNumber,
            EstimatedDelivery = shipmentInfo.EstimatedDelivery
        });
    
    // Workflow completed successfully
    return new OrderProcessingResult 
    { 
        Success = true, 
        OrderId = orderId,
        TrackingNumber = shipmentInfo.TrackingNumber
    };
}

Activity Functions

// Activity 1: Validate Order
[FunctionName("ValidateOrderActivity")]
public static async Task<ValidationResult> ValidateOrderActivity(
    [ActivityTrigger] string orderId,
    ILogger log)
{
    log.LogInformation("Validating order {OrderId}", orderId);
    
    // Fetch order from database
    var order = await _orderRepository.GetByIdAsync(orderId);
    
    // Validate order has items
    if (order.Items == null || !order.Items.Any())
    {
        return new ValidationResult { IsValid = false, FailureReason = "Order has no items" };
    }
    
    // Validate items are in stock
    var outOfStockItems = new List<string>();
    foreach (var item in order.Items)
    {
        var available = await _inventoryService.CheckAvailabilityAsync(item.ProductId);
        if (!available)
        {
            outOfStockItems.Add(item.ProductId);
        }
    }
    
    if (outOfStockItems.Any())
    {
        return new ValidationResult 
        { 
            IsValid = false, 
            FailureReason = $"Items out of stock: {string.Join(", ", outOfStockItems)}" 
        };
    }
    
    return new ValidationResult 
    { 
        IsValid = true, 
        TotalAmount = order.Total,
        Items = order.Items
    };
}

// Activity 2: Process Payment
[FunctionName("ProcessPaymentActivity")]
public static async Task<PaymentResult> ProcessPaymentActivity(
    [ActivityTrigger] PaymentRequest request,
    ILogger log)
{
    log.LogInformation("Processing payment for order {OrderId}, amount {Amount}", 
        request.OrderId, request.Amount);
    
    try
    {
        // Call payment gateway
        var paymentResponse = await _paymentGateway.ChargeAsync(
            request.OrderId,
            request.Amount);
        
        return new PaymentResult 
        { 
            Success = true, 
            TransactionId = paymentResponse.TransactionId 
        };
    }
    catch (PaymentException ex)
    {
        log.LogError(ex, "Payment failed for order {OrderId}", request.OrderId);
        return new PaymentResult { Success = false };
    }
}

Fan-out/Fan-in Pattern

The fan-out/fan-in pattern processes multiple items in parallel, then aggregates results:

[FunctionName("ProcessAllOrdersBatch")]
public static async Task<BatchProcessingResult> ProcessAllOrdersBatch(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    // Get list of orders to process
    var orders = context.GetInput<List<Order>>();
    
    log.LogInformation("Processing {Count} orders in parallel", orders.Count);
    
    // FAN-OUT: Start all order processing tasks in parallel
    // This launches multiple activity calls simultaneously
    var tasks = orders.Select(async order =>
    {
        try
        {
            // Call activity for each order
            var result = await context.CallActivityAsync<OrderProcessResult>(
                "ProcessSingleOrderActivity", 
                order);
            return result;
        }
        catch (Exception ex)
        {
            log.LogError(ex, "Failed to process order {OrderId}", order.OrderId);
            return new OrderProcessResult 
            { 
                OrderId = order.OrderId,
                Success = false,
                ErrorMessage = ex.Message
            };
        }
    });
    
    // Wait for ALL tasks to complete
    // This is where the "fan-in" happens
    var results = await Task.WhenAll(tasks);
    
    // Aggregate results
    var successfulCount = results.Count(r => r.Success);
    var failedCount = results.Count(r => !r.Success);
    var totalRevenue = results.Where(r => r.Success).Sum(r => r.Revenue);
    
    log.LogInformation("Batch complete. Success: {Success}, Failed: {Failed}, Revenue: {Revenue}",
        successfulCount, failedCount, totalRevenue);
    
    return new BatchProcessingResult
    {
        TotalProcessed = orders.Count,
        Successful = successfulCount,
        Failed = failedCount,
        TotalRevenue = totalRevenue,
        FailedOrders = results.Where(r => !r.Success).Select(r => r.OrderId).ToList()
    };
}

Real-World Example: Processing PDF Invoices

[FunctionName("ProcessInvoicesBatch")]
public static async Task<InvoiceProcessingResult> ProcessInvoicesBatch(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    var invoiceIds = context.GetInput<List<string>>();
    
    // For each invoice, call OCR and processing in parallel
    var processingTasks = invoiceIds.Select(async invoiceId =>
    {
        // Step 1: Download PDF from blob storage
        var pdfContent = await context.CallActivityAsync<byte[]>(
            "DownloadInvoicePdf", invoiceId);
        
        // Step 2: Extract text using OCR
        var extractedText = await context.CallActivityAsync<string>(
            "ExtractTextFromPdf", pdfContent);
        
        // Step 3: Parse into structured data
        var invoiceData = await context.CallActivityAsync<InvoiceData>(
            "ParseInvoiceData", extractedText);
        
        // Step 4: Validate extracted data
        var validation = await context.CallActivityAsync<ValidationResult>(
            "ValidateInvoiceData", invoiceData);
        
        if (validation.IsValid)
        {
            // Step 5: Save to database
            await context.CallActivityAsync("SaveInvoiceToDatabase", invoiceData);
            
            return new InvoiceProcessResult 
            { 
                InvoiceId = invoiceId, 
                Success = true, 
                Amount = invoiceData.TotalAmount 
            };
        }
        
        // If validation failed, record the error
        await context.CallActivityAsync("LogInvoiceError", 
            new { InvoiceId = invoiceId, Errors = validation.Errors });
        
        return new InvoiceProcessResult 
        { 
            InvoiceId = invoiceId, 
            Success = false 
        };
    });
    
    var results = await Task.WhenAll(processingTasks);
    
    return new InvoiceProcessingResult
    {
        TotalInvoices = invoiceIds.Count,
        ProcessedSuccessfully = results.Count(r => r.Success),
        Failed = results.Count(r => !r.Success),
        TotalAmount = results.Where(r => r.Success).Sum(r => r.Amount)
    };
}

Orchestration Types

Function Chaining (Sequential)

Best for workflows where each step depends on the previous step's output:

// Sequential - each step waits for previous to complete
var result1 = await context.CallActivityAsync<T1>("Activity1", input);
var result2 = await context.CallActivityAsync<T2>("Activity2", result1);
var result3 = await context.CallActivityAsync<T3>("Activity3", result2);

Parallel Execution

Best when activities don't depend on each other:

// All activities start simultaneously
var task1 = context.CallActivityAsync<T1>("Activity1", input1);
var task2 = context.CallActivityAsync<T2>("Activity2", input2);
var task3 = context.CallActivityAsync<T3>("Activity3", input3);

var results = await Task.WhenAll(task1, task2, task3);

Async HTTP Pattern

Useful for calling external APIs that return a status endpoint:

[FunctionName("CallExternalApiOrchestration")]
public static async Task<ApiResponse> CallExternalApiOrchestration(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    var request = context.GetInput<ApiRequest>();
    
    // Start the long-running operation
    var initialResponse = await context.CallActivityAsync<InitialApiResponse>(
        "StartLongRunningOperation", request);
    
    // Poll for completion (with timeout)
    var timeout = TimeSpan.FromMinutes(5);
    var deadline = context.CurrentUtcDateTime.Add(timeout);
    
    while (context.CurrentUtcDateTime < deadline)
    {
        var statusResponse = await context.CallActivityAsync<StatusResponse>(
            "CheckOperationStatus", initialResponse.OperationId);
        
        if (statusResponse.IsCompleted)
        {
            return new ApiResponse 
            { 
                Success = true, 
                Data = statusResponse.Result 
            };
        }
        
        // Wait 10 seconds before checking again
        await context.CreateTimer(deadline.AddSeconds(-10), CancellationToken.None);
    }
    
    return new ApiResponse { Success = false, ErrorMessage = "Operation timed out" };
}

Monitoring Orchestrations

Query Orchestration Status

var status = await client.GetStatusAsync(instanceId);
Console.WriteLine($"Status: {status.RuntimeStatus}");
Console.WriteLine($"Custom Status: {status.CustomStatus}");

Custom Status Updates

[FunctionName("LongWorkflow")]
public static async Task RunLongWorkflow(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    // Update custom status as progress changes
    context.SetCustomStatus(new { Phase = "Starting", Progress = 0 });
    
    await context.CallActivityAsync("Step1", context.GetInput());
    context.SetCustomStatus(new { Phase = "Step 1 Complete", Progress = 25 });
    
    await context.CallActivityAsync("Step2", context.GetInput());
    context.SetCustomStatus(new { Phase = "Step 2 Complete", Progress = 50 });
    
    await context.CallActivityAsync("Step3", context.GetInput());
    context.SetCustomStatus(new { Phase = "Complete", Progress = 100 });
}

Error Handling and Retry

Automatic Retry

[FunctionName("OrchestrationWithRetry")]
public static async Task RunWithRetry(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    // Configure retry options for activity calls
    var retryOptions = new RetryOptions(
        firstRetryInterval: TimeSpan.FromSeconds(5),
        maxNumberOfAttempts: 3,
        backoffCoefficient: 2);
    
    // Activity will automatically retry on failure
    var result = await context.CallActivityWithRetryAsync<ActivityResult>(
        "UnreliableActivity", 
        retryOptions, 
        context.GetInput());
}

Handling Failures

[FunctionName("OrchestrationWithErrorHandling")]
public static async Task RunWithErrorHandling(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    try
    {
        var result = await context.CallActivityAsync<ProcessResult>(
            "RiskyActivity", context.GetInput());
    }
    catch (Exception ex)
    {
        // Log the failure
        await context.CallActivityAsync("LogFailure", new 
        { 
            OrchestrationId = context.InstanceId,
            Error = ex.Message 
        });
        
        // Determine if we should retry or fail the workflow
        if (ex is TransientException)
        {
            // Could implement custom retry logic here
            throw;
        }
        
        // For non-recoverable errors, complete the workflow with failure status
        return new { Success = false, Error = ex.Message };
    }
}

Best Practices

PracticeDescription
Keep activities statelessActivities should not maintain state between calls
Use deterministic codeAvoid random numbers, DateTime.Now, external calls in orchestrator
Set custom statusProvide visibility into workflow progress
Configure timeoutsSet appropriate timeout values for long-running operations
Handle failures explicitlyPlan for failure scenarios in your workflow design
Use sub-orchestrationsBreak complex workflows into smaller, reusable orchestrations

Common Pitfalls to Avoid

  1. Calling external APIs directly from orchestrator — Always use activity functions
  2. Using non-deterministic code — Don't use DateTime.Now or Random in orchestrator
  3. Not setting timeouts — Long-running operations can get stuck indefinitely
  4. Ignoring failures — Always handle and log failures appropriately

Azure Integration Hub - Advanced Level Durable Functions Series - Part 1 of 3