Service Bus — Handling Poison Messages and Problematic Payloads
The Problem
Your Service Bus queue processor keeps failing on certain messages:
- Messages with invalid JSON crash the processor
- Binary data in text-expected fields breaks parsing
- Messages with oversized payloads time out
- Corrupted messages cause infinite retry loops
You need a robust strategy to handle these "poison messages" without losing good messages or getting stuck.
Understanding Poison Messages
┌─────────────────────────────────────────────────────────────────┐
│ Message Processing Flow │
└─────────────────────────────────────────────────────────────────┘
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Queue │────▶│ Processor │────▶│ Success │
└─────────────┘ └─────────────┘ └─────────────┘
│
│ Exception
▼
┌─────────────┐
│ Retry N │ (up to MaxDeliveryCount: default 10, this guide uses 5)
│ times │
└─────────────┘
│
All retries failed
▼
┌─────────────┐
│ Dead Letter │ ← POISON MESSAGE
│ Queue │
└─────────────┘
Solution Implementation
Step 1: Configure Queue for Dead Lettering
/// <summary>
/// Creates the order-processing queue with settings that favour dead-lettering over silent message loss.
/// </summary>
public class QueueSetupService
{
    /// <summary>
    /// Creates <paramref name="queueName"/> with dead-lettering enabled. If the queue already
    /// exists it is left untouched, so the call is safe to repeat on every start-up.
    /// </summary>
    /// <param name="adminClient">Management client for the target namespace.</param>
    /// <param name="queueName">Queue to create; defaults to "order-processing".</param>
    /// <param name="cancellationToken">Cancels the management call.</param>
    /// <remarks>
    /// RequiresDuplicateDetection and EnablePartitioning can only be chosen at creation time,
    /// which is why an existing queue is not updated here.
    /// </remarks>
    /// <exception cref="ArgumentNullException"><paramref name="adminClient"/> is null.</exception>
    public async Task CreateRobustQueueAsync(
        ServiceBusAdministrationClient adminClient,
        string queueName = "order-processing",
        CancellationToken cancellationToken = default)
    {
        if (adminClient is null)
        {
            throw new ArgumentNullException(nameof(adminClient));
        }

        var queueOptions = new CreateQueueOptions(queueName)
        {
            // Expired messages go to the DLQ instead of being silently discarded.
            EnableDeadLetteringOnMessageExpiration = true,
            EnableDeadLetteringOnMaxDeliveryCountExceeded = true,

            // Delivery attempts before the broker dead-letters a message.
            // Keep in sync with the delivery-count check in the message handler.
            MaxDeliveryCount = 5,

            // How long an unprocessed message stays in the queue.
            DefaultMessageTimeToLive = TimeSpan.FromDays(7),

            // How long a received message stays locked to one consumer before redelivery.
            LockDuration = TimeSpan.FromMinutes(1),

            // Drops re-sends of the same MessageId within the window (sender-side duplicates).
            // This does not affect redelivery of abandoned messages.
            RequiresDuplicateDetection = true,
            DuplicateDetectionHistoryTimeWindow = TimeSpan.FromMinutes(10),

            // Total queue storage capacity (1 GB). This is NOT the per-message size limit,
            // which is fixed by the tier (256 KB on Standard, 1 MB by default on Premium).
            MaxSizeInMegabytes = 1024,

            // Spread the queue across multiple brokers for throughput.
            EnablePartitioning = true,

            // CAUTION: deletes the whole queue, including its DLQ, after 30 days without activity.
            // Consider removing this for long-lived production queues.
            AutoDeleteOnIdle = TimeSpan.FromDays(30)
        };

        try
        {
            await adminClient.CreateQueueAsync(queueOptions, cancellationToken);
        }
        catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.MessagingEntityAlreadyExists)
        {
            // Deliberately ignored: the queue already exists (possibly created concurrently by another
            // instance). Catching here avoids the race of a separate exists-check followed by create.
        }
    }
}
Step 2: Safe Message Processing
/// <summary>
/// Processes orders from the "order-processing" queue. Every message is settled explicitly, so
/// malformed or invalid messages are dead-lettered instead of being retried until the broker gives up.
/// </summary>
public class SafeOrderProcessor
{
    // Keep in sync with the queue's MaxDeliveryCount. On the last allowed delivery the handler
    // dead-letters the message itself, so the DLQ entry records the real exception instead of the
    // broker's generic "MaxDeliveryCountExceeded" reason.
    private const int MaxDeliveryCount = 5;

