Service Bus Topics, Subscriptions & Filters
Publish-Subscribe Messaging with Advanced Filtering
Introduction
Azure Service Bus topics implement the publish-subscribe (pub/sub) messaging pattern, which enables one-to-many communication between components of your application. Unlike a queue, where each message is processed by a single consumer, a topic delivers a separate copy of each message to every subscription whose filter matches it, so multiple subscribers can process the same message independently.
This comprehensive guide covers:
- Core concepts — Topics, subscriptions, and how they work together
- Implementation — Creating and managing topics and subscriptions
- Filtering — SQL filters and correlation filters for message routing
- Best practices — Designing efficient pub/sub architectures
- Advanced patterns — Composite filters, actions, and optimization
Understanding Pub/Sub with Service Bus
How Topics Work
                SERVICE BUS TOPICS ARCHITECTURE

                      ┌──────────────────┐
                      │    Publisher     │
                      │ (Order Service)  │
                      └────────┬─────────┘
                               │ Send Message
                               ▼
┌──────────────────────────────────────────────────────────────┐
│                     TOPIC "orders-topic"                     │
│  Message #1  { OrderId: "123", Priority: "High",             │
│                Department: "Sales" }                         │
└────────┬─────────────────────┬─────────────────────┬─────────┘
         │                     │                     │
┌────────┴─────────┐  ┌────────┴─────────┐  ┌────────┴─────────┐
│   Subscription   │  │   Subscription   │  │   Subscription   │
│ email-           │  │ analytics-       │  │ warehouse-       │
│ notifications    │  │ processing       │  │ fulfillment      │
└────────┬─────────┘  └────────┬─────────┘  └────────┬─────────┘
         │                     │                     │
         ▼                     ▼                     ▼
┌──────────────────┐  ┌──────────────────┐  ┌──────────────────┐
│ Send Email       │  │ Store in Data    │  │ Create           │
│ to Customer      │  │ Lake for Reports │  │ Pick List        │
└──────────────────┘  └──────────────────┘  └──────────────────┘
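The fan-out in the diagram can be modelled in a few lines. The following is a minimal in-memory sketch, not the Service Bus SDK: `InMemoryTopic`, `Subscribe`, `Publish`, and `Count` are hypothetical names invented for illustration, and a plain predicate stands in for a subscription filter.

```csharp
using System;
using System.Collections.Generic;

// Demo: two subscriptions on one topic, one of them filtered.
var topic = new InMemoryTopic();
topic.Subscribe("email-notifications", InMemoryTopic.MatchAll);
topic.Subscribe("warehouse-fulfillment", p => (string)p["Department"] == "Sales");

topic.Publish("order 123", new Dictionary<string, object> { ["Department"] = "Sales" });
topic.Publish("order 124", new Dictionary<string, object> { ["Department"] = "Support" });

Console.WriteLine(topic.Count("email-notifications"));   // 2
Console.WriteLine(topic.Count("warehouse-fulfillment")); // 1

// Hypothetical in-memory stand-in for a topic; not part of any Azure library.
public sealed class InMemoryTopic
{
    // Predicate that accepts every message, like a subscription with no custom filter.
    public static readonly Func<IReadOnlyDictionary<string, object>, bool> MatchAll = _ => true;

    // Each subscription owns a filter and its own queue of message copies.
    private readonly Dictionary<string, (Func<IReadOnlyDictionary<string, object>, bool> Filter, Queue<string> Messages)>
        _subscriptions = new();

    public void Subscribe(string name, Func<IReadOnlyDictionary<string, object>, bool> filter) =>
        _subscriptions[name] = (filter, new Queue<string>());

    // Every subscription whose filter matches receives its own copy of the message.
    public void Publish(string body, IReadOnlyDictionary<string, object> properties)
    {
        foreach (var (filter, messages) in _subscriptions.Values)
        {
            if (filter(properties))
            {
                messages.Enqueue(body);
            }
        }
    }

    public int Count(string name) => _subscriptions[name].Messages.Count;
}
```

Both orders reach the unfiltered subscription, while only the Sales order reaches the filtered one. Consuming from one subscription's queue would leave the other untouched, which is the property that distinguishes a topic from a queue.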
Why Use Topics?
| Scenario | Solution |
|---|---|
| Multiple downstream systems | Each gets its own subscription |
| Different processing per consumer | Independent subscriptions |
| Event-driven architecture | Decouple producers from consumers |
| Broadcast notifications | Send to all interested parties |
| Selective routing | Use filters to target specific consumers |
Create Topics and Subscriptions
Using Azure Portal
- Navigate to your Service Bus namespace
- Click "Topics" in the left menu
- Click "+ Topic" to create a new topic
- Configure the name, maximum size, and other properties
- Open the new topic and create subscriptions under it
Using .NET SDK
using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;

// Client for sending and receiving messages (used in the later sections)
var client = new ServiceBusClient(connectionString);

// Topics and subscriptions are management entities, so they are created
// with ServiceBusAdministrationClient rather than ServiceBusClient
var adminClient = new ServiceBusAdministrationClient(connectionString);

// Create topic
await adminClient.CreateTopicAsync(new CreateTopicOptions("orders-topic")
{
    MaxSizeInMegabytes = 1024,
    DefaultMessageTimeToLive = TimeSpan.FromDays(7),
    EnableBatchedOperations = true
    // MaxDeliveryCount is a subscription setting, not a topic setting
});

// Create subscription - Email notifications
await adminClient.CreateSubscriptionAsync(
    new CreateSubscriptionOptions("orders-topic", "email-notifications")
    {
        MaxDeliveryCount = 3,
        LockDuration = TimeSpan.FromMinutes(1),
        DefaultMessageTimeToLive = TimeSpan.FromDays(1)
    });

// Create subscription - Analytics (default settings)
await adminClient.CreateSubscriptionAsync("orders-topic", "analytics-processing");

// Create subscription - Warehouse (default settings)
await adminClient.CreateSubscriptionAsync("orders-topic", "warehouse-fulfillment");
Using Azure CLI
# Create topic (max delivery count is configured per subscription, not on the topic)
az servicebus topic create \
  --name orders-topic \
  --namespace-name mynamespace \
  --resource-group myrg \
  --max-size 1024 \
  --default-message-time-to-live "P7D"
# Create subscriptions
az servicebus topic subscription create \
--name email-notifications \
--topic-name orders-topic \
--namespace-name mynamespace \
--resource-group myrg
az servicebus topic subscription create \
--name analytics-processing \
--topic-name orders-topic \
--namespace-name mynamespace \
--resource-group myrg
az servicebus topic subscription create \
--name warehouse-fulfillment \
--topic-name orders-topic \
--namespace-name mynamespace \
--resource-group myrg
Publishing to Topics
Simple Message
var sender = client.CreateSender("orders-topic");
var message = new ServiceBusMessage("New Order #12345");
message.ContentType = "text/plain"; // The body here is plain text, not JSON
message.MessageId = Guid.NewGuid().ToString();
await sender.SendMessageAsync(message);
Message with Properties
var order = new
{
OrderId = "ORD-12345",
CustomerId = "CUST-001",
Total = 199.99,
Items = 3
};
var message = new ServiceBusMessage(JsonSerializer.Serialize(order))
{
ContentType = "application/json",
Subject = "NewOrder",
MessageId = Guid.NewGuid().ToString()
};
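// Add application properties for filtering (filters cannot inspect the message body)
message.ApplicationProperties["CustomerId"] = "CUST-001";
message.ApplicationProperties["Priority"] = "High";
message.ApplicationProperties["Department"] = "Sales";
message.ApplicationProperties["Region"] = "US-East";
// Numeric property so that filters such as "TotalAmount > 1000" have something to evaluate
message.ApplicationProperties["TotalAmount"] = 199.99;
await sender.SendMessageAsync(message);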
Batch Publishing
var messages = new List<ServiceBusMessage>();
for (int i = 0; i < 100; i++)
{
    var message = new ServiceBusMessage($"Order #{i}")
    {
        MessageId = Guid.NewGuid().ToString(),
        ApplicationProperties =
        {
            ["Priority"] = i % 2 == 0 ? "High" : "Normal",
            ["Region"] = "US-West"
        }
    };
    messages.Add(message);
}
// Send as a single batch; this throws if the combined size exceeds the maximum batch size
await sender.SendMessagesAsync(messages);
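Passing a list to `SendMessagesAsync` sends everything as one batch, so the call fails when the combined size exceeds what the namespace accepts. A common way around this is to let the SDK build size-checked batches with `CreateMessageBatchAsync` and `TryAddMessage`. The sketch below assumes an existing `ServiceBusSender`; the class name `BatchPublisher` and method name `SendInBatchesAsync` are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

// Hypothetical helper: splits an arbitrary number of messages into size-checked batches.
public static class BatchPublisher
{
    // Returns the number of batches that were sent.
    public static async Task<int> SendInBatchesAsync(
        ServiceBusSender sender,
        IEnumerable<ServiceBusMessage> messages)
    {
        var pending = new Queue<ServiceBusMessage>(messages);
        int batchesSent = 0;

        while (pending.Count > 0)
        {
            // A new batch is created for each round and disposed at the end of the loop body.
            using ServiceBusMessageBatch batch = await sender.CreateMessageBatchAsync();

            // If a message does not fit into an empty batch, it can never be sent.
            if (!batch.TryAddMessage(pending.Peek()))
            {
                throw new InvalidOperationException(
                    $"Message {pending.Peek().MessageId} is too large for a single batch.");
            }
            pending.Dequeue();

            // Keep adding until the batch is full or nothing is left.
            while (pending.Count > 0 && batch.TryAddMessage(pending.Peek()))
            {
                pending.Dequeue();
            }

            await sender.SendMessagesAsync(batch);
            batchesSent++;
        }

        return batchesSent;
    }
}
```

With the `messages` list built above, a call would look like `await BatchPublisher.SendInBatchesAsync(sender, messages);`. For 100 small messages this will usually be a single batch, but the loop keeps working when the payloads grow.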
Consuming from Subscriptions
Basic Processing
var processor = client.CreateProcessor("orders-topic", "email-notifications");
processor.ProcessMessageAsync += async args =>
{
var message = args.Message;
var body = message.Body.ToString();
Console.WriteLine($"Received: {message.MessageId}");
Console.WriteLine($"Body: {body}");
// Process the message
await SendEmailAsync(message);
// Complete the message
await args.CompleteMessageAsync(message);
};
processor.ProcessErrorAsync += args =>
{
Console.WriteLine($"Error: {args.Exception.Message}");
return Task.CompletedTask;
};
await processor.StartProcessingAsync();
// When done processing
await processor.StopProcessingAsync();
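The processor completes messages automatically by default once the handler returns without throwing. When settlement should be explicit, as in the handler above, it can help to turn auto-completion off and abandon the message on failure so that it is redelivered. The following is a sketch under those assumptions; `EmailSubscriptionHost`, `CreateOptions`, and the `handle` delegate are hypothetical names, and the concurrency value is only an example.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

// Hypothetical wiring for the "email-notifications" subscription with explicit settlement.
public static class EmailSubscriptionHost
{
    public static ServiceBusProcessorOptions CreateOptions() => new ServiceBusProcessorOptions
    {
        AutoCompleteMessages = false, // The handler settles each message itself
        MaxConcurrentCalls = 4        // Example value: up to four messages processed in parallel
    };

    public static ServiceBusProcessor CreateProcessor(
        ServiceBusClient client,
        Func<ServiceBusReceivedMessage, Task> handle)
    {
        ServiceBusProcessor processor =
            client.CreateProcessor("orders-topic", "email-notifications", CreateOptions());

        processor.ProcessMessageAsync += async args =>
        {
            try
            {
                await handle(args.Message);
                await args.CompleteMessageAsync(args.Message);
            }
            catch (Exception)
            {
                // Release the lock so the message is redelivered; once MaxDeliveryCount
                // is exceeded the broker moves it to the dead-letter sub-queue.
                await args.AbandonMessageAsync(args.Message);
            }
        };

        processor.ProcessErrorAsync += args =>
        {
            Console.Error.WriteLine($"{args.ErrorSource} on {args.EntityPath}: {args.Exception.Message}");
            return Task.CompletedTask;
        };

        return processor;
    }
}
```

The returned processor is started and stopped with `StartProcessingAsync` and `StopProcessingAsync`, exactly as in the basic example.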
Processing All Subscriptions
// Process analytics subscription
var analyticsProcessor = client.CreateProcessor("orders-topic", "analytics-processing");
analyticsProcessor.ProcessMessageAsync += async args =>
{
// Store for analytics
await StoreForAnalyticsAsync(args.Message);
await args.CompleteMessageAsync(args.Message);
};
// Process warehouse subscription
var warehouseProcessor = client.CreateProcessor("orders-topic", "warehouse-fulfillment");
warehouseProcessor.ProcessMessageAsync += async args =>
{
// Create fulfillment task
await CreateFulfillmentTaskAsync(args.Message);
await args.CompleteMessageAsync(args.Message);
};
await Task.WhenAll(
analyticsProcessor.StartProcessingAsync(),
warehouseProcessor.StartProcessingAsync()
);
Filter Types
SQL Filters
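SQL filters select messages with a SQL-like expression evaluated against message properties. Application properties are referenced by name and system properties with the sys. prefix (for example sys.CorrelationId); the message body is never inspected.
using Azure.Messaging.ServiceBus.Administration;

// Rule manager for the subscription the rules apply to
var ruleManager = client.CreateRuleManager("orders-topic", "email-notifications");

// Simple equality filter
var filter1 = new SqlRuleFilter("CustomerId = 'CUST-001'");
// Multiple conditions
var filter2 = new SqlRuleFilter("Priority = 'High' AND Region = 'US-East'");
// IN clause
var filter3 = new SqlRuleFilter("Priority IN ('High', 'Critical', 'Urgent')");
// String operations
var filter4 = new SqlRuleFilter("CustomerId LIKE 'CUST-%'");
// Numeric comparisons (requires a numeric TotalAmount application property on the message)
var filter5 = new SqlRuleFilter("TotalAmount > 1000");
// Complex expression
var filter6 = new SqlRuleFilter(
    "(Priority = 'High' AND Region = 'US') OR " +
    "(Priority = 'Critical' AND Region = 'EU')"
);
// Add filter to subscription
await ruleManager.CreateRuleAsync("high-priority-rule", filter6);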
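A rule added to an existing subscription sits next to the default match-all rule, so it does not narrow delivery on its own. One way to avoid that, sketched here, is to pass the rule when the subscription is created, in which case it takes the place of the default rule. The example uses `ServiceBusAdministrationClient.CreateSubscriptionAsync` with a `CreateRuleOptions` argument and the `SqlRuleFilter` type from `Azure.Messaging.ServiceBus.Administration`; the subscription name `high-priority-orders` and the class `FilteredSubscriptions` are hypothetical.

```csharp
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus.Administration;

// Hypothetical helper: creates a subscription whose only rule is a SQL filter.
public static class FilteredSubscriptions
{
    // Rule definition kept separate so it can be inspected without a live namespace.
    public static CreateRuleOptions HighPriorityRule() =>
        new CreateRuleOptions(
            "high-priority-rule",
            new SqlRuleFilter("Priority = 'High' AND Region = 'US-East'"));

    // Creates the subscription with the rule above instead of the default match-all rule.
    public static Task CreateAsync(ServiceBusAdministrationClient adminClient) =>
        adminClient.CreateSubscriptionAsync(
            new CreateSubscriptionOptions("orders-topic", "high-priority-orders"), // hypothetical name
            HighPriorityRule());
}
```

Because the filter is in place from the first message, there is no window in which the subscription receives everything.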
Correlation Filters
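Correlation filters match by equality against a fixed set of system and application properties; a message matches only when every property specified on the filter is equal. Because no expression has to be parsed and evaluated, they are cheaper for the broker to process than SQL filters.
// Match single correlation ID
var correlation1 = new CorrelationRuleFilter
{
    CorrelationId = "order-12345"
};
// Match multiple properties (all of them must match)
var correlation2 = new CorrelationRuleFilter
{
    CorrelationId = "order-12345",
    ApplicationProperties =
    {
        ["Department"] = "Sales",
        ["Region"] = "US-East"
    }
};
// Add correlation filter
var ruleManager = client.CreateRuleManager("orders-topic", "email-notifications");
await ruleManager.CreateRuleAsync("sales-rule", correlation2);
// Default rule (receives all messages)
// A subscription created without an explicit rule already has this rule, named "$Default".
// Re-create it only after it has been deleted; adding a rule whose name already exists fails.
var defaultRule = new TrueRuleFilter(); // Matches all messages
await ruleManager.CreateRuleAsync(RuleProperties.DefaultRuleName, defaultRule);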
Filter Actions
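A rule can carry an action that modifies the copy of the message delivered to that subscription; the published message and the copies in other subscriptions are unaffected. In the SDK the action is attached to the rule, not to the filter.
// Rule with an action that sets properties on matching messages
var ruleWithAction = new CreateRuleOptions("priority-action", new SqlRuleFilter("Priority = 'High'"))
{
    Action = new SqlRuleAction(
        "SET Priority = 'Critical'; SET ProcessedBy = 'AlertSystem';"
    )
};
var ruleManager = client.CreateRuleManager("orders-topic", "email-notifications");
await ruleManager.CreateRuleAsync(ruleWithAction);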
Rule Management
Default Rule
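A subscription created without an explicit rule gets a default rule that accepts all messages. Rules on a subscription are combined with OR, so a custom filter only narrows delivery once the default rule has been removed:
var ruleManager = client.CreateRuleManager("orders-topic", "email-notifications");
// List the rules on the subscription
await foreach (var rule in ruleManager.GetRulesAsync())
{
    Console.WriteLine($"Rule: {rule.Name}");
}
// The default rule uses TrueRuleFilter - accepts all messages
// Name: "$Default" (RuleProperties.DefaultRuleName)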
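Since matching rules are combined with OR, replacing the default rule is a two-step operation. The sketch below adds the new rule first and then deletes `$Default`, so the subscription never ends up with zero rules (a subscription without rules receives nothing). `RuleSwap` and `ReplaceDefaultRuleAsync` are hypothetical names, and the assumption that a missing rule surfaces as `MessagingEntityNotFound` should be verified against your SDK version.

```csharp
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;

// Hypothetical helper: swaps the match-all default rule for a custom filter.
public static class RuleSwap
{
    public static async Task ReplaceDefaultRuleAsync(
        ServiceBusRuleManager ruleManager,
        string ruleName,
        RuleFilter filter)
    {
        // Add the new rule first so the subscription always has at least one rule.
        await ruleManager.CreateRuleAsync(ruleName, filter);

        try
        {
            // Remove the rule named "$Default" so that only the custom filter decides delivery.
            await ruleManager.DeleteRuleAsync(RuleProperties.DefaultRuleName);
        }
        catch (ServiceBusException ex)
            when (ex.Reason == ServiceBusFailureReason.MessagingEntityNotFound)
        {
            // Assumption: the default rule was already removed, so there is nothing to do.
        }
    }
}
```

A call such as `await RuleSwap.ReplaceDefaultRuleAsync(ruleManager, "urgent-orders", new SqlRuleFilter("Priority = 'Urgent'"));` leaves the subscription with a single, filtering rule.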
Add Custom Rules
// Create a rule with SQL filter
var ruleManager = client.CreateRuleManager("orders-topic", "warehouse-fulfillment");
var rule = new CreateRuleOptions
{
    Name = "urgent-orders",
    Filter = new SqlRuleFilter("Priority = 'Urgent'"),
    Action = new SqlRuleAction("SET SentTo = 'UrgentProcessing'")
};
await ruleManager.CreateRuleAsync(rule);
Remove Rules
var ruleManager = client.CreateRuleManager("orders-topic", "warehouse-fulfillment");
// Remove specific rule
await ruleManager.DeleteRuleAsync("old-rule");
// Remove all rules except default
await foreach (var rule in ruleManager.GetRulesAsync())
{
    if (rule.Name != RuleProperties.DefaultRuleName) // "$Default"
    {
        await ruleManager.DeleteRuleAsync(rule.Name);
    }
}
Subscription Properties
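Dead-Letter Queue
Each subscription has its own dead-letter sub-queue. A message is moved there when it exceeds MaxDeliveryCount, when a receiver explicitly dead-letters it, or, if enabled, when it expires:
var adminClient = new ServiceBusAdministrationClient(connectionString);
var subscriptionOptions = new CreateSubscriptionOptions("orders-topic", "with-dlq")
{
    DeadLetteringOnMessageExpiration = true, // Expired messages are dead-lettered instead of dropped
    MaxDeliveryCount = 3,
    LockDuration = TimeSpan.FromMinutes(1)
};
await adminClient.CreateSubscriptionAsync(subscriptionOptions);
// Configure dead-lettering on an existing subscription: fetch, modify, update
SubscriptionProperties existing =
    await adminClient.GetSubscriptionAsync("orders-topic", "email-notifications");
existing.DeadLetteringOnMessageExpiration = true;
existing.MaxDeliveryCount = 5;
await adminClient.UpdateSubscriptionAsync(existing);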
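Dead-lettered messages stay in the subscription's dead-letter sub-queue until something reads them. The sketch below peeks at that sub-queue and prints why each message was dead-lettered, without removing anything. It assumes an existing `ServiceBusClient`; `DeadLetterInspector`, `DeadLetterOptions`, and `PeekAsync` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

// Hypothetical helper: inspects a subscription's dead-letter sub-queue without settling messages.
public static class DeadLetterInspector
{
    // Receiver options that point at the dead-letter sub-queue instead of the main subscription.
    public static ServiceBusReceiverOptions DeadLetterOptions() => new ServiceBusReceiverOptions
    {
        SubQueue = SubQueue.DeadLetter
    };

    // Prints the dead-letter reason of up to maxMessages messages and returns how many were seen.
    public static async Task<int> PeekAsync(
        ServiceBusClient client,
        string topicName,
        string subscriptionName,
        int maxMessages = 10)
    {
        await using ServiceBusReceiver receiver =
            client.CreateReceiver(topicName, subscriptionName, DeadLetterOptions());

        // Peeking does not lock or remove messages.
        IReadOnlyList<ServiceBusReceivedMessage> messages =
            await receiver.PeekMessagesAsync(maxMessages);

        foreach (ServiceBusReceivedMessage message in messages)
        {
            Console.WriteLine(
                $"{message.MessageId}: {message.DeadLetterReason} - {message.DeadLetterErrorDescription}");
        }

        return messages.Count;
    }
}
```

To actually drain the sub-queue after fixing the underlying problem, the same receiver options can be used with `ReceiveMessagesAsync` followed by `CompleteMessageAsync`, or the messages can be resubmitted to the topic.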
Message Expiration
var subscriptionOptions = new CreateSubscriptionOptions("orders-topic", "analytics-processing")
{
    DefaultMessageTimeToLive = TimeSpan.FromDays(7), // Messages expire after 7 days
    LockDuration = TimeSpan.FromMinutes(2),          // Lock duration for processing
    MaxDeliveryCount = 10                            // Max delivery attempts
};
Forwarding
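// Auto-forward to another queue or topic in the same namespace.
// The destination entities must already exist.
var subscriptionOptions = new CreateSubscriptionOptions("orders-topic", "warehouse-fulfillment")
{
    ForwardTo = "processed-orders-queue",
    ForwardDeadLetteredMessagesTo = "dlq-orders"
};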
Complete Example: Order Processing System
Publisher
public class OrderPublisher
{
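    private readonly ServiceBusSender _sender;

    public OrderPublisher(ServiceBusClient client)
    {
        // One sender per topic can be reused for the lifetime of the application
        _sender = client.CreateSender("orders-topic");
    }

    public async Task PublishOrderAsync(Order order)
    {
        var message = new ServiceBusMessage(JsonSerializer.Serialize(order))
        {
            Subject = "OrderCreated",
            MessageId = order.OrderId,
            // Application properties are what subscription filters evaluate
            ApplicationProperties =
            {
                ["OrderType"] = order.Type,
                ["Priority"] = order.Priority,
                ["Region"] = order.Region,
                ["CustomerId"] = order.CustomerId
            }
        };
        await _sender.SendMessageAsync(message);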
}
}
Subscriber: Email Notifications
public class EmailNotificationProcessor
{
public async Task ProcessAsync(ProcessMessageEventArgs args)
{
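        var message = args.Message;
        // Application property values are typed as object, so read them as strings before comparing
        message.ApplicationProperties.TryGetValue("Priority", out var priority);
        message.ApplicationProperties.TryGetValue("CustomerId", out var customerId);
        // Filter: High priority or VIP customers
        if ((priority as string) == "High" ||
            IsVipCustomer(customerId as string))
        {
            await SendHighPriorityEmailAsync(message);
        }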
else
{
await SendStandardEmailAsync(message);
}
await args.CompleteMessageAsync(message);
}
}
Subscriber: Analytics
public class AnalyticsProcessor
{
public async Task ProcessAsync(ProcessMessageEventArgs args)
{
var order = JsonSerializer.Deserialize<Order>(args.Message.Body);
// Store in data lake
await StoreInDataLakeAsync(order);
// Update real-time dashboard
await UpdateDashboardMetricsAsync(order);
await args.CompleteMessageAsync(args.Message);
}
}
Subscriber: Warehouse
public class WarehouseProcessor
{
public async Task ProcessAsync(ProcessMessageEventArgs args)
{
var order = JsonSerializer.Deserialize<Order>(args.Message.Body);
// Skip orders that don't need fulfillment
if (order.Type == "Digital" || order.Type == "Service")
{
await args.CompleteMessageAsync(args.Message);
return;
}
// Create picking list
await CreatePickingListAsync(order);
// Send to warehouse system
await NotifyWarehouseAsync(order);
await args.CompleteMessageAsync(args.Message);
}
}
Best Practices
| Practice | Description |
|---|---|
| Use correlation filters | More efficient than SQL filters |
| Limit filter complexity | Complex filters impact performance |
| Name subscriptions descriptively | Easy to identify purpose |
| Configure dead letter | Handle poison messages gracefully |
| Set appropriate TTL | Don't keep messages longer than needed |
| Use batching | Send multiple messages efficiently |
| Monitor subscription metrics | Track message count, dead letters |
Performance Considerations
// Configure for high throughput
var options = new CreateSubscriptionOptions("orders-topic", "analytics-processing")
{
    EnableBatchedOperations = true,         // Broker-side batching of store operations
    MaxDeliveryCount = 10,                  // Delivery attempts before dead-lettering
    LockDuration = TimeSpan.FromSeconds(30) // Must cover the longest expected processing time
};
Troubleshooting
Common Issues
| Issue | Solution |
|---|---|
| Messages not received | Check subscription exists, verify filters |
| Filter not working | Ensure the property name and value type match exactly, and that the $Default rule has been removed |
| Too many dead letters | Increase MaxDeliveryCount, check processing logic |
| Performance issues | Enable batched operations, tune lock duration |
Monitoring
# Get subscription metrics
az servicebus topic subscription show \
--name email-notifications \
--topic-name orders-topic \
--namespace-name mynamespace \
--resource-group myrg
# Check message counts (active, dead-lettered, and so on)
az servicebus topic subscription show \
  --name email-notifications \
  --topic-name orders-topic \
  --namespace-name mynamespace \
  --resource-group myrg \
  --query "countDetails"
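The same counts are available from .NET through the administration client's runtime properties. The following sketch reads the active and dead-letter counts for one subscription. `SubscriptionMetrics`, `GetCountsAsync`, and `NeedsAttention` are hypothetical names, and the alerting rule in `NeedsAttention` is an illustrative assumption rather than a recommended threshold.

```csharp
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus.Administration;

// Hypothetical helper: reads message counts for a subscription.
public static class SubscriptionMetrics
{
    public static async Task<(long Active, long DeadLettered)> GetCountsAsync(
        ServiceBusAdministrationClient adminClient,
        string topicName,
        string subscriptionName)
    {
        // Runtime properties carry the live counters; they are separate from the configuration.
        SubscriptionRuntimeProperties runtime =
            await adminClient.GetSubscriptionRuntimePropertiesAsync(topicName, subscriptionName);

        return (runtime.ActiveMessageCount, runtime.DeadLetterMessageCount);
    }

    // Illustrative rule: flag any dead-lettered message or a backlog above maxActive.
    public static bool NeedsAttention(long active, long deadLettered, long maxActive) =>
        deadLettered > 0 || active > maxActive;
}
```

A periodic check could call `GetCountsAsync(adminClient, "orders-topic", "email-notifications")` and pass the result to `NeedsAttention` with a backlog limit that suits the workload.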
Related Topics
- Dead Letter Queue — Handling failed messages
- Message Sessions — Processing related messages
- Retry Policies — Handling transient failures
Azure Integration Hub - Intermediate Level