Service Bus — Message Scheduling, Deferral, and Delayed Processing
Overview
Azure Service Bus provides powerful capabilities for controlling when messages are processed. You can schedule messages to be delivered at a specific time in the future, defer messages for later processing, or implement complex delayed processing patterns. These features enable sophisticated workflows like scheduled notifications, delayed retries, and time-based routing.
What You'll Learn
- Scheduling messages for future delivery
- Deferring messages for later processing
- Implementing delayed retry patterns
- Building scheduled notification systems
Problem
You need to:
- Send messages that should be processed at a specific time
- Defer processing until certain conditions are met
- Implement retry with delays
- Build scheduled reminder systems
Solution Implementation
Step 1: Schedule Message for Future Delivery
using Azure.Messaging.ServiceBus;
// Schedule message to be delivered in 1 hour
/// <summary>
/// Schedules an order-confirmation notification on the "order-notifications" entity
/// for delivery one hour after the call.
/// </summary>
/// <param name="orderId">Identifier of the order, embedded in the JSON payload.</param>
public async Task ScheduleOrderConfirmationAsync(string orderId)
{
    await using var serviceBusClient = new ServiceBusClient(_connectionString);
    await using var notificationSender = serviceBusClient.CreateSender("order-notifications");

    // Build the JSON body first, then attach the message metadata.
    var payload = new
    {
        orderId,
        message = "Your order has been confirmed",
        type = "order_confirmation"
    };
    string json = JsonSerializer.Serialize(payload);

    var notification = new ServiceBusMessage(json);
    notification.Subject = "OrderNotification";
    notification.ContentType = "application/json";

    // Delivery time is expressed in UTC, one hour from now.
    DateTimeOffset deliverAt = DateTimeOffset.UtcNow.AddHours(1);
    await notificationSender.ScheduleMessageAsync(notification, deliverAt);

    Console.WriteLine($"Message scheduled for {deliverAt}");
}
// Schedule for specific date/time
/// <summary>
/// Schedules the given reminder on the "reminders" entity for delivery at its due date
/// and prints the sequence number returned by the scheduling call.
/// </summary>
/// <param name="reminder">Reminder to serialize as the message body; its DueDate is the delivery time.</param>
public async Task ScheduleReminderAsync(Reminder reminder)
{
    await using var serviceBusClient = new ServiceBusClient(_connectionString);
    await using var reminderSender = serviceBusClient.CreateSender("reminders");

    string payload = JsonSerializer.Serialize(reminder);
    var dueAt = reminder.DueDate;

    // The returned sequence number identifies the scheduled message.
    long scheduledSequence = await reminderSender.ScheduleMessageAsync(
        new ServiceBusMessage(payload),
        dueAt);

    Console.WriteLine($"Reminder scheduled for {dueAt}, sequence: {scheduledSequence}");
}
Step 2: Cancel Scheduled Message
/// <summary>
/// Cancels a message previously scheduled on the "order-notifications" entity.
/// </summary>
/// <param name="sequenceNumber">Sequence number that was returned when the message was scheduled.</param>
public async Task CancelScheduledNotificationAsync(long sequenceNumber)
{
    await using ServiceBusClient serviceBusClient = new ServiceBusClient(_connectionString);
    await using ServiceBusSender notificationSender = serviceBusClient.CreateSender("order-notifications");

    await notificationSender.CancelScheduledMessageAsync(sequenceNumber);

    Console.WriteLine($"Cancelled scheduled message: {sequenceNumber}");
}
Step 3: Defer Message for Later Processing
/// <summary>
/// Receives messages from "orders-queue". Orders that require manual review are announced on the
/// review queue and then deferred; all other orders are processed and completed immediately.
/// </summary>
/// <remarks>
/// A deferred message is later retrieved by its sequence number, so that number is written to the
/// log together with the order id. Any failure while handling a message abandons it.
/// </remarks>
public async Task ProcessWithDeferralAsync()
{
    await using var client = new ServiceBusClient(_connectionString);
    await using var receiver = client.CreateReceiver("orders-queue");

    await foreach (var message in receiver.ReceiveMessagesAsync())
    {
        try
        {
            var order = JsonSerializer.Deserialize<Order>(message.Body);

            // Check if order needs manual review
            if (order.RequiresReview)
            {
                // Notify reviewers BEFORE deferring. If the notification fails, the message is
                // still unsettled and the catch block can abandon it. Deferring first would leave
                // a settled message that can no longer be abandoned and that nobody knows about.
                await SendToReviewQueueAsync(order);

                // Defer message for later processing
                await receiver.DeferMessageAsync(message);

                // Record the sequence number: it is the handle needed to receive the deferred message.
                Console.WriteLine($"Message deferred for order {order.OrderId}, sequence: {message.SequenceNumber}");
            }
            else
            {
                // Process normally
                await ProcessOrderAsync(order);
                await receiver.CompleteMessageAsync(message);
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Error: {ex.Message}");
            await receiver.AbandonMessageAsync(message);
        }
    }
}
// Receive deferred messages
/// <summary>
/// Receives a previously deferred message from "orders-queue" by sequence number, applies the
/// review result to the order it carries, and completes the message.
/// </summary>
/// <remarks>
/// When no deferred message exists for the sequence number, the condition is logged and the
/// method returns without processing anything.
/// </remarks>
public async Task ProcessDeferredMessagesAsync()
{
    await using var client = new ServiceBusClient(_connectionString);
    await using var receiver = client.CreateReceiver("orders-queue");

    ServiceBusReceivedMessage deferred;
    try
    {
        // Receive by sequence number (stored previously)
        // NOTE(review): sequenceNumber is not declared in this method; presumably it is a member
        // populated when the message was deferred - confirm against the enclosing type.
        deferred = await receiver.ReceiveDeferredMessageAsync(sequenceNumber);
    }
    catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.MessageNotFound)
    {
        // A missing deferred message is reported as an exception rather than a null result.
        Console.WriteLine($"Deferred message {sequenceNumber} not found: {ex.Message}");
        return;
    }

    if (deferred != null)
    {
        var order = JsonSerializer.Deserialize<Order>(deferred.Body);
        await ProcessReviewResultAsync(order);
        await receiver.CompleteMessageAsync(deferred);
    }
}
Step 4: Delayed Retry Pattern
/// <summary>
/// Processes an order message. On failure the message is either dead-lettered (retry budget
/// exhausted) or replaced by a delayed copy on "retry-queue" using exponential backoff.
/// </summary>
/// <param name="message">The received message to process and settle.</param>
/// <remarks>
/// NOTE(review): <c>receiver</c> is not declared in this method; presumably it is a member of the
/// enclosing type that received <paramref name="message"/> - confirm.
/// </remarks>
public async Task ProcessWithDelayedRetryAsync(ServiceBusReceivedMessage message)
{
    const int maxRetries = 3;
    var retryCount = GetRetryCount(message);

    try
    {
        var order = JsonSerializer.Deserialize<Order>(message.Body);
        await _orderService.ProcessAsync(order);
    }
    catch (Exception ex)
    {
        if (retryCount >= maxRetries)
        {
            // Move to dead letter after max retries
            await receiver.DeadLetterMessageAsync(message, "Max retries exceeded", ex.Message);
            return;
        }

        // Calculate delay with exponential backoff: 1s, 2s, 4s, ...
        var delay = TimeSpan.FromSeconds(Math.Pow(2, retryCount));

        // Schedule the delayed copy first, then complete the original. Abandoning the original
        // would make it available again right away, so the order would be handled twice: once
        // immediately and once from the scheduled copy.
        await ScheduleRetryAsync(message, delay, retryCount + 1);
        await receiver.CompleteMessageAsync(message);
        return;
    }

    // Settle outside the try block so a settlement failure is not mistaken for a processing
    // failure and does not trigger a retry of an order that was already processed.
    await receiver.CompleteMessageAsync(message);
}