    // Bodies larger than this are rejected as poison (1 MB).
    private const int MaxBodyBytes = 1024 * 1024;

    // UTF-8 byte-order mark. Some producers prepend it, and the span-based JSON reader does not skip it.
    private static ReadOnlySpan<byte> Utf8Bom => new byte[] { 0xEF, 0xBB, 0xBF };

    private readonly ServiceBusProcessor _processor;
    private readonly ILogger<SafeOrderProcessor> _logger;

    /// <summary>Creates a processor bound to the "order-processing" queue.</summary>
    /// <param name="client">Client for the Service Bus namespace.</param>
    /// <param name="logger">Logger for processing outcomes.</param>
    /// <exception cref="ArgumentNullException">Either argument is null.</exception>
    public SafeOrderProcessor(ServiceBusClient client, ILogger<SafeOrderProcessor> logger)
    {
        if (client is null)
        {
            throw new ArgumentNullException(nameof(client));
        }

        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _processor = client.CreateProcessor("order-processing", new ServiceBusProcessorOptions
        {
            MaxConcurrentCalls = 10,
            // Every message is completed / abandoned / dead-lettered explicitly in HandleMessageAsync.
            AutoCompleteMessages = false,
            PrefetchCount = 20,
            // Keeps renewing the lock for handlers that run longer than the queue's LockDuration.
            MaxAutoLockRenewalDuration = TimeSpan.FromMinutes(5)
        });
    }

    /// <summary>
    /// Registers the message/error handlers and starts receiving. Call once per instance:
    /// ServiceBusProcessor does not allow a handler to be registered twice.
    /// </summary>
    /// <param name="cancellationToken">Cancels the start operation only, not processing.</param>
    public async Task StartProcessingAsync(CancellationToken cancellationToken = default)
    {
        _processor.ProcessMessageAsync += HandleMessageAsync;
        _processor.ProcessErrorAsync += HandleErrorAsync;
        await _processor.StartProcessingAsync(cancellationToken);
        _logger.LogInformation("Safe order processor started");
    }

    /// <summary>
    /// Validates, deserializes and processes one message, then settles it according to the failure kind:
    /// invalid JSON and validation failures are dead-lettered immediately (they can never succeed);
    /// business errors are abandoned for redelivery; unexpected errors are retried until the last
    /// delivery, then dead-lettered with the full exception.
    /// </summary>
    private async Task HandleMessageAsync(ProcessMessageEventArgs args)
    {
        var message = args.Message;
        try
        {
            // Cheap checks on the envelope first, before touching the body.
            ValidateMessageProperties(message);
            var order = SafeDeserialize<Order>(message.Body);
            ValidateOrder(order);

            // NOTE: if CompleteMessageAsync fails after this succeeds (e.g. lock lost), the message is
            // redelivered and processed again, so ProcessOrderAsync should be idempotent.
            await ProcessOrderAsync(order);
            await args.CompleteMessageAsync(message);
            _logger.LogInformation("Order {OrderId} processed successfully", order.Id);
        }
        catch (JsonException ex)
        {
            // Malformed payload: retrying cannot fix it.
            _logger.LogError(ex, "Invalid JSON in message {MessageId}", message.MessageId);
            await args.DeadLetterMessageAsync(message, "Invalid JSON payload", ex.Message);
        }
        catch (ValidationException ex)
        {
            // Validation depends only on the message itself, so every redelivery would fail the same way.
            // The description keeps the word "validation" so DLQ tooling can classify it.
            _logger.LogWarning(ex, "Order validation failed: {OrderId}", message.MessageId);
            await args.DeadLetterMessageAsync(message, "Validation failed", $"Validation failed: {ex.Message}");
        }
        catch (BusinessException ex)
        {
            // Business error - might succeed on retry; the broker dead-letters after MaxDeliveryCount.
            _logger.LogError(ex, "Business error processing order");
            await args.AbandonMessageAsync(message);
        }
        catch (Exception ex)
        {
            // Unknown error - let redelivery handle it until the final attempt.
            _logger.LogError(ex, "Unexpected error processing message {MessageId}", message.MessageId);

            // DeliveryCount is 1 on the first delivery, so this is true on the last allowed attempt.
            if (message.DeliveryCount >= MaxDeliveryCount)
            {
                _logger.LogWarning("Max delivery count reached, sending to DLQ");
                await args.DeadLetterMessageAsync(message, "Max retries exceeded", ex.ToString());
            }
            else
            {
                await args.AbandonMessageAsync(message);
            }
        }
    }

