Event-Driven Architecture with Azure Event Grid
Why Event-Driven Architecture?
Traditional request-response architectures create tight coupling:
Traditional Architecture (Tight Coupling):
┌─────────────┐         ┌─────────────┐
│   Service   │────────▶│   Service   │
│      A      │         │      B      │
└─────────────┘         └─────────────┘
       │
       │  Service A must know:
       │  - Service B's location
       │  - Service B's API
       │  - When B is available
       │  - How to handle B's errors
       ▼
Problems:
- Changes to B break A
- A can't function if B is down
- Hard to scale independently
- Complex dependency management
Event-Driven Architecture (Loose Coupling):
┌─────────────┐  Event   ┌─────────────┐
│   Service   │─────────▶│    Event    │──────┐
│      A      │          │    Grid     │      │
└─────────────┘          └─────────────┘      │
                                              │
       ┌──────────────────────────────────────┤
       │            Subscribers               │
       ▼                                      ▼
┌─────────────┐                        ┌─────────────┐
│   Service   │                        │   Service   │
│      B      │                        │      C      │
└─────────────┘                        └─────────────┘
Benefits:
- A doesn't know about B or C
- B and C can be added/removed without changing A
- Each service scales independently
- Failure in one doesn't cascade
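The decoupling described above can be illustrated with a minimal in-process sketch. `InMemoryEventBus` is a hypothetical stand-in for a broker, not an Event Grid API; it only shows that the publisher addresses an event type rather than a specific subscriber.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-process stand-in for an event broker; not an Event Grid API.
public sealed class InMemoryEventBus
{
    private readonly Dictionary<string, List<Action<string>>> _handlers = new();

    // Subscribers register interest in an event type; the publisher never sees them.
    public void Subscribe(string eventType, Action<string> handler)
    {
        if (!_handlers.TryGetValue(eventType, out var list))
        {
            list = new List<Action<string>>();
            _handlers[eventType] = list;
        }
        list.Add(handler);
    }

    // Fans the payload out to every registered handler and returns the delivery count.
    public int Publish(string eventType, string payload)
    {
        if (!_handlers.TryGetValue(eventType, out var list))
        {
            return 0; // No subscribers: the publisher is unaffected.
        }
        foreach (var handler in list)
        {
            handler(payload);
        }
        return list.Count;
    }
}
```

Adding a third subscriber is one more `Subscribe` call; the code that calls `Publish` does not change.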
Understanding Event Grid Concepts
Event Grid Architecture
┌─────────────────────────────────────────────────────────────────────────────┐
│ Event Grid Architecture │
└─────────────────────────────────────────────────────────────────────────────┘
┌─────────────────┐                                ┌─────────────────┐
│   Publishers    │                                │   Subscribers   │
│                 │                                │                 │
│ - Azure         │                                │ - Azure         │
│   Resources     │                                │   Functions     │
│ - Custom Apps   │                                │ - Webhooks      │
│ - 3rd Party     │                                │ - Logic Apps    │
│                 │                                │ - Service Bus   │
└────────┬────────┘                                └────────┬────────┘
         │                                                  │
         │            ┌────────────────────────┐            │
         │            │                        │            │
         └───────────▶│       Event Grid       │◀───────────┘
                      │  - Topics              │
                      │  - Domains             │
                      │  - Namespaces          │
                      │                        │
                      │  Event Delivery        │
                      │  - Push-based          │
                      │  - Retry policies      │
                      │  - Dead-lettering      │
                      └────────────────────────┘
Event Structure:
{
"id": "unique-id",
"topic": "/subscriptions/.../topics/orders",
"subject": "orders/new",
"eventType": "Order.Created",
"eventTime": "2024-01-01T12:00:00Z",
"data": { ... },
"dataVersion": "1.0"
}
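To make the envelope fields concrete, here is a minimal sketch that reads them with `System.Text.Json`. `EventEnvelope` and `EventEnvelopeReader` are hypothetical names introduced for illustration; in an application that references the Azure SDK, the `EventGridEvent` type already plays this role.

```csharp
using System;
using System.Text.Json;

// Hypothetical record holding the envelope fields shown in the structure above.
public sealed record EventEnvelope(
    string Id,
    string Subject,
    string EventType,
    DateTimeOffset EventTime,
    string DataVersion);

public static class EventEnvelopeReader
{
    // Reads the envelope fields from a single event serialized in the Event Grid schema.
    public static EventEnvelope Read(string json)
    {
        using var doc = JsonDocument.Parse(json);
        var root = doc.RootElement;
        return new EventEnvelope(
            root.GetProperty("id").GetString()!,
            root.GetProperty("subject").GetString()!,
            root.GetProperty("eventType").GetString()!,
            root.GetProperty("eventTime").GetDateTimeOffset(),
            root.GetProperty("dataVersion").GetString()!);
    }
}
```

`GetProperty` throws if a field is missing, which is a deliberate choice here: an event without these fields is not a valid Event Grid schema event.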
Event Types
| Event Type | Description | Use Case |
|---|---|---|
| Microsoft.Resources.ResourceWriteSuccess | Resource created/updated | Infrastructure automation |
| Microsoft.Storage.BlobCreated | Blob created | File processing |
| Microsoft.EventGrid.SubscriptionValidationEvent | Webhook validation handshake | Endpoint verification |
| Custom events | Your application events | Business logic |
Step 1: Publishing Events
Using Event Grid Client
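using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.Messaging.EventGrid;
using Microsoft.Extensions.Logging;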
public class OrderEventPublisher
{
private readonly EventGridPublisherClient _publisher;
private readonly ILogger<OrderEventPublisher> _logger;
public OrderEventPublisher(
EventGridPublisherClient publisher,
ILogger<OrderEventPublisher> logger)
{
_publisher = publisher;
_logger = logger;
}
public async Task PublishOrderCreatedAsync(Order order)
{
var eventGridEvent = new EventGridEvent(
subject: $"orders/{order.Id}",
eventType: "Order.Created",
dataVersion: "1.0",
data: new OrderCreatedEventData
{
OrderId = order.Id,
CustomerId = order.CustomerId,
TotalAmount = order.TotalAmount,
ItemCount = order.Items.Count,
Region = order.Region
});
await _publisher.SendEventAsync(eventGridEvent);
_logger.LogInformation(
"Published Order.Created event for {OrderId} to topic",
order.Id);
}
public async Task PublishOrderCancelledAsync(Order order, string reason)
{
var eventGridEvent = new EventGridEvent(
subject: $"orders/{order.Id}",
eventType: "Order.Cancelled",
dataVersion: "1.0",
data: new OrderCancelledEventData
{
OrderId = order.Id,
CustomerId = order.CustomerId,
CancellationReason = reason,
RefundAmount = order.TotalAmount
});
await _publisher.SendEventAsync(eventGridEvent);
_logger.LogInformation(
"Published Order.Cancelled event for {OrderId}",
order.Id);
}
public async Task PublishBulkEventsAsync(List<Order> orders)
{
var events = orders.Select(order => new EventGridEvent(
subject: $"orders/{order.Id}",
eventType: "Order.Created",
dataVersion: "1.0",
data: new OrderCreatedEventData
{
OrderId = order.Id,
CustomerId = order.CustomerId,
TotalAmount = order.TotalAmount
})).ToList();
await _publisher.SendEventsAsync(events);
_logger.LogInformation(
"Published {Count} order events to topic",
events.Count);
}
}
// Event data classes
public class OrderCreatedEventData
{
public Guid OrderId { get; set; }
public Guid CustomerId { get; set; }
public decimal TotalAmount { get; set; }
public int ItemCount { get; set; }
public string Region { get; set; }
}
public class OrderCancelledEventData
{
public Guid OrderId { get; set; }
public Guid CustomerId { get; set; }
public string CancellationReason { get; set; }
public decimal RefundAmount { get; set; }
}
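Bulk publishing sends every event in one request, and Event Grid caps the size of a single publish request, so large lists usually need to be split first. The following is a minimal sketch of count-based batching; `EventBatcher` and the batch size are assumptions for illustration, and a production version would also account for payload size.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper; not part of the Azure SDK.
public static class EventBatcher
{
    // Splits items into consecutive batches of at most maxBatchSize elements.
    public static IEnumerable<IReadOnlyList<T>> Split<T>(IEnumerable<T> items, int maxBatchSize)
    {
        if (maxBatchSize <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(maxBatchSize));
        }
        var batch = new List<T>(maxBatchSize);
        foreach (var item in items)
        {
            batch.Add(item);
            if (batch.Count == maxBatchSize)
            {
                yield return batch;
                batch = new List<T>(maxBatchSize);
            }
        }
        if (batch.Count > 0)
        {
            yield return batch; // Final, possibly smaller, batch.
        }
    }
}
```

Inside `PublishBulkEventsAsync`, each batch returned by `EventBatcher.Split(events, 100)` could then be passed to `_publisher.SendEventsAsync(batch)` in turn.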
Publishing via Cloud Events
using Azure.Messaging;
using Azure.Messaging.EventGrid;
// Using the CloudEvents 1.0 format (more portable across event platforms).
// This version uses the CloudEvent type from the Azure SDK (Azure.Messaging) and assumes
// _publisher is an EventGridPublisherClient for a topic that accepts the CloudEvents schema.
public async Task PublishWithCloudEventsAsync(Order order)
{
    var cloudEvent = new CloudEvent(
        source: "https://orders.mycompany.com",
        type: "Order.Created",
        jsonSerializableData: new
        {
            orderId = order.Id,
            customerId = order.CustomerId,
            totalAmount = order.TotalAmount
        });
    // Add custom extension attributes. Names must be lowercase letters and digits,
    // and values must be a CloudEvents attribute type (string, bool, int, and so on).
    cloudEvent.ExtensionAttributes["region"] = order.Region;
    cloudEvent.ExtensionAttributes["priority"] = order.Priority;
    // The client serializes the event and authenticates the request
    await _publisher.SendEventAsync(cloudEvent);
}
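For readers who want to see what travels over the wire, here is a minimal sketch of a CloudEvents 1.0 envelope in structured JSON mode, built with `System.Text.Json` only. `CloudEventJson` is a hypothetical helper; `specversion`, `id`, `source`, and `type` are the attributes the CloudEvents specification requires, while `time` and `datacontenttype` are optional.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical helper that builds a structured-mode CloudEvents 1.0 JSON envelope.
public static class CloudEventJson
{
    public static string Build(string id, string source, string type, object data)
    {
        var envelope = new Dictionary<string, object>
        {
            ["specversion"] = "1.0",                         // required
            ["id"] = id,                                     // required, unique per source
            ["source"] = source,                             // required
            ["type"] = type,                                 // required
            ["time"] = DateTimeOffset.UtcNow.ToString("o"),  // optional
            ["datacontenttype"] = "application/json",        // optional
            ["data"] = data
        };
        return JsonSerializer.Serialize(envelope);
    }
}
```

When such an envelope is posted directly over HTTP, structured mode uses the `application/cloudevents+json` content type, and the request still has to carry whatever credentials the topic expects.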
Step 2: Subscribing to Events
Creating Event Subscriptions
// EventGridManagementClient comes from the Microsoft.Azure.Management.EventGrid package.
// Model and property names below follow that package; verify them against the version you install.
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.Management.EventGrid;
using Microsoft.Azure.Management.EventGrid.Models;
using Microsoft.Extensions.Logging;

public class EventSubscriptionManager
{
    // Event Grid matches event types exactly; wildcards such as "Order.*" are not supported
    private static readonly string[] OrderEventTypes =
    {
        "Order.Created",
        "Order.Cancelled",
        "Order.Shipped"
    };

    private readonly EventGridManagementClient _client;
    private readonly ILogger<EventSubscriptionManager> _logger;

    public EventSubscriptionManager(
        EventGridManagementClient client,
        ILogger<EventSubscriptionManager> logger)
    {
        _client = client;
        _logger = logger;
    }

    // Event subscriptions are created against a scope: here, the custom topic's resource ID
    private string TopicScope(string resourceGroup, string topicName) =>
        $"/subscriptions/{_client.SubscriptionId}/resourceGroups/{resourceGroup}" +
        $"/providers/Microsoft.EventGrid/topics/{topicName}";

    public async Task CreateWebhookSubscriptionAsync(
        string resourceGroup,
        string topicName,
        string subscriptionName,
        string webhookUrl)
    {
        var subscription = new EventSubscription
        {
            // Event Grid validates the endpoint with a handshake before delivering events
            Destination = new WebHookEventSubscriptionDestination
            {
                EndpointUrl = webhookUrl
            },
            Filter = new EventSubscriptionFilter
            {
                // Subscribe to specific event types
                IncludedEventTypes = new List<string>(OrderEventTypes),
                // Filter by subject prefix
                SubjectBeginsWith = "orders/",
                // Let advanced filters evaluate array values
                EnableAdvancedFilteringOnArrays = true
            },
            // Retry policy: Event Grid applies its own backoff schedule, so only the
            // attempt count and the event time-to-live are configurable
            RetryPolicy = new RetryPolicy
            {
                MaxDeliveryAttempts = 3,
                EventTimeToLiveInMinutes = 60
            },
            // Dead-letter destination: a blob container in a storage account
            // (assumed here to live in the same resource group as the topic)
            DeadLetterDestination = new StorageBlobDeadLetterDestination
            {
                ResourceId =
                    $"/subscriptions/{_client.SubscriptionId}/resourceGroups/{resourceGroup}" +
                    "/providers/Microsoft.Storage/storageAccounts/mystorageaccount",
                BlobContainerName = "deadletter"
            }
        };
        await _client.EventSubscriptions.CreateOrUpdateAsync(
            TopicScope(resourceGroup, topicName),
            subscriptionName,
            subscription);
        _logger.LogInformation(
            "Created event subscription {SubscriptionName} for topic {TopicName}",
            subscriptionName, topicName);
    }

    public async Task CreateFunctionSubscriptionAsync(
        string resourceGroup,
        string topicName,
        string functionAppName,
        string functionName)
    {
        var subscription = new EventSubscription
        {
            // Point to the Azure Function by resource ID
            // (assumes the function app is in the same resource group as the topic)
            Destination = new AzureFunctionEventSubscriptionDestination
            {
                ResourceId =
                    $"/subscriptions/{_client.SubscriptionId}/resourceGroups/{resourceGroup}" +
                    $"/providers/Microsoft.Web/sites/{functionAppName}/functions/{functionName}"
            },
            Filter = new EventSubscriptionFilter
            {
                IncludedEventTypes = new List<string>(OrderEventTypes) // All Order events
            }
        };
        await _client.EventSubscriptions.CreateOrUpdateAsync(
            TopicScope(resourceGroup, topicName),
            $"func-{functionName}",
            subscription);
    }

    public async Task CreateServiceBusSubscriptionAsync(
        string resourceGroup,
        string topicName,
        string serviceBusNamespace,
        string queueName)
    {
        var subscription = new EventSubscription
        {
            Destination = new ServiceBusQueueEventSubscriptionDestination
            {
                // Resource IDs start with a slash; replace "..." with the subscription ID
                ResourceId =
                    $"/subscriptions/.../resourceGroups/{resourceGroup}/providers/Microsoft.ServiceBus/namespaces/{serviceBusNamespace}/queues/{queueName}"
            },
            Filter = new EventSubscriptionFilter
            {
                IncludedEventTypes = new List<string>(OrderEventTypes)
            }
        };
        await _client.EventSubscriptions.CreateOrUpdateAsync(
            TopicScope(resourceGroup, topicName),
            $"sb-{queueName}",
            subscription);
    }
}
Step 3: Handling Events in Azure Functions
Event Grid Trigger Function
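using System;
using System.Threading.Tasks;
using Azure.Messaging.EventGrid;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;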
public class OrderEventHandler
{
private readonly IOrderService _orderService;
private readonly IEmailService _emailService;
private readonly ILogger<OrderEventHandler> _logger;
public OrderEventHandler(
IOrderService orderService,
IEmailService emailService,
ILogger<OrderEventHandler> logger)
{
_orderService = orderService;
_emailService = emailService;
_logger = logger;
}
[Function("HandleOrderEvent")]
public async Task Run(
[EventGridTrigger] EventGridEvent eventGridEvent)
{
// No manual validation handling is needed here: for Event Grid-triggered
// functions, the Functions runtime completes the subscription validation
// handshake. Only plain HTTP webhook endpoints must answer
// Microsoft.EventGrid.SubscriptionValidationEvent themselves.
// Log the event
_logger.LogInformation(
"Received event: Type={EventType}, Subject={Subject}, ID={Id}",
eventGridEvent.EventType,
eventGridEvent.Subject,
eventGridEvent.Id);
// Route to specific handler based on event type
try
{
switch (eventGridEvent.EventType)
{
case "Order.Created":
await HandleOrderCreatedAsync(eventGridEvent);
break;
case "Order.Cancelled":
await HandleOrderCancelledAsync(eventGridEvent);
break;
case "Order.Shipped":
await HandleOrderShippedAsync(eventGridEvent);
break;
default:
_logger.LogWarning(
"Unhandled event type: {EventType}",
eventGridEvent.EventType);
break;
}
}
catch (Exception ex)
{
_logger.LogError(ex,
"Error handling event {EventId}",
eventGridEvent.Id);
throw; // Re-throw to trigger retry
}
}
private async Task HandleOrderCreatedAsync(EventGridEvent e)
{
var data = e.Data.ToObjectFromJson<OrderCreatedEventData>();
_logger.LogInformation(
"Processing new order {OrderId} for customer {CustomerId}",
data.OrderId, data.CustomerId);
// Process the order
await _orderService.ProcessNewOrderAsync(data.OrderId);
// Send confirmation
await _emailService.SendOrderConfirmationAsync(
data.CustomerId,
data.OrderId);
}
private async Task HandleOrderCancelledAsync(EventGridEvent e)
{
var data = e.Data.ToObjectFromJson<OrderCancelledEventData>();
_logger.LogInformation(
"Processing cancellation for order {OrderId}, reason: {Reason}",
data.OrderId, data.CancellationReason);
// Process cancellation
await _orderService.CancelOrderAsync(data.OrderId);
// Process refund
await _orderService.ProcessRefundAsync(
data.OrderId,
data.RefundAmount);
// Send cancellation email
await _emailService.SendCancellationEmailAsync(
data.CustomerId,
data.OrderId,
data.CancellationReason);
}
private async Task HandleOrderShippedAsync(EventGridEvent e)
{
var data = e.Data.ToObjectFromJson<OrderShippedEventData>();
_logger.LogInformation(
"Order {OrderId} shipped, tracking: {TrackingNumber}",
data.OrderId, data.TrackingNumber);
// Update order status
await _orderService.UpdateShippingInfoAsync(
data.OrderId,
data.TrackingNumber,
data.Carrier);
// Send shipping notification
await _emailService.SendShippingNotificationAsync(
data.CustomerId,
data.OrderId,
data.TrackingNumber);
}
}
// Assumed shape of the Order.Shipped payload, inferred from the properties used above
public class OrderShippedEventData
{
public Guid OrderId { get; set; }
public Guid CustomerId { get; set; }
public string TrackingNumber { get; set; }
public string Carrier { get; set; }
}
Handling Custom Events
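using System;
using System.Threading.Tasks;
using Azure.Messaging;
using Microsoft.Azure.Functions.Worker;

public class CustomEventHandler
{
    [Function("HandleCustomEvent")]
    public async Task Run(
        [EventGridTrigger] CloudEvent cloudEvent)
    {
        // Read the CloudEvents attributes
        var eventType = cloudEvent.Type;
        var data = cloudEvent.Data;
        if (data is null)
        {
            return; // Nothing to process
        }
        // Process based on event type
        switch (eventType)
        {
            case "com.myapp.order.created":
                await ProcessOrderCreatedAsync(data);
                break;
            case "com.myapp.order.shipped":
                await ProcessOrderShippedAsync(data);
                break;
            default:
                // Unknown event type - log and ignore
                break;
        }
    }

    // OrderData is the application's own payload type
    private Task ProcessOrderCreatedAsync(BinaryData data)
    {
        var orderData = data.ToObjectFromJson<OrderData>();
        // Process the order
        return Task.CompletedTask;
    }

    private Task ProcessOrderShippedAsync(BinaryData data)
    {
        var orderData = data.ToObjectFromJson<OrderData>();
        // Process the shipment
        return Task.CompletedTask;
    }
}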
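Endpoints that are plain HTTP webhooks, rather than Event Grid-triggered functions, have to answer the validation handshake themselves by echoing the validation code back. Below is a minimal sketch for the Event Grid schema using `System.Text.Json`; `WebhookValidation` is a hypothetical helper, and wiring it into a specific web framework is left out.

```csharp
#nullable enable
using System.Text.Json;

// Hypothetical helper for webhook endpoints that receive Event Grid schema events.
public static class WebhookValidation
{
    private const string ValidationEventType =
        "Microsoft.EventGrid.SubscriptionValidationEvent";

    // Returns the JSON response body when the request is a validation handshake;
    // returns null for ordinary event deliveries.
    public static string? TryBuildValidationResponse(string requestBody)
    {
        using var doc = JsonDocument.Parse(requestBody);
        if (doc.RootElement.ValueKind != JsonValueKind.Array)
        {
            return null;
        }
        foreach (var evt in doc.RootElement.EnumerateArray())
        {
            if (evt.TryGetProperty("eventType", out var type)
                && type.GetString() == ValidationEventType)
            {
                var code = evt.GetProperty("data").GetProperty("validationCode").GetString();
                // Echo the code back in the shape Event Grid expects
                return JsonSerializer.Serialize(new { validationResponse = code });
            }
        }
        return null;
    }
}
```

The endpoint would return this body with an HTTP 200 status when the result is not null, and hand the request to normal event processing otherwise.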
Step 4: Advanced Patterns
Event Filtering and Routing
{
"advanced-filter-subscription": {
"filter": {
"subjectBeginsWith": "orders/",
"includedEventTypes": ["Order.Created", "Order.Cancelled", "Order.Shipped"],
"advancedFilters": [
{
"key": "data.region",
"operatorType": "StringIn",
"values": ["us-west", "us-east"]
},
{
"key": "data.totalAmount",
"operatorType": "NumberGreaterThan",
"value": 1000
},
{
"key": "data.tags",
"operatorType": "StringIn",
"values": ["premium"]
}
]
}
}
}
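The way these conditions combine can be sketched in plain C#: multiple values inside one filter act as an OR, while separate filters are combined with AND. `OrderEvent` and `FilterSketch` are hypothetical names, and the sketch covers only the subject prefix, the region list, and the amount threshold from the configuration above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical flattened view of an order event, for illustration only.
public sealed record OrderEvent(string Subject, string Region, decimal TotalAmount);

public static class FilterSketch
{
    // Mirrors: subjectBeginsWith AND StringIn(data.region) AND NumberGreaterThan(data.totalAmount)
    public static bool Matches(
        OrderEvent e,
        string subjectPrefix,
        IReadOnlyCollection<string> allowedRegions,
        decimal amountGreaterThan)
    {
        // Subject matching is case-insensitive unless the subscription opts in to case sensitivity
        bool subjectOk = e.Subject.StartsWith(subjectPrefix, StringComparison.OrdinalIgnoreCase);
        // Any one of the listed values is enough (OR within a filter)
        bool regionOk = allowedRegions.Any(
            r => string.Equals(r, e.Region, StringComparison.OrdinalIgnoreCase));
        // Strictly greater than the threshold
        bool amountOk = e.TotalAmount > amountGreaterThan;
        // Every filter must pass (AND across filters)
        return subjectOk && regionOk && amountOk;
    }
}
```

An order of exactly 1000 in `us-east` is therefore not delivered, because `NumberGreaterThan` excludes the boundary value.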
Dead Letter Handling
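using System;
using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;

public class DeadLetterProcessor
{
    private readonly ILogger<DeadLetterProcessor> _logger;

    public DeadLetterProcessor(ILogger<DeadLetterProcessor> logger)
    {
        _logger = logger;
    }

    [Function("ProcessDeadLetters")]
    public async Task Run(
        [BlobTrigger("deadletter/{name}")] string content,
        string name)
    {
        _logger.LogInformation("Processing dead letter: {Name}", name);
        try
        {
            // A dead-letter blob holds a JSON array of events (Event Grid schema assumed),
            // each extended with delivery metadata such as deadLetterReason
            using var document = JsonDocument.Parse(content);
            foreach (var deadLetter in document.RootElement.EnumerateArray())
            {
                _logger.LogError(
                    "Dead letter event: Type={Type}, Subject={Subject}, Reason={Reason}",
                    deadLetter.GetProperty("eventType").GetString(),
                    deadLetter.GetProperty("subject").GetString(),
                    deadLetter.GetProperty("deadLetterReason").GetString());
                // Analyze and take action
                await AnalyzeAndHandleAsync(deadLetter);
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to process dead letter");
            throw;
        }
    }

    private Task AnalyzeAndHandleAsync(JsonElement deadLetter)
    {
        // Placeholder: decide whether to replay, alert on, or discard the event
        return Task.CompletedTask;
    }
}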
Event Schema Transformation
public class EventTransformation
{
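// The returned event is published through an Event Grid output binding.
// "InternalTopicEndpoint" and "InternalTopicKey" are placeholder app setting names.
[Function("TransformEvent")]
[EventGridOutput(TopicEndpointUri = "InternalTopicEndpoint", TopicKeySetting = "InternalTopicKey")]
public EventGridEvent Run(
[EventGridTrigger] EventGridEvent inputEvent)
{
// Transform incoming event to internal format
var transformedEvent = new EventGridEvent(
subject: TransformSubject(inputEvent.Subject),
eventType: MapEventType(inputEvent.EventType),
dataVersion: "2.0",
data: TransformData(inputEvent.Data));
return transformedEvent;
}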
private string TransformSubject(string subject)
{
// Transform: orders/123 -> order/123
return subject.Replace("orders", "order");
}
private string MapEventType(string eventType)
{
// Map external types to internal
return eventType switch
{
"OrderCreatedEvent" => "Order.Created",
"OrderShippedEvent" => "Order.Shipped",
_ => eventType
};
}
private object TransformData(BinaryData data)
{
// Transform data structure
return data.ToObjectFromJson<TransformedData>();
}
}
Step 5: Best Practices
Security Best Practices
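using System;
using Azure.Messaging.EventGrid;

// Validate incoming events
public class EventValidation
{
    // Matches the 60-minute event time-to-live configured on the subscription:
    // retried deliveries can arrive long after EventTime
    private static readonly TimeSpan MaxEventAge = TimeSpan.FromMinutes(60);

    // expectedTopicId is the topic's resource ID, which Event Grid writes to the event's topic field
    public bool ValidateEventGridEvent(EventGridEvent e, string expectedTopicId)
    {
        // Reject events whose topic is missing or does not match the expected topic
        if (!string.Equals(e.Topic, expectedTopicId, StringComparison.OrdinalIgnoreCase))
        {
            return false;
        }
        // Reject events that are too old
        if (e.EventTime < DateTimeOffset.UtcNow - MaxEventAge)
        {
            return false;
        }
        return true;
    }
}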
Performance Optimization
{
"performance-tips": {
// 1. Batch processing
"batchSize": 100,
// 2. Use filter to reduce events (event types are matched exactly, no wildcards)
"filter": {
"includedEventTypes": ["Order.Created", "Order.Cancelled", "Order.Shipped"]
},
// 3. Enable managed identity
"identity": {
"type": "SystemAssigned"
},
// 4. Configure TTL appropriately
"eventTimeToLive": "PT1H"
}
}
Monitoring
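// Event Grid delivery metrics are exposed through Azure Monitor rather than the Event Grid SDKs.
// IEventGridMetricsReader is a hypothetical abstraction over that data; the underlying topic
// metrics include PublishSuccessCount, DeliverySuccessCount, and DeliveryAttemptFailCount.
public interface IEventGridMetricsReader
{
    Task<TopicMetrics> GetTopicMetricsAsync(string topicName);
    Task<int> GetSubscriptionCountAsync(string topicName);
}

public record TopicMetrics(
    long PublishedEventsCount,
    long DeliveredEventsCount,
    long DeliveryFailedEventsCount);

public class EventGridMonitor
{
    private readonly IEventGridMetricsReader _client;

    public EventGridMonitor(IEventGridMetricsReader client) => _client = client;

    // Diagnostics is the application's own result type
    public async Task<Diagnostics> GetDiagnosticsAsync(string topicName)
    {
        // Get topic metrics
        var metrics = await _client.GetTopicMetricsAsync(topicName);
        // Get subscription count
        var subscriptionCount = await _client.GetSubscriptionCountAsync(topicName);
        return new Diagnostics
        {
            PublishedEvents = metrics.PublishedEventsCount,
            DeliveredEvents = metrics.DeliveredEventsCount,
            FailedEvents = metrics.DeliveryFailedEventsCount,
            Subscriptions = subscriptionCount
        };
    }
}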
Use Cases and Examples
Real-Time Notifications
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│    Order    │────▶│    Event    │────▶│    Email    │
│   System    │     │    Grid     │     │   Service   │
└─────────────┘     └─────────────┘     └─────────────┘
                           │
                           └───────────┐
                                       ▼
                                ┌─────────────┐
                                │     SMS     │
                                │   Service   │
                                └─────────────┘
Data Synchronization
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Source    │────▶│    Event    │────▶│   Target    │
│   System    │     │    Grid     │     │   Systems   │
└─────────────┘     └─────────────┘     └─────────────┘
                                               │
                             ┌─────────────────┼─────────────────┐
                             ▼                 ▼                 ▼
                      ┌─────────────┐   ┌─────────────┐   ┌─────────────┐
                      │    Azure    │   │  Dynamics   │   │     SQL     │
                      │   Storage   │   │     365     │   │  Database   │
                      └─────────────┘   └─────────────┘   └─────────────┘
Best Practices Summary
| Practice | Why | Implementation |
|---|---|---|
| Use event filtering | Reduce processing overhead | Filter by event type, subject |
| Configure retry policy | Handle transient failures | Set max delivery attempts and event time-to-live; Event Grid applies the backoff |
| Use dead letter queue | Capture failed events | Store in blob storage |
| Implement idempotency | Handle duplicate events | Check event ID |
| Validate events | Security | Verify source, timestamp |
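Because Event Grid delivers at least once, a handler can see the same event more than once, which is why the table recommends checking the event ID. Here is a minimal in-memory sketch of that check; `ProcessedEventTracker` is a hypothetical name, and a real deployment would keep the IDs in a shared store with expiry so that all instances see them.

```csharp
using System.Collections.Concurrent;

// Hypothetical in-memory tracker; state is lost on restart and is not shared across instances.
public sealed class ProcessedEventTracker
{
    private readonly ConcurrentDictionary<string, byte> _seen = new();

    // Returns true only the first time a given event ID is observed,
    // even when called concurrently from several threads.
    public bool TryMarkProcessed(string eventId) => _seen.TryAdd(eventId, 0);
}
```

A handler would call `TryMarkProcessed(eventGridEvent.Id)` first and skip processing when it returns false.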
Conclusion
Event Grid enables powerful event-driven architectures:
- Loose coupling - Publishers don't know subscribers
- Scalability - Built-in pub/sub scales automatically
- Flexibility - Multiple event types and destinations
- Reliability - Retry policies and dead-lettering
- Integration - Connects Azure services and custom apps
Key takeaways:
- Use custom topics for application events
- Filter events to reduce processing
- Implement retry with dead-letter handling
- Handle webhook validation events
- Monitor event delivery and failures