Event Grid — Ensuring Reliable Event Delivery and Processing
The Problem
Your event-driven system loses or mishandles events:
- Handler fails and event is never retried
- Network issues cause missing events
- Duplicate events create data inconsistencies
- Events arrive out of order
- No visibility into failed deliveries
You need a reliable, observable event processing system.
Understanding Delivery Reliability
┌─────────────────────────────────────────────────────────────────┐
│ Event Delivery Flow │
└─────────────────────────────────────────────────────────────────┘
Publisher Event Grid Handler Backend
│ │ │ │
├─── Publish ──────▶│ │ │
│ ├── Deliver ───────▶│ │
│ │ ├── Process ────▶│
│ │ │ (success) │
│ │◀─── HTTP 2xx ─────│ │
│ │ │ │
│ │ │ (fail) │
│ │◀─ error/timeout ──│ │
│ │ │ │
│ ├── Retry (1) ─────▶│ │
│ │ ├── Process ────▶│
│ │◀─── HTTP 2xx ─────│ │
│ │ │ │
│ │ (all failed) │ │
│ ├── Dead Letter ──▶ blob container (not the handler) │
│ │ │ │
▼ ▼ ▼ ▼
Success Delivered Processed Completed
Solution Implementation
Step 1: Configure Reliable Subscriptions
# Create a subscription with a bounded retry policy and a dead-letter destination.
# Event Grid retries on its own exponential backoff schedule, which is not configurable.
# Only the number of attempts (--max-delivery-attempts, 1-30) and the
# event time-to-live in minutes (--event-ttl, 1-1440) can be set.
# Whichever limit is hit first sends the event to the dead-letter container.
az eventgrid event-subscription create \
  --name reliable-orders-sub \
  --source-resource-id "/subscriptions/sub-id/resourceGroups/event-grid-rg/providers/Microsoft.EventGrid/topics/order-events" \
  --endpoint "https://yourfunction.azurewebsites.net/api/handle" \
  --deadletter-endpoint "/subscriptions/sub-id/resourceGroups/rg/providers/Microsoft.Storage/storageAccounts/mystorage/blobServices/default/containers/deadletter" \
  --max-delivery-attempts 5 \
  --event-ttl 60
Step 2: Idempotent Event Handler
import { AzureFunction, Context } from "@azure/functions";
/**
 * Idempotent Event Grid handler: skips events whose id already has a success record.
 *
 * @param context - Azure Functions invocation context (logging, response).
 * @param eventGridEvent - Incoming event; `id`, `eventType`, `eventTime` and `data` are read.
 * @throws Re-throws any error from `processEvent` so the invocation fails and the event is retried.
 */
const eventGridTrigger: AzureFunction = async function (context: Context, eventGridEvent: any): Promise<void> {
  // Event identity; the id doubles as the idempotency key.
  const eventId: string = eventGridEvent.id;
  const eventType: string = eventGridEvent.eventType;
  const timestamp: string = eventGridEvent.eventTime;
  context.log(`Processing event ${eventId}, type: ${eventType}`);

  // NOTE: check-then-process is not atomic. Two concurrent deliveries of the same id
  // can both pass this check, so processEvent should still tolerate repeats.
  if (await checkIfAlreadyProcessed(eventId)) {
    context.log(`Event ${eventId} already processed, skipping`);
    // NOTE(review): context.res likely has no effect for an Event Grid trigger binding; confirm the binding type.
    context.res = { status: 200, body: "Already processed" };
    return;
  }

  try {
    await processEvent(eventGridEvent.data);
  } catch (error: unknown) {
    const message = error instanceof Error ? error.message : String(error);
    try {
      // Record the failure for debugging.
      await recordProcessing(eventId, eventType, timestamp, "failed", message);
    } catch (recordError: unknown) {
      // Bookkeeping problems must not replace the processing error being re-thrown.
      context.log.error(`Could not record failure for event ${eventId}: ${recordError}`);
    }
    throw error; // Re-throw to trigger retry
  }

  // Written outside the try, so a bookkeeping error is never recorded as a processing failure.
  await recordProcessing(eventId, eventType, timestamp, "success");
  context.res = { status: 200, body: "Event processed" };
};
/**
 * Returns true when the `events` table already holds a *successful* record for `eventId`.
 *
 * Only rows with `Status eq 'success'` count. A row recorded as "failed" must not
 * suppress the retry that follows a failure.
 *
 * @param eventId - Event id used as the idempotency key.
 * @returns Whether a success record exists.
 * @throws Error when the table query returns a non-2xx status. An unreachable store is
 *   then reported as a failure instead of being treated as "not processed".
 */