    /// <summary>Rejects messages whose content type or size makes them unprocessable.</summary>
    /// <exception cref="ValidationException">Content type missing / not JSON, or body too large.</exception>
    private void ValidateMessageProperties(ServiceBusReceivedMessage message)
    {
        if (string.IsNullOrEmpty(message.ContentType))
        {
            throw new ValidationException("Missing content type");
        }

        if (!message.ContentType.Contains("json", StringComparison.OrdinalIgnoreCase))
        {
            throw new ValidationException($"Unsupported content type: {message.ContentType}");
        }

        if (message.Body.Length > MaxBodyBytes)
        {
            throw new ValidationException($"Message too large: {message.Body.Length} bytes");
        }
    }

    /// <summary>
    /// Deserializes a UTF-8 JSON body, tolerating a leading byte-order mark.
    /// </summary>
    /// <exception cref="JsonException">The body is not valid JSON for <typeparamref name="T"/>, or is JSON <c>null</c>.</exception>
    private T SafeDeserialize<T>(BinaryData body)
    {
        ReadOnlySpan<byte> utf8 = body.ToMemory().Span;
        if (utf8.StartsWith(Utf8Bom))
        {
            _logger.LogDebug("Stripping UTF-8 byte-order mark from message body");
            utf8 = utf8.Slice(Utf8Bom.Length);
        }

        // A literal "null" body would otherwise surface later as a NullReferenceException and be retried.
        return JsonSerializer.Deserialize<T>(utf8)
            ?? throw new JsonException("Deserialization returned null");
    }

    /// <summary>Business-level checks on a deserialized order.</summary>
    /// <exception cref="ValidationException">A required field is missing or the amount is not positive.</exception>
    private void ValidateOrder(Order order)
    {
        if (string.IsNullOrEmpty(order.Id))
        {
            throw new ValidationException("Order ID is required");
        }

        if (order.TotalAmount <= 0)
        {
            throw new ValidationException("Order amount must be positive");
        }

        if (order.CustomerId is null)
        {
            throw new ValidationException("Customer ID is required");
        }
    }

    /// <summary>Logs errors raised by the processor infrastructure (connection, lock renewal, handler escapes).</summary>
    private Task HandleErrorAsync(ProcessErrorEventArgs args)
    {
        _logger.LogError(args.Exception,
            "Processor error. Entity: {Entity}, ErrorSource: {Source}",
            args.EntityPath, args.ErrorSource);
        return Task.CompletedTask;
    }
}
Step 3: Handling Large Payloads
/// <summary>
/// Resolves an order from a message that may use the claim-check pattern: large payloads live in
/// blob storage and the message carries only a pointer in its "PayloadBlobUrl" application property.
/// </summary>
public class LargePayloadHandler
{
    // Inline bodies above this size should have been offloaded by the sender (64 KB).
    private const long MaxInlineSize = 64 * 1024;

    // Application property carrying the claim-check pointer for offloaded payloads.
    private const string PayloadBlobUrlProperty = "PayloadBlobUrl";

    private readonly ILogger<LargePayloadHandler>? _logger;

    /// <param name="logger">Optional logger; logging is skipped when null.</param>
    public LargePayloadHandler(ILogger<LargePayloadHandler>? logger = null)
    {
        _logger = logger;
    }

