← Back to ArticlesService Bus

Pub-Sub Patterns with Service Bus Topics and Filters

Comprehensive guide to implementing publish-subscribe patterns using Azure Service Bus topics with SQL filters, correlation filters, and building event-driven architectures

Pub-Sub Patterns with Service Bus Topics and Filters

Why Publish-Subscribe Matters

In modern distributed systems, multiple services need to react to events:

Traditional: Tight Coupling

┌─────────────┐       ┌─────────────┐
│   Order     │──────▶│   Email     │
│   Service   │       │   Service   │
└─────────────┘       └─────────────┘
         │
         └──────────▶┌─────────────┐
                     │   Shipping  │
                     │   Service   │
                     └─────────────┘
         └──────────▶┌─────────────┐
                     │   Payment   │
                     │   Service   │
                     └─────────────┘
         └──────────▶┌─────────────┐
                     │  Inventory  │
                     │   Service   │
                     └─────────────┘

Problems:
- Order service must know about all consumers
- Adding new consumer = modifying Order service
- Any change ripples through entire system
- Single point of failure


Pub-Sub: Loose Coupling

┌─────────────┐
│   Order     │
│   Service   │
│ (Publisher) │
└──────┬──────┘
       │
       ▼
┌─────────────────────────────┐
│    Service Bus Topic        │  (Order Events)
│                             │
│  ┌───────────────────────┐  │
│  │  Subscriptions        │  │
│  │  - Email Service      │  │
│  │  - Shipping Service   │  │
│  │  - Payment Service    │  │
│  │  - Inventory Service  │  │
│  └───────────────────────┘  │
└─────────────────────────────┘
       │              │              │
       ▼              ▼              ▼
┌──────────┐    ┌──────────┐    ┌──────────┐
│  Email   │    │ Shipping │    │ Payment  │
└──────────┘    └──────────┘    └──────────┘

Benefits:
- Decoupled - publishers don't know consumers
- Scalable - add/remove consumers without changes
- Selective - consumers get only what they need
- Resilient - failures don't cascade

Understanding Service Bus Topics

How Topics Work

┌─────────────────────────────────────────────────────────────────────────────┐
│                        Service Bus Topic Architecture                        │
└─────────────────────────────────────────────────────────────────────────────┘

                            ┌─────────────────────┐
                            │   Publisher         │
                            │   (Order Service)   │
                            └──────────┬──────────┘
                                       │
                                       │ 1. Send Message
                                       ▼
                            ┌─────────────────────┐
                            │   Topic             │
                            │  "order-events"     │
                            │                     │
                            │  Message Properties:│
                            │  - OrderId          │
                            │  - CustomerId       │
                            │  - Amount           │
                            │  - EventType        │
                            │  - Region           │
                            └──────────┬──────────┘
                                       │
         ┌─────────────────────────────┼─────────────────────────────┐
         │                             │                             │
         ▼                             ▼                             ▼
┌─────────────────────┐   ┌─────────────────────┐   ┌─────────────────────┐
│  Subscription       │   │  Subscription       │   │  Subscription       │
│  "email-orders"     │   │  "shipping-orders"  │   │  "analytics"        │
│                     │   │                     │   │                     │
│  Filter:            │   │  Filter:            │   │  Filter:            │
│  EventType='new'    │   │  Region='us-west'   │   │  ALL                │
└─────────────────────┘   └─────────────────────┘   └─────────────────────┘
         │                             │                             │
         ▼                             ▼                             ▼
┌─────────────────────┐   ┌─────────────────────┐   ┌─────────────────────┐
│   Email Service     │   │   Shipping Service  │   │   Analytics         │
│   processes         │   │   processes         │   │   stores all        │
│   new orders        │   │   US West orders    │   │   events            │
└─────────────────────┘   └─────────────────────┘   └─────────────────────┘

Key Concepts

