Competing Consumers at Scale
Distributed Message Processing Patterns
Introduction
Competing Consumers is a pattern in which multiple independent consumers read from the same queue, and each message is handed to only one of them at a time. Compared to a single sequential consumer, this provides horizontal scalability, fault tolerance, and better resource utilization. When designing high-throughput integration systems on Azure, implementing competing consumers effectively is critical for handling millions of messages reliably.
This guide covers:
- Competing consumers pattern — Understanding the architecture
- Azure Service Bus implementation — Queue and subscription patterns
- Scaling strategies — Auto-scaling and capacity planning
- Fault tolerance — Handling failures and retries
- Message ordering — Managing ordering guarantees
- Performance optimization — Maximizing throughput
Understanding the Pattern
Basic Competing Consumers
┌─────────────────────────────────────────────────────────────────────┐
│                  COMPETING CONSUMERS ARCHITECTURE                   │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│                            MESSAGE QUEUE                            │
│          ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐           │
│          │ M1 │ │ M2 │ │ M3 │ │ M4 │ │ M5 │ │ M6 │ │ M7 │           │
│          └──┬─┘ └──┬─┘ └──┬─┘ └──┬─┘ └──┬─┘ └──┬─┘ └──┬─┘           │
│             │      │      │      │      │      │      │             │
│             └──────┴──────┴──────┴──────┴──────┴──────┘             │
│                          Load balanced to                           │
│                                  │                                  │
│                  ┌───────────────┼───────────────┐                  │
│                  ▼               ▼               ▼                  │
│             ┌─────────┐     ┌─────────┐     ┌─────────┐             │
│             │Consumer │     │Consumer │     │Consumer │             │
│             │    1    │     │    2    │     │    3    │             │
│             │         │     │         │     │         │             │
│             │ Process │     │ Process │     │ Process │             │
│             │ M1, M4  │     │ M2, M5  │     │ M3, M6  │             │
│             └─────────┘     └─────────┘     └─────────┘             │
│                                                                     │
│  Benefits:                                                          │
│  ✓ Horizontal scalability                                           │
│  ✓ Fault tolerance (consumer failure doesn't stop processing)       │
│  ✓ Resource efficiency (no idle consumers)                          │
│  ✓ Simple load balancing                                            │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
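The diagram above can be imitated in-process. The following is a minimal sketch, not Service Bus code: it assumes a `System.Threading.Channels` channel as a stand-in for the queue, and the names `CompetingConsumersDemo` and `RunAsync` are hypothetical. It shows several workers pulling from one shared channel so that each message is handled by exactly one of them.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class CompetingConsumersDemo
{
    // Runs `consumerCount` workers against one shared channel and returns,
    // for each message ID, the ID of the consumer that handled it.
    public static async Task<ConcurrentDictionary<int, int>> RunAsync(
        int messageCount, int consumerCount)
    {
        var queue = Channel.CreateUnbounded<int>();
        var handledBy = new ConcurrentDictionary<int, int>();

        // Each worker takes the next available message; the channel hands
        // every item to exactly one reader.
        var consumers = Enumerable.Range(1, consumerCount).Select(async consumerId =>
        {
            await foreach (var messageId in queue.Reader.ReadAllAsync())
            {
                await Task.Delay(5); // simulated work
                if (!handledBy.TryAdd(messageId, consumerId))
                    throw new InvalidOperationException(
                        $"Message {messageId} was handled twice");
            }
        }).ToArray();

        for (var id = 1; id <= messageCount; id++)
            await queue.Writer.WriteAsync(id);
        queue.Writer.Complete(); // no more messages, so the workers can exit

        await Task.WhenAll(consumers);
        return handledBy;
    }
}
```

Calling `await CompetingConsumersDemo.RunAsync(30, 3)` returns one entry per message. Which consumer handles which message depends on timing, as it does with a real queue. Unlike this in-memory channel, Service Bus in peek-lock mode delivers at least once, so a real handler may see the same message more than once.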
Scaling Behavior
┌─────────────────────────────────────────────────────────────────────┐
│                        CONSUMER SCALING FLOW                        │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  Stage 1: Low Traffic (1 Consumer)                                  │
│  ────────────────────────────────────────                           │
│  Messages: [M1][M2][M3][M4][M5]...                                  │
│  Consumer: [Processing M1]  Queued: M2, M3, M4...                   │
│  Utilization: Low                                                   │
│                                                                     │
│  Stage 2: Higher Traffic (Scale to 3 Consumers)                     │
│  ────────────────────────────────────────                           │
│  Messages: [M1][M2][M3][M4][M5]...                                  │
│  Consumer1: [M1]                                                    │
│  Consumer2: [M2]                                                    │
│  Consumer3: [M3]                                                    │
│  Utilization: Optimal                                               │
│                                                                     │
│  Stage 3: High Traffic (Scale to 10 Consumers)                      │
│  ────────────────────────────────────────                           │
│  Messages: [M1][M2]...[M10]                                         │
│  Consumer1-10: [Each processing one]                                │
│  Utilization: High                                                  │
│                                                                     │
│  Stage 4: Consumer Failure                                          │
│  ────────────────────────────────────────                           │
│  Consumer3 fails → M3's lock expires, message redelivered           │
│  Consumer1: [M1] [M4]                                               │
│  Consumer2: [M2] [M5]                                               │
│  Consumer4 (new): [M3]                                              │
│  ✓ No message loss (at-least-once delivery)                         │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
Azure Service Bus Implementation
Queue Configuration
# Create the queue that the competing consumers will share
az servicebus queue create \
  --name orders-queue \
  --namespace-name mynamespace \
  --resource-group my-rg \
  --lock-duration PT1M \
  --max-delivery-count 10 \
  --default-message-time-to-live P7D \
  --enable-dead-lettering-on-message-expiration true \
  --enable-partitioning false \
  --max-size 1024
Consumer Implementation
using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Threading.Tasks;
using Azure.Identity;
using Azure.Messaging.ServiceBus;
using Microsoft.Extensions.Logging;

public class CompetingConsumerService
{
    // Must match the queue's --max-delivery-count setting
    private const int MaxDeliveryCount = 10;

    private readonly ServiceBusClient _client;
    private readonly ServiceBusProcessorOptions _options;
    private readonly ILogger<CompetingConsumerService> _logger;
    private ServiceBusProcessor _processor;

    public CompetingConsumerService(ILogger<CompetingConsumerService> logger)
    {
        _logger = logger;

        _client = new ServiceBusClient(
            "mynamespace.servicebus.windows.net",
            new DefaultAzureCredential());

        _options = new ServiceBusProcessorOptions
        {
            MaxConcurrentCalls = 32,        // Process up to 32 messages concurrently
            PrefetchCount = 100,            // Prefetch messages for efficiency
            AutoCompleteMessages = false,   // Complete manually for reliability
            MaxAutoLockRenewalDuration = TimeSpan.FromMinutes(5),
            ReceiveMode = ServiceBusReceiveMode.PeekLock
        };
    }

    public async Task StartProcessingAsync()
    {
        // Keep the processor in a field so it can be stopped and disposed later
        _processor = _client.CreateProcessor("orders-queue", _options);
        _processor.ProcessMessageAsync += ProcessMessageHandler;
        _processor.ProcessErrorAsync += ProcessErrorHandler;

        await _processor.StartProcessingAsync();

        _logger.LogInformation("Consumer started with {MaxConcurrent} concurrent calls",
            _options.MaxConcurrentCalls);
    }

    private async Task ProcessMessageHandler(ProcessMessageEventArgs args)
    {
        var message = args.Message;
        var consumerId = Environment.MachineName;

        try
        {
            _logger.LogInformation(
                "Processing message {MessageId} on consumer {ConsumerId}",
                message.MessageId, consumerId);

            // Deserialize and process the message (Order is an application type, not shown)
            var order = JsonSerializer.Deserialize<Order>(message.Body.ToString())
                ?? throw new JsonException($"Message {message.MessageId} has an empty body");
            await ProcessOrderAsync(order);

            // Settle the message only after processing succeeded
            await args.CompleteMessageAsync(message);

            _logger.LogInformation(
                "Successfully processed message {MessageId}", message.MessageId);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error processing message {MessageId}", message.MessageId);

            // Check the delivery count and decide the next action
            if (message.DeliveryCount >= MaxDeliveryCount)
            {
                // Last allowed delivery: dead-letter explicitly to attach diagnostics
                await args.DeadLetterMessageAsync(message, new Dictionary<string, object>
                {
                    { "Error", ex.Message },
                    { "ConsumerId", consumerId }
                });
            }
            else
            {
                // Release the lock so any consumer can retry the message
                await args.AbandonMessageAsync(message);
            }
        }
    }

    private Task ProcessErrorHandler(ProcessErrorEventArgs args)
    {
        _logger.LogError(args.Exception, "Processor error: {ErrorSource}", args.ErrorSource);
        return Task.CompletedTask;
    }

    private async Task ProcessOrderAsync(Order order)
    {
        // Business logic here
        await Task.Delay(100); // Simulated work
    }
}
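The consumer above starts a processor but never stops it. The following is a minimal sketch of a graceful shutdown, assuming the caller holds the `ServiceBusProcessor` and `ServiceBusClient` instances; `ProcessorShutdown` and `StopGracefullyAsync` are hypothetical names, and the timeout is an illustrative choice.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

public static class ProcessorShutdown
{
    // Stops receiving, waits for in-flight handlers, then releases resources.
    public static async Task StopGracefullyAsync(
        ServiceBusProcessor processor,
        ServiceBusClient client,
        TimeSpan timeout)
    {
        using var cts = new CancellationTokenSource(timeout);
        try
        {
            // Stops pulling new messages and waits for handlers that are already running
            await processor.StopProcessingAsync(cts.Token);
        }
        catch (OperationCanceledException)
        {
            // Timed out: unsettled messages become available again when their locks expire
        }
        finally
        {
            await processor.DisposeAsync();
            await client.DisposeAsync();
        }
    }
}
```

A host would typically call this from its shutdown hook, for example `await ProcessorShutdown.StopGracefullyAsync(processor, client, TimeSpan.FromSeconds(30))`, so that the instance leaves the consumer pool without abandoning work mid-message.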
Auto-Scaling with Azure Functions
using System;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

public class ScalingFunction
{
    // A function can have only one trigger; this one runs on a timer every 5 minutes
    [FunctionName("QueueScaler")]
    public async Task Run(
        [TimerTrigger("0 */5 * * * *")] TimerInfo timer,
        ILogger log)
    {
        // Get queue metrics
        // (GetQueueMetricsAsync and ScaleFunctionAppAsync are application-specific helpers, not shown)
        var queueMetrics = await GetQueueMetricsAsync();
        var currentDepth = queueMetrics.ActiveMessageCount;
        var avgProcessingTime = queueMetrics.AverageProcessingTimeMs;

        // Calculate required instances
        var requiredInstances = CalculateRequiredInstances(currentDepth, avgProcessingTime);

        // Apply scaling
        await ScaleFunctionAppAsync(requiredInstances);

        log.LogInformation(
            "Queue depth: {Depth}, Avg processing: {Time}ms, Scaled to {Instances} instances",
            currentDepth, avgProcessingTime, requiredInstances);
    }

    // avgProcessingTimeMs is not used yet; the estimate relies on the fixed per-instance rate below
    private static int CalculateRequiredInstances(long queueDepth, double avgProcessingTimeMs)
    {
        const double targetSeconds = 300;               // Example: drain the queue within 5 minutes
        const double messagesPerInstancePerSecond = 100; // Each instance handles ~100 messages/second

        // Floating-point division: integer division would truncate the required rate
        var messagesPerSecondTarget = queueDepth / targetSeconds;
        var instances = (int)Math.Ceiling(messagesPerSecondTarget / messagesPerInstancePerSecond);

        // Bound min/max
        return Math.Clamp(instances, 1, 50);
    }
}
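To make the instance calculation concrete, here is a small standalone sketch of the same arithmetic. The names `CapacityPlanner`, `RequiredInstances`, and `PerInstanceThroughput` are hypothetical, and deriving per-instance throughput from concurrency and average handling time is an assumption that ignores downstream limits.

```csharp
using System;

public static class CapacityPlanner
{
    // Instances needed to drain `queueDepth` messages within `targetSeconds`,
    // given the throughput one instance sustains (messages per second).
    public static int RequiredInstances(
        long queueDepth, double targetSeconds, double perInstanceMsgPerSec,
        int minInstances = 1, int maxInstances = 50)
    {
        var requiredRate = queueDepth / targetSeconds; // msg/s needed overall
        var instances = (int)Math.Ceiling(requiredRate / perInstanceMsgPerSec);
        return Math.Clamp(instances, minInstances, maxInstances);
    }

    // Rough per-instance throughput: concurrent handlers divided by time per message.
    public static double PerInstanceThroughput(int maxConcurrentCalls, double avgProcessingMs) =>
        maxConcurrentCalls * 1000.0 / avgProcessingMs;
}
```

With a backlog of 90,000 messages, a 300-second target, and 100 messages per second per instance, the required rate is 300 msg/s and `RequiredInstances(90_000, 300, 100)` returns 3. A backlog of 3,000,000 would need 100 instances and is clamped to the maximum of 50, which means the 5-minute target cannot be met at that depth.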
Handling Message Ordering
When Ordering Matters
┌─────────────────────────────────────────────────────────────────────┐
│                       ORDERING CONSIDERATIONS                       │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ORDERING REQUIRED (Process Sequentially)                           │
│  ────────────────────────────────────────                           │
│  ✓ Financial transactions (debit then credit)                       │
│  ✓ Inventory reservations (reserve then confirm)                    │
│  ✓ State machines (created → approved → shipped)                    │
│                                                                     │
│  Solution: Use sessions keyed by entity (e.g. order ID)             │
│                                                                     │
│  ────────────────────────────────────────                           │
│                                                                     │
│  ORDERING NOT REQUIRED (Any Order OK)                               │
│  ────────────────────────────────────────                           │
│  ✓ Notification sending                                             │
│  ✓ Analytics events                                                 │
│  ✓ Audit logging                                                    │
│  ✓ Image processing                                                 │
│                                                                     │
│  Solution: Full competing consumers (no ordering needed)            │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
Session-Based Ordering
// Send messages with a session ID so that each order's events are processed in order.
// The queue must be created with sessions enabled (--enable-session true) and read by a
// session-aware processor; on a queue without sessions, SessionId does not enforce ordering.
public async Task SendOrderedMessagesAsync(Order order)
{
    await using var sender = _client.CreateSender("orders-queue");

    // All messages for the same order use the same session
    var events = new[]
    {
        CreateMessage("OrderCreated", order, order.OrderId),
        CreateMessage("PaymentProcessed", order, order.OrderId),
        CreateMessage("InventoryReserved", order, order.OrderId),
        CreateMessage("ShipmentCreated", order, order.OrderId)
    };

    await sender.SendMessagesAsync(events);
}

private ServiceBusMessage CreateMessage(string eventType, Order order, string sessionId)
{
    return new ServiceBusMessage(JsonSerializer.Serialize(new
    {
        OrderId = order.OrderId,
        EventType = eventType,
        Data = order
    }))
    {
        SessionId = sessionId, // Groups the message with the other events of the same order
        Subject = eventType
    };
}
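Sending with a `SessionId` is only half of the picture: the receiving side has to read by session as well. The following is a minimal sketch of a session-aware consumer, assuming `orders-queue` was created with sessions enabled. `SessionConsumer` and `StartAsync` are hypothetical names, and the concurrency and timeout values are illustrative.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

public static class SessionConsumer
{
    public static async Task<ServiceBusSessionProcessor> StartAsync(ServiceBusClient client)
    {
        var options = new ServiceBusSessionProcessorOptions
        {
            MaxConcurrentSessions = 8,         // sessions handled in parallel by this instance
            MaxConcurrentCallsPerSession = 1,  // one message at a time per session keeps order
            AutoCompleteMessages = false,
            SessionIdleTimeout = TimeSpan.FromSeconds(30)
        };

        var processor = client.CreateSessionProcessor("orders-queue", options);

        processor.ProcessMessageAsync += async args =>
        {
            // args.SessionId carries the order ID set by the sender
            Console.WriteLine($"Session {args.SessionId}: {args.Message.Subject}");
            await args.CompleteMessageAsync(args.Message);
        };
        processor.ProcessErrorAsync += args =>
        {
            Console.Error.WriteLine($"Session processor error: {args.Exception.Message}");
            return Task.CompletedTask;
        };

        await processor.StartProcessingAsync();
        return processor;
    }
}
```

Consumers still compete, but for whole sessions rather than single messages: each session is locked by one consumer at a time, so events for one order stay in sequence while different orders are processed in parallel.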
Performance Optimization
Optimal Configuration
public class OptimizedProcessor
{
    private ServiceBusProcessorOptions GetOptimizedOptions(int expectedRPS)
    {
        // Roughly one concurrent call per 10 msg/s, kept within 1..100
        // (MaxConcurrentCalls must be at least 1)
        var maxConcurrentCalls = Math.Clamp(expectedRPS / 10, 1, 100);

        // Rule of thumb: Prefetch = 4x max concurrent calls
        var prefetchCount = maxConcurrentCalls * 4;

        return new ServiceBusProcessorOptions
        {
            MaxConcurrentCalls = maxConcurrentCalls,
            PrefetchCount = prefetchCount,
            AutoCompleteMessages = false, // Manual for reliability
            MaxAutoLockRenewalDuration = TimeSpan.FromMinutes(5),
            ReceiveMode = ServiceBusReceiveMode.PeekLock
        };
    }
}
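A prefetch rule of thumb is worth checking against the lock duration, because a prefetched message is already locked while it waits in the local buffer. The sketch below estimates an upper bound under simplifying assumptions: steady processing time, all handler slots busy, and a safety factor chosen arbitrarily. `PrefetchSizing` and `MaxSafePrefetch` are hypothetical names.

```csharp
using System;

public static class PrefetchSizing
{
    // Largest prefetch count for which buffered messages should still be
    // processed before their locks expire.
    public static int MaxSafePrefetch(
        TimeSpan lockDuration, TimeSpan avgProcessingTime, int maxConcurrentCalls,
        double safetyFactor = 0.5)
    {
        // Messages one handler slot can finish within a single lock period
        var perSlot = lockDuration.TotalMilliseconds / avgProcessingTime.TotalMilliseconds;
        var bound = perSlot * maxConcurrentCalls * safetyFactor;
        return (int)Math.Max(0, Math.Floor(bound));
    }
}
```

With a 1-minute lock, 32 concurrent calls, and 100 ms per message, the bound is 9,600, so a prefetch of 128 is comfortably safe. If processing takes 20 seconds per message, the bound drops to 48, and a prefetch of 128 would let locks expire in the buffer and cause redeliveries.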
Throughput Monitoring
{
  "monitoring": {
    "metrics": [
      {
        "name": "MessageProcessingLatency",
        "description": "Time from receive to complete",
        "alertThreshold": "p95 > 30s"
      },
      {
        "name": "QueueDepth",
        "description": "Messages waiting to be processed",
        "alertThreshold": "> 10000"
      },
      {
        "name": "DeadLetterCount",
        "description": "Messages sent to DLQ",
        "alertThreshold": "> 100 per hour"
      },
      {
        "name": "ConsumerUtilization",
        "description": "Percentage of consumer capacity used",
        "alertThreshold": "> 80%"
      }
    ]
  }
}
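Two of the metrics above, queue depth and dead-letter count, can be read directly from the queue's runtime properties. The following is a minimal sketch, assuming the caller's identity has management rights on the namespace; `QueueDepthProbe` and `ReadAsync` are hypothetical names.

```csharp
using System.Threading.Tasks;
using Azure.Identity;
using Azure.Messaging.ServiceBus.Administration;

public static class QueueDepthProbe
{
    // Returns the number of active and dead-lettered messages in the queue.
    public static async Task<(long Active, long DeadLettered)> ReadAsync(
        string fullyQualifiedNamespace, string queueName)
    {
        var admin = new ServiceBusAdministrationClient(
            fullyQualifiedNamespace, new DefaultAzureCredential());

        // Response<QueueRuntimeProperties> converts implicitly to its value
        QueueRuntimeProperties props =
            await admin.GetQueueRuntimePropertiesAsync(queueName);

        return (props.ActiveMessageCount, props.DeadLetterMessageCount);
    }
}
```

For example, `await QueueDepthProbe.ReadAsync("mynamespace.servicebus.windows.net", "orders-queue")` yields the two counts, which can then be compared with the alert thresholds. Processing latency and consumer utilization are not reported by the queue and would have to be measured inside the consumers.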
Best Practices
Implementation Checklist
| Practice | Description |
|---|---|
| Manual completion | Disable auto-complete and complete a message only after processing succeeds |
| Appropriate prefetch | Size the prefetch buffer to balance latency against memory use |
| Graceful shutdown | Finish in-flight messages before stopping |
| Dead letter handling | Monitor the dead-letter queue (DLQ) and process it regularly |
| Idempotent processing | Handle duplicate messages safely |
| Circuit breaker | Pause processing while a downstream dependency is failing |
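
Idempotent processing from the checklist can be sketched as a guard keyed by message ID. This is a minimal in-memory illustration; `IdempotentHandler` and `HandleOnceAsync` are hypothetical names, and a real system would use a durable store shared by all consumers, such as a database table with a unique key.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public sealed class IdempotentHandler
{
    // In-memory record of handled message IDs (stand-in for a durable store)
    private readonly ConcurrentDictionary<string, bool> _processed = new();

    // Returns true if the work ran, false if the message ID was already handled.
    public async Task<bool> HandleOnceAsync(string messageId, Func<Task> work)
    {
        if (!_processed.TryAdd(messageId, true))
            return false; // duplicate delivery: skip the side effects

        try
        {
            await work();
            return true;
        }
        catch
        {
            // Forget the ID so that a redelivery can retry the work
            _processed.TryRemove(messageId, out _);
            throw;
        }
    }
}
```

A consumer would wrap its business logic, for example `await handler.HandleOnceAsync(message.MessageId, () => ProcessOrderAsync(order))`, and complete the message whether the call returns true or false.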
Error Handling Pattern
public class ReliableMessageHandler
{
    // _logger, ProcessMessageAsync, CompleteMessageAsync, DeadLetterAsync and
    // TransientException are application-specific members (not shown)
    private async Task HandleWithRetryAsync(ServiceBusReceivedMessage message)
    {
        const int maxAttempts = 3;

        for (var attempt = 1; ; attempt++)
        {
            try
            {
                await ProcessMessageAsync(message);
                await CompleteMessageAsync(message);
                return;
            }
            catch (TransientException ex) when (attempt < maxAttempts)
            {
                // Exponential backoff (2s, 4s); keep the total below the lock duration
                var delay = TimeSpan.FromSeconds(Math.Pow(2, attempt));
                _logger.LogWarning(
                    "Retry {RetryCount} for message {MessageId} in {Delay}: {Error}",
                    attempt, message.MessageId, delay, ex.Message);
                await Task.Delay(delay);
            }
            catch (Exception ex)
            {
                // Non-retryable error, or a transient failure on the final attempt
                await DeadLetterAsync(message, ex);
                throw;
            }
        }
    }
}
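The exponential delay used in the handler above can be factored into a small helper that also caps the wait and optionally adds jitter, so that many consumers retrying at once do not hit the downstream system in lockstep. This is a sketch under assumed names (`Backoff`, `Delay`); the base delay, cap, and full-jitter strategy are illustrative choices.

```csharp
using System;

public static class Backoff
{
    // Delay before retry number `attempt` (1-based): baseDelay * 2^(attempt-1),
    // capped at maxDelay. With a Random supplied, the delay is drawn uniformly
    // from [0, capped] ("full jitter").
    public static TimeSpan Delay(
        int attempt, TimeSpan baseDelay, TimeSpan maxDelay, Random? jitter = null)
    {
        var exponential = baseDelay.TotalMilliseconds * Math.Pow(2, attempt - 1);
        var capped = Math.Min(exponential, maxDelay.TotalMilliseconds);
        var ms = jitter is null ? capped : jitter.NextDouble() * capped;
        return TimeSpan.FromMilliseconds(ms);
    }
}
```

With a 2-second base and a 30-second cap, attempts 1 to 3 wait 2, 4, and 8 seconds, and any attempt from the fifth onward waits 30 seconds. Whatever schedule is chosen, the sum of the delays has to stay below the message lock duration, or the lock expires while the handler is still waiting.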
Related Topics
- Saga Pattern — Distributed transactions
- Outbox Pattern — Reliable publishing
- Event Sourcing — Event-driven state
Azure Integration Hub - Architect Level Enterprise Integration Patterns