Service Bus Topics, Subscriptions & Filters

Publish-Subscribe Messaging with Advanced Filtering


Introduction

Azure Service Bus Topics provide a powerful publish-subscribe (pub/sub) messaging pattern that enables one-to-many communication between components of your application. Unlike queues where each message is processed by a single consumer, topics allow multiple subscribers to receive their own copy of each message.

This comprehensive guide covers:

  • Core concepts — Topics, subscriptions, and how they work together
  • Implementation — Creating and managing topics and subscriptions
  • Filtering — SQL filters and correlation filters for message routing
  • Best practices — Designing efficient pub/sub architectures
  • Advanced patterns — Composite filters, actions, and optimization

Understanding Pub/Sub with Service Bus

How Topics Work

┌─────────────────────────────────────────────────────────────────────┐
│                    SERVICE BUS TOPICS ARCHITECTURE                  │
├─────────────────────────────────────────────────────────────────────┤
│                                                                      │
│   ┌──────────────┐                                                  │
│   │  Publisher   │                                                  │
│   │              │                                                  │
│   │  (Order      │                                                  │
│   │   Service)   │                                                  │
│   └──────┬───────┘                                                  │
│          │                                                          │
│          │ Send Message                                             │
│          ▼                                                          │
│   ┌─────────────────────────────────────────────────────────────┐    │
│   │                       TOPIC                                │    │
│   │                   "orders-topic"                          │    │
│   │                                                            │    │
│   │   ┌────────────────────────────────────────────────────┐  │    │
│   │   │           Message #1                               │  │    │
│   │   │  { OrderId: "123", Priority: "High",              │  │    │
│   │   │    Department: "Sales" }                           │  │    │
│   │   └────────────────────────────────────────────────────┘  │    │
│   └──────┬───────────────────────┬───────────────────────────┘    │
│          │                       │                                  │
│   ┌──────┴──────┐    ┌───────────┴───────────┐    ┌────────────┐  │
│   │ Subscription│    │    Subscription      │    │ Subscription│ │
│   │             │    │                      │    │            │ │
│   │ email-      │    │    analytics-        │    │ warehouse- │ │
│   │ notification│    │    processing        │    │ fulfillment│ │
│   └──────┬──────┘    └───────────┬──────────┘    └──────┬─────┘  │
│          │                       │                       │         │
│          ▼                       ▼                       ▼         │
│   ┌──────────────┐    ┌──────────────────┐    ┌──────────────┐   │
│   │ Send Email  │    │  Store in Data    │    │  Create     │   │
│   │ to Customer │    │  Lake for Reports │    │  Pick List  │   │
│   └──────────────┘    └──────────────────┘    └──────────────┘   │
│                                                                      │
└─────────────────────────────────────────────────────────────────────┘

Why Use Topics?

ScenarioSolution
Multiple downstream systemsEach gets its own subscription
Different processing per consumerIndependent subscriptions
Event-driven architectureDecouple producers from consumers
Broadcast notificationsSend to all interested parties
Selective routingUse filters to target specific consumers

Create Topics and Subscriptions

Using Azure Portal

  1. Navigate to your Service Bus namespace
  2. Click "Topics" in left menu
  3. Click "+ Topic" to create new topic
  4. Configure name, size, and properties
  5. Create subscriptions under the topic

Using .NET SDK

using Azure.Messaging.ServiceBus;

// Create the client
var client = new ServiceBusClient(connectionString);

// Create topic
var topicClient = client.CreateTopic("orders-topic");
await topicClient.CreateAsync(new CreateTopicOptions
{
    MaxSizeInMegabytes = 1024,
    DefaultMessageTimeToLive = TimeSpan.FromDays(7),
    MaxDeliveryCount = 10,
    EnableBatchedOperations = true
});

// Create subscription - Email notifications
var emailSub = topicClient.CreateSubscription("email-notifications");
await emailSub.CreateAsync(new CreateSubscriptionOptions
{
    MaxDeliveryCount = 3,
    LockDuration = TimeSpan.FromMinutes(1),
    DefaultMessageTimeToLive = TimeSpan.FromDays(1)
});