Topic - A channel for publishing messages. Like a radio frequency. Subscription - A queue attached to a topic. Each subscription gets a copy of matching messages. Publisher - The service that sends messages to the topic. Subscriber - The service that reads from a subscription. Filter - Rules that determine which messages go to which subscription.


Step 1: Setting Up Topics and Subscriptions

Creating the Infrastructure

using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;

public class TopicSetupService
{
    private readonly ServiceBusAdministrationClient _adminClient;
    private readonly ILogger<TopicSetupService> _logger;

    public TopicSetupService(
        ServiceBusAdministrationClient adminClient,
        ILogger<TopicSetupService> logger)
    {
        _adminClient = adminClient;
        _logger = logger;
    }

    public async Task CreateOrderEventsTopicAsync()
    {
        // Check if topic exists
        if (await _adminClient.TopicExistsAsync("order-events"))
        {
            _logger.LogInformation("Topic 'order-events' already exists");
            return;
        }

        // Create topic with proper settings
        var topicOptions = new CreateTopicOptions("order-events")
        {
            // Message retention - how long messages live
            DefaultMessageTimeToLive = TimeSpan.FromDays(7),
            
            // Enable duplicate detection to handle retries
            RequiresDuplicateDetection = true,
            DuplicateDetectionHistoryTimeWindow = TimeSpan.FromMinutes(5),
            
            // Enable partitioning for scalability
            EnablePartitioning = true,
            PartitionCount = 16,
            
            // Max size in GB
            MaxSizeInMegabytes = 1024,
            
            // Authorization rules can be added here
            AuthorizationRules = new[]
            {
                new SharedAccessAuthorizationRule("manageRule",
                    new[] { AccessRights.Manage, AccessRights.Send, AccessRights.Listen })
            }
        };

        await _adminClient.CreateTopicAsync(topicOptions);
        _logger.LogInformation("Created topic 'order-events'");
    }

    public async Task CreateOrderSubscriptionsAsync()
    {
        var subscriptions = new[]
        {
            new
            {
                Name = "email-notifications",
                Filter = new SqlFilter("EventType = 'order-created' OR EventType = 'order-cancelled'"),
                Description = "Sends email notifications for order events"
            },
            new
            {
                Name = "shipping-orders",
                Filter = new SqlFilter("Region = 'us-west' OR Region = 'us-east'"),
                Description = "Processes shipping for US orders"
            },
            new
            {
                Name = "analytics-pipeline",
                Filter = new SqlFilter("1=1"), // Get everything
                Description = "All order events for analytics"
            },
            new
            {
                Name = "high-value-orders",
                Filter = new SqlFilter("Amount > 10000"),
                Description = "VIP orders requiring special handling"
            }
        };

        foreach (var sub in subscriptions)
        {
            await CreateSubscriptionWithFilterAsync("order-events", sub.Name, 
                sub.Filter, sub.Description);
        }
    }

    private async Task CreateSubscriptionWithFilterAsync(
        string topicName, 
        string subscriptionName,
        SqlFilter filter,
        string description)
    {
        var subscriptionOptions = new CreateSubscriptionOptions(topicName, subscriptionName)
        {
            // Lock duration - how long message is locked
            LockDuration = TimeSpan.FromMinutes(1),
            
            // Message expiration
            DefaultMessageTimeToLive = TimeSpan.FromDays(7),
            
            // Auto-delete idle subscriptions
            AutoDeleteOnIdle = TimeSpan.FromDays(30),
            
            // Session support - for ordered processing
            RequiresSession = false,
            
            Description = description
        };

        // Create the subscription
        await _adminClient.CreateSubscriptionAsync(subscriptionOptions);

        // Add the filter rule
        await _adminClient.CreateRuleAsync(topicName, subscriptionName, new CreateRuleOptions
        {
            Name = "order-filter",
            Filter = filter
        });

        _logger.LogInformation("Created subscription '{Subscription}' with filter", subscriptionName);
    }
}

Programmatic Setup

// Program.cs - DI registration
var connectionString = Environment.GetEnvironmentVariable("ServiceBusConnection");