/// <summary>
/// Schedules a copy of <paramref name="original"/> on "retry-queue" for delivery after
/// <paramref name="delay"/>.
/// </summary>
/// <param name="original">The message whose body and metadata are copied.</param>
/// <param name="delay">How long to wait before the copy becomes available.</param>
/// <param name="retryCount">
/// Attempt counter stored on the copy under "RetryCount".
/// NOTE(review): presumably GetRetryCount reads this application property - verify.
/// </param>
private async Task ScheduleRetryAsync(ServiceBusReceivedMessage original, TimeSpan delay, int retryCount = 0)
{
    await using var client = new ServiceBusClient(_connectionString);
    await using var sender = client.CreateSender("retry-queue");

    var retryAt = DateTimeOffset.UtcNow.Add(delay);

    var retryMessage = new ServiceBusMessage(original.Body)
    {
        ContentType = original.ContentType,
        Subject = original.Subject
    };

    // Carry the original application properties over so the retry consumer sees the same metadata.
    foreach (var property in original.ApplicationProperties)
    {
        retryMessage.ApplicationProperties[property.Key] = property.Value;
    }

    // Keep the identity of the very first message across repeated retries.
    if (!retryMessage.ApplicationProperties.ContainsKey("OriginalMessageId"))
    {
        retryMessage.ApplicationProperties["OriginalMessageId"] = original.MessageId;
        retryMessage.ApplicationProperties["OriginalSequenceNumber"] = original.SequenceNumber.ToString();
    }

    retryMessage.ApplicationProperties["RetryCount"] = retryCount;
    retryMessage.ApplicationProperties["NextRetryAt"] = retryAt.UtcDateTime.ToString("O");

    await sender.ScheduleMessageAsync(retryMessage, retryAt);
}
Step 5: Scheduled Notifications System
/// <summary>
/// Schedules the customer-facing reminder messages that accompany an order delivery.
/// </summary>
public class NotificationScheduler
{
    private readonly string _connectionString;