    /// <summary>
    /// Returns the order from blob storage when the message carries a claim-check pointer,
    /// otherwise deserializes it from the message body.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
    /// <exception cref="ValidationException">The pointer property is present but empty or not a string.</exception>
    /// <exception cref="InvalidOperationException">The payload deserializes to null.</exception>
    public async Task<Order> ProcessOrderWithLargePayloadAsync(ServiceBusReceivedMessage message)
    {
        if (message is null)
        {
            throw new ArgumentNullException(nameof(message));
        }

        // The pointer property, not the body size, decides where the payload is: a claim-check
        // message has a SMALL body, and a large body is by definition already inline.
        if (message.ApplicationProperties.ContainsKey(PayloadBlobUrlProperty))
        {
            _logger?.LogInformation("Large payload detected, fetching from external storage");
            return await FetchFromExternalStorageAsync(message);
        }

        if (message.Body.Length > MaxInlineSize)
        {
            _logger?.LogWarning(
                "Inline payload of {Size} bytes exceeds {Limit} bytes; sender should use blob offloading",
                message.Body.Length, MaxInlineSize);
        }

        return message.Body.ToObjectFromJson<Order>()
            ?? throw new InvalidOperationException("Failed to deserialize message body");
    }

    /// <summary>Downloads and deserializes the payload referenced by the message's blob URL.</summary>
    private async Task<Order> FetchFromExternalStorageAsync(ServiceBusReceivedMessage message)
    {
        var blobUrl = message.ApplicationProperties.GetValueOrDefault(PayloadBlobUrlProperty) as string;
        if (string.IsNullOrEmpty(blobUrl))
        {
            throw new ValidationException("Large payload without blob URL");
        }

        // SECURITY(review): the URL comes from the message and is untrusted. Validate the host against an
        // allow-list of your storage accounts before fetching, to avoid server-side request forgery.
        // NOTE: no credential is supplied, so this only works with a SAS URL or a public container.
        var blobClient = new BlobClient(new Uri(blobUrl));
        var response = await blobClient.DownloadStreamingAsync();

        // Deserialize straight from the stream instead of buffering the whole payload as a string.
        using var stream = response.Value.Content;
        return (await JsonSerializer.DeserializeAsync<Order>(stream))
            ?? throw new InvalidOperationException("Failed to deserialize blob content");
    }
}
Step 4: Processing Dead Letter Queue
/// <summary>
/// Drains the "order-processing" dead-letter queue and routes each message to a recovery action
/// based on its dead-letter reason and description.
/// </summary>
public class DeadLetterProcessor
{
    private const string QueueName = "order-processing";

    private readonly ServiceBusClient _client;
    private readonly ILogger<DeadLetterProcessor> _logger;

    public DeadLetterProcessor(ServiceBusClient client, ILogger<DeadLetterProcessor> logger)
    {
        _client = client;
        _logger = logger;
    }

    /// <summary>
    /// Receives DLQ messages in batches of 10 until a 5-second wait returns nothing or
    /// <paramref name="cancellationToken"/> is cancelled.
    /// </summary>
    /// <param name="cancellationToken">Stops the drain between batches and aborts a pending receive.</param>
    public async Task ProcessDeadLettersAsync(CancellationToken cancellationToken = default)
    {
        // The receiver holds a link to the broker; dispose it when the drain ends.
        await using var receiver = _client.CreateReceiver(QueueName, new ServiceBusReceiverOptions
        {
            SubQueue = SubQueue.DeadLetter
        });

        while (!cancellationToken.IsCancellationRequested)
        {
            var messages = await receiver.ReceiveMessagesAsync(
                maxMessages: 10,
                maxWaitTime: TimeSpan.FromSeconds(5),
                cancellationToken: cancellationToken);

            // Empty batch: the DLQ is drained, or everything left is still locked from a failed attempt.
            if (messages.Count == 0)
            {
                break;
            }

            foreach (var message in messages)
            {
                await ProcessDeadLetterAsync(receiver, message, cancellationToken);
            }
        }
    }