// For sending messages
builder.Services.AddSingleton<ServiceBusClient>(sp =>
{
    return new ServiceBusClient(connectionString);
});

// For administration (creating topics/subscriptions)
builder.Services.AddSingleton<ServiceBusAdministrationClient>(sp =>
{
    return new ServiceBusAdministrationClient(connectionString);
});

Step 2: Publishing Messages

Sending to Topics

public class OrderEventPublisher
{
    private readonly ServiceBusSender _sender;
    private readonly ILogger<OrderEventPublisher> _logger;

    public OrderEventPublisher(ServiceBusClient client, ILogger<OrderEventPublisher> logger)
    {
        _sender = client.CreateSender("order-events");
        _logger = logger;
    }

    public async Task PublishOrderCreatedAsync(Order order)
    {
        var message = new ServiceBusMessage
        {
            // Message ID for deduplication
            MessageId = Guid.NewGuid().ToString(),
            
            // Subject for routing
            Subject = "OrderCreated",
            
            // Properties for filtering
            ApplicationProperties =
            {
                ["EventType"] = "order-created",
                ["OrderId"] = order.Id,
                ["CustomerId"] = order.CustomerId,
                ["Amount"] = order.TotalAmount,
                ["Region"] = order.Region,
                ["Priority"] = order.Priority,
                ["Timestamp"] = DateTime.UtcNow
            },
            
            // Body
            Body = new BinaryData(order),
            
            // Content type for serialization
            ContentType = "application/json"
        };

        // Add correlation ID for tracking
        message.CorrelationId = order.Id.ToString();

        // Schedule for later if needed
        // message.ScheduledEnqueueTime = DateTime.UtcNow.AddHours(1);

        await _sender.SendMessageAsync(message);

        _logger.LogInformation(
            "Published order created event for {OrderId}, Amount: {Amount}, Region: {Region}",
            order.Id, order.TotalAmount, order.Region);
    }

    public async Task PublishOrderCancelledAsync(Order order, string reason)
    {
        var message = new ServiceBusMessage
        {
            MessageId = Guid.NewGuid().ToString(),
            Subject = "OrderCancelled",
            ApplicationProperties =
            {
                ["EventType"] = "order-cancelled",
                ["OrderId"] = order.Id,
                ["CustomerId"] = order.CustomerId,
                ["Amount"] = order.TotalAmount,
                ["CancellationReason"] = reason,
                ["Timestamp"] = DateTime.UtcNow
            },
            Body = new BinaryData(order),
            ContentType = "application/json"
        };

        message.CorrelationId = order.Id.ToString();

        await _sender.SendMessageAsync(message);

        _logger.LogInformation("Published order cancelled event for {OrderId}", order.Id);
    }

    public async Task PublishBulkOrderEventsAsync(List<Order> orders)
    {
        var messages = orders.Select(order => new ServiceBusMessage
        {
            MessageId = Guid.NewGuid().ToString(),
            Subject = "OrderCreated",
            ApplicationProperties =
            {
                ["EventType"] = "order-created",
                ["OrderId"] = order.Id,
                ["CustomerId"] = order.CustomerId,
                ["Amount"] = order.TotalAmount,
                ["Region"] = order.Region,
                ["Timestamp"] = DateTime.UtcNow
            },
            Body = new BinaryData(order),
            ContentType = "application/json"
        });

        // Send in batch for efficiency
        using var batch = await _sender.CreateMessageBatchAsync();
        
        foreach (var message in messages)
        {
            if (!batch.TryAddMessage(message))
            {
                // Batch full - send current and create new
                await _sender.SendMessagesAsync(batch);
                batch.Clear();
                batch.TryAddMessage(message);
            }
        }

        // Send remaining
        if (batch.Count > 0)
        {
            await _sender.SendMessagesAsync(batch);
        }

        _logger.LogInformation("Published {Count} order events in bulk", orders.Count);
    }
}

Step 3: Subscribing with Filters

SQL Filters - Detailed

SQL filters allow sophisticated message routing based on message properties.