    /// <summary>
    /// Creates a scheduler that connects to Service Bus with the given connection string.
    /// </summary>
    /// <param name="connectionString">Service Bus connection string.</param>
    /// <exception cref="ArgumentNullException"><paramref name="connectionString"/> is null.</exception>
    public NotificationScheduler(string connectionString)
    {
        _connectionString = connectionString ?? throw new ArgumentNullException(nameof(connectionString));
    }

    /// <summary>
    /// Schedules three reminders on the "notifications" entity: one day before delivery,
    /// at 08:00 on the delivery date, and one day after delivery.
    /// </summary>
    /// <param name="order">Order whose ScheduledDelivery drives the reminder times.</param>
    public async Task ScheduleOrderRemindersAsync(Order order)
    {
        await using var client = new ServiceBusClient(_connectionString);
        await using var sender = client.CreateSender("notifications");

        // Schedule pre-delivery reminder (1 day before)
        var preDeliveryTime = order.ScheduledDelivery.AddDays(-1);
        await sender.ScheduleMessageAsync(
            CreateReminderMessage(order, "pre_delivery", preDeliveryTime), preDeliveryTime);

        // Schedule delivery reminder (same day morning)
        // NOTE(review): .Date drops any offset/kind information, so "08:00" is presumably
        // interpreted in the host's local time zone - confirm this is the intended zone.
        var morningOfDelivery = order.ScheduledDelivery.Date.AddHours(8);
        await sender.ScheduleMessageAsync(
            CreateReminderMessage(order, "same_day", morningOfDelivery), morningOfDelivery);

        // Schedule follow-up (1 day after)
        var followUpTime = order.ScheduledDelivery.AddDays(1);
        await sender.ScheduleMessageAsync(
            CreateReminderMessage(order, "follow_up", followUpTime), followUpTime);
    }

    /// <summary>
    /// Builds the JSON reminder message for the given reminder type.
    /// </summary>
    /// <param name="order">Order the reminder refers to.</param>
    /// <param name="type">Reminder type: "pre_delivery", "same_day", "follow_up", or anything else for a generic update.</param>
    /// <param name="scheduledFor">Time the reminder is scheduled for; written to the payload in UTC.</param>
    /// <returns>A message with subject "OrderReminder" and a JSON body.</returns>
    private ServiceBusMessage CreateReminderMessage(Order order, string type, DateTimeOffset scheduledFor)
    {
        var template = type switch
        {
            "pre_delivery" => $"Your order {order.OrderId} arrives tomorrow!",
            "same_day" => $"Your order {order.OrderId} is being delivered today!",
            "follow_up" => $"How was your order {order.OrderId}? Rate your experience.",
            _ => $"Update about order {order.OrderId}"
        };

        return new ServiceBusMessage(JsonSerializer.Serialize(new
        {
            orderId = order.OrderId,
            customerId = order.CustomerId,
            type,
            message = template,
            // The actual delivery time of the reminder, not the moment the message was created.
            scheduledFor = scheduledFor.UtcDateTime
        }))
        {
            Subject = "OrderReminder",
            ContentType = "application/json"
        };
    }
}
Step 6: Time-Based Routing
/// <summary>
/// Receives messages from "orders-queue" and routes them by age and order value:
/// scheduled messages more than 24 hours past their scheduled time are dead-lettered,
/// orders above 1000 are forwarded to "priority-orders", and the rest are processed directly.
/// </summary>
public async Task ProcessTimeSensitiveMessagesAsync()
{
    await using var client = new ServiceBusClient(_connectionString);
    await using var receiver = client.CreateReceiver("orders-queue");

    // One sender for the whole loop instead of creating and disposing one per message.
    await using var prioritySender = client.CreateSender("priority-orders");

    await foreach (var message in receiver.ReceiveMessagesAsync())
    {
        // Check if message has expired. ScheduledEnqueueTime is a non-nullable DateTimeOffset;
        // the default value is treated here as "this message was never scheduled".
        var scheduledFor = message.ScheduledEnqueueTime;
        if (scheduledFor != default && DateTimeOffset.UtcNow > scheduledFor.AddHours(24))
        {
            // Too old, dead letter
            await receiver.DeadLetterMessageAsync(message, "Order expired");
            continue;
        }

        // Deserialize only after the expiry check so expired messages are not parsed needlessly.
        var order = JsonSerializer.Deserialize<Order>(message.Body);

        // Check priority based on order value
        if (order.TotalAmount > 1000)
        {
            // High value - route to priority queue
            var forwarded = new ServiceBusMessage(message.Body);

            // ApplicationProperties has no setter, so the entries are copied one by one.
            foreach (var property in message.ApplicationProperties)
            {
                forwarded.ApplicationProperties[property.Key] = property.Value;
            }

            await prioritySender.SendMessageAsync(forwarded);
            await receiver.CompleteMessageAsync(message);
        }
        else
        {
            // Process normally
            await ProcessOrderAsync(order);
            await receiver.CompleteMessageAsync(message);
        }
    }
}
Real-Time Scenarios
Scenario 1: Payment Processing with Delay
/// <summary>
/// Schedules a payment-verification message on the "payments" entity for five minutes from now
/// and stores the resulting sequence number against the payment.
/// </summary>
/// <param name="payment">Payment to serialize as the message body.</param>
public async Task ProcessPaymentAsync(Payment payment)
{
    await using var serviceBusClient = new ServiceBusClient(_connectionString);
    await using var paymentSender = serviceBusClient.CreateSender("payments");

    string payload = JsonSerializer.Serialize(payment);
    var verification = new ServiceBusMessage(payload);
    verification.Subject = "PaymentVerification";
    verification.ContentType = "application/json";

    // Verification becomes deliverable five minutes after this call.
    DateTimeOffset verificationTime = DateTimeOffset.UtcNow.AddMinutes(5);
    long scheduledSequence = await paymentSender.ScheduleMessageAsync(verification, verificationTime);

    // Persist the sequence number so the scheduled message can be referenced later.
    await _paymentRepository.SaveScheduleInfo(payment.PaymentId, scheduledSequence);
}
Scenario 2: Meeting Reminder System
/// <summary>
/// Schedules reminders on the "meeting-reminders" entity 24 hours, 1 hour and 15 minutes
/// before the meeting starts. Reminders whose time has already passed are skipped.
/// </summary>
/// <param name="meeting">Meeting whose StartTime drives the reminder times.</param>
public async Task ScheduleMeetingRemindersAsync(Meeting meeting)
{
    await using var client = new ServiceBusClient(_connectionString);
    await using var sender = client.CreateSender("meeting-reminders");

    var reminders = new[]
    {
        (meeting.StartTime.AddHours(-24), "24h"),
        (meeting.StartTime.AddHours(-1), "1h"),
        (meeting.StartTime.AddMinutes(-15), "15m")
    };

    var now = DateTimeOffset.UtcNow;

    foreach (var (time, label) in reminders)
    {
        DateTimeOffset sendAt = time;

        // A reminder time that is already in the past would not be delayed at all, so a meeting
        // starting soon would receive its "24h" and "1h" reminders together with the "15m" one.
        if (sendAt <= now)
        {
            Console.WriteLine($"Skipping {label} reminder for meeting {meeting.Id}: {sendAt} has already passed");
            continue;
        }

        var message = new ServiceBusMessage(JsonSerializer.Serialize(new
        {
            meetingId = meeting.Id,
            title = meeting.Title,
            startTime = meeting.StartTime,
            attendees = meeting.Attendees,
            reminderType = label
        }))
        {
            Subject = "MeetingReminder",
            ContentType = "application/json"
        };

        await sender.ScheduleMessageAsync(message, sendAt);
    }
}
Scenario 3: Batch Processing with Windows
/// <summary>
/// Schedules a batch-job message on the "batch-jobs" entity for the next full hour (UTC).
/// </summary>
public async Task ScheduleBatchProcessingAsync()
{
    await using var client = new ServiceBusClient(_connectionString);
    await using var sender = client.CreateSender("batch-jobs");

    // Fetch the parameters before computing the target time so a slow lookup cannot push
    // the computed boundary into the past.
    var parameters = await GetBatchParametersAsync();

    // Schedule for next hour boundary. Truncate to the current hour and add one hour;
    // building the value with "now.Hour + 1" throws between 23:00 and 23:59 because 24 is
    // not a valid hour, and AddHours also rolls over day, month and year correctly.
    var now = DateTimeOffset.UtcNow;
    var currentHour = new DateTimeOffset(
        now.Year, now.Month, now.Day,
        now.Hour, 0, 0, TimeSpan.Zero);
    var nextHour = currentHour.AddHours(1);

    var batchJob = new ServiceBusMessage(JsonSerializer.Serialize(new
    {
        jobId = Guid.NewGuid().ToString(),
        type = "daily_batch",
        parameters
    }))
    {
        Subject = "BatchJob"
    };

    await sender.ScheduleMessageAsync(batchJob, nextHour);
    Console.WriteLine($"Batch scheduled for {nextHour}");
}
Testing Scheduled Messages
// Test: Verify scheduled message appears at correct time
/// <summary>
/// Schedules a message on "test-queue" ten seconds ahead, waits for the scheduled time to pass,
/// and asserts that the message can then be received.
/// </summary>
public async Task TestMessageSchedulingAsync()
{
    // Dispose client, sender and receiver so the test does not leak connections.
    await using var testClient = new ServiceBusClient(_connectionString);
    await using var sender = testClient.CreateSender("test-queue");

    var message = new ServiceBusMessage("Test message");
    var scheduledTime = DateTimeOffset.UtcNow.AddSeconds(10);
    await sender.ScheduleMessageAsync(message, scheduledTime);

    // Wait and verify
    await Task.Delay(11000);

    await using var receiver = testClient.CreateReceiver("test-queue");
    var received = await receiver.ReceiveMessageAsync(TimeSpan.FromSeconds(5));
    Assert.IsNotNull(received);

    // Complete the message so it does not stay in the queue and affect later test runs.
    await receiver.CompleteMessageAsync(received);

    Console.WriteLine($"Received scheduled message at {DateTime.UtcNow}");
}
Best Practices
| Practice | Why |
|---|---|
| Store sequence numbers | Required to cancel scheduled messages and to receive deferred messages |
| Use proper time zones | Schedule in UTC, convert appropriately |
| Set message TTL | Prevent orphaned scheduled messages |
| Handle failures gracefully | Dead letter messages that can't be processed |
| Monitor scheduled messages | Track delivery and delays |
Summary
- Use `ScheduleMessageAsync` for future delivery
- Use `DeferMessageAsync` to pause processing
- Implement retry with exponential backoff using scheduled messages
- Cancel scheduled messages using sequence numbers
- Build sophisticated scheduling systems for notifications and reminders