Azure Service Bus — Transactional Messaging
Atomic Send + Complete in a Single Transaction
Introduction
Transaction support in Azure Service Bus enables atomic operations across multiple messaging entities. This is critical for scenarios where you need to ensure that multiple operations succeed or fail together, maintaining data consistency across your system.
Key capabilities include:
- Send multiple messages atomically — All messages are sent or none are
- Send + complete in one transaction — Process and forward in a single atomic operation
- Cross-entity transactions — Messages to different queues/topics within the same namespace
- No message loss — Guaranteed delivery within the transaction scope
Understanding Service Bus Transactions
Transaction Scope
A transaction scope wraps multiple operations that must succeed or fail together:
await using var scope = new TransactionScope(
AsyncFlowOption.Enabled);
// All operations here are atomic
await sender1.SendAsync(message1);
await sender2.SendAsync(message2);
await scope.CompleteAsync(); // Commit or rollback
Requirements
- Same namespace — All entities must be in the same Service Bus namespace
- Premium tier — Transactions require Premium or higher tier
- No sessions — Transactions don't support message sessions
Basic Transactional Pattern
Scenario: Order Processing
When an order is placed, you need to:
- Save order confirmation to a log
- Send to order processing queue
- Send notification to customer
All three must succeed together:
public class OrderService
{
private readonly ServiceBusClient _client;
public async Task ProcessOrderTransactionallyAsync(Order order)
{
// Create a new transaction scope
await using var scope = new TransactionScope(
AsyncFlowOption.Enabled,
TransactionScopeAsyncFlowOption.Enabled);
// Sender for order processing
var orderSender = _client.CreateSender("order-processing");
// Sender for customer notifications
var notificationSender = _client.CreateSender("customer-notifications");
// Sender for audit/logging
var auditSender = _client.CreateSender("audit-log");
// 1. Create order message
var orderMessage = new ServiceBusMessage(
Encoding.UTF8.GetBytes(JsonSerializer.Serialize(order)))
{
ContentType = "application/json",
Subject = "NewOrder",
MessageId = order.OrderId,
Properties =
{
["OrderId"] = order.OrderId,
["CustomerId"] = order.CustomerId,
["Timestamp"] = DateTime.UtcNow.ToString("O")
}
};
// 2. Create notification message
var notificationMessage = new ServiceBusMessage(
Encoding.UTF8.GetBytes(JsonSerializer.Serialize(new
{
CustomerId = order.CustomerId,
Email = order.CustomerEmail,
Subject = "Order Confirmed",
Body = $"Your order {order.OrderId} has been confirmed."
})))
{
ContentType = "application/json",
Subject = "OrderNotification"
};
// 3. Create audit message
var auditMessage = new ServiceBusMessage(
Encoding.UTF8.GetBytes(JsonSerializer.Serialize(new
{
EventType = "OrderCreated",
OrderId = order.OrderId,
CustomerId = order.CustomerId,
TotalAmount = order.TotalAmount,
Timestamp = DateTime.UtcNow
})))
{
ContentType = "application/json",
Subject = "AuditEvent"
};
// Send all three messages atomically
// If ANY fails, ALL are rolled back
await Task.WhenAll(
orderSender.SendMessageAsync(orderMessage),
notificationSender.SendMessageAsync(notificationMessage),
auditSender.SendMessageAsync(auditMessage));
// Commit the transaction
await scope.CompleteAsync();
// Only reaches here if all sends succeeded
_logger.LogInformation("Order {OrderId} processed transactionally", order.OrderId);
}
}
Send + Complete Pattern
This pattern processes a message and forwards it to another queue in a single transaction:
public async Task ProcessAndForwardAsync(
ServiceBusReceivedMessage message,
ServiceBusMessageProcessor processor)
{
await using var scope = new TransactionScope(AsyncFlowOption.Enabled);
// 1. Process the message (e.g., transform)
var processedData = await ProcessMessageAsync(message);
// 2. Create forward message
var forwardMessage = new ServiceBusMessage(
Encoding.UTF8.GetBytes(JsonSerializer.Serialize(processedData)))
{
ContentType = "application/json",
Properties =
{
["OriginalMessageId"] = message.MessageId,
["ProcessedAt"] = DateTime.UtcNow.ToString("O")
}
};
// 3. Forward to another queue
var forwardSender = _client.CreateSender("processed-messages");
await forwardSender.SendMessageAsync(forwardMessage);
// 4. Complete the original message
await processor.CompleteMessageAsync(message);
// Commit both operations
await scope.CompleteAsync();
}
Processing with Transaction
Receive, Process, and Send
[FunctionName("TransactionalProcessor")]
public static async Task TransactionalProcessor(
[ServiceBusTrigger("input-queue", Connection = "ServiceBusConnection")]
ServiceBusReceivedMessage message,
ServiceBusClient client,
ILogger logger)
{
await using var scope = new TransactionScope(AsyncFlowOption.Enabled);
logger.LogInformation("Processing message {MessageId}", message.MessageId);
// Deserialize and process
var order = JsonSerializer.Deserialize<Order>(Encoding.UTF8.GetString(message.Body));
// Validate order
if (!ValidateOrder(order))
{
logger.LogWarning("Order {OrderId} validation failed", order.OrderId);
await message.AbandonAsync();
return;
}
// Add processing metadata
order.ProcessedAt = DateTime.UtcNow;
order.ProcessorVersion = "2.0";
// Create enriched message for downstream
var enrichedMessage = new ServiceBusMessage(
Encoding.UTF8.GetBytes(JsonSerializer.Serialize(order)))
{
ContentType = "application/json",
Subject = "ProcessedOrder"
};
// Send to output queue
var sender = client.CreateSender("output-queue");
await sender.SendMessageAsync(enrichedMessage);
// Complete the input message
await Task.CompletedTask;
await scope.CompleteAsync();
logger.LogInformation("Successfully processed message {MessageId}", message.MessageId);
}
private static bool ValidateOrder(Order order)
{
// Validation logic
return order != null &&
!string.IsNullOrEmpty(order.OrderId) &&
order.TotalAmount > 0;
}
Cross-Entity Transactions
Send to Multiple Queues and Topics
public async Task PublishToMultipleEntitiesAsync(Order order)
{
await using var scope = new TransactionScope(AsyncFlowOption.Enabled);
// All entities can be in the same or different namespaces (same subscription)
// 1. Send to main processing queue
var processingSender = _client.CreateSender("order-processing");
await processingSender.SendMessageAsync(CreateOrderMessage(order));
// 2. Send to analytics topic
var analyticsTopicSender = _client.CreateSender("analytics-topic");
await analyticsTopicSender.SendMessageAsync(CreateAnalyticsMessage(order));
// 3. Send to audit queue
var auditSender = _client.CreateSender("audit-log");
await auditSender.SendMessageAsync(CreateAuditMessage(order));
// Commit all
await scope.CompleteAsync();
}
private static ServiceBusMessage CreateOrderMessage(Order order)
{
return new ServiceBusMessage(Encoding.UTF8.GetBytes(JsonSerializer.Serialize(order)))
{
ContentType = "application/json",
Subject = "Order",
MessageId = order.OrderId
};
}
private static ServiceBusMessage CreateAnalyticsMessage(Order order)
{
return new ServiceBusMessage(Encoding.UTF8.GetBytes(JsonSerializer.Serialize(new
{
EventType = "OrderCreated",
OrderId = order.OrderId,
Amount = order.TotalAmount,
CustomerId = order.CustomerId,
Timestamp = DateTime.UtcNow
})))
{
ContentType = "application/json",
Subject = "AnalyticsEvent"
};
}
private static ServiceBusMessage CreateAuditMessage(Order order)
{
return new ServiceBusMessage(Encoding.UTF8.GetBytes(JsonSerializer.Serialize(new
{
Timestamp = DateTime.UtcNow,
Action = "CREATE",
EntityType = "Order",
EntityId = order.OrderId,
User = "System"
})))
{
ContentType = "application/json",
Subject = "Audit"
};
}
Error Handling and Rollback
Automatic Rollback on Failure
public async Task ProcessWithRollbackAsync(Order order)
{
try
{
await using var scope = new TransactionScope(AsyncFlowOption.Enabled);
// Step 1: Validate
if (!ValidateOrder(order))
{
logger.LogWarning("Order validation failed - not starting transaction");
return; // No transaction started
}
// Step 2: Reserve inventory (sends to inventory queue)
await inventorySender.SendMessageAsync(CreateInventoryReservation(order));
// Step 3: Process payment (sends to payment queue)
await paymentSender.SendMessageAsync(CreatePaymentRequest(order));
// If we get here, all operations succeeded
await scope.CompleteAsync();
logger.LogInformation("Transaction committed for order {OrderId}", order.OrderId);
}
catch (Exception ex)
{
// Transaction automatically rolled back when scope is disposed
// without calling CompleteAsync()
logger.LogError(ex, "Transaction failed for order {OrderId}", order.OrderId);
// Additional handling: dead letter, alerts, etc.
await HandleTransactionFailureAsync(order, ex);
}
}
Manual Compensation
public async Task ProcessWithCompensationAsync(Order order)
{
await using var scope = new TransactionScope(AsyncFlowOption.Enabled);
var completedSteps = new List<string>();
try
{
// Step 1: Reserve inventory
var inventoryResult = await ReserveInventoryAsync(order);
if (!inventoryResult.Success)
throw new InvalidOperationException("Inventory reservation failed");
completedSteps.Add("inventory");
// Step 2: Process payment
var paymentResult = await ProcessPaymentAsync(order);
if (!paymentResult.Success)
throw new InvalidOperationException("Payment failed");
completedSteps.Add("payment");
// Step 3: Create order
await CreateOrderAsync(order);
completedSteps.Add("order");
// All steps succeeded - commit
await scope.CompleteAsync();
}
catch (Exception ex)
{
// Compensate for completed steps
await CompensateAsync(completedSteps, order, ex);
// Don't call CompleteAsync() - transaction rolls back
logger.LogError(ex, "Transaction failed, compensation applied for order {OrderId}",
order.OrderId);
}
}
private async Task CompensateAsync(List<string> completedSteps, Order order, Exception ex)
{
// Reverse operations in reverse order
foreach (var step in completedSteps.AsEnumerable().Reverse())
{
try
{
switch (step)
{
case "order":
await CancelOrderAsync(order);
break;
case "payment":
await RefundPaymentAsync(order);
break;
case "inventory":
await ReleaseInventoryReservationAsync(order);
break;
}
}
catch (Exception compensationEx)
{
logger.LogError(compensationEx, "Failed to compensate {Step} for order {OrderId}",
step, order.OrderId);
// Log for manual intervention
await AlertAsync($"Compensation failed: {step}, Order: {order.OrderId}", ex);
}
}
}
Performance Considerations
Transaction Scope Overhead
| Operation | Latency Impact |
|---|---|
| Single send | Baseline |
| 2 messages in transaction | +2-5ms |
| 5 messages in transaction | +5-15ms |
| 10 messages in transaction | +10-30ms |
Best Practices
| Practice | Description |
|---|---|
| Batch related messages | Group related operations in single transaction |
| Keep transactions short | Don't hold transactions open while waiting for external calls |
| Avoid cross-namespace | Not supported - use separate transactions |
| Consider partitioning | Balance transaction scope with parallelism |
Comparing Tiers
| Feature | Basic | Standard | Premium |
|---|---|---|---|
| Transactions | ✗ | ✗ | ✓ |
| Cross-entity | ✗ | ✗ | ✓ |
| Max message size | 256KB | 256KB | 100MB |
| Queue size | 1GB | 80GB | 100GB |
Use Cases
| Use Case | Why Transactions |
|---|---|
| Order processing | Order + Inventory + Payment must succeed together |
| Data replication | Source + Destination + Audit must match |
| Event sourcing | Event + Snapshot + Projection update |
| Microservices saga | Multiple service updates in single operation |
Configuration
Enable Transactions (Premium)
# Premium namespace is required for transactions
# Create Premium namespace
az servicebus namespace create \
--name my-namespace \
--resource-group my-rg \
--location eastus \
--sku Premium \
--capacity 1
Connection String
Endpoint=sb://my-namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxx
Best Practices Summary
- Use Premium tier — Transactions require Premium
- Same namespace — All entities in same namespace
- Keep it simple — Don't over-engineer transaction scope
- Handle failures — Plan for rollback scenarios
- Monitor latency — Transactions add overhead
Common Pitfalls
- ❌ Creating transaction scope inside a message handler loop
- ❌ Not calling CompleteAsync() - causes rollback
- ❌ Mixing service tiers - Premium required
- ❌ Long-running transactions - hold locks
- ❌ Forgetting to dispose scope
Azure Integration Hub - Advanced Level