// SQL Filter Examples

// 1. Filter by event type
new SqlFilter("EventType = 'order-created'")

// 2. Multiple conditions
new SqlFilter("EventType = 'order-created' AND Amount > 1000")

// 3. IN clause
new SqlFilter("Region IN ('us-west', 'us-east', 'eu-west')")

// 4. String operations
new SqlFilter("CustomerId LIKE 'VIP-%'")

// 5. Null checks
new SqlFilter("Priority IS NOT NULL")

// 6. Date/time comparisons
new SqlFilter("Timestamp > '2024-01-01'")

// 7. Complex expressions
new SqlFilter("(Amount > 10000 OR Priority = 'urgent') AND Region = 'us-west'")

Subscription with Complex Filters

public class SubscriptionManager
{
    private readonly ServiceBusAdministrationClient _adminClient;

    // Create subscription with SQL filter
    public async Task CreateVipOrdersSubscriptionAsync()
    {
        // VIP customers with high-value orders
        var filter = new SqlFilter(@"
            (CustomerType = 'vip' AND Amount > 5000)
            OR 
            (CustomerType = 'enterprise' AND Amount > 10000)
            OR
            Priority = 'urgent'
        ");

        var options = new CreateSubscriptionOptions("order-events", "vip-orders")
        {
            LockDuration = TimeSpan.FromMinutes(2),
            DefaultMessageTimeToLive = TimeSpan.FromDays(14), // VIP orders keep longer
            MaxDeliveryCount = 10  // More retries for important orders
        };

        await _adminClient.CreateSubscriptionAsync(options);
        
        var ruleOptions = new CreateRuleOptions("vip-filter", filter);
        await _adminClient.CreateRuleAsync("order-events", "vip-orders", ruleOptions);
    }

    // Create subscription for specific region
    public async Task CreateRegionalSubscriptionAsync(string region)
    {
        var filter = new SqlFilter($"Region = '{region}'");

        var options = new CreateSubscriptionOptions("order-events", $"orders-{region.ToLower()}")
        {
            LockDuration = TimeSpan.FromMinutes(1),
            DefaultMessageTimeToLive = TimeSpan.FromDays(7)
        };

        await _adminClient.CreateSubscriptionAsync(options);
        
        var ruleOptions = new CreateRuleOptions("region-filter", filter);
        await _adminClient.CreateRuleAsync("order-events", $"orders-{region.ToLower()}", ruleOptions);
    }
}

Correlation Filters

Correlation filters are more efficient than SQL filters for simple property matching.

// Correlation Filter Examples

// 1. Match single property
new CorrelationFilter("EventType", "order-created")

// 2. Match multiple properties
var filter = new CorrelationFilter
{
    CorrelationId = "order-123",  // Match specific order
    Properties =
    {
        ["Region"] = "us-west",
        ["Priority"] = "urgent"
    }
};

// 3. Match by correlation ID only
new CorrelationFilter("correlation-id-value")

// When to use correlation filters:
// - Simpler matching (fewer properties)
// - Better performance (indexed)
// - When matching correlation ID
// Create subscription with correlation filter
public async Task CreateCorrelationFilterSubscriptionAsync()
{
    // Only receive messages for specific order type
    var filter = new CorrelationFilter
    {
        CorrelationId = "order-created",
        Properties =
        {
            ["Region"] = "eu-west"
        }
    };

    var options = new CreateSubscriptionOptions("order-events", "eu-orders")
    {
        // Correlation filter subscriptions can support sessions
        RequiresSession = false
    };

    await _adminClient.CreateSubscriptionAsync(options);
    
    var ruleOptions = new CreateRuleOptions("correlation-filter", filter);
    await _adminClient.CreateRuleAsync("order-events", "eu-orders", ruleOptions);
}

Step 4: Processing Messages from Subscriptions

Implementing Subscribers

public class EmailNotificationSubscriber
{
    private readonly ServiceBusProcessor _processor;
    private readonly IEmailService _emailService;
    private readonly ILogger<EmailNotificationSubscriber> _logger;

