Durable Functions — Orchestrator & Activities
Stateful Workflows, Fan-out/Fan-in, Chaining
Introduction
Azure Durable Functions extend Azure Functions with stateful workflow capabilities. Standard Azure Functions are stateless, whereas Durable Functions persist workflow state across long-running operations, which makes them well suited to complex business processes that span minutes, hours, or even days.
When to Use Durable Functions
- Long-running workflows — Processes that take longer than the timeout limits of regular functions
- Complex state management — Need to track progress across multiple steps
- Human interaction — Workflows that require manual approval or intervention
- Reliable execution — Must handle failures and resume from the last successful state
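The human-interaction case can be sketched with an external event raced against a durable timer. This is a minimal sketch, assuming the in-process C# model (Microsoft.Azure.WebJobs.Extensions.DurableTask 2.x) used in the rest of this article. The function name ApprovalWorkflow, the event name ApprovalEvent, the 72-hour window, and the activities RequestApproval, ProcessApproval and EscalateApproval are all hypothetical.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;

public static class ApprovalWorkflowSketch
{
    // Hypothetical orchestration: all function, event and activity names are illustrative.
    [FunctionName("ApprovalWorkflow")]
    public static async Task<string> Run(
        [OrchestrationTrigger] IDurableOrchestrationContext context)
    {
        string requestId = context.GetInput<string>();

        // Ask a human to approve (for example by sending an email).
        await context.CallActivityAsync("RequestApproval", requestId);

        using (var cts = new CancellationTokenSource())
        {
            // Durable timer based on the replay-safe orchestration clock.
            DateTime dueTime = context.CurrentUtcDateTime.AddHours(72);
            Task timeoutTask = context.CreateTimer(dueTime, cts.Token);

            // Completes when a client raises "ApprovalEvent" for this instance.
            Task<bool> approvalTask = context.WaitForExternalEvent<bool>("ApprovalEvent");

            Task winner = await Task.WhenAny(approvalTask, timeoutTask);
            if (winner == approvalTask)
            {
                // Cancel the pending timer so the orchestration can complete.
                cts.Cancel();
                await context.CallActivityAsync("ProcessApproval", approvalTask.Result);
                return approvalTask.Result ? "Approved" : "Rejected";
            }

            // Nobody answered within the window.
            await context.CallActivityAsync("EscalateApproval", requestId);
            return "TimedOut";
        }
    }
}
```

A client function would typically deliver the decision with IDurableOrchestrationClient.RaiseEventAsync(instanceId, "ApprovalEvent", true). While the orchestration waits, no function is running, so the wait can last days.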
Architecture Overview
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Orchestrator │────▶│ Activity 1 │────▶│ Activity 2 │
│ (Control) │ │ (Worker) │ │ (Worker) │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│
▼
┌─────────────────┐
│ Entity │
│ (State) │
└─────────────────┘
The orchestrator acts as the "brain" of the workflow, controlling execution order while the actual work happens in stateless activity functions.
Basic Chaining Pattern
The chaining pattern executes activities sequentially, where each activity's output feeds into the next:
[FunctionName("OrderProcessingOrchestration")]
public static async Task<OrderProcessingResult> OrderProcessingOrchestration(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
// Get input - the order ID to process
string orderId = context.GetInput<string>();
context.SetCustomStatus("Validating order");
// Step 1: Validate Order - Call activity to validate
var validationResult = await context.CallActivityAsync<ValidationResult>(
"ValidateOrderActivity",
orderId);
// If validation failed, stop here
if (!validationResult.IsValid)
{
return new OrderProcessingResult
{
Success = false,
FailureReason = validationResult.FailureReason
};
}
context.SetCustomStatus("Processing payment");
// Step 2: Process Payment - Call activity to process payment
var paymentResult = await context.CallActivityAsync<PaymentResult>(
"ProcessPaymentActivity",
new PaymentRequest
{
OrderId = orderId,
Amount = validationResult.TotalAmount
});
if (!paymentResult.Success)
{
// Handle payment failure - perhaps notify customer
await context.CallActivityAsync("NotifyPaymentFailure", orderId);
return new OrderProcessingResult { Success = false, FailureReason = "Payment failed" };
}
context.SetCustomStatus("Updating inventory");
// Step 3: Update Inventory
await context.CallActivityAsync(
"UpdateInventoryActivity",
validationResult.Items);
context.SetCustomStatus("Initiating shipment");
// Step 4: Ship Order - Call shipment service
var shipmentInfo = await context.CallActivityAsync<ShipmentInfo>(
"InitiateShipmentActivity",
orderId);
context.SetCustomStatus("Sending notification");
// Step 5: Send Notification
await context.CallActivityAsync(
"SendOrderConfirmationActivity",
new OrderConfirmation
{
OrderId = orderId,
ShipmentId = shipmentInfo.TrackingNumber,
EstimatedDelivery = shipmentInfo.EstimatedDelivery
});
// Workflow completed successfully
return new OrderProcessingResult
{
Success = true,
OrderId = orderId,
TrackingNumber = shipmentInfo.TrackingNumber
};
}
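An orchestrator does not start by itself; a client function has to schedule it. The following is a minimal sketch of an HTTP starter for the orchestration above, assuming the in-process C# model. The function name OrderProcessing_HttpStart and the route are hypothetical.

```csharp
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;

public static class OrderProcessingStarter
{
    // Hypothetical starter: the function name and route are illustrative.
    [FunctionName("OrderProcessing_HttpStart")]
    public static async Task<HttpResponseMessage> HttpStart(
        [HttpTrigger(AuthorizationLevel.Function, "post", Route = "orders/{orderId}/process")]
        HttpRequestMessage req,
        string orderId,
        [DurableClient] IDurableOrchestrationClient starter,
        ILogger log)
    {
        // Explicit <string> plus a null instance ID: with a string input, the
        // two-argument overload would treat orderId as the instance ID instead.
        string instanceId = await starter.StartNewAsync<string>(
            "OrderProcessingOrchestration", null, orderId);

        log.LogInformation("Started orchestration {InstanceId} for order {OrderId}",
            instanceId, orderId);

        // Responds with 202 Accepted plus status-query and management URLs.
        return starter.CreateCheckStatusResponse(req, instanceId);
    }
}
```

The caller gets an immediate response and can poll the returned status URL while the workflow runs in the background.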
Activity Functions
// Activity 1: Validate Order
[FunctionName("ValidateOrderActivity")]
public static async Task<ValidationResult> ValidateOrderActivity(
[ActivityTrigger] string orderId,
ILogger log)
{
log.LogInformation("Validating order {OrderId}", orderId);
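// Fetch order from database
var order = await _orderRepository.GetByIdAsync(orderId);
// Guard against an unknown order ID (assumes the repository returns null for a missing order)
if (order == null)
{
return new ValidationResult { IsValid = false, FailureReason = "Order not found" };
}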
// Validate order has items
if (order.Items == null || !order.Items.Any())
{
return new ValidationResult { IsValid = false, FailureReason = "Order has no items" };
}
// Validate items are in stock
var outOfStockItems = new List<string>();
foreach (var item in order.Items)
{
var available = await _inventoryService.CheckAvailabilityAsync(item.ProductId);
if (!available)
{
outOfStockItems.Add(item.ProductId);
}
}
if (outOfStockItems.Any())
{
return new ValidationResult
{
IsValid = false,
FailureReason = $"Items out of stock: {string.Join(", ", outOfStockItems)}"
};
}
return new ValidationResult
{
IsValid = true,
TotalAmount = order.Total,
Items = order.Items
};
}
// Activity 2: Process Payment
[FunctionName("ProcessPaymentActivity")]
public static async Task<PaymentResult> ProcessPaymentActivity(
[ActivityTrigger] PaymentRequest request,
ILogger log)
{
log.LogInformation("Processing payment for order {OrderId}, amount {Amount}",
request.OrderId, request.Amount);
try
{
// Call payment gateway
var paymentResponse = await _paymentGateway.ChargeAsync(
request.OrderId,
request.Amount);
return new PaymentResult
{
Success = true,
TransactionId = paymentResponse.TransactionId
};
}
catch (PaymentException ex)
{
log.LogError(ex, "Payment failed for order {OrderId}", request.OrderId);
return new PaymentResult { Success = false };
}
}
Fan-out/Fan-in Pattern
The fan-out/fan-in pattern processes multiple items in parallel, then aggregates results:
[FunctionName("ProcessAllOrdersBatch")]
public static async Task<BatchProcessingResult> ProcessAllOrdersBatch(
[OrchestrationTrigger] IDurableOrchestrationContext context,
ILogger log)
{
// Wrap the logger so messages are not written again on every replay
log = context.CreateReplaySafeLogger(log);
// Get list of orders to process
var orders = context.GetInput<List<Order>>();
log.LogInformation("Processing {Count} orders in parallel", orders.Count);
// FAN-OUT: Start all order processing tasks in parallel
// This launches multiple activity calls simultaneously
var tasks = orders.Select(async order =>
{
try
{
// Call activity for each order
var result = await context.CallActivityAsync<OrderProcessResult>(
"ProcessSingleOrderActivity",
order);
return result;
}
catch (Exception ex)
{
log.LogError(ex, "Failed to process order {OrderId}", order.OrderId);
return new OrderProcessResult
{
OrderId = order.OrderId,
Success = false,
ErrorMessage = ex.Message
};
}
});
// Wait for ALL tasks to complete
// This is where the "fan-in" happens
var results = await Task.WhenAll(tasks);
// Aggregate results
var successfulCount = results.Count(r => r.Success);
var failedCount = results.Count(r => !r.Success);
var totalRevenue = results.Where(r => r.Success).Sum(r => r.Revenue);
log.LogInformation("Batch complete. Success: {Success}, Failed: {Failed}, Revenue: {Revenue}",
successfulCount, failedCount, totalRevenue);
return new BatchProcessingResult
{
TotalProcessed = orders.Count,
Successful = successfulCount,
Failed = failedCount,
TotalRevenue = totalRevenue,
FailedOrders = results.Where(r => !r.Success).Select(r => r.OrderId).ToList()
};
}
Real-World Example: Processing PDF Invoices
[FunctionName("ProcessInvoicesBatch")]
public static async Task<InvoiceProcessingResult> ProcessInvoicesBatch(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
var invoiceIds = context.GetInput<List<string>>();
// For each invoice, call OCR and processing in parallel
var processingTasks = invoiceIds.Select(async invoiceId =>
{
// Step 1: Download PDF from blob storage
var pdfContent = await context.CallActivityAsync<byte[]>(
"DownloadInvoicePdf", invoiceId);
// Step 2: Extract text using OCR
var extractedText = await context.CallActivityAsync<string>(
"ExtractTextFromPdf", pdfContent);
// Step 3: Parse into structured data
var invoiceData = await context.CallActivityAsync<InvoiceData>(
"ParseInvoiceData", extractedText);
// Step 4: Validate extracted data
var validation = await context.CallActivityAsync<ValidationResult>(
"ValidateInvoiceData", invoiceData);
if (validation.IsValid)
{
// Step 5: Save to database
await context.CallActivityAsync("SaveInvoiceToDatabase", invoiceData);
return new InvoiceProcessResult
{
InvoiceId = invoiceId,
Success = true,
Amount = invoiceData.TotalAmount
};
}
// If validation failed, record the error
await context.CallActivityAsync("LogInvoiceError",
new { InvoiceId = invoiceId, Errors = validation.Errors });
return new InvoiceProcessResult
{
InvoiceId = invoiceId,
Success = false
};
});
var results = await Task.WhenAll(processingTasks);
return new InvoiceProcessingResult
{
TotalInvoices = invoiceIds.Count,
ProcessedSuccessfully = results.Count(r => r.Success),
Failed = results.Count(r => !r.Success),
TotalAmount = results.Where(r => r.Success).Sum(r => r.Amount)
};
}
Orchestration Types
Function Chaining (Sequential)
Best for workflows where each step depends on the previous step's output:
// Sequential - each step waits for previous to complete
var result1 = await context.CallActivityAsync<T1>("Activity1", input);
var result2 = await context.CallActivityAsync<T2>("Activity2", result1);
var result3 = await context.CallActivityAsync<T3>("Activity3", result2);
Parallel Execution
Best when activities don't depend on each other:
// All activities start simultaneously
var task1 = context.CallActivityAsync<T1>("Activity1", input1);
var task2 = context.CallActivityAsync<T2>("Activity2", input2);
var task3 = context.CallActivityAsync<T3>("Activity3", input3);
var results = await Task.WhenAll(task1, task2, task3);
Async HTTP Pattern
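Useful for calling external APIs that start a long-running operation and expose a status endpoint. The orchestrator starts the operation, then polls the status endpoint on a durable timer until the operation completes or the deadline passes: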
[FunctionName("CallExternalApiOrchestration")]
public static async Task<ApiResponse> CallExternalApiOrchestration(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
var request = context.GetInput<ApiRequest>();
// Start the long-running operation
var initialResponse = await context.CallActivityAsync<InitialApiResponse>(
"StartLongRunningOperation", request);
// Poll for completion (with timeout)
var timeout = TimeSpan.FromMinutes(5);
var deadline = context.CurrentUtcDateTime.Add(timeout);
while (context.CurrentUtcDateTime < deadline)
{
var statusResponse = await context.CallActivityAsync<StatusResponse>(
"CheckOperationStatus", initialResponse.OperationId);
if (statusResponse.IsCompleted)
{
return new ApiResponse
{
Success = true,
Data = statusResponse.Result
};
}
// Wait 10 seconds before checking again (durable timer on the replay-safe clock)
var nextCheck = context.CurrentUtcDateTime.AddSeconds(10);
await context.CreateTimer(nextCheck, CancellationToken.None);
}
return new ApiResponse { Success = false, ErrorMessage = "Operation timed out" };
}
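When polling against a deadline, it can help to compute the wake-up time in one place so the timer never sleeps past the deadline. The helper below is a small sketch of that idea in plain C#. The class and method names are hypothetical and are not part of the Durable Functions API.

```csharp
using System;

public static class PollingScheduleSketch
{
    // Hypothetical helper: returns the time of the next status check, one
    // interval after "now" but never later than the overall deadline.
    public static DateTime NextCheck(DateTime nowUtc, DateTime deadlineUtc, TimeSpan interval)
    {
        DateTime candidate = nowUtc.Add(interval);
        return candidate < deadlineUtc ? candidate : deadlineUtc;
    }
}
```

Inside the orchestrator this could be used as context.CreateTimer(PollingScheduleSketch.NextCheck(context.CurrentUtcDateTime, deadline, TimeSpan.FromSeconds(10)), CancellationToken.None). The helper is a pure function of its arguments, so it is safe to call from orchestrator code.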
Monitoring Orchestrations
Query Orchestration Status
var status = await client.GetStatusAsync(instanceId);
Console.WriteLine($"Status: {status.RuntimeStatus}");
Console.WriteLine($"Custom Status: {status.CustomStatus}");
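The client object in the snippet above comes from the durable client binding. As a rough sketch, assuming the in-process C# model, a status endpoint could look like this. The function name GetOrchestrationStatus and the route are hypothetical.

```csharp
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Azure.WebJobs.Extensions.Http;

public static class OrchestrationStatusApi
{
    // Hypothetical endpoint: the function name and route are illustrative.
    [FunctionName("GetOrchestrationStatus")]
    public static async Task<IActionResult> GetStatus(
        [HttpTrigger(AuthorizationLevel.Function, "get", Route = "status/{instanceId}")]
        HttpRequest req,
        string instanceId,
        [DurableClient] IDurableOrchestrationClient client)
    {
        DurableOrchestrationStatus status = await client.GetStatusAsync(instanceId);

        // GetStatusAsync returns null when no instance with this ID exists.
        if (status == null)
        {
            return new NotFoundResult();
        }

        return new OkObjectResult(new
        {
            status.InstanceId,
            RuntimeStatus = status.RuntimeStatus.ToString(),
            // Whatever the orchestrator last passed to SetCustomStatus, as JSON text.
            CustomStatus = status.CustomStatus?.ToString(),
            status.LastUpdatedTime
        });
    }
}
```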
Custom Status Updates
[FunctionName("LongWorkflow")]
public static async Task RunLongWorkflow(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
// Read the input once (typed as object here; substitute your real input type)
var input = context.GetInput<object>();
// Update custom status as progress changes
context.SetCustomStatus(new { Phase = "Starting", Progress = 0 });
await context.CallActivityAsync("Step1", input);
context.SetCustomStatus(new { Phase = "Step 1 Complete", Progress = 25 });
await context.CallActivityAsync("Step2", input);
context.SetCustomStatus(new { Phase = "Step 2 Complete", Progress = 50 });
await context.CallActivityAsync("Step3", input);
context.SetCustomStatus(new { Phase = "Complete", Progress = 100 });
}
Error Handling and Retry
Automatic Retry
[FunctionName("OrchestrationWithRetry")]
public static async Task RunWithRetry(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
// Configure retry options for activity calls.
// BackoffCoefficient is a property, not a constructor parameter.
var retryOptions = new RetryOptions(
firstRetryInterval: TimeSpan.FromSeconds(5),
maxNumberOfAttempts: 3)
{
BackoffCoefficient = 2.0
};
// Activity will automatically retry on failure
// (input typed as object here; substitute your real input type)
var result = await context.CallActivityWithRetryAsync<ActivityResult>(
"UnreliableActivity",
retryOptions,
context.GetInput<object>());
}
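To make the retry settings concrete, the sketch below computes the waits implied by a first interval, an attempt limit, and a backoff coefficient. It is a simplified model of the exponential backoff as I understand it, not the library's code: it assumes the attempt limit includes the initial call and it ignores the MaxRetryInterval and RetryTimeout settings. The class and method names are hypothetical.

```csharp
using System;
using System.Collections.Generic;

public static class RetryScheduleSketch
{
    // Hypothetical helper. Delay before retry n (0-based) is modelled as
    // firstRetryInterval * backoffCoefficient^n. The attempt limit counts the
    // first call, so there are at most (maxNumberOfAttempts - 1) retries.
    public static List<TimeSpan> ComputeDelays(
        TimeSpan firstRetryInterval, int maxNumberOfAttempts, double backoffCoefficient)
    {
        var delays = new List<TimeSpan>();
        for (int retry = 0; retry < maxNumberOfAttempts - 1; retry++)
        {
            double milliseconds =
                firstRetryInterval.TotalMilliseconds * Math.Pow(backoffCoefficient, retry);
            delays.Add(TimeSpan.FromMilliseconds(milliseconds));
        }
        return delays;
    }
}
```

With the values used above (5 seconds, 3 attempts, coefficient 2), ComputeDelays returns two waits: 5 seconds, then 10 seconds. If the third attempt also fails, the exception surfaces in the orchestrator.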
Handling Failures
[FunctionName("OrchestrationWithErrorHandling")]
public static async Task<object> RunWithErrorHandling(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
try
{
// Input typed as object here; substitute your real input type
var result = await context.CallActivityAsync<ProcessResult>(
"RiskyActivity", context.GetInput<object>());
return result;
}
catch (Exception ex)
{
// Log the failure
await context.CallActivityAsync("LogFailure", new
{
OrchestrationId = context.InstanceId,
Error = ex.Message
});
// Activity exceptions typically reach the orchestrator wrapped in
// FunctionFailedException, so inspect InnerException to decide
// whether to retry or fail the workflow
if (ex.InnerException is TransientException)
{
// Could implement custom retry logic here
throw;
}
// For non-recoverable errors, complete the workflow with failure status
return new { Success = false, Error = ex.Message };
}
}
Best Practices
| Practice | Description |
|---|---|
| Keep activities stateless | Activities should not maintain state between calls |
| Use deterministic code | Avoid random numbers, DateTime.Now, and external calls in the orchestrator; use context.CurrentUtcDateTime and context.NewGuid() instead |
| Set custom status | Provide visibility into workflow progress |
| Configure timeouts | Set appropriate timeout values for long-running operations |
| Handle failures explicitly | Plan for failure scenarios in your workflow design |
| Use sub-orchestrations | Break complex workflows into smaller, reusable orchestrations |
Common Pitfalls to Avoid
- Calling external APIs directly from the orchestrator: always use activity functions
- Using non-deterministic code: don't use DateTime.Now or Random in the orchestrator
- Not setting timeouts: long-running operations can get stuck indefinitely
- Ignoring failures: always handle and log failures appropriately
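The determinism rules above can be illustrated with a short sketch, assuming the in-process C# model. The orchestrator uses the replay-safe clock and GUID generator from its context and pushes the random choice into an activity. The function names DeterministicOrchestrator and PickRandomDiscount are hypothetical.

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;

public static class DeterministicOrchestratorSketch
{
    // Hypothetical orchestration: function and activity names are illustrative.
    [FunctionName("DeterministicOrchestrator")]
    public static async Task<string> Run(
        [OrchestrationTrigger] IDurableOrchestrationContext context)
    {
        // Replay-safe clock: yields the same value each time history is replayed.
        DateTime startedAt = context.CurrentUtcDateTime;

        // Replay-safe GUID instead of Guid.NewGuid().
        Guid correlationId = context.NewGuid();

        // Random or I/O-bound work goes through an activity; its result is
        // recorded in the history and reused on replay.
        int discountPercent = await context.CallActivityAsync<int>(
            "PickRandomDiscount", correlationId.ToString());

        return $"{correlationId}: {discountPercent}% discount, started {startedAt:O}";
    }

    [FunctionName("PickRandomDiscount")]
    public static int PickRandomDiscount([ActivityTrigger] string correlationId)
    {
        // Random is acceptable here because activities are not replayed.
        return new Random().Next(5, 21); // 5 to 20 inclusive
    }
}
```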
Azure Integration Hub - Advanced Level Durable Functions Series - Part 1 of 3