Azure Service Bus — Message Deferral Pattern

Defer + Fetch by Sequence Number for Ordering


Introduction

Message deferral lets a receiver set aside a message it cannot process yet and retrieve it later by its sequence number. The message stays in the queue but is skipped by normal receive operations. This pattern is useful when:

  • Messages must not be processed out of order
  • A message depends on another message being processed first
  • You need to wait for external resources before processing
  • You are implementing complex workflows with dependencies

Unlike dead-lettering, which handles messages that failed processing, deferral handles messages that are valid but cannot be processed yet.


How Message Deferral Works

The Flow

┌─────────┐    ┌─────────────┐    ┌───────────────┐    ┌─────────────┐
│ Message │───▶│ Processor   │───▶│ Defer Message │───▶│  Deferred   │
│ Received│    │             │    │ (LockToken)   │    │   State     │
└─────────┘    └─────────────┘    └───────────────┘    └─────────────┘
                                                            │
         ┌──────────────────────────────────────────────────┘
         ▼
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│ Fetch by    │───▶│ Sequence    │───▶│ Process     │
│ Sequence#   │    │ Number      │    │ Deferred    │
└─────────────┘    └─────────────┘    └─────────────┘

Key Points

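  1. Deferred messages stay in the queue but are skipped by normal receive operations
  2. The sequence number is preserved and is the only way to retrieve the message later
  3. Messages retain their body and properties; the original lock is released on deferral, and fetching by sequence number acquires a new lock
  4. Deferral has no timeout of its own - the message stays deferred until it is explicitly fetched and settled, although its time-to-live (TTL) still applies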

Basic Deferral Pattern

Scenario: Order Processing with Dependency

// Requires the Azure.Messaging.ServiceBus package.
public class OrderProcessor
{
    // In-memory tracking is not thread-safe and is lost on restart;
    // persist sequence numbers in production (see Best Practices).
    private readonly Dictionary<string, List<long>> _deferredMessages = new();
    
    public async Task ProcessOrderMessageAsync(
        ServiceBusReceivedMessage message,
        ServiceBusReceiver receiver,
        ILogger logger)
    {
        var order = message.Body.ToObjectFromJson<Order>();
        
        logger.LogInformation("Processing order {OrderId}, status: {Status}", 
            order.OrderId, order.Status);
        
        // Check if order has dependencies
        if (order.Status == "PendingInventory")
        {
            // Cannot process yet - need to check inventory first
            logger.LogInformation("Order {OrderId} waiting for inventory check", order.OrderId);
            await DeferMessageAsync(message, receiver, order, logger);
            return;
        }
        
        if (order.Status == "PendingPayment")
        {
            // Payment not confirmed yet
            logger.LogInformation("Order {OrderId} waiting for payment", order.OrderId);
            await DeferMessageAsync(message, receiver, order, logger);
            return;
        }
        
        // Ready to process - normal flow (ProcessOrderAsync is application-specific)
        await ProcessOrderAsync(order, receiver, message);
    }
    
    private async Task DeferMessageAsync(
        ServiceBusReceivedMessage message,
        ServiceBusReceiver receiver,
        Order order,
        ILogger logger)
    {
        // Record the sequence number first, keyed by order ID. If the defer call
        // fails afterwards, the stale entry surfaces later as "message not found".
        if (!_deferredMessages.TryGetValue(order.OrderId, out var sequenceNumbers))
        {
            sequenceNumbers = new List<long>();
            _deferredMessages[order.OrderId] = sequenceNumbers;
        }
        sequenceNumbers.Add(message.SequenceNumber);
        
        // Defer the message - it stays in the queue but is skipped by normal receives.
        // The dictionary updates application properties on the deferred message.
        await receiver.DeferMessageAsync(message, new Dictionary<string, object>
        {
            ["orderId"] = order.OrderId,
            ["reason"] = $"Waiting for {order.Status}",
            ["originalTimestamp"] = message.EnqueuedTime.ToString("O"),
            ["deferredAt"] = DateTime.UtcNow.ToString("O")
        });
        
        logger.LogInformation("Message deferred for order {OrderId}, sequence: {Sequence}", 
            order.OrderId, message.SequenceNumber);
    }
}

Fetching Deferred Messages

By Sequence Number

public class DeferredMessageHandler
{
    private readonly ServiceBusReceiver _receiver;
    
    public DeferredMessageHandler(ServiceBusClient client, string queueName)
    {
        // The receiver must target the queue that holds the deferred messages
        _receiver = client.CreateReceiver(queueName);
    }
    
    public async Task<List<ServiceBusReceivedMessage>> FetchDeferredMessagesAsync(
        string orderId,
        List<long> sequenceNumbers,
        ILogger logger)
    {
        var messages = new List<ServiceBusReceivedMessage>();
        
        foreach (var sequenceNumber in sequenceNumbers)
        {
            try
            {
                // Throws if no deferred message has this sequence number. On success
                // the message is locked again and must be settled by the caller
                // before the lock expires.
                var message = await _receiver.ReceiveDeferredMessageAsync(sequenceNumber);
                
                messages.Add(message);
                logger.LogInformation("Fetched deferred message {Sequence} for order {OrderId}", 
                    sequenceNumber, orderId);
            }
            catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.MessageNotFound)
            {
                // The message expired or was already settled
                logger.LogWarning("Deferred message not found: {Sequence}", sequenceNumber);
            }
        }
        
        return messages;
    }
}

Scheduled Retrieval Pattern

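// Shape of the cached entry; the same type is used for writing and reading
public record DeferredSchedule(string OrderId, List<long> SequenceNumbers, DateTime ProcessAfter);

public class DeferredMessageScheduler
{
    private readonly IRedisCache _cache;               // application-specific cache abstraction
    private readonly DeferredMessageHandler _handler;  // see "By Sequence Number"
    private readonly ILogger _logger;
    
    public DeferredMessageScheduler(
        IRedisCache cache, DeferredMessageHandler handler, ILogger logger)
    {
        _cache = cache;
        _handler = handler;
        _logger = logger;
    }
    
    public async Task ScheduleDeferredMessageRetrievalAsync(
        string orderId,
        List<long> sequenceNumbers,
        DateTime processAfter)
    {
        // Store metadata for scheduled retrieval
        var scheduleKey = $"sb:deferred:{orderId}";
        
        await _cache.SetAsync(scheduleKey, JsonSerializer.Serialize(
            new DeferredSchedule(orderId, sequenceNumbers, processAfter)),
            TimeSpan.FromDays(7));
        
        // Schedule a timer to trigger retrieval. ScheduleTimerAsync is an
        // application-specific helper, not part of the Service Bus SDK.
        await ScheduleTimerAsync(processAfter, () => ProcessDeferredMessagesAsync(orderId));
    }
    
    public async Task ProcessDeferredMessagesAsync(string orderId)
    {
        // Get stored sequence numbers
        var scheduleKey = $"sb:deferred:{orderId}";
        var schedule = await _cache.GetAsync<DeferredSchedule>(scheduleKey);
        
        if (schedule == null)
        {
            return;
        }
        
        // Fetch and process deferred messages
        var messages = await _handler.FetchDeferredMessagesAsync(
            orderId, schedule.SequenceNumbers, _logger);
        
        foreach (var message in messages)
        {
            // ProcessMessageAsync is application-specific and must also settle the message
            await ProcessMessageAsync(message);
        }
    }
}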

Real-World Use Cases

Use Case 1: Sequenced Processing

Messages must be processed in order, but arrive out of order:

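public class SequencedOrderProcessor
{
    // Maps the application-level position to the broker sequence number of the
    // deferred message. The lock on a deferred message is released, so the
    // message object itself cannot be kept and settled later.
    private readonly Dictionary<int, long> _deferredSequenceNumbers = new();
    private int _expectedPosition = 1;
    
    public async Task ProcessSequencedMessageAsync(
        ServiceBusReceivedMessage message,
        ServiceBusReceiver receiver)
    {
        // Assumption: the sender stamps each message with its position in an
        // application property named "Position" (hypothetical name). The broker's
        // SequenceNumber reflects arrival order, so it cannot detect reordering.
        var position = Convert.ToInt32(message.ApplicationProperties["Position"]);
        
        if (position == _expectedPosition)
        {
            // This is the message we expect. ProcessAndAcknowledgeAsync is
            // application-specific and ends with receiver.CompleteMessageAsync.
            await ProcessAndAcknowledgeAsync(message, receiver);
            _expectedPosition++;
            
            // Check if we can now process deferred messages
            await ProcessDeferredMessagesAsync(receiver);
        }
        else
        {
            // Out of order - remember its sequence number, then defer it
            _deferredSequenceNumbers[position] = message.SequenceNumber;
            await receiver.DeferMessageAsync(message);
        }
    }
    
    private async Task ProcessDeferredMessagesAsync(ServiceBusReceiver receiver)
    {
        while (_deferredSequenceNumbers.Remove(_expectedPosition, out var sequenceNumber))
        {
            // Fetching by sequence number returns the message with a new lock
            var deferredMessage = await receiver.ReceiveDeferredMessageAsync(sequenceNumber);
            
            await ProcessAndAcknowledgeAsync(deferredMessage, receiver);
            _expectedPosition++;
        }
    }
}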
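
The ordering decision in this use case can be separated from the Service Bus calls, which makes it easy to unit-test. The following is a minimal sketch rather than a definitive implementation. It assumes each message carries an application-level position, and the names `ReorderTracker`, `TryAccept`, and `TakeNextReady` are hypothetical, not part of any SDK.

```csharp
using System;
using System.Collections.Generic;

// Pure ordering logic: no Service Bus types, so it can be tested in isolation.
public sealed class ReorderTracker
{
    // Application-level position -> broker sequence number of the deferred message
    private readonly Dictionary<int, long> _deferred = new();
    private int _expected;

    public ReorderTracker(int firstPosition = 1) => _expected = firstPosition;

    public int ExpectedPosition => _expected;

    // Returns true when the message at `position` should be processed now.
    // Otherwise records its sequence number so the caller can defer it.
    public bool TryAccept(int position, long sequenceNumber)
    {
        if (position == _expected)
        {
            _expected++;
            return true;
        }

        _deferred[position] = sequenceNumber;
        return false;
    }

    // Returns the sequence number of the next deferred message that is now in
    // order and advances the expected position, or null when the next expected
    // position has not been deferred.
    public long? TakeNextReady()
    {
        if (_deferred.Remove(_expected, out var sequenceNumber))
        {
            _expected++;
            return sequenceNumber;
        }

        return null;
    }
}
```

A caller would defer the message whenever `TryAccept` returns false, and after each successfully processed message loop on `TakeNextReady`, fetching each returned sequence number with the receiver until the method returns null.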

Use Case 2: External Dependency Wait

Wait for an external API response before processing:

public class ExternalDependencyProcessor
{
    private readonly HttpClient _externalApi;
    
    // Tracks the sequence number, not the message: the lock on a deferred
    // message is released, so the original object cannot be settled later.
    private readonly Dictionary<string, long> _pendingSequenceNumbers = new();
    
    public async Task InitiateExternalCallAsync(
        ServiceBusReceivedMessage message,
        ServiceBusReceiver receiver)
    {
        var order = message.Body.ToObjectFromJson<Order>();
        
        // Initiate external verification call
        var verificationResponse = await _externalApi.PostAsJsonAsync(
            "https://verification-service.com/verify",
            new { OrderId = order.OrderId, Amount = order.TotalAmount });
        
        if (verificationResponse.IsSuccessStatusCode)
        {
            // Immediate success - process normally
            await ProcessOrderAsync(order);
            await receiver.CompleteMessageAsync(message);
        }
        else
        {
            // Record the sequence number, then defer using the receiver
            // that received the message
            _pendingSequenceNumbers[order.OrderId] = message.SequenceNumber;
            await receiver.DeferMessageAsync(message);
            
            // Schedule retry after external service processes
            await ScheduleCompletionAsync(order.OrderId);
        }
    }
    
    public async Task CompleteExternalCallAsync(
        string orderId,
        bool success,
        ServiceBusReceiver receiver)
    {
        if (!_pendingSequenceNumbers.Remove(orderId, out var sequenceNumber))
        {
            return;
        }
        
        // Fetch the deferred message again to obtain a new lock
        var message = await receiver.ReceiveDeferredMessageAsync(sequenceNumber);
        
        if (success)
        {
            // Process and complete
            var order = message.Body.ToObjectFromJson<Order>();
            await ProcessOrderAsync(order);
            await receiver.CompleteMessageAsync(message);
        }
        else
        {
            // Handle failure - move to the dead-letter queue with a reason
            await receiver.DeadLetterMessageAsync(
                message, "VerificationFailed", "External verification failed");
        }
    }
}

Use Case 3: Batch Processing with Dependencies

Process batch items that depend on each other:

public class BatchProcessor
{
    private readonly ServiceBusClient _client;
    private readonly Dictionary<string, List<long>> _batchSequenceNumbers = new();
    
    public BatchProcessor(ServiceBusClient client) => _client = client;
    
    public async Task ProcessBatchItemAsync(
        ServiceBusReceivedMessage message,
        ServiceBusReceiver receiver)
    {
        var batch = message.Body.ToObjectFromJson<Batch>();
        
        // Check if all items in batch are complete
        var allItemsComplete = await CheckBatchItemsCompleteAsync(batch.BatchId);
        
        if (allItemsComplete)
        {
            // All items processed - can proceed
            await ProcessBatchAsync(batch);
            await receiver.CompleteMessageAsync(message);
        }
        else
        {
            // Store sequence number - will be retrieved when batch completes
            if (!_batchSequenceNumbers.TryGetValue(batch.BatchId, out var sequenceNumbers))
            {
                sequenceNumbers = new List<long>();
                _batchSequenceNumbers[batch.BatchId] = sequenceNumbers;
            }
            sequenceNumbers.Add(message.SequenceNumber);
            
            await receiver.DeferMessageAsync(message);
        }
    }
    
    public async Task CompleteBatchAsync(string batchId)
    {
        if (!_batchSequenceNumbers.TryGetValue(batchId, out var sequenceNumbers))
        {
            return;
        }
        
        // Fetch all deferred messages for this batch
        await using var receiver = _client.CreateReceiver("batches");
        
        foreach (var sequenceNumber in sequenceNumbers)
        {
            try
            {
                var message = await receiver.ReceiveDeferredMessageAsync(sequenceNumber);
                var batch = message.Body.ToObjectFromJson<Batch>();
                
                await ProcessBatchAsync(batch);
                await receiver.CompleteMessageAsync(message);
            }
            catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.MessageNotFound)
            {
                // The message expired or was already settled - nothing to process
            }
        }
        
        _batchSequenceNumbers.Remove(batchId);
    }
}

Implementation in Azure Functions

Function with Deferral

// In-process model with the Microsoft.Azure.WebJobs.Extensions.ServiceBus 5.x extension.
// Manual settlement goes through ServiceBusMessageActions and is typically paired
// with disabling auto-complete (autoCompleteMessages in host.json).
[FunctionName("OrderProcessor")]
public static async Task Run(
    [ServiceBusTrigger("orders", Connection = "ServiceBusConnection")] 
    ServiceBusReceivedMessage message,
    ServiceBusMessageActions messageActions,
    ILogger logger)
{
    var order = message.Body.ToObjectFromJson<Order>();
    
    if (order.Status == "PendingInventory")
    {
        // Defer message
        await messageActions.DeferMessageAsync(message);
        
        logger.LogInformation("Order {OrderId} deferred, sequence: {Sequence}", 
            order.OrderId, message.SequenceNumber);
        
        // Schedule a check in 5 minutes (application-specific helper)
        await ScheduleInventoryCheckAsync(order.OrderId, message.SequenceNumber);
        return;
    }
    
    // Process the order
    await ProcessOrderAsync(order);
    await messageActions.CompleteMessageAsync(message);
}

[FunctionName("InventoryCheckProcessor")]
public static async Task RunInventoryCheck(
    [ServiceBusTrigger("inventory-checks", Connection = "ServiceBusConnection")] 
    InventoryCheckRequest request,
    ServiceBusClient client,
    ILogger logger)
{
    var inventoryAvailable = await CheckInventoryAsync(request.ProductIds);
    
    if (inventoryAvailable)
    {
        // Inventory is now available - fetch deferred message from the orders queue
        await using var receiver = client.CreateReceiver("orders");
        
        try
        {
            var deferredMessage = await receiver.ReceiveDeferredMessageAsync(request.SequenceNumber);
            var order = deferredMessage.Body.ToObjectFromJson<Order>();
            
            await ProcessOrderAsync(order);
            await receiver.CompleteMessageAsync(deferredMessage);
        }
        catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.MessageNotFound)
        {
            // The deferred message expired or was already settled
            logger.LogWarning("Deferred message not found: {Sequence}", request.SequenceNumber);
        }
    }
    else
    {
        // Still waiting - requeue the inventory check
        await ScheduleInventoryCheckAsync(request.OrderId, request.SequenceNumber);
    }
}

Best Practices

Practice                  Description
Store sequence numbers    Use Redis/database to track deferred messages
Set retrieval deadlines   Don't defer indefinitely without a plan
Implement cleanup         Remove old deferred messages from storage
Use unique keys           Key deferred messages by business ID, not just sequence
Handle not found          Deferred messages can expire or be removed
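
Several of these practices (tracking sequence numbers, keying by business ID, retrieval deadlines, cleanup) can be combined in one small tracking store. The sketch below is illustrative only: it uses an in-memory dictionary as a stand-in for Redis or a database, and the names `DeferredEntry`, `DeferredMessageStore`, `Track`, `Take`, and `FindOverdue` are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// One tracked deferral: the business key, the broker sequence number needed to
// fetch the message again, and the deadline by which it should be retrieved.
public sealed record DeferredEntry(string BusinessId, long SequenceNumber, DateTimeOffset RetrieveBy);

// In-memory stand-in for a durable store (assumption: production code would
// back this with Redis or a database so entries survive restarts).
public sealed class DeferredMessageStore
{
    private readonly object _gate = new();
    private readonly Dictionary<string, List<DeferredEntry>> _entries = new();

    public void Track(string businessId, long sequenceNumber, DateTimeOffset retrieveBy)
    {
        lock (_gate)
        {
            if (!_entries.TryGetValue(businessId, out var list))
            {
                list = new List<DeferredEntry>();
                _entries[businessId] = list;
            }
            list.Add(new DeferredEntry(businessId, sequenceNumber, retrieveBy));
        }
    }

    // Removes and returns every entry tracked for the business ID.
    public IReadOnlyList<DeferredEntry> Take(string businessId)
    {
        lock (_gate)
        {
            if (_entries.Remove(businessId, out var list))
            {
                return list;
            }
            return Array.Empty<DeferredEntry>();
        }
    }

    // Entries whose deadline has passed: candidates for retrieval or cleanup.
    public IReadOnlyList<DeferredEntry> FindOverdue(DateTimeOffset now)
    {
        lock (_gate)
        {
            return _entries.Values
                .SelectMany(list => list)
                .Where(entry => entry.RetrieveBy <= now)
                .ToList();
        }
    }
}
```

A processor would call `Track` just before deferring a message, `Take` when the dependency for that business ID is resolved, and `FindOverdue` from a periodic job that retrieves or cleans up deferrals that have waited too long.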

Common Pitfalls

  • ❌ Deferring without storing sequence number
  • ❌ Not implementing scheduled retrieval
  • ❌ Forgetting to process deferred messages
  • ❌ Deferring for too long (TTL issues)
  • ❌ Not handling the "message not found" error (a ServiceBusException with Reason MessageNotFound in Azure.Messaging.ServiceBus)
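
The TTL pitfall can be checked before deferring. The sketch below is one possible guard, not an SDK feature: `DeferralGuard` and `CanDefer` are hypothetical names, and the expiry time is assumed to come from the received message (for example its `ExpiresAt` property in Azure.Messaging.ServiceBus).

```csharp
using System;

public static class DeferralGuard
{
    // Returns true when the message is expected to outlive the wait, including
    // a safety margin. When it returns false, deferring risks the message
    // expiring before it can be fetched again.
    public static bool CanDefer(
        DateTimeOffset expiresAt,
        DateTimeOffset now,
        TimeSpan expectedWait,
        TimeSpan safetyMargin)
    {
        return now + expectedWait + safetyMargin <= expiresAt;
    }
}
```

When the guard returns false, the processor can handle the message immediately or dead-letter it with an explanatory reason instead of deferring it.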

Azure Integration Hub - Advanced Level