    public EmailNotificationSubscriber(
        ServiceBusClient client,
        IEmailService emailService,
        ILogger<EmailNotificationSubscriber> logger)
    {
        _emailService = emailService;
        _logger = logger;
        
        // Create processor for the subscription
        _processor = client.CreateProcessor(
            "order-events",           // Topic name
            "email-notifications",    // Subscription name
            new ServiceBusProcessorOptions
            {
                MaxConcurrentCalls = 10,
                AutoComplete = false,
                PrefetchCount = 20
            });
    }

    public async Task StartProcessingAsync()
    {
        _processor.ProcessMessageAsync += HandleMessageAsync;
        _processor.ProcessErrorAsync += HandleErrorAsync;
        
        await _processor.StartProcessingAsync();
        _logger.LogInformation("Started processing email notifications");
    }

    private async Task HandleMessageAsync(ProcessMessageEventArgs args)
    {
        var message = args.Message;
        
        try
        {
            // Extract properties
            var eventType = message.ApplicationProperties["EventType"]?.ToString();
            var orderId = message.ApplicationProperties["OrderId"]?.ToString();
            var customerId = message.ApplicationProperties["CustomerId"]?.ToString();

            _logger.LogInformation(
                "Processing {EventType} for order {OrderId}", 
                eventType, orderId);

            // Deserialize body
            var order = message.Body.ToObject<Order>();

            // Send appropriate email
            switch (eventType)
            {
                case "order-created":
                    await _emailService.SendOrderConfirmationAsync(order);
                    break;
                    
                case "order-cancelled":
                    var reason = message.ApplicationProperties["CancellationReason"]?.ToString();
                    await _emailService.SendOrderCancellationAsync(order, reason);
                    break;
                    
                default:
                    _logger.LogWarning("Unknown event type: {EventType}", eventType);
                    break;
            }

            await args.CompleteMessageAsync(message);
            
            _logger.LogInformation("Successfully processed order notification for {OrderId}", orderId);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to process message {MessageId}", message.MessageId);
            
            // Don't complete - let it be retried or dead-lettered
            await args.AbandonMessageAsync(message);
        }
    }

    private Task HandleErrorAsync(ProcessErrorEventArgs args)
    {
        _logger.LogError(args.Exception,
            "Error in email notification processor. Entity: {Entity}, ErrorSource: {ErrorSource}",
            args.EntityPath, args.ErrorSource);
            
        return Task.CompletedTask;
    }
}

Multi-Subscription Processing

public class ShippingOrderSubscriber
{
    private readonly ServiceBusProcessor _processor;
    private readonly IShippingService _shippingService;

    public ShippingOrderSubscriber(ServiceBusClient client)
    {
        // Can listen to multiple subscriptions if needed
        // But typically one processor per subscription
        
        _processor = client.CreateProcessor(
            "order-events",
            "shipping-orders",  // Only receives messages matching filter
            new ServiceBusProcessorOptions
            {
                MaxConcurrentCalls = 20,
                AutoComplete = false
            });
    }

    public async Task StartProcessingAsync()
    {
        _processor.ProcessMessageAsync += async args =>
        {
            var message = args.Message;
            var region = message.ApplicationProperties["Region"]?.ToString();
            var orderId = message.ApplicationProperties["OrderId"]?.ToString();

            _logger.LogInformation("Processing shipping for order {OrderId} in region {Region}",
                orderId, region);

            var order = message.Body.ToObject<Order>();

            // Create shipping label based on region
            var shippingLabel = await _shippingService.CreateShippingLabelAsync(order, region);

            // Update order with shipping info
            await _shippingService.UpdateOrderShippingAsync(order.Id, shippingLabel);

            await args.CompleteMessageAsync(message);
        };

        await _processor.StartProcessingAsync();
    }
}

Step 5: Advanced Patterns

Fan-Out with Multiple Subscriptions

// Publishing to trigger multiple workflows

public class OrderEventDispatcher
{
    private readonly ServiceBusSender _topicSender;

