Azure Service Bus — Duplicate Detection

Idempotent Message Processing, Detection Window


Introduction

Duplicate message detection is a critical feature for building reliable messaging systems. In distributed systems, network issues, retries, and failures can cause the same message to be delivered multiple times. Without proper handling, this can lead to:

  • Duplicate processing (e.g., charging a customer twice)
  • Data inconsistencies
  • System overload

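Azure Service Bus provides built-in duplicate detection: within a configurable time window, it discards any message whose MessageId matches one it has already accepted. The feature is available in the Standard and Premium tiers and must be enabled when the queue or topic is created.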


How Duplicate Detection Works

Basic Flow

┌─────────┐    ┌─────────────┐    ┌──────────────────┐    ┌─────────────┐
│ Producer│───▶│ Service Bus │───▶│ Detection Check  │───▶│   Consumer  │
└─────────┘    │   Queue     │    │ (MessageId)      │    └─────────────┘
               └─────────────┘    └──────────────────┘
                     │                   │
                     │ Duplicate?        │
                     ▼                   ▼
               ┌─────────────┐    ┌─────────────┐
               │  Store ID   │    │  Discard    │
               │  (Window)   │    │  (Duplicate)│
               └─────────────┘    └─────────────┘

Key Concepts

Concept            Description
MessageId          Unique identifier for duplicate detection
Detection Window   Time period to track message IDs
Duplicate          Message with same ID within window
Automatic          Handled by Service Bus automatically
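
To make the window behavior concrete, here is a minimal in-memory model of the check. It is a sketch under assumptions, not Service Bus code: the class name `DetectionWindowModel` and its `TryAccept` method are hypothetical, and the model assumes the window is measured from the time the first copy was accepted.

```csharp
using System;
using System.Collections.Generic;

// Illustrative model of the broker-side check; NOT part of any Azure SDK.
public sealed class DetectionWindowModel
{
    private readonly TimeSpan _window;

    // MessageId -> time the first copy was accepted.
    private readonly Dictionary<string, DateTimeOffset> _seen = new();

    public DetectionWindowModel(TimeSpan window) => _window = window;

    // Returns true if the message would be enqueued,
    // false if it would be dropped as a duplicate.
    public bool TryAccept(string messageId, DateTimeOffset now)
    {
        // Forget IDs whose detection window has elapsed.
        var expired = new List<string>();
        foreach (var entry in _seen)
        {
            if (now - entry.Value >= _window)
            {
                expired.Add(entry.Key);
            }
        }
        foreach (var id in expired)
        {
            _seen.Remove(id);
        }

        // TryAdd fails when the ID is still inside the window: a duplicate.
        return _seen.TryAdd(messageId, now);
    }
}
```

With a 10-minute window, `TryAccept("order-42", t0)` returns true, the same ID at t0 + 3 minutes returns false, and the same ID at t0 + 11 minutes returns true again because the original entry has aged out.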

Enabling Duplicate Detection

Create Queue with Duplicate Detection

// Using the .NET SDK (Azure.Messaging.ServiceBus.Administration)
var adminClient = new ServiceBusAdministrationClient(
    "Endpoint=sb://my-namespace.servicebus.windows.net/;" +
    "SharedAccessKeyName=RootManageSharedAccessKey;" +
    "SharedAccessKey=shared-access-key");

var queueOptions = new CreateQueueOptions("orders-queue")
{
    // Enable duplicate detection with a 10-minute window.
    // RequiresDuplicateDetection cannot be changed after the queue is created.
    RequiresDuplicateDetection = true,
    DuplicateDetectionHistoryTimeWindow = TimeSpan.FromMinutes(10)
};

await adminClient.CreateQueueAsync(queueOptions);

Using Azure CLI

# Create queue with 10-minute duplicate detection window
az servicebus queue create \
  --name orders-queue \
  --namespace-name my-namespace \
  --resource-group my-rg \
  --duplicate-detection-history-time-window "PT10M" \
  --enable-duplicate-detection true

# Or deploy an ARM template
az deployment group create \
  --resource-group my-rg \
  --template-file deploy.json

ARM Template

{
  "$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentTemplate.json#",
  "contentVersion": "1.0.0.0",
  "resources": [
    {
      "type": "Microsoft.ServiceBus/namespaces/queues",
      "name": "[concat('my-namespace', '/', 'orders-queue')]",
      "apiVersion": "2021-06-01-preview",
      "properties": {
        "requiresDuplicateDetection": true,
        "duplicateDetectionHistoryTimeWindow": "PT10M"
      }
    }
  ]
}

Setting Message ID

Generating Deterministic Message IDs

The key to effective duplicate detection is generating consistent MessageIds:

public class OrderService
{
    public ServiceBusMessage CreateOrderMessage(Order order)
    {
        // Derive the MessageId from the business key only, so every retry
        // of the same order produces the same ID. Do not append a timestamp
        // or random value: that would make each send look unique.
        
        var messageId = $"order-{order.OrderId}";
        
        return new ServiceBusMessage(Encoding.UTF8.GetBytes(JsonSerializer.Serialize(order)))
        {
            MessageId = messageId,
            ContentType = "application/json",
            Subject = "OrderCreated"
        };
    }
}

Better: Use Business Key Only

// ✅ BEST: Build the ID from business keys only
var orderMessageId = $"order-{order.OrderId}";              // Unique per order
var paymentMessageId = $"payment-{payment.PaymentId}";      // Unique per payment
var shipmentMessageId = $"shipment-{shipment.ShipmentId}";  // Unique per shipment

// ❌ AVOID: Values that change on every send
var badTimestampId = $"order-{order.OrderId}-{DateTime.UtcNow.Ticks}"; // Different every time!
var badGuidId = $"order-{order.OrderId}-{Guid.NewGuid()}";             // Always different!
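
When a message has no natural business key, one common fallback is to derive the ID from the payload itself. The sketch below is an assumption-laden illustration, not an SDK feature: `MessageIdFactory` and `FromPayload` are hypothetical names, and the approach only works if the producer serializes the same payload to the same bytes on every retry. It also guards the 128-character limit that Service Bus places on MessageId.

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

public static class MessageIdFactory
{
    // Service Bus limits MessageId to 128 characters.
    private const int MaxMessageIdLength = 128;

    // Hypothetical helper: derives a stable ID from the serialized payload.
    public static string FromPayload(string prefix, string payloadJson)
    {
        using var sha256 = SHA256.Create();
        byte[] hash = sha256.ComputeHash(Encoding.UTF8.GetBytes(payloadJson));

        // 32 hash bytes become 64 lowercase hex characters.
        var hex = new StringBuilder(hash.Length * 2);
        foreach (byte b in hash)
        {
            hex.Append(b.ToString("x2"));
        }

        string id = $"{prefix}-{hex}";
        if (id.Length > MaxMessageIdLength)
        {
            throw new ArgumentException(
                "Prefix is too long for a MessageId.", nameof(prefix));
        }
        return id;
    }
}
```

Two sends of an identical payload produce the same ID and are deduplicated; any change to the payload, including property order or whitespace, produces a different ID.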

When to Include Timestamp

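Include a time component only if the same business key may legitimately be sent again later. Rounding the timestamp down to a fixed bucket keeps the ID stable for retries inside that bucket and changes it once the bucket rolls over:

// Same ID for every send within one 5-minute bucket
var now = DateTime.UtcNow;
var bucketTicks = TimeSpan.FromMinutes(5).Ticks;
var bucketStart = new DateTime(now.Ticks - (now.Ticks % bucketTicks), DateTimeKind.Utc);
var messageId = $"order-{order.OrderId}-{bucketStart:yyyyMMddHHmm}";
// A send at 10:30 and a retry at 10:32 share an ID (both fall in the 10:30 bucket)
// A send at 10:36 gets a new ID (10:35 bucket) and is not treated as a duplicate
// Caveat: a retry that crosses a bucket boundary (10:34 to 10:35) also gets a new ID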

Client-Side Idempotent Processing

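Service Bus duplicate detection only covers duplicate sends. Delivery is still at-least-once: a message can be redelivered when its lock expires or when the consumer fails before completing it. Implement client-side idempotency as well, for defense in depth: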

Idempotent Message Processor

public class IdempotentMessageProcessor
{
    // IRedisCache, OrderRepository, and ProcessingResult are application-defined types
    private readonly IRedisCache _cache;
    private readonly OrderRepository _orderRepository;

    public IdempotentMessageProcessor(IRedisCache cache, OrderRepository orderRepository)
    {
        _cache = cache;
        _orderRepository = orderRepository;
    }

    // Register as the ServiceBusProcessor handler, for example:
    //   processor.ProcessMessageAsync += args => handler.ProcessMessageAsync(args, logger);
    // Set AutoCompleteMessages = false so that settlement is controlled here.
    public async Task ProcessMessageAsync(
        ProcessMessageEventArgs args,
        ILogger logger)
    {
        ServiceBusReceivedMessage message = args.Message;
        var messageId = message.MessageId;

        // Key that records a completed message
        var processedKey = $"sb:processed:{messageId}";

        // Try to get lock (prevents parallel processing)
        var lockKey = $"sb:lock:{messageId}";
        var lockAcquired = await _cache.SetIfNotExistsAsync(
            lockKey,
            "locked",
            TimeSpan.FromSeconds(30));

        if (!lockAcquired)
        {
            // Another instance is processing this message
            logger.LogInformation("Message {MessageId} is being processed elsewhere", messageId);
            await args.AbandonMessageAsync(message);
            return;
        }

        try
        {
            // Now that we hold the lock: is it already processed?
            var alreadyProcessed = await _cache.ExistsAsync(processedKey);
            if (alreadyProcessed)
            {
                logger.LogInformation("Message {MessageId} already processed, completing", messageId);
                await args.CompleteMessageAsync(message);
                return;
            }

            // Process the message
            var result = await DoProcessMessageAsync(message);

            if (result.Success)
            {
                // Mark as processed, keeping the marker for 24 hours
                // after the message was enqueued
                var expiresAt = message.EnqueuedTime.AddHours(24);
                var ttl = expiresAt - DateTimeOffset.UtcNow;

                await _cache.SetAsync(
                    processedKey,
                    JsonSerializer.Serialize(new
                    {
                        ProcessedAt = DateTimeOffset.UtcNow,
                        Result = result
                    }),
                    ttl > TimeSpan.Zero ? ttl : TimeSpan.FromHours(24));

                await args.CompleteMessageAsync(message);
                logger.LogInformation("Message {MessageId} processed successfully", messageId);
            }
            else
            {
                // Processing failed: the message is redelivered, and moves to the
                // dead-letter queue once the max delivery count is exceeded
                await args.AbandonMessageAsync(message);
            }
        }
        finally
        {
            // Release lock
            await _cache.DeleteAsync(lockKey);
        }
    }

    private async Task<ProcessingResult> DoProcessMessageAsync(ServiceBusReceivedMessage message)
    {
        // Your actual processing logic
        var order = message.Body.ToObjectFromJson<Order>();

        // Process order - but check if already processed at database level too
        var existingOrder = await _orderRepository.GetByIdAsync(order.OrderId);
        if (existingOrder != null && existingOrder.Status != "Pending")
        {
            return new ProcessingResult
            {
                Success = true,
                AlreadyProcessed = true
            };
        }

        await _orderRepository.CreateAsync(order);

        return new ProcessingResult { Success = true };
    }
}

Database-Level Idempotency

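public class OrderRepository
{
    // OrdersDbContext is an application-defined EF Core DbContext
    private readonly OrdersDbContext _context;

    public OrderRepository(OrdersDbContext context) => _context = context;

    public async Task<Order> CreateAsync(Order order)
    {
        // Rely on a unique constraint on OrderId for idempotency
        await _context.Orders.AddAsync(order);

        try
        {
            await _context.SaveChangesAsync();
            return order;
        }
        catch (DbUpdateException ex) when (IsDuplicateKey(ex))
        {
            // Duplicate: stop tracking the rejected entity so later saves
            // do not retry the insert, then return the existing row
            _context.Entry(order).State = EntityState.Detached;
            return await _context.Orders
                .FirstOrDefaultAsync(o => o.OrderId == order.OrderId);
        }
    }

    private static bool IsDuplicateKey(DbUpdateException ex)
    {
        // Assumes SQL Server via Microsoft.Data.SqlClient:
        // 2601 = duplicate key in a unique index,
        // 2627 = unique or primary key constraint violation.
        // Other database providers report duplicates differently.
        return ex.InnerException is SqlException { Number: 2601 or 2627 };
    }
}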

Configuring Detection Window

Window Duration Guidelines

Window       Use Case
1 minute     Short retries, quick recovery
5 minutes    Standard processing
1 hour       Long-running operations
24 hours     Batch processing, audit trails

Choosing Window Size

Consider:

  • Retry frequency: How often, and for how long, does your producer retry?
  • Processing time: How long does processing one message take?
  • Network latency: How long can an acknowledgment be delayed?

// Example: 5-minute window (Azure.Messaging.ServiceBus.Administration)
var queue = new CreateQueueOptions("orders")
{
    RequiresDuplicateDetection = true,
    DuplicateDetectionHistoryTimeWindow = TimeSpan.FromMinutes(5)
};

// Too short? A retry that arrives after the window expires is accepted as a new message
// Too long? Service Bus must retain and match more message IDs, which can reduce throughput
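
One way to turn these considerations into a number is to estimate how long after the first send the producer's last retry can still arrive, then add a margin. The helper below is a rough sketch under stated assumptions: `DetectionWindowCalculator` is a hypothetical name, the backoff is modeled as plain exponential doubling with a cap (real retry policies usually add jitter), and the 2x safety factor is an arbitrary starting point. The clamp reflects the range Service Bus accepts for the window, 20 seconds to 7 days.

```csharp
using System;

public static class DetectionWindowCalculator
{
    // Time from the first send attempt to the start of the last retry,
    // assuming every attempt runs for attemptTimeout before failing and
    // the delay before retry N is baseDelay * 2^N, capped at maxDelay.
    public static TimeSpan WorstCaseRetrySpan(
        int maxRetries,
        TimeSpan baseDelay,
        TimeSpan maxDelay,
        TimeSpan attemptTimeout)
    {
        TimeSpan total = TimeSpan.Zero;
        for (int retry = 0; retry < maxRetries; retry++)
        {
            double delayMs = Math.Min(
                baseDelay.TotalMilliseconds * Math.Pow(2, retry),
                maxDelay.TotalMilliseconds);
            total += attemptTimeout + TimeSpan.FromMilliseconds(delayMs);
        }
        return total;
    }

    // Scales the retry span by a safety factor and clamps the result
    // to the 20-second to 7-day range.
    public static TimeSpan RecommendWindow(TimeSpan retrySpan, double safetyFactor = 2.0)
    {
        var scaled = TimeSpan.FromTicks((long)(retrySpan.Ticks * safetyFactor));
        var min = TimeSpan.FromSeconds(20);
        var max = TimeSpan.FromDays(7);

        if (scaled < min) return min;
        if (scaled > max) return max;
        return scaled;
    }
}
```

With 4 retries, a 1-second base delay capped at 4 seconds, and a 10-second attempt timeout, `WorstCaseRetrySpan` returns 51 seconds (40 seconds of timeouts plus delays of 1, 2, 4, and 4 seconds), and `RecommendWindow` doubles that to 102 seconds.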

Handling Duplicates at Scale

Using Redis for Distributed Idempotency

public class RedisIdempotencyService
{
    private readonly IConnectionMultiplexer _redis;
    private readonly ILogger<RedisIdempotencyService> _logger;
    
    public async Task<(bool IsNew, T Result)> ProcessIdempotentlyAsync<T>(
        string idempotencyKey,
        Func<T> processor)
    {
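        // Key format: sb:idempotency:{entity}:{id}
        var key = $"sb:idempotency:{idempotencyKey}";
        
        // Check if already processed.
        // Note: this check-then-set sequence is not atomic. Two consumers can
        // both miss here, so the processor must still be safe to run twice.
        var existing = await _redis.GetDatabase().StringGetAsync(key);
        if (!existing.IsNull)
        {
            _logger.LogInformation("Returning cached result for {Key}", idempotencyKey);
            // Convert the RedisValue to a string explicitly before deserializing
            return (false, JsonSerializer.Deserialize<T>(existing.ToString()));
        }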
        
        // Process
        var result = processor();
        
        // Cache result (use appropriate TTL)
        await _redis.GetDatabase().StringSetAsync(
            key,
            JsonSerializer.Serialize(result),
            TimeSpan.FromHours(24));
        
        return (true, result);
    }
}
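
An idempotency check is only safe under concurrency if claiming a key is a single atomic step. The sketch below shows that claim-first pattern with a `ConcurrentDictionary` standing in for a shared store; it is an in-process illustration only, and `InMemoryIdempotencyGuard` and `RunOnce` are hypothetical names. With Redis, the equivalent claim would be a set-if-not-exists write with an expiry.

```csharp
using System;
using System.Collections.Concurrent;

// In-process stand-in for a shared store such as Redis; illustrative only.
public sealed class InMemoryIdempotencyGuard
{
    private readonly ConcurrentDictionary<string, byte> _claimed = new();

    // Runs the action only for the first caller that claims the key.
    // Returns true if the action ran, false if the key was already claimed.
    public bool RunOnce(string idempotencyKey, Action action)
    {
        // TryAdd is atomic: exactly one concurrent caller gets true.
        if (!_claimed.TryAdd(idempotencyKey, 0))
        {
            return false;
        }

        try
        {
            action();
            return true;
        }
        catch
        {
            // Release the claim so that a redelivery can retry the work.
            _claimed.TryRemove(idempotencyKey, out _);
            throw;
        }
    }
}
```

Claiming before processing trades one risk for another: a consumer that crashes after the claim but before finishing leaves the key held, which is why a shared-store version normally puts an expiry on the claim.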

Testing Duplicate Detection

Integration Test

// Requires a real queue named "test-queue" with duplicate detection enabled
[Fact]
public async Task DuplicateMessages_AreDiscarded()
{
    // Arrange
    await using var client = new ServiceBusClient(connectionString);
    ServiceBusSender sender = client.CreateSender("test-queue");
    ServiceBusReceiver receiver = client.CreateReceiver("test-queue"); // PeekLock by default

    var messageId = $"test-{Guid.NewGuid()}";

    // Send first message
    await sender.SendMessageAsync(
        new ServiceBusMessage("test") { MessageId = messageId });

    // Send duplicate: the send succeeds, but Service Bus drops the message
    await sender.SendMessageAsync(
        new ServiceBusMessage("test") { MessageId = messageId });

    // Act - try to receive both, with a bounded wait so the test cannot hang
    var received1 = await receiver.ReceiveMessageAsync(TimeSpan.FromSeconds(10));
    var received2 = await receiver.ReceiveMessageAsync(TimeSpan.FromSeconds(10));

    // Assert - only one should be received
    Assert.NotNull(received1);
    Assert.Null(received2); // Duplicate was discarded

    // Cleanup
    await receiver.CompleteMessageAsync(received1);
}

Monitoring Duplicate Detection

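Application Insights Metrics

// Service Bus drops duplicate sends silently and does not expose a platform
// metric that counts them, so record the duplicates your consumer skips
// as a custom Application Insights metric.
services.AddApplicationInsightsTelemetry(options =>
{
    options.EnableDependencyTrackingTelemetryModule = true;
});

// In the consumer, when a message is skipped as already processed
// ("DuplicateMessagesSkipped" is an example name):
telemetryClient.GetMetric("DuplicateMessagesSkipped").TrackValue(1);

// Query the custom metric in Application Insights logs
var query = @"
    customMetrics
    | where timestamp > ago(1h)
    | where name == 'DuplicateMessagesSkipped'
    | summarize sum(valueSum) by bin(timestamp, 10m)
";

Alert Configuration

# Create an alert for a high duplicate rate.
# --scopes must point at the Application Insights resource that receives the
# custom metric. Depending on how the metric is published, the condition may
# need a metric namespace prefix.
az monitor metrics alert create \
  --name "high-duplicate-alert" \
  --resource-group my-rg \
  --scopes "<application-insights-resource-id>" \
  --condition "total DuplicateMessagesSkipped > 10" \
  --description "High number of duplicate messages detected"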

Best Practices Summary

Practice                            Description
Use business keys                   MessageId should be deterministic
Set appropriate window              Match your retry patterns
Implement client-side idempotency   Defense in depth
Add database constraints            Ultimate idempotency barrier
Monitor duplicates                  Track and alert on duplicates
Test thoroughly                     Verify duplicate handling

Common Mistakes

  • ❌ Generating a new GUID for MessageId on every send (different every time)
  • ❌ Including raw timestamps in MessageId
  • ❌ Window too short for retry patterns
  • ❌ Not implementing client-side idempotency
  • ❌ Forgetting to handle database duplicates

Azure Integration Hub - Advanced Level