Competing Consumers at Scale

Distributed Message Processing Patterns


Introduction

Competing Consumers is a pattern where multiple independent message consumers process messages from the same queue. This provides horizontal scalability, fault tolerance, and better resource utilization compared to sequential processing. When designing high-throughput integration systems on Azure, understanding how to implement competing consumers effectively is critical for handling millions of messages reliably.

This guide covers:

  • Competing consumers pattern — Understanding the architecture
  • Azure Service Bus implementation — Queue and subscription patterns
  • Scaling strategies — Auto-scaling and capacity planning
  • Fault tolerance — Handling failures and retries
  • Message ordering — Managing ordering guarantees
  • Performance optimization — Maximizing throughput

Understanding the Pattern

Basic Competing Consumers

┌─────────────────────────────────────────────────────────────────────┐
│                   COMPETING CONSUMERS ARCHITECTURE                  │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│                         MESSAGE QUEUE                               │
│   ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐                  │
│   │ M1 │ │ M2 │ │ M3 │ │ M4 │ │ M5 │ │ M6 │ │ M7 │                  │
│   └──┬─┘ └──┬─┘ └──┬─┘ └──┬─┘ └──┬─┘ └──┬─┘ └──┬─┘                  │
│      │      │      │      │      │      │      │                    │
│      └──────┴──────┴──────┴──────┴──────┴──────┘                    │
│                    Load balanced to                                 │
│                         │                                           │
│        ┌───────────────┼───────────────┐                            │
│        ▼               ▼               ▼                            │
│   ┌─────────┐     ┌─────────┐     ┌─────────┐                       │
│   │Consumer │     │Consumer │     │Consumer │                       │
│   │    1    │     │    2    │     │    3    │                       │
│   │         │     │         │     │         │                       │
│   │ Process │     │ Process │     │ Process │                       │
│   │ M1, M4  │     │ M2, M5  │     │ M3, M6  │                       │
│   └─────────┘     └─────────┘     └─────────┘                       │
│                                                                     │
│   Benefits:                                                         │
│   ✓ Horizontal scalability                                          │
│   ✓ Fault tolerance (consumer failure doesn't stop processing)      │
│   ✓ Resource efficiency (no idle consumers)                         │
│   ✓ Simple load balancing                                           │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
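
The architecture above can be sketched in-process, without a broker. This is a minimal illustration, assuming an in-memory `System.Threading.Channels` channel stands in for the queue; `CompetingConsumersDemo` and `RunAsync` are hypothetical names, not part of any Azure SDK. It shows the core property of the pattern: several consumers read from one queue, and each message is handled by exactly one of them.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class CompetingConsumersDemo
{
    // Runs `consumerCount` consumers against one shared in-memory queue and
    // returns a map of message ID -> ID of the consumer that handled it.
    public static async Task<ConcurrentDictionary<int, int>> RunAsync(
        int messageCount, int consumerCount)
    {
        var queue = Channel.CreateUnbounded<int>();
        var handledBy = new ConcurrentDictionary<int, int>();

        // Producer: enqueue all messages, then signal that no more will arrive.
        for (var id = 1; id <= messageCount; id++)
        {
            await queue.Writer.WriteAsync(id);
        }
        queue.Writer.Complete();

        // Consumers: each one pulls the next available message from the same queue.
        var consumers = Enumerable.Range(1, consumerCount).Select(consumerId =>
            Task.Run(async () =>
            {
                await foreach (var messageId in queue.Reader.ReadAllAsync())
                {
                    await Task.Delay(10); // simulated work

                    // TryAdd returns false if another consumer already handled this message.
                    if (!handledBy.TryAdd(messageId, consumerId))
                    {
                        throw new InvalidOperationException(
                            $"Message {messageId} was delivered twice");
                    }
                }
            }));

        await Task.WhenAll(consumers);
        return handledBy;
    }
}
```

Calling `await CompetingConsumersDemo.RunAsync(20, 3)` returns 20 entries. Which consumer handled which message varies between runs, because the assignment depends on which consumer is free when a message becomes available.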

Scaling Behavior

┌─────────────────────────────────────────────────────────────────────┐
│                      CONSUMER SCALING FLOW                          │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   Stage 1: Low Traffic (1 Consumer)                                 │
│   ─────────────────────────────────────                             │
│   Messages: [M1][M2][M3][M4][M5]...                                 │
│   Consumer:  [Processing M1]      Waiting: M2,M3,M4...              │
│   Utilization: Low                                                  │
│                                                                     │
│   Stage 2: Higher Traffic (Scale to 3 Consumers)                    │
│   ───────────────────────────────────────────                       │
│   Messages: [M1][M2][M3][M4][M5]...                                 │
│   Consumer1: [M1]                                                   │
│   Consumer2: [M2]                                                   │
│   Consumer3: [M3]                                                   │
│   Utilization: Optimal                                              │
│                                                                     │
│   Stage 3: High Traffic (Scale to 10 Consumers)                     │
│   ─────────────────────────────────────────                         │
│   Messages: [M1][M2]...[M10]                                        │
│   Consumer1-10: [Each processing one]                               │
│   Utilization: High                                                 │
│                                                                     │
│   Stage 4: Consumer Failure                                         │
│   ─────────────────────────────                                     │
│   Consumer3 fails → Messages redistributed                          │
│   Consumer1: [M1] [M4]                                              │
│   Consumer2: [M2] [M5]                                              │
│   Consumer4 (new): [M3]                                             │
│   ✓ No message loss                                                 │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

Azure Service Bus Implementation

Queue Configuration

# Create queue with competing consumers configuration
az servicebus queue create \
  --name orders-queue \
  --namespace-name mynamespace \
  --resource-group my-rg \
  --lock-duration PT1M \
  --max-delivery-count 10 \
  --default-message-time-to-live P7D \
  --enable-dead-lettering-on-message-expiration true \
  --enable-partitioning false \
  --max-size-in-megabytes 1024

Consumer Implementation

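using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Threading.Tasks;
using Azure.Identity;
using Azure.Messaging.ServiceBus;
using Microsoft.Extensions.Logging;

public class CompetingConsumerService
{
    private readonly ServiceBusClient _client;
    private readonly ServiceBusProcessorOptions _options;
    private readonly ILogger<CompetingConsumerService> _logger;

    // The logger is supplied through dependency injection
    public CompetingConsumerService(ILogger<CompetingConsumerService> logger)
    {
        _logger = logger;

        _client = new ServiceBusClient(
            "mynamespace.servicebus.windows.net",
            new DefaultAzureCredential());

        _options = new ServiceBusProcessorOptions
        {
            MaxConcurrentCalls = 32,           // Process up to 32 messages concurrently
            PrefetchCount = 100,               // Prefetch messages for efficiency
            AutoCompleteMessages = false,      // Settle messages manually for reliability
            MaxAutoLockRenewalDuration = TimeSpan.FromMinutes(5),
            ReceiveMode = ServiceBusReceiveMode.PeekLock
        };
    }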

    public async Task StartProcessingAsync()
    {
        var processor = _client.CreateProcessor("orders-queue", _options);

        processor.ProcessMessageAsync += ProcessMessageHandler;
        processor.ProcessErrorAsync += ProcessErrorHandler;

        await processor.StartProcessingAsync();
        
        _logger.LogInformation("Consumer started with {MaxConcurrent} concurrent calls", 
            _options.MaxConcurrentCalls);
    }

    private async Task ProcessMessageHandler(ProcessMessageEventArgs args)
    {
        var message = args.Message;
        var consumerId = Environment.MachineName;

        try
        {
            _logger.LogInformation(
                "Processing message {MessageId} on consumer {ConsumerId}",
                message.MessageId, consumerId);

            // Process the message
            var order = JsonSerializer.Deserialize<Order>(message.Body.ToString());
            await ProcessOrderAsync(order);

            // Complete the message
            await args.CompleteMessageAsync(message);

            _logger.LogInformation(
                "Successfully processed message {MessageId}", message.MessageId);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error processing message {MessageId}", message.MessageId);

            // Check delivery count and decide next action
            if (message.DeliveryCount >= 10)
            {
                // Max retries exceeded - dead letter
                await args.DeadLetterMessageAsync(message, new Dictionary<string, object>
                {
                    { "Error", ex.Message },
                    { "ConsumerId", consumerId }
                });
            }
            else
            {
                // Abandon for retry
                await args.AbandonMessageAsync(message);
            }
        }
    }

    private Task ProcessErrorHandler(ProcessErrorEventArgs args)
    {
        _logger.LogError(args.Exception, "Processor error: {ErrorSource}", args.ErrorSource);
        return Task.CompletedTask;
    }

    private async Task ProcessOrderAsync(Order order)
    {
        // Business logic here
        await Task.Delay(100); // Simulated work
    }
}

Auto-Scaling with Azure Functions

using System;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

public class ScalingFunction
{
    // A function can have only one trigger; this one runs on a timer every 5 minutes.
    [FunctionName("QueueScaler")]
    public async Task Run(
        [TimerTrigger("0 */5 * * * *")] TimerInfo timer,
        ILogger log)
    {
        // Get queue metrics (GetQueueMetricsAsync is an application-specific helper, not shown)
        var queueMetrics = await GetQueueMetricsAsync();

        var currentDepth = queueMetrics.ActiveMessageCount;
        var avgProcessingTime = queueMetrics.AverageProcessingTimeMs;

        // Calculate required instances
        var requiredInstances = CalculateRequiredInstances(currentDepth, avgProcessingTime);

        // Apply scaling (ScaleFunctionAppAsync is an application-specific helper, not shown)
        await ScaleFunctionAppAsync(requiredInstances);

        log.LogInformation(
            "Queue depth: {Depth}, Avg processing: {Time}ms, Scaled to {Instances} instances",
            currentDepth, avgProcessingTime, requiredInstances);
    }

    private int CalculateRequiredInstances(long queueDepth, double avgProcessingTime)
    {
        // Example: Want to process queue in 5 minutes
        var targetTime = 300.0; // seconds; a double, so the division below does not truncate
        var messagesPerSecondTarget = queueDepth / targetTime;

        // Assumes each instance handles ~100 messages/second;
        // avgProcessingTime could be used to refine this estimate
        var instances = (int)Math.Ceiling(messagesPerSecondTarget / 100.0);

        // Bound min/max
        return Math.Clamp(instances, 1, 50);
    }
}
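
The instance calculation can be checked with a few worked numbers. The following is a standalone sketch of the same arithmetic; `CapacityPlanner` and its parameter names are hypothetical, and the 100 messages/second per instance figure is the assumption stated in the code above, not a measured value.

```csharp
using System;

public static class CapacityPlanner
{
    // Instances needed to drain `queueDepth` messages within `targetSeconds`,
    // assuming each instance sustains `perInstanceRate` messages per second.
    public static int RequiredInstances(
        long queueDepth, double targetSeconds, double perInstanceRate,
        int min = 1, int max = 50)
    {
        // Floating-point division, so a fractional rate still rounds up below.
        var requiredRate = queueDepth / targetSeconds; // messages per second
        var instances = (int)Math.Ceiling(requiredRate / perInstanceRate);
        return Math.Clamp(instances, min, max);
    }
}
```

With a 300-second target and 100 messages/second per instance: a depth of 120,000 needs 400 messages/second, so 4 instances; a depth of 30,299 needs just under 101 messages/second, which rounds up to 2 instances; a depth of 3,000,000 would need 100 instances and is capped at 50.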

Handling Message Ordering

When Ordering Matters

┌─────────────────────────────────────────────────────────────────────┐
│                    ORDERING CONSIDERATIONS                          │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   IMPORTANT ORDERING (Process Sequentially)                         │
│   ─────────────────────────────────────────                         │
│   ✓ Financial transactions (debit then credit)                      │
│   ✓ Inventory reservations (reserve then confirm)                   │
│   ✓ State machines (created → approved → shipped)                   │
│                                                                     │
│   Solution: Use sessions or partition keys                          │
│                                                                     │
│   ─────────────────────────────────────────                         │
│                                                                     │
│   UNIMPORTANT ORDERING (Any Order OK)                               │
│   ───────────────────────────────────────                           │
│   ✓ Notification sending                                            │
│   ✓ Analytics events                                                │
│   ✓ Audit logging                                                   │
│   ✓ Image processing                                                │
│                                                                     │
│   Solution: Full competing consumers (no ordering needed)           │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

Session-Based Ordering

// Send messages with session ID for ordered processing
public async Task SendOrderedMessagesAsync(Order order)
{
    var sender = _client.CreateSender("orders-queue");
    
    // All messages for same order use same session
    var events = new[]
    {
        CreateMessage("OrderCreated", order, order.OrderId),
        CreateMessage("PaymentProcessed", order, order.OrderId),
        CreateMessage("InventoryReserved", order, order.OrderId),
        CreateMessage("ShipmentCreated", order, order.OrderId)
    };
    
    await sender.SendMessagesAsync(events);
}

private ServiceBusMessage CreateMessage(string eventType, Order order, string sessionId)
{
    return new ServiceBusMessage(JsonSerializer.Serialize(new 
    {
        OrderId = order.OrderId,
        EventType = eventType,
        Data = order
    }))
    {
        SessionId = sessionId,  // Groups messages into one session; the queue must be session-enabled
        Subject = eventType
    };
}
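
On the receiving side, session-based ordering means that different sessions are processed in parallel while the messages inside one session are handled one at a time, in order. The sketch below models that behavior in memory so it can run without a broker; `SessionMessage` and `SessionOrderingDemo` are hypothetical names, and the `Sequence` field stands in for the order in which the broker received the messages.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public record SessionMessage(string SessionId, int Sequence, string EventType);

public static class SessionOrderingDemo
{
    // Processes sessions in parallel, but messages within one session strictly in order.
    public static async Task<IReadOnlyDictionary<string, List<string>>> ProcessAsync(
        IEnumerable<SessionMessage> messages)
    {
        var processed = new ConcurrentDictionary<string, List<string>>();

        var sessionTasks = messages
            .GroupBy(m => m.SessionId)
            .Select(session => Task.Run(async () =>
            {
                // Only this task touches this session's list, so no lock is needed.
                var log = processed.GetOrAdd(session.Key, _ => new List<string>());

                foreach (var message in session.OrderBy(m => m.Sequence))
                {
                    await Task.Delay(5); // simulated work
                    log.Add(message.EventType);
                }
            }));

        await Task.WhenAll(sessionTasks);
        return processed;
    }
}
```

If the events for two orders arrive interleaved, each order's list in the result still reads OrderCreated, PaymentProcessed, and so on in send order, while the two orders make progress concurrently. The trade-off is that throughput for a single session is limited to one consumer.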

Performance Optimization

Optimal Configuration

public class OptimizedProcessor
{
    private ServiceBusProcessorOptions GetOptimizedOptions(int expectedRPS)
    {
        // Rule of thumb: Prefetch = 4x max concurrent calls
        // Clamp to at least 1: MaxConcurrentCalls must be positive, even for expectedRPS < 10
        var maxConcurrentCalls = Math.Clamp(expectedRPS / 10, 1, 100);
        var prefetchCount = maxConcurrentCalls * 4;

        return new ServiceBusProcessorOptions
        {
            MaxConcurrentCalls = maxConcurrentCalls,
            PrefetchCount = prefetchCount,
            AutoCompleteMessages = false,  // Manual for reliability
            MaxAutoLockRenewalDuration = TimeSpan.FromMinutes(5),
            ReceiveMode = ServiceBusReceiveMode.PeekLock
        };
    }
}
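
The `expectedRPS / 10` term can be read as an application of Little's law: the number of messages in flight equals the arrival rate multiplied by the time each message takes to process. Dividing by 10 corresponds to an assumed processing time of about 100 ms per message. The sketch below makes that assumption an explicit parameter; `ConcurrencySizing` is a hypothetical helper, not part of the Service Bus SDK.

```csharp
using System;

public static class ConcurrencySizing
{
    // Little's law: in-flight messages = arrival rate x average processing time.
    public static int RequiredConcurrency(double messagesPerSecond, TimeSpan avgProcessingTime)
    {
        var inFlight = messagesPerSecond * avgProcessingTime.TotalMilliseconds / 1000.0;

        // At least one concurrent call is always needed.
        return Math.Max(1, (int)Math.Ceiling(inFlight));
    }
}
```

At 320 messages/second and 100 ms per message this gives 32 concurrent calls. At 1,000 messages/second and 250 ms per message it gives 250, which exceeds the cap of 100 used above and suggests adding consumer instances rather than raising per-instance concurrency.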

Throughput Monitoring

{
  "monitoring": {
    "metrics": [
      {
        "name": "MessageProcessingLatency",
        "description": "Time from receive to complete",
        "alertThreshold": "p95 > 30s"
      },
      {
        "name": "QueueDepth",
        "description": "Messages waiting to be processed",
        "alertThreshold": "> 10000"
      },
      {
        "name": "DeadLetterCount",
        "description": "Messages sent to DLQ",
        "alertThreshold": "> 100 per hour"
      },
      {
        "name": "ConsumerUtilization",
        "description": "Percentage of consumer capacity used",
        "alertThreshold": "> 80%"
      }
    ]
  }
}

Best Practices

Implementation Checklist

Practice                Description
Manual completion       Don't use auto-complete for reliability
Appropriate prefetch    Balance latency vs. memory
Graceful shutdown       Complete current message before stopping
Dead letter handling    Monitor and process DLQ regularly
Idempotent processing   Handle duplicate messages safely
Circuit breaker         Stop processing when downstream fails
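
Idempotent processing matters because peek-lock delivery is at-least-once: a message whose lock expires or that is abandoned will be delivered again. One way to handle this is to record the IDs of messages already handled and skip repeats. The sketch below is a minimal in-memory version, assuming the message ID uniquely identifies the work; `IdempotentHandler` is a hypothetical class, and a real deployment with consumers on several machines would need a shared store instead of a local dictionary.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public class IdempotentHandler
{
    // In-memory record of handled message IDs (an assumption for illustration;
    // it is neither shared across machines nor persisted across restarts).
    private readonly ConcurrentDictionary<string, bool> _processed = new();

    // Runs `handler` at most once per message ID; returns false for a duplicate.
    public async Task<bool> HandleOnceAsync(string messageId, Func<Task> handler)
    {
        // TryAdd is atomic, so only one concurrent caller wins for a given ID.
        if (!_processed.TryAdd(messageId, true))
        {
            return false;
        }

        try
        {
            await handler();
            return true;
        }
        catch
        {
            // Forget the ID so that a redelivery can retry the work.
            _processed.TryRemove(messageId, out _);
            throw;
        }
    }
}
```

Calling `HandleOnceAsync` twice with the same ID runs the handler once and returns `false` the second time. A failed attempt is not recorded, so the retry on redelivery still does the work.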

Error Handling Pattern

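public class ReliableMessageHandler
{
    // ProcessMessageAsync, CompleteMessageAsync, DeadLetterAsync, TransientException
    // and _logger are application-specific members, not shown here.
    private async Task HandleWithRetryAsync(ServiceBusReceivedMessage message)
    {
        const int maxRetries = 3;   // Total attempts, including the first
        var retryCount = 0;

        while (true)
        {
            try
            {
                await ProcessMessageAsync(message);
                await CompleteMessageAsync(message);
                return;
            }
            // Retry only while attempts remain; the final transient failure
            // falls through to the general handler and is dead-lettered.
            catch (TransientException ex) when (retryCount < maxRetries - 1)
            {
                retryCount++;
                var delay = TimeSpan.FromSeconds(Math.Pow(2, retryCount));

                _logger.LogWarning(
                    "Retry {RetryCount} for message {MessageId} after {Delay}: {Error}",
                    retryCount, message.MessageId, delay, ex.Message);

                // Keep the total wait well below the message lock duration
                await Task.Delay(delay);
            }
            catch (Exception ex)
            {
                // Non-retryable or max retries reached
                await DeadLetterAsync(message, ex);
                throw;
            }
        }
    }
}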
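
Because the handler waits between retries while still holding the message lock, the backoff schedule should stay well within the lock duration (PT1M in the queue configuration above). The sketch below computes a capped exponential delay; `Backoff.DelayFor` is a hypothetical helper, and the base and cap values in the examples are illustrative assumptions.

```csharp
using System;

public static class Backoff
{
    // Delay before retry number `attempt` (1-based): base, 2x base, 4x base, ...
    // never exceeding `maxDelay`.
    public static TimeSpan DelayFor(int attempt, TimeSpan baseDelay, TimeSpan maxDelay)
    {
        var factor = Math.Pow(2, attempt - 1);
        var delayMs = Math.Min(
            baseDelay.TotalMilliseconds * factor,
            maxDelay.TotalMilliseconds);
        return TimeSpan.FromMilliseconds(delayMs);
    }
}
```

With a 2-second base and a 30-second cap, the first two retries wait 2 and 4 seconds, matching the schedule in the handler above, and the fifth would wait 30 seconds instead of 32.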

Azure Integration Hub - Architect Level Enterprise Integration Patterns