    public async Task DispatchOrderCreatedAsync(Order order)
    {
        // Single message, but multiple subscriptions will receive it
        var message = new ServiceBusMessage
        {
            MessageId = Guid.NewGuid().ToString(),
            Subject = "OrderCreated",
            ApplicationProperties =
            {
                ["EventType"] = "order-created",
                ["OrderId"] = order.Id,
                ["CustomerId"] = order.CustomerId,
                ["Amount"] = order.TotalAmount,
                ["Region"] = order.Region,
                ["Priority"] = order.Priority,
                ["CustomerType"] = order.CustomerType,  // For VIP filter
                ["Timestamp"] = DateTime.UtcNow
            },
            Body = new BinaryData(order)
        };

        await _topicSender.SendMessageAsync(message);

        // Message goes to ALL subscriptions that match:
        // - email-notifications (EventType filter)
        // - shipping-orders (Region filter)
        // - analytics-pipeline (all events)
        // - vip-orders (CustomerType + Amount filter)
        // - high-value-orders (Amount filter)
    }
}

Dead Letter Handling for Topics

public class DeadLetterProcessor
{
    private readonly ServiceBusReceiver _dlqReceiver;

    public DeadLetterProcessor(ServiceBusClient client)
    {
        // Access dead letter queue for a subscription
        _dlqReceiver = client.CreateReceiver(
            "order-events",
            "email-notifications",
            new ServiceBusReceiverOptions
            {
                SubQueue = SubQueue.DeadLetter
            });
    }

    public async Task ProcessDeadLettersAsync()
    {
        while (true)
        {
            var messages = await _dlqReceiver.ReceiveMessagesAsync(
                maxMessages: 10, 
                maxWaitTime: TimeSpan.FromSeconds(5));

            foreach (var message in messages)
            {
                _logger.LogWarning(
                    "Processing dead letter. Reason: {Reason}, Error: {Error}",
                    message.DeadLetterReason,
                    message.DeadLetterErrorDescription);

                // Analyze why it failed
                var reason = message.DeadLetterReason;
                var eventType = message.ApplicationProperties["EventType"];

                // Retry or archive
                if (ShouldRetry(message))
                {
                    await RetryMessageAsync(message);
                }
                else
                {
                    await ArchiveDeadLetterAsync(message);
                }

                await _dlqReceiver.CompleteMessageAsync(message);
            }

            if (messages.Count == 0) break;
        }
    }

    private bool ShouldRetry(ServiceBusMessage message)
    {
        var reason = message.DeadLetterReason;
        var deliveryCount = message.DeliveryCount;

        // Retry if less than 3 attempts
        return deliveryCount < 3 && 
               (reason == "MessagingEntityDisabled" || 
                reason == "MessageLockLost");
    }
}

Step 6: Testing and Monitoring

Subscription Testing

public class TopicSubscriptionTests
{
    private readonly ServiceBusAdministrationClient _adminClient;

    [Fact]
    public async Task Subscription_ReceivesMatchingMessages()
    {
        // Arrange - create test topic and subscription with filter
        var topicName = $"test-topic-{Guid.NewGuid():N}";
        await _adminClient.CreateTopicAsync(topicName);
        
        var subscriptionName = "test-subscription";
        await _adminClient.CreateSubscriptionAsync(topicName, subscriptionName);
        
        // Add filter: Amount > 5000
        await _adminClient.CreateRuleAsync(topicName, subscriptionName, 
            new CreateRuleOptions("amount-filter", 
                new SqlFilter("Amount > 5000")));

        // Act - send messages with different amounts
        var sender = _client.CreateSender(topicName);
        
        await sender.SendMessageAsync(CreateMessage(1000));  // Below filter
        await sender.SendMessageAsync(CreateMessage(6000)); // Above filter
        
        // Allow time for message routing
        await Task.Delay(TimeSpan.FromSeconds(2));

        // Assert - check subscription has expected messages
        var receiver = _client.CreateReceiver(topicName, subscriptionName);
        var messages = await receiver.PeekMessagesAsync(10);
        
        Assert.Single(messages); // Only the > 5000 message
        Assert.Equal(6000m, messages[0].ApplicationProperties["Amount"]);

        // Cleanup
        await _adminClient.DeleteTopicAsync(topicName);
    }