// Create subscription - Analytics
var analyticsSub = topicClient.CreateSubscription("analytics-processing");
await analyticsSub.CreateAsync();

// Create subscription - Warehouse
var warehouseSub = topicClient.CreateSubscription("warehouse-fulfillment");
await warehouseSub.CreateAsync();

Using Azure CLI

# Create topic
az servicebus topic create \
  --name orders-topic \
  --namespace-name mynamespace \
  --resource-group myrg \
  --max-size 1024 \
  --default-message-time-to-live "P7D" \
  --max-delivery-count 10

# Create subscriptions
az servicebus topic subscription create \
  --name email-notifications \
  --topic-name orders-topic \
  --namespace-name mynamespace \
  --resource-group myrg

az servicebus topic subscription create \
  --name analytics-processing \
  --topic-name orders-topic \
  --namespace-name mynamespace \
  --resource-group myrg

az servicebus topic subscription create \
  --name warehouse-fulfillment \
  --topic-name orders-topic \
  --namespace-name mynamespace \
  --resource-group myrg

Publishing to Topics

Simple Message

var sender = client.CreateSender("orders-topic");

var message = new ServiceBusMessage("New Order #12345");
message.ContentType = "application/json";
message.MessageId = Guid.NewGuid().ToString();

await sender.SendMessageAsync(message);

Message with Properties

var order = new
{
    OrderId = "ORD-12345",
    CustomerId = "CUST-001",
    Total = 199.99,
    Items = 3
};

var message = new ServiceBusMessage(JsonSerializer.Serialize(order))
{
    ContentType = "application/json",
    Subject = "NewOrder",
    MessageId = Guid.NewGuid().ToString()
};

// Add custom properties for filtering
message.Properties["CustomerId"] = "CUST-001";
message.Properties["Priority"] = "High";
message.Properties["Department"] = "Sales";
message.Properties["Region"] = "US-East";

await sender.SendMessageAsync(message);

Batch Publishing

var messages = new List<ServiceBusMessage>();

for (int i = 0; i < 100; i++)
{
    var message = new ServiceBusMessage($"Order #{i}")
    {
        MessageId = Guid.NewGuid().ToString(),
        Properties =
        {
            ["Priority"] = i % 2 == 0 ? "High" : "Normal",
            ["Region"] = "US-West"
        }
    };
    messages.Add(message);
}

// Send as batch
await sender.SendMessagesAsync(messages);

Consuming from Subscriptions

Basic Processing

var processor = client.CreateProcessor("orders-topic", "email-notifications");

processor.ProcessMessageAsync += async args =>
{
    var message = args.Message;
    var body = message.Body.ToString();
    
    Console.WriteLine($"Received: {message.MessageId}");
    Console.WriteLine($"Body: {body}");
    
    // Process the message
    await SendEmailAsync(message);
    
    // Complete the message
    await args.CompleteMessageAsync(message);
};

processor.ProcessErrorAsync += args =>
{
    Console.WriteLine($"Error: {args.Exception.Message}");
    return Task.CompletedTask;
};

await processor.StartProcessingAsync();

// When done processing
await processor.StopProcessingAsync();

Processing All Subscriptions

// Process analytics subscription
var analyticsProcessor = client.CreateProcessor("orders-topic", "analytics-processing");
analyticsProcessor.ProcessMessageAsync += async args =>
{
    // Store for analytics
    await StoreForAnalyticsAsync(args.Message);
    await args.CompleteMessageAsync(args.Message);
};

// Process warehouse subscription
var warehouseProcessor = client.CreateProcessor("orders-topic", "warehouse-fulfillment");
warehouseProcessor.ProcessMessageAsync += async args =>
{
    // Create fulfillment task
    await CreateFulfillmentTaskAsync(args.Message);
    await args.CompleteMessageAsync(args.Message);
};

