Service Bus Sessions & Ordered Processing
FIFO Message Processing with Session IDs
Introduction
Azure Service Bus sessions deliver related messages in first-in, first-out (FIFO) order. Every message that carries the same session ID belongs to one session, and only one receiver at a time can hold the lock on that session. When messages for a specific entity (such as an order, customer, or account) must be processed sequentially, sessions are the built-in way to do it.
This guide covers:
- Session fundamentals — How sessions enable ordered processing
- Configuration — Enabling sessions on queues and subscriptions
- Implementation — Sending and receiving session messages
- Session management — Lock behavior and completion
- Real-world patterns — Common use cases and implementations
Understanding Sessions
How Sessions Work
┌─────────────────────────────────────────────────────────────────────┐
│ SERVICE BUS SESSIONS │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ SESSION ID: "customer-001" │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ Queue with Sessions │ │
│ │ │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │Msg 1 │ │Msg 2 │ │Msg 3 │ │Msg 4 │ │ │
│ │ │ │ │ │ │ │ │ │ │ │
│ │ │Session │ │Session │ │Session │ │Session │ │ │
│ │ │"cust-1"│ │"cust-1"│ │"cust-2"│ │"cust-1"│ │ │
│ │ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ │ │
│ │ │ │ │ │ │ │
│ │ └──────────┴──────────┴──────────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌─────────────────────────────┐ │ │
│ │ │ Consumer for Session │ │ │
│ │ │ "customer-001" │ │ │
│ │ │ │ │ │
│ │ │ Process in order: │ │ │
│ │ │ 1. Msg 1 (Order Placed) │ │ │
│ │ │ 2. Msg 2 (Payment) │ │ │
│ │ │ 3. Msg 4 (Shipped) │ │ │
│ │ └─────────────────────────────┘ │ │
│ │ │ │
│ │ Another consumer handles "customer-002" │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
Why Use Sessions?
| Scenario | Without Sessions | With Sessions |
|---|---|---|
| Order processing | Messages might be processed out of order | Orders processed in sequence |
| Inventory updates | Stock levels could become inconsistent | Sequential updates prevent conflicts |
| Banking transactions | Account balance could be incorrect | Transactions applied in order |
| User notifications | User might receive messages in wrong order | Notifications in proper sequence |
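The ordering guarantee in the table can be illustrated without a Service Bus namespace. The sketch below is a plain in-memory model, not the Service Bus API: it groups an interleaved stream by session ID and shows that each group keeps its arrival order. The `Msg` record and the sample payloads are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Interleaved arrivals for two sessions (sample data, not real traffic).
var arrivals = new List<Msg>
{
    new("customer-001", "Order Placed"),
    new("customer-002", "Order Placed"),
    new("customer-001", "Payment"),
    new("customer-001", "Shipped"),
    new("customer-002", "Payment"),
};

// GroupBy keeps elements inside each group in source order,
// which mirrors per-session FIFO delivery.
var bySession = arrivals
    .GroupBy(m => m.SessionId)
    .ToDictionary(g => g.Key, g => g.Select(m => m.Body).ToList());

foreach (var (sessionId, bodies) in bySession)
{
    Console.WriteLine($"{sessionId}: {string.Join(" -> ", bodies)}");
}

// Hypothetical message shape used only for this model.
record Msg(string SessionId, string Body);
```

Messages from different sessions may interleave on the queue, but within one session the relative order never changes.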
Enable Sessions
Create Queue with Sessions
// Queues, topics, and subscriptions are created with the administration client,
// not with ServiceBusClient (which only sends and receives).
// Requires: using Azure.Messaging.ServiceBus.Administration;
var adminClient = new ServiceBusAdministrationClient(connectionString);
// Create queue with sessions enabled
var queueOptions = new CreateQueueOptions("orders-queue")
{
    RequiresSession = true, // Enable sessions
    LockDuration = TimeSpan.FromMinutes(1),
    MaxDeliveryCount = 10,
    DefaultMessageTimeToLive = TimeSpan.FromDays(7)
};
await adminClient.CreateQueueAsync(queueOptions);
Enable on Topic Subscription
await adminClient.CreateTopicAsync("orders-topic");
// Enable sessions on the subscription
var subscriptionOptions = new CreateSubscriptionOptions("orders-topic", "processing-subscription")
{
    RequiresSession = true
};
await adminClient.CreateSubscriptionAsync(subscriptionOptions);
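Provisioning code often runs more than once (for example, on every deployment), and the session requirement is fixed when an entity is created. The following is a minimal sketch of an idempotent setup step. It assumes the `Azure.Messaging.ServiceBus` package and a connection string in an environment variable named `SERVICEBUS_CONNECTION_STRING`, a name chosen here only for illustration.

```csharp
using System;
using Azure.Messaging.ServiceBus.Administration;

// Assumption: the connection string grants Manage rights on the namespace.
var connectionString = Environment.GetEnvironmentVariable("SERVICEBUS_CONNECTION_STRING");
var adminClient = new ServiceBusAdministrationClient(connectionString);

// Create the session-enabled queue only if it is missing.
bool queueExists = await adminClient.QueueExistsAsync("orders-queue");
if (!queueExists)
{
    await adminClient.CreateQueueAsync(new CreateQueueOptions("orders-queue")
    {
        RequiresSession = true
    });
}
else
{
    // An existing queue keeps the RequiresSession value it was created with.
    QueueProperties existing = await adminClient.GetQueueAsync("orders-queue");
    Console.WriteLine($"orders-queue exists, RequiresSession = {existing.RequiresSession}");
}
```

If an existing queue reports `RequiresSession = False`, it has to be recreated (or a new queue used) before session receivers can connect to it.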
Azure Portal Configuration
Queue Settings:
├── Enable Sessions: ON
├── Lock Duration: 1 minute
├── Max Delivery Count: 10
└── Message TTL: 7 days
CLI Configuration
az servicebus queue create \
--name orders-queue \
--namespace-name mynamespace \
--resource-group myrg \
--enable-session true
Send Messages to Sessions
Basic Session Message
var sender = client.CreateSender("orders-queue");
// All messages with the same SessionId are delivered in order to the receiver that holds that session's lock
var message1 = new ServiceBusMessage("Order #12345 placed")
{
SessionId = "customer-001"
};
var message2 = new ServiceBusMessage("Payment received for #12345")
{
SessionId = "customer-001"
};
var message3 = new ServiceBusMessage("Order #12345 shipped")
{
SessionId = "customer-001"
};
await sender.SendMessagesAsync(new[] { message1, message2, message3 });
Sending Multiple Sessions
// Each session ID gets processed independently
var messages = new[]
{
new ServiceBusMessage("Order 1") { SessionId = "customer-001" },
new ServiceBusMessage("Order 2") { SessionId = "customer-001" },
new ServiceBusMessage("Order 3") { SessionId = "customer-002" },
new ServiceBusMessage("Order 4") { SessionId = "customer-002" },
new ServiceBusMessage("Order 5") { SessionId = "customer-003" }
};
await sender.SendMessagesAsync(messages);
// Result: each of the 3 sessions is delivered in order; different sessions can be processed in parallel
Dynamic Session ID
// Use meaningful session IDs
public async Task SendOrderMessagesAsync(Order order)
{
var sender = client.CreateSender("orders-queue");
var messages = new[]
{
new ServiceBusMessage(JsonSerializer.Serialize(new OrderPlacedEvent
{
OrderId = order.OrderId,
CustomerId = order.CustomerId,
Total = order.Total
}))
{
SessionId = order.OrderId,
Subject = "OrderPlaced"
},
new ServiceBusMessage(JsonSerializer.Serialize(new PaymentReceivedEvent
{
OrderId = order.OrderId,
Amount = order.Total,
PaymentId = order.PaymentId
}))
{
SessionId = order.OrderId,
Subject = "PaymentReceived"
},
new ServiceBusMessage(JsonSerializer.Serialize(new OrderShippedEvent
{
OrderId = order.OrderId,
TrackingNumber = order.TrackingNumber
}))
{
SessionId = order.OrderId,
Subject = "OrderShipped"
}
};
await sender.SendMessagesAsync(messages);
}
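Because the session ID decides which messages are ordered together, it can help to build it in one place. The sketch below is one possible helper, not part of the SDK: `SessionIds.For` is a hypothetical name, and the `type-id` format is an arbitrary convention. The 128-character limit is the documented maximum length of `SessionId`.

```csharp
using System;

Console.WriteLine(SessionIds.For("order", " 12345 ")); // prints "order-12345"

static class SessionIds
{
    // Service Bus limits SessionId to 128 characters.
    public const int MaxLength = 128;

    // Builds a session ID such as "order-12345" from an entity type and key.
    public static string For(string entityType, string entityId)
    {
        if (string.IsNullOrWhiteSpace(entityType))
            throw new ArgumentException("Entity type is required.", nameof(entityType));
        if (string.IsNullOrWhiteSpace(entityId))
            throw new ArgumentException("Entity ID is required.", nameof(entityId));

        // Trim so that "12345" and " 12345 " do not end up in two different sessions.
        var sessionId = $"{entityType.Trim()}-{entityId.Trim()}";

        if (sessionId.Length > MaxLength)
            throw new ArgumentException($"Session ID exceeds {MaxLength} characters.", nameof(entityId));

        return sessionId;
    }
}
```

A message would then be created with `SessionId = SessionIds.For("order", order.OrderId)`, so every producer derives the same ID for the same entity.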
Receive Messages from Sessions
Basic Session Processing
// Session-enabled entities are read with a session processor
var processor = client.CreateSessionProcessor(
    "orders-queue",
    new ServiceBusSessionProcessorOptions
    {
        MaxConcurrentSessions = 5 // Process up to 5 sessions concurrently
    }
);
processor.ProcessMessageAsync += async args =>
{
    var message = args.Message;
    var sessionId = message.SessionId;
    Console.WriteLine($"Session: {sessionId}");
    Console.WriteLine($"Message: {message.Body}");
    Console.WriteLine($"Subject: {message.Subject}");
    // Process message (ProcessOrderMessageAsync is application code)
    await ProcessOrderMessageAsync(message);
    // Complete the message
    await args.CompleteMessageAsync(message);
};
processor.ProcessErrorAsync += args =>
{
    Console.WriteLine($"Error: {args.Exception.Message}");
    return Task.CompletedTask;
};
await processor.StartProcessingAsync();
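A session processor also reports when it picks up and releases a session, which is useful for logging or per-session setup. The sketch below assumes the `Azure.Messaging.ServiceBus` 7.x session processor and a connection string in an environment variable named `SERVICEBUS_CONNECTION_STRING` (an illustrative name). The one-minute run time and the 30-second idle timeout are arbitrary values.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

var connectionString = Environment.GetEnvironmentVariable("SERVICEBUS_CONNECTION_STRING");
await using var client = new ServiceBusClient(connectionString);

await using ServiceBusSessionProcessor processor = client.CreateSessionProcessor(
    "orders-queue",
    new ServiceBusSessionProcessorOptions
    {
        MaxConcurrentSessions = 5,
        MaxConcurrentCallsPerSession = 1,              // one message at a time per session
        SessionIdleTimeout = TimeSpan.FromSeconds(30)  // release a session that stays empty this long
    });

processor.SessionInitializingAsync += args =>
{
    Console.WriteLine($"Session opened: {args.SessionId}");
    return Task.CompletedTask;
};

processor.SessionClosingAsync += args =>
{
    Console.WriteLine($"Session closed: {args.SessionId}");
    return Task.CompletedTask;
};

processor.ProcessMessageAsync += async args =>
{
    Console.WriteLine($"[{args.SessionId}] {args.Message.Body}");
    await args.CompleteMessageAsync(args.Message);
};

processor.ProcessErrorAsync += args =>
{
    Console.WriteLine($"Error: {args.Exception.Message}");
    return Task.CompletedTask;
};

await processor.StartProcessingAsync();
await Task.Delay(TimeSpan.FromMinutes(1)); // run for a while, then shut down cleanly
await processor.StopProcessingAsync();
```

Keeping `MaxConcurrentCallsPerSession` at 1 means a session's messages are handled strictly one after another, while `MaxConcurrentSessions` controls how many sessions run in parallel.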
Accept Specific Session
// Accept a specific session by ID; this acquires the session lock
await using ServiceBusSessionReceiver sessionReceiver =
    await client.AcceptSessionAsync("orders-queue", "customer-001");
while (true)
{
    // Receive up to 10 messages for this session, waiting at most 5 seconds
    var messages = await sessionReceiver.ReceiveMessagesAsync(10, TimeSpan.FromSeconds(5));
    if (messages.Count == 0) break; // No more messages in this session
    foreach (var message in messages)
    {
        Console.WriteLine($"Session {message.SessionId}: {message.Body}");
        await sessionReceiver.CompleteMessageAsync(message);
    }
}
Session-Aware Processing
public class OrderProcessingService
{
    // Handlers registered on a session processor receive ProcessSessionMessageEventArgs
    public async Task ProcessWithSessionAsync(ProcessSessionMessageEventArgs args)
    {
        var message = args.Message;
        var sessionId = message.SessionId;
        // Get or create session state
        var sessionState = await GetSessionStateAsync(sessionId);
        // Process based on message subject (the Handle* methods are application code)
        switch (message.Subject)
        {
            case "OrderPlaced":
                await HandleOrderPlacedAsync(message, sessionState);
                break;
            case "PaymentReceived":
                await HandlePaymentReceivedAsync(message, sessionState);
                break;
            case "OrderShipped":
                await HandleOrderShippedAsync(message, sessionState);
                break;
        }
        // Update session state
        await UpdateSessionStateAsync(sessionId, sessionState);
        await args.CompleteMessageAsync(message);
    }
    private Task<Dictionary<string, object>> GetSessionStateAsync(string sessionId)
    {
        // Placeholder: retrieve session state from a cache, database, etc.
        throw new NotImplementedException();
    }
}
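As an alternative to an external store, Service Bus can keep a small state blob with the session itself. The sketch below assumes the `Azure.Messaging.ServiceBus` 7.x session processor. `SessionStateHandler` and `OrderProgress` are hypothetical names introduced for this example, and JSON is just one possible encoding of the state.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

static class SessionStateHandler
{
    public static async Task HandleAsync(ProcessSessionMessageEventArgs args)
    {
        // The state is null until something has been stored for this session.
        BinaryData stored = await args.GetSessionStateAsync();
        OrderProgress progress =
            stored?.ToObjectFromJson<OrderProgress>() ?? new OrderProgress(0, "None");

        // Record that one more event of this session has been handled.
        var updated = progress with
        {
            EventsSeen = progress.EventsSeen + 1,
            LastSubject = args.Message.Subject ?? "None"
        };

        // Persist the state with the session, then settle the message.
        await args.SetSessionStateAsync(BinaryData.FromObjectAsJson(updated));
        await args.CompleteMessageAsync(args.Message);
    }
}

// Hypothetical state shape for this example.
record OrderProgress(int EventsSeen, string LastSubject);
```

The handler would be registered with `processor.ProcessMessageAsync += SessionStateHandler.HandleAsync;`. Because the state travels with the session, a consumer that takes over after a failure reads the same `OrderProgress` value.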
Session Lock Behavior
How Session Lock Works
┌─────────────────────────────────────────────────────────────────────┐
│ SESSION LOCK BEHAVIOR │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ Session "customer-001" │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ Msg 1 │ → │ Msg 2 │ → │ Msg 3 │ │ │
│ │ │ Locked │ │ Locked │ │ Available│ │ │
│ │ │ by │ │ by │ │ (next in │ │ │
│ │ │Consumer │ │Consumer │ │ sequence) │ │ │
│ │ │ A │ │ A │ │ │ │ │
│ │ └─────────┘ └─────────┘ └───────────┘ │ │
│ │ │ │
│ │ Consumer A has exclusive access to this session │ │
│ │ Other consumers cannot access any messages │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
│ If Consumer A fails: │
│ - Session lock expires (based on LockDuration) │
│ - Session becomes available to other consumers │
│ - Next consumer picks up from where left off │
│ │
└─────────────────────────────────────────────────────────────────────┘
Lock Duration
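var queueOptions = new CreateQueueOptions("orders-queue")
{
    RequiresSession = true,
    LockDuration = TimeSpan.FromMinutes(2) // Default: 1 minute, maximum: 5 minutes
};
// Automatic lock renewal is a client-side setting on the processor, not a queue property
var processorOptions = new ServiceBusSessionProcessorOptions
{
    MaxAutoLockRenewalDuration = TimeSpan.FromMinutes(5) // Keep renewing the session lock for up to 5 minutes
};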
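When a session receiver is used directly instead of a processor, the lock has to be renewed by the application. The sketch below shows one way to do that and to detect a lost lock. It assumes the `Azure.Messaging.ServiceBus` 7.x client and a connection string in an environment variable named `SERVICEBUS_CONNECTION_STRING` (an illustrative name). The 30-second renewal threshold is arbitrary.

```csharp
using System;
using Azure.Messaging.ServiceBus;

var connectionString = Environment.GetEnvironmentVariable("SERVICEBUS_CONNECTION_STRING");
await using var client = new ServiceBusClient(connectionString);

// Lock the next available session on the queue.
await using ServiceBusSessionReceiver receiver =
    await client.AcceptNextSessionAsync("orders-queue");

try
{
    ServiceBusReceivedMessage message =
        await receiver.ReceiveMessageAsync(TimeSpan.FromSeconds(5));

    if (message != null)
    {
        // Renew before slow work if little lock time remains.
        if (receiver.SessionLockedUntil - DateTimeOffset.UtcNow < TimeSpan.FromSeconds(30))
        {
            await receiver.RenewSessionLockAsync();
        }

        // ... long-running processing would go here ...
        await receiver.CompleteMessageAsync(message);
    }
}
catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.SessionLockLost)
{
    // Another receiver may now own the session; stop working on it.
    Console.WriteLine($"Lost lock on session {receiver.SessionId}: {ex.Message}");
}
```

A message that was received but not completed before the lock was lost becomes available again and is redelivered to whichever receiver accepts the session next.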
Use Cases and Examples
Order Processing Pipeline
// Publisher - Send order lifecycle events
public async Task PublishOrderLifecycleAsync(Order order)
{
var sender = client.CreateSender("orders-topic");
// All events for same order use order ID as session
var events = new[]
{
CreateMessage("OrderPlaced", order, order.OrderId),
CreateMessage("PaymentProcessed", order, order.OrderId),
CreateMessage("InventoryReserved", order, order.OrderId),
CreateMessage("OrderShipped", order, order.OrderId),
CreateMessage("OrderDelivered", order, order.OrderId)
};
await sender.SendMessagesAsync(events);
}
private ServiceBusMessage CreateMessage(string eventType, Order order, string sessionId)
{
return new ServiceBusMessage(JsonSerializer.Serialize(order))
{
Subject = eventType,
SessionId = sessionId,
MessageId = Guid.NewGuid().ToString()
};
}
// Consumer - Process in order
public async Task ProcessOrderEventsAsync(ProcessSessionMessageEventArgs args)
{
var message = args.Message;
var orderId = message.SessionId;
// Each order's events processed sequentially
// Different orders can be processed in parallel
switch (message.Subject)
{
case "OrderPlaced":
await OnOrderPlacedAsync(message);
break;
case "PaymentProcessed":
await OnPaymentProcessedAsync(message);
break;
case "InventoryReserved":
await OnInventoryReservedAsync(message);
break;
case "OrderShipped":
await OnOrderShippedAsync(message);
break;
case "OrderDelivered":
await OnOrderDeliveredAsync(message);
break;
}
await args.CompleteMessageAsync(message);
}
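Sessions preserve order, but a message can still be delivered more than once (for example, after a lock expires), so a consumer may want to check each event against the expected lifecycle. The sketch below is one simple way to do that in memory. `OrderEventTracker` is a hypothetical helper, and the strict linear sequence is an assumption taken from the publisher example above.

```csharp
using System;

var tracker = new OrderEventTracker();
Console.WriteLine(tracker.TryAdvance("OrderPlaced"));      // True
Console.WriteLine(tracker.TryAdvance("OrderPlaced"));      // False (duplicate delivery)
Console.WriteLine(tracker.TryAdvance("PaymentProcessed")); // True
Console.WriteLine(tracker.TryAdvance("OrderShipped"));     // False (InventoryReserved not seen yet)

class OrderEventTracker
{
    // Expected lifecycle, in order.
    private static readonly string[] Sequence =
    {
        "OrderPlaced", "PaymentProcessed", "InventoryReserved", "OrderShipped", "OrderDelivered"
    };

    private int _next; // index of the next expected event

    // Returns true and moves forward only when the subject is the next expected event.
    public bool TryAdvance(string subject)
    {
        if (_next < Sequence.Length && Sequence[_next] == subject)
        {
            _next++;
            return true;
        }
        return false;
    }
}
```

In a real consumer the tracker's position would need to be stored per order (for example, in session state) so that it survives a consumer restart.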
User Notification Queue
// Send user notifications to a per-user session
public async Task SendUserNotificationsAsync(string userId, IEnumerable<Notification> notifications)
{
    var sender = client.CreateSender("user-notifications");
    var messages = notifications.Select(n => new ServiceBusMessage(JsonSerializer.Serialize(n))
    {
        SessionId = userId,
        Subject = n.Type,
        // ServiceBusMessage has no Priority property; carry it as a custom application property
        ApplicationProperties = { ["Priority"] = n.Priority }
    });
    await sender.SendMessagesAsync(messages);
}
// Process user notifications in order
public async Task ProcessUserNotificationsAsync(ProcessSessionMessageEventArgs args)
{
var userId = args.Message.SessionId;
// All notifications for same user processed sequentially
await SendPushNotificationAsync(userId, args.Message.Body.ToString());
await args.CompleteMessageAsync(args.Message);
}
Best Practices
| Practice | Description |
|---|---|
| Use meaningful session IDs | Customer ID, Order ID, Account ID |
| Set appropriate lock duration | Enough time for processing |
| Handle exceptions | If a consumer fails, the session is released once its lock expires and another consumer can take over |
| Track session state | Store state externally if needed |
| Enable auto-renew | Prevent lock expiration during long processing |
| Monitor session backlog | Track pending sessions |
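The "Handle exceptions" practice can be sketched as a handler that settles each message explicitly. This example assumes the `Azure.Messaging.ServiceBus` 7.x session processor, created with `AutoCompleteMessages = false` so the handler is the only place messages are settled. `OrderMessageHandler`, `OrderPayload`, and the `"InvalidPayload"` reason string are hypothetical names chosen for illustration.

```csharp
using System;
using System.Text.Json;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

static class OrderMessageHandler
{
    public static async Task HandleAsync(ProcessSessionMessageEventArgs args)
    {
        try
        {
            // A malformed body throws JsonException.
            var order = args.Message.Body.ToObjectFromJson<OrderPayload>();
            Console.WriteLine($"Processing order {order?.OrderId} in session {args.SessionId}");
            await args.CompleteMessageAsync(args.Message);
        }
        catch (JsonException ex)
        {
            // Retrying cannot fix a bad payload: move it to the dead-letter queue
            // so the rest of the session can continue.
            await args.DeadLetterMessageAsync(args.Message, "InvalidPayload", ex.Message);
        }
        catch (Exception)
        {
            // Possibly transient: release the message so it is redelivered,
            // up to the queue's MaxDeliveryCount.
            await args.AbandonMessageAsync(args.Message);
        }
    }
}

// Hypothetical payload shape for this example.
record OrderPayload(string OrderId);
```

Abandoning keeps the message at the front of its session, so later messages in that session wait behind it. Dead-lettering removes it from the session and lets the following messages proceed.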
Monitoring
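# The queue resource does not expose a session count; check that sessions
# are enabled and how many messages are waiting
az servicebus queue show \
  --name orders-queue \
  --namespace-name mynamespace \
  --resource-group myrg \
  --query "{requiresSession: requiresSession, activeMessages: countDetails.activeMessageCount}"
# Same check for a subscription
az servicebus topic subscription show \
  --name processing-subscription \
  --topic-name orders-topic \
  --namespace-name mynamespace \
  --resource-group myrg \
  --query "{requiresSession: requiresSession, activeMessages: countDetails.activeMessageCount}"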
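The same backlog figures can be read from code through the administration client. This is a minimal sketch, assuming the `Azure.Messaging.ServiceBus` package and a connection string in an environment variable named `SERVICEBUS_CONNECTION_STRING` (an illustrative name). It reports message counts, not a per-session breakdown.

```csharp
using System;
using Azure.Messaging.ServiceBus.Administration;

var connectionString = Environment.GetEnvironmentVariable("SERVICEBUS_CONNECTION_STRING");
var adminClient = new ServiceBusAdministrationClient(connectionString);

// Runtime properties carry the current message counts for the queue.
QueueRuntimeProperties runtime =
    await adminClient.GetQueueRuntimePropertiesAsync("orders-queue");

Console.WriteLine($"Active messages:        {runtime.ActiveMessageCount}");
Console.WriteLine($"Dead-lettered messages: {runtime.DeadLetterMessageCount}");
Console.WriteLine($"Total messages:         {runtime.TotalMessageCount}");
```

A growing active count together with a rising dead-letter count is usually a sign that consumers are failing rather than merely falling behind.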
Related Topics
- Dead-Letter Queue — Handling failed messages
- Scheduled Messages — Delayed delivery
- Retry Policies — Handling transient failures
Azure Integration Hub - Intermediate Level