    /// <summary>
    /// Runs the recovery action for one DLQ message and completes it on success.
    /// </summary>
    /// <param name="receiver">The receiver that locked <paramref name="message"/>; settlement must use it.</param>
    private async Task ProcessDeadLetterAsync(
        ServiceBusReceiver receiver,
        ServiceBusReceivedMessage message,
        CancellationToken cancellationToken)
    {
        var reason = message.DeadLetterReason;
        var errorDescription = message.DeadLetterErrorDescription;
        _logger.LogWarning("Processing DLQ message. Reason: {Reason}, Error: {Description}",
            reason, errorDescription);

        try
        {
            switch (AnalyzeFailureType(message))
            {
                case FailureType.InvalidPayload:
                    await ArchiveAndNotifyAsync(message, "Invalid payload - needs manual review");
                    break;
                case FailureType.TransientError:
                    // Could retry after fixing issue
                    await RetryAfterFixAsync(message);
                    break;
                case FailureType.BusinessRule:
                    await ArchiveAndNotifyAsync(message, "Business rule violation");
                    break;
                case FailureType.Timeout:
                    await RetryWithLongerTimeoutAsync(message);
                    break;
                default:
                    await ArchiveForInvestigationAsync(message);
                    break;
            }

            // Remove the message from the DLQ only after its recovery action succeeded.
            await receiver.CompleteMessageAsync(message, cancellationToken);
        }
        catch (Exception ex)
        {
            // Best effort: the message is not settled, so it becomes receivable again once its lock
            // expires and is picked up by a later run instead of being lost.
            _logger.LogError(ex, "Failed to process DLQ message {MessageId}", message.MessageId);
        }
    }

    /// <summary>
    /// Classifies a dead-lettered message from its reason/description text (case-insensitive).
    /// Checked in order: invalid JSON, timeout, validation, exhausted retries; otherwise Unknown.
    /// </summary>
    private static FailureType AnalyzeFailureType(ServiceBusReceivedMessage message)
    {
        var reason = message.DeadLetterReason ?? "";
        var description = message.DeadLetterErrorDescription ?? "";

        // Covers "JsonException" and the processor's own "Invalid JSON payload" reason.
        if (Mentions(reason, "json") || Mentions(description, "json"))
        {
            return FailureType.InvalidPayload;
        }

        if (Mentions(reason, "timeout") || Mentions(description, "timeout"))
        {
            return FailureType.Timeout;
        }

        if (Mentions(reason, "validation") || Mentions(description, "validation"))
        {
            return FailureType.BusinessRule;
        }

        // Retries were exhausted without a more specific cause: the broker's MaxDeliveryCountExceeded
        // or the processor's own "Max retries exceeded". Previously this case was never returned.
        if (Mentions(reason, "MaxDeliveryCountExceeded") || Mentions(reason, "Max retries exceeded"))
        {
            return FailureType.TransientError;
        }

        return FailureType.Unknown;
    }

    /// <summary>Case-insensitive ordinal substring test.</summary>
    private static bool Mentions(string text, string term) =>
        text.Contains(term, StringComparison.OrdinalIgnoreCase);
}
/// <summary>
/// Category assigned to a dead-lettered message; selects the recovery action applied to it.
/// </summary>
public enum FailureType
{
    /// <summary>The payload could not be parsed; archived for manual review.</summary>
    InvalidPayload = 0,

    /// <summary>A failure that may succeed if retried once the underlying issue is fixed.</summary>
    TransientError = 1,

    /// <summary>The payload violated a business or validation rule; archived with a notification.</summary>
    BusinessRule = 2,

    /// <summary>Processing timed out; retried with a longer timeout.</summary>
    Timeout = 3,

    /// <summary>No rule matched; archived for investigation.</summary>
    Unknown = 4,
}
Step 5: Message Validation Pipeline
/// <summary>
/// Pre-send validation for outgoing messages, so producers reject poison messages before they reach the queue.
/// </summary>
public class MessageValidationPipeline
{
    // Largest body accepted (1 MB), matching the consumer-side limit.
    private const int MaxBodyBytes = 1024 * 1024;

    // Application properties every message must carry.
    private static readonly string[] RequiredHeaders = { "x-correlation-id", "x-tenant-id" };

