Azure Service Bus — Transactional Messaging

Atomic Send + Complete in a Single Transaction


Introduction

Transaction support in Azure Service Bus enables atomic operations across multiple messaging entities. This is critical for scenarios where you need to ensure that multiple operations succeed or fail together, maintaining data consistency across your system.

Key capabilities include:

  • Send multiple messages atomically — All messages are sent or none are
  • Send + complete in one transaction — Process and forward in a single atomic operation
  • Cross-entity transactions — Messages to different queues/topics within the same namespace
  • No partial outcomes — Either every operation in the transaction scope takes effect or none does

Understanding Service Bus Transactions

Transaction Scope

A transaction scope wraps multiple operations that must succeed or fail together:

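// TransactionScope (System.Transactions) is IDisposable, not IAsyncDisposable,
// so it takes a plain `using`. TransactionScopeAsyncFlowOption.Enabled makes
// the ambient transaction flow across awaits.
using var scope = new TransactionScope(
    TransactionScopeAsyncFlowOption.Enabled);

// All operations here are atomic
await sender1.SendMessageAsync(message1);
await sender2.SendMessageAsync(message2);

scope.Complete();  // Vote to commit; disposing without this call rolls back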
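
The commit and rollback behaviour of a scope can be observed without a Service Bus namespace. The sketch below uses only System.Transactions; InMemoryOutbox and ScopeDemo are hypothetical stand-ins written for this illustration, not part of any SDK. The outbox buffers messages and releases them only when the ambient transaction commits, which is roughly how a transactional sender behaves from the caller's point of view.

```csharp
using System;
using System.Collections.Generic;
using System.Transactions;

// Hypothetical in-memory stand-in for a transactional sender: messages are
// buffered and only become visible if the ambient transaction commits.
public sealed class InMemoryOutbox : IEnlistmentNotification
{
    private readonly List<string> _pending = new List<string>();
    private bool _enlisted;

    public List<string> Delivered { get; } = new List<string>();

    public void Send(string message)
    {
        Transaction ambient = Transaction.Current;
        if (ambient == null)
        {
            Delivered.Add(message);   // no transaction: deliver immediately
            return;
        }
        if (!_enlisted)
        {
            // Ask the transaction to notify us of its outcome.
            ambient.EnlistVolatile(this, EnlistmentOptions.None);
            _enlisted = true;
        }
        _pending.Add(message);
    }

    public void Prepare(PreparingEnlistment enlistment) => enlistment.Prepared();

    public void Commit(Enlistment enlistment)
    {
        Delivered.AddRange(_pending);  // release everything at once
        Reset();
        enlistment.Done();
    }

    public void Rollback(Enlistment enlistment) { Reset(); enlistment.Done(); }

    public void InDoubt(Enlistment enlistment) { Reset(); enlistment.Done(); }

    private void Reset() { _pending.Clear(); _enlisted = false; }
}

public static class ScopeDemo
{
    public static List<string> Run()
    {
        var outbox = new InMemoryOutbox();

        using (var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
        {
            outbox.Send("order");
            outbox.Send("audit");
            scope.Complete();   // vote to commit
        }                       // Dispose() performs the commit

        using (var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
        {
            outbox.Send("notification");
            // Complete() is never called, so Dispose() rolls back
        }

        return outbox.Delivered;
    }
}
```

After ScopeDemo.Run(), the returned list should hold "order" and "audit" but not "notification", because the second scope was disposed without Complete().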

Requirements

  1. Same namespace — All entities must be in the same Service Bus namespace
  2. Standard or Premium tier — The Basic tier does not support transactions
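  3. One client, opted in — To span more than one entity, create the ServiceBusClient with EnableCrossEntityTransactions = true and use that same client for every sender and receiver in the transaction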
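
In the Azure.Messaging.ServiceBus SDK, a transaction that touches more than one entity needs an explicit opt-in on the client. A minimal sketch, assuming the Azure.Messaging.ServiceBus package is referenced; TransactionalClientFactory is a hypothetical helper name, and the connection string is supplied by the caller:

```csharp
using Azure.Messaging.ServiceBus;

public static class TransactionalClientFactory
{
    // Hypothetical helper: returns a client whose senders and receivers
    // may take part in one transaction across several entities.
    public static ServiceBusClient Create(string connectionString)
    {
        var options = new ServiceBusClientOptions
        {
            // Without this flag, a transaction is limited to a single entity.
            EnableCrossEntityTransactions = true
        };
        return new ServiceBusClient(connectionString, options);
    }
}
```

Every sender and receiver used inside the scope should come from this one client instance.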

Basic Transactional Pattern

Scenario: Order Processing

When an order is placed, you need to:

  1. Save order confirmation to a log
  2. Send to order processing queue
  3. Send notification to customer

All three must succeed together:

public class OrderService
{
    private readonly ServiceBusClient _client;
    private readonly ILogger<OrderService> _logger;

    // The client must be created with
    // ServiceBusClientOptions.EnableCrossEntityTransactions = true,
    // because the transaction below spans three entities.
    public OrderService(ServiceBusClient client, ILogger<OrderService> logger)
    {
        _client = client;
        _logger = logger;
    }

    public async Task ProcessOrderTransactionallyAsync(Order order)
    {
        // Senders for order processing, customer notifications and audit/logging
        // (long-lived services would normally create these once and reuse them)
        await using var orderSender = _client.CreateSender("order-processing");
        await using var notificationSender = _client.CreateSender("customer-notifications");
        await using var auditSender = _client.CreateSender("audit-log");

        // 1. Create order message
        var orderMessage = new ServiceBusMessage(
            Encoding.UTF8.GetBytes(JsonSerializer.Serialize(order)))
        {
            ContentType = "application/json",
            Subject = "NewOrder",
            MessageId = order.OrderId,
            ApplicationProperties =
            {
                ["OrderId"] = order.OrderId,
                ["CustomerId"] = order.CustomerId,
                ["Timestamp"] = DateTime.UtcNow.ToString("O")
            }
        };

        // 2. Create notification message
        var notificationMessage = new ServiceBusMessage(
            Encoding.UTF8.GetBytes(JsonSerializer.Serialize(new
            {
                CustomerId = order.CustomerId,
                Email = order.CustomerEmail,
                Subject = "Order Confirmed",
                Body = $"Your order {order.OrderId} has been confirmed."
            })))
        {
            ContentType = "application/json",
            Subject = "OrderNotification"
        };

        // 3. Create audit message
        var auditMessage = new ServiceBusMessage(
            Encoding.UTF8.GetBytes(JsonSerializer.Serialize(new
            {
                EventType = "OrderCreated",
                OrderId = order.OrderId,
                CustomerId = order.CustomerId,
                TotalAmount = order.TotalAmount,
                Timestamp = DateTime.UtcNow
            })))
        {
            ContentType = "application/json",
            Subject = "AuditEvent"
        };

        // Open the scope only once the messages are built, to keep it short
        using (var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
        {
            // Send all three messages atomically.
            // If ANY send fails, the scope is disposed without Complete()
            // and ALL are rolled back. The sends run one after another so
            // the first entity used in the transaction is deterministic.
            await orderSender.SendMessageAsync(orderMessage);
            await notificationSender.SendMessageAsync(notificationMessage);
            await auditSender.SendMessageAsync(auditMessage);

            // Vote to commit; the commit itself happens when the scope is disposed
            scope.Complete();
        }

        // Only reaches here if all sends succeeded and the commit went through
        _logger.LogInformation("Order {OrderId} processed transactionally", order.OrderId);
    }
}

Send + Complete Pattern

This pattern processes a message and forwards it to another queue in a single transaction:

public async Task ProcessAndForwardAsync(
    ServiceBusReceivedMessage message,
    ServiceBusReceiver receiver)
{
    // 1. Process the message (e.g., transform) before opening the
    //    transaction, so the scope stays short
    var processedData = await ProcessMessageAsync(message);

    // 2. Create forward message
    var forwardMessage = new ServiceBusMessage(
        Encoding.UTF8.GetBytes(JsonSerializer.Serialize(processedData)))
    {
        ContentType = "application/json",
        ApplicationProperties =
        {
            ["OriginalMessageId"] = message.MessageId,
            ["ProcessedAt"] = DateTime.UtcNow.ToString("O")
        }
    };

    // _client must have EnableCrossEntityTransactions = true and must be
    // the same client that created `receiver`
    var forwardSender = _client.CreateSender("processed-messages");

    using var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled);

    // 3. Forward to another queue
    await forwardSender.SendMessageAsync(forwardMessage);

    // 4. Complete the original message through the receiver that delivered it
    await receiver.CompleteMessageAsync(message);

    // Vote to commit both operations; disposing the scope performs the commit
    scope.Complete();
}
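
When messages arrive through a ServiceBusProcessor rather than a ServiceBusReceiver, settlement goes through the ProcessMessageEventArgs passed to the handler. The following is a sketch under stated assumptions: ForwardingWorker is a hypothetical class, the queue names are placeholders, and the client is assumed to have been created with EnableCrossEntityTransactions = true. It needs a live namespace to run.

```csharp
using System;
using System.Threading.Tasks;
using System.Transactions;
using Azure.Messaging.ServiceBus;

public sealed class ForwardingWorker
{
    private readonly ServiceBusProcessor _processor;
    private readonly ServiceBusSender _sender;

    // Assumes `client` was created with EnableCrossEntityTransactions = true.
    public ForwardingWorker(ServiceBusClient client)
    {
        _processor = client.CreateProcessor("input-queue", new ServiceBusProcessorOptions
        {
            // The handler completes the message itself, inside the transaction.
            AutoCompleteMessages = false
        });
        _sender = client.CreateSender("processed-messages");

        _processor.ProcessMessageAsync += OnMessageAsync;
        _processor.ProcessErrorAsync += OnErrorAsync;
    }

    public Task StartAsync() => _processor.StartProcessingAsync();

    private async Task OnMessageAsync(ProcessMessageEventArgs args)
    {
        // Build the outgoing message before opening the scope.
        var forward = new ServiceBusMessage(args.Message.Body)
        {
            ContentType = args.Message.ContentType
        };
        forward.ApplicationProperties["OriginalMessageId"] = args.Message.MessageId;

        using (var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
        {
            await args.CompleteMessageAsync(args.Message);
            await _sender.SendMessageAsync(forward);
            scope.Complete();   // vote to commit; Dispose() commits
        }
    }

    private Task OnErrorAsync(ProcessErrorEventArgs args)
    {
        Console.Error.WriteLine($"{args.ErrorSource}: {args.Exception.Message}");
        return Task.CompletedTask;
    }
}
```

If the send throws, the scope is disposed without Complete(), the completion is rolled back, and the input message becomes available again once its lock expires.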

Processing with Transaction

Receive, Process, and Send

// Assumes the 5.x Service Bus extension for Azure Functions, with
// enableCrossEntityTransactions set to true in the serviceBus section of host.json.
[FunctionName("TransactionalProcessor")]
public static async Task TransactionalProcessor(
    [ServiceBusTrigger("input-queue", Connection = "ServiceBusConnection",
        AutoCompleteMessages = false)]
    ServiceBusReceivedMessage message,
    ServiceBusMessageActions messageActions,
    ServiceBusClient client,
    ILogger logger)
{
    logger.LogInformation("Processing message {MessageId}", message.MessageId);

    // Deserialize and process
    var order = JsonSerializer.Deserialize<Order>(message.Body.ToString());

    // Validate order before opening the transaction
    if (!ValidateOrder(order))
    {
        logger.LogWarning("Order {OrderId} validation failed", order?.OrderId);
        await messageActions.AbandonMessageAsync(message);
        return;
    }

    // Add processing metadata
    order.ProcessedAt = DateTime.UtcNow;
    order.ProcessorVersion = "2.0";

    // Create enriched message for downstream
    var enrichedMessage = new ServiceBusMessage(
        Encoding.UTF8.GetBytes(JsonSerializer.Serialize(order)))
    {
        ContentType = "application/json",
        Subject = "ProcessedOrder"
    };

    var sender = client.CreateSender("output-queue");

    using (var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
    {
        // Send to output queue
        await sender.SendMessageAsync(enrichedMessage);

        // Complete the input message in the same transaction
        await messageActions.CompleteMessageAsync(message);

        scope.Complete();
    }

    logger.LogInformation("Successfully processed message {MessageId}", message.MessageId);
}

private static bool ValidateOrder(Order order)
{
    // Validation logic
    return order != null && 
           !string.IsNullOrEmpty(order.OrderId) &&
           order.TotalAmount > 0;
}

Cross-Entity Transactions

Send to Multiple Queues and Topics

public async Task PublishToMultipleEntitiesAsync(Order order)
{
    // All entities must be in the same namespace, and _client must have been
    // created with EnableCrossEntityTransactions = true
    var processingSender = _client.CreateSender("order-processing");
    var analyticsTopicSender = _client.CreateSender("analytics-topic");
    var auditSender = _client.CreateSender("audit-log");

    using var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled);

    // 1. Send to main processing queue
    await processingSender.SendMessageAsync(CreateOrderMessage(order));

    // 2. Send to analytics topic
    await analyticsTopicSender.SendMessageAsync(CreateAnalyticsMessage(order));

    // 3. Send to audit queue
    await auditSender.SendMessageAsync(CreateAuditMessage(order));

    // Vote to commit all three sends; disposing the scope performs the commit
    scope.Complete();
}

private static ServiceBusMessage CreateOrderMessage(Order order)
{
    return new ServiceBusMessage(Encoding.UTF8.GetBytes(JsonSerializer.Serialize(order)))
    {
        ContentType = "application/json",
        Subject = "Order",
        MessageId = order.OrderId
    };
}

private static ServiceBusMessage CreateAnalyticsMessage(Order order)
{
    return new ServiceBusMessage(Encoding.UTF8.GetBytes(JsonSerializer.Serialize(new
    {
        EventType = "OrderCreated",
        OrderId = order.OrderId,
        Amount = order.TotalAmount,
        CustomerId = order.CustomerId,
        Timestamp = DateTime.UtcNow
    })))
    {
        ContentType = "application/json",
        Subject = "AnalyticsEvent"
    };
}

private static ServiceBusMessage CreateAuditMessage(Order order)
{
    return new ServiceBusMessage(Encoding.UTF8.GetBytes(JsonSerializer.Serialize(new
    {
        Timestamp = DateTime.UtcNow,
        Action = "CREATE",
        EntityType = "Order",
        EntityId = order.OrderId,
        User = "System"
    })))
    {
        ContentType = "application/json",
        Subject = "Audit"
    };
}

Error Handling and Rollback

Automatic Rollback on Failure

public async Task ProcessWithRollbackAsync(Order order)
{
    // Step 1: Validate before opening the scope
    if (!ValidateOrder(order))
    {
        logger.LogWarning("Order validation failed - not starting transaction");
        return; // No transaction started
    }

    try
    {
        using (var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
        {
            // Step 2: Reserve inventory (sends to inventory queue)
            await inventorySender.SendMessageAsync(CreateInventoryReservation(order));

            // Step 3: Process payment (sends to payment queue)
            await paymentSender.SendMessageAsync(CreatePaymentRequest(order));

            // Vote to commit; the commit itself happens when the scope is disposed
            scope.Complete();
        }

        // Reached only after the scope was disposed and the commit succeeded
        logger.LogInformation("Transaction committed for order {OrderId}", order.OrderId);
    }
    catch (Exception ex)
    {
        // The scope was disposed without a successful commit,
        // so both sends have been rolled back
        logger.LogError(ex, "Transaction failed for order {OrderId}", order.OrderId);

        // Additional handling: dead letter, alerts, etc.
        await HandleTransactionFailureAsync(order, ex);
    }
}

Manual Compensation

public async Task ProcessWithCompensationAsync(Order order)
{
    var completedSteps = new List<string>();

    try
    {
        // The scope is declared inside the try block so that it is disposed
        // (and rolled back) before the catch block runs. Otherwise any
        // Service Bus message sent during compensation would enlist in the
        // failed transaction and be rolled back with it.
        using var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled);

        // Step 1: Reserve inventory
        var inventoryResult = await ReserveInventoryAsync(order);
        if (!inventoryResult.Success)
            throw new InvalidOperationException("Inventory reservation failed");
        completedSteps.Add("inventory");

        // Step 2: Process payment
        var paymentResult = await ProcessPaymentAsync(order);
        if (!paymentResult.Success)
            throw new InvalidOperationException("Payment failed");
        completedSteps.Add("payment");

        // Step 3: Create order
        await CreateOrderAsync(order);
        completedSteps.Add("order");

        // All steps succeeded - vote to commit
        scope.Complete();
    }
    catch (Exception ex)
    {
        // Rollback only undoes the Service Bus operations. Side effects the
        // steps caused in other systems still need explicit compensation.
        await CompensateAsync(completedSteps, order, ex);

        logger.LogError(ex, "Transaction failed, compensation applied for order {OrderId}",
            order.OrderId);
    }
}

private async Task CompensateAsync(List<string> completedSteps, Order order, Exception ex)
{
    // Reverse operations in reverse order
    foreach (var step in completedSteps.AsEnumerable().Reverse())
    {
        try
        {
            switch (step)
            {
                case "order":
                    await CancelOrderAsync(order);
                    break;
                case "payment":
                    await RefundPaymentAsync(order);
                    break;
                case "inventory":
                    await ReleaseInventoryReservationAsync(order);
                    break;
            }
        }
        catch (Exception compensationEx)
        {
            logger.LogError(compensationEx, "Failed to compensate {Step} for order {OrderId}", 
                step, order.OrderId);
            // Log for manual intervention
            await AlertAsync($"Compensation failed: {step}, Order: {order.OrderId}", ex);
        }
    }
}

Performance Considerations

Transaction Scope Overhead

Operation                      Latency Impact
Single send                    Baseline
2 messages in transaction      +2-5ms
5 messages in transaction      +5-15ms
10 messages in transaction     +10-30ms

Best Practices

Practice                    Description
Batch related messages      Group related operations in single transaction
Keep transactions short     Don't hold transactions open while waiting for external calls
Avoid cross-namespace       Not supported - use separate transactions
Consider partitioning       Balance transaction scope with parallelism
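
One way to enforce "keep transactions short" is to give the scope an explicit timeout. The helper below is a sketch that uses only System.Transactions; ShortTransaction is a hypothetical name, and the right timeout value depends on the workload. If the timeout elapses before the scope is disposed, the transaction is aborted and disposing the scope throws TransactionAbortedException.

```csharp
using System;
using System.Threading.Tasks;
using System.Transactions;

public static class ShortTransaction
{
    // Runs `work` inside a scope that aborts if it stays open past `timeout`.
    public static async Task RunAsync(Func<Task> work, TimeSpan timeout)
    {
        using var scope = new TransactionScope(
            TransactionScopeOption.Required,
            timeout,
            TransactionScopeAsyncFlowOption.Enabled);  // flow across awaits

        await work();       // e.g. the sends and completes of one unit of work
        scope.Complete();   // vote to commit; Dispose() commits
    }
}
```

A caller would do any slow work (HTTP calls, database lookups) first and pass only the Service Bus operations as `work`, for example `await ShortTransaction.RunAsync(() => sender.SendMessageAsync(msg), TimeSpan.FromSeconds(10));` where `sender` and `msg` are the caller's own objects.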

Comparing Tiers

Feature              Basic     Standard     Premium
Transactions         No        Yes          Yes
Cross-entity         No        Yes          Yes
Max message size     256KB     256KB        100MB
Queue size           1GB       80GB         100GB

Use Cases

Use Case               Why Transactions
Order processing       Order + Inventory + Payment must succeed together
Data replication       Source + Destination + Audit must match
Event sourcing         Event + Snapshot + Projection update
Microservices saga     Multiple service updates in single operation

Configuration

Create a Namespace (Premium)

# Transactions need a Standard or Premium namespace; Basic does not support them
# Create a Premium namespace
az servicebus namespace create \
  --name my-namespace \
  --resource-group my-rg \
  --location eastus \
  --sku Premium \
  --capacity 1

Connection String

Endpoint=sb://my-namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxx

Best Practices Summary

  1. Use Standard or Premium tier — The Basic tier does not support transactions
  2. Same namespace — All entities in same namespace
  3. Keep it simple — Don't over-engineer transaction scope
  4. Handle failures — Plan for rollback scenarios
  5. Monitor latency — Transactions add overhead

Common Pitfalls

  • ❌ Creating transaction scope inside a message handler loop
  • ❌ Not calling scope.Complete() - causes rollback
  • ❌ Using a Basic tier namespace - Standard or Premium required
  • ❌ Long-running transactions - hold locks
  • ❌ Forgetting to dispose scope