async function checkIfAlreadyProcessed(eventId: string): Promise<boolean> {
  // OData string literals escape a single quote by doubling it.
  const escapedId = eventId.replace(/'/g, "''");
  const filter = `EventId eq '${escapedId}' and Status eq 'success'`;
  const result = await fetch(
    `https://your-db.table.core.windows.net/events()?$filter=${encodeURIComponent(filter)}`,
    {
      headers: {
        "Authorization": `Bearer ${process.env["STORAGE_TOKEN"]}`,
        "Accept": "application/json;odata=nometadata",
        "x-ms-version": "2019-02-02",
        "x-ms-date": new Date().toUTCString()
      }
    }
  );
  if (!result.ok) {
    throw new Error(`Idempotency lookup failed for event ${eventId}: HTTP ${result.status}`);
  }
  const data = await result.json();
  return (data.value?.length ?? 0) > 0;
}
/**
 * Upserts the processing record for one event into the `events` table.
 *
 * Uses Insert-Or-Replace (PUT on the entity URL) so that a "success" written after an
 * earlier "failed" record for the same event replaces it. A plain insert would conflict.
 * The event time is stored as `EventTime` because `Timestamp` is a reserved,
 * server-managed Table storage property.
 *
 * @param eventId - Row key; also stored as `EventId` for filtering.
 * @param eventType - Partition key; also stored as `EventType`.
 * @param timestamp - The event's own `eventTime`.
 * @param status - Outcome label, e.g. "success" or "failed".
 * @param error - Optional error message for failed runs.
 * @throws Error when the table service returns a non-2xx status.
 */
async function recordProcessing(eventId: string, eventType: string, timestamp: string, status: string, error?: string): Promise<void> {
  // Keys are embedded in an OData key literal: double single quotes, then URL-encode.
  const keyLiteral = (value: string): string => encodeURIComponent(value.replace(/'/g, "''"));
  const entityUrl =
    `https://your-db.table.core.windows.net/events(PartitionKey='${keyLiteral(eventType)}',RowKey='${keyLiteral(eventId)}')`;

  const response = await fetch(entityUrl, {
    method: "PUT",
    headers: {
      "Authorization": `Bearer ${process.env["STORAGE_TOKEN"]}`,
      "Content-Type": "application/json",
      "Accept": "application/json;odata=nometadata",
      "x-ms-version": "2019-02-02",
      "x-ms-date": new Date().toUTCString()
    },
    body: JSON.stringify({
      EventId: eventId,
      EventType: eventType,
      EventTime: timestamp,
      Status: status,
      Error: error,
      ProcessedAt: new Date().toISOString()
    })
  });
  if (!response.ok) {
    throw new Error(`Failed to record status "${status}" for event ${eventId}: HTTP ${response.status}`);
  }
}
export default eventGridTrigger;
Step 3: Event Ordering with Sequence Numbers
// Event Grid does not guarantee delivery order, so detect and handle out-of-order events explicitly
import { AzureFunction, Context } from "@azure/functions";
/**
 * An event whose payload carries ordering metadata.
 * Sequence numbers are only compared between events sharing the same `data.partitionKey`.
 */
interface OrderedEvent {
  eventType: string;
  data: {
    /** Ordering scope for `sequenceNumber`. */
    partitionKey: string;
    /** Position of this event within its partition. */
    sequenceNumber: number;
    /** Remaining payload fields. */
    [key: string]: any;
  };
}
/**
 * Processes events in per-partition sequence order.
 *
 * Tracks the highest sequence number processed for each partition key. Events at or below
 * that watermark are skipped as duplicates or stale deliveries. The watermark only moves forward.
 *
 * State is held in this object's memory: it is lost when the process restarts and is not
 * shared between separate processes/instances.
 */
class EventSequenceProcessor {
  /** Highest sequence number processed so far, per partition key. */
  private processingSequences: Map<string, number> = new Map();

  /**
   * Processes `event` unless it is not newer than the partition's watermark. When earlier
   * sequence numbers appear to be missing, waits via `handleSequenceGap` first.
   *
   * NOTE: two concurrent deliveries of the same sequence number can both pass the checks,
   * because the watermark only advances after `processEvent` completes.
   */
  async processOrderedEvent(context: Context, event: OrderedEvent): Promise<void> {
    const { partitionKey, sequenceNumber } = event.data;

    if (this.isAlreadyHandled(partitionKey, sequenceNumber)) {
      context.log.warn(
        `Skipping duplicate or stale event: sequence ${sequenceNumber} for partition ${partitionKey} ` +
        `is not newer than ${this.processingSequences.get(partitionKey)}`
      );
      return;
    }

    // With nothing seen yet, sequence 1 is expected (matches the previous default of 0).
    const expectedSequence = (this.processingSequences.get(partitionKey) ?? 0) + 1;
    if (sequenceNumber > expectedSequence) {
      context.log.warn(`Gap detected: expected ${expectedSequence}, got ${sequenceNumber}`);
      await this.handleSequenceGap(partitionKey, expectedSequence);
      // Another invocation may have advanced the watermark past this event while we waited.
      if (this.isAlreadyHandled(partitionKey, sequenceNumber)) {
        context.log.warn(`Skipping event ${sequenceNumber} for partition ${partitionKey}: handled during gap wait`);
        return;
      }
    }

    await this.processEvent(event);

    // Re-read and take the max so an interleaved invocation can never move the watermark back.
    const current = this.processingSequences.get(partitionKey);
    this.processingSequences.set(
      partitionKey,
      current === undefined ? sequenceNumber : Math.max(current, sequenceNumber)
    );
  }

  /** True when `sequenceNumber` is at or below the partition's recorded watermark. */
  private isAlreadyHandled(partitionKey: string, sequenceNumber: number): boolean {
    const last = this.processingSequences.get(partitionKey);
    return last !== undefined && sequenceNumber <= last;
  }

  private async handleSequenceGap(partitionKey: string, expectedSequence: number): Promise<void> {
    // Option 1: Wait briefly for missing event
    await new Promise(resolve => setTimeout(resolve, 5000));
    // Option 2: Log gap and continue (forgiving mode)
    // Option 3: Trigger replay from source
  }

  private async processEvent(event: OrderedEvent): Promise<void> {
    // Your event processing logic
  }
}
// Module-level, so every invocation that runs in this process shares one processor
// (and its per-partition sequence state).
const processor = new EventSequenceProcessor();

/** Event Grid entry point for ordered events: delegates to the shared sequence processor. */
const eventGridTrigger: AzureFunction = async (context: Context, eventGridEvent: any): Promise<void> => {
  await processor.processOrderedEvent(context, eventGridEvent);
};
export default eventGridTrigger;
Step 4: Dead Letter Processing
// Process dead letter events to recover or investigate
import { AzureFunction, Context } from "@azure/functions";
/** Failure categories the dead-letter router distinguishes. */
type DeadLetterFailureKind = "timeout" | "notFound" | "serverError" | "unknown";

/**
 * Classifies why a dead-lettered event failed delivery.
 *
 * Reads the payload fields the router has always used (`data.errorMessage`,
 * `data.deliveryInfo.failureReason`) first. It then falls back to dead-letter metadata on
 * the event envelope (`lastDeliveryOutcome`, `deadLetterReason`, `lastHttpStatusCode`).
 * NOTE(review): confirm these envelope field names against the dead-letter blobs you receive.
 * Text matching is case-insensitive, so "Timeout" and "TimedOut" are both recognised.
 *
 * @param eventGridEvent - The dead-lettered event envelope (not just its `data`).
 * @returns The category; checks run in the order timeout, not-found, server error.
 */
function classifyDeadLetterFailure(eventGridEvent: any): DeadLetterFailureKind {
  const data = eventGridEvent?.data;
  const reason = String(
    data?.errorMessage ||
      data?.deliveryInfo?.failureReason ||
      eventGridEvent?.lastDeliveryOutcome ||
      eventGridEvent?.deadLetterReason ||
      ""
  );
  // NaN when absent, which makes every numeric comparison below false.
  const status = Number(eventGridEvent?.lastHttpStatusCode);

  if (/time[d]?\s*out/i.test(reason)) return "timeout";
  if (status === 404 || reason.includes("404") || /not\s*found/i.test(reason)) return "notFound";
  if ((status >= 500 && status < 600) || reason.includes("500")) return "serverError";
  return "unknown";
}

/**
 * Routes a dead-lettered event to a recovery handler based on why delivery failed.
 *
 * @param context - Azure Functions invocation context (logging).
 * @param eventGridEvent - The dead-lettered event; its `data` payload is what the handlers receive.
 */
const dlqTrigger: AzureFunction = async function (context: Context, eventGridEvent: any): Promise<void> {
  context.log("Processing dead letter event");
  const eventType = eventGridEvent.eventType;
  const eventData = eventGridEvent.data;

  // Analyze why event ended up in DLQ
  const failureKind = classifyDeadLetterFailure(eventGridEvent);
  context.log(`Dead-lettered ${eventType} event classified as ${failureKind}`);

  switch (failureKind) {
    case "timeout":
      await handleTimeout(eventData);
      break;
    case "notFound":
      await handleNotFound(eventData);
      break;
    case "serverError":
      await handleServerError(eventData);
      break;
    case "unknown":
      await handleUnknownError(eventData);
      break;
    default: {
      const exhaustive: never = failureKind;
      throw new Error(`Unhandled failure kind: ${exhaustive}`);
    }
  }
};
/**
 * Recovery path for deliveries that timed out. Re-publishes the payload with options
 * asking for a 2-minute timeout and a long-running handler.
 * NOTE(review): rePublishEvent does not read `retryTimeout` or `handlerUrl`, so as written
 * these two options have no effect.
 */
async function handleTimeout(eventData: any): Promise<void> {
  const longRunningOptions = {
    retryTimeout: "PT2M",
    handlerUrl: "https://long-running-handler.azurewebsites.net/api/process"
  };
  await rePublishEvent(eventData, longRunningOptions);
}
/**
 * Recovery path for "not found" failures: repairs the payload, then re-publishes it.
 * NOTE(review): `fixMissingData` is not defined in this snippet; supply it.
 */
async function handleNotFound(eventData: any): Promise<void> {
  return rePublishEvent(fixMissingData(eventData));
}
/**
 * Recovery path for server errors, treated as transient: pause 30 seconds, then
 * re-publish the payload unchanged. The pause runs inside the function invocation.
 */
async function handleServerError(eventData: any): Promise<void> {
  const pauseMs = 30000;
  await new Promise((wake) => setTimeout(wake, pauseMs));
  await rePublishEvent(eventData);
}
/**
 * Fallback for unclassified failures. Archives the payload first, then raises an alert.
 * The steps run strictly in order, so no alert is sent if archiving throws.
 * NOTE(review): `sendAlertNotification` is not defined in this snippet; supply it.
 */
async function handleUnknownError(eventData: any): Promise<void> {
  for (const step of [archiveForInvestigation, sendAlertNotification]) {
    await step(eventData);
  }
}
/**
 * Re-publishes a dead-lettered payload to the topic at `EVENT_GRID_ENDPOINT`.
 *
 * The new event gets a fresh id and an event type with a single `.retry` suffix. Its payload
 * is `eventData` with `retryCount` incremented. Re-publishing to the original topic can
 * loop (re-publish, fail, dead-letter, re-publish), so this stops once the retry budget is spent.
 *
 * @param eventData - Payload to re-publish; `eventType`, `subject`/`source` and `retryCount` are read if present.
 * @param options - Optional settings. Only `maxRetries` (default 3) is read; other fields are ignored.
 * @throws Error when `EVENT_GRID_ENDPOINT`/`EVENT_GRID_KEY` are unset or the retry budget is exhausted.
 */
async function rePublishEvent(eventData: any, options?: any): Promise<void> {
  const endpoint = process.env["EVENT_GRID_ENDPOINT"];
  const key = process.env["EVENT_GRID_KEY"];
  if (!endpoint || !key) {
    throw new Error("EVENT_GRID_ENDPOINT and EVENT_GRID_KEY must be set to re-publish events");
  }

  // Each round trip bumps retryCount; without a cap the same payload can circulate forever.
  const retryCount = (eventData?.retryCount ?? 0) + 1;
  const maxRetries: number = options?.maxRetries ?? 3;
  if (retryCount > maxRetries) {
    throw new Error(`Not re-publishing: retry budget of ${maxRetries} exhausted (retryCount ${retryCount - 1})`);
  }

  // Re-publish to original topic
  const { EventGridPublisherClient, AzureKeyCredential } = await import("@azure/eventgrid");
  // The client takes (endpoint string, input schema, credential). The Event Grid schema
  // matches the `eventType`/`eventTime` fields the handlers in this guide read.
  const client = new EventGridPublisherClient(endpoint, "EventGrid", new AzureKeyCredential(key));

  // Append ".retry" only once so repeated round trips don't yield "x.retry.retry".
  const baseType = String(eventData?.eventType ?? "unknown");
  const eventType = baseType.endsWith(".retry") ? baseType : `${baseType}.retry`;

  await client.send([
    {
      id: crypto.randomUUID(),
      eventType,
      subject: String(eventData?.subject ?? eventData?.source ?? "dead-letter/retry"),
      dataVersion: "1.0",
      eventTime: new Date(),
      data: { ...eventData, retryCount }
    }
  ]);
}
/**
 * Stores a payload as JSON in the `dead-letter-archive` blob container for later analysis.
 *
 * Blob names are `<UTC date>/<id>.json`. When the payload has no `id`, a random UUID is used
 * so unrelated payloads do not overwrite each other.
 *
 * @param eventData - Payload to archive; its `id` (if any) names the blob.
 * @throws Error when `STORAGE_CONNECTION` is unset, or when the upload fails.
 */
async function archiveForInvestigation(eventData: any): Promise<void> {
  const connectionString = process.env["STORAGE_CONNECTION"];
  if (!connectionString) {
    throw new Error("STORAGE_CONNECTION must be set to archive dead-lettered events");
  }

  const { BlobServiceClient } = await import("@azure/storage-blob");
  // The BlobServiceClient constructor expects an account URL; connection strings go
  // through fromConnectionString.
  const blobService = BlobServiceClient.fromConnectionString(connectionString);
  const container = blobService.getContainerClient("dead-letter-archive");

  // Without an id every payload would map to "<date>/undefined.json" and overwrite the previous one.
  const datePrefix = new Date().toISOString().split("T")[0];
  const blobId = eventData?.id ?? crypto.randomUUID();
  const blockBlob = container.getBlockBlobClient(`${datePrefix}/${blobId}.json`);

  const bytes = new TextEncoder().encode(JSON.stringify(eventData));
  // upload(body, contentLength, options): the second argument is the byte length, not options.
  await blockBlob.upload(bytes, bytes.byteLength, {
    blobHTTPHeaders: { blobContentType: "application/json" }
  });
}
export default dlqTrigger;
Step 5: Failover Publishing for Reliability
// Publish to the primary topic; fall back to the backup topic only if the primary send fails
import { EventGridPublisherClient, AzureKeyCredential } from "@azure/eventgrid";
/**
 * Publishes order events to a primary Event Grid topic, failing over to a backup topic
 * when the primary send throws.
 *
 * Endpoints and keys are read from PRIMARY_TOPIC_ENDPOINT / PRIMARY_TOPIC_KEY and
 * BACKUP_TOPIC_ENDPOINT / BACKUP_TOPIC_KEY at construction time.
 */
class ReliableEventPublisher {
  private readonly primaryClient: EventGridPublisherClient<"EventGrid">;
  private readonly backupClient: EventGridPublisherClient<"EventGrid">;

  /**
   * @param logger - Receives publish outcomes. Defaults to `console`. Inside an Azure Function,
   *   pass an adapter over `context.log` (e.g. `{ log: context.log, error: context.log.error }`).
   * @throws Error when any of the four environment variables is missing.
   */
  constructor(
    private readonly logger: { log(message: string): void; error(message: string): void } = console
  ) {
    // The client takes (endpoint string, input schema, credential).
    this.primaryClient = new EventGridPublisherClient(
      ReliableEventPublisher.requireEnv("PRIMARY_TOPIC_ENDPOINT"),
      "EventGrid",
      new AzureKeyCredential(ReliableEventPublisher.requireEnv("PRIMARY_TOPIC_KEY"))
    );
    this.backupClient = new EventGridPublisherClient(
      ReliableEventPublisher.requireEnv("BACKUP_TOPIC_ENDPOINT"),
      "EventGrid",
      new AzureKeyCredential(ReliableEventPublisher.requireEnv("BACKUP_TOPIC_KEY"))
    );
  }

  /**
   * Publishes a `com.example.order.created` event for `order`.
   *
   * @param order - The created order; used as the event payload, and its `id` forms the subject.
   * @throws The backup client's error when both the primary and the backup sends fail.
   */
  async publishOrderCreated(order: Order): Promise<void> {
    // Event Grid schema (eventType/subject/dataVersion/eventTime), the fields the
    // Step 2 handler reads.
    const event = {
      id: crypto.randomUUID(),
      eventType: "com.example.order.created",
      subject: `orders/${order.id}`,
      dataVersion: "1.0",
      eventTime: new Date(),
      data: order
    };

    // Try primary first
    try {
      await this.primaryClient.send([event]);
      this.logger.log("Published to primary topic");
      return;
    } catch (primaryError: unknown) {
      this.logger.error(`Primary publish failed: ${primaryError}`);
    }

    // Same event, same id: if the primary accepted it before the error surfaced,
    // subscribers that de-duplicate on event id can drop the second copy.
    try {
      await this.backupClient.send([event]);
      this.logger.log("Published to backup topic");
    } catch (backupError: unknown) {
      this.logger.error(`Both primary and backup failed: ${backupError}`);
      throw backupError;
    }
  }

  /** Returns the named environment variable, failing fast with a descriptive error when unset. */
  private static requireEnv(name: string): string {
    const value = process.env[name];
    if (!value) {
      throw new Error(`Environment variable ${name} is not set`);
    }
    return value;
  }
}
Step 6: Monitoring and Alerting
// KQL to monitor Event Grid reliability.
// Assumes the topic's diagnostic setting sends "AllMetrics" (AzureMetrics table) and the
// DeliveryFailures log category (AzureDiagnostics table) to the Log Analytics workspace.

// Failed and dropped deliveries per topic
AzureMetrics
| where ResourceProvider == "MICROSOFT.EVENTGRID"
| where MetricName in ("DeliveryAttemptFailCount", "DroppedEventCount")
| summarize failures = sum(Total) by Resource, MetricName, bin(TimeGenerated, 5min)

// Delivery failure details (DeliveryFailures diagnostic log category)
AzureDiagnostics
| where ResourceProvider == "MICROSOFT.EVENTGRID"
| where Category == "DeliveryFailures"
| project TimeGenerated, Resource, Category, OperationName

// Delivery latency: time the destination takes to process each delivery
AzureMetrics
| where ResourceProvider == "MICROSOFT.EVENTGRID"
| where MetricName == "DestinationProcessingDurationInMs"
| summarize avgLatencyMs = avg(Average) by bin(TimeGenerated, 5min)
| render timechart

// Dead letter accumulation
AzureMetrics
| where ResourceProvider == "MICROSOFT.EVENTGRID"
| where MetricName == "DeadLetteredCount"
| summarize dlqCount = sum(Total) by bin(TimeGenerated, 1h)
| render timechart

// Published events per topic. AzureMetrics does not carry metric dimensions, so a
// per-event-type breakdown needs Metrics Explorer split by EventType (where the metric exposes it).
AzureMetrics
| where ResourceProvider == "MICROSOFT.EVENTGRID"
| where MetricName == "PublishSuccessCount"
| summarize eventCount = sum(Total) by Resource, bin(TimeGenerated, 1h)
Best Practices Summary
| Practice | Purpose |
|---|---|
| Enable dead letter queue | Preserve failed events |
| Set appropriate TTL | Balance retry window vs. stale events |
| Implement idempotency | Handle duplicates safely |
| Monitor delivery success | Alert on failures |
| Implement sequence handling | Handle ordering |
| Fail over to a backup topic | Keep publishing when the primary topic is unavailable |
Summary
- Configure subscriptions with retry policies and dead letter queues
- Implement idempotent processing using event IDs
- Handle out-of-order events with sequence numbers
- Process and recover from dead letter queue
- Monitor delivery success rates and set up alerts