await Task.WhenAll(
    analyticsProcessor.StartProcessingAsync(),
    warehouseProcessor.StartProcessingAsync()
);

Filter Types

SQL Filters

SQL filters allow you to filter messages based on message properties using SQL-like syntax.

// Simple equality filter
var filter1 = new SqlFilter("CustomerId = 'CUST-001'");

// Multiple conditions
var filter2 = new SqlFilter("Priority = 'High' AND Region = 'US-East'");

// IN clause
var filter3 = new SqlFilter("Priority IN ('High', 'Critical', 'Urgent')");

// String operations
var filter4 = new SqlFilter("CustomerId LIKE 'CUST-%'");

// Numeric comparisons
var filter5 = new SqlFilter("TotalAmount > 1000");

// Complex expression
var filter6 = new SqlFilter(
    "(Priority = 'High' AND Region = 'US') OR " +
    "(Priority = 'Critical' AND Region = 'EU')"
);

// Add filter to subscription
await subscriptionClient.AddRuleAsync("high-priority-rule", filter6);

Correlation Filters

Correlation filters are more efficient than SQL filters for matching specific properties.

// Match single correlation ID
var correlation1 = new CorrelationFilter
{
    CorrelationId = "order-12345"
};

// Match multiple properties
var correlation2 = new CorrelationFilter
{
    CorrelationId = "order-12345",
    Properties =
    {
        ["Department"] = "Sales",
        ["Region"] = "US-East"
    }
};

// Add correlation filter
await subscriptionClient.AddRuleAsync("sales-rule", correlation2);

// Default rule (receives all messages)
var defaultRule = new TrueFilter(); // Matches all messages
await subscriptionClient.AddRuleAsync("$default", defaultRule);

Filter Actions

Filter actions can modify messages as they pass through the subscription.

// Add action to set properties
var filterWithAction = new SqlFilter("Priority = 'High'");
filterWithAction.Action = new SqlRuleAction(
    "SET Priority='Critical'; SET ProcessedBy='AlertSystem';"
);

await subscriptionClient.AddRuleAsync("priority-action", filterWithAction);

Rule Management

Default Rule

Every subscription has a default rule that accepts all messages:

// Get default rule
var rules = subscriptionClient.GetRulesAsync();
await foreach (var rule in rules)
{
    Console.WriteLine($"Rule: {rule.Name}");
}

// The default rule uses TrueFilter - accepts all messages
// Name: "$Default"

Add Custom Rules

// Create a rule with SQL filter
var rule = new CreateRuleOptions
{
    Name = "urgent-orders",
    Filter = new SqlFilter("Priority = 'Urgent'"),
    Action = new SqlRuleAction("SET SentTo='UrgentProcessing'")
};

await subscriptionClient.CreateRuleAsync(rule);

Remove Rules

// Remove specific rule
await subscriptionClient.DeleteRuleAsync("old-rule");

// Remove all rules except default
var rules = subscriptionClient.GetRulesAsync();
await foreach (var rule in rules)
{
    if (rule.Name != "$Default")
    {
        await subscriptionClient.DeleteRuleAsync(rule.Name);
    }
}

Subscription Properties

Dead Letter Topic

When messages can't be processed, they go to a dead letter topic:

var subscriptionOptions = new CreateSubscriptionOptions
{
    DeadLetterPublishing = true,
    MaxDeliveryCount = 3,
    LockDuration = TimeSpan.FromMinutes(1)
};

var subscription = topicClient.CreateSubscription("with-dlq", subscriptionOptions);

// Configure dead letter on existing subscription
var updateOptions = new UpdateSubscriptionOptions
{
    EnableDeadLetteringOnMessageExpiration = true,
    MaxDeliveryCount = 5
};
await subscriptionClient.UpdateAsync(updateOptions);

Message Expiration

var subscriptionOptions = new CreateSubscriptionOptions
{
    DefaultMessageTimeToLive = TimeSpan.FromDays(7),      // Messages expire after 7 days
    LockDuration = TimeSpan.FromMinutes(2),              // Lock duration for processing
    MaxDeliveryCount = 10                                // Max delivery attempts
};