    private ServiceBusMessage CreateMessage(decimal amount)
    {
        return new ServiceBusMessage
        {
            ApplicationProperties = { ["Amount"] = amount }
        };
    }
}

Monitoring Subscriptions

public class SubscriptionMonitor
{
    private readonly ServiceBusAdministrationClient _adminClient;
    private readonly ILogger<SubscriptionMonitor> _logger;

    public async Task<SubscriptionMetrics> GetMetricsAsync(string topicName, string subscriptionName)
    {
        var subProperties = await _adminClient.GetSubscriptionAsync(topicName, subscriptionName);

        var dlqName = EntityNameFormatter.FormatDeadLetterQueuePath(subscriptionName);
        var dlqProperties = await _adminClient.GetQueueAsync(dlqName);

        return new SubscriptionMetrics
        {
            ActiveMessages = subProperties.Value.ActiveMessageCount,
            DeadLetterMessages = dlqProperties.Value.ActiveMessageCount,
            ScheduledMessages = subProperties.Value.ScheduledMessageCount,
            TransferMessages = subProperties.Value.TransferMessageCount,
            TransferDeadLetterMessages = subProperties.Value.TransferDeadLetterMessageCount,
            MessageCountDetails = subProperties.Value.MessageCountDetails
        };
    }

    public async Task CheckBacklogAsync(string topicName)
    {
        var topicProperties = await _adminClient.GetTopicAsync(topicName);
        
        var subscriptions = _adminClient.GetSubscriptionsAsync(topicName);
        
        await foreach (var sub in subscriptions)
        {
            var metrics = await GetMetricsAsync(topicName, sub.Name);
            
            if (metrics.ActiveMessages > 1000)
            {
                _logger.LogWarning(
                    "Subscription {Subscription} has {Count} messages - possible backlog!",
                    sub.Name, metrics.ActiveMessages);
            }
        }
    }
}

public class SubscriptionMetrics
{
    public long ActiveMessages { get; set; }
    public long DeadLetterMessages { get; set; }
    public long ScheduledMessages { get; set; }
    public long TransferMessages { get; set; }
    public long TransferDeadLetterMessages { get; set; }
    public MessageCountDetails MessageCountDetails { get; set; }
}

Best Practices Summary

PracticeWhyImplementation
Use SQL filters for complex routingFlexible matchingAmount > 1000 AND Region = 'us'
Use correlation filters for simple casesBetter performanceMatch correlation ID only
Filter at subscription levelReduce message processingDon't filter in handler
Monitor subscription depthDetect issues earlySet up alerts
Configure DLQ per subscriptionIsolate failuresEach subscription has own DLQ
Use partitioning for high throughputScale horizontallyPartitionCount = 16

Subscription Design Patterns

Patterns:

1. By Event Type
   - order-created → notifications
   - order-updated → inventory
   - order-cancelled → refund

2. By Region
   - us-west-orders → US shipping
   - eu-west-orders → EU shipping

3. By Customer Type
   - vip-orders → premium handling
   - enterprise-orders → dedicated queue

4. By Priority
   - urgent-orders → expedited processing
   - normal-orders → standard queue

Conclusion

Service Bus Topics provide powerful pub-sub capabilities:

Key takeaways:

  1. Design subscriptions based on consumer needs, not publisher structure
  2. Use SQL filters for complex routing, correlation filters for simple cases
  3. Monitor for backlogs and dead letter accumulation
  4. Test filters thoroughly - they determine what messages are delivered
  5. Consider message TTL and retention for debugging

Azure Integration Hub - Service Bus