Azure Service Bus — Message Deferral Pattern
Defer + Fetch by Sequence Number for Ordering
Introduction
Message deferral lets a receiver set a message aside without removing it from the queue or subscription, and retrieve it later by its sequence number. The pattern is useful when:
- Messages must be processed in a specific order
- A message depends on another message being processed first
- Processing must wait for an external resource
- A workflow has steps that depend on each other
Unlike dead-letter queues, which hold messages that failed processing, deferral is for messages that are valid but cannot be processed yet.
How Message Deferral Works
The Flow
┌─────────┐    ┌─────────────┐    ┌───────────────┐    ┌─────────────┐
│ Message │───▶│  Processor  │───▶│ Defer Message │───▶│  Deferred   │
│ Received│    │             │    │  (LockToken)  │    │    Queue    │
└─────────┘    └─────────────┘    └───────────────┘    └─────────────┘
                                                              │
       ┌──────────────────────────────────────────────────────┘
       ▼
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│  Fetch by   │───▶│  Sequence   │───▶│   Process   │
│  Sequence#  │    │   Number    │    │  Deferred   │
└─────────────┘    └─────────────┘    └─────────────┘
Key Points
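- Deferred messages stay in the same queue (there is no separate deferred subqueue) but are hidden from normal receive operations
- The broker-assigned sequence number is the handle for fetching a deferred message later, so the application must record it
- A deferred message keeps its body and properties, but not its lock: the original lock ends on deferral, and fetching the message in peek-lock mode acquires a new one
- Deferral has no timeout of its own, but the message's time-to-live still applies, so a deferred message can expire before it is fetched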
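The key points can be illustrated with a toy in-memory model. This is not the Service Bus SDK: `ToyQueue`, `ToyMessage`, and `ToyState` are hypothetical names that only mimic the behavior described above, namely that a deferred message stays in the queue, is skipped by normal receives, and can be fetched only by its sequence number.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Toy model of deferral semantics. Hypothetical types, not the Service Bus SDK.
public enum ToyState { Active, Deferred }

public sealed class ToyMessage
{
    public ToyMessage(long sequenceNumber, string body)
    {
        SequenceNumber = sequenceNumber;
        Body = body;
    }

    public long SequenceNumber { get; }
    public string Body { get; }
    public ToyState State { get; set; } = ToyState.Active;
}

public sealed class ToyQueue
{
    private readonly List<ToyMessage> _messages = new();
    private long _nextSequenceNumber = 1;

    public int Count => _messages.Count;

    // The queue assigns sequence numbers in arrival order.
    public long Send(string body)
    {
        var message = new ToyMessage(_nextSequenceNumber++, body);
        _messages.Add(message);
        return message.SequenceNumber;
    }

    // A normal receive only sees active messages; deferred ones are skipped.
    // Returns null when no active message is left.
    public ToyMessage Receive() =>
        _messages.FirstOrDefault(m => m.State == ToyState.Active);

    // Deferral flips the state; the message is neither moved nor deleted.
    public void Defer(ToyMessage message) => message.State = ToyState.Deferred;

    // A deferred message can only be fetched by its sequence number.
    public ToyMessage ReceiveDeferred(long sequenceNumber) =>
        _messages.FirstOrDefault(m => m.SequenceNumber == sequenceNumber
                                      && m.State == ToyState.Deferred)
        ?? throw new KeyNotFoundException($"No deferred message {sequenceNumber}");

    // Completing a message removes it from the queue for good.
    public void Complete(ToyMessage message) => _messages.Remove(message);
}
```

If two messages are sent and the first is deferred, `Receive()` returns the second one. Once that is completed, `Receive()` returns null even though `Count` is still 1, and only `ReceiveDeferred` with the first message's sequence number brings the deferred message back.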
Basic Deferral Pattern
Scenario: Order Processing with Dependency
using Azure.Messaging.ServiceBus;
using Microsoft.Extensions.Logging;

public class OrderProcessor
{
    // In-memory tracking for illustration only: it is lost on restart and is not
    // thread-safe. Keep sequence numbers in a durable store in production.
    private readonly Dictionary<string, List<long>> _deferredMessages = new();

    public async Task ProcessOrderMessageAsync(
        ServiceBusReceivedMessage message,
        ServiceBusReceiver receiver,
        ILogger logger)
    {
        var order = message.Body.ToObjectFromJson<Order>();
        logger.LogInformation("Processing order {OrderId}, status: {Status}",
            order.OrderId, order.Status);

        // Check if the order has unmet dependencies
        if (order.Status == "PendingInventory")
        {
            // Cannot process yet: inventory must be checked first
            logger.LogInformation("Order {OrderId} waiting for inventory check", order.OrderId);
            await DeferMessageAsync(message, receiver, order, logger);
            return;
        }

        if (order.Status == "PendingPayment")
        {
            // Payment not confirmed yet
            logger.LogInformation("Order {OrderId} waiting for payment", order.OrderId);
            await DeferMessageAsync(message, receiver, order, logger);
            return;
        }

        // Ready to process: normal flow. ProcessOrderAsync (not shown) is expected
        // to handle the order and complete the message.
        await ProcessOrderAsync(order, receiver, message);
    }

    private async Task DeferMessageAsync(
        ServiceBusReceivedMessage message,
        ServiceBusReceiver receiver,
        Order order,
        ILogger logger)
    {
        // Defer the message: it stays in the queue but is hidden from normal receives.
        // The dictionary updates application properties on the stored message.
        await receiver.DeferMessageAsync(message, new Dictionary<string, object>
        {
            ["orderId"] = order.OrderId,
            ["reason"] = $"Waiting for {order.Status}",
            ["originalTimestamp"] = message.EnqueuedTime.ToString("O"),
            ["deferredAt"] = DateTime.UtcNow.ToString("O")
        });
        logger.LogInformation("Message deferred for order {OrderId}, sequence: {Sequence}",
            order.OrderId, message.SequenceNumber);

        // Store the sequence number keyed by order ID for later retrieval
        if (!_deferredMessages.TryGetValue(order.OrderId, out var sequenceNumbers))
        {
            sequenceNumbers = new List<long>();
            _deferredMessages[order.OrderId] = sequenceNumbers;
        }
        sequenceNumbers.Add(message.SequenceNumber);
    }
}
Fetching Deferred Messages
By Sequence Number
public class DeferredMessageHandler
{
    private readonly ServiceBusReceiver _receiver;

    public DeferredMessageHandler(ServiceBusReceiver receiver) => _receiver = receiver;

    public async Task<List<ServiceBusReceivedMessage>> FetchDeferredMessagesAsync(
        string orderId,
        List<long> sequenceNumbers,
        ILogger logger)
    {
        var messages = new List<ServiceBusReceivedMessage>();
        foreach (var sequenceNumber in sequenceNumbers)
        {
            try
            {
                // Fetch by sequence number. In peek-lock mode the returned message is
                // locked again and must be settled by the caller.
                var message = await _receiver.ReceiveDeferredMessageAsync(sequenceNumber);
                if (message != null)
                {
                    messages.Add(message);
                    logger.LogInformation("Fetched deferred message {Sequence} for order {OrderId}",
                        sequenceNumber, orderId);
                }
            }
            catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.MessageNotFound)
            {
                // No deferred message has this sequence number: it was already
                // settled or has expired.
                logger.LogWarning("Deferred message not found: {Sequence}", sequenceNumber);
            }
        }
        return messages;
    }
}
Scheduled Retrieval Pattern
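using System.Text.Json;

public class DeferredMessageScheduler
{
    // IRedisCache is an application-defined cache abstraction, not an SDK type.
    private readonly IRedisCache _cache;
    private readonly DeferredMessageHandler _handler;
    private readonly ILogger _logger;

    public DeferredMessageScheduler(IRedisCache cache, DeferredMessageHandler handler, ILogger logger)
    {
        _cache = cache;
        _handler = handler;
        _logger = logger;
    }

    public async Task ScheduleDeferredMessageRetrievalAsync(
        string orderId,
        List<long> sequenceNumbers,
        DateTime processAfter)
    {
        // Store metadata for scheduled retrieval. The same DeferredSchedule type is
        // used for writing and reading so the cached JSON round-trips.
        var scheduleKey = $"sb:deferred:{orderId}";
        await _cache.SetAsync(scheduleKey, JsonSerializer.Serialize(new DeferredSchedule
        {
            OrderId = orderId,
            SequenceNumbers = sequenceNumbers,
            ProcessAfter = processAfter
        }), TimeSpan.FromDays(7));

        // ScheduleTimerAsync (not shown) triggers retrieval at processAfter. An in-process
        // timer does not survive a restart, so prefer a durable trigger in production.
        await ScheduleTimerAsync(processAfter, async () =>
        {
            await ProcessDeferredMessagesAsync(orderId);
        });
    }

    public async Task ProcessDeferredMessagesAsync(string orderId)
    {
        // Get stored sequence numbers
        var scheduleKey = $"sb:deferred:{orderId}";
        var schedule = await _cache.GetAsync<DeferredSchedule>(scheduleKey);
        if (schedule == null)
        {
            return;
        }

        // Fetch the deferred messages. ProcessMessageAsync (not shown) must process
        // and complete each message before its lock expires.
        var messages = await _handler.FetchDeferredMessagesAsync(
            orderId, schedule.SequenceNumbers, _logger);
        foreach (var message in messages)
        {
            await ProcessMessageAsync(message);
        }
    }
}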
Real-World Use Cases
Use Case 1: Sequenced Processing
Messages must be processed in order, but arrive out of order:
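public class SequencedOrderProcessor
{
    // Application-level sequence -> broker sequence number of the deferred message.
    // The received message object is not kept: its lock ends once it is deferred.
    private readonly Dictionary<int, long> _deferredSequenceNumbers = new();
    private int _expectedSequence = 1;

    public async Task ProcessSequencedMessageAsync(
        ServiceBusReceivedMessage message,
        ServiceBusReceiver receiver)
    {
        // Assumption: the sender stamps an application-level "Sequence" property.
        // message.SequenceNumber is assigned by the broker in arrival order, so it
        // cannot express the intended business order.
        var messageSequence = Convert.ToInt32(message.ApplicationProperties["Sequence"]);

        if (messageSequence == _expectedSequence)
        {
            // This is the message we expect
            await ProcessAndAcknowledgeAsync(message, receiver);
            _expectedSequence++;

            // Check if deferred messages can now be processed
            await ProcessDeferredMessagesAsync(receiver);
        }
        else
        {
            // Out of order: defer it and remember how to fetch it later
            _deferredSequenceNumbers[messageSequence] = message.SequenceNumber;
            await receiver.DeferMessageAsync(message);
        }
    }

    private async Task ProcessDeferredMessagesAsync(ServiceBusReceiver receiver)
    {
        while (_deferredSequenceNumbers.TryGetValue(_expectedSequence, out var brokerSequenceNumber))
        {
            _deferredSequenceNumbers.Remove(_expectedSequence);

            // Fetch the deferred message again; this acquires a new lock
            var deferred = await receiver.ReceiveDeferredMessageAsync(brokerSequenceNumber);
            await ProcessAndAcknowledgeAsync(deferred, receiver);
            _expectedSequence++;
        }
    }

    private async Task ProcessAndAcknowledgeAsync(
        ServiceBusReceivedMessage message,
        ServiceBusReceiver receiver)
    {
        // Business logic goes here, then settle the message
        await receiver.CompleteMessageAsync(message);
    }
}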
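The ordering bookkeeping in this use case can be kept separate from the Service Bus calls, which makes it easy to unit-test. The sketch below is one way to do that. `Resequencer`, `Offer`, and `TryTakeNextReady` are hypothetical names, and the sketch assumes each message carries a gap-free application-level sequence starting at 1. It only decides what should happen; the caller still defers, fetches, and completes messages through the receiver.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: tracks which application-level sequence is expected next
// and which broker sequence numbers belong to deferred, out-of-order messages.
public sealed class Resequencer
{
    public enum Decision { ProcessNow, Defer, Duplicate }

    private readonly Dictionary<int, long> _deferred = new();
    private int _expected = 1;

    // Call for every message received from the queue.
    public Decision Offer(int appSequence, long brokerSequenceNumber)
    {
        if (appSequence < _expected)
        {
            return Decision.Duplicate; // already handled earlier
        }

        if (appSequence == _expected)
        {
            _expected++;
            return Decision.ProcessNow;
        }

        // Ahead of its turn: remember how to fetch it after it has been deferred.
        _deferred[appSequence] = brokerSequenceNumber;
        return Decision.Defer;
    }

    // Call repeatedly after processing a message. Yields the broker sequence number
    // of the next deferred message that is now in order, or false if none is ready.
    public bool TryTakeNextReady(out long brokerSequenceNumber)
    {
        if (!_deferred.TryGetValue(_expected, out brokerSequenceNumber))
        {
            return false;
        }

        _deferred.Remove(_expected);
        _expected++;
        return true;
    }
}
```

With arrivals in the order 2, 3, 1, the first two calls to `Offer` return `Defer` and the third returns `ProcessNow`; `TryTakeNextReady` then hands back the broker sequence numbers recorded for 2 and 3, in that order. Note that the sketch advances its counter before the caller has finished processing, so a caller that fails midway would need its own recovery logic.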
Use Case 2: External Dependency Wait
Wait for an external API response before processing:
using System.Net.Http.Json;

public class ExternalDependencyProcessor
{
    // Messages passed to this class are assumed to have been received by this receiver.
    private readonly ServiceBusReceiver _receiver;
    private readonly HttpClient _externalApi;

    // Order ID -> sequence number of the deferred message. The received message
    // object is not kept: its lock ends once the message is deferred.
    private readonly Dictionary<string, long> _pendingSequenceNumbers = new();

    public ExternalDependencyProcessor(ServiceBusReceiver receiver, HttpClient externalApi)
    {
        _receiver = receiver;
        _externalApi = externalApi;
    }

    public async Task InitiateExternalCallAsync(ServiceBusReceivedMessage message)
    {
        var order = message.Body.ToObjectFromJson<Order>();

        // Initiate external verification call
        var verificationResponse = await _externalApi.PostAsJsonAsync(
            "https://verification-service.com/verify",
            new { OrderId = order.OrderId, Amount = order.TotalAmount });

        if (verificationResponse.IsSuccessStatusCode)
        {
            // Immediate success: process normally
            await ProcessOrderAsync(order);
            await _receiver.CompleteMessageAsync(message);
        }
        else
        {
            // Remember the sequence number, then defer the message
            _pendingSequenceNumbers[order.OrderId] = message.SequenceNumber;
            await _receiver.DeferMessageAsync(message);

            // Schedule a retry after the external service has processed the request
            await ScheduleCompletionAsync(order.OrderId);
        }
    }

    public async Task CompleteExternalCallAsync(string orderId, bool success)
    {
        if (!_pendingSequenceNumbers.TryGetValue(orderId, out var sequenceNumber))
        {
            return;
        }

        // Fetch the deferred message again; this acquires a new lock
        var message = await _receiver.ReceiveDeferredMessageAsync(sequenceNumber);
        if (success)
        {
            // Process and complete
            var order = message.Body.ToObjectFromJson<Order>();
            await ProcessOrderAsync(order);
            await _receiver.CompleteMessageAsync(message);
        }
        else
        {
            // Handle failure: move to the dead-letter queue (or retry instead)
            await _receiver.DeadLetterMessageAsync(
                message, "ExternalVerificationFailed", "External verification failed");
        }
        _pendingSequenceNumbers.Remove(orderId);
    }
}
Use Case 3: Batch Processing with Dependencies
Process batch items that depend on each other:
public class BatchProcessor
{
    private readonly ServiceBusClient _client;
    private readonly Dictionary<string, List<long>> _batchSequenceNumbers = new();

    public BatchProcessor(ServiceBusClient client) => _client = client;

    public async Task ProcessBatchItemAsync(
        ServiceBusReceivedMessage message,
        ServiceBusReceiver receiver)
    {
        var batch = message.Body.ToObjectFromJson<Batch>();

        // Check if all items in the batch are complete
        var allItemsComplete = await CheckBatchItemsCompleteAsync(batch.BatchId);
        if (allItemsComplete)
        {
            // All items processed: can proceed
            await ProcessBatchAsync(batch);
            await receiver.CompleteMessageAsync(message);
        }
        else
        {
            // Store the sequence number; the message is fetched when the batch completes
            if (!_batchSequenceNumbers.TryGetValue(batch.BatchId, out var sequenceNumbers))
            {
                sequenceNumbers = new List<long>();
                _batchSequenceNumbers[batch.BatchId] = sequenceNumbers;
            }
            sequenceNumbers.Add(message.SequenceNumber);
            await receiver.DeferMessageAsync(message);
        }
    }

    public async Task CompleteBatchAsync(string batchId)
    {
        if (!_batchSequenceNumbers.TryGetValue(batchId, out var sequenceNumbers))
        {
            return;
        }

        // Fetch all deferred messages for this batch through a separate receiver
        // on the same queue, disposed when the method returns
        await using var receiver = _client.CreateReceiver("batches");
        foreach (var sequenceNumber in sequenceNumbers)
        {
            try
            {
                var message = await receiver.ReceiveDeferredMessageAsync(sequenceNumber);
                var batch = message.Body.ToObjectFromJson<Batch>();
                await ProcessBatchAsync(batch);
                await receiver.CompleteMessageAsync(message);
            }
            catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.MessageNotFound)
            {
                // Already settled or expired: nothing left to process for this entry
            }
        }
        _batchSequenceNumbers.Remove(batchId);
    }
}
Implementation in Azure Functions
Function with Deferral
using Azure.Messaging.ServiceBus;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.ServiceBus;
using Microsoft.Extensions.Logging;

// In-process model with the Microsoft.Azure.WebJobs.Extensions.ServiceBus 5.x extension.
// Messages are settled through ServiceBusMessageActions; when settling manually,
// consider setting autoCompleteMessages to false in host.json.
[FunctionName("OrderProcessor")]
public static async Task Run(
    [ServiceBusTrigger("orders", Connection = "ServiceBusConnection")]
    ServiceBusReceivedMessage message,
    ServiceBusMessageActions messageActions,
    ILogger logger)
{
    var order = message.Body.ToObjectFromJson<Order>();
    if (order.Status == "PendingInventory")
    {
        // Defer message
        await messageActions.DeferMessageAsync(message);
        logger.LogInformation("Order {OrderId} deferred, sequence: {Sequence}",
            order.OrderId, message.SequenceNumber);

        // Schedule a check in 5 minutes (helper not shown)
        await ScheduleInventoryCheckAsync(order.OrderId, message.SequenceNumber);
        return;
    }

    // Process the order
    await ProcessOrderAsync(order);
    await messageActions.CompleteMessageAsync(message);
}

[FunctionName("InventoryCheckProcessor")]
public static async Task RunInventoryCheck(
    [ServiceBusTrigger("inventory-checks", Connection = "ServiceBusConnection")]
    InventoryCheckRequest request,
    ServiceBusClient client,
    ILogger logger)
{
    var inventoryAvailable = await CheckInventoryAsync(request.ProductIds);
    if (inventoryAvailable)
    {
        // Inventory is now available: fetch the deferred message from the orders queue
        await using var receiver = client.CreateReceiver("orders");
        try
        {
            var deferredMessage = await receiver.ReceiveDeferredMessageAsync(request.SequenceNumber);
            var order = deferredMessage.Body.ToObjectFromJson<Order>();
            await ProcessOrderAsync(order);
            await receiver.CompleteMessageAsync(deferredMessage);
        }
        catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.MessageNotFound)
        {
            // The deferred order was already settled or has expired
            logger.LogWarning("Deferred message not found: {Sequence}", request.SequenceNumber);
        }
    }
    else
    {
        // Still waiting: requeue the inventory check
        await ScheduleInventoryCheckAsync(request.OrderId, request.SequenceNumber);
    }
}
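The `ScheduleInventoryCheckAsync` helper is not shown in the functions above. One possible shape, sketched here under assumptions, is to send a scheduled message to the `inventory-checks` queue that carries the deferred message's sequence number, so the reminder itself survives restarts. The `InventoryCheckReminder` class and the exact shape of `InventoryCheckRequest` (inferred from how the second function reads it) are assumptions; `ScheduleMessageAsync` is the `ServiceBusSender` method for delayed delivery in `Azure.Messaging.ServiceBus`.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

// Assumed shape, inferred from how the InventoryCheckProcessor function uses it.
public sealed class InventoryCheckRequest
{
    public string OrderId { get; set; } = "";
    public long SequenceNumber { get; set; }
    public string[] ProductIds { get; set; } = Array.Empty<string>();
}

// Hypothetical helper, not part of the SDK.
public static class InventoryCheckReminder
{
    // Builds the reminder message. Kept separate from sending so it can be
    // tested without a Service Bus namespace.
    public static ServiceBusMessage Build(InventoryCheckRequest request) =>
        new ServiceBusMessage(BinaryData.FromObjectAsJson(request))
        {
            ContentType = "application/json",
            CorrelationId = request.OrderId
        };

    // Sends the reminder so that it becomes visible only after the delay.
    // Returns the reminder's own sequence number, which can be passed to
    // ServiceBusSender.CancelScheduledMessageAsync to cancel it.
    public static Task<long> ScheduleAsync(
        ServiceBusSender sender,
        InventoryCheckRequest request,
        TimeSpan delay) =>
        sender.ScheduleMessageAsync(Build(request), DateTimeOffset.UtcNow.Add(delay));
}
```

A caller would create the sender once, for example with `client.CreateSender("inventory-checks")`, and call `InventoryCheckReminder.ScheduleAsync(sender, request, TimeSpan.FromMinutes(5))` after deferring the order message.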
Best Practices
| Practice | Description |
|---|---|
| Store sequence numbers | Use Redis/database to track deferred messages |
| Set retrieval deadlines | Give every deferred message a deadline by which it is fetched or escalated |
| Implement cleanup | Remove tracking entries once the deferred message has been settled or has expired |
| Use unique keys | Key deferred messages by business ID, not just sequence number |
| Handle not found | A deferred message may have expired or already been settled by the time it is fetched |
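
Several of these practices can be combined in one small tracking component. The following is a minimal in-memory sketch. `DeferredRegistry` and `DeferredEntry` are hypothetical names, and a real deployment would back the registry with Redis or a database, as the table suggests.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical tracking entry: business key, broker sequence number, retrieval deadline.
public sealed record DeferredEntry(string BusinessKey, long SequenceNumber, DateTimeOffset Deadline);

// Hypothetical in-memory tracker; swap the dictionary for a durable store in production.
public sealed class DeferredRegistry
{
    // Keyed by business ID plus sequence number, so several deferrals for the
    // same order never overwrite each other.
    private readonly Dictionary<(string Key, long Seq), DeferredEntry> _entries = new();

    public void Track(string businessKey, long sequenceNumber, DateTimeOffset deadline) =>
        _entries[(businessKey, sequenceNumber)] =
            new DeferredEntry(businessKey, sequenceNumber, deadline);

    // Sequence numbers still waiting for a given business ID, in ascending order.
    public IReadOnlyList<long> PendingFor(string businessKey) =>
        _entries.Values
            .Where(e => e.BusinessKey == businessKey)
            .Select(e => e.SequenceNumber)
            .OrderBy(n => n)
            .ToList();

    // Call after the deferred message has been fetched and settled,
    // or after a fetch reported that it no longer exists.
    public bool Remove(string businessKey, long sequenceNumber) =>
        _entries.Remove((businessKey, sequenceNumber));

    // Entries past their deadline: fetch and escalate them, then call Remove.
    public IReadOnlyList<DeferredEntry> Overdue(DateTimeOffset now) =>
        _entries.Values.Where(e => e.Deadline <= now).ToList();
}
```

A periodic job could call `Overdue(DateTimeOffset.UtcNow)`, fetch each listed message by sequence number, dead-letter or reprocess it, and then call `Remove` so the registry does not grow without bound.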
Common Pitfalls
- ❌ Deferring without storing sequence number
- ❌ Not implementing scheduled retrieval
- ❌ Forgetting to process deferred messages
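- ❌ Deferring for longer than the message's time-to-live, so the message expires before it is fetched
- ❌ Not handling the "message not found" error (a ServiceBusException with reason MessageNotFound in Azure.Messaging.ServiceBus) when fetching by sequence number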
Azure Integration Hub - Advanced Level