Forwarding

// Auto-forward to another queue or topic
var subscriptionOptions = new CreateSubscriptionOptions
{
    ForwardTo = "processed-orders-queue",
    ForwardDeadLetteredMessagesTo = "dlq-orders"
};

Complete Example: Order Processing System

Publisher

public class OrderPublisher
{
    private readonly ServiceBusSender _sender;
    
    public async Task PublishOrderAsync(Order order)
    {
        var message = new ServiceBusMessage(JsonSerializer.Serialize(order))
        {
            Subject = "OrderCreated",
            MessageId = order.OrderId,
            Properties =
            {
                ["OrderType"] = order.Type,
                ["Priority"] = order.Priority,
                ["Region"] = order.Region,
                ["CustomerId"] = order.CustomerId
            }
        };
        
        await _sender.SendMessageAsync(message);
    }
}

Subscriber: Email Notifications

public class EmailNotificationProcessor
{
    public async Task ProcessAsync(ProcessMessageEventArgs args)
    {
        var message = args.Message;
        
        // Filter: High priority or VIP customers
        if (message.Properties["Priority"] == "High" || 
            IsVipCustomer(message.Properties["CustomerId"]))
        {
            await SendHighPriorityEmailAsync(message);
        }
        else
        {
            await SendStandardEmailAsync(message);
        }
        
        await args.CompleteMessageAsync(message);
    }
}

Subscriber: Analytics

public class AnalyticsProcessor
{
    public async Task ProcessAsync(ProcessMessageEventArgs args)
    {
        var order = JsonSerializer.Deserialize<Order>(args.Message.Body);
        
        // Store in data lake
        await StoreInDataLakeAsync(order);
        
        // Update real-time dashboard
        await UpdateDashboardMetricsAsync(order);
        
        await args.CompleteMessageAsync(args.Message);
    }
}

Subscriber: Warehouse

public class WarehouseProcessor
{
    public async Task ProcessAsync(ProcessMessageEventArgs args)
    {
        var order = JsonSerializer.Deserialize<Order>(args.Message.Body);
        
        // Skip orders that don't need fulfillment
        if (order.Type == "Digital" || order.Type == "Service")
        {
            await args.CompleteMessageAsync(args.Message);
            return;
        }
        
        // Create picking list
        await CreatePickingListAsync(order);
        
        // Send to warehouse system
        await NotifyWarehouseAsync(order);
        
        await args.CompleteMessageAsync(args.Message);
    }
}

Best Practices

PracticeDescription
Use correlation filtersMore efficient than SQL filters
Limit filter complexityComplex filters impact performance
Name subscriptions descriptivelyEasy to identify purpose
Configure dead letterHandle poison messages gracefully
Set appropriate TTLDon't keep messages longer than needed
Use batchingSend multiple messages efficiently
Monitor subscription metricsTrack message count, dead letters

Performance Considerations

// Configure for high throughput
var options = new CreateSubscriptionOptions
{
    EnableBatchedOperations = true,      // Batch send/receive
    MaxDeliveryCount = 10,                // Retry before dead letter
    LockDuration = TimeSpan.FromSeconds(30)  // Adequate processing time
};

Troubleshooting

Common Issues

IssueSolution
Messages not receivedCheck subscription exists, verify filters
Filter not workingEnsure property name matches exactly
Too many dead lettersIncrease MaxDeliveryCount, check processing logic
Performance issuesEnable batched operations, tune lock duration

Monitoring

# Get subscription metrics
az servicebus topic subscription show \
  --name email-notifications \
  --topic-name orders-topic \
  --namespace-name mynamespace \
  --resource-group myrg

# Check active message count
az servicebus subscription show \
  --name email-notifications \
  --topic-name orders-topic \
  --namespace-name mynamespace \
  --resource-group myrg \
  --query "messageCountDetails"

Related Topics


Azure Integration Hub - Intermediate Level