    /// <summary>
    /// Checks message id, content type, size, required headers and JSON well-formedness, stopping at
    /// the first failure.
    /// </summary>
    /// <param name="message">Message to validate.</param>
    /// <returns><c>(true, null)</c> when valid; otherwise <c>false</c> and a description of the first failure.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
    public Task<(bool IsValid, string? Error)> ValidateAsync(ServiceBusMessage message)
    {
        if (message is null)
        {
            throw new ArgumentNullException(nameof(message));
        }

        // All checks are synchronous; the Task return type is kept for callers.
        return Task.FromResult(Validate(message));
    }

    private static (bool IsValid, string? Error) Validate(ServiceBusMessage message)
    {
        if (string.IsNullOrEmpty(message.MessageId))
        {
            return (false, "Message ID is required");
        }

        string? contentType = message.ContentType;
        if (contentType is null || !contentType.Contains("json", StringComparison.OrdinalIgnoreCase))
        {
            return (false, $"Invalid content type: {message.ContentType}");
        }

        if (message.Body.Length > MaxBodyBytes)
        {
            return (false, $"Message too large: {message.Body.Length} bytes");
        }

        foreach (var header in RequiredHeaders)
        {
            if (!message.ApplicationProperties.ContainsKey(header))
            {
                return (false, $"Missing required header: {header}");
            }
        }

        try
        {
            // JsonDocument rents pooled buffers, so it must be disposed even when only checking syntax.
            JsonDocument.Parse(message.Body.ToString()).Dispose();
        }
        catch (JsonException ex)
        {
            return (false, $"Invalid JSON: {ex.Message}");
        }

        return (true, null);
    }
}
Testing Poison Message Handling
/// <summary>
/// Integration tests for poison-message handling. They need a live namespace (connection string in
/// SERVICEBUS_CONNECTION_STRING) and, for the dead-letter test, a running SafeOrderProcessor on the queue.
/// </summary>
public class PoisonMessageTests
{
    private const string QueueName = "order-processing";

    // Upper bound on how long to wait for the processor to dead-letter a message.
    private static readonly TimeSpan DeadLetterTimeout = TimeSpan.FromSeconds(30);

    private readonly ServiceBusClient _client;

    public PoisonMessageTests()
    {
        var connectionString = Environment.GetEnvironmentVariable("SERVICEBUS_CONNECTION_STRING")
            ?? throw new InvalidOperationException(
                "Set SERVICEBUS_CONNECTION_STRING to run the poison-message integration tests.");
        _client = new ServiceBusClient(connectionString);
    }

    [Fact]
    public async Task InvalidJson_SendsToDeadLetter()
    {
        // Arrange: malformed JSON with a unique id, so reruns are not dropped by duplicate detection
        // and the assertion ignores unrelated messages already in the DLQ.
        await using var sender = _client.CreateSender(QueueName);
        var message = new ServiceBusMessage("{ invalid json }")
        {
            ContentType = "application/json",
            MessageId = Guid.NewGuid().ToString()
        };

        // Act
        await sender.SendMessageAsync(message);

        // Assert: the processor puts "Invalid JSON payload" in the dead-letter REASON;
        // the description holds the parser's message.
        var deadLettered = await WaitForDeadLetterAsync(message.MessageId);
        Assert.NotNull(deadLettered);
        Assert.Contains("Invalid JSON", deadLettered!.DeadLetterReason);
    }

    [Fact]
    public async Task OversizedMessage_IsRejectedOnSend()
    {
        // Arrange: 2 MB exceeds the broker's per-message limit (256 KB Standard, 1 MB default Premium),
        // so such a message is refused at send time and never reaches the processor or the DLQ.
        await using var sender = _client.CreateSender(QueueName);
        var message = new ServiceBusMessage(new string('x', 2 * 1024 * 1024));

        // Act / Assert
        var ex = await Assert.ThrowsAsync<ServiceBusException>(() => sender.SendMessageAsync(message));
        Assert.Equal(ServiceBusFailureReason.MessageSizeExceeded, ex.Reason);
    }

    /// <summary>
    /// Polls the DLQ until a message with <paramref name="messageId"/> appears or the timeout elapses.
    /// </summary>
    /// <returns>The dead-lettered message, or null on timeout.</returns>
    private async Task<ServiceBusReceivedMessage?> WaitForDeadLetterAsync(string messageId)
    {
        await using var receiver = _client.CreateReceiver(QueueName,
            new ServiceBusReceiverOptions { SubQueue = SubQueue.DeadLetter });

        var deadline = DateTime.UtcNow + DeadLetterTimeout;
        while (DateTime.UtcNow < deadline)
        {
            // Peeking is cursor-based per receiver: each call continues after the last peeked message,
            // so this scans existing DLQ content once and then picks up newly dead-lettered messages.
            var batch = await receiver.PeekMessagesAsync(maxMessages: 100);
            foreach (var candidate in batch)
            {
                if (candidate.MessageId == messageId)
                {
                    return candidate;
                }
            }

            if (batch.Count == 0)
            {
                await Task.Delay(TimeSpan.FromMilliseconds(500));
            }
        }

        return null;
    }
}
Monitoring and Alerts
/// <summary>
/// Raises an alert when a queue's dead-letter sub-queue grows beyond a threshold.
/// </summary>
public class DeadLetterMonitor
{
    private const string ConnectionStringVariable = "SERVICEBUS_CONNECTION_STRING";

    private readonly ServiceBusAdministrationClient? _adminClient;
    private readonly string _queueName;
    private readonly long _alertThreshold;

    /// <param name="adminClient">
    /// Management client to use. When null, one is created on each check from the
    /// SERVICEBUS_CONNECTION_STRING environment variable.
    /// </param>
    /// <param name="queueName">Queue whose DLQ is monitored.</param>
    /// <param name="alertThreshold">Alert when more than this many messages are dead-lettered.</param>
    public DeadLetterMonitor(
        ServiceBusAdministrationClient? adminClient = null,
        string queueName = "order-processing",
        long alertThreshold = 100)
    {
        _adminClient = adminClient;
        _queueName = queueName;
        _alertThreshold = alertThreshold;
    }

    /// <summary>Reads the current dead-letter count and sends an alert if it exceeds the threshold.</summary>
    /// <exception cref="InvalidOperationException">No client was supplied and the environment variable is unset.</exception>
    public async Task CheckAndAlertAsync(CancellationToken cancellationToken = default)
    {
        var adminClient = _adminClient ?? CreateClientFromEnvironment();

        // The DLQ is a sub-queue, not a separate entity: its depth is reported on the parent
        // queue's runtime properties. (QueueProperties only holds configuration, not counts.)
        var runtime = await adminClient.GetQueueRuntimePropertiesAsync(_queueName, cancellationToken);
        long deadLetterCount = runtime.Value.DeadLetterMessageCount;

        if (deadLetterCount > _alertThreshold)
        {
            await SendAlertEmailAsync(deadLetterCount);
        }
    }

    /// <summary>Builds a management client from the connection string in the environment.</summary>
    private static ServiceBusAdministrationClient CreateClientFromEnvironment()
    {
        var connectionString = Environment.GetEnvironmentVariable(ConnectionStringVariable);
        if (string.IsNullOrEmpty(connectionString))
        {
            throw new InvalidOperationException(
                $"Pass a ServiceBusAdministrationClient or set {ConnectionStringVariable}.");
        }

        return new ServiceBusAdministrationClient(connectionString);
    }
}
Best Practices Summary
| Practice | Why |
|---|---|
| Set MaxDeliveryCount = 5 | Enough retries, not too many |
| Enable dead letter on expiration | Preserve messages that expire |
| Validate before processing | Fail fast on invalid messages |
| Log DLQ messages | Investigate and fix root cause |
| Monitor DLQ depth | Alert when messages pile up |
| Implement retry with backoff | Don't hammer failing services (abandoning redelivers immediately, so add delay e.g. by scheduling a resend) |
Summary
- Configure queues with proper dead letter settings
- Implement try-catch at every processing stage
- Validate messages before processing
- Move unrecoverable messages to DLQ quickly
- Monitor and process DLQ regularly to prevent message loss