← Back to Articles | Service Bus

Service Bus — Handling Poison Messages and Problematic Payloads

Complete guide to handling poison messages, malformed payloads, and message processing failures in Azure Service Bus.

Service Bus — Handling Poison Messages and Problematic Payloads

The Problem

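Your Service Bus queue processor keeps failing on certain messages — for example, a malformed JSON body, an unsupported content type, or an oversized payload that throws on every delivery attempt.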

You need a robust strategy to handle these "poison messages" without losing good messages or getting stuck.

Understanding Poison Messages

┌─────────────────────────────────────────────────────────────────┐
│                    Message Processing Flow                      │
└─────────────────────────────────────────────────────────────────┘

  ┌─────────────┐     ┌─────────────┐     ┌─────────────┐
  │   Queue     │────▶│  Processor  │────▶│   Success   │
  └─────────────┘     └─────────────┘     └─────────────┘
                            │
                            │ Exception
                            ▼
                     ┌─────────────┐
                     │  Retry N    │ (MaxDeliveryCount: 10 by default; set to 5 in Step 1)
                     │  times      │
                     └─────────────┘
                            │
                   All retries failed
                            ▼
                     ┌─────────────┐
                     │ Dead Letter │ ← POISON MESSAGE
                     │   Queue     │
                     └─────────────┘

Solution Implementation

Step 1: Configure Queue for Dead Lettering

/// <summary>
/// Creates the <c>order-processing</c> queue with settings that route
/// expired and repeatedly failing messages to the dead-letter queue (DLQ).
/// </summary>
public class QueueSetupService
{
    private const string QueueName = "order-processing";

    /// <summary>
    /// Creates the queue using the supplied administration client.
    /// </summary>
    /// <param name="adminClient">Client used to issue the create-queue request.</param>
    /// <exception cref="ArgumentNullException"><paramref name="adminClient"/> is null.</exception>
    public async Task CreateRobustQueueAsync(ServiceBusAdministrationClient adminClient)
    {
        if (adminClient is null)
        {
            throw new ArgumentNullException(nameof(adminClient));
        }

        var queueOptions = new CreateQueueOptions(QueueName)
        {
            // Move messages whose TTL elapses to the DLQ instead of dropping them.
            // (In Azure.Messaging.ServiceBus.Administration the property has no "Enable" prefix.)
            DeadLetteringOnMessageExpiration = true,

            // Maximum delivery attempts before the broker dead-letters the message.
            // There is no separate on/off switch for this: exceeding MaxDeliveryCount
            // always dead-letters with reason "MaxDeliveryCountExceeded".
            MaxDeliveryCount = 5,

            // Message TTL - how long a message may stay in the queue
            DefaultMessageTimeToLive = TimeSpan.FromDays(7),

            // Lock duration - time a receiver has to settle a message
            LockDuration = TimeSpan.FromMinutes(1),

            // Duplicate detection (keyed on MessageId) so sender-side retries
            // within the window do not enqueue the same message twice
            RequiresDuplicateDetection = true,
            DuplicateDetectionHistoryTimeWindow = TimeSpan.FromMinutes(10),

            // Total queue capacity in megabytes (this is NOT the per-message size limit)
            MaxSizeInMegabytes = 1024,

            // Enable partitioning for throughput
            EnablePartitioning = true,

            // Auto-delete the queue after it has been idle this long
            AutoDeleteOnIdle = TimeSpan.FromDays(30)
        };

        await adminClient.CreateQueueAsync(queueOptions);
    }
}

Step 2: Safe Message Processing

/// <summary>
/// Processes messages from the <c>order-processing</c> queue and settles each
/// one explicitly: complete on success, dead-letter when the failure is
/// deterministic (bad JSON, failed validation), abandon when a retry may help.
/// </summary>
/// <remarks>
/// <c>Order</c>, <c>BusinessException</c> and <c>ProcessOrderAsync</c> are referenced
/// here but are not part of this snippet.
/// </remarks>
public class SafeOrderProcessor
{
    private const string QueueName = "order-processing";

    // Keep in sync with the MaxDeliveryCount configured on the queue.
    private const int MaxDeliveryCount = 5;

    // The SDK rejects dead-letter reason/description strings longer than this.
    private const int MaxDeadLetterTextLength = 4096;

    // Largest body the processor is willing to handle (1MB).
    private const int MaxBodyBytes = 1024 * 1024;

    private readonly ServiceBusProcessor _processor;
    private readonly ILogger<SafeOrderProcessor> _logger;

    /// <summary>
    /// Creates the underlying processor with manual settlement (AutoComplete = false).
    /// </summary>
    /// <param name="client">Service Bus client used to create the processor.</param>
    /// <param name="logger">Logger for processing diagnostics.</param>
    public SafeOrderProcessor(ServiceBusClient client, ILogger<SafeOrderProcessor> logger)
    {
        _processor = client.CreateProcessor(QueueName, new ServiceBusProcessorOptions
        {
            MaxConcurrentCalls = 10,
            AutoComplete = false,
            PrefetchCount = 20,
            MaxAutoLockRenewalDuration = TimeSpan.FromMinutes(5)
        });

        _logger = logger;
    }

    /// <summary>Registers the handlers and starts receiving messages.</summary>
    public async Task StartProcessingAsync()
    {
        _processor.ProcessMessageAsync += HandleMessageAsync;
        _processor.ProcessErrorAsync += HandleErrorAsync;

        await _processor.StartProcessingAsync();
        _logger.LogInformation("Safe order processor started");
    }

    /// <summary>Stops receiving and releases the underlying processor.</summary>
    public async Task StopProcessingAsync()
    {
        await _processor.StopProcessingAsync();
        await _processor.DisposeAsync();
    }

    /// <summary>
    /// Validates, deserializes and processes one message, then settles it
    /// according to the kind of failure (if any).
    /// </summary>
    private async Task HandleMessageAsync(ProcessMessageEventArgs args)
    {
        var message = args.Message;

        try
        {
            // Step 1: Validate message properties first
            ValidateMessageProperties(message);

            // Step 2: Deserialize (throws JsonException when the payload is unusable)
            var order = SafeDeserialize<Order>(message.Body);

            // Step 3: Business validation
            ValidateOrder(order);

            // Step 4: Process order
            await ProcessOrderAsync(order);

            // Step 5: Success - complete message
            await args.CompleteMessageAsync(message);

            _logger.LogInformation("Order {OrderId} processed successfully", order.Id);
        }
        catch (JsonException ex)
        {
            // Invalid JSON - send directly to DLQ (not worth retrying)
            _logger.LogError(ex, "Invalid JSON in message {MessageId}", message.MessageId);
            await args.DeadLetterMessageAsync(message, "Invalid JSON payload", Truncate(ex.Message));
        }
        catch (ValidationException ex)
        {
            // Validation depends only on the message itself, so every redelivery
            // would fail the same way. Dead-letter immediately instead of burning
            // the remaining delivery attempts.
            _logger.LogWarning(ex, "Validation failed for message {MessageId}", message.MessageId);
            await args.DeadLetterMessageAsync(message, "Validation failed", Truncate(ex.Message));
        }
        catch (BusinessException ex)
        {
            // Business error - might succeed on retry
            _logger.LogError(ex, "Business error processing order");
            await args.AbandonMessageAsync(message);
        }
        catch (Exception ex)
        {
            // Unknown error - let the retry mechanism handle it
            _logger.LogError(ex, "Unexpected error processing message {MessageId}", message.MessageId);

            if (message.DeliveryCount >= MaxDeliveryCount)
            {
                // Dead-letter explicitly so the DLQ entry carries the exception details.
                // ex.ToString() includes the stack trace and can exceed the allowed
                // description length, so it is truncated.
                _logger.LogWarning("Max delivery count reached, sending to DLQ");
                await args.DeadLetterMessageAsync(message, "Max retries exceeded", Truncate(ex.ToString()));
            }
            else
            {
                await args.AbandonMessageAsync(message);
            }
        }
    }

    /// <summary>
    /// Checks content type and body size before any deserialization is attempted.
    /// </summary>
    /// <exception cref="ValidationException">
    /// The content type is missing or not JSON, or the body exceeds 1MB.
    /// </exception>
    private void ValidateMessageProperties(ServiceBusReceivedMessage message)
    {
        // Check content type
        if (string.IsNullOrEmpty(message.ContentType))
        {
            throw new ValidationException("Missing content type");
        }

        if (!message.ContentType.Contains("json", StringComparison.OrdinalIgnoreCase))
        {
            throw new ValidationException($"Unsupported content type: {message.ContentType}");
        }

        // Check message size
        if (message.Body.Length > MaxBodyBytes)
        {
            throw new ValidationException($"Message too large: {message.Body.Length} bytes");
        }
    }

    /// <summary>
    /// Deserializes the body as JSON. If that fails and the payload has a leading
    /// byte-order mark or surrounding whitespace, retries once on the cleaned text.
    /// </summary>
    /// <exception cref="JsonException">
    /// The payload is not valid JSON for <typeparamref name="T"/>, or deserializes to null.
    /// </exception>
    private T SafeDeserialize<T>(BinaryData body)
    {
        try
        {
            return body.ToObjectFromJson<T>()
                ?? throw new JsonException("Deserialization returned null");
        }
        catch (JsonException ex)
        {
            var rawString = body.ToString();

            // string.Trim() does not strip U+FEFF, so the BOM is removed explicitly.
            var cleaned = rawString.Trim().TrimStart('\uFEFF').Trim();

            if (string.Equals(cleaned, rawString, StringComparison.Ordinal))
            {
                // Nothing to clean up - a second attempt would fail identically.
                throw;
            }

            _logger.LogWarning(ex, "Failed to deserialize message, attempting recovery");

            return JsonSerializer.Deserialize<T>(cleaned)
                ?? throw new JsonException("Deserialization returned null");
        }
    }

    /// <summary>Applies the business rules every order must satisfy.</summary>
    /// <exception cref="ValidationException">A required field is missing or invalid.</exception>
    private void ValidateOrder(Order order)
    {
        if (string.IsNullOrEmpty(order.Id))
        {
            throw new ValidationException("Order ID is required");
        }

        if (order.TotalAmount <= 0)
        {
            throw new ValidationException("Order amount must be positive");
        }

        if (order.CustomerId == null)
        {
            throw new ValidationException("Customer ID is required");
        }
    }

    /// <summary>Logs errors raised by the processor itself or escaping the message handler.</summary>
    private Task HandleErrorAsync(ProcessErrorEventArgs args)
    {
        _logger.LogError(args.Exception,
            "Processor error. Entity: {Entity}, ErrorSource: {Source}",
            args.EntityPath, args.ErrorSource);

        return Task.CompletedTask;
    }

    /// <summary>Caps text at the maximum length accepted for dead-letter reason/description.</summary>
    private static string Truncate(string text)
    {
        if (string.IsNullOrEmpty(text) || text.Length <= MaxDeadLetterTextLength)
        {
            return text;
        }

        return text.Substring(0, MaxDeadLetterTextLength);
    }
}

Step 3: Handling Large Payloads

/// <summary>
/// Resolves the <c>Order</c> for a message whose payload is either carried inline
/// in the body or stored externally and referenced through the
/// <c>PayloadBlobUrl</c> application property.
/// </summary>
public class LargePayloadHandler
{
    private const long MaxInlineSize = 64 * 1024; // 64KB
    private const string BlobUrlProperty = "PayloadBlobUrl";

    private readonly ILogger<LargePayloadHandler>? _logger;

    /// <summary>
    /// Creates the handler.
    /// </summary>
    /// <param name="logger">Optional logger; when omitted nothing is logged.</param>
    public LargePayloadHandler(ILogger<LargePayloadHandler>? logger = null)
    {
        _logger = logger;
    }

    /// <summary>
    /// Returns the order for <paramref name="message"/>, downloading it from blob
    /// storage when the message references an external payload.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
    /// <exception cref="ValidationException">
    /// The body is larger than 64KB but no blob URL is present, or the blob URL is not a valid absolute URI.
    /// </exception>
    /// <exception cref="JsonException">The inline body is not a valid order.</exception>
    public async Task<Order> ProcessOrderWithLargePayloadAsync(ServiceBusReceivedMessage message)
    {
        if (message is null)
        {
            throw new ArgumentNullException(nameof(message));
        }

        // The blob reference decides where the payload lives. A message that points
        // at external storage typically has a small body, so body size alone
        // cannot be used to detect it.
        message.ApplicationProperties.TryGetValue(BlobUrlProperty, out var rawUrl);
        var blobUrl = rawUrl as string;

        if (!string.IsNullOrEmpty(blobUrl))
        {
            _logger?.LogInformation("Large payload detected, fetching from external storage");
            return await FetchFromExternalStorageAsync(blobUrl);
        }

        if (message.Body.Length > MaxInlineSize)
        {
            throw new ValidationException("Large payload without blob URL");
        }

        return message.Body.ToObjectFromJson<Order>()
            ?? throw new JsonException("Deserialization returned null");
    }

    /// <summary>Downloads the blob at <paramref name="blobUrl"/> and deserializes it as an order.</summary>
    private async Task<Order> FetchFromExternalStorageAsync(string blobUrl)
    {
        if (!Uri.TryCreate(blobUrl, UriKind.Absolute, out var blobUri))
        {
            throw new ValidationException($"Invalid blob URL: {blobUrl}");
        }

        // NOTE(review): the URL comes from the message and is therefore untrusted.
        // Consider restricting it to the expected storage account/container
        // before downloading.
        var blobClient = new BlobClient(blobUri);
        var response = await blobClient.DownloadAsync();

        // BlobDownloadInfo owns the content stream; disposing it releases the stream.
        using var download = response.Value;

        return await JsonSerializer.DeserializeAsync<Order>(download.Content)
            ?? throw new InvalidOperationException("Failed to deserialize blob content");
    }
}

Step 4: Processing Dead Letter Queue

/// <summary>
/// Drains the dead-letter queue of <c>order-processing</c>, classifies each
/// message by its dead-letter reason/description and routes it to the matching
/// follow-up action.
/// </summary>
/// <remarks>
/// The follow-up actions (<c>ArchiveAndNotifyAsync</c>, <c>RetryAfterFixAsync</c>,
/// <c>RetryWithLongerTimeoutAsync</c>, <c>ArchiveForInvestigationAsync</c>) are
/// referenced here but are not part of this snippet.
/// </remarks>
public class DeadLetterProcessor
{
    private const string QueueName = "order-processing";

    private readonly ServiceBusClient _client;
    private readonly ILogger<DeadLetterProcessor> _logger;

    public DeadLetterProcessor(ServiceBusClient client, ILogger<DeadLetterProcessor> logger)
    {
        _client = client;
        _logger = logger;
    }

    /// <summary>
    /// Receives dead-lettered messages in batches of up to 10 until a receive
    /// call returns no messages within 5 seconds.
    /// </summary>
    public async Task ProcessDeadLettersAsync()
    {
        // The receiver holds an AMQP link; dispose it when the drain finishes.
        await using var receiver = _client.CreateReceiver(QueueName, new ServiceBusReceiverOptions
        {
            SubQueue = SubQueue.DeadLetter
        });

        while (true)
        {
            var messages = await receiver.ReceiveMessagesAsync(
                maxMessages: 10,
                maxWaitTime: TimeSpan.FromSeconds(5));

            if (messages.Count == 0)
            {
                break;
            }

            foreach (var message in messages)
            {
                await ProcessDeadLetterAsync(receiver, message);
            }
        }
    }

    /// <summary>
    /// Handles one dead-lettered message and completes it on the receiver it was
    /// received from. Failures are logged and the message is left unsettled.
    /// </summary>
    private async Task ProcessDeadLetterAsync(ServiceBusReceiver receiver, ServiceBusReceivedMessage message)
    {
        var reason = message.DeadLetterReason;
        var errorDescription = message.DeadLetterErrorDescription;

        _logger.LogWarning("Processing DLQ message. Reason: {Reason}, Error: {Description}",
            reason, errorDescription);

        try
        {
            // Analyze the failure
            var failureType = AnalyzeFailureType(message);

            switch (failureType)
            {
                case FailureType.InvalidPayload:
                    await ArchiveAndNotifyAsync(message, "Invalid payload - needs manual review");
                    break;

                case FailureType.TransientError:
                    // Could retry after fixing issue
                    await RetryAfterFixAsync(message);
                    break;

                case FailureType.BusinessRule:
                    await ArchiveAndNotifyAsync(message, "Business rule violation");
                    break;

                case FailureType.Timeout:
                    await RetryWithLongerTimeoutAsync(message);
                    break;

                default:
                    await ArchiveForInvestigationAsync(message);
                    break;
            }

            // Complete the DLQ message
            await receiver.CompleteMessageAsync(message);
        }
        catch (Exception ex)
        {
            // Deliberately not abandoned: an abandoned message becomes available
            // again immediately and the drain loop above would pick it straight
            // back up. Leaving it unsettled keeps it in the DLQ until its lock expires.
            _logger.LogError(ex, "Failed to process DLQ message");
        }
    }

    /// <summary>
    /// Classifies a dead-lettered message from its reason and error description.
    /// </summary>
    private FailureType AnalyzeFailureType(ServiceBusReceivedMessage message)
    {
        var reason = message.DeadLetterReason ?? "";
        var description = message.DeadLetterErrorDescription ?? "";

        // The reason is matched case-insensitively so that both "JsonException" and
        // "Invalid JSON payload" are recognised. The description keeps the
        // case-sensitive "JSON" match so a stack trace that merely mentions a
        // Json namespace is not treated as a payload problem.
        if (reason.Contains("json", StringComparison.OrdinalIgnoreCase) || description.Contains("JSON"))
        {
            return FailureType.InvalidPayload;
        }

        if (reason.Contains("timeout", StringComparison.OrdinalIgnoreCase)
            || description.Contains("timeout", StringComparison.OrdinalIgnoreCase))
        {
            return FailureType.Timeout;
        }

        if (reason.Contains("validation", StringComparison.OrdinalIgnoreCase)
            || description.Contains("validation", StringComparison.OrdinalIgnoreCase))
        {
            return FailureType.BusinessRule;
        }

        // Retries were exhausted without a more specific cause: either the broker
        // dead-lettered the message ("MaxDeliveryCountExceeded") or the processor
        // did so itself ("Max retries exceeded").
        if (reason.Equals("MaxDeliveryCountExceeded", StringComparison.OrdinalIgnoreCase)
            || reason.Equals("Max retries exceeded", StringComparison.OrdinalIgnoreCase))
        {
            return FailureType.TransientError;
        }

        return FailureType.Unknown;
    }
}

/// <summary>
/// Categories used to decide how a dead-lettered message is followed up.
/// </summary>
public enum FailureType
{
    /// <summary>The payload could not be read as JSON; archived for manual review.</summary>
    InvalidPayload = 0,

    /// <summary>A failure that could be retried once the underlying issue is fixed.</summary>
    TransientError = 1,

    /// <summary>The message violated a business rule; archived with a notification.</summary>
    BusinessRule = 2,

    /// <summary>Processing timed out; retried with a longer timeout.</summary>
    Timeout = 3,

    /// <summary>No known category matched; archived for investigation.</summary>
    Unknown = 4
}

Step 5: Message Validation Pipeline

/// <summary>
/// Sender-side checks that reject a message before it is put on the queue:
/// message id, JSON content type, body size, required headers and well-formed JSON.
/// </summary>
public class MessageValidationPipeline
{
    private const int MaxBodyBytes = 1024 * 1024;

    private static readonly string[] RequiredHeaders = { "x-correlation-id", "x-tenant-id" };

    /// <summary>
    /// Validates <paramref name="message"/> and reports the first failed check.
    /// </summary>
    /// <param name="message">The outgoing message to validate.</param>
    /// <returns>
    /// <c>(true, null)</c> when every check passes; otherwise <c>(false, error)</c>
    /// with a description of the first failed check.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
    public Task<(bool IsValid, string? Error)> ValidateAsync(ServiceBusMessage message)
    {
        if (message is null)
        {
            throw new ArgumentNullException(nameof(message));
        }

        // All checks are synchronous, so no async state machine is needed;
        // the Task-returning signature is kept for existing callers.
        return Task.FromResult(Validate(message));
    }

    /// <summary>Runs the checks in order and stops at the first failure.</summary>
    private static (bool IsValid, string? Error) Validate(ServiceBusMessage message)
    {
        // 1. Check required properties
        if (string.IsNullOrEmpty(message.MessageId))
        {
            return (false, "Message ID is required");
        }

        // 2. Check content type (a missing content type is invalid too)
        var contentType = message.ContentType;
        if (contentType is null || !contentType.Contains("json", StringComparison.OrdinalIgnoreCase))
        {
            return (false, $"Invalid content type: {contentType}");
        }

        // 3. Check size
        if (message.Body.Length > MaxBodyBytes)
        {
            return (false, $"Message too large: {message.Body.Length} bytes");
        }

        // 4. Check required headers
        foreach (var header in RequiredHeaders)
        {
            if (!message.ApplicationProperties.ContainsKey(header))
            {
                return (false, $"Missing required header: {header}");
            }
        }

        // 5. Validate JSON structure
        try
        {
            var json = message.Body.ToString();

            // JsonDocument is IDisposable; dispose it once the parse has succeeded.
            using var document = JsonDocument.Parse(json);
        }
        catch (JsonException ex)
        {
            return (false, $"Invalid JSON: {ex.Message}");
        }

        return (true, null);
    }
}

Testing Poison Message Handling

/// <summary>
/// Integration tests that send problematic messages to the real
/// <c>order-processing</c> queue and check that they end up in its dead-letter queue.
/// Requires a running processor and a connection string in the
/// <c>SERVICEBUS_CONNECTION_STRING</c> environment variable.
/// </summary>
public class PoisonMessageTests
{
    private const string QueueName = "order-processing";
    private const string ConnectionStringVariable = "SERVICEBUS_CONNECTION_STRING";

    // Upper bound for the processor to pick the message up and dead-letter it.
    private static readonly TimeSpan DeadLetterTimeout = TimeSpan.FromSeconds(30);
    private static readonly TimeSpan PollInterval = TimeSpan.FromMilliseconds(500);

    [Fact]
    public async Task InvalidJson_SendsToDeadLetter()
    {
        // Arrange
        await using var client = CreateClient();
        await using var sender = client.CreateSender(QueueName);

        // Malformed JSON, tagged with a unique id so it can be found in the DLQ
        var message = new ServiceBusMessage("{ invalid json }")
        {
            MessageId = Guid.NewGuid().ToString(),
            ContentType = "application/json"
        };

        // Act
        await sender.SendMessageAsync(message);
        var deadLettered = await WaitForDeadLetterAsync(client, message.MessageId);

        // Assert - the processor writes "Invalid JSON payload" as the dead-letter
        // reason; the description holds the parser's own message.
        Assert.NotNull(deadLettered);
        Assert.Equal("Invalid JSON payload", deadLettered!.DeadLetterReason);
    }

    [Fact]
    public async Task OversizedMessage_SendsToDeadLetter()
    {
        // Arrange
        await using var client = CreateClient();
        await using var sender = client.CreateSender(QueueName);

        // Create message larger than the processor's 1MB limit
        var largePayload = new string('x', 2 * 1024 * 1024); // 2MB
        var message = new ServiceBusMessage(largePayload)
        {
            MessageId = Guid.NewGuid().ToString(),
            // Set so the size check, not the content-type check, rejects the message.
            ContentType = "application/json"
        };

        // Act
        try
        {
            await sender.SendMessageAsync(message);
        }
        catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.MessageSizeExceeded)
        {
            // The namespace's own message size limit is below 2MB, so the broker
            // rejected the send and the message never reached the queue.
            return;
        }

        var deadLettered = await WaitForDeadLetterAsync(client, message.MessageId);

        // Assert
        Assert.NotNull(deadLettered);
    }

    /// <summary>Creates a client from the connection string in the environment.</summary>
    private static ServiceBusClient CreateClient()
    {
        var connectionString = Environment.GetEnvironmentVariable(ConnectionStringVariable);
        if (string.IsNullOrEmpty(connectionString))
        {
            throw new InvalidOperationException(
                $"Set the {ConnectionStringVariable} environment variable to run these tests.");
        }

        return new ServiceBusClient(connectionString);
    }

    /// <summary>
    /// Polls the dead-letter queue until a message with <paramref name="messageId"/>
    /// appears or the timeout elapses. Returns null on timeout.
    /// </summary>
    private static async Task<ServiceBusReceivedMessage?> WaitForDeadLetterAsync(
        ServiceBusClient client, string messageId)
    {
        await using var dlqReceiver = client.CreateReceiver(QueueName,
            new ServiceBusReceiverOptions { SubQueue = SubQueue.DeadLetter });

        var deadline = DateTimeOffset.UtcNow + DeadLetterTimeout;

        while (DateTimeOffset.UtcNow < deadline)
        {
            // Peeking does not lock or remove messages; successive calls continue
            // after the last message peeked, so older DLQ entries are skipped over.
            var batch = await dlqReceiver.PeekMessagesAsync(10);

            foreach (var candidate in batch)
            {
                if (candidate.MessageId == messageId)
                {
                    return candidate;
                }
            }

            if (batch.Count == 0)
            {
                await Task.Delay(PollInterval);
            }
        }

        return null;
    }
}

Monitoring and Alerts

/// <summary>
/// Raises an alert when the dead-letter queue of <c>order-processing</c> grows
/// beyond a threshold.
/// </summary>
/// <remarks>
/// <c>SendAlertEmailAsync</c> is referenced here but is not part of this snippet.
/// </remarks>
public class DeadLetterMonitor
{
    private const string QueueName = "order-processing";
    private const long AlertThreshold = 100;

    /// <summary>
    /// Reads the queue's runtime counters and sends an alert when more than
    /// 100 messages are dead-lettered.
    /// </summary>
    public async Task CheckAndAlertAsync()
    {
        // Placeholder value - load the real connection string from configuration.
        var adminClient = new ServiceBusAdministrationClient("connection-string");

        // The dead-letter queue is a sub-queue and cannot be fetched as a queue of
        // its own. Its depth is reported on the parent queue's runtime properties.
        var runtimeProperties = await adminClient.GetQueueRuntimePropertiesAsync(QueueName);
        var deadLetterCount = runtimeProperties.Value.DeadLetterMessageCount;

        if (deadLetterCount > AlertThreshold)
        {
            // Send alert
            await SendAlertEmailAsync(deadLetterCount);
        }
    }
}

Best Practices Summary

| Practice | Why |
| --- | --- |
| Set MaxDeliveryCount = 5 | Enough retries, not too many |
| Enable dead letter on expiration | Preserve messages that expire |
| Validate before processing | Fail fast on invalid messages |
| Log DLQ messages | Investigate and fix root cause |
| Monitor DLQ depth | Alert when messages pile up |
| Implement retry with backoff | Don't hammer failing services |

Summary