Azure Service Bus — Claim-Check Pattern

Store Large Payloads in Blob Storage, Send a Reference via Service Bus


Introduction

The Claim-Check pattern solves a fundamental problem in messaging: what happens when your message payload exceeds the broker's size limits or when you want to optimize costs by keeping messages small?

Instead of sending large payloads through Service Bus, you:

  1. Store the payload in Azure Blob Storage
  2. Send a small "claim check" (reference) through Service Bus
  3. Have the consumer retrieve the payload from storage using the reference

This pattern enables:

  • Bypassing message size limits (Service Bus: 256 KB on the Standard tier, up to 100 MB on Premium)
  • Cost optimization — Blob storage is cheaper than premium queues
  • Faster message processing — Smaller messages = faster throughput
  • Data governance — Large payloads stored with proper retention policies
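
One way to apply the size limits above is to decide per message whether the claim check is needed at all. The sketch below is illustrative only: the `ClaimCheckPolicy` name and the 64 KB headroom reserved for headers and application properties are assumptions, not values taken from this article.

```csharp
using System;

// Hypothetical helper: the type name and the headroom value are illustrative assumptions.
public static class ClaimCheckPolicy
{
    // Standard-tier maximum message size cited above (256 KB).
    public const int StandardTierLimitBytes = 256 * 1024;

    // Assumed safety margin for headers and application properties.
    public const int HeadroomBytes = 64 * 1024;

    // Returns true when the serialized payload should be offloaded to Blob Storage.
    public static bool ShouldUseClaimCheck(int payloadBytes, int limitBytes = StandardTierLimitBytes)
    {
        if (payloadBytes < 0) throw new ArgumentOutOfRangeException(nameof(payloadBytes));
        return payloadBytes > limitBytes - HeadroomBytes;
    }
}
```

A publisher could call `ClaimCheckPolicy.ShouldUseClaimCheck(content.Length)` after serializing and send small payloads inline, which avoids the extra blob round trip when it is not needed.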

Architecture

┌─────────────┐         ┌──────────────┐         ┌─────────────┐
│  Producer   │         │  Azure Blob  │         │  Consumer   │
│             │   1.    │   Storage    │    3.   │             │
│             │────────▶│              │◀────────│             │
└─────────────┘         └──────────────┘         └─────────────┘
       │                                                   ▲
       │ 2. Send Claim Check                               │
       │    (Small Message)                                │
       ▼                                                   │
┌────────────────────────────────────────────────────────────┐
│                    Azure Service Bus                       │
│  ┌──────────────────────────────────────────────────────┐  │
│  │  Message {                                           │  │
│  │    "claimCheck": true,                               │  │
│  │    "blobUri": "https://storage.blob.core.windows.",  │  │
│  │    "contentType": "application/json",                │  │
│  │    "contentLength": 52428800                         │  │
│  │  }                                                   │  │
│  └──────────────────────────────────────────────────────┘  │
└────────────────────────────────────────────────────────────┘
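
The listings below use a `ClaimCheckMessage` type that the article never defines. A minimal sketch of what it might look like follows; the property names are inferred from how the publisher and consumer use the type, while the `ClaimCheckJson` helper and the choice of camelCase serialization (to match the diagram) are assumptions.

```csharp
#nullable enable
using System;
using System.Text.Json;

// Assumed shape: property names are inferred from the publisher and consumer
// listings in this article; the record itself is not shown in the source.
public sealed record ClaimCheckMessage
{
    public string BlobUri { get; init; } = "";
    public string ContentType { get; init; } = "application/json";
    public long ContentLength { get; init; }
    public string OrderId { get; init; } = "";
    public DateTime Timestamp { get; init; }
    public string? KeyId { get; init; } // only set by the encrypted variant
}

// Hypothetical helper that keeps serialization settings in one place.
public static class ClaimCheckJson
{
    // Web defaults produce camelCase names and read names case-insensitively.
    private static readonly JsonSerializerOptions Options =
        new(JsonSerializerDefaults.Web);

    public static string Serialize(ClaimCheckMessage message) =>
        JsonSerializer.Serialize(message, Options);

    public static ClaimCheckMessage? Deserialize(string json) =>
        JsonSerializer.Deserialize<ClaimCheckMessage>(json, Options);
}
```

Whatever naming policy is chosen, the publisher and consumer need to agree on it; the listings in this article call `JsonSerializer` with default options on both sides, which is also consistent.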

Implementation

Step 1: Claim Check Publisher

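using System.Text;
using System.Text.Json;
using Azure.Messaging.ServiceBus;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Microsoft.Extensions.Logging;

public class ClaimCheckPublisher
{
    private readonly BlobServiceClient _blobClient;
    private readonly ServiceBusSender _sender;
    private readonly ILogger<ClaimCheckPublisher> _logger;

    // Dependencies are injected so the readonly fields are actually assigned
    public ClaimCheckPublisher(
        BlobServiceClient blobClient,
        ServiceBusSender sender,
        ILogger<ClaimCheckPublisher> logger)
    {
        _blobClient = blobClient;
        _sender = sender;
        _logger = logger;
    }
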
    public async Task PublishLargeOrderAsync(Order order)
    {
        var containerName = "order-payloads";
        var blobName = $"orders/{order.OrderId}/payload.json";
        
        // Step 1: Upload payload to Blob Storage
        var containerClient = _blobClient.GetBlobContainerClient(containerName);
        await containerClient.CreateIfNotExistsAsync(PublicAccessType.None);
        
        var blobClient = containerClient.GetBlobClient(blobName);
        
        var orderJson = JsonSerializer.Serialize(order);
        var content = Encoding.UTF8.GetBytes(orderJson);
        
        // Upload with headers and metadata; the stored MD5 lets consumers verify integrity
        using var stream = new MemoryStream(content);
        await blobClient.UploadAsync(
            stream,
            new BlobHttpHeaders
            {
                ContentType = "application/json",
                ContentHash = System.Security.Cryptography.MD5.HashData(content)
            },
            new Dictionary<string, string>
            {
                ["orderId"] = order.OrderId,
                ["createdAt"] = DateTime.UtcNow.ToString("O"),
                ["contentType"] = order.ContentType,
                ["sizeBytes"] = content.Length.ToString()
            });
        
        _logger.LogInformation("Uploaded payload for order {OrderId} to blob: {BlobUri}", 
            order.OrderId, blobClient.Uri);
        
        // Step 2: Create and send claim check message
        var claimCheck = new ClaimCheckMessage
        {
            BlobUri = blobClient.Uri.ToString(),
            ContentType = order.ContentType,
            ContentLength = content.Length,
            OrderId = order.OrderId,
            Timestamp = DateTime.UtcNow
        };
        
        var message = new ServiceBusMessage(
            Encoding.UTF8.GetBytes(JsonSerializer.Serialize(claimCheck)))
        {
            ContentType = "application/json",
            MessageId = order.OrderId,
            Subject = "LargeOrder",
            // Custom properties live in ApplicationProperties in Azure.Messaging.ServiceBus
            ApplicationProperties =
            {
                ["claimCheck"] = true,
                ["orderId"] = order.OrderId
            }
        };
        
        await _sender.SendMessageAsync(message);
        
        _logger.LogInformation("Sent claim check for order {OrderId}, payload size: {Size} bytes", 
            order.OrderId, content.Length);
    }
}

Step 2: Claim Check Consumer

public class ClaimCheckConsumer
{
    private readonly BlobServiceClient _blobClient;
    private readonly ServiceBusProcessor _processor;
    private readonly ILogger<ClaimCheckConsumer> _logger;
    
    public ClaimCheckConsumer(
        BlobServiceClient blobClient,
        ServiceBusProcessor processor,
        ILogger<ClaimCheckConsumer> logger)
    {
        _blobClient = blobClient;
        _processor = processor;
        _logger = logger;
    }
    
    public async Task StartProcessingAsync()
    {
        _processor.ProcessMessageAsync += async args =>
        {
            var message = args.Message;
            
            try
            {
                // Check if this is a claim check message.
                // The publisher stores a boolean, so match on true rather than the string "true"
                // (a boxed bool formats as "True" and would never equal "true").
                if (!message.ApplicationProperties.TryGetValue("claimCheck", out var isClaimCheck) ||
                    isClaimCheck is not true)
                {
                    // Not a claim check - process normally
                    await ProcessNormalMessageAsync(message);
                    return;
                }
                
                // Parse claim check
                var claimCheck = message.Body.ToObjectFromJson<ClaimCheckMessage>();
                
                _logger.LogInformation("Processing claim check for order {OrderId}, URI: {Uri}", 
                    claimCheck.OrderId, claimCheck.BlobUri);
                
                // Retrieve payload from Blob Storage
                var payload = await RetrievePayloadAsync(claimCheck.BlobUri);
                
                // Process the actual payload
                var order = JsonSerializer.Deserialize<Order>(payload);
                await ProcessOrderAsync(order);
                
                // Complete the message
                await args.CompleteMessageAsync(message);
                
                // Optional: Clean up blob after processing
                // await DeleteBlobAsync(claimCheck.BlobUri);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Failed to process claim check message");
                await args.AbandonMessageAsync(message);
            }
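        };
        
        // The processor refuses to start unless an error handler is registered
        _processor.ProcessErrorAsync += args =>
        {
            _logger.LogError(args.Exception, "Service Bus processor error from {Source}",
                args.ErrorSource);
            return Task.CompletedTask;
        };
        
        await _processor.StartProcessingAsync();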
    }
    
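    private async Task<string> RetrievePayloadAsync(string blobUri)
    {
        // Resolve the blob through the authenticated service client; a BlobClient built
        // from the bare URI is anonymous and fails against a private container
        var uri = new BlobUriBuilder(new Uri(blobUri));
        var blobClient = _blobClient
            .GetBlobContainerClient(uri.BlobContainerName)
            .GetBlobClient(uri.BlobName);
        
        var response = await blobClient.DownloadContentAsync();
        
        return response.Value.Content.ToString();
    }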
    
    private async Task ProcessOrderAsync(Order order)
    {
        // Your order processing logic
        _logger.LogInformation("Processing order {OrderId} with total: {Total}", 
            order.OrderId, order.TotalAmount);
        
        await Task.Delay(100); // Simulate processing
    }
}
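
The consumer above loads the whole payload into a string before deserializing it. For large payloads, a streaming read may be preferable. The sketch below is a minimal illustration with a hypothetical `StreamingPayloadReader` helper; it accepts any `Stream`, and in the consumer that stream would be assumed to come from `await blobClient.OpenReadAsync()`.

```csharp
#nullable enable
using System.IO;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of the article's original listings.
public static class StreamingPayloadReader
{
    // Deserializes JSON directly from a stream without first materializing
    // the whole payload as a string. Returns null when the JSON value is null.
    public static async Task<T?> ReadAsync<T>(Stream payload, CancellationToken ct = default)
    {
        return await JsonSerializer.DeserializeAsync<T>(payload, cancellationToken: ct);
    }
}
```

With this in place, `RetrievePayloadAsync` could return the deserialized order directly instead of an intermediate string, which keeps peak memory closer to the size of the object graph than to the size of the raw JSON.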

Advanced Implementation

With Automatic Cleanup

public class ClaimCheckPublisherWithCleanup
{
    private readonly BlobServiceClient _blobClient;
    private readonly ServiceBusSender _sender;
    private readonly TableServiceClient _tableClient;
    
    public async Task PublishWithTrackingAsync(Order order)
    {
        var orderJson = JsonSerializer.Serialize(order);
        var blobUri = await UploadToBlobAsync(order);
        
        // Create claim check
        var claimCheck = new ClaimCheckMessage
        {
            BlobUri = blobUri,
            OrderId = order.OrderId,
            ContentType = "application/json",
            ContentLength = Encoding.UTF8.GetByteCount(orderJson)
        };
        
        // Track in Table Storage for cleanup
        await TrackClaimCheckAsync(claimCheck);
        
        // Send message
        await _sender.SendMessageAsync(CreateMessage(claimCheck));
    }
    
    private async Task<string> UploadToBlobAsync(Order order)
    {
        var containerClient = _blobClient.GetBlobContainerClient("orders");
        var blobClient = containerClient.GetBlobClient($"orders/{order.OrderId}.json");
        
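        // overwrite: true keeps a retried publish from failing on an existing blob
        using var stream = new MemoryStream(Encoding.UTF8.GetBytes(
            JsonSerializer.Serialize(order)));
        await blobClient.UploadAsync(stream, overwrite: true);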
        
        return blobClient.Uri.ToString();
    }
    
    private async Task TrackClaimCheckAsync(ClaimCheckMessage claimCheck)
    {
        var tableClient = _tableClient.GetTableClient("ClaimCheckTracking");
        
        await tableClient.UpsertEntityAsync(new TableEntity
        {
            PartitionKey = "ClaimCheck",
            RowKey = claimCheck.OrderId,
            ["BlobUri"] = claimCheck.BlobUri,
            ["CreatedAt"] = DateTime.UtcNow,
            ["Processed"] = false
        });
    }
    
    public async Task MarkAsProcessedAsync(string orderId)
    {
        var tableClient = _tableClient.GetTableClient("ClaimCheckTracking");
        
        // Unwrap the Response<T>; UpsertEntityAsync needs the entity itself
        var entity = (await tableClient.GetEntityAsync<TableEntity>("ClaimCheck", orderId)).Value;
        
        entity["Processed"] = true;
        entity["ProcessedAt"] = DateTime.UtcNow;
        
        await tableClient.UpsertEntityAsync(entity);
    }
    
    public async Task CleanupOldBlobsAsync(int retentionDays = 7)
    {
        var tableClient = _tableClient.GetTableClient("ClaimCheckTracking");
        var cutoff = DateTime.UtcNow.AddDays(-retentionDays);
        
        // OData date comparisons need the datetime'...' literal; a bare quoted string is treated as text
        var query = $"PartitionKey eq 'ClaimCheck' and Processed eq true and ProcessedAt lt datetime'{cutoff:s}Z'";
        
        await foreach (var entity in tableClient.QueryAsync<TableEntity>(query))
        {
            try
            {
                var blobClient = new BlobClient(new Uri(entity.GetString("BlobUri")));
                await blobClient.DeleteIfExistsAsync();
                
                await tableClient.DeleteEntityAsync(entity.PartitionKey, entity.RowKey);
            }
            catch (Exception)
            {
                // Log and continue
            }
        }
    }
}

With Encryption

public class EncryptedClaimCheckPublisher
{
    private readonly BlobServiceClient _blobClient;
    private readonly KeyClient _keyClient;
    private readonly ServiceBusSender _sender;
    
    public async Task PublishEncryptedAsync(Order order)
    {
        // Encrypt the payload
        var orderJson = JsonSerializer.Serialize(order);
        var encryptedPayload = await EncryptAsync(orderJson);
        
        // Upload to blob
        var blobUri = await UploadEncryptedBlobAsync(encryptedPayload);
        
        // Generate decryption key reference
        var keyId = await GenerateAndStoreKeyAsync(encryptedPayload.Key);
        
        // Create claim check with key reference
        var claimCheck = new ClaimCheckMessage
        {
            BlobUri = blobUri,
            KeyId = keyId,
            ContentType = order.ContentType,
            ContentLength = encryptedPayload.Ciphertext.Length,
            OrderId = order.OrderId
        };
        
        // Send claim check
        await _sender.SendMessageAsync(CreateMessage(claimCheck));
    }
    
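    private async Task<EncryptedPayload> EncryptAsync(string plaintext)
    {
        // Fetch the key asynchronously; the synchronous GetKey would block the calling thread
        KeyVaultKey key = await _keyClient.GetKeyAsync("my-key");
        
        // Encryptor and EncryptedPayload are application-defined types not shown in this article
        var encryptor = new Encryptor(key);
        
        return encryptor.Encrypt(plaintext);
    }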
    
    private async Task<string> UploadEncryptedBlobAsync(EncryptedPayload payload)
    {
        var containerClient = _blobClient.GetBlobContainerClient("encrypted-orders");
        var blobClient = containerClient.GetBlobClient($"{payload.KeyId}.enc");
        
        using var stream = new MemoryStream(payload.Ciphertext);
        await blobClient.UploadAsync(stream);
        
        return blobClient.Uri.ToString();
    }
}
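
The `Encryptor` and `EncryptedPayload` types used above are not shown in this article. As one possible shape, the sketch below encrypts a payload locally with AES-GCM using a caller-supplied 256-bit data key. All names here (`SealedPayload`, `PayloadCrypto`) are hypothetical, and the code assumes .NET 8 or later. Protecting the data key itself, for example by wrapping it with a Key Vault key, is left out.

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

// Hypothetical types: one possible stand-in for the article's EncryptedPayload.
public sealed record SealedPayload(byte[] Nonce, byte[] Ciphertext, byte[] Tag);

public static class PayloadCrypto
{
    private const int NonceSize = 12; // bytes; the usual AES-GCM nonce length
    private const int TagSize = 16;   // bytes

    // Encrypts UTF-8 text with a caller-supplied 256-bit data key.
    public static SealedPayload Encrypt(string plaintext, byte[] dataKey)
    {
        byte[] plain = Encoding.UTF8.GetBytes(plaintext);
        byte[] nonce = RandomNumberGenerator.GetBytes(NonceSize); // fresh nonce per payload
        byte[] cipher = new byte[plain.Length];
        byte[] tag = new byte[TagSize];

        using var aes = new AesGcm(dataKey, TagSize);
        aes.Encrypt(nonce, plain, cipher, tag);
        return new SealedPayload(nonce, cipher, tag);
    }

    // Decrypts and authenticates; throws CryptographicException if the data was altered.
    public static string Decrypt(SealedPayload payload, byte[] dataKey)
    {
        byte[] plain = new byte[payload.Ciphertext.Length];
        using var aes = new AesGcm(dataKey, TagSize);
        aes.Decrypt(payload.Nonce, payload.Ciphertext, payload.Tag, plain);
        return Encoding.UTF8.GetString(plain);
    }
}
```

In this arrangement the blob would hold the nonce, ciphertext, and tag, while the claim check carries only a reference to the key (the `KeyId` field), so neither Service Bus nor Blob Storage sees the plaintext.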

Benefits and Tradeoffs

Benefits

Benefit             Description
------------------  -------------------------------------------------------
Bypass size limits  Payload size is bounded by Blob Storage, not the broker
Cost efficiency     Blob storage is cheaper than Premium Service Bus
Faster processing   Smaller Service Bus messages mean faster throughput
Retention           Leverage Blob lifecycle policies
Governance          Separate retention for messages vs. payloads

Tradeoffs

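Tradeoff          Consideration
----------------  -------------------------------------------------------------------
Extra latency     Blob retrieval adds roughly 100-500 ms per message
Two-step process  More complex than direct messaging
Additional cost   Blob storage plus per-transaction charges
Failure handling  Must handle both Service Bus and Blob failures
Idempotency       Redelivery re-reads the same blob, so processing must be idempotent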
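
The failure-handling tradeoff can be made concrete by deciding how to settle a message when the blob download fails. The sketch below is a hypothetical policy, not part of the article's consumer: the `BlobFailurePolicy` name, the `SettleAction` enum, and the default budget of 5 deliveries are all assumptions. With the Azure SDK, the status code would be read from `RequestFailedException.Status` and the delivery count from the received message's `DeliveryCount`.

```csharp
using System;

public enum SettleAction { Abandon, DeadLetter }

// Hypothetical helper mapping a failed blob download to a message settlement.
public static class BlobFailurePolicy
{
    // httpStatus: status code of the failed storage call.
    // deliveryCount: how many times the message has been delivered so far.
    public static SettleAction Decide(int httpStatus, int deliveryCount, int maxDeliveries = 5)
    {
        // 404: the payload is gone, so redelivery cannot succeed.
        if (httpStatus == 404) return SettleAction.DeadLetter;

        // Stop retrying once the assumed delivery budget is used up.
        if (deliveryCount >= maxDeliveries) return SettleAction.DeadLetter;

        // Throttling, timeouts, and server errors are usually transient.
        return SettleAction.Abandon;
    }
}
```

In the consumer's catch block, a `DeadLetter` result would translate to `args.DeadLetterMessageAsync(...)` with a reason such as "BlobNotFound", while `Abandon` keeps the existing `args.AbandonMessageAsync(message)` behavior.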

Best Practices

Practice              Description
--------------------  -------------------------------------
Use managed identity  Avoid storing storage keys
Implement cleanup     Delete blobs after processing
Track references      Store mappings in Table/DB
Handle large files    Stream instead of loading into memory
Add retries           Blob operations can fail transiently
Set proper TTL        Clean up blobs after retention period
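
The managed identity and retry practices can be combined when the clients are constructed. The sketch below is one way to wire this up, assuming the Azure.Identity, Azure.Storage.Blobs, and Azure.Messaging.ServiceBus packages; the `ClaimCheckClients` factory and the retry values are illustrative assumptions rather than recommendations from this article.

```csharp
using System;
using Azure.Core;
using Azure.Identity;
using Azure.Messaging.ServiceBus;
using Azure.Storage.Blobs;

// Hypothetical factory; the endpoint values are supplied by the caller.
public static class ClaimCheckClients
{
    public static (BlobServiceClient Blobs, ServiceBusSender Sender) Create(
        Uri blobEndpoint,           // e.g. the storage account's blob endpoint
        string serviceBusNamespace, // fully qualified namespace host name
        string queueName)
    {
        // Resolves a managed identity in Azure and developer credentials locally,
        // so no storage keys or connection strings are stored in configuration.
        var credential = new DefaultAzureCredential();

        // Retry transient blob failures with exponential backoff (assumed values).
        var blobOptions = new BlobClientOptions();
        blobOptions.Retry.Mode = RetryMode.Exponential;
        blobOptions.Retry.MaxRetries = 5;
        blobOptions.Retry.Delay = TimeSpan.FromSeconds(1);

        var blobs = new BlobServiceClient(blobEndpoint, credential, blobOptions);
        var bus = new ServiceBusClient(serviceBusNamespace, credential);
        return (blobs, bus.CreateSender(queueName));
    }
}
```

The identity used here needs data-plane roles on both resources, for example Storage Blob Data Contributor on the storage account and Azure Service Bus Data Sender on the namespace.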

Use Cases

  1. Document processing — Large PDFs/images through messaging
  2. Event sourcing — Large event payloads
  3. Data export/import — Export large datasets via queues
  4. Media processing — Video/audio file processing pipelines

Azure Integration Hub - Advanced Level