Pub-Sub Patterns with Service Bus Topics and Filters
Why Publish-Subscribe Matters
In modern distributed systems, multiple services need to react to events:
Traditional: Tight Coupling
┌─────────────┐ ┌─────────────┐
│ Order │──────▶│ Email │
│ Service │ │ Service │
└─────────────┘ └─────────────┘
│
└──────────▶┌─────────────┐
│ Shipping │
│ Service │
└─────────────┘
└──────────▶┌─────────────┐
│ Payment │
│ Service │
└─────────────┘
└──────────▶┌─────────────┐
│ Inventory │
│ Service │
└─────────────┘
Problems:
- Order service must know about all consumers
- Adding new consumer = modifying Order service
- Any change ripples through entire system
- Failures cascade: a slow or unavailable consumer can block the Order service
Pub-Sub: Loose Coupling
┌─────────────┐
│ Order │
│ Service │
│ (Publisher) │
└──────┬──────┘
│
▼
┌─────────────────────────────┐
│ Service Bus Topic │ (Order Events)
│ │
│ ┌───────────────────────┐ │
│ │ Subscriptions │ │
│ │ - Email Service │ │
│ │ - Shipping Service │ │
│ │ - Payment Service │ │
│ │ - Inventory Service │ │
│ └───────────────────────┘ │
└─────────────────────────────┘
│ │ │
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Email │ │ Shipping │ │ Payment │
└──────────┘ └──────────┘ └──────────┘
Benefits:
- Decoupled - publishers don't know consumers
- Scalable - add/remove consumers without changes
- Selective - consumers get only what they need
- Resilient - failures don't cascade
Understanding Service Bus Topics
How Topics Work
┌─────────────────────────────────────────────────────────────────────────────┐
│ Service Bus Topic Architecture │
└─────────────────────────────────────────────────────────────────────────────┘
┌─────────────────────┐
│ Publisher │
│ (Order Service) │
└──────────┬──────────┘
│
│ 1. Send Message
▼
┌─────────────────────┐
│ Topic │
│ "order-events" │
│ │
│ Message Properties:│
│ - OrderId │
│ - CustomerId │
│ - Amount │
│ - EventType │
│ - Region │
└──────────┬──────────┘
│
┌─────────────────────────────┼─────────────────────────────┐
│ │ │
▼ ▼ ▼
┌─────────────────────┐ ┌─────────────────────┐ ┌─────────────────────┐
│ Subscription │ │ Subscription │ │ Subscription │
│ "email-orders" │ │ "shipping-orders" │ │ "analytics" │
│ │ │ │ │ │
│ Filter: │ │ Filter: │ │ Filter: │
│ EventType='new' │ │ Region='us-west' │ │ ALL │
└─────────────────────┘ └─────────────────────┘ └─────────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────────┐ ┌─────────────────────┐ ┌─────────────────────┐
│ Email Service │ │ Shipping Service │ │ Analytics │
│ processes │ │ processes │ │ stores all │
│ new orders │ │ US West orders │ │ events │
└─────────────────────┘ └─────────────────────┘ └─────────────────────┘
Key Concepts
- Topic - A channel for publishing messages, like a radio frequency.
- Subscription - A queue attached to a topic. Each subscription gets its own copy of every matching message.
- Publisher - The service that sends messages to the topic.
- Subscriber - The service that reads from a subscription.
- Filter - A rule that determines which messages are copied to a subscription.
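To make these concepts concrete, here is a minimal in-memory sketch of topic fan-out. It is not the Azure SDK: `DemoMessage`, `DemoSubscription`, and `DemoTopic` are hypothetical types invented for illustration, and a plain C# predicate stands in for a filter.

```csharp
using System;
using System.Collections.Generic;

// Illustrative in-memory model only; these types are NOT part of the Azure SDK.
public sealed class DemoMessage
{
    public DemoMessage(string body, IReadOnlyDictionary<string, object> properties)
    {
        Body = body;
        Properties = properties;
    }

    public string Body { get; }
    public IReadOnlyDictionary<string, object> Properties { get; }
}

public sealed class DemoSubscription
{
    private readonly Func<DemoMessage, bool> _filter;

    public DemoSubscription(string name, Func<DemoMessage, bool> filter)
    {
        Name = name;
        _filter = filter;
    }

    public string Name { get; }

    // Each subscription owns its own queue of message copies.
    public Queue<DemoMessage> Messages { get; } = new Queue<DemoMessage>();

    public bool Accepts(DemoMessage message) => _filter(message);
}

public sealed class DemoTopic
{
    private readonly List<DemoSubscription> _subscriptions = new List<DemoSubscription>();

    public void AddSubscription(DemoSubscription subscription) => _subscriptions.Add(subscription);

    // The publisher sends once; every subscription whose filter matches gets a copy.
    // Returns the number of subscriptions that received the message.
    public int Publish(DemoMessage message)
    {
        var deliveredTo = 0;
        foreach (var subscription in _subscriptions)
        {
            if (subscription.Accepts(message))
            {
                subscription.Messages.Enqueue(message);
                deliveredTo++;
            }
        }
        return deliveredTo;
    }
}
```

The publisher only ever calls `Publish`; it holds no reference to any consumer. Adding a consumer means adding a subscription, which is the decoupling the diagrams above describe.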
Step 1: Setting Up Topics and Subscriptions
Creating the Infrastructure
using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;
public class TopicSetupService
{
private readonly ServiceBusAdministrationClient _adminClient;
private readonly ILogger<TopicSetupService> _logger;
public TopicSetupService(
ServiceBusAdministrationClient adminClient,
ILogger<TopicSetupService> logger)
{
_adminClient = adminClient;
_logger = logger;
}
public async Task CreateOrderEventsTopicAsync()
{
// Check if topic exists
if (await _adminClient.TopicExistsAsync("order-events"))
{
_logger.LogInformation("Topic 'order-events' already exists");
return;
}
// Create topic with proper settings
var topicOptions = new CreateTopicOptions("order-events")
{
// Default time-to-live - how long a message is kept before it expires
DefaultMessageTimeToLive = TimeSpan.FromDays(7),
// Enable duplicate detection to handle retries
RequiresDuplicateDetection = true,
DuplicateDetectionHistoryTimeWindow = TimeSpan.FromMinutes(5),
// Enable partitioning for scalability (the service manages the partitions;
// CreateTopicOptions has no partition-count setting)
EnablePartitioning = true,
// Maximum topic size: 1024 MB = 1 GB
MaxSizeInMegabytes = 1024,
// Authorization rules are added to the existing collection
AuthorizationRules =
{
new SharedAccessAuthorizationRule("manageRule",
new[] { AccessRights.Manage, AccessRights.Send, AccessRights.Listen })
}
};
await _adminClient.CreateTopicAsync(topicOptions);
_logger.LogInformation("Created topic 'order-events'");
}
public async Task CreateOrderSubscriptionsAsync()
{
var subscriptions = new[]
{
new
{
Name = "email-notifications",
Filter = new SqlRuleFilter("EventType = 'order-created' OR EventType = 'order-cancelled'"),
Description = "Sends email notifications for order events"
},
new
{
Name = "shipping-orders",
Filter = new SqlRuleFilter("Region = 'us-west' OR Region = 'us-east'"),
Description = "Processes shipping for US orders"
},
new
{
Name = "analytics-pipeline",
Filter = new SqlRuleFilter("1=1"), // Get everything
Description = "All order events for analytics"
},
new
{
Name = "high-value-orders",
Filter = new SqlRuleFilter("Amount > 10000"),
Description = "VIP orders requiring special handling"
}
};
foreach (var sub in subscriptions)
{
await CreateSubscriptionWithFilterAsync("order-events", sub.Name,
sub.Filter, sub.Description);
}
}
private async Task CreateSubscriptionWithFilterAsync(
string topicName,
string subscriptionName,
RuleFilter filter,
string description)
{
var subscriptionOptions = new CreateSubscriptionOptions(topicName, subscriptionName)
{
// Lock duration - how long a received message stays locked for one consumer
LockDuration = TimeSpan.FromMinutes(1),
// Message expiration
DefaultMessageTimeToLive = TimeSpan.FromDays(7),
// Auto-delete idle subscriptions
AutoDeleteOnIdle = TimeSpan.FromDays(30),
// Session support - for ordered processing
RequiresSession = false,
// Free-form description stored with the subscription
UserMetadata = description
};
// Create the subscription together with its rule. Passing the rule here replaces
// the default match-all rule; adding it afterwards with CreateRuleAsync would leave
// the default rule in place, and the subscription would still receive every message.
await _adminClient.CreateSubscriptionAsync(
subscriptionOptions,
new CreateRuleOptions("order-filter", filter));
_logger.LogInformation("Created subscription '{Subscription}' with filter", subscriptionName);
}
}
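One detail is worth keeping in mind when attaching rules. A subscription created without an explicit rule starts with a match-all rule named `$Default`, and a message is delivered when any rule on the subscription matches. The sketch below models that behaviour in plain C#. `DemoRuleSet` is a hypothetical type, not an SDK class, and the rule is reduced to a predicate over the order amount for brevity.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Illustrative model of rule evaluation; NOT the Azure SDK.
public sealed class DemoRuleSet
{
    private readonly Dictionary<string, Func<decimal, bool>> _rules =
        new Dictionary<string, Func<decimal, bool>>();

    // Mirrors a subscription created without an explicit rule:
    // it starts with a match-all rule named "$Default".
    public static DemoRuleSet WithDefaultRule()
    {
        var set = new DemoRuleSet();
        set.AddRule("$Default", _ => true);
        return set;
    }

    public void AddRule(string name, Func<decimal, bool> matches) => _rules[name] = matches;

    public bool RemoveRule(string name) => _rules.Remove(name);

    // A message is delivered when ANY rule matches.
    public bool Delivers(decimal amount) => _rules.Values.Any(rule => rule(amount));
}
```

With the default rule still present, adding an `Amount > 5000` rule changes nothing: an order of 1000 is still delivered. Against the real service, the usual ways to avoid this are to pass the rule to `CreateSubscriptionAsync` together with the subscription options, or to delete the `$Default` rule with `DeleteRuleAsync` before adding your own.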
Programmatic Setup
// Program.cs - DI registration
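var connectionString = Environment.GetEnvironmentVariable("ServiceBusConnection")
?? throw new InvalidOperationException("The ServiceBusConnection environment variable is not set.");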
// For sending messages
builder.Services.AddSingleton<ServiceBusClient>(sp =>
{
return new ServiceBusClient(connectionString);
});
// For administration (creating topics/subscriptions)
builder.Services.AddSingleton<ServiceBusAdministrationClient>(sp =>
{
return new ServiceBusAdministrationClient(connectionString);
});
Step 2: Publishing Messages
Sending to Topics
public class OrderEventPublisher
{
private readonly ServiceBusSender _sender;
private readonly ILogger<OrderEventPublisher> _logger;
public OrderEventPublisher(ServiceBusClient client, ILogger<OrderEventPublisher> logger)
{
_sender = client.CreateSender("order-events");
_logger = logger;
}
public async Task PublishOrderCreatedAsync(Order order)
{
var message = new ServiceBusMessage
{
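// Deterministic message ID so duplicate detection can drop a retried publish
// (a fresh GUID per call would make every retry look like a new message)
MessageId = $"order-created-{order.Id}",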
// Subject for routing
Subject = "OrderCreated",
// Properties for filtering
ApplicationProperties =
{
["EventType"] = "order-created",
["OrderId"] = order.Id,
["CustomerId"] = order.CustomerId,
["Amount"] = order.TotalAmount,
["Region"] = order.Region,
["Priority"] = order.Priority,
["Timestamp"] = DateTime.UtcNow
},
// Body
Body = new BinaryData(order),
// Content type for serialization
ContentType = "application/json"
};
// Add correlation ID for tracking
message.CorrelationId = order.Id.ToString();
// Schedule for later if needed
// message.ScheduledEnqueueTime = DateTime.UtcNow.AddHours(1);
await _sender.SendMessageAsync(message);
_logger.LogInformation(
"Published order created event for {OrderId}, Amount: {Amount}, Region: {Region}",
order.Id, order.TotalAmount, order.Region);
}
public async Task PublishOrderCancelledAsync(Order order, string reason)
{
var message = new ServiceBusMessage
{
// Deterministic ID so duplicate detection can drop a retried publish
MessageId = $"order-cancelled-{order.Id}",
Subject = "OrderCancelled",
ApplicationProperties =
{
["EventType"] = "order-cancelled",
["OrderId"] = order.Id,
["CustomerId"] = order.CustomerId,
["Amount"] = order.TotalAmount,
["CancellationReason"] = reason,
["Timestamp"] = DateTime.UtcNow
},
Body = new BinaryData(order),
ContentType = "application/json"
};
message.CorrelationId = order.Id.ToString();
await _sender.SendMessageAsync(message);
_logger.LogInformation("Published order cancelled event for {OrderId}", order.Id);
}
public async Task PublishBulkOrderEventsAsync(List<Order> orders)
{
var messages = orders.Select(order => new ServiceBusMessage
{
// Deterministic ID so duplicate detection can drop a retried publish
MessageId = $"order-created-{order.Id}",
Subject = "OrderCreated",
ApplicationProperties =
{
["EventType"] = "order-created",
["OrderId"] = order.Id,
["CustomerId"] = order.CustomerId,
["Amount"] = order.TotalAmount,
["Region"] = order.Region,
["Timestamp"] = DateTime.UtcNow
},
Body = new BinaryData(order),
ContentType = "application/json"
});
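// Send in batches for efficiency
var batch = await _sender.CreateMessageBatchAsync();
try
{
foreach (var message in messages)
{
if (batch.TryAddMessage(message)) continue;
// Batch full - send it, then start a new batch with the message that did not fit
if (batch.Count > 0)
{
await _sender.SendMessagesAsync(batch);
}
batch.Dispose();
batch = await _sender.CreateMessageBatchAsync();
if (!batch.TryAddMessage(message))
{
throw new InvalidOperationException(
$"Message {message.MessageId} is too large to fit in an empty batch.");
}
}
// Send remaining
if (batch.Count > 0)
{
await _sender.SendMessagesAsync(batch);
}
}
finally
{
batch.Dispose();
}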
_logger.LogInformation("Published {Count} order events in bulk", orders.Count);
}
}
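The batching loop in the bulk publisher is a greedy split: keep adding messages until one does not fit, send the batch, and start a new one with the message that was rejected. The sketch below shows the same logic on plain integers so the splitting can be checked in isolation. `DemoBatcher` is a hypothetical helper, not an SDK type, and message sizes and the batch limit are passed in as numbers rather than measured by the service.

```csharp
using System;
using System.Collections.Generic;

// Illustrative greedy batching over message sizes; NOT the Azure SDK.
public static class DemoBatcher
{
    // Splits message sizes into batches whose total size stays within maxBatchBytes.
    public static List<List<int>> Split(IEnumerable<int> messageSizes, int maxBatchBytes)
    {
        var batches = new List<List<int>>();
        var current = new List<int>();
        var currentBytes = 0;

        foreach (var size in messageSizes)
        {
            if (size > maxBatchBytes)
            {
                // A message that does not fit an empty batch can never be batched.
                throw new ArgumentException($"Message of {size} bytes exceeds the batch limit.");
            }

            if (currentBytes + size > maxBatchBytes)
            {
                // Batch full: close it and start a fresh one for this message.
                batches.Add(current);
                current = new List<int>();
                currentBytes = 0;
            }

            current.Add(size);
            currentBytes += size;
        }

        if (current.Count > 0)
        {
            batches.Add(current);
        }

        return batches;
    }
}
```

For example, five 40-byte messages with a 100-byte limit split into batches of two, two, and one. The important property is that the message that overflowed a batch is carried into the next batch rather than dropped.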
Step 3: Subscribing with Filters
SQL Filters - Detailed
SQL filters route messages by evaluating a SQL-like condition against each message's properties.
// SQL Filter Examples
// 1. Filter by event type
new SqlRuleFilter("EventType = 'order-created'")
// 2. Multiple conditions
new SqlRuleFilter("EventType = 'order-created' AND Amount > 1000")
// 3. IN clause
new SqlRuleFilter("Region IN ('us-west', 'us-east', 'eu-west')")
// 4. String operations
new SqlRuleFilter("CustomerId LIKE 'VIP-%'")
// 5. Null checks
new SqlRuleFilter("Priority IS NOT NULL")
// 6. Date/time comparisons
// The filter grammar has no date literal, so bind a typed parameter
// rather than comparing against a quoted date string
new SqlRuleFilter("Timestamp > @cutoff")
{
Parameters = { ["@cutoff"] = new DateTime(2024, 1, 1, 0, 0, 0, DateTimeKind.Utc) }
}
// 7. Complex expressions
new SqlRuleFilter("(Amount > 10000 OR Priority = 'urgent') AND Region = 'us-west'")
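To check your reading of a filter before deploying it, it can help to write the same condition as an ordinary predicate and run sample property bags through it. The sketch below does this for the last expression above. `DemoFilter` is a hypothetical helper, not how the broker evaluates filters, and it assumes `Amount` is published as a `decimal` and the other properties as strings.

```csharp
using System.Collections.Generic;

// Illustrative C# equivalent of one filter expression; NOT the broker's evaluator.
public static class DemoFilter
{
    // (Amount > 10000 OR Priority = 'urgent') AND Region = 'us-west'
    public static bool Matches(IReadOnlyDictionary<string, object> properties)
    {
        // A comparison against a missing property is treated as "no match".
        var highValue = properties.TryGetValue("Amount", out var amount)
            && amount is decimal amountValue && amountValue > 10000m;
        var urgent = properties.TryGetValue("Priority", out var priority)
            && priority is string priorityValue && priorityValue == "urgent";
        var usWest = properties.TryGetValue("Region", out var region)
            && region is string regionValue && regionValue == "us-west";

        return (highValue || urgent) && usWest;
    }
}
```

A small, urgent order in `us-west` matches, while a large order in `eu-west` does not. Treating a missing property as a failed comparison gives the same result as the filter here because the expression contains no `NOT`; expressions that negate a comparison need more care.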
Subscription with Complex Filters
public class SubscriptionManager
{
private readonly ServiceBusAdministrationClient _adminClient;
public SubscriptionManager(ServiceBusAdministrationClient adminClient)
{
_adminClient = adminClient;
}
// Create subscription with SQL filter
public async Task CreateVipOrdersSubscriptionAsync()
{
// VIP customers with high-value orders
var filter = new SqlRuleFilter(@"
(CustomerType = 'vip' AND Amount > 5000)
OR
(CustomerType = 'enterprise' AND Amount > 10000)
OR
Priority = 'urgent'
");
var options = new CreateSubscriptionOptions("order-events", "vip-orders")
{
LockDuration = TimeSpan.FromMinutes(2),
DefaultMessageTimeToLive = TimeSpan.FromDays(14), // VIP orders keep longer
MaxDeliveryCount = 10 // Delivery attempts before dead-lettering (10 is also the default)
};
// Pass the rule at creation time so it replaces the default match-all rule
var ruleOptions = new CreateRuleOptions("vip-filter", filter);
await _adminClient.CreateSubscriptionAsync(options, ruleOptions);
}
// Create subscription for specific region
public async Task CreateRegionalSubscriptionAsync(string region)
{
// Bind the region as a parameter instead of concatenating it into the expression
var filter = new SqlRuleFilter("Region = @region")
{
Parameters = { ["@region"] = region }
};
var subscriptionName = $"orders-{region.ToLowerInvariant()}";
var options = new CreateSubscriptionOptions("order-events", subscriptionName)
{
LockDuration = TimeSpan.FromMinutes(1),
DefaultMessageTimeToLive = TimeSpan.FromDays(7)
};
// Pass the rule at creation time so it replaces the default match-all rule
var ruleOptions = new CreateRuleOptions("region-filter", filter);
await _adminClient.CreateSubscriptionAsync(options, ruleOptions);
}
}
Correlation Filters
Correlation filters match on exact equality of one or more message properties, which makes them cheaper to evaluate than SQL filters.
// Correlation Filter Examples
// 1. Match single property
var byEventType = new CorrelationRuleFilter
{
ApplicationProperties = { ["EventType"] = "order-created" }
};
// 2. Match multiple properties (all conditions must match)
var filter = new CorrelationRuleFilter
{
CorrelationId = "order-123", // Match specific order
ApplicationProperties =
{
["Region"] = "us-west",
["Priority"] = "urgent"
}
};
// 3. Match by correlation ID only
new CorrelationRuleFilter("correlation-id-value")
// When to use correlation filters:
// - Simple equality matching on a few properties
// - Lower evaluation cost than SQL filters
// - When matching correlation ID
// Create subscription with correlation filter
public async Task CreateCorrelationFilterSubscriptionAsync()
{
// Only receive order-created events for the EU West region.
// The publisher sets CorrelationId to the order ID, so match on Subject instead.
var filter = new CorrelationRuleFilter
{
Subject = "OrderCreated",
ApplicationProperties =
{
["Region"] = "eu-west"
}
};
var options = new CreateSubscriptionOptions("order-events", "eu-orders")
{
// Sessions are independent of the filter type; not needed here
RequiresSession = false
};
// Pass the rule at creation time so it replaces the default match-all rule
var ruleOptions = new CreateRuleOptions("correlation-filter", filter);
await _adminClient.CreateSubscriptionAsync(options, ruleOptions);
}
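The matching rule behind a correlation filter is simple enough to sketch in a few lines: every property named in the filter must be present on the message with an equal value, and properties the filter does not mention are ignored. `DemoCorrelationMatch` below is a hypothetical helper written for illustration, not an SDK type, and it covers only application properties.

```csharp
using System.Collections.Generic;

// Illustrative model of correlation-style matching; NOT the Azure SDK.
public static class DemoCorrelationMatch
{
    // Every property named in the filter must be present on the message
    // with an equal value. Extra message properties are ignored.
    public static bool Matches(
        IReadOnlyDictionary<string, object> filterProperties,
        IReadOnlyDictionary<string, object> messageProperties)
    {
        foreach (var expected in filterProperties)
        {
            if (!messageProperties.TryGetValue(expected.Key, out var actual)
                || !object.Equals(expected.Value, actual))
            {
                return false;
            }
        }
        return true;
    }
}
```

Because the comparison is plain equality, there is no way to express ranges, `OR`, or `LIKE`. String values compare case-sensitively, so `"US-WEST"` does not match a filter for `"us-west"`.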
Step 4: Processing Messages from Subscriptions
Implementing Subscribers
public class EmailNotificationSubscriber
{
private readonly ServiceBusProcessor _processor;
private readonly IEmailService _emailService;
private readonly ILogger<EmailNotificationSubscriber> _logger;
public EmailNotificationSubscriber(
ServiceBusClient client,
IEmailService emailService,
ILogger<EmailNotificationSubscriber> logger)
{
_emailService = emailService;
_logger = logger;
// Create processor for the subscription
_processor = client.CreateProcessor(
"order-events", // Topic name
"email-notifications", // Subscription name
new ServiceBusProcessorOptions
{
MaxConcurrentCalls = 10,
AutoComplete = false,
PrefetchCount = 20
});
}
public async Task StartProcessingAsync()
{
_processor.ProcessMessageAsync += HandleMessageAsync;
_processor.ProcessErrorAsync += HandleErrorAsync;
await _processor.StartProcessingAsync();
_logger.LogInformation("Started processing email notifications");
}
private async Task HandleMessageAsync(ProcessMessageEventArgs args)
{
var message = args.Message;
try
{
// Extract properties
var eventType = message.ApplicationProperties["EventType"]?.ToString();
var orderId = message.ApplicationProperties["OrderId"]?.ToString();
var customerId = message.ApplicationProperties["CustomerId"]?.ToString();
_logger.LogInformation(
"Processing {EventType} for order {OrderId}",
eventType, orderId);
// Deserialize body
var order = message.Body.ToObjectFromJson<Order>();
// Send appropriate email
switch (eventType)
{
case "order-created":
await _emailService.SendOrderConfirmationAsync(order);
break;
case "order-cancelled":
var reason = message.ApplicationProperties["CancellationReason"]?.ToString();
await _emailService.SendOrderCancellationAsync(order, reason);
break;
default:
_logger.LogWarning("Unknown event type: {EventType}", eventType);
break;
}
await args.CompleteMessageAsync(message);
_logger.LogInformation("Successfully processed order notification for {OrderId}", orderId);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to process message {MessageId}", message.MessageId);
// Don't complete - let it be retried or dead-lettered
await args.AbandonMessageAsync(message);
}
}
private Task HandleErrorAsync(ProcessErrorEventArgs args)
{
_logger.LogError(args.Exception,
"Error in email notification processor. Entity: {Entity}, ErrorSource: {ErrorSource}",
args.EntityPath, args.ErrorSource);
return Task.CompletedTask;
}
}
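The handler above completes a message on success and abandons it on failure. Abandoning releases the lock, so the message is delivered again with a higher delivery count; once the subscription's `MaxDeliveryCount` (10 by default) is reached, the message moves to that subscription's dead-letter queue. The sketch below simulates that loop without any broker. `DemoDelivery` and `DemoOutcome` are hypothetical names used only for illustration.

```csharp
using System;

// Illustrative simulation of redelivery; NOT the Azure SDK.
public enum DemoOutcome { Completed, DeadLettered }

public static class DemoDelivery
{
    // Redelivers until the handler succeeds or maxDeliveryCount attempts are used.
    // The handler receives the 1-based delivery count and returns true on success.
    public static DemoOutcome Process(Func<int, bool> handler, int maxDeliveryCount, out int attempts)
    {
        for (attempts = 1; attempts <= maxDeliveryCount; attempts++)
        {
            if (handler(attempts))
            {
                // Corresponds to CompleteMessageAsync: the message is removed.
                return DemoOutcome.Completed;
            }
            // Corresponds to AbandonMessageAsync: the lock is released and the
            // message becomes available again with a higher delivery count.
        }

        attempts = maxDeliveryCount;
        // Delivery attempts exhausted: the message is dead-lettered.
        return DemoOutcome.DeadLettered;
    }
}
```

A handler that fails twice and then succeeds ends as `Completed` on the third delivery, while one that always fails ends as `DeadLettered` after the last allowed attempt. This is why handlers should be idempotent: the same message can legitimately arrive more than once.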
Multi-Subscription Processing
public class ShippingOrderSubscriber
{
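private readonly ServiceBusProcessor _processor;
private readonly IShippingService _shippingService;
private readonly ILogger<ShippingOrderSubscriber> _logger;
public ShippingOrderSubscriber(
ServiceBusClient client,
IShippingService shippingService,
ILogger<ShippingOrderSubscriber> logger)
{
_shippingService = shippingService;
_logger = logger;
// A processor is bound to exactly one subscription;
// create one processor per subscription you need to consume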
_processor = client.CreateProcessor(
"order-events",
"shipping-orders", // Only receives messages matching filter
new ServiceBusProcessorOptions
{
MaxConcurrentCalls = 20,
AutoComplete = false
});
}
public async Task StartProcessingAsync()
{
_processor.ProcessMessageAsync += async args =>
{
var message = args.Message;
var region = message.ApplicationProperties["Region"]?.ToString();
var orderId = message.ApplicationProperties["OrderId"]?.ToString();
_logger.LogInformation("Processing shipping for order {OrderId} in region {Region}",
orderId, region);
var order = message.Body.ToObjectFromJson<Order>();
// Create shipping label based on region
var shippingLabel = await _shippingService.CreateShippingLabelAsync(order, region);
// Update order with shipping info
await _shippingService.UpdateOrderShippingAsync(order.Id, shippingLabel);
await args.CompleteMessageAsync(message);
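};
// An error handler is mandatory: StartProcessingAsync throws if none is registered
_processor.ProcessErrorAsync += args =>
{
_logger.LogError(args.Exception,
"Error in shipping processor. Entity: {Entity}", args.EntityPath);
return Task.CompletedTask;
};
await _processor.StartProcessingAsync();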
}
}
Step 5: Advanced Patterns
Fan-Out with Multiple Subscriptions
// Publishing to trigger multiple workflows
public class OrderEventDispatcher
{
private readonly ServiceBusSender _topicSender;
public async Task DispatchOrderCreatedAsync(Order order)
{
// Single message, but multiple subscriptions will receive it
var message = new ServiceBusMessage
{
MessageId = Guid.NewGuid().ToString(),
Subject = "OrderCreated",
ApplicationProperties =
{
["EventType"] = "order-created",
["OrderId"] = order.Id,
["CustomerId"] = order.CustomerId,
["Amount"] = order.TotalAmount,
["Region"] = order.Region,
["Priority"] = order.Priority,
["CustomerType"] = order.CustomerType, // For VIP filter
["Timestamp"] = DateTime.UtcNow
},
Body = new BinaryData(order)
};
await _topicSender.SendMessageAsync(message);
// Message goes to ALL subscriptions that match:
// - email-notifications (EventType filter)
// - shipping-orders (Region filter)
// - analytics-pipeline (all events)
// - vip-orders (CustomerType + Amount filter)
// - high-value-orders (Amount filter)
}
}
Dead Letter Handling for Topics
public class DeadLetterProcessor
{
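private readonly ServiceBusReceiver _dlqReceiver;
private readonly ILogger<DeadLetterProcessor> _logger;
public DeadLetterProcessor(ServiceBusClient client, ILogger<DeadLetterProcessor> logger)
{
_logger = logger;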
// Access dead letter queue for a subscription
_dlqReceiver = client.CreateReceiver(
"order-events",
"email-notifications",
new ServiceBusReceiverOptions
{
SubQueue = SubQueue.DeadLetter
});
}
public async Task ProcessDeadLettersAsync()
{
while (true)
{
var messages = await _dlqReceiver.ReceiveMessagesAsync(
maxMessages: 10,
maxWaitTime: TimeSpan.FromSeconds(5));
foreach (var message in messages)
{
_logger.LogWarning(
"Processing dead letter. Reason: {Reason}, Error: {Error}",
message.DeadLetterReason,
message.DeadLetterErrorDescription);
// Analyze why it failed
var reason = message.DeadLetterReason;
var eventType = message.ApplicationProperties["EventType"];
// Retry or archive
if (ShouldRetry(message))
{
await RetryMessageAsync(message);
}
else
{
await ArchiveDeadLetterAsync(message);
}
await _dlqReceiver.CompleteMessageAsync(message);
}
if (messages.Count == 0) break;
}
}
private bool ShouldRetry(ServiceBusReceivedMessage message)
{
// Messages read from a receiver are ServiceBusReceivedMessage instances.
// The broker sets reasons such as "MaxDeliveryCountExceeded" and "TTLExpiredException";
// resubmit only messages that ran out of delivery attempts and archive the rest.
return message.DeadLetterReason == "MaxDeliveryCountExceeded";
}
}
Step 6: Testing and Monitoring
Subscription Testing
public class TopicSubscriptionTests
{
private readonly ServiceBusAdministrationClient _adminClient;
private readonly ServiceBusClient _client;
[Fact]
public async Task Subscription_ReceivesMatchingMessages()
{
// Arrange - create test topic and subscription with filter
var topicName = $"test-topic-{Guid.NewGuid():N}";
await _adminClient.CreateTopicAsync(topicName);
var subscriptionName = "test-subscription";
// Add filter: Amount > 5000
// Pass the rule at creation time so it replaces the default match-all rule;
// otherwise both test messages would be delivered
await _adminClient.CreateSubscriptionAsync(
new CreateSubscriptionOptions(topicName, subscriptionName),
new CreateRuleOptions("amount-filter",
new SqlRuleFilter("Amount > 5000")));
// Act - send messages with different amounts
var sender = _client.CreateSender(topicName);
await sender.SendMessageAsync(CreateMessage(1000)); // Below filter
await sender.SendMessageAsync(CreateMessage(6000)); // Above filter
// Allow time for message routing
await Task.Delay(TimeSpan.FromSeconds(2));
// Assert - check subscription has expected messages
var receiver = _client.CreateReceiver(topicName, subscriptionName);
var messages = await receiver.PeekMessagesAsync(10);
Assert.Single(messages); // Only the > 5000 message
Assert.Equal(6000m, messages[0].ApplicationProperties["Amount"]);
// Cleanup
await _adminClient.DeleteTopicAsync(topicName);
}
private ServiceBusMessage CreateMessage(decimal amount)
{
return new ServiceBusMessage
{
ApplicationProperties = { ["Amount"] = amount }
};
}
}
Monitoring Subscriptions
public class SubscriptionMonitor
{
private readonly ServiceBusAdministrationClient _adminClient;
private readonly ILogger<SubscriptionMonitor> _logger;
public async Task<SubscriptionMetrics> GetMetricsAsync(string topicName, string subscriptionName)
{
// Message counts are exposed on the runtime properties, not on the subscription definition.
// The dead-letter count is included, so no separate dead-letter lookup is needed.
var response = await _adminClient.GetSubscriptionRuntimePropertiesAsync(topicName, subscriptionName);
var runtime = response.Value;
return new SubscriptionMetrics
{
ActiveMessages = runtime.ActiveMessageCount,
DeadLetterMessages = runtime.DeadLetterMessageCount,
TransferMessages = runtime.TransferMessageCount,
TransferDeadLetterMessages = runtime.TransferDeadLetterMessageCount,
TotalMessages = runtime.TotalMessageCount
};
}
public async Task CheckBacklogAsync(string topicName)
{
var subscriptions = _adminClient.GetSubscriptionsAsync(topicName);
await foreach (var sub in subscriptions)
{
var metrics = await GetMetricsAsync(topicName, sub.SubscriptionName);
if (metrics.ActiveMessages > 1000)
{
_logger.LogWarning(
"Subscription {Subscription} has {Count} messages - possible backlog!",
sub.SubscriptionName, metrics.ActiveMessages);
}
}
}
}
public class SubscriptionMetrics
{
public long ActiveMessages { get; set; }
public long DeadLetterMessages { get; set; }
public long TransferMessages { get; set; }
public long TransferDeadLetterMessages { get; set; }
public long TotalMessages { get; set; }
}
Best Practices Summary
| Practice | Why | Implementation |
|---|---|---|
| Use SQL filters for complex routing | Flexible matching | Amount > 1000 AND Region = 'us' |
| Use correlation filters for simple cases | Better performance | Match correlation ID only |
| Filter at subscription level | Reduce message processing | Don't filter in handler |
| Monitor subscription depth | Detect issues early | Set up alerts |
| Handle the DLQ per subscription | Isolate failures | Each subscription has its own DLQ |
| Use partitioning for high throughput | Scale horizontally | EnablePartitioning = true |
Subscription Design Patterns
Patterns:
1. By Event Type
- order-created → notifications
- order-updated → inventory
- order-cancelled → refund
2. By Region
- us-west-orders → US shipping
- eu-west-orders → EU shipping
3. By Customer Type
- vip-orders → premium handling
- enterprise-orders → dedicated queue
4. By Priority
- urgent-orders → expedited processing
- normal-orders → standard queue
Conclusion
Service Bus Topics provide powerful pub-sub capabilities:
- Decouple publishers from subscribers
- Filter messages using SQL or correlation filters
- Scale with partitioning
- Monitor subscription health
- Handle failures with DLQ
Key takeaways:
- Design subscriptions based on consumer needs, not publisher structure
- Use SQL filters for complex routing, correlation filters for simple cases
- Monitor for backlogs and dead letter accumulation
- Test filters thoroughly - they determine what messages are delivered
- Consider message TTL and retention for debugging