Durable Functions — Workflow Orchestration Patterns
Why Durable Functions?
Azure Durable Functions extend Azure Functions with stateful workflows. While regular Azure Functions are stateless and perfect for individual operations, real-world business processes often require:
- Multi-step workflows - Order processing, approval chains, data pipelines
- Long-running operations - Hours or days of processing, not seconds
- Human interactions - Waiting for approvals, reviews, input
- Reliability - Automatic retries, compensation on failures
- Visibility - Tracking progress, debugging failures
Without Durable Functions, you'd need to:
- Build your own state management (databases, queues)
- Handle complex retry logic manually
- Track workflow progress yourself
- Rebuild all of this for each workflow
Durable Functions provides this out of the box.
Understanding the Architecture
How Durable Functions Work
┌─────────────────────────────────────────────────────────────────────────────┐
│ Durable Functions Architecture │
└─────────────────────────────────────────────────────────────────────────────┘
┌─────────────┐ ┌─────────────────────┐ ┌─────────────────┐
│ Client │────────▶│ Orchestrator │────────▶│ Activities │
│ (HTTP) │ │ Function │ │ Functions │
└─────────────┘ └──────────┬──────────┘ └─────────────────┘
│
▼
┌─────────────────────┐
│ Durable Task Hub │
│ (Azure Storage or │
│ Cosmos DB) │
└─────────────────────┘
┌─────────────────────────────────────────────────────────────────────────────┐
│ Orchestrator State (What Durable Functions tracks) │
├─────────────────────────────────────────────────────────────────────────────┤
│ - Current step in the workflow │
│ - Output from previous activities │
│ - Timers and external events │
│ - Sub-orchestrator results │
│ - When to retry, what to do on failure │
└─────────────────────────────────────────────────────────────────────────────┘
Key Components
Orchestrator Function - Defines the workflow logic, calls activities, manages state Activity Function - The actual work units (atomic, can be retried) Client Function - Entry point, starts orchestrators, queries status Durable Task Hub - Storage for workflow state (Azure Storage or Cosmos DB) Entity Functions - For stateful entities (counters, queues, etc.)
Step 1: Fan-Out/Fan-In Pattern
The most common pattern - process multiple items in parallel, then aggregate results.
When to Use Fan-Out/Fan-In
- Processing multiple records in parallel (bulk data processing)
- Calling multiple APIs simultaneously (aggregating data)
- Processing files in a batch
- Running calculations on multiple datasets
Implementation
using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.DurableTask;
using Microsoft.Extensions.Logging;
public class ImageProcessingOrchestrator
{
private readonly ILogger<ImageProcessingOrchestrator> _logger;
public ImageProcessingOrchestrator(ILogger<ImageProcessingOrchestrator> logger)
{
_logger = logger;
}
[Function(nameof(ImageProcessingOrchestrator))]
public async Task<ImageProcessingResult> RunOrchestrator(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
_logger.LogInformation("Starting image processing workflow");
// Input: List of image URLs to process
var input = context.GetInput<ImageProcessingInput>();
if (input?.ImageUrls == null || !input.ImageUrls.Any())
{
throw new ArgumentException("No images to process");
}
var imageUrls = input.ImageUrls;
_logger.LogInformation("Processing {Count} images", imageUrls.Count);
// ============ FAN-OUT PHASE ============
// Start all image processing tasks in parallel
// Why parallel? Faster processing when items are independent
var processingTasks = imageUrls.Select(async url =>
{
try
{
// Call activity to process single image
// Each activity runs independently in parallel
var result = await context.CallActivityAsync<ImageProcessResult>(
nameof(ImageProcessingActivities.ProcessImage),
new ImageProcessInput
{
ImageUrl = url,
Options = input.Options
});
return result;
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to process image {Url}", url);
// Return a failure result instead of throwing
// This allows other images to continue processing
return new ImageProcessResult
{
ImageUrl = url,
Success = false,
ErrorMessage = ex.Message
};
}
});
// Wait for ALL tasks to complete (fan-in)
// Why Task.WhenAll? We need ALL results before continuing
// Alternative: Task.WhenAny for streaming results
var results = await Task.WhenAll(processingTasks);
// ============ FAN-IN PHASE ============
// Process and aggregate results
var successful = results.Count(r => r.Success);
var failed = results.Count(r => !r.Success);
_logger.LogInformation(
"Processing complete. Success: {Success}, Failed: {Failed}",
successful, failed);
return new ImageProcessingResult
{
TotalProcessed = imageUrls.Count,
Successful = successful,
Failed = failed,
ProcessedImages = results.ToList(),
ProcessedAt = DateTime.UtcNow
};
}
}
// Activity functions - the actual work
public class ImageProcessingActivities
{
private readonly ILogger<ImageProcessingActivities> _logger;
private readonly HttpClient _httpClient;
public ImageProcessingActivities(IHttpClientFactory httpClientFactory,
ILogger<ImageProcessingActivities> logger)
{
_httpClient = httpClientFactory.CreateClient();
_logger = logger;
}
[Function(nameof(ProcessImage))]
public async Task<ImageProcessResult> ProcessImage(
[ActivityTrigger] ImageProcessInput input)
{
_logger.LogInformation("Processing image: {Url}", input.ImageUrl);
// Step 1: Download image
var imageBytes = await DownloadImageAsync(input.ImageUrl);
// Step 2: Generate thumbnail
var thumbnailBytes = GenerateThumbnail(imageBytes, input.Options.ThumbnailSize);
// Step 3: Extract metadata
var metadata = ExtractMetadata(imageBytes);
// Step 4: Upload to storage
var thumbnailUrl = await UploadToStorageAsync(thumbnailBytes,
$"thumbnails/{Guid.NewGuid()}.jpg");
_logger.LogInformation("Successfully processed: {Url}", input.ImageUrl);
return new ImageProcessResult
{
ImageUrl = input.ImageUrl,
Success = true,
ThumbnailUrl = thumbnailUrl,
Metadata = metadata,
ProcessedAt = DateTime.UtcNow
};
}
private async Task<byte[]> DownloadImageAsync(string url)
{
var response = await _httpClient.GetAsync(url);
response.EnsureSuccessStatusCode();
return await response.Content.ReadAsByteArrayAsync();
}
private byte[] GenerateThumbnail(byte[] imageBytes, int size)
{
// Use ImageSharp, SkiaSharp, or similar library
// Simplified example
return imageBytes; // Actual implementation would resize
}
private ImageMetadata ExtractMetadata(byte[] imageBytes)
{
return new ImageMetadata
{
Width = 1920,
Height = 1080,
Format = "JPEG",
SizeBytes = imageBytes.Length
};
}
private async Task<string> UploadToStorageAsync(byte[] data, string blobName)
{
// Upload to Azure Blob Storage
await Task.Delay(100); // Simulate upload
return $"https://storage.blob.core.windows.net/images/{blobName}";
}
}
Why Fan-Out/Fan-In Matters
Without Fan-Out (Sequential):
┌─────────────────────────────────────────────────────────────────────────┐
│ Processing 10 images: │
│ │
│ [img1] ──────▶ │
│ [img2] ──────▶ │
│ [img3] ──────▶ │
│ ... │
│ │
│ Total time: 10 × 5s = 50 seconds │
└─────────────────────────────────────────────────────────────────────────┘
With Fan-Out (Parallel):
┌─────────────────────────────────────────────────────────────────────────┐
│ Processing 10 images simultaneously: │
│ │
│ [img1] ────▶ │
│ [img2] ────▶ │
│ [img3] ────▶ │
│ ... │
│ │
│ Total time: 5 seconds (limited by slowest item) │
└─────────────────────────────────────────────────────────────────────────┘
Speed improvement: 10x faster!
Handling Partial Failures
// Robust fan-out with granular error handling
[Function(nameof(RobustImageOrchestrator))]
public async Task<RobustProcessingResult> RobustImageOrchestrator(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
var input = context.GetInput<ImageProcessingInput>();
var results = new List<ImageProcessResult>();
var errors = new List<string>();
// Process in batches to avoid overwhelming resources
var batchSize = 10;
var batches = input.ImageUrls
.Select((url, index) => new { url, index })
.GroupBy(x => x.index / batchSize)
.Select(g => g.Select(x => x.url).ToList());
foreach (var batch in batches)
{
// Process each batch in parallel
var batchTasks = batch.Select(async url =>
{
try
{
return await context.CallActivityAsync<ImageProcessResult>(
nameof(ImageProcessingActivities.ProcessImage), url);
}
catch (Exception ex)
{
// Log but don't fail entire workflow
return new ImageProcessResult
{
ImageUrl = url,
Success = false,
ErrorMessage = ex.Message
};
}
});
var batchResults = await Task.WhenAll(batchTasks);
results.AddRange(batchResults);
}
// Analyze results
var failed = results.Where(r => !r.Success).ToList();
if (failed.Any())
{
// Continue processing but report failures
_logger.LogWarning("{FailedCount} images failed out of {Total}",
failed.Count, results.Count);
}
return new RobustProcessingResult
{
Results = results,
FailedCount = failed.Count,
SucceededCount = results.Count - failed.Count
};
}
Step 2: Human Interaction Pattern
Workflows that require human approval, input, or decision-making.
When to Use Human Interaction
- Approval workflows (expense reports, purchase orders)
- Document review and sign-off
- Task assignment and completion verification
- Exception handling that needs human judgment
Implementation
// Client function - starts the approval workflow
public class ApprovalWorkflowClient
{
private readonly ILogger<ApprovalWorkflowClient> _logger;
private readonly DurableTaskClient _durableTaskClient;
public ApprovalWorkflowClient(
DurableTaskClient durableTaskClient,
ILogger<ApprovalWorkflowClient> logger)
{
_durableTaskClient = durableTaskClient;
_logger = logger;
}
[Function(nameof(StartExpenseApproval))]
public async Task<HttpResponseData> StartExpenseApproval(
[HttpTrigger(AuthorizationLevel.Function, "post", Route = "expenses/approve")]
HttpRequestData request)
{
// Parse expense request
var expenseRequest = await request.ReadFromJsonAsync<ExpenseRequest>();
if (expenseRequest == null)
{
return request.CreateResponse(HttpStatusCode.BadRequest,
new { error = "Invalid request body" });
}
// Validate expense
if (expenseRequest.Amount <= 0)
{
return request.CreateResponse(HttpStatusCode.BadRequest,
new { error = "Amount must be positive" });
}
// Start the orchestration
var instanceId = await _durableTaskClient.ScheduleNewOrchestration(
nameof(ExpenseApprovalOrchestrator),
expenseRequest);
_logger.LogInformation(
"Started expense approval workflow {InstanceId} for amount {Amount}",
instanceId, expenseRequest.Amount);
return request.CreateAcceptedResponse(new
{
instanceId = instanceId,
statusQueryUri = $"/runtime/webhooks/durabletask/instances/{instanceId}",
approvalUri = $"/runtime/webhooks/durabletask/instances/{instanceId}/raiseEvent/ApprovalEvent"
});
}
}
// Orchestrator - handles approval workflow
public class ExpenseApprovalOrchestrator
{
private readonly ILogger<ExpenseApprovalOrchestrator> _logger;
public ExpenseApprovalOrchestrator(ILogger<ExpenseApprovalOrchestrator> logger)
{
_logger = logger;
}
[Function(nameof(ExpenseApprovalOrchestrator))]
public async Task<ApprovalResult> RunExpenseApproval(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
var expense = context.GetInput<ExpenseRequest>();
_logger.LogInformation(
"Starting approval workflow for expense {ExpenseId}, Amount: {Amount}",
expense.ExpenseId, expense.Amount);
// Step 1: Create approval request and notify approvers
var approvalRequest = new ApprovalRequest
{
RequestId = expense.ExpenseId,
Title = $"Expense Report: {expense.Description}",
Description = $"Amount: {expense.Amount:C}\nDescription: {expense.Description}\nSubmitted by: {expense.SubmittedBy}",
RequestedBy = expense.SubmittedBy,
Amount = expense.Amount,
CreatedAt = DateTime.UtcNow,
RequiresApprovalFrom = DetermineApprover(expense.Amount),
DueBy = DateTime.UtcNow.AddDays(2) // 48 hours to respond
};
await context.CallActivityAsync(nameof(ApprovalActivities.CreateApprovalRequest),
approvalRequest);
// Step 2: Send notification to approver
await context.CallActivityAsync(nameof(ApprovalActivities.SendApprovalEmail),
new EmailNotification
{
To = approvalRequest.RequiresApprovalFrom,
Subject = $"Approval Required: {approvalRequest.Title}",
Body = $"Please review expense report: {approvalRequest.Description}"
});
// Step 3: Wait for approval (or rejection)
// This is where the function "pauses" until external event arrives
var timeout = DateTime.UtcNow.AddDays(2);
var approvalEventTask = context.WaitForExternalEvent<ApprovalResponse>(
"ApprovalEvent",
timeout);
// Also set up a reminder in case something goes wrong
var reminderTask = context.CreateTimer(timeout, CancellationToken.None);
// Wait for either approval OR timeout
var winner = await Task.WhenAny(approvalEventTask, reminderTask);
ApprovalResponse approval;
if (winner == approvalEventTask)
{
// Approval event received
approval = await approvalEventTask;
_logger.LogInformation("Approval received: {ApprovedBy}, Decision: {IsApproved}",
approval.ApprovedBy, approval.IsApproved);
}
else
{
// Timeout - auto-reject
_logger.LogWarning("Approval timeout for expense {ExpenseId}", expense.ExpenseId);
approval = new ApprovalResponse
{
IsApproved = false,
ApprovedBy = "System",
Comments = "Auto-rejected: Approval timeout",
RespondedAt = DateTime.UtcNow
};
}
// Step 4: Process the approval decision
if (approval.IsApproved)
{
// Approved - process the expense
await context.CallActivityAsync(nameof(ApprovalActivities.ProcessApprovedExpense),
new ProcessExpenseRequest
{
Expense = expense,
ApprovedBy = approval.ApprovedBy,
ApprovedAt = approval.RespondedAt
});
// Send confirmation to requester
await context.CallActivityAsync(nameof(ApprovalActivities.SendConfirmationEmail),
new EmailNotification
{
To = expense.SubmittedBy,
Subject = "Expense Approved",
Body = $"Your expense for {expense.Amount:C} has been approved!"
});
return new ApprovalResult
{
ExpenseId = expense.ExpenseId,
Status = ApprovalStatus.Approved,
ApprovedBy = approval.ApprovedBy,
ProcessedAt = DateTime.UtcNow
};
}
else
{
// Rejected - notify requester
await context.CallActivityAsync(nameof(ApprovalActivities.SendRejectionEmail),
new EmailNotification
{
To = expense.SubmittedBy,
Subject = "Expense Rejected",
Body = $"Your expense for {expense.Amount:C} was rejected. Reason: {approval.Comments}"
});
return new ApprovalResult
{
ExpenseId = expense.ExpenseId,
Status = ApprovalStatus.Rejected,
ApprovedBy = approval.ApprovedBy,
Comments = approval.Comments,
ProcessedAt = DateTime.UtcNow
};
}
}
private string DetermineApprover(decimal amount)
{
// Routing logic based on amount
if (amount < 100) return "manager@company.com";
if (amount < 1000) return "director@company.com";
return "vp@company.com";
}
}
// Activity functions
public class ApprovalActivities
{
private readonly ILogger<ApprovalActivities> _logger;
private readonly IApprovalRepository _approvalRepo;
private readonly IEmailService _emailService;
public ApprovalActivities(
IApprovalRepository approvalRepo,
IEmailService emailService,
ILogger<ApprovalActivities> logger)
{
_approvalRepo = approvalRepo;
_emailService = emailService;
_logger = logger;
}
[Function(nameof(CreateApprovalRequest))]
public async Task CreateApprovalRequest([ActivityTrigger] ApprovalRequest request)
{
_logger.LogInformation("Creating approval request {RequestId}", request.RequestId);
request.Status = ApprovalStatus.Pending;
await _approvalRepo.CreateAsync(request);
}
[Function(nameof(SendApprovalEmail))]
public async Task SendApprovalEmail([ActivityTrigger] EmailNotification notification)
{
_logger.LogInformation("Sending approval email to {To}", notification.To);
await _emailService.SendAsync(notification.To, notification.Subject, notification.Body);
}
[Function(nameof(SendConfirmationEmail))]
public async Task SendConfirmationEmail([ActivityTrigger] EmailNotification notification)
{
_logger.LogInformation("Sending confirmation email to {To}", notification.To);
await _emailService.SendAsync(notification.To, notification.Subject, notification.Body);
}
[Function(nameof(SendRejectionEmail))]
public async Task SendRejectionEmail([ActivityTrigger] EmailNotification notification)
{
_logger.LogInformation("Sending rejection email to {To}", notification.To);
await _emailService.SendAsync(notification.To, notification.Subject, notification.Body);
}
[Function(nameof(ProcessApprovedExpense))]
public async Task ProcessApprovedExpense([ActivityTrigger] ProcessExpenseRequest request)
{
_logger.LogInformation("Processing approved expense {ExpenseId}",
request.Expense.ExpenseId);
// Actually process the expense - payment, recording, etc.
await Task.Delay(500); // Simulate processing
}
}
How to Raise the Approval Event
// API to handle approval from external system (webhook, UI, etc.)
public class ApprovalEventsHandler
{
private readonly DurableTaskClient _durableTaskClient;
private readonly ILogger<ApprovalEventsHandler> _logger;
[Function(nameof(RaiseApprovalEvent))]
public async Task<HttpResponseData> RaiseApprovalEvent(
[HttpTrigger(AuthorizationLevel.Function, "post",
Route = "workflows/{instanceId}/approve")] HttpRequestData request,
string instanceId)
{
// Parse approval response
var approvalResponse = await request.ReadFromJsonAsync<ApprovalResponse>();
if (approvalResponse == null)
{
return request.CreateResponse(HttpStatusCode.BadRequest,
new { error = "Invalid approval response" });
}
// Validate the instance exists
var status = await _durableTaskClient.GetStatusAsync(instanceId);
if (status == null)
{
return request.CreateResponse(HttpStatusCode.NotFound,
new { error = "Workflow instance not found" });
}
if (status.RuntimeStatus != OrchestrationRuntimeStatus.Running)
{
return request.CreateResponse(HttpStatusCode.BadRequest,
new { error = $"Workflow is not running (status: {status.RuntimeStatus})" });
}
// Raise the approval event to the waiting orchestration
await _durableTaskClient.RaiseEventAsync(instanceId, "ApprovalEvent", approvalResponse);
_logger.LogInformation(
"Raised approval event for workflow {InstanceId}, Approved: {IsApproved}",
instanceId, approvalResponse.IsApproved);
return request.CreateAcceptedResponse(new
{
message = "Approval event raised successfully",
instanceId = instanceId
});
}
[Function(nameof(GetWorkflowStatus))]
public async Task<HttpResponseData> GetWorkflowStatus(
[HttpTrigger(AuthorizationLevel.Function, "get",
Route = "workflows/{instanceId}/status")] HttpRequestData request,
string instanceId)
{
var status = await _durableTaskClient.GetStatusAsync(instanceId);
if (status == null)
{
return request.CreateResponse(HttpStatusCode.NotFound,
new { error = "Instance not found" });
}
return request.CreateOkResponse(new
{
instanceId = instanceId,
status = status.RuntimeStatus.ToString(),
createdTime = status.CreatedAt,
lastUpdatedTime = status.LastUpdatedAt,
input = status.Input,
output = status.Output
});
}
}
Step 3: Chained Orchestrations
Sequential processing where each step depends on the previous.
When to Use Chained Orchestrations
- Data pipelines with dependencies
- Multi-step business processes
- Any workflow where step N needs output from step N-1
Implementation
// Order processing - sequential steps
public class OrderProcessingOrchestrator
{
[Function(nameof(OrderProcessingOrchestrator))]
public async Task<OrderProcessingResult> ProcessOrder(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
var order = context.GetInput<Order>();
var result = new OrderProcessingResult
{
OrderId = order.OrderId,
StartTime = DateTime.UtcNow
};
try
{
// Step 1: Validate order
_logger.LogInformation("Step 1: Validating order {OrderId}", order.OrderId);
var validationResult = await context.CallActivityAsync<OrderValidationResult>(
nameof(OrderActivities.ValidateOrder), order);
if (!validationResult.IsValid)
{
result.Status = OrderStatus.Failed;
result.ErrorMessage = validationResult.ErrorMessage;
return result;
}
// Step 2: Check inventory
_logger.LogInformation("Step 2: Checking inventory for order {OrderId}", order.OrderId);
var inventoryResult = await context.CallActivityAsync<InventoryResult>(
nameof(OrderActivities.CheckInventory), order);
if (!inventoryResult.Available)
{
result.Status = OrderStatus.Backordered;
result.ErrorMessage = "Items not available";
return result;
}
// Step 3: Calculate pricing
_logger.LogInformation("Step 3: Calculating pricing for order {OrderId}", order.OrderId);
var pricing = await context.CallActivityAsync<PriceCalculation>(
nameof(OrderActivities.CalculatePricing), order);
order.FinalPrice = pricing.Total;
// Step 4: Process payment
_logger.LogInformation("Step 4: Processing payment for order {OrderId}", order.OrderId);
var paymentResult = await context.CallActivityAsync<PaymentResult>(
nameof(OrderActivities.ProcessPayment), new PaymentRequest
{
Order = order,
PaymentMethod = order.PaymentMethod
});
if (!paymentResult.Success)
{
result.Status = OrderStatus.PaymentFailed;
result.ErrorMessage = paymentResult.ErrorMessage;
return result;
}
// Step 5: Create shipping order
_logger.LogInformation("Step 5: Creating shipping for order {OrderId}", order.OrderId);
var shipping = await context.CallActivityAsync<ShippingInfo>(
nameof(OrderActivities.CreateShippingOrder), order);
// Step 6: Send confirmation
_logger.LogInformation("Step 6: Sending confirmation for order {OrderId}", order.OrderId);
await context.CallActivityAsync(nameof(OrderActivities.SendOrderConfirmation),
new OrderConfirmation
{
Order = order,
ShippingInfo = shipping
});
// All steps completed successfully
result.Status = OrderStatus.Completed;
result.ShippingInfo = shipping;
result.PaymentConfirmation = paymentResult.ConfirmationId;
result.CompletedAt = DateTime.UtcNow;
_logger.LogInformation("Order {OrderId} processed successfully", order.OrderId);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to process order {OrderId}", order.OrderId);
// Compensating actions - rollback previous steps
await context.CallActivityAsync(nameof(OrderActivities.CancelPayment),
new CancelPaymentRequest
{
OrderId = order.OrderId,
Reason = $"Order processing failed: {ex.Message}"
});
result.Status = OrderStatus.Failed;
result.ErrorMessage = ex.Message;
}
return result;
}
}
Step 4: Custom Patterns and Advanced Usage
Monitor Pattern - Long-running background tasks
// Monitor that checks for completion repeatedly
public class OrderMonitorOrchestrator
{
[Function(nameof(OrderStatusMonitor))]
public async Task<MonitorResult> OrderStatusMonitor(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
var orderId = context.GetInput<string>();
var startTime = DateTime.UtcNow;
var timeout = DateTime.UtcNow.AddHours(2);
var pollInterval = TimeSpan.FromMinutes(1);
_logger.LogInformation("Starting monitor for order {OrderId}", orderId);
while (DateTime.UtcNow < timeout)
{
// Check current order status
var status = await context.CallActivityAsync<OrderStatus>(
nameof(MonitorActivities.CheckOrderStatus), orderId);
if (status.Status == "Completed")
{
_logger.LogInformation("Order {OrderId} completed successfully", orderId);
return new MonitorResult
{
OrderId = orderId,
FinalStatus = status.Status,
CompletedAt = DateTime.UtcNow,
Success = true
};
}
if (status.Status == "Failed")
{
_logger.LogError("Order {OrderId} failed: {Error}", orderId, status.Error);
return new MonitorResult
{
OrderId = orderId,
FinalStatus = status.Status,
ErrorMessage = status.Error,
Success = false
};
}
// Not done yet - wait before next check
_logger.LogInformation("Order {OrderId} still processing, checking again in {Interval}",
orderId, pollInterval);
context.CreateTimer(pollInterval, CancellationToken.None);
}
// Timeout reached
_logger.LogWarning("Monitor timeout for order {OrderId}", orderId);
return new MonitorResult
{
OrderId = orderId,
FinalStatus = "Timeout",
ErrorMessage = "Order did not complete within timeout period",
Success = false
};
}
}
Sub-Orchestrations
// Parent orchestration that calls child orchestrations
public class ParentOrchestrator
{
[Function(nameof(ParentOrchestrator))]
public async Task<ParentResult> ParentOrchestrator(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
var input = context.GetInput<ParentInput>();
// Run multiple child orchestrations in parallel
var inventoryTask = context.CallSubOrchestratorAsync<InventoryResult>(
nameof(ChildInventoryOrchestrator), input);
var pricingTask = context.CallSubOrchestratorAsync<PricingResult>(
nameof(ChildPricingOrchestrator), input);
var shippingTask = context.CallSubOrchestratorAsync<ShippingResult>(
nameof(ChildShippingOrchestrator), input);
// Wait for all to complete
await Task.WhenAll(inventoryTask, pricingTask, shippingTask);
var inventory = await inventoryTask;
var pricing = await pricingTask;
var shipping = await shippingTask;
// Combine results
return new ParentResult
{
Inventory = inventory,
Pricing = pricing,
Shipping = shipping
};
}
}
Step 5: Testing and Debugging
Unit Testing Orchestrations
// Test orchestrator logic without actual Durable Functions runtime
public class ImageProcessingOrchestratorTests
{
[Fact]
public async Task FanOut_FanIn_ProcessesAllImages()
{
// Arrange
var mockContext = new Mock<TaskOrchestrationContext>();
var testInput = new ImageProcessingInput
{
ImageUrls = new List<string> { "img1.jpg", "img2.jpg", "img3.jpg" },
Options = new ProcessingOptions { ThumbnailSize = 200 }
};
mockContext.Setup(c => c.GetInput<ImageProcessingInput>())
.Returns(testInput);
// Track activity calls
var activityCalls = new List<string>();
mockContext.Setup(c => c.CallActivityAsync<ImageProcessResult>(
It.IsAny<string>(), It.IsAny<ImageProcessInput>()))
.Returns(async (string name, object input) =>
{
activityCalls.Add(name);
// Return success result for testing
return new ImageProcessResult
{
ImageUrl = ((ImageProcessInput)input).ImageUrl,
Success = true
};
});
// Act
var orchestrator = new ImageProcessingOrchestrator(Mock.Of<ILogger<ImageProcessingOrchestrator>>());
var result = await orchestrator.RunOrchestrator(mockContext.Object);
// Assert
Assert.Equal(3, result.TotalProcessed);
Assert.Equal(3, result.Successful);
Assert.Equal(0, result.Failed);
}
[Fact]
public async Task Approval_WaitForEvent_ReturnsOnApproval()
{
// Arrange
var mockContext = new Mock<TaskOrchestrationContext>();
var input = new ExpenseRequest
{
ExpenseId = "EXP-001",
Amount = 500,
SubmittedBy = "user@company.com"
};
mockContext.Setup(c => c.GetInput<ExpenseRequest>()).Returns(input);
// Setup activity calls to not actually run
mockContext.Setup(c => c.CallActivityAsync(
It.IsAny<string>(), It.IsAny<object>()))
.Returns(Task.CompletedTask);
// Setup event waiting to return approved response
var approvalResponse = new ApprovalResponse
{
IsApproved = true,
ApprovedBy = "manager@company.com",
Comments = "Approved",
RespondedAt = DateTime.UtcNow
};
mockContext.Setup(c => c.WaitForExternalEvent<ApprovalResponse>(
It.IsAny<string>(), It.IsAny<DateTime>()))
.Returns(Task.FromResult(approvalResponse));
// Act
var orchestrator = new ExpenseApprovalOrchestrator(Mock.Of<ILogger<ExpenseApprovalOrchestrator>>());
var result = await orchestrator.RunExpenseApproval(mockContext.Object);
// Assert
Assert.Equal(ApprovalStatus.Approved, result.Status);
}
}
Debugging Running Orchestrations
// Query orchestration status and history
public class OrchestrationDebugger
{
private readonly DurableTaskClient _durableTaskClient;
public async Task<OrchestrationDebugInfo> GetDebugInfoAsync(string instanceId)
{
var status = await _durableTaskClient.GetStatusAsync(instanceId);
var debugInfo = new OrchestrationDebugInfo
{
InstanceId = instanceId,
RuntimeStatus = status.RuntimeStatus.ToString(),
CreatedAt = status.CreatedAt,
LastUpdatedTime = status.LastUpdatedAt,
Input = status.Input?.ToString(),
Output = status.Output?.ToString()
};
// Get custom status (if using Durable Functions 2.x+)
// var customStatus = await _durableTaskClient.GetStatusAsync(instanceId, true, true);
return debugInfo;
}
// Purge old completed instances (cleanup)
public async Task PurgeOldInstancesAsync(int daysOld = 30)
{
var cutoff = DateTimeOffset.UtcNow.AddDays(-daysOld);
// WARNING: This permanently deletes data
await _durableTaskClient.PurgeInstancesAsync(
DateTimeOffset.MinValue, // createdAfter
cutoff, // createdBefore
new List<OrchestrationRuntimeStatus>
{
OrchestrationRuntimeStatus.Completed,
OrchestrationRuntimeStatus.Failed,
OrchestrationRuntimeStatus.Canceled
});
}
}
Best Practices Summary
| Pattern | Use When | Key Benefit |
|---|---|---|
| Fan-Out/Fan-In | Independent parallel processing | 10x+ faster than sequential |
| Human Interaction | Need approvals/input | Pause until external event |
| Chained | Sequential dependent steps | Guaranteed order of execution |
| Monitor | Long-running background tasks | Polling with timeout |
| Sub-Orchestrator | Reusable workflow components | Code reuse, modularity |
Configuration Recommendations
{
"extensions": {
"durableTask": {
"hubName": "MyTaskHub",
"storageProvider": {
"type": "AzureStorage",
"connectionStringName": "AzureWebJobsStorage"
},
"trackingStore": {
"connectionStringName": "AzureWebJobsStorage",
"enableIfNotExists": true
},
"maxConcurrentActivityFunctions": 100,
"maxConcurrentOrchestratorFunctions": 50,
"extendedSessionEnabled": true
}
}
}
Conclusion
Durable Functions provide powerful patterns for building complex, reliable workflows in Azure:
- Fan-Out/Fan-In - Process items in parallel for massive speed improvements
- Human Interaction - Build approval workflows with wait-for-event
- Chained Orchestrations - Sequential processing with built-in state management
- Monitors - Background polling with timeout handling
Key takeaways:
- Use fan-out/fan-in whenever items can be processed independently
- Design for failure - implement compensating transactions
- Monitor long-running workflows - track instance status
- Test orchestrations thoroughly - use mocks to test logic
Azure Integration Hub - Functions