Service Bus Sessions & Ordered Processing

FIFO Message Processing with Session IDs


Introduction

Azure Service Bus sessions group related messages under a shared session ID and deliver each group to one receiver at a time, in first-in, first-out (FIFO) order. When messages for a specific entity (such as an order, customer, or account) must be processed sequentially, sessions are the built-in way to guarantee it.

This guide covers:

  • Session fundamentals — How sessions enable ordered processing
  • Configuration — Enabling sessions on queues and subscriptions
  • Implementation — Sending and receiving session messages
  • Session management — Lock behavior and completion
  • Real-world patterns — Common use cases and implementations

Understanding Sessions

How Sessions Work

┌─────────────────────────────────────────────────────────────────────┐
│                    SERVICE BUS SESSIONS                             │
├─────────────────────────────────────────────────────────────────────┤
│                                                                      │
│   SESSION ID: "customer-001"                                        │
│   ┌─────────────────────────────────────────────────────────────┐   │
│   │ Queue with Sessions                                         │   │
│   │                                                              │   │
│   │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐           │   │
│   │ │Msg 1   │ │Msg 2   │ │Msg 3   │ │Msg 4   │           │   │
│   │ │        │ │        │ │        │ │        │           │   │
│   │ │Session │ │Session │ │Session │ │Session │           │   │
│   │ │"cust-1"│ │"cust-1"│ │"cust-2"│ │"cust-1"│           │   │
│   │ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘           │   │
│   │      │          │          │          │                  │   │
│   │      └──────────┴──────────┴──────────┘                  │   │
│   │                         │                                  │   │
│   │                         ▼                                  │   │
│   │           ┌─────────────────────────────┐                 │   │
│   │           │   Consumer for Session      │                 │   │
│   │           │       "customer-001"        │                 │   │
│   │           │                             │                 │   │
│   │           │  Process in order:          │                 │   │
│   │           │  1. Msg 1 (Order Placed)    │                 │   │
│   │           │  2. Msg 2 (Payment)         │                 │   │
│   │           │  3. Msg 4 (Shipped)         │                 │   │
│   │           └─────────────────────────────┘                 │   │
│   │                                                              │   │
│   │   Another consumer handles "customer-002"                  │   │
│   └─────────────────────────────────────────────────────────────┘   │
│                                                                      │
└─────────────────────────────────────────────────────────────────────┘

Why Use Sessions?

Scenario               Without Sessions                             With Sessions
---------------------  -------------------------------------------  ------------------------------------
Order processing       Messages might be processed out of order     Orders processed in sequence
Inventory updates      Stock levels could become inconsistent       Sequential updates prevent conflicts
Banking transactions   Account balance could be incorrect           Transactions applied in order
User notifications     User might receive messages in wrong order   Notifications in proper sequence

Enable Sessions

Create Queue with Sessions

// Management operations use ServiceBusAdministrationClient
// (namespace Azure.Messaging.ServiceBus.Administration), not ServiceBusClient
var adminClient = new ServiceBusAdministrationClient(connectionString);

// Create queue with sessions enabled
var queueOptions = new CreateQueueOptions("orders-queue")
{
    RequiresSession = true,           // Enable sessions (cannot be changed after creation)
    LockDuration = TimeSpan.FromMinutes(1),
    MaxDeliveryCount = 10,
    DefaultMessageTimeToLive = TimeSpan.FromDays(7)
};

await adminClient.CreateQueueAsync(queueOptions);

Enable on Topic Subscription

// Topics and subscriptions are also created through the administration client
var adminClient = new ServiceBusAdministrationClient(connectionString);
await adminClient.CreateTopicAsync("orders-topic");

// Enable sessions on the subscription (topic and subscription names go in the constructor)
var subscriptionOptions = new CreateSubscriptionOptions("orders-topic", "processing-subscription")
{
    RequiresSession = true
};

await adminClient.CreateSubscriptionAsync(subscriptionOptions);

Azure Portal Configuration

Queue Settings:
├── Enable Sessions: ON
├── Lock Duration: 1 minute
├── Max Delivery Count: 10
└── Message TTL: 7 days

CLI Configuration

az servicebus queue create \
  --name orders-queue \
  --namespace-name mynamespace \
  --resource-group myrg \
  --enable-session true
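
Because the session requirement is fixed when a queue is created, it can help to check it at application startup. The following is a minimal sketch, assuming a connection string with Manage rights; the class and method names (`QueueSetup`, `EnsureSessionQueueAsync`) are hypothetical and not part of the SDK.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus.Administration;

public static class QueueSetup
{
    // Creates the queue with sessions enabled if it is missing, and fails
    // fast if an existing queue was created without sessions.
    public static async Task EnsureSessionQueueAsync(string connectionString, string queueName)
    {
        var adminClient = new ServiceBusAdministrationClient(connectionString);

        bool exists = await adminClient.QueueExistsAsync(queueName);
        if (!exists)
        {
            await adminClient.CreateQueueAsync(new CreateQueueOptions(queueName)
            {
                RequiresSession = true
            });
            return;
        }

        QueueProperties existing = await adminClient.GetQueueAsync(queueName);
        if (!existing.RequiresSession)
        {
            throw new InvalidOperationException(
                $"Queue '{queueName}' exists but was created without sessions.");
        }
    }
}
```

Calling `await QueueSetup.EnsureSessionQueueAsync(connectionString, "orders-queue")` before starting senders or processors surfaces a misconfigured queue early instead of at the first receive.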

Send Messages to Sessions

Basic Session Message

var sender = client.CreateSender("orders-queue");

// Messages with the same SessionId are delivered in order to the one receiver holding that session's lock
var message1 = new ServiceBusMessage("Order #12345 placed")
{
    SessionId = "customer-001"
};

var message2 = new ServiceBusMessage("Payment received for #12345")
{
    SessionId = "customer-001"
};

var message3 = new ServiceBusMessage("Order #12345 shipped")
{
    SessionId = "customer-001"
};

await sender.SendMessagesAsync(new[] { message1, message2, message3 });

Sending Multiple Sessions

// Each session ID gets processed independently
var messages = new[]
{
    new ServiceBusMessage("Order 1") { SessionId = "customer-001" },
    new ServiceBusMessage("Order 2") { SessionId = "customer-001" },
    new ServiceBusMessage("Order 3") { SessionId = "customer-002" },
    new ServiceBusMessage("Order 4") { SessionId = "customer-002" },
    new ServiceBusMessage("Order 5") { SessionId = "customer-003" }
};

await sender.SendMessagesAsync(messages);
// Result: 3 sessions; each is locked by one receiver at a time and its messages arrive in order
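
When the number of messages is not known in advance, a size-checked batch avoids exceeding the maximum send size. This is a sketch under the assumption that the caller already has a `ServiceBusSender`; `BatchSender` and `SendInBatchesAsync` are illustrative names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

public static class BatchSender
{
    // Sends messages in as few batches as possible while keeping their order.
    public static async Task SendInBatchesAsync(
        ServiceBusSender sender,
        IEnumerable<ServiceBusMessage> messages)
    {
        ServiceBusMessageBatch batch = await sender.CreateMessageBatchAsync();
        try
        {
            foreach (var message in messages)
            {
                if (batch.TryAddMessage(message)) continue;

                // The current batch is full: send it and start a new one.
                if (batch.Count > 0)
                {
                    await sender.SendMessagesAsync(batch);
                }
                batch.Dispose();
                batch = await sender.CreateMessageBatchAsync();

                if (!batch.TryAddMessage(message))
                {
                    throw new InvalidOperationException(
                        "A single message exceeds the maximum batch size.");
                }
            }

            if (batch.Count > 0)
            {
                await sender.SendMessagesAsync(batch);
            }
        }
        finally
        {
            batch.Dispose();
        }
    }
}
```

Each batch is awaited before the next one is built, so messages that share a SessionId are enqueued in the order they were supplied.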

Dynamic Session ID

// Use meaningful session IDs
public async Task SendOrderMessagesAsync(Order order)
{
    var sender = client.CreateSender("orders-queue");
    
    var messages = new[]
    {
        new ServiceBusMessage(JsonSerializer.Serialize(new OrderPlacedEvent
        {
            OrderId = order.OrderId,
            CustomerId = order.CustomerId,
            Total = order.Total
        }))
        {
            SessionId = order.OrderId,
            Subject = "OrderPlaced"
        },
        
        new ServiceBusMessage(JsonSerializer.Serialize(new PaymentReceivedEvent
        {
            OrderId = order.OrderId,
            Amount = order.Total,
            PaymentId = order.PaymentId
        }))
        {
            SessionId = order.OrderId,
            Subject = "PaymentReceived"
        },
        
        new ServiceBusMessage(JsonSerializer.Serialize(new OrderShippedEvent
        {
            OrderId = order.OrderId,
            TrackingNumber = order.TrackingNumber
        }))
        {
            SessionId = order.OrderId,
            Subject = "OrderShipped"
        }
    };
    
    await sender.SendMessagesAsync(messages);
}

Receive Messages from Sessions

Basic Session Processing

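// Session-enabled entities need a session processor, not a plain processor
var processor = client.CreateSessionProcessor(
    "orders-queue",
    new ServiceBusSessionProcessorOptions
    {
        MaxConcurrentSessions = 5,     // Process up to 5 sessions concurrently
        AutoCompleteMessages = false   // Messages are completed explicitly in the handler
    }
);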

processor.ProcessMessageAsync += async args =>
{
    var message = args.Message;
    var sessionId = message.SessionId;
    
    Console.WriteLine($"Session: {sessionId}");
    Console.WriteLine($"Message: {message.Body}");
    Console.WriteLine($"Subject: {message.Subject}");
    
    // Process message
    await ProcessOrderMessageAsync(message);
    
    // Complete the message
    await args.CompleteMessageAsync(message);
};

processor.ProcessErrorAsync += args =>
{
    Console.WriteLine($"Error: {args.Exception.Message}");
    return Task.CompletedTask;
};

await processor.StartProcessingAsync();
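
A session processor keeps running until it is stopped, so a host usually pairs the start call with a stop call. The sketch below assumes a fixed run time for simplicity; `ProcessorLifetime`, `RunForAsync`, and the 30-second idle timeout are illustrative choices.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

public static class ProcessorLifetime
{
    // Runs a session processor for a fixed time, then stops it cleanly.
    public static async Task RunForAsync(ServiceBusClient client, string queueName, TimeSpan runFor)
    {
        await using ServiceBusSessionProcessor processor = client.CreateSessionProcessor(
            queueName,
            new ServiceBusSessionProcessorOptions
            {
                MaxConcurrentSessions = 5,
                // Release a session after 30 seconds without new messages
                // so the processor can move on to another session.
                SessionIdleTimeout = TimeSpan.FromSeconds(30)
            });

        processor.ProcessMessageAsync += args =>
        {
            // AutoCompleteMessages defaults to true, so returning without
            // an exception completes the message.
            Console.WriteLine($"Session {args.SessionId}: {args.Message.Body}");
            return Task.CompletedTask;
        };

        processor.ProcessErrorAsync += args =>
        {
            Console.WriteLine($"Error in {args.EntityPath}: {args.Exception.Message}");
            return Task.CompletedTask;
        };

        await processor.StartProcessingAsync();
        await Task.Delay(runFor);

        // Stops receiving new messages and waits for running handlers to finish.
        await processor.StopProcessingAsync();
    }
}
```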

Accept Specific Session

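// Accept a specific session by ID; this acquires the exclusive session lock
ServiceBusSessionReceiver sessionReceiver = await client.AcceptSessionAsync(
    "orders-queue",
    "customer-001"
);

while (true)
{
    // Receive up to 10 messages for this session, waiting at most 5 seconds
    var messages = await sessionReceiver.ReceiveMessagesAsync(
        maxMessages: 10,
        maxWaitTime: TimeSpan.FromSeconds(5));

    // An empty batch means the session has no more messages right now
    if (messages.Count == 0) break;

    foreach (var message in messages)
    {
        Console.WriteLine($"Session {message.SessionId}: {message.Body}");
        await sessionReceiver.CompleteMessageAsync(message);
    }
}

// Release the session lock so another receiver can accept the session
await sessionReceiver.DisposeAsync();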
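
When the session ID is not known in advance, a receiver can instead ask the service for the next available session. The following is a sketch, assuming that "no session available" should be treated as a normal outcome; `NextSessionWorker` and `ProcessNextSessionAsync` are hypothetical names.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

public static class NextSessionWorker
{
    // Drains whichever session the service hands out next.
    // Returns false when no unlocked session with messages is available.
    public static async Task<bool> ProcessNextSessionAsync(ServiceBusClient client, string queueName)
    {
        ServiceBusSessionReceiver receiver;
        try
        {
            receiver = await client.AcceptNextSessionAsync(queueName);
        }
        catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.ServiceTimeout)
        {
            // No session became available within the client's timeout.
            return false;
        }

        await using (receiver)
        {
            while (true)
            {
                var messages = await receiver.ReceiveMessagesAsync(
                    maxMessages: 10,
                    maxWaitTime: TimeSpan.FromSeconds(5));
                if (messages.Count == 0) break;

                foreach (var message in messages)
                {
                    Console.WriteLine($"Session {receiver.SessionId}: {message.Body}");
                    await receiver.CompleteMessageAsync(message);
                }
            }
        }

        return true;
    }
}
```

A worker can call this in a loop and back off briefly whenever it returns false.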

Session-Aware Processing

public class OrderProcessingService
{
    // Session processors pass ProcessSessionMessageEventArgs to their handlers
    public async Task ProcessWithSessionAsync(ProcessSessionMessageEventArgs args)
    {
        var message = args.Message;
        var sessionId = message.SessionId;
        
        // Get or create session state
        var sessionState = await GetSessionStateAsync(sessionId);
        
        // Process based on message content
        switch (message.Subject)
        {
            case "OrderPlaced":
                await HandleOrderPlacedAsync(message, sessionState);
                break;
                
            case "PaymentReceived":
                await HandlePaymentReceivedAsync(message, sessionState);
                break;
                
            case "OrderShipped":
                await HandleOrderShippedAsync(message, sessionState);
                break;
        }
        
        // Update session state
        await UpdateSessionStateAsync(sessionId, sessionState);
        
        await args.CompleteMessageAsync(message);
    }
    
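    private Task<Dictionary<string, object>> GetSessionStateAsync(string sessionId)
    {
        // Placeholder: retrieve session state from a cache, database, etc.
        throw new NotImplementedException();
    }

    private Task UpdateSessionStateAsync(string sessionId, Dictionary<string, object> state)
    {
        // Placeholder: persist the updated state to the same store
        throw new NotImplementedException();
    }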
}
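
As an alternative to an external store, Service Bus can keep a small state blob per session on the broker, exposed through the handler arguments. This is a minimal sketch; the `OrderProgress` type and its fields are assumptions made for illustration.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

// Hypothetical state shape for this sketch
public class OrderProgress
{
    public string LastEvent { get; set; } = "";
    public int EventsSeen { get; set; }
}

public static class SessionStateHandler
{
    public static async Task HandleAsync(ProcessSessionMessageEventArgs args)
    {
        // The state is null until something has been stored for this session.
        BinaryData stored = await args.GetSessionStateAsync();
        OrderProgress progress = stored is null
            ? new OrderProgress()
            : stored.ToObjectFromJson<OrderProgress>();

        progress.LastEvent = args.Message.Subject ?? "";
        progress.EventsSeen++;

        // Store the updated state with the session, then settle the message.
        await args.SetSessionStateAsync(BinaryData.FromObjectAsJson(progress));
        await args.CompleteMessageAsync(args.Message);
    }
}
```

The state travels with the session, so whichever receiver accepts the session next can read what the previous one stored.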

Session Lock Behavior

How Session Lock Works

┌─────────────────────────────────────────────────────────────────────┐
│                      SESSION LOCK BEHAVIOR                          │
├─────────────────────────────────────────────────────────────────────┤
│                                                                      │
│   Session "customer-001"                                            │
│   ┌─────────────────────────────────────────────────────────────┐   │
│   │                                                              │   │
│   │  ┌─────────┐   ┌─────────┐   ┌─────────┐                  │   │
│   │  │ Msg 1   │ → │ Msg 2   │ → │ Msg 3   │                  │   │
│   │  │ Locked  │   │ Locked  │   │ Available│                 │   │
│   │  │ by      │   │ by      │   │ (next in │                 │   │
│   │  │Consumer │   │Consumer │   │ sequence) │                 │   │
│   │  │    A    │   │    A    │   │           │                 │   │
│   │  └─────────┘   └─────────┘   └───────────┘                 │   │
│   │                                                              │   │
│   │  Consumer A has exclusive access to this session           │   │
│   │  Other consumers cannot access any messages                │   │
│   │                                                              │   │
│   └─────────────────────────────────────────────────────────────┘   │
│                                                                      │
│   If Consumer A fails:                                              │
│   - Session lock expires (based on LockDuration)                   │
│   - Session becomes available to other consumers                  │
│   - Next consumer picks up where the previous one left off         │
│                                                                      │
└─────────────────────────────────────────────────────────────────────┘
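
With a manually accepted session, the receiver is responsible for keeping the lock alive during long work. The sketch below shows one way to renew it on demand; `SessionLockHelper` and the `margin` parameter are illustrative.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

public static class SessionLockHelper
{
    // Renews the session lock when less than `margin` remains before it expires.
    public static async Task RenewIfExpiringAsync(ServiceBusSessionReceiver receiver, TimeSpan margin)
    {
        TimeSpan remaining = receiver.SessionLockedUntil - DateTimeOffset.UtcNow;
        if (remaining < margin)
        {
            await receiver.RenewSessionLockAsync();
            Console.WriteLine(
                $"Session {receiver.SessionId} locked until {receiver.SessionLockedUntil:O}");
        }
    }
}
```

For example, calling `RenewIfExpiringAsync(receiver, TimeSpan.FromSeconds(15))` between processing steps keeps the session from being handed to another consumer mid-work.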

Lock Duration

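var queueOptions = new CreateQueueOptions("orders-queue")
{
    RequiresSession = true,
    LockDuration = TimeSpan.FromMinutes(2)  // Default: 1 minute; maximum: 5 minutes
};

// Automatic lock renewal is a client-side setting on the processor, not the queue
var processorOptions = new ServiceBusSessionProcessorOptions
{
    MaxAutoLockRenewalDuration = TimeSpan.FromMinutes(5)  // Keep renewing the session lock for up to 5 minutes
};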

Use Cases and Examples

Order Processing Pipeline

// Publisher - Send order lifecycle events
public async Task PublishOrderLifecycleAsync(Order order)
{
    var sender = client.CreateSender("orders-topic");
    
    // All events for same order use order ID as session
    var events = new[]
    {
        CreateMessage("OrderPlaced", order, order.OrderId),
        CreateMessage("PaymentProcessed", order, order.OrderId),
        CreateMessage("InventoryReserved", order, order.OrderId),
        CreateMessage("OrderShipped", order, order.OrderId),
        CreateMessage("OrderDelivered", order, order.OrderId)
    };
    
    await sender.SendMessagesAsync(events);
}

private ServiceBusMessage CreateMessage(string eventType, Order order, string sessionId)
{
    return new ServiceBusMessage(JsonSerializer.Serialize(order))
    {
        Subject = eventType,
        SessionId = sessionId,
        MessageId = Guid.NewGuid().ToString()
    };
}

// Consumer - Process in order
public async Task ProcessOrderEventsAsync(ProcessSessionMessageEventArgs args)
{
    var message = args.Message;
    var orderId = message.SessionId;
    
    // Each order's events processed sequentially
    // Different orders can be processed in parallel
    
    switch (message.Subject)
    {
        case "OrderPlaced":
            await OnOrderPlacedAsync(message);
            break;
        case "PaymentProcessed":
            await OnPaymentProcessedAsync(message);
            break;
        case "InventoryReserved":
            await OnInventoryReservedAsync(message);
            break;
        case "OrderShipped":
            await OnOrderShippedAsync(message);
            break;
        case "OrderDelivered":
            await OnOrderDeliveredAsync(message);
            break;
    }
    
    await args.CompleteMessageAsync(message);
}

User Notification Queue

// Send user notifications to session
public async Task SendUserNotificationsAsync(string userId, IEnumerable<Notification> notifications)
{
    var sender = client.CreateSender("user-notifications");
    
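    var messages = notifications.Select(n => new ServiceBusMessage(JsonSerializer.Serialize(n))
    {
        SessionId = userId,
        Subject = n.Type,
        // ServiceBusMessage has no Priority property; carry it as a custom
        // application property (the value must be a primitive such as int or string)
        ApplicationProperties = { ["Priority"] = n.Priority }
    });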
    
    await sender.SendMessagesAsync(messages);
}

// Process user notifications in order
public async Task ProcessUserNotificationsAsync(ProcessSessionMessageEventArgs args)
{
    var userId = args.Message.SessionId;
    
    // All notifications for same user processed sequentially
    await SendPushNotificationAsync(userId, args.Message.Body.ToString());
    
    await args.CompleteMessageAsync(args.Message);
}

Best Practices

Practice                        Description
------------------------------  ----------------------------------------------
Use meaningful session IDs      Customer ID, Order ID, Account ID
Set appropriate lock duration   Enough time for processing
Handle exceptions               Session becomes available on failure
Track session state             Store state externally if needed
Enable auto-renew               Prevent lock expiration during long processing
Monitor session backlog         Track pending sessions
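
For the "Handle exceptions" practice, one common approach is to separate failures worth retrying from failures that will never succeed. The following is a sketch under the assumption that a JSON parsing error is permanent; `SafeHandler`, the `process` delegate, and the "InvalidPayload" reason string are illustrative.

```csharp
using System;
using System.Text.Json;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

public static class SafeHandler
{
    // Wraps business logic so every message is explicitly settled.
    public static async Task HandleAsync(
        ProcessSessionMessageEventArgs args,
        Func<ServiceBusReceivedMessage, Task> process)
    {
        try
        {
            await process(args.Message);
            await args.CompleteMessageAsync(args.Message);
        }
        catch (JsonException ex)
        {
            // Assumed permanent: a malformed payload will not parse on retry,
            // so move it to the dead-letter queue instead of blocking the session.
            await args.DeadLetterMessageAsync(args.Message, "InvalidPayload", ex.Message);
        }
        catch (Exception)
        {
            // Assumed transient: release the message so it is delivered again.
            // Each redelivery counts toward the queue's MaxDeliveryCount.
            await args.AbandonMessageAsync(args.Message);
        }
    }
}
```

This handler expects the processor to be created with `AutoCompleteMessages = false`, since it settles each message itself.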

Monitoring

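# Confirm sessions are enabled and check the active message backlog
# (the queue resource does not expose a session count)
az servicebus queue show \
  --name orders-queue \
  --namespace-name mynamespace \
  --resource-group myrg \
  --query "{requiresSession: requiresSession, activeMessages: countDetails.activeMessageCount}"

# Same check for a subscription
az servicebus topic subscription show \
  --name processing-subscription \
  --topic-name orders-topic \
  --namespace-name mynamespace \
  --resource-group myrg \
  --query "{requiresSession: requiresSession, activeMessages: countDetails.activeMessageCount}"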
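
Message counts can also be read from code through the administration client, which is convenient for health checks. This is a minimal sketch; `BacklogMonitor` and `PrintBacklogAsync` are hypothetical names, and the connection string is assumed to have Manage rights.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus.Administration;

public static class BacklogMonitor
{
    // Prints the runtime message counts for a queue.
    public static async Task PrintBacklogAsync(string connectionString, string queueName)
    {
        var adminClient = new ServiceBusAdministrationClient(connectionString);
        QueueRuntimeProperties runtime =
            await adminClient.GetQueueRuntimePropertiesAsync(queueName);

        Console.WriteLine($"Active messages:        {runtime.ActiveMessageCount}");
        Console.WriteLine($"Dead-lettered messages: {runtime.DeadLetterMessageCount}");
        Console.WriteLine($"Scheduled messages:     {runtime.ScheduledMessageCount}");
    }
}
```

These are message counts for the whole queue, not per-session figures, so a growing active count indicates overall backlog rather than which session is behind.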

Azure Integration Hub - Intermediate Level