Azure Event Grid — Subscribe to Service Bus Events

Monitoring Service Bus with Event-Driven Alerts


Introduction

Azure Event Grid can subscribe to events emitted by Azure Service Bus namespaces (the integration requires the Premium tier), so you can detect queues and subscriptions whose messages are not being consumed and trigger automated responses. This is particularly valuable for monitoring dead-letter queues and for spotting messages that aren't being processed.

This guide covers:

  • Event types — Available Service Bus events
  • Configuration — Enabling and subscribing to events
  • Implementation — Handling events in Azure Functions
  • Use cases — Real-world monitoring scenarios
  • Best practices — Effective event-driven monitoring

Available Service Bus Events

Event Types

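Event Type | Description | When It Fires
Microsoft.ServiceBus.ActiveMessagesAvailableWithNoListeners | Active messages with no receivers | A queue or subscription holds active messages and no receiver is listening
Microsoft.ServiceBus.DeadletterMessagesAvailableWithNoListeners | Dead-lettered messages with no receivers | A dead-letter queue holds messages and no receiver is listening
Microsoft.ServiceBus.ActiveMessagesAvailablePeriodicNotifications | Periodic notice of active messages | Periodically while a queue or subscription holds active messages, even if receivers are connected
Microsoft.ServiceBus.DeadletterMessagesAvailablePeriodicNotifications | Periodic notice of dead-lettered messages | Periodically while a dead-letter queue holds messages, even if receivers are connected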

Event Schema

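The data object identifies the entity that has waiting messages. It does not include a message count.

{
  "eventType": "Microsoft.ServiceBus.ActiveMessagesAvailableWithNoListeners",
  "subject": "queues/orders-queue",
  "data": {
    "namespaceName": "mynamespace",
    "requestUri": "https://mynamespace.servicebus.windows.net/orders-queue/messages/head",
    "entityType": "queue",
    "queueName": "orders-queue",
    "topicName": null,
    "subscriptionName": null
  },
  "eventTime": "2024-01-15T10:30:00Z"
}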
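
To make the shape of the payload concrete, here is a minimal sketch that reads the entity fields with System.Text.Json. The class and method names (ServiceBusEventReader, DescribeEntity) are hypothetical and chosen for illustration. The sketch relies only on the namespaceName, queueName, topicName, and subscriptionName fields shown in the sample.

```csharp
using System;
using System.Text.Json;

// Hypothetical helper, not part of any Azure SDK.
public static class ServiceBusEventReader
{
    // Builds a short label such as "mynamespace/orders-queue" from an event payload.
    public static string DescribeEntity(string eventJson)
    {
        using JsonDocument doc = JsonDocument.Parse(eventJson);
        JsonElement data = doc.RootElement.GetProperty("data");

        string ns = data.GetProperty("namespaceName").GetString();
        string queue = OptionalString(data, "queueName");
        if (queue != null)
        {
            return $"{ns}/{queue}";
        }

        // Topic subscriptions leave queueName null and fill the other two fields.
        string topic = OptionalString(data, "topicName");
        string subscription = OptionalString(data, "subscriptionName");
        return $"{ns}/{topic}/subscriptions/{subscription}";
    }

    // Returns null when the property is missing or is JSON null.
    private static string OptionalString(JsonElement obj, string name) =>
        obj.TryGetProperty(name, out JsonElement value) && value.ValueKind == JsonValueKind.String
            ? value.GetString()
            : null;
}
```

Reading the raw JSON this way is mainly useful for webhook endpoints and for tests. Inside an Event Grid triggered function, the typed deserialization shown later in this guide is usually more convenient.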

Enable Events on Service Bus

Azure Portal

  1. Navigate to your Service Bus namespace (Premium tier)
  2. Select Events in the left menu
  3. Click + Event Subscription
  4. Select which event types to deliver, for example:
    • Active messages available with no listeners
    • Dead-letter messages available with no listeners

Azure CLI

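# No flag is needed to turn events on: a Premium namespace starts emitting
# events once an Event Grid subscription targets it.
# Confirm the tier and look up the namespace resource ID for later use.
az servicebus namespace show \
  --resource-group my-rg \
  --name my-namespace \
  --query "{id:id, sku:sku.name}"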

ARM Template

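{
  "$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentTemplate.json#",
  "contentVersion": "1.0.0.0",
  "resources": [
    {
      "type": "Microsoft.ServiceBus/namespaces",
      "apiVersion": "2021-11-01",
      "name": "my-namespace",
      "location": "[resourceGroup().location]",
      "sku": {
        "name": "Premium",
        "tier": "Premium",
        "capacity": 1
      }
    }
  ],
  "outputs": {
    "namespaceId": {
      "type": "string",
      "value": "[resourceId('Microsoft.ServiceBus/namespaces', 'my-namespace')]"
    }
  }
}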

Create Event Subscription

CLI Command

# The event source is the Service Bus namespace, passed by resource ID
az eventgrid event-subscription create \
  --name dlq-alerts \
  --source-resource-id "/subscriptions/xxx/resourceGroups/my-rg/providers/Microsoft.ServiceBus/namespaces/my-namespace" \
  --endpoint-type webhook \
  --endpoint https://myfunction.azurewebsites.net/api/handler \
  --subject-begins-with "queues/"

Azure Function Endpoint

# Resource ID of the target function itself, not just the function app
FUNCTION_ID="/subscriptions/xxx/resourceGroups/my-rg/providers/Microsoft.Web/sites/my-function-app/functions/ServiceBusEventHandler"

az eventgrid event-subscription create \
  --name queue-monitor \
  --source-resource-id "/subscriptions/xxx/resourceGroups/my-rg/providers/Microsoft.ServiceBus/namespaces/my-namespace" \
  --endpoint-type azurefunction \
  --endpoint "$FUNCTION_ID" \
  --subject-begins-with "queues/"

Logic Apps Endpoint

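# Event Grid has no dedicated Logic Apps endpoint type. The usual route is the
# Logic Apps Event Grid trigger, which creates the subscription for you. To wire
# it by hand, deliver to the workflow's HTTP request trigger URL as a webhook;
# the workflow must then answer the subscription validation handshake.
az eventgrid event-subscription create \
  --name process-queue-events \
  --source-resource-id "/subscriptions/xxx/resourceGroups/my-rg/providers/Microsoft.ServiceBus/namespaces/my-namespace" \
  --endpoint-type webhook \
  --endpoint "<logic-app-request-trigger-url>"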

Filter Events

Filter by Queue/Topic

# Only the orders queue (subject filters match a prefix and a suffix, not wildcards)
az eventgrid event-subscription create \
  --name orders-queue-alerts \
  --source-resource-id "/subscriptions/xxx/resourceGroups/my-rg/providers/Microsoft.ServiceBus/namespaces/my-namespace" \
  --endpoint-type webhook \
  --endpoint https://... \
  --subject-begins-with "queues/orders" \
  --subject-ends-with "/orders"

Filter by Event Type

{
  "filter": {
    "includedEventTypes": [
      "Microsoft.ServiceBus.ActiveMessagesAvailableWithNoListeners"
    ]
  }
}
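
Event Grid matches subjects by prefix and suffix rather than by wildcard, and the comparison is case-insensitive unless you opt in to case sensitivity. To preview which subjects a given filter would admit, a small local check can help. The following is a minimal sketch of that matching rule, not Event Grid's own code. The SubjectFilter class and its parameter names are hypothetical.

```csharp
using System;

// Hypothetical helper that imitates prefix/suffix subject matching locally.
public static class SubjectFilter
{
    // Empty prefix or suffix matches every subject.
    public static bool Matches(
        string subject,
        string beginsWith = "",
        string endsWith = "",
        bool caseSensitive = false)
    {
        StringComparison comparison = caseSensitive
            ? StringComparison.Ordinal
            : StringComparison.OrdinalIgnoreCase;

        return subject.StartsWith(beginsWith, comparison)
            && subject.EndsWith(endsWith, comparison);
    }
}
```

For example, SubjectFilter.Matches("queues/orders-queue", "queues/orders", "/orders") is false because the subject does not end with "/orders", which is why pairing a prefix with a suffix pins the filter to one queue.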

Handle Events in Azure Functions

Basic Function

using Azure.Messaging.EventGrid;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.EventGrid;
using Microsoft.Extensions.Logging;

public class ServiceBusEventsFunction
{
    [FunctionName("ServiceBusEventHandler")]
    public void Run(
        [EventGridTrigger] EventGridEvent eventGridEvent,
        ILogger log)
    {
        log.LogInformation("Received Service Bus event: {EventType}",
            eventGridEvent.EventType);

        // Parse the event payload into the data model
        var data = eventGridEvent.Data.ToObjectFromJson<ServiceBusEventData>();

        // The payload names the entity but carries no message count
        log.LogInformation("Namespace: {Namespace}", data.NamespaceName);
        log.LogInformation("Queue: {Queue}", data.QueueName);
    }
}

Event Data Model

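using System.Text.Json.Serialization;

// Property names are mapped explicitly because System.Text.Json matches
// names case-sensitively by default and the payload uses camelCase.
public class ServiceBusEventData
{
    [JsonPropertyName("namespaceName")] public string NamespaceName { get; set; }
    [JsonPropertyName("requestUri")] public string RequestUri { get; set; }
    [JsonPropertyName("entityType")] public string EntityType { get; set; }
    [JsonPropertyName("queueName")] public string QueueName { get; set; }
    [JsonPropertyName("topicName")] public string TopicName { get; set; }
    [JsonPropertyName("subscriptionName")] public string SubscriptionName { get; set; }

    // Not sent by Service Bus: stays 0 unless the handler fills it in,
    // for example from the queue's runtime properties.
    public long MessageCount { get; set; }
}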

Handle All Event Types

[FunctionName("ServiceBusAllEvents")]
public async Task Run(
    [EventGridTrigger] EventGridEvent[] events,
    ILogger log)
{
    foreach (var evt in events)
    {
        switch (evt.EventType)
        {
            case "Microsoft.ServiceBus.ActiveMessagesAvailableWithNoListeners":
                await HandleNoListenerMessageAsync(evt, log);
                break;
                
            case "Microsoft.ServiceBus.DeadletterMessagesAvailableWithNoListeners":
                await HandleDlqExpiredMessageAsync(evt, log);
                break;
                
            default:
                log.LogWarning("Unknown event type: {Type}", evt.EventType);
                break;
        }
    }
}

Real-World Use Cases

Use Case 1: DLQ Alerting

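private async Task HandleDlqExpiredMessageAsync(EventGridEvent evt, ILogger log)
{
    var data = evt.Data.ToObjectFromJson<ServiceBusEventData>();

    // The event carries no count, so read it from the queue's runtime properties.
    // _adminClient is assumed to be a ServiceBusAdministrationClient
    // (Azure.Messaging.ServiceBus.Administration) injected like the other services.
    var props = await _adminClient.GetQueueRuntimePropertiesAsync(data.QueueName);
    data.MessageCount = props.Value.DeadLetterMessageCount;

    log.LogWarning("Dead-lettered messages need attention! Queue: {Queue}, Count: {Count}",
        data.QueueName, data.MessageCount);

    // Send alert
    await _alertService.SendAlertAsync(new AlertRequest
    {
        Severity = "High",
        Title = "Service Bus DLQ Alert",
        Message = $"Queue: {data.QueueName}, Messages: {data.MessageCount}",
        Recipients = new[] { "ops-team@company.com" }
    });

    // Store for analysis
    await _logService.LogDlqExpiryAsync(data);
}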

Use Case 2: Stuck Message Detection

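private async Task HandleNoListenerMessageAsync(EventGridEvent evt, ILogger log)
{
    var data = evt.Data.ToObjectFromJson<ServiceBusEventData>();

    // The event carries no count, so read it from the queue's runtime properties.
    // _adminClient is assumed to be a ServiceBusAdministrationClient
    // (Azure.Messaging.ServiceBus.Administration) injected like the other services.
    var props = await _adminClient.GetQueueRuntimePropertiesAsync(data.QueueName);
    data.MessageCount = props.Value.ActiveMessageCount;

    log.LogWarning("Messages accumulating without listener! Queue: {Queue}, Count: {Count}",
        data.QueueName, data.MessageCount);

    if (data.MessageCount > 100)
    {
        // High severity - notify immediately
        await _pagerDuty.TriggerAsync(new IncidentRequest
        {
            Title = $"Service Bus Queue Backup: {data.QueueName}",
            Severity = "critical",
            Body = $"Queue has {data.MessageCount} unprocessed messages"
        });
    }
    else
    {
        // Lower severity - just email
        await _emailService.SendAsync("devops@company.com",
            $"Queue backup: {data.QueueName}",
            $"Messages: {data.MessageCount}");
    }
}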
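
Event Grid delivers events at least once, and a queue that stays backed up can raise the same event again, so a handler like the one above may page several times for a single incident. One way to soften that is a cooldown per queue. The following is a minimal sketch under stated assumptions: AlertThrottle is a hypothetical class, the cooldown length is arbitrary, and the state lives in memory, so it is not shared across function instances.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory cooldown gate; state is lost on restart and is per instance.
public sealed class AlertThrottle
{
    private readonly TimeSpan _cooldown;
    private readonly Dictionary<string, DateTimeOffset> _lastAlert =
        new Dictionary<string, DateTimeOffset>();
    private readonly object _gate = new object();

    public AlertThrottle(TimeSpan cooldown)
    {
        _cooldown = cooldown;
    }

    // Returns true when no alert for this queue was allowed within the cooldown window,
    // and records the current time as the latest alert for that queue.
    public bool ShouldAlert(string queueName, DateTimeOffset now)
    {
        lock (_gate)
        {
            if (_lastAlert.TryGetValue(queueName, out DateTimeOffset last)
                && now - last < _cooldown)
            {
                return false; // still cooling down; suppressed calls do not reset the timer
            }

            _lastAlert[queueName] = now;
            return true;
        }
    }
}
```

A handler could call ShouldAlert(data.QueueName, DateTimeOffset.UtcNow) before paging. Passing the clock value in keeps the gate easy to test. For a scaled-out function app, a shared store would be needed in place of the dictionary.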

Use Case 3: Auto-Scaling Trigger

private async Task HandleNoListenerMessageAsync(EventGridEvent evt, ILogger log)
{
    var data = evt.Data.ToObjectFromJson<ServiceBusEventData>();
    
    // Get current consumer count
    var currentScale = await _scaleController.GetCurrentScaleAsync(data.QueueName);
    
    if (currentScale < 5) // Max 5 instances
    {
        // Scale out
        await _scaleController.ScaleOutAsync(
            data.QueueName, 
            currentScale + 1);
        
        log.LogInformation("Scaled out to {Count} instances", currentScale + 1);
    }
}

Use Case 4: Dead-Letter Processing

[FunctionName("ProcessDeadLetters")]
public async Task Run(
    [EventGridTrigger] EventGridEvent evt,
    ILogger log)
{
    var data = evt.Data.ToObjectFromJson<ServiceBusEventData>();

    // The event carries no count, so read it from the queue's runtime properties.
    // _adminClient is assumed to be a ServiceBusAdministrationClient field.
    var props = await _adminClient.GetQueueRuntimePropertiesAsync(data.QueueName);
    long deadLettered = props.Value.DeadLetterMessageCount;

    // Check if there is anything to archive
    if (deadLettered > 0)
    {
        // Archive to storage so the dead-letter queue can be drained
        await ArchiveDeadLetterMessagesAsync(data.QueueName);

        log.LogInformation("Archived {Count} messages from {Queue}",
            deadLettered, data.QueueName);
    }
}
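
An archiving helper such as ArchiveDeadLetterMessagesAsync has to address the dead-letter sub-queue of the entity named in the event. Service Bus exposes it under the entity path with a /$DeadLetterQueue suffix. The following is a minimal sketch that derives that path from the event fields. DeadLetterPath and its method names are hypothetical.

```csharp
using System;

// Hypothetical helper that builds dead-letter entity paths from event fields.
public static class DeadLetterPath
{
    private const string Suffix = "/$DeadLetterQueue";

    // "orders-queue" -> "orders-queue/$DeadLetterQueue"
    public static string ForQueue(string queueName) =>
        Require(queueName, nameof(queueName)) + Suffix;

    // ("orders", "billing") -> "orders/Subscriptions/billing/$DeadLetterQueue"
    public static string ForSubscription(string topicName, string subscriptionName) =>
        Require(topicName, nameof(topicName))
        + "/Subscriptions/"
        + Require(subscriptionName, nameof(subscriptionName))
        + Suffix;

    // Chooses the queue or subscription form depending on which fields the event filled in.
    public static string FromEventData(string queueName, string topicName, string subscriptionName) =>
        string.IsNullOrEmpty(queueName)
            ? ForSubscription(topicName, subscriptionName)
            : ForQueue(queueName);

    private static string Require(string value, string name) =>
        string.IsNullOrEmpty(value)
            ? throw new ArgumentException("Value is required.", name)
            : value;
}
```

With the Azure.Messaging.ServiceBus client you can often skip the explicit path and set SubQueue = SubQueue.DeadLetter on the receiver options instead. The explicit form is handy for logging and for tools that take a raw entity path.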

Complete Logic Apps Workflow

{
  "definition": {
    "triggers": {
      "When_Service_Bus_event": {
        "type": "EventGridTrigger",
        "inputs": {
          "topic": "/subscriptions/xxx/resourceGroups/my-rg/providers/Microsoft.ServiceBus/namespaces/my-namespace"
        }
      }
    },
    "actions": {
      "Switch_on_event_type": {
        "type": "Switch",
        "expression": "@triggerBody().eventType",
        "cases": {
          "DLQ_no_listener": {
            "case": "Microsoft.ServiceBus.DeadletterMessagesAvailableWithNoListeners",
            "actions": {
              "Send_high_priority_alert": {
                "type": "SendEmail",
                "inputs": {
                  "body": "<p>Dead-lettered messages are waiting in queue: @{triggerBody().data.queueName}</p>"
                }
              }
            }
          },
          "No_listener": {
            "case": "Microsoft.ServiceBus.ActiveMessagesAvailableWithNoListeners",
            "actions": {
              "Send_critical_alert": {
                "type": "SendEmail",
                "inputs": {
                  "body": "<p>Active messages have no listener in queue: @{triggerBody().data.queueName}</p>"
                }
              }
            }
          }
        }
      }
    }
  }
}

Monitoring and Alerts

Azure Monitor Metrics

# Get the dead-lettered message count for the namespace
az monitor metrics list \
  --resource my-namespace \
  --resource-group my-rg \
  --resource-type Microsoft.ServiceBus/namespaces \
  --metric "DeadletteredMessages"

Configure Alerts

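# Create alert for DLQ messages
az monitor metrics alert create \
  --name sb-dlq-alert \
  --resource-group my-rg \
  --scopes "/subscriptions/xxx/resourceGroups/my-rg/providers/Microsoft.ServiceBus/namespaces/my-namespace" \
  --condition "avg DeadletteredMessages > 0" \
  --description "DLQ messages present"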

Best Practices

Practice | Description
Enable on production namespaces | Monitor critical queues
Use filtering | Subscribe to specific queues
Set up multiple subscriptions | Different handlers for different events
Implement retry logic | Handle transient failures
Correlate with other logs | Link to Application Insights
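
Event Grid already redelivers an event when the endpoint reports a failure, so retry logic inside a handler mostly concerns the calls the handler itself makes, such as sending an alert. The following is a minimal sketch of retrying with exponential backoff. The Retry class, its defaults, and the choice to treat every exception as transient are assumptions for illustration. Production code would normally retry only on known transient errors.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical retry helper; treats every exception as transient for simplicity.
public static class Retry
{
    // Runs the operation up to maxAttempts times, doubling the delay after each failure.
    // The exception from the final attempt propagates to the caller.
    public static async Task<T> WithBackoffAsync<T>(
        Func<Task<T>> operation,
        int maxAttempts = 3,
        TimeSpan? initialDelay = null)
    {
        if (maxAttempts < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(maxAttempts));
        }

        TimeSpan delay = initialDelay ?? TimeSpan.FromMilliseconds(200);

        for (int attempt = 1; ; attempt++)
        {
            try
            {
                return await operation();
            }
            catch (Exception) when (attempt < maxAttempts)
            {
                await Task.Delay(delay);
                delay = TimeSpan.FromTicks(delay.Ticks * 2);
            }
        }
    }
}
```

A handler could wrap an outbound call as await Retry.WithBackoffAsync(() => SendAsync(...)), where SendAsync stands in for whatever client call you need to protect. Keep the total retry time well below the function timeout so that a persistent failure still surfaces to Event Grid.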
