Azure Service Bus — Claim-Check Pattern
Large Payloads in Blob, Send Reference via SB
Introduction
The Claim-Check pattern solves a fundamental problem in messaging: what happens when your message payload exceeds the broker's size limits or when you want to optimize costs by keeping messages small?
Instead of sending large payloads through Service Bus, you:
- Store the payload in Azure Blob Storage
- Send a small "claim check" (reference) through Service Bus
- Consumer retrieves the payload from storage using the reference
This pattern enables:
- Bypassing message size limits (Service Bus: 256KB standard, 100MB premium)
- Cost optimization — Blob storage is cheaper than premium queues
- Faster message processing — Smaller messages = faster throughput
- Data governance — Large payloads stored with proper retention policies
Architecture
┌─────────────┐ ┌──────────────┐ ┌─────────────┐
│ Producer │───────▶ │ Azure Blob │ │ Consumer │
│ │ 1. │ Storage │ │ │
│ │────────▶│ │◀──3. │ │
└─────────────┘ └──────────────┘ └─────────────┘
│ ▲
│ 2. Send Claim Check │
│ (Small Message) │
▼ │
┌────────────────────────────────────────────────────────────┐
│ Azure Service Bus │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Message { │ │
│ │ "claimCheck": "true", │ │
│ │ "blobUri": "https://storage.blob.core.windows.", │ │
│ │ "contentType": "application/json", │ │
│ │ "contentLength": 52428800 │ │
│ │ } │ │
│ └──────────────────────────────────────────────────────┘ │
└────────────────────────────────────────────────────────────┘
Implementation
Step 1: Claim Check Publisher
public class ClaimCheckPublisher
{
private readonly BlobServiceClient _blobClient;
private readonly ServiceBusSender _sender;
private readonly ILogger<ClaimCheckPublisher> _logger;
public async Task PublishLargeOrderAsync(Order order)
{
var containerName = "order-payloads";
var blobName = $"orders/{order.OrderId}/payload.json";
// Step 1: Upload payload to Blob Storage
var containerClient = _blobClient.GetBlobContainerClient(containerName);
await containerClient.CreateIfNotExistsAsync(PublicAccessType.None);
var blobClient = containerClient.GetBlobClient(blobName);
var orderJson = JsonSerializer.Serialize(order);
var content = Encoding.UTF8.GetBytes(orderJson);
// Upload with metadata for retrieval
await blobClient.UploadAsync(
new MemoryStream(content),
new BlobHttpHeaders
{
ContentType = "application/json",
ContentHash = ComputeMD5Hash(content)
},
new Dictionary<string, string>
{
["orderId"] = order.OrderId,
["createdAt"] = DateTime.UtcNow.ToString("O"),
["contentType"] = order.ContentType,
["sizeBytes"] = content.Length.ToString()
});
_logger.LogInformation("Uploaded payload for order {OrderId} to blob: {BlobUri}",
order.OrderId, blobClient.Uri);
// Step 2: Create and send claim check message
var claimCheck = new ClaimCheckMessage
{
BlobUri = blobClient.Uri.ToString(),
ContentType = order.ContentType,
ContentLength = content.Length,
OrderId = order.OrderId,
Timestamp = DateTime.UtcNow
};
var message = new ServiceBusMessage(
Encoding.UTF8.GetBytes(JsonSerializer.Serialize(claimCheck)))
{
ContentType = "application/json",
MessageId = order.OrderId,
Subject = "LargeOrder",
Properties =
{
["claimCheck"] = true,
["orderId"] = order.OrderId
}
};
await _sender.SendMessageAsync(message);
_logger.LogInformation("Sent claim check for order {OrderId}, payload size: {Size} bytes",
order.OrderId, content.Length);
}
}
Step 2: Claim Check Consumer
public class ClaimCheckConsumer
{
private readonly BlobServiceClient _blobClient;
private readonly ServiceBusProcessor _processor;
private readonly ILogger<ClaimCheckConsumer> _logger;
public ClaimCheckConsumer(
BlobServiceClient blobClient,
ServiceBusProcessor processor,
ILogger<ClaimCheckConsumer> logger)
{
_blobClient = blobClient;
_processor = processor;
_logger = logger;
}
public async Task StartProcessingAsync()
{
_processor.ProcessMessageAsync += async args =>
{
var message = args.Message;
try
{
// Check if this is a claim check message
if (!message.Properties.TryGetValue("claimCheck", out var isClaimCheck) ||
isClaimCheck?.ToString() != "true")
{
// Not a claim check - process normally
await ProcessNormalMessageAsync(message);
return;
}
// Parse claim check
var claimCheck = JsonSerializer.Deserialize<ClaimCheckMessage>(
Encoding.UTF8.GetString(message.Body));
_logger.LogInformation("Processing claim check for order {OrderId}, URI: {Uri}",
claimCheck.OrderId, claimCheck.BlobUri);
// Retrieve payload from Blob Storage
var payload = await RetrievePayloadAsync(claimCheck.BlobUri);
// Process the actual payload
var order = JsonSerializer.Deserialize<Order>(payload);
await ProcessOrderAsync(order);
// Complete the message
await args.CompleteMessageAsync(message);
// Optional: Clean up blob after processing
// await DeleteBlobAsync(claimCheck.BlobUri);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to process claim check message");
await args.AbandonMessageAsync(message);
}
};
await _processor.StartProcessingAsync();
}
private async Task<string> RetrievePayloadAsync(string blobUri)
{
var blobClient = new BlobClient(new Uri(blobUri));
var response = await blobClient.DownloadContentAsync();
return response.Value.Content.ToString();
}
private async Task ProcessOrderAsync(Order order)
{
// Your order processing logic
_logger.LogInformation("Processing order {OrderId} with total: {Total}",
order.OrderId, order.TotalAmount);
await Task.Delay(100); // Simulate processing
}
}
Advanced Implementation
With Automatic Cleanup
public class ClaimCheckPublisherWithCleanup
{
private readonly BlobServiceClient _blobClient;
private readonly ServiceBusSender _sender;
private readonly TableServiceClient _tableClient;
public async Task PublishWithTrackingAsync(Order order)
{
var orderJson = JsonSerializer.Serialize(order);
var blobUri = await UploadToBlobAsync(order);
// Create claim check
var claimCheck = new ClaimCheckMessage
{
BlobUri = blobUri,
OrderId = order.OrderId,
ContentType = "application/json",
ContentLength = Encoding.UTF8.GetByteCount(orderJson)
};
// Track in Table Storage for cleanup
await TrackClaimCheckAsync(claimCheck);
// Send message
await _sender.SendMessageAsync(CreateMessage(claimCheck));
}
private async Task<string> UploadToBlobAsync(Order order)
{
var containerClient = _blobClient.GetBlobContainerClient("orders");
var blobClient = containerClient.GetBlobClient($"orders/{order.OrderId}.json");
await blobClient.UploadAsync(new MemoryStream(Encoding.UTF8.GetBytes(
JsonSerializer.Serialize(order))));
return blobClient.Uri.ToString();
}
private async Task TrackClaimCheckAsync(ClaimCheckMessage claimCheck)
{
var tableClient = _tableClient.GetTableClient("ClaimCheckTracking");
await tableClient.UpsertEntityAsync(new TableEntity
{
PartitionKey = "ClaimCheck",
RowKey = claimCheck.OrderId,
["BlobUri"] = claimCheck.BlobUri,
["CreatedAt"] = DateTime.UtcNow,
["Processed"] = false
});
}
public async Task MarkAsProcessedAsync(string orderId)
{
var tableClient = _tableClient.GetTableClient("ClaimCheckTracking");
var entity = await tableClient.GetEntityAsync<TableEntity>("ClaimCheck", orderId);
entity.Value["Processed"] = true;
entity.Value["ProcessedAt"] = DateTime.UtcNow;
await tableClient.UpsertEntityAsync(entity);
}
public async Task CleanupOldBlobsAsync(int retentionDays = 7)
{
var tableClient = _tableClient.GetTableClient("ClaimCheckTracking");
var cutoff = DateTime.UtcNow.AddDays(-retentionDays);
var query = $"PartitionKey eq 'ClaimCheck' and Processed eq true and ProcessedAt lt '{cutoff:yyyy-MM-dd}'";
await foreach (var entity in tableClient.QueryAsync<TableEntity>(query))
{
try
{
var blobClient = new BlobClient(new Uri(entity.GetString("BlobUri")));
await blobClient.DeleteIfExistsAsync();
await tableClient.DeleteEntityAsync(entity.PartitionKey, entity.RowKey);
}
catch (Exception)
{
// Log and continue
}
}
}
}
With Encryption
public class EncryptedClaimCheckPublisher
{
private readonly BlobServiceClient _blobClient;
private readonly KeyClient _keyClient;
private readonly ServiceBusSender _sender;
public async Task PublishEncryptedAsync(Order order)
{
// Encrypt the payload
var orderJson = JsonSerializer.Serialize(order);
var encryptedPayload = await EncryptAsync(orderJson);
// Upload to blob
var blobUri = await UploadEncryptedBlobAsync(encryptedPayload);
// Generate decryption key reference
var keyId = await GenerateAndStoreKeyAsync(encryptedPayload.Key);
// Create claim check with key reference
var claimCheck = new ClaimCheckMessage
{
BlobUri = blobUri,
KeyId = keyId,
ContentType = order.ContentType,
ContentLength = encryptedPayload.Ciphertext.Length,
OrderId = order.OrderId
};
// Send claim check
await _sender.SendMessageAsync(CreateMessage(claimCheck));
}
private async Task<EncryptedPayload> EncryptAsync(string plaintext)
{
var key = _keyClient.GetKey("my-key");
var encryptor = new Encryptor(key);
return encryptor.Encrypt(plaintext);
}
private async Task<string> UploadEncryptedBlobAsync(EncryptedPayload payload)
{
var containerClient = _blobClient.GetBlobContainerClient("encrypted-orders");
var blobClient = containerClient.GetBlobClient($"{payload.KeyId}.enc");
using var stream = new MemoryStream(payload.Ciphertext);
await blobClient.UploadAsync(stream);
return blobClient.Uri.ToString();
}
}
Benefits and Tradeoffs
Benefits
| Benefit | Description |
|---|---|
| Bypass size limits | Handle messages of any size |
| Cost efficiency | Blob storage cheaper than premium SB |
| Faster processing | Smaller SB messages = faster throughput |
| Retention | Leverage Blob lifecycle policies |
| Governance | Separate retention for messages vs payloads |
Tradeoffs
| Tradeoff | Consideration |
|---|---|
| Extra latency | Blob retrieval adds ~100-500ms |
| Two-step process | More complex than direct messaging |
| Additional cost | Blob storage + transactions |
| Failure handling | Must handle both SB and blob failures |
| Idempotency | Must handle blob retrieval idempotently |
Best Practices
| Practice | Description |
|---|---|
| Use managed identity | Avoid storing storage keys |
| Implement cleanup | Delete blobs after processing |
| Track references | Store mappings in Table/DB |
| Handle large files | Stream instead of loading into memory |
| Add retries | Blob operations can fail transiently |
| Set proper TTL | Clean up blobs after retention period |
Use Cases
- Document processing — Large PDFs/images through messaging
- Event sourcing — Large event payloads
- Data export/import — Export large datasets via queues
- Media processing — Video/audio file processing pipelines
Azure Integration Hub